#!/usr/bin/env python3
"""Deterministic release pipeline helper for StellaOps.

This script builds service containers, generates SBOM and provenance artefacts,
signs them with cosign, and writes a channel-specific release manifest.

The workflow expects external tooling to be available on PATH:
- docker (with buildx)
- cosign
- helm
- npm / node (for the UI build)
- dotnet SDK (for BuildX plugin publication)
"""
 | 
						|
from __future__ import annotations
 | 
						|
 | 
						|
import argparse
 | 
						|
import contextlib
 | 
						|
import datetime as dt
 | 
						|
import hashlib
 | 
						|
import json
 | 
						|
import os
 | 
						|
import pathlib
 | 
						|
import re
 | 
						|
import shlex
 | 
						|
import shutil
 | 
						|
import stat
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import tarfile
 | 
						|
import tempfile
 | 
						|
import uuid
 | 
						|
import zipfile
 | 
						|
from collections import OrderedDict
 | 
						|
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Sequence, Tuple
 | 
						|
 | 
						|
# Directory three levels above the folder that contains this script; used as the repository root.
REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
# Default component configuration file, located relative to the repository root.
DEFAULT_CONFIG = REPO_ROOT.joinpath("ops/devops/release/components.json")
 | 
						|
 | 
						|
class CommandError(RuntimeError):
    """Error raised by ``run`` when a subprocess exits with a non-zero status."""
 | 
						|
 | 
						|
def run(cmd: Sequence[str], *, cwd: Optional[pathlib.Path] = None, env: Optional[Mapping[str, str]] = None, capture: bool = True) -> str:
    """Run a subprocess command, returning stdout (text).

    Args:
        cmd: Program and arguments; executed directly, without a shell.
        cwd: Working directory for the child process, or ``None`` to inherit.
        env: Extra variables layered on top of a copy of ``os.environ``.
        capture: When true, stdout and stderr are captured as text; when
            false the child inherits the parent's streams.

    Returns:
        The captured stdout, or ``""`` when ``capture`` is false.

    Raises:
        CommandError: If the command exits with a non-zero status. The message
            contains the shell-quoted command line plus any captured output.
    """
    argv = list(cmd)
    # Render once with shell quoting so the debug trace and the failure message
    # agree and stay unambiguous for arguments containing spaces. ``str()``
    # keeps a path-like argument from masking the real failure with a TypeError.
    rendered = " ".join(shlex.quote(str(part)) for part in argv)

    process_env = os.environ.copy()
    if env:
        process_env.update(env)
    result = subprocess.run(
        argv,
        cwd=str(cwd) if cwd else None,
        env=process_env,
        check=False,
        capture_output=capture,
        text=True,
    )
    if process_env.get("STELLAOPS_RELEASE_DEBUG"):
        sys.stderr.write(f"[debug] {rendered}\n")
        if capture:
            sys.stderr.write(result.stdout)
            sys.stderr.write(result.stderr)
    if result.returncode != 0:
        stdout = result.stdout if capture else ""
        stderr = result.stderr if capture else ""
        raise CommandError(f"Command failed ({result.returncode}): {rendered}\nSTDOUT:\n{stdout}\nSTDERR:\n{stderr}")

    return result.stdout if capture else ""
 | 
						|
 | 
						|
 | 
						|
def load_json_config(path: pathlib.Path) -> Dict[str, Any]:
    """Read the UTF-8 encoded JSON document at *path* and return the decoded value."""
    return json.loads(path.read_text(encoding="utf-8"))
 | 
						|
 | 
						|
 | 
						|
def ensure_directory(path: pathlib.Path) -> pathlib.Path:
    """Make sure *path* exists as a directory (creating parents) and return it."""
    if not path.is_dir():
        path.mkdir(parents=True, exist_ok=True)
    return path
 | 
						|
 | 
						|
 | 
						|
