Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
- Implement unit tests for RichGraphPublisher to verify graph publishing to CAS. - Implement unit tests for RichGraphWriter to ensure correct writing of canonical graphs and metadata. feat: Implement AOC Guard validation logic - Add AOC Guard validation logic to enforce document structure and field constraints. - Introduce violation codes for various validation errors. - Implement tests for AOC Guard to validate expected behavior. feat: Create Console Status API client and service - Implement ConsoleStatusClient for fetching console status and streaming run events. - Create ConsoleStatusService to manage console status polling and event subscriptions. - Add tests for ConsoleStatusClient to verify API interactions. feat: Develop Console Status component - Create ConsoleStatusComponent for displaying console status and run events. - Implement UI for showing status metrics and handling user interactions. - Add styles for console status display. test: Add tests for Console Status store - Implement tests for ConsoleStatusStore to verify event handling and state management.
81 lines
2.9 KiB
Python
81 lines
2.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Normalize Semgrep JSON output into benchmark submission schema."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any
|
|
|
|
import yaml
|
|
|
|
|
|
def load_case(case_path: Path) -> Dict[str, Any]:
|
|
return yaml.safe_load(case_path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def load_semgrep(path: Path) -> Dict[str, Any]:
|
|
if not path.exists():
|
|
return {"results": []}
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except json.JSONDecodeError:
|
|
return {"results": []}
|
|
|
|
|
|
def sink_prediction(results: List[Dict[str, Any]], sink_file: str) -> Dict[str, Any]:
|
|
# basic heuristic: reachable if any finding touches the sink file
|
|
hits = [r for r in results if r.get("path", "").endswith(sink_file)]
|
|
if hits:
|
|
first = hits[0]
|
|
line = first.get("start", {}).get("line") or first.get("end", {}).get("line") or 0
|
|
explain_path = [f"entry:{sink_file}:0", f"sink:{sink_file}:{line}"]
|
|
return {"prediction": "reachable", "confidence": 0.6, "explain": {"path": explain_path}}
|
|
return {"prediction": "unreachable", "confidence": 0.4}
|
|
|
|
|
|
def build_submission(case_meta: Dict[str, Any], results: Dict[str, Any], tool_version: str) -> Dict[str, Any]:
|
|
sinks_out = []
|
|
sinks = case_meta.get("sinks") or []
|
|
semgrep_results = results.get("results") or []
|
|
for sink in sorted(sinks, key=lambda s: s.get("id", "")):
|
|
loc = sink.get("location", {}) if isinstance(sink, dict) else {}
|
|
sink_file = Path(loc.get("file", "")).name
|
|
pred = sink_prediction(semgrep_results, sink_file)
|
|
sinks_out.append({
|
|
"sink_id": sink.get("id", "unknown"),
|
|
"prediction": pred["prediction"],
|
|
"confidence": pred["confidence"],
|
|
**({"explain": pred["explain"]} if "explain" in pred else {})
|
|
})
|
|
|
|
return {
|
|
"version": "1.0.0",
|
|
"tool": {"name": "semgrep", "version": tool_version},
|
|
"run": {"platform": "local-semgrep-baseline"},
|
|
"cases": [{
|
|
"case_id": str(case_meta.get("id") or case_meta.get("project") or "unknown-case"),
|
|
"sinks": sinks_out
|
|
}]
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--case", type=Path, required=True, help="Path to case.yaml")
|
|
ap.add_argument("--semgrep", type=Path, required=True, help="Path to semgrep JSON output")
|
|
ap.add_argument("--tool-version", type=str, default="unknown")
|
|
ap.add_argument("--output", type=Path, required=True)
|
|
args = ap.parse_args()
|
|
|
|
case_meta = load_case(args.case)
|
|
semgrep_out = load_semgrep(args.semgrep)
|
|
submission = build_submission(case_meta, semgrep_out, args.tool_version)
|
|
args.output.parent.mkdir(parents=True, exist_ok=True)
|
|
args.output.write_text(json.dumps(submission, indent=2, sort_keys=True), encoding="utf-8")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|