Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled
- Introduced DigestUpsertRequest for handling digest upsert requests with properties like ChannelId, Recipient, DigestKey, Events, and CollectUntil. - Created LockEntity to represent a lightweight distributed lock entry with properties such as Id, TenantId, Resource, Owner, ExpiresAt, and CreatedAt. feat: Implement ILockRepository interface and LockRepository class - Defined ILockRepository interface with methods for acquiring and releasing locks. - Implemented LockRepository class with methods to try acquiring a lock and releasing it, using SQL for upsert operations. feat: Add SurfaceManifestPointer record for manifest pointers - Introduced SurfaceManifestPointer to represent a minimal pointer to a Surface.FS manifest associated with an image digest. feat: Create PolicySimulationInputLock and related validation logic - Added PolicySimulationInputLock record to describe policy simulation inputs and expected digests. - Implemented validation logic for policy simulation inputs, including checks for digest drift and shadow mode requirements. test: Add unit tests for ReplayVerificationService and ReplayVerifier - Created ReplayVerificationServiceTests to validate the behavior of the ReplayVerificationService under various scenarios. - Developed ReplayVerifierTests to ensure the correctness of the ReplayVerifier logic. test: Implement PolicySimulationInputLockValidatorTests - Added tests for PolicySimulationInputLockValidator to verify the validation logic against expected inputs and conditions. chore: Add cosign key example and signing scripts - Included a placeholder cosign key example for development purposes. - Added a script for signing Signals artifacts using cosign with support for both v2 and v3. chore: Create script for uploading evidence to the evidence locker - Developed a script to upload evidence to the evidence locker, ensuring required environment variables are set.
294 lines
12 KiB
Python
294 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Verifier for mirror-thin-v1 artefacts and bundle meta.
|
|
|
|
Checks:
|
|
1) SHA256 of manifest/tarball (and optional bundle meta) matches sidecars.
|
|
2) Manifest schema contains required fields and required layer files exist.
|
|
3) Tarball headers deterministic (sorted paths, uid/gid=0, mtime=0).
|
|
4) Tar contents match manifest digests.
|
|
5) Optional: verify DSSE signatures for manifest/bundle when a public key is provided.
|
|
6) Optional: validate bundle meta (tenant/env scope, policy hashes, gap coverage counts).
|
|
|
|
Usage:
|
|
python scripts/mirror/verify_thin_bundle.py \
|
|
out/mirror/thin/mirror-thin-v1.manifest.json \
|
|
out/mirror/thin/mirror-thin-v1.tar.gz \
|
|
--bundle-meta out/mirror/thin/mirror-thin-v1.bundle.json \
|
|
--pubkey out/mirror/thin/tuf/keys/ci-ed25519.pub \
|
|
--tenant tenant-demo --environment lab
|
|
|
|
Exit code 0 on success; non-zero on any check failure.
|
|
"""
|
|
import argparse
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import pathlib
|
|
import sys
|
|
import tarfile
|
|
from typing import Optional
|
|
|
|
try:
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
|
|
|
|
CRYPTO_AVAILABLE = True
|
|
except ImportError: # pragma: no cover - surfaced as runtime guidance
|
|
CRYPTO_AVAILABLE = False
|
|
|
|
REQUIRED_FIELDS = ["version", "created", "layers", "indexes"]
|
|
REQUIRED_LAYER_FILES = {
|
|
"layers/observations.ndjson",
|
|
"layers/time-anchor.json",
|
|
"layers/transport-plan.json",
|
|
"layers/rekor-policy.json",
|
|
"layers/mirror-policy.json",
|
|
"layers/offline-kit-policy.json",
|
|
"layers/artifact-hashes.json",
|
|
"indexes/observations.index",
|
|
}
|
|
|
|
|
|
def _b64url_decode(data: str) -> bytes:
|
|
padding = "=" * (-len(data) % 4)
|
|
return base64.urlsafe_b64decode(data + padding)
|
|
|
|
|
|
def sha256_file(path: pathlib.Path) -> str:
|
|
h = hashlib.sha256()
|
|
with path.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(8192), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|
|
|
|
|
|
def load_sha256_sidecar(path: pathlib.Path) -> str:
|
|
sidecar = path.with_suffix(path.suffix + ".sha256")
|
|
if not sidecar.exists():
|
|
raise SystemExit(f"missing sidecar {sidecar}")
|
|
return sidecar.read_text().strip().split()[0]
|
|
|
|
|
|
def check_schema(manifest: dict):
|
|
missing = [f for f in REQUIRED_FIELDS if f not in manifest]
|
|
if missing:
|
|
raise SystemExit(f"manifest missing fields: {missing}")
|
|
|
|
|
|
def normalize(name: str) -> str:
|
|
return name[2:] if name.startswith("./") else name
|
|
|
|
|
|
def check_tar_determinism(tar_path: pathlib.Path):
|
|
with tarfile.open(tar_path, "r:gz") as tf:
|
|
names = [normalize(n) for n in tf.getnames()]
|
|
if names != sorted(names):
|
|
raise SystemExit("tar entries not sorted")
|
|
for m in tf.getmembers():
|
|
if m.uid != 0 or m.gid != 0:
|
|
raise SystemExit(f"tar header uid/gid not zero for {m.name}")
|
|
if m.mtime != 0:
|
|
raise SystemExit(f"tar header mtime not zero for {m.name}")
|
|
|
|
|
|
def check_required_layers(tar_path: pathlib.Path):
|
|
with tarfile.open(tar_path, "r:gz") as tf:
|
|
names = {normalize(n) for n in tf.getnames()}
|
|
for required in REQUIRED_LAYER_FILES:
|
|
if required not in names:
|
|
raise SystemExit(f"required file missing from bundle: {required}")
|
|
|
|
|
|
def check_content_hashes(manifest: dict, tar_path: pathlib.Path):
|
|
with tarfile.open(tar_path, "r:gz") as tf:
|
|
def get(name: str):
|
|
try:
|
|
return tf.getmember(name)
|
|
except KeyError:
|
|
return tf.getmember(f"./{name}")
|
|
for layer in manifest.get("layers", []):
|
|
name = layer["path"]
|
|
info = get(name)
|
|
data = tf.extractfile(info).read()
|
|
digest = hashlib.sha256(data).hexdigest()
|
|
if layer["digest"] != f"sha256:{digest}":
|
|
raise SystemExit(f"layer digest mismatch {name}: {digest}")
|
|
for idx in manifest.get("indexes", []):
|
|
name = idx['name']
|
|
if not name.startswith("indexes/"):
|
|
name = f"indexes/{name}"
|
|
info = get(name)
|
|
data = tf.extractfile(info).read()
|
|
digest = hashlib.sha256(data).hexdigest()
|
|
if idx["digest"] != f"sha256:{digest}":
|
|
raise SystemExit(f"index digest mismatch {name}: {digest}")
|
|
|
|
|
|
def read_tar_entry(tar_path: pathlib.Path, name: str) -> bytes:
|
|
with tarfile.open(tar_path, "r:gz") as tf:
|
|
try:
|
|
info = tf.getmember(name)
|
|
except KeyError:
|
|
info = tf.getmember(f"./{name}")
|
|
data = tf.extractfile(info).read()
|
|
return data
|
|
|
|
|
|
def load_pubkey(path: pathlib.Path) -> Ed25519PublicKey:
|
|
if not CRYPTO_AVAILABLE:
|
|
raise SystemExit("cryptography is required for DSSE verification; install before using --pubkey")
|
|
return serialization.load_pem_public_key(path.read_bytes())
|
|
|
|
|
|
def verify_dsse(dsse_path: pathlib.Path, pubkey_path: pathlib.Path, expected_payload: pathlib.Path, expected_type: str):
|
|
dsse_obj = json.loads(dsse_path.read_text())
|
|
if dsse_obj.get("payloadType") != expected_type:
|
|
raise SystemExit(f"DSSE payloadType mismatch for {dsse_path}")
|
|
payload = _b64url_decode(dsse_obj.get("payload", ""))
|
|
if payload != expected_payload.read_bytes():
|
|
raise SystemExit(f"DSSE payload mismatch for {dsse_path}")
|
|
sigs = dsse_obj.get("signatures") or []
|
|
if not sigs:
|
|
raise SystemExit(f"DSSE missing signatures: {dsse_path}")
|
|
pub = load_pubkey(pubkey_path)
|
|
try:
|
|
pub.verify(_b64url_decode(sigs[0]["sig"]), payload)
|
|
except Exception as exc: # pragma: no cover - cryptography raises InvalidSignature
|
|
raise SystemExit(f"DSSE signature verification failed for {dsse_path}: {exc}")
|
|
|
|
|
|
def check_bundle_meta(meta_path: pathlib.Path, manifest_path: pathlib.Path, tar_path: pathlib.Path, tenant: Optional[str], environment: Optional[str]):
|
|
meta = json.loads(meta_path.read_text())
|
|
for field in ["bundle", "version", "artifacts", "gaps", "tooling"]:
|
|
if field not in meta:
|
|
raise SystemExit(f"bundle meta missing field {field}")
|
|
if tenant and meta.get("tenant") != tenant:
|
|
raise SystemExit(f"bundle tenant mismatch: {meta.get('tenant')} != {tenant}")
|
|
if environment and meta.get("environment") != environment:
|
|
raise SystemExit(f"bundle environment mismatch: {meta.get('environment')} != {environment}")
|
|
|
|
artifacts = meta["artifacts"]
|
|
|
|
def expect(name: str, path: pathlib.Path):
|
|
recorded = artifacts.get(name)
|
|
if not recorded:
|
|
raise SystemExit(f"bundle meta missing artifact entry: {name}")
|
|
expected = recorded.get("sha256")
|
|
if expected and expected != sha256_file(path):
|
|
raise SystemExit(f"bundle meta digest mismatch for {name}")
|
|
|
|
expect("manifest", manifest_path)
|
|
expect("tarball", tar_path)
|
|
# DSSE sidecars are optional but if present, validate hashes
|
|
dsse_manifest = artifacts.get("manifest_dsse")
|
|
if dsse_manifest and dsse_manifest.get("path"):
|
|
expect("manifest_dsse", meta_path.parent / dsse_manifest["path"])
|
|
dsse_bundle = artifacts.get("bundle_dsse")
|
|
if dsse_bundle and dsse_bundle.get("path"):
|
|
expect("bundle_dsse", meta_path.parent / dsse_bundle["path"])
|
|
dsse_anchor = artifacts.get("time_anchor_dsse")
|
|
if dsse_anchor and dsse_anchor.get("path"):
|
|
expect("time_anchor_dsse", meta_path.parent / dsse_anchor["path"])
|
|
for extra in ["time_anchor", "transport_plan", "rekor_policy", "mirror_policy", "offline_policy", "artifact_hashes"]:
|
|
rec = artifacts.get(extra)
|
|
if not rec:
|
|
raise SystemExit(f"bundle meta missing artifact entry: {extra}")
|
|
if not rec.get("path"):
|
|
raise SystemExit(f"bundle meta missing path for {extra}")
|
|
|
|
time_anchor_dsse = artifacts.get("time_anchor_dsse")
|
|
if time_anchor_dsse:
|
|
if not time_anchor_dsse.get("path"):
|
|
raise SystemExit("bundle meta missing path for time_anchor_dsse")
|
|
if not (meta_path.parent / time_anchor_dsse["path"]).exists():
|
|
raise SystemExit("time_anchor_dsse referenced but file missing")
|
|
|
|
for group, expected_count in [("ok", 10), ("rk", 10), ("ms", 10)]:
|
|
if len(meta.get("gaps", {}).get(group, [])) != expected_count:
|
|
raise SystemExit(f"bundle meta gaps.{group} expected {expected_count} entries")
|
|
|
|
root_guess = manifest_path.parents[3] if len(manifest_path.parents) > 3 else manifest_path.parents[-1]
|
|
tool_expectations = {
|
|
'make_thin_v1_sh': root_guess / 'src' / 'Mirror' / 'StellaOps.Mirror.Creator' / 'make-thin-v1.sh',
|
|
'sign_script': root_guess / 'scripts' / 'mirror' / 'sign_thin_bundle.py',
|
|
'verify_script': root_guess / 'scripts' / 'mirror' / 'verify_thin_bundle.py',
|
|
'verify_oci': root_guess / 'scripts' / 'mirror' / 'verify_oci_layout.py'
|
|
}
|
|
for key, path in tool_expectations.items():
|
|
recorded = meta['tooling'].get(key)
|
|
if not recorded:
|
|
raise SystemExit(f"tool hash missing for {key}")
|
|
actual = sha256_file(path)
|
|
if recorded != actual:
|
|
raise SystemExit(f"tool hash mismatch for {key}")
|
|
|
|
if meta.get("checkpoint_freshness_seconds", 0) <= 0:
|
|
raise SystemExit("checkpoint_freshness_seconds must be positive")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("manifest", type=pathlib.Path)
|
|
parser.add_argument("tar", type=pathlib.Path)
|
|
parser.add_argument("--bundle-meta", type=pathlib.Path)
|
|
parser.add_argument("--pubkey", type=pathlib.Path)
|
|
parser.add_argument("--tenant", type=str)
|
|
parser.add_argument("--environment", type=str)
|
|
args = parser.parse_args()
|
|
|
|
manifest_path = args.manifest
|
|
tar_path = args.tar
|
|
bundle_meta = args.bundle_meta
|
|
bundle_dsse = bundle_meta.with_suffix(".dsse.json") if bundle_meta else None
|
|
manifest_dsse = manifest_path.with_suffix(".dsse.json")
|
|
time_anchor_dsse = None
|
|
time_anchor_path = tar_path.parent / "stage-v1" / "layers" / "time-anchor.json"
|
|
|
|
man_expected = load_sha256_sidecar(manifest_path)
|
|
tar_expected = load_sha256_sidecar(tar_path)
|
|
if sha256_file(manifest_path) != man_expected:
|
|
raise SystemExit("manifest sha256 mismatch")
|
|
if sha256_file(tar_path) != tar_expected:
|
|
raise SystemExit("tarball sha256 mismatch")
|
|
|
|
manifest = json.loads(manifest_path.read_text())
|
|
check_schema(manifest)
|
|
check_tar_determinism(tar_path)
|
|
check_required_layers(tar_path)
|
|
check_content_hashes(manifest, tar_path)
|
|
|
|
if bundle_meta:
|
|
if not bundle_meta.exists():
|
|
raise SystemExit(f"bundle meta missing: {bundle_meta}")
|
|
meta_expected = load_sha256_sidecar(bundle_meta)
|
|
if sha256_file(bundle_meta) != meta_expected:
|
|
raise SystemExit("bundle meta sha256 mismatch")
|
|
check_bundle_meta(bundle_meta, manifest_path, tar_path, args.tenant, args.environment)
|
|
meta = json.loads(bundle_meta.read_text())
|
|
ta_entry = meta.get("artifacts", {}).get("time_anchor_dsse")
|
|
if ta_entry and ta_entry.get("path"):
|
|
ta_path = bundle_meta.parent / ta_entry["path"]
|
|
if sha256_file(ta_path) != ta_entry.get("sha256"):
|
|
raise SystemExit("time_anchor_dsse sha256 mismatch")
|
|
time_anchor_dsse = ta_path
|
|
|
|
if args.pubkey:
|
|
pubkey = args.pubkey
|
|
if manifest_dsse.exists():
|
|
verify_dsse(manifest_dsse, pubkey, manifest_path, "application/vnd.stellaops.mirror.manifest+json")
|
|
if bundle_dsse and bundle_dsse.exists():
|
|
verify_dsse(bundle_dsse, pubkey, bundle_meta, "application/vnd.stellaops.mirror.bundle+json")
|
|
if time_anchor_dsse and time_anchor_dsse.exists() and time_anchor_path.exists():
|
|
anchor_bytes = read_tar_entry(tar_path, "layers/time-anchor.json")
|
|
tmp_anchor = tar_path.parent / "time-anchor.verify.json"
|
|
tmp_anchor.write_bytes(anchor_bytes)
|
|
verify_dsse(time_anchor_dsse, pubkey, tmp_anchor, "application/vnd.stellaops.time-anchor+json")
|
|
tmp_anchor.unlink(missing_ok=True)
|
|
|
|
print("OK: mirror-thin bundle verified")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|