diff --git a/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service
new file mode 100644
index 0000000..5fed7a0
--- /dev/null
+++ b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service
@@ -0,0 +1,27 @@
+[Unit]
+Description=security-scanner personal dead-letter auto-requeue
+Documentation=https://github.com/source-security-dev/security-scanner
+
+[Service]
+Type=oneshot
+Slice=securityscanner.slice
+Nice=15
+IOSchedulingClass=idle
+TasksMax=128
+WorkingDirectory=%h/security-scanner
+EnvironmentFile=-%h/.config/security-scanner/personal-prod.env
+Environment=PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/bin
+Environment=SECURITY_SCANNER_STORAGE_BACKEND=dynamodb
+Environment=SECURITY_SCANNER_DYNAMO_ENDPOINT=http://localhost:4567
+Environment=SECURITY_SCANNER_DYNAMO_TABLE=security_scanner_personal
+Environment=SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_LIMIT=10
+Environment=SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES=30
+ExecStart=/usr/bin/env PATH=%h/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/bin %h/.local/bin/uv run security-scanner dead-letter auto-requeue \
+ --storage-backend ${SECURITY_SCANNER_STORAGE_BACKEND} \
+ --job-type verify \
+ --cooldown-minutes ${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES} \
+ --limit ${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_LIMIT} \
+ --apply
+
+[Install]
+WantedBy=default.target
diff --git a/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer
new file mode 100644
index 0000000..e274739
--- /dev/null
+++ b/deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer
@@ -0,0 +1,12 @@
+[Unit]
+Description=Scheduler for security-scanner personal dead-letter auto-requeue
+Documentation=https://github.com/source-security-dev/security-scanner
+
+[Timer]
+OnCalendar=*:0/30:00
+Persistent=true
+RandomizedDelaySec=300
+Unit=security-scanner-personal-dead-letter-auto-requeue.service
+
+[Install]
+WantedBy=timers.target
diff --git a/docs/workbench/specs/dead-letter-auto-requeue/design.md b/docs/workbench/specs/dead-letter-auto-requeue/design.md
new file mode 100644
index 0000000..a4806f0
--- /dev/null
+++ b/docs/workbench/specs/dead-letter-auto-requeue/design.md
@@ -0,0 +1,301 @@
+# Dead-Letter Auto-Requeue Design Spec
+
+## Overview
+
+Dead-letter auto-requeue extends the existing dead-letter recovery module with a
+conservative, timer-ready policy for transient verifier failures. It reuses the
+current DB-backed `SCAN_JOB` queue and manual recovery seam, without adding SQS,
+LocalStack, a new table, or a new GSI.
+
+## Requirements Reference
+
+- Phase 1 source: `requirements.md`
+- Preview companion: `requirements.html`
+- Core scope: one PR, conservative timer-ready auto-requeue.
+- Core policy: verify transient dead-letter jobs only, default cooldown 30
+ minutes, at most one automatic requeue per job.
+- Explicit exclusions: SQS, LocalStack, Kafka-style offsets, outbox framework,
+ new storage tables, new GSIs, broad automatic recovery.
+
+## Approach Proposal
+
+### Selected: extend existing recovery seam
+
+Add auto-requeue behavior to the existing `runtime/dead_letter_recovery.py`
+runtime seam, storage protocol, scan CLI command group, and user systemd unit
+set.
+
+Why this is selected:
+
+- Auto-requeue is a policy-controlled subset of dead-letter recovery, not a new
+ queue subsystem.
+- The current module already owns public-safe classification, bounded reads,
+ guarded requeue, and rendering.
+- Reusing the seam keeps manual and automatic recovery consistent while avoiding
+ duplicate DTOs and storage paths.
+
+### Guardrail inside the selected approach
+
+Do not bolt timer/cooldown conditions into the manual requeue path. The module
+should expose separate use cases:
+
+- manual recovery: operator-selected subset;
+- automatic recovery: policy-selected transient subset;
+- shared primitives: classification, public-safe projection, bounded guarded
+ storage apply.
+
+### Rejected: separate auto-requeue module
+
+A separate module would create a cleaner file split, but it would likely
+duplicate classification, selection, rendering, and guarded storage logic already
+owned by dead-letter recovery.
+
+### Rejected: queue policy layer
+
+A general retry/dead-letter policy layer could be useful later, but this PR does
+not need multi-queue policy orchestration, provider-specific behavior, or a new
+workflow abstraction.
+
+## Architecture
+
+```mermaid
+flowchart TD
+ Timer["systemd user timer"] --> Service["auto-requeue service"]
+ Operator["operator CLI dry-run/apply"] --> CLI["scan dead-letter auto-requeue"]
+ Service --> CLI
+ CLI --> Runtime["runtime.dead_letter_recovery"]
+ Runtime --> Classifier["DeadLetterClassification"]
+ Runtime --> Policy["AutoRequeuePolicy"]
+ Runtime --> Store["IncrementalScanStore dead-letter methods"]
+ Store --> DB["SCAN_JOB status=dead_letter rows"]
+ Policy --> Selection["verify + transient + cooldown + one-shot"]
+ Selection --> GuardedApply["conditional pending transition"]
+ GuardedApply --> Pending["SCAN_JOB status=pending"]
+ Pending --> Drain["verify-drain"]
+```
+
+## Data Flow
+
+### Dry Run
+
+1. CLI builds an `AutoRequeueDeadLetterRequest`.
+2. Runtime reads a bounded dead-letter page using the existing storage seam.
+3. Runtime classifies each job into:
+ - terminal reason;
+ - root error class;
+ - public-safe auto eligibility.
+4. Runtime filters to jobs that match all automatic policy gates.
+5. Runtime returns a public-safe summary with `would_move`, selected root error
+ class counts, and skipped reasons.
+6. No storage mutation occurs.
+
+### Apply
+
+1. The same request runs with explicit apply enabled.
+2. Runtime computes eligible jobs with the same policy as dry run.
+3. Runtime asks storage to move only matching rows from `dead_letter` to
+ `pending`.
+4. Storage applies existing safety guards:
+ - row still exists;
+ - status is still `dead_letter`;
+ - updated time is not newer than the selection watermark;
+ - filter fields still match.
+5. Storage clears execution ownership and sets `next_attempt_at` to apply time.
+6. Storage preserves failure evidence and marks the job as already
+ auto-requeued once.
+7. Runtime returns moved/skipped counts and public-safe classification counts.
+
+### Timer
+
+1. The user-level timer invokes the auto-requeue CLI with conservative defaults.
+2. Default policy targets verify jobs only.
+3. Default cooldown is 30 minutes.
+4. Default limit is small and configurable through unit environment.
+5. Timer files are shipped but not treated as broad auto-recovery being enabled.
+
+## Component Details
+
+### `runtime.dead_letter_recovery`
+
+Add a richer classification DTO while preserving public-safe output.
+
+Conceptual shape:
+
+```text
+DeadLetterClassification
+ terminal_reason
+ root_error_class
+ auto_requeue_eligible
+```
+
+`terminal_reason` explains why the job reached `dead_letter`, for example retry
+budget exhausted or lease expiry budget exhausted.
+
+`root_error_class` explains what kind of failure caused that terminal state, for
+example verifier timeout, verifier transport, malformed verify job, scanner
+runtime, or unknown.
+
+The current single `error_class` rendering can stay for compatibility, but
+auto-requeue selection must use `root_error_class`, not only
+`retry-budget-exhausted`.
+
+Add a dedicated automatic use case:
+
+```text
+auto_requeue_dead_letter_jobs(request) -> DeadLetterAutoRequeueSummary
+```
+
+Responsibilities:
+
+- validate positive limit and non-negative cooldown;
+- select only jobs older than the cooldown;
+- select only allowed job types, defaulting to verify when invoked by timer;
+- select only transient root error classes;
+- skip jobs that were already automatically requeued once;
+- support dry-run and explicit apply;
+- return public-safe moved/would-move/skipped counts.
+
+### Storage
+
+Reuse existing dead-letter inspect and guarded requeue operations, but add enough
+stored evidence to enforce one automatic requeue per job.
+
+The implementation can choose the smallest compatible representation, but it
+must support:
+
+- checking whether a job was already auto-requeued;
+- marking a moved job as auto-requeued once during apply;
+- preserving attempts, max attempts, lease expiry counters, fence, last error,
+ and finding snapshot.
+
+No new table or GSI is introduced. The metadata must live on the existing
+`SCAN_JOB` row or an already-supported extension field.
+
+### CLI
+
+Add a scan queue adjacent command under the existing dead-letter group.
+
+Recommended shape:
+
+```text
+dead-letter auto-requeue
+```
+
+Required behavior:
+
+- dry-run by default;
+- `--apply` required for mutation;
+- bounded `--limit`;
+- `--cooldown-minutes` defaulting to 30;
+- `--job-type` supported, timer defaulting to verify;
+- `--root-error-class` or equivalent filter supported for diagnostics;
+- public-safe output only.
+
+The command should render:
+
+- applied yes/no;
+- would move;
+- moved;
+- selected counts by job type and root error class;
+- skipped counts by public-safe reason.
+
+### Systemd User Units
+
+Add personal user units for conservative timer-ready operation.
+
+Recommended files:
+
+```text
+deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service
+deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer
+```
+
+The service should invoke the CLI with:
+
+- storage backend from the personal environment;
+- job type verify;
+- cooldown 30 minutes;
+- small limit;
+- explicit apply.
+
+The timer cadence should be slower than the cooldown or otherwise avoid tight
+repeat loops. The unit files should be installable, but enabling remains an
+operator action.
+
+## Error Handling
+
+- Unsupported storage backend exits before mutation.
+- Invalid limit or cooldown exits before mutation.
+- Unknown or permanent root error classes are skipped, not fatal.
+- Jobs updated after selection are skipped by conditional write.
+- Jobs already auto-requeued once are skipped.
+- Empty eligible set is a successful no-op.
+- Storage failures fail the command with a public-safe diagnostic.
+- Raw job id, raw error, private repo/path/ref/commit material, endpoint, prompt,
+ response, and finding snapshot are never printed.
+
+## Testing Strategy
+
+### Runtime tests
+
+- Retry-budget-exhausted timeout jobs expose root error class verifier timeout.
+- Retry-budget-exhausted transport jobs expose root error class verifier
+ transport.
+- Malformed verify jobs are never auto eligible.
+- Unknown jobs are never auto eligible.
+- Cooldown blocks recent dead-letter jobs.
+- A job already auto-requeued once is skipped.
+- Dry-run performs no mutation and reports would-move counts.
+- Apply reports moved and skipped counts.
+
+### Storage tests
+
+- Auto-requeue apply moves eligible dead-letter rows to pending.
+- Apply marks the row as auto-requeued once.
+- Apply preserves failure evidence and retry/lease counters.
+- Apply skips rows changed after selection.
+- Existing manual requeue behavior remains compatible.
+
+### CLI tests
+
+- Auto-requeue dry-run renders public-safe summary.
+- Auto-requeue apply requires explicit flag.
+- Unsupported backend exits without mutation.
+- Cooldown and limit validation reject invalid values.
+- Output excludes raw private or secret-like material.
+
+### Systemd tests
+
+- Personal unit invokes the new CLI command.
+- Personal unit uses verify job type and 30 minute cooldown.
+- Timer exists and is not a broad recovery surface.
+
+## TDD Strategy
+
+Use red -> green -> refactor.
+
+1. Add failing runtime tests for split classification and auto eligibility.
+2. Add failing runtime tests for cooldown, one-shot skip, dry-run, and apply
+ summary.
+3. Add failing storage tests for marking one-shot metadata and preserving
+ existing failure evidence.
+4. Add failing CLI tests for dry-run/apply and validation.
+5. Add failing systemd unit tests for conservative timer-ready wiring.
+6. Implement the smallest runtime, storage, CLI, and unit changes to pass.
+7. Refactor only after focused tests pass.
+
+## Milestones
+
+- M1: Classification split — done when tests prove terminal reason and root
+ error class are distinct and timeout/transport exhausted jobs are eligible.
+- M2: Auto-requeue runtime policy — done when tests prove cooldown, one-shot
+ guard, dry-run, apply, and public-safe skipped counts.
+- M3: Storage evidence — done when tests prove rows can be marked
+ auto-requeued once without resetting failure history.
+- M4: CLI and timer-ready units — done when CLI and systemd tests prove
+ conservative verify-only auto-requeue wiring.
+
+## Open Questions
+
+- None. Implementation may adjust exact flag names if the approved behavior and
+ public-safe contract remain intact.
diff --git a/docs/workbench/specs/dead-letter-auto-requeue/requirements.html b/docs/workbench/specs/dead-letter-auto-requeue/requirements.html
new file mode 100644
index 0000000..554edea
--- /dev/null
+++ b/docs/workbench/specs/dead-letter-auto-requeue/requirements.html
@@ -0,0 +1,42 @@
+
+
+
+
+
+ Dead-Letter Auto-Requeue Requirements
+
+
+
+ Dead-Letter Auto-Requeue Requirements
+ Generated companion preview. Source of truth is requirements.md.
+ 승인 대상
+
+ - Source of truth:
requirements.md
+ - Preview companion:
requirements.html
+
+ 핵심 목표
+ 기존 수동 dead-letter recovery 위에 transient failure 전용 자동 requeue를 단일 PR로 추가한다. 기본 운영 posture는 conservative timer-ready다. SQS, LocalStack, outbox, 새 테이블, 새 GSI는 범위 밖이다.
+ 주요 요구사항
+
+ - Dry-run과 explicit apply를 분리한다.
+ - Limit, cooldown, job type, root error class로 후보를 제한한다.
+ - Verifier timeout, verifier transport, lease-expired budget 계열만 자동 대상으로 삼는다.
+ - Malformed payload, missing snapshot, schema/validation, unknown error는 operator-only로 남긴다.
+ - Public-safe aggregate와 skipped counts만 출력한다.
+ - Systemd user timer에서 작은 limit으로 주기 실행할 수 있어야 한다.
+ - Timer-ready 산출물은 제공하되, 운영자가 별도로 enable하기 전까지 broad auto-recovery를 전제로 하지 않는다.
+ - 같은 job에는 자동 재투입 기회를 최대 1회만 준다.
+ - 기본 cooldown은 30분이다.
+
+ 미결정 항목
+ 없음. requirements.md 승인 후 Phase 2에서 design spec을 작성한다.
+
+
diff --git a/docs/workbench/specs/dead-letter-auto-requeue/requirements.md b/docs/workbench/specs/dead-letter-auto-requeue/requirements.md
new file mode 100644
index 0000000..b8e9264
--- /dev/null
+++ b/docs/workbench/specs/dead-letter-auto-requeue/requirements.md
@@ -0,0 +1,112 @@
+# Dead-Letter Auto-Requeue Requirements
+
+## 승인 대상
+
+- Source of truth: `requirements.md`
+- Preview companion: `requirements.html`
+
+## 질문-답변 흐름
+
+### Q: 이번 단일 PR의 목표는 무엇인가?
+
+Transient dead-letter auto-requeue로 잡는다.
+
+기존 수동 dead-letter inspect/requeue는 유지한다. 이번 PR은 과부하, 일시 LLM timeout, transport 장애, lease expiry budget 소진처럼 시간이 지나면 회복될 수 있는 dead-letter job만 제한적으로 다시 `pending`에 올려 운영자 반복 작업을 줄인다.
+
+배제한 선택지는 다음과 같다.
+
+- 모든 dead-letter 자동 재투입: poison job loop와 verifier 비용 폭증 위험이 크다.
+- SQS/LocalStack/outbox 도입: 현재 scanner queue 요구사항보다 무겁고 이번 문제의 직접 원인이 아니다.
+- 수동 requeue만 유지: 안전하지만 과부하성 transient dead-letter가 반복될 때 운영자가 같은 작업을 계속 수행해야 한다.
+
+### Q: 자동 requeue의 기본 운영 posture를 어디까지 열 것인가?
+
+Conservative timer-ready로 확정한다.
+
+이번 PR은 CLI와 systemd user service/timer 파일을 제공하되, 자동 requeue 대상은 verify job의 transient failure subset으로 좁히고 small limit과 cooldown을 기본 전제로 둔다. Timer 파일은 배포 가능해야 하지만 broad auto-recovery를 기본값으로 만들지 않는다. 실제 enable 여부와 cadence 조정은 운영자가 별도 판단한다.
+
+배제한 선택지는 다음과 같다.
+
+- Controlled auto-on: 설치 즉시 timer가 주기 실행되어 편하지만, transient 분류 오류나 endpoint 장애가 남아 있을 때 자동 재투입이 바로 반복될 수 있다.
+- Manual-first: 분류 개선과 manual command까지만 다루면 가장 안전하지만, 이번 단일 PR에서 timer-ready 자동화까지 갖추려는 목표에는 못 미친다.
+
+### Q: 같은 dead-letter job에 자동 재투입 기회를 몇 번까지 줄 것인가?
+
+최대 1회로 확정한다.
+
+Transient failure로 분류된 job은 cooldown 이후 자동으로 한 번만 `pending`에 복귀할 수 있다. 같은 job이 다시 dead-letter 되면 자동화는 멈추고 operator-only 상태로 남긴다. 이 정책은 과부하성 일시 장애에는 회복 기회를 주되, 잘못 분류된 poison job이나 계속 불안정한 verifier endpoint가 자동 루프를 만드는 것을 막는다.
+
+배제한 선택지는 다음과 같다.
+
+- 최대 3회: 일시 장애 회복 가능성은 높지만 endpoint 문제가 지속될 때 자동 재투입이 길게 반복된다.
+- 기존 retry budget만 사용: 구현은 작지만 dead-letter 이후 자동 재투입 횟수를 별도로 제한하지 못해 cooldown마다 반복될 수 있다.
+
+### Q: 자동 requeue의 cooldown 기본값을 얼마로 둘 것인가?
+
+30분으로 확정한다.
+
+Dead-letter 직후 즉시 재투입하지 않고 30분을 기다린 뒤 transient 후보만 자동 requeue 대상으로 삼는다. 이 값은 과부하나 verifier endpoint 흔들림이 가라앉을 시간을 주면서, 2시간 이상 지연되는 보수 정책보다 backlog 회복을 빠르게 시작한다.
+
+배제한 선택지는 다음과 같다.
+
+- 2시간: 더 보수적이지만 transient 복구와 backlog drain 시작이 늦어진다.
+- 운영자가 항상 지정: 실수 방지는 강하지만 timer-ready 기본 운영 경험이 나빠진다.
+
+## 기능 요구사항
+
+- 운영자는 transient dead-letter만 대상으로 하는 자동 requeue를 dry-run으로 확인할 수 있어야 한다.
+- 운영자는 명시적 apply 모드에서만 자동 requeue 후보를 실제 `pending` 상태로 되돌릴 수 있어야 한다.
+- 자동 requeue는 기존 수동 `dead-letter inspect` / `dead-letter requeue` 흐름과 같은 public-safe 출력 원칙을 지켜야 한다.
+- 자동 requeue는 한 번에 처리할 최대 job 수를 반드시 가져야 한다.
+- 자동 requeue는 dead-letter가 된 뒤 일정 cooldown이 지난 job만 대상으로 해야 한다.
+- 자동 requeue의 기본 cooldown은 30분이어야 한다.
+- 자동 requeue는 job type filter를 지원해야 하며, 첫 운영 대상은 verify job으로 제한할 수 있어야 한다.
+- 자동 requeue는 root error class가 transient인 job만 대상으로 해야 한다.
+- Transient root error class는 최소 verifier timeout, verifier transport, lease-expired budget 계열을 포함해야 한다.
+- 자동 requeue는 malformed verify payload, missing finding snapshot, schema/validation 문제, unknown error를 자동 대상에서 제외해야 한다.
+- 자동 requeue는 기존 실패 이력과 retry/lease budget evidence를 public-safe하게 보존해야 한다.
+- 자동 requeue는 같은 job에 최대 1회만 실행 기회를 줘야 한다.
+- 자동 requeue 이후 같은 job이 다시 dead-letter 되면 자동 대상에서 제외하고 operator-only 상태로 남겨야 한다.
+- 자동 requeue는 자동 재투입 여부를 판단할 public-safe evidence를 저장하거나 유지해야 한다.
+- 자동 requeue는 이미 변경된 job, 더 이상 filter와 맞지 않는 job, preview 기준 이후 새로 dead-letter 된 job을 임의로 움직이지 않아야 한다.
+- 자동 requeue 결과는 moved, would move, skipped, selected root error class counts를 public-safe aggregate로 보여줘야 한다.
+- 자동 requeue는 기본적으로 systemd user timer에서 주기 실행할 수 있어야 한다.
+- Timer는 처음부터 broad auto-recovery를 수행하지 않고 verify transient subset을 작은 limit으로 처리할 수 있어야 한다.
+- Timer-ready 산출물은 제공하되, 운영자가 별도로 enable하기 전까지 broad auto-recovery를 전제로 하지 않아야 한다.
+- 기존 manual inspect/requeue command는 계속 사용할 수 있어야 한다.
+
+## 비기능 요구사항
+
+| 항목 | 요구값 |
+| --- | --- |
+| Public safety | committed file, CLI output, timer output에는 raw secret, private repo/path/ref, endpoint, credential-like value, raw prompt/response, raw `last_error`, raw `finding_snapshot`을 노출하지 않는다. |
+| Operational safety | 자동 requeue는 bounded, cooldown-gated, class-gated, explicit apply-gated여야 한다. |
+| Poison-loop guard | permanent 또는 unknown failure는 자동 재투입하지 않는다. |
+| Scope control | SQS, LocalStack, Kafka-style offset, 새 테이블, 새 GSI, heavy outbox는 이번 PR 범위가 아니다. |
+| Compatibility | 기존 DB-backed `SCAN_JOB` queue와 수동 dead-letter recovery semantics를 깨지 않는다. |
+| Observability | 자동 requeue는 사람이 원인을 파악할 수 있는 public-safe aggregate와 skipped counts를 남겨야 한다. |
+| TDD | root classification, candidate selection, dry-run/apply, cooldown, timer unit을 focused tests로 검증한다. |
+
+## 사용자 시나리오
+
+- LLM verifier throughput보다 verify job 유입이 빨라 timeout성 dead-letter가 쌓인다.
+- 유입이 멈춘 뒤 cooldown이 지나면 auto-requeue가 timeout/transport 계열 verify job 일부를 `pending`으로 되돌린다.
+- 다음 verify-drain이 되돌린 job을 다시 처리한다.
+- 성공한 job은 completed/disposition write로 끝난다.
+- 계속 실패하는 job은 다시 dead-letter가 되며, 자동 정책 한계 밖이면 운영자가 inspect한다.
+- malformed payload나 missing finding snapshot 같은 permanent job은 자동 requeue 대상에서 제외되어 계속 operator-only 상태로 남는다.
+
+## 코드 기반 사실
+
+- 기존 `runtime/dead_letter_recovery.py`는 public-safe inspect/requeue DTO와 renderer를 가진다.
+- 기존 `DeadLetterFilters`는 `job_type`과 `error_class` filter를 가진다.
+- 기존 `error_class_for_job`은 lease expiry budget, retry budget exhausted, malformed verify job, verifier transport, verifier timeout, scanner runtime, unknown을 coarse bucket으로 분류한다.
+- 현재 분류는 attempts budget exhausted가 root cause보다 먼저 적용될 수 있어, timeout으로 retry budget이 소진된 job이 `retry-budget-exhausted`로 뭉뚱그려질 수 있다.
+- 기존 NoSQL store는 bounded dead-letter inspect와 conditional requeue apply를 제공한다.
+- 기존 requeue는 status가 여전히 `dead_letter`이고 preview watermark 이전 job만 `pending`으로 되돌린다.
+- 기존 CLI는 `dead-letter inspect`와 `dead-letter requeue`를 제공한다.
+- 기존 personal verifier는 `verify-drain` systemd user service/timer로 동작한다.
+
+## 미결정 항목
+
+- 없음. `requirements.md` 승인 후 Phase 2에서 접근 후보와 detailed design을 작성한다.
diff --git a/governance/autopilot_goal.yml b/governance/autopilot_goal.yml
index 25e2223..ef86fdb 100644
--- a/governance/autopilot_goal.yml
+++ b/governance/autopilot_goal.yml
@@ -15,7 +15,12 @@ policy_decisions:
fork_prs: blocked-or-skipped-before-secrets
public_artifacts: synthetic-or-redacted-only
allowed_writes:
+ - deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.service
+ - deploy/systemd/user/security-scanner-personal-dead-letter-auto-requeue.timer
- deploy/systemd/user/security-scanner-personal-llm-verify.service
+ - docs/workbench/specs/dead-letter-auto-requeue/design.md
+ - docs/workbench/specs/dead-letter-auto-requeue/requirements.html
+ - docs/workbench/specs/dead-letter-auto-requeue/requirements.md
- docs/workbench/specs/dead-letter-queue-recovery/design.md
- docs/workbench/specs/dead-letter-queue-recovery/requirements.html
- docs/workbench/specs/dead-letter-queue-recovery/requirements.md
diff --git a/src/security_scanner/cli/commands/scan.py b/src/security_scanner/cli/commands/scan.py
index 5af63a8..a6f8171 100644
--- a/src/security_scanner/cli/commands/scan.py
+++ b/src/security_scanner/cli/commands/scan.py
@@ -39,10 +39,15 @@
residual_for_repo,
)
from security_scanner.runtime.dead_letter_recovery import (
+ DEFAULT_AUTO_REQUEUE_COOLDOWN,
+ DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES,
+ DeadLetterAutoRequeueRequest,
DeadLetterFilters,
DeadLetterInspectRequest,
DeadLetterRequeueRequest,
+ auto_requeue_dead_letter_jobs,
inspect_dead_letter_jobs,
+ render_dead_letter_auto_requeue,
render_dead_letter_inspect,
render_dead_letter_requeue,
requeue_dead_letter_jobs,
@@ -95,6 +100,7 @@
resolve_verifier_config,
)
from security_scanner.runtime.verify_queue import (
+ JOB_TYPE_VERIFY,
HistoricalVerifyBackfillSummary,
backfill_historical_verify_jobs,
enqueue_historical_verify_jobs,
@@ -451,6 +457,40 @@ def register(subparsers) -> None:
add_incremental_storage_args(dead_letter_requeue_parser)
dead_letter_requeue_parser.set_defaults(func=cmd_dead_letter_requeue)
+ dead_letter_auto_requeue_parser = dead_letter_subparsers.add_parser(
+ "auto-requeue",
+ help="Preview or apply transient dead-letter auto-requeue.",
+ )
+ dead_letter_auto_requeue_parser.add_argument(
+ "--limit",
+ type=int,
+ required=True,
+ metavar="N",
+ help="Maximum dead-letter jobs to inspect for auto-requeue.",
+ )
+ dead_letter_auto_requeue_parser.add_argument(
+ "--cooldown-minutes",
+ type=float,
+ default=DEFAULT_AUTO_REQUEUE_COOLDOWN.total_seconds() / 60,
+ metavar="N",
+ help="Minimum dead-letter age before auto-requeue (default: 30).",
+ )
+ dead_letter_auto_requeue_parser.add_argument(
+ "--root-error-class",
+ action="append",
+ default=None,
+ metavar="CLASS",
+ help="Transient root error class to include; repeat for multiple classes.",
+ )
+ dead_letter_auto_requeue_parser.add_argument(
+ "--apply",
+ action="store_true",
+ help="Actually move eligible transient jobs to pending.",
+ )
+ _add_dead_letter_filter_args(dead_letter_auto_requeue_parser)
+ add_incremental_storage_args(dead_letter_auto_requeue_parser)
+ dead_letter_auto_requeue_parser.set_defaults(func=cmd_dead_letter_auto_requeue)
+
residual_parser = subparsers.add_parser(
"residual",
help="Show per-branch residual findings for a repository.",
@@ -1101,6 +1141,61 @@ def cmd_dead_letter_requeue(args: argparse.Namespace) -> int:
return 0
+def cmd_dead_letter_auto_requeue(args: argparse.Namespace) -> int:
+ """Preview or apply transient dead-letter auto-requeue."""
+
+ if args.storage_backend != "dynamodb":
+ print(
+ "error: dead-letter auto-requeue supports --storage-backend dynamodb only",
+ file=sys.stderr,
+ )
+ return 2
+ transient_root_error_classes = (
+ frozenset(args.root_error_class) if args.root_error_class else None
+ )
+ try:
+ request_kwargs = {}
+ if transient_root_error_classes is not None:
+ unsupported = (
+ transient_root_error_classes - DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES
+ )
+ if unsupported:
+ allowed = ", ".join(sorted(DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES))
+ invalid = ", ".join(sorted(unsupported))
+ raise ValueError(
+ "transient_root_error_classes must be a subset of "
+ f"supported transient classes ({allowed}); invalid: {invalid}"
+ )
+ request_kwargs["transient_root_error_classes"] = (
+ transient_root_error_classes
+ )
+ filters = _dead_letter_filters_from_args(args)
+ if filters.job_type is None:
+ filters = DeadLetterFilters(
+ job_type=JOB_TYPE_VERIFY,
+ error_class=filters.error_class,
+ )
+ summary = auto_requeue_dead_letter_jobs(
+ DeadLetterAutoRequeueRequest(
+ store=store_from_args(args),
+ limit=args.limit,
+ filters=filters,
+ cooldown=dt.timedelta(minutes=args.cooldown_minutes),
+ apply=args.apply,
+ **request_kwargs,
+ )
+ )
+ except ValueError as exc:
+ print(f"error: {exc}", file=sys.stderr)
+ return 2
+ except Exception as exc: # noqa: BLE001 - fatal storage/runtime error.
+ print(f"error: dead-letter auto-requeue failed: {exc}", file=sys.stderr)
+ return 1
+
+ print(render_dead_letter_auto_requeue(summary), end="")
+ return 0
+
+
def _add_dead_letter_filter_args(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--job-type",
diff --git a/src/security_scanner/runtime/dead_letter_recovery.py b/src/security_scanner/runtime/dead_letter_recovery.py
index 7ed8952..5f059a1 100644
--- a/src/security_scanner/runtime/dead_letter_recovery.py
+++ b/src/security_scanner/runtime/dead_letter_recovery.py
@@ -18,6 +18,18 @@
ERROR_CLASS_VERIFIER_TIMEOUT = "verifier-timeout"
ERROR_CLASS_SCANNER_RUNTIME = "scanner-runtime"
ERROR_CLASS_UNKNOWN = "unknown"
+TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED = ERROR_CLASS_RETRY_BUDGET_EXHAUSTED
+TERMINAL_REASON_LEASE_EXPIRED_BUDGET = ERROR_CLASS_LEASE_EXPIRED_BUDGET
+TERMINAL_REASON_NONE = "none"
+DEFAULT_AUTO_REQUEUE_COOLDOWN = dt.timedelta(minutes=30)
+DEFAULT_AUTO_REQUEUE_SCAN_PAGES = 10
+DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES = frozenset(
+ {
+ ERROR_CLASS_LEASE_EXPIRED_BUDGET,
+ ERROR_CLASS_VERIFIER_TIMEOUT,
+ ERROR_CLASS_VERIFIER_TRANSPORT,
+ }
+)
@dataclass(frozen=True)
@@ -40,6 +52,16 @@ class DeadLetterSample:
age: str
+@dataclass(frozen=True)
+class DeadLetterClassification:
+ """Public-safe dead-letter classification for recovery decisions."""
+
+ terminal_reason: str
+ root_error_class: str
+ error_class: str
+ auto_requeue_eligible: bool
+
+
@dataclass(frozen=True)
class DeadLetterInspectRequest:
"""Inputs for one public-safe dead-letter inspect read."""
@@ -88,6 +110,33 @@ class DeadLetterRequeueSummary:
applied: bool = False
+@dataclass(frozen=True)
+class DeadLetterAutoRequeueRequest:
+ """Inputs for policy-selected transient dead-letter auto-requeue."""
+
+ store: DeadLetterRecoveryStore
+ limit: int
+ filters: DeadLetterFilters = field(default_factory=DeadLetterFilters)
+ cooldown: dt.timedelta = DEFAULT_AUTO_REQUEUE_COOLDOWN
+ scan_pages: int = DEFAULT_AUTO_REQUEUE_SCAN_PAGES
+ apply: bool = False
+ transient_root_error_classes: frozenset[str] = DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES
+ now_factory: Callable[[], dt.datetime] = lambda: dt.datetime.now(dt.UTC).replace(
+ microsecond=0
+ )
+
+
+@dataclass(frozen=True)
+class DeadLetterAutoRequeueSummary:
+ """Public-safe auto-requeue result."""
+
+ would_move: int = 0
+ moved: int = 0
+ skipped: dict[str, int] = field(default_factory=dict)
+ classification_counts: dict[str, int] = field(default_factory=dict)
+ applied: bool = False
+
+
class DeadLetterRecoveryStore(Protocol):
"""Storage operations required by dead-letter recovery runtime."""
@@ -97,6 +146,7 @@ def inspect_dead_letter_jobs(
limit: int,
filters: DeadLetterFilters,
updated_before: dt.datetime | None,
+ page_limit: int | None = None,
) -> list[ScanJob]:
"""Return a bounded page of dead-letter jobs for safe projection."""
@@ -110,6 +160,15 @@ def requeue_dead_letter_jobs(
):
"""Conditionally move matching dead-letter jobs back to pending."""
+ def auto_requeue_dead_letter_jobs(
+ self,
+ *,
+ jobs: list[ScanJob],
+ preview_watermark: dt.datetime,
+ now: dt.datetime,
+ ):
+ """Conditionally auto-requeue selected jobs and mark recovery evidence."""
+
def inspect_dead_letter_jobs(
request: DeadLetterInspectRequest,
@@ -178,6 +237,66 @@ def requeue_dead_letter_jobs(
)
+def auto_requeue_dead_letter_jobs(
+ request: DeadLetterAutoRequeueRequest,
+) -> DeadLetterAutoRequeueSummary:
+ """Preview or apply policy-selected transient dead-letter requeue."""
+
+ _validate_positive("limit", request.limit)
+ if request.cooldown < dt.timedelta(0):
+ raise ValueError("cooldown must be non-negative")
+ _validate_positive("scan_pages", request.scan_pages)
+ _validate_transient_root_error_classes(request.transient_root_error_classes)
+
+ now = _now(request.now_factory)
+ cooldown_watermark = now - request.cooldown
+ jobs = request.store.inspect_dead_letter_jobs(
+ limit=request.limit,
+ filters=request.filters,
+ updated_before=cooldown_watermark,
+ page_limit=request.scan_pages,
+ )
+ eligible: list[ScanJob] = []
+ skipped: Counter[str] = Counter()
+ for job in jobs:
+ classification = classify_dead_letter_job(job)
+ if _ensure_utc(job.updated_at) > cooldown_watermark:
+ skipped["cooldown"] += 1
+ continue
+ if getattr(job, "auto_requeue_count", 0) >= 1:
+ skipped["already_auto_requeued"] += 1
+ continue
+ if classification.root_error_class not in request.transient_root_error_classes:
+ skipped["not_transient"] += 1
+ continue
+ eligible.append(job)
+
+ classification_counts = _root_classification_counts(eligible)
+ if not request.apply:
+ return DeadLetterAutoRequeueSummary(
+ would_move=len(eligible),
+ moved=0,
+ skipped=dict(skipped),
+ classification_counts=classification_counts,
+ applied=False,
+ )
+
+ result = request.store.auto_requeue_dead_letter_jobs(
+ jobs=eligible,
+ preview_watermark=cooldown_watermark,
+ now=now,
+ )
+ moved, apply_skipped = _coerce_requeue_result(result)
+ skipped.update(apply_skipped)
+ return DeadLetterAutoRequeueSummary(
+ would_move=0,
+ moved=moved,
+ skipped=dict(skipped),
+ classification_counts=classification_counts,
+ applied=True,
+ )
+
+
def render_dead_letter_inspect(summary: DeadLetterInspectSummary) -> str:
"""Render a stable public-safe inspect report."""
@@ -203,6 +322,18 @@ def render_dead_letter_inspect(summary: DeadLetterInspectSummary) -> str:
def render_dead_letter_requeue(summary: DeadLetterRequeueSummary) -> str:
"""Render a stable public-safe requeue report."""
+ return _render_requeue_like_summary(summary)
+
+
+def render_dead_letter_auto_requeue(summary: DeadLetterAutoRequeueSummary) -> str:
+ """Render a stable public-safe auto-requeue report."""
+
+ return _render_requeue_like_summary(summary)
+
+
+def _render_requeue_like_summary(
+ summary: DeadLetterRequeueSummary | DeadLetterAutoRequeueSummary,
+) -> str:
lines = [
f"applied: {'yes' if summary.applied else 'no'}",
f"would move: {summary.would_move}",
@@ -218,14 +349,40 @@ def render_dead_letter_requeue(summary: DeadLetterRequeueSummary) -> str:
def error_class_for_job(job: ScanJob) -> str:
"""Return the public-safe error class for a dead-letter job."""
+ return classify_dead_letter_job(job).error_class
+
+
+def classify_dead_letter_job(job: ScanJob) -> DeadLetterClassification:
+ """Return terminal and root-cause classification for one dead-letter job."""
+
+ terminal_reason = _terminal_reason_for_job(job)
+ root_error_class = _root_error_class_for_job(job)
+ error_class = (
+ terminal_reason
+ if terminal_reason != TERMINAL_REASON_NONE
+ else root_error_class
+ )
+ return DeadLetterClassification(
+ terminal_reason=terminal_reason,
+ root_error_class=root_error_class,
+ error_class=error_class,
+ auto_requeue_eligible=(
+ root_error_class in DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES
+ and getattr(job, "auto_requeue_count", 0) < 1
+ ),
+ )
+
+
+def _terminal_reason_for_job(job: ScanJob) -> str:
if job.lease_expiry_count >= job.max_lease_expiries:
- return ERROR_CLASS_LEASE_EXPIRED_BUDGET
+ return TERMINAL_REASON_LEASE_EXPIRED_BUDGET
if job.attempts >= job.max_attempts:
- return ERROR_CLASS_RETRY_BUDGET_EXHAUSTED
+ return TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED
+ return TERMINAL_REASON_NONE
+
+def _root_error_class_for_job(job: ScanJob) -> str:
error = (job.last_error or "").lower()
- if "lease expired" in error or "max_lease_expiries" in error:
- return ERROR_CLASS_LEASE_EXPIRED_BUDGET
if "malformed" in error or "payload" in error or "finding snapshot" in error:
return ERROR_CLASS_MALFORMED_VERIFY_JOB
if "timeout" in error or "timed out" in error:
@@ -239,6 +396,10 @@ def error_class_for_job(job: ScanJob) -> str:
return ERROR_CLASS_VERIFIER_TRANSPORT
if "scanner" in error or "gitleaks" in error or "scan" in error:
return ERROR_CLASS_SCANNER_RUNTIME
+ if "lease expired" in error or "max_lease_expiries" in error:
+ return ERROR_CLASS_LEASE_EXPIRED_BUDGET
+ if job.lease_expiry_count >= job.max_lease_expiries and not error:
+ return ERROR_CLASS_LEASE_EXPIRED_BUDGET
return ERROR_CLASS_UNKNOWN
@@ -249,6 +410,13 @@ def _classification_counts(jobs: list[ScanJob]) -> dict[str, int]:
return dict(counts)
+def _root_classification_counts(jobs: list[ScanJob]) -> dict[str, int]:
+ counts: Counter[str] = Counter()
+ for job in jobs:
+ counts[f"{job.job_type}:{classify_dead_letter_job(job).root_error_class}"] += 1
+ return dict(counts)
+
+
def _filter_by_error_class(
jobs: list[ScanJob], filters: DeadLetterFilters
) -> list[ScanJob]:
@@ -299,6 +467,19 @@ def _validate_positive(name: str, value: int) -> None:
raise ValueError(f"{name} must be positive")
+def _validate_transient_root_error_classes(root_error_classes: frozenset[str]) -> None:
+ if not root_error_classes:
+ raise ValueError("transient_root_error_classes must not be empty")
+ unsupported = root_error_classes - DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES
+ if unsupported:
+ allowed = ", ".join(sorted(DEFAULT_TRANSIENT_ROOT_ERROR_CLASSES))
+ invalid = ", ".join(sorted(unsupported))
+ raise ValueError(
+ "transient_root_error_classes must be a subset of "
+ f"supported transient classes ({allowed}); invalid: {invalid}"
+ )
+
+
def _now(now_factory: Callable[[], dt.datetime]) -> dt.datetime:
return _ensure_utc(now_factory()).replace(microsecond=0)
diff --git a/src/security_scanner/storage/adapters/nosql_db/items.py b/src/security_scanner/storage/adapters/nosql_db/items.py
index aa8d03d..91713c5 100644
--- a/src/security_scanner/storage/adapters/nosql_db/items.py
+++ b/src/security_scanner/storage/adapters/nosql_db/items.py
@@ -707,6 +707,7 @@ def scan_job_to_item(job: ScanJob) -> dict[str, Any]:
"leaseExpiryCount": job.lease_expiry_count,
"maxLeaseExpiries": job.max_lease_expiries,
"findingSnapshot": job.finding_snapshot,
+ "autoRequeueCount": job.auto_requeue_count,
}
)
# Hot queue partitions (pending/leased) are sharded (D1); cold terminal
@@ -755,6 +756,7 @@ def scan_job_from_item(item: dict[str, Any]) -> ScanJob:
lease_expiry_count=int(item.get("leaseExpiryCount", 0)),
max_lease_expiries=int(item.get("maxLeaseExpiries", 5)),
finding_snapshot=item.get("findingSnapshot"),
+ auto_requeue_count=int(item.get("autoRequeueCount", 0)),
)
diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py
index 891196e..f61ac46 100644
--- a/src/security_scanner/storage/adapters/nosql_db/store.py
+++ b/src/security_scanner/storage/adapters/nosql_db/store.py
@@ -960,6 +960,7 @@ def inspect_dead_letter_jobs(
limit: int,
filters: Any,
updated_before: dt.datetime | None,
+ page_limit: int | None = None,
) -> list[ScanJob]:
"""Read a bounded page of dead-letter jobs for public-safe projection."""
@@ -967,7 +968,8 @@ def inspect_dead_letter_jobs(
return []
jobs: list[ScanJob] = []
next_key: dict[str, Any] | None = None
- while len(jobs) < limit:
+ pages_read = 0
+ while len(jobs) < limit and (page_limit is None or pages_read < page_limit):
query_args: dict[str, Any] = {
"IndexName": GSI1_NAME,
"KeyConditionExpression": "gsi1pk = :pk",
@@ -979,6 +981,7 @@ def inspect_dead_letter_jobs(
if next_key is not None:
query_args["ExclusiveStartKey"] = next_key
response = self._table.query(**query_args)
+ pages_read += 1
for job in items_to_scan_jobs(response.get("Items", [])):
if self._dead_letter_job_matches(
job, filters=filters, updated_before=updated_before
@@ -1008,14 +1011,7 @@ def requeue_dead_letter_jobs(
filters=filters,
updated_before=preview_watermark,
):
- updated = replace(
- job,
- status=SCAN_JOB_STATUS_PENDING,
- worker_id=None,
- lease_until=None,
- next_attempt_at=_ensure_utc(now),
- updated_at=_ensure_utc(now),
- )
+ updated = self._pending_dead_letter_job(job, now=now)
if self._put_requeued_dead_letter_job(
updated, preview_watermark=preview_watermark
):
@@ -1024,6 +1020,51 @@ def requeue_dead_letter_jobs(
skipped["changed"] += 1
return DeadLetterRequeueResult(moved=moved, skipped=dict(skipped))
+ def auto_requeue_dead_letter_jobs(
+ self,
+ *,
+ jobs: list[ScanJob],
+ preview_watermark: dt.datetime,
+ now: dt.datetime,
+ ) -> DeadLetterRequeueResult:
+ """Move selected transient dead-letter jobs to pending once."""
+
+ moved = 0
+ skipped: Counter[str] = Counter()
+ for job in jobs:
+ updated = self._pending_dead_letter_job(
+ job,
+ now=now,
+ auto_requeue_count=job.auto_requeue_count + 1,
+ )
+ if self._put_auto_requeued_dead_letter_job(
+ updated,
+ original_auto_requeue_count=job.auto_requeue_count,
+ preview_watermark=preview_watermark,
+ ):
+ moved += 1
+ else:
+ skipped["changed"] += 1
+ return DeadLetterRequeueResult(moved=moved, skipped=dict(skipped))
+
+ def _pending_dead_letter_job(
+ self,
+ job: ScanJob,
+ *,
+ now: dt.datetime,
+ auto_requeue_count: int | None = None,
+ ) -> ScanJob:
+ changes: dict[str, Any] = {
+ "status": SCAN_JOB_STATUS_PENDING,
+ "worker_id": None,
+ "lease_until": None,
+ "next_attempt_at": _ensure_utc(now),
+ "updated_at": _ensure_utc(now),
+ }
+ if auto_requeue_count is not None:
+ changes["auto_requeue_count"] = auto_requeue_count
+ return replace(job, **changes)
+
def acquire_repo_lease(
self,
repo_id: str,
@@ -1601,6 +1642,52 @@ def _put_requeued_dead_letter_job(
return False
raise
+ def _put_auto_requeued_dead_letter_job(
+ self,
+ job: ScanJob,
+ *,
+ original_auto_requeue_count: int,
+ preview_watermark: dt.datetime,
+ ) -> bool:
+ last_error_condition = "lastError = :last_error"
+ expression_values: dict[str, Any] = {
+ ":dead_letter": SCAN_JOB_STATUS_DEAD_LETTER,
+ ":preview_watermark": datetime_to_iso(preview_watermark),
+ ":auto_requeue_count": original_auto_requeue_count,
+ ":job_type": job.job_type,
+ ":attempts": job.attempts,
+ ":max_attempts": job.max_attempts,
+ ":lease_expiry_count": job.lease_expiry_count,
+ ":max_lease_expiries": job.max_lease_expiries,
+ ":last_error": job.last_error,
+ }
+ if job.last_error is None:
+ last_error_condition = (
+ "(attribute_not_exists(lastError) OR lastError = :last_error)"
+ )
+ try:
+ self._table.put_item(
+ Item=scan_job_to_item(job),
+ ConditionExpression=(
+ "attribute_exists(PK) AND attribute_exists(SK) AND "
+ "#status = :dead_letter AND updatedAt <= :preview_watermark "
+ "AND jobType = :job_type AND attempts = :attempts "
+ "AND maxAttempts = :max_attempts "
+ "AND leaseExpiryCount = :lease_expiry_count "
+ "AND maxLeaseExpiries = :max_lease_expiries "
+ f"AND {last_error_condition} "
+ "AND (attribute_not_exists(autoRequeueCount) OR "
+ "autoRequeueCount = :auto_requeue_count)"
+ ),
+ ExpressionAttributeNames={"#status": "status"},
+ ExpressionAttributeValues=expression_values,
+ )
+ return True
+ except Exception as exc:
+ if _is_conditional_check_failure(exc):
+ return False
+ raise
+
def _put_scan_ledger_if_absent(self, ledger: ScanLedgerEntry) -> None:
try:
self._table.put_item(
diff --git a/src/security_scanner/storage/base.py b/src/security_scanner/storage/base.py
index 59412f8..88e5096 100644
--- a/src/security_scanner/storage/base.py
+++ b/src/security_scanner/storage/base.py
@@ -130,6 +130,9 @@ class ScanJob:
# redacted finding snapshot, such as async verifier jobs. This is queue
# scheduling state, not repo freshness or durable sync domain data.
finding_snapshot: dict[str, Any] | None = None
+ # Counts automated dead-letter recovery attempts. Manual requeue preserves
+ # this value; auto-requeue increments it once to enforce the one-shot policy.
+ auto_requeue_count: int = 0
@property
def ledger_key(self) -> ScanLedgerKey:
@@ -624,6 +627,7 @@ def inspect_dead_letter_jobs(
limit: int,
filters: Any,
updated_before: dt.datetime | None,
+ page_limit: int | None = None,
) -> list[ScanJob]:
"""Return a bounded page of dead-letter jobs for public-safe projection."""
@@ -637,6 +641,15 @@ def requeue_dead_letter_jobs(
) -> DeadLetterRequeueResult:
"""Conditionally return matching dead-letter jobs to pending."""
+ def auto_requeue_dead_letter_jobs(
+ self,
+ *,
+ jobs: list[ScanJob],
+ preview_watermark: dt.datetime,
+ now: dt.datetime,
+ ) -> DeadLetterRequeueResult:
+ """Conditionally return selected transient dead-letter jobs to pending."""
+
@runtime_checkable
class ScanRunHealthStore(Protocol):
diff --git a/tests/test_cli_dead_letter.py b/tests/test_cli_dead_letter.py
index 6ef2be9..930116c 100644
--- a/tests/test_cli_dead_letter.py
+++ b/tests/test_cli_dead_letter.py
@@ -5,7 +5,10 @@
import datetime as dt
from security_scanner.cli import main
-from security_scanner.runtime.dead_letter_recovery import DeadLetterFilters
+from security_scanner.runtime.dead_letter_recovery import (
+ ERROR_CLASS_MALFORMED_VERIFY_JOB,
+ DeadLetterFilters,
+)
from security_scanner.runtime.verify_queue import JOB_TYPE_VERIFY
from security_scanner.storage.base import ScanJob
@@ -15,7 +18,9 @@
class FakeDeadLetterStore:
def __init__(self) -> None:
self.jobs = [_dead_letter_job()]
+ self.inspect_calls: list[dict] = []
self.requeue_calls: list[dict] = []
+ self.auto_requeue_calls: list[dict] = []
def inspect_dead_letter_jobs(
self,
@@ -23,7 +28,16 @@ def inspect_dead_letter_jobs(
limit: int,
filters: DeadLetterFilters,
updated_before: dt.datetime | None,
+ page_limit: int | None = None,
) -> list[ScanJob]:
+ self.inspect_calls.append(
+ {
+ "limit": limit,
+ "filters": filters,
+ "updated_before": updated_before,
+ "page_limit": page_limit,
+ }
+ )
return [
job
for job in self.jobs[:limit]
@@ -48,6 +62,22 @@ def requeue_dead_letter_jobs(
)
return {"moved": 1, "skipped": {}}
+ def auto_requeue_dead_letter_jobs(
+ self,
+ *,
+ jobs: list[ScanJob],
+ preview_watermark: dt.datetime,
+ now: dt.datetime,
+ ):
+ self.auto_requeue_calls.append(
+ {
+ "jobs": jobs,
+ "preview_watermark": preview_watermark,
+ "now": now,
+ }
+ )
+ return {"moved": len(jobs), "skipped": {}}
+
def test_dead_letter_inspect_cli_renders_public_safe_summary(monkeypatch, capsys):
store = FakeDeadLetterStore()
@@ -111,6 +141,118 @@ def test_dead_letter_requeue_apply_requires_preview_watermark(capsys):
assert "preview-watermark" in captured.err
+def test_dead_letter_auto_requeue_cli_dry_run_is_public_safe(monkeypatch, capsys):
+ store = FakeDeadLetterStore()
+ monkeypatch.setattr(
+ "security_scanner.cli._store.create_finding_store",
+ lambda backend, **kwargs: store,
+ )
+
+ exit_code = main(
+ [
+ "dead-letter",
+ "auto-requeue",
+ "--limit",
+ "10",
+ "--job-type",
+ JOB_TYPE_VERIFY,
+ ]
+ )
+
+ captured = capsys.readouterr()
+ assert exit_code == 0
+ assert store.auto_requeue_calls == []
+ assert "applied: no" in captured.out
+ assert "would move: 1" in captured.out
+ assert "verify:verifier-timeout: 1" in captured.out
+ for private_value in (
+ "synthetic-redaction-repo",
+ "synthetic-redaction-branch",
+ "worker-redaction-marker",
+ "redaction-marker.invalid",
+ "SCANNER_FAKE_SECRET_TOKEN_900001",
+ "synthetic-redaction/service/config.py",
+ store.jobs[0].job_id,
+ ):
+ assert private_value not in captured.out
+
+
+def test_dead_letter_auto_requeue_cli_apply_is_explicit(monkeypatch, capsys):
+ store = FakeDeadLetterStore()
+ monkeypatch.setattr(
+ "security_scanner.cli._store.create_finding_store",
+ lambda backend, **kwargs: store,
+ )
+
+ exit_code = main(
+ [
+ "dead-letter",
+ "auto-requeue",
+ "--limit",
+ "10",
+ "--job-type",
+ JOB_TYPE_VERIFY,
+ "--apply",
+ ]
+ )
+
+ captured = capsys.readouterr()
+ assert exit_code == 0
+ assert len(store.auto_requeue_calls) == 1
+ assert "applied: yes" in captured.out
+ assert "moved: 1" in captured.out
+
+
+def test_dead_letter_auto_requeue_cli_defaults_to_verify_jobs(monkeypatch, capsys):
+ store = FakeDeadLetterStore()
+ monkeypatch.setattr(
+ "security_scanner.cli._store.create_finding_store",
+ lambda backend, **kwargs: store,
+ )
+
+ exit_code = main(["dead-letter", "auto-requeue", "--limit", "10"])
+
+ captured = capsys.readouterr()
+ assert exit_code == 0
+ assert store.inspect_calls[0]["filters"].job_type == JOB_TYPE_VERIFY
+ assert "would move: 1" in captured.out
+
+
+def test_dead_letter_auto_requeue_cli_rejects_non_transient_root_class(capsys):
+ exit_code = main(
+ [
+ "dead-letter",
+ "auto-requeue",
+ "--limit",
+ "1",
+ "--root-error-class",
+ ERROR_CLASS_MALFORMED_VERIFY_JOB,
+ "--apply",
+ ]
+ )
+
+ captured = capsys.readouterr()
+ assert exit_code == 2
+ assert "transient_root_error_classes" in captured.err
+
+
+def test_dead_letter_auto_requeue_cli_rejects_invalid_cooldown(capsys):
+ exit_code = main(
+ [
+ "dead-letter",
+ "auto-requeue",
+ "--limit",
+ "1",
+ "--cooldown-minutes",
+ "-1",
+ ]
+ )
+
+ captured = capsys.readouterr()
+ assert exit_code == 2
+ assert "cooldown" in captured.err
+
+
def _dead_letter_job() -> ScanJob:
return ScanJob(
job_id="synthetic-redaction-marker-job-123",
@@ -133,9 +275,9 @@ def _dead_letter_job() -> ScanJob:
lease_until=None,
next_attempt_at=NOW,
created_at=NOW - dt.timedelta(hours=2),
- updated_at=NOW - dt.timedelta(minutes=10),
+ updated_at=NOW - dt.timedelta(minutes=45),
last_error=(
- "HTTPError from https://redaction-marker.invalid/token?"
+ "Verifier timed out after calling https://redaction-marker.invalid/token?"
"marker=SCANNER_FAKE_SECRET_TOKEN_900001"
),
job_type=JOB_TYPE_VERIFY,
diff --git a/tests/test_dead_letter_recovery.py b/tests/test_dead_letter_recovery.py
index 06e14f5..4c61101 100644
--- a/tests/test_dead_letter_recovery.py
+++ b/tests/test_dead_letter_recovery.py
@@ -7,9 +7,17 @@
import pytest
from security_scanner.runtime.dead_letter_recovery import (
+ DEFAULT_AUTO_REQUEUE_SCAN_PAGES,
+ ERROR_CLASS_LEASE_EXPIRED_BUDGET,
+ ERROR_CLASS_MALFORMED_VERIFY_JOB,
+ ERROR_CLASS_VERIFIER_TIMEOUT,
+ TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED,
+ DeadLetterAutoRequeueRequest,
DeadLetterFilters,
DeadLetterInspectRequest,
DeadLetterRequeueRequest,
+ auto_requeue_dead_letter_jobs,
+ classify_dead_letter_job,
inspect_dead_letter_jobs,
render_dead_letter_inspect,
render_dead_letter_requeue,
@@ -24,7 +32,9 @@
class FakeDeadLetterStore:
def __init__(self, jobs: list[ScanJob]) -> None:
self.jobs = jobs
+ self.inspect_calls: list[dict] = []
self.requeue_calls: list[dict] = []
+ self.auto_requeue_calls: list[dict] = []
def inspect_dead_letter_jobs(
self,
@@ -32,7 +42,16 @@ def inspect_dead_letter_jobs(
limit: int,
filters: DeadLetterFilters,
updated_before: dt.datetime | None,
+ page_limit: int | None = None,
) -> list[ScanJob]:
+ self.inspect_calls.append(
+ {
+ "limit": limit,
+ "filters": filters,
+ "updated_before": updated_before,
+ "page_limit": page_limit,
+ }
+ )
selected = [
job
for job in self.jobs
@@ -59,6 +78,22 @@ def requeue_dead_letter_jobs(
)
return {"moved": min(limit, len(self.jobs)), "skipped": {}}
+ def auto_requeue_dead_letter_jobs(
+ self,
+ *,
+ jobs: list[ScanJob],
+ preview_watermark: dt.datetime,
+ now: dt.datetime,
+ ):
+ self.auto_requeue_calls.append(
+ {
+ "jobs": jobs,
+ "preview_watermark": preview_watermark,
+ "now": now,
+ }
+ )
+ return {"moved": len(jobs), "skipped": {}}
+
def test_inspect_renders_aggregate_and_redacted_samples():
private_job = ScanJob(
@@ -172,6 +207,225 @@ def test_requeue_apply_requires_preview_watermark():
assert store.requeue_calls == []
+def test_classification_splits_terminal_reason_from_root_error_class():
+ job = _dead_letter_job(
+ "job-timeout",
+ attempts=3,
+ max_attempts=3,
+ last_error="Verifier timed out.",
+ )
+
+ classification = classify_dead_letter_job(job)
+
+ assert classification.terminal_reason == TERMINAL_REASON_RETRY_BUDGET_EXHAUSTED
+ assert classification.root_error_class == ERROR_CLASS_VERIFIER_TIMEOUT
+ assert classification.error_class == "retry-budget-exhausted"
+ assert classification.auto_requeue_eligible is True
+
+
+def test_malformed_verify_job_is_never_auto_requeue_eligible():
+ job = _dead_letter_job(
+ "job-malformed",
+ attempts=3,
+ max_attempts=3,
+ last_error="verify job is missing finding snapshot",
+ )
+
+ classification = classify_dead_letter_job(job)
+
+ assert classification.root_error_class == ERROR_CLASS_MALFORMED_VERIFY_JOB
+ assert classification.auto_requeue_eligible is False
+
+
+def test_lease_expired_budget_is_transient_auto_requeue_root_class():
+ job = _dead_letter_job(
+ "job-lease-expired",
+ lease_expiry_count=5,
+ max_lease_expiries=5,
+ last_error=None,
+ )
+
+ classification = classify_dead_letter_job(job)
+
+ assert classification.terminal_reason == ERROR_CLASS_LEASE_EXPIRED_BUDGET
+ assert classification.root_error_class == ERROR_CLASS_LEASE_EXPIRED_BUDGET
+ assert classification.auto_requeue_eligible is True
+
+
+def test_lease_expired_budget_does_not_override_malformed_verify_evidence():
+ job = _dead_letter_job(
+ "job-lease-expired-malformed",
+ lease_expiry_count=5,
+ max_lease_expiries=5,
+ last_error="verify job is missing finding snapshot",
+ )
+
+ classification = classify_dead_letter_job(job)
+
+ assert classification.terminal_reason == ERROR_CLASS_LEASE_EXPIRED_BUDGET
+ assert classification.root_error_class == ERROR_CLASS_MALFORMED_VERIFY_JOB
+ assert classification.auto_requeue_eligible is False
+
+
+def test_auto_requeue_rejects_non_transient_root_class_allow_list():
+ store = FakeDeadLetterStore([])
+
+ with pytest.raises(ValueError, match="transient_root_error_classes"):
+ auto_requeue_dead_letter_jobs(
+ DeadLetterAutoRequeueRequest(
+ store=store,
+ limit=10,
+ transient_root_error_classes=frozenset(
+ {ERROR_CLASS_MALFORMED_VERIFY_JOB}
+ ),
+ now_factory=lambda: NOW,
+ )
+ )
+
+
+def test_auto_requeue_dry_run_selects_transient_jobs_after_cooldown():
+ old_timeout = _dead_letter_job(
+ "job-timeout-old",
+ attempts=3,
+ max_attempts=3,
+ last_error="Verifier timed out.",
+ updated_at=NOW - dt.timedelta(minutes=45),
+ )
+ recent_timeout = _dead_letter_job(
+ "job-timeout-recent",
+ attempts=3,
+ max_attempts=3,
+ last_error="Verifier timed out.",
+ updated_at=NOW - dt.timedelta(minutes=5),
+ )
+ malformed = _dead_letter_job(
+ "job-malformed",
+ attempts=3,
+ max_attempts=3,
+ last_error="verify job is missing finding snapshot",
+ updated_at=NOW - dt.timedelta(minutes=45),
+ )
+ store = FakeDeadLetterStore([old_timeout, recent_timeout, malformed])
+
+ summary = auto_requeue_dead_letter_jobs(
+ DeadLetterAutoRequeueRequest(
+ store=store,
+ limit=10,
+ filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY),
+ cooldown=dt.timedelta(minutes=30),
+ apply=False,
+ now_factory=lambda: NOW,
+ )
+ )
+
+ assert summary.applied is False
+ assert summary.would_move == 1
+ assert summary.moved == 0
+ assert summary.classification_counts["verify:verifier-timeout"] == 1
+ assert summary.skipped.get("cooldown", 0) == 0
+ assert summary.skipped["not_transient"] == 1
+ assert store.inspect_calls[0]["updated_before"] == NOW - dt.timedelta(minutes=30)
+ assert store.inspect_calls[0]["page_limit"] == DEFAULT_AUTO_REQUEUE_SCAN_PAGES
+ assert store.auto_requeue_calls == []
+
+
+def test_auto_requeue_read_cutoff_prevents_recent_jobs_consuming_limit():
+ recent_timeout = _dead_letter_job(
+ "job-timeout-recent",
+ attempts=3,
+ max_attempts=3,
+ last_error="Verifier timed out.",
+ updated_at=NOW - dt.timedelta(minutes=5),
+ )
+ old_timeout = _dead_letter_job(
+ "job-timeout-old",
+ attempts=3,
+ max_attempts=3,
+ last_error="Verifier timed out.",
+ updated_at=NOW - dt.timedelta(minutes=45),
+ )
+ store = FakeDeadLetterStore([recent_timeout, old_timeout])
+
+ summary = auto_requeue_dead_letter_jobs(
+ DeadLetterAutoRequeueRequest(
+ store=store,
+ limit=1,
+ filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY),
+ cooldown=dt.timedelta(minutes=30),
+ apply=False,
+ now_factory=lambda: NOW,
+ )
+ )
+
+ assert summary.would_move == 1
+ assert store.inspect_calls[0]["updated_before"] == NOW - dt.timedelta(minutes=30)
+
+
+def test_auto_requeue_skips_jobs_already_auto_requeued_once():
+ already_requeued = _dead_letter_job(
+ "job-timeout-requeued",
+ attempts=3,
+ max_attempts=3,
+ last_error="Verifier timed out.",
+ updated_at=NOW - dt.timedelta(minutes=45),
+ auto_requeue_count=1,
+ )
+ store = FakeDeadLetterStore([already_requeued])
+
+ summary = auto_requeue_dead_letter_jobs(
+ DeadLetterAutoRequeueRequest(
+ store=store,
+ limit=10,
+ filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY),
+ cooldown=dt.timedelta(minutes=30),
+ apply=False,
+ now_factory=lambda: NOW,
+ )
+ )
+
+ assert summary.would_move == 0
+ assert summary.skipped["already_auto_requeued"] == 1
+
+
+def test_auto_requeue_apply_uses_selected_jobs_only():
+ old_timeout = _dead_letter_job(
+ "job-timeout-old",
+ attempts=3,
+ max_attempts=3,
+ last_error="Verifier timed out.",
+ updated_at=NOW - dt.timedelta(minutes=45),
+ )
+ malformed = _dead_letter_job(
+ "job-malformed",
+ attempts=3,
+ max_attempts=3,
+ last_error="verify job is missing finding snapshot",
+ updated_at=NOW - dt.timedelta(minutes=45),
+ )
+ store = FakeDeadLetterStore([old_timeout, malformed])
+
+ summary = auto_requeue_dead_letter_jobs(
+ DeadLetterAutoRequeueRequest(
+ store=store,
+ limit=10,
+ filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY),
+ cooldown=dt.timedelta(minutes=30),
+ apply=True,
+ now_factory=lambda: NOW,
+ )
+ )
+
+ assert summary.applied is True
+ assert summary.would_move == 0
+ assert summary.moved == 1
+ assert [job.job_id for job in store.auto_requeue_calls[0]["jobs"]] == [
+ old_timeout.job_id
+ ]
+ assert store.auto_requeue_calls[0]["preview_watermark"] == NOW - dt.timedelta(
+ minutes=30
+ )
+
+
def test_inspect_rejects_invalid_bounds():
store = FakeDeadLetterStore([])
@@ -196,7 +450,17 @@ def test_inspect_rejects_invalid_bounds():
)
-def _dead_letter_job(job_id: str) -> ScanJob:
+def _dead_letter_job(
+ job_id: str,
+ *,
+ attempts: int = 1,
+ max_attempts: int = 3,
+ last_error: str = "malformed verify job payload",
+ updated_at: dt.datetime | None = None,
+ auto_requeue_count: int = 0,
+ lease_expiry_count: int = 0,
+ max_lease_expiries: int = 5,
+) -> ScanJob:
return ScanJob(
job_id=job_id,
repo_id="repo_synthetic",
@@ -212,13 +476,16 @@ def _dead_letter_job(job_id: str) -> ScanJob:
scanner_config_hash="default",
priority=100,
status="dead_letter",
- attempts=1,
- max_attempts=3,
+ attempts=attempts,
+ max_attempts=max_attempts,
worker_id=None,
lease_until=None,
next_attempt_at=NOW,
created_at=NOW - dt.timedelta(minutes=30),
- updated_at=NOW - dt.timedelta(minutes=5),
- last_error="malformed verify job payload",
+ updated_at=updated_at or NOW - dt.timedelta(minutes=5),
+ last_error=last_error,
job_type=JOB_TYPE_VERIFY,
+ auto_requeue_count=auto_requeue_count,
+ lease_expiry_count=lease_expiry_count,
+ max_lease_expiries=max_lease_expiries,
)
diff --git a/tests/test_incremental_scan_storage.py b/tests/test_incremental_scan_storage.py
index c194eaf..5fe7926 100644
--- a/tests/test_incremental_scan_storage.py
+++ b/tests/test_incremental_scan_storage.py
@@ -205,6 +205,42 @@ def _condition_allows(self, kwargs: dict, existing: dict | None) -> bool:
and existing.get("status") == values[":dead_letter"]
and existing.get("updatedAt", "") <= values[":preview_watermark"]
)
+ if (
+ expression
+ and "autoRequeueCount = :auto_requeue_count" in expression
+ and "jobType = :job_type" in expression
+ ):
+ last_error_matches = (
+ existing is not None
+ and (
+ (
+ "lastError = :last_error" in expression
+ and existing.get("lastError") == values[":last_error"]
+ )
+ or (
+ "attribute_not_exists(lastError)" in expression
+ and "lastError" not in existing
+ )
+ )
+ )
+ return (
+ existing is not None
+ and existing.get("status") == values[":dead_letter"]
+ and existing.get("updatedAt", "") <= values[":preview_watermark"]
+ and existing.get("jobType") == values[":job_type"]
+ and existing.get("attempts") == values[":attempts"]
+ and existing.get("maxAttempts") == values[":max_attempts"]
+ and existing.get("leaseExpiryCount")
+ == values[":lease_expiry_count"]
+ and existing.get("maxLeaseExpiries")
+ == values[":max_lease_expiries"]
+ and last_error_matches
+ and (
+ "autoRequeueCount" not in existing
+ or existing.get("autoRequeueCount")
+ == values[":auto_requeue_count"]
+ )
+ )
if expression == "attribute_not_exists(PK) OR leaseUntil <= :now":
return existing is None or existing["leaseUntil"] <= values[":now"]
if expression == "workerId = :worker_id":
@@ -428,6 +464,16 @@ def test_scan_job_item_round_trips_verify_finding_snapshot():
assert restored.finding_snapshot == finding.to_dict()
+def test_scan_job_item_round_trips_auto_requeue_count():
+ job = ScanJob(**{**_make_job().__dict__, "auto_requeue_count": 1})
+
+ item = scan_job_to_item(job)
+ restored = scan_job_from_item(item)
+
+ assert item["autoRequeueCount"] == 1
+ assert restored.auto_requeue_count == 1
+
+
def test_repo_id_and_job_id_are_deterministic_from_contract_fields():
normalized = "https://github.com/example-org/example-repo"
expected_repo_id = (
@@ -945,6 +991,53 @@ def test_inspect_dead_letter_jobs_paginates_until_limit_matches_after_filtering(
assert "ExclusiveStartKey" in table.query_calls[-1]
+def test_inspect_dead_letter_jobs_respects_optional_page_limit():
+ store, table = _make_store()
+ first_incremental = ScanJob(
+ **{
+ **_make_job(
+ commit_sha="6" * 40,
+ next_attempt_at=NOW - dt.timedelta(minutes=5),
+ ).__dict__,
+ "status": "dead_letter",
+ "attempts": 3,
+ }
+ )
+ second_incremental = ScanJob(
+ **{
+ **_make_job(
+ commit_sha="7" * 40,
+ next_attempt_at=NOW - dt.timedelta(minutes=4),
+ ).__dict__,
+ "status": "dead_letter",
+ "attempts": 3,
+ }
+ )
+ verify_dead_letter = ScanJob(
+ **{
+ **_make_job(
+ commit_sha="8" * 40,
+ job_type=JOB_TYPE_VERIFY,
+ next_attempt_at=NOW - dt.timedelta(minutes=3),
+ ).__dict__,
+ "status": "dead_letter",
+ "attempts": 3,
+ }
+ )
+ for job in (first_incremental, second_incremental, verify_dead_letter):
+ table.put_item(Item=scan_job_to_item(job))
+
+ jobs = store.inspect_dead_letter_jobs(
+ limit=1,
+ filters=DeadLetterFilters(job_type=JOB_TYPE_VERIFY),
+ updated_before=NOW + dt.timedelta(minutes=1),
+ page_limit=2,
+ )
+
+ assert jobs == []
+ assert len(table.query_calls) == 2
+
+
def test_inspect_dead_letter_jobs_normalizes_job_updated_at_before_comparison():
store, table = _make_store()
verify_dead_letter = ScanJob(
@@ -1013,6 +1106,122 @@ def test_requeue_dead_letter_jobs_preserves_failure_history_and_retry_budgets():
assert updated.finding_snapshot == original.finding_snapshot
+def test_auto_requeue_dead_letter_jobs_marks_one_shot_metadata_and_preserves_history():
+ store, table = _make_store()
+ original = ScanJob(
+ **{
+ **_make_job(commit_sha="9" * 40, job_type=JOB_TYPE_VERIFY).__dict__,
+ "status": "dead_letter",
+ "attempts": 3,
+ "max_attempts": 3,
+ "worker_id": "stale-worker",
+ "lease_until": NOW - dt.timedelta(minutes=5),
+ "last_error": "Verifier timed out.",
+ "fence": 7,
+ "lease_expiry_count": 1,
+ "finding_snapshot": {"path": "private/config.py"},
+ "auto_requeue_count": 0,
+ }
+ )
+ table.put_item(Item=scan_job_to_item(original))
+
+ summary = store.auto_requeue_dead_letter_jobs(
+ jobs=[original],
+ preview_watermark=NOW + dt.timedelta(minutes=1),
+ now=NOW + dt.timedelta(minutes=2),
+ )
+
+ updated = scan_job_from_item(
+ table.get_item(Key={"PK": f"SCAN_JOB#{original.job_id}", "SK": "META"})[
+ "Item"
+ ]
+ )
+ assert summary.moved == 1
+ assert summary.skipped == {}
+ assert updated.status == "pending"
+ assert updated.worker_id is None
+ assert updated.lease_until is None
+ assert updated.next_attempt_at == NOW + dt.timedelta(minutes=2)
+ assert updated.attempts == original.attempts
+ assert updated.max_attempts == original.max_attempts
+ assert updated.lease_expiry_count == original.lease_expiry_count
+ assert updated.fence == original.fence
+ assert updated.last_error == original.last_error
+ assert updated.finding_snapshot == original.finding_snapshot
+ assert updated.auto_requeue_count == 1
+
+
+def test_auto_requeue_dead_letter_jobs_allows_null_last_error_attribute():
+ store, table = _make_store()
+ original = ScanJob(
+ **{
+ **_make_job(commit_sha="9" * 40, job_type=JOB_TYPE_VERIFY).__dict__,
+ "status": "dead_letter",
+ "attempts": 3,
+ "max_attempts": 3,
+ "last_error": None,
+ "auto_requeue_count": 0,
+ }
+ )
+ table.put_item(Item=scan_job_to_item(original))
+
+ summary = store.auto_requeue_dead_letter_jobs(
+ jobs=[original],
+ preview_watermark=NOW + dt.timedelta(minutes=1),
+ now=NOW + dt.timedelta(minutes=2),
+ )
+
+ updated = scan_job_from_item(
+ table.get_item(Key={"PK": f"SCAN_JOB#{original.job_id}", "SK": "META"})[
+ "Item"
+ ]
+ )
+ assert summary.moved == 1
+ assert summary.skipped == {}
+ assert updated.status == "pending"
+ assert updated.last_error is None
+ assert updated.auto_requeue_count == 1
+
+
+def test_auto_requeue_dead_letter_jobs_skips_changed_classification_inputs():
+ store, table = _make_store()
+ original = ScanJob(
+ **{
+ **_make_job(commit_sha="9" * 40, job_type=JOB_TYPE_VERIFY).__dict__,
+ "status": "dead_letter",
+ "attempts": 3,
+ "max_attempts": 3,
+ "last_error": "Verifier timed out.",
+ "auto_requeue_count": 0,
+ }
+ )
+ table.put_item(Item=scan_job_to_item(original))
+ changed = ScanJob(
+ **{
+ **original.__dict__,
+ "last_error": "verify job is missing finding snapshot",
+ }
+ )
+ table.put_item(Item=scan_job_to_item(changed))
+
+ summary = store.auto_requeue_dead_letter_jobs(
+ jobs=[original],
+ preview_watermark=NOW + dt.timedelta(minutes=1),
+ now=NOW + dt.timedelta(minutes=2),
+ )
+
+ stored = scan_job_from_item(
+ table.get_item(Key={"PK": f"SCAN_JOB#{original.job_id}", "SK": "META"})[
+ "Item"
+ ]
+ )
+ assert summary.moved == 0
+ assert summary.skipped == {"changed": 1}
+ assert stored.status == "dead_letter"
+ assert stored.last_error == changed.last_error
+ assert stored.auto_requeue_count == 0
+
+
def test_complete_processed_job_writes_findings_ledger_then_completed_job():
store, table = _make_store()
job = _make_job()
diff --git a/tests/test_personal_prod_systemd_units.py b/tests/test_personal_prod_systemd_units.py
index d77f733..10aea8b 100644
--- a/tests/test_personal_prod_systemd_units.py
+++ b/tests/test_personal_prod_systemd_units.py
@@ -51,6 +51,11 @@
"security-scanner-personal-ops-snapshot.timer",
"*-*-* 09:00:00",
),
+ "dead-letter-auto-requeue": (
+ "security-scanner-personal-dead-letter-auto-requeue.service",
+ "security-scanner-personal-dead-letter-auto-requeue.timer",
+ "*:0/30:00",
+ ),
}
ALL_SERVICE_FILES = [
@@ -248,6 +253,23 @@ def test_personal_llm_verify_expands_user_path_in_execstart_only() -> None:
assert "%h/.local/bin/uv run security-scanner verify-drain" in text
+def test_personal_dead_letter_auto_requeue_uses_conservative_verify_defaults() -> None:
+ text = (
+ USER_SYSTEMD_DIR
+ / "security-scanner-personal-dead-letter-auto-requeue.service"
+ ).read_text(encoding="utf-8")
+
+ assert "%h/.local/bin/uv run security-scanner dead-letter auto-requeue" in text
+ assert "--job-type verify" in text
+ assert (
+ "--cooldown-minutes "
+ "${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES}"
+ ) in text
+ assert "--limit ${SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_LIMIT}" in text
+ assert "--apply" in text
+ assert "SECURITY_SCANNER_DEAD_LETTER_AUTO_REQUEUE_COOLDOWN_MINUTES=30" in text
+
+
def test_personal_incr_poll_treats_partial_discovery_as_success() -> None:
parser = _parse_unit(
USER_SYSTEMD_DIR / "security-scanner-personal-incr-poll.service"