#!/usr/bin/env python3
"""
Offline validator for Task Pack bundles.

Validates the offline bundle manifest against a minimal rule set:
- Required fields (schemaVersion, pack, plan, evidence, security, hashes, slo, tenant/environment).
- Plan hash algorithm and canonical plan digest.
- Presence and SHA-256 digests for referenced files (inputs.lock, approvals ledger, SBOM, revocations, DSSE envelopes).
- Sandbox quotas/egress allowlist and SLO sanity.

This script is deterministic: it emits sorted findings and never touches network resources.
"""
from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import json
import os
import sys
import tarfile
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence

# Top-level manifest keys that must be present.
_REQUIRED_TOP_LEVEL = (
    "schemaVersion",
    "pack",
    "plan",
    "evidence",
    "security",
    "hashes",
    "slo",
    "tenant",
    "environment",
    "created",
)

# Top-level keys whose values are looked into with nested field access.
_OBJECT_SECTIONS = ("pack", "plan", "evidence", "security", "slo")


@dataclass
class ValidationError:
    """A single finding, addressed by a manifest field path or a bundle file path."""

    path: str
    message: str

    def __str__(self) -> str:
        return f"{self.path}: {self.message}"


class BundleReader:
    """Read-only access to a bundle stored either as a directory or as a tar archive.

    All ``path`` arguments are relative to the bundle root. The reader can be
    used as a context manager; ``close()`` releases the archive handle, if any.
    """

    def __init__(self, bundle_path: str):
        self.bundle_path = bundle_path
        self._tar: Optional[tarfile.TarFile] = None
        # tarfile.is_tarfile() only swallows TarError; handing it a directory
        # raises IsADirectoryError, so directories must be ruled out first.
        if not os.path.isdir(bundle_path) and tarfile.is_tarfile(bundle_path):
            self._tar = tarfile.open(bundle_path, mode="r:*")

    def __enter__(self) -> "BundleReader":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _resolve(self, path: str) -> Optional[str]:
        """Map a bundle-relative path to a filesystem path.

        Returns None when the path would land outside the bundle directory
        (absolute paths, ``..`` traversal), so manifest-supplied paths cannot
        make the validator read unrelated files.
        """
        root = os.path.abspath(self.bundle_path)
        candidate = os.path.abspath(os.path.join(root, path))
        try:
            inside = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # commonpath() refuses to compare paths on different drives.
            inside = False
        return candidate if inside else None

    def exists(self, path: str) -> bool:
        """Return True if ``path`` names a readable file inside the bundle."""
        if self._tar is not None:
            try:
                member = self._tar.getmember(path)
            except KeyError:
                return False
            # Directories, devices and FIFOs have no content to hash.
            return member.isreg() or member.islnk() or member.issym()
        resolved = self._resolve(path)
        return resolved is not None and os.path.isfile(resolved)

    def read_bytes(self, path: str) -> bytes:
        """Return the full content of ``path``.

        Raises:
            FileNotFoundError: the path is absent, escapes the bundle, or is
                not a regular file inside the archive.
        """
        if self._tar is not None:
            try:
                member = self._tar.getmember(path)
                # extractfile() raises KeyError for a link whose target is absent.
                fileobj = self._tar.extractfile(member)
            except KeyError as exc:
                raise FileNotFoundError(path) from exc
            if fileobj is None:
                raise FileNotFoundError(path)
            with fileobj:
                return fileobj.read()
        resolved = self._resolve(path)
        if resolved is None:
            raise FileNotFoundError(path)
        with open(resolved, "rb") as handle:
            return handle.read()

    def close(self) -> None:
        """Close the underlying archive, if one was opened."""
        if self._tar is not None:
            self._tar.close()


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Verify StellaOps Task Pack offline bundle deterministically."
    )
    parser.add_argument(
        "--bundle",
        required=True,
        help="Path to bundle directory or tarball containing bundle manifest + artefacts.",
    )
    parser.add_argument(
        "--manifest",
        default="bundle.json",
        help="Relative path to the offline bundle manifest inside the bundle (default: bundle.json).",
    )
    parser.add_argument(
        "--require-dsse",
        action="store_true",
        help="Fail if DSSE envelope files are missing.",
    )
    return parser.parse_args(argv)


def load_manifest(reader: BundleReader, manifest_path: str) -> Dict:
    """Read and decode the bundle manifest.

    Raises:
        FileNotFoundError: the manifest is not present in the bundle.
        ValueError: the manifest is not valid JSON or its root is not an object.
    """
    try:
        raw = reader.read_bytes(manifest_path)
    except FileNotFoundError as exc:
        raise FileNotFoundError(f"Manifest {manifest_path} not found in bundle") from exc
    try:
        manifest = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise ValueError(f"Manifest {manifest_path} is not valid JSON: {exc}") from exc
    # Every later check uses dict lookups, so reject arrays/scalars up front.
    if not isinstance(manifest, dict):
        raise ValueError(f"Manifest {manifest_path} must be a JSON object")
    return manifest


