diff --git a/src/security_scanner/runtime/incremental_discovery.py b/src/security_scanner/runtime/incremental_discovery.py index 91a9c79..87c0097 100644 --- a/src/security_scanner/runtime/incremental_discovery.py +++ b/src/security_scanner/runtime/incremental_discovery.py @@ -4,6 +4,7 @@ import datetime as dt import fnmatch +import logging import subprocess from collections.abc import Callable, Sequence from dataclasses import dataclass, field @@ -47,6 +48,7 @@ # This is a LOGIC default only; the real pool size is a box-gate decision (do not # invent a load-validated value) so it stays injectable per invocation. DEFAULT_FETCH_CONCURRENCY = 8 +_LOGGER = logging.getLogger(__name__) class GitDiscoveryError(RuntimeError): @@ -388,7 +390,7 @@ def _cursor_shas_for( def _plan_fetches( request: IncrementalDiscoveryRequest, targets: Sequence[ScanTarget], -) -> tuple[list[ScanTarget], int]: +) -> tuple[list[ScanTarget], list[ScanTarget]]: """ls-remote skip pass (SC-6a): split targets into fetch-now vs idle-skip. When no ``ls_remote`` runner is injected, EVERY target is fetched (legacy @@ -397,10 +399,10 @@ def _plan_fetches( ``skipped_idle``. """ if request.ls_remote is None: - return list(targets), 0 + return list(targets), [] to_fetch: list[ScanTarget] = [] - skipped_idle = 0 + skipped_idle: list[ScanTarget] = [] for target in targets: repo_id = repo_id_for_scan_target_url(target.url) decision = decide_ls_remote_skip( @@ -413,7 +415,7 @@ def _plan_fetches( if decision.changed: to_fetch.append(target) else: - skipped_idle += 1 + skipped_idle.append(target) return to_fetch, skipped_idle @@ -482,7 +484,14 @@ def run_incremental_discovery( ledger_skipped = 0 skipped_non_fast_forward = 0 - to_fetch, skipped_idle = _plan_fetches(request, targets) + to_fetch, skipped_idle_targets = _plan_fetches(request, targets) + skipped_idle = len(skipped_idle_targets) + if request.mode == DISCOVERY_MODE_ENQUEUE: + for target in skipped_idle_targets: + _advance_incremental_health( + request, + repo_id=repo_id_for_scan_target_url(target.url), + ) fetched, fetch_failed = _run_fetches(request, to_fetch) fetch_ok = 0 @@ -498,13 +507,12 @@ def run_incremental_discovery( fetch_ok += 1 refs_observed += len(refs) if not refs: - request.store.advance_repo_health( - repo_id, - job_type=JOB_TYPE_INCREMENTAL, - completed_at=_now(request), - ) + if request.mode == DISCOVERY_MODE_ENQUEUE: + _advance_incremental_health(request, repo_id=repo_id) continue + pending_work_observed = False + unsafe_to_mark_fresh = False for git_ref in refs: current_state = request.store.get_ref_state(repo_id, git_ref.ref_name) observed_state = RefState( @@ -516,6 +524,7 @@ def run_incremental_discovery( ) if request.mode == DISCOVERY_MODE_INITIALIZE or current_state is None: request.store.put_ref_state(observed_state) + unsafe_to_mark_fresh = True continue if current_state.last_seen_sha == git_ref.commit_sha: continue @@ -525,6 +534,7 @@ def run_incremental_discovery( git_ref.commit_sha, ): skipped_non_fast_forward += 1 + unsafe_to_mark_fresh = True continue commits = request.git.list_new_commits( @@ -542,6 +552,7 @@ def run_incremental_discovery( if request.store.has_scan_ledger(key): ledger_skipped += 1 continue + pending_work_observed = True job = _scan_job_for_commit( repo_id=repo_id, repo_url=repo_url, @@ -558,6 +569,13 @@ def run_incremental_discovery( request.store.put_ref_state(observed_state) + if ( + request.mode == DISCOVERY_MODE_ENQUEUE + and not pending_work_observed + and not unsafe_to_mark_fresh + ): + _advance_incremental_health(request, repo_id=repo_id) + return IncrementalDiscoverySummary( targets=summary.targets, fetch_ok=fetch_ok, @@ -570,6 +588,26 @@ def run_incremental_discovery( ) +def _advance_incremental_health( + request: IncrementalDiscoveryRequest, + *, + repo_id: str, +) -> None: + """Best-effort heartbeat for repos with no new incremental work.""" + try: + request.store.advance_repo_health( + repo_id, + job_type=JOB_TYPE_INCREMENTAL, + completed_at=_now(request), + ) + except Exception: # noqa: BLE001 - heartbeat failure must not abort discovery. + _LOGGER.warning( + "failed to advance incremental repo health: %s", + repo_id, + exc_info=True, + ) + + def _scan_ledger_key( *, repo_id: str, diff --git a/tests/test_incremental_discovery.py b/tests/test_incremental_discovery.py index 80a0119..c8971c4 100644 --- a/tests/test_incremental_discovery.py +++ b/tests/test_incremental_discovery.py @@ -66,6 +66,11 @@ def advance_repo_health(self, repo_id: str, *, job_type: str, completed_at) -> N self.health_advances.append((repo_id, job_type, completed_at)) +class FailingHealthStore(FakeIncrementalStore): + def advance_repo_health(self, repo_id: str, *, job_type: str, completed_at) -> None: + raise RuntimeError("repo health unavailable") + + class FakeGitDiscovery: def __init__(self) -> None: self.refs_by_path: dict[Path, list[GitRef]] = {} @@ -235,6 +240,91 @@ def test_enqueue_no_refs_marks_repo_incrementally_fresh_without_job(): assert store.health_advances == [(repo_id, "incremental", NOW)] +def test_initialize_no_refs_does_not_mark_repo_incrementally_fresh(): + repo_path = Path("/synthetic-cache/example-repo") + store = FakeIncrementalStore([TARGET]) + git = FakeGitDiscovery() + git.refs_by_path[repo_path] = [] + + summary = run_incremental_discovery( + _request( + mode=DISCOVERY_MODE_INITIALIZE, + store=store, + git=git, + fetch_repo=lambda url: repo_path, + ) + ) + + assert summary.fetch_ok == 1 + assert summary.refs_observed == 0 + assert summary.jobs_enqueued == 0 + assert store.jobs == {} + assert store.health_advances == [] + + +def test_enqueue_unchanged_refs_marks_repo_incrementally_fresh_without_job(): + repo_path = Path("/synthetic-cache/example-repo") + repo_id = repo_id_for_scan_target_url(TARGET.url) + store = FakeIncrementalStore([TARGET]) + store.put_ref_state( + RefState( + repo_id=repo_id, + repo_url=TARGET.url, + ref_name=REF_MAIN, + last_seen_sha=OLD_SHA, + updated_at=NOW, + ) + ) + git = FakeGitDiscovery() + git.refs_by_path[repo_path] = [GitRef(ref_name=REF_MAIN, commit_sha=OLD_SHA)] + + summary = run_incremental_discovery( + _request( + mode=DISCOVERY_MODE_ENQUEUE, + store=store, + git=git, + fetch_repo=lambda url: repo_path, + ) + ) + + assert summary.fetch_ok == 1 + assert summary.refs_observed == 1 + assert summary.jobs_enqueued == 0 + assert store.jobs == {} + assert store.health_advances == [(repo_id, "incremental", NOW)] + + +def test_enqueue_unchanged_refs_continues_when_health_heartbeat_fails(): + repo_path = Path("/synthetic-cache/example-repo") + repo_id = repo_id_for_scan_target_url(TARGET.url) + store = FailingHealthStore([TARGET]) + store.put_ref_state( + RefState( + repo_id=repo_id, + repo_url=TARGET.url, + ref_name=REF_MAIN, + last_seen_sha=OLD_SHA, + updated_at=NOW, + ) + ) + git = FakeGitDiscovery() + git.refs_by_path[repo_path] = [GitRef(ref_name=REF_MAIN, commit_sha=OLD_SHA)] + + summary = run_incremental_discovery( + _request( + mode=DISCOVERY_MODE_ENQUEUE, + store=store, + git=git, + fetch_repo=lambda url: repo_path, + ) + ) + + assert summary.fetch_ok == 1 + assert summary.refs_observed == 1 + assert summary.jobs_enqueued == 0 + assert store.jobs == {} + + def test_enqueue_skips_commits_present_in_ledger(): repo_path = Path("/synthetic-cache/example-repo") repo_id = repo_id_for_scan_target_url(TARGET.url) @@ -279,6 +369,52 @@ def test_enqueue_skips_commits_present_in_ledger(): assert store.ref_states[(repo_id, REF_MAIN)].last_seen_sha == NEW_SHA +def test_enqueue_all_commits_present_in_ledger_marks_repo_incrementally_fresh(): + repo_path = Path("/synthetic-cache/example-repo") + repo_id = repo_id_for_scan_target_url(TARGET.url) + store = FakeIncrementalStore([TARGET]) + store.put_ref_state( + RefState( + repo_id=repo_id, + repo_url=TARGET.url, + ref_name=REF_MAIN, + last_seen_sha=OLD_SHA, + updated_at=NOW, + ) + ) + scanner = _scanner() + for commit_sha in (MID_SHA, NEW_SHA): + store.ledger.add( + ScanLedgerKey( + repo_id=repo_id, + commit_sha=commit_sha, + scanner_name=scanner.scanner_name, + scanner_version=scanner.scanner_version, + rule_pack_version=scanner.rule_pack_version, + scanner_config_hash=scanner.scanner_config_hash, + ) + ) + git = FakeGitDiscovery() + git.refs_by_path[repo_path] = [GitRef(ref_name=REF_MAIN, commit_sha=NEW_SHA)] + git.ancestor_results[(repo_path, OLD_SHA, NEW_SHA)] = True + git.commits_by_range[(repo_path, OLD_SHA, NEW_SHA)] = [MID_SHA, NEW_SHA] + + summary = run_incremental_discovery( + _request( + mode=DISCOVERY_MODE_ENQUEUE, + store=store, + git=git, + fetch_repo=lambda url: repo_path, + ) + ) + + assert summary.ledger_skipped == 2 + assert summary.jobs_enqueued == 0 + assert store.jobs == {} + assert store.ref_states[(repo_id, REF_MAIN)].last_seen_sha == NEW_SHA + assert store.health_advances == [(repo_id, "incremental", NOW)] + + def test_non_fast_forward_is_reported_without_advancing_ref_state(): repo_path = Path("/synthetic-cache/example-repo") repo_id = repo_id_for_scan_target_url(TARGET.url) diff --git a/tests/test_m4_poll_baseline.py b/tests/test_m4_poll_baseline.py index e193b75..4a10b5b 100644 --- a/tests/test_m4_poll_baseline.py +++ b/tests/test_m4_poll_baseline.py @@ -376,6 +376,10 @@ def fetch_repo(url: str) -> Path: assert summary.fetch_ok == 0 assert fetched_urls == [] # SKIP: no fetch ran for idle repos assert summary.jobs_enqueued == 0 + assert store.health_advances == [ + (repo_id_a, JOB_TYPE_INCREMENTAL, NOW), + (repo_id_b, JOB_TYPE_INCREMENTAL, NOW), + ] def test_cursor_shas_for_glob_pattern_yields_non_empty_cursor(): @@ -448,6 +452,10 @@ def fetch_repo(url: str) -> Path: assert summary.fetch_ok == 0 assert fetched_urls == [] assert summary.jobs_enqueued == 0 + assert store.health_advances == [ + (repo_id_for_scan_target_url(INCLUDED_A), JOB_TYPE_INCREMENTAL, NOW), + (repo_id_for_scan_target_url(INCLUDED_B), JOB_TYPE_INCREMENTAL, NOW), + ] def test_discovery_ls_remote_changed_fetches_and_enqueues_incremental_job():