#!/usr/bin/env python3
"""Capture CERT-Bund search/export JSON snapshots and generate Offline Kit manifests.

The script can bootstrap a session against https://wid.cert-bund.de, fetch
paginated search results plus per-year export payloads, and emit a manifest
that records source, date range, SHA-256, and capture timestamps for each
artefact.
"""

from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import json
import os
from pathlib import Path, PurePosixPath
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from http.cookiejar import MozillaCookieJar
from typing import Any, Dict, Iterable, List, Optional

PORTAL_ROOT = "https://wid.cert-bund.de/portal/"
SEARCH_ENDPOINT = "https://wid.cert-bund.de/portal/api/securityadvisory/search"
EXPORT_ENDPOINT = "https://wid.cert-bund.de/portal/api/securityadvisory/export"
CSRF_ENDPOINT = "https://wid.cert-bund.de/portal/api/security/csrf"
USER_AGENT = "StellaOps.CertBundOffline/0.1"
UTC = dt.timezone.utc


class CertBundClient:
    """Cookie-aware HTTP client for the CERT-Bund portal.

    Handles session bootstrap, XSRF token discovery, and persistence of
    cookies to a Netscape-format cookie file for reuse between runs.
    """

    def __init__(
        self,
        cookie_file: Optional[Path] = None,
        xsrf_token: Optional[str] = None,
        auto_bootstrap: bool = True,
    ) -> None:
        self.cookie_path = cookie_file
        self.cookie_jar = MozillaCookieJar()
        if self.cookie_path and self.cookie_path.exists():
            # Reuse a previously exported session; ignore_* flags keep
            # session-only cookies that MozillaCookieJar would otherwise drop.
            self.cookie_jar.load(self.cookie_path, ignore_discard=True, ignore_expires=True)
        self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cookie_jar))
        self.opener.addheaders = [("User-Agent", USER_AGENT)]
        self._xsrf_token = xsrf_token
        self.auto_bootstrap = auto_bootstrap
        if self.auto_bootstrap and not self._xsrf_token:
            self._bootstrap()

    @property
    def xsrf_token(self) -> str:
        """Return the XSRF token, discovering it from the cookie jar if needed.

        Raises:
            RuntimeError: when no token was supplied and none is present in
                the cookies.
        """
        if self._xsrf_token:
            return self._xsrf_token
        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            self._xsrf_token = token
            return token
        raise RuntimeError(
            "CERT-Bund XSRF token not available. Provide --xsrf-token or a cookie file "
            "containing XSRF-TOKEN (see docs/ops/concelier-certbund-operations.md)."
        )

    def fetch_search_pages(
        self,
        destination: Path,
        page_size: int,
        max_pages: int,
    ) -> None:
        """Fetch up to *max_pages* search-result pages into *destination*.

        Stops early once the portal returns an empty ``content`` array; an
        empty page 0 is still written so the capture is auditable.
        """
        destination.mkdir(parents=True, exist_ok=True)
        for page in range(max_pages):
            payload = {
                "page": page,
                "size": page_size,
                "sort": ["published,desc"],
            }
            try:
                document = self._post_json(SEARCH_ENDPOINT, payload)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund search page {page}: HTTP {exc.code}. "
                    "Double-check the XSRF token or portal cookies."
                ) from exc
            content = document.get("content") or []
            if not content and page > 0:
                # Past the last page: nothing new to persist.
                break
            file_path = destination / f"certbund-search-page-{page:02d}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote search page {page:02d} → {file_path}")
            if not content:
                # Page 0 was empty: keep the snapshot but stop paging.
                break
        self._persist_cookies()

    def fetch_exports(self, destination: Path, start_year: int, end_year: int) -> None:
        """Fetch one JSON export per year in ``[start_year, end_year]`` into *destination*."""
        destination.mkdir(parents=True, exist_ok=True)
        for year in range(start_year, end_year + 1):
            from_value = f"{year}-01-01"
            to_value = f"{year}-12-31"
            query = urllib.parse.urlencode({"format": "json", "from": from_value, "to": to_value})
            url = f"{EXPORT_ENDPOINT}?{query}"
            try:
                document = self._get_json(url)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund export for {year}: HTTP {exc.code}. "
                    "Ensure the XSRF token and cookies are valid."
                ) from exc
            file_path = destination / f"certbund-export-{year}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote export {year} → {file_path}")
        self._persist_cookies()

    def _bootstrap(self) -> None:
        """Prime session cookies and try to obtain an XSRF token automatically."""
        try:
            self._request("GET", PORTAL_ROOT, headers={"Accept": "text/html,application/xhtml+xml"})
        except urllib.error.HTTPError as exc:
            raise RuntimeError(f"Failed to bootstrap CERT-Bund session: HTTP {exc.code}") from exc
        # First attempt to obtain CSRF token directly.
        self._attempt_csrf_fetch()
        if _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN"):
            return
        # If the token is still missing, trigger the search endpoint once (likely 403)
        # to make the portal materialise JSESSIONID, then retry token acquisition.
        try:
            payload = {"page": 0, "size": 1, "sort": ["published,desc"]}
            self._post_json(SEARCH_ENDPOINT, payload, include_token=False)
        except urllib.error.HTTPError:
            pass
        self._attempt_csrf_fetch()
        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            self._xsrf_token = token
        else:
            print(
                "[certbund] warning: automatic XSRF token retrieval failed. "
                "Supply --xsrf-token or reuse a browser-exported cookies file.",
                file=sys.stderr,
            )

    def _attempt_csrf_fetch(self) -> None:
        """Hit the CSRF endpoint; HTTP errors are best-effort and deliberately ignored."""
        headers = {
            "Accept": "application/json, text/plain, */*",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        try:
            self._request("GET", CSRF_ENDPOINT, headers=headers)
        except urllib.error.HTTPError:
            pass

    def _request(self, method: str, url: str, data: Optional[bytes] = None, headers: Optional[Dict[str, str]] = None) -> bytes:
        """Perform an HTTP request through the cookie-aware opener and return the raw body.

        Caller-supplied *headers* override the defaults (``Request.add_header``
        keys are case-normalised, so a later add replaces an earlier one).
        """
        request = urllib.request.Request(url, data=data, method=method)
        request.add_header("User-Agent", USER_AGENT)
        request.add_header("Accept", "application/json")
        if headers:
            for key, value in headers.items():
                request.add_header(key, value)
        # Close the response deterministically; the previous implementation
        # called .read() on an unmanaged response and leaked the connection.
        with self.opener.open(request, timeout=60) as response:
            return response.read()

    def _post_json(self, url: str, payload: Dict[str, Any], include_token: bool = True) -> Dict[str, Any]:
        """POST *payload* as JSON and decode the JSON response body."""
        data = json.dumps(payload).encode("utf-8")
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        if include_token:
            headers["X-XSRF-TOKEN"] = self.xsrf_token
        raw = self._request("POST", url, data=data, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _get_json(self, url: str) -> Any:
        """GET *url* (XSRF token always attached) and decode the JSON response."""
        headers = {
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": PORTAL_ROOT,
            "X-XSRF-TOKEN": self.xsrf_token,
        }
        raw = self._request("GET", url, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _persist_cookies(self) -> None:
        """Save the cookie jar back to --cookie-file, if one was configured."""
        if not self.cookie_path:
            return
        self.cookie_path.parent.mkdir(parents=True, exist_ok=True)
        self.cookie_jar.save(self.cookie_path, ignore_discard=True, ignore_expires=True)


def _extract_cookie_value(jar: MozillaCookieJar, name: str) -> Optional[str]:
    """Return the value of cookie *name* from *jar*, or ``None`` when absent."""
    for cookie in jar:
        if cookie.name == name:
            return cookie.value
    return None


def _write_pretty_json(path: Path, document: Any) -> None:
    """Write *document* as deterministic (sorted-key, indented) UTF-8 JSON."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(document, handle, ensure_ascii=False, indent=2, sort_keys=True)
        handle.write("\n")


def scan_artifacts(root: Path) -> List[Dict[str, Any]]:
    """Collect manifest records for all search/export snapshots under *root*."""
    records: List[Dict[str, Any]] = []
    search_dir = root / "search"
    export_dir = root / "export"
    if search_dir.exists():
        for file_path in sorted(search_dir.glob("certbund-search-page-*.json")):
            records.append(_build_search_record(file_path))
    if export_dir.exists():
        for file_path in sorted(export_dir.glob("certbund-export-*.json")):
            records.append(_build_export_record(file_path))
    return records


def _build_search_record(path: Path) -> Dict[str, Any]:
    """Summarise one search-page snapshot (item count plus published-date range)."""
    with path.open("r", encoding="utf-8") as handle:
        data = json.load(handle)
    content = data.get("content") or []
    published_values: List[str] = []
    for item in content:
        # The portal has exposed the publication date under several names;
        # take the first one present.
        published = (
            item.get("published")
            or item.get("publishedAt")
            or item.get("datePublished")
            or item.get("published_date")
        )
        if isinstance(published, str):
            published_values.append(published)
    range_from = range_to = None
    if published_values:
        try:
            ordered = sorted(_parse_iso_timestamp(value) for value in published_values if value)
            range_from = ordered[0].isoformat()
            range_to = ordered[-1].isoformat()
        except ValueError:
            # Unparseable timestamps: leave the range unset rather than fail.
            range_from = range_to = None
    return {
        "type": "search",
        "path": path,
        "source": "concelier.cert-bund.search",
        "itemCount": len(content),
        "from": range_from,
        "to": range_to,
        "capturedAt": _timestamp_from_stat(path),
    }


def _build_export_record(path: Path) -> Dict[str, Any]:
    """Summarise one per-year export snapshot; the date range is derived from the filename."""
    year = _extract_year_from_filename(path.name)
    if year is not None:
        from_value = f"{year}-01-01"
        to_value = f"{year}-12-31"
    else:
        from_value = None
        to_value = None
    return {
        "type": "export",
        "path": path,
        "source": "concelier.cert-bund.export",
        "itemCount": None,
        "from": from_value,
        "to": to_value,
        "capturedAt": _timestamp_from_stat(path),
    }


def _timestamp_from_stat(path: Path) -> str:
    """Return the file's mtime as an ISO-8601 UTC timestamp."""
    stat = path.stat()
    return dt.datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()


def _extract_year_from_filename(name: str) -> Optional[int]:
    """Extract a trailing 4-digit year from names like ``certbund-export-2024.json``."""
    stem = Path(name).stem
    parts = stem.split("-")
    if parts and parts[-1].isdigit() and len(parts[-1]) == 4:
        return int(parts[-1])
    return None


def _parse_iso_timestamp(value: str) -> dt.datetime:
    """Parse an ISO-8601 timestamp, always returning a timezone-aware value.

    Naive inputs are assumed to be UTC. Previously a naive datetime could be
    returned (``fromisoformat`` accepts ``2025-10-14T06:24:49``), which made
    ``sorted()`` over mixed naive/aware values raise an uncaught TypeError in
    ``_build_search_record``.
    """
    try:
        parsed = dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        # Fallback for formats like 2025-10-14T06:24:49
        parsed = dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S")
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=UTC)
    return parsed


def build_manifest(root: Path, records: Iterable[Dict[str, Any]], manifest_path: Path) -> None:
    """Write the offline manifest plus per-artefact and manifest ``.sha256`` files."""
    manifest_entries = []
    for record in records:
        path = record["path"]
        rel_path = PurePosixPath(path.relative_to(root).as_posix())
        sha256 = hashlib.sha256(path.read_bytes()).hexdigest()
        size = path.stat().st_size
        entry = {
            "source": record["source"],
            "type": record["type"],
            "path": str(rel_path),
            "sha256": sha256,
            "sizeBytes": size,
            "capturedAt": record["capturedAt"],
            "from": record.get("from"),
            "to": record.get("to"),
            "itemCount": record.get("itemCount"),
        }
        manifest_entries.append(entry)
        sha_file = path.with_suffix(path.suffix + ".sha256")
        _write_sha_file(sha_file, sha256, path.name)
    manifest_entries.sort(key=lambda item: item["path"])
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    manifest_document = {
        "source": "concelier.cert-bund",
        "generatedAt": dt.datetime.now(tz=UTC).isoformat(),
        "artifacts": manifest_entries,
    }
    with manifest_path.open("w", encoding="utf-8") as handle:
        json.dump(manifest_document, handle, ensure_ascii=False, indent=2, sort_keys=True)
        handle.write("\n")
    manifest_sha = hashlib.sha256(manifest_path.read_bytes()).hexdigest()
    # NOTE(review): with_suffix(".sha256") replaces ".json", yielding
    # "...manifest.sha256", while artefact digests use "<name>.json.sha256".
    # Kept as-is to avoid breaking consumers — confirm the asymmetry is intended.
    _write_sha_file(manifest_path.with_suffix(".sha256"), manifest_sha, manifest_path.name)
    print(f"[certbund] manifest generated → {manifest_path}")


def _write_sha_file(path: Path, digest: str, filename: str) -> None:
    """Write a ``sha256sum``-style digest line (``<digest>  <filename>``) to *path*.

    The two-space separator matches the coreutils format so the file can be
    verified with ``sha256sum -c``. The previous implementation ignored
    *filename* and wrote the literal text "(unknown)" instead.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(f"{digest}  {filename}\n")


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Build and evaluate the command-line interface."""
    parser = argparse.ArgumentParser(
        description="Capture CERT-Bund search/export snapshots for Offline Kit packaging.",
    )
    parser.add_argument("--output", default="seed-data/cert-bund", help="Destination directory for artefacts.")
    parser.add_argument("--start-year", type=int, default=2014, help="First year (inclusive) for export snapshots.")
    parser.add_argument(
        "--end-year",
        type=int,
        default=dt.datetime.now(tz=UTC).year,
        help="Last year (inclusive) for export snapshots.",
    )
    parser.add_argument("--page-size", type=int, default=100, help="Search page size.")
    parser.add_argument("--max-pages", type=int, default=12, help="Maximum number of search result pages to capture.")
    parser.add_argument("--cookie-file", type=Path, help="Path to a Netscape cookie file to reuse/persist session cookies.")
    parser.add_argument("--xsrf-token", help="Optional explicit XSRF token value (overrides cookie discovery).")
    parser.add_argument(
        "--skip-fetch",
        action="store_true",
        help="Skip HTTP fetches and only regenerate manifest from existing files.",
    )
    parser.add_argument(
        "--no-bootstrap",
        action="store_true",
        help="Do not attempt automatic session bootstrap (use with --skip-fetch or pre-populated cookies).",
    )
    return parser.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
    """Entry point: optionally fetch fresh snapshots, then (re)generate the manifest.

    Returns 0 on success, 1 when no artefacts exist under the output directory.
    """
    options = parse_args(argv)
    dataset_root = Path(options.output).expanduser().resolve()

    if not options.skip_fetch:
        # Client construction may bootstrap the session before year validation.
        client = CertBundClient(
            cookie_file=options.cookie_file,
            xsrf_token=options.xsrf_token,
            auto_bootstrap=not options.no_bootstrap,
        )
        first_year, last_year = options.start_year, options.end_year
        if first_year > last_year:
            raise SystemExit("start-year cannot be greater than end-year.")
        client.fetch_search_pages(dataset_root / "search", options.page_size, options.max_pages)
        client.fetch_exports(dataset_root / "export", first_year, last_year)

    records = scan_artifacts(dataset_root)
    if not records:
        print(
            "[certbund] no artefacts discovered. Fetch data first or point --output to the dataset directory.",
            file=sys.stderr,
        )
        return 1

    build_manifest(
        dataset_root,
        records,
        dataset_root / "manifest" / "certbund-offline-manifest.json",
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())