Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 186 additions & 32 deletions src/security_scanner/scanners/gitleaks/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import logging
from pathlib import Path
from typing import Any

from security_scanner.core.finding.model import (
ConfidenceLevel,
Expand All @@ -46,10 +47,93 @@
logger = logging.getLogger(__name__)

# Fallback severity/confidence. These are used only when neither a finding's
# tags nor its rule ID resolve to a more specific level
# (see resolve_severity_and_confidence).
DEFAULT_SEVERITY = Severity.HIGH.value
DEFAULT_CONFIDENCE = ConfidenceLevel.MEDIUM.value

# Static per-rule severity overrides, keyed by the exact gitleaks rule ID.
RULE_SEVERITY_MAP = {
    "aws-access-token": Severity.CRITICAL.value,
    "gcp-api-key": Severity.CRITICAL.value,
    "slack-webhook": Severity.HIGH.value,
    "synthetic-fake-token": Severity.LOW.value,
}

# Static per-rule confidence overrides, keyed by the exact gitleaks rule ID.
RULE_CONFIDENCE_MAP = {
    "aws-access-token": ConfidenceLevel.HIGH.value,
    "gcp-api-key": ConfidenceLevel.MEDIUM.value,
    "slack-webhook": ConfidenceLevel.MEDIUM.value,
    "synthetic-fake-token": ConfidenceLevel.LOW.value,
}

# Levels accepted from 'severity:<level>' / 'confidence:<level>' tags.
VALID_SEVERITIES = {severity.value for severity in Severity}
VALID_CONFIDENCES = {confidence.value for confidence in ConfidenceLevel}


def _get_case_insensitive(data: Any, key: str, default: Any = None) -> Any:
    """Look up *key* in *data*, ignoring letter case if there is no exact match.

    Args:
        data: Mapping to search. Anything that is not a ``dict`` yields
            *default* instead of raising.
        key: Key name to look for.
        default: Value returned when no key matches.

    Returns:
        The value stored under the exact key when present; otherwise the value
        of the first string key (in dict iteration order) whose lower-cased
        form equals ``key.lower()``; otherwise *default*.
    """
    if not isinstance(data, dict):
        return default
    # An exact match always wins over a case-insensitive one.
    if key in data:
        return data[key]
    wanted = key.lower()
    for candidate, value in data.items():
        # Non-string keys cannot be case-folded, so they never match.
        if isinstance(candidate, str) and candidate.lower() == wanted:
            return value
    return default


def resolve_severity_and_confidence(rule_id: str | None, tags: list[str] | None) -> tuple[str, str]:
    """Pick the severity and confidence for a finding.

    Each value is resolved independently, in this order of precedence:

    1. A ``severity:<level>`` / ``confidence:<level>`` tag. The key is matched
       case-insensitively and the level is upper-cased before being checked
       against VALID_SEVERITIES / VALID_CONFIDENCES; when several valid tags
       are present, the last one wins.
    2. The static rule-ID tables. For severity only, any rule ID starting with
       ``aws-`` that is not in the table resolves to CRITICAL.
    3. DEFAULT_SEVERITY / DEFAULT_CONFIDENCE.

    Args:
        rule_id: Gitleaks rule ID; non-string values skip step 2.
        tags: Tags attached to the finding; non-string entries and entries
            without a colon are ignored.

    Returns:
        A ``(severity, confidence)`` tuple.
    """
    tag_severity = None
    tag_confidence = None

    for tag in tags or ():
        if not isinstance(tag, str):
            continue
        name, separator, level = tag.partition(":")
        if not separator:
            continue
        name = name.strip().lower()
        # NOTE(review): levels are upper-cased before the membership test, so
        # this presumably relies on the enum values being upper-case - confirm.
        level = level.strip().upper()
        if name == "severity" and level in VALID_SEVERITIES:
            tag_severity = level
        elif name == "confidence" and level in VALID_CONFIDENCES:
            tag_confidence = level

    has_rule_id = isinstance(rule_id, str)

    # Severity: tag, then exact rule ID, then the 'aws-' prefix, then default.
    if tag_severity is not None:
        severity = tag_severity
    elif has_rule_id and rule_id in RULE_SEVERITY_MAP:
        severity = RULE_SEVERITY_MAP[rule_id]
    elif has_rule_id and rule_id.startswith("aws-"):
        severity = Severity.CRITICAL.value
    else:
        severity = DEFAULT_SEVERITY

    # Confidence: tag, then exact rule ID, then default (no prefix rule here).
    if tag_confidence is not None:
        confidence = tag_confidence
    elif has_rule_id:
        confidence = RULE_CONFIDENCE_MAP.get(rule_id, DEFAULT_CONFIDENCE)
    else:
        confidence = DEFAULT_CONFIDENCE

    return severity, confidence


def map_gitleaks_item(
item: dict,
Expand All @@ -62,69 +146,139 @@ def map_gitleaks_item(
index: int | None = None,
) -> Finding | None:
"""Map one Gitleaks JSON item to a core Finding, or None if invalid."""
rule_id = item.get("RuleID", "")
file_path = normalize_report_path(item.get("File", ""), source_root)
start_line = item.get("StartLine")
raw_secret = item.get("Secret", "")

if start_line is None:
_warning("item %s missing StartLine, skipping", index)
if not isinstance(item, dict):
_warning("item is not a dict: %s", type(item))
return None

line_end = item.get("EndLine")
fingerprint_override = item.get("Fingerprint") or None
triage_reason = item.get("Description") or None
try:
rule_id = _get_case_insensitive(item, "RuleID")
file_val = _get_case_insensitive(item, "File", "")
file_path = normalize_report_path(file_val, source_root)
if file_path is None:
_warning("item %s has invalid or escaping File: %s", index, file_val)
return None

start_line = _get_case_insensitive(item, "StartLine")
if start_line is None:
_warning("item %s missing StartLine, skipping", index)
return None

raw_secret = _get_case_insensitive(item, "Secret", "")
line_end = _get_case_insensitive(item, "EndLine")
fingerprint_override = _get_case_insensitive(item, "Fingerprint") or None
triage_reason = _get_case_insensitive(item, "Description") or None
commit_val = _get_case_insensitive(item, "Commit") or None
match_val = _get_case_insensitive(item, "Match")

start_line_int = int(start_line)
if start_line_int < 1:
raise ValueError(f"StartLine must be >= 1, got {start_line_int}")

end_line_int = int(line_end) if line_end is not None else None
tags = item.get("Tags")
if end_line_int is not None:
if end_line_int < 1:
raise ValueError(f"EndLine must be >= 1, got {end_line_int}")
if end_line_int < start_line_int:
raise ValueError(f"EndLine ({end_line_int}) cannot be less than StartLine ({start_line_int})")

tags = _get_case_insensitive(item, "Tags")
if isinstance(tags, (list, tuple, set)):
tags_list = [t for t in tags if isinstance(t, str)]
elif hasattr(tags, "__iter__") and not isinstance(tags, (str, bytes, dict)):
tags_list = [t for t in tags if isinstance(t, str)]
else:
tags_list = []

severity, confidence = resolve_severity_and_confidence(rule_id, tags_list)
rule_id_str = str(rule_id) if rule_id is not None else ""

gitleaks_payload = GitleaksFindingPayload(
rule_id=rule_id,
rule_id=rule_id_str,
file=file_path,
start_line=start_line_int,
end_line=end_line_int,
match=item.get("Match"),
match=match_val,
secret=raw_secret,
fingerprint=fingerprint_override,
description=item.get("Description"),
commit=item.get("Commit"),
tags=list(tags) if isinstance(tags, list) else [],
description=triage_reason,
commit=commit_val,
tags=tags_list,
)
return Finding.create(
repo_full_name=repo_full_name,
rule_id=rule_id,
rule_id=rule_id_str,
file_path=file_path,
line_start=start_line_int,
raw_secret=raw_secret,
source_tool=source_tool,
scan_run_id=scan_run_id,
rule_pack_version=rule_pack_version,
line_end=end_line_int,
severity=DEFAULT_SEVERITY,
confidence=DEFAULT_CONFIDENCE,
repo_commit=item.get("Commit") or None,
severity=severity,
confidence=confidence,
repo_commit=commit_val,
fingerprint_override=fingerprint_override,
triage_reason=triage_reason,
gitleaks=gitleaks_payload,
)
except (ValueError, TypeError) as exc:
except (ValueError, TypeError, AttributeError) as exc:
_warning("failed to create Finding for item %s: %s", index, exc)
return None


def normalize_report_path(file_path: str, source_root: Path | None) -> str:
"""Return repo-relative path when Gitleaks emits an absolute path."""
if not file_path or source_root is None:
return file_path
def normalize_report_path(file_path: Any, source_root: Path | None) -> str | None:
"""Return repo-relative path when Gitleaks emits an absolute path.

path = Path(file_path)
if not path.is_absolute():
return file_path
Returns None if file_path is invalid (e.g. invalid type or contains null bytes),
or if it escapes the repository root.
"""
if not isinstance(file_path, str):
return None
if "\x00" in file_path:
return None
if not file_path:
return ""

try:
return path.resolve().relative_to(source_root.resolve()).as_posix()
except ValueError:
return file_path
path = Path(file_path)
except (TypeError, ValueError):
return None

# Check for path traversal escaping repo root (if source_root is provided)
if source_root is not None:
try:
resolved_root = source_root.resolve()
# If path is absolute, check if it's within source_root
if path.is_absolute():
resolved_path = path.resolve()
relative = resolved_path.relative_to(resolved_root)
return relative.as_posix()
else:
# If path is relative, resolve it relative to source_root to ensure no escape
resolved_path = (resolved_root / path).resolve()
relative = resolved_path.relative_to(resolved_root)
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
return relative.as_posix()
except ValueError:
Comment thread
pureliture marked this conversation as resolved.
# Escapes source_root
return None
except (TypeError, RuntimeError, OSError):
return None
else:
# If source_root is None, we still must prevent escaping repository root.
# Absolute path (e.g. /etc/passwd) cannot be resolved to a relative path, so it escapes.
if path.is_absolute():
return None
# Check if relative path contains '..' that goes outside the relative base.
try:
dummy_root = Path("/dummy/root").resolve()
resolved_path = (dummy_root / path).resolve()
relative = resolved_path.relative_to(dummy_root)
return relative.as_posix()
except ValueError:
# Escapes relative root (e.g. ../../etc/passwd)
return None
except (TypeError, RuntimeError, OSError):
return None


def _warning(message: str, *args: object) -> None:
Expand Down
Loading
Loading