#!/usr/bin/env python3
"""
ICS/KISA feed refresh runner.

Runs the SOP v0.2 workflow to emit NDJSON advisories, delta, fetch log, and
hash manifest under out/feeds/icscisa-kisa//. Defaults to live fetch with
offline-safe fallback to baked-in samples. You can force live/offline via env
or CLI flags.
"""
from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import json
import os
import re
import sys
from html import unescape
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
from urllib.error import URLError, HTTPError
from urllib.parse import urlparse, urlunparse
from urllib.request import Request, urlopen
from xml.etree import ElementTree

DEFAULT_OUTPUT_ROOT = Path("out/feeds/icscisa-kisa")
DEFAULT_ICSCISA_URL = "https://www.cisa.gov/news-events/ics-advisories/icsa.xml"
DEFAULT_KISA_URL = "https://knvd.krcert.or.kr/rss/securityInfo.do"
DEFAULT_GATEWAY_HOST = "concelier-webservice"
DEFAULT_GATEWAY_SCHEME = "http"
USER_AGENT = "StellaOpsFeedRefresh/1.0 (+https://stella-ops.org)"


def utcnow() -> dt.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    # datetime.utcnow() is deprecated (Python 3.12+); now(tz=...) is the
    # aware equivalent and produces the same instant.
    return dt.datetime.now(dt.timezone.utc)


def iso(ts: dt.datetime) -> str:
    """Format *ts* as an ISO-8601 UTC timestamp with a trailing 'Z'."""
    return ts.strftime("%Y-%m-%dT%H:%M:%SZ")


