Checkpointing & Persistenz

Zustandsspeicherung und Session-Persistenz in LangGraph


Inhaltsverzeichnis

  1. Überblick
  2. Wie Checkpointing funktioniert
    1. Kernkonzepte
    2. Thread-IDs
  3. Checkpointer-Typen
    1. MemorySaver – Entwicklung & Tests
    2. SqliteSaver – Leichtgewichtige Persistenz
    3. AsyncSqliteSaver – Async-Anwendungen
    4. PostgresSaver – Produktionsumgebungen
    5. Entscheidungshilfe
  4. Vollständiges Beispiel: Multi-Turn-Konversation
  5. Checkpoint-State abrufen
  6. Interrupt & Resume (Human-in-the-Loop)
  7. Time Travel: Zu früherem State zurückkehren
  8. Best Practices
    1. Thread-ID-Design
    2. State-Größe kontrollieren
    3. Checkpointer-Ressourcen korrekt schließen
  9. Häufige Fehler
    1. interrupt() ohne Checkpointer
    2. Gleiche thread_id für verschiedene Nutzer
    3. State direkt mutieren
  10. Zusammenfassung
  11. Abgrenzung zu verwandten Dokumenten

Überblick

In einem einfachen LangGraph-Workflow läuft eine Konversation vollständig im Arbeitsspeicher – sobald der Prozess endet, ist der gesamte State verloren. Checkpointing löst dieses Problem: Der State wird nach jedem Node-Schritt persistiert und kann jederzeit wiederhergestellt werden.

Fähigkeit Beschreibung
Multi-Turn-Konversationen Nutzer können Gespräche unterbrechen und nahtlos fortsetzen
Human-in-the-Loop Agent pausiert und wartet auf menschliche Eingabe
Fehlertoleranz Bei Absturz läuft der Workflow vom letzten Checkpoint weiter
Time Travel Zu einem früheren State zurückspringen und Workflow neu starten

[!INFO] Checkpointing ist kein Logging.
Es speichert den vollständigen State – nicht nur Protokolleinträge. Jeder gespeicherte Checkpoint ist ein vollständiger Snapshot, von dem aus der Workflow exakt fortgesetzt werden kann.


Wie Checkpointing funktioniert

LangGraph speichert nach jedem Node-Ausführungsschritt einen Snapshot des States. Diese Snapshots sind nach Thread-ID und Checkpoint-ID adressierbar.

flowchart LR
    S([START]) --> N1[Node A]
    N1 -->|Checkpoint 1| N2[Node B]
    N2 -->|Checkpoint 2| N3[Node C]
    N3 -->|Checkpoint 3| E([END])

    CP1[(Checkpoint 1)] -.->|gespeichert| N1
    CP2[(Checkpoint 2)] -.->|gespeichert| N2
    CP3[(Checkpoint 3)] -.->|gespeichert| N3

Kernkonzepte

Konzept Beschreibung
Thread Eine Konversations-Sitzung, identifiziert durch thread_id
Checkpoint Vollständiger Snapshot des States nach einem Node
Checkpoint-ID Eindeutige ID jedes Snapshots
Namespace Organisationseinheit für mehrere Threads

Thread-IDs

Jede Konversation erhält eine eindeutige thread_id. LangGraph speichert und lädt Checkpoints automatisch anhand dieser ID.

# Erster Aufruf
config = {"configurable": {"thread_id": "nutzer-123-session-1"}}
result1 = app.invoke(
    {"messages": [{"role": "user", "content": "Mein Name ist Anna."}]},
    config=config
)

# Zweiter Aufruf – Kontext ist erhalten, da gleiche thread_id
result2 = app.invoke(
    {"messages": [{"role": "user", "content": "Wie heisse ich?"}]},
    config=config
)
# Antwort: "Du heisst Anna." – der Agent erinnert sich

Checkpointer-Typen

LangGraph bietet verschiedene Checkpointer für unterschiedliche Anforderungen.

MemorySaver – Entwicklung & Tests

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
Eigenschaft Wert
Persistenz Nein – verloren beim Neustart
Performance Sehr schnell
Einsatz Entwicklung, Tests, einfache Demos
Setup Keine externe Abhängigkeit

SqliteSaver – Leichtgewichtige Persistenz

from langgraph.checkpoint.sqlite import SqliteSaver

# Datei-basiert (persistent)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = graph.compile(checkpointer=checkpointer)
    result = app.invoke(inputs, config=config)
