Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions src/lib/usage-normalizer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Pure (no DB, no network, no global state) helpers that convert upstream
* usage payloads from OpenAI and Anthropic into a single shape the rest of
* the system stores.
*
* Field rules (see docs/design/02-database-schema.md):
* - Anthropic `cache_creation_input_tokens` is folded into `inputTokens`.
* - OpenAI `completion_tokens` already includes reasoning tokens — never
* add them on top of `outputTokens`. `reasoningTokens` is informational.
*/

/** Provider-agnostic usage record that the rest of the system stores. */
export interface NormalizedUsage {
  /** Prompt-side tokens. For Anthropic, cache_creation_input_tokens is folded in. */
  inputTokens: number
  /** Prompt tokens served from a provider-side cache. */
  cachedInputTokens: number
  /** Completion-side tokens. For OpenAI this already includes reasoning tokens. */
  outputTokens: number
  /** Informational only — never added on top of outputTokens (always 0 for Anthropic here). */
  reasoningTokens: number
  /** Upstream-reported total when available, otherwise inputTokens + outputTokens. */
  totalTokens: number
}

/**
 * Raised when a stream completes without ever carrying a usage payload,
 * so there is nothing to normalize or store.
 */
export class UsageMissingError extends Error {
  constructor(
    message: string = "Upstream stream never delivered usage information",
  ) {
    super(message)
    // Keep the class name on the instance so logs identify the error kind.
    this.name = "UsageMissingError"
  }
}

/** Subset of the OpenAI `usage` object that normalization reads. */
interface OpenAIUsageShape {
  /** Tokens in the prompt. */
  prompt_tokens?: number
  /** Tokens in the completion (includes reasoning tokens — see file header). */
  completion_tokens?: number
  /** Upstream-reported grand total. */
  total_tokens?: number
  /** Breakdown of prompt tokens; cached_tokens were served from cache. */
  prompt_tokens_details?: { cached_tokens?: number }
  /** Breakdown of completion tokens; reasoning_tokens is informational. */
  completion_tokens_details?: { reasoning_tokens?: number }
}

/** Subset of the Anthropic `usage` object that normalization reads. */
interface AnthropicUsageShape {
  /** Non-cache prompt tokens. */
  input_tokens?: number
  /** Completion tokens. */
  output_tokens?: number
  /** Prompt tokens read from the provider cache. */
  cache_read_input_tokens?: number
  /** Prompt tokens spent creating a cache entry — folded into inputTokens. */
  cache_creation_input_tokens?: number
}

/** Minimal shape of an Anthropic stream event as consumed by the accumulator. */
interface AnthropicMessageShape {
  /** Event discriminator, e.g. "message_start" or "message_delta". */
  type?: string
  /** Present on message_start events; carries the initial usage snapshot. */
  message?: { usage?: AnthropicUsageShape }
  /** Present on message_delta events; carries updated output token counts. */
  usage?: AnthropicUsageShape
}

/**
 * Coerce an unknown payload field to a usable token count.
 * Non-numbers AND non-finite numbers (NaN, ±Infinity) become 0 — a plain
 * `typeof` check would let NaN through and poison every downstream sum.
 */
const numOr0 = (v: unknown): number =>
  typeof v === "number" && Number.isFinite(v) ? v : 0

/**
 * Normalize a final (non-streaming or end-of-stream) OpenAI usage payload.
 *
 * `completion_tokens` already includes reasoning tokens, so reasoningTokens
 * is surfaced for information only and never added to outputTokens.
 * Missing/non-numeric fields count as 0; a missing total is derived.
 */
export function normalizeOpenAIFinal(usage: unknown): NormalizedUsage {
  const raw = (usage ?? {}) as OpenAIUsageShape
  const input = numOr0(raw.prompt_tokens)
  const output = numOr0(raw.completion_tokens)
  const reportedTotal = numOr0(raw.total_tokens)
  return {
    inputTokens: input,
    cachedInputTokens: numOr0(raw.prompt_tokens_details?.cached_tokens),
    outputTokens: output,
    reasoningTokens: numOr0(raw.completion_tokens_details?.reasoning_tokens),
    // Trust the upstream total when present (non-zero); otherwise derive it.
    totalTokens: reportedTotal || input + output,
  }
}

/**
 * Normalize the usage block of a complete (non-streaming) Anthropic message.
 *
 * Per the schema rules in the file header, cache_creation_input_tokens is
 * folded into inputTokens; cache_read_input_tokens maps to cachedInputTokens.
 * Anthropic reports no reasoning tokens, so that field is always 0.
 */
