forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest-handler.ts
More file actions
130 lines (112 loc) · 3.3 KB
/
Copy pathrequest-handler.ts
File metadata and controls
130 lines (112 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import type { IncomingMessage } from "http";
import { Readable } from "node:stream";
/**
 * A Node `IncomingMessage` that may additionally carry an already-parsed body.
 *
 * Declared as an intersection so `complete` can be treated as optional here
 * even though `IncomingMessage` itself declares it.
 */
export type IncomingWithBody = IncomingMessage & {
/**
 * Pre-parsed request payload, if any.
 * NOTE(review): presumably populated by upstream body-parsing middleware — confirm with callers.
 */
body?: unknown;
/** Mirrors `IncomingMessage.complete`; read by `isStreamConsumed` as one of its "already consumed" signals. */
complete?: boolean;
};
/**
 * Adapts a WHATWG `ReadableStream` into a Node.js `Readable`.
 *
 * Every `read()` request issued by Node pulls exactly one chunk from the web
 * stream's reader and pushes it as a `Buffer`; end-of-stream is signalled by
 * pushing `null`. A failing read destroys the Node stream with that error.
 *
 * Destroying the returned stream (explicitly, or automatically after it ends
 * or errors) cancels the underlying reader so the web stream's source is
 * released instead of being left dangling.
 *
 * @param webStream - Source stream. It is locked by this call via `getReader()`.
 * @returns A Node `Readable` that yields the source's chunks as `Buffer`s.
 */
export function readableStreamToNodeStream(
  webStream: ReadableStream,
): Readable {
  const reader = webStream.getReader();
  return new Readable({
    async read() {
      try {
        const { done, value } = await reader.read();
        // The consumer may have destroyed the stream while the read was in
        // flight; there is nobody left to deliver the result to.
        if (this.destroyed) {
          return;
        }
        if (done) {
          this.push(null);
        } else {
          this.push(Buffer.from(value));
        }
      } catch (err) {
        this.destroy(err as Error);
      }
    },
    destroy(error, callback) {
      // Propagate teardown to the web stream. Cancellation failures (e.g. the
      // stream is already errored) must not mask the original destroy reason,
      // so both outcomes complete the destroy with `error`.
      const finish = (): void => callback(error);
      reader.cancel(error ?? undefined).then(finish, finish);
    },
  });
}
/**
 * Adapts a Node.js `Readable` into a WHATWG `ReadableStream`.
 *
 * `Buffer` chunks are copied into plain `Uint8Array`s; any other chunk value
 * is forwarded as-is. The web stream is settled exactly once:
 *  - `end`   -> closed
 *  - `error` -> errored with the same error
 *  - `close` without a prior `end`/`error` (premature destroy) -> errored,
 *    so consumers are not left waiting forever
 * A stream that has already ended or been destroyed when this function is
 * called is settled immediately for the same reason.
 *
 * Cancelling the web stream destroys the Node stream.
 *
 * @param nodeStream - Source stream; listeners are attached when the web stream starts.
 * @returns A web stream mirroring the Node stream's data and termination.
 */
export function nodeStreamToReadableStream(
  nodeStream: Readable,
): ReadableStream<Uint8Array> {
  // True once the web stream was closed, errored or cancelled; guards against
  // touching the controller afterwards (which would throw).
  let settled = false;

  return new ReadableStream<Uint8Array>({
    start(controller) {
      const settle = (finish: () => void): void => {
        if (settled) {
          return;
        }
        settled = true;
        finish();
      };
      const failPrematurely = (): void =>
        controller.error(new Error("Premature close"));

      // No further events will fire for a stream that is already finished.
      if (nodeStream.readableEnded) {
        settle(() => controller.close());
        return;
      }
      if (nodeStream.destroyed) {
        settle(failPrematurely);
        return;
      }

      nodeStream.on("data", (chunk) => {
        if (settled) {
          return;
        }
        controller.enqueue(
          chunk instanceof Buffer ? new Uint8Array(chunk) : chunk,
        );
      });
      nodeStream.on("end", () => settle(() => controller.close()));
      nodeStream.on("error", (err) => settle(() => controller.error(err)));
      // `close` always follows `end`/`error`; reaching it unsettled means the
      // source was torn down before delivering all of its data.
      nodeStream.on("close", () => settle(failPrematurely));
    },
    cancel() {
      settled = true;
      nodeStream.destroy();
    },
  });
}
/**
 * Builds an absolute URL string for an incoming Node request.
 *
 * - Path: `req.url`, falling back to `"/"`.
 * - Host: `X-Forwarded-Host`, then `Host`, then `"localhost"`.
 * - Protocol: `X-Forwarded-Proto`, otherwise `"https"` when the socket is
 *   flagged `encrypted`, else `"http"`.
 *
 * Forwarded headers may hold several values (a comma-separated list appended
 * by each proxy hop, or an array); only the first entry is used, since
 * interpolating the raw list would produce an invalid URL.
 *
 * @param req - The incoming request.
 * @returns `${proto}://${host}${path}`.
 */
export function getFullUrl(req: IncomingMessage): string {
  // First non-empty entry of a possibly multi-valued header, trimmed.
  const firstValue = (
    raw: string | string[] | undefined,
  ): string | undefined => {
    const combined = Array.isArray(raw) ? raw.join(",") : raw;
    const first = combined?.split(",")[0]?.trim();
    return first || undefined;
  };

  // Use req.url (path relative to mount point) for Hono routing to work correctly.
  // Express sets req.url to the path after the mount point (e.g., "/" when mounted at "/copilotkit").
  // Pure Node HTTP sets req.url to the full path.
  const path = req.url || "/";
  const host =
    firstValue(req.headers["x-forwarded-host"]) ||
    req.headers.host ||
    "localhost";
  const proto =
    firstValue(req.headers["x-forwarded-proto"]) ||
    ((req.socket as any).encrypted ? "https" : "http");
  return `${proto}://${host}${path}`;
}
/**
 * Converts Node's header dictionary into a Fetch `Headers` instance.
 *
 * Entries whose value is `undefined` are skipped. Array values are appended
 * one entry at a time under the same name; scalar values are appended once.
 *
 * @param rawHeaders - Header map as exposed on `IncomingMessage.headers`.
 * @returns A new `Headers` object containing every defined header value.
 */
export function toHeaders(rawHeaders: IncomingMessage["headers"]): Headers {
  const result = new Headers();
  for (const name of Object.keys(rawHeaders)) {
    const raw = rawHeaders[name];
    if (raw === undefined) {
      continue;
    }
    const entries = Array.isArray(raw) ? raw : [raw];
    for (const entry of entries) {
      result.append(name, entry);
    }
  }
  return result;
}
/**
 * Reports whether the request shows any sign that its body stream has
 * already reached its end.
 *
 * Returns `true` when any of these is truthy: `req.readableEnded`,
 * `req.complete`, or the `ended` / `endEmitted` flags on the stream's
 * internal `_readableState` (if that object exists).
 *
 * NOTE(review): `_readableState` is a Node-internal field read on a
 * best-effort basis; whether `complete`/`ended` imply the data was actually
 * read by a consumer depends on the caller's timing — confirm against usage.
 *
 * @param req - Request to inspect.
 * @returns `true` if any end-of-stream indicator is set, otherwise `false`.
 */
export function isStreamConsumed(req: IncomingWithBody): boolean {
  if (req.readableEnded || req.complete) {
    return true;
  }
  const internalState = (req as any)._readableState;
  return Boolean(internalState?.ended || internalState?.endEmitted);
}
/**
 * Re-creates a Fetch request body from a body value that was already parsed.
 *
 * Mapping:
 *  - `null` / `undefined`            -> no body
 *  - binary (`Buffer`, any typed array, `DataView`, `ArrayBuffer`)
 *                                    -> passed through untouched, no content type
 *  - `string`                        -> passed through; content type taken from
 *                                       `headers`, defaulting to `text/plain`
 *  - anything else                   -> `JSON.stringify`-ed as `application/json`
 *
 * Values that JSON cannot represent at all (e.g. a function), for which
 * `JSON.stringify` yields `undefined`, produce no body rather than leaking
 * `undefined` through the `BodyInit | null` contract.
 *
 * @param parsedBody - The previously parsed payload.
 * @param headers - Request headers, consulted for the content type of string bodies.
 * @returns The body to send and, when this function determines one, its content type.
 */
export function synthesizeBodyFromParsedBody(
  parsedBody: unknown,
  headers: Headers,
): { body: BodyInit | null; contentType?: string } {
  if (parsedBody === null || parsedBody === undefined) {
    return { body: null };
  }

  // Covers Buffer/Uint8Array as before, plus ArrayBuffer and other views,
  // which would otherwise be serialised by JSON.stringify as "{}".
  if (ArrayBuffer.isView(parsedBody) || parsedBody instanceof ArrayBuffer) {
    // Buffer/Uint8Array<ArrayBufferLike> are valid fetch bodies at runtime,
    // but the DOM lib's BodyInit only admits ArrayBuffer-backed views.
    return { body: parsedBody as BodyInit };
  }

  if (typeof parsedBody === "string") {
    return {
      body: parsedBody,
      contentType: headers.get("content-type") ?? "text/plain",
    };
  }

  const serialized: string | undefined = JSON.stringify(parsedBody);
  if (serialized === undefined) {
    return { body: null };
  }
  return {
    body: serialized,
    contentType: "application/json",
  };
}
/**
 * Detects the `TypeError` raised when a body stream has already been read
 * ("disturbed") or is currently locked.
 *
 * The check is purely textual: the value must be a `TypeError` whose string
 * message contains either `"disturbed"` or `"locked"` (case-sensitive).
 *
 * @param error - Any thrown value.
 * @returns `true` only for a matching `TypeError`.
 */
export function isDisturbedOrLockedError(error: unknown): boolean {
  if (!(error instanceof TypeError)) {
    return false;
  }
  const { message } = error;
  if (typeof message !== "string") {
    return false;
  }
  return ["disturbed", "locked"].some((needle) => message.includes(needle));
}