Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 118 additions & 12 deletions src/security_scanner/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

import argparse
import os
import signal
import sys
import threading
from pathlib import Path

from security_scanner.catalog.scan_target import ScanTarget
Expand Down Expand Up @@ -69,10 +71,16 @@
run_scan_all,
utc_now_iso,
)
from security_scanner.runtime.branch_residual import (
BranchResidual,
residual_for_repo,
)
from security_scanner.runtime.scan_worker import (
ScanWorkerDaemonSummary,
ScanWorkerRequest,
ScanWorkerSummary,
make_default_scanner,
run_scan_worker,
run_scan_worker_once,
)
from security_scanner.runtime.verify_artifact import (
Expand Down Expand Up @@ -407,9 +415,13 @@ def _render_discovery_summary(summary: IncrementalDiscoverySummary) -> None:


def cmd_scan_worker(args: argparse.Namespace) -> int:
"""Process queued incremental scan jobs once."""
if not args.once:
print("error: scan-worker MVP requires --once", file=sys.stderr)
"""Process queued incremental scan jobs once or as a polling daemon."""
if not args.once and not args.daemon:
print("error: scan-worker requires --once or --daemon", file=sys.stderr)
return 2
if args.once and args.daemon:
print("error: scan-worker --once and --daemon are mutually exclusive",
file=sys.stderr)
return 2
if args.storage_backend != "dynamodb":
print(
Expand All @@ -420,16 +432,24 @@ def cmd_scan_worker(args: argparse.Namespace) -> int:

try:
store = _store_from_args(args)
summary = run_scan_worker_once(
ScanWorkerRequest(
store=store,
fetch_repo=fetch_or_clone,
scanner=make_default_scanner(),
max_jobs=args.max_jobs,
lease_seconds=args.lease_seconds,
worker_id=args.worker_id,
)
request = ScanWorkerRequest(
store=store,
fetch_repo=fetch_or_clone,
scanner=make_default_scanner(),
max_jobs=args.max_jobs,
lease_seconds=args.lease_seconds,
worker_id=args.worker_id,
)
if args.daemon:
stop = _install_signal_shutdown()
daemon_summary = run_scan_worker(
request,
poll_interval_seconds=args.poll_interval,
should_continue=lambda: not stop.is_set(),
)
_render_scan_worker_daemon_summary(daemon_summary)
return 2 if daemon_summary.has_permanent_failure else 0
summary = run_scan_worker_once(request)
except Exception as exc: # noqa: BLE001 - fatal storage/runtime error.
print(f"error: scan-worker failed: {exc}", file=sys.stderr)
return 1
Expand All @@ -438,6 +458,33 @@ def cmd_scan_worker(args: argparse.Namespace) -> int:
return 2 if summary.has_permanent_failure else 0


def _install_signal_shutdown() -> threading.Event:
"""Return an Event set on SIGINT/SIGTERM for graceful daemon shutdown.

signal.signal() may only be called from the main thread; off the main thread
(e.g. embedded in a worker thread or some test runners) we skip registration
and return an Event that the caller can still set manually.
"""
stop = threading.Event()
if threading.current_thread() is not threading.main_thread():
return stop

def _handler(_signum, _frame): # noqa: ANN001 - signal handler signature
stop.set()

for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, _handler)
return stop
Comment thread
pureliture marked this conversation as resolved.


def _render_scan_worker_daemon_summary(summary: ScanWorkerDaemonSummary) -> None:
print(f"polls: {summary.polls}")
print(f"leased: {summary.leased}")
print(f"completed: {summary.completed}")
print(f"retryable: {summary.retryable}")
print(f"dead-lettered: {summary.dead_lettered}")


def _render_scan_worker_summary(summary: ScanWorkerSummary) -> None:
print(f"leased: {summary.leased}")
print(f"completed: {summary.completed}")
Expand Down Expand Up @@ -465,6 +512,40 @@ def cmd_queue_status(args: argparse.Namespace) -> int:
return 0


def cmd_residual(args: argparse.Namespace) -> int:
"""Show per-branch residual findings for one repository (issue #12)."""
if args.storage_backend != "dynamodb":
print(
"error: residual supports --storage-backend dynamodb only",
file=sys.stderr,
)
return 2

try:
store = _store_from_args(args)
residuals = residual_for_repo(store, args.repo)
except Exception as exc: # noqa: BLE001 - fatal storage/runtime error.
print(f"error: residual failed: {exc}", file=sys.stderr)
return 1

print(_render_residual(args.repo, residuals), end="")
return 0


def _render_residual(repo: str, residuals: list[BranchResidual]) -> str:
lines = [f"repo: {repo}"]
if not residuals:
lines.append(" (no branch residual — repo not incrementally scanned?)")
for residual in residuals:
lines.append(
f" branch {residual.branch} @ {residual.commit}: "
f"{len(residual.finding_ids)} residual finding(s)"
)
for finding_id in residual.finding_ids:
lines.append(f" - {finding_id}")
return "\n".join(lines) + "\n"


def cmd_doctor(args: argparse.Namespace) -> int:
"""Check local runtime dependencies and optional private SCM auth."""
result = run_doctor(
Expand Down Expand Up @@ -1091,6 +1172,18 @@ def build_parser() -> argparse.ArgumentParser:
action="store_true",
help="Process at most --max-jobs jobs and exit.",
)
scan_worker_parser.add_argument(
"--daemon",
action="store_true",
help="Poll the queue continuously until SIGINT/SIGTERM.",
)
scan_worker_parser.add_argument(
"--poll-interval",
type=float,
default=5.0,
metavar="SECONDS",
help="Idle poll interval for --daemon (default: 5.0).",
)
scan_worker_parser.add_argument(
"--max-jobs",
type=int,
Expand Down Expand Up @@ -1121,6 +1214,19 @@ def build_parser() -> argparse.ArgumentParser:
_add_incremental_storage_args(queue_status_parser)
queue_status_parser.set_defaults(func=cmd_queue_status)

residual_parser = subparsers.add_parser(
"residual",
help="Show per-branch residual findings for a repository.",
)
residual_parser.add_argument(
"--repo",
required=True,
metavar="REPO_ID",
help="Repository id (incrementally-scanned repo_id) to report residual for.",
)
_add_incremental_storage_args(residual_parser)
residual_parser.set_defaults(func=cmd_residual)

doctor_parser = subparsers.add_parser(
"doctor",
help="Check local binaries and optional private SCM auth.",
Expand Down
138 changes: 138 additions & 0 deletions src/security_scanner/runtime/branch_residual.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Per-branch residual computation for incremental scanning (issue #12).

Design (grill L1/L2): finding status/disposition is GLOBAL; branch is an
occurrence dimension only. "Residual on branch B" is DERIVED, not stored: a
finding is residual on B when it appears in an observation at B's latest scanned
commit (``RefState.last_seen_sha``). No new GSI — callers pass observations read
within the ``REPO#<repo>`` partition.
"""

from __future__ import annotations

from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import Any, Protocol

from security_scanner.core.finding.model import Finding
from security_scanner.storage.base import RefState


def branch_from_ref(ref_name: str | None) -> str | None:
"""Derive a short branch name from a git ref.

branch never enters finding identity (L1). Tags resolve to None since they
are not branches.
"""
if not ref_name:
return None
if ref_name.startswith("refs/heads/"):
return ref_name[len("refs/heads/") :]
if ref_name.startswith("refs/remotes/"):
rest = ref_name[len("refs/remotes/") :]
parts = rest.split("/", 1)
return parts[1] if len(parts) == 2 else parts[0]
if ref_name.startswith("refs/tags/"):
return None
return ref_name


def finding_with_context(
finding: Finding, *, commit: str | None, branch: str | None
) -> Finding:
"""Tag a finding with scan-context commit/branch (occurrence, not identity).

None values never clobber existing context; identity (finding_id) is
unaffected because branch/commit are not part of the fingerprint (L1).
"""
if commit is None and branch is None:
return finding
if finding.repo.commit == commit and finding.repo.branch == branch:
return finding
data = finding.to_dict()
repo = dict(data["repo"])
if commit is not None:
repo["commit"] = commit
if branch is not None:
repo["branch"] = branch
data["repo"] = repo
return Finding.from_dict(data)
Comment thread
pureliture marked this conversation as resolved.


@dataclass(frozen=True)
class BranchResidual:
"""Findings still present on a branch at its latest scanned commit."""

branch: str
commit: str
finding_ids: list[str]


def residual_by_branch(
ref_states: Iterable[RefState],
observations: Iterable[Mapping[str, Any]],
) -> list[BranchResidual]:
"""Compute residual findings per branch.

Parameters
----------
ref_states:
Latest known ref states for one repository (``last_seen_sha`` per ref).
observations:
Observation records (top-level ``branch`` / ``commit`` / ``findingId``)
read within the repository partition.

Returns one ``BranchResidual`` per ref that resolves to a branch (tags
skipped), ordered by branch name. A branch with no matching observation
yields an empty ``finding_ids`` list.
"""
# Index observations by commit once (O(M)). Matching is on commit only: a
# commit is scanned once and its observation carries the first ref's branch,
# but a commit reachable from several refs is residual on every ref whose tip
# is that commit (branch is occurrence, derived from the ref).
finding_ids_by_commit: dict[str, list[str]] = {}
for obs in observations:
commit = obs.get("commit")
finding_id = obs.get("findingId")
if commit is None or finding_id is None:
continue
finding_ids_by_commit.setdefault(commit, []).append(finding_id)

results: list[BranchResidual] = []
for ref in ref_states: # O(N)
branch = branch_from_ref(ref.ref_name)
if branch is None:
continue
ids = finding_ids_by_commit.get(ref.last_seen_sha, [])
results.append(
BranchResidual(
branch=branch,
commit=ref.last_seen_sha,
finding_ids=sorted(set(ids)),
)
)
return sorted(results, key=lambda r: r.branch)


class _ResidualStore(Protocol):
def list_ref_states(self, repo_id: str) -> list[RefState]:
"""Return ref states for the repository."""

def read_observations_for_repo(self, repo_id: str) -> list[Mapping[str, Any]]:
"""Return observation records for the repository."""


def residual_for_repo(store: _ResidualStore, repo_id: str) -> list[BranchResidual]:
"""End-to-end per-branch residual for one repository.

Reads ref states + observations within the repo partition and derives
residual. Status/disposition stays global (L1); this is a derived view.

NOTE: meaningful only for incrementally-scanned repos, where REF_STATE rows
and observation ``gsi1pk`` are both keyed by the same ``repo_id`` (the
scan-worker path sets ``repo_full_name == repo_id``). For local_scan-only
repos there are no REF_STATE rows, so this returns an empty list.
"""
return residual_by_branch(
store.list_ref_states(repo_id),
store.read_observations_for_repo(repo_id),
)
Loading
Loading