From 5e9a65d7fd32da58a386fc2883f22a9fadc236ff Mon Sep 17 00:00:00 2001 From: lsh1756 Date: Fri, 26 Jun 2026 21:41:33 +0900 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20verifier=20=ED=81=90=20drain=20?= =?UTF-8?q?=EB=9F=B0=ED=83=80=EC=9E=84=20=EC=98=A4=EB=A5=98=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/security_scanner/core/finding/model.py | 3 ++- .../storage/adapters/nosql_db/store.py | 6 +++++- tests/test_incremental_scan_storage.py | 18 ++++++++++++++++++ tests/test_llm_verifier.py | 14 +++++++++++++- 4 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/security_scanner/core/finding/model.py b/src/security_scanner/core/finding/model.py index 45c43cf..2848132 100644 --- a/src/security_scanner/core/finding/model.py +++ b/src/security_scanner/core/finding/model.py @@ -336,10 +336,11 @@ def to_dict(self) -> dict: @classmethod def from_dict(cls, data: dict) -> "Location": + line_end = data.get("lineEnd") return cls( file_path=data["filePath"], line_start=int(data["lineStart"]), - line_end=data.get("lineEnd"), + line_end=int(line_end) if line_end is not None else None, ) diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py index e5f363f..957d385 100644 --- a/src/security_scanner/storage/adapters/nosql_db/store.py +++ b/src/security_scanner/storage/adapters/nosql_db/store.py @@ -629,7 +629,11 @@ def lease_next_scan_job( candidates = self._read_lease_candidate_window( now=now, window=window, include_legacy=include_legacy ) - eligible = [job for job in candidates if _scan_job_is_lease_eligible(job, now)] + eligible = [ + job + for job in candidates + if job.job_type != "verify" and _scan_job_is_lease_eligible(job, now) + ] if not eligible: return None diff --git a/tests/test_incremental_scan_storage.py b/tests/test_incremental_scan_storage.py index a2aafa1..d68ecc0 100644 --- a/tests/test_incremental_scan_storage.py +++ b/tests/test_incremental_scan_storage.py @@ -483,6 +483,24 @@ def test_lease_next_verify_job_only_leases_verify_jobs(): assert leased.worker_id == "verify-worker" +def test_lease_next_scan_job_ignores_verify_jobs(): + store, _ = _make_store() + verify_job = _make_job(commit_sha="1" * 40, job_type=JOB_TYPE_VERIFY) + scan_job = _make_job(commit_sha="2" * 40) + store.enqueue_commit_scan_job(verify_job) + store.enqueue_commit_scan_job(scan_job) + + leased = store.lease_next_scan_job( + worker_id="scan-worker", + lease_seconds=60, + now=NOW, + ) + + assert leased is not None + assert leased.job_id == scan_job.job_id + assert leased.job_type != JOB_TYPE_VERIFY + + def test_finding_for_verify_job_reads_embedded_snapshot(): store, _ = _make_store() finding = _make_finding(_make_job()) diff --git a/tests/test_llm_verifier.py b/tests/test_llm_verifier.py index 1d634e5..90f5c59 100644 --- a/tests/test_llm_verifier.py +++ b/tests/test_llm_verifier.py @@ -8,6 +8,7 @@ import json import threading +from decimal import Decimal from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from security_scanner.core.finding.model import ( @@ -24,7 +25,6 @@ ) from security_scanner.llm.ollama.client import OllamaChatVerifier - FAKE_SECRET = "SCANNER_FAKE_SECRET_TOKEN_111111" FAKE_MATCH = f"token={FAKE_SECRET}" @@ -88,6 +88,18 @@ def test_prompt_surfaces_configuration_match_as_true_positive(): assert "Current finding matched label: true_positive." in prompt +def test_prompt_accepts_dynamodb_decimal_line_end_snapshot(): + payload = _finding(file_path="config/positive.env", line_start=4).to_dict() + payload["location"]["lineStart"] = Decimal("4") + payload["location"]["lineEnd"] = Decimal("4") + + finding = Finding.from_dict(payload) + prompt = build_redacted_prompt(finding) + + assert '"lineStart":4' in prompt + assert '"lineEnd":4' in prompt + + def test_valid_json_maps_to_verifier_verdict(): result = parse_verifier_response( json.dumps( From 35b1be83ca7e69982f2f4ab49e5b547235764df7 Mon Sep 17 00:00:00 2001 From: lsh1756 Date: Sat, 27 Jun 2026 15:40:59 +0900 Subject: [PATCH 2/3] =?UTF-8?q?chore:=20verifier=20drain=20goal=20?= =?UTF-8?q?=EB=B2=94=EC=9C=84=20=EA=B0=B1=EC=8B=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- governance/autopilot_goal.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/governance/autopilot_goal.yml b/governance/autopilot_goal.yml index 3f6633c..25e2223 100644 --- a/governance/autopilot_goal.yml +++ b/governance/autopilot_goal.yml @@ -26,6 +26,7 @@ allowed_writes: - src/security_scanner/cli/commands/operations.py - src/security_scanner/cli/commands/scan.py - src/security_scanner/cli/commands/verify.py + - src/security_scanner/core/finding/model.py - src/security_scanner/runtime/incremental_discovery.py - src/security_scanner/runtime/notification_log.py - src/security_scanner/runtime/dead_letter_recovery.py @@ -42,6 +43,7 @@ allowed_writes: - tests/test_dead_letter_recovery.py - tests/test_incremental_discovery.py - tests/test_incremental_scan_storage.py + - tests/test_llm_verifier.py - tests/test_personal_prod_systemd_units.py - tests/test_scan_worker.py - tests/test_verify_cli.py From 7a21b36d6788c3a7283f08d2b0dee20446a11261 Mon Sep 17 00:00:00 2001 From: lsh1756 Date: Sat, 27 Jun 2026 15:56:20 +0900 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20verifier=20=ED=81=90=20lease=20?= =?UTF-8?q?=EB=B6=84=EB=A6=AC=20=EB=B3=B4=EA=B0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/security_scanner/core/finding/model.py | 13 +- src/security_scanner/runtime/verify_queue.py | 7 +- .../storage/adapters/nosql_db/store.py | 127 ++++++++++++++++-- tests/test_incremental_scan_storage.py | 93 ++++++++++++- tests/test_llm_verifier.py | 11 ++ 5 files changed, 226 insertions(+), 25 deletions(-) diff --git a/src/security_scanner/core/finding/model.py b/src/security_scanner/core/finding/model.py index 2848132..2494369 100644 --- a/src/security_scanner/core/finding/model.py +++ b/src/security_scanner/core/finding/model.py @@ -25,7 +25,6 @@ from collections.abc import Iterable from dataclasses import dataclass, field - # --------------------------------------------------------------------------- # Salt constant # --------------------------------------------------------------------------- @@ -336,14 +335,22 @@ def to_dict(self) -> dict: @classmethod def from_dict(cls, data: dict) -> "Location": - line_end = data.get("lineEnd") return cls( file_path=data["filePath"], line_start=int(data["lineStart"]), - line_end=int(line_end) if line_end is not None else None, + line_end=_optional_int(data.get("lineEnd")), ) +def _optional_int(value: object) -> int | None: + if value is None or isinstance(value, bool): + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + @dataclass class Evidence: """Evidence attached to a finding. diff --git a/src/security_scanner/runtime/verify_queue.py b/src/security_scanner/runtime/verify_queue.py index 8e9013d..7e17adb 100644 --- a/src/security_scanner/runtime/verify_queue.py +++ b/src/security_scanner/runtime/verify_queue.py @@ -54,10 +54,9 @@ # Verify jobs ride the same queue but as their own free-form job_type value. The # string is intentionally NOT added to storage.base (incremental/baseline are the # only freshness-bearing classes); a verify completion advances NO freshness -# field, so it must never reach the repo-health advance path. The code-scan -# worker enforces this by returning any leased job_type=="verify" job to pending -# before fetch/scan/_advance_repo_health (scan_worker.run_scan_worker_once); the -# dedicated drain path (drain_verify_jobs) is the only consumer of verify jobs. +# field, so it must never reach the repo-health advance path. Storage lease +# selection keeps code-scan workers on non-verify jobs; the dedicated drain path +# (drain_verify_jobs) is the only consumer of verify jobs. JOB_TYPE_VERIFY = "verify" # Verify jobs are the LOWEST queue precedence: the queue sorts ascending on diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py index 957d385..891196e 100644 --- a/src/security_scanner/storage/adapters/nosql_db/store.py +++ b/src/security_scanner/storage/adapters/nosql_db/store.py @@ -5,6 +5,7 @@ import base64 import datetime as dt import hashlib +import heapq import json import random from collections import Counter @@ -93,6 +94,7 @@ SCAN_DATE_AXIS, SCAN_JOB_AXIS, TARGET_LIST_AXIS, + legacy_list_axis_pk, sharded_list_axis_pk, ) from security_scanner.storage.adapters.nosql_db.list_axis_reader import ( @@ -627,13 +629,12 @@ def lease_next_scan_job( window = max(dequeue_window, 1) candidates = self._read_lease_candidate_window( - now=now, window=window, include_legacy=include_legacy + now=now, + window=window, + include_legacy=include_legacy, + exclude_job_type="verify", ) - eligible = [ - job - for job in candidates - if job.job_type != "verify" and _scan_job_is_lease_eligible(job, now) - ] + eligible = [job for job in candidates if _scan_job_is_lease_eligible(job, now)] if not eligible: return None @@ -668,7 +669,7 @@ def lease_next_verify_job( now = _ensure_utc(now) window = max(dequeue_window, 1) candidates = self._read_lease_candidate_window( - now=now, window=window, include_legacy=False + now=now, window=window, include_legacy=False, job_type="verify" ) eligible = [ job @@ -776,19 +777,30 @@ def _read_lease_candidate_window( now: dt.datetime, window: int, include_legacy: bool, + job_type: str | None = None, + exclude_job_type: str | None = None, ) -> list[ScanJob]: """Return the bounded global head window across pending+leased shards. Each status partition is read ordered+limited (``read_list_axis_ordered`` ascending), then the two status streams are merged and truncated to - ``window`` by the same lease sort key the GSI sort key encodes. The read - is bounded by ``window`` per status, never a full-partition scan. + ``window`` by the same lease sort key the GSI sort key encodes. + Optional job-type filters are applied in the index query path so a large + verifier backlog cannot fill a scan-worker window, and vice versa. """ pending = self._read_scan_jobs_by_status_bounded( - SCAN_JOB_STATUS_PENDING, limit=window, include_legacy=include_legacy + SCAN_JOB_STATUS_PENDING, + limit=window, + include_legacy=include_legacy, + job_type=job_type, + exclude_job_type=exclude_job_type, ) leased = self._read_scan_jobs_by_status_bounded( - SCAN_JOB_STATUS_LEASED, limit=window, include_legacy=include_legacy + SCAN_JOB_STATUS_LEASED, + limit=window, + include_legacy=include_legacy, + job_type=job_type, + exclude_job_type=exclude_job_type, ) merged = [*pending, *leased] merged.sort(key=_scan_job_lease_sort_key) @@ -1649,7 +1661,13 @@ def _dead_letter_job_matches( return True def _read_scan_jobs_by_status_bounded( - self, status: str, *, limit: int, include_legacy: bool = False + self, + status: str, + *, + limit: int, + include_legacy: bool = False, + job_type: str | None = None, + exclude_job_type: str | None = None, ) -> list[ScanJob]: """Read at most ``limit`` head jobs of one sharded status (FR-5/SC-1). @@ -1658,7 +1676,23 @@ def _read_scan_jobs_by_status_bounded( and a per-shard ``Limit``, so the read is bounded to the head window instead of the whole partition. No new GSI: the gsi1sk sort key ``nextAttemptAt#priority#createdAt#jobId`` already encodes FIFO order. + + When ``job_type`` or ``exclude_job_type`` is set, the returned candidate + count remains bounded by ``limit`` but the query may advance across + filtered-out pages. That preserves scan/verifier liveness without a + typed queue projection. """ + if job_type is not None and exclude_job_type is not None: + raise ValueError("job_type and exclude_job_type are mutually exclusive") + if job_type is not None or exclude_job_type is not None: + items = self._read_scan_job_axis_filtered( + status, + limit=limit, + include_legacy=include_legacy, + job_type=job_type, + exclude_job_type=exclude_job_type, + ) + return items_to_scan_jobs(items) items = read_list_axis_ordered( self._table, spec=SCAN_JOB_AXIS, @@ -1670,6 +1704,75 @@ def _read_scan_jobs_by_status_bounded( ) return items_to_scan_jobs(items) + def _read_scan_job_axis_filtered( + self, + status: str, + *, + limit: int, + include_legacy: bool, + job_type: str | None, + exclude_job_type: str | None, + ) -> list[dict[str, Any]]: + width = bucket_width(SCAN_JOB_AXIS.shard_count) + partitions = [ + sharded_list_axis_pk( + SCAN_JOB_AXIS, f"SCAN_JOB_STATUS#{status}", f"{bucket:0{width}d}" + ) + for bucket in range(SCAN_JOB_AXIS.shard_count) + ] + if include_legacy: + partitions.append(legacy_list_axis_pk(f"SCAN_JOB_STATUS#{status}")) + + per_partition = [ + self._query_scan_job_axis_filtered_partition( + partition, + limit=limit, + job_type=job_type, + exclude_job_type=exclude_job_type, + ) + for partition in partitions + ] + merged = heapq.merge( + *per_partition, + key=lambda item: item.get(SCAN_JOB_AXIS.gsi_sk_field) or "", + ) + deduped: list[dict[str, Any]] = [] + seen: set[tuple[Any, Any]] = set() + for item in merged: + key = (item.get("PK"), item.get("SK")) + if key in seen: + continue + seen.add(key) + deduped.append(item) + if len(deduped) >= limit: + break + return deduped + + def _query_scan_job_axis_filtered_partition( + self, + partition: str, + *, + limit: int, + job_type: str | None, + exclude_job_type: str | None, + ) -> list[dict[str, Any]]: + values: dict[str, Any] = {":pk": partition} + query_args: dict[str, Any] = { + "IndexName": GSI1_NAME, + "KeyConditionExpression": f"{SCAN_JOB_AXIS.gsi_pk_field} = :pk", + "ExpressionAttributeValues": values, + "ScanIndexForward": True, + } + if job_type is not None: + values[":job_type"] = job_type + query_args["FilterExpression"] = "jobType = :job_type" + elif exclude_job_type is not None: + values[":excluded_job_type"] = exclude_job_type + query_args["FilterExpression"] = ( + "attribute_not_exists(jobType) OR jobType <> :excluded_job_type" + ) + return query_all_pages(self._table, limit=limit, **query_args) + def reap_expired_leases(self, now: dt.datetime) -> ReapSummary: """Reclaim expired job + repo leases on a timer (FR-6 lease-reaper). diff --git a/tests/test_incremental_scan_storage.py b/tests/test_incremental_scan_storage.py index d68ecc0..c194eaf 100644 --- a/tests/test_incremental_scan_storage.py +++ b/tests/test_incremental_scan_storage.py @@ -9,6 +9,7 @@ from security_scanner.core.finding.model import Finding from security_scanner.runtime.dead_letter_recovery import DeadLetterFilters from security_scanner.runtime.verify_queue import JOB_TYPE_VERIFY +from security_scanner.storage.adapters.nosql_db.axis_core import axis_shard from security_scanner.storage.adapters.nosql_db.items import ( finding_to_items, ref_state_from_item, @@ -22,6 +23,7 @@ scan_ledger_entry_from_item, scan_ledger_entry_to_item, ) +from security_scanner.storage.adapters.nosql_db.list_axis import SCAN_JOB_AXIS from security_scanner.storage.adapters.nosql_db.store import ( DynamoDbCompatibleFindingStore, ) @@ -123,12 +125,13 @@ def query(self, **kwargs) -> dict: page_items = items[: kwargs["Limit"]] else: page_items = items + filtered_items = self._apply_filter_expression(page_items, kwargs) # Select=COUNT returns only a count and no item bodies, modeling real # DynamoDB so the SC-7 read-API backlog path can prove it never reads # SCAN_JOB rows (and never Scans the table). if kwargs.get("Select") == "COUNT": - return {"Count": len(page_items), "ScannedCount": len(page_items)} - response = {"Items": [dict(item) for item in page_items]} + return {"Count": len(filtered_items), "ScannedCount": len(page_items)} + response = {"Items": [dict(item) for item in filtered_items]} if "Limit" in kwargs and len(items) > len(page_items): last = page_items[-1] response["LastEvaluatedKey"] = {"PK": last["PK"], "SK": last["SK"]} @@ -162,6 +165,27 @@ def _get_existing(self, pk: str, sk: str) -> dict | None: return item return None + @staticmethod + def _apply_filter_expression(items: list[dict], kwargs: dict) -> list[dict]: + expression = kwargs.get("FilterExpression") + values = kwargs.get("ExpressionAttributeValues", {}) + if expression is None: + return items + if expression == "jobType = :job_type": + job_type = values[":job_type"] + return [item for item in items if item.get("jobType") == job_type] + if ( + expression + == "attribute_not_exists(jobType) OR jobType <> :excluded_job_type" + ): + excluded = values[":excluded_job_type"] + return [ + item + for item in items + if "jobType" not in item or item.get("jobType") != excluded + ] + return items + def _condition_allows(self, kwargs: dict, existing: dict | None) -> bool: expression = kwargs.get("ConditionExpression") if expression is None: @@ -322,6 +346,19 @@ def _make_job( ) +def _same_scan_job_shard_commit_shas(count: int) -> list[str]: + by_shard: dict[str, list[str]] = {} + for value in range(10_000): + commit_sha = f"{value:040x}" + job = _make_job(commit_sha=commit_sha) + shard = axis_shard(job.job_id, shard_count=SCAN_JOB_AXIS.shard_count) + commits = by_shard.setdefault(shard, []) + commits.append(commit_sha) + if len(commits) == count: + return commits + raise AssertionError("unable to find enough synthetic jobs in one shard") + + def _make_ledger(job: ScanJob) -> ScanLedgerEntry: return ScanLedgerEntry( repo_id=job.repo_id, @@ -483,17 +520,30 @@ def test_lease_next_verify_job_only_leases_verify_jobs(): assert leased.worker_id == "verify-worker" -def test_lease_next_scan_job_ignores_verify_jobs(): +def test_lease_next_scan_job_ignores_verify_backlog_filling_window(): store, _ = _make_store() - verify_job = _make_job(commit_sha="1" * 40, job_type=JOB_TYPE_VERIFY) - scan_job = _make_job(commit_sha="2" * 40) - store.enqueue_commit_scan_job(verify_job) + commit_shas = _same_scan_job_shard_commit_shas(3) + verify_jobs = [ + _make_job( + commit_sha=commit_sha, + job_type=JOB_TYPE_VERIFY, + next_attempt_at=NOW - dt.timedelta(minutes=3 - index), + ) + for index, commit_sha in enumerate(commit_shas[:2]) + ] + scan_job = _make_job( + commit_sha=commit_shas[2], + next_attempt_at=NOW - dt.timedelta(seconds=30), + ) + for verify_job in verify_jobs: + store.enqueue_commit_scan_job(verify_job) store.enqueue_commit_scan_job(scan_job) leased = store.lease_next_scan_job( worker_id="scan-worker", lease_seconds=60, now=NOW, + dequeue_window=1, ) assert leased is not None @@ -501,6 +551,37 @@ def test_lease_next_scan_job_ignores_verify_jobs(): assert leased.job_type != JOB_TYPE_VERIFY +def test_lease_next_verify_job_ignores_scan_backlog_filling_window(): + store, _ = _make_store() + commit_shas = _same_scan_job_shard_commit_shas(3) + scan_jobs = [ + _make_job( + commit_sha=commit_sha, + next_attempt_at=NOW - dt.timedelta(minutes=3 - index), + ) + for index, commit_sha in enumerate(commit_shas[:2]) + ] + verify_job = _make_job( + commit_sha=commit_shas[2], + job_type=JOB_TYPE_VERIFY, + next_attempt_at=NOW - dt.timedelta(seconds=30), + ) + for scan_job in scan_jobs: + store.enqueue_commit_scan_job(scan_job) + store.enqueue_commit_scan_job(verify_job) + + leased = store.lease_next_verify_job( + worker_id="verify-worker", + lease_seconds=60, + now=NOW, + dequeue_window=1, + ) + + assert leased is not None + assert leased.job_id == verify_job.job_id + assert leased.job_type == JOB_TYPE_VERIFY + + def test_finding_for_verify_job_reads_embedded_snapshot(): store, _ = _make_store() finding = _make_finding(_make_job()) diff --git a/tests/test_llm_verifier.py b/tests/test_llm_verifier.py index 90f5c59..37a9126 100644 --- a/tests/test_llm_verifier.py +++ b/tests/test_llm_verifier.py @@ -100,6 +100,17 @@ def test_prompt_accepts_dynamodb_decimal_line_end_snapshot(): assert '"lineEnd":4' in prompt +def test_prompt_treats_malformed_optional_line_end_as_absent(): + for line_end in ("", "not-a-number", True, [], {}): + payload = _finding(file_path="config/positive.env", line_start=4).to_dict() + payload["location"]["lineEnd"] = line_end + + finding = Finding.from_dict(payload) + + assert finding.location.line_start == 4 + assert finding.location.line_end is None + + def test_valid_json_maps_to_verifier_verdict(): result = parse_verifier_response( json.dumps(