Skip to content

Commit 7ac0b8a

Browse files
stephentoubCopilot
andcommitted
Serialize event dispatch in .NET and Go SDKs
Both the .NET and Go SDKs previously invoked user event handlers inline on the transport reader thread/goroutine. This meant a slow handler would block all message processing on the connection, and in Go it could deadlock when a broadcast handler (tool call, permission request) issued an RPC request back through the same reader. This PR decouples user handler dispatch from the transport by routing events through a channel (Go) / Channel<T> (.NET). A single consumer goroutine/task drains the channel and invokes user handlers serially, in FIFO order. This matches the guarantees provided by the Node.js and Python SDKs (which get natural serialization from their single-threaded event loops) while fitting Go's and .NET's multi-threaded runtimes. Broadcast handlers (tool calls, permission requests) are fired as fire-and-forget directly from the dispatch entry point, outside the channel, so a stalled handler cannot block event delivery. This matches the existing Node.js (`void this._executeToolAndRespond()`) and Python (`asyncio.ensure_future()`) behavior. Go changes: - Add eventCh channel to Session; start processEvents consumer goroutine - dispatchEvent enqueues to channel and fires broadcast handler goroutine - Close channel on Disconnect to stop the consumer - Update unit tests and E2E tests for async delivery .NET changes: - Add unbounded Channel<SessionEvent> to CopilotSession; start ProcessEventsAsync consumer task in constructor - DispatchEvent enqueues to channel and fires broadcast handler task - Complete channel on DisposeAsync - Per-handler error catching via ImmutableArray iteration - Cache handler array snapshot to avoid repeated allocation - Inline broadcast error handling into HandleBroadcastEventAsync - Update Should_Receive_Session_Events test to await async delivery - Add Handler_Exception_Does_Not_Halt_Event_Delivery test - Add DisposeAsync_From_Handler_Does_Not_Deadlock test Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 062b61c commit 7ac0b8a

File tree

6 files changed

+378
-127
lines changed

6 files changed

+378
-127
lines changed

