Files
git.stella-ops.org/ops/devops/release/verify_release.py
master 96d52884e8
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Add Policy DSL Validator, Schema Exporter, and Simulation Smoke tools
- Implemented PolicyDslValidator with command-line options for strict mode and JSON output.
- Created PolicySchemaExporter to generate JSON schemas for policy-related models.
- Developed PolicySimulationSmoke tool to validate policy simulations against expected outcomes.
- Added project files and necessary dependencies for each tool.
- Ensured proper error handling and usage instructions across tools.
2025-10-27 08:00:11 +02:00

280 lines
11 KiB
Python

#!/usr/bin/env python3
"""Verify release artefacts (SBOMs, provenance, signatures, manifest hashes)."""
from __future__ import annotations
import argparse
import hashlib
import json
import pathlib
import sys
from collections import OrderedDict
from typing import Any, Mapping, Optional
from build_release import dump_yaml # type: ignore import-not-found
class VerificationError(Exception):
    """Signal that one or more release artefacts did not pass verification."""
def compute_sha256(path: pathlib.Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 1 MiB slices so the whole artefact is never held in
    memory at once.
    """
    block_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            data = stream.read(block_size)
            if not data:
                # An empty read marks end-of-file.
                break
            digest.update(data)
    return digest.hexdigest()
def parse_sha_file(path: pathlib.Path) -> Optional[str]:
    """Read the digest stored in a checksum sidecar file.

    Returns the first whitespace-delimited token of the file, or ``None``
    when the file does not exist or contains only whitespace.
    """
    if not path.exists():
        return None
    # ``split()`` with no separator discards leading/trailing whitespace, so
    # an empty token list means the file had no digest at all.
    tokens = path.read_text(encoding="utf-8").split()
    return tokens[0] if tokens else None
def resolve_path(path_str: str, release_dir: pathlib.Path) -> pathlib.Path:
    """Turn a manifest path string into a concrete filesystem path.

    Backslashes are normalised to forward slashes first. Absolute paths are
    returned untouched. Relative paths are tried against *release_dir*, its
    parent and its grandparent, in that order; the first existing match wins.
    When nothing exists, the path joined onto *release_dir* is returned so the
    caller can report the missing file.
    """
    normalised = pathlib.Path(path_str.replace("\\", "/"))
    if normalised.is_absolute():
        return normalised

    search_roots = (release_dir, release_dir.parent, release_dir.parent.parent)
    existing = (
        hit
        for hit in ((root / normalised).resolve() for root in search_roots)
        if hit.exists()
    )
    found = next(existing, None)
    if found is not None:
        return found
    # Nothing matched: hand back the release_dir-relative location anyway.
    return (release_dir / normalised).resolve()
def load_manifest(release_dir: pathlib.Path) -> OrderedDict[str, Any]:
    """Load ``release.json`` from *release_dir*, preserving key order.

    Args:
        release_dir: Directory expected to contain ``release.json``.

    Returns:
        The parsed manifest; every JSON object is decoded as an
        ``OrderedDict`` so the original key order is retained.

    Raises:
        VerificationError: If the manifest file is missing, cannot be decoded
            as UTF-8 JSON, or its top-level value is not a JSON object.
    """
    manifest_path = release_dir / "release.json"
    if not manifest_path.exists():
        raise VerificationError(f"Release manifest JSON missing at {manifest_path}")
    try:
        with manifest_path.open("r", encoding="utf-8") as handle:
            manifest = json.load(handle, object_pairs_hook=OrderedDict)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError covers files that are not valid UTF-8 at all.
        raise VerificationError(f"Failed to parse {manifest_path}: {exc}") from exc
    # json.load happily returns lists/scalars; the verifiers call ``.get`` on
    # the manifest, so anything but an object must be rejected here.
    if not isinstance(manifest, OrderedDict):
        raise VerificationError(
            f"Failed to parse {manifest_path}: top-level JSON value must be an object, "
            f"got {type(manifest).__name__}"
        )
    return manifest
def verify_manifest_hashes(
    manifest: Mapping[str, Any],
    release_dir: pathlib.Path,
    errors: list[str],
) -> None:
    """Check the manifest files against their recorded digests.

    Three checks are made, each appending to *errors* on mismatch:

    * ``release.yaml`` against ``release.yaml.sha256`` (when the sidecar
      holds a digest);
    * ``release.json`` against ``release.json.sha256`` (same condition);
    * the manifest's embedded ``checksums.sha256`` against the SHA-256 of the
      manifest re-serialised by ``dump_yaml`` with ``checksums`` removed.

    If ``release.yaml`` is absent that is reported and nothing else is checked.
    """
    yaml_path = release_dir / "release.yaml"
    if not yaml_path.exists():
        errors.append(f"Missing release.yaml at {yaml_path}")
        return

    yaml_expected = parse_sha_file(release_dir / "release.yaml.sha256")
    yaml_actual = compute_sha256(yaml_path)
    if yaml_expected and yaml_actual != yaml_expected:
        errors.append(
            f"release.yaml.sha256 recorded {yaml_expected} but file hashes to {yaml_actual}"
        )

    json_expected = parse_sha_file(release_dir / "release.json.sha256")
    json_actual = compute_sha256(release_dir / "release.json")
    if json_expected and json_actual != json_expected:
        errors.append(
            f"release.json.sha256 recorded {json_expected} but file hashes to {json_actual}"
        )

    checksums = manifest.get("checksums")
    if not isinstance(checksums, Mapping):
        return

    # The embedded digest is computed over the manifest *without* its own
    # checksums section, so rebuild that view before hashing.
    stripped = OrderedDict(
        (key, value) for key, value in manifest.items() if key != "checksums"
    )
    expected_digest = checksums.get("sha256")
    serialised = dump_yaml(stripped)
    computed_digest = hashlib.sha256(serialised.encode("utf-8")).hexdigest()
    if expected_digest != computed_digest:
        errors.append(
            "Manifest checksum mismatch: "
            f"recorded {expected_digest}, computed {computed_digest}"
        )
def verify_artifact_entry(
    entry: Mapping[str, Any],
    release_dir: pathlib.Path,
    label: str,
    component_name: str,
    errors: list[str],
) -> None:
    """Validate a single artefact record belonging to a component.

    The record must carry a ``path`` that resolves to an existing file. When
    the record also carries a ``sha256``, the file's digest must equal it.
    Every problem is appended to *errors*, prefixed with *component_name* and
    *label*.
    """
    raw_path = entry.get("path")
    if not raw_path:
        errors.append(f"{component_name}: {label} missing 'path' field.")
        return

    target = resolve_path(str(raw_path), release_dir)
    if not target.exists():
        errors.append(f"{component_name}: {label} path does not exist → {target}")
        return

    expected = entry.get("sha256")
    if not expected:
        # No digest recorded: existence is all that can be checked.
        return

    computed = compute_sha256(target)
    if computed == expected:
        return
    errors.append(
        f"{component_name}: {label} SHA mismatch for {target} "
        f"(recorded {expected}, computed {computed})"
    )
def verify_components(manifest: Mapping[str, Any], release_dir: pathlib.Path, errors: list[str]) -> None:
    """Verify the artefacts attached to every component in the manifest.

    For each component the optional ``sbom``, ``provenance``, ``signature``
    and ``metadata`` records are passed to ``verify_artifact_entry``. Absent
    or empty records are skipped.

    Args:
        manifest: Parsed release manifest.
        release_dir: Release directory used to resolve artefact paths.
        errors: Accumulator that receives one message per problem found.
    """
    components = manifest.get("components")
    if components is None:
        # A missing key and an explicit JSON null both mean "no components";
        # iterating None would otherwise raise TypeError.
        return
    if not isinstance(components, (list, tuple)):
        # Iterating a string or mapping would yield misleading per-element errors.
        errors.append("Manifest 'components' must be a list.")
        return

    artifact_kinds = (
        ("sbom", "SBOM"),
        ("provenance", "provenance"),
        ("signature", "signature"),
        ("metadata", "metadata"),
    )
    for component in components:
        if not isinstance(component, Mapping):
            errors.append("Component entry is not a mapping.")
            continue
        name = str(component.get("name", "<unknown>"))
        for key, label in artifact_kinds:
            entry = component.get(key)
            if not entry:
                continue
            if not isinstance(entry, Mapping):
                errors.append(f"{name}: {label} entry must be a mapping.")
                continue
            verify_artifact_entry(entry, release_dir, label, name, errors)
def verify_collections(manifest: Mapping[str, Any], release_dir: pathlib.Path, errors: list[str]) -> None:
    """Verify the ``charts`` and ``compose`` file lists in the manifest.

    Every item must be a mapping with a ``path`` that resolves to an existing
    file; when the item records a ``sha256`` the file's digest must equal it.

    Args:
        manifest: Parsed release manifest.
        release_dir: Release directory used to resolve item paths.
        errors: Accumulator that receives one message per problem found.
    """
    for collection, label in (
        ("charts", "chart"),
        ("compose", "compose file"),
    ):
        items = manifest.get(collection)
        if items is None:
            # Missing key or explicit JSON null: nothing to verify, and
            # iterating None would raise TypeError.
            continue
        if not isinstance(items, (list, tuple)):
            errors.append(f"{collection} must be a list.")
            continue
        for item in items:
            if not isinstance(item, Mapping):
                errors.append(f"{collection} entry is not a mapping.")
                continue
            path_value = item.get("path")
            if not path_value:
                errors.append(f"{collection} entry missing path.")
                continue
            resolved = resolve_path(str(path_value), release_dir)
            if not resolved.exists():
                errors.append(f"{label} missing file → {resolved}")
                continue
            recorded_sha = item.get("sha256")
            if not recorded_sha:
                continue
            actual_sha = compute_sha256(resolved)
            if actual_sha != recorded_sha:
                errors.append(
                    f"{label} SHA mismatch for {resolved} "
                    f"(recorded {recorded_sha}, computed {actual_sha})"
                )
def verify_debug_store(manifest: Mapping[str, Any], release_dir: pathlib.Path, errors: list[str]) -> None:
    """Verify the optional ``debugStore`` section of the release manifest.

    Nothing is checked when ``debugStore`` is absent or not a mapping.
    Otherwise:

    * the referenced debug manifest must exist, match the recorded ``sha256``
      (when present), agree with its ``.sha256`` sidecar (when both digests
      are present) and parse as a JSON object;
    * the referenced ``directory`` (when given) must exist;
    * the debug manifest must list a non-empty ``artifacts`` array whose
      length equals the declared ``entries`` (when that is an integer), and
      every artefact must have a ``debugPath`` that exists and hashes to its
      ``sha256``.

    Args:
        manifest: Parsed release manifest.
        release_dir: Release directory used to resolve paths.
        errors: Accumulator that receives one message per problem found.
    """
    debug = manifest.get("debugStore")
    if not isinstance(debug, Mapping):
        return

    manifest_data: Any = None
    # Tracked separately from the payload so that falsy-but-valid JSON
    # (``{}``, ``[]``, ``null``) is still validated below.
    manifest_loaded = False

    manifest_path_str = debug.get("manifest")
    if manifest_path_str:
        manifest_path = resolve_path(str(manifest_path_str), release_dir)
        if not manifest_path.exists():
            errors.append(f"Debug manifest missing → {manifest_path}")
        else:
            recorded_sha = debug.get("sha256")
            if recorded_sha:
                actual_sha = compute_sha256(manifest_path)
                if actual_sha != recorded_sha:
                    errors.append(
                        f"Debug manifest SHA mismatch (recorded {recorded_sha}, computed {actual_sha})"
                    )
            sha_sidecar = manifest_path.with_suffix(manifest_path.suffix + ".sha256")
            sidecar_sha = parse_sha_file(sha_sidecar)
            if sidecar_sha and recorded_sha and sidecar_sha != recorded_sha:
                errors.append(
                    f"Debug manifest sidecar digest {sidecar_sha} disagrees with recorded {recorded_sha}"
                )
            try:
                with manifest_path.open("r", encoding="utf-8") as handle:
                    manifest_data = json.load(handle)
            except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                # UnicodeDecodeError: the file is not valid UTF-8 at all.
                errors.append(f"Debug manifest JSON invalid: {exc}")
            else:
                manifest_loaded = True

    directory = debug.get("directory")
    if directory:
        debug_dir = resolve_path(str(directory), release_dir)
        if not debug_dir.exists():
            errors.append(f"Debug directory missing → {debug_dir}")

    if not manifest_loaded:
        return
    if not isinstance(manifest_data, Mapping):
        # json.load may return a list or scalar; ``.get`` would crash on it.
        errors.append("Debug manifest must be a JSON object.")
        return

    artifacts = manifest_data.get("artifacts")
    if not isinstance(artifacts, list) or not artifacts:
        errors.append("Debug manifest contains no artefacts.")
        return

    declared_entries = debug.get("entries")
    if isinstance(declared_entries, int) and declared_entries != len(artifacts):
        errors.append(
            f"Debug manifest reports {declared_entries} entries but contains {len(artifacts)} artefacts."
        )

    for artefact in artifacts:
        if not isinstance(artefact, Mapping):
            errors.append("Debug manifest artefact entry is not a mapping.")
            continue
        debug_path = artefact.get("debugPath")
        artefact_sha = artefact.get("sha256")
        if not debug_path or not artefact_sha:
            errors.append("Debug manifest artefact missing debugPath or sha256.")
            continue
        resolved_debug = resolve_path(str(debug_path), release_dir)
        if not resolved_debug.exists():
            errors.append(f"Debug artefact missing → {resolved_debug}")
            continue
        actual_sha = compute_sha256(resolved_debug)
        if actual_sha != artefact_sha:
            errors.append(
                f"Debug artefact SHA mismatch for {resolved_debug} "
                f"(recorded {artefact_sha}, computed {actual_sha})"
            )
def verify_release(release_dir: pathlib.Path) -> None:
    """Run every verification pass over the release in *release_dir*.

    All passes run even when earlier ones report problems, so the raised
    error lists every failure at once.

    Raises:
        VerificationError: If the directory is missing, the manifest cannot
            be loaded, or any pass recorded an error.
    """
    if not release_dir.exists():
        raise VerificationError(f"Release directory not found: {release_dir}")

    manifest = load_manifest(release_dir)
    errors: list[str] = []
    checks = (
        verify_manifest_hashes,
        verify_components,
        verify_collections,
        verify_debug_store,
    )
    for check in checks:
        check(manifest, release_dir, errors)

    if not errors:
        return
    bullet_list = "\n - ".join(errors)
    raise VerificationError(f"Release verification failed:\n - {bullet_list}")
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options for the release verifier.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        A namespace whose ``release_dir`` attribute is a ``pathlib.Path``
        (``out/release`` unless ``--release-dir`` is given).
    """
    default_release_dir = pathlib.Path("out/release")
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "--release-dir",
        default=default_release_dir,
        type=pathlib.Path,
        help="Path to the release artefact directory (default: %(default)s)",
    )
    namespace = cli.parse_args(argv)
    return namespace
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Returns:
        ``0`` when the release verifies cleanly, ``1`` when a
        ``VerificationError`` was raised (its message is written to stderr).
    """
    args = parse_args(argv)
    try:
        verify_release(args.release_dir.resolve())
    except VerificationError as exc:
        print(str(exc), file=sys.stderr)
        exit_code = 1
    else:
        print(f"✅ Release artefacts verified OK in {args.release_dir}")
        exit_code = 0
    return exit_code
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())