Priority and serialgroup#61
Merged
Merged
Conversation
Documents the architecture of this background-queue library: - Dev commands (docker-up, init, test) - Core class and entry points (BackgroundQueue, processJob) - Two operating modes (cron vs broker) - Job state machine with exception-to-state mapping - serialGroup and WAITING mechanism - Implementation details (dual connection, middleware, broker abstraction) - Console commands and bulk insert Helps future Claude Code sessions get productive in the project faster.
The config target should copy from .env.example, not .env.local. This ensures the base configuration template is available and consistent.
tests/Support/_generated/IntegrationTesterActions.php je automaticky generovaný soubor vytvářený Codeception při spuštění testů. Přidáno do .gitignore, odebrán z trackingu.
- Producer helper: update publish() signature (string $id, int $priority), add publishDie() - Logger: add void return types to all PSR-3 methods - BackgroundQueueTest: use Connection objects instead of DSN strings, fix publish() calls (ModeEnum::NORMAL, milliseconds instead of DateTimeImmutable), use reflection for private fetchAll/createQueryBuilder, update test expectations for delayed jobs, fix WaitingException test state to FINISHED, refetch entity after processJob(), fix JobNotFoundException assertions - ConsumeCommandTest: use Connection objects, update error message expectation - .gitignore: add tests/Support/_generated (auto-generated Codeception file)
…roker-mode test Expands the "Návrh testů" section in docs/priority-serialgroup.md from a single reproduction test into a detailed set of six tests covering every aspect of the proposed solution: - Test 1: Jobs without serialGroup (regression - nothing should change) - Test 2a/2b: Ordering by (priority, ID) - unit + end-to-end cron mode - Test 3: PROCESSING clause (mutual exclusion) - serialization regression - Test 4: Group lock - acquire/release primitive with two connections - Test 5: Conditional claim in save() - RabbitMQ redelivery race - Test 6: Full broker-mode end-to-end - whole loop process → publish → consume → waiting Also documents the prerequisites: extending getBackgroundQueue() (priorities), extending the Mailer helper (processRecording + static $processOrder), direct raw DB connection access to simulate states, and extending the Producer helper (priority queues for Test 6). Coverage includes the interaction with _processWaitingJobs() and findOldestUnfinishedJobIdsByGroup() that the other tests do not exercise on their own.
Add comprehensive test suite covering priority ordering and serial group semantics: - Test 1: Verify jobs without serialGroup are unaffected by priority changes - Test 2a/2b: Validate predecessor selection by (priority, ID) and order in cron mode - Test 3: Ensure PROCESSING clause blocks higher-priority jobs (mutual exclusion) - Test 4: Guard for acquireGroupLock/releaseGroupLock implementation - Test 5: Guard for conditional claim in save() to prevent double-processing on redelivery - Test 6: Full broker-mode end-to-end test through priority queues and WAITING mechanism Extend test helpers: - Mailer: Add processRecording() callback with $processOrder tracking and serial group violation detection - Producer: Implement priority queue routing (general_<priority>), consume in priority order - BackgroundQueueTest: Add priorities parameter, rawConnection() helper, purge priority sub-queues All tests currently demonstrate expected behavior: 4 red (reproduce missing fixes), 3 green (guards).
…ory lock
Priority-aware head selection uses a JOIN with a derived table instead of a
DEPENDENT SUBQUERY, eliminating O(W²) complexity and removing the need for a
composite index. On top of that, the serialGroup advisory lock is hardened so it
works correctly on both MySQL and PostgreSQL and never aborts a whole process()
run / consumer on lock contention.
Head selection / query changes:
- findOldestUnfinishedJobIdsByGroup: rewrite to JOIN-based approach (O(W) linear,
MySQL+PostgreSQL compatible)
- Test 7: add deterministic regression test for head-selection logic with a
priority-gap scenario
- docs/priority-serialgroup.md: add performance analysis with benchmark table
(1k-20k rows: ~160x-1100x speedup), compare variants (correlated subquery vs.
derived table JOIN vs. composite index), explain why variant B was chosen
(zero write-amplification, no index needed)
- Test 2b: add sleep between process() calls to respect WAITING job postponement
- Producer helper: implement delayed-message recirculation model (in-memory TTL
buffer) to match real Producer behavior and prevent livelock on the
highest-priority _processWaitingJobs
Advisory lock hardening:
- PostgreSQL: use single-arg pg_advisory_lock(bigint) with a composed 64-bit key
((namespace << 32) | crc32). The two-arg int4 variant would overflow on
crc32 values > 2^31 ("integer out of range")
- MySQL: hash serial_group via crc32 into the lock name. GET_LOCK names are
capped at 64 chars and serial_group is VARCHAR(255), so a long group name
would make GET_LOCK return NULL and silently fail
- acquireGroupLock now returns bool instead of throwing. On failure processJob
re-publishes the job to the broker and returns, so a single lock collision no
longer aborts the whole process() run / consumer (the job is retried next time;
in cron mode it is picked up by the next process() run)
- Test 4: update to the new bool contract; docs updated incl. crc32 collision
probability estimate (birthday problem over 2^32)
All tests passing (26 tests, 71 assertions). No new indexes added - avoids
write-amplification from frequent state changes.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Priorita + serialGroup
Zadání: Joby v jedné
serialGroupse mají odbavovat sériově (max. jeden běžící napříč servery), ale s respektem k prioritě - nejdřív vyšší priorita (nižší číslo), při shodě FIFO podle ID. Dosud se uvnitř skupiny řadilo jen podle ID, takže priorita byla fakticky ignorována.Změny:
Řazení dle
(priorita, ID)- vfindOldestUnfinishedJobIdsByGroup()se podmínkaid < :idmění na(priority < :priority OR (priority = :priority AND id < :id)). Job čeká jen na toho, kdo má jít skutečně před ním.Klauzule na
PROCESSING- do téhož dotazu se přidáváOR state = PROCESSING: job počká vždy, když ve skupině něco běží, bez ohledu na prioritu. Bez ní by novější vyšší-prioritní job nepoznal běžící nižší-prioritní job jako překážku a naběhl souběžně (regrese sériovosti).Advisory zámek na skupinu -
acquireGroupLock/releaseGroupLockkolem kritické sekce[checkUnfinishedJobs → zápis PROCESSING]vprocessJob(). Serializuje souběžné konzumenty téže skupiny; abstrakce detekuje platformu (MySQLGET_LOCK, PostgreSQLpg_advisory_lock).Podmíněný claim v
save()-UPDATE ... WHERE id = :id AND state IN (ready stavy)+ kontrolaaffectedRows. Atomické převzetí jobu chrání proti RabbitMQ redelivery (totéž ID dvěma konzumentům). Platí pro všechny joby, i mimo serialGroup.Pozn. k výkonu: výběr hlavy skupiny v
findOldestUnfinishedJobIdsByGroup()je přepsán z korelovaného poddotazu (O(W²)) na JOIN s odvozenou tabulkou (~O(W) bez nutnosti indexu) - na 8k jobech ~1080× rychleji (18 s → 16 ms).