dotnet/src/Client.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ public async Task<CopilotSession> CreateSessionAsync(SessionConfig config, Cance
407407

408408
// Create and register the session before issuing the RPC so that
409409
// events emitted by the CLI (e.g. session.start) are not dropped.
410-
var session = new CopilotSession(sessionId, connection.Rpc);
410+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
411411
session.RegisterTools(config.Tools ?? []);
412412
session.RegisterPermissionHandler(config.OnPermissionRequest);
413413
if (config.OnUserInputRequest != null)
@@ -511,7 +511,7 @@ public async Task<CopilotSession> ResumeSessionAsync(string sessionId, ResumeSes
511511

512512
// Create and register the session before issuing the RPC so that
513513
// events emitted by the CLI (e.g. session.start) are not dropped.
514-
var session = new CopilotSession(sessionId, connection.Rpc);
514+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
515515
session.RegisterTools(config.Tools ?? []);
516516
session.RegisterPermissionHandler(config.OnPermissionRequest);
517517
if (config.OnUserInputRequest != null)

dotnet/src/Session.cs

Lines changed: 112 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
* Copyright (c) Microsoft Corporation. All rights reserved.
33
*--------------------------------------------------------------------------------------------*/
44

5+
using GitHub.Copilot.SDK.Rpc;
56
using Microsoft.Extensions.AI;
7+
using Microsoft.Extensions.Logging;
68
using StreamJsonRpc;
9+
using System.Collections.Immutable;
710
using System.Text.Json;
811
using System.Text.Json.Nodes;
912
using System.Text.Json.Serialization;
10-
using GitHub.Copilot.SDK.Rpc;
13+
using System.Threading.Channels;
1114

1215
namespace GitHub.Copilot.SDK;
1316

@@ -52,22 +55,27 @@ namespace GitHub.Copilot.SDK;
5255
/// </example>
5356
public sealed partial class CopilotSession : IAsyncDisposable
5457
{
55-
/// <summary>
56-
/// Multicast delegate used as a thread-safe, insertion-ordered handler list.
57-
/// The compiler-generated add/remove accessors use a lock-free CAS loop over the backing field.
58-
/// Dispatch reads the field once (inherent snapshot, no allocation).
59-
/// Expected handler count is small (typically 1–3), so Delegate.Combine/Remove cost is negligible.
60-
/// </summary>
61-
private event SessionEventHandler? EventHandlers;
6258
private readonly Dictionary<string, AIFunction> _toolHandlers = [];
6359
private readonly JsonRpc _rpc;
60+
private readonly ILogger _logger;
61+
6462
private volatile PermissionRequestHandler? _permissionHandler;
6563
private volatile UserInputHandler? _userInputHandler;
64+
private ImmutableArray<SessionEventHandler> _eventHandlers = ImmutableArray<SessionEventHandler>.Empty;
65+
6666
private SessionHooks? _hooks;
6767
private readonly SemaphoreSlim _hooksLock = new(1, 1);
6868
private SessionRpc? _sessionRpc;
6969
private int _isDisposed;
7070

71+
/// <summary>
72+
/// Channel that serializes event dispatch. <see cref="DispatchEvent"/> enqueues;
73+
/// a single background consumer (<see cref="ProcessEventsAsync"/>) dequeues and
74+
/// invokes handlers one at a time, preserving arrival order.
75+
/// </summary>
76+
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateUnbounded<SessionEvent>(
77+
new() { SingleReader = true });
78+
7179
/// <summary>
7280
/// Gets the unique identifier for this session.
7381
/// </summary>
@@ -93,15 +101,20 @@ public sealed partial class CopilotSession : IAsyncDisposable
93101
/// </summary>
94102
/// <param name="sessionId">The unique identifier for this session.</param>
95103
/// <param name="rpc">The JSON-RPC connection to the Copilot CLI.</param>
104+
/// <param name="logger">Logger for diagnostics.</param>
96105
/// <param name="workspacePath">The workspace path if infinite sessions are enabled.</param>
97106
/// <remarks>
98107
/// This constructor is internal. Use <see cref="CopilotClient.CreateSessionAsync"/> to create sessions.
99108
/// </remarks>
100-
internal CopilotSession(string sessionId, JsonRpc rpc, string? workspacePath = null)
109+
internal CopilotSession(string sessionId, JsonRpc rpc, ILogger logger, string? workspacePath = null)
101110
{
102111
SessionId = sessionId;
103112
_rpc = rpc;
113+
_logger = logger;
104114
WorkspacePath = workspacePath;
115+
116+
// Start the asynchronous processing loop.
117+
_ = ProcessEventsAsync();
105118
}
106119

107120
private Task<T> InvokeRpcAsync<T>(string method, object?[]? args, CancellationToken cancellationToken)
@@ -236,7 +249,9 @@ void Handler(SessionEvent evt)
236249
/// Multiple handlers can be registered and will all receive events.
237250
/// </para>
238251
/// <para>
239-
/// Handler exceptions are allowed to propagate so they are not lost.
252+
/// Handlers are invoked serially in event-arrival order on a background thread.
253+
/// A handler will never be called concurrently with itself or with other handlers
254+
/// on the same session.
240255
/// </para>
241256
/// </remarks>
242257
/// <example>
@@ -259,27 +274,53 @@ void Handler(SessionEvent evt)
259274
/// </example>
260275
public IDisposable On(SessionEventHandler handler)
261276
{
262-
EventHandlers += handler;
263-
return new ActionDisposable(() => EventHandlers -= handler);
277+
ImmutableInterlocked.Update(ref _eventHandlers, array => array.Add(handler));
278+
return new ActionDisposable(() => ImmutableInterlocked.Update(ref _eventHandlers, array => array.Remove(handler)));
264279
}
265280

266281
/// <summary>
267-
/// Dispatches an event to all registered handlers.
282+
/// Enqueues an event for serial dispatch to all registered handlers.
268283
/// </summary>
269284
/// <param name="sessionEvent">The session event to dispatch.</param>
270285
/// <remarks>
271-
/// This method is internal. Handler exceptions are allowed to propagate so they are not lost.
272-
/// Broadcast request events (external_tool.requested, permission.requested) are handled
273-
/// internally before being forwarded to user handlers.
286+
/// This method is non-blocking. Broadcast request events (external_tool.requested,
287+
/// permission.requested) are fired concurrently so that a stalled handler does not
288+
/// block event delivery. The event is then placed into an in-memory channel and
289+
/// processed by a single background consumer (<see cref="ProcessEventsAsync"/>),
290+
/// which guarantees user handlers see events one at a time, in order.
274291
/// </remarks>
275292
internal void DispatchEvent(SessionEvent sessionEvent)
276293
{
277-
// Handle broadcast request events (protocol v3) before dispatching to user handlers.
278-
// Fire-and-forget: the response is sent asynchronously via RPC.
279-
HandleBroadcastEventAsync(sessionEvent);
294+
// Fire broadcast work concurrently (fire-and-forget with error logging).
295+
// This is done outside the channel so broadcast handlers don't block the
296+
// consumer loop — important when a secondary client's handler intentionally
297+
// never completes (multi-client permission scenario).
298+
_ = HandleBroadcastEventAsync(sessionEvent);
299+
300+
// Queue the event for serial processing by user handlers.
301+
_eventChannel.Writer.TryWrite(sessionEvent);
302+
}
280303

281-
// Reading the field once gives us a snapshot; delegates are immutable.
282-
EventHandlers?.Invoke(sessionEvent);
304+
/// <summary>
305+
/// Single-reader consumer loop that processes events from the channel.
306+
/// Ensures user event handlers are invoked serially and in FIFO order.
307+
/// </summary>
308+
private async Task ProcessEventsAsync()
309+
{
310+
await foreach (var sessionEvent in _eventChannel.Reader.ReadAllAsync())
311+
{
312+
foreach (var handler in _eventHandlers)
313+
{
314+
try
315+
{
316+
handler(sessionEvent);
317+
}
318+
catch (Exception ex)
319+
{
320+
LogEventHandlerError(ex);
321+
}
322+
}
323+
}
283324
}
284325

285326
/// <summary>
@@ -355,37 +396,44 @@ internal async Task<PermissionRequestResult> HandlePermissionRequestAsync(JsonEl
355396
/// Implements the protocol v3 broadcast model where tool calls and permission requests
356397
/// are broadcast as session events to all clients.
357398
/// </summary>
358-
private async void HandleBroadcastEventAsync(SessionEvent sessionEvent)
399+
private async Task HandleBroadcastEventAsync(SessionEvent sessionEvent)
359400
{
360-
switch (sessionEvent)
401+
try
361402
{
362-
case ExternalToolRequestedEvent toolEvent:
363-
{
364-
var data = toolEvent.Data;
365-
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
366-
return;
367-
368-
var tool = GetTool(data.ToolName);
369-
if (tool is null)
370-
return; // This client doesn't handle this tool; another client will.
371-
372-
await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
373-
break;
374-
}
375-
376-
case PermissionRequestedEvent permEvent:
377-
{
378-
var data = permEvent.Data;
379-
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
380-
return;
381-
382-
var handler = _permissionHandler;
383-
if (handler is null)
384-
return; // This client doesn't handle permissions; another client will.
385-
386-
await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
387-
break;
388-
}
403+
switch (sessionEvent)
404+
{
405+
case ExternalToolRequestedEvent toolEvent:
406+
{
407+
var data = toolEvent.Data;
408+
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
409+
return;
410+
411+
var tool = GetTool(data.ToolName);
412+
if (tool is null)
413+
return; // This client doesn't handle this tool; another client will.
414+
415+
await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
416+
break;
417+
}
418+
419+
case PermissionRequestedEvent permEvent:
420+
{
421+
var data = permEvent.Data;
422+
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
423+
return;
424+
425+
var handler = _permissionHandler;
426+
if (handler is null)
427+
return; // This client doesn't handle permissions; another client will.
428+
429+
await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
430+
break;
431+
}
432+
}
433+
}
434+
catch (Exception ex) when (ex is not OperationCanceledException)
435+
{
436+
LogBroadcastHandlerError(ex);
389437
}
390438
}
391439

@@ -703,6 +751,11 @@ public async Task LogAsync(string message, SessionLogRequestLevel? level = null,
703751
/// <returns>A task representing the dispose operation.</returns>
704752
/// <remarks>
705753
/// <para>
754+
/// The caller should ensure the session is idle (e.g., <see cref="SendAndWaitAsync"/>
755+
/// has returned) before disposing. If the session is not idle, in-flight event handlers
756+
/// or tool handlers may observe failures.
757+
/// </para>
758+
/// <para>
706759
/// Session state on disk (conversation history, planning state, artifacts) is
707760
/// preserved, so the conversation can be resumed later by calling
708761
/// <see cref="CopilotClient.ResumeSessionAsync"/> with the session ID. To
@@ -731,6 +784,8 @@ public async ValueTask DisposeAsync()
731784
return;
732785
}
733786

787+
_eventChannel.Writer.TryComplete();
788+
734789
try
735790
{
736791
await InvokeRpcAsync<object>(
@@ -745,12 +800,18 @@ await InvokeRpcAsync<object>(
745800
// Connection is broken or closed
746801
}
747802

748-
EventHandlers = null;
803+
_eventHandlers = ImmutableInterlocked.InterlockedExchange(ref _eventHandlers, ImmutableArray<SessionEventHandler>.Empty);
749804
_toolHandlers.Clear();
750805

751806
_permissionHandler = null;
752807
}
753808

809+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in broadcast event handler")]
810+
private partial void LogBroadcastHandlerError(Exception exception);
811+
812+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in session event handler")]
813+
private partial void LogEventHandlerError(Exception exception);
814+
754815
internal record SendMessageRequest
755816
{
756817
public string SessionId { get; init; } = string.Empty;

dotnet/test/SessionTests.cs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,18 +249,40 @@ public async Task Should_Receive_Session_Events()
249249
// session.start is emitted during the session.create RPC; if the session
250250
// weren't registered in the sessions map before the RPC, it would be dropped.
251251
var earlyEvents = new List<SessionEvent>();
252+
var sessionStartReceived = new TaskCompletionSource<bool>();
252253
var session = await CreateSessionAsync(new SessionConfig
253254
{
254-
OnEvent = evt => earlyEvents.Add(evt),
255+
OnEvent = evt =>
256+
{
257+
earlyEvents.Add(evt);
258+
if (evt is SessionStartEvent)
259+
sessionStartReceived.TrySetResult(true);
260+
},
255261
});
256262

263+
// session.start is dispatched asynchronously via the event channel;
264+
// wait briefly for the consumer to deliver it.
265+
var started = await Task.WhenAny(sessionStartReceived.Task, Task.Delay(TimeSpan.FromSeconds(5)));
266+
Assert.Equal(sessionStartReceived.Task, started);
257267
Assert.Contains(earlyEvents, evt => evt is SessionStartEvent);
258268

259269
var receivedEvents = new List<SessionEvent>();
260270
var idleReceived = new TaskCompletionSource<bool>();
271+
var concurrentCount = 0;
272+
var maxConcurrent = 0;
261273

262274
session.On(evt =>
263275
{
276+
// Track concurrent handler invocations to verify serial dispatch.
277+
var current = Interlocked.Increment(ref concurrentCount);
278+
var seenMax = Volatile.Read(ref maxConcurrent);
279+
if (current > seenMax)
280+
Interlocked.CompareExchange(ref maxConcurrent, current, seenMax);
281+
282+
Thread.Sleep(10);
283+
284+
Interlocked.Decrement(ref concurrentCount);
285+
264286
receivedEvents.Add(evt);
265287
if (evt is SessionIdleEvent)
266288
{
@@ -281,6 +303,9 @@ public async Task Should_Receive_Session_Events()
281303
Assert.Contains(receivedEvents, evt => evt is AssistantMessageEvent);
282304
Assert.Contains(receivedEvents, evt => evt is SessionIdleEvent);
283305

306+
// Events must be dispatched serially — never more than one handler invocation at a time.
307+
Assert.Equal(1, maxConcurrent);
308+
284309
// Verify the assistant response contains the expected answer
285310
var assistantMessage = await TestHelper.GetFinalAssistantMessageAsync(session);
286311
Assert.NotNull(assistantMessage);
@@ -452,6 +477,58 @@ await WaitForAsync(() =>
452477
Assert.Equal("notification", ephemeralEvent.Data.InfoType);
453478
}
454479

480+
[Fact]
481+
public async Task Handler_Exception_Does_Not_Halt_Event_Delivery()
482+
{
483+
await Ctx.ConfigureForTestAsync("session", "should_have_stateful_conversation");
484+
485+
var session = await CreateSessionAsync();
486+
var eventCount = 0;
487+
var gotIdle = new TaskCompletionSource();
488+
489+
session.On(evt =>
490+
{
491+
eventCount++;
492+
493+
// Throw on the first event to verify the loop keeps going.
494+
if (eventCount == 1)
495+
throw new InvalidOperationException("boom");
496+
497+
if (evt is SessionIdleEvent)
498+
gotIdle.TrySetResult();
499+
});
500+
501+
await session.SendAsync(new MessageOptions { Prompt = "What is 1+1?" });
502+
503+
await gotIdle.Task.WaitAsync(TimeSpan.FromSeconds(30));
504+
505+
// Handler saw more than just the first (throwing) event.
506+
Assert.True(eventCount > 1);
507+
}
508+
509+
[Fact]
510+
public async Task DisposeAsync_From_Handler_Does_Not_Deadlock()
511+
{
512+
await Ctx.ConfigureForTestAsync("session", "should_have_stateful_conversation");
513+
514+
var session = await CreateSessionAsync();
515+
var disposed = new TaskCompletionSource();
516+
517+
session.On(evt =>
518+
{
519+
if (evt is UserMessageEvent)
520+
{
521+
// Call DisposeAsync from within a handler — must not deadlock.
522+
session.DisposeAsync().AsTask().ContinueWith(_ => disposed.TrySetResult());
523+
}
524+
});
525+
526+
await session.SendAsync(new MessageOptions { Prompt = "What is 1+1?" });
527+
528+
// If this times out, we deadlocked.
529+
await disposed.Task.WaitAsync(TimeSpan.FromSeconds(10));
530+
}
531+
455532
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
456533
{
457534
var deadline = DateTime.UtcNow + timeout;

0 commit comments

Comments
 (0)