-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscan_worker.py
More file actions
361 lines (305 loc) · 12.1 KB
/
Copy pathscan_worker.py
File metadata and controls
361 lines (305 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""Incremental scan worker runtime."""
from __future__ import annotations
import datetime as dt
import time
import uuid
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol
from security_scanner.core.finding.model import Finding
from security_scanner.core.scan.options import ScanOptions
from security_scanner.runtime.baseline_enqueue import BASELINE_COMMIT_SENTINEL
from security_scanner.runtime.branch_residual import (
branch_from_ref,
finding_with_context,
)
from security_scanner.runtime.verify_queue import JOB_TYPE_VERIFY
from security_scanner.scanners.gitleaks.scanner import GitleaksScanner
from security_scanner.storage.base import (
JOB_TYPE_BASELINE,
IncrementalScanStore,
ScanJob,
ScanLedgerEntry,
)
# Default lease duration in seconds; used as the default for
# ``ScanWorkerRequest.lease_seconds``, which is passed to both the job lease and
# the per-repo lease.
DEFAULT_LEASE_SECONDS = 300
# Default delay in seconds added to "now" to compute ``next_attempt_at`` when a
# failure is recorded; the default for ``ScanWorkerRequest.retry_delay_seconds``.
DEFAULT_RETRY_DELAY_SECONDS = 60
# Optional async-verify enqueue hook (M3 path 2). Called best-effort in the hot
# path after a successful completion with
# ``(store, findings, origin_job=..., now=...)``; it enqueues a
# ``job_type="verify"`` job per ambiguous finding (no LLM call here). Defaults to
# None so the worker's pre-M3 behavior is byte-identical when the async tier is
# not wired (offline box / verifier disabled).
VerifyEnqueue = Callable[..., object]
class CommitScanner(Protocol):
    """Scanner capability needed by scan-worker.

    Structural interface: any object exposing a compatible keyword-only
    ``scan`` method can be supplied as ``ScanWorkerRequest.scanner``.
    """

    def scan(
        self,
        *,
        repo_full_name: str,
        root: Path,
        scan_options: ScanOptions | None,
        scan_run_id: str,
        rule_pack_version: str,
    ) -> list[Finding]:
        """Scan a local checkout and return findings.

        Args:
            repo_full_name: Repository identifier; the worker passes
                ``job.repo_id``.
            root: Path of the local checkout; the worker passes the value
                returned by ``fetch_repo``.
            scan_options: Scan scope options, or None.
            scan_run_id: Identifier for this scan run.
            rule_pack_version: Rule pack version recorded on the job.

        Returns:
            The findings produced by the scan.
        """
@dataclass(frozen=True)
class ScanWorkerRequest:
    """Inputs for one bounded scan-worker invocation.

    Frozen dataclass: the same request can be reused across polls.
    """

    # Backend used to lease jobs, check/write the scan ledger, manage repo
    # leases, record failures and advance repo health.
    store: IncrementalScanStore
    # Called with ``job.repo_url``; its return value is handed to the scanner
    # as ``root``.
    fetch_repo: Callable[[str], Path]
    # Scanner invoked once per job that is actually scanned.
    scanner: CommitScanner
    # Upper bound on lease attempts per invocation; negative values are treated
    # as zero by the worker loop.
    max_jobs: int = 1
    # Lease duration passed to both the job lease and the repo lease.
    lease_seconds: int = DEFAULT_LEASE_SECONDS
    # Worker identity; when None the worker generates ``worker_<12 hex chars>``
    # for each invocation.
    worker_id: str | None = None
    # Seconds added to the current time to compute a failed job's
    # ``next_attempt_at``.
    retry_delay_seconds: int = DEFAULT_RETRY_DELAY_SECONDS
    # Clock seam; the default returns the current UTC time with microseconds
    # zeroed.
    now_factory: Callable[[], dt.datetime] = lambda: dt.datetime.now(dt.UTC).replace(
        microsecond=0
    )
    # Async LLM-verify enqueue hook (M3 path 2). None keeps pre-M3 behavior.
    verify_enqueue: VerifyEnqueue | None = None
@dataclass(frozen=True)
class ScanWorkerSummary:
    """Operator-facing scan-worker summary."""

    # Jobs handed out by the store during the invocation, including jobs that
    # were immediately returned to pending.
    leased: int = 0
    # Jobs completed, either scanned or short-circuited by an existing ledger
    # entry.
    completed: int = 0
    # Failed jobs that still had attempts left when the failure was recorded.
    retryable: int = 0
    # Failed jobs whose failure used up their last allowed attempt.
    dead_lettered: int = 0

    @property
    def has_permanent_failure(self) -> bool:
        """Return whether the CLI should exit 2."""
        return self.dead_lettered > 0
def run_scan_worker_once(request: ScanWorkerRequest) -> ScanWorkerSummary:
    """Lease and process up to max_jobs queued scan jobs.

    Each loop iteration leases one job from ``request.store`` and handles it in
    exactly one of these ways:

    * ``job_type == JOB_TYPE_VERIFY``: returned to pending untouched.
    * ledger key already recorded: completed with no findings and repo health
      advanced, without fetching or scanning.
    * repo lease not acquired: returned to pending.
    * otherwise: fetched, scanned, findings stamped with commit/branch context,
      completed, repo health advanced and the verify-enqueue hook invoked.

    Any exception raised inside the fetch/scan/complete section is recorded
    through ``record_retryable_failure`` and counted as retryable or
    dead-lettered; the repo lease is released in a ``finally``. The loop ends
    early as soon as the store hands out no job.

    Args:
        request: Store, repo fetcher, scanner and tuning knobs for this run.

    Returns:
        Counts for this invocation. ``leased`` includes jobs that were
        returned to pending without being processed.
    """
    # Fall back to a random per-invocation identity when the caller gave none.
    worker_id = request.worker_id or f"worker_{uuid.uuid4().hex[:12]}"
    leased_count = 0
    completed = 0
    retryable = 0
    dead_lettered = 0
    # A negative max_jobs yields zero iterations.
    for _ in range(max(request.max_jobs, 0)):
        now = _now(request)
        job = request.store.lease_next_scan_job(
            worker_id=worker_id,
            lease_seconds=request.lease_seconds,
            now=now,
        )
        if job is None:
            # The store handed out no job: stop this invocation early.
            break
        leased_count += 1
        # M3 guard: a job_type="verify" job belongs to the async-verify drain
        # path, NOT this code-scan worker. If the queue ever hands one here
        # (e.g. real scan work is drained and only verify jobs remain), it must
        # never reach fetch_repo / scanner.scan / _advance_repo_health — its
        # "commit" is a synthetic per-finding marker and it advances no repo
        # freshness. Return it to pending so the dedicated drain path leases it.
        # NOTE(review): the job still counts toward ``leased`` — confirm that
        # callers treating ``leased > 0`` as "work was done" are fine with that.
        if job.job_type == JOB_TYPE_VERIFY:
            request.store.return_job_to_pending(
                job.job_id, "verify job is not handled by the code-scan worker"
            )
            continue
        # Already-scanned short-circuit: complete the job with no findings and
        # skip fetch/scan entirely. No repo lease is taken on this path.
        # NOTE(review): a fresh ledger entry with finding_count=0 is passed
        # here — confirm the store does not overwrite the existing entry.
        if request.store.has_scan_ledger(job.ledger_key):
            request.store.complete_processed_job(
                job,
                findings=[],
                ledger=_ledger_for_job(job, scanned_at=now, finding_count=0),
            )
            _advance_repo_health(request, job, completed_at=now)
            completed += 1
            continue
        repo_fence = request.store.acquire_repo_lease(
            job.repo_id,
            worker_id,
            request.lease_seconds,
        )
        if not repo_fence:
            # FR-6 skip-bug fix: one unavailable repo must NOT stop the whole
            # invocation. Return just this job to pending and CONTINUE to the
            # next job so the N-process worker pool keeps draining work.
            request.store.return_job_to_pending(job.job_id, "repo lease unavailable")
            continue
        try:
            repo_path = request.fetch_repo(job.repo_url)
            scan_run_id = _scan_run_id_for_job(job)
            findings = request.scanner.scan(
                repo_full_name=job.repo_id,
                root=repo_path,
                scan_options=_scan_options_for_job(job),
                scan_run_id=scan_run_id,
                rule_pack_version=job.rule_pack_version,
            )
            # Stamp every finding with the job's commit/branch context
            # (both None for baseline jobs).
            commit, branch = _finding_context_for_job(job)
            findings = [
                finding_with_context(finding, commit=commit, branch=branch)
                for finding in findings
            ]
            # Re-read the clock so the ledger records the post-scan time rather
            # than the lease time.
            scanned_at = _now(request)
            request.store.complete_processed_job(
                job,
                findings=findings,
                ledger=_ledger_for_job(
                    job,
                    scanned_at=scanned_at,
                    finding_count=len(findings),
                ),
            )
            _advance_repo_health(request, job, completed_at=scanned_at)
            # M3 path 2 (async LLM tier): hand the completed findings to the
            # verify-enqueue seam so ambiguous findings become separate
            # job_type="verify" jobs drained off the hot path. NO LLM call here.
            # Best-effort: an enqueue failure must not roll back a completed scan.
            _enqueue_verify_jobs(request, job, findings, now=scanned_at)
            completed += 1
        except Exception as exc:  # noqa: BLE001 - scanner/runtime failure is retryable until exhausted.
            # This failure counts as one more attempt; reaching max_attempts
            # makes it a dead-letter in the summary.
            if job.attempts + 1 >= job.max_attempts:
                dead_lettered += 1
            else:
                retryable += 1
            # Fence the failure write with the leased job's worker_id+fence so a
            # reaped/slow original worker cannot stamp a failure over a job the
            # reaper already returned to another worker (FR-6/SC-2).
            request.store.record_retryable_failure(
                job.job_id,
                error=str(exc),
                next_attempt_at=_now(request)
                + dt.timedelta(seconds=request.retry_delay_seconds),
                worker_id=worker_id,
                fence=job.fence,
            )
        finally:
            # Release only the repo lease this worker still owns: the repo fence
            # rejects a stale release after the lease was reaped + re-acquired.
            request.store.release_repo_lease(job.repo_id, worker_id, fence=repo_fence)
    return ScanWorkerSummary(
        leased=leased_count,
        completed=completed,
        retryable=retryable,
        dead_lettered=dead_lettered,
    )
@dataclass(frozen=True)
class ScanWorkerDaemonSummary:
    """Aggregated summary across a daemon poll loop."""

    # Number of ``run_scan_worker_once`` invocations the loop performed.
    polls: int = 0
    # The remaining counters are sums of the same-named per-poll
    # ``ScanWorkerSummary`` fields.
    leased: int = 0
    completed: int = 0
    retryable: int = 0
    dead_lettered: int = 0

    @property
    def has_permanent_failure(self) -> bool:
        """Return whether any poll dead-lettered at least one job."""
        return self.dead_lettered > 0
def run_scan_worker(
    request: ScanWorkerRequest,
    *,
    poll_interval_seconds: float,
    max_polls: int | None = None,
    sleep: Callable[[float], None] = time.sleep,
    should_continue: Callable[[], bool] = lambda: True,
) -> ScanWorkerDaemonSummary:
    """Poll the queue until shutdown, draining work then idling.

    Each poll runs one bounded ``run_scan_worker_once``. When a poll finishes
    at least one job (completed, retryable or dead-lettered) the loop polls
    again immediately to drain the backlog. When a poll finishes nothing —
    either no job was leased, or every leased job was handed straight back to
    pending (verify jobs, repo lease unavailable) — the loop sleeps
    ``poll_interval_seconds`` first, so it never re-polls without back-off
    while the only available jobs are ones this worker cannot process.

    Args:
        request: Inputs forwarded unchanged to every ``run_scan_worker_once``.
        poll_interval_seconds: Idle back-off between unproductive polls.
        max_polls: Optional cap on the number of polls; None means unbounded.
        sleep: Sleep function, injected so tests stay deterministic.
        should_continue: Shutdown probe; the loop stops once it returns False.

    Returns:
        Totals of the per-poll summaries plus the number of polls performed.
    """
    polls = leased = completed = retryable = dead_lettered = 0

    def _bounded() -> bool:
        # Reads the enclosing ``polls`` counter on every call.
        return max_polls is None or polls < max_polls

    while should_continue() and _bounded():
        summary = run_scan_worker_once(request)
        polls += 1
        leased += summary.leased
        completed += summary.completed
        retryable += summary.retryable
        dead_lettered += summary.dead_lettered
        # ``leased`` also counts jobs that were only returned to pending, so it
        # cannot tell "did work" from "bounced jobs". Use finished jobs instead.
        finished = summary.completed + summary.retryable + summary.dead_lettered
        # Skip the sleep when the loop is about to exit anyway.
        if finished == 0 and should_continue() and _bounded():
            sleep(poll_interval_seconds)
    return ScanWorkerDaemonSummary(
        polls=polls,
        leased=leased,
        completed=completed,
        retryable=retryable,
        dead_lettered=dead_lettered,
    )
def make_default_scanner() -> GitleaksScanner:
    """Build the scanner used when the caller does not supply one."""
    scanner = GitleaksScanner()
    return scanner
def _scan_options_for_job(job: ScanJob) -> ScanOptions:
    """Build the scan options that match a queued job's scope.

    Baseline jobs scan with history enabled and no extra git-log restriction.
    Every other job also enables history but narrows the log to the single
    commit named by ``job.commit_sha``.
    """
    if not _is_baseline_job(job):
        # ``<sha>^!`` is git's revision notation for "this commit, excluding
        # all of its parents".
        single_commit = f"{job.commit_sha}^!"
        return ScanOptions(include_history=True, git_log_opts=single_commit)
    return ScanOptions(include_history=True)
def _finding_context_for_job(job: ScanJob) -> tuple[str | None, str | None]:
    """Return the ``(commit, branch)`` pair to stamp onto a job's findings.

    Baseline jobs yield ``(None, None)``. Other jobs yield their commit SHA and
    the branch that ``branch_from_ref`` derives from ``job.ref_name``.
    """
    if not _is_baseline_job(job):
        return job.commit_sha, branch_from_ref(job.ref_name)
    return None, None
def _is_baseline_job(job: ScanJob) -> bool:
    """Tell whether a job must run as a full-history baseline scan.

    A job qualifies when its type is the baseline type or when its commit SHA
    is the baseline sentinel.
    """
    if job.job_type == JOB_TYPE_BASELINE:
        return True
    return job.commit_sha == BASELINE_COMMIT_SENTINEL
def _repo_health_job_type(job: ScanJob) -> str:
    """Return the job type under which repo health should be advanced.

    Anything recognized as a baseline job maps to the baseline type, even when
    its own ``job_type`` says otherwise; other jobs keep their ``job_type``.
    """
    return JOB_TYPE_BASELINE if _is_baseline_job(job) else job.job_type
def _advance_repo_health(
    request: ScanWorkerRequest, job: ScanJob, *, completed_at: dt.datetime
) -> None:
    """Record a successful completion in the repo's freshness state (FR-7/SC-5).

    The store is told which kind of completion this was, so incremental and
    baseline completions advance their own fields. Baseline jobs that carry
    only the commit sentinel (no explicit baseline ``job_type``) are still
    reported as baseline.
    """
    health_job_type = _repo_health_job_type(job)
    store = request.store
    store.advance_repo_health(
        job.repo_id,
        job_type=health_job_type,
        completed_at=completed_at,
    )
def _enqueue_verify_jobs(
    request: ScanWorkerRequest,
    job: ScanJob,
    findings: list[Finding],
    *,
    now: dt.datetime,
) -> None:
    """Hand completed findings to the async verify-enqueue seam (M3 path 2).

    Best-effort: the scan already completed, so an enqueue failure must never
    fail the job or trigger a retry. A failure is logged as a warning (with
    traceback) instead of being dropped silently, so a broken async tier is
    visible to operators. No-op when no hook is wired (pre-M3 behavior), so
    the worker default path is unchanged.

    Args:
        request: Supplies the optional ``verify_enqueue`` hook and the store.
        job: The job whose scan just completed; passed as ``origin_job``.
        findings: The findings stored for that job.
        now: Timestamp forwarded to the hook.
    """
    hook = request.verify_enqueue
    if hook is None:
        return
    try:
        hook(request.store, findings, origin_job=job, now=now)
    except Exception:  # noqa: BLE001 - async enqueue is downstream of a done scan.
        # Function-scope import keeps this block self-contained; it only runs
        # on the failure path.
        import logging

        # getattr with a default so the log call itself can never raise.
        logging.getLogger(__name__).warning(
            "verify enqueue failed for job %s; completed scan is unaffected",
            getattr(job, "job_id", "<unknown>"),
            exc_info=True,
        )
def _scan_run_id_for_job(job: ScanJob) -> str:
    """Derive the scan-run identifier for a job from its job id."""
    job_id = job.job_id
    return f"scan_run_{job_id}"
def _ledger_for_job(
    job: ScanJob,
    *,
    scanned_at: dt.datetime,
    finding_count: int,
) -> ScanLedgerEntry:
    """Build the ledger entry that records one job's scan.

    Identity fields (repo, commit, scanner name/version, rule pack version,
    config hash, job id) are copied from the job. The scan-run id is derived
    from the job, and the timestamp and finding count come from the caller.
    """
    run_id = _scan_run_id_for_job(job)
    entry = ScanLedgerEntry(
        repo_id=job.repo_id,
        commit_sha=job.commit_sha,
        scanner_name=job.scanner_name,
        scanner_version=job.scanner_version,
        rule_pack_version=job.rule_pack_version,
        scanner_config_hash=job.scanner_config_hash,
        scan_run_id=run_id,
        job_id=job.job_id,
        scanned_at=scanned_at,
        finding_count=finding_count,
    )
    return entry
def _now(request: ScanWorkerRequest) -> dt.datetime:
value = request.now_factory()
if value.tzinfo is None:
return value.replace(tzinfo=dt.UTC)
return value.astimezone(dt.UTC).replace(microsecond=0)