up
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
This commit is contained in:
139
samples/graph/scripts/build_explorer_fixture.py
Normal file
139
samples/graph/scripts/build_explorer_fixture.py
Normal file
@@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Build vulnerability explorer fixtures (JSON + CSV) from the canonical graph-40k fixture.
|
||||
|
||||
Generates deterministic outputs in `samples/graph/graph-40k/explorer/`:
|
||||
- vuln-explorer.json
|
||||
- vuln-explorer.csv
|
||||
- manifest.json (hashes + counts)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import hashlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
GRAPH_ROOT = ROOT / "graph-40k"
|
||||
OVERLAY_PATH = GRAPH_ROOT / "overlay.ndjson"
|
||||
OUT_DIR = GRAPH_ROOT / "explorer"
|
||||
|
||||
# Fixed advisory set to keep fixtures stable and small.
|
||||
ADVISORIES = [
|
||||
("CVE-2024-0001", "critical"),
|
||||
("CVE-2024-0002", "high"),
|
||||
("CVE-2023-9999", "medium"),
|
||||
("CVE-2025-1234", "low"),
|
||||
("CVE-2022-4242", "none"),
|
||||
]
|
||||
|
||||
|
||||
def sha256(path: Path) -> str:
|
||||
h = hashlib.sha256()
|
||||
with path.open("rb") as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
h.update(chunk)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def load_overlays() -> List[dict]:
|
||||
overlays: List[dict] = []
|
||||
with OVERLAY_PATH.open("r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
if line.strip():
|
||||
overlays.append(json.loads(line))
|
||||
overlays.sort(key=lambda o: o["overlay_id"])
|
||||
return overlays
|
||||
|
||||
|
||||
def build_records() -> List[dict]:
|
||||
overlays = load_overlays()[: len(ADVISORIES)]
|
||||
records: List[dict] = []
|
||||
for idx, overlay in enumerate(overlays):
|
||||
advisory_id, advisory_sev = ADVISORIES[idx]
|
||||
reachable = idx % 2 == 0 # alternate reachable/unreachable for UI coverage
|
||||
status = "affected" if reachable else "not_affected"
|
||||
conflict = "policy_deny_vs_scanner_affected" if overlay["verdict"] == "deny" and reachable else None
|
||||
|
||||
record = {
|
||||
"component": overlay["node_id"],
|
||||
"advisory": advisory_id,
|
||||
"advisory_severity": advisory_sev,
|
||||
"reachability": "reachable" if reachable else "unreachable",
|
||||
"status": status,
|
||||
"policy_overlay_id": overlay["overlay_id"],
|
||||
"policy_verdict": overlay["verdict"],
|
||||
"policy_severity": overlay["severity"],
|
||||
"policy_rule_id": overlay["rule_id"],
|
||||
"evidence": [
|
||||
"sbom:mock-sbom-v1",
|
||||
f"overlay:{overlay['overlay_id']}",
|
||||
],
|
||||
"conflict": conflict or "",
|
||||
"snapshot": overlay["snapshot"],
|
||||
"tenant": overlay["tenant"],
|
||||
}
|
||||
records.append(record)
|
||||
return records
|
||||
|
||||
|
||||
def write_json(records: List[dict], path: Path) -> None:
|
||||
path.write_text(json.dumps(records, indent=2, sort_keys=True))
|
||||
|
||||
|
||||
def write_csv(records: List[dict], path: Path) -> None:
|
||||
fieldnames = [
|
||||
"component",
|
||||
"advisory",
|
||||
"advisory_severity",
|
||||
"reachability",
|
||||
"status",
|
||||
"policy_overlay_id",
|
||||
"policy_verdict",
|
||||
"policy_severity",
|
||||
"policy_rule_id",
|
||||
"evidence",
|
||||
"conflict",
|
||||
"snapshot",
|
||||
"tenant",
|
||||
]
|
||||
with path.open("w", encoding="utf-8", newline="") as f:
|
||||
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for r in records:
|
||||
row = r.copy()
|
||||
row["evidence"] = ";".join(r["evidence"])
|
||||
writer.writerow(row)
|
||||
|
||||
|
||||
def write_manifest(json_path: Path, csv_path: Path, count: int, manifest_path: Path) -> None:
|
||||
manifest = {
|
||||
"fixture": "graph-40k",
|
||||
"advisories": [a for a, _ in ADVISORIES],
|
||||
"count": count,
|
||||
"hashes": {
|
||||
"vuln-explorer.json": sha256(json_path),
|
||||
"vuln-explorer.csv": sha256(csv_path),
|
||||
},
|
||||
}
|
||||
manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True))
|
||||
|
||||
|
||||
def main() -> int:
|
||||
OUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
records = build_records()
|
||||
json_path = OUT_DIR / "vuln-explorer.json"
|
||||
csv_path = OUT_DIR / "vuln-explorer.csv"
|
||||
manifest_path = OUT_DIR / "manifest.json"
|
||||
|
||||
write_json(records, json_path)
|
||||
write_csv(records, csv_path)
|
||||
write_manifest(json_path, csv_path, len(records), manifest_path)
|
||||
print(f"Wrote {len(records)} records to {OUT_DIR}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user