Eigenschaft Wert
Persistenz Ja – Datei bleibt erhalten
Performance Gut für moderate Last
Einsatz Prototypen, lokale Anwendungen
Setup pip install langgraph-checkpoint-sqlite

AsyncSqliteSaver – Async-Anwendungen

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = graph.compile(checkpointer=checkpointer)
    result = await app.ainvoke(inputs, config=config)

PostgresSaver – Produktionsumgebungen

from langgraph.checkpoint.postgres import PostgresSaver
import psycopg

with psycopg.connect("postgresql://user:pass@host/db") as conn:
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # Erstellt notwendige Tabellen einmalig
    app = graph.compile(checkpointer=checkpointer)
Eigenschaft Wert
Persistenz Ja – vollständig persistent
Performance Skalierbar, Multi-User-fähig
Einsatz Produktionssysteme
Setup pip install langgraph-checkpoint-postgres

Entscheidungshilfe

flowchart TD
    A{Persistenz nötig?}
    A -->|Nein| B[MemorySaver]
    A -->|Ja| C{Produktionsumgebung?}
    C -->|Nein| D[SqliteSaver]
    C -->|Ja| E{Async?}
    E -->|Ja| F[AsyncSqliteSaver + Postgres]
    E -->|Nein| G[PostgresSaver]

Vollständiges Beispiel: Multi-Turn-Konversation

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain.chat_models import init_chat_model

class ConversationState(TypedDict):
    messages: Annotated[list, add_messages]

llm = init_chat_model("openai:gpt-4o-mini", temperature=0.0)

def chat_node(state: ConversationState) -> ConversationState:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# Graph mit Checkpointing aufbauen
graph = StateGraph(ConversationState)
graph.add_node("chat", chat_node)
graph.add_edge(START, "chat")
graph.add_edge("chat", END)

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Erste Nachricht
config = {"configurable": {"thread_id": "user-42"}}
result = app.invoke(
    {"messages": [{"role": "user", "content": "Mein Name ist Anna."}]},
    config=config
)

# Zweite Nachricht – Kontext aus Checkpoint geladen
result = app.invoke(
    {"messages": [{"role": "user", "content": "Wie heisse ich?"}]},
    config=config
)
print(result["messages"][-1].content)
# "Du heisst Anna."

Checkpoint-State abrufen

# Aktuellen State eines Threads abrufen
state = app.get_state(config)
print(state.values)       # Aktueller State-Inhalt
print(state.next)         # Nächste geplante Nodes (bei Interrupt)
print(state.config)       # Aktuelle Konfiguration inkl. checkpoint_id

# Verlauf aller Checkpoints (neueste zuerst)
for checkpoint in app.get_state_history(config):
    cid = checkpoint.config["configurable"]["checkpoint_id"]
    n_msgs = len(checkpoint.values.get("messages", []))
    print(f"Checkpoint {cid}: {n_msgs} Nachrichten")

Interrupt & Resume (Human-in-the-Loop)

Checkpointing ist die technische Basis für Human-in-the-Loop. Der Agent pausiert an einem definierten Punkt – der State bleibt gespeichert, bis ein Mensch antwortet.

from langgraph.types import interrupt, Command

def kritische_aktion(state: ConversationState) -> ConversationState:
    """Node, der menschliche Bestätigung erfordert."""
    # interrupt() pausiert den Workflow und gibt Daten zurück
    entscheidung = interrupt({
        "frage": "Soll ich die E-Mail wirklich senden?",
        "empfaenger": "team@firma.de"
    })

    if entscheidung == "ja":
        return {"messages": [{"role": "assistant", "content": "E-Mail gesendet."}]}
    else:
        return {"messages": [{"role": "assistant", "content": "E-Mail abgebrochen."}]}

# Workflow starten – stoppt beim interrupt()
app.invoke(inputs, config=config)

# Mensch entscheidet und setzt Workflow fort
app.invoke(Command(resume="ja"), config=config)
flowchart LR
    A([START]) --> B[Aufgabe bearbeiten]
    B --> C{Kritische Aktion}
    C -->|interrupt| D[(Checkpoint\ngespeichert)]
    D --> E[Mensch entscheidet]
    E -->|resume| F[Aktion ausfuehren]
    F --> G([END])

    style D fill:#FFA500,color:#000
    style E fill:#87CEEB,color:#000

