Files
git.stella-ops.org/tests/supply-chain/04-big-dsse-referrers/run_big_cases.py

165 lines
5.3 KiB
Python

#!/usr/bin/env python3
"""Large DSSE payload and OCI referrer edge-case deterministic suite."""
from __future__ import annotations
import argparse
import hashlib
import json
import pathlib
import tarfile
import time
import sys
# Shared helpers live in the ``tools`` directory that sits next to this suite's
# own directory (one level above the folder containing this file). It is put at
# the front of sys.path so that ``emit_artifacts`` can be imported below.
TOOLS_DIR = pathlib.Path(__file__).resolve().parents[1] / "tools"
sys.path.insert(0, str(TOOLS_DIR))
from emit_artifacts import TestCaseResult, write_junit # noqa: E402
# Acceptance limit (50 MiB): _evaluate_big_payload rejects any payload whose
# size is strictly greater than this value.
MAX_ACCEPTED_BYTES = 50 * 1024 * 1024
def _reprocess_token(case_id: str) -> str:
    """Return a stable 20-character hex token derived from ``case_id``.

    The token is the leading 20 characters of the SHA-256 hex digest of the
    UTF-8 encoded case identifier, so the same identifier always yields the
    same token.
    """
    hasher = hashlib.sha256()
    hasher.update(case_id.encode("utf-8"))
    full_digest = hasher.hexdigest()
    return full_digest[:20]
def _evaluate_big_payload(case_id: str, payload_size_bytes: int) -> dict[str, object]:
    """Classify a payload of ``payload_size_bytes`` against the size limit.

    A payload strictly larger than ``MAX_ACCEPTED_BYTES`` is reported as
    ``rejected`` with error class ``payload_too_large``, state
    ``unknown_state`` and a reprocess token. Anything else is reported as
    ``accepted`` with state ``verified`` and no token.

    Returns:
        The verdict record for ``case_id``.
    """
    oversized = payload_size_bytes > MAX_ACCEPTED_BYTES
    if oversized:
        outcome, error_class, state = "rejected", "payload_too_large", "unknown_state"
    else:
        outcome, error_class, state = "accepted", "none", "verified"
    return {
        "caseId": case_id,
        "result": outcome,
        "machineReadableErrorClass": error_class,
        "state": state,
        # Only rejected payloads carry a token.
        "reprocessToken": _reprocess_token(case_id) if oversized else None,
    }
def _evaluate_referrer_case(case_id: str, issue: str) -> dict[str, object]:
    """Build the rejection record for a referrer edge case.

    ``issue`` selects the machine-readable error class. Every known issue is
    reported as ``rejected`` with state ``unknown_state`` and a reprocess
    token derived from ``case_id``.

    Raises:
        KeyError: If ``issue`` is not one of the known issue names.
    """
    error_class_by_issue = {
        "dangling": "missing_subject",
        "invalid_media_type": "invalid_media_type",
        "cycle": "referrer_cycle_detected",
        "missing_symbol_bundle": "missing_symbol_bundle",
    }
    return {
        "caseId": case_id,
        "result": "rejected",
        # The lookup happens before the token is computed, so an unknown
        # issue fails fast with KeyError.
        "machineReadableErrorClass": error_class_by_issue[issue],
        "state": "unknown_state",
        "reprocessToken": _reprocess_token(case_id),
    }
def _write_tar(source_dir: pathlib.Path, tar_path: pathlib.Path) -> None:
    """Pack every file under ``source_dir`` into a reproducible ``.tar.gz``.

    Members are stored in sorted path order under POSIX-style names relative
    to ``source_dir``. Modification times and ownership are zeroed in the tar
    member headers, and the gzip header carries no timestamp or file name, so
    the same file contents produce a byte-identical archive on every run.

    Args:
        source_dir: Directory whose files (searched recursively) are archived.
        tar_path: Destination archive; missing parent directories are created.
    """
    import gzip  # Local import: only this function needs it.

    def _strip_volatile_metadata(member: tarfile.TarInfo) -> tarfile.TarInfo:
        # Drop the fields that differ between runs or machines.
        member.mtime = 0
        member.uid = member.gid = 0
        member.uname = member.gname = ""
        return member

    tar_path.parent.mkdir(parents=True, exist_ok=True)
    # Snapshot the member list before the archive file is created, so the
    # archive being written is never picked up by the directory scan.
    member_paths = sorted(path for path in source_dir.rglob("*") if path.is_file())
    with open(tar_path, "wb") as raw_stream:
        # filename="" and mtime=0 keep the gzip header free of run-specific data.
        with gzip.GzipFile(filename="", mode="wb", fileobj=raw_stream, mtime=0) as gzip_stream:
            with tarfile.open(fileobj=gzip_stream, mode="w") as archive:
                for member_path in member_paths:
                    archive.add(
                        member_path,
                        arcname=member_path.relative_to(source_dir).as_posix(),
                        filter=_strip_volatile_metadata,
                    )
