Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,10 @@ build/
!.claude/skills/**
.routine-harness/

# Multi-agent scratch (briefings/handoffs may contain private absolute paths)
.agents/*
!.agents/skills/
!.agents/skills/**

# Local git worktrees
.worktrees/
5 changes: 4 additions & 1 deletion src/security_scanner/cli/commands/vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from security_scanner.cli._store import dynamodb_config_from_args
from security_scanner.runtime.vulnerability_freshness import (
VulnFreshnessRequest,
VulnFreshnessResult,
run_vuln_freshness_eval,
)
from security_scanner.runtime.vulnerability_scan import (
Expand Down Expand Up @@ -331,7 +332,9 @@ def cmd_vuln_freshness_eval(args: argparse.Namespace) -> int:
return 0


def _append_freshness_notification(log_path: str, result) -> None:
def _append_freshness_notification(
log_path: str, result: VulnFreshnessResult
) -> None:
Comment on lines +335 to +337

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

일반 규칙에 따르면, 주 트랜잭션에 영향을 주지 않는 베스트 에포트(best-effort) 성격의 비트랜잭션 보조 작업(예: 알림 파일 추가, 로그 기록 등)은 개별적으로 인라인 try-except 블록을 반복하기보다, 일시적인 실패를 잡아내고 exc_info와 함께 로그를 남긴 후 메인 프로세스를 계속 진행시키는 중앙 집중식 헬퍼 함수를 통해 라우팅해야 합니다. _append_freshness_notification 작업을 이러한 공통 헬퍼 함수를 사용하도록 수정하는 것을 권장합니다.

References
  1. Route best-effort, non-transactional secondary operations (such as heartbeat writes) through a centralized helper function that catches transient failures, logs them with exc_info, and allows the main process to continue, rather than repeating inline try-except blocks or letting failures abort the main transaction.

path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
record = {
Expand Down
19 changes: 14 additions & 5 deletions src/security_scanner/runtime/vulnerability_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def resolve_semgrep_binary(binary: str) -> str | None:
resolved = shutil.which(binary)
if resolved is not None:
return resolved
candidate = Path(binary)
candidate = Path(binary).expanduser()
if candidate.is_absolute() and candidate.exists():
return str(candidate)
Comment on lines +209 to 211

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

바이너리 경로가 디렉토리가 아닌 실제 파일인지 확인하기 위해 candidate.exists() 대신 candidate.is_file()을 사용하는 것이 더 안전합니다. 만약 사용자가 실수로 디렉토리 경로를 지정했을 경우, 존재 여부만 체크하면 디렉토리 경로를 그대로 반환하여 이후 실행 단계에서 권한 오류(PermissionError)나 실행 불가 오류가 발생할 수 있습니다.

Suggested change
candidate = Path(binary).expanduser()
if candidate.is_absolute() and candidate.exists():
return str(candidate)
candidate = Path(binary).expanduser()
if candidate.is_absolute() and candidate.is_file():
return str(candidate)

return None
Expand Down Expand Up @@ -312,12 +312,12 @@ def _scan_one_catalog_repo(
artifact_path = (
artifact_root / _safe_repo_dir(entry.repo_id) / f"{request.scan_run_id}.jsonl"
)
if artifact_path.exists():
raise ValueError(
f"refusing to clobber existing vuln artifact: {artifact_path}"
)

try:
if artifact_path.exists():
raise ValueError(
f"refusing to clobber existing vuln artifact: {artifact_path}"
)
result = run_vulnerability_scan(
ScanVulnerabilityRequest(
root=cache_path,
Expand All @@ -337,6 +337,15 @@ def _scan_one_catalog_repo(
status=status,
detail=_short_detail(exc),
)
except Exception as exc: # noqa: BLE001 - per-repo isolation: one repo's
# unexpected failure (clobber guard, import/SARIF errors, ...) must never
# abort the whole catalog pass; record it as a scan-error outcome.
return RepoScanOutcome(
repo_id=entry.repo_id,
repo_url=entry.repo_url,
status="scan-error",
detail=_short_detail(exc),
)

return RepoScanOutcome(
repo_id=entry.repo_id,
Expand Down
64 changes: 53 additions & 11 deletions tests/test_vuln_catalog_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import json
from pathlib import Path

import pytest

from security_scanner.runtime.vulnerability_scan import (
CatalogScanRequest,
run_catalog_vulnerability_scan,
Expand Down Expand Up @@ -164,8 +162,10 @@ def test_two_runs_distinct_scan_run_ids_do_not_clobber(tmp_path):
assert (artifact_dir / "alpha" / "run2.jsonl").is_file()


def test_reusing_same_scan_run_id_raises_no_clobber_guard(tmp_path):
store = FakeStore([_entry("alpha", included=True)])
def test_reusing_same_scan_run_id_is_isolated_scan_error(tmp_path):
# The no-clobber guard must NOT abort the whole pass; a reused scan_run_id
# surfaces as an isolated per-repo scan-error outcome (PR #64 review HIGH).
store = FakeStore([_entry("alpha", included=True), _entry("beta", included=True)])
artifact_dir = tmp_path / "artifacts"
fetcher = _make_fetcher(tmp_path)

Expand All @@ -175,13 +175,20 @@ def test_reusing_same_scan_run_id_raises_no_clobber_guard(tmp_path):
fetcher=fetcher,
runner=FakeRunner(),
)
with pytest.raises(ValueError, match="clobber"):
run_catalog_vulnerability_scan(
CatalogScanRequest(artifact_dir=artifact_dir, scan_run_id="run1"),
store=store,
fetcher=fetcher,
runner=FakeRunner(),
)
# Second pass with the same id: alpha would clobber; the pass still completes
# and reports the clobber as a scan-error rather than raising.
result = run_catalog_vulnerability_scan(
CatalogScanRequest(artifact_dir=artifact_dir, scan_run_id="run1"),
store=store,
fetcher=fetcher,
runner=FakeRunner(),
)

assert result.preflight_ok is True
by_repo = {o.repo_id: o for o in result.outcomes}
assert by_repo["alpha"].status == "scan-error"
assert "clobber" in (by_repo["alpha"].detail or "")
assert by_repo["beta"].status == "scan-error"


def test_fetch_failure_for_one_repo_does_not_abort_run(tmp_path):
Expand Down Expand Up @@ -241,3 +248,38 @@ def test_preflight_fails_without_runner_and_bogus_binary(tmp_path):
assert "not found" in result.preflight_detail
# No artifacts written when preflight fails.
assert not artifact_dir.exists()


def test_unexpected_scan_exception_is_isolated_not_aborting(tmp_path):
# An unexpected (non-Semgrep) error in one repo's scan must be caught and
# reported as scan-error, never aborting the batch (PR #64 review HIGH).
store = FakeStore(
[_entry("alpha", included=True), _entry("beta", included=True)]
)
artifact_dir = tmp_path / "artifacts"

class FlakyRunner:
def __init__(self) -> None:
self.calls = 0

def run(self, root_path, *, sarif_path):
self.calls += 1
if self.calls == 1:
raise RuntimeError("unexpected analyzer crash")
Path(sarif_path).write_text(
json.dumps(_sarif_payload()), encoding="utf-8"
)

result = run_catalog_vulnerability_scan(
CatalogScanRequest(artifact_dir=artifact_dir, scan_run_id="run1"),
store=store,
fetcher=_make_fetcher(tmp_path),
runner=FlakyRunner(),
)

by_repo = {o.repo_id: o for o in result.outcomes}
assert by_repo["alpha"].status == "scan-error"
assert "crash" in (by_repo["alpha"].detail or "")
assert by_repo["beta"].status == "scanned"
assert result.error_count == 1
assert result.scanned_count == 1
Loading