Files
git.stella-ops.org/ops/devops/airgap/build_mirror_bundle.py
StellaOps Bot 71e9a56cfd
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Airgap Sealed CI Smoke / sealed-smoke (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Signals CI & Image / signals-ci (push) Has been cancelled
feat: Add Scanner CI runner and related artifacts
- Implemented `run-scanner-ci.sh` to build and run tests for the Scanner solution with a warmed NuGet cache.
- Created `excititor-vex-traces.json` dashboard for monitoring Excititor VEX observations.
- Added Docker Compose configuration for the OTLP span sink in `docker-compose.spansink.yml`.
- Configured OpenTelemetry collector in `otel-spansink.yaml` to receive and process traces.
- Developed `run-spansink.sh` script to run the OTLP span sink for Excititor traces.
- Introduced `FileSystemRiskBundleObjectStore` for storing risk bundle artifacts in the filesystem.
- Built `RiskBundleBuilder` for creating risk bundles with associated metadata and providers.
- Established `RiskBundleJob` to execute the risk bundle creation and storage process.
- Defined models for risk bundle inputs, entries, and manifests in `RiskBundleModels.cs`.
- Implemented signing functionality for risk bundle manifests with `HmacRiskBundleManifestSigner`.
- Created unit tests for `RiskBundleBuilder`, `RiskBundleJob`, and signing functionality to ensure correctness.
- Added filesystem artifact reader tests to validate manifest parsing and artifact listing.
- Included test manifests for egress scenarios in the task runner tests.
- Developed timeline query service tests to verify tenant and event ID handling.
2025-11-30 19:12:35 +02:00

155 lines
5.8 KiB
Python

#!/usr/bin/env python3
"""Automate mirror bundle manifest + checksums with dual-control approvals.
Implements DEVOPS-AIRGAP-57-001.
Features:
- Deterministic manifest (`mirror-bundle-manifest.json`) with sha256/size per file.
- `checksums.sha256` for quick verification.
- Dual-control approvals recorded via `--approver` (min 2 required to mark approved).
- Optional cosign signing of the manifest via `--cosign-key` (sign-blob); writes
`mirror-bundle-manifest.json.sig` and `mirror-bundle-manifest.json.pem` when available.
- Offline-friendly: purely local file reads; no network access.
Usage:
build_mirror_bundle.py --root /path/to/bundles --output out/mirror \
--approver alice@example.com --approver bob@example.com
build_mirror_bundle.py --self-test
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
def sha256_file(path: Path) -> Dict[str, int | str]:
    """Compute the SHA-256 digest and byte length of a single file.

    The file is read in 1 MiB blocks so memory use does not grow with the
    size of the file being hashed.

    Args:
        path: File to read (opened in binary mode).

    Returns:
        A dict with ``"sha256"`` (lowercase hex digest) and ``"size"``
        (number of bytes read).
    """
    digest = hashlib.sha256()
    total_bytes = 0
    with path.open("rb") as stream:
        while True:
            block = stream.read(1024 * 1024)
            if not block:
                # An empty read marks end-of-file.
                break
            digest.update(block)
            total_bytes += len(block)
    return {"sha256": digest.hexdigest(), "size": total_bytes}
def find_files(root: Path) -> List[Path]:
    """List every file found recursively beneath ``root``.

    Directories are skipped. Candidates are sorted before filtering, so the
    result order is stable from run to run.

    Args:
        root: Directory to walk.

    Returns:
        Paths (still prefixed with ``root``) for which ``is_file()`` is true.
    """
    return [entry for entry in sorted(root.rglob("*")) if entry.is_file()]
def write_checksums(items: List[Dict], output_dir: Path) -> None:
    """Write ``checksums.sha256`` into ``output_dir``.

    Each line is ``<hex digest>  <path>`` with TWO spaces between the fields.
    That is the text-mode layout produced by ``sha256sum``; a single space is
    not accepted by every ``--check`` implementation.

    Args:
        items: Manifest entries; each must provide ``"sha256"`` and ``"path"``.
        output_dir: Existing directory that receives ``checksums.sha256``.

    An empty ``items`` list produces an empty file.
    """
    payload = "".join(f"{item['sha256']}  {item['path']}\n" for item in items)
    # newline="\n" disables platform newline translation so the file is
    # byte-identical on every OS (CRLF would end up inside the path field).
    with (output_dir / "checksums.sha256").open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(payload)
def maybe_sign(manifest_path: Path, key: Optional[str]) -> Dict[str, str]:
    """Sign the manifest with ``cosign sign-blob`` when a key and cosign are available.

    The signature and certificate are requested next to the manifest as
    ``<manifest name>.sig`` and ``<manifest name>.pem``.

    Args:
        manifest_path: Manifest file whose current bytes are signed.
        key: Value passed to ``cosign --key``; falsy disables signing.

    Returns:
        A status dict, never an exception for signing problems:

        * ``{"status": "skipped", "reason": ...}`` when no key was given or
          ``cosign`` is not on ``PATH``.
        * ``{"status": "failed", "reason": ...}`` when cosign exits non-zero
          or cannot be launched.
        * ``{"status": "signed", "signature": <name>}`` on success, plus
          ``"certificate": <name>`` only if cosign actually wrote that file.
    """
    if not key:
        return {"status": "skipped", "reason": "no key provided"}
    if shutil.which("cosign") is None:
        return {"status": "skipped", "reason": "cosign not found"}

    sig = manifest_path.with_suffix(manifest_path.suffix + ".sig")
    pem = manifest_path.with_suffix(manifest_path.suffix + ".pem")
    command = [
        "cosign",
        "sign-blob",
        "--key",
        key,
        "--output-signature",
        str(sig),
        "--output-certificate",
        str(pem),
        str(manifest_path),
    ]
    try:
        # Drop leftovers from an earlier run so that whatever exists afterwards
        # was produced by this invocation and belongs to the current manifest.
        for stale in (sig, pem):
            if stale.exists():
                stale.unlink()
        subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as exc:  # pragma: no cover
        return {"status": "failed", "reason": exc.stderr or str(exc)}
    except OSError as exc:  # pragma: no cover
        # e.g. cosign vanished or is not executable after the PATH lookup.
        return {"status": "failed", "reason": str(exc)}

    result = {"status": "signed", "signature": sig.name}
    # NOTE(review): cosign appears not to emit a certificate for plain
    # key-based signing - confirm against the cosign version in use. Either
    # way, only advertise the file when it is really there.
    if pem.exists():
        result["certificate"] = pem.name
    return result
def _dump_manifest(manifest_path: Path, manifest: Dict) -> None:
    """Serialise ``manifest`` as indented UTF-8 JSON with a trailing newline."""
    manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


def build_manifest(root: Path, output_dir: Path, approvers: List[str], cosign_key: Optional[str]) -> Dict:
    """Hash the bundle under ``root`` and write manifest, checksums and optional signature.

    Args:
        root: Directory whose files are listed in the manifest.
        output_dir: Directory for ``mirror-bundle-manifest.json`` and
            ``checksums.sha256``; created if missing.
        approvers: Approver identities. Surrounding whitespace is ignored and
            blank entries are dropped before counting distinct approvers.
        cosign_key: Key reference forwarded to ``maybe_sign``; ``None`` skips signing.

    Returns:
        The manifest dict, including a ``"signing"`` entry with the outcome
        reported by ``maybe_sign``. ``"approvalStatus"`` is ``"approved"``
        with at least two distinct approvers, otherwise ``"pending"``.

    Note:
        When signing succeeds the manifest file is NOT rewritten afterwards:
        the detached signature covers the exact bytes on disk, and adding the
        ``"signing"`` entry to the file would invalidate it. In that case the
        signing outcome is only present in the returned dict.
    """
    manifest_path = output_dir / "mirror-bundle-manifest.json"

    # Files this tool writes itself. If the output directory sits inside
    # ``root``, a re-run would otherwise hash its own stale outputs.
    own_outputs = {
        candidate.resolve()
        for candidate in (
            manifest_path,
            output_dir / "checksums.sha256",
            manifest_path.with_suffix(manifest_path.suffix + ".sig"),
            manifest_path.with_suffix(manifest_path.suffix + ".pem"),
        )
    }
    own_names = {candidate.name for candidate in own_outputs}

    items: List[Dict] = []
    for p in find_files(root):
        # Cheap name test first; resolve() only for the few possible matches.
        if p.name in own_names and p.resolve() in own_outputs:
            continue
        items.append({"path": p.relative_to(root).as_posix(), **sha256_file(p)})

    # Blank or whitespace-only identities must not count toward dual control.
    distinct_approvers = sorted({name.strip() for name in approvers} - {""})

    manifest = {
        "created": datetime.now(timezone.utc).isoformat(),
        "root": str(root),
        "total": len(items),
        "items": items,
        "approvals": distinct_approvers,
        "approvalStatus": "approved" if len(distinct_approvers) >= 2 else "pending",
    }

    output_dir.mkdir(parents=True, exist_ok=True)
    _dump_manifest(manifest_path, manifest)
    write_checksums(items, output_dir)

    signing = maybe_sign(manifest_path, cosign_key)
    manifest["signing"] = signing
    if signing.get("status") != "signed":
        # No signature exists for these bytes, so it is safe to persist the
        # skipped/failed outcome in the manifest for traceability.
        _dump_manifest(manifest_path, manifest)
    return manifest
def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command-line options of the mirror bundle builder.

    Args:
        argv: Argument list without the program name.

    Returns:
        Namespace with ``root`` and ``output`` (``Path`` or ``None``),
        ``approver`` (list of strings, empty by default), ``cosign_key``
        (string or ``None``) and ``self_test`` (bool).
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Both directory options share the same shape, so register them together.
    directory_options = (
        ("--root", "Root directory containing bundle files"),
        ("--output", "Output directory for manifest + checksums"),
    )
    for flag, help_text in directory_options:
        cli.add_argument(flag, type=Path, help=help_text)

    cli.add_argument(
        "--approver",
        action="append",
        default=[],
        help="Approver identity (email or handle); provide twice for dual-control",
    )
    cli.add_argument(
        "--cosign-key",
        help="Path or KMS URI for cosign signing key (optional)",
    )
    cli.add_argument(
        "--self-test",
        action="store_true",
        help="Run internal self-test and exit",
    )

    namespace = cli.parse_args(argv)
    return namespace
def self_test() -> int:
    """Build a manifest for a tiny throwaway bundle and verify the result.

    Two fixture files are written to a temporary directory, ``build_manifest``
    is run with two approvers and no signing key, and the outcome is checked.
    The checks are explicit comparisons rather than ``assert`` statements so
    they still run when Python is started with ``-O``.

    Returns:
        0 when every check passes, 1 otherwise (failed checks are listed on stderr).
    """
    import tempfile

    fixtures = {"a.txt": b"hello", "b.bin": b"world"}
    with tempfile.TemporaryDirectory() as tmp:
        tmpdir = Path(tmp)
        root = tmpdir / "bundles"
        root.mkdir()
        for name, payload in fixtures.items():
            (root / name).write_bytes(payload)
        out = tmpdir / "out"
        manifest = build_manifest(root, out, ["alice", "bob"], cosign_key=None)

        expected_items = [
            {"path": name, "sha256": hashlib.sha256(payload).hexdigest(), "size": len(payload)}
            for name, payload in sorted(fixtures.items())
        ]
        # Evaluate while the temporary directory still exists.
        checks = {
            "approvalStatus is approved": manifest["approvalStatus"] == "approved",
            "manifest file exists": (out / "mirror-bundle-manifest.json").exists(),
            "checksums file exists": (out / "checksums.sha256").exists(),
            "items match fixture digests": manifest["items"] == expected_items,
        }

    failures = [label for label, passed in checks.items() if not passed]
    if failures:
        print("self-test failed: " + "; ".join(failures), file=sys.stderr)
        return 1
    print("self-test passed")
    return 0
def main(argv: List[str]) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list without the program name.

    Returns:
        Process exit code:

        * 0 - manifest written and approved (or the self-test passed).
        * 1 - manifest written but approval is pending, or a listed file is
          missing from ``--root`` after the manifest was built.
        * 2 - usage error: ``--root``/``--output`` missing, or ``--root`` is
          not an existing directory.
    """
    args = parse_args(argv)
    if args.self_test:
        return self_test()
    if not (args.root and args.output):
        print("--root and --output are required unless --self-test", file=sys.stderr)
        return 2
    if not args.root.is_dir():
        # Without this guard a mistyped root silently yields an empty manifest.
        print(f"--root {args.root} is not an existing directory", file=sys.stderr)
        return 2

    manifest = build_manifest(args.root.resolve(), args.output.resolve(), args.approver, args.cosign_key)

    # Signing stays best-effort, but a requested signature that did not happen
    # must be visible to the operator.
    signing = manifest.get("signing") or {}
    if args.cosign_key and signing.get("status") != "signed":
        print(
            "Warning: --cosign-key was given but the manifest was not signed "
            f"({signing.get('status', 'unknown')}: {signing.get('reason', 'no reason reported')}).",
            file=sys.stderr,
        )

    if manifest["approvalStatus"] != "approved":
        print("Manifest generated but approvalStatus=pending (need >=2 distinct approvers).", file=sys.stderr)
        return 1

    # Files that disappeared between hashing and now make the manifest stale.
    missing = [i for i in manifest["items"] if not (args.root / i["path"]).exists()]
    if missing:
        print(f"Missing files in manifest: {missing}", file=sys.stderr)
        return 1

    print(f"Mirror bundle manifest written to {args.output}")
    return 0
# Script entry point: the return value of main() becomes the process exit code.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main(sys.argv[1:]))