445 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			445 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| Capture CERT-Bund search/export JSON snapshots and generate Offline Kit manifests.
 | |
| 
 | |
| The script can bootstrap a session against https://wid.cert-bund.de, fetch
 | |
| paginated search results plus per-year export payloads, and emit a manifest
 | |
| that records source, date range, SHA-256, and capture timestamps for each artefact.
 | |
| """
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import argparse
 | |
| import datetime as dt
 | |
| import hashlib
 | |
| import json
 | |
| import os
 | |
| from pathlib import Path, PurePosixPath
 | |
| import sys
 | |
| import time
 | |
| import urllib.error
 | |
| import urllib.parse
 | |
| import urllib.request
 | |
| from http.cookiejar import MozillaCookieJar
 | |
| from typing import Any, Dict, Iterable, List, Optional
 | |
| 
 | |
| 
 | |
# All portal URLs hang off the same origin; derive them from one place.
_PORTAL_ORIGIN = "https://wid.cert-bund.de"

# Landing page requested first when bootstrapping a session.
PORTAL_ROOT = f"{_PORTAL_ORIGIN}/portal/"
# JSON API endpoints used for paginated search, per-year export and CSRF token retrieval.
SEARCH_ENDPOINT = f"{PORTAL_ROOT}api/securityadvisory/search"
EXPORT_ENDPOINT = f"{PORTAL_ROOT}api/securityadvisory/export"
CSRF_ENDPOINT = f"{PORTAL_ROOT}api/security/csrf"
# Value sent in the User-Agent header of every request.
USER_AGENT = "StellaOps.CertBundOffline/0.1"

# Shorthand for the UTC timezone used when producing timestamps.
UTC = dt.timezone.utc
 | |
| 
 | |
| 
 | |
class CertBundClient:
    """Cookie-aware HTTP client for the CERT-Bund WID portal.

    Portal cookies are kept in a ``MozillaCookieJar`` that can be loaded from
    and saved back to a Netscape-format cookie file. API calls carry the XSRF
    token in an ``X-XSRF-TOKEN`` header; fetched search pages and per-year
    exports are written to disk as pretty-printed JSON.
    """

    def __init__(
        self,
        cookie_file: Optional[Path] = None,
        xsrf_token: Optional[str] = None,
        auto_bootstrap: bool = True,
    ) -> None:
        """Create the client and, unless disabled, bootstrap a portal session.

        Args:
            cookie_file: Optional cookie file; loaded if it exists and used as
                the target when cookies are persisted after a fetch.
            xsrf_token: Explicit XSRF token. When given it takes precedence
                over any ``XSRF-TOKEN`` cookie and suppresses the bootstrap.
            auto_bootstrap: When true and no explicit token was supplied,
                contact the portal immediately to obtain session cookies.
        """
        self.cookie_path = cookie_file
        self.cookie_jar = MozillaCookieJar()

        if self.cookie_path and self.cookie_path.exists():
            self.cookie_jar.load(self.cookie_path, ignore_discard=True, ignore_expires=True)

        self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cookie_jar))
        self.opener.addheaders = [("User-Agent", USER_AGENT)]

        self._xsrf_token = xsrf_token
        self.auto_bootstrap = auto_bootstrap

        if self.auto_bootstrap and not self._xsrf_token:
            self._bootstrap()

    @property
    def xsrf_token(self) -> str:
        """Return the XSRF token, preferring the explicit value over the cookie.

        Raises:
            RuntimeError: If no token was supplied and the cookie jar holds no
                ``XSRF-TOKEN`` cookie.
        """
        if self._xsrf_token:
            return self._xsrf_token

        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            # Cache the cookie value so later lookups skip the jar scan.
            self._xsrf_token = token
            return token

        raise RuntimeError(
            "CERT-Bund XSRF token not available. Provide --xsrf-token or a cookie file "
            "containing XSRF-TOKEN (see docs/ops/feedser-certbund-operations.md)."
        )

    def fetch_search_pages(
        self,
        destination: Path,
        page_size: int,
        max_pages: int,
    ) -> None:
        """Download up to ``max_pages`` search result pages into ``destination``.

        Pages are requested newest-first and written as
        ``certbund-search-page-NN.json``. Fetching stops at the first page
        whose ``content`` is empty; cookies are persisted afterwards.

        Raises:
            RuntimeError: If the portal answers a page request with an HTTP error.
        """
        destination.mkdir(parents=True, exist_ok=True)

        for page in range(max_pages):
            payload = {
                "page": page,
                "size": page_size,
                "sort": ["published,desc"],
            }
            try:
                document = self._post_json(SEARCH_ENDPOINT, payload)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund search page {page}: HTTP {exc.code}. "
                    "Double-check the XSRF token or portal cookies."
                ) from exc

            content = document.get("content") or []
            # An empty page after the first marks the end of the result set; do not write it.
            if not content and page > 0:
                break

            file_path = destination / f"certbund-search-page-{page:02d}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote search page {page:02d} → {file_path}")

            # Page 0 is written even when empty, but there is nothing further to fetch.
            if not content:
                break

        self._persist_cookies()

    def fetch_exports(self, destination: Path, start_year: int, end_year: int) -> None:
        """Download one JSON export per calendar year into ``destination``.

        Each year from ``start_year`` to ``end_year`` (inclusive) is requested
        with a 1 January to 31 December range and written as
        ``certbund-export-YYYY.json``; cookies are persisted afterwards.

        Raises:
            RuntimeError: If the portal answers an export request with an HTTP error.
        """
        destination.mkdir(parents=True, exist_ok=True)

        for year in range(start_year, end_year + 1):
            from_value = f"{year}-01-01"
            to_value = f"{year}-12-31"
            query = urllib.parse.urlencode({"format": "json", "from": from_value, "to": to_value})
            url = f"{EXPORT_ENDPOINT}?{query}"
            try:
                document = self._get_json(url)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund export for {year}: HTTP {exc.code}. "
                    "Ensure the XSRF token and cookies are valid."
                ) from exc

            file_path = destination / f"certbund-export-{year}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote export {year} → {file_path}")

        self._persist_cookies()

    def _bootstrap(self) -> None:
        """Try to obtain session cookies and an XSRF token from the portal.

        Failure to obtain a token is reported as a warning on stderr rather
        than raised, so a token can still be supplied by other means.

        Raises:
            RuntimeError: If the initial request to the portal root fails with
                an HTTP error.
        """
        try:
            self._request("GET", PORTAL_ROOT, headers={"Accept": "text/html,application/xhtml+xml"})
        except urllib.error.HTTPError as exc:
            raise RuntimeError(f"Failed to bootstrap CERT-Bund session: HTTP {exc.code}") from exc

        # First attempt to obtain CSRF token directly.
        self._attempt_csrf_fetch()

        if _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN"):
            return

        # If the token is still missing, trigger the search endpoint once (likely 403)
        # to make the portal materialise JSESSIONID, then retry token acquisition.
        try:
            payload = {"page": 0, "size": 1, "sort": ["published,desc"]}
            self._post_json(SEARCH_ENDPOINT, payload, include_token=False)
        except (urllib.error.HTTPError, ValueError):
            # Only the cookies set by this probe matter; its body is discarded, so a
            # non-JSON or undecodable reply must not abort the bootstrap either.
            pass

        self._attempt_csrf_fetch()

        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            self._xsrf_token = token
        else:
            print(
                "[certbund] warning: automatic XSRF token retrieval failed. "
                "Supply --xsrf-token or reuse a browser-exported cookies file.",
                file=sys.stderr,
            )

    def _attempt_csrf_fetch(self) -> None:
        """Request the CSRF endpoint for its cookies, ignoring HTTP errors."""
        headers = {
            "Accept": "application/json, text/plain, */*",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        try:
            self._request("GET", CSRF_ENDPOINT, headers=headers)
        except urllib.error.HTTPError:
            # Best effort: the caller checks the cookie jar afterwards.
            pass

    def _request(self, method: str, url: str, data: Optional[bytes] = None, headers: Optional[Dict[str, str]] = None) -> bytes:
        """Send one HTTP request through the cookie-aware opener.

        Args:
            method: HTTP method name.
            url: Absolute request URL.
            data: Optional request body.
            headers: Optional headers; these override the defaults
                (``User-Agent`` and ``Accept: application/json``).

        Returns:
            The raw response body.

        Raises:
            urllib.error.HTTPError: Propagated from the opener for HTTP error statuses.
        """
        request = urllib.request.Request(url, data=data, method=method)
        default_headers = {
            "User-Agent": USER_AGENT,
            "Accept": "application/json",
        }
        for key, value in default_headers.items():
            request.add_header(key, value)

        if headers:
            for key, value in headers.items():
                request.add_header(key, value)

        # Close the response deterministically instead of leaving the
        # connection open until garbage collection.
        with self.opener.open(request, timeout=60) as response:
            return response.read()

    def _post_json(self, url: str, payload: Dict[str, Any], include_token: bool = True) -> Dict[str, Any]:
        """POST ``payload`` as JSON and return the decoded JSON response.

        Args:
            url: Endpoint to call.
            payload: JSON-serialisable request body.
            include_token: When true, send the XSRF token header (which raises
                ``RuntimeError`` if no token is available).
        """
        data = json.dumps(payload).encode("utf-8")
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        if include_token:
            headers["X-XSRF-TOKEN"] = self.xsrf_token

        raw = self._request("POST", url, data=data, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _get_json(self, url: str) -> Any:
        """GET ``url`` with the XSRF token header and return the decoded JSON body."""
        headers = {
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": PORTAL_ROOT,
        }
        headers["X-XSRF-TOKEN"] = self.xsrf_token

        raw = self._request("GET", url, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _persist_cookies(self) -> None:
        """Save the cookie jar to the configured cookie file, if one was given."""
        if not self.cookie_path:
            return

        self.cookie_path.parent.mkdir(parents=True, exist_ok=True)
        self.cookie_jar.save(self.cookie_path, ignore_discard=True, ignore_expires=True)
 | |
| 
 | |
| 
 | |
def _extract_cookie_value(jar: MozillaCookieJar, name: str) -> Optional[str]:
    """Return the value of the first cookie in ``jar`` called ``name``, or ``None``."""
    matches = (entry.value for entry in jar if entry.name == name)
    return next(matches, None)
 | |
| 
 | |
| 
 | |
def _write_pretty_json(path: Path, document: Any) -> None:
    """Write ``document`` to ``path`` as indented, key-sorted UTF-8 JSON.

    Missing parent directories are created, non-ASCII characters are written
    verbatim, and the file ends with a single newline.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with open(path, mode="w", encoding="utf-8") as sink:
        json.dump(document, sink, indent=2, sort_keys=True, ensure_ascii=False)
        sink.write("\n")
 | |
