#!/usr/bin/env python3
"""Normalize Semgrep JSON output into benchmark submission schema."""
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Dict, List, Any

try:
    import yaml
except ImportError:  # pragma: no cover - only hit when PyYAML is absent
    # PyYAML is needed solely by load_case(); deferring the failure keeps the
    # pure normalization helpers importable (and testable) without it.
    yaml = None  # type: ignore[assignment]


def load_case(case_path: Path) -> Dict[str, Any]:
    """Read the case metadata YAML file and return it as a mapping.

    Args:
        case_path: Path to the ``case.yaml`` file.

    Returns:
        The parsed top-level mapping, or ``{}`` when the document is empty
        (``yaml.safe_load`` yields ``None`` for an empty document).

    Raises:
        ModuleNotFoundError: PyYAML is not installed.
        ValueError: The document's top level is not a mapping.
    """
    if yaml is None:
        raise ModuleNotFoundError(
            "PyYAML is required to read case metadata (pip install pyyaml)"
        )
    data = yaml.safe_load(case_path.read_text(encoding="utf-8"))
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise ValueError(
            f"{case_path}: expected a YAML mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return data


def load_semgrep(path: Path) -> Dict[str, Any]:
    """Read Semgrep's JSON output, falling back to an empty result set.

    A missing file, undecodable bytes, malformed JSON, or a JSON document
    whose top level is not an object all yield ``{"results": []}`` so that a
    failed or skipped Semgrep run still produces a submission.

    Args:
        path: Path to the Semgrep JSON output file.

    Returns:
        The parsed JSON object, or ``{"results": []}`` as described above.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, NotADirectoryError):
        # Same outcome the former ``path.exists()`` pre-check produced.
        return {"results": []}
    except (json.JSONDecodeError, UnicodeDecodeError):
        return {"results": []}
    if not isinstance(data, dict):
        # Valid JSON, but not the object shape build_submission() expects.
        return {"results": []}
    return data


def _touches_file(finding_path: Any, sink_file: str) -> bool:
    """Return True when *finding_path* names *sink_file* on a path boundary."""
    if not isinstance(finding_path, str) or not isinstance(sink_file, str):
        return False
    target = sink_file.replace("\\", "/").lstrip("/")
    if not target:
        # ``str.endswith("")`` is always true; an unknown sink file must not
        # match every finding.
        return False
    candidate = finding_path.replace("\\", "/")
    # Require a separator before the suffix so "myapp.py" != "app.py".
    return candidate == target or candidate.endswith("/" + target)


def _finding_line(finding: Dict[str, Any]) -> Any:
    """Return the finding's start line, else its end line, else 0."""
    for key in ("start", "end"):
        position = finding.get(key)
        if isinstance(position, dict) and position.get("line"):
            return position["line"]
    return 0


def sink_prediction(results: List[Dict[str, Any]], sink_file: str) -> Dict[str, Any]:
    """Predict sink reachability from Semgrep findings.

    Basic heuristic: the sink is reachable if any finding touches the sink
    file. The first matching finding supplies the line used in the
    explanation path.

    Args:
        results: Semgrep findings (the ``results`` array of the JSON output).
        sink_file: File name (or trailing path) of the sink. An empty value
            never matches.

    Returns:
        ``{"prediction": "reachable", "confidence": 0.6, "explain": {...}}``
        when a finding matches, otherwise
        ``{"prediction": "unreachable", "confidence": 0.4}``.
    """
    hit = next(
        (
            finding
            for finding in results or ()
            if isinstance(finding, dict)
            and _touches_file(finding.get("path"), sink_file)
        ),
        None,
    )
    if hit is None:
        return {"prediction": "unreachable", "confidence": 0.4}

    line = _finding_line(hit)
    explain_path = [f"entry:{sink_file}:0", f"sink:{sink_file}:{line}"]
    return {
        "prediction": "reachable",
        "confidence": 0.6,
        "explain": {"path": explain_path},
    }


def _sink_sort_key(sink: Dict[str, Any]) -> tuple:
    """Order sinks by id without raising on mixed or missing id types.

    Numeric ids sort numerically and string ids lexicographically, exactly as
    a plain ``sorted`` would for homogeneous ids; numbers come before strings
    when both occur.
    """
    sink_id = sink.get("id")
    if isinstance(sink_id, (int, float)):
        return (0, sink_id, "")
    return (1, 0, "" if sink_id is None else str(sink_id))


def build_submission(case_meta: Dict[str, Any], results: Dict[str, Any], tool_version: str) -> Dict[str, Any]:
    """Assemble the benchmark submission document for a single case.

    Args:
        case_meta: Case metadata; reads ``sinks`` (each with ``id`` and
            ``location.file``), ``id`` and ``project``.
        results: Semgrep JSON output; reads the ``results`` array.
        tool_version: Semgrep version string recorded in the submission.

    Returns:
        A submission dict with ``version``, ``tool``, ``run`` and one entry in
        ``cases`` whose ``sinks`` are ordered by sink id.
    """
    findings = results.get("results") if isinstance(results, dict) else None
    if not isinstance(findings, list):
        findings = []

    # A non-mapping sink entry (e.g. a bare id string) is treated as an id
    # with no known location instead of crashing on ``.get``.
    sinks = [
        sink if isinstance(sink, dict) else ({} if sink is None else {"id": sink})
        for sink in (case_meta.get("sinks") or [])
    ]

    sinks_out = []
    for sink in sorted(sinks, key=_sink_sort_key):
        location = sink.get("location")
        if not isinstance(location, dict):
            location = {}  # ``location: null`` or a scalar in the YAML
        sink_file = Path(location.get("file") or "").name
        pred = sink_prediction(findings, sink_file)
        entry = {
            "sink_id": sink.get("id", "unknown"),
            "prediction": pred["prediction"],
            "confidence": pred["confidence"],
        }
        if "explain" in pred:
            entry["explain"] = pred["explain"]
        sinks_out.append(entry)

    return {
        "version": "1.0.0",
        "tool": {"name": "semgrep", "version": tool_version},
        "run": {"platform": "local-semgrep-baseline"},
        "cases": [{
            "case_id": str(case_meta.get("id") or case_meta.get("project") or "unknown-case"),
            "sinks": sinks_out,
        }],
    }


def main() -> int:
    """Parse CLI arguments, build the submission, and write it as JSON.

    Returns:
        Process exit code (always 0; failures propagate as exceptions).
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--case", type=Path, required=True, help="Path to case.yaml")
    ap.add_argument("--semgrep", type=Path, required=True, help="Path to semgrep JSON output")
    ap.add_argument("--tool-version", type=str, default="unknown")
    ap.add_argument("--output", type=Path, required=True)
    args = ap.parse_args()

    case_meta = load_case(args.case)
    semgrep_out = load_semgrep(args.semgrep)
    submission = build_submission(case_meta, semgrep_out, args.tool_version)

    # The output directory may not exist yet on a fresh benchmark run.
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(submission, indent=2, sort_keys=True), encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())