Skip to content

Commit 3b5d4bd

Browse files
lubobill1990claude
andcommitted
feat(usage): record /v1/messages usage (Task #10)
The /v1/messages route translates Anthropic→OpenAI before sending, so we use the OpenAI accumulator to capture usage. endpoint='messages', upstreamFormat='anthropic' tags the API surface. Stream branch: feed every chunk into accumulator, finalize on close (ok/aborted/error). Non-stream: normalize response.usage. Errors before the upstream call write a status='error' row. Refs #10 Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
1 parent 93bd534 commit 3b5d4bd

1 file changed

Lines changed: 197 additions & 34 deletions

File tree

src/routes/messages/handler.ts

Lines changed: 197 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,18 @@ import type { Context } from "hono"
33
import consola from "consola"
44
import { streamSSE } from "hono/streaming"
55

6+
import type { Account } from "~/lib/account-pool"
7+
68
import { awaitApproval } from "~/lib/approval"
79
import { checkRateLimit } from "~/lib/rate-limit"
810
import { state } from "~/lib/state"
11+
import {
12+
createOpenAIAccumulator,
13+
normalizeOpenAIFinal,
14+
UsageMissingError,
15+
type NormalizedUsage,
16+
} from "~/lib/usage-normalizer"
17+
import { recordUsage } from "~/lib/usage-recorder"
918
import { makeApiContext, resolveAndMapModelId } from "~/lib/utils"
1019
import { withAccount } from "~/lib/with-account"
1120
import {
@@ -24,6 +33,129 @@ import {
2433
} from "./non-stream-translation"
2534
import { translateChunkToAnthropicEvents } from "./stream-translation"
2635

36+
const ZERO_USAGE: NormalizedUsage = {
37+
inputTokens: 0,
38+
cachedInputTokens: 0,
39+
outputTokens: 0,
40+
reasoningTokens: 0,
41+
totalTokens: 0,
42+
}
43+
44+
interface RecordCtx {
45+
account: Account
46+
modelId: string
47+
isInternal: boolean
48+
tStart: number
49+
}
50+
51+
interface RecordOkArgs {
52+
ctx: RecordCtx
53+
usage: NormalizedUsage
54+
isStreaming: boolean
55+
requestId?: string
56+
}
57+
58+
interface RecordFailureArgs {
59+
ctx: RecordCtx
60+
status: "error" | "aborted"
61+
isStreaming: boolean
62+
usage?: NormalizedUsage
63+
}
64+
65+
function recordOk(args: RecordOkArgs) {
66+
recordUsage({
67+
account: args.ctx.account,
68+
modelId: args.ctx.modelId,
69+
endpoint: "messages",
70+
upstreamFormat: "anthropic",
71+
isStreaming: args.isStreaming,
72+
usage: args.usage,
73+
durationMs: Date.now() - args.ctx.tStart,
74+
status: "ok",
75+
requestId: args.requestId,
76+
isInternal: args.ctx.isInternal,
77+
})
78+
}
79+
80+
function recordFailure(args: RecordFailureArgs) {
81+
recordUsage({
82+
account: args.ctx.account,
83+
modelId: args.ctx.modelId,
84+
endpoint: "messages",
85+
upstreamFormat: "anthropic",
86+
isStreaming: args.isStreaming,
87+
usage: args.usage ?? ZERO_USAGE,
88+
durationMs: Date.now() - args.ctx.tStart,
89+
status: args.status,
90+
isInternal: args.ctx.isInternal,
91+
})
92+
}
93+
94+
function streamAndRecord(
95+
c: Context,
96+
response: AsyncIterable<{ data?: string }>,
97+
ctx: RecordCtx,
98+
) {
99+
return streamSSE(c, async (stream) => {
100+
const accumulator = createOpenAIAccumulator()
101+
const streamState: AnthropicStreamState = {
102+
messageStartSent: false,
103+
contentBlockIndex: 0,
104+
contentBlockOpen: false,
105+
toolCalls: {},
106+
}
107+
let status: "ok" | "error" | "aborted" = "ok"
108+
let lastRequestId: string | undefined
109+
110+
try {
111+
for await (const rawEvent of response) {
112+
if (c.req.raw.signal.aborted) {
113+
status = "aborted"
114+
break
115+
}
116+
if (rawEvent.data === "[DONE]") break
117+
if (!rawEvent.data) continue
118+
119+
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
120+
if (chunk.id) lastRequestId = chunk.id
121+
accumulator.feed(chunk)
122+
123+
const events = translateChunkToAnthropicEvents(chunk, streamState)
124+
for (const event of events) {
125+
await stream.writeSSE({
126+
event: event.type,
127+
data: JSON.stringify(event),
128+
})
129+
}
130+
}
131+
} catch (err) {
132+
status = "error"
133+
consola.error("Streaming /v1/messages error:", err)
134+
}
135+
136+
let usage: NormalizedUsage
137+
try {
138+
usage = accumulator.finalize()
139+
} catch (err) {
140+
if (err instanceof UsageMissingError) {
141+
consola.warn(
142+
"Anthropic stream completed without an include_usage frame; recording zero usage",
143+
)
144+
} else {
145+
consola.error("Failed to finalize Anthropic stream usage:", err)
146+
}
147+
usage = ZERO_USAGE
148+
if (status === "ok") status = "error"
149+
}
150+
151+
if (status === "ok") {
152+
recordOk({ ctx, usage, isStreaming: true, requestId: lastRequestId })
153+
} else {
154+
recordFailure({ ctx, status, isStreaming: true, usage })
155+
}
156+
})
157+
}
158+
27159
export async function handleCompletion(c: Context) {
28160
await checkRateLimit(state)
29161

@@ -48,53 +180,84 @@ export async function handleCompletion(c: Context) {
48180
await awaitApproval()
49181
}
50182

51-
const response = await withAccount(c, (account) =>
52-
createChatCompletions(makeApiContext(account), openAIPayload),
53-
)
183+
const isInternal = c.req.header("x-internal-pricing-sync") === "1"
184+
const tStart = Date.now()
185+
let usedAccount: Account | undefined
186+
187+
let response: Awaited<ReturnType<typeof createChatCompletions>>
188+
try {
189+
response = await withAccount(c, (account) => {
190+
usedAccount = account
191+
return createChatCompletions(makeApiContext(account), openAIPayload)
192+
})
193+
} catch (err) {
194+
if (usedAccount) {
195+
recordFailure({
196+
ctx: {
197+
account: usedAccount,
198+
modelId: openAIPayload.model,
199+
isInternal,
200+
tStart,
201+
},
202+
status: "error",
203+
isStreaming: Boolean(openAIPayload.stream),
204+
})
205+
}
206+
throw err
207+
}
54208

55209
if (isNonStreaming(response)) {
56210
consola.debug(
57211
"Non-streaming response from Copilot:",
58212
JSON.stringify(response).slice(-400),
59213
)
60214
const anthropicResponse = translateToAnthropic(response)
61-
consola.debug(
62-
"Translated Anthropic response:",
63-
JSON.stringify(anthropicResponse),
64-
)
215+
if (usedAccount) {
216+
recordOk({
217+
ctx: {
218+
account: usedAccount,
219+
modelId: openAIPayload.model,
220+
isInternal,
221+
tStart,
222+
},
223+
usage: normalizeOpenAIFinal(response.usage),
224+
isStreaming: false,
225+
requestId: response.id,
226+
})
227+
}
65228
return c.json(anthropicResponse)
66229
}
67230

68231
consola.debug("Streaming response from Copilot")
69-
return streamSSE(c, async (stream) => {
70-
const streamState: AnthropicStreamState = {
71-
messageStartSent: false,
72-
contentBlockIndex: 0,
73-
contentBlockOpen: false,
74-
toolCalls: {},
75-
}
76-
77-
for await (const rawEvent of response) {
78-
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
79-
if (rawEvent.data === "[DONE]") {
80-
break
232+
if (!usedAccount) {
233+
return streamSSE(c, async (stream) => {
234+
const streamState: AnthropicStreamState = {
235+
messageStartSent: false,
236+
contentBlockIndex: 0,
237+
contentBlockOpen: false,
238+
toolCalls: {},
81239
}
82-
83-
if (!rawEvent.data) {
84-
continue
240+
for await (const rawEvent of response) {
241+
if (rawEvent.data === "[DONE]") break
242+
if (!rawEvent.data) continue
243+
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
244+
for (const event of translateChunkToAnthropicEvents(
245+
chunk,
246+
streamState,
247+
)) {
248+
await stream.writeSSE({
249+
event: event.type,
250+
data: JSON.stringify(event),
251+
})
252+
}
85253
}
86-
87-
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
88-
const events = translateChunkToAnthropicEvents(chunk, streamState)
89-
90-
for (const event of events) {
91-
consola.debug("Translated Anthropic event:", JSON.stringify(event))
92-
await stream.writeSSE({
93-
event: event.type,
94-
data: JSON.stringify(event),
95-
})
96-
}
97-
}
254+
})
255+
}
256+
return streamAndRecord(c, response, {
257+
account: usedAccount,
258+
modelId: openAIPayload.model,
259+
isInternal,
260+
tStart,
98261
})
99262
}
100263

0 commit comments

Comments
 (0)