#!/usr/bin/env python3
"""
Build vulnerability explorer fixtures (JSON + CSV) from the canonical graph-40k fixture.

Generates deterministic outputs in `samples/graph/graph-40k/explorer/`:
- vuln-explorer.json
- vuln-explorer.csv
- manifest.json (hashes + counts)
"""
from __future__ import annotations

import csv
import hashlib
import json
from pathlib import Path
from typing import List, Optional

ROOT = Path(__file__).resolve().parent.parent
GRAPH_ROOT = ROOT / "graph-40k"
OVERLAY_PATH = GRAPH_ROOT / "overlay.ndjson"
OUT_DIR = GRAPH_ROOT / "explorer"

# Fixed advisory set to keep fixtures stable and small.
ADVISORIES = [
    ("CVE-2024-0001", "critical"),
    ("CVE-2024-0002", "high"),
    ("CVE-2023-9999", "medium"),
    ("CVE-2025-1234", "low"),
    ("CVE-2022-4242", "none"),
]


def sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 8 KiB chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def load_overlays() -> List[dict]:
    """Load every overlay from ``OVERLAY_PATH`` (NDJSON), sorted by ``overlay_id``.

    Blank lines are skipped. Sorting makes the result independent of the
    order in which overlays appear in the file.

    Raises:
        FileNotFoundError: if ``OVERLAY_PATH`` does not exist.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        KeyError: if an overlay has no ``overlay_id``.
    """
    overlays: List[dict] = []
    with OVERLAY_PATH.open("r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                overlays.append(json.loads(line))
    overlays.sort(key=lambda o: o["overlay_id"])
    return overlays


def build_records(overlays: Optional[List[dict]] = None) -> List[dict]:
    """Pair the first overlays (by ``overlay_id``) with the fixed advisories.

    Args:
        overlays: Overlay dicts to use instead of reading ``OVERLAY_PATH``.
            They are sorted by ``overlay_id`` before pairing. When omitted,
            the overlays are loaded with :func:`load_overlays`.

    Returns:
        One record per (overlay, advisory) pair; at most ``len(ADVISORIES)``
        records, fewer if fewer overlays are available.

    Raises:
        KeyError: if an overlay lacks one of the fields copied into the record.
    """
    if overlays is None:
        ordered = load_overlays()  # already sorted by overlay_id
    else:
        ordered = sorted(overlays, key=lambda o: o["overlay_id"])

    records: List[dict] = []
    # zip() stops at the shorter input, so extra overlays (or extra
    # advisories) are ignored without any explicit slicing.
    for idx, (overlay, (advisory_id, advisory_sev)) in enumerate(zip(ordered, ADVISORIES)):
        reachable = idx % 2 == 0  # alternate reachable/unreachable for UI coverage
        # A conflict is flagged only when the policy denies a component that
        # is also marked reachable (and therefore "affected").
        if reachable and overlay["verdict"] == "deny":
            conflict = "policy_deny_vs_scanner_affected"
        else:
            conflict = ""
        records.append(
            {
                "component": overlay["node_id"],
                "advisory": advisory_id,
                "advisory_severity": advisory_sev,
                "reachability": "reachable" if reachable else "unreachable",
                "status": "affected" if reachable else "not_affected",
                "policy_overlay_id": overlay["overlay_id"],
                "policy_verdict": overlay["verdict"],
                "policy_severity": overlay["severity"],
                "policy_rule_id": overlay["rule_id"],
                "evidence": [
                    "sbom:mock-sbom-v1",
                    f"overlay:{overlay['overlay_id']}",
                ],
                "conflict": conflict,
                "snapshot": overlay["snapshot"],
                "tenant": overlay["tenant"],
            }
        )
    return records


def _dump_json(payload: object, path: Path) -> None:
    """Write *payload* to *path* as indented, key-sorted JSON encoded as UTF-8."""
    # Bytes are written directly so that neither the locale's default
    # encoding nor platform newline translation can alter the output (and
    # therefore the hashes recorded in the manifest).
    path.write_bytes(json.dumps(payload, indent=2, sort_keys=True).encode("utf-8"))


def write_json(records: List[dict], path: Path) -> None:
    """Write *records* to *path* as a JSON array (2-space indent, sorted keys)."""
    _dump_json(records, path)


def write_csv(records: List[dict], path: Path) -> None:
    """Write *records* to *path* as CSV with a header row.

    The ``evidence`` list of each record is flattened into a single
    ``;``-separated cell; all other fields are written as-is.
    """
    fieldnames = [
        "component",
        "advisory",
        "advisory_severity",
        "reachability",
        "status",
        "policy_overlay_id",
        "policy_verdict",
        "policy_severity",
        "policy_rule_id",
        "evidence",
        "conflict",
        "snapshot",
        "tenant",
    ]
    # newline="" lets the csv module control line endings itself.
    with path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for record in records:
            # Build a new row dict so the caller's record is not mutated.
            writer.writerow({**record, "evidence": ";".join(record["evidence"])})


def write_manifest(json_path: Path, csv_path: Path, count: int, manifest_path: Path) -> None:
    """Write a manifest with the advisory ids, record count and output hashes.

    Args:
        json_path: Already-written JSON fixture; hashed with SHA-256.
        csv_path: Already-written CSV fixture; hashed with SHA-256.
        count: Number of records in the fixtures.
        manifest_path: Destination of the manifest JSON.
    """
    manifest = {
        "fixture": "graph-40k",
        "advisories": [advisory_id for advisory_id, _ in ADVISORIES],
        "count": count,
        "hashes": {
            "vuln-explorer.json": sha256(json_path),
            "vuln-explorer.csv": sha256(csv_path),
        },
    }
    _dump_json(manifest, manifest_path)


def main() -> int:
    """Generate all explorer fixtures under ``OUT_DIR`` and return exit code 0."""
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    records = build_records()

    json_path = OUT_DIR / "vuln-explorer.json"
    csv_path = OUT_DIR / "vuln-explorer.csv"
    manifest_path = OUT_DIR / "manifest.json"

    # The manifest hashes the other two files, so it must be written last.
    write_json(records, json_path)
    write_csv(records, csv_path)
    write_manifest(json_path, csv_path, len(records), manifest_path)

    print(f"Wrote {len(records)} records to {OUT_DIR}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())