Add signal contracts for reachability, exploitability, trust, and unknown symbols
- Introduced `ReachabilityState`, `RuntimeHit`, `ExploitabilitySignal`, `ReachabilitySignal`, `SignalEnvelope`, `SignalType`, `TrustSignal`, and `UnknownSymbolSignal` records to define various signal types and their properties. - Implemented JSON serialization attributes for proper data interchange. - Created project files for the new signal contracts library and corresponding test projects. - Added deterministic test fixtures for micro-interaction testing. - Included cryptographic keys for secure operations with cosign.
This commit is contained in:
143
scripts/notifications/sign-dsse.py
Normal file
143
scripts/notifications/sign-dsse.py
Normal file
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
DSSE signing utility for notification schemas and offline kit manifests.
|
||||
|
||||
Uses HMAC-SHA256 with Pre-Authentication Encoding (PAE) per DSSE spec.
|
||||
Development key: etc/secrets/dsse-dev.signing.json
|
||||
CI/Production: Use secrets.COSIGN_KEY_REF or equivalent HSM-backed key.
|
||||
|
||||
Usage:
|
||||
python scripts/notifications/sign-dsse.py <input.dsse.json> [--key <key-file>] [--output <output.dsse.json>]
|
||||
python scripts/notifications/sign-dsse.py docs/notifications/schemas/notify-schemas-catalog.dsse.json
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import struct
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def build_pae(payload_type: str, payload_bytes: bytes) -> bytes:
|
||||
"""Build Pre-Authentication Encoding per DSSE spec."""
|
||||
prefix = b"DSSEv1"
|
||||
type_bytes = payload_type.encode("utf-8") if payload_type else b""
|
||||
|
||||
# PAE format: "DSSEv1" + count(2) + len(type) + type + len(payload) + payload
|
||||
pae = (
|
||||
prefix +
|
||||
struct.pack(">Q", 2) + # count = 2 (type + payload)
|
||||
struct.pack(">Q", len(type_bytes)) +
|
||||
type_bytes +
|
||||
struct.pack(">Q", len(payload_bytes)) +
|
||||
payload_bytes
|
||||
)
|
||||
return pae
|
||||
|
||||
|
||||
def compute_hmac_signature(secret_b64: str, pae: bytes) -> str:
|
||||
"""Compute HMAC-SHA256 signature and return base64."""
|
||||
secret_bytes = base64.b64decode(secret_b64)
|
||||
signature = hmac.new(secret_bytes, pae, hashlib.sha256).digest()
|
||||
return base64.b64encode(signature).decode("utf-8")
|
||||
|
||||
|
||||
def load_key(key_path: Path) -> dict:
|
||||
"""Load signing key from JSON file."""
|
||||
with open(key_path, "r", encoding="utf-8") as f:
|
||||
key_data = json.load(f)
|
||||
|
||||
required = ["keyId", "secret", "algorithm"]
|
||||
for field in required:
|
||||
if field not in key_data:
|
||||
raise ValueError(f"Key file missing required field: {field}")
|
||||
|
||||
if key_data["algorithm"].upper() != "HMACSHA256":
|
||||
raise ValueError(f"Unsupported algorithm: {key_data['algorithm']}")
|
||||
|
||||
return key_data
|
||||
|
||||
|
||||
def sign_dsse(input_path: Path, key_data: dict, output_path: Path | None = None) -> dict:
|
||||
"""Sign a DSSE envelope file."""
|
||||
with open(input_path, "r", encoding="utf-8") as f:
|
||||
envelope = json.load(f)
|
||||
|
||||
if "payloadType" not in envelope or "payload" not in envelope:
|
||||
raise ValueError("Input file is not a valid DSSE envelope (missing payloadType or payload)")
|
||||
|
||||
payload_type = envelope["payloadType"]
|
||||
payload_b64 = envelope["payload"]
|
||||
payload_bytes = base64.b64decode(payload_b64)
|
||||
|
||||
# Build PAE and compute signature
|
||||
pae = build_pae(payload_type, payload_bytes)
|
||||
signature = compute_hmac_signature(key_data["secret"], pae)
|
||||
|
||||
# Create signature object
|
||||
sig_obj = {
|
||||
"sig": signature,
|
||||
"keyid": key_data["keyId"]
|
||||
}
|
||||
|
||||
# Add timestamp if not already present
|
||||
if "signedAt" not in sig_obj:
|
||||
sig_obj["signedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
||||
|
||||
# Update envelope with signature
|
||||
if "signatures" not in envelope or not envelope["signatures"]:
|
||||
envelope["signatures"] = []
|
||||
|
||||
# Remove any existing signature with the same keyId
|
||||
envelope["signatures"] = [s for s in envelope["signatures"] if s.get("keyid") != key_data["keyId"]]
|
||||
envelope["signatures"].append(sig_obj)
|
||||
|
||||
# Remove note field if present (was a placeholder)
|
||||
envelope.pop("note", None)
|
||||
|
||||
# Write output
|
||||
out_path = output_path or input_path
|
||||
with open(out_path, "w", encoding="utf-8") as f:
|
||||
json.dump(envelope, f, indent=2, ensure_ascii=False)
|
||||
f.write("\n")
|
||||
|
||||
return envelope
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Sign DSSE envelope files with HMAC-SHA256")
|
||||
parser.add_argument("input", type=Path, help="Input DSSE envelope file")
|
||||
parser.add_argument("--key", "-k", type=Path,
|
||||
default=Path("etc/secrets/dsse-dev.signing.json"),
|
||||
help="Signing key JSON file (default: etc/secrets/dsse-dev.signing.json)")
|
||||
parser.add_argument("--output", "-o", type=Path, help="Output file (default: overwrite input)")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.input.exists():
|
||||
print(f"Error: Input file not found: {args.input}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not args.key.exists():
|
||||
print(f"Error: Key file not found: {args.key}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
key_data = load_key(args.key)
|
||||
result = sign_dsse(args.input, key_data, args.output)
|
||||
out_path = args.output or args.input
|
||||
sig = result["signatures"][-1]
|
||||
print(f"Signed {args.input} with key {sig['keyid']}")
|
||||
print(f" Signature: {sig['sig'][:32]}...")
|
||||
print(f" Output: {out_path}")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user