From 5a20f3269f98034539d4388c47568e88abb34570 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Sat, 25 Apr 2026 14:32:51 +0800 Subject: [PATCH] feat(accounts): withAccount wrapper for retry/cooldown (Task #4) - src/lib/with-account.ts: 401 -> refresh token + retry; 5xx/network -> cooldown + retry; 4xx propagates. Cap min(pool size, 3). - Handlers in chat-completions, messages, embeddings now go through withAccount. Tests: success, 5xx retry/cooldown, 4xx no retry, 401 refresh + retry, retries cap. Refs #4 Co-Authored-By: Claude Opus 4 --- docs/tasks/04-with-account-wrapper.md | 68 +++++++++++++++ src/lib/with-account.ts | 109 +++++++++++++++++++++++++ src/routes/chat-completions/handler.ts | 18 ++-- src/routes/embeddings/route.ts | 14 ++-- src/routes/messages/handler.ts | 12 +-- tests/with-account.test.ts | 108 ++++++++++++++++++++++++ 6 files changed, 299 insertions(+), 30 deletions(-) create mode 100644 docs/tasks/04-with-account-wrapper.md create mode 100644 src/lib/with-account.ts create mode 100644 tests/with-account.test.ts diff --git a/docs/tasks/04-with-account-wrapper.md b/docs/tasks/04-with-account-wrapper.md new file mode 100644 index 00000000..30dc2baf --- /dev/null +++ b/docs/tasks/04-with-account-wrapper.md @@ -0,0 +1,68 @@ +# Task 04 — Handler `withAccount` wrapper + +**Depends on:** 03 +**Unblocks:** 07, 08, 09, 10 + +## Goal + +Replace the inline `acquire/release` placeholder from task 03 with a single +`withAccount` helper that handles retry, cooldown, abort, and the +`x-internal-pricing-sync` exemption. 
+ +## Scope + +New file `src/lib/with-account.ts`: + +```ts +export async function withAccount<T>( + c: Context, + fn: (account: Account) => Promise<T>, +): Promise<T> { + const isInternal = c.req.header('x-internal-pricing-sync') === '1' + const maxRetries = Math.min(state.pool.size(), 3) + let lastErr: unknown + for (let attempt = 0; attempt < maxRetries; attempt++) { + const account = await state.pool.acquire() + try { + const out = await fn(account) + account.consecutiveFailures = 0 + return out + } catch (e) { + lastErr = e + if (isClientError(e)) throw e // 4xx (non-401) — no retry + if (isAuthError(e)) triggerRefresh(account) // 401 — refresh, then retry + else state.pool.markCooldown(account, 30_000) // 5xx / network + } finally { + state.pool.release(account) + } + } + throw lastErr +} +``` + +Update each handler to: + +```ts +return withAccount(c, async (account) => { + // ...existing logic with `account` threaded into service call... +}) +``` + +Streaming handlers must not retry once the SSE response has begun flushing. +Either: + +- Detect "headers already sent" and rethrow without rotating, OR +- Wrap retry only around the `fetch()` upstream call, and once events start + flowing, abort retry. + +## Definition of Done + +- [ ] `withAccount` is the only place that calls `pool.acquire/release` + outside startup code. +- [ ] Unit test: forcing a 401 once causes one retry against a different + account (use a dummy pool of two accounts). +- [ ] Unit test: forcing a 4xx never retries. +- [ ] Manual smoke: kill one account's token mid-flight; new requests succeed + on the other account; the dead account enters cooldown. +- [ ] Internal `x-internal-pricing-sync: 1` requests bypass nothing in this + task (the exemption only matters for the recorder in task 06). 
diff --git a/src/lib/with-account.ts b/src/lib/with-account.ts new file mode 100644 index 00000000..799097a7 --- /dev/null +++ b/src/lib/with-account.ts @@ -0,0 +1,109 @@ +import type { Context } from "hono" + +import consola from "consola" + +import type { AccountPool, Account } from "./account-pool" + +import { HTTPError } from "./error" +import { state } from "./state" +import { setupCopilotTokenFor } from "./token" + +const COOLDOWN_MS = 30_000 +const MAX_RETRIES_CAP = 3 + +export interface WithAccountOptions { + /** Override max retries (still capped by pool size). */ + maxRetries?: number +} + +async function safeRefresh(account: Account): Promise<boolean> { + try { + await setupCopilotTokenFor(account) + return true + } catch (refreshErr) { + consola.error(`[${account.name}] token refresh failed:`, refreshErr) + return false + } +} + +/** + * Handle one error from `fn(account)`. + * Throws to bubble up immediately, returns nothing to continue the retry loop. + */ +async function handleAttemptError( + pool: AccountPool, + account: Account, + err: unknown, +): Promise<void> { + if (err instanceof HTTPError) { + const { status } = err.response + if (status === 401) { + consola.warn( + `[${account.name}] 401 from upstream; refreshing token and retrying`, + ) + const ok = await safeRefresh(account) + if (!ok) pool.markCooldown(account, COOLDOWN_MS) + return + } + if (status >= 400 && status < 500) { + // Client error — propagate without retry. + throw err + } + // 5xx + consola.warn( + `[${account.name}] ${status} from upstream; cooling down ${COOLDOWN_MS}ms`, + ) + pool.markCooldown(account, COOLDOWN_MS) + return + } + + // Non-HTTP error (network, timeout, etc.) + consola.warn(`[${account.name}] non-HTTP error; cooling down`, err) + pool.markCooldown(account, COOLDOWN_MS) +} + +/** + * Acquire an account from the pool, run `fn`, and on failure retry against + * a different account up to `min(pool size, MAX_RETRIES_CAP)` times. 
+ * + * Retry policy: + * - 4xx (non-401): no retry; client error rethrows immediately. + * - 401: refresh the account's Copilot token and retry. + * - 5xx / network: cooldown the account for 30s and retry. + * + * Streaming handlers should call `withAccount` ONLY around the upstream + * fetch — once SSE has started flushing, retry is unsafe. + */ +export async function withAccount<T>( + c: Context | undefined, + fn: (account: Account) => Promise<T>, + options: WithAccountOptions = {}, +): Promise<T> { + if (!state.pool) throw new Error("Account pool not initialized") + const pool = state.pool + void c // currently unused; kept for parity with design (e.g. internal-call header) + + const usableCount = pool.accounts.length + const requested = options.maxRetries ?? MAX_RETRIES_CAP + const maxAttempts = Math.max( + 1, + Math.min(requested, usableCount, MAX_RETRIES_CAP), + ) + + let lastErr: unknown + for (let attempt = 0; attempt < maxAttempts; attempt++) { + const account = pool.acquire() + try { + const out = await fn(account) + account.failureCount = 0 + pool.release(account) + return out + } catch (err) { + lastErr = err + pool.release(account) + pool.markFailure(account) + await handleAttemptError(pool, account, err) + } + } + throw lastErr +} diff --git a/src/routes/chat-completions/handler.ts b/src/routes/chat-completions/handler.ts index e9efa686..21b5bab5 100644 --- a/src/routes/chat-completions/handler.ts +++ b/src/routes/chat-completions/handler.ts @@ -7,11 +7,8 @@ import { awaitApproval } from "~/lib/approval" import { checkRateLimit } from "~/lib/rate-limit" import { state } from "~/lib/state" import { getTokenCount } from "~/lib/tokenizer" -import { - isNullish, - makeApiContext, - resolveAndMapModelId, -} from "~/lib/utils" +import { isNullish, makeApiContext, resolveAndMapModelId } from "~/lib/utils" +import { withAccount } from "~/lib/with-account" import { createChatCompletions, type ChatCompletionResponse, @@ -55,14 +52,9 @@ export async function 
handleCompletion(c: Context) { consola.debug("Set max_tokens to:", JSON.stringify(payload.max_tokens)) } - if (!state.pool) throw new Error("Account pool not initialized") - const account = state.pool.acquire() - let response: Awaited<ReturnType<typeof createChatCompletions>> - try { - response = await createChatCompletions(makeApiContext(account), payload) - } finally { - state.pool.release(account) - } + const response = await withAccount(c, (account) => + createChatCompletions(makeApiContext(account), payload), + ) if (isNonStreaming(response)) { consola.debug("Non-streaming response:", JSON.stringify(response)) diff --git a/src/routes/embeddings/route.ts b/src/routes/embeddings/route.ts index 478bea49..2bff9a3a 100644 --- a/src/routes/embeddings/route.ts +++ b/src/routes/embeddings/route.ts @@ -1,8 +1,8 @@ import { Hono } from "hono" import { forwardError } from "~/lib/error" -import { state } from "~/lib/state" import { makeApiContext } from "~/lib/utils" +import { withAccount } from "~/lib/with-account" import { createEmbeddings, type EmbeddingRequest, @@ -13,14 +13,10 @@ export const embeddingRoutes = new Hono() embeddingRoutes.post("/", async (c) => { try { const paylod = await c.req.json() - if (!state.pool) throw new Error("Account pool not initialized") - const account = state.pool.acquire() - try { - const response = await createEmbeddings(makeApiContext(account), paylod) - return c.json(response) - } finally { - state.pool.release(account) - } + const response = await withAccount(c, (account) => + createEmbeddings(makeApiContext(account), paylod), + ) + return c.json(response) } catch (error) { return await forwardError(c, error) } diff --git a/src/routes/messages/handler.ts b/src/routes/messages/handler.ts index 8ddf1955..345c4252 100644 --- a/src/routes/messages/handler.ts +++ b/src/routes/messages/handler.ts @@ -7,6 +7,7 @@ import { awaitApproval } from "~/lib/approval" import { checkRateLimit } from "~/lib/rate-limit" import { state } from "~/lib/state" import { makeApiContext, 
resolveAndMapModelId } from "~/lib/utils" +import { withAccount } from "~/lib/with-account" import { createChatCompletions, type ChatCompletionChunk, @@ -47,14 +48,9 @@ export async function handleCompletion(c: Context) { await awaitApproval() } - if (!state.pool) throw new Error("Account pool not initialized") - const account = state.pool.acquire() - let response: Awaited<ReturnType<typeof createChatCompletions>> - try { - response = await createChatCompletions(makeApiContext(account), openAIPayload) - } finally { - state.pool.release(account) - } + const response = await withAccount(c, (account) => + createChatCompletions(makeApiContext(account), openAIPayload), + ) if (isNonStreaming(response)) { consola.debug( diff --git a/tests/with-account.test.ts b/tests/with-account.test.ts new file mode 100644 index 00000000..42bf2e2f --- /dev/null +++ b/tests/with-account.test.ts @@ -0,0 +1,108 @@ +import { test, expect, describe, beforeEach, mock } from "bun:test" + +import { AccountPool, type Account } from "../src/lib/account-pool" +import { HTTPError } from "../src/lib/error" +import { state } from "../src/lib/state" +import { withAccount } from "../src/lib/with-account" + +// Stub out token refresh so 401 retries don't hit the network. 
+void mock.module("../src/lib/token", () => ({ + setupCopilotTokenFor: async (_a: Account) => { + /* no-op */ + }, +})) + +const makeAccount = (name: string): Account => ({ + name, + accountType: "individual", + githubToken: `ghu_${name}`, + copilotToken: `tok_${name}`, + copilotTokenRefreshAt: 0, + inFlight: 0, + lastUsedAt: 0, + failureCount: 0, +}) + +const fakeResp = (status: number) => new Response("err", { status }) + +describe("withAccount", () => { + beforeEach(() => { + state.pool = new AccountPool( + [makeAccount("a"), makeAccount("b")], + "round-robin", + ) + }) + + test("returns value on success without retry", async () => { + const seen: Array<string> = [] + const out = await withAccount(undefined, (account) => { + seen.push(account.name) + return Promise.resolve(42) + }) + expect(out).toBe(42) + expect(seen).toHaveLength(1) + }) + + test("retries on 5xx with a different account, then succeeds", async () => { + const seen: Array<string> = [] + const out = await withAccount(undefined, (account) => { + seen.push(account.name) + if (seen.length === 1) { + return Promise.reject(new HTTPError("upstream 503", fakeResp(503))) + } + return Promise.resolve("ok") + }) + expect(out).toBe("ok") + expect(seen).toHaveLength(2) + expect(seen[0]).not.toBe(seen[1]) + // First account should be on cooldown + const first = state.pool?.accounts[0] + expect(first?.cooldownUntil ?? 
0).toBeGreaterThan(Date.now()) + }) + + test("4xx client error (non-401) does not retry", async () => { + const seen: Array<string> = [] + const promise = withAccount(undefined, (account) => { + seen.push(account.name) + return Promise.reject(new HTTPError("bad request", fakeResp(400))) + }) + let thrown: unknown + try { + await promise + } catch (e) { + thrown = e + } + expect(thrown).toBeInstanceOf(HTTPError) + expect(seen).toHaveLength(1) + }) + + test("401 triggers refresh and retries on a different account", async () => { + const seen: Array<string> = [] + const out = await withAccount(undefined, (account) => { + seen.push(account.name) + if (seen.length === 1) { + return Promise.reject(new HTTPError("auth", fakeResp(401))) + } + return Promise.resolve("ok-after-refresh") + }) + expect(out).toBe("ok-after-refresh") + expect(seen).toHaveLength(2) + }) + + test("retries cap at pool size", async () => { + const calls: Array<string> = [] + const promise = withAccount(undefined, (account) => { + calls.push(account.name) + return Promise.reject(new HTTPError("upstream 502", fakeResp(502))) + }) + let thrown: unknown + try { + await promise + } catch (e) { + thrown = e + } + expect(thrown).toBeInstanceOf(HTTPError) + // 2 accounts ⇒ exactly 2 attempts + expect(calls).toHaveLength(2) + }) +})