Consolidate devops folders
87
deploy/tools/feeds/concelier/backfill-store-aoc-19-005.sh
Normal file
@@ -0,0 +1,87 @@
#!/usr/bin/env bash
set -euo pipefail

# Postgres backfill runner for STORE-AOC-19-005-DEV (Link-Not-Merge raw linksets/chunks)
# Usage:
#   PGURI=postgres://.../concelier ./scripts/concelier/backfill-store-aoc-19-005.sh /path/to/linksets-stage-backfill.tar.zst
# Optional:
#   PGSCHEMA=lnm_raw (default), DRY_RUN=1 to stop after extraction
#
# Assumptions:
# - Dataset contains ndjson files: linksets.ndjson, advisory_chunks.ndjson, manifest.json
# - Target staging tables are created by this script if absent:
#     <schema>.linksets_raw(id text primary key, raw jsonb)
#     <schema>.advisory_chunks_raw(id text primary key, raw jsonb)

DATASET_PATH="${1:-}"
if [[ -z "${DATASET_PATH}" || ! -f "${DATASET_PATH}" ]]; then
  echo "Dataset tarball not found. Provide path to linksets-stage-backfill.tar.zst" >&2
  exit 1
fi

PGURI="${PGURI:-${CONCELIER_PG_URI:-}}"
PGSCHEMA="${PGSCHEMA:-lnm_raw}"
DRY_RUN="${DRY_RUN:-0}"

if [[ -z "${PGURI}" ]]; then
  echo "PGURI (or CONCELIER_PG_URI) must be set" >&2
  exit 1
fi

WORKDIR="$(mktemp -d)"
cleanup() { rm -rf "${WORKDIR}"; }
trap cleanup EXIT

echo "==> Dataset: ${DATASET_PATH}"
sha256sum "${DATASET_PATH}"

echo "==> Extracting to ${WORKDIR}"
tar -xf "${DATASET_PATH}" -C "${WORKDIR}"

for required in linksets.ndjson advisory_chunks.ndjson manifest.json; do
  if [[ ! -f "${WORKDIR}/${required}" ]]; then
    echo "Missing required file in dataset: ${required}" >&2
    exit 1
  fi
done

echo "==> Ensuring staging schema/tables exist in Postgres"
psql "${PGURI}" <<SQL
create schema if not exists ${PGSCHEMA};
create table if not exists ${PGSCHEMA}.linksets_raw (
  id text primary key,
  raw jsonb not null
);
create table if not exists ${PGSCHEMA}.advisory_chunks_raw (
  id text primary key,
  raw jsonb not null
);
SQL

if [[ "${DRY_RUN}" != "0" ]]; then
  echo "DRY_RUN=1 set; extraction and schema verification completed, skipping import."
  exit 0
fi

echo "==> Importing linksets into ${PGSCHEMA}.linksets_raw"
# Emit CSV rows of (id, record re-serialized as compact JSON); CSV quoting keeps embedded quotes intact.
jq -rc '[._id, tojson] | @csv' "${WORKDIR}/linksets.ndjson" > "${WORKDIR}/linksets.csv"
psql "${PGURI}" <<SQL
TRUNCATE TABLE ${PGSCHEMA}.linksets_raw;
\copy ${PGSCHEMA}.linksets_raw (id, raw) FROM '${WORKDIR}/linksets.csv' WITH (FORMAT csv);
SQL

echo "==> Importing advisory_chunks into ${PGSCHEMA}.advisory_chunks_raw"
jq -rc '[._id, tojson] | @csv' "${WORKDIR}/advisory_chunks.ndjson" > "${WORKDIR}/advisory_chunks.csv"
psql "${PGURI}" <<SQL
TRUNCATE TABLE ${PGSCHEMA}.advisory_chunks_raw;
\copy ${PGSCHEMA}.advisory_chunks_raw (id, raw) FROM '${WORKDIR}/advisory_chunks.csv' WITH (FORMAT csv);
SQL

echo "==> Post-import counts"
psql -tA "${PGURI}" -c "select 'linksets_raw='||count(*) from ${PGSCHEMA}.linksets_raw;"
psql -tA "${PGURI}" -c "select 'advisory_chunks_raw='||count(*) from ${PGSCHEMA}.advisory_chunks_raw;"

echo "==> Manifest summary"
cat "${WORKDIR}/manifest.json"

echo "Backfill complete."
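For a staged rollout it can help to exercise the runner twice: once with DRY_RUN=1 to confirm extraction and table creation, then for the real import. A minimal sketch, assuming a placeholder connection string and the default tarball path produced by the builder below:

# Dry run: extract, checksum, and create staging tables, but skip the COPY step.
PGURI=postgres://concelier@stage-db/concelier DRY_RUN=1 \
  ./scripts/concelier/backfill-store-aoc-19-005.sh out/linksets/linksets-stage-backfill.tar.zst
# Real import: truncates and reloads <schema>.linksets_raw and <schema>.advisory_chunks_raw.
PGURI=postgres://concelier@stage-db/concelier \
  ./scripts/concelier/backfill-store-aoc-19-005.sh out/linksets/linksets-stage-backfill.tar.zst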
@@ -0,0 +1,74 @@
#!/usr/bin/env bash
set -euo pipefail

# Deterministic dataset builder for STORE-AOC-19-005-DEV.
# Generates linksets-stage-backfill.tar.zst from repo seed data.
# Usage:
#   ./scripts/concelier/build-store-aoc-19-005-dataset.sh [output_tarball]
# Default output: out/linksets/linksets-stage-backfill.tar.zst

command -v tar >/dev/null || { echo "tar is required" >&2; exit 1; }
command -v sha256sum >/dev/null || { echo "sha256sum is required" >&2; exit 1; }

TAR_COMPRESS=()
if command -v zstd >/dev/null 2>&1; then
  TAR_COMPRESS=(--zstd)
else
  echo "zstd not found; building uncompressed tarball (extension kept for compatibility)" >&2
fi

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
SEED_DIR="${ROOT_DIR}/src/__Tests/__Datasets/seed-data/concelier/store-aoc-19-005"
OUT_DIR="${ROOT_DIR}/out/linksets"
OUT_PATH="${1:-${OUT_DIR}/linksets-stage-backfill.tar.zst}"
GEN_TIME="2025-12-07T00:00:00Z"

for seed in linksets.ndjson advisory_chunks.ndjson; do
  if [[ ! -f "${SEED_DIR}/${seed}" ]]; then
    echo "Missing seed file: ${SEED_DIR}/${seed}" >&2
    exit 1
  fi
done

WORKDIR="$(mktemp -d)"
cleanup() { rm -rf "${WORKDIR}"; }
trap cleanup EXIT

cp "${SEED_DIR}/linksets.ndjson" "${WORKDIR}/linksets.ndjson"
cp "${SEED_DIR}/advisory_chunks.ndjson" "${WORKDIR}/advisory_chunks.ndjson"

linksets_sha=$(sha256sum "${WORKDIR}/linksets.ndjson" | awk '{print $1}')
advisory_sha=$(sha256sum "${WORKDIR}/advisory_chunks.ndjson" | awk '{print $1}')
linksets_count=$(wc -l < "${WORKDIR}/linksets.ndjson" | tr -d '[:space:]')
advisory_count=$(wc -l < "${WORKDIR}/advisory_chunks.ndjson" | tr -d '[:space:]')

cat >"${WORKDIR}/manifest.json" <<EOF
{
  "datasetId": "store-aoc-19-005-dev",
  "generatedAt": "${GEN_TIME}",
  "source": "src/__Tests/__Datasets/seed-data/concelier/store-aoc-19-005",
  "records": {
    "linksets": ${linksets_count},
    "advisory_chunks": ${advisory_count}
  },
  "sha256": {
    "linksets.ndjson": "${linksets_sha}",
    "advisory_chunks.ndjson": "${advisory_sha}"
  }
}
EOF

mkdir -p "${OUT_DIR}"

tar "${TAR_COMPRESS[@]}" \
  --format=ustar \
  --mtime='1970-01-01 00:00:00Z' \
  --owner=0 --group=0 --numeric-owner \
  -cf "${OUT_PATH}" \
  -C "${WORKDIR}" \
  linksets.ndjson advisory_chunks.ndjson manifest.json

sha256sum "${OUT_PATH}" > "${OUT_PATH}.sha256"

echo "Wrote ${OUT_PATH}"
cat "${OUT_PATH}.sha256"
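The fixed --mtime, ustar format, and zeroed ownership are what make the archive reproducible: rebuilding from unchanged seed data should yield byte-identical output. A quick check, sketched with hypothetical output names:

# Build the dataset twice, confirm the digests match, then validate one copy.
./scripts/concelier/build-store-aoc-19-005-dataset.sh out/linksets/a.tar.zst
./scripts/concelier/build-store-aoc-19-005-dataset.sh out/linksets/b.tar.zst
sha256sum out/linksets/a.tar.zst out/linksets/b.tar.zst
./scripts/concelier/test-store-aoc-19-005-dataset.sh out/linksets/a.tar.zst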
55
deploy/tools/feeds/concelier/export-linksets-tarball.sh
Normal file
@@ -0,0 +1,55 @@
#!/usr/bin/env bash
set -euo pipefail

# Export Concelier linksets/advisory_chunks from Postgres to a tar.zst bundle.
# Usage:
#   PGURI=postgres://user:pass@host:5432/db \
#     ./scripts/concelier/export-linksets-tarball.sh out/linksets/linksets-stage-backfill.tar.zst
#
# Optional env:
#   PGSCHEMA=public                # schema that owns linksets/advisory_chunks
#   LINKSETS_TABLE=linksets        # table name for linksets
#   CHUNKS_TABLE=advisory_chunks   # table name for advisory chunks
#   TMPDIR=/tmp/export-linksets    # working directory (defaults to mktemp)

TARGET="${1:-}"
if [[ -z "${TARGET}" ]]; then
  echo "Usage: PGURI=... $0 out/linksets/linksets-stage-backfill.tar.zst" >&2
  exit 1
fi

if [[ -z "${PGURI:-}" ]]; then
  echo "PGURI environment variable is required (postgres://...)" >&2
  exit 1
fi

PGSCHEMA="${PGSCHEMA:-public}"
LINKSETS_TABLE="${LINKSETS_TABLE:-linksets}"
CHUNKS_TABLE="${CHUNKS_TABLE:-advisory_chunks}"
WORKDIR="${TMPDIR:-$(mktemp -d)}"

mkdir -p "${WORKDIR}"
OUTDIR="$(dirname "${TARGET}")"
mkdir -p "${OUTDIR}"

echo "==> Exporting linksets from ${PGSCHEMA}.${LINKSETS_TABLE}"
# Unaligned, tuples-only output gives one JSON document per line without COPY text-format escaping.
psql -At "${PGURI}" -c "select row_to_json(t) from ${PGSCHEMA}.${LINKSETS_TABLE} t" > "${WORKDIR}/linksets.ndjson"

echo "==> Exporting advisory_chunks from ${PGSCHEMA}.${CHUNKS_TABLE}"
psql -At "${PGURI}" -c "select row_to_json(t) from ${PGSCHEMA}.${CHUNKS_TABLE} t" > "${WORKDIR}/advisory_chunks.ndjson"

LINKSETS_COUNT="$(wc -l < "${WORKDIR}/linksets.ndjson" | tr -d '[:space:]')"
CHUNKS_COUNT="$(wc -l < "${WORKDIR}/advisory_chunks.ndjson" | tr -d '[:space:]')"

echo "==> Writing manifest.json"
jq -n --argjson linksets "${LINKSETS_COUNT}" --argjson advisory_chunks "${CHUNKS_COUNT}" \
  '{linksets: $linksets, advisory_chunks: $advisory_chunks}' \
  > "${WORKDIR}/manifest.json"

echo "==> Building tarball ${TARGET}"
tar -I "zstd -19" -cf "${TARGET}" -C "${WORKDIR}" linksets.ndjson advisory_chunks.ndjson manifest.json

echo "==> SHA-256"
sha256sum "${TARGET}"

echo "Done. Workdir: ${WORKDIR}"
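A typical round trip, sketched with placeholder connection strings: export from a source database whose tables live in a non-default schema, then replay the bundle into staging with the backfill runner above. Note that this exporter's manifest only records row counts, so validate such a bundle with the backfill dry run rather than the dataset test script.

PGURI=postgres://readonly@source-db/concelier \
  PGSCHEMA=concelier LINKSETS_TABLE=linksets CHUNKS_TABLE=advisory_chunks \
  ./scripts/concelier/export-linksets-tarball.sh out/linksets/linksets-stage-backfill.tar.zst
PGURI=postgres://concelier@stage-db/concelier DRY_RUN=1 \
  ./scripts/concelier/backfill-store-aoc-19-005.sh out/linksets/linksets-stage-backfill.tar.zst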
@@ -0,0 +1,90 @@
#!/usr/bin/env bash
set -euo pipefail

# Validates the store-aoc-19-005 dataset tarball.
# Usage: ./scripts/concelier/test-store-aoc-19-005-dataset.sh [tarball]

command -v tar >/dev/null || { echo "tar is required" >&2; exit 1; }
command -v sha256sum >/dev/null || { echo "sha256sum is required" >&2; exit 1; }
command -v python >/dev/null || { echo "python is required" >&2; exit 1; }

DATASET="${1:-out/linksets/linksets-stage-backfill.tar.zst}"

if [[ ! -f "${DATASET}" ]]; then
  echo "Dataset not found: ${DATASET}" >&2
  exit 1
fi

WORKDIR="$(mktemp -d)"
cleanup() { rm -rf "${WORKDIR}"; }
trap cleanup EXIT

tar -xf "${DATASET}" -C "${WORKDIR}"

for required in linksets.ndjson advisory_chunks.ndjson manifest.json; do
  if [[ ! -f "${WORKDIR}/${required}" ]]; then
    echo "Missing ${required} in dataset" >&2
    exit 1
  fi
done

manifest="${WORKDIR}/manifest.json"
expected_linksets=$(python - <<'PY' "${manifest}"
import json, sys
with open(sys.argv[1], "r", encoding="utf-8") as f:
    data = json.load(f)
print(data["records"]["linksets"])
PY
)
expected_chunks=$(python - <<'PY' "${manifest}"
import json, sys
with open(sys.argv[1], "r", encoding="utf-8") as f:
    data = json.load(f)
print(data["records"]["advisory_chunks"])
PY
)
expected_linksets_sha=$(python - <<'PY' "${manifest}"
import json, sys
with open(sys.argv[1], "r", encoding="utf-8") as f:
    data = json.load(f)
print(data["sha256"]["linksets.ndjson"])
PY
)
expected_chunks_sha=$(python - <<'PY' "${manifest}"
import json, sys
with open(sys.argv[1], "r", encoding="utf-8") as f:
    data = json.load(f)
print(data["sha256"]["advisory_chunks.ndjson"])
PY
)

actual_linksets=$(wc -l < "${WORKDIR}/linksets.ndjson" | tr -d '[:space:]')
actual_chunks=$(wc -l < "${WORKDIR}/advisory_chunks.ndjson" | tr -d '[:space:]')
actual_linksets_sha=$(sha256sum "${WORKDIR}/linksets.ndjson" | awk '{print $1}')
actual_chunks_sha=$(sha256sum "${WORKDIR}/advisory_chunks.ndjson" | awk '{print $1}')

if [[ "${expected_linksets}" != "${actual_linksets}" ]]; then
  echo "linksets count mismatch: expected ${expected_linksets}, got ${actual_linksets}" >&2
  exit 1
fi

if [[ "${expected_chunks}" != "${actual_chunks}" ]]; then
  echo "advisory_chunks count mismatch: expected ${expected_chunks}, got ${actual_chunks}" >&2
  exit 1
fi

if [[ "${expected_linksets_sha}" != "${actual_linksets_sha}" ]]; then
  echo "linksets sha mismatch: expected ${expected_linksets_sha}, got ${actual_linksets_sha}" >&2
  exit 1
fi

if [[ "${expected_chunks_sha}" != "${actual_chunks_sha}" ]]; then
  echo "advisory_chunks sha mismatch: expected ${expected_chunks_sha}, got ${actual_chunks_sha}" >&2
  exit 1
fi

echo "Dataset validation succeeded:"
echo " linksets: ${actual_linksets}"
echo " advisory_chunks: ${actual_chunks}"
echo " linksets.sha256=${actual_linksets_sha}"
echo " advisory_chunks.sha256=${actual_chunks_sha}"
467
deploy/tools/feeds/feeds/run_icscisa_kisa_refresh.py
Normal file
@@ -0,0 +1,467 @@
#!/usr/bin/env python3
"""
ICS/KISA feed refresh runner.

Runs the SOP v0.2 workflow to emit NDJSON advisories, delta, fetch log, and hash
manifest under out/feeds/icscisa-kisa/<YYYYMMDD>/.

Defaults to live fetch with offline-safe fallback to baked-in samples. You can
force live/offline via env or CLI flags.
"""

from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import json
import os
import re
import sys
from html import unescape
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
from urllib.error import URLError, HTTPError
from urllib.parse import urlparse, urlunparse
from urllib.request import Request, urlopen
from xml.etree import ElementTree


DEFAULT_OUTPUT_ROOT = Path("out/feeds/icscisa-kisa")
DEFAULT_ICSCISA_URL = "https://www.cisa.gov/news-events/ics-advisories/icsa.xml"
DEFAULT_KISA_URL = "https://knvd.krcert.or.kr/rss/securityInfo.do"
DEFAULT_GATEWAY_HOST = "concelier-webservice"
DEFAULT_GATEWAY_SCHEME = "http"
USER_AGENT = "StellaOpsFeedRefresh/1.0 (+https://stella-ops.org)"


def utcnow() -> dt.datetime:
    return dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc)


def iso(ts: dt.datetime) -> str:
    return ts.strftime("%Y-%m-%dT%H:%M:%SZ")


def sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def strip_html(value: str) -> str:
    return re.sub(r"<[^>]+>", "", value or "").strip()


def safe_request(url: str) -> bytes:
    req = Request(url, headers={"User-Agent": USER_AGENT})
    with urlopen(req, timeout=30) as resp:
        return resp.read()


def parse_rss_items(xml_bytes: bytes) -> Iterable[Dict[str, str]]:
    root = ElementTree.fromstring(xml_bytes)
    for item in root.findall(".//item"):
        title = (item.findtext("title") or "").strip()
        link = (item.findtext("link") or "").strip()
        description = strip_html(unescape(item.findtext("description") or ""))
        pub_date = (item.findtext("pubDate") or "").strip()
        yield {
            "title": title,
            "link": link,
            "description": description,
            "pub_date": pub_date,
        }


def normalize_icscisa_record(item: Dict[str, str], fetched_at: str, run_id: str) -> Dict[str, object]:
    advisory_id = item["title"].split(":")[0].strip() or "icsa-unknown"
    summary = item["description"] or item["title"]
    raw_payload = f"{item['title']}\n{item['link']}\n{item['description']}"
    record = {
        "advisory_id": advisory_id,
        "source": "icscisa",
        "source_url": item["link"] or DEFAULT_ICSCISA_URL,
        "title": item["title"] or advisory_id,
        "summary": summary,
        "published": iso(parse_pubdate(item["pub_date"])),
        "updated": iso(parse_pubdate(item["pub_date"])),
        "severity": "unknown",
        "cvss": None,
        "cwe": [],
        "affected_products": [],
        "references": [url for url in (item["link"],) if url],
        "signature": {"status": "missing", "reason": "unsigned_source"},
        "fetched_at": fetched_at,
        "run_id": run_id,
        "payload_sha256": sha256_bytes(raw_payload.encode("utf-8")),
    }
    return record


def normalize_kisa_record(item: Dict[str, str], fetched_at: str, run_id: str) -> Dict[str, object]:
    advisory_id = extract_kisa_id(item)
    raw_payload = f"{item['title']}\n{item['link']}\n{item['description']}"
    record = {
        "advisory_id": advisory_id,
        "source": "kisa",
        "source_url": item["link"] or DEFAULT_KISA_URL,
        "title": item["title"] or advisory_id,
        "summary": item["description"] or item["title"],
        "published": iso(parse_pubdate(item["pub_date"])),
        "updated": iso(parse_pubdate(item["pub_date"])),
        "severity": "unknown",
        "cvss": None,
        "cwe": [],
        "affected_products": [],
        "references": [url for url in (item["link"], DEFAULT_KISA_URL) if url],
        "signature": {"status": "missing", "reason": "unsigned_source"},
        "fetched_at": fetched_at,
        "run_id": run_id,
        "payload_sha256": sha256_bytes(raw_payload.encode("utf-8")),
    }
    return record


def extract_kisa_id(item: Dict[str, str]) -> str:
    link = item["link"]
    match = re.search(r"IDX=([0-9]+)", link)
    if match:
        return f"KISA-{match.group(1)}"
    return (item["title"].split()[0] if item["title"] else "KISA-unknown").strip()


def parse_pubdate(value: str) -> dt.datetime:
    if not value:
        return utcnow()
    try:
        # RFC1123-ish
        return dt.datetime.strptime(value, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=dt.timezone.utc)
    except ValueError:
        try:
            return dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return utcnow()


def sample_records() -> List[Dict[str, object]]:
    now_iso = iso(utcnow())
    return [
        {
            "advisory_id": "ICSA-25-123-01",
            "source": "icscisa",
            "source_url": "https://www.cisa.gov/news-events/ics-advisories/icsa-25-123-01",
            "title": "Example ICS Advisory",
            "summary": "Example Corp ControlSuite RCE via exposed management service.",
            "published": "2025-10-13T12:00:00Z",
            "updated": "2025-11-30T00:00:00Z",
            "severity": "High",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "score": 9.8},
            "cwe": ["CWE-269"],
            "affected_products": [{"vendor": "Example Corp", "product": "ControlSuite", "versions": ["4.2.0", "4.2.1"]}],
            "references": [
                "https://example.com/security/icsa-25-123-01.pdf",
                "https://www.cisa.gov/news-events/ics-advisories/icsa-25-123-01",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"ICSA-25-123-01 Example ControlSuite advisory payload"),
        },
        {
            "advisory_id": "ICSMA-25-045-01",
            "source": "icscisa",
            "source_url": "https://www.cisa.gov/news-events/ics-medical-advisories/icsma-25-045-01",
            "title": "Example Medical Advisory",
            "summary": "HealthTech infusion pump vulnerabilities including two CVEs.",
            "published": "2025-10-14T09:30:00Z",
            "updated": "2025-12-01T00:00:00Z",
            "severity": "Medium",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:H/PR:L/UI:R/S:U/C:L/I:L/A:L", "score": 6.3},
            "cwe": ["CWE-319"],
            "affected_products": [{"vendor": "HealthTech", "product": "InfusionManager", "versions": ["2.1.0", "2.1.1"]}],
            "references": [
                "https://www.cisa.gov/news-events/ics-medical-advisories/icsma-25-045-01",
                "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2025-11111",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"ICSMA-25-045-01 Example medical advisory payload"),
        },
        {
            "advisory_id": "KISA-2025-5859",
            "source": "kisa",
            "source_url": "https://knvd.krcert.or.kr/detailDos.do?IDX=5859",
            "title": "KISA sample advisory 5859",
            "summary": "Remote code execution in ControlBoard service (offline HTML snapshot).",
            "published": "2025-11-03T22:53:00Z",
            "updated": "2025-12-02T00:00:00Z",
            "severity": "High",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "score": 9.8},
            "cwe": ["CWE-787"],
            "affected_products": [{"vendor": "ACME", "product": "ControlBoard", "versions": ["1.0.1.0084", "2.0.1.0034"]}],
            "references": [
                "https://knvd.krcert.or.kr/rss/securityInfo.do",
                "https://knvd.krcert.or.kr/detailDos.do?IDX=5859",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"KISA advisory IDX 5859 cached HTML payload"),
        },
        {
            "advisory_id": "KISA-2025-5860",
            "source": "kisa",
            "source_url": "https://knvd.krcert.or.kr/detailDos.do?IDX=5860",
            "title": "KISA sample advisory 5860",
            "summary": "Authentication bypass via default credentials in NetGateway appliance.",
            "published": "2025-11-03T22:53:00Z",
            "updated": "2025-12-02T00:00:00Z",
            "severity": "Medium",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:L", "score": 7.3},
            "cwe": ["CWE-798"],
            "affected_products": [{"vendor": "NetGateway", "product": "Edge", "versions": ["3.4.2", "3.4.3"]}],
            "references": [
                "https://knvd.krcert.or.kr/rss/securityInfo.do",
                "https://knvd.krcert.or.kr/detailDos.do?IDX=5860",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"KISA advisory IDX 5860 cached HTML payload"),
        },
    ]


def build_records(
    run_id: str,
    fetched_at: str,
    live_fetch: bool,
    offline_only: bool,
    icscisa_url: str,
    kisa_url: str,
) -> Tuple[List[Dict[str, object]], Dict[str, str]]:
    samples = sample_records()
    sample_icscisa = [r for r in samples if r["source"] == "icscisa"]
    sample_kisa = [r for r in samples if r["source"] == "kisa"]
    status = {"icscisa": "offline", "kisa": "offline"}
    records: List[Dict[str, object]] = []

    if live_fetch and not offline_only:
        try:
            icscisa_items = list(parse_rss_items(safe_request(icscisa_url)))
            for item in icscisa_items:
                records.append(normalize_icscisa_record(item, fetched_at, run_id))
            status["icscisa"] = f"live:{len(icscisa_items)}"
        except (URLError, HTTPError, ElementTree.ParseError, TimeoutError) as exc:
            print(f"[warn] ICS CISA fetch failed ({exc}); falling back to samples.", file=sys.stderr)

        try:
            kisa_items = list(parse_rss_items(safe_request(kisa_url)))
            for item in kisa_items:
                records.append(normalize_kisa_record(item, fetched_at, run_id))
            status["kisa"] = f"live:{len(kisa_items)}"
        except (URLError, HTTPError, ElementTree.ParseError, TimeoutError) as exc:
            print(f"[warn] KISA fetch failed ({exc}); falling back to samples.", file=sys.stderr)

    if not records or not status["icscisa"].startswith("live"):
        records.extend(apply_run_metadata(sample_icscisa, run_id, fetched_at))
        status["icscisa"] = status.get("icscisa") or "offline"

    if not any(r["source"] == "kisa" for r in records):
        records.extend(apply_run_metadata(sample_kisa, run_id, fetched_at))
        status["kisa"] = status.get("kisa") or "offline"

    return records, status


def apply_run_metadata(records: Iterable[Dict[str, object]], run_id: str, fetched_at: str) -> List[Dict[str, object]]:
    updated = []
    for record in records:
        copy = dict(record)
        copy["run_id"] = run_id
        copy["fetched_at"] = fetched_at
        copy["payload_sha256"] = record.get("payload_sha256") or sha256_bytes(json.dumps(record, sort_keys=True).encode("utf-8"))
        updated.append(copy)
    return updated


def find_previous_snapshot(base_dir: Path, current_run_date: str) -> Path | None:
    if not base_dir.exists():
        return None
    candidates = sorted(p for p in base_dir.iterdir() if p.is_dir() and p.name != current_run_date)
    if not candidates:
        return None
    return candidates[-1] / "advisories.ndjson"


def load_previous_hash(path: Path | None) -> str | None:
    if path and path.exists():
        return sha256_bytes(path.read_bytes())
    return None


def compute_delta(new_records: List[Dict[str, object]], previous_path: Path | None) -> Dict[str, object]:
    prev_records = {}
    if previous_path and previous_path.exists():
        with previous_path.open("r", encoding="utf-8") as handle:
            for line in handle:
                if line.strip():
                    rec = json.loads(line)
                    prev_records[rec["advisory_id"]] = rec

    new_by_id = {r["advisory_id"]: r for r in new_records}
    added = [rid for rid in new_by_id if rid not in prev_records]
    updated = [
        rid
        for rid, rec in new_by_id.items()
        if rid in prev_records and rec.get("payload_sha256") != prev_records[rid].get("payload_sha256")
    ]
    removed = [rid for rid in prev_records if rid not in new_by_id]

    return {
        "added": {"icscisa": [rid for rid in added if new_by_id[rid]["source"] == "icscisa"],
                  "kisa": [rid for rid in added if new_by_id[rid]["source"] == "kisa"]},
        "updated": {"icscisa": [rid for rid in updated if new_by_id[rid]["source"] == "icscisa"],
                    "kisa": [rid for rid in updated if new_by_id[rid]["source"] == "kisa"]},
        "removed": {"icscisa": [rid for rid in removed if prev_records[rid]["source"] == "icscisa"],
                    "kisa": [rid for rid in removed if prev_records[rid]["source"] == "kisa"]},
        "totals": {
            "icscisa": {
                "added": len([rid for rid in added if new_by_id[rid]["source"] == "icscisa"]),
                "updated": len([rid for rid in updated if new_by_id[rid]["source"] == "icscisa"]),
                "removed": len([rid for rid in removed if prev_records[rid]["source"] == "icscisa"]),
                "remaining": len([rid for rid, rec in new_by_id.items() if rec["source"] == "icscisa"]),
            },
            "kisa": {
                "added": len([rid for rid in added if new_by_id[rid]["source"] == "kisa"]),
                "updated": len([rid for rid in updated if new_by_id[rid]["source"] == "kisa"]),
                "removed": len([rid for rid in removed if prev_records[rid]["source"] == "kisa"]),
                "remaining": len([rid for rid, rec in new_by_id.items() if rec["source"] == "kisa"]),
            },
            "overall": len(new_records),
        },
    }


def write_ndjson(records: List[Dict[str, object]], path: Path) -> None:
    path.write_text("\n".join(json.dumps(r, sort_keys=True, separators=(",", ":")) for r in records) + "\n", encoding="utf-8")


def write_fetch_log(
    path: Path,
    run_id: str,
    start: str,
    end: str,
    status: Dict[str, str],
    gateway_host: str,
    gateway_scheme: str,
    icscisa_url: str,
    kisa_url: str,
    live_fetch: bool,
    offline_only: bool,
) -> None:
    lines = [
        f"run_id={run_id} start={start} end={end}",
        f"sources=icscisa,kisa cadence=weekly backlog_window=60d live_fetch={str(live_fetch).lower()} offline_only={str(offline_only).lower()}",
        f"gateway={gateway_scheme}://{gateway_host}",
        f"icscisa_url={icscisa_url} status={status.get('icscisa','offline')} retries=0",
        f"kisa_url={kisa_url} status={status.get('kisa','offline')} retries=0",
        "outputs=advisories.ndjson,delta.json,hashes.sha256",
    ]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def write_hashes(dir_path: Path) -> None:
    entries = []
    for name in ["advisories.ndjson", "delta.json", "fetch.log"]:
        file_path = dir_path / name
        entries.append(f"{sha256_bytes(file_path.read_bytes())} {name}")
    (dir_path / "hashes.sha256").write_text("\n".join(entries) + "\n", encoding="utf-8")


def main() -> None:
    parser = argparse.ArgumentParser(description="Run ICS/KISA feed refresh SOP v0.2")
    parser.add_argument("--out-dir", default=str(DEFAULT_OUTPUT_ROOT), help="Base output directory (default: out/feeds/icscisa-kisa)")
    parser.add_argument("--run-date", default=None, help="Override run date (YYYYMMDD)")
    parser.add_argument("--run-id", default=None, help="Override run id")
    parser.add_argument("--live", action="store_true", default=False, help="Force live fetch (default: enabled via env LIVE_FETCH=true)")
    parser.add_argument("--offline", action="store_true", default=False, help="Force offline samples only")
    args = parser.parse_args()

    now = utcnow()
    run_date = args.run_date or now.strftime("%Y%m%d")
    run_id = args.run_id or f"icscisa-kisa-{now.strftime('%Y%m%dT%H%M%SZ')}"
    fetched_at = iso(now)
    start = fetched_at

    live_fetch = args.live or os.getenv("LIVE_FETCH", "true").lower() == "true"
    offline_only = args.offline or os.getenv("OFFLINE_SNAPSHOT", "false").lower() == "true"

    output_root = Path(args.out_dir)
    output_dir = output_root / run_date
    output_dir.mkdir(parents=True, exist_ok=True)

    previous_path = find_previous_snapshot(output_root, run_date)

    gateway_host = os.getenv("FEED_GATEWAY_HOST", DEFAULT_GATEWAY_HOST)
    gateway_scheme = os.getenv("FEED_GATEWAY_SCHEME", DEFAULT_GATEWAY_SCHEME)

    def resolve_feed(url_env: str, default_url: str) -> str:
        if url_env:
            return url_env
        parsed = urlparse(default_url)
        # Replace host/scheme to allow on-prem DNS (docker network) defaults.
        rewritten = parsed._replace(netloc=gateway_host, scheme=gateway_scheme)
        return urlunparse(rewritten)

    resolved_icscisa_url = resolve_feed(os.getenv("ICSCISA_FEED_URL", ""), DEFAULT_ICSCISA_URL)
    resolved_kisa_url = resolve_feed(os.getenv("KISA_FEED_URL", ""), DEFAULT_KISA_URL)

    records, status = build_records(
        run_id=run_id,
        fetched_at=fetched_at,
        live_fetch=live_fetch,
        offline_only=offline_only,
        icscisa_url=resolved_icscisa_url,
        kisa_url=resolved_kisa_url,
    )

    write_ndjson(records, output_dir / "advisories.ndjson")

    delta = compute_delta(records, previous_path)
    delta_payload = {
        "run_id": run_id,
        "generated_at": iso(utcnow()),
        **delta,
        "previous_snapshot_sha256": load_previous_hash(previous_path),
    }
    (output_dir / "delta.json").write_text(json.dumps(delta_payload, separators=(",", ":")) + "\n", encoding="utf-8")

    end = iso(utcnow())
    write_fetch_log(
        output_dir / "fetch.log",
        run_id,
        start,
        end,
        status,
        gateway_host=gateway_host,
        gateway_scheme=gateway_scheme,
        icscisa_url=resolved_icscisa_url,
        kisa_url=resolved_kisa_url,
        live_fetch=live_fetch and not offline_only,
        offline_only=offline_only,
    )
    write_hashes(output_dir)

    print(f"[ok] wrote {len(records)} advisories to {output_dir}")
    print(f" run_id={run_id} live_fetch={live_fetch and not offline_only} offline_only={offline_only}")
    print(f" gateway={gateway_scheme}://{gateway_host}")
    print(f" icscisa_url={resolved_icscisa_url}")
    print(f" kisa_url={resolved_kisa_url}")
    print(f" status={status}")
    if previous_path:
        print(f" previous_snapshot={previous_path}")


if __name__ == "__main__":
    main()
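Both modes can be driven from the flags and environment variables defined above; each run writes advisories.ndjson, delta.json, fetch.log, and hashes.sha256 under <out-dir>/<YYYYMMDD>/. A sketch, where the run date and gateway host are illustrative values only:

# Air-gapped run: emit only the baked-in samples under the default output root.
python deploy/tools/feeds/feeds/run_icscisa_kisa_refresh.py --offline
# Live run routed through an on-prem gateway host instead of the public feed hosts.
FEED_GATEWAY_HOST=concelier-webservice FEED_GATEWAY_SCHEME=http LIVE_FETCH=true \
  python deploy/tools/feeds/feeds/run_icscisa_kisa_refresh.py --run-date 20251207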
2
deploy/tools/feeds/vex/requirements.txt
Normal file
@@ -0,0 +1,2 @@
blake3==0.4.1
jsonschema==4.22.0
176
deploy/tools/feeds/vex/verify_proof_bundle.py
Normal file
@@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
Offline verifier for StellaOps VEX proof bundles.

- Validates the bundle against `docs/benchmarks/vex-evidence-playbook.schema.json`.
- Checks justification IDs against the signed catalog.
- Recomputes hashes for CAS artefacts, OpenVEX payload, and DSSE envelopes.
- Enforces coverage and negative-test requirements per task VEX-GAPS-401-062.
"""

from __future__ import annotations

import argparse
import base64
import json
from pathlib import Path
import sys
from typing import Dict, Any

import jsonschema
from blake3 import blake3


def load_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8"))


def digest_for(data: bytes, algo: str) -> str:
    if algo == "sha256":
        import hashlib

        return hashlib.sha256(data).hexdigest()
    if algo == "blake3":
        return blake3(data).hexdigest()
    raise ValueError(f"Unsupported hash algorithm: {algo}")


def parse_digest(digest: str) -> tuple[str, str]:
    if ":" not in digest:
        raise ValueError(f"Digest missing prefix: {digest}")
    algo, value = digest.split(":", 1)
    return algo, value


def verify_digest(path: Path, expected: str) -> None:
    algo, value = parse_digest(expected)
    actual = digest_for(path.read_bytes(), algo)
    if actual.lower() != value.lower():
        raise ValueError(f"Digest mismatch for {path}: expected {value}, got {actual}")


def resolve_cas_uri(cas_root: Path, cas_uri: str) -> Path:
    if not cas_uri.startswith("cas://"):
        raise ValueError(f"CAS URI must start with cas:// — got {cas_uri}")
    relative = cas_uri[len("cas://") :]
    return cas_root / relative


def verify_dsse(dsse_ref: Dict[str, Any]) -> None:
    path = Path(dsse_ref["path"])
    verify_digest(path, dsse_ref["sha256"])
    if "payload_sha256" in dsse_ref:
        envelope = load_json(path)
        payload = base64.b64decode(envelope["payload"])
        verify_digest_from_bytes(payload, dsse_ref["payload_sha256"])


def verify_digest_from_bytes(data: bytes, expected: str) -> None:
    algo, value = parse_digest(expected)
    actual = digest_for(data, algo)
    if actual.lower() != value.lower():
        raise ValueError(f"Digest mismatch for payload: expected {value}, got {actual}")


def main() -> int:
    parser = argparse.ArgumentParser(description="Verify a StellaOps VEX proof bundle.")
    parser.add_argument("--bundle", required=True, type=Path)
    parser.add_argument("--schema", required=True, type=Path)
    parser.add_argument("--catalog", required=True, type=Path)
    parser.add_argument("--cas-root", required=True, type=Path)
    parser.add_argument("--min-coverage", type=float, default=95.0)
    args = parser.parse_args()

    bundle = load_json(args.bundle)
    schema = load_json(args.schema)
    catalog = load_json(args.catalog)

    jsonschema.validate(instance=bundle, schema=schema)

    justification_ids = {entry["id"] for entry in catalog.get("entries", [])}
    if bundle["justification"]["id"] not in justification_ids:
        raise ValueError(f"Justification {bundle['justification']['id']} not found in catalog")

    # Justification DSSE integrity
    if "dsse" in bundle["justification"]:
        verify_dsse(bundle["justification"]["dsse"])

    # OpenVEX canonical hashes
    openvex_path = Path(bundle["openvex"]["path"])
    openvex_bytes = openvex_path.read_bytes()
    verify_digest_from_bytes(openvex_bytes, bundle["openvex"]["canonical_sha256"])
    verify_digest_from_bytes(openvex_bytes, bundle["openvex"]["canonical_blake3"])

    # CAS evidence
    evidence_by_type: Dict[str, Dict[str, Any]] = {}
    for ev in bundle["evidence"]:
        ev_path = resolve_cas_uri(args.cas_root, ev["cas_uri"])
        verify_digest(ev_path, ev["hash"])
        if "dsse" in ev:
            verify_dsse(ev["dsse"])
        evidence_by_type.setdefault(ev["type"], ev)

    # Graph hash alignment
    graph = bundle["graph"]
    graph_evidence = evidence_by_type.get("graph")
    if not graph_evidence:
        raise ValueError("Graph evidence missing from bundle")
    if graph["hash"].lower() != graph_evidence["hash"].lower():
        raise ValueError("Graph hash does not match evidence hash")
    if "dsse" in graph:
        verify_dsse(graph["dsse"])

    # Entrypoint coverage + negative tests + config/flags hashes
    for ep in bundle["entrypoints"]:
        if ep["coverage_percent"] < args.min_coverage:
            raise ValueError(
                f"Entrypoint {ep['id']} coverage {ep['coverage_percent']} below required {args.min_coverage}"
            )
        if not ep["negative_tests"]:
            raise ValueError(f"Entrypoint {ep['id']} missing negative test confirmation")
        config_ev = evidence_by_type.get("config")
        if not config_ev or config_ev["hash"].lower() != ep["config_hash"].lower():
            raise ValueError(f"Entrypoint {ep['id']} config_hash not backed by evidence")
        flags_ev = evidence_by_type.get("flags")
        if not flags_ev or flags_ev["hash"].lower() != ep["flags_hash"].lower():
            raise ValueError(f"Entrypoint {ep['id']} flags_hash not backed by evidence")

    # RBAC enforcement
    rbac = bundle["rbac"]
    if rbac["approvals_required"] < 1 or not rbac["roles_allowed"]:
        raise ValueError("RBAC section is incomplete")

    # Reevaluation triggers: must all be true to satisfy VEX-GAPS-401-062
    reevaluation = bundle["reevaluation"]
    if not all(
        [
            reevaluation.get("on_sbom_change"),
            reevaluation.get("on_graph_change"),
            reevaluation.get("on_runtime_change"),
        ]
    ):
        raise ValueError("Reevaluation triggers must all be true")

    # Uncertainty gating present
    uncertainty = bundle["uncertainty"]
    if uncertainty["state"] not in {"U0-none", "U1-low", "U2-medium", "U3-high"}:
        raise ValueError("Invalid uncertainty state")

    # Signature envelope integrity (best-effort)
    default_dsse_path = args.bundle.with_suffix(".dsse.json")
    if default_dsse_path.exists():
        sig_envelope_digest = f"sha256:{digest_for(default_dsse_path.read_bytes(), 'sha256')}"
        for sig in bundle["signatures"]:
            if sig["envelope_digest"].lower() != sig_envelope_digest.lower():
                raise ValueError("Signature envelope digest mismatch")

    print("✔ VEX proof bundle verified")
    return 0


if __name__ == "__main__":
    try:
        sys.exit(main())
    except Exception as exc:  # pragma: no cover - top-level guard
        print(f"Verification failed: {exc}", file=sys.stderr)
        sys.exit(1)
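The verifier needs only the two pinned packages from deploy/tools/feeds/vex/requirements.txt plus the standard library. A sketch of an invocation, where the bundle, catalog, and CAS root paths are placeholders and only the schema path comes from the docstring above:

pip install -r deploy/tools/feeds/vex/requirements.txt
python deploy/tools/feeds/vex/verify_proof_bundle.py \
  --bundle out/vex/proof-bundle.json \
  --schema docs/benchmarks/vex-evidence-playbook.schema.json \
  --catalog out/vex/justification-catalog.json \
  --cas-root out/cas \
  --min-coverage 95

Any failed digest or policy check raises an exception, and the top-level guard prints "Verification failed: ..." to stderr and exits non-zero.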