Refactor code structure for improved readability and maintainability; optimize performance in key functions.
This commit is contained in:
57
scripts/corpus/add-case.py
Normal file
57
scripts/corpus/add-case.py
Normal file
@@ -0,0 +1,57 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Add a new corpus case from a template."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
CORPUS = ROOT / "bench" / "golden-corpus" / "categories"
|
||||
|
||||
|
||||
def main() -> int:
    """Scaffold a new corpus case directory from the built-in template.

    Reads ``--category`` and ``--name`` from the command line, creates
    ``CORPUS/<category>/<name>`` with ``input`` and ``expected``
    sub-directories, writes ``case-manifest.json`` and then creates a
    placeholder for every input, every expected output and
    ``run-manifest.json``.

    Returns:
        0 when the case was created; 1 when a ``case-manifest.json`` already
        exists at the target location, in which case nothing is modified.
    """
    # Local import: only needed here, to escape the manifest string values.
    import json

    parser = argparse.ArgumentParser()
    parser.add_argument("--category", required=True)
    parser.add_argument("--name", required=True)
    args = parser.parse_args()

    case_dir = CORPUS / args.category / args.name
    manifest_path = case_dir / "case-manifest.json"
    if manifest_path.exists():
        # Without this guard a re-run would replace real fixtures with the
        # empty placeholders written below.
        print(f"Case already exists: {case_dir}")
        return 1

    (case_dir / "input").mkdir(parents=True, exist_ok=True)
    (case_dir / "expected").mkdir(parents=True, exist_ok=True)

    created_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # json.dumps adds the surrounding quotes and escapes quotes, backslashes
    # and control characters, so the manifest is valid JSON for any argument.
    case_id = json.dumps(args.name, ensure_ascii=False)
    category = json.dumps(args.category, ensure_ascii=False)
    manifest_path.write_text(
        '{\n'
        f' "id": {case_id},\n'
        f' "category": {category},\n'
        ' "description": "New corpus case",\n'
        f' "createdAt": "{created_at}",\n'
        ' "inputs": ["sbom-cyclonedx.json", "sbom-spdx.json", "image.tar.gz"],\n'
        ' "expected": ["verdict.json", "evidence-index.json", "unknowns.json", "delta-verdict.json"]\n'
        '}\n',
        encoding="utf-8",
    )

    for rel in [
        "input/sbom-cyclonedx.json",
        "input/sbom-spdx.json",
        "input/image.tar.gz",
        "expected/verdict.json",
        "expected/evidence-index.json",
        "expected/unknowns.json",
        "expected/delta-verdict.json",
        "run-manifest.json",
    ]:
        target = case_dir / rel
        if target.suffix == ".gz":
            # The archive placeholder is an empty file, not a JSON document.
            target.touch()
        else:
            target.write_text("{}\n", encoding="utf-8")

    print(f"Created case at {case_dir}")
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
48
scripts/corpus/check-determinism.py
Normal file
48
scripts/corpus/check-determinism.py
Normal file
@@ -0,0 +1,48 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Check determinism by verifying manifest digests match stored values."""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
MANIFEST = ROOT / "bench" / "golden-corpus" / "corpus-manifest.json"
|
||||
|
||||
|
||||
def sha256(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in 8192-byte reads, so
    its contents are never held in memory all at once.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(8192), b""):
            hasher.update(block)
    return hasher.hexdigest()
|
||||
|
||||
|
||||
def main() -> int:
    """Check each listed case manifest against its recorded digest.

    Loads the corpus manifest at ``MANIFEST``, hashes the
    ``case-manifest.json`` of every entry under ``cases`` and compares the
    result with that entry's ``manifestDigest``. A JSON report is printed to
    stdout either way.

    Returns:
        0 when every digest matches; 1 when the corpus manifest is missing,
        a digest differs, or a case manifest cannot be read.
    """
    if not MANIFEST.exists():
        print(f"Manifest not found: {MANIFEST}")
        return 1

    data = json.loads(MANIFEST.read_text(encoding="utf-8"))
    cases = data.get("cases", [])
    mismatches = []
    for case in cases:
        expected = case.get("manifestDigest")
        actual = None
        error = None

        rel_path = case.get("path")
        if rel_path is None:
            error = "case entry has no 'path'"
        else:
            manifest_path = ROOT / rel_path / "case-manifest.json"
            try:
                actual = f"sha256:{sha256(manifest_path)}"
            except OSError as exc:
                # A missing or unreadable case manifest is reported as a
                # failure instead of aborting the whole run with a traceback.
                error = str(exc)

        if error is not None:
            mismatches.append(
                {"id": case.get("id"), "expected": expected, "actual": None, "error": error}
            )
        elif actual != expected:
            mismatches.append({"id": case.get("id"), "expected": expected, "actual": actual})

    if mismatches:
        print(json.dumps({"status": "fail", "mismatches": mismatches}, indent=2))
        return 1

    print(json.dumps({"status": "ok", "checked": len(cases)}, indent=2))
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
47
scripts/corpus/generate-manifest.py
Normal file
47
scripts/corpus/generate-manifest.py
Normal file
@@ -0,0 +1,47 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate corpus-manifest.json from case directories."""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
CORPUS = ROOT / "bench" / "golden-corpus" / "categories"
|
||||
OUTPUT = ROOT / "bench" / "golden-corpus" / "corpus-manifest.json"
|
||||
|
||||
|
||||
def sha256(path: Path) -> str:
    """Compute the SHA-256 of a file and return it as a hex string.

    Reads *path* in binary mode, 8192 bytes at a time, feeding each piece to
    the hash until a read comes back empty.
    """
    digest = hashlib.sha256()
    with path.open("rb") as source:
        piece = source.read(8192)
        while piece:
            digest.update(piece)
            piece = source.read(8192)
    return digest.hexdigest()
