Files
git.stella-ops.org/scripts/packs/verify_offline_bundle.py
StellaOps Bot 53508ceccb
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Add unit tests and logging infrastructure for InMemory and RabbitMQ transports
- Implemented RecordingLogger and RecordingLoggerFactory for capturing log entries in tests.
- Added unit tests for InMemoryChannel, covering constructor behavior, property assignments, channel communication, and disposal.
- Created InMemoryTransportOptionsTests to validate default values and customizable options for InMemory transport.
- Developed RabbitMqFrameProtocolTests to ensure correct parsing and property creation for RabbitMQ frames.
- Added RabbitMqTransportOptionsTests to verify default settings and customization options for RabbitMQ transport.
- Updated project files for testing libraries and dependencies.
2025-12-05 09:38:45 +02:00

268 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""
Offline validator for Task Pack bundles.
Validates the offline bundle manifest against a minimal rule set:
- Required fields (schemaVersion, pack, plan, evidence, security, hashes, slo, tenant/environment).
- Plan hash algorithm and canonical plan digest.
- Presence and SHA-256 digests for referenced files (inputs.lock, approvals ledger, SBOM, revocations, DSSE envelopes).
- Sandbox quotas/egress allowlist and SLO sanity.
This script is deterministic: it emits sorted findings and never touches network resources.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import json
import os
import sys
import tarfile
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
@dataclass
class ValidationError:
path: str
message: str
def __str__(self) -> str:
return f"{self.path}: {self.message}"
class BundleReader:
def __init__(self, bundle_path: str):
self.bundle_path = bundle_path
self._tar: Optional[tarfile.TarFile] = None
if tarfile.is_tarfile(bundle_path):
self._tar = tarfile.open(bundle_path, mode="r:*")
def exists(self, path: str) -> bool:
if self._tar:
try:
self._tar.getmember(path)
return True
except KeyError:
return False
return os.path.exists(os.path.join(self.bundle_path, path))
def read_bytes(self, path: str) -> bytes:
if self._tar:
try:
member = self._tar.getmember(path)
except KeyError as exc:
raise FileNotFoundError(path) from exc
fileobj = self._tar.extractfile(member)
if fileobj is None:
raise FileNotFoundError(path)
return fileobj.read()
with open(os.path.join(self.bundle_path, path), "rb") as handle:
return handle.read()
def close(self) -> None:
if self._tar:
self._tar.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Verify StellaOps Task Pack offline bundle deterministically."
)
parser.add_argument(
"--bundle",
required=True,
help="Path to bundle directory or tarball containing bundle manifest + artefacts.",
)
parser.add_argument(
"--manifest",
default="bundle.json",
help="Relative path to the offline bundle manifest inside the bundle (default: bundle.json).",
)
parser.add_argument(
"--require-dsse",
action="store_true",
help="Fail if DSSE envelope files are missing.",
)
return parser.parse_args()
def load_manifest(reader: BundleReader, manifest_path: str) -> Dict:
raw = reader.read_bytes(manifest_path)
try:
return json.loads(raw)
except json.JSONDecodeError as exc:
raise ValueError(f"Manifest {manifest_path} is not valid JSON: {exc}") from exc
def sha256_digest(data: bytes) -> str:
return f"sha256:{hashlib.sha256(data).hexdigest()}"
def parse_timestamp(value: str) -> bool:
try:
if value.endswith("Z"):
dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
else:
dt.datetime.fromisoformat(value)
return True
except (ValueError, TypeError):
return False
def validate_manifest(manifest: Dict) -> List[ValidationError]:
required_top = [
"schemaVersion",
"pack",
"plan",
"evidence",
"security",
"hashes",
"slo",
"tenant",
"environment",
"created",
]
errors: List[ValidationError] = []
for key in required_top:
if key not in manifest:
errors.append(ValidationError(key, "is required"))
if manifest.get("schemaVersion") != "stellaops.pack.offline-bundle.v1":
errors.append(
ValidationError(
"schemaVersion", "must equal stellaops.pack.offline-bundle.v1"
)
)
plan = manifest.get("plan", {})
if plan.get("hashAlgorithm") != "sha256":
errors.append(ValidationError("plan.hashAlgorithm", "must be sha256"))
for numeric_field in [
("slo.runP95Seconds", manifest.get("slo", {}).get("runP95Seconds")),
("slo.approvalP95Seconds", manifest.get("slo", {}).get("approvalP95Seconds")),
("slo.maxQueueDepth", manifest.get("slo", {}).get("maxQueueDepth")),
]:
field, value = numeric_field
if value is None or not isinstance(value, (int, float)) or value <= 0:
errors.append(ValidationError(field, "must be > 0"))
for ts_field in ["created", "expires"]:
ts_value = manifest.get(ts_field)
if ts_value and not parse_timestamp(ts_value):
errors.append(ValidationError(ts_field, "must be ISO-8601 (UTC recommended)"))
sandbox = manifest.get("security", {}).get("sandbox", {})
for quota_field in [
("security.sandbox.cpuLimitMillicores", sandbox.get("cpuLimitMillicores")),
("security.sandbox.memoryLimitMiB", sandbox.get("memoryLimitMiB")),
]:
field, value = quota_field
if value is None or not isinstance(value, (int, float)) or value <= 0:
errors.append(ValidationError(field, "must be > 0"))
allowlist = sandbox.get("egressAllowlist")
if allowlist is None or not isinstance(allowlist, list):
errors.append(
ValidationError(
"security.sandbox.egressAllowlist",
"must be an explicit list (empty list means fully sealed)",
)
)
return errors
def verify_hashes(reader: BundleReader, manifest: Dict) -> List[ValidationError]:
errors: List[ValidationError] = []
hashes = manifest.get("hashes", [])
if not isinstance(hashes, list):
return [ValidationError("hashes", "must be an array of digest entries")]
for entry in hashes:
path = entry.get("path")
algo = entry.get("algorithm")
expected = entry.get("digest")
if not path or algo != "sha256" or not expected:
errors.append(
ValidationError(
f"hashes[{path or '?'}]", "path, algorithm=sha256, and digest are required"
)
)
continue
if not reader.exists(path):
errors.append(ValidationError(path, "referenced in hashes but missing from bundle"))
continue
actual = sha256_digest(reader.read_bytes(path))
if actual != expected:
errors.append(
ValidationError(
path, f"digest mismatch (expected {expected}, got {actual})"
)
)
return errors
def verify_files(reader: BundleReader, manifest: Dict, require_dsse: bool) -> List[ValidationError]:
errors: List[ValidationError] = []
paths = [
("plan.canonicalPlanPath", manifest.get("plan", {}).get("canonicalPlanPath")),
("plan.inputsLock", manifest.get("plan", {}).get("inputsLock")),
("pack.sbom", manifest.get("pack", {}).get("sbom")),
("evidence.attestation", manifest.get("evidence", {}).get("attestation")),
("evidence.approvalsLedger", manifest.get("evidence", {}).get("approvalsLedger")),
("security.revocations", manifest.get("security", {}).get("revocations")),
("security.secretsRedactionPolicy", manifest.get("security", {}).get("secretsRedactionPolicy")),
]
if require_dsse:
signatures = manifest.get("security", {}).get("signatures", {})
paths.extend(
[
("security.signatures.bundleDsse", signatures.get("bundleDsse")),
("security.signatures.attestationDsse", signatures.get("attestationDsse")),
]
)
missing = [
ValidationError(path_label, "file path missing")
for path_label, path_value in paths
if not path_value
]
for err in missing:
errors.append(err)
for path_label, path_value in paths:
if not path_value:
continue
if not reader.exists(path_value):
errors.append(ValidationError(path_label, f"{path_value} not found in bundle"))
# Verify canonical plan hash if the file exists.
plan_path = manifest.get("plan", {}).get("canonicalPlanPath")
expected_plan_hash = manifest.get("plan", {}).get("hash")
if plan_path and expected_plan_hash and reader.exists(plan_path):
actual_plan_hash = sha256_digest(reader.read_bytes(plan_path))
if actual_plan_hash != expected_plan_hash:
errors.append(
ValidationError(
"plan.hash",
f"hash mismatch (expected {expected_plan_hash}, got {actual_plan_hash})",
)
)
return errors
def main() -> int:
args = parse_args()
reader = BundleReader(args.bundle)
try:
manifest = load_manifest(reader, args.manifest)
errors: List[ValidationError] = []
errors.extend(validate_manifest(manifest))
errors.extend(verify_files(reader, manifest, require_dsse=args.require_dsse))
errors.extend(verify_hashes(reader, manifest))
if errors:
for item in sorted(errors, key=lambda e: e.path):
print(f"[FAIL] {item}")
return 1
print("[OK] offline bundle validated")
return 0
finally:
reader.close()
if __name__ == "__main__":
sys.exit(main())