#!/usr/bin/env python3 """ Offline validator for Task Pack bundles. Validates the offline bundle manifest against a minimal rule set: - Required fields (schemaVersion, pack, plan, evidence, security, hashes, slo, tenant/environment). - Plan hash algorithm and canonical plan digest. - Presence and SHA-256 digests for referenced files (inputs.lock, approvals ledger, SBOM, revocations, DSSE envelopes). - Sandbox quotas/egress allowlist and SLO sanity. This script is deterministic: it emits sorted findings and never touches network resources. """ from __future__ import annotations import argparse import datetime as dt import hashlib import json import os import sys import tarfile import re from pathlib import Path from dataclasses import dataclass from typing import Dict, Iterable, List, Optional @dataclass class ValidationError: path: str message: str def __str__(self) -> str: return f"{self.path}: {self.message}" class BundleReader: def __init__(self, bundle_path: str): self.bundle_path = bundle_path self._tar: Optional[tarfile.TarFile] = None if os.path.isdir(bundle_path): self._tar = None elif tarfile.is_tarfile(bundle_path): self._tar = tarfile.open(bundle_path, mode="r:*") def exists(self, path: str) -> bool: if self._tar: try: self._tar.getmember(path) return True except KeyError: return False return os.path.exists(os.path.join(self.bundle_path, path)) def read_bytes(self, path: str) -> bytes: if self._tar: try: member = self._tar.getmember(path) except KeyError as exc: raise FileNotFoundError(path) from exc fileobj = self._tar.extractfile(member) if fileobj is None: raise FileNotFoundError(path) return fileobj.read() with open(os.path.join(self.bundle_path, path), "rb") as handle: return handle.read() def close(self) -> None: if self._tar: self._tar.close() def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Verify StellaOps Task Pack offline bundle deterministically." ) parser.add_argument( "--bundle", required=False, help="Path to bundle directory or tarball containing bundle manifest + artefacts.", ) parser.add_argument( "--fixture", choices=["good", "bad"], help="If set, uses built-in fixtures under scripts/packs/__fixtures__/ for quick checks.", ) parser.add_argument( "--manifest", default="bundle.json", help="Relative path to the offline bundle manifest inside the bundle (default: bundle.json).", ) parser.add_argument( "--require-dsse", action="store_true", help="Fail if DSSE envelope files are missing.", ) return parser.parse_args() def load_manifest(reader: BundleReader, manifest_path: str) -> Dict: raw = reader.read_bytes(manifest_path) try: return json.loads(raw) except json.JSONDecodeError as exc: raise ValueError(f"Manifest {manifest_path} is not valid JSON: {exc}") from exc def sha256_digest(data: bytes) -> str: return f"sha256:{hashlib.sha256(data).hexdigest()}" def parse_timestamp(value: str) -> bool: try: if value.endswith("Z"): dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ") else: dt.datetime.fromisoformat(value) return True except (ValueError, TypeError): return False def validate_manifest(manifest: Dict) -> List[ValidationError]: required_top = [ "schemaVersion", "pack", "plan", "evidence", "security", "hashes", "slo", "tenant", "environment", "created", ] errors: List[ValidationError] = [] for key in required_top: if key not in manifest: errors.append(ValidationError(key, "is required")) if manifest.get("schemaVersion") != "stellaops.pack.offline-bundle.v1": errors.append( ValidationError( "schemaVersion", "must equal stellaops.pack.offline-bundle.v1" ) ) plan = manifest.get("plan", {}) if plan.get("hashAlgorithm") != "sha256": errors.append(ValidationError("plan.hashAlgorithm", "must be sha256")) for numeric_field in [ ("slo.runP95Seconds", manifest.get("slo", {}).get("runP95Seconds")), ("slo.approvalP95Seconds", manifest.get("slo", {}).get("approvalP95Seconds")), ("slo.maxQueueDepth", manifest.get("slo", {}).get("maxQueueDepth")), ]: field, value = numeric_field if value is None or not isinstance(value, (int, float)) or value <= 0: errors.append(ValidationError(field, "must be > 0")) for ts_field in ["created", "expires"]: ts_value = manifest.get(ts_field) if ts_value and not parse_timestamp(ts_value): errors.append(ValidationError(ts_field, "must be ISO-8601 (UTC recommended)")) sandbox = manifest.get("security", {}).get("sandbox", {}) for quota_field in [ ("security.sandbox.cpuLimitMillicores", sandbox.get("cpuLimitMillicores")), ("security.sandbox.memoryLimitMiB", sandbox.get("memoryLimitMiB")), ("security.sandbox.quotaSeconds", sandbox.get("quotaSeconds")), ]: field, value = quota_field if value is None or not isinstance(value, (int, float)) or value <= 0: errors.append(ValidationError(field, "must be > 0")) allowlist = sandbox.get("egressAllowlist") if allowlist is None or not isinstance(allowlist, list): errors.append( ValidationError( "security.sandbox.egressAllowlist", "must be an explicit list (empty list means fully sealed)", ) ) return errors def verify_hashes(reader: BundleReader, manifest: Dict) -> List[ValidationError]: errors: List[ValidationError] = [] hashes = manifest.get("hashes", []) if not isinstance(hashes, list): return [ValidationError("hashes", "must be an array of digest entries")] for entry in hashes: path = entry.get("path") algo = entry.get("algorithm") expected = entry.get("digest") if not path or algo != "sha256" or not expected: errors.append( ValidationError( f"hashes[{path or '?'}]", "path, algorithm=sha256, and digest are required" ) ) continue if not reader.exists(path): errors.append(ValidationError(path, "referenced in hashes but missing from bundle")) continue actual = sha256_digest(reader.read_bytes(path)) if actual != expected: errors.append( ValidationError( path, f"digest mismatch (expected {expected}, got {actual})" ) ) return errors def verify_files(reader: BundleReader, manifest: Dict, require_dsse: bool) -> List[ValidationError]: errors: List[ValidationError] = [] paths = [ ("plan.canonicalPlanPath", manifest.get("plan", {}).get("canonicalPlanPath")), ("plan.inputsLock", manifest.get("plan", {}).get("inputsLock")), ("pack.sbom", manifest.get("pack", {}).get("sbom")), ("evidence.attestation", manifest.get("evidence", {}).get("attestation")), ("evidence.approvalsLedger", manifest.get("evidence", {}).get("approvalsLedger")), ("security.revocations", manifest.get("security", {}).get("revocations")), ("security.secretsRedactionPolicy", manifest.get("security", {}).get("secretsRedactionPolicy")), ] if require_dsse: signatures = manifest.get("security", {}).get("signatures", {}) paths.extend( [ ("security.signatures.bundleDsse", signatures.get("bundleDsse")), ("security.signatures.attestationDsse", signatures.get("attestationDsse")), ] ) missing = [ ValidationError(path_label, "file path missing") for path_label, path_value in paths if not path_value ] for err in missing: errors.append(err) for path_label, path_value in paths: if not path_value: continue if not reader.exists(path_value): errors.append(ValidationError(path_label, f"{path_value} not found in bundle")) # Verify canonical plan hash if the file exists. plan_path = manifest.get("plan", {}).get("canonicalPlanPath") expected_plan_hash = manifest.get("plan", {}).get("hash") if plan_path and expected_plan_hash and reader.exists(plan_path): actual_plan_hash = sha256_digest(reader.read_bytes(plan_path)) if actual_plan_hash != expected_plan_hash: errors.append( ValidationError( "plan.hash", f"hash mismatch (expected {expected_plan_hash}, got {actual_plan_hash})", ) ) approvals_path = manifest.get("evidence", {}).get("approvalsLedger") if approvals_path and reader.exists(approvals_path): try: approval_doc = json.loads(reader.read_bytes(approvals_path)) errors.extend(validate_approval_ledger(approval_doc, manifest.get("plan", {}).get("hash"))) except Exception as exc: # broad but scoped to ledger parse errors.append(ValidationError("evidence.approvalsLedger", f"failed to parse ledger JSON: {exc}")) return errors def validate_approval_ledger(doc: Dict, expected_plan_hash: Optional[str]) -> List[ValidationError]: errors: List[ValidationError] = [] if doc.get("schemaVersion") != "stellaops.pack.approval-ledger.v1": errors.append(ValidationError("approvalsLedger.schemaVersion", "must be stellaops.pack.approval-ledger.v1")) for field in ["runId", "gateId", "tenantId", "decision", "planHash", "decidedAt"]: if not doc.get(field): errors.append(ValidationError(f"approvalsLedger.{field}", "is required")) plan_hash = doc.get("planHash") if plan_hash and not re.match(r"^sha256:[0-9a-f]{64}$", plan_hash, re.IGNORECASE): errors.append(ValidationError("approvalsLedger.planHash", "must be sha256:<64-hex>")) if expected_plan_hash and plan_hash and plan_hash.lower() != expected_plan_hash.lower(): errors.append(ValidationError("approvalsLedger.planHash", "must match manifest.plan.hash")) if doc.get("decision") not in {"approved", "rejected", "expired"}: errors.append(ValidationError("approvalsLedger.decision", "must be approved|rejected|expired")) return errors def main() -> int: args = parse_args() bundle_path = args.bundle if args.fixture: bundle_path = Path(__file__).parent / "__fixtures__" / args.fixture reader = BundleReader(bundle_path.as_posix() if isinstance(bundle_path, Path) else bundle_path) try: manifest = load_manifest(reader, args.manifest) errors: List[ValidationError] = [] errors.extend(validate_manifest(manifest)) errors.extend(verify_files(reader, manifest, require_dsse=args.require_dsse)) errors.extend(verify_hashes(reader, manifest)) if errors: for item in sorted(errors, key=lambda e: e.path): print(f"[FAIL] {item}") return 1 print("[OK] offline bundle validated") return 0 finally: reader.close() if __name__ == "__main__": sys.exit(main())