#!/usr/bin/env python3
"""Deterministic JCS-style property checks for SBOM/attestation fixtures."""

from __future__ import annotations

import argparse
import difflib
import json
import pathlib
import random
import sys
import time
from typing import Any

TOOLS_DIR = pathlib.Path(__file__).resolve().parents[1] / "tools"
sys.path.insert(0, str(TOOLS_DIR))

from canonicalize_json import DuplicateKeyError, canonicalize_text, canonicalize_value, parse_json_strict  # noqa: E402
from emit_artifacts import TestCaseResult, record_failure, write_junit  # noqa: E402

# Suite label attached to every test case emitted by this lane.
_SUITE = "01-jcs-property"


def _shuffle_value(value: Any, rng: random.Random) -> Any:
    """Return a copy of *value* with the key order of every dict shuffled.

    Dicts are rebuilt with their items in an order drawn from *rng*; lists
    keep their element order and only have their elements recursed into;
    any other value is returned unchanged.
    """
    if isinstance(value, dict):
        items = list(value.items())
        rng.shuffle(items)
        return {key: _shuffle_value(item, rng) for key, item in items}
    if isinstance(value, list):
        return [_shuffle_value(item, rng) for item in value]
    return value


def _load_fixture_texts(corpus_root: pathlib.Path) -> list[tuple[str, str]]:
    """Read every ``*.json`` file under *corpus_root* as UTF-8 text.

    Returns ``(file name, text)`` pairs ordered by sorted path so the run
    order does not depend on directory listing order.
    """
    # NOTE(review): rglob descends into subdirectories but only the base name
    # is kept, so two same-named files in different directories would share a
    # case id -- confirm the corpus keeps base names unique.
    return [
        (path.name, path.read_text(encoding="utf-8"))
        for path in sorted(corpus_root.rglob("*.json"))
    ]


def _canonical_diff(expected: str, actual: str, actual_label: str) -> str:
    """Render a unified diff between two canonical texts."""
    return "\n".join(
        difflib.unified_diff(
            expected.splitlines(),
            actual.splitlines(),
            fromfile="canonical_1",
            tofile=actual_label,
            lineterm="",
        )
    )


def _assert_canonical_invariants(fixture_text: str, rng: random.Random, index: int) -> None:
    """Check idempotence and key-permutation invariance for one fixture.

    Raises:
        AssertionError: with ``args == (message, diff)`` when an invariant
            does not hold. Anything raised by the canonicalization helpers
            propagates unchanged.
    """
    parsed = parse_json_strict(fixture_text)
    canonical_1 = canonicalize_text(fixture_text)
    canonical_2 = canonicalize_text(canonical_1)
    if canonical_1 != canonical_2:
        raise AssertionError(
            "Idempotence mismatch",
            _canonical_diff(canonical_1, canonical_2, "canonical_2"),
        )

    # The shared generator is advanced only once a fixture reaches this point,
    # so a fixture that fails earlier does not consume a draw.
    shuffle_rng = random.Random(rng.randint(0, 2**31 - 1) + index)
    canonical_3 = canonicalize_value(_shuffle_value(parsed, shuffle_rng))
    if canonical_1 != canonical_3:
        raise AssertionError(
            "Permutation equality mismatch",
            _canonical_diff(canonical_1, canonical_3, "canonical_shuffled"),
        )


def _passed_case(name: str, started: float) -> TestCaseResult:
    """Build a passing result timed from *started* (a ``perf_counter`` value)."""
    return TestCaseResult(
        suite=_SUITE,
        name=name,
        passed=True,
        duration_seconds=time.perf_counter() - started,
    )


def _failed_case(name: str, started: float, message: str) -> TestCaseResult:
    """Build a failing result timed from *started* (a ``perf_counter`` value)."""
    return TestCaseResult(
        suite=_SUITE,
        name=name,
        passed=False,
        duration_seconds=time.perf_counter() - started,
        failure_message=message,
    )


def _run(seed: int, output: pathlib.Path) -> int:
    """Run all property checks and write ``summary.json`` and ``junit.xml``.

    Args:
        seed: Seed for the generator that derives the per-fixture shuffles.
        output: Directory that receives the summary, the JUnit report and
            any failure artifacts.

    Returns:
        ``0`` when every case passed, ``1`` otherwise.
    """
    start = time.perf_counter()
    rng = random.Random(seed)
    corpus_root = pathlib.Path(__file__).resolve().parents[1] / "05-corpus" / "fixtures" / "sboms"
    fixtures = _load_fixture_texts(corpus_root)

    # record_failure is handed this directory before the summary is written,
    # so create it up front instead of relying on that helper to do so.
    output.mkdir(parents=True, exist_ok=True)

    cases: list[TestCaseResult] = []
    failures = 0

    for index, (fixture_name, fixture_text) in enumerate(fixtures):
        case_id = f"{fixture_name}-idempotence"
        case_start = time.perf_counter()
        try:
            _assert_canonical_invariants(fixture_text, rng, index)
        except Exception as exc:  # noqa: BLE001
            failures += 1
            patch = None
            message = str(exc)
            # Invariant failures carry (message, diff); split them so the diff
            # is stored as a patch rather than folded into the message.
            if isinstance(exc, AssertionError) and len(exc.args) > 1:
                patch = str(exc.args[1])
                message = str(exc.args[0])
            record_failure(
                lane_output_dir=output,
                case_id=case_id,
                seed=seed,
                payload_text=fixture_text,
                error_class="canonicalization_invariant_failed",
                message=message,
                details={"fixture": fixture_name},
                canonical_diff_patch=patch,
            )
            cases.append(_failed_case(case_id, case_start, message))
        else:
            cases.append(_passed_case(case_id, case_start))

    duplicate_case = '{"a":1,"a":2}'
    duplicate_case_id = "duplicate-key-rejection"
    duplicate_start = time.perf_counter()
    try:
        parse_json_strict(duplicate_case)
        # Reaching this line means the strict parser accepted the payload.
        raise AssertionError("Duplicate key payload was not rejected")
    except DuplicateKeyError:
        cases.append(_passed_case(duplicate_case_id, duplicate_start))
    except Exception as exc:  # noqa: BLE001
        failures += 1
        message = str(exc)
        record_failure(
            lane_output_dir=output,
            case_id=duplicate_case_id,
            seed=seed,
            payload_text=duplicate_case,
            error_class="duplicate_key_rejection_failed",
            message=message,
            details={},
            canonical_diff_patch=None,
        )
        cases.append(_failed_case(duplicate_case_id, duplicate_start, message))

    summary = {
        "seed": seed,
        "fixtureCount": len(fixtures),
        "testCases": len(cases),
        "failures": failures,
        "durationSeconds": round(time.perf_counter() - start, 4),
    }
    (output / "summary.json").write_text(
        json.dumps(summary, sort_keys=True, indent=2) + "\n",
        encoding="utf-8",
    )
    write_junit(output / "junit.xml", cases)
    return 0 if failures == 0 else 1


def main() -> int:
    """Parse command-line arguments and run the checks; return the exit code."""
    parser = argparse.ArgumentParser(description="Run deterministic JCS property checks.")
    parser.add_argument("--seed", type=int, default=20260226, help="Deterministic seed.")
    parser.add_argument(
        "--output",
        type=pathlib.Path,
        default=pathlib.Path("out/supply-chain/01-jcs-property"),
        help="Output directory for junit and artifacts.",
    )
    args = parser.parse_args()
    return _run(args.seed, args.output.resolve())


if __name__ == "__main__":
    raise SystemExit(main())