forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshared_state_streaming.py
More file actions
101 lines (86 loc) · 3.36 KB
/
Copy pathshared_state_streaming.py
File metadata and controls
101 lines (86 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""shared-state-streaming — MAF agent that streams `document` per token.
Mirrors LangGraph's `langgraph-python/src/agents/shared_state_streaming.py`.
The frontend (`src/app/demos/shared-state-streaming/page.tsx`) subscribes
to `agent.state.document` via `useAgent` and re-renders the document
view as content arrives. This agent's job is to call `write_document`
with a full document string; the `predict_state_config` here mirrors
LGP's `StateStreamingMiddleware(StateItem(state_key="document",
tool="write_document", tool_argument="document"))` — it tells the
runtime to forward every token of the tool's `document` argument
directly into `state.document` while the tool call is still streaming.
"""
from __future__ import annotations
from textwrap import dedent
from typing import Annotated
from agent_framework import Agent, BaseChatClient, tool
from agent_framework_ag_ui import AgentFrameworkAgent, state_update
from pydantic import Field
STATE_SCHEMA: dict[str, object] = {
"document": {
"type": "string",
"description": "The full document body, streamed token-by-token.",
}
}
# Tells the runtime to stream tool-argument deltas straight into
# `state.document` while `write_document` is still streaming — matches
# LGP's StateStreamingMiddleware setup.
PREDICT_STATE_CONFIG: dict[str, dict[str, str]] = {
"document": {
"tool": "write_document",
"tool_argument": "document",
}
}
@tool(
name="write_document",
description=(
"Write a document for the user. Always call this tool when the "
"user asks you to write, draft, or revise any text. The "
"`document` argument is streamed per-token into shared state "
"under the `document` key so the UI renders the body live."
),
)
def write_document(
document: Annotated[
str,
Field(description="The full document content as a single string."),
],
):
"""Commit the final document body to shared state.
Per-token streaming of the `document` arg is handled by the runtime
via `predict_state_config`; this final `state_update` is the
authoritative commit after the tool finishes streaming.
"""
return state_update(
text="Document written to shared state.",
state={"document": document},
)
SYSTEM_PROMPT = dedent(
"""
You are a collaborative writing assistant. Whenever the user asks
you to write, draft, or revise any piece of text, ALWAYS call the
`write_document` tool with the full content as a single string in
the `document` argument. Never paste the document into a chat
message directly — the document belongs in shared state and the UI
renders it live as you type.
"""
).strip()
def create_shared_state_streaming_agent(
chat_client: BaseChatClient,
) -> AgentFrameworkAgent:
"""Instantiate the shared-state-streaming MAF agent."""
base_agent = Agent(
client=chat_client,
name="shared_state_streaming_agent",
instructions=SYSTEM_PROMPT,
tools=[write_document],
)
return AgentFrameworkAgent(
agent=base_agent,
name="SharedStateStreamingAgent",
description=(
"Per-token state streaming: `write_document` arg deltas land "
"in `state.document` as the tool call is generated."
),
predict_state_config=PREDICT_STATE_CONFIG,
require_confirmation=False,
)