forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent_executor.py
More file actions
197 lines (170 loc) · 8.07 KB
/
Copy pathagent_executor.py
File metadata and controls
197 lines (170 loc) · 8.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import (
DataPart,
Part,
Task,
TaskState,
TextPart,
UnsupportedOperationError,
)
from a2a.utils import (
new_agent_parts_message,
new_agent_text_message,
new_task,
)
from a2a.utils.errors import ServerError
from a2ui.a2ui_extension import create_a2ui_part, try_activate_a2ui_extension
from agent import RestaurantAgent
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class RestaurantAgentExecutor(AgentExecutor):
    """Restaurant AgentExecutor Example.

    Holds two ``RestaurantAgent`` instances (one created with ``use_ui=True``,
    one with ``use_ui=False``) and picks one per request depending on the
    result of ``try_activate_a2ui_extension``. The chosen agent's stream is
    relayed to the client as task status updates.
    """

    # Marker separating the plain-text portion of the agent's final response
    # from the a2ui JSON payload that follows it.
    _A2UI_DELIMITER = "---a2ui_JSON---"

    def __init__(self, base_url: str):
        """Create the UI and text-only agents.

        Args:
            base_url: Passed through unchanged to both ``RestaurantAgent``
                constructors.
        """
        # Instantiate two agents: one for UI and one for text-only.
        # The appropriate one will be chosen at execution time.
        self.ui_agent = RestaurantAgent(base_url=base_url, use_ui=True)
        self.text_agent = RestaurantAgent(base_url=base_url, use_ui=False)

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Handle one request: build a query, stream the agent, publish updates.

        The query is derived from a ``userAction`` data part when the message
        carries one, otherwise from ``context.get_user_input()``. Intermediate
        stream items are published as ``TaskState.working`` updates; the first
        item flagged ``is_task_complete`` is published as the final message
        and ends the loop.

        Args:
            context: The incoming request (message, current task, extensions).
            event_queue: Queue on which the task and its status updates are
                enqueued.
        """
        logger.info(
            f"--- Client requested extensions: {context.requested_extensions} ---"
        )
        use_ui = try_activate_a2ui_extension(context)

        # Determine which agent to use based on whether the a2ui extension is active.
        if use_ui:
            agent = self.ui_agent
            logger.info(
                "--- AGENT_EXECUTOR: A2UI extension is active. Using UI agent. ---"
            )
        else:
            agent = self.text_agent
            logger.info(
                "--- AGENT_EXECUTOR: A2UI extension is not active. Using text agent. ---"
            )

        ui_event_part = self._find_ui_event(context)

        action = None
        if ui_event_part:
            logger.info(f"Received a2ui ClientEvent: {ui_event_part}")
            action, query = self._query_from_ui_event(ui_event_part)
        else:
            logger.info("No a2ui UI event part found. Falling back to text input.")
            query = context.get_user_input()

        logger.info(f"--- AGENT_EXECUTOR: Final query for LLM: '{query}' ---")

        task = context.current_task
        if not task:
            # No task yet for this request: create one and announce it before
            # any status update refers to its id.
            task = new_task(context.message)
            await event_queue.enqueue_event(task)
        updater = TaskUpdater(event_queue, task.id, task.context_id)

        async for item in agent.stream(query, task.context_id):
            if not item["is_task_complete"]:
                await updater.update_status(
                    TaskState.working,
                    new_agent_text_message(item["updates"], task.context_id, task.id),
                )
                continue

            # Only a submitted booking finishes the task; every other final
            # response leaves the task waiting for further user input.
            final_state = (
                TaskState.completed
                if action == "submit_booking"
                else TaskState.input_required
            )

            final_parts = self._build_final_parts(item["content"])
            self._log_final_parts(final_parts)

            await updater.update_status(
                final_state,
                new_agent_parts_message(final_parts, task.context_id, task.id),
                final=(final_state == TaskState.completed),
            )
            break

    @staticmethod
    def _find_ui_event(context: RequestContext):
        """Log every message part and return the ``userAction`` payload, if any.

        When several data parts carry a ``userAction`` key, the last one wins.
        Returns ``None`` when the message has no parts or none carries one.
        """
        ui_event_part = None
        if context.message and context.message.parts:
            logger.info(
                f"--- AGENT_EXECUTOR: Processing {len(context.message.parts)} message parts ---"
            )
            for i, part in enumerate(context.message.parts):
                if isinstance(part.root, DataPart):
                    if "userAction" in part.root.data:
                        logger.info(f" Part {i}: Found a2ui UI ClientEvent payload.")
                        ui_event_part = part.root.data["userAction"]
                    else:
                        logger.info(f" Part {i}: DataPart (data: {part.root.data})")
                elif isinstance(part.root, TextPart):
                    logger.info(f" Part {i}: TextPart (text: {part.root.text})")
                else:
                    logger.info(f" Part {i}: Unknown part type ({type(part.root)})")
        return ui_event_part

    @staticmethod
    def _query_from_ui_event(ui_event_part) -> tuple:
        """Translate a ``userAction`` payload into ``(action, query)``.

        ``action`` is the payload's ``actionName`` (``None`` if absent) and
        ``query`` is the text handed to the agent for that action.
        """
        action = ui_event_part.get("actionName")
        # ``or {}`` also covers an explicit ``"context": null`` from the
        # client, which ``.get("context", {})`` would pass through as None
        # and crash on the ``ctx.get`` calls below.
        ctx = ui_event_part.get("context") or {}

        if action == "book_restaurant":
            restaurant_name = ctx.get("restaurantName", "Unknown Restaurant")
            address = ctx.get("address", "Address not provided")
            image_url = ctx.get("imageUrl", "")
            query = f"USER_WANTS_TO_BOOK: {restaurant_name}, Address: {address}, ImageURL: {image_url}"
        elif action == "submit_booking":
            restaurant_name = ctx.get("restaurantName", "Unknown Restaurant")
            party_size = ctx.get("partySize", "Unknown Size")
            reservation_time = ctx.get("reservationTime", "Unknown Time")
            dietary_reqs = ctx.get("dietary", "None")
            image_url = ctx.get("imageUrl", "")
            query = f"User submitted a booking for {restaurant_name} for {party_size} people at {reservation_time} with dietary requirements: {dietary_reqs}. The image URL is {image_url}"
        else:
            query = f"User submitted an event: {action} with data: {ctx}"
        return action, query

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Remove one surrounding Markdown code fence from *raw*, if present.

        ``str.lstrip``/``str.rstrip`` take a *set of characters*, not a
        prefix/suffix, so ``lstrip("```json")`` can also eat leading ``j``,
        ``s``, ``o`` or ``n`` characters that belong to the payload (for
        example the JSON literal ``null``). Exact prefix/suffix removal
        avoids that.
        """
        cleaned = raw.strip()
        # Try the longer opener first so "```json" is not left as "json".
        for opener in ("```json", "```"):
            if cleaned.startswith(opener):
                cleaned = cleaned[len(opener):]
                break
        return cleaned.removesuffix("```").strip()

    @classmethod
    def _build_final_parts(cls, content: str) -> list:
        """Split the agent's final response into text and a2ui parts.

        Without the delimiter the whole (stripped) content becomes a single
        text part. With it, non-empty text before the delimiter becomes a
        text part and the JSON after it becomes one a2ui part per list
        element (or a single part for a non-list value). JSON that fails to
        parse is forwarded verbatim as a text part.
        """
        final_parts = []
        if cls._A2UI_DELIMITER not in content:
            final_parts.append(Part(root=TextPart(text=content.strip())))
            return final_parts

        logger.info("Splitting final response into text and UI parts.")
        text_content, json_string = content.split(cls._A2UI_DELIMITER, 1)

        if text_content.strip():
            final_parts.append(Part(root=TextPart(text=text_content.strip())))

        if not json_string.strip():
            return final_parts

        try:
            # The new protocol sends a stream of JSON objects.
            # For this example, we'll assume they are sent as a list in the final response.
            json_data = json.loads(cls._strip_code_fence(json_string))
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse UI JSON: {e}")
            # Best effort: hand the unparsed payload to the client as text.
            final_parts.append(Part(root=TextPart(text=json_string)))
            return final_parts

        if isinstance(json_data, list):
            logger.info(
                f"Found {len(json_data)} messages. Creating individual DataParts."
            )
            for message in json_data:
                final_parts.append(create_a2ui_part(message))
        else:
            # Handle the case where a single JSON object is returned
            logger.info("Received a single JSON object. Creating a DataPart.")
            final_parts.append(create_a2ui_part(json_data))
        return final_parts

    @staticmethod
    def _log_final_parts(final_parts) -> None:
        """Log the type and a 200-character preview of each outgoing part."""
        logger.info("--- FINAL PARTS TO BE SENT ---")
        for i, part in enumerate(final_parts):
            logger.info(f" - Part {i}: Type = {type(part.root)}")
            if isinstance(part.root, TextPart):
                logger.info(f" - Text: {part.root.text[:200]}...")
            elif isinstance(part.root, DataPart):
                logger.info(f" - Data: {str(part.root.data)[:200]}...")
        logger.info("-----------------------------")

    async def cancel(
        self, request: RequestContext, event_queue: EventQueue
    ) -> Task | None:
        """Cancellation is not supported.

        Raises:
            ServerError: Always, wrapping ``UnsupportedOperationError``.
        """
        raise ServerError(error=UnsupportedOperationError())