Files

124 lines
3.5 KiB
Python

#!/usr/bin/env python3
"""Orchestrate deterministic supply-chain hardening lanes."""
from __future__ import annotations
import argparse
import json
import pathlib
import subprocess
import sys
import time
def _python() -> str:
return sys.executable
def _run(cmd: list[str], cwd: pathlib.Path) -> int:
completed = subprocess.run(cmd, cwd=str(cwd), check=False)
return completed.returncode
def _lane_commands(
    seed: int,
    fuzz_limit: int,
    fuzz_seconds: int,
    output_root: pathlib.Path,
) -> list[tuple[str, list[str]]]:
    """Build the ordered ``(lane name, argv)`` pairs for every lane.

    Script paths are relative; the caller is expected to run each command
    with the repository root as working directory. Each lane receives its
    own sub-directory of *output_root* through ``--output``.
    """
    python = _python()
    return [
        (
            "01-jcs-property",
            [
                python,
                "tests/supply-chain/01-jcs-property/test_jcs.py",
                "--seed",
                str(seed),
                "--output",
                str(output_root / "01-jcs-property"),
            ],
        ),
        (
            "02-schema-fuzz",
            [
                python,
                "tests/supply-chain/02-schema-fuzz/run_mutations.py",
                "--seed",
                str(seed),
                "--limit",
                str(fuzz_limit),
                "--time-seconds",
                str(fuzz_seconds),
                "--output",
                str(output_root / "02-schema-fuzz"),
            ],
        ),
        (
            "03-rekor-neg",
            [
                python,
                "tests/supply-chain/03-rekor-neg/run_negative_suite.py",
                "--output",
                str(output_root / "03-rekor-neg"),
            ],
        ),
        (
            "04-big-dsse-referrers",
            [
                python,
                "tests/supply-chain/04-big-dsse-referrers/run_big_cases.py",
                "--output",
                str(output_root / "04-big-dsse-referrers"),
            ],
        ),
        (
            "05-corpus-archive",
            [
                python,
                "tests/supply-chain/05-corpus/build_corpus_archive.py",
                "--output",
                str(output_root / "05-corpus"),
            ],
        ),
    ]


def main(argv: list[str] | None = None) -> int:
    """Run every supply-chain lane in order and write ``summary.json``.

    All lanes are executed even if an earlier one fails; the per-lane
    return codes are recorded in ``<output>/summary.json``.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``, so calling ``main()`` with no
            arguments behaves as before.

    Returns:
        0 when every lane exited with return code 0, otherwise 1.
    """
    parser = argparse.ArgumentParser(description="Run supply-chain hardening suite.")
    parser.add_argument("--seed", type=int, default=20260226)
    parser.add_argument("--profile", choices=["smoke", "nightly"], default="smoke")
    parser.add_argument("--output", type=pathlib.Path, default=pathlib.Path("out/supply-chain"))
    args = parser.parse_args(argv)

    # Every lane runs with this directory as cwd, so the relative script
    # paths in the lane table resolve against it.
    repo_root = pathlib.Path(__file__).resolve().parents[2]

    # Made absolute up front because the lanes run with a different cwd.
    output_root = args.output.resolve()
    output_root.mkdir(parents=True, exist_ok=True)

    # argparse restricts --profile to "smoke" or "nightly".
    if args.profile == "smoke":
        fuzz_limit, fuzz_seconds = 1000, 60
    else:
        fuzz_limit, fuzz_seconds = 5000, 300

    lanes = _lane_commands(args.seed, fuzz_limit, fuzz_seconds, output_root)

    start = time.perf_counter()
    lane_results: list[dict[str, object]] = []
    for lane_name, command in lanes:
        # Flush so the header is emitted before the child process writes to
        # the inherited stdout; otherwise a block-buffered stdout (pipe or
        # file) shows the lane's output ahead of its header.
        print(
            f"[supply-chain] lane={lane_name} command={' '.join(command)}",
            flush=True,
        )
        return_code = _run(command, repo_root)
        lane_results.append(
            {
                "lane": lane_name,
                "returnCode": return_code,
                "status": "pass" if return_code == 0 else "fail",
            }
        )

    summary = {
        "profile": args.profile,
        "seed": args.seed,
        "durationSeconds": round(time.perf_counter() - start, 4),
        "lanes": lane_results,
    }
    summary_text = json.dumps(summary, sort_keys=True, indent=2) + "\n"
    (output_root / "summary.json").write_text(summary_text, encoding="utf-8")

    any_failed = any(lane["returnCode"] != 0 for lane in lane_results)
    return 1 if any_failed else 0
if __name__ == "__main__":
    # Script entry point: the process exit status is main()'s return value.
    sys.exit(main())