Background
Follow-up to #2192 (foundation) and PR #2193 (pytest + Zuul infrastructure). Part of Tier 6 (#2199).
osism/services/websocket_manager.py (271 LOC) implements EventMessage, per-connection event/node/service filters, and a queue-based broadcaster for FastAPI WebSocket clients. osism/services/event_bridge.py (304 LOC) is a Redis pub/sub bridge that forwards events from the RabbitMQ listener container to the WebSocket manager in the API container, with a local-queue fallback and reconnect logic.
Scope
Create the tests/unit/services/ package (__init__.py) plus tests/unit/services/test_websocket_manager.py and tests/unit/services/test_event_bridge.py. There is no existing coverage for either module (tests/unit/ currently only has commands/, tasks/, utils/).
New dev dependency required: most WebSocketManager methods are coroutines, so the tests need pytest-asyncio. The Pipfile [dev-packages] only contains pytest, pytest-cov and pytest-mock — add pytest-asyncio (pinned like the other dev packages). Either set asyncio_mode = auto in the [tool:pytest] section of setup.cfg or mark async tests explicitly with @pytest.mark.asyncio (the plugin registers the marker, so --strict-markers stays happy). No new runtime dependency is needed: fastapi and redis (via celery[redis]) come from requirements.txt, which the Zuul job installs via pipenv run pip install . (playbooks/test-unit.yml, install_requires = file: requirements.txt).
Note: osism/services/event_bridge.py instantiates a module-level singleton at import time (event_bridge.py:304) whose _init_redis() attempts a real Redis connection. The exception is caught (the import stays safe), but tests must construct fresh EventBridge instances with redis.Redis patched — see mocking hints. The _redis_subscriber_loop retry machinery is the trickiest part; if it turns out too brittle, this issue may be split further during implementation (as done in Tier 3).
Test targets
EventMessage — websocket_manager.py:15
No patching needed (pure class; sync tests).
- Constructor sets event_type, source, data, node_name; node_name defaults to None
- id is a valid UUID4 string; two instances get different ids
- timestamp ends with "Z" and the prefix parses as ISO 8601 (do not assert an exact value, since it comes from datetime.utcnow())
- to_dict() (websocket_manager.py:32) returns exactly the keys id, timestamp, event_type, source, node_name, data
- to_json() (websocket_manager.py:43) round-trips via json.loads back to to_dict()
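The id and timestamp checks above can be written without pinning exact values. The following is a minimal sketch using only the standard library; the helper names is_uuid4 and parses_as_utc_iso are hypothetical and not part of the osism code base.

```python
import uuid
from datetime import datetime


def is_uuid4(value: str) -> bool:
    """Return True if value is the canonical string form of a version-4 UUID."""
    try:
        parsed = uuid.UUID(value)
    except (ValueError, AttributeError, TypeError):
        return False
    # Comparing the string form rejects non-canonical spellings (no hyphens, upper case).
    return parsed.version == 4 and str(parsed) == value


def parses_as_utc_iso(timestamp: str) -> bool:
    """Return True if timestamp ends with "Z" and the prefix parses as ISO 8601."""
    if not timestamp.endswith("Z"):
        return False
    try:
        datetime.fromisoformat(timestamp[:-1])
    except ValueError:
        return False
    return True
```

In a test these would typically be applied to event.id and event.timestamp of a freshly constructed EventMessage, together with an inequality check on the ids of two instances.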
WebSocketConnection.matches_filters() — websocket_manager.py:57
No patching needed (WebSocketConnection(MagicMock()) suffices; sync tests).
- No filters set → True for any event (early return)
- event_filters=["baremetal.node.power_set"]: matching event_type → True, non-matching → False
- node_filters=["node1"]: event.node_name="node1" → True; node_name="other" → False; node_name=None → False (explicit is not None check)
- service_filters=["baremetal"]: event_type="baremetal.node.power_set" → True (service = first dot segment); event_type="compute.instance.update" → False
- event_type="" with service_filters=["unknown"] → True (falsy event_type maps to service "unknown")
- Combined filters are AND-ed: event filter matches but node filter does not → False
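The service derivation that the service_filters cases rely on (first dot segment, "unknown" for a falsy event_type) can be pinned down with a small reference function. This is a sketch of the rule as described in this issue, not a copy of the module's code; service_of and CASES are hypothetical names.

```python
def service_of(event_type):
    """Derive the service name: first dot-separated segment, or "unknown" if falsy."""
    if not event_type:
        return "unknown"
    return event_type.split(".")[0]


# Expected (event_type, service) pairs taken from the cases listed above.
CASES = [
    ("baremetal.node.power_set", "baremetal"),
    ("compute.instance.update", "compute"),
    ("", "unknown"),
]
```

The same pairs could feed a pytest.mark.parametrize table that builds a WebSocketConnection with service_filters=[service] and asserts matches_filters() for each event_type.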
WebSocketManager.connect() / disconnect() — websocket_manager.py:95 / websocket_manager.py:106
Use a MagicMock websocket with accept = AsyncMock(); patch WebSocketManager._broadcast_events (or asyncio.create_task at osism.services.websocket_manager.asyncio.create_task) so no real broadcaster loop starts, and always instantiate a fresh WebSocketManager() inside the async test (do not reuse the module-level singleton websocket_manager.py:271).
- connect() awaits websocket.accept() and registers a WebSocketConnection in self.connections
- First connect() starts the broadcaster task; a second connect() while the task is running does not start another one (done() check)
- disconnect() removes the connection; disconnecting an unknown websocket is a no-op (pop(..., None))
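The shape of a connect/disconnect test with a fake websocket might look like the sketch below. StandInManager is a hypothetical stand-in that only mirrors the contract listed above (await accept(), register, pop with a default); the real tests would use a fresh WebSocketManager() with the broadcaster patched out.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_fake_websocket() -> MagicMock:
    """Build a websocket double whose accept/send_text can be awaited."""
    ws = MagicMock()
    ws.accept = AsyncMock()
    ws.send_text = AsyncMock()
    return ws


class StandInManager:
    """Hypothetical stand-in with the connect/disconnect contract listed above."""

    def __init__(self) -> None:
        self.connections = {}

    async def connect(self, websocket) -> None:
        await websocket.accept()
        self.connections[websocket] = object()

    async def disconnect(self, websocket) -> None:
        # pop(..., None) makes an unknown websocket a no-op.
        self.connections.pop(websocket, None)


async def exercise_connect_disconnect() -> dict:
    manager = StandInManager()
    ws = make_fake_websocket()
    await manager.connect(ws)
    registered = ws in manager.connections
    await manager.disconnect(ws)
    await manager.disconnect(make_fake_websocket())  # unknown websocket
    ws.accept.assert_awaited_once()
    return {"registered": registered, "remaining": len(manager.connections)}
```

MagicMock instances are hashable, which is why the double can serve as a dict key without a spec.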
WebSocketManager.update_filters() — websocket_manager.py:114
- Sets event_filters / node_filters / service_filters when passed
- Arguments left as None keep the existing lists (partial update); passing [] clears a filter list
- Unknown websocket → silent no-op, no exception
WebSocketManager.add_event() / send_heartbeat() — websocket_manager.py:136 / websocket_manager.py:258
- add_event() puts the event on event_queue (qsize() == 1, get_nowait() returns it)
- send_heartbeat() with no connections → returns early, queue stays empty
- send_heartbeat() with a registered connection → queues an EventMessage with event_type="heartbeat", source="osism", data={"message": "ping"}
WebSocketManager.broadcast_event_from_notification() — websocket_manager.py:140
No patching needed beyond a fresh manager; inspect the queued EventMessage via event_queue.get_nowait().
- baremetal.* with "ironic_object.data" → node_name from name, resource_id from uuid
- compute.* / nova.* with "nova_object.data" → node_name prefers host over name; falls back to name when host is missing
- network.* / neutron.* with "neutron_object.data" → resource_id from id (fallback uuid), node_name from name (fallback device_id)
- volume.* with "cinder_object.data" → resource_id from id (fallback uuid), node_name from name (fallback display_name)
- image.* with "glance_object.data" and identity.* with "keystone_object.data" → analogous
- Known service type but expected payload key missing (e.g. baremetal without "ironic_object.data") → node_name/resource_id stay None, event still queued
- Unknown service type ("foo.bar") → no extraction, event queued with service_type="foo"
- event_type="" → service_type="unknown"
- Queued event: source="openstack", data contains the original payload keys plus service_type and resource_id; the caller's payload dict is not mutated (payload.copy())
- Exception path: pass a non-dict payload (e.g. a list, which has .copy() but no .get()) → exception caught, error logged, nothing queued
WebSocketManager._broadcast_events() — websocket_manager.py:204
Insert fake connections directly into manager.connections (bypassing connect()), pre-fill the queue, then run task = asyncio.create_task(manager._broadcast_events()), give it a tick (await asyncio.sleep(0.05)), assert, and task.cancel() + await it in a finally block.
- Event matching a connection's filters → websocket.send_text awaited once with event.to_json()
- Event not matching → send_text not called for that connection, but called for a second matching connection (sent_count logic)
- send_text raises WebSocketDisconnect → connection removed from self.connections
- send_text raises generic Exception → error logged, connection removed as well
- Queue event while self.connections is empty → event consumed, nothing sent, loop continues
- task.cancel() → CancelledError branch logs and exits the loop (task finishes, no exception propagates)
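The start, settle, cancel-in-finally pattern described for this section could be wrapped in a small helper. The sketch below runs against stand_in_broadcaster, a hypothetical loop that only imitates the described behaviour (consume from a queue, send to every connection, swallow CancelledError); run_bounded and its settle/limit parameters are likewise assumptions.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def stand_in_broadcaster(queue: asyncio.Queue, connections: dict) -> None:
    """Hypothetical loop: take events off the queue and send them to every connection."""
    try:
        while True:
            message = await queue.get()
            for websocket in list(connections):
                await websocket.send_text(message)
    except asyncio.CancelledError:
        # Mirror the described behaviour: swallow cancellation and exit the loop.
        return


async def run_bounded(queue, connections, settle: float = 0.05, limit: float = 1.0):
    """Start the loop, let it drain the queue, then always cancel and await it."""
    task = asyncio.create_task(stand_in_broadcaster(queue, connections))
    try:
        await asyncio.sleep(settle)
    finally:
        task.cancel()
        # wait_for bounds the teardown so a loop that ignores cancel cannot hang CI.
        await asyncio.wait_for(task, timeout=limit)
    return task


async def demo() -> bool:
    queue: asyncio.Queue = asyncio.Queue()
    ws = MagicMock()
    ws.send_text = AsyncMock()
    await queue.put('{"event_type": "heartbeat"}')
    task = await run_bounded(queue, {ws: object()})
    ws.send_text.assert_awaited_once_with('{"event_type": "heartbeat"}')
    # The task finished normally because the loop swallowed the cancellation.
    return task.done() and not task.cancelled()
```

For the real tests, manager._broadcast_events() would take the place of the stand-in, with fake connections inserted into manager.connections beforehand.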
EventBridge.__init__() / _init_redis() — event_bridge.py:29 / event_bridge.py:41
Patch osism.services.event_bridge.redis.Redis; for env vars use monkeypatch.setenv / delenv.
- Defaults: redis.Redis called with host="redis", port=6379, db=0, decode_responses=True, socket_connect_timeout=10, socket_timeout=None, health_check_interval=30
- REDIS_HOST / REDIS_PORT / REDIS_DB env vars override the defaults (ints for port/db)
- Successful ping() → _redis_client set and pubsub() called (subscriber created)
- ping() raises → error logged, _redis_client and _redis_subscriber both None
- osism.services.event_bridge.REDIS_AVAILABLE patched to False → warning logged, redis.Redis never called, client stays None
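The default and override cases can share one helper that inspects the keyword arguments passed to the patched constructor. This sketch uses unittest.mock.patch.dict instead of the monkeypatch fixture so that it runs outside pytest; build_client is a hypothetical stand-in, and how the real _init_redis() reads the variables is an assumption.

```python
import os
from unittest.mock import MagicMock, patch


def build_client(redis_factory):
    """Hypothetical stand-in for the env handling described above."""
    return redis_factory(
        host=os.getenv("REDIS_HOST", "redis"),
        port=int(os.getenv("REDIS_PORT", "6379")),
        db=int(os.getenv("REDIS_DB", "0")),
        decode_responses=True,
    )


def call_kwargs_with_env(env: dict) -> dict:
    """Run build_client under a controlled environment and return the factory kwargs."""
    factory = MagicMock()
    # clear=True removes inherited REDIS_* variables so the defaults are deterministic.
    with patch.dict(os.environ, env, clear=True):
        build_client(factory)
    return factory.call_args.kwargs
```

In the actual tests the factory would be the mock returned by patching osism.services.event_bridge.redis.Redis, and the assertion would also cover the timeout and health-check arguments listed above.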
EventBridge.set_websocket_manager() — event_bridge.py:76
Patch EventBridge._start_redis_subscriber and EventBridge._start_processor_thread so no real threads start.
- With Redis client present and _subscriber_thread is None → _start_redis_subscriber called
- Without Redis client (_redis_client = None) → _start_redis_subscriber not called
- _subscriber_thread already set → subscriber not started again
- Processor: _processor_thread is None → _start_processor_thread called; an existing thread mock with is_alive() == True → not called again
- _websocket_manager is stored
EventBridge.add_event() — event_bridge.py:89
- Redis path: publish("osism:events", json.dumps({"event_type": ..., "payload": ...})) called; return value > 0 → info log, no warning
- publish returns 0 → warning "No Redis subscribers …" logged
- First publish raises → _init_redis() re-invoked (patch it), client still available → second publish succeeds ("after reconnect" log)
- First publish raises and re-init leaves _redis_client = None → fallback: event lands on _event_queue (local queue)
- No Redis client at all → event goes straight to _event_queue
- queue.Full: patch _event_queue.put_nowait with side_effect=queue.Full (the real queue is unbounded) → warning logged, no exception propagates
- Generic exception (e.g. patched json.dumps raising) → error logged, swallowed
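Because an unbounded queue.Queue never raises queue.Full on its own, the queue.Full case has to be forced by patching put_nowait on the instance. A minimal sketch of that technique follows; enqueue_locally and the logger name are hypothetical stand-ins for the fallback branch of add_event().

```python
import logging
import queue
from unittest.mock import patch

logger = logging.getLogger("example.event_bridge")  # hypothetical logger name


def enqueue_locally(event_queue: queue.Queue, event: dict) -> bool:
    """Hypothetical stand-in for the local-queue fallback: never raises on a full queue."""
    try:
        event_queue.put_nowait(event)
        return True
    except queue.Full:
        logger.warning("Event queue is full, dropping event")
        return False


def simulate_full_queue() -> bool:
    """Force queue.Full on an unbounded queue by patching put_nowait on the instance."""
    event_queue: queue.Queue = queue.Queue()  # maxsize=0 never raises Full by itself
    with patch.object(event_queue, "put_nowait", side_effect=queue.Full):
        return enqueue_locally(event_queue, {"event_type": "x", "payload": {}})
```

With pytest-mock the same patch would be mocker.patch.object(bridge._event_queue, "put_nowait", side_effect=queue.Full), followed by a caplog assertion on the warning.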
EventBridge._redis_subscriber_loop() — event_bridge.py:154
Call the loop synchronously in the test (no thread) with _redis_subscriber replaced by a MagicMock; drive termination via get_message side effects that call self._shutdown_event.set(), and patch EventBridge._process_single_event. For retry tests patch _init_redis and use a _shutdown_event.wait spy or small retry_delay tolerance — each test must guarantee loop exit.
- _redis_subscriber is None → error logged, immediate return
- subscribe("osism:events") called once on entry
- get_message returns None (timeout) → loop continues until shutdown is set
- Message of type="message" with valid JSON and _websocket_manager set → _process_single_event called with the decoded dict
- Valid message but _websocket_manager is None → event pushed to _event_queue instead
- Message with invalid JSON → json.JSONDecodeError branch, error logged, loop continues
- _process_single_event raising → generic "Error processing Redis event" branch, loop continues
- get_message raising → inner loop breaks, outer except increments retry, _init_redis re-called after wait(retry_delay)
- Five consecutive failures → "Max Redis reconnection attempts reached, giving up" logged, loop ends
- finally closes _redis_subscriber; close() raising is swallowed
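One way to guarantee loop exit is a get_message side effect that replays a scripted list of messages and sets the shutdown event once the script is exhausted. The sketch below shows the idea against stand_in_subscriber_loop, a hypothetical and heavily simplified loop without the retry machinery; scripted_get_message and run_script are also assumed names.

```python
import json
import threading
from unittest.mock import MagicMock


def stand_in_subscriber_loop(subscriber, shutdown_event, handle) -> None:
    """Hypothetical, simplified loop: poll until shutdown, decode JSON messages."""
    subscriber.subscribe("osism:events")
    while not shutdown_event.is_set():
        message = subscriber.get_message(timeout=1.0)
        if message and message.get("type") == "message":
            try:
                handle(json.loads(message["data"]))
            except json.JSONDecodeError:
                continue  # invalid JSON must not end the loop


def scripted_get_message(messages, shutdown_event):
    """Return a side effect that replays messages, then requests shutdown."""
    remaining = list(messages)

    def _get_message(*args, **kwargs):
        if remaining:
            return remaining.pop(0)
        shutdown_event.set()  # guarantees the loop exits once the script is exhausted
        return None

    return _get_message


def run_script(messages):
    """Drive the stand-in loop synchronously and return the doubles for assertions."""
    shutdown_event = threading.Event()
    subscriber = MagicMock()
    subscriber.get_message.side_effect = scripted_get_message(messages, shutdown_event)
    handle = MagicMock()
    stand_in_subscriber_loop(subscriber, shutdown_event, handle)
    return subscriber, handle
```

Because the mock returns immediately, the timeout argument never causes real waiting, so the test stays fast and deterministic.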
EventBridge._process_single_event() — event_bridge.py:237
- _websocket_manager is None → warning "No WebSocket manager available", returns without error
- With manager: broadcast_event_from_notification must be an AsyncMock (the method spins up a fresh event loop via asyncio.new_event_loop() and run_until_complete); assert it was awaited with event_data["event_type"] and event_data["payload"]
- Coroutine raising → error logged, exception swallowed
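The reason for requiring an AsyncMock is that run_until_complete needs an awaitable, which a plain MagicMock call does not return. A minimal sketch, assuming the method does roughly what is described above; dispatch_in_fresh_loop is a hypothetical stand-in for _process_single_event().

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def dispatch_in_fresh_loop(manager, event_data: dict) -> None:
    """Hypothetical stand-in: run the manager coroutine on a private event loop."""
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(
            manager.broadcast_event_from_notification(
                event_data["event_type"], event_data["payload"]
            )
        )
    finally:
        loop.close()


def demo_dispatch() -> AsyncMock:
    manager = MagicMock()
    # Calling an AsyncMock returns a coroutine, which run_until_complete can drive.
    manager.broadcast_event_from_notification = AsyncMock()
    dispatch_in_fresh_loop(
        manager,
        {"event_type": "baremetal.node.power_set.end", "payload": {"k": "v"}},
    )
    return manager.broadcast_event_from_notification
```

Since the call is synchronous from the test's point of view, this test needs no asyncio marker.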
EventBridge._process_events() — event_bridge.py:263
Run synchronously (no thread): pre-fill _event_queue, patch _process_single_event with a side effect that sets _shutdown_event after the first event.
- Queued event → _process_single_event called, task_done() honoured (queue unfinished_tasks drops)
- Empty queue (queue.Empty after the 1 s timeout) → continue until shutdown; verify clean exit when _shutdown_event is pre-set with an empty queue
- _process_single_event raising → error logged, loop continues (set shutdown in a second side-effect call to terminate)
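Running the worker synchronously with a side effect that requests shutdown could look like the following sketch. stand_in_process_events is a hypothetical loop that only imitates the described behaviour (blocking get with timeout, task_done() after processing); the shorter timeout is chosen for the example.

```python
import queue
import threading


def stand_in_process_events(event_queue, shutdown_event, process) -> None:
    """Hypothetical worker loop: process queued events until shutdown is requested."""
    while not shutdown_event.is_set():
        try:
            event = event_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            process(event)
        finally:
            event_queue.task_done()


def run_until_processed(events):
    """Pre-fill the queue, stop after the last event, report calls and unfinished tasks."""
    event_queue: queue.Queue = queue.Queue()
    for event in events:
        event_queue.put(event)
    shutdown_event = threading.Event()
    if not events:
        shutdown_event.set()  # empty queue: only verify that the loop exits cleanly
    calls = []

    def _process(event):
        calls.append(event)
        if len(calls) == len(events):
            shutdown_event.set()  # stop after the last queued event

    stand_in_process_events(event_queue, shutdown_event, _process)
    return calls, event_queue.unfinished_tasks
```

An unfinished_tasks value of 0 after the run indicates that task_done() was called once per consumed event.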
EventBridge.shutdown() — event_bridge.py:283
- Sets _shutdown_event
- _redis_subscriber.close() called; close() raising → error logged, shutdown continues
- Thread mocks with is_alive() == True → join(timeout=5.0) called for both; is_alive() == False or None → no join
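Thread doubles for the join cases can be built with a spec so that only real threading.Thread attributes are available. A minimal sketch; stand_in_shutdown is a hypothetical function mirroring the join rule above, and make_thread_mock is an assumed helper name.

```python
import threading
from unittest.mock import MagicMock


def stand_in_shutdown(shutdown_event, threads) -> None:
    """Hypothetical stand-in: signal shutdown, then join only threads that are alive."""
    shutdown_event.set()
    for thread in threads:
        if thread is not None and thread.is_alive():
            thread.join(timeout=5.0)


def make_thread_mock(alive: bool) -> MagicMock:
    """Thread double restricted to the threading.Thread API."""
    thread = MagicMock(spec=threading.Thread)
    thread.is_alive.return_value = alive
    return thread
```

In the real tests the doubles would be assigned to bridge._subscriber_thread and bridge._processor_thread before calling shutdown().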
Mocking hints
- Async tests: everything on WebSocketManager except the constructor is async def. With pytest-asyncio in strict mode, decorate with @pytest.mark.asyncio; alternatively set asyncio_mode = auto in setup.cfg [tool:pytest].
- Fake websockets: ws = MagicMock(); ws.accept = AsyncMock(); ws.send_text = AsyncMock(). WebSocket is only used as a dict key, so no spec is required. WebSocketDisconnect is imported in the module namespace; raise the real class from fastapi in send_text.side_effect.
- Never use the module-level singletons (websocket_manager.py:271, event_bridge.py:304). Create fresh instances per test so queues/locks bind to the running loop and no state leaks. For EventBridge, always construct under mocker.patch("osism.services.event_bridge.redis.Redis"); otherwise the constructor attempts a real connection. The import-time singleton itself only logs a connection error during collection (exception is caught), which is acceptable.
- No real threads: test _redis_subscriber_loop and _process_events by calling the target function directly with _shutdown_event-driven termination, and stub _start_redis_subscriber / _start_processor_thread in set_websocket_manager tests. This keeps the tests deterministic.
- Broadcaster loop: the only non-deterministic test is _broadcast_events; bound it with asyncio.wait_for(...) or a short sleep + cancel() in finally so a regression cannot hang CI.
- Use the loguru_logs pattern only where applicable: these two modules use the stdlib logging module (logging.getLogger("osism.websocket") / "osism.event_bridge"), so plain caplog with caplog.set_level(logging.DEBUG, logger="osism.websocket") works here.
- Notification payload fixture example for the extraction branches:
payload = {"ironic_object.data": {"name": "node1", "uuid": "abc-123"}}
await manager.broadcast_event_from_notification("baremetal.node.power_set.end", payload)
event = manager.event_queue.get_nowait()
assert event.node_name == "node1"
assert event.data["resource_id"] == "abc-123"
Definition of Done
- pytest-asyncio added to Pipfile [dev-packages] (pinned); async tests marked or asyncio_mode = auto configured
- tests/unit/services/__init__.py, tests/unit/services/test_websocket_manager.py and tests/unit/services/test_event_bridge.py created
- pytest --cov=osism.services.websocket_manager --cov=osism.services.event_bridge shows ≥ 90 % coverage
- pipenv run pytest tests/unit/services/ passes locally
- flake8, mypy, python-black remain green
- python-osism-unit-tests passes
Dependencies