def main(argv: list[str] | None = None) -> int:
    """Run every large-payload and referrer case and write the suite artefacts.

    Each case is evaluated, checked for a graceful rejection (``result`` is
    ``rejected`` and ``state`` is ``unknown_state``), and written to
    ``cases/<caseId>.json`` under the output directory. The case files are
    then archived to ``big_dsse_payloads.tar.gz``, and ``report.json`` and
    ``junit.xml`` are written next to it.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``, which is the previous behaviour.

    Returns:
        ``0`` when every case was gracefully rejected, otherwise ``1``.
    """
    parser = argparse.ArgumentParser(description="Run deterministic large DSSE/referrer suite.")
    parser.add_argument(
        "--output",
        type=pathlib.Path,
        default=pathlib.Path("out/supply-chain/04-big-dsse-referrers"),
    )
    args = parser.parse_args(argv)

    output = args.output.resolve()
    case_root = output / "cases"
    # parents=True creates ``output`` itself as well.
    case_root.mkdir(parents=True, exist_ok=True)

    start = time.perf_counter()
    suite_name = "04-big-dsse-referrers"
    payload_failure = "payload case was not gracefully rejected"
    referrer_failure = "referrer case was not gracefully rejected"
    # One row per case: (case id, evaluator, evaluator argument, failure message).
    # Payload cases run first, then referrer cases; this order is kept in the
    # report and the JUnit output.
    case_plan = [
        ("dsse-100mb", _evaluate_big_payload, 100 * 1024 * 1024, payload_failure),
        ("dsse-250mb", _evaluate_big_payload, 250 * 1024 * 1024, payload_failure),
        ("dsse-1gb", _evaluate_big_payload, 1024 * 1024 * 1024, payload_failure),
        ("referrer-dangling", _evaluate_referrer_case, "dangling", referrer_failure),
        ("referrer-invalid-media-type", _evaluate_referrer_case, "invalid_media_type", referrer_failure),
        ("referrer-cycle", _evaluate_referrer_case, "cycle", referrer_failure),
        ("referrer-missing-symbol-bundle", _evaluate_referrer_case, "missing_symbol_bundle", referrer_failure),
    ]

    results: list[dict[str, object]] = []
    junit_cases: list[TestCaseResult] = []
    failures = 0
    for case_id, evaluate, argument, failure_message in case_plan:
        case_start = time.perf_counter()
        result = evaluate(case_id, argument)
        # Every case in this suite is expected to be rejected gracefully.
        passed = result["result"] == "rejected" and result["state"] == "unknown_state"
        if not passed:
            failures += 1
        (case_root / f"{case_id}.json").write_text(
            json.dumps(result, sort_keys=True, indent=2) + "\n",
            encoding="utf-8",
        )
        results.append(result)
        junit_cases.append(
            TestCaseResult(
                suite=suite_name,
                name=case_id,
                passed=passed,
                duration_seconds=time.perf_counter() - case_start,
                failure_message=None if passed else failure_message,
            )
        )

    _write_tar(case_root, output / "big_dsse_payloads.tar.gz")

    report = {
        "durationSeconds": round(time.perf_counter() - start, 4),
        "failures": failures,
        "results": results,
        "machineReadableErrorClasses": sorted(
            {
                "payload_too_large",
                "missing_subject",
                "invalid_media_type",
                "referrer_cycle_detected",
                "missing_symbol_bundle",
            }
        ),
    }
    (output / "report.json").write_text(json.dumps(report, sort_keys=True, indent=2) + "\n", encoding="utf-8")
    write_junit(output / "junit.xml", junit_cases)
    return 0 if failures == 0 else 1
if __name__ == "__main__":
    # Use main()'s return value (0 = no failures, 1 = failures) as the
    # process exit status.
    sys.exit(main())