From 1b362eb29e8e2bb183e647f2d8ec9c0512dd5b0a Mon Sep 17 00:00:00 2001 From: pureliture Date: Fri, 19 Jun 2026 18:03:35 +0900 Subject: [PATCH 1/3] feat(verifier): line-stable suppression via match_key (#6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-asking the verifier (or a human) about a secret that was already dispositioned false-positive/resolved/ignored is wasteful and noisy. The finding_id includes line_start (#12 L1), so a secret that merely moves lines gets a new identity and would be re-verified despite an existing decision. Add a secondary, line/commit-independent suppression key (match_key) keyed on repo/file/rule + the salted secret_hash (content), never on line/commit. It is a suppression key only — finding_id identity is unchanged. Because secret_hash is content-derived, distinct secrets get distinct keys, so suppression never crosses different leaks. - core: compute_match_key() (mk_ + sha256[:32] over stable material). - items: FINDING_STATE carries matchKey (dropped when no secret_hash). - store: best-effort MATCHKEY# -> disposition pointer written post- transaction for terminal non-blocking statuses only; find_disposition_by_match_key lookup. No GSI3 (pointer-item keeps schema/blast-radius minimal); pointer lag is fail-safe (re-ask). - runtime: resolve_existing_disposition gate (finding_id first, then match_key); scan-all suppression gate skips re-verify, inherits prior verdict on line-move (actor/source="inherited"), and reports inherited_suppressions. Lookup errors fail safe to re-verify. Tests: match_key core + gate units, store-level pointer write/lookup, scan-all inheritance integration. Full suite green (702 + new). Co-Authored-By: Claude Opus 4.8 --- .../specs/stable-suppression/design.md | 127 ++++++++++++++++++ .../specs/stable-suppression/requirements.md | 102 ++++++++++++++ src/security_scanner/core/finding/model.py | 22 +++ .../runtime/disposition_lookup.py | 88 ++++++++++++ src/security_scanner/runtime/scan_all.py | 31 +++++ .../storage/adapters/nosql_db/items.py | 11 +- .../storage/adapters/nosql_db/store.py | 55 +++++++- tests/test_cli_scan_all.py | 125 ++++++++++++++++- tests/test_dynamodb_compatible_store.py | 98 ++++++++++++++ tests/test_match_key_suppression.py | 119 ++++++++++++++++ 10 files changed, 775 insertions(+), 3 deletions(-) create mode 100644 docs/workbench/specs/stable-suppression/design.md create mode 100644 docs/workbench/specs/stable-suppression/requirements.md create mode 100644 src/security_scanner/runtime/disposition_lookup.py create mode 100644 tests/test_match_key_suppression.py diff --git a/docs/workbench/specs/stable-suppression/design.md b/docs/workbench/specs/stable-suppression/design.md new file mode 100644 index 0000000..e0a46ba --- /dev/null +++ b/docs/workbench/specs/stable-suppression/design.md @@ -0,0 +1,127 @@ +# design.md — #6 라인 이동에 강인한 suppression + +## 1. 개요 + +동일 secret의 **라인 이동**으로 finding_id가 바뀌어 발생하는 verifier 재질문(re-ask)을, **finding_id를 변경하지 않는 additive 방식**으로 해결한다. 라인·commit과 무관한 content 기반 보조 키 `match_key`를 도입하고, 신규 disposition-lookup gate가 finding_id(1순위)→match_key(2순위)로 기존 terminal non-blocking 판정을 조회해 재질문을 skip한다. + +핵심 설계 원칙: **finding_id identity 불변(#12 L1 보존), match_key는 보조 suppression 키일 뿐 identity가 아님.** + +## 2. 요구사항 참조 + +- R1–R3: match_key 정의/안정성/구분성 → §4, §5. +- R4: item 기록/GSI → §6. +- R5–R8: lookup gate/승계/멱등/summary → §7, §8. +- N1–N3: 비파괴/단일 query/fail-safe → §10, §11. + +## 3. 접근 후보와 선택 + +| 후보 | 내용 | 라인 안정 | finding_id 영향 | 결정 | +|---|---|---|---|---| +| (a) gitleaks-native fingerprint 선호 | combine_repo_fingerprint 우선 | ✗ (gitleaks FP=`commit:file:rule:startline`, line·commit 포함; context7 확인) | 이미 기본 | **기각** | +| (b) 별도 content 기반 match_key + lookup gate | finding_id 불변, 보조 키로 disposition 조회 | ✓ (salted secret_hash 기반) | **없음** | **채택** | +| (c) versioned fingerprint + migration | fingerprint 버전화 후 재-ID | ✓ | 전체 재-ID | **기각(#12 L1 위반)** | + +선택 근거는 requirements.md §7(Q2–Q4). (a)는 context7로 확인한 gitleaks Fingerprint 포맷이 라인·commit을 포함해 라인 안정성이 없어 탈락. (c)는 finding_id 전체 재발급으로 #12 L1·`FINDING#` 파티션 키 위반. (b)는 `occurrence_key_for_finding`의 content fallback 선례를 따른 최소 침습안. + +## 4. 아키텍처 + +``` +scanner(mapper) → Finding(finding_id 불변) + │ + ┌────────────┴─────────────┐ + │ core/finding/model.py │ ← compute_match_key() 신규 (순수, 의존 없음) + └────────────┬─────────────┘ + │ match_key + ┌─────────────────┴───────────────────┐ + │ storage/adapters/nosql_db/items.py │ finding_to_items: matchKey 컬럼 + GSI 키 + │ storage/adapters/nosql_db/store.py │ find_disposition_by_match_key() 신규 + └─────────────────┬────────────────────┘ + │ + ┌─────────────────┴───────────────────┐ + │ runtime: disposition_lookup gate │ finding_id→match_key 순 조회 + │ scan_all / verify_artifact 에서 사용 │ 승계 시 verify skip + inherited event + └──────────────────────────────────────┘ +``` + +## 5. 구성요소 + +### 5.1 `compute_match_key` (core/finding/model.py, 신규·순수) +``` +def compute_match_key(repo_full_name, file_path, rule_id, secret_hash) -> str: + material = {"repo": repo_full_name, "file": file_path, + "rule": rule_id, "secretHash": secret_hash} + encoded = json.dumps(material, sort_keys=True, separators=(",", ":")) + return "mk_" + hashlib.sha256(encoded.encode()).hexdigest()[:32] +``` +- `secret_hash`는 `hash_secret()` 산출물(salted SHA-256, 라인·commit 무관). +- 라인/commit/start_line을 **포함하지 않음** → 라인 이동에 안정. +- `Finding`에 `match_key`는 저장 필드로 추가하지 않고(스키마 round-trip 비파괴), storage 레이어에서 `compute_match_key(repo, file, rule, evidence.secret_hash)`로 파생한다(N1 보존). secret_hash가 없으면 match_key 미생성(legacy/edge). + +### 5.2 items.py +- `finding_to_items`의 FINDING_STATE·observation에 `matchKey` 추가(additive, `without_none` drop). +- FINDING_STATE에 `match_key→finding_id` 보조 GSI: 예 `gsi3pk = MATCHKEY#`, `gsi3sk = STATE##` (시간 역순 최신 조회용). + +### 5.3 store.py — `find_disposition_by_match_key(match_key) -> dict | None` +- `gsi3pk = MATCHKEY#`를 ScanIndexForward=False로 1회 query, terminal non-blocking status인 최신 FINDING_STATE 1건 반환(N2: scan 금지). + +### 5.4 runtime disposition_lookup gate (신규 모듈, 예: `runtime/disposition_lookup.py`) +``` +def resolve_existing_disposition(store, finding) -> ExistingDisposition | None: + st = store.read_finding_state(finding.finding_id) # 1순위 + if st and st["status"] in NON_BLOCKING: return ... + mk = compute_match_key(...); hit = store.find_disposition_by_match_key(mk) # 2순위 + if hit and hit["status"] in NON_BLOCKING: return ...(inherited) + return None +``` + +## 6. 데이터 흐름 + +1. scan → mapper가 Finding 생성(finding_id 불변). +2. verifier 경로 진입 전, 각 finding에 대해 `resolve_existing_disposition` 호출. +3. (1순위) finding_id에 terminal non-blocking state가 있으면 verify skip(기존에도 동일 위치 재발견 처리). +4. (2순위) 라인 이동으로 finding_id는 새 것이지만 match_key가 일치하면 → verify skip + `set_finding_disposition(new_finding_id, ..., source="inherited")` 멱등 호출로 GLOBAL status 채움. +5. 매치 없음 → 기존대로 verifier 실행. +6. write 시 `finding_to_items`가 matchKey/GSI를 함께 기록해 다음 scan부터 2순위 조회 가능. + +## 7. 승계(inherit) 정책 + +- 승계 status: `FALSE_POSITIVE` / `IGNORED` / `RESOLVED`(gate.py `_NON_BLOCKING_STATUSES`와 일치). `OPEN`/`NEEDS_REVIEW` 비승계. +- 새 finding_id에 disposition event 기록: `source="inherited"`, `reason="라인 이동으로 match_key 일치 판정 승계"`, verdict는 원판정 verdict 보존. +- 멱등: `set_finding_disposition`은 이미 version 기반 optimistic concurrency(`#version` 조건)를 쓰므로, 이미 동일 status로 inherited면 재기록 생략(또는 동일 결과로 수렴). 중복 STATE_EVENT 방지를 위해 inherit 전 `read_finding_state(new_id)`로 이미 채워졌는지 확인. + +## 8. 에러 처리 + +- store/GSI 조회 실패: lookup을 fail-open이 아니라 **fail-safe(미승계=재질문)** 로 처리. 즉 조회 실패 시 verifier가 실행되어 over-ask는 되어도 false-suppression은 없음(N3). +- match_key 산출 불가(secret_hash 부재): 2순위 skip, 기존대로 verify. +- 모든 요약은 public-safe: `_public_safe_exception` 패턴 재사용, raw secret/match 비노출(R9). +- scan_all summary에 `inherited_suppressions` 카운트 추가(기존 ScanAllVerifierSummary 확장, additive). + +## 9. 테스트 전략 + +- **단위(core)**: `compute_match_key` 라인 불변(같은 secret·file, line 다름 → 동일), repo/rule/file/secret 다름 → 상이, raw secret 비포함. +- **단위(items/store)**: finding_to_items가 matchKey/gsi3 기록, `find_disposition_by_match_key`가 terminal non-blocking 최신 1건만 반환, legacy(matchKey 없음) miss. +- **runtime gate**: fake store로 (1) finding_id 직접 히트, (2) 라인 이동 match_key 히트→inherited, (3) OPEN 비승계, (4) 조회 실패 시 fail-safe 재질문, (5) inherit 멱등. +- **회귀**: `test_finding.py` 전체 무변경 통과(N1), 기존 round-trip·gate 테스트 통과. +- 모든 fixture는 synthetic(`AKIAFAKEEXAMPLE*`). + +## 10. 마일스톤 + +1. `compute_match_key`(core) + 단위 테스트(red→green). finding_id 영향 0 증명. +2. items.py matchKey/gsi3 additive 기록 + store `find_disposition_by_match_key` + 단위 테스트. +3. runtime disposition_lookup gate(순수 함수+fake store 테스트). +4. scan_all/verify_artifact 배선: verify 전 lookup, 승계 시 skip + inherited event + summary 카운트. +5. public-safe docs(getting-started)에 라인-이동 suppression·salt 회전 경고 추가. + +## 11. 비파괴/Fail-safe 보장 요약 + +- finding_id/fingerprint/compute_fingerprint 무변경 → 기존 stored state·#12 L1 보존. +- matchKey/gsi3는 additive, legacy item은 miss로 graceful. +- lookup 실패·미존재는 재질문(fail-safe), salt 회전은 over-ask 방향. +- JSONL-only 경로는 비대상(capability 부재) — 문서에 명시. + +## 12. 열린 질문 + +- OQ1. 기존 stored finding에 대한 matchKey 백필을 별도 마이그레이션으로 제공할지(현재 비범위, 미백필 시 라인 이동 1회는 재질문될 수 있음). +- OQ2. match_key를 FINDING_STATE에만 둘지 observation에도 둘지 — residual(#12 L2) 조회와의 결합 여부에 따라 결정(기본: state 중심, observation은 진단용 additive). +- OQ3. inherited 시 새 finding_id의 confidence/severity는 원본 그대로 둘지(기본: 그대로, disposition만 승계). +- OQ4. 라인 이동 + 동시 rule 변경(같은 secret, 다른 rule_id)은 의도적으로 비승계 — 정책 확정 여부. \ No newline at end of file diff --git a/docs/workbench/specs/stable-suppression/requirements.md b/docs/workbench/specs/stable-suppression/requirements.md new file mode 100644 index 0000000..1324548 --- /dev/null +++ b/docs/workbench/specs/stable-suppression/requirements.md @@ -0,0 +1,102 @@ +# requirements.md — #6 라인 이동에 강인한 suppression + +## 1. 배경 / 문제 + +`compute_fingerprint(repo, file_path, line_start, rule_id)`는 `line_start`를 포함하고, `finding_id = "finding_" + SHA-1(fingerprint)[:16]`로 파생된다(`core/finding/model.py`). 따라서 **같은 secret이 파일 안에서 라인만 이동하면 finding_id가 바뀐다.** 새 finding_id에는 기존 disposition(예: FALSE_POSITIVE)이 연결되지 않으므로 verifier가 동일 secret을 **다시 묻는다(re-ask)**. + +추가로 코드 조사에서 확인된 더 큰 사실: + +- `scan_all._run_verifier_disposition_writes`와 `verify_artifact.run_verify_artifact`는 **fresh-scan finding 전부를 무조건 verify**한다. `read_finding_state`로 기존 판정을 조회해 재질문을 skip하는 경로가 **존재하지 않는다**(grep 확인). 즉 라인 이동이 아니어도 매 scan마다 재질문이 발생한다. +- 즉 #6의 "suppression을 라인 이동에 강인하게"는, 라인 안정 키만으로는 효과가 없고 **"기존 disposition을 조회해 재질문을 skip하는 lookup gate"를 함께 도입**해야 의미가 있다. + +## 2. CRITICAL 제약 (불변식) + +- `compute_fingerprint` / `compute_finding_id` / `Finding.create`의 finding_id 산출 로직은 **변경하지 않는다.** finding_id는 `items.py`의 `FINDING#` identity/state/observation 키와 #12 L1(branch/commit=occurrence, status=GLOBAL) 결정의 뿌리이며, 바꾸면 저장된 모든 FINDING_STATE/STATE_EVENT/observation/disposition ledger가 재-ID되어 dedup·residual·상태가 깨진다. +- 변경은 **additive·non-breaking**이어야 한다. 기존 item·테스트·round-trip 계약을 보존한다. +- public-safe/redacted-only, local-first. target-repo PR gating·알림/엔드포인트 연동 없음. + +## 3. 범위 (In Scope) + +- 라인 이동에 안정적인 **별도 보조 키 `match_key`**(content 기반, 라인·commit 비포함) 정의 및 산출. +- `match_key`를 FINDING_STATE / observation item에 **additive 컬럼**으로 기록 + `match_key → finding_id` 보조 GSI. +- 신규 **disposition-lookup gate**: `finding_id` 직접 매치 → 실패 시 `match_key` 매치 순으로 기존 terminal non-blocking disposition을 조회. +- verifier 경로(`scan_all`·`verify_artifact`)에서, 이미 terminal non-blocking 판정을 가진 finding은 **재질문 skip**하고 새 finding_id에 `source="inherited"` disposition event를 멱등 기록. + +## 4. 비범위 (Out of Scope, YAGNI) + +- finding_id/fingerprint 스키마 변경, versioned fingerprint, 전체 마이그레이션. +- 파일 rename/이동 추적, secret 값 변경(키 로테이션) 추적. +- JSONL-only 로컬 경로에 대한 disposition/suppression(현재 미지원 capability). +- 기존 stored item 백필(별도 작업으로 분리). +- 알림·target PR gating·외부 endpoint. + +## 5. 기능 요구사항 + +- R1. `match_key`는 `salted secret_hash`(라인·commit 무관) + `repo_full_name` + `rule_id` + 정규화 `file_path`의 canonical JSON을 SHA-256한 값으로 산출한다. 접두사 `mk_`. +- R2. 같은 secret이 같은 파일·repo·rule에서 라인만 다르면 `match_key`가 **동일**해야 한다. 라인이 달라도 `match_key`는 불변임을 테스트로 증명한다. +- R3. 다른 repo / 다른 rule / 다른 파일 / 다른 secret이면 `match_key`가 **달라야** 한다(over-suppress 방지). +- R4. `finding_to_items`는 `match_key`를 FINDING_STATE·observation item에 기록하고, `match_key→finding_id` 보조 GSI 키를 추가한다. `without_none`으로 누락 시 drop(legacy back-compat). +- R5. lookup gate는 (1) `read_finding_state(finding_id)` → terminal non-blocking이면 그대로 승계, (2) 없으면 `match_key`로 가장 최근 terminal non-blocking disposition을 조회한다. +- R6. 승계 대상 status는 `FALSE_POSITIVE`, `IGNORED`, `RESOLVED`만(gate.py non-blocking 집합과 일치). `OPEN`/`NEEDS_REVIEW`는 승계하지 않는다. +- R7. 라인 이동으로 새 finding_id가 생긴 경우, 그 finding_id에 `set_finding_disposition(..., source="inherited", actor=<원판정 actor 보존 or "match-key-inherit">)`를 호출해 GLOBAL status를 채운다. 동일 finding_id에 대해 멱등이어야 한다(중복 inherit 금지). +- R8. verifier 경로는 승계가 일어나면 해당 finding을 verify 호출에서 **skip**하고 summary에 `inherited_suppressions` 카운트를 노출한다(public-safe). +- R9. `match_key` 값·lookup 결과·event 어디에도 raw secret/match가 노출되지 않는다(redacted-only). + +## 6. 비기능 요구사항 + +- N1. 기존 `test_finding.py`(fingerprint/finding_id 기대값)와 round-trip 계약은 **변경 없이 통과**해야 한다. +- N2. lookup은 NoSQL adapter 단일-테이블 access pattern 내에서 GSI 1회 query로 수행(scan 금지). +- N3. salt 회전 시 match-key suppression이 리셋되어도 **over-ask(fail-safe)** 방향이어야 하며, over-suppress가 되어선 안 된다. + +## 7. 확정 결정 (자문자답) + +### Q1. finding_id를 라인 비포함으로 바꿔 근본 해결할까? +**A. 아니오.** `compute_fingerprint=[repo,path,line,rule]`·`finding_id=SHA-1(fingerprint)`는 `items.py`의 `FINDING#` PK/SK/GSI와 #12 L1의 정체성 뿌리다. 변경 시 저장된 FINDING_STATE/STATE_EVENT/observation/disposition이 전부 재-ID되어 dedup·residual·상태가 붕괴한다. → finding_id **불변** 고정. + +### Q2. 옵션(a) gitleaks-native fingerprint 선호로 라인 안정성이 생기나? +**A. 아니오, 기각.** `mapper.py`는 gitleaks `Fingerprint`가 있으면 이미 `fingerprint_override`로 finding_id를 그걸로 derive한다(기본 동작). 그러나 context7로 확인한 gitleaks v8 Fingerprint 포맷은 `commit:file:rule:startline`(예: `6e6ee65...:path:generic-api-key:4`)로 **start line을 포함**하고 git 모드는 **commit까지 포함**한다. 라인 이동 시 fingerprint가 바뀌므로 라인 안정성이 없다. 오히려 commit 때문에 더 불안정. + +### Q3. 옵션(c) versioned fingerprint + migration은? +**A. 기각.** 새 version은 신규 finding_id 체계를 만들어 stored state와 단절되고, 마이그레이션은 finding_id 재발급을 요구해 #12 L1과 `FINDING#` 파티션 키를 위반한다. non-breaking·YAGNI 원칙과 충돌. + +### Q4. 채택안 — 옵션(b)? +**A. 채택.** finding_id 불변 + 별도 content 기반 `match_key`(라인·commit 무관)를 추가하고 disposition lookup이 이를 2순위로 조회. 근거: `occurrence_key_for_finding`이 이미 fingerprint 부재 시 `secretHash`+`matchHash` content fallback을 갖는 선례(items.py 643–648), `secret_hash`=`hash_secret()`의 salted SHA-256(raw secret)는 라인·commit과 무관. + +### Q5. match_key 구성요소? +**A.** `salted secret_hash + repo_full_name + rule_id + 정규화 file_path` → canonical JSON(sort_keys, separators) → SHA-256. file_path를 포함해 '같은 파일 내 라인 이동'으로 scope를 좁혀 다른 파일의 별개 노출을 한꺼번에 suppress하지 않게 한다. delimiter injection은 canonical JSON으로 차단(기존 선례 동일). + +### Q6. file_path를 넣으면 rename에 약한데? +**A. 의도된 trade-off.** #6 범위는 '라인 이동'이며 rename은 GHAS `comparison_key`(repo+file+line+secret_type)도 깨지는 상위 범위다. file 제외 시 false-suppression(보안 누락) 위험이 크다. file 포함 고정, rename 내성은 비범위. + +### Q7. lookup gate는 신규인가? +**A. 신규.** 현재 verifier 경로 어디에도 `read_finding_state`로 재질문을 skip하는 gate가 없다(grep 확인). #6은 라인 안정 키 + lookup gate를 함께 도입해야 효과가 있다. **최대 리스크로 명시.** + +### Q8. lookup 우선순위? +**A.** finding_id 직접 매치(1순위, 정확·저렴) → 없으면 match_key(2순위, 라인 이동 흡수). 2단계화로 false-suppression 최소화. + +### Q9. 어떤 판정을 승계? +**A.** terminal non-blocking(`FALSE_POSITIVE`/`IGNORED`/`RESOLVED`)만. gate.py non-blocking 집합과 일치. `OPEN`/`NEEDS_REVIEW`는 비승계. 새 finding_id에는 `source="inherited"` disposition event를 멱등 기록해 GLOBAL status 단일성 유지. + +### Q10. 저장/backend 범위? +**A.** NoSQL adapter의 FINDING_STATE+observation에 `matchKey` 컬럼 + `match_key→finding_id` GSI. JSONL store는 disposition 미지원이라 비대상. + +### Q11. legacy item 호환? +**A.** `finding_to_items`에서 additive 기록, `without_none`로 None drop. 기존 item은 matchKey 없음 → 2순위 단순 miss(over-suppress 없음). 백필은 비범위. + +### Q12. salt 변경 영향? +**A.** match_key는 `SECURITY_SCANNER_HASH_SALT` 의존이라 salt 회전 시 전부 달라져 suppression 리셋 → over-ask(fail-safe). 운영 문서에 'salt 회전 = match-key suppression 리셋' 경고. + +## 8. 변경하지 않는 것 (명시) + +- `compute_fingerprint`, `combine_repo_fingerprint`, `compute_finding_id`, `Finding.create`의 finding_id 산출. +- 기존 `fingerprint`/`finding_id` 필드 의미·`FINDING#` 키. +- `test_finding.py`의 기대값. +- JSONL store 동작. + +## 9. 수용 기준 + +- AC1. 라인만 다른 동일 secret 두 finding이 서로 다른 finding_id를 갖되 동일 `match_key`를 갖는다. +- AC2. 첫 finding을 FALSE_POSITIVE로 판정 후 라인 이동한 두 번째 finding은 verifier 재질문 없이 inherited disposition을 받는다. +- AC3. 다른 파일/rule/repo/secret은 동일 match_key를 갖지 않아 승계되지 않는다. +- AC4. `test_finding.py` 및 기존 storage round-trip 테스트가 변경 없이 통과. +- AC5. inherited disposition event는 멱등(같은 finding_id 중복 호출에도 1건). \ No newline at end of file diff --git a/src/security_scanner/core/finding/model.py b/src/security_scanner/core/finding/model.py index 37c9bb0..d046c71 100644 --- a/src/security_scanner/core/finding/model.py +++ b/src/security_scanner/core/finding/model.py @@ -148,6 +148,28 @@ def compute_finding_id(fingerprint: str) -> str: return f"finding_{digest}" +def compute_match_key( + repo_full_name: str, file_path: str, rule_id: str, secret_hash: str +) -> str: + """Return a line/commit-independent suppression key for one secret. + + Unlike ``finding_id`` (which includes line_start), this keys on the secret + *content* (``secret_hash``, a salted SHA-256) plus repo/file/rule, so the same + secret that merely moved lines yields the same match_key. It is a secondary + *suppression* key only — NOT an identity; ``finding_id`` is unchanged (#12 L1). + Because secret_hash is content-derived, two distinct secrets get distinct keys, + so suppression never spans different leaks. + """ + material = { + "repo": repo_full_name, + "file": file_path, + "rule": rule_id, + "secretHash": secret_hash, + } + encoded = json.dumps(material, sort_keys=True, separators=(",", ":")) + return "mk_" + hashlib.sha256(encoded.encode("utf-8")).hexdigest()[:32] + + # --------------------------------------------------------------------------- # Nested sub-dataclasses # --------------------------------------------------------------------------- diff --git a/src/security_scanner/runtime/disposition_lookup.py b/src/security_scanner/runtime/disposition_lookup.py new file mode 100644 index 0000000..c83b06a --- /dev/null +++ b/src/security_scanner/runtime/disposition_lookup.py @@ -0,0 +1,88 @@ +"""Line-stable disposition lookup gate (#6). + +Resolves whether a finding already has a terminal non-blocking disposition that +should suppress re-verification, checking two keys in order: + +1. ``finding_id`` — the exact finding was already dispositioned (same location). +2. ``match_key`` — the same secret content (repo/file/rule + salted secret_hash) + was dispositioned under a *different* finding_id, i.e. it merely moved lines. + +finding_id is never changed; match_key is a secondary suppression key only. The +caller decides fail-safety: if a store lookup raises, the caller should re-verify +(over-ask) rather than suppress — never the reverse. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Protocol + +from security_scanner.core.finding.model import ( + Finding, + Status, + compute_match_key, +) + +NON_BLOCKING_STATUSES = frozenset( + {Status.RESOLVED.value, Status.FALSE_POSITIVE.value, Status.IGNORED.value} +) + + +@dataclass(frozen=True) +class ExistingDisposition: + """An existing terminal non-blocking disposition that suppresses re-asking.""" + + finding_id: str + status: str + verdict: str | None + via: str # "finding_id" (direct) or "match_key" (line-move inheritance) + + +class _DispositionLookupStore(Protocol): + def read_finding_state(self, finding_id: str) -> dict[str, Any] | None: ... + + def find_disposition_by_match_key( + self, match_key: str + ) -> dict[str, Any] | None: ... + + +def _verdict_of(state: dict[str, Any]) -> str | None: + triage = state.get("triage") + if isinstance(triage, dict): + return triage.get("verdict") + return state.get("verdict") + + +def resolve_existing_disposition( + store: _DispositionLookupStore, finding: Finding +) -> ExistingDisposition | None: + """Return an existing non-blocking disposition for *finding*, or None. + + Does not catch store errors — the caller wraps this and fails safe (re-verify) + on error, so a lookup failure never silently suppresses a finding. + """ + state = store.read_finding_state(finding.finding_id) + if state is not None and state.get("status") in NON_BLOCKING_STATUSES: + return ExistingDisposition( + finding_id=finding.finding_id, + status=state["status"], + verdict=_verdict_of(state), + via="finding_id", + ) + + secret_hash = finding.evidence.secret_hash + if not secret_hash: + return None + + match_key = compute_match_key( + finding.repo.full_name, finding.location.file_path, finding.rule_id, secret_hash + ) + pointer = store.find_disposition_by_match_key(match_key) + if pointer is not None and pointer.get("status") in NON_BLOCKING_STATUSES: + return ExistingDisposition( + finding_id=pointer.get("findingId", finding.finding_id), + status=pointer["status"], + verdict=pointer.get("verdict"), + via="match_key", + ) + return None diff --git a/src/security_scanner/runtime/scan_all.py b/src/security_scanner/runtime/scan_all.py index b5ef5cf..01beffa 100644 --- a/src/security_scanner/runtime/scan_all.py +++ b/src/security_scanner/runtime/scan_all.py @@ -14,6 +14,7 @@ from security_scanner.core.scan.options import ScanOptions from security_scanner.llm.common.verifier import VerifierConfig from security_scanner.runtime import verify_artifact as verifier_runtime +from security_scanner.runtime.disposition_lookup import resolve_existing_disposition from security_scanner.runtime.local_scan import ( RULE_PACK_VERSION, LocalScanRequest, @@ -135,6 +136,7 @@ class ScanAllVerifierSummary: needs_review: int dispositions_written: int failed: int + inherited_suppressions: int = 0 failures: list[ScanAllVerifierFailure] = field(default_factory=list) def to_notification_dict(self) -> dict: @@ -145,6 +147,7 @@ def to_notification_dict(self) -> dict: "terminal_verdicts": self.terminal_verdicts, "needs_review": self.needs_review, "dispositions_written": self.dispositions_written, + "inherited_suppressions": self.inherited_suppressions, "failed": self.failed, "failures": [failure.to_dict() for failure in self.failures], } @@ -527,9 +530,36 @@ def _run_verifier_disposition_writes( terminal_verdicts = 0 needs_review = 0 dispositions_written = 0 + inherited_suppressions = 0 failures: list[ScanAllVerifierFailure] = [] for finding in findings: + # Suppression gate (#6): skip re-verifying a finding that already has a + # terminal non-blocking disposition (by finding_id, or by match_key when + # the same secret merely moved lines). Fail-safe: a lookup error falls + # through to re-verification rather than silently suppressing. + try: + existing = resolve_existing_disposition(disposition_store, finding) + except Exception: # noqa: BLE001 - fail-safe: re-verify on lookup error. + existing = None + if existing is not None: + inherited_suppressions += 1 + if existing.via == "match_key" and existing.verdict is not None: + try: + disposition_store.set_finding_disposition( + finding.finding_id, + status=existing.status, + verdict=existing.verdict, + actor="inherited", + source="inherited", + reason="line-move match_key inheritance", + repo=finding.repo.full_name, + rule_id=finding.rule_id, + ) + except Exception: # noqa: BLE001 - best-effort inherit. + pass + continue + attempted += 1 try: verifier_result = verifier.verify(finding) @@ -589,6 +619,7 @@ def _run_verifier_disposition_writes( terminal_verdicts=terminal_verdicts, needs_review=needs_review, dispositions_written=dispositions_written, + inherited_suppressions=inherited_suppressions, failed=len(failures), failures=failures, ) diff --git a/src/security_scanner/storage/adapters/nosql_db/items.py b/src/security_scanner/storage/adapters/nosql_db/items.py index 82b139c..dc8ddde 100644 --- a/src/security_scanner/storage/adapters/nosql_db/items.py +++ b/src/security_scanner/storage/adapters/nosql_db/items.py @@ -13,7 +13,7 @@ from urllib.parse import urlsplit, urlunsplit from security_scanner.catalog.scan_target import ScanTarget -from security_scanner.core.finding.model import Finding +from security_scanner.core.finding.model import Finding, compute_match_key from security_scanner.storage.adapters.nosql_db.list_axis import ( REPO_LIST_AXIS, SCAN_DATE_AXIS, @@ -767,6 +767,15 @@ def finding_to_items(finding: Finding) -> list[dict[str, Any]]: "repo": repo, "ruleId": rule_id, "fingerprint": finding.fingerprint, + # line/commit-independent suppression key (#6); dropped by without_none + # when the finding has no secret_hash (legacy/edge). + "matchKey": ( + compute_match_key( + repo, finding.location.file_path, rule_id, finding.evidence.secret_hash + ) + if finding.evidence.secret_hash + else None + ), "status": finding.status, "triage": finding.triage.to_dict(), "version": 0, diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py index d4b331d..780b2a7 100644 --- a/src/security_scanner/storage/adapters/nosql_db/store.py +++ b/src/security_scanner/storage/adapters/nosql_db/store.py @@ -11,7 +11,7 @@ from boto3.dynamodb.types import TypeSerializer from security_scanner.catalog.scan_target import ScanTarget -from security_scanner.core.finding.model import Finding +from security_scanner.core.finding.model import Finding, Status from security_scanner.storage.adapters.nosql_db.access import ( items_to_finding_state_by_id, items_to_findings, @@ -87,6 +87,12 @@ TargetScanResult, ) +# Terminal non-blocking statuses that are worth a match_key suppression pointer +# (mirrors core.policy.gate non-blocking set). +_DISPOSITION_NON_BLOCKING = frozenset( + {Status.RESOLVED.value, Status.FALSE_POSITIVE.value, Status.IGNORED.value} +) + class DynamoDbCompatibleFindingStore: """FindingStore implementation backed by a DynamoDB-compatible endpoint.""" @@ -670,11 +676,58 @@ def set_finding_disposition( }, ], ) + self._record_match_disposition_pointer( + state_item.get("matchKey"), + finding_id=finding_id, + status=status, + verdict=verdict, + decided_at=decided_at_iso, + ) return except Exception as exc: if not _is_conditional_check_failure(exc) or attempt == 2: raise + def _record_match_disposition_pointer( + self, + match_key: str | None, + *, + finding_id: str, + status: str, + verdict: str, + decided_at: str, + ) -> None: + """Upsert a match_key -> disposition pointer for line-stable suppression (#6). + + Best-effort and idempotent (latest wins): only written for terminal + non-blocking statuses on findings that carry a matchKey. Not transactional + with the state update — if it lags, suppression simply re-asks (fail-safe). + """ + if not match_key or status not in _DISPOSITION_NON_BLOCKING: + return + self._table.put_item( + Item={ + "PK": f"MATCHKEY#{match_key}", + "SK": "DISPOSITION", + "entityType": "MATCH_DISPOSITION", + "matchKey": match_key, + "findingId": finding_id, + "status": status, + "verdict": verdict, + "decidedAt": decided_at, + } + ) + + def find_disposition_by_match_key(self, match_key: str) -> dict[str, Any] | None: + """Return the latest disposition pointer for a match_key, or None (#6).""" + response = self._table.get_item( + Key={"PK": f"MATCHKEY#{match_key}", "SK": "DISPOSITION"} + ) + item = response.get("Item") + if not item or item.get("entityType") != "MATCH_DISPOSITION": + return None + return item + def read_all(self) -> list[Finding]: finding_items = scan_all_pages( self._table, diff --git a/tests/test_cli_scan_all.py b/tests/test_cli_scan_all.py index 42ce19b..7e6536e 100644 --- a/tests/test_cli_scan_all.py +++ b/tests/test_cli_scan_all.py @@ -11,7 +11,12 @@ from security_scanner.catalog.scan_target import ScanTarget from security_scanner.cli.commands import scan as cli_scan from security_scanner.cli import main -from security_scanner.core.finding.model import Finding, Status, Verdict +from security_scanner.core.finding.model import ( + Finding, + Status, + Verdict, + compute_match_key, +) from security_scanner.llm.common.verifier import VerifierResult from security_scanner.runtime.local_scan import ( LocalScanRequest, @@ -59,6 +64,8 @@ def __init__(self) -> None: self.bootstrap_required = False self.disposition_calls: list[dict] = [] self.raise_on_disposition: Exception | None = None + self.finding_states: dict[str, dict] = {} + self.match_dispositions: dict[str, dict] = {} def put_scan_target(self, target: ScanTarget) -> None: self._targets.append(target) @@ -81,6 +88,12 @@ def set_finding_disposition(self, finding_id, **kwargs): raise self.raise_on_disposition self.disposition_calls.append({"finding_id": finding_id, **kwargs}) + def read_finding_state(self, finding_id): + return self.finding_states.get(finding_id) + + def find_disposition_by_match_key(self, match_key): + return self.match_dispositions.get(match_key) + @pytest.fixture def fake_store(monkeypatch): @@ -549,6 +562,116 @@ def verify(self, finding): "terminal_verdicts": 2, "needs_review": 1, "dispositions_written": 2, + "inherited_suppressions": 0, + "failed": 0, + "failures": [], + } + + +def test_scan_all_verify_inherits_match_key_suppression( + fake_store, lock_path, monkeypatch, capsys, tmp_path +): + fake_store.put_scan_target( + ScanTarget(url="https://github.com/example-org/repo-a", name="example-org/repo-a") + ) + cache = tmp_path / "repo-a" + cache.mkdir() + # Same secret previously dispositioned false-positive, now moved to a new + # line: only the match_key pointer exists (no finding_id state for the move). + moved = _make_finding(repo="example-org/repo-a", line_start=500) + match_key = compute_match_key( + moved.repo.full_name, + moved.location.file_path, + moved.rule_id, + moved.evidence.secret_hash, + ) + fake_store.match_dispositions[match_key] = { + "status": Status.FALSE_POSITIVE.value, + "verdict": Verdict.FALSE_POSITIVE.value, + "findingId": "finding_original", + } + + monkeypatch.setattr( + "security_scanner.cli.commands.scan.fetch_or_clone", + lambda url, *a, **k: cache, + ) + monkeypatch.setattr( + "security_scanner.cli.commands.scan.run_local_scan", + lambda *a, **k: LocalScanResult( + manifest_path="", + scan_run_id="scan_run_abc", + rule_pack_version="secret-rules-0.1.0", + destination="dynamodb", + total_targets=1, + scanned=1, + total_findings=1, + target_results=[ + LocalScanTargetResult( + target_name="example-org/repo-a", + status="scanned", + finding_count=1, + findings=[moved], + ) + ], + ), + ) + verifier_calls: list[str] = [] + + class FakeVerifier: + def __init__(self, config): + self.config = config + + def verify(self, finding): + verifier_calls.append(finding.finding_id) + raise AssertionError("suppressed finding must not be verified") + + monkeypatch.setattr( + "security_scanner.runtime.verify_artifact.OllamaChatVerifier", + FakeVerifier, + ) + + log_path = tmp_path / "scan-all.log.jsonl" + exit_code = main( + [ + "scan-all", + "--storage-backend", + "dynamodb", + "--notification-log", + str(log_path), + "--verify-artifacts", + "--ollama-host", + "http://127.0.0.1:11434", + "--ollama-model", + "synthetic-model", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 0, captured.err + # The moved finding is suppressed before verification ever runs. + assert verifier_calls == [] + # match_key inheritance re-stamps the new finding_id with the prior verdict. + assert fake_store.disposition_calls == [ + { + "finding_id": moved.finding_id, + "status": Status.FALSE_POSITIVE.value, + "verdict": Verdict.FALSE_POSITIVE.value, + "actor": "inherited", + "source": "inherited", + "reason": "line-move match_key inheritance", + "repo": moved.repo.full_name, + "rule_id": moved.rule_id, + } + ] + summary = next(r for r in _read_log(log_path) if r["type"] == "summary") + assert summary["verification"] == { + "enabled": True, + "findings_total": 1, + "attempted": 0, + "terminal_verdicts": 0, + "needs_review": 0, + "dispositions_written": 0, + "inherited_suppressions": 1, "failed": 0, "failures": [], } diff --git a/tests/test_dynamodb_compatible_store.py b/tests/test_dynamodb_compatible_store.py index d817f64..53b419e 100644 --- a/tests/test_dynamodb_compatible_store.py +++ b/tests/test_dynamodb_compatible_store.py @@ -16,6 +16,7 @@ GitleaksFindingPayload, Status, Verdict, + compute_match_key, ) from security_scanner.runtime.branch_residual import residual_for_repo from security_scanner.storage.adapters.nosql_db.list_axis import ( @@ -1324,6 +1325,103 @@ def transact_write_items(self, *, TransactItems: list[dict]) -> dict: assert events[0].to_verdict == Verdict.FALSE_POSITIVE.value +def test_finding_state_carries_match_key(): + finding = _make() + table = FakeDynamoTable(finding_to_items(finding)) + client = FakeDynamoClient(table) + store = DynamoDbCompatibleFindingStore( + DynamoDbCompatibleConfig(table_name="SecurityScannerLocal"), + resource=FakeDynamoResource(table), + client=client, + ) + + state = store.read_finding_state(finding.finding_id) + assert state is not None + expected = compute_match_key( + finding.repo.full_name, + finding.location.file_path, + finding.rule_id, + finding.evidence.secret_hash, + ) + assert state["matchKey"] == expected + assert FAKE_SECRET not in state["matchKey"] + + +def test_non_blocking_disposition_writes_match_key_pointer(): + finding = _make(triage_verdict=Verdict.NEEDS_REVIEW.value) + table = FakeDynamoTable(finding_to_items(finding)) + client = FakeDynamoClient(table) + store = DynamoDbCompatibleFindingStore( + DynamoDbCompatibleConfig(table_name="SecurityScannerLocal"), + resource=FakeDynamoResource(table), + client=client, + ) + + store.set_finding_disposition( + finding.finding_id, + status=Status.FALSE_POSITIVE.value, + verdict=Verdict.FALSE_POSITIVE.value, + actor="human", + source="manual", + reason="Confirmed false positive.", + repo=finding.repo.full_name, + rule_id=finding.rule_id, + decided_at="2026-06-16T01:02:03+00:00", + ) + + match_key = compute_match_key( + finding.repo.full_name, + finding.location.file_path, + finding.rule_id, + finding.evidence.secret_hash, + ) + pointer = store.find_disposition_by_match_key(match_key) + assert pointer is not None + assert pointer["entityType"] == "MATCH_DISPOSITION" + assert pointer["PK"] == f"MATCHKEY#{match_key}" + assert pointer["SK"] == "DISPOSITION" + assert pointer["findingId"] == finding.finding_id + assert pointer["status"] == Status.FALSE_POSITIVE.value + assert pointer["verdict"] == Verdict.FALSE_POSITIVE.value + assert pointer["decidedAt"] == "2026-06-16T01:02:03+00:00" + + +def test_blocking_disposition_writes_no_match_key_pointer(): + finding = _make(triage_verdict=Verdict.NEEDS_REVIEW.value) + table = FakeDynamoTable(finding_to_items(finding)) + client = FakeDynamoClient(table) + store = DynamoDbCompatibleFindingStore( + DynamoDbCompatibleConfig(table_name="SecurityScannerLocal"), + resource=FakeDynamoResource(table), + client=client, + ) + + # A true-positive keeps the finding OPEN (blocking) — no suppression pointer. + store.set_finding_disposition( + finding.finding_id, + status=Status.OPEN.value, + verdict=Verdict.TRUE_POSITIVE.value, + actor="human", + source="manual", + reason="Confirmed real secret.", + repo=finding.repo.full_name, + rule_id=finding.rule_id, + decided_at="2026-06-16T01:02:03+00:00", + ) + + match_key = compute_match_key( + finding.repo.full_name, + finding.location.file_path, + finding.rule_id, + finding.evidence.secret_hash, + ) + assert store.find_disposition_by_match_key(match_key) is None + pointer_items = [ + item for item in table.items if item.get("entityType") == "MATCH_DISPOSITION" + ] + assert pointer_items == [] + + def test_clear_is_not_supported_for_dynamodb_store(): store = DynamoDbCompatibleFindingStore( DynamoDbCompatibleConfig(table_name="SecurityScannerLocal"), diff --git a/tests/test_match_key_suppression.py b/tests/test_match_key_suppression.py new file mode 100644 index 0000000..f20b76b --- /dev/null +++ b/tests/test_match_key_suppression.py @@ -0,0 +1,119 @@ +"""Tests for line-stable match_key suppression (#23 follow-on #6).""" + +from __future__ import annotations + +from security_scanner.core.finding.model import Finding, compute_match_key +from security_scanner.runtime.disposition_lookup import resolve_existing_disposition + +FAKE_SECRET = "AKIAFAKEEXAMPLE000600" + + +def _finding(*, line_start=10, secret=FAKE_SECRET) -> Finding: + return Finding.create( + repo_full_name="org/repo", + rule_id="generic-api-key", + file_path="src/config.py", + line_start=line_start, + raw_secret=secret, + source_tool="gitleaks", + scan_run_id="scan_mk1", + rule_pack_version="secret-rules-0.1.0", + ) + + +def _match_key(f: Finding) -> str: + return compute_match_key( + f.repo.full_name, f.location.file_path, f.rule_id, f.evidence.secret_hash + ) + + +# --- compute_match_key (core) --- + + +def test_match_key_is_line_independent(): + a = _finding(line_start=10) + b = _finding(line_start=999) # same secret, moved lines + assert _match_key(a) == _match_key(b) + # but finding_id DOES differ by line (identity unchanged, #12 L1) + assert a.finding_id != b.finding_id + + +def test_match_key_differs_for_different_secret(): + a = _finding(secret="AKIAFAKEEXAMPLE000601") + b = _finding(secret="AKIAFAKEEXAMPLE000602") + assert _match_key(a) != _match_key(b) # distinct leaks never share a key + + +def test_match_key_excludes_raw_secret(): + key = _match_key(_finding()) + assert FAKE_SECRET not in key + assert key.startswith("mk_") + + +# --- resolve_existing_disposition (gate) --- + + +class _FakeStore: + def __init__(self, *, state=None, pointer=None): + self._state = state + self._pointer = pointer + self.match_key_queries: list[str] = [] + + def read_finding_state(self, finding_id): + return self._state + + def find_disposition_by_match_key(self, match_key): + self.match_key_queries.append(match_key) + return self._pointer + + +def test_direct_finding_id_hit_suppresses(): + f = _finding() + store = _FakeStore( + state={"status": "FALSE_POSITIVE", "triage": {"verdict": "FALSE_POSITIVE"}} + ) + existing = resolve_existing_disposition(store, f) + assert existing is not None + assert existing.via == "finding_id" + assert existing.status == "FALSE_POSITIVE" + assert store.match_key_queries == [] # short-circuits, no match_key query + + +def test_open_finding_not_suppressed(): + f = _finding() + store = _FakeStore(state={"status": "OPEN", "triage": {"verdict": "NEEDS_REVIEW"}}) + assert resolve_existing_disposition(store, f) is None + + +def test_match_key_inheritance_on_line_move(): + moved = _finding(line_start=500) + mk = _match_key(moved) + # no direct state for the new finding_id, but a pointer exists for the secret + store = _FakeStore( + state=None, + pointer={ + "status": "FALSE_POSITIVE", + "verdict": "FALSE_POSITIVE", + "findingId": "finding_original", + "matchKey": mk, + }, + ) + existing = resolve_existing_disposition(store, moved) + assert existing is not None + assert existing.via == "match_key" + assert existing.finding_id == "finding_original" + assert existing.status == "FALSE_POSITIVE" + + +def test_no_pointer_is_not_suppressed(): + f = _finding() + store = _FakeStore(state=None, pointer=None) + assert resolve_existing_disposition(store, f) is None + + +def test_blocking_pointer_does_not_suppress(): + f = _finding() + store = _FakeStore( + state=None, pointer={"status": "OPEN", "verdict": "TRUE_POSITIVE"} + ) + assert resolve_existing_disposition(store, f) is None From 554ea2aee6a3f09c8185338957d43aedeb7fffc3 Mon Sep 17 00:00:00 2001 From: pureliture Date: Fri, 19 Jun 2026 18:06:45 +0900 Subject: [PATCH 2/3] fix(verifier): docstring bodies for lookup Protocol stubs (#6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeQL flagged the `...` Ellipsis bodies on the _DispositionLookupStore Protocol methods as "Statement has no effect" (py/ineffectual-statement). Replace them with one-line docstrings — the idiomatic, alert-free body for a typing.Protocol method. Co-Authored-By: Claude Opus 4.8 --- src/security_scanner/runtime/disposition_lookup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/security_scanner/runtime/disposition_lookup.py b/src/security_scanner/runtime/disposition_lookup.py index c83b06a..7aeb1eb 100644 --- a/src/security_scanner/runtime/disposition_lookup.py +++ b/src/security_scanner/runtime/disposition_lookup.py @@ -39,11 +39,11 @@ class ExistingDisposition: class _DispositionLookupStore(Protocol): - def read_finding_state(self, finding_id: str) -> dict[str, Any] | None: ... + def read_finding_state(self, finding_id: str) -> dict[str, Any] | None: + """Return the global FINDING_STATE row for a finding, or None.""" - def find_disposition_by_match_key( - self, match_key: str - ) -> dict[str, Any] | None: ... + def find_disposition_by_match_key(self, match_key: str) -> dict[str, Any] | None: + """Return the match_key -> disposition pointer, or None.""" def _verdict_of(state: dict[str, Any]) -> str | None: From 533468deaf4314a2ea01e21f3188931081833097 Mon Sep 17 00:00:00 2001 From: pureliture Date: Fri, 19 Jun 2026 18:12:22 +0900 Subject: [PATCH 3/3] fix(verifier): harden match_key suppression per review (#6) Address gemini-code-assist review on PR #43: - over-suppression (HIGH): resolve_existing_disposition fell through to the match_key pointer even when the exact finding_id had a *deliberate* blocking decision, letting a same-secret false-positive suppress a re-opened/real finding. Now a blocking direct state with a recorded decision (decidedBy) is definitive and returns None. A default scan-created OPEN row (version 0, no decidedBy) still falls through, so line-move inheritance keeps working once findings are persisted before verification (the literal "any blocking state wins" suggestion would have disabled the feature entirely). - stale pointer (SECURITY-HIGH): _record_match_disposition_pointer now deletes the MATCHKEY pointer when a status flips back to blocking (e.g. a false positive re-opened as a real secret), so a later line-move of the same secret is re-verified instead of silently suppressed by an outdated verdict. - best-effort (MEDIUM): the pointer put/delete is wrapped in try/except so a transient write failure never fails the already-committed disposition transaction, matching the documented fail-safe contract. Tests: deliberate-blocking-overrides-pointer and default-open-still-inherits gate units; reopen-deletes-stale-pointer store test; FakeDynamoTable gains delete_item. Full suite green (705). Co-Authored-By: Claude Opus 4.8 --- .../runtime/disposition_lookup.py | 23 +++++--- .../storage/adapters/nosql_db/store.py | 48 +++++++++------ tests/test_dynamodb_compatible_store.py | 58 +++++++++++++++++++ tests/test_match_key_suppression.py | 38 ++++++++++++ 4 files changed, 143 insertions(+), 24 deletions(-) diff --git a/src/security_scanner/runtime/disposition_lookup.py b/src/security_scanner/runtime/disposition_lookup.py index 7aeb1eb..309ecd0 100644 --- a/src/security_scanner/runtime/disposition_lookup.py +++ b/src/security_scanner/runtime/disposition_lookup.py @@ -62,13 +62,22 @@ def resolve_existing_disposition( on error, so a lookup failure never silently suppresses a finding. """ state = store.read_finding_state(finding.finding_id) - if state is not None and state.get("status") in NON_BLOCKING_STATUSES: - return ExistingDisposition( - finding_id=finding.finding_id, - status=state["status"], - verdict=_verdict_of(state), - via="finding_id", - ) + if state is not None: + if state.get("status") in NON_BLOCKING_STATUSES: + return ExistingDisposition( + finding_id=finding.finding_id, + status=state["status"], + verdict=_verdict_of(state), + via="finding_id", + ) + # A *deliberate* blocking decision for this exact finding_id is the + # definitive authority: a re-opened / confirmed-real finding must never + # be overridden by a same-secret match_key suppression pointer. A blocking + # state with no recorded decision (``decidedBy``) is just the default + # scan-created OPEN row — fall through so a line-moved secret can still + # inherit a prior verdict via its match_key. + if state.get("decidedBy"): + return None secret_hash = finding.evidence.secret_hash if not secret_hash: diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py index 780b2a7..3e4de46 100644 --- a/src/security_scanner/storage/adapters/nosql_db/store.py +++ b/src/security_scanner/storage/adapters/nosql_db/store.py @@ -697,26 +697,40 @@ def _record_match_disposition_pointer( verdict: str, decided_at: str, ) -> None: - """Upsert a match_key -> disposition pointer for line-stable suppression (#6). + """Reconcile the match_key -> disposition pointer for suppression (#6). - Best-effort and idempotent (latest wins): only written for terminal - non-blocking statuses on findings that carry a matchKey. Not transactional - with the state update — if it lags, suppression simply re-asks (fail-safe). + Best-effort and idempotent (latest wins), keyed on the finding's matchKey: + + - terminal non-blocking status -> upsert the pointer (enables line-stable + inheritance of the verdict when the same secret later moves lines); + - blocking status (e.g. a false-positive re-opened as a real secret) -> + delete any stale pointer, so a future line-move of the same secret is + re-verified rather than silently suppressed by an outdated verdict. + + Not transactional with the state update: pointer-write failures are + swallowed so they never fail the (already-committed) disposition; if the + pointer lags, suppression simply re-asks (fail-safe). """ - if not match_key or status not in _DISPOSITION_NON_BLOCKING: + if not match_key: return - self._table.put_item( - Item={ - "PK": f"MATCHKEY#{match_key}", - "SK": "DISPOSITION", - "entityType": "MATCH_DISPOSITION", - "matchKey": match_key, - "findingId": finding_id, - "status": status, - "verdict": verdict, - "decidedAt": decided_at, - } - ) + pointer_key = {"PK": f"MATCHKEY#{match_key}", "SK": "DISPOSITION"} + try: + if status in _DISPOSITION_NON_BLOCKING: + self._table.put_item( + Item={ + **pointer_key, + "entityType": "MATCH_DISPOSITION", + "matchKey": match_key, + "findingId": finding_id, + "status": status, + "verdict": verdict, + "decidedAt": decided_at, + } + ) + else: + self._table.delete_item(Key=pointer_key) + except Exception: # noqa: BLE001 - best-effort; never fail the main txn. + pass def find_disposition_by_match_key(self, match_key: str) -> dict[str, Any] | None: """Return the latest disposition pointer for a match_key, or None (#6).""" diff --git a/tests/test_dynamodb_compatible_store.py b/tests/test_dynamodb_compatible_store.py index 53b419e..564592c 100644 --- a/tests/test_dynamodb_compatible_store.py +++ b/tests/test_dynamodb_compatible_store.py @@ -110,6 +110,14 @@ def put_item(self, *, Item: dict, **kwargs) -> dict: def batch_writer(self): return self._BatchWriter(self) + def delete_item(self, *, Key: dict, **kwargs) -> dict: + self.items = [ + item + for item in self.items + if not (item.get("PK") == Key["PK"] and item.get("SK") == Key["SK"]) + ] + return {"ResponseMetadata": {"HTTPStatusCode": 200}} + def get_item(self, *, Key: dict, **kwargs) -> dict: for item in self.items: if item.get("PK") == Key["PK"] and item.get("SK") == Key["SK"]: @@ -1422,6 +1430,56 @@ def test_blocking_disposition_writes_no_match_key_pointer(): assert pointer_items == [] +def test_reopening_disposition_deletes_stale_match_key_pointer(): + finding = _make(triage_verdict=Verdict.NEEDS_REVIEW.value) + table = FakeDynamoTable(finding_to_items(finding)) + client = FakeDynamoClient(table) + store = DynamoDbCompatibleFindingStore( + DynamoDbCompatibleConfig(table_name="SecurityScannerLocal"), + resource=FakeDynamoResource(table), + client=client, + ) + match_key = compute_match_key( + finding.repo.full_name, + finding.location.file_path, + finding.rule_id, + finding.evidence.secret_hash, + ) + + # First a false-positive disposition writes the suppression pointer. + store.set_finding_disposition( + finding.finding_id, + status=Status.FALSE_POSITIVE.value, + verdict=Verdict.FALSE_POSITIVE.value, + actor="human", + source="manual", + reason="Looked benign.", + repo=finding.repo.full_name, + rule_id=finding.rule_id, + decided_at="2026-06-16T01:02:03+00:00", + ) + assert store.find_disposition_by_match_key(match_key) is not None + + # Re-opening it as a real secret must drop the now-stale pointer so a later + # line-move of the same secret is re-verified, not silently suppressed. + store.set_finding_disposition( + finding.finding_id, + status=Status.OPEN.value, + verdict=Verdict.TRUE_POSITIVE.value, + actor="human", + source="manual", + reason="Actually a live credential.", + repo=finding.repo.full_name, + rule_id=finding.rule_id, + decided_at="2026-06-17T01:02:03+00:00", + ) + assert store.find_disposition_by_match_key(match_key) is None + pointer_items = [ + item for item in table.items if item.get("entityType") == "MATCH_DISPOSITION" + ] + assert pointer_items == [] + + def test_clear_is_not_supported_for_dynamodb_store(): store = DynamoDbCompatibleFindingStore( DynamoDbCompatibleConfig(table_name="SecurityScannerLocal"), diff --git a/tests/test_match_key_suppression.py b/tests/test_match_key_suppression.py index f20b76b..6141e6e 100644 --- a/tests/test_match_key_suppression.py +++ b/tests/test_match_key_suppression.py @@ -105,6 +105,44 @@ def test_match_key_inheritance_on_line_move(): assert existing.status == "FALSE_POSITIVE" +def test_default_open_state_still_inherits_via_match_key(): + # A freshly-scanned finding carries a default OPEN state (version 0, no + # decidedBy). That must NOT block match_key inheritance — otherwise the + # line-move use case never fires once findings are persisted before verify. + moved = _finding(line_start=500) + mk = _match_key(moved) + store = _FakeStore( + state={"status": "OPEN", "version": 0, "triage": {"verdict": "NEEDS_REVIEW"}}, + pointer={ + "status": "FALSE_POSITIVE", + "verdict": "FALSE_POSITIVE", + "findingId": "finding_original", + "matchKey": mk, + }, + ) + existing = resolve_existing_disposition(store, moved) + assert existing is not None + assert existing.via == "match_key" + assert existing.status == "FALSE_POSITIVE" + + +def test_deliberate_blocking_decision_overrides_match_key_pointer(): + # A re-opened / confirmed-real finding (decidedBy recorded) is definitive: + # a same-secret FALSE_POSITIVE pointer must NOT suppress it. + f = _finding() + store = _FakeStore( + state={ + "status": "OPEN", + "version": 2, + "decidedBy": "human", + "triage": {"verdict": "TRUE_POSITIVE"}, + }, + pointer={"status": "FALSE_POSITIVE", "verdict": "FALSE_POSITIVE"}, + ) + assert resolve_existing_disposition(store, f) is None + assert store.match_key_queries == [] # definitive state short-circuits + + def test_no_pointer_is_not_suppressed(): f = _finding() store = _FakeStore(state=None, pointer=None)