def sha256_bytes(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *data*."""
    return hashlib.sha256(data).hexdigest()


def strip_html(value: str) -> str:
    """Remove HTML tags from *value* and trim surrounding whitespace."""
    return re.sub(r"<[^>]+>", "", value or "").strip()


def safe_request(url: str) -> bytes:
    """Fetch *url* with our User-Agent and a 30s timeout; return raw bytes.

    Raises URLError/HTTPError on network or HTTP failure (handled by callers).
    """
    req = Request(url, headers={"User-Agent": USER_AGENT})
    with urlopen(req, timeout=30) as resp:
        return resp.read()


def parse_rss_items(xml_bytes: bytes) -> Iterable[Dict[str, str]]:
    """Yield one dict per RSS ``<item>`` with title/link/description/pub_date.

    Descriptions are entity-unescaped and stripped of HTML tags.
    """
    root = ElementTree.fromstring(xml_bytes)
    for item in root.findall(".//item"):
        title = (item.findtext("title") or "").strip()
        link = (item.findtext("link") or "").strip()
        description = strip_html(unescape(item.findtext("description") or ""))
        pub_date = (item.findtext("pubDate") or "").strip()
        yield {
            "title": title,
            "link": link,
            "description": description,
            "pub_date": pub_date,
        }


def normalize_icscisa_record(item: Dict[str, str], fetched_at: str, run_id: str) -> Dict[str, object]:
    """Map a parsed ICS-CISA RSS item onto the advisory record schema.

    The advisory id is taken from the title prefix before the first ':'.
    """
    advisory_id = item["title"].split(":")[0].strip() or "icsa-unknown"
    summary = item["description"] or item["title"]
    raw_payload = f"{item['title']}\n{item['link']}\n{item['description']}"
    record = {
        "advisory_id": advisory_id,
        "source": "icscisa",
        "source_url": item["link"] or DEFAULT_ICSCISA_URL,
        "title": item["title"] or advisory_id,
        "summary": summary,
        "published": iso(parse_pubdate(item["pub_date"])),
        "updated": iso(parse_pubdate(item["pub_date"])),
        "severity": "unknown",
        "cvss": None,
        "cwe": [],
        "affected_products": [],
        "references": [url for url in (item["link"],) if url],
        "signature": {"status": "missing", "reason": "unsigned_source"},
        "fetched_at": fetched_at,
        "run_id": run_id,
        "payload_sha256": sha256_bytes(raw_payload.encode("utf-8")),
    }
    return record


def normalize_kisa_record(item: Dict[str, str], fetched_at: str, run_id: str) -> Dict[str, object]:
    """Map a parsed KISA RSS item onto the advisory record schema."""
    advisory_id = extract_kisa_id(item)
    raw_payload = f"{item['title']}\n{item['link']}\n{item['description']}"
    record = {
        "advisory_id": advisory_id,
        "source": "kisa",
        "source_url": item["link"] or DEFAULT_KISA_URL,
        "title": item["title"] or advisory_id,
        "summary": item["description"] or item["title"],
        "published": iso(parse_pubdate(item["pub_date"])),
        "updated": iso(parse_pubdate(item["pub_date"])),
        "severity": "unknown",
        "cvss": None,
        "cwe": [],
        "affected_products": [],
        "references": [url for url in (item["link"], DEFAULT_KISA_URL) if url],
        "signature": {"status": "missing", "reason": "unsigned_source"},
        "fetched_at": fetched_at,
        "run_id": run_id,
        "payload_sha256": sha256_bytes(raw_payload.encode("utf-8")),
    }
    return record


def extract_kisa_id(item: Dict[str, str]) -> str:
    """Derive a stable id from the KISA detail link (``IDX=<n>``).

    Falls back to the first word of the title, then "KISA-unknown".
    """
    link = item["link"]
    match = re.search(r"IDX=([0-9]+)", link)
    if match:
        return f"KISA-{match.group(1)}"
    return (item["title"].split()[0] if item["title"] else "KISA-unknown").strip()


def parse_pubdate(value: str) -> dt.datetime:
    """Parse an RFC1123-ish or ISO-8601 date string; fall back to now (UTC)."""
    if not value:
        return utcnow()
    try:
        # RFC1123-ish
        return dt.datetime.strptime(value, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=dt.timezone.utc)
    except ValueError:
        try:
            return dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return utcnow()


def sample_records() -> List[Dict[str, object]]:
    """Return the baked-in offline sample advisories (two per source)."""
    now_iso = iso(utcnow())
    return [
        {
            "advisory_id": "ICSA-25-123-01",
            "source": "icscisa",
            "source_url": "https://www.cisa.gov/news-events/ics-advisories/icsa-25-123-01",
            "title": "Example ICS Advisory",
            "summary": "Example Corp ControlSuite RCE via exposed management service.",
            "published": "2025-10-13T12:00:00Z",
            "updated": "2025-11-30T00:00:00Z",
            "severity": "High",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "score": 9.8},
            "cwe": ["CWE-269"],
            "affected_products": [{"vendor": "Example Corp", "product": "ControlSuite", "versions": ["4.2.0", "4.2.1"]}],
            "references": [
                "https://example.com/security/icsa-25-123-01.pdf",
                "https://www.cisa.gov/news-events/ics-advisories/icsa-25-123-01",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"ICSA-25-123-01 Example ControlSuite advisory payload"),
        },
        {
            "advisory_id": "ICSMA-25-045-01",
            "source": "icscisa",
            "source_url": "https://www.cisa.gov/news-events/ics-medical-advisories/icsma-25-045-01",
            "title": "Example Medical Advisory",
            "summary": "HealthTech infusion pump vulnerabilities including two CVEs.",
            "published": "2025-10-14T09:30:00Z",
            "updated": "2025-12-01T00:00:00Z",
            "severity": "Medium",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:H/PR:L/UI:R/S:U/C:L/I:L/A:L", "score": 6.3},
            "cwe": ["CWE-319"],
            "affected_products": [{"vendor": "HealthTech", "product": "InfusionManager", "versions": ["2.1.0", "2.1.1"]}],
            "references": [
                "https://www.cisa.gov/news-events/ics-medical-advisories/icsma-25-045-01",
                "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2025-11111",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"ICSMA-25-045-01 Example medical advisory payload"),
        },
        {
            "advisory_id": "KISA-2025-5859",
            "source": "kisa",
            "source_url": "https://knvd.krcert.or.kr/detailDos.do?IDX=5859",
            "title": "KISA sample advisory 5859",
            "summary": "Remote code execution in ControlBoard service (offline HTML snapshot).",
            "published": "2025-11-03T22:53:00Z",
            "updated": "2025-12-02T00:00:00Z",
            "severity": "High",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "score": 9.8},
            "cwe": ["CWE-787"],
            "affected_products": [{"vendor": "ACME", "product": "ControlBoard", "versions": ["1.0.1.0084", "2.0.1.0034"]}],
            "references": [
                "https://knvd.krcert.or.kr/rss/securityInfo.do",
                "https://knvd.krcert.or.kr/detailDos.do?IDX=5859",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"KISA advisory IDX 5859 cached HTML payload"),
        },
        {
            "advisory_id": "KISA-2025-5860",
            "source": "kisa",
            "source_url": "https://knvd.krcert.or.kr/detailDos.do?IDX=5860",
            "title": "KISA sample advisory 5860",
            "summary": "Authentication bypass via default credentials in NetGateway appliance.",
            "published": "2025-11-03T22:53:00Z",
            "updated": "2025-12-02T00:00:00Z",
            "severity": "Medium",
            "cvss": {"version": "3.1", "vector": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:L", "score": 7.3},
            "cwe": ["CWE-798"],
            "affected_products": [{"vendor": "NetGateway", "product": "Edge", "versions": ["3.4.2", "3.4.3"]}],
            "references": [
                "https://knvd.krcert.or.kr/rss/securityInfo.do",
                "https://knvd.krcert.or.kr/detailDos.do?IDX=5860",
            ],
            "signature": {"status": "missing", "reason": "unsigned_source"},
            "fetched_at": now_iso,
            "run_id": "",
            "payload_sha256": sha256_bytes(b"KISA advisory IDX 5860 cached HTML payload"),
        },
    ]


def build_records(
    run_id: str,
    fetched_at: str,
    live_fetch: bool,
    offline_only: bool,
    icscisa_url: str,
    kisa_url: str,
) -> Tuple[List[Dict[str, object]], Dict[str, str]]:
    """Fetch live advisories when enabled, falling back to baked-in samples.

    Returns (records, status) where status maps each source to "offline" or
    "live:<count>". A failed or skipped source is backfilled with its samples.
    """
    samples = sample_records()
    sample_icscisa = [r for r in samples if r["source"] == "icscisa"]
    sample_kisa = [r for r in samples if r["source"] == "kisa"]
    status = {"icscisa": "offline", "kisa": "offline"}
    records: List[Dict[str, object]] = []

    if live_fetch and not offline_only:
        try:
            icscisa_items = list(parse_rss_items(safe_request(icscisa_url)))
            for item in icscisa_items:
                records.append(normalize_icscisa_record(item, fetched_at, run_id))
            status["icscisa"] = f"live:{len(icscisa_items)}"
        except (URLError, HTTPError, ElementTree.ParseError, TimeoutError) as exc:
            print(f"[warn] ICS CISA fetch failed ({exc}); falling back to samples.", file=sys.stderr)
        try:
            kisa_items = list(parse_rss_items(safe_request(kisa_url)))
            for item in kisa_items:
                records.append(normalize_kisa_record(item, fetched_at, run_id))
            status["kisa"] = f"live:{len(kisa_items)}"
        except (URLError, HTTPError, ElementTree.ParseError, TimeoutError) as exc:
            print(f"[warn] KISA fetch failed ({exc}); falling back to samples.", file=sys.stderr)

    # Backfill ICS-CISA samples when nothing was fetched at all or its feed
    # did not come back live; backfill KISA samples when no KISA record landed.
    if not records or not status["icscisa"].startswith("live"):
        records.extend(apply_run_metadata(sample_icscisa, run_id, fetched_at))
    if not any(r["source"] == "kisa" for r in records):
        records.extend(apply_run_metadata(sample_kisa, run_id, fetched_at))
    return records, status


def apply_run_metadata(records: Iterable[Dict[str, object]], run_id: str, fetched_at: str) -> List[Dict[str, object]]:
    """Return copies of *records* stamped with this run's id and fetch time.

    A missing payload hash is derived from the canonical JSON of the record.
    """
    updated = []
    for record in records:
        copy = dict(record)
        copy["run_id"] = run_id
        copy["fetched_at"] = fetched_at
        copy["payload_sha256"] = record.get("payload_sha256") or sha256_bytes(json.dumps(record, sort_keys=True).encode("utf-8"))
        updated.append(copy)
    return updated


def find_previous_snapshot(base_dir: Path, current_run_date: str) -> Path | None:
    """Return the advisories.ndjson path of the latest prior run, if any."""
    if not base_dir.exists():
        return None
    candidates = sorted(p for p in base_dir.iterdir() if p.is_dir() and p.name != current_run_date)
    if not candidates:
        return None
    return candidates[-1] / "advisories.ndjson"


def load_previous_hash(path: Path | None) -> str | None:
    """Return the SHA-256 of the previous snapshot file, or None if absent."""
    if path and path.exists():
        return sha256_bytes(path.read_bytes())
    return None


def compute_delta(new_records: List[Dict[str, object]], previous_path: Path | None) -> Dict[str, object]:
    """Diff *new_records* against the previous NDJSON snapshot by advisory id.

    A record counts as "updated" when its payload_sha256 changed.
    """
    prev_records = {}
    if previous_path and previous_path.exists():
        with previous_path.open("r", encoding="utf-8") as handle:
            for line in handle:
                if line.strip():
                    rec = json.loads(line)
                    prev_records[rec["advisory_id"]] = rec
    new_by_id = {r["advisory_id"]: r for r in new_records}
    added = [rid for rid in new_by_id if rid not in prev_records]
    updated = [
        rid
        for rid, rec in new_by_id.items()
        if rid in prev_records and rec.get("payload_sha256") != prev_records[rid].get("payload_sha256")
    ]
    removed = [rid for rid in prev_records if rid not in new_by_id]

    # Split each bucket per source once; the original recomputed these
    # comprehensions for both the listings and the totals.
    added_ics = [rid for rid in added if new_by_id[rid]["source"] == "icscisa"]
    added_kisa = [rid for rid in added if new_by_id[rid]["source"] == "kisa"]
    updated_ics = [rid for rid in updated if new_by_id[rid]["source"] == "icscisa"]
    updated_kisa = [rid for rid in updated if new_by_id[rid]["source"] == "kisa"]
    removed_ics = [rid for rid in removed if prev_records[rid]["source"] == "icscisa"]
    removed_kisa = [rid for rid in removed if prev_records[rid]["source"] == "kisa"]
    remaining_ics = len([rid for rid, rec in new_by_id.items() if rec["source"] == "icscisa"])
    remaining_kisa = len([rid for rid, rec in new_by_id.items() if rec["source"] == "kisa"])
    return {
        "added": {"icscisa": added_ics, "kisa": added_kisa},
        "updated": {"icscisa": updated_ics, "kisa": updated_kisa},
        "removed": {"icscisa": removed_ics, "kisa": removed_kisa},
        "totals": {
            "icscisa": {
                "added": len(added_ics),
                "updated": len(updated_ics),
                "removed": len(removed_ics),
                "remaining": remaining_ics,
            },
            "kisa": {
                "added": len(added_kisa),
                "updated": len(updated_kisa),
                "removed": len(removed_kisa),
                "remaining": remaining_kisa,
            },
            "overall": len(new_records),
        },
    }


def write_ndjson(records: List[Dict[str, object]], path: Path) -> None:
    """Write *records* as newline-delimited canonical JSON to *path*."""
    path.write_text("\n".join(json.dumps(r, sort_keys=True, separators=(",", ":")) for r in records) + "\n", encoding="utf-8")


def write_fetch_log(
    path: Path,
    run_id: str,
    start: str,
    end: str,
    status: Dict[str, str],
    gateway_host: str,
    gateway_scheme: str,
    icscisa_url: str,
    kisa_url: str,
    live_fetch: bool,
    offline_only: bool,
) -> None:
    """Write the human-readable fetch.log summary for this run."""
    lines = [
        f"run_id={run_id} start={start} end={end}",
        f"sources=icscisa,kisa cadence=weekly backlog_window=60d live_fetch={str(live_fetch).lower()} offline_only={str(offline_only).lower()}",
        f"gateway={gateway_scheme}://{gateway_host}",
        f"icscisa_url={icscisa_url} status={status.get('icscisa','offline')} retries=0",
        f"kisa_url={kisa_url} status={status.get('kisa','offline')} retries=0",
        "outputs=advisories.ndjson,delta.json,hashes.sha256",
    ]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def write_hashes(dir_path: Path) -> None:
    """Write hashes.sha256 covering the run's three output files."""
    entries = []
    for name in ["advisories.ndjson", "delta.json", "fetch.log"]:
        file_path = dir_path / name
        entries.append(f"{sha256_bytes(file_path.read_bytes())} {name}")
    (dir_path / "hashes.sha256").write_text("\n".join(entries) + "\n", encoding="utf-8")


def main() -> None:
    """CLI entry point: run the refresh and emit all artifacts for this run."""
    parser = argparse.ArgumentParser(description="Run ICS/KISA feed refresh SOP v0.2")
    parser.add_argument("--out-dir", default=str(DEFAULT_OUTPUT_ROOT), help="Base output directory (default: out/feeds/icscisa-kisa)")
    parser.add_argument("--run-date", default=None, help="Override run date (YYYYMMDD)")
    parser.add_argument("--run-id", default=None, help="Override run id")
    parser.add_argument("--live", action="store_true", default=False, help="Force live fetch (default: enabled via env LIVE_FETCH=true)")
    parser.add_argument("--offline", action="store_true", default=False, help="Force offline samples only")
    args = parser.parse_args()

    now = utcnow()
    run_date = args.run_date or now.strftime("%Y%m%d")
    run_id = args.run_id or f"icscisa-kisa-{now.strftime('%Y%m%dT%H%M%SZ')}"
    fetched_at = iso(now)
    start = fetched_at
    live_fetch = args.live or os.getenv("LIVE_FETCH", "true").lower() == "true"
    offline_only = args.offline or os.getenv("OFFLINE_SNAPSHOT", "false").lower() == "true"

    output_root = Path(args.out_dir)
    output_dir = output_root / run_date
    output_dir.mkdir(parents=True, exist_ok=True)
    previous_path = find_previous_snapshot(output_root, run_date)

    gateway_host = os.getenv("FEED_GATEWAY_HOST", DEFAULT_GATEWAY_HOST)
    gateway_scheme = os.getenv("FEED_GATEWAY_SCHEME", DEFAULT_GATEWAY_SCHEME)

    def resolve_feed(url_env: str | None, default_url: str) -> str:
        # env override wins; otherwise rewrite the default's host/scheme to
        # allow on-prem DNS (docker network) defaults via the gateway.
        if url_env:
            return url_env
        parsed = urlparse(default_url)
        rewritten = parsed._replace(netloc=gateway_host, scheme=gateway_scheme)
        return urlunparse(rewritten)

    resolved_icscisa_url = resolve_feed(os.getenv("ICSCISA_FEED_URL"), DEFAULT_ICSCISA_URL)
    resolved_kisa_url = resolve_feed(os.getenv("KISA_FEED_URL"), DEFAULT_KISA_URL)

    records, status = build_records(
        run_id=run_id,
        fetched_at=fetched_at,
        live_fetch=live_fetch,
        offline_only=offline_only,
        icscisa_url=resolved_icscisa_url,
        kisa_url=resolved_kisa_url,
    )
    write_ndjson(records, output_dir / "advisories.ndjson")

    delta = compute_delta(records, previous_path)
    delta_payload = {
        "run_id": run_id,
        "generated_at": iso(utcnow()),
        **delta,
        "previous_snapshot_sha256": load_previous_hash(previous_path),
    }
    (output_dir / "delta.json").write_text(json.dumps(delta_payload, separators=(",", ":")) + "\n", encoding="utf-8")

    end = iso(utcnow())
    write_fetch_log(
        output_dir / "fetch.log",
        run_id,
        start,
        end,
        status,
        gateway_host=gateway_host,
        gateway_scheme=gateway_scheme,
        icscisa_url=resolved_icscisa_url,
        kisa_url=resolved_kisa_url,
        live_fetch=live_fetch and not offline_only,
        offline_only=offline_only,
    )
    write_hashes(output_dir)

    print(f"[ok] wrote {len(records)} advisories to {output_dir}")
    print(f"     run_id={run_id} live_fetch={live_fetch and not offline_only} offline_only={offline_only}")
    print(f"     gateway={gateway_scheme}://{gateway_host}")
    print(f"     icscisa_url={resolved_icscisa_url}")
    print(f"     kisa_url={resolved_kisa_url}")
    print(f"     status={status}")
    if previous_path:
        print(f"     previous_snapshot={previous_path}")


if __name__ == "__main__":
    main()