Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/security_scanner/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from security_scanner.cli.commands import (
doctor,
migrate,
quickstart,
report,
scan,
Expand All @@ -21,7 +22,16 @@
verify,
)

# Command modules that make up the CLI, with ``migrate`` appended last.
# NOTE(review): presumably iterated in this order when subcommands are
# registered -- confirm against build_parser before reordering.
_COMMAND_MODULES = (
    scan,
    report,
    verify,
    storage,
    doctor,
    quickstart,
    targets,
    migrate,
)


def build_parser() -> argparse.ArgumentParser:
Expand Down
81 changes: 81 additions & 0 deletions src/security_scanner/cli/commands/migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""backfill-repo-axis subcommand: migrate legacy REPO# GSI1 rows to shards (#23).

Repo-axis GSI1 rows written before issue #23 live on the unsharded
``REPO#<repo>`` partition. This command runs the in-place backfill that re-points
each existing primary item's GSI projection to a ``REPO#<repo>#SHARD#<bucket>``
partition. It mutates only the GSI key fields of rows that already exist; it never
copies rows or writes finding payload data.

``--dry-run`` reports the per-entity legacy inventory without mutating anything.
"""

from __future__ import annotations

import argparse
Comment thread
pureliture marked this conversation as resolved.
import sys

from security_scanner.cli._args import add_storage_args
from security_scanner.cli._store import dynamodb_config_from_args
from security_scanner.storage.adapters.nosql_db.repo_axis_migration import (
backfill_repo_axis,
inventory_legacy_repo_axis,
)
from security_scanner.storage.adapters.nosql_db.transport import (
make_boto3_resource_and_client,
)


def register(subparsers) -> None:
    """Attach the ``backfill-repo-axis`` subcommand to *subparsers*.

    The new parser receives the shared storage arguments (requested with
    ``default_backend="dynamodb"``) plus a ``--dry-run`` flag, and is bound to
    ``cmd_backfill_repo_axis`` through the ``func`` parser default.
    """
    backfill_parser = subparsers.add_parser(
        "backfill-repo-axis",
        help="Migrate legacy REPO# GSI1 rows to the sharded repo-axis layout (#23).",
    )
    add_storage_args(
        backfill_parser,
        include_jsonl_path="",
        default_backend="dynamodb",
    )

    dry_run_options = {
        "action": "store_true",
        "help": "Report legacy inventory per entity without mutating anything.",
    }
    backfill_parser.add_argument("--dry-run", **dry_run_options)

    backfill_parser.set_defaults(func=cmd_backfill_repo_axis)


def _table_from_args(args: argparse.Namespace):
    """Build the table handle for the dynamodb store described by *args*.

    The configuration is derived from the parsed CLI arguments; the table is
    looked up on the resource by the configured ``table_name``.
    """
    dynamo_config = dynamodb_config_from_args(args)
    # The factory returns a (resource, client) pair; only the resource is used.
    boto_resource, _unused_client = make_boto3_resource_and_client(dynamo_config)
    table_name = dynamo_config.table_name
    return boto_resource.Table(table_name)


def cmd_backfill_repo_axis(args: argparse.Namespace) -> int:
    """Run the repo-axis backfill, or only its inventory with ``--dry-run``.

    Returns:
        ``2`` when the selected storage backend is not ``dynamodb`` (no table
        is built in that case); ``0`` after a dry-run inventory, or after a
        backfill whose gate is clear and that recorded no failures; ``1``
        otherwise.
    """
    backend = args.storage_backend
    if backend != "dynamodb":
        print(
            "backfill-repo-axis requires --storage-backend dynamodb "
            f"(got '{backend}')",
            file=sys.stderr,
        )
        return 2

    table = _table_from_args(args)

    if args.dry_run:
        # Inventory only: count legacy rows per entity type, mutate nothing.
        legacy_counts = inventory_legacy_repo_axis(table)
        print("repo-axis legacy inventory (dry-run, no mutation):")
        for entity_type, count in legacy_counts.items():
            print(f" {entity_type}: {count}")
        total_legacy = sum(legacy_counts.values())
        print(f" total legacy rows: {total_legacy}")
        return 0

    report = backfill_repo_axis(table)
    print("repo-axis backfill report:")
    counter_names = ("inventory", "backfilled", "skipped", "failed", "remaining")
    for entity_type, counts in report.by_entity.items():
        summary = " ".join(
            f"{name}={getattr(counts, name)}" for name in counter_names
        )
        print(f" {entity_type}: {summary}")

    failures_present = any(entry.failed for entry in report.by_entity.values())
    # The gate only counts as clear when the report says so AND nothing failed.
    if not report.gate_clear or failures_present:
        print("gate: NOT CLEAR (legacy rows remain or failures occurred)")
        return 1
    print("gate: CLEAR (no legacy repo-axis rows remain)")
    return 0
1 change: 1 addition & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,4 +722,5 @@ def test_subcommand_registration_order_is_stable():
"enable-target",
"disable-target",
"sync",
"backfill-repo-axis",
]
172 changes: 172 additions & 0 deletions tests/test_cli_backfill_repo_axis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""Tests for the backfill-repo-axis CLI subcommand (issue #23)."""