| 
 | |
| 
 | |
def scan_artifacts(root: Path) -> List[Dict[str, Any]]:
    """Collect manifest records for every captured artefact below ``root``.

    Search pages under ``root/search`` come first, followed by exports under
    ``root/export``; each group is ordered by sorted file path. A missing
    directory simply contributes no records.
    """
    found: List[Dict[str, Any]] = []

    search_root = root / "search"
    export_root = root / "export"

    if search_root.exists():
        found.extend(
            _build_search_record(candidate)
            for candidate in sorted(search_root.glob("certbund-search-page-*.json"))
        )

    if export_root.exists():
        found.extend(
            _build_export_record(candidate)
            for candidate in sorted(export_root.glob("certbund-export-*.json"))
        )

    return found
 | |
| 
 | |
| 
 | |
def _build_search_record(path: Path) -> Dict[str, Any]:
    """Build the manifest record for one captured search page.

    The page's ``content`` items are scanned for a publication timestamp under
    any of several candidate keys; the earliest and latest parsed values become
    the record's ``from``/``to`` range. The range is ``None`` when no usable
    timestamp is present or when any collected value cannot be parsed or
    ordered.

    Args:
        path: Location of the search page JSON file.

    Returns:
        A record with ``type``, ``path``, ``source``, ``itemCount``, ``from``,
        ``to`` and ``capturedAt`` keys.
    """
    with path.open("r", encoding="utf-8") as handle:
        data = json.load(handle)

    content = data.get("content") or []
    published_values: List[str] = []
    for item in content:
        published = (
            item.get("published")
            or item.get("publishedAt")
            or item.get("datePublished")
            or item.get("published_date")
        )
        # Skip empty strings here: filtering them later could leave nothing to
        # index into and raise IndexError.
        if isinstance(published, str) and published:
            published_values.append(published)

    range_from = range_to = None
    try:
        ordered = sorted(_parse_iso_timestamp(value) for value in published_values)
    except (ValueError, TypeError):
        # ValueError: a value is not a recognised timestamp.
        # TypeError: naive and timezone-aware values cannot be compared.
        ordered = []

    if ordered:
        range_from = ordered[0].isoformat()
        range_to = ordered[-1].isoformat()

    return {
        "type": "search",
        "path": path,
        "source": "feedser.cert-bund.search",
        "itemCount": len(content),
        "from": range_from,
        "to": range_to,
        "capturedAt": _timestamp_from_stat(path),
    }
 | |
