forked from CopilotKit/CopilotKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming-fetch.ts
More file actions
367 lines (333 loc) · 12.3 KB
/
Copy pathstreaming-fetch.ts
File metadata and controls
367 lines (333 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
/**
* Streaming fetch implementation for React Native.
*
* React Native's built-in fetch doesn't support response.body.getReader()
* (ReadableStream). This replaces global.fetch with an XHR-based
* implementation that streams chunks via ReadableStream, enabling
* CopilotKit's SSE-based agent communication.
*
* If native fetch already supports ReadableStream bodies (newer RN / Hermes),
* the replacement is skipped entirely.
*
* THREADING NOTE: In React Native, XHR callbacks (onprogress, onload, etc.)
* may fire on a native networking thread. Pushing data into the ReadableStream
* from that thread can trigger downstream React setState calls on the wrong
* thread, causing iOS to kill the process with "deleted thread with uncommitted
* CATransaction". All stream-mutating operations are therefore deferred via
* setTimeout(fn, 0) to bounce back to the JS thread (main thread in Hermes).
*
* Call `installStreamingFetch()` once at app startup after polyfills.
*/
// Ambient declaration: lets this module refer to the global object as `global`
// (used below to reach fetch, TextEncoder and DOMException), typed like `globalThis`.
// `declare` emits no runtime code.
declare const global: typeof globalThis;
/**
 * Subset of the Response interface implemented by the streaming fetch polyfill.
 *
 * The polyfill hands back a plain object of this shape (cast to `Response`),
 * not a native `Response` instance.
 */
interface StreamingFetchResponse {
  // ---- Status line -------------------------------------------------------
  /** HTTP status code reported by the underlying request. */
  readonly status: number;
  /** HTTP status text reported by the underlying request. */
  readonly statusText: string;
  /** `true` when `status` is in the 200–299 range. */
  readonly ok: boolean;

  // ---- Request / response metadata ---------------------------------------
  /** URL the request was issued against. */
  readonly url: string;
  /** Response type tag. */
  readonly type: string;
  /** Whether the response is flagged as the result of a redirect. */
  readonly redirected: boolean;
  /** Parsed response headers. */
  readonly headers: Headers;

  // ---- Body --------------------------------------------------------------
  /** Stream of body bytes, delivered incrementally as chunks arrive. */
  readonly body: ReadableStream<Uint8Array>;
  /** Becomes `true` once one of the body-consuming helpers has been called. */
  readonly bodyUsed: boolean;

  // ---- Whole-body consumers ----------------------------------------------
  /** Resolves with the complete body as text. */
  text(): Promise<string>;
  /** Resolves with the complete body parsed as JSON. */
  json(): Promise<unknown>;
  /** Resolves with the complete body as an `ArrayBuffer`. */
  arrayBuffer(): Promise<ArrayBuffer>;
  /** Resolves with the complete body as a `Blob`. */
  blob(): Promise<Blob>;

  // ---- Unsupported operations --------------------------------------------
  /** Not supported by the polyfill; never returns normally. */
  clone(): never;
  /** Not supported by the polyfill; the returned promise never fulfils. */
  formData(): Promise<never>;
}
/**
 * Builds the error used to signal an aborted request.
 *
 * Uses the runtime's `DOMException` when one is available. If the global is
 * missing, falls back to a plain `Error` whose `name` is "AbortError" so that
 * abort handling never fails with "undefined is not a constructor" and callers
 * that check `err.name === "AbortError"` keep working.
 *
 * @returns An error with `name` "AbortError" and the standard abort message.
 */
function createAbortError(): DOMException {
  const message = "The operation was aborted.";
  const DomExceptionCtor = (
    global as { DOMException?: typeof DOMException }
  ).DOMException;

  if (typeof DomExceptionCtor === "function") {
    return new DomExceptionCtor(message, "AbortError");
  }

  // No DOMException in this runtime: emulate the parts callers rely on.
  const fallback = new Error(message);
  fallback.name = "AbortError";
  return fallback as DOMException;
}
/**
 * Installs an XHR-backed `fetch` on `global` whose responses expose a
 * `ReadableStream` body, unless the environment already provides one.
 *
 * Behaviour:
 * - If `global.fetch` is already the polyfill (it carries `__originalFetch`),
 *   the call is a no-op, so repeated installs never lose the native fetch.
 * - If `new Response("").body.getReader` exists, nothing is replaced.
 * - Otherwise `global.fetch` is replaced; the previous implementation stays
 *   reachable as `global.fetch.__originalFetch`.
 *
 * The replacement resolves with a duck-typed response object (not a native
 * `Response`) as soon as headers arrive, and rejects with a `TypeError` on
 * network failure/timeout or with an AbortError when the signal aborts.
 */
export function installStreamingFetch(): void {
  // Idempotency guard: a second install would otherwise capture the polyfill
  // itself as "original" and overwrite `__originalFetch` with it.
  const installedFetch = global.fetch as unknown;
  if (
    typeof installedFetch === "function" &&
    Object.prototype.hasOwnProperty.call(installedFetch, "__originalFetch")
  ) {
    return;
  }

  // Skip if native fetch already supports ReadableStream body.
  // Newer React Native versions (Hermes) may support this natively.
  try {
    const testResponse = new Response("");
    if (
      testResponse.body != null &&
      typeof testResponse.body.getReader === "function"
    ) {
      return;
    }
  } catch (e) {
    // Response constructor unavailable — expected in older RN environments.
    if (
      __DEV__ &&
      e instanceof Error &&
      !(e instanceof ReferenceError) &&
      !(e instanceof TypeError)
    ) {
      console.warn(
        "[CopilotKit] Unexpected error during streaming fetch feature detection, " +
          "installing XHR-based polyfill:",
        e,
      );
    }
  }

  const originalFetch = global.fetch;
  const TextEncoder = global.TextEncoder;

  const streamingFetch = function streamingFetch(
    input: RequestInfo | URL,
    init?: RequestInit,
  ): Promise<Response> {
    // Extract defaults from Request object when input is a Request
    const request =
      typeof input !== "string" && !(input instanceof URL) ? input : null;

    let url: string;
    if (typeof input === "string") {
      url = input;
    } else if (input instanceof URL) {
      url = input.href;
    } else {
      url = (input as Request).url;
    }

    // Explicit `init` values win over values carried by a Request input.
    const method = init?.method || request?.method || "GET";
    const headers = init?.headers || (request ? request.headers : {});
    const body = (init?.body ?? request?.body) as string | null | undefined;
    const signal = init?.signal || request?.signal;

    return new Promise((resolve, reject) => {
      // Reject immediately if signal is already aborted (per fetch spec)
      if (signal?.aborted) {
        reject(createAbortError());
        return;
      }

      const xhr = new XMLHttpRequest();
      xhr.open(method, url);

      // Default 60s timeout to prevent hanging on stalled mobile connections
      // (WiFi→cellular transitions, tunnels, serverless cold starts).
      // Callers can still use AbortSignal.timeout() for finer control.
      xhr.timeout = 60_000;

      // Normalize the three HeadersInit shapes into [name, value] pairs.
      let headerEntries: [string, string][];
      if (headers instanceof Headers) {
        headerEntries = Array.from(headers.entries());
      } else if (Array.isArray(headers)) {
        headerEntries = headers as [string, string][];
      } else {
        headerEntries = Object.entries(headers as Record<string, string>);
      }
      for (const [key, value] of headerEntries) {
        xhr.setRequestHeader(key, value as string);
      }

      xhr.responseType = "text";

      let streamController: ReadableStreamDefaultController<Uint8Array> | null =
        null;
      // Offset into xhr.responseText of the first character not yet enqueued.
      let lastIndex = 0;
      // True once the stream has been closed, errored, or cancelled.
      let streamClosed = false;
      // True once the outer fetch promise has been resolved or rejected.
      let settled = false;
      const encoder = new TextEncoder();

      // Promise that resolves/rejects when XHR completes or fails
      let resolveFullText: (text: string) => void;
      let rejectFullText: (error: Error) => void;
      const fullTextPromise = new Promise<string>((res, rej) => {
        resolveFullText = res;
        rejectFullText = rej;
      });
      // Prevent unhandled rejection when error occurs but .text()/.json() is never called
      fullTextPromise.catch(() => {});

      function closeStream() {
        if (streamController && !streamClosed) {
          streamClosed = true;
          streamController.close();
        }
      }

      function errorStream(err: Error) {
        if (streamController && !streamClosed) {
          streamClosed = true;
          streamController.error(err);
        }
      }

      /** Enqueues whatever part of responseText has arrived since the last flush. */
      function flushChunks() {
        if (
          streamController &&
          !streamClosed &&
          xhr.responseText.length > lastIndex
        ) {
          const newData = xhr.responseText.slice(lastIndex);
          lastIndex = xhr.responseText.length;
          streamController.enqueue(encoder.encode(newData));
        }
      }

      /** Centralized error handler — errors the stream, rejects fullTextPromise,
       * and rejects the outer fetch promise if not yet settled. */
      function fail(err: Error) {
        cleanupAbortListener();
        errorStream(err);
        rejectFullText(err);
        if (!settled) {
          settled = true;
          reject(err);
        }
      }

      const onAbort = () => {
        fail(createAbortError());
        xhr.abort();
      };

      if (signal) {
        signal.addEventListener("abort", onAbort);
      }

      function cleanupAbortListener() {
        if (signal) {
          signal.removeEventListener("abort", onAbort);
        }
      }

      const stream = new ReadableStream<Uint8Array>({
        start(controller) {
          streamController = controller;
        },
        cancel() {
          // The consumer cancelled the body. Mark the stream as finished so a
          // progress callback that is already queued does not enqueue into a
          // cancelled stream, and detach from the signal so the listener does
          // not outlive the request.
          streamClosed = true;
          cleanupAbortListener();
          xhr.abort();
          rejectFullText(createAbortError());
        },
      });

      // All XHR callbacks are wrapped with setTimeout(fn, 0) to ensure they
      // run on the JS thread. In React Native, XHR callbacks may fire on a
      // native networking thread; calling streamController.enqueue() there
      // triggers downstream React setState on the wrong thread, which causes
      // iOS to kill the process ("deleted thread with uncommitted CATransaction").
      // setTimeout(fn, 0) defers execution to the JS event loop (main thread
      // in Hermes) with negligible latency — streaming still feels real-time.
      xhr.onprogress = function () {
        setTimeout(() => {
          try {
            flushChunks();
          } catch (err) {
            fail(err instanceof Error ? err : new Error(String(err)));
            xhr.abort();
          }
        }, 0);
      };

      xhr.onload = function () {
        setTimeout(() => {
          cleanupAbortListener();
          try {
            flushChunks();
          } catch (err) {
            fail(err instanceof Error ? err : new Error(String(err)));
            return;
          }
          closeStream();
          resolveFullText(xhr.responseText);
        }, 0);
      };

      xhr.onerror = function () {
        setTimeout(() => {
          fail(new TypeError("Network request failed"));
        }, 0);
      };

      xhr.ontimeout = function () {
        setTimeout(() => {
          fail(new TypeError("Network request timed out"));
        }, 0);
      };

      // Resolve with Response once headers arrive.
      // Guard against status === 0 which XHR produces for CORS failures,
      // DNS errors, and mixed-content blocks — let onerror handle those.
      let resp: StreamingFetchResponse | null = null;

      xhr.onreadystatechange = function () {
        // Capture XHR state synchronously before deferring — XHR properties
        // may change between now and when setTimeout fires.
        const readyState = xhr.readyState;
        const xhrStatus = xhr.status;
        const xhrStatusText = xhr.statusText;
        const rawHeaders = xhr.getAllResponseHeaders() || "";

        setTimeout(() => {
          // Safety net: if XHR completed but we never resolved/rejected, fail explicitly.
          // This can happen when status === 0 and onerror doesn't fire (some RN networking impls).
          if (readyState === 4 && !settled && !resp) {
            fail(
              new TypeError(
                `Network request to ${url} completed with status ${xhrStatus} but no response was produced. ` +
                  `This may indicate a CORS failure, DNS error, or React Native networking issue.`,
              ),
            );
            return;
          }

          if (readyState >= 2 && !resp && xhrStatus !== 0) {
            // Parse the raw "name: value\r\n" header block into a lower-cased map.
            const respHeaders: Record<string, string> = {};
            for (const line of rawHeaders.trim().split("\r\n")) {
              const idx = line.indexOf(": ");
              if (idx > 0) {
                respHeaders[line.slice(0, idx).toLowerCase()] = line.slice(
                  idx + 2,
                );
              }
            }
            const responseHeaders = new Headers(respHeaders);
            let bodyUsed = false;

            resp = {
              // Duck-typed Response object (not a native Response instance)
              ok: xhrStatus >= 200 && xhrStatus < 300,
              status: xhrStatus,
              statusText: xhrStatusText,
              url: url,
              type: "basic",
              redirected: false,
              get bodyUsed() {
                return bodyUsed;
              },
              headers: responseHeaders,
              body: stream,
              json: async () => {
                bodyUsed = true;
                const text = await fullTextPromise;
                try {
                  return JSON.parse(text);
                } catch (e) {
                  throw new TypeError(
                    `Failed to parse JSON from ${method} ${url} (status ${xhrStatus}): ${
                      text.length > 200 ? text.slice(0, 200) + "..." : text
                    }`,
                  );
                }
              },
              text: async () => {
                bodyUsed = true;
                return fullTextPromise;
              },
              arrayBuffer: async () => {
                bodyUsed = true;
                return encoder.encode(await fullTextPromise).buffer;
              },
              blob: async () => {
                bodyUsed = true;
                const buf = encoder.encode(await fullTextPromise);
                if (typeof Blob !== "undefined") {
                  return new Blob([buf], {
                    type: responseHeaders.get("content-type") || "",
                  });
                }
                throw new Error(
                  "Blob is not available in this React Native environment.",
                );
              },
              clone: () => {
                throw new Error(
                  "Response.clone() is not supported by the React Native streaming fetch polyfill.",
                );
              },
              formData: async () => {
                throw new Error(
                  "Response.formData() is not supported by the React Native streaming fetch polyfill.",
                );
              },
            };

            settled = true;
            // NOTE: abort listener is NOT removed here — the signal must remain
            // wired to xhr.abort() for mid-stream cancellation. Cleanup happens
            // in terminal handlers (onload, onerror, ontimeout), onAbort itself,
            // or when the body stream is cancelled.
            resolve(resp as unknown as Response);
          }
        }, 0);
      };

      xhr.send(body ?? null);
    });
  };

  // Expose original fetch for opt-out (e.g., third-party libs that need native behavior)
  (streamingFetch as any).__originalFetch = originalFetch;
  global.fetch = streamingFetch;
}