1104 lines
44 KiB
Python
1104 lines
44 KiB
Python
#!/usr/bin/env python3
|
|
"""Deterministic release pipeline helper for StellaOps.
|
|
|
|
This script builds service containers, generates SBOM and provenance artefacts,
|
|
signs them with cosign, and writes a channel-specific release manifest.
|
|
|
|
The workflow expects external tooling to be available on PATH:
|
|
- docker (with buildx)
|
|
- cosign
|
|
- helm
|
|
- npm / node (for the UI build)
|
|
- dotnet SDK (for BuildX plugin publication)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import contextlib
|
|
import datetime as dt
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import tarfile
|
|
import tempfile
|
|
import uuid
|
|
import zipfile
|
|
from collections import OrderedDict
|
|
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Sequence, Tuple
|
|
|
|
REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
|
|
DEFAULT_CONFIG = REPO_ROOT / "ops/devops/release/components.json"
|
|
|
|
class CommandError(RuntimeError):
    """Signals that an external command finished with a non-zero exit status."""
|
|
|
|
def run(cmd: Sequence[str], *, cwd: Optional[pathlib.Path] = None, env: Optional[Mapping[str, str]] = None, capture: bool = True) -> str:
    """Execute *cmd* as a subprocess and return its stdout as text.

    The child receives a copy of the current environment with ``env`` layered on
    top. When ``capture`` is false the child's output is not collected and an
    empty string is returned.

    Raises:
        CommandError: if the command exits with a non-zero status.
    """
    merged_env = dict(os.environ)
    if env:
        merged_env.update(env)

    working_dir = str(cwd) if cwd else None
    completed = subprocess.run(
        list(cmd),
        cwd=working_dir,
        env=merged_env,
        check=False,
        capture_output=capture,
        text=True,
    )

    # Opt-in tracing: echo the (shell-quoted) command and any captured output to stderr.
    if merged_env.get("STELLAOPS_RELEASE_DEBUG"):
        quoted = " ".join(shlex.quote(part) for part in cmd)
        sys.stderr.write(f"[debug] {quoted}\n")
        if capture:
            sys.stderr.write(completed.stdout)
            sys.stderr.write(completed.stderr)

    if completed.returncode != 0:
        stdout = completed.stdout if capture else ""
        stderr = completed.stderr if capture else ""
        raise CommandError(f"Command failed ({completed.returncode}): {' '.join(cmd)}\nSTDOUT:\n{stdout}\nSTDERR:\n{stderr}")

    if not capture:
        return ""
    return completed.stdout
|
|
|
|
|
|
def load_json_config(path: pathlib.Path) -> Dict[str, Any]:
    """Read the UTF-8 encoded file at *path* and return its parsed JSON content."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
|
|
def ensure_directory(path: pathlib.Path) -> pathlib.Path:
    """Create *path* together with any missing parents and return it unchanged.

    An already existing directory is not an error.
    """
    os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
