- Implemented PolicyPackSelectorComponent for selecting policy packs.
- Added unit tests for component behavior, including API success and error handling.
- Introduced monaco-workers type declarations for editor workers.
- Created acceptance tests for guardrails with stubs for AT1–AT10.
- Established SCA Failure Catalogue Fixtures for regression testing.
- Developed plugin determinism harness with stubs for PL1–PL10.
- Added scripts for evidence upload and verification processes.
306 lines · 12 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Offline validator for Task Pack bundles.
|
|
|
|
Validates the offline bundle manifest against a minimal rule set:
|
|
- Required fields (schemaVersion, pack, plan, evidence, security, hashes, slo, tenant/environment).
|
|
- Plan hash algorithm and canonical plan digest.
|
|
- Presence and SHA-256 digests for referenced files (inputs.lock, approvals ledger, SBOM, revocations, DSSE envelopes).
|
|
- Sandbox quotas/egress allowlist and SLO sanity.
|
|
|
|
This script is deterministic: it emits sorted findings and never touches network resources.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import tarfile
|
|
import re
|
|
from pathlib import Path
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Iterable, List, Optional
|
|
|
|
|
|
@dataclass
class ValidationError:
    """A single validation finding for an offline bundle manifest."""

    # Dotted location of the offending field (e.g. "plan.hashAlgorithm").
    path: str
    # Human-readable explanation of why validation failed.
    message: str

    def __str__(self) -> str:
        return "{}: {}".format(self.path, self.message)
|
|
|
|
|
|
class BundleReader:
    """Read-only access to a bundle stored as a directory or a tarball.

    For a directory, members are resolved on the filesystem relative to
    ``bundle_path``; for a tar archive, members are read in-memory from the
    archive without extracting to disk.
    """

    def __init__(self, bundle_path: str):
        self.bundle_path = bundle_path
        # Open handle when the bundle is a tarball; None for a directory.
        self._archive: Optional[tarfile.TarFile] = None
        if not os.path.isdir(bundle_path) and tarfile.is_tarfile(bundle_path):
            self._archive = tarfile.open(bundle_path, mode="r:*")

    def exists(self, path: str) -> bool:
        """Return True if *path* is present inside the bundle."""
        if self._archive is None:
            return os.path.exists(os.path.join(self.bundle_path, path))
        try:
            self._archive.getmember(path)
        except KeyError:
            return False
        return True

    def read_bytes(self, path: str) -> bytes:
        """Return the raw contents of *path*; raise FileNotFoundError if absent."""
        if self._archive is None:
            with open(os.path.join(self.bundle_path, path), "rb") as handle:
                return handle.read()
        try:
            member = self._archive.getmember(path)
        except KeyError as exc:
            raise FileNotFoundError(path) from exc
        fileobj = self._archive.extractfile(member)
        if fileobj is None:
            # Directory or special member: no readable payload.
            raise FileNotFoundError(path)
        return fileobj.read()

    def close(self) -> None:
        """Release the tar handle (no-op for directory bundles)."""
        if self._archive is not None:
            self._archive.close()
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="Verify StellaOps Task Pack offline bundle deterministically."
    )
    add = parser.add_argument
    add(
        "--bundle",
        required=False,
        help="Path to bundle directory or tarball containing bundle manifest + artefacts.",
    )
    add(
        "--fixture",
        choices=["good", "bad"],
        help="If set, uses built-in fixtures under scripts/packs/__fixtures__/ for quick checks.",
    )
    add(
        "--manifest",
        default="bundle.json",
        help="Relative path to the offline bundle manifest inside the bundle (default: bundle.json).",
    )
    add(
        "--require-dsse",
        action="store_true",
        help="Fail if DSSE envelope files are missing.",
    )
    return parser.parse_args()
|
|
|
|
|
|
def load_manifest(reader: BundleReader, manifest_path: str) -> Dict:
    """Read *manifest_path* from the bundle and decode it as JSON.

    Raises ValueError (chained from JSONDecodeError) when the payload is
    not valid JSON.
    """
    raw = reader.read_bytes(manifest_path)
    try:
        manifest = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Manifest {manifest_path} is not valid JSON: {exc}") from exc
    return manifest
|
|
|
|
|
|
def sha256_digest(data: bytes) -> str:
    """Return the SHA-256 digest of *data* in ``sha256:<hex>`` notation."""
    hexdigest = hashlib.sha256(data).hexdigest()
    return "sha256:" + hexdigest
