#!/usr/bin/env python3
"""
Deterministic provenance backfill helper for Sprint 401.

Reads the attestation inventory NDJSON and the subject→Rekor map, then emits a
sorted NDJSON log of resolved backfill actions. No network calls are performed.

Usage:
    python scripts/provenance_backfill.py \
        --inventory docs/provenance/attestation-inventory-2025-11-18.ndjson \
        --subject-map docs/provenance/subject-rekor-map-2025-11-18.json \
        --out logs/provenance-backfill-2025-11-18.ndjson
"""
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class InventoryRecord:
    subject: str
    dsse_hash: str
    rekor_entry: str

    @staticmethod
    def from_json(obj: dict) -> "InventoryRecord":
        return InventoryRecord(
            subject=obj["subject"],
            dsse_hash=obj["dsseHash"],
            rekor_entry=obj.get("rekorEntry", ""),
        )


def load_inventory(path: Path) -> List[InventoryRecord]:
    records: List[InventoryRecord] = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            records.append(InventoryRecord.from_json(json.loads(line)))
    return records


def load_subject_map(path: Path) -> Dict[str, str]:
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def validate_hash(prefix: str, value: str) -> None:
    if not value.startswith("sha256:") or len(value) <= len("sha256:"):
        raise ValueError(f"{prefix} must be sha256:<hex>: got '{value}'")


def build_backfill_entries(
    inventory: Iterable[InventoryRecord],
    subject_map: Dict[str, str],
) -> List[dict]:
    entries: List[dict] = []
    for rec in inventory:
        validate_hash("dsseHash", rec.dsse_hash)
        # Prefer the Rekor entry resolved from the subject map; fall back to
        # the inventory record's own entry when the subject is unmapped.
        resolved_rekor = subject_map.get(rec.subject)
        status = "resolved" if resolved_rekor else "missing_rekor_entry"
        rekor_entry = resolved_rekor or rec.rekor_entry
        if rekor_entry:
            validate_hash("rekorEntry", rekor_entry)
        entries.append(
            {
                "subject": rec.subject,
                "dsseHash": rec.dsse_hash,
                "rekorEntry": rekor_entry,
                "status": status,
            }
        )
    # Deterministic output order: by subject, then by Rekor entry.
    entries.sort(key=lambda o: (o["subject"], o["rekorEntry"] or ""))
    return entries


def write_ndjson(path: Path, entries: Iterable[dict]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        for entry in entries:
            f.write(json.dumps(entry, separators=(",", ":"), sort_keys=True))
            f.write("\n")


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Deterministic provenance backfill helper.")
    parser.add_argument("--inventory", required=True, type=Path, help="Path to attestation inventory NDJSON.")
    parser.add_argument("--subject-map", required=True, type=Path, help="Path to subject→Rekor JSON map.")
    parser.add_argument("--out", required=True, type=Path, help="Output NDJSON log path.")
    return parser.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> int:
    args = parse_args(argv)
    inventory = load_inventory(args.inventory)
    subject_map = load_subject_map(args.subject_map)
    entries = build_backfill_entries(inventory, subject_map)
    write_ndjson(args.out, entries)
    resolved = sum(1 for e in entries if e["status"] == "resolved")
    missing = sum(1 for e in entries if e["status"] != "resolved")
    print(f"wrote {len(entries)} entries -> {args.out} (resolved={resolved}, missing={missing})")
    return 0


if __name__ == "__main__":
    sys.exit(main())
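
# Example run, assuming the input paths from the usage block above exist; the
# counts in the final line are hypothetical placeholders, not real results:
#
#   python scripts/provenance_backfill.py \
#       --inventory docs/provenance/attestation-inventory-2025-11-18.ndjson \
#       --subject-map docs/provenance/subject-rekor-map-2025-11-18.json \
#       --out logs/provenance-backfill-2025-11-18.ndjson
#   wrote 3 entries -> logs/provenance-backfill-2025-11-18.ndjson (resolved=2, missing=1)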