From fd84ee2c52d0ea894b784d3e58c0f76fda347d47 Mon Sep 17 00:00:00 2001 From: 202226442 <202226442@any3.com> Date: Fri, 24 Apr 2026 22:41:49 +0800 Subject: [PATCH] =?UTF-8?q?feat(server):=20=E6=B7=BB=E5=8A=A0=E5=AF=B9Ling?= =?UTF-8?q?ma=E6=8F=90=E4=BE=9B=E5=95=86=E7=9A=84=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=B9=B6=E9=87=8D=E6=9E=84=E8=81=8A=E5=A4=A9=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加新的Provider类型定义,支持copilot和lingma两种提供商 - 实现Lingma提供商的连接、初始化和聊天完成功能 - 重构createChatCompletions服务以支持多提供商架构 - 在start命令中添加provider参数和相关配置选项 - 为Lingma提供商实现模型缓存和配置解析功能 - 添加APIError类用于更精确的错误处理 - 在各个路由中添加提供商特定的功能限制检查 - 实现Lingma聊天完成的验证和渲染逻辑 - 更新状态管理以支持新的提供商模式 --- src/lib/error.ts | 27 + src/lib/state.ts | 6 + src/lib/utils.ts | 13 + src/routes/chat-completions/handler.ts | 2 +- src/routes/embeddings/route.ts | 11 +- src/routes/messages/route.ts | 19 +- src/routes/models/route.ts | 4 +- src/routes/token/route.ts | 12 +- src/routes/usage/route.ts | 13 +- src/services/create-chat-completions.ts | 28 + src/services/lingma/acp.ts | 41 ++ src/services/lingma/config.ts | 39 ++ .../lingma/create-chat-completions.ts | 515 ++++++++++++++++++ src/services/lingma/json-rpc.ts | 321 +++++++++++ src/services/lingma/models.ts | 44 ++ src/services/lingma/provider.ts | 49 ++ src/start.ts | 75 ++- tests/lingma-provider.test.ts | 314 +++++++++++ 18 files changed, 1511 insertions(+), 22 deletions(-) create mode 100644 src/services/create-chat-completions.ts create mode 100644 src/services/lingma/acp.ts create mode 100644 src/services/lingma/config.ts create mode 100644 src/services/lingma/create-chat-completions.ts create mode 100644 src/services/lingma/json-rpc.ts create mode 100644 src/services/lingma/models.ts create mode 100644 src/services/lingma/provider.ts create mode 100644 tests/lingma-provider.test.ts diff --git a/src/lib/error.ts b/src/lib/error.ts index c39c22596..15dc63759 100644 --- a/src/lib/error.ts +++ b/src/lib/error.ts @@ -12,9 +12,36 @@ export class HTTPError extends Error { } } +export class APIError extends Error { + status: ContentfulStatusCode + type: string + + constructor( + message: string, + status: ContentfulStatusCode = 500, + type = "error", + ) { + super(message) + this.status = status + this.type = type + } +} + export async function forwardError(c: Context, error: unknown) { consola.error("Error occurred:", error) + if (error instanceof APIError) { + return c.json( + { + error: { + message: error.message, + type: error.type, + }, + }, + error.status, + ) + } + if (error instanceof HTTPError) { const errorText = await error.response.text() let errorJson: unknown diff --git a/src/lib/state.ts b/src/lib/state.ts index 5ba4dc1d1..a3c9fd2ac 100644 --- a/src/lib/state.ts +++ b/src/lib/state.ts @@ -1,8 +1,13 @@ import type { ModelsResponse } from "~/services/copilot/get-models" +import type { LingmaRpcClient } from "~/services/lingma/json-rpc" + +export type Provider = "copilot" | "lingma" export interface State { + provider: Provider githubToken?: string copilotToken?: string + lingmaClient?: LingmaRpcClient accountType: string models?: ModelsResponse @@ -18,6 +23,7 @@ export interface State { } export const state: State = { + provider: "copilot", accountType: "individual", manualApprove: false, rateLimitWait: false, diff --git a/src/lib/utils.ts b/src/lib/utils.ts index cc80be667..6ecf39a19 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -2,6 +2,10 @@ import consola from "consola" import { getModels } from "~/services/copilot/get-models" import { getVSCodeVersion } from "~/services/get-vscode-version" +import { + createLingmaModels, + parseLingmaModelIds, +} from "~/services/lingma/models" import { state } from "./state" @@ -18,6 +22,15 @@ export async function cacheModels(): Promise { state.models = models } +export async function cacheProviderModels(): Promise { + if (state.provider === "lingma") { + state.models = createLingmaModels(parseLingmaModelIds()) + return + } + + await cacheModels() +} + export const cacheVSCodeVersion = async () => { const response = await getVSCodeVersion() state.vsCodeVersion = response diff --git a/src/routes/chat-completions/handler.ts b/src/routes/chat-completions/handler.ts index 04a5ae9ed..e3ef6e52c 100644 --- a/src/routes/chat-completions/handler.ts +++ b/src/routes/chat-completions/handler.ts @@ -12,7 +12,7 @@ import { createChatCompletions, type ChatCompletionResponse, type ChatCompletionsPayload, -} from "~/services/copilot/create-chat-completions" +} from "~/services/create-chat-completions" export async function handleCompletion(c: Context) { await checkRateLimit(state) diff --git a/src/routes/embeddings/route.ts b/src/routes/embeddings/route.ts index 4c4fc7b8a..e7681f14f 100644 --- a/src/routes/embeddings/route.ts +++ b/src/routes/embeddings/route.ts @@ -1,6 +1,7 @@ import { Hono } from "hono" -import { forwardError } from "~/lib/error" +import { APIError, forwardError } from "~/lib/error" +import { state } from "~/lib/state" import { createEmbeddings, type EmbeddingRequest, @@ -10,6 +11,14 @@ export const embeddingRoutes = new Hono() embeddingRoutes.post("/", async (c) => { try { + if (state.provider === "lingma") { + throw new APIError( + "Embeddings are not supported by the Lingma provider in V1", + 501, + "unsupported_feature", + ) + } + const paylod = await c.req.json() const response = await createEmbeddings(paylod) diff --git a/src/routes/messages/route.ts b/src/routes/messages/route.ts index ef72d802e..54f71006c 100644 --- a/src/routes/messages/route.ts +++ b/src/routes/messages/route.ts @@ -1,6 +1,7 @@ import { Hono } from "hono" -import { forwardError } from "~/lib/error" +import { APIError, forwardError } from "~/lib/error" +import { state } from "~/lib/state" import { handleCountTokens } from "./count-tokens-handler" import { handleCompletion } from "./handler" @@ -9,6 +10,14 @@ export const messageRoutes = new Hono() messageRoutes.post("/", async (c) => { try { + if (state.provider === "lingma") { + throw new APIError( + "Anthropic messages are not supported by the Lingma provider in V1", + 501, + "unsupported_feature", + ) + } + return await handleCompletion(c) } catch (error) { return await forwardError(c, error) @@ -17,6 +26,14 @@ messageRoutes.post("/", async (c) => { messageRoutes.post("/count_tokens", async (c) => { try { + if (state.provider === "lingma") { + throw new APIError( + "Token counting is not supported by the Lingma provider in V1", + 501, + "unsupported_feature", + ) + } + return await handleCountTokens(c) } catch (error) { return await forwardError(c, error) diff --git a/src/routes/models/route.ts b/src/routes/models/route.ts index 5254e2af7..c5278bd61 100644 --- a/src/routes/models/route.ts +++ b/src/routes/models/route.ts @@ -2,7 +2,7 @@ import { Hono } from "hono" import { forwardError } from "~/lib/error" import { state } from "~/lib/state" -import { cacheModels } from "~/lib/utils" +import { cacheProviderModels } from "~/lib/utils" export const modelRoutes = new Hono() @@ -10,7 +10,7 @@ modelRoutes.get("/", async (c) => { try { if (!state.models) { // This should be handled by startup logic, but as a fallback. - await cacheModels() + await cacheProviderModels() } const models = state.models?.data.map((model) => ({ diff --git a/src/routes/token/route.ts b/src/routes/token/route.ts index dd0456d9a..23861f49b 100644 --- a/src/routes/token/route.ts +++ b/src/routes/token/route.ts @@ -1,16 +1,24 @@ import { Hono } from "hono" +import { APIError, forwardError } from "~/lib/error" import { state } from "~/lib/state" export const tokenRoute = new Hono() tokenRoute.get("/", (c) => { try { + if (state.provider === "lingma") { + throw new APIError( + "Token inspection is not supported by the Lingma provider in V1", + 501, + "unsupported_feature", + ) + } + return c.json({ token: state.copilotToken, }) } catch (error) { - console.error("Error fetching token:", error) - return c.json({ error: "Failed to fetch token", token: null }, 500) + return forwardError(c, error) } }) diff --git a/src/routes/usage/route.ts b/src/routes/usage/route.ts index 3e9473236..c0a94e67a 100644 --- a/src/routes/usage/route.ts +++ b/src/routes/usage/route.ts @@ -1,15 +1,24 @@ import { Hono } from "hono" +import { APIError, forwardError } from "~/lib/error" +import { state } from "~/lib/state" import { getCopilotUsage } from "~/services/github/get-copilot-usage" export const usageRoute = new Hono() usageRoute.get("/", async (c) => { try { + if (state.provider === "lingma") { + throw new APIError( + "Usage is not supported by the Lingma provider in V1", + 501, + "unsupported_feature", + ) + } + const usage = await getCopilotUsage() return c.json(usage) } catch (error) { - console.error("Error fetching Copilot usage:", error) - return c.json({ error: "Failed to fetch Copilot usage" }, 500) + return await forwardError(c, error) } }) diff --git a/src/services/create-chat-completions.ts b/src/services/create-chat-completions.ts new file mode 100644 index 000000000..fbabb9f2e --- /dev/null +++ b/src/services/create-chat-completions.ts @@ -0,0 +1,28 @@ +import { state } from "~/lib/state" + +import type { ChatCompletionsPayload } from "./copilot/create-chat-completions" + +import { createChatCompletions as createCopilotChatCompletions } from "./copilot/create-chat-completions" +import { createLingmaChatCompletions } from "./lingma/create-chat-completions" + +export type { + ChatCompletionChunk, + ChatCompletionResponse, + ChatCompletionsPayload, + ContentPart, + ImagePart, + Message, + TextPart, + Tool, + ToolCall, +} from "./copilot/create-chat-completions" + +export const createChatCompletions = async ( + payload: ChatCompletionsPayload, +) => { + if (state.provider === "lingma") { + return await createLingmaChatCompletions(payload) + } + + return await createCopilotChatCompletions(payload) +} diff --git a/src/services/lingma/acp.ts b/src/services/lingma/acp.ts new file mode 100644 index 000000000..69a622048 --- /dev/null +++ b/src/services/lingma/acp.ts @@ -0,0 +1,41 @@ +import { basename } from "node:path" +import { pathToFileURL } from "node:url" + +import type { LingmaRpcClient } from "./json-rpc" + +const initializedGenerations = new WeakMap() + +export async function ensureLingmaAcpInitialized( + client: LingmaRpcClient, + workspacePath = process.cwd(), +): Promise { + await client.connect() + if (initializedGenerations.get(client) === client.generation) return + + const rootUri = pathToFileURL(workspacePath).href + await client.request("initialize", { + processId: process.pid, + rootUri, + rootPath: workspacePath, + workspaceFolders: [ + { + uri: rootUri, + name: basename(workspacePath), + }, + ], + capabilities: { + workspace: { + workspaceFolders: true, + configuration: true, + }, + }, + clientInfo: { + name: "copilot-api", + version: "0.0.0", + }, + allowStatistics: false, + configuration: {}, + }) + + initializedGenerations.set(client, client.generation) +} diff --git a/src/services/lingma/config.ts b/src/services/lingma/config.ts new file mode 100644 index 000000000..7ae50255c --- /dev/null +++ b/src/services/lingma/config.ts @@ -0,0 +1,39 @@ +import { readFile } from "node:fs/promises" +import { homedir } from "node:os" +import { join } from "node:path" + +export interface LingmaConnectionOptions { + cacheDir?: string + wsUrl?: string +} + +interface LingmaInfoFile { + websocketPort?: number +} + +export function getDefaultLingmaCacheDir(): string { + return join( + homedir(), + "Library", + "Application Support", + "Lingma", + "SharedClientCache", + ) +} + +export async function resolveLingmaWebSocketUrl( + options: LingmaConnectionOptions, +): Promise { + if (options.wsUrl) return options.wsUrl + + const cacheDir = options.cacheDir ?? getDefaultLingmaCacheDir() + const infoPath = join(cacheDir, ".info.json") + const raw = await readFile(infoPath, "utf8") + const info = JSON.parse(raw) as LingmaInfoFile + + if (!info.websocketPort) { + throw new Error(`Lingma websocketPort not found in ${infoPath}`) + } + + return `ws://127.0.0.1:${info.websocketPort}` +} diff --git a/src/services/lingma/create-chat-completions.ts b/src/services/lingma/create-chat-completions.ts new file mode 100644 index 000000000..bbcc4630d --- /dev/null +++ b/src/services/lingma/create-chat-completions.ts @@ -0,0 +1,515 @@ +import consola from "consola" + +import type { + ChatCompletionResponse, + ChatCompletionsPayload, + ContentPart, + Message, +} from "~/services/copilot/create-chat-completions" + +import { APIError } from "~/lib/error" +import { state } from "~/lib/state" + +import { ensureLingmaAcpInitialized } from "./acp" + +interface LingmaSessionNewResponse { + sessionId?: string +} + +interface LingmaSessionPromptResponse { + requestId?: string + success?: boolean + errorCode?: string + errorMessage?: string + data?: unknown + result?: unknown + stopReason?: string +} + +interface LingmaAcpPromptPayload { + sessionId: string + _meta: Record + prompt: Array<{ + type: "text" + text: string + }> +} + +interface LingmaAnswerCollector { + chunks: Array + finish: () => void + finishWithError: (error: Error) => void +} + +const COMPLETION_TIMEOUT_MS = 120_000 +const ACP_REQUEST_ID_KEY = "ai-coding/request-id" +const ACP_MODEL_KEY = "ai-coding/model" +const ACP_MODE_KEY = "ai-coding/mode" + +let lingmaChatQueue: Promise = Promise.resolve() + +export async function createLingmaChatCompletions( + payload: ChatCompletionsPayload, +): Promise { + const run = lingmaChatQueue.then( + () => createLingmaChatCompletionsUnsafe(payload), + () => createLingmaChatCompletionsUnsafe(payload), + ) + lingmaChatQueue = run.then( + () => undefined, + () => undefined, + ) + + return await run +} + +export function validateLingmaChatPayload( + payload: ChatCompletionsPayload, +): void { + if (payload.stream) { + throw new APIError( + "Lingma provider does not support stream: true in V1", + 501, + "unsupported_feature", + ) + } + + if (payload.n && payload.n > 1) { + throw new APIError("Lingma provider only supports n: 1", 400) + } + + if (payload.response_format) { + throw new APIError( + "Lingma provider does not support response_format in V1", + 501, + "unsupported_feature", + ) + } + + if (payload.tools?.length) { + throw new APIError( + "Lingma provider does not support tools in V1", + 501, + "unsupported_feature", + ) + } + + if (payload.tool_choice && payload.tool_choice !== "none") { + throw new APIError( + "Lingma provider does not support tool_choice in V1", + 501, + "unsupported_feature", + ) + } + + for (const message of payload.messages) { + if (message.tool_calls?.length) { + throw new APIError( + "Lingma provider does not support tool calls in V1", + 501, + "unsupported_feature", + ) + } + + if (Array.isArray(message.content)) { + const hasImage = message.content.some((part) => part.type === "image_url") + if (hasImage) { + throw new APIError( + "Lingma provider does not support image inputs in V1", + 501, + "unsupported_feature", + ) + } + } + } +} + +export function renderLingmaPrompt(payload: ChatCompletionsPayload): string { + const prompt = payload.messages + .map((message) => renderMessage(message)) + .filter((message) => message.length > 0) + .join("\n\n") + + if (!prompt) throw new APIError("Lingma prompt cannot be empty", 400) + + return prompt +} + +async function createLingmaChatCompletionsUnsafe( + payload: ChatCompletionsPayload, +): Promise { + validateLingmaChatPayload(payload) + + const client = state.lingmaClient + if (!client) { + throw new APIError("Lingma RPC client is not initialized", 500) + } + + const requestId = crypto.randomUUID() + const content = renderLingmaPrompt(payload) + const answer = await runLingmaAcpCompletion(requestId, payload.model, content) + + const created = Math.floor(Date.now() / 1000) + return { + id: `chatcmpl-${requestId}`, + object: "chat.completion", + created, + model: payload.model, + choices: [ + { + index: 0, + message: { + role: "assistant", + content: answer, + }, + logprobs: null, + finish_reason: "stop", + }, + ], + } +} + +async function runLingmaAcpCompletion( + requestId: string, + model: string, + content: string, +): Promise { + const client = state.lingmaClient + if (!client) throw new APIError("Lingma RPC client is not initialized", 500) + + await ensureLingmaAcpInitialized(client) + + const session = await client.request( + "session/new", + { + cwd: process.cwd(), + mcpServers: [], + _meta: {}, + }, + ) + if (!session.sessionId) { + throw new APIError("Lingma session/new did not return a sessionId", 502) + } + + const promptPayload: LingmaAcpPromptPayload = { + sessionId: session.sessionId, + _meta: { + [ACP_REQUEST_ID_KEY]: requestId, + [ACP_MODEL_KEY]: model, + [ACP_MODE_KEY]: "", + }, + prompt: [{ type: "text", text: content }], + } + + return await collectLingmaAnswer( + requestId, + session.sessionId, + async (expectedIds) => { + const response = await client.request( + "session/prompt", + promptPayload, + COMPLETION_TIMEOUT_MS, + ) + + if (response.requestId) expectedIds.add(response.requestId) + if (response.success === false) { + throw new APIError( + response.errorMessage + || response.errorCode + || "Lingma chat request failed", + 502, + ) + } + + return extractDirectAnswer(response) + }, + ) +} + +async function collectLingmaAnswer( + requestId: string, + sessionId: string, + sendRequest: (expectedIds: Set) => Promise, +): Promise { + const client = state.lingmaClient + if (!client) throw new APIError("Lingma RPC client is not initialized", 500) + + const expectedIds = new Set([requestId]) + const chunks: Array = [] + let settled = false + + return await new Promise((resolve, reject) => { + const cleanupCallbacks: Array<() => void> = [] + + const cleanup = () => { + for (const cleanupCallback of cleanupCallbacks) cleanupCallback() + } + + const timer = setTimeout(() => { + if (settled) return + settled = true + cleanup() + reject(new APIError("Lingma chat completion timed out", 504, "timeout")) + }, COMPLETION_TIMEOUT_MS) + + const finish = () => { + if (settled) return + settled = true + clearTimeout(timer) + cleanup() + resolve(chunks.join("")) + } + + const finishWithError = (error: Error) => { + if (settled) return + settled = true + clearTimeout(timer) + cleanup() + reject(error) + } + + cleanupCallbacks.push( + client.onNotification("session/update", (params) => { + if (!matchesExpectedRequest(params, expectedIds, sessionId)) return + handleSessionUpdate(params, { + chunks, + finish, + finishWithError, + }) + }), + client.onNotification("chat/answer", (params) => { + if (!matchesExpectedRequest(params, expectedIds)) return + const text = extractText(params) + if (text) appendLingmaAnswerText(chunks, text) + }), + client.onNotification("chat/finish", (params) => { + if (!matchesExpectedRequest(params, expectedIds)) return + const text = extractText(params) + if (text) appendLingmaAnswerText(chunks, text) + finish() + }), + client.onNotification("chat/error", (params) => { + if (!matchesExpectedRequest(params, expectedIds)) return + finishWithError( + new APIError( + extractErrorMessage(params) ?? "Lingma chat failed", + 502, + ), + ) + }), + client.onNotification("chat/think", (params) => { + if (matchesExpectedRequest(params, expectedIds)) { + consola.debug("Lingma thinking event:", params) + } + }), + ) + + sendRequest(expectedIds) + .then((directAnswer) => { + if (!directAnswer) return + appendLingmaAnswerText(chunks, directAnswer) + finish() + }) + .catch((error: unknown) => { + finishWithError( + error instanceof Error ? error : new Error(String(error)), + ) + }) + }) +} + +function handleSessionUpdate( + params: unknown, + collector: LingmaAnswerCollector, +): void { + const update = getRecordValue(params, "update") + const sessionUpdate = extractKnownString(update, ["sessionUpdate"]) + + if (sessionUpdate === "agent_message_chunk") { + const text = extractText(getRecordValue(update, "content")) + if (text) appendLingmaAnswerText(collector.chunks, text) + return + } + + if (sessionUpdate === "agent_thought_chunk") { + consola.debug("Lingma thinking event:", params) + return + } + + if (sessionUpdate !== "notification") return + + const type = extractKnownString(update, ["type"]) + if (type === "chat_finish") { + const data = getRecordValue(update, "data") + const fullAnswer = extractKnownString(data, ["fullAnswer"]) + if (fullAnswer) appendLingmaAnswerText(collector.chunks, fullAnswer) + + const statusCode = extractStatusCode(data) + const reason = extractKnownString(data, ["reason"]) + if ((statusCode && statusCode >= 400) || isFailureReason(reason)) { + collector.finishWithError( + new APIError( + extractErrorMessage(data) ?? reason ?? "Lingma chat failed", + 502, + ), + ) + return + } + + collector.finish() + return + } + + if (type === "chat_error" || type === "error") { + collector.finishWithError( + new APIError(extractErrorMessage(update) ?? "Lingma chat failed", 502), + ) + } +} + +function renderMessage(message: Message): string { + const content = renderContent(message.content) + if (!content) return "" + + const name = message.name ? ` ${message.name}` : "" + const toolCallId = + message.tool_call_id ? ` tool_call_id=${message.tool_call_id}` : "" + return `${message.role.toUpperCase()}${name}${toolCallId}:\n${content}` +} + +function renderContent(content: Message["content"]): string { + if (typeof content === "string") return content + if (!content) return "" + + return content + .map((part) => renderContentPart(part)) + .filter(Boolean) + .join("\n") +} + +function renderContentPart(part: ContentPart): string { + if (part.type === "text") return part.text + throw new APIError( + "Lingma provider does not support image inputs in V1", + 501, + "unsupported_feature", + ) +} + +function extractDirectAnswer( + response: LingmaSessionPromptResponse, +): string | undefined { + return extractText(response.data) ?? extractText(response.result) +} + +function appendLingmaAnswerText(chunks: Array, text: string): void { + if (!text) return + const current = chunks.join("") + if (current && text.startsWith(current)) { + chunks.splice(0, chunks.length, text) + return + } + chunks.push(text) +} + +function matchesExpectedRequest( + params: unknown, + expectedIds: Set, + expectedSessionId?: string, +): boolean { + const eventRequestId = extractRequestId(params) + if (eventRequestId && !expectedIds.has(eventRequestId)) return false + + const eventSessionId = extractSessionId(params) + if ( + expectedSessionId + && eventSessionId + && eventSessionId !== expectedSessionId + ) { + return false + } + + if (expectedSessionId) return Boolean(eventRequestId || eventSessionId) + return !eventRequestId || expectedIds.has(eventRequestId) +} + +function extractRequestId(value: unknown): string | undefined { + return extractKnownString(value, [ + ACP_REQUEST_ID_KEY, + "requestId", + "request_id", + "id", + ]) +} + +function extractSessionId(value: unknown): string | undefined { + return extractKnownString(value, ["sessionId", "session_id"]) +} + +function extractText(value: unknown): string | undefined { + if (typeof value === "string") return value + + return extractKnownString(value, ["content", "text", "answer", "delta"]) +} + +function extractErrorMessage(value: unknown): string | undefined { + return extractKnownString(value, [ + "errorMessage", + "message", + "error", + "errorCode", + ]) +} + +function extractStatusCode(value: unknown): number | undefined { + if (!isRecord(value)) return undefined + + const statusCode = value.statusCode + if (typeof statusCode === "number") return statusCode + + return undefined +} + +function isFailureReason(reason: string | undefined): boolean { + return Boolean(reason && !["end_turn", "stop", "success"].includes(reason)) +} + +function extractKnownString( + value: unknown, + keys: Array, + depth = 0, +): string | undefined { + if (depth > 3 || !isRecord(value)) return undefined + + for (const key of keys) { + const candidate = value[key] + if (typeof candidate === "string" && candidate.length > 0) { + return candidate + } + } + + for (const key of [ + "_meta", + "data", + "result", + "payload", + "message", + "content", + "update", + ]) { + const nested = value[key] + const candidate = extractKnownString(nested, keys, depth + 1) + if (candidate) return candidate + } + + return undefined +} + +function getRecordValue(value: unknown, key: string): unknown { + if (!isRecord(value)) return undefined + return value[key] +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null +} diff --git a/src/services/lingma/json-rpc.ts b/src/services/lingma/json-rpc.ts new file mode 100644 index 000000000..ea9acbbac --- /dev/null +++ b/src/services/lingma/json-rpc.ts @@ -0,0 +1,321 @@ +import consola from "consola" +import { Buffer } from "node:buffer" + +import { APIError } from "~/lib/error" + +export type JsonRpcId = number | string + +export interface JsonRpcRequestMessage { + jsonrpc: "2.0" + id: JsonRpcId + method: string + params?: unknown +} + +export interface JsonRpcNotificationMessage { + jsonrpc: "2.0" + method: string + params?: unknown +} + +export interface JsonRpcResponseMessage { + jsonrpc: "2.0" + id: JsonRpcId + result?: unknown + error?: { + code?: number + message?: string + data?: unknown + } +} + +type JsonRpcOutgoingMessage = JsonRpcNotificationMessage | JsonRpcRequestMessage +type JsonRpcIncomingMessage = + | JsonRpcNotificationMessage + | JsonRpcResponseMessage +type NotificationHandler = ( + params: unknown, + message: JsonRpcNotificationMessage, +) => void + +interface PendingRequest { + resolve: (value: unknown) => void + reject: (reason: Error) => void + timer: ReturnType +} + +const HEADER_DELIMITER = Buffer.from("\r\n\r\n") +const DEFAULT_REQUEST_TIMEOUT_MS = 30_000 + +export function encodeJsonRpcFrame(message: JsonRpcOutgoingMessage): string { + const payload = JSON.stringify(message) + return `Content-Length: ${Buffer.byteLength(payload, "utf8")}\r\n\r\n${payload}` +} + +export class JsonRpcFrameParser { + private buffer = Buffer.alloc(0) + + push(chunk: unknown): Array { + this.buffer = Buffer.concat([this.buffer, toBuffer(chunk)]) + + const messages: Array = [] + while (true) { + const headerEnd = this.buffer.indexOf(HEADER_DELIMITER) + if (headerEnd === -1) break + + const header = this.buffer.subarray(0, headerEnd).toString("ascii") + const match = /(?:^|\r\n)Content-Length:\s*(\d+)/i.exec(header) + if (!match?.[1]) throw new Error("Invalid Lingma JSON-RPC frame header") + + const contentLength = Number.parseInt(match[1], 10) + const bodyStart = headerEnd + HEADER_DELIMITER.length + const frameEnd = bodyStart + contentLength + if (this.buffer.length < frameEnd) break + + const body = this.buffer.subarray(bodyStart, frameEnd).toString("utf8") + this.buffer = this.buffer.subarray(frameEnd) + messages.push(parseJsonRpcMessage(body)) + } + + return messages + } + + reset(): void { + this.buffer = Buffer.alloc(0) + } +} + +export class LingmaRpcClient { + private socket?: WebSocket + private connectPromise?: Promise + private readonly parser = new JsonRpcFrameParser() + private readonly requestTimeoutMs: number + private readonly url: string + private connectionGeneration = 0 + private requestId = 0 + private readonly pending = new Map() + private readonly notificationHandlers = new Map< + string, + Set + >() + + constructor(url: string, requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS) { + this.url = url + this.requestTimeoutMs = requestTimeoutMs + } + + get generation(): number { + return this.connectionGeneration + } + + async connect(): Promise { + if (this.socket?.readyState === 1) return + if (this.connectPromise) { + await this.connectPromise + return + } + + this.connectPromise = new Promise((resolve, reject) => { + const socket = new WebSocket(this.url) + this.socket = socket + + socket.addEventListener("open", () => { + this.connectionGeneration += 1 + this.connectPromise = undefined + resolve() + }) + + socket.addEventListener("error", () => { + const error = new APIError( + `Failed to connect to Lingma RPC at ${this.url}`, + 502, + ) + this.connectPromise = undefined + reject(error) + }) + + socket.addEventListener("close", () => { + const error = new APIError("Lingma RPC connection closed", 502) + this.socket = undefined + this.connectPromise = undefined + this.parser.reset() + this.rejectPending(error) + }) + + socket.addEventListener("message", (event) => { + this.handleSocketMessage(event.data as unknown) + }) + }) + + await this.connectPromise + } + + async request( + method: string, + params?: unknown, + timeoutMs = this.requestTimeoutMs, + ): Promise { + await this.connect() + + const id = this.requestId++ + const message: JsonRpcRequestMessage = { + jsonrpc: "2.0", + id, + method, + params, + } + + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending.delete(id) + reject(new APIError(`Lingma RPC request timed out: ${method}`, 504)) + }, timeoutMs) + + this.pending.set(id, { + resolve: (value) => { + resolve(value as T) + }, + reject, + timer, + }) + + try { + this.send(message) + } catch (error) { + clearTimeout(timer) + this.pending.delete(id) + reject(error instanceof Error ? error : new Error(String(error))) + } + }) + } + + onNotification(method: string, handler: NotificationHandler): () => void { + const handlers = this.notificationHandlers.get(method) ?? new Set() + handlers.add(handler) + this.notificationHandlers.set(method, handlers) + + return () => { + handlers.delete(handler) + if (handlers.size === 0) this.notificationHandlers.delete(method) + } + } + + async notify(method: string, params?: unknown): Promise { + await this.connect() + this.send({ + jsonrpc: "2.0", + method, + params, + }) + } + + close(): void { + this.rejectPending(new APIError("Lingma RPC connection closed", 502)) + this.socket?.close() + this.socket = undefined + this.connectPromise = undefined + this.parser.reset() + } + + private send(message: JsonRpcOutgoingMessage): void { + if (this.socket?.readyState !== 1) { + throw new APIError("Lingma RPC is not connected", 502) + } + + this.socket.send(encodeJsonRpcFrame(message)) + } + + private handleSocketMessage(data: unknown): void { + let messages: Array + try { + messages = this.parser.push(data) + } catch (error) { + const reason = + error instanceof Error ? error : ( + new Error("Failed to parse Lingma JSON-RPC frame") + ) + this.rejectPending(reason) + this.socket?.close() + return + } + + for (const message of messages) { + if ("id" in message && !("method" in message)) { + this.handleResponse(message) + } else { + this.handleNotification(message as JsonRpcNotificationMessage) + } + } + } + + private handleResponse(message: JsonRpcResponseMessage): void { + const pending = this.pending.get(message.id) + if (!pending) return + + clearTimeout(pending.timer) + this.pending.delete(message.id) + + if (message.error) { + pending.reject( + new APIError(message.error.message ?? "Lingma RPC request failed", 502), + ) + return + } + + pending.resolve(message.result) + } + + private handleNotification(message: JsonRpcNotificationMessage): void { + const handlers = this.notificationHandlers.get(message.method) + if (!handlers) return + + for (const handler of handlers) { + try { + handler(message.params, message) + } catch (error) { + consola.warn("Lingma notification handler failed:", error) + } + } + } + + private rejectPending(error: Error): void { + for (const pending of this.pending.values()) { + clearTimeout(pending.timer) + pending.reject(error) + } + this.pending.clear() + } +} + +function toBuffer(chunk: unknown): Buffer { + if (typeof chunk === "string") return Buffer.from(chunk, "utf8") + if (Buffer.isBuffer(chunk)) return chunk + if (chunk instanceof ArrayBuffer) return Buffer.from(chunk) + if (chunk instanceof Uint8Array) { + return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) + } + + throw new Error("Unsupported Lingma JSON-RPC frame chunk") +} + +function parseJsonRpcMessage(body: string): JsonRpcIncomingMessage { + const parsed = JSON.parse(body) as unknown + if (!isRecord(parsed)) throw new Error("Invalid Lingma JSON-RPC message") + if (parsed.jsonrpc !== "2.0") { + throw new Error("Invalid Lingma JSON-RPC version") + } + + if ("id" in parsed && ("result" in parsed || "error" in parsed)) { + return parsed as unknown as JsonRpcResponseMessage + } + + if (typeof parsed.method === "string") { + return parsed as unknown as JsonRpcNotificationMessage + } + + throw new Error("Unsupported Lingma JSON-RPC message") +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null +} diff --git a/src/services/lingma/models.ts b/src/services/lingma/models.ts new file mode 100644 index 000000000..1c5bc861c --- /dev/null +++ b/src/services/lingma/models.ts @@ -0,0 +1,44 @@ +import type { ModelsResponse } from "~/services/copilot/get-models" + +export const DEFAULT_LINGMA_MODEL_IDS = [ + "org_auto", + "dashscope_qmodel", + "dashscope_qwen3_coder", + "dashscope_qwen_plus_20250428_thinking", + "dashscope_qwen_max_latest", + "kmodel", + "mmodel", +] + +export function createLingmaModels(modelIds: Array): ModelsResponse { + return { + object: "list", + data: modelIds.map((id) => ({ + id, + name: id, + object: "model", + vendor: "lingma", + version: "", + preview: false, + model_picker_enabled: true, + capabilities: { + family: "lingma", + limits: {}, + object: "model_capabilities", + supports: {}, + tokenizer: "o200k_base", + type: "chat", + }, + })), + } +} + +export function parseLingmaModelIds(raw?: string): Array { + const modelIds = + raw + ?.split(",") + .map((model) => model.trim()) + .filter((model) => model.length > 0) ?? DEFAULT_LINGMA_MODEL_IDS + + return [...new Set(modelIds)] +} diff --git a/src/services/lingma/provider.ts b/src/services/lingma/provider.ts new file mode 100644 index 000000000..1d7c7f240 --- /dev/null +++ b/src/services/lingma/provider.ts @@ -0,0 +1,49 @@ +import consola from "consola" + +import { APIError } from "~/lib/error" +import { state } from "~/lib/state" + +import { ensureLingmaAcpInitialized } from "./acp" +import { resolveLingmaWebSocketUrl } from "./config" +import { LingmaRpcClient } from "./json-rpc" +import { createLingmaModels, parseLingmaModelIds } from "./models" + +interface SetupLingmaProviderOptions { + cacheDir?: string + wsUrl?: string + models?: string +} + +interface LingmaAuthStatus { + status?: number + name?: string + email?: string +} + +export async function setupLingmaProvider( + options: SetupLingmaProviderOptions, +): Promise { + const wsUrl = await resolveLingmaWebSocketUrl({ + cacheDir: options.cacheDir, + wsUrl: options.wsUrl, + }) + const client = new LingmaRpcClient(wsUrl) + await client.connect() + + const authStatus = await client.request("auth/status", {}) + if (authStatus.status !== 2) { + client.close() + throw new APIError( + "Lingma IDE is not logged in. Open Lingma IDE, complete login, then start this server again.", + 401, + "authentication_error", + ) + } + + state.lingmaClient = client + state.models = createLingmaModels(parseLingmaModelIds(options.models)) + await ensureLingmaAcpInitialized(client) + + const label = authStatus.name || authStatus.email || "current Lingma user" + consola.info(`Using Lingma local IDE session for ${label}`) +} diff --git a/src/start.ts b/src/start.ts index 14abbbdff..cee6cdbef 100644 --- a/src/start.ts +++ b/src/start.ts @@ -9,12 +9,14 @@ import invariant from "tiny-invariant" import { ensurePaths } from "./lib/paths" import { initProxyFromEnv } from "./lib/proxy" import { generateEnvScript } from "./lib/shell" -import { state } from "./lib/state" +import { state, type Provider } from "./lib/state" import { setupCopilotToken, setupGitHubToken } from "./lib/token" import { cacheModels, cacheVSCodeVersion } from "./lib/utils" import { server } from "./server" +import { setupLingmaProvider } from "./services/lingma/provider" interface RunServerOptions { + provider: Provider port: number verbose: boolean accountType: string @@ -25,6 +27,9 @@ interface RunServerOptions { claudeCode: boolean showToken: boolean proxyEnv: boolean + lingmaCacheDir?: string + lingmaWsUrl?: string + lingmaModels?: string } export async function runServer(options: RunServerOptions): Promise { @@ -37,8 +42,9 @@ export async function runServer(options: RunServerOptions): Promise { consola.info("Verbose logging enabled") } + state.provider = options.provider state.accountType = options.accountType - if (options.accountType !== "individual") { + if (options.provider === "copilot" && options.accountType !== "individual") { consola.info(`Using ${options.accountType} plan GitHub account`) } @@ -48,17 +54,30 @@ export async function runServer(options: RunServerOptions): Promise { state.showToken = options.showToken await ensurePaths() - await cacheVSCodeVersion() - if (options.githubToken) { - state.githubToken = options.githubToken - consola.info("Using provided GitHub token") - } else { - await setupGitHubToken() + if (options.provider === "lingma" && options.claudeCode) { + throw new Error("--claude-code is only supported by the copilot provider") } - await setupCopilotToken() - await cacheModels() + if (options.provider === "lingma") { + await setupLingmaProvider({ + cacheDir: options.lingmaCacheDir, + wsUrl: options.lingmaWsUrl, + models: options.lingmaModels, + }) + } else { + await cacheVSCodeVersion() + + if (options.githubToken) { + state.githubToken = options.githubToken + consola.info("Using provided GitHub token") + } else { + await setupGitHubToken() + } + + await setupCopilotToken() + await cacheModels() + } consola.info( `Available models: \n${state.models?.data.map((model) => `- ${model.id}`).join("\n")}`, @@ -110,9 +129,13 @@ export async function runServer(options: RunServerOptions): Promise { } } - consola.box( - `🌐 Usage Viewer: https://ericc-ch.github.io/copilot-api?endpoint=${serverUrl}/usage`, - ) + if (options.provider === "copilot") { + consola.box( + `🌐 Usage Viewer: https://ericc-ch.github.io/copilot-api?endpoint=${serverUrl}/usage`, + ) + } else { + consola.info(`Lingma OpenAI-compatible endpoint: ${serverUrl}/v1`) + } serve({ fetch: server.fetch as ServerHandler, @@ -132,6 +155,11 @@ export const start = defineCommand({ default: "4141", description: "Port to listen on", }, + provider: { + type: "string", + default: "copilot", + description: "Provider to use (copilot, lingma)", + }, verbose: { alias: "v", type: "boolean", @@ -184,6 +212,18 @@ export const start = defineCommand({ default: false, description: "Initialize proxy from environment variables", }, + "lingma-cache-dir": { + type: "string", + description: "Lingma SharedClientCache directory containing .info.json", + }, + "lingma-ws-url": { + type: "string", + description: "Override Lingma local JSON-RPC WebSocket URL", + }, + "lingma-models": { + type: "string", + description: "Comma-separated Lingma model ids to expose", + }, }, run({ args }) { const rateLimitRaw = args["rate-limit"] @@ -192,6 +232,7 @@ export const start = defineCommand({ rateLimitRaw === undefined ? undefined : Number.parseInt(rateLimitRaw, 10) return runServer({ + provider: parseProvider(args.provider), port: Number.parseInt(args.port, 10), verbose: args.verbose, accountType: args["account-type"], @@ -202,6 +243,14 @@ export const start = defineCommand({ claudeCode: args["claude-code"], showToken: args["show-token"], proxyEnv: args["proxy-env"], + lingmaCacheDir: args["lingma-cache-dir"], + lingmaWsUrl: args["lingma-ws-url"], + lingmaModels: args["lingma-models"], }) }, }) + +function parseProvider(value: string): Provider { + if (value === "copilot" || value === "lingma") return value + throw new Error(`Unsupported provider: ${value}`) +} diff --git a/tests/lingma-provider.test.ts b/tests/lingma-provider.test.ts new file mode 100644 index 000000000..bbd7c37fd --- /dev/null +++ b/tests/lingma-provider.test.ts @@ -0,0 +1,314 @@ +import { afterEach, describe, expect, test } from "bun:test" +import { mkdtemp, rm, writeFile } from "node:fs/promises" +import { tmpdir } from "node:os" +import { join } from "node:path" + +import { state } from "../src/lib/state" +import { server } from "../src/server" +import { resolveLingmaWebSocketUrl } from "../src/services/lingma/config" +import { + createLingmaChatCompletions, + renderLingmaPrompt, + validateLingmaChatPayload, +} from "../src/services/lingma/create-chat-completions" +import { + encodeJsonRpcFrame, + JsonRpcFrameParser, + LingmaRpcClient, +} from "../src/services/lingma/json-rpc" +import { createLingmaModels } from "../src/services/lingma/models" + +afterEach(() => { + state.provider = "copilot" + state.lingmaClient = undefined + state.models = undefined +}) + +describe("Lingma JSON-RPC framing", () => { + test("parses split and coalesced frames", () => { + const parser = new JsonRpcFrameParser() + const first = encodeJsonRpcFrame({ + jsonrpc: "2.0", + id: 1, + method: "auth/status", + params: {}, + }) + const second = encodeJsonRpcFrame({ + jsonrpc: "2.0", + method: "chat/answer", + params: { content: "ok" }, + }) + + expect(parser.push(first.slice(0, 10))).toEqual([]) + expect(parser.push(first.slice(10) + second)).toEqual([ + { + jsonrpc: "2.0", + id: 1, + method: "auth/status", + params: {}, + }, + { + jsonrpc: "2.0", + method: "chat/answer", + params: { content: "ok" }, + }, + ]) + }) + + test("throws on invalid JSON frames", () => { + const parser = new JsonRpcFrameParser() + expect(() => parser.push("Content-Length: 1\r\n\r\n{")).toThrow() + }) + + test("times out unanswered requests", async () => { + const originalWebSocket = globalThis.WebSocket + const globalWithWebSocket = globalThis as unknown as { + WebSocket: typeof WebSocket + } + globalWithWebSocket.WebSocket = + TimeoutWebSocket as unknown as typeof WebSocket + + try { + const client = new LingmaRpcClient("ws://127.0.0.1:1", 5) + let error: unknown + try { + await client.request("auth/status", {}) + } catch (caughtError) { + error = caughtError + } + + expect(error).toBeInstanceOf(Error) + expect((error as Error).message).toContain("timed out") + client.close() + } finally { + globalWithWebSocket.WebSocket = originalWebSocket + } + }) +}) + +describe("Lingma config and models", () => { + test("resolves websocket URL from .info.json", async () => { + const dir = await mkdtemp(join(tmpdir(), "lingma-cache-")) + try { + await writeFile( + join(dir, ".info.json"), + JSON.stringify({ websocketPort: 36510 }), + ) + + const discoveredUrl = await resolveLingmaWebSocketUrl({ cacheDir: dir }) + const overrideUrl = await resolveLingmaWebSocketUrl({ + wsUrl: "ws://127.0.0.1:45678", + }) + + expect(discoveredUrl).toBe("ws://127.0.0.1:36510") + expect(overrideUrl).toBe("ws://127.0.0.1:45678") + } finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test("creates Copilot-shaped static model responses", () => { + const models = createLingmaModels(["lingma"]) + expect(models.data[0].id).toBe("lingma") + expect(models.data[0].vendor).toBe("lingma") + expect(models.data[0].capabilities.tokenizer).toBe("o200k_base") + }) +}) + +describe("Lingma chat completions", () => { + test("renders text-only OpenAI messages with roles", () => { + const prompt = renderLingmaPrompt({ + model: "lingma", + messages: [ + { role: "system", content: "Be terse." }, + { + role: "user", + content: [ + { type: "text", text: "Hello" }, + { type: "text", text: "World" }, + ], + }, + ], + }) + + expect(prompt).toBe("SYSTEM:\nBe terse.\n\nUSER:\nHello\nWorld") + }) + + test("rejects unsupported V1 payload features", () => { + expect(() => + validateLingmaChatPayload({ + model: "lingma", + stream: true, + messages: [{ role: "user", content: "Hello" }], + }), + ).toThrow("stream") + + expect(() => + validateLingmaChatPayload({ + model: "lingma", + tools: [ + { + type: "function", + function: { name: "search", parameters: {} }, + }, + ], + messages: [{ role: "user", content: "Hello" }], + }), + ).toThrow("tools") + }) + + test("assembles a non-streaming OpenAI response from Lingma events", async () => { + state.provider = "lingma" + state.lingmaClient = createFakeLingmaClient("Hello from Lingma") + + const response = await createLingmaChatCompletions({ + model: "lingma", + messages: [{ role: "user", content: "Hello" }], + }) + + expect(response.object).toBe("chat.completion") + expect(response.choices[0].message.content).toBe("Hello from Lingma") + expect(response.usage).toBeUndefined() + }) +}) + +describe("Lingma routes", () => { + test("returns static models in Lingma mode", async () => { + state.provider = "lingma" + state.models = createLingmaModels(["lingma"]) + + const response = await server.request("/v1/models") + const body = (await response.json()) as { data: Array<{ id: string }> } + + expect(response.status).toBe(200) + expect(body.data[0].id).toBe("lingma") + }) + + test("rejects Lingma streaming chat requests", async () => { + state.provider = "lingma" + state.models = createLingmaModels(["lingma"]) + + const response = await server.request("/v1/chat/completions", { + method: "POST", + body: JSON.stringify({ + model: "lingma", + stream: true, + messages: [{ role: "user", content: "Hello" }], + }), + }) + + expect(response.status).toBe(501) + }) + + test("rejects unsupported Lingma endpoints", async () => { + state.provider = "lingma" + + const response = await server.request("/v1/messages", { + method: "POST", + body: JSON.stringify({ + model: "lingma", + max_tokens: 1, + messages: [{ role: "user", content: "Hello" }], + }), + }) + + expect(response.status).toBe(501) + }) +}) + +class TimeoutWebSocket { + readyState = 0 + private readonly listeners = new Map void>>() + + constructor(_url: string) { + queueMicrotask(() => { + this.readyState = 1 + this.emit("open") + }) + } + + addEventListener(type: string, handler: () => void): void { + const handlers = this.listeners.get(type) ?? new Set() + handlers.add(handler) + this.listeners.set(type, handlers) + } + + send(_data: string): void {} + + close(): void { + this.readyState = 3 + this.emit("close") + } + + private emit(type: string): void { + for (const handler of this.listeners.get(type) ?? []) handler() + } +} + +function createFakeLingmaClient(answer: string): LingmaRpcClient { + const handlers = new Map void>>() + const sessionId = "session-id" + const generation = 1 + + return { + generation, + connect: () => Promise.resolve(), + request: (method: string, params: unknown) => { + if (method === "initialize") { + return Promise.resolve({}) + } + + if (method === "session/new") { + return Promise.resolve({ sessionId }) + } + + const request = params as { + sessionId: string + _meta: Record + } + return new Promise((resolve) => { + queueMicrotask(() => { + emit(handlers, "session/update", { + sessionId: request.sessionId, + _meta: { + "ai-coding/request-id": request._meta["ai-coding/request-id"], + }, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: answer }, + }, + }) + emit(handlers, "session/update", { + sessionId: request.sessionId, + _meta: { + "ai-coding/request-id": request._meta["ai-coding/request-id"], + }, + update: { + sessionUpdate: "notification", + type: "chat_finish", + data: { reason: "success", statusCode: 200 }, + }, + }) + resolve({ stopReason: "end_turn" }) + }) + }) + }, + onNotification: (method: string, handler: (params: unknown) => void) => { + const methodHandlers = handlers.get(method) ?? new Set() + methodHandlers.add(handler) + handlers.set(method, methodHandlers) + + return () => { + methodHandlers.delete(handler) + } + }, + } as unknown as LingmaRpcClient +} + +function emit( + handlers: Map void>>, + method: string, + params: unknown, +): void { + for (const handler of handlers.get(method) ?? []) handler(params) +}