148 lines
4.3 KiB
Python
148 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
"""Deterministic schema validator for reachability benchmark assets.

Usage examples:
  python tools/validate.py case schemas/examples/case.sample.yaml
  python tools/validate.py truth benchmark/truth/public.json
  python tools/validate.py all schemas/examples

The script is offline-friendly and relies only on pinned deps from
`tools/requirements.txt`.
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Iterable, Tuple
|
|
|
|
import yaml
|
|
from jsonschema import Draft202012Validator, FormatChecker
|
|
|
|
# Repository root: two levels above this file (the script is run as
# `tools/validate.py`, so this is the directory that contains `tools/`).
ROOT = Path(__file__).resolve().parent.parent

# Most schemas live directly under <root>/schemas; only the manifest schema
# is kept under <root>/benchmark/schemas.
_SCHEMA_DIR = ROOT / "schemas"

# Maps each supported document kind to the schema file used to validate it.
SCHEMAS = {
    "case": _SCHEMA_DIR / "case.schema.yaml",
    "entrypoints": _SCHEMA_DIR / "entrypoints.schema.yaml",
    "truth": _SCHEMA_DIR / "truth.schema.json",
    "coverage": _SCHEMA_DIR / "coverage.schema.json",
    "trace": _SCHEMA_DIR / "trace.schema.json",
    "submission": _SCHEMA_DIR / "submission.schema.json",
    "manifest": ROOT / "benchmark" / "schemas" / "benchmark-manifest.schema.json",
}
|
|
|
|
|
|
def load_yaml_or_json(path: Path):
    """Read ``path`` as UTF-8 text and parse it into Python data.

    Files whose suffix is ``.yaml`` or ``.yml`` (compared case-insensitively)
    are parsed with ``yaml.safe_load``; every other suffix is parsed as JSON.

    Args:
        path: File to read.

    Returns:
        Whatever the selected parser produces for the file's contents.
    """
    raw = path.read_text(encoding="utf-8")
    is_yaml = path.suffix.lower() in (".yaml", ".yml")
    return yaml.safe_load(raw) if is_yaml else json.loads(raw)
|
|
|
|
|
|
def load_schema(kind: str):
    """Load and parse the schema file registered for ``kind`` in ``SCHEMAS``.

    Args:
        kind: A key of ``SCHEMAS``.

    Returns:
        The parsed schema document.

    Raises:
        KeyError: If ``kind`` is not a key of ``SCHEMAS``.
    """
    return load_yaml_or_json(SCHEMAS[kind])
|
|
|
|
|
|
def validate_one(kind: str, payload_path: Path) -> Tuple[bool, Tuple[str, ...]]:
    """Validate one document against the schema registered for ``kind``.

    Args:
        kind: Key of ``SCHEMAS`` selecting the schema to validate against.
        payload_path: YAML or JSON document to check.

    Returns:
        ``(True, ())`` when the document conforms. Otherwise
        ``(False, messages)``, where each message has the form
        ``"<file>: <location>: <reason>"``. ``<location>`` is the
        slash-joined path to the offending element, or ``<root>`` when the
        problem concerns the document as a whole. Messages are sorted by
        error path and then by message text so the output is stable.

    Raises:
        KeyError: If ``kind`` is not a key of ``SCHEMAS``.
    """
    schema = load_schema(kind)

    try:
        document = load_yaml_or_json(payload_path)
    except (ValueError, yaml.YAMLError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # A payload that cannot even be parsed is reported as a validation
        # failure rather than aborting the whole run with a traceback.
        return False, (f"{payload_path}: <root>: unparseable document: {exc}",)

    validator = Draft202012Validator(schema, format_checker=FormatChecker())
    errors = sorted(
        validator.iter_errors(document),
        key=lambda e: (list(e.path), e.message),
    )
    if not errors:
        return True, ()

    messages = []
    for err in errors:
        # Built outside the f-string: reusing double quotes inside a
        # double-quoted f-string is a SyntaxError before Python 3.12.
        location = "/".join(str(part) for part in err.path) or "<root>"
        messages.append(f"{payload_path}: {location}: {err.message}")
    return False, tuple(messages)
|
|
|
|
|
|
def collect_all(directory: Path) -> Iterable[Tuple[str, Path]]:
    """Yield ``(kind, path)`` for files under ``directory`` whose name hints at a kind.

    The directory is walked recursively and files are visited in sorted path
    order. A file is matched when its lower-cased stem contains one of the
    tokens listed for a kind; files that match no kind are skipped.

    Args:
        directory: Directory to scan.

    Yields:
        ``(kind, path)`` pairs, at most one per file.
    """
    tokens_by_kind = {
        "case": ("case",),
        "entrypoints": ("entrypoints", "entrypoint"),
        "truth": ("truth",),
        "coverage": ("coverage",),
        "trace": ("trace", "traces"),
        "submission": ("submission",),
        "manifest": ("manifest",),
    }

    def guess_kind(stem: str):
        # Dict order breaks ties: the first kind with a matching token wins.
        return next(
            (
                kind
                for kind, tokens in tokens_by_kind.items()
                if any(token in stem for token in tokens)
            ),
            None,
        )

    for candidate in sorted(directory.rglob("*")):
        if candidate.is_file():
            kind = guess_kind(candidate.stem.lower())
            if kind is not None:
                yield kind, candidate
|
|
|
|
|
|
def parse_args():
    """Parse the command line.

    Returns:
        An ``argparse.Namespace`` with ``kind`` (one of the schema kinds or
        ``"all"``) and ``paths`` (a list of one or more path strings).
    """
    kinds = [
        "case",
        "entrypoints",
        "truth",
        "coverage",
        "trace",
        "submission",
        "manifest",
        "all",
    ]

    cli = argparse.ArgumentParser(
        description="Validate reachability benchmark files against schemas.",
    )
    cli.add_argument(
        "kind",
        choices=kinds,
        help="Which schema to validate against or 'all' to auto-detect in a directory",
    )
    cli.add_argument(
        "paths",
        nargs="+",
        help="File(s) to validate. If kind=all, provide one or more directories to scan.",
    )
    return cli.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the validator CLI.

    With ``kind == "all"`` each given path is scanned by ``collect_all`` and
    every detected file is validated against its detected kind. Otherwise
    each given path is validated against the requested kind. An ``OK`` line
    is printed as soon as a file passes; all ``FAIL`` lines are printed
    together once every path has been processed.

    Returns:
        ``1`` if any path was missing or any document failed validation,
        ``0`` otherwise.
    """
    args = parse_args()
    scan_mode = args.kind == "all"
    problems: list[str] = []

    for raw in args.paths:
        target = Path(raw)
        if not target.exists():
            # Scan mode reports the argument exactly as typed; single-schema
            # mode reports the Path rendering of it.
            shown = raw if scan_mode else target
            problems.append(f"{shown}: path not found")
            continue

        jobs = collect_all(target) if scan_mode else [(args.kind, target)]
        for kind, path in jobs:
            ok, messages = validate_one(kind, path)
            if ok:
                print(f"OK [{kind}] {path}")
            else:
                problems.extend(messages)

    for msg in problems:
        print(f"FAIL {msg}")
    return 1 if problems else 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|