|
def compute_sha256(path: pathlib.Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is streamed in 1 MiB blocks so it never has to fit in memory.
    """
    block_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(block_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
|
|
# Characters that have always forced quoting in this emitter.
_YAML_UNSAFE_CHARS = re.compile(r"[\s:#\-\[\]\{\}]")
# Plain scalars a YAML parser reads back as a boolean or null instead of text.
_YAML_AMBIGUOUS_WORDS = frozenset({"true", "false", "yes", "no", "on", "off", "y", "n", "null", "~"})
# Plain scalars a YAML parser reads back as an integer or float instead of text.
_YAML_NUMBER_LIKE = re.compile(
    r"[+]?(?:0x[0-9a-f_]+|0o[0-7_]+|\d[\d_]*(?:\.[\d_]*)?(?:e[+]?\d+)?|\.\d[\d_]*(?:e[+]?\d+)?|\.inf|\.nan)",
    re.IGNORECASE,
)
# Indicator characters that change a scalar's meaning when they come first.
_YAML_LEADING_INDICATORS = "!&*|>'\"%@`,?"


def format_scalar(value: Any) -> str:
    """Render a single scalar *value* as YAML text.

    Booleans become ``true``/``false``, ``None`` becomes ``null`` and numbers are
    written with ``str``. Any other value is converted with ``str`` and emitted
    as a plain scalar unless that would be ambiguous, in which case it is
    written as a double-quoted string via ``json.dumps``.

    A string is quoted when it is empty, contains whitespace or one of
    ``: # - [ ] { }``, starts with a YAML indicator character, or would be read
    back as a boolean, null or number (for example ``"true"``, ``"2025.10"`` or
    an all-digit commit hash). Without the last two rules such strings silently
    change type on a round trip through a YAML parser.
    """
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    if isinstance(value, (int, float)):
        return str(value)

    text = str(value)
    if text == "":
        return '""'

    needs_quotes = (
        _YAML_UNSAFE_CHARS.search(text) is not None
        or text[0] in _YAML_LEADING_INDICATORS
        or text.lower() in _YAML_AMBIGUOUS_WORDS
        or _YAML_NUMBER_LIKE.fullmatch(text) is not None
    )
    if needs_quotes:
        return json.dumps(text, ensure_ascii=False)
    return text
|
|
|
|
|
|
def _yaml_lines(value: Any, indent: int = 0) -> List[str]:
|
|
pad = " " * indent
|
|
if isinstance(value, Mapping):
|
|
lines: List[str] = []
|
|
for key, val in value.items():
|
|
if isinstance(val, (Mapping, list)):
|
|
lines.append(f"{pad}{key}:")
|
|
lines.extend(_yaml_lines(val, indent + 1))
|
|
else:
|
|
lines.append(f"{pad}{key}: {format_scalar(val)}")
|
|
if not lines:
|
|
lines.append(f"{pad}{{}}")
|
|
return lines
|
|
if isinstance(value, list):
|
|
lines = []
|
|
if not value:
|
|
lines.append(f"{pad}[]")
|
|
return lines
|
|
for item in value:
|
|
if isinstance(item, (Mapping, list)):
|
|
lines.append(f"{pad}-")
|
|
lines.extend(_yaml_lines(item, indent + 1))
|
|
else:
|
|
lines.append(f"{pad}- {format_scalar(item)}")
|
|
return lines
|
|
return [f"{pad}{format_scalar(value)}"]
|
|
|
|
|
|
def dump_yaml(data: Mapping[str, Any]) -> str:
    """Serialise *data* to YAML text that ends with a single newline."""
    body = "\n".join(_yaml_lines(data))
    return f"{body}\n"
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time, to the second, as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now = dt.datetime.now(tz=dt.timezone.utc)
    truncated = now.replace(microsecond=0)
    stamp = truncated.isoformat()
    return stamp.replace("+00:00", "Z")
|
|
|
|
|
|
def sanitize_calendar(version: str, explicit: Optional[str]) -> str:
    """Pick the calendar label for a release.

    A non-empty *explicit* value is returned as is. Otherwise the first two
    digit groups of *version* are joined with a dot (``2025.10.0-edge`` gives
    ``2025.10``). If *version* has fewer than two digit groups, the current UTC
    year and month are used.
    """
    if explicit:
        return explicit
    # Expect version like 2025.10.0-edge or 2.4.1
    digit_groups = re.findall(r"\d+", version)
    if len(digit_groups) < 2:
        return dt.datetime.now(tz=dt.timezone.utc).strftime("%Y.%m")
    major, minor = digit_groups[:2]
    return f"{major}.{minor}"
|
|
|
|
|
|
class ReleaseBuilder:
|
|
def __init__(
    self,
    *,
    repo_root: pathlib.Path,
    config: Mapping[str, Any],
    version: str,
    channel: str,
    calendar: str,
    release_date: str,
    git_sha: str,
    output_dir: pathlib.Path,
    push: bool,
    dry_run: bool,
    registry_override: Optional[str] = None,
    platforms_override: Optional[Sequence[str]] = None,
    skip_signing: bool = False,
    cosign_key_ref: Optional[str] = None,
    cosign_password: Optional[str] = None,
    cosign_identity_token: Optional[str] = None,
    tlog_upload: bool = True,
) -> None:
    """Store the release parameters and create the output directory layout.

    Explicit arguments win over the config file and the environment: the
    registry and platforms fall back to ``config``, and the cosign key, token
    and password fall back to ``COSIGN_*`` environment variables. A fresh
    temporary working directory is created for intermediate files.

    Raises:
        ValueError: if no registry is given by ``registry_override`` or the
            config, or if ``release_date`` is not a valid ISO-8601 timestamp.
    """
    # Plain parameter capture.
    self.repo_root = repo_root
    self.config = config
    self.version = version
    self.channel = channel
    self.calendar = calendar
    self.release_date = release_date
    self.git_sha = git_sha
    self.push = push
    self.dry_run = dry_run
    self.skip_signing = skip_signing
    self.tlog_upload = tlog_upload
    self.output_dir = ensure_directory(output_dir)

    registry = registry_override or config.get("registry")
    self.registry = registry
    if not registry:
        raise ValueError("Config missing 'registry'")

    chosen_platforms = list(platforms_override) if platforms_override else config.get("platforms")
    self.platforms = list(chosen_platforms or ["linux/amd64", "linux/arm64"])

    # fromisoformat is given an explicit offset in place of a trailing "Z".
    released_at = dt.datetime.fromisoformat(release_date.replace("Z", "+00:00"))
    self.source_date_epoch = str(int(released_at.timestamp()))

    # Output layout below the release directory.
    artifacts = ensure_directory(self.output_dir / "artifacts")
    self.artifacts_dir = artifacts
    self.sboms_dir = ensure_directory(artifacts / "sboms")
    self.provenance_dir = ensure_directory(artifacts / "provenance")
    self.signature_dir = ensure_directory(artifacts / "signatures")
    self.metadata_dir = ensure_directory(artifacts / "metadata")
    self.debug_dir = ensure_directory(self.output_dir / "debug")
    self.debug_store_dir = ensure_directory(self.debug_dir / ".build-id")

    # The CLI output directory only exists when the config has a "cli" section.
    self.cli_config = config.get("cli")
    if self.cli_config:
        self.cli_output_dir = ensure_directory(self.output_dir / "cli")
    else:
        self.cli_output_dir = None

    self.temp_dir = pathlib.Path(tempfile.mkdtemp(prefix="stellaops-release-"))

    # Signing settings.
    environ = os.environ
    self.cosign_key_ref = cosign_key_ref or environ.get("COSIGN_KEY_REF")
    self.cosign_identity_token = cosign_identity_token or environ.get("COSIGN_IDENTITY_TOKEN")
    if cosign_password is None:
        password = environ.get("COSIGN_PASSWORD", "")
    else:
        password = cosign_password
    self.cosign_env = {
        "COSIGN_PASSWORD": password,
        "COSIGN_EXPERIMENTAL": "1",
        "COSIGN_ALLOW_HTTP_REGISTRY": environ.get("COSIGN_ALLOW_HTTP_REGISTRY", "1"),
        "COSIGN_DOCKER_MEDIA_TYPES": environ.get("COSIGN_DOCKER_MEDIA_TYPES", "1"),
    }

    # Cache resolved objcopy binaries keyed by machine identifier to avoid repeated lookups.
    self._objcopy_cache: Dict[str, Optional[str]] = {}
    # Per-platform count of binaries skipped because no objcopy tool was found.
    self._missing_symbol_platforms: Dict[str, int] = {}
|
|
|
|
# ----------------
|
|
# Build steps
|
|
# ----------------
|
|
def run(self) -> Dict[str, Any]:
    """Run every pipeline stage in order and return the release manifest.

    Stages: prime the BuildX plugin, build each configured component, package
    the Helm chart, digest the compose files, collect the debug store, build
    the CLI artefacts, then assemble the manifest from all of their results.
    """
    if self.dry_run:
        print("⚠️ Dry-run enabled; commands will be skipped")
    self._prime_buildx_plugin()

    built_components = [
        self._build_component(spec)
        for spec in self.config.get("components", [])
    ]

    helm_meta = self._package_helm()
    compose_meta = self._digest_compose_files()
    debug_meta = self._collect_debug_store(built_components)
    cli_meta = self._build_cli_artifacts()
    return self._compose_manifest(built_components, helm_meta, compose_meta, debug_meta, cli_meta)
|
|
|
|
def _prime_buildx_plugin(self) -> None:
|
|
plugin_cfg = self.config.get("buildxPlugin")
|
|
if not plugin_cfg:
|
|
return
|
|
project = plugin_cfg.get("project")
|
|
if not project:
|
|
return
|
|
out_dir = ensure_directory(self.temp_dir / "buildx")
|
|
if not self.dry_run:
|
|
run([
|
|
"dotnet",
|
|
"publish",
|
|
project,
|
|
"-c",
|
|
"Release",
|
|
"-o",
|
|
str(out_dir),
|
|
])
|
|
cas_dir = ensure_directory(self.temp_dir / "cas")
|
|
run([
|
|
"dotnet",
|
|
str(out_dir / "StellaOps.Scanner.Sbomer.BuildXPlugin.dll"),
|
|
"handshake",
|
|
"--manifest",
|
|
str(out_dir),
|
|
"--cas",
|
|
str(cas_dir),
|
|
])
|
|
|
|
def _component_tags(self, repo: str) -> List[str]:
|
|
base = f"{self.registry}/{repo}"
|
|
tags = [f"{base}:{self.version}"]
|
|
if self.channel:
|
|
tags.append(f"{base}:{self.channel}")
|
|
return tags
|
|
|
|
def _component_ref(self, repo: str, digest: str) -> str:
|
|
return f"{self.registry}/{repo}@{digest}"
|
|
|
|
def _relative_path(self, path: pathlib.Path) -> str:
|
|
try:
|
|
return str(path.relative_to(self.output_dir.parent))
|
|
except ValueError:
|
|
return str(path)
|
|
|
|
def _build_component(self, component: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
name = component["name"]
|
|
repo = component.get("repository", name)
|
|
kind = component.get("kind", "dotnet-service")
|
|
dockerfile = component.get("dockerfile")
|
|
if not dockerfile:
|
|
raise ValueError(f"Component {name} missing dockerfile")
|
|
context = component.get("context", ".")
|
|
iid_file = self.temp_dir / f"{name}.iid"
|
|
metadata_file = self.metadata_dir / f"{name}.metadata.json"
|
|
|
|
build_args = {
|
|
"VERSION": self.version,
|
|
"CHANNEL": self.channel,
|
|
"GIT_SHA": self.git_sha,
|
|
"SOURCE_DATE_EPOCH": self.source_date_epoch,
|
|
}
|
|
docker_cfg = self.config.get("docker", {})
|
|
if kind == "dotnet-service":
|
|
build_args.update({
|
|
"PROJECT": component["project"],
|
|
"ENTRYPOINT_DLL": component["entrypoint"],
|
|
"SDK_IMAGE": docker_cfg.get("sdkImage", "mcr.microsoft.com/dotnet/nightly/sdk:10.0"),
|
|
"RUNTIME_IMAGE": docker_cfg.get("runtimeImage", "gcr.io/distroless/dotnet/aspnet:latest"),
|
|
})
|
|
elif kind == "angular-ui":
|
|
build_args.update({
|
|
"NODE_IMAGE": docker_cfg.get("nodeImage", "node:20.14.0-bookworm"),
|
|
"NGINX_IMAGE": docker_cfg.get("nginxImage", "nginx:1.27-alpine"),
|
|
})
|
|
else:
|
|
raise ValueError(f"Unsupported component kind {kind}")
|
|
|
|
tags = self._component_tags(repo)
|
|
build_cmd = [
|
|
"docker",
|
|
"buildx",
|
|
"build",
|
|
"--file",
|
|
dockerfile,
|
|
"--metadata-file",
|
|
str(metadata_file),
|
|
"--iidfile",
|
|
str(iid_file),
|
|
"--progress",
|
|
"plain",
|
|
"--platform",
|
|
",".join(self.platforms),
|
|
]
|
|
for key, value in build_args.items():
|
|
build_cmd.extend(["--build-arg", f"{key}={value}"])
|
|
for tag in tags:
|
|
build_cmd.extend(["--tag", tag])
|
|
build_cmd.extend([
|
|
"--attest",
|
|
"type=sbom",
|
|
"--attest",
|
|
"type=provenance,mode=max",
|
|
])
|
|
if self.push:
|
|
build_cmd.append("--push")
|
|
else:
|
|
build_cmd.append("--load")
|
|
build_cmd.append(context)
|
|
|
|
if not self.dry_run:
|
|
run(build_cmd, cwd=self.repo_root)
|
|
|
|
digest = iid_file.read_text(encoding="utf-8").strip() if iid_file.exists() else ""
|
|
image_ref = self._component_ref(repo, digest) if digest else ""
|
|
|
|
bundle_info = self._sign_image(name, image_ref, tags)
|
|
sbom_info = self._generate_sbom(name, image_ref)
|
|
provenance_info = self._attach_provenance(name, image_ref)
|
|
|
|
component_entry = OrderedDict()
|
|
component_entry["name"] = name
|
|
if digest:
|
|
component_entry["image"] = image_ref
|
|
component_entry["tags"] = tags
|
|
if sbom_info:
|
|
component_entry["sbom"] = sbom_info
|
|
if provenance_info:
|
|
component_entry["provenance"] = provenance_info
|
|
if bundle_info:
|
|
component_entry["signature"] = bundle_info
|
|
if metadata_file.exists():
|
|
metadata_rel = (
|
|
str(metadata_file.relative_to(self.output_dir.parent))
|
|
if metadata_file.is_relative_to(self.output_dir.parent)
|
|
else str(metadata_file)
|
|
)
|
|
component_entry["metadata"] = OrderedDict((
|
|
("path", metadata_rel),
|
|
("sha256", compute_sha256(metadata_file)),
|
|
))
|
|
return component_entry
|
|
|
|
def _sign_image(self, name: str, image_ref: str, tags: Sequence[str]) -> Optional[Mapping[str, Any]]:
|
|
if self.skip_signing:
|
|
return None
|
|
if not image_ref:
|
|
return None
|
|
if not (self.cosign_key_ref or self.cosign_identity_token):
|
|
raise ValueError("Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass.")
|
|
signature_path = self.signature_dir / f"{name}.signature"
|
|
cmd = ["cosign", "sign", "--yes"]
|
|
if self.cosign_key_ref:
|
|
cmd.extend(["--key", self.cosign_key_ref])
|
|
if self.cosign_identity_token:
|
|
cmd.extend(["--identity-token", self.cosign_identity_token])
|
|
if not self.tlog_upload:
|
|
cmd.append("--tlog-upload=false")
|
|
cmd.append("--allow-http-registry")
|
|
cmd.append(image_ref)
|
|
if self.dry_run:
|
|
return None
|
|
run(cmd, env=self.cosign_env)
|
|
signature_data = run([
|
|
"cosign",
|
|
"download",
|
|
"signature",
|
|
"--allow-http-registry",
|
|
image_ref,
|
|
])
|
|
signature_path.write_text(signature_data, encoding="utf-8")
|
|
signature_sha = compute_sha256(signature_path)
|
|
signature_ref = run([
|
|
"cosign",
|
|
"triangulate",
|
|
"--allow-http-registry",
|
|
image_ref,
|
|
]).strip()
|
|
return OrderedDict(
|
|
(
|
|
("signature", OrderedDict((
|
|
("path", str(signature_path.relative_to(self.output_dir.parent)) if signature_path.is_relative_to(self.output_dir.parent) else str(signature_path)),
|
|
("sha256", signature_sha),
|
|
("ref", signature_ref),
|
|
("tlogUploaded", self.tlog_upload),
|
|
))),
|
|
)
|
|
)
|
|
|
|
def _generate_sbom(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
|
|
if not image_ref or self.dry_run:
|
|
return None
|
|
sbom_path = self.sboms_dir / f"{name}.cyclonedx.json"
|
|
run([
|
|
"docker",
|
|
"sbom",
|
|
image_ref,
|
|
"--format",
|
|
"cyclonedx-json",
|
|
"--output",
|
|
str(sbom_path),
|
|
])
|
|
entry = OrderedDict((
|
|
("path", str(sbom_path.relative_to(self.output_dir.parent)) if sbom_path.is_relative_to(self.output_dir.parent) else str(sbom_path)),
|
|
("sha256", compute_sha256(sbom_path)),
|
|
))
|
|
if self.skip_signing:
|
|
return entry
|
|
attach_cmd = [
|
|
"cosign",
|
|
"attach",
|
|
"sbom",
|
|
"--sbom",
|
|
str(sbom_path),
|
|
"--type",
|
|
"cyclonedx",
|
|
]
|
|
if self.cosign_key_ref:
|
|
attach_cmd.extend(["--key", self.cosign_key_ref])
|
|
attach_cmd.append("--allow-http-registry")
|
|
attach_cmd.append(image_ref)
|
|
run(attach_cmd, env=self.cosign_env)
|
|
reference = run(["cosign", "triangulate", "--type", "sbom", "--allow-http-registry", image_ref]).strip()
|
|
entry["ref"] = reference
|
|
return entry
|
|
|
|
def _attach_provenance(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
|
|
if not image_ref or self.dry_run:
|
|
return None
|
|
predicate = OrderedDict()
|
|
predicate["buildDefinition"] = OrderedDict(
|
|
(
|
|
("buildType", "https://git.stella-ops.org/stellaops/release"),
|
|
("externalParameters", OrderedDict((
|
|
("component", name),
|
|
("version", self.version),
|
|
("channel", self.channel),
|
|
))),
|
|
)
|
|
)
|
|
predicate["runDetails"] = OrderedDict(
|
|
(
|
|
("builder", OrderedDict((("id", "https://github.com/actions"),))),
|
|
("metadata", OrderedDict((("finishedOn", self.release_date),))),
|
|
)
|
|
)
|
|
predicate_path = self.provenance_dir / f"{name}.provenance.json"
|
|
with predicate_path.open("w", encoding="utf-8") as handle:
|
|
json.dump(predicate, handle, indent=2, sort_keys=True)
|
|
handle.write("\n")
|
|
entry = OrderedDict((
|
|
("path", str(predicate_path.relative_to(self.output_dir.parent)) if predicate_path.is_relative_to(self.output_dir.parent) else str(predicate_path)),
|
|
("sha256", compute_sha256(predicate_path)),
|
|
))
|
|
if self.skip_signing:
|
|
return entry
|
|
cmd = [
|
|
"cosign",
|
|
"attest",
|
|
"--predicate",
|
|
str(predicate_path),
|
|
"--type",
|
|
"https://slsa.dev/provenance/v1",
|
|
]
|
|
if self.cosign_key_ref:
|
|
cmd.extend(["--key", self.cosign_key_ref])
|
|
if not self.tlog_upload:
|
|
cmd.append("--tlog-upload=false")
|
|
cmd.append("--allow-http-registry")
|
|
cmd.append(image_ref)
|
|
run(cmd, env=self.cosign_env)
|
|
ref = run([
|
|
"cosign",
|
|
"triangulate",
|
|
"--type",
|
|
"https://slsa.dev/provenance/v1",
|
|
"--allow-http-registry",
|
|
image_ref,
|
|
]).strip()
|
|
entry["ref"] = ref
|
|
return entry
|
|
|
|
def _collect_debug_store(self, components: Sequence[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
|
|
if self.dry_run:
|
|
return None
|
|
debug_records: Dict[Tuple[str, str], OrderedDict[str, Any]] = {}
|
|
for component in components:
|
|
image_ref = component.get("image")
|
|
if not image_ref:
|
|
continue
|
|
name = component.get("name", "unknown")
|
|
entries = self._extract_debug_entries(name, image_ref)
|
|
for entry in entries:
|
|
key = (entry["platform"], entry["buildId"])
|
|
existing = debug_records.get(key)
|
|
if existing is None:
|
|
record = OrderedDict((
|
|
("buildId", entry["buildId"]),
|
|
("platform", entry["platform"]),
|
|
("debugPath", entry["debugPath"]),
|
|
("sha256", entry["sha256"]),
|
|
("size", entry["size"]),
|
|
("components", [entry["component"]]),
|
|
("images", [entry["image"]]),
|
|
("sources", list(entry["sources"])),
|
|
))
|
|
debug_records[key] = record
|
|
else:
|
|
if entry["sha256"] != existing["sha256"]:
|
|
raise RuntimeError(
|
|
f"Build-id {entry['buildId']} for platform {entry['platform']} produced conflicting hashes"
|
|
)
|
|
if entry["component"] not in existing["components"]:
|
|
existing["components"].append(entry["component"])
|
|
if entry["image"] not in existing["images"]:
|
|
existing["images"].append(entry["image"])
|
|
for source in entry["sources"]:
|
|
if source not in existing["sources"]:
|
|
existing["sources"].append(source)
|
|
if not debug_records:
|
|
sys.stderr.write(
|
|
"[error] release build produced no debug artefacts; enable symbol extraction so out/release/debug is populated (DEVOPS-REL-17-004).\n"
|
|
)
|
|
# Remove empty directories before failing
|
|
with contextlib.suppress(FileNotFoundError, OSError):
|
|
if not any(self.debug_store_dir.iterdir()):
|
|
self.debug_store_dir.rmdir()
|
|
with contextlib.suppress(FileNotFoundError, OSError):
|
|
if not any(self.debug_dir.iterdir()):
|
|
self.debug_dir.rmdir()
|
|
raise RuntimeError(
|
|
"Debug store collection produced no build-id artefacts (DEVOPS-REL-17-004)."
|
|
)
|
|
entries = []
|
|
for record in debug_records.values():
|
|
entry = OrderedDict((
|
|
("buildId", record["buildId"]),
|
|
("platform", record["platform"]),
|
|
("debugPath", record["debugPath"]),
|
|
("sha256", record["sha256"]),
|
|
("size", record["size"]),
|
|
("components", sorted(record["components"])),
|
|
("images", sorted(record["images"])),
|
|
("sources", sorted(record["sources"])),
|
|
))
|
|
entries.append(entry)
|
|
entries.sort(key=lambda item: (item["platform"], item["buildId"]))
|
|
manifest_path = self.debug_dir / "debug-manifest.json"
|
|
platform_counts: Dict[str, int] = {}
|
|
for entry in entries:
|
|
platform_counts[entry["platform"]] = platform_counts.get(entry["platform"], 0) + 1
|
|
missing_platforms = [
|
|
platform
|
|
for platform in self._missing_symbol_platforms
|
|
if platform_counts.get(platform, 0) == 0
|
|
]
|
|
if missing_platforms:
|
|
raise RuntimeError(
|
|
"Debug extraction skipped all binaries for platforms without objcopy support: "
|
|
+ ", ".join(sorted(missing_platforms))
|
|
)
|
|
manifest_data = OrderedDict((
|
|
("generatedAt", self.release_date),
|
|
("version", self.version),
|
|
("channel", self.channel),
|
|
("artifacts", entries),
|
|
))
|
|
with manifest_path.open("w", encoding="utf-8") as handle:
|
|
json.dump(manifest_data, handle, indent=2)
|
|
handle.write("\n")
|
|
manifest_sha = compute_sha256(manifest_path)
|
|
sha_path = manifest_path.with_suffix(manifest_path.suffix + ".sha256")
|
|
sha_path.write_text(f"{manifest_sha} {manifest_path.name}\n", encoding="utf-8")
|
|
manifest_rel = manifest_path.relative_to(self.output_dir).as_posix()
|
|
store_rel = self.debug_store_dir.relative_to(self.output_dir).as_posix()
|
|
platforms = sorted({entry["platform"] for entry in entries})
|
|
return OrderedDict((
|
|
("manifest", manifest_rel),
|
|
("sha256", manifest_sha),
|
|
("entries", len(entries)),
|
|
("platforms", platforms),
|
|
("directory", store_rel),
|
|
))
|
|
|
|
# ----------------
|
|
# CLI packaging
|
|
# ----------------
|
|
def _build_cli_artifacts(self) -> List[Mapping[str, Any]]:
|
|
if not self.cli_config or self.dry_run:
|
|
return []
|
|
project_rel = self.cli_config.get("project")
|
|
if not project_rel:
|
|
return []
|
|
project_path = (self.repo_root / project_rel).resolve()
|
|
if not project_path.exists():
|
|
raise FileNotFoundError(f"CLI project not found at {project_path}")
|
|
runtimes: Sequence[str] = self.cli_config.get("runtimes", [])
|
|
if not runtimes:
|
|
runtimes = ("linux-x64",)
|
|
package_prefix = self.cli_config.get("packagePrefix", "stella")
|
|
ensure_directory(self.cli_output_dir or (self.output_dir / "cli"))
|
|
|
|
cli_entries: List[Mapping[str, Any]] = []
|
|
for runtime in runtimes:
|
|
entry = self._build_cli_for_runtime(project_path, runtime, package_prefix)
|
|
cli_entries.append(entry)
|
|
return cli_entries
|
|
|
|
def _build_cli_for_runtime(
    self,
    project_path: pathlib.Path,
    runtime: str,
    package_prefix: str,
) -> Mapping[str, Any]:
    """Publish, package, hash, sign and SBOM the CLI for one *runtime*.

    The project is published as a self-contained single file into a scratch
    directory and the binary is renamed to ``stella`` (``stella.exe`` for
    ``win*`` runtimes). The directory is archived as ``.zip`` for Windows and
    ``.tar.gz`` otherwise, and a ``.sha256`` side file is written. The returned
    entry has ``runtime`` and ``archive``, plus ``sbom`` when one was generated.
    """
    is_windows = runtime.startswith("win")

    publish_dir = ensure_directory(self.temp_dir / f"cli-publish-{runtime}")
    run(
        [
            "dotnet",
            "publish",
            str(project_path),
            "--configuration",
            "Release",
            "--runtime",
            runtime,
            "--self-contained",
            "true",
            "/p:PublishSingleFile=true",
            "/p:IncludeNativeLibrariesForSelfExtract=true",
            "/p:EnableCompressionInSingleFile=true",
            "/p:InvariantGlobalization=true",
            "--output",
            str(publish_dir),
        ],
        cwd=self.repo_root,
    )

    # Rename the published binary to the short name users invoke.
    exe_suffix = ".exe" if is_windows else ""
    source = publish_dir / f"StellaOps.Cli{exe_suffix}"
    target = publish_dir / f"stella{exe_suffix}"
    if source.exists():
        if target.exists():
            target.unlink()
        source.rename(target)
        if not is_windows:
            exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
            target.chmod(target.stat().st_mode | exec_bits)

    package_dir = self.cli_output_dir or (self.output_dir / "cli")
    ensure_directory(package_dir)
    archive_name = f"{package_prefix}-{self.version}-{runtime}"
    if is_windows:
        package_path = package_dir / f"{archive_name}.zip"
        self._archive_zip(publish_dir, package_path)
    else:
        package_path = package_dir / f"{archive_name}.tar.gz"
        self._archive_tar(publish_dir, package_path)

    digest = compute_sha256(package_path)
    sha_path = package_path.with_suffix(package_path.suffix + ".sha256")
    sha_path.write_text(f"{digest} {package_path.name}\n", encoding="utf-8")

    archive_info = OrderedDict()
    archive_info["path"] = self._relative_path(package_path)
    archive_info["sha256"] = digest
    signature_info = self._sign_file(package_path)
    if signature_info:
        archive_info["signature"] = signature_info

    sbom_info = self._generate_cli_sbom(runtime, publish_dir)

    entry = OrderedDict()
    entry["runtime"] = runtime
    entry["archive"] = archive_info
    if sbom_info:
        entry["sbom"] = sbom_info
    return entry
|
|
|
|
def _archive_tar(self, source_dir: pathlib.Path, archive_path: pathlib.Path) -> None:
|
|
with tarfile.open(archive_path, "w:gz") as tar:
|
|
for item in sorted(source_dir.rglob("*")):
|
|
arcname = item.relative_to(source_dir)
|
|
tar.add(item, arcname=arcname)
|
|
|
|
def _archive_zip(self, source_dir: pathlib.Path, archive_path: pathlib.Path) -> None:
|
|
with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
|
|
for item in sorted(source_dir.rglob("*")):
|
|
if item.is_dir():
|
|
continue
|
|
arcname = item.relative_to(source_dir).as_posix()
|
|
zip_info = zipfile.ZipInfo(arcname)
|
|
zip_info.external_attr = (item.stat().st_mode & 0xFFFF) << 16
|
|
with item.open("rb") as handle:
|
|
zipf.writestr(zip_info, handle.read())
|
|
|
|
def _generate_cli_sbom(self, runtime: str, publish_dir: pathlib.Path) -> Optional[Mapping[str, Any]]:
|
|
if self.dry_run:
|
|
return None
|
|
sbom_dir = ensure_directory(self.sboms_dir / "cli")
|
|
sbom_path = sbom_dir / f"cli-{runtime}.cyclonedx.json"
|
|
run([
|
|
"syft",
|
|
f"dir:{publish_dir}",
|
|
"--output",
|
|
f"cyclonedx-json={sbom_path}",
|
|
])
|
|
entry = OrderedDict((
|
|
("path", self._relative_path(sbom_path)),
|
|
("sha256", compute_sha256(sbom_path)),
|
|
))
|
|
signature_info = self._sign_file(sbom_path)
|
|
if signature_info:
|
|
entry["signature"] = signature_info
|
|
return entry
|
|
|
|
def _sign_file(self, path: pathlib.Path) -> Optional[Mapping[str, Any]]:
|
|
if self.skip_signing:
|
|
return None
|
|
if not (self.cosign_key_ref or self.cosign_identity_token):
|
|
raise ValueError(
|
|
"Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass."
|
|
)
|
|
signature_path = path.with_suffix(path.suffix + ".sig")
|
|
sha_path = path.with_suffix(path.suffix + ".sha256")
|
|
digest = compute_sha256(path)
|
|
sha_path.write_text(f"{digest} {path.name}\n", encoding="utf-8")
|
|
cmd = ["cosign", "sign-blob", "--yes", str(path)]
|
|
if self.cosign_key_ref:
|
|
cmd.extend(["--key", self.cosign_key_ref])
|
|
if self.cosign_identity_token:
|
|
cmd.extend(["--identity-token", self.cosign_identity_token])
|
|
if not self.tlog_upload:
|
|
cmd.append("--tlog-upload=false")
|
|
signature_data = run(cmd, env=self.cosign_env).strip()
|
|
signature_path.write_text(signature_data + "\n", encoding="utf-8")
|
|
return OrderedDict((
|
|
("path", self._relative_path(signature_path)),
|
|
("sha256", compute_sha256(signature_path)),
|
|
("tlogUploaded", self.tlog_upload),
|
|
))
|
|
|
|
def _extract_debug_entries(self, component_name: str, image_ref: str) -> List[OrderedDict[str, Any]]:
|
|
if self.dry_run:
|
|
return []
|
|
entries: List[OrderedDict[str, Any]] = []
|
|
platforms = self.platforms if self.push else [None]
|
|
for platform in platforms:
|
|
platform_label = platform or (self.platforms[0] if self.platforms else "linux/amd64")
|
|
if self.push:
|
|
pull_cmd = ["docker", "pull"]
|
|
if platform:
|
|
pull_cmd.extend(["--platform", platform])
|
|
pull_cmd.append(image_ref)
|
|
run(pull_cmd)
|
|
create_cmd = ["docker", "create"]
|
|
if platform:
|
|
create_cmd.extend(["--platform", platform])
|
|
create_cmd.append(image_ref)
|
|
container_id = run(create_cmd).strip()
|
|
export_path = self.temp_dir / f"{container_id}.tar"
|
|
try:
|
|
run(["docker", "export", container_id, "-o", str(export_path)], capture=False)
|
|
finally:
|
|
run(["docker", "rm", container_id], capture=False)
|
|
rootfs_dir = ensure_directory(self.temp_dir / f"{component_name}-{platform_label}-{uuid.uuid4().hex}")
|
|
try:
|
|
with tarfile.open(export_path, "r:*") as tar:
|
|
self._safe_extract_tar(tar, rootfs_dir)
|
|
finally:
|
|
export_path.unlink(missing_ok=True)
|
|
try:
|
|
for file_path in rootfs_dir.rglob("*"):
|
|
if not file_path.is_file() or file_path.is_symlink():
|
|
continue
|
|
if not self._is_elf(file_path):
|
|
continue
|
|
build_id, machine = self._read_build_id_and_machine(file_path)
|
|
if not build_id:
|
|
continue
|
|
debug_file = self._debug_file_for_build_id(build_id)
|
|
if not debug_file.exists():
|
|
debug_file.parent.mkdir(parents=True, exist_ok=True)
|
|
temp_debug = self.temp_dir / f"{build_id}.debug"
|
|
with contextlib.suppress(FileNotFoundError):
|
|
temp_debug.unlink()
|
|
objcopy_tool = self._resolve_objcopy_tool(machine)
|
|
if not objcopy_tool:
|
|
self._emit_objcopy_warning(machine, platform_label, file_path)
|
|
with contextlib.suppress(FileNotFoundError):
|
|
temp_debug.unlink()
|
|
continue
|
|
try:
|
|
run([objcopy_tool, "--only-keep-debug", str(file_path), str(temp_debug)], capture=False)
|
|
except CommandError:
|
|
with contextlib.suppress(FileNotFoundError):
|
|
temp_debug.unlink()
|
|
continue
|
|
debug_file.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.move(str(temp_debug), str(debug_file))
|
|
sha = compute_sha256(debug_file)
|
|
rel_debug = debug_file.relative_to(self.output_dir).as_posix()
|
|
source_rel = file_path.relative_to(rootfs_dir).as_posix()
|
|
entry = OrderedDict((
|
|
("component", component_name),
|
|
("image", image_ref),
|
|
("platform", platform_label),
|
|
("buildId", build_id),
|
|
("debugPath", rel_debug),
|
|
("sha256", sha),
|
|
("size", debug_file.stat().st_size),
|
|
("sources", [source_rel]),
|
|
))
|
|
entries.append(entry)
|
|
finally:
|
|
shutil.rmtree(rootfs_dir, ignore_errors=True)
|
|
return entries
|
|
|
|
def _debug_file_for_build_id(self, build_id: str) -> pathlib.Path:
|
|
normalized = build_id.lower()
|
|
prefix = normalized[:2]
|
|
remainder = normalized[2:]
|
|
return self.debug_store_dir / prefix / f"{remainder}.debug"
|
|
|
|
@staticmethod
|
|
def _safe_extract_tar(tar: tarfile.TarFile, dest: pathlib.Path) -> None:
|
|
dest_root = dest.resolve()
|
|
members = tar.getmembers()
|
|
for member in members:
|
|
member_path = (dest / member.name).resolve()
|
|
if not str(member_path).startswith(str(dest_root)):
|
|
raise RuntimeError(f"Refusing to extract '{member.name}' outside of destination directory")
|
|
tar.extractall(dest)
|
|
|
|
@staticmethod
|
|
def _is_elf(path: pathlib.Path) -> bool:
|
|
try:
|
|
with path.open("rb") as handle:
|
|
return handle.read(4) == b"\x7fELF"
|
|
except OSError:
|
|
return False
|
|
|
|
def _read_build_id_and_machine(self, path: pathlib.Path) -> Tuple[Optional[str], Optional[str]]:
    """Read the build-id and machine type of the ELF file at *path* using ``readelf -nh``.

    Returns ``(build_id, machine)``. The build-id is lower-cased, either value
    is ``None`` when its line is absent, and both are ``None`` when readelf
    fails. If a label appears more than once, the last occurrence wins.
    """
    try:
        report = run(["readelf", "-nh", str(path)])
    except CommandError:
        return None, None

    build_id: Optional[str] = None
    machine: Optional[str] = None
    for raw_line in report.splitlines():
        label, colon, rest = raw_line.strip().partition(":")
        if not colon:
            continue
        if label == "Build ID":
            build_id = rest.strip().lower()
        elif label == "Machine":
            machine = rest.strip()
    return build_id, machine
|
|
|
|
def _resolve_objcopy_tool(self, machine: Optional[str]) -> Optional[str]:
|
|
key = (machine or "generic").lower()
|
|
if key in self._objcopy_cache:
|
|
return self._objcopy_cache[key]
|
|
|
|
env_override = None
|
|
if machine and "aarch64" in machine.lower():
|
|
env_override = os.environ.get("STELLAOPS_OBJCOPY_AARCH64")
|
|
candidates = [
|
|
env_override,
|
|
"aarch64-linux-gnu-objcopy",
|
|
"llvm-objcopy",
|
|
"objcopy",
|
|
]
|
|
elif machine and any(token in machine.lower() for token in ("x86-64", "amd", "x86_64")):
|
|
env_override = os.environ.get("STELLAOPS_OBJCOPY_AMD64")
|
|
candidates = [
|
|
env_override,
|
|
"objcopy",
|
|
"llvm-objcopy",
|
|
]
|
|
else:
|
|
env_override = os.environ.get("STELLAOPS_OBJCOPY_DEFAULT")
|
|
candidates = [
|
|
env_override,
|
|
"objcopy",
|
|
"llvm-objcopy",
|
|
]
|
|
|
|
for candidate in candidates:
|
|
if not candidate:
|
|
continue
|
|
tool = shutil.which(candidate)
|
|
if tool:
|
|
self._objcopy_cache[key] = tool
|
|
return tool
|
|
self._objcopy_cache[key] = None
|
|
return None
|
|
|
|
def _emit_objcopy_warning(self, machine: Optional[str], platform: str, file_path: pathlib.Path) -> None:
|
|
machine_label = machine or "unknown-machine"
|
|
count = self._missing_symbol_platforms.get(platform, 0)
|
|
self._missing_symbol_platforms[platform] = count + 1
|
|
if count == 0:
|
|
sys.stderr.write(
|
|
f"[warn] no objcopy tool available for {machine_label}; skipping debug extraction for {file_path}.\n"
|
|
)
|
|
|
|
# ----------------
|
|
# Helm + compose
|
|
# ----------------
|
|
def _package_helm(self) -> Optional[Mapping[str, Any]]:
    """Package the configured Helm chart and describe it for the manifest.

    Reads ``helm.chartPath`` from ``self.config`` and returns ``None`` when
    the ``helm`` section or its ``chartPath`` is missing or empty. Unless
    ``self.dry_run`` is set, ``helm package`` is run with ``--version`` and
    ``--app-version`` both set to ``self.version``, writing into
    ``<output_dir>/helm``; the archive it produces is then normalised to
    ``stellaops-<version>.tgz``.

    Returns:
        An ordered mapping with ``name``, ``version``, ``path`` and
        ``sha256``. ``sha256`` is ``None`` when the archive does not exist
        (for example in dry-run mode).

    Raises:
        CommandError: propagated from ``run`` when ``helm package`` fails.
    """
    helm_cfg = self.config.get("helm")
    if not helm_cfg:
        return None
    chart_path = helm_cfg.get("chartPath")
    if not chart_path:
        return None

    chart_dir = self.repo_root / chart_path
    output_dir = ensure_directory(self.output_dir / "helm")
    archive_path = output_dir / f"stellaops-{self.version}.tgz"

    if not self.dry_run:
        # Snapshot the archives already present so that only the one helm
        # writes now is considered below. Picking an arbitrary ``*.tgz`` could
        # rename a stale package from an earlier run over the fresh archive.
        existing = {
            candidate: candidate.stat().st_mtime_ns
            for candidate in output_dir.glob("*.tgz")
        }
        run([
            "helm",
            "package",
            str(chart_dir),
            "--destination",
            str(output_dir),
            "--version",
            self.version,
            "--app-version",
            self.version,
        ])
        produced = [
            candidate
            for candidate in output_dir.glob("*.tgz")
            if existing.get(candidate) != candidate.stat().st_mtime_ns
        ]
        # helm names the file after the chart itself, so it may not match
        # archive_path; normalise the name only when that is the case.
        if produced and archive_path not in produced:
            newest = max(produced, key=lambda candidate: candidate.stat().st_mtime_ns)
            # replace() overwrites an existing target on every platform,
            # whereas rename() fails on Windows when the target exists.
            newest.replace(archive_path)

    archive_exists = archive_path.exists()
    digest = compute_sha256(archive_path) if archive_exists else None

    # as_posix() keeps the manifest path in the same forward-slash form as the
    # literal fallback below, independent of the host platform.
    if archive_exists and archive_path.is_relative_to(self.output_dir):
        manifest_path = archive_path.relative_to(self.output_dir).as_posix()
    elif archive_exists and archive_path.is_relative_to(self.output_dir.parent):
        manifest_path = archive_path.relative_to(self.output_dir.parent).as_posix()
    else:
        manifest_path = f"helm/{archive_path.name}"

    return OrderedDict((
        ("name", "stellaops"),
        ("version", self.version),
        ("path", manifest_path),
        ("sha256", digest),
    ))
|
|
|
|
def _digest_compose_files(self) -> List[Mapping[str, Any]]:
|
|
compose_cfg = self.config.get("compose", {})
|
|
files = compose_cfg.get("files", [])
|
|
entries: List[Mapping[str, Any]] = []
|
|
for rel_path in files:
|
|
src = self.repo_root / rel_path
|
|
if not src.exists():
|
|
continue
|
|
digest = compute_sha256(src)
|
|
entries.append(OrderedDict((
|
|
("name", pathlib.Path(rel_path).name),
|
|
("path", rel_path),
|
|
("sha256", digest),
|
|
)))
|
|
return entries
|
|
|
|
# ----------------
|
|
# Manifest assembly
|
|
# ----------------
|
|
def _compose_manifest(
|
|
self,
|
|
components: List[Mapping[str, Any]],
|
|
helm_meta: Optional[Mapping[str, Any]],
|
|
compose_meta: List[Mapping[str, Any]],
|
|
debug_meta: Optional[Mapping[str, Any]],
|
|
cli_meta: Sequence[Mapping[str, Any]],
|
|
) -> Dict[str, Any]:
|
|
manifest = OrderedDict()
|
|
manifest["release"] = OrderedDict((
|
|
("version", self.version),
|
|
("channel", self.channel),
|
|
("date", self.release_date),
|
|
("calendar", self.calendar),
|
|
))
|
|
manifest["components"] = components
|
|
if helm_meta:
|
|
manifest["charts"] = [helm_meta]
|
|
if compose_meta:
|
|
manifest["compose"] = compose_meta
|
|
if debug_meta:
|
|
manifest["debugStore"] = debug_meta
|
|
if cli_meta:
|
|
manifest["cli"] = list(cli_meta)
|
|
return manifest
|
|
|
|
|
|
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the release helper's command-line options.

    ``argv`` is handed straight to ``ArgumentParser.parse_args``; with
    ``None`` argparse falls back to the process arguments. ``--version`` and
    ``--channel`` are mandatory. The ``--git-sha`` default is read from the
    ``GIT_COMMIT`` environment variable at call time, falling back to
    ``"unknown"``.
    """
    # One (flag, add_argument keyword arguments) pair per option, in the order
    # they are registered with the parser.
    option_specs = (
        ("--config", {"type": pathlib.Path, "default": DEFAULT_CONFIG, "help": "Path to release config JSON"}),
        ("--version", {"required": True, "help": "Release version string (e.g. 2025.10.0-edge)"}),
        ("--channel", {"required": True, "help": "Release channel (edge|stable|lts)"}),
        ("--calendar", {"help": "Calendar tag (YYYY.MM); defaults derived from version"}),
        ("--git-sha", {"default": os.environ.get("GIT_COMMIT", "unknown"), "help": "Git revision to embed"}),
        ("--output", {"type": pathlib.Path, "default": REPO_ROOT / "out/release", "help": "Output directory for artefacts"}),
        ("--no-push", {"action": "store_true", "help": "Do not push images (use docker load)"}),
        ("--dry-run", {"action": "store_true", "help": "Print steps without executing commands"}),
        ("--registry", {"help": "Override registry root (e.g. localhost:5000/stellaops)"}),
        ("--platform", {"dest": "platforms", "action": "append", "metavar": "PLATFORM", "help": "Override build platforms (repeatable)"}),
        ("--skip-signing", {"action": "store_true", "help": "Skip cosign signing/attestation steps"}),
        ("--cosign-key", {"dest": "cosign_key", "help": "Override COSIGN_KEY_REF value"}),
        ("--cosign-password", {"dest": "cosign_password", "help": "Password for cosign key"}),
        ("--cosign-identity-token", {"dest": "cosign_identity_token", "help": "Identity token for keyless cosign flows"}),
        ("--no-transparency", {"action": "store_true", "help": "Disable Rekor transparency log upload during signing"}),
    )

    parser = argparse.ArgumentParser(description="Build StellaOps release artefacts deterministically")
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args(argv)
|
|
|
|
|
|
def write_manifest(manifest: Mapping[str, Any], output_dir: pathlib.Path) -> pathlib.Path:
    """Write ``release.yaml`` and ``release.json`` plus ``.sha256`` sidecars.

    The SHA-256 of the YAML rendering *without* a ``checksums`` section is
    embedded as ``checksums.sha256`` in both emitted documents. Each sidecar
    holds the digest of the corresponding file as written to disk, followed
    by the file name. The caller's mapping is copied, not modified.

    Returns the path of the written ``release.yaml``.
    """

    def write_sidecar(target: pathlib.Path) -> None:
        # Digest the file as it landed on disk and store it next to it.
        file_digest = compute_sha256(target)
        sidecar = target.with_name(target.name + ".sha256")
        sidecar.write_text(f"{file_digest} {target.name}\n", encoding="utf-8")

    # Work on a shallow copy so the input never gains a "checksums" key.
    document = OrderedDict(manifest)
    unsigned_yaml = dump_yaml(document)
    content_digest = hashlib.sha256(unsigned_yaml.encode("utf-8")).hexdigest()
    document["checksums"] = OrderedDict(sha256=content_digest)

    yaml_path = output_dir / "release.yaml"
    yaml_path.write_text(dump_yaml(document), encoding="utf-8")
    write_sidecar(yaml_path)

    json_path = output_dir / "release.json"
    json_path.write_text(json.dumps(document, indent=2) + "\n", encoding="utf-8")
    write_sidecar(json_path)

    return yaml_path
|
|
|
|
|
|
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the release build end to end and report the manifest location.

    Parses the CLI arguments, loads the JSON config, constructs a
    ``ReleaseBuilder`` from them, runs it and writes the resulting manifest
    into the builder's output directory. Returns ``0`` once the manifest path
    has been printed.
    """
    args = parse_args(argv)
    loaded_config = load_json_config(args.config)
    stamped_at = utc_now_iso()
    calendar_tag = sanitize_calendar(args.version, args.calendar)

    # The negative CLI switches (--no-push, --no-transparency) are inverted
    # into the positive flags the builder takes.
    builder_options = {
        "repo_root": REPO_ROOT,
        "config": loaded_config,
        "version": args.version,
        "channel": args.channel,
        "calendar": calendar_tag,
        "release_date": stamped_at,
        "git_sha": args.git_sha,
        "output_dir": args.output,
        "push": not args.no_push,
        "dry_run": args.dry_run,
        "registry_override": args.registry,
        "platforms_override": args.platforms,
        "skip_signing": args.skip_signing,
        "cosign_key_ref": args.cosign_key,
        "cosign_password": args.cosign_password,
        "cosign_identity_token": args.cosign_identity_token,
        "tlog_upload": not args.no_transparency,
    }
    builder = ReleaseBuilder(**builder_options)

    manifest_path = write_manifest(builder.run(), builder.output_dir)
    print(f"✅ Release manifest written to {manifest_path}")
    return 0
|
|
|
|
|
|
# Script entry point: exit the process with the status code main() returns.
if __name__ == "__main__":
    sys.exit(main())
|