#!/usr/bin/env python3
"""
Offline verifier for StellaOps VEX proof bundles.

- Validates the bundle against `docs/benchmarks/vex-evidence-playbook.schema.json`.
- Checks justification IDs against the signed catalog.
- Recomputes hashes for CAS artefacts, OpenVEX payload, and DSSE envelopes.
- Enforces coverage and negative-test requirements per task VEX-GAPS-401-062.
"""

from __future__ import annotations

import argparse
import base64
import hashlib
import json
from pathlib import Path
import sys
from typing import Any, Dict, Optional, Sequence

# Third-party dependencies are imported defensively: when one is missing the
# tool reports its usual one-line "Verification failed: ..." message at the
# point of use instead of dying with an import traceback.
try:
    import jsonschema
except ImportError:  # pragma: no cover - only hit when the package is absent
    jsonschema = None

try:
    from blake3 import blake3
except ImportError:  # pragma: no cover - only hit when the package is absent
    blake3 = None


# Uncertainty states accepted by the verifier.
_UNCERTAINTY_STATES = frozenset({"U0-none", "U1-low", "U2-medium", "U3-high"})

# Reevaluation triggers that must all be truthy in the bundle.
_REEVALUATION_TRIGGERS = ("on_sbom_change", "on_graph_change", "on_runtime_change")


def load_json(path: Path) -> Any:
    """Read *path* as UTF-8 text and return the parsed JSON document."""
    return json.loads(path.read_text(encoding="utf-8"))


def digest_for(data: bytes, algo: str) -> str:
    """Return the hex digest of *data* for the named algorithm.

    Supported algorithms are ``"sha256"`` and ``"blake3"``.

    Raises:
        ValueError: if *algo* is not one of the supported names.
        RuntimeError: if ``"blake3"`` is requested but the package is missing.
    """
    if algo == "sha256":
        return hashlib.sha256(data).hexdigest()
    if algo == "blake3":
        if blake3 is None:
            raise RuntimeError("blake3 digest requested but the 'blake3' package is not installed")
        return blake3(data).hexdigest()
    raise ValueError(f"Unsupported hash algorithm: {algo}")


def parse_digest(digest: str) -> tuple[str, str]:
    """Split an ``"<algo>:<value>"`` digest string into ``(algo, value)``.

    Only the first colon separates the two parts.

    Raises:
        ValueError: if *digest* contains no colon.
    """
    if ":" not in digest:
        raise ValueError(f"Digest missing prefix: {digest}")
    algo, value = digest.split(":", 1)
    return algo, value


def _require_match(actual: str, expected_value: str, subject: object) -> None:
    """Raise ``ValueError`` unless the two hex digests match case-insensitively."""
    if actual.lower() != expected_value.lower():
        raise ValueError(
            f"Digest mismatch for {subject}: expected {expected_value}, got {actual}"
        )


def verify_digest(path: Path, expected: str) -> None:
    """Check that the file at *path* hashes to the prefixed digest *expected*.

    Raises:
        ValueError: on a malformed digest, unsupported algorithm, or mismatch.
    """
    # Parse first so a malformed digest is reported before any file access.
    algo, value = parse_digest(expected)
    _require_match(digest_for(path.read_bytes(), algo), value, path)


def resolve_cas_uri(cas_root: Path, cas_uri: str) -> Path:
    """Map a ``cas://`` URI onto a path underneath *cas_root*.

    The URI comes from the bundle being verified, so it is treated as
    untrusted: anchored (absolute / drive-qualified) locations and any ``..``
    component are rejected, because joining them onto *cas_root* would point
    outside the CAS directory. The check is purely lexical; symlinks inside
    the CAS root are not followed or inspected.

    Raises:
        ValueError: if the URI lacks the ``cas://`` scheme or would escape
            *cas_root*.
    """
    if not cas_uri.startswith("cas://"):
        raise ValueError(f"CAS URI must start with cas:// — got {cas_uri}")
    relative = cas_uri[len("cas://"):]
    relative_path = Path(relative)
    # A non-empty anchor means `cas_root / relative` would discard cas_root.
    if relative_path.anchor or ".." in relative_path.parts:
        raise ValueError(f"CAS URI escapes the CAS root: {cas_uri}")
    return cas_root / relative