| 
 | |
| 
 | |
def _build_export_record(path: Path) -> Dict[str, Any]:
    """Build the manifest record for one per-year export file.

    The covered range is the full calendar year taken from the file name; when
    the name carries no four-digit year both bounds are ``None``. Exports do
    not report an item count.
    """
    year = _extract_year_from_filename(path.name)
    if year is None:
        covered_from = covered_to = None
    else:
        covered_from, covered_to = f"{year}-01-01", f"{year}-12-31"

    record: Dict[str, Any] = {
        "type": "export",
        "path": path,
        "source": "feedser.cert-bund.export",
        "itemCount": None,
        "from": covered_from,
        "to": covered_to,
    }
    record["capturedAt"] = _timestamp_from_stat(path)
    return record
 | |
| 
 | |
| 
 | |
def _timestamp_from_stat(path: Path) -> str:
    """Return the file's modification time as an ISO-8601 string in UTC."""
    modified_epoch = path.stat().st_mtime
    modified_at = dt.datetime.fromtimestamp(modified_epoch, tz=UTC)
    return modified_at.isoformat()
 | |
| 
 | |
| 
 | |
def _extract_year_from_filename(name: str) -> Optional[int]:
    """Return the four-digit year that ends the file stem, or ``None``.

    The year is the text after the last ``-`` of the stem (the whole stem when
    there is no ``-``) and must be exactly four digits.
    """
    tail = Path(name).stem.rsplit("-", 1)[-1]
    return int(tail) if tail.isdigit() and len(tail) == 4 else None
 | |
