From 5f8b55f5875d03aa4c6f05f602ea156ddf74e143 Mon Sep 17 00:00:00 2001 From: lsh1756 Date: Sat, 27 Jun 2026 17:24:58 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20dead-letter=20auto-requeue=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * transient dead-letter root 분류와 1회 자동 재투입 정책 추가 * NoSQL 조건부 requeue와 personal systemd timer-ready unit 추가 * CLI/runtime/storage 회귀 테스트와 리뷰 보강 케이스 추가 --- ...-personal-dead-letter-auto-requeue.service | 27 ++ ...er-personal-dead-letter-auto-requeue.timer | 12 + .../specs/dead-letter-auto-requeue/design.md | 301 ++++++++++++++++++ .../requirements.html | 42 +++ .../dead-letter-auto-requeue/requirements.md | 112 +++++++ src/security_scanner/cli/commands/scan.py | 95 ++++++ .../runtime/dead_letter_recovery.py | 189 ++++++++++- .../storage/adapters/nosql_db/items.py | 2 + .../storage/adapters/nosql_db/store.py | 103 +++++- src/security_scanner/storage/base.py | 13 + tests/test_cli_dead_letter.py | 148 ++++++++- tests/test_dead_letter_recovery.py | 277 +++++++++++++++- tests/test_incremental_scan_storage.py | 177 ++++++++++ tests/test_personal_prod_systemd_units.py | 22 ++ 14 files changed, 1499 insertions(+), 21 deletions(-) create mode 100644 deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service create mode 100644 deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer create mode 100644 docs/workbench/specs/dead-letter-auto-requeue/design.md create mode 100644 docs/workbench/specs/dead-letter-auto-requeue/requirements.html create mode 100644 docs/workbench/specs/dead-letter-auto-requeue/requirements.md diff --git a/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service new file mode 100644 index 0000000..5fed7a0 --- /dev/null +++ b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service @@ -0,0 +1,27 @@ +[Unit] +Description=security-scanner personal dead-letter auto-requeue +Documentation=https://github.com/source-security-dev/security-scanner + +[Service] +Type=oneshot +Slice=securityscanner.slice +Nice=15 +IOSchedulingClass=idle +TasksMax=128 +WorkingDirectory=%h/security-scanner +EnvironmentFile=-%h/.config/security-scanner/personal-prod.env +Environment=PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/bin +Environment=SECURITY_SCANNER_STORAGE_BACKEND=dynamodb +Environment=SECURITY_SCANNER_DYNAMO_ENDPOINT=http://localhost:4567 +Environment=SECURITY_SCANNER_DYNAMO_TABLE=security_scanner_personal +Environment=SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_LIMIT=10 +Environment=SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES=30 +ExecStart=/usr/bin/env PATH=%h/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/bin %h/.local/bin/uv run security-scanner dead-letter auto-requeue \ + --storage-backend ${SECURITY_SCANNER_STORAGE_BACKEND} \ + --job-type verify \ + --cooldown-minutes ${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES} \ + --limit ${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_LIMIT} \ + --apply + +[Install] +WantedBy=default.target diff --git a/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer new file mode 100644 index 0000000..e274739 --- /dev/null +++ b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer @@ -0,0 +1,12 @@ +[Unit] +Description=Scheduler for security-scanner personal dead-letter auto-requeue +Documentation=https://github.com/source-security-dev/security-scanner + +[Timer] +OnCalendar=*:0/30:00 +Persistent=true +RandomizedDelaySec=300 +Unit=security-scanner-personal-dead-letter-auto-requeue.service + +[Install] +WantedBy=timers.target diff --git a/docs/workbench/specs/dead-letter-auto-requeue/design.md b/docs/workbench/specs/dead-letter-auto-requeue/design.md new file mode 100644 index 0000000..a4806f0 --- /dev/null +++ b/docs/workbench/specs/dead-letter-auto-requeue/design.md @@ -0,0 +1,301 @@ +# Dead-Letter Auto-Requeue Design Spec + +## Overview + +Dead-letter auto-requeue extends the existing dead-letter recovery module with a +conservative, timer-ready policy for transient verifier failures. It reuses the +current DB-backed `SCAN_JOB` queue and manual recovery seam, without adding SQS, +LocalStack, a new table, or a new GSI. + +## Requirements Reference + +- Phase 1 source: `requirements.md` +- Preview companion: `requirements.html` +- Core scope: one PR, conservative timer-ready auto-requeue. +- Core policy: verify transient dead-letter jobs only, default cooldown 30 + minutes, at most one automatic requeue per job. +- Explicit exclusions: SQS, LocalStack, Kafka-style offsets, outbox framework, + new storage tables, new GSIs, broad automatic recovery. + +## Approach Proposal + +### Selected: extend existing recovery seam + +Add auto-requeue behavior to the existing `runtime/dead_letter_recovery.py` +runtime seam, storage protocol, scan CLI command group, and user systemd unit +set. + +Why this is selected: + +- Auto-requeue is a policy-controlled subset of dead-letter recovery, not a new + queue subsystem. +- The current module already owns public-safe classification, bounded reads, + guarded requeue, and rendering. +- Reusing the seam keeps manual and automatic recovery consistent while avoiding + duplicate DTOs and storage paths. + +### Guardrail inside the selected approach + +Do not bolt timer/cooldown conditions into the manual requeue path. The module +should expose separate use cases: + +- manual recovery: operator-selected subset; +- automatic recovery: policy-selected transient subset; +- shared primitives: classification, public-safe projection, bounded guarded + storage apply. + +### Rejected: separate auto-requeue module + +A separate module would create a cleaner file split, but it would likely +duplicate classification, selection, rendering, and guarded storage logic already +owned by dead-letter recovery. + +### Rejected: queue policy layer + +A general retry/dead-letter policy layer could be useful later, but this PR does +not need multi-queue policy orchestration, provider-specific behavior, or a new +workflow abstraction. + +## Architecture + +```mermaid +flowchart TD + Timer["systemd user timer"] --> Service["auto-requeue service"] + Operator["operator CLI dry-run/apply"] --> CLI["scan dead-letter auto-requeue"] + Service --> CLI + CLI --> Runtime["runtime.dead_letter_recovery"] + Runtime --> Classifier["DeadLetterClassification"] + Runtime --> Policy["AutoRequeuePolicy"] + Runtime --> Store["IncrementalScanStore dead-letter methods"] + Store --> DB["SCAN_JOB status=dead_letter rows"] + Policy --> Selection["verify + transient + cooldown + one-shot"] + Selection --> GuardedApply["conditional pending transition"] + GuardedApply --> Pending["SCAN_JOB status=pending"] + Pending --> Drain["verify-drain"] +``` + +## Data Flow + +### Dry Run + +1. CLI builds an `AutoRequeueDeadLetterRequest`. +2. Runtime reads a bounded dead-letter page using the existing storage seam. +3. Runtime classifies each job into: + - terminal reason; + - root error class; + - public-safe auto eligibility. +4. Runtime filters to jobs that match all automatic policy gates. +5. Runtime returns a public-safe summary with `would_move`, selected root error + class counts, and skipped reasons. +6. No storage mutation occurs. + +### Apply + +1. The same request runs with explicit apply enabled. +2. Runtime computes eligible jobs with the same policy as dry run. +3. Runtime asks storage to move only matching rows from `dead_letter` to + `pending`. +4. Storage applies existing safety guards: + - row still exists; + - status is still `dead_letter`; + - updated time is not newer than the selection watermark; + - filter fields still match. +5. Storage clears execution ownership and sets `next_attempt_at` to apply time. +6. Storage preserves failure evidence and marks the job as already + auto-requeued once. +7. Runtime returns moved/skipped counts and public-safe classification counts. + +### Timer + +1. The user-level timer invokes the auto-requeue CLI with conservative defaults. +2. Default policy targets verify jobs only. +3. Default cooldown is 30 minutes. +4. Default limit is small and configurable through unit environment. +5. Timer files are shipped but not treated as broad auto-recovery being enabled. + +## Component Details + +### `runtime.dead_letter_recovery` + +Add a richer classification DTO while preserving public-safe output. + +Conceptual shape: + +```text +DeadLetterClassification + terminal_reason + root_error_class + auto_requeue_eligible +``` + +`terminal_reason` explains why the job reached `dead_letter`, for example retry +budget exhausted or lease expiry budget exhausted. + +`root_error_class` explains what kind of failure caused that terminal state, for +example verifier timeout, verifier transport, malformed verify job, scanner +runtime, or unknown. + +The current single `error_class` rendering can stay for compatibility, but +auto-requeue selection must use `root_error_class`, not only +`retry-budget-exhausted`. + +Add a dedicated automatic use case: + +```text +auto_requeue_dead_letter_jobs(request) -> DeadLetterAutoRequeueSummary +``` + +Responsibilities: + +- validate positive limit and non-negative cooldown; +- select only jobs older than the cooldown; +- select only allowed job types, defaulting to verify when invoked by timer; +- select only transient root error classes; +- skip jobs that were already automatically requeued once; +- support dry-run and explicit apply; +- return public-safe moved/would-move/skipped counts. + +### Storage + +Reuse existing dead-letter inspect and guarded requeue operations, but add enough +stored evidence to enforce one automatic requeue per job. + +The implementation can choose the smallest compatible representation, but it +must support: + +- checking whether a job was already auto-requeued; +- marking a moved job as auto-requeued once during apply; +- preserving attempts, max attempts, lease expiry counters, fence, last error, + and finding snapshot. + +No new table or GSI is introduced. The metadata must live on the existing +`SCAN_JOB` row or an already-supported extension field. + +### CLI + +Add a scan queue adjacent command under the existing dead-letter group. + +Recommended shape: + +```text +dead-letter auto-requeue +``` + +Required behavior: + +- dry-run by default; +- `--apply` required for mutation; +- bounded `--limit`; +- `--cooldown-minutes` defaulting to 30; +- `--job-type` supported, timer defaulting to verify; +- `--root-error-class` or equivalent filter supported for diagnostics; +- public-safe output only. + +The command should render: + +- applied yes/no; +- would move; +- moved; +- selected counts by job type and root error class; +- skipped counts by public-safe reason. + +### Systemd User Units + +Add personal user units for conservative timer-ready operation. + +Recommended files: + +```text +deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service +deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer +``` + +The service should invoke the CLI with: + +- storage backend from the personal environment; +- job type verify; +- cooldown 30 minutes; +- small limit; +- explicit apply. + +The timer cadence should be slower than the cooldown or otherwise avoid tight +repeat loops. The unit files should be installable, but enabling remains an +operator action. + +## Error Handling + +- Unsupported storage backend exits before mutation. +- Invalid limit or cooldown exits before mutation. +- Unknown or permanent root error classes are skipped, not fatal. +- Jobs updated after selection are skipped by conditional write. +- Jobs already auto-requeued once are skipped. +- Empty eligible set is a successful no-op. +- Storage failures fail the command with a public-safe diagnostic. +- Raw job id, raw error, private repo/path/ref/commit material, endpoint, prompt, + response, and finding snapshot are never printed. + +## Testing Strategy + +### Runtime tests + +- Retry-budget-exhausted timeout jobs expose root error class verifier timeout. +- Retry-budget-exhausted transport jobs expose root error class verifier + transport. +- Malformed verify jobs are never auto eligible. +- Unknown jobs are never auto eligible. +- Cooldown blocks recent dead-letter jobs. +- A job already auto-requeued once is skipped. +- Dry-run performs no mutation and reports would-move counts. +- Apply reports moved and skipped counts. + +### Storage tests + +- Auto-requeue apply moves eligible dead-letter rows to pending. +- Apply marks the row as auto-requeued once. +- Apply preserves failure evidence and retry/lease counters. +- Apply skips rows changed after selection. +- Existing manual requeue behavior remains compatible. + +### CLI tests + +- Auto-requeue dry-run renders public-safe summary. +- Auto-requeue apply requires explicit flag. +- Unsupported backend exits without mutation. +- Cooldown and limit validation reject invalid values. +- Output excludes raw private or secret-like material. + +### Systemd tests + +- Personal unit invokes the new CLI command. +- Personal unit uses verify job type and 30 minute cooldown. +- Timer exists and is not a broad recovery surface. + +## TDD Strategy + +Use red -> green -> refactor. + +1. Add failing runtime tests for split classification and auto eligibility. +2. Add failing runtime tests for cooldown, one-shot skip, dry-run, and apply + summary. +3. Add failing storage tests for marking one-shot metadata and preserving + existing failure evidence. +4. Add failing CLI tests for dry-run/apply and validation. +5. Add failing systemd unit tests for conservative timer-ready wiring. +6. Implement the smallest runtime, storage, CLI, and unit changes to pass. +7. Refactor only after focused tests pass. + +## Milestones + +- M1: Classification split — done when tests prove terminal reason and root + error class are distinct and timeout/transport exhausted jobs are eligible. +- M2: Auto-requeue runtime policy — done when tests prove cooldown, one-shot + guard, dry-run, apply, and public-safe skipped counts. +- M3: Storage evidence — done when tests prove rows can be marked + auto-requeued once without resetting failure history. +- M4: CLI and timer-ready units — done when CLI and systemd tests prove + conservative verify-only auto-requeue wiring. + +## Open Questions + +- None. Implementation may adjust exact flag names if the approved behavior and + public-safe contract remain intact. diff --git a/docs/workbench/specs/dead-letter-auto-requeue/requirements.html b/docs/workbench/specs/dead-letter-auto-requeue/requirements.html new file mode 100644 index 0000000..554edea --- /dev/null +++ b/docs/workbench/specs/dead-letter-auto-requeue/requirements.html @@ -0,0 +1,42 @@ + + + + + + Dead-Letter Auto-Requeue Requirements + + + +

