self::STATE_BACK_TO_BROKER,
        self::STATE_READY => self::STATE_READY,
        self::STATE_TEMPORARILY_FAILED => self::STATE_TEMPORARILY_FAILED,
        self::STATE_WAITING => self::STATE_WAITING,
        self::STATE_BROKER_FAILED => self::STATE_BROKER_FAILED
    ];

    /**
     * States that count as finished, keyed by their own value so that
     * membership can be tested with isset(). Entering one of them through
     * setState() stamps finishedAt.
     */
    const FINISHED_STATES = [
        self::STATE_FINISHED => self::STATE_FINISHED,
        self::STATE_REDUNDANT => self::STATE_REDUNDANT,
    ];

    private ?int $id = null;

    private string $queue;

    private ?int $priority;

    private string $callbackName;

    /**
     * PHP-serialized parameters, used when they cannot be represented as JSON.
     *
     * @see self::setParameters()
     */
    private ?string $parameters = null;

    /**
     * JSON-encoded parameters; preferred over $parameters when reading.
     *
     * @see self::setParameters()
     */
    private ?string $parametersJson = null;

    private int $state = self::STATE_READY;

    private DateTimeImmutable $createdAt;

    private ?DateTimeImmutable $lastAttemptAt = null;

    private int $numberOfAttempts = 0;

    private ?string $errorMessage = null;

    private ?string $serialGroup = null;

    private ?int $coalesceThreshold = null;

    private ?string $identifier = null;

    /** Delay that getAvailableFrom() divides by 1000 and adds to a Unix timestamp, i.e. milliseconds. */
    private ?int $postponedBy = null;

    private bool $processedByBroker = false;

    private ?int $executionTime = null;

    private ?DateTimeImmutable $finishedAt = null;

    /** PID of the supervisor consumer inside the docker container. */
    private ?int $pid = null;

    /** Stored as a JSON string. */
    private ?string $metadata = null;

    /** Stored as a JSON string. */
    private ?string $memory = null;

    private ModeEnum $mode = ModeEnum::NORMAL;

    private DateTimeImmutable $updatedAt;

    /**
     * Initialises createdAt and updatedAt to the current time.
     */
    public function __construct()
    {
        $this->createdAt = new DateTimeImmutable();
        $this->updatedAt = new DateTimeImmutable();
    }

    /**
     * Clones drop the id (presumably so they can be stored as a new record).
     */
    public function __clone()
    {
        $this->id = null;
    }

    public function setId(int $id): self
    {
        $this->id = $id;

        return $this;
    }

    public function getId(): ?int
    {
        return $this->id;
    }

    public function setQueue(string $queue): self
    {
        $this->queue = $queue;

        return $this;
    }

    public function getQueue(): string
    {
        return $this->queue;
    }

    public function setPriority(?int $priority): self
    {
        $this->priority = $priority;

        return $this;
    }

    public function getPriority(): ?int
    {
        return $this->priority;
    }

    public function getCallbackName(): string
    {
        return $this->callbackName;
    }

    public function setCallbackName(string $callbackName): self
    {
        $this->callbackName = $callbackName;

        return $this;
    }

    public function getSerialGroup(): ?string
    {
        return $this->serialGroup;
    }

    public function setSerialGroup(?string $serialGroup): self
    {
        $this->serialGroup = $serialGroup;

        return $this;
    }

    public function getCoalesceThreshold(): ?int
    {
        return $this->coalesceThreshold;
    }

    public function setCoalesceThreshold(?int $coalesceThreshold): self
    {
        $this->coalesceThreshold = $coalesceThreshold;

        return $this;
    }

    /**
     * Returns the job parameters.
     *
     * The JSON representation wins when present; each of its top-level values
     * is passed through Utils::getDateTimeFromArray(). Otherwise the
     * PHP-serialized representation is unserialized. With neither present an
     * empty array is returned.
     *
     * @return array
     * @throws JsonException
     */
    public function getParameters(): array
    {
        if ($this->parametersJson) {
            return array_map(
                static fn ($value) => Utils::getDateTimeFromArray($value, true),
                Json::decode($this->parametersJson, forceArrays: true),
            );
        }

        // If the value arrived as a stream resource, read it into a string once.
        if (is_resource($this->parameters)) {
            $this->parameters = stream_get_contents($this->parameters);
        }

        if ($this->parameters === null) {
            return [];
        }

        // NOTE(review): unserialize() may instantiate arbitrary classes from the
        // stored payload. This is only safe while the storage is trusted; prefer
        // the JSON representation and consider restricting 'allowed_classes'.
        return unserialize($this->parameters);
    }

    /**
     * Stores the job parameters, as JSON when possible and PHP-serialized otherwise.
     *
     * Null or an empty array is a no-op and leaves previously stored
     * parameters untouched. Whichever representation is written, the other
     * one is cleared so that getParameters() cannot return stale data left
     * over from an earlier call.
     *
     * @throws JsonException
     */
    public function setParameters(?array $parameters): self
    {
        if (!$parameters) {
            return $this;
        }

        if ($this->isJsonable($parameters)) {
            // Encode first: if encoding throws, the current state stays intact.
            $this->parametersJson = Json::encode($parameters);
            $this->parameters = null;
        } else {
            $this->parameters = serialize($parameters);
            $this->parametersJson = null;
        }

        return $this;
    }

    public function getState(): int
    {
        return $this->state;
    }

    /**
     * Sets the state; stamps finishedAt when the state changes to a finished one.
     */
    public function setState(int $state): self
    {
        // Key lookup mirrors isReadyForProcess() and avoids loose comparison.
        if ($this->state !== $state && isset(self::FINISHED_STATES[$state])) {
            $this->updateFinishedAt();
        }

        $this->state = $state;

        return $this;
    }

    public function getLastAttemptAt(): ?DateTimeImmutable
    {
        return $this->lastAttemptAt;
    }

    /**
     * Sets lastAttemptAt to the current time.
     */
    public function updateLastAttemptAt(): self
    {
        $this->lastAttemptAt = new DateTimeImmutable();

        return $this;
    }

    public function getNumberOfAttempts(): int
    {
        return $this->numberOfAttempts;
    }

    public function increaseNumberOfAttempts(): self
    {
        $this->numberOfAttempts++;

        return $this;
    }

    public function getErrorMessage(): ?string
    {
        return $this->errorMessage;
    }

    public function setErrorMessage(?string $errorMessage): self
    {
        $this->errorMessage = $errorMessage;

        return $this;
    }

    public function getIdentifier(): ?string
    {
        return $this->identifier;
    }

    public function setIdentifier(?string $identifier): self
    {
        $this->identifier = $identifier;

        return $this;
    }

    public function getPostponedBy(): ?int
    {
        return $this->postponedBy;
    }

    public function setPostponedBy(?int $postponedBy): self
    {
        $this->postponedBy = $postponedBy;

        return $this;
    }

    public function getProcessedByBroker(): bool
    {
        return $this->processedByBroker;
    }

    public function setProcessedByBroker(bool $processedByBroker): self
    {
        $this->processedByBroker = $processedByBroker;

        return $this;
    }

    public function getFinishedAt(): ?DateTimeImmutable
    {
        return $this->finishedAt;
    }

    /**
     * Sets finishedAt to the current time.
     */
    public function updateFinishedAt(): self
    {
        $this->finishedAt = new DateTimeImmutable();

        return $this;
    }

    public function getPid(): ?int
    {
        return $this->pid;
    }

    /**
     * Records the PID of the current PHP process.
     */
    public function updatePid(): self
    {
        $this->pid = getmypid();

        return $this;
    }

    /**
     * Returns the decoded metadata, or null when none is stored.
     */
    public function getMetadata(): ?array
    {
        // json_decode() does not accept null; short-circuit instead of relying on coercion.
        return $this->metadata === null ? null : json_decode($this->metadata, true);
    }

    /**
     * Stores the metadata as JSON; null is kept as null rather than the string "null".
     */
    public function setMetadata(?array $metadata): self
    {
        $this->metadata = $metadata === null ? null : json_encode($metadata);

        return $this;
    }

    /**
     * Returns the decoded memory data, or null when none is stored.
     */
    public function getMemory(): ?array
    {
        // json_decode() does not accept null; short-circuit instead of relying on coercion.
        return $this->memory === null ? null : json_decode($this->memory, true);
    }

    /**
     * Stores the memory data as JSON; null is kept as null rather than the string "null".
     */
    public function setMemory(?array $memory): self
    {
        $this->memory = $memory === null ? null : json_encode($memory);

        return $this;
    }

    public function isReadyForProcess(): bool
    {
        return isset(self::READY_TO_PROCESS_STATES[$this->state]);
    }

    /**
     * Builds an entity from a row keyed by column name, bypassing the constructor.
     *
     * Expects the keys produced by getDatabaseValues() plus 'id'.
     *
     * @throws Exception
     */
    public static function createEntity(array $values): self
    {
        $entity = (new ReflectionClass(self::class))->newInstanceWithoutConstructor();

        $entity->id = $values['id'];
        $entity->queue = $values['queue'];
        $entity->priority = $values['priority'];
        $entity->callbackName = $values['callback_name'];
        $entity->parameters = $values['parameters'];
        $entity->parametersJson = $values['parameters_json'];
        $entity->state = $values['state'];
        $entity->createdAt = new DateTimeImmutable($values['created_at']);
        $entity->lastAttemptAt = $values['last_attempt_at']
            ? new DateTimeImmutable($values['last_attempt_at'])
            : null;
        $entity->numberOfAttempts = $values['number_of_attempts'];
        $entity->errorMessage = $values['error_message'];
        $entity->serialGroup = $values['serial_group'];
        $entity->coalesceThreshold = $values['coalesce_threshold'];
        $entity->identifier = $values['identifier'];
        $entity->mode = ModeEnum::from($values['mode']);
        $entity->postponedBy = $values['postponed_by'];
        // getDatabaseValues() exports this flag as an int, so cast it back explicitly.
        $entity->processedByBroker = (bool) $values['processed_by_broker'];
        $entity->executionTime = $values['execution_time'];
        $entity->finishedAt = $values['finished_at']
            ? new DateTimeImmutable($values['finished_at'])
            : null;
        $entity->pid = $values['pid'];
        $entity->metadata = $values['metadata'];
        $entity->memory = $values['memory'];
        $entity->updatedAt = new DateTimeImmutable($values['updated_at']);

        return $entity;
    }

    /**
     * Exports the entity as a column => value map (the id is not included).
     *
     * Dates are formatted as 'Y-m-d H:i:s'; processed_by_broker and
     * execution_time are cast to int.
     */
    public function getDatabaseValues(): array
    {
        return [
            'queue' => $this->queue,
            'priority' => $this->priority,
            'callback_name' => $this->callbackName,
            'parameters' => $this->parameters,
            'parameters_json' => $this->parametersJson,
            'state' => $this->state,
            'created_at' => $this->createdAt->format('Y-m-d H:i:s'),
            'last_attempt_at' => $this->lastAttemptAt?->format('Y-m-d H:i:s'),
            'number_of_attempts' => $this->numberOfAttempts,
            'error_message' => $this->errorMessage,
            'serial_group' => $this->serialGroup,
            'coalesce_threshold' => $this->coalesceThreshold,
            'identifier' => $this->identifier,
            'mode' => $this->mode->value,
            'postponed_by' => $this->postponedBy,
            'processed_by_broker' => (int) $this->processedByBroker,
            'execution_time' => (int) $this->executionTime,
            'finished_at' => $this->finishedAt?->format('Y-m-d H:i:s'),
            'pid' => $this->pid,
            'metadata' => $this->metadata,
            'memory' => $this->memory,
            'updated_at' => $this->updatedAt->format('Y-m-d H:i:s'),
        ];
    }

    /**
     * Returns the later of createdAt and lastAttemptAt, shifted by postponedBy.
     *
     * @throws Exception
     */
    public function getAvailableFrom(): DateTime
    {
        $baseTimestamp = max(
            $this->createdAt->getTimestamp(),
            $this->lastAttemptAt?->getTimestamp() ?? 0,
        );

        // postponedBy / 1000 is rounded up to whole seconds; the int cast keeps
        // the '@timestamp' string free of any float formatting.
        $delaySeconds = (int) ceil(($this->postponedBy ?? 0) / 1000);

        return new DateTime('@' . ($baseTimestamp + $delaySeconds));
    }

    public function getExecutionTime(): ?int
    {
        return $this->executionTime;
    }

    public function setExecutionTime(?int $executionTime): self
    {
        $this->executionTime = $executionTime;

        return $this;
    }

    public function getMode(): ModeEnum
    {
        return $this->mode;
    }

    public function setMode(ModeEnum $mode): void
    {
        $this->mode = $mode;
    }

    public function isModeUnique(): bool
    {
        return $this->mode === ModeEnum::UNIQUE;
    }

    public function isModeRecurring(): bool
    {
        return $this->mode === ModeEnum::RECURRING;
    }

    /**
     * Tells whether the value can be stored as JSON: nested arrays are checked
     * recursively and the only objects allowed are \DateTimeInterface instances.
     */
    private function isJsonable(array $value): bool
    {
        foreach ($value as $item) {
            if (is_array($item)) {
                if (!$this->isJsonable($item)) {
                    return false;
                }

                continue;
            }

            if (is_object($item) && !$item instanceof \DateTimeInterface) {
                return false;
            }
        }

        return true;
    }

    public function getUpdatedAt(): DateTimeImmutable
    {
        return $this->updatedAt;
    }

    public function setUpdatedAt(DateTimeImmutable $updatedAt): void
    {
        $this->updatedAt = $updatedAt;
    }
}