Files
git.stella-ops.org/scripts/packs/verify_offline_bundle.py
StellaOps Bot 18d87c64c5 feat: add PolicyPackSelectorComponent with tests and integration
- Implemented PolicyPackSelectorComponent for selecting policy packs.
- Added unit tests for component behavior, including API success and error handling.
- Introduced monaco-workers type declarations for editor workers.
- Created acceptance tests for guardrails with stubs for AT1–AT10.
- Established SCA Failure Catalogue Fixtures for regression testing.
- Developed plugin determinism harness with stubs for PL1–PL10.
- Added scripts for evidence upload and verification processes.
2025-12-05 21:24:34 +02:00

306 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Offline validator for Task Pack bundles.
Validates the offline bundle manifest against a minimal rule set:
- Required fields (schemaVersion, pack, plan, evidence, security, hashes, slo, tenant, environment, created).
- Plan hash algorithm and canonical plan digest.
- Presence and SHA-256 digests for referenced files (inputs.lock, approvals ledger, SBOM, revocations, DSSE envelopes).
- Sandbox quotas/egress allowlist and SLO sanity.
This script is deterministic: it emits sorted findings and never touches network resources.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import json
import os
import sys
import tarfile
import re
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
@dataclass
class ValidationError:
    """One validation finding, printed by the script as ``<path>: <message>``."""

    # Manifest field label or bundle-relative file path the finding refers to.
    path: str
    # Human-readable description of the problem.
    message: str

    def __str__(self) -> str:
        """Render the finding as ``path: message``."""
        return "{}: {}".format(self.path, self.message)