Dead-Letter Auto-Requeue Requirements

+

Generated companion preview. Source of truth is requirements.md.

+

승인 대상

+ +

핵심 목표

+

기존 수동 dead-letter recovery 위에 transient failure 전용 자동 requeue를 단일 PR로 추가한다. 기본 운영 posture는 conservative timer-ready다. SQS, LocalStack, outbox, 새 테이블, 새 GSI는 범위 밖이다.

+

주요 요구사항

+ +

미결정 항목

+

없음. requirements.md 승인 후 Phase 2에서 design spec을 작성한다.

+ + diff --git a/docs/workbench/specs/dead-letter-auto-requeue/requirements.md b/docs/workbench/specs/dead-letter-auto-requeue/requirements.md new file mode 100644 index 0000000..b8e9264 --- /dev/null +++ b/docs/workbench/specs/dead-letter-auto-requeue/requirements.md @@ -0,0 +1,112 @@ +# Dead-Letter Auto-Requeue Requirements + +## 승인 대상 + +- Source of truth: `requirements.md` +- Preview companion: `requirements.html` + +## 질문-답변 흐름 + +### Q: 이번 단일 PR의 목표는 무엇인가? + +Transient dead-letter auto-requeue로 잡는다. + +기존 수동 dead-letter inspect/requeue는 유지한다. 이번 PR은 과부하, 일시 LLM timeout, transport 장애, lease expiry budget 소진처럼 시간이 지나면 회복될 수 있는 dead-letter job만 제한적으로 다시 `pending`에 올려 운영자 반복 작업을 줄인다. + +배제한 선택지는 다음과 같다. + +- 모든 dead-letter 자동 재투입: poison job loop와 verifier 비용 폭증 위험이 크다. +- SQS/LocalStack/outbox 도입: 현재 scanner queue 요구사항보다 무겁고 이번 문제의 직접 원인이 아니다. +- 수동 requeue만 유지: 안전하지만 과부하성 transient dead-letter가 반복될 때 운영자가 같은 작업을 계속 수행해야 한다. + +### Q: 자동 requeue의 기본 운영 posture를 어디까지 열 것인가? + +Conservative timer-ready로 확정한다. + +이번 PR은 CLI와 systemd user service/timer 파일을 제공하되, 자동 requeue 대상은 verify job의 transient failure subset으로 좁히고 small limit과 cooldown을 기본 전제로 둔다. Timer 파일은 배포 가능해야 하지만 broad auto-recovery를 기본값으로 만들지 않는다. 실제 enable 여부와 cadence 조정은 운영자가 별도 판단한다. + +배제한 선택지는 다음과 같다. + +- Controlled auto-on: 설치 즉시 timer가 주기 실행되어 편하지만, transient 분류 오류나 endpoint 장애가 남아 있을 때 자동 재투입이 바로 반복될 수 있다. +- Manual-first: 분류 개선과 manual command까지만 다루면 가장 안전하지만, 이번 단일 PR에서 timer-ready 자동화까지 갖추려는 목표에는 못 미친다. + +### Q: 같은 dead-letter job에 자동 재투입 기회를 몇 번까지 줄 것인가? + +최대 1회로 확정한다. + +Transient failure로 분류된 job은 cooldown 이후 자동으로 한 번만 `pending`에 복귀할 수 있다. 같은 job이 다시 dead-letter 되면 자동화는 멈추고 operator-only 상태로 남긴다. 이 정책은 과부하성 일시 장애에는 회복 기회를 주되, 잘못 분류된 poison job이나 계속 불안정한 verifier endpoint가 자동 루프를 만드는 것을 막는다. + +배제한 선택지는 다음과 같다. + +- 최대 3회: 일시 장애 회복 가능성은 높지만 endpoint 문제가 지속될 때 자동 재투입이 길게 반복된다. +- 기존 retry budget만 사용: 구현은 작지만 dead-letter 이후 자동 재투입 횟수를 별도로 제한하지 못해 cooldown마다 반복될 수 있다. + +### Q: 자동 requeue의 cooldown 기본값을 얼마로 둘 것인가? + +30분으로 확정한다. + +Dead-letter 직후 즉시 재투입하지 않고 30분을 기다린 뒤 transient 후보만 자동 requeue 대상으로 삼는다. 이 값은 과부하나 verifier endpoint 흔들림이 가라앉을 시간을 주면서, 2시간 이상 지연되는 보수 정책보다 backlog 회복을 빠르게 시작한다. + +배제한 선택지는 다음과 같다. + +- 2시간: 더 보수적이지만 transient 복구와 backlog drain 시작이 늦어진다. +- 운영자가 항상 지정: 실수 방지는 강하지만 timer-ready 기본 운영 경험이 나빠진다. + +## 기능 요구사항 + +- 운영자는 transient dead-letter만 대상으로 하는 자동 requeue를 dry-run으로 확인할 수 있어야 한다. +- 운영자는 명시적 apply 모드에서만 자동 requeue 후보를 실제 `pending` 상태로 되돌릴 수 있어야 한다. +- 자동 requeue는 기존 수동 `dead-letter inspect` / `dead-letter requeue` 흐름과 같은 public-safe 출력 원칙을 지켜야 한다. +- 자동 requeue는 한 번에 처리할 최대 job 수를 반드시 가져야 한다. +- 자동 requeue는 dead-letter가 된 뒤 일정 cooldown이 지난 job만 대상으로 해야 한다. +- 자동 requeue의 기본 cooldown은 30분이어야 한다. +- 자동 requeue는 job type filter를 지원해야 하며, 첫 운영 대상은 verify job으로 제한할 수 있어야 한다. +- 자동 requeue는 root error class가 transient인 job만 대상으로 해야 한다. +- Transient root error class는 최소 verifier timeout, verifier transport, lease-expired budget 계열을 포함해야 한다. +- 자동 requeue는 malformed verify payload, missing finding snapshot, schema/validation 문제, unknown error를 자동 대상에서 제외해야 한다. +- 자동 requeue는 기존 실패 이력과 retry/lease budget evidence를 public-safe하게 보존해야 한다. +- 자동 requeue는 같은 job에 최대 1회만 실행 기회를 줘야 한다. +- 자동 requeue 이후 같은 job이 다시 dead-letter 되면 자동 대상에서 제외하고 operator-only 상태로 남겨야 한다. +- 자동 requeue는 자동 재투입 여부를 판단할 public-safe evidence를 저장하거나 유지해야 한다. +- 자동 requeue는 이미 변경된 job, 더 이상 filter와 맞지 않는 job, preview 기준 이후 새로 dead-letter 된 job을 임의로 움직이지 않아야 한다. +- 자동 requeue 결과는 moved, would move, skipped, selected root error class counts를 public-safe aggregate로 보여줘야 한다. +- 자동 requeue는 기본적으로 systemd user timer에서 주기 실행할 수 있어야 한다. +- Timer는 처음부터 broad auto-recovery를 수행하지 않고 verify transient subset을 작은 limit으로 처리할 수 있어야 한다. +- Timer-ready 산출물은 제공하되, 운영자가 별도로 enable하기 전까지 broad auto-recovery를 전제로 하지 않아야 한다. +- 기존 manual inspect/requeue command는 계속 사용할 수 있어야 한다. + +## 비기능 요구사항 + +| 항목 | 요구값 | +| --- | --- | +| Public safety | committed file, CLI output, timer output에는 raw secret, private repo/path/ref, endpoint, credential-like value, raw prompt/response, raw `last_error`, raw `finding_snapshot`을 노출하지 않는다. | +| Operational safety | 자동 requeue는 bounded, cooldown-gated, class-gated, explicit apply-gated여야 한다. | +| Poison-loop guard | permanent 또는 unknown failure는 자동 재투입하지 않는다. | +| Scope control | SQS, LocalStack, Kafka-style offset, 새 테이블, 새 GSI, heavy outbox는 이번 PR 범위가 아니다. | +| Compatibility | 기존 DB-backed `SCAN_JOB` queue와 수동 dead-letter recovery semantics를 깨지 않는다. | +| Observability | 자동 requeue는 사람이 원인을 파악할 수 있는 public-safe aggregate와 skipped counts를 남겨야 한다. | +| TDD | root classification, candidate selection, dry-run/apply, cooldown, timer unit을 focused tests로 검증한다. | + +## 사용자 시나리오 + +- LLM verifier throughput보다 verify job 유입이 빨라 timeout성 dead-letter가 쌓인다. +- 유입이 멈춘 뒤 cooldown이 지나면 auto-requeue가 timeout/transport 계열 verify job 일부를 `pending`으로 되돌린다. +- 다음 verify-drain이 되돌린 job을 다시 처리한다. +- 성공한 job은 completed/disposition write로 끝난다. +- 계속 실패하는 job은 다시 dead-letter가 되며, 자동 정책 한계 밖이면 운영자가 inspect한다. +- malformed payload나 missing finding snapshot 같은 permanent job은 자동 requeue 대상에서 제외되어 계속 operator-only 상태로 남는다. + +## 코드 기반 사실 + +- 기존 `runtime/dead_letter_recovery.py`는 public-safe inspect/requeue DTO와 renderer를 가진다. +- 기존 `DeadLetterFilters`는 `job_type`과 `error_class` filter를 가진다. +- 기존 `error_class_for_job`은 lease expiry budget, retry budget exhausted, malformed verify job, verifier transport, verifier timeout, scanner runtime, unknown을 coarse bucket으로 분류한다. +- 현재 분류는 attempts budget exhausted가 root cause보다 먼저 적용될 수 있어, timeout으로 retry budget이 소진된 job이 `retry-budget-exhausted`로 뭉뚱그려질 수 있다. +- 기존 NoSQL store는 bounded dead-letter inspect와 conditional requeue apply를 제공한다. +- 기존 requeue는 status가 여전히 `dead_letter`이고 preview watermark 이전 job만 `pending`으로 되돌린다. +- 기존 CLI는 `dead-letter inspect`와 `dead-letter requeue`를 제공한다. +- 기존 personal verifier는 `verify-drain` systemd user service/timer로 동작한다. + +## 미결정 항목 + +- 없음. `requirements.md` 승인 후 Phase 2에서 접근 후보와 detailed design을 작성한다. diff --git a/src/security_scanner/cli/commands/scan.py b/src/security_scanner/cli/commands/scan.py index 5af63a8..a6f8171 100644 --- a/src/security_scanner/cli/commands/scan.py +++ b/src/security_scanner/cli/commands/scan.py @@ -39,10 +39,15 @@ residual_for_repo, ) from security_scanner.runtime.dead_letter_recovery import ( + DEFAULT_AUTO_REQUEUE_COOLDOWN, + DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES, + DeadLetterAutoRequeueRequest, DeadLetterFilters, DeadLetterInspectRequest, DeadLetterRequeueRequest, + auto_requeue_dead_letter_jobs, inspect_dead_letter_jobs, + render_dead_letter_auto_requeue, render_dead_letter_inspect, render_dead_letter_requeue, requeue_dead_letter_jobs, @@ -95,6 +100,7 @@ resolve_verifier_config, ) from security_scanner.runtime.verify_queue import ( + JOB_TYPE_VERIFY, HistoricalVerifyBackfillSummary, backfill_historical_verify_jobs, enqueue_historical_verify_jobs, @@ -451,6 +457,40 @@ def register(subparsers) -> None: add_incremental_storage_args(dead_letter_requeue_parser) dead_letter_requeue_parser.set_defaults(func=cmd_dead_letter_requeue) + dead_letter_auto_requeue_parser = dead_letter_subparsers.add_parser( + "auto-requeue", + help="Preview or apply transient dead-letter auto-requeue.", + ) + dead_letter_auto_requeue_parser.add_argument( + "--limit", + type=int, + required=True, + metavar="N", + help="Maximum dead-letter jobs to inspect for auto-requeue.", + ) + dead_letter_auto_requeue_parser.add_argument( + "--cooldown-minutes", + type=float, + default=DEFAULT_AUTO_REQUEUE_COOLDOWN.total_seconds() / 60, + metavar="N", + help="Minimum dead-letter age before auto-requeue (default: 30).", + ) + dead_letter_auto_requeue_parser.add_argument( + "--root-error-class", + action="append", + default=None, + metavar="CLASS", + help="Transient root error class to include; repeat for multiple classes.", + ) + dead_letter_auto_requeue_parser.add_argument( + "--apply", + action="store_true", + help="Actually move eligible transient jobs to pending.", + ) + _add_dead_letter_filter_args(dead_letter_auto_requeue_parser) + add_incremental_storage_args(dead_letter_auto_requeue_parser) + dead_letter_auto_requeue_parser.set_defaults(func=cmd_dead_letter_auto_requeue) + residual_parser = subparsers.add_parser( "residual", help="Show per-branch residual findings for a repository.", @@ -1101,6 +1141,61 @@ def cmd_dead_letter_requeue(args: argparse.Namespace) -> int: return 0 +def cmd_dead_letter_auto_requeue(args: argparse.Namespace) -> int: + """Preview or apply transient dead-letter auto-requeue.""" + + if args.storage_backend != "dynamodb": + print( + "error: dead-letter auto-requeue supports --storage-backend dynamodb only", + file=sys.stderr, + ) + return 2 + transient_root_error_classes = ( + frozenset(args.root_error_class) if args.root_error_class else None + ) + try: + request_kwargs = {} + if transient_root_error_classes is not None: + unsupported = ( + transient_root_error_classes - DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES + ) + if unsupported: + allowed = ", ".join(sorted(DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES)) + invalid = ", ".join(sorted(unsupported)) + raise ValueError( + "transient_root_error_classes must be a subset of " + f"supported transient classes ({allowed}); invalid: {invalid}" + ) + request_kwargs["transient_root_error_classes"] = ( + transient_root_error_classes + ) + filters = _dead_letter_filters_from_args(args) + if filters.job_type is None: + filters = DeadLetterFilters( + job_type=JOB_TYPE_VERIFY, + error_class=filters.error_class, + ) + summary = auto_requeue_dead_letter_jobs( + DeadLetterAutoRequeueRequest( + store=store_from_args(args), + limit=args.limit, + filters=filters, + cooldown=dt.timedelta(minutes=args.cooldown_minutes), + apply=args.apply, + **request_kwargs, + ) + ) + except ValueError as exc: + print(f"error: {exc}", file=sys.stderr) + return 2 + except Exception as exc: # noqa: BLE001 - fatal storage/runtime error. + print(f"error: dead-letter auto-requeue failed: {exc}", file=sys.stderr) + return 1 + + print(render_dead_letter_auto_requeue(summary), end="") + return 0 + + def _add_dead_letter_filter_args(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--job-type", diff --git a/src/security_scanner/runtime/dead_letter_recovery.py b/src/security_scanner/runtime/dead_letter_recovery.py index 7ed8952..5f059a1 100644 --- a/src/security_scanner/runtime/dead_letter_recovery.py +++ b/src/security_scanner/runtime/dead_letter_recovery.py @@ -18,6 +18,18 @@ ERROR_CLASS_VERIFIER_TIMEOUT = "verifier-timeout" ERROR_CLASS_SCANNER_RUNTIME = "scanner-runtime" ERROR_CLASS_UNKNOWN = "unknown" +TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED = ERROR_CLASS_RETRY_BUDGET_EXHAUSTED +TERMINAL_REASON_LEASE_EXPIRED_BUDGET = ERROR_CLASS_LEASE_EXPIRED_BUDGET +TERMINAL_REASON_NONE = "none" +DEFAULT_AUTO_REQUEUE_COOLDOWN = dt.timedelta(minutes=30) +DEFAULT_AUTO_REQUEUE_SCAN_PAGES = 10 +DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES = frozenset( + { + ERROR_CLASS_LEASE_EXPIRED_BUDGET, + ERROR_CLASS_VERIFIER_TIMEOUT, + ERROR_CLASS_VERIFIER_TRANSPORT, + } +) @dataclass(frozen=True) @@ -40,6 +52,16 @@ class DeadLetterSample: age: str +@dataclass(frozen=True) +class DeadLetterClassification: + """Public-safe dead-letter classification for recovery decisions.""" + + terminal_reason: str + root_error_class: str + error_class: str + auto_requeue_eligible: bool + + @dataclass(frozen=True) class DeadLetterInspectRequest: """Inputs for one public-safe dead-letter inspect read.""" @@ -88,6 +110,33 @@ class DeadLetterRequeueSummary: applied: bool = False +@dataclass(frozen=True) +class DeadLetterAutoRequeueRequest: + """Inputs for policy-selected transient dead-letter auto-requeue.""" + + store: DeadLetterRecoveryStore + limit: int + filters: DeadLetterFilters = field(default_factory=DeadLetterFilters) + cooldown: dt.timedelta = DEFAULT_AUTO_REQUEUE_COOLDOWN + scan_pages: int = DEFAULT_AUTO_REQUEUE_SCAN_PAGES + apply: bool = False + transient_root_error_classes: frozenset[str] = DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES + now_factory: Callable[[], dt.datetime] = lambda: dt.datetime.now(dt.UTC).replace( + microsecond=0 + ) + + +@dataclass(frozen=True) +class DeadLetterAutoRequeueSummary: + """Public-safe auto-requeue result.""" + + would_move: int = 0 + moved: int = 0 + skipped: dict[str, int] = field(default_factory=dict) + classification_counts: dict[str, int] = field(default_factory=dict) + applied: bool = False + + class DeadLetterRecoveryStore(Protocol): """Storage operations required by dead-letter recovery runtime.""" @@ -97,6 +146,7 @@ def inspect_dead_letter_jobs( limit: int, filters: DeadLetterFilters, updated_before: dt.datetime | None, + page_limit: int | None = None, ) -> list[ScanJob]: """Return a bounded page of dead-letter jobs for safe projection.""" @@ -110,6 +160,15 @@ def requeue_dead_letter_jobs( ): """Conditionally move matching dead-letter jobs back to pending.""" + def auto_requeue_dead_letter_jobs( + self, + *, + jobs: list[ScanJob], + preview_watermark: dt.datetime, + now: dt.datetime, + ): + """Conditionally auto-requeue selected jobs and mark recovery evidence.""" + def inspect_dead_letter_jobs( request: DeadLetterInspectRequest, @@ -178,6 +237,66 @@ def requeue_dead_letter_jobs( ) +def auto_requeue_dead_letter_jobs( + request: DeadLetterAutoRequeueRequest, +) -> DeadLetterAutoRequeueSummary: + """Preview or apply policy-selected transient dead-letter requeue.""" + + _validate_positive("limit", request.limit) + if request.cooldown < dt.timedelta(0): + raise ValueError("cooldown must be non-negative") + _validate_positive("scan_pages", request.scan_pages) + _validate_transient_root_error_classes(request.transient_root_error_classes) + + now = _now(request.now_factory) + cooldown_watermark = now - request.cooldown + jobs = request.store.inspect_dead_letter_jobs( + limit=request.limit, + filters=request.filters, + updated_before=cooldown_watermark, + page_limit=request.scan_pages, + ) + eligible: list[ScanJob] = [] + skipped: Counter[str] = Counter() + for job in jobs: + classification = classify_dead_letter_job(job) + if _ensure_utc(job.updated_at) > cooldown_watermark: + skipped["cooldown"] += 1 + continue + if getattr(job, "auto_requeue_count", 0) >= 1: + skipped["already_auto_requeued"] += 1 + continue + if classification.root_error_class not in request.transient_root_error_classes: + skipped["not_transient"] += 1 + continue + eligible.append(job) + + classification_counts = _root_classification_counts(eligible) + if not request.apply: + return DeadLetterAutoRequeueSummary( + would_move=len(eligible), + moved=0, + skipped=dict(skipped), + classification_counts=classification_counts, + applied=False, + ) + + result = request.store.auto_requeue_dead_letter_jobs( + jobs=eligible, + preview_watermark=cooldown_watermark, + now=now, + ) + moved, apply_skipped = _coerce_requeue_result(result) + skipped.update(apply_skipped) + return DeadLetterAutoRequeueSummary( + would_move=0, + moved=moved, + skipped=dict(skipped), + classification_counts=classification_counts, + applied=True, + ) + + def render_dead_letter_inspect(summary: DeadLetterInspectSummary) -> str: """Render a stable public-safe inspect report.""" @@ -203,6 +322,18 @@ def render_dead_letter_inspect(summary: DeadLetterInspectSummary) -> str: def render_dead_letter_requeue(summary: DeadLetterRequeueSummary) -> str: """Render a stable public-safe requeue report.""" + return _render_requeue_like_summary(summary) + + +def render_dead_letter_auto_requeue(summary: DeadLetterAutoRequeueSummary) -> str: + """Render a stable public-safe auto-requeue report.""" + + return _render_requeue_like_summary(summary) + + +def _render_requeue_like_summary( + summary: DeadLetterRequeueSummary | DeadLetterAutoRequeueSummary, +) -> str: lines = [ f"applied: {'yes' if summary.applied else 'no'}", f"would move: {summary.would_move}", @@ -218,14 +349,40 @@ def render_dead_letter_requeue(summary: DeadLetterRequeueSummary) -> str: def error_class_for_job(job: ScanJob) -> str: """Return the public-safe error class for a dead-letter job.""" + return classify_dead_letter_job(job).error_class + + +def classify_dead_letter_job(job: ScanJob) -> DeadLetterClassification: + """Return terminal and root-cause classification for one dead-letter job.""" + + terminal_reason = _terminal_reason_for_job(job) + root_error_class = _root_error_class_for_job(job) + error_class = ( + terminal_reason + if terminal_reason != TERMINAL_REASON_NONE + else root_error_class + ) + return DeadLetterClassification( + terminal_reason=terminal_reason, + root_error_class=root_error_class, + error_class=error_class, + auto_requeue_eligible=( + root_error_class in DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES + and getattr(job, "auto_requeue_count", 0) < 1 + ), + ) + + +def _terminal_reason_for_job(job: ScanJob) -> str: if job.lease_expiry_count >= job.max_lease_expiries: - return ERROR_CLASS_LEASE_EXPIRED_BUDGET + return TERMINAL_REASON_LEASE_EXPIRED_BUDGET if job.attempts >= job.max_attempts: - return ERROR_CLASS_RETRY_BUDGET_EXHAUSTED + return TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED + return TERMINAL_REASON_NONE + +def _root_error_class_for_job(job: ScanJob) -> str: error = (job.last_error or "").lower() - if "lease expired" in error or "max_lease_expiries" in error: - return ERROR_CLASS_LEASE_EXPIRED_BUDGET if "malformed" in error or "payload" in error or "finding snapshot" in error: return ERROR_CLASS_MALFORMED_VERIFY_JOB if "timeout" in error or "timed out" in error: @@ -239,6 +396,10 @@ def error_class_for_job(job: ScanJob) -> str: return ERROR_CLASS_VERIFIER_TRANSPORT if "scanner" in error or "gitleaks" in error or "scan" in error: return ERROR_CLASS_SCANNER_RUNTIME + if "lease expired" in error or "max_lease_expiries" in error: + return ERROR_CLASS_LEASE_EXPIRED_BUDGET + if job.lease_expiry_count >= job.max_lease_expiries and not error: + return ERROR_CLASS_LEASE_EXPIRED_BUDGET return ERROR_CLASS_UNKNOWN @@ -249,6 +410,13 @@ def _classification_counts(jobs: list[ScanJob]) -> dict[str, int]: return dict(counts) +def _root_classification_counts(jobs: list[ScanJob]) -> dict[str, int]: + counts: Counter[str] = Counter() + for job in jobs: + counts[f"{job.job_type}:{classify_dead_letter_job(job).root_error_class}"] += 1 + return dict(counts) + + def _filter_by_error_class( jobs: list[ScanJob], filters: DeadLetterFilters ) -> list[ScanJob]: @@ -299,6 +467,19 @@ def _validate_positive(name: str, value: int) -> None: raise ValueError(f"{name} must be positive") +def _validate_transient_root_error_classes(root_error_classes: frozenset[str]) -> None: + if not root_error_classes: + raise ValueError("transient_root_error_classes must not be empty") + unsupported = root_error_classes - DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES + if unsupported: + allowed = ", ".join(sorted(DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES)) + invalid = ", ".join(sorted(unsupported)) + raise ValueError( + "transient_root_error_classes must be a subset of " + f"supported transient classes ({allowed}); invalid: {invalid}" + ) + + def _now(now_factory: Callable[[], dt.datetime]) -> dt.datetime: return _ensure_utc(now_factory()).replace(microsecond=0) diff --git a/src/security_scanner/storage/adapters/nosql_db/items.py b/src/security_scanner/storage/adapters/nosql_db/items.py index aa8d03d..91713c5 100644 --- a/src/security_scanner/storage/adapters/nosql_db/items.py +++ b/src/security_scanner/storage/adapters/nosql_db/items.py @@ -707,6 +707,7 @@ def scan_job_to_item(job: ScanJob) -> dict[str, Any]: "leaseExpiryCount": job.lease_expiry_count, "maxLeaseExpiries": job.max_lease_expiries, "findingSnapshot": job.finding_snapshot, + "autoRequeueCount": job.auto_requeue_count, } ) # Hot queue partitions (pending/leased) are sharded (D1); cold terminal @@ -755,6 +756,7 @@ def scan_job_from_item(item: dict[str, Any]) -> ScanJob: lease_expiry_count=int(item.get("leaseExpiryCount", 0)), max_lease_expiries=int(item.get("maxLeaseExpiries", 5)), finding_snapshot=item.get("findingSnapshot"), + auto_requeue_count=int(item.get("autoRequeueCount", 0)), ) diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py index 891196e..e9f5b14 100644 --- a/src/security_scanner/storage/adapters/nosql_db/store.py +++ b/src/security_scanner/storage/adapters/nosql_db/store.py @@ -960,6 +960,7 @@ def inspect_dead_letter_jobs( limit: int, filters: Any, updated_before: dt.datetime | None, + page_limit: int | None = None, ) -> list[ScanJob]: """Read a bounded page of dead-letter jobs for public-safe projection.""" @@ -967,7 +968,8 @@ def inspect_dead_letter_jobs( return [] jobs: list[ScanJob] = [] next_key: dict[str, Any] | None = None - while len(jobs) < limit: + pages_read = 0 + while len(jobs) < limit and (page_limit is None or pages_read < page_limit): query_args: dict[str, Any] = { "IndexName": GSI1_NAME, "KeyConditionExpression": "gsi1pk = :pk", @@ -979,6 +981,7 @@ def inspect_dead_letter_jobs( if next_key is not None: query_args["ExclusiveStartKey"] = next_key response = self._table.query(**query_args) + pages_read += 1 for job in items_to_scan_jobs(response.get("Items", [])): if self._dead_letter_job_matches( job, filters=filters, updated_before=updated_before @@ -1008,14 +1011,7 @@ def requeue_dead_letter_jobs( filters=filters, updated_before=preview_watermark, ): - updated = replace( - job, - status=SCAN_JOB_STATUS_PENDING, - worker_id=None, - lease_until=None, - next_attempt_at=_ensure_utc(now), - updated_at=_ensure_utc(now), - ) + updated = self._pending_dead_letter_job(job, now=now) if self._put_requeued_dead_letter_job( updated, preview_watermark=preview_watermark ): @@ -1024,6 +1020,51 @@ def requeue_dead_letter_jobs( skipped["changed"] += 1 return DeadLetterRequeueResult(moved=moved, skipped=dict(skipped)) + def auto_requeue_dead_letter_jobs( + self, + *, + jobs: list[ScanJob], + preview_watermark: dt.datetime, + now: dt.datetime, + ) -> DeadLetterRequeueResult: + """Move selected transient dead-letter jobs to pending once.""" + + moved = 0 + skipped: Counter[str] = Counter() + for job in jobs: + updated = self._pending_dead_letter_job( + job, + now=now, + auto_requeue_count=job.auto_requeue_count + 1, + ) + if self._put_auto_requeued_dead_letter_job( + updated, + original_auto_requeue_count=job.auto_requeue_count, + preview_watermark=preview_watermark, + ): + moved += 1 + else: + skipped["changed"] += 1 + return DeadLetterRequeueResult(moved=moved, skipped=dict(skipped)) + + def _pending_dead_letter_job( + self, + job: ScanJob, + *, + now: dt.datetime, + auto_requeue_count: int | None = None, + ) -> ScanJob: + changes: dict[str, Any] = { + "status": SCAN_JOB_STATUS_PENDING, + "worker_id": None, + "lease_until": None, + "next_attempt_at": _ensure_utc(now), + "updated_at": _ensure_utc(now), + } + if auto_requeue_count is not None: + changes["auto_requeue_count"] = auto_requeue_count + return replace(job, **changes) + def acquire_repo_lease( self, repo_id: str, @@ -1601,6 +1642,50 @@ def _put_requeued_dead_letter_job( return False raise + def _put_auto_requeued_dead_letter_job( + self, + job: ScanJob, + *, + original_auto_requeue_count: int, + preview_watermark: dt.datetime, + ) -> bool: + last_error_condition = "attribute_not_exists(lastError)" + expression_values: dict[str, Any] = { + ":dead_letter": SCAN_JOB_STATUS_DEAD_LETTER, + ":preview_watermark": datetime_to_iso(preview_watermark), + ":auto_requeue_count": original_auto_requeue_count, + ":job_type": job.job_type, + ":attempts": job.attempts, + ":max_attempts": job.max_attempts, + ":lease_expiry_count": job.lease_expiry_count, + ":max_lease_expiries": job.max_lease_expiries, + } + if job.last_error is not None: + last_error_condition = "lastError = :last_error" + expression_values[":last_error"] = job.last_error + try: + self._table.put_item( + Item=scan_job_to_item(job), + ConditionExpression=( + "attribute_exists(PK) AND attribute_exists(SK) AND " + "#status = :dead_letter AND updatedAt <= :preview_watermark " + "AND jobType = :job_type AND attempts = :attempts " + "AND maxAttempts = :max_attempts " + "AND leaseExpiryCount = :lease_expiry_count " + "AND maxLeaseExpiries = :max_lease_expiries " + f"AND {last_error_condition} " + "AND (attribute_not_exists(autoRequeueCount) OR " + "autoRequeueCount = :auto_requeue_count)" + ), + ExpressionAttributeNames={"#status": "status"}, + ExpressionAttributeValues=expression_values, + ) + return True + except Exception as exc: + if _is_conditional_check_failure(exc): + return False + raise + def _put_scan_ledger_if_absent(self, ledger: ScanLedgerEntry) -> None: try: self._table.put_item( diff --git a/src/security_scanner/storage/base.py b/src/security_scanner/storage/base.py index 59412f8..88e5096 100644 --- a/src/security_scanner/storage/base.py +++ b/src/security_scanner/storage/base.py @@ -130,6 +130,9 @@ class ScanJob: # redacted finding snapshot, such as async verifier jobs. This is queue # scheduling state, not repo freshness or durable sync domain data. finding_snapshot: dict[str, Any] | None = None + # Counts automated dead-letter recovery attempts. Manual requeue preserves + # this value; auto-requeue increments it once to enforce the one-shot policy. + auto_requeue_count: int = 0 @property def ledger_key(self) -> ScanLedgerKey: @@ -624,6 +627,7 @@ def inspect_dead_letter_jobs( limit: int, filters: Any, updated_before: dt.datetime | None, + page_limit: int | None = None, ) -> list[ScanJob]: """Return a bounded page of dead-letter jobs for public-safe projection.""" @@ -637,6 +641,15 @@ def requeue_dead_letter_jobs( ) -> DeadLetterRequeueResult: """Conditionally return matching dead-letter jobs to pending.""" + def auto_requeue_dead_letter_jobs( + self, + *, + jobs: list[ScanJob], + preview_watermark: dt.datetime, + now: dt.datetime, + ) -> DeadLetterRequeueResult: + """Conditionally return selected transient dead-letter jobs to pending.""" + @runtime_checkable class ScanRunHealthStore(Protocol): diff --git a/tests/test_cli_dead_letter.py b/tests/test_cli_dead_letter.py index 6ef2be9..930116c 100644 --- a/tests/test_cli_dead_letter.py +++ b/tests/test_cli_dead_letter.py @@ -5,7 +5,10 @@ import datetime as dt from security_scanner.cli import main -from security_scanner.runtime.dead_letter_recovery import DeadLetterFilters +from security_scanner.runtime.dead_letter_recovery import ( + ERROR_CLASS_MALFORMED_VERIFY_JOB, + DeadLetterFilters, +) from security_scanner.runtime.verify_queue import JOB_TYPE_VERIFY from security_scanner.storage.base import ScanJob @@ -15,7 +18,9 @@ class FakeDeadLetterStore: def __init__(self) -> None: self.jobs = [_dead_letter_job()] + self.inspect_calls: list[dict] = [] self.requeue_calls: list[dict] = [] + self.auto_requeue_calls: list[dict] = [] def inspect_dead_letter_jobs( self, @@ -23,7 +28,16 @@ def inspect_dead_letter_jobs( limit: int, filters: DeadLetterFilters, updated_before: dt.datetime | None, + page_limit: int | None = None, ) -> list[ScanJob]: + self.inspect_calls.append( + { + "limit": limit, + "filters": filters, + "updated_before": updated_before, + "page_limit": page_limit, + } + ) return [ job for job in self.jobs[:limit] @@ -48,6 +62,22 @@ def requeue_dead_letter_jobs( ) return {"moved": 1, "skipped": {}} + def auto_requeue_dead_letter_jobs( + self, + *, + jobs: list[ScanJob], + preview_watermark: dt.datetime, + now: dt.datetime, + ): + self.auto_requeue_calls.append( + { + "jobs": jobs, + "preview_watermark": preview_watermark, + "now": now, + } + ) + return {"moved": len(jobs), "skipped": {}} + def test_dead_letter_inspect_cli_renders_public_safe_summary(monkeypatch, capsys): store = FakeDeadLetterStore() @@ -111,6 +141,118 @@ def test_dead_letter_requeue_apply_requires_preview_watermark(capsys): assert "preview-watermark" in captured.err +def test_dead_letter_auto_requeue_cli_dry_run_is_public_safe(monkeypatch, capsys): + store = FakeDeadLetterStore() + monkeypatch.setattr( + "security_scanner.cli._store.create_finding_store", + lambda backend, **kwargs: store, + ) + + exit_code = main( + [ + "dead-letter", + "auto-requeue", + "--limit", + "10", + "--job-type", + JOB_TYPE_VERIFY, + ] + ) + + captured = capsys.readouterr() + assert exit_code == 0 + assert store.auto_requeue_calls == [] + assert "applied: no" in captured.out + assert "would move: 1" in captured.out + assert "verify:verifier-timeout: 1" in captured.out + for private_value in ( + "synthetic-redaction-repo", + "synthetic-redaction-branch", + "worker-redaction-marker", + "redaction-marker.invalid", + "SCANNER_FAKE_SECRET_TOKEN_900001", + "synthetic-redaction/service/config.py", + store.jobs[0].job_id, + ): + assert private_value not in captured.out + + +def test_dead_letter_auto_requeue_cli_apply_is_explicit(monkeypatch, capsys): + store = FakeDeadLetterStore() + monkeypatch.setattr( + "security_scanner.cli._store.create_finding_store", + lambda backend, **kwargs: store, + ) + + exit_code = main( + [ + "dead-letter", + "auto-requeue", + "--limit", + "10", + "--job-type", + JOB_TYPE_VERIFY, + "--apply", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 0 + assert len(store.auto_requeue_calls) == 1 + assert "applied: yes" in captured.out + assert "moved: 1" in captured.out + + +def test_dead_letter_auto_requeue_cli_defaults_to_verify_jobs(monkeypatch, capsys): + store = FakeDeadLetterStore() + monkeypatch.setattr( + "security_scanner.cli._store.create_finding_store", + lambda backend, **kwargs: store, + ) + + exit_code = main(["dead-letter", "auto-requeue", "--limit", "10"]) + + captured = capsys.readouterr() + assert exit_code == 0 + assert store.inspect_calls[0]["filters"].job_type == JOB_TYPE_VERIFY + assert "would move: 1" in captured.out + + +def test_dead_letter_auto_requeue_cli_rejects_non_transient_root_class(capsys): + exit_code = main( + [ + "dead-letter", + "auto-requeue", + "--limit", + "1", + "--root-error-class", + ERROR_CLASS_MALFORMED_VERIFY_JOB, + "--apply", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 2 + assert "transient_root_error_classes" in captured.err + + +def test_dead_letter_auto_requeue_cli_rejects_invalid_cooldown(capsys): + exit_code = main( + [ + "dead-letter", + "auto-requeue", + "--limit", + "1", + "--cooldown-minutes", + "-1", + ] + ) + + captured = capsys.readouterr() + assert exit_code == 2 + assert "cooldown" in captured.err + + def _dead_letter_job() -> ScanJob: return ScanJob( job_id="synthetic-redaction-marker-job-123", @@ -133,9 +275,9 @@ def _dead_letter_job() -> ScanJob: lease_until=None, next_attempt_at=NOW, created_at=NOW - dt.timedelta(hours=2), - updated_at=NOW - dt.timedelta(minutes=10), + updated_at=NOW - dt.timedelta(minutes=45), last_error=( - "HTTPError from https://redaction-marker.invalid/token?" + "Verifier timed out after calling https://redaction-marker.invalid/token?" "marker=SCANNER_FAKE_SECRET_TOKEN_900001" ), job_type=JOB_TYPE_VERIFY, diff --git a/tests/test_dead_letter_recovery.py b/tests/test_dead_letter_recovery.py index 06e14f5..4c61101 100644 --- a/tests/test_dead_letter_recovery.py +++ b/tests/test_dead_letter_recovery.py @@ -7,9 +7,17 @@ import pytest from security_scanner.runtime.dead_letter_recovery import ( + DEFAULT_AUTO_REQUEUE_SCAN_PAGES, + ERROR_CLASS_LEASE_EXPIRED_BUDGET, + ERROR_CLASS_MALFORMED_VERIFY_JOB, + ERROR_CLASS_VERIFIER_TIMEOUT, + TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED, + DeadLetterAutoRequeueRequest, DeadLetterFilters, DeadLetterInspectRequest, DeadLetterRequeueRequest, + auto_requeue_dead_letter_jobs, + classify_dead_letter_job, inspect_dead_letter_jobs, render_dead_letter_inspect, render_dead_letter_requeue, @@ -24,7 +32,9 @@ class FakeDeadLetterStore: def __init__(self, jobs: list[ScanJob]) -> None: self.jobs = jobs + self.inspect_calls: list[dict] = [] self.requeue_calls: list[dict] = [] + self.auto_requeue_calls: list[dict] = [] def inspect_dead_letter_jobs( self, @@ -32,7 +42,16 @@ def inspect_dead_letter_jobs( limit: int, filters: DeadLetterFilters, updated_before: dt.datetime | None, + page_limit: int | None = None, ) -> list[ScanJob]: + self.inspect_calls.append( + { + "limit": limit, + "filters": filters, + "updated_before": updated_before, + "page_limit": page_limit, + } + ) selected = [ job for job in self.jobs @@ -59,6 +78,22 @@ def requeue_dead_letter_jobs( ) return {"moved": min(limit, len(self.jobs)), "skipped": {}} + def auto_requeue_dead_letter_jobs( + self, + *, + jobs: list[ScanJob], + preview_watermark: dt.datetime, + now: dt.datetime, + ): + self.auto_requeue_calls.append( + { + "jobs": jobs, + "preview_watermark": preview_watermark, + "now": now, + } + ) + return {"moved": len(jobs), "skipped": {}} + def test_inspect_renders_aggregate_and_redacted_samples(): private_job = ScanJob( @@ -172,6 +207,225 @@ def test_requeue_apply_requires_preview_watermark(): assert store.requeue_calls == [] +def test_classification_splits_terminal_reason_from_root_error_class(): + job = _dead_letter_job( + "job-timeout", + attempts=3, + max_attempts=3, + last_error="Verifier timed out.", + ) + + classification = classify_dead_letter_job(job) + + assert classification.terminal_reason == TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED + assert classification.root_error_class == ERROR_CLASS_VERIFIER_TIMEOUT + assert classification.error_class == "retry-budget-exhausted" + assert classification.auto_requeue_eligible is True + + +def test_malformed_verify_job_is_never_auto_requeue_eligible(): + job = _dead_letter_job( + "job-malformed", + attempts=3, + max_attempts=3, + last_error="verify job is missing finding snapshot", + ) + + classification = classify_dead_letter_job(job) + + assert classification.root_error_class == ERROR_CLASS_MALFORMED_VERIFY_JOB + assert classification.auto_requeue_eligible is False + + +def test_lease_expired_budget_is_transient_auto_requeue_root_class(): + job = _dead_letter_job( + "job-lease-expired", + lease_expiry_count=5, + max_lease_expiries=5, + last_error=None, + ) + + classification = classify_dead_letter_job(job) + + assert classification.terminal_reason == ERROR_CLASS_LEASE_EXPIRED_BUDGET + assert classification.root_error_class == ERROR_CLASS_LEASE_EXPIRED_BUDGET + assert classification.auto_requeue_eligible is True + + +def test_lease_expired_budget_does_not_override_malformed_verify_evidence(): + job = _dead_letter_job( + "job-lease-expired-malformed", + lease_expiry_count=5, + max_lease_expiries=5, + last_error="verify job is missing finding snapshot", + ) + + classification = classify_dead_letter_job(job) + + assert classification.terminal_reason == ERROR_CLASS_LEASE_EXPIRED_BUDGET + assert classification.root_error_class == ERROR_CLASS_MALFORMED_VERIFY_JOB + assert classification.auto_requeue_eligible is False + + +def test_auto_requeue_rejects_non_transient_root_class_allow_list(): + store = FakeDeadLetterStore([]) + + with pytest.raises(ValueError, match="transient_root_error_classes"): + auto_requeue_dead_letter_jobs( + DeadLetterAutoRequeueRequest( + store=store, + limit=10, + transient_root_error_classes=frozenset( + {ERROR_CLASS_MALFORMED_VERIFY_JOB} + ), + now_factory=lambda: NOW, + ) + ) + + +def test_auto_requeue_dry_run_selects_transient_jobs_after_cooldown(): + old_timeout = _dead_letter_job( + "job-timeout-old", + attempts=3, + max_attempts=3, + last_error="Verifier timed out.", + updated_at=NOW - dt.timedelta(minutes=45), + ) + recent_timeout = _dead_letter_job( + "job-timeout-recent", + attempts=3, + max_attempts=3, + last_error="Verifier timed out.", + updated_at=NOW - dt.timedelta(minutes=5), + ) + malformed = _dead_letter_job( + "job-malformed", + attempts=3, + max_attempts=3, + last_error="verify job is missing finding snapshot", + updated_at=NOW - dt.timedelta(minutes=45), + ) + store = FakeDeadLetterStore([old_timeout, recent_timeout, malformed]) + + summary = auto_requeue_dead_letter_jobs( + DeadLetterAutoRequeueRequest( + store=store, + limit=10, + filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY), + cooldown=dt.timedelta(minutes=30), + apply=False, + now_factory=lambda: NOW, + ) + ) + + assert summary.applied is False + assert summary.would_move == 1 + assert summary.moved == 0 + assert summary.classification_counts["verify:verifier-timeout"] == 1 + assert summary.skipped.get("cooldown", 0) == 0 + assert summary.skipped["not_transient"] == 1 + assert store.inspect_calls[0]["updated_before"] == NOW - dt.timedelta(minutes=30) + assert store.inspect_calls[0]["page_limit"] == DEFAULT_AUTO_REQUEUE_SCAN_PAGES + assert store.auto_requeue_calls == [] + + +def test_auto_requeue_read_cutoff_prevents_recent_jobs_consuming_limit(): + recent_timeout = _dead_letter_job( + "job-timeout-recent", + attempts=3, + max_attempts=3, + last_error="Verifier timed out.", + updated_at=NOW - dt.timedelta(minutes=5), + ) + old_timeout = _dead_letter_job( + "job-timeout-old", + attempts=3, + max_attempts=3, + last_error="Verifier timed out.", + updated_at=NOW - dt.timedelta(minutes=45), + ) + store = FakeDeadLetterStore([recent_timeout, old_timeout]) + + summary = auto_requeue_dead_letter_jobs( + DeadLetterAutoRequeueRequest( + store=store, + limit=1, + filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY), + cooldown=dt.timedelta(minutes=30), + apply=False, + now_factory=lambda: NOW, + ) + ) + + assert summary.would_move == 1 + assert store.inspect_calls[0]["updated_before"] == NOW - dt.timedelta(minutes=30) + + +def test_auto_requeue_skips_jobs_already_auto_requeued_once(): + already_requeued = _dead_letter_job( + "job-timeout-requeued", + attempts=3, + max_attempts=3, + last_error="Verifier timed out.", + updated_at=NOW - dt.timedelta(minutes=45), + auto_requeue_count=1, + ) + store = FakeDeadLetterStore([already_requeued]) + + summary = auto_requeue_dead_letter_jobs( + DeadLetterAutoRequeueRequest( + store=store, + limit=10, + filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY), + cooldown=dt.timedelta(minutes=30), + apply=False, + now_factory=lambda: NOW, + ) + ) + + assert summary.would_move == 0 + assert summary.skipped["already_auto_requeued"] == 1 + + +def test_auto_requeue_apply_uses_selected_jobs_only(): + old_timeout = _dead_letter_job( + "job-timeout-old", + attempts=3, + max_attempts=3, + last_error="Verifier timed out.", + updated_at=NOW - dt.timedelta(minutes=45), + ) + malformed = _dead_letter_job( + "job-malformed", + attempts=3, + max_attempts=3, + last_error="verify job is missing finding snapshot", + updated_at=NOW - dt.timedelta(minutes=45), + ) + store = FakeDeadLetterStore([old_timeout, malformed]) + + summary = auto_requeue_dead_letter_jobs( + DeadLetterAutoRequeueRequest( + store=store, + limit=10, + filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY), + cooldown=dt.timedelta(minutes=30), + apply=True, + now_factory=lambda: NOW, + ) + ) + + assert summary.applied is True + assert summary.would_move == 0 + assert summary.moved == 1 + assert [job.job_id for job in store.auto_requeue_calls[0]["jobs"]] == [ + old_timeout.job_id + ] + assert store.auto_requeue_calls[0]["preview_watermark"] == NOW - dt.timedelta( + minutes=30 + ) + + def test_inspect_rejects_invalid_bounds(): store = FakeDeadLetterStore([]) @@ -196,7 +450,17 @@ def test_inspect_rejects_invalid_bounds(): ) -def _dead_letter_job(job_id: str) -> ScanJob: +def _dead_letter_job( + job_id: str, + *, + attempts: int = 1, + max_attempts: int = 3, + last_error: str = "malformed verify job payload", + updated_at: dt.datetime | None = None, + auto_requeue_count: int = 0, + lease_expiry_count: int = 0, + max_lease_expiries: int = 5, +) -> ScanJob: return ScanJob( job_id=job_id, repo_id="repo_synthetic", @@ -212,13 +476,16 @@ def _dead_letter_job(job_id: str) -> ScanJob: scanner_config_hash="default", priority=100, status="dead_letter", - attempts=1, - max_attempts=3, + attempts=attempts, + max_attempts=max_attempts, worker_id=None, lease_until=None, next_attempt_at=NOW, created_at=NOW - dt.timedelta(minutes=30), - updated_at=NOW - dt.timedelta(minutes=5), - last_error="malformed verify job payload", + updated_at=updated_at or NOW - dt.timedelta(minutes=5), + last_error=last_error, job_type=JOB_TYPE_VERIFY, + auto_requeue_count=auto_requeue_count, + lease_expiry_count=lease_expiry_count, + max_lease_expiries=max_lease_expiries, ) diff --git a/tests/test_incremental_scan_storage.py b/tests/test_incremental_scan_storage.py index c194eaf..268c466 100644 --- a/tests/test_incremental_scan_storage.py +++ b/tests/test_incremental_scan_storage.py @@ -205,6 +205,42 @@ def _condition_allows(self, kwargs: dict, existing: dict | None) -> bool: and existing.get("status") == values[":dead_letter"] and existing.get("updatedAt", "") <= values[":preview_watermark"] ) + if ( + expression + and "autoRequeueCount = :auto_requeue_count" in expression + and "jobType = :job_type" in expression + ): + last_error_matches = ( + existing is not None + and ( + ( + "lastError = :last_error" in expression + and existing.get("lastError") == values[":last_error"] + ) + or ( + "attribute_not_exists(lastError)" in expression + and "lastError" not in existing + ) + ) + ) + return ( + existing is not None + and existing.get("status") == values[":dead_letter"] + and existing.get("updatedAt", "") <= values[":preview_watermark"] + and existing.get("jobType") == values[":job_type"] + and existing.get("attempts") == values[":attempts"] + and existing.get("maxAttempts") == values[":max_attempts"] + and existing.get("leaseExpiryCount") + == values[":lease_expiry_count"] + and existing.get("maxLeaseExpiries") + == values[":max_lease_expiries"] + and last_error_matches + and ( + "autoRequeueCount" not in existing + or existing.get("autoRequeueCount") + == values[":auto_requeue_count"] + ) + ) if expression == "attribute_not_exists(PK) OR leaseUntil <= :now": return existing is None or existing["leaseUntil"] <= values[":now"] if expression == "workerId = :worker_id": @@ -428,6 +464,16 @@ def test_scan_job_item_round_trips_verify_finding_snapshot(): assert restored.finding_snapshot == finding.to_dict() +def test_scan_job_item_round_trips_auto_requeue_count(): + job = ScanJob(**{**_make_job().__dict__, "auto_requeue_count": 1}) + + item = scan_job_to_item(job) + restored = scan_job_from_item(item) + + assert item["autoRequeueCount"] == 1 + assert restored.auto_requeue_count == 1 + + def test_repo_id_and_job_id_are_deterministic_from_contract_fields(): normalized = "https://github.com/example-org/example-repo" expected_repo_id = ( @@ -945,6 +991,53 @@ def test_inspect_dead_letter_jobs_paginates_until_limit_matches_after_filtering( assert "ExclusiveStartKey" in table.query_calls[-1] +def test_inspect_dead_letter_jobs_respects_optional_page_limit(): + store, table = _make_store() + first_incremental = ScanJob( + **{ + **_make_job( + commit_sha="6" * 40, + next_attempt_at=NOW - dt.timedelta(minutes=5), + ).__dict__, + "status": "dead_letter", + "attempts": 3, + } + ) + second_incremental = ScanJob( + **{ + **_make_job( + commit_sha="7" * 40, + next_attempt_at=NOW - dt.timedelta(minutes=4), + ).__dict__, + "status": "dead_letter", + "attempts": 3, + } + ) + verify_dead_letter = ScanJob( + **{ + **_make_job( + commit_sha="8" * 40, + job_type=JOB_TYPE_VERIFY, + next_attempt_at=NOW - dt.timedelta(minutes=3), + ).__dict__, + "status": "dead_letter", + "attempts": 3, + } + ) + for job in (first_incremental, second_incremental, verify_dead_letter): + table.put_item(Item=scan_job_to_item(job)) + + jobs = store.inspect_dead_letter_jobs( + limit=1, + filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY), + updated_before=NOW + dt.timedelta(minutes=1), + page_limit=2, + ) + + assert jobs == [] + assert len(table.query_calls) == 2 + + def test_inspect_dead_letter_jobs_normalizes_job_updated_at_before_comparison(): store, table = _make_store() verify_dead_letter = ScanJob( @@ -1013,6 +1106,90 @@ def test_requeue_dead_letter_jobs_preserves_failure_history_and_retry_budgets(): assert updated.finding_snapshot == original.finding_snapshot +def test_auto_requeue_dead_letter_jobs_marks_one_shot_metadata_and_preserves_history(): + store, table = _make_store() + original = ScanJob( + **{ + **_make_job(commit_sha="9" * 40, job_type=JOB_TYPE_VERIFY).__dict__, + "status": "dead_letter", + "attempts": 3, + "max_attempts": 3, + "worker_id": "stale-worker", + "lease_until": NOW - dt.timedelta(minutes=5), + "last_error": "Verifier timed out.", + "fence": 7, + "lease_expiry_count": 1, + "finding_snapshot": {"path": "private/config.py"}, + "auto_requeue_count": 0, + } + ) + table.put_item(Item=scan_job_to_item(original)) + + summary = store.auto_requeue_dead_letter_jobs( + jobs=[original], + preview_watermark=NOW + dt.timedelta(minutes=1), + now=NOW + dt.timedelta(minutes=2), + ) + + updated = scan_job_from_item( + table.get_item(Key={"PK": f"SCAN_JOB#{original.job_id}", "SK": "META"})[ + "Item" + ] + ) + assert summary.moved == 1 + assert summary.skipped == {} + assert updated.status == "pending" + assert updated.worker_id is None + assert updated.lease_until is None + assert updated.next_attempt_at == NOW + dt.timedelta(minutes=2) + assert updated.attempts == original.attempts + assert updated.max_attempts == original.max_attempts + assert updated.lease_expiry_count == original.lease_expiry_count + assert updated.fence == original.fence + assert updated.last_error == original.last_error + assert updated.finding_snapshot == original.finding_snapshot + assert updated.auto_requeue_count == 1 + + +def test_auto_requeue_dead_letter_jobs_skips_changed_classification_inputs(): + store, table = _make_store() + original = ScanJob( + **{ + **_make_job(commit_sha="9" * 40, job_type=JOB_TYPE_VERIFY).__dict__, + "status": "dead_letter", + "attempts": 3, + "max_attempts": 3, + "last_error": "Verifier timed out.", + "auto_requeue_count": 0, + } + ) + table.put_item(Item=scan_job_to_item(original)) + changed = ScanJob( + **{ + **original.__dict__, + "last_error": "verify job is missing finding snapshot", + } + ) + table.put_item(Item=scan_job_to_item(changed)) + + summary = store.auto_requeue_dead_letter_jobs( + jobs=[original], + preview_watermark=NOW + dt.timedelta(minutes=1), + now=NOW + dt.timedelta(minutes=2), + ) + + stored = scan_job_from_item( + table.get_item(Key={"PK": f"SCAN_JOB#{original.job_id}", "SK": "META"})[ + "Item" + ] + ) + assert summary.moved == 0 + assert summary.skipped == {"changed": 1} + assert stored.status == "dead_letter" + assert stored.last_error == changed.last_error + assert stored.auto_requeue_count == 0 + + def test_complete_processed_job_writes_findings_ledger_then_completed_job(): store, table = _make_store() job = _make_job() diff --git a/tests/test_personal_prod_systemd_units.py b/tests/test_personal_prod_systemd_units.py index d77f733..10aea8b 100644 --- a/tests/test_personal_prod_systemd_units.py +++ b/tests/test_personal_prod_systemd_units.py @@ -51,6 +51,11 @@ "security-scanner-personal-ops-snapshot.timer", "*-*-* 09:00:00", ), + "dead-letter-auto-requeue": ( + "security-scanner-personal-dead-letter-auto-requeue.service", + "security-scanner-personal-dead-letter-auto-requeue.timer", + "*:0/30:00", + ), } ALL_SERVICE_FILES = [ @@ -248,6 +253,23 @@ def test_personal_llm_verify_expands_user_path_in_execstart_only() -> None: assert "%h/.local/bin/uv run security-scanner verify-drain" in text +def test_personal_dead_letter_auto_requeue_uses_conservative_verify_defaults() -> None: + text = ( + USER_SYSTEMD_DIR + / "security-scanner-personal-dead-letter-auto-requeue.service" + ).read_text(encoding="utf-8") + + assert "%h/.local/bin/uv run security-scanner dead-letter auto-requeue" in text + assert "--job-type verify" in text + assert ( + "--cooldown-minutes " + "${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES}" + ) in text + assert "--limit ${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_LIMIT}" in text + assert "--apply" in text + assert "SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES=30" in text + + def test_personal_incr_poll_treats_partial_discovery_as_success() -> None: parser = _parse_unit( USER_SYSTEMD_DIR / "security-scanner-personal-incr-poll.service" From fd4f62eb21654549fdba071ba85b88155db20a6d Mon Sep 17 00:00:00 2001 From: lsh1756 Date: Sat, 27 Jun 2026 18:35:26 +0900 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20dead-letter=20auto-requeue=20?= =?UTF-8?q?=ED=9B=84=EC=86=8D=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- governance/autopilot_goal.yml | 5 +++ .../storage/adapters/nosql_db/store.py | 10 +++--- tests/test_incremental_scan_storage.py | 32 +++++++++++++++++++ 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/governance/autopilot_goal.yml b/governance/autopilot_goal.yml index 25e2223..ef86fdb 100644 --- a/governance/autopilot_goal.yml +++ b/governance/autopilot_goal.yml @@ -15,7 +15,12 @@ policy_decisions: fork_prs: blocked-or-skipped-before-secrets public_artifacts: synthetic-or-redacted-only allowed_writes: + - deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service + - deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer - deploy/systemd/user/security-scanner-personal-llm-verify.service + - docs/workbench/specs/dead-letter-auto-requeue/design.md + - docs/workbench/specs/dead-letter-auto-requeue/requirements.html + - docs/workbench/specs/dead-letter-auto-requeue/requirements.md - docs/workbench/specs/dead-letter-queue-recovery/design.md - docs/workbench/specs/dead-letter-queue-recovery/requirements.html - docs/workbench/specs/dead-letter-queue-recovery/requirements.md diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py index e9f5b14..f61ac46 100644 --- a/src/security_scanner/storage/adapters/nosql_db/store.py +++ b/src/security_scanner/storage/adapters/nosql_db/store.py @@ -1649,7 +1649,7 @@ def _put_auto_requeued_dead_letter_job( original_auto_requeue_count: int, preview_watermark: dt.datetime, ) -> bool: - last_error_condition = "attribute_not_exists(lastError)" + last_error_condition = "lastError = :last_error" expression_values: dict[str, Any] = { ":dead_letter": SCAN_JOB_STATUS_DEAD_LETTER, ":preview_watermark": datetime_to_iso(preview_watermark), @@ -1659,10 +1659,12 @@ def _put_auto_requeued_dead_letter_job( ":max_attempts": job.max_attempts, ":lease_expiry_count": job.lease_expiry_count, ":max_lease_expiries": job.max_lease_expiries, + ":last_error": job.last_error, } - if job.last_error is not None: - last_error_condition = "lastError = :last_error" - expression_values[":last_error"] = job.last_error + if job.last_error is None: + last_error_condition = ( + "(attribute_not_exists(lastError) OR lastError = :last_error)" + ) try: self._table.put_item( Item=scan_job_to_item(job), diff --git a/tests/test_incremental_scan_storage.py b/tests/test_incremental_scan_storage.py index 268c466..5fe7926 100644 --- a/tests/test_incremental_scan_storage.py +++ b/tests/test_incremental_scan_storage.py @@ -1151,6 +1151,38 @@ def test_auto_requeue_dead_letter_jobs_marks_one_shot_metadata_and_preserves_his assert updated.auto_requeue_count == 1 +def test_auto_requeue_dead_letter_jobs_allows_null_last_error_attribute(): + store, table = _make_store() + original = ScanJob( + **{ + **_make_job(commit_sha="9" * 40, job_type=JOB_TYPE_VERIFY).__dict__, + "status": "dead_letter", + "attempts": 3, + "max_attempts": 3, + "last_error": None, + "auto_requeue_count": 0, + } + ) + table.put_item(Item=scan_job_to_item(original)) + + summary = store.auto_requeue_dead_letter_jobs( + jobs=[original], + preview_watermark=NOW + dt.timedelta(minutes=1), + now=NOW + dt.timedelta(minutes=2), + ) + + updated = scan_job_from_item( + table.get_item(Key={"PK": f"SCAN_JOB#{original.job_id}", "SK": "META"})[ + "Item" + ] + ) + assert summary.moved == 1 + assert summary.skipped == {} + assert updated.status == "pending" + assert updated.last_error is None + assert updated.auto_requeue_count == 1 + + def test_auto_requeue_dead_letter_jobs_skips_changed_classification_inputs(): store, table = _make_store() original = ScanJob(