-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumeCommand.php
More file actions
90 lines (73 loc) · 2.71 KB
/
Copy pathConsumeCommand.php
File metadata and controls
90 lines (73 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
<?php
namespace ADT\BackgroundQueue\Console;
use ADT\BackgroundQueue\BackgroundQueue;
use ADT\BackgroundQueue\Broker\Consumer;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
class ConsumeCommand extends \Symfony\Component\Console\Command\Command
{
protected static $defaultName = 'background-queue:consume';
private Consumer $consumer;
private BackgroundQueue $backgroundQueue;
public function __construct(Consumer $consumer, BackgroundQueue $backgroundQueue)
{
parent::__construct();
$this->consumer = $consumer;
$this->backgroundQueue = $backgroundQueue;
}
protected function configure()
{
$this->setName('background-queue:consume');
$this->addArgument('queue', InputArgument::REQUIRED);
$this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1);
$this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)');
$this->setDescription('Start consumer.');
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$jobs = $input->getOption('jobs');
$priorities = $this->getPrioritiesListBasedConfig($input->getOption('priorities'));
if (!is_numeric($jobs)) {
$output->writeln("<error>Option --jobs has to be integer</error>");
return 1;
}
for ($i = 0; $i < (int)$jobs; $i++) {
$this->backgroundQueue->dieIfNecessary();
$this->consumer->consume($input->getArgument('queue'), $priorities);
}
return 0;
}
private function getPrioritiesListBasedConfig(?string $prioritiesText = null): array
{
$prioritiesAvailable = $this->backgroundQueue->getConfig()['priorities'];
if (is_null($prioritiesText)) {
return $prioritiesAvailable;
}
if (strpos($prioritiesText, '-') === false) {
$priority = (int)$prioritiesText;
if (!in_array($priority, $prioritiesAvailable)) {
throw new \Exception("Priority $priority is not in available priorities [" . implode(',', $prioritiesAvailable) . "]");
}
return [$priority];
}
list($min, $max) = explode('-', $prioritiesText);
if ($min === '') {
$min = $prioritiesAvailable[0];
}
if ($max === '') {
$max = end($prioritiesAvailable);
}
$priorities = [];
foreach ($prioritiesAvailable as $priorityAvailable) {
if ($priorityAvailable >= $min && $priorityAvailable <= $max) {
$priorities[] = $priorityAvailable;
}
}
if (!count($priorities)) {
throw new \Exception("Priority $prioritiesText has not intersections with availables priorities [" . implode(',', $prioritiesAvailable) . "]");
}
return $priorities;
}
}