class BundleReader:
    """Read-only access to a bundle stored as a directory or a tar archive.

    A directory is read straight from disk. Any other path that ``tarfile``
    recognises is opened as an archive (compression auto-detected). A path that
    is neither falls back to directory-style lookups, which then simply fail to
    find anything.

    The reader can be used as a context manager; leaving the ``with`` block
    closes the archive if one was opened.

    NOTE(review): in directory mode paths are joined with ``os.path.join``
    without confinement, so a manifest path that is absolute or contains ``..``
    resolves outside the bundle directory -- confirm whether that is acceptable.
    """

    def __init__(self, bundle_path: str):
        """Open *bundle_path*; a tar archive is opened immediately for reading."""
        self.bundle_path = bundle_path
        self._tar: Optional[tarfile.TarFile] = None
        if not os.path.isdir(bundle_path) and tarfile.is_tarfile(bundle_path):
            self._tar = tarfile.open(bundle_path, mode="r:*")

    def __enter__(self) -> "BundleReader":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _find_member(self, path: str) -> Optional[tarfile.TarInfo]:
        """Return the archive member named *path*, or None when there is none.

        The exact name is tried first. Archives created from ``.`` store their
        members as ``./name``, so the spelling with/without a leading ``./`` is
        tried as a fallback.
        """
        if self._tar is None:
            return None
        alternate = path[2:] if path.startswith("./") else "./" + path
        for name in (path, alternate):
            try:
                return self._tar.getmember(name)
            except KeyError:
                continue
        return None

    def exists(self, path: str) -> bool:
        """Return True when *path* is present in the bundle."""
        if self._tar is not None:
            return self._find_member(path) is not None
        return os.path.exists(os.path.join(self.bundle_path, path))

    def read_bytes(self, path: str) -> bytes:
        """Return the full content of *path*.

        Raises:
            FileNotFoundError: the path is absent, or (archive mode) the member
                is not a readable file such as a directory or dangling link.
            OSError: directory mode, when the file cannot be opened or read.
        """
        if self._tar is not None:
            member = self._find_member(path)
            if member is None:
                raise FileNotFoundError(path)
            try:
                fileobj = self._tar.extractfile(member)
            except KeyError as exc:
                # extractfile() raises KeyError when a link's target is missing.
                raise FileNotFoundError(path) from exc
            if fileobj is None:
                # Directories and special members have no byte content.
                raise FileNotFoundError(path)
            with fileobj:
                return fileobj.read()
        with open(os.path.join(self.bundle_path, path), "rb") as handle:
            return handle.read()

    def close(self) -> None:
        """Close the underlying archive, if any. Safe to call more than once."""
        if self._tar is not None:
            self._tar.close()
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command line for the bundle verifier.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Namespace with ``bundle``, ``fixture``, ``manifest`` and ``require_dsse``.

    Exits with status 2 (argparse usage error) when neither ``--bundle`` nor
    ``--fixture`` is supplied, since there would be nothing to verify.
    """
    parser = argparse.ArgumentParser(
        description="Verify StellaOps Task Pack offline bundle deterministically."
    )
    parser.add_argument(
        "--bundle",
        required=False,
        help="Path to bundle directory or tarball containing bundle manifest + artefacts.",
    )
    parser.add_argument(
        "--fixture",
        choices=["good", "bad"],
        help="If set, uses built-in fixtures under scripts/packs/__fixtures__/ for quick checks.",
    )
    parser.add_argument(
        "--manifest",
        default="bundle.json",
        help="Relative path to the offline bundle manifest inside the bundle (default: bundle.json).",
    )
    parser.add_argument(
        "--require-dsse",
        action="store_true",
        help="Fail if DSSE envelope files are missing.",
    )
    args = parser.parse_args(argv)
    # Both options stay individually optional (a fixture overrides --bundle),
    # but at least one source must be named.
    if not args.bundle and not args.fixture:
        parser.error("one of --bundle or --fixture is required")
    return args
def load_manifest(reader: "BundleReader", manifest_path: str) -> Dict:
    """Read and decode the bundle manifest.

    Args:
        reader: Bundle accessor; only ``read_bytes`` is used.
        manifest_path: Bundle-relative path of the manifest file.

    Returns:
        The manifest as a dict.

    Raises:
        ValueError: the file is not decodable JSON, or its top level is not a
            JSON object.
        Whatever ``reader.read_bytes`` raises when the file cannot be read.
    """
    raw = reader.read_bytes(manifest_path)
    try:
        manifest = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # json.loads decodes bytes itself; undecodable bytes surface as
        # UnicodeDecodeError rather than JSONDecodeError.
        raise ValueError(f"Manifest {manifest_path} is not valid JSON: {exc}") from exc
    if not isinstance(manifest, dict):
        # Every later check calls .get() on the manifest, so reject arrays and
        # scalars here with a clear message.
        raise ValueError(
            f"Manifest {manifest_path} must be a JSON object, got {type(manifest).__name__}"
        )
    return manifest
def sha256_digest(data: bytes) -> str:
    """Return the SHA-256 of *data* formatted as ``sha256:<lowercase hex>``."""
    hex_digest = hashlib.sha256(data).hexdigest()
    return "sha256:" + hex_digest
def parse_timestamp(value: str) -> bool:
    """Return True when *value* is an acceptable ISO-8601 timestamp string.

    Values ending in ``Z`` must be ``YYYY-MM-DDTHH:MM:SS`` with optional
    fractional seconds (1-6 digits). Anything else is handed to
    ``datetime.fromisoformat``. Non-string input is never a timestamp and
    returns False instead of raising.
    """
    if not isinstance(value, str):
        return False
    try:
        if value.endswith("Z"):
            # strptime needs a distinct layout for fractional seconds.
            layout = "%Y-%m-%dT%H:%M:%S.%fZ" if "." in value else "%Y-%m-%dT%H:%M:%SZ"
            dt.datetime.strptime(value, layout)
        else:
            dt.datetime.fromisoformat(value)
    except ValueError:
        return False
    return True
def _is_positive_number(value: object) -> bool:
    """Return True when *value* is an int or float greater than zero.

    Booleans are rejected even though ``bool`` subclasses ``int``, and NaN is
    rejected because ``nan > 0`` is false.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    return value > 0


