Files
git.stella-ops.org/scripts/feeds/run_icscisa_kisa_refresh.py
StellaOps Bot bc0762e97d up
2025-12-09 00:20:52 +02:00

468 lines
19 KiB
Python

#!/usr/bin/env python3
"""
ICS/KISA feed refresh runner.
Runs the SOP v0.2 workflow to emit NDJSON advisories, delta, fetch log, and hash
manifest under out/feeds/icscisa-kisa/<YYYYMMDD>/.
Defaults to live fetch with offline-safe fallback to baked-in samples. You can
force live/offline via env or CLI flags.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import json
import os
import re
import sys
from html import unescape
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
from urllib.error import URLError, HTTPError
from urllib.parse import urlparse, urlunparse
from urllib.request import Request, urlopen
from xml.etree import ElementTree
# Run artifacts land under <output root>/<YYYYMMDD>/.
DEFAULT_OUTPUT_ROOT = Path("out/feeds/icscisa-kisa")
# Upstream feed endpoints; main() may rewrite host/scheme to the gateway or
# take full overrides from ICSCISA_FEED_URL / KISA_FEED_URL env vars.
DEFAULT_ICSCISA_URL = "https://www.cisa.gov/news-events/ics-advisories/icsa.xml"
DEFAULT_KISA_URL = "https://knvd.krcert.or.kr/rss/securityInfo.do"
# Default gateway host/scheme used for on-prem (docker network) DNS rewrites.
DEFAULT_GATEWAY_HOST = "concelier-webservice"
DEFAULT_GATEWAY_SCHEME = "http"
# User-Agent header sent on every outbound fetch.
USER_AGENT = "StellaOpsFeedRefresh/1.0 (+https://stella-ops.org)"
def utcnow() -> dt.datetime:
    """Return the current time as a timezone-aware UTC datetime.

    Uses ``datetime.now(timezone.utc)`` instead of ``datetime.utcnow()``,
    which is deprecated since Python 3.12 and returns a naive datetime
    that then needs a ``replace(tzinfo=...)`` patch-up.
    """
    return dt.datetime.now(dt.timezone.utc)
def iso(ts: dt.datetime) -> str:
    """Format *ts* as an ISO-8601 UTC timestamp with a literal 'Z' suffix."""
    return f"{ts:%Y-%m-%dT%H:%M:%SZ}"
def sha256_bytes(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *data*."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def strip_html(value: str) -> str:
    """Return *value* with HTML tags removed and outer whitespace trimmed.

    A falsy input (empty string or None) yields "".
    """
    text = value or ""
    without_tags = re.sub(r"<[^>]+>", "", text)
    return without_tags.strip()
def safe_request(url: str) -> bytes:
    """Fetch *url* with the project User-Agent and return the raw body.

    Uses a 30-second timeout; network errors propagate to the caller.
    """
    request = Request(url, headers={"User-Agent": USER_AGENT})
    response = urlopen(request, timeout=30)
    try:
        return response.read()
    finally:
        response.close()
def parse_rss_items(xml_bytes: bytes) -> Iterable[Dict[str, str]]:
    """Yield one field dict per <item> element of an RSS document.

    Each dict carries the stripped "title", "link", and "pub_date" text,
    plus a "description" with HTML entities decoded and tags removed.
    Missing elements become empty strings.
    """
    def field(node: ElementTree.Element, tag: str) -> str:
        return (node.findtext(tag) or "").strip()

    document = ElementTree.fromstring(xml_bytes)
    for entry in document.findall(".//item"):
        raw_description = unescape(entry.findtext("description") or "")
        yield {
            "title": field(entry, "title"),
            "link": field(entry, "link"),
            "description": re.sub(r"<[^>]+>", "", raw_description).strip(),
            "pub_date": field(entry, "pubDate"),
        }
def normalize_icscisa_record(item: Dict[str, str], fetched_at: str, run_id: str) -> Dict[str, object]:
    """Map one ICS-CISA RSS item onto the canonical advisory record shape.

    Args:
        item: dict produced by parse_rss_items (title/link/description/pub_date).
        fetched_at: ISO timestamp stamped onto the record.
        run_id: identifier of this refresh run.

    Returns:
        A flat advisory dict; unknown fields (severity/cvss/cwe/products)
        are filled with placeholder values.
    """
    title = item["title"]
    link = item["link"]
    description = item["description"]
    # Advisory id is the leading "ICSA-…" token before the first colon.
    advisory_id = title.split(":")[0].strip() or "icsa-unknown"
    # Parse the pubDate once so "published" and "updated" always agree
    # (the old code parsed twice; with an empty/unparseable pubDate the two
    # utcnow() fallbacks could straddle a second and disagree).
    published = iso(parse_pubdate(item["pub_date"]))
    raw_payload = f"{title}\n{link}\n{description}"
    return {
        "advisory_id": advisory_id,
        "source": "icscisa",
        "source_url": link or DEFAULT_ICSCISA_URL,
        "title": title or advisory_id,
        "summary": description or title,
        "published": published,
        "updated": published,
        "severity": "unknown",
        "cvss": None,
        "cwe": [],
        "affected_products": [],
        "references": [url for url in (link,) if url],
        "signature": {"status": "missing", "reason": "unsigned_source"},
        "fetched_at": fetched_at,
        "run_id": run_id,
        "payload_sha256": sha256_bytes(raw_payload.encode("utf-8")),
    }
def normalize_kisa_record(item: Dict[str, str], fetched_at: str, run_id: str) -> Dict[str, object]:
    """Map one KISA RSS item onto the canonical advisory record shape.

    Args:
        item: dict produced by parse_rss_items (title/link/description/pub_date).
        fetched_at: ISO timestamp stamped onto the record.
        run_id: identifier of this refresh run.

    Returns:
        A flat advisory dict; the advisory id is derived from the detail
        link's IDX parameter (see extract_kisa_id).
    """
    advisory_id = extract_kisa_id(item)
    title = item["title"]
    link = item["link"]
    description = item["description"]
    # Parse the pubDate once so "published" and "updated" always agree
    # (the old code parsed twice; with an empty/unparseable pubDate the two
    # utcnow() fallbacks could straddle a second and disagree).
    published = iso(parse_pubdate(item["pub_date"]))
    raw_payload = f"{title}\n{link}\n{description}"
    return {
        "advisory_id": advisory_id,
        "source": "kisa",
        "source_url": link or DEFAULT_KISA_URL,
        "title": title or advisory_id,
        "summary": description or title,
        "published": published,
        "updated": published,
        "severity": "unknown",
        "cvss": None,
        "cwe": [],
        "affected_products": [],
        "references": [url for url in (link, DEFAULT_KISA_URL) if url],
        "signature": {"status": "missing", "reason": "unsigned_source"},
        "fetched_at": fetched_at,
        "run_id": run_id,
        "payload_sha256": sha256_bytes(raw_payload.encode("utf-8")),
    }
def extract_kisa_id(item: Dict[str, str]) -> str:
    """Derive a stable advisory id for a KISA RSS item.

    Prefers "KISA-<idx>" built from the numeric IDX query parameter of the
    detail link; otherwise falls back to the first whitespace-separated
    token of the title, or "KISA-unknown" when the title is empty.
    """
    idx_match = re.search(r"IDX=([0-9]+)", item["link"])
    if idx_match is not None:
        return "KISA-" + idx_match.group(1)
    title = item["title"]
    if title:
        return title.split()[0]
    return "KISA-unknown"
def parse_pubdate(value: str) -> dt.datetime:
    """Parse an RSS pubDate string into an aware UTC datetime.

    Accepts RFC 2822 / RFC 1123 dates — including numeric zone offsets such
    as "+0900", which the previous strptime("%Z") pattern rejected — and
    ISO-8601 strings with an optional trailing "Z". Falls back to the
    current UTC time when the value is empty or unparseable.
    """
    if not value:
        return utcnow()
    from email.utils import parsedate_to_datetime  # stdlib RFC 2822 parser
    try:
        parsed = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Pre-3.10 returns None / raises TypeError; 3.10+ raises ValueError.
        parsed = None
    if parsed is None:
        try:
            parsed = dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return utcnow()
    if parsed.tzinfo is None:
        # Naive timestamps are treated as UTC, matching the old behavior.
        return parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.astimezone(dt.timezone.utc)
def sample_records() -> List[Dict[str, object]]:
    """Return the baked-in offline sample advisories (2 ICS-CISA, 2 KISA).

    Used as a fallback when live fetch is disabled or fails. run_id is left
    empty here and stamped later by apply_run_metadata; fetched_at is set to
    the current time. payload_sha256 values are fixed digests of synthetic
    payload strings so snapshots stay deterministic across runs.
    """
    now_iso = iso(utcnow())
    return [
        {
            "advisory_id": "ICSA-25-123-01",
            "source": "icscisa",
            "source_url": "https://www.cisa.gov/news-events/ics-advisories/icsa-25-123-01",
            "title": "Example ICS Advisory",
            "summary": "Example Corp ControlSuite RCE via exposed management service.",
            "published": "2025-10-13T12:00:00Z",
            "updated": "2025-11-30T00:00:00Z",
            "severity": "High",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "score": 9.8},
            "cwe": ["CWE-269"],
            "affected_products": [{"vendor": "Example Corp", "product": "ControlSuite", "versions": ["4.2.0", "4.2.1"]}],
            "references": [
                "https://example.com/security/icsa-25-123-01.pdf",
                "https://www.cisa.gov/news-events/ics-advisories/icsa-25-123-01",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"ICSA-25-123-01 Example ControlSuite advisory payload"),
        },
        {
            "advisory_id": "ICSMA-25-045-01",
            "source": "icscisa",
            "source_url": "https://www.cisa.gov/news-events/ics-medical-advisories/icsma-25-045-01",
            "title": "Example Medical Advisory",
            "summary": "HealthTech infusion pump vulnerabilities including two CVEs.",
            "published": "2025-10-14T09:30:00Z",
            "updated": "2025-12-01T00:00:00Z",
            "severity": "Medium",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:H/PR:L/UI:R/S:U/C:L/I:L/A:L", "score": 6.3},
            "cwe": ["CWE-319"],
            "affected_products": [{"vendor": "HealthTech", "product": "InfusionManager", "versions": ["2.1.0", "2.1.1"]}],
            "references": [
                "https://www.cisa.gov/news-events/ics-medical-advisories/icsma-25-045-01",
                "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2025-11111",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"ICSMA-25-045-01 Example medical advisory payload"),
        },
        {
            "advisory_id": "KISA-2025-5859",
            "source": "kisa",
            "source_url": "https://knvd.krcert.or.kr/detailDos.do?IDX=5859",
            "title": "KISA sample advisory 5859",
            "summary": "Remote code execution in ControlBoard service (offline HTML snapshot).",
            "published": "2025-11-03T22:53:00Z",
            "updated": "2025-12-02T00:00:00Z",
            "severity": "High",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "score": 9.8},
            "cwe": ["CWE-787"],
            "affected_products": [{"vendor": "ACME", "product": "ControlBoard", "versions": ["1.0.1.0084", "2.0.1.0034"]}],
            "references": [
                "https://knvd.krcert.or.kr/rss/securityInfo.do",
                "https://knvd.krcert.or.kr/detailDos.do?IDX=5859",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"KISA advisory IDX 5859 cached HTML payload"),
        },
        {
            "advisory_id": "KISA-2025-5860",
            "source": "kisa",
            "source_url": "https://knvd.krcert.or.kr/detailDos.do?IDX=5860",
            "title": "KISA sample advisory 5860",
            "summary": "Authentication bypass via default credentials in NetGateway appliance.",
            "published": "2025-11-03T22:53:00Z",
            "updated": "2025-12-02T00:00:00Z",
            "severity": "Medium",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:L", "score": 7.3},
            "cwe": ["CWE-798"],
            "affected_products": [{"vendor": "NetGateway", "product": "Edge", "versions": ["3.4.2", "3.4.3"]}],
            "references": [
                "https://knvd.krcert.or.kr/rss/securityInfo.do",
                "https://knvd.krcert.or.kr/detailDos.do?IDX=5860",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"KISA advisory IDX 5860 cached HTML payload"),
        },
    ]
def build_records(
    run_id: str,
    fetched_at: str,
    live_fetch: bool,
    offline_only: bool,
    icscisa_url: str,
    kisa_url: str,
) -> Tuple[List[Dict[str, object]], Dict[str, str]]:
    """Collect advisory records, live when possible, samples otherwise.

    Attempts a live fetch of each feed when enabled; a feed whose fetch
    fails (or is skipped entirely) is backfilled from the baked-in samples.

    Returns:
        (records, status) where status maps each source name to
        "live:<count>" on success or "offline" otherwise.
    """
    samples = sample_records()
    sample_icscisa = [r for r in samples if r["source"] == "icscisa"]
    sample_kisa = [r for r in samples if r["source"] == "kisa"]
    status = {"icscisa": "offline", "kisa": "offline"}
    records: List[Dict[str, object]] = []
    if live_fetch and not offline_only:
        try:
            icscisa_items = list(parse_rss_items(safe_request(icscisa_url)))
            for item in icscisa_items:
                records.append(normalize_icscisa_record(item, fetched_at, run_id))
            status["icscisa"] = f"live:{len(icscisa_items)}"
        except (URLError, HTTPError, ElementTree.ParseError, TimeoutError) as exc:
            print(f"[warn] ICS CISA fetch failed ({exc}); falling back to samples.", file=sys.stderr)
        try:
            kisa_items = list(parse_rss_items(safe_request(kisa_url)))
            for item in kisa_items:
                records.append(normalize_kisa_record(item, fetched_at, run_id))
            status["kisa"] = f"live:{len(kisa_items)}"
        except (URLError, HTTPError, ElementTree.ParseError, TimeoutError) as exc:
            print(f"[warn] KISA fetch failed ({exc}); falling back to samples.", file=sys.stderr)
    # Backfill: ICS-CISA samples when nothing was collected at all or its
    # live fetch did not succeed; KISA samples when no KISA records exist.
    # (Was `status[...].startswith("live") is False` — unidiomatic bool
    # identity check — plus dead `status.get(...) or "offline"` no-ops.)
    if not records or not status["icscisa"].startswith("live"):
        records.extend(apply_run_metadata(sample_icscisa, run_id, fetched_at))
    if not any(r["source"] == "kisa" for r in records):
        records.extend(apply_run_metadata(sample_kisa, run_id, fetched_at))
    return records, status
def apply_run_metadata(records: Iterable[Dict[str, object]], run_id: str, fetched_at: str) -> List[Dict[str, object]]:
    """Return shallow copies of *records* stamped with run metadata.

    Each copy gets the given run_id and fetched_at; records without a
    truthy payload_sha256 get one derived from their canonical JSON
    serialization. The input records are never mutated.
    """
    stamped: List[Dict[str, object]] = []
    for original in records:
        rec = {**original, "run_id": run_id, "fetched_at": fetched_at}
        if not original.get("payload_sha256"):
            canonical = json.dumps(original, sort_keys=True).encode("utf-8")
            rec["payload_sha256"] = sha256_bytes(canonical)
        stamped.append(rec)
    return stamped
def find_previous_snapshot(base_dir: Path, current_run_date: str) -> Path | None:
if not base_dir.exists():
return None
candidates = sorted(p for p in base_dir.iterdir() if p.is_dir() and p.name != current_run_date)
if not candidates:
return None
return candidates[-1] / "advisories.ndjson"
def load_previous_hash(path: Path | None) -> str | None:
if path and path.exists():
return sha256_bytes(path.read_bytes())
return None
def compute_delta(new_records: List[Dict[str, object]], previous_path: Path | None) -> Dict[str, object]:
prev_records = {}
if previous_path and previous_path.exists():
with previous_path.open("r", encoding="utf-8") as handle:
for line in handle:
if line.strip():
rec = json.loads(line)
prev_records[rec["advisory_id"]] = rec
new_by_id = {r["advisory_id"]: r for r in new_records}
added = [rid for rid in new_by_id if rid not in prev_records]
updated = [
rid
for rid, rec in new_by_id.items()
if rid in prev_records and rec.get("payload_sha256") != prev_records[rid].get("payload_sha256")
]
removed = [rid for rid in prev_records if rid not in new_by_id]
return {
"added": {"icscisa": [rid for rid in added if new_by_id[rid]["source"] == "icscisa"],
"kisa": [rid for rid in added if new_by_id[rid]["source"] == "kisa"]},
"updated": {"icscisa": [rid for rid in updated if new_by_id[rid]["source"] == "icscisa"],
"kisa": [rid for rid in updated if new_by_id[rid]["source"] == "kisa"]},
"removed": {"icscisa": [rid for rid in removed if prev_records[rid]["source"] == "icscisa"],
"kisa": [rid for rid in removed if prev_records[rid]["source"] == "kisa"]},
"totals": {
"icscisa": {
"added": len([rid for rid in added if new_by_id[rid]["source"] == "icscisa"]),
"updated": len([rid for rid in updated if new_by_id[rid]["source"] == "icscisa"]),
"removed": len([rid for rid in removed if prev_records[rid]["source"] == "icscisa"]),
"remaining": len([rid for rid, rec in new_by_id.items() if rec["source"] == "icscisa"]),
},
"kisa": {
"added": len([rid for rid in added if new_by_id[rid]["source"] == "kisa"]),
"updated": len([rid for rid in updated if new_by_id[rid]["source"] == "kisa"]),
"removed": len([rid for rid in removed if prev_records[rid]["source"] == "kisa"]),
"remaining": len([rid for rid, rec in new_by_id.items() if rec["source"] == "kisa"]),
},
"overall": len(new_records),
},
}
def write_ndjson(records: List[Dict[str, object]], path: Path) -> None:
    """Serialize *records* as one compact, key-sorted JSON object per line."""
    serialized = [json.dumps(record, sort_keys=True, separators=(",", ":")) for record in records]
    path.write_text("\n".join(serialized) + "\n", encoding="utf-8")
def write_fetch_log(
    path: Path,
    run_id: str,
    start: str,
    end: str,
    status: Dict[str, str],
    gateway_host: str,
    gateway_scheme: str,
    icscisa_url: str,
    kisa_url: str,
    live_fetch: bool,
    offline_only: bool,
) -> None:
    """Write the human-readable fetch log describing this refresh run."""
    icscisa_status = status.get("icscisa", "offline")
    kisa_status = status.get("kisa", "offline")
    log_lines = (
        f"run_id={run_id} start={start} end={end}",
        f"sources=icscisa,kisa cadence=weekly backlog_window=60d live_fetch={str(live_fetch).lower()} offline_only={str(offline_only).lower()}",
        f"gateway={gateway_scheme}://{gateway_host}",
        f"icscisa_url={icscisa_url} status={icscisa_status} retries=0",
        f"kisa_url={kisa_url} status={kisa_status} retries=0",
        "outputs=advisories.ndjson,delta.json,hashes.sha256",
    )
    path.write_text("\n".join(log_lines) + "\n", encoding="utf-8")
def write_hashes(dir_path: Path) -> None:
    """Emit hashes.sha256 covering the run's three output artifacts.

    Each line is "<sha256> <name>", mirroring sha256sum manifest style.
    """
    manifest_lines = []
    for artifact in ["advisories.ndjson", "delta.json", "fetch.log"]:
        digest = sha256_bytes((dir_path / artifact).read_bytes())
        manifest_lines.append(f"{digest} {artifact}")
    (dir_path / "hashes.sha256").write_text("\n".join(manifest_lines) + "\n", encoding="utf-8")
def main() -> None:
    """Entry point: run the feed refresh SOP and write all run artifacts.

    Parses CLI flags, resolves feed URLs (env override or gateway rewrite),
    collects records (live with sample fallback), then writes
    advisories.ndjson, delta.json, fetch.log, and hashes.sha256 under
    <out-dir>/<run-date>/.
    """
    parser = argparse.ArgumentParser(description="Run ICS/KISA feed refresh SOP v0.2")
    parser.add_argument("--out-dir", default=str(DEFAULT_OUTPUT_ROOT), help="Base output directory (default: out/feeds/icscisa-kisa)")
    parser.add_argument("--run-date", default=None, help="Override run date (YYYYMMDD)")
    parser.add_argument("--run-id", default=None, help="Override run id")
    parser.add_argument("--live", action="store_true", default=False, help="Force live fetch (default: enabled via env LIVE_FETCH=true)")
    parser.add_argument("--offline", action="store_true", default=False, help="Force offline samples only")
    args = parser.parse_args()
    now = utcnow()
    run_date = args.run_date or now.strftime("%Y%m%d")
    run_id = args.run_id or f"icscisa-kisa-{now.strftime('%Y%m%dT%H%M%SZ')}"
    fetched_at = iso(now)
    start = fetched_at
    # Live fetch defaults to on; CLI flags or env vars (LIVE_FETCH,
    # OFFLINE_SNAPSHOT) can override. --offline wins over --live downstream.
    live_fetch = args.live or os.getenv("LIVE_FETCH", "true").lower() == "true"
    offline_only = args.offline or os.getenv("OFFLINE_SNAPSHOT", "false").lower() == "true"
    output_root = Path(args.out_dir)
    output_dir = output_root / run_date
    output_dir.mkdir(parents=True, exist_ok=True)
    # The newest prior run-date directory (if any) is the delta baseline.
    previous_path = find_previous_snapshot(output_root, run_date)
    gateway_host = os.getenv("FEED_GATEWAY_HOST", DEFAULT_GATEWAY_HOST)
    gateway_scheme = os.getenv("FEED_GATEWAY_SCHEME", DEFAULT_GATEWAY_SCHEME)
    def resolve_feed(url_env: str | None, default_url: str) -> str:
        # An explicit env override wins; otherwise rewrite the default URL
        # to point at the gateway host/scheme.
        if url_env:
            return url_env
        parsed = urlparse(default_url)
        # Replace host/scheme to allow on-prem DNS (docker network) defaults.
        rewritten = parsed._replace(netloc=gateway_host, scheme=gateway_scheme)
        return urlunparse(rewritten)
    resolved_icscisa_url = resolve_feed(os.getenv("ICSCISA_FEED_URL"), DEFAULT_ICSCISA_URL)
    resolved_kisa_url = resolve_feed(os.getenv("KISA_FEED_URL"), DEFAULT_KISA_URL)
    records, status = build_records(
        run_id=run_id,
        fetched_at=fetched_at,
        live_fetch=live_fetch,
        offline_only=offline_only,
        icscisa_url=resolved_icscisa_url,
        kisa_url=resolved_kisa_url,
    )
    write_ndjson(records, output_dir / "advisories.ndjson")
    delta = compute_delta(records, previous_path)
    delta_payload = {
        "run_id": run_id,
        "generated_at": iso(utcnow()),
        **delta,
        "previous_snapshot_sha256": load_previous_hash(previous_path),
    }
    (output_dir / "delta.json").write_text(json.dumps(delta_payload, separators=(",", ":")) + "\n", encoding="utf-8")
    end = iso(utcnow())
    write_fetch_log(
        output_dir / "fetch.log",
        run_id,
        start,
        end,
        status,
        gateway_host=gateway_host,
        gateway_scheme=gateway_scheme,
        icscisa_url=resolved_icscisa_url,
        kisa_url=resolved_kisa_url,
        live_fetch=live_fetch and not offline_only,
        offline_only=offline_only,
    )
    # hashes.sha256 must be written last: it digests the three files above.
    write_hashes(output_dir)
    print(f"[ok] wrote {len(records)} advisories to {output_dir}")
    print(f" run_id={run_id} live_fetch={live_fetch and not offline_only} offline_only={offline_only}")
    print(f" gateway={gateway_scheme}://{gateway_host}")
    print(f" icscisa_url={resolved_icscisa_url}")
    print(f" kisa_url={resolved_kisa_url}")
    print(f" status={status}")
    if previous_path:
        print(f" previous_snapshot={previous_path}")
if __name__ == "__main__":
    # Run only when executed as a script, not when imported as a module.
    main()