Background
Follow-up to #2192 (foundation) and PR #2193 (pytest + Zuul infrastructure). Part of Tier 6 (#2199). osism/services/listener.py (518 LOC) is the RabbitMQ notification listener: a BaremetalEvents dispatcher mapping Ironic event types to NetBox Celery tasks, and a NotificationsDump(ConsumerMixin) consumer with passive exchange discovery (retry loop, background discovery thread, consumer restart) plus message handling (event-bridge forwarding, OSISM API delivery with retries, handler dispatch).
Scope
Add tests/unit/services/test_listener.py. There is no tests/unit/services/ package and no listener coverage yet. The package (tests/unit/services/__init__.py) is also created by the companion Tier 6 issue covering the other osism/services/ modules (event_bridge.py / websocket_manager.py) — coordinate so whichever PR lands first creates the __init__.py and the second rebases.
Focus: the BaremetalEvents state machine and exchange handling. main() is a while True loop — cover only the two reachable branches with an escape hatch (see hints); the module is large, so the issue may be split further during implementation.
Test targets
Module constants — listener.py:25
- EXCHANGES_CONFIG contains exactly the 6 services (ironic, nova, neutron, cinder, keystone, glance); for each, exchange == service, routing_key == f"{service}_versioned_notifications.info", queue == f"osism-listener-{service}"
- Legacy constants EXCHANGE_NAME/ROUTING_KEY/QUEUE_NAME (listener.py:59) match the ironic entry
BaremetalEvents.get_handler() — listener.py:97
No patching needed (returns bound methods).
- Each of the 8 registered event types resolves to the matching bound method, e.g. "baremetal.node.power_set.end" → node_power_set_end, "baremetal.node.provision_set.start" → node_provision_set_start, "baremetal.node.delete.end" → node_delete_end (parametrize over the full _handler tree)
- Unknown leaf ("baremetal.node.power_set.start") and unknown branch ("baremetal.port.create.end") → KeyError caught, returns the default handler; calling it with {"ironic_object.data": {"name": "node-1"}} logs "<event_type> ## node-1" and triggers no NetBox task
- More than 4 segments ("baremetal.node.provision_set.end.extra") → still resolves via keys 0–3
- Fewer than 4 segments ("baremetal.node.power_set") → raises IndexError (only KeyError is caught); assert with pytest.raises to document current behavior
- get_object_data() (listener.py:94) with payload missing "ironic_object.data" → KeyError
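A minimal, self-contained sketch of the table-driven get_handler() test. FakeBaremetalEvents is a hypothetical stand-in that only mimics the lookup described above (a nested dict indexed by the first four dot-separated segments, with only KeyError caught); the real test would instantiate osism.services.listener.BaremetalEvents instead and feed the same table to @pytest.mark.parametrize.

```python
class FakeBaremetalEvents:
    """Hypothetical stand-in mimicking BaremetalEvents.get_handler() as described."""

    def __init__(self):
        # Nested dispatch tree keyed by segments 0-3 of the event type.
        self._handler = {
            "baremetal": {
                "node": {
                    "power_set": {"end": self.node_power_set_end},
                    "provision_set": {"end": self.node_provision_set_end},
                }
            }
        }

    def node_power_set_end(self, payload):
        return None

    def node_provision_set_end(self, payload):
        return None

    def default(self, payload):
        return None  # the real default handler only logs "<event_type> ## <name>"

    def get_handler(self, event_type):
        keys = event_type.split(".")
        try:
            return self._handler[keys[0]][keys[1]][keys[2]][keys[3]]
        except KeyError:  # IndexError (fewer than 4 segments) is not caught
            return self.default


# (event_type, expected handler name); feed this to @pytest.mark.parametrize.
CASES = [
    ("baremetal.node.power_set.end", "node_power_set_end"),
    ("baremetal.node.provision_set.end.extra", "node_provision_set_end"),
    ("baremetal.node.power_set.start", "default"),  # unknown leaf
    ("baremetal.port.create.end", "default"),  # unknown branch
]


def test_get_handler_table():
    events = FakeBaremetalEvents()
    for event_type, expected in CASES:
        # Bound methods compare equal when function and instance match.
        assert events.get_handler(event_type) == getattr(events, expected)


def test_short_event_type_raises_index_error():
    try:
        FakeBaremetalEvents().get_handler("baremetal.node.power_set")
    except IndexError:
        return
    raise AssertionError("expected IndexError for a 3-segment event type")
```

Comparing against getattr(events, expected) rather than a function object keeps the assertion valid for bound methods.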
BaremetalEvents handler methods — listener.py:113–listener.py:169
Patch these targets (the tasks are Celery-wrapped; only .delay is called here, so no broker is needed):
- osism.services.listener.netbox.set_power_state.delay
- osism.services.listener.netbox.set_provision_state.delay
- osism.services.listener.netbox.set_maintenance.delay
Cases (parametrize where sensible; the payload is always {"ironic_object.data": {...}}):
- node_power_set_end (listener.py:113) → set_power_state.delay("node-1", "power on")
- node_power_state_corrected_success (listener.py:121) → set_power_state.delay("node-1", "power off")
- node_maintenance_set_end (listener.py:129) → set_maintenance.delay("node-1", state=True) (note the keyword argument)
- node_provision_set_start (listener.py:146), node_provision_set_end (listener.py:154), node_provision_set_success (listener.py:137) → each calls set_provision_state.delay("node-1", "<provision_state>")
- node_delete_end (listener.py:162) → set_provision_state.delay("node-1", None) and set_power_state.delay("node-1", None)
- node_create_end (listener.py:169) → set_provision_state.delay(...) and set_power_state.delay(...) with the payload values
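A sketch of the ".delay only" patching pattern, runnable without osism or Celery. The netbox namespace and node_maintenance_set_end below are hypothetical stand-ins (the handler is assumed to read a `maintenance` field from the node data); the point is that patching the delay attribute is enough, and that the keyword form state=True must be asserted as a keyword.

```python
from types import SimpleNamespace
from unittest import mock

# Hypothetical stand-in for the module-level `netbox` import in listener.py;
# delay is a placeholder and is always patched before a handler runs.
netbox = SimpleNamespace(set_maintenance=SimpleNamespace(delay=None))


def node_maintenance_set_end(payload):
    """Hypothetical handler; assumes the node fields include `maintenance`."""
    data = payload["ironic_object.data"]
    netbox.set_maintenance.delay(data["name"], state=data["maintenance"])


def test_maintenance_uses_keyword_argument():
    payload = {"ironic_object.data": {"name": "node-1", "maintenance": True}}
    # The real test would use mocker.patch(
    #     "osism.services.listener.netbox.set_maintenance.delay")
    with mock.patch.object(netbox.set_maintenance, "delay") as delay:
        node_maintenance_set_end(payload)
    delay.assert_called_once_with("node-1", state=True)
    # A positional True would not match: check args and kwargs separately.
    assert delay.call_args.args == ("node-1",)
    assert delay.call_args.kwargs == {"state": True}
```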
NotificationsDump.__init__() — listener.py:178
Patch osism.services.listener.settings.OSISM_API_URL and osism.services.listener.requests.Session; pass a MagicMock() connection.
- OSISM_API_URL = None → osism_api_session is None, osism_baremetal_api_url is None
- OSISM_API_URL = "http://api:8000/" (trailing slash) → session created, URL "http://api:8000/notifications/baremetal" (rstrip("/") applied)
- Event bridge import succeeds → self.event_bridge is osism.services.event_bridge.event_bridge (the module singleton)
- ImportError branch → mocker.patch.dict(sys.modules, {"osism.services.event_bridge": None}) makes the from ... import raise → event_bridge is None, warning "Event bridge not available" logged
- Initial state: _available_exchanges == {}, _discovery_thread is None, _stop_discovery/_new_exchanges_found events not set
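The sys.modules trick can be checked in isolation. In this sketch the stdlib module colorsys substitutes for osism.services.event_bridge so it runs anywhere; load_bridge and baremetal_url are hypothetical helpers that only mirror the two __init__ behaviours listed above.

```python
import sys
from unittest import mock


def load_bridge():
    """Hypothetical stand-in for the guarded import in NotificationsDump.__init__.

    colorsys substitutes for osism.services.event_bridge.
    """
    try:
        from colorsys import rgb_to_hsv as bridge
    except ImportError:
        bridge = None  # the real code logs "Event bridge not available"
    return bridge


def baremetal_url(base):
    """Hypothetical helper mirroring the documented rstrip("/") behaviour."""
    if base is None:
        return None
    return base.rstrip("/") + "/notifications/baremetal"


def test_import_error_branch():
    # A None entry in sys.modules makes the import raise ImportError
    # (ModuleNotFoundError); patch.dict restores sys.modules on exit.
    with mock.patch.dict(sys.modules, {"colorsys": None}):
        assert load_bridge() is None
    assert load_bridge() is not None


def test_baremetal_url():
    assert baremetal_url(None) is None
    assert baremetal_url("http://api:8000/") == "http://api:8000/notifications/baremetal"
```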
_get_exchange_properties() — listener.py:208
Channel mock:
- exchange_declare succeeds → returns {"type": "topic", "durable": True}; assert the call used passive=True and type="topic"
- exchange_declare raises (any Exception) → returns None, debug logged
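A sketch of both _get_exchange_properties() cases against a MagicMock channel. get_exchange_properties is a hypothetical stand-in, and it assumes the real call passes passive and type as keyword arguments; if the real code passes them positionally, assert on call_args.args instead.

```python
from unittest import mock


def get_exchange_properties(channel, name):
    """Hypothetical stand-in for _get_exchange_properties().

    passive=True only checks that the exchange exists and never creates it.
    """
    try:
        channel.exchange_declare(exchange=name, type="topic", passive=True)
        return {"type": "topic", "durable": True}
    except Exception:
        return None  # the real method logs at debug level


def test_passive_declare_succeeds():
    channel = mock.MagicMock()
    assert get_exchange_properties(channel, "ironic") == {"type": "topic", "durable": True}
    kwargs = channel.exchange_declare.call_args.kwargs
    assert kwargs["passive"] is True
    assert kwargs["type"] == "topic"


def test_passive_declare_fails():
    channel = mock.MagicMock()
    channel.exchange_declare.side_effect = Exception("NOT_FOUND - no exchange 'nova'")
    assert get_exchange_properties(channel, "nova") is None
```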
_check_for_new_exchanges() — listener.py:232
Mock self.connection.channel() as a context manager (__enter__ returns a channel mock); stub _get_exchange_properties per exchange.
- No exchange exists (props always None) → returns False, _available_exchanges stays empty
- One exchange exists (e.g. ironic) → entry added with the merged config plus an exchange_props key, returns True
- Service already in _available_exchanges → skipped (_get_exchange_properties not called for it)
- connection.channel() raises → warning "Error checking for new exchanges" logged, returns False
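A sketch of the channel context-manager mock. check_for_new_exchanges and SERVICES are hypothetical stand-ins (a two-service subset, and a simplified entry instead of the real merged config); what carries over to the real test is configuring connection.channel.return_value.__enter__.return_value and stubbing the per-exchange lookup.

```python
from unittest import mock

SERVICES = ("ironic", "nova")  # hypothetical subset of EXCHANGES_CONFIG


def check_for_new_exchanges(connection, available, get_props):
    """Hypothetical stand-in for _check_for_new_exchanges()."""
    found = False
    try:
        with connection.channel() as channel:
            for service in SERVICES:
                if service in available:
                    continue  # already known: no passive declare for it
                props = get_props(channel, service)
                if props is not None:
                    available[service] = {"exchange": service, "exchange_props": props}
                    found = True
    except Exception:
        return False  # the real method logs "Error checking for new exchanges"
    return found


def make_connection():
    connection = mock.MagicMock()
    # MagicMock supports `with`; __enter__'s return value is the channel.
    channel = connection.channel.return_value.__enter__.return_value
    return connection, channel


def test_one_exchange_found():
    connection, channel = make_connection()
    get_props = mock.MagicMock(
        side_effect=lambda ch, svc: {"type": "topic"} if svc == "ironic" else None
    )
    available = {}
    assert check_for_new_exchanges(connection, available, get_props) is True
    assert list(available) == ["ironic"]
    assert get_props.call_args_list[0].args == (channel, "ironic")


def test_known_service_skipped_and_channel_error():
    connection, _ = make_connection()
    get_props = mock.MagicMock(return_value=None)
    assert check_for_new_exchanges(connection, {"ironic": {}}, get_props) is False
    assert [c.args[1] for c in get_props.call_args_list] == ["nova"]
    connection.channel.side_effect = OSError("broker gone")
    assert check_for_new_exchanges(connection, {}, get_props) is False
```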
_exchange_discovery_loop() — listener.py:259
Call the method directly (no real thread); replace self._stop_discovery with a MagicMock event to script iterations — never rely on real timeouts.
- wait() returns True (stop requested) → breaks immediately, no exchange check
- All exchanges already in _available_exchanges (fill with EXCHANGES_CONFIG) → breaks with "Stopping exchange discovery" log, _check_for_new_exchanges not called
- _check_for_new_exchanges returns True → _new_exchanges_found set and self.should_stop is True; loop exits on the next scripted is_set()/wait() step
- _check_for_new_exchanges returns False → loop continues without signaling (script the wait side effect as [False, True])
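A sketch of driving the discovery loop synchronously with a scripted event mock. FakeDump and its loop body are hypothetical and follow only the bullets above (the "all exchanges known" branch is omitted). One gotcha it shows: a bare MagicMock's is_set() returns a truthy MagicMock, so is_set.return_value must be pinned to False or the loop body never runs.

```python
from unittest import mock


class FakeDump:
    """Hypothetical stand-in for the discovery loop of NotificationsDump."""

    def __init__(self, check):
        self._stop_discovery = mock.MagicMock()  # scripted, not threading.Event
        self._new_exchanges_found = mock.MagicMock()
        self._check_for_new_exchanges = check
        self.should_stop = False

    def _exchange_discovery_loop(self):
        while not self._stop_discovery.is_set():
            # wait() returns True once a stop was requested
            if self._stop_discovery.wait(timeout=60):
                break
            if self._check_for_new_exchanges():
                self._new_exchanges_found.set()
                self.should_stop = True


def scripted(check_result, waits):
    check = mock.MagicMock(return_value=check_result)
    dump = FakeDump(check)
    # A bare MagicMock is truthy, so is_set() must be pinned to False.
    dump._stop_discovery.is_set.return_value = False
    # Exhausting this list raises StopIteration instead of looping forever.
    dump._stop_discovery.wait.side_effect = waits
    dump._exchange_discovery_loop()
    return dump, check


def test_stop_requested_immediately():
    dump, check = scripted(False, [True])
    check.assert_not_called()


def test_no_new_exchange_keeps_looping():
    dump, check = scripted(False, [False, True])
    assert check.call_count == 1
    dump._new_exchanges_found.set.assert_not_called()
    assert dump.should_stop is False


def test_new_exchange_signals_restart():
    dump, check = scripted(True, [False, True])
    dump._new_exchanges_found.set.assert_called_once_with()
    assert dump.should_stop is True
```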
_start_exchange_discovery() — listener.py:290 / _stop_exchange_discovery() — listener.py:305
Patch osism.services.listener.threading.Thread.
- All exchanges already available → no thread created
- Otherwise → _stop_discovery.clear() called, Thread(target=self._exchange_discovery_loop, name="exchange-discovery", daemon=True) constructed and .start() called
- Stop: sets _stop_discovery; thread alive → join(timeout=5); _discovery_thread is None → no error; thread not alive → join not called
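A sketch of the start/stop checks with Thread patched. FakeDiscovery is a hypothetical stand-in built from the bullets above. Assuming listener.py does `import threading`, the target osism.services.listener.threading.Thread resolves to the Thread attribute of the shared threading module, so that patch is process-wide while active; patch.object(threading, "Thread") below has the same effect.

```python
import threading
from unittest import mock


class FakeDiscovery:
    """Hypothetical stand-in for _start/_stop_exchange_discovery()."""

    def __init__(self, all_available):
        self._all_available = all_available
        self._stop_discovery = threading.Event()
        self._discovery_thread = None

    def _exchange_discovery_loop(self):
        pass  # never runs: Thread is patched

    def _start_exchange_discovery(self):
        if self._all_available:
            return
        self._stop_discovery.clear()
        self._discovery_thread = threading.Thread(
            target=self._exchange_discovery_loop, name="exchange-discovery", daemon=True
        )
        self._discovery_thread.start()

    def _stop_exchange_discovery(self):
        self._stop_discovery.set()
        if self._discovery_thread and self._discovery_thread.is_alive():
            self._discovery_thread.join(timeout=5)


def test_start_and_stop():
    with mock.patch.object(threading, "Thread") as thread_cls:
        FakeDiscovery(all_available=True)._start_exchange_discovery()
        thread_cls.assert_not_called()
        d = FakeDiscovery(all_available=False)
        d._start_exchange_discovery()
        thread_cls.assert_called_once_with(
            target=d._exchange_discovery_loop, name="exchange-discovery", daemon=True
        )
        thread_cls.return_value.start.assert_called_once_with()
        thread_cls.return_value.is_alive.return_value = True
        d._stop_exchange_discovery()
        thread_cls.return_value.join.assert_called_once_with(timeout=5)
    assert d._stop_discovery.is_set()


def test_stop_without_alive_thread():
    d = FakeDiscovery(all_available=False)
    d._stop_exchange_discovery()  # _discovery_thread is None: no error
    d._discovery_thread = mock.MagicMock()
    d._discovery_thread.is_alive.return_value = False
    d._stop_exchange_discovery()
    d._discovery_thread.join.assert_not_called()
```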
_wait_for_exchanges() — listener.py:311
Patch osism.services.listener.time.sleep and stub _check_for_new_exchanges.
- _available_exchanges already populated → returns immediately, no check, no sleep
- First check finds nothing, second populates _available_exchanges (use a side_effect function mutating the dict) → sleep called once with EXCHANGE_RETRY_INTERVAL (60)
- Safety: give the sleep mock a bounded side_effect (raise after N calls) so a regression cannot hang pytest
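A sketch of the bounded-sleep safety net. wait_for_exchanges is a hypothetical stand-in with sleep injected as a parameter; in the real test the same MagicMock would be installed via osism.services.listener.time.sleep. Putting an exception instance in the side_effect list makes the third call raise, so a broken loop fails fast instead of hanging.

```python
from unittest import mock

EXCHANGE_RETRY_INTERVAL = 60  # value stated in this issue


def wait_for_exchanges(available, check, sleep):
    """Hypothetical stand-in for _wait_for_exchanges() with sleep injected."""
    while not available:
        check()
        if not available:
            sleep(EXCHANGE_RETRY_INTERVAL)


def test_already_populated():
    check, sleep = mock.MagicMock(), mock.MagicMock()
    wait_for_exchanges({"ironic": {}}, check, sleep)
    check.assert_not_called()
    sleep.assert_not_called()


def test_second_check_populates():
    available = {}
    calls = []

    def check():
        calls.append(None)
        if len(calls) == 2:
            available["ironic"] = {"exchange": "ironic"}

    # Bounded side_effect: a third sleep raises instead of hanging the run.
    sleep = mock.MagicMock(side_effect=[None, None, AssertionError("too many sleeps")])
    wait_for_exchanges(available, check, sleep)
    sleep.assert_called_once_with(EXCHANGE_RETRY_INTERVAL)
    assert len(calls) == 2
```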
get_consumers() — listener.py:331
Stub _wait_for_exchanges and _start_exchange_discovery on the instance; pre-populate _available_exchanges; patch osism.services.listener.Exchange and osism.services.listener.Queue; pass a MagicMock consumer factory.
- Two available exchanges → two consumers returned; Exchange(...) called with type/durable from exchange_props and passive=True; Queue(...) called with routing_key, auto_delete=False, no_ack=True; consumer(queue, callbacks=[self.on_message])
- exchange_props missing type/durable → defaults "topic"/True used
- Exchange constructor raises for one service → error logged, remaining consumers still returned
- _available_exchanges left empty after the stubbed wait → returns [], error "No consumers could be configured" logged
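A sketch of the default-props and failure-isolation checks. build_consumers is a hypothetical stand-in for the loop inside get_consumers() (the real test stubs _wait_for_exchanges and _start_exchange_discovery first); the keyword arguments follow the bullets above, while passing the exchange and queue names positionally is an assumption.

```python
from unittest import mock


def build_consumers(available, exchange_cls, queue_cls, consumer, on_message):
    """Hypothetical stand-in for the loop inside get_consumers()."""
    consumers = []
    for service, cfg in available.items():
        props = cfg.get("exchange_props", {})
        try:
            exchange = exchange_cls(
                cfg["exchange"],
                type=props.get("type", "topic"),  # documented defaults
                durable=props.get("durable", True),
                passive=True,
            )
            queue = queue_cls(
                cfg["queue"], exchange, routing_key=cfg["routing_key"],
                auto_delete=False, no_ack=True,
            )
            consumers.append(consumer(queue, callbacks=[on_message]))
        except Exception:
            continue  # the real method logs an error and keeps going
    return consumers


def cfg(service, **props):
    return {
        "exchange": service,
        "queue": f"osism-listener-{service}",
        "routing_key": f"{service}_versioned_notifications.info",
        "exchange_props": props,
    }


def test_defaults_and_failure_isolation():
    def fake_exchange(name, **kwargs):
        if name == "nova":
            raise RuntimeError("exchange setup failed")
        return mock.sentinel.exchange

    exchange_cls = mock.MagicMock(side_effect=fake_exchange)
    queue_cls, consumer, on_message = mock.MagicMock(), mock.MagicMock(), mock.MagicMock()
    available = {"ironic": cfg("ironic"), "nova": cfg("nova", type="topic", durable=False)}

    result = build_consumers(available, exchange_cls, queue_cls, consumer, on_message)

    assert result == [consumer.return_value]
    assert exchange_cls.call_args_list[0] == mock.call(
        "ironic", type="topic", durable=True, passive=True
    )
    consumer.assert_called_once_with(queue_cls.return_value, callbacks=[on_message])


def test_no_exchanges_returns_empty_list():
    assert build_consumers({}, mock.MagicMock(), mock.MagicMock(), mock.MagicMock(), None) == []
```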
on_message() — listener.py:377
Build body = {"oslo.message": json.dumps(data)} with data containing event_type, payload, priority, timestamp, publisher_id, message_id. Patch osism.services.listener.time.sleep for retry tests; set self.event_bridge / self.osism_api_session directly on the instance.
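A small helper keeps the on_message() cases below short. make_message is a hypothetical test helper, and every field value in it is an illustrative placeholder rather than captured RabbitMQ traffic; drop= removes keys for the "missing event_type" case.

```python
import json


def make_message(event_type, payload, drop=()):
    """Hypothetical test helper: returns (data, body) for on_message() tests.

    All field values are illustrative placeholders.
    """
    data = {
        "event_type": event_type,
        "payload": payload,
        "priority": "INFO",
        "timestamp": "2024-01-01 00:00:00.000000",
        "publisher_id": "ironic-conductor.host-1",
        "message_id": "00000000-0000-0000-0000-000000000001",
    }
    for key in drop:
        data.pop(key)  # e.g. drop=("event_type",) for the "unknown" case
    return data, {"oslo.message": json.dumps(data)}
```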
Payload-info extraction / logging
- baremetal event with ironic_object.data → only name/provision_state/power_state keys logged
- compute/nova event with nova_object.data → only uuid/host/state/task_state keys
- network/neutron event → {"service": "neutron"}
- Other service (identity) → {"service": "identity"}; missing event_type → service "unknown" (note: without an API session, handler dispatch then reads data["event_type"] and raises KeyError, so omit event_type only where the code path allows it)
Event bridge forwarding
- event_bridge set → add_event(data["event_type"], data["payload"]) called once
- add_event raises → both error logs emitted (the second one digs payload["ironic_object.data"]["name"] with .get fallbacks → "unknown" for non-ironic payloads), processing continues
- event_bridge = None → no forwarding attempted
OSISM API delivery (osism_api_session set)
- post returns status 204 → success log, exactly one call, no handler dispatch
- requests.ConnectionError on all 3 tries → 3 posts, give-up log includes json.dumps(data), sleep called with 3 then 9 (pow(3, tries - 1))
- requests.Timeout → same retry/give-up path
- HTTPError with response.status_code = 404 → gives up after the first try ("client side error")
- status_code = 500 → also gives up early (the condition is <= 500); assert this to document current behavior
- status_code = 503 → retried up to 3 times
- Status 200 (not 204, raise_for_status() is a no-op) → falls through to the failure branch and retries; document current behavior
- Assert the posted JSON body contains exactly priority, event_type, timestamp, publisher_id, message_id, payload
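A sketch of the retry/backoff matrix, runnable without requests. deliver is a hypothetical stand-in for the delivery loop; FakeConnectionError and FakeHTTPError are local stand-ins for requests.ConnectionError/Timeout and requests.HTTPError, and the 400 <= status <= 500 give-up window is an assumption chosen to match the 404/500/503 cases above. sleep is injected here, whereas the real test patches osism.services.listener.time.sleep.

```python
from unittest import mock

FIELDS = ("priority", "event_type", "timestamp", "publisher_id", "message_id", "payload")


class FakeConnectionError(Exception):
    """Stand-in for requests.ConnectionError / requests.Timeout."""


class FakeHTTPError(Exception):
    """Stand-in for requests.HTTPError, carrying a response.status_code."""

    def __init__(self, status_code):
        super().__init__(status_code)
        self.response = mock.Mock(status_code=status_code)


def deliver(session, url, data, sleep, max_tries=3):
    """Hypothetical stand-in for the OSISM API delivery loop in on_message()."""
    body = {key: data[key] for key in FIELDS}
    tries = 1
    while tries <= max_tries:
        try:
            response = session.post(url, json=body)
            response.raise_for_status()
            if response.status_code == 204:
                return True  # delivered: no retry, no handler dispatch
        except FakeHTTPError as exc:
            if 400 <= exc.response.status_code <= 500:
                break  # "client side error": give up (500 included, as documented)
        except FakeConnectionError:
            pass
        tries += 1
        if tries <= max_tries:
            sleep(pow(3, tries - 1))  # 3, then 9
    return False  # the real code logs json.dumps(data) when giving up


DATA = {key: f"<{key}>" for key in FIELDS}
DATA["extra"] = "not posted"


def run(session):
    sleep = mock.MagicMock()
    ok = deliver(session, "http://api:8000/notifications/baremetal", DATA, sleep)
    return ok, session.post.call_count, [c.args[0] for c in sleep.call_args_list]


def test_success_and_body():
    session = mock.MagicMock()
    session.post.return_value.status_code = 204
    assert run(session) == (True, 1, [])
    assert set(session.post.call_args.kwargs["json"]) == set(FIELDS)


def test_connection_error_backoff():
    session = mock.MagicMock()
    session.post.side_effect = FakeConnectionError("refused")
    assert run(session) == (False, 3, [3, 9])


def test_http_status_paths():
    for status, posts in ((404, 1), (500, 1), (503, 3)):
        session = mock.MagicMock()
        session.post.return_value.raise_for_status.side_effect = FakeHTTPError(status)
        assert run(session)[:2] == (False, posts)
    session = mock.MagicMock()
    session.post.return_value.status_code = 200  # raise_for_status() is a no-op
    assert run(session) == (False, 3, [3, 9])
```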
Handler dispatch (no API session)
- osism_api_session = None → baremetal_events.get_handler(data["event_type"]) resolved and called with data["payload"] (stub get_handler or patch the three netbox.*.delay targets and assert the task call)
main() — listener.py:486
Patch osism.services.listener.Connection, osism.services.listener.NotificationsDump, osism.services.listener.time.sleep. Escape the while True loop by raising a sentinel exception (e.g. from sleep or the second Connection call) and asserting it with pytest.raises.
- Connection raises ConnectionRefusedError → error logged, sleep(60) called, loop retries (sentinel on second iteration)
- Restart path: consumer mock with _new_exchanges_found.is_set() → True once → _stop_exchange_discovery() called, _new_exchanges_found.clear() called, _available_exchanges carried over into the next NotificationsDump instance (sentinel ends the second iteration)
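A sketch of the sentinel escape hatch for a while True loop. main_loop is a hypothetical stand-in covering only the ConnectionRefusedError branch (the broker URI default is a placeholder); the second Connection call raises StopLoop, which the except clause does not catch, so the loop ends deterministically. Under pytest the try/except becomes `with pytest.raises(StopLoop):`.

```python
from unittest import mock


class StopLoop(Exception):
    """Sentinel raised by a mock to break out of `while True` in tests."""


def main_loop(connection_cls, dump_cls, sleep, broker_uri="amqp://hypothetical//"):
    """Hypothetical stand-in for the ConnectionRefusedError branch of main()."""
    while True:
        try:
            with connection_cls(broker_uri) as connection:
                dump_cls(connection).run()
        except ConnectionRefusedError:
            sleep(60)  # the real main() logs an error first


def test_refused_then_sentinel():
    connection_cls = mock.MagicMock(side_effect=[ConnectionRefusedError(), StopLoop()])
    dump_cls = mock.MagicMock()
    sleep = mock.MagicMock()
    try:
        main_loop(connection_cls, dump_cls, sleep)
    except StopLoop:
        pass
    else:
        raise AssertionError("loop did not exit via the sentinel")
    sleep.assert_called_once_with(60)
    assert connection_cls.call_count == 2
    dump_cls.assert_not_called()
```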
Mocking hints
- BROKER_URI is read from the environment at import time (listener.py:62); patch osism.services.listener.BROKER_URI in main() tests, not the env var.
- Importing the module pulls in osism.tasks.netbox and osism.settings; the shared tests/conftest.py ansible stubs already make this import safe in the unit-test venv.
- NotificationsDump always needs a connection argument; a plain MagicMock() suffices. For _check_for_new_exchanges, configure connection.channel.return_value.__enter__.return_value as the channel.
- Never start the real discovery thread or hit real timeouts: test _exchange_discovery_loop synchronously with a scripted _stop_discovery mock, and patch threading.Thread in _start_exchange_discovery.
- should_stop is a plain class attribute on kombu's ConsumerMixin (default False); the discovery loop's assignment shadows it on the instance, so asserting consumer.should_stop is True after the loop works.
- Use the shared loguru_logs fixture from tests/conftest.py to assert log messages (pytest caplog does not capture loguru).
- Celery tasks: only .delay is invoked here; patch the three osism.services.listener.netbox.*.delay attributes, no broker or task execution involved.
- For the sys.modules ImportError trick, mapping a module name to None makes from osism.services.event_bridge import event_bridge raise ImportError; mocker.patch.dict(sys.modules, ...) restores it automatically.
Definition of Done
- tests/unit/services/__init__.py present (created here or by the companion Tier 6 services issue; coordinate to avoid conflicts)
- tests/unit/services/test_listener.py created
- pytest --cov=osism.services.listener shows ≥ 90 % (the if __name__ == "__main__" guard and consumer.run() plumbing may stay uncovered)
- pipenv run pytest tests/unit/services/test_listener.py passes locally
- flake8, mypy, python-black remain green
- python-osism-unit-tests passes
Dependencies
- osism/services/ (shared tests/unit/services/__init__.py).