#!/usr/bin/env python3
"""Automate mirror bundle manifest + checksums with dual-control approvals.

Implements DEVOPS-AIRGAP-57-001.

Features:
- Deterministic manifest (`mirror-bundle-manifest.json`) with sha256/size per file.
- `checksums.sha256` for quick verification.
- Dual-control approvals recorded via `--approver` (min 2 required to mark approved).
- Optional cosign signing of the manifest via `--cosign-key` (sign-blob); writes
  `mirror-bundle-manifest.json.sig` and `mirror-bundle-manifest.json.pem` when available.
- Offline-friendly: purely local file reads; no network access.

Usage:
  build_mirror_bundle.py --root /path/to/bundles --output out/mirror \
      --approver alice@example.com --approver bob@example.com
  build_mirror_bundle.py --self-test
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional

MANIFEST_NAME = "mirror-bundle-manifest.json"
CHECKSUMS_NAME = "checksums.sha256"


def sha256_file(path: Path) -> Dict[str, int | str]:
    """Return ``{"sha256": <hex digest>, "size": <byte count>}`` for *path*.

    The file is streamed in 1 MiB chunks so large bundles are never loaded
    into memory at once.
    """
    digest = hashlib.sha256()
    size = 0
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
            size += len(chunk)
    return {"sha256": digest.hexdigest(), "size": size}


def find_files(root: Path, exclude: Optional[Iterable[Path]] = None) -> List[Path]:
    """Return every regular file below *root*, sorted for determinism.

    Args:
        root: Directory to walk recursively.
        exclude: Optional paths to leave out of the result (for example this
            tool's own output files when the output directory lives inside
            *root*). Paths are compared in resolved, absolute form.
    """
    skip = {Path(entry).resolve() for entry in exclude} if exclude else set()
    root_abs = root.resolve()
    return [
        candidate
        for candidate in sorted(root.rglob("*"))
        if candidate.is_file()
        and (root_abs / candidate.relative_to(root)) not in skip
    ]


def write_checksums(items: List[Dict], output_dir: Path) -> None:
    """Write ``checksums.sha256`` into *output_dir*, one line per manifest item.

    Lines use the canonical ``sha256sum`` text-mode layout
    ``<hex digest><two spaces><path>``, which strict checkers require.
    """
    lines = [f"{item['sha256']}  {item['path']}" for item in items]
    payload = "\n".join(lines) + ("\n" if lines else "")
    (output_dir / CHECKSUMS_NAME).write_text(payload, encoding="utf-8")


def maybe_sign(manifest_path: Path, key: Optional[str]) -> Dict[str, str]:
    """Sign *manifest_path* with ``cosign sign-blob`` when possible.

    Args:
        manifest_path: File to sign; the signature and certificate are written
            next to it as ``<name>.sig`` and ``<name>.pem``.
        key: Value passed to cosign's ``--key``; falsy disables signing.

    Returns:
        A status record: ``{"status": "skipped", "reason": ...}`` when no key
        was given or cosign is not on ``PATH``; ``{"status": "signed",
        "signature": ..., "certificate": ...}`` on success; ``{"status":
        "failed", "reason": ...}`` when cosign exits non-zero or cannot be
        launched.
    """
    if not key:
        return {"status": "skipped", "reason": "no key provided"}
    if shutil.which("cosign") is None:
        return {"status": "skipped", "reason": "cosign not found"}

    sig = manifest_path.with_suffix(manifest_path.suffix + ".sig")
    pem = manifest_path.with_suffix(manifest_path.suffix + ".pem")
    command = [
        "cosign",
        "sign-blob",
        "--key",
        key,
        "--output-signature",
        str(sig),
        "--output-certificate",
        str(pem),
        str(manifest_path),
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as exc:  # pragma: no cover
        return {"status": "failed", "reason": exc.stderr or str(exc)}
    except OSError as exc:  # pragma: no cover
        # The binary was found but could not be executed; report, don't crash.
        return {"status": "failed", "reason": str(exc)}
    return {
        "status": "signed",
        "signature": sig.name,
        "certificate": pem.name,
    }


def _generated_paths(output_dir: Path) -> List[Path]:
    """Return the files this tool may write into *output_dir*."""
    manifest_path = output_dir / MANIFEST_NAME
    return [
        manifest_path,
        manifest_path.with_suffix(manifest_path.suffix + ".sig"),
        manifest_path.with_suffix(manifest_path.suffix + ".pem"),
        output_dir / CHECKSUMS_NAME,
    ]


def _dump(manifest: Dict) -> str:
    """Serialise *manifest* exactly as it is stored on disk."""
    return json.dumps(manifest, ensure_ascii=False, indent=2) + "\n"


def build_manifest(root: Path, output_dir: Path, approvers: List[str], cosign_key: Optional[str]) -> Dict:
    """Hash every file under *root* and write manifest + checksums to *output_dir*.

    Args:
        root: Directory whose files are inventoried.
        output_dir: Destination directory (created if missing).
        approvers: Approver identities; surrounding whitespace is ignored and
            blank entries are dropped. At least two distinct identities are
            needed for ``approvalStatus == "approved"``.
        cosign_key: Optional key handed to :func:`maybe_sign`.

    Returns:
        The manifest dict, including a ``"signing"`` status record.
    """
    # Never inventory our own artefacts: when output_dir is inside (or equal
    # to) root, a re-run would otherwise hash the previous run's outputs.
    files = find_files(root, exclude=_generated_paths(output_dir))
    items: List[Dict] = [
        {"path": path.relative_to(root).as_posix(), **sha256_file(path)}
        for path in files
    ]

    # "alice" and "alice " are the same person; blanks are nobody.
    distinct = sorted({name.strip() for name in approvers if name and name.strip()})

    manifest = {
        "created": datetime.now(timezone.utc).isoformat(),
        "root": str(root),
        "total": len(items),
        "items": items,
        "approvals": distinct,
        "approvalStatus": "approved" if len(distinct) >= 2 else "pending",
    }

    output_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = output_dir / MANIFEST_NAME
    manifest_path.write_text(_dump(manifest), encoding="utf-8")
    write_checksums(items, output_dir)

    signing = maybe_sign(manifest_path, cosign_key)
    manifest["signing"] = signing
    if signing.get("status") != "signed":
        # Persist the skipped/failed status in the manifest for traceability.
        manifest_path.write_text(_dump(manifest), encoding="utf-8")
    # When signing succeeded the file must stay byte-identical to what was
    # signed; rewriting it would invalidate the detached signature. The
    # signing record is still available on the returned dict.
    return manifest


def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse command-line arguments from *argv* (without the program name)."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--root", type=Path, help="Root directory containing bundle files")
    parser.add_argument("--output", type=Path, help="Output directory for manifest + checksums")
    parser.add_argument(
        "--approver",
        action="append",
        default=[],
        help="Approver identity (email or handle); provide twice for dual-control",
    )
    parser.add_argument("--cosign-key", help="Path or KMS URI for cosign signing key (optional)")
    parser.add_argument("--self-test", action="store_true", help="Run internal self-test and exit")
    return parser.parse_args(argv)


def self_test() -> int:
    """Build a manifest for a throwaway tree and check the outputs; return 0."""
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        tmpdir = Path(tmp)
        root = tmpdir / "bundles"
        root.mkdir()
        (root / "a.txt").write_text("hello", encoding="utf-8")
        (root / "b.bin").write_bytes(b"world")
        out = tmpdir / "out"
        manifest = build_manifest(root, out, ["alice", "bob"], cosign_key=None)
        assert manifest["approvalStatus"] == "approved"
        assert (out / "mirror-bundle-manifest.json").exists()
        assert (out / "checksums.sha256").exists()
    print("self-test passed")
    return 0


def main(argv: List[str]) -> int:
    """CLI entry point.

    Returns:
        0 on success, 1 when approval is pending or a listed file is missing,
        2 on usage errors (missing arguments or a root that is not a directory).
    """
    args = parse_args(argv)
    if args.self_test:
        return self_test()
    if not (args.root and args.output):
        print("--root and --output are required unless --self-test", file=sys.stderr)
        return 2

    root = args.root.resolve()
    if not root.is_dir():
        # An absent root would otherwise yield an empty, "approved" manifest.
        print(f"--root is not a directory: {args.root}", file=sys.stderr)
        return 2

    manifest = build_manifest(root, args.output.resolve(), args.approver, args.cosign_key)
    if manifest["approvalStatus"] != "approved":
        print("Manifest generated but approvalStatus=pending (need >=2 distinct approvers).", file=sys.stderr)
        return 1

    missing = [item for item in manifest["items"] if not (root / item["path"]).exists()]
    if missing:
        print(f"Missing files in manifest: {missing}", file=sys.stderr)
        return 1

    print(f"Mirror bundle manifest written to {args.output}")
    return 0


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main(sys.argv[1:]))