Prevent sync EventBus subscribers from aborting stream delivery #81

Description

@Justinabox

Summary

EventBus.emit() assumes every subscribed handler returns a coroutine and passes the return value directly to asyncio.create_task(). If a user or test registers a synchronous callback (for example lambda e: seen.append(e)), the callback's side effect runs, then create_task(None) raises TypeError before the event is delivered to any EventStream queue.

A single sync subscriber can therefore abort event fanout for unrelated consumers. In PBX/SMS deployments, monitoring streams can miss events whenever an integration callback is accidentally synchronous.

Root cause / affected code

  • callstack/events/bus.py:29-31 accepts a generic Callable in subscribe() without validating async-only handlers.
  • callstack/events/bus.py:39-46 calls asyncio.create_task(fn(event)) before queue delivery. A sync function returns None, so create_task() raises immediately and the queue fanout loop is skipped.
  • tests/test_modem.py:235, :249, and :271 already use sync lambda subscribers, but those paths hide the failure: the synchronous side effect happens before the TypeError is raised, and some URC dispatch paths swallow the exception.
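
The ordering problem in the second bullet can be shown without Callstack installed. The sketch below is not the code in bus.py; `fanout` is a hypothetical stand-in that schedules handlers before feeding queues, which is the ordering described above.

```python
import asyncio


async def fanout(handlers, queues, event):
    """Hypothetical stand-in: handlers are scheduled first, queues fed after."""
    for fn in handlers:
        # A sync handler has already run and returned None by this point,
        # so create_task() receives None and raises TypeError.
        asyncio.create_task(fn(event))
    for q in queues:
        # Never reached when a handler above was synchronous.
        q.put_nowait(event)


async def demo():
    seen = []
    queue = asyncio.Queue()
    try:
        await fanout([lambda e: seen.append(e)], [queue], "ring")
        raised = None
    except TypeError as exc:
        raised = str(exc)
    return seen, raised, queue.qsize()


result = asyncio.run(demo())
print(result)
```

The handler's side effect is recorded, the exception escapes `fanout`, and the queue stays empty, which matches the shape of the reproduction output under Evidence.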

Evidence

Baseline gates were healthy before this scout:

git diff --check
# exit 0

PYTHONPATH=. uv run --no-project --with pytest --with pytest-asyncio --with pytest-aiohttp --with pyserial-asyncio --with aiosqlite pytest tests/ -q
# 419 passed in 6.80s

Minimal no-hardware reproduction on main (47853962d46a67074b2b356d78cdf9fdc788cda5):

PYTHONPATH=. uv run --no-project --with pyserial-asyncio python -c 'import asyncio
from callstack.events.bus import EventBus
from callstack.events.types import RingEvent
async def main():
    bus=EventBus()
    seen=[]
    bus.subscribe(RingEvent, lambda e: seen.append(type(e).__name__))
    async with bus.stream(RingEvent) as stream:
        try:
            await bus.emit(RingEvent())
            raised=None
        except Exception as exc:
            raised=f"{type(exc).__name__}: {exc}"
        queued=await stream.next(timeout=0.05)
        print({"sync_handler_seen": seen, "emit_raised": raised, "stream_received": queued is not None})
asyncio.run(main())'

Observed output:

{'sync_handler_seen': ['RingEvent'], 'emit_raised': 'TypeError: a coroutine was expected, got None', 'stream_received': False}

Expected behavior

One bad or synchronous subscriber should not prevent streams/queues and other subscribers from receiving the event. Either:

  1. subscribe()/on() should reject non-async handlers with a clear error before runtime fanout, or
  2. emit() should support synchronous callbacks safely and continue queue delivery.
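
Option 1 could look roughly like the following. This is a minimal sketch, not the existing EventBus: `StrictBus` and its internal `_handlers` mapping are hypothetical, and only the registration-time check is the point.

```python
import inspect
from collections import defaultdict


class StrictBus:
    """Hypothetical minimal bus that enforces an async-only contract."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, fn):
        # Reject plain functions and lambdas up front so the mistake
        # surfaces at registration rather than in the middle of fanout.
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(f"handler {fn!r} must be defined with 'async def'")
        self._handlers[event_type].append(fn)
```

One caveat with this check: `inspect.iscoroutinefunction()` returns False for a sync function that returns an awaitable and for an object whose `__call__` is async, so those handlers would be rejected too. Whether that is acceptable depends on how strict the public contract should be.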

Actual behavior

The sync callback runs, then emit() raises TypeError; because the exception happens before queue fanout, EventStream.next() times out and misses the RingEvent.

Suggested fix direction

Prefer a small, explicit contract hardening slice:

  • Make the accepted handler type explicit (Callable[[T], Awaitable[None]]) and reject non-coroutine functions at subscribe()/on() time, or detect awaitables in emit() and run sync handlers defensively.
  • Ensure subscriber exceptions (including sync handler mistakes) are isolated per subscriber and do not block stream queue delivery.
  • Add regression coverage in tests/test_events.py for the chosen contract and for queue delivery when a subscriber fails.
  • Update the few tests that currently use sync lambda subscribers if the public contract remains async-only.
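
The "detect awaitables and isolate failures" direction could be sketched as below. All names here (`emit` as a free function, `_run_isolated`, the logger name) are hypothetical and do not reflect the current bus.py. The sketch also makes two assumptions that the real fix may not share: queues are fed before subscribers run, and `emit()` waits for subscribers to finish instead of leaving them as fire-and-forget tasks.

```python
import asyncio
import inspect
import logging

log = logging.getLogger("eventbus.sketch")


async def _run_isolated(fn, event):
    """Call one subscriber; log its failure instead of propagating it."""
    try:
        result = fn(event)
        # Async handlers return an awaitable; sync handlers usually return None.
        if inspect.isawaitable(result):
            await result
    except Exception:
        log.exception("subscriber %r failed for %r", fn, event)


async def emit(handlers, queues, event):
    # Feed stream queues first so no subscriber can prevent delivery.
    # (Assumes unbounded queues; a bounded queue could raise QueueFull.)
    for q in queues:
        q.put_nowait(event)
    # Each subscriber gets its own task and its own error boundary.
    tasks = [asyncio.create_task(_run_isolated(fn, event)) for fn in handlers]
    if tasks:
        await asyncio.gather(*tasks)


async def demo():
    seen = []
    queue = asyncio.Queue()

    async def good(e):
        seen.append(("async", e))

    def bad(e):
        raise RuntimeError("boom")

    await emit([lambda e: seen.append(("sync", e)), bad, good], [queue], "ring")
    return seen, queue.qsize()


result = asyncio.run(demo())
print(result)
```

Sync handlers in this sketch still run on the event loop thread, so a blocking callback would stall the loop; the isolation covers exceptions only.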

Acceptance criteria

  • A mistakenly synchronous subscriber cannot cause EventStream consumers to miss the emitted event.
  • Subscriber failure handling is deterministic and logged without aborting the entire fanout.
  • The public EventBus.subscribe()/on() async-vs-sync contract is enforced or documented by tests.
  • Existing async subscriber behavior remains unchanged.

Verification gates

git diff --check
PYTHONPATH=. uv run --no-project --with pytest --with pytest-asyncio --with pyserial-asyncio pytest tests/test_events.py tests/test_modem.py -q
PYTHONPATH=. uv run --no-project --with pytest --with pytest-asyncio --with pytest-aiohttp --with pyserial-asyncio --with aiosqlite pytest tests/ -q

Duplicate checks performed

gh issue list --repo Justinabox/Callstack --state all --search 'EventBus sync subscriber create_task TypeError in:title,body' --limit 20
# no results

gh issue list --repo Justinabox/Callstack --state all --search 'event bus subscriber lambda coroutine TypeError in:title,body' --limit 20
# no results

gh issue list --repo Justinabox/Callstack --state all --search 'sync event handler blocks EventStream queue in:title,body' --limit 20
# no results
