-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathJsonRpc.cs
More file actions
968 lines (861 loc) · 38.7 KB
/
JsonRpc.cs
File metadata and controls
968 lines (861 loc) · 38.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System.Buffers;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
using System.Text.Unicode;
namespace GitHub.Copilot;
/// <summary>
/// A lightweight JSON-RPC 2.0 implementation covering only the features used
/// by this SDK to talk to the Copilot CLI. Messages are framed using the
/// LSP-style header convention (<c>Content-Length: N\r\n\r\n</c> followed by
/// N bytes of JSON body) — the same wire format used by the Language Server
/// Protocol and the Copilot CLI's other language SDKs (Go, Node, Python).
/// This is not a general-purpose JSON-RPC stack: it is narrowly scoped to the
/// methods, transports, and framing the CLI uses.
/// </summary>
internal sealed partial class JsonRpc : IDisposable
{
private const int ErrorCodeMethodNotFound = -32601;
private const int ErrorCodeInternalError = -32603;
private readonly Stream _sendStream;
private readonly Stream _receiveStream;
private readonly JsonSerializerOptions _serializerOptions;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<long, PendingRequest> _pendingRequests = new();
private readonly ConcurrentDictionary<string, MethodRegistration> _methods = new();
private readonly TaskCompletionSource _completionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly SemaphoreSlim _writeLock = new(1, 1);
private readonly CancellationTokenSource _disposeCts = new();
private long _nextId;
private bool _disposed;
/// <summary>
/// Initializes a new <see cref="JsonRpc"/>.
/// </summary>
/// <param name="sendStream">The stream to write outgoing messages to.</param>
/// <param name="receiveStream">The stream to read incoming messages from.</param>
/// <param name="serializerOptions">JSON serializer options (should include all needed source-gen contexts).</param>
/// <param name="logger">Optional logger for diagnostics.</param>
public JsonRpc(Stream sendStream, Stream receiveStream, JsonSerializerOptions serializerOptions, ILogger? logger = null)
{
_sendStream = sendStream;
_receiveStream = receiveStream;
_serializerOptions = serializerOptions;
_logger = logger ?? NullLogger.Instance;
}
/// <summary>
/// A <see cref="Task"/> that completes when the connection is closed or faulted.
/// </summary>
public Task Completion => _completionSource.Task;
/// <summary>
/// Begins reading messages from the receive stream. Call once after registering all method handlers.
/// </summary>
public void StartListening()
{
_ = ReadLoopAsync(_disposeCts.Token);
}
/// <summary>
/// Sends a JSON-RPC request and waits for the response.
/// </summary>
/// <param name="method">The JSON-RPC method name.</param>
/// <param name="args">Positional arguments for the call.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <param name="onResponseInline">
/// Optional callback invoked synchronously from the read loop after the
/// response is parsed but before the awaiter resumes. Use this when you
/// need to mutate client-side state (for example, register a server-assigned
/// session id) before any subsequent notification on the same connection is
/// dispatched. The callback receives the raw JSON-RPC <c>result</c> element.
/// If the callback throws, the exception is propagated to the awaiter.
/// </param>
public async Task<T> InvokeAsync<T>(string method, object?[]? args, CancellationToken cancellationToken, Action<JsonElement>? onResponseInline = null)
{
var timingTimestamp = Stopwatch.GetTimestamp();
var id = Interlocked.Increment(ref _nextId);
var pending = new PendingRequest(onResponseInline);
_pendingRequests[id] = pending;
CancellationTokenRegistration cancelRegistration = default;
try
{
if (cancellationToken.CanBeCanceled)
{
cancelRegistration = cancellationToken.Register(static state =>
{
var (self, reqId, ct) = ((JsonRpc, long, CancellationToken))state!;
if (self._pendingRequests.TryRemove(reqId, out var p))
{
p.TrySetCanceled(ct);
}
// Best-effort cancel notification
_ = self.SendCancelNotificationAsync(reqId);
}, (this, id, cancellationToken));
}
// Send request message
await SendMessageAsync(new JsonRpcRequest
{
Id = id,
Method = method,
Params = SerializeArgs(args),
}, JsonRpcWireContext.Default.JsonRpcRequest, cancellationToken).ConfigureAwait(false);
var responseElement = await pending.Task.ConfigureAwait(false);
if (responseElement.ValueKind == JsonValueKind.Null || responseElement.ValueKind == JsonValueKind.Undefined)
{
LogInvokeTiming(LogLevel.Debug, null, method, id, "Succeeded", timingTimestamp);
return default!;
}
var result = (T)responseElement.Deserialize(_serializerOptions.GetTypeInfo(typeof(T)))!;
LogInvokeTiming(LogLevel.Debug, null, method, id, "Succeeded", timingTimestamp);
return result;
}
catch (OperationCanceledException ex)
{
LogInvokeTiming(LogLevel.Debug, ex, method, id, "Canceled", timingTimestamp);
throw;
}
catch (Exception ex)
{
LogInvokeTiming(LogLevel.Warning, ex, method, id, "Failed", timingTimestamp);
throw;
}
finally
{
_pendingRequests.TryRemove(id, out _);
await cancelRegistration.DisposeAsync().ConfigureAwait(false);
}
}
private void LogInvokeTiming(
LogLevel level,
Exception? exception,
string method,
long requestId,
string status,
long startTimestamp)
{
if (!_logger.IsEnabled(level))
{
return;
}
var elapsed = Stopwatch.GetElapsedTime(startTimestamp);
_logger.Log(
level,
exception,
"JsonRpc.InvokeAsync JSON-RPC request finished. Elapsed={Elapsed}, Method={Method}, RequestId={RequestId}, Status={Status}",
elapsed,
method,
requestId,
status);
}
/// <summary>
/// Registers a method handler that receives positional parameters.
/// If singleObjectParam is false (the default), parameter names and types are inferred from the delegate's signature.
/// If singleObjectParam is true, the entire params object is deserialized as the handler's first parameter.
/// </summary>
public void SetLocalRpcMethod(string methodName, Delegate handler, bool singleObjectParam = false)
{
_methods[methodName] = new(handler, singleObjectParam);
}
/// <inheritdoc />
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_disposeCts.Cancel();
// Fail all pending requests
foreach (var kvp in _pendingRequests)
{
if (_pendingRequests.TryRemove(kvp.Key, out var pending))
{
pending.TrySetException(new ObjectDisposedException(nameof(JsonRpc)));
}
}
_completionSource.TrySetResult();
_writeLock.Dispose();
}
private async Task SendMessageAsync<T>(T message, JsonTypeInfo<T> typeInfo, CancellationToken cancellationToken)
{
// "Content-Length: " (16) + max int digits (10) + "\r\n\r\n" (4)
const int MaxHeaderLength = 30;
var json = JsonSerializer.SerializeToUtf8Bytes(message, typeInfo);
var headerBuf = ArrayPool<byte>.Shared.Rent(MaxHeaderLength);
bool wrote = Utf8.TryWrite(headerBuf, $"Content-Length: {json.Length}\r\n\r\n", out int headerLen);
Debug.Assert(wrote && headerLen > 0);
// Cancellation only applies to *waiting* for the write lock. Once we hold the lock
// and start writing a framed message, we must finish it — cancelling between the
// header and the body (or mid-body) would leave the peer waiting for N body bytes
// that never arrive, desynchronizing the LSP-style stream for every subsequent
// message on this connection.
await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _sendStream.WriteAsync(headerBuf.AsMemory(0, headerLen), CancellationToken.None).ConfigureAwait(false);
await _sendStream.WriteAsync(json, CancellationToken.None).ConfigureAwait(false);
await _sendStream.FlushAsync(CancellationToken.None).ConfigureAwait(false);
}
finally
{
_writeLock.Release();
ArrayPool<byte>.Shared.Return(headerBuf);
}
}
private async Task ReadLoopAsync(CancellationToken cancellationToken)
{
var buffer = new byte[256];
int carried = 0; // bytes in buffer carried over from previous read
try
{
while (!cancellationToken.IsCancellationRequested)
{
// Read headers and body
var (contentLength, buf, newCarried) = await ReadMessageAsync(buffer, carried, cancellationToken).ConfigureAwait(false);
if (contentLength < 0)
{
break; // Stream ended
}
// Keep the (possibly grown) buffer and carry-over count for next iteration
buffer = buf;
carried = newCarried;
// Parse the raw JSON. Body is at buffer[0..contentLength], carried bytes
// for the next message are at buffer[contentLength..contentLength+carried].
JsonElement? message = null;
try
{
using var doc = JsonDocument.Parse(buffer.AsMemory(0, contentLength));
message = doc.RootElement.Clone();
}
catch (JsonException ex)
{
_logger.LogWarning(ex, "Failed to parse incoming JSON-RPC message");
}
// Always move carried bytes to the front, even on parse failure — otherwise
// the next ReadMessageAsync call would scan stale body bytes as headers.
// This must happen AFTER parsing because the carried region overlaps where
// the body lived.
if (carried > 0)
{
Buffer.BlockCopy(buffer, contentLength, buffer, 0, carried);
}
if (message is not { } parsed)
{
continue;
}
// Route the message
if (parsed.TryGetProperty("id", out var idProp) && !parsed.TryGetProperty("method", out _))
{
// It's a response to one of our requests
HandleResponse(parsed, idProp);
}
else if (parsed.TryGetProperty("method", out var methodProp) && methodProp.GetString() is string methodName)
{
_ = HandleIncomingMethodAsync(methodName, parsed, cancellationToken);
}
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Normal shutdown
}
catch (Exception ex)
{
_logger.LogDebug(ex, "JSON-RPC read loop ended");
}
finally
{
// Fail all pending requests
foreach (var kvp in _pendingRequests)
{
if (_pendingRequests.TryRemove(kvp.Key, out var pending))
{
pending.TrySetException(new ConnectionLostException());
}
}
_completionSource.TrySetResult();
}
}
/// <summary>
/// Reads headers and body in one pass.
/// On return, body is at buffer[0..ContentLength], and any overflow bytes
/// from the next message are at buffer[ContentLength..ContentLength+Carried].
/// The caller must move the carried bytes to the front before the next call.
/// </summary>
/// <param name="buffer">Shared buffer (may be grown).</param>
/// <param name="carried">Bytes already in buffer[0..carried] from a previous read.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private async ValueTask<(int ContentLength, byte[] Buffer, int Carried)> ReadMessageAsync(byte[] buffer, int carried, CancellationToken cancellationToken)
{
// Read until we find the \r\n\r\n header terminator.
// carried bytes are already at buffer[0..carried].
int filled = carried;
int headerEnd = -1; // index of first byte after \r\n\r\n
// Check carried bytes first for a header terminator
{
int pos = buffer.AsSpan(0, filled).IndexOf("\r\n\r\n"u8);
if (pos >= 0)
{
headerEnd = pos + 4;
}
}
while (headerEnd < 0)
{
if (filled == buffer.Length)
{
Array.Resize(ref buffer, buffer.Length * 2);
}
int bytesRead = await _receiveStream.ReadAsync(buffer.AsMemory(filled, buffer.Length - filled), cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
// Clean EOF only if we haven't started a frame; otherwise the peer truncated mid-header.
if (filled == 0)
{
return (-1, buffer, 0);
}
throw new EndOfStreamException("Stream ended while reading JSON-RPC headers.");
}
filled += bytesRead;
// Scan for \r\n\r\n starting from where a match could begin
int scanStart = Math.Max(filled - bytesRead - 3, 0);
int pos = buffer.AsSpan(scanStart, filled - scanStart).IndexOf("\r\n\r\n"u8);
if (pos >= 0)
{
headerEnd = scanStart + pos + 4;
}
}
// Parse Content-Length. LSP framing puts each header on its own \r\n-terminated
// line; we walk the lines and require an exact "Content-Length: " prefix at the
// start of one of them. A substring match anywhere in the header block would
// false-positive on values like "X-Trace: Content-Length: 5" and desync the stream.
// A missing or unparsable Content-Length means the framing is broken — there's
// no safe way to resync, so throw and let the read loop terminate the connection.
int contentLength = -1;
ReadOnlySpan<byte> prefix = "Content-Length: "u8;
// headerEnd points just past the \r\n\r\n terminator. Drop only the trailing
// empty line's \r\n; each remaining header line is still \r\n-terminated and
// gets split out by the IndexOf below.
var headerLines = buffer.AsSpan(0, headerEnd - 2);
while (!headerLines.IsEmpty)
{
int lineEnd = headerLines.IndexOf("\r\n"u8);
ReadOnlySpan<byte> line = lineEnd >= 0 ? headerLines.Slice(0, lineEnd) : headerLines;
if (line.StartsWith(prefix) &&
(contentLength >= 0 ||
!int.TryParse(line.Slice(prefix.Length), NumberStyles.None, CultureInfo.InvariantCulture, out contentLength) ||
contentLength < 0))
{
throw new InvalidDataException("JSON-RPC frame has a missing, duplicate, or invalid Content-Length header.");
}
headerLines = lineEnd >= 0 ? headerLines.Slice(lineEnd + 2) : default;
}
if (contentLength < 0)
{
throw new InvalidDataException("JSON-RPC frame is missing the Content-Length header.");
}
// Bytes after the header that we already have
int extraBytes = filled - headerEnd;
// Ensure buffer is large enough for the body and any overflow already read.
int needed = Math.Max(contentLength, extraBytes);
if (needed > buffer.Length)
{
var newBuffer = new byte[needed];
Buffer.BlockCopy(buffer, headerEnd, newBuffer, 0, extraBytes);
buffer = newBuffer;
}
else if (extraBytes > 0)
{
Buffer.BlockCopy(buffer, headerEnd, buffer, 0, extraBytes);
}
// Read remaining body bytes if we don't have enough
if (extraBytes < contentLength)
{
await _receiveStream.ReadExactlyAsync(buffer.AsMemory(extraBytes, contentLength - extraBytes), cancellationToken).ConfigureAwait(false);
return (contentLength, buffer, 0);
}
// We read more than the body — overflow belongs to the next message
int overflow = extraBytes - contentLength;
return (contentLength, buffer, overflow);
}
private void HandleResponse(JsonElement message, JsonElement idProp)
{
if (!idProp.TryGetInt64(out long id))
{
return;
}
if (!_pendingRequests.TryRemove(id, out var pending))
{
return;
}
if (message.TryGetProperty("error", out var errorProp))
{
var errorMessage = errorProp.TryGetProperty("message", out var msgProp)
? msgProp.GetString() ?? "Unknown error"
: "Unknown error";
var errorCode = errorProp.TryGetProperty("code", out var codeProp) && codeProp.ValueKind == JsonValueKind.Number
? codeProp.GetInt32()
: 0;
var errorData = errorProp.TryGetProperty("data", out var dataProp)
? dataProp
: (JsonElement?)null;
pending.TrySetException(new RemoteRpcException(errorMessage, errorCode, errorData));
}
else if (message.TryGetProperty("result", out var resultProp))
{
var cloned = resultProp.Clone();
if (pending.OnResultInline is { } inline)
{
// Run the inline callback synchronously in the read loop so any
// state it mutates (e.g. session registration) is visible before
// the read loop dispatches the next message.
try
{
inline(cloned);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Inline response callback for request {RequestId} threw", id);
pending.TrySetException(ex);
return;
}
}
pending.TrySetResult(cloned);
}
else
{
// Per JSON-RPC 2.0, a response must have either "result" or "error".
// Treat missing result as null result.
pending.TrySetResult(default);
}
}
private async Task HandleIncomingMethodAsync(string methodName, JsonElement message, CancellationToken cancellationToken)
{
try
{
JsonElement? requestId = null;
if (message.TryGetProperty("id", out var idProp))
{
requestId = idProp;
}
if (!_methods.TryGetValue(methodName, out var registration))
{
if (requestId.HasValue)
{
await SendErrorResponseAsync(requestId.Value, ErrorCodeMethodNotFound, $"Method not found: {methodName}", cancellationToken).ConfigureAwait(false);
}
return;
}
message.TryGetProperty("params", out var paramsProp);
try
{
var result = await InvokeHandlerAsync(registration, paramsProp, cancellationToken).ConfigureAwait(false);
if (requestId.HasValue)
{
await SendResultResponseAsync(requestId.Value, result, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
var actual = ex is TargetInvocationException tie && tie.InnerException != null ? tie.InnerException : ex;
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Error handling JSON-RPC method {Method}: {Error}", methodName, actual.Message);
}
if (requestId.HasValue)
{
if (actual is LocalRpcInvocationException lre)
{
await SendErrorResponseAsync(requestId.Value, lre.Code, lre.Message, lre.Data, cancellationToken).ConfigureAwait(false);
}
else
{
await SendErrorResponseAsync(requestId.Value, ErrorCodeInternalError, actual.Message, cancellationToken).ConfigureAwait(false);
}
}
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Normal shutdown — cancellation propagated from the read loop.
}
catch (Exception ex)
{
// Belt-and-braces: this method is fire-and-forget from the read loop, so any
// exception escaping here would become an unobserved task exception. The most
// likely sources are IOException/ObjectDisposedException from sending the error
// response after the underlying transport is gone.
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug(ex, "Unobserved error in JSON-RPC method dispatch for {Method}", methodName);
}
}
}
private async ValueTask<object?> InvokeHandlerAsync(MethodRegistration registration, JsonElement paramsProp, CancellationToken cancellationToken)
{
var parameters = registration.Parameters;
// Build argument list
var invokeArgs = new object?[parameters.Length];
if (registration.SingleObjectParam)
{
// Single-object deserialization: entire `params` → first parameter.
// Every singleObjectParam handler has shape (TRequest, CancellationToken),
// so `params` must be a JSON object.
if (paramsProp.ValueKind != JsonValueKind.Object)
{
throw new InvalidOperationException(
$"Expected JSON object for `params` of single-object-param handler; got '{paramsProp.ValueKind}'.");
}
for (int i = 0; i < parameters.Length; i++)
{
if (parameters[i].ParameterType == typeof(CancellationToken))
{
invokeArgs[i] = cancellationToken;
}
else if (i == 0)
{
invokeArgs[i] = paramsProp.Deserialize(_serializerOptions.GetTypeInfo(parameters[i].ParameterType));
}
}
}
else if (paramsProp.ValueKind == JsonValueKind.Array)
{
// Positional parameters. Optional params (with defaults) are filled when absent.
int jsonIndex = 0;
int arrayLength = paramsProp.GetArrayLength();
for (int i = 0; i < parameters.Length; i++)
{
if (parameters[i].ParameterType == typeof(CancellationToken))
{
invokeArgs[i] = cancellationToken;
}
else if (jsonIndex < arrayLength)
{
invokeArgs[i] = paramsProp[jsonIndex].Deserialize(_serializerOptions.GetTypeInfo(parameters[i].ParameterType));
jsonIndex++;
}
else
{
invokeArgs[i] = parameters[i].HasDefaultValue ? parameters[i].DefaultValue : null;
}
}
}
else if (paramsProp.ValueKind == JsonValueKind.Object)
{
// Named parameters. The CLI sends notifications/requests as a JSON object whose
// property names match the handler's parameter names (camelCased per web defaults).
// Look up each parameter by name; missing optional parameters fall back to defaults.
for (int i = 0; i < parameters.Length; i++)
{
if (parameters[i].ParameterType == typeof(CancellationToken))
{
invokeArgs[i] = cancellationToken;
}
else if (parameters[i].Name is { } paramName &&
TryGetPropertyCaseInsensitive(paramsProp, paramName, out var valueProp))
{
invokeArgs[i] = valueProp.Deserialize(_serializerOptions.GetTypeInfo(parameters[i].ParameterType));
}
else
{
invokeArgs[i] = parameters[i].HasDefaultValue ? parameters[i].DefaultValue : null;
}
}
}
else
{
// Missing/null `params` for a handler with required positional parameters is a
// protocol violation. Surface it as an error rather than silently filling defaults.
throw new InvalidOperationException(
$"Unsupported JSON-RPC params shape '{paramsProp.ValueKind}' for handler with positional parameters.");
}
// Invoke
var result = registration.Handler.DynamicInvoke(invokeArgs);
// Handlers return one of: a synchronous value, Task (void async), or ValueTask<T>.
if (result is Task task)
{
// Task<T> handlers are not supported — use ValueTask<T> for results.
Debug.Assert(!task.GetType().IsGenericType, "Task<T> handlers are not supported; use ValueTask<T>.");
await task.ConfigureAwait(false);
return null;
}
if (result is not null && registration.ValueTaskAsTaskMethod is { } valueTaskAsTaskMethod)
{
var asTask = (Task)valueTaskAsTaskMethod.Invoke(result, null)!;
await asTask.ConfigureAwait(false);
return registration.TaskResultGetter!.Invoke(asTask, null);
}
return result;
}
private static bool TryGetPropertyCaseInsensitive(JsonElement obj, string name, out JsonElement value)
{
// Fast path: exact match. The CLI uses camelCase property names that match the
// C# parameter names exactly, so this should hit in the common case.
if (obj.TryGetProperty(name, out value))
{
return true;
}
foreach (var prop in obj.EnumerateObject())
{
if (string.Equals(prop.Name, name, StringComparison.OrdinalIgnoreCase))
{
value = prop.Value;
return true;
}
}
value = default;
return false;
}
private JsonElement? SerializeArgs(object?[]? args)
{
if (args is null || args.Length == 0)
{
return null;
}
// The Copilot CLI uses vscode-jsonrpc-style request handlers, which expect
// `params` to be the single request object (not wrapped in a positional array).
// The other SDKs (Node, Python, Go) all send single-object params, and every
// generated call site here passes exactly one request object. For the rare
// multi-arg case, fall back to a positional array.
if (args.Length == 1)
{
var arg = args[0];
if (arg is null)
{
return null;
}
var typeInfo = _serializerOptions.GetTypeInfo(arg.GetType());
return JsonSerializer.SerializeToElement(arg, typeInfo);
}
// Source-generated JsonSerializerOptions do not provide metadata for object[],
// so build the JSON array manually, serializing each element with a TypeInfo
// looked up by its runtime type from the merged resolver.
var buffer = new ArrayBufferWriter<byte>();
using (var writer = new Utf8JsonWriter(buffer))
{
writer.WriteStartArray();
foreach (var arg in args)
{
if (arg is null)
{
writer.WriteNullValue();
}
else
{
var typeInfo = _serializerOptions.GetTypeInfo(arg.GetType());
JsonSerializer.Serialize(writer, arg, typeInfo);
}
}
writer.WriteEndArray();
}
using var doc = JsonDocument.Parse(buffer.WrittenMemory);
return doc.RootElement.Clone();
}
private async Task SendResultResponseAsync(JsonElement id, object? result, CancellationToken cancellationToken)
{
try
{
// Convert the result to a JsonElement using the runtime type, looked up via
// the merged resolver. Source-gen serialization of an `object`-typed property
// would otherwise have no way to find metadata for the actual response type
// (e.g. SystemMessageTransformRpcResponse, SessionFsReadFileResult, ...).
JsonElement? resultElement = null;
if (result is not null)
{
var typeInfo = _serializerOptions.GetTypeInfo(result.GetType());
resultElement = JsonSerializer.SerializeToElement(result, typeInfo);
}
await SendMessageAsync(new JsonRpcResponse
{
Id = id,
Result = resultElement,
}, JsonRpcWireContext.Default.JsonRpcResponse, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (ex is IOException or ObjectDisposedException or OperationCanceledException)
{
// Connection lost during response — nothing we can do
}
}
private async Task SendErrorResponseAsync(JsonElement id, int code, string message, CancellationToken cancellationToken)
=> await SendErrorResponseAsync(id, code, message, data: null, cancellationToken).ConfigureAwait(false);
private async Task SendErrorResponseAsync(JsonElement id, int code, string message, JsonElement? data, CancellationToken cancellationToken)
{
try
{
await SendMessageAsync(new JsonRpcErrorResponse
{
Id = id,
Error = new JsonRpcError { Code = code, Message = message, Data = data },
}, JsonRpcWireContext.Default.JsonRpcErrorResponse, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (ex is IOException or ObjectDisposedException or OperationCanceledException)
{
// Connection lost during error response — nothing we can do
}
}
private async Task SendCancelNotificationAsync(long requestId)
{
try
{
await SendMessageAsync(new JsonRpcNotification
{
Method = "$/cancelRequest",
Params = JsonSerializer.SerializeToElement(
new CancelRequestParams { Id = requestId },
CancelRequestParamsContext.Default.CancelRequestParams),
}, JsonRpcWireContext.Default.JsonRpcNotification, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex) when (ex is IOException or ObjectDisposedException or OperationCanceledException)
{
// Best effort — connection may already be gone
}
}
private sealed class PendingRequest(Action<JsonElement>? onResultInline = null) : TaskCompletionSource<JsonElement>(TaskCreationOptions.RunContinuationsAsynchronously)
{
/// <summary>
/// Optional callback invoked synchronously from the read loop after the
/// response is parsed but before the awaiter resumes. Used to perform
/// state changes that must happen before any subsequent notification on
/// the same connection is dispatched (e.g. registering a session whose
/// id was assigned by the server in the response).
/// </summary>
public Action<JsonElement>? OnResultInline { get; } = onResultInline;
}
private static readonly MethodInfo s_taskGetResult = typeof(Task<>).GetProperty(nameof(Task<int>.Result), BindingFlags.Instance | BindingFlags.Public)!.GetMethod!;
private static readonly MethodInfo s_valueTaskAsTask = typeof(ValueTask<>).GetMethod(nameof(ValueTask<int>.AsTask), BindingFlags.Instance | BindingFlags.Public)!;
private sealed class MethodRegistration
{
public MethodRegistration(Delegate handler, bool singleObjectParam)
{
Handler = handler;
SingleObjectParam = singleObjectParam;
Parameters = handler.Method.GetParameters();
var returnType = handler.Method.ReturnType;
if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(ValueTask<>))
{
ValueTaskAsTaskMethod = GetMethodFromGenericMethodDefinition(returnType, s_valueTaskAsTask);
TaskResultGetter = GetMethodFromGenericMethodDefinition(ValueTaskAsTaskMethod.ReturnType, s_taskGetResult);
}
}
public Delegate Handler { get; }
public bool SingleObjectParam { get; }
public ParameterInfo[] Parameters { get; }
public MethodInfo? ValueTaskAsTaskMethod { get; }
public MethodInfo? TaskResultGetter { get; }
}
private static MethodInfo GetMethodFromGenericMethodDefinition(Type specializedType, MethodInfo genericMethodDefinition)
{
Debug.Assert(
specializedType.IsGenericType && specializedType.GetGenericTypeDefinition() == genericMethodDefinition.DeclaringType,
"Generic member definition doesn't match type.");
#if NET8_0_OR_GREATER
return (MethodInfo)specializedType.GetMemberWithSameMetadataDefinitionAs(genericMethodDefinition);
#else
const BindingFlags All = BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance;
return specializedType.GetMethods(All).First(m => m.MetadataToken == genericMethodDefinition.MetadataToken);
#endif
}
[JsonSourceGenerationOptions(
JsonSerializerDefaults.Web,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)]
[JsonSerializable(typeof(JsonRpcRequest))]
[JsonSerializable(typeof(JsonRpcResponse))]
[JsonSerializable(typeof(JsonRpcErrorResponse))]
[JsonSerializable(typeof(JsonRpcNotification))]
private partial class JsonRpcWireContext : JsonSerializerContext;
private sealed class JsonRpcRequest
{
[JsonPropertyName("jsonrpc")]
public string Jsonrpc { get; } = "2.0";
[JsonPropertyName("id")]
public long Id { get; set; }
[JsonPropertyName("method")]
public string Method { get; set; } = string.Empty;
[JsonPropertyName("params")]
public JsonElement? Params { get; set; }
}
private sealed class JsonRpcResponse
{
[JsonPropertyName("jsonrpc")]
public string Jsonrpc { get; } = "2.0";
[JsonPropertyName("id")]
public JsonElement Id { get; set; }
// JSON-RPC 2.0 requires every response to carry either `result` or `error`.
// vscode-jsonrpc (used by the CLI) rejects responses that have neither with
// "The received response has neither a result nor an error property", so we
// must emit `result: null` for void-returning handlers — overriding the
// context-level WhenWritingNull policy.
[JsonPropertyName("result")]
[JsonIgnore(Condition = JsonIgnoreCondition.Never)]
public JsonElement? Result { get; set; }
}
private sealed class JsonRpcErrorResponse
{
[JsonPropertyName("jsonrpc")]
public string Jsonrpc { get; } = "2.0";
[JsonPropertyName("id")]
public JsonElement Id { get; set; }
[JsonPropertyName("error")]
public JsonRpcError? Error { get; set; }
}
private sealed class JsonRpcError
{
[JsonPropertyName("code")]
public int Code { get; set; }
[JsonPropertyName("message")]
public string Message { get; set; } = string.Empty;
[JsonPropertyName("data")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public JsonElement? Data { get; set; }
}
private sealed class JsonRpcNotification
{
[JsonPropertyName("jsonrpc")]
public string Jsonrpc { get; } = "2.0";
[JsonPropertyName("method")]
public string Method { get; set; } = string.Empty;
[JsonPropertyName("params")]
public JsonElement? Params { get; set; }
}
private sealed class CancelRequestParams
{
[JsonPropertyName("id")]
public long Id { get; set; }
}
[JsonSerializable(typeof(CancelRequestParams))]
private partial class CancelRequestParamsContext : JsonSerializerContext;
}
/// <summary>
/// Thrown when the JSON-RPC connection is lost unexpectedly.
/// </summary>
internal sealed class ConnectionLostException() : IOException("The JSON-RPC connection was lost.");
/// <summary>
/// Thrown when the remote side returns a JSON-RPC error response.
/// </summary>
internal sealed class RemoteRpcException(string message, int errorCode, JsonElement? errorData = null, Exception? innerException = null) : Exception(message, innerException)
{
/// <summary>JSON-RPC 2.0 reserved error code: requested method does not exist.</summary>
public const int MethodNotFoundErrorCode = -32601;
public int ErrorCode { get; } = errorCode;
public JsonElement? ErrorData { get; } = errorData.HasValue ? errorData.Value.Clone() : null;
}
/// <summary>
/// Allows handler methods registered via <c>JsonRpcConnection.SetLocalRpcMethod</c>
/// to surface a structured JSON-RPC error response (code, message, and optional
/// <c>data</c> payload) instead of the default <c>ErrorCodeInternalError</c> envelope.
/// </summary>
internal sealed class LocalRpcInvocationException : Exception
{
public LocalRpcInvocationException(int code, string message, JsonElement? data = null) : base(message)
{
Code = code;
Data = data;
}
public int Code { get; }
public new JsonElement? Data { get; }
}