forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobservability.ts
More file actions
165 lines (152 loc) · 4.56 KB
/
Copy pathobservability.ts
File metadata and controls
165 lines (152 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import type { RuntimeEventSource } from "../service-adapters/events";
import { RuntimeEventTypes } from "../service-adapters/events";
/**
 * Payload describing an outgoing LLM request, as passed to the
 * `handleRequest` observability hook.
 *
 * Only `messages` and `timestamp` are mandatory. The index signature allows
 * arbitrary additional fields to travel with the payload.
 */
export interface LLMRequestData {
  /** Messages that make up the request. */
  messages: Array<any>;
  /** Numeric timestamp of the request (presumably epoch milliseconds — confirm with the producer). */
  timestamp: number;
  /** Identifier of the conversation thread, when known. */
  threadId?: string;
  /** Identifier of the run, when known. */
  runId?: string;
  /** Name of the model, when known. */
  model?: string;
  /** Name of the LLM provider, when known. */
  provider?: string;
  /** Actions supplied with the request, if any. */
  actions?: Array<any>;
  /** Parameters forwarded along with the request, if any. */
  forwardedParameters?: any;
  /** Any additional, caller-defined fields. */
  [key: string]: any;
}
/**
 * Payload describing an LLM response (or a single streamed chunk of one), as
 * passed to the `handleResponse` observability hook.
 *
 * The index signature allows arbitrary additional fields to travel with the
 * payload.
 */
export interface LLMResponseData {
  /** Identifier of the conversation thread (required, unlike on the request and error payloads). */
  threadId: string;
  /** The response content; for a progressive chunk this is the chunk's content. */
  output: any;
  /** Time elapsed since the request started, in the same unit as `timestamp`. */
  latency: number;
  /** Numeric timestamp at which this payload was produced. */
  timestamp: number;
  /** Identifier of the run, when known. */
  runId?: string;
  /** Name of the model, when known. */
  model?: string;
  /** Name of the LLM provider, when known. */
  provider?: string;
  /** Set to `true` on payloads that carry one streamed chunk rather than a full response. */
  isProgressiveChunk?: boolean;
  /** Presumably marks the final, complete response — confirm with the producer. */
  isFinalResponse?: boolean;
  /** Any additional, caller-defined fields. */
  [key: string]: any;
}
/**
 * Payload describing a failure, as passed to the `handleError`
 * observability hook.
 *
 * The index signature allows arbitrary additional fields to travel with the
 * payload.
 */
export interface LLMErrorData {
  /** The failure itself, either as an `Error` object or as a plain message. */
  error: string | Error;
  /** Numeric timestamp of the failure (presumably epoch milliseconds — confirm with the producer). */
  timestamp: number;
  /** Identifier of the conversation thread, when known. */
  threadId?: string;
  /** Identifier of the run, when known. */
  runId?: string;
  /** Name of the model, when known. */
  model?: string;
  /** Name of the LLM provider, when known. */
  provider?: string;
  /** Any additional, caller-defined fields. */
  [key: string]: any;
}
/**
 * Shape shared by every observability hook: it receives one payload and may
 * complete synchronously or return a promise.
 */
type ObservabilityHook<TData> = (data: TData) => void | Promise<void>;

/**
 * Callbacks invoked for the three kinds of observability events.
 * All three are required.
 */
export interface CopilotObservabilityHooks {
  /** Receives request payloads. */
  handleRequest: ObservabilityHook<LLMRequestData>;
  /** Receives response payloads, including individual chunks in progressive mode. */
  handleResponse: ObservabilityHook<LLMResponseData>;
  /** Receives error payloads. */
  handleError: ObservabilityHook<LLMErrorData>;
}
/**
 * Settings that control CopilotKit observability (request, response, and
 * error logging).
 *
 * @remarks
 * Custom hooks only take effect when a valid CopilotKit public API key is
 * supplied. Create an account at
 * https://docs.copilotkit.ai/built-in-agent/quickstart#create-a-free-account
 * to obtain one.
 */
export interface CopilotObservabilityConfig {
  /**
   * Master switch: when `false`, no observability hook is invoked.
   *
   * @default false
   */
  enabled: boolean;
  /**
   * Chooses between streaming and buffered response logging.
   * - `true`: every token/update is reported as soon as it is produced (real-time).
   * - `false`: the complete response is reported once it has finished (batched).
   *
   * @default true
   */
  progressive: boolean;
  /**
   * Callbacks that receive the request, response, and error events.
   *
   * @remarks
   * These hooks require a valid CopilotKit public API key.
   */
  hooks: CopilotObservabilityHooks;
}
/**
 * Install progressive (per-chunk) response logging on an event source.
 *
 * When `this.observability` is enabled with `progressive` set and a
 * `publicApiKey` is supplied, `eventSource.stream` is replaced in place by a
 * wrapper. The wrapper subscribes to the event stream, and for every event of
 * type `RuntimeEventTypes.TextMessageContent` it
 *   1. appends the event's `content` to `streamedChunks`, and
 *   2. reports the chunk to the `handleResponse` hook from a promise
 *      continuation, so the hook never blocks the stream.
 * The caller's own callback is then awaited with the same stream. If any of
 * the three conditions is not met, `eventSource` is left untouched.
 *
 * Hook failures — synchronous throws and rejected promises alike — are
 * written to `console.error` and never propagate into the stream.
 *
 * NOTE(review): this function reads `this.observability`, so it must be
 * invoked with an explicit receiver (e.g. `setupProgressiveLogging.call(owner, ...)`);
 * presumably the receiver is the runtime instance — confirm at the call sites.
 * NOTE(review): the logging subscription registers only a `next` handler and
 * is never unsubscribed; confirm how the stream implementation treats errors
 * delivered to such a subscriber.
 *
 * @param eventSource - Event source whose `stream` method gets wrapped.
 * @param streamedChunks - Receives the content of every text chunk (mutated).
 * @param requestStartTime - Start of the request on the `Date.now()` clock; used to compute `latency`.
 * @param context - Identifiers copied onto every reported chunk.
 * @param publicApiKey - Logging is skipped when this is missing or empty.
 */
function setupProgressiveLogging(
  eventSource: RuntimeEventSource,
  streamedChunks: any[],
  requestStartTime: number,
  context: {
    threadId?: string;
    runId?: string;
    model?: string;
    provider?: string;
    agentName?: string;
    nodeName?: string;
  },
  publicApiKey?: string,
): void {
  if (
    !this.observability?.enabled ||
    !this.observability.progressive ||
    !publicApiKey
  ) {
    return;
  }

  // Keep a bound reference to the original implementation before replacing it.
  const originalStream = eventSource.stream.bind(eventSource);

  // Wrap the stream function so events can be observed on their way through.
  eventSource.stream = async (callback) => {
    await originalStream(async (eventStream$) => {
      // Side subscription used purely for logging.
      eventStream$.subscribe({
        next: (event) => {
          // Only content chunks are logged.
          if (event.type !== RuntimeEventTypes.TextMessageContent) {
            return;
          }

          streamedChunks.push(event.content);

          try {
            const progressiveData: LLMResponseData = {
              threadId: context.threadId || "",
              runId: context.runId,
              model: context.model,
              output: event.content,
              latency: Date.now() - requestStartTime,
              timestamp: Date.now(),
              provider: context.provider,
              isProgressiveChunk: true,
              agentName: context.agentName,
              nodeName: context.nodeName,
            };

            // Fire-and-forget. The hook's result MUST be returned from the
            // continuation: otherwise a rejected promise from an async hook
            // would skip the `.catch` and become an unhandled rejection.
            void Promise.resolve()
              .then(() =>
                this.observability.hooks.handleResponse(progressiveData),
              )
              .catch((error) => {
                console.error("Error in progressive logging:", error);
              });
          } catch (error) {
            console.error("Error preparing progressive log data:", error);
          }
        },
      });

      // Hand the very same stream to the real consumer.
      await callback(eventStream$);
    });
  };
}
/**
 * Forward an error payload to the observability `handleError` hook.
 *
 * Nothing happens unless `this.observability` is enabled and a
 * `publicApiKey` is supplied. Anything the hook throws or rejects with is
 * caught and written to `console.error`.
 *
 * NOTE(review): reads `this.observability`, so it must be invoked with an
 * explicit receiver (e.g. `logObservabilityError.call(owner, ...)`) —
 * confirm at the call sites.
 *
 * @param errorData - Payload handed to the hook unchanged.
 * @param publicApiKey - Reporting is skipped when this is missing or empty.
 */
async function logObservabilityError(
  errorData: LLMErrorData,
  publicApiKey?: string,
): Promise<void> {
  // Guard clause: observability switched off, or no key to authorise it.
  if (!this.observability?.enabled || !publicApiKey) {
    return;
  }

  try {
    await this.observability.hooks.handleError(errorData);
  } catch (hookFailure) {
    // A broken hook must not mask the error that is being reported.
    console.error("Error logging LLM error:", hookFailure);
  }
}