Files
StellaOps Bot 233873f620
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Signals CI & Image / signals-ci (push) Has been cancelled
Signals Reachability Scoring & Events / reachability-smoke (push) Has been cancelled
Signals Reachability Scoring & Events / sign-and-upload (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Reachability Corpus Validation / validate-corpus (push) Has been cancelled
Reachability Corpus Validation / validate-ground-truths (push) Has been cancelled
Scanner Analyzers / Discover Analyzers (push) Has been cancelled
Scanner Analyzers / Validate Test Fixtures (push) Has been cancelled
Reachability Corpus Validation / determinism-check (push) Has been cancelled
Scanner Analyzers / Build Analyzers (push) Has been cancelled
Scanner Analyzers / Test Language Analyzers (push) Has been cancelled
Scanner Analyzers / Verify Deterministic Output (push) Has been cancelled
Notify Smoke Test / Notify Unit Tests (push) Has been cancelled
Notify Smoke Test / Notifier Service Tests (push) Has been cancelled
Notify Smoke Test / Notification Smoke Test (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
up
2025-12-14 15:50:38 +02:00

334 lines
10 KiB
Python

#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
# BENCH-AUTO-401-019: Offline VEX proof bundle verifier
"""
Offline verification of VEX proof bundles without network access.
Validates:
- DSSE envelope structure
- Payload type and format
- Evidence hash references
- Justification catalog membership
- CAS hash verification
Usage:
python bench/tools/verify.py --bundle PATH [--cas-root PATH] [--catalog PATH]
"""
import argparse
import base64
import hashlib
import json
import sys
from pathlib import Path
from typing import Any
class VerificationResult:
"""Result of a verification check."""
def __init__(self, passed: bool, message: str, details: str = ""):
self.passed = passed
self.message = message
self.details = details
def __str__(self):
status = "\033[0;32m✓\033[0m" if self.passed else "\033[0;31m✗\033[0m"
result = f"{status} {self.message}"
if self.details:
result += f"\n {self.details}"
return result
def sha256_hex(data: bytes) -> str:
"""Compute SHA-256 hash."""
return hashlib.sha256(data).hexdigest()
def blake3_hex(data: bytes) -> str:
"""Compute BLAKE3-256 hash (fallback to SHA-256)."""
try:
import blake3
return "blake3:" + blake3.blake3(data).hexdigest()
except ImportError:
return "sha256:" + sha256_hex(data)
def load_json(path: Path) -> dict | None:
"""Load JSON file."""
try:
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
return None
def verify_dsse_structure(dsse: dict) -> list[VerificationResult]:
"""Verify DSSE envelope structure."""
results = []
# Check required fields
if "payloadType" not in dsse:
results.append(VerificationResult(False, "Missing payloadType"))
else:
results.append(VerificationResult(True, f"payloadType: {dsse['payloadType']}"))
if "payload" not in dsse:
results.append(VerificationResult(False, "Missing payload"))
else:
results.append(VerificationResult(True, "payload present"))
if "signatures" not in dsse or not dsse["signatures"]:
results.append(VerificationResult(False, "Missing or empty signatures"))
else:
sig_count = len(dsse["signatures"])
results.append(VerificationResult(True, f"Found {sig_count} signature(s)"))
# Check for placeholder signatures
for i, sig in enumerate(dsse["signatures"]):
sig_value = sig.get("sig", "")
if sig_value.startswith("PLACEHOLDER"):
results.append(VerificationResult(
False,
f"Signature {i} is placeholder",
"Bundle needs actual signing before deployment"
))
else:
keyid = sig.get("keyid", "unknown")
results.append(VerificationResult(True, f"Signature {i} keyid: {keyid}"))
return results
def decode_payload(dsse: dict) -> tuple[dict | None, list[VerificationResult]]:
"""Decode DSSE payload."""
results = []
payload_b64 = dsse.get("payload", "")
if not payload_b64:
results.append(VerificationResult(False, "Empty payload"))
return None, results
try:
payload_bytes = base64.b64decode(payload_b64)
payload = json.loads(payload_bytes)
results.append(VerificationResult(True, "Payload decoded successfully"))
return payload, results
except Exception as e:
results.append(VerificationResult(False, f"Failed to decode payload: {e}"))
return None, results
def verify_openvex(payload: dict) -> list[VerificationResult]:
"""Verify OpenVEX document structure."""
results = []
# Check OpenVEX context
context = payload.get("@context", "")
if "openvex" in context.lower():
results.append(VerificationResult(True, f"OpenVEX context: {context}"))
else:
results.append(VerificationResult(False, f"Unexpected context: {context}"))
# Check statements
statements = payload.get("statements", [])
if not statements:
results.append(VerificationResult(False, "No VEX statements"))
else:
results.append(VerificationResult(True, f"Contains {len(statements)} statement(s)"))
for i, stmt in enumerate(statements):
vuln = stmt.get("vulnerability", {})
vuln_id = vuln.get("name", vuln.get("@id", "unknown"))
status = stmt.get("status", "unknown")
results.append(VerificationResult(
True,
f"Statement {i}: {vuln_id} -> {status}"
))
return results
def verify_evidence_hashes(payload: dict, cas_root: Path | None) -> list[VerificationResult]:
"""Verify evidence hash references against CAS."""
results = []
statements = payload.get("statements", [])
for stmt in statements:
impact = stmt.get("impact_statement", "")
if "Evidence hash:" in impact:
hash_value = impact.split("Evidence hash:")[1].strip()
results.append(VerificationResult(True, f"Evidence hash: {hash_value[:16]}..."))
# Verify against CAS if root provided
if cas_root and cas_root.exists():
# Look for reachability.json in CAS
reach_file = cas_root / "reachability.json"
if reach_file.exists():
with open(reach_file, 'rb') as f:
content = f.read()
actual_hash = blake3_hex(content)
if actual_hash == hash_value or hash_value in actual_hash:
results.append(VerificationResult(True, "Evidence hash matches CAS"))
else:
results.append(VerificationResult(
False,
"Evidence hash mismatch",
f"Expected: {hash_value[:32]}..., Got: {actual_hash[:32]}..."
))
return results
def verify_catalog_membership(payload: dict, catalog_path: Path) -> list[VerificationResult]:
"""Verify justification is in catalog."""
results = []
if not catalog_path.exists():
results.append(VerificationResult(False, f"Catalog not found: {catalog_path}"))
return results
catalog = load_json(catalog_path)
if catalog is None:
results.append(VerificationResult(False, "Failed to load catalog"))
return results
# Extract catalog entries
entries = catalog if isinstance(catalog, list) else catalog.get("entries", [])
catalog_ids = {e.get("id", "") for e in entries}
# Check each statement's justification
statements = payload.get("statements", [])
for stmt in statements:
justification = stmt.get("justification")
if justification:
if justification in catalog_ids:
results.append(VerificationResult(
True,
f"Justification '{justification}' in catalog"
))
else:
results.append(VerificationResult(
False,
f"Justification '{justification}' not in catalog"
))
return results
def main():
parser = argparse.ArgumentParser(
description="Offline VEX proof bundle verifier"
)
parser.add_argument(
"--bundle",
type=Path,
required=True,
help="Path to DSSE bundle file"
)
parser.add_argument(
"--cas-root",
type=Path,
default=None,
help="Path to CAS evidence directory"
)
parser.add_argument(
"--catalog",
type=Path,
default=Path("docs/benchmarks/vex-justifications.catalog.json"),
help="Path to justification catalog"
)
args = parser.parse_args()
# Resolve paths
repo_root = Path(__file__).parent.parent.parent
bundle_path = args.bundle if args.bundle.is_absolute() else repo_root / args.bundle
catalog_path = args.catalog if args.catalog.is_absolute() else repo_root / args.catalog
cas_root = args.cas_root if args.cas_root and args.cas_root.is_absolute() else (
repo_root / args.cas_root if args.cas_root else None
)
print(f"Verifying: {bundle_path}")
print("")
all_results = []
passed = 0
failed = 0
# Load DSSE bundle
dsse = load_json(bundle_path)
if dsse is None:
print("\033[0;31m✗\033[0m Failed to load bundle")
return 1
# Verify DSSE structure
print("DSSE Structure:")
results = verify_dsse_structure(dsse)
for r in results:
print(f" {r}")
if r.passed:
passed += 1
else:
failed += 1
all_results.extend(results)
# Decode payload
print("\nPayload:")
payload, results = decode_payload(dsse)
for r in results:
print(f" {r}")
if r.passed:
passed += 1
else:
failed += 1
all_results.extend(results)
if payload:
# Verify OpenVEX structure
payload_type = dsse.get("payloadType", "")
if "openvex" in payload_type.lower():
print("\nOpenVEX:")
results = verify_openvex(payload)
for r in results:
print(f" {r}")
if r.passed:
passed += 1
else:
failed += 1
all_results.extend(results)
# Verify evidence hashes
print("\nEvidence:")
results = verify_evidence_hashes(payload, cas_root)
for r in results:
print(f" {r}")
if r.passed:
passed += 1
else:
failed += 1
all_results.extend(results)
# Verify catalog membership
print("\nCatalog:")
results = verify_catalog_membership(payload, catalog_path)
for r in results:
print(f" {r}")
if r.passed:
passed += 1
else:
failed += 1
all_results.extend(results)
# Summary
print(f"\n{'='*40}")
print(f"Passed: {passed}, Failed: {failed}")
return 0 if failed == 0 else 1
if __name__ == "__main__":
sys.exit(main())