export function normalizeAnthropicMessage(message: unknown): NormalizedUsage {
  const usage =
    ((message ?? {}) as { usage?: AnthropicUsageShape }).usage ?? {}
  const input =
    numOr0(usage.input_tokens) + numOr0(usage.cache_creation_input_tokens)
  const output = numOr0(usage.output_tokens)
  return {
    inputTokens: input,
    cachedInputTokens: numOr0(usage.cache_read_input_tokens),
    outputTokens: output,
    reasoningTokens: 0,
    totalTokens: input + output,
  }
}

/**
 * Normalize an embeddings usage payload.
 *
 * Embeddings only consume prompt tokens: there is no completion, cache, or
 * reasoning component, so everything except input/total is fixed at 0.
 */
export function normalizeEmbeddings(usage: unknown): NormalizedUsage {
  const raw = (usage ?? {}) as { prompt_tokens?: number; total_tokens?: number }
  const input = numOr0(raw.prompt_tokens)
  return {
    inputTokens: input,
    cachedInputTokens: 0,
    outputTokens: 0,
    reasoningTokens: 0,
    // Fall back to the input count when the upstream total is absent/zero.
    totalTokens: numOr0(raw.total_tokens) || input,
  }
}

/** Incrementally collects usage information from a streaming response. */
export interface StreamUsageAccumulator {
  /** Inspect one stream chunk/event and record any usage data it carries. */
  feed(chunk: unknown): void
  /**
   * Produce the final NormalizedUsage once the stream ends.
   * The OpenAI implementation throws UsageMissingError if no usage arrived.
   */
  finalize(): NormalizedUsage
}

/**
 * Accumulator for OpenAI streaming responses.
 *
 * OpenAI sends usage on a single (typically final) chunk; we simply remember
 * the last usage object seen. finalize() throws UsageMissingError when the
 * stream ended without any usage chunk.
 */
export function createOpenAIAccumulator(): StreamUsageAccumulator {
  let lastUsage: OpenAIUsageShape | undefined

  return {
    feed(chunk) {
      const usage = (chunk as { usage?: OpenAIUsageShape } | null | undefined)
        ?.usage
      if (usage) {
        lastUsage = usage
      }
    },
    finalize() {
      if (lastUsage === undefined) {
        throw new UsageMissingError()
      }
      return normalizeOpenAIFinal(lastUsage)
    },
  }
}

/**
 * Accumulator for Anthropic streaming responses.
 *
 * message_start carries the input-side counts plus an initial output count;
 * subsequent message_delta events update output_tokens. Math.max guards
 * against an out-of-order/smaller count ever lowering the recorded output.
 * If no usage events arrive at all, finalize() returns all-zero usage
 * rather than throwing.
 */
export function createAnthropicAccumulator(): StreamUsageAccumulator {
  let input = 0
  let cached = 0
  let output = 0

  return {
    feed(chunk) {
      const event = (chunk ?? {}) as AnthropicMessageShape
      if (event.type === "message_start" && event.message?.usage) {
        const usage = event.message.usage
        // cache_creation_input_tokens folds into inputTokens (see file header).
        input =
          numOr0(usage.input_tokens) + numOr0(usage.cache_creation_input_tokens)
        cached = numOr0(usage.cache_read_input_tokens)
        output = numOr0(usage.output_tokens)
      } else if (event.type === "message_delta" && event.usage) {
        output = Math.max(output, numOr0(event.usage.output_tokens))
      }
    },
    finalize() {
      return {
        inputTokens: input,
        cachedInputTokens: cached,
        outputTokens: output,
        reasoningTokens: 0,
        totalTokens: input + output,
      }
    },
  }
}
142 changes: 142 additions & 0 deletions tests/usage-normalizer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { test, expect, describe } from "bun:test"

import {
createAnthropicAccumulator,
createOpenAIAccumulator,
normalizeAnthropicMessage,
normalizeEmbeddings,
normalizeOpenAIFinal,
UsageMissingError,
} from "../src/lib/usage-normalizer"

