- Implemented PolicyDslValidator with command-line options for strict mode and JSON output. - Created PolicySchemaExporter to generate JSON schemas for policy-related models. - Developed PolicySimulationSmoke tool to validate policy simulations against expected outcomes. - Added project files and necessary dependencies for each tool. - Ensured proper error handling and usage instructions across tools.
		
			
				
	
	
		
			280 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			280 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
"""Verify release artefacts (SBOMs, provenance, signatures, manifest hashes)."""

from __future__ import annotations

import argparse
import hashlib
import json
import pathlib
import sys
from collections import OrderedDict
from typing import Any, Mapping, Optional

# Error-code form so only the missing-stub warning is suppressed; the original
# bare "type: ignore import-not-found" silenced every error on this line.
from build_release import dump_yaml  # type: ignore[import-not-found]
class VerificationError(Exception):
    """Signals that one or more release artefacts failed verification."""
def compute_sha256(path: pathlib.Path) -> str:
    """Return the hex SHA-256 digest of *path*, reading in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while chunk := stream.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()
def parse_sha_file(path: pathlib.Path) -> Optional[str]:
    """Return the first token of a ``*.sha256`` sidecar file.

    Returns None when the file is absent or contains only whitespace.
    """
    if not path.exists():
        return None
    tokens = path.read_text(encoding="utf-8").split()
    return tokens[0] if tokens else None
def resolve_path(path_str: str, release_dir: pathlib.Path) -> pathlib.Path:
    """Resolve *path_str* (possibly Windows-style) against the release tree.

    Absolute paths are returned as-is.  Relative paths are probed against the
    release directory and its two nearest ancestors, and the first existing
    match wins.  When nothing exists, the release-dir join is returned anyway
    so the caller can report the missing file.
    """
    relative = pathlib.Path(path_str.replace("\\", "/"))
    if relative.is_absolute():
        return relative

    search_bases = (release_dir, release_dir.parent, release_dir.parent.parent)
    for base in search_bases:
        candidate = (base / relative).resolve()
        if candidate.exists():
            return candidate
    return (release_dir / relative).resolve()
def load_manifest(release_dir: pathlib.Path) -> OrderedDict[str, Any]:
    """Load ``release.json`` from *release_dir*, preserving key order.

    Raises VerificationError when the file is missing or is not valid JSON.
    """
    manifest_path = release_dir / "release.json"
    if not manifest_path.exists():
        raise VerificationError(f"Release manifest JSON missing at {manifest_path}")
    try:
        raw = manifest_path.read_text(encoding="utf-8")
        return json.loads(raw, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as exc:
        raise VerificationError(f"Failed to parse {manifest_path}: {exc}") from exc
def verify_manifest_hashes(
    manifest: Mapping[str, Any],
    release_dir: pathlib.Path,
    errors: list[str],
) -> None:
    """Check the top-level manifest digests.

    Verifies the ``.sha256`` sidecars of release.yaml and release.json, then
    the checksum the manifest embeds over itself (computed with the checksums
    block removed).  Appends human-readable messages to *errors*; never raises.
    """
    yaml_path = release_dir / "release.yaml"
    if not yaml_path.exists():
        errors.append(f"Missing release.yaml at {yaml_path}")
        return

    # Both top-level manifests carry a "<name>.sha256" sidecar; the original
    # code duplicated this check verbatim for each file.
    for target in (yaml_path, release_dir / "release.json"):
        _check_sidecar_digest(target, errors)

    # The manifest also embeds a digest over its own canonical YAML dump
    # *minus* the checksums block; recompute and compare.
    checksums = manifest.get("checksums")
    if isinstance(checksums, Mapping):
        recorded_digest = checksums.get("sha256")
        base_manifest = OrderedDict(manifest)
        base_manifest.pop("checksums", None)
        yaml_without_checksums = dump_yaml(base_manifest)
        computed_digest = hashlib.sha256(yaml_without_checksums.encode("utf-8")).hexdigest()
        if recorded_digest != computed_digest:
            errors.append(
                "Manifest checksum mismatch: "
                f"recorded {recorded_digest}, computed {computed_digest}"
            )


def _check_sidecar_digest(path: pathlib.Path, errors: list[str]) -> None:
    """Compare *path* against its ``<name>.sha256`` sidecar, if one exists."""
    recorded = parse_sha_file(path.with_name(path.name + ".sha256"))
    actual = compute_sha256(path)
    if recorded and recorded != actual:
        errors.append(
            f"{path.name}.sha256 recorded {recorded} but file hashes to {actual}"
        )
def verify_artifact_entry(
    entry: Mapping[str, Any],
    release_dir: pathlib.Path,
    label: str,
    component_name: str,
    errors: list[str],
) -> None:
    """Validate one artefact reference: path present, file exists, digest matches.

    Appends any problem found to *errors*; the sha256 check only runs when the
    entry actually records a digest.
    """
    declared_path = entry.get("path")
    if not declared_path:
        errors.append(f"{component_name}: {label} missing 'path' field.")
        return

    artefact_file = resolve_path(str(declared_path), release_dir)
    if not artefact_file.exists():
        errors.append(f"{component_name}: {label} path does not exist → {artefact_file}")
        return

    expected_digest = entry.get("sha256")
    if not expected_digest:
        return
    observed_digest = compute_sha256(artefact_file)
    if observed_digest != expected_digest:
        errors.append(
            f"{component_name}: {label} SHA mismatch for {artefact_file} "
            f"(recorded {expected_digest}, computed {observed_digest})"
        )
def verify_components(manifest: Mapping[str, Any], release_dir: pathlib.Path, errors: list[str]) -> None:
    """Validate every artefact entry declared by each manifest component."""
    # Field key → human-readable label used in error messages.
    artefact_fields = {
        "sbom": "SBOM",
        "provenance": "provenance",
        "signature": "signature",
        "metadata": "metadata",
    }
    for component in manifest.get("components", []):
        if not isinstance(component, Mapping):
            errors.append("Component entry is not a mapping.")
            continue
        name = str(component.get("name", "<unknown>"))
        for key, label in artefact_fields.items():
            entry = component.get(key)
            if not entry:
                continue  # absent artefact kinds are allowed
            if not isinstance(entry, Mapping):
                errors.append(f"{name}: {label} entry must be a mapping.")
                continue
            verify_artifact_entry(entry, release_dir, label, name, errors)
def verify_collections(manifest: Mapping[str, Any], release_dir: pathlib.Path, errors: list[str]) -> None:
    """Validate chart and compose-file references (existence plus optional digest)."""
    for collection, label in (("charts", "chart"), ("compose", "compose file")):
        for record in manifest.get(collection, []):
            if not isinstance(record, Mapping):
                errors.append(f"{collection} entry is not a mapping.")
                continue
            declared_path = record.get("path")
            if not declared_path:
                errors.append(f"{collection} entry missing path.")
                continue
            target = resolve_path(str(declared_path), release_dir)
            if not target.exists():
                errors.append(f"{label} missing file → {target}")
                continue
            expected_digest = record.get("sha256")
            if not expected_digest:
                continue  # digest is optional for these entries
            observed_digest = compute_sha256(target)
            if observed_digest != expected_digest:
                errors.append(
                    f"{label} SHA mismatch for {target} "
                    f"(recorded {expected_digest}, computed {observed_digest})"
                )
def verify_debug_store(manifest: Mapping[str, Any], release_dir: pathlib.Path, errors: list[str]) -> None:
    """Validate the optional debug-symbol store referenced by the manifest.

    Checks, in order: the debug manifest file (its recorded sha256, its
    ``.sha256`` sidecar, and that it parses as JSON), the debug directory's
    existence, and then every artefact listed in the debug manifest
    (declared entry count, per-artefact path existence and sha256).
    All problems are appended to *errors*; nothing is raised here.
    """
    debug = manifest.get("debugStore")
    if not isinstance(debug, Mapping):
        # No debug store declared (or malformed top-level value) — nothing to check.
        return
    manifest_path_str = debug.get("manifest")
    # Parsed debug-manifest contents; stays None if the file is missing or invalid.
    manifest_data: Optional[Mapping[str, Any]] = None
    if manifest_path_str:
        manifest_path = resolve_path(str(manifest_path_str), release_dir)
        if not manifest_path.exists():
            errors.append(f"Debug manifest missing → {manifest_path}")
        else:
            # Digest recorded in the release manifest for the debug manifest file.
            recorded_sha = debug.get("sha256")
            if recorded_sha:
                actual_sha = compute_sha256(manifest_path)
                if actual_sha != recorded_sha:
                    errors.append(
                        f"Debug manifest SHA mismatch (recorded {recorded_sha}, computed {actual_sha})"
                    )
            # Appending ".sha256" to the existing suffix yields e.g. "manifest.json.sha256".
            sha_sidecar = manifest_path.with_suffix(manifest_path.suffix + ".sha256")
            sidecar_sha = parse_sha_file(sha_sidecar)
            # NOTE(review): when no sha256 is recorded in the release manifest, a
            # present sidecar is never compared against the actual file hash —
            # confirm whether that gap is intentional.
            if sidecar_sha and recorded_sha and sidecar_sha != recorded_sha:
                errors.append(
                    f"Debug manifest sidecar digest {sidecar_sha} disagrees with recorded {recorded_sha}"
                )
            try:
                with manifest_path.open("r", encoding="utf-8") as handle:
                    manifest_data = json.load(handle)
            except json.JSONDecodeError as exc:
                errors.append(f"Debug manifest JSON invalid: {exc}")
    directory = debug.get("directory")
    if directory:
        debug_dir = resolve_path(str(directory), release_dir)
        if not debug_dir.exists():
            errors.append(f"Debug directory missing → {debug_dir}")

    if manifest_data:
        artifacts = manifest_data.get("artifacts")
        if not isinstance(artifacts, list) or not artifacts:
            errors.append("Debug manifest contains no artefacts.")
            return

        # Cross-check the entry count the release manifest claims, if declared.
        declared_entries = debug.get("entries")
        if isinstance(declared_entries, int) and declared_entries != len(artifacts):
            errors.append(
                f"Debug manifest reports {declared_entries} entries but contains {len(artifacts)} artefacts."
            )

        # Per-artefact validation: shape, required fields, existence, digest.
        for artefact in artifacts:
            if not isinstance(artefact, Mapping):
                errors.append("Debug manifest artefact entry is not a mapping.")
                continue
            debug_path = artefact.get("debugPath")
            artefact_sha = artefact.get("sha256")
            if not debug_path or not artefact_sha:
                errors.append("Debug manifest artefact missing debugPath or sha256.")
                continue
            resolved_debug = resolve_path(str(debug_path), release_dir)
            if not resolved_debug.exists():
                errors.append(f"Debug artefact missing → {resolved_debug}")
                continue
            actual_sha = compute_sha256(resolved_debug)
            if actual_sha != artefact_sha:
                errors.append(
                    f"Debug artefact SHA mismatch for {resolved_debug} "
                    f"(recorded {artefact_sha}, computed {actual_sha})"
                )
def verify_release(release_dir: pathlib.Path) -> None:
    """Run every verification pass over *release_dir*.

    Raises VerificationError with a bulleted summary when anything fails;
    returns silently on success.
    """
    if not release_dir.exists():
        raise VerificationError(f"Release directory not found: {release_dir}")

    manifest = load_manifest(release_dir)
    problems: list[str] = []
    for check in (
        verify_manifest_hashes,
        verify_components,
        verify_collections,
        verify_debug_store,
    ):
        check(manifest, release_dir, problems)

    if problems:
        bullet_list = "\n - ".join(problems)
        raise VerificationError(f"Release verification failed:\n - {bullet_list}")
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the verifier."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "--release-dir",
        type=pathlib.Path,
        default=pathlib.Path("out/release"),
        help="Path to the release artefact directory (default: %(default)s)",
    )
    return cli.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: verify the release directory and report the outcome.

    Returns 0 on success, 1 when verification fails (details go to stderr).
    """
    options = parse_args(argv)
    try:
        verify_release(options.release_dir.resolve())
    except VerificationError as failure:
        print(str(failure), file=sys.stderr)
        return 1
    print(f"✅ Release artefacts verified OK in {options.release_dir}")
    return 0
if __name__ == "__main__":
    # sys.exit raises SystemExit, matching the original `raise SystemExit(main())`.
    sys.exit(main())