Files
git.stella-ops.org/tests/supply-chain/01-jcs-property/test_jcs.py

184 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""Deterministic JCS-style property checks for SBOM/attestation fixtures."""
from __future__ import annotations
import argparse
import difflib
import json
import pathlib
import random
import sys
import time
from typing import Any
TOOLS_DIR = pathlib.Path(__file__).resolve().parents[1] / "tools"
sys.path.insert(0, str(TOOLS_DIR))
from canonicalize_json import DuplicateKeyError, canonicalize_text, canonicalize_value, parse_json_strict # noqa: E402
from emit_artifacts import TestCaseResult, record_failure, write_junit # noqa: E402
def _shuffle_value(value: Any, rng: random.Random) -> Any:
if isinstance(value, dict):
items = list(value.items())
rng.shuffle(items)
return {k: _shuffle_value(v, rng) for k, v in items}
if isinstance(value, list):
return [_shuffle_value(item, rng) for item in value]
return value
def _load_fixture_texts(corpus_root: pathlib.Path) -> list[tuple[str, str]]:
fixture_paths = sorted(corpus_root.rglob("*.json"))
fixtures: list[tuple[str, str]] = []
for path in fixture_paths:
fixtures.append((path.name, path.read_text(encoding="utf-8")))
return fixtures
_SUITE_NAME = "01-jcs-property"


def _canonical_diff(baseline: str, candidate: str, candidate_label: str) -> str:
    """Render a unified diff of *candidate* against the ``canonical_1`` baseline."""
    return "\n".join(
        difflib.unified_diff(
            baseline.splitlines(),
            candidate.splitlines(),
            fromfile="canonical_1",
            tofile=candidate_label,
            lineterm="",
        )
    )


def _assert_canonical_invariants(fixture_text: str, shuffle_seed: int) -> None:
    """Check one fixture; raise ``AssertionError(message, diff)`` on a mismatch.

    Two properties are checked: canonicalizing the canonical text must
    reproduce it, and canonicalizing a key-shuffled copy of the parsed value
    must yield the same text as canonicalizing the original fixture text.
    """
    parsed = parse_json_strict(fixture_text)
    canonical_1 = canonicalize_text(fixture_text)
    canonical_2 = canonicalize_text(canonical_1)
    if canonical_1 != canonical_2:
        raise AssertionError(
            "Idempotence mismatch",
            _canonical_diff(canonical_1, canonical_2, "canonical_2"),
        )

    shuffled = _shuffle_value(parsed, random.Random(shuffle_seed))
    canonical_3 = canonicalize_value(shuffled)
    if canonical_1 != canonical_3:
        raise AssertionError(
            "Permutation equality mismatch",
            _canonical_diff(canonical_1, canonical_3, "canonical_shuffled"),
        )


def _run(seed: int, output: pathlib.Path) -> int:
    """Run the property checks over the SBOM fixture corpus and write reports.

    Fixtures are loaded from ``05-corpus/fixtures/sboms`` next to this
    suite's directory. Each fixture produces one test case; a final case
    verifies that ``parse_json_strict`` raises ``DuplicateKeyError`` for a
    payload with a repeated key. Every failing case is passed to
    ``record_failure``. Afterwards ``summary.json`` and ``junit.xml`` are
    written into *output*.

    If the corpus yields no fixtures, only the duplicate-key case runs.

    Args:
        seed: Seed for the master RNG that derives each fixture's shuffle seed.
        output: Directory receiving the reports; created if missing.

    Returns:
        ``0`` when every case passed, otherwise ``1``.
    """
    start = time.perf_counter()
    rng = random.Random(seed)
    corpus_root = pathlib.Path(__file__).resolve().parents[1] / "05-corpus" / "fixtures" / "sboms"
    fixtures = _load_fixture_texts(corpus_root)
    cases: list[TestCaseResult] = []
    failures = 0

    for index, (fixture_name, fixture_text) in enumerate(fixtures):
        case_id = f"{fixture_name}-idempotence"
        case_start = time.perf_counter()
        # Draw the shuffle seed before anything can fail so that one fixture's
        # failure does not shift the seeds handed to the fixtures after it.
        shuffle_seed = rng.randint(0, 2**31 - 1) + index
        try:
            _assert_canonical_invariants(fixture_text, shuffle_seed)
        except Exception as exc:  # noqa: BLE001
            failures += 1
            patch = None
            message = str(exc)
            # Invariant failures carry (message, diff) in ``args``; any other
            # exception is reported through its string form alone.
            if isinstance(exc, AssertionError) and len(exc.args) > 1:
                patch = str(exc.args[1])
                message = str(exc.args[0])
            record_failure(
                lane_output_dir=output,
                case_id=case_id,
                seed=seed,
                payload_text=fixture_text,
                error_class="canonicalization_invariant_failed",
                message=message,
                details={"fixture": fixture_name},
                canonical_diff_patch=patch,
            )
            cases.append(
                TestCaseResult(
                    suite=_SUITE_NAME,
                    name=case_id,
                    passed=False,
                    duration_seconds=time.perf_counter() - case_start,
                    failure_message=message,
                )
            )
        else:
            cases.append(
                TestCaseResult(
                    suite=_SUITE_NAME,
                    name=case_id,
                    passed=True,
                    duration_seconds=time.perf_counter() - case_start,
                )
            )

    duplicate_case = '{"a":1,"a":2}'
    duplicate_case_id = "duplicate-key-rejection"
    duplicate_start = time.perf_counter()
    try:
        parse_json_strict(duplicate_case)
        # Reaching this line means the parser accepted the repeated key; the
        # generic handler below turns the assertion into a recorded failure.
        raise AssertionError("Duplicate key payload was not rejected")
    except DuplicateKeyError:
        cases.append(
            TestCaseResult(
                suite=_SUITE_NAME,
                name=duplicate_case_id,
                passed=True,
                duration_seconds=time.perf_counter() - duplicate_start,
            )
        )
    except Exception as exc:  # noqa: BLE001
        failures += 1
        message = str(exc)
        record_failure(
            lane_output_dir=output,
            case_id=duplicate_case_id,
            seed=seed,
            payload_text=duplicate_case,
            error_class="duplicate_key_rejection_failed",
            message=message,
            details={},
            canonical_diff_patch=None,
        )
        cases.append(
            TestCaseResult(
                suite=_SUITE_NAME,
                name=duplicate_case_id,
                passed=False,
                duration_seconds=time.perf_counter() - duplicate_start,
                failure_message=message,
            )
        )

    summary = {
        "seed": seed,
        "fixtureCount": len(fixtures),
        "testCases": len(cases),
        "failures": failures,
        "durationSeconds": round(time.perf_counter() - start, 4),
    }
    output.mkdir(parents=True, exist_ok=True)
    (output / "summary.json").write_text(json.dumps(summary, sort_keys=True, indent=2) + "\n", encoding="utf-8")
    write_junit(output / "junit.xml", cases)
    return 0 if failures == 0 else 1
def main() -> int:
    """Parse the command-line options and run the property checks.

    Recognized options are ``--seed`` (integer) and ``--output`` (directory
    path); both have defaults.

    Returns:
        The status code returned by ``_run``.
    """
    default_output = pathlib.Path("out/supply-chain/01-jcs-property")
    cli = argparse.ArgumentParser(description="Run deterministic JCS property checks.")
    cli.add_argument("--seed", type=int, default=20260226, help="Deterministic seed.")
    cli.add_argument(
        "--output",
        type=pathlib.Path,
        default=default_output,
        help="Output directory for junit and artifacts.",
    )
    options = cli.parse_args()
    # Hand ``_run`` an absolute path regardless of how ``--output`` was given.
    output_dir = options.output.resolve()
    return _run(options.seed, output_dir)
if __name__ == "__main__":
    # Propagate the status returned by main() as the process exit code.
    sys.exit(main())