631 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			631 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
"""Deterministic release pipeline helper for StellaOps.

This script builds service containers, generates SBOM and provenance artefacts,
signs them with cosign, and writes a channel-specific release manifest.

The workflow expects external tooling to be available on PATH:
- docker (with buildx)
- cosign
- helm
- npm / node (for the UI build)
- dotnet SDK (for BuildX plugin publication)
"""
 | 
						|
from __future__ import annotations
 | 
						|
 | 
						|
import argparse
 | 
						|
import datetime as dt
 | 
						|
import hashlib
 | 
						|
import json
 | 
						|
import os
 | 
						|
import pathlib
 | 
						|
import re
 | 
						|
import shlex
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
from collections import OrderedDict
 | 
						|
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Sequence
 | 
						|
 | 
						|
# Repository root, taken as parents[3] of this script's resolved path.
REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
# Release configuration used when --config is not given.
DEFAULT_CONFIG = REPO_ROOT.joinpath("ops/devops/release/components.json")
 | 
						|
 | 
						|
class CommandError(RuntimeError):
    """Raised by ``run`` when an external command exits with a non-zero status."""
 | 
						|
 | 
						|
def run(cmd: Sequence[str], *, cwd: Optional[pathlib.Path] = None, env: Optional[Mapping[str, str]] = None, capture: bool = True) -> str:
    """Execute an external command and hand back its standard output.

    The child process inherits the current environment with ``env`` layered
    on top.  When ``capture`` is false the child's streams are not collected
    and an empty string is returned.  If ``STELLAOPS_RELEASE_DEBUG`` is set
    in the effective environment, the shell-quoted command (and any captured
    output) is echoed to stderr once the command has finished.

    Raises:
        CommandError: the command exited with a non-zero status.
    """
    merged_env = dict(os.environ)
    merged_env.update(env or {})
    workdir = str(cwd) if cwd else None

    completed = subprocess.run(
        list(cmd),
        cwd=workdir,
        env=merged_env,
        check=False,
        capture_output=capture,
        text=True,
    )

    if merged_env.get("STELLAOPS_RELEASE_DEBUG"):
        quoted = " ".join(shlex.quote(part) for part in cmd)
        sys.stderr.write(f"[debug] {quoted}\n")
        if capture:
            for stream_text in (completed.stdout, completed.stderr):
                sys.stderr.write(stream_text)

    # Streams are only populated when they were captured.
    stdout, stderr = (completed.stdout, completed.stderr) if capture else ("", "")
    if completed.returncode != 0:
        raise CommandError(f"Command failed ({completed.returncode}): {' '.join(cmd)}\nSTDOUT:\n{stdout}\nSTDERR:\n{stderr}")
    return stdout
 | 
						|
 | 
						|
 | 
						|
def load_json_config(path: pathlib.Path) -> Dict[str, Any]:
    """Parse the UTF-8 encoded JSON document at ``path`` and return the result."""
    return json.loads(path.read_text(encoding="utf-8"))
 | 
						|
 | 
						|
 | 
						|
def ensure_directory(path: pathlib.Path) -> pathlib.Path:
    """Create ``path`` (and any missing parents) if absent, then return it.

    Returning the path lets callers create and bind a directory in a single
    expression.
    """
    path.mkdir(exist_ok=True, parents=True)
    return path
 | 
						|
 | 
						|
 | 
						|
