diff --git a/src/security_scanner/scanners/gitleaks/mapper.py b/src/security_scanner/scanners/gitleaks/mapper.py index 761fe87..2bcf58a 100644 --- a/src/security_scanner/scanners/gitleaks/mapper.py +++ b/src/security_scanner/scanners/gitleaks/mapper.py @@ -34,6 +34,7 @@ import logging from pathlib import Path +from typing import Any from security_scanner.core.finding.model import ( ConfidenceLevel, @@ -46,10 +47,93 @@ logger = logging.getLogger(__name__) # Default severity/confidence applied to all gitleaks findings. -# TODO(phase2): implement per-rule severity mapping based on gitleaks tags/rule metadata. DEFAULT_SEVERITY = Severity.HIGH.value DEFAULT_CONFIDENCE = ConfidenceLevel.MEDIUM.value +# Rule ID static mapping tables +RULE_SEVERITY_MAP = { + "aws-access-token": Severity.CRITICAL.value, + "gcp-api-key": Severity.CRITICAL.value, + "slack-webhook": Severity.HIGH.value, + "synthetic-fake-token": Severity.LOW.value, +} + +RULE_CONFIDENCE_MAP = { + "aws-access-token": ConfidenceLevel.HIGH.value, + "gcp-api-key": ConfidenceLevel.MEDIUM.value, + "slack-webhook": ConfidenceLevel.MEDIUM.value, + "synthetic-fake-token": ConfidenceLevel.LOW.value, +} + +VALID_SEVERITIES = {severity.value for severity in Severity} +VALID_CONFIDENCES = {confidence.value for confidence in ConfidenceLevel} + + +def _get_case_insensitive(data: dict, key: str, default: Any = None) -> Any: + if not isinstance(data, dict): + return default + if key in data: + return data[key] + key_lower = key.lower() + for k, v in data.items(): + if not isinstance(k, str): + continue + if k.lower() == key_lower: + return v + return default + + +def resolve_severity_and_confidence(rule_id: str | None, tags: list[str] | None) -> tuple[str, str]: + """Resolve severity and confidence based on tags, rule ID, and fallbacks. + + Priorities: + 1. Tags-based mapping (parse 'severity:' and 'confidence:', case-insensitive) + 2. Rule ID-based mapping (static lookup tables and 'aws-' prefix match for severity) + 3. 
def resolve_severity_and_confidence(rule_id: str | None, tags: list[str] | None) -> tuple[str, str]:
    """Derive the severity and confidence labels for one gitleaks finding.

    Severity and confidence are resolved independently; for each, the first
    source that yields a value wins:

    1. A ``severity:<level>`` / ``confidence:<level>`` tag. The key and the
       level are compared case-insensitively with surrounding whitespace
       stripped. Tags whose level is not a known enum value are ignored, and
       when several valid tags share a key the last one wins.
    2. The static rule tables, keyed by the exact rule ID. For severity only,
       any other rule ID starting with ``aws-`` resolves to CRITICAL.
    3. ``DEFAULT_SEVERITY`` / ``DEFAULT_CONFIDENCE``.

    Returns:
        A ``(severity, confidence)`` tuple.
    """
    recognised = {"severity": VALID_SEVERITIES, "confidence": VALID_CONFIDENCES}
    tagged: dict[str, str] = {}
    for tag in tags or ():
        if not isinstance(tag, str):
            continue
        label, colon, level = tag.partition(":")
        if not colon:
            continue
        label = label.strip().lower()
        level = level.strip().upper()
        if level in recognised.get(label, ()):
            # A later valid tag replaces an earlier one for the same key.
            tagged[label] = level

    # Only string rule IDs take part in the table / prefix lookups.
    rule = rule_id if isinstance(rule_id, str) else None

    severity = tagged.get("severity")
    if severity is None:
        if rule is not None and rule in RULE_SEVERITY_MAP:
            severity = RULE_SEVERITY_MAP[rule]
        elif rule is not None and rule.startswith("aws-"):
            severity = Severity.CRITICAL.value
        else:
            severity = DEFAULT_SEVERITY

    confidence = tagged.get("confidence")
    if confidence is None:
        confidence = RULE_CONFIDENCE_MAP.get(rule, DEFAULT_CONFIDENCE)

    return severity, confidence
normalize_report_path(file_val, source_root) + if file_path is None: + _warning("item %s has invalid or escaping File: %s", index, file_val) + return None + + start_line = _get_case_insensitive(item, "StartLine") + if start_line is None: + _warning("item %s missing StartLine, skipping", index) + return None + + raw_secret = _get_case_insensitive(item, "Secret", "") + line_end = _get_case_insensitive(item, "EndLine") + fingerprint_override = _get_case_insensitive(item, "Fingerprint") or None + triage_reason = _get_case_insensitive(item, "Description") or None + commit_val = _get_case_insensitive(item, "Commit") or None + match_val = _get_case_insensitive(item, "Match") + start_line_int = int(start_line) + if start_line_int < 1: + raise ValueError(f"StartLine must be >= 1, got {start_line_int}") + end_line_int = int(line_end) if line_end is not None else None - tags = item.get("Tags") + if end_line_int is not None: + if end_line_int < 1: + raise ValueError(f"EndLine must be >= 1, got {end_line_int}") + if end_line_int < start_line_int: + raise ValueError(f"EndLine ({end_line_int}) cannot be less than StartLine ({start_line_int})") + + tags = _get_case_insensitive(item, "Tags") + if isinstance(tags, (list, tuple, set)): + tags_list = [t for t in tags if isinstance(t, str)] + elif hasattr(tags, "__iter__") and not isinstance(tags, (str, bytes, dict)): + tags_list = [t for t in tags if isinstance(t, str)] + else: + tags_list = [] + + severity, confidence = resolve_severity_and_confidence(rule_id, tags_list) + rule_id_str = str(rule_id) if rule_id is not None else "" + gitleaks_payload = GitleaksFindingPayload( - rule_id=rule_id, + rule_id=rule_id_str, file=file_path, start_line=start_line_int, end_line=end_line_int, - match=item.get("Match"), + match=match_val, secret=raw_secret, fingerprint=fingerprint_override, - description=item.get("Description"), - commit=item.get("Commit"), - tags=list(tags) if isinstance(tags, list) else [], + description=triage_reason, + 
commit=commit_val, + tags=tags_list, ) return Finding.create( repo_full_name=repo_full_name, - rule_id=rule_id, + rule_id=rule_id_str, file_path=file_path, line_start=start_line_int, raw_secret=raw_secret, @@ -100,31 +214,71 @@ def map_gitleaks_item( scan_run_id=scan_run_id, rule_pack_version=rule_pack_version, line_end=end_line_int, - severity=DEFAULT_SEVERITY, - confidence=DEFAULT_CONFIDENCE, - repo_commit=item.get("Commit") or None, + severity=severity, + confidence=confidence, + repo_commit=commit_val, fingerprint_override=fingerprint_override, triage_reason=triage_reason, gitleaks=gitleaks_payload, ) - except (ValueError, TypeError) as exc: + except (ValueError, TypeError, AttributeError) as exc: _warning("failed to create Finding for item %s: %s", index, exc) return None -def normalize_report_path(file_path: str, source_root: Path | None) -> str: - """Return repo-relative path when Gitleaks emits an absolute path.""" - if not file_path or source_root is None: - return file_path +def normalize_report_path(file_path: Any, source_root: Path | None) -> str | None: + """Return repo-relative path when Gitleaks emits an absolute path. - path = Path(file_path) - if not path.is_absolute(): - return file_path + Returns None if file_path is invalid (e.g. invalid type or contains null bytes), + or if it escapes the repository root. 
+ """ + if not isinstance(file_path, str): + return None + if "\x00" in file_path: + return None + if not file_path: + return "" try: - return path.resolve().relative_to(source_root.resolve()).as_posix() - except ValueError: - return file_path + path = Path(file_path) + except (TypeError, ValueError): + return None + + # Check for path traversal escaping repo root (if source_root is provided) + if source_root is not None: + try: + resolved_root = source_root.resolve() + # If path is absolute, check if it's within source_root + if path.is_absolute(): + resolved_path = path.resolve() + relative = resolved_path.relative_to(resolved_root) + return relative.as_posix() + else: + # If path is relative, resolve it relative to source_root to ensure no escape + resolved_path = (resolved_root / path).resolve() + relative = resolved_path.relative_to(resolved_root) + return relative.as_posix() + except ValueError: + # Escapes source_root + return None + except (TypeError, RuntimeError, OSError): + return None + else: + # If source_root is None, we still must prevent escaping repository root. + # Absolute path (e.g. /etc/passwd) cannot be resolved to a relative path, so it escapes. + if path.is_absolute(): + return None + # Check if relative path contains '..' that goes outside the relative base. + try: + dummy_root = Path("/dummy/root").resolve() + resolved_path = (dummy_root / path).resolve() + relative = resolved_path.relative_to(dummy_root) + return relative.as_posix() + except ValueError: + # Escapes relative root (e.g. ../../etc/passwd) + return None + except (TypeError, RuntimeError, OSError): + return None def _warning(message: str, *args: object) -> None: diff --git a/tests/test_dynamic_mapping_e2e.py b/tests/test_dynamic_mapping_e2e.py new file mode 100644 index 0000000..1472485 --- /dev/null +++ b/tests/test_dynamic_mapping_e2e.py @@ -0,0 +1,486 @@ +"""E2E test suite for Gitleaks Scanner dynamic severity and confidence mapping. 
+ +Tests dynamic mapping logic (Tags-based, Rule ID-based, and Fallback) +from an opaque-box perspective by calling GitleaksScanner.scan(), run_local_scan(), +and CLI command entry points. +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +import pytest + +from security_scanner.core.finding.model import Finding, Severity, ConfidenceLevel +from security_scanner.scanners.gitleaks.parser import parse_gitleaks_report +from security_scanner.runtime.local_scan import ( + run_local_scan, + LocalScanRequest, +) +from security_scanner.cli.app import cmd_scan, cmd_gate +from security_scanner.storage.jsonl_store import JsonlFindingStore + + +# --------------------------------------------------------------------------- +# Test Setup & Helper Fakes +# --------------------------------------------------------------------------- + +class FakeRunner: + """Fake GitleaksRunner returning custom Gitleaks JSON reports.""" + def __init__(self, raw_json: str = "[]") -> None: + self.raw_json = raw_json + + def run(self, root: Path, scan_options=None) -> str: + return self.raw_json + + +def make_gitleaks_item( + rule_id: str, + tags: list[str] | None = None, + file: str = "src/main.py", + start_line: int = 1, + secret: str = "AKIAFAKEEXAMPLE000001", +) -> dict: + """Generate a single Gitleaks JSON item.""" + item = { + "RuleID": rule_id, + "File": file, + "StartLine": start_line, + "EndLine": start_line, + "Secret": secret, + "Match": f"secret = {secret}", + "Description": f"Detected {rule_id}", + "Fingerprint": f"fp-{rule_id}-{start_line}", + } + if tags is not None: + item["Tags"] = tags + return item + + +def run_scanner_with_report(report_items: list[dict]) -> list[Finding]: + """Execute parser directly with a fake report.""" + raw_json = json.dumps(report_items) + return parse_gitleaks_report( + raw_json, + repo_full_name="example/repo", + scan_run_id="scan_dynamic_mapping_test", + rule_pack_version="secret-rules-0.1.0", + 
def run_scanner_with_report(report_items: list[dict]) -> list[Finding]:
    """Serialise *report_items* as a Gitleaks report and parse it into Findings.

    The report is fed straight to ``parse_gitleaks_report`` with fixed repo,
    scan-run, rule-pack and source-root values shared by every test here.
    """
    parse_options = {
        "repo_full_name": "example/repo",
        "scan_run_id": "scan_dynamic_mapping_test",
        "rule_pack_version": "secret-rules-0.1.0",
        "source_root": Path("/tmp/fake-repo"),
        "source_tool": "gitleaks",
    }
    return parse_gitleaks_report(json.dumps(report_items), **parse_options)
class TestTier1FeatureCoverage:
    """Tier 1: every supported tag level and every mapped rule ID resolves as specified."""

    @staticmethod
    def _sole_finding(rule_id: str, tags: list[str] | None = None) -> Finding:
        """Scan a one-item report and return its single resulting finding."""
        produced = run_scanner_with_report([make_gitleaks_item(rule_id, tags=tags)])
        assert len(produced) == 1
        return produced[0]

    # --- Tags-based mapping: the level named in the tag is used ---

    def test_tags_severity_critical(self):
        finding = self._sole_finding("some-rule", ["severity:critical"])
        assert finding.severity == Severity.CRITICAL.value

    def test_tags_severity_high(self):
        finding = self._sole_finding("some-rule", ["severity:high"])
        assert finding.severity == Severity.HIGH.value

    def test_tags_severity_medium(self):
        finding = self._sole_finding("some-rule", ["severity:medium"])
        assert finding.severity == Severity.MEDIUM.value

    def test_tags_severity_low(self):
        finding = self._sole_finding("some-rule", ["severity:low"])
        assert finding.severity == Severity.LOW.value

    def test_tags_severity_info(self):
        finding = self._sole_finding("some-rule", ["severity:info"])
        assert finding.severity == Severity.INFO.value

    def test_tags_confidence_high(self):
        finding = self._sole_finding("some-rule", ["confidence:high"])
        assert finding.confidence == ConfidenceLevel.HIGH.value

    def test_tags_confidence_medium(self):
        finding = self._sole_finding("some-rule", ["confidence:medium"])
        assert finding.confidence == ConfidenceLevel.MEDIUM.value

    def test_tags_confidence_low(self):
        finding = self._sole_finding("some-rule", ["confidence:low"])
        assert finding.confidence == ConfidenceLevel.LOW.value

    # --- Rule ID-based mapping: no tags, so the static rule tables decide ---

    def test_rule_id_aws_access_token(self):
        finding = self._sole_finding("aws-access-token")
        assert finding.severity == Severity.CRITICAL.value
        assert finding.confidence == ConfidenceLevel.HIGH.value

    def test_rule_id_gcp_api_key(self):
        finding = self._sole_finding("gcp-api-key")
        assert finding.severity == Severity.CRITICAL.value

    def test_rule_id_slack_webhook(self):
        finding = self._sole_finding("slack-webhook")
        assert finding.severity == Severity.HIGH.value

    def test_rule_id_synthetic_fake_token(self):
        finding = self._sole_finding("synthetic-fake-token")
        assert finding.severity == Severity.LOW.value
        assert finding.confidence == ConfidenceLevel.LOW.value

    def test_rule_id_aws_prefix_match(self):
        """An unmapped rule ID that starts with 'aws-' resolves to CRITICAL."""
        finding = self._sole_finding("aws-secret-key")
        assert finding.severity == Severity.CRITICAL.value
class TestTier2BoundaryCases:
    """Tier 2: malformed or unusual tags and rule IDs fall back as specified."""

    @staticmethod
    def _sole_finding(item: dict) -> Finding:
        """Scan a report holding only *item* and return its single finding."""
        produced = run_scanner_with_report([item])
        assert len(produced) == 1
        return produced[0]

    @staticmethod
    def _assert_defaults(finding: Finding) -> None:
        """Check that both values fell through to the HIGH / MEDIUM defaults."""
        assert finding.severity == Severity.HIGH.value
        assert finding.confidence == ConfidenceLevel.MEDIUM.value

    # --- Tags boundary cases ---

    def test_tags_case_insensitivity(self):
        tags = ["Severity:Critical", "CONFIDENCE:LOW"]
        finding = self._sole_finding(make_gitleaks_item("some-rule", tags=tags))
        assert finding.severity == Severity.CRITICAL.value
        assert finding.confidence == ConfidenceLevel.LOW.value

    def test_tags_whitespace(self):
        tags = ["severity: high ", "confidence: medium "]
        finding = self._sole_finding(make_gitleaks_item("some-rule", tags=tags))
        assert finding.severity == Severity.HIGH.value
        assert finding.confidence == ConfidenceLevel.MEDIUM.value

    def test_tags_empty_and_none(self):
        report = [
            make_gitleaks_item("some-rule", tags=[]),
            make_gitleaks_item("some-rule", tags=None),
        ]
        produced = run_scanner_with_report(report)
        assert len(produced) == 2
        for finding in produced:
            # Neither item carries a usable tag, so both use the defaults.
            self._assert_defaults(finding)

    def test_tags_invalid_prefix(self):
        tags = ["sever:critical", "conf:high"]
        self._assert_defaults(self._sole_finding(make_gitleaks_item("some-rule", tags=tags)))

    def test_tags_unknown_level(self):
        tags = ["severity:super-critical", "confidence:very-high"]
        self._assert_defaults(self._sole_finding(make_gitleaks_item("some-rule", tags=tags)))

    def test_tags_missing_value(self):
        tags = ["severity:", "confidence:"]
        self._assert_defaults(self._sole_finding(make_gitleaks_item("some-rule", tags=tags)))

    # --- Rule ID boundary cases ---

    def test_rule_id_empty(self):
        self._assert_defaults(self._sole_finding(make_gitleaks_item("")))

    def test_rule_id_unknown(self):
        self._assert_defaults(self._sole_finding(make_gitleaks_item("unknown-rule-id")))

    def test_rule_id_whitespace(self):
        # Rule IDs are matched exactly, so a padded ID is expected to fall back.
        self._assert_defaults(self._sole_finding(make_gitleaks_item(" aws-access-token ")))

    def test_rule_id_none(self):
        item = make_gitleaks_item("aws-access-token")
        del item["RuleID"]  # simulate a report item with no RuleID key at all
        self._assert_defaults(self._sole_finding(item))

    def test_rule_id_integer(self):
        item = make_gitleaks_item("aws-access-token")
        item["RuleID"] = 12345  # non-string RuleID
        self._assert_defaults(self._sole_finding(item))
class TestTier3CrossFeatureCombinations:
    """Tier 3: interaction between tag-based and rule-ID-based resolution."""

    @staticmethod
    def _resolved(rule_id: str, tags: list[str]) -> tuple[str, str]:
        """Return ``(severity, confidence)`` of the single finding for one item."""
        produced = run_scanner_with_report([make_gitleaks_item(rule_id, tags=tags)])
        assert len(produced) == 1
        return produced[0].severity, produced[0].confidence

    def test_combo_tags_overrides_rule_id(self):
        """Tags mapping takes precedence over Rule ID mapping."""
        # The rule alone would give CRITICAL / HIGH; both tags say LOW.
        resolved = self._resolved("aws-access-token", ["severity:low", "confidence:low"])
        assert resolved == (Severity.LOW.value, ConfidenceLevel.LOW.value)

    def test_combo_tags_severity_and_rule_id_confidence(self):
        """Tags severity combines with Rule ID confidence if tag lacks confidence."""
        # Only severity is tagged, so confidence still comes from the rule table.
        resolved = self._resolved("aws-access-token", ["severity:low"])
        assert resolved == (Severity.LOW.value, ConfidenceLevel.HIGH.value)

    def test_combo_invalid_tags_falls_back_to_rule_id(self):
        """If tag parsing fails due to invalid values, Rule ID mapping is used."""
        # The severity tag names an unknown level, so the rule mapping applies.
        resolved = self._resolved("aws-access-token", ["severity:invalid"])
        assert resolved == (Severity.CRITICAL.value, ConfidenceLevel.HIGH.value)
class TestTier4RealWorldScenarios:
    """Tier 4: end-to-end scenarios through run_local_scan and the CLI commands."""

    @pytest.fixture
    def setup_manifest(self, tmp_path):
        """Create a one-target manifest and a small fake checkout under tmp_path.

        Returns:
            ``(manifest_path, target_dir)``.
        """
        target_dir = tmp_path / "fake-repo"
        target_dir.mkdir(parents=True, exist_ok=True)
        # Dummy file so the synthetic target has something in it.
        (target_dir / "app.py").write_text("API_KEY = 'AKIAFAKEEXAMPLE000001'")

        target_posix = target_dir.as_posix()
        manifest_path = tmp_path / "manifest.yaml"
        manifest_path.write_text(
            json.dumps(
                {
                    "version": 1,
                    "gitleaks_config": "",
                    "targets": [
                        {
                            "name": "example-repo",
                            "path": target_posix,
                            "url": f"file://{target_posix}",
                            "enabled": True,
                        }
                    ],
                }
            )
        )
        return manifest_path, target_dir

    @staticmethod
    def _scan_to_jsonl(manifest_path, destination, report_items):
        """Run run_local_scan with a stub scanner and read back the JSONL output.

        Returns:
            ``(scan_result, persisted_findings)``.
        """

        def mock_scanner_factory(manifest):
            class MockScanner:
                name = "gitleaks"

                def scan(self, **kwargs):
                    return run_scanner_with_report(report_items)

            return MockScanner()

        request = LocalScanRequest(
            manifest_path=manifest_path,
            output_destination=str(destination),
            storage_backend="jsonl",
        )
        outcome = run_local_scan(request, scanner_factory=mock_scanner_factory)
        return outcome, JsonlFindingStore(str(destination)).read_all()

    def test_scenario_dev_env_scan(self, setup_manifest, tmp_path):
        """Scenario: Running scan in dev env where a synthetic token is detected."""
        manifest_path, target_dir = setup_manifest
        report_items = [
            make_gitleaks_item("synthetic-fake-token", file="app.py", secret="FAKE-TOKEN-123")
        ]

        outcome, persisted = self._scan_to_jsonl(
            manifest_path, tmp_path / "findings.jsonl", report_items
        )

        assert outcome.total_findings == 1
        assert len(persisted) == 1
        assert persisted[0].severity == Severity.LOW.value
        assert persisted[0].confidence == ConfidenceLevel.LOW.value

    def test_scenario_cloud_infra_scan(self, setup_manifest, tmp_path):
        """Scenario: Cloud infrastructure scan with critical AWS and GCP keys."""
        manifest_path, target_dir = setup_manifest
        report_items = [
            make_gitleaks_item("aws-access-token", file="infra/aws.tf"),
            make_gitleaks_item("gcp-api-key", file="infra/gcp.tf"),
        ]

        _, persisted = self._scan_to_jsonl(
            manifest_path, tmp_path / "findings.jsonl", report_items
        )

        assert len(persisted) == 2
        assert {finding.severity for finding in persisted} == {Severity.CRITICAL.value}

    def test_scenario_mixed_scan(self, setup_manifest, tmp_path):
        """Scenario: Mixed findings with various rules and tags."""
        manifest_path, _ = setup_manifest
        report_items = [
            make_gitleaks_item("aws-access-token"),
            make_gitleaks_item("synthetic-fake-token"),
            make_gitleaks_item("unknown-rule", tags=["severity:medium"]),
        ]

        _, persisted = self._scan_to_jsonl(
            manifest_path, tmp_path / "findings.jsonl", report_items
        )

        assert len(persisted) == 3
        by_rule = {
            finding.rule_id: (finding.severity, finding.confidence) for finding in persisted
        }
        assert by_rule["aws-access-token"] == (
            Severity.CRITICAL.value,
            ConfidenceLevel.HIGH.value,
        )
        assert by_rule["synthetic-fake-token"] == (
            Severity.LOW.value,
            ConfidenceLevel.LOW.value,
        )
        # Severity comes from the tag; confidence falls back to the default.
        assert by_rule["unknown-rule"] == (
            Severity.MEDIUM.value,
            ConfidenceLevel.MEDIUM.value,
        )

    def test_scenario_cli_scan_to_jsonl(self, setup_manifest, tmp_path, monkeypatch):
        """Scenario: Invoking scan command through CLI entry point."""
        manifest_path, _ = setup_manifest
        output_jsonl = tmp_path / "cli_findings.jsonl"
        report_items = [make_gitleaks_item("aws-access-token")]

        def fake_scan(scanner, **kwargs):
            return run_scanner_with_report(report_items)

        monkeypatch.setattr(
            "security_scanner.runtime.local_scan.GitleaksScanner.scan",
            fake_scan,
        )

        # Parse a minimal argument set into the namespace handed to cmd_scan.
        parser = argparse.ArgumentParser()
        for flag in ("--manifest", "--output", "--storage-backend", "--raw-evidence"):
            parser.add_argument(flag)
        cli_args = parser.parse_args(
            [
                "--manifest", str(manifest_path),
                "--output", str(output_jsonl),
                "--storage-backend", "jsonl",
                "--raw-evidence", "",
            ]
        )

        assert cmd_scan(cli_args) == 0

        persisted = JsonlFindingStore(str(output_jsonl)).read_all()
        assert len(persisted) == 1
        assert persisted[0].severity == Severity.CRITICAL.value

    def test_scenario_cli_gate_evaluation(self, setup_manifest, tmp_path, monkeypatch):
        """Scenario: CLI gate evaluation against thresholds."""
        # Step 1: persist a single CRITICAL finding for the gate to read.
        output_jsonl = tmp_path / "gate_findings.jsonl"
        critical_finding = Finding.create(
            repo_full_name="example/repo",
            rule_id="aws-access-token",
            file_path="src/main.py",
            line_start=1,
            raw_secret="AKIAFAKEEXAMPLE000001",
            source_tool="gitleaks",
            scan_run_id="scan_test",
            rule_pack_version="0.1.0",
            severity=Severity.CRITICAL.value,
            confidence=ConfidenceLevel.HIGH.value,
        )
        store = JsonlFindingStore(str(output_jsonl))
        store.prepare_for_scan()
        store.extend([critical_finding])

        # Step 2: invoke the gate with a maximum of zero findings.
        parser = argparse.ArgumentParser()
        parser.add_argument("--findings")
        parser.add_argument("--storage-backend")
        parser.add_argument("--max", type=int)
        cli_args = parser.parse_args(
            [
                "--findings", str(output_jsonl),
                "--storage-backend", "jsonl",
                "--max", "0",
            ]
        )

        # One stored finding exceeds the threshold of 0, so this test expects
        # exit code 1 (the exact blocking rule lives in the gate implementation).
        assert cmd_gate(cli_args) == 1
def test_map_gitleaks_item_resolves_severity_from_rule_id():
    """Rule IDs without tags resolve through the rule tables and 'aws-' prefix."""
    expectations = {
        "aws-access-token": ("CRITICAL", "HIGH"),
        # Prefix match sets severity only; confidence falls back to the default.
        "aws-custom-policy": ("CRITICAL", "MEDIUM"),
        "slack-webhook": ("HIGH", "MEDIUM"),
    }
    for rule_id, (severity, confidence) in expectations.items():
        finding = _map({**MINIMAL_ITEM, "RuleID": rule_id})
        assert finding is not None
        assert finding.severity == severity
        assert finding.confidence == confidence


def test_map_gitleaks_item_tag_case_insensitivity():
    """Tag keys and levels are matched regardless of letter case."""
    finding = _map({**MINIMAL_ITEM, "Tags": ["SEVERITY:medium", "Confidence:Low"]})
    assert finding is not None
    assert (finding.severity, finding.confidence) == ("MEDIUM", "LOW")


def test_map_gitleaks_item_invalid_tags_fallback():
    """Tags with unknown levels are ignored in favour of rule ID, then defaults."""
    cases = [
        # Known rule: falls back to the rule tables (CRITICAL / HIGH).
        (
            "aws-access-token",
            ["severity:super-critical", "confidence:very-high"],
            ("CRITICAL", "HIGH"),
        ),
        # Unknown rule: falls back to the defaults (HIGH / MEDIUM).
        (
            "unknown-rule",
            ["severity:unknown", "confidence:unknown"],
            ("HIGH", "MEDIUM"),
        ),
    ]
    for rule_id, tags, expected in cases:
        finding = _map({**MINIMAL_ITEM, "RuleID": rule_id, "Tags": tags})
        assert finding is not None
        assert (finding.severity, finding.confidence) == expected


def test_map_gitleaks_item_non_dict_input():
    """Items that are not dicts are rejected with None."""
    for bad_item in (None, "not-a-dict", [1, 2, 3]):
        mapped = map_gitleaks_item(
            bad_item,
            repo_full_name=REPO_FULL_NAME,
            scan_run_id=SCAN_RUN_ID,
            rule_pack_version=RULE_PACK,
        )
        assert mapped is None


def test_map_gitleaks_item_case_variant_keys():
    """Lower-cased report keys are still recognised."""
    finding = _map(
        {
            "ruleid": "aws-access-token",
            "file": "deploy/secrets.env",
            "startline": 3,
            "secret": "AKIAFAKEEXAMPLE000003",
            "match": "AWS_ACCESS_KEY=AKIAFAKEEXAMPLE000003",
        }
    )
    assert finding is not None
    assert finding.rule_id == "aws-access-token"
    assert finding.location.file_path == "deploy/secrets.env"
    assert finding.location.line_start == 3


def test_map_gitleaks_item_null_byte_filepath():
    """A File value containing a NUL byte drops the item."""
    assert _map({**MINIMAL_ITEM, "File": "deploy/secrets\x00.env"}) is None


def test_map_gitleaks_item_filepath_invalid_type():
    """A File value that is not a string drops the item."""
    for bad_file in ({"nested": "path"}, 12345):
        assert _map({**MINIMAL_ITEM, "File": bad_file}) is None


def test_map_gitleaks_item_negative_and_unordered_lines():
    """Negative line numbers and EndLine < StartLine drop the item."""
    bad_line_ranges = (
        {"StartLine": -10},
        {"StartLine": 5, "EndLine": -5},
        {"StartLine": 10, "EndLine": 5},
    )
    for overrides in bad_line_ranges:
        assert _map({**MINIMAL_ITEM, **overrides}) is None


def test_map_gitleaks_item_tags_as_tuple_or_set():
    """Tags supplied as a tuple or a set are honoured like a list."""
    containers = (
        ("severity:critical", "confidence:high"),
        {"severity:critical", "confidence:high"},
    )
    for tags in containers:
        finding = _map({**MINIMAL_ITEM, "Tags": tags})
        assert finding is not None
        assert finding.severity == "CRITICAL"
        assert finding.confidence == "HIGH"


def test_map_gitleaks_item_tags_invalid_elements():
    """Non-string tag entries are dropped; the valid string tags are kept."""
    mixed_tags = ["severity:critical", 123, {"nested": "value"}]
    finding = _map({**MINIMAL_ITEM, "Tags": mixed_tags})
    assert finding is not None
    assert finding.severity == "CRITICAL"
    assert finding.gitleaks.tags == ["severity:critical"]


def test_map_gitleaks_item_ignores_non_string_keys_for_case_insensitive_lookup():
    """A non-string key in the item does not break case-insensitive lookup."""
    finding = _map(
        {
            123: "ignored",
            "ruleid": "synthetic-fake-token",
            "file": "deploy/secrets.env",
            "startline": 3,
            "secret": "AKIAFAKEEXAMPLE000003",
            "match": "AWS_ACCESS_KEY=AKIAFAKEEXAMPLE000003",
        }
    )
    assert finding is not None
    assert finding.rule_id == "synthetic-fake-token"
    assert finding.severity == "LOW"


def test_normalize_report_path_relative_escape(tmp_path):
    """A relative path climbing out of source_root is rejected."""
    repo_root = tmp_path / "repo"
    assert normalize_report_path("../../etc/passwd", repo_root) is None


def test_normalize_report_path_absolute_escape_no_source_root():
    """Without a source root, an absolute path is rejected."""
    assert normalize_report_path("/etc/passwd", None) is None


def test_normalize_report_path_relative_escape_no_source_root():
    """Without a source root, a path climbing above its base is rejected."""
    assert normalize_report_path("../../etc/passwd", None) is None