def validate_manifest(manifest: Dict) -> List[ValidationError]:
    """Check the manifest's structure and scalar fields (no file access).

    Covers: required top-level keys, the schema version string, the plan hash
    algorithm, positive SLO numbers, ``created``/``expires`` timestamps,
    positive sandbox quotas and an explicit egress allowlist.

    Args:
        manifest: Decoded manifest object.

    Returns:
        All findings, in check order (possibly empty).
    """
    errors: List[ValidationError] = []

    required_top = (
        "schemaVersion",
        "pack",
        "plan",
        "evidence",
        "security",
        "hashes",
        "slo",
        "tenant",
        "environment",
        "created",
    )
    errors.extend(
        ValidationError(key, "is required") for key in required_top if key not in manifest
    )

    if manifest.get("schemaVersion") != "stellaops.pack.offline-bundle.v1":
        errors.append(
            ValidationError("schemaVersion", "must equal stellaops.pack.offline-bundle.v1")
        )

    # Sections are indexed with .get() below and by the file checks, so a value
    # such as null or a list is reported here instead of raising AttributeError.
    # Absent keys were already reported as "is required".
    sections: Dict[str, Dict] = {}
    for key in ("pack", "plan", "evidence", "security", "slo"):
        value = manifest.get(key)
        if isinstance(value, dict):
            sections[key] = value
            continue
        sections[key] = {}
        if key in manifest:
            errors.append(ValidationError(key, "must be an object"))

    if sections["plan"].get("hashAlgorithm") != "sha256":
        errors.append(ValidationError("plan.hashAlgorithm", "must be sha256"))

    slo = sections["slo"]
    for name in ("runP95Seconds", "approvalP95Seconds", "maxQueueDepth"):
        if not _is_positive_number(slo.get(name)):
            errors.append(ValidationError(f"slo.{name}", "must be > 0"))

    for ts_field in ("created", "expires"):
        ts_value = manifest.get(ts_field)
        # Absent/null/empty values are not format-checked here; presence of
        # "created" is handled by the required-key check above.
        if ts_value is None or ts_value == "":
            continue
        if not isinstance(ts_value, str) or not parse_timestamp(ts_value):
            errors.append(ValidationError(ts_field, "must be ISO-8601 (UTC recommended)"))

    sandbox = sections["security"].get("sandbox")
    if not isinstance(sandbox, dict):
        if sandbox is not None:
            errors.append(ValidationError("security.sandbox", "must be an object"))
        sandbox = {}

    for name in ("cpuLimitMillicores", "memoryLimitMiB", "quotaSeconds"):
        if not _is_positive_number(sandbox.get(name)):
            errors.append(ValidationError(f"security.sandbox.{name}", "must be > 0"))

    if not isinstance(sandbox.get("egressAllowlist"), list):
        errors.append(
            ValidationError(
                "security.sandbox.egressAllowlist",
                "must be an explicit list (empty list means fully sealed)",
            )
        )
    return errors
def verify_hashes(reader: "BundleReader", manifest: Dict) -> List[ValidationError]:
    """Recompute and compare the SHA-256 digest of every ``hashes`` entry.

    Each entry must be an object with a ``path``, ``algorithm`` equal to
    ``sha256`` and a ``digest``. Digests are compared case-insensitively, the
    same way the approvals ledger plan hash is compared.

    Args:
        reader: Bundle accessor used to locate and read the files.
        manifest: Decoded manifest object.

    Returns:
        Findings for malformed entries, missing or unreadable files and digest
        mismatches (possibly empty).
    """
    hashes = manifest.get("hashes", [])
    if not isinstance(hashes, list):
        return [ValidationError("hashes", "must be an array of digest entries")]

    errors: List[ValidationError] = []
    for index, entry in enumerate(hashes):
        if not isinstance(entry, dict):
            errors.append(
                ValidationError(
                    f"hashes[{index}]", "must be an object with path, algorithm and digest"
                )
            )
            continue
        path = entry.get("path")
        algo = entry.get("algorithm")
        expected = entry.get("digest")
        well_formed = (
            isinstance(path, str)
            and bool(path)
            and algo == "sha256"
            and isinstance(expected, str)
            and bool(expected)
        )
        if not well_formed:
            errors.append(
                ValidationError(
                    f"hashes[{path or '?'}]", "path, algorithm=sha256, and digest are required"
                )
            )
            continue
        if not reader.exists(path):
            errors.append(ValidationError(path, "referenced in hashes but missing from bundle"))
            continue
        try:
            actual = sha256_digest(reader.read_bytes(path))
        except OSError as exc:
            # The path exists but is not a readable file (e.g. a directory).
            errors.append(ValidationError(path, f"could not be read from bundle: {exc}"))
            continue
        if actual.lower() != expected.lower():
            errors.append(
                ValidationError(path, f"digest mismatch (expected {expected}, got {actual})")
            )
    return errors
