Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions docs/tasks/04-with-account-wrapper.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Task 04 — Handler `withAccount` wrapper

**Depends on:** 03
**Unblocks:** 07, 08, 09, 10

## Goal

Replace the inline `acquire/release` placeholder from task 03 with a single
`withAccount` helper that handles retry, cooldown, abort, and the
`x-internal-pricing-sync` exemption.

## Scope

New file `src/lib/with-account.ts`:

```ts
/**
 * Design sketch: run `fn` against an account acquired from `state.pool`,
 * looping for at most `min(pool size, 3)` attempts. The account is released
 * in `finally` after every attempt, whether it succeeded or threw.
 */
export async function withAccount<T>(
c: Context,
fn: (account: Account) => Promise<T>,
): Promise<T> {
// NOTE(review): `isInternal` is computed here but never read in this sketch.
const isInternal = c.req.header('x-internal-pricing-sync') === '1'
const maxRetries = Math.min(state.pool.size(), 3)
let lastErr: unknown
for (let attempt = 0; attempt < maxRetries; attempt++) {
const account = await state.pool.acquire()
try {
const out = await fn(account)
account.consecutiveFailures = 0
return out
} catch (e) {
lastErr = e
if (isClientError(e)) throw e // 4xx (non-401) — no retry
// NOTE(review): `triggerRefresh` is not awaited, so the next attempt may
// start before the refresh finishes — confirm this is intended.
if (isAuthError(e)) triggerRefresh(account) // 401 — refresh, then retry
else state.pool.markCooldown(account, 30_000) // 5xx / network
} finally {
state.pool.release(account)
}
}
// Every attempt failed with a retryable error: surface the last one.
throw lastErr
}
```

Update each handler to:

```ts
return withAccount(c, async (account) => {
// ...existing logic with `account` threaded into service call...
})
```

Streaming handlers must not retry once the SSE response has begun flushing.
Either:

- Detect "headers already sent" and rethrow without rotating, OR
- Wrap retry only around the `fetch()` upstream call, and once events start
flowing, abort retry.

## Definition of Done

- [ ] `withAccount` is the only place that calls `pool.acquire/release`
outside startup code.
- [ ] Unit test: forcing a 401 once causes one retry against a different
account (use a dummy pool of two accounts).
- [ ] Unit test: forcing a 4xx never retries.
- [ ] Manual smoke: kill one account's token mid-flight; new requests succeed
on the other account; the dead account enters cooldown.
- [ ] Internal `x-internal-pricing-sync: 1` requests bypass nothing in this
task (the exemption only matters for the recorder in task 06).
109 changes: 109 additions & 0 deletions src/lib/with-account.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import type { Context } from "hono"

import consola from "consola"

import type { AccountPool, Account } from "./account-pool"

import { HTTPError } from "./error"
import { state } from "./state"
import { setupCopilotTokenFor } from "./token"

/** Duration, in milliseconds, passed to `pool.markCooldown` when an account is sidelined. */
const COOLDOWN_MS = 30 * 1_000

/** Hard ceiling on attempts per `withAccount` call, whatever the caller requests. */
const MAX_RETRIES_CAP = 3

/** Optional tuning for `withAccount`. */
export interface WithAccountOptions {
/**
 * Requested number of attempts. Defaults to `MAX_RETRIES_CAP` when omitted.
 * `withAccount` clamps the value to at most the number of accounts in the
 * pool and at most `MAX_RETRIES_CAP`, and always makes at least one attempt.
 */
maxRetries?: number
}

/**
 * Refresh the Copilot token for `account` without ever throwing.
 *
 * @param account - Account whose token should be refreshed.
 * @returns `true` when `setupCopilotTokenFor` completed, `false` when it
 *   threw or rejected (the failure is logged, not propagated).
 */
