Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions src/security_scanner/runtime/incremental_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import datetime as dt
import fnmatch
import logging
import subprocess
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
Expand Down Expand Up @@ -47,6 +48,7 @@
# This is a LOGIC default only; the real pool size is a box-gate decision (do not
# invent a load-validated value) so it stays injectable per invocation.
DEFAULT_FETCH_CONCURRENCY = 8
_LOGGER = logging.getLogger(__name__)


class GitDiscoveryError(RuntimeError):
Expand Down Expand Up @@ -388,7 +390,7 @@ def _cursor_shas_for(
def _plan_fetches(
request: IncrementalDiscoveryRequest,
targets: Sequence[ScanTarget],
) -> tuple[list[ScanTarget], int]:
) -> tuple[list[ScanTarget], list[ScanTarget]]:
"""ls-remote skip pass (SC-6a): split targets into fetch-now vs idle-skip.

When no ``ls_remote`` runner is injected, EVERY target is fetched (legacy
Expand All @@ -397,10 +399,10 @@ def _plan_fetches(
``skipped_idle``.
"""
if request.ls_remote is None:
return list(targets), 0
return list(targets), []

to_fetch: list[ScanTarget] = []
skipped_idle = 0
skipped_idle: list[ScanTarget] = []
for target in targets:
repo_id = repo_id_for_scan_target_url(target.url)
decision = decide_ls_remote_skip(
Expand All @@ -413,7 +415,7 @@ def _plan_fetches(
if decision.changed:
to_fetch.append(target)
else:
skipped_idle += 1
skipped_idle.append(target)
return to_fetch, skipped_idle


Expand Down Expand Up @@ -482,7 +484,14 @@ def run_incremental_discovery(
ledger_skipped = 0
skipped_non_fast_forward = 0

to_fetch, skipped_idle = _plan_fetches(request, targets)
to_fetch, skipped_idle_targets = _plan_fetches(request, targets)
skipped_idle = len(skipped_idle_targets)
if request.mode == DISCOVERY_MODE_ENQUEUE:
for target in skipped_idle_targets:
_advance_incremental_health(
request,
repo_id=repo_id_for_scan_target_url(target.url),
)
Comment thread
pureliture marked this conversation as resolved.
fetched, fetch_failed = _run_fetches(request, to_fetch)

fetch_ok = 0
Expand All @@ -498,13 +507,12 @@ def run_incremental_discovery(
fetch_ok += 1
refs_observed += len(refs)
if not refs:
request.store.advance_repo_health(
repo_id,
job_type=JOB_TYPE_INCREMENTAL,
completed_at=_now(request),
)
if request.mode == DISCOVERY_MODE_ENQUEUE:
_advance_incremental_health(request, repo_id=repo_id)
continue

pending_work_observed = False
unsafe_to_mark_fresh = False
for git_ref in refs:
current_state = request.store.get_ref_state(repo_id, git_ref.ref_name)
observed_state = RefState(
Expand All @@ -516,6 +524,7 @@ def run_incremental_discovery(
)
if request.mode == DISCOVERY_MODE_INITIALIZE or current_state is None:
request.store.put_ref_state(observed_state)
unsafe_to_mark_fresh = True
continue
if current_state.last_seen_sha == git_ref.commit_sha:
continue
Expand All @@ -525,6 +534,7 @@ def run_incremental_discovery(
git_ref.commit_sha,
):
skipped_non_fast_forward += 1
unsafe_to_mark_fresh = True
continue

commits = request.git.list_new_commits(
Expand All @@ -542,6 +552,7 @@ def run_incremental_discovery(
if request.store.has_scan_ledger(key):
ledger_skipped += 1
continue
pending_work_observed = True
job = _scan_job_for_commit(
repo_id=repo_id,
repo_url=repo_url,
Expand All @@ -558,6 +569,13 @@ def run_incremental_discovery(

request.store.put_ref_state(observed_state)

if (
request.mode == DISCOVERY_MODE_ENQUEUE
and not pending_work_observed
and not unsafe_to_mark_fresh
):
_advance_incremental_health(request, repo_id=repo_id)

return IncrementalDiscoverySummary(
targets=summary.targets,
fetch_ok=fetch_ok,
Expand All @@ -570,6 +588,26 @@ def run_incremental_discovery(
)


def _advance_incremental_health(
    request: IncrementalDiscoveryRequest,
    *,
    repo_id: str,
) -> None:
    """Record an incremental heartbeat for ``repo_id`` without ever raising.

    Calls ``request.store.advance_repo_health`` with ``JOB_TYPE_INCREMENTAL``
    and the timestamp returned by ``_now(request)``. Any ``Exception`` raised
    while doing so is logged as a warning (traceback included) and swallowed,
    so a failed heartbeat cannot abort the surrounding discovery run.
    """
    try:
        # Both the store lookup and the clock read stay inside the ``try`` so
        # that a failure in either is treated as a heartbeat failure.
        advance = request.store.advance_repo_health
        heartbeat_at = _now(request)
        advance(
            repo_id,
            job_type=JOB_TYPE_INCREMENTAL,
            completed_at=heartbeat_at,
        )
    except Exception:  # noqa: BLE001 - heartbeat failure must not abort discovery.
        _LOGGER.warning(
            "failed to advance incremental repo health: %s",
            repo_id,
            exc_info=True,
        )


def _scan_ledger_key(
*,
repo_id: str,
Expand Down
136 changes: 136 additions & 0 deletions tests/test_incremental_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ def advance_repo_health(self, repo_id: str, *, job_type: str, completed_at) -> N
self.health_advances.append((repo_id, job_type, completed_at))


class FailingHealthStore(FakeIncrementalStore):
    """Store double whose repo-health heartbeat always fails."""

    def advance_repo_health(
        self,
        repo_id: str,
        *,
        job_type: str,
        completed_at,
    ) -> None:
        """Raise ``RuntimeError`` unconditionally instead of recording anything."""
        raise RuntimeError("repo health unavailable")


class FakeGitDiscovery:
def __init__(self) -> None:
self.refs_by_path: dict[Path, list[GitRef]] = {}
Expand Down Expand Up @@ -235,6 +240,91 @@ def test_enqueue_no_refs_marks_repo_incrementally_fresh_without_job():
assert store.health_advances == [(repo_id, "incremental", NOW)]


def test_initialize_no_refs_does_not_mark_repo_incrementally_fresh():
    """INITIALIZE mode with an empty ref list records no health heartbeat."""
    cache_path = Path("/synthetic-cache/example-repo")
    fake_store = FakeIncrementalStore([TARGET])
    fake_git = FakeGitDiscovery()
    fake_git.refs_by_path[cache_path] = []
    request = _request(
        mode=DISCOVERY_MODE_INITIALIZE,
        store=fake_store,
        git=fake_git,
        fetch_repo=lambda url: cache_path,
    )

    result = run_incremental_discovery(request)

    # The fetch succeeded but produced no refs, hence no jobs.
    assert result.fetch_ok == 1
    assert result.refs_observed == 0
    assert result.jobs_enqueued == 0
    assert fake_store.jobs == {}
    # No heartbeat is expected in initialize mode.
    assert fake_store.health_advances == []


def test_enqueue_unchanged_refs_marks_repo_incrementally_fresh_without_job():
    """ENQUEUE mode with a ref still at its stored SHA heartbeats and enqueues nothing."""
    cache_path = Path("/synthetic-cache/example-repo")
    target_repo_id = repo_id_for_scan_target_url(TARGET.url)

    fake_store = FakeIncrementalStore([TARGET])
    stored_state = RefState(
        repo_id=target_repo_id,
        repo_url=TARGET.url,
        ref_name=REF_MAIN,
        last_seen_sha=OLD_SHA,
        updated_at=NOW,
    )
    fake_store.put_ref_state(stored_state)

    # The remote reports the same SHA the store already holds.
    fake_git = FakeGitDiscovery()
    unchanged_ref = GitRef(ref_name=REF_MAIN, commit_sha=OLD_SHA)
    fake_git.refs_by_path[cache_path] = [unchanged_ref]

    request = _request(
        mode=DISCOVERY_MODE_ENQUEUE,
        store=fake_store,
        git=fake_git,
        fetch_repo=lambda url: cache_path,
    )
    result = run_incremental_discovery(request)

    assert result.fetch_ok == 1
    assert result.refs_observed == 1
    assert result.jobs_enqueued == 0
    assert fake_store.jobs == {}
    expected_heartbeat = (target_repo_id, "incremental", NOW)
    assert fake_store.health_advances == [expected_heartbeat]


def test_enqueue_unchanged_refs_continues_when_health_heartbeat_fails():
    """A store whose heartbeat raises must not stop discovery from returning a summary."""
    cache_path = Path("/synthetic-cache/example-repo")
    target_repo_id = repo_id_for_scan_target_url(TARGET.url)

    # This store raises RuntimeError from advance_repo_health.
    failing_store = FailingHealthStore([TARGET])
    stored_state = RefState(
        repo_id=target_repo_id,
        repo_url=TARGET.url,
        ref_name=REF_MAIN,
        last_seen_sha=OLD_SHA,
        updated_at=NOW,
    )
    failing_store.put_ref_state(stored_state)

    fake_git = FakeGitDiscovery()
    unchanged_ref = GitRef(ref_name=REF_MAIN, commit_sha=OLD_SHA)
    fake_git.refs_by_path[cache_path] = [unchanged_ref]

    request = _request(
        mode=DISCOVERY_MODE_ENQUEUE,
        store=failing_store,
        git=fake_git,
        fetch_repo=lambda url: cache_path,
    )
    result = run_incremental_discovery(request)

    assert result.fetch_ok == 1
    assert result.refs_observed == 1
    assert result.jobs_enqueued == 0
    assert failing_store.jobs == {}


def test_enqueue_skips_commits_present_in_ledger():
repo_path = Path("/synthetic-cache/example-repo")
repo_id = repo_id_for_scan_target_url(TARGET.url)
Expand Down Expand Up @@ -279,6 +369,52 @@ def test_enqueue_skips_commits_present_in_ledger():
assert store.ref_states[(repo_id, REF_MAIN)].last_seen_sha == NEW_SHA


def test_enqueue_all_commits_present_in_ledger_marks_repo_incrementally_fresh():
    """When every new commit is already in the scan ledger, nothing is enqueued and the repo heartbeats."""
    cache_path = Path("/synthetic-cache/example-repo")
    target_repo_id = repo_id_for_scan_target_url(TARGET.url)

    fake_store = FakeIncrementalStore([TARGET])
    stored_state = RefState(
        repo_id=target_repo_id,
        repo_url=TARGET.url,
        ref_name=REF_MAIN,
        last_seen_sha=OLD_SHA,
        updated_at=NOW,
    )
    fake_store.put_ref_state(stored_state)

    # Pre-populate the ledger with both commits in the OLD..NEW range.
    scanner = _scanner()
    for already_scanned_sha in (MID_SHA, NEW_SHA):
        ledger_key = ScanLedgerKey(
            repo_id=target_repo_id,
            commit_sha=already_scanned_sha,
            scanner_name=scanner.scanner_name,
            scanner_version=scanner.scanner_version,
            rule_pack_version=scanner.rule_pack_version,
            scanner_config_hash=scanner.scanner_config_hash,
        )
        fake_store.ledger.add(ledger_key)

    # The ref fast-forwards from OLD_SHA to NEW_SHA via MID_SHA.
    fake_git = FakeGitDiscovery()
    fake_git.refs_by_path[cache_path] = [
        GitRef(ref_name=REF_MAIN, commit_sha=NEW_SHA),
    ]
    sha_range = (cache_path, OLD_SHA, NEW_SHA)
    fake_git.ancestor_results[sha_range] = True
    fake_git.commits_by_range[sha_range] = [MID_SHA, NEW_SHA]

    request = _request(
        mode=DISCOVERY_MODE_ENQUEUE,
        store=fake_store,
        git=fake_git,
        fetch_repo=lambda url: cache_path,
    )
    result = run_incremental_discovery(request)

    assert result.ledger_skipped == 2
    assert result.jobs_enqueued == 0
    assert fake_store.jobs == {}
    advanced_state = fake_store.ref_states[(target_repo_id, REF_MAIN)]
    assert advanced_state.last_seen_sha == NEW_SHA
    expected_heartbeat = (target_repo_id, "incremental", NOW)
    assert fake_store.health_advances == [expected_heartbeat]


def test_non_fast_forward_is_reported_without_advancing_ref_state():
repo_path = Path("/synthetic-cache/example-repo")
repo_id = repo_id_for_scan_target_url(TARGET.url)
Expand Down
8 changes: 8 additions & 0 deletions tests/test_m4_poll_baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ def fetch_repo(url: str) -> Path:
assert summary.fetch_ok == 0
assert fetched_urls == [] # SKIP: no fetch ran for idle repos
assert summary.jobs_enqueued == 0
assert store.health_advances == [
(repo_id_a, JOB_TYPE_INCREMENTAL, NOW),
(repo_id_b, JOB_TYPE_INCREMENTAL, NOW),
]


def test_cursor_shas_for_glob_pattern_yields_non_empty_cursor():
Expand Down Expand Up @@ -448,6 +452,10 @@ def fetch_repo(url: str) -> Path:
assert summary.fetch_ok == 0
assert fetched_urls == []
assert summary.jobs_enqueued == 0
assert store.health_advances == [
(repo_id_for_scan_target_url(INCLUDED_A), JOB_TYPE_INCREMENTAL, NOW),
(repo_id_for_scan_target_url(INCLUDED_B), JOB_TYPE_INCREMENTAL, NOW),
]


def test_discovery_ls_remote_changed_fetches_and_enqueues_incremental_job():
Expand Down
Loading