def compute_sha256(path: pathlib.Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 1 MiB blocks so large artefacts are never held in
    memory all at once.
    """
    digest = hashlib.sha256()
    block_size = 1024 * 1024
    with path.open("rb") as stream:
        while True:
            block = stream.read(block_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
 | 
						|
 | 
						|
 | 
						|
# Characters that force quoting wherever they appear in the text.
_YAML_UNSAFE_CHARS = re.compile(r"[\s:#\-\[\]\{\}]")
# Indicator characters that must not start a plain (unquoted) YAML scalar.
_YAML_LEADING_INDICATOR = re.compile(r"[!&*%@`'\"|>,?=]")
# Text a YAML loader would resolve to null, a boolean or a number instead of a string.
_YAML_RESOLVED_SCALAR = re.compile(
    r"""
    ~ | null | true | false | yes | no | on | off | y | n
    | [-+]? (?:
          0x [0-9a-f_]+
        | 0o [0-7_]+
        | [0-9][0-9_]* (?: \. [0-9_]* )? (?: e [-+]? [0-9]+ )?
        | \. [0-9_]+ (?: e [-+]? [0-9]+ )?
        | \. (?: inf | nan )
      )
    """,
    re.IGNORECASE | re.VERBOSE,
)


def format_scalar(value: Any) -> str:
    """Render a scalar as YAML text.

    Booleans become ``true``/``false``, ``None`` becomes ``null`` and numbers
    use ``str()``. Any other value is converted with ``str()`` and emitted as a
    plain scalar unless that would be unsafe, in which case it is written as a
    JSON (double-quoted) string. Quoting is applied when the text:

    * is empty,
    * contains whitespace or one of ``: # - [ ] { }``,
    * starts with a YAML indicator character, or
    * would be read back as null, a boolean or a number (for example the
      calendar string ``2025.10`` would otherwise load as the float 2025.1).
    """
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    if text == "":
        return '""'
    if (
        _YAML_UNSAFE_CHARS.search(text)
        or _YAML_LEADING_INDICATOR.match(text)
        or _YAML_RESOLVED_SCALAR.fullmatch(text)
    ):
        return json.dumps(text, ensure_ascii=False)
    return text
 | 
						|
 | 
						|
 | 
						|
def _yaml_lines(value: Any, indent: int = 0) -> List[str]:
 | 
						|
    pad = "  " * indent
 | 
						|
    if isinstance(value, Mapping):
 | 
						|
        lines: List[str] = []
 | 
						|
        for key, val in value.items():
 | 
						|
            if isinstance(val, (Mapping, list)):
 | 
						|
                lines.append(f"{pad}{key}:")
 | 
						|
                lines.extend(_yaml_lines(val, indent + 1))
 | 
						|
            else:
 | 
						|
                lines.append(f"{pad}{key}: {format_scalar(val)}")
 | 
						|
        if not lines:
 | 
						|
            lines.append(f"{pad}{{}}")
 | 
						|
        return lines
 | 
						|
    if isinstance(value, list):
 | 
						|
        lines = []
 | 
						|
        if not value:
 | 
						|
            lines.append(f"{pad}[]")
 | 
						|
            return lines
 | 
						|
        for item in value:
 | 
						|
            if isinstance(item, (Mapping, list)):
 | 
						|
                lines.append(f"{pad}-")
 | 
						|
                lines.extend(_yaml_lines(item, indent + 1))
 | 
						|
            else:
 | 
						|
                lines.append(f"{pad}- {format_scalar(item)}")
 | 
						|
        return lines
 | 
						|
    return [f"{pad}{format_scalar(value)}"]
 | 
						|
 | 
						|
 | 
						|
def dump_yaml(data: Mapping[str, Any]) -> str:
    """Serialise *data* to YAML text via ``_yaml_lines``, ending with a newline."""
    rendered: List[str] = _yaml_lines(data)
    # A trailing empty element makes the join end with exactly one newline.
    rendered.append("")
    return "\n".join(rendered)
 | 
						|
 | 
						|
 | 
						|
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now = dt.datetime.now(tz=dt.timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
 | 
						|
 | 
						|
 | 
						|
def sanitize_calendar(version: str, explicit: Optional[str]) -> str:
    """Pick the calendar identifier for a release.

    A non-empty *explicit* value is returned unchanged. Otherwise the first two
    digit groups found in *version* are joined with a dot; if fewer than two
    exist, the current UTC year and month (``YYYY.MM``) are used.
    """
    if not explicit:
        # Expect version like 2025.10.0-edge or 2.4.1
        numbers = re.findall(r"\d+", version)
        if len(numbers) < 2:
            return dt.datetime.now(tz=dt.timezone.utc).strftime("%Y.%m")
        first, second = numbers[:2]
        return f"{first}.{second}"
    return explicit
 | 
						|
 | 
						|
 | 
						|
class ReleaseBuilder:
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        *,
 | 
						|
        repo_root: pathlib.Path,
 | 
						|
        config: Mapping[str, Any],
 | 
						|
        version: str,
 | 
						|
        channel: str,
 | 
						|
        calendar: str,
 | 
						|
        release_date: str,
 | 
						|
        git_sha: str,
 | 
						|
        output_dir: pathlib.Path,
 | 
						|
        push: bool,
 | 
						|
        dry_run: bool,
 | 
						|
        registry_override: Optional[str] = None,
 | 
						|
        platforms_override: Optional[Sequence[str]] = None,
 | 
						|
        skip_signing: bool = False,
 | 
						|
        cosign_key_ref: Optional[str] = None,
 | 
						|
        cosign_password: Optional[str] = None,
 | 
						|
        cosign_identity_token: Optional[str] = None,
 | 
						|
        tlog_upload: bool = True,
 | 
						|
    ) -> None:
 | 
						|
        self.repo_root = repo_root
 | 
						|
        self.config = config
 | 
						|
        self.version = version
 | 
						|
        self.channel = channel
 | 
						|
        self.calendar = calendar
 | 
						|
        self.release_date = release_date
 | 
						|
        self.git_sha = git_sha
 | 
						|
        self.output_dir = ensure_directory(output_dir)
 | 
						|
        self.push = push
 | 
						|
        self.dry_run = dry_run
 | 
						|
        self.registry = registry_override or config.get("registry")
 | 
						|
        if not self.registry:
 | 
						|
            raise ValueError("Config missing 'registry'")
 | 
						|
        platforms = list(platforms_override) if platforms_override else config.get("platforms")
 | 
						|
        if not platforms:
 | 
						|
            platforms = ["linux/amd64", "linux/arm64"]
 | 
						|
        self.platforms = list(platforms)
 | 
						|
        self.source_date_epoch = str(int(dt.datetime.fromisoformat(release_date.replace("Z", "+00:00")).timestamp()))
 | 
						|
        self.artifacts_dir = ensure_directory(self.output_dir / "artifacts")
 | 
						|
        self.sboms_dir = ensure_directory(self.artifacts_dir / "sboms")
 | 
						|
        self.provenance_dir = ensure_directory(self.artifacts_dir / "provenance")
 | 
						|
        self.signature_dir = ensure_directory(self.artifacts_dir / "signatures")
 | 
						|
        self.metadata_dir = ensure_directory(self.artifacts_dir / "metadata")
 | 
						|
        self.debug_dir = ensure_directory(self.output_dir / "debug")
 | 
						|
        self.debug_store_dir = ensure_directory(self.debug_dir / ".build-id")
 | 
						|
        self.cli_config = config.get("cli")
 | 
						|
        self.cli_output_dir = ensure_directory(self.output_dir / "cli") if self.cli_config else None
 | 
						|
        self.temp_dir = pathlib.Path(tempfile.mkdtemp(prefix="stellaops-release-"))
 | 
						|
        self.skip_signing = skip_signing
 | 
						|
        self.tlog_upload = tlog_upload
 | 
						|
        self.cosign_key_ref = cosign_key_ref or os.environ.get("COSIGN_KEY_REF")
 | 
						|
        self.cosign_identity_token = cosign_identity_token or os.environ.get("COSIGN_IDENTITY_TOKEN")
 | 
						|
        password = cosign_password if cosign_password is not None else os.environ.get("COSIGN_PASSWORD", "")
 | 
						|
        self.cosign_env = {
 | 
						|
            "COSIGN_PASSWORD": password,
 | 
						|
            "COSIGN_EXPERIMENTAL": "1",
 | 
						|
            "COSIGN_ALLOW_HTTP_REGISTRY": os.environ.get("COSIGN_ALLOW_HTTP_REGISTRY", "1"),
 | 
						|
            "COSIGN_DOCKER_MEDIA_TYPES": os.environ.get("COSIGN_DOCKER_MEDIA_TYPES", "1"),
 | 
						|
        }
 | 
						|
        # Cache resolved objcopy binaries keyed by machine identifier to avoid repeated lookups.
 | 
						|
        self._objcopy_cache: Dict[str, Optional[str]] = {}
 | 
						|
        self._missing_symbol_platforms: Dict[str, int] = {}
 | 
						|
 | 
						|
    # ----------------
 | 
						|
    # Build steps
 | 
						|
    # ----------------
 | 
						|
    def run(self) -> Dict[str, Any]:
        """Execute every release step in order and return the composed manifest.

        Steps: prime the BuildX plugin, build each configured component,
        package Helm, digest compose files, collect the debug store, build CLI
        artefacts, then compose the manifest from all of those results.
        """
        if self.dry_run:
            print("⚠️  Dry-run enabled; commands will be skipped")
        self._prime_buildx_plugin()
        built = [self._build_component(spec) for spec in self.config.get("components", [])]
        helm_meta = self._package_helm()
        compose_meta = self._digest_compose_files()
        debug_meta = self._collect_debug_store(built)
        cli_meta = self._build_cli_artifacts()
        return self._compose_manifest(built, helm_meta, compose_meta, debug_meta, cli_meta)
 | 
						|
 | 
						|
    def _prime_buildx_plugin(self) -> None:
 | 
						|
        plugin_cfg = self.config.get("buildxPlugin")
 | 
						|
        if not plugin_cfg:
 | 
						|
            return
 | 
						|
        project = plugin_cfg.get("project")
 | 
						|
        if not project:
 | 
						|
            return
 | 
						|
        out_dir = ensure_directory(self.temp_dir / "buildx")
 | 
						|
        if not self.dry_run:
 | 
						|
            run([
 | 
						|
                "dotnet",
 | 
						|
                "publish",
 | 
						|
                project,
 | 
						|
                "-c",
 | 
						|
                "Release",
 | 
						|
                "-o",
 | 
						|
                str(out_dir),
 | 
						|
            ])
 | 
						|
            cas_dir = ensure_directory(self.temp_dir / "cas")
 | 
						|
            run([
 | 
						|
                "dotnet",
 | 
						|
                str(out_dir / "StellaOps.Scanner.Sbomer.BuildXPlugin.dll"),
 | 
						|
                "handshake",
 | 
						|
                "--manifest",
 | 
						|
                str(out_dir),
 | 
						|
                "--cas",
 | 
						|
                str(cas_dir),
 | 
						|
            ])
 | 
						|
 | 
						|
    def _component_tags(self, repo: str) -> List[str]:
 | 
						|
        base = f"{self.registry}/{repo}"
 | 
						|
        tags = [f"{base}:{self.version}"]
 | 
						|
        if self.channel:
 | 
						|
            tags.append(f"{base}:{self.channel}")
 | 
						|
        return tags
 | 
						|
 | 
						|
    def _component_ref(self, repo: str, digest: str) -> str:
 | 
						|
        return f"{self.registry}/{repo}@{digest}"
 | 
						|
 | 
						|
    def _relative_path(self, path: pathlib.Path) -> str:
 | 
						|
        try:
 | 
						|
            return str(path.relative_to(self.output_dir.parent))
 | 
						|
        except ValueError:
 | 
						|
            return str(path)
 | 
						|
 | 
						|
    def _build_component(self, component: Mapping[str, Any]) -> Mapping[str, Any]:
 | 
						|
        name = component["name"]
 | 
						|
        repo = component.get("repository", name)
 | 
						|
        kind = component.get("kind", "dotnet-service")
 | 
						|
        dockerfile = component.get("dockerfile")
 | 
						|
        if not dockerfile:
 | 
						|
            raise ValueError(f"Component {name} missing dockerfile")
 | 
						|
        context = component.get("context", ".")
 | 
						|
        iid_file = self.temp_dir / f"{name}.iid"
 | 
						|
        metadata_file = self.metadata_dir / f"{name}.metadata.json"
 | 
						|
 | 
						|
        build_args = {
 | 
						|
            "VERSION": self.version,
 | 
						|
            "CHANNEL": self.channel,
 | 
						|
            "GIT_SHA": self.git_sha,
 | 
						|
            "SOURCE_DATE_EPOCH": self.source_date_epoch,
 | 
						|
        }
 | 
						|
        docker_cfg = self.config.get("docker", {})
 | 
						|
        if kind == "dotnet-service":
 | 
						|
            build_args.update({
 | 
						|
                "PROJECT": component["project"],
 | 
						|
                "ENTRYPOINT_DLL": component["entrypoint"],
 | 
						|
                "SDK_IMAGE": docker_cfg.get("sdkImage", "mcr.microsoft.com/dotnet/nightly/sdk:10.0"),
 | 
						|
                "RUNTIME_IMAGE": docker_cfg.get("runtimeImage", "gcr.io/distroless/dotnet/aspnet:latest"),
 | 
						|
            })
 | 
						|
        elif kind == "angular-ui":
 | 
						|
            build_args.update({
 | 
						|
                "NODE_IMAGE": docker_cfg.get("nodeImage", "node:20.14.0-bookworm"),
 | 
						|
                "NGINX_IMAGE": docker_cfg.get("nginxImage", "nginx:1.27-alpine"),
 | 
						|
            })
 | 
						|
        else:
 | 
						|
            raise ValueError(f"Unsupported component kind {kind}")
 | 
						|
 | 
						|
        tags = self._component_tags(repo)
 | 
						|
        build_cmd = [
 | 
						|
            "docker",
 | 
						|
            "buildx",
 | 
						|
            "build",
 | 
						|
            "--file",
 | 
						|
            dockerfile,
 | 
						|
            "--metadata-file",
 | 
						|
            str(metadata_file),
 | 
						|
            "--iidfile",
 | 
						|
            str(iid_file),
 | 
						|
            "--progress",
 | 
						|
            "plain",
 | 
						|
            "--platform",
 | 
						|
            ",".join(self.platforms),
 | 
						|
        ]
 | 
						|
        for key, value in build_args.items():
 | 
						|
            build_cmd.extend(["--build-arg", f"{key}={value}"])
 | 
						|
        for tag in tags:
 | 
						|
            build_cmd.extend(["--tag", tag])
 | 
						|
        build_cmd.extend([
 | 
						|
            "--attest",
 | 
						|
            "type=sbom",
 | 
						|
            "--attest",
 | 
						|
            "type=provenance,mode=max",
 | 
						|
        ])
 | 
						|
        if self.push:
 | 
						|
            build_cmd.append("--push")
 | 
						|
        else:
 | 
						|
            build_cmd.append("--load")
 | 
						|
        build_cmd.append(context)
 | 
						|
 | 
						|
        if not self.dry_run:
 | 
						|
            run(build_cmd, cwd=self.repo_root)
 | 
						|
 | 
						|
        digest = iid_file.read_text(encoding="utf-8").strip() if iid_file.exists() else ""
 | 
						|
        image_ref = self._component_ref(repo, digest) if digest else ""
 | 
						|
 | 
						|
        bundle_info = self._sign_image(name, image_ref, tags)
 | 
						|
        sbom_info = self._generate_sbom(name, image_ref)
 | 
						|
        provenance_info = self._attach_provenance(name, image_ref)
 | 
						|
 | 
						|
        component_entry = OrderedDict()
 | 
						|
        component_entry["name"] = name
 | 
						|
        if digest:
 | 
						|
            component_entry["image"] = image_ref
 | 
						|
        component_entry["tags"] = tags
 | 
						|
        if sbom_info:
 | 
						|
            component_entry["sbom"] = sbom_info
 | 
						|
        if provenance_info:
 | 
						|
            component_entry["provenance"] = provenance_info
 | 
						|
        if bundle_info:
 | 
						|
            component_entry["signature"] = bundle_info
 | 
						|
        if metadata_file.exists():
 | 
						|
            metadata_rel = (
 | 
						|
                str(metadata_file.relative_to(self.output_dir.parent))
 | 
						|
                if metadata_file.is_relative_to(self.output_dir.parent)
 | 
						|
                else str(metadata_file)
 | 
						|
            )
 | 
						|
            component_entry["metadata"] = OrderedDict((
 | 
						|
                ("path", metadata_rel),
 | 
						|
                ("sha256", compute_sha256(metadata_file)),
 | 
						|
            ))
 | 
						|
        return component_entry
 | 
						|
 | 
						|
    def _sign_image(self, name: str, image_ref: str, tags: Sequence[str]) -> Optional[Mapping[str, Any]]:
 | 
						|
        if self.skip_signing:
 | 
						|
            return None
 | 
						|
        if not image_ref:
 | 
						|
            return None
 | 
						|
        if not (self.cosign_key_ref or self.cosign_identity_token):
 | 
						|
            raise ValueError("Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass.")
 | 
						|
        signature_path = self.signature_dir / f"{name}.signature"
 | 
						|
        cmd = ["cosign", "sign", "--yes"]
 | 
						|
        if self.cosign_key_ref:
 | 
						|
            cmd.extend(["--key", self.cosign_key_ref])
 | 
						|
        if self.cosign_identity_token:
 | 
						|
            cmd.extend(["--identity-token", self.cosign_identity_token])
 | 
						|
        if not self.tlog_upload:
 | 
						|
            cmd.append("--tlog-upload=false")
 | 
						|
        cmd.append("--allow-http-registry")
 | 
						|
        cmd.append(image_ref)
 | 
						|
        if self.dry_run:
 | 
						|
            return None
 | 
						|
        run(cmd, env=self.cosign_env)
 | 
						|
        signature_data = run([
 | 
						|
            "cosign",
 | 
						|
            "download",
 | 
						|
            "signature",
 | 
						|
            "--allow-http-registry",
 | 
						|
            image_ref,
 | 
						|
        ])
 | 
						|
        signature_path.write_text(signature_data, encoding="utf-8")
 | 
						|
        signature_sha = compute_sha256(signature_path)
 | 
						|
        signature_ref = run([
 | 
						|
            "cosign",
 | 
						|
            "triangulate",
 | 
						|
            "--allow-http-registry",
 | 
						|
            image_ref,
 | 
						|
        ]).strip()
 | 
						|
        return OrderedDict(
 | 
						|
            (
 | 
						|
                ("signature", OrderedDict((
 | 
						|
                    ("path", str(signature_path.relative_to(self.output_dir.parent)) if signature_path.is_relative_to(self.output_dir.parent) else str(signature_path)),
 | 
						|
                    ("sha256", signature_sha),
 | 
						|
                    ("ref", signature_ref),
 | 
						|
                    ("tlogUploaded", self.tlog_upload),
 | 
						|
                ))),
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
    def _generate_sbom(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
 | 
						|
        if not image_ref or self.dry_run:
 | 
						|
            return None
 | 
						|
        sbom_path = self.sboms_dir / f"{name}.cyclonedx.json"
 | 
						|
        run([
 | 
						|
            "docker",
 | 
						|
            "sbom",
 | 
						|
            image_ref,
 | 
						|
            "--format",
 | 
						|
            "cyclonedx-json",
 | 
						|
            "--output",
 | 
						|
            str(sbom_path),
 | 
						|
        ])
 | 
						|
        entry = OrderedDict((
 | 
						|
            ("path", str(sbom_path.relative_to(self.output_dir.parent)) if sbom_path.is_relative_to(self.output_dir.parent) else str(sbom_path)),
 | 
						|
            ("sha256", compute_sha256(sbom_path)),
 | 
						|
        ))
 | 
						|
        if self.skip_signing:
 | 
						|
            return entry
 | 
						|
        attach_cmd = [
 | 
						|
            "cosign",
 | 
						|
            "attach",
 | 
						|
            "sbom",
 | 
						|
            "--sbom",
 | 
						|
            str(sbom_path),
 | 
						|
            "--type",
 | 
						|
            "cyclonedx",
 | 
						|
        ]
 | 
						|
        if self.cosign_key_ref:
 | 
						|
            attach_cmd.extend(["--key", self.cosign_key_ref])
 | 
						|
        attach_cmd.append("--allow-http-registry")
 | 
						|
        attach_cmd.append(image_ref)
 | 
						|
        run(attach_cmd, env=self.cosign_env)
 | 
						|
        reference = run(["cosign", "triangulate", "--type", "sbom", "--allow-http-registry", image_ref]).strip()
 | 
						|
        entry["ref"] = reference
 | 
						|
        return entry
 | 
						|
 | 
						|
    def _attach_provenance(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
 | 
						|
        if not image_ref or self.dry_run:
 | 
						|
            return None
 | 
						|
        predicate = OrderedDict()
 | 
						|
        predicate["buildDefinition"] = OrderedDict(
 | 
						|
            (
 | 
						|
                ("buildType", "https://git.stella-ops.org/stellaops/release"),
 | 
						|
                ("externalParameters", OrderedDict((
 | 
						|
                    ("component", name),
 | 
						|
                    ("version", self.version),
 | 
						|
                    ("channel", self.channel),
 | 
						|
                ))),
 | 
						|
            )
 | 
						|
        )
 | 
						|
        predicate["runDetails"] = OrderedDict(
 | 
						|
            (
 | 
						|
                ("builder", OrderedDict((("id", "https://github.com/actions"),))),
 | 
						|
                ("metadata", OrderedDict((("finishedOn", self.release_date),))),
 | 
						|
            )
 | 
						|
        )
 | 
						|
        predicate_path = self.provenance_dir / f"{name}.provenance.json"
 | 
						|
        with predicate_path.open("w", encoding="utf-8") as handle:
 | 
						|
            json.dump(predicate, handle, indent=2, sort_keys=True)
 | 
						|
            handle.write("\n")
 | 
						|
        entry = OrderedDict((
 | 
						|
            ("path", str(predicate_path.relative_to(self.output_dir.parent)) if predicate_path.is_relative_to(self.output_dir.parent) else str(predicate_path)),
 | 
						|
            ("sha256", compute_sha256(predicate_path)),
 | 
						|
        ))
 | 
						|
        if self.skip_signing:
 | 
						|
            return entry
 | 
						|
        cmd = [
 | 
						|
            "cosign",
 | 
						|
            "attest",
 | 
						|
            "--predicate",
 | 
						|
            str(predicate_path),
 | 
						|
            "--type",
 | 
						|
            "https://slsa.dev/provenance/v1",
 | 
						|
        ]
 | 
						|
        if self.cosign_key_ref:
 | 
						|
            cmd.extend(["--key", self.cosign_key_ref])
 | 
						|
        if not self.tlog_upload:
 | 
						|
            cmd.append("--tlog-upload=false")
 | 
						|
        cmd.append("--allow-http-registry")
 | 
						|
        cmd.append(image_ref)
 | 
						|
        run(cmd, env=self.cosign_env)
 | 
						|
        ref = run([
 | 
						|
            "cosign",
 | 
						|
            "triangulate",
 | 
						|
            "--type",
 | 
						|
            "https://slsa.dev/provenance/v1",
 | 
						|
            "--allow-http-registry",
 | 
						|
            image_ref,
 | 
						|
        ]).strip()
 | 
						|
        entry["ref"] = ref
 | 
						|
        return entry
 | 
						|
 | 
						|
    def _collect_debug_store(self, components: Sequence[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
 | 
						|
        if self.dry_run:
 | 
						|
            return None
 | 
						|
        debug_records: Dict[Tuple[str, str], OrderedDict[str, Any]] = {}
 | 
						|
        for component in components:
 | 
						|
            image_ref = component.get("image")
 | 
						|
            if not image_ref:
 | 
						|
                continue
 | 
						|
            name = component.get("name", "unknown")
 | 
						|
            entries = self._extract_debug_entries(name, image_ref)
 | 
						|
            for entry in entries:
 | 
						|
                key = (entry["platform"], entry["buildId"])
 | 
						|
                existing = debug_records.get(key)
 | 
						|
                if existing is None:
 | 
						|
                    record = OrderedDict((
 | 
						|
                        ("buildId", entry["buildId"]),
 | 
						|
                        ("platform", entry["platform"]),
 | 
						|
                        ("debugPath", entry["debugPath"]),
 | 
						|
                        ("sha256", entry["sha256"]),
 | 
						|
                        ("size", entry["size"]),
 | 
						|
                        ("components", [entry["component"]]),
 | 
						|
                        ("images", [entry["image"]]),
 | 
						|
                        ("sources", list(entry["sources"])),
 | 
						|
                    ))
 | 
						|
                    debug_records[key] = record
 | 
						|
                else:
 | 
						|
                    if entry["sha256"] != existing["sha256"]:
 | 
						|
                        raise RuntimeError(
 | 
						|
                            f"Build-id {entry['buildId']} for platform {entry['platform']} produced conflicting hashes"
 | 
						|
                        )
 | 
						|
                    if entry["component"] not in existing["components"]:
 | 
						|
                        existing["components"].append(entry["component"])
 | 
						|
                    if entry["image"] not in existing["images"]:
 | 
						|
                        existing["images"].append(entry["image"])
 | 
						|
                    for source in entry["sources"]:
 | 
						|
                        if source not in existing["sources"]:
 | 
						|
                            existing["sources"].append(source)
 | 
						|
        if not debug_records:
 | 
						|
            sys.stderr.write(
 | 
						|
                "[error] release build produced no debug artefacts; enable symbol extraction so out/release/debug is populated (DEVOPS-REL-17-004).\n"
 | 
						|
            )
 | 
						|
            # Remove empty directories before failing
 | 
						|
            with contextlib.suppress(FileNotFoundError, OSError):
 | 
						|
                if not any(self.debug_store_dir.iterdir()):
 | 
						|
                    self.debug_store_dir.rmdir()
 | 
						|
            with contextlib.suppress(FileNotFoundError, OSError):
 | 
						|
                if not any(self.debug_dir.iterdir()):
 | 
						|
                    self.debug_dir.rmdir()
 | 
						|
            raise RuntimeError(
 | 
						|
                "Debug store collection produced no build-id artefacts (DEVOPS-REL-17-004)."
 | 
						|
            )
 | 
						|
        entries = []
 | 
						|
        for record in debug_records.values():
 | 
						|
            entry = OrderedDict((
 | 
						|
                ("buildId", record["buildId"]),
 | 
						|
                ("platform", record["platform"]),
 | 
						|
                ("debugPath", record["debugPath"]),
 | 
						|
                ("sha256", record["sha256"]),
 | 
						|
                ("size", record["size"]),
 | 
						|
                ("components", sorted(record["components"])),
 | 
						|
                ("images", sorted(record["images"])),
 | 
						|
                ("sources", sorted(record["sources"])),
 | 
						|
            ))
 | 
						|
            entries.append(entry)
 | 
						|
        entries.sort(key=lambda item: (item["platform"], item["buildId"]))
 | 
						|
        manifest_path = self.debug_dir / "debug-manifest.json"
 | 
						|
        platform_counts: Dict[str, int] = {}
 | 
						|
        for entry in entries:
 | 
						|
            platform_counts[entry["platform"]] = platform_counts.get(entry["platform"], 0) + 1
 | 
						|
        missing_platforms = [
 | 
						|
            platform
 | 
						|
            for platform in self._missing_symbol_platforms
 | 
						|
            if platform_counts.get(platform, 0) == 0
 | 
						|
        ]
 | 
						|
        if missing_platforms:
 | 
						|
            raise RuntimeError(
 | 
						|
                "Debug extraction skipped all binaries for platforms without objcopy support: "
 | 
						|
                + ", ".join(sorted(missing_platforms))
 | 
						|
            )
 | 
						|
        manifest_data = OrderedDict((
 | 
						|
            ("generatedAt", self.release_date),
 | 
						|
            ("version", self.version),
 | 
						|
            ("channel", self.channel),
 | 
						|
            ("artifacts", entries),
 | 
						|
        ))
 | 
						|
        with manifest_path.open("w", encoding="utf-8") as handle:
 | 
						|
            json.dump(manifest_data, handle, indent=2)
 | 
						|
            handle.write("\n")
 | 
						|
        manifest_sha = compute_sha256(manifest_path)
 | 
						|
        sha_path = manifest_path.with_suffix(manifest_path.suffix + ".sha256")
 | 
						|
        sha_path.write_text(f"{manifest_sha}  {manifest_path.name}\n", encoding="utf-8")
 | 
						|
        manifest_rel = manifest_path.relative_to(self.output_dir).as_posix()
 | 
						|
        store_rel = self.debug_store_dir.relative_to(self.output_dir).as_posix()
 | 
						|
        platforms = sorted({entry["platform"] for entry in entries})
 | 
						|
        return OrderedDict((
 | 
						|
            ("manifest", manifest_rel),
 | 
						|
            ("sha256", manifest_sha),
 | 
						|
            ("entries", len(entries)),
 | 
						|
            ("platforms", platforms),
 | 
						|
            ("directory", store_rel),
 | 
						|
        ))
 | 
						|
 | 
						|
    # ----------------
 | 
						|
    # CLI packaging
 | 
						|
    # ----------------
 | 
						|
    def _build_cli_artifacts(self) -> List[Mapping[str, Any]]:
 | 
						|
        if not self.cli_config or self.dry_run:
 | 
						|
            return []
 | 
						|
        project_rel = self.cli_config.get("project")
 | 
						|
        if not project_rel:
 | 
						|
            return []
 | 
						|
        project_path = (self.repo_root / project_rel).resolve()
 | 
						|
        if not project_path.exists():
 | 
						|
            raise FileNotFoundError(f"CLI project not found at {project_path}")
 | 
						|
        runtimes: Sequence[str] = self.cli_config.get("runtimes", [])
 | 
						|
        if not runtimes:
 | 
						|
            runtimes = ("linux-x64",)
 | 
						|
        package_prefix = self.cli_config.get("packagePrefix", "stella")
 | 
						|
        ensure_directory(self.cli_output_dir or (self.output_dir / "cli"))
 | 
						|
 | 
						|
        cli_entries: List[Mapping[str, Any]] = []
 | 
						|
        for runtime in runtimes:
 | 
						|
            entry = self._build_cli_for_runtime(project_path, runtime, package_prefix)
 | 
						|
            cli_entries.append(entry)
 | 
						|
        return cli_entries
 | 
						|
 | 
						|
    def _build_cli_for_runtime(
        self,
        project_path: pathlib.Path,
        runtime: str,
        package_prefix: str,
    ) -> Mapping[str, Any]:
        """Publish, package, checksum, sign and inventory the CLI for one runtime.

        The project is published with ``dotnet publish`` as a self-contained
        single-file build, the resulting binary is renamed to ``stella``
        (``stella.exe`` for ``win*`` runtimes), and the publish directory is
        archived as a zip for ``win*`` runtimes or a ``.tar.gz`` otherwise.

        Args:
            project_path: Path of the CLI project handed to ``dotnet publish``.
            runtime: Runtime identifier passed to ``--runtime``.
            package_prefix: Leading component of the archive file name.

        Returns:
            An ordered mapping with ``runtime`` and ``archive`` keys, plus
            ``sbom`` when an SBOM entry was produced. ``archive`` carries the
            package ``path`` and ``sha256`` and, when signing yields a result,
            a ``signature`` entry.
        """
        targets_windows = runtime.startswith("win")

        publish_dir = ensure_directory(self.temp_dir / f"cli-publish-{runtime}")
        run(
            [
                "dotnet",
                "publish",
                str(project_path),
                "--configuration",
                "Release",
                "--runtime",
                runtime,
                "--self-contained",
                "true",
                "/p:PublishSingleFile=true",
                "/p:IncludeNativeLibrariesForSelfExtract=true",
                "/p:EnableCompressionInSingleFile=true",
                "/p:InvariantGlobalization=true",
                "--output",
                str(publish_dir),
            ],
            cwd=self.repo_root,
        )

        # Give the published binary its short, user-facing name.
        original_name = "StellaOps.Cli"
        published_binary, renamed_binary = (
            (publish_dir / f"{original_name}.exe", publish_dir / "stella.exe")
            if targets_windows
            else (publish_dir / original_name, publish_dir / "stella")
        )
        if published_binary.exists():
            if renamed_binary.exists():
                renamed_binary.unlink()
            published_binary.rename(renamed_binary)
            if not targets_windows:
                executable_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
                renamed_binary.chmod(renamed_binary.stat().st_mode | executable_bits)

        package_dir = self.cli_output_dir or (self.output_dir / "cli")
        ensure_directory(package_dir)
        archive_name = f"{package_prefix}-{self.version}-{runtime}"
        if targets_windows:
            package_path = package_dir / f"{archive_name}.zip"
            self._archive_zip(publish_dir, package_path)
        else:
            package_path = package_dir / f"{archive_name}.tar.gz"
            self._archive_tar(publish_dir, package_path)

        # Sidecar checksum in "<digest>  <file name>" form.
        digest = compute_sha256(package_path)
        checksum_file = package_path.with_suffix(package_path.suffix + ".sha256")
        checksum_file.write_text(f"{digest}  {package_path.name}\n", encoding="utf-8")

        archive_info = OrderedDict()
        archive_info["path"] = self._relative_path(package_path)
        archive_info["sha256"] = digest
        archive_signature = self._sign_file(package_path)
        if archive_signature:
            archive_info["signature"] = archive_signature

        sbom_info = self._generate_cli_sbom(runtime, publish_dir)

        entry = OrderedDict()
        entry["runtime"] = runtime
        entry["archive"] = archive_info
        if sbom_info:
            entry["sbom"] = sbom_info
        return entry
 | 
						|
 | 
						|
    def _archive_tar(self, source_dir: pathlib.Path, archive_path: pathlib.Path) -> None:
 | 
						|
        with tarfile.open(archive_path, "w:gz") as tar:
 | 
						|
            for item in sorted(source_dir.rglob("*")):
 | 
						|
                arcname = item.relative_to(source_dir)
 | 
						|
                tar.add(item, arcname=arcname)
 | 
						|
 | 
						|
    def _archive_zip(self, source_dir: pathlib.Path, archive_path: pathlib.Path) -> None:
 | 
						|
        with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
 | 
						|
            for item in sorted(source_dir.rglob("*")):
 | 
						|
                if item.is_dir():
 | 
						|
                    continue
 | 
						|
                arcname = item.relative_to(source_dir).as_posix()
 | 
						|
                zip_info = zipfile.ZipInfo(arcname)
 | 
						|
                zip_info.external_attr = (item.stat().st_mode & 0xFFFF) << 16
 | 
						|
                with item.open("rb") as handle:
 | 
						|
                    zipf.writestr(zip_info, handle.read())
 | 
						|
 | 
						|
    def _generate_cli_sbom(self, runtime: str, publish_dir: pathlib.Path) -> Optional[Mapping[str, Any]]:
 | 
						|
        if self.dry_run:
 | 
						|
            return None
 | 
						|
        sbom_dir = ensure_directory(self.sboms_dir / "cli")
 | 
						|
        sbom_path = sbom_dir / f"cli-{runtime}.cyclonedx.json"
 | 
						|
        run([
 | 
						|
            "syft",
 | 
						|
            f"dir:{publish_dir}",
 | 
						|
            "--output",
 | 
						|
            f"cyclonedx-json={sbom_path}",
 | 
						|
        ])
 | 
						|
        entry = OrderedDict((
 | 
						|
            ("path", self._relative_path(sbom_path)),
 | 
						|
            ("sha256", compute_sha256(sbom_path)),
 | 
						|
        ))
 | 
						|
        signature_info = self._sign_file(sbom_path)
 | 
						|
        if signature_info:
 | 
						|
            entry["signature"] = signature_info
 | 
						|
        return entry
 | 
						|
 | 
						|
    def _sign_file(self, path: pathlib.Path) -> Optional[Mapping[str, Any]]:
 | 
						|
        if self.skip_signing:
 | 
						|
            return None
 | 
						|
        if not (self.cosign_key_ref or self.cosign_identity_token):
 | 
						|
            raise ValueError(
 | 
						|
                "Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass."
 | 
						|
            )
 | 
						|
        signature_path = path.with_suffix(path.suffix + ".sig")
 | 
						|
        sha_path = path.with_suffix(path.suffix + ".sha256")
 | 
						|
        digest = compute_sha256(path)
 | 
						|
        sha_path.write_text(f"{digest}  {path.name}\n", encoding="utf-8")
 | 
						|
        cmd = ["cosign", "sign-blob", "--yes", str(path)]
 | 
						|
        if self.cosign_key_ref:
 | 
						|
            cmd.extend(["--key", self.cosign_key_ref])
 | 
						|
        if self.cosign_identity_token:
 | 
						|
            cmd.extend(["--identity-token", self.cosign_identity_token])
 | 
						|
        if not self.tlog_upload:
 | 
						|
            cmd.append("--tlog-upload=false")
 | 
						|
        signature_data = run(cmd, env=self.cosign_env).strip()
 | 
						|
        signature_path.write_text(signature_data + "\n", encoding="utf-8")
 | 
						|
        return OrderedDict((
 | 
						|
            ("path", self._relative_path(signature_path)),
 | 
						|
            ("sha256", compute_sha256(signature_path)),
 | 
						|
            ("tlogUploaded", self.tlog_upload),
 | 
						|
        ))
 | 
						|
 | 
						|
    def _extract_debug_entries(self, component_name: str, image_ref: str) -> List[OrderedDict[str, Any]]:
 | 
						|
        if self.dry_run:
 | 
						|
            return []
 | 
						|
        entries: List[OrderedDict[str, Any]] = []
 | 
						|
        platforms = self.platforms if self.push else [None]
 | 
						|
        for platform in platforms:
 | 
						|
            platform_label = platform or (self.platforms[0] if self.platforms else "linux/amd64")
 | 
						|
            if self.push:
 | 
						|
                pull_cmd = ["docker", "pull"]
 | 
						|
                if platform:
 | 
						|
                    pull_cmd.extend(["--platform", platform])
 | 
						|
                pull_cmd.append(image_ref)
 | 
						|
                run(pull_cmd)
 | 
						|
            create_cmd = ["docker", "create"]
 | 
						|
            if platform:
 | 
						|
                create_cmd.extend(["--platform", platform])
 | 
						|
            create_cmd.append(image_ref)
 | 
						|
            container_id = run(create_cmd).strip()
 | 
						|
            export_path = self.temp_dir / f"{container_id}.tar"
 | 
						|
            try:
 | 
						|
                run(["docker", "export", container_id, "-o", str(export_path)], capture=False)
 | 
						|
            finally:
 | 
						|
                run(["docker", "rm", container_id], capture=False)
 | 
						|
            rootfs_dir = ensure_directory(self.temp_dir / f"{component_name}-{platform_label}-{uuid.uuid4().hex}")
 | 
						|
            try:
 | 
						|
                with tarfile.open(export_path, "r:*") as tar:
 | 
						|
                    self._safe_extract_tar(tar, rootfs_dir)
 | 
						|
            finally:
 | 
						|
                export_path.unlink(missing_ok=True)
 | 
						|
            try:
 | 
						|
                for file_path in rootfs_dir.rglob("*"):
 | 
						|
                    if not file_path.is_file() or file_path.is_symlink():
 | 
						|
                        continue
 | 
						|
                    if not self._is_elf(file_path):
 | 
						|
                        continue
 | 
						|
                    build_id, machine = self._read_build_id_and_machine(file_path)
 | 
						|
                    if not build_id:
 | 
						|
                        continue
 | 
						|
                    debug_file = self._debug_file_for_build_id(build_id)
 | 
						|
                    if not debug_file.exists():
 | 
						|
                        debug_file.parent.mkdir(parents=True, exist_ok=True)
 | 
						|
                        temp_debug = self.temp_dir / f"{build_id}.debug"
 | 
						|
                        with contextlib.suppress(FileNotFoundError):
 | 
						|
                            temp_debug.unlink()
 | 
						|
                        objcopy_tool = self._resolve_objcopy_tool(machine)
 | 
						|
                        if not objcopy_tool:
 | 
						|
                            self._emit_objcopy_warning(machine, platform_label, file_path)
 | 
						|
                            with contextlib.suppress(FileNotFoundError):
 | 
						|
                                temp_debug.unlink()
 | 
						|
                            continue
 | 
						|
                        try:
 | 
						|
                            run([objcopy_tool, "--only-keep-debug", str(file_path), str(temp_debug)], capture=False)
 | 
						|
                        except CommandError:
 | 
						|
                            with contextlib.suppress(FileNotFoundError):
 | 
						|
                                temp_debug.unlink()
 | 
						|
                            continue
 | 
						|
                        debug_file.parent.mkdir(parents=True, exist_ok=True)
 | 
						|
                        shutil.move(str(temp_debug), str(debug_file))
 | 
						|
                    sha = compute_sha256(debug_file)
 | 
						|
                    rel_debug = debug_file.relative_to(self.output_dir).as_posix()
 | 
						|
                    source_rel = file_path.relative_to(rootfs_dir).as_posix()
 | 
						|
                    entry = OrderedDict((
 | 
						|
                        ("component", component_name),
 | 
						|
                        ("image", image_ref),
 | 
						|
                        ("platform", platform_label),
 | 
						|
                        ("buildId", build_id),
 | 
						|
                        ("debugPath", rel_debug),
 | 
						|
                        ("sha256", sha),
 | 
						|
                        ("size", debug_file.stat().st_size),
 | 
						|
                        ("sources", [source_rel]),
 | 
						|
                    ))
 | 
						|
                    entries.append(entry)
 | 
						|
            finally:
 | 
						|
                shutil.rmtree(rootfs_dir, ignore_errors=True)
 | 
						|
        return entries
 | 
						|
 | 
						|
    def _debug_file_for_build_id(self, build_id: str) -> pathlib.Path:
 | 
						|
        normalized = build_id.lower()
 | 
						|
        prefix = normalized[:2]
 | 
						|
        remainder = normalized[2:]
 | 
						|
        return self.debug_store_dir / prefix / f"{remainder}.debug"
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _safe_extract_tar(tar: tarfile.TarFile, dest: pathlib.Path) -> None:
 | 
						|
        dest_root = dest.resolve()
 | 
						|
        members = tar.getmembers()
 | 
						|
        for member in members:
 | 
						|
            member_path = (dest / member.name).resolve()
 | 
						|
            if not str(member_path).startswith(str(dest_root)):
 | 
						|
                raise RuntimeError(f"Refusing to extract '{member.name}' outside of destination directory")
 | 
						|
        tar.extractall(dest)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _is_elf(path: pathlib.Path) -> bool:
 | 
						|
        try:
 | 
						|
            with path.open("rb") as handle:
 | 
						|
                return handle.read(4) == b"\x7fELF"
 | 
						|
        except OSError:
 | 
						|
            return False
 | 
						|
 | 
						|
    def _read_build_id_and_machine(self, path: pathlib.Path) -> Tuple[Optional[str], Optional[str]]:
        """Read the build ID and machine type of *path* from ``readelf -nh`` output.

        Returns:
            ``(build_id, machine)``. The build ID is lower-cased. Either value
            is ``None`` when its line is absent, and both are ``None`` when
            the readelf command fails. If a label appears more than once, the
            last occurrence wins.
        """
        try:
            readelf_text = run(["readelf", "-nh", str(path)])
        except CommandError:
            return None, None

        build_id: Optional[str] = None
        machine: Optional[str] = None
        for raw_line in readelf_text.splitlines():
            text = raw_line.strip()
            if text.startswith("Build ID:"):
                _, _, value = text.partition("Build ID:")
                build_id = value.strip().lower()
            elif text.startswith("Machine:"):
                _, _, value = text.partition("Machine:")
                machine = value.strip()
        return build_id, machine
 | 
						|
 | 
						|
    def _resolve_objcopy_tool(self, machine: Optional[str]) -> Optional[str]:
 | 
						|
        key = (machine or "generic").lower()
 | 
						|
        if key in self._objcopy_cache:
 | 
						|
            return self._objcopy_cache[key]
 | 
						|
 | 
						|
        env_override = None
 | 
						|
        if machine and "aarch64" in machine.lower():
 | 
						|
            env_override = os.environ.get("STELLAOPS_OBJCOPY_AARCH64")
 | 
						|
            candidates = [
 | 
						|
                env_override,
 | 
						|
                "aarch64-linux-gnu-objcopy",
 | 
						|
                "llvm-objcopy",
 | 
						|
                "objcopy",
 | 
						|
            ]
 | 
						|
        elif machine and any(token in machine.lower() for token in ("x86-64", "amd", "x86_64")):
 | 
						|
            env_override = os.environ.get("STELLAOPS_OBJCOPY_AMD64")
 | 
						|
            candidates = [
 | 
						|
                env_override,
 | 
						|
                "objcopy",
 | 
						|
                "llvm-objcopy",
 | 
						|
            ]
 | 
						|
        else:
 | 
						|
            env_override = os.environ.get("STELLAOPS_OBJCOPY_DEFAULT")
 | 
						|
            candidates = [
 | 
						|
                env_override,
 | 
						|
                "objcopy",
 | 
						|
                "llvm-objcopy",
 | 
						|
            ]
 | 
						|
 | 
						|
        for candidate in candidates:
 | 
						|
            if not candidate:
 | 
						|
                continue
 | 
						|
            tool = shutil.which(candidate)
 | 
						|
            if tool:
 | 
						|
                self._objcopy_cache[key] = tool
 | 
						|
                return tool
 | 
						|
        self._objcopy_cache[key] = None
 | 
						|
        return None
 | 
						|
 | 
						|
    def _emit_objcopy_warning(self, machine: Optional[str], platform: str, file_path: pathlib.Path) -> None:
 | 
						|
        machine_label = machine or "unknown-machine"
 | 
						|
        count = self._missing_symbol_platforms.get(platform, 0)
 | 
						|
        self._missing_symbol_platforms[platform] = count + 1
 | 
						|
        if count == 0:
 | 
						|
            sys.stderr.write(
 | 
						|
                f"[warn] no objcopy tool available for {machine_label}; skipping debug extraction for {file_path}.\n"
 | 
						|
            )
 | 
						|
 | 
						|
    # ----------------
 | 
						|
    # Helm + compose
 | 
						|
    # ----------------
 | 
						|
    def _package_helm(self) -> Optional[Mapping[str, Any]]:
 | 
						|
        helm_cfg = self.config.get("helm")
 | 
						|
        if not helm_cfg:
 | 
						|
            return None
 | 
						|
        chart_path = helm_cfg.get("chartPath")
 | 
						|
        if not chart_path:
 | 
						|
            return None
 | 
						|
        chart_dir = self.repo_root / chart_path
 | 
						|
        output_dir = ensure_directory(self.output_dir / "helm")
 | 
						|
        archive_path = output_dir / f"stellaops-{self.version}.tgz"
 | 
						|
        if not self.dry_run:
 | 
						|
            cmd = [
 | 
						|
                "helm",
 | 
						|
                "package",
 | 
						|
                str(chart_dir),
 | 
						|
                "--destination",
 | 
						|
                str(output_dir),
 | 
						|
                "--version",
 | 
						|
                self.version,
 | 
						|
                "--app-version",
 | 
						|
                self.version,
 | 
						|
            ]
 | 
						|
            run(cmd)
 | 
						|
            packaged = next(output_dir.glob("*.tgz"), None)
 | 
						|
            if packaged and packaged != archive_path:
 | 
						|
                packaged.rename(archive_path)
 | 
						|
        digest = compute_sha256(archive_path) if archive_path.exists() else None
 | 
						|
        if archive_path.exists() and archive_path.is_relative_to(self.output_dir):
 | 
						|
            manifest_path = str(archive_path.relative_to(self.output_dir))
 | 
						|
        elif archive_path.exists() and archive_path.is_relative_to(self.output_dir.parent):
 | 
						|
            manifest_path = str(archive_path.relative_to(self.output_dir.parent))
 | 
						|
        else:
 | 
						|
            manifest_path = f"helm/{archive_path.name}"
 | 
						|
        return OrderedDict((
 | 
						|
            ("name", "stellaops"),
 | 
						|
            ("version", self.version),
 | 
						|
            ("path", manifest_path),
 | 
						|
            ("sha256", digest),
 | 
						|
        ))
 | 
						|
 | 
						|
    def _digest_compose_files(self) -> List[Mapping[str, Any]]:
 | 
						|
        compose_cfg = self.config.get("compose", {})
 | 
						|
        files = compose_cfg.get("files", [])
 | 
						|
        entries: List[Mapping[str, Any]] = []
 | 
						|
        for rel_path in files:
 | 
						|
            src = self.repo_root / rel_path
 | 
						|
            if not src.exists():
 | 
						|
                continue
 | 
						|
            digest = compute_sha256(src)
 | 
						|
            entries.append(OrderedDict((
 | 
						|
                ("name", pathlib.Path(rel_path).name),
 | 
						|
                ("path", rel_path),
 | 
						|
                ("sha256", digest),
 | 
						|
            )))
 | 
						|
        return entries
 | 
						|
 | 
						|
    # ----------------
 | 
						|
    # Manifest assembly
 | 
						|
    # ----------------
 | 
						|
    def _compose_manifest(
 | 
						|
        self,
 | 
						|
        components: List[Mapping[str, Any]],
 | 
						|
        helm_meta: Optional[Mapping[str, Any]],
 | 
						|
        compose_meta: List[Mapping[str, Any]],
 | 
						|
        debug_meta: Optional[Mapping[str, Any]],
 | 
						|
        cli_meta: Sequence[Mapping[str, Any]],
 | 
						|
    ) -> Dict[str, Any]:
 | 
						|
        manifest = OrderedDict()
 | 
						|
        manifest["release"] = OrderedDict((
 | 
						|
            ("version", self.version),
 | 
						|
            ("channel", self.channel),
 | 
						|
            ("date", self.release_date),
 | 
						|
            ("calendar", self.calendar),
 | 
						|
        ))
 | 
						|
        manifest["components"] = components
 | 
						|
        if helm_meta:
 | 
						|
            manifest["charts"] = [helm_meta]
 | 
						|
        if compose_meta:
 | 
						|
            manifest["compose"] = compose_meta
 | 
						|
        if debug_meta:
 | 
						|
            manifest["debugStore"] = debug_meta
 | 
						|
        if cli_meta:
 | 
						|
            manifest["cli"] = list(cli_meta)
 | 
						|
        return manifest
 | 
						|
 | 
						|
 | 
						|
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the release pipeline's command-line options.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the
            process arguments.

    Returns:
        The populated namespace. ``--version`` and ``--channel`` are required.
    """
    cli = argparse.ArgumentParser(description="Build StellaOps release artefacts deterministically")
    add = cli.add_argument

    # Release identity and locations.
    add("--config", type=pathlib.Path, default=DEFAULT_CONFIG, help="Path to release config JSON")
    add("--version", required=True, help="Release version string (e.g. 2025.10.0-edge)")
    add("--channel", required=True, help="Release channel (edge|stable|lts)")
    add("--calendar", help="Calendar tag (YYYY.MM); defaults derived from version")
    add("--git-sha", default=os.environ.get("GIT_COMMIT", "unknown"), help="Git revision to embed")
    add(
        "--output",
        type=pathlib.Path,
        default=REPO_ROOT / "out/release",
        help="Output directory for artefacts",
    )

    # Build behaviour.
    add("--no-push", action="store_true", help="Do not push images (use docker load)")
    add("--dry-run", action="store_true", help="Print steps without executing commands")
    add("--registry", help="Override registry root (e.g. localhost:5000/stellaops)")
    add(
        "--platform",
        dest="platforms",
        action="append",
        metavar="PLATFORM",
        help="Override build platforms (repeatable)",
    )

    # Signing.
    add("--skip-signing", action="store_true", help="Skip cosign signing/attestation steps")
    add("--cosign-key", dest="cosign_key", help="Override COSIGN_KEY_REF value")
    add("--cosign-password", dest="cosign_password", help="Password for cosign key")
    add(
        "--cosign-identity-token",
        dest="cosign_identity_token",
        help="Identity token for keyless cosign flows",
    )
    add(
        "--no-transparency",
        action="store_true",
        help="Disable Rekor transparency log upload during signing",
    )
    return cli.parse_args(argv)
 | 
						|
 | 
						|
 | 
						|
def write_manifest(manifest: Mapping[str, Any], output_dir: pathlib.Path) -> pathlib.Path:
    """Write the manifest as ``release.yaml`` and ``release.json`` with checksum sidecars.

    The SHA-256 of the YAML rendering *without* a ``checksums`` key is
    embedded as ``checksums.sha256`` in both written documents. Each written
    file also gets a ``.sha256`` sidecar holding the digest of the file as
    written. The input mapping is not modified.

    Args:
        manifest: Manifest content.
        output_dir: Directory receiving the files.

    Returns:
        The path of ``release.yaml``.
    """

    def write_sidecar(target: pathlib.Path) -> None:
        # "<digest>  <file name>" line next to the target file.
        sidecar = target.with_name(target.name + ".sha256")
        sidecar.write_text(f"{compute_sha256(target)}  {target.name}\n", encoding="utf-8")

    # Work on copies so the caller's mapping never gains the checksum key.
    unsigned = OrderedDict(manifest)
    embedded_digest = hashlib.sha256(dump_yaml(unsigned).encode("utf-8")).hexdigest()
    stamped = OrderedDict(unsigned)
    stamped["checksums"] = OrderedDict((("sha256", embedded_digest),))

    yaml_path = output_dir / "release.yaml"
    yaml_path.write_text(dump_yaml(stamped), encoding="utf-8")
    write_sidecar(yaml_path)

    json_path = output_dir / "release.json"
    json_path.write_text(json.dumps(stamped, indent=2) + "\n", encoding="utf-8")
    write_sidecar(json_path)

    return yaml_path
 | 
						|
 | 
						|
 | 
						|
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the release pipeline from command-line arguments.

    Parses the arguments, loads the release configuration, builds the release
    with a ``ReleaseBuilder``, writes the manifest files and prints where the
    manifest was written.

    Args:
        argv: Argument list; ``None`` lets argparse read the process arguments.

    Returns:
        ``0`` on completion.
    """
    args = parse_args(argv)
    config = load_json_config(args.config)
    release_date = utc_now_iso()
    calendar = sanitize_calendar(args.version, args.calendar)

    builder_options = dict(
        repo_root=REPO_ROOT,
        config=config,
        version=args.version,
        channel=args.channel,
        calendar=calendar,
        release_date=release_date,
        git_sha=args.git_sha,
        output_dir=args.output,
        # The CLI exposes opt-out switches; the builder takes positive flags.
        push=not args.no_push,
        dry_run=args.dry_run,
        registry_override=args.registry,
        platforms_override=args.platforms,
        skip_signing=args.skip_signing,
        cosign_key_ref=args.cosign_key,
        cosign_password=args.cosign_password,
        cosign_identity_token=args.cosign_identity_token,
        tlog_upload=not args.no_transparency,
    )
    builder = ReleaseBuilder(**builder_options)

    manifest_path = write_manifest(builder.run(), builder.output_dir)
    print(f"✅ Release manifest written to {manifest_path}")
    return 0
 | 
						|
 | 
						|
 | 
						|
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    sys.exit(main())
 |