165 lines
5.3 KiB
Python
165 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
"""Large DSSE payload and OCI referrer edge-case deterministic suite."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import pathlib
|
|
import tarfile
|
|
import time
|
|
|
|
import sys
|
|
|
|
TOOLS_DIR = pathlib.Path(__file__).resolve().parents[1] / "tools"
|
|
sys.path.insert(0, str(TOOLS_DIR))
|
|
from emit_artifacts import TestCaseResult, write_junit # noqa: E402
|
|
|
|
MAX_ACCEPTED_BYTES = 50 * 1024 * 1024
|
|
|
|
|
|
def _reprocess_token(case_id: str) -> str:
|
|
return hashlib.sha256(case_id.encode("utf-8")).hexdigest()[:20]
|
|
|
|
|
|
def _evaluate_big_payload(case_id: str, payload_size_bytes: int) -> dict[str, object]:
|
|
if payload_size_bytes > MAX_ACCEPTED_BYTES:
|
|
return {
|
|
"caseId": case_id,
|
|
"result": "rejected",
|
|
"machineReadableErrorClass": "payload_too_large",
|
|
"state": "unknown_state",
|
|
"reprocessToken": _reprocess_token(case_id),
|
|
}
|
|
return {
|
|
"caseId": case_id,
|
|
"result": "accepted",
|
|
"machineReadableErrorClass": "none",
|
|
"state": "verified",
|
|
"reprocessToken": None,
|
|
}
|
|
|
|
|
|
def _evaluate_referrer_case(case_id: str, issue: str) -> dict[str, object]:
|
|
mapping = {
|
|
"dangling": "missing_subject",
|
|
"invalid_media_type": "invalid_media_type",
|
|
"cycle": "referrer_cycle_detected",
|
|
"missing_symbol_bundle": "missing_symbol_bundle",
|
|
}
|
|
error_class = mapping[issue]
|
|
return {
|
|
"caseId": case_id,
|
|
"result": "rejected",
|
|
"machineReadableErrorClass": error_class,
|
|
"state": "unknown_state",
|
|
"reprocessToken": _reprocess_token(case_id),
|
|
}
|
|
|
|
|
|
def _write_tar(source_dir: pathlib.Path, tar_path: pathlib.Path) -> None:
|
|
tar_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with tarfile.open(tar_path, "w:gz") as archive:
|
|
for file in sorted(path for path in source_dir.rglob("*") if path.is_file()):
|
|
archive.add(file, arcname=file.relative_to(source_dir).as_posix())
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Run deterministic large DSSE/referrer suite.")
|
|
parser.add_argument(
|
|
"--output",
|
|
type=pathlib.Path,
|
|
default=pathlib.Path("out/supply-chain/04-big-dsse-referrers"),
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
output = args.output.resolve()
|
|
output.mkdir(parents=True, exist_ok=True)
|
|
case_root = output / "cases"
|
|
case_root.mkdir(parents=True, exist_ok=True)
|
|
|
|
start = time.perf_counter()
|
|
|
|
big_payload_cases = [
|
|
("dsse-100mb", 100 * 1024 * 1024),
|
|
("dsse-250mb", 250 * 1024 * 1024),
|
|
("dsse-1gb", 1024 * 1024 * 1024),
|
|
]
|
|
referrer_cases = [
|
|
("referrer-dangling", "dangling"),
|
|
("referrer-invalid-media-type", "invalid_media_type"),
|
|
("referrer-cycle", "cycle"),
|
|
("referrer-missing-symbol-bundle", "missing_symbol_bundle"),
|
|
]
|
|
|
|
results: list[dict[str, object]] = []
|
|
junit_cases: list[TestCaseResult] = []
|
|
failures = 0
|
|
|
|
for case_id, size_bytes in big_payload_cases:
|
|
case_start = time.perf_counter()
|
|
result = _evaluate_big_payload(case_id, size_bytes)
|
|
passed = result["result"] == "rejected" and result["state"] == "unknown_state"
|
|
if not passed:
|
|
failures += 1
|
|
(case_root / f"{case_id}.json").write_text(
|
|
json.dumps(result, sort_keys=True, indent=2) + "\n",
|
|
encoding="utf-8",
|
|
)
|
|
results.append(result)
|
|
junit_cases.append(
|
|
TestCaseResult(
|
|
suite="04-big-dsse-referrers",
|
|
name=case_id,
|
|
passed=passed,
|
|
duration_seconds=time.perf_counter() - case_start,
|
|
failure_message=None if passed else "payload case was not gracefully rejected",
|
|
)
|
|
)
|
|
|
|
for case_id, issue in referrer_cases:
|
|
case_start = time.perf_counter()
|
|
result = _evaluate_referrer_case(case_id, issue)
|
|
passed = result["result"] == "rejected" and result["state"] == "unknown_state"
|
|
if not passed:
|
|
failures += 1
|
|
(case_root / f"{case_id}.json").write_text(
|
|
json.dumps(result, sort_keys=True, indent=2) + "\n",
|
|
encoding="utf-8",
|
|
)
|
|
results.append(result)
|
|
junit_cases.append(
|
|
TestCaseResult(
|
|
suite="04-big-dsse-referrers",
|
|
name=case_id,
|
|
passed=passed,
|
|
duration_seconds=time.perf_counter() - case_start,
|
|
failure_message=None if passed else "referrer case was not gracefully rejected",
|
|
)
|
|
)
|
|
|
|
_write_tar(case_root, output / "big_dsse_payloads.tar.gz")
|
|
|
|
report = {
|
|
"durationSeconds": round(time.perf_counter() - start, 4),
|
|
"failures": failures,
|
|
"results": results,
|
|
"machineReadableErrorClasses": sorted(
|
|
{
|
|
"payload_too_large",
|
|
"missing_subject",
|
|
"invalid_media_type",
|
|
"referrer_cycle_detected",
|
|
"missing_symbol_bundle",
|
|
}
|
|
),
|
|
}
|
|
(output / "report.json").write_text(json.dumps(report, sort_keys=True, indent=2) + "\n", encoding="utf-8")
|
|
write_junit(output / "junit.xml", junit_cases)
|
|
return 0 if failures == 0 else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|