Priority and serialgroup #61

Merged: thorewi merged 7 commits into master from priority-and-serialgroup on Jun 20, 2026
masicek commented Jun 19, 2026

Priority + serialGroup

Task: Jobs in the same serialGroup must be processed serially (at most one running across all servers) while respecting priority: higher priority (lower number) first, and FIFO by ID on ties. Until now, ordering within a group used only the ID, so priority was effectively ignored.

Changes:

  1. Ordering by (priority, ID) - in findOldestUnfinishedJobIdsByGroup(), the condition id < :id becomes (priority < :priority OR (priority = :priority AND id < :id)). A job waits only for jobs that should actually run before it.

  2. PROCESSING clause - the same query gains OR state = PROCESSING: a job always waits when anything in its group is running, regardless of priority. Without it, a newer higher-priority job would not see a running lower-priority job as an obstacle and would start concurrently (a regression in serial execution).

  3. Advisory lock per group - acquireGroupLock/releaseGroupLock wrap the critical section [checkUnfinishedJobs → write PROCESSING] in processJob(). This serializes concurrent consumers of the same group; the abstraction detects the platform (MySQL GET_LOCK, PostgreSQL pg_advisory_lock).

  4. Conditional claim in save() - UPDATE ... WHERE id = :id AND state IN (ready states) + an affectedRows check. Atomically claiming the job protects against RabbitMQ redelivery (the same ID delivered to two consumers). This applies to all jobs, including those outside a serialGroup.
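
To illustrate how changes 1 and 2 combine, here is a minimal sketch of the "must this job wait?" check, written in Python against an in-memory SQLite database rather than the library's PHP code. The `jobs` table, its column names, the state names, and the helpers `has_blocker` and `demo_connection` are assumptions made for this illustration, not the library's actual schema or API.

```python
import sqlite3


def has_blocker(conn, job_id):
    """Return True if another job in the same serial group must finish first.

    Hypothetical schema: jobs(id, serial_group, priority, state).
    """
    row = conn.execute(
        "SELECT serial_group, priority FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    group, priority = row
    if group is None:
        return False  # jobs without a serial group never wait

    sql = """
        SELECT 1 FROM jobs
        WHERE serial_group = :grp
          AND id <> :id
          AND state IN ('READY', 'WAITING', 'PROCESSING')
          AND (
                priority < :priority                       -- change 1: higher priority goes first
             OR (priority = :priority AND id < :id)        -- change 1: FIFO by ID on ties
             OR state = 'PROCESSING'                       -- change 2: anything running blocks
          )
        LIMIT 1
    """
    params = {"grp": group, "id": job_id, "priority": priority}
    return conn.execute(sql, params).fetchone() is not None


def demo_connection():
    """Build a small in-memory data set for experimenting with has_blocker()."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs (id INTEGER PRIMARY KEY, serial_group TEXT, "
        "priority INTEGER, state TEXT)"
    )
    conn.executemany(
        "INSERT INTO jobs VALUES (?, ?, ?, ?)",
        [
            (1, "g", 5, "PROCESSING"),  # older, low priority, already running
            (2, "g", 5, "READY"),       # same priority as job 1, newer
            (3, "g", 1, "READY"),       # newest, but highest priority
        ],
    )
    return conn
```

In this data set, job 3 has the highest priority but still waits while job 1 is running; once job 1 is finished, job 3 is free to run and job 2 waits for job 3.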

Performance note: head-of-group selection in findOldestUnfinishedJobIdsByGroup() is rewritten from a correlated subquery (O(W²)) to a JOIN with a derived table (~O(W), no index required) - on 8k jobs it is ~1080× faster (18 s → 16 ms).
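
As a rough illustration of the derived-table idea, the sketch below picks the head of each group (lowest priority number, then lowest ID) with a single JOIN against a per-group aggregate. It runs against SQLite from Python; the table layout, the state names, and the exact shape of the query are assumptions and may differ from what the PR actually ships.

```python
import sqlite3

# Assumed schema: jobs(id, serial_group, priority, state).
# The derived table computes the best (lowest) priority per group once;
# the outer query then takes the lowest ID among jobs at that priority.
HEAD_SQL = """
    SELECT j.serial_group, MIN(j.id) AS head_id
    FROM jobs j
    JOIN (
        SELECT serial_group, MIN(priority) AS min_priority
        FROM jobs
        WHERE state IN ('READY', 'WAITING') AND serial_group IS NOT NULL
        GROUP BY serial_group
    ) best ON best.serial_group = j.serial_group
          AND best.min_priority = j.priority
    WHERE j.state IN ('READY', 'WAITING')
    GROUP BY j.serial_group
    ORDER BY j.serial_group
"""


def group_heads(conn):
    """Return {serial_group: id of the job that should run next}."""
    return dict(conn.execute(HEAD_SQL).fetchall())


def sample_connection():
    """In-memory data with a finished job and a job outside any group."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs (id INTEGER PRIMARY KEY, serial_group TEXT, "
        "priority INTEGER, state TEXT)"
    )
    conn.executemany(
        "INSERT INTO jobs VALUES (?, ?, ?, ?)",
        [
            (1, "a", 5, "READY"),
            (2, "a", 1, "READY"),
            (3, "a", 1, "WAITING"),
            (4, "b", 3, "FINISHED"),  # finished jobs are ignored
            (5, "b", 7, "READY"),
            (6, None, 1, "READY"),    # no serial group, not part of any head
        ],
    )
    return conn
```

Each group is scanned a fixed number of times regardless of how many jobs are waiting, which is where the roughly linear behaviour comes from, in contrast to a subquery that is re-evaluated per row.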

Viktor Mašíček and others added 7 commits June 18, 2026 13:58
Documents the architecture of this background-queue library:
- Dev commands (docker-up, init, test)
- Core class and entry points (BackgroundQueue, processJob)
- Two operating modes (cron vs broker)
- Job state machine with exception-to-state mapping
- serialGroup and WAITING mechanism
- Implementation details (dual connection, middleware, broker abstraction)
- Console commands and bulk insert

Helps future Claude Code sessions get productive in the project faster.

The config target should copy from .env.example, not .env.local.
This ensures the base configuration template is available and consistent.

tests/Support/_generated/IntegrationTesterActions.php is an auto-generated
file that Codeception creates when the tests run. Added to .gitignore and
removed from tracking.

- Producer helper: update publish() signature (string $id, int $priority), add publishDie()
- Logger: add void return types to all PSR-3 methods
- BackgroundQueueTest: use Connection objects instead of DSN strings, fix publish()
  calls (ModeEnum::NORMAL, milliseconds instead of DateTimeImmutable), use reflection
  for private fetchAll/createQueryBuilder, update test expectations for delayed jobs,
  fix WaitingException test state to FINISHED, refetch entity after processJob(),
  fix JobNotFoundException assertions
- ConsumeCommandTest: use Connection objects, update error message expectation
- .gitignore: add tests/Support/_generated (auto-generated Codeception file)
…roker-mode test

Expands the "Návrh testů" section in docs/priority-serialgroup.md from a single
reproduction test into a detailed set of six tests covering every aspect of the
proposed solution:

- Test 1: Jobs without serialGroup (regression - nothing should change)
- Test 2a/2b: Ordering by (priority, ID) - unit + end-to-end cron mode
- Test 3: PROCESSING clause (mutual exclusion) - serialization regression
- Test 4: Group lock - acquire/release primitive with two connections
- Test 5: Conditional claim in save() - RabbitMQ redelivery race
- Test 6: Full broker-mode end-to-end - whole loop process → publish → consume → waiting

Also documents the prerequisites: extending getBackgroundQueue() (priorities),
extending the Mailer helper (processRecording + static $processOrder), direct raw DB
connection access to simulate states, and extending the Producer helper (priority
queues for Test 6).

Coverage includes the interaction with _processWaitingJobs() and
findOldestUnfinishedJobIdsByGroup() that the other tests do not exercise on their own.

Add a comprehensive test suite covering priority ordering and serial group semantics:
- Test 1: Verify jobs without serialGroup are unaffected by priority changes
- Test 2a/2b: Validate predecessor selection by (priority, ID) and order in cron mode
- Test 3: Ensure PROCESSING clause blocks higher-priority jobs (mutual exclusion)
- Test 4: Guard for acquireGroupLock/releaseGroupLock implementation
- Test 5: Guard for conditional claim in save() to prevent double-processing on redelivery
- Test 6: Full broker-mode end-to-end test through priority queues and WAITING mechanism
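
The redelivery race that Test 5 guards against can be sketched as a conditional UPDATE followed by an affected-row check. This is a Python/SQLite illustration only: the `claim` helper, the table layout, and the set of "ready" states are assumptions, and the library's save() may be structured differently.

```python
import sqlite3


def claim(conn, job_id):
    """Try to move a job to PROCESSING; return True only for the winner.

    The WHERE clause makes the transition conditional on the current state,
    so a second consumer holding the same job ID updates zero rows.
    """
    cur = conn.execute(
        "UPDATE jobs SET state = 'PROCESSING' "
        "WHERE id = ? AND state IN ('READY', 'WAITING')",
        (job_id,),
    )
    conn.commit()
    return cur.rowcount == 1


def claim_demo_connection():
    """In-memory table with a single ready job (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, state TEXT)")
    conn.execute("INSERT INTO jobs VALUES (1, 'READY')")
    conn.commit()
    return conn
```

If the same message is delivered twice, the first call to `claim` succeeds and the second returns False, so the second consumer can drop the delivery instead of processing the job again.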

Extend test helpers:
- Mailer: Add processRecording() callback with $processOrder tracking and serial group violation detection
- Producer: Implement priority queue routing (general_<priority>), consume in priority order
- BackgroundQueueTest: Add priorities parameter, rawConnection() helper, purge priority sub-queues
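
The priority routing described for the Producer helper can be pictured with a small in-memory model: one queue per priority, named general_<priority>, drained from the lowest priority number upward. The class below is a hypothetical Python stand-in, not the PHP test helper itself; only the publish(id, priority) shape and the queue naming come from the commit message.

```python
from collections import deque


class InMemoryProducer:
    """Toy model of per-priority queues consumed in priority order."""

    def __init__(self, priorities, base="general"):
        self.base = base
        # Sorted ascending so iteration visits the highest priority first
        # (a lower number means a higher priority).
        self.queues = {p: deque() for p in sorted(priorities)}

    def queue_name(self, priority):
        return f"{self.base}_{priority}"

    def publish(self, job_id, priority):
        self.queues[priority].append(job_id)

    def consume_one(self):
        """Pop the next job ID, or None when every queue is empty."""
        for priority in self.queues:
            if self.queues[priority]:
                return self.queues[priority].popleft()
        return None
```

Within one priority the order stays FIFO, which matches the (priority, ID) ordering the tests assert on.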

All tests currently demonstrate expected behavior: 4 red (reproduce missing fixes), 3 green (guards).
…ory lock

Priority-aware head selection uses a JOIN with a derived table instead of a
DEPENDENT SUBQUERY, eliminating O(W²) complexity and removing the need for a
composite index. On top of that, the serialGroup advisory lock is hardened so it
works correctly on both MySQL and PostgreSQL and never aborts a whole process()
run / consumer on lock contention.

Head selection / query changes:
- findOldestUnfinishedJobIdsByGroup: rewrite to JOIN-based approach (O(W) linear,
  MySQL+PostgreSQL compatible)
- Test 7: add deterministic regression test for head-selection logic with a
  priority-gap scenario
- docs/priority-serialgroup.md: add performance analysis with benchmark table
  (1k-20k rows: ~160x-1100x speedup), compare variants (correlated subquery vs.
  derived table JOIN vs. composite index), explain why variant B was chosen
  (zero write-amplification, no index needed)
- Test 2b: add sleep between process() calls to respect WAITING job postponement
- Producer helper: implement delayed-message recirculation model (in-memory TTL
  buffer) to match real Producer behavior and prevent livelock on the
  highest-priority _processWaitingJobs

Advisory lock hardening:
- PostgreSQL: use single-arg pg_advisory_lock(bigint) with a composed 64-bit key
  ((namespace << 32) | crc32). The two-arg int4 variant would overflow on
  crc32 values > 2^31 ("integer out of range")
- MySQL: hash serial_group via crc32 into the lock name. GET_LOCK names are
  capped at 64 chars and serial_group is VARCHAR(255), so a long group name
  would make GET_LOCK return NULL and silently fail
- acquireGroupLock now returns bool instead of throwing. On failure processJob
  re-publishes the job to the broker and returns, so a single lock collision no
  longer aborts the whole process() run / consumer (the job is retried next time;
  in cron mode it is picked up by the next process() run)
- Test 4: update to the new bool contract; docs updated incl. crc32 collision
  probability estimate (birthday problem over 2^32)
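
The key derivation and the collision estimate above can be checked with a few lines of Python. The namespace constant, the MySQL lock-name prefix, and the function names are assumptions chosen for this sketch; only the (namespace << 32) | crc32 composition, the 64-character GET_LOCK limit, and the birthday approximation over 2^32 come from the commit message.

```python
import math
import zlib

LOCK_NAMESPACE = 0x4251  # hypothetical constant; must fit in 31 bits


def pg_lock_key(serial_group, namespace=LOCK_NAMESPACE):
    """Compose a 64-bit key for single-argument pg_advisory_lock(bigint)."""
    if not 0 <= namespace < 2**31:
        raise ValueError("namespace must fit in 31 bits to stay a positive bigint")
    crc = zlib.crc32(serial_group.encode("utf-8"))  # unsigned 32-bit value
    return (namespace << 32) | crc


def mysql_lock_name(serial_group, prefix="bq_group_"):
    """Build a fixed-length GET_LOCK name, well under the 64-character cap."""
    return f"{prefix}{zlib.crc32(serial_group.encode('utf-8')):08x}"


def collision_probability(groups):
    """Birthday approximation: chance that any two of `groups` share a crc32."""
    return 1.0 - math.exp(-groups * (groups - 1) / (2 * 2**32))
```

The crc32 of a string such as "123456789" is 0xCBF43926, which is larger than 2^31 - 1 and therefore would not fit the signed int4 arguments of the two-argument lock variant; packed into the low half of a bigint it is unproblematic. A collision between two group names only means that two unrelated groups occasionally serialize against each other, not that serial execution is lost.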

All tests passing (26 tests, 71 assertions). No new indexes added - avoids
write-amplification from frequent state changes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
thorewi merged commit f21d046 into master on Jun 20, 2026
1 check passed