Restructure solution layout by module

root committed 2025-10-28 15:10:40 +02:00
parent 4e3e575db5
commit 68da90a11a
4103 changed files with 192899 additions and 187024 deletions

@@ -1,445 +1,445 @@
#!/usr/bin/env python3
"""Package the StellaOps Offline Kit with deterministic artefacts and manifest."""
from __future__ import annotations
import argparse
import datetime as dt
import gzip
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import tarfile
from collections import OrderedDict
from pathlib import Path
from typing import Any, Iterable, Mapping, MutableMapping, Optional
REPO_ROOT = Path(__file__).resolve().parents[2]
RELEASE_TOOLS_DIR = REPO_ROOT / "ops" / "devops" / "release"
TELEMETRY_TOOLS_DIR = REPO_ROOT / "ops" / "devops" / "telemetry"
TELEMETRY_BUNDLE_PATH = REPO_ROOT / "out" / "telemetry" / "telemetry-offline-bundle.tar.gz"
if str(RELEASE_TOOLS_DIR) not in sys.path:
sys.path.insert(0, str(RELEASE_TOOLS_DIR))
from verify_release import (  # type: ignore[import-not-found]
load_manifest,
resolve_path,
verify_release,
)
import mirror_debug_store  # type: ignore[import-not-found]
DEFAULT_RELEASE_DIR = REPO_ROOT / "out" / "release"
DEFAULT_STAGING_DIR = REPO_ROOT / "out" / "offline-kit" / "staging"
DEFAULT_OUTPUT_DIR = REPO_ROOT / "out" / "offline-kit" / "dist"
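# Maps release-manifest artefact kinds to their destination folders inside the kit.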
ARTIFACT_TARGETS = {
"sbom": Path("sboms"),
"provenance": Path("attest"),
"signature": Path("signatures"),
"metadata": Path("metadata/docker"),
}
class CommandError(RuntimeError):
"""Raised when an external command fails."""
def run(cmd: Iterable[str], *, cwd: Optional[Path] = None, env: Optional[Mapping[str, str]] = None) -> str:
    cmd = list(cmd)  # materialise once so the failure message below can reuse it
    process_env = dict(os.environ)
    if env:
        process_env.update(env)
    result = subprocess.run(
        cmd,
cwd=str(cwd) if cwd else None,
env=process_env,
check=False,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise CommandError(
f"Command failed ({result.returncode}): {' '.join(cmd)}\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
)
return result.stdout
def compute_sha256(path: Path) -> str:
sha = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
sha.update(chunk)
return sha.hexdigest()
def utc_now_iso() -> str:
return dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def safe_component_name(name: str) -> str:
return re.sub(r"[^A-Za-z0-9_.-]", "-", name.strip().lower())
def clean_directory(path: Path) -> None:
if path.exists():
shutil.rmtree(path)
path.mkdir(parents=True, exist_ok=True)
def run_python_analyzer_smoke() -> None:
script = REPO_ROOT / "ops" / "offline-kit" / "run-python-analyzer-smoke.sh"
run(["bash", str(script)], cwd=REPO_ROOT)
def copy_if_exists(source: Path, target: Path) -> None:
if source.is_dir():
shutil.copytree(source, target, dirs_exist_ok=True)
elif source.is_file():
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, target)
def copy_release_manifests(release_dir: Path, staging_dir: Path) -> None:
manifest_dir = staging_dir / "manifest"
manifest_dir.mkdir(parents=True, exist_ok=True)
for name in ("release.yaml", "release.yaml.sha256", "release.json", "release.json.sha256"):
source = release_dir / name
if source.exists():
shutil.copy2(source, manifest_dir / source.name)
def copy_component_artifacts(
manifest: Mapping[str, Any],
release_dir: Path,
staging_dir: Path,
) -> None:
components = manifest.get("components") or []
    # Non-mapping entries are skipped below, so guard the sort key as well.
    for component in sorted(components, key=lambda entry: str(entry.get("name", "")) if isinstance(entry, Mapping) else ""):
if not isinstance(component, Mapping):
continue
component_name = safe_component_name(str(component.get("name", "component")))
for key, target_root in ARTIFACT_TARGETS.items():
entry = component.get(key)
if not entry or not isinstance(entry, Mapping):
continue
path_str = entry.get("path")
if not path_str:
continue
resolved = resolve_path(str(path_str), release_dir)
if not resolved.exists():
raise FileNotFoundError(f"Component '{component_name}' {key} artefact not found: {resolved}")
target_dir = staging_dir / target_root
target_dir.mkdir(parents=True, exist_ok=True)
target_name = f"{component_name}-{resolved.name}" if resolved.name else component_name
shutil.copy2(resolved, target_dir / target_name)
def copy_collections(
manifest: Mapping[str, Any],
release_dir: Path,
staging_dir: Path,
) -> None:
for collection, subdir in (("charts", Path("charts")), ("compose", Path("compose"))):
entries = manifest.get(collection) or []
for entry in entries:
if not isinstance(entry, Mapping):
continue
path_str = entry.get("path")
if not path_str:
continue
resolved = resolve_path(str(path_str), release_dir)
if not resolved.exists():
raise FileNotFoundError(f"{collection} artefact not found: {resolved}")
target_dir = staging_dir / subdir
target_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(resolved, target_dir / resolved.name)
def copy_debug_store(release_dir: Path, staging_dir: Path) -> None:
mirror_debug_store.main(
[
"--release-dir",
str(release_dir),
"--offline-kit-dir",
str(staging_dir),
]
)
def copy_plugins_and_assets(staging_dir: Path) -> None:
copy_if_exists(REPO_ROOT / "plugins" / "scanner", staging_dir / "plugins" / "scanner")
copy_if_exists(REPO_ROOT / "certificates", staging_dir / "certificates")
copy_if_exists(REPO_ROOT / "seed-data", staging_dir / "seed-data")
docs_dir = staging_dir / "docs"
docs_dir.mkdir(parents=True, exist_ok=True)
copy_if_exists(REPO_ROOT / "docs" / "24_OFFLINE_KIT.md", docs_dir / "24_OFFLINE_KIT.md")
copy_if_exists(REPO_ROOT / "docs" / "ops" / "telemetry-collector.md", docs_dir / "telemetry-collector.md")
copy_if_exists(REPO_ROOT / "docs" / "ops" / "telemetry-storage.md", docs_dir / "telemetry-storage.md")
def package_telemetry_bundle(staging_dir: Path) -> None:
script = TELEMETRY_TOOLS_DIR / "package_offline_bundle.py"
if not script.exists():
return
TELEMETRY_BUNDLE_PATH.parent.mkdir(parents=True, exist_ok=True)
run(["python", str(script), "--output", str(TELEMETRY_BUNDLE_PATH)], cwd=REPO_ROOT)
telemetry_dir = staging_dir / "telemetry"
telemetry_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(TELEMETRY_BUNDLE_PATH, telemetry_dir / TELEMETRY_BUNDLE_PATH.name)
sha_path = TELEMETRY_BUNDLE_PATH.with_suffix(TELEMETRY_BUNDLE_PATH.suffix + ".sha256")
if sha_path.exists():
shutil.copy2(sha_path, telemetry_dir / sha_path.name)
def scan_files(staging_dir: Path, exclude: Optional[set[str]] = None) -> list[OrderedDict[str, Any]]:
entries: list[OrderedDict[str, Any]] = []
exclude = exclude or set()
for path in sorted(staging_dir.rglob("*")):
if not path.is_file():
continue
rel = path.relative_to(staging_dir).as_posix()
if rel in exclude:
continue
entries.append(
OrderedDict(
(
("name", rel),
("sha256", compute_sha256(path)),
("size", path.stat().st_size),
)
)
)
return entries
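# The offline manifest cannot list its own digest, so scan_files excludes it and
# its .sha256 companion; their hashes are published via metadata.json instead.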
def write_offline_manifest(
staging_dir: Path,
version: str,
channel: str,
release_manifest_sha: Optional[str],
) -> tuple[Path, str]:
manifest_dir = staging_dir / "manifest"
manifest_dir.mkdir(parents=True, exist_ok=True)
offline_manifest_path = manifest_dir / "offline-manifest.json"
files = scan_files(staging_dir, exclude={"manifest/offline-manifest.json", "manifest/offline-manifest.json.sha256"})
manifest_data = OrderedDict(
(
(
"bundle",
OrderedDict(
(
("version", version),
("channel", channel),
("capturedAt", utc_now_iso()),
("releaseManifestSha256", release_manifest_sha),
)
),
),
("artifacts", files),
)
)
with offline_manifest_path.open("w", encoding="utf-8") as handle:
json.dump(manifest_data, handle, indent=2)
handle.write("\n")
manifest_sha = compute_sha256(offline_manifest_path)
(offline_manifest_path.with_suffix(".json.sha256")).write_text(
f"{manifest_sha} {offline_manifest_path.name}\n",
encoding="utf-8",
)
return offline_manifest_path, manifest_sha
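# Zeroing ownership and timestamps keeps tar member headers stable, so the
# archive digest depends only on file contents and archive paths.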
def tarinfo_filter(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo:
tarinfo.uid = 0
tarinfo.gid = 0
tarinfo.uname = ""
tarinfo.gname = ""
tarinfo.mtime = 0
return tarinfo
def create_tarball(staging_dir: Path, output_dir: Path, bundle_name: str) -> Path:
output_dir.mkdir(parents=True, exist_ok=True)
bundle_path = output_dir / f"{bundle_name}.tar.gz"
if bundle_path.exists():
bundle_path.unlink()
    # tarfile's "w:gz" stamps the current time into the gzip header, which would
    # break byte-for-byte reproducibility; write the gzip stream explicitly with
    # mtime=0 instead.
    with bundle_path.open("wb") as raw, gzip.GzipFile(
        fileobj=raw, mode="wb", compresslevel=9, mtime=0
    ) as gz, tarfile.open(fileobj=gz, mode="w") as tar:
        for path in sorted(staging_dir.rglob("*")):
            if path.is_file():
                arcname = path.relative_to(staging_dir).as_posix()
                tar.add(path, arcname=arcname, filter=tarinfo_filter)
return bundle_path
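# cosign sign-blob emits the signature on stdout; with --tlog-upload=false it
# still signs but skips the Rekor transparency log entry.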
def sign_blob(
path: Path,
*,
key_ref: Optional[str],
identity_token: Optional[str],
password: Optional[str],
tlog_upload: bool,
) -> Optional[Path]:
if not key_ref and not identity_token:
return None
cmd = ["cosign", "sign-blob", "--yes", str(path)]
if key_ref:
cmd.extend(["--key", key_ref])
if identity_token:
cmd.extend(["--identity-token", identity_token])
if not tlog_upload:
cmd.append("--tlog-upload=false")
env = {"COSIGN_PASSWORD": password or ""}
signature = run(cmd, env=env)
sig_path = path.with_suffix(path.suffix + ".sig")
sig_path.write_text(signature, encoding="utf-8")
return sig_path
def build_offline_kit(args: argparse.Namespace) -> MutableMapping[str, Any]:
release_dir = args.release_dir.resolve()
staging_dir = args.staging_dir.resolve()
output_dir = args.output_dir.resolve()
verify_release(release_dir)
if not args.skip_smoke:
run_python_analyzer_smoke()
clean_directory(staging_dir)
copy_debug_store(release_dir, staging_dir)
manifest_data = load_manifest(release_dir)
release_manifest_sha = None
checksums = manifest_data.get("checksums")
if isinstance(checksums, Mapping):
release_manifest_sha = checksums.get("sha256")
copy_release_manifests(release_dir, staging_dir)
copy_component_artifacts(manifest_data, release_dir, staging_dir)
copy_collections(manifest_data, release_dir, staging_dir)
copy_plugins_and_assets(staging_dir)
package_telemetry_bundle(staging_dir)
offline_manifest_path, offline_manifest_sha = write_offline_manifest(
staging_dir,
args.version,
args.channel,
release_manifest_sha,
)
bundle_name = f"stella-ops-offline-kit-{args.version}-{args.channel}"
bundle_path = create_tarball(staging_dir, output_dir, bundle_name)
bundle_sha = compute_sha256(bundle_path)
bundle_sha_prefixed = f"sha256:{bundle_sha}"
(bundle_path.with_suffix(".tar.gz.sha256")).write_text(
f"{bundle_sha} {bundle_path.name}\n",
encoding="utf-8",
)
signature_paths: dict[str, str] = {}
sig = sign_blob(
bundle_path,
key_ref=args.cosign_key,
identity_token=args.cosign_identity_token,
password=args.cosign_password,
tlog_upload=not args.no_transparency,
)
if sig:
signature_paths["bundleSignature"] = str(sig)
manifest_sig = sign_blob(
offline_manifest_path,
key_ref=args.cosign_key,
identity_token=args.cosign_identity_token,
password=args.cosign_password,
tlog_upload=not args.no_transparency,
)
if manifest_sig:
signature_paths["manifestSignature"] = str(manifest_sig)
metadata = OrderedDict(
(
("bundleId", args.bundle_id or f"{args.version}-{args.channel}-{utc_now_iso()}"),
("bundleName", bundle_path.name),
("bundleSha256", bundle_sha_prefixed),
("bundleSize", bundle_path.stat().st_size),
("manifestName", offline_manifest_path.name),
("manifestSha256", f"sha256:{offline_manifest_sha}"),
("manifestSize", offline_manifest_path.stat().st_size),
("channel", args.channel),
("version", args.version),
("capturedAt", utc_now_iso()),
)
)
if sig:
metadata["bundleSignatureName"] = Path(sig).name
if manifest_sig:
metadata["manifestSignatureName"] = Path(manifest_sig).name
metadata_path = output_dir / f"{bundle_name}.metadata.json"
with metadata_path.open("w", encoding="utf-8") as handle:
json.dump(metadata, handle, indent=2)
handle.write("\n")
return OrderedDict(
(
("bundlePath", str(bundle_path)),
("bundleSha256", bundle_sha),
("manifestPath", str(offline_manifest_path)),
("metadataPath", str(metadata_path)),
("signatures", signature_paths),
)
)
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--version", required=True, help="Bundle version (e.g. 2025.10.0)")
parser.add_argument("--channel", default="edge", help="Release channel (default: %(default)s)")
parser.add_argument("--bundle-id", help="Optional explicit bundle identifier")
parser.add_argument(
"--release-dir",
type=Path,
default=DEFAULT_RELEASE_DIR,
help="Release artefact directory (default: %(default)s)",
)
parser.add_argument(
"--staging-dir",
type=Path,
default=DEFAULT_STAGING_DIR,
help="Temporary staging directory (default: %(default)s)",
)
parser.add_argument(
"--output-dir",
type=Path,
default=DEFAULT_OUTPUT_DIR,
help="Destination directory for packaged bundles (default: %(default)s)",
)
parser.add_argument("--cosign-key", dest="cosign_key", help="Cosign key reference for signing")
parser.add_argument("--cosign-password", dest="cosign_password", help="Cosign key password (if applicable)")
parser.add_argument("--cosign-identity-token", dest="cosign_identity_token", help="Cosign identity token")
parser.add_argument("--no-transparency", action="store_true", help="Disable Rekor transparency log uploads")
parser.add_argument("--skip-smoke", action="store_true", help="Skip analyzer smoke execution (testing only)")
return parser.parse_args(argv)
def main(argv: Optional[list[str]] = None) -> int:
args = parse_args(argv)
try:
result = build_offline_kit(args)
except Exception as exc: # pylint: disable=broad-except
print(f"offline-kit packaging failed: {exc}", file=sys.stderr)
return 1
print("✅ Offline kit packaged")
for key, value in result.items():
if isinstance(value, dict):
for sub_key, sub_val in value.items():
print(f" - {key}.{sub_key}: {sub_val}")
else:
print(f" - {key}: {value}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
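
A minimal verification sketch for consumers of the kit (check_bundle is an illustrative helper, not part of the repo; the key names match the metadata document emitted above): re-hash a downloaded bundle and compare it against the recorded bundleSha256.

import hashlib
import json
import pathlib

def check_bundle(metadata_file: pathlib.Path) -> bool:
    # metadata.json is written next to the bundle, so resolve it relatively.
    meta = json.loads(metadata_file.read_text(encoding="utf-8"))
    bundle = metadata_file.parent / meta["bundleName"]
    # read_bytes is fine for a sketch; stream in chunks for very large kits.
    digest = hashlib.sha256(bundle.read_bytes()).hexdigest()
    return f"sha256:{digest}" == meta["bundleSha256"]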

@@ -1,221 +1,221 @@
#!/usr/bin/env python3
"""Mirror release debug-store artefacts into the Offline Kit staging tree.
This helper copies the release `debug/` directory (including `.build-id/`,
`debug-manifest.json`, and the `.sha256` companion) into the Offline Kit
output directory and verifies the manifest hashes after the copy. A summary
document is written under `metadata/debug-store.json` so packaging jobs can
surface the available build-ids and validation status.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import json
import pathlib
import shutil
import sys
from typing import Iterable, Tuple
REPO_ROOT = pathlib.Path(__file__).resolve().parents[2]
def compute_sha256(path: pathlib.Path) -> str:
sha = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
sha.update(chunk)
return sha.hexdigest()
def load_manifest(manifest_path: pathlib.Path) -> dict:
with manifest_path.open("r", encoding="utf-8") as handle:
return json.load(handle)
def parse_manifest_sha(sha_path: pathlib.Path) -> str | None:
if not sha_path.exists():
return None
text = sha_path.read_text(encoding="utf-8").strip()
if not text:
return None
# Allow either "<sha>" or "<sha> filename" formats.
return text.split()[0]
def iter_debug_files(base_dir: pathlib.Path) -> Iterable[pathlib.Path]:
for path in base_dir.rglob("*"):
if path.is_file():
yield path
def copy_debug_store(source_root: pathlib.Path, target_root: pathlib.Path, *, dry_run: bool) -> None:
if dry_run:
print(f"[dry-run] Would copy '{source_root}' -> '{target_root}'")
return
if target_root.exists():
shutil.rmtree(target_root)
shutil.copytree(source_root, target_root)
def verify_debug_store(manifest: dict, offline_root: pathlib.Path) -> Tuple[int, int]:
"""Return (verified_count, total_entries)."""
artifacts = manifest.get("artifacts", [])
verified = 0
for entry in artifacts:
debug_path = entry.get("debugPath")
expected_sha = entry.get("sha256")
expected_size = entry.get("size")
if not debug_path or not expected_sha:
continue
relative = pathlib.PurePosixPath(debug_path)
resolved = (offline_root.parent / relative).resolve()
if not resolved.exists():
raise FileNotFoundError(f"Debug artefact missing after mirror: {relative}")
actual_sha = compute_sha256(resolved)
if actual_sha != expected_sha:
raise ValueError(
f"Digest mismatch for {relative}: expected {expected_sha}, found {actual_sha}"
)
if expected_size is not None:
actual_size = resolved.stat().st_size
if actual_size != expected_size:
raise ValueError(
f"Size mismatch for {relative}: expected {expected_size}, found {actual_size}"
)
verified += 1
return verified, len(artifacts)
def summarize_store(manifest: dict, manifest_sha: str | None, offline_root: pathlib.Path, summary_path: pathlib.Path) -> None:
debug_files = [
path
for path in iter_debug_files(offline_root)
if path.suffix == ".debug"
]
total_size = sum(path.stat().st_size for path in debug_files)
build_ids = sorted(
{entry.get("buildId") for entry in manifest.get("artifacts", []) if entry.get("buildId")}
)
summary = {
"generatedAt": dt.datetime.now(tz=dt.timezone.utc)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z"),
"manifestGeneratedAt": manifest.get("generatedAt"),
"manifestSha256": manifest_sha,
"platforms": manifest.get("platforms")
or sorted({entry.get("platform") for entry in manifest.get("artifacts", []) if entry.get("platform")}),
"artifactCount": len(manifest.get("artifacts", [])),
"buildIds": {
"total": len(build_ids),
"samples": build_ids[:10],
},
"debugFiles": {
"count": len(debug_files),
"totalSizeBytes": total_size,
},
}
summary_path.parent.mkdir(parents=True, exist_ok=True)
with summary_path.open("w", encoding="utf-8") as handle:
json.dump(summary, handle, indent=2)
handle.write("\n")
def resolve_release_debug_dir(base: pathlib.Path) -> pathlib.Path:
debug_dir = base / "debug"
if debug_dir.exists():
return debug_dir
    # Allow passing the debug directory itself (e.g. out/release/stable/debug).
if base.name == "debug":
return base
raise FileNotFoundError(f"Debug directory not found under '{base}'")
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--release-dir",
type=pathlib.Path,
default=REPO_ROOT / "out" / "release",
help="Release output directory containing the debug store (default: %(default)s)",
)
parser.add_argument(
"--offline-kit-dir",
type=pathlib.Path,
default=REPO_ROOT / "out" / "offline-kit",
help="Offline Kit staging directory (default: %(default)s)",
)
parser.add_argument(
"--verify-only",
action="store_true",
help="Skip copying and only verify the existing offline kit debug store",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print actions without copying files",
)
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
try:
source_debug = resolve_release_debug_dir(args.release_dir.resolve())
except FileNotFoundError as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
target_root = (args.offline_kit_dir / "debug").resolve()
if not args.verify_only:
copy_debug_store(source_debug, target_root, dry_run=args.dry_run)
if args.dry_run:
return 0
manifest_path = target_root / "debug-manifest.json"
if not manifest_path.exists():
print(f"error: offline kit manifest missing at {manifest_path}", file=sys.stderr)
return 3
manifest = load_manifest(manifest_path)
manifest_sha_path = manifest_path.with_suffix(manifest_path.suffix + ".sha256")
recorded_sha = parse_manifest_sha(manifest_sha_path)
recomputed_sha = compute_sha256(manifest_path)
if recorded_sha and recorded_sha != recomputed_sha:
print(
f"warning: manifest SHA mismatch (recorded {recorded_sha}, recomputed {recomputed_sha}); updating checksum",
file=sys.stderr,
)
manifest_sha_path.write_text(f"{recomputed_sha} {manifest_path.name}\n", encoding="utf-8")
verified, total = verify_debug_store(manifest, target_root)
print(f"✔ verified {verified}/{total} debug artefacts (manifest SHA {recomputed_sha})")
summary_path = args.offline_kit_dir / "metadata" / "debug-store.json"
summarize_store(manifest, recomputed_sha, target_root, summary_path)
print(f" summary written to {summary_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
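
For orientation, the .build-id tree mirrored above follows the usual GNU build-id layout: the first two hex characters of a build-id name a directory and the remainder names the .debug file. A small sketch of that mapping (the build-id value is illustrative; the manifest stores debugPath explicitly, so this is informative rather than authoritative):

import pathlib

def build_id_to_debug_path(build_id: str) -> pathlib.PurePosixPath:
    # e.g. "deadbeef1234" -> debug/.build-id/de/adbeef1234.debug
    return pathlib.PurePosixPath("debug/.build-id") / build_id[:2] / f"{build_id[2:]}.debug"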

@@ -2,7 +2,7 @@
set -euo pipefail
repo_root="$(git -C "${BASH_SOURCE%/*}/.." rev-parse --show-toplevel 2>/dev/null || pwd)"
project_path="${repo_root}/src/StellaOps.Scanner.Analyzers.Lang.Python/StellaOps.Scanner.Analyzers.Lang.Python.csproj"
project_path="${repo_root}/src/Scanner/__Libraries/StellaOps.Scanner.Analyzers.Lang.Python/StellaOps.Scanner.Analyzers.Lang.Python.csproj"
output_dir="${repo_root}/out/analyzers/python"
plugin_dir="${repo_root}/plugins/scanner/analyzers/lang/StellaOps.Scanner.Analyzers.Lang.Python"

@@ -1,256 +1,256 @@
from __future__ import annotations
import json
import tarfile
import tempfile
import unittest
import argparse
import sys
from collections import OrderedDict
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent))
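# Make the packaging helpers importable for the tests below (assumes they sit
# beside this test module).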
from build_release import write_manifest  # type: ignore[import-not-found]
from build_offline_kit import build_offline_kit, compute_sha256  # type: ignore[import-not-found]
class OfflineKitBuilderTests(unittest.TestCase):
def setUp(self) -> None:
self._temp = tempfile.TemporaryDirectory()
self.base_path = Path(self._temp.name)
self.out_dir = self.base_path / "out"
self.release_dir = self.out_dir / "release"
self.staging_dir = self.base_path / "staging"
self.output_dir = self.base_path / "dist"
self._create_sample_release()
def tearDown(self) -> None:
self._temp.cleanup()
def _relative_to_out(self, path: Path) -> str:
return path.relative_to(self.out_dir).as_posix()
def _write_json(self, path: Path, payload: dict[str, object]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2)
handle.write("\n")
def _create_sample_release(self) -> None:
self.release_dir.mkdir(parents=True, exist_ok=True)
sbom_path = self.release_dir / "artifacts/sboms/sample.cyclonedx.json"
sbom_path.parent.mkdir(parents=True, exist_ok=True)
sbom_path.write_text('{"bomFormat":"CycloneDX","specVersion":"1.5"}\n', encoding="utf-8")
sbom_sha = compute_sha256(sbom_path)
provenance_path = self.release_dir / "artifacts/provenance/sample.provenance.json"
self._write_json(
provenance_path,
{
"buildDefinition": {"buildType": "https://example/build"},
"runDetails": {"builder": {"id": "https://example/ci"}},
},
)
provenance_sha = compute_sha256(provenance_path)
signature_path = self.release_dir / "artifacts/signatures/sample.signature"
signature_path.parent.mkdir(parents=True, exist_ok=True)
signature_path.write_text("signature-data\n", encoding="utf-8")
signature_sha = compute_sha256(signature_path)
metadata_path = self.release_dir / "artifacts/metadata/sample.metadata.json"
self._write_json(metadata_path, {"digest": "sha256:1234"})
metadata_sha = compute_sha256(metadata_path)
chart_path = self.release_dir / "helm/stellaops-1.0.0.tgz"
chart_path.parent.mkdir(parents=True, exist_ok=True)
chart_path.write_bytes(b"helm-chart-data")
chart_sha = compute_sha256(chart_path)
compose_path = self.release_dir.parent / "deploy/compose/docker-compose.dev.yaml"
compose_path.parent.mkdir(parents=True, exist_ok=True)
compose_path.write_text("services: {}\n", encoding="utf-8")
compose_sha = compute_sha256(compose_path)
debug_file = self.release_dir / "debug/.build-id/ab/cdef.debug"
debug_file.parent.mkdir(parents=True, exist_ok=True)
debug_file.write_bytes(b"\x7fELFDEBUGDATA")
debug_sha = compute_sha256(debug_file)
debug_manifest_path = self.release_dir / "debug/debug-manifest.json"
debug_manifest = OrderedDict(
(
("generatedAt", "2025-10-26T00:00:00Z"),
("version", "1.0.0"),
("channel", "edge"),
(
"artifacts",
[
OrderedDict(
(
("buildId", "abcdef1234"),
("platform", "linux/amd64"),
("debugPath", "debug/.build-id/ab/cdef.debug"),
("sha256", debug_sha),
("size", debug_file.stat().st_size),
("components", ["sample"]),
("images", ["registry.example/sample@sha256:feedface"]),
("sources", ["app/sample.dll"]),
)
)
],
),
)
)
self._write_json(debug_manifest_path, debug_manifest)
debug_manifest_sha = compute_sha256(debug_manifest_path)
(debug_manifest_path.with_suffix(debug_manifest_path.suffix + ".sha256")).write_text(
f"{debug_manifest_sha} {debug_manifest_path.name}\n",
encoding="utf-8",
)
manifest = OrderedDict(
(
(
"release",
OrderedDict(
(
("version", "1.0.0"),
("channel", "edge"),
("date", "2025-10-26T00:00:00Z"),
("calendar", "2025.10"),
)
),
),
(
"components",
[
OrderedDict(
(
("name", "sample"),
("image", "registry.example/sample@sha256:feedface"),
("tags", ["registry.example/sample:1.0.0"]),
(
"sbom",
OrderedDict(
(
("path", self._relative_to_out(sbom_path)),
("sha256", sbom_sha),
)
),
),
(
"provenance",
OrderedDict(
(
("path", self._relative_to_out(provenance_path)),
("sha256", provenance_sha),
)
),
),
(
"signature",
OrderedDict(
(
("path", self._relative_to_out(signature_path)),
("sha256", signature_sha),
("ref", "sigstore://example"),
("tlogUploaded", True),
)
),
),
(
"metadata",
OrderedDict(
(
("path", self._relative_to_out(metadata_path)),
("sha256", metadata_sha),
)
),
),
)
)
],
),
(
"charts",
[
OrderedDict(
(
("name", "stellaops"),
("version", "1.0.0"),
("path", self._relative_to_out(chart_path)),
("sha256", chart_sha),
)
)
],
),
(
"compose",
[
OrderedDict(
(
("name", "docker-compose.dev.yaml"),
("path", compose_path.relative_to(self.out_dir).as_posix()),
("sha256", compose_sha),
)
)
],
),
(
"debugStore",
OrderedDict(
(
("manifest", "debug/debug-manifest.json"),
("sha256", debug_manifest_sha),
("entries", 1),
("platforms", ["linux/amd64"]),
("directory", "debug/.build-id"),
)
),
),
)
)
write_manifest(manifest, self.release_dir)
def test_build_offline_kit(self) -> None:
args = argparse.Namespace(
version="2025.10.0",
channel="edge",
bundle_id="bundle-001",
release_dir=self.release_dir,
staging_dir=self.staging_dir,
output_dir=self.output_dir,
cosign_key=None,
cosign_password=None,
cosign_identity_token=None,
no_transparency=False,
skip_smoke=True,
)
result = build_offline_kit(args)
bundle_path = Path(result["bundlePath"])
self.assertTrue(bundle_path.exists())
offline_manifest = self.output_dir.parent / "staging" / "manifest" / "offline-manifest.json"
self.assertTrue(offline_manifest.exists())
with offline_manifest.open("r", encoding="utf-8") as handle:
manifest_data = json.load(handle)
artifacts = manifest_data["artifacts"]
self.assertTrue(any(item["name"].startswith("sboms/") for item in artifacts))
metadata_path = Path(result["metadataPath"])
data = json.loads(metadata_path.read_text(encoding="utf-8"))
self.assertTrue(data["bundleSha256"].startswith("sha256:"))
self.assertTrue(data["manifestSha256"].startswith("sha256:"))
with tarfile.open(bundle_path, "r:gz") as tar:
members = tar.getnames()
self.assertIn("manifest/release.yaml", members)
self.assertTrue(any(name.startswith("sboms/sample-") for name in members))
if __name__ == "__main__":
unittest.main()
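
For reference, the packager can also be driven programmatically with the same flags the CLI parses; a minimal sketch (paths are illustrative, signing and the analyzer smoke test skipped):

from build_offline_kit import main

exit_code = main([
    "--version", "2025.10.0",
    "--channel", "edge",
    "--release-dir", "out/release",
    "--output-dir", "out/offline-kit/dist",
    "--skip-smoke",
])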