
How to Structure LangGraph State Machines for Production


LangGraph Changed How I Build AI Agents

Before LangGraph, my multi-step AI workflows were tangled chains of function calls with ad-hoc error handling. LangGraph brings state machine discipline to agent development. It forces you to think about states, transitions, and failure modes upfront, which is exactly what production systems need.

Here is how I structure LangGraph state machines for systems that need to run reliably at scale.

State Design Principles

The most important decision in a LangGraph application is your state schema. Get this wrong and you will be refactoring constantly.

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from operator import add

class AgentState(TypedDict):
    # Input fields
    query: str
    context: dict
    
    # Processing fields
    plan: list[str]
    current_step: int
    intermediate_results: Annotated[list[dict], add]
    
    # Control fields
    status: Literal["planning", "executing", "reviewing", "complete", "failed"]
    error_count: int
    max_retries: int
    
    # Output fields
    final_result: str
    metadata: dict

Three Categories of State

I always organize state into three categories:

  • Input fields: Set once at the beginning, never modified by nodes
  • Processing fields: Modified by nodes as work progresses. Use reducers like add for append-only lists
  • Control fields: Drive routing decisions. Status fields, counters, and flags
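
The reducer semantics behind processing fields are worth internalizing: LangGraph merges each node's partial return into state field by field, and for a field annotated with a reducer it calls reducer(current_value, update) instead of overwriting. A minimal sketch of the merge that add performs for intermediate_results:

```python
from operator import add

# For a field declared as Annotated[list[dict], add], LangGraph computes
# add(current_value, update) when merging a node's return into state.
current = [{"step": 0, "output": "fetched rows"}]
update = [{"step": 1, "output": "aggregated totals"}]

merged = add(current, update)  # list concatenation: an append-only history
assert len(merged) == 2
```

This is why executor_node below can return a single-element list and still accumulate results across steps.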

Graph Structure

def build_graph() -> StateGraph:
    graph = StateGraph(AgentState)
    
    # Add nodes
    graph.add_node("planner", planner_node)
    graph.add_node("executor", executor_node)
    graph.add_node("reviewer", reviewer_node)
    graph.add_node("error_handler", error_handler_node)
    
    # Set entry point
    graph.set_entry_point("planner")
    
    # Add conditional edges
    graph.add_conditional_edges(
        "planner",
        route_after_planning,
        {"execute": "executor", "failed": "error_handler"}
    )
    
    graph.add_conditional_edges(
        "executor",
        route_after_execution,
        {"review": "reviewer", "retry": "executor", "failed": "error_handler"}
    )
    
    graph.add_conditional_edges(
        "reviewer",
        route_after_review,
        {"complete": END, "revise": "executor", "failed": "error_handler"}
    )
    
    graph.add_edge("error_handler", END)
    
    # Return the uncompiled graph so callers can attach a checkpointer
    # at compile time (see the checkpointing section below)
    return graph

Node Implementation Patterns

Each node should be a focused function that reads from state, performs one operation, and returns state updates.

# `llm` is assumed to be a chat model (e.g. ChatOpenAI) initialized at module level
async def planner_node(state: AgentState) -> dict:
    try:
        response = await llm.ainvoke(
            f"Create a step-by-step plan to answer: {state['query']}\n"
            f"Context: {state['context']}"
        )
        
        plan = parse_plan(response.content)
        return {
            "plan": plan,
            "current_step": 0,
            "status": "executing"
        }
    except Exception as e:
        return {
            "status": "failed",
            "error_count": state.get("error_count", 0) + 1,
            "metadata": {"error": str(e), "node": "planner"}
        }
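
parse_plan is not shown above; a minimal sketch, assuming the LLM emits one step per line with optional "1." or "-" style prefixes (that output format is an assumption about the prompt, not a LangGraph contract):

```python
def parse_plan(text: str) -> list[str]:
    # Hypothetical helper: one plan step per non-empty line,
    # stripping leading "1.", "2)", or "-" style markers
    steps = []
    for line in text.splitlines():
        step = line.strip().lstrip("0123456789.)- ").strip()
        if step:
            steps.append(step)
    return steps
```

In production you would likely ask for structured output (e.g. JSON) instead of parsing free text, but the fallback above keeps the planner resilient to plain-text responses.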

async def executor_node(state: AgentState) -> dict:
    step_index = state["current_step"]
    step = state["plan"][step_index]
    
    try:
        result = await execute_step(step, state["intermediate_results"])
        
        return {
            "intermediate_results": [result],  # Reducer will append
            "current_step": step_index + 1,
            "metadata": {},  # Clear stale errors so routing does not see them
            "status": "executing" if step_index + 1 < len(state["plan"]) else "reviewing"
        }
    except Exception as e:
        return {
            "error_count": state.get("error_count", 0) + 1,
            "metadata": {"error": str(e), "node": "executor", "step": step_index}
        }

Routing Functions

def route_after_execution(state: AgentState) -> str:
    if state["error_count"] >= state.get("max_retries", 3):
        return "failed"
    
    if state["status"] == "reviewing":
        return "review"
    
    if state.get("metadata", {}).get("error"):
        return "retry"
    
    return "review" if state["current_step"] >= len(state["plan"]) else "execute"

Checkpointing for Long-Running Workflows

For workflows that take minutes or hours, checkpointing lets you resume from the last successful state after a crash.

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# Async nodes need an async-capable checkpointer
# (from the langgraph-checkpoint-sqlite package)
async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    # Compile the graph with checkpointing
    app = build_graph().compile(checkpointer=checkpointer)
    
    # Run with a thread ID for resumability
    config = {"configurable": {"thread_id": "workflow-123"}}
    result = await app.ainvoke(initial_state, config=config)
    
    # Resume after a crash: invoking with None input and the same
    # thread ID picks up from the last checkpoint
    result = await app.ainvoke(None, config=config)

Testing LangGraph Applications

Testing state machines requires testing both individual nodes and the routing logic.

import pytest

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_planner_node_success():
    state = {
        "query": "What is the weather in London?",
        "context": {},
        "error_count": 0
    }
    result = await planner_node(state)
    assert result["status"] == "executing"
    assert len(result["plan"]) > 0

def test_routing_after_max_retries():
    # Routing functions are plain synchronous functions, so no marker needed
    state = {
        "error_count": 3,
        "max_retries": 3,
        "status": "executing",
        "plan": ["step1"],
        "current_step": 0
    }
    assert route_after_execution(state) == "failed"

@pytest.mark.asyncio
async def test_full_workflow():
    app = build_graph().compile()
    result = await app.ainvoke({
        "query": "Summarize the latest sales data",
        "context": {"department": "engineering"},
        "max_retries": 3,
        "error_count": 0
    })
    assert result["status"] in ["complete", "failed"]

Production Checklist

  • Every node must handle exceptions and update error state
  • All routing functions must handle unexpected state combinations
  • Set maximum iteration limits to prevent infinite loops
  • Use checkpointing for any workflow that takes more than 30 seconds
  • Log state transitions for debugging and monitoring
  • Add timeout limits per node to prevent hanging on slow LLM calls
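
The per-node timeout point can be handled with a generic wrapper; a sketch using asyncio.wait_for (with_timeout is a hypothetical helper, not a LangGraph API):

```python
import asyncio

def with_timeout(node, seconds: float):
    """Wrap an async node so a hung LLM call fails into the error path
    instead of stalling the whole graph. Hypothetical helper."""
    async def wrapped(state: dict) -> dict:
        try:
            return await asyncio.wait_for(node(state), timeout=seconds)
        except asyncio.TimeoutError:
            # Surface the timeout the same way nodes surface other errors
            return {
                "error_count": state.get("error_count", 0) + 1,
                "metadata": {"error": "timeout", "node": getattr(node, "__name__", "node")},
            }
    return wrapped

# Usage: graph.add_node("planner", with_timeout(planner_node, 30.0))
```

For the iteration-limit point, LangGraph's built-in recursion_limit config entry caps the total number of steps per run, which complements the explicit retry counters in state.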

LangGraph is the most production-ready framework I have used for building AI agents. The state machine paradigm might feel constraining at first, but that constraint is exactly what prevents the chaos that plagues most agent systems in production.