def verify_dsse(dsse_ref: Dict[str, Any]) -> None:
    """Verify a DSSE envelope reference from the bundle.

    The envelope file at ``dsse_ref["path"]`` must match ``dsse_ref["sha256"]``.
    When ``payload_sha256`` is present, the base64-decoded ``payload`` field of
    the envelope must match it as well.

    Raises:
        ValueError: on any digest mismatch or malformed digest.
    """
    path = Path(dsse_ref["path"])
    verify_digest(path, dsse_ref["sha256"])
    if "payload_sha256" in dsse_ref:
        envelope = load_json(path)
        payload = base64.b64decode(envelope["payload"])
        verify_digest_from_bytes(payload, dsse_ref["payload_sha256"])


def verify_digest_from_bytes(data: bytes, expected: str) -> None:
    """Check that *data* hashes to the prefixed digest *expected*.

    Raises:
        ValueError: on a malformed digest, unsupported algorithm, or mismatch.
    """
    algo, value = parse_digest(expected)
    _require_match(digest_for(data, algo), value, "payload")


def _check_justification(bundle: Dict[str, Any], catalog: Dict[str, Any]) -> None:
    """Ensure the justification ID is in the catalog and its DSSE (if any) is intact."""
    justification_ids = {entry["id"] for entry in catalog.get("entries", [])}
    justification = bundle["justification"]
    if justification["id"] not in justification_ids:
        raise ValueError(f"Justification {justification['id']} not found in catalog")
    if "dsse" in justification:
        verify_dsse(justification["dsse"])


def _check_openvex(bundle: Dict[str, Any]) -> None:
    """Recompute both canonical hashes of the OpenVEX payload file."""
    openvex = bundle["openvex"]
    openvex_bytes = Path(openvex["path"]).read_bytes()
    verify_digest_from_bytes(openvex_bytes, openvex["canonical_sha256"])
    verify_digest_from_bytes(openvex_bytes, openvex["canonical_blake3"])


def _index_evidence(bundle: Dict[str, Any], cas_root: Path) -> Dict[str, Dict[str, Any]]:
    """Verify every CAS evidence entry and return the first entry seen per type."""
    evidence_by_type: Dict[str, Dict[str, Any]] = {}
    for ev in bundle["evidence"]:
        verify_digest(resolve_cas_uri(cas_root, ev["cas_uri"]), ev["hash"])
        if "dsse" in ev:
            verify_dsse(ev["dsse"])
        # setdefault keeps the first entry of each type; later ones are still
        # hash-verified above but are not used for cross-checks.
        evidence_by_type.setdefault(ev["type"], ev)
    return evidence_by_type


def _check_graph(bundle: Dict[str, Any], evidence_by_type: Dict[str, Dict[str, Any]]) -> None:
    """Ensure the graph section's hash matches the ``graph`` evidence entry."""
    graph = bundle["graph"]
    graph_evidence = evidence_by_type.get("graph")
    if not graph_evidence:
        raise ValueError("Graph evidence missing from bundle")
    if graph["hash"].lower() != graph_evidence["hash"].lower():
        raise ValueError("Graph hash does not match evidence hash")
    if "dsse" in graph:
        verify_dsse(graph["dsse"])


