forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubagents.py
More file actions
299 lines (251 loc) · 11.5 KB
/
Copy pathsubagents.py
File metadata and controls
299 lines (251 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
"""LangGraph agent backing the Sub-Agents demo.
Demonstrates multi-agent delegation with a visible delegation log.
A top-level "supervisor" LLM orchestrates three specialized sub-agents,
exposed as tools:
- `research_agent` — gathers facts
- `writing_agent` — drafts prose
- `critique_agent` — reviews drafts
Each sub-agent is a full `create_agent(...)` under the hood. Every
delegation appends an entry to the `delegations` slot in shared agent
state so the UI can render a live "delegation log" as the supervisor
fans work out and collects results. This is the canonical LangGraph
sub-agents-as-tools pattern, adapted to surface delegation events to
the frontend via CopilotKit's shared-state channel.
"""
# @region[supervisor-delegation-tools]
# @region[subagent-setup]
import operator
import uuid
from typing import Annotated, Literal, TypedDict
from langchain.agents import AgentState as BaseAgentState, create_agent
from langchain.tools import ToolRuntime, tool
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langgraph.types import Command
from copilotkit import CopilotKitMiddleware
from src.agents._header_forwarding_middleware import HeaderForwardingMiddleware
# ---------------------------------------------------------------------------
# Shared state
# ---------------------------------------------------------------------------
class Delegation(TypedDict):
id: str
sub_agent: Literal["research_agent", "writing_agent", "critique_agent"]
task: str
status: Literal["completed"]
result: str
# Cap the supervisor → critique sub-agent loop at a single iteration.
# Without this, the supervisor LLM occasionally re-calls `critique_agent`
# repeatedly on the same draft (visible as stacking 🧐 cards in the
# chat). The critic only adds value once per draft, so we hard-stop
# after `_MAX_CRITIQUE_ITERATIONS` invocations and return a no-op
# result.
_MAX_CRITIQUE_ITERATIONS = 1
class AgentState(BaseAgentState):
"""Shared state. `delegations` is rendered as a live log in the UI.
`delegations` uses an `operator.add` reducer so that concurrent
sub-agent emissions in the same supervisor step accumulate into a
single list instead of conflicting (LangGraph would otherwise raise
`INVALID_CONCURRENT_GRAPH_UPDATE` — "Can receive only one value per
step. Use an Annotated key to handle multiple values.").
"""
delegations: Annotated[list[Delegation], operator.add]
# ---------------------------------------------------------------------------
# Sub-agents (real LLM agents under the hood)
# ---------------------------------------------------------------------------
# Each sub-agent is a full-fledged `create_agent(...)` with its own
# system prompt. They don't share memory or tools with the supervisor —
# the supervisor only sees their return value.
_sub_model = ChatOpenAI(model="gpt-5.4")
# Each sub-agent gets the minimal HeaderForwardingMiddleware so the
# inbound x-aimock-context (and other x-*) headers from the supervisor's
# inbound HTTP request propagate to the sub-agent's outbound LLM call.
# We intentionally do NOT attach the full CopilotKitMiddleware here —
# the supervisor handles App-Context / frontend-tool injection for the
# whole run, and adding it on sub-agents would double-inject prompt
# state. Header forwarding alone is enough to keep aimock matching.
_research_agent = create_agent(
model=_sub_model,
tools=[],
system_prompt=(
"You are a research sub-agent. Given a topic, produce a concise "
"bulleted list of 3-5 key facts. No preamble, no closing."
),
middleware=[HeaderForwardingMiddleware()],
)
_writing_agent = create_agent(
model=_sub_model,
tools=[],
system_prompt=(
"You are a writing sub-agent. Given a brief and optional source "
"facts, produce a polished 1-paragraph draft. Be clear and "
"concrete. No preamble."
),
middleware=[HeaderForwardingMiddleware()],
)
_critique_agent = create_agent(
model=_sub_model,
tools=[],
system_prompt=(
"You are an editorial critique sub-agent. Given a draft, give "
"2-3 crisp, actionable critiques. No preamble."
),
middleware=[HeaderForwardingMiddleware()],
)
# @endregion[subagent-setup]
# Sentinel surfaced when a sub-agent run produces no usable text. Kept
# as a module-level constant so the harness probe (and any UI fallback)
# can match the exact phrase. The leading/trailing angle brackets keep
# it out of plausible LLM phrasing.
SUB_AGENT_EMPTY_SENTINEL = "<sub-agent produced no output>"
def _invoke_sub_agent(agent, task: str) -> str:
"""Run a sub-agent on `task` and return its final prose message."""
result = agent.invoke({"messages": [HumanMessage(content=task)]})
messages = result.get("messages", [])
# Walk newest -> oldest so we pick the answer for THIS task, not a stale
# intro. Skip empty AIMessages that only carry tool_calls.
for msg in reversed(messages):
if isinstance(msg, AIMessage):
content = msg.content
if isinstance(content, str) and content.strip():
return content
# Some providers stream content as a list of content blocks
# (e.g. {"type": "text", "text": "..."}); concatenate the text.
# The `isinstance(block.get("text"), str)` guard rejects
# `{"type": "text", "text": null}` payloads — a known provider
# quirk — that would otherwise crash `"".join(...)` with
# `TypeError: sequence item N: expected str instance, NoneType found`.
if isinstance(content, list):
parts = [
block["text"]
for block in content
if isinstance(block, dict)
and block.get("type") == "text"
and isinstance(block.get("text"), str)
]
joined = "".join(parts).strip()
if joined:
return joined
return SUB_AGENT_EMPTY_SENTINEL
def _delegation_update(
sub_agent: str,
task: str,
result: str,
tool_call_id: str,
) -> Command:
"""Append a completed delegation entry to shared state.
Returns just the new entry (a one-element list). The reducer on
`AgentState.delegations` is `operator.add`, which concatenates the
new list with the prior state — so we must NOT echo back the
existing delegations here, or they would be duplicated each step.
"""
entry: Delegation = {
"id": str(uuid.uuid4()),
"sub_agent": sub_agent, # type: ignore[typeddict-item]
"task": task,
"status": "completed",
"result": result,
}
return Command(
update={
"delegations": [entry],
"messages": [
ToolMessage(
content=result,
name=sub_agent,
id=str(uuid.uuid4()),
tool_call_id=tool_call_id,
)
],
}
)
# ---------------------------------------------------------------------------
# Supervisor tools (each tool delegates to one sub-agent)
# ---------------------------------------------------------------------------
# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"
# these tools to delegate work; each call synchronously runs the
# matching sub-agent, records the delegation into shared state, and
# returns the sub-agent's output as a ToolMessage the supervisor can
# read on its next step.
@tool
def research_agent(task: str, runtime: ToolRuntime) -> Command:
"""Delegate a research task to the research sub-agent.
Use for: gathering facts, background, definitions, statistics.
Returns a bulleted list of key facts.
"""
result = _invoke_sub_agent(_research_agent, task)
return _delegation_update("research_agent", task, result, runtime.tool_call_id)
@tool
def writing_agent(task: str, runtime: ToolRuntime) -> Command:
"""Delegate a drafting task to the writing sub-agent.
Use for: producing a polished paragraph, draft, or summary. Pass
relevant facts from prior research inside `task`.
"""
result = _invoke_sub_agent(_writing_agent, task)
return _delegation_update("writing_agent", task, result, runtime.tool_call_id)
@tool
def critique_agent(task: str, runtime: ToolRuntime) -> Command:
"""Delegate a critique task to the critique sub-agent.
Use for: reviewing a draft and suggesting concrete improvements.
Capped at `_MAX_CRITIQUE_ITERATIONS` invocations per supervisor run
— the supervisor LLM occasionally re-calls the critic in a loop and
each rerun produces near-identical output, so additional calls are
short-circuited with a no-op result that nudges the supervisor to
finish.
"""
state: AgentState = runtime.state # type: ignore[assignment]
delegations = state.get("delegations") or []
prior_critiques = sum(
1 for d in delegations if d.get("sub_agent") == "critique_agent"
)
if prior_critiques >= _MAX_CRITIQUE_ITERATIONS:
# Short-circuit without appending another delegation entry — the
# UI renders one card per delegation and we want exactly one
# critic card per supervisor run, even if the LLM ignores the
# system prompt and re-issues the call.
skip_message = (
"Critique already produced for this run. "
"Stop calling critique_agent and return your final answer "
"to the user now."
)
return Command(
update={
"messages": [
ToolMessage(
content=skip_message,
name="critique_agent",
id=str(uuid.uuid4()),
tool_call_id=runtime.tool_call_id,
)
],
}
)
result = _invoke_sub_agent(_critique_agent, task)
return _delegation_update("critique_agent", task, result, runtime.tool_call_id)
# @endregion[supervisor-delegation-tools]
# ---------------------------------------------------------------------------
# Supervisor (the graph we export)
# ---------------------------------------------------------------------------
graph = create_agent(
model=ChatOpenAI(model="gpt-5.4"),
tools=[research_agent, writing_agent, critique_agent],
middleware=[CopilotKitMiddleware()],
state_schema=AgentState,
system_prompt=(
"You are a supervisor agent that coordinates three specialized "
"sub-agents to produce high-quality deliverables.\n\n"
"Available sub-agents (call them as tools):\n"
" - research_agent: gathers facts on a topic.\n"
" - writing_agent: turns facts + a brief into a polished draft.\n"
" - critique_agent: reviews a draft and suggests improvements.\n\n"
"For every non-trivial user request, delegate in sequence: "
"research_agent -> writing_agent -> critique_agent. "
"IMPORTANT: call EACH sub-agent EXACTLY ONCE per user request. "
"After critique_agent returns, do NOT call any sub-agent "
"again — return a concise final answer to the user that "
"incorporates the critique. Pass the relevant facts/draft "
"through the `task` argument of each tool. Keep your own "
"messages short — explain the plan once, delegate, then return "
"a concise summary once done. The UI shows the user a live log "
"of every sub-agent delegation."
),
)