def sha256_digest(data: bytes) -> str:
    """Return the digest of ``data`` in ``sha256:<hex>`` form."""
    return f"sha256:{hashlib.sha256(data).hexdigest()}"


def parse_timestamp(value: str) -> bool:
    """Return True if ``value`` is a parseable ISO-8601 timestamp string.

    Values ending in ``Z`` must match ``YYYY-MM-DDTHH:MM:SSZ`` exactly; anything
    else is handed to ``datetime.fromisoformat``. Non-string values are rejected
    rather than raising.
    """
    if not isinstance(value, str):
        return False
    try:
        if value.endswith("Z"):
            dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
        else:
            dt.datetime.fromisoformat(value)
    except (ValueError, TypeError):
        return False
    return True


def _as_mapping(value: Any) -> Dict:
    """Return ``value`` if it is a JSON object, else an empty dict so lookups stay safe."""
    return value if isinstance(value, dict) else {}


def _is_positive_number(value: Any) -> bool:
    """Return True for a real int/float strictly greater than zero."""
    # bool is a subclass of int, so JSON `true` must be excluded explicitly;
    # `value > 0` (rather than `not value <= 0`) also rejects NaN.
    return isinstance(value, (int, float)) and not isinstance(value, bool) and value > 0


def _digest_of(reader: BundleReader, path: str) -> Optional[str]:
    """Return the sha256 digest of a bundle file, or None if it cannot be read."""
    try:
        return sha256_digest(reader.read_bytes(path))
    except (OSError, tarfile.TarError):
        return None


def validate_manifest(manifest: Dict) -> List[ValidationError]:
    """Check manifest structure without touching any bundle files.

    Covers required top-level keys, the schema version, the plan hash
    algorithm, SLO values, timestamps, sandbox quotas and the egress allowlist.

    Returns:
        One ``ValidationError`` per violated rule; empty when the manifest passes.
    """
    errors: List[ValidationError] = []

    for key in _REQUIRED_TOP_LEVEL:
        if key not in manifest:
            errors.append(ValidationError(key, "is required"))

    # A section that is present but not an object would otherwise only show up
    # as a string of confusing nested-field errors.
    for key in _OBJECT_SECTIONS:
        if key in manifest and not isinstance(manifest[key], dict):
            errors.append(ValidationError(key, "must be an object"))

    if manifest.get("schemaVersion") != "stellaops.pack.offline-bundle.v1":
        errors.append(
            ValidationError(
                "schemaVersion", "must equal stellaops.pack.offline-bundle.v1"
            )
        )

    plan = _as_mapping(manifest.get("plan"))
    if plan.get("hashAlgorithm") != "sha256":
        errors.append(ValidationError("plan.hashAlgorithm", "must be sha256"))

    slo = _as_mapping(manifest.get("slo"))
    for name in ("runP95Seconds", "approvalP95Seconds", "maxQueueDepth"):
        if not _is_positive_number(slo.get(name)):
            errors.append(ValidationError(f"slo.{name}", "must be > 0"))

    for ts_field in ("created", "expires"):
        ts_value = manifest.get(ts_field)
        # Absent/empty values are not parsed here; "created" presence is
        # already enforced by the required-key check above.
        if ts_value and not parse_timestamp(ts_value):
            errors.append(ValidationError(ts_field, "must be ISO-8601 (UTC recommended)"))

    sandbox = _as_mapping(_as_mapping(manifest.get("security")).get("sandbox"))
    for name in ("cpuLimitMillicores", "memoryLimitMiB"):
        if not _is_positive_number(sandbox.get(name)):
            errors.append(ValidationError(f"security.sandbox.{name}", "must be > 0"))

    if not isinstance(sandbox.get("egressAllowlist"), list):
        errors.append(
            ValidationError(
                "security.sandbox.egressAllowlist",
                "must be an explicit list (empty list means fully sealed)",
            )
        )

    return errors


