#!/usr/bin/env python3
"""
Generate canonical SAMPLES-GRAPH-24-003 fixture.

Outputs:
- nodes.ndjson, edges.ndjson, overlay.ndjson
- manifest.json with counts and SHA-256 hashes

Deterministic and offline-only: fixed seed, fixed timestamps, sorted output.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import random
from pathlib import Path
from typing import Iterable, List, Tuple

TENANT = "demo-tenant"
SNAPSHOT_ID = "graph-40k-policy-overlay-20251122"
GENERATED_AT = "2025-11-22T00:00:00Z"
DEFAULT_NODE_COUNT = 40_000
SEED = 424_242
MAX_FANOUT = 4
OVERLAY_INTERVAL = 400  # one overlay per 400 nodes -> ~100 overlays for 40k nodes
OVERLAY_VERDICTS = ("allow", "deny", "defer")
OVERLAY_SEVERITIES = ("none", "low", "medium", "high", "critical")
# Single source of truth for the overlay kind: it feeds the overlay-id hash, the
# per-row "overlay_kind" field and the manifest, which must all agree.
OVERLAY_KIND = "policy.overlay.v1"
# Provenance label stamped on every edge and echoed in the manifest inputs.
SBOM_SOURCE = "mock-sbom-v1"


def sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 8 KiB chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def write_ndjson(path: Path, rows: Iterable[dict]) -> None:
    """Write *rows* to *path* as newline-delimited JSON.

    Each row is serialized with sorted keys and compact separators. The file is
    opened as UTF-8 with ``newline="\\n"`` so no platform newline translation
    is applied to the output.
    """
    with path.open("w", encoding="utf-8", newline="\n") as f:
        for row in rows:
            f.write(json.dumps(row, sort_keys=True, separators=(",", ":")))
            f.write("\n")


def build_nodes(count: int, rng: random.Random) -> List[dict]:
    """Build *count* component nodes, sorted by their ``id`` (the purl).

    The patch version cycles through 0..4 based on the node index. *rng* is
    accepted for signature parity with the other builders but is not used, so
    this function never advances the random stream.
    """
    nodes: List[dict] = []
    for i in range(count):
        version_patch = i % 5
        purl = f"pkg:pypi/demo-{i}@1.0.{version_patch}"
        nodes.append(
            {
                "id": purl,
                "kind": "component",
                "name": f"demo-{i}",
                "purl": purl,
                "tenant": TENANT,
                "version": f"1.0.{version_patch}",
                "snapshot": SNAPSHOT_ID,
            }
        )
    # Plain string ordering on the purl (so "demo-10" sorts before "demo-1@...").
    nodes.sort(key=lambda n: n["id"])
    return nodes


def build_edges(nodes: List[dict], rng: random.Random) -> List[dict]:
    """Build DEPENDS_ON edges from each node to 1..MAX_FANOUT earlier nodes.

    "Earlier" refers to position in *nodes*; the first node has no candidates
    and therefore gets no outgoing edges. Targets for one source are distinct
    (drawn with ``rng.sample``). The result is sorted by ``(source, target)``.

    The order of RNG calls (one ``randint`` then one ``sample`` per node) is
    part of the fixture's reproducibility and must not be changed.
    """
    edges: List[dict] = []
    for idx in range(1, len(nodes)):
        source_id = nodes[idx]["id"]
        fanout = rng.randint(1, min(MAX_FANOUT, idx))
        for tgt_idx in rng.sample(range(idx), fanout):
            edges.append(
                {
                    "source": source_id,
                    "target": nodes[tgt_idx]["id"],
                    "kind": "DEPENDS_ON",
                    "provenance": SBOM_SOURCE,
                    "snapshot": SNAPSHOT_ID,
                    "tenant": TENANT,
                }
            )
    edges.sort(key=lambda e: (e["source"], e["target"]))
    return edges


def build_overlays(nodes: List[dict], rng: random.Random) -> List[dict]:
    """Build one policy overlay for every OVERLAY_INTERVAL-th node (index 0 included).

    The overlay id is ``sha256(tenant|nodeId|overlayKind)``; verdict and
    severity are drawn from *rng* in that order for each selected node. The
    result is sorted by ``overlay_id``.
    """
    overlays: List[dict] = []
    for idx in range(0, len(nodes), OVERLAY_INTERVAL):
        node = nodes[idx]
        # Keep this draw order (verdict, then severity): it fixes the random stream.
        verdict = rng.choice(OVERLAY_VERDICTS)
        severity = rng.choice(OVERLAY_SEVERITIES)
        rule_id = f"RULE-{idx:05d}"
        overlay_id = hashlib.sha256(
            f"{TENANT}|{node['id']}|{OVERLAY_KIND}".encode()
        ).hexdigest()
        overlays.append(
            {
                "overlay_id": overlay_id,
                "overlay_kind": OVERLAY_KIND,
                "tenant": TENANT,
                "snapshot": SNAPSHOT_ID,
                "node_id": node["id"],
                "verdict": verdict,
                "rule_id": rule_id,
                "severity": severity,
                "explain": f"demo policy decision for {node['name']}",
                # bridge to bench overlay support (optional edge application)
                "source": node["id"],
                "target": f"policy:rule:{rule_id}",
            }
        )
    overlays.sort(key=lambda o: o["overlay_id"])
    return overlays


def generate(out_dir: Path, node_count: int, seed: int) -> Tuple[Path, Path, Path, Path]:
    """Generate the fixture files in *out_dir* and return their paths.

    Creates *out_dir* if needed, then writes ``nodes.ndjson``, ``edges.ndjson``,
    ``overlay.ndjson`` and a ``manifest.json`` holding counts and the SHA-256 of
    each NDJSON file. A single ``random.Random(seed)`` is threaded through the
    builders in the order nodes -> edges -> overlays.

    Returns:
        ``(nodes_path, edges_path, overlay_path, manifest_path)``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    rng = random.Random(seed)

    nodes = build_nodes(node_count, rng)
    edges = build_edges(nodes, rng)
    overlays = build_overlays(nodes, rng)

    nodes_path = out_dir / "nodes.ndjson"
    edges_path = out_dir / "edges.ndjson"
    overlay_path = out_dir / "overlay.ndjson"

    write_ndjson(nodes_path, nodes)
    write_ndjson(edges_path, edges)
    write_ndjson(overlay_path, overlays)

    manifest = {
        "snapshot_id": SNAPSHOT_ID,
        "tenant": TENANT,
        "generated_at": GENERATED_AT,
        "seed": seed,
        "counts": {
            "nodes": len(nodes),
            "edges": len(edges),
            "overlays": {OVERLAY_KIND: len(overlays)},
        },
        "hashes": {
            "nodes_ndjson_sha256": sha256(nodes_path),
            "edges_ndjson_sha256": sha256(edges_path),
            "overlay_ndjson_sha256": sha256(overlay_path),
        },
        "overlay": {
            "path": "overlay.ndjson",
            "kind": OVERLAY_KIND,
            "id_scheme": "sha256(tenant|nodeId|overlayKind)",
        },
        "inputs": {"sbom_source": SBOM_SOURCE},
    }

    manifest_path = out_dir / "manifest.json"
    # Use the same explicit encoding/newline policy as the NDJSON files; a bare
    # write_text() would use the locale encoding and translate "\n" to "\r\n" on
    # Windows, making the manifest bytes platform-dependent.
    with manifest_path.open("w", encoding="utf-8", newline="\n") as f:
        f.write(json.dumps(manifest, indent=2, sort_keys=True))

    return nodes_path, edges_path, overlay_path, manifest_path


def main() -> int:
    """CLI entry point: parse arguments, generate the fixture, print the paths.

    Returns 0 on success; exits via ``parser.error`` (status 2) when ``--nodes``
    is negative.
    """
    parser = argparse.ArgumentParser(description="Generate canonical graph fixture (SAMPLES-GRAPH-24-003).")
    parser.add_argument("--out-dir", default="samples/graph/graph-40k", help="Output directory for fixture files")
    parser.add_argument("--nodes", type=int, default=DEFAULT_NODE_COUNT, help="Number of nodes to generate")
    parser.add_argument("--seed", type=int, default=SEED, help="Seed for deterministic generation")
    args = parser.parse_args()

    # A negative count would otherwise silently produce an empty fixture.
    if args.nodes < 0:
        parser.error("--nodes must be a non-negative integer")

    out_dir = Path(args.out_dir).resolve()
    nodes_path, edges_path, overlay_path, manifest_path = generate(out_dir, args.nodes, args.seed)

    print("Generated fixture:")
    print(f" nodes: {nodes_path}")
    print(f" edges: {edges_path}")
    print(f" overlay: {overlay_path}")
    print(f" manifest:{manifest_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())