-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue_status.py
More file actions
61 lines (47 loc) · 1.85 KB
/
Copy pathqueue_status.py
File metadata and controls
61 lines (47 loc) · 1.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""Incremental queue status runtime."""
from __future__ import annotations
import datetime as dt
from dataclasses import dataclass
from typing import Callable
from security_scanner.storage.adapters.nosql_db.items import (
SCAN_JOB_STATUS_COMPLETED,
SCAN_JOB_STATUS_DEAD_LETTER,
SCAN_JOB_STATUS_LEASED,
SCAN_JOB_STATUS_PENDING,
)
from security_scanner.storage.base import IncrementalScanStore, QueueStatus
# Fixed display order for the well-known job statuses. render_queue_status
# always emits one line for each of these, in this order, before any other
# statuses found in the counts mapping.
QUEUE_STATUS_ORDER = (
SCAN_JOB_STATUS_PENDING,
SCAN_JOB_STATUS_LEASED,
SCAN_JOB_STATUS_COMPLETED,
SCAN_JOB_STATUS_DEAD_LETTER,
)
@dataclass(frozen=True)
class QueueStatusRequest:
"""Inputs for one queue-status read."""
store: IncrementalScanStore
now_factory: Callable[[], dt.datetime] = lambda: dt.datetime.now(dt.UTC).replace(
microsecond=0
)
def read_queue_status(request: QueueStatusRequest) -> QueueStatus:
    """Ask the request's store for its queue status as of the request's clock.

    The timestamp handed to the store is the value produced by ``_now`` for
    this request.
    """
    fetch_status = request.store.get_queue_status
    as_of = _now(request)
    return fetch_status(as_of)
def render_queue_status(status: QueueStatus) -> str:
    """Format a queue status as a deterministic, newline-terminated text report.

    The report lists one ``name: count`` line for every status in
    ``QUEUE_STATUS_ORDER`` (0 when the status has no entry), then any other
    statuses present in ``status.job_counts_by_status`` in sorted order, and
    finally the expired job-lease and repo-lease counts.
    """
    counts = status.job_counts_by_status
    report = []

    # Well-known statuses always appear, in their fixed order.
    for known in QUEUE_STATUS_ORDER:
        report.append(f"{known}: {counts.get(known, 0)}")

    # Anything else the store reported follows, sorted for stable output.
    for other in sorted(set(counts).difference(QUEUE_STATUS_ORDER)):
        report.append(f"{other}: {counts[other]}")

    report.append(f"expired job leases: {status.expired_job_leases}")
    report.append(f"expired repo leases: {status.expired_repo_leases}")

    # Every line, including the last, ends with a newline.
    return "".join(f"{row}\n" for row in report)
def _now(request: QueueStatusRequest) -> dt.datetime:
value = request.now_factory()
if value.tzinfo is None:
return value.replace(tzinfo=dt.UTC)
return value.astimezone(dt.UTC).replace(microsecond=0)