Skip to content

Commit e3b8b9e

Browse files
committed
feat: standardize handling of incomplete streams and improve finish_reason logic
- Added `handleIncompleteStream` function to synthesize appropriate SSE events when streams terminate abruptly without proper completion markers. - Introduced `messageStopSent` state for clearer tracking of stream termination. - Corrected handling of `finish_reason` for scenarios where tool calls are present, ensuring proper continuation of pending operations. - Updated tests and state interfaces to reflect new logic.
1 parent c8057c6 commit e3b8b9e

6 files changed

Lines changed: 120 additions & 31 deletions

File tree

src/lib/request-logger.ts

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -111,7 +111,7 @@ export const requestLogger: MiddlewareHandler = async (c, next) => {
111111
const prefix = ok ? `${CYAN}${R}` : `${RED}${R}`
112112
const methodStr = pad(`${DIM}${method}${R}`, 4)
113113
const pathStr = pad(`${CYAN}${path}${R}`, 27)
114-
const modelStr = pad(model ? `${YELLOW}${model}${R}` : "", 18)
114+
const modelStr = pad(model ? `${YELLOW}${model}${R}` : "", 26)
115115
const sessionStr = pad(sessionId ? `${DIM}${sessionId}${R}` : "", 8)
116116
const streamStr = pad(stream === true ? `${BLUE}stream${R}` : "", 6)
117117
const statusStr = pad(colorStatus(status), 3)

src/routes/messages/anthropic-types.ts

Lines changed: 2 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -276,6 +276,8 @@ export type AnthropicStreamEventData =
276276
// State for streaming translation
277277
export interface AnthropicStreamState {
278278
messageStartSent: boolean
279+
/** Whether the terminal message_stop event has been emitted. */
280+
messageStopSent: boolean
279281
contentBlockIndex: number
280282
contentBlockOpen: boolean
281283
/** Whether the currently open content block is a thinking block. */

src/routes/messages/handler.ts

Lines changed: 80 additions & 17 deletions
Original file line number · Diff line number · Diff line change
@@ -392,6 +392,7 @@ async function pipeStreamToClient(
392392
const { thinkingEnabled, imageTokenOverhead = 0 } = options
393393
const streamState: AnthropicStreamState = {
394394
messageStartSent: false,
395+
messageStopSent: false,
395396
contentBlockIndex: 0,
396397
contentBlockOpen: false,
397398
thinkingBlockOpen: false,
@@ -436,23 +437,7 @@ async function pipeStreamToClient(
436437
}
437438
}
438439

439-
// If the upstream stream ended without ever sending a usable chunk
440-
// (e.g. the model returned an empty response or only unsupported
441-
// event types), no message_start was emitted and the client sees an
442-
// empty SSE connection with no indication of what went wrong.
443-
// Emit a synthetic Anthropic error so the UI can surface the problem.
444-
if (!streamState.messageStartSent) {
445-
consola.warn(
446-
"Copilot stream ended without producing any content — emitting error event",
447-
)
448-
const errorEvent = translateErrorToAnthropicErrorEvent(
449-
"The model returned an empty response. This may indicate the model is unavailable or does not support this request.",
450-
)
451-
await stream.writeSSE({
452-
event: errorEvent.type,
453-
data: JSON.stringify(errorEvent),
454-
})
455-
}
440+
await handleIncompleteStream(stream, streamState)
456441
} catch (error) {
457442
consola.error("Stream error from Copilot:", error)
458443

@@ -480,6 +465,84 @@ async function pipeStreamToClient(
480465
}
481466
}
482467

468+
/**
469+
* Handles the case where the upstream stream ended without a proper Anthropic
470+
* termination sequence (message_delta + message_stop).
471+
*
472+
* Two scenarios:
473+
* 1. Stream never produced any content → emit a synthetic error event.
474+
* 2. Stream started (message_start sent) but ended without finish_reason →
475+
* synthesize the missing termination events so Claude Code can proceed.
476+
*/
477+
async function handleIncompleteStream(
478+
stream: SSEStreamingApi,
479+
state: AnthropicStreamState,
480+
): Promise<void> {
481+
if (!state.messageStartSent) {
482+
// No usable chunks arrived at all.
483+
consola.warn(
484+
"Copilot stream ended without producing any content — emitting error event",
485+
)
486+
const errorEvent = translateErrorToAnthropicErrorEvent(
487+
"The model returned an empty response. This may indicate the model is unavailable or does not support this request.",
488+
)
489+
await stream.writeSSE({
490+
event: errorEvent.type,
491+
data: JSON.stringify(errorEvent),
492+
})
493+
return
494+
}
495+
496+
if (state.messageStopSent) {
497+
return // Stream ended normally, nothing to do.
498+
}
499+
500+
// The upstream stream started but ended without a chunk containing
501+
// finish_reason — no message_delta / message_stop was ever sent.
502+
// Some models (notably Gemini) can terminate the stream abruptly after
503+
// emitting content or tool-call chunks. Without a proper termination
504+
// sequence Claude Code sees the SSE connection close with no indication
505+
// of completion and treats the turn as abandoned / silently dead.
506+
consola.warn(
507+
"Copilot stream ended without finish_reason — synthesizing message_delta/message_stop",
508+
)
509+
510+
if (state.contentBlockOpen) {
511+
await stream.writeSSE({
512+
event: "content_block_stop",
513+
data: JSON.stringify({
514+
type: "content_block_stop",
515+
index: state.contentBlockIndex,
516+
}),
517+
})
518+
}
519+
520+
// Determine the correct stop_reason: if tool calls were emitted
521+
// during the stream, the model intended "tool_use"; otherwise
522+
// default to "end_turn".
523+
const hasToolCalls = Object.keys(state.toolCalls).length > 0
524+
const stopReason = hasToolCalls ? "tool_use" : "end_turn"
525+
526+
await stream.writeSSE({
527+
event: "message_delta",
528+
data: JSON.stringify({
529+
type: "message_delta",
530+
delta: {
531+
stop_reason: stopReason,
532+
stop_sequence: null,
533+
},
534+
usage: {
535+
input_tokens: 0,
536+
output_tokens: 0,
537+
},
538+
}),
539+
})
540+
await stream.writeSSE({
541+
event: "message_stop",
542+
data: JSON.stringify({ type: "message_stop" }),
543+
})
544+
}
545+
483546
const isNonStreaming = (
484547
response: Awaited<ReturnType<typeof createChatCompletions>>,
485548
): response is ChatCompletionResponse => Object.hasOwn(response, "choices")

src/routes/messages/non-stream-translation.ts

Lines changed: 22 additions & 12 deletions
Original file line number · Diff line number · Diff line change
@@ -387,25 +387,35 @@ export function translateToAnthropic(
387387

388388
// Note: GitHub Copilot doesn't generate thinking blocks, so we don't include them in responses
389389

390+
// Some models (notably Gemini) intermittently return finish_reason "stop"
391+
// even when they emitted tool calls. Correct this to "tool_calls" so Claude
392+
// Code executes the pending tool calls instead of treating the turn as done.
393+
const correctedStopReason =
394+
allToolUseBlocks.length > 0 && stopReason === "stop" ?
395+
"tool_calls"
396+
: stopReason
397+
390398
return {
391399
id: toAnthropicMessageId(response.id),
392400
type: "message",
393401
role: "assistant",
394402
model: response.model,
395403
content: [...allTextBlocks, ...allToolUseBlocks],
396-
stop_reason: mapOpenAIStopReasonToAnthropic(stopReason),
404+
stop_reason: mapOpenAIStopReasonToAnthropic(correctedStopReason),
397405
stop_sequence: null,
398-
usage: {
399-
input_tokens:
400-
(response.usage?.prompt_tokens ?? 0)
401-
- (response.usage?.prompt_tokens_details?.cached_tokens ?? 0),
402-
output_tokens: response.usage?.completion_tokens ?? 0,
403-
...(response.usage?.prompt_tokens_details?.cached_tokens
404-
!== undefined && {
405-
cache_read_input_tokens:
406-
response.usage.prompt_tokens_details.cached_tokens,
407-
}),
408-
},
406+
usage: buildAnthropicUsage(response.usage),
407+
}
408+
}
409+
410+
function buildAnthropicUsage(usage: ChatCompletionResponse["usage"]) {
411+
return {
412+
input_tokens:
413+
(usage?.prompt_tokens ?? 0)
414+
- (usage?.prompt_tokens_details?.cached_tokens ?? 0),
415+
output_tokens: usage?.completion_tokens ?? 0,
416+
...(usage?.prompt_tokens_details?.cached_tokens !== undefined && {
417+
cache_read_input_tokens: usage.prompt_tokens_details.cached_tokens,
418+
}),
409419
}
410420
}
411421

src/routes/messages/stream-translation.ts

Lines changed: 13 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -236,11 +236,22 @@ export function translateChunkToAnthropicEvents(
236236
state.contentBlockOpen = false
237237
}
238238

239+
// Some models (notably Gemini) intermittently return finish_reason "stop"
240+
// even when they emitted tool calls in the response. Claude Code interprets
241+
// stop_reason "end_turn" as "model is done" and skips pending tool
242+
// executions, causing the session to stall after a few rounds.
243+
// Detect this mismatch and correct finish_reason to "tool_calls".
244+
const hasToolCalls = Object.keys(state.toolCalls).length > 0
245+
const correctedFinishReason =
246+
hasToolCalls && choice.finish_reason === "stop" ?
247+
"tool_calls"
248+
: choice.finish_reason
249+
239250
events.push(
240251
{
241252
type: "message_delta",
242253
delta: {
243-
stop_reason: mapOpenAIStopReasonToAnthropic(choice.finish_reason),
254+
stop_reason: mapOpenAIStopReasonToAnthropic(correctedFinishReason),
244255
stop_sequence: null,
245256
},
246257
usage: {
@@ -259,6 +270,7 @@ export function translateChunkToAnthropicEvents(
259270
type: "message_stop",
260271
},
261272
)
273+
state.messageStopSent = true
262274
}
263275

264276
return events

tests/anthropic-response.test.ts

Lines changed: 2 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -249,6 +249,7 @@ describe("OpenAI to Anthropic Streaming Response Translation", () => {
249249

250250
const streamState: AnthropicStreamState = {
251251
messageStartSent: false,
252+
messageStopSent: false,
252253
contentBlockIndex: 0,
253254
contentBlockOpen: false,
254255
thinkingBlockOpen: false,
@@ -351,6 +352,7 @@ describe("OpenAI to Anthropic Streaming Response Translation", () => {
351352
// Streaming translation requires state
352353
const streamState: AnthropicStreamState = {
353354
messageStartSent: false,
355+
messageStopSent: false,
354356
contentBlockIndex: 0,
355357
contentBlockOpen: false,
356358
thinkingBlockOpen: false,

0 commit comments

Comments (0)