[codex] Add incremental scan queue worker MVP#15
Conversation
Add the issue #12 ADR, implementation spec, and Goal-based agentic workflow for the incremental scan queue worker MVP. Co-Authored-By: Codex GPT-5 <noreply@openai.com>
Implement the G1 storage model for incremental scan queue state, commit ledger rows, job leases, repo leases, and retry-safe completion. Co-Authored-By: Codex GPT-5 <noreply@openai.com>
Implement discover-updates initialize and enqueue modes with git ref discovery, ref state updates, ledger skips, and queue job creation. Co-Authored-By: Codex GPT-5 <noreply@openai.com>
Add git log opts support for Gitleaks and implement scan-worker --once for queued incremental commit jobs. Co-Authored-By: Codex GPT-5 <noreply@openai.com>
Co-Authored-By: Codex GPT-5 <noreply@openai.com>
Add container/runtime support for a one-command local quickstart path with DynamoDB Local persistence, SCM preflight checks, public git fallback, and quickstart queue seeding/worker execution. Co-Authored-By: Codex GPT-5 <noreply@openai.com>
Extract scan-all locking, catalog lookup, fetch isolation, in-memory manifest assembly, local scan invocation, and notification logging into runtime/scan_all.py while keeping the CLI as a thin presentation wrapper. Co-Authored-By: Codex GPT-5 <noreply@openai.com>
There was a problem hiding this comment.
Code Review
This pull request implements an incremental scan queue worker MVP for branch-aware secret scanning, replacing Dynalite with DynamoDB Local and introducing new CLI commands (discover-updates, scan-worker, queue-status, doctor, and quickstart) along with a turnkey Docker Compose worker. The feedback recommends improving lock safety in run_scan_all using a with statement, optimizing get_queue_status by querying GSI1 instead of scanning the table, continuing the worker loop on repository lease failures instead of breaking, and expanding type validation tests for git_log_opts.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| try: | ||
| lock_cm = _scan_all_lock(lock_path) | ||
| lock_cm.__enter__() | ||
| except _LockContention: | ||
| event_at = request.now_factory() | ||
| request.notification_writer( | ||
| log_path, | ||
| lock_contention_record( | ||
| event_at=event_at, | ||
| lock_path=str(lock_path), | ||
| ), | ||
| ) | ||
| return ScanAllResult( | ||
| exit_code=3, | ||
| status="lock_contention", | ||
| error="another scan-all is running", | ||
| ) | ||
| except OSError as exc: | ||
| request.notification_writer( | ||
| log_path, | ||
| fatal_error_record( | ||
| event_at=request.now_factory(), | ||
| scan_run_id=None, | ||
| error=f"Lock acquisition failed: {exc}", | ||
| stage="lock_acquisition", | ||
| ), | ||
| ) | ||
| return ScanAllResult( | ||
| exit_code=1, | ||
| status="lock_failure", | ||
| error=f"failed to acquire lock: {exc}", | ||
| ) | ||
|
|
||
| try: | ||
| return _run_scan_all_locked(request, started_at, log_path) | ||
| finally: | ||
| lock_cm.__exit__(None, None, None) |
There was a problem hiding this comment.
run_scan_all 함수에서 @contextlib.contextmanager로 구현된 _scan_all_lock을 사용할 때, __enter__()와 __exit__()를 수동으로 호출하고 있습니다. 이 방식은 예외 발생 시 컨텍스트 매니저가 예외를 정상적으로 전달받지 못해 예외가 누락되거나 contextlib 내부에서 RuntimeError가 발생할 수 있어 안전하지 않습니다.
대신 Python의 표준 with 문을 사용하여 안전하고 직관적으로 락을 관리하는 것을 권장합니다.
try:
with _scan_all_lock(lock_path):
return _run_scan_all_locked(request, started_at, log_path)
except _LockContention:
event_at = request.now_factory()
request.notification_writer(
log_path,
lock_contention_record(
event_at=event_at,
lock_path=str(lock_path),
),
)
return ScanAllResult(
exit_code=3,
status="lock_contention",
error="another scan-all is running",
)
except OSError as exc:
request.notification_writer(
log_path,
fatal_error_record(
event_at=request.now_factory(),
scan_run_id=None,
error=f"Lock acquisition failed: {exc}",
stage="lock_acquisition",
),
)
return ScanAllResult(
exit_code=1,
status="lock_failure",
error=f"failed to acquire lock: {exc}",
)| lease_items = scan_all_pages( | ||
| self._table, | ||
| FilterExpression="entityType = :entity_type", | ||
| ExpressionAttributeValues={":entity_type": "REPO_LEASE"}, | ||
| ) |
There was a problem hiding this comment.
get_queue_status 메서드에서 scan_all_pages를 사용하여 테이블 전체를 스캔하고 있습니다. DynamoDB의 Scan 작업은 필터 표현식 적용 전에 테이블의 모든 항목을 읽으므로, 데이터가 늘어남에 따라 성능 저하와 높은 비용(RCU 소모)을 초래합니다.
REPO_LEASE 항목은 모두 gsi1pk = REPO_LEASE#ALL 형태의 GSI1 키를 가지고 있으므로, Scan 대신 GSI1에 대한 Query를 사용하면 훨씬 효율적입니다.
| lease_items = scan_all_pages( | |
| self._table, | |
| FilterExpression="entityType = :entity_type", | |
| ExpressionAttributeValues={":entity_type": "REPO_LEASE"}, | |
| ) | |
| lease_items = query_all_pages( | |
| self._table, | |
| IndexName=GSI1_NAME, | |
| KeyConditionExpression="gsi1pk = :pk", | |
| ExpressionAttributeValues={":pk": "REPO_LEASE#ALL"}, | |
| ) |
| if not request.store.acquire_repo_lease( | ||
| job.repo_id, | ||
| worker_id, | ||
| request.lease_seconds, | ||
| ): | ||
| request.store.return_job_to_pending(job.job_id, "repo lease unavailable") | ||
| break |
There was a problem hiding this comment.
특정 리포지토리의 임대(Repo Lease)를 획득하지 못했을 때 break를 호출하여 루프를 완전히 빠져나가고 있습니다. 이로 인해 큐에 대기 중인 다른 독립적인 리포지토리의 작업들이 있음에도 불구하고, 단 하나의 리포지토리가 잠겨 있다는 이유로 워커가 프로세싱을 완전히 중단하게 됩니다.
break 대신 continue를 사용하여 다른 리포지토리의 작업을 계속 시도할 수 있도록 개선하는 것이 좋습니다. 단, return_job_to_pending이 해당 작업을 즉시 다시 pending 상태로 되돌리기 때문에, 동일한 워커가 다음 루프에서 동일한 작업을 다시 가져와 무한 루프를 도는 것을 방지하기 위해 return_job_to_pending 시 약간의 지연 시간(Backoff)을 next_attempt_at에 추가하는 것을 권장합니다.
| if not request.store.acquire_repo_lease( | |
| job.repo_id, | |
| worker_id, | |
| request.lease_seconds, | |
| ): | |
| request.store.return_job_to_pending(job.job_id, "repo lease unavailable") | |
| break | |
| if not request.store.acquire_repo_lease( | |
| job.repo_id, | |
| worker_id, | |
| request.lease_seconds, | |
| ): | |
| request.store.return_job_to_pending(job.job_id, "repo lease unavailable") | |
| continue |
| def test_scan_git_log_opts_non_string_raises(tmp_path: Path) -> None: | ||
| yaml_content = """\ | ||
| version: 1 | ||
| targets: | ||
| - name: demo-bad-log-opts | ||
| path: /tmp/demo-bad-log-opts | ||
| scan: | ||
| git_log_opts: 123 | ||
| """ | ||
| p = write_yaml(tmp_path, yaml_content) | ||
|
|
||
| with pytest.raises(ManifestError, match="scan.git_log_opts"): | ||
| load_manifest(p) |
There was a problem hiding this comment.
git_log_opts 필드에 대한 타입 검증 테스트가 정수형(123)에 대해서만 수행되고 있습니다. 저장소의 일반 규칙(General Rules)에 따라, JSON/YAML 파싱 필드의 방어적 타입 검증 시 None, boolean, list, dictionary 등 다양한 예기치 않은 타입들이 들어왔을 때도 견고하게 동작하는지 검증하는 종합적인 단위 테스트를 추가해야 합니다.
@pytest.mark.parametrize(
"invalid_value",
[
"null",
"123",
"True",
"['--some-opt']",
"{'key': 'value'}",
],
)
def test_scan_git_log_opts_invalid_types_raise(tmp_path: Path, invalid_value) -> None:
yaml_content = f"""\\
version: 1
targets:
- name: demo-bad-log-opts
path: /tmp/demo-bad-log-opts
scan:
git_log_opts: {invalid_value}
"""
p = write_yaml(tmp_path, yaml_content)
with pytest.raises(ManifestError, match="scan.git_log_opts"):
load_manifest(p)References
- When implementing defensive type validation for parsed JSON fields (e.g., verifying a field is a string), ensure robustness by adding comprehensive unit tests that cover various unexpected types, including None, numbers, booleans, lists, and dictionaries.
Resolve conflicts between incremental queue worker support and the latest severity/noise-filter changes on main. Keep git_log_opts and enable_noise_filter manifest options together.\n\nCo-Authored-By: Codex GPT-5 <noreply@openai.com>
Use a default factory for the scan-all notification writer so static analysis does not treat the writer as a bound dataclass method. Clean up scan-worker test notices reported by CodeQL. Co-Authored-By: Codex GPT-5 <noreply@openai.com>
Purpose & Motivation
Issue #12의 incremental scan queue worker MVP를 구현합니다.
주요 목표는 기존
scan-all중심 batch 실행에 더해, commit 단위 durable queue/ledger 모델을 추가하고 Docker Compose + DynamoDB Local로 새 PC에서도 public repo quickstart smoke가 가능한 경로를 제공하는 것입니다.Context
이 PR은 다음 흐름을 추가합니다.
REF_STATE,SCAN_JOB,SCAN_LEDGER,REPO_LEASElogical entity를 DynamoDB-compatible single-table store에 추가repoId/jobId와 conditional write 기반 enqueue idempotency 구현gh,glab,gitleaks기반 turnkey path 추가scan-allorchestration을runtime/scan_all.py로 이동해 CLI 책임 축소저장소는 물리적으로 single-table 구조를 유지하지만, queue/ledger/lease를 위한 logical schema와 access pattern을 추가했습니다.
Note
리뷰 시 특히 봐주면 좋은 부분입니다.
complete_processed_job의 순서: findings write →SCAN_LEDGERput-if-absent →SCAN_JOBcompleteddead_letter로 전환하는 동작REPO_LEASEkey shape:gsi1pk=REPO_LEASE#ALL,gsi1sk=<lease_until>#<repo_id>SCAN_JOB_STATUS#pending같은 MVP status partition은 500+ repo 지속 운영용으로는 shard/counter 개선이 필요합니다. 이 PR은 MVP/local-turnkey proof 범위입니다.Dependency
Refs #12.
후속 작업 후보:
Checklist
uv run pytest -q tests/test_incremental_scan_storage.pyuv run pytest -q tests/test_scan_target_storage.py tests/test_cli_scan_all.pyuv run pytest -qgit diff --check origin/main..HEADragflow-ubuntuwith public synthetic target path