|
|
|
|
|
|
def parse_timestamp(value: str) -> bool:
    """Return True if *value* is a parseable ISO-8601 timestamp string.

    Accepts the trailing-``Z`` UTC form (``YYYY-MM-DDTHH:MM:SSZ``) or any
    format ``datetime.fromisoformat`` understands.  Non-string input is
    rejected: manifest fields can hold arbitrary JSON types, and the
    previous implementation raised an uncaught AttributeError on
    ``value.endswith`` for truthy non-strings (e.g. an integer ``created``).
    """
    if not isinstance(value, str):
        return False
    try:
        if value.endswith("Z"):
            dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
        else:
            dt.datetime.fromisoformat(value)
        return True
    except (ValueError, TypeError):
        return False
|
|
|
|
|
|
def validate_manifest(manifest: Dict) -> List[ValidationError]:
    """Validate the manifest's structure and fields; return all findings.

    Checks, in order: required top-level keys, the schema version string,
    the plan hash algorithm, positive SLO numbers, ISO-8601 timestamps,
    positive sandbox quotas, and an explicit egress allowlist.

    The duplicated "must be a positive number" check for SLO and sandbox
    quota fields is factored into a single helper; booleans are rejected
    explicitly (``bool`` subclasses ``int``, so ``True`` previously passed
    as a valid quota value).
    """
    errors: List[ValidationError] = []

    required_top = [
        "schemaVersion",
        "pack",
        "plan",
        "evidence",
        "security",
        "hashes",
        "slo",
        "tenant",
        "environment",
        "created",
    ]
    for key in required_top:
        if key not in manifest:
            errors.append(ValidationError(key, "is required"))

    if manifest.get("schemaVersion") != "stellaops.pack.offline-bundle.v1":
        errors.append(
            ValidationError(
                "schemaVersion", "must equal stellaops.pack.offline-bundle.v1"
            )
        )

    plan = manifest.get("plan", {})
    if plan.get("hashAlgorithm") != "sha256":
        errors.append(ValidationError("plan.hashAlgorithm", "must be sha256"))

    def _require_positive(field: str, value) -> None:
        # Record a finding unless value is a strictly positive number.
        # bool is excluded explicitly because it subclasses int.
        if (
            value is None
            or isinstance(value, bool)
            or not isinstance(value, (int, float))
            or value <= 0
        ):
            errors.append(ValidationError(field, "must be > 0"))

    slo = manifest.get("slo", {})
    _require_positive("slo.runP95Seconds", slo.get("runP95Seconds"))
    _require_positive("slo.approvalP95Seconds", slo.get("approvalP95Seconds"))
    _require_positive("slo.maxQueueDepth", slo.get("maxQueueDepth"))

    for ts_field in ["created", "expires"]:
        ts_value = manifest.get(ts_field)
        if ts_value and not parse_timestamp(ts_value):
            errors.append(ValidationError(ts_field, "must be ISO-8601 (UTC recommended)"))

    sandbox = manifest.get("security", {}).get("sandbox", {})
    _require_positive("security.sandbox.cpuLimitMillicores", sandbox.get("cpuLimitMillicores"))
    _require_positive("security.sandbox.memoryLimitMiB", sandbox.get("memoryLimitMiB"))
    _require_positive("security.sandbox.quotaSeconds", sandbox.get("quotaSeconds"))

    allowlist = sandbox.get("egressAllowlist")
    if allowlist is None or not isinstance(allowlist, list):
        errors.append(
            ValidationError(
                "security.sandbox.egressAllowlist",
                "must be an explicit list (empty list means fully sealed)",
            )
        )
    return errors
|
|
|
|
|
|
def verify_hashes(reader: BundleReader, manifest: Dict) -> List[ValidationError]:
    """Check every ``hashes`` entry: shape, presence in bundle, digest match."""
    hashes = manifest.get("hashes", [])
    if not isinstance(hashes, list):
        return [ValidationError("hashes", "must be an array of digest entries")]
    errors: List[ValidationError] = []
    for entry in hashes:
        path = entry.get("path")
        algo = entry.get("algorithm")
        expected = entry.get("digest")
        if not (path and algo == "sha256" and expected):
            # Malformed entry: all three fields are mandatory.
            errors.append(
                ValidationError(
                    f"hashes[{path or '?'}]", "path, algorithm=sha256, and digest are required"
                )
            )
        elif not reader.exists(path):
            errors.append(ValidationError(path, "referenced in hashes but missing from bundle"))
        else:
            actual = sha256_digest(reader.read_bytes(path))
            if actual != expected:
                errors.append(
                    ValidationError(
                        path, f"digest mismatch (expected {expected}, got {actual})"
                    )
                )
    return errors
