Refactor code structure for improved readability and maintainability; removed redundant code blocks and optimized function calls.
This commit is contained in:
115
scripts/provenance_backfill.py
Normal file
115
scripts/provenance_backfill.py
Normal file
@@ -0,0 +1,115 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Deterministic provenance backfill helper for Sprint 401.
|
||||
|
||||
Reads the attestation inventory NDJSON and subject→Rekor map, emits a sorted
|
||||
NDJSON log of resolved backfill actions. No network calls are performed.
|
||||
|
||||
Usage:
|
||||
python scripts/provenance_backfill.py \
|
||||
--inventory docs/provenance/attestation-inventory-2025-11-18.ndjson \
|
||||
--subject-map docs/provenance/subject-rekor-map-2025-11-18.json \
|
||||
--out logs/provenance-backfill-2025-11-18.ndjson
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class InventoryRecord:
    """Immutable row of the attestation inventory.

    Holds the subject identifier, its DSSE hash, and the Rekor entry
    recorded in the inventory (empty string when the key was absent).
    """

    subject: str
    dsse_hash: str
    rekor_entry: str

    @staticmethod
    def from_json(obj: dict) -> "InventoryRecord":
        """Build a record from one decoded inventory JSON object.

        ``subject`` and ``dsseHash`` are mandatory and raise ``KeyError``
        when missing; ``rekorEntry`` defaults to ``""`` when the key is
        not present.
        """
        # Read the required keys first, in the same order as the fields.
        subject = obj["subject"]
        dsse_hash = obj["dsseHash"]
        rekor_entry = obj.get("rekorEntry", "")
        return InventoryRecord(subject, dsse_hash, rekor_entry)
|
||||
|
||||
|
||||
def load_inventory(path: Path) -> List[InventoryRecord]:
    """Read the attestation inventory NDJSON file into records.

    Each line is stripped; lines that are empty after stripping are
    skipped. Every remaining line is decoded as JSON and converted with
    ``InventoryRecord.from_json``.

    Args:
        path: Location of the NDJSON inventory file (read as UTF-8).

    Returns:
        The records in file order.

    Raises:
        json.JSONDecodeError: A line is not valid JSON. The message is
            prefixed with ``<path>:<line number>`` so the offending row
            can be found; without it every error would report "line 1"
            because each row is parsed on its own.
        KeyError: A row lacks a key required by ``InventoryRecord.from_json``.
    """
    records: List[InventoryRecord] = []
    with path.open("r", encoding="utf-8") as f:
        for lineno, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as err:
                # Keep the exception type, but point at the real file line.
                raise json.JSONDecodeError(
                    f"{path}:{lineno}: {err.msg}", err.doc, err.pos
                ) from err
            records.append(InventoryRecord.from_json(obj))
    return records
|
||||
|
||||
|
||||
def load_subject_map(path: Path) -> Dict[str, str]:
    """Load the subject-to-Rekor mapping from a JSON file.

    Args:
        path: Location of the JSON file (read as UTF-8).

    Returns:
        The decoded top-level JSON object.

    Raises:
        json.JSONDecodeError: The file is not valid JSON.
        ValueError: The top-level JSON value is not an object. Rejecting
            it here avoids a confusing ``AttributeError`` later when the
            map is queried with ``.get``.
    """
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        raise ValueError(
            f"{path}: subject map must be a JSON object, got {type(data).__name__}"
        )
    return data
|
||||
|
||||
|
||||
def validate_hash(prefix: str, value: str) -> None:
    """Check that ``value`` looks like a ``sha256:``-prefixed digest.

    The check requires a string that starts with ``sha256:`` and has at
    least one character after the prefix. The characters after the prefix
    are not inspected (they are not verified to be hexadecimal).

    Args:
        prefix: Field name used in the error message (e.g. ``"dsseHash"``).
        value: The candidate digest string.

    Raises:
        ValueError: ``value`` is not a string, lacks the ``sha256:``
            prefix, or has nothing after it.
    """
    scheme = "sha256:"
    # Guard the type first: a JSON null or number would otherwise surface
    # as AttributeError from .startswith instead of a validation error.
    if (
        not isinstance(value, str)
        or not value.startswith(scheme)
        or len(value) <= len(scheme)
    ):
        raise ValueError(f"{prefix} must be sha256:<hex>: got '{value}'")
