
Unit tests for osism/services/{websocket_manager,event_bridge}.py #2357

Description

@berendt

Background

Follow-up to #2192 (foundation) and PR #2193 (pytest + Zuul infrastructure). Part of Tier 6 (#2199). osism/services/websocket_manager.py (271 LOC) implements EventMessage, per-connection event/node/service filters and a queue-based broadcaster for FastAPI WebSocket clients. osism/services/event_bridge.py (304 LOC) is a Redis-pub/sub bridge that forwards events from the RabbitMQ listener container to the WebSocket manager in the API container, with a local-queue fallback and reconnect logic.

Scope

Create the tests/unit/services/ package (__init__.py) plus tests/unit/services/test_websocket_manager.py and tests/unit/services/test_event_bridge.py. There is no existing coverage for either module (tests/unit/ currently only has commands/, tasks/, utils/).

New dev dependency required: most WebSocketManager methods are coroutines, so the tests need pytest-asyncio. The Pipfile [dev-packages] only contains pytest, pytest-cov and pytest-mock — add pytest-asyncio (pinned like the other dev packages). Either set asyncio_mode = auto in the [tool:pytest] section of setup.cfg or mark async tests explicitly with @pytest.mark.asyncio (the plugin registers the marker, so --strict-markers stays happy). No new runtime dependency is needed: fastapi and redis (via celery[redis]) come from requirements.txt, which the Zuul job installs via pipenv run pip install . (playbooks/test-unit.yml, install_requires = file: requirements.txt).

Note: osism/services/event_bridge.py instantiates a module-level singleton at import time (event_bridge.py:304) whose _init_redis() attempts a real Redis connection. The exception is caught (the import stays safe), but tests must construct fresh EventBridge instances with redis.Redis patched — see mocking hints. The _redis_subscriber_loop retry machinery is the trickiest part; if it turns out too brittle, this issue may be split further during implementation (as done in Tier 3).

Test targets

EventMessage (websocket_manager.py:15)

No patching needed (pure class; sync tests).

  • Constructor sets event_type, source, data, node_name; node_name defaults to None
  • id is a valid UUID4 string; two instances get different ids
  • timestamp ends with "Z" and the prefix parses as ISO 8601 (do not assert an exact value; it comes from datetime.utcnow())
  • to_dict() (websocket_manager.py:32) returns exactly the keys id, timestamp, event_type, source, node_name, data
  • to_json() (websocket_manager.py:43) round-trips via json.loads back to to_dict()
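The id, timestamp and serialisation bullets can be checked with small stdlib helpers. A minimal sketch follows. The helper names are hypothetical, and the timestamp format assumed here (isoformat() output plus a trailing "Z") is inferred from the bullets rather than read from the module.

```python
import json
import uuid
from datetime import datetime

# Keys the issue lists for EventMessage.to_dict().
EXPECTED_KEYS = {"id", "timestamp", "event_type", "source", "node_name", "data"}


def is_uuid4(value) -> bool:
    """True if value is a canonical, lowercase UUID4 string (hypothetical helper)."""
    try:
        parsed = uuid.UUID(value)
    except (ValueError, TypeError, AttributeError):
        return False
    # uuid.UUID() also accepts braces and URNs, so insist on the canonical spelling.
    return parsed.version == 4 and str(parsed) == value


def is_utc_iso_timestamp(value: str) -> bool:
    """True if value ends with "Z" and the part before it parses as ISO 8601."""
    if not value.endswith("Z"):
        return False
    try:
        datetime.fromisoformat(value[:-1])
    except ValueError:
        return False
    return True


def serialisation_round_trips(as_dict: dict, as_json: str) -> bool:
    """True if the dict has exactly the expected keys and the JSON decodes back to it."""
    return set(as_dict) == EXPECTED_KEYS and json.loads(as_json) == as_dict
```

In the real tests these helpers would be applied to an EventMessage instance's id, timestamp, to_dict() and to_json().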

WebSocketConnection.matches_filters() (websocket_manager.py:57)

No patching needed (WebSocketConnection(MagicMock()) suffices; sync tests).

  • No filters set → True for any event (early return)
  • event_filters=["baremetal.node.power_set"]: matching event_type → True, non-matching → False
  • node_filters=["node1"]: event.node_name="node1" → True; node_name="other" → False; node_name=None → False (explicit is not None check)
  • service_filters=["baremetal"]: event_type="baremetal.node.power_set" → True (service = first dot segment); event_type="compute.instance.update" → False
  • event_type="" with service_filters=["unknown"] → True (falsy event_type maps to service "unknown")
  • Combined filters are AND-ed: event filter matches but node filter does not → False
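Before the suite is pointed at WebSocketConnection.matches_filters(), the bullets above can be written down as a case table. The sketch below pairs that table with reference_matches(), a hypothetical oracle that encodes only the listed rules. It is not the module's code, and passing the filter lists as keyword arguments is an assumption made for the sketch.

```python
from types import SimpleNamespace


def make_event(event_type, node_name=None):
    """Minimal event stand-in exposing only the attributes the filters read."""
    return SimpleNamespace(event_type=event_type, node_name=node_name)


def reference_matches(event, event_filters=(), node_filters=(), service_filters=()):
    """Hypothetical oracle for the filter rules listed in the issue."""
    if not (event_filters or node_filters or service_filters):
        return True  # no filters: early return
    if event_filters and event.event_type not in event_filters:
        return False
    if node_filters and (event.node_name is None or event.node_name not in node_filters):
        return False
    if service_filters:
        # Service is the first dot segment; a falsy event_type maps to "unknown".
        service = event.event_type.split(".")[0] if event.event_type else "unknown"
        if service not in service_filters:
            return False
    return True  # all active filters matched (AND semantics)


# (event_type, node_name, filters, expected), one row per bullet above.
CASES = [
    ("anything", None, {}, True),
    ("baremetal.node.power_set", None, {"event_filters": ["baremetal.node.power_set"]}, True),
    ("baremetal.node.other", None, {"event_filters": ["baremetal.node.power_set"]}, False),
    ("x", "node1", {"node_filters": ["node1"]}, True),
    ("x", "other", {"node_filters": ["node1"]}, False),
    ("x", None, {"node_filters": ["node1"]}, False),
    ("baremetal.node.power_set", None, {"service_filters": ["baremetal"]}, True),
    ("compute.instance.update", None, {"service_filters": ["baremetal"]}, False),
    ("", None, {"service_filters": ["unknown"]}, True),
    (
        "baremetal.node.power_set",
        "other",
        {"event_filters": ["baremetal.node.power_set"], "node_filters": ["node1"]},
        False,
    ),
]
```

In the real test, each CASES row would typically feed @pytest.mark.parametrize. The filter lists would be assigned on a WebSocketConnection(MagicMock()), and the event would be built as an EventMessage.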

WebSocketManager.connect() / disconnect() (websocket_manager.py:95 / websocket_manager.py:106)

Use a MagicMock websocket with accept = AsyncMock(); patch WebSocketManager._broadcast_events (or asyncio.create_task at osism.services.websocket_manager.asyncio.create_task) so no real broadcaster loop starts, and always instantiate a fresh WebSocketManager() inside the async test (do not reuse the module-level singleton websocket_manager.py:271).

  • connect() awaits websocket.accept() and registers a WebSocketConnection in self.connections
  • First connect() starts the broadcaster task; a second connect() while the task is running does not start another one (done() check)
  • disconnect() removes the connection; disconnecting an unknown websocket is a no-op (pop(..., None))

WebSocketManager.update_filters() (websocket_manager.py:114)

  • Sets event_filters / node_filters / service_filters when passed
  • Arguments left as None keep the existing lists (partial update); passing [] clears a filter list
  • Unknown websocket → silent no-op, no exception

WebSocketManager.add_event() / send_heartbeat() (websocket_manager.py:136 / websocket_manager.py:258)

  • add_event() puts the event on event_queue (qsize() == 1, get_nowait() returns it)
  • send_heartbeat() with no connections → returns early, queue stays empty
  • send_heartbeat() with a registered connection → queues an EventMessage with event_type="heartbeat", source="osism", data={"message": "ping"}

WebSocketManager.broadcast_event_from_notification() (websocket_manager.py:140)

No patching needed beyond a fresh manager; inspect the queued EventMessage via event_queue.get_nowait().

  • baremetal.* with "ironic_object.data" → node_name from name, resource_id from uuid
  • compute.* / nova.* with "nova_object.data" → node_name from host, falling back to name when host is missing
  • network.* / neutron.* with "neutron_object.data" → resource_id from id (fallback uuid), node_name from name (fallback device_id)
  • volume.* with "cinder_object.data" → resource_id from id/uuid, node_name from name/display_name
  • image.* with "glance_object.data" and identity.* with "keystone_object.data" → analogous
  • Known service type but expected payload key missing (e.g. baremetal without "ironic_object.data") → node_name/resource_id stay None, event still queued
  • Unknown service type ("foo.bar") → no extraction, event queued with service_type="foo"
  • event_type="" → service_type="unknown"
  • Queued event: source="openstack", data contains the original payload keys plus service_type and resource_id; the caller's payload dict is not mutated (payload.copy())
  • Exception path: pass a payload without .copy()/.get semantics (e.g. a list) → exception caught, error logged, nothing queued

WebSocketManager._broadcast_events() (websocket_manager.py:204)

Insert fake connections directly into manager.connections (bypassing connect()), pre-fill the queue, then run task = asyncio.create_task(manager._broadcast_events()), give it a tick (await asyncio.sleep(0.05)), assert, and task.cancel() + await it in a finally block.

  • Event matching a connection's filters → websocket.send_text awaited once with event.to_json()
  • Event not matching → send_text not called for that connection, but called for a second matching connection (sent_count logic)
  • send_text raises WebSocketDisconnect → connection removed from self.connections
  • send_text raises generic Exception → error logged, connection removed as well
  • Queue event while self.connections is empty → event consumed, nothing sent, loop continues
  • task.cancel() → CancelledError branch logs and exits the loop (task finishes, no exception propagates)
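The create_task / sleep / cancel-in-finally pattern can be tried out in isolation. In this sketch toy_broadcast() is a hypothetical stand-in for _broadcast_events() that mimics only the listed behaviours: a per-connection filter check, send_text, removal on error and a quiet exit on cancellation. The filter predicates and raw string events are simplifications.

```python
import asyncio
import contextlib
from unittest.mock import AsyncMock, MagicMock


async def toy_broadcast(queue: asyncio.Queue, connections: dict) -> None:
    """Hypothetical stand-in for _broadcast_events(); not the module's code."""
    try:
        while True:
            event = await queue.get()
            # Copy the items so connections can be removed while iterating.
            for ws, wants in list(connections.items()):
                if not wants(event):
                    continue  # filter mismatch: skip this connection only
                try:
                    await ws.send_text(event)
                except Exception:
                    # Stand-in for the WebSocketDisconnect / generic error branches.
                    connections.pop(ws, None)
    except asyncio.CancelledError:
        # Mirrors the documented behaviour: cancellation ends the loop quietly.
        return


async def run_bounded(queue: asyncio.Queue, connections: dict, tick: float = 0.05):
    """Run the loop for one short tick, then always cancel and await it."""
    task = asyncio.create_task(toy_broadcast(queue, connections))
    try:
        await asyncio.sleep(tick)
    finally:
        task.cancel()
        # Covers the case where the task is cancelled before it ever ran.
        with contextlib.suppress(asyncio.CancelledError):
            await task
    return task
```

Against the real manager, the fake connections would go into manager.connections, and the assertion would be that send_text was awaited with event.to_json().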

EventBridge.__init__() / _init_redis() (event_bridge.py:29 / event_bridge.py:41)

Patch osism.services.event_bridge.redis.Redis; for env vars use monkeypatch.setenv / delenv.

  • Defaults: redis.Redis called with host="redis", port=6379, db=0, decode_responses=True, socket_connect_timeout=10, socket_timeout=None, health_check_interval=30
  • REDIS_HOST / REDIS_PORT / REDIS_DB env vars override the defaults (ints for port/db)
  • Successful ping() → _redis_client set and pubsub() called (subscriber created)
  • ping() raises → error logged, _redis_client and _redis_subscriber both None
  • osism.services.event_bridge.REDIS_AVAILABLE patched to False → warning logged, redis.Redis never called, client stays None

EventBridge.set_websocket_manager() (event_bridge.py:76)

Patch EventBridge._start_redis_subscriber and EventBridge._start_processor_thread so no real threads start.

  • With Redis client present and _subscriber_thread is None → _start_redis_subscriber called
  • Without Redis client (_redis_client = None) → _start_redis_subscriber not called
  • _subscriber_thread already set → subscriber not started again
  • Processor: _processor_thread is None → _start_processor_thread called; an existing thread mock with is_alive() == True → not called again
  • _websocket_manager is stored

EventBridge.add_event() (event_bridge.py:89)

  • Redis path: publish("osism:events", json.dumps({"event_type": ..., "payload": ...})) called; return value > 0 → info log, no warning
  • publish returns 0 → warning "No Redis subscribers …" logged
  • First publish raises → _init_redis() re-invoked (patch it), client still available → second publish succeeds ("after reconnect" log)
  • First publish raises and re-init leaves _redis_client = None → fallback: event lands on _event_queue (local queue)
  • No Redis client at all → event goes straight to _event_queue
  • queue.Full: patch _event_queue.put_nowait with side_effect=queue.Full (the real queue is unbounded) → warning logged, no exception propagates
  • Generic exception (e.g. patched json.dumps raising) → error logged, swallowed
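The Redis-versus-local routing and the queue.Full case can be sketched without Redis. The function toy_add_event() below is a hypothetical stand-in, not EventBridge.add_event(): it omits the reconnect path, and the shape of the locally queued item is an assumption. The useful part is the patch.object(..., side_effect=queue.Full) trick, which is needed because an unbounded queue.Queue never raises Full on its own.

```python
import json
import logging
import queue
from unittest.mock import MagicMock, patch

log = logging.getLogger("example.event_bridge")  # hypothetical logger name


def toy_add_event(client, local_queue: queue.Queue, event_type: str, payload: dict) -> str:
    """Hypothetical routing: publish via Redis if a client exists, else queue locally."""
    if client is not None:
        message = json.dumps({"event_type": event_type, "payload": payload})
        if client.publish("osism:events", message) == 0:
            log.warning("published, but no Redis subscribers were listening")
        return "redis"
    try:
        local_queue.put_nowait({"event_type": event_type, "payload": payload})
        return "local"
    except queue.Full:
        log.warning("local event queue full, event dropped")  # swallowed, not raised
        return "dropped"
```

In the real test, the patched object would be bridge._event_queue, and the warning would be asserted through caplog.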

EventBridge._redis_subscriber_loop() (event_bridge.py:154)

Call the loop synchronously in the test (no thread) with _redis_subscriber replaced by a MagicMock; drive termination via get_message side effects that call _shutdown_event.set() on the bridge instance, and patch EventBridge._process_single_event. For retry tests, patch _init_redis and either spy on _shutdown_event.wait or tolerate a small retry_delay; each test must guarantee that the loop exits.

  • _redis_subscriber is None → error logged, immediate return
  • subscribe("osism:events") called once on entry
  • get_message returns None (timeout) → loop continues until shutdown is set
  • Message of type="message" with valid JSON and _websocket_manager set → _process_single_event called with the decoded dict
  • Valid message but _websocket_manager is None → event pushed to _event_queue instead
  • Message with invalid JSON → json.JSONDecodeError branch, error logged, loop continues
  • _process_single_event raising → generic "Error processing Redis event" branch, loop continues
  • get_message raising → inner loop breaks, outer except increments retry, _init_redis re-called after wait(retry_delay)
  • Five consecutive failures → "Max Redis reconnection attempts reached, giving up" logged, loop ends
  • finally closes _redis_subscriber; close() raising is swallowed
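Driving the subscriber loop to a deterministic exit mostly comes down to a scripted get_message. In the sketch below, make_get_message() returns a side effect that replays canned messages and sets the shutdown event once they run out. The function toy_subscriber_loop() is a hypothetical stand-in that covers only the message-handling path (no retry logic), and the get_message(timeout=1.0) call shape is an assumption.

```python
import json
import threading
from unittest.mock import MagicMock


def make_get_message(messages, shutdown: threading.Event):
    """Side effect that replays canned messages, then requests shutdown."""
    pending = list(messages)

    def get_message(*args, **kwargs):
        if not pending:
            shutdown.set()  # script exhausted: guarantee the loop exits
            return None
        return pending.pop(0)

    return get_message


def toy_subscriber_loop(pubsub, shutdown: threading.Event, handle) -> None:
    """Hypothetical stand-in for the message path of _redis_subscriber_loop()."""
    pubsub.subscribe("osism:events")
    try:
        while not shutdown.is_set():
            message = pubsub.get_message(timeout=1.0)
            if message is None or message.get("type") != "message":
                continue  # timeout or non-data message
            try:
                handle(json.loads(message["data"]))
            except json.JSONDecodeError:
                continue  # the real loop logs an error and keeps going
    finally:
        pubsub.close()
```

Against the real code, the same side effect would be installed on bridge._redis_subscriber.get_message, with _process_single_event patched in place of handle.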

EventBridge._process_single_event() (event_bridge.py:237)

  • _websocket_manager is None → warning "No WebSocket manager available", returns without error
  • With manager: broadcast_event_from_notification must be an AsyncMock, because _process_single_event runs it on a fresh event loop (asyncio.new_event_loop() plus run_until_complete); assert it was awaited with event_data["event_type"] and event_data["payload"]
  • Coroutine raising → error logged, exception swallowed
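The sketch below shows the AsyncMock requirement in practice. An AsyncMock returns a coroutine that run_until_complete() on a private loop can drive, and it records the await for assert_awaited_once_with(). The function toy_process_single_event() is a hypothetical stand-in; its boolean return value replaces the real logging only so the outcome can be asserted.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def toy_process_single_event(manager, event_data: dict) -> bool:
    """Hypothetical stand-in: run the manager coroutine on a private event loop."""
    if manager is None:
        return False  # the real code logs "No WebSocket manager available"
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(
            manager.broadcast_event_from_notification(
                event_data["event_type"], event_data["payload"]
            )
        )
        return True
    except Exception:
        return False  # the real code logs the error and swallows it
    finally:
        loop.close()
```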

EventBridge._process_events() (event_bridge.py:263)

Run synchronously (no thread): pre-fill _event_queue, patch _process_single_event with a side effect that sets _shutdown_event after the first event.

  • Queued event → _process_single_event called, task_done() honoured (queue unfinished_tasks drops)
  • Empty queue (queue.Empty after the 1 s timeout) → continue until shutdown; verify clean exit when _shutdown_event is pre-set with an empty queue
  • _process_single_event raising → error logged, loop continues (set shutdown in a second side-effect call to terminate)
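A single side effect can cover most of the _process_events() bullets in one run: it fails on the first event and sets the shutdown event on the second. That exercises the error branch, the task_done() bookkeeping and loop exit together. The function toy_process_events() is a hypothetical stand-in, and calling task_done() in a finally block is an assumption about the real loop.

```python
import queue
import threading
from unittest.mock import MagicMock


def toy_process_events(q: queue.Queue, shutdown: threading.Event, handle) -> None:
    """Hypothetical stand-in for the processor loop described above."""
    while not shutdown.is_set():
        try:
            event = q.get(timeout=1.0)
        except queue.Empty:
            continue  # nothing queued: re-check the shutdown flag
        try:
            handle(event)
        except Exception:
            pass  # the real loop logs the error and continues
        finally:
            q.task_done()  # keeps unfinished_tasks in step with get()
```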

EventBridge.shutdown() (event_bridge.py:283)

  • Sets _shutdown_event
  • _redis_subscriber.close() called; close() raising → error logged, shutdown continues
  • Thread mocks with is_alive() == True → join(timeout=5.0) called for both; is_alive() == False, or thread attribute None → no join
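The shutdown() bullets translate into a few configured mocks. The function toy_shutdown() is a hypothetical stand-in that follows the listed behaviour. In the real test, the mocks would be assigned to bridge._redis_subscriber, bridge._subscriber_thread and bridge._processor_thread before bridge.shutdown() is called.

```python
import threading
from unittest.mock import MagicMock


def toy_shutdown(shutdown_event: threading.Event, subscriber, threads) -> None:
    """Hypothetical stand-in mirroring the shutdown() bullets above."""
    shutdown_event.set()
    if subscriber is not None:
        try:
            subscriber.close()
        except Exception:
            pass  # the real code logs the error and continues shutting down
    for thread in threads:
        # Only join threads that exist and are still running.
        if thread is not None and thread.is_alive():
            thread.join(timeout=5.0)
```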

Mocking hints

  • Async tests: everything on WebSocketManager except the constructor is async def. With pytest-asyncio in strict mode, decorate with @pytest.mark.asyncio; alternatively set asyncio_mode = auto in setup.cfg [tool:pytest].
  • Fake websockets: ws = MagicMock(); ws.accept = AsyncMock(); ws.send_text = AsyncMock(). WebSocket is only used as a dict key, so no spec is required. WebSocketDisconnect is imported in the module namespace; raise the real class from fastapi in send_text.side_effect.
  • Never use the module-level singletons (websocket_manager.py:271, event_bridge.py:304). Create fresh instances per test so queues/locks bind to the running loop and no state leaks. For EventBridge, always construct under mocker.patch("osism.services.event_bridge.redis.Redis") — otherwise the constructor attempts a real connection. The import-time singleton itself only logs a connection error during collection (exception is caught), which is acceptable.
  • No real threads: test _redis_subscriber_loop and _process_events by calling the target function directly with _shutdown_event-driven termination, and stub _start_redis_subscriber / _start_processor_thread in set_websocket_manager tests. This keeps the tests deterministic.
  • Broadcaster loop: the only non-deterministic test is _broadcast_events; bound it with asyncio.wait_for(...) or a short sleep + cancel() in finally so a regression cannot hang CI.
  • Use the loguru_logs pattern only where applicable — these two modules use the stdlib logging module (logging.getLogger("osism.websocket") / "osism.event_bridge"), so plain caplog with caplog.set_level(logging.DEBUG, logger="osism.websocket") works here.
  • Notification payload fixture example for the extraction branches:
    payload = {"ironic_object.data": {"name": "node1", "uuid": "abc-123"}}
    await manager.broadcast_event_from_notification("baremetal.node.power_set.end", payload)
    event = manager.event_queue.get_nowait()
    assert event.node_name == "node1"
    assert event.data["resource_id"] == "abc-123"

Definition of Done

  • pytest-asyncio added to Pipfile [dev-packages] (pinned); async tests marked or asyncio_mode = auto configured
  • tests/unit/services/__init__.py, tests/unit/services/test_websocket_manager.py and tests/unit/services/test_event_bridge.py created
  • All listed cases covered
  • pytest --cov=osism.services.websocket_manager --cov=osism.services.event_bridge shows ≥ 90 %
  • pipenv run pytest tests/unit/services/ passes locally
  • No hanging tests: all loop tests terminate via shutdown events / task cancellation
  • flake8, mypy, python-black remain green
  • Zuul job python-osism-unit-tests passes

Dependencies

Metadata

  • Assignees: No one assigned
  • Labels: enhancement (New feature or request)
  • Type: No type
  • Projects: Status Ready
  • Milestone: No milestone
  • Relationships: None yet
  • Development: No branches or pull requests