#!/usr/bin/env python3
"""Deterministic release pipeline helper for StellaOps.

This script builds service containers, generates SBOM and provenance artefacts,
signs them with cosign, and writes a channel-specific release manifest.

The workflow expects external tooling to be available on PATH:
- docker (with buildx)
- cosign
- helm
- npm / node (for the UI build)
- dotnet SDK (for BuildX plugin publication)
"""
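# Example invocation (illustrative values only; "build_release.py" is a
# placeholder for whatever name this file is checked in under, and every flag
# used here is defined in parse_args below):
#
#   python build_release.py --version 2025.10.0-edge --channel edge \
#       --registry localhost:5000/stellaops --no-push --skip-signing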
from __future__ import annotations

import argparse
import contextlib
import datetime as dt
import hashlib
import json
import os
import pathlib
import re
import shlex
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import uuid
import zipfile
from collections import OrderedDict
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple

REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
DEFAULT_CONFIG = REPO_ROOT / "ops/devops/release/components.json"


class CommandError(RuntimeError):
    pass


def run(cmd: Sequence[str], *, cwd: Optional[pathlib.Path] = None, env: Optional[Mapping[str, str]] = None, capture: bool = True) -> str:
    """Run a subprocess command, returning stdout (text)."""
    process_env = os.environ.copy()
    if env:
        process_env.update(env)
    result = subprocess.run(
        list(cmd),
        cwd=str(cwd) if cwd else None,
        env=process_env,
        check=False,
        capture_output=capture,
        text=True,
    )
    if process_env.get("STELLAOPS_RELEASE_DEBUG"):
        sys.stderr.write(f"[debug] {' '.join(shlex.quote(c) for c in cmd)}\n")
        if capture:
            sys.stderr.write(result.stdout)
            sys.stderr.write(result.stderr)
    if result.returncode != 0:
        stdout = result.stdout if capture else ""
        stderr = result.stderr if capture else ""
        raise CommandError(f"Command failed ({result.returncode}): {' '.join(cmd)}\nSTDOUT:\n{stdout}\nSTDERR:\n{stderr}")
    return result.stdout if capture else ""
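
# Setting STELLAOPS_RELEASE_DEBUG=1 in the environment makes run() echo each
# command (and, when captured, its stdout/stderr) to stderr; useful when a
# pipeline stage fails without context.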
| 
 | |
| 
 | |
| def load_json_config(path: pathlib.Path) -> Dict[str, Any]:
 | |
|     with path.open("r", encoding="utf-8") as handle:
 | |
|         return json.load(handle)
 | |
| 
 | |
| 
 | |
| def ensure_directory(path: pathlib.Path) -> pathlib.Path:
 | |
|     path.mkdir(parents=True, exist_ok=True)
 | |
|     return path
 | |
| 
 | |
| 
 | |
| def compute_sha256(path: pathlib.Path) -> str:
 | |
|     sha = hashlib.sha256()
 | |
|     with path.open("rb") as handle:
 | |
|         for chunk in iter(lambda: handle.read(1024 * 1024), b""):
 | |
|             sha.update(chunk)
 | |
|     return sha.hexdigest()
 | |
| 
 | |
| 
 | |
| def format_scalar(value: Any) -> str:
 | |
|     if isinstance(value, bool):
 | |
|         return "true" if value else "false"
 | |
|     if value is None:
 | |
|         return "null"
 | |
|     if isinstance(value, (int, float)):
 | |
|         return str(value)
 | |
|     text = str(value)
 | |
|     if text == "":
 | |
|         return '""'
 | |
|     if re.search(r"[\s:#\-\[\]\{\}]", text):
 | |
|         return json.dumps(text, ensure_ascii=False)
 | |
|     return text
 | |
| 
 | |
| 
 | |
| def _yaml_lines(value: Any, indent: int = 0) -> List[str]:
 | |
|     pad = "  " * indent
 | |
|     if isinstance(value, Mapping):
 | |
|         lines: List[str] = []
 | |
|         for key, val in value.items():
 | |
|             if isinstance(val, (Mapping, list)):
 | |
|                 lines.append(f"{pad}{key}:")
 | |
|                 lines.extend(_yaml_lines(val, indent + 1))
 | |
|             else:
 | |
|                 lines.append(f"{pad}{key}: {format_scalar(val)}")
 | |
|         if not lines:
 | |
|             lines.append(f"{pad}{{}}")
 | |
|         return lines
 | |
|     if isinstance(value, list):
 | |
|         lines = []
 | |
|         if not value:
 | |
|             lines.append(f"{pad}[]")
 | |
|             return lines
 | |
|         for item in value:
 | |
|             if isinstance(item, (Mapping, list)):
 | |
|                 lines.append(f"{pad}-")
 | |
|                 lines.extend(_yaml_lines(item, indent + 1))
 | |
|             else:
 | |
|                 lines.append(f"{pad}- {format_scalar(item)}")
 | |
|         return lines
 | |
|     return [f"{pad}{format_scalar(value)}"]
 | |
| 
 | |
| 
 | |
| def dump_yaml(data: Mapping[str, Any]) -> str:
 | |
|     lines: List[str] = _yaml_lines(data)
 | |
|     return "\n".join(lines) + "\n"
 | |
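
# dump_yaml intentionally supports only the subset of YAML the release
# manifest needs (nested mappings, lists, scalars). Illustrative output:
#
#   dump_yaml({"release": {"version": "2025.10.0", "channels": ["edge"]}})
#
# yields:
#
#   release:
#     version: 2025.10.0
#     channels:
#       - edge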


def utc_now_iso() -> str:
    return dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def sanitize_calendar(version: str, explicit: Optional[str]) -> str:
    if explicit:
        return explicit
    # Expect version like 2025.10.0-edge or 2.4.1
    parts = re.findall(r"\d+", version)
    if len(parts) >= 2:
        return f"{parts[0]}.{parts[1]}"
    return dt.datetime.now(tz=dt.timezone.utc).strftime("%Y.%m")
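
# Worked examples of the derivation above:
#   sanitize_calendar("2025.10.0-edge", None) -> "2025.10"
#   sanitize_calendar("2.4.1", None)          -> "2.4"
# With fewer than two numeric groups, the current UTC year.month is used.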


class ReleaseBuilder:
    def __init__(
        self,
        *,
        repo_root: pathlib.Path,
        config: Mapping[str, Any],
        version: str,
        channel: str,
        calendar: str,
        release_date: str,
        git_sha: str,
        output_dir: pathlib.Path,
        push: bool,
        dry_run: bool,
        registry_override: Optional[str] = None,
        platforms_override: Optional[Sequence[str]] = None,
        skip_signing: bool = False,
        cosign_key_ref: Optional[str] = None,
        cosign_password: Optional[str] = None,
        cosign_identity_token: Optional[str] = None,
        tlog_upload: bool = True,
    ) -> None:
        self.repo_root = repo_root
        self.config = config
        self.version = version
        self.channel = channel
        self.calendar = calendar
        self.release_date = release_date
        self.git_sha = git_sha
        self.output_dir = ensure_directory(output_dir)
        self.push = push
        self.dry_run = dry_run
        self.registry = registry_override or config.get("registry")
        if not self.registry:
            raise ValueError("Config missing 'registry'")
        platforms = list(platforms_override) if platforms_override else config.get("platforms")
        if not platforms:
            platforms = ["linux/amd64", "linux/arm64"]
        self.platforms = list(platforms)
        self.source_date_epoch = str(int(dt.datetime.fromisoformat(release_date.replace("Z", "+00:00")).timestamp()))
        self.artifacts_dir = ensure_directory(self.output_dir / "artifacts")
        self.sboms_dir = ensure_directory(self.artifacts_dir / "sboms")
        self.provenance_dir = ensure_directory(self.artifacts_dir / "provenance")
        self.signature_dir = ensure_directory(self.artifacts_dir / "signatures")
        self.metadata_dir = ensure_directory(self.artifacts_dir / "metadata")
        self.debug_dir = ensure_directory(self.output_dir / "debug")
        self.debug_store_dir = ensure_directory(self.debug_dir / ".build-id")
        self.cli_config = config.get("cli")
        self.cli_output_dir = ensure_directory(self.output_dir / "cli") if self.cli_config else None
        self.temp_dir = pathlib.Path(tempfile.mkdtemp(prefix="stellaops-release-"))
        self.skip_signing = skip_signing
        self.tlog_upload = tlog_upload
        self.cosign_key_ref = cosign_key_ref or os.environ.get("COSIGN_KEY_REF")
        self.cosign_identity_token = cosign_identity_token or os.environ.get("COSIGN_IDENTITY_TOKEN")
        password = cosign_password if cosign_password is not None else os.environ.get("COSIGN_PASSWORD", "")
        self.cosign_env = {
            "COSIGN_PASSWORD": password,
            "COSIGN_EXPERIMENTAL": "1",
            "COSIGN_ALLOW_HTTP_REGISTRY": os.environ.get("COSIGN_ALLOW_HTTP_REGISTRY", "1"),
            "COSIGN_DOCKER_MEDIA_TYPES": os.environ.get("COSIGN_DOCKER_MEDIA_TYPES", "1"),
        }
        # Cache resolved objcopy binaries keyed by machine identifier to avoid repeated lookups.
        self._objcopy_cache: Dict[str, Optional[str]] = {}
        self._missing_symbol_platforms: Dict[str, int] = {}

    # ----------------
    # Build steps
    # ----------------
    def run(self) -> Dict[str, Any]:
        components_result = []
        if self.dry_run:
            print("⚠️  Dry-run enabled; commands will be skipped")
        self._prime_buildx_plugin()
        for component in self.config.get("components", []):
            result = self._build_component(component)
            components_result.append(result)
        helm_meta = self._package_helm()
        compose_meta = self._digest_compose_files()
        debug_meta = self._collect_debug_store(components_result)
        cli_meta = self._build_cli_artifacts()
        manifest = self._compose_manifest(components_result, helm_meta, compose_meta, debug_meta, cli_meta)
        return manifest

    def _prime_buildx_plugin(self) -> None:
        plugin_cfg = self.config.get("buildxPlugin")
        if not plugin_cfg:
            return
        project = plugin_cfg.get("project")
        if not project:
            return
        out_dir = ensure_directory(self.temp_dir / "buildx")
        if not self.dry_run:
            run([
                "dotnet",
                "publish",
                project,
                "-c",
                "Release",
                "-o",
                str(out_dir),
            ])
            cas_dir = ensure_directory(self.temp_dir / "cas")
            run([
                "dotnet",
                str(out_dir / "StellaOps.Scanner.Sbomer.BuildXPlugin.dll"),
                "handshake",
                "--manifest",
                str(out_dir),
                "--cas",
                str(cas_dir),
            ])

    def _component_tags(self, repo: str) -> List[str]:
        base = f"{self.registry}/{repo}"
        tags = [f"{base}:{self.version}"]
        if self.channel:
            tags.append(f"{base}:{self.channel}")
        return tags
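
    # Illustrative: with a (hypothetical) registry "registry.example.com/stellaops",
    # repo "scanner", version "2025.10.0" and channel "edge", this yields
    #   ["registry.example.com/stellaops/scanner:2025.10.0",
    #    "registry.example.com/stellaops/scanner:edge"]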

    def _component_ref(self, repo: str, digest: str) -> str:
        return f"{self.registry}/{repo}@{digest}"

    def _relative_path(self, path: pathlib.Path) -> str:
        try:
            return str(path.relative_to(self.output_dir.parent))
        except ValueError:
            return str(path)

    def _build_component(self, component: Mapping[str, Any]) -> Mapping[str, Any]:
        name = component["name"]
        repo = component.get("repository", name)
        kind = component.get("kind", "dotnet-service")
        dockerfile = component.get("dockerfile")
        if not dockerfile:
            raise ValueError(f"Component {name} missing dockerfile")
        context = component.get("context", ".")
        iid_file = self.temp_dir / f"{name}.iid"
        metadata_file = self.metadata_dir / f"{name}.metadata.json"

        build_args = {
            "VERSION": self.version,
            "CHANNEL": self.channel,
            "GIT_SHA": self.git_sha,
            "SOURCE_DATE_EPOCH": self.source_date_epoch,
        }
        docker_cfg = self.config.get("docker", {})
        if kind == "dotnet-service":
            build_args.update({
                "PROJECT": component["project"],
                "ENTRYPOINT_DLL": component["entrypoint"],
                "SDK_IMAGE": docker_cfg.get("sdkImage", "mcr.microsoft.com/dotnet/nightly/sdk:10.0"),
                "RUNTIME_IMAGE": docker_cfg.get("runtimeImage", "gcr.io/distroless/dotnet/aspnet:latest"),
            })
        elif kind == "angular-ui":
            build_args.update({
                "NODE_IMAGE": docker_cfg.get("nodeImage", "node:20.14.0-bookworm"),
                "NGINX_IMAGE": docker_cfg.get("nginxImage", "nginx:1.27-alpine"),
            })
        else:
            raise ValueError(f"Unsupported component kind {kind}")

        tags = self._component_tags(repo)
        build_cmd = [
            "docker",
            "buildx",
            "build",
            "--file",
            dockerfile,
            "--metadata-file",
            str(metadata_file),
            "--iidfile",
            str(iid_file),
            "--progress",
            "plain",
            "--platform",
            ",".join(self.platforms),
        ]
        for key, value in build_args.items():
            build_cmd.extend(["--build-arg", f"{key}={value}"])
        for tag in tags:
            build_cmd.extend(["--tag", tag])
        build_cmd.extend([
            "--attest",
            "type=sbom",
            "--attest",
            "type=provenance,mode=max",
        ])
        if self.push:
            build_cmd.append("--push")
        else:
            build_cmd.append("--load")
        build_cmd.append(context)

        if not self.dry_run:
            run(build_cmd, cwd=self.repo_root)

        digest = iid_file.read_text(encoding="utf-8").strip() if iid_file.exists() else ""
        image_ref = self._component_ref(repo, digest) if digest else ""

        bundle_info = self._sign_image(name, image_ref, tags)
        sbom_info = self._generate_sbom(name, image_ref)
        provenance_info = self._attach_provenance(name, image_ref)

        component_entry = OrderedDict()
        component_entry["name"] = name
        if digest:
            component_entry["image"] = image_ref
        component_entry["tags"] = tags
        if sbom_info:
            component_entry["sbom"] = sbom_info
        if provenance_info:
            component_entry["provenance"] = provenance_info
        if bundle_info:
            component_entry["signature"] = bundle_info
        if metadata_file.exists():
            metadata_rel = (
                str(metadata_file.relative_to(self.output_dir.parent))
                if metadata_file.is_relative_to(self.output_dir.parent)
                else str(metadata_file)
            )
            component_entry["metadata"] = OrderedDict((
                ("path", metadata_rel),
                ("sha256", compute_sha256(metadata_file)),
            ))
        return component_entry
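
    # For a dotnet-service component the assembled buildx command above is
    # shaped roughly like this (values illustrative):
    #
    #   docker buildx build --file <dockerfile> \
    #       --metadata-file out/release/artifacts/metadata/<name>.metadata.json \
    #       --iidfile <tmp>/<name>.iid --progress plain \
    #       --platform linux/amd64,linux/arm64 \
    #       --build-arg VERSION=2025.10.0 ... \
    #       --tag <registry>/<repo>:2025.10.0 --tag <registry>/<repo>:edge \
    #       --attest type=sbom --attest type=provenance,mode=max --push <context>
    #
    # Without --push the script falls back to --load; note that the classic
    # docker driver can only --load a single-platform build.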

    def _sign_image(self, name: str, image_ref: str, tags: Sequence[str]) -> Optional[Mapping[str, Any]]:
        if self.skip_signing:
            return None
        if not image_ref:
            return None
        if not (self.cosign_key_ref or self.cosign_identity_token):
            raise ValueError("Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass.")
        signature_path = self.signature_dir / f"{name}.signature"
        cmd = ["cosign", "sign", "--yes"]
        if self.cosign_key_ref:
            cmd.extend(["--key", self.cosign_key_ref])
        if self.cosign_identity_token:
            cmd.extend(["--identity-token", self.cosign_identity_token])
        if not self.tlog_upload:
            cmd.append("--tlog-upload=false")
        cmd.append("--allow-http-registry")
        cmd.append(image_ref)
        if self.dry_run:
            return None
        run(cmd, env=self.cosign_env)
        signature_data = run([
            "cosign",
            "download",
            "signature",
            "--allow-http-registry",
            image_ref,
        ])
        signature_path.write_text(signature_data, encoding="utf-8")
        signature_sha = compute_sha256(signature_path)
        signature_ref = run([
            "cosign",
            "triangulate",
            "--allow-http-registry",
            image_ref,
        ]).strip()
        return OrderedDict(
            (
                ("signature", OrderedDict((
                    ("path", str(signature_path.relative_to(self.output_dir.parent)) if signature_path.is_relative_to(self.output_dir.parent) else str(signature_path)),
                    ("sha256", signature_sha),
                    ("ref", signature_ref),
                    ("tlogUploaded", self.tlog_upload),
                ))),
            )
        )

    def _generate_sbom(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
        if not image_ref or self.dry_run:
            return None
        sbom_path = self.sboms_dir / f"{name}.cyclonedx.json"
        run([
            "docker",
            "sbom",
            image_ref,
            "--format",
            "cyclonedx-json",
            "--output",
            str(sbom_path),
        ])
        entry = OrderedDict((
            ("path", str(sbom_path.relative_to(self.output_dir.parent)) if sbom_path.is_relative_to(self.output_dir.parent) else str(sbom_path)),
            ("sha256", compute_sha256(sbom_path)),
        ))
        if self.skip_signing:
            return entry
        attach_cmd = [
            "cosign",
            "attach",
            "sbom",
            "--sbom",
            str(sbom_path),
            "--type",
            "cyclonedx",
        ]
        if self.cosign_key_ref:
            attach_cmd.extend(["--key", self.cosign_key_ref])
        attach_cmd.append("--allow-http-registry")
        attach_cmd.append(image_ref)
        run(attach_cmd, env=self.cosign_env)
        reference = run(["cosign", "triangulate", "--type", "sbom", "--allow-http-registry", image_ref]).strip()
        entry["ref"] = reference
        return entry

    def _attach_provenance(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
        if not image_ref or self.dry_run:
            return None
        predicate = OrderedDict()
        predicate["buildDefinition"] = OrderedDict(
            (
                ("buildType", "https://git.stella-ops.org/stellaops/release"),
                ("externalParameters", OrderedDict((
                    ("component", name),
                    ("version", self.version),
                    ("channel", self.channel),
                ))),
            )
        )
        predicate["runDetails"] = OrderedDict(
            (
                ("builder", OrderedDict((("id", "https://github.com/actions"),))),
                ("metadata", OrderedDict((("finishedOn", self.release_date),))),
            )
        )
        predicate_path = self.provenance_dir / f"{name}.provenance.json"
        with predicate_path.open("w", encoding="utf-8") as handle:
            json.dump(predicate, handle, indent=2, sort_keys=True)
            handle.write("\n")
        entry = OrderedDict((
            ("path", str(predicate_path.relative_to(self.output_dir.parent)) if predicate_path.is_relative_to(self.output_dir.parent) else str(predicate_path)),
            ("sha256", compute_sha256(predicate_path)),
        ))
        if self.skip_signing:
            return entry
        cmd = [
            "cosign",
            "attest",
            "--predicate",
            str(predicate_path),
            "--type",
            "https://slsa.dev/provenance/v1",
        ]
        if self.cosign_key_ref:
            cmd.extend(["--key", self.cosign_key_ref])
        if not self.tlog_upload:
            cmd.append("--tlog-upload=false")
        cmd.append("--allow-http-registry")
        cmd.append(image_ref)
        run(cmd, env=self.cosign_env)
        ref = run([
            "cosign",
            "triangulate",
            "--type",
            "https://slsa.dev/provenance/v1",
            "--allow-http-registry",
            image_ref,
        ]).strip()
        entry["ref"] = ref
        return entry

    def _collect_debug_store(self, components: Sequence[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        if self.dry_run:
            return None
        debug_records: Dict[Tuple[str, str], OrderedDict[str, Any]] = {}
        for component in components:
            image_ref = component.get("image")
            if not image_ref:
                continue
            name = component.get("name", "unknown")
            entries = self._extract_debug_entries(name, image_ref)
            for entry in entries:
                key = (entry["platform"], entry["buildId"])
                existing = debug_records.get(key)
                if existing is None:
                    record = OrderedDict((
                        ("buildId", entry["buildId"]),
                        ("platform", entry["platform"]),
                        ("debugPath", entry["debugPath"]),
                        ("sha256", entry["sha256"]),
                        ("size", entry["size"]),
                        ("components", [entry["component"]]),
                        ("images", [entry["image"]]),
                        ("sources", list(entry["sources"])),
                    ))
                    debug_records[key] = record
                else:
                    if entry["sha256"] != existing["sha256"]:
                        raise RuntimeError(
                            f"Build-id {entry['buildId']} for platform {entry['platform']} produced conflicting hashes"
                        )
                    if entry["component"] not in existing["components"]:
                        existing["components"].append(entry["component"])
                    if entry["image"] not in existing["images"]:
                        existing["images"].append(entry["image"])
                    for source in entry["sources"]:
                        if source not in existing["sources"]:
                            existing["sources"].append(source)
        if not debug_records:
            sys.stderr.write(
                "[error] release build produced no debug artefacts; enable symbol extraction so out/release/debug is populated (DEVOPS-REL-17-004).\n"
            )
            # Remove empty directories before failing
            with contextlib.suppress(FileNotFoundError, OSError):
                if not any(self.debug_store_dir.iterdir()):
                    self.debug_store_dir.rmdir()
            with contextlib.suppress(FileNotFoundError, OSError):
                if not any(self.debug_dir.iterdir()):
                    self.debug_dir.rmdir()
            raise RuntimeError(
                "Debug store collection produced no build-id artefacts (DEVOPS-REL-17-004)."
            )
        entries = []
        for record in debug_records.values():
            entry = OrderedDict((
                ("buildId", record["buildId"]),
                ("platform", record["platform"]),
                ("debugPath", record["debugPath"]),
                ("sha256", record["sha256"]),
                ("size", record["size"]),
                ("components", sorted(record["components"])),
                ("images", sorted(record["images"])),
                ("sources", sorted(record["sources"])),
            ))
            entries.append(entry)
        entries.sort(key=lambda item: (item["platform"], item["buildId"]))
        manifest_path = self.debug_dir / "debug-manifest.json"
        platform_counts: Dict[str, int] = {}
        for entry in entries:
            platform_counts[entry["platform"]] = platform_counts.get(entry["platform"], 0) + 1
        missing_platforms = [
            platform
            for platform in self._missing_symbol_platforms
            if platform_counts.get(platform, 0) == 0
        ]
        if missing_platforms:
            raise RuntimeError(
                "Debug extraction skipped all binaries for platforms without objcopy support: "
                + ", ".join(sorted(missing_platforms))
            )
        manifest_data = OrderedDict((
            ("generatedAt", self.release_date),
            ("version", self.version),
            ("channel", self.channel),
            ("artifacts", entries),
        ))
        with manifest_path.open("w", encoding="utf-8") as handle:
            json.dump(manifest_data, handle, indent=2)
            handle.write("\n")
        manifest_sha = compute_sha256(manifest_path)
        sha_path = manifest_path.with_suffix(manifest_path.suffix + ".sha256")
        sha_path.write_text(f"{manifest_sha}  {manifest_path.name}\n", encoding="utf-8")
        manifest_rel = manifest_path.relative_to(self.output_dir).as_posix()
        store_rel = self.debug_store_dir.relative_to(self.output_dir).as_posix()
        platforms = sorted({entry["platform"] for entry in entries})
        return OrderedDict((
            ("manifest", manifest_rel),
            ("sha256", manifest_sha),
            ("entries", len(entries)),
            ("platforms", platforms),
            ("directory", store_rel),
        ))

    # ----------------
    # CLI packaging
    # ----------------
    def _build_cli_artifacts(self) -> List[Mapping[str, Any]]:
        if not self.cli_config or self.dry_run:
            return []
        project_rel = self.cli_config.get("project")
        if not project_rel:
            return []
        project_path = (self.repo_root / project_rel).resolve()
        if not project_path.exists():
            raise FileNotFoundError(f"CLI project not found at {project_path}")
        runtimes: Sequence[str] = self.cli_config.get("runtimes", [])
        if not runtimes:
            runtimes = ("linux-x64",)
        package_prefix = self.cli_config.get("packagePrefix", "stella")
        ensure_directory(self.cli_output_dir or (self.output_dir / "cli"))

        cli_entries: List[Mapping[str, Any]] = []
        for runtime in runtimes:
            entry = self._build_cli_for_runtime(project_path, runtime, package_prefix)
            cli_entries.append(entry)
        return cli_entries

    def _build_cli_for_runtime(
        self,
        project_path: pathlib.Path,
        runtime: str,
        package_prefix: str,
    ) -> Mapping[str, Any]:
        publish_dir = ensure_directory(self.temp_dir / f"cli-publish-{runtime}")
        publish_cmd = [
            "dotnet",
            "publish",
            str(project_path),
            "--configuration",
            "Release",
            "--runtime",
            runtime,
            "--self-contained",
            "true",
            "/p:PublishSingleFile=true",
            "/p:IncludeNativeLibrariesForSelfExtract=true",
            "/p:EnableCompressionInSingleFile=true",
            "/p:InvariantGlobalization=true",
            "--output",
            str(publish_dir),
        ]
        run(publish_cmd, cwd=self.repo_root)

        original_name = "StellaOps.Cli"
        if runtime.startswith("win"):
            source = publish_dir / f"{original_name}.exe"
            target = publish_dir / "stella.exe"
        else:
            source = publish_dir / original_name
            target = publish_dir / "stella"
        if source.exists():
            if target.exists():
                target.unlink()
            source.rename(target)
            if not runtime.startswith("win"):
                target.chmod(target.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)

        package_dir = self.cli_output_dir or (self.output_dir / "cli")
        ensure_directory(package_dir)
        archive_name = f"{package_prefix}-{self.version}-{runtime}"
        if runtime.startswith("win"):
            package_path = package_dir / f"{archive_name}.zip"
            self._archive_zip(publish_dir, package_path)
        else:
            package_path = package_dir / f"{archive_name}.tar.gz"
            self._archive_tar(publish_dir, package_path)

        digest = compute_sha256(package_path)
        sha_path = package_path.with_suffix(package_path.suffix + ".sha256")
        sha_path.write_text(f"{digest}  {package_path.name}\n", encoding="utf-8")

        archive_info = OrderedDict((
            ("path", self._relative_path(package_path)),
            ("sha256", digest),
        ))
        signature_info = self._sign_file(package_path)
        if signature_info:
            archive_info["signature"] = signature_info

        sbom_info = self._generate_cli_sbom(runtime, publish_dir)

        entry = OrderedDict((
            ("runtime", runtime),
            ("archive", archive_info),
        ))
        if sbom_info:
            entry["sbom"] = sbom_info
        return entry
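
    # Illustrative artefacts under <output>/cli for runtime "linux-x64" with
    # the default package prefix "stella" and version "2025.10.0":
    #   stella-2025.10.0-linux-x64.tar.gz
    #   stella-2025.10.0-linux-x64.tar.gz.sha256
    #   stella-2025.10.0-linux-x64.tar.gz.sig   (omitted with --skip-signing)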

    def _archive_tar(self, source_dir: pathlib.Path, archive_path: pathlib.Path) -> None:
        # Entries are added in sorted order for a stable layout; tar members
        # keep their on-disk mtime/ownership, so the archive is only
        # byte-for-byte reproducible if the publish step already normalised
        # those attributes.
        with tarfile.open(archive_path, "w:gz") as tar:
            for item in sorted(source_dir.rglob("*")):
                arcname = item.relative_to(source_dir)
                tar.add(item, arcname=arcname)

    def _archive_zip(self, source_dir: pathlib.Path, archive_path: pathlib.Path) -> None:
        # Zip entries are rebuilt from fresh ZipInfo objects (fixed 1980-01-01
        # timestamp), so only the file mode bits are carried over from disk.
        with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
            for item in sorted(source_dir.rglob("*")):
                if item.is_dir():
                    continue
                arcname = item.relative_to(source_dir).as_posix()
                zip_info = zipfile.ZipInfo(arcname)
                zip_info.external_attr = (item.stat().st_mode & 0xFFFF) << 16
                with item.open("rb") as handle:
                    zipf.writestr(zip_info, handle.read())

    def _generate_cli_sbom(self, runtime: str, publish_dir: pathlib.Path) -> Optional[Mapping[str, Any]]:
        if self.dry_run:
            return None
        sbom_dir = ensure_directory(self.sboms_dir / "cli")
        sbom_path = sbom_dir / f"cli-{runtime}.cyclonedx.json"
        run([
            "syft",
            f"dir:{publish_dir}",
            "--output",
            f"cyclonedx-json={sbom_path}",
        ])
        entry = OrderedDict((
            ("path", self._relative_path(sbom_path)),
            ("sha256", compute_sha256(sbom_path)),
        ))
        signature_info = self._sign_file(sbom_path)
        if signature_info:
            entry["signature"] = signature_info
        return entry

    def _sign_file(self, path: pathlib.Path) -> Optional[Mapping[str, Any]]:
        if self.skip_signing:
            return None
        if not (self.cosign_key_ref or self.cosign_identity_token):
            raise ValueError(
                "Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass."
            )
        signature_path = path.with_suffix(path.suffix + ".sig")
        sha_path = path.with_suffix(path.suffix + ".sha256")
        digest = compute_sha256(path)
        sha_path.write_text(f"{digest}  {path.name}\n", encoding="utf-8")
        cmd = ["cosign", "sign-blob", "--yes", str(path)]
        if self.cosign_key_ref:
            cmd.extend(["--key", self.cosign_key_ref])
        if self.cosign_identity_token:
            cmd.extend(["--identity-token", self.cosign_identity_token])
        if not self.tlog_upload:
            cmd.append("--tlog-upload=false")
        signature_data = run(cmd, env=self.cosign_env).strip()
        signature_path.write_text(signature_data + "\n", encoding="utf-8")
        return OrderedDict((
            ("path", self._relative_path(signature_path)),
            ("sha256", compute_sha256(signature_path)),
            ("tlogUploaded", self.tlog_upload),
        ))

    def _extract_debug_entries(self, component_name: str, image_ref: str) -> List[OrderedDict[str, Any]]:
        if self.dry_run:
            return []
        entries: List[OrderedDict[str, Any]] = []
        platforms = self.platforms if self.push else [None]
        for platform in platforms:
            platform_label = platform or (self.platforms[0] if self.platforms else "linux/amd64")
            if self.push:
                pull_cmd = ["docker", "pull"]
                if platform:
                    pull_cmd.extend(["--platform", platform])
                pull_cmd.append(image_ref)
                run(pull_cmd)
            create_cmd = ["docker", "create"]
            if platform:
                create_cmd.extend(["--platform", platform])
            create_cmd.append(image_ref)
            container_id = run(create_cmd).strip()
            export_path = self.temp_dir / f"{container_id}.tar"
            try:
                run(["docker", "export", container_id, "-o", str(export_path)], capture=False)
            finally:
                run(["docker", "rm", container_id], capture=False)
            rootfs_dir = ensure_directory(self.temp_dir / f"{component_name}-{platform_label}-{uuid.uuid4().hex}")
            try:
                with tarfile.open(export_path, "r:*") as tar:
                    self._safe_extract_tar(tar, rootfs_dir)
            finally:
                export_path.unlink(missing_ok=True)
            try:
                for file_path in rootfs_dir.rglob("*"):
                    if not file_path.is_file() or file_path.is_symlink():
                        continue
                    if not self._is_elf(file_path):
                        continue
                    build_id, machine = self._read_build_id_and_machine(file_path)
                    if not build_id:
                        continue
                    debug_file = self._debug_file_for_build_id(build_id)
                    if not debug_file.exists():
                        debug_file.parent.mkdir(parents=True, exist_ok=True)
                        temp_debug = self.temp_dir / f"{build_id}.debug"
                        with contextlib.suppress(FileNotFoundError):
                            temp_debug.unlink()
                        objcopy_tool = self._resolve_objcopy_tool(machine)
                        if not objcopy_tool:
                            self._emit_objcopy_warning(machine, platform_label, file_path)
                            with contextlib.suppress(FileNotFoundError):
                                temp_debug.unlink()
                            continue
                        try:
                            run([objcopy_tool, "--only-keep-debug", str(file_path), str(temp_debug)], capture=False)
                        except CommandError:
                            with contextlib.suppress(FileNotFoundError):
                                temp_debug.unlink()
                            continue
                        debug_file.parent.mkdir(parents=True, exist_ok=True)
                        shutil.move(str(temp_debug), str(debug_file))
                    sha = compute_sha256(debug_file)
                    rel_debug = debug_file.relative_to(self.output_dir).as_posix()
                    source_rel = file_path.relative_to(rootfs_dir).as_posix()
                    entry = OrderedDict((
                        ("component", component_name),
                        ("image", image_ref),
                        ("platform", platform_label),
                        ("buildId", build_id),
                        ("debugPath", rel_debug),
                        ("sha256", sha),
                        ("size", debug_file.stat().st_size),
                        ("sources", [source_rel]),
                    ))
                    entries.append(entry)
            finally:
                shutil.rmtree(rootfs_dir, ignore_errors=True)
        return entries

    def _debug_file_for_build_id(self, build_id: str) -> pathlib.Path:
        normalized = build_id.lower()
        prefix = normalized[:2]
        remainder = normalized[2:]
        return self.debug_store_dir / prefix / f"{remainder}.debug"
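
    # The layout mirrors the conventional GNU ".build-id" debug store: a build
    # id such as "4f2a9c..." lands at debug/.build-id/4f/2a9c....debug, so
    # standard debugger tooling can resolve detached symbols by build id.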

    @staticmethod
    def _safe_extract_tar(tar: tarfile.TarFile, dest: pathlib.Path) -> None:
        dest_root = dest.resolve()
        for member in tar.getmembers():
            member_path = (dest / member.name).resolve()
            # Path.is_relative_to avoids the classic startswith() pitfall where
            # a sibling such as "/out/release-evil" would pass a plain string
            # prefix check against "/out/release".
            if not member_path.is_relative_to(dest_root):
                raise RuntimeError(f"Refusing to extract '{member.name}' outside of destination directory")
        tar.extractall(dest)

    @staticmethod
    def _is_elf(path: pathlib.Path) -> bool:
        try:
            with path.open("rb") as handle:
                return handle.read(4) == b"\x7fELF"
        except OSError:
            return False

    def _read_build_id_and_machine(self, path: pathlib.Path) -> Tuple[Optional[str], Optional[str]]:
        try:
            header_output = run(["readelf", "-nh", str(path)])
        except CommandError:
            return None, None
        build_id: Optional[str] = None
        machine: Optional[str] = None
        for line in header_output.splitlines():
            stripped = line.strip()
            if stripped.startswith("Build ID:"):
                build_id = stripped.split("Build ID:", 1)[1].strip().lower()
            elif stripped.startswith("Machine:"):
                machine = stripped.split("Machine:", 1)[1].strip()
        return build_id, machine

    def _resolve_objcopy_tool(self, machine: Optional[str]) -> Optional[str]:
        key = (machine or "generic").lower()
        if key in self._objcopy_cache:
            return self._objcopy_cache[key]

        env_override = None
        if machine and "aarch64" in machine.lower():
            env_override = os.environ.get("STELLAOPS_OBJCOPY_AARCH64")
            candidates = [
                env_override,
                "aarch64-linux-gnu-objcopy",
                "llvm-objcopy",
                "objcopy",
            ]
        elif machine and any(token in machine.lower() for token in ("x86-64", "amd", "x86_64")):
            env_override = os.environ.get("STELLAOPS_OBJCOPY_AMD64")
            candidates = [
                env_override,
                "objcopy",
                "llvm-objcopy",
            ]
        else:
            env_override = os.environ.get("STELLAOPS_OBJCOPY_DEFAULT")
            candidates = [
                env_override,
                "objcopy",
                "llvm-objcopy",
            ]

        for candidate in candidates:
            if not candidate:
                continue
            tool = shutil.which(candidate)
            if tool:
                self._objcopy_cache[key] = tool
                return tool
        self._objcopy_cache[key] = None
        return None
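
    # CI can pin an exact tool via the STELLAOPS_OBJCOPY_AARCH64,
    # STELLAOPS_OBJCOPY_AMD64 or STELLAOPS_OBJCOPY_DEFAULT variables read
    # above, e.g. (illustrative path):
    #   export STELLAOPS_OBJCOPY_AARCH64=/usr/bin/aarch64-linux-gnu-objcopy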

    def _emit_objcopy_warning(self, machine: Optional[str], platform: str, file_path: pathlib.Path) -> None:
        machine_label = machine or "unknown-machine"
        count = self._missing_symbol_platforms.get(platform, 0)
        self._missing_symbol_platforms[platform] = count + 1
        if count == 0:
            sys.stderr.write(
                f"[warn] no objcopy tool available for {machine_label}; skipping debug extraction for {file_path}.\n"
            )

    # ----------------
    # Helm + compose
    # ----------------
    def _package_helm(self) -> Optional[Mapping[str, Any]]:
        helm_cfg = self.config.get("helm")
        if not helm_cfg:
            return None
        chart_path = helm_cfg.get("chartPath")
        if not chart_path:
            return None
        chart_dir = self.repo_root / chart_path
        output_dir = ensure_directory(self.output_dir / "helm")
        archive_path = output_dir / f"stellaops-{self.version}.tgz"
        if not self.dry_run:
            cmd = [
                "helm",
                "package",
                str(chart_dir),
                "--destination",
                str(output_dir),
                "--version",
                self.version,
                "--app-version",
                self.version,
            ]
            run(cmd)
            packaged = next(output_dir.glob("*.tgz"), None)
            if packaged and packaged != archive_path:
                packaged.rename(archive_path)
        digest = compute_sha256(archive_path) if archive_path.exists() else None
        if archive_path.exists() and archive_path.is_relative_to(self.output_dir):
            manifest_path = str(archive_path.relative_to(self.output_dir))
        elif archive_path.exists() and archive_path.is_relative_to(self.output_dir.parent):
            manifest_path = str(archive_path.relative_to(self.output_dir.parent))
        else:
            manifest_path = f"helm/{archive_path.name}"
        return OrderedDict((
            ("name", "stellaops"),
            ("version", self.version),
            ("path", manifest_path),
            ("sha256", digest),
        ))

    def _digest_compose_files(self) -> List[Mapping[str, Any]]:
        compose_cfg = self.config.get("compose", {})
        files = compose_cfg.get("files", [])
        entries: List[Mapping[str, Any]] = []
        for rel_path in files:
            src = self.repo_root / rel_path
            if not src.exists():
                continue
            digest = compute_sha256(src)
            entries.append(OrderedDict((
                ("name", pathlib.Path(rel_path).name),
                ("path", rel_path),
                ("sha256", digest),
            )))
        return entries

    # ----------------
    # Manifest assembly
    # ----------------
    def _compose_manifest(
        self,
        components: List[Mapping[str, Any]],
        helm_meta: Optional[Mapping[str, Any]],
        compose_meta: List[Mapping[str, Any]],
        debug_meta: Optional[Mapping[str, Any]],
        cli_meta: Sequence[Mapping[str, Any]],
    ) -> Dict[str, Any]:
        manifest = OrderedDict()
        manifest["release"] = OrderedDict((
            ("version", self.version),
            ("channel", self.channel),
            ("date", self.release_date),
            ("calendar", self.calendar),
        ))
        manifest["components"] = components
        if helm_meta:
            manifest["charts"] = [helm_meta]
        if compose_meta:
            manifest["compose"] = compose_meta
        if debug_meta:
            manifest["debugStore"] = debug_meta
        if cli_meta:
            manifest["cli"] = list(cli_meta)
        return manifest


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Build StellaOps release artefacts deterministically")
    parser.add_argument("--config", type=pathlib.Path, default=DEFAULT_CONFIG, help="Path to release config JSON")
    parser.add_argument("--version", required=True, help="Release version string (e.g. 2025.10.0-edge)")
    parser.add_argument("--channel", required=True, help="Release channel (edge|stable|lts)")
    parser.add_argument("--calendar", help="Calendar tag (YYYY.MM); defaults derived from version")
    parser.add_argument("--git-sha", default=os.environ.get("GIT_COMMIT", "unknown"), help="Git revision to embed")
    parser.add_argument("--output", type=pathlib.Path, default=REPO_ROOT / "out/release", help="Output directory for artefacts")
    parser.add_argument("--no-push", action="store_true", help="Do not push images (use docker load)")
    parser.add_argument("--dry-run", action="store_true", help="Print steps without executing commands")
    parser.add_argument("--registry", help="Override registry root (e.g. localhost:5000/stellaops)")
    parser.add_argument("--platform", dest="platforms", action="append", metavar="PLATFORM", help="Override build platforms (repeatable)")
    parser.add_argument("--skip-signing", action="store_true", help="Skip cosign signing/attestation steps")
    parser.add_argument("--cosign-key", dest="cosign_key", help="Override COSIGN_KEY_REF value")
    parser.add_argument("--cosign-password", dest="cosign_password", help="Password for cosign key")
    parser.add_argument("--cosign-identity-token", dest="cosign_identity_token", help="Identity token for keyless cosign flows")
    parser.add_argument("--no-transparency", action="store_true", help="Disable Rekor transparency log upload during signing")
    return parser.parse_args(argv)


def write_manifest(manifest: Mapping[str, Any], output_dir: pathlib.Path) -> pathlib.Path:
    # Copy manifest to avoid mutating input when computing checksum
    base_manifest = OrderedDict(manifest)
    yaml_without_checksum = dump_yaml(base_manifest)
    digest = hashlib.sha256(yaml_without_checksum.encode("utf-8")).hexdigest()
    manifest_with_checksum = OrderedDict(base_manifest)
    manifest_with_checksum["checksums"] = OrderedDict((("sha256", digest),))
    final_yaml = dump_yaml(manifest_with_checksum)
    output_path = output_dir / "release.yaml"
    with output_path.open("w", encoding="utf-8") as handle:
        handle.write(final_yaml)
    sha_path = output_path.with_name(output_path.name + ".sha256")
    yaml_file_digest = compute_sha256(output_path)
    sha_path.write_text(f"{yaml_file_digest}  {output_path.name}\n", encoding="utf-8")

    json_text = json.dumps(manifest_with_checksum, indent=2)
    json_path = output_dir / "release.json"
    with json_path.open("w", encoding="utf-8") as handle:
        handle.write(json_text)
        handle.write("\n")
    json_digest = compute_sha256(json_path)
    json_sha_path = json_path.with_name(json_path.name + ".sha256")
    json_sha_path.write_text(f"{json_digest}  {json_path.name}\n", encoding="utf-8")
    return output_path
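
# Note: the embedded checksums.sha256 value is computed over the YAML document
# *before* the checksums block is appended, so a verifier must drop that block
# (or re-serialise the manifest without it) before re-hashing. The adjacent
# release.yaml.sha256 file covers the final file as written.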


def main(argv: Optional[Sequence[str]] = None) -> int:
    args = parse_args(argv)
    config = load_json_config(args.config)
    release_date = utc_now_iso()
    calendar = sanitize_calendar(args.version, args.calendar)
    builder = ReleaseBuilder(
        repo_root=REPO_ROOT,
        config=config,
        version=args.version,
        channel=args.channel,
        calendar=calendar,
        release_date=release_date,
        git_sha=args.git_sha,
        output_dir=args.output,
        push=not args.no_push,
        dry_run=args.dry_run,
        registry_override=args.registry,
        platforms_override=args.platforms,
        skip_signing=args.skip_signing,
        cosign_key_ref=args.cosign_key,
        cosign_password=args.cosign_password,
        cosign_identity_token=args.cosign_identity_token,
        tlog_upload=not args.no_transparency,
    )
    manifest = builder.run()
    manifest_path = write_manifest(manifest, builder.output_dir)
    print(f"✅ Release manifest written to {manifest_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())