Záznamy v jedné frontě (např. transcribe) sdílejí stejnou serialGroup a mají se
odbavovat striktně jeden po druhém (sériově), i přes více serverů (2 servery, na každém
jeden konzument). Záznamy ale mají různé priority a je potřeba, aby priorita měla
přednost: nejdřív se mají odbavit všechny s prioritou 1, teprve pak s prioritou 2.
Cílová sémantika (potvrzeno v diskuzi):
- serialGroup = vzájemné vyloučení (sériovost) - z jedné skupiny běží vždy max. jeden job současně, napříč všemi konzumenty/servery. Neurčuje pořadí.
- priorita = pořadí - z čekajících jobů jde na řadu nejdřív vyšší priorita (nižší číslo), při shodě priority FIFO podle ID.
Pro frontu a1 b1 c2 d1 e1 f2 g1 (vše jedna skupina, čísla = priorita) je cílové pořadí
zpracování a1 b1 d1 e1 g1 c2 f2.
Priorita = oddělené fronty, řazené vzestupně. V RabbitMQ se priorita modeluje jako
samostatná fronta <queue>_<priority> (src/Broker/PhpAmqpLib/Producer.php:21-23).
Konzument je obchází ve vzestupném pořadí priorit (src/Broker/PhpAmqpLib/Consumer.php:24-37),
takže nižší číslo = vyšší přednost a priorita 1 se odbavuje před prioritou 2.
serialGroup = striktní řazení podle ID. Při zpracování jobu se serialGroup volá
checkUnfinishedJobs() (src/BackgroundQueue.php:714-733) →
getPreviousUnfinishedJobId() → findOldestUnfinishedJobIdsByGroup()
(src/BackgroundQueue.php:594-620). Ta hledá jakýkoli nedokončený job ve stejné skupině
s id < :id (řádek 607-608). Pokud existuje, aktuální job jde do STATE_WAITING.
Tedy: job čeká na kterýkoli job s nižším ID ve skupině, bez ohledu na prioritu.
Tyto dvě pravidla si odporují. Skupina transcribe, joby vznikají v pořadí:
- ID 1, priorita 2 (do fronty
transcribe_2) - ID 2, priorita 1 (do fronty
transcribe_1)
- Konzument preferuje
transcribe_1→ vezme ID 2 (prio 1).checkUnfinishedJobsnajde starší ID 1 (id < 2) → ID 2 jde do WAITING, ačkoli má vyšší prioritu. - ID 1 (prio 2) se z
transcribe_2nedostane ke slovu, dokud konzument upřednostňuje prioritu 1, kam stále přitékají joby, které okamžitě skončí ve WAITING. _processWaitingJobsWAITING joby vrací zpět do jejich prioritní fronty → smyčka se opakuje.
Výsledkem je, že priorita je fakticky ignorována / invertována: starší job s horší prioritou (nižší ID) blokuje novější joby s vyšší prioritou.
I bez priorit existuje race, která platí již dnes: RabbitMQ doručí totéž ID jobu dvěma konzumentům, pokud konzumentu A vypadne TCP spojení s RabbitMQ (síťový hiccup, restart klienta) zatímco job zpracovává. RabbitMQ označí zprávu za nedoručenou a doručí ji konzumentu B - přitom konzument A stále běží a job také zpracovává.
Ochrana existuje: isReadyForProcess() (src/BackgroundQueue.php:243) odstaví konzumenta
B, pokud A stihl zapsat STATE_PROCESSING do DB. Ale mezi getEntity() (řádek 237) a
save() (řádek 284) je check-then-act okénko - pokud B získá job v tomto okamžiku, oba
projdou a tentýž job běží dvakrát.
Opravuje to podmíněný claim: UPDATE ... WHERE id = :id AND state IN (ready stavy) +
kontrola affectedRows == 1 (viz Cesta A, bod 4 níže). Tato oprava je nezávislá na
prioritách a smysluplná i sama o sobě.
Problém nelze vyřešit pouhou záměnou řazení id → (priorita, id).
Uvnitř serialGroup se řadí výhradně podle ID: job počká, pokud ve skupině existuje
jakýkoli nedokončený job s nižším ID (id < :id v findOldestUnfinishedJobIdsByGroup(),
src/BackgroundQueue.php:607-608).
Protože autoincrement ID jen roste, platí jednoduchá rovnice:
běžící (PROCESSING) job = ten, co začal nejdřív = má nejnižší ID ve skupině = je předchůdcem úplně všech ostatních.
Takže každý nově příchozí job má vždy vyšší ID než ten běžící, uvidí ho přes id < :id
jako svého předchůdce a počká ve STATE_WAITING. Vzájemné vyloučení tu vůbec není
explicitně naprogramované - vzniká jako vedlejší efekt řazení podle ID. Ordering
(pořadí) a exclusion (max. jeden běžící) jsou dnes jedna a táž podmínka.
Jakmile o pořadí začne rozhodovat (priorita, ID), ta rovnice přestane platit:
- Běží job s horší prioritou (vyšší číslo), který má ale nízké ID (přišel dřív).
- Pak přijde nový job s lepší prioritou (nižší číslo), nutně s vyšším ID (přišel později).
Nový job má jít před ten běžící (lepší priorita). Podmínka
(priority < :priority OR (priority = :priority AND id < :id)) ho neblokuje - běžící
job má horší prioritu, takže z pohledu pořadí není předchůdce. Nový job tedy nepočká a
rozjede se souběžně s běžícím → dva joby téže skupiny běží naráz → porušení
sériovosti.
Závěr: Z jedné podmínky se musí stát dvě oddělené věci:
- Ordering - "kdo jde na řadu" →
(priorita, ID). - Mutual exclusion - "nikdy dva naráz" → musí se vynutit zvlášť, protože už ji ordering "nenese s sebou".
To řeší obě navržené cesty: Cesta A přidává explicitní klauzuli na state = PROCESSING
(bod 2 níže) plus zámek na skupinu, Cesta B dělá exclusion strukturálně tím, že do
brokera pustí vždy jen jednu hlavu skupiny.
Uvnitř serialGroup přestaneme řadit podle ID a začneme řadit podle (priorita, ID).
Sériovost zajistíme explicitní kontrolou běžícího jobu + zámkem, aby se to nerozbilo
při souběhu konzumentů.
1) Výběr předchůdce dle (priorita, ID)
V findOldestUnfinishedJobIdsByGroup() (src/BackgroundQueue.php:594-620) nahradit
podmínku id < :id (řádek 607-608) za lexikografické porovnání a řadit/agregovat podle
(priority, id) místo jen id:
(priority < :priority OR (priority = :priority AND id < :id))Job pak čeká, jen pokud ve skupině existuje nedokončený job, který má jít před ním (vyšší priorita, nebo stejná priorita a nižší ID).
2) Klauzule na PROCESSING (vzájemné vyloučení)
Do téhož dotazu přidat: job počká vždy, když je ve skupině jakýkoli job ve stavu
PROCESSING, bez ohledu na prioritu:
... AND id <> :id AND (
state = PROCESSING
OR (priority < :priority OR (priority = :priority AND id < :id))
)Bez této klauzule by později vložený job s vyšší prioritou nepoznal běžící nižší-prioritní job jako překážku a naběhl by souběžně. Tato část je povinná - bez ní oprava zanáší regresi sériovosti, která dnes neexistuje.
Příklad kolize bez klauzule na PROCESSING (nevyžaduje souběh - stane se vždy):
| Čas | Konzument A | Konzument B | DB stav |
|---|---|---|---|
| T1 | zpracovává ID 1 (prio 2), callback běží | - | ID 1 = PROCESSING |
| T2 | - | checkUnfinishedJobs(ID 2, prio 1): hledá job kde (priority < 1 OR (priority=1 AND id < 2)) → ID 1 má prioritu 2, nesplňuje ani jednu větev → žádný předchůdce |
ID 1 = PROCESSING |
| T3 | - | zapisuje PROCESSING pro ID 2, spouští callback | ID 1 = PROCESSING, ID 2 = PROCESSING |
Oba joby skupiny transcribe běží naráz. S klauzulí OR state = PROCESSING B v T2 ID 1
najde a správně jde do WAITING:
| Čas | Konzument A | Konzument B | DB stav |
|---|---|---|---|
| T1 | zpracovává ID 1 (prio 2), callback běží | - | ID 1 = PROCESSING |
| T2 | - | checkUnfinishedJobs(ID 2, prio 1): state = PROCESSING → ID 1 je blocker → jde do WAITING |
ID 1 = PROCESSING |
| T3 | dokončí, zapisuje FINISHED | - | ID 1 = FINISHED, ID 2 = WAITING |
| T4 | - | _processWaitingJobs přepne ID 2 na READY, publikuje do brokera |
ID 2 = READY |
| T5 | - | konzument převezme ID 2, spouští callback | ID 2 = PROCESSING |
3) Zámek na skupinu kolem kritické sekce
processJob() (src/BackgroundQueue.php:232-284) je dnes čistý check-then-act:
nejdřív checkUnfinishedJobs() (řádek 254), pak až o kus dál setState(PROCESSING)
save()(272-284), mezi tím není zámek. Dva konzumenti tak mohou projít kontrolou současně.
Řešení: kolem [checkUnfinishedJobs + zápis PROCESSING] dát pojmenovaný zámek na
skupinu - MySQL GET_LOCK('bgq:'+crc32(serialGroup)) na začátku, RELEASE_LOCK po zápisu.
Drží se jen pár mikrosekund (ne po dobu zpracování callbacku). processJob má vlastní
createConnection() (řádek 235), takže connection-scoped zámek sedí. Serializuje jen
konzumenty téže skupiny, různé skupiny se neperou.
Abstrakce pro MySQL i PostgreSQL
Advisory locky jsou v obou DB různé, ale logika je stejná. Řešení: dvě privátní metody
na BackgroundQueue, které detekují platformu přes Doctrine DBAL a zavolají správný SQL.
Detekce platformy:
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
$platform = $this->connection->getDatabasePlatform();acquireGroupLock vrací bool (true = získáno, false = v limitu nezískáno) - nevyhazuje
výjimku. Když se zámek nezíská, processJob job nezahodí: vrátí ho do brokera
(publishToBroker) a zpracuje se příště (v cron módu je to no-op a job vezme příští běh
process()). Tím jediná kolize zámku neshodí výjimkou celý běh process() / konzumenta.
MySQL (GET_LOCK / RELEASE_LOCK):
private function acquireGroupLock(string $serialGroup): bool
{
// Název zámku musí být max 64 znaků (jinak GET_LOCK vrátí NULL); serial_group je
// VARCHAR(255), proto skupinu hashujeme přes crc32 na stabilní krátký název.
$acquired = $this->connection->fetchOne(
"SELECT GET_LOCK(?, ?)",
['bgq:' . crc32($serialGroup), self::GROUP_LOCK_TIMEOUT]
);
return (int) $acquired === 1; // 1 = získáno, 0 = timeout, NULL = chyba
}
private function releaseGroupLock(string $serialGroup): void
{
$this->connection->executeStatement(
"SELECT RELEASE_LOCK(?)",
['bgq:' . crc32($serialGroup)]
);
}PostgreSQL (pg_advisory_lock / pg_advisory_unlock):
private function acquireGroupLock(string $serialGroup): bool
{
// pg_advisory_lock(bigint): klíč = (namespace << 32) | crc32(skupina). Dvouargumentová
// varianta bere int4 a crc32 (až 2^32-1) by ji přetekla ("integer out of range"), proto
// jednoargumentová bigint varianta se složeným 64bit klíčem. Timeout přes lock_timeout.
$this->connection->executeStatement("SET lock_timeout = '" . (self::GROUP_LOCK_TIMEOUT * 1000) . "'");
try {
$this->connection->executeStatement(
"SELECT pg_advisory_lock(?)",
[(self::PG_LOCK_NAMESPACE << 32) | crc32($serialGroup)]
);
} catch (\Doctrine\DBAL\Exception $e) {
return false; // vypršení lock_timeout - job zpracujeme příště
}
return true;
}
private function releaseGroupLock(string $serialGroup): void
{
$this->connection->executeStatement(
"SELECT pg_advisory_unlock(?)",
[(self::PG_LOCK_NAMESPACE << 32) | crc32($serialGroup)]
);
}Konstanta PG_LOCK_NAMESPACE (0x42475100 = ASCII BGQ\0) tvoří horní 32 bitů klíče a
odděluje zámky background-queue od ostatních advisory locků v aplikaci; dolních 32 bitů je
crc32($serialGroup). Složený klíč se vejde do signed bigint (namespace << 32 ≈ 4,8e18 <
bigint max ~9,2e18). MySQL i PostgreSQL tak mapují skupinu na crc32, takže kolize nastane
jen při kolizi crc32 dvou různých názvů skupin - viz odhad pravděpodobnosti níže. Kolize
neohrozí korektnost, jen by dvě nezávislé skupiny zbytečně serializovala.
Pravděpodobnost kolize crc32 (birthday problem, prostor 2^32). Pro n současně živých
různých názvů skupin je šance, že aspoň dvě sdílí crc32, ≈ 1 - e^(-n²/2^33):
různých skupin n |
P(aspoň jedna kolize) |
|---|---|
| 100 | ~0,0001 % (1 z ~860 tis.) |
| 1 000 | ~0,012 % (1 z ~8 600) |
| 10 000 | ~1,15 % |
| 50 000 | ~25 % |
| 77 000 | ~50 % |
V praxi je počet současně živých serialGroup řádově malý (jednotky až stovky), takže je
kolize zanedbatelná. Kdyby se počet skupin blížil desítkám tisíc, šlo by přejít na 64bit
hash (např. crc32b + crc32 do dvou půlek, nebo substr(md5(...), 0, 16) na hex → int).
processJob volá acquireGroupLock / releaseGroupLock beze znalosti DB platformy
a switch logika žije pouze v těchto dvou metodách.
Příklad kolize bez zámku (A má horší prioritu, B přijde dřív než A zapsal PROCESSING):
| Čas | Konzument A | Konzument B | DB stav |
|---|---|---|---|
| T1 | checkUnfinishedJobs(ID 1, prio 2): žádný předchůdce → projde |
- | ID 1 = READY |
| T2 | (ještě nepsal PROCESSING) | checkUnfinishedJobs(ID 2, prio 1): ID 1 má prio 2 (nesplňuje priority < 1), prio 2 ≠ 1 (nesplňuje ordering), stav = READY (ne PROCESSING) → žádný předchůdce → projde |
ID 1 = READY |
| T3 | zapisuje PROCESSING pro ID 1 | (již po checku) | ID 1 = PROCESSING |
| T4 | - | zapisuje PROCESSING pro ID 2 | ID 1 = PROCESSING, ID 2 = PROCESSING |
Se zámkem B v T2 čeká, než A dokončí celou sekvenci [check → zápis PROCESSING]. Až pak provede svůj check - a tehdy ID 1 už je PROCESSING → B správně jde do WAITING:
| Čas | Konzument A | Konzument B | DB stav |
|---|---|---|---|
| T1 | GET_LOCK('bgq:transcribe') → získá |
- | ID 1 = READY |
| T2 | checkUnfinishedJobs(ID 1, prio 2) → projde |
GET_LOCK('bgq:transcribe') → čeká |
ID 1 = READY |
| T3 | zapisuje PROCESSING pro ID 1 | (stále čeká) | ID 1 = PROCESSING |
| T4 | RELEASE_LOCK |
- | ID 1 = PROCESSING |
| T5 | - | získá zámek | ID 1 = PROCESSING |
| T6 | - | checkUnfinishedJobs(ID 2, prio 1): state = PROCESSING → ID 1 je blocker → jde do WAITING |
ID 1 = PROCESSING |
| T7 | - | RELEASE_LOCK |
ID 1 = PROCESSING, ID 2 = WAITING |
4) Podmíněný claim (levná pojistka)
Zámek (bod 3) chrání kritickou sekci [checkUnfinishedJobs → zápis PROCESSING] uvnitř
jednoho processJob() volání. Ale existuje scénář, který ho obejde: tentýž job ID
doručený dvěma konzumentům (RabbitMQ redelivery při výpadku spojení konzumenta).
Dnes save() (src/BackgroundQueue.php:643) dělá:
UPDATE background_job SET state = PROCESSING, ... WHERE id = :idBez podmínky na stav. Takže pokud konzument A drží job a zpracovává ho, ale jeho TCP
spojení s RabbitMQ vypadne - RabbitMQ doručí totéž ID konzumentu B. B projde
isReadyForProcess() (job je stále PROCESSING → vrátí false) a zastaví se. Ale
pouze pokud A stihl zapsat PROCESSING. Pokud B dostane zprávu dřív, než A zapsal:
| Čas | Konzument A | Konzument B | DB stav |
|---|---|---|---|
| T1 | isReadyForProcess() → READY → OK |
- | ID 1 = READY |
| T2 | (ještě nepsal PROCESSING) | dostane redelivery ID 1, isReadyForProcess() → READY → OK |
ID 1 = READY |
| T3 | UPDATE SET state=PROCESSING WHERE id=1 → zapíše |
- | ID 1 = PROCESSING |
| T4 | spouští callback | UPDATE SET state=PROCESSING WHERE id=1 → také zapíše (bez podmínky na stav!) |
ID 1 = PROCESSING |
| T5 | callback běží | callback také běží | oba zpracovávají ID 1 |
Oprava: doplnit podmínku na stav a kontrolovat affectedRows:
UPDATE background_job SET state = PROCESSING, ... WHERE id = :id AND state IN (1, 4, -1)
-- STATE_READY=1, STATE_TEMPORARILY_FAILED=4, STATE_BACK_TO_BROKER=-1$affected = $this->connection->update(
$this->config['tableName'],
$entity->getDatabaseValues(),
['id' => $entity->getId(), 'state' => $readyStates] // WHERE id = ? AND state IN (?)
);
if ($affected === 0) {
return; // job mezitím sebral nebo změnil stav jiný konzument
}Pokud affectedRows === 0, job mezitím přešel do jiného stavu (jiný konzument ho
zpracovává nebo dokončil) → aktuální konzument tiše vrátí return. Chrání i mimo
serialGroup - platí pro jakýkoli job doručený vícekrát.
Jde o laciné pojistné patro navíc: UPDATE je atomický na úrovni DB, takže i při dokonalém souběhu T3 a T4 může podmíněný UPDATE uspět jen jednomu z konzumentů.
Náročnost je třeba posoudit zvlášť pro dva dotazy, které řazení dle (priorita, ID)
přepracovává.
a) getPreviousUnfinishedJobId (hot path - volá se pro každý zpracovávaný job).
Přidané podmínky (priority, PROCESSING) jsou jen residual filtry na řádcích, které už
vybral index state (updateSchema() zakládá indexy na id, identifier, state).
Dotaz je navíc omezen na jednu serial_group a stačí mu existence prvního předchůdce, proto
má LIMIT 1 a vrací max jeden řádek. Žádný nový index nepotřebuje.
b) findOldestUnfinishedJobIdsByGroup (volá processWaitingJobs, v broker módu zhruba
jednou za waitingJobExpiration, tj. ~1×/s). Tady "hlava skupiny" už není prosté
MIN(id) jako ve staré implementaci (řazení jen dle ID), ale "nejmenší ID z nejvyšší
přítomné priority ve skupině". To vyžaduje agregaci přes (priorita, ID) a právě tady
vznikl problém s náročností. Níže zvažované varianty řeší jen tenhle dotaz.
-
Stará implementace (před prioritou):
SELECT MIN(id) ... GROUP BY serial_group. Jeden grupovaný průchod, ~O(W) (W = počet nedokončených jobů ve frontě). Funguje ale jen pro řazení čistě podle ID, prioritu vyjádřit neumí. -
Mezikrok (zavržený) - dedup v PHP: vytáhnout všechny řádky seřazené dle
(priorita, ID)a v PHP nechat první na skupinu. Jednoduché, ale do PHP táhne všechny WAITING joby napříč skupinami - horší než stará implementace. Zamítnuto. -
Korelovaný poddotaz:
... AND priority = (SELECT MIN(sub.priority) ... WHERE sub.serial_group = ...). Agregace zpět v SQL (jeden řádek na skupinu), aleEXPLAINukázalDEPENDENT SUBQUERY- poddotaz se vyhodnocuje pro každý vnější řádek, tedy O(W²). Bez kompozitního indexu na velkém objemu extrémně pomalé (viz benchmark). -
Varianta A - kompozitní index
(serial_group, state, priority, id): korelovaný poddotaz se s ním změní z plného skenu nareflookup, tedy zpět k ~lineárnímu. Nevýhody:stateje v indexu, takže každá z mnoha změn stavu jobu navíc udržuje tenhle (široký, kvůliserial_group VARCHAR(255)má klíč ~1 KB) index = write-amplifikace; index pokrývá i řádky bez serialGroup (serial_group IS NULL) a pomáhá jen dvěma dotazům. -
Varianta B (zvolená) - JOIN s odvozenou tabulkou: nejnižší priorita každé skupiny se spočítá jednou v odvozené tabulce a připojí se zpět:
SELECT MIN(t.id) AS id FROM background_job t INNER JOIN ( SELECT serial_group, MIN(priority) AS min_priority FROM background_job WHERE queue LIKE ... AND state IN (...) GROUP BY serial_group ) head ON head.serial_group = t.serial_group AND t.priority = head.min_priority WHERE t.queue LIKE ... AND t.state IN (...) GROUP BY t.serial_group ORDER BY id ASC
~O(W) nezávisle na indexu - žádný nový index, žádná daň na zápisech.
| řádků (W) | skupin | korelovaný poddotaz | odvozená tabulka (B) | zrychlení |
|---|---|---|---|---|
| 1 000 | 50 | 289 ms | 1,8 ms | ~160× |
| 2 000 | 100 | 1 124 ms | 3,3 ms | ~340× |
| 4 000 | 200 | 4 480 ms | 8,8 ms | ~510× |
| 8 000 | 400 | 17 893 ms | 16,5 ms | ~1080× |
| 20 000 | 1 000 | ~112 000 ms* | 100 ms | ~1100× |
* korelovaný na 20k nedoměřen do konce (běžel by ~2 min/běh); hodnota je extrapolace z čisté O(W²) řady (každé zdvojnásobení W ≈ 4× čas). Výstupy obou variant jsou bitově shodné (ověřeno md5 i regresním testem, viz Test 7 níže).
Závěr: Korelovaný poddotaz roste kvadraticky a při ~1×/s cadenci v broker módu je na 8k+
nedokončených jobech neudržitelný (18 s/běh). Varianta B roste prakticky lineárně a zůstává
v jednotkách až ~100 ms i na 20k, bez nutnosti indexu (a tedy bez write-amplifikace na časté
změny stavu, kterou by přinesla varianta A). Proto je v
findOldestUnfinishedJobIdsByGroup zvolena varianta B; hot-path getPreviousUnfinishedJobId
řeší množství přes LIMIT 1. Kompozitní index (A) tím není potřeba - pokud by se v budoucnu
hodil pro jiné dotazy, je to nezávislé rozhodnutí.
- Plus: malý, lokální zásah - jádro je úprava jednoho dotazu.
- Plus: přenositelnost MySQL/PostgreSQL řeší abstrakce
acquireGroupLock/releaseGroupLocks detekcí platformy přes Doctrine DBAL (viz výše). - Minus: závislost na DB advisory locku - přidává další synchronizační primitiv.
- Vzájemné vyloučení zůstává vynucováno "líně" při konzumaci.
Joby bez serialGroup zůstávají beze změny: checkUnfinishedJobs()
(src/BackgroundQueue.php:716-718) pro ně hned dělá return true, takže nikdy nejdou do
WAITING ani nehledají předchůdce. Body 1-3 (řazení, klauzule na PROCESSING, zámek na
skupinu) se jich tedy netýkají - zámek se navíc bere jen je-li serialGroup nastavena.
Jediný dotyk je bod 4 (podmíněný claim v save()), který platí pro všechny joby jako
atomické převzetí ke zpracování; publikování zůstává netknuté.
Všechny testy patří do jediné existující suite Integration
(tests/Integration/BackgroundQueueTest.php). Drží se zavedených patternů: data providery
parametrizované přes producer/waitingQueue, reflexe na privátní metody
(new \ReflectionClass(BackgroundQueue::class), jako u testCheckUnfinishedJobs,
tests/Integration/BackgroundQueueTest.php:237-257) a helper fetchAllJobs() pro čtení
stavu jobů z DB.
Bez následujících úprav testy níže nelze napsat:
-
getBackgroundQueue()(tests/Integration/BackgroundQueueTest.php:278) - přidat parametrpriorities(default[1]jako dnes) a propsat ho do konfigurace, ať lze publikovat na různé priority. -
Mailerhelper (tests/Support/Helper/Mailer.php) - přidat callback, který si parametr (značku jobu) připíše do sdíleného statického pole, např.:public static array $processOrder = []; public function processRecording(string $mark): void { self::$processOrder[] = $mark; }
Pole se v
_before()/clear()musí vynulovat, aby testy byly nezávislé. -
Přímý přístup k DB connection v testu (jako
clear(),tests/Integration/BackgroundQueueTest.php:317) pro ruční nastavení stavu/priority řádku - tím se deterministicky nasimulují stavy, které by jinak vznikly jen souběhem (běžícíPROCESSINGjob, job sebraný jiným konzumentem). -
Producerhelper (tests/Support/Helper/Producer.php) - pouze pro Test 6.consume()(řádek 35) igetMessageCount()(řádek 49) dnes natvrdo pracují s frontou'general'a neumí prioritní fronty<queue>_<priority>(general_1,general_2). Pro broker-mode test je nutné je rozšířit tak, aby konzumovaly z prioritních front ve správném pořadí (nižší číslo dřív), jak to dělá reálnýConsumer(src/Broker/PhpAmqpLib/Consumer.php:24-37).
Každý test je psán tak, aby před opravou padal (červená) a po opravě prošel (zelená),
kromě testu bez serialGroup a testu zámkového primitiva, které ověřují, že se nic
nerozbilo / že nová abstrakce funguje.
Ověřuje, že se opravy bodů 1-3 jobů bez serialGroup vůbec nedotknou.
- Publikovat několik jobů bez
serialGroup, s proloženými prioritami (např. prio 2, prio 1, prio 2). - Reflexí zavolat
checkUnfinishedJobs()(src/BackgroundQueue.php:714) na každý z nich. - Aser: vždy vrací
true, žádný job neskončí veSTATE_WAITING(řádek 716-718 děláreturn truehned na vstupu). - Doplnit běh
process()(cron mód, bez produceru) a ověřit, že se všechny zpracují (STATE_FINISHED) - pořadí mezi nimi se neřeší, sériovost se neuplatňuje.
Jádro opravy a hlavní reprodukce. Lze pojmout dvěma vrstvami, doporučuji obě:
2a) Jednotka nad getPreviousUnfinishedJobId() (src/BackgroundQueue.php:582,
přes reflexi). Do jedné serialGroup vložit joby a ručně jim nastavit priority, pak
ověřit, koho dotaz označí za předchůdce:
- job A: ID 1, prio 2; job B: ID 2, prio 1 (vyšší priorita, přišel později).
getPreviousUnfinishedJobId(B)→null(B nemá jít po nikom: nikdo nemá vyšší prioritu ani stejnou prioritu s nižším ID).getPreviousUnfinishedJobId(A)→ ID 2 (B má jít před A).- Před opravou je výsledek opačný (řadí se jen podle
id < :id, řádek 607-608), takže test červeně reprodukuje bug.
2b) End-to-end pořadí přes process() v cron módu (bez produceru). Cron mód je pro
tento test čistě deterministický: process() (src/BackgroundQueue.php:184) projde
zpracovatelné joby, hlava skupiny se zpracuje a zbytek skupiny jde do STATE_WAITING;
opakovaným voláním process() (WAITING je v cron módu zpracovatelný stav) se postupně
odbaví celá skupina.
- Konfigurace
priorities[1, 2], callbackprocessRecording, jako značku použít identifikátor jobu (a,b, ...). - Do jedné
serialGrouppublikovat kanonický příklad ze zadání:a1 b1 c2 d1 e1 f2 g1(písmeno = pořadí vložení, číslo = priorita). - Volat
process()v cyklu (např. dokud existují nedokončené joby, s pojistným stropem iterací). - Aser:
Mailer::$processOrder === ['a','b','d','e','g','c','f'](priorita ASC, při shodě ID ASC). - Aser sériovosti: v žádném okamžiku neběží dva joby skupiny naráz - v cron módu to
plyne z toho, že
process()zpracovává joby inline jeden po druhém; lze ověřit nepřímo tím, že doSTATE_PROCESSINGse nikdy nedostanou dva řádky téže skupiny současně (kontrola uvnitřprocessRecordingpřesfetchAllJobs).
Ověřuje scénář z tabulky v bodě 2 (běžící nižší-prioritní job musí zablokovat novější job s vyšší prioritou). Deterministicky, bez souběhu, ruční manipulací stavu v DB:
- Do jedné
serialGrouppublikovat job A (ID 1, prio 2) a job B (ID 2, prio 1). - Ručně (raw connection) nastavit A na
STATE_PROCESSING(simulace běžícího callbacku). - Reflexí zavolat
checkUnfinishedJobs(B)(src/BackgroundQueue.php:714). - Aser: vrací
falsea B přejde doSTATE_WAITING, ačkoli A není ordering-předchůdce (A má horší prioritu). Blok vzniká jen díky klauzuliOR state = PROCESSING. - Bez této klauzule (a po pouhé opravě bodu 1) by
checkUnfinishedJobs(B)vrátiltruea vznikla by regrese sériovosti, kterou tento test chytí. - Doplňkový případ: A ve
STATE_FINISHED→checkUnfinishedJobs(B)vracítrue(dokončený job není překážka).
Plný race check-then-act (dva konzumenti projdou kontrolou současně) se v jednovláknovém PHPUnit procesu deterministicky nereprodukuje - tady to v testech přiznáváme a ověřujeme jen to, že zámkové primitivum funguje. Reálné vzájemné vyloučení za souběhu pak fakticky stojí na tomto primitivu plus na podmíněném claimu (Test 5).
Test samotného primitiva (acquireGroupLock / releaseGroupLock, přes reflexi), nezávisle
na platformě (poběží proti MySQL z CI):
- Vytvořit dvě samostatné DBAL connection ze stejného DSN (jako
processJobpřescreateConnection(),src/BackgroundQueue.php:235). - Na connection č. 1
acquireGroupLock('skupina')→ vrátítrue(uspěje). - Na connection č. 2
acquireGroupLock('skupina')s krátkým timeoutem → vrátífalse(u MySQLGET_LOCK(..., timeout)vrátí 0; metoda nevyhazuje výjimku, jen vrátífalse). - Na connection č. 1
releaseGroupLock('skupina'). - Na connection č. 2
acquireGroupLock('skupina')→ nyní vrátítrue(uspěje). - Druhý případ: dvě různé skupiny se navzájem neblokují (obě connection získají zámek každá pro svou skupinu) - ověří, že serializujeme jen v rámci jedné skupiny.
Ověřuje atomické převzetí jobu (RabbitMQ redelivery doručí totéž ID dvakrát). Race mezi
dvěma UPDATE se nasimuluje deterministicky tím, že DB řádek přepneme „pod rukama" do
ne-ready stavu, zatímco v paměti držíme starou (READY) entitu:
- Publikovat job bez
serialGroups callbackemprocessRecording. - Načíst entitu (
fetchAllJobs) - v paměti jeSTATE_READY. - Raw connection přepnout řádek v DB na
STATE_FINISHED(případněSTATE_PROCESSING) - simulace „job mezitím sebral jiný konzument". - Zavolat
processJob(id)(src/BackgroundQueue.php:232). - Aser: callback se nespustil (
Mailer::$processOrderje prázdné) a stav v DB se zSTATE_FINISHEDnezměnil - podmíněnýUPDATE ... WHERE id = :id AND state IN (ready)vrátilaffectedRows === 0aprocessJobtiše skončil. - Negativní kontrola (job je READY): běžný job projde,
affectedRows === 1, callback se spustí, stav jeSTATE_FINISHED- ověří, že claim nezablokoval normální cestu. - Tento test platí i mimo
serialGroup(claim je společný pro všechny joby, viz kapitola „Joby bez serialGroup").
Jediný „velký" test. Ostatní testy jsou izolované a deterministické, ale pozorovaný problém
se projevuje v broker módu (kapitola „Důsledek", docs/priority-serialgroup.md:41-46),
ne v cron módu. Tenhle test jako jediný projede celou produkční smyčku
process() → publikace do prioritní fronty → konzumace → checkUnfinishedJobs → WAITING →
_processWaitingJobs → zpět na READY → znovupublikace. Tím zároveň pokryje interakci s
processWaitingJobs() (src/BackgroundQueue.php:768), která sdílí
findOldestUnfinishedJobIdsByGroup() (řádek 594) - žádný z testů 1-5 ji neprověří.
- Konfigurace:
producerzapnutý,priorities[1, 2], callbackprocessRecording, jako značku použít identifikátor jobu. - Do jedné
serialGrouppublikovat kanonický příklada1 b1 c2 d1 e1 f2 g1. - V cyklu (s pojistným stropem iterací) opakovat:
process()- v broker módu přepne způsobilé řádky na READY a publikuje jejich ID do prioritních front; zároveň zaregistruje interní_processWaitingJobs(src/BackgroundQueue.php:192-194).- konzumovat dostupné zprávy přes rozšířený
Producerhelper (prioritní fronty v pořadí priorit) a na každé ID zavolatprocessJob()- to je cesta, kterou v provozu jedeConsumer::consume(). - nechat proběhnout
_processWaitingJobs(zkonzumovat a zpracovat i jeho job), aby se nejstarší WAITING hlava skupiny vrátila na READY.
- cyklus končí, až nejsou žádné nedokončené joby.
- Aser pořadí:
Mailer::$processOrder === ['a','b','d','e','g','c','f'](priorita ASC, při shodě ID ASC) - stejný cíl jako 2b, ale přes reálnou broker cestu. - Aser sériovosti: v žádném okamžiku nejsou dva joby téže skupiny ve
STATE_PROCESSINGsoučasně (kontrola uvnitřprocessRecordingpřesfetchAllJobs). - Před opravou červeně reprodukuje invertovanou prioritu z kapitoly „Důsledek", po opravě zezelená.
Pozn.: test vyžaduje běžící RabbitMQ (běží v CI kontejneru background-queue_rabbitmq) a
je z celé sady nejdražší a nejcitlivější na časování; proto zůstává jediný svého druhu -
konkrétní větve levněji a stabilněji pokrývají jednotkové testy 2a/3/5.
Doplňkový, čistě deterministický test pro findOldestUnfinishedJobIdsByGroup() a navazující
processWaitingJobs(). Happy-path Test 6 tuhle větev s daty neprověří (po opravě řazení se
v něm do WAITING nic nedostane, takže processWaitingJobs jede naprázdno), proto ji testujeme
cíleně - včetně skupiny s "dírou" v prioritách (žádný job na nejvyšší prioritě). Zároveň
regresně hlídá implementaci výběru hlavy přes odvozenou tabulku (viz kapitola „Náročnost,
výběr hlavy skupiny a benchmark").
- Konfigurace
priorities[1, 2, 3], cron mód (bez produceru), callbackprocessRecording. - Skupina
gA: joby s prioritami 2, 1, 1 → hlava jea2(nejmenší ID v nejvyšší přítomné prioritě 1). - Skupina
gBs dírou: joby jen s prioritami 3 a 2 (žádná 1) → hlava jeb2(priorita 2), neb1. Ověřuje, že "nejvyšší priorita" znamená nejmenší přítomnou prioritu, ne pevnou hodnotu. - Všechny joby ručně přepnout na
STATE_WAITING, pak reflexí:findOldestUnfinishedJobIdsByGroup(STATE_WAITING)→ vrátí hlavy[a2, b2].processWaitingJobs()→ přepne právě tyhle hlavy naREADY, zbytek zůstaneWAITING.
- Před starou logikou (
MIN(id)bez priority) by hlavy vyšlya1/b1→ test červeně chytí regresi.
| Test | Pokrývá | Charakter |
|---|---|---|
| 1 | kapitola „Joby bez serialGroup" | regrese, zelený před i po |
| 2a/2b | bod 1 (řazení dle priority, ID) | reprodukce, červený → zelený |
| 3 | bod 2 (klauzule PROCESSING) | reprodukce regrese sériovosti |
| 4 | bod 3 (zámek na skupinu) | jen primitivum (race nereprodukovatelný single-process) |
| 5 | bod 4 (podmíněný claim) | reprodukce redelivery race |
| 6 | celá broker smyčka + _processWaitingJobs |
end-to-end, vyžaduje RabbitMQ, nejdražší |
| 7 | výběr hlavy skupiny dle (priorita, ID), findOldestUnfinishedJobIdsByGroup/processWaitingJobs |
deterministická reprodukce, červený → zelený |