Unit tests for osism/services/listener.py #2358

Description

@berendt

Background

Follow-up to #2192 (foundation) and PR #2193 (pytest + Zuul infrastructure). Part of Tier 6 (#2199). osism/services/listener.py (518 LOC) is the RabbitMQ notification listener: a BaremetalEvents dispatcher mapping Ironic event types to NetBox Celery tasks, and a NotificationsDump(ConsumerMixin) consumer with passive exchange discovery (retry loop, background discovery thread, consumer restart) plus message handling (event-bridge forwarding, OSISM API delivery with retries, handler dispatch).

Scope

Add tests/unit/services/test_listener.py. There is no tests/unit/services/ package and no listener coverage yet. The package (tests/unit/services/__init__.py) is also created by the companion Tier 6 issue covering the other osism/services/ modules (event_bridge.py / websocket_manager.py) — coordinate so whichever PR lands first creates the __init__.py and the second rebases.

Focus: the BaremetalEvents state machine and exchange handling. main() is a while True loop — cover only the two reachable branches with an escape hatch (see hints); the module is large, so the issue may be split further during implementation.

Test targets

Module constants — listener.py:25

  • EXCHANGES_CONFIG contains exactly the 6 services (ironic, nova, neutron, cinder, keystone, glance); for each, exchange == service, routing_key == f"{service}_versioned_notifications.info", queue == f"osism-listener-{service}"
  • Legacy constants EXCHANGE_NAME/ROUTING_KEY/QUEUE_NAME (listener.py:59) match the ironic entry
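
The naming rules in the first bullet could be checked by a single helper. This is a minimal sketch, assuming EXCHANGES_CONFIG is a dict keyed by service name whose values are dicts with exchange, routing_key and queue keys (the issue does not show the structure). `check_exchanges_config` and `sample_config` are hypothetical names, and `sample_config` only stands in for the real constant so the snippet runs without the osism package.

```python
EXPECTED_SERVICES = {"ironic", "nova", "neutron", "cinder", "keystone", "glance"}


def check_exchanges_config(config):
    """Assert the per-service naming rules (dict-of-dicts layout is assumed)."""
    assert set(config) == EXPECTED_SERVICES
    for service, entry in config.items():
        assert entry["exchange"] == service
        assert entry["routing_key"] == f"{service}_versioned_notifications.info"
        assert entry["queue"] == f"osism-listener-{service}"


# Hypothetical stand-in for osism.services.listener.EXCHANGES_CONFIG.
sample_config = {
    service: {
        "exchange": service,
        "routing_key": f"{service}_versioned_notifications.info",
        "queue": f"osism-listener-{service}",
    }
    for service in EXPECTED_SERVICES
}

check_exchanges_config(sample_config)
```

In the real test the helper would receive the imported constant instead of the stand-in.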

BaremetalEvents.get_handler() — listener.py:97

No patching needed (returns bound methods).

  • Each of the 8 registered event types resolves to the matching bound method, e.g. "baremetal.node.power_set.end" → node_power_set_end, "baremetal.node.provision_set.start" → node_provision_set_start, "baremetal.node.delete.end" → node_delete_end (parametrize over the full _handler tree)
  • Unknown leaf ("baremetal.node.power_set.start") and unknown branch ("baremetal.port.create.end") → KeyError caught, returns the default handler; calling it with {"ironic_object.data": {"name": "node-1"}} logs "<event_type> ## node-1" and triggers no NetBox task
  • More than 4 segments ("baremetal.node.provision_set.end.extra") → still resolves via keys 0–3
  • Fewer than 4 segments ("baremetal.node.power_set") → raises IndexError (only KeyError is caught) — assert with pytest.raises to document current behavior
  • get_object_data() (listener.py:94) with payload missing "ironic_object.data" → KeyError
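
The lookup cases above follow from how a nested-dict dispatcher behaves. The sketch below uses a hypothetical stand-in class, `FakeBaremetalEvents`, with a one-entry tree and a simplified default handler; it is not the code in listener.py and only mirrors the behaviour this issue describes (keys 0–3 used, only KeyError caught).

```python
class FakeBaremetalEvents:
    """Hypothetical stand-in mirroring the lookup behaviour described above."""

    def __init__(self):
        self._handler = {
            "baremetal": {"node": {"power_set": {"end": self.node_power_set_end}}}
        }

    def node_power_set_end(self, payload):
        return "power_set_end"

    def default_handler(self, payload):
        return "default"

    def get_handler(self, event_type):
        parts = event_type.split(".")
        try:
            # Segments beyond index 3 are ignored. parts[3] raises IndexError
            # for a three-segment type whose first three keys exist, and
            # only KeyError is caught here.
            return self._handler[parts[0]][parts[1]][parts[2]][parts[3]]
        except KeyError:
            return self.default_handler


events = FakeBaremetalEvents()
# Bound methods are created on each attribute access: compare with ==, not is.
assert events.get_handler("baremetal.node.power_set.end") == events.node_power_set_end
assert events.get_handler("baremetal.port.create.end") == events.default_handler
```

Because each attribute access builds a new bound-method object, an `is` comparison against `events.node_power_set_end` would fail even when the lookup is correct.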

BaremetalEvents handler methods — listener.py:113–listener.py:169

Patch (the tasks are Celery-wrapped; only .delay is called here, no broker needed):

  • osism.services.listener.netbox.set_power_state.delay
  • osism.services.listener.netbox.set_provision_state.delay
  • osism.services.listener.netbox.set_maintenance.delay

Cases (parametrize where sensible; payload is always {"ironic_object.data": {...}}):

  • node_power_set_end (listener.py:113) → set_power_state.delay("node-1", "power on")
  • node_power_state_corrected_success (listener.py:121) → set_power_state.delay("node-1", "power off")
  • node_maintenance_set_end (listener.py:129) → set_maintenance.delay("node-1", state=True) — note the keyword argument
  • node_provision_set_start (listener.py:146), node_provision_set_end (listener.py:154), node_provision_set_success (listener.py:137) → each calls set_provision_state.delay("node-1", "<provision_state>")
  • node_delete_end (listener.py:162) → set_provision_state.delay("node-1", None) and set_power_state.delay("node-1", None)
  • node_create_end (listener.py:169) → set_provision_state.delay(...) and set_power_state.delay(...) with the payload values
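
The keyword-argument note matters because a mock without a spec records positional and keyword arguments literally. A small sketch, assuming a hypothetical handler body and a payload key named "maintenance" (neither is confirmed by this issue); `netbox` here is a plain MagicMock standing in for the patched module attribute.

```python
from unittest.mock import MagicMock, call

netbox = MagicMock()  # stand-in for the osism.services.listener.netbox module


def node_maintenance_set_end(payload):
    # Hypothetical handler body; the "maintenance" payload key is an assumption.
    data = payload["ironic_object.data"]
    netbox.set_maintenance.delay(data["name"], state=data["maintenance"])


node_maintenance_set_end(
    {"ironic_object.data": {"name": "node-1", "maintenance": True}}
)

# Passes: the keyword form matches how .delay was actually called.
netbox.set_maintenance.delay.assert_called_once_with("node-1", state=True)
# The positional form is recorded as a different call and would not match.
assert netbox.set_maintenance.delay.call_args != call("node-1", True)
```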

NotificationsDump.__init__() — listener.py:178

Patch osism.services.listener.settings.OSISM_API_URL and osism.services.listener.requests.Session; pass a MagicMock() connection.

  • OSISM_API_URL = None → osism_api_session is None, osism_baremetal_api_url is None
  • OSISM_API_URL = "http://api:8000/" (trailing slash) → session created, URL "http://api:8000/notifications/baremetal" (rstrip("/") applied)
  • Event bridge import succeeds → self.event_bridge is osism.services.event_bridge.event_bridge (the module singleton)
  • ImportError branch → mocker.patch.dict(sys.modules, {"osism.services.event_bridge": None}) makes the from ... import raise → event_bridge is None, warning "Event bridge not available" logged
  • Initial state: _available_exchanges == {}, _discovery_thread is None, _stop_discovery/_new_exchanges_found events not set

_get_exchange_properties() — listener.py:208

  • Channel mock: exchange_declare succeeds → returns {"type": "topic", "durable": True}; assert call used passive=True and type="topic"
  • exchange_declare raises (any Exception) → returns None, debug logged

_check_for_new_exchanges() — listener.py:232

Mock self.connection.channel() as a context manager (__enter__ returns a channel mock); stub _get_exchange_properties per exchange.

  • No exchange exists (props always None) → returns False, _available_exchanges stays empty
  • One exchange exists (e.g. ironic) → entry added with merged config plus exchange_props key, returns True
  • Service already in _available_exchanges → skipped (_get_exchange_properties not called for it)
  • connection.channel() raises → warning "Error checking for new exchanges" logged, returns False
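
Mocking the channel as a context manager can look like the sketch below. `probe_exchange` is a hypothetical helper that only imitates the `with connection.channel() as channel:` shape; the exact exchange_declare arguments other than passive=True and type="topic" are assumptions.

```python
from unittest.mock import MagicMock


def probe_exchange(connection, name):
    """Hypothetical helper: passive declare inside a channel context manager."""
    with connection.channel() as channel:
        channel.exchange_declare(exchange=name, type="topic", passive=True)
        return True


connection = MagicMock()
# MagicMock supports the context-manager protocol; this is the object that
# the "as channel" target is bound to inside the with block.
channel = connection.channel.return_value.__enter__.return_value

assert probe_exchange(connection, "ironic") is True
channel.exchange_declare.assert_called_once_with(
    exchange="ironic", type="topic", passive=True
)

# For the error branch, make channel() itself raise.
connection.channel.side_effect = RuntimeError("broker down")
```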

_exchange_discovery_loop() — listener.py:259

Call the method directly (no real thread); replace self._stop_discovery with a MagicMock event to script iterations — never rely on real timeouts.

  • wait() returns True (stop requested) → breaks immediately, no exchange check
  • All exchanges already in _available_exchanges (fill with EXCHANGES_CONFIG) → breaks with "Stopping exchange discovery" log, _check_for_new_exchanges not called
  • _check_for_new_exchanges returns True → _new_exchanges_found set and self.should_stop is True; loop exits on next scripted is_set()/wait() step
  • _check_for_new_exchanges returns False → loop continues without signaling (script wait side effect [False, True])
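
Scripting the stop event keeps the loop synchronous and deterministic. The sketch below drives a hypothetical stand-in loop, `discovery_loop`, whose shape is inferred from the cases above; the real method's structure and interval may differ.

```python
from unittest.mock import MagicMock


def discovery_loop(stop_event, check, interval=1):
    """Hypothetical stand-in for the loop shape described above."""
    found = False
    while not stop_event.is_set():
        if stop_event.wait(interval):  # True means a stop was requested
            break
        if check():
            found = True
            break
    return found


stop_event = MagicMock()
stop_event.is_set.return_value = False
stop_event.wait.side_effect = [False, True]  # one full iteration, then stop
check = MagicMock(return_value=False)

assert discovery_loop(stop_event, check) is False
assert check.call_count == 1
assert stop_event.wait.call_count == 2
```

If the loop ever calls wait() more often than scripted, the exhausted side_effect list raises StopIteration, so a regression fails fast instead of hanging.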

_start_exchange_discovery() — listener.py:290 / _stop_exchange_discovery() — listener.py:305

Patch osism.services.listener.threading.Thread.

  • All exchanges already available → no thread created
  • Otherwise → _stop_discovery.clear() called, Thread(target=self._exchange_discovery_loop, name="exchange-discovery", daemon=True) constructed and .start() called
  • Stop: sets _stop_discovery; thread alive → join(timeout=5); _discovery_thread is None → no error; thread not alive → join not called
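
Patching the Thread class lets the test assert the constructor arguments without running anything. A minimal sketch with a hypothetical `start_discovery` function; in the real test the patch target would be osism.services.listener.threading.Thread as stated above.

```python
import threading
from unittest.mock import patch


def start_discovery(target):
    """Hypothetical stand-in that starts a named daemon thread."""
    thread = threading.Thread(target=target, name="exchange-discovery", daemon=True)
    thread.start()
    return thread


def loop():
    raise AssertionError("must never run in a unit test")


with patch("threading.Thread") as thread_cls:
    thread = start_discovery(loop)

# The mock class recorded the constructor call; the mock instance recorded start().
thread_cls.assert_called_once_with(
    target=loop, name="exchange-discovery", daemon=True
)
thread_cls.return_value.start.assert_called_once_with()
```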

_wait_for_exchanges() — listener.py:311

Patch osism.services.listener.time.sleep and stub _check_for_new_exchanges.

  • _available_exchanges already populated → returns immediately, no check, no sleep
  • First check finds nothing, second populates _available_exchanges (use a side_effect function mutating the dict) → sleep called once with EXCHANGE_RETRY_INTERVAL (60)
  • Safety: give the sleep mock a bounded side_effect (raise after N calls) so a regression cannot hang pytest
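
The bounded sleep mock from the safety bullet could be built as below. `bounded_sleep` and `wait_for_exchanges` are hypothetical names; the wait loop is only a stand-in with the retry shape this section describes, not the method in listener.py.

```python
from unittest.mock import MagicMock


def bounded_sleep(max_calls):
    """Return a sleep mock that raises once called more than max_calls times."""

    def _sleep(seconds):
        # call_count already includes the current call when side_effect runs.
        if sleep.call_count > max_calls:
            raise AssertionError(f"sleep called more than {max_calls} times")

    sleep = MagicMock(side_effect=_sleep)
    return sleep


def wait_for_exchanges(available, check, sleep, interval=60):
    """Hypothetical stand-in: retry check() until available is populated."""
    while not available:
        check()
        if not available:
            sleep(interval)


available = {}
calls = []


def fake_check():
    # The second check "discovers" an exchange by mutating the shared dict.
    calls.append(1)
    if len(calls) == 2:
        available["ironic"] = {"exchange": "ironic"}


sleep = bounded_sleep(max_calls=5)
wait_for_exchanges(available, fake_check, sleep)
sleep.assert_called_once_with(60)
```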

get_consumers() — listener.py:331

Stub _wait_for_exchanges and _start_exchange_discovery on the instance; pre-populate _available_exchanges; patch osism.services.listener.Exchange and osism.services.listener.Queue; pass a MagicMock consumer factory.

  • Two available exchanges → two consumers returned; Exchange(...) called with type/durable from exchange_props and passive=True; Queue(...) called with routing_key, auto_delete=False, no_ack=True; consumer(queue, callbacks=[self.on_message])
  • exchange_props missing type/durable → defaults "topic"/True used
  • Exchange constructor raises for one service → error logged, remaining consumers still returned
  • _available_exchanges left empty after the stubbed wait → returns [], error "No consumers could be configured" logged

on_message() — listener.py:377

Build body = {"oslo.message": json.dumps(data)} with data containing event_type, payload, priority, timestamp, publisher_id, message_id. Patch osism.services.listener.time.sleep for retry tests; set self.event_bridge / self.osism_api_session directly on the instance.
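
A small builder keeps the message bodies consistent across cases. This is a sketch only: `make_body` is a hypothetical helper, and every field value below is a placeholder rather than data taken from a real notification.

```python
import json


def make_body(event_type, payload, **overrides):
    """Build a body shaped like {"oslo.message": json.dumps(data)}."""
    data = {
        "event_type": event_type,
        "payload": payload,
        "priority": "INFO",  # placeholder values from here on
        "timestamp": "2024-01-01 00:00:00.000000",
        "publisher_id": "ironic-conductor.host-1",
        "message_id": "00000000-0000-0000-0000-000000000001",
    }
    data.update(overrides)
    return {"oslo.message": json.dumps(data)}


body = make_body(
    "baremetal.node.power_set.end",
    {"ironic_object.data": {"name": "node-1", "power_state": "power on"}},
)
decoded = json.loads(body["oslo.message"])
assert decoded["payload"]["ironic_object.data"]["name"] == "node-1"
```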

Payload-info extraction / logging

  • baremetal event with ironic_object.data → only name/provision_state/power_state keys logged
  • compute/nova event with nova_object.data → only uuid/host/state/task_state keys
  • network/neutron event → {"service": "neutron"}
  • Other service (identity) → {"service": "identity"}; missing event_type → service "unknown" (note: handler dispatch reads data["event_type"] and raises KeyError when no API session is set, so omit the key only on code paths that tolerate it)

Event bridge forwarding

  • event_bridge set → add_event(data["event_type"], data["payload"]) called once
  • add_event raises → both error logs emitted (second one digs payload["ironic_object.data"]["name"] with .get fallbacks → "unknown" for non-ironic payloads), processing continues
  • event_bridge = None → no forwarding attempted

OSISM API delivery (osism_api_session set)

  • post returns status 204 → success log, exactly one call, no handler dispatch
  • requests.ConnectionError on all 3 tries → 3 posts, give-up log includes json.dumps(data), sleep called with 3 then 9 (pow(3, tries - 1))
  • requests.Timeout → same retry/give-up path
  • HTTPError with response.status_code = 404 → gives up after first try ("client side error")
  • status_code = 500 → also gives up early (condition is <= 500) — assert to document current behavior
  • status_code = 503 → retried up to 3 times
  • Status 200 (not 204, raise_for_status() no-op) → falls through to the failure branch and retries — document current behavior
  • Assert the posted JSON body contains exactly priority, event_type, timestamp, publisher_id, message_id, payload
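
The expected sleep sequence of 3 then 9 follows from pow(3, tries - 1) when tries is incremented before the delay is computed and no sleep follows the last attempt. The loop below is a hypothetical reconstruction that reproduces those numbers, not the code in listener.py, and it uses the builtin ConnectionError as a stand-in for requests.ConnectionError.

```python
from unittest.mock import MagicMock, call


def post_with_retries(post, sleep, max_tries=3):
    """Hypothetical retry loop reproducing the sleep(3), sleep(9) sequence."""
    tries = 1
    while tries <= max_tries:
        try:
            post()
            return True
        except ConnectionError:  # builtin stand-in for requests.ConnectionError
            tries += 1
            if tries <= max_tries:
                sleep(pow(3, tries - 1))
    return False


post = MagicMock(side_effect=ConnectionError("refused"))
sleep = MagicMock()

assert post_with_retries(post, sleep) is False
assert post.call_count == 3
assert sleep.call_args_list == [call(3), call(9)]
```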

Handler dispatch (no API session)

  • osism_api_session = None → baremetal_events.get_handler(data["event_type"]) resolved and called with data["payload"] (stub get_handler or patch the three netbox.*.delay targets and assert the task call)

main() — listener.py:486

Patch osism.services.listener.Connection, osism.services.listener.NotificationsDump, osism.services.listener.time.sleep. Escape the while True loop by raising a sentinel exception (e.g. from sleep or the second Connection call) and asserting it with pytest.raises.

  • Connection raises ConnectionRefusedError → error logged, sleep(60) called, loop retries (sentinel on second iteration)
  • Restart path: consumer mock with _new_exchanges_found.is_set() → True once → _stop_exchange_discovery() called, _new_exchanges_found.clear() called, _available_exchanges carried over into the next NotificationsDump instance (sentinel ends the second iteration)
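
The sentinel escape can be sketched with a stand-in loop. `StopLoop` and `run_forever` are hypothetical names; the real test would patch the Connection and time.sleep targets listed above and wrap the call to main() in pytest.raises instead of the try/except used here to stay stdlib-only.

```python
from unittest.mock import MagicMock


class StopLoop(Exception):
    """Sentinel raised by a mock to break out of an otherwise endless loop."""


def run_forever(connect, sleep):
    """Hypothetical stand-in for a reconnecting while True loop."""
    while True:
        try:
            connect()
        except ConnectionRefusedError:
            sleep(60)


# First iteration fails with a refused connection, second raises the sentinel.
connect = MagicMock(side_effect=[ConnectionRefusedError(), StopLoop()])
sleep = MagicMock()

escaped = False
try:
    run_forever(connect, sleep)
except StopLoop:
    escaped = True

assert escaped
sleep.assert_called_once_with(60)
assert connect.call_count == 2
```

A dedicated sentinel class avoids masking a genuine exception that the loop might raise for other reasons.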

Mocking hints

  • BROKER_URI is read from the environment at import time (listener.py:62) — patch osism.services.listener.BROKER_URI in main() tests, not the env var.
  • Importing the module pulls in osism.tasks.netbox and osism.settings; the shared tests/conftest.py ansible stubs already make this import safe in the unit-test venv.
  • NotificationsDump always needs a connection argument — a plain MagicMock() suffices; for _check_for_new_exchanges configure connection.channel.return_value.__enter__.return_value as the channel.
  • Never start the real discovery thread or hit real timeouts: test _exchange_discovery_loop synchronously with a scripted _stop_discovery mock, and patch threading.Thread in _start_exchange_discovery.
  • should_stop is a ConsumerMixin property-backed attribute; asserting consumer.should_stop is True after the discovery loop works on the instance.
  • Use the shared loguru_logs fixture from tests/conftest.py to assert log messages (pytest caplog does not capture loguru).
  • Celery tasks: only .delay is invoked here — patch the three osism.services.listener.netbox.*.delay attributes; no broker or task execution involved.
  • For the sys.modules ImportError trick, mapping a module name to None makes from osism.services.event_bridge import event_bridge raise ImportError; mocker.patch.dict(sys.modules, ...) restores it automatically.
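
The sys.modules trick from the last hint can be tried in isolation. The sketch below uses unittest.mock.patch.dict (the stdlib function that mocker.patch.dict wraps) and the stdlib module colorsys purely as a harmless stand-in for osism.services.event_bridge.

```python
import sys
from unittest.mock import patch

# Mapping a module name to None makes any import of it raise ImportError.
with patch.dict(sys.modules, {"colorsys": None}):
    try:
        from colorsys import rgb_to_hsv  # noqa: F401
        import_failed = False
    except ImportError:
        import_failed = True

assert import_failed

# After the context manager exits, sys.modules is restored and the import works.
from colorsys import rgb_to_hsv  # noqa: E402,F811

assert callable(rgb_to_hsv)
```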

Definition of Done

  • tests/unit/services/__init__.py present (created here or by the companion Tier 6 services issue — coordinate to avoid conflicts)
  • tests/unit/services/test_listener.py created
  • All listed cases covered
  • pytest --cov=osism.services.listener shows ≥ 90 % (the if __name__ == "__main__" guard and consumer.run() plumbing may stay uncovered)
  • pipenv run pytest tests/unit/services/test_listener.py passes locally
  • flake8, mypy, python-black remain green
  • Zuul job python-osism-unit-tests passes

Dependencies
