#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
# BENCH-AUTO-401-019: Compute FP/MTTD/repro metrics from bench findings
"""
Computes benchmark metrics from bench/findings/** and outputs to results/summary.csv.

Metrics:
- True Positives (TP): Reachable vulns correctly identified
- False Positives (FP): Unreachable vulns incorrectly marked affected
- True Negatives (TN): Unreachable vulns correctly marked not_affected
- False Negatives (FN): Reachable vulns missed
- MTTD: Mean Time To Detect (simulated)
- Reproducibility: Determinism score

Usage:
    python scripts/bench/compute-metrics.py [--findings PATH] [--output PATH] [--baseline PATH]
"""

import argparse
import csv
import json
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path


@dataclass
class FindingMetrics:
    """Metrics for a single finding."""
    finding_id: str
    cve_id: str
    variant: str  # reachable or unreachable
    vex_status: str  # affected or not_affected
    is_correct: bool
    detection_time_ms: float = 0.0
    evidence_hash: str = ""


@dataclass
class AggregateMetrics:
    """Aggregated benchmark metrics."""
    total_findings: int = 0
    true_positives: int = 0  # reachable + affected
    false_positives: int = 0  # unreachable + affected
    true_negatives: int = 0  # unreachable + not_affected
    false_negatives: int = 0  # reachable + not_affected
    mttd_ms: float = 0.0
    reproducibility: float = 1.0  # placeholder; never recomputed in this script
    findings: list[FindingMetrics] = field(default_factory=list)

    @property
    def precision(self) -> float:
        """TP / (TP + FP)"""
        denom = self.true_positives + self.false_positives
        return self.true_positives / denom if denom > 0 else 0.0

    @property
    def recall(self) -> float:
        """TP / (TP + FN)"""
        denom = self.true_positives + self.false_negatives
        return self.true_positives / denom if denom > 0 else 0.0

    @property
    def f1_score(self) -> float:
        """2 * (precision * recall) / (precision + recall)"""
        p, r = self.precision, self.recall
        return 2 * p * r / (p + r) if (p + r) > 0 else 0.0

    @property
    def accuracy(self) -> float:
        """(TP + TN) / total"""
        correct = self.true_positives + self.true_negatives
        return correct / self.total_findings if self.total_findings > 0 else 0.0
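
# Illustrative check of the derived metrics (numbers are made up): with TP=8, FP=1,
# TN=9, FN=2 the properties above give precision = 8/9 ~ 0.889, recall = 8/10 = 0.800,
# F1 ~ 0.842 and accuracy = 17/20 = 0.850.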


def load_finding(finding_dir: Path) -> FindingMetrics | None:
    """Load a finding from its directory."""
    metadata_path = finding_dir / "metadata.json"
    openvex_path = finding_dir / "decision.openvex.json"
    if not metadata_path.exists() or not openvex_path.exists():
        return None

    with open(metadata_path, 'r', encoding='utf-8') as f:
        metadata = json.load(f)
    with open(openvex_path, 'r', encoding='utf-8') as f:
        openvex = json.load(f)

    # Extract VEX status (only the first OpenVEX statement is considered)
    statements = openvex.get("statements", [])
    vex_status = statements[0].get("status", "unknown") if statements else "unknown"

    # Determine correctness
    variant = metadata.get("variant", "unknown")
    is_correct = (
        (variant == "reachable" and vex_status == "affected") or
        (variant == "unreachable" and vex_status == "not_affected")
    )

    # Extract evidence hash from impact_statement
    evidence_hash = ""
    if statements:
        impact = statements[0].get("impact_statement", "")
        if "Evidence hash:" in impact:
            evidence_hash = impact.split("Evidence hash:")[1].strip()

    return FindingMetrics(
        finding_id=finding_dir.name,
        cve_id=metadata.get("cve_id", "UNKNOWN"),
        variant=variant,
        vex_status=vex_status,
        is_correct=is_correct,
        evidence_hash=evidence_hash
    )
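
# Classification used below (ground-truth variant vs. emitted VEX status):
#   reachable   + affected      -> true positive
#   reachable   + not_affected  -> false negative
#   unreachable + not_affected  -> true negative
#   unreachable + affected      -> false positive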


def compute_metrics(findings_dir: Path) -> AggregateMetrics:
    """Compute aggregate metrics from all findings."""
    metrics = AggregateMetrics()
    if not findings_dir.exists():
        return metrics

    for finding_path in sorted(findings_dir.iterdir()):
        if not finding_path.is_dir():
            continue
        finding = load_finding(finding_path)
        if finding is None:
            continue

        metrics.total_findings += 1
        metrics.findings.append(finding)

        # Classify finding
        if finding.variant == "reachable":
            if finding.vex_status == "affected":
                metrics.true_positives += 1
            else:
                metrics.false_negatives += 1
        else:  # unreachable
            if finding.vex_status == "not_affected":
                metrics.true_negatives += 1
            else:
                metrics.false_positives += 1

    # Compute MTTD as the mean of per-finding detection times. In real scenarios this
    # would be the time from CVE publication to detection; detection_time_ms is never
    # populated above, so the simulated value currently stays at 0.
    metrics.mttd_ms = sum(f.detection_time_ms for f in metrics.findings)
    if metrics.total_findings > 0:
        metrics.mttd_ms /= metrics.total_findings

    return metrics
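
# compare_with_baseline expects the baseline JSON to carry a top-level "metrics"
# object, e.g. {"metrics": {"precision": ..., "recall": ..., "f1_score": ...,
# "accuracy": ..., "false_positive_rate": ...}}; missing keys fall back to 0.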


def load_baseline(baseline_path: Path) -> dict:
    """Load baseline scanner results for comparison."""
    if not baseline_path.exists():
        return {}
    with open(baseline_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def compare_with_baseline(metrics: AggregateMetrics, baseline: dict) -> dict:
    """Compare StellaOps metrics with baseline scanner."""
    # False positive rate uses the classical definition FP / (FP + TN).
    fpr_denom = metrics.false_positives + metrics.true_negatives
    comparison = {
        "stellaops": {
            "precision": metrics.precision,
            "recall": metrics.recall,
            "f1_score": metrics.f1_score,
            "accuracy": metrics.accuracy,
            "false_positive_rate": metrics.false_positives / fpr_denom if fpr_denom > 0 else 0
        }
    }
    if baseline:
        # Extract baseline metrics
        baseline_metrics = baseline.get("metrics", {})
        comparison["baseline"] = {
            "precision": baseline_metrics.get("precision", 0),
            "recall": baseline_metrics.get("recall", 0),
            "f1_score": baseline_metrics.get("f1_score", 0),
            "accuracy": baseline_metrics.get("accuracy", 0),
            "false_positive_rate": baseline_metrics.get("false_positive_rate", 0)
        }
        # Compute deltas (positive delta means StellaOps scores higher than the
        # baseline; for false_positive_rate, lower is better)
        comparison["delta"] = {
            k: comparison["stellaops"][k] - comparison["baseline"].get(k, 0)
            for k in comparison["stellaops"]
        }
    return comparison


def write_summary_csv(metrics: AggregateMetrics, comparison: dict, output_path: Path):
    """Write summary.csv with all metrics."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        # Header
        writer.writerow([
            "timestamp",
            "total_findings",
            "true_positives",
            "false_positives",
            "true_negatives",
            "false_negatives",
            "precision",
            "recall",
            "f1_score",
            "accuracy",
            "mttd_ms",
            "reproducibility"
        ])
        # Data row
        writer.writerow([
            datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            metrics.total_findings,
            metrics.true_positives,
            metrics.false_positives,
            metrics.true_negatives,
            metrics.false_negatives,
            f"{metrics.precision:.4f}",
            f"{metrics.recall:.4f}",
            f"{metrics.f1_score:.4f}",
            f"{metrics.accuracy:.4f}",
            f"{metrics.mttd_ms:.2f}",
            f"{metrics.reproducibility:.4f}"
        ])
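
# Note: summary.csv is opened in 'w' mode, so it is rewritten (not appended) on each
# run and always reflects only the latest benchmark results.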


def write_detailed_json(metrics: AggregateMetrics, comparison: dict, output_path: Path):
    """Write detailed JSON report."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    report = {
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "summary": {
            "total_findings": metrics.total_findings,
            "true_positives": metrics.true_positives,
            "false_positives": metrics.false_positives,
            "true_negatives": metrics.true_negatives,
            "false_negatives": metrics.false_negatives,
            "precision": metrics.precision,
            "recall": metrics.recall,
            "f1_score": metrics.f1_score,
            "accuracy": metrics.accuracy,
            "mttd_ms": metrics.mttd_ms,
            "reproducibility": metrics.reproducibility
        },
        "comparison": comparison,
        "findings": [
            {
                "finding_id": f.finding_id,
                "cve_id": f.cve_id,
                "variant": f.variant,
                "vex_status": f.vex_status,
                "is_correct": f.is_correct,
                "evidence_hash": f.evidence_hash
            }
            for f in metrics.findings
        ]
    }
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, sort_keys=True)
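
# With sort_keys=True and fixed indentation the detailed report is byte-stable for
# identical inputs; only the generated_at timestamp differs between runs.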


def main():
    parser = argparse.ArgumentParser(
        description="Compute FP/MTTD/repro metrics from bench findings"
    )
    parser.add_argument(
        "--findings",
        type=Path,
        default=Path("bench/findings"),
        help="Path to findings directory"
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("bench/results"),
        help="Output directory for metrics"
    )
    parser.add_argument(
        "--baseline",
        type=Path,
        default=None,
        help="Path to baseline scanner results JSON"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Also output detailed JSON report"
    )
    args = parser.parse_args()

    # Resolve paths relative to repo root
    repo_root = Path(__file__).parent.parent.parent
    findings_path = repo_root / args.findings if not args.findings.is_absolute() else args.findings
    output_path = repo_root / args.output if not args.output.is_absolute() else args.output

    print(f"Findings path: {findings_path}")
    print(f"Output path: {output_path}")

    # Compute metrics
    metrics = compute_metrics(findings_path)
    print("\nMetrics Summary:")
    print(f"  Total findings: {metrics.total_findings}")
    print(f"  True Positives: {metrics.true_positives}")
    print(f"  False Positives: {metrics.false_positives}")
    print(f"  True Negatives: {metrics.true_negatives}")
    print(f"  False Negatives: {metrics.false_negatives}")
    print(f"  Precision: {metrics.precision:.4f}")
    print(f"  Recall: {metrics.recall:.4f}")
    print(f"  F1 Score: {metrics.f1_score:.4f}")
    print(f"  Accuracy: {metrics.accuracy:.4f}")

    # Load baseline if provided
    baseline = {}
    if args.baseline:
        baseline_path = repo_root / args.baseline if not args.baseline.is_absolute() else args.baseline
        baseline = load_baseline(baseline_path)
        if baseline:
            print(f"\nBaseline comparison loaded from: {baseline_path}")
    comparison = compare_with_baseline(metrics, baseline)

    # Write outputs
    write_summary_csv(metrics, comparison, output_path / "summary.csv")
    print(f"\nWrote summary to: {output_path / 'summary.csv'}")
    if args.json:
        write_detailed_json(metrics, comparison, output_path / "metrics.json")
        print(f"Wrote detailed report to: {output_path / 'metrics.json'}")

    return 0


if __name__ == "__main__":
    sys.exit(main())