This commit is contained in:
98
scripts/mirror/verify_thin_bundle.py
Normal file
98
scripts/mirror/verify_thin_bundle.py
Normal file
@@ -0,0 +1,98 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simple verifier for mirror-thin-v1 artefacts.
|
||||
Checks:
|
||||
1) SHA256 of manifest and tarball matches provided .sha256 files.
|
||||
2) Manifest schema has required fields.
|
||||
3) Tarball contains manifest.json, layers/, indexes/ with deterministic tar headers (mtime=0, uid/gid=0, sorted paths).
|
||||
4) Tar content digests match manifest entries.
|
||||
|
||||
Usage:
|
||||
python scripts/mirror/verify_thin_bundle.py out/mirror/thin/mirror-thin-v1.manifest.json out/mirror/thin/mirror-thin-v1.tar.gz
|
||||
|
||||
Exit code 0 on success; non-zero on any check failure.
|
||||
"""
|
||||
import json, tarfile, hashlib, sys, pathlib
|
||||
|
||||
REQUIRED_FIELDS = ["version", "created", "layers", "indexes"]
|
||||
|
||||
def sha256_file(path: pathlib.Path) -> str:
|
||||
h = hashlib.sha256()
|
||||
with path.open("rb") as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
h.update(chunk)
|
||||
return h.hexdigest()
|
||||
|
||||
def load_sha256_sidecar(path: pathlib.Path) -> str:
|
||||
sidecar = path.with_suffix(path.suffix + ".sha256")
|
||||
if not sidecar.exists():
|
||||
raise SystemExit(f"missing sidecar {sidecar}")
|
||||
return sidecar.read_text().strip().split()[0]
|
||||
|
||||
def check_schema(manifest: dict):
|
||||
missing = [f for f in REQUIRED_FIELDS if f not in manifest]
|
||||
if missing:
|
||||
raise SystemExit(f"manifest missing fields: {missing}")
|
||||
|
||||
def normalize(name: str) -> str:
|
||||
return name[2:] if name.startswith("./") else name
|
||||
|
||||
def check_tar_determinism(tar_path: pathlib.Path):
|
||||
with tarfile.open(tar_path, "r:gz") as tf:
|
||||
names = [normalize(n) for n in tf.getnames()]
|
||||
if names != sorted(names):
|
||||
raise SystemExit("tar entries not sorted")
|
||||
for m in tf.getmembers():
|
||||
if m.uid != 0 or m.gid != 0:
|
||||
raise SystemExit(f"tar header uid/gid not zero for {m.name}")
|
||||
if m.mtime != 0:
|
||||
raise SystemExit(f"tar header mtime not zero for {m.name}")
|
||||
|
||||
def check_content_hashes(manifest: dict, tar_path: pathlib.Path):
|
||||
with tarfile.open(tar_path, "r:gz") as tf:
|
||||
def get(name: str):
|
||||
try:
|
||||
return tf.getmember(name)
|
||||
except KeyError:
|
||||
# retry with leading ./
|
||||
return tf.getmember(f"./{name}")
|
||||
for layer in manifest.get("layers", []):
|
||||
name = layer["path"]
|
||||
info = get(name)
|
||||
data = tf.extractfile(info).read()
|
||||
digest = hashlib.sha256(data).hexdigest()
|
||||
if layer["digest"] != f"sha256:{digest}":
|
||||
raise SystemExit(f"layer digest mismatch {name}: {digest}")
|
||||
for idx in manifest.get("indexes", []):
|
||||
name = idx['name']
|
||||
if not name.startswith("indexes/"):
|
||||
name = f"indexes/{name}"
|
||||
info = get(name)
|
||||
data = tf.extractfile(info).read()
|
||||
digest = hashlib.sha256(data).hexdigest()
|
||||
if idx["digest"] != f"sha256:{digest}":
|
||||
raise SystemExit(f"index digest mismatch {name}: {digest}")
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) != 3:
|
||||
print(__doc__)
|
||||
sys.exit(2)
|
||||
manifest_path = pathlib.Path(sys.argv[1])
|
||||
tar_path = pathlib.Path(sys.argv[2])
|
||||
|
||||
man_expected = load_sha256_sidecar(manifest_path)
|
||||
tar_expected = load_sha256_sidecar(tar_path)
|
||||
if sha256_file(manifest_path) != man_expected:
|
||||
raise SystemExit("manifest sha256 mismatch")
|
||||
if sha256_file(tar_path) != tar_expected:
|
||||
raise SystemExit("tarball sha256 mismatch")
|
||||
|
||||
manifest = json.loads(manifest_path.read_text())
|
||||
check_schema(manifest)
|
||||
check_tar_determinism(tar_path)
|
||||
check_content_hashes(manifest, tar_path)
|
||||
print("OK: mirror-thin bundle verified")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user