Files
git.stella-ops.org/devops/offline/airgap/build_mirror_bundle.py
2025-12-26 18:11:06 +02:00

155 lines
5.8 KiB
Python

#!/usr/bin/env python3
"""Automate mirror bundle manifest + checksums with dual-control approvals.
Implements DEVOPS-AIRGAP-57-001.
Features:
- Deterministic manifest (`mirror-bundle-manifest.json`) with sha256/size per file.
- `checksums.sha256` for quick verification.
- Dual-control approvals recorded via `--approver` (min 2 required to mark approved).
- Optional cosign signing of the manifest via `--cosign-key` (sign-blob); writes
`mirror-bundle-manifest.json.sig` and `mirror-bundle-manifest.json.pem` when available.
- Offline-friendly: purely local file reads; no network access.
Usage:
build_mirror_bundle.py --root /path/to/bundles --output out/mirror \
--approver alice@example.com --approver bob@example.com
build_mirror_bundle.py --self-test
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
def sha256_file(path: Path) -> Dict[str, int | str]:
    """Compute the SHA-256 digest and byte length of a file.

    The file is read in 1 MiB blocks so its full content is never held in
    memory at once.

    Args:
        path: File to read; it is opened in binary mode.

    Returns:
        A dict with ``"sha256"`` (hex digest string) and ``"size"`` (total
        number of bytes read).
    """
    block_size = 1024 * 1024
    digest = hashlib.sha256()
    total_bytes = 0
    with path.open("rb") as stream:
        while True:
            block = stream.read(block_size)
            if not block:
                # read() returns b"" at end of file.
                break
            digest.update(block)
            total_bytes += len(block)
    return {"sha256": digest.hexdigest(), "size": total_bytes}
def find_files(root: Path) -> List[Path]:
    """List the files found anywhere below ``root`` in sorted path order.

    Args:
        root: Directory to walk recursively.

    Returns:
        The sorted paths for which ``Path.is_file()`` is true; directories
        are left out.
    """
    return [entry for entry in sorted(root.rglob("*")) if entry.is_file()]
def write_checksums(items: List[Dict], output_dir: Path) -> None:
    """Write ``checksums.sha256`` into ``output_dir``.

    Each line is ``<sha256><two spaces><path>``, the text-mode line layout
    used by ``sha256sum``, so the file can be checked with
    ``sha256sum --check`` from the bundle root.

    Args:
        items: Manifest entries; each must provide ``"sha256"`` and ``"path"``.
        output_dir: Existing directory that receives ``checksums.sha256``.

    Returns:
        None. With no items an empty file is written.
    """
    lines = [f"{item['sha256']}  {item['path']}" for item in items]
    content = "\n".join(lines) + ("\n" if lines else "")
    # Write bytes rather than text so line endings stay LF on every platform;
    # text mode would translate "\n" to the OS line separator.
    (output_dir / "checksums.sha256").write_bytes(content.encode("utf-8"))
def maybe_sign(manifest_path: Path, key: Optional[str]) -> Dict[str, str]:
    """Sign the manifest with ``cosign sign-blob`` when that is possible.

    Args:
        manifest_path: File to sign. The signature and certificate paths are
            this path with ``.sig`` / ``.pem`` appended to its suffix.
        key: Value passed to cosign's ``--key`` option; a falsy value
            disables signing.

    Returns:
        A status dict:
        ``{"status": "skipped", "reason": ...}`` when no key was given or no
        ``cosign`` executable is found on ``PATH``;
        ``{"status": "signed", "signature": ..., "certificate": ...}`` (file
        names only) when the cosign command exits successfully;
        ``{"status": "failed", "reason": ...}`` when cosign exits non-zero,
        using its stderr or, if that is empty, the exception text.
    """
    if not key:
        return {"status": "skipped", "reason": "no key provided"}
    if shutil.which("cosign") is None:
        return {"status": "skipped", "reason": "cosign not found"}

    base_suffix = manifest_path.suffix
    signature_path = manifest_path.with_suffix(base_suffix + ".sig")
    certificate_path = manifest_path.with_suffix(base_suffix + ".pem")
    command = [
        "cosign",
        "sign-blob",
        "--key",
        key,
        "--output-signature",
        str(signature_path),
        "--output-certificate",
        str(certificate_path),
        str(manifest_path),
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as exc:  # pragma: no cover
        return {"status": "failed", "reason": exc.stderr or str(exc)}
    return {
        "status": "signed",
        "signature": signature_path.name,
        "certificate": certificate_path.name,
    }
def build_manifest(root: Path, output_dir: Path, approvers: List[str], cosign_key: Optional[str]) -> Dict:
    """Hash every file under ``root`` and write the manifest and checksum files.

    Writes ``mirror-bundle-manifest.json`` and ``checksums.sha256`` into
    ``output_dir`` (created if missing), then attempts to sign the manifest.

    Args:
        root: Directory whose files are listed; item paths are recorded
            relative to it, using forward slashes.
        output_dir: Directory that receives the generated files.
        approvers: Approver identities. Surrounding whitespace is stripped and
            blank entries are dropped before de-duplication, so an empty or
            padded duplicate cannot satisfy the two-approver requirement.
        cosign_key: Key forwarded to ``maybe_sign``; ``None`` skips signing.

    Returns:
        The manifest dict, including a ``"signing"`` entry holding the result
        of ``maybe_sign``.

    Note:
        The signing result is written back into the manifest file only when
        no signature was produced (skipped or failed). After a successful
        signing the file is left byte-for-byte as it was signed, because
        rewriting it would make the signature stop matching the file.
    """
    items: List[Dict] = [
        {"path": file_path.relative_to(root).as_posix(), **sha256_file(file_path)}
        for file_path in find_files(root)
    ]

    # Blank / whitespace-only identities must not count towards dual control.
    distinct_approvers = sorted({name.strip() for name in approvers if name and name.strip()})

    manifest = {
        "created": datetime.now(timezone.utc).isoformat(),
        "root": str(root),
        "total": len(items),
        "items": items,
        "approvals": distinct_approvers,
        "approvalStatus": "approved" if len(distinct_approvers) >= 2 else "pending",
    }

    output_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = output_dir / "mirror-bundle-manifest.json"
    manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    write_checksums(items, output_dir)

    signing = maybe_sign(manifest_path, cosign_key)
    manifest["signing"] = signing
    if signing.get("status") != "signed":
        # No signature exists, so recording the status in the file for
        # traceability cannot invalidate anything.
        manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    return manifest
def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments of the bundle builder.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        Namespace with ``root`` and ``output`` (``Path`` or ``None``),
        ``approver`` (list of strings, empty by default), ``cosign_key``
        (string or ``None``) and ``self_test`` (bool).
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # Built on every call so the list default is never shared between parsers.
    option_table = (
        ("--root", {"type": Path, "help": "Root directory containing bundle files"}),
        ("--output", {"type": Path, "help": "Output directory for manifest + checksums"}),
        (
            "--approver",
            {
                "action": "append",
                "default": [],
                "help": "Approver identity (email or handle); provide twice for dual-control",
            },
        ),
        ("--cosign-key", {"help": "Path or KMS URI for cosign signing key (optional)"}),
        ("--self-test", {"action": "store_true", "help": "Run internal self-test and exit"}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args(argv)
def self_test() -> int:
    """Build a manifest for a throwaway two-file bundle and check the result.

    Creates a temporary directory holding ``a.txt`` and ``b.bin``, runs
    ``build_manifest`` with two approvers and no signing key, and verifies
    the approval status and the presence of both output files.

    Returns:
        0 when every check passes.

    Raises:
        AssertionError: If any check fails. The checks are explicit ``if``
            tests rather than ``assert`` statements so they still run when
            Python is started with ``-O``.
    """
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        tmpdir = Path(tmp)
        root = tmpdir / "bundles"
        root.mkdir()
        (root / "a.txt").write_text("hello", encoding="utf-8")
        (root / "b.bin").write_bytes(b"world")
        out = tmpdir / "out"
        manifest = build_manifest(root, out, ["alice", "bob"], cosign_key=None)

        failures = []
        if manifest["approvalStatus"] != "approved":
            failures.append("approvalStatus is not 'approved'")
        if not (out / "mirror-bundle-manifest.json").exists():
            failures.append("mirror-bundle-manifest.json was not written")
        if not (out / "checksums.sha256").exists():
            failures.append("checksums.sha256 was not written")
        if failures:
            raise AssertionError("self-test failed: " + "; ".join(failures))
    print("self-test passed")
    return 0
def main(argv: List[str]) -> int:
    """Run the command-line interface.

    Args:
        argv: Command-line arguments, without the program name.

    Returns:
        Process exit code:
        0 - manifest written and approved (or the self-test passed);
        1 - manifest written but not approved, or a listed file no longer
            exists under the root;
        2 - usage error: ``--root``/``--output`` missing, or ``--root`` is
            not an existing directory.
    """
    args = parse_args(argv)
    if args.self_test:
        return self_test()
    if not (args.root and args.output):
        print("--root and --output are required unless --self-test", file=sys.stderr)
        return 2

    root = args.root.resolve()
    if not root.is_dir():
        # Without this guard a mistyped root yields an empty manifest that
        # can still be marked approved and exit 0.
        print(f"--root is not an existing directory: {args.root}", file=sys.stderr)
        return 2

    manifest = build_manifest(root, args.output.resolve(), args.approver, args.cosign_key)

    signing = manifest.get("signing") or {}
    if signing.get("status") == "failed":
        # Signing is optional, so the exit code is unchanged, but a failed
        # cosign run should not pass unnoticed.
        print(f"Warning: cosign signing failed: {signing.get('reason')}", file=sys.stderr)

    if manifest["approvalStatus"] != "approved":
        print("Manifest generated but approvalStatus=pending (need >=2 distinct approvers).", file=sys.stderr)
        return 1
    # Re-check that every listed file is still present after hashing.
    missing = [i for i in manifest["items"] if not (root / i["path"]).exists()]
    if missing:
        print(f"Missing files in manifest: {missing}", file=sys.stderr)
        return 1
    print(f"Mirror bundle manifest written to {args.output}")
    return 0
# Script entry point: hand the CLI arguments (program name excluded) to
# main() and use its return value as the process exit status.
if __name__ == "__main__":  # pragma: no cover
    exit_status = main(sys.argv[1:])
    raise SystemExit(exit_status)