#!/usr/bin/env python3
"""
Verifier for mirror-thin-v1 artefacts and bundle meta.

Checks:
1) SHA256 of manifest/tarball (and optional bundle meta) matches sidecars.
2) Manifest schema contains required fields and required layer files exist.
3) Tarball headers deterministic (sorted paths, uid/gid=0, mtime=0).
4) Tar contents match manifest digests.
5) Optional: verify DSSE signatures for manifest/bundle when a public key is provided.
6) Optional: validate bundle meta (tenant/env scope, policy hashes, gap coverage counts).

Usage:
  python scripts/mirror/verify_thin_bundle.py \
      out/mirror/thin/mirror-thin-v1.manifest.json \
      out/mirror/thin/mirror-thin-v1.tar.gz \
      --bundle-meta out/mirror/thin/mirror-thin-v1.bundle.json \
      --pubkey out/mirror/thin/tuf/keys/ci-ed25519.pub \
      --tenant tenant-demo --environment lab

Exit code 0 on success; non-zero on any check failure.
"""
import argparse
import base64
import hashlib
import json
import pathlib
import sys
import tarfile
from typing import Optional

try:
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey

    CRYPTO_AVAILABLE = True
except ImportError:  # pragma: no cover - surfaced as runtime guidance
    CRYPTO_AVAILABLE = False

# Top-level keys every manifest must carry.
REQUIRED_FIELDS = ["version", "created", "layers", "indexes"]

# Paths (relative to the tar root) that must be present in every bundle.
REQUIRED_LAYER_FILES = {
    "layers/observations.ndjson",
    "layers/time-anchor.json",
    "layers/transport-plan.json",
    "layers/rekor-policy.json",
    "layers/mirror-policy.json",
    "layers/offline-kit-policy.json",
    "layers/artifact-hashes.json",
    "indexes/observations.index",
}

# Read size used when streaming file / tar member contents into a hash.
_CHUNK_SIZE = 65536


def _b64url_decode(data: str) -> bytes:
    """Decode base64 / base64url text, restoring any stripped ``=`` padding."""
    padding = "=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(data + padding)


