44 changes: 40 additions & 4 deletions docs/views/source-scan-results-nosql-schema.md
@@ -20,18 +20,44 @@ The schema starts from questions rather than from the shape of the data.
| --- | --- | --- |
| `REPO_META` | Current information about the target repository and a summary of recent scans | Target list |
| `SCAN_RUN` | Summary of a single scan run | Run history |
| `FINDING` | Finding observed in a specific run | Detailed review |
| `FINDING` | Deduplicated finding identity | Identity lookup |
| `FINDING_OBSERVATION` | Finding snapshot observed in a specific scan run | Detailed review |
| `FINDING_STATE` | Lifecycle/triage state of a deduplicated finding | Re-review and gate |
| `EVAL_RUN` | Synthetic corpus evaluation results | Quality tracking |

## CORE item shape

The current implementation scope covers only CORE rows.

| Entity | PK | SK | Key content |
| --- | --- | --- | --- |
| `REPO_META` | `REPO#<repoKey>` | `META` | Repository metadata and recent scan summary |
| `SCAN_RUN` | `REPO#<repoKey>` | `SCAN_RUN#<scanAtIso>#<scanRunId>` | Scan run summary and artifact pointer |
| `FINDING` | `FINDING#<findingId>` | `META` | Identity fields such as repo, rule, source tool, location, and fingerprint |
| `FINDING_OBSERVATION` | `RUN#<scanRunId>` | `OBS#<findingId>#<occurrenceKey>` | Run-scoped finding snapshot |
| `FINDING_STATE` | `FINDING#<findingId>` | `STATE#GLOBAL` | Status and triage lifecycle state |

The `FINDING` identity row holds neither scanner evidence snapshots nor triage state.
Per-scan-run evidence snapshots live in `FINDING_OBSERVATION`, and a runtime read
reconstructs a `Finding` by overlaying `FINDING_STATE` on the observation snapshot.
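
The overlay step can be sketched roughly as follows. This is a minimal illustration, not the repository's `merge_finding_states` implementation: `overlay_state` is a hypothetical helper, and it assumes that `status` and `triage` are the only lifecycle fields taken from the state row.

```python
from __future__ import annotations

from typing import Any


def overlay_state(
    snapshot: dict[str, Any],
    state: dict[str, Any] | None,
) -> dict[str, Any]:
    """Return a copy of the snapshot with lifecycle fields taken from state."""
    merged = dict(snapshot)  # never mutate the stored observation snapshot
    if state is not None:
        # Assumption: only these two fields are lifecycle-owned.
        for key in ("status", "triage"):
            if key in state:
                merged[key] = state[key]
    return merged


snapshot = {"findingId": "f-1", "status": "open", "triage": {"verdict": None}}
state = {
    "findingId": "f-1",
    "status": "resolved",
    "triage": {"verdict": "false_positive"},
}
merged = overlay_state(snapshot, state)
```

In this sketch a snapshot without a matching state row is returned unchanged, so evidence stays run-scoped while the lifecycle fields follow the deduplicated finding.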

`occurrenceKey` is a deterministic hash of the redacted canonical observation identity.
Its material is `repo`, `ruleId`, `sourceTool`, `file`, `startLine`, and `fingerprint` by
default, and `secretHash` and `matchHash` may be used as redacted fallbacks. Raw
secrets and raw match strings are never put into the occurrence key material.
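
The determinism claim can be illustrated with plain dicts standing in for the `Finding` model. The helper name `occurrence_key` and the sample values are hypothetical; the canonical JSON encoding plus truncated SHA-256 mirrors the approach described here, under the assumption that missing fields are dropped before hashing.

```python
import hashlib
import json
from typing import Any


def occurrence_key(material: dict[str, Any]) -> str:
    """Hash redacted identity fields into a stable key (hypothetical helper)."""
    # Drop missing fields so an absent value and an explicit None hash the same.
    present = {name: value for name, value in material.items() if value is not None}
    # Sorted keys and fixed separators make the JSON encoding canonical.
    encoded = json.dumps(present, sort_keys=True, separators=(",", ":"))
    return "occ_" + hashlib.sha256(encoded.encode("utf-8")).hexdigest()[:32]


first = occurrence_key(
    {"repo": "org/app", "ruleId": "aws-key", "file": "a.py", "startLine": 3}
)
second = occurrence_key(
    {
        "startLine": 3,
        "file": "a.py",
        "ruleId": "aws-key",
        "repo": "org/app",
        "fingerprint": None,
    }
)
```

Both calls produce the same key because field order and `None` values do not affect the encoded material, which is what makes repeated observation writes land on the same sort key.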

`FINDING_STATE` currently uses only the `GLOBAL` scope. A scan write creates the
default state only when no state row exists, and it does not blindly overwrite an
existing manual triage verdict/verifier/reason. Observation writes are handled
idempotently and separately from state.
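
The create-if-absent rule can be sketched with an in-memory stand-in for the table. `InMemoryTable` and `put_if_absent` are hypothetical names used only for illustration; against a DynamoDB-compatible table the same effect would come from a conditional put such as `attribute_not_exists(PK)`.

```python
from typing import Any


class InMemoryTable:
    """Hypothetical stand-in for a table that supports conditional puts."""

    def __init__(self) -> None:
        self.rows: dict[tuple[str, str], dict[str, Any]] = {}

    def put_if_absent(self, item: dict[str, Any]) -> bool:
        """Store the item only when no row exists for its (PK, SK) key."""
        key = (item["PK"], item["SK"])
        if key in self.rows:
            return False  # keep the existing row, including manual triage
        self.rows[key] = item
        return True


table = InMemoryTable()
key = {"PK": "FINDING#f-1", "SK": "STATE#GLOBAL"}

created = table.put_if_absent({**key, "status": "open"})
# A reviewer updates the state by hand between two scans.
table.rows[("FINDING#f-1", "STATE#GLOBAL")]["status"] = "false_positive"
# The next scan tries to write the default state again and is ignored.
created_again = table.put_if_absent({**key, "status": "open"})
```

The second write is a no-op, so the manually set status survives the rescan.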

## Lookup criteria

| What you want to know | Access method |
| --- | --- |
| Recent target list | Query the repo list index page by page |
| Scan history per target | Query only scan runs in the target partition |
| Findings of a specific run | Query findings in the scan run partition |
| Finding state | Look up the per-finding state item |
| Findings of a specific run | Query `OBS#` items in the scan run partition |
| Finding state | Look up the per-finding `STATE#GLOBAL` item |
| Report/gate decision | Compute after merging lifecycle state into the finding snapshot |
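
The key shapes behind these lookups can be sketched as small builders. The function names and the returned dict layout are hypothetical; only the `REPO#`, `SCAN_RUN#`, `RUN#`, `OBS#`, `FINDING#`, and `STATE#GLOBAL` prefixes come from the schema table.

```python
def scan_runs_for_repo(repo_key: str) -> dict[str, str]:
    """Partition key and sort-key prefix for one target's scan history."""
    return {"pk": f"REPO#{repo_key}", "sk_prefix": "SCAN_RUN#"}


def observations_for_run(scan_run_id: str) -> dict[str, str]:
    """Partition key and sort-key prefix for the findings of one scan run."""
    return {"pk": f"RUN#{scan_run_id}", "sk_prefix": "OBS#"}


def state_key(finding_id: str) -> dict[str, str]:
    """Exact key of the GLOBAL-scope lifecycle state row for one finding."""
    return {"PK": f"FINDING#{finding_id}", "SK": "STATE#GLOBAL"}


history = scan_runs_for_repo("org/app")
observations = observations_for_run("run-42")
state = state_key("f-1")
```

The first two are prefix queries within a single partition, while the state lookup is an exact-key read, which is why it can be batched across findings.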

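## Safety rules
@@ -42,6 +68,16 @@ The schema starts from questions rather than from the shape of the data.
- Actual external exports, private findings, and DB dumps are kept outside this repository.
- TTL, streams, transactions, and production DynamoDB behavior are not baseline requirements at this time.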

## Current non-goals

The following rows/tables are outside the scope of the CORE schema split.

- `FindingFingerprintMap`
- `ScanRunQueryRows`
- `PatternQueryRows`
- standalone Artifacts table/item
- TTL, streams, Lambda, production DynamoDB behavior

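## Local runtime environment

Dynalite is the candidate for local verification. DynamoDB Local and LocalStack can be considered when checking parity or adapter integration, but they are not the current default operational decision.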
9 changes: 7 additions & 2 deletions src/security_scanner/storage/adapters/nosql_db/access.py
@@ -16,10 +16,15 @@


def items_to_findings(items: Iterable[dict[str, Any]]) -> list[Finding]:
    """Return Finding objects from run-scoped finding items."""
    """Return Finding objects from observation items."""
    findings: list[Finding] = []
    for item in items:
        if item.get("entityType") == "FINDING" and "finding" in item:
        if (
            item.get("entityType") == "FINDING_OBSERVATION"
            and "findingSnapshot" in item
        ):
            findings.append(Finding.from_dict(item["findingSnapshot"]))
        elif item.get("entityType") == "FINDING" and "finding" in item:
            findings.append(Finding.from_dict(item["finding"]))
    return findings

84 changes: 73 additions & 11 deletions src/security_scanner/storage/adapters/nosql_db/items.py
@@ -3,6 +3,8 @@
from __future__ import annotations

import datetime as dt
import hashlib
import json
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Iterable
@@ -161,37 +163,92 @@ def scan_target_to_item(target: ScanTarget) -> dict[str, Any]:

def scan_target_from_item(item: dict[str, Any]) -> ScanTarget:
    """Reconstruct a scan target from a table item."""
    return ScanTarget(url=item["url"], name=item["name"], enabled=bool(item.get("enabled", True)))
    return ScanTarget(
        url=item["url"],
        name=item["name"],
        enabled=bool(item.get("enabled", True)),
    )


STATE_SCOPE_GLOBAL = "GLOBAL"


def occurrence_key_for_finding(finding: Finding) -> str:
    """Return a deterministic redacted observation occurrence key."""
    material: dict[str, Any] = {
        "repo": finding.repo.full_name,
        "ruleId": finding.rule_id,
        "sourceTool": finding.source_tool,
        "file": finding.location.file_path,
        "startLine": finding.location.line_start,
        "fingerprint": finding.fingerprint,
    }
    if not finding.fingerprint:
        material["secretHash"] = finding.evidence.secret_hash
        if finding.gitleaks and finding.gitleaks.match:
            material["matchHash"] = hashlib.sha256(
                finding.gitleaks.match.encode("utf-8")
            ).hexdigest()
    encoded = json.dumps(without_none(material), sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(encoded.encode("utf-8")).hexdigest()[:32]
    return f"occ_{digest}"


def finding_to_items(finding: Finding) -> list[dict[str, Any]]:
    """Map one Finding into run-scoped and lifecycle state table items."""
    """Map one Finding into identity, observation, and lifecycle state items."""
    now = now_iso()
    finding_id = finding.finding_id
    repo = finding.repo.full_name
    run_id = finding.scan.scan_run_id
    rule_id = finding.rule_id
    occurrence_key = occurrence_key_for_finding(finding)

    run_sort = f"FINDING#{repo}#{finding_id}"
    run_item = {
        "PK": f"RUN#{run_id}",
        "SK": run_sort,
    identity_item = {
        "PK": f"FINDING#{finding_id}",
        "SK": "META",
        "entityType": "FINDING",
        "gsi1pk": f"REPO#{repo}",
        "gsi1sk": f"RUN#{run_id}#{run_sort}",
        "gsi1sk": f"FINDING#{finding_id}",
        "gsi2pk": f"RULE#{rule_id}",
        "gsi2sk": f"FINDING#{finding_id}",
        "createdAt": now,
        "updatedAt": now,
        "findingId": finding_id,
        "repo": repo,
        "ruleId": rule_id,
        "sourceTool": finding.source_tool,
        "sourceToolVersion": finding.source_tool_version,
        "category": finding.category,
        "severity": finding.severity,
        "confidence": finding.confidence,
        "file": finding.location.file_path,
        "startLine": finding.location.line_start,
        "fingerprint": finding.fingerprint,
    }
    observation_item = {
        "PK": f"RUN#{run_id}",
        "SK": f"OBS#{finding_id}#{occurrence_key}",
        "entityType": "FINDING_OBSERVATION",
        "gsi1pk": f"REPO#{repo}",
        "gsi1sk": f"RUN#{run_id}#OBS#{finding_id}#{occurrence_key}",
        "gsi2pk": f"RULE#{rule_id}",
        "gsi2sk": f"RUN#{run_id}#{repo}#{finding_id}",
        "gsi2sk": f"RUN#{run_id}#{repo}#{finding_id}#{occurrence_key}",
        "createdAt": now,
        "updatedAt": now,
        "findingId": finding_id,
        "scanRunId": run_id,
        "occurrenceKey": occurrence_key,
        "repo": repo,
        "ruleId": rule_id,
        "sourceTool": finding.source_tool,
        "file": finding.location.file_path,
        "startLine": finding.location.line_start,
        "fingerprint": finding.fingerprint,
        "finding": finding.to_dict(),
        "findingSnapshot": finding.to_dict(),
    }
    state_item = {
        "PK": f"FINDING#{finding_id}",
        "SK": "STATE",
        "SK": f"STATE#{STATE_SCOPE_GLOBAL}",
        "entityType": "FINDING_STATE",
        "gsi1pk": f"REPO#{repo}",
        "gsi1sk": f"FINDING#{finding_id}",
@@ -200,13 +257,18 @@ def finding_to_items(finding: Finding) -> list[dict[str, Any]]:
        "createdAt": now,
        "updatedAt": now,
        "findingId": finding_id,
        "stateScopeKey": STATE_SCOPE_GLOBAL,
        "repo": repo,
        "ruleId": rule_id,
        "fingerprint": finding.fingerprint,
        "status": finding.status,
        "triage": finding.triage.to_dict(),
    }
    return [run_item, state_item]
    return [
        without_none(identity_item),
        without_none(observation_item),
        without_none(state_item),
    ]


def scan_date(scan_at_iso: str) -> str:
69 changes: 60 additions & 9 deletions src/security_scanner/storage/adapters/nosql_db/store.py
@@ -18,6 +18,7 @@
)
from security_scanner.storage.adapters.nosql_db.items import (
    RepoMetadata,
    STATE_SCOPE_GLOBAL,
    ScanRunSummary,
    counts_by_category,
    finding_to_items,
@@ -79,7 +80,10 @@ def ensure_table(self) -> None:

    def append(self, finding: Finding) -> None:
        for item in finding_to_items(finding):
            self._table.put_item(Item=item)
            if item.get("entityType") == "FINDING_STATE":
                self._put_state_item_if_absent(item)
            else:
                self._table.put_item(Item=item)

    def extend(self, findings: Iterable[Finding]) -> None:
        for finding in findings:
@@ -183,23 +187,21 @@ def read_for_scan_run(self, scan_run_id: str) -> list[Finding]:
            KeyConditionExpression="PK = :pk AND begins_with(SK, :sk_prefix)",
            ExpressionAttributeValues={
                ":pk": f"RUN#{scan_run_id}",
                ":sk_prefix": "FINDING#",
                ":sk_prefix": "OBS#",
            },
        )
        findings = items_to_findings(items)
        state_by_id = {
            finding.finding_id: state
            for finding in findings
            if (state := self.read_finding_state(finding.finding_id)) is not None
        }
        state_by_id = self._batch_read_finding_states(
            finding.finding_id for finding in findings
        )
        return merge_finding_states(findings, state_by_id)

    def read_finding_state(self, finding_id: str) -> dict[str, Any] | None:
        response = self._table.query(
            KeyConditionExpression="PK = :pk AND begins_with(SK, :sk_prefix)",
            ExpressionAttributeValues={
                ":pk": f"FINDING#{finding_id}",
                ":sk_prefix": "STATE",
                ":sk_prefix": "STATE#",
            },
            Limit=1,
        )
@@ -210,7 +212,7 @@ def read_all(self) -> list[Finding]:
        finding_items = scan_all_pages(
            self._table,
            FilterExpression="entityType = :entity_type",
            ExpressionAttributeValues={":entity_type": "FINDING"},
            ExpressionAttributeValues={":entity_type": "FINDING_OBSERVATION"},
        )
        state_items = scan_all_pages(
            self._table,
@@ -227,3 +229,52 @@ def clear(self) -> None:
            "DynamoDbCompatibleFindingStore refuses destructive clear(); "
            "delete/recreate the local table explicitly instead"
        )

    def _put_state_item_if_absent(self, item: dict[str, Any]) -> None:
        """Create lifecycle state only when manual triage has no row yet."""
        try:
            self._table.put_item(
                Item=item,
                ConditionExpression="attribute_not_exists(PK) AND attribute_not_exists(SK)",
            )
        except Exception as exc:
            if _is_conditional_check_failure(exc):
                return
            raise

    def _batch_read_finding_states(
        self,
        finding_ids: Iterable[str],
    ) -> dict[str, dict[str, Any]]:
        """Fetch lifecycle state rows without one query per finding."""
        unique_finding_ids = list(dict.fromkeys(finding_ids))
        state_by_id: dict[str, dict[str, Any]] = {}
        for start in range(0, len(unique_finding_ids), 100):
            keys = [
                {
                    "PK": f"FINDING#{finding_id}",
                    "SK": f"STATE#{STATE_SCOPE_GLOBAL}",
                }
                for finding_id in unique_finding_ids[start : start + 100]
            ]
            request_items = {self.config.table_name: {"Keys": keys}}
            while request_items:
                response = self._resource.batch_get_item(RequestItems=request_items)
                for item in response.get("Responses", {}).get(
                    self.config.table_name,
                    [],
                ):
                    if item.get("entityType") == "FINDING_STATE":
                        state_by_id[item["findingId"]] = item
                request_items = response.get("UnprocessedKeys", {})
        return state_by_id


def _is_conditional_check_failure(exc: Exception) -> bool:
    """Return True for DynamoDB conditional-write conflicts."""
    response = getattr(exc, "response", None)
    if isinstance(response, dict):
        error = response.get("Error", {})
        if error.get("Code") == "ConditionalCheckFailedException":
            return True
    return exc.__class__.__name__ == "ConditionalCheckFailedException"