consolidation of some of the modules, localization fixes, product advisories work, qa work
This commit is contained in:
183
tests/supply-chain/01-jcs-property/test_jcs.py
Normal file
183
tests/supply-chain/01-jcs-property/test_jcs.py
Normal file
@@ -0,0 +1,183 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Deterministic JCS-style property checks for SBOM/attestation fixtures."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import difflib
|
||||
import json
|
||||
import pathlib
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
TOOLS_DIR = pathlib.Path(__file__).resolve().parents[1] / "tools"
|
||||
sys.path.insert(0, str(TOOLS_DIR))
|
||||
|
||||
from canonicalize_json import DuplicateKeyError, canonicalize_text, canonicalize_value, parse_json_strict # noqa: E402
|
||||
from emit_artifacts import TestCaseResult, record_failure, write_junit # noqa: E402
|
||||
|
||||
|
||||
def _shuffle_value(value: Any, rng: random.Random) -> Any:
|
||||
if isinstance(value, dict):
|
||||
items = list(value.items())
|
||||
rng.shuffle(items)
|
||||
return {k: _shuffle_value(v, rng) for k, v in items}
|
||||
if isinstance(value, list):
|
||||
return [_shuffle_value(item, rng) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def _load_fixture_texts(corpus_root: pathlib.Path) -> list[tuple[str, str]]:
|
||||
fixture_paths = sorted(corpus_root.rglob("*.json"))
|
||||
fixtures: list[tuple[str, str]] = []
|
||||
for path in fixture_paths:
|
||||
fixtures.append((path.name, path.read_text(encoding="utf-8")))
|
||||
return fixtures
|
||||
|
||||
|
||||
def _run(seed: int, output: pathlib.Path) -> int:
|
||||
start = time.perf_counter()
|
||||
rng = random.Random(seed)
|
||||
|
||||
corpus_root = pathlib.Path(__file__).resolve().parents[1] / "05-corpus" / "fixtures" / "sboms"
|
||||
fixtures = _load_fixture_texts(corpus_root)
|
||||
|
||||
cases: list[TestCaseResult] = []
|
||||
failures = 0
|
||||
|
||||
for index, (fixture_name, fixture_text) in enumerate(fixtures):
|
||||
case_id = f"{fixture_name}-idempotence"
|
||||
case_start = time.perf_counter()
|
||||
try:
|
||||
parsed = parse_json_strict(fixture_text)
|
||||
canonical_1 = canonicalize_text(fixture_text)
|
||||
canonical_2 = canonicalize_text(canonical_1)
|
||||
if canonical_1 != canonical_2:
|
||||
diff = "\n".join(
|
||||
difflib.unified_diff(
|
||||
canonical_1.splitlines(),
|
||||
canonical_2.splitlines(),
|
||||
fromfile="canonical_1",
|
||||
tofile="canonical_2",
|
||||
lineterm="",
|
||||
)
|
||||
)
|
||||
raise AssertionError("Idempotence mismatch", diff)
|
||||
|
||||
shuffled = _shuffle_value(parsed, random.Random(rng.randint(0, 2**31 - 1) + index))
|
||||
canonical_3 = canonicalize_value(shuffled)
|
||||
if canonical_1 != canonical_3:
|
||||
diff = "\n".join(
|
||||
difflib.unified_diff(
|
||||
canonical_1.splitlines(),
|
||||
canonical_3.splitlines(),
|
||||
fromfile="canonical_1",
|
||||
tofile="canonical_shuffled",
|
||||
lineterm="",
|
||||
)
|
||||
)
|
||||
raise AssertionError("Permutation equality mismatch", diff)
|
||||
|
||||
cases.append(
|
||||
TestCaseResult(
|
||||
suite="01-jcs-property",
|
||||
name=case_id,
|
||||
passed=True,
|
||||
duration_seconds=time.perf_counter() - case_start,
|
||||
)
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
failures += 1
|
||||
patch = None
|
||||
message = str(exc)
|
||||
if isinstance(exc, AssertionError) and len(exc.args) > 1:
|
||||
patch = str(exc.args[1])
|
||||
message = str(exc.args[0])
|
||||
record_failure(
|
||||
lane_output_dir=output,
|
||||
case_id=case_id,
|
||||
seed=seed,
|
||||
payload_text=fixture_text,
|
||||
error_class="canonicalization_invariant_failed",
|
||||
message=message,
|
||||
details={"fixture": fixture_name},
|
||||
canonical_diff_patch=patch,
|
||||
)
|
||||
cases.append(
|
||||
TestCaseResult(
|
||||
suite="01-jcs-property",
|
||||
name=case_id,
|
||||
passed=False,
|
||||
duration_seconds=time.perf_counter() - case_start,
|
||||
failure_message=message,
|
||||
)
|
||||
)
|
||||
|
||||
duplicate_case = '{"a":1,"a":2}'
|
||||
duplicate_case_id = "duplicate-key-rejection"
|
||||
duplicate_start = time.perf_counter()
|
||||
try:
|
||||
parse_json_strict(duplicate_case)
|
||||
raise AssertionError("Duplicate key payload was not rejected")
|
||||
except DuplicateKeyError:
|
||||
cases.append(
|
||||
TestCaseResult(
|
||||
suite="01-jcs-property",
|
||||
name=duplicate_case_id,
|
||||
passed=True,
|
||||
duration_seconds=time.perf_counter() - duplicate_start,
|
||||
)
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
failures += 1
|
||||
message = str(exc)
|
||||
record_failure(
|
||||
lane_output_dir=output,
|
||||
case_id=duplicate_case_id,
|
||||
seed=seed,
|
||||
payload_text=duplicate_case,
|
||||
error_class="duplicate_key_rejection_failed",
|
||||
message=message,
|
||||
details={},
|
||||
canonical_diff_patch=None,
|
||||
)
|
||||
cases.append(
|
||||
TestCaseResult(
|
||||
suite="01-jcs-property",
|
||||
name=duplicate_case_id,
|
||||
passed=False,
|
||||
duration_seconds=time.perf_counter() - duplicate_start,
|
||||
failure_message=message,
|
||||
)
|
||||
)
|
||||
|
||||
summary = {
|
||||
"seed": seed,
|
||||
"fixtureCount": len(fixtures),
|
||||
"testCases": len(cases),
|
||||
"failures": failures,
|
||||
"durationSeconds": round(time.perf_counter() - start, 4),
|
||||
}
|
||||
output.mkdir(parents=True, exist_ok=True)
|
||||
(output / "summary.json").write_text(json.dumps(summary, sort_keys=True, indent=2) + "\n", encoding="utf-8")
|
||||
write_junit(output / "junit.xml", cases)
|
||||
return 0 if failures == 0 else 1
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Run deterministic JCS property checks.")
|
||||
parser.add_argument("--seed", type=int, default=20260226, help="Deterministic seed.")
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
type=pathlib.Path,
|
||||
default=pathlib.Path("out/supply-chain/01-jcs-property"),
|
||||
help="Output directory for junit and artifacts.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
return _run(args.seed, args.output.resolve())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user