Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 197 additions & 34 deletions src/routes/messages/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@ import type { Context } from "hono"
import consola from "consola"
import { streamSSE } from "hono/streaming"

import type { Account } from "~/lib/account-pool"

import { awaitApproval } from "~/lib/approval"
import { checkRateLimit } from "~/lib/rate-limit"
import { state } from "~/lib/state"
import {
createOpenAIAccumulator,
normalizeOpenAIFinal,
UsageMissingError,
type NormalizedUsage,
} from "~/lib/usage-normalizer"
import { recordUsage } from "~/lib/usage-recorder"
import { makeApiContext, resolveAndMapModelId } from "~/lib/utils"
import { withAccount } from "~/lib/with-account"
import {
Expand All @@ -24,6 +33,129 @@ import {
} from "./non-stream-translation"
import { translateChunkToAnthropicEvents } from "./stream-translation"

/**
 * All-zero usage record. Substituted whenever the upstream response or
 * stream yields no usable usage data, so failures/aborts are still recorded.
 */
const ZERO_USAGE: NormalizedUsage = {
  inputTokens: 0,
  cachedInputTokens: 0,
  outputTokens: 0,
  reasoningTokens: 0,
  totalTokens: 0,
}

/** Per-request context threaded through the usage-recording helpers. */
interface RecordCtx {
  account: Account
  modelId: string
  // Set from the "x-internal-pricing-sync" request header; marks internal traffic.
  isInternal: boolean
  // Date.now() captured before the upstream call; used to compute durationMs.
  tStart: number
}

/** Arguments for recording a successful completion. */
interface RecordOkArgs {
  ctx: RecordCtx
  usage: NormalizedUsage
  isStreaming: boolean
  // Upstream request/completion id, when one was observed.
  requestId?: string
}

/** Arguments for recording a failed or client-aborted completion. */
interface RecordFailureArgs {
  ctx: RecordCtx
  status: "error" | "aborted"
  isStreaming: boolean
  // Partial usage gathered before the failure; ZERO_USAGE is used when absent.
  usage?: NormalizedUsage
}

/**
 * Record a successful /v1/messages completion against the usage tracker.
 * Thin adapter: flattens the shared RecordCtx into a recordUsage call with
 * status "ok" and the elapsed duration since the upstream request started.
 */
function recordOk(args: RecordOkArgs) {
  const { ctx, usage, isStreaming, requestId } = args
  recordUsage({
    account: ctx.account,
    modelId: ctx.modelId,
    endpoint: "messages",
    upstreamFormat: "anthropic",
    isStreaming,
    usage,
    durationMs: Date.now() - ctx.tStart,
    status: "ok",
    requestId,
    isInternal: ctx.isInternal,
  })
}

/**
 * Record a failed or aborted /v1/messages completion.
 * Mirrors recordOk but carries the caller-supplied failure status and falls
 * back to ZERO_USAGE when no usage information was gathered before failing.
 */
function recordFailure(args: RecordFailureArgs) {
  const { ctx, status, isStreaming } = args
  recordUsage({
    account: ctx.account,
    modelId: ctx.modelId,
    endpoint: "messages",
    upstreamFormat: "anthropic",
    isStreaming,
    usage: args.usage ?? ZERO_USAGE,
    durationMs: Date.now() - ctx.tStart,
    status,
    isInternal: ctx.isInternal,
  })
}

/**
 * Stream an OpenAI-format upstream response to the client as Anthropic SSE
 * events while accumulating token usage, then record the final outcome.
 *
 * Outcome precedence: "aborted" (client disconnected) and "error" (stream or
 * finalize failure) are never downgraded back to "ok".
 */
function streamAndRecord(
  c: Context,
  response: AsyncIterable<{ data?: string }>,
  ctx: RecordCtx,
) {
  return streamSSE(c, async (stream) => {
    // Feeds every parsed chunk so usage can be finalized after the stream ends.
    const accumulator = createOpenAIAccumulator()
    const streamState: AnthropicStreamState = {
      messageStartSent: false,
      contentBlockIndex: 0,
      contentBlockOpen: false,
      toolCalls: {},
    }
    let status: "ok" | "error" | "aborted" = "ok"
    // Last chunk id seen; reported as requestId on success.
    let lastRequestId: string | undefined

    try {
      for await (const rawEvent of response) {
        // Client went away: stop consuming and mark the request aborted.
        if (c.req.raw.signal.aborted) {
          status = "aborted"
          break
        }
        if (rawEvent.data === "[DONE]") break
        if (!rawEvent.data) continue

        const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
        if (chunk.id) lastRequestId = chunk.id
        accumulator.feed(chunk)

        // Translate each OpenAI chunk into zero or more Anthropic SSE events.
        const events = translateChunkToAnthropicEvents(chunk, streamState)
        for (const event of events) {
          await stream.writeSSE({
            event: event.type,
            data: JSON.stringify(event),
          })
        }
      }
    } catch (err) {
      status = "error"
      consola.error("Streaming /v1/messages error:", err)
    }

    let usage: NormalizedUsage
    try {
      usage = accumulator.finalize()
    } catch (err) {
      if (err instanceof UsageMissingError) {
        // Expected when the upstream never sent a usage frame; not fatal per se,
        // but still recorded as a failure below so zero usage isn't logged as "ok".
        consola.warn(
          "Anthropic stream completed without an include_usage frame; recording zero usage",
        )
      } else {
        consola.error("Failed to finalize Anthropic stream usage:", err)
      }
      usage = ZERO_USAGE
      // Preserve "aborted"; only an otherwise-clean stream becomes "error".
      if (status === "ok") status = "error"
    }

    if (status === "ok") {
      recordOk({ ctx, usage, isStreaming: true, requestId: lastRequestId })
    } else {
      recordFailure({ ctx, status, isStreaming: true, usage })
    }
  })
}

export async function handleCompletion(c: Context) {
await checkRateLimit(state)

Expand All @@ -48,53 +180,84 @@ export async function handleCompletion(c: Context) {
await awaitApproval()
}

const response = await withAccount(c, (account) =>
createChatCompletions(makeApiContext(account), openAIPayload),
)
const isInternal = c.req.header("x-internal-pricing-sync") === "1"
const tStart = Date.now()
let usedAccount: Account | undefined

let response: Awaited<ReturnType<typeof createChatCompletions>>
try {
response = await withAccount(c, (account) => {
usedAccount = account
return createChatCompletions(makeApiContext(account), openAIPayload)
})
} catch (err) {
if (usedAccount) {
recordFailure({
ctx: {
account: usedAccount,
modelId: openAIPayload.model,
isInternal,
tStart,
},
status: "error",
isStreaming: Boolean(openAIPayload.stream),
})
}
throw err
}

if (isNonStreaming(response)) {
consola.debug(
"Non-streaming response from Copilot:",
JSON.stringify(response).slice(-400),
)
const anthropicResponse = translateToAnthropic(response)
consola.debug(
"Translated Anthropic response:",
JSON.stringify(anthropicResponse),
)
if (usedAccount) {
recordOk({
ctx: {
account: usedAccount,
modelId: openAIPayload.model,
isInternal,
tStart,
},
usage: normalizeOpenAIFinal(response.usage),
isStreaming: false,
requestId: response.id,
})
}
return c.json(anthropicResponse)
}

consola.debug("Streaming response from Copilot")
return streamSSE(c, async (stream) => {
const streamState: AnthropicStreamState = {
messageStartSent: false,
contentBlockIndex: 0,
contentBlockOpen: false,
toolCalls: {},
}

for await (const rawEvent of response) {
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
if (rawEvent.data === "[DONE]") {
break
if (!usedAccount) {
return streamSSE(c, async (stream) => {
const streamState: AnthropicStreamState = {
messageStartSent: false,
contentBlockIndex: 0,
contentBlockOpen: false,
toolCalls: {},
}

if (!rawEvent.data) {
continue
for await (const rawEvent of response) {
if (rawEvent.data === "[DONE]") break
if (!rawEvent.data) continue
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
for (const event of translateChunkToAnthropicEvents(
chunk,
streamState,
)) {
await stream.writeSSE({
event: event.type,
data: JSON.stringify(event),
})
}
}

const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
const events = translateChunkToAnthropicEvents(chunk, streamState)

for (const event of events) {
consola.debug("Translated Anthropic event:", JSON.stringify(event))
await stream.writeSSE({
event: event.type,
data: JSON.stringify(event),
})
}
}
})
}
return streamAndRecord(c, response, {
account: usedAccount,
modelId: openAIPayload.model,
isInternal,
tStart,
})
}

Expand Down