def _manifest_section(manifest: Dict, key: str) -> Dict:
    """Return ``manifest[key]`` when it is an object, otherwise an empty dict."""
    section = manifest.get(key)
    return section if isinstance(section, dict) else {}


def verify_files(reader: "BundleReader", manifest: Dict, require_dsse: bool) -> List[ValidationError]:
    """Check that files referenced by the manifest are present and consistent.

    Presence is checked for the canonical plan, inputs lock, SBOM, attestation,
    approvals ledger, revocations and secrets redaction policy, plus the two
    DSSE envelopes when *require_dsse* is set. When both ``plan.hash`` and the
    canonical plan file are available the plan digest is recomputed, and the
    approvals ledger (if present) is parsed and validated.

    Args:
        reader: Bundle accessor used to locate and read the files.
        manifest: Decoded manifest object.
        require_dsse: Also require the DSSE signature envelope paths.

    Returns:
        All findings (possibly empty).
    """
    plan = _manifest_section(manifest, "plan")
    pack = _manifest_section(manifest, "pack")
    evidence = _manifest_section(manifest, "evidence")
    security = _manifest_section(manifest, "security")

    referenced = [
        ("plan.canonicalPlanPath", plan.get("canonicalPlanPath")),
        ("plan.inputsLock", plan.get("inputsLock")),
        ("pack.sbom", pack.get("sbom")),
        ("evidence.attestation", evidence.get("attestation")),
        ("evidence.approvalsLedger", evidence.get("approvalsLedger")),
        ("security.revocations", security.get("revocations")),
        ("security.secretsRedactionPolicy", security.get("secretsRedactionPolicy")),
    ]
    if require_dsse:
        signatures = _manifest_section(security, "signatures")
        referenced.append(("security.signatures.bundleDsse", signatures.get("bundleDsse")))
        referenced.append(
            ("security.signatures.attestationDsse", signatures.get("attestationDsse"))
        )

    errors: List[ValidationError] = []
    for label, value in referenced:
        if not value:
            errors.append(ValidationError(label, "file path missing"))
        elif not isinstance(value, str):
            errors.append(ValidationError(label, "file path must be a string"))
        elif not reader.exists(value):
            errors.append(ValidationError(label, f"{value} not found in bundle"))

    # Verify the canonical plan hash if the file exists.
    # NOTE(review): when plan.hash is absent the plan digest is not verified at
    # all and no finding is emitted -- confirm that this is intended.
    plan_path = plan.get("canonicalPlanPath")
    expected_plan_hash = plan.get("hash")
    if expected_plan_hash and not isinstance(expected_plan_hash, str):
        errors.append(ValidationError("plan.hash", "must be a string digest"))
        expected_plan_hash = None
    if (
        isinstance(plan_path, str)
        and plan_path
        and expected_plan_hash
        and reader.exists(plan_path)
    ):
        try:
            actual_plan_hash = sha256_digest(reader.read_bytes(plan_path))
        except OSError as exc:
            errors.append(
                ValidationError("plan.canonicalPlanPath", f"{plan_path} could not be read: {exc}")
            )
        else:
            # Hex digests compare case-insensitively, matching the ledger check.
            if actual_plan_hash.lower() != expected_plan_hash.lower():
                errors.append(
                    ValidationError(
                        "plan.hash",
                        f"hash mismatch (expected {expected_plan_hash}, got {actual_plan_hash})",
                    )
                )

    approvals_path = evidence.get("approvalsLedger")
    if isinstance(approvals_path, str) and approvals_path and reader.exists(approvals_path):
        try:
            approval_doc = json.loads(reader.read_bytes(approvals_path))
        except (OSError, ValueError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            errors.append(
                ValidationError("evidence.approvalsLedger", f"failed to parse ledger JSON: {exc}")
            )
        else:
            if not isinstance(approval_doc, dict):
                errors.append(
                    ValidationError("evidence.approvalsLedger", "ledger must be a JSON object")
                )
            else:
                try:
                    errors.extend(validate_approval_ledger(approval_doc, expected_plan_hash))
                except (AttributeError, TypeError) as exc:
                    # Wrongly-typed ledger fields are findings, not crashes.
                    errors.append(
                        ValidationError(
                            "evidence.approvalsLedger",
                            f"ledger has an unexpected structure: {exc}",
                        )
                    )
    return errors
def validate_approval_ledger(doc: Dict, expected_plan_hash: Optional[str]) -> List[ValidationError]:
    """Validate a decoded approvals ledger document.

    Checks the schema version, the required fields, the ``planHash`` format
    (``sha256:`` followed by 64 hex digits), that ``planHash`` matches
    *expected_plan_hash* case-insensitively when one is supplied, and that
    ``decision`` is one of the allowed values.

    Args:
        doc: Decoded ledger JSON.
        expected_plan_hash: ``plan.hash`` from the manifest, or None to skip
            the cross-check.

    Returns:
        All findings (possibly empty).
    """
    if not isinstance(doc, dict):
        return [ValidationError("approvalsLedger", "must be a JSON object")]

    errors: List[ValidationError] = []
    if doc.get("schemaVersion") != "stellaops.pack.approval-ledger.v1":
        errors.append(
            ValidationError(
                "approvalsLedger.schemaVersion", "must be stellaops.pack.approval-ledger.v1"
            )
        )

    for field in ("runId", "gateId", "tenantId", "decision", "planHash", "decidedAt"):
        if not doc.get(field):
            errors.append(ValidationError(f"approvalsLedger.{field}", "is required"))

    plan_hash = doc.get("planHash")
    if plan_hash:
        # fullmatch, not match + "$": "$" also matches before a trailing newline.
        well_formed = isinstance(plan_hash, str) and re.fullmatch(
            r"sha256:[0-9a-f]{64}", plan_hash, re.IGNORECASE
        )
        if not well_formed:
            errors.append(ValidationError("approvalsLedger.planHash", "must be sha256:<64-hex>"))
        if (
            isinstance(plan_hash, str)
            and isinstance(expected_plan_hash, str)
            and expected_plan_hash
            and plan_hash.lower() != expected_plan_hash.lower()
        ):
            errors.append(
                ValidationError("approvalsLedger.planHash", "must match manifest.plan.hash")
            )

    # A tuple keeps the membership test safe for unhashable JSON values
    # (lists, objects), which would raise TypeError against a set.
    if doc.get("decision") not in ("approved", "rejected", "expired"):
        errors.append(
            ValidationError("approvalsLedger.decision", "must be approved|rejected|expired")
        )
    return errors
def main() -> int:
    """Run the verifier and return the process exit status.

    Returns:
        0 when the bundle validates; 1 when the bundle cannot be opened, the
        manifest cannot be loaded, or any finding is reported; 2 when neither
        ``--bundle`` nor ``--fixture`` was given.

    Findings are printed one per line as ``[FAIL] <path>: <message>``, sorted by
    path and then message so the output is stable.
    """
    args = parse_args()
    if args.fixture:
        # Built-in fixtures live next to this script and override --bundle.
        bundle_path = (Path(__file__).parent / "__fixtures__" / args.fixture).as_posix()
    else:
        bundle_path = args.bundle
    if not bundle_path:
        print("[FAIL] bundle: provide --bundle or --fixture")
        return 2

    try:
        reader = BundleReader(bundle_path)
    except (OSError, tarfile.TarError) as exc:
        # e.g. the path does not exist or the archive is unreadable.
        print(f"[FAIL] {bundle_path}: cannot open bundle: {exc}")
        return 1

    try:
        try:
            manifest = load_manifest(reader, args.manifest)
        except OSError as exc:
            print(f"[FAIL] {args.manifest}: manifest could not be read from bundle ({exc})")
            return 1
        except ValueError as exc:
            print(f"[FAIL] {args.manifest}: {exc}")
            return 1
        if not isinstance(manifest, dict):
            # The validators call .get() on the manifest.
            print(f"[FAIL] {args.manifest}: manifest must be a JSON object")
            return 1

        errors: List[ValidationError] = []
        errors.extend(validate_manifest(manifest))
        errors.extend(verify_files(reader, manifest, require_dsse=args.require_dsse))
        errors.extend(verify_hashes(reader, manifest))
        if errors:
            # str() guards against a non-string path taken from a malformed manifest.
            for item in sorted(errors, key=lambda e: (str(e.path), str(e.message))):
                print(f"[FAIL] {item}")
            return 1
        print("[OK] offline bundle validated")
        return 0
    finally:
        reader.close()
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())