Files

103 lines
2.9 KiB
Python

#!/usr/bin/env python3
"""Artifact and JUnit emitters for deterministic supply-chain hardening lanes."""
from __future__ import annotations
import json
import pathlib
import xml.etree.ElementTree as et
from dataclasses import dataclass
from typing import Any, Iterable
def _write_json(path: pathlib.Path, payload: Any) -> None:
    """Serialise *payload* as JSON and write it to *path*.

    The output is rendered with sorted keys, a two-space indent, non-ASCII
    characters kept verbatim, and a single trailing newline. The file is
    encoded as UTF-8 with LF line endings regardless of platform, and any
    missing parent directories are created.

    Raises:
        TypeError / ValueError: if *payload* is not JSON-serialisable. In that
            case nothing is created or modified on disk.
    """
    # Render to a string before touching the filesystem: if the payload turns
    # out not to be serialisable we fail here, instead of leaving a truncated
    # or half-written artifact behind (or clobbering a previous good one).
    rendered = json.dumps(payload, sort_keys=True, indent=2, ensure_ascii=False)
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="\n" disables platform newline translation on write.
    with path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(rendered)
        handle.write("\n")
def _write_text(path: pathlib.Path, content: str) -> None:
    """Write *content* to *path* as UTF-8 text with no newline translation.

    Missing parent directories are created. An existing file at *path* is
    overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Path.write_text() only accepts ``newline=`` from Python 3.10 onwards.
    # Opening the file explicitly behaves the same on older interpreters and
    # mirrors how _write_json opens its output.
    with path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(content)
@dataclass(frozen=True)
class TestCaseResult:
    """Immutable outcome of one test case, as consumed by ``write_junit``.

    Attributes:
        suite: Grouping name; emitted as the ``classname`` of the test case.
        name: Test case name; emitted as the ``name`` of the test case.
        passed: ``True`` when the case succeeded; otherwise a ``<failure>``
            child element is emitted for it.
        duration_seconds: Run time in seconds, rendered with three decimals.
        failure_message: Text of the ``<failure>`` element for a failed case;
            a generic placeholder is used when it is ``None`` or empty.
    """

    # The class name matches pytest's default ``Test*`` collection pattern.
    # Opt out explicitly so importing it into a test module does not trigger
    # a collection warning. No annotation, so this is not a dataclass field.
    __test__ = False

    suite: str
    name: str
    passed: bool
    duration_seconds: float = 0.0
    failure_message: str | None = None
def write_junit(path: pathlib.Path, test_cases: Iterable[TestCaseResult]) -> None:
    """Write a single-suite JUnit XML report for *test_cases* to *path*.

    The report contains one ``<testsuite>`` named ``supply-chain-hardening``
    whose ``<testcase>`` children are ordered by ``(suite, name)`` so the
    output does not depend on the order of the input. Failed cases get a
    ``<failure type="assertion">`` child carrying the failure message, or the
    text ``failure`` when no message was provided.

    Args:
        path: Destination file; missing parent directories are created.
        test_cases: Results to report. Consumed exactly once, so a one-shot
            iterator is acceptable.
    """
    # Materialise once: the results are both counted and iterated.
    cases = list(test_cases)
    failures = sum(1 for case in cases if not case.passed)
    suite = et.Element(
        "testsuite",
        attrib={
            "name": "supply-chain-hardening",
            "tests": str(len(cases)),
            "failures": str(failures),
            "errors": "0",
            "skipped": "0",
        },
    )
    for case in sorted(cases, key=lambda item: (item.suite, item.name)):
        node = et.SubElement(
            suite,
            "testcase",
            attrib={
                "classname": case.suite,
                "name": case.name,
                "time": f"{case.duration_seconds:.3f}",
            },
        )
        if not case.passed:
            failure = et.SubElement(node, "failure", attrib={"type": "assertion"})
            failure.text = case.failure_message or "failure"

    path.parent.mkdir(parents=True, exist_ok=True)
    # Hand ElementTree a binary handle instead of a path. Given a file name it
    # opens the file in text mode with the platform's newline translation, so
    # the line break after the XML declaration (and any inside messages) would
    # become CRLF on Windows. Through a binary handle the bytes are identical
    # on every platform, matching the LF-only JSON and text writers.
    with path.open("wb") as handle:
        et.ElementTree(suite).write(handle, encoding="utf-8", xml_declaration=True)
def record_failure(
    *,
    lane_output_dir: pathlib.Path,
    case_id: str,
    seed: int,
    payload_text: str,
    error_class: str,
    message: str,
    details: dict[str, Any] | None = None,
    canonical_diff_patch: str | None = None,
) -> pathlib.Path:
    """Persist the artifacts needed to replay one failing case.

    Everything is written beneath ``<lane_output_dir>/failures/<case_id>/``:

    * ``failing_case.json``    - *payload_text*, written verbatim.
    * ``hypothesis_seed.txt``  - *seed* followed by a newline.
    * ``diagnostic_blob.json`` - ``caseId``, ``errorClass`` and ``message``,
      plus ``details`` when *details* is non-empty.
    * ``canonical_diff.patch`` - only when *canonical_diff_patch* is not
      ``None``.

    Returns:
        The directory the artifacts were written to.
    """
    target_dir = lane_output_dir.joinpath("failures", case_id)
    target_dir.mkdir(parents=True, exist_ok=True)

    _write_text(target_dir / "failing_case.json", payload_text)
    _write_text(target_dir / "hypothesis_seed.txt", f"{seed}\n")

    diagnostic: dict[str, Any] = {
        "caseId": case_id,
        "errorClass": error_class,
        "message": message,
    }
    # An empty or missing details mapping is left out of the blob entirely.
    if details:
        diagnostic["details"] = details
    _write_json(target_dir / "diagnostic_blob.json", diagnostic)

    # The patch is optional; an empty string still produces an (empty) file.
    if canonical_diff_patch is None:
        return target_dir
    _write_text(target_dir / "canonical_diff.patch", canonical_diff_patch)
    return target_dir