-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathBackgroundQueueExtension.php
More file actions
120 lines (104 loc) · 3.71 KB
/
Copy pathBackgroundQueueExtension.php
File metadata and controls
120 lines (104 loc) · 3.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?php
namespace ADT\BackgroundQueueNette\DI;
use ADT\BackgroundQueue\BackgroundQueue;
use ADT\BackgroundQueue\Console\ClearFinishedCommand;
use ADT\BackgroundQueue\Console\ConsumeCommand;
use ADT\BackgroundQueue\Console\ProcessCommand;
use ADT\BackgroundQueue\Console\ReloadConsumersCommand;
use ADT\BackgroundQueue\Console\UpdateSchemaCommand;
use Nette\DI\CompilerExtension;
use Nette\Schema\Expect;
use Nette\Schema\Processor;
use Nette\Schema\Schema;
use stdClass;
/**
 * Nette DI compiler extension that registers the BackgroundQueue service
 * and its console commands in the container.
 *
 * @noinspection PhpUnused
 */
class BackgroundQueueExtension extends CompilerExtension
{
    /**
     * Describes the configuration structure accepted by this extension.
     *
     * Required keys:
     *  - callbacks: map of callbackName => callable, or callbackName => {callback, queue}
     *  - notifyOnNumberOfAttempts: int >= 1
     *  - tempDir: string
     *  - locksDir: string
     *
     * Optional keys:
     *  - queue, producer: nullable string
     *  - connection: string, or a string-keyed array of int|string|object values
     *  - tableName: string, defaults to 'background_job'
     *  - waitingJobExpiration: int, defaults to 1000
     *  - logger: nullable DI Statement
     *  - onBeforeProcess, onError, onAfterProcess, onProcessingGetMetadata: nullable callable
     */
    public function getConfigSchema(): Schema
    {
        return Expect::structure([
            'callbacks' => Expect::arrayOf(
                Expect::anyOf(
                    Expect::type('callable'), // callbackName => callback
                    Expect::structure([ // callbackName => callback + queue
                        'callback' => Expect::type('callable'),
                        'queue' => Expect::string(),
                    ])
                ),
                'string'
            )->required(),
            'notifyOnNumberOfAttempts' => Expect::int()->min(1)->required(),
            'tempDir' => Expect::string()->required(),
            'locksDir' => Expect::string()->required(),
            'queue' => Expect::string()->nullable(),
            'connection' => Expect::anyOf('string', Expect::arrayOf('int|string|object', 'string')),
            'tableName' => Expect::string('background_job'),
            'producer' => Expect::string()->nullable(),
            'waitingJobExpiration' => Expect::int(1000),
            'logger' => Expect::anyOf(
                Expect::type(\Nette\DI\Definitions\Statement::class),
                Expect::type(\Nette\DI\Statement::class)
            )->nullable(),
            'onBeforeProcess' => Expect::type('callable')->nullable(),
            'onError' => Expect::type('callable')->nullable(),
            'onAfterProcess' => Expect::type('callable')->nullable(),
            'onProcessingGetMetadata' => Expect::type('callable')->nullable(),
        ]);
    }

    /**
     * Validates the configuration, unifies the callback entries into a single
     * shape and registers the service and the console commands.
     */
    public function loadConfiguration(): void
    {
        // nette/di 2.4
        // NOTE(review): presumably the schema is processed by hand here for
        // compatibility with nette/di 2.4 - confirm before removing.
        $this->config = $config = $this->objectToArray(
            (new Processor)->process($this->getConfigSchema(), $this->config)
        );

        // structure unification:
        // from: callbackName => callback
        // to: callbackName => callback + queue
        foreach ($config['callbacks'] as $callbackName => $callbackData) {
            // Only an array that carries a 'callback' key is already in the
            // structured form. Checking is_array() first keeps object callables
            // from being accessed with array syntax, and array_key_exists()
            // keeps a structured entry with a null callback from being wrapped
            // a second time.
            $isStructured = is_array($callbackData) && array_key_exists('callback', $callbackData);
            if (!$isStructured) {
                $config['callbacks'][$callbackName] = [
                    'callback' => $callbackData,
                    'queue' => null,
                ];
            }
        }

        // service registration
        $this->getContainerBuilder()
            ->addDefinition($this->prefix('service'))
            ->setFactory(BackgroundQueue::class)
            ->setArguments(['config' => $config]);

        // command registration; $defs collects the commands that receive the locks directory
        $defs = [];
        $defs[] = $this->addCommand('clearFinishedCommand', ClearFinishedCommand::class);
        $defs[] = $this->addCommand('processCommand', ProcessCommand::class);

        if ($config['producer']) {
            // NOTE(review): consumeCommand is registered but, as in the original
            // code, is not given setLocksDir - confirm this is intentional.
            $this->addCommand('consumeCommand', ConsumeCommand::class);
            $defs[] = $this->addCommand('reloadConsumerCommand', ReloadConsumersCommand::class);
        }

        $defs[] = $this->addCommand('updateSchemaCommand', UpdateSchemaCommand::class);

        foreach ($defs as $definition) {
            $definition->addSetup('setLocksDir', [$config['locksDir']]);
        }
    }

    /**
     * Registers a non-autowired service definition for a console command.
     *
     * @param string $name  definition name, prefixed with the extension name
     * @param string $class command class used as the factory
     * @return mixed the definition returned by the container builder
     */
    private function addCommand(string $name, string $class)
    {
        return $this->getContainerBuilder()
            ->addDefinition($this->prefix($name))
            ->setFactory($class)
            ->setAutowired(false);
    }

    /**
     * Recursively converts stdClass instances into arrays.
     *
     * Arrays are walked and their nested arrays / stdClass values converted;
     * any other value (including objects of other classes) is returned unchanged.
     *
     * @param mixed $array value to convert
     * @return mixed the converted array, or the input itself when it is neither an array nor a stdClass
     */
    private function objectToArray($array)
    {
        if ($array instanceof stdClass) {
            $array = (array) $array;
        }

        if (!is_array($array)) {
            return $array;
        }

        foreach ($array as $key => $value) {
            if (is_array($value) || $value instanceof stdClass) {
                $array[$key] = $this->objectToArray($value);
            }
        }

        return $array;
    }
}