Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,17 @@ Joby sdílející `serialGroup` běží striktně v pořadí podle ID. `checkUnf

### Brokerová abstrakce (src/Broker/)

`Producer` a `Consumer` jsou rozhraní - přines si libovolný broker. K dispozici je hotová implementace `PhpAmqpLib` (volitelná závislost; viz README pro doporučené omezení `conflict` pinující `php-amqplib` na `^3.0`). Priorita je modelována jako **samostatné fronty** pojmenované `<queue>_<priority>`; `QUEUE_TOP_PRIORITY = 0` je vyhrazena pro řídicí (DIE) zprávy, takže ji konzumeři kontrolují jako první. `publishDie()` + tělo `DIE` je způsob, jak `reload-consumers` elegantně restartuje běžící konzumery.
`Producer` a `Consumer` jsou rozhraní - přines si libovolný broker. K dispozici je hotová implementace `PhpAmqpLib` (volitelná závislost; viz README pro doporučené omezení `conflict` pinující `php-amqplib` na `^3.0`). Priorita je modelována jako **samostatné fronty** pojmenované `<queue>_<priority>`; `QUEUE_TOP_PRIORITY = 0` je vyhrazena pro řídicí (DIE) zprávy, takže ji konzumeři kontrolují jako první. `publishDie()` + tělo `DIE` je způsob, jak `reload-consumers` elegantně restartuje běžící konzumery. Konzumer spuštěný s **labelem** (`consume -l <label>`) dostane vlastní top-priority frontu `<queue>_0_<label>` (`Manager::getTopPriorityName()` / `includeTopPriority()`); díky tomu lze přes `reload-consumers -l label1,label2` restartovat cíleně právě jeho a DIE zprávu nesní jiný konzumer. Label nesmí obsahovat oddělovač `_` (`QUEUE_NAME_PARTS_DELIMITER`).

Vedle `DIE` existuje druhá řídicí zpráva `SHUTDOWN` (`publishShutdown()`, posílá ji `shutdown-consumers` se stejným cílením přes label). Liší se jen exit kódem po dojetí rozdělaného jobu: `DIE` ukončí konzumera přes `die()` (exit 0, supervisor ho typicky restartuje), `SHUTDOWN` přes `exit(Producer::NICE_SHUTDOWN_EXIT_CODE)` (= 100). Ten kód patří do supervisor `exitcodes` při `autorestart=unexpected`, takže konzumer už znovu nenaběhne - "nice shutdown" pro řízené zastavení (např. před restartem serveru). Detaily a vzor konfigurace supervisoru viz README sekce 6.

### Konzolové příkazy (src/Console/)

Všechny příkazy kromě `ConsumeCommand` rozšiřují lokální abstraktní `Command`, který obaluje běh do `ADT\CommandLock` (`FileSystemStorage` pod `tempDir`), takže nemohou běžet souběžně samy se sebou.

- `background-queue:process` - vstupní bod pro cron (spouštět každou minutu).
- `background-queue:consume [queue] -j <jobs> -p <priorities>` - dlouhoběžící brokerový konzumer; `-p` přijímá rozsahy jako `20-40`, `25-`, `-20`.
- `background-queue:clear-finished [days]`, `background-queue:reload-consumers <number> [queue]`, `background-queue:update-schema`.
- `background-queue:consume [queue] -j <jobs> -p <priorities> -l <label>` - dlouhoběžící brokerový konzumer; `-p` přijímá rozsahy jako `20-40`, `25-`, `-20`; `-l` je volitelný label pro cílený restart.
- `background-queue:clear-finished [days]`, `background-queue:reload-consumers <number> [queue] [-l label1,label2]`, `background-queue:shutdown-consumers <number> [queue] [-l label1,label2]` (řízené zastavení = reload, ale s exit kódem `NICE_SHUTDOWN_EXIT_CODE`, který supervisor nerestartuje), `background-queue:update-schema`.

### Hromadný insert

Expand Down
32 changes: 31 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ Ve všech ostatních případech se záznam uloží jako úspěšně dokončený

`background-queue:clear-finished 14` Smaže všechny úspěšně zpracované záznamy starší 14 dní.

`background-queue:reload-consumers QUEUE NUMBER` Reloadne NUMBER consumerů pro danou QUEUE.
`background-queue:reload-consumers NUMBER [QUEUE] [-l LABEL1,LABEL2,...]` Pošle NUMBER restartovacích (DIE) zpráv. Bez `-l` do sdílené DIE fronty dané QUEUE, s `-l` cíleně do DIE fronty každého uvedeného labelu (viz `background-queue:consume -l`).

`background-queue:shutdown-consumers NUMBER [QUEUE] [-l LABEL1,LABEL2,...]` Funguje stejně jako `reload-consumers` (stejné cílení přes QUEUE a `-l`), ale místo restartu konzumery **řízeně zastaví** - dojedou rozdělaný job, další si nevezmou a ukončí se exit kódem určeným k tomu, aby je supervisor už znovu nenastartoval (viz sekce [Řízené zastavení konzumerů](#6-řízené-zastavení-konzumerů-nice-shutdown)).

`background-queue:update-schema` Aktualizuje databázové schéma, pokud je potřeba.

Expand Down Expand Up @@ -294,6 +296,34 @@ Můžeme tedy jednoho konzumera vyhradit například na rozesílání registrač
Tím zajistíme, že rychlé odeslání registračního emailu nebude čekat na dlouho trvající úlohy, protože je odbaví první konzumer.
Ale pokud by se vyskytlo více požadavků na zasílání emailů, po nějaké době je začnou odbavovat všichni konzumeři.

Příkazu `background-queue:consume` máme dále možnost nastavit parametrem `-l` (label) jmenovku konzumera. Konzumer s labelem dostane vlastní DIE frontu (`<queue>_0_<label>`), takže ho lze při `background-queue:reload-consumers` restartovat cíleně pomocí `-l label1,label2,...`, aniž by DIE zprávy "snědl" jiný konzumer.
Aby šel každý konzumer restartovat samostatně, dej každému unikátní label - konzumeři se stejným labelem totiž jednu DIE frontu sdílejí. Bez labelu zůstává chování jako dřív: všichni konzumeři sdílejí jednu DIE frontu.

### 6 Řízené zastavení konzumerů (nice shutdown)

Konzumery typicky spouští a hlídá [supervisor](http://supervisord.org/) - když proces skončí, podle konfigurace ho znovu nastartuje. Toho využívá `background-queue:reload-consumers`: pošle DIE zprávu, konzumer se po dojetí rozdělaného jobu ukončí (exit kód `0`) a supervisor ho nahodí znovu. Tím se konzumeři "obmění" (např. kvůli nasazení nové verze kódu).

Někdy ale chceme konzumery zastavit a **nenechat je znovu nastartovat** - třeba před restartem serveru nebo údržbou. K tomu slouží `background-queue:shutdown-consumers`. Funguje úplně stejně jako `reload-consumers` (stejné cílení přes QUEUE a `-l`), jen konzumer po dojetí rozdělaného jobu skončí dohodnutým **exit kódem `100`** (`ADT\BackgroundQueue\Broker\PhpAmqpLib\Producer::NICE_SHUTDOWN_EXIT_CODE`). V obou případech konzumer nejprve dokončí právě zpracovávaný job a teprve pak se ukončí - žádný job se neztratí ani nepřeruší.

Aby supervisor uměl odlišit "restartuj" (reload) od "nech být" (shutdown), nastav v konfiguraci hlídaného programu `exitcodes` na shutdown exit kód a `autorestart=unexpected`:

```ini
[program:bq-consumer]
command=php bin/console background-queue:consume
autorestart=unexpected
exitcodes=100
```

Sémantika `autorestart=unexpected` (mimochodem výchozí hodnota): supervisor restartuje proces jen tehdy, když skončí exit kódem, který **není** v `exitcodes`. Pak platí:

| Exit kód | Situace | Je v `exitcodes=100`? | Chování supervisoru |
|---|---|---|---|
| `0` | reload (DIE) i běžný konec | ne | "unexpected" -> **restartuje** |
| `100` | nice shutdown | ano (expected) | **nenastartuje** |
| jiný | pád procesu | ne | "unexpected" -> restartuje |

Výchozí `exitcodes` je `0`, proto je nutné ho přepsat na `100` - jinak by byl naopak restartován běžný konec a nice shutdown by se choval jako pád. Pozor také, že `autorestart` se uplatní až pro proces, který úspěšně naběhl (stav `RUNNING`); rozběh řídí `startsecs`/`startretries`.

Dále máme možnost prioritu nastavenou pro callback přetížit při vkládání záznamu v metodě `publish`. Například víme, že se jedná o rozesílání newsletterů.
Tedy se jedná o zasílání emailů, ale s nízkou prioritou zpracování.

Expand Down
2 changes: 1 addition & 1 deletion src/Broker/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

interface Consumer
{
public function consume(string $queue, array $priorities): void;
public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void;
}
12 changes: 10 additions & 2 deletions src/Broker/PhpAmqpLib/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ public function __construct(private Manager $manager, private BackgroundQueue $b
/**
* @throws Exception
*/
public function consume(string $queue, array $priorities): void
public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void
{
// TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet.
// Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL.

// Nejprve se chceme kouknout, jestli není zaslána zpráva k ukončení, proto na první místo dáme TOP_PRIORITY frontu.
array_unshift($priorities, Manager::QUEUE_TOP_PRIORITY);
// S labelem dostane konzumer vlastní DIE frontu "0_<label>", takže ho lze restartovat cíleně.
$priorities = $this->manager->includeTopPriority($priorities, $consumerLabel);

// Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme
$queuesWithPriorities = [];
Expand All @@ -47,9 +48,16 @@ public function consume(string $queue, array $priorities): void
$this->manager->closeChannel();

if ($msg->getBody() === Producer::DIE) {
// Restart: exit 0 -> supervisor konzumera (typicky) znovu nastartuje.
die();
}

if ($msg->getBody() === Producer::SHUTDOWN) {
// Nice shutdown: rozdělaný job je už hotový (zpracovává se sériově, prefetch 1), další si nebereme
// a ukončíme se dohodnutým exit kódem, který má supervisor v "exitcodes" - proces už nenaběhne.
exit(Producer::NICE_SHUTDOWN_EXIT_CODE);
}

$this->backgroundQueue->processJob((int)$msg->getBody());
});
}
Expand Down
34 changes: 32 additions & 2 deletions src/Broker/PhpAmqpLib/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class Manager
{
const QUEUE_TOP_PRIORITY = 0;
const QUEUE_NAME_PARTS_DELIMITER = '_';

private array $connectionParams;
private array $queueParams;
Expand Down Expand Up @@ -144,8 +145,37 @@ public function setupQos(): void
$this->initQos = true;
}

public function getQueueWithPriority(string $queue, int $priority): string
public function getQueueWithPriority(string $queue, string $priority): string
{
return $queue . '_' . $priority;
return $queue . self::QUEUE_NAME_PARTS_DELIMITER . $priority;
}

/**
* Vloží na začátek seznamu priorit název top-priority (DIE) fronty.
* Konzumer ji tak vždy kontroluje jako první - aby na DIE zprávu reagoval přednostně.
*/
public function includeTopPriority(array $priorities, ?string $label = null): array
{
array_unshift($priorities, $this->getTopPriorityName($label));
return $priorities;
}

/**
* Vrátí název top-priority fronty. Bez labelu je to sdílená "0" fronta;
* s labelem vznikne samostatná "0_<label>" fronta, díky níž má každý takto označený
* konzumer vlastní DIE frontu a lze ho restartovat cíleně (viz ReloadConsumersCommand).
*/
public function getTopPriorityName(?string $label = null): string
{
$topPriority = (string) self::QUEUE_TOP_PRIORITY;
if (!is_null($label)) {
if (strpos($label, self::QUEUE_NAME_PARTS_DELIMITER) !== false) {
throw new Exception('Label cannot contain "' . self::QUEUE_NAME_PARTS_DELIMITER . '".');
}

$topPriority .= self::QUEUE_NAME_PARTS_DELIMITER . $label;
}

return $topPriority;
}
}
25 changes: 22 additions & 3 deletions src/Broker/PhpAmqpLib/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,22 @@
{
const DIE = 'die';

// "Nice shutdown": konzumer dojede rozdělaný job, vezme si tuto řídicí zprávu místo dalšího jobu
// a ukončí se s NICE_SHUTDOWN_EXIT_CODE. Na rozdíl od DIE (exit 0, supervisor konzumera restartuje)
// je tento exit kód určen k zařazení do supervisor "exitcodes", takže proces už znovu nenaběhne.
// Slouží k řízenému zastavení konzumerů (např. před restartem serveru).
const SHUTDOWN = 'shutdown';

const NICE_SHUTDOWN_EXIT_CODE = 100;

public function __construct(private Manager $manager)
{
}

/**
* @throws Exception
*/
public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void
public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void
{
$queue = $this->manager->getQueueWithPriority($queue, $priority);
$exchange = $queue;
Expand Down Expand Up @@ -49,9 +57,20 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati
/**
* @throws Exception
*/
public function publishDie(string $queue): void
public function publishDie(string $queue, ?string $consumerLabel = null): void
{
$this->publish(self::DIE, $queue, $this->manager->getTopPriorityName($consumerLabel));
}

/**
* Pošle do (případně label-specifické) DIE fronty zprávu pro "nice shutdown" - konzumer se po dojetí
* rozdělaného jobu ukončí s NICE_SHUTDOWN_EXIT_CODE a supervisor ho už nenastartuje (viz README).
*
* @throws Exception
*/
public function publishShutdown(string $queue, ?string $consumerLabel = null): void
{
$this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY);
$this->publish(self::SHUTDOWN, $queue, $this->manager->getTopPriorityName($consumerLabel));
}

private function createMessage(string $body): AMQPMessage
Expand Down
5 changes: 3 additions & 2 deletions src/Broker/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

interface Producer
{
public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void;
public function publishDie(string $queue): void;
public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void;
public function publishDie(string $queue, ?string $consumerLabel = null): void;
public function publishShutdown(string $queue, ?string $consumerLabel = null): void;
}
4 changes: 3 additions & 1 deletion src/Console/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected function configure(): void
$this->addArgument('queue', InputArgument::OPTIONAL);
$this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1);
$this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)');
$this->addOption('label', 'l', InputOption::VALUE_OPTIONAL, 'Consumer label for targeted restart via reload-consumers command');
}

/**
Expand All @@ -35,6 +36,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$jobs = $input->getOption('jobs');
$priorities = $this->getPrioritiesListBasedConfig($input->getOption('priorities'));
$label = $input->getOption('label');

if (!is_numeric($jobs)) {
$output->writeln("<error>Option --jobs has to be integer</error>");
Expand All @@ -43,7 +45,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

for ($i = 0; $i < (int)$jobs; $i++) {
$this->backgroundQueue->dieIfNecessary();
$this->consumer->consume($this->backgroundQueue->getQueue($input->getArgument('queue')), $priorities);
$this->consumer->consume($this->backgroundQueue->getQueue($input->getArgument('queue')), $priorities, $label);
}

return self::SUCCESS;
Expand Down
24 changes: 20 additions & 4 deletions src/Console/ReloadConsumersCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'background-queue:reload-consumers', description: 'Creates the specified number of noop messages to reload consumers consuming specified queue.')]
#[AsCommand(name: 'background-queue:reload-consumers', description: 'Restarts consumers by sending DIE messages to their (optionally label-specific) top-priority queue.')]
class ReloadConsumersCommand extends Command
{
public function __construct(private readonly BackgroundQueue $backgroundQueue, private readonly Producer $producer)
Expand All @@ -22,19 +23,34 @@ protected function configure(): void
$this->addArgument(
"number",
InputArgument::REQUIRED,
'Number of consumers to reload.'
'Number of DIE messages to send to each targeted consumer queue.'
);
$this->addArgument(
"queue",
InputArgument::OPTIONAL,
'A queue whose consumers are to reload.'
);
$this->addOption(
'label',
'l',
InputOption::VALUE_OPTIONAL,
'Comma-separated consumer labels to restart (see consume --label). Empty targets the shared DIE queue.'
);
}

protected function executeCommand(InputInterface $input, OutputInterface $output): int
{
for ($i = 0; $i < $input->getArgument("number"); $i++) {
$this->producer->publishDie($this->backgroundQueue->getQueue($input->getArgument("queue")));
// Labely cílí restart na konkrétní konzumery (každý má vlastní DIE frontu "0_<label>").
// Bez labelu se posílá do sdílené DIE fronty - zpětně kompatibilní původní chování.
$labels = $input->getOption('label');
$labels = $labels ? explode(',', $labels) : [null];

$queue = $this->backgroundQueue->getQueue($input->getArgument("queue"));

foreach ($labels as $label) {
for ($i = 0; $i < $input->getArgument("number"); $i++) {
$this->producer->publishDie($queue, $label);
}
}

return self::SUCCESS;
Expand Down
58 changes: 58 additions & 0 deletions src/Console/ShutdownConsumersCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

namespace ADT\BackgroundQueue\Console;

use ADT\BackgroundQueue\BackgroundQueue;
use ADT\BackgroundQueue\Broker\Producer;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'background-queue:shutdown-consumers', description: 'Gracefully stops consumers by sending shutdown messages to their (optionally label-specific) top-priority queue. Unlike reload-consumers, consumers exit with a code meant to keep the supervisor from restarting them.')]
class ShutdownConsumersCommand extends Command
{
public function __construct(private readonly BackgroundQueue $backgroundQueue, private readonly Producer $producer)
{
parent::__construct();
}

protected function configure(): void
{
$this->addArgument(
"number",
InputArgument::REQUIRED,
'Number of shutdown messages to send to each targeted consumer queue.'
);
$this->addArgument(
"queue",
InputArgument::OPTIONAL,
'A queue whose consumers are to shut down.'
);
$this->addOption(
'label',
'l',
InputOption::VALUE_OPTIONAL,
'Comma-separated consumer labels to shut down (see consume --label). Empty targets the shared DIE queue.'
);
}

protected function executeCommand(InputInterface $input, OutputInterface $output): int
{
// Stejné cílení jako reload-consumers (sdílená vs. label-specifická DIE fronta), liší se jen typem
// řídicí zprávy: shutdown konzumera ukončí tak, aby ho supervisor už znovu nenastartoval (viz README).
$labels = $input->getOption('label');
$labels = $labels ? explode(',', $labels) : [null];

$queue = $this->backgroundQueue->getQueue($input->getArgument("queue"));

foreach ($labels as $label) {
for ($i = 0; $i < $input->getArgument("number"); $i++) {
$this->producer->publishShutdown($queue, $label);
}
}

return self::SUCCESS;
}
}
Loading