Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/security_scanner/core/vulnerability/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from urllib.parse import urlparse

from security_scanner.core.vulnerability.redaction import (
normalize_owasp_tag,
sanitize_partial_fingerprints,
sanitize_vulnerability_identifier,
sanitize_vulnerability_text,
Expand Down Expand Up @@ -144,9 +145,9 @@ def __post_init__(self) -> None:
self,
"owasp_tags",
tuple(
sanitize_vulnerability_identifier(item, fallback="")
for item in self.owasp_tags
if sanitize_vulnerability_identifier(item, fallback="")
token
for token in (normalize_owasp_tag(item) for item in self.owasp_tags)
if token
),
)
object.__setattr__(
Expand All @@ -166,6 +167,12 @@ def __post_init__(self) -> None:
sanitize_vulnerability_text(self.help_markdown) or None,
)
object.__setattr__(self, "properties", _safe_properties(self.properties))
if self.verifier_verdict is not None:
object.__setattr__(
self,
"verifier_verdict",
_safe_verifier_verdict(self.verifier_verdict),
)

def to_dict(self) -> dict:
return {
Expand Down Expand Up @@ -213,9 +220,7 @@ def from_dict(cls, data: dict) -> VulnerabilityFinding:
),
cwe_ids=tuple(str(item) for item in data.get("cweIds", [])),
owasp_tags=tuple(str(item) for item in data.get("owaspTags", [])),
primary_location=VulnerabilityLocation.from_dict(
data["primaryLocation"]
),
primary_location=VulnerabilityLocation.from_dict(data["primaryLocation"]),
related_locations=tuple(
VulnerabilityLocation.from_dict(item)
for item in data.get("relatedLocations", [])
Expand Down Expand Up @@ -361,6 +366,21 @@ def _json_safe_mapping(value: dict[str, object]) -> dict[str, object]:
return dict(value)


def _safe_verifier_verdict(value: object) -> dict | None:
    """Sanitize the free-text fields of a persisted verifier verdict.

    Non-dict input yields ``None``. Otherwise a shallow copy is returned in
    which every non-``None`` ``reason`` / ``remediation`` / ``error`` entry
    has been passed through ``sanitize_vulnerability_text``; all other keys
    are copied unchanged.

    This is a read-side backstop: a verdict stored before central
    sanitization (or edited by hand) is redacted again when it is loaded.
    """
    if not isinstance(value, dict):
        return None
    redacted = {**value}
    free_text_keys = ("reason", "remediation", "error")
    for field in free_text_keys:
        current = redacted.get(field)
        if current is None:
            continue
        redacted[field] = sanitize_vulnerability_text(current)
    return redacted


def _safe_properties(properties: dict[str, object]) -> dict[str, object]:
return {
sanitize_vulnerability_identifier(key, fallback="property"): _safe_value(value)
Expand Down
208 changes: 176 additions & 32 deletions src/security_scanner/core/vulnerability/redaction.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,131 @@
"""Public-safe redaction helpers for vulnerability metadata."""
"""Public-safe redaction helpers for vulnerability metadata.

SARIF free text (message, rule descriptions, help) and identifier-like fields
are tool-controlled and can embed raw source lines, paths, secrets, or PII.
These helpers are the sole defense for those channels before persistence and
before the M4 LLM prompt. The spec invariant is "no raw source snippet / path /
host / secret" (requirements.md Public repo safety, RB-6).

Design: redact the genuinely *sensitive* classes robustly (secrets, paths,
hosts, emails, IPs, quoted/code literals, call expressions, high-entropy
tokens), after folding unicode/percent-encoded separators that would otherwise
bypass detection. We deliberately do NOT redact every operator/keyword: an
over-broad backstop both destroys legitimate rule prose (e.g. "use == for
constant-time compare", "len <= 255") and is trivially bypassed, so it is a net
loss. Non-sensitive code structure (bare identifiers, lone operators) may
survive; sensitive content does not.
"""

from __future__ import annotations

import hashlib
import re
import unicodedata
from urllib.parse import urlparse

_SOURCE_EXT = (
"c|cc|cpp|cs|css|go|h|hpp|html|java|js|jsx|json|kt|kts|php|py|rb|rs|"
"scala|sh|sql|swift|toml|ts|tsx|yaml|yml|env|ini|cfg|xml|rake|pl|pm|lua|"
"vue|svelte|mm|ipynb|gradle|bazel"
)

# Unicode separator lookalikes mapped to their ASCII meaning so path / host /
# secret detection cannot be bypassed with them. NFKC already folds some of
# these (the fullwidth forms, U+2024); they stay listed as an explicit backstop.
_HOMOGLYPHS = {
0x2044: "/",
0x2215: "/",
0x29F8: "/",
0xFF0F: "/", # slash lookalikes
0x2216: "\\",
0x29F9: "\\",
0xFF3C: "\\", # backslash lookalikes
0x00B7: ".",
0x2024: ".",
0x2027: ".",
0xFF0E: ".", # dot lookalikes
0x2236: ":",
0xFF1A: ":", # colon lookalikes
0x2550: "=",
0xFF1D: "=", # equals lookalikes
}
_PERCENT_SEP_RE = re.compile(r"%2[fF]")
_PERCENT_BACKSLASH_RE = re.compile(r"%5[cC]")

_CODE_FENCE_RE = re.compile(r"```.*?```", re.DOTALL)
_INLINE_CODE_RE = re.compile(r"`[^`\n]+`")
_PATH_LIKE_RE = re.compile(
r"(?:(?:/[A-Za-z0-9._ -]+)+|[A-Za-z]:[\\/][^\\/\s]+(?:[\\/][^\\/\s]+)*)"
_STRING_LITERAL_RE = re.compile(r"(['\"])(?:\\.|(?!\1).){1,200}\1")
_PEM_RE = re.compile(r"-----(?:BEGIN|END)[A-Z0-9 ]+-----")
_SECRET_LIKE_RE = re.compile(
r"(?i)(?:"
r"AKIA[0-9A-Z]{12,}"
r"|\bgh[opsu]_[A-Za-z0-9]{20,}"
r"|\bgithub_pat_[A-Za-z0-9_]{20,}"
r"|\bglpat-[A-Za-z0-9_-]{10,}"
r"|\bnpm_[A-Za-z0-9]{20,}"
r"|\bxox[baprs]-[A-Za-z0-9-]{10,}"
r"|\bAIza[A-Za-z0-9_-]{10,}"
r"|\b(?:sk|pk|rk)_(?:live|test)_[A-Za-z0-9]{10,}"
r"|\beyJ[A-Za-z0-9_-]{5,}\.[A-Za-z0-9_-]{5,}(?:\.[A-Za-z0-9_-]{5,})?"
r"|[\w.-]*(?:secret|token|password|passwd|api[_-]?key|access[_-]?key|"
r"private[_-]?key|credential|auth)[\w.-]*\s*[:=]\s*\S+"
r")"
)
_RELATIVE_PATH_RE = re.compile(
r"(?<![/\\\w.-])(?:[A-Za-z0-9_.-]+/){1,}[A-Za-z0-9_.-]+\b"
_EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
_URL_RE = re.compile(r"(?i)\b(?:https?|ftp|ftps|file|ssh|git)://\S+")
_IPV4_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
_IPV6_RE = re.compile(r"\b(?:[0-9A-Fa-f]{1,4}:){2,7}[0-9A-Fa-f]{1,4}\b")
# FQDN with a broad common-TLD allowlist (kept out of identifier sanitization so
# dotted rule ids / java packages are not mistaken for hosts).
_HOST_RE = re.compile(
r"(?i)(?<![\w.@-])(?:[A-Za-z0-9-]+\.)+"
r"(?:com|net|org|io|dev|ai|co|cloud|app|xyz|us|uk|ca|de|fr|jp|cn|ru|eu|me|"
r"tv|cc|sh|info|biz|site|online|tech|store|zip|mov|svc|k8s|"
r"internal|local|example|corp|gov|edu|test|lan|intra|prod|stg|qa)\b"
r"(?::\d{2,5})?"
)
Comment on lines +79 to 85

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The regular expression _HOST_RE is compiled without case-insensitivity. Consequently, hostnames with uppercase TLDs or domains (e.g., evil.internal.EXAMPLE, DB.INTERNAL, EXAMPLE.COM) will bypass host redaction. Adding the (?i) inline flag at the beginning of the pattern ensures that all host variations are correctly redacted.

Suggested change
_HOST_RE = re.compile(
r"(?<![\w.@-])(?:[A-Za-z0-9-]+\.)+"
r"(?:com|net|org|io|dev|ai|co|cloud|app|xyz|us|uk|ca|de|fr|jp|cn|ru|eu|me|"
r"tv|cc|sh|info|biz|site|online|tech|store|zip|mov|svc|k8s|"
r"internal|local|example|corp|gov|edu|test|lan|intra|prod|stg|qa)\b"
r"(?::\d{2,5})?"
)
_HOST_RE = re.compile(
r"(?i)(?<![\w.@-])(?:[A-Za-z0-9-]+\.)+"
r"(?:com|net|org|io|dev|ai|co|cloud|app|xyz|us|uk|ca|de|fr|jp|cn|ru|eu|me|"
r"tv|cc|sh|info|biz|site|online|tech|store|zip|mov|svc|k8s|"
r"internal|local|example|corp|gov|edu|test|lan|intra|prod|stg|qa)\b"
r"(?::\d{2,5})?"
)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영했습니다 (f83738f) — _HOST_RE(?i)를 적용해 대문자/혼합 host TLD(예: DB.PROD.ACME.CLOUD) 우회를 닫고, 대문자 host 회귀 테스트를 추가했습니다.

_SECRET_LIKE_RE = re.compile(
r"(?i)(AKIA[0-9A-Z]{12,}|(token|secret|password|api[_-]?key)\s*[:=]\s*\S+)"
# Real SQL statements (keyword + structural clause), NOT bare keywords in prose.
_SQL_STMT_RE = re.compile(
r"(?i)\bselect\b[^\n]{0,200}?\bfrom\b[^\n]{0,200}"
r"|\binsert\s+into\b[^\n]{0,200}"
r"|\bupdate\b[^\n]{1,80}?\bset\b[^\n]{0,200}"
r"|\bdelete\s+from\b[^\n]{0,200}"
r"|\b(?:drop|truncate|alter|create)\s+(?:table|database|schema|index|view)\b"
r"[^\n]{0,200}"
)
_CALL_LIKE_RE = re.compile(r"\b[A-Za-z_][A-Za-z0-9_.]*\s*\([^()\n]{1,160}\)")
_IDENTIFIER_RELATIVE_PATH_RE = re.compile(
r"(?<![/\\\w.-])(?:[A-Za-z0-9_.-]+/){1,}[A-Za-z0-9_.-]+"
r"\.(?:c|cc|cpp|cs|css|go|h|hpp|html|java|js|json|jsx|kt|kts|php|py|rb|rs|"
r"scala|sh|sql|swift|toml|ts|tsx|yaml|yml)\b|"
r"(?<![/\\\w.-])(?:[A-Za-z0-9_.-]+/){2,}[A-Za-z0-9_.-]+\b"
# Narrow identifier assignment "name = ident" (source variable bound to another
# identifier). Numeric/quoted RHS ("timeout = 30") and comparisons (==, <=) are
# left alone so legitimate rule prose survives.
_ASSIGN_IDENT_RE = re.compile(
r"(?<![=!<>+\-*/%|&^:~])\b[A-Za-z_]\w*\s*=\s*[A-Za-z_]\w+\b(?!\s*=)"
)
_ABS_PATH_RE = re.compile(r"(?:/[A-Za-z0-9._ -]+){2,}|[A-Za-z]:[\\/][^\s]+")
_REL_PATH_RE = re.compile(r"(?<![/\\\w.-])(?:[A-Za-z0-9_.-]+[\\/]){1,}[A-Za-z0-9_.-]+")
_SRC_FILE_RE = re.compile(rf"(?i)(?<![/\\\w.-])[A-Za-z0-9_.-]+\.(?:{_SOURCE_EXT})\b")
_IDENTIFIER_PATH_RE = re.compile(
rf"(?i)(?<![/\\\w.-])(?:[A-Za-z0-9_.-]+[\\/]){{1,}}[A-Za-z0-9_.-]+\.(?:{_SOURCE_EXT})\b"
r"|(?<![/\\\w.-])(?:[A-Za-z0-9_.-]+[\\/]){2,}[A-Za-z0-9_.-]+"
)
_CALL_LIKE_RE = re.compile(r"\b[A-Za-z_][A-Za-z0-9_.]*\s*\([^()\n]{1,160}\)")
# High-entropy bare token: catches keys/hashes/blobs with no keyword anchor.
_HIGH_ENTROPY_RE = re.compile(r"[A-Za-z0-9+/=_.\-]{20,}")
_PLACEHOLDER_TOKEN_RE = re.compile(r"^<redacted-[a-z]+>[\W]*$")

_DETAIL_LIMIT = 500


def sanitize_vulnerability_text(value: object, *, limit: int = _DETAIL_LIMIT) -> str:
"""Return SARIF free text without source snippets, paths, or secret-like text."""
text = " ".join(str(value or "").split())
"""Return SARIF free text without source snippets, paths, hosts, or secrets."""
text = _normalize(value)
if not text:
return ""
text = _CODE_FENCE_RE.sub("<redacted-code>", text)
text = _INLINE_CODE_RE.sub("<redacted-code>", text)
text = _SECRET_LIKE_RE.sub("<redacted-secret>", text)
text = _RELATIVE_PATH_RE.sub("<redacted-path>", text)
text = _PATH_LIKE_RE.sub("<redacted-path>", text)
text = _apply_common_redactions(text)
text = _HOST_RE.sub("<redacted-host>", text)
text = _REL_PATH_RE.sub("<redacted-path>", text)
text = _ABS_PATH_RE.sub("<redacted-path>", text)
text = _SRC_FILE_RE.sub("<redacted-path>", text)
text = _CALL_LIKE_RE.sub("<redacted-code>", text)
if len(text) > limit:
return text[: limit - 3].rstrip() + "..."
return text
text = _redact_secretish_tokens(text)
return _truncate(text, limit)


def sanitize_vulnerability_identifier(
Expand All @@ -50,18 +135,29 @@ def sanitize_vulnerability_identifier(
limit: int = 200,
) -> str:
"""Return a SARIF identifier-like value without path/snippet/secret text."""
text = " ".join(str(value or "").split())
text = _normalize(value)
if not text:
return fallback
text = _CODE_FENCE_RE.sub("<redacted-code>", text)
text = _INLINE_CODE_RE.sub("<redacted-code>", text)
text = _SECRET_LIKE_RE.sub("<redacted-secret>", text)
text = _IDENTIFIER_RELATIVE_PATH_RE.sub("<redacted-path>", text)
text = _PATH_LIKE_RE.sub("<redacted-path>", text)
text = _apply_common_redactions(text)
# Identifier path redaction only (no host / bare-filename rule), so dotted
# rule ids ("python.lang.security.audit.sql-injection", "com.acme.io.Rule")
# and single-slash rule ids ("py/sql-injection") survive intact.
text = _IDENTIFIER_PATH_RE.sub("<redacted-path>", text)
text = _ABS_PATH_RE.sub("<redacted-path>", text)
text = _CALL_LIKE_RE.sub("<redacted-code>", text)
if len(text) > limit:
return text[: limit - 3].rstrip() + "..."
return text or fallback
text = _redact_secretish_tokens(text)
return _truncate(text, limit) or fallback


def normalize_owasp_tag(value: object) -> str | None:
    """Reduce a free-form SARIF tag to a recognized OWASP token, or drop it.

    Returns ``"A01"`` .. ``"A10"`` when the normalized tag contains an OWASP
    Top 10 category token (``A1``, ``a03``, ``owasp-a10``, ``A03:2021`` ...),
    the generic ``"owasp"`` when the tag merely mentions OWASP, and ``None``
    otherwise so that the raw tag string is never persisted.
    """
    text = _normalize(value)
    # The category token must not be glued to a preceding letter or digit;
    # otherwise unrelated tags such as "sha1" or "java8" would be misreported
    # as OWASP categories A01 / A08.
    match = re.search(r"(?i)(?<![A-Za-z0-9])a0?([1-9]|10)\b", text)
    if match:
        return f"A{int(match.group(1)):02d}"
    if "owasp" in text.lower():
        return "owasp"
    return None


def sanitize_partial_fingerprints(value: object) -> dict[str, str]:
Expand All @@ -86,6 +182,54 @@ def sanitize_vulnerability_uri(value: object) -> str | None:
return "<redacted-uri>"


def _normalize(value: object) -> str:
    """Fold unicode / percent-encoded separators and collapse whitespace.

    Falsy input becomes the empty string. The text is NFKC-normalized, the
    ``_HOMOGLYPHS`` table is applied, ``%2F`` / ``%5C`` (either case) are
    decoded to ``/`` and ``\\``, and every whitespace run is collapsed to a
    single space with leading/trailing whitespace removed.
    """
    folded = unicodedata.normalize("NFKC", str(value or "")).translate(_HOMOGLYPHS)
    # The replacement "\\\\" is an escaped backslash in re.sub syntax, i.e. it
    # inserts exactly one literal backslash.
    folded = _PERCENT_BACKSLASH_RE.sub("\\\\", _PERCENT_SEP_RE.sub("/", folded))
    return " ".join(folded.split())


def _apply_common_redactions(text: str) -> str:
    """Run the redaction passes shared by the text and identifier sanitizers.

    The passes run strictly in the order listed below; each pass operates on
    the output of the previous one.
    """
    ordered_passes = (
        (_CODE_FENCE_RE, "<redacted-code>"),
        (_INLINE_CODE_RE, "<redacted-code>"),
        (_STRING_LITERAL_RE, "<redacted-code>"),
        (_PEM_RE, "<redacted-secret>"),
        (_SQL_STMT_RE, "<redacted-code>"),
        (_ASSIGN_IDENT_RE, "<redacted-code>"),
        (_SECRET_LIKE_RE, "<redacted-secret>"),
        (_EMAIL_RE, "<redacted-contact>"),
        (_URL_RE, "<redacted-uri>"),
        (_IPV4_RE, "<redacted-host>"),
        (_IPV6_RE, "<redacted-host>"),
    )
    for pattern, placeholder in ordered_passes:
        text = pattern.sub(placeholder, text)
    return text


def _redact_secretish_tokens(text: str) -> str:
    """Replace secret-looking space-separated tokens with ``<redacted-secret>``.

    Empty tokens and tokens that already are redaction placeholders are kept
    verbatim; every other token is replaced only when ``_is_secretish``
    accepts it.
    """

    def _scrub(piece: str) -> str:
        if not piece or _PLACEHOLDER_TOKEN_RE.match(piece):
            return piece
        return "<redacted-secret>" if _is_secretish(piece) else piece

    return " ".join(_scrub(piece) for piece in text.split(" "))


def _is_secretish(token: str) -> bool:
    """Tell whether *token* looks like a bare key / hash / blob.

    Surrounding punctuation is stripped first. The remainder must match
    ``_HIGH_ENTROPY_RE`` in full and contain at least one digit and at least
    one letter.
    """
    stripped = token.strip(".,;:!?)(<>[]{}'\"")
    if _HIGH_ENTROPY_RE.fullmatch(stripped) is None:
        return False
    saw_digit = any(map(str.isdigit, stripped))
    saw_letter = any(map(str.isalpha, stripped))
    return saw_digit and saw_letter


def _truncate(text: str, limit: int) -> str:
if len(text) > limit:
return text[: limit - 3].rstrip() + "..."
return text


def _fingerprint_value(value: object) -> str:
text = str(value or "")
if text.startswith("sha256:"):
Expand Down
9 changes: 7 additions & 2 deletions src/security_scanner/core/vulnerability/sarif.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
location_with_region,
)
from security_scanner.core.vulnerability.redaction import (
normalize_owasp_tag,
sanitize_partial_fingerprints,
sanitize_vulnerability_identifier,
sanitize_vulnerability_text,
Expand Down Expand Up @@ -272,16 +273,20 @@ def _extract_cwe_ids(tags: list[str]) -> set[str]:


def _extract_owasp_tags(tags: list[str]) -> set[str]:
return {tag for tag in tags if "owasp" in tag.lower()}
# Reduce free-form tool tags to recognized OWASP tokens (A01..A10 / owasp);
# never persist the raw tag string (it can carry paths/snippets).
normalized = {normalize_owasp_tag(tag) for tag in tags}
return {tag for tag in normalized if tag}


def _safe_properties(properties: dict[str, object]) -> dict[str, object]:
# 'tags' is intentionally excluded: free-form tool tags are not persisted
# raw; only derived cwe_ids / owasp_tags survive (see _extract_* above).
allowed = {
"precision",
"security-severity",
"securitySeverity",
"problem.severity",
"tags",
}
return {
key: _safe_property_value(value)
Expand Down
12 changes: 5 additions & 7 deletions src/security_scanner/llm/vulnerability/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

import json
import os
import re
import urllib.error
import urllib.request
from collections.abc import Callable
from dataclasses import dataclass
from urllib.parse import urlparse

from security_scanner.core.vulnerability.model import VulnerabilityFinding
from security_scanner.core.vulnerability.redaction import sanitize_vulnerability_text
from security_scanner.llm.common.verifier import (
VerifierConfig,
parse_verifier_response,
Expand All @@ -34,7 +34,6 @@
"required": ["label", "confidence", "reason", "remediation"],
"additionalProperties": False,
}
_PATH_LIKE_RE = re.compile(r"(?:(?:/[A-Za-z0-9._ -]+)+|[A-Za-z]:\\[^\\\s]+)")


@dataclass(frozen=True)
Expand Down Expand Up @@ -105,7 +104,7 @@ def apply_vulnerability_verifier_result(
if result.remediation
else None
),
"error": result.error,
"error": _sanitize_text(result.error) if result.error else None,
}
return VulnerabilityFinding.from_dict(data)

Expand Down Expand Up @@ -185,10 +184,9 @@ def _sanitize_text(
for forbidden in forbidden_values or []:
if forbidden:
cleaned = cleaned.replace(forbidden, "<redacted>")
cleaned = _PATH_LIKE_RE.sub("<redacted>", cleaned)
if len(cleaned) > 500:
cleaned = cleaned[:497].rstrip() + "..."
return cleaned
# Route LLM-authored verdict text through the central deny-by-default
# redaction so code/secret/path tokens cannot persist into the JSONL.
return sanitize_vulnerability_text(cleaned)


def _chat_url(host: str) -> str:
Expand Down
Loading
Loading