From bd533bfb6a844c4b3e746810ef7b7a550aee30be Mon Sep 17 00:00:00 2001 From: pureliture Date: Fri, 19 Jun 2026 17:39:40 +0900 Subject: [PATCH] =?UTF-8?q?feat(cli):=20disposition=20set=20=E2=80=94=20ma?= =?UTF-8?q?nual=20human=20finding=20verdict=20(#4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A person can record true_positive/false_positive on a finding through the same channel the Ollama verifier uses (set_finding_disposition → global FINDING_STATE + STATE_EVENT ledger), with actor=human / source=manual. Because finding_id is branch/commit-stable, a cleared finding stays cleared on re-detection, so future re-asks are suppressed. - cli/commands/disposition.py: `disposition set --finding-id --verdict [--reason --repo --rule-id]`; reuses disposition_status_for_verdict (false_positive→FALSE_POSITIVE, true_positive→OPEN); reads repo/ruleId from the finding's stored state unless overridden; dynamodb-only (exit 2), unknown finding → exit 2. - app.py: register the command. Out of scope: notification/endpoint (design exclusion) and IGNORED status (not a Verdict value) — possible follow-up. Spec: docs/workbench/specs/human-disposition/. Co-Authored-By: Claude Opus 4.8 --- .../specs/human-disposition/design.md | 128 ++++++++++++++++ .../specs/human-disposition/requirements.md | 142 ++++++++++++++++++ src/security_scanner/cli/app.py | 2 + .../cli/commands/disposition.py | 121 +++++++++++++++ tests/test_cli.py | 1 + tests/test_cli_disposition.py | 104 +++++++++++++ 6 files changed, 498 insertions(+) create mode 100644 docs/workbench/specs/human-disposition/design.md create mode 100644 docs/workbench/specs/human-disposition/requirements.md create mode 100644 src/security_scanner/cli/commands/disposition.py create mode 100644 tests/test_cli_disposition.py diff --git a/docs/workbench/specs/human-disposition/design.md b/docs/workbench/specs/human-disposition/design.md new file mode 100644 index 0000000..1f99522 --- /dev/null +++ b/docs/workbench/specs/human-disposition/design.md @@ -0,0 +1,128 @@ +# 분석가 수동 disposition 채널 (`disposition set`) — Design + +## 개요 + +사람(analyst)이 하나의 finding에 대해 판정(true_positive / false_positive / ignored)을 +CLI로 직접 기록하는 채널을 추가한다. 자동 verifier가 쓰는 것과 **완전히 동일한** +`DynamoDbCompatibleFindingStore.set_finding_disposition`을 경유하여 글로벌 +`FINDING_STATE`(최신 상태, optimistic-lock `version`)와 append-only `STATE_EVENT` ledger에 +한 transaction으로 기록한다. 차이는 출처 메타(`actor="human"`, `source="manual"`)와 +사람만 가능한 IGNORED 억제뿐이다. stable finding_id가 글로벌 상태에 매핑되므로 수동 판정은 +재탐지를 가로질러 유지되고, 기존 gate/merge 메커니즘이 그대로 억제를 실현한다. + +NO notification/endpoint, NO target-repo PR gating, public-safe/redacted-only, local-first, +design-only(롤아웃 제외). + +## 요구사항 참조 + +- 채널 재사용: 기존 `set_finding_disposition`(verifier가 `source="verifier"`로 호출하는 + 바로 그 메서드)을 `source="manual"`로 호출. → requirements '채널 일관성'. +- status 파생: verifier `disposition_status_for_verdict` 재사용 + manual 전용 IGNORED 확장. +- repo/rule_id: finding_id의 기존 FINDING_STATE에서 자동 도출, 플래그는 override. +- 억제: gate non-blocking status + 조건부 상태 생성 + read overlay (신규 코드 없음). + +## 접근 후보 + 선택 + +**후보 A — 기존 set_finding_disposition을 manual 인자로 직접 호출 (선택).** +신규 runtime use case 모듈 없이, 얇은 runtime 헬퍼 + CLI 핸들러가 store를 직접 부른다. +verifier가 `record_verifier_disposition`이라는 얇은 어댑터로 같은 store를 부르는 선례와 대칭. +- 장점: 코드 최소, 채널 단일화 보장, 회귀면 작음. 단점: runtime 로직이 얇아 거의 CLI에 가까움. + +**후보 B — `runtime/manual_disposition.py` 전용 use case dataclass(Request/Result)로 캡슐화.** +verify_artifact.py의 `VerifyArtifactRequest`/`run_verify_artifact` 구조를 미러. +- 장점: 테스트 시 store 주입 seam이 깔끔, 입력 검증을 runtime에 응집. 단점: 단일 store 호출에 + 비해 과한 인프라(YAGNI 위반 소지). + +**선택: A를 기본으로 하되 status 파생/repo·rule_id 도출만 runtime 헬퍼로 분리(A+).** +근거: 로직 부피가 작아 B의 Request/Result dataclass는 과하다. 단, status 파생과 +'state에서 repo/rule_id 도출'은 순수 함수로 분리해 단위 테스트(박스 불필요)와 verifier와의 +대칭(`disposition_status_for_verdict` 옆에 `manual_disposition_status_for_verdict`)을 확보한다. + +## 아키텍처 + +``` +analyst ──`disposition set --finding-id --verdict [--reason]`──► cli/commands/disposition.py + │ + store_from_args(dynamodb) ◄─────┤ (jsonl이면 exit 2) + │ + read_finding_state(finding_id) ──► repo/ruleId 도출 (override 우선) + │ + manual_disposition_status_for_verdict(verdict) ──► status (FALSE_POSITIVE/OPEN/IGNORED) + │ + set_finding_disposition(finding_id, status, verdict, actor='human', source='manual', + reason, repo, rule_id) ──► transact_write + ▼ + FINDING_STATE(update, version+1) + STATE_EVENT(append) + │ + (이후) 재스캔 append() ─► _put_state_item_if_absent (수동 상태 보존) ─► read merge_finding_states + ▼ + gate _is_blocking: FALSE_POSITIVE/IGNORED → non-blocking +``` + +## 구성요소 + +### CLI: `cli/commands/disposition.py` (신규) +- `register(subparsers)`: `disposition` 부모 parser 생성 → 그 안에 nested subparsers로 + `set` 액션 등록. `set` parser에 `--finding-id`(필수), `--verdict`(필수, choices=소문자 + 3값), `--reason`(선택), `--repo`/`--rule-id`(선택 override), `add_storage_args(..., include_jsonl_path="")`. +- `cmd_disposition_set(args)`: + 1. jsonl backend면 `error: disposition requires --storage-backend dynamodb` + return 2. + 2. `store = store_from_args(args)`. + 3. verdict(소문자) → 내부 토큰: `false_positive→Verdict.FALSE_POSITIVE.value`, + `true_positive→Verdict.TRUE_POSITIVE.value`, `ignored`는 verdict는 그대로 두되 status로 표현. + 4. `state = store.read_finding_state(args.finding_id)`; None이면 명시 메시지 + return 2. + 5. repo = `args.repo or state["repo"]`, rule_id = `args.rule_id or state["ruleId"]`. + 6. status = `manual_disposition_status_for_verdict(verdict_token)`. + 7. `store.set_finding_disposition(finding_id, status=status, verdict=verdict_token, + actor="human", source="manual", reason=reason_or_default, repo=repo, rule_id=rule_id)`. + 8. ValueError 캡처 → `error: ...` + return 2 (verify.py 패턴). + 9. 성공: `Disposed {finding_id}: {prev_status} -> {status} (manual)` 출력, return 0. + +### runtime 헬퍼: `runtime/verify_artifact.py` 또는 신규 작은 모듈 +- `manual_disposition_status_for_verdict(verdict: str) -> str`: + FALSE_POSITIVE→Status.FALSE_POSITIVE, TRUE_POSITIVE→Status.OPEN, + (IGNORED 토큰)→Status.IGNORED. verifier의 `disposition_status_for_verdict`를 내부 재사용하되 + None 케이스를 manual에서는 허용하지 않음(사람은 항상 terminal 의도). verifier 함수는 불변. + +### 재사용(변경 없음) +- `store.read_finding_state` / `store.set_finding_disposition` / `cli._store.store_from_args` + / `cli._args.add_storage_args` / `gate._is_blocking` / `merge_finding_states`. + +## 데이터 흐름 (억제) + +1. **기록**: FINDING_STATE.status←FALSE_POSITIVE(또는 IGNORED), triage.verdict←해당값, + decidedBy='human', decisionSource='manual', version+1; STATE_EVENT 1건(from→to, actor/source). +2. **재탐지**: 스캔이 같은 finding_id를 다시 적재해도 FINDING_STATE는 조건부 생성만 + (`_put_state_item_if_absent`)이라 version>=1 수동 상태를 덮지 않음. +3. **소비**: read 시 `merge_finding_states`가 수동 status/triage를 observation 위에 overlay → + gate에서 non-blocking, report에서 cleared로 표시. 억제 성립. + +## 에러 처리 + +- jsonl backend → exit 2(명시 메시지). compare-ghas 선례. +- 미존재 finding_id → `read_finding_state`가 None 또는 set_finding_disposition이 + `ValueError("finding state does not exist for ...")` → exit 2. +- 잘못된 verdict → argparse choices가 exit 2. +- transact 충돌 → store가 3회 재시도(기존). 모두 실패면 raise → CLI가 public-safe 메시지로 변환. +- 모든 에러 출력은 redacted-only(시크릿/원문/내부 스택 미노출). + +## 테스트 전략 (TDD, FakeDynamoTable 재사용, 박스 불필요) +- M1: `manual_disposition_status_for_verdict` 매핑 단위 테스트(3 verdict→status, verifier 함수 불변 확인). +- M2: store 단위 — read_finding_state로 repo/ruleId 도출, set_finding_disposition 호출 인자 + (actor='human', source='manual') 검증, STATE_EVENT 1건 append·version+1 확인. +- M3: CLI — false_positive/true_positive/ignored 각 경로 exit 0 + 출력; jsonl 거부 exit 2; + 미존재 finding_id exit 2; 잘못된 verdict exit 2; --repo/--rule-id override 동작. +- M4: 억제 회귀 — 수동 FALSE_POSITIVE 후 재탐지 append → read에서 status 유지, gate non-blocking. +- M5: `test_subcommand_registration_order_is_stable` expected 리스트에 `disposition` 추가·green. + +## 마일스톤 +- M1: `manual_disposition_status_for_verdict` 헬퍼 + 단위 테스트. +- M2: `cli/commands/disposition.py` register/cmd + app.py `_COMMAND_MODULES` 등록. +- M3: 검증/에러 경로(backend 거부, 미존재 id, ValueError→exit 2) + CLI 테스트. +- M4: 억제 회귀 테스트 + 서브커맨드 순서 테스트 갱신. + +## 열린 질문 +- IGNORED 입력 표면: `--verdict ignored` 단일 입력(제안) vs `--status` 별도 플래그. +- `--decided-at` 사용자 노출(소급 감사) 여부 — 기본 미노출 제안. +- human↔verifier 경합 우선순위: last-writer-wins(현 동작) 유지 vs human override 정책 — 본 범위 밖. +- `disposition show/list`(STATE_EVENT ledger 조회) 별도 spec로 분리 여부. \ No newline at end of file diff --git a/docs/workbench/specs/human-disposition/requirements.md b/docs/workbench/specs/human-disposition/requirements.md new file mode 100644 index 0000000..3804a25 --- /dev/null +++ b/docs/workbench/specs/human-disposition/requirements.md @@ -0,0 +1,142 @@ +# 분석가 수동 disposition 채널 (`disposition set`) Requirements + +> 출처: security-scanner 잔여 기능 #4 (human/analyst disposition channel). 본 문서는 +> 실제 코드(`storage/base.py`, `storage/adapters/nosql_db/store.py`, +> `runtime/verify_artifact.py`, `core/finding/model.py`, `cli/commands/*`)를 읽고 +> 자문자답으로 모든 결정을 확정한 design-only spec이다. 대화형 질문 없음. + +## 승인 대상 + +- Source of truth: `requirements.md` +- Preview companion: `requirements.html` + +## 배경 / 코드 대조 검증 결과 + +- `set_finding_disposition`은 이미 backend-neutral Protocol(`storage/base.py:269-286`, + `FindingDispositionWriter`)이고 dynamodb 구현(`store.py:543-639`)이 존재한다. + 글로벌 `FINDING_STATE`(SK=`STATE#GLOBAL`)를 optimistic-lock(`version`)으로 update하고 + append-only `STATE_EVENT`를 같은 transaction으로 put하며, 충돌 시 3회 재시도한다. +- 시그니처: `set_finding_disposition(finding_id, *, status, verdict, actor, source, + reason, repo, rule_id, decided_at=None)`. `repo`/`rule_id`/`status`/`verdict`가 필수다. +- verifier 경로(`verify_artifact.py:198-221`, `record_verifier_disposition`)는 + `actor=config.model`, `source="verifier"`로 호출하고 status는 + `disposition_status_for_verdict`(`:224-230`)로 verdict에서 파생한다: + FALSE_POSITIVE→Status.FALSE_POSITIVE, TRUE_POSITIVE→Status.OPEN, 그 외→None(미기록). +- `read_finding_state(finding_id)`(`store.py:522-529`)는 FINDING_STATE item을 그대로 + 반환하고, 그 item은 `repo`/`ruleId`/`status`/`triage`/`fingerprint`를 보유한다 + (`items.py:711-728`). 즉 finding_id만 있으면 repo/rule_id를 도출할 수 있다. +- finding_id는 stable(`compute_finding_id`, model.py:142-148): repo·path·line·rule의 + canonical fingerprint에서 파생. 재탐지해도 동일 finding_id → 동일 글로벌 상태. +- 억제 메커니즘은 이미 성립: gate `_is_blocking`(`gate.py:38-49`)은 + status∈{RESOLVED,FALSE_POSITIVE,IGNORED}를 non-blocking으로, 재탐지의 FINDING_STATE + 쓰기는 `_put_state_item_if_absent`(조건부 생성, `store.py:662-674`)라 수동 상태를 + 덮지 않으며, read는 `merge_finding_states`(`access.py:148-156`)로 overlay한다. +- CLI는 `cli/commands/*`에서 `register(subparsers)`+`set_defaults(func=cmd_x)` 패턴 + (`app.py:14-35`). 서브커맨드 순서는 `test_cli.py:684`가 핀고정한다. +- jsonl backend는 `set_finding_disposition`/FINDING_STATE 미구현. compare-ghas가 + 이미 'jsonl이면 거부'(report.py:168-173) 선례를 둔다. + +## 질문-답변 흐름 (확정 결정 · 자문자답) + +### Q: CLI 이름/형태는? +**`security-scanner disposition set --finding-id --verdict [--reason ]`** +(+ storage args). 근거: report/gate/verify가 모두 동사형 단일 서브커맨드지만, +disposition은 STATE_EVENT ledger 조회(`read_finding_state_events`, `store.py:531-541`)를 +미래에 `disposition show/list`로 노출할 여지가 명확하다. `disposition ` 2단 +구조가 그 확장에 자연스럽고, 지금은 `set`만 구현한다(YAGNI). verb-only 단일 커맨드 +(`disposition-set`)는 미래 action 추가 시 이름 충돌을 부른다. + +### Q: status를 verdict에서 어떻게 파생하나? +**verifier와 동일 정신으로 파생하되 manual 전용 매핑 `manual_disposition_status_for_verdict`를 +신설**한다. 기존 `disposition_status_for_verdict`(verifier 의미: TRUE_POSITIVE→OPEN, +FALSE_POSITIVE→FALSE_POSITIVE, NEEDS_REVIEW→None)는 변경하지 않고 그대로 재사용한다. +사람만 가능한 **IGNORED 억제**를 위해 manual 매핑은 추가 케이스를 얹는다. 근거: verifier는 +'재오픈/오탐'만 표현하지만 사람은 '오탐은 아니지만 무시'(IGNORED)를 표현할 수 있어야 하고, +이는 gate에서 non-blocking이다(`gate.py:24-28`). verifier 함수에 IGNORED를 끼워넣으면 +verifier 동작이 바뀌므로 분리한다. + +### Q: repo/rule_id를 플래그로 받나, finding에서 가져오나? +**기본은 자동 도출, 플래그는 override escape hatch.** `read_finding_state(finding_id)` +반환 item의 `repo`/`ruleId`를 읽어 set_finding_disposition에 넘긴다. `--repo`/`--rule-id`는 +주어졌을 때만 도출값을 덮는다(레거시/불일치 보정용). 근거: FINDING_STATE 행이 둘 다 +보유하고(items.py:723-724), 내부 도메인 식별자를 매 호출마다 사람에게 요구하는 것은 +UX·정합성 모두 나쁘다. finding_id에 대응하는 상태 행이 없으면 set_finding_disposition이 +`ValueError("finding state does not exist for ...")`(`store.py:561-562`)로 raise → +미존재/오탐 finding_id는 자연히 거부된다(fail-safe). + +### Q: 입력 검증은 어디서 하나? +**CLI는 verdict를 argparse `choices`로 Verdict enum 값에 제한**(잘못된 값 → argparse가 +exit 2)하고, 그 외 도메인 검증(상태 존재, 상태 전이 적법성)은 store/도메인에 위임한다. +근거: `Triage.__post_init__`(model.py:283-288)·set_finding_disposition이 이미 검증을 +수행한다. CLI에서 중복 구현하면 SoT가 갈라진다. verify.py:87-89가 ValueError를 잡아 +`error: ...`/exit 2로 변환하는 패턴을 그대로 미러한다. + +### Q: 멱등성/감사 보장은 어떻게 되나? +**신규 보장 추가 없이 기존 transact_write를 그대로 경유**한다. 같은 판정을 반복 호출하면 +STATE_EVENT가 매번 1건 append되고(불변 audit log 의도와 일치), `version`은 단조 증가, +`decided_at`은 기본 호출 시각(UTC, 주입 가능). 근거: store.py:584-635가 version 조건부 +update + STATE_EVENT 조건부 put을 1 transaction으로 묶고 conflict 시 재시도한다. CLI는 +`actor="human"`, `source="manual"`만 고정 주입한다. + +### Q: 어떤 storage backend를 지원하나? +**dynamodb만.** jsonl backend면 stderr 경고 후 exit 2. 근거: FindingDispositionWriter/ +FINDING_STATE/STATE_EVENT는 DynamoDbCompatibleFindingStore에만 존재하고 JsonlFindingStore엔 +`set_finding_disposition`이 없다. compare-ghas(report.py:168-173)의 backend 거부 선례를 따른다. + +### Q: 재탐지 억제는 신규 코드가 필요한가? +**불필요.** 재탐지 시 `append()`→`finding_to_items`가 version=0 FINDING_STATE를 만들지만 +`_put_state_item_if_absent`(조건부 생성)가 이미 존재하는 수동 상태(version>=1)를 덮지 않고, +read 경로의 `merge_finding_states`가 FALSE_POSITIVE/IGNORED status를 overlay한다. gate는 +해당 status를 non-blocking 처리한다. 따라서 stable finding_id → 글로벌 상태 억제가 그대로 성립. + +### Q: 억제 의미의 verdict↔status 짝은? +- `false_positive` → status FALSE_POSITIVE + verdict FALSE_POSITIVE = **완전 억제**(오탐 단정). +- `true_positive` → status OPEN + verdict TRUE_POSITIVE = **계속 차단**(진성, 노출 유지). +- `ignored` → status IGNORED + verdict 유지 = **억제하되 오탐 단정 아님**. +근거: gate.py:5-9,38-49의 blocking 규칙(status∈{RESOLVED,FALSE_POSITIVE,IGNORED}= +non-blocking, OPEN+{NEEDS_REVIEW,TRUE_POSITIVE}=blocking). 사람의 '억제'는 FALSE_POSITIVE +또는 IGNORED status로만 달성된다. + +## 기능 요구사항 + +- `cli/commands/`에 disposition 커맨드 모듈을 신설하고 `register(subparsers)`로 + `disposition` 부모 parser + `set` 서브액션을 등록한다. `app.py:_COMMAND_MODULES`에 추가. +- `disposition set`은 다음 인자를 받는다: + - `--finding-id ` (필수) + - `--verdict {true_positive,false_positive,ignored}` (필수, 소문자 입력→내부 enum 매핑) + - `--reason ` (선택, 미제공 시 'Manual disposition; no reason provided.' 기본) + - `--repo ` / `--rule-id ` (선택, override only) + - 기존 storage args(`add_storage_args`, `--storage-backend` 등) +- 핸들러는 dynamodb store를 만들어 `read_finding_state(finding_id)`로 repo/rule_id를 + 도출(override 우선), `manual_disposition_status_for_verdict`로 status 파생, + `set_finding_disposition(..., actor="human", source="manual")`을 호출한다. +- jsonl backend는 거부(exit 2). 미존재 finding_id는 ValueError→exit 2. +- 성공 시 finding_id·이전→이후 status를 1줄로 출력(redacted only, 시크릿/원문 금지). + +## 비기능 요구사항 + +| 항목 | 요구값 | +| --- | --- | +| 범위 | YAGNI. `set`만 구현. `list`/`show`는 미래(부모 커맨드 구조만 확보). notification/endpoint 제외. target-repo PR gating 제외. | +| public-safe | redacted-only. reason은 사람이 입력한 자유텍스트이므로 시크릿을 권장하지 않음(문서 경고). 시크릿/원문/스택트레이스 미노출. | +| 채널 일관성 | verifier와 동일한 set_finding_disposition·FINDING_STATE·STATE_EVENT를 경유. 차이는 actor='human'/source='manual'과 IGNORED 지원뿐. verifier 함수/동작 불변. | +| backend | dynamodb 전용. jsonl 명시 거부(compare-ghas 선례). | +| 검증 | verdict는 argparse choices. 도메인 검증은 store 위임. ValueError→exit 2(verify.py 패턴). | +| 회귀 | `test_subcommand_registration_order_is_stable`(test_cli.py:684) expected 리스트 갱신 필수. set_finding_disposition·gate·merge_finding_states 동작 불변. | +| 의존 방향 | command→runtime/core/storage만. command→app 역참조 금지. 공유 헬퍼는 `cli/_args.py`/`cli/_store.py` 재사용. | + +## 사용자 시나리오 + +- 분석가가 리포트에서 오탐 finding_id를 확인하고 + `security-scanner disposition set --finding-id finding_xxx --verdict false_positive + --reason "test fixture, not a real key" --storage-backend dynamodb ...` 실행 → + 글로벌 상태가 FALSE_POSITIVE로 닫히고 STATE_EVENT 1건 append, 이후 재스캔/게이트에서 차단·노출되지 않음. +- 분석가가 진성이지만 당장 조치 불가한 finding을 `--verdict ignored`로 무시 → gate non-blocking, + 단 오탐으로 단정하지 않음(verdict 유지). + +## 미결정 항목 + +- IGNORED 표면: `--verdict ignored`(verdict 차원에 ignored 별칭) vs `--status` 별도 플래그. + design에서 `--verdict {true_positive,false_positive,ignored}` 단일 입력으로 확정 제안. +- `--decided-at` 사용자 노출 여부(소급 감사 vs 위변조 표면) — 기본 미노출. +- human↔verifier 경합 시 우선순위(last-writer-wins 유지 vs human override) — 본 범위 밖, open. \ No newline at end of file diff --git a/src/security_scanner/cli/app.py b/src/security_scanner/cli/app.py index 5b8cfbc..24eb15e 100644 --- a/src/security_scanner/cli/app.py +++ b/src/security_scanner/cli/app.py @@ -12,6 +12,7 @@ import sys from security_scanner.cli.commands import ( + disposition, doctor, migrate, quickstart, @@ -33,6 +34,7 @@ quickstart, targets, migrate, + disposition, ) diff --git a/src/security_scanner/cli/commands/disposition.py b/src/security_scanner/cli/commands/disposition.py new file mode 100644 index 0000000..69cd8a3 --- /dev/null +++ b/src/security_scanner/cli/commands/disposition.py @@ -0,0 +1,121 @@ +"""disposition subcommand: record a human/analyst finding verdict (#4). + +Lets a person manually mark a finding true_positive / false_positive through the +same channel the Ollama verifier uses (set_finding_disposition → global +FINDING_STATE + append-only STATE_EVENT ledger), with actor=human / source=manual. +Because finding_id is branch/commit-stable, a cleared finding stays cleared on +re-detection (global state), suppressing future re-asks. + +No notification/endpoint integration (out of scope). IGNORED status is not exposed +here (it is not a Verdict value); that is a possible follow-up. +""" + +from __future__ import annotations + +import argparse +import sys + +from security_scanner.cli._args import add_storage_args +from security_scanner.cli._store import store_from_args +from security_scanner.core.finding.model import Verdict +from security_scanner.runtime.verify_artifact import disposition_status_for_verdict + +_VERDICT_CHOICES = { + "true_positive": Verdict.TRUE_POSITIVE.value, + "false_positive": Verdict.FALSE_POSITIVE.value, +} + + +def register(subparsers) -> None: + parser = subparsers.add_parser( + "disposition", + help="Record a human verdict on a finding (manual triage).", + ) + actions = parser.add_subparsers(dest="disposition_action", metavar="") + set_parser = actions.add_parser( + "set", + help="Set a finding's verdict (true_positive/false_positive) manually.", + ) + set_parser.add_argument( + "--finding-id", required=True, metavar="ID", + help="Finding id to disposition.", + ) + set_parser.add_argument( + "--verdict", required=True, choices=sorted(_VERDICT_CHOICES), + help="Human verdict.", + ) + set_parser.add_argument( + "--reason", default="manual triage", metavar="TEXT", + help="Reason recorded in the disposition ledger (default: 'manual triage').", + ) + set_parser.add_argument( + "--repo", default=None, metavar="REPO", + help="Override repo (default: taken from the finding's stored state).", + ) + set_parser.add_argument( + "--rule-id", default=None, metavar="RULE", + help="Override rule id (default: taken from the finding's stored state).", + ) + add_storage_args(set_parser, include_jsonl_path="", default_backend="dynamodb") + set_parser.set_defaults(func=cmd_disposition_set) + + parser.set_defaults(func=_cmd_disposition_help, _parser=parser) + + +def _cmd_disposition_help(args: argparse.Namespace) -> int: + args._parser.print_help() + return 1 + + +def cmd_disposition_set(args: argparse.Namespace) -> int: + """Record a human verdict on one finding via set_finding_disposition.""" + if args.storage_backend != "dynamodb": + print( + "error: disposition requires --storage-backend dynamodb", + file=sys.stderr, + ) + return 2 + + store = store_from_args(args) + verdict = _VERDICT_CHOICES[args.verdict] + status = disposition_status_for_verdict(verdict) + if status is None: # defensive: only terminal verdicts are exposed + print(f"error: non-terminal verdict {args.verdict}", file=sys.stderr) + return 2 + + state = store.read_finding_state(args.finding_id) + if state is None: + print( + f"error: no finding state for {args.finding_id} " + "(unknown finding or not yet scanned)", + file=sys.stderr, + ) + return 2 + + repo = args.repo or state.get("repo") + rule_id = args.rule_id or state.get("ruleId") + if not repo or not rule_id: + print( + "error: finding state is missing repo/ruleId; pass --repo/--rule-id", + file=sys.stderr, + ) + return 2 + + previous_status = state.get("status") + try: + store.set_finding_disposition( + args.finding_id, + status=status, + verdict=verdict, + actor="human", + source="manual", + reason=args.reason, + repo=repo, + rule_id=rule_id, + ) + except ValueError as exc: + print(f"error: {exc}", file=sys.stderr) + return 2 + + print(f"Disposed {args.finding_id}: {previous_status} -> {status} (manual)") + return 0 diff --git a/tests/test_cli.py b/tests/test_cli.py index 8adf766..1e4086b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -726,4 +726,5 @@ def test_subcommand_registration_order_is_stable(): "sync", "backfill-repo-axis", "backfill-list-axis", + "disposition", ] diff --git a/tests/test_cli_disposition.py b/tests/test_cli_disposition.py new file mode 100644 index 0000000..4843f29 --- /dev/null +++ b/tests/test_cli_disposition.py @@ -0,0 +1,104 @@ +"""CLI tests for the disposition subcommand (#23 follow-on #4).""" + +from __future__ import annotations + +from security_scanner.cli import main + + +class FakeDispositionStore: + def __init__(self, state: dict | None) -> None: + self._state = state + self.calls: list[dict] = [] + + def read_finding_state(self, finding_id: str) -> dict | None: + return self._state + + def set_finding_disposition(self, finding_id, **kwargs): + self.calls.append({"finding_id": finding_id, **kwargs}) + + +def _patch(monkeypatch, store): + monkeypatch.setattr( + "security_scanner.cli._store.create_finding_store", + lambda backend, **kwargs: store, + ) + + +_STATE = {"status": "OPEN", "repo": "org/repo", "ruleId": "generic-api-key"} + + +def test_set_false_positive_writes_disposition(monkeypatch, capsys): + store = FakeDispositionStore(dict(_STATE)) + _patch(monkeypatch, store) + + exit_code = main([ + "disposition", "set", "--finding-id", "finding_x", + "--verdict", "false_positive", "--reason", "test fixture", + "--storage-backend", "dynamodb", + ]) + + out = capsys.readouterr().out + assert exit_code == 0 + assert "Disposed finding_x: OPEN -> FALSE_POSITIVE (manual)" in out + assert len(store.calls) == 1 + call = store.calls[0] + assert call["status"] == "FALSE_POSITIVE" + assert call["verdict"] == "FALSE_POSITIVE" + assert call["actor"] == "human" and call["source"] == "manual" + assert call["reason"] == "test fixture" + assert call["repo"] == "org/repo" and call["rule_id"] == "generic-api-key" + + +def test_set_true_positive_maps_to_open(monkeypatch, capsys): + store = FakeDispositionStore(dict(_STATE)) + _patch(monkeypatch, store) + exit_code = main([ + "disposition", "set", "--finding-id", "f", "--verdict", "true_positive", + "--storage-backend", "dynamodb", + ]) + assert exit_code == 0 + assert store.calls[0]["status"] == "OPEN" + assert store.calls[0]["verdict"] == "TRUE_POSITIVE" + + +def test_unknown_finding_fails(monkeypatch, capsys): + store = FakeDispositionStore(None) + _patch(monkeypatch, store) + exit_code = main([ + "disposition", "set", "--finding-id", "missing", + "--verdict", "false_positive", "--storage-backend", "dynamodb", + ]) + assert exit_code == 2 + assert "no finding state for missing" in capsys.readouterr().err + assert store.calls == [] + + +def test_repo_rule_override(monkeypatch): + store = FakeDispositionStore({"status": "OPEN"}) # state missing repo/ruleId + _patch(monkeypatch, store) + exit_code = main([ + "disposition", "set", "--finding-id", "f", "--verdict", "false_positive", + "--repo", "o/r", "--rule-id", "rule_z", "--storage-backend", "dynamodb", + ]) + assert exit_code == 0 + assert store.calls[0]["repo"] == "o/r" and store.calls[0]["rule_id"] == "rule_z" + + +def test_missing_repo_rule_without_override_fails(monkeypatch, capsys): + store = FakeDispositionStore({"status": "OPEN"}) # no repo/ruleId, no override + _patch(monkeypatch, store) + exit_code = main([ + "disposition", "set", "--finding-id", "f", "--verdict", "false_positive", + "--storage-backend", "dynamodb", + ]) + assert exit_code == 2 + assert "missing repo/ruleId" in capsys.readouterr().err + + +def test_rejects_jsonl_backend(capsys): + exit_code = main([ + "disposition", "set", "--finding-id", "f", "--verdict", "false_positive", + "--storage-backend", "jsonl", + ]) + assert exit_code == 2 + assert "requires --storage-backend dynamodb" in capsys.readouterr().err