forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshared_state_streaming.py
More file actions
94 lines (81 loc) · 3.24 KB
/
Copy pathshared_state_streaming.py
File metadata and controls
94 lines (81 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""LangGraph agent backing the State Streaming demo.
Demonstrates per-token state-delta streaming. The agent writes a long
`document` string into shared agent state via a `write_document` tool;
`StateStreamingMiddleware(StateItem(...))` tells CopilotKit to forward
*every token* of the tool's `content` argument directly into the
`document` state key as it is generated. The UI (useAgent) sees
`state.document` grow token-by-token, without waiting for the tool call
to finish.
This is the canonical per-token state-streaming pattern:
docs.copilotkit.ai/integrations/langgraph/shared-state/predictive-state-updates
"""
# @region[state-streaming-middleware]
import uuid
from langchain.agents import AgentState as BaseAgentState, create_agent
from langchain.tools import ToolRuntime, tool
from langchain_core.messages import ToolMessage
from langchain_openai import ChatOpenAI
from langgraph.types import Command
from copilotkit import (
CopilotKitMiddleware,
StateItem,
StateStreamingMiddleware,
)
class AgentState(BaseAgentState):
"""Shared state. `document` is streamed token-by-token."""
document: str
@tool
def write_document(document: str, runtime: ToolRuntime) -> Command:
"""Write a document for the user.
Always call this tool when the user asks you to write or draft
something of any length (an essay, poem, email, summary, etc.).
The `document` argument is streamed *per token* into shared agent
state under the `document` key, so the UI can render it as it is
generated.
"""
return Command(
update={
"document": document,
"messages": [
ToolMessage(
content="Document written to shared state.",
name="write_document",
id=str(uuid.uuid4()),
tool_call_id=runtime.tool_call_id,
)
],
}
)
graph = create_agent(
model=ChatOpenAI(model="gpt-5.4"),
tools=[write_document],
middleware=[
CopilotKitMiddleware(),
# Forward every token of write_document's `document` argument
# straight into state["document"] while the tool call is still
# streaming. Without this, `document` would only update once
# the tool call completes.
#
# NOTE: the frontend `usePredictStateSubscription` hook indexes
# the (partial-JSON-parsed) tool args by `state_key`, so the
# tool's argument name MUST match `state_key` ("document") for
# per-token deltas to land in `state.document`.
StateStreamingMiddleware(
StateItem(
state_key="document",
tool="write_document",
tool_argument="document",
)
),
],
state_schema=AgentState,
system_prompt=(
"You are a collaborative writing assistant. Whenever the user asks "
"you to write, draft, or revise any piece of text, ALWAYS call the "
"`write_document` tool with the full content as a single string in "
"the `document` argument. Never paste the document into a chat "
"message directly — the document belongs in shared state and the "
"UI renders it live as you type."
),
)
# @endregion[state-streaming-middleware]