|
||||
|
||||
|
||||
def main() -> int:
    """Write the corpus manifest listing every case found under ``CORPUS``.

    A case is any directory below ``CORPUS`` that contains a
    ``case-manifest.json``. Each entry records the directory name as ``id``,
    the path relative to ``ROOT`` with forward slashes, and the SHA-256
    digest of the case manifest. The result is written to ``OUTPUT`` as
    indented JSON together with a UTC ``generatedAt`` timestamp.

    Returns:
        0 after the manifest has been written; 1 when ``CORPUS`` does not
        exist, in which case ``OUTPUT`` is left untouched.
    """
    if not CORPUS.exists():
        # Same guard and message as the corpus validator; without it a
        # missing corpus would be recorded as a valid, empty manifest.
        print(f"Corpus path not found: {CORPUS}")
        return 1

    case_dirs = sorted(
        p for p in CORPUS.rglob("*")
        if p.is_dir() and (p / "case-manifest.json").exists()
    )
    cases = []
    for case_dir in case_dirs:
        manifest_path = case_dir / "case-manifest.json"
        cases.append({
            "id": case_dir.name,
            # as_posix() gives forward slashes on every platform and leaves
            # a literal backslash in a POSIX directory name untouched.
            "path": case_dir.relative_to(ROOT).as_posix(),
            "manifestDigest": f"sha256:{sha256(manifest_path)}",
        })

    payload = {
        "generatedAt": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "caseCount": len(cases),
        "cases": cases,
    }

    OUTPUT.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
54
scripts/corpus/validate-corpus.py
Normal file
54
scripts/corpus/validate-corpus.py
Normal file
@@ -0,0 +1,54 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Validate golden corpus case structure."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
CORPUS = ROOT / "bench" / "golden-corpus" / "categories"
|
||||
|
||||
REQUIRED = [
|
||||
"case-manifest.json",
|
||||
"run-manifest.json",
|
||||
"input/sbom-cyclonedx.json",
|
||||
"input/sbom-spdx.json",
|
||||
"input/image.tar.gz",
|
||||
"expected/verdict.json",
|
||||
"expected/evidence-index.json",
|
||||
"expected/unknowns.json",
|
||||
"expected/delta-verdict.json",
|
||||
]
|
||||
|
||||
|
||||
def validate_case(case_dir: Path) -> list[str]:
    """Return the entries of ``REQUIRED`` that do not exist under *case_dir*.

    The result keeps the order of ``REQUIRED`` and is empty when every
    required path is present.
    """
    return [entry for entry in REQUIRED if not (case_dir / entry).exists()]
|
||||
|
||||
|
||||
def main() -> int:
    """Validate every corpus case and print a JSON report to stdout.

    A case is any directory below ``CORPUS`` that contains a
    ``case-manifest.json``. Each one is passed to ``validate_case``; cases
    with missing entries are listed in the report by their path relative to
    ``ROOT``.

    Returns:
        0 when no case has missing entries; 1 when ``CORPUS`` does not exist
        or at least one case is incomplete.
    """
    if not CORPUS.exists():
        print(f"Corpus path not found: {CORPUS}")
        return 1

    cases = sorted(
        p for p in CORPUS.rglob("*")
        if p.is_dir() and (p / "case-manifest.json").exists()
    )

    errors = []
    for case in cases:
        missing = validate_case(case)
        if missing:
            # Forward slashes on every platform keep the report identical
            # across operating systems.
            errors.append({"case": case.relative_to(ROOT).as_posix(), "missing": missing})

    if errors:
        print(json.dumps({"status": "fail", "errors": errors}, indent=2))
        return 1

    print(json.dumps({"status": "ok", "cases": len(cases)}, indent=2))
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user