forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patha2ui_dynamic.py
More file actions
300 lines (274 loc) · 11.4 KB
/
Copy patha2ui_dynamic.py
File metadata and controls
300 lines (274 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""Claude Agent SDK backend for the Declarative Generative UI (A2UI Dynamic) demo.
The agent exposes a single `generate_a2ui(context: str)` tool. When called,
it invokes a secondary OpenAI client bound to the `render_a2ui` tool schema
(forced via `tool_choice`) and returns an `a2ui_operations` container which
the runtime's A2UI middleware detects and forwards to the frontend renderer.
The dedicated runtime route (`api/copilotkit-declarative-gen-ui/route.ts`)
sets `injectA2UITool: false` so the runtime does not double-bind a second
A2UI tool on top of this one — the registered client catalog is still
serialised into `copilotkit.context` so the secondary LLM knows what's
available.
Mirrors the langgraph-python and ag2 references.
"""
from __future__ import annotations
import json
import os
import traceback
from collections.abc import AsyncIterator
from textwrap import dedent
from typing import Any
import anthropic
import openai
from ag_ui.core import (
EventType,
RunAgentInput,
RunFinishedEvent,
RunStartedEvent,
TextMessageContentEvent,
TextMessageEndEvent,
TextMessageStartEvent,
ToolCallArgsEvent,
ToolCallEndEvent,
ToolCallResultEvent,
ToolCallStartEvent,
)
from ag_ui.encoder import EventEncoder
from tools import (
RENDER_A2UI_TOOL_SCHEMA,
build_a2ui_operations_from_tool_call,
)
# System prompt sent as the `system=` argument of the Claude request in
# `run_a2ui_dynamic_agent`. It steers the model towards calling the
# `generate_a2ui` tool for anything richer than plain text and names the
# catalog components the prompt author expects to be registered.
# `dedent(...).strip()` removes any common leading indentation plus the
# newlines that surround the triple-quoted literal.
SYSTEM_PROMPT = dedent("""
You are a demo assistant for Declarative Generative UI (A2UI — Dynamic
Schema). Whenever a response would benefit from a rich visual — a
dashboard, status report, KPI summary, card layout, info grid, a
pie/donut chart of part-of-whole breakdowns, a bar chart comparing
values across categories, or anything more structured than plain text —
call `generate_a2ui` to draw it. The registered catalog includes
`Card`, `StatusBadge`, `Metric`, `InfoRow`, `PrimaryButton`, `PieChart`,
and `BarChart` (in addition to the basic A2UI primitives). Prefer
`PieChart` for part-of-whole breakdowns and `BarChart` for comparisons
across categories. `generate_a2ui` takes a single `context` argument
summarising the conversation. Keep chat replies to one short sentence;
let the UI do the talking.
""").strip()
# JSON Schema for the tool's arguments: one required string, `context`.
_GENERATE_A2UI_INPUT_SCHEMA: dict[str, Any] = {
    "type": "object",
    "properties": {
        "context": {
            "type": "string",
            "description": (
                "Conversation context summary the secondary LLM "
                "should design UI from."
            ),
        },
    },
    "required": ["context"],
}

# Tool definition handed to the Claude request (`tools=[GENERATE_A2UI_TOOL]`).
# Key order is name, description, input_schema.
GENERATE_A2UI_TOOL: dict[str, Any] = {
    "name": "generate_a2ui",
    "description": (
        "Generate dynamic A2UI components based on the conversation. A secondary "
        "LLM designs the UI schema and data using the registered catalog."
    ),
    "input_schema": _GENERATE_A2UI_INPUT_SCHEMA,
}
def _plain_text(content: list[Any]) -> str:
    """Concatenate the string ``text`` of every dict block in *content*.

    Blocks without a string ``text`` entry (for example ``tool_use`` or
    ``tool_result`` blocks) contribute nothing.
    """
    return "".join(
        block["text"]
        for block in content
        if isinstance(block, dict) and isinstance(block.get("text"), str)
    )


def _to_chat_messages(
    conversation_messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Return a copy of the history that is safe to send to Chat Completions.

    Messages whose ``content`` is not a list are passed through untouched.
    Messages whose ``content`` is a list of blocks are flattened to their
    text; if no text remains (e.g. an assistant turn that only carried
    ``tool_use`` blocks, or a user turn that only carried ``tool_result``
    blocks) the message is dropped rather than sent with empty content.
    """
    chat: list[dict[str, Any]] = []
    for message in conversation_messages:
        content = message.get("content")
        if not isinstance(content, list):
            chat.append(message)
            continue
        text = _plain_text(content)
        if text:
            chat.append({**message, "content": text})
    return chat


def _generate_a2ui(
    context: str, conversation_messages: list[dict[str, Any]] | None = None
) -> dict[str, Any]:
    """Invoke a secondary LLM bound to render_a2ui and return an operations container.

    Args:
        context: Conversation summary used as the system message. An empty
            value falls back to a generic dashboard instruction.
        conversation_messages: Optional chat history appended after the
            system message. Block-list content (as produced for Anthropic
            tool turns) is reduced to plain text first, because the history
            built by the agent loop contains ``tool_use`` / ``tool_result``
            blocks that are not Chat Completions content parts.

    Returns:
        The value of ``build_a2ui_operations_from_tool_call`` for the first
        tool call's parsed arguments, or ``{"error": ...}`` when the model
        produced no tool call or its arguments were not valid JSON.

    Raises:
        Whatever the OpenAI client raises for transport / API failures; those
        are intentionally left to the caller.
    """
    client = openai.OpenAI()

    llm_messages: list[dict[str, Any]] = [
        {"role": "system", "content": context or "Generate a useful dashboard UI."},
    ]
    history = _to_chat_messages(conversation_messages or [])
    if history:
        llm_messages.extend(history)
    else:
        # No usable history: give the model a generic user turn to answer.
        llm_messages.append(
            {
                "role": "user",
                "content": "Generate a dynamic A2UI dashboard based on the conversation.",
            }
        )

    response = client.chat.completions.create(
        model=os.getenv("OPENAI_MODEL", "gpt-4.1"),
        messages=llm_messages,
        tools=[{"type": "function", "function": RENDER_A2UI_TOOL_SCHEMA}],
        # Force the model to answer through render_a2ui.
        tool_choice={"type": "function", "function": {"name": "render_a2ui"}},
    )

    tool_calls = response.choices[0].message.tool_calls if response.choices else None
    if not tool_calls:
        return {"error": "LLM did not call render_a2ui"}

    try:
        args = json.loads(tool_calls[0].function.arguments)
    except json.JSONDecodeError as exc:
        # Report malformed (e.g. truncated) arguments instead of raising.
        return {"error": f"render_a2ui arguments were not valid JSON: {exc}"}
    return build_a2ui_operations_from_tool_call(args)
def _message_text(raw: Any) -> str:
    """Flatten an incoming message ``content`` value to plain text.

    A string is returned as-is. For a list, the string ``text`` of each part
    (dict key or attribute) is concatenated; parts without a string ``text``
    are skipped. Any other value yields ``""``.
    """
    if isinstance(raw, str):
        return raw
    if not isinstance(raw, list):
        return ""
    parts: list[str] = []
    for part in raw:
        if isinstance(part, dict):
            text = part.get("text")
        else:
            text = getattr(part, "text", None)
        if isinstance(text, str):
            parts.append(text)
    return "".join(parts)


def _chat_history(input_data: RunAgentInput) -> list[dict[str, Any]]:
    """Build the initial Claude message list from the run input.

    Only ``user`` and ``assistant`` messages with non-empty text are kept.
    """
    history: list[dict[str, Any]] = []
    for msg in input_data.messages or []:
        role = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
        if role not in ("user", "assistant"):
            continue
        content = _message_text(getattr(msg, "content", None))
        if content:
            history.append({"role": role, "content": content})
    return history


async def run_a2ui_dynamic_agent(input_data: RunAgentInput) -> AsyncIterator[str]:
    """Stream a Claude conversation that may call `generate_a2ui`.

    Yields encoded AG-UI events: RUN_STARTED, then for every model turn a
    TEXT_MESSAGE_START / CONTENT / END envelope with TOOL_CALL_START / ARGS /
    END events for each tool-use block, followed by TOOL_CALL_RESULT events
    for executed tools, and finally RUN_FINISHED. The loop repeats while the
    model keeps requesting tools.

    Args:
        input_data: Run input providing ``messages``, ``thread_id`` and
            ``run_id``.

    Yields:
        Encoded event strings produced by ``EventEncoder.encode``.
    """
    # Function-scope import: only the tool-execution step needs it.
    import asyncio

    encoder = EventEncoder()
    client = anthropic.AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY", ""))

    messages = _chat_history(input_data)

    thread_id = input_data.thread_id or "default"
    run_id = input_data.run_id or "run-1"

    yield encoder.encode(
        RunStartedEvent(type=EventType.RUN_STARTED, thread_id=thread_id, run_id=run_id)
    )

    while True:
        # len(messages) grows every round, so the id is unique per turn.
        msg_id = f"msg-{run_id}-{len(messages)}"
        yield encoder.encode(
            TextMessageStartEvent(
                type=EventType.TEXT_MESSAGE_START,
                message_id=msg_id,
                role="assistant",
            )
        )

        response_text = ""
        tool_calls: list[dict[str, Any]] = []

        try:
            async with client.messages.stream(
                model=os.getenv("ANTHROPIC_MODEL", "claude-opus-4-5"),
                max_tokens=2048,
                system=SYSTEM_PROMPT,
                messages=messages,
                tools=[GENERATE_A2UI_TOOL],
            ) as stream:
                # State of the tool_use block currently being streamed.
                current_tool_id: str | None = None
                current_tool_name: str | None = None
                current_tool_args = ""

                async for event in stream:
                    etype = type(event).__name__

                    if etype == "RawContentBlockStartEvent":
                        block = event.content_block  # type: ignore[attr-defined]
                        if block.type == "tool_use":
                            current_tool_id = block.id
                            current_tool_name = block.name
                            current_tool_args = ""
                            yield encoder.encode(
                                ToolCallStartEvent(
                                    type=EventType.TOOL_CALL_START,
                                    tool_call_id=current_tool_id,
                                    tool_call_name=current_tool_name,
                                    parent_message_id=msg_id,
                                )
                            )

                    elif etype == "RawContentBlockDeltaEvent":
                        delta = event.delta  # type: ignore[attr-defined]
                        if delta.type == "text_delta":
                            response_text += delta.text
                            yield encoder.encode(
                                TextMessageContentEvent(
                                    type=EventType.TEXT_MESSAGE_CONTENT,
                                    message_id=msg_id,
                                    delta=delta.text,
                                )
                            )
                        elif delta.type == "input_json_delta":
                            current_tool_args += delta.partial_json
                            yield encoder.encode(
                                ToolCallArgsEvent(
                                    type=EventType.TOOL_CALL_ARGS,
                                    tool_call_id=current_tool_id or "",
                                    delta=delta.partial_json,
                                )
                            )

                    elif etype in (
                        "RawContentBlockStopEvent",
                        "ParsedContentBlockStopEvent",
                    ):
                        # Either stop-event class may arrive; the state reset
                        # below makes a second one for the same block a no-op.
                        if current_tool_id and current_tool_name:
                            yield encoder.encode(
                                ToolCallEndEvent(
                                    type=EventType.TOOL_CALL_END,
                                    tool_call_id=current_tool_id,
                                )
                            )
                            try:
                                parsed = (
                                    json.loads(current_tool_args)
                                    if current_tool_args
                                    else {}
                                )
                            except json.JSONDecodeError:
                                parsed = {}
                            # Tool input is read with .get() below and echoed
                            # back as a tool_use input, so it must be a dict.
                            if not isinstance(parsed, dict):
                                parsed = {}
                            tool_calls.append(
                                {
                                    "id": current_tool_id,
                                    "name": current_tool_name,
                                    "input": parsed,
                                }
                            )
                        current_tool_id = None
                        current_tool_name = None
                        current_tool_args = ""
        except Exception:
            # Top-level boundary: surface the failure in the chat stream
            # instead of aborting the generator.
            err_text = f"Agent error: {traceback.format_exc()}"
            yield encoder.encode(
                TextMessageContentEvent(
                    type=EventType.TEXT_MESSAGE_CONTENT,
                    message_id=msg_id,
                    delta=err_text,
                )
            )

        yield encoder.encode(
            TextMessageEndEvent(
                type=EventType.TEXT_MESSAGE_END,
                message_id=msg_id,
            )
        )

        if not tool_calls:
            break

        # Build assistant turn with tool_use blocks.
        assistant_content: list[dict[str, Any]] = []
        if response_text:
            assistant_content.append({"type": "text", "text": response_text})
        assistant_content.extend(
            {
                "type": "tool_use",
                "id": tc["id"],
                "name": tc["name"],
                "input": tc["input"],
            }
            for tc in tool_calls
        )
        messages.append({"role": "assistant", "content": assistant_content})

        # Execute generate_a2ui and emit tool_result.
        tool_results: list[dict[str, Any]] = []
        for tc in tool_calls:
            if tc["name"] == "generate_a2ui":
                ctx = tc["input"].get("context", "")
                try:
                    # _generate_a2ui is a blocking call; run it in a worker
                    # thread so the event loop keeps serving other requests.
                    result_obj = await asyncio.to_thread(
                        _generate_a2ui, ctx, conversation_messages=list(messages)
                    )
                    result_text = json.dumps(result_obj)
                except Exception as exc:
                    # Report the failure as the tool result so every tool_use
                    # gets a tool_result and the run still finishes.
                    result_text = json.dumps(
                        {"error": f"generate_a2ui failed: {exc}"}
                    )
            else:
                result_text = json.dumps({"error": f"unknown tool {tc['name']}"})

            yield encoder.encode(
                ToolCallResultEvent(
                    type=EventType.TOOL_CALL_RESULT,
                    tool_call_id=tc["id"],
                    message_id=f"{msg_id}-tool-result-{tc['id']}",
                    content=result_text,
                )
            )
            tool_results.append(
                {
                    "type": "tool_result",
                    "tool_use_id": tc["id"],
                    "content": result_text,
                }
            )

        messages.append({"role": "user", "content": tool_results})

    yield encoder.encode(
        RunFinishedEvent(
            type=EventType.RUN_FINISHED, thread_id=thread_id, run_id=run_id
        )
    )