async function safeRefresh(account: Account): Promise<boolean> {
  let refreshed = false
  try {
    await setupCopilotTokenFor(account)
    refreshed = true
  } catch (cause) {
    // Deliberately swallowed: the caller decides what a failed refresh means.
    consola.error(`[${account.name}] token refresh failed:`, cause)
  }
  return refreshed
}

/**
 * Decide what a failed `fn(account)` attempt means for the retry loop.
 *
 * - Non-`HTTPError` (network, timeout, ...): cool the account down; resolve.
 * - HTTP 401: try to refresh the token; cool down only if the refresh fails;
 *   resolve.
 * - Any other 4xx: rethrow `err` so the caller stops retrying.
 * - Every remaining status (5xx in practice): cool the account down; resolve.
 *
 * @param pool - Pool on which the cooldown is recorded.
 * @param account - Account that was used for the failed attempt.
 * @param err - Whatever the attempt threw.
 * @returns Resolves when the caller may continue with the next attempt.
 * @throws The original `err` when it is a non-401 4xx `HTTPError`.
 */
async function handleAttemptError(
  pool: AccountPool,
  account: Account,
  err: unknown,
): Promise<void> {
  // Anything that is not an upstream HTTP response is treated as transient.
  if (!(err instanceof HTTPError)) {
    consola.warn(`[${account.name}] non-HTTP error; cooling down`, err)
    pool.markCooldown(account, COOLDOWN_MS)
    return
  }

  const status = err.response.status

  if (status === 401) {
    consola.warn(
      `[${account.name}] 401 from upstream; refreshing token and retrying`,
    )
    const refreshed = await safeRefresh(account)
    // An account that cannot refresh is sidelined instead of being reused.
    if (!refreshed) pool.markCooldown(account, COOLDOWN_MS)
    return
  }

  const isClientError = status >= 400 && status < 500
  if (isClientError) {
    // Client error: propagate without retry.
    throw err
  }

  consola.warn(
    `[${account.name}] ${status} from upstream; cooling down ${COOLDOWN_MS}ms`,
  )
  pool.markCooldown(account, COOLDOWN_MS)
}

/**
 * Acquire an account from the pool, run `fn`, and on a retryable failure try
 * again with a freshly acquired account, for at most
 * `min(pool size, MAX_RETRIES_CAP)` attempts (always at least one).
 *
 * Retry policy (implemented by `handleAttemptError`):
 * - 4xx (non-401): no retry; client error rethrows immediately.
 * - 401: refresh the account's Copilot token and retry.
 * - 5xx / network: cooldown the account for 30s and retry.
 *
 * Streaming handlers should call `withAccount` ONLY around the upstream
 * fetch — once SSE has started flushing, retry is unsafe.
 *
 * @param c - Request context; currently unused.
 * @param fn - Work to perform with the acquired account.
 * @param options - Optional attempt-budget override; a `NaN` value is
 *   treated as "not specified".
 * @returns The value `fn` resolves to on the first successful attempt.
 * @throws `Error` when the account pool is not initialized; otherwise the
 *   non-retryable error from `fn`, or the last error once attempts run out.
 */
