-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfinding_query.py
More file actions
69 lines (55 loc) · 2.13 KB
/
Copy pathfinding_query.py
File metadata and controls
69 lines (55 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""Finding query runtime use case."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
from security_scanner.core.finding.model import (
Finding,
filter_findings_by_disposition,
)
from security_scanner.storage.base import FindingReader
from security_scanner.storage.adapters.nosql_db.transport import (
DynamoDbCompatibleConfig,
)
from security_scanner.storage.factory import create_finding_store
# Signature of the store-factory seam: in this module it is called with the
# normalized backend name plus backend-specific keyword arguments
# (``jsonl_path`` or ``dynamodb_config``) and returns a ``FindingReader``.
StoreFactory = Callable[..., FindingReader]
@dataclass(frozen=True)
class FindingQueryRequest:
    """Immutable description of one findings read against a storage backend.

    Only ``storage_backend`` is required; every other field defaults to
    ``None``.
    """

    # Name of the storage backend to read from.
    storage_backend: str

    # Path of the JSONL file, for the JSONL backend.
    jsonl_path: str | Path | None = None

    # Scan run whose findings should be read.
    scan_run_id: str | None = None

    # Connection settings for the DynamoDB-compatible backend.
    dynamodb_config: DynamoDbCompatibleConfig | None = None

    # Optional disposition filter (FR-11). None means "no filter"; otherwise
    # only findings whose disposition is in this collection are returned.
    # This is the read-API seam that M7 consumes to power the dashboard's
    # disposition filter.
    dispositions: Sequence[str] | None = None
def _reader_for_request(
    request: FindingQueryRequest,
    store_factory: StoreFactory,
) -> FindingReader:
    """Build the reader that serves *request* by calling *store_factory*.

    The backend name is normalized first. The ``"jsonl"`` backend receives
    the request's ``jsonl_path``; any other backend name is passed through
    together with the request's ``dynamodb_config``.
    """
    backend_name = _normalize_backend(request.storage_backend)

    # Pick the single backend-specific keyword argument, then call once.
    if backend_name == "jsonl":
        factory_kwargs = {"jsonl_path": request.jsonl_path}
    else:
        factory_kwargs = {"dynamodb_config": request.dynamodb_config}

    return store_factory(backend_name, **factory_kwargs)
def _normalize_backend(backend: str) -> str:
return backend.lower().replace("_", "-")
def read_findings(
    request: FindingQueryRequest,
    *,
    store: FindingReader | None = None,
    store_factory: StoreFactory = create_finding_store,
) -> list[Finding]:
    """Read findings for a query request through the explicit reader seam.

    Args:
        request: Describes the backend to read from, plus the optional
            scan-run scope and disposition filter.
        store: A ready-made reader to use as-is. When ``None``, a reader is
            built from ``request`` via ``store_factory``.
        store_factory: Callable used to build the reader when ``store`` is
            not supplied.

    Returns:
        The findings of ``request.scan_run_id`` when it is set (non-empty),
        otherwise all findings. When ``request.dispositions`` is not
        ``None`` the result is additionally passed through
        ``filter_findings_by_disposition``.
    """
    # Compare against None instead of relying on truthiness: a reader that
    # defines __len__/__bool__ can be falsy, and ``store or ...`` would then
    # silently discard the injected reader in favour of a factory-built one.
    if store is not None:
        reader = store
    else:
        reader = _reader_for_request(request, store_factory)

    # An empty scan_run_id is treated the same as None: read everything.
    if request.scan_run_id:
        findings = reader.read_for_scan_run(request.scan_run_id)
    else:
        findings = reader.read_all()

    # An empty sequence is a real filter; only None disables it.
    if request.dispositions is not None:
        findings = filter_findings_by_disposition(findings, request.dispositions)
    return findings