Skip to content

Commit 4013b77

Browse files
committed
feat: optimize X-Initiator logic
1 parent bf5b145 commit 4013b77

6 files changed

Lines changed: 376 additions & 46 deletions

File tree

src/lib/billing-cycle.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Billing Cycle Manager
3+
*
4+
* Manages billing cycles to ensure only the first request in a cycle is billed.
5+
*
6+
* Rules:
7+
* - First request in cycle: X-Initiator = user (billed)
8+
* - Subsequent requests: X-Initiator = agent (not billed)
9+
* - Cycle resets after 5 minutes of inactivity (no requests after last response)
10+
* - Concurrent first requests: only ONE is billed
11+
* - Failed requests: do not enter cycle, no billing
12+
*/
13+
14+
const CYCLE_TIMEOUT_MS = 5 * 60 * 1000 // 5 minutes
15+
16+
class BillingCycleManager {
17+
private inCycle: boolean = false
18+
private lastResponseTime: number = 0
19+
private pendingResponses: number = 0
20+
private lock: Promise<void> = Promise.resolve()
21+
22+
/**
23+
* Determines whether the current request should be billed.
24+
* Thread-safe: handles concurrent requests correctly.
25+
*
26+
* @returns 'user' if request should be billed, 'agent' if not
27+
*/
28+
async determineInitiator(): Promise<"user" | "agent"> {
29+
// Acquire lock for thread-safety
30+
await this.acquireLock()
31+
32+
try {
33+
const now = Date.now()
34+
35+
// Check if cycle has timed out (5 minutes since last response)
36+
if (
37+
this.inCycle
38+
&& this.pendingResponses === 0
39+
&& now - this.lastResponseTime > CYCLE_TIMEOUT_MS
40+
) {
41+
this.inCycle = false
42+
}
43+
44+
// Determine billing
45+
if (!this.inCycle) {
46+
// First request in new cycle - bill it
47+
this.inCycle = true
48+
this.pendingResponses++
49+
return "user"
50+
}
51+
52+
// Already in cycle - don't bill
53+
this.pendingResponses++
54+
return "agent"
55+
} finally {
56+
this.releaseLock()
57+
}
58+
}
59+
60+
/**
61+
* Mark a response as complete (for both streaming and non-streaming).
62+
* Updates the last response timestamp.
63+
*/
64+
markResponseComplete(): void {
65+
this.lastResponseTime = Date.now()
66+
this.pendingResponses = Math.max(0, this.pendingResponses - 1)
67+
}
68+
69+
/**
70+
* Mark a request as failed.
71+
* Failed requests do not contribute to the billing cycle.
72+
*/
73+
markRequestFailed(): void {
74+
this.pendingResponses = Math.max(0, this.pendingResponses - 1)
75+
76+
// If this was the first request and it failed, exit the cycle
77+
if (this.pendingResponses === 0 && this.lastResponseTime === 0) {
78+
this.inCycle = false
79+
}
80+
}
81+
82+
/**
83+
* Get current cycle status (for debugging/monitoring)
84+
*/
85+
getStatus(): {
86+
inCycle: boolean
87+
lastResponseTime: number
88+
pendingResponses: number
89+
} {
90+
return {
91+
inCycle: this.inCycle,
92+
lastResponseTime: this.lastResponseTime,
93+
pendingResponses: this.pendingResponses,
94+
}
95+
}
96+
97+
/**
98+
* Reset the billing cycle (for testing purposes)
99+
*/
100+
reset(): void {
101+
this.inCycle = false
102+
this.lastResponseTime = 0
103+
this.pendingResponses = 0
104+
}
105+
106+
// Simple async lock implementation
107+
private async acquireLock(): Promise<void> {
108+
const currentLock = this.lock
109+
let releaseLock!: () => void
110+
this.lock = new Promise((resolve) => {
111+
releaseLock = resolve
112+
})
113+
await currentLock
114+
this.releaseLock = releaseLock
115+
}
116+
117+
private releaseLock: () => void = () => {}
118+
}
119+
120+
// Singleton instance
121+
export const billingCycleManager = new BillingCycleManager()

src/routes/chat-completions/handler.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import consola from "consola"
44
import { streamSSE, type SSEMessage } from "hono/streaming"
55

