- Added `FilesystemPackRunProvenanceWriter` to write provenance manifests to the filesystem. - Introduced `MongoPackRunArtifactReader` to read artifacts from MongoDB. - Created `MongoPackRunProvenanceWriter` to store provenance manifests in MongoDB. - Developed unit tests for filesystem and MongoDB provenance writers. - Established `ITimelineEventStore` and `ITimelineIngestionService` interfaces for timeline event handling. - Implemented `TimelineIngestionService` to validate and persist timeline events with hashing. - Created PostgreSQL schema and migration scripts for timeline indexing. - Added dependency injection support for timeline indexer services. - Developed tests for timeline ingestion and schema validation.
175 lines
5.8 KiB
Python
175 lines
5.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Build a deterministic Bootstrap Pack bundle for sealed/offline transfer.
|
|
|
|
- Reads a JSON config listing artefacts to include (images, Helm charts, extras).
|
|
- Copies artefacts into an output directory with preserved basenames.
|
|
- Generates `bootstrap-manifest.json` and `checksums.sha256` with sha256 hashes
|
|
and sizes for evidence/verification.
|
|
- Intended to satisfy DEVOPS-AIRGAP-56-003.
|
|
|
|
Config schema (JSON):
|
|
{
|
|
"name": "bootstrap-pack",
|
|
"images": ["release/containers/taskrunner.tar", "release/containers/orchestrator.tar"],
|
|
"charts": ["deploy/helm/stella.tgz"],
|
|
"extras": ["docs/24_OFFLINE_KIT.md"]
|
|
}
|
|
|
|
Usage:
|
|
build_bootstrap_pack.py --config bootstrap.json --output out/bootstrap-pack
|
|
build_bootstrap_pack.py --self-test
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
DEFAULT_NAME = "bootstrap-pack"
|
|
|
|
|
|
def sha256_file(path: Path) -> Tuple[str, int]:
|
|
h = hashlib.sha256()
|
|
size = 0
|
|
with path.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
|
h.update(chunk)
|
|
size += len(chunk)
|
|
return h.hexdigest(), size
|
|
|
|
|
|
def load_config(path: Path) -> Dict:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
cfg = json.load(handle)
|
|
if not isinstance(cfg, dict):
|
|
raise ValueError("config must be a JSON object")
|
|
return cfg
|
|
|
|
|
|
def ensure_list(cfg: Dict, key: str) -> List[str]:
|
|
value = cfg.get(key, [])
|
|
if value is None:
|
|
return []
|
|
if not isinstance(value, list):
|
|
raise ValueError(f"config.{key} must be a list")
|
|
return [str(x) for x in value]
|
|
|
|
|
|
def copy_item(src: Path, dest_root: Path, rel_dir: str) -> Tuple[str, str, int]:
|
|
dest_dir = dest_root / rel_dir
|
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
|
dest_path = dest_dir / src.name
|
|
shutil.copy2(src, dest_path)
|
|
digest, size = sha256_file(dest_path)
|
|
rel_path = dest_path.relative_to(dest_root).as_posix()
|
|
return rel_path, digest, size
|
|
|
|
|
|
def build_pack(config_path: Path, output_dir: Path) -> Dict:
|
|
cfg = load_config(config_path)
|
|
name = cfg.get("name", DEFAULT_NAME)
|
|
images = ensure_list(cfg, "images")
|
|
charts = ensure_list(cfg, "charts")
|
|
extras = ensure_list(cfg, "extras")
|
|
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
items = []
|
|
|
|
def process_list(paths: List[str], kind: str, rel_dir: str):
|
|
for raw in sorted(paths):
|
|
src = Path(raw).expanduser().resolve()
|
|
if not src.exists():
|
|
items.append({
|
|
"type": kind,
|
|
"source": raw,
|
|
"status": "missing"
|
|
})
|
|
continue
|
|
rel_path, digest, size = copy_item(src, output_dir, rel_dir)
|
|
items.append({
|
|
"type": kind,
|
|
"source": raw,
|
|
"path": rel_path,
|
|
"sha256": digest,
|
|
"size": size,
|
|
"status": "ok",
|
|
})
|
|
|
|
process_list(images, "image", "images")
|
|
process_list(charts, "chart", "charts")
|
|
process_list(extras, "extra", "extras")
|
|
|
|
manifest = {
|
|
"name": name,
|
|
"created": datetime.now(timezone.utc).isoformat(),
|
|
"items": items,
|
|
}
|
|
|
|
# checksums file (only for ok items)
|
|
checksum_lines = [f"{item['sha256']} {item['path']}" for item in items if item.get("status") == "ok"]
|
|
(output_dir / "checksums.sha256").write_text("\n".join(checksum_lines) + ("\n" if checksum_lines else ""), encoding="utf-8")
|
|
(output_dir / "bootstrap-manifest.json").write_text(json.dumps(manifest, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
|
|
return manifest
|
|
|
|
|
|
def parse_args(argv: List[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--config", type=Path, help="Path to bootstrap pack config JSON")
|
|
parser.add_argument("--output", type=Path, help="Output directory for the pack")
|
|
parser.add_argument("--self-test", action="store_true", help="Run internal self-test and exit")
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def self_test() -> int:
|
|
import tempfile
|
|
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmpdir = Path(tmp)
|
|
files = []
|
|
for name, content in [("img1.tar", b"image-one"), ("chart1.tgz", b"chart-one"), ("readme.txt", b"hello")]:
|
|
p = tmpdir / name
|
|
p.write_bytes(content)
|
|
files.append(p)
|
|
cfg = {
|
|
"images": [str(files[0])],
|
|
"charts": [str(files[1])],
|
|
"extras": [str(files[2])],
|
|
}
|
|
cfg_path = tmpdir / "bootstrap.json"
|
|
cfg_path.write_text(json.dumps(cfg), encoding="utf-8")
|
|
outdir = tmpdir / "out"
|
|
manifest = build_pack(cfg_path, outdir)
|
|
assert all(item.get("status") == "ok" for item in manifest["items"]), manifest
|
|
for rel in ["images/img1.tar", "charts/chart1.tgz", "extras/readme.txt", "checksums.sha256", "bootstrap-manifest.json"]:
|
|
assert (outdir / rel).exists(), f"missing {rel}"
|
|
print("self-test passed")
|
|
return 0
|
|
|
|
|
|
def main(argv: List[str]) -> int:
|
|
args = parse_args(argv)
|
|
if args.self_test:
|
|
return self_test()
|
|
if not (args.config and args.output):
|
|
print("--config and --output are required unless --self-test", file=sys.stderr)
|
|
return 2
|
|
manifest = build_pack(args.config, args.output)
|
|
missing = [i for i in manifest["items"] if i.get("status") == "missing"]
|
|
if missing:
|
|
print("Pack built with missing items:")
|
|
for item in missing:
|
|
print(f" - {item['source']}")
|
|
return 1
|
|
print(f"Bootstrap pack written to {args.output}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__": # pragma: no cover
|
|
sys.exit(main(sys.argv[1:]))
|