|
|
|
|
|
|
def verify_files(reader: BundleReader, manifest: Dict, require_dsse: bool) -> List[ValidationError]:
    """Check referenced artefact paths, the canonical plan digest, and the
    approvals ledger.

    Missing-path findings are emitted before not-found findings, matching
    the two-pass order of the checks.
    """
    errors: List[ValidationError] = []
    plan = manifest.get("plan", {})
    evidence = manifest.get("evidence", {})
    security = manifest.get("security", {})

    paths = [
        ("plan.canonicalPlanPath", plan.get("canonicalPlanPath")),
        ("plan.inputsLock", plan.get("inputsLock")),
        ("pack.sbom", manifest.get("pack", {}).get("sbom")),
        ("evidence.attestation", evidence.get("attestation")),
        ("evidence.approvalsLedger", evidence.get("approvalsLedger")),
        ("security.revocations", security.get("revocations")),
        ("security.secretsRedactionPolicy", security.get("secretsRedactionPolicy")),
    ]
    if require_dsse:
        signatures = security.get("signatures", {})
        paths.append(("security.signatures.bundleDsse", signatures.get("bundleDsse")))
        paths.append(("security.signatures.attestationDsse", signatures.get("attestationDsse")))

    # Pass 1: entries whose path is not set at all.
    errors.extend(
        ValidationError(label, "file path missing")
        for label, value in paths
        if not value
    )
    # Pass 2: entries whose path is set but absent from the bundle.
    for label, value in paths:
        if value and not reader.exists(value):
            errors.append(ValidationError(label, f"{value} not found in bundle"))

    # Verify canonical plan hash if the file exists.
    plan_path = plan.get("canonicalPlanPath")
    expected_plan_hash = plan.get("hash")
    if plan_path and expected_plan_hash and reader.exists(plan_path):
        actual_plan_hash = sha256_digest(reader.read_bytes(plan_path))
        if actual_plan_hash != expected_plan_hash:
            errors.append(
                ValidationError(
                    "plan.hash",
                    f"hash mismatch (expected {expected_plan_hash}, got {actual_plan_hash})",
                )
            )

    approvals_path = evidence.get("approvalsLedger")
    if approvals_path and reader.exists(approvals_path):
        # Broad except is deliberate: any failure while parsing or walking
        # the ledger document is reported as a single finding.
        try:
            ledger = json.loads(reader.read_bytes(approvals_path))
            errors.extend(validate_approval_ledger(ledger, plan.get("hash")))
        except Exception as exc:
            errors.append(ValidationError("evidence.approvalsLedger", f"failed to parse ledger JSON: {exc}"))
    return errors
|
|
|
|
|
|
def validate_approval_ledger(doc: Dict, expected_plan_hash: Optional[str]) -> List[ValidationError]:
    """Validate an approval-ledger document against the v1 schema rules."""
    errors: List[ValidationError] = []

    if doc.get("schemaVersion") != "stellaops.pack.approval-ledger.v1":
        errors.append(ValidationError("approvalsLedger.schemaVersion", "must be stellaops.pack.approval-ledger.v1"))

    errors.extend(
        ValidationError(f"approvalsLedger.{field}", "is required")
        for field in ["runId", "gateId", "tenantId", "decision", "planHash", "decidedAt"]
        if not doc.get(field)
    )

    plan_hash = doc.get("planHash")
    if plan_hash:
        # Hash comparisons are case-insensitive on the hex digits.
        if not re.match(r"^sha256:[0-9a-f]{64}$", plan_hash, re.IGNORECASE):
            errors.append(ValidationError("approvalsLedger.planHash", "must be sha256:<64-hex>"))
        if expected_plan_hash and plan_hash.lower() != expected_plan_hash.lower():
            errors.append(ValidationError("approvalsLedger.planHash", "must match manifest.plan.hash"))

    if doc.get("decision") not in {"approved", "rejected", "expired"}:
        errors.append(ValidationError("approvalsLedger.decision", "must be approved|rejected|expired"))
    return errors
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: resolve the bundle path, run all validators, print
    sorted findings.

    Returns 0 on success, 1 when validation findings exist, 2 on usage
    errors.  Previously, invoking the script with neither ``--bundle`` nor
    ``--fixture`` crashed with a TypeError inside ``os.path.isdir(None)``;
    that case now fails cleanly with exit code 2.
    """
    args = parse_args()
    bundle_path = args.bundle
    if args.fixture:
        bundle_path = Path(__file__).parent / "__fixtures__" / args.fixture
    if bundle_path is None:
        print("[FAIL] either --bundle or --fixture must be provided", file=sys.stderr)
        return 2
    reader = BundleReader(str(bundle_path))
    try:
        manifest = load_manifest(reader, args.manifest)
        errors: List[ValidationError] = []
        errors.extend(validate_manifest(manifest))
        errors.extend(verify_files(reader, manifest, require_dsse=args.require_dsse))
        errors.extend(verify_hashes(reader, manifest))
        if errors:
            # Sort findings by path so output is deterministic across runs.
            for item in sorted(errors, key=lambda e: e.path):
                print(f"[FAIL] {item}")
            return 1
        print("[OK] offline bundle validated")
        return 0
    finally:
        reader.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())
|