|
||||
|
||||
|
||||
def build_backfill_entries(
    inventory: Iterable[InventoryRecord],
    subject_map: Dict[str, str],
) -> List[dict]:
    """Resolve each inventory record against the subject map.

    For every record the DSSE hash is validated, then the Rekor entry is
    taken from ``subject_map`` when a truthy value exists for the subject
    (status ``"resolved"``); otherwise the record's own ``rekor_entry`` is
    used and the status is ``"missing_rekor_entry"``. A non-empty Rekor
    entry is validated with ``validate_hash`` as well.

    Returns:
        A new list of dicts with keys ``subject``, ``dsseHash``,
        ``rekorEntry`` and ``status``, sorted by subject and then by
        Rekor entry (a falsy entry sorts as an empty string).

    Raises:
        ValueError: Propagated from ``validate_hash`` on a bad digest.
    """
    rows: List[dict] = []
    for record in inventory:
        validate_hash("dsseHash", record.dsse_hash)

        mapped = subject_map.get(record.subject)
        if mapped:
            status, chosen = "resolved", mapped
        else:
            # Fall back to whatever the inventory itself recorded.
            status, chosen = "missing_rekor_entry", record.rekor_entry

        if chosen:
            validate_hash("rekorEntry", chosen)

        rows.append(
            {
                "subject": record.subject,
                "dsseHash": record.dsse_hash,
                "rekorEntry": chosen,
                "status": status,
            }
        )

    return sorted(rows, key=lambda row: (row["subject"], row["rekorEntry"] or ""))
|
||||
|
||||
|
||||
def write_ndjson(path: Path, entries: Iterable[dict]) -> None:
    """Write ``entries`` to ``path`` as compact, key-sorted NDJSON.

    Parent directories are created when missing. Each entry is serialized
    on its own line with sorted keys and no extra whitespace, and the file
    is overwritten if it already exists.

    Args:
        path: Destination file (written as UTF-8).
        entries: JSON-serializable dicts, written in iteration order.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="\n" disables platform newline translation so the output
    # bytes are identical on every OS (text mode would emit CRLF on Windows).
    with path.open("w", encoding="utf-8", newline="\n") as f:
        f.writelines(
            json.dumps(entry, separators=(",", ":"), sort_keys=True) + "\n"
            for entry in entries
        )
|
||||
|
||||
|
||||
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line options for the backfill helper.

    Three required options are defined, each converted to ``Path``:
    ``--inventory``, ``--subject-map`` and ``--out``.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        The populated namespace (``inventory``, ``subject_map``, ``out``).
    """
    parser = argparse.ArgumentParser(description="Deterministic provenance backfill helper.")
    # All options share the same shape, so register them from a table.
    path_options = (
        ("--inventory", "Path to attestation inventory NDJSON."),
        ("--subject-map", "Path to subject→Rekor JSON map."),
        ("--out", "Output NDJSON log path."),
    )
    for flag, help_text in path_options:
        parser.add_argument(flag, required=True, type=Path, help=help_text)
    return parser.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: Optional[List[str]] = None) -> int:
    """Run the backfill: load inputs, resolve entries, write the log.

    Args:
        argv: Command-line arguments; ``None`` defers to ``sys.argv``.

    Returns:
        ``0`` once the output file has been written and the summary line
        printed.
    """
    args = parse_args(argv)

    # Inventory is loaded before the subject map, then both are resolved.
    entries = build_backfill_entries(
        load_inventory(args.inventory),
        load_subject_map(args.subject_map),
    )
    write_ndjson(args.out, entries)

    resolved = len([entry for entry in entries if entry["status"] == "resolved"])
    # Every entry that is not "resolved" counts as missing.
    missing = len(entries) - resolved
    print(f"wrote {len(entries)} entries -> {args.out} (resolved={resolved}, missing={missing})")
    return 0
|
||||
|
||||
|
||||
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user