forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagentcore-runner.ts
More file actions
67 lines (63 loc) · 2.65 KB
/
Copy pathagentcore-runner.ts
File metadata and controls
67 lines (63 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import { randomUUID } from "node:crypto";
import {
EventType,
type BaseEvent,
type Message,
type MessagesSnapshotEvent,
type ToolCall,
type ToolCallResultEvent,
} from "@ag-ui/client";
import { InMemoryAgentRunner } from "@copilotkit/runtime/v2";
import { concatMap, Observable, of } from "rxjs";
/**
* AgentCore stores conversation history server-side via AgentCoreMemorySaver /
* AgentCoreMemorySessionManager. When CopilotKit reconnects to an existing
* thread (e.g. page refresh), two issues arise that this runner fixes:
*
* 1. Unknown threads — CopilotKit may call `connect()` for a thread that has
* never had a `run()` (first load). The base runner would error; we emit an
* empty snapshot instead so the UI initialises cleanly.
* 2. Missing tool-call results — AgentCore's replayed history contains assistant
* messages with tool calls but no corresponding TOOL_CALL_RESULT events.
* CopilotKit needs those to reconcile its message state, so we synthesise
* empty results for each past tool call before the snapshot.
*/
export class AgentCoreRunner extends InMemoryAgentRunner {
private readonly knownThreadIds = new Set<string>();
override run(request: Parameters<InMemoryAgentRunner["run"]>[0]) {
if (request.threadId) this.knownThreadIds.add(request.threadId);
return super.run(request);
}
override connect(request: Parameters<InMemoryAgentRunner["connect"]>[0]) {
if (!request.threadId || !this.knownThreadIds.has(request.threadId)) {
const threadId = request.threadId ?? randomUUID();
const runId = randomUUID();
return of(
{ type: EventType.RUN_STARTED, threadId, runId },
{ type: EventType.MESSAGES_SNAPSHOT, messages: [] },
{ type: EventType.RUN_FINISHED, threadId, runId },
) as Observable<BaseEvent>;
}
return super.connect(request).pipe(
concatMap((event: BaseEvent) => {
if (event.type !== EventType.MESSAGES_SNAPSHOT) return of(event);
const snapshot = event as MessagesSnapshotEvent;
const replayedResults: ToolCallResultEvent[] =
snapshot.messages.flatMap((message: Message) => {
if (message.role !== "assistant" || !message.toolCalls?.length)
return [];
return message.toolCalls.map<ToolCallResultEvent>(
(toolCall: ToolCall) => ({
type: EventType.TOOL_CALL_RESULT,
toolCallId: toolCall.id,
messageId: `${toolCall.id}-result`,
content: "",
role: "tool",
}),
);
});
return of(...replayedResults, snapshot) as Observable<BaseEvent>;
}),
);
}
}