git.stella-ops.org/ops/devops/release/build_release.py
#!/usr/bin/env python3
"""Deterministic release pipeline helper for StellaOps.
This script builds service containers, generates SBOM and provenance artefacts,
signs them with cosign, and writes a channel-specific release manifest.
The workflow expects external tooling to be available on PATH:
- docker (with buildx)
- cosign
- helm
- npm / node (for the UI build)
- dotnet SDK (for BuildX plugin publication)
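
Example invocation (illustrative only; see parse_args below for the full flag set):

    ./ops/devops/release/build_release.py \
        --version 2025.10.0-edge --channel edge \
        --registry localhost:5000/stellaops --no-push --skip-signing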
"""
from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import json
import os
import pathlib
import re
import shlex
import subprocess
import sys
import tempfile
from collections import OrderedDict
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Sequence

REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
DEFAULT_CONFIG = REPO_ROOT / "ops/devops/release/components.json"


class CommandError(RuntimeError):
    pass


def run(cmd: Sequence[str], *, cwd: Optional[pathlib.Path] = None, env: Optional[Mapping[str, str]] = None, capture: bool = True) -> str:
    """Run a subprocess command, returning stdout (text)."""
    process_env = os.environ.copy()
    if env:
        process_env.update(env)
    result = subprocess.run(
        list(cmd),
        cwd=str(cwd) if cwd else None,
        env=process_env,
        check=False,
        capture_output=capture,
        text=True,
    )
    if process_env.get("STELLAOPS_RELEASE_DEBUG"):
        sys.stderr.write(f"[debug] {' '.join(shlex.quote(c) for c in cmd)}\n")
        if capture:
            sys.stderr.write(result.stdout)
            sys.stderr.write(result.stderr)
    if result.returncode != 0:
        stdout = result.stdout if capture else ""
        stderr = result.stderr if capture else ""
        raise CommandError(f"Command failed ({result.returncode}): {' '.join(cmd)}\nSTDOUT:\n{stdout}\nSTDERR:\n{stderr}")
    return result.stdout if capture else ""


def load_json_config(path: pathlib.Path) -> Dict[str, Any]:
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def ensure_directory(path: pathlib.Path) -> pathlib.Path:
    path.mkdir(parents=True, exist_ok=True)
    return path


def compute_sha256(path: pathlib.Path) -> str:
    sha = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            sha.update(chunk)
    return sha.hexdigest()


def format_scalar(value: Any) -> str:
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    if text == "":
        return '""'
    if re.search(r"[\s:#\-\[\]\{\}]", text):
        return json.dumps(text, ensure_ascii=False)
    return text


def _yaml_lines(value: Any, indent: int = 0) -> List[str]:
    pad = " " * indent
    if isinstance(value, Mapping):
        lines: List[str] = []
        for key, val in value.items():
            if isinstance(val, (Mapping, list)):
                lines.append(f"{pad}{key}:")
                lines.extend(_yaml_lines(val, indent + 1))
            else:
                lines.append(f"{pad}{key}: {format_scalar(val)}")
        if not lines:
            lines.append(f"{pad}{{}}")
        return lines
    if isinstance(value, list):
        lines = []
        if not value:
            lines.append(f"{pad}[]")
            return lines
        for item in value:
            if isinstance(item, (Mapping, list)):
                lines.append(f"{pad}-")
                lines.extend(_yaml_lines(item, indent + 1))
            else:
                lines.append(f"{pad}- {format_scalar(item)}")
        return lines
    return [f"{pad}{format_scalar(value)}"]


def dump_yaml(data: Mapping[str, Any]) -> str:
    lines: List[str] = _yaml_lines(data)
    return "\n".join(lines) + "\n"


def utc_now_iso() -> str:
    return dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def sanitize_calendar(version: str, explicit: Optional[str]) -> str:
    if explicit:
        return explicit
    # Expect version like 2025.10.0-edge or 2.4.1
    parts = re.findall(r"\d+", version)
    if len(parts) >= 2:
        return f"{parts[0]}.{parts[1]}"
    return dt.datetime.now(tz=dt.timezone.utc).strftime("%Y.%m")


class ReleaseBuilder:
    def __init__(
        self,
        *,
        repo_root: pathlib.Path,
        config: Mapping[str, Any],
        version: str,
        channel: str,
        calendar: str,
        release_date: str,
        git_sha: str,
        output_dir: pathlib.Path,
        push: bool,
        dry_run: bool,
        registry_override: Optional[str] = None,
        platforms_override: Optional[Sequence[str]] = None,
        skip_signing: bool = False,
        cosign_key_ref: Optional[str] = None,
        cosign_password: Optional[str] = None,
        cosign_identity_token: Optional[str] = None,
        tlog_upload: bool = True,
    ) -> None:
        self.repo_root = repo_root
        self.config = config
        self.version = version
        self.channel = channel
        self.calendar = calendar
        self.release_date = release_date
        self.git_sha = git_sha
        self.output_dir = ensure_directory(output_dir)
        self.push = push
        self.dry_run = dry_run
        self.registry = registry_override or config.get("registry")
        if not self.registry:
            raise ValueError("Config missing 'registry'")
        platforms = list(platforms_override) if platforms_override else config.get("platforms")
        if not platforms:
            platforms = ["linux/amd64", "linux/arm64"]
        self.platforms = list(platforms)
        self.source_date_epoch = str(int(dt.datetime.fromisoformat(release_date.replace("Z", "+00:00")).timestamp()))
        self.artifacts_dir = ensure_directory(self.output_dir / "artifacts")
        self.sboms_dir = ensure_directory(self.artifacts_dir / "sboms")
        self.provenance_dir = ensure_directory(self.artifacts_dir / "provenance")
        self.signature_dir = ensure_directory(self.artifacts_dir / "signatures")
        self.metadata_dir = ensure_directory(self.artifacts_dir / "metadata")
        self.temp_dir = pathlib.Path(tempfile.mkdtemp(prefix="stellaops-release-"))
        self.skip_signing = skip_signing
        self.tlog_upload = tlog_upload
        self.cosign_key_ref = cosign_key_ref or os.environ.get("COSIGN_KEY_REF")
        self.cosign_identity_token = cosign_identity_token or os.environ.get("COSIGN_IDENTITY_TOKEN")
        password = cosign_password if cosign_password is not None else os.environ.get("COSIGN_PASSWORD", "")
        self.cosign_env = {
            "COSIGN_PASSWORD": password,
            "COSIGN_EXPERIMENTAL": "1",
            "COSIGN_ALLOW_HTTP_REGISTRY": os.environ.get("COSIGN_ALLOW_HTTP_REGISTRY", "1"),
            "COSIGN_DOCKER_MEDIA_TYPES": os.environ.get("COSIGN_DOCKER_MEDIA_TYPES", "1"),
        }

    # ----------------
    # Build steps
    # ----------------
    def run(self) -> Dict[str, Any]:
        components_result = []
        if self.dry_run:
            print("⚠️ Dry-run enabled; commands will be skipped")
        self._prime_buildx_plugin()
        for component in self.config.get("components", []):
            result = self._build_component(component)
            components_result.append(result)
        helm_meta = self._package_helm()
        compose_meta = self._digest_compose_files()
        manifest = self._compose_manifest(components_result, helm_meta, compose_meta)
        return manifest

    def _prime_buildx_plugin(self) -> None:
        plugin_cfg = self.config.get("buildxPlugin")
        if not plugin_cfg:
            return
        project = plugin_cfg.get("project")
        if not project:
            return
        out_dir = ensure_directory(self.temp_dir / "buildx")
        if not self.dry_run:
            run([
                "dotnet",
                "publish",
                project,
                "-c",
                "Release",
                "-o",
                str(out_dir),
            ])
            cas_dir = ensure_directory(self.temp_dir / "cas")
            run([
                "dotnet",
                str(out_dir / "StellaOps.Scanner.Sbomer.BuildXPlugin.dll"),
                "handshake",
                "--manifest",
                str(out_dir),
                "--cas",
                str(cas_dir),
            ])

    def _component_tags(self, repo: str) -> List[str]:
        base = f"{self.registry}/{repo}"
        tags = [f"{base}:{self.version}"]
        if self.channel:
            tags.append(f"{base}:{self.channel}")
        return tags

    def _component_ref(self, repo: str, digest: str) -> str:
        return f"{self.registry}/{repo}@{digest}"

    def _build_component(self, component: Mapping[str, Any]) -> Mapping[str, Any]:
        name = component["name"]
        repo = component.get("repository", name)
        kind = component.get("kind", "dotnet-service")
        dockerfile = component.get("dockerfile")
        if not dockerfile:
            raise ValueError(f"Component {name} missing dockerfile")
        context = component.get("context", ".")
        iid_file = self.temp_dir / f"{name}.iid"
        metadata_file = self.metadata_dir / f"{name}.metadata.json"
        build_args = {
            "VERSION": self.version,
            "CHANNEL": self.channel,
            "GIT_SHA": self.git_sha,
            "SOURCE_DATE_EPOCH": self.source_date_epoch,
        }
        docker_cfg = self.config.get("docker", {})
        if kind == "dotnet-service":
            build_args.update({
                "PROJECT": component["project"],
                "ENTRYPOINT_DLL": component["entrypoint"],
                "SDK_IMAGE": docker_cfg.get("sdkImage", "mcr.microsoft.com/dotnet/nightly/sdk:10.0"),
                "RUNTIME_IMAGE": docker_cfg.get("runtimeImage", "gcr.io/distroless/dotnet/aspnet:latest"),
            })
        elif kind == "angular-ui":
            build_args.update({
                "NODE_IMAGE": docker_cfg.get("nodeImage", "node:20.14.0-bookworm"),
                "NGINX_IMAGE": docker_cfg.get("nginxImage", "nginx:1.27-alpine"),
            })
        else:
            raise ValueError(f"Unsupported component kind {kind}")
        tags = self._component_tags(repo)
        build_cmd = [
            "docker",
            "buildx",
            "build",
            "--file",
            dockerfile,
            "--metadata-file",
            str(metadata_file),
            "--iidfile",
            str(iid_file),
            "--progress",
            "plain",
            "--platform",
            ",".join(self.platforms),
        ]
        for key, value in build_args.items():
            build_cmd.extend(["--build-arg", f"{key}={value}"])
        for tag in tags:
            build_cmd.extend(["--tag", tag])
        build_cmd.extend([
            "--attest",
            "type=sbom",
            "--attest",
            "type=provenance,mode=max",
        ])
        if self.push:
            build_cmd.append("--push")
        else:
            build_cmd.append("--load")
        build_cmd.append(context)
        if not self.dry_run:
            run(build_cmd, cwd=self.repo_root)
        digest = iid_file.read_text(encoding="utf-8").strip() if iid_file.exists() else ""
        image_ref = self._component_ref(repo, digest) if digest else ""
        bundle_info = self._sign_image(name, image_ref, tags)
        sbom_info = self._generate_sbom(name, image_ref)
        provenance_info = self._attach_provenance(name, image_ref)
        component_entry = OrderedDict()
        component_entry["name"] = name
        if digest:
            component_entry["image"] = image_ref
        component_entry["tags"] = tags
        if sbom_info:
            component_entry["sbom"] = sbom_info
        if provenance_info:
            component_entry["provenance"] = provenance_info
        if bundle_info:
            component_entry["signature"] = bundle_info
        if metadata_file.exists():
            component_entry["metadata"] = str(metadata_file.relative_to(self.output_dir.parent)) if metadata_file.is_relative_to(self.output_dir.parent) else str(metadata_file)
        return component_entry

    def _sign_image(self, name: str, image_ref: str, tags: Sequence[str]) -> Optional[Mapping[str, Any]]:
        if self.skip_signing:
            return None
        if not image_ref:
            return None
        if not (self.cosign_key_ref or self.cosign_identity_token):
            raise ValueError("Signing requested but no cosign key or identity token provided. Use --skip-signing to bypass.")
        signature_path = self.signature_dir / f"{name}.signature"
        cmd = ["cosign", "sign", "--yes"]
        if self.cosign_key_ref:
            cmd.extend(["--key", self.cosign_key_ref])
        if self.cosign_identity_token:
            cmd.extend(["--identity-token", self.cosign_identity_token])
        if not self.tlog_upload:
            cmd.append("--tlog-upload=false")
        cmd.append("--allow-http-registry")
        cmd.append(image_ref)
        if self.dry_run:
            return None
        run(cmd, env=self.cosign_env)
        signature_data = run([
            "cosign",
            "download",
            "signature",
            "--allow-http-registry",
            image_ref,
        ])
        signature_path.write_text(signature_data, encoding="utf-8")
        signature_ref = run([
            "cosign",
            "triangulate",
            "--allow-http-registry",
            image_ref,
        ]).strip()
        return OrderedDict(
            (
                ("signature", OrderedDict((
                    ("path", str(signature_path.relative_to(self.output_dir.parent)) if signature_path.is_relative_to(self.output_dir.parent) else str(signature_path)),
                    ("ref", signature_ref),
                    ("tlogUploaded", self.tlog_upload),
                ))),
            )
        )

    def _generate_sbom(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
        if not image_ref or self.dry_run:
            return None
        sbom_path = self.sboms_dir / f"{name}.cyclonedx.json"
        run([
            "docker",
            "sbom",
            image_ref,
            "--format",
            "cyclonedx-json",
            "--output",
            str(sbom_path),
        ])
        entry = OrderedDict((
            ("path", str(sbom_path.relative_to(self.output_dir.parent)) if sbom_path.is_relative_to(self.output_dir.parent) else str(sbom_path)),
            ("sha256", compute_sha256(sbom_path)),
        ))
        if self.skip_signing:
            return entry
        attach_cmd = [
            "cosign",
            "attach",
            "sbom",
            "--sbom",
            str(sbom_path),
            "--type",
            "cyclonedx",
        ]
        if self.cosign_key_ref:
            attach_cmd.extend(["--key", self.cosign_key_ref])
        attach_cmd.append("--allow-http-registry")
        attach_cmd.append(image_ref)
        run(attach_cmd, env=self.cosign_env)
        reference = run(["cosign", "triangulate", "--type", "sbom", "--allow-http-registry", image_ref]).strip()
        entry["ref"] = reference
        return entry

    def _attach_provenance(self, name: str, image_ref: str) -> Optional[Mapping[str, Any]]:
        if not image_ref or self.dry_run:
            return None
        predicate = OrderedDict()
        predicate["buildDefinition"] = OrderedDict(
            (
                ("buildType", "https://git.stella-ops.org/stellaops/release"),
                ("externalParameters", OrderedDict((
                    ("component", name),
                    ("version", self.version),
                    ("channel", self.channel),
                ))),
            )
        )
        predicate["runDetails"] = OrderedDict(
            (
                ("builder", OrderedDict((("id", "https://github.com/actions"),))),
                ("metadata", OrderedDict((("finishedOn", self.release_date),))),
            )
        )
        predicate_path = self.provenance_dir / f"{name}.provenance.json"
        with predicate_path.open("w", encoding="utf-8") as handle:
            json.dump(predicate, handle, indent=2, sort_keys=True)
            handle.write("\n")
        entry = OrderedDict((
            ("path", str(predicate_path.relative_to(self.output_dir.parent)) if predicate_path.is_relative_to(self.output_dir.parent) else str(predicate_path)),
            ("sha256", compute_sha256(predicate_path)),
        ))
        if self.skip_signing:
            return entry
        cmd = [
            "cosign",
            "attest",
            "--predicate",
            str(predicate_path),
            "--type",
            "https://slsa.dev/provenance/v1",
        ]
        if self.cosign_key_ref:
            cmd.extend(["--key", self.cosign_key_ref])
        if not self.tlog_upload:
            cmd.append("--tlog-upload=false")
        cmd.append("--allow-http-registry")
        cmd.append(image_ref)
        run(cmd, env=self.cosign_env)
        ref = run([
            "cosign",
            "triangulate",
            "--type",
            "https://slsa.dev/provenance/v1",
            "--allow-http-registry",
            image_ref,
        ]).strip()
        entry["ref"] = ref
        return entry

    # ----------------
    # Helm + compose
    # ----------------
    def _package_helm(self) -> Optional[Mapping[str, Any]]:
        helm_cfg = self.config.get("helm")
        if not helm_cfg:
            return None
        chart_path = helm_cfg.get("chartPath")
        if not chart_path:
            return None
        chart_dir = self.repo_root / chart_path
        output_dir = ensure_directory(self.output_dir / "helm")
        archive_path = output_dir / f"stellaops-{self.version}.tgz"
        if not self.dry_run:
            cmd = [
                "helm",
                "package",
                str(chart_dir),
                "--destination",
                str(output_dir),
                "--version",
                self.version,
                "--app-version",
                self.version,
            ]
            run(cmd)
            packaged = next(output_dir.glob("*.tgz"), None)
            if packaged and packaged != archive_path:
                packaged.rename(archive_path)
        digest = compute_sha256(archive_path) if archive_path.exists() else None
        if archive_path.exists() and archive_path.is_relative_to(self.output_dir):
            manifest_path = str(archive_path.relative_to(self.output_dir))
        elif archive_path.exists() and archive_path.is_relative_to(self.output_dir.parent):
            manifest_path = str(archive_path.relative_to(self.output_dir.parent))
        else:
            manifest_path = f"helm/{archive_path.name}"
        return OrderedDict((
            ("name", "stellaops"),
            ("version", self.version),
            ("path", manifest_path),
            ("sha256", digest),
        ))

    def _digest_compose_files(self) -> List[Mapping[str, Any]]:
        compose_cfg = self.config.get("compose", {})
        files = compose_cfg.get("files", [])
        entries: List[Mapping[str, Any]] = []
        for rel_path in files:
            src = self.repo_root / rel_path
            if not src.exists():
                continue
            digest = compute_sha256(src)
            entries.append(OrderedDict((
                ("name", pathlib.Path(rel_path).name),
                ("path", rel_path),
                ("sha256", digest),
            )))
        return entries

    # ----------------
    # Manifest assembly
    # ----------------
    def _compose_manifest(
        self,
        components: List[Mapping[str, Any]],
        helm_meta: Optional[Mapping[str, Any]],
        compose_meta: List[Mapping[str, Any]],
    ) -> Dict[str, Any]:
        manifest = OrderedDict()
        manifest["release"] = OrderedDict((
            ("version", self.version),
            ("channel", self.channel),
            ("date", self.release_date),
            ("calendar", self.calendar),
        ))
        manifest["components"] = components
        if helm_meta:
            manifest["charts"] = [helm_meta]
        if compose_meta:
            manifest["compose"] = compose_meta
        return manifest


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Build StellaOps release artefacts deterministically")
    parser.add_argument("--config", type=pathlib.Path, default=DEFAULT_CONFIG, help="Path to release config JSON")
    parser.add_argument("--version", required=True, help="Release version string (e.g. 2025.10.0-edge)")
    parser.add_argument("--channel", required=True, help="Release channel (edge|stable|lts)")
    parser.add_argument("--calendar", help="Calendar tag (YYYY.MM); defaults derived from version")
    parser.add_argument("--git-sha", default=os.environ.get("GIT_COMMIT", "unknown"), help="Git revision to embed")
    parser.add_argument("--output", type=pathlib.Path, default=REPO_ROOT / "out/release", help="Output directory for artefacts")
    parser.add_argument("--no-push", action="store_true", help="Do not push images (use docker load)")
    parser.add_argument("--dry-run", action="store_true", help="Print steps without executing commands")
    parser.add_argument("--registry", help="Override registry root (e.g. localhost:5000/stellaops)")
    parser.add_argument("--platform", dest="platforms", action="append", metavar="PLATFORM", help="Override build platforms (repeatable)")
    parser.add_argument("--skip-signing", action="store_true", help="Skip cosign signing/attestation steps")
    parser.add_argument("--cosign-key", dest="cosign_key", help="Override COSIGN_KEY_REF value")
    parser.add_argument("--cosign-password", dest="cosign_password", help="Password for cosign key")
    parser.add_argument("--cosign-identity-token", dest="cosign_identity_token", help="Identity token for keyless cosign flows")
    parser.add_argument("--no-transparency", action="store_true", help="Disable Rekor transparency log upload during signing")
    return parser.parse_args(argv)


def write_manifest(manifest: Mapping[str, Any], output_dir: pathlib.Path) -> pathlib.Path:
    # Copy manifest to avoid mutating input when computing checksum
    base_manifest = OrderedDict(manifest)
    yaml_without_checksum = dump_yaml(base_manifest)
    digest = hashlib.sha256(yaml_without_checksum.encode("utf-8")).hexdigest()
    manifest_with_checksum = OrderedDict(base_manifest)
    manifest_with_checksum["checksums"] = OrderedDict((("sha256", digest),))
    final_yaml = dump_yaml(manifest_with_checksum)
    output_path = output_dir / "release.yaml"
    with output_path.open("w", encoding="utf-8") as handle:
        handle.write(final_yaml)
    return output_path
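
# Verification sketch (illustrative; not part of the release pipeline): because the
# sha256 recorded under "checksums" is computed over the YAML emitted *before* that
# block is appended, a consumer can re-check release.yaml by stripping the trailing
# "checksums:" section and hashing the remainder, e.g.
#   body = release_yaml.split("checksums:")[0]
#   assert hashlib.sha256(body.encode("utf-8")).hexdigest() == recorded_sha256
# (assumes "checksums:" appears only once, which holds for manifests produced here).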


def main(argv: Optional[Sequence[str]] = None) -> int:
    args = parse_args(argv)
    config = load_json_config(args.config)
    release_date = utc_now_iso()
    calendar = sanitize_calendar(args.version, args.calendar)
    builder = ReleaseBuilder(
        repo_root=REPO_ROOT,
        config=config,
        version=args.version,
        channel=args.channel,
        calendar=calendar,
        release_date=release_date,
        git_sha=args.git_sha,
        output_dir=args.output,
        push=not args.no_push,
        dry_run=args.dry_run,
        registry_override=args.registry,
        platforms_override=args.platforms,
        skip_signing=args.skip_signing,
        cosign_key_ref=args.cosign_key,
        cosign_password=args.cosign_password,
        cosign_identity_token=args.cosign_identity_token,
        tlog_upload=not args.no_transparency,
    )
    manifest = builder.run()
    manifest_path = write_manifest(manifest, builder.output_dir)
    print(f"✅ Release manifest written to {manifest_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())