def verify_hashes(reader: BundleReader, manifest: Dict) -> List[ValidationError]:
    """Recompute the SHA-256 digest of every file listed under ``hashes``.

    Each entry must be an object with ``path``, ``algorithm`` (``sha256``) and
    ``digest`` (``sha256:<hex>``). Malformed entries, missing or unreadable
    files and digest mismatches are each reported as a ``ValidationError``.
    """
    hashes = manifest.get("hashes", [])
    if not isinstance(hashes, list):
        return [ValidationError("hashes", "must be an array of digest entries")]

    errors: List[ValidationError] = []
    for index, entry in enumerate(hashes):
        if not isinstance(entry, dict):
            errors.append(
                ValidationError(
                    f"hashes[{index}]",
                    "must be an object with path, algorithm, and digest",
                )
            )
            continue

        path = entry.get("path")
        algo = entry.get("algorithm")
        expected = entry.get("digest")
        if not path or not isinstance(path, str) or algo != "sha256" or not expected:
            errors.append(
                ValidationError(
                    f"hashes[{path or '?'}]", "path, algorithm=sha256, and digest are required"
                )
            )
            continue

        if not reader.exists(path):
            errors.append(ValidationError(path, "referenced in hashes but missing from bundle"))
            continue

        actual = _digest_of(reader, path)
        if actual is None:
            errors.append(ValidationError(path, "could not be read from bundle"))
        elif actual != expected:
            errors.append(
                ValidationError(
                    path, f"digest mismatch (expected {expected}, got {actual})"
                )
            )

    return errors


def verify_files(reader: BundleReader, manifest: Dict, require_dsse: bool) -> List[ValidationError]:
    """Check that every file referenced by the manifest is present in the bundle.

    Also compares the canonical plan file against ``plan.hash`` when both are
    available.

    Args:
        reader: Access to the bundle contents.
        manifest: Decoded bundle manifest.
        require_dsse: When True, the two DSSE envelope paths under
            ``security.signatures`` are required as well.
    """
    plan = _as_mapping(manifest.get("plan"))
    pack = _as_mapping(manifest.get("pack"))
    evidence = _as_mapping(manifest.get("evidence"))
    security = _as_mapping(manifest.get("security"))

    paths = [
        ("plan.canonicalPlanPath", plan.get("canonicalPlanPath")),
        ("plan.inputsLock", plan.get("inputsLock")),
        ("pack.sbom", pack.get("sbom")),
        ("evidence.attestation", evidence.get("attestation")),
        ("evidence.approvalsLedger", evidence.get("approvalsLedger")),
        ("security.revocations", security.get("revocations")),
        ("security.secretsRedactionPolicy", security.get("secretsRedactionPolicy")),
    ]
    if require_dsse:
        signatures = _as_mapping(security.get("signatures"))
        paths.extend(
            [
                ("security.signatures.bundleDsse", signatures.get("bundleDsse")),
                ("security.signatures.attestationDsse", signatures.get("attestationDsse")),
            ]
        )

    # Missing references are listed first, then references that do not resolve.
    errors: List[ValidationError] = [
        ValidationError(path_label, "file path missing")
        for path_label, path_value in paths
        if not path_value
    ]
    for path_label, path_value in paths:
        if not path_value:
            continue
        if not isinstance(path_value, str):
            errors.append(ValidationError(path_label, "file path must be a string"))
        elif not reader.exists(path_value):
            errors.append(ValidationError(path_label, f"{path_value} not found in bundle"))

    # Verify canonical plan hash if the file exists.
    plan_path = plan.get("canonicalPlanPath")
    expected_plan_hash = plan.get("hash")
    if (
        plan_path
        and isinstance(plan_path, str)
        and expected_plan_hash
        and reader.exists(plan_path)
    ):
        actual_plan_hash = _digest_of(reader, plan_path)
        if actual_plan_hash is None:
            errors.append(ValidationError("plan.hash", f"{plan_path} could not be read from bundle"))
        elif actual_plan_hash != expected_plan_hash:
            errors.append(
                ValidationError(
                    "plan.hash",
                    f"hash mismatch (expected {expected_plan_hash}, got {actual_plan_hash})",
                )
            )

    return errors


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run all checks and print the findings.

    Args:
        argv: Argument list; ``None`` means ``sys.argv[1:]``.

    Returns:
        0 when the bundle validates, 1 when any finding is reported or the
        bundle/manifest cannot be loaded.
    """
    args = parse_args(argv)

    try:
        reader = BundleReader(args.bundle)
    except (OSError, tarfile.TarError) as exc:
        print(f"[FAIL] {args.bundle}: cannot open bundle ({exc})")
        return 1

    with reader:
        try:
            manifest = load_manifest(reader, args.manifest)
        except (OSError, ValueError, tarfile.TarError) as exc:
            print(f"[FAIL] {exc}")
            return 1

        errors: List[ValidationError] = []
        errors.extend(validate_manifest(manifest))
        errors.extend(verify_files(reader, manifest, require_dsse=args.require_dsse))
        errors.extend(verify_hashes(reader, manifest))

        if errors:
            # Stable sort by path keeps the output reproducible between runs.
            for item in sorted(errors, key=lambda e: e.path):
                print(f"[FAIL] {item}")
            return 1

        print("[OK] offline bundle validated")
        return 0


if __name__ == "__main__":
    sys.exit(main())