#!/usr/bin/env python3
r"""
Deterministic provenance backfill helper for Sprint 401.

Reads the attestation inventory NDJSON and subject→Rekor map, emits a sorted
NDJSON log of resolved backfill actions. No network calls are performed.

Usage:
  python scripts/provenance_backfill.py \
    --inventory docs/provenance/attestation-inventory-2025-11-18.ndjson \
    --subject-map docs/provenance/subject-rekor-map-2025-11-18.json \
    --out logs/provenance-backfill-2025-11-18.ndjson
"""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional

# Every digest handled by this script must carry this algorithm prefix.
_HASH_PREFIX = "sha256:"


@dataclass(frozen=True)
class InventoryRecord:
    """One row of the attestation inventory NDJSON."""

    subject: str
    dsse_hash: str
    rekor_entry: str

    @classmethod
    def from_json(cls, obj: dict) -> "InventoryRecord":
        """Build a record from one decoded inventory JSON object.

        ``subject`` and ``dsseHash`` are required; ``rekorEntry`` is optional
        and defaults to the empty string.

        Raises:
            KeyError: if ``subject`` or ``dsseHash`` is absent.
        """
        return cls(
            subject=obj["subject"],
            dsse_hash=obj["dsseHash"],
            rekor_entry=obj.get("rekorEntry", ""),
        )


def load_inventory(path: Path) -> List[InventoryRecord]:
    """Read the inventory NDJSON at *path* into a list of records.

    Blank (whitespace-only) lines are skipped. Records are returned in file
    order.

    Raises:
        json.JSONDecodeError: a line is not valid JSON; the message is
            prefixed with ``path:line``.
        KeyError: a line lacks a required field; the message is prefixed with
            ``path:line``.
    """
    records: List[InventoryRecord] = []
    with path.open("r", encoding="utf-8") as f:
        for line_no, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as err:
                # Re-raise as the same exception type so existing handlers
                # keep working, but say which NDJSON line is broken.
                raise json.JSONDecodeError(
                    f"{path}:{line_no}: {err.msg}", err.doc, err.pos
                ) from err
            try:
                records.append(InventoryRecord.from_json(obj))
            except KeyError as err:
                raise KeyError(
                    f"{path}:{line_no}: missing required field {err}"
                ) from err
    return records


def load_subject_map(path: Path) -> Dict[str, str]:
    """Load the subject→Rekor JSON map at *path*.

    Raises:
        json.JSONDecodeError: the file is not valid JSON.
        ValueError: the top-level JSON value is not an object.
    """
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    # Fail here rather than with an AttributeError on ``.get`` further down.
    if not isinstance(data, dict):
        raise ValueError(
            f"{path}: subject map must be a JSON object, got {type(data).__name__}"
        )
    return data


def validate_hash(prefix: str, value: str) -> None:
    """Check that *value* is a non-empty ``sha256:``-prefixed string.

    Only the prefix and the presence of at least one character after it are
    checked; the remainder is not inspected.

    Args:
        prefix: Field name used at the start of the error message.
        value: Digest string to check.

    Raises:
        ValueError: *value* is not a string, lacks the ``sha256:`` prefix, or
            has nothing after the prefix.
    """
    if (
        not isinstance(value, str)
        or not value.startswith(_HASH_PREFIX)
        or len(value) <= len(_HASH_PREFIX)
    ):
        raise ValueError(f"{prefix} must be sha256:: got '{value}'")


def build_backfill_entries(
    inventory: Iterable[InventoryRecord],
    subject_map: Dict[str, str],
) -> List[dict]:
    """Resolve each inventory record against *subject_map*.

    For every record the Rekor entry is taken from *subject_map* when present
    (truthy), otherwise from the record itself. ``status`` is ``"resolved"``
    only when the map supplied the entry; a record that falls back to its own
    ``rekor_entry`` is still reported as ``"missing_rekor_entry"``.

    Returns:
        A list of dicts with keys ``subject``, ``dsseHash``, ``rekorEntry``
        and ``status``, sorted by ``(subject, rekorEntry, dsseHash)``.

    Raises:
        ValueError: a DSSE hash, or a non-empty Rekor entry, fails
            :func:`validate_hash`.
    """
    entries: List[dict] = []
    for rec in inventory:
        validate_hash("dsseHash", rec.dsse_hash)
        resolved_rekor = subject_map.get(rec.subject)
        status = "resolved" if resolved_rekor else "missing_rekor_entry"
        rekor_entry = resolved_rekor or rec.rekor_entry
        # An empty/absent Rekor entry is allowed; only validate real values.
        if rekor_entry:
            validate_hash("rekorEntry", rekor_entry)
        entries.append(
            {
                "subject": rec.subject,
                "dsseHash": rec.dsse_hash,
                "rekorEntry": rekor_entry,
                "status": status,
            }
        )
    # dsseHash is the final tiebreak so the order is total: without it, rows
    # sharing subject and Rekor entry would keep their inventory order and the
    # output would depend on how the input happened to be ordered.
    entries.sort(key=lambda o: (o["subject"], o["rekorEntry"] or "", o["dsseHash"]))
    return entries


def write_ndjson(path: Path, entries: Iterable[dict]) -> None:
    """Write *entries* to *path* as compact, key-sorted NDJSON.

    Parent directories are created as needed and an existing file is
    overwritten. Each entry is one line terminated by a single LF.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="\n" disables platform newline translation so the bytes written
    # are identical on every OS (no CRLF on Windows).
    with path.open("w", encoding="utf-8", newline="\n") as f:
        for entry in entries:
            f.write(json.dumps(entry, separators=(",", ":"), sort_keys=True))
            f.write("\n")


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command-line arguments (``sys.argv[1:]`` when *argv* is None)."""
    parser = argparse.ArgumentParser(description="Deterministic provenance backfill helper.")
    parser.add_argument(
        "--inventory", required=True, type=Path, help="Path to attestation inventory NDJSON."
    )
    parser.add_argument(
        "--subject-map", required=True, type=Path, help="Path to subject→Rekor JSON map."
    )
    parser.add_argument("--out", required=True, type=Path, help="Output NDJSON log path.")
    return parser.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> int:
    """Run the backfill: load inputs, resolve entries, write the log.

    Prints a one-line summary to stdout and returns 0. Errors from loading or
    validation propagate to the caller.
    """
    args = parse_args(argv)
    inventory = load_inventory(args.inventory)
    subject_map = load_subject_map(args.subject_map)
    entries = build_backfill_entries(inventory, subject_map)
    write_ndjson(args.out, entries)

    resolved = sum(1 for e in entries if e["status"] == "resolved")
    missing = sum(1 for e in entries if e["status"] != "resolved")
    print(f"wrote {len(entries)} entries -> {args.out} (resolved={resolved}, missing={missing})")
    return 0


if __name__ == "__main__":
    sys.exit(main())