/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System.Buffers;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
using System.Text.Unicode;
namespace GitHub.Copilot;
/// <summary>
/// A lightweight JSON-RPC 2.0 implementation covering only the features used
/// by this SDK to talk to the Copilot CLI. Messages are framed using the
/// LSP-style header convention (<c>Content-Length: N\r\n\r\n</c> followed by
/// <c>N</c> bytes of JSON body) — the same wire format used by the Language Server
/// Protocol and the Copilot CLI's other language SDKs (Go, Node, Python).
/// This is not a general-purpose JSON-RPC stack: it is narrowly scoped to the
/// methods, transports, and framing the CLI uses.
/// </summary>
internal sealed partial class JsonRpc : IDisposable
{
// JSON-RPC error code sent back when an incoming request names a method with no registered handler.
private const int ErrorCodeMethodNotFound = -32601;
// JSON-RPC error code sent back when a handler fails with an exception that carries no code of its own.
private const int ErrorCodeInternalError = -32603;
// Stream that framed outgoing messages are written to.
private readonly Stream _sendStream;
// Stream that framed incoming messages are read from by the read loop.
private readonly Stream _receiveStream;
// Serializer options used to look up type metadata when deserializing call results.
private readonly JsonSerializerOptions _serializerOptions;
// Diagnostics sink; never null (the constructor substitutes a no-op logger).
private readonly ILogger _logger;
// Requests sent to the peer that are still awaiting a response, keyed by request id.
private readonly ConcurrentDictionary _pendingRequests = new();
// Handlers for calls coming from the peer, keyed by JSON-RPC method name.
private readonly ConcurrentDictionary _methods = new();
// Backs the Completion task; signaled by Dispose and when the read loop exits.
private readonly TaskCompletionSource _completionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
// Serializes writers so the header and body of one message are never interleaved with another.
private readonly SemaphoreSlim _writeLock = new(1, 1);
// Canceled by Dispose; its token is handed to the read loop in StartListening.
private readonly CancellationTokenSource _disposeCts = new();
// Last request id handed out; advanced atomically for every outgoing request.
private long _nextId;
// Set by the first Dispose call so that later calls return immediately.
private bool _disposed;
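/// <summary>
/// Initializes a new <see cref="JsonRpc"/>.
/// </summary>
/// <param name="sendStream">The stream to write outgoing messages to.</param>
/// <param name="receiveStream">The stream to read incoming messages from.</param>
/// <param name="serializerOptions">JSON serializer options (should include all needed source-gen contexts).</param>
/// <param name="logger">Optional logger for diagnostics.</param>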
public JsonRpc(Stream sendStream, Stream receiveStream, JsonSerializerOptions serializerOptions, ILogger? logger = null)
{
    // Fail fast on missing dependencies (throws ArgumentNullException). Without these
    // checks a null argument only surfaces later as a NullReferenceException: on the
    // first send, on the first result deserialization, or — for the receive stream —
    // inside the background read loop, where the failure is merely logged at debug level.
    ArgumentNullException.ThrowIfNull(sendStream);
    ArgumentNullException.ThrowIfNull(receiveStream);
    ArgumentNullException.ThrowIfNull(serializerOptions);

    _sendStream = sendStream;
    _receiveStream = receiveStream;
    _serializerOptions = serializerOptions;

    // The logger is genuinely optional: substitute the no-op logger so that no call
    // site in this class ever needs a null check.
    _logger = logger ?? NullLogger.Instance;
}
/// <summary>
/// A <see cref="Task"/> that completes when the connection is closed or faulted.
/// </summary>
public Task Completion
{
    // Only the task is exposed; the completion source that drives it stays private.
    get { return _completionSource.Task; }
}
/// <summary>
/// Begins reading messages from the receive stream. Call once after registering all method handlers.
/// </summary>
public void StartListening() =>
    // Start the read loop bound to the dispose token and deliberately discard the
    // returned task: the loop catches its own exceptions and reports its end
    // through the completion source rather than through this task.
    _ = ReadLoopAsync(_disposeCts.Token);
/// <summary>
/// Sends a JSON-RPC request and waits for the response.
/// </summary>
/// <param name="method">The JSON-RPC method name.</param>
/// <param name="args">Positional arguments for the call.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <param name="onResponseInline">
/// Optional callback invoked synchronously from the read loop after the
/// response is parsed but before the awaiter resumes. Use this when you
/// need to mutate client-side state (for example, register a server-assigned
/// session id) before any subsequent notification on the same connection is
/// dispatched. The callback receives the raw JSON-RPC result element.
/// If the callback throws, the exception is propagated to the awaiter.
/// </param>
public async Task InvokeAsync(string method, object?[]? args, CancellationToken cancellationToken, Action? onResponseInline = null)
{
// Captured up front so the timing log covers the whole round trip, including the send.
var timingTimestamp = Stopwatch.GetTimestamp();
// Allocate a unique id and register the pending entry BEFORE sending, so the read
// loop can match the response by id even if it arrives immediately after the write.
var id = Interlocked.Increment(ref _nextId);
var pending = new PendingRequest(onResponseInline);
_pendingRequests[id] = pending;
// NOTE(review): if the read loop has already exited, nothing visible here completes
// this entry except cancellation — confirm callers do not invoke after Completion.
CancellationTokenRegistration cancelRegistration = default;
try
{
if (cancellationToken.CanBeCanceled)
{
// Static lambda: everything it needs is passed through the state tuple instead of being captured.
cancelRegistration = cancellationToken.Register(static state =>
{
var (self, reqId, ct) = ((JsonRpc, long, CancellationToken))state!;
// Whoever removes the entry owns its completion; if a response already claimed it, do nothing here.
if (self._pendingRequests.TryRemove(reqId, out var p))
{
p.TrySetCanceled(ct);
}
// Best-effort cancel notification
_ = self.SendCancelNotificationAsync(reqId);
}, (this, id, cancellationToken));
}
// Send request message
await SendMessageAsync(new JsonRpcRequest
{
Id = id,
Method = method,
Params = SerializeArgs(args),
}, JsonRpcWireContext.Default.JsonRpcRequest, cancellationToken).ConfigureAwait(false);
// Completed by the read loop (result or remote error), by the cancellation callback, or when the connection ends.
var responseElement = await pending.Task.ConfigureAwait(false);
// A null or absent result carries nothing to deserialize: hand back the default value of T.
if (responseElement.ValueKind == JsonValueKind.Null || responseElement.ValueKind == JsonValueKind.Undefined)
{
LogInvokeTiming(LogLevel.Debug, null, method, id, "Succeeded", timingTimestamp);
return default!;
}
// Deserialize using the type metadata registered on the serializer options for T.
var result = (T)responseElement.Deserialize(_serializerOptions.GetTypeInfo(typeof(T)))!;
LogInvokeTiming(LogLevel.Debug, null, method, id, "Succeeded", timingTimestamp);
return result;
}
catch (OperationCanceledException ex)
{
// Cancellation is logged at debug level only, then rethrown unchanged.
LogInvokeTiming(LogLevel.Debug, ex, method, id, "Canceled", timingTimestamp);
throw;
}
catch (Exception ex)
{
// Every other failure (send error, remote error, deserialization error) is logged as a warning and rethrown.
LogInvokeTiming(LogLevel.Warning, ex, method, id, "Failed", timingTimestamp);
throw;
}
finally
{
// Whatever the outcome, drop the table entry (a no-op if it was already claimed)
// and release the cancellation registration.
_pendingRequests.TryRemove(id, out _);
await cancelRegistration.DisposeAsync().ConfigureAwait(false);
}
}
private void LogInvokeTiming(
    LogLevel level,
    Exception? exception,
    string method,
    long requestId,
    string status,
    long startTimestamp)
{
    // Writes one structured log entry describing how an InvokeAsync call ended.
    // The elapsed time is computed only when the level is enabled, so a disabled
    // level costs nothing beyond the IsEnabled check.
    if (_logger.IsEnabled(level))
    {
        _logger.Log(
            level,
            exception,
            "JsonRpc.InvokeAsync JSON-RPC request finished. Elapsed={Elapsed}, Method={Method}, RequestId={RequestId}, Status={Status}",
            Stopwatch.GetElapsedTime(startTimestamp),
            method,
            requestId,
            status);
    }
}
/// <summary>
/// Registers a method handler that receives positional parameters.
/// If singleObjectParam is false (the default), parameter names and types are inferred from the delegate's signature.
/// If singleObjectParam is true, the entire params object is deserialized as the handler's first parameter.
/// </summary>
public void SetLocalRpcMethod(string methodName, Delegate handler, bool singleObjectParam = false) =>
    // Indexer assignment: registering the same method name again replaces the earlier handler.
    _methods[methodName] = new(handler, singleObjectParam);
/// <inheritdoc />
public void Dispose()
{
    // Only the first call does any work; every later call falls straight through.
    if (!_disposed)
    {
        _disposed = true;

        // Signal shutdown to everything observing the dispose token (the read loop is started with it).
        _disposeCts.Cancel();

        // Complete every outstanding request with ObjectDisposedException so no caller
        // is left awaiting a response that can no longer arrive.
        foreach (var entry in _pendingRequests)
        {
            if (!_pendingRequests.TryRemove(entry.Key, out var request))
            {
                // Another path removed this entry first and is responsible for completing it.
                continue;
            }

            request.TrySetException(new ObjectDisposedException(nameof(JsonRpc)));
        }

        // Try-variant: the read loop may already have signaled completion.
        _completionSource.TrySetResult();
        _writeLock.Dispose();
    }
}
private async Task SendMessageAsync<T>(T message, JsonTypeInfo<T> typeInfo, CancellationToken cancellationToken)
{
    // Serializes one message and writes it to the send stream as a single
    // "Content-Length: N\r\n\r\n" header followed by the N-byte JSON body.
    // The cancellation token is honored only while waiting for the write lock.

    // "Content-Length: " (16) + max int digits (10) + "\r\n\r\n" (4)
    const int MaxHeaderLength = 30;
    var json = JsonSerializer.SerializeToUtf8Bytes(message, typeInfo);
    var headerBuf = ArrayPool<byte>.Shared.Rent(MaxHeaderLength);

    // The outer try/finally owns the rented buffer so it goes back to the pool on
    // every exit path — including a canceled (or disposed) wait for the write lock
    // and a failing Release, both of which previously skipped the Return call.
    try
    {
        bool wrote = Utf8.TryWrite(headerBuf, $"Content-Length: {json.Length}\r\n\r\n", out int headerLen);
        Debug.Assert(wrote && headerLen > 0);

        // Cancellation only applies to *waiting* for the write lock. Once we hold the lock
        // and start writing a framed message, we must finish it — cancelling between the
        // header and the body (or mid-body) would leave the peer waiting for N body bytes
        // that never arrive, desynchronizing the LSP-style stream for every subsequent
        // message on this connection.
        await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            await _sendStream.WriteAsync(headerBuf.AsMemory(0, headerLen), CancellationToken.None).ConfigureAwait(false);
            await _sendStream.WriteAsync(json, CancellationToken.None).ConfigureAwait(false);
            await _sendStream.FlushAsync(CancellationToken.None).ConfigureAwait(false);
        }
        finally
        {
            _writeLock.Release();
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(headerBuf);
    }
}
private async Task ReadLoopAsync(CancellationToken cancellationToken)
{
    // Background receive loop. Reads one framed message at a time, parses it, and routes
    // it either to the request awaiting it (responses) or to a registered handler
    // (incoming calls). It runs until the stream ends, the token is canceled, or reading
    // fails; on the way out every still-pending request is failed and Completion is signaled.
    var buffer = new byte[256];
    int carried = 0; // bytes in buffer carried over from previous read
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // Read headers and body
            var (contentLength, buf, newCarried) = await ReadMessageAsync(buffer, carried, cancellationToken).ConfigureAwait(false);
            if (contentLength < 0)
            {
                break; // Stream ended
            }

            // Keep the (possibly grown) buffer and carry-over count for next iteration
            buffer = buf;
            carried = newCarried;

            // Parse the raw JSON. Body is at buffer[0..contentLength], carried bytes
            // for the next message are at buffer[contentLength..contentLength+carried].
            JsonElement? message = null;
            try
            {
                using var doc = JsonDocument.Parse(buffer.AsMemory(0, contentLength));
                // Clone so the element outlives both the document and the reused buffer.
                message = doc.RootElement.Clone();
            }
            catch (JsonException ex)
            {
                _logger.LogWarning(ex, "Failed to parse incoming JSON-RPC message");
            }

            // Always move carried bytes to the front, even on parse failure — otherwise
            // the next ReadMessageAsync call would scan stale body bytes as headers.
            // This must happen AFTER parsing because the carried region overlaps where
            // the body lived.
            if (carried > 0)
            {
                Buffer.BlockCopy(buffer, contentLength, buffer, 0, carried);
            }

            if (message is not { } parsed)
            {
                continue;
            }

            // Only a JSON object can be a JSON-RPC message. TryGetProperty throws
            // InvalidOperationException for any other kind (array, string, number, ...),
            // which would tear down the whole connection over a single bad message.
            // Skip it instead, exactly as unparsable JSON is skipped above.
            if (parsed.ValueKind != JsonValueKind.Object)
            {
                _logger.LogWarning("Ignoring incoming JSON-RPC message whose root is {ValueKind}, not an object", parsed.ValueKind);
                continue;
            }

            // Route the message
            if (!parsed.TryGetProperty("method", out var methodProp))
            {
                // No "method": this can only be a response to one of our requests, and
                // only if it carries an "id". Anything else is silently ignored.
                if (parsed.TryGetProperty("id", out var idProp))
                {
                    HandleResponse(parsed, idProp);
                }
            }
            else if (methodProp.ValueKind == JsonValueKind.String && methodProp.GetString() is string methodName)
            {
                // Incoming request or notification. Dispatch is fire-and-forget so a slow
                // handler never stalls the read loop.
                _ = HandleIncomingMethodAsync(methodName, parsed, cancellationToken);
            }
            else
            {
                // GetString throws for kinds other than string/null; checking the kind
                // first keeps a malformed "method" from ending the read loop.
                _logger.LogWarning("Ignoring incoming JSON-RPC message whose \"method\" is {ValueKind}, not a string", methodProp.ValueKind);
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Normal shutdown
    }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "JSON-RPC read loop ended");
    }
    finally
    {
        // Fail all pending requests
        foreach (var kvp in _pendingRequests)
        {
            if (_pendingRequests.TryRemove(kvp.Key, out var pending))
            {
                pending.TrySetException(new ConnectionLostException());
            }
        }

        _completionSource.TrySetResult();
    }
}
/// <summary>
/// Reads headers and body in one pass.
/// On return, body is at buffer[0..ContentLength], and any overflow bytes
/// from the next message are at buffer[ContentLength..ContentLength+Carried].
/// The caller must move the carried bytes to the front before the next call.
/// </summary>
/// <param name="buffer">Shared buffer (may be grown).</param>
/// <param name="carried">Bytes already in buffer[0..carried] from a previous read.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private async ValueTask<(int ContentLength, byte[] Buffer, int Carried)> ReadMessageAsync(byte[] buffer, int carried, CancellationToken cancellationToken)
{
// Returns ContentLength = -1 when the stream ends cleanly between messages. Throws
// EndOfStreamException when it ends in the middle of a header or body, and
// InvalidDataException when the Content-Length header is missing, duplicated, or invalid.
// The returned Buffer may be a different (larger) array than the one passed in.

// Read until we find the \r\n\r\n header terminator.
// carried bytes are already at buffer[0..carried].
int filled = carried;
int headerEnd = -1; // index of first byte after \r\n\r\n
// Check carried bytes first for a header terminator
{
int pos = buffer.AsSpan(0, filled).IndexOf("\r\n\r\n"u8);
if (pos >= 0)
{
headerEnd = pos + 4;
}
}
// NOTE(review): nothing here caps the header size — the buffer keeps doubling for as
// long as the peer sends bytes without a terminator. Confirm that is acceptable.
while (headerEnd < 0)
{
// Buffer full and still no terminator: double it (Array.Resize preserves the bytes read so far).
if (filled == buffer.Length)
{
Array.Resize(ref buffer, buffer.Length * 2);
}
int bytesRead = await _receiveStream.ReadAsync(buffer.AsMemory(filled, buffer.Length - filled), cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
// Clean EOF only if we haven't started a frame; otherwise the peer truncated mid-header.
if (filled == 0)
{
return (-1, buffer, 0);
}
throw new EndOfStreamException("Stream ended while reading JSON-RPC headers.");
}
filled += bytesRead;
// Scan only the newly read bytes, backing up 3 bytes so a terminator that straddles
// the boundary with the previous read is still found.
int scanStart = Math.Max(filled - bytesRead - 3, 0);
int pos = buffer.AsSpan(scanStart, filled - scanStart).IndexOf("\r\n\r\n"u8);
if (pos >= 0)
{
headerEnd = scanStart + pos + 4;
}
}
// Parse Content-Length. LSP framing puts each header on its own \r\n-terminated
// line; we walk the lines and require an exact "Content-Length: " prefix at the
// start of one of them. A substring match anywhere in the header block would
// false-positive on values like "X-Trace: Content-Length: 5" and desync the stream.
// A missing or unparsable Content-Length means the framing is broken — there's
// no safe way to resync, so throw and let the read loop terminate the connection.
int contentLength = -1;
ReadOnlySpan prefix = "Content-Length: "u8;
// headerEnd points just past the \r\n\r\n terminator. Drop only the trailing
// empty line's \r\n; each remaining header line is still \r\n-terminated and
// gets split out by the IndexOf below.
var headerLines = buffer.AsSpan(0, headerEnd - 2);
while (!headerLines.IsEmpty)
{
int lineEnd = headerLines.IndexOf("\r\n"u8);
ReadOnlySpan line = lineEnd >= 0 ? headerLines.Slice(0, lineEnd) : headerLines;
// For a Content-Length line, reject it when one was already seen (contentLength >= 0,
// i.e. a duplicate), when the value is not plain digits (NumberStyles.None), or when
// the parsed value is negative. Lines with any other prefix are skipped.
if (line.StartsWith(prefix) &&
(contentLength >= 0 ||
!int.TryParse(line.Slice(prefix.Length), NumberStyles.None, CultureInfo.InvariantCulture, out contentLength) ||
contentLength < 0))
{
throw new InvalidDataException("JSON-RPC frame has a missing, duplicate, or invalid Content-Length header.");
}
// Advance past this line and its \r\n; the last line has no terminator left, so stop there.
headerLines = lineEnd >= 0 ? headerLines.Slice(lineEnd + 2) : default;
}
if (contentLength < 0)
{
throw new InvalidDataException("JSON-RPC frame is missing the Content-Length header.");
}
// Bytes after the header that we already have
int extraBytes = filled - headerEnd;
// Ensure buffer is large enough for the body and any overflow already read.
// NOTE(review): contentLength comes straight from the peer and is not capped before allocating.
int needed = Math.Max(contentLength, extraBytes);
if (needed > buffer.Length)
{
// Allocate exactly what is needed and move the post-header bytes to the front of the new array.
var newBuffer = new byte[needed];
Buffer.BlockCopy(buffer, headerEnd, newBuffer, 0, extraBytes);
buffer = newBuffer;
}
else if (extraBytes > 0)
{
// Same array: slide the post-header bytes down over the header so the body starts at index 0.
Buffer.BlockCopy(buffer, headerEnd, buffer, 0, extraBytes);
}
// Read remaining body bytes if we don't have enough
if (extraBytes < contentLength)
{
// ReadExactlyAsync either fills the whole remainder or throws, so a truncated body cannot be returned.
await _receiveStream.ReadExactlyAsync(buffer.AsMemory(extraBytes, contentLength - extraBytes), cancellationToken).ConfigureAwait(false);
return (contentLength, buffer, 0);
}
// We read more than the body — overflow belongs to the next message
int overflow = extraBytes - contentLength;
return (contentLength, buffer, overflow);
}
private void HandleResponse(JsonElement message, JsonElement idProp)
{
    // Completes the pending request that a response message answers. This runs
    // synchronously on the read loop, so it must never throw on malformed peer input:
    // an exception here would end the read loop, and — once the pending entry has been
    // removed from the table — leave its awaiter waiting forever, because the loop's
    // cleanup can no longer find that entry to fail it.

    // Ids issued by this class are always integers. TryGetInt64 throws on any
    // non-number kind, and JSON-RPC permits "id": null on error responses the peer
    // could not correlate, so check the kind before converting. Such a response
    // cannot be matched to a request and is dropped.
    if (idProp.ValueKind != JsonValueKind.Number || !idProp.TryGetInt64(out long id))
    {
        return;
    }

    // Unknown id: already completed, canceled, or never ours.
    if (!_pendingRequests.TryRemove(id, out var pending))
    {
        return;
    }

    // An explicit "error": null is treated the same as an absent error member.
    if (message.TryGetProperty("error", out var errorProp) && errorProp.ValueKind != JsonValueKind.Null)
    {
        // Start from the fallbacks and overwrite only with well-formed members, so a
        // malformed error object still fails the request instead of throwing here.
        var errorMessage = "Unknown error";
        var errorCode = 0;
        JsonElement? errorData = null;

        if (errorProp.ValueKind == JsonValueKind.Object)
        {
            if (errorProp.TryGetProperty("message", out var msgProp) && msgProp.ValueKind == JsonValueKind.String)
            {
                errorMessage = msgProp.GetString() ?? "Unknown error";
            }

            // TryGetInt32 (rather than GetInt32) so a fractional or out-of-range code falls back to 0.
            if (errorProp.TryGetProperty("code", out var codeProp) &&
                codeProp.ValueKind == JsonValueKind.Number &&
                codeProp.TryGetInt32(out var parsedCode))
            {
                errorCode = parsedCode;
            }

            if (errorProp.TryGetProperty("data", out var dataProp))
            {
                errorData = dataProp;
            }
        }

        pending.TrySetException(new RemoteRpcException(errorMessage, errorCode, errorData));
    }
    else if (message.TryGetProperty("result", out var resultProp))
    {
        var cloned = resultProp.Clone();
        if (pending.OnResultInline is { } inline)
        {
            // Run the inline callback synchronously in the read loop so any
            // state it mutates (e.g. session registration) is visible before
            // the read loop dispatches the next message.
            try
            {
                inline(cloned);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Inline response callback for request {RequestId} threw", id);
                pending.TrySetException(ex);
                return;
            }
        }

        pending.TrySetResult(cloned);
    }
    else
    {
        // Per JSON-RPC 2.0, a response must have either "result" or "error".
        // Treat missing result as null result.
        pending.TrySetResult(default);
    }
}
private async Task HandleIncomingMethodAsync(string methodName, JsonElement message, CancellationToken cancellationToken)
{
    // Runs the registered handler for an incoming call and, when the call carries an
    // "id", sends back a result or an error response. Started fire-and-forget by the
    // read loop, so no exception is allowed to escape this method.
    try
    {
        // Only a message carrying an "id" gets a reply; without one, nothing is sent
        // back regardless of outcome.
        JsonElement? replyTo = message.TryGetProperty("id", out var idElement)
            ? idElement
            : (JsonElement?)null;

        if (!_methods.TryGetValue(methodName, out var registration))
        {
            if (replyTo is { } unknownMethodId)
            {
                await SendErrorResponseAsync(unknownMethodId, ErrorCodeMethodNotFound, $"Method not found: {methodName}", cancellationToken).ConfigureAwait(false);
            }

            return;
        }

        // A missing "params" member leaves paramsProp as a default (Undefined) element.
        message.TryGetProperty("params", out var paramsProp);

        try
        {
            var handlerResult = await InvokeHandlerAsync(registration, paramsProp, cancellationToken).ConfigureAwait(false);
            if (replyTo is { } successId)
            {
                await SendResultResponseAsync(successId, handlerResult, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Handlers invoked through reflection wrap their failure in
            // TargetInvocationException; report the underlying exception when there is one.
            Exception actual = ex is TargetInvocationException { InnerException: { } inner } ? inner : ex;

            if (_logger.IsEnabled(LogLevel.Debug))
            {
                _logger.LogDebug("Error handling JSON-RPC method {Method}: {Error}", methodName, actual.Message);
            }

            if (replyTo is { } failedId)
            {
                if (actual is LocalRpcInvocationException rpcFailure)
                {
                    // The handler supplied its own error code and data: forward them as-is.
                    await SendErrorResponseAsync(failedId, rpcFailure.Code, rpcFailure.Message, rpcFailure.Data, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    // Any other failure is reported as a generic internal error.
                    await SendErrorResponseAsync(failedId, ErrorCodeInternalError, actual.Message, cancellationToken).ConfigureAwait(false);
                }
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Normal shutdown — cancellation propagated from the read loop.
    }
    catch (Exception ex)
    {
        // Belt-and-braces: this method is fire-and-forget from the read loop, so any
        // exception escaping here would become an unobserved task exception. The most
        // likely sources are IOException/ObjectDisposedException from sending the error
        // response after the underlying transport is gone.
        if (_logger.IsEnabled(LogLevel.Debug))
        {
            _logger.LogDebug(ex, "Unobserved error in JSON-RPC method dispatch for {Method}", methodName);
        }
    }
}
private async ValueTask