LangGraph Best Practices
Vertiefung für Teilnehmende, die nach dem Einsteiger-Guide robuste LangGraph-Muster für echte Workflows nachschlagen möchten.
Diese Seite ist keine erste Einführung in LangGraph. Sie richtet sich an Teilnehmende, die bereits verstanden haben, warum ein Graph nötig ist und nun stabilere Muster für State, Routing, Checkpointing oder Multi-Agent-Flüsse suchen. Für den ersten Zugang empfiehlt sich zuerst LangGraph Einsteiger.
Der Ton dieser Seite ist bewusst normativer als in den Konzept- und Einsteigerseiten. Im Kurs bedeutet das nicht, dass jede Funktion sofort verwendet werden muss. Wichtiger ist zu verstehen, wann ein Pattern wirklich nötig ist und wann es für den aktuellen Schritt noch überdimensioniert wäre.
Inhaltsverzeichnis
- 🎯 Wann LangGraph statt einfachem
create_agent()? - Überblick der wichtigsten LangGraph-Standards
- 1️⃣ StateGraph mit TypedDict - Type-Safe State Management
- 2️⃣ Nodes & Edges - Workflow-Definition
- 3️⃣ Conditional Routing - Dynamische Entscheidungen
- 4️⃣ Checkpointing & Memory - Persistenz & Recovery
- 5️⃣ Human-in-the-Loop (erweitert) - Interrupt & Resume
- 6️⃣ Subgraphs & Multi-Agent - Modulare Systeme
- 7️⃣ Stream Modes - Debugging & Monitoring
- 🚀 Complete Example: Multi-Agent Research System
- 📚 Import-Cheatsheet
- ⚠️ Migration von LangChain zu LangGraph
- 🎯 Best Practices Zusammenfassung
- 📖 Weitere Ressourcen
- Abgrenzung zu verwandten Dokumenten
- 📝 Changelog
🎯 Wann LangGraph statt einfachem create_agent()?
Für Einsteiger ist diese Tabelle wichtiger als viele Detailabschnitte weiter unten. Erst wenn der Anwendungsfall wirklich mehrstufig, verzweigt oder langlebig wird, lohnt sich die Vertiefung in die restlichen Patterns.
| Use Case | LangChain create_agent() | LangGraph |
|---|---|---|
| Einfacher Agent mit Tools | ✅ Perfekt | ❌ Overkill |
| Multi-Step Workflows mit Bedingungen | ⚠️ Begrenzt | ✅ Ideal |
| Multi-Agent-Systeme (Supervisor, Hierarchie) | ❌ Nicht möglich | ✅ Essential |
| Langlebige Sessions mit Checkpointing | ❌ Nicht möglich | ✅ Essential |
| Human-in-the-Loop (erweitert) | ⚠️ Basic (via Middleware) | ✅ Advanced |
| Conditional Routing | ❌ Nicht möglich | ✅ Essential |
| State Persistence über Tage/Wochen | ❌ Nicht möglich | ✅ Essential |
Faustregel:
- ✅ LangChain für einfache, lineare Agent-Tasks
- ✅ LangGraph für komplexe, verzweigte Workflows und Multi-Agent-Systeme
Überblick der wichtigsten LangGraph-Standards
| # | Feature | Priorität | Hauptvorteil | Use Case |
|---|---|---|---|---|
| 1 | StateGraph mit TypedDict | Kernstandard | Type-safe State Management | Alle Workflows |
| 2 | Nodes & Edges | Kernstandard | Workflow-Definition | Alle Workflows |
| 3 | Conditional Routing | Kernstandard | Dynamische Entscheidungen | Verzweigte Logik |
| 4 | Checkpointing & Memory | Aufbauend | Persistenz & Recovery | Langlebige Sessions |
| 5 | Human-in-the-Loop (erweitert) | Aufbauend | Interrupt & Resume | Kritische Entscheidungen |
| 6 | Subgraphs & Multi-Agent | Vertiefung | Modulare Systeme | Komplexe Workflows |
| 7 | Stream Modes | Vertiefung | Debugging & Monitoring | Beobachtbare Apps |
1️⃣ StateGraph mit TypedDict - Type-Safe State Management
❌ ALT (fehleranfällig)
# Untyped State - Runtime-Fehler möglich
graph = StateGraph(dict)
state = {"messages": [], "count": 0} # Keine Type-Checks!
✅ NEU (PFLICHT)
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
"""State-Definition mit TypedDict für Type-Safety."""
messages: Annotated[list, add_messages] # Reducer für Message-Akkumulation
user_id: str
session_id: str
current_step: int
# StateGraph mit typisiertem State
graph = StateGraph(AgentState)
🎯 Vorteile
- ✅ Type-Safety: Fehler zur Entwicklungszeit, nicht zur Laufzeit
- ✅ IDE-Unterstützung: Autocomplete und IntelliSense
- ✅ Reducer-Support:
add_messagesfür Message-Akkumulation - ✅ Minimal Overhead: TypedDict ist stdlib, keine Runtime-Kosten
📦 Best Practices: TypedDict vs. Pydantic
| Kriterium | TypedDict | Pydantic BaseModel |
|---|---|---|
| Performance | ✅ Schnell (kein Overhead) | ⚠️ Langsamer (Validation) |
| Use Case | Interne State Machines | API Boundaries, User Input |
| Validation | ❌ Keine Runtime-Validation | ✅ Strikte Validation |
| Empfehlung | LangGraph State (intern) | Input/Output-Validierung |
# ✅ Best Practice: TypedDict für State
class GraphState(TypedDict):
messages: Annotated[list, add_messages]
# ✅ Pydantic für User Input
from pydantic import BaseModel
class UserInput(BaseModel):
query: str
temperature: float = 0.7
2️⃣ Nodes & Edges - Workflow-Definition
🔄 Workflow-Bausteine
Nodes = Funktionen, die State transformieren
def agent_node(state: AgentState) -> AgentState:
"""Ein Node ist eine Funktion, die State empfängt und transformiert."""
messages = state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
def tool_node(state: AgentState) -> AgentState:
"""Tool-Ausführung als Node."""
result = execute_tool(state["messages"][-1])
return {"messages": [result]}
Edges = Verbindungen zwischen Nodes
from langgraph.graph import StateGraph, START, END
graph = StateGraph(AgentState)
# Nodes hinzufügen
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
# Edges definieren
graph.add_edge(START, "agent") # Start → agent
graph.add_edge("tools", "agent") # tools → agent (Loop)
graph.add_edge("agent", END) # agent → END
📦 Graph-Visualisierung (PFLICHT nach compile())
Nach graph_builder.compile() den Graphen immer grafisch darstellen – als sofortige Sichtprüfung der Struktur (Nodes, Edges, Routing).
from IPython.display import Image, display
# Direkt nach graph.compile():
graph = graph_builder.compile()
try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception as e:
print(f"⚠️ Graph-Visualisierung nicht verfügbar: {e}")
Warum try/except: draw_mermaid_png() benötigt pygraphviz oder einen Playwright-Browser. In Colab und Jupyter meist verfügbar – try/except verhindert Abbruch falls nicht installiert.
Was die Visualisierung zeigt:
- Alle Nodes mit Namen
- Edges und deren Richtung
- Conditional Edges als Verzweigungen
- START- und END-Knoten
🎯 Vorteile
- ✅ Klarheit: Workflow ist visuell verständlich
- ✅ Testbarkeit: Nodes sind isolierte Funktionen
- ✅ Wiederverwendbarkeit: Nodes können geteilt werden
- ✅ Debugging: Einzelne Nodes testbar
📦 Advanced: Node Caching (verfügbar ab v1.0)
Warum: Eliminiere redundante Berechnungen und spare API-Kosten während der Entwicklung.
from langgraph.graph import StateGraph
graph = StateGraph(AgentState)
# Node mit Caching
def expensive_research(state: AgentState) -> AgentState:
"""Teurer Research-Node - Ergebnis wird gecacht."""
result = expensive_api_call(state["query"])
return {"research_data": result}
# Node mit Cache aktivieren
graph.add_node("research", expensive_research, cache=True)
# Optional: Cache TTL setzen (in Sekunden)
graph.add_node("research", expensive_research, cache=True, cache_ttl=3600) # 1 Stunde
Use Cases:
- ✅ Teure API-Calls (Web Search, Database Queries)
- ✅ Development & Testing (schnellere Iteration)
- ✅ Wiederholbare Berechnungen mit gleichen Inputs
Vorteile:
- 🚀 Entwicklungsgeschwindigkeit erhöhen
- 💰 API-Kosten reduzieren
- ⚡ Workflow-Iterationen beschleunigen
📦 Advanced: Node Defaults — set_node_defaults() (neu in v1.2.0)
Warum: Setze Default-Konfigurationen für alle Nodes an einer zentralen Stelle.
graph = StateGraph(AgentState)
graph.set_node_defaults(retry=RetryPolicy(max_attempts=3), tags=["production"])
graph.add_node("agent", agent_node) # erbt retry + tags automatisch
graph.add_node("tools", tool_node) # erbt retry + tags automatisch
Use Cases: Einheitliche Retry-Policy · Globale Tags für Observability · Weniger Boilerplate
📦 Advanced: Durable Error-Handler Resume (neu in v1.2.0)
Warum: Fehler-Recovery über Host-Neustarts hinweg — kein Zustandsverlust bei Absturz.
config = {"configurable": {"thread_id": "session-123"}}
try:
result = graph.invoke(state, config)
except Exception:
pass # Host neu gestartet
result = graph.invoke(None, config) # setzt ab letztem Checkpoint fort
📦 Advanced: Deferred Nodes (verfügbar ab v1.0)
Warum: Verzögere Node-Ausführung bis alle Upstream-Pfade abgeschlossen sind (Map-Reduce, Consensus).
from langgraph.graph import StateGraph
graph = StateGraph(AgentState)
# Parallel Research Nodes
def web_search_node(state: AgentState) -> AgentState:
return {"web_results": search_web(state["query"])}
def db_query_node(state: AgentState) -> AgentState:
return {"db_results": query_database(state["query"])}
def api_fetch_node(state: AgentState) -> AgentState:
return {"api_results": fetch_from_api(state["query"])}
# Deferred Node wartet auf ALLE Upstream-Nodes
def aggregate_results(state: AgentState) -> AgentState:
"""Aggregiert Ergebnisse von allen parallelen Research-Nodes."""
all_results = (
state.get("web_results", []) +
state.get("db_results", []) +
state.get("api_results", [])
)
return {"aggregated_data": all_results}
# Nodes hinzufügen
graph.add_node("web_search", web_search_node)
graph.add_node("db_query", db_query_node)
graph.add_node("api_fetch", api_fetch_node)
# Deferred Node - wartet auf alle Upstream-Nodes!
graph.add_node("aggregate", aggregate_results, deferred=True)
# Alle Worker-Nodes führen zum Aggregator
graph.add_edge("web_search", "aggregate")
graph.add_edge("db_query", "aggregate")
graph.add_edge("api_fetch", "aggregate")
Use Cases:
- ✅ Map-Reduce Patterns: Parallele Verarbeitung + Aggregation
- ✅ Consensus Mechanisms: Mehrere Agents einigen sich auf Ergebnis
- ✅ Multi-Agent Collaboration: Warte auf alle Agent-Antworten vor Entscheidung
Vorteile:
- 🔄 Saubere Map-Reduce-Semantik
- 🤝 Perfekt für Multi-Agent-Consensus
- ⚡ Maximale Parallelisierung + saubere Aggregation
3️⃣ Conditional Routing - Dynamische Entscheidungen
❌ ALT (statische Edges)
graph.add_edge("agent", "tools") # Immer gleicher Pfad
✅ NEU (PFLICHT für verzweigte Logik)
def should_continue(state: AgentState) -> str:
"""Routing-Funktion: Entscheidet zur Laufzeit, welcher Node als nächstes."""
messages = state["messages"]
last_message = messages[-1]
# Entscheidungslogik
if last_message.tool_calls:
return "tools" # Agent will Tool aufrufen
return END # Agent ist fertig
# Conditional Edge hinzufügen
graph.add_conditional_edges(
"agent", # Von welchem Node
should_continue, # Routing-Funktion
{
"tools": "tools", # Wenn "tools" → zum tools-Node
END: END # Wenn END → Workflow beenden
}
)
🎯 Vorteile
- ✅ Dynamische Workflows: Pfad abhängig von State
- ✅ Komplexe Logik: Beliebige Bedingungen möglich
- ✅ Fehlerbehandlung: Routing zu Error-Nodes
- ✅ Multi-Agent: Routing zu verschiedenen Agents
📦 Advanced: Routing zu mehreren Nodes
def route_to_specialists(state: AgentState) -> list[str]:
"""Routing zu mehreren Nodes parallel."""
task_type = state["task_type"]
if task_type == "research":
return ["web_search", "database_query"] # Parallel
elif task_type == "analysis":
return ["data_analyzer"]
return [END]
graph.add_conditional_edges(
"supervisor",
route_to_specialists
)
4️⃣ Checkpointing & Memory - Persistenz & Recovery
❌ ALT (State verloren bei Neustart)
graph = graph_builder.compile() # Kein Checkpointing!
# Bei Crash oder Neustart: Alles weg ❌
✅ NEU (PFLICHT für Production)
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# Option 1: In-Memory (Development)
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# Option 2: SQLite (Production)
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
graph = graph_builder.compile(checkpointer=checkpointer)
# Workflow mit Thread-ID starten (für Persistenz)
config = {"configurable": {"thread_id": "user-123-session-456"}}
result = graph.invoke(initial_state, config)
# Später fortsetzen (auch nach Tagen!)
result = graph.invoke(None, config) # Lädt automatisch letzten Checkpoint
🎯 Vorteile
- ✅ Crash-Recovery: Kein Datenverlust bei Absturz
- ✅ Langlebige Sessions: Workflows über Tage/Wochen
- ✅ Debugging: State-Historie verfügbar
- ✅ Rollback: Zurück zu früheren Checkpoints
📦 Checkpoint-Backends
| Backend | Use Case | Persistenz | Version |
|---|---|---|---|
MemorySaver | Development, Testing | ❌ Nur RAM | - |
SqliteSaver | Production (single instance) | ✅ Disk | v3.0.1+ (Security Hardening) |
PostgresSaver | Production (distributed) | ✅ Database | v3.0.2+ (Security Hardening) |
| Custom | Spezielle Anforderungen | ✅ Anpassbar | - |
⚠️ Security Update (v1.0.5):
- SQLite Checkpoint v3.0.1: Security Hardening für Production-Deployments
- PostgreSQL Checkpoint v3.0.2: Security Hardening + Custom Encryption at Rest
- Empfehlung: Update auf v3.x für sichere Production-Umgebungen
# SQLite mit Security Features (v3.0.1+)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string(
"checkpoints.db",
encryption_key="your-encryption-key" # Optional: Custom Encryption
)
# PostgreSQL mit Security Features (v3.0.2+)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db",
encryption_at_rest=True # NEU: Encryption at Rest
)
5️⃣ Human-in-the-Loop (erweitert) - Interrupt & Resume
❌ ALT (LangChain Middleware - basic)
# LangChain HumanInTheLoopMiddleware ist limitiert
middleware = [HumanInTheLoopMiddleware(tool_names=["delete_file"])]
✅ NEU (PFLICHT für erweiterte Kontrolle)
from langgraph.types import interrupt, Command
def approval_node(state: AgentState) -> AgentState:
"""Node mit Human-in-the-Loop Interrupt."""
action = state["proposed_action"]
# Workflow pausieren und auf Benutzer warten
approved = interrupt(
f"Approve this action? {action}\nType 'yes' or 'no'"
)
if approved == "yes":
return {"status": "approved"}
else:
return {"status": "rejected"}
# Graph mit Interrupt kompilieren
graph = graph_builder.compile(checkpointer=checkpointer)
# Workflow starten
config = {"configurable": {"thread_id": "session-123"}}
try:
result = graph.invoke(initial_state, config)
except GraphInterrupt as e:
# Interrupt wurde ausgelöst
print(f"Waiting for user input: {e.message}")
# Nach User-Input: Fortsetzen mit Command
from langgraph.types import Command
result = graph.invoke(
Command(resume="yes"), # User-Antwort übergeben
config
)
🎯 Vorteile
- ✅ Volle Kontrolle: Workflow pausiert exakt an definiertem Punkt
- ✅ Asynchron: User kann Stunden/Tage später antworten
- ✅ Multiple Interrupts: Mehrere Pausen im Workflow
- ✅ State-Preservation: Kompletter State bleibt erhalten
📦 Interrupt-Patterns
Pattern 1: Approval Gate
def requires_approval(state: AgentState) -> AgentState:
action = state["action"]
approved = interrupt(f"Approve: {action}?")
return {"approved": approved == "yes"}
Pattern 2: Data Collection
def collect_user_data(state: AgentState) -> AgentState:
name = interrupt("What is your name?")
age = interrupt("What is your age?")
return {"name": name, "age": age}
Pattern 3: Error Handling
def error_recovery(state: AgentState) -> AgentState:
error = state["error"]
choice = interrupt(f"Error: {error}\nRetry or Skip?")
return {"action": choice}
🆕 NEU in v1.1.7: Time-Travel / Replay-Fix bei Interrupt-Return
Fix ab v1.1.7: spult automatisch zum letzten Interrupt-Checkpoint zurück.
Typischer Fehler: weglassen. Ohne Checkpointer kein Replay-Punkt.
📦 Advanced: Multiple Interrupt Resume (verfügbar ab v1.0)
Warum: Parallele Tool Calls oder mehrere Interrupts gleichzeitig fortsetzen.
Problem (vor v1.0): Interrupts mussten sequenziell bearbeitet werden.
Lösung (ab v1.0): Alle Interrupts können gleichzeitig mit einem einzigen Command(resume=...) fortgesetzt werden.
from langgraph.types import Command, interrupt
from langgraph.errors import GraphInterrupt
# Workflow mit parallelen Tool Calls
def parallel_tool_node(state: AgentState) -> AgentState:
"""Mehrere Tools parallel ausführen - jedes braucht Approval."""
tool_calls = state["tool_calls"]
# Jeder Tool Call löst eigenen Interrupt aus
results = []
for tool in tool_calls:
approved = interrupt(f"Approve tool: {tool.name}?")
results.append({"tool": tool.name, "approved": approved})
return {"tool_results": results}
# Graph ausführen
config = {"configurable": {"thread_id": "session-1"}}
try:
result = graph.invoke(initial_state, config)
except GraphInterrupt as e:
# Mehrere Interrupts (z.B. 3 parallele Tool Calls)
interrupts = e.interrupts # Liste von Interrupt-Objekten
print(f"Got {len(interrupts)} interrupts to resolve")
# User gibt Feedback für ALLE Interrupts
resume_values = {
interrupts[0].id: "yes", # Approve tool1
interrupts[1].id: "yes", # Approve tool2
interrupts[2].id: "no" # Reject tool3
}
# Resume ALLE Interrupts gleichzeitig! (verfügbar ab v1.0)
result = graph.invoke(
Command(resume=resume_values),
config
)
print(f"All interrupts resolved: {result}")
Use Cases:
- ✅ Parallel Tool Calls: Agent will mehrere Tools gleichzeitig ausführen
- ✅ Batch Approvals: User kann mehrere Aktionen auf einmal genehmigen
- ✅ Out-of-Order Resume: Interrupts können in beliebiger Reihenfolge beantwortet werden
Vorteile:
- 🚀 Effizienz: Keine sequenzielle Bearbeitung mehr nötig
- 🤝 User Experience: User kann alle Entscheidungen auf einmal treffen
- ⚡ Performance: Parallele Tool Calls bleiben parallel
Best Practice:
# Interrupt-IDs tracken für bessere UX
interrupts_map = {}
for interrupt in e.interrupts:
print(f"[{interrupt.id}] {interrupt.message}")
interrupts_map[interrupt.id] = interrupt.message
# User gibt Antworten basierend auf IDs
user_responses = collect_user_input(interrupts_map)
# Resume mit allen Antworten
result = graph.invoke(Command(resume=user_responses), config)
⚠️ Best Practices (2025)
- Verwende
interrupt()(Standard seit v1.0)# ✅ NEU value = interrupt("message") # ❌ ALT (deprecated) raise NodeInterrupt("message") - Interrupt-Reihenfolge ist wichtig
# ✅ Konsistente Reihenfolge name = interrupt("Name?") age = interrupt("Age?") # ❌ Dynamische Reihenfolge (kann Probleme verursachen) if random.choice([True, False]): interrupt("A") interrupt("B") # Index-basiertes Matching kann fehlschlagen - Resume mit Command-Object
# ✅ Empfohlen graph.invoke(Command(resume="value"), config) # ❌ Vermeiden (deprecated) graph.invoke({"resume": "value"}, config)
6️⃣ Subgraphs & Multi-Agent - Modulare Systeme
🏗️ Multi-Agent-Patterns
Pattern 1: Supervisor Pattern
from langchain.agents import create_agent
# Spezialisierte Worker-Agents
research_agent = create_agent(
model=llm,
tools=[web_search, database_query],
system_prompt="You are a research specialist"
)
writer_agent = create_agent(
model=llm,
tools=[write_document],
system_prompt="You are a writing specialist"
)
# Supervisor-Node
def supervisor(state: AgentState) -> Command:
"""Supervisor entscheidet, welcher Agent als nächstes."""
task = state["current_task"]
if "research" in task:
return Command(goto="research_agent")
elif "write" in task:
return Command(goto="writer_agent")
return Command(goto=END)
# Graph mit Supervisor
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("research_agent", research_agent)
graph.add_node("writer_agent", writer_agent)
graph.add_edge(START, "supervisor")
graph.add_edge("research_agent", "supervisor")
graph.add_edge("writer_agent", "supervisor")
Pattern 2: Hierarchical Teams mit Subgraphs
def create_research_team() -> StateGraph:
"""Subgraph für Research-Team."""
team_graph = StateGraph(AgentState)
team_graph.add_node("web_searcher", web_search_node)
team_graph.add_node("db_analyst", db_analyst_node)
team_graph.add_node("summarizer", summarizer_node)
# ... Edges definieren
return team_graph.compile()
def create_writing_team() -> StateGraph:
"""Subgraph für Writing-Team."""
team_graph = StateGraph(AgentState)
team_graph.add_node("writer", writer_node)
team_graph.add_node("editor", editor_node)
# ... Edges definieren
return team_graph.compile()
# Top-Level Graph mit Subgraphs
main_graph = StateGraph(AgentState)
main_graph.add_node("research_team", create_research_team())
main_graph.add_node("writing_team", create_writing_team())
main_graph.add_node("coordinator", coordinator_node)
main_graph.add_edge(START, "coordinator")
main_graph.add_conditional_edges("coordinator", route_to_team)
🎯 Vorteile
- ✅ Modularität: Teams als wiederverwendbare Komponenten
- ✅ Skalierbarkeit: Einfache Erweiterung durch neue Teams
- ✅ Klarheit: Klare Verantwortlichkeiten
- ✅ Testing: Subgraphs isoliert testbar
📦 Wann welches Pattern?
| Pattern | Use Case | Komplexität | Vorteile |
|---|---|---|---|
| Supervisor | 3-5 Agents, flache Struktur | ⭐ Niedrig | Einfach, schnell |
| Hierarchical | >5 Agents, mehrere Ebenen | ⭐⭐ Mittel | Skalierbar, strukturiert |
| Collaborative | Agents arbeiten parallel | ⭐⭐⭐ Hoch | Effizient, komplex |
🆕 ToolRuntime – Dependency Injection in Tools (langgraph-prebuilt 1.0.8, Feb 2026)
ToolRuntime ermöglicht Tools den Zugriff auf den vollständigen LangGraph-Laufzeit-Kontext via Dependency Injection – ähnlich wie InjectedState und InjectedStore, aber mit Zugang zum gesamten Runtime-Bundle.
from dataclasses import dataclass
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from typing import Any
from langchain_core.runnables import RunnableConfig
# ToolRuntime wird automatisch von ToolNode injiziert
@tool
def kontext_bewusstes_tool(query: str, runtime: ToolRuntime) -> str:
"""Tool mit vollständigem Zugriff auf den LangGraph-Laufzeit-Kontext."""
# Aktuellen State lesen (ohne State-Parameter im Tool)
aktueller_user = runtime.state.get("user_id", "unbekannt")
# Persistenten Store nutzen
if runtime.store:
vergangene_anfragen = runtime.store.get("anfragen", runtime.state["session_id"])
# Tool-Call-ID für Tracking
call_id = runtime.tool_call_id
return f"Antwort für {aktueller_user}: {query}"
# ToolNode injiziert ToolRuntime automatisch
tools = [kontext_bewusstes_tool]
tool_node = ToolNode(tools)
graph = StateGraph(AgentState)
graph.add_node("tools", tool_node) # ToolRuntime wird automatisch übergeben
ToolRuntime-Felder:
| Feld | Typ | Beschreibung |
|---|---|---|
state | dict | Aktueller Graph-State |
context | Any | LangGraph Runtime-Context |
config | RunnableConfig | Runnable-Konfiguration |
stream_writer | StreamWriter | Für direktes Streaming aus dem Tool |
tool_call_id | str | ID des aktuellen Tool-Calls |
store | BaseStore \| None | Persistenter Store (falls konfiguriert) |
Wann ToolRuntime verwenden:
- ✅ Tool braucht Zugriff auf State-Felder (ohne
InjectedState-Annotation) - ✅ Dynamisch registrierte Tools (zur Laufzeit hinzugefügt)
- ✅ Tool soll direkt in den Stream schreiben (
runtime.stream_writer) - ❌ Einfache Tools ohne Kontext-Bedarf → kein Overhead nötig
🆕 NEU in v1.1.10: ToolNode Command Handoff
ToolNode kann Command-Objekte zurückgeben — direktes Routing aus dem Tool heraus, ohne separate Conditional Edges.
from langgraph.types import Command
from langchain_core.tools import tool
@tool
def eskaliere_zu_mensch(grund: str) -> Command:
"""Eskaliert direkt zum Human-Approval-Node."""
return Command(goto="human_approval", update={"eskalierungsgrund": grund})
tool_node = ToolNode([eskaliere_zu_mensch, normales_tool])
7️⃣ Stream Modes - Debugging & Monitoring
❌ ALT (keine Einsicht in Workflow)
result = graph.invoke(state) # Blackbox - keine Zwischenschritte
✅ NEU (PFLICHT für Production)
# Stream Mode: "values" - Full State nach jedem Node
for event in graph.stream(initial_state, config):
print(f"Node: {event['node']}")
print(f"State: {event['values']}")
# Stream Mode: "updates" - Nur State-Deltas
for event in graph.stream(initial_state, config, stream_mode="updates"):
print(f"Update from {event['node']}: {event['updates']}")
# Stream Mode: "debug" - Detaillierte Traces
for event in graph.stream(initial_state, config, stream_mode="debug"):
print(f"Debug: {event}")
# Kombination mehrerer Modes
for event in graph.stream(
initial_state,
config,
stream_mode=["values", "updates", "debug"]
):
if event["type"] == "values":
print(f"State: {event['values']}")
elif event["type"] == "updates":
print(f"Update: {event['updates']}")
🎯 Vorteile
- ✅ Real-time Monitoring: Workflow-Progress live verfolgen
- ✅ Debugging: Fehler sofort identifizieren
- ✅ User Feedback: Progress-Bar, Status-Updates
- ✅ Production Monitoring: Logging, Metrics
🆕 NEU in v1.1.0: v2-Streaming-Format (StreamPart-Dicts)
Ab v1.1.0 liefert strukturierte -Dicts. gibt mit und zurück.
Grenze: Altes -Format erzeugt ab v1.1 Deprecation-Warnings.
📦 Stream Modes Übersicht
| Mode | Output | Use Case | Performance |
|---|---|---|---|
| values | Full State | Simple Workflows, Debugging | ⚠️ Viel Daten |
| updates | State Deltas | Production, Monitoring | ✅ Effizient |
| debug | Execution Traces | Development, Fehlersuche | ⚠️ Verbose |
| messages | LLM Tokens | Streaming Chat UIs | ✅ Real-time |
| custom | User-defined | Spezielle Anforderungen | ✅ Flexibel |
📦 Practical Example: Progress Bar
from tqdm import tqdm
def run_with_progress(graph, state, config):
"""Workflow mit Progress Bar."""
nodes = ["node1", "node2", "node3", "node4"]
with tqdm(total=len(nodes), desc="Workflow") as pbar:
for event in graph.stream(state, config, stream_mode="updates"):
node_name = list(event.keys())[0]
pbar.set_description(f"Running: {node_name}")
pbar.update(1)
return graph.invoke(state, config)
📦 Advanced: Pre/Post Model Hooks (verfügbar ab v1.0)
Warum: Custom Logic vor/nach Model Calls für Context-Management, Guardrails und Human-in-Loop Gates.
Use Cases:
- Pre-Hook: Context-Size-Management (Token-Bloat verhindern), Custom Prompt Injection
- Post-Hook: Guardrails, Content Moderation, Compliance-Checks, Human Review Gates
# Ausnahme: create_react_agent aus langgraph.prebuilt (nicht create_agent aus langchain.agents),
# da pre_model_hook / post_model_hook LangGraph-exklusive Parameter sind.
from langgraph.prebuilt import create_react_agent
def pre_model_hook(state):
"""Vor Model Call: Verhindere Token-Bloat."""
messages = state["messages"]
token_count = sum(len(m.content) for m in messages)
if token_count > 100000:
# Fasse alte Messages zusammen
summary = llm_summarize(messages[:40])
return {
"messages": [
SystemMessage(content=summary),
*messages[40:]
]
}
return state
def post_model_hook(state, response):
"""Nach Model Call: Content Moderation & Guardrails."""
# Guardrail 1: Sensitive Content Detection
if contains_pii(response.content):
return {"needs_review": True, "reason": "PII detected"}
# Guardrail 2: Policy Violation
if violates_policy(response.content):
return interrupt(f"Policy violation detected. Review required:\n{response.content}")
# Guardrail 3: Confidence Check
if response.confidence < 0.7:
return {"approved": False, "reason": "Low confidence"}
return {"response": response, "approved": True}
# Agent mit Hooks erstellen
agent = create_react_agent(
model=llm,
tools=[tool1, tool2],
pre_model_hook=pre_model_hook, # NEU!
post_model_hook=post_model_hook # NEU!
)
Vorteile:
- 🛡️ Safety-Layer: Automatische Guardrails vor Production-Deployment
- 💰 Token-Management: Verhindere teure Token-Overflows
- 🔍 Compliance: DSGVO-konforme PII-Redaktion
- 🤝 Human-in-Loop: Pause bei kritischen Entscheidungen
Best Practices:
- Pre-Hook für Context-Management
def pre_hook(state): """Optimiere Context-Size.""" if len(state["messages"]) > 50: return summarize_old_messages(state) return state - Post-Hook für Guardrails
def post_hook(state, response): """Multi-Layer Safety.""" # Layer 1: Content Moderation if is_unsafe(response): return {"blocked": True} # Layer 2: Fact-Check if needs_verification(response): return interrupt("Verify facts before proceeding") return {"response": response} - Kombination mit Human-in-Loop
def post_hook(state, response): """Human Review für kritische Aktionen.""" if response.tool_calls: for call in response.tool_calls: if call.name in ["delete_database", "send_email"]: return interrupt(f"Approve tool call: {call.name}?") return {"response": response}
🚀 Complete Example: Multi-Agent Research System
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import interrupt, Command
# 1. State Definition mit TypedDict
class ResearchState(TypedDict):
messages: Annotated[list, add_messages]
topic: str
research_data: str
report: str
approved: bool
# 2. Nodes definieren
def supervisor(state: ResearchState) -> Command:
"""Supervisor entscheidet über nächsten Schritt."""
if not state.get("research_data"):
return Command(goto="researcher")
elif not state.get("report"):
return Command(goto="writer")
elif not state.get("approved"):
return Command(goto="approval")
return Command(goto=END)
def researcher(state: ResearchState) -> ResearchState:
"""Research Agent sammelt Daten."""
topic = state["topic"]
# Simuliere Research
data = f"Research data about {topic}..."
return {"research_data": data}
def writer(state: ResearchState) -> ResearchState:
"""Writer Agent erstellt Report."""
data = state["research_data"]
report = f"Report based on: {data}"
return {"report": report}
def approval(state: ResearchState) -> ResearchState:
"""Human-in-the-Loop Approval."""
report = state["report"]
approved = interrupt(f"Approve this report?\n{report}\n(yes/no)")
return {"approved": approved == "yes"}
# 3. Graph mit Conditional Routing
graph_builder = StateGraph(ResearchState)
graph_builder.add_node("supervisor", supervisor)
graph_builder.add_node("researcher", researcher)
graph_builder.add_node("writer", writer)
graph_builder.add_node("approval", approval)
graph_builder.add_edge(START, "supervisor")
graph_builder.add_edge("researcher", "supervisor")
graph_builder.add_edge("writer", "supervisor")
graph_builder.add_edge("approval", "supervisor")
# 4. Checkpointing für Persistenz
checkpointer = SqliteSaver.from_conn_string("research.db")
graph = graph_builder.compile(checkpointer=checkpointer)
# 5. Workflow mit Streaming ausführen
config = {"configurable": {"thread_id": "research-session-1"}}
initial_state = {"topic": "LangGraph Best Practices"}
try:
for event in graph.stream(initial_state, config, stream_mode="updates"):
print(f"Update: {event}")
except GraphInterrupt as e:
print(f"Waiting for approval: {e.message}")
# User gibt Feedback
user_input = input("Your decision: ")
# Resume mit Command
result = graph.invoke(Command(resume=user_input), config)
print(f"Final result: {result}")
📚 Import-Cheatsheet
# Core
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from typing import TypedDict, Annotated
# Checkpointing
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# Human-in-the-Loop
from langgraph.types import interrupt, Command
from langgraph.errors import GraphInterrupt
# Integration mit LangChain Agents
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
⚠️ Migration von LangChain zu LangGraph
Wann migrieren?
- ✅ Multi-Step Workflows mit Verzweigungen
- ✅ Multi-Agent-Systeme (>2 Agents)
- ✅ Langlebige Sessions (>1 Stunde)
- ✅ Human-in-the-Loop mit erweiterten Features
- ✅ State Persistence erforderlich
Migration-Checkliste
- State als TypedDict definieren
- Agent-Logik in Nodes umwandeln
- Conditional Routing für Verzweigungen
- Checkpointer hinzufügen (SQLite/Postgres)
- Interrupt-Punkte für Human-in-the-Loop
- Stream Modes für Monitoring
- Tests für einzelne Nodes schreiben
- Graph nach
compile()visualisieren (draw_mermaid_png()) config_schema→context_schemaersetzen (deprecated seit v1.0)langgraph.prebuiltImports →langchain.agentsmigrieren
⚠️ Deprecated APIs (ab v1.0)
| Deprecated | Ersatz | Status |
|---|---|---|
config_schema Parameter in StateGraph | context_schema | Deprecation-Warning, Entfernung in v2.0 |
langgraph.prebuilt.create_react_agent | langchain.agents.create_react_agent / create_agent | Deprecation-Warning, Entfernung in v2.0 |
langgraph.prebuilt.AgentState | langchain.agents.AgentState | Deprecation-Warning, Entfernung in v2.0 |
raise NodeInterrupt() | interrupt() Funktion | Deprecation-Warning |
# ❌ ALT (deprecated)
graph = StateGraph(MyState, config_schema=ConfigSchema)
# ✅ NEU (v1.0+)
graph = StateGraph(MyState, context_schema=ContextSchema)
🎯 Best Practices Zusammenfassung
1. State Design
- ✅ TypedDict für LangGraph State (Performance)
- ✅ Pydantic für Input/Output-Validierung
- ✅ Minimal State: Nur nötige Daten im State
- ✅ Reducer:
add_messagesfür Message-Akkumulation
2. Workflow-Design
- ✅ Kleine Nodes: Eine Verantwortung pro Node
- ✅ Conditional Routing: Statt viele kleine Edges
- ✅ Subgraphs: Für >5 Nodes gruppieren
- ✅ Error Handling: Dedicated Error-Nodes
- ✅ Graph-Visualisierung: Direkt nach
compile()mitdraw_mermaid_png()
3. Production-Ready
- ✅ Checkpointing: Immer in Production
- ✅ Stream Modes: Für Monitoring
- ✅ Thread-IDs: User/Session-spezifisch
- ✅ Testing: Nodes isoliert testen
- ✅ Recursion Limit: Explizit setzen (Default geändert in v1.0.6)
- ✅ Python: Minimum 3.10+ (3.13 kompatibel, 3.9 abgekündigt)
4. Human-in-the-Loop
- ✅
interrupt()verwenden (nichtNodeInterrupt) - ✅ Konsistente Reihenfolge der Interrupts
- ✅ Command-Object für Resume
- ✅ Approval Gates für kritische Aktionen
📖 Weitere Ressourcen
- LangGraph Docs: https://langchain-ai.github.io/langgraph/
- LangGraph Changelog: https://changelog.langchain.com/ (Node Caching, Deferred Nodes, Hooks)
- Multi-Agent Tutorial: https://langchain-ai.github.io/langgraph/tutorials/multi_agent/
- Checkpointing Guide: https://langchain-ai.github.io/langgraph/concepts/persistence/
- Human-in-the-Loop: https://langchain-ai.github.io/langgraph/concepts/human_in_the_loop/
- LangGraph Studio: Visuelle Debugging-Umgebung
Abgrenzung zu verwandten Dokumenten
| Dokument | Frage |
|---|---|
| Erste Agenten | Wo starte ich als Einsteiger mit LangGraph Best Practices? |
| Qualität und Sicherheit | Welche Produktionsstandards gelten für LangGraph Best Practices? |
📝 Changelog
Changelog v1.6 (Mai 2026):
- 🆕
set_node_defaults()— Node-Level Default-Konfigurationen (LangGraph v1.2.0) - 🆕 Durable Error-Handler Resume — Fehler-Recovery über Host-Neustarts hinweg (v1.2.0)
- 🆕 ToolNode Command Handoff — direktes Routing aus Tools via
Command(v1.1.10)
Changelog v1.5 (Mai 2026):
- 🆕 HITL Replay-Fix —
Command(resume=...)spult zum Interrupt-Checkpoint zurück (v1.1.7) - 🆕 v2-Streaming-Format —
StreamPart-Dicts,GraphOutput.value/.interrupts(v1.1.0+)
Version: 1.6
Stand: Mai 2026
Kurs: KI-Agenten. Verstehen. Anwenden. Gestalten.