describe("normalizeOpenAIFinal", () => {
  // Full payload: every field should land in its normalized slot.
  test("maps prompt/completion/total + cached + reasoning", () => {
    const result = normalizeOpenAIFinal({
      prompt_tokens: 100,
      completion_tokens: 50,
      total_tokens: 150,
      prompt_tokens_details: { cached_tokens: 20 },
      completion_tokens_details: { reasoning_tokens: 10 },
    })
    expect(result).toEqual({
      inputTokens: 100,
      cachedInputTokens: 20,
      outputTokens: 50,
      reasoningTokens: 10,
      totalTokens: 150,
    })
  })

  // Sparse payload: absent details default to 0 and the total is derived.
  test("missing fields default to 0", () => {
    const result = normalizeOpenAIFinal({ prompt_tokens: 10, completion_tokens: 5 })
    expect(result.inputTokens).toBe(10)
    expect(result.outputTokens).toBe(5)
    expect(result.cachedInputTokens).toBe(0)
    expect(result.reasoningTokens).toBe(0)
    expect(result.totalTokens).toBe(15)
  })
})

describe("normalizeAnthropicMessage", () => {
  // Per the schema docs, cache-creation tokens count as input, not cache reads.
  test("folds cache_creation_input_tokens into inputTokens", () => {
    const result = normalizeAnthropicMessage({
      usage: {
        input_tokens: 100,
        cache_creation_input_tokens: 25,
        cache_read_input_tokens: 75,
        output_tokens: 30,
      },
    })
    expect(result.inputTokens).toBe(125) // 100 + 25 folded in
    expect(result.cachedInputTokens).toBe(75)
    expect(result.outputTokens).toBe(30)
    expect(result.totalTokens).toBe(155)
  })
})

describe("normalizeEmbeddings", () => {
  // Embeddings have no completion side — only prompt tokens matter.
  test("uses prompt_tokens as input", () => {
    const result = normalizeEmbeddings({ prompt_tokens: 12, total_tokens: 12 })
    expect(result.inputTokens).toBe(12)
    expect(result.outputTokens).toBe(0)
    expect(result.totalTokens).toBe(12)
  })
})

describe("createOpenAIAccumulator", () => {
  // Content-only chunks carry no usage; the final chunk supplies it.
  test("captures usage from final chunk", () => {
    const accumulator = createOpenAIAccumulator()
    accumulator.feed({ choices: [{ delta: { content: "hi" } }] })
    accumulator.feed({
      choices: [],
      usage: { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 },
    })
    const result = accumulator.finalize()
    expect(result.inputTokens).toBe(10)
    expect(result.outputTokens).toBe(5)
  })

  // A stream that never delivered usage must fail loudly, not return zeros.
  test("throws when usage chunk never arrives", () => {
    const accumulator = createOpenAIAccumulator()
    accumulator.feed({ choices: [{ delta: { content: "hi" } }] })
    expect(() => accumulator.finalize()).toThrow(UsageMissingError)
  })
})

describe("createAnthropicAccumulator", () => {
  // message_start seeds all counters; message_delta events raise output only.
  test("aggregates message_start + message_delta", () => {
    const accumulator = createAnthropicAccumulator()
    accumulator.feed({
      type: "message_start",
      message: {
        usage: {
          input_tokens: 50,
          cache_creation_input_tokens: 10,
          cache_read_input_tokens: 20,
          output_tokens: 1,
        },
      },
    })
    // Content events carry no usage and must be ignored.
    accumulator.feed({
      type: "content_block_delta",
      delta: { type: "text_delta", text: "a" },
    })
    accumulator.feed({ type: "message_delta", usage: { output_tokens: 7 } })
    accumulator.feed({ type: "message_delta", usage: { output_tokens: 12 } })
    const result = accumulator.finalize()
    expect(result.inputTokens).toBe(60) // 50 + 10 cache-creation folded in
    expect(result.cachedInputTokens).toBe(20)
    expect(result.outputTokens).toBe(12) // last/highest delta wins
    expect(result.totalTokens).toBe(72)
  })

  // Without any message_delta, the message_start snapshot stands.
  test("returns sane zeros if only message_start arrived", () => {
    const accumulator = createAnthropicAccumulator()
    accumulator.feed({
      type: "message_start",
      message: { usage: { input_tokens: 5, output_tokens: 1 } },
    })
    const result = accumulator.finalize()
    expect(result.inputTokens).toBe(5)
    expect(result.outputTokens).toBe(1)
  })

  // An empty stream yields all-zero usage rather than throwing.
  test("returns zeros when nothing arrives", () => {
    const accumulator = createAnthropicAccumulator()
    expect(accumulator.finalize()).toEqual({
      inputTokens: 0,
      cachedInputTokens: 0,
      outputTokens: 0,
      reasoningTokens: 0,
      totalTokens: 0,
    })
  })
})