feat: add PolicyPackSelectorComponent with tests and integration
- Implemented PolicyPackSelectorComponent for selecting policy packs. - Added unit tests for component behavior, including API success and error handling. - Introduced monaco-workers type declarations for editor workers. - Created acceptance tests for guardrails with stubs for AT1–AT10. - Established SCA Failure Catalogue Fixtures for regression testing. - Developed plugin determinism harness with stubs for PL1–PL10. - Added scripts for evidence upload and verification processes.
This commit is contained in:
@@ -20,6 +20,8 @@ import json
|
||||
import os
|
||||
import sys
|
||||
import tarfile
|
||||
import re
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
|
||||
@@ -37,7 +39,9 @@ class BundleReader:
|
||||
def __init__(self, bundle_path: str):
|
||||
self.bundle_path = bundle_path
|
||||
self._tar: Optional[tarfile.TarFile] = None
|
||||
if tarfile.is_tarfile(bundle_path):
|
||||
if os.path.isdir(bundle_path):
|
||||
self._tar = None
|
||||
elif tarfile.is_tarfile(bundle_path):
|
||||
self._tar = tarfile.open(bundle_path, mode="r:*")
|
||||
|
||||
def exists(self, path: str) -> bool:
|
||||
@@ -73,9 +77,14 @@ def parse_args() -> argparse.Namespace:
|
||||
)
|
||||
parser.add_argument(
|
||||
"--bundle",
|
||||
required=True,
|
||||
required=False,
|
||||
help="Path to bundle directory or tarball containing bundle manifest + artefacts.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--fixture",
|
||||
choices=["good", "bad"],
|
||||
help="If set, uses built-in fixtures under scripts/packs/__fixtures__/ for quick checks.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--manifest",
|
||||
default="bundle.json",
|
||||
@@ -154,6 +163,7 @@ def validate_manifest(manifest: Dict) -> List[ValidationError]:
|
||||
for quota_field in [
|
||||
("security.sandbox.cpuLimitMillicores", sandbox.get("cpuLimitMillicores")),
|
||||
("security.sandbox.memoryLimitMiB", sandbox.get("memoryLimitMiB")),
|
||||
("security.sandbox.quotaSeconds", sandbox.get("quotaSeconds")),
|
||||
]:
|
||||
field, value = quota_field
|
||||
if value is None or not isinstance(value, (int, float)) or value <= 0:
|
||||
@@ -241,12 +251,40 @@ def verify_files(reader: BundleReader, manifest: Dict, require_dsse: bool) -> Li
|
||||
f"hash mismatch (expected {expected_plan_hash}, got {actual_plan_hash})",
|
||||
)
|
||||
)
|
||||
|
||||
approvals_path = manifest.get("evidence", {}).get("approvalsLedger")
|
||||
if approvals_path and reader.exists(approvals_path):
|
||||
try:
|
||||
approval_doc = json.loads(reader.read_bytes(approvals_path))
|
||||
errors.extend(validate_approval_ledger(approval_doc, manifest.get("plan", {}).get("hash")))
|
||||
except Exception as exc: # broad but scoped to ledger parse
|
||||
errors.append(ValidationError("evidence.approvalsLedger", f"failed to parse ledger JSON: {exc}"))
|
||||
return errors
|
||||
|
||||
|
||||
def validate_approval_ledger(doc: Dict, expected_plan_hash: Optional[str]) -> List[ValidationError]:
|
||||
errors: List[ValidationError] = []
|
||||
if doc.get("schemaVersion") != "stellaops.pack.approval-ledger.v1":
|
||||
errors.append(ValidationError("approvalsLedger.schemaVersion", "must be stellaops.pack.approval-ledger.v1"))
|
||||
for field in ["runId", "gateId", "tenantId", "decision", "planHash", "decidedAt"]:
|
||||
if not doc.get(field):
|
||||
errors.append(ValidationError(f"approvalsLedger.{field}", "is required"))
|
||||
plan_hash = doc.get("planHash")
|
||||
if plan_hash and not re.match(r"^sha256:[0-9a-f]{64}$", plan_hash, re.IGNORECASE):
|
||||
errors.append(ValidationError("approvalsLedger.planHash", "must be sha256:<64-hex>"))
|
||||
if expected_plan_hash and plan_hash and plan_hash.lower() != expected_plan_hash.lower():
|
||||
errors.append(ValidationError("approvalsLedger.planHash", "must match manifest.plan.hash"))
|
||||
if doc.get("decision") not in {"approved", "rejected", "expired"}:
|
||||
errors.append(ValidationError("approvalsLedger.decision", "must be approved|rejected|expired"))
|
||||
return errors
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
reader = BundleReader(args.bundle)
|
||||
bundle_path = args.bundle
|
||||
if args.fixture:
|
||||
bundle_path = Path(__file__).parent / "__fixtures__" / args.fixture
|
||||
reader = BundleReader(bundle_path.as_posix() if isinstance(bundle_path, Path) else bundle_path)
|
||||
try:
|
||||
manifest = load_manifest(reader, args.manifest)
|
||||
errors: List[ValidationError] = []
|
||||
|
||||
Reference in New Issue
Block a user