Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Console CI / console-ci (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
VEX Proof Bundles / verify-bundles (push) Has been cancelled
- Introduced sample proof bundle configuration files for testing, including `sample-proof-bundle-config.dsse.json`, `sample-proof-bundle.dsse.json`, and `sample-proof-bundle.json`. - Implemented a verification script `test_verify_sample.sh` to validate proof bundles against specified schemas and catalogs. - Updated existing proof bundle configurations with new metadata, including versioning, created timestamps, and justification details. - Enhanced evidence entries with expiration dates and hashes for better integrity checks. - Ensured all new configurations adhere to the defined schema for consistency and reliability in testing.
177 lines
6.5 KiB
Python
177 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Offline verifier for StellaOps VEX proof bundles.
|
|
|
|
- Validates the bundle against `docs/benchmarks/vex-evidence-playbook.schema.json`.
|
|
- Checks justification IDs against the signed catalog.
|
|
- Recomputes hashes for CAS artefacts, OpenVEX payload, and DSSE envelopes.
|
|
- Enforces coverage and negative-test requirements per task VEX-GAPS-401-062.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
from pathlib import Path
|
|
import sys
|
|
from typing import Dict, Any
|
|
|
|
import jsonschema
|
|
from blake3 import blake3
|
|
|
|
|
|
def load_json(path: Path) -> Any:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def digest_for(data: bytes, algo: str) -> str:
|
|
if algo == "sha256":
|
|
import hashlib
|
|
|
|
return hashlib.sha256(data).hexdigest()
|
|
if algo == "blake3":
|
|
return blake3(data).hexdigest()
|
|
raise ValueError(f"Unsupported hash algorithm: {algo}")
|
|
|
|
|
|
def parse_digest(digest: str) -> tuple[str, str]:
|
|
if ":" not in digest:
|
|
raise ValueError(f"Digest missing prefix: {digest}")
|
|
algo, value = digest.split(":", 1)
|
|
return algo, value
|
|
|
|
|
|
def verify_digest(path: Path, expected: str) -> None:
|
|
algo, value = parse_digest(expected)
|
|
actual = digest_for(path.read_bytes(), algo)
|
|
if actual.lower() != value.lower():
|
|
raise ValueError(f"Digest mismatch for {path}: expected {value}, got {actual}")
|
|
|
|
|
|
def resolve_cas_uri(cas_root: Path, cas_uri: str) -> Path:
|
|
if not cas_uri.startswith("cas://"):
|
|
raise ValueError(f"CAS URI must start with cas:// — got {cas_uri}")
|
|
relative = cas_uri[len("cas://") :]
|
|
return cas_root / relative
|
|
|
|
|
|
def verify_dsse(dsse_ref: Dict[str, Any]) -> None:
|
|
path = Path(dsse_ref["path"])
|
|
verify_digest(path, dsse_ref["sha256"])
|
|
if "payload_sha256" in dsse_ref:
|
|
envelope = load_json(path)
|
|
payload = base64.b64decode(envelope["payload"])
|
|
verify_digest_from_bytes(payload, dsse_ref["payload_sha256"])
|
|
|
|
|
|
def verify_digest_from_bytes(data: bytes, expected: str) -> None:
|
|
algo, value = parse_digest(expected)
|
|
actual = digest_for(data, algo)
|
|
if actual.lower() != value.lower():
|
|
raise ValueError(f"Digest mismatch for payload: expected {value}, got {actual}")
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Verify a StellaOps VEX proof bundle.")
|
|
parser.add_argument("--bundle", required=True, type=Path)
|
|
parser.add_argument("--schema", required=True, type=Path)
|
|
parser.add_argument("--catalog", required=True, type=Path)
|
|
parser.add_argument("--cas-root", required=True, type=Path)
|
|
parser.add_argument("--min-coverage", type=float, default=95.0)
|
|
args = parser.parse_args()
|
|
|
|
bundle = load_json(args.bundle)
|
|
schema = load_json(args.schema)
|
|
catalog = load_json(args.catalog)
|
|
|
|
jsonschema.validate(instance=bundle, schema=schema)
|
|
|
|
justification_ids = {entry["id"] for entry in catalog.get("entries", [])}
|
|
if bundle["justification"]["id"] not in justification_ids:
|
|
raise ValueError(f"Justification {bundle['justification']['id']} not found in catalog")
|
|
|
|
# Justification DSSE integrity
|
|
if "dsse" in bundle["justification"]:
|
|
verify_dsse(bundle["justification"]["dsse"])
|
|
|
|
# OpenVEX canonical hashes
|
|
openvex_path = Path(bundle["openvex"]["path"])
|
|
openvex_bytes = openvex_path.read_bytes()
|
|
verify_digest_from_bytes(openvex_bytes, bundle["openvex"]["canonical_sha256"])
|
|
verify_digest_from_bytes(openvex_bytes, bundle["openvex"]["canonical_blake3"])
|
|
|
|
# CAS evidence
|
|
evidence_by_type: Dict[str, Dict[str, Any]] = {}
|
|
for ev in bundle["evidence"]:
|
|
ev_path = resolve_cas_uri(args.cas_root, ev["cas_uri"])
|
|
verify_digest(ev_path, ev["hash"])
|
|
if "dsse" in ev:
|
|
verify_dsse(ev["dsse"])
|
|
evidence_by_type.setdefault(ev["type"], ev)
|
|
|
|
# Graph hash alignment
|
|
graph = bundle["graph"]
|
|
graph_evidence = evidence_by_type.get("graph")
|
|
if not graph_evidence:
|
|
raise ValueError("Graph evidence missing from bundle")
|
|
if graph["hash"].lower() != graph_evidence["hash"].lower():
|
|
raise ValueError("Graph hash does not match evidence hash")
|
|
if "dsse" in graph:
|
|
verify_dsse(graph["dsse"])
|
|
|
|
# Entrypoint coverage + negative tests + config/flags hashes
|
|
for ep in bundle["entrypoints"]:
|
|
if ep["coverage_percent"] < args.min_coverage:
|
|
raise ValueError(
|
|
f"Entrypoint {ep['id']} coverage {ep['coverage_percent']} below required {args.min_coverage}"
|
|
)
|
|
if not ep["negative_tests"]:
|
|
raise ValueError(f"Entrypoint {ep['id']} missing negative test confirmation")
|
|
config_ev = evidence_by_type.get("config")
|
|
if not config_ev or config_ev["hash"].lower() != ep["config_hash"].lower():
|
|
raise ValueError(f"Entrypoint {ep['id']} config_hash not backed by evidence")
|
|
flags_ev = evidence_by_type.get("flags")
|
|
if not flags_ev or flags_ev["hash"].lower() != ep["flags_hash"].lower():
|
|
raise ValueError(f"Entrypoint {ep['id']} flags_hash not backed by evidence")
|
|
|
|
# RBAC enforcement
|
|
rbac = bundle["rbac"]
|
|
if rbac["approvals_required"] < 1 or not rbac["roles_allowed"]:
|
|
raise ValueError("RBAC section is incomplete")
|
|
|
|
# Reevaluation triggers: must all be true to satisfy VEX-GAPS-401-062
|
|
reevaluation = bundle["reevaluation"]
|
|
if not all(
|
|
[
|
|
reevaluation.get("on_sbom_change"),
|
|
reevaluation.get("on_graph_change"),
|
|
reevaluation.get("on_runtime_change"),
|
|
]
|
|
):
|
|
raise ValueError("Reevaluation triggers must all be true")
|
|
|
|
# Uncertainty gating present
|
|
uncertainty = bundle["uncertainty"]
|
|
if uncertainty["state"] not in {"U0-none", "U1-low", "U2-medium", "U3-high"}:
|
|
raise ValueError("Invalid uncertainty state")
|
|
|
|
# Signature envelope integrity (best-effort)
|
|
default_dsse_path = args.bundle.with_suffix(".dsse.json")
|
|
if default_dsse_path.exists():
|
|
sig_envelope_digest = f"sha256:{digest_for(default_dsse_path.read_bytes(), 'sha256')}"
|
|
for sig in bundle["signatures"]:
|
|
if sig["envelope_digest"].lower() != sig_envelope_digest.lower():
|
|
raise ValueError("Signature envelope digest mismatch")
|
|
|
|
print("✔ VEX proof bundle verified")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
sys.exit(main())
|
|
except Exception as exc: # pragma: no cover - top-level guard
|
|
print(f"Verification failed: {exc}", file=sys.stderr)
|
|
sys.exit(1)
|