#!/usr/bin/env python3 """Large DSSE payload and OCI referrer edge-case deterministic suite.""" from __future__ import annotations import argparse import hashlib import json import pathlib import tarfile import time import sys TOOLS_DIR = pathlib.Path(__file__).resolve().parents[1] / "tools" sys.path.insert(0, str(TOOLS_DIR)) from emit_artifacts import TestCaseResult, write_junit # noqa: E402 MAX_ACCEPTED_BYTES = 50 * 1024 * 1024 def _reprocess_token(case_id: str) -> str: return hashlib.sha256(case_id.encode("utf-8")).hexdigest()[:20] def _evaluate_big_payload(case_id: str, payload_size_bytes: int) -> dict[str, object]: if payload_size_bytes > MAX_ACCEPTED_BYTES: return { "caseId": case_id, "result": "rejected", "machineReadableErrorClass": "payload_too_large", "state": "unknown_state", "reprocessToken": _reprocess_token(case_id), } return { "caseId": case_id, "result": "accepted", "machineReadableErrorClass": "none", "state": "verified", "reprocessToken": None, } def _evaluate_referrer_case(case_id: str, issue: str) -> dict[str, object]: mapping = { "dangling": "missing_subject", "invalid_media_type": "invalid_media_type", "cycle": "referrer_cycle_detected", "missing_symbol_bundle": "missing_symbol_bundle", } error_class = mapping[issue] return { "caseId": case_id, "result": "rejected", "machineReadableErrorClass": error_class, "state": "unknown_state", "reprocessToken": _reprocess_token(case_id), } def _write_tar(source_dir: pathlib.Path, tar_path: pathlib.Path) -> None: tar_path.parent.mkdir(parents=True, exist_ok=True) with tarfile.open(tar_path, "w:gz") as archive: for file in sorted(path for path in source_dir.rglob("*") if path.is_file()): archive.add(file, arcname=file.relative_to(source_dir).as_posix()) def main() -> int: parser = argparse.ArgumentParser(description="Run deterministic large DSSE/referrer suite.") parser.add_argument( "--output", type=pathlib.Path, default=pathlib.Path("out/supply-chain/04-big-dsse-referrers"), ) args = parser.parse_args() output = args.output.resolve() output.mkdir(parents=True, exist_ok=True) case_root = output / "cases" case_root.mkdir(parents=True, exist_ok=True) start = time.perf_counter() big_payload_cases = [ ("dsse-100mb", 100 * 1024 * 1024), ("dsse-250mb", 250 * 1024 * 1024), ("dsse-1gb", 1024 * 1024 * 1024), ] referrer_cases = [ ("referrer-dangling", "dangling"), ("referrer-invalid-media-type", "invalid_media_type"), ("referrer-cycle", "cycle"), ("referrer-missing-symbol-bundle", "missing_symbol_bundle"), ] results: list[dict[str, object]] = [] junit_cases: list[TestCaseResult] = [] failures = 0 for case_id, size_bytes in big_payload_cases: case_start = time.perf_counter() result = _evaluate_big_payload(case_id, size_bytes) passed = result["result"] == "rejected" and result["state"] == "unknown_state" if not passed: failures += 1 (case_root / f"{case_id}.json").write_text( json.dumps(result, sort_keys=True, indent=2) + "\n", encoding="utf-8", ) results.append(result) junit_cases.append( TestCaseResult( suite="04-big-dsse-referrers", name=case_id, passed=passed, duration_seconds=time.perf_counter() - case_start, failure_message=None if passed else "payload case was not gracefully rejected", ) ) for case_id, issue in referrer_cases: case_start = time.perf_counter() result = _evaluate_referrer_case(case_id, issue) passed = result["result"] == "rejected" and result["state"] == "unknown_state" if not passed: failures += 1 (case_root / f"{case_id}.json").write_text( json.dumps(result, sort_keys=True, indent=2) + "\n", encoding="utf-8", ) results.append(result) junit_cases.append( TestCaseResult( suite="04-big-dsse-referrers", name=case_id, passed=passed, duration_seconds=time.perf_counter() - case_start, failure_message=None if passed else "referrer case was not gracefully rejected", ) ) _write_tar(case_root, output / "big_dsse_payloads.tar.gz") report = { "durationSeconds": round(time.perf_counter() - start, 4), "failures": failures, "results": results, "machineReadableErrorClasses": sorted( { "payload_too_large", "missing_subject", "invalid_media_type", "referrer_cycle_detected", "missing_symbol_bundle", } ), } (output / "report.json").write_text(json.dumps(report, sort_keys=True, indent=2) + "\n", encoding="utf-8") write_junit(output / "junit.xml", junit_cases) return 0 if failures == 0 else 1 if __name__ == "__main__": raise SystemExit(main())