[!WARNING] interrupt() erfordert einen Checkpointer
Ohne kompilierten Checkpointer wirft interrupt() eine Exception. Immer graph.compile(checkpointer=...) verwenden, wenn HITL genutzt wird.


Time Travel: Zu früherem State zurückkehren

LangGraph ermöglicht es, zu einem früheren Checkpoint zurückzuspringen und den Workflow von dort neu zu starten.

# Verlauf abrufen (neueste zuerst)
history = list(app.get_state_history(config))

# Früherer Checkpoint (z.B. vor einer Fehlerentscheidung)
earlier = history[3]

# Workflow vom früheren State aus neu starten
result = app.invoke(
    {"messages": [{"role": "user", "content": "Versuche es anders."}]},
    config=earlier.config  # Früherer Checkpoint als Ausgangspunkt
)

Einsatzszenarien:

Szenario Beschreibung
Fehlerkorrektur Fehlerhafte Entscheidung rückgängig machen
Alternatives Routing Anderen Entscheidungspfad testen
Debugging Agenten-Verhalten schrittweise analysieren
A/B-Vergleich Zwei Antworten aus gleicher Ausgangssituation vergleichen

Best Practices

Thread-ID-Design

[!TIP] Thread-ID-Design für Multi-User
Für produktive Systeme: thread_id = f"user_{user_id}_session_{session_id}". Generische IDs wie "session1" führen zu Datenvermischung zwischen Nutzern.

# Eindeutige, nachvollziehbare Thread-IDs
thread_id = f"user_{user_id}_session_{session_id}"

# UUID für automatisch generierte IDs
import uuid
thread_id = str(uuid.uuid4())

# Nicht: zu generisch
# thread_id = "session1"  # Kollisionsgefahr

State-Größe kontrollieren

Checkpoints speichern den gesamten State. Großer State bedeutet mehr Speicher und langsamere I/O.

from langchain_core.messages import trim_messages

def trim_node(state: ConversationState) -> ConversationState:
    """Begrenzt die Nachrichten-History vor dem Checkpointing."""
    trimmed = trim_messages(
        state["messages"],
        max_tokens=4000,
        strategy="last",
        token_counter=llm,
    )
    return {"messages": trimmed}

Checkpointer-Ressourcen korrekt schließen

# Mit Context Manager (empfohlen)
with SqliteSaver.from_conn_string("db.sqlite") as checkpointer:
    app = graph.compile(checkpointer=checkpointer)
    result = app.invoke(inputs, config=config)
# Ressource wird automatisch freigegeben

Häufige Fehler

interrupt() ohne Checkpointer

# Fehler
app = graph.compile()  # Kein Checkpointer
app.invoke(inputs, config=config)  # Wirft: "No checkpointer set"

# Richtig
app = graph.compile(checkpointer=MemorySaver())

Gleiche thread_id für verschiedene Nutzer

# Falsch: alle Nutzer teilen einen Thread
config = {"configurable": {"thread_id": "global"}}

# Richtig: pro Nutzer eigener Thread
config = {"configurable": {"thread_id": f"user_{user_id}"}}

State direkt mutieren

# Falsch: mutiert den gespeicherten State
def bad_node(state):
    state["messages"].append("neue Nachricht")
    return state

# Richtig: neue Werte als Return
def good_node(state):
    return {"messages": [{"role": "assistant", "content": "neue Nachricht"}]}

Zusammenfassung

Checkpointing ist das technische Fundament für persistente, unterbrechbare Agenten-Workflows.

Konzept Kernaussage
MemorySaver Entwicklung & Tests – kein Setup, nicht persistent
SqliteSaver Lokale Persistenz – einfach, dateibasiert
PostgresSaver Produktion – skalierbar, multi-user-fähig
Thread-ID Identifiziert eine Konversations-Sitzung eindeutig
interrupt() Pausiert Workflow für menschliche Eingabe
Time Travel Zu früherem State zurückspringen

Verwandte Konzepte:

Abgrenzung zu verwandten Dokumenten

Dokument Inhalt
State Management Definition und Struktur des States, der gespeichert wird
Memory-Systeme Langzeitgedächtnis jenseits des Checkpoint-Scopes (semantisch, per User)
Human-in-the-Loop Interrupt & Resume als Anwendungsfall von Checkpointing

Version: 1.0
Stand: März 2026
Kurs: KI-Agenten. Verstehen. Anwenden. Gestalten.