Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions governance/autopilot_goal.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ allowed_writes:
- src/security_scanner/cli/commands/operations.py
- src/security_scanner/cli/commands/scan.py
- src/security_scanner/cli/commands/verify.py
- src/security_scanner/core/finding/model.py
- src/security_scanner/runtime/incremental_discovery.py
- src/security_scanner/runtime/notification_log.py
- src/security_scanner/runtime/dead_letter_recovery.py
Expand All @@ -42,6 +43,7 @@ allowed_writes:
- tests/test_dead_letter_recovery.py
- tests/test_incremental_discovery.py
- tests/test_incremental_scan_storage.py
- tests/test_llm_verifier.py
- tests/test_personal_prod_systemd_units.py
- tests/test_scan_worker.py
- tests/test_verify_cli.py
Expand Down
12 changes: 10 additions & 2 deletions src/security_scanner/core/finding/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from collections.abc import Iterable
from dataclasses import dataclass, field


# ---------------------------------------------------------------------------
# Salt constant
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -339,10 +338,19 @@ def from_dict(cls, data: dict) -> "Location":
return cls(
file_path=data["filePath"],
line_start=int(data["lineStart"]),
line_end=data.get("lineEnd"),
line_end=_optional_int(data.get("lineEnd")),
)


def _optional_int(value: object) -> int | None:
if value is None or isinstance(value, bool):
return None
try:
return int(value)
except (TypeError, ValueError):
return None


@dataclass
class Evidence:
"""Evidence attached to a finding.
Expand Down
7 changes: 3 additions & 4 deletions src/security_scanner/runtime/verify_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@
# Verify jobs ride the same queue but as their own free-form job_type value. The
# string is intentionally NOT added to storage.base (incremental/baseline are the
# only freshness-bearing classes); a verify completion advances NO freshness
# field, so it must never reach the repo-health advance path. The code-scan
# worker enforces this by returning any leased job_type=="verify" job to pending
# before fetch/scan/_advance_repo_health (scan_worker.run_scan_worker_once); the
# dedicated drain path (drain_verify_jobs) is the only consumer of verify jobs.
# field, so it must never reach the repo-health advance path. Storage lease
# selection keeps code-scan workers on non-verify jobs; the dedicated drain path
# (drain_verify_jobs) is the only consumer of verify jobs.
JOB_TYPE_VERIFY = "verify"

# Verify jobs are the LOWEST queue precedence: the queue sorts ascending on
Expand Down
121 changes: 114 additions & 7 deletions src/security_scanner/storage/adapters/nosql_db/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import base64
import datetime as dt
import hashlib
import heapq
import json
import random
from collections import Counter
Expand Down Expand Up @@ -93,6 +94,7 @@
SCAN_DATE_AXIS,
SCAN_JOB_AXIS,
TARGET_LIST_AXIS,
legacy_list_axis_pk,
sharded_list_axis_pk,
)
from security_scanner.storage.adapters.nosql_db.list_axis_reader import (
Expand Down Expand Up @@ -627,7 +629,10 @@ def lease_next_scan_job(
window = max(dequeue_window, 1)

candidates = self._read_lease_candidate_window(
now=now, window=window, include_legacy=include_legacy
now=now,
window=window,
include_legacy=include_legacy,
exclude_job_type="verify",
)
eligible = [job for job in candidates if _scan_job_is_lease_eligible(job, now)]
if not eligible:
Expand Down Expand Up @@ -664,7 +669,7 @@ def lease_next_verify_job(
now = _ensure_utc(now)
window = max(dequeue_window, 1)
candidates = self._read_lease_candidate_window(
now=now, window=window, include_legacy=False
now=now, window=window, include_legacy=False, job_type="verify"
)
eligible = [
job
Expand Down Expand Up @@ -772,19 +777,30 @@ def _read_lease_candidate_window(
now: dt.datetime,
window: int,
include_legacy: bool,
job_type: str | None = None,
exclude_job_type: str | None = None,
) -> list[ScanJob]:
"""Return the bounded global head window across pending+leased shards.

Each status partition is read ordered+limited (``read_list_axis_ordered``
ascending), then the two status streams are merged and truncated to
``window`` by the same lease sort key the GSI sort key encodes. The read
is bounded by ``window`` per status, never a full-partition scan.
``window`` by the same lease sort key the GSI sort key encodes.
Optional job-type filters are applied in the index query path so a large
verifier backlog cannot fill a scan-worker window, and vice versa.
"""
pending = self._read_scan_jobs_by_status_bounded(
SCAN_JOB_STATUS_PENDING, limit=window, include_legacy=include_legacy
SCAN_JOB_STATUS_PENDING,
limit=window,
include_legacy=include_legacy,
job_type=job_type,
exclude_job_type=exclude_job_type,
)
leased = self._read_scan_jobs_by_status_bounded(
SCAN_JOB_STATUS_LEASED, limit=window, include_legacy=include_legacy
SCAN_JOB_STATUS_LEASED,
limit=window,
include_legacy=include_legacy,
job_type=job_type,
exclude_job_type=exclude_job_type,
)
merged = [*pending, *leased]
merged.sort(key=_scan_job_lease_sort_key)
Expand Down Expand Up @@ -1645,7 +1661,13 @@ def _dead_letter_job_matches(
return True

def _read_scan_jobs_by_status_bounded(
self, status: str, *, limit: int, include_legacy: bool = False
self,
status: str,
*,
limit: int,
include_legacy: bool = False,
job_type: str | None = None,
exclude_job_type: str | None = None,
) -> list[ScanJob]:
"""Read at most ``limit`` head jobs of one sharded status (FR-5/SC-1).

