-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_queue_status.py
More file actions
127 lines (97 loc) · 3.38 KB
/
Copy pathtest_queue_status.py
File metadata and controls
127 lines (97 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""Tests for queue-status runtime and CLI."""
from __future__ import annotations
import datetime as dt
from security_scanner.cli import main
from security_scanner.runtime.queue_status import (
QueueStatusRequest,
read_queue_status,
render_queue_status,
)
from security_scanner.storage.base import QueueStatus
NOW = dt.datetime(2026, 6, 12, 14, 0, tzinfo=dt.UTC)
class FakeQueueStatusStore:
def __init__(self, status: QueueStatus) -> None:
self.status = status
self.now_values: list[dt.datetime] = []
def get_queue_status(self, now: dt.datetime) -> QueueStatus:
self.now_values.append(now)
return self.status
def test_read_queue_status_passes_utc_now_to_store():
status = QueueStatus(
job_counts_by_status={"pending": 2},
expired_job_leases=1,
expired_repo_leases=0,
)
store = FakeQueueStatusStore(status)
observed = read_queue_status(
QueueStatusRequest(store=store, now_factory=lambda: NOW.replace(tzinfo=None))
)
assert observed == status
assert store.now_values == [NOW]
def test_render_queue_status_groups_jobs_and_expired_leases():
rendered = render_queue_status(
QueueStatus(
job_counts_by_status={
"pending": 2,
"leased": 1,
"completed": 3,
"dead_letter": 4,
},
expired_job_leases=5,
expired_repo_leases=6,
)
)
assert rendered == (
"pending: 2\n"
"leased: 1\n"
"completed: 3\n"
"dead_letter: 4\n"
"expired job leases: 5\n"
"expired repo leases: 6\n"
)
def test_render_queue_status_includes_zeroes_for_missing_known_statuses():
rendered = render_queue_status(
QueueStatus(
job_counts_by_status={"pending": 1},
expired_job_leases=0,
expired_repo_leases=0,
)
)
assert "leased: 0" in rendered
assert "completed: 0" in rendered
assert "dead_letter: 0" in rendered
def test_queue_status_cli_defaults_to_dynamodb(monkeypatch, capsys):
store = FakeQueueStatusStore(
QueueStatus(
job_counts_by_status={"pending": 2, "completed": 1},
expired_job_leases=1,
expired_repo_leases=0,
)
)
monkeypatch.setattr(
"security_scanner.cli._store.create_finding_store",
lambda backend, **kwargs: store,
)
exit_code = main(["queue-status"])
captured = capsys.readouterr()
assert exit_code == 0
assert "pending: 2" in captured.out
assert "completed: 1" in captured.out
assert "expired job leases: 1" in captured.out
def test_queue_status_rejects_jsonl_backend(capsys):
exit_code = main(["queue-status", "--storage-backend", "jsonl"])
captured = capsys.readouterr()
assert exit_code == 2
assert "dynamodb only" in captured.err
def test_queue_status_storage_failure_exits_one(monkeypatch, capsys):
class BrokenStore:
def get_queue_status(self, now: dt.datetime) -> QueueStatus:
raise RuntimeError("synthetic queue status failure")
monkeypatch.setattr(
"security_scanner.cli._store.create_finding_store",
lambda backend, **kwargs: BrokenStore(),
)
exit_code = main(["queue-status"])
captured = capsys.readouterr()
assert exit_code == 1
assert "synthetic queue status failure" in captured.err