66
import { awaitApproval } from "~/lib/approval"
7+
import { billingCycleManager } from "~/lib/billing-cycle"
78
import { checkRateLimit } from "~/lib/rate-limit"
89
import { state } from "~/lib/state"
910
import { getTokenCount } from "~/lib/tokenizer"
@@ -56,9 +57,17 @@ export async function handleCompletion(c: Context) {
5657

5758
consola.debug("Streaming response")
5859
return streamSSE(c, async (stream) => {
59-
for await (const chunk of response) {
60-
consola.debug("Streaming chunk:", JSON.stringify(chunk))
61-
await stream.writeSSE(chunk as SSEMessage)
60+
try {
61+
for await (const chunk of response) {
62+
consola.debug("Streaming chunk:", JSON.stringify(chunk))
63+
await stream.writeSSE(chunk as SSEMessage)
64+
}
65+
// Mark response complete after all chunks are sent
66+
billingCycleManager.markResponseComplete()
67+
} catch (error) {
68+
// If streaming fails, mark request as failed
69+
billingCycleManager.markRequestFailed()
70+
throw error
6271
}
6372
})
6473
}

src/routes/messages/handler.ts

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import consola from "consola"
44
import { streamSSE } from "hono/streaming"
55

66
import { awaitApproval } from "~/lib/approval"
7+
import { billingCycleManager } from "~/lib/billing-cycle"
78
import { checkRateLimit } from "~/lib/rate-limit"
89
import { state } from "~/lib/state"
910
import {
@@ -62,26 +63,34 @@ export async function handleCompletion(c: Context) {
6263
toolCalls: {},
6364
}
6465

65-
for await (const rawEvent of response) {
66-
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
67-
if (rawEvent.data === "[DONE]") {
68-
break
69-
}
66+
try {
67+
for await (const rawEvent of response) {
68+
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
69+
if (rawEvent.data === "[DONE]") {
70+
break
71+
}
7072

71-
if (!rawEvent.data) {
72-
continue
73-
}
73+
if (!rawEvent.data) {
74+
continue
75+
}
7476

75-
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
76-
const events = translateChunkToAnthropicEvents(chunk, streamState)
77+
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
78+
const events = translateChunkToAnthropicEvents(chunk, streamState)
7779

78-
for (const event of events) {
79-
consola.debug("Translated Anthropic event:", JSON.stringify(event))
80-
await stream.writeSSE({
81-
event: event.type,
82-
data: JSON.stringify(event),
83-
})
80+
for (const event of events) {
81+
consola.debug("Translated Anthropic event:", JSON.stringify(event))
82+
await stream.writeSSE({
83+
event: event.type,
84+
data: JSON.stringify(event),
85+
})
86+
}
8487
}
88+
// Mark response complete after all chunks are sent
89+
billingCycleManager.markResponseComplete()
90+
} catch (error) {
91+
// If streaming fails, mark request as failed
92+
billingCycleManager.markRequestFailed()
93+
throw error
8594
}
8695
})
8796
}

src/services/copilot/create-chat-completions.ts

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import consola from "consola"
22
import { events } from "fetch-event-stream"
33

44
import { copilotHeaders, copilotBaseUrl } from "~/lib/api-config"
5+
import { billingCycleManager } from "~/lib/billing-cycle"
56
import { HTTPError } from "~/lib/error"
67
import { state } from "~/lib/state"
78

@@ -16,34 +17,43 @@ export const createChatCompletions = async (
1617
&& x.content?.some((x) => x.type === "image_url"),
1718
)
1819

19-
// Agent/user check for X-Initiator header
20-
// Determine if any message is from an agent ("assistant" or "tool")
21-
const isAgentCall = payload.messages.some((msg) =>
22-
["assistant", "tool"].includes(msg.role),
23-
)
20+
// Determine X-Initiator based on billing cycle
21+
const initiator = await billingCycleManager.determineInitiator()
2422

2523
// Build headers and add X-Initiator
2624
const headers: Record<string, string> = {
2725
...copilotHeaders(state, enableVision),
28-
"X-Initiator": isAgentCall ? "agent" : "user",
26+
"X-Initiator": initiator,
2927
}
3028

31-
const response = await fetch(`${copilotBaseUrl(state)}/chat/completions`, {
32-
method: "POST",
33-
headers,
34-
body: JSON.stringify(payload),
35-
})
36-
37-
if (!response.ok) {
38-
consola.error("Failed to create chat completions", response)
39-
throw new HTTPError("Failed to create chat completions", response)
29+
let response: Response
30+
try {
31+
response = await fetch(`${copilotBaseUrl(state)}/chat/completions`, {
32+
method: "POST",
33+
headers,
34+
body: JSON.stringify(payload),
35+
})
36+
37+
if (!response.ok) {
38+
consola.error("Failed to create chat completions", response)
39+
billingCycleManager.markRequestFailed()
40+
throw new HTTPError("Failed to create chat completions", response)
41+
}
42+
} catch (error) {
43+
billingCycleManager.markRequestFailed()
44+
throw error
4045
}
4146

42-
if (payload.stream) {
43-
return events(response)
47+
// For non-streaming, mark as complete immediately after receiving response
48+
if (!payload.stream) {
49+
const result = (await response.json()) as ChatCompletionResponse
50+
billingCycleManager.markResponseComplete()
51+
return result
4452
}
4553

46-
return (await response.json()) as ChatCompletionResponse
54+
// For streaming, return the event stream
55+
// Note: The caller (route handler) must call markResponseComplete() after stream ends
56+
return events(response)
4757
}
4858

4959
// Streaming types

0 commit comments

Comments
 (0)