export async function withAccount<T>(
  c: Context | undefined,
  fn: (account: Account) => Promise<T>,
  options: WithAccountOptions = {},
): Promise<T> {
  if (!state.pool) throw new Error("Account pool not initialized")
  const pool = state.pool
  void c // currently unused; kept for parity with design (e.g. internal-call header)

  const requested = options.maxRetries ?? MAX_RETRIES_CAP
  // Math.min/Math.max propagate NaN. Without this guard a NaN override would
  // make the loop bound NaN, skip every attempt, and reject with `undefined`.
  const attemptBudget = Number.isNaN(requested) ? MAX_RETRIES_CAP : requested
  const maxAttempts = Math.max(
    1,
    Math.min(attemptBudget, pool.accounts.length, MAX_RETRIES_CAP),
  )

  let lastErr: unknown
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const account = pool.acquire()
    try {
      const out = await fn(account)
      account.failureCount = 0
      pool.release(account)
      return out
    } catch (err) {
      lastErr = err
      // Release before the (possibly slow) token refresh in
      // handleAttemptError so the account is not held while it runs.
      pool.release(account)
      pool.markFailure(account)
      // Throws for non-retryable errors; resolves to continue the loop.
      await handleAttemptError(pool, account, err)
    }
  }
  throw lastErr
}
18 changes: 5 additions & 13 deletions src/routes/chat-completions/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ import { awaitApproval } from "~/lib/approval"
import { checkRateLimit } from "~/lib/rate-limit"
import { state } from "~/lib/state"
import { getTokenCount } from "~/lib/tokenizer"
import {
isNullish,
makeApiContext,
resolveAndMapModelId,
} from "~/lib/utils"
import { isNullish, makeApiContext, resolveAndMapModelId } from "~/lib/utils"
import { withAccount } from "~/lib/with-account"
import {
createChatCompletions,
type ChatCompletionResponse,
Expand Down Expand Up @@ -55,14 +52,9 @@ export async function handleCompletion(c: Context) {
consola.debug("Set max_tokens to:", JSON.stringify(payload.max_tokens))
}

if (!state.pool) throw new Error("Account pool not initialized")
const account = state.pool.acquire()
let response: Awaited<ReturnType<typeof createChatCompletions>>
try {
response = await createChatCompletions(makeApiContext(account), payload)
} finally {
state.pool.release(account)
}
const response = await withAccount(c, (account) =>
createChatCompletions(makeApiContext(account), payload),
)

if (isNonStreaming(response)) {
consola.debug("Non-streaming response:", JSON.stringify(response))
Expand Down
14 changes: 5 additions & 9 deletions src/routes/embeddings/route.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Hono } from "hono"

import { forwardError } from "~/lib/error"
import { state } from "~/lib/state"
import { makeApiContext } from "~/lib/utils"
import { withAccount } from "~/lib/with-account"
import {
createEmbeddings,
type EmbeddingRequest,
Expand All @@ -13,14 +13,10 @@ export const embeddingRoutes = new Hono()
embeddingRoutes.post("/", async (c) => {
try {
const paylod = await c.req.json<EmbeddingRequest>()
if (!state.pool) throw new Error("Account pool not initialized")
const account = state.pool.acquire()
try {
const response = await createEmbeddings(makeApiContext(account), paylod)
return c.json(response)
} finally {
state.pool.release(account)
}
const response = await withAccount(c, (account) =>
createEmbeddings(makeApiContext(account), paylod),
)
return c.json(response)
} catch (error) {
return await forwardError(c, error)
}
Expand Down
12 changes: 4 additions & 8 deletions src/routes/messages/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { awaitApproval } from "~/lib/approval"
import { checkRateLimit } from "~/lib/rate-limit"
import { state } from "~/lib/state"
import { makeApiContext, resolveAndMapModelId } from "~/lib/utils"
import { withAccount } from "~/lib/with-account"
import {
createChatCompletions,
type ChatCompletionChunk,
Expand Down Expand Up @@ -47,14 +48,9 @@ export async function handleCompletion(c: Context) {
await awaitApproval()
}

if (!state.pool) throw new Error("Account pool not initialized")
const account = state.pool.acquire()
let response: Awaited<ReturnType<typeof createChatCompletions>>
try {
response = await createChatCompletions(makeApiContext(account), openAIPayload)
} finally {
state.pool.release(account)
}
const response = await withAccount(c, (account) =>
createChatCompletions(makeApiContext(account), openAIPayload),
)

if (isNonStreaming(response)) {
consola.debug(
Expand Down
108 changes: 108 additions & 0 deletions tests/with-account.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { test, expect, describe, beforeEach, mock } from "bun:test"

import { AccountPool, type Account } from "../src/lib/account-pool"
import { HTTPError } from "../src/lib/error"
import { state } from "../src/lib/state"
import { withAccount } from "../src/lib/with-account"

// Swap the real token module for a stub so the 401 path's refresh resolves
// immediately instead of reaching the network.
const tokenModuleStub = () => ({
  setupCopilotTokenFor: (_account: Account): Promise<void> =>
    Promise.resolve(),
})
void mock.module("../src/lib/token", tokenModuleStub)

/**
 * Build a pristine test account whose tokens are derived from `name`
 * (`ghu_<name>` / `tok_<name>`) and whose counters all start at zero.
 */
function makeAccount(name: string): Account {
  const account: Account = {
    name,
    accountType: "individual",
    githubToken: "ghu_" + name,
    copilotToken: "tok_" + name,
    copilotTokenRefreshAt: 0,
    inFlight: 0,
    lastUsedAt: 0,
    failureCount: 0,
  }
  return account
}

/** Minimal upstream response with body "err" and the given HTTP status. */
function fakeResp(status: number): Response {
  const init = { status }
  return new Response("err", init)
}

describe("withAccount", () => {
  // Fresh two-account pool per test so cooldowns and failure counts from one
  // test never leak into the next.
  beforeEach(() => {
    state.pool = new AccountPool(
      [makeAccount("a"), makeAccount("b")],
      "round-robin",
    )
  })

  test("returns value on success without retry", async () => {
    const seen: Array<string> = []
    const out = await withAccount(undefined, (account) => {
      seen.push(account.name)
      return Promise.resolve(42)
    })
    expect(out).toBe(42)
    expect(seen).toHaveLength(1)
  })

  test("retries on 5xx with a different account, then succeeds", async () => {
    const seen: Array<string> = []
    const out = await withAccount(undefined, (account) => {
      seen.push(account.name)
      if (seen.length === 1) {
        return Promise.reject(new HTTPError("upstream 503", fakeResp(503)))
      }
      return Promise.resolve("ok")
    })
    expect(out).toBe("ok")
    expect(seen).toHaveLength(2)
    expect(seen[0]).not.toBe(seen[1])
    // The account that failed should be on cooldown. Look it up by the name
    // recorded for the first attempt rather than assuming it sits at index 0.
    const first = state.pool?.accounts.find((a) => a.name === seen[0])
    expect(first).toBeDefined()
    expect(first?.cooldownUntil ?? 0).toBeGreaterThan(Date.now())
  })

  test("4xx client error (non-401) does not retry", async () => {
    const seen: Array<string> = []
    const promise = withAccount(undefined, (account) => {
      seen.push(account.name)
      return Promise.reject(new HTTPError("bad request", fakeResp(400)))
    })
    let thrown: unknown
    try {
      await promise
    } catch (e) {
      thrown = e
    }
    expect(thrown).toBeInstanceOf(HTTPError)
    expect(seen).toHaveLength(1)
  })

  test("401 triggers refresh and retries on a different account", async () => {
    const seen: Array<string> = []
    const out = await withAccount(undefined, (account) => {
      seen.push(account.name)
      if (seen.length === 1) {
        return Promise.reject(new HTTPError("auth", fakeResp(401)))
      }
      return Promise.resolve("ok-after-refresh")
    })
    expect(out).toBe("ok-after-refresh")
    expect(seen).toHaveLength(2)
    // The title promises rotation, so pin it: the retry must not reuse the
    // account that just returned 401.
    expect(seen[0]).not.toBe(seen[1])
  })

  test("retries cap at pool size", async () => {
    const calls: Array<string> = []
    const promise = withAccount(undefined, (account) => {
      calls.push(account.name)
      return Promise.reject(new HTTPError("upstream 502", fakeResp(502)))
    })
    let thrown: unknown
    try {
      await promise
    } catch (e) {
      thrown = e
    }
    expect(thrown).toBeInstanceOf(HTTPError)
    // 2 accounts ⇒ exactly 2 attempts
    expect(calls).toHaveLength(2)
  })
})