From de4faccae3e0aca25de9d06a35ef449d210858a8 Mon Sep 17 00:00:00 2001
From: pureliture
Date: Wed, 17 Jun 2026 10:32:48 +0900
Subject: [PATCH] feat(storage): shard repo-axis GSI1 partition (issue #23)

Spread per-repo rows across REPO#<repo>#SHARD#00..15 (repoAxisVersion=2,
shardCount=16) to remove the single hot REPO# GSI1 partition.

- RepoAxisKey + repo_axis_inputs: single source of truth for the per-entity
  sharded key formula; write mappers and backfill share it (no drift).
- read_repo_axis: scatter-gather fan-out with migration-only include_legacy
  fallback, fail-closed on shard error, dedupe by (PK,SK) preferring higher
  repoAxisVersion, canonical (gsi1sk,PK,SK) order.
- read_observations_for_repo / residual_for_repo route through the fan-out
  reader; direct primary-key reads unchanged.
- repo_axis_migration: in-place conditional backfill (not row copy) with a
  per-entity inventory/backfilled/skipped/failed/remaining report and a
  documented compatibility-removal gate.
- regression scan: single allowed construction site; bans any #SHARD# literal,
  raw gsi1pk=REPO#, and GSI1 reads on the raw REPO# partition.

Implements docs/workbench/specs/issue-23-repo-gsi1-sharding/design.md (M1-M5)
and the post-implementation multi-agent review fixes. Full suite green.
Co-Authored-By: Claude Opus 4.8
---
 .../issue-23-repo-gsi1-sharding/design.md     | 314 +++++++
 .../issue-23-repo-gsi1-sharding/milestones.md |  70 ++
 .../requirements.html                         |  95 +++
 .../requirements.md                           | 112 +++
 .../runtime/branch_residual.py                |  15 +-
 .../storage/adapters/nosql_db/items.py        |  39 +-
 .../storage/adapters/nosql_db/repo_axis.py    | 205 +++++
 .../adapters/nosql_db/repo_axis_migration.py  | 170 ++++
 .../adapters/nosql_db/repo_axis_reader.py     | 125 +++
 .../storage/adapters/nosql_db/store.py        |  39 +-
 .../storage/adapters/nosql_db/transport.py    |   3 +-
 tests/test_cli_residual.py                    |   4 +-
 tests/test_dynamodb_compatible_store.py       | 148 +++-
 tests/test_repo_axis_sharding.py              | 789 ++++++++++++++++++
 14 files changed, 2058 insertions(+), 70 deletions(-)
 create mode 100644 docs/workbench/specs/issue-23-repo-gsi1-sharding/design.md
 create mode 100644 docs/workbench/specs/issue-23-repo-gsi1-sharding/milestones.md
 create mode 100644 docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.html
 create mode 100644 docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.md
 create mode 100644 src/security_scanner/storage/adapters/nosql_db/repo_axis.py
 create mode 100644 src/security_scanner/storage/adapters/nosql_db/repo_axis_migration.py
 create mode 100644 src/security_scanner/storage/adapters/nosql_db/repo_axis_reader.py
 create mode 100644 tests/test_repo_axis_sharding.py

diff --git a/docs/workbench/specs/issue-23-repo-gsi1-sharding/design.md b/docs/workbench/specs/issue-23-repo-gsi1-sharding/design.md
new file mode 100644
index 0000000..143d546
--- /dev/null
+++ b/docs/workbench/specs/issue-23-repo-gsi1-sharding/design.md
@@ -0,0 +1,314 @@
+# Issue 23 REPO GSI1 Sharding Design Spec
+
+## Overview
+
+Remove the hot-partition risk that arises when per-repo entities pile onto the
+single `REPO#<repo>` GSI1 partition. The design keeps repo-local read semantics:
+new rows go to the sharded repo axis, and legacy `REPO#<repo>` items are read only during migration.
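
As a rough illustration of the sharded write described above, the following sketch derives a fixed-width bucket from stable shard material and builds the GSI1 projection. It is not the patch's `RepoAxisKey`: the function names `shard_bucket` and `sharded_projection` are hypothetical, SHA-256 is an assumed stand-in for the unspecified deterministic hash, and storing `repoAxisShard` as a two-digit string is also an assumption. Only the version (2), the shard count (16), and the `00`..`15` bucket range come from the commit message.

```python
import hashlib

REPO_AXIS_VERSION = 2       # from the commit message
REPO_AXIS_SHARD_COUNT = 16  # from the commit message


def shard_bucket(shard_material: str) -> str:
    """Map stable shard material to a fixed-width bucket, "00" through "15"."""
    # Assumption: SHA-256 stands in for the patch's deterministic hash.
    digest = hashlib.sha256(shard_material.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % REPO_AXIS_SHARD_COUNT
    return f"{bucket:02d}"


def sharded_projection(repo_axis_id: str, shard_material: str, gsi1sk: str) -> dict:
    """Build the sharded GSI1 fields for one row (hypothetical helper)."""
    bucket = shard_bucket(shard_material)
    return {
        "gsi1pk": f"REPO#{repo_axis_id}#SHARD#{bucket}",
        "gsi1sk": gsi1sk,  # the sort key is passed through untouched
        "repoAxisVersion": REPO_AXIS_VERSION,
        "repoAxisShardCount": REPO_AXIS_SHARD_COUNT,
        "repoAxisShard": bucket,  # assumption: stored as the same string
    }
```

Because the bucket depends only on the shard material, rewriting the same logical row always lands in the same partition, which is what lets a backfill recompute the key from the item's own fields.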
+
+## Requirements reference
+
+- Phase 1 source: `requirements.md`
+- Preview companion: `requirements.html`
+- Approved core requirements:
+  - Fix the design first; implementation proceeds in a later execution phase.
+  - The entire `REPO#<repo>` GSI1 axis is in scope.
+  - Legacy read is a compatibility path for the migration period.
+  - Preserve existing repo-local sort semantics and read behavior.
+  - Cloud rollout package, canary, alarm, and rollback runbook are excluded.
+
+## Candidate approaches
+
+### Candidate A: Fixed shard fan-out with scatter-gather reads
+
+Spread the repo-axis key of new rows as `REPO#<repo>#SHARD#<bucket>`. The repo-local
+reader queries the entire known shard set. Only in migration mode does it also read
+legacy `REPO#<repo>` and then merge in canonical order.
+
+This approach is chosen. It leaves the current single-table/GSI structure largely intact,
+directly reduces the cause of the #23 hot partition, and fits the migration-only
+back-compat requirement.
+
+### Candidate B: Separate repo timeline index
+
+Keep a timeline index separate from the sharded write index. The read path may become
+simpler, but the scope of the new GSI design and table schema migration grows. Moreover, if the
+timeline index is a single partition per repo, the same hot-partition problem returns.
+
+### Candidate C: Entity-specific partial sharding
+
+Shard only high-volume entities such as `FINDING_OBSERVATION` first. The implementation is small, but
+repo-axis rows such as `FINDING_STATE`, `STATE_EVENT`, `SCAN_LEDGER`, and the current code's
+`GHAS_ALERT` remain, which blurs the #23 completion criterion.
+
+## Architecture
+
+```mermaid
+flowchart TD
+    W["Write mapper"] --> RAK["RepoAxisKey helper"]
+    RAK --> S1["GSI1: REPO#repo#SHARD#00"]
+    RAK --> S2["GSI1: REPO#repo#SHARD#01"]
+    RAK --> SN["GSI1: REPO#repo#SHARD#N"]
+    Legacy["Legacy GSI1: REPO#repo"] --> Reader["RepoAxisReader"]
+    S1 --> Reader
+    S2 --> Reader
+    SN --> Reader
+    Reader --> Merge["dedupe and sort"]
+    Merge --> Residual["residual_for_repo"]
+    Merge --> RepoReads["repo-local state/event/ledger reads"]
+```
+
+### Components
+
+`RepoAxisKey`
+
+- Role: owns repo-axis shard key construction in one place.
+- Input: `repoAxisId`, entity type, stable shard material, optional timestamp.
+- Output: `gsi1pk`, `gsi1sk`, shard metadata fields.
+- Dependencies: deterministic hash function.
+
+`RepoAxisReader`
+
+- Role: hides the fan-out read from storage callers.
+- Input: `repoAxisId`, entity prefix, sort mode, `include_legacy` flag.
+- Output: a deduplicated item list.
+- Dependencies: `query_all_pages`, configured shard count.
+
+`RepoAxisMigrationCompat`
+
+- Role: lets legacy `REPO#<repo>` rows be read during the backfill period.
+- Input: the same read request as `RepoAxisReader`.
+- Output: legacy rows, included only while compatibility is enabled.
+- Dependencies: no write ownership. The legacy key must not become a permanent API.
+
+`repoAxisId`
+
+- Role: the single repo-axis identifier used by the sharding helper and the reader.
+- The input source uses each entity's existing domain field as-is.
+- Goal: prevent the local scan target name, the incremental `repo_id`, and the GHAS repository
+  value from being interpreted differently at each helper call site.
+
+## Data model
+
+### Sharded GSI1 key format
+
+New repo-axis rows use the following shape.
+
+```text
+gsi1pk = REPO#<repo>#SHARD#<bucket>
+gsi1sk = <entity sort key>
+repoAxisVersion = 2
+repoAxisShardCount = 16
+repoAxisShard = <bucket>
+```
+
+`<bucket>` is a fixed-width deterministic value from `00` to `15`.
+For `repoAxisVersion = 2` the shard count is fixed at 16. The shard count is not an operator
+runtime knob but a durable schema contract. Changing the count later requires a new
+version such as `repoAxisVersion = 3` plus a separate rehash migration or an active-version
+fan-out design.
+
+### repoAxisId mapping
+
+| Entity | repoAxisId source | Notes |
+| --- | --- | --- |
+| `FINDING` | `finding.repo.full_name` | Local scan keeps the target-name family. |
+| `FINDING_OBSERVATION` | `finding.repo.full_name` | The scan-worker path aligns the finding context with `repo_id`. |
+| `FINDING_STATE` | `finding.repo.full_name` | Direct primary reads stay keyed by `findingId`. |
+| `STATE_EVENT` | `event.repo` | The repo-axis GSI1 is for listing/indexing. |
+| `SCAN_LEDGER` | `entry.repo_id` | Point reads stay on the `ScanLedgerKey` primary key. |
+| `GHAS_ALERT` | `alert.repository` | The current reader is a table scan, but the mapper is covered by regression tests. |
+
+The helper does not canonicalize the `repoAxisId` string. It receives the repository identity
+that the input domain object already owns, and identity normalization remains the
+responsibility of that domain flow.
+
+Shard material must be stable for each logical row.
+
+| Entity | Shard material |
+| --- | --- |
+| `FINDING` | `findingId` |
+| `FINDING_OBSERVATION` | `scanRunId`, `findingId`, `occurrenceKey` |
+| `FINDING_STATE` | `findingId` |
+| `STATE_EVENT` | `findingId`, `decidedAt`, `eventSeq` |
+| `SCAN_LEDGER` | `repoId`, `commitSha`, scanner tuple |
+| `GHAS_ALERT` | `ghasAlertId` |
+
+`GHAS_ALERT` is included because on current `origin/main` it maps to `gsi1pk = REPO#<repo>`.
+Any entity added later that writes to the same repo-axis GSI1 partition must either use
+`RepoAxisKey` or be explicitly declared out of scope.
+
+### Sort key semantics
+
+Existing repo-local sort semantics are preserved. If a reader depends on chronological order,
+`gsi1sk` must contain sortable time material. If an existing read is identity/prefix
+based, sharding does not introduce stronger time semantics.
+
+Examples:
+
+- `STATE_EVENT#<decidedAt>#<findingId>#<eventSeq>` provides a time-sortable key together with a stable
+  tie-breaker.
+- `RUN#...#OBS#...` keeps the observation prefix used for residual derivation.
+- `LEDGER#...` keeps the repo ledger lookup/listing material. Chronological
+  ledger listing is not current behavior, so it does not become a new requirement.
+
+When the scatter-gather reader returns an ordered result, the canonical order is
+`(gsi1sk, PK, SK)`. If an entity needs stronger ordering, a separate tuple must be
+added to the spec.
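
The canonical `(gsi1sk, PK, SK)` order can be sketched together with the merge rules the spec gives for the reader: fan out over 16 shards, add the legacy partition only on request, dedupe by `(PK, SK)` preferring the higher `repoAxisVersion`, and fail closed. This is a minimal sketch, not the patch's `read_repo_axis`: the name `read_repo_axis_sketch` and the injected `query(pk, sk_prefix)` callable are hypothetical, and treating a missing `repoAxisVersion` as version 1 is an assumption made for the comparison.

```python
from typing import Any, Callable, Iterable

SHARD_COUNT = 16  # pinned by repoAxisVersion = 2

Item = dict[str, Any]
QueryFn = Callable[[str, str], Iterable[Item]]  # hypothetical: (gsi1pk, sk_prefix) -> items


def read_repo_axis_sketch(
    query: QueryFn, repo_axis_id: str, sk_prefix: str, *, include_legacy: bool = False
) -> list[Item]:
    """Scatter-gather over every shard partition, then dedupe and sort."""
    partitions = [f"REPO#{repo_axis_id}#SHARD#{n:02d}" for n in range(SHARD_COUNT)]
    if include_legacy:
        # Migration-only path: also read the unsharded partition.
        partitions.append(f"REPO#{repo_axis_id}")

    best: dict[tuple[str, str], Item] = {}
    for pk in partitions:
        # Any exception raised by `query` propagates unchanged, so a failed
        # shard never yields a partial result (fail closed).
        for item in query(pk, sk_prefix):
            key = (item["PK"], item["SK"])
            kept = best.get(key)
            # Assumption: rows without repoAxisVersion count as version 1.
            if kept is None or item.get("repoAxisVersion", 1) > kept.get("repoAxisVersion", 1):
                best[key] = item

    # Canonical order from the spec: (gsi1sk, PK, SK).
    return sorted(best.values(), key=lambda i: (i["gsi1sk"], i["PK"], i["SK"]))
```

Sorting after the merge is what makes the result independent of the order in which shards answer; the per-shard queries themselves could run concurrently without changing the output.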
+
+### Scope separation
+
+| Surface | Current storage path | Path after #23 | Preservation criterion |
+| --- | --- | --- | --- |
+| `read_observations_for_repo` | GSI1 `REPO#<repo>` + `RUN#` prefix | `RepoAxisReader` fan-out + optional legacy mode | residual input parity |
+| `FINDING` repo-axis index | GSI1 `REPO#<repo>` + `FINDING#` prefix | sharded GSI1 | mapper/index parity |
+| `FINDING_STATE` repo-axis index | GSI1 `REPO#<repo>` + `FINDING#` prefix | sharded GSI1 | direct `read_finding_state` unchanged |
+| `STATE_EVENT` repo-axis index | GSI1 `REPO#<repo>` + `STATE_EVENT#` prefix | sharded GSI1 with stable order | direct `read_finding_state_events` unchanged |
+| `SCAN_LEDGER` repo-axis index | GSI1 `REPO#<repo>` + `LEDGER#` prefix | sharded GSI1 | `has_scan_ledger` point read unchanged |
+| `GHAS_ALERT` repo-axis index | GSI1 `REPO#<repo>` | sharded GSI1 | current `read_ghas_alerts` scan behavior unchanged |
+| `REF_STATE` listing | primary `PK = REPO#<repo>` + `REF#` prefix | retained primary path | unchanged |
+| `SCAN_RUN` history | primary `PK = REPO#<repo>` + `SCAN_RUN#` prefix | retained primary path | unchanged |
+| `SCAN_JOB` repo status index | GSI2 `REPO#<repo>` | out of GSI1 #23 scope | unchanged |
+
+## Data flow
+
+### New write path
+
+1. The item mapper builds the base item.
+2. For repo-axis GSI1 entities, the mapper calls `RepoAxisKey`.
+3. The mapper writes only the sharded `gsi1pk`.
+4. The primary key is kept until a separate primary-key hotspot is demonstrated.
+   `read_finding_state`, `has_scan_ledger`, and per-finding event reads keep their direct
+   primary-key behavior.
+
+### Repo-local read path
+
+1. The caller requests repo-axis items by entity prefix.
+2. `RepoAxisReader` queries every configured shard partition for the repo.
+3. Only when `include_legacy=True` in migration mode does it also query legacy
+   `gsi1pk = REPO#<repo>`.
+4. The reader defensively dedupes by `(PK, SK)`. In normal steady state the same
+   primary item does not exist in both the legacy and a sharded partition.
+5. If the existing behavior exposed an ordered result, it sorts by `(gsi1sk, PK, SK)`.
+6. The caller receives the same logical result set as before sharding.
+
+`include_legacy` defaults to `False`.
+Only the migration/backfill verification path and the legacy compatibility tests
+pass `True` explicitly. This flag must not be hidden away as if it were a permanent
+runtime feature.
+
+### Residual derivation path
+
+`residual_for_repo` keeps the existing ownership split.
+
+- `list_ref_states(repo_id)` reads `REF_STATE` rows from the primary `PK = REPO#<repo>`
+  partition.
+- `read_observations_for_repo(repo_id)` uses `RepoAxisReader` with the `RUN#` prefix.
+  This method is the existing repo-local read that must switch to the fan-out reader
+  in #23.
+- `residual_by_branch` stays a pure function unless tests reveal an ordering
+  dependency.
+
+`REF_STATE` is not currently a GSI1 repo-axis row. This design preserves the existing listing
+behavior and does not move it until a separate primary-key hot-partition requirement is confirmed.
+
+## Component details
+
+### Item mapping
+
+Every item mapper that currently emits `gsi1pk = REPO#...` must go through
+`RepoAxisKey`. The currently known entities are as follows.
+
+- `GHAS_ALERT`
+- `FINDING`
+- `FINDING_OBSERVATION`
+- `FINDING_STATE`
+- `STATE_EVENT`
+- `SCAN_LEDGER`
+
+If a new mapper adds a raw `gsi1pk = f"REPO#..."` without the helper, a test must
+fail.
+
+### Reader compatibility
+
+Compatibility mode is read-only.
+
+- New writes do not dual-write to the legacy `REPO#<repo>` GSI1 key.
+- Backfill is not a separate row copy; it conditionally updates `gsi1pk`, `gsi1sk`,
+  `repoAxisVersion`, `repoAxisShardCount`, and `repoAxisShard` on the existing primary item.
+- The reader includes legacy rows only in migration mode.
+- A legacy item and a sharded item cannot coexist under the same `(PK, SK)`. Dedupe is a defense
+  against GSI eventual consistency or test doubles.
+
+### Migration completion criteria
+
+This design does not define a production rollout. Instead it defines the gate for removing
+the compatibility code.
+
+- For every legacy `REPO#<repo>` row type in scope, a per-entity inventory count,
+  updated/backfilled count, skipped count, and failed count are reported.
+- For each of `FINDING`, `FINDING_OBSERVATION`, `FINDING_STATE`, `STATE_EVENT`, `SCAN_LEDGER`,
+  and `GHAS_ALERT`, the remaining legacy `gsi1pk = REPO#<repo>` count is 0.
+- Sharded-only reads return the same logical repo-local result not only on a representative repo
+  but also on repos sampled per entity.
+- Parity tests pass with `include_legacy=False`.
+- A test confirms that the legacy query call count is 0 on a normal sharded read.
+- Tests cover legacy-only, sharded-only, and mixed legacy-plus-sharded state.
+- There is a named code path or flag so that removing the legacy read does not change the
+  normal sharded read.
+
+## Error handling
+
+- If even one shard query fails, the repo-axis read fails closed. It never returns a partial
+  repo result as if it were a complete result.
+- If a legacy query and a sharded query return the same `(PK, SK)`, the reader prefers the item
+  with the higher `repoAxisVersion`. This is defensive behavior, not the expected
+  steady state.
+- An item without `repoAxisVersion` or `repoAxisShard` is treated as legacy only when it was
+  read from the legacy partition.
+- Changing the shard count is a separate migration design. `repoAxisVersion = 2` fixes shard
+  count 16 as a durable schema contract.
+
+## Test strategy
+
+- Unit test for `RepoAxisKey` deterministic bucket assignment.
+- Unit test verifying that `RepoAxisKey` writes `repoAxisVersion = 2`, `repoAxisShardCount = 16`,
+  and buckets `00` through `15`.
+- Per-entity `repoAxisId` source mapping test.
+- Mapper test that every repo-axis entity writes a sharded `gsi1pk` and keeps the expected
+  `gsi1sk` prefix.
+- Mapper test that the `STATE_EVENT` sharded `gsi1sk` includes the `eventSeq` suffix.
+- Reader test verifying fan-out query, defensive dedupe, `(gsi1sk, PK, SK)` ordering, and
+  legacy fallback.
+- Fail-closed reader test: no partial result is returned when one shard query
+  fails.
+- Reader test that the legacy query call count is 0 under the default `include_legacy=False`.
+- Residual test that `residual_for_repo` returns the same logical result for legacy-only,
+  sharded-only, and mixed rows.
+- Storage adapter test showing that direct primary-key reads such as `read_finding_state` and
+  `has_scan_ledger` are retained.
+- Migration test that backfill conditionally updates the GSI projection fields of the existing
+  primary item rather than copying rows.
+- Per-entity legacy inventory/backfilled/skipped/failed/remaining count report test.
+- Regression scan that catches raw `gsi1pk = REPO#` construction outside the helper within
+  `src/security_scanner/storage/adapters/nosql_db`. The only allowed exceptions are the
+  `RepoAxisKey` helper and the legacy compatibility reader.
+
+## Milestones
+
+- M1: Add the repo-axis helper and tests.
+  Done when `repoAxisVersion = 2`, shard count 16, the bucket range, and the per-entity
+  `repoAxisId` mapping are pinned by tests.
+- M2: Route the repo-axis mappers through the helper. Done when the currently known entities
+  no longer emit the legacy `REPO#<repo>` GSI1 key on new writes and the `STATE_EVENT`
+  ordering suffix is stable.
+- M3: Add the scatter-gather reader and legacy fallback. Done when repo-local reads pass with
+  the `include_legacy=False` default, fail-closed behavior, canonical ordering, and
+  legacy-only, sharded-only, and mixed state.
+- M4: Wire up residual and the repo-local readers. Done when `residual_for_repo` and the
+  related storage tests prove parity.
+- M5: Document the migration removal gate near the implementation. Done when the per-entity
+  inventory, updated/backfilled, skipped, failed, and remaining legacy counts and the
+  legacy-query-call-count-0 condition are recorded.
+
+## Open questions
+
+- None at the current design gate. Cloud rollout details are intentionally left outside this spec.
diff --git a/docs/workbench/specs/issue-23-repo-gsi1-sharding/milestones.md b/docs/workbench/specs/issue-23-repo-gsi1-sharding/milestones.md
new file mode 100644
index 0000000..ca2a25a
--- /dev/null
+++ b/docs/workbench/specs/issue-23-repo-gsi1-sharding/milestones.md
@@ -0,0 +1,70 @@
+# Milestones: issue-23-repo-gsi1-sharding
+
+Source design: `design.md` (approved). Execution via agentic-execution loop.
+
+## M1 RepoAxisKey helper + tests
+- status: done
+- evidence: `tests/test_repo_axis_sharding.py` M1 cases pass: version=2,
+  shardCount=16, bucket 00..15 fixed-width, deterministic, distributes across all
+  16 buckets, projection shape, no id canonicalization, legacy pk unsharded.
+
+## M2 Route mappers through RepoAxisKey
+- status: done
+- evidence: 20 passed (M1+M2). All repo-axis mappers (FINDING/OBSERVATION/STATE,
+  STATE_EVENT, SCAN_LEDGER, GHAS_ALERT) emit sharded gsi1pk + metadata, preserve
+  gsi1sk prefixes; STATE_EVENT gsi1sk gains eventSeq suffix (GSI2 unchanged);
+  regression scan confirms no raw gsi1pk=REPO#/#SHARD# outside repo_axis modules.
+- note: end-to-end read tests (residual/read_observations) are transiently red
+  here because writes are sharded but reads still hit the unsharded partition;
+  restored by M4 fan-out wiring. Verified jointly in the M4 full-suite run.
+
+## M3 Scatter-gather reader + legacy fallback
+- status: done
+- evidence: `repo_axis_reader.read_repo_axis` + tests: fans out across all 16
+  shards, zero legacy queries when include_legacy=False, merges legacy when True,
+  dedupes (PK,SK) preferring higher repoAxisVersion, canonical (gsi1sk,PK,SK)
+  order, fails closed (no partial) when a shard query raises.
+
+## M4 Wire residual + repo-local readers
+- status: done
+- evidence: 69 passed. store.read_observations_for_repo now fans out via
+  read_repo_axis (include_legacy param, default False). residual parity proven
+  identical across legacy-only / sharded-only / mixed layouts; default read
+  issues no legacy partition query. Direct primary-key reads (read_finding_state,
+  has_scan_ledger, read_finding_state_events) unchanged; full suite green.
+
+## Multi-agent review (post-M5)
+- 5-dimension review (correctness/security/design-spec/tests/migration-ops),
+  each finding adversarially verified by an independent opus agent.
+- 16 findings raised, 9 survived verification (0 critical/high; 3 medium, 5 low,
+  1 nit). All 9 addressed:
+  - regression scan hardened: single source-of-truth for REPO#/#SHARD# literals
+    in repo_axis.py; scan now bans any `#SHARD#` literal + raw gsi1pk=REPO# + a
+    GSI1 read on raw REPO# partition (construction-method-agnostic).
+  - residual parity test now drives the production residual_for_repo path with
+    include_legacy threaded through residual_for_repo + _ResidualStore.
+  - backfill projection moved inside try/except (one bad row → failed, not abort).
+  - STATE_EVENT partial-migration ordering caveat documented in the reader.
+  - migration no longer reaches into private items._shard_material; join +
+    key construction centralized in repo_axis.py (drift guard retained).
+  - added tests: backfill idempotent re-run; fake update_item enforces every
+    ConditionExpression clause; canonical (gsi1sk,PK,SK) tie-break; multi-page
+    pagination within a single shard.
+- evidence: full suite 616 passed, ruff clean; changeset scoped to 6 modified +
+  4 new files + spec docs.
+- follow-up (no defer): #5 fully resolved. The per-entity repo-axis key formula
+  is now a single source of truth (`repo_axis.repo_axis_inputs` /
+  `repo_axis_projection_for_item`). Write mappers build the base item then merge
+  the projection derived from the item's own fields, the exact function the
+  backfill uses, so the two formulas can no longer drift (the parallel
+  derivation in repo_axis_migration was deleted). Still 616 passed, ruff clean.
+
+## M5 Migration removal gate
+- status: done
+- evidence: `repo_axis_migration.py`: backfill conditionally updates each
+  existing primary item's GSI projection in place (update_item, not row copy),
+  proven by the in-place test (row count unchanged, same PK/SK, now sharded);
+  per-entity inventory/backfilled/skipped/failed/remaining report with gate_clear;
+  drift guard pins backfilled projection == write-mapper projection for all 6
+  entities; removal-gate checklist documented in the module docstring near the
+  implementation. Full suite 612 passed, ruff clean.
diff --git a/docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.html b/docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.html
new file mode 100644
index 0000000..9d53824
--- /dev/null
+++ b/docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.html
@@ -0,0 +1,95 @@
+
+
+
+
+
+Issue 23 REPO GSI1 Sharding Requirements
+
+
+
+

Issue 23 REPO GSI1 Sharding Requirements

+

Approval target

+
    +
  • Source of truth: requirements.md
  • +
  • Preview companion: requirements.html
  • +
+

Question-and-answer flow

+

Q: What is the target level for #23?

+

Fix the design first. Implementation proceeds in a separate, later execution phase.

+

Scope not selected:

+
    +
  • Immediately finalizing down to the minimum implementable requirements
  • +
  • Including the completion criteria for cloud rollout
  • +
+

Q: How are existing single-partition rows handled?

+

Back-compat is provided only during the migration period.

+

The new shard structure must be able to read legacy REPO#<repo> rows during the migration/backfill period. However, the legacy read is designed as a removable compatibility path, not a permanent feature.

+

Q: How far does the sharding scope extend?

+

The entire REPO#<repo> GSI1 axis is in scope.

+

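Per-repo entities that currently gather on gsi1pk = REPO#<repo>, such as FINDING, FINDING_OBSERVATION, FINDING_STATE, STATE_EVENT, SCAN_LEDGER, and GHAS_ALERT, are handled under the same design principle. Partial solutions that split only high-volume entities first, or that treat only residual_for_repo separately, do not satisfy the #23 requirements.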

+

Q: What is the criterion for preserving sort order and reads after sharding?

+

Preserve existing repo-local sort semantics and read behavior.

+

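Preserve the repo-local sort semantics that the pre-shard read behavior provided. That is, observations and state events that already have a time-sortable key must be reconstructible with the same meaning after sharding. Adding a new chronological API to an entity such as SCAN_LEDGER, which does not currently offer chronological listing, is not part of this requirement. A global operational timeline that mixes multiple repos is also not a required part of this scope.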

+

Q: Which existing read behaviors must be preserved?

+

Issue #23 is hot-partition relief work; it does not change the user-facing meaning of repo-local reads.

+

The following behaviors must keep their meaning after sharding.

+
    +
  • Per-repo residual derivation
  • +
  • Observation reads for a repo
  • +
  • Finding state and state event reads
  • +
  • Scan ledger point-read behavior
  • +
  • REF_STATE listing for a repo
  • +
+

Q: How far must the migration completion criteria go?

+

Phase 1 does not require production rollout completion criteria.

+

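Instead, the design must state the conditions under which the legacy compatibility path can be removed. For example, there must be verification points showing that all legacy REPO#<repo> rows have been resolved by the backfill and that the new read path returns the same repo-local results. The actual canary, alarm, and rollback runbook remain in the scope of the cloud rollout package.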

+

Functional requirements

+
    +
  • Resolve, at the design level, the cloud-scale hot-partition risk that arises when per-repo entities pile onto the single REPO#<repo> GSI1 partition.
  • +
  • Phase 1 deliverables focus on fixing the requirements source and do not change implementation code.
  • +
  • Legacy REPO#<repo> rows must be readable during the migration/backfill period.
  • +
  • The legacy read path must be a removable compatibility path and is not fixed as a permanent API contract.
  • +
  • The sharding scope is the entire REPO#<repo> GSI1 axis.
  • +
  • FINDING, FINDING_OBSERVATION, FINDING_STATE, STATE_EVENT, SCAN_LEDGER, and GHAS_ALERT are all included in the hot-partition avoidance design scope.
  • +
  • Existing repo-local sort semantics are preserved after sharding.
  • +
  • The user-observable meaning of per-repo residual derivation, observation reads, finding state/event reads, scan ledger point reads, and REF_STATE listing is preserved.
  • +
  • The design describes GSI1 repo-axis sharding separately from the retained primary-key paths.
  • +
  • The design states the migration completion conditions that allow removal of the legacy compatibility path as per-entity countable criteria.
  • +
  • The cloud rollout package, canary, alarm, and rollback runbook are not a required part of this scope.
  • +
+

Non-functional requirements

+ + + + + + + +
Item | Required value
Public repo safety | Record only synthetic/public-safe content. Do not record real organization names, hosts, secrets, or private findings.
Scope | Treated as a design decision ahead of cloud scale, not as a local/MVP blocker.
Query semantics | Preserve existing repo-local sort semantics and read behavior.
Migration posture | Legacy read is a compatibility path for the migration period and must have per-entity countable removal conditions.
Approval gate | Do not write design.md before requirements.md is approved.
+

User scenarios

+
    +
  • A maintainer confirms the sharding scope and the migration compatibility principles before #23 is implemented.
  • +
  • An implementer designs the legacy read as a removable path instead of mistaking it for a permanent feature.
  • +
  • An implementer does not mistake a partial solution, one that shards only observations and leaves state/event/ledger behind, for completion of #23.
  • +
  • An operator understands the legacy path removal conditions at the design stage, even without the cloud rollout package.
  • +
+

Open items

+
    +
  • None. After requirements.md is approved, Phase 2 presents the candidate approaches and trade-offs.
  • +
+
+
diff --git a/docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.md b/docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.md
new file mode 100644
index 0000000..3eb2490
--- /dev/null
+++ b/docs/workbench/specs/issue-23-repo-gsi1-sharding/requirements.md
@@ -0,0 +1,112 @@
+# Issue 23 REPO GSI1 Sharding Requirements
+
+## Approval target
+
+- Source of truth: `requirements.md`
+- Preview companion: `requirements.html`
+
+## Question-and-answer flow
+
+### Q: What is the target level for #23?
+
+Fix the design first. Implementation proceeds in a separate, later execution phase.
+
+Scope not selected:
+
+- Immediately finalizing down to the minimum implementable requirements
+- Including the completion criteria for cloud rollout
+
+### Q: How are existing single-partition rows handled?
+
+Back-compat is provided only during the migration period.
+
+The new shard structure must be able to read legacy `REPO#<repo>` rows during the
+migration/backfill period. However, the legacy read is designed as a removable compatibility
+path, not a permanent feature.
+
+### Q: How far does the sharding scope extend?
+
+The entire `REPO#<repo>` GSI1 axis is in scope.
+
+Per-repo entities that currently gather on `gsi1pk = REPO#<repo>`, such as `FINDING`,
+`FINDING_OBSERVATION`, `FINDING_STATE`, `STATE_EVENT`, `SCAN_LEDGER`, and `GHAS_ALERT`, are handled
+under the same design principle. Partial solutions that split only high-volume entities first, or
+that treat only `residual_for_repo` separately, do not satisfy the #23 requirements.
+
+### Q: What is the criterion for preserving sort order and reads after sharding?
+
+Preserve existing repo-local sort semantics and read behavior.
+
+Preserve the repo-local sort semantics that the pre-shard read behavior provided. That is,
+observations and state events that already have a time-sortable key must be reconstructible
+with the same meaning after sharding. Adding a new chronological API to an entity such as
+`SCAN_LEDGER`, which does not currently offer chronological listing, is not part of this requirement.
+A global operational timeline that mixes multiple repos is also not a required part of
+this scope.
+
+### Q: Which existing read behaviors must be preserved?
+
+Issue #23 is hot-partition relief work; it does not change the user-facing meaning of
+repo-local reads.
+
+The following behaviors must keep their meaning after sharding.
+
+- Per-repo residual derivation
+- Observation reads for a repo
+- Finding state and state event reads
+- Scan ledger point-read behavior
+- `REF_STATE` listing for a repo
+
+### Q: How far must the migration completion criteria go?
+
+Phase 1 does not require production rollout completion criteria.
+
+Instead, the design must state the conditions under which the legacy compatibility path can be removed.
+For example, there must be verification points showing that all legacy `REPO#<repo>` rows have
+been resolved by the backfill and that the new read path returns the same repo-local results. The
+actual canary, alarm, and rollback runbook remain in the scope of the cloud rollout package.
+
+## Functional requirements
+
+- Resolve, at the design level, the cloud-scale hot-partition risk that arises when per-repo
+  entities pile onto the single `REPO#<repo>` GSI1 partition.
+- Phase 1 deliverables focus on fixing the requirements source and do not change
+  implementation code.
+- Legacy `REPO#<repo>` rows must be readable during the migration/backfill period.
+- The legacy read path must be a removable compatibility path and is not fixed as a
+  permanent API contract.
+- The sharding scope is the entire `REPO#<repo>` GSI1 axis.
+- `FINDING`, `FINDING_OBSERVATION`, `FINDING_STATE`, `STATE_EVENT`, `SCAN_LEDGER`, and
+  `GHAS_ALERT` are all included in the hot-partition avoidance design scope.
+- Existing repo-local sort semantics are preserved after sharding.
+- The user-observable meaning of per-repo residual derivation, observation reads, finding
+  state/event reads, scan ledger point reads, and `REF_STATE` listing is preserved.
+- The design describes GSI1 repo-axis sharding separately from the retained primary-key paths.
+- The design states the migration completion conditions that allow removal of the legacy
+  compatibility path as per-entity countable criteria.
+- The cloud rollout package, canary, alarm, and rollback runbook are not a required part of
+  this scope.
+
+## Non-functional requirements
+
+| Item | Required value |
+| --- | --- |
+| Public repo safety | Record only synthetic/public-safe content. Do not record real organization names, hosts, secrets, or private findings. |
+| Scope | Treated as a design decision ahead of cloud scale, not as a local/MVP blocker. |
+| Query semantics | Preserve existing repo-local sort semantics and read behavior. |
+| Migration posture | Legacy read is a compatibility path for the migration period and must have per-entity countable removal conditions. |
+| Approval gate | Do not write `design.md` before `requirements.md` is approved. |
+
+## User scenarios
+
+- A maintainer confirms the sharding scope and the migration compatibility principles before #23 is implemented.
+- An implementer designs the legacy read as a removable path instead of mistaking it for a
+  permanent feature.
+- An implementer does not mistake a partial solution, one that shards only observations and leaves
+  state/event/ledger behind, for completion of #23.
+- An operator understands the legacy path removal conditions at the design stage, even without
+  the cloud rollout package.
+
+## Open items
+
+- None. After `requirements.md` is approved, Phase 2 presents the candidate approaches and trade-offs.
diff --git a/src/security_scanner/runtime/branch_residual.py b/src/security_scanner/runtime/branch_residual.py
index f365b1a..b008987 100644
--- a/src/security_scanner/runtime/branch_residual.py
+++ b/src/security_scanner/runtime/branch_residual.py
@@ -117,16 +117,25 @@ class _ResidualStore(Protocol):
     def list_ref_states(self, repo_id: str) -> list[RefState]:
         """Return ref states for the repository."""
 
-    def read_observations_for_repo(self, repo_id: str) -> list[Mapping[str, Any]]:
+    def read_observations_for_repo(
+        self, repo_id: str, *, include_legacy: bool = False
+    ) -> list[Mapping[str, Any]]:
         """Return observation records for the repository."""
 
 
-def residual_for_repo(store: _ResidualStore, repo_id: str) -> list[BranchResidual]:
+def residual_for_repo(
+    store: _ResidualStore, repo_id: str, *, include_legacy: bool = False
+) -> list[BranchResidual]:
     """End-to-end per-branch residual for one repository.
 
     Reads ref states + observations within the repo partition and derives
     residual. Status/disposition stays global (L1); this is a derived view.
 
+    ``include_legacy`` is the migration-only compatibility path (issue #23): it
+    folds the pre-sharding ``REPO#<repo>`` GSI1 partition into the observation
+    fan-out so residual stays correct while a repo is partially backfilled. It
+    defaults to ``False`` so steady-state residual issues zero legacy queries.
+
     NOTE: meaningful only for incrementally-scanned repos, where REF_STATE rows
     and observation ``gsi1pk`` are both keyed by the same ``repo_id`` (the
     scan-worker path sets ``repo_full_name == repo_id``). For local_scan-only
@@ -134,5 +143,5 @@ def residual_for_repo(store: _ResidualStore, repo_id: str) -> list[BranchResidua
     """
     return residual_by_branch(
         store.list_ref_states(repo_id),
-        store.read_observations_for_repo(repo_id),
+        store.read_observations_for_repo(repo_id, include_legacy=include_legacy),
     )
diff --git a/src/security_scanner/storage/adapters/nosql_db/items.py b/src/security_scanner/storage/adapters/nosql_db/items.py
index 6a6d8e3..e577e27 100644
--- a/src/security_scanner/storage/adapters/nosql_db/items.py
+++ b/src/security_scanner/storage/adapters/nosql_db/items.py
@@ -14,6 +14,9 @@
 
 from security_scanner.catalog.scan_target import ScanTarget
 from security_scanner.core.finding.model import Finding
+from security_scanner.storage.adapters.nosql_db.repo_axis import (
+    repo_axis_projection_for_item,
+)
 from security_scanner.storage.adapters.nosql_db.transport import (
     REPO_LIST_PK,
     TARGET_LIST_PK,
@@ -241,13 +244,11 @@ def ghas_alert_to_item(alert: GhasAlertRecord) -> dict[str, Any]:
     """Map redacted GHAS alert metadata into the NoSQL item shape."""
     fetched_at = datetime_to_iso(alert.fetched_at)
     comparison_key = alert.comparison_key
-    return without_none(
+    item = without_none(
         {
             "PK": f"GHAS_ALERT#{alert.ghas_alert_id}",
             "SK": "META",
             "entityType": "GHAS_ALERT",
-            "gsi1pk": f"REPO#{alert.repository}",
-            "gsi1sk": f"GHAS_ALERT#{fetched_at}#{alert.ghas_alert_id}",
             "gsi2pk": GHAS_ALERT_LIST_PK,
             "gsi2sk": f"{fetched_at}#{alert.repository}#{alert.ghas_alert_id}",
             "ghasAlertId": alert.ghas_alert_id,
@@ -274,6 +275,8 @@ def ghas_alert_to_item(alert: GhasAlertRecord) -> dict[str, Any]:
             "fetchedAt": fetched_at,
         }
     )
+    item.update(repo_axis_projection_for_item(item))
+    return item
 
 
 def ghas_alert_from_item(item: dict[str, Any]) -> GhasAlertRecord:
@@ -506,14 +509,9 @@ def scan_ledger_key_to_key(key: ScanLedgerKey) -> dict[str, str]:
 def scan_ledger_entry_to_item(entry: ScanLedgerEntry) -> dict[str, Any]:
     """Map a SCAN_LEDGER domain object into the NoSQL item shape."""
     scanned_at = datetime_to_iso(entry.scanned_at)
-    return {
+    item = {
         **scan_ledger_key_to_key(entry.key),
         "entityType": "SCAN_LEDGER",
-        "gsi1pk": f"REPO#{entry.repo_id}",
-        "gsi1sk": (
-            f"LEDGER#{entry.commit_sha}#{entry.scanner_name}#"
-            f"{entry.rule_pack_version}"
-        ),
         "repoId": entry.repo_id,
         "commitSha": entry.commit_sha,
         "scannerName": entry.scanner_name,
@@ -525,6 +523,8 @@ def scan_ledger_entry_to_item(entry: ScanLedgerEntry) -> dict[str, Any]:
         "scannedAt": scanned_at,
         "findingCount": entry.finding_count,
     }
+    item.update(repo_axis_projection_for_item(item))
+    return item
 
 
 def scan_ledger_entry_from_item(item: dict[str, Any]) -> ScanLedgerEntry:
@@ -585,13 +585,11 @@ def new_state_event_seq() -> str:
 def finding_state_event_to_item(event: FindingStateEvent) -> dict[str, Any]:
     """Map a finding lifecycle transition into an append-only event item."""
     created_at = now_iso()
-    return without_none(
+    item = without_none(
         {
             "PK": f"FINDING#{event.finding_id}",
             "SK": f"STATE_EVENT#{event.decided_at}#{event.event_seq}",
             "entityType": "STATE_EVENT",
-            "gsi1pk": f"REPO#{event.repo}",
-            "gsi1sk": f"STATE_EVENT#{event.decided_at}#{event.finding_id}",
             "gsi2pk": f"RULE#{event.rule_id}",
             "gsi2sk": f"STATE_EVENT#{event.decided_at}#{event.finding_id}",
             "createdAt": created_at,
@@ -610,6 +608,10 @@ def finding_state_event_to_item(event: FindingStateEvent) -> dict[str, Any]:
             "eventSeq": event.event_seq,
         }
     )
+    # repo-axis GSI1 (sharded gsi1pk + eventSeq-suffixed gsi1sk) derived from the
+    # item's own fields, the same path the backfill uses.
+    item.update(repo_axis_projection_for_item(item))
+    return item
 
 
 def finding_state_event_from_item(item: dict[str, Any]) -> FindingStateEvent:
@@ -664,8 +666,6 @@ def finding_to_items(finding: Finding) -> list[dict[str, Any]]:
         "PK": f"FINDING#{finding_id}",
         "SK": "META",
         "entityType": "FINDING",
-        "gsi1pk": f"REPO#{repo}",
-        "gsi1sk": f"FINDING#{finding_id}",
         "gsi2pk": f"RULE#{rule_id}",
         "gsi2sk": f"FINDING#{finding_id}",
         "createdAt": now,
@@ -686,8 +686,6 @@ def finding_to_items(finding: Finding) -> list[dict[str, Any]]:
         "PK": f"RUN#{run_id}",
         "SK": f"OBS#{finding_id}#{occurrence_key}",
         "entityType": "FINDING_OBSERVATION",
-        "gsi1pk": f"REPO#{repo}",
-        "gsi1sk": f"RUN#{run_id}#OBS#{finding_id}#{occurrence_key}",
         "gsi2pk": f"RULE#{rule_id}",
         "gsi2sk": f"RUN#{run_id}#{repo}#{finding_id}#{occurrence_key}",
         "createdAt": now,
@@ -712,8 +710,6 @@ def finding_to_items(finding: Finding) -> list[dict[str, Any]]:
         "PK": f"FINDING#{finding_id}",
         "SK": f"STATE#{STATE_SCOPE_GLOBAL}",
         "entityType": "FINDING_STATE",
-        "gsi1pk": f"REPO#{repo}",
-        "gsi1sk": f"FINDING#{finding_id}",
         "gsi2pk": f"RULE#{rule_id}",
         "gsi2sk": f"FINDING#{finding_id}",
         "createdAt": now,
@@ -727,11 +723,16 @@ def finding_to_items(finding: Finding) -> list[dict[str, Any]]:
         "triage": finding.triage.to_dict(),
         "version": 0,
     }
-    return [
+    items = [
         without_none(identity_item),
         without_none(observation_item),
         without_none(state_item),
     ]
+    # repo-axis GSI1 keys derived from each item's own fields — the single
+    # formula the backfill also uses, so writes and backfills cannot drift.
+    for item in items:
+        item.update(repo_axis_projection_for_item(item))
+    return items
 
 
 def scan_date(scan_at_iso: str) -> str:
diff --git a/src/security_scanner/storage/adapters/nosql_db/repo_axis.py b/src/security_scanner/storage/adapters/nosql_db/repo_axis.py
new file mode 100644
index 0000000..7669933
--- /dev/null
+++ b/src/security_scanner/storage/adapters/nosql_db/repo_axis.py
@@ -0,0 +1,205 @@
+"""Repo-axis GSI1 sharding helper and scatter-gather reader (issue #23).
+
+`REPO#<repo>` GSI1 used to be a single partition per repository, so every
+per-repo entity row collided on one hot partition. This module owns the sharded
+repo-axis key contract so that new rows spread across a fixed set of shard
+partitions while repo-local reads stay logically identical.
+
+``RepoAxisKey`` owns the sharded write key (``gsi1pk`` + shard metadata) and is
+the single source of truth for the repo-axis partition format: the literal
+``REPO#`` prefix and ``#SHARD#`` infix exist only in this module. The reader
+(:mod:`repo_axis_reader`) and the migration (:mod:`repo_axis_migration`) compose
+keys through the helpers here rather than typing the literals, so the regression
+scan can treat this module as the sole allowed construction site.
+
+This module deliberately imports nothing else from the ``nosql_db`` package so
+that ``items`` (the write mappers) can depend on it without an import cycle.
+
+Shard count is a durable schema contract, not a runtime knob. ``repoAxisVersion
+= 2`` pins ``repoAxisShardCount = 16``. Changing the count requires a new
+version (e.g. ``repoAxisVersion = 3``) plus a separate rehash migration or an
+active-version fan-out design.
+"""
+
+from __future__ import annotations
+
+import hashlib
+from dataclasses import dataclass
+from typing import Any
+
+REPO_AXIS_VERSION = 2
+REPO_AXIS_SHARD_COUNT = 16
+#: Literal repo-axis partition prefix/infix. Defined once here; every other
+#: module composes keys via the helpers below instead of retyping these.
+REPO_AXIS_PARTITION_PREFIX = "REPO#" +REPO_AXIS_SHARD_INFIX = "#SHARD#" + + +def repo_axis_material(*parts: str) -> str: + """Join stable shard inputs into one deterministic material string. + + Uses a NUL separator so distinct field boundaries cannot collide (e.g. + ``("a", "bc")`` differs from ``("ab", "c")``). Shared by the write mappers + and the backfill so both derive identical shard buckets. + """ + return "\0".join(parts) + + +def repo_axis_shard( + shard_material: str, *, shard_count: int = REPO_AXIS_SHARD_COUNT +) -> str: + """Return the fixed-width deterministic shard bucket for stable material. + + The bucket is derived from a SHA-256 digest of ``shard_material`` so the same + logical row always lands in the same partition regardless of process or host. + """ + if shard_count <= 0: + raise ValueError("shard_count must be positive") + digest = hashlib.sha256(shard_material.encode("utf-8")).hexdigest() + bucket = int(digest, 16) % shard_count + return f"{bucket:0{_bucket_width(shard_count)}d}" + + +def _bucket_width(shard_count: int) -> int: + """Return the zero-pad width for the largest bucket index.""" + return len(str(shard_count - 1)) + + +def sharded_repo_axis_pk(repo_axis_id: str, bucket: str) -> str: + """Return the sharded ``REPO#<repo_axis_id>#SHARD#<bucket>`` partition key.""" + return f"{REPO_AXIS_PARTITION_PREFIX}{repo_axis_id}{REPO_AXIS_SHARD_INFIX}{bucket}" + + +def legacy_repo_axis_pk(repo_axis_id: str) -> str: + """Return the pre-sharding ``REPO#<repo_axis_id>`` GSI1 partition key. + + Used only by the migration-only legacy read path; new writes never use it. + """ + return f"{REPO_AXIS_PARTITION_PREFIX}{repo_axis_id}" + + +@dataclass(frozen=True) +class RepoAxisKey: + """Sharded repo-axis GSI1 key for one logical row. + + Construct with :meth:`build` from the repo-axis identity, the existing entity + sort key, and stable shard material. 
``gsi1sk`` is passed through unchanged so + each entity keeps its established prefix and sort semantics; this helper only + owns the sharded ``gsi1pk`` and the durable shard metadata. + """ + + gsi1pk: str + gsi1sk: str + repo_axis_shard: str + repo_axis_version: int = REPO_AXIS_VERSION + repo_axis_shard_count: int = REPO_AXIS_SHARD_COUNT + + @classmethod + def build( + cls, + *, + repo_axis_id: str, + gsi1sk: str, + shard_material: str, + shard_count: int = REPO_AXIS_SHARD_COUNT, + ) -> RepoAxisKey: + """Build a sharded repo-axis key. + + ``repo_axis_id`` is the repository identity already owned by the calling + domain object; this helper does not canonicalize it. ``shard_material`` + must be stable for the logical row across rescans. + """ + bucket = repo_axis_shard(shard_material, shard_count=shard_count) + return cls( + gsi1pk=sharded_repo_axis_pk(repo_axis_id, bucket), + gsi1sk=gsi1sk, + repo_axis_shard=bucket, + repo_axis_shard_count=shard_count, + ) + + def projection(self) -> dict[str, Any]: + """Return the GSI1 projection fields to merge into an item mapping.""" + return { + "gsi1pk": self.gsi1pk, + "gsi1sk": self.gsi1sk, + "repoAxisVersion": self.repo_axis_version, + "repoAxisShardCount": self.repo_axis_shard_count, + "repoAxisShard": self.repo_axis_shard, + } + + +#: In-scope repo-axis GSI1 entities (design "Item mapping"). +REPO_AXIS_ENTITY_TYPES: tuple[str, ...] = ( + "FINDING", + "FINDING_OBSERVATION", + "FINDING_STATE", + "STATE_EVENT", + "SCAN_LEDGER", + "GHAS_ALERT", +) + + +def repo_axis_inputs(item: dict[str, Any]) -> tuple[str, str, str]: + """Derive ``(repo_axis_id, gsi1sk, shard_material)`` from a row's own fields. + + This is the SINGLE source of truth for the per-entity repo-axis key formula, + keyed by ``entityType`` and reading only the stored domain field names. 
Both + the write mappers (:mod:`items`) and the backfill (:mod:`repo_axis_migration`) + feed an item dict through here, so the sharded ``gsi1pk``/``gsi1sk`` for a + fresh write and for a backfilled legacy row are computed by the exact same + code — they cannot drift. + + ``repo_axis_id`` is taken verbatim from the entity's own repository field; it + is never canonicalized here (design "repoAxisId mapping"). + """ + entity_type = item.get("entityType") + if entity_type in ("FINDING", "FINDING_STATE"): + return item["repo"], f"FINDING#{item['findingId']}", item["findingId"] + if entity_type == "FINDING_OBSERVATION": + return ( + item["repo"], + f"RUN#{item['scanRunId']}#OBS#{item['findingId']}#{item['occurrenceKey']}", + repo_axis_material( + item["scanRunId"], item["findingId"], item["occurrenceKey"] + ), + ) + if entity_type == "STATE_EVENT": + # eventSeq suffix is the stable tie-breaker for same-decidedAt events. + return ( + item["repo"], + f"STATE_EVENT#{item['decidedAt']}#{item['findingId']}#{item['eventSeq']}", + repo_axis_material(item["findingId"], item["decidedAt"], item["eventSeq"]), + ) + if entity_type == "SCAN_LEDGER": + return ( + item["repoId"], + f"LEDGER#{item['commitSha']}#{item['scannerName']}#{item['rulePackVersion']}", + repo_axis_material( + item["repoId"], + item["commitSha"], + item["scannerName"], + item["scannerVersion"], + item["rulePackVersion"], + item["scannerConfigHash"], + ), + ) + if entity_type == "GHAS_ALERT": + return ( + item["repository"], + f"GHAS_ALERT#{item['fetchedAt']}#{item['ghasAlertId']}", + item["ghasAlertId"], + ) + raise ValueError(f"unsupported repo-axis entity type: {entity_type!r}") + + +def repo_axis_projection_for_item(item: dict[str, Any]) -> dict[str, Any]: + """Return the sharded GSI1 projection for an item from its own fields. + + Used by both the write mappers and the backfill so the two never diverge. 
+ """ + repo_axis_id, gsi1sk, shard_material = repo_axis_inputs(item) + return RepoAxisKey.build( + repo_axis_id=repo_axis_id, + gsi1sk=gsi1sk, + shard_material=shard_material, + ).projection() diff --git a/src/security_scanner/storage/adapters/nosql_db/repo_axis_migration.py b/src/security_scanner/storage/adapters/nosql_db/repo_axis_migration.py new file mode 100644 index 0000000..ff6cc12 --- /dev/null +++ b/src/security_scanner/storage/adapters/nosql_db/repo_axis_migration.py @@ -0,0 +1,170 @@ +"""Repo-axis backfill and the compatibility-removal gate (issue #23). + +This module does NOT define a production rollout. It defines the mechanics and +the *gate* for retiring the migration-only legacy read path +(``include_legacy=True`` in :mod:`repo_axis_reader`). + +Backfill upgrades pre-sharding rows in place. A legacy row lives on the +unsharded ``REPO#`` GSI1 partition with no ``repoAxisVersion``. Backfill +recomputes the sharded projection from the row's own attributes and issues a +*conditional update of the existing primary item* — it never copies the row to a +new key. The condition guards against clobbering an already-sharded row. + +Removal gate — the legacy read path may be deleted only when ALL hold, per +``design.md`` "Migration 완료 기준": + +1. For every in-scope entity (``FINDING``, ``FINDING_OBSERVATION``, + ``FINDING_STATE``, ``STATE_EVENT``, ``SCAN_LEDGER``, ``GHAS_ALERT``) the + backfill report shows inventory, backfilled, skipped, failed counts and a + ``remaining`` legacy count of 0. +2. Sharded-only reads (``include_legacy=False``) return the same logical + repo-local result as before, validated per entity on sampled repos. +3. Parity tests pass with ``include_legacy=False``. +4. A test confirms normal sharded reads issue zero legacy query calls. +5. Legacy-only, sharded-only, and mixed states are all covered by tests. +6. The legacy read lives behind the named ``include_legacy`` flag so removing it + cannot change the default sharded read path. 
+ +When the gate is met, delete the ``include_legacy`` branch in +``repo_axis_reader.read_repo_axis`` and the ``include_legacy`` parameter on +``DynamoDbCompatibleFindingStore.read_observations_for_repo``; this module's +inventory/backfill mechanics may then be retired with it. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from security_scanner.storage.adapters.nosql_db.access import scan_all_pages +from security_scanner.storage.adapters.nosql_db.repo_axis import ( + REPO_AXIS_ENTITY_TYPES, + REPO_AXIS_PARTITION_PREFIX, + REPO_AXIS_SHARD_INFIX, + repo_axis_projection_for_item, +) +from security_scanner.storage.adapters.nosql_db.store import ( + _is_conditional_check_failure, +) + +# Re-exported: backfill and the write mappers share this single key formula. +__all__ = [ + "EntityBackfillCounts", + "RepoAxisBackfillReport", + "backfill_repo_axis", + "inventory_legacy_repo_axis", + "is_legacy_repo_axis_item", + "repo_axis_projection_for_item", +] + + +@dataclass(frozen=True) +class EntityBackfillCounts: + """Per-entity backfill outcome for the removal gate report.""" + + inventory: int + backfilled: int + skipped: int + failed: int + remaining: int + + +@dataclass(frozen=True) +class RepoAxisBackfillReport: + """Per-entity backfill counts; ``remaining == 0`` everywhere clears the gate.""" + + by_entity: dict[str, EntityBackfillCounts] + + @property + def gate_clear(self) -> bool: + """True when no in-scope entity has remaining legacy rows.""" + return all(counts.remaining == 0 for counts in self.by_entity.values()) + + +def is_legacy_repo_axis_item(item: dict[str, Any]) -> bool: + """Return True for a pre-sharding repo-axis row (unsharded, no version).""" + if item.get("entityType") not in REPO_AXIS_ENTITY_TYPES: + return False + gsi1pk = str(item.get("gsi1pk", "")) + return ( + gsi1pk.startswith(REPO_AXIS_PARTITION_PREFIX) + and REPO_AXIS_SHARD_INFIX not in gsi1pk + and "repoAxisVersion" not in item + ) + + 
+def backfill_repo_axis(table: Any) -> RepoAxisBackfillReport: + """Backfill every legacy repo-axis row in place and return the gate report. + + For each in-scope entity: inventory the legacy rows, conditionally update each + existing primary item with its sharded projection, then re-inventory to report + the remaining legacy count. + """ + by_entity: dict[str, EntityBackfillCounts] = {} + for entity_type in REPO_AXIS_ENTITY_TYPES: + legacy = _scan_legacy(table, entity_type) + backfilled = skipped = failed = 0 + for item in legacy: + outcome = _backfill_one(table, item) + if outcome == "backfilled": + backfilled += 1 + elif outcome == "skipped": + skipped += 1 + else: + failed += 1 + remaining = len(_scan_legacy(table, entity_type)) + by_entity[entity_type] = EntityBackfillCounts( + inventory=len(legacy), + backfilled=backfilled, + skipped=skipped, + failed=failed, + remaining=remaining, + ) + return RepoAxisBackfillReport(by_entity=by_entity) + + +def _scan_legacy(table: Any, entity_type: str) -> list[dict[str, Any]]: + items = scan_all_pages( + table, + FilterExpression="entityType = :entity_type", + ExpressionAttributeValues={":entity_type": entity_type}, + ) + return [item for item in items if is_legacy_repo_axis_item(item)] + + +def _backfill_one(table: Any, item: dict[str, Any]) -> str: + try: + # Inside the try so one malformed legacy row is classified 'failed' + # instead of aborting the whole migration pass. 
+ projection = repo_axis_projection_for_item(item) + table.update_item( + Key={"PK": item["PK"], "SK": item["SK"]}, + UpdateExpression=( + "SET gsi1pk = :pk, gsi1sk = :sk, repoAxisVersion = :v, " + "repoAxisShardCount = :c, repoAxisShard = :s" + ), + ConditionExpression=( + "attribute_exists(PK) AND attribute_exists(SK) AND " + "attribute_not_exists(repoAxisVersion)" + ), + ExpressionAttributeValues={ + ":pk": projection["gsi1pk"], + ":sk": projection["gsi1sk"], + ":v": projection["repoAxisVersion"], + ":c": projection["repoAxisShardCount"], + ":s": projection["repoAxisShard"], + }, + ) + return "backfilled" + except Exception as exc: # noqa: BLE001 - classified below + if _is_conditional_check_failure(exc): + return "skipped" + return "failed" + + +def inventory_legacy_repo_axis(table: Any) -> dict[str, int]: + """Return per-entity legacy row counts without mutating anything.""" + return { + entity_type: len(_scan_legacy(table, entity_type)) + for entity_type in REPO_AXIS_ENTITY_TYPES + } diff --git a/src/security_scanner/storage/adapters/nosql_db/repo_axis_reader.py b/src/security_scanner/storage/adapters/nosql_db/repo_axis_reader.py new file mode 100644 index 0000000..d838a1b --- /dev/null +++ b/src/security_scanner/storage/adapters/nosql_db/repo_axis_reader.py @@ -0,0 +1,125 @@ +"""Scatter-gather repo-axis GSI1 read with migration-only legacy fallback (#23). + +The write side shards each per-repo row across a fixed set of ``REPO#`` +shard partitions (see :mod:`repo_axis`). A repo-local read therefore has to fan +out across every shard partition and merge the results back into the single +logical result set callers used to get from the unsharded partition. + +Partition keys (sharded and legacy) are composed through the helpers in +:mod:`repo_axis`, so this module never types the partition-key literals itself; +the legacy partition is only read behind the ``include_legacy`` flag. 
+ +Ordering caveat during partial migration: the canonical sort is raw +``(gsi1sk, PK, SK)``. For STATE_EVENT the sharded ``gsi1sk`` carries an +``#<eventSeq>`` suffix that legacy (not-yet-backfilled) rows lack, so in a mixed +window legacy events sort just before their sharded siblings rather than purely +by ``eventSeq``. No production caller reads STATE_EVENT through this reader today +(events are read by primary key); a future STATE_EVENT repo-axis reader must not +rely on this canonical order until backfill completes, or must sort explicitly by +``(decidedAt, findingId, eventSeq)``. +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from security_scanner.storage.adapters.nosql_db.access import query_all_pages +from security_scanner.storage.adapters.nosql_db.repo_axis import ( + REPO_AXIS_SHARD_COUNT, + legacy_repo_axis_pk, + sharded_repo_axis_pk, +) +from security_scanner.storage.adapters.nosql_db.transport import GSI1_NAME + +QueryPages = Callable[..., list[dict[str, Any]]] + + +def read_repo_axis( + table: Any, + *, + repo_axis_id: str, + gsi1sk_prefix: str | None = None, + shard_count: int = REPO_AXIS_SHARD_COUNT, + include_legacy: bool = False, + query_pages: QueryPages = query_all_pages, +) -> list[dict[str, Any]]: + """Fan out across every shard partition for one repo-axis id. + + Reads each sharded ``REPO#<repo_axis_id>#SHARD#<bucket>`` partition (via ``sharded_repo_axis_pk``) + and, only when ``include_legacy`` is true, the legacy unsharded ``REPO#<repo_axis_id>`` + partition. Results are + defensively deduped by ``(PK, SK)`` preferring the higher ``repoAxisVersion`` + and returned in canonical ``(gsi1sk, PK, SK)`` order. + + Fails closed: if any shard query raises, the error propagates and no partial + result is returned. A partial repo result must never look like a complete one. 
+ + ``include_legacy`` defaults to ``False`` so steady-state reads issue zero + legacy queries; only migration/backfill validation and legacy compatibility + tests opt in. + """ + width = len(str(shard_count - 1)) + collected: list[dict[str, Any]] = [] + for bucket in range(shard_count): + partition = sharded_repo_axis_pk(repo_axis_id, f"{bucket:0{width}d}") + collected.extend(_query_partition(query_pages, table, partition, gsi1sk_prefix)) + if include_legacy: + collected.extend( + _query_partition( + query_pages, table, legacy_repo_axis_pk(repo_axis_id), gsi1sk_prefix + ) + ) + return _dedupe_and_sort(collected) + + +def _query_partition( + query_pages: QueryPages, + table: Any, + partition: str, + gsi1sk_prefix: str | None, +) -> list[dict[str, Any]]: + if gsi1sk_prefix is None: + return query_pages( + table, + IndexName=GSI1_NAME, + KeyConditionExpression="gsi1pk = :pk", + ExpressionAttributeValues={":pk": partition}, + ) + return query_pages( + table, + IndexName=GSI1_NAME, + KeyConditionExpression="gsi1pk = :pk AND begins_with(gsi1sk, :sk_prefix)", + ExpressionAttributeValues={":pk": partition, ":sk_prefix": gsi1sk_prefix}, + ) + + +def _dedupe_and_sort(items: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Dedupe by primary key, prefer higher repoAxisVersion, sort canonically. + + A sharded item and a legacy item with the same ``(PK, SK)`` are not expected + in steady state; dedupe is defensive against GSI eventual consistency and + test doubles. When they do collide, the higher ``repoAxisVersion`` wins. 
+ """ + best: dict[tuple[Any, Any], dict[str, Any]] = {} + for item in items: + key = (item.get("PK"), item.get("SK")) + existing = best.get(key) + if existing is None or _version_of(item) > _version_of(existing): + best[key] = item + return sorted( + best.values(), + key=lambda item: ( + item.get("gsi1sk", ""), + item.get("PK", ""), + item.get("SK", ""), + ), + ) + + +def _version_of(item: dict[str, Any]) -> int: + """Return the repo-axis version, treating legacy (absent) items as 0.""" + try: + return int(item.get("repoAxisVersion", 0)) + except (TypeError, ValueError): + return 0 diff --git a/src/security_scanner/storage/adapters/nosql_db/store.py b/src/security_scanner/storage/adapters/nosql_db/store.py index 4879a15..3d6cacd 100644 --- a/src/security_scanner/storage/adapters/nosql_db/store.py +++ b/src/security_scanner/storage/adapters/nosql_db/store.py @@ -53,6 +53,7 @@ secret_evidence_to_item, split_target_name, ) +from security_scanner.storage.adapters.nosql_db.repo_axis_reader import read_repo_axis from security_scanner.storage.adapters.nosql_db.transport import ( GSI1_NAME, REPO_LIST_PK, @@ -433,9 +434,7 @@ def read_recent_repo_metadata(self, limit: int | None = None) -> list[RepoMetada self._table, limit=limit, IndexName=GSI1_NAME, - KeyConditionExpression=( - "gsi1pk = :pk AND begins_with(gsi1sk, :sk_prefix)" - ), + KeyConditionExpression=("gsi1pk = :pk AND begins_with(gsi1sk, :sk_prefix)"), ExpressionAttributeValues={ ":pk": REPO_LIST_PK, ":sk_prefix": "UPDATED#", @@ -497,23 +496,28 @@ def list_ref_states(self, repo_id: str) -> list[RefState]: if item.get("entityType") == "REF_STATE" ] - def read_observations_for_repo(self, repo_id: str) -> list[dict[str, Any]]: + def read_observations_for_repo( + self, repo_id: str, *, include_legacy: bool = False + ) -> list[dict[str, Any]]: """Return raw FINDING_OBSERVATION items for one repository. - Used to derive per-branch residual within the REPO# partition - (issue #12 L2) without a dedicated branch index. 
+ Used to derive per-branch residual within the repo-axis partition + (issue #12 L2) without a dedicated branch index. The repo-axis GSI1 is + sharded (issue #23), so this fans out across every shard partition via + ``read_repo_axis`` and merges the result. Observations use + ``gsi1sk="RUN#..."`` so the begins_with prefix narrows each shard read; + the entityType filter is belt-and-braces (FINDING/FINDING_STATE use + "FINDING#", STATE_EVENT uses "STATE_EVENT#"). + + ``include_legacy`` is the migration-only compatibility path: it adds the + pre-sharding ``REPO#`` partition to the fan-out. It defaults to + ``False`` so steady-state residual reads issue zero legacy queries. """ - # Observations use gsi1sk="RUN#...", so the begins_with prefix narrows the - # GSI1 partition read server-side; the entityType filter is belt-and-braces - # (FINDING/FINDING_STATE use "FINDING#", STATE_EVENT uses "STATE_EVENT#"). - items = query_all_pages( + items = read_repo_axis( self._table, - IndexName=GSI1_NAME, - KeyConditionExpression="gsi1pk = :pk AND begins_with(gsi1sk, :sk_prefix)", - ExpressionAttributeValues={ - ":pk": f"REPO#{repo_id}", - ":sk_prefix": "RUN#", - }, + repo_axis_id=repo_id, + gsi1sk_prefix="RUN#", + include_legacy=include_legacy, ) return [ item for item in items if item.get("entityType") == "FINDING_OBSERVATION" @@ -789,8 +793,7 @@ def _is_conditional_check_failure(exc: Exception) -> bool: if code == "TransactionCanceledException": reasons = response.get("CancellationReasons", []) return not reasons or any( - reason.get("Code") == "ConditionalCheckFailed" - for reason in reasons + reason.get("Code") == "ConditionalCheckFailed" for reason in reasons ) return exc.__class__.__name__ == "ConditionalCheckFailedException" diff --git a/src/security_scanner/storage/adapters/nosql_db/transport.py b/src/security_scanner/storage/adapters/nosql_db/transport.py index fa6a702..5602633 100644 --- a/src/security_scanner/storage/adapters/nosql_db/transport.py +++ 
b/src/security_scanner/storage/adapters/nosql_db/transport.py @@ -6,7 +6,6 @@ from dataclasses import dataclass from typing import Any - DEFAULT_TABLE_NAME = "SecurityScannerLocal" DEFAULT_ENDPOINT_URL = "http://localhost:4567" DEFAULT_REGION_NAME = "us-west-2" @@ -27,7 +26,7 @@ class DynamoDbCompatibleConfig: aws_secret_access_key: str = "dummy" @classmethod - def from_env(cls) -> "DynamoDbCompatibleConfig": + def from_env(cls) -> DynamoDbCompatibleConfig: """Build config from public-safe environment variable names.""" return cls( table_name=os.environ.get( diff --git a/tests/test_cli_residual.py b/tests/test_cli_residual.py index 83e1ae7..a0ccac3 100644 --- a/tests/test_cli_residual.py +++ b/tests/test_cli_residual.py @@ -22,7 +22,9 @@ def list_ref_states(self, repo_id: str) -> list[RefState]: ) ] - def read_observations_for_repo(self, repo_id: str) -> list[dict]: + def read_observations_for_repo( + self, repo_id: str, *, include_legacy: bool = False + ) -> list[dict]: return [ {"branch": "main", "commit": "S2", "findingId": "finding_residual"}, {"branch": "main", "commit": "S1", "findingId": "finding_stale"}, diff --git a/tests/test_dynamodb_compatible_store.py b/tests/test_dynamodb_compatible_store.py index a872f02..116e7d1 100644 --- a/tests/test_dynamodb_compatible_store.py +++ b/tests/test_dynamodb_compatible_store.py @@ -165,9 +165,10 @@ def query(self, **kwargs) -> dict: if page_index == 0: assert "ExclusiveStartKey" not in kwargs else: - assert kwargs["ExclusiveStartKey"] == self.pages[page_index - 1][ - "LastEvaluatedKey" - ] + assert ( + kwargs["ExclusiveStartKey"] + == self.pages[page_index - 1]["LastEvaluatedKey"] + ) return page @@ -322,9 +323,14 @@ def test_build_table_schema_matches_single_table_keys_and_indexes(): "GSI1", "GSI2", } - assert { - attr["AttributeName"] for attr in schema["AttributeDefinitions"] - } == {"PK", "SK", "gsi1pk", "gsi1sk", "gsi2pk", "gsi2sk"} + assert {attr["AttributeName"] for attr in schema["AttributeDefinitions"]} == { 
+ "PK", + "SK", + "gsi1pk", + "gsi1sk", + "gsi2pk", + "gsi2sk", + } def test_finding_to_items_writes_core_identity_observation_and_state_items(): @@ -352,7 +358,11 @@ def test_finding_to_items_writes_core_identity_observation_and_state_items(): state_item = items[2] assert identity_item["PK"] == f"FINDING#{finding.finding_id}" assert identity_item["SK"] == "META" - assert identity_item["gsi1pk"] == f"REPO#{finding.repo.full_name}" + # repo-axis GSI1 is sharded (#23): partition fans out by stable material. + assert identity_item["gsi1pk"].startswith(f"REPO#{finding.repo.full_name}#SHARD#") + assert identity_item["gsi1sk"] == f"FINDING#{finding.finding_id}" + assert identity_item["repoAxisVersion"] == 2 + assert identity_item["repoAxisShardCount"] == 16 assert identity_item["gsi2pk"] == f"RULE#{finding.rule_id}" assert identity_item["sourceTool"] == "gitleaks" assert "sourceToolVersion" not in identity_item @@ -377,9 +387,10 @@ def test_finding_to_items_writes_core_identity_observation_and_state_items(): same_fingerprint_observation = finding_to_items( Finding.from_dict(same_fingerprint_data) )[1] - assert same_fingerprint_observation["occurrenceKey"] == observation_item[ - "occurrenceKey" - ] + assert ( + same_fingerprint_observation["occurrenceKey"] + == observation_item["occurrenceKey"] + ) assert state_item["PK"] == f"FINDING#{finding.finding_id}" assert state_item["SK"] == "STATE#GLOBAL" assert state_item["stateScopeKey"] == "GLOBAL" @@ -425,8 +436,7 @@ def test_repo_metadata_to_item_keeps_runtime_metadata_and_repo_list_index(): assert item["enabled"] is True assert item["gsi1pk"] == "REPO_LIST#ALL" assert ( - item["gsi1sk"] - == "UPDATED#2026-05-22T00:00:00Z#github#example-org/example-repo" + item["gsi1sk"] == "UPDATED#2026-05-22T00:00:00Z#github#example-org/example-repo" ) @@ -481,12 +491,17 @@ def test_finding_state_event_to_item_writes_append_only_indexes(): assert item["PK"] == "FINDING#finding_synthetic" assert ( - item["SK"] - == 
"STATE_EVENT#2026-06-16T01:02:03+00:00#01JY0000000000000000000000" + item["SK"] == "STATE_EVENT#2026-06-16T01:02:03+00:00#01JY0000000000000000000000" ) assert item["entityType"] == "STATE_EVENT" - assert item["gsi1pk"] == "REPO#fake-org/fake-repo" - assert item["gsi1sk"] == "STATE_EVENT#2026-06-16T01:02:03+00:00#finding_synthetic" + # repo-axis GSI1 is sharded and the gsi1sk gains an eventSeq tie-breaker (#23); + # the rule-axis GSI2 is out of scope and keeps its existing key shape. + assert item["gsi1pk"].startswith("REPO#fake-org/fake-repo#SHARD#") + assert item["gsi1sk"] == ( + "STATE_EVENT#2026-06-16T01:02:03+00:00#finding_synthetic" + "#01JY0000000000000000000000" + ) + assert item["repoAxisShard"] == item["gsi1pk"].rsplit("#", 1)[-1] assert item["gsi2pk"] == "RULE#generic-api-key" assert item["gsi2sk"] == "STATE_EVENT#2026-06-16T01:02:03+00:00#finding_synthetic" assert item["fromStatus"] == Status.OPEN.value @@ -982,9 +997,10 @@ def query(self, **kwargs) -> dict: if page_index == 0: assert "ExclusiveStartKey" not in kwargs else: - assert kwargs["ExclusiveStartKey"] == self.run_pages[page_index - 1][ - "LastEvaluatedKey" - ] + assert ( + kwargs["ExclusiveStartKey"] + == self.run_pages[page_index - 1]["LastEvaluatedKey"] + ) return page table = PaginatedRunAndStateTable() @@ -1020,8 +1036,7 @@ def test_read_for_scan_run_batches_state_lookup_in_chunks_of_100(): assert len(table.query_calls) == 1 assert len(resource.batch_get_calls) == 2 assert [ - len(call["SecurityScannerLocal"]["Keys"]) - for call in resource.batch_get_calls + len(call["SecurityScannerLocal"]["Keys"]) for call in resource.batch_get_calls ] == [100, 1] @@ -1127,9 +1142,7 @@ def test_read_for_scan_run_merges_state_for_findings_in_that_run_only(): line_start=12, ) items = ( - finding_to_items(first) - + finding_to_items(second) - + finding_to_items(other_run) + finding_to_items(first) + finding_to_items(second) + finding_to_items(other_run) ) for item in items: if ( @@ -1233,9 +1246,7 @@ def 
test_set_finding_disposition_updates_state_and_appends_event(): item for item in table.items if item.get("entityType") == "STATE_EVENT" ] assert len(event_items) == 1 - assert event_items[0]["SK"].startswith( - "STATE_EVENT#2026-06-16T01:02:03+00:00#" - ) + assert event_items[0]["SK"].startswith("STATE_EVENT#2026-06-16T01:02:03+00:00#") assert len(client.transact_calls) == 1 raw_transaction = client.transact_calls[0]["TransactItems"] assert raw_transaction[0]["Update"]["Key"]["PK"] == { @@ -1418,3 +1429,86 @@ def test_residual_for_repo_derives_per_branch_from_latest_commit(): assert result[0].branch == "main" assert result[0].commit == "S2" assert result[0].finding_ids == [residual.finding_id] + + +def _legacy_observation_item(finding: Finding, repo: str) -> dict: + """Return a pre-sharding (#23) observation item: unsharded gsi1pk, no axis.""" + item = dict(finding_to_items(finding)[1]) + item["gsi1pk"] = f"REPO#{repo}" + for field in ("repoAxisVersion", "repoAxisShardCount", "repoAxisShard"): + item.pop(field, None) + return item + + +def _make_store_with_main_ref(table: FakeDynamoTable, repo: str): + store = DynamoDbCompatibleFindingStore( + DynamoDbCompatibleConfig(table_name="SecurityScannerLocal"), + resource=FakeDynamoResource(table), + client=FakeDynamoClient(), + ) + store.put_ref_state( + RefState( + repo_id=repo, + repo_url="https://example/r", + ref_name="refs/heads/main", + last_seen_sha="S2", + updated_at=dt.datetime(2026, 6, 16, tzinfo=dt.timezone.utc), + ) + ) + return store + + +def test_residual_parity_across_legacy_only_sharded_only_and_mixed(): + # The same logical observation set must derive the same residual whether the + # rows are sharded (#23), pre-sharding legacy, or a mix during migration. 
+ repo = "fake-org/fake-repo" + residual = _make(repo_branch="main", repo_commit="S2", line_start=10) + stale = _make(repo_branch="main", repo_commit="S1", line_start=20) + + # sharded-only: production steady state, no legacy queries + sharded_table = FakeDynamoTable() + sharded_store = _make_store_with_main_ref(sharded_table, repo) + for finding in (residual, stale): + for item in finding_to_items(finding): + sharded_table.put_item(Item=item) + sharded_residual = residual_for_repo(sharded_store, repo) + + # legacy-only: pre-migration data, read via the compatibility fan-out + legacy_table = FakeDynamoTable() + legacy_store = _make_store_with_main_ref(legacy_table, repo) + for finding in (residual, stale): + legacy_table.put_item(Item=_legacy_observation_item(finding, repo)) + # production path: residual_for_repo threads include_legacy through to the + # fan-out reader, so this exercises the real migration read, not a hand-built one + legacy_residual = residual_for_repo(legacy_store, repo, include_legacy=True) + + # mixed: residual present in both partitions (dedupe), stale only legacy + mixed_table = FakeDynamoTable() + mixed_store = _make_store_with_main_ref(mixed_table, repo) + for item in finding_to_items(residual): + mixed_table.put_item(Item=item) # sharded + mixed_table.put_item(Item=_legacy_observation_item(residual, repo)) # dup legacy + mixed_table.put_item(Item=_legacy_observation_item(stale, repo)) # legacy only + mixed_residual = residual_for_repo(mixed_store, repo, include_legacy=True) + + expected = [("main", "S2", [residual.finding_id])] + for derived in (sharded_residual, legacy_residual, mixed_residual): + assert [(r.branch, r.commit, r.finding_ids) for r in derived] == expected + + +def test_read_observations_for_repo_default_issues_no_legacy_query(): + # Steady-state residual reads must not touch the legacy unsharded partition. 
+ repo = "fake-org/fake-repo" + table = FakeDynamoTable() + store = _make_store_with_main_ref(table, repo) + finding = _make(repo_branch="main", repo_commit="S2") + for item in finding_to_items(finding): + table.put_item(Item=item) + + store.read_observations_for_repo(repo) + + queried_pks = [ + call["ExpressionAttributeValues"][":pk"] for call in table.query_calls + ] + assert f"REPO#{repo}" not in queried_pks + assert all(pk.startswith(f"REPO#{repo}#SHARD#") for pk in queried_pks) diff --git a/tests/test_repo_axis_sharding.py b/tests/test_repo_axis_sharding.py new file mode 100644 index 0000000..e28a1b7 --- /dev/null +++ b/tests/test_repo_axis_sharding.py @@ -0,0 +1,789 @@ +"""Tests for repo-axis GSI1 sharding (issue #23).""" + +from __future__ import annotations + +import datetime as dt +import pathlib +import re + +from security_scanner.core.finding.model import Finding +from security_scanner.storage.adapters.nosql_db import repo_axis as repo_axis_module +from security_scanner.storage.adapters.nosql_db.items import ( + finding_state_event_to_item, + finding_to_items, + ghas_alert_to_item, + scan_ledger_entry_to_item, +) +from security_scanner.storage.adapters.nosql_db.repo_axis import ( + REPO_AXIS_SHARD_COUNT, + REPO_AXIS_VERSION, + RepoAxisKey, + legacy_repo_axis_pk, + repo_axis_shard, +) +from security_scanner.storage.base import ( + FindingStateEvent, + GhasAlertRecord, + ScanLedgerEntry, +) + +FAKE_SECRET = "AKIAFAKEEXAMPLE000900" +SCAN_RUN_ID = "scan_axis01" +RULE_PACK = "secret-rules-0.1.0" + + +def _finding() -> Finding: + return Finding.create( + repo_full_name="fake-org/fake-repo", + rule_id="generic-api-key", + file_path="src/config.py", + line_start=10, + raw_secret=FAKE_SECRET, + source_tool="gitleaks", + scan_run_id=SCAN_RUN_ID, + rule_pack_version=RULE_PACK, + ) + + +def _ghas_alert() -> GhasAlertRecord: + return GhasAlertRecord( + ghas_alert_id="ghas_001", + repository="fake-org/fake-repo", + alert_number=1, + secret_type="aws_access_key_id", + 
state="open", + fetched_at=dt.datetime(2026, 6, 16, 1, 2, 3, tzinfo=dt.timezone.utc), + ) + + +def _ledger_entry() -> ScanLedgerEntry: + return ScanLedgerEntry( + repo_id="repo_abc123", + commit_sha="c" * 40, + scanner_name="gitleaks", + scanner_version="8.18.0", + rule_pack_version=RULE_PACK, + scanner_config_hash="cfg123", + scan_run_id=SCAN_RUN_ID, + job_id="scan_job_xyz", + scanned_at=dt.datetime(2026, 6, 16, 1, 2, 3, tzinfo=dt.timezone.utc), + finding_count=0, + ) + + +def _state_event() -> FindingStateEvent: + return FindingStateEvent( + finding_id="finding_synthetic", + repo="fake-org/fake-repo", + rule_id="generic-api-key", + from_status="open", + to_status="false_positive", + from_verdict="needs_review", + to_verdict="false_positive", + actor="lfm2.5-thinking", + source="verifier", + reason="Synthetic false positive.", + decided_at="2026-06-16T01:02:03+00:00", + event_seq="01JY0000000000000000000000", + ) + + +def test_repo_axis_shard_is_deterministic_for_same_material(): + first = repo_axis_shard("finding_abc") + second = repo_axis_shard("finding_abc") + + assert first == second + + +def test_repo_axis_shard_buckets_are_fixed_width_within_range(): + buckets = {repo_axis_shard(f"finding_{n}") for n in range(500)} + + # Every emitted bucket is a two-digit "00".."15" string within the contract. + assert buckets + for bucket in buckets: + assert bucket.isdigit() + assert len(bucket) == 2 + assert 0 <= int(bucket) < REPO_AXIS_SHARD_COUNT + + +def test_repo_axis_shard_distributes_across_all_buckets(): + # With enough distinct material every shard bucket should be exercised, which + # is the whole point of fan-out: load spreads off the single hot partition. 
+ buckets = {repo_axis_shard(f"finding_{n}") for n in range(2000)} + + assert buckets == {f"{n:02d}" for n in range(REPO_AXIS_SHARD_COUNT)} + + +def test_repo_axis_key_pins_durable_schema_contract(): + key = RepoAxisKey.build( + repo_axis_id="fake-org/fake-repo", + gsi1sk="FINDING#finding_abc", + shard_material="finding_abc", + ) + + assert key.repo_axis_version == REPO_AXIS_VERSION == 2 + assert key.repo_axis_shard_count == REPO_AXIS_SHARD_COUNT == 16 + assert key.gsi1sk == "FINDING#finding_abc" + assert key.gsi1pk == f"REPO#fake-org/fake-repo#SHARD#{key.repo_axis_shard}" + + +def test_repo_axis_key_projection_carries_all_metadata_fields(): + key = RepoAxisKey.build( + repo_axis_id="fake-org/fake-repo", + gsi1sk="FINDING#finding_abc", + shard_material="finding_abc", + ) + + projection = key.projection() + + assert projection == { + "gsi1pk": key.gsi1pk, + "gsi1sk": "FINDING#finding_abc", + "repoAxisVersion": 2, + "repoAxisShardCount": 16, + "repoAxisShard": key.repo_axis_shard, + } + + +def test_repo_axis_id_is_not_canonicalized_by_helper(): + # local scan target names, incremental repo_id, and GHAS repository values are + # all passed through verbatim; identity normalization is the caller's job. 
+ key = RepoAxisKey.build( + repo_axis_id="Weird/Repo Name", + gsi1sk="FINDING#x", + shard_material="x", + ) + + assert key.gsi1pk.startswith("REPO#Weird/Repo Name#SHARD#") + + +def test_legacy_repo_axis_pk_is_unsharded(): + assert legacy_repo_axis_pk("fake-org/fake-repo") == "REPO#fake-org/fake-repo" + + +# --- M2: mappers route through RepoAxisKey --------------------------------- + + +def _assert_sharded(item: dict, *, repo_axis_id: str, gsi1sk_prefix: str) -> None: + assert item["gsi1pk"].startswith(f"REPO#{repo_axis_id}#SHARD#") + assert item["gsi1sk"].startswith(gsi1sk_prefix) + assert item["repoAxisVersion"] == REPO_AXIS_VERSION + assert item["repoAxisShardCount"] == REPO_AXIS_SHARD_COUNT + assert item["repoAxisShard"] == item["gsi1pk"].rsplit("#", 1)[-1] + + +def test_finding_mappers_emit_sharded_repo_axis_with_preserved_prefixes(): + finding = _finding() + identity, observation, state = finding_to_items(finding) + repo = finding.repo.full_name + + _assert_sharded( + identity, repo_axis_id=repo, gsi1sk_prefix=f"FINDING#{finding.finding_id}" + ) + _assert_sharded( + observation, + repo_axis_id=repo, + gsi1sk_prefix=f"RUN#{SCAN_RUN_ID}#OBS#{finding.finding_id}#", + ) + _assert_sharded( + state, repo_axis_id=repo, gsi1sk_prefix=f"FINDING#{finding.finding_id}" + ) + + +def test_ghas_alert_mapper_emits_sharded_repo_axis(): + alert = _ghas_alert() + item = ghas_alert_to_item(alert) + + _assert_sharded(item, repo_axis_id=alert.repository, gsi1sk_prefix="GHAS_ALERT#") + + +def test_scan_ledger_mapper_emits_sharded_repo_axis(): + entry = _ledger_entry() + item = scan_ledger_entry_to_item(entry) + + _assert_sharded( + item, repo_axis_id=entry.repo_id, gsi1sk_prefix=f"LEDGER#{entry.commit_sha}#" + ) + + +def test_state_event_sharded_gsi1sk_includes_event_seq_suffix(): + event = _state_event() + item = finding_state_event_to_item(event) + + _assert_sharded(item, repo_axis_id=event.repo, gsi1sk_prefix="STATE_EVENT#") + # the eventSeq suffix is the stable tie-breaker for 
same-decidedAt events + assert item["gsi1sk"].endswith(f"#{event.event_seq}") + assert item["gsi1sk"] == ( + f"STATE_EVENT#{event.decided_at}#{event.finding_id}#{event.event_seq}" + ) + # rule-axis GSI2 is out of #23 scope and keeps the unsuffixed key + assert item["gsi2sk"] == f"STATE_EVENT#{event.decided_at}#{event.finding_id}" + + +def test_repo_axis_id_source_mapping_per_entity(): + # Each entity derives repoAxisId from its own domain field (design table). + finding = _finding() + identity, observation, state = finding_to_items(finding) + assert identity["gsi1pk"].startswith(f"REPO#{finding.repo.full_name}#SHARD#") + assert observation["gsi1pk"].startswith(f"REPO#{finding.repo.full_name}#SHARD#") + assert state["gsi1pk"].startswith(f"REPO#{finding.repo.full_name}#SHARD#") + + event = _state_event() + assert finding_state_event_to_item(event)["gsi1pk"].startswith( + f"REPO#{event.repo}#SHARD#" + ) + + entry = _ledger_entry() + assert scan_ledger_entry_to_item(entry)["gsi1pk"].startswith( + f"REPO#{entry.repo_id}#SHARD#" + ) + + alert = _ghas_alert() + assert ghas_alert_to_item(alert)["gsi1pk"].startswith( + f"REPO#{alert.repository}#SHARD#" + ) + + +def test_observation_shard_is_stable_across_rescans(): + # Same logical observation row must always land in the same shard so the + # fan-out reader and any backfill stay deterministic. + first = finding_to_items(_finding())[1] + second = finding_to_items(_finding())[1] + assert first["repoAxisShard"] == second["repoAxisShard"] + + +def test_no_raw_repo_axis_gsi1_construction_outside_helper(): + # Regression scan: only repo_axis.py (RepoAxisKey and legacy_repo_axis_pk) may + # construct a repo-axis gsi1 partition key. Mappers and readers must route + # through its helpers, never emit raw `gsi1pk = REPO#...` or `#SHARD#`. + pkg_dir = pathlib.Path(repo_axis_module.__file__).parent + # repo_axis.py is the single source of truth for the REPO#/#SHARD# literals; + # every other module composes keys through its helpers. 
+ allowed = {"repo_axis.py"} + offenders: list[str] = [] + for path in sorted(pkg_dir.glob("*.py")): + if path.name in allowed: + continue + text = path.read_text(encoding="utf-8") + # Any `#SHARD#` literal at all (f-string, concat, .format, %, plain) is a + # hand-rolled sharded partition key — construction-method-agnostic. + if "#SHARD#" in text: + offenders.append(f"{path.name}: raw #SHARD# literal") + # raw mapping of the repo-axis partition straight onto gsi1pk + if re.search(r'["\']gsi1pk["\']\s*:\s*f?["\']REPO#', text): + offenders.append(f"{path.name}: raw gsi1pk=REPO# mapping") + # a GSI1 read whose :pk is a raw REPO# literal would re-introduce the hot + # partition / bypass the fan-out reader. Primary-key reads of REPO# are + # fine (design-retained), so only flag when GSI1/IndexName is nearby. + for match in re.finditer(r'["\']:pk["\']\s*:\s*f?["\']REPO#', text): + window = text[max(0, match.start() - 400) : match.start()] + if "GSI1" in window or "IndexName" in window: + offenders.append(f"{path.name}: GSI1 read on raw REPO# partition") + assert offenders == [], offenders + + +# --- M3: scatter-gather reader --------------------------------------------- + + +class _FakeGsiTable: + """Minimal GSI1 fan-out table double recording every query partition.""" + + def __init__(self, items: list[dict] | None = None) -> None: + self.items = items or [] + self.queried_pks: list[str] = [] + + def add(self, **item) -> None: + self.items.append(item) + + def query(self, **kwargs) -> dict: + values = kwargs["ExpressionAttributeValues"] + pk = values[":pk"] + self.queried_pks.append(pk) + prefix = values.get(":sk_prefix") + matched = [ + item + for item in self.items + if item.get("gsi1pk") == pk + and (prefix is None or str(item.get("gsi1sk", "")).startswith(prefix)) + ] + return {"Items": matched} + + +def _sharded_item(repo: str, gsi1sk: str, shard_material: str, **extra) -> dict: + key = RepoAxisKey.build( + repo_axis_id=repo, gsi1sk=gsi1sk, 
shard_material=shard_material + ) + return {**key.projection(), **extra} + + +def test_reader_fans_out_across_every_shard_partition(): + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + table = _FakeGsiTable() + table.add( + **_sharded_item(repo, "RUN#r1#OBS#f1#o1", "r1f1o1", PK="RUN#r1", SK="OBS#f1#o1") + ) + table.add( + **_sharded_item(repo, "RUN#r1#OBS#f2#o2", "r1f2o2", PK="RUN#r1", SK="OBS#f2#o2") + ) + + result = read_repo_axis(table, repo_axis_id=repo, gsi1sk_prefix="RUN#") + + assert len(table.queried_pks) == REPO_AXIS_SHARD_COUNT + assert {(it["PK"], it["SK"]) for it in result} == { + ("RUN#r1", "OBS#f1#o1"), + ("RUN#r1", "OBS#f2#o2"), + } + + +def test_reader_default_issues_zero_legacy_queries(): + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + table = _FakeGsiTable() + + read_repo_axis(table, repo_axis_id=repo, gsi1sk_prefix="RUN#") + + assert legacy_repo_axis_pk(repo) not in table.queried_pks + assert len(table.queried_pks) == REPO_AXIS_SHARD_COUNT + + +def test_reader_include_legacy_queries_legacy_partition_and_merges(): + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + table = _FakeGsiTable() + table.add( + **_sharded_item(repo, "RUN#r1#OBS#f1#o1", "r1f1o1", PK="RUN#r1", SK="OBS#f1#o1") + ) + # legacy-only row: unsharded gsi1pk, no repoAxisVersion + table.add( + gsi1pk=legacy_repo_axis_pk(repo), + gsi1sk="RUN#r0#OBS#f0#o0", + PK="RUN#r0", + SK="OBS#f0#o0", + ) + + sharded_only = read_repo_axis(table, repo_axis_id=repo, gsi1sk_prefix="RUN#") + with_legacy = read_repo_axis( + table, repo_axis_id=repo, gsi1sk_prefix="RUN#", include_legacy=True + ) + + assert legacy_repo_axis_pk(repo) in table.queried_pks + assert {(it["PK"], it["SK"]) for it in sharded_only} == {("RUN#r1", "OBS#f1#o1")} + assert {(it["PK"], 
it["SK"]) for it in with_legacy} == { + ("RUN#r1", "OBS#f1#o1"), + ("RUN#r0", "OBS#f0#o0"), + } + + +def test_reader_dedupes_same_key_preferring_higher_version(): + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + table = _FakeGsiTable() + table.add( + **_sharded_item( + repo, + "RUN#r1#OBS#f1#o1", + "r1f1o1", + PK="RUN#r1", + SK="OBS#f1#o1", + payload="sharded", + ) + ) + # legacy duplicate of the same (PK, SK) without a version + table.add( + gsi1pk=legacy_repo_axis_pk(repo), + gsi1sk="RUN#r1#OBS#f1#o1", + PK="RUN#r1", + SK="OBS#f1#o1", + payload="legacy", + ) + + result = read_repo_axis( + table, repo_axis_id=repo, gsi1sk_prefix="RUN#", include_legacy=True + ) + + assert len(result) == 1 + assert result[0]["payload"] == "sharded" + + +def test_reader_returns_canonical_gsi1sk_pk_sk_order(): + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + table = _FakeGsiTable() + table.add( + **_sharded_item( + repo, "STATE_EVENT#2026-06-16#f1#s2", "a", PK="FINDING#f1", SK="b" + ) + ) + table.add( + **_sharded_item( + repo, "STATE_EVENT#2026-06-15#f1#s1", "b", PK="FINDING#f1", SK="a" + ) + ) + + result = read_repo_axis(table, repo_axis_id=repo, gsi1sk_prefix="STATE_EVENT#") + + assert [it["gsi1sk"] for it in result] == [ + "STATE_EVENT#2026-06-15#f1#s1", + "STATE_EVENT#2026-06-16#f1#s2", + ] + + +def test_reader_fails_closed_when_a_shard_query_raises(): + import pytest + + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + + class _ExplodingTable(_FakeGsiTable): + def query(self, **kwargs): + pk = kwargs["ExpressionAttributeValues"][":pk"] + if pk.endswith("#SHARD#03"): + raise RuntimeError("shard 03 unavailable") + return super().query(**kwargs) + + table = _ExplodingTable() + table.add( + **_sharded_item(repo, "RUN#r1#OBS#f1#o1", 
"r1f1o1", PK="RUN#r1", SK="OBS#f1#o1") + ) + + with pytest.raises(RuntimeError, match="shard 03 unavailable"): + read_repo_axis(table, repo_axis_id=repo, gsi1sk_prefix="RUN#") + + +# --- M5: migration removal gate -------------------------------------------- + + +def _to_legacy(item: dict) -> dict: + """Return the pre-sharding (#23) form of a sharded mapper item.""" + legacy = dict(item) + repo = item["gsi1pk"][len("REPO#") :].rsplit("#SHARD#", 1)[0] + legacy["gsi1pk"] = f"REPO#{repo}" + for field in ("repoAxisVersion", "repoAxisShardCount", "repoAxisShard"): + legacy.pop(field, None) + if legacy.get("entityType") == "STATE_EVENT": + # legacy gsi1sk had no eventSeq tie-breaker suffix + legacy["gsi1sk"] = f"STATE_EVENT#{item['decidedAt']}#{item['findingId']}" + return legacy + + +class _MigrationTable: + class ConditionalCheckFailedException(Exception): + pass + + def __init__(self, items: list[dict]) -> None: + self.items = [dict(item) for item in items] + self.update_calls: list[dict] = [] + + def scan(self, **kwargs) -> dict: + entity_type = kwargs["ExpressionAttributeValues"][":entity_type"] + return { + "Items": [dict(i) for i in self.items if i.get("entityType") == entity_type] + } + + def _find(self, key: dict) -> dict | None: + for item in self.items: + if item.get("PK") == key["PK"] and item.get("SK") == key["SK"]: + return item + return None + + def update_item( + self, + *, + Key: dict, + UpdateExpression: str, + ExpressionAttributeValues: dict, + ConditionExpression: str | None = None, + ) -> dict: + self.update_calls.append(Key) + item = self._find(Key) + if not self._condition_holds(ConditionExpression, item): + raise self.ConditionalCheckFailedException() + assert item is not None # guaranteed by attribute_exists(PK) clause + for assignment in UpdateExpression.removeprefix("SET ").split(", "): + target, value_key = assignment.split(" = ", 1) + item[target] = ExpressionAttributeValues[value_key] + return {"ResponseMetadata": {"HTTPStatusCode": 200}} + 
+ def _condition_holds(self, expression: str | None, item: dict | None) -> bool: + """Enforce every attribute_exists / attribute_not_exists clause faithfully.""" + if expression is None: + return item is not None + for clause in expression.split(" AND "): + clause = clause.strip() + if clause.startswith("attribute_exists("): + attr = clause[len("attribute_exists(") : -1] + if item is None or attr not in item: + return False + elif clause.startswith("attribute_not_exists("): + attr = clause[len("attribute_not_exists(") : -1] + if item is not None and attr in item: + return False + else: # pragma: no cover - defensive: unknown clause form + raise AssertionError(f"unhandled condition clause: {clause!r}") + return True + + +def test_backfill_updates_existing_item_in_place_not_row_copy(): + from security_scanner.storage.adapters.nosql_db.repo_axis_migration import ( + backfill_repo_axis, + ) + + observation = finding_to_items(_finding())[1] + legacy = _to_legacy(observation) + table = _MigrationTable([legacy]) + before = len(table.items) + legacy_key = (legacy["PK"], legacy["SK"]) + + report = backfill_repo_axis(table) + + # in place: no new rows, same primary key, now sharded + assert len(table.items) == before + updated = table.items[0] + assert (updated["PK"], updated["SK"]) == legacy_key + assert updated["gsi1pk"].startswith(f"REPO#{_finding().repo.full_name}#SHARD#") + assert updated["repoAxisVersion"] == 2 + assert table.update_calls == [{"PK": legacy["PK"], "SK": legacy["SK"]}] + counts = report.by_entity["FINDING_OBSERVATION"] + assert (counts.inventory, counts.backfilled, counts.remaining) == (1, 1, 0) + + +def test_backfill_report_counts_per_entity_and_clears_gate(): + from security_scanner.storage.adapters.nosql_db.repo_axis_migration import ( + backfill_repo_axis, + ) + + identity, observation, state = finding_to_items(_finding()) + legacy_rows = [ + _to_legacy(identity), + _to_legacy(observation), + _to_legacy(state), + 
_to_legacy(finding_state_event_to_item(_state_event())), + _to_legacy(scan_ledger_entry_to_item(_ledger_entry())), + _to_legacy(ghas_alert_to_item(_ghas_alert())), + ] + # an already-sharded row must not be counted as legacy inventory + already_sharded = ghas_alert_to_item(_ghas_alert()) + already_sharded["PK"] = "GHAS_ALERT#ghas_002" + table = _MigrationTable([*legacy_rows, already_sharded]) + + report = backfill_repo_axis(table) + + for entity_type in ( + "FINDING", + "FINDING_OBSERVATION", + "FINDING_STATE", + "STATE_EVENT", + "SCAN_LEDGER", + "GHAS_ALERT", + ): + counts = report.by_entity[entity_type] + assert (counts.inventory, counts.backfilled, counts.skipped, counts.failed) == ( + 1, + 1, + 0, + 0, + ), entity_type + assert counts.remaining == 0, entity_type + assert report.gate_clear is True + + +def test_backfill_classifies_skipped_and_failed(): + from security_scanner.storage.adapters.nosql_db.repo_axis_migration import ( + backfill_repo_axis, + ) + + legacy = _to_legacy(ghas_alert_to_item(_ghas_alert())) + + class _SkipTable(_MigrationTable): + def update_item(self, **kwargs): + self.update_calls.append(kwargs["Key"]) + raise self.ConditionalCheckFailedException() + + class _FailTable(_MigrationTable): + def update_item(self, **kwargs): + self.update_calls.append(kwargs["Key"]) + raise RuntimeError("dynamo unavailable") + + skip_report = backfill_repo_axis(_SkipTable([legacy])) + fail_report = backfill_repo_axis(_FailTable([legacy])) + + skip = skip_report.by_entity["GHAS_ALERT"] + assert (skip.backfilled, skip.skipped, skip.failed, skip.remaining) == (0, 1, 0, 1) + assert skip_report.gate_clear is False + + fail = fail_report.by_entity["GHAS_ALERT"] + assert (fail.backfilled, fail.skipped, fail.failed, fail.remaining) == (0, 0, 1, 1) + + +def test_backfilled_projection_matches_write_mapper_for_every_entity(): + # Drift guard: a backfilled legacy row must land on the exact same sharded key + # a fresh write would produce, so reads see one logical row, 
not two. + from security_scanner.storage.adapters.nosql_db.repo_axis_migration import ( + repo_axis_projection_for_item, + ) + + identity, observation, state = finding_to_items(_finding()) + mapper_items = [ + identity, + observation, + state, + finding_state_event_to_item(_state_event()), + scan_ledger_entry_to_item(_ledger_entry()), + ghas_alert_to_item(_ghas_alert()), + ] + for sharded in mapper_items: + expected = { + "gsi1pk": sharded["gsi1pk"], + "gsi1sk": sharded["gsi1sk"], + "repoAxisVersion": sharded["repoAxisVersion"], + "repoAxisShardCount": sharded["repoAxisShardCount"], + "repoAxisShard": sharded["repoAxisShard"], + } + assert repo_axis_projection_for_item(_to_legacy(sharded)) == expected, sharded[ + "entityType" + ] + + +def test_inventory_counts_legacy_rows_without_mutating(): + from security_scanner.storage.adapters.nosql_db.repo_axis_migration import ( + inventory_legacy_repo_axis, + ) + + legacy = _to_legacy(ghas_alert_to_item(_ghas_alert())) + table = _MigrationTable([legacy]) + + inventory = inventory_legacy_repo_axis(table) + + assert inventory["GHAS_ALERT"] == 1 + assert inventory["FINDING"] == 0 + assert table.update_calls == [] + + +def test_backfill_is_idempotent_on_rerun(): + from security_scanner.storage.adapters.nosql_db.repo_axis_migration import ( + backfill_repo_axis, + ) + + identity, observation, state = finding_to_items(_finding()) + table = _MigrationTable( + [_to_legacy(identity), _to_legacy(observation), _to_legacy(state)] + ) + + first = backfill_repo_axis(table) + updates_after_first = len(table.update_calls) + second = backfill_repo_axis(table) + + assert first.gate_clear and second.gate_clear + # second pass finds no legacy rows: zero inventory, zero new update_item calls + for entity_type in ("FINDING", "FINDING_OBSERVATION", "FINDING_STATE"): + c2 = second.by_entity[entity_type] + assert (c2.inventory, c2.backfilled, c2.skipped, c2.remaining) == (0, 0, 0, 0) + assert len(table.update_calls) == updates_after_first + + +def 
test_migration_fake_enforces_attribute_exists_pk_clause(): + # Fidelity: a conditional update against a missing primary item must fail the + # condition (attribute_exists(PK)), exactly as real DynamoDB would. + table = _MigrationTable([]) + with __import__("pytest").raises(_MigrationTable.ConditionalCheckFailedException): + table.update_item( + Key={"PK": "FINDING#missing", "SK": "META"}, + UpdateExpression="SET gsi1pk = :pk", + ConditionExpression=( + "attribute_exists(PK) AND attribute_exists(SK) AND " + "attribute_not_exists(repoAxisVersion)" + ), + ExpressionAttributeValues={":pk": "REPO#x#SHARD#00"}, + ) + + +def test_reader_canonical_order_breaks_ties_on_pk_then_sk(): + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + table = _FakeGsiTable() + # identical gsi1sk; order must then fall back to PK, then SK + table.add(**_sharded_item(repo, "FINDING#f", "m1", PK="FINDING#b", SK="META")) + table.add(**_sharded_item(repo, "FINDING#f", "m2", PK="FINDING#a", SK="S2")) + table.add(**_sharded_item(repo, "FINDING#f", "m3", PK="FINDING#a", SK="S1")) + + result = read_repo_axis(table, repo_axis_id=repo, gsi1sk_prefix="FINDING#") + + assert [(it["PK"], it["SK"]) for it in result] == [ + ("FINDING#a", "S1"), + ("FINDING#a", "S2"), + ("FINDING#b", "META"), + ] + + +class _PaginatedGsiTable(_FakeGsiTable): + """Fan-out table that paginates one shard partition across two pages.""" + + def __init__(self, paginated_pk: str) -> None: + super().__init__() + self._paginated_pk = paginated_pk + + def query(self, **kwargs) -> dict: + values = kwargs["ExpressionAttributeValues"] + pk = values[":pk"] + self.queried_pks.append(pk) + prefix = values.get(":sk_prefix") + matched = [ + item + for item in self.items + if item.get("gsi1pk") == pk + and (prefix is None or str(item.get("gsi1sk", "")).startswith(prefix)) + ] + if pk == self._paginated_pk and "ExclusiveStartKey" not in kwargs: + # first page: return 
half + a continuation cursor + return {"Items": matched[:1], "LastEvaluatedKey": {"PK": "cursor"}} + if pk == self._paginated_pk: + assert kwargs["ExclusiveStartKey"] == {"PK": "cursor"} + return {"Items": matched[1:]} + return {"Items": matched} + + +def test_reader_threads_pagination_within_a_single_shard(): + from security_scanner.storage.adapters.nosql_db.repo_axis_reader import ( + read_repo_axis, + ) + + repo = "fake-org/fake-repo" + # place two rows in the same shard so one partition spans two query pages + a = _sharded_item(repo, "RUN#r#OBS#f1#o1", "f1", PK="RUN#r", SK="OBS#f1#o1") + b = _sharded_item(repo, "RUN#r#OBS#f2#o2", "f2", PK="RUN#r", SK="OBS#f2#o2") + # force both into the same bucket by reusing one shard partition key + shard_pk = a["gsi1pk"] + b["gsi1pk"] = shard_pk + b["repoAxisShard"] = a["repoAxisShard"] + table = _PaginatedGsiTable(paginated_pk=shard_pk) + table.add(**a) + table.add(**b) + + result = read_repo_axis(table, repo_axis_id=repo, gsi1sk_prefix="RUN#") + + assert {(it["PK"], it["SK"]) for it in result} == { + ("RUN#r", "OBS#f1#o1"), + ("RUN#r", "OBS#f2#o2"), + }
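Reviewer note (illustrative, not part of the patch): the reader tests above pin the contract of `read_repo_axis` (fan-out over every shard, optional legacy partition, per-partition pagination, dedupe by (PK, SK) preferring the higher `repoAxisVersion`, canonical (gsi1sk, PK, SK) order, fail-closed on a shard error) without showing its body in this hunk. A minimal sketch of a reader that would satisfy those tests is given below. It assumes a boto3-style `query` that accepts `IndexName`, a string `KeyConditionExpression`, and `ExclusiveStartKey`. The name `read_repo_axis_sketch`, the `GSI1` index name, the key-condition string, and the `FakeTable` double are hypothetical; the real implementation lives in `repo_axis_reader.py` and may differ.

```python
from __future__ import annotations

from typing import Any

SHARD_COUNT = 16  # mirrors the REPO_AXIS_SHARD_COUNT value pinned by the tests


def read_repo_axis_sketch(
    table: Any,
    *,
    repo_axis_id: str,
    gsi1sk_prefix: str,
    include_legacy: bool = False,
) -> list[dict]:
    """Scatter-gather read over the sharded repo axis (hypothetical sketch)."""
    partitions = [f"REPO#{repo_axis_id}#SHARD#{n:02d}" for n in range(SHARD_COUNT)]
    if include_legacy:
        # migration-only fallback: the unsharded pre-#23 partition
        partitions.append(f"REPO#{repo_axis_id}")

    best: dict[tuple[str, str], dict] = {}
    for pk in partitions:
        cursor = None
        while True:
            kwargs: dict[str, Any] = {
                "IndexName": "GSI1",  # assumed index name
                "KeyConditionExpression": (
                    "gsi1pk = :pk AND begins_with(gsi1sk, :sk_prefix)"
                ),
                "ExpressionAttributeValues": {":pk": pk, ":sk_prefix": gsi1sk_prefix},
            }
            if cursor:
                kwargs["ExclusiveStartKey"] = cursor
            # No try/except here: an error on any shard propagates (fail closed).
            page = table.query(**kwargs)
            for item in page.get("Items", []):
                key = (item["PK"], item["SK"])
                kept = best.get(key)
                # Legacy rows carry no repoAxisVersion and are treated as version 0.
                if kept is None or item.get("repoAxisVersion", 0) > kept.get(
                    "repoAxisVersion", 0
                ):
                    best[key] = item
            cursor = page.get("LastEvaluatedKey")
            if not cursor:
                break
    return sorted(best.values(), key=lambda it: (it["gsi1sk"], it["PK"], it["SK"]))


class FakeTable:
    """Tiny in-memory double, in the spirit of _FakeGsiTable in the tests."""

    def __init__(self, items: list[dict]) -> None:
        self.items = items

    def query(self, **kwargs: Any) -> dict:
        values = kwargs["ExpressionAttributeValues"]
        return {
            "Items": [
                item
                for item in self.items
                if item["gsi1pk"] == values[":pk"]
                and item["gsi1sk"].startswith(values[":sk_prefix"])
            ]
        }


rows = [
    {"gsi1pk": "REPO#r#SHARD#03", "gsi1sk": "RUN#1", "PK": "RUN#1", "SK": "OBS#a",
     "repoAxisVersion": 2},
    {"gsi1pk": "REPO#r", "gsi1sk": "RUN#1", "PK": "RUN#1", "SK": "OBS#a"},
    {"gsi1pk": "REPO#r", "gsi1sk": "RUN#0", "PK": "RUN#0", "SK": "OBS#b"},
]
merged = read_repo_axis_sketch(
    FakeTable(rows), repo_axis_id="r", gsi1sk_prefix="RUN#", include_legacy=True
)
steady_state = read_repo_axis_sketch(
    FakeTable(rows), repo_axis_id="r", gsi1sk_prefix="RUN#"
)
```

Because no exception is caught inside the loop, one failing shard aborts the whole read instead of returning a partial result, which is the fail-closed behaviour the tests require. Sorting after the merge keeps the order independent of which shard answered first.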