-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_store.py
More file actions
57 lines (47 loc) · 2.06 KB
/
Copy path_store.py
File metadata and controls
57 lines (47 loc) · 2.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""Shared store-resolution helpers for CLI command modules.
Resolve an argparse namespace into a finding store, query result, or DynamoDB
config. Separated from ``_args`` (argparse-only) because these helpers depend on
storage and runtime modules. Kept out of ``cli.app`` so command modules can
share them without a back-reference to the app entrypoint.
"""
from __future__ import annotations
import argparse
from security_scanner.runtime.finding_query import FindingQueryRequest, read_findings
from security_scanner.storage.adapters.nosql_db.transport import (
DynamoDbCompatibleConfig,
)
from security_scanner.storage.factory import create_finding_store
def dynamodb_config_from_args(args: argparse.Namespace) -> DynamoDbCompatibleConfig:
env_config = DynamoDbCompatibleConfig.from_env()
return DynamoDbCompatibleConfig(
table_name=args.dynamodb_table or env_config.table_name,
endpoint_url=args.dynamodb_endpoint_url or env_config.endpoint_url,
region_name=args.dynamodb_region or env_config.region_name,
aws_access_key_id=env_config.aws_access_key_id,
aws_secret_access_key=env_config.aws_secret_access_key,
)
def store_from_args(args: argparse.Namespace):
kwargs = {}
if args.storage_backend == "jsonl":
kwargs["jsonl_path"] = getattr(args, "output", None) or getattr(
args, "findings", None
)
else:
kwargs["dynamodb_config"] = dynamodb_config_from_args(args)
return create_finding_store(args.storage_backend, **kwargs)
def read_findings_from_args(args: argparse.Namespace):
jsonl_path = None
dynamodb_config = None
if args.storage_backend == "jsonl":
jsonl_path = getattr(args, "findings", None)
else:
dynamodb_config = dynamodb_config_from_args(args)
return read_findings(
FindingQueryRequest(
storage_backend=args.storage_backend,
jsonl_path=jsonl_path,
scan_run_id=getattr(args, "scan_run_id", None),
dynamodb_config=dynamodb_config,
),
store_factory=create_finding_store,
)