Skip to content

Commit 3a23ff5

Browse files
committed
feat: classify late stream disconnects
1 parent 0a05b26 commit 3a23ff5

File tree

7 files changed

+331
-19
lines changed

7 files changed

+331
-19
lines changed

src/lib/exchange-capture.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ export function createResponsesStreamAccumulator(): ResponsesStreamAccumulator {
365365
export function collectResponsesStreamEvent(
366366
accumulator: ResponsesStreamAccumulator,
367367
data: string,
368-
): void {
368+
): boolean {
369369
try {
370370
const parsed = JSON.parse(data) as {
371371
type?: string
@@ -376,10 +376,13 @@ export function collectResponsesStreamEvent(
376376
|| parsed.type === "response.done"
377377
) {
378378
accumulator.response = parsed.response
379+
return true
379380
}
380381
} catch {
381382
// ignore parse errors
382383
}
384+
385+
return false
383386
}
384387

385388
interface AnthropicToolInputBuffer {

src/lib/sse.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ export function normalizeUpstreamSSEMessage(
3030
export async function writeUpstreamSSE(
3131
stream: SSEStreamingApi,
3232
chunk: UpstreamSSEMessage,
33-
): Promise<void> {
33+
): Promise<boolean> {
3434
const message = normalizeUpstreamSSEMessage(chunk)
3535
if (!message) {
36-
return
36+
return false
3737
}
3838

3939
await stream.writeSSE(message)
40+
return true
4041
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import consola from "consola"
2+
3+
export type StreamClientDisconnectPhase =
4+
| "before_terminal_signal"
5+
| "after_terminal_event_seen"
6+
| "after_terminal_event_written"
7+
| "after_done_seen"
8+
| "after_done_written"
9+
| "after_upstream_end"
10+
11+
export type StreamClientDisconnectCompletionEstimate =
12+
| "incomplete"
13+
| "likely_complete"
14+
| "very_likely_complete"
15+
16+
export interface StreamClientDisconnectProgress {
17+
forwardedChunkCount: number
18+
sawDoneSentinel: boolean
19+
sawTerminalEvent: boolean
20+
upstreamEnded: boolean
21+
wroteDoneSentinel: boolean
22+
wroteTerminalEvent: boolean
23+
}
24+
25+
export function createStreamClientDisconnectProgress(): StreamClientDisconnectProgress {
26+
return {
27+
forwardedChunkCount: 0,
28+
sawDoneSentinel: false,
29+
sawTerminalEvent: false,
30+
upstreamEnded: false,
31+
wroteDoneSentinel: false,
32+
wroteTerminalEvent: false,
33+
}
34+
}
35+
36+
export function classifyStreamClientDisconnect(
37+
progress: StreamClientDisconnectProgress,
38+
): {
39+
completionEstimate: StreamClientDisconnectCompletionEstimate
40+
late: boolean
41+
phase: StreamClientDisconnectPhase
42+
} {
43+
if (progress.upstreamEnded) {
44+
return {
45+
completionEstimate: "very_likely_complete",
46+
late: true,
47+
phase: "after_upstream_end",
48+
}
49+
}
50+
51+
if (progress.wroteDoneSentinel) {
52+
return {
53+
completionEstimate: "very_likely_complete",
54+
late: true,
55+
phase: "after_done_written",
56+
}
57+
}
58+
59+
if (progress.sawDoneSentinel) {
60+
return {
61+
completionEstimate: "likely_complete",
62+
late: true,
63+
phase: "after_done_seen",
64+
}
65+
}
66+
67+
if (progress.wroteTerminalEvent) {
68+
return {
69+
completionEstimate: "likely_complete",
70+
late: true,
71+
phase: "after_terminal_event_written",
72+
}
73+
}
74+
75+
if (progress.sawTerminalEvent) {
76+
return {
77+
completionEstimate: "likely_complete",
78+
late: true,
79+
phase: "after_terminal_event_seen",
80+
}
81+
}
82+
83+
return {
84+
completionEstimate: "incomplete",
85+
late: false,
86+
phase: "before_terminal_signal",
87+
}
88+
}
89+
90+
export function formatStreamClientDisconnectLog(options: {
91+
completionEstimate: StreamClientDisconnectCompletionEstimate
92+
late: boolean
93+
phase: StreamClientDisconnectPhase
94+
requestId: string
95+
route: string
96+
upstreamPath: string
97+
forwardedChunks: number
98+
}): string {
99+
return [
100+
"stream_client_disconnect",
101+
`requestId=${options.requestId}`,
102+
`route=${options.route}`,
103+
`upstreamPath=${options.upstreamPath}`,
104+
`disconnectPhase=${options.phase}`,
105+
`completionEstimate=${options.completionEstimate}`,
106+
`forwardedChunks=${options.forwardedChunks}`,
107+
`late=${String(options.late)}`,
108+
].join(" ")
109+
}
110+
111+
export function logStreamClientDisconnect(options: {
112+
progress: StreamClientDisconnectProgress
113+
requestId: string
114+
route: string
115+
upstreamPath: string
116+
}): void {
117+
const classification = classifyStreamClientDisconnect(options.progress)
118+
const line = formatStreamClientDisconnectLog({
119+
completionEstimate: classification.completionEstimate,
120+
late: classification.late,
121+
phase: classification.phase,
122+
requestId: options.requestId,
123+
route: options.route,
124+
upstreamPath: options.upstreamPath,
125+
forwardedChunks: options.progress.forwardedChunkCount,
126+
})
127+
128+
if (classification.late) {
129+
consola.info(line)
130+
return
131+
}
132+
133+
consola.warn(line)
134+
}

src/routes/chat-completions/handler.ts

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ import { logRequest } from "~/lib/request-log"
1919
import { disableBunRequestTimeout } from "~/lib/request-timeout"
2020
import { writeUpstreamSSE } from "~/lib/sse"
2121
import { state } from "~/lib/state"
22+
import {
23+
createStreamClientDisconnectProgress,
24+
logStreamClientDisconnect,
25+
} from "~/lib/stream-client-disconnect"
2226
import {
2327
recordClientAbortedExchange,
2428
recordCompletedExchange,
@@ -247,6 +251,7 @@ function streamChatCompletion({
247251
let streamCompleted = false
248252
let streamFailed = false
249253
let exchangeCaptured = false
254+
const disconnectProgress = createStreamClientDisconnectProgress()
250255

251256
const captureOnce = async (options: {
252257
outcome: "completed" | "client_aborted" | "failed"
@@ -320,7 +325,8 @@ function streamChatCompletion({
320325
"abort",
321326
() => {
322327
if (!streamCompleted && !streamFailed) {
323-
consola.warn("Chat completions stream client disconnected", {
328+
logStreamClientDisconnect({
329+
progress: disconnectProgress,
324330
requestId: trace.requestId,
325331
route: c.req.path,
326332
upstreamPath: "/chat/completions",
@@ -336,12 +342,23 @@ function streamChatCompletion({
336342
async (stream) => {
337343
for await (const chunk of response) {
338344
consola.debug("Streaming chunk:", JSON.stringify(chunk))
339-
if (chunk.data && chunk.data !== "[DONE]") {
345+
const isDoneSentinel = chunk.data === "[DONE]"
346+
let sawTerminalEvent = false
347+
348+
if (isDoneSentinel) {
349+
disconnectProgress.sawDoneSentinel = true
350+
}
351+
352+
if (chunk.data && !isDoneSentinel) {
340353
try {
341354
const parsed = JSON.parse(chunk.data) as {
342355
usage?: { prompt_tokens?: number; completion_tokens?: number }
343356
} & Parameters<typeof collectChatCompletionChunk>[1]
344357
collectChatCompletionChunk(accumulator, parsed)
358+
sawTerminalEvent = isTerminalChatCompletionChunk(parsed)
359+
if (sawTerminalEvent) {
360+
disconnectProgress.sawTerminalEvent = true
361+
}
345362
if (parsed.usage) {
346363
usageIn = parsed.usage.prompt_tokens
347364
usageOut = parsed.usage.completion_tokens
@@ -350,9 +367,21 @@ function streamChatCompletion({
350367
// ignore parse errors for usage extraction
351368
}
352369
}
353-
await writeUpstreamSSE(stream, chunk)
370+
371+
if (await writeUpstreamSSE(stream, chunk)) {
372+
disconnectProgress.forwardedChunkCount += 1
373+
374+
if (sawTerminalEvent) {
375+
disconnectProgress.wroteTerminalEvent = true
376+
}
377+
378+
if (isDoneSentinel) {
379+
disconnectProgress.wroteDoneSentinel = true
380+
}
381+
}
354382
}
355383

384+
disconnectProgress.upstreamEnded = true
356385
streamCompleted = true
357386
await captureOnce({ outcome: "completed" })
358387
},
@@ -392,6 +421,7 @@ function streamCodexCompletion({
392421
let streamCompleted = false
393422
let streamFailed = false
394423
let exchangeCaptured = false
424+
const disconnectProgress = createStreamClientDisconnectProgress()
395425

396426
const captureOnce = async (options: {
397427
outcome: "completed" | "client_aborted" | "failed"
@@ -472,7 +502,8 @@ function streamCodexCompletion({
472502
"abort",
473503
() => {
474504
if (!streamCompleted && !streamFailed) {
475-
consola.warn("Codex responses stream client disconnected", {
505+
logStreamClientDisconnect({
506+
progress: disconnectProgress,
476507
requestId: trace.requestId,
477508
route: c.req.path,
478509
upstreamPath: "/v1/responses",
@@ -501,21 +532,41 @@ function streamCodexCompletion({
501532
)) {
502533
const chunkData =
503534
typeof chunk.data === "string" ? chunk.data : await chunk.data
504-
if (chunkData && chunkData !== "[DONE]") {
535+
const isDoneSentinel = chunkData === "[DONE]"
536+
let sawTerminalEvent = false
537+
538+
if (isDoneSentinel) {
539+
disconnectProgress.sawDoneSentinel = true
540+
}
541+
542+
if (chunkData && !isDoneSentinel) {
505543
try {
506-
collectChatCompletionChunk(
507-
accumulator,
508-
JSON.parse(chunkData) as Parameters<
509-
typeof collectChatCompletionChunk
510-
>[1],
511-
)
544+
const parsed = JSON.parse(chunkData) as Parameters<
545+
typeof collectChatCompletionChunk
546+
>[1]
547+
collectChatCompletionChunk(accumulator, parsed)
548+
sawTerminalEvent = isTerminalChatCompletionChunk(parsed)
549+
if (sawTerminalEvent) {
550+
disconnectProgress.sawTerminalEvent = true
551+
}
512552
} catch {
513553
// ignore parse errors
514554
}
515555
}
556+
516557
await stream.writeSSE(chunk)
558+
disconnectProgress.forwardedChunkCount += 1
559+
560+
if (sawTerminalEvent) {
561+
disconnectProgress.wroteTerminalEvent = true
562+
}
563+
564+
if (isDoneSentinel) {
565+
disconnectProgress.wroteDoneSentinel = true
566+
}
517567
}
518568

569+
disconnectProgress.upstreamEnded = true
519570
streamCompleted = true
520571
await captureOnce({ outcome: "completed" })
521572
},
@@ -532,6 +583,12 @@ function streamCodexCompletion({
532583
)
533584
}
534585

586+
function isTerminalChatCompletionChunk(
587+
chunk: Parameters<typeof collectChatCompletionChunk>[1],
588+
): boolean {
589+
return Boolean(chunk.choices[0]?.finish_reason)
590+
}
591+
535592
async function captureChatExchange({
536593
c,
537594
payload,

src/routes/messages/handler.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import { checkRateLimit } from "~/lib/rate-limit"
1515
import { logRequest } from "~/lib/request-log"
1616
import { disableBunRequestTimeout } from "~/lib/request-timeout"
1717
import { state } from "~/lib/state"
18+
import {
19+
createStreamClientDisconnectProgress,
20+
logStreamClientDisconnect,
21+
} from "~/lib/stream-client-disconnect"
1822
import {
1923
recordClientAbortedExchange,
2024
recordCompletedExchange,
@@ -166,6 +170,7 @@ function streamAnthropicCompletion({
166170
let streamFailed = false
167171
let exchangeCaptured = false
168172
const accumulator = createAnthropicResponseAccumulator()
173+
const disconnectProgress = createStreamClientDisconnectProgress()
169174

170175
const captureOnce = async (options: {
171176
outcome: "completed" | "client_aborted" | "failed"
@@ -234,7 +239,8 @@ function streamAnthropicCompletion({
234239
"abort",
235240
() => {
236241
if (!streamCompleted && !streamFailed) {
237-
consola.warn("Anthropic stream client disconnected", {
242+
logStreamClientDisconnect({
243+
progress: disconnectProgress,
238244
requestId: trace.requestId,
239245
route: c.req.path,
240246
upstreamPath: "/chat/completions",
@@ -251,6 +257,7 @@ function streamAnthropicCompletion({
251257
for await (const rawEvent of response) {
252258
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
253259
if (rawEvent.data === "[DONE]") {
260+
disconnectProgress.sawDoneSentinel = true
254261
break
255262
}
256263

@@ -268,13 +275,21 @@ function streamAnthropicCompletion({
268275
for (const event of events) {
269276
collectAnthropicStreamEvent(accumulator, event)
270277
consola.debug("Translated Anthropic event:", JSON.stringify(event))
278+
if (event.type === "message_stop") {
279+
disconnectProgress.sawTerminalEvent = true
280+
}
271281
await stream.writeSSE({
272282
event: event.type,
273283
data: JSON.stringify(event),
274284
})
285+
disconnectProgress.forwardedChunkCount += 1
286+
if (event.type === "message_stop") {
287+
disconnectProgress.wroteTerminalEvent = true
288+
}
275289
}
276290
}
277291

292+
disconnectProgress.upstreamEnded = true
278293
streamCompleted = true
279294
await captureOnce({ outcome: "completed" })
280295
},

0 commit comments

Comments
 (0)