Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ Joby sdílející `serialGroup` běží striktně v pořadí podle ID. `checkUnf

### Brokerová abstrakce (src/Broker/)

`Producer` a `Consumer` jsou rozhraní - přines si libovolný broker. K dispozici je hotová implementace `PhpAmqpLib` (volitelná závislost; viz README pro doporučené omezení `conflict` pinující `php-amqplib` na `^3.0`). Priorita je modelována jako **samostatné fronty** pojmenované `<queue>_<priority>`; `QUEUE_TOP_PRIORITY = 0` je vyhrazena pro řídicí (DIE) zprávy, takže ji konzumeři kontrolují jako první. `publishDie()` + tělo `DIE` je způsob, jak `reload-consumers` elegantně restartuje běžící konzumery.
`Producer` a `Consumer` jsou rozhraní - přines si libovolný broker. K dispozici je hotová implementace `PhpAmqpLib` (volitelná závislost; viz README pro doporučené omezení `conflict` pinující `php-amqplib` na `^3.0`). Priorita je modelována jako **samostatné fronty** pojmenované `<queue>_<priority>`; `QUEUE_TOP_PRIORITY = 0` je vyhrazena pro řídicí (DIE) zprávy, takže ji konzumeři kontrolují jako první. `publishDie()` + tělo `DIE` je způsob, jak `reload-consumers` elegantně restartuje běžící konzumery. Konzumer spuštěný s **labelem** (`consume -l <label>`) dostane vlastní top-priority frontu `<queue>_0_<label>` (`Manager::getTopPriorityName()` / `includeTopPriority()`); díky tomu lze přes `reload-consumers -l label1,label2` restartovat cíleně právě jeho a DIE zprávu nesní jiný konzumer. Label nesmí obsahovat oddělovač `_` (`QUEUE_NAME_PARTS_DELIMITER`).

### Konzolové příkazy (src/Console/)

Všechny příkazy kromě `ConsumeCommand` rozšiřují lokální abstraktní `Command`, který obaluje běh do `ADT\CommandLock` (`FileSystemStorage` pod `tempDir`), takže nemohou běžet souběžně samy se sebou.

- `background-queue:process` - vstupní bod pro cron (spouštět každou minutu).
- `background-queue:consume [queue] -j <jobs> -p <priorities>` - dlouhoběžící brokerový konzumer; `-p` přijímá rozsahy jako `20-40`, `25-`, `-20`.
- `background-queue:clear-finished [days]`, `background-queue:reload-consumers <number> [queue]`, `background-queue:update-schema`.
- `background-queue:consume [queue] -j <jobs> -p <priorities> -l <label>` - dlouhoběžící brokerový konzumer; `-p` přijímá rozsahy jako `20-40`, `25-`, `-20`; `-l` je volitelný label pro cílený restart.
- `background-queue:clear-finished [days]`, `background-queue:reload-consumers <number> [queue] [-l label1,label2]`, `background-queue:update-schema`.

### Hromadný insert

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ Ve všech ostatních případech se záznam uloží jako úspěšně dokončený

`background-queue:clear-finished 14` Smaže všechny úspěšně zpracované záznamy starší 14 dní.

`background-queue:reload-consumers QUEUE NUMBER` Reloadne NUMBER consumerů pro danou QUEUE.
`background-queue:reload-consumers NUMBER [QUEUE] [-l LABEL1,LABEL2,...]` Pošle NUMBER restartovacích (DIE) zpráv. Bez `-l` do sdílené DIE fronty dané QUEUE, s `-l` cíleně do DIE fronty každého uvedeného labelu (viz `background-queue:consume -l`).

`background-queue:update-schema` Aktualizuje databázové schéma, pokud je potřeba.

Expand Down Expand Up @@ -294,6 +294,9 @@ Můžeme tedy jednoho konzumera vyhradit například na rozesílání registrač
Tím zajistíme, že rychlé odeslání registračního emailu nebude čekat na dlouho trvající úlohy, protože je odbaví první konzumer.
Ale pokud by se vyskytlo více požadavků na zasílání emailů, po nějaké době je začnou odbavovat všichni konzumeři.

Příkazu `background-queue:consume` máme dále možnost nastavit parametrem `-l` (label) jmenovku konzumera. Konzumer s labelem dostane vlastní DIE frontu (`<queue>_0_<label>`), takže ho lze při `background-queue:reload-consumers` restartovat cíleně pomocí `-l label1,label2,...`, aniž by DIE zprávy "snědl" jiný konzumer.
Aby šel každý konzumer restartovat samostatně, dej každému unikátní label - konzumeři se stejným labelem totiž jednu DIE frontu sdílejí. Bez labelu zůstává chování jako dřív: všichni konzumeři sdílejí jednu DIE frontu.

Dále máme možnost prioritu nastavenou pro callback přetížit při vkládání záznamu v metodě `publish`. Například víme, že se jedná o rozesílání newsletterů.
Tedy se jedná o zasílání emailů, ale s nízkou prioritou zpracování.

Expand Down
2 changes: 1 addition & 1 deletion src/Broker/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

interface Consumer
{
public function consume(string $queue, array $priorities): void;
public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void;
}
5 changes: 3 additions & 2 deletions src/Broker/PhpAmqpLib/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ public function __construct(private Manager $manager, private BackgroundQueue $b
/**
* @throws Exception
*/
public function consume(string $queue, array $priorities): void
public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void
{
// TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet.
// Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL.

// Nejprve se chceme kouknout, jestli není zaslána zpráva k ukončení, proto na první místo dáme TOP_PRIORITY frontu.
array_unshift($priorities, Manager::QUEUE_TOP_PRIORITY);
// S labelem dostane konzumer vlastní DIE frontu "0_<label>", takže ho lze restartovat cíleně.
$priorities = $this->manager->includeTopPriority($priorities, $consumerLabel);

// Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme
$queuesWithPriorities = [];
Expand Down
34 changes: 32 additions & 2 deletions src/Broker/PhpAmqpLib/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class Manager
{
const QUEUE_TOP_PRIORITY = 0;
const QUEUE_NAME_PARTS_DELIMITER = '_';

private array $connectionParams;
private array $queueParams;
Expand Down Expand Up @@ -144,8 +145,37 @@ public function setupQos(): void
$this->initQos = true;
}

public function getQueueWithPriority(string $queue, int $priority): string
public function getQueueWithPriority(string $queue, string $priority): string
{
return $queue . '_' . $priority;
return $queue . self::QUEUE_NAME_PARTS_DELIMITER . $priority;
}

/**
* Vloží na začátek seznamu priorit název top-priority (DIE) fronty.
* Konzumer ji tak vždy kontroluje jako první - aby na DIE zprávu reagoval přednostně.
*/
public function includeTopPriority(array $priorities, ?string $label = null): array
{
array_unshift($priorities, $this->getTopPriorityName($label));
return $priorities;
}

/**
* Vrátí název top-priority fronty. Bez labelu je to sdílená "0" fronta;
* s labelem vznikne samostatná "0_<label>" fronta, díky níž má každý takto označený
* konzumer vlastní DIE frontu a lze ho restartovat cíleně (viz ReloadConsumersCommand).
*/
public function getTopPriorityName(?string $label = null): string
{
$topPriority = (string) self::QUEUE_TOP_PRIORITY;
if (!is_null($label)) {
if (strpos($label, self::QUEUE_NAME_PARTS_DELIMITER) !== false) {
throw new Exception('Label cannot contain "' . self::QUEUE_NAME_PARTS_DELIMITER . '".');
}

$topPriority .= self::QUEUE_NAME_PARTS_DELIMITER . $label;
}

return $topPriority;
}
}
6 changes: 3 additions & 3 deletions src/Broker/PhpAmqpLib/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function __construct(private Manager $manager)
/**
* @throws Exception
*/
public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void
public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void
{
$queue = $this->manager->getQueueWithPriority($queue, $priority);
$exchange = $queue;
Expand Down Expand Up @@ -49,9 +49,9 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati
/**
* @throws Exception
*/
public function publishDie(string $queue): void
public function publishDie(string $queue, ?string $consumerLabel = null): void
{
$this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY);
$this->publish(self::DIE, $queue, $this->manager->getTopPriorityName($consumerLabel));
}

private function createMessage(string $body): AMQPMessage
Expand Down
4 changes: 2 additions & 2 deletions src/Broker/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

interface Producer
{
public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void;
public function publishDie(string $queue): void;
public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void;
public function publishDie(string $queue, ?string $consumerLabel = null): void;
}
4 changes: 3 additions & 1 deletion src/Console/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ protected function configure(): void
$this->addArgument('queue', InputArgument::OPTIONAL);
$this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1);
$this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)');
$this->addOption('label', 'l', InputOption::VALUE_OPTIONAL, 'Consumer label for targeted restart via reload-consumers command');
}

/**
Expand All @@ -35,6 +36,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$jobs = $input->getOption('jobs');
$priorities = $this->getPrioritiesListBasedConfig($input->getOption('priorities'));
$label = $input->getOption('label');

if (!is_numeric($jobs)) {
$output->writeln("<error>Option --jobs has to be integer</error>");
Expand All @@ -43,7 +45,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

for ($i = 0; $i < (int)$jobs; $i++) {
$this->backgroundQueue->dieIfNecessary();
$this->consumer->consume($this->backgroundQueue->getQueue($input->getArgument('queue')), $priorities);
$this->consumer->consume($this->backgroundQueue->getQueue($input->getArgument('queue')), $priorities, $label);
}

return self::SUCCESS;
Expand Down
24 changes: 20 additions & 4 deletions src/Console/ReloadConsumersCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'background-queue:reload-consumers', description: 'Creates the specified number of noop messages to reload consumers consuming specified queue.')]
#[AsCommand(name: 'background-queue:reload-consumers', description: 'Restarts consumers by sending DIE messages to their (optionally label-specific) top-priority queue.')]
class ReloadConsumersCommand extends Command
{
public function __construct(private readonly BackgroundQueue $backgroundQueue, private readonly Producer $producer)
Expand All @@ -22,19 +23,34 @@ protected function configure(): void
$this->addArgument(
"number",
InputArgument::REQUIRED,
'Number of consumers to reload.'
'Number of DIE messages to send to each targeted consumer queue.'
);
$this->addArgument(
"queue",
InputArgument::OPTIONAL,
'A queue whose consumers are to reload.'
);
$this->addOption(
'label',
'l',
InputOption::VALUE_OPTIONAL,
'Comma-separated consumer labels to restart (see consume --label). Empty targets the shared DIE queue.'
);
}

protected function executeCommand(InputInterface $input, OutputInterface $output): int
{
for ($i = 0; $i < $input->getArgument("number"); $i++) {
$this->producer->publishDie($this->backgroundQueue->getQueue($input->getArgument("queue")));
// Labely cílí restart na konkrétní konzumery (každý má vlastní DIE frontu "0_<label>").
// Bez labelu se posílá do sdílené DIE fronty - zpětně kompatibilní původní chování.
$labels = $input->getOption('label');
$labels = $labels ? explode(',', $labels) : [null];

$queue = $this->backgroundQueue->getQueue($input->getArgument("queue"));

foreach ($labels as $label) {
for ($i = 0; $i < $input->getArgument("number"); $i++) {
$this->producer->publishDie($queue, $label);
}
}

return self::SUCCESS;
Expand Down
72 changes: 72 additions & 0 deletions tests/Integration/ManagerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

namespace Tests\Integration;

use ADT\BackgroundQueue\Broker\PhpAmqpLib\Manager;
use Codeception\Test\Unit;
use Tests\Support\IntegrationTester;

/**
* Testuje pojmenování front v Manageru se zaměřením na label-specific DIE fronty
* (feature pro cílený restart konzumerů). Čistá logika - nepotřebuje DB ani RabbitMQ.
*/
class ManagerTest extends Unit
{
protected IntegrationTester $tester;

private function getManager(): Manager
{
return new Manager([], ['arguments' => []]);
}

public function testGetTopPriorityNameWithoutLabel()
{
// Bez labelu zůstává sdílená top-priority (DIE) fronta "0".
$this->tester->assertSame('0', $this->getManager()->getTopPriorityName());
$this->tester->assertSame('0', $this->getManager()->getTopPriorityName(null));
}

public function testGetTopPriorityNameWithLabel()
{
// S labelem vznikne samostatná DIE fronta "0_<label>".
$this->tester->assertSame('0_consumer1', $this->getManager()->getTopPriorityName('consumer1'));
}

public function testGetTopPriorityNameRejectsLabelWithDelimiter()
{
// Label se vkládá do názvu fronty za oddělovač "_", takže ho sám obsahovat nesmí.
try {
$this->getManager()->getTopPriorityName('foo_bar');
$this->tester->fail('Očekávána výjimka pro label obsahující "_".');
} catch (\Exception $e) {
$this->tester->assertSame('Label cannot contain "_".', $e->getMessage());
}
}

public function testIncludeTopPriorityPrependsSharedQueue()
{
// Top-priority fronta se vkládá na začátek, aby ji konzumer kontroloval jako první.
$this->tester->assertSame(['0', 10, 20], $this->getManager()->includeTopPriority([10, 20]));
}

public function testIncludeTopPriorityPrependsLabelledQueue()
{
$this->tester->assertSame(['0_worker', 10, 20], $this->getManager()->includeTopPriority([10, 20], 'worker'));
}

public function testGetQueueWithPriority()
{
$this->tester->assertSame('general_10', $this->getManager()->getQueueWithPriority('general', '10'));
}

public function testGetQueueWithPrioritySupportsNamedQueueAndLabelledTopPriority()
{
// Regrese: pojmenovaná fronta (obsahuje "_") v kombinaci s label DIE prioritou ("0_label")
// musí projít. Dřívější kontrola na "_" v názvu fronty by tohle chybně odmítla.
$manager = $this->getManager();
$this->tester->assertSame(
'general_myqueue_0_worker',
$manager->getQueueWithPriority('general_myqueue', $manager->getTopPriorityName('worker'))
);
}
}
96 changes: 96 additions & 0 deletions tests/Integration/ReloadConsumersCommandTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?php

namespace Tests\Integration;

use ADT\BackgroundQueue\BackgroundQueue;
use ADT\BackgroundQueue\Broker\Producer;
use ADT\BackgroundQueue\Console\ReloadConsumersCommand;
use Codeception\Test\Unit;
use Doctrine\DBAL\DriverManager;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Output\NullOutput;
use Tests\Support\IntegrationTester;

/**
* Ověřuje, že reload-consumers pošle DIE zprávy do správných (label-specific) front
* a ve správném počtu - tedy jádro cíleného restartu konzumerů.
*/
class ReloadConsumersCommandTest extends Unit
{
protected IntegrationTester $tester;

/**
* Spustí executeCommand() přímo (obejde zámek z abstraktního Command) a vrátí
* seznam volání publishDie() ve tvaru [['queue' => ..., 'label' => ...], ...].
*/
private function runReload(array $input): array
{
$producer = new class implements Producer {
public array $dieCalls = [];
public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void {}
public function publishDie(string $queue, ?string $consumerLabel = null): void
{
$this->dieCalls[] = ['queue' => $queue, 'label' => $consumerLabel];
}
};

$backgroundQueue = new BackgroundQueue([
'queue' => 'general',
'priorities' => [10],
'connection' => DriverManager::getConnection(BackgroundQueue::parseDsn(self::getDsn())),
'logger' => null,
'producer' => $producer,
]);

$command = new ReloadConsumersCommand($backgroundQueue, $producer);

$arrayInput = new ArrayInput($input);
$arrayInput->bind($command->getDefinition());

$method = (new \ReflectionObject($command))->getMethod('executeCommand');
$method->setAccessible(true);
$method->invoke($command, $arrayInput, new NullOutput());

return $producer->dieCalls;
}

public function testWithoutLabelSendsToSharedQueue()
{
// Bez labelu se posílá NUMBER DIE zpráv do sdílené DIE fronty (label = null) - původní chování.
$calls = $this->runReload(['number' => 3]);

$this->tester->assertCount(3, $calls);
foreach ($calls as $call) {
$this->tester->assertSame('general', $call['queue']);
$this->tester->assertNull($call['label']);
}
}

public function testWithLabelsSendsNumberPerLabel()
{
// NUMBER zpráv na každý vyjmenovaný label - cílený restart konkrétních konzumerů.
$calls = $this->runReload(['number' => 2, '--label' => 'a,b']);

$this->tester->assertSame([
['queue' => 'general', 'label' => 'a'],
['queue' => 'general', 'label' => 'a'],
['queue' => 'general', 'label' => 'b'],
['queue' => 'general', 'label' => 'b'],
], $calls);
}

public function testRespectsNamedQueue()
{
// Volitelný argument queue rozšíří základní frontu (general -> general_myqueue).
$calls = $this->runReload(['number' => 1, 'queue' => 'myqueue', '--label' => 'x']);

$this->tester->assertSame([
['queue' => 'general_myqueue', 'label' => 'x'],
], $calls);
}

private static function getDsn(): string
{
return 'mysql://' . $_ENV['PROJECT_DB_USER'] . ':' . $_ENV['PROJECT_DB_PASSWORD'] . '@' . $_ENV['PROJECT_DB_HOST'] . ':' . $_ENV['PROJECT_DB_PORT'] . '/' . $_ENV['PROJECT_DB_DBNAME'];
}
}
Loading