diff --git a/src/lib/usage-recorder.ts b/src/lib/usage-recorder.ts new file mode 100644 index 00000000..2debabca --- /dev/null +++ b/src/lib/usage-recorder.ts @@ -0,0 +1,139 @@ +import consola from "consola" + +import type { Account } from "./account-pool" +import type { NormalizedUsage } from "./usage-normalizer" + +import { getDb } from "./db" + +export type UsageEndpoint = "chat.completions" | "messages" | "embeddings" +export type UpstreamFormat = "openai" | "anthropic" +export type UsageStatus = "ok" | "error" | "aborted" + +export interface RecordUsageInput { + account: Account + modelId: string + endpoint: UsageEndpoint + upstreamFormat: UpstreamFormat + isStreaming: boolean + usage: NormalizedUsage + durationMs: number + status: UsageStatus + requestId?: string + isInternal?: boolean +} + +interface PricingRow { + input_per_mtok: number | null + cached_input_per_mtok: number | null + output_per_mtok: number | null + reasoning_per_mtok: number | null + premium_unit_price: number | null + premium_multiplier: number | null +} + +/** + * Record a single upstream usage event and atomically update the daily + * aggregate. Errors are swallowed (logged via consola); recorder failure + * must not break the caller's response path. + */ +export function recordUsage(input: RecordUsageInput): void { + if (input.isInternal) return + + try { + const db = getDb() + const ts = Date.now() + + const pricing = db + .query( + `SELECT input_per_mtok, + cached_input_per_mtok, + output_per_mtok, + reasoning_per_mtok, + premium_unit_price, + premium_multiplier + FROM model_pricing + WHERE model_id = ?`, + ) + .get(input.modelId) + + const inputPrice = pricing?.input_per_mtok ?? null + const cachedInputPrice = pricing?.cached_input_per_mtok ?? null + const outputPrice = pricing?.output_per_mtok ?? null + const reasoningPrice = pricing?.reasoning_per_mtok ?? null + const premiumUnitPrice = pricing?.premium_unit_price ?? null + const premiumMultiplier = pricing?.premium_multiplier ?? null + const premiumRequestCount = premiumMultiplier ?? 0 + + const insertEvent = db.prepare( + `INSERT INTO usage_events ( + ts, account_name, model_id, endpoint, upstream_format, is_streaming, + input_tokens, cached_input_tokens, output_tokens, reasoning_tokens, + total_tokens, premium_request_count, + input_price_snapshot, cached_input_price_snapshot, + output_price_snapshot, reasoning_price_snapshot, + premium_unit_price_snapshot, premium_multiplier_snapshot, + request_id, status, duration_ms + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) + + const upsertDaily = db.prepare( + `INSERT INTO usage_daily ( + day, account_name, model_id, endpoint, + req_count, input_tokens, cached_input_tokens, + output_tokens, reasoning_tokens, total_tokens, premium_requests + ) VALUES ( + date(?/1000, 'unixepoch', 'localtime'), + ?, ?, ?, 1, ?, ?, ?, ?, ?, ? + ) + ON CONFLICT(day, account_name, model_id, endpoint) DO UPDATE SET + req_count = req_count + 1, + input_tokens = input_tokens + excluded.input_tokens, + cached_input_tokens = cached_input_tokens + excluded.cached_input_tokens, + output_tokens = output_tokens + excluded.output_tokens, + reasoning_tokens = reasoning_tokens + excluded.reasoning_tokens, + total_tokens = total_tokens + excluded.total_tokens, + premium_requests = premium_requests + excluded.premium_requests`, + ) + + const tx = db.transaction(() => { + insertEvent.run( + ts, + input.account.name, + input.modelId, + input.endpoint, + input.upstreamFormat, + input.isStreaming ? 1 : 0, + input.usage.inputTokens, + input.usage.cachedInputTokens, + input.usage.outputTokens, + input.usage.reasoningTokens, + input.usage.totalTokens, + premiumRequestCount, + inputPrice, + cachedInputPrice, + outputPrice, + reasoningPrice, + premiumUnitPrice, + premiumMultiplier, + input.requestId ?? null, + input.status, + input.durationMs, + ) + upsertDaily.run( + ts, + input.account.name, + input.modelId, + input.endpoint, + input.usage.inputTokens, + input.usage.cachedInputTokens, + input.usage.outputTokens, + input.usage.reasoningTokens, + input.usage.totalTokens, + premiumRequestCount, + ) + }) + tx() + } catch (err) { + consola.error("[usage-recorder] failed to record usage:", err) + } +} diff --git a/tests/usage-recorder.test.ts b/tests/usage-recorder.test.ts new file mode 100644 index 00000000..0bed7425 --- /dev/null +++ b/tests/usage-recorder.test.ts @@ -0,0 +1,132 @@ +import { test, expect, describe, beforeEach } from "bun:test" + +import type { Account } from "../src/lib/account-pool" + +import { __resetDbForTests, initDb } from "../src/lib/db" +import { recordUsage } from "../src/lib/usage-recorder" + +const ACCOUNT: Account = { + name: "alice", + accountType: "individual", + githubToken: "ghu_a", + copilotToken: "tok_a", + copilotTokenRefreshAt: 0, + inFlight: 0, + lastUsedAt: 0, + failureCount: 0, +} + +const baseInput = { + account: ACCOUNT, + modelId: "gpt-4o", + endpoint: "chat.completions" as const, + upstreamFormat: "openai" as const, + isStreaming: false, + usage: { + inputTokens: 100, + cachedInputTokens: 20, + outputTokens: 50, + reasoningTokens: 0, + totalTokens: 150, + }, + durationMs: 123, + status: "ok" as const, +} + +function setupDb() { + __resetDbForTests() + const db = initDb(":memory:") + db.run( + "INSERT INTO accounts (name, account_type, created_at) VALUES (?, ?, ?)", + [ACCOUNT.name, ACCOUNT.accountType, Date.now()], + ) + return db +} + +describe("recordUsage", () => { + beforeEach(() => { + __resetDbForTests() + }) + + test("inserts an event and a daily row", () => { + const db = setupDb() + db.run( + `INSERT INTO model_pricing ( + model_id, input_per_mtok, cached_input_per_mtok, output_per_mtok, + reasoning_per_mtok, premium_multiplier, premium_unit_price, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + ["gpt-4o", 5, 1, 15, 0, 1.0, 0.04, Date.now()], + ) + recordUsage(baseInput) + + const events = db.query("SELECT * FROM usage_events").all() as Array<{ + account_name: string + model_id: string + input_tokens: number + input_price_snapshot: number + premium_request_count: number + }> + expect(events).toHaveLength(1) + expect(events[0].account_name).toBe("alice") + expect(events[0].input_price_snapshot).toBe(5) + expect(events[0].premium_request_count).toBe(1) + + const daily = db.query("SELECT * FROM usage_daily").all() as Array<{ + req_count: number + input_tokens: number + premium_requests: number + }> + expect(daily).toHaveLength(1) + expect(daily[0].req_count).toBe(1) + expect(daily[0].input_tokens).toBe(100) + expect(daily[0].premium_requests).toBe(1) + }) + + test("second insert into same (day,account,model,endpoint) increments daily", () => { + const db = setupDb() + recordUsage(baseInput) + recordUsage(baseInput) + const daily = db + .query< + { req_count: number; input_tokens: number }, + [] + >("SELECT req_count, input_tokens FROM usage_daily") + .all() + expect(daily).toHaveLength(1) + expect(daily[0].req_count).toBe(2) + expect(daily[0].input_tokens).toBe(200) + }) + + test("missing model_pricing row -> snapshots null and no throw", () => { + const db = setupDb() + recordUsage(baseInput) + const ev = db + .query< + { input_price_snapshot: number | null; premium_request_count: number }, + [] + >("SELECT input_price_snapshot, premium_request_count FROM usage_events") + .get() + expect(ev?.input_price_snapshot).toBeNull() + expect(ev?.premium_request_count).toBe(0) + }) + + test("isInternal=true inserts nothing", () => { + const db = setupDb() + recordUsage({ ...baseInput, isInternal: true }) + const events = db.query("SELECT * FROM usage_events").all() + expect(events).toHaveLength(0) + }) + + test("recorder errors are swallowed", () => { + setupDb() + // Force an error by passing an invalid endpoint type via cast. + expect(() => + recordUsage({ + ...baseInput, + // @ts-expect-error intentional bad value to trigger SQL CHECK fail (none here, but recorder must not throw on weird input) + endpoint: undefined, + }), + ).not.toThrow() + }) +})