Summary
EventBus.emit() assumes every subscribed handler returns a coroutine and passes the return value directly to asyncio.create_task(). If a user or test registers a synchronous callback (for example lambda e: seen.append(e)), the callback side effect runs, then create_task(None) raises TypeError before the event is delivered to any EventStream queues.
This makes one sync subscriber capable of aborting event fanout for unrelated consumers. In PBX/SMS deployments, that can make monitoring streams miss events if an integration callback is accidentally synchronous.
Root cause / affected code
callstack/events/bus.py:29-31 accepts a generic Callable in subscribe() without validating async-only handlers.
callstack/events/bus.py:39-46 calls asyncio.create_task(fn(event)) before queue delivery. A sync function returns None, so create_task() raises immediately and the queue fanout loop is skipped.
tests/test_modem.py:235, :249, and :271 already use sync lambda subscribers, but those paths hide the failure because the synchronous side effect happens before TypeError and some URC dispatch paths swallow the exception.
Evidence
Baseline gates were healthy before this scout:
git diff --check
# exit 0
PYTHONPATH=. uv run --no-project --with pytest --with pytest-asyncio --with pytest-aiohttp --with pyserial-asyncio --with aiosqlite pytest tests/ -q
# 419 passed in 6.80s
Minimal no-hardware reproduction on main (47853962d46a67074b2b356d78cdf9fdc788cda5):
PYTHONPATH=. uv run --no-project --with pyserial-asyncio python -c 'import asyncio
from callstack.events.bus import EventBus
from callstack.events.types import RingEvent
async def main():
bus=EventBus()
seen=[]
bus.subscribe(RingEvent, lambda e: seen.append(type(e).__name__))
async with bus.stream(RingEvent) as stream:
try:
await bus.emit(RingEvent())
raised=None
except Exception as exc:
raised=f"{type(exc).__name__}: {exc}"
queued=await stream.next(timeout=0.05)
print({"sync_handler_seen": seen, "emit_raised": raised, "stream_received": queued is not None})
asyncio.run(main())'
Observed output:
{'sync_handler_seen': ['RingEvent'], 'emit_raised': 'TypeError: a coroutine was expected, got None', 'stream_received': False}
Expected behavior
One bad or synchronous subscriber should not prevent streams/queues and other subscribers from receiving the event. Either:
subscribe()/on() should reject non-async handlers with a clear error before runtime fanout, or
emit() should support synchronous callbacks safely and continue queue delivery.
Actual behavior
The sync callback runs, then emit() raises TypeError; because the exception happens before queue fanout, EventStream.next() times out and misses the RingEvent.
Suggested fix direction
Prefer a small, explicit contract hardening slice:
- Make the accepted handler type explicit (
Callable[[T], Awaitable[None]]) and reject non-coroutine functions at subscribe()/on() time, or detect awaitables in emit() and run sync handlers defensively.
- Ensure subscriber exceptions (including sync handler mistakes) are isolated per subscriber and do not block stream queue delivery.
- Add regression coverage in
tests/test_events.py for the chosen contract and for queue delivery when a subscriber fails.
- Update the few tests that currently use sync lambda subscribers if the public contract remains async-only.
Acceptance criteria
Verification gates
git diff --check
PYTHONPATH=. uv run --no-project --with pytest --with pytest-asyncio --with pyserial-asyncio pytest tests/test_events.py tests/test_modem.py -q
PYTHONPATH=. uv run --no-project --with pytest --with pytest-asyncio --with pytest-aiohttp --with pyserial-asyncio --with aiosqlite pytest tests/ -q
Duplicate checks performed
gh issue list --repo Justinabox/Callstack --state all --search 'EventBus sync subscriber create_task TypeError in:title,body' --limit 20
# no results
gh issue list --repo Justinabox/Callstack --state all --search 'event bus subscriber lambda coroutine TypeError in:title,body' --limit 20
# no results
gh issue list --repo Justinabox/Callstack --state all --search 'sync event handler blocks EventStream queue in:title,body' --limit 20
# no results
Summary
EventBus.emit()assumes every subscribed handler returns a coroutine and passes the return value directly toasyncio.create_task(). If a user or test registers a synchronous callback (for examplelambda e: seen.append(e)), the callback side effect runs, thencreate_task(None)raisesTypeErrorbefore the event is delivered to anyEventStreamqueues.This makes one sync subscriber capable of aborting event fanout for unrelated consumers. In PBX/SMS deployments, that can make monitoring streams miss events if an integration callback is accidentally synchronous.
Root cause / affected code
callstack/events/bus.py:29-31accepts a genericCallableinsubscribe()without validating async-only handlers.callstack/events/bus.py:39-46callsasyncio.create_task(fn(event))before queue delivery. A sync function returnsNone, socreate_task()raises immediately and the queue fanout loop is skipped.tests/test_modem.py:235,:249, and:271already use sync lambda subscribers, but those paths hide the failure because the synchronous side effect happens beforeTypeErrorand some URC dispatch paths swallow the exception.Evidence
Baseline gates were healthy before this scout:
Minimal no-hardware reproduction on
main(47853962d46a67074b2b356d78cdf9fdc788cda5):Observed output:
Expected behavior
One bad or synchronous subscriber should not prevent streams/queues and other subscribers from receiving the event. Either:
subscribe()/on()should reject non-async handlers with a clear error before runtime fanout, oremit()should support synchronous callbacks safely and continue queue delivery.Actual behavior
The sync callback runs, then
emit()raisesTypeError; because the exception happens before queue fanout,EventStream.next()times out and misses theRingEvent.Suggested fix direction
Prefer a small, explicit contract hardening slice:
Callable[[T], Awaitable[None]]) and reject non-coroutine functions atsubscribe()/on()time, or detect awaitables inemit()and run sync handlers defensively.tests/test_events.pyfor the chosen contract and for queue delivery when a subscriber fails.Acceptance criteria
EventStreamconsumers to miss the emitted event.EventBus.subscribe()/on()async-vs-sync contract is enforced or documented by tests.Verification gates
Duplicate checks performed