Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions src/lib/usage-recorder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import consola from "consola"

import type { Account } from "./account-pool"
import type { NormalizedUsage } from "./usage-normalizer"

import { getDb } from "./db"

/** Upstream API surface the request hit; becomes the `endpoint` column. */
export type UsageEndpoint = "chat.completions" | "messages" | "embeddings"
/** Wire format of the upstream provider the request was translated to. */
export type UpstreamFormat = "openai" | "anthropic"
/** Final outcome of the request as seen by the recorder. */
export type UsageStatus = "ok" | "error" | "aborted"

/** Everything needed to record one upstream request's usage. */
export interface RecordUsageInput {
  /** Account the request was routed through; only `account.name` is persisted. */
  account: Account
  /** Upstream model identifier; keys the `model_pricing` lookup. */
  modelId: string
  /** Which API surface handled the request. */
  endpoint: UsageEndpoint
  /** Provider wire format used upstream. */
  upstreamFormat: UpstreamFormat
  /** Whether the response was streamed (stored as 0/1). */
  isStreaming: boolean
  /** Token counts already normalized by usage-normalizer. */
  usage: NormalizedUsage
  /** Wall-clock duration of the upstream call, in milliseconds. */
  durationMs: number
  /** Request outcome ("ok" | "error" | "aborted"). */
  status: UsageStatus
  /** Optional upstream/request correlation id; stored as NULL when absent. */
  requestId?: string
  /** When true the event is dropped entirely — internal traffic is not recorded. */
  isInternal?: boolean
}

/**
 * Row shape returned from the `model_pricing` table. Every column is
 * nullable; missing values flow through as NULL price snapshots.
 * NOTE(review): `_per_mtok` presumably means "per million tokens" — the
 * unit is not defined anywhere in this file; confirm against the schema.
 */
interface PricingRow {
  input_per_mtok: number | null
  cached_input_per_mtok: number | null
  output_per_mtok: number | null
  reasoning_per_mtok: number | null
  premium_unit_price: number | null
  premium_multiplier: number | null
}

/**
 * Persist one upstream usage event and atomically bump the matching
 * daily-aggregate row, both inside a single transaction.
 *
 * Every failure is caught and logged through consola — by design the
 * recorder must never propagate an error into the caller's response path.
 */
