#!/usr/bin/env python3 """Deterministic release pipeline helper for StellaOps. This script builds service containers, generates SBOM and provenance artefacts, signs them with cosign, and writes a channel-specific release manifest. The workflow expects external tooling to be available on PATH: - docker (with buildx) - cosign - helm - npm / node (for the UI build) - dotnet SDK (for BuildX plugin publication) """ from __future__ import annotations import argparse import datetime as dt import hashlib import json import os import pathlib import re import shlex import subprocess import sys import tempfile from collections import OrderedDict from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Sequence REPO_ROOT = pathlib.Path(__file__).resolve().parents[3] DEFAULT_CONFIG = REPO_ROOT / "ops/devops/release/components.json" class CommandError(RuntimeError): pass def run(cmd: Sequence[str], *, cwd: Optional[pathlib.Path] = None, env: Optional[Mapping[str, str]] = None, capture: bool = True) -> str: """Run a subprocess command, returning stdout (text).""" process_env = os.environ.copy() if env: process_env.update(env) result = subprocess.run( list(cmd), cwd=str(cwd) if cwd else None, env=process_env, check=False, capture_output=capture, text=True, ) if process_env.get("STELLAOPS_RELEASE_DEBUG"): sys.stderr.write(f"[debug] {' '.join(shlex.quote(c) for c in cmd)}\n") if capture: sys.stderr.write(result.stdout) sys.stderr.write(result.stderr) if result.returncode != 0: stdout = result.stdout if capture else "" stderr = result.stderr if capture else "" raise CommandError(f"Command failed ({result.returncode}): {' '.join(cmd)}\nSTDOUT:\n{stdout}\nSTDERR:\n{stderr}") return result.stdout if capture else "" def load_json_config(path: pathlib.Path) -> Dict[str, Any]: with path.open("r", encoding="utf-8") as handle: return json.load(handle) def ensure_directory(path: pathlib.Path) -> pathlib.Path: path.mkdir(parents=True, exist_ok=True) return path def compute_sha256(path: pathlib.Path) -> str: sha = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): sha.update(chunk) return sha.hexdigest() def format_scalar(value: Any) -> str: if isinstance(value, bool): return "true" if value else "false" if value is None: return "null" if isinstance(value, (int, float)): return str(value) text = str(value) if text == "": return '""' if re.search(r"[\s:#\-\[\]\{\}]", text): return json.dumps(text, ensure_ascii=False) return text def _yaml_lines(value: Any, indent: int = 0) -> List[str]: pad = " " * indent if isinstance(value, Mapping): lines: List[str] = [] for key, val in value.items(): if isinstance(val, (Mapping, list)): lines.append(f"{pad}{key}:") lines.extend(_yaml_lines(val, indent + 1)) else: lines.append(f"{pad}{key}: {format_scalar(val)}") if not lines: lines.append(f"{pad}{{}}") return lines if isinstance(value, list): lines = [] if not value: lines.append(f"{pad}[]") return lines for item in value: if isinstance(item, (Mapping, list)): lines.append(f"{pad}-") lines.extend(_yaml_lines(item, indent + 1)) else: lines.append(f"{pad}- {format_scalar(item)}") return lines return [f"{pad}{format_scalar(value)}"] def dump_yaml(data: Mapping[str, Any]) -> str: lines: List[str] = _yaml_lines(data) return "\n".join(lines) + "\n" def utc_now_iso() -> str: return dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def sanitize_calendar(version: str, explicit: Optional[str]) -> str: if explicit: return explicit # Expect version like 2025.10.0-edge or 2.4.1 parts = re.findall(r"\d+", version) if len(parts) >= 2: return f"{parts[0]}.{parts[1]}" return dt.datetime.now(tz=dt.timezone.utc).strftime("%Y.%m") class ReleaseBuilder: def __init__( self, *, repo_root: pathlib.Path, config: Mapping[str, Any], version: str, channel: str, calendar: str, release_date: str, git_sha: str, output_dir: pathlib.Path, push: bool, dry_run: bool, registry_override: Optional[str] = None, platforms_override: Optional[Sequence[str]] = None, skip_signing: bool = False, cosign_key_ref: Optional[str] = None, cosign_password: Optional[str] = None, cosign_identity_token: Optional[str] = None, tlog_upload: bool = True, ) -> None: self.repo_root = repo_root self.config = config self.version = version self.channel = channel self.calendar = calendar self.release_date = release_date self.git_sha = git_sha self.output_dir = ensure_directory(output_dir) self.push = push self.dry_run = dry_run self.registry = registry_override or config.get("registry") if not self.registry: raise ValueError("Config missing 'registry'") platforms = list(platforms_override) if platforms_override else config.get("platforms") if not platforms: platforms = ["linux/amd64", "linux/arm64"] self.platforms = list(platforms) self.source_date_epoch = str(int(dt.datetime.fromisoformat(release_date.replace("Z", "+00:00")).timestamp())) self.artifacts_dir = ensure_directory(self.output_dir / "artifacts") self.sboms_dir = ensure_directory(self.artifacts_dir / "sboms") self.provenance_dir = ensure_directory(self.artifacts_dir / "provenance") self.signature_dir = ensure_directory(self.artifacts_dir / "signatures") self.metadata_dir = ensure_directory(self.artifacts_dir / "metadata") self.temp_dir = pathlib.Path(tempfile.mkdtemp(prefix="stellaops-release-")) self.skip_signing = skip_signing self.tlog_upload = tlog_upload self.cosign_key_ref = cosign_key_ref or os.environ.get("COSIGN_KEY_REF") self.cosign_identity_token = cosign_identity_token or os.environ.get("COSIGN_IDENTITY_TOKEN") password = cosign_password if cosign_password is not None else os.environ.get("COSIGN_PASSWORD", "") self.cosign_env = { "COSIGN_PASSWORD": password, "COSIGN_EXPERIMENTAL": "1", "COSIGN_ALLOW_HTTP_REGISTRY": os.environ.get("COSIGN_ALLOW_HTTP_REGISTRY", "1"), "COSIGN_DOCKER_MEDIA_TYPES": os.environ.get("COSIGN_DOCKER_MEDIA_TYPES", "1"), } # ---------------- # Build steps # ---------------- def run(self) -> Dict[str, Any]: components_result = [] if self.dry_run: print("⚠️ Dry-run enabled; commands will be skipped") self._prime_buildx_plugin() for component in self.config.get("components", []): result = self._build_component(component) components_result.append(result) helm_meta = self._package_helm() compose_meta = self._digest_compose_files() manifest = self._compose_manifest(components_result, helm_meta, compose_meta) return manifest def _prime_buildx_plugin(self) -> None: plugin_cfg = self.config.get("buildxPlugin") if not plugin_cfg: return project = plugin_cfg.get("project") if not project: return out_dir = ensure_directory(self.temp_dir / "buildx") if not self.dry_run: run([ "dotnet", "publish", project, "-c", "Release", "-o", str(out_dir), ]) cas_dir = ensure_directory(self.temp_dir / "cas") run([ "dotnet", str(out_dir / "StellaOps.Scanner.Sbomer.BuildXPlugin.dll"), "handshake", "--manifest", str(out_dir), "--cas", str(cas_dir), ]) def _component_tags(self, repo: str) -> List[str]: base = f"{self.registry}/{repo}" tags = [f"{base}:{self.version}"] if self.channel: tags.append(f"{base}:{self.channel}") return tags def _component_ref(self, repo: str, digest: str) -> str: return f"{self.registry}/{repo}@{digest}" def _build_component(self, component: Mapping[str, Any]) -> Mapping[str, Any]: name = component["name"] repo = component.get("repository", name) kind = component.get("kind", "dotnet-service") dockerfile = component.get("dockerfile") if not dockerfile: raise ValueError(f"Component {name} missing dockerfile") context = component.get("context", ".") iid_file = self.temp_dir / f"{name}.iid" metadata_file = self.metadata_dir / f"{name}.metadata.json" build_args = { "VERSION": self.version, "CHANNEL": self.channel, "GIT_SHA": self.git_sha, "SOURCE_DATE_EPOCH": self.source_date_epoch, } docker_cfg = self.config.get("docker", {}) if kind == "dotnet-service": build_args.update({ "PROJECT": component["project"], "ENTRYPOINT_DLL": component["entrypoint"], "SDK_IMAGE": docker_cfg.get("sdkImage", "mcr.microsoft.com/dotnet/nightly/sdk:10.0"), "RUNTIME_IMAGE": docker_cfg.get("runtimeImage", "gcr.io/distroless/dotnet/aspnet:latest"), }) elif kind == "angular-ui": build_args.update({ "NODE_IMAGE": docker_cfg.get("nodeImage", "node:20.14.0-bookworm"), "NGINX_IMAGE": docker_cfg.get("nginxImage", "nginx:1.27-alpine"), }) else: raise ValueError(f"Unsupported component kind {kind}") tags = self._component_tags(repo) build_cmd = [ "docker", "buildx", "build", "--file", dockerfile, "--metadata-file", str(metadata_file), "--iidfile", str(iid_file), "--progress", "plain", "--platform", ",".join(self.platforms), ] for key, value in build_args.items(): build_cmd.extend(["--build-arg", f"{key}={value}"]) for tag in tags: build_cmd.extend(["--tag", tag]) build_cmd.extend([ "--attest", "type=sbom", "--attest", "type=provenance,mode=max", ]) if self.push: build_cmd.append("--push") else: build_cmd.append("--load") build_cmd.append(context) if not self.dry_run: run(build_cmd, cwd=self.repo_root) digest = iid_file.read_text(encoding="utf-8").strip() if iid_file.exists() else "" image_ref = self._component_ref(repo, digest) if digest else "" bundle_info = self._sign_image(name, image_ref, tags) sbom_info = self._generate_sbom(name, image_ref) provenance_info = self._attach_provenance(name, image_ref) component_entry = OrderedDict() component_entry["name"] = name if digest: component_entry["image"] = image_ref component_entry["tags"] = tags if sbom_info: component_entry["sbom"] = sbom_info if provenance_info: component_entry["provenance"] = provenance_info if bundle_info: component_entry["signature"] = bundle_info if metadata_file.exists(): component_entry["metadata"] = str(metadata_file.relative_to(self.output_dir.parent)) if metadata_file.is_relative_to(self.output_dir.parent) else str(metadata_file) return component_entry def _sign_image(self, name: str, image_ref: str, tags: Sequence[str]) -> Optional[Mapping[str, Any]]: if self.skip_signing: return None if not image_ref: return None if not (self.cosign_key_ref or self.cosign_identity_token): raise ValueError("Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass.") signature_path = self.signature_dir / f"{name}.signature" cmd = ["cosign", "sign", "--yes"] if self.cosign_key_ref: cmd.extend(["--key", self.cosign_key_ref]) if self.cosign_identity_token: cmd.extend(["--identity-token", self.cosign_identity_token]) if not self.tlog_upload: cmd.append("--tlog-upload=false") cmd.append("--allow-http-registry") cmd.append(image_ref) if self.dry_run: return None run(cmd, env=self.cosign_env) signature_data = run([ "cosign", "download", "signature", "--allow-http-registry", image_ref, ]) signature_path.write_text(signature_data, encoding="utf-8") signature_ref = run([ "cosign", "triangulate", "--allow-http-registry", image_ref, ]).strip() return OrderedDict( ( ("signature", OrderedDict(( ("path", str(signature_path.relative_to(self.output_dir.parent)) if signature_path.is_relative_to(self.output_dir.parent) else str(signature_path)), ("ref", signature_ref), ("tlogUploaded", self.tlog_upload), ))), ) ) def _generate_sbom(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]: if not image_ref or self.dry_run: return None sbom_path = self.sboms_dir / f"{name}.cyclonedx.json" run([ "docker", "sbom", image_ref, "--format", "cyclonedx-json", "--output", str(sbom_path), ]) entry = OrderedDict(( ("path", str(sbom_path.relative_to(self.output_dir.parent)) if sbom_path.is_relative_to(self.output_dir.parent) else str(sbom_path)), ("sha256", compute_sha256(sbom_path)), )) if self.skip_signing: return entry attach_cmd = [ "cosign", "attach", "sbom", "--sbom", str(sbom_path), "--type", "cyclonedx", ] if self.cosign_key_ref: attach_cmd.extend(["--key", self.cosign_key_ref]) attach_cmd.append("--allow-http-registry") attach_cmd.append(image_ref) run(attach_cmd, env=self.cosign_env) reference = run(["cosign", "triangulate", "--type", "sbom", "--allow-http-registry", image_ref]).strip() entry["ref"] = reference return entry def _attach_provenance(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]: if not image_ref or self.dry_run: return None predicate = OrderedDict() predicate["buildDefinition"] = OrderedDict( ( ("buildType", "https://git.stella-ops.org/stellaops/release"), ("externalParameters", OrderedDict(( ("component", name), ("version", self.version), ("channel", self.channel), ))), ) ) predicate["runDetails"] = OrderedDict( ( ("builder", OrderedDict((("id", "https://github.com/actions"),))), ("metadata", OrderedDict((("finishedOn", self.release_date),))), ) ) predicate_path = self.provenance_dir / f"{name}.provenance.json" with predicate_path.open("w", encoding="utf-8") as handle: json.dump(predicate, handle, indent=2, sort_keys=True) handle.write("\n") entry = OrderedDict(( ("path", str(predicate_path.relative_to(self.output_dir.parent)) if predicate_path.is_relative_to(self.output_dir.parent) else str(predicate_path)), ("sha256", compute_sha256(predicate_path)), )) if self.skip_signing: return entry cmd = [ "cosign", "attest", "--predicate", str(predicate_path), "--type", "https://slsa.dev/provenance/v1", ] if self.cosign_key_ref: cmd.extend(["--key", self.cosign_key_ref]) if not self.tlog_upload: cmd.append("--tlog-upload=false") cmd.append("--allow-http-registry") cmd.append(image_ref) run(cmd, env=self.cosign_env) ref = run([ "cosign", "triangulate", "--type", "https://slsa.dev/provenance/v1", "--allow-http-registry", image_ref, ]).strip() entry["ref"] = ref return entry # ---------------- # Helm + compose # ---------------- def _package_helm(self) -> Optional[Mapping[str, Any]]: helm_cfg = self.config.get("helm") if not helm_cfg: return None chart_path = helm_cfg.get("chartPath") if not chart_path: return None chart_dir = self.repo_root / chart_path output_dir = ensure_directory(self.output_dir / "helm") archive_path = output_dir / f"stellaops-{self.version}.tgz" if not self.dry_run: cmd = [ "helm", "package", str(chart_dir), "--destination", str(output_dir), "--version", self.version, "--app-version", self.version, ] run(cmd) packaged = next(output_dir.glob("*.tgz"), None) if packaged and packaged != archive_path: packaged.rename(archive_path) digest = compute_sha256(archive_path) if archive_path.exists() else None if archive_path.exists() and archive_path.is_relative_to(self.output_dir): manifest_path = str(archive_path.relative_to(self.output_dir)) elif archive_path.exists() and archive_path.is_relative_to(self.output_dir.parent): manifest_path = str(archive_path.relative_to(self.output_dir.parent)) else: manifest_path = f"helm/{archive_path.name}" return OrderedDict(( ("name", "stellaops"), ("version", self.version), ("path", manifest_path), ("sha256", digest), )) def _digest_compose_files(self) -> List[Mapping[str, Any]]: compose_cfg = self.config.get("compose", {}) files = compose_cfg.get("files", []) entries: List[Mapping[str, Any]] = [] for rel_path in files: src = self.repo_root / rel_path if not src.exists(): continue digest = compute_sha256(src) entries.append(OrderedDict(( ("name", pathlib.Path(rel_path).name), ("path", rel_path), ("sha256", digest), ))) return entries # ---------------- # Manifest assembly # ---------------- def _compose_manifest( self, components: List[Mapping[str, Any]], helm_meta: Optional[Mapping[str, Any]], compose_meta: List[Mapping[str, Any]], ) -> Dict[str, Any]: manifest = OrderedDict() manifest["release"] = OrderedDict(( ("version", self.version), ("channel", self.channel), ("date", self.release_date), ("calendar", self.calendar), )) manifest["components"] = components if helm_meta: manifest["charts"] = [helm_meta] if compose_meta: manifest["compose"] = compose_meta return manifest def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Build StellaOps release artefacts deterministically") parser.add_argument("--config", type=pathlib.Path, default=DEFAULT_CONFIG, help="Path to release config JSON") parser.add_argument("--version", required=True, help="Release version string (e.g. 2025.10.0-edge)") parser.add_argument("--channel", required=True, help="Release channel (edge|stable|lts)") parser.add_argument("--calendar", help="Calendar tag (YYYY.MM); defaults derived from version") parser.add_argument("--git-sha", default=os.environ.get("GIT_COMMIT", "unknown"), help="Git revision to embed") parser.add_argument("--output", type=pathlib.Path, default=REPO_ROOT / "out/release", help="Output directory for artefacts") parser.add_argument("--no-push", action="store_true", help="Do not push images (use docker load)") parser.add_argument("--dry-run", action="store_true", help="Print steps without executing commands") parser.add_argument("--registry", help="Override registry root (e.g. localhost:5000/stellaops)") parser.add_argument("--platform", dest="platforms", action="append", metavar="PLATFORM", help="Override build platforms (repeatable)") parser.add_argument("--skip-signing", action="store_true", help="Skip cosign signing/attestation steps") parser.add_argument("--cosign-key", dest="cosign_key", help="Override COSIGN_KEY_REF value") parser.add_argument("--cosign-password", dest="cosign_password", help="Password for cosign key") parser.add_argument("--cosign-identity-token", dest="cosign_identity_token", help="Identity token for keyless cosign flows") parser.add_argument("--no-transparency", action="store_true", help="Disable Rekor transparency log upload during signing") return parser.parse_args(argv) def write_manifest(manifest: Mapping[str, Any], output_dir: pathlib.Path) -> pathlib.Path: # Copy manifest to avoid mutating input when computing checksum base_manifest = OrderedDict(manifest) yaml_without_checksum = dump_yaml(base_manifest) digest = hashlib.sha256(yaml_without_checksum.encode("utf-8")).hexdigest() manifest_with_checksum = OrderedDict(base_manifest) manifest_with_checksum["checksums"] = OrderedDict((("sha256", digest),)) final_yaml = dump_yaml(manifest_with_checksum) output_path = output_dir / "release.yaml" with output_path.open("w", encoding="utf-8") as handle: handle.write(final_yaml) return output_path def main(argv: Optional[Sequence[str]] = None) -> int: args = parse_args(argv) config = load_json_config(args.config) release_date = utc_now_iso() calendar = sanitize_calendar(args.version, args.calendar) builder = ReleaseBuilder( repo_root=REPO_ROOT, config=config, version=args.version, channel=args.channel, calendar=calendar, release_date=release_date, git_sha=args.git_sha, output_dir=args.output, push=not args.no_push, dry_run=args.dry_run, registry_override=args.registry, platforms_override=args.platforms, skip_signing=args.skip_signing, cosign_key_ref=args.cosign_key, cosign_password=args.cosign_password, cosign_identity_token=args.cosign_identity_token, tlog_upload=not args.no_transparency, ) manifest = builder.run() manifest_path = write_manifest(manifest, builder.output_dir) print(f"✅ Release manifest written to {manifest_path}") return 0 if __name__ == "__main__": raise SystemExit(main())