| 
 | |
| 
 | |
def _parse_iso_timestamp(value: str) -> dt.datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    A ``Z`` suffix is read as ``+00:00``. Timestamps without any offset are
    taken to be UTC, so every returned value is timezone-aware and results can
    be compared and sorted with one another.

    Raises:
        ValueError: If ``value`` is not a recognised timestamp.
    """
    try:
        parsed = dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        # Fallback for formats like 2025-10-14T06:24:49
        parsed = dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S")

    # fromisoformat accepts offset-less input and returns a naive datetime;
    # attach UTC so naive and aware values are never mixed downstream.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed
 | |
| 
 | |
| 
 | |
def build_manifest(root: Path, records: Iterable[Dict[str, Any]], manifest_path: Path) -> None:
    """Write the offline manifest plus SHA-256 sidecar files.

    For each record the artefact is hashed, a ``<name>.sha256`` sidecar is
    written next to it, and a manifest entry with its POSIX-style path relative
    to ``root`` is collected. Entries are sorted by path and written to
    ``manifest_path`` together with a generation timestamp; the manifest gets
    its own ``.sha256`` file.

    Args:
        root: Directory the artefact paths are made relative to.
        records: Records carrying ``path``, ``source``, ``type`` and
            ``capturedAt``, optionally ``from``, ``to`` and ``itemCount``.
        manifest_path: Destination of the manifest JSON file.
    """
    artifacts = []
    for item in records:
        artifact = item["path"]
        relative = PurePosixPath(artifact.relative_to(root).as_posix())
        digest = hashlib.sha256(artifact.read_bytes()).hexdigest()
        byte_count = artifact.stat().st_size

        artifacts.append(
            {
                "source": item["source"],
                "type": item["type"],
                "path": str(relative),
                "sha256": digest,
                "sizeBytes": byte_count,
                "capturedAt": item["capturedAt"],
                "from": item.get("from"),
                "to": item.get("to"),
                "itemCount": item.get("itemCount"),
            }
        )

        # Sidecar keeps the original suffix, e.g. "x.json" -> "x.json.sha256".
        sidecar = artifact.with_suffix(artifact.suffix + ".sha256")
        _write_sha_file(sidecar, digest, artifact.name)

    artifacts.sort(key=lambda entry: entry["path"])

    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    document = {
        "source": "feedser.cert-bund",
        "generatedAt": dt.datetime.now(tz=UTC).isoformat(),
        "artifacts": artifacts,
    }

    with open(manifest_path, mode="w", encoding="utf-8") as sink:
        json.dump(document, sink, indent=2, sort_keys=True, ensure_ascii=False)
        sink.write("\n")

    # The manifest's own checksum file replaces the suffix instead of appending to it.
    manifest_digest = hashlib.sha256(manifest_path.read_bytes()).hexdigest()
    _write_sha_file(manifest_path.with_suffix(".sha256"), manifest_digest, manifest_path.name)

    print(f"[certbund] manifest generated → {manifest_path}")
 | |
| 
 | |
| 
 | |
def _write_sha_file(path: Path, digest: str, filename: str) -> None:
    """Write a one-line checksum file: digest, two spaces, file name, newline.

    Missing parent directories of ``path`` are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    checksum_line = f"{digest}  {filename}\n"
    with open(path, mode="w", encoding="utf-8") as sink:
        sink.write(checksum_line)
 | |