Expand All @@ -1654,7 +1676,23 @@ def _read_scan_jobs_by_status_bounded(
and a per-shard ``Limit``, so the read is bounded to the head window
instead of the whole partition. No new GSI: the gsi1sk sort key
``nextAttemptAt#priority#createdAt#jobId`` already encodes FIFO order.

When ``job_type`` or ``exclude_job_type`` is set, the returned candidate
count remains bounded by ``limit`` but the query may advance across
filtered-out pages. That preserves scan/verifier liveness without a
typed queue projection.
"""
if job_type is not None and exclude_job_type is not None:
raise ValueError("job_type and exclude_job_type are mutually exclusive")
if job_type is not None or exclude_job_type is not None:
items = self._read_scan_job_axis_filtered(
status,
limit=limit,
include_legacy=include_legacy,
job_type=job_type,
exclude_job_type=exclude_job_type,
)
return items_to_scan_jobs(items)
items = read_list_axis_ordered(
self._table,
spec=SCAN_JOB_AXIS,
Expand All @@ -1666,6 +1704,75 @@ def _read_scan_jobs_by_status_bounded(
)
return items_to_scan_jobs(items)

def _read_scan_job_axis_filtered(
self,
status: str,
*,
limit: int,
include_legacy: bool,
job_type: str | None,
exclude_job_type: str | None,
) -> list[dict[str, Any]]:
width = bucket_width(SCAN_JOB_AXIS.shard_count)
partitions = [
sharded_list_axis_pk(
SCAN_JOB_AXIS, f"SCAN_JOB_STATUS#{status}", f"{bucket:0{width}d}"
)
for bucket in range(SCAN_JOB_AXIS.shard_count)
]
if include_legacy:
partitions.append(legacy_list_axis_pk(f"SCAN_JOB_STATUS#{status}"))

per_partition = [
self._query_scan_job_axis_filtered_partition(
partition,
limit=limit,
job_type=job_type,
exclude_job_type=exclude_job_type,
)
for partition in partitions
]
merged = heapq.merge(
*per_partition,
key=lambda item: item.get(SCAN_JOB_AXIS.gsi_sk_field) or "",
)
deduped: list[dict[str, Any]] = []
seen: set[tuple[Any, Any]] = set()
for item in merged:
key = (item.get("PK"), item.get("SK"))
if key in seen:
continue
seen.add(key)
deduped.append(item)
if len(deduped) >= limit:
break
return deduped

def _query_scan_job_axis_filtered_partition(
self,
partition: str,
*,
limit: int,
job_type: str | None,
exclude_job_type: str | None,
) -> list[dict[str, Any]]:
values: dict[str, Any] = {":pk": partition}
query_args: dict[str, Any] = {
"IndexName": GSI1_NAME,
"KeyConditionExpression": f"{SCAN_JOB_AXIS.gsi_pk_field} = :pk",
"ExpressionAttributeValues": values,
"ScanIndexForward": True,
}
if job_type is not None:
values[":job_type"] = job_type
query_args["FilterExpression"] = "jobType = :job_type"
elif exclude_job_type is not None:
values[":excluded_job_type"] = exclude_job_type
query_args["FilterExpression"] = (
"attribute_not_exists(jobType) OR jobType <> :excluded_job_type"
)
return query_all_pages(self._table, limit=limit, **query_args)

def reap_expired_leases(self, now: dt.datetime) -> ReapSummary:
"""Reclaim expired job + repo leases on a timer (FR-6 lease-reaper).

Expand Down
103 changes: 101 additions & 2 deletions tests/test_incremental_scan_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from security_scanner.core.finding.model import Finding
from security_scanner.runtime.dead_letter_recovery import DeadLetterFilters
from security_scanner.runtime.verify_queue import JOB_TYPE_VERIFY
from security_scanner.storage.adapters.nosql_db.axis_core import axis_shard
from security_scanner.storage.adapters.nosql_db.items import (
finding_to_items,
ref_state_from_item,
Expand All @@ -22,6 +23,7 @@
scan_ledger_entry_from_item,
scan_ledger_entry_to_item,
)
from security_scanner.storage.adapters.nosql_db.list_axis import SCAN_JOB_AXIS
from security_scanner.storage.adapters.nosql_db.store import (
DynamoDbCompatibleFindingStore,
)
Expand Down Expand Up @@ -123,12 +125,13 @@ def query(self, **kwargs) -> dict:
page_items = items[: kwargs["Limit"]]
else:
page_items = items
filtered_items = self._apply_filter_expression(page_items, kwargs)
# Select=COUNT returns only a count and no item bodies, modeling real
# DynamoDB so the SC-7 read-API backlog path can prove it never reads
# SCAN_JOB rows (and never Scans the table).
if kwargs.get("Select") == "COUNT":
return {"Count": len(page_items), "ScannedCount": len(page_items)}
response = {"Items": [dict(item) for item in page_items]}
return {"Count": len(filtered_items), "ScannedCount": len(page_items)}
response = {"Items": [dict(item) for item in filtered_items]}
if "Limit" in kwargs and len(items) > len(page_items):
last = page_items[-1]
response["LastEvaluatedKey"] = {"PK": last["PK"], "SK": last["SK"]}
Expand Down Expand Up @@ -162,6 +165,27 @@ def _get_existing(self, pk: str, sk: str) -> dict | None:
return item
return None

@staticmethod
def _apply_filter_expression(items: list[dict], kwargs: dict) -> list[dict]:
expression = kwargs.get("FilterExpression")
values = kwargs.get("ExpressionAttributeValues", {})
if expression is None:
return items
if expression == "jobType = :job_type":
job_type = values[":job_type"]
return [item for item in items if item.get("jobType") == job_type]
if (
expression
== "attribute_not_exists(jobType) OR jobType <> :excluded_job_type"
):
excluded = values[":excluded_job_type"]
return [
item
for item in items
if "jobType" not in item or item.get("jobType") != excluded
]
return items

def _condition_allows(self, kwargs: dict, existing: dict | None) -> bool:
expression = kwargs.get("ConditionExpression")
if expression is None:
Expand Down Expand Up @@ -322,6 +346,19 @@ def _make_job(
)


def _same_scan_job_shard_commit_shas(count: int) -> list[str]:
by_shard: dict[str, list[str]] = {}
for value in range(10_000):
commit_sha = f"{value:040x}"
job = _make_job(commit_sha=commit_sha)
shard = axis_shard(job.job_id, shard_count=SCAN_JOB_AXIS.shard_count)
commits = by_shard.setdefault(shard, [])
commits.append(commit_sha)
if len(commits) == count:
return commits
raise AssertionError("unable to find enough synthetic jobs in one shard")


def _make_ledger(job: ScanJob) -> ScanLedgerEntry:
return ScanLedgerEntry(
repo_id=job.repo_id,
Expand Down Expand Up @@ -483,6 +520,68 @@ def test_lease_next_verify_job_only_leases_verify_jobs():
assert leased.worker_id == "verify-worker"


def test_lease_next_scan_job_ignores_verify_backlog_filling_window():
store, _ = _make_store()
commit_shas = _same_scan_job_shard_commit_shas(3)
verify_jobs = [
_make_job(
commit_sha=commit_sha,
job_type=JOB_TYPE_VERIFY,
next_attempt_at=NOW - dt.timedelta(minutes=3 - index),
)
for index, commit_sha in enumerate(commit_shas[:2])
]
scan_job = _make_job(
commit_sha=commit_shas[2],
next_attempt_at=NOW - dt.timedelta(seconds=30),
)
for verify_job in verify_jobs:
store.enqueue_commit_scan_job(verify_job)
store.enqueue_commit_scan_job(scan_job)

leased = store.lease_next_scan_job(
worker_id="scan-worker",
lease_seconds=60,
now=NOW,
dequeue_window=1,
)

assert leased is not None
assert leased.job_id == scan_job.job_id
assert leased.job_type != JOB_TYPE_VERIFY


def test_lease_next_verify_job_ignores_scan_backlog_filling_window():
store, _ = _make_store()
commit_shas = _same_scan_job_shard_commit_shas(3)
scan_jobs = [
_make_job(
commit_sha=commit_sha,
next_attempt_at=NOW - dt.timedelta(minutes=3 - index),
)
for index, commit_sha in enumerate(commit_shas[:2])
]
verify_job = _make_job(
commit_sha=commit_shas[2],
job_type=JOB_TYPE_VERIFY,
next_attempt_at=NOW - dt.timedelta(seconds=30),
)
for scan_job in scan_jobs:
store.enqueue_commit_scan_job(scan_job)
store.enqueue_commit_scan_job(verify_job)

leased = store.lease_next_verify_job(
worker_id="verify-worker",
lease_seconds=60,
now=NOW,
dequeue_window=1,
)

assert leased is not None
assert leased.job_id == verify_job.job_id
assert leased.job_type == JOB_TYPE_VERIFY


def test_finding_for_verify_job_reads_embedded_snapshot():
store, _ = _make_store()
finding = _make_finding(_make_job())
Expand Down
Loading
Loading