-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathResponseStream.swift
More file actions
45 lines (38 loc) · 1.35 KB
/
ResponseStream.swift
File metadata and controls
45 lines (38 loc) · 1.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import Foundation
/// An `AsyncSequence` of `Chunk` values produced from the lines of a
/// `URLSession.AsyncBytes` response.
///
/// Each line is passed to a caller-supplied extractor, which decides whether
/// the line produces a chunk and whether the stream should stop afterwards.
struct ResponseStream<Chunk>: AsyncSequence {
    typealias Stream = AsyncThrowingStream<Chunk, Error>
    typealias AsyncIterator = Stream.AsyncIterator
    typealias Element = Chunk

    /// The outcome of interpreting a single response line.
    struct LineContent {
        /// The value to yield for this line, or `nil` to yield nothing.
        let chunk: Chunk?
        /// When `true`, no further lines are read and the stream finishes.
        let done: Bool
    }

    /// The backing stream that iteration is forwarded to.
    let stream: Stream

    /// Creates a stream that reads `result` line by line.
    ///
    /// - Parameters:
    ///   - result: The byte sequence whose `lines` are consumed. Its `task` is
    ///     cancelled once the stream terminates for any reason.
    ///   - lineExtractor: Converts one line into a `LineContent`. An error
    ///     thrown here finishes the stream with that error.
    init(result: URLSession.AsyncBytes, lineExtractor: @escaping (String) throws -> LineContent) {
        stream = AsyncThrowingStream<Chunk, Error> { continuation in
            let task = Task {
                // Cancel the underlying request on every exit path. The
                // termination handler below is installed only after this task
                // has been created, so it cannot be the sole place where the
                // request is cleaned up.
                defer { result.task.cancel() }
                do {
                    for try await line in result.lines {
                        if Task.isCancelled { break }
                        let content = try lineExtractor(line)
                        if let chunk = content.chunk {
                            continuation.yield(chunk)
                        }
                        if content.done { break }
                    }
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
            // Stop producing and drop the request as soon as the stream ends,
            // whether it finished or the consumer cancelled.
            continuation.onTermination = { _ in
                task.cancel()
                result.task.cancel()
            }
        }
    }

    /// Returns an iterator over the backing stream.
    func makeAsyncIterator() -> AsyncIterator {
        stream.makeAsyncIterator()
    }
}