#!/usr/bin/env python3 # SPDX-License-Identifier: AGPL-3.0-or-later # BENCH-AUTO-401-019: Offline VEX proof bundle verifier """ Offline verification of VEX proof bundles without network access. Validates: - DSSE envelope structure - Payload type and format - Evidence hash references - Justification catalog membership - CAS hash verification Usage: python bench/tools/verify.py --bundle PATH [--cas-root PATH] [--catalog PATH] """ import argparse import base64 import hashlib import json import sys from pathlib import Path from typing import Any class VerificationResult: """Result of a verification check.""" def __init__(self, passed: bool, message: str, details: str = ""): self.passed = passed self.message = message self.details = details def __str__(self): status = "\033[0;32m✓\033[0m" if self.passed else "\033[0;31m✗\033[0m" result = f"{status} {self.message}" if self.details: result += f"\n {self.details}" return result def sha256_hex(data: bytes) -> str: """Compute SHA-256 hash.""" return hashlib.sha256(data).hexdigest() def blake3_hex(data: bytes) -> str: """Compute BLAKE3-256 hash (fallback to SHA-256).""" try: import blake3 return "blake3:" + blake3.blake3(data).hexdigest() except ImportError: return "sha256:" + sha256_hex(data) def load_json(path: Path) -> dict | None: """Load JSON file.""" try: with open(path, 'r', encoding='utf-8') as f: return json.load(f) except (json.JSONDecodeError, FileNotFoundError) as e: return None def verify_dsse_structure(dsse: dict) -> list[VerificationResult]: """Verify DSSE envelope structure.""" results = [] # Check required fields if "payloadType" not in dsse: results.append(VerificationResult(False, "Missing payloadType")) else: results.append(VerificationResult(True, f"payloadType: {dsse['payloadType']}")) if "payload" not in dsse: results.append(VerificationResult(False, "Missing payload")) else: results.append(VerificationResult(True, "payload present")) if "signatures" not in dsse or not dsse["signatures"]: results.append(VerificationResult(False, "Missing or empty signatures")) else: sig_count = len(dsse["signatures"]) results.append(VerificationResult(True, f"Found {sig_count} signature(s)")) # Check for placeholder signatures for i, sig in enumerate(dsse["signatures"]): sig_value = sig.get("sig", "") if sig_value.startswith("PLACEHOLDER"): results.append(VerificationResult( False, f"Signature {i} is placeholder", "Bundle needs actual signing before deployment" )) else: keyid = sig.get("keyid", "unknown") results.append(VerificationResult(True, f"Signature {i} keyid: {keyid}")) return results def decode_payload(dsse: dict) -> tuple[dict | None, list[VerificationResult]]: """Decode DSSE payload.""" results = [] payload_b64 = dsse.get("payload", "") if not payload_b64: results.append(VerificationResult(False, "Empty payload")) return None, results try: payload_bytes = base64.b64decode(payload_b64) payload = json.loads(payload_bytes) results.append(VerificationResult(True, "Payload decoded successfully")) return payload, results except Exception as e: results.append(VerificationResult(False, f"Failed to decode payload: {e}")) return None, results def verify_openvex(payload: dict) -> list[VerificationResult]: """Verify OpenVEX document structure.""" results = [] # Check OpenVEX context context = payload.get("@context", "") if "openvex" in context.lower(): results.append(VerificationResult(True, f"OpenVEX context: {context}")) else: results.append(VerificationResult(False, f"Unexpected context: {context}")) # Check statements statements = payload.get("statements", []) if not statements: results.append(VerificationResult(False, "No VEX statements")) else: results.append(VerificationResult(True, f"Contains {len(statements)} statement(s)")) for i, stmt in enumerate(statements): vuln = stmt.get("vulnerability", {}) vuln_id = vuln.get("name", vuln.get("@id", "unknown")) status = stmt.get("status", "unknown") results.append(VerificationResult( True, f"Statement {i}: {vuln_id} -> {status}" )) return results def verify_evidence_hashes(payload: dict, cas_root: Path | None) -> list[VerificationResult]: """Verify evidence hash references against CAS.""" results = [] statements = payload.get("statements", []) for stmt in statements: impact = stmt.get("impact_statement", "") if "Evidence hash:" in impact: hash_value = impact.split("Evidence hash:")[1].strip() results.append(VerificationResult(True, f"Evidence hash: {hash_value[:16]}...")) # Verify against CAS if root provided if cas_root and cas_root.exists(): # Look for reachability.json in CAS reach_file = cas_root / "reachability.json" if reach_file.exists(): with open(reach_file, 'rb') as f: content = f.read() actual_hash = blake3_hex(content) if actual_hash == hash_value or hash_value in actual_hash: results.append(VerificationResult(True, "Evidence hash matches CAS")) else: results.append(VerificationResult( False, "Evidence hash mismatch", f"Expected: {hash_value[:32]}..., Got: {actual_hash[:32]}..." )) return results def verify_catalog_membership(payload: dict, catalog_path: Path) -> list[VerificationResult]: """Verify justification is in catalog.""" results = [] if not catalog_path.exists(): results.append(VerificationResult(False, f"Catalog not found: {catalog_path}")) return results catalog = load_json(catalog_path) if catalog is None: results.append(VerificationResult(False, "Failed to load catalog")) return results # Extract catalog entries entries = catalog if isinstance(catalog, list) else catalog.get("entries", []) catalog_ids = {e.get("id", "") for e in entries} # Check each statement's justification statements = payload.get("statements", []) for stmt in statements: justification = stmt.get("justification") if justification: if justification in catalog_ids: results.append(VerificationResult( True, f"Justification '{justification}' in catalog" )) else: results.append(VerificationResult( False, f"Justification '{justification}' not in catalog" )) return results def main(): parser = argparse.ArgumentParser( description="Offline VEX proof bundle verifier" ) parser.add_argument( "--bundle", type=Path, required=True, help="Path to DSSE bundle file" ) parser.add_argument( "--cas-root", type=Path, default=None, help="Path to CAS evidence directory" ) parser.add_argument( "--catalog", type=Path, default=Path("docs/benchmarks/vex-justifications.catalog.json"), help="Path to justification catalog" ) args = parser.parse_args() # Resolve paths repo_root = Path(__file__).parent.parent.parent bundle_path = args.bundle if args.bundle.is_absolute() else repo_root / args.bundle catalog_path = args.catalog if args.catalog.is_absolute() else repo_root / args.catalog cas_root = args.cas_root if args.cas_root and args.cas_root.is_absolute() else ( repo_root / args.cas_root if args.cas_root else None ) print(f"Verifying: {bundle_path}") print("") all_results = [] passed = 0 failed = 0 # Load DSSE bundle dsse = load_json(bundle_path) if dsse is None: print("\033[0;31m✗\033[0m Failed to load bundle") return 1 # Verify DSSE structure print("DSSE Structure:") results = verify_dsse_structure(dsse) for r in results: print(f" {r}") if r.passed: passed += 1 else: failed += 1 all_results.extend(results) # Decode payload print("\nPayload:") payload, results = decode_payload(dsse) for r in results: print(f" {r}") if r.passed: passed += 1 else: failed += 1 all_results.extend(results) if payload: # Verify OpenVEX structure payload_type = dsse.get("payloadType", "") if "openvex" in payload_type.lower(): print("\nOpenVEX:") results = verify_openvex(payload) for r in results: print(f" {r}") if r.passed: passed += 1 else: failed += 1 all_results.extend(results) # Verify evidence hashes print("\nEvidence:") results = verify_evidence_hashes(payload, cas_root) for r in results: print(f" {r}") if r.passed: passed += 1 else: failed += 1 all_results.extend(results) # Verify catalog membership print("\nCatalog:") results = verify_catalog_membership(payload, catalog_path) for r in results: print(f" {r}") if r.passed: passed += 1 else: failed += 1 all_results.extend(results) # Summary print(f"\n{'='*40}") print(f"Passed: {passed}, Failed: {failed}") return 0 if failed == 0 else 1 if __name__ == "__main__": sys.exit(main())