def compute_sha256(path: pathlib.Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB chunks, so it is never loaded into memory in
    full.
    """
    chunk_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(chunk_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
 | 
						|
 | 
						|
 | 
						|
# Plain (unquoted) words a YAML loader may resolve to a bool, null or special
# float instead of a string; matched case-insensitively.
_YAML_RESERVED_WORDS = frozenset(
    {"true", "false", "yes", "no", "on", "off", "y", "n", "null", "~", ".inf", "+.inf", "-.inf", ".nan"}
)
# Indicator characters that change a scalar's meaning when they lead it.
_YAML_LEADING_INDICATORS = "!&*|>'\"%@`?,"
# Whitespace and structural characters that always force quoting.
_YAML_STRUCTURAL_CHARS = re.compile(r"[\s:#\-\[\]\{\}]")


def _is_ambiguous_plain_scalar(text: str) -> bool:
    """Tell whether non-empty ``text`` would not load back as a string if left unquoted."""
    if text.lower() in _YAML_RESERVED_WORDS or text[0] in _YAML_LEADING_INDICATORS:
        return True
    # Anything Python parses as a number (decimal, exponent, 0x/0o/0b) is
    # treated as numeric-looking; quoting a few extra strings is harmless.
    for parse in (float, lambda raw: int(raw, 0)):
        try:
            parse(text)
        except ValueError:
            continue
        return True
    return False


def format_scalar(value: Any) -> str:
    """Render a scalar as a YAML value.

    Booleans become ``true``/``false``, ``None`` becomes ``null`` and numbers
    use their ``str()`` form.  Every other value is stringified; the empty
    string is written as ``""``.  A string is JSON-quoted when it contains
    whitespace or a structural character, or when leaving it bare would make
    a YAML loader read it as something other than a string (for example the
    calendar tag ``2025.10`` would otherwise load as the float ``2025.1``).
    """
    # bool must be tested before int: bool is an int subclass.
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    if text == "":
        return '""'
    if _YAML_STRUCTURAL_CHARS.search(text) or _is_ambiguous_plain_scalar(text):
        return json.dumps(text, ensure_ascii=False)
    return text
 | 
						|
 | 
						|
 | 
						|
def _yaml_lines(value: Any, indent: int = 0) -> List[str]:
 | 
						|
    pad = "  " * indent
 | 
						|
    if isinstance(value, Mapping):
 | 
						|
        lines: List[str] = []
 | 
						|
        for key, val in value.items():
 | 
						|
            if isinstance(val, (Mapping, list)):
 | 
						|
                lines.append(f"{pad}{key}:")
 | 
						|
                lines.extend(_yaml_lines(val, indent + 1))
 | 
						|
            else:
 | 
						|
                lines.append(f"{pad}{key}: {format_scalar(val)}")
 | 
						|
        if not lines:
 | 
						|
            lines.append(f"{pad}{{}}")
 | 
						|
        return lines
 | 
						|
    if isinstance(value, list):
 | 
						|
        lines = []
 | 
						|
        if not value:
 | 
						|
            lines.append(f"{pad}[]")
 | 
						|
            return lines
 | 
						|
        for item in value:
 | 
						|
            if isinstance(item, (Mapping, list)):
 | 
						|
                lines.append(f"{pad}-")
 | 
						|
                lines.extend(_yaml_lines(item, indent + 1))
 | 
						|
            else:
 | 
						|
                lines.append(f"{pad}- {format_scalar(item)}")
 | 
						|
        return lines
 | 
						|
    return [f"{pad}{format_scalar(value)}"]
 | 
						|
 | 
						|
 | 
						|
def dump_yaml(data: Mapping[str, Any]) -> str:
    """Serialise ``data`` to YAML text that ends with a newline."""
    body = "\n".join(_yaml_lines(data))
    return f"{body}\n"
 | 
						|
 | 
						|
 | 
						|
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (whole seconds)."""
    now = dt.datetime.now(tz=dt.timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
 | 
						|
 | 
						|
 | 
						|
def sanitize_calendar(version: str, explicit: Optional[str]) -> str:
    """Choose the calendar tag for a release.

    A non-empty ``explicit`` value is returned unchanged.  Otherwise the
    first two runs of digits in ``version`` are joined with a dot
    (``2025.10.0-edge`` -> ``2025.10``, ``2.4.1`` -> ``2.4``).  When the
    version holds fewer than two digit runs, the current UTC year and month
    are used.
    """
    if explicit:
        return explicit
    digit_runs = re.findall(r"\d+", version)
    if len(digit_runs) < 2:
        return dt.datetime.now(tz=dt.timezone.utc).strftime("%Y.%m")
    first, second = digit_runs[:2]
    return f"{first}.{second}"
 | 
						|
 | 
						|
 | 
						|
class ReleaseBuilder:
 | 
						|
    def __init__(
        self,
        *,
        repo_root: pathlib.Path,
        config: Mapping[str, Any],
        version: str,
        channel: str,
        calendar: str,
        release_date: str,
        git_sha: str,
        output_dir: pathlib.Path,
        push: bool,
        dry_run: bool,
        registry_override: Optional[str] = None,
        platforms_override: Optional[Sequence[str]] = None,
        skip_signing: bool = False,
        cosign_key_ref: Optional[str] = None,
        cosign_password: Optional[str] = None,
        cosign_identity_token: Optional[str] = None,
        tlog_upload: bool = True,
    ) -> None:
        """Capture the release parameters and prepare the working directories.

        The registry and platform list come from the overrides when given,
        otherwise from ``config``; platforms fall back to ``linux/amd64`` and
        ``linux/arm64``.  ``release_date`` (ISO-8601, ``Z`` suffix accepted)
        is converted to epoch seconds and kept as ``source_date_epoch``.
        Cosign settings that are not passed explicitly are read from the
        ``COSIGN_KEY_REF``, ``COSIGN_IDENTITY_TOKEN`` and ``COSIGN_PASSWORD``
        environment variables.

        Side effects: creates ``output_dir`` and its ``artifacts``
        sub-directories, plus a scratch directory that is removed when the
        interpreter exits.

        Raises:
            ValueError: neither ``registry_override`` nor ``config`` supplies
                a registry.
        """
        # Function-scope imports: only the scratch-directory cleanup needs them.
        import atexit
        import shutil

        self.repo_root = repo_root
        self.config = config
        self.version = version
        self.channel = channel
        self.calendar = calendar
        self.release_date = release_date
        self.git_sha = git_sha
        self.output_dir = ensure_directory(output_dir)
        self.push = push
        self.dry_run = dry_run

        self.registry = registry_override or config.get("registry")
        if not self.registry:
            raise ValueError("Config missing 'registry'")

        platforms = platforms_override or config.get("platforms") or ["linux/amd64", "linux/arm64"]
        self.platforms = list(platforms)

        # fromisoformat() on older Pythons rejects a trailing "Z".
        released_at = dt.datetime.fromisoformat(release_date.replace("Z", "+00:00"))
        self.source_date_epoch = str(int(released_at.timestamp()))

        self.artifacts_dir = ensure_directory(self.output_dir / "artifacts")
        self.sboms_dir = ensure_directory(self.artifacts_dir / "sboms")
        self.provenance_dir = ensure_directory(self.artifacts_dir / "provenance")
        self.signature_dir = ensure_directory(self.artifacts_dir / "signatures")
        self.metadata_dir = ensure_directory(self.artifacts_dir / "metadata")

        # mkdtemp() never deletes what it creates, so schedule removal of the
        # scratch directory for interpreter exit instead of leaking one per run.
        self.temp_dir = pathlib.Path(tempfile.mkdtemp(prefix="stellaops-release-"))
        atexit.register(shutil.rmtree, str(self.temp_dir), ignore_errors=True)

        self.skip_signing = skip_signing
        self.tlog_upload = tlog_upload
        self.cosign_key_ref = cosign_key_ref or os.environ.get("COSIGN_KEY_REF")
        self.cosign_identity_token = cosign_identity_token or os.environ.get("COSIGN_IDENTITY_TOKEN")
        # An explicit empty password is honoured; only None falls back to the environment.
        password = cosign_password if cosign_password is not None else os.environ.get("COSIGN_PASSWORD", "")
        self.cosign_env = {
            "COSIGN_PASSWORD": password,
            "COSIGN_EXPERIMENTAL": "1",
            "COSIGN_ALLOW_HTTP_REGISTRY": os.environ.get("COSIGN_ALLOW_HTTP_REGISTRY", "1"),
            "COSIGN_DOCKER_MEDIA_TYPES": os.environ.get("COSIGN_DOCKER_MEDIA_TYPES", "1"),
        }
 | 
						|
 | 
						|
    # ----------------
 | 
						|
    # Build steps
 | 
						|
    # ----------------
 | 
						|
    def run(self) -> Dict[str, Any]:
 | 
						|
        components_result = []
 | 
						|
        if self.dry_run:
 | 
						|
            print("⚠️  Dry-run enabled; commands will be skipped")
 | 
						|
        self._prime_buildx_plugin()
 | 
						|
        for component in self.config.get("components", []):
 | 
						|
            result = self._build_component(component)
 | 
						|
            components_result.append(result)
 | 
						|
        helm_meta = self._package_helm()
 | 
						|
        compose_meta = self._digest_compose_files()
 | 
						|
        manifest = self._compose_manifest(components_result, helm_meta, compose_meta)
 | 
						|
        return manifest
 | 
						|
 | 
						|
    def _prime_buildx_plugin(self) -> None:
 | 
						|
        plugin_cfg = self.config.get("buildxPlugin")
 | 
						|
        if not plugin_cfg:
 | 
						|
            return
 | 
						|
        project = plugin_cfg.get("project")
 | 
						|
        if not project:
 | 
						|
            return
 | 
						|
        out_dir = ensure_directory(self.temp_dir / "buildx")
 | 
						|
        if not self.dry_run:
 | 
						|
            run([
 | 
						|
                "dotnet",
 | 
						|
                "publish",
 | 
						|
                project,
 | 
						|
                "-c",
 | 
						|
                "Release",
 | 
						|
                "-o",
 | 
						|
                str(out_dir),
 | 
						|
            ])
 | 
						|
            cas_dir = ensure_directory(self.temp_dir / "cas")
 | 
						|
            run([
 | 
						|
                "dotnet",
 | 
						|
                str(out_dir / "StellaOps.Scanner.Sbomer.BuildXPlugin.dll"),
 | 
						|
                "handshake",
 | 
						|
                "--manifest",
 | 
						|
                str(out_dir),
 | 
						|
                "--cas",
 | 
						|
                str(cas_dir),
 | 
						|
            ])
 | 
						|
 | 
						|
    def _component_tags(self, repo: str) -> List[str]:
 | 
						|
        base = f"{self.registry}/{repo}"
 | 
						|
        tags = [f"{base}:{self.version}"]
 | 
						|
        if self.channel:
 | 
						|
            tags.append(f"{base}:{self.channel}")
 | 
						|
        return tags
 | 
						|
 | 
						|
    def _component_ref(self, repo: str, digest: str) -> str:
 | 
						|
        return f"{self.registry}/{repo}@{digest}"
 | 
						|
 | 
						|
    def _build_component(self, component: Mapping[str, Any]) -> Mapping[str, Any]:
 | 
						|
        name = component["name"]
 | 
						|
        repo = component.get("repository", name)
 | 
						|
        kind = component.get("kind", "dotnet-service")
 | 
						|
        dockerfile = component.get("dockerfile")
 | 
						|
        if not dockerfile:
 | 
						|
            raise ValueError(f"Component {name} missing dockerfile")
 | 
						|
        context = component.get("context", ".")
 | 
						|
        iid_file = self.temp_dir / f"{name}.iid"
 | 
						|
        metadata_file = self.metadata_dir / f"{name}.metadata.json"
 | 
						|
 | 
						|
        build_args = {
 | 
						|
            "VERSION": self.version,
 | 
						|
            "CHANNEL": self.channel,
 | 
						|
            "GIT_SHA": self.git_sha,
 | 
						|
            "SOURCE_DATE_EPOCH": self.source_date_epoch,
 | 
						|
        }
 | 
						|
        docker_cfg = self.config.get("docker", {})
 | 
						|
        if kind == "dotnet-service":
 | 
						|
            build_args.update({
 | 
						|
                "PROJECT": component["project"],
 | 
						|
                "ENTRYPOINT_DLL": component["entrypoint"],
 | 
						|
                "SDK_IMAGE": docker_cfg.get("sdkImage", "mcr.microsoft.com/dotnet/nightly/sdk:10.0"),
 | 
						|
                "RUNTIME_IMAGE": docker_cfg.get("runtimeImage", "gcr.io/distroless/dotnet/aspnet:latest"),
 | 
						|
            })
 | 
						|
        elif kind == "angular-ui":
 | 
						|
            build_args.update({
 | 
						|
                "NODE_IMAGE": docker_cfg.get("nodeImage", "node:20.14.0-bookworm"),
 | 
						|
                "NGINX_IMAGE": docker_cfg.get("nginxImage", "nginx:1.27-alpine"),
 | 
						|
            })
 | 
						|
        else:
 | 
						|
            raise ValueError(f"Unsupported component kind {kind}")
 | 
						|
 | 
						|
        tags = self._component_tags(repo)
 | 
						|
        build_cmd = [
 | 
						|
            "docker",
 | 
						|
            "buildx",
 | 
						|
            "build",
 | 
						|
            "--file",
 | 
						|
            dockerfile,
 | 
						|
            "--metadata-file",
 | 
						|
            str(metadata_file),
 | 
						|
            "--iidfile",
 | 
						|
            str(iid_file),
 | 
						|
            "--progress",
 | 
						|
            "plain",
 | 
						|
            "--platform",
 | 
						|
            ",".join(self.platforms),
 | 
						|
        ]
 | 
						|
        for key, value in build_args.items():
 | 
						|
            build_cmd.extend(["--build-arg", f"{key}={value}"])
 | 
						|
        for tag in tags:
 | 
						|
            build_cmd.extend(["--tag", tag])
 | 
						|
        build_cmd.extend([
 | 
						|
            "--attest",
 | 
						|
            "type=sbom",
 | 
						|
            "--attest",
 | 
						|
            "type=provenance,mode=max",
 | 
						|
        ])
 | 
						|
        if self.push:
 | 
						|
            build_cmd.append("--push")
 | 
						|
        else:
 | 
						|
            build_cmd.append("--load")
 | 
						|
        build_cmd.append(context)
 | 
						|
 | 
						|
        if not self.dry_run:
 | 
						|
            run(build_cmd, cwd=self.repo_root)
 | 
						|
 | 
						|
        digest = iid_file.read_text(encoding="utf-8").strip() if iid_file.exists() else ""
 | 
						|
        image_ref = self._component_ref(repo, digest) if digest else ""
 | 
						|
 | 
						|
        bundle_info = self._sign_image(name, image_ref, tags)
 | 
						|
        sbom_info = self._generate_sbom(name, image_ref)
 | 
						|
        provenance_info = self._attach_provenance(name, image_ref)
 | 
						|
 | 
						|
        component_entry = OrderedDict()
 | 
						|
        component_entry["name"] = name
 | 
						|
        if digest:
 | 
						|
            component_entry["image"] = image_ref
 | 
						|
        component_entry["tags"] = tags
 | 
						|
        if sbom_info:
 | 
						|
            component_entry["sbom"] = sbom_info
 | 
						|
        if provenance_info:
 | 
						|
            component_entry["provenance"] = provenance_info
 | 
						|
        if bundle_info:
 | 
						|
            component_entry["signature"] = bundle_info
 | 
						|
        if metadata_file.exists():
 | 
						|
            component_entry["metadata"] = str(metadata_file.relative_to(self.output_dir.parent)) if metadata_file.is_relative_to(self.output_dir.parent) else str(metadata_file)
 | 
						|
        return component_entry
 | 
						|
 | 
						|
    def _sign_image(self, name: str, image_ref: str, tags: Sequence[str]) -> Optional[Mapping[str, Any]]:
 | 
						|
        if self.skip_signing:
 | 
						|
            return None
 | 
						|
        if not image_ref:
 | 
						|
            return None
 | 
						|
        if not (self.cosign_key_ref or self.cosign_identity_token):
 | 
						|
            raise ValueError("Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass.")
 | 
						|
        signature_path = self.signature_dir / f"{name}.signature"
 | 
						|
        cmd = ["cosign", "sign", "--yes"]
 | 
						|
        if self.cosign_key_ref:
 | 
						|
            cmd.extend(["--key", self.cosign_key_ref])
 | 
						|
        if self.cosign_identity_token:
 | 
						|
            cmd.extend(["--identity-token", self.cosign_identity_token])
 | 
						|
        if not self.tlog_upload:
 | 
						|
            cmd.append("--tlog-upload=false")
 | 
						|
        cmd.append("--allow-http-registry")
 | 
						|
        cmd.append(image_ref)
 | 
						|
        if self.dry_run:
 | 
						|
            return None
 | 
						|
        run(cmd, env=self.cosign_env)
 | 
						|
        signature_data = run([
 | 
						|
            "cosign",
 | 
						|
            "download",
 | 
						|
            "signature",
 | 
						|
            "--allow-http-registry",
 | 
						|
            image_ref,
 | 
						|
        ])
 | 
						|
        signature_path.write_text(signature_data, encoding="utf-8")
 | 
						|
        signature_ref = run([
 | 
						|
            "cosign",
 | 
						|
            "triangulate",
 | 
						|
            "--allow-http-registry",
 | 
						|
            image_ref,
 | 
						|
        ]).strip()
 | 
						|
        return OrderedDict(
 | 
						|
            (
 | 
						|
                ("signature", OrderedDict((
 | 
						|
                    ("path", str(signature_path.relative_to(self.output_dir.parent)) if signature_path.is_relative_to(self.output_dir.parent) else str(signature_path)),
 | 
						|
                    ("ref", signature_ref),
 | 
						|
                    ("tlogUploaded", self.tlog_upload),
 | 
						|
                ))),
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
    def _generate_sbom(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
 | 
						|
        if not image_ref or self.dry_run:
 | 
						|
            return None
 | 
						|
        sbom_path = self.sboms_dir / f"{name}.cyclonedx.json"
 | 
						|
        run([
 | 
						|
            "docker",
 | 
						|
            "sbom",
 | 
						|
            image_ref,
 | 
						|
            "--format",
 | 
						|
            "cyclonedx-json",
 | 
						|
            "--output",
 | 
						|
            str(sbom_path),
 | 
						|
        ])
 | 
						|
        entry = OrderedDict((
 | 
						|
            ("path", str(sbom_path.relative_to(self.output_dir.parent)) if sbom_path.is_relative_to(self.output_dir.parent) else str(sbom_path)),
 | 
						|
            ("sha256", compute_sha256(sbom_path)),
 | 
						|
        ))
 | 
						|
        if self.skip_signing:
 | 
						|
            return entry
 | 
						|
        attach_cmd = [
 | 
						|
            "cosign",
 | 
						|
            "attach",
 | 
						|
            "sbom",
 | 
						|
            "--sbom",
 | 
						|
            str(sbom_path),
 | 
						|
            "--type",
 | 
						|
            "cyclonedx",
 | 
						|
        ]
 | 
						|
        if self.cosign_key_ref:
 | 
						|
            attach_cmd.extend(["--key", self.cosign_key_ref])
 | 
						|
        attach_cmd.append("--allow-http-registry")
 | 
						|
        attach_cmd.append(image_ref)
 | 
						|
        run(attach_cmd, env=self.cosign_env)
 | 
						|
        reference = run(["cosign", "triangulate", "--type", "sbom", "--allow-http-registry", image_ref]).strip()
 | 
						|
        entry["ref"] = reference
 | 
						|
        return entry
 | 
						|
 | 
						|
    def _attach_provenance(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
 | 
						|
        if not image_ref or self.dry_run:
 | 
						|
            return None
 | 
						|
        predicate = OrderedDict()
 | 
						|
        predicate["buildDefinition"] = OrderedDict(
 | 
						|
            (
 | 
						|
                ("buildType", "https://git.stella-ops.org/stellaops/release"),
 | 
						|
                ("externalParameters", OrderedDict((
 | 
						|
                    ("component", name),
 | 
						|
                    ("version", self.version),
 | 
						|
                    ("channel", self.channel),
 | 
						|
                ))),
 | 
						|
            )
 | 
						|
        )
 | 
						|
        predicate["runDetails"] = OrderedDict(
 | 
						|
            (
 | 
						|
                ("builder", OrderedDict((("id", "https://github.com/actions"),))),
 | 
						|
                ("metadata", OrderedDict((("finishedOn", self.release_date),))),
 | 
						|
            )
 | 
						|
        )
 | 
						|
        predicate_path = self.provenance_dir / f"{name}.provenance.json"
 | 
						|
        with predicate_path.open("w", encoding="utf-8") as handle:
 | 
						|
            json.dump(predicate, handle, indent=2, sort_keys=True)
 | 
						|
            handle.write("\n")
 | 
						|
        entry = OrderedDict((
 | 
						|
            ("path", str(predicate_path.relative_to(self.output_dir.parent)) if predicate_path.is_relative_to(self.output_dir.parent) else str(predicate_path)),
 | 
						|
            ("sha256", compute_sha256(predicate_path)),
 | 
						|
        ))
 | 
						|
        if self.skip_signing:
 | 
						|
            return entry
 | 
						|
        cmd = [
 | 
						|
            "cosign",
 | 
						|
            "attest",
 | 
						|
            "--predicate",
 | 
						|
            str(predicate_path),
 | 
						|
            "--type",
 | 
						|
            "https://slsa.dev/provenance/v1",
 | 
						|
        ]
 | 
						|
        if self.cosign_key_ref:
 | 
						|
            cmd.extend(["--key", self.cosign_key_ref])
 | 
						|
        if not self.tlog_upload:
 | 
						|
            cmd.append("--tlog-upload=false")
 | 
						|
        cmd.append("--allow-http-registry")
 | 
						|
        cmd.append(image_ref)
 | 
						|
        run(cmd, env=self.cosign_env)
 | 
						|
        ref = run([
 | 
						|
            "cosign",
 | 
						|
            "triangulate",
 | 
						|
            "--type",
 | 
						|
            "https://slsa.dev/provenance/v1",
 | 
						|
            "--allow-http-registry",
 | 
						|
            image_ref,
 | 
						|
        ]).strip()
 | 
						|
        entry["ref"] = ref
 | 
						|
        return entry
 | 
						|
 | 
						|
    # ----------------
 | 
						|
    # Helm + compose
 | 
						|
    # ----------------
 | 
						|
    def _package_helm(self) -> Optional[Mapping[str, Any]]:
 | 
						|
        helm_cfg = self.config.get("helm")
 | 
						|
        if not helm_cfg:
 | 
						|
            return None
 | 
						|
        chart_path = helm_cfg.get("chartPath")
 | 
						|
        if not chart_path:
 | 
						|
            return None
 | 
						|
        chart_dir = self.repo_root / chart_path
 | 
						|
        output_dir = ensure_directory(self.output_dir / "helm")
 | 
						|
        archive_path = output_dir / f"stellaops-{self.version}.tgz"
 | 
						|
        if not self.dry_run:
 | 
						|
            cmd = [
 | 
						|
                "helm",
 | 
						|
                "package",
 | 
						|
                str(chart_dir),
 | 
						|
                "--destination",
 | 
						|
                str(output_dir),
 | 
						|
                "--version",
 | 
						|
                self.version,
 | 
						|
                "--app-version",
 | 
						|
                self.version,
 | 
						|
            ]
 | 
						|
            run(cmd)
 | 
						|
            packaged = next(output_dir.glob("*.tgz"), None)
 | 
						|
            if packaged and packaged != archive_path:
 | 
						|
                packaged.rename(archive_path)
 | 
						|
        digest = compute_sha256(archive_path) if archive_path.exists() else None
 | 
						|
        if archive_path.exists() and archive_path.is_relative_to(self.output_dir):
 | 
						|
            manifest_path = str(archive_path.relative_to(self.output_dir))
 | 
						|
        elif archive_path.exists() and archive_path.is_relative_to(self.output_dir.parent):
 | 
						|
            manifest_path = str(archive_path.relative_to(self.output_dir.parent))
 | 
						|
        else:
 | 
						|
            manifest_path = f"helm/{archive_path.name}"
 | 
						|
        return OrderedDict((
 | 
						|
            ("name", "stellaops"),
 | 
						|
            ("version", self.version),
 | 
						|
            ("path", manifest_path),
 | 
						|
            ("sha256", digest),
 | 
						|
        ))
 | 
						|
 | 
						|
    def _digest_compose_files(self) -> List[Mapping[str, Any]]:
 | 
						|
        compose_cfg = self.config.get("compose", {})
 | 
						|
        files = compose_cfg.get("files", [])
 | 
						|
        entries: List[Mapping[str, Any]] = []
 | 
						|
        for rel_path in files:
 | 
						|
            src = self.repo_root / rel_path
 | 
						|
            if not src.exists():
 | 
						|
                continue
 | 
						|
            digest = compute_sha256(src)
 | 
						|
            entries.append(OrderedDict((
 | 
						|
                ("name", pathlib.Path(rel_path).name),
 | 
						|
                ("path", rel_path),
 | 
						|
                ("sha256", digest),
 | 
						|
            )))
 | 
						|
        return entries
 | 
						|
 | 
						|
    # ----------------
 | 
						|
    # Manifest assembly
 | 
						|
    # ----------------
 | 
						|
    def _compose_manifest(
 | 
						|
        self,
 | 
						|
        components: List[Mapping[str, Any]],
 | 
						|
        helm_meta: Optional[Mapping[str, Any]],
 | 
						|
        compose_meta: List[Mapping[str, Any]],
 | 
						|
    ) -> Dict[str, Any]:
 | 
						|
        manifest = OrderedDict()
 | 
						|
        manifest["release"] = OrderedDict((
 | 
						|
            ("version", self.version),
 | 
						|
            ("channel", self.channel),
 | 
						|
            ("date", self.release_date),
 | 
						|
            ("calendar", self.calendar),
 | 
						|
        ))
 | 
						|
        manifest["components"] = components
 | 
						|
        if helm_meta:
 | 
						|
            manifest["charts"] = [helm_meta]
 | 
						|
        if compose_meta:
 | 
						|
            manifest["compose"] = compose_meta
 | 
						|
        return manifest
 | 
						|
 | 
						|
 | 
						|
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command-line options for the release builder.

    ``argv`` defaults to the process arguments when ``None``.  ``--version``
    and ``--channel`` are required; every other option has a default.
    """
    parser = argparse.ArgumentParser(description="Build StellaOps release artefacts deterministically")
    add = parser.add_argument

    # What to release and where the artefacts go.
    add("--config", type=pathlib.Path, default=DEFAULT_CONFIG, help="Path to release config JSON")
    add("--version", required=True, help="Release version string (e.g. 2025.10.0-edge)")
    add("--channel", required=True, help="Release channel (edge|stable|lts)")
    add("--calendar", help="Calendar tag (YYYY.MM); defaults derived from version")
    add("--git-sha", default=os.environ.get("GIT_COMMIT", "unknown"), help="Git revision to embed")
    add("--output", type=pathlib.Path, default=REPO_ROOT / "out/release", help="Output directory for artefacts")

    # Build behaviour.
    add("--no-push", action="store_true", help="Do not push images (use docker load)")
    add("--dry-run", action="store_true", help="Print steps without executing commands")
    add("--registry", help="Override registry root (e.g. localhost:5000/stellaops)")
    add(
        "--platform",
        dest="platforms",
        action="append",
        metavar="PLATFORM",
        help="Override build platforms (repeatable)",
    )

    # Signing.
    add("--skip-signing", action="store_true", help="Skip cosign signing/attestation steps")
    add("--cosign-key", dest="cosign_key", help="Override COSIGN_KEY_REF value")
    add("--cosign-password", dest="cosign_password", help="Password for cosign key")
    add(
        "--cosign-identity-token",
        dest="cosign_identity_token",
        help="Identity token for keyless cosign flows",
    )
    add(
        "--no-transparency",
        action="store_true",
        help="Disable Rekor transparency log upload during signing",
    )

    return parser.parse_args(argv)
 | 
						|
 | 
						|
 | 
						|
def write_manifest(manifest: Mapping[str, Any], output_dir: pathlib.Path) -> pathlib.Path:
    """Write ``release.yaml`` into ``output_dir`` and return its path.

    The SHA-256 stored under ``checksums.sha256`` is computed over the YAML
    rendering of the manifest *without* the checksum section.  The input
    mapping is copied, never mutated.
    """
    unsigned = OrderedDict(manifest)
    checksum = hashlib.sha256(dump_yaml(unsigned).encode("utf-8")).hexdigest()

    signed = OrderedDict(unsigned)
    signed["checksums"] = OrderedDict(sha256=checksum)

    target = output_dir / "release.yaml"
    target.write_text(dump_yaml(signed), encoding="utf-8")
    return target
 | 
						|
 | 
						|
 | 
						|
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point: build the release and write its manifest.

    Parses the arguments, loads the JSON config, stamps the release with the
    current UTC time, runs ``ReleaseBuilder`` and writes the manifest into
    the builder's output directory.  Returns ``0`` on success.
    """
    args = parse_args(argv)
    config = load_json_config(args.config)
    release_date = utc_now_iso()

    options = dict(
        repo_root=REPO_ROOT,
        config=config,
        version=args.version,
        channel=args.channel,
        calendar=sanitize_calendar(args.version, args.calendar),
        release_date=release_date,
        git_sha=args.git_sha,
        output_dir=args.output,
        # The CLI exposes negative flags; the builder takes positive ones.
        push=not args.no_push,
        dry_run=args.dry_run,
        registry_override=args.registry,
        platforms_override=args.platforms,
        skip_signing=args.skip_signing,
        cosign_key_ref=args.cosign_key,
        cosign_password=args.cosign_password,
        cosign_identity_token=args.cosign_identity_token,
        tlog_upload=not args.no_transparency,
    )
    builder = ReleaseBuilder(**options)

    manifest_path = write_manifest(builder.run(), builder.output_dir)
    print(f"✅ Release manifest written to {manifest_path}")
    return 0
 | 
						|
 | 
						|
 | 
						|
# Run the CLI when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
 |