| 
 | |
| 
 | |
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the capture script.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The populated namespace. ``--end-year`` defaults to the current UTC year.
    """
    current_year = dt.datetime.now(tz=UTC).year

    parser = argparse.ArgumentParser(
        description="Capture CERT-Bund search/export snapshots for Offline Kit packaging.",
    )
    option = parser.add_argument

    # Where artefacts go and which years/pages to capture.
    option("--output", default="seed-data/cert-bund", help="Destination directory for artefacts.")
    option("--start-year", type=int, default=2014, help="First year (inclusive) for export snapshots.")
    option("--end-year", type=int, default=current_year, help="Last year (inclusive) for export snapshots.")
    option("--page-size", type=int, default=100, help="Search page size.")
    option("--max-pages", type=int, default=12, help="Maximum number of search result pages to capture.")

    # Session handling.
    option("--cookie-file", type=Path, help="Path to a Netscape cookie file to reuse/persist session cookies.")
    option("--xsrf-token", help="Optional explicit XSRF token value (overrides cookie discovery).")

    # Behaviour switches.
    option(
        "--skip-fetch",
        action="store_true",
        help="Skip HTTP fetches and only regenerate manifest from existing files.",
    )
    option(
        "--no-bootstrap",
        action="store_true",
        help="Do not attempt automatic session bootstrap (use with --skip-fetch or pre-populated cookies).",
    )

    return parser.parse_args(argv)
 | |
| 
 | |
| 
 | |
def main(argv: Optional[List[str]] = None) -> int:
    """Run the capture workflow and return a process exit code.

    Unless ``--skip-fetch`` is given, search pages and per-year exports are
    downloaded into the output directory. The directory is then scanned and a
    manifest is generated from whatever artefacts it contains.

    Args:
        argv: Argument list; ``None`` uses the process arguments.

    Returns:
        ``0`` on success, ``1`` when no artefacts were found to describe.

    Raises:
        SystemExit: If ``--start-year`` is greater than ``--end-year``.
    """
    args = parse_args(argv)
    output_dir = Path(args.output).expanduser().resolve()

    if not args.skip_fetch:
        start_year = args.start_year
        end_year = args.end_year
        # Reject a bad year range before creating the client: constructing it
        # may already bootstrap a session over the network.
        if start_year > end_year:
            raise SystemExit("start-year cannot be greater than end-year.")

        client = CertBundClient(
            cookie_file=args.cookie_file,
            xsrf_token=args.xsrf_token,
            auto_bootstrap=not args.no_bootstrap,
        )

        client.fetch_search_pages(output_dir / "search", args.page_size, args.max_pages)
        client.fetch_exports(output_dir / "export", start_year, end_year)

    records = scan_artifacts(output_dir)
    if not records:
        print(
            "[certbund] no artefacts discovered. Fetch data first or point --output to the dataset directory.",
            file=sys.stderr,
        )
        return 1

    manifest_path = output_dir / "manifest" / "certbund-offline-manifest.json"
    build_manifest(output_dir, records, manifest_path)
    return 0
 | |
| 
 | |
| 
 | |
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
 |