def sha256_file(path: pathlib.Path) -> str:
    """Return the hex SHA256 digest of the file at *path*, read in chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(_CHUNK_SIZE), b""):
            h.update(chunk)
    return h.hexdigest()


def load_sha256_sidecar(path: pathlib.Path) -> str:
    """Return the digest recorded in ``<path>.sha256``.

    The sidecar may hold the bare digest or ``<digest>  <filename>``; only the
    first whitespace-separated token is returned.

    Raises:
        SystemExit: if the sidecar file is missing or contains no token.
    """
    sidecar = path.with_suffix(path.suffix + ".sha256")
    if not sidecar.exists():
        raise SystemExit(f"missing sidecar {sidecar}")
    tokens = sidecar.read_text().split()
    if not tokens:
        # An empty sidecar used to surface as a bare IndexError traceback.
        raise SystemExit(f"empty sidecar {sidecar}")
    return tokens[0]


def check_schema(manifest: dict):
    """Exit if *manifest* lacks any key listed in ``REQUIRED_FIELDS``."""
    missing = [f for f in REQUIRED_FIELDS if f not in manifest]
    if missing:
        raise SystemExit(f"manifest missing fields: {missing}")


def normalize(name: str) -> str:
    """Strip a leading ``./`` from a tar member name."""
    return name[2:] if name.startswith("./") else name


def check_tar_determinism(tar_path: pathlib.Path):
    """Exit unless tar entries are sorted and carry uid/gid 0 and mtime 0."""
    with tarfile.open(tar_path, "r:gz") as tf:
        names = [normalize(n) for n in tf.getnames()]
        if names != sorted(names):
            raise SystemExit("tar entries not sorted")
        for m in tf.getmembers():
            if m.uid != 0 or m.gid != 0:
                raise SystemExit(f"tar header uid/gid not zero for {m.name}")
            if m.mtime != 0:
                raise SystemExit(f"tar header mtime not zero for {m.name}")


def check_required_layers(tar_path: pathlib.Path):
    """Exit if any path in ``REQUIRED_LAYER_FILES`` is absent from the tarball."""
    with tarfile.open(tar_path, "r:gz") as tf:
        names = {normalize(n) for n in tf.getnames()}
    for required in REQUIRED_LAYER_FILES:
        if required not in names:
            raise SystemExit(f"required file missing from bundle: {required}")


def _member_sha256(tf: tarfile.TarFile, name: str) -> str:
    """Return the hex SHA256 of tar member *name* (also tried as ``./name``).

    Raises:
        SystemExit: if the member does not exist or is not a regular file.
    """
    try:
        info = tf.getmember(name)
    except KeyError:
        try:
            info = tf.getmember(f"./{name}")
        except KeyError:
            raise SystemExit(f"manifest entry missing from tarball: {name}") from None
    stream = tf.extractfile(info)
    if stream is None:
        # extractfile() returns None for directories and other non-file members.
        raise SystemExit(f"manifest entry is not a regular file: {name}")
    h = hashlib.sha256()
    with stream:
        for chunk in iter(lambda: stream.read(_CHUNK_SIZE), b""):
            h.update(chunk)
    return h.hexdigest()


def check_content_hashes(manifest: dict, tar_path: pathlib.Path):
    """Exit unless every manifest layer/index digest matches the tar contents.

    Layers are looked up by ``path``; indexes by ``name``, prefixed with
    ``indexes/`` when the prefix is not already present. Digests are compared
    in the form ``sha256:<hex>``.
    """
    with tarfile.open(tar_path, "r:gz") as tf:
        for layer in manifest.get("layers", []):
            name = layer["path"]
            digest = _member_sha256(tf, name)
            if layer["digest"] != f"sha256:{digest}":
                raise SystemExit(f"layer digest mismatch {name}: {digest}")
        for idx in manifest.get("indexes", []):
            name = idx["name"]
            if not name.startswith("indexes/"):
                name = f"indexes/{name}"
            digest = _member_sha256(tf, name)
            if idx["digest"] != f"sha256:{digest}":
                raise SystemExit(f"index digest mismatch {name}: {digest}")


# The return annotation is a string on purpose: Ed25519PublicKey is undefined
# when the optional cryptography import fails, and an unquoted annotation
# would raise NameError while the module is being loaded.
def load_pubkey(path: pathlib.Path) -> "Ed25519PublicKey":
    """Load a PEM-encoded public key from *path*.

    Raises:
        SystemExit: if the ``cryptography`` package is not installed.
    """
    if not CRYPTO_AVAILABLE:
        raise SystemExit("cryptography is required for DSSE verification; install before using --pubkey")
    return serialization.load_pem_public_key(path.read_bytes())


def verify_dsse(dsse_path: pathlib.Path, pubkey_path: pathlib.Path, expected_payload: pathlib.Path, expected_type: str):
    """Verify the DSSE envelope at *dsse_path* against a file and public key.

    Checks, in order: ``payloadType`` equals *expected_type*; the decoded
    payload is byte-identical to *expected_payload*; at least one signature is
    present; the first signature verifies with the key at *pubkey_path*.

    NOTE(review): only ``signatures[0]`` is checked, and it is verified over
    the raw payload bytes rather than the DSSE pre-authentication encoding
    (PAE). Confirm this matches what the signing script produces.

    Raises:
        SystemExit: on any mismatch or verification failure.
    """
    dsse_obj = json.loads(dsse_path.read_text())
    if dsse_obj.get("payloadType") != expected_type:
        raise SystemExit(f"DSSE payloadType mismatch for {dsse_path}")
    payload = _b64url_decode(dsse_obj.get("payload", ""))
    if payload != expected_payload.read_bytes():
        raise SystemExit(f"DSSE payload mismatch for {dsse_path}")
    sigs = dsse_obj.get("signatures") or []
    if not sigs:
        raise SystemExit(f"DSSE missing signatures: {dsse_path}")
    pub = load_pubkey(pubkey_path)
    try:
        pub.verify(_b64url_decode(sigs[0]["sig"]), payload)
    except Exception as exc:  # pragma: no cover - cryptography raises InvalidSignature
        raise SystemExit(f"DSSE signature verification failed for {dsse_path}: {exc}") from exc


def check_bundle_meta(meta_path: pathlib.Path, manifest_path: pathlib.Path, tar_path: pathlib.Path, tenant: Optional[str], environment: Optional[str]):
    """Validate the bundle meta JSON at *meta_path*.

    Verifies required top-level fields, optional tenant/environment scope,
    recorded artifact digests for the manifest and tarball, presence of the
    policy artifact entries, gap list sizes, tool script hashes and a positive
    ``checkpoint_freshness_seconds``.

    Raises:
        SystemExit: on the first failed check.
    """
    meta = json.loads(meta_path.read_text())
    for field in ["bundle", "version", "artifacts", "gaps", "tooling"]:
        if field not in meta:
            raise SystemExit(f"bundle meta missing field {field}")
    if tenant and meta.get("tenant") != tenant:
        raise SystemExit(f"bundle tenant mismatch: {meta.get('tenant')} != {tenant}")
    if environment and meta.get("environment") != environment:
        raise SystemExit(f"bundle environment mismatch: {meta.get('environment')} != {environment}")

    artifacts = meta["artifacts"]

    def expect(name: str, path: pathlib.Path):
        recorded = artifacts.get(name)
        if not recorded:
            raise SystemExit(f"bundle meta missing artifact entry: {name}")
        # A recorded entry without a sha256 is accepted as-is (no comparison).
        expected = recorded.get("sha256")
        if expected and expected != sha256_file(path):
            raise SystemExit(f"bundle meta digest mismatch for {name}")

    expect("manifest", manifest_path)
    expect("tarball", tar_path)

    for extra in ["time_anchor", "transport_plan", "rekor_policy", "mirror_policy", "offline_policy", "artifact_hashes"]:
        rec = artifacts.get(extra)
        if not rec:
            raise SystemExit(f"bundle meta missing artifact entry: {extra}")
        if not rec.get("path"):
            raise SystemExit(f"bundle meta missing path for {extra}")

    for group, expected_count in [("ok", 10), ("rk", 10), ("ms", 10)]:
        if len(meta.get("gaps", {}).get(group, [])) != expected_count:
            raise SystemExit(f"bundle meta gaps.{group} expected {expected_count} entries")

    # Repository root is guessed as four directories above the manifest
    # (e.g. out/mirror/thin/<manifest> -> repo root), else the topmost parent.
    root_guess = manifest_path.parents[3] if len(manifest_path.parents) > 3 else manifest_path.parents[-1]
    tool_expectations = {
        'make_thin_v1_sh': root_guess / 'src' / 'Mirror' / 'StellaOps.Mirror.Creator' / 'make-thin-v1.sh',
        'sign_script': root_guess / 'scripts' / 'mirror' / 'sign_thin_bundle.py',
        'verify_script': root_guess / 'scripts' / 'mirror' / 'verify_thin_bundle.py',
        'verify_oci': root_guess / 'scripts' / 'mirror' / 'verify_oci_layout.py',
    }
    for key, path in tool_expectations.items():
        recorded = meta['tooling'].get(key)
        if not recorded:
            raise SystemExit(f"tool hash missing for {key}")
        if not path.is_file():
            # Fail with a readable message instead of a FileNotFoundError traceback.
            raise SystemExit(f"tool file missing for {key}: {path}")
        actual = sha256_file(path)
        if recorded != actual:
            raise SystemExit(f"tool hash mismatch for {key}")

    freshness = meta.get("checkpoint_freshness_seconds", 0)
    if not isinstance(freshness, (int, float)) or freshness <= 0:
        raise SystemExit("checkpoint_freshness_seconds must be positive")


def main():
    """Parse CLI arguments and run every verification step in order."""
    parser = argparse.ArgumentParser()
    parser.add_argument("manifest", type=pathlib.Path)
    parser.add_argument("tar", type=pathlib.Path)
    parser.add_argument("--bundle-meta", type=pathlib.Path)
    parser.add_argument("--pubkey", type=pathlib.Path)
    parser.add_argument("--tenant", type=str)
    parser.add_argument("--environment", type=str)
    args = parser.parse_args()

    manifest_path = args.manifest
    tar_path = args.tar
    bundle_meta = args.bundle_meta
    # with_suffix swaps only the final suffix: foo.bundle.json -> foo.bundle.dsse.json
    bundle_dsse = bundle_meta.with_suffix(".dsse.json") if bundle_meta else None
    manifest_dsse = manifest_path.with_suffix(".dsse.json")

    man_expected = load_sha256_sidecar(manifest_path)
    tar_expected = load_sha256_sidecar(tar_path)
    if sha256_file(manifest_path) != man_expected:
        raise SystemExit("manifest sha256 mismatch")
    if sha256_file(tar_path) != tar_expected:
        raise SystemExit("tarball sha256 mismatch")

    manifest = json.loads(manifest_path.read_text())
    check_schema(manifest)
    check_tar_determinism(tar_path)
    check_required_layers(tar_path)
    check_content_hashes(manifest, tar_path)

    if bundle_meta:
        if not bundle_meta.exists():
            raise SystemExit(f"bundle meta missing: {bundle_meta}")
        meta_expected = load_sha256_sidecar(bundle_meta)
        if sha256_file(bundle_meta) != meta_expected:
            raise SystemExit("bundle meta sha256 mismatch")
        check_bundle_meta(bundle_meta, manifest_path, tar_path, args.tenant, args.environment)

    if args.pubkey:
        pubkey = args.pubkey
        # A missing envelope is still non-fatal, but it is reported so that a
        # skipped signature check is visible to the operator.
        if manifest_dsse.exists():
            verify_dsse(manifest_dsse, pubkey, manifest_path, "application/vnd.stellaops.mirror.manifest+json")
        else:
            print(f"warning: no DSSE envelope at {manifest_dsse}; manifest signature not verified", file=sys.stderr)
        if bundle_dsse:
            if bundle_dsse.exists():
                verify_dsse(bundle_dsse, pubkey, bundle_meta, "application/vnd.stellaops.mirror.bundle+json")
            else:
                print(f"warning: no DSSE envelope at {bundle_dsse}; bundle signature not verified", file=sys.stderr)

    print("OK: mirror-thin bundle verified")


if __name__ == "__main__":
    main()