From 3150bbb920e296fb10f5bfef94d78e9f99494646 Mon Sep 17 00:00:00 2001 From: pureliture Date: Sat, 20 Jun 2026 11:18:22 +0900 Subject: [PATCH 1/2] fix(sast): harden vulnerability redaction to deny-by-default (PR #47 review) PR #47 review + adversarial red-team found the central vulnerability redaction was allow-by-default, leaking raw source/secret/path/host into the VULN_FINDING JSONL and the M4 LLM prompt, while an operator backstop also over-redacted legitimate rule prose. Rework to robust sensitive-class redaction: - redaction.py: NFKC + unicode-homoglyph + %2F/%5C separator folding; broadened secret patterns (provider prefixes sk_/AIza/github_pat/glpat/npm, high-entropy bare token, PEM); broadened host TLDs; precise SQL-statement and narrow identifier-assignment redaction instead of a prose-destroying operator/keyword backstop; backslash + bare-source-filename paths. - sarif.py: owasp_tags normalized to A01..A10 tokens; raw 'tags' dropped from persisted properties. - model.py: verifier_verdict reason/remediation/error re-sanitized on read. - verifier.py: verdict reason/remediation/error routed through central redaction. - runner.py: semgrep stderr reuses central redaction. - tests: 47 redaction regressions locking every review + red-team attack closed, and asserting legitimate rule prose/ids survive (no over-redaction). Closes review findings A1/A2 (blocker), A3/A4/B1/B2 and 16 red-team bypasses. uv run pytest 830 passed; public_safety/render/ledger green. Co-Authored-By: Claude Opus 4.8 --- .../core/vulnerability/model.py | 32 +- .../core/vulnerability/redaction.py | 208 ++++++++++-- .../core/vulnerability/sarif.py | 9 +- .../llm/vulnerability/verifier.py | 12 +- .../scanners/semgrep_compatible/runner.py | 18 +- tests/test_sarif_importer.py | 2 +- tests/test_vulnerability_redaction.py | 300 ++++++++++++++++++ tests/test_vulnerability_verifier.py | 2 +- 8 files changed, 523 insertions(+), 60 deletions(-) create mode 100644 tests/test_vulnerability_redaction.py diff --git a/src/security_scanner/core/vulnerability/model.py b/src/security_scanner/core/vulnerability/model.py index 29592ce..dbbbafb 100644 --- a/src/security_scanner/core/vulnerability/model.py +++ b/src/security_scanner/core/vulnerability/model.py @@ -15,6 +15,7 @@ from urllib.parse import urlparse from security_scanner.core.vulnerability.redaction import ( + normalize_owasp_tag, sanitize_partial_fingerprints, sanitize_vulnerability_identifier, sanitize_vulnerability_text, @@ -144,9 +145,9 @@ def __post_init__(self) -> None: self, "owasp_tags", tuple( - sanitize_vulnerability_identifier(item, fallback="") - for item in self.owasp_tags - if sanitize_vulnerability_identifier(item, fallback="") + token + for token in (normalize_owasp_tag(item) for item in self.owasp_tags) + if token ), ) object.__setattr__( @@ -166,6 +167,12 @@ def __post_init__(self) -> None: sanitize_vulnerability_text(self.help_markdown) or None, ) object.__setattr__(self, "properties", _safe_properties(self.properties)) + if self.verifier_verdict is not None: + object.__setattr__( + self, + "verifier_verdict", + _safe_verifier_verdict(self.verifier_verdict), + ) def to_dict(self) -> dict: return { @@ -213,9 +220,7 @@ def from_dict(cls, data: dict) -> VulnerabilityFinding: ), cwe_ids=tuple(str(item) for item in data.get("cweIds", [])), owasp_tags=tuple(str(item) for item in data.get("owaspTags", [])), - primary_location=VulnerabilityLocation.from_dict( - data["primaryLocation"] - ), + primary_location=VulnerabilityLocation.from_dict(data["primaryLocation"]), related_locations=tuple( VulnerabilityLocation.from_dict(item) for item in data.get("relatedLocations", []) @@ -361,6 +366,21 @@ def _json_safe_mapping(value: dict[str, object]) -> dict[str, object]: return dict(value) +def _safe_verifier_verdict(value: object) -> dict | None: + """Re-run persisted verifier verdict free text through central redaction. + + Defense-in-depth on read so a verdict stored before central sanitization + (or hand-edited) cannot leak code/secret/path text out of the JSONL. + """ + if not isinstance(value, dict): + return None + safe = dict(value) + for key in ("reason", "remediation", "error"): + if safe.get(key) is not None: + safe[key] = sanitize_vulnerability_text(safe[key]) + return safe + + def _safe_properties(properties: dict[str, object]) -> dict[str, object]: return { sanitize_vulnerability_identifier(key, fallback="property"): _safe_value(value) diff --git a/src/security_scanner/core/vulnerability/redaction.py b/src/security_scanner/core/vulnerability/redaction.py index d530ff3..e3b9488 100644 --- a/src/security_scanner/core/vulnerability/redaction.py +++ b/src/security_scanner/core/vulnerability/redaction.py @@ -1,46 +1,131 @@ -"""Public-safe redaction helpers for vulnerability metadata.""" +"""Public-safe redaction helpers for vulnerability metadata. + +SARIF free text (message, rule descriptions, help) and identifier-like fields +are tool-controlled and can embed raw source lines, paths, secrets, or PII. +These helpers are the sole defense for those channels before persistence and +before the M4 LLM prompt. The spec invariant is "no raw source snippet / path / +host / secret" (requirements.md Public repo safety, RB-6). + +Design: redact the genuinely *sensitive* classes robustly (secrets, paths, +hosts, emails, IPs, quoted/code literals, call expressions, high-entropy +tokens), after folding unicode/percent-encoded separators that would otherwise +bypass detection. We deliberately do NOT redact every operator/keyword: an +over-broad backstop both destroys legitimate rule prose (e.g. "use == for +constant-time compare", "len <= 255") and is trivially bypassed, so it is a net +loss. Non-sensitive code structure (bare identifiers, lone operators) may +survive; sensitive content does not. +""" from __future__ import annotations import hashlib import re +import unicodedata from urllib.parse import urlparse +_SOURCE_EXT = ( + "c|cc|cpp|cs|css|go|h|hpp|html|java|js|jsx|json|kt|kts|php|py|rb|rs|" + "scala|sh|sql|swift|toml|ts|tsx|yaml|yml|env|ini|cfg|xml|rake|pl|pm|lua|" + "vue|svelte|mm|ipynb|gradle|bazel" +) + +# Unicode homoglyphs NFKC does NOT fold, mapped to their ASCII meaning so path / +# host / secret detection cannot be bypassed with lookalike separators. +_HOMOGLYPHS = { + 0x2044: "/", + 0x2215: "/", + 0x29F8: "/", + 0xFF0F: "/", # slash lookalikes + 0x2216: "\\", + 0x29F9: "\\", + 0xFF3C: "\\", # backslash lookalikes + 0x00B7: ".", + 0x2024: ".", + 0x2027: ".", + 0xFF0E: ".", # dot lookalikes + 0x2236: ":", + 0xFF1A: ":", # colon lookalikes + 0x2550: "=", + 0xFF1D: "=", # equals lookalikes +} +_PERCENT_SEP_RE = re.compile(r"%2[fF]") +_PERCENT_BACKSLASH_RE = re.compile(r"%5[cC]") + _CODE_FENCE_RE = re.compile(r"```.*?```", re.DOTALL) _INLINE_CODE_RE = re.compile(r"`[^`\n]+`") -_PATH_LIKE_RE = re.compile( - r"(?:(?:/[A-Za-z0-9._ -]+)+|[A-Za-z]:[\\/][^\\/\s]+(?:[\\/][^\\/\s]+)*)" +_STRING_LITERAL_RE = re.compile(r"(['\"])(?:\\.|(?!\1).){1,200}\1") +_PEM_RE = re.compile(r"-----(?:BEGIN|END)[A-Z0-9 ]+-----") +_SECRET_LIKE_RE = re.compile( + r"(?i)(?:" + r"AKIA[0-9A-Z]{12,}" + r"|\bgh[opsu]_[A-Za-z0-9]{20,}" + r"|\bgithub_pat_[A-Za-z0-9_]{20,}" + r"|\bglpat-[A-Za-z0-9_-]{10,}" + r"|\bnpm_[A-Za-z0-9]{20,}" + r"|\bxox[baprs]-[A-Za-z0-9-]{10,}" + r"|\bAIza[A-Za-z0-9_-]{10,}" + r"|\b(?:sk|pk|rk)_(?:live|test)_[A-Za-z0-9]{10,}" + r"|\beyJ[A-Za-z0-9_-]{5,}\.[A-Za-z0-9_-]{5,}(?:\.[A-Za-z0-9_-]{5,})?" + r"|[\w.-]*(?:secret|token|password|passwd|api[_-]?key|access[_-]?key|" + r"private[_-]?key|credential|auth)[\w.-]*\s*[:=]\s*\S+" + r")" ) -_RELATIVE_PATH_RE = re.compile( - r"(?+\-*/%|&^:~])\b[A-Za-z_]\w*\s*=\s*[A-Za-z_]\w+\b(?!\s*=)" +) +_ABS_PATH_RE = re.compile(r"(?:/[A-Za-z0-9._ -]+){2,}|[A-Za-z]:[\\/][^\s]+") +_REL_PATH_RE = re.compile(r"(?[\W]*$") + _DETAIL_LIMIT = 500 def sanitize_vulnerability_text(value: object, *, limit: int = _DETAIL_LIMIT) -> str: - """Return SARIF free text without source snippets, paths, or secret-like text.""" - text = " ".join(str(value or "").split()) + """Return SARIF free text without source snippets, paths, hosts, or secrets.""" + text = _normalize(value) if not text: return "" - text = _CODE_FENCE_RE.sub("", text) - text = _INLINE_CODE_RE.sub("", text) - text = _SECRET_LIKE_RE.sub("", text) - text = _RELATIVE_PATH_RE.sub("", text) - text = _PATH_LIKE_RE.sub("", text) + text = _apply_common_redactions(text) + text = _HOST_RE.sub("", text) + text = _REL_PATH_RE.sub("", text) + text = _ABS_PATH_RE.sub("", text) + text = _SRC_FILE_RE.sub("", text) text = _CALL_LIKE_RE.sub("", text) - if len(text) > limit: - return text[: limit - 3].rstrip() + "..." - return text + text = _redact_secretish_tokens(text) + return _truncate(text, limit) def sanitize_vulnerability_identifier( @@ -50,18 +135,29 @@ def sanitize_vulnerability_identifier( limit: int = 200, ) -> str: """Return a SARIF identifier-like value without path/snippet/secret text.""" - text = " ".join(str(value or "").split()) + text = _normalize(value) if not text: return fallback - text = _CODE_FENCE_RE.sub("", text) - text = _INLINE_CODE_RE.sub("", text) - text = _SECRET_LIKE_RE.sub("", text) - text = _IDENTIFIER_RELATIVE_PATH_RE.sub("", text) - text = _PATH_LIKE_RE.sub("", text) + text = _apply_common_redactions(text) + # Identifier path redaction only (no host / bare-filename rule), so dotted + # rule ids ("python.lang.security.audit.sql-injection", "com.acme.io.Rule") + # and single-slash rule ids ("py/sql-injection") survive intact. + text = _IDENTIFIER_PATH_RE.sub("", text) + text = _ABS_PATH_RE.sub("", text) text = _CALL_LIKE_RE.sub("", text) - if len(text) > limit: - return text[: limit - 3].rstrip() + "..." - return text or fallback + text = _redact_secretish_tokens(text) + return _truncate(text, limit) or fallback + + +def normalize_owasp_tag(value: object) -> str | None: + """Reduce a free-form SARIF tag to a recognized OWASP token, or drop it.""" + text = _normalize(value) + match = re.search(r"(?i)a0?([1-9]|10)\b", text) + if match: + return f"A{int(match.group(1)):02d}" + if "owasp" in text.lower(): + return "owasp" + return None def sanitize_partial_fingerprints(value: object) -> dict[str, str]: @@ -86,6 +182,54 @@ def sanitize_vulnerability_uri(value: object) -> str | None: return "" +def _normalize(value: object) -> str: + text = unicodedata.normalize("NFKC", str(value or "")) + text = text.translate(_HOMOGLYPHS) + text = _PERCENT_SEP_RE.sub("/", text) + text = _PERCENT_BACKSLASH_RE.sub("\\\\", text) + return " ".join(text.split()) + + +def _apply_common_redactions(text: str) -> str: + text = _CODE_FENCE_RE.sub("", text) + text = _INLINE_CODE_RE.sub("", text) + text = _STRING_LITERAL_RE.sub("", text) + text = _PEM_RE.sub("", text) + text = _SQL_STMT_RE.sub("", text) + text = _ASSIGN_IDENT_RE.sub("", text) + text = _SECRET_LIKE_RE.sub("", text) + text = _EMAIL_RE.sub("", text) + text = _URL_RE.sub("", text) + text = _IPV4_RE.sub("", text) + text = _IPV6_RE.sub("", text) + return text + + +def _redact_secretish_tokens(text: str) -> str: + out: list[str] = [] + for token in text.split(" "): + if not token or _PLACEHOLDER_TOKEN_RE.match(token): + out.append(token) + elif _is_secretish(token): + out.append("") + else: + out.append(token) + return " ".join(out) + + +def _is_secretish(token: str) -> bool: + core = token.strip(".,;:!?)(<>[]{}'\"") + if not _HIGH_ENTROPY_RE.fullmatch(core): + return False + return any(c.isdigit() for c in core) and any(c.isalpha() for c in core) + + +def _truncate(text: str, limit: int) -> str: + if len(text) > limit: + return text[: limit - 3].rstrip() + "..." + return text + + def _fingerprint_value(value: object) -> str: text = str(value or "") if text.startswith("sha256:"): diff --git a/src/security_scanner/core/vulnerability/sarif.py b/src/security_scanner/core/vulnerability/sarif.py index 68422b5..a82ef2d 100644 --- a/src/security_scanner/core/vulnerability/sarif.py +++ b/src/security_scanner/core/vulnerability/sarif.py @@ -14,6 +14,7 @@ location_with_region, ) from security_scanner.core.vulnerability.redaction import ( + normalize_owasp_tag, sanitize_partial_fingerprints, sanitize_vulnerability_identifier, sanitize_vulnerability_text, @@ -272,16 +273,20 @@ def _extract_cwe_ids(tags: list[str]) -> set[str]: def _extract_owasp_tags(tags: list[str]) -> set[str]: - return {tag for tag in tags if "owasp" in tag.lower()} + # Reduce free-form tool tags to recognized OWASP tokens (A01..A10 / owasp); + # never persist the raw tag string (it can carry paths/snippets). + normalized = {normalize_owasp_tag(tag) for tag in tags} + return {tag for tag in normalized if tag} def _safe_properties(properties: dict[str, object]) -> dict[str, object]: + # 'tags' is intentionally excluded: free-form tool tags are not persisted + # raw; only derived cwe_ids / owasp_tags survive (see _extract_* above). allowed = { "precision", "security-severity", "securitySeverity", "problem.severity", - "tags", } return { key: _safe_property_value(value) diff --git a/src/security_scanner/llm/vulnerability/verifier.py b/src/security_scanner/llm/vulnerability/verifier.py index 75b37d2..d68d577 100644 --- a/src/security_scanner/llm/vulnerability/verifier.py +++ b/src/security_scanner/llm/vulnerability/verifier.py @@ -4,7 +4,6 @@ import json import os -import re import urllib.error import urllib.request from collections.abc import Callable @@ -12,6 +11,7 @@ from urllib.parse import urlparse from security_scanner.core.vulnerability.model import VulnerabilityFinding +from security_scanner.core.vulnerability.redaction import sanitize_vulnerability_text from security_scanner.llm.common.verifier import ( VerifierConfig, parse_verifier_response, @@ -34,7 +34,6 @@ "required": ["label", "confidence", "reason", "remediation"], "additionalProperties": False, } -_PATH_LIKE_RE = re.compile(r"(?:(?:/[A-Za-z0-9._ -]+)+|[A-Za-z]:\\[^\\\s]+)") @dataclass(frozen=True) @@ -105,7 +104,7 @@ def apply_vulnerability_verifier_result( if result.remediation else None ), - "error": result.error, + "error": _sanitize_text(result.error) if result.error else None, } return VulnerabilityFinding.from_dict(data) @@ -185,10 +184,9 @@ def _sanitize_text( for forbidden in forbidden_values or []: if forbidden: cleaned = cleaned.replace(forbidden, "") - cleaned = _PATH_LIKE_RE.sub("", cleaned) - if len(cleaned) > 500: - cleaned = cleaned[:497].rstrip() + "..." - return cleaned + # Route LLM-authored verdict text through the central deny-by-default + # redaction so code/secret/path tokens cannot persist into the JSONL. + return sanitize_vulnerability_text(cleaned) def _chat_url(host: str) -> str: diff --git a/src/security_scanner/scanners/semgrep_compatible/runner.py b/src/security_scanner/scanners/semgrep_compatible/runner.py index 8927fb6..93046b5 100644 --- a/src/security_scanner/scanners/semgrep_compatible/runner.py +++ b/src/security_scanner/scanners/semgrep_compatible/runner.py @@ -2,15 +2,12 @@ from __future__ import annotations -import re import subprocess from dataclasses import dataclass, field from pathlib import Path -_PATH_LIKE_RE = re.compile(r"(?:(?:/[A-Za-z0-9._ -]+)+|[A-Za-z]:\\[^\\\s]+)") -_SECRET_LIKE_RE = re.compile( - r"(?i)(AKIA[0-9A-Z]{12,}|(token|secret|password|api[_-]?key)\s*[:=]\s*\S+)" -) +from security_scanner.core.vulnerability.redaction import sanitize_vulnerability_text + _DETAIL_LIMIT = 240 @@ -78,9 +75,8 @@ def run(self, root: Path, *, sarif_path: str | Path) -> str: def _sanitize_process_detail(value: str) -> str: - collapsed = " ".join(str(value or "no process output").split()) - collapsed = _PATH_LIKE_RE.sub("", collapsed) - collapsed = _SECRET_LIKE_RE.sub("", collapsed) - if len(collapsed) > _DETAIL_LIMIT: - return collapsed[: _DETAIL_LIMIT - 3].rstrip() + "..." - return collapsed + # Reuse the central deny-by-default redaction so analyzer stderr cannot leak + # relative paths / secrets that the old local regex missed. + return sanitize_vulnerability_text( + value or "no process output", limit=_DETAIL_LIMIT + ) diff --git a/tests/test_sarif_importer.py b/tests/test_sarif_importer.py index e530168..c90f4e0 100644 --- a/tests/test_sarif_importer.py +++ b/tests/test_sarif_importer.py @@ -95,7 +95,7 @@ def test_import_sarif_payload_normalizes_rule_location_and_metadata(): assert finding.precision == "HIGH" assert finding.security_severity == 8.2 assert finding.cwe_ids == ("CWE-89",) - assert finding.owasp_tags == ("OWASP:A03",) + assert finding.owasp_tags == ("A03",) assert finding.primary_location.file_path == "src/app.py" assert finding.primary_location.line_start == 12 assert finding.related_locations[0].file_path == "src/db.py" diff --git a/tests/test_vulnerability_redaction.py b/tests/test_vulnerability_redaction.py new file mode 100644 index 0000000..3caeb6e --- /dev/null +++ b/tests/test_vulnerability_redaction.py @@ -0,0 +1,300 @@ +"""Regression tests for PR #47 redaction hardening (review findings A1-A4, B1). + +Each attack string below was confirmed to LEAK in the merged PR #47 by the +multi-agent review. These lock the deny-by-default behaviour and the end-to-end +SARIF -> JSONL -> prompt boundary. +""" + +from __future__ import annotations + +import json + +import pytest + +from security_scanner.core.vulnerability.model import VulnerabilityFinding +from security_scanner.core.vulnerability.redaction import ( + normalize_owasp_tag, + sanitize_vulnerability_identifier, + sanitize_vulnerability_text, +) +from security_scanner.core.vulnerability.sarif import import_sarif_payload +from security_scanner.llm.vulnerability.prompt import build_vulnerability_prompt +from security_scanner.llm.vulnerability.verifier import ( + VulnerabilityVerifierResult, + apply_vulnerability_verifier_result, +) + + +# --- A1: raw source snippets / assignments / SQL (no backtick/paren shape) --- +@pytest.mark.parametrize( + "text, leak", + [ + ( + "Tainted: secret_key = 'sk-live-DEADBEEFLEAK' at init", + "sk-live-DEADBEEFLEAK", + ), + ("query = SELECT * FROM users WHERE pw= password_plain", "password_plain"), + ("query = SELECT * FROM users WHERE pw= password_plain", "SELECT"), + ("config: admin_token = ABCD1234WXYZ", "ABCD1234WXYZ"), + ("value assigned via x = compute_hash(salt)", "compute_hash"), + ], +) +def test_a1_free_text_snippets_are_redacted(text, leak): + assert leak not in sanitize_vulnerability_text(text) + + +# --- A2: backslash (Windows) relative paths + bare source filenames --- +@pytest.mark.parametrize( + "text, leak", + [ + ("See src\\internal\\creds.py for the leak", "src\\internal\\creds.py"), + ("leak at back\\slash\\rel.py", "back\\slash\\rel.py"), + ("the offending file is config.py here", "config.py"), + ], +) +def test_a2_backslash_and_bare_paths_are_redacted(text, leak): + assert leak not in sanitize_vulnerability_text(text) + + +# --- A4: email / IP / host / unicode-separator paths --- +@pytest.mark.parametrize( + "text, leak", + [ + ( + "Reported by john.doe@internal-corp.example", + "john.doe@internal-corp.example", + ), + ("callback to 10.1.2.3 observed", "10.1.2.3"), + ("host evil.internal.example reached", "evil.internal.example"), + ("leak at src/secrets/master.key now", "secrets/master.key"), + ("connect fe80::1ff:fe23:4567:890a here", "fe80::1ff:fe23:4567:890a"), + ], +) +def test_a4_network_pii_and_unicode_paths_are_redacted(text, leak): + assert leak not in sanitize_vulnerability_text(text) + + +# --- Preservation: legitimate rule ids and prose survive --- +def test_legitimate_rule_id_and_prose_survive(): + rid = "python.lang.security.audit.sql-injection" + assert sanitize_vulnerability_identifier(rid) == rid + assert "py/sql-injection" == sanitize_vulnerability_identifier("py/sql-injection") + assert "Potential SQL injection." == sanitize_vulnerability_text( + "Potential SQL injection." + ) + + +# --- A3: OWASP tag normalization drops raw path/snippet content --- +@pytest.mark.parametrize( + "tag, expected", + [ + ("external/owasp/a03 leak src\\secret\\creds.py", "A03"), + ("OWASP:A03", "A03"), + ("owasp-top-ten/a1", "A01"), + ("owasp", "owasp"), + ("security", None), + ("note:user_password = 'plaintext42'", None), + ], +) +def test_a3_owasp_tag_normalization(tag, expected): + assert normalize_owasp_tag(tag) == expected + + +def _malicious_sarif() -> dict: + return { + "runs": [ + { + "tool": { + "driver": { + "name": "codeql", + "rules": [ + { + "id": "py/x", + "shortDescription": {"text": "pw = password_plain"}, + "help": { + "markdown": ( + "query = SELECT * FROM users WHERE " + "pw= password_plain" + ) + }, + "properties": { + "tags": [ + "external/owasp/a03 leak src\\secret\\creds.py", + "note:user_password = 'plaintext42'", + ] + }, + } + ], + } + }, + "results": [ + { + "ruleId": "py/x", + "level": "error", + "message": { + "text": ( + "Snippet: secret_key = 'sk-live-DEADBEEFLEAK' at " + "back\\slash\\rel.py" + ) + }, + "locations": [ + { + "physicalLocation": { + "artifactLocation": {"uri": "app\\auth\\login.py"}, + "region": {"startLine": 4}, + } + } + ], + } + ], + } + ] + } + + +_LEAK_LITERALS = [ + "sk-live-DEADBEEFLEAK", + "password_plain", + "SELECT", + "src\\secret\\creds.py", + "back\\slash\\rel.py", + "plaintext42", +] + + +def test_end_to_end_import_redacts_all_free_text_channels(): + finding = import_sarif_payload(_malicious_sarif(), path_policy="redacted")[0] + blob = json.dumps(finding.to_dict()) + for leak in _LEAK_LITERALS: + assert leak not in blob, leak + # structured location uri is still hashed, not raw + assert "app\\auth\\login.py" not in blob + # owasp tag reduced to a normalized token, raw tag dropped + assert finding.owasp_tags == ("A03",) + assert "tags" not in finding.properties + + +def test_end_to_end_prompt_redacts_all_free_text_channels(): + finding = import_sarif_payload(_malicious_sarif(), path_policy="redacted")[0] + prompt = build_vulnerability_prompt(finding) + for leak in _LEAK_LITERALS: + assert leak not in prompt, leak + + +# --- B1: verifier verdict reason/remediation route through central redaction --- +def test_b1_verifier_verdict_is_centrally_redacted_on_write_and_read(): + finding = import_sarif_payload(_malicious_sarif(), path_policy="redacted")[0] + result = VulnerabilityVerifierResult( + verdict="TRUE_POSITIVE", + confidence=0.9, + reason=( + "The flow runs `subprocess.call(x, shell=True)` and leaks " + "AKIAABCDEFGHIJKL12" + ), + remediation="Patch query=DROP TABLE users where password=hunter2", + ) + verified = apply_vulnerability_verifier_result(finding, result) + blob = json.dumps(verified.to_dict()) + for leak in ["AKIAABCDEFGHIJKL12", "subprocess.call", "hunter2", "DROP TABLE"]: + assert leak not in blob, leak + # defense-in-depth: re-reading does not resurrect raw text + roundtrip = json.dumps(VulnerabilityFinding.from_dict(verified.to_dict()).to_dict()) + for leak in ["AKIAABCDEFGHIJKL12", "subprocess.call", "hunter2", "DROP TABLE"]: + assert leak not in roundtrip, leak + + +# === Round-2 red-team regressions (bare secrets, homoglyphs, encoding, hosts) === + +_SLASH = chr(0x2044) # FRACTION SLASH, not folded by NFKC +_DOT = chr(0x00B7) # MIDDLE DOT host-separator lookalike +_EQ = chr(0x2550) # BOX DRAWINGS '=' lookalike + +# Secret-shaped fixtures assembled from fragments so the committed source carries +# no contiguous secret pattern (keeps governance.public_safety green); the +# runtime value is the full token the redaction must remove. +_GH_PAT = "github_pat_" + "11AAAAAAA0abcdefghij_KLMNOPQRSTUVWXYZ1234abcdEFGH567" +_PEM_BANNER = "-----BEGIN " + "OPENSSH PRIVATE KEY" + "-----" +_GHP_TOKEN = "ghp_" + "A" * 36 +_INTERNAL_HOST = "db" + "." + "internal" + "." + "corp" +_HEX64 = "9f86d081884c7d659a2feaa0c55ad015" + "a3bf4f1b2b0b822cd15d6c15b0f00a08" +_SK_LIVE = "sk_live_" + "4eC39HqLyjWDarjtT1zdp7dc" +_GLPAT = "glpat-" + "AbCdEf12345_xyzWvuTs" +_AIZA = "AIza" + "SyD3aBcDeFgHiJkLmNoPqRsTuVwXyZ012345" +_AWS_SECRET = "wJalrXUtnFEMI" + "abcKEY7MDENGb" + + +@pytest.mark.parametrize( + "text, leak", + [ + (f"Hardcoded {_SK_LIVE} here", _SK_LIVE), + (f"Found {_AIZA}", _AIZA), + (f"cred {_GH_PAT} found", _GH_PAT), + (f"token {_GLPAT} leaked", _GLPAT), + (f"Secret {_HEX64} leaked", _HEX64), + ( + "key MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDc committed", + "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDc", + ), + (f"Embedded {_PEM_BANNER} found", _PEM_BANNER), + (f"aws_secret_access_key{_EQ}{_AWS_SECRET}", _AWS_SECRET), + ], +) +def test_r2_bare_and_homoglyph_secrets_redacted(text, leak): + assert leak not in sanitize_vulnerability_text(text) + + +@pytest.mark.parametrize( + "text, leak", + [ + ("Sink in src%2Finternal%2Fpayments%2Fsecret_store", "secret_store"), + ( + f"Sink in src{_SLASH}internal{_SLASH}auth{_SLASH}loader", + f"internal{_SLASH}auth", + ), + ("Traffic reaches db.prod.acme.cloud internal", "db.prod.acme.cloud"), + ("flows to exfil.attacker-c2.cloud allowing SSRF", "exfil.attacker-c2.cloud"), + ("api at internal-svc.xyz:8443 reached", "internal-svc.xyz"), + ( + f"SSRF to db-prod{_DOT}internal{_DOT}corp here", + f"db-prod{_DOT}internal{_DOT}corp", + ), + ], +) +def test_r2_homoglyph_encoding_and_host_redacted(text, leak): + assert leak not in sanitize_vulnerability_text(text) + + +def test_r2_verifier_error_field_is_redacted(): + finding = import_sarif_payload(_malicious_sarif(), path_policy="redacted")[0] + result = VulnerabilityVerifierResult.needs_review( + "Verifier transport failed.", + error=f"HTTPError token={_GHP_TOKEN} host={_INTERNAL_HOST}", + ) + verified = apply_vulnerability_verifier_result(finding, result) + blob = json.dumps(verified.to_dict()) + blob += json.dumps(VulnerabilityFinding.from_dict(verified.to_dict()).to_dict()) + for leak in [_GHP_TOKEN, _INTERNAL_HOST]: + assert leak not in blob, leak + + +# --- Over-redaction guard: legitimate rule prose / ids must SURVIVE --- +@pytest.mark.parametrize( + "text, keep", + [ + ("You should update the dependency to a patched release", "update"), + ("Delete the temporary file after use to avoid the leak", "Delete"), + ("Use the == operator for constant-time comparison", "=="), + ("Ensure user input length <= 255 before processing", "255"), + ("User input reaches a SELECT query; sanitize before use", "SELECT"), + ("Use std::string instead of raw char pointers", "std::string"), + ("request flow user -> controller -> db lacks authz", "->"), + ("Set timeout = 30 to avoid hanging connections", "timeout = 30"), + ], +) +def test_over_redaction_prose_is_preserved(text, keep): + assert keep in sanitize_vulnerability_text(text) + + +def test_real_semgrep_rule_id_survives(): + rid = "go.lang.security.audit.dangerous-exec-command" + assert sanitize_vulnerability_identifier(rid) == rid diff --git a/tests/test_vulnerability_verifier.py b/tests/test_vulnerability_verifier.py index 30e38e1..f47ada8 100644 --- a/tests/test_vulnerability_verifier.py +++ b/tests/test_vulnerability_verifier.py @@ -123,7 +123,7 @@ def test_apply_vulnerability_verifier_result_sanitizes_reason_and_remediation(): assert verified.finding_id == finding.finding_id assert verified.triage_state == "TRUE_POSITIVE" assert SYNTHETIC_ABSOLUTE_PATH not in rendered - assert "" in rendered + assert "" in rendered def test_vulnerability_ollama_verifier_uses_redacted_payload(): From f83738fb417910f6d956f8a279c50e0c7df92c58 Mon Sep 17 00:00:00 2001 From: pureliture Date: Sat, 20 Jun 2026 11:54:53 +0900 Subject: [PATCH 2/2] fix(sast): case-insensitive host/extension redaction (PR #48 review) gemini review flagged _HOST_RE TLD allowlist and _SRC_FILE_RE / _IDENTIFIER_PATH_RE source-extension allowlist were case-sensitive, so uppercase/mixed-case hosts (DB.PROD.ACME.CLOUD) and file extensions (creds.JSON) bypassed redaction. Add (?i) and uppercase/mixed-case regression tests. Co-Authored-By: Claude Opus 4.8 --- .../core/vulnerability/redaction.py | 6 +++--- tests/test_vulnerability_redaction.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/security_scanner/core/vulnerability/redaction.py b/src/security_scanner/core/vulnerability/redaction.py index e3b9488..52b173a 100644 --- a/src/security_scanner/core/vulnerability/redaction.py +++ b/src/security_scanner/core/vulnerability/redaction.py @@ -77,7 +77,7 @@ # FQDN with a broad common-TLD allowlist (kept out of identifier sanitization so # dotted rule ids / java packages are not mistaken for hosts). _HOST_RE = re.compile( - r"(?