from __future__ import annotations

from security_scanner.cli.app import main


class _FakeTable:
class ConditionalCheckFailedException(Exception):
pass

def __init__(self, items: list[dict]) -> None:
self.items = items
self.update_calls: list[dict] = []

def scan(self, **kwargs) -> dict:
entity_type = kwargs["ExpressionAttributeValues"][":entity_type"]
return {
"Items": [dict(i) for i in self.items if i.get("entityType") == entity_type]
}

def update_item(
self,
*,
Key: dict,
UpdateExpression: str,
ExpressionAttributeValues: dict,
ConditionExpression: str | None = None,
) -> dict:
self.update_calls.append(Key)
item = next(
(i for i in self.items if i["PK"] == Key["PK"] and i["SK"] == Key["SK"]),
None,
)
if item is None or (
ConditionExpression
and "attribute_not_exists(repoAxisVersion)" in ConditionExpression
and "repoAxisVersion" in item
):
raise self.ConditionalCheckFailedException()
for assignment in UpdateExpression.removeprefix("SET ").split(", "):
target, value_key = assignment.split(" = ", 1)
item[target] = ExpressionAttributeValues[value_key]
return {}


class _FakeResource:
def __init__(self, table: _FakeTable) -> None:
self._table = table

def Table(self, _name: str) -> _FakeTable: # noqa: N802 - boto3 API shape
return self._table


def _legacy_finding(repo: str = "fake-org/fake-repo") -> dict:
return {
"PK": "FINDING#f1",
"SK": "META",
"entityType": "FINDING",
"gsi1pk": f"REPO#{repo}",
"gsi1sk": "FINDING#f1",
"findingId": "f1",
"repo": repo,
}


def _legacy_ghas(repo: str = "fake-org/fake-repo") -> dict:
return {
"PK": "GHAS_ALERT#g1",
"SK": "META",
"entityType": "GHAS_ALERT",
"gsi1pk": f"REPO#{repo}",
"gsi1sk": "GHAS_ALERT#2026-06-16T00:00:00+00:00#g1",
"ghasAlertId": "g1",
"repository": repo,
"fetchedAt": "2026-06-16T00:00:00+00:00",
}


def _patch_table(monkeypatch, table: _FakeTable) -> None:
    """Make the migrate command build its table from *table* instead of boto3.

    Replaces ``make_boto3_resource_and_client`` in the migrate command module
    with a factory returning a fake resource and a placeholder client.
    """

    def _fake_factory(config):
        # Same (resource, client) pair shape; the client is a bare placeholder.
        return (_FakeResource(table), object())

    monkeypatch.setattr(
        "security_scanner.cli.commands.migrate.make_boto3_resource_and_client",
        _fake_factory,
    )


