Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ Documentation=https://github.com/source-security-dev/security-scanner
[Timer]
OnActiveSec=2min
OnUnitActiveSec=2min
Persistent=true
RandomizedDelaySec=15
Unit=security-scanner-personal-lease-reaper.service

Expand Down
2 changes: 1 addition & 1 deletion deploy/systemd/user/securityscanner.slice
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ IOAccounting=true
TasksAccounting=true
CPUQuota=150%
MemoryMax=3G
TasksMax=512
TasksMax=1024
IOWeight=100
12 changes: 5 additions & 7 deletions src/security_scanner/runtime/incremental_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,13 +498,11 @@ def run_incremental_discovery(
fetch_ok += 1
refs_observed += len(refs)
if not refs:
advance = getattr(request.store, "advance_repo_health", None)
if advance is not None:
advance(
repo_id,
job_type=JOB_TYPE_INCREMENTAL,
completed_at=_now(request),
)
request.store.advance_repo_health(
repo_id,
job_type=JOB_TYPE_INCREMENTAL,
completed_at=_now(request),
)
continue

for git_ref in refs:
Expand Down
35 changes: 25 additions & 10 deletions src/security_scanner/runtime/scan_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@

from security_scanner.core.finding.model import Finding
from security_scanner.core.scan.options import ScanOptions
from security_scanner.runtime.baseline_enqueue import BASELINE_COMMIT_SENTINEL
from security_scanner.runtime.branch_residual import (
branch_from_ref,
finding_with_context,
)
from security_scanner.scanners.gitleaks.scanner import GitleaksScanner
from security_scanner.storage.base import (
IncrementalScanStore,
JOB_TYPE_BASELINE,
IncrementalScanStore,
ScanJob,
ScanLedgerEntry,
)
Expand Down Expand Up @@ -234,7 +235,7 @@ def make_default_scanner() -> GitleaksScanner:

def _scan_options_for_job(job: ScanJob) -> ScanOptions:
"""Return gitleaks scan options for one queued job."""
if job.job_type == JOB_TYPE_BASELINE:
if _is_baseline_job(job):
return ScanOptions(include_history=True)
return ScanOptions(
include_history=True,
Expand All @@ -244,26 +245,40 @@ def _scan_options_for_job(job: ScanJob) -> ScanOptions:

def _finding_context_for_job(job: ScanJob) -> tuple[str | None, str | None]:
"""Return occurrence context to stamp onto scanner findings."""
if job.job_type == JOB_TYPE_BASELINE:
if _is_baseline_job(job):
return None, None
return job.commit_sha, branch_from_ref(job.ref_name)


def _is_baseline_job(job: ScanJob) -> bool:
"""Return whether a job should run as a full-history baseline scan."""
return (
job.job_type == JOB_TYPE_BASELINE
or job.commit_sha == BASELINE_COMMIT_SENTINEL
)


def _repo_health_job_type(job: ScanJob) -> str:
if _is_baseline_job(job):
return JOB_TYPE_BASELINE
return job.job_type


def _advance_repo_health(
request: ScanWorkerRequest, job: ScanJob, *, completed_at: dt.datetime
) -> None:
"""Advance per-repo freshness after a successful completion (FR-7/SC-5).

Keyed by ``job.job_type`` so an incremental completion advances the
incremental field and a baseline completion the full-scan field, via the
store's attribute-scoped advancing-only conditional write. Guarded with
``hasattr`` so a minimal store fake that predates REPO_HEALTH still runs the
worker; a real store always implements it.
store's attribute-scoped advancing-only conditional write. Legacy baseline
jobs that predate ``jobType`` are recognized by their commit sentinel.
"""
advance = getattr(request.store, "advance_repo_health", None)
if advance is None:
return
advance(job.repo_id, job_type=job.job_type, completed_at=completed_at)
request.store.advance_repo_health(
job.repo_id,
job_type=_repo_health_job_type(job),
completed_at=completed_at,
)


def _scan_run_id_for_job(job: ScanJob) -> str:
Expand Down
3 changes: 2 additions & 1 deletion src/security_scanner/targets/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from typing import Literal
from urllib.parse import urlsplit


ScmProvider = Literal["auto", "github", "gitlab"]


Expand Down Expand Up @@ -44,6 +43,8 @@ class UnsupportedHostError(FetchError):

def _default_cache_root() -> Path:
configured = os.environ.get("SECURITY_SCANNER_CACHE_ROOT")
if configured is not None:
configured = configured.strip()
if configured:
return Path(configured).expanduser()
return Path.home() / ".cache" / "security-scanner" / "repos"
Expand Down
4 changes: 4 additions & 0 deletions tests/test_cli_scan_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, jobs: list[ScanJob] | None = None) -> None:
self.retry_failures: list[tuple[str, str, dt.datetime]] = []
self.pending_returns: list[tuple[str, str]] = []
self.ledger_keys: set[ScanLedgerKey] = set()
self.health_advances: list[tuple[str, str, dt.datetime]] = []
self.repo_lease_available = True

def lease_next_scan_job(self, worker_id, lease_seconds, now):
Expand Down Expand Up @@ -51,6 +52,9 @@ def complete_processed_job(self, job, findings, ledger):
self.completed.append((job, list(findings), ledger))
self.ledger_keys.add(ledger.key)

def advance_repo_health(self, repo_id, *, job_type, completed_at):
self.health_advances.append((repo_id, job_type, completed_at))

def record_retryable_failure(
self, job_id, error, next_attempt_at, *, worker_id=None, fence=None
):
Expand Down
28 changes: 27 additions & 1 deletion tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_existing_cache_path_triggers_git_fetch(monkeypatch, tmp_path):


def test_default_cache_root_uses_home(monkeypatch, tmp_path):
"""When cache_root is omitted, path is ~/.cache/security-scanner/repos/<owner>/<repo>."""
"""When cache_root is omitted, path is under ~/.cache/security-scanner."""
monkeypatch.delenv("SECURITY_SCANNER_CACHE_ROOT", raising=False)
monkeypatch.setattr(Path, "home", classmethod(lambda cls: tmp_path))

Expand Down Expand Up @@ -183,6 +183,32 @@ def test_default_cache_root_can_be_overridden_by_env(monkeypatch, tmp_path):
assert cmd[4] == str(expected_path)


def test_blank_cache_root_env_uses_home_default(monkeypatch, tmp_path):
monkeypatch.setenv("SECURITY_SCANNER_CACHE_ROOT", " ")
monkeypatch.setattr(Path, "home", classmethod(lambda cls: tmp_path))

calls = []
monkeypatch.setattr(
"security_scanner.targets.fetcher.subprocess.run",
_record_run(calls),
)

result = fetch_or_clone("https://github.com/octocat/hello-world")

expected_path = (
tmp_path
/ ".cache"
/ "security-scanner"
/ "repos"
/ "github.com"
/ "octocat"
/ "hello-world"
)
assert result == expected_path
cmd, _ = calls[0]
assert cmd[4] == str(expected_path)


def test_missing_gh_falls_back_to_public_git_clone(monkeypatch, tmp_path):
calls = []

Expand Down
4 changes: 4 additions & 0 deletions tests/test_incremental_scan_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, targets: Sequence[ScanTarget]) -> None:
self.ledger: dict[ScanLedgerKey, ScanLedgerEntry] = {}
self.repo_leases: dict[str, RepoLease] = {}
self.findings: list[Finding] = []
self.health_advances: list[tuple[str, str, dt.datetime]] = []

