Files
git.stella-ops.org/ops/offline-kit/build_offline_kit.py
StellaOps Bot d63af51f84
Some checks failed
api-governance / spectral-lint (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
oas-ci / oas-validate (push) Has been cancelled
SDK Publish & Sign / sdk-publish (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Policy Simulation / policy-simulate (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
up
2025-11-26 20:23:28 +02:00

581 lines
22 KiB
Python

#!/usr/bin/env python3
"""Package the StellaOps Offline Kit with deterministic artefacts and manifest."""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import tarfile
from collections import OrderedDict
from pathlib import Path
from typing import Any, Iterable, Mapping, MutableMapping, Optional
# Repository root: two levels above the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Locations of helper tooling and of the telemetry bundle produced during packaging.
RELEASE_TOOLS_DIR = REPO_ROOT / "ops" / "devops" / "release"
TELEMETRY_TOOLS_DIR = REPO_ROOT / "ops" / "devops" / "telemetry"
TELEMETRY_BUNDLE_PATH = REPO_ROOT / "out" / "telemetry" / "telemetry-offline-bundle.tar.gz"
# Make the release tooling directory importable (inserted once, at the front).
# NOTE(review): presumably this is where verify_release / mirror_debug_store live - confirm.
if str(RELEASE_TOOLS_DIR) not in sys.path:
    sys.path.insert(0, str(RELEASE_TOOLS_DIR))
from verify_release import ( # type: ignore import-not-found
load_manifest,
resolve_path,
verify_release,
)
import mirror_debug_store # type: ignore import-not-found
# Default locations used by the command-line options when none are supplied.
DEFAULT_RELEASE_DIR = REPO_ROOT / "out" / "release"
DEFAULT_STAGING_DIR = REPO_ROOT / "out" / "offline-kit" / "staging"
DEFAULT_OUTPUT_DIR = REPO_ROOT / "out" / "offline-kit" / "dist"
# Maps a per-component manifest key to the staging sub-directory that receives
# the artefact referenced by that key (see copy_component_artifacts).
ARTIFACT_TARGETS = {
    "sbom": Path("sboms"),
    "provenance": Path("attest"),
    "signature": Path("signatures"),
    "metadata": Path("metadata/docker"),
}
class CommandError(RuntimeError):
    """Raised when an external command fails."""


def run(cmd: Iterable[str], *, cwd: Optional[Path] = None, env: Optional[Mapping[str, str]] = None) -> str:
    """Run an external command and return its captured standard output.

    Args:
        cmd: Program and arguments. Any iterable is accepted, including
            one-shot iterators.
        cwd: Working directory for the child process, or ``None`` to inherit.
        env: Extra environment variables layered on top of ``os.environ``.

    Returns:
        The command's standard output decoded as text.

    Raises:
        CommandError: If the command exits with a non-zero status. The message
            carries the exit code, the command line, and both output streams.
    """
    # Materialise once: ``cmd`` may be a one-shot iterator and is needed both
    # to launch the process and to build the failure message.
    argv = list(cmd)
    process_env = dict(os.environ)
    if env:
        process_env.update(env)
    result = subprocess.run(
        argv,
        cwd=str(cwd) if cwd else None,
        env=process_env,
        check=False,
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise CommandError(
            f"Command failed ({result.returncode}): {' '.join(argv)}\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        )
    return result.stdout
def compute_sha256(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 1 MiB blocks so large artefacts are never held in
    memory in full.
    """
    digest = hashlib.sha256()
    block_size = 1024 * 1024
    with path.open("rb") as stream:
        while block := stream.read(block_size):
            digest.update(block)
    return digest.hexdigest()
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now = dt.datetime.now(tz=dt.timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
def safe_component_name(name: str) -> str:
    """Return *name* trimmed, lower-cased and reduced to file-name-safe characters.

    Every character other than ASCII letters, digits, ``_``, ``.`` and ``-``
    is replaced with ``-``.
    """
    normalised = name.strip().lower()
    return re.sub(r"[^A-Za-z0-9_.-]", "-", normalised)
def clean_directory(path: Path) -> None:
    """Leave *path* as an empty directory, removing any existing tree first."""
    if path.exists():
        # Remove the previous contents wholesale before recreating the directory.
        shutil.rmtree(path)
    path.mkdir(parents=True, exist_ok=True)
def run_python_analyzer_smoke() -> None:
    """Run the Python analyzer smoke script with bash from the repository root."""
    smoke_script = REPO_ROOT.joinpath("ops", "offline-kit", "run-python-analyzer-smoke.sh")
    run(["bash", str(smoke_script)], cwd=REPO_ROOT)
def run_rust_analyzer_smoke() -> None:
    """Run the Rust analyzer smoke script with bash from the repository root."""
    smoke_script = REPO_ROOT.joinpath("ops", "offline-kit", "run-rust-analyzer-smoke.sh")
    run(["bash", str(smoke_script)], cwd=REPO_ROOT)
def copy_if_exists(source: Path, target: Path) -> None:
    """Copy *source* to *target* when it exists; do nothing otherwise.

    Directories are merged into an existing target tree. For a regular file
    the target's parent directories are created first.
    """
    if source.is_dir():
        shutil.copytree(source, target, dirs_exist_ok=True)
        return
    if not source.is_file():
        # Missing (or not a regular file): optional asset, silently skipped.
        return
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, target)
def copy_release_manifests(release_dir: Path, staging_dir: Path) -> None:
    """Copy the release manifests and their checksum files into ``<staging>/manifest``.

    Only the files that are present in *release_dir* are copied; the
    ``manifest`` directory is created regardless.
    """
    destination = staging_dir / "manifest"
    destination.mkdir(parents=True, exist_ok=True)
    manifest_names = ("release.yaml", "release.yaml.sha256", "release.json", "release.json.sha256")
    for candidate in (release_dir / name for name in manifest_names):
        if candidate.exists():
            shutil.copy2(candidate, destination / candidate.name)
def copy_component_artifacts(
    manifest: Mapping[str, Any],
    release_dir: Path,
    staging_dir: Path,
) -> None:
    """Copy per-component artefacts referenced by the release manifest into staging.

    For each component (processed in name order) and each key in
    ``ARTIFACT_TARGETS``, the file named by ``component[key]["path"]`` is
    copied to ``<staging>/<target dir>/<component>-<file name>``.

    Args:
        manifest: Parsed release manifest; its ``components`` list is read.
        release_dir: Directory against which artefact paths are resolved.
        staging_dir: Root of the offline-kit staging tree.

    Raises:
        FileNotFoundError: If a referenced artefact does not exist.
    """
    # Drop malformed entries *before* sorting: the sort key calls ``.get`` and
    # would raise AttributeError on anything that is not a mapping.
    components = [
        candidate
        for candidate in (manifest.get("components") or [])
        if isinstance(candidate, Mapping)
    ]
    for component in sorted(components, key=lambda entry: str(entry.get("name", ""))):
        component_name = safe_component_name(str(component.get("name", "component")))
        for key, target_root in ARTIFACT_TARGETS.items():
            entry = component.get(key)
            if not entry or not isinstance(entry, Mapping):
                continue
            path_str = entry.get("path")
            if not path_str:
                continue
            resolved = resolve_path(str(path_str), release_dir)
            if not resolved.exists():
                raise FileNotFoundError(f"Component '{component_name}' {key} artefact not found: {resolved}")
            target_dir = staging_dir / target_root
            target_dir.mkdir(parents=True, exist_ok=True)
            # Prefix with the component name so identically named artefacts
            # from different components do not overwrite each other.
            target_name = f"{component_name}-{resolved.name}" if resolved.name else component_name
            shutil.copy2(resolved, target_dir / target_name)
def copy_collections(
    manifest: Mapping[str, Any],
    release_dir: Path,
    staging_dir: Path,
) -> None:
    """Copy the ``charts`` and ``compose`` artefacts listed in the manifest.

    Each entry's ``path`` is resolved against *release_dir* and the file is
    copied, under its own name, into ``<staging>/charts`` or
    ``<staging>/compose``. Entries that are not mappings or have no path are
    skipped.

    Raises:
        FileNotFoundError: If a listed artefact does not exist.
    """
    destinations = (("charts", Path("charts")), ("compose", Path("compose")))
    for collection, subdir in destinations:
        for entry in manifest.get(collection) or []:
            path_str = entry.get("path") if isinstance(entry, Mapping) else None
            if not path_str:
                continue
            resolved = resolve_path(str(path_str), release_dir)
            if not resolved.exists():
                raise FileNotFoundError(f"{collection} artefact not found: {resolved}")
            target_dir = staging_dir / subdir
            target_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(resolved, target_dir / resolved.name)
def copy_debug_store(release_dir: Path, staging_dir: Path) -> None:
    """Invoke ``mirror_debug_store.main`` for *release_dir* and the staging tree."""
    mirror_args = ["--release-dir", str(release_dir), "--offline-kit-dir", str(staging_dir)]
    mirror_debug_store.main(mirror_args)
def copy_plugins_and_assets(staging_dir: Path) -> None:
    """Stage scanner plugins, certificates, seed data and selected docs.

    Every source is optional: anything missing from the repository is
    skipped. The ``docs`` directory is always created in staging.
    """
    mirrored_trees = (Path("plugins") / "scanner", Path("certificates"), Path("seed-data"))
    for relative in mirrored_trees:
        copy_if_exists(REPO_ROOT / relative, staging_dir / relative)

    docs_dir = staging_dir / "docs"
    docs_dir.mkdir(parents=True, exist_ok=True)
    repo_docs = REPO_ROOT / "docs"
    # Each document lands directly in ``docs/`` under its original file name.
    doc_sources = (
        repo_docs / "24_OFFLINE_KIT.md",
        repo_docs / "ops" / "telemetry-collector.md",
        repo_docs / "ops" / "telemetry-storage.md",
        repo_docs / "airgap" / "mirror-bundles.md",
    )
    for doc in doc_sources:
        copy_if_exists(doc, docs_dir / doc.name)
def copy_cli_and_taskrunner_assets(release_dir: Path, staging_dir: Path) -> None:
    """Bundle CLI binaries, task pack docs, and Task Runner samples when available."""
    # copy_if_exists is a no-op for a missing source, so no separate existence
    # check is needed for the optional ``cli`` directory.
    copy_if_exists(release_dir / "cli", staging_dir / "cli")

    taskrunner_bootstrap = staging_dir / "bootstrap" / "task-runner"
    taskrunner_bootstrap.mkdir(parents=True, exist_ok=True)
    sample_name = "task-runner.yaml.sample"
    copy_if_exists(REPO_ROOT / "etc" / sample_name, taskrunner_bootstrap / sample_name)

    repo_docs = REPO_ROOT / "docs"
    staged_docs = staging_dir / "docs"
    copy_if_exists(repo_docs / "task-packs", staged_docs / "task-packs")
    copy_if_exists(repo_docs / "modules" / "taskrunner", staged_docs / "modules" / "taskrunner")
def copy_orchestrator_assets(release_dir: Path, staging_dir: Path) -> None:
    """Copy orchestrator service, worker SDK, postgres snapshot, and dashboards when present."""
    source_root = release_dir / "orchestrator"
    target_root = staging_dir / "orchestrator"
    for part in ("service", "worker-sdk", "postgres", "dashboards"):
        copy_if_exists(source_root / part, target_root / part)
def copy_export_and_notifier_assets(release_dir: Path, staging_dir: Path) -> None:
    """Copy Export Center and Notifier offline bundles and tooling when present."""
    for bundle_dir in ("export-center", "notifier"):
        copy_if_exists(release_dir / bundle_dir, staging_dir / bundle_dir)
def copy_surface_secrets(release_dir: Path, staging_dir: Path) -> None:
    """Include Surface.Secrets bundles and manifests if present."""
    bundle_dir = "surface-secrets"
    copy_if_exists(release_dir / bundle_dir, staging_dir / bundle_dir)
def copy_bootstrap_configs(staging_dir: Path) -> None:
    """Stage the notifier air-gap bootstrap pack under ``bootstrap/notify``.

    The air-gap config and the secret template are mandatory; the bootstrap
    directory from ``etc`` and the README are copied only when present.

    Raises:
        FileNotFoundError: If the air-gap config or secret template is missing.
    """
    etc_dir = REPO_ROOT / "etc"
    notify_config = etc_dir / "notify.airgap.yaml"
    notify_secret = etc_dir / "secrets" / "notify-web-airgap.secret.example"
    notify_doc = REPO_ROOT / "docs" / "modules" / "notify" / "bootstrap-pack.md"

    # Validate the mandatory inputs before touching the staging tree.
    if not notify_config.exists():
        raise FileNotFoundError(f"Missing notifier air-gap config: {notify_config}")
    if not notify_secret.exists():
        raise FileNotFoundError(f"Missing notifier air-gap secret template: {notify_secret}")

    notify_bootstrap_dir = staging_dir / "bootstrap" / "notify"
    notify_bootstrap_dir.mkdir(parents=True, exist_ok=True)
    copy_if_exists(etc_dir / "bootstrap" / "notify", notify_bootstrap_dir)
    # Individual files are renamed on the way in; they are copied after the
    # directory so they win over same-named files from it.
    renamed_files = (
        (notify_config, "notify.yaml"),
        (notify_secret, "notify-web.secret.example"),
        (notify_doc, "README.md"),
    )
    for source, target_name in renamed_files:
        copy_if_exists(source, notify_bootstrap_dir / target_name)
def verify_required_seed_data(repo_root: Path) -> None:
    """Check that the Ruby git-sources seed data exists under *repo_root*.

    Raises:
        FileNotFoundError: If the seed directory, its ``Gemfile.lock`` or its
            ``expected.json`` is missing.
    """
    ruby_git_sources = repo_root.joinpath("seed-data", "analyzers", "ruby", "git-sources")
    if not ruby_git_sources.is_dir():
        raise FileNotFoundError(f"Missing Ruby git-sources seed directory: {ruby_git_sources}")
    for filename in ("Gemfile.lock", "expected.json"):
        path = ruby_git_sources / filename
        if not path.exists():
            raise FileNotFoundError(f"Offline kit seed artefact missing: {path}")
def copy_third_party_licenses(staging_dir: Path) -> None:
    """Mirror the repository's ``third-party-licenses`` directory into staging.

    Does nothing when the repository has no such directory. Entries are
    processed in case-insensitive name order.
    """
    licenses_src = REPO_ROOT / "third-party-licenses"
    if not licenses_src.is_dir():
        return
    target_dir = staging_dir / "third-party-licenses"
    target_dir.mkdir(parents=True, exist_ok=True)
    for entry in sorted(licenses_src.iterdir(), key=lambda item: item.name.lower()):
        destination = target_dir / entry.name
        if entry.is_dir():
            shutil.copytree(entry, destination, dirs_exist_ok=True)
        elif entry.is_file():
            shutil.copy2(entry, destination)
def package_telemetry_bundle(staging_dir: Path) -> None:
    """Build the telemetry offline bundle and copy it into ``<staging>/telemetry``.

    The packaging script is optional: when it is absent the function returns
    without doing anything. When a ``.sha256`` file exists next to the bundle
    it is copied as well.

    Raises:
        CommandError: If the packaging script exits with a non-zero status.
    """
    script = TELEMETRY_TOOLS_DIR / "package_offline_bundle.py"
    if not script.exists():
        return
    TELEMETRY_BUNDLE_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Use the interpreter running this script: a bare "python" may be missing
    # from PATH or may be a different Python installation.
    interpreter = sys.executable or "python"
    run([interpreter, str(script), "--output", str(TELEMETRY_BUNDLE_PATH)], cwd=REPO_ROOT)
    telemetry_dir = staging_dir / "telemetry"
    telemetry_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy2(TELEMETRY_BUNDLE_PATH, telemetry_dir / TELEMETRY_BUNDLE_PATH.name)
    # Append ".sha256" to the final suffix, i.e. "<bundle>.tar.gz.sha256".
    sha_path = TELEMETRY_BUNDLE_PATH.with_suffix(TELEMETRY_BUNDLE_PATH.suffix + ".sha256")
    if sha_path.exists():
        shutil.copy2(sha_path, telemetry_dir / sha_path.name)
def scan_files(staging_dir: Path, exclude: Optional[set[str]] = None) -> list[OrderedDict[str, Any]]:
    """List every regular file under *staging_dir* with its digest and size.

    Args:
        staging_dir: Root of the tree to walk.
        exclude: POSIX-style paths, relative to *staging_dir*, to leave out.

    Returns:
        One ordered record per file, in sorted path order, with the keys
        ``name`` (relative POSIX path), ``sha256`` and ``size``.
    """
    skipped = exclude or set()
    records: list[OrderedDict[str, Any]] = []
    for candidate in sorted(staging_dir.rglob("*")):
        if not candidate.is_file():
            continue
        rel = candidate.relative_to(staging_dir).as_posix()
        if rel in skipped:
            continue
        record: OrderedDict[str, Any] = OrderedDict()
        record["name"] = rel
        record["sha256"] = compute_sha256(candidate)
        record["size"] = candidate.stat().st_size
        records.append(record)
    return records
def summarize_counts(staging_dir: Path) -> Mapping[str, int]:
    """Count the regular files staged under each well-known sub-directory.

    Returns:
        A mapping from summary label to the recursive file count; a
        sub-directory that does not exist counts as ``0``.
    """
    labelled_dirs = (
        ("cli", "cli"),
        ("taskPacksDocs", "docs/task-packs"),
        ("containers", "containers"),
        ("orchestrator", "orchestrator"),
        ("exportCenter", "export-center"),
        ("notifier", "notifier"),
        ("surfaceSecrets", "surface-secrets"),
    )

    def count_files(rel: str) -> int:
        root = staging_dir / rel
        if not root.exists():
            return 0
        return len([item for item in root.rglob("*") if item.is_file()])

    return {label: count_files(rel) for label, rel in labelled_dirs}
def copy_container_bundles(release_dir: Path, staging_dir: Path) -> None:
    """Copy container air-gap bundles if present in the release directory."""
    bundle_suffixes = {".gz", ".tar", ".tgz"}
    target_dir = staging_dir / "containers"
    for root in (release_dir / "containers", release_dir / "images"):
        if not root.exists():
            continue
        for bundle in sorted(root.glob("**/*")):
            if not bundle.is_file() or bundle.suffix not in bundle_suffixes:
                continue
            # Keep the layout below the source root when staging the bundle.
            target_path = target_dir / bundle.relative_to(root)
            target_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(bundle, target_path)
def write_offline_manifest(
    staging_dir: Path,
    version: str,
    channel: str,
    release_manifest_sha: Optional[str],
) -> tuple[Path, str]:
    """Write ``manifest/offline-manifest.json`` and its ``.sha256`` companion.

    The manifest records bundle information (version, channel, capture time,
    release manifest digest) followed by an ``artifacts`` list covering every
    staged file except the manifest and its checksum file.

    Returns:
        The manifest path and the manifest's hexadecimal SHA-256 digest.
    """
    manifest_dir = staging_dir / "manifest"
    manifest_dir.mkdir(parents=True, exist_ok=True)
    offline_manifest_path = manifest_dir / "offline-manifest.json"
    checksum_path = offline_manifest_path.with_suffix(".json.sha256")

    # The manifest cannot describe itself or its own checksum file.
    self_references = {"manifest/offline-manifest.json", "manifest/offline-manifest.json.sha256"}
    files = scan_files(staging_dir, exclude=self_references)

    bundle_info: OrderedDict[str, Any] = OrderedDict()
    bundle_info["version"] = version
    bundle_info["channel"] = channel
    bundle_info["capturedAt"] = utc_now_iso()
    bundle_info["releaseManifestSha256"] = release_manifest_sha

    manifest_data: OrderedDict[str, Any] = OrderedDict()
    manifest_data["bundle"] = bundle_info
    manifest_data["artifacts"] = files

    offline_manifest_path.write_text(json.dumps(manifest_data, indent=2) + "\n", encoding="utf-8")
    manifest_sha = compute_sha256(offline_manifest_path)
    checksum_path.write_text(
        f"{manifest_sha} {offline_manifest_path.name}\n",
        encoding="utf-8",
    )
    return offline_manifest_path, manifest_sha
def tarinfo_filter(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo:
    """Normalise a tar member so the archive does not depend on the build host.

    Ownership is reset to uid/gid 0 with empty names and the modification
    time is zeroed. Other fields are left untouched.
    """
    tarinfo.uid = 0
    tarinfo.gid = 0
    tarinfo.uname = ""
    tarinfo.gname = ""
    tarinfo.mtime = 0
    return tarinfo


def create_tarball(staging_dir: Path, output_dir: Path, bundle_name: str) -> Path:
    """Pack every regular file under *staging_dir* into ``<bundle_name>.tar.gz``.

    Members are added in sorted path order with normalised metadata, and the
    gzip header carries neither a timestamp nor a file name, so packing the
    same staging tree again yields the same bytes.

    Args:
        staging_dir: Tree whose files are archived (paths are stored relative
            to it, POSIX style). Directories themselves are not stored.
        output_dir: Directory that receives the archive; created if missing.
        bundle_name: Archive name without the ``.tar.gz`` extension.

    Returns:
        Path of the archive that was written.
    """
    import gzip  # local import: only needed here

    output_dir.mkdir(parents=True, exist_ok=True)
    bundle_path = output_dir / f"{bundle_name}.tar.gz"
    if bundle_path.exists():
        bundle_path.unlink()
    with bundle_path.open("wb") as raw:
        # tarfile's own "w:gz" mode stamps the current time and the file name
        # into the gzip header; pin both so the output is reproducible.
        with gzip.GzipFile(filename="", mode="wb", compresslevel=9, fileobj=raw, mtime=0) as gz:
            with tarfile.open(fileobj=gz, mode="w") as tar:
                for path in sorted(staging_dir.rglob("*")):
                    if path.is_file():
                        arcname = path.relative_to(staging_dir).as_posix()
                        tar.add(path, arcname=arcname, filter=tarinfo_filter)
    return bundle_path
def sign_blob(
    path: Path,
    *,
    key_ref: Optional[str],
    identity_token: Optional[str],
    password: Optional[str],
    tlog_upload: bool,
) -> Optional[Path]:
    """Sign *path* with ``cosign sign-blob`` and store the signature beside it.

    Args:
        path: File to sign.
        key_ref: Cosign key reference, passed as ``--key`` when set.
        identity_token: Identity token, passed as ``--identity-token`` when set.
        password: Key password exported to cosign as ``COSIGN_PASSWORD``.
            When ``None``, a ``COSIGN_PASSWORD`` already present in the
            environment is left untouched; otherwise an empty one is set.
        tlog_upload: When false, ``--tlog-upload=false`` is added.

    Returns:
        Path of the ``<path>.sig`` file, or ``None`` when neither a key nor an
        identity token was supplied (signing is skipped).

    Raises:
        CommandError: If cosign exits with a non-zero status.
    """
    if not key_ref and not identity_token:
        return None
    cmd = ["cosign", "sign-blob", "--yes", str(path)]
    if key_ref:
        cmd.extend(["--key", key_ref])
    if identity_token:
        cmd.extend(["--identity-token", identity_token])
    if not tlog_upload:
        cmd.append("--tlog-upload=false")
    if password is not None:
        env = {"COSIGN_PASSWORD": password}
    elif "COSIGN_PASSWORD" in os.environ:
        # Respect a password exported by the caller instead of blanking it.
        env = {}
    else:
        # NOTE(review): presumably an empty value is set so cosign never
        # prompts interactively - confirm.
        env = {"COSIGN_PASSWORD": ""}
    signature = run(cmd, env=env)
    sig_path = path.with_suffix(path.suffix + ".sig")
    sig_path.write_text(signature, encoding="utf-8")
    return sig_path
def build_offline_kit(args: argparse.Namespace) -> MutableMapping[str, Any]:
    """Assemble, archive and optionally sign the offline kit.

    Steps, in order: verify the release and required seed data, run the
    analyzer smoke scripts (unless ``args.skip_smoke``), rebuild the staging
    tree from the release directory and repository assets, write the offline
    manifest, create the tarball and its ``.sha256`` file, sign the bundle and
    the manifest when signing material is supplied, and write
    ``<bundle>.metadata.json`` into the output directory.

    Args:
        args: Parsed command-line options (see ``parse_args``).

    Returns:
        Ordered mapping with ``bundlePath``, ``bundleSha256``, ``manifestPath``,
        ``metadataPath`` and ``signatures``.
    """
    release_dir = args.release_dir.resolve()
    staging_dir = args.staging_dir.resolve()
    output_dir = args.output_dir.resolve()

    # Validate inputs before the staging directory is wiped.
    verify_release(release_dir)
    verify_required_seed_data(REPO_ROOT)
    if not args.skip_smoke:
        run_rust_analyzer_smoke()
        run_python_analyzer_smoke()

    clean_directory(staging_dir)
    copy_debug_store(release_dir, staging_dir)

    manifest_data = load_manifest(release_dir)
    release_manifest_sha = None
    checksums = manifest_data.get("checksums")
    if isinstance(checksums, Mapping):
        release_manifest_sha = checksums.get("sha256")

    copy_release_manifests(release_dir, staging_dir)
    copy_component_artifacts(manifest_data, release_dir, staging_dir)
    copy_collections(manifest_data, release_dir, staging_dir)
    copy_plugins_and_assets(staging_dir)
    copy_bootstrap_configs(staging_dir)
    copy_cli_and_taskrunner_assets(release_dir, staging_dir)
    copy_container_bundles(release_dir, staging_dir)
    copy_orchestrator_assets(release_dir, staging_dir)
    copy_export_and_notifier_assets(release_dir, staging_dir)
    copy_surface_secrets(release_dir, staging_dir)
    copy_third_party_licenses(staging_dir)
    package_telemetry_bundle(staging_dir)

    # The manifest is written last so that it covers everything staged above.
    offline_manifest_path, offline_manifest_sha = write_offline_manifest(
        staging_dir,
        args.version,
        args.channel,
        release_manifest_sha,
    )

    bundle_name = f"stella-ops-offline-kit-{args.version}-{args.channel}"
    bundle_path = create_tarball(staging_dir, output_dir, bundle_name)
    bundle_sha = compute_sha256(bundle_path)
    bundle_sha_prefixed = f"sha256:{bundle_sha}"
    (bundle_path.with_suffix(".tar.gz.sha256")).write_text(
        f"{bundle_sha} {bundle_path.name}\n",
        encoding="utf-8",
    )

    signature_paths: dict[str, str] = {}
    sig = sign_blob(
        bundle_path,
        key_ref=args.cosign_key,
        identity_token=args.cosign_identity_token,
        password=args.cosign_password,
        tlog_upload=not args.no_transparency,
    )
    if sig:
        signature_paths["bundleSignature"] = str(sig)
    manifest_sig = sign_blob(
        offline_manifest_path,
        key_ref=args.cosign_key,
        identity_token=args.cosign_identity_token,
        password=args.cosign_password,
        tlog_upload=not args.no_transparency,
    )
    if manifest_sig:
        signature_paths["manifestSignature"] = str(manifest_sig)

    # Take the timestamp once so the generated bundleId and capturedAt can
    # never disagree across a second boundary.
    captured_at = utc_now_iso()
    metadata = OrderedDict(
        (
            ("bundleId", args.bundle_id or f"{args.version}-{args.channel}-{captured_at}"),
            ("bundleName", bundle_path.name),
            ("bundleSha256", bundle_sha_prefixed),
            ("bundleSize", bundle_path.stat().st_size),
            ("manifestName", offline_manifest_path.name),
            ("manifestSha256", f"sha256:{offline_manifest_sha}"),
            ("manifestSize", offline_manifest_path.stat().st_size),
            ("channel", args.channel),
            ("version", args.version),
            ("capturedAt", captured_at),
            ("counts", summarize_counts(staging_dir)),
        )
    )
    if sig:
        metadata["bundleSignatureName"] = Path(sig).name
    if manifest_sig:
        metadata["manifestSignatureName"] = Path(manifest_sig).name

    metadata_path = output_dir / f"{bundle_name}.metadata.json"
    with metadata_path.open("w", encoding="utf-8") as handle:
        json.dump(metadata, handle, indent=2)
        handle.write("\n")

    return OrderedDict(
        (
            ("bundlePath", str(bundle_path)),
            ("bundleSha256", bundle_sha),
            ("manifestPath", str(offline_manifest_path)),
            ("metadataPath", str(metadata_path)),
            ("signatures", signature_paths),
        )
    )
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the offline-kit packager.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--version", required=True, help="Bundle version (e.g. 2025.10.0)")
    parser.add_argument("--channel", default="edge", help="Release channel (default: %(default)s)")
    parser.add_argument("--bundle-id", help="Optional explicit bundle identifier")

    # Directory options share the same shape: a Path with a repository default.
    directory_options = (
        ("--release-dir", DEFAULT_RELEASE_DIR, "Release artefact directory (default: %(default)s)"),
        ("--staging-dir", DEFAULT_STAGING_DIR, "Temporary staging directory (default: %(default)s)"),
        ("--output-dir", DEFAULT_OUTPUT_DIR, "Destination directory for packaged bundles (default: %(default)s)"),
    )
    for flag, default_dir, help_text in directory_options:
        parser.add_argument(flag, type=Path, default=default_dir, help=help_text)

    signing_options = (
        ("--cosign-key", "cosign_key", "Cosign key reference for signing"),
        ("--cosign-password", "cosign_password", "Cosign key password (if applicable)"),
        ("--cosign-identity-token", "cosign_identity_token", "Cosign identity token"),
    )
    for flag, dest, help_text in signing_options:
        parser.add_argument(flag, dest=dest, help=help_text)

    switches = (
        ("--no-transparency", "Disable Rekor transparency log uploads"),
        ("--skip-smoke", "Skip analyzer smoke execution (testing only)"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    return parser.parse_args(argv)
def main(argv: Optional[list[str]] = None) -> int:
    """Command-line entry point: build the kit and print a summary.

    Returns:
        ``0`` on success, ``1`` when packaging fails for any reason (the
        error is reported on standard error).
    """
    args = parse_args(argv)
    try:
        result = build_offline_kit(args)
    except Exception as exc:  # pylint: disable=broad-except
        print(f"offline-kit packaging failed: {exc}", file=sys.stderr)
        return 1

    print("✅ Offline kit packaged")
    for key, value in result.items():
        if not isinstance(value, dict):
            print(f" - {key}: {value}")
            continue
        # Nested mappings (e.g. signatures) are flattened as ``key.sub_key``.
        for sub_key, sub_val in value.items():
            print(f" - {key}.{sub_key}: {sub_val}")
    return 0
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())