def test_dry_run_reports_inventory_without_mutating(monkeypatch, capsys):
    """``--dry-run`` prints per-entity legacy counts and issues no updates."""
    fake_table = _FakeTable([_legacy_finding(), _legacy_ghas()])
    _patch_table(monkeypatch, fake_table)
    argv = ["backfill-repo-axis", "--storage-backend", "dynamodb", "--dry-run"]

    status = main(argv)
    stdout = capsys.readouterr().out

    assert status == 0
    expected_fragments = (
        "dry-run",
        "FINDING: 1",
        "GHAS_ALERT: 1",
        "total legacy rows: 2",
    )
    for fragment in expected_fragments:
        assert fragment in stdout
    assert fake_table.update_calls == []  # no mutation


def test_apply_backfills_and_reports_gate_clear(monkeypatch, capsys):
    """A real run rewrites both legacy rows in place and reports a clear gate."""
    fake_table = _FakeTable([_legacy_finding(), _legacy_ghas()])
    _patch_table(monkeypatch, fake_table)

    status = main(["backfill-repo-axis", "--storage-backend", "dynamodb"])
    stdout = capsys.readouterr().out

    assert status == 0
    assert "gate: CLEAR" in stdout

    # both legacy rows updated in place to sharded keys
    touched_keys = {(key["PK"], key["SK"]) for key in fake_table.update_calls}
    assert touched_keys == {
        ("FINDING#f1", "META"),
        ("GHAS_ALERT#g1", "META"),
    }
    for row in fake_table.items:
        assert row["repoAxisVersion"] == 2
        assert "#SHARD#" in row["gsi1pk"]


def test_apply_is_idempotent_second_run_clear_with_no_updates(monkeypatch, capsys):
    """Running the backfill twice stays clear and issues no further updates."""
    fake_table = _FakeTable([_legacy_finding(), _legacy_ghas()])
    _patch_table(monkeypatch, fake_table)
    argv = ["backfill-repo-axis", "--storage-backend", "dynamodb"]

    first_status = main(argv)
    assert first_status == 0
    update_count_after_first = len(fake_table.update_calls)
    capsys.readouterr()  # discard first-run output

    second_status = main(argv)
    second_stdout = capsys.readouterr().out

    assert second_status == 0
    assert "gate: CLEAR" in second_stdout
    # nothing left to backfill
    assert len(fake_table.update_calls) == update_count_after_first


def test_rejects_non_dynamodb_backend(monkeypatch, capsys):
    """A jsonl backend is refused with exit code 2 before any table is built."""
    # must not even try to build a table for jsonl
    build_attempts = []

    def _boom(config):
        build_attempts.append(config)
        raise AssertionError("should not build a table for jsonl")

    monkeypatch.setattr(
        "security_scanner.cli.commands.migrate.make_boto3_resource_and_client",
        _boom,
    )

    status = main(["backfill-repo-axis", "--storage-backend", "jsonl"])
    stderr = capsys.readouterr().err

    assert status == 2
    assert "requires --storage-backend dynamodb" in stderr
    assert not build_attempts
Comment thread
pureliture marked this conversation as resolved.


def test_apply_reports_gate_not_clear_on_failure(monkeypatch, capsys):
    """When every write fails, the command exits 1 and reports a blocked gate."""

    class _FailingTable(_FakeTable):
        """Fake table whose ``update_item`` always raises."""

        def update_item(self, **kwargs):
            raise Exception("DynamoDB write failed")

    broken_table = _FailingTable([_legacy_finding()])
    _patch_table(monkeypatch, broken_table)

    status = main(["backfill-repo-axis", "--storage-backend", "dynamodb"])
    stdout = capsys.readouterr().out

    assert status == 1
    assert "gate: NOT CLEAR" in stdout
Loading