def list_scan_targets(self) -> list[ScanTarget]:
return list(self.targets)
Expand Down Expand Up @@ -115,6 +116,9 @@ def complete_processed_job(
}
)

def advance_repo_health(self, repo_id: str, *, job_type: str, completed_at) -> None:
self.health_advances.append((repo_id, job_type, completed_at))

def record_retryable_failure(
self,
job_id: str,
Expand Down
4 changes: 4 additions & 0 deletions tests/test_m4_poll_baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(
self.jobs: dict[str, ScanJob] = {}
self.ledger: set[ScanLedgerKey] = set()
self.pending_backlog = 0
self.health_advances: list[tuple[str, str, dt.datetime]] = []

# CatalogStore
def read_all_catalog_entries(self) -> list[CatalogEntry]:
Expand Down Expand Up @@ -118,6 +119,9 @@ def enqueue_commit_scan_job(self, job: ScanJob) -> bool:
self.jobs[job.job_id] = job
return True

def advance_repo_health(self, repo_id: str, *, job_type: str, completed_at) -> None:
self.health_advances.append((repo_id, job_type, completed_at))

def get_queue_status(self, now: dt.datetime) -> QueueStatus:
return QueueStatus(
job_counts_by_status={"pending": self.pending_backlog},
Expand Down
31 changes: 24 additions & 7 deletions tests/test_personal_prod_systemd_units.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
WORKER_TEMPLATE.name,
*(service for service, _, _ in PERIODIC_UNITS.values()),
]
ALL_USER_SYSTEMD_FILES = sorted(
path
for path in USER_SYSTEMD_DIR.glob("security-scanner-personal*")
if path.is_file()
)


def _parse_unit(path: Path) -> configparser.ConfigParser:
Expand All @@ -54,7 +59,7 @@ def test_personal_slice_sets_aggregate_caps() -> None:
slice_section = dict(parser.items("Slice"))
assert slice_section["CPUQuota"] == "150%"
assert slice_section["MemoryMax"] == "3G"
assert slice_section["TasksMax"] == "512"
assert int(slice_section["TasksMax"]) >= 1024
assert slice_section["IOWeight"] == "100"


Expand All @@ -65,14 +70,18 @@ def test_personal_worker_template_is_capped_and_instanced() -> None:
assert service["Slice"] == "securityscanner.slice"
assert service["Nice"] == "15"
assert service["IOSchedulingClass"] == "idle"
assert service["EnvironmentFile"] == "-%h/.config/security-scanner/personal-prod.env"
assert service["EnvironmentFile"] == (
"-%h/.config/security-scanner/personal-prod.env"
)
assert service["WorkingDirectory"] == "%h/security-scanner"
assert "scan-worker" in service["ExecStart"]
assert "--daemon" in service["ExecStart"]
assert "security-scanner-personal-scan-worker@%i" in service["ExecStart"]
assert "--notification-log" not in service["ExecStart"]
assert service["Restart"] == "on-failure"
assert parser.get("Install", "WantedBy") == "security-scanner-personal-workers.target"
assert parser.get("Install", "WantedBy") == (
"security-scanner-personal-workers.target"
)


def test_personal_worker_target_is_user_level() -> None:
Expand All @@ -94,7 +103,9 @@ def test_personal_periodic_units_are_user_level_and_scheduled(
assert service_section["Slice"] == "securityscanner.slice"
assert service_section["Nice"] == "15"
assert service_section["IOSchedulingClass"] == "idle"
assert service_section["EnvironmentFile"] == "-%h/.config/security-scanner/personal-prod.env"
assert service_section["EnvironmentFile"] == (
"-%h/.config/security-scanner/personal-prod.env"
)
assert "uv run security-scanner" in service_section["ExecStart"]
assert service.get("Install", "WantedBy") == "default.target"

Expand All @@ -104,13 +115,17 @@ def test_personal_periodic_units_are_user_level_and_scheduled(
if calendar is None:
assert timer_section["OnActiveSec"] == "2min"
assert timer_section["OnUnitActiveSec"] == "2min"
assert "Persistent" not in timer_section
else:
assert timer_section["OnCalendar"] == calendar
assert timer_section["Persistent"] == "true"
assert timer.get("Install", "WantedBy") == "timers.target"


@pytest.mark.parametrize("service_name", ALL_SERVICE_FILES)
def test_personal_units_use_personal_table_and_no_docker_mutation(service_name: str) -> None:
def test_personal_units_use_personal_table_and_no_docker_mutation(
service_name: str,
) -> None:
text = (USER_SYSTEMD_DIR / service_name).read_text(encoding="utf-8")
assert "SECURITY_SCANNER_DYNAMO_ENDPOINT=http://localhost:4567" in text
assert "SECURITY_SCANNER_DYNAMO_TABLE=security_scanner_personal" in text
Expand All @@ -128,10 +143,12 @@ def test_personal_units_use_personal_table_and_no_docker_mutation(service_name:
)
def test_personal_clone_cache_is_isolated(service_name: str) -> None:
text = (USER_SYSTEMD_DIR / service_name).read_text(encoding="utf-8")
assert "SECURITY_SCANNER_CACHE_ROOT=%h/.cache/security-scanner-personal/repos" in text
assert (
"SECURITY_SCANNER_CACHE_ROOT=%h/.cache/security-scanner-personal/repos" in text
)


@pytest.mark.parametrize("path", [SLICE, WORKER_TEMPLATE, WORKER_TARGET])
@pytest.mark.parametrize("path", [SLICE, *ALL_USER_SYSTEMD_FILES])
def test_personal_artifacts_avoid_machine_local_paths_and_accounts(path: Path) -> None:
text = path.read_text(encoding="utf-8")
forbidden = ("/home/", "/Users/", "/srv/", "/var/", "pureliture")
Expand Down
4 changes: 4 additions & 0 deletions tests/test_quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self) -> None:
self.jobs: dict[str, ScanJob] = {}
self.ledger: dict[ScanLedgerKey, ScanLedgerEntry] = {}
self.repo_leases: dict[str, RepoLease] = {}
self.health_advances: list[tuple[str, str, dt.datetime]] = []

def bootstrap(self) -> None:
self.bootstrap_calls += 1
Expand Down Expand Up @@ -98,6 +99,9 @@ def complete_processed_job(
}
)

def advance_repo_health(self, repo_id: str, *, job_type: str, completed_at) -> None:
self.health_advances.append((repo_id, job_type, completed_at))

def record_retryable_failure(
self,
job_id: str,
Expand Down
24 changes: 24 additions & 0 deletions tests/test_scan_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,16 @@ def _baseline_job() -> ScanJob:
)


def _legacy_baseline_job() -> ScanJob:
job = _baseline_job()
return ScanJob(
**{
**job.__dict__,
"job_type": "incremental",
}
)


def test_completed_incremental_job_advances_incremental_repo_health():
# SC-5: a successful incremental completion advances the incremental field,
# keyed by job_type, with the scan completion timestamp.
Expand Down Expand Up @@ -523,6 +533,20 @@ def test_baseline_job_runs_full_history_without_sentinel_context():
assert ledger.commit_sha == "baseline"


def test_legacy_baseline_sentinel_runs_full_history_and_advances_baseline_health():
store = FakeWorkerStore([_legacy_baseline_job()])
scanner = FakeScanner(findings=[_finding(commit=None)])

run_scan_worker_once(_request(store, scanner))

call = scanner.calls[0]
assert call["scan_options"] == ScanOptions(include_history=True, git_log_opts=None)
_, findings, _ledger = store.completed[0]
assert findings[0].repo.commit is None
assert findings[0].repo.branch is None
assert store.health_advances == [(REPO_ID, JOB_TYPE_BASELINE, NOW)]


def test_ledger_already_present_completion_still_advances_repo_health():
# The fast path (ledger already exists) is still a successful completion for
# this repo, so freshness must advance.
Expand Down
Loading