#!/usr/bin/env python3
"""
Deterministic provenance backfill helper for Sprint 401.

Reads the attestation inventory NDJSON and subject→Rekor map, emits a sorted
NDJSON log of resolved backfill actions. No network calls are performed.

Usage:
    python scripts/provenance_backfill.py \
        --inventory docs/provenance/attestation-inventory-2025-11-18.ndjson \
        --subject-map docs/provenance/subject-rekor-map-2025-11-18.json \
        --out logs/provenance-backfill-2025-11-18.ndjson
"""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class InventoryRecord:
    """One attestation-inventory row: a subject plus its DSSE/Rekor identifiers."""

    subject: str      # artifact subject identifier (key into the subject map)
    dsse_hash: str    # "sha256:<hex>" digest of the DSSE envelope
    rekor_entry: str  # "sha256:<hex>" Rekor entry id, or "" when not yet known

    @classmethod
    def from_json(cls, obj: dict) -> "InventoryRecord":
        """Build a record from one parsed NDJSON object.

        "subject" and "dsseHash" are mandatory (KeyError if absent);
        "rekorEntry" is optional and defaults to "".
        """
        # classmethod (not staticmethod) so subclasses construct themselves.
        return cls(
            subject=obj["subject"],
            dsse_hash=obj["dsseHash"],
            rekor_entry=obj.get("rekorEntry", ""),
        )
def load_inventory(path: Path) -> List[InventoryRecord]:
    """Parse the attestation-inventory NDJSON file at *path* into records.

    Blank (or whitespace-only) lines are skipped; every other line must be a
    JSON object understood by ``InventoryRecord.from_json``.
    """
    with path.open("r", encoding="utf-8") as handle:
        stripped = (raw.strip() for raw in handle)
        return [
            InventoryRecord.from_json(json.loads(text))
            for text in stripped
            if text
        ]
def load_subject_map(path: Path) -> Dict[str, str]:
    """Load the subject -> Rekor-entry JSON map stored at *path*."""
    text = path.read_text(encoding="utf-8")
    return json.loads(text)
def validate_hash(prefix: str, value: str) -> None:
    """Raise ValueError unless *value* has the shape "sha256:<digest>".

    NOTE(review): only the "sha256:" prefix and a non-empty remainder are
    checked; the digest characters are not verified to be hex.
    """
    marker = "sha256:"
    if value.startswith(marker) and len(value) > len(marker):
        return
    raise ValueError(f"{prefix} must be sha256:<hex>: got '{value}'")
def build_backfill_entries(
    inventory: Iterable[InventoryRecord],
    subject_map: Dict[str, str],
) -> List[dict]:
    """Resolve every inventory record against the subject map.

    The map is authoritative: a map hit yields status "resolved"; otherwise
    the record's own rekorEntry (possibly "") is carried through and the
    status is "missing_rekor_entry". The result is sorted by
    (subject, rekorEntry) for deterministic output.
    """
    results: List[dict] = []
    for record in inventory:
        validate_hash("dsseHash", record.dsse_hash)
        mapped = subject_map.get(record.subject)
        # Fall back to the inventory's own entry when the map has no match.
        entry_id = mapped if mapped else record.rekor_entry
        if entry_id:
            validate_hash("rekorEntry", entry_id)
        results.append(
            {
                "subject": record.subject,
                "dsseHash": record.dsse_hash,
                "rekorEntry": entry_id,
                "status": "resolved" if mapped else "missing_rekor_entry",
            }
        )
    return sorted(results, key=lambda item: (item["subject"], item["rekorEntry"] or ""))
def write_ndjson(path: Path, entries: Iterable[dict]) -> None:
    """Write *entries* to *path* as compact, key-sorted NDJSON, one per line.

    Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = (
        json.dumps(entry, separators=(",", ":"), sort_keys=True) + "\n"
        for entry in entries
    )
    with path.open("w", encoding="utf-8") as sink:
        sink.writelines(serialized)
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse CLI arguments (falls back to sys.argv when *argv* is None)."""
    parser = argparse.ArgumentParser(description="Deterministic provenance backfill helper.")
    for flag, helptext in (
        ("--inventory", "Path to attestation inventory NDJSON."),
        ("--subject-map", "Path to subject→Rekor JSON map."),
        ("--out", "Output NDJSON log path."),
    ):
        parser.add_argument(flag, required=True, type=Path, help=helptext)
    return parser.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
    """Run the backfill pipeline end to end; returns the process exit code."""
    args = parse_args(argv)
    entries = build_backfill_entries(
        load_inventory(args.inventory),
        load_subject_map(args.subject_map),
    )
    write_ndjson(args.out, entries)

    statuses = [entry["status"] for entry in entries]
    resolved = statuses.count("resolved")
    missing = len(statuses) - resolved
    print(f"wrote {len(entries)} entries -> {args.out} (resolved={resolved}, missing={missing})")
    return 0
if __name__ == "__main__":
    # SystemExit(main()) is exactly what sys.exit(main()) raises.
    raise SystemExit(main())