Un agent fait plusieurs appels LLM, chacun de plusieurs secondes. En invoke(), l’utilisateur fixe un écran figé pendant dix, vingt secondes, sans savoir si quelque chose se passe. C’est rédhibitoire. Le streaming change tout : il restitue le travail de l’agent au fur et à mesure — un nœud franchi, un token produit, une recherche lancée.

La latence réelle ne baisse pas. La latence perçue, elle, s’effondre. Et c’est elle qui décide si votre agent est utilisable.

Les modes de streaming

graph.stream() (et astream() en async) remplace invoke(). Tout se joue dans le paramètre stream_mode, qui décide de ce qui est émis.

Fig.01 · Modes de stream
gros grain values état complet
étapes updates ∆ par nœud
fin grain messages tokens LLM
sur-mesure custom vos événements
Quatre granularités, du gros grain (l'état complet) au fin grain (le token). On les combine selon l'interface à nourrir.
# updates : la mise à jour produite par chaque nœud (idéal pour « étape franchie »)
for chunk in agent.stream(entree, stream_mode="updates"):
    for nom_noeud, maj in chunk.items():
        print(f"[{nom_noeud}] a produit : {maj}")

# values : l'état complet après chaque super-step (idéal pour resynchroniser une UI)
for etat in agent.stream(entree, stream_mode="values"):
    print(etat["messages"][-1])

Streamer les tokens du modèle

Le mode messages émet les tokens des appels LLM au fil de leur génération — l’effet « machine à écrire ». Chaque élément est un couple (fragment, métadonnées).

for fragment, meta in agent.stream(entree, stream_mode="messages"):
    # meta indique de quel nœud / appel provient le token
    if fragment.content:
        print(fragment.content, end="", flush=True)

Combiner plusieurs modes

On peut demander plusieurs modes à la fois : stream() émet alors des tuples (mode, donnée). Indispensable pour une UI riche qui montre et les étapes et les tokens.

for mode, donnee in agent.stream(entree, stream_mode=["updates", "messages"]):
    if mode == "updates":
        afficher_etape(donnee)        # « Recherche en cours… »
    elif mode == "messages":
        fragment, meta = donnee
        afficher_token(fragment.content)

Émettre ses propres événements (custom)

Le plus puissant : depuis l’intérieur d’un nœud ou d’un outil, on émet des événements arbitraires — une progression, un statut métier — captés en stream_mode="custom". Idéal pour les outils longs.

from langgraph.config import get_stream_writer

def rechercher_massif(state: Etat) -> dict:
    writer = get_stream_writer()
    for i, source in enumerate(sources):
        writer({"progress": i / len(sources), "source": source.nom})  # → stream custom
        indexer(source)
    return {"statut": "indexé"}

Côté consommateur :

for event in agent.stream(entree, stream_mode="custom"):
    print(f"Progression : {event['progress']:.0%}{event['source']}")

C’est ainsi qu’on affiche « Analyse de la source 3/12… » pendant qu’un outil travaille, sans attendre qu’il ait fini.

Événements fins : astream_events

Pour une granularité maximale (début/fin de chaque LLM, outil, chaîne, avec leurs entrées/sorties), astream_events émet un flux d’événements typés. Plus verbeux, mais idéal pour piloter une UI très détaillée ou du logging.

async for event in agent.astream_events(entree, version="v2"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
    elif kind == "on_tool_start":
        print(f"\n→ outil {event['name']} démarré")

Streamer à travers les sous-graphes

Un agent composé de sous-graphes masque par défaut leur activité interne. subgraphs=True révèle le flux imbriqué : chaque événement est alors préfixé par le chemin (namespace) du sous-graphe.

for namespace, chunk in agent.stream(entree, stream_mode="updates", subgraphs=True):
    print(f"{'/'.join(namespace) or 'racine'}{chunk}")

Jusqu’à l’interface : SSE

Du serveur au navigateur, le transport naturel d’un flux d’agent est SSE (Server-Sent Events) — unidirectionnel, simple, résilient. Le serveur itère le stream() et pousse chaque morceau.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/chat")
async def chat(req: Requete):
    async def flux():
        async for mode, donnee in agent.astream(req.entree, stream_mode=["updates", "messages"]):
            yield f"data: {json.dumps(serialiser(mode, donnee))}\n\n"
    return StreamingResponse(flux(), media_type="text/event-stream")

Que montrer, et que cacher

Streamer tout, brut, noie l’utilisateur. L’art est de traduire le flux en signaux lisibles :

  • Montrer : les tokens de la réponse finale, les étapes de haut niveau (« Recherche… », « Rédaction… »), la progression des outils longs.
  • Cacher : le raisonnement de routage interne, les appels d’outils techniques, les tokens des LLM intermédiaires (résumé, classification).

Bonnes pratiques & pièges

Bonnes pratiques
  • Choisissez le mode selon l'UI : updates pour les étapes, messages pour les tokens, custom pour la progression.
  • Combinez les modes (liste) pour montrer étapes ET tokens dans la même interface.
  • Filtrez les tokens par nœud (métadonnées) : ne streamez visiblement que la réponse finale.
  • Émettez des événements custom depuis les outils longs (get_stream_writer) pour une vraie progression.
  • Projetez chaque morceau sur un schéma JSON stable avant de l'envoyer au front.
  • Avec un checkpointer, laissez l'agent continuer après une déconnexion et reprenez le flux.
Pièges à éviter
  • Rester en invoke() : dix secondes d'écran figé suffisent à faire fuir l'utilisateur.
  • Streamer tout brut : le raisonnement interne et les LLM intermédiaires noient le signal.
  • Envoyer des objets LangChain non sérialisés au navigateur : ça casse côté front.
  • Ignorer la déconnexion client : l'agent tourne dans le vide et la facture monte.
  • Streamer les tokens de TOUS les LLM, y compris le routeur : l'UI devient illisible.

Ce qu’il faut retenir

  • Le streaming abat la latence perçue — celle qui décide de l’utilisabilité.
  • Quatre modes : values (état), updates (∆ par nœud), messages (tokens), custom (vos événements) — combinables.
  • get_stream_writer émet des événements métier depuis les nœuds et outils.
  • astream_events et subgraphs=True donnent la granularité fine et le flux imbriqué.
  • En production : transport SSE, sérialisation stable, gestion de la déconnexion.
  • Le streaming est une couche de présentation : traduisez le flux brut en signaux lisibles.

Vous tenez désormais toute la chaîne : orchestrer, outiller, mémoriser, persister, observer, évaluer, et restituer en temps réel. De quoi mener un agent du prototype à la production — exactement la promesse des cas pratiques.

#langgraph#streaming#ux#tokens#production