export function recordUsage(input: RecordUsageInput): void {
  // Internal traffic is deliberately kept off the books.
  if (input.isInternal) return

  try {
    const db = getDb()
    const eventTs = Date.now()

    // Freeze the model's current price sheet into this event; a missing
    // pricing row simply yields NULL in every snapshot column.
    const priceRow = db
      .query<PricingRow, [string]>(
        `SELECT input_per_mtok,
              cached_input_per_mtok,
              output_per_mtok,
              reasoning_per_mtok,
              premium_unit_price,
              premium_multiplier
         FROM model_pricing
         WHERE model_id = ?`,
      )
      .get(input.modelId)

    const snapInput = priceRow?.input_per_mtok ?? null
    const snapCachedInput = priceRow?.cached_input_per_mtok ?? null
    const snapOutput = priceRow?.output_per_mtok ?? null
    const snapReasoning = priceRow?.reasoning_per_mtok ?? null
    const snapPremiumUnit = priceRow?.premium_unit_price ?? null
    const snapPremiumMultiplier = priceRow?.premium_multiplier ?? null
    // One request is billed as `premium_multiplier` premium requests;
    // models without a pricing row contribute 0.
    const premiumUnits = snapPremiumMultiplier ?? 0

    const insertEventStmt = db.prepare(
      `INSERT INTO usage_events (
        ts, account_name, model_id, endpoint, upstream_format, is_streaming,
        input_tokens, cached_input_tokens, output_tokens, reasoning_tokens,
        total_tokens, premium_request_count,
        input_price_snapshot, cached_input_price_snapshot,
        output_price_snapshot, reasoning_price_snapshot,
        premium_unit_price_snapshot, premium_multiplier_snapshot,
        request_id, status, duration_ms
      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
    )

    const upsertDailyStmt = db.prepare(
      `INSERT INTO usage_daily (
        day, account_name, model_id, endpoint,
        req_count, input_tokens, cached_input_tokens,
        output_tokens, reasoning_tokens, total_tokens, premium_requests
      ) VALUES (
        date(?/1000, 'unixepoch', 'localtime'),
        ?, ?, ?, 1, ?, ?, ?, ?, ?, ?
      )
      ON CONFLICT(day, account_name, model_id, endpoint) DO UPDATE SET
        req_count = req_count + 1,
        input_tokens = input_tokens + excluded.input_tokens,
        cached_input_tokens = cached_input_tokens + excluded.cached_input_tokens,
        output_tokens = output_tokens + excluded.output_tokens,
        reasoning_tokens = reasoning_tokens + excluded.reasoning_tokens,
        total_tokens = total_tokens + excluded.total_tokens,
        premium_requests = premium_requests + excluded.premium_requests`,
    )

    // Event insert and daily upsert commit or roll back together.
    db.transaction(() => {
      insertEventStmt.run(
        eventTs,
        input.account.name,
        input.modelId,
        input.endpoint,
        input.upstreamFormat,
        input.isStreaming ? 1 : 0,
        input.usage.inputTokens,
        input.usage.cachedInputTokens,
        input.usage.outputTokens,
        input.usage.reasoningTokens,
        input.usage.totalTokens,
        premiumUnits,
        snapInput,
        snapCachedInput,
        snapOutput,
        snapReasoning,
        snapPremiumUnit,
        snapPremiumMultiplier,
        input.requestId ?? null,
        input.status,
        input.durationMs,
      )
      upsertDailyStmt.run(
        eventTs,
        input.account.name,
        input.modelId,
        input.endpoint,
        input.usage.inputTokens,
        input.usage.cachedInputTokens,
        input.usage.outputTokens,
        input.usage.reasoningTokens,
        input.usage.totalTokens,
        premiumUnits,
      )
    })()
  } catch (err) {
    consola.error("[usage-recorder] failed to record usage:", err)
  }
}
132 changes: 132 additions & 0 deletions tests/usage-recorder.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { test, expect, describe, beforeEach } from "bun:test"

import type { Account } from "../src/lib/account-pool"

import { __resetDbForTests, initDb } from "../src/lib/db"
import { recordUsage } from "../src/lib/usage-recorder"

// Fixed test account; recordUsage only persists `name`, but the full
// Account shape is required by the type.
const ACCOUNT: Account = {
  name: "alice",
  accountType: "individual",
  githubToken: "ghu_a",
  copilotToken: "tok_a",
  copilotTokenRefreshAt: 0,
  inFlight: 0,
  lastUsedAt: 0,
  failureCount: 0,
}

// Canonical recordUsage input; individual tests spread-override fields
// (e.g. `isInternal`, `endpoint`) as needed.
const baseInput = {
  account: ACCOUNT,
  modelId: "gpt-4o",
  endpoint: "chat.completions" as const,
  upstreamFormat: "openai" as const,
  isStreaming: false,
  usage: {
    inputTokens: 100,
    cachedInputTokens: 20,
    outputTokens: 50,
    reasoningTokens: 0,
    totalTokens: 150,
  },
  durationMs: 123,
  status: "ok" as const,
}

/** Reset module DB state, open a fresh in-memory DB, and seed the test account. */
function setupDb() {
  __resetDbForTests()
  const database = initDb(":memory:")
  const seedAccount =
    "INSERT INTO accounts (name, account_type, created_at) VALUES (?, ?, ?)"
  database.run(seedAccount, [ACCOUNT.name, ACCOUNT.accountType, Date.now()])
  return database
}

describe("recordUsage", () => {
  // Each test gets a pristine module-level DB handle.
  beforeEach(() => {
    __resetDbForTests()
  })

  test("inserts an event and a daily row", () => {
    const database = setupDb()
    // Seed a pricing row so the price snapshots come back non-null.
    database.run(
      `INSERT INTO model_pricing (
        model_id, input_per_mtok, cached_input_per_mtok, output_per_mtok,
        reasoning_per_mtok, premium_multiplier, premium_unit_price,
        updated_at
      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
      ["gpt-4o", 5, 1, 15, 0, 1.0, 0.04, Date.now()],
    )
    recordUsage(baseInput)

    const eventRows = database
      .query<
        {
          account_name: string
          model_id: string
          input_tokens: number
          input_price_snapshot: number
          premium_request_count: number
        },
        []
      >("SELECT * FROM usage_events")
      .all()
    expect(eventRows).toHaveLength(1)
    expect(eventRows[0].account_name).toBe("alice")
    expect(eventRows[0].input_price_snapshot).toBe(5)
    expect(eventRows[0].premium_request_count).toBe(1)

    const dailyRows = database
      .query<
        { req_count: number; input_tokens: number; premium_requests: number },
        []
      >("SELECT * FROM usage_daily")
      .all()
    expect(dailyRows).toHaveLength(1)
    expect(dailyRows[0].req_count).toBe(1)
    expect(dailyRows[0].input_tokens).toBe(100)
    expect(dailyRows[0].premium_requests).toBe(1)
  })

  test("second insert into same (day,account,model,endpoint) increments daily", () => {
    const database = setupDb()
    recordUsage(baseInput)
    recordUsage(baseInput)
    const dailyRows = database
      .query<
        { req_count: number; input_tokens: number },
        []
      >("SELECT req_count, input_tokens FROM usage_daily")
      .all()
    expect(dailyRows).toHaveLength(1)
    // Upsert path: counters accumulate instead of adding a second row.
    expect(dailyRows[0].req_count).toBe(2)
    expect(dailyRows[0].input_tokens).toBe(200)
  })

  test("missing model_pricing row -> snapshots null and no throw", () => {
    const database = setupDb()
    recordUsage(baseInput)
    const eventRow = database
      .query<
        { input_price_snapshot: number | null; premium_request_count: number },
        []
      >("SELECT input_price_snapshot, premium_request_count FROM usage_events")
      .get()
    expect(eventRow?.input_price_snapshot).toBeNull()
    expect(eventRow?.premium_request_count).toBe(0)
  })

  test("isInternal=true inserts nothing", () => {
    const database = setupDb()
    recordUsage({ ...baseInput, isInternal: true })
    const eventRows = database.query("SELECT * FROM usage_events").all()
    expect(eventRows).toHaveLength(0)
  })

  test("recorder errors are swallowed", () => {
    setupDb()
    // Hand the recorder a value the types forbid; it must log, not throw.
    expect(() =>
      recordUsage({
        ...baseInput,
        // @ts-expect-error deliberately invalid endpoint to exercise the catch path
        endpoint: undefined,
      }),
    ).not.toThrow()
  })
})