-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumeCommand.php
More file actions
93 lines (77 loc) · 2.72 KB
/
Copy pathConsumeCommand.php
File metadata and controls
93 lines (77 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
<?php
namespace ADT\BackgroundQueue\Console;
use ADT\BackgroundQueue\BackgroundQueue;
use ADT\BackgroundQueue\Broker\Consumer;
use Exception;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * Console command `background-queue:consume`.
 *
 * Invokes the injected consumer a configurable number of times (`--jobs`) for an
 * optional queue argument, restricted to the priorities selected by `--priorities`.
 */
#[AsCommand(name: 'background-queue:consume', description: 'Start consumer.')]
class ConsumeCommand extends \Symfony\Component\Console\Command\Command
{
    /**
     * @param Consumer        $consumer        Consumer whose consume() is called once per job.
     * @param BackgroundQueue $backgroundQueue Provides the configuration, queue lookup and dieIfNecessary().
     */
    public function __construct(
        private readonly Consumer $consumer,
        private readonly BackgroundQueue $backgroundQueue
    ) {
        parent::__construct();
    }

    /**
     * Declares the optional `queue` argument and the `--jobs` / `--priorities` options.
     */
    protected function configure(): void
    {
        $this->addArgument('queue', InputArgument::OPTIONAL);
        $this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1);
        $this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)');
    }

    /**
     * Validates the options and calls the consumer `--jobs` times.
     *
     * @return int self::FAILURE when `--jobs` is not an integer, self::SUCCESS otherwise.
     *
     * @throws Exception When the `--priorities` option cannot be parsed or matches no configured priority.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $jobs = $input->getOption('jobs');
        $priorities = $this->getPrioritiesListBasedConfig($input->getOption('priorities'));

        // is_numeric() would also let "1.5" or "1e3" through, which contradicts the
        // error message below and would be silently truncated by the int cast.
        $isInteger = is_int($jobs)
            || (is_string($jobs) && preg_match('/^\s*[+-]?\d+\s*$/', $jobs) === 1);
        if (!$isInteger) {
            $output->writeln("<error>Option --jobs has to be integer</error>");

            return self::FAILURE;
        }

        $jobCount = (int) $jobs;
        // The argument cannot change between iterations, so read it once.
        $queue = $input->getArgument('queue');

        for ($i = 0; $i < $jobCount; $i++) {
            // NOTE(review): presumably terminates the process when a shutdown was requested - confirm in BackgroundQueue.
            $this->backgroundQueue->dieIfNecessary();
            $this->consumer->consume($this->backgroundQueue->getQueue($queue), $priorities);
        }

        return self::SUCCESS;
    }

    /**
     * Resolves the `--priorities` option against the configured priorities.
     *
     * Accepted forms: a single priority ("10"), a closed range ("20-40") or an
     * open-ended range ("25-", "-20"). When no text is given, every configured
     * priority is returned unchanged.
     *
     * @param string|null $prioritiesText Raw option value, or null when the option was not passed.
     *
     * @return array The configured priorities selected by the option.
     *
     * @throws Exception When the text is malformed or selects no configured priority.
     */
    private function getPrioritiesListBasedConfig(?string $prioritiesText = null): array
    {
        $prioritiesAvailable = $this->backgroundQueue->getConfig()['priorities'];
        if ($prioritiesText === null) {
            return $prioritiesAvailable;
        }

        $spec = trim($prioritiesText);

        if (!str_contains($spec, '-')) {
            // A bare int cast would turn "abc" into 0 and "10abc" into 10.
            if (preg_match('/^\d+$/', $spec) !== 1) {
                throw new Exception("Priority $prioritiesText is not a valid priority or priority range");
            }

            $priority = (int) $spec;
            // Loose comparison kept on purpose: the element type of the configured
            // priorities is not visible here (they may be numeric strings).
            if (!in_array($priority, $prioritiesAvailable)) {
                throw new Exception("Priority $priority is not in available priorities [" . implode(',', $prioritiesAvailable) . "]");
            }

            return [$priority];
        }

        if (preg_match('/^(\d*)\s*-\s*(\d*)$/', $spec, $bounds) !== 1) {
            throw new Exception("Priority $prioritiesText is not a valid priority or priority range");
        }

        // A missing bound means "unbounded on that side". This avoids relying on the
        // configured list being non-empty, 0-indexed and sorted in ascending order.
        $min = $bounds[1] === '' ? null : (int) $bounds[1];
        $max = $bounds[2] === '' ? null : (int) $bounds[2];

        $priorities = [];
        foreach ($prioritiesAvailable as $priorityAvailable) {
            $aboveMin = $min === null || $priorityAvailable >= $min;
            $belowMax = $max === null || $priorityAvailable <= $max;
            if ($aboveMin && $belowMax) {
                $priorities[] = $priorityAvailable;
            }
        }

        if ($priorities === []) {
            throw new Exception("Priority $prioritiesText has not intersections with availables priorities [" . implode(',', $prioritiesAvailable) . "]");
        }

        return $priorities;
    }
}