def _check_entrypoints(
    bundle: Dict[str, Any],
    evidence_by_type: Dict[str, Dict[str, Any]],
    min_coverage: float,
) -> None:
    """Enforce coverage, negative tests, and config/flags hash backing per entrypoint."""
    # These lookups do not depend on the entrypoint, so do them once.
    config_ev = evidence_by_type.get("config")
    flags_ev = evidence_by_type.get("flags")
    for ep in bundle["entrypoints"]:
        if ep["coverage_percent"] < min_coverage:
            raise ValueError(
                f"Entrypoint {ep['id']} coverage {ep['coverage_percent']} below required {min_coverage}"
            )
        if not ep["negative_tests"]:
            raise ValueError(f"Entrypoint {ep['id']} missing negative test confirmation")
        if not config_ev or config_ev["hash"].lower() != ep["config_hash"].lower():
            raise ValueError(f"Entrypoint {ep['id']} config_hash not backed by evidence")
        if not flags_ev or flags_ev["hash"].lower() != ep["flags_hash"].lower():
            raise ValueError(f"Entrypoint {ep['id']} flags_hash not backed by evidence")


def _check_policy(bundle: Dict[str, Any]) -> None:
    """Validate the RBAC, reevaluation-trigger, and uncertainty sections."""
    rbac = bundle["rbac"]
    if rbac["approvals_required"] < 1 or not rbac["roles_allowed"]:
        raise ValueError("RBAC section is incomplete")

    # Reevaluation triggers: must all be true to satisfy VEX-GAPS-401-062.
    reevaluation = bundle["reevaluation"]
    if not all(reevaluation.get(trigger) for trigger in _REEVALUATION_TRIGGERS):
        raise ValueError("Reevaluation triggers must all be true")

    uncertainty = bundle["uncertainty"]
    if uncertainty["state"] not in _UNCERTAINTY_STATES:
        raise ValueError("Invalid uncertainty state")


def _check_signature_envelope(bundle_path: Path, bundle: Dict[str, Any]) -> None:
    """Best-effort check of signature digests against a sibling ``.dsse.json`` file."""
    default_dsse_path = bundle_path.with_suffix(".dsse.json")
    # Best-effort: when no sibling envelope exists the check is skipped.
    if not default_dsse_path.exists():
        return
    sig_envelope_digest = f"sha256:{digest_for(default_dsse_path.read_bytes(), 'sha256')}"
    for sig in bundle["signatures"]:
        if sig["envelope_digest"].lower() != sig_envelope_digest.lower():
            raise ValueError("Signature envelope digest mismatch")


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the verifier from the command line.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        ``0`` when every check passes. Failures are reported by raising; the
        ``__main__`` guard turns them into exit status 1.

    Raises:
        ValueError: when a verification check fails.
        RuntimeError: when a required third-party package is not installed.
    """
    parser = argparse.ArgumentParser(description="Verify a StellaOps VEX proof bundle.")
    parser.add_argument("--bundle", required=True, type=Path)
    parser.add_argument("--schema", required=True, type=Path)
    parser.add_argument("--catalog", required=True, type=Path)
    parser.add_argument("--cas-root", required=True, type=Path)
    parser.add_argument("--min-coverage", type=float, default=95.0)
    args = parser.parse_args(argv)

    if jsonschema is None:
        raise RuntimeError("The 'jsonschema' package is required to validate the bundle")

    bundle = load_json(args.bundle)
    schema = load_json(args.schema)
    catalog = load_json(args.catalog)

    jsonschema.validate(instance=bundle, schema=schema)

    # The order below fixes which failure is reported first; keep it stable.
    _check_justification(bundle, catalog)
    _check_openvex(bundle)
    evidence_by_type = _index_evidence(bundle, args.cas_root)
    _check_graph(bundle, evidence_by_type)
    _check_entrypoints(bundle, evidence_by_type, args.min_coverage)
    _check_policy(bundle)
    _check_signature_envelope(args.bundle, bundle)

    print("✔ VEX proof bundle verified")
    return 0


if __name__ == "__main__":
    try:
        sys.exit(main())
    except Exception as exc:  # pragma: no cover - top-level guard
        print(f"Verification failed: {exc}", file=sys.stderr)
        sys.exit(1)