445 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			445 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						|
"""
 | 
						|
Capture CERT-Bund search/export JSON snapshots and generate Offline Kit manifests.
 | 
						|
 | 
						|
The script can bootstrap a session against https://wid.cert-bund.de, fetch
 | 
						|
paginated search results plus per-year export payloads, and emit a manifest
 | 
						|
that records source, date range, SHA-256, and capture timestamps for each artefact.
 | 
						|
"""
 | 
						|
 | 
						|
from __future__ import annotations
 | 
						|
 | 
						|
import argparse
 | 
						|
import datetime as dt
 | 
						|
import hashlib
 | 
						|
import json
 | 
						|
import os
 | 
						|
from pathlib import Path, PurePosixPath
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import urllib.error
 | 
						|
import urllib.parse
 | 
						|
import urllib.request
 | 
						|
from http.cookiejar import MozillaCookieJar
 | 
						|
from typing import Any, Dict, Iterable, List, Optional
 | 
						|
 | 
						|
 | 
						|
# Scheme and host of the CERT-Bund WID portal; every URL below is derived from it.
_PORTAL_ORIGIN = "https://wid.cert-bund.de"

# Portal landing page, fetched during session bootstrap and sent as the Referer.
PORTAL_ROOT = f"{_PORTAL_ORIGIN}/portal/"
# JSON API endpoints: paginated search, per-year export, and CSRF token retrieval.
SEARCH_ENDPOINT = f"{PORTAL_ROOT}api/securityadvisory/search"
EXPORT_ENDPOINT = f"{PORTAL_ROOT}api/securityadvisory/export"
CSRF_ENDPOINT = f"{PORTAL_ROOT}api/security/csrf"
# Value sent in the User-Agent header of every request.
USER_AGENT = "StellaOps.CertBundOffline/0.1"

# Shorthand used wherever a timezone-aware UTC timestamp is produced.
UTC = dt.timezone.utc
 | 
						|
 | 
						|
 | 
						|
class CertBundClient:
    """Small HTTP client for the CERT-Bund WID portal JSON API.

    Wraps a cookie-aware ``urllib`` opener, resolves the XSRF token that is
    sent as ``X-XSRF-TOKEN`` on API calls, and stores search/export responses
    on disk as pretty-printed JSON snapshots.
    """

    def __init__(
        self,
        cookie_file: Optional[Path] = None,
        xsrf_token: Optional[str] = None,
        auto_bootstrap: bool = True,
    ) -> None:
        """Create the client and, unless disabled, bootstrap a portal session.

        Args:
            cookie_file: Netscape-format cookie file. Loaded when it already
                exists and rewritten after each fetch run.
            xsrf_token: Explicit XSRF token. When given it is used as-is and
                no bootstrap is attempted.
            auto_bootstrap: When true and no token was supplied, contact the
                portal immediately to obtain cookies and a token.

        Raises:
            RuntimeError: If the bootstrap request for the portal root fails
                with an HTTP error.
        """
        self.cookie_path = cookie_file
        self.cookie_jar = MozillaCookieJar()

        if self.cookie_path and self.cookie_path.exists():
            self.cookie_jar.load(self.cookie_path, ignore_discard=True, ignore_expires=True)

        cookie_handler = urllib.request.HTTPCookieProcessor(self.cookie_jar)
        self.opener = urllib.request.build_opener(cookie_handler)
        self.opener.addheaders = [("User-Agent", USER_AGENT)]

        self._xsrf_token = xsrf_token
        self.auto_bootstrap = auto_bootstrap

        if self.auto_bootstrap and not self._xsrf_token:
            self._bootstrap()

    @property
    def xsrf_token(self) -> str:
        """Return the XSRF token, discovering it from the cookie jar if needed.

        The explicit/cached token wins; otherwise the ``XSRF-TOKEN`` cookie is
        looked up and cached for later calls.

        Raises:
            RuntimeError: If neither an explicit token nor the cookie exists.
        """
        if not self._xsrf_token:
            cookie_token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
            if not cookie_token:
                raise RuntimeError(
                    "CERT-Bund XSRF token not available. Provide --xsrf-token or a cookie file "
                    "containing XSRF-TOKEN (see docs/ops/feedser-certbund-operations.md)."
                )
            self._xsrf_token = cookie_token
        return self._xsrf_token

    def fetch_search_pages(
        self,
        destination: Path,
        page_size: int,
        max_pages: int,
    ) -> None:
        """Download up to *max_pages* search result pages into *destination*.

        Pages are requested newest-first (``published,desc``) and written as
        ``certbund-search-page-NN.json``. Fetching stops at the first page
        whose ``content`` is empty; page 0 is written even when it is empty.
        Cookies are persisted once the loop finishes.

        Raises:
            RuntimeError: If a page request fails with an HTTP error.
        """
        destination.mkdir(parents=True, exist_ok=True)

        for page in range(max_pages):
            payload = {
                "page": page,
                "size": page_size,
                "sort": ["published,desc"],
            }
            try:
                document = self._post_json(SEARCH_ENDPOINT, payload)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund search page {page}: HTTP {exc.code}. "
                    "Double-check the XSRF token or portal cookies."
                ) from exc

            content = document.get("content") or []
            # An empty page after the first marks the end of the result set and is not stored.
            if not content and page > 0:
                break

            file_path = destination / f"certbund-search-page-{page:02d}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote search page {page:02d} → {file_path}")

            # Only reachable for page 0: an empty first page is stored, then the loop ends.
            if not content:
                break

        self._persist_cookies()

    def fetch_exports(self, destination: Path, start_year: int, end_year: int) -> None:
        """Download one JSON export per calendar year in ``[start_year, end_year]``.

        Each year is requested as ``from=<year>-01-01`` / ``to=<year>-12-31``
        and written to ``certbund-export-<year>.json`` inside *destination*.
        Cookies are persisted once all years have been fetched.

        Raises:
            RuntimeError: If an export request fails with an HTTP error.
        """
        destination.mkdir(parents=True, exist_ok=True)

        for year in range(start_year, end_year + 1):
            query = urllib.parse.urlencode(
                {"format": "json", "from": f"{year}-01-01", "to": f"{year}-12-31"}
            )
            url = f"{EXPORT_ENDPOINT}?{query}"
            try:
                document = self._get_json(url)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund export for {year}: HTTP {exc.code}. "
                    "Ensure the XSRF token and cookies are valid."
                ) from exc

            file_path = destination / f"certbund-export-{year}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote export {year} → {file_path}")

        self._persist_cookies()

    def _bootstrap(self) -> None:
        """Try to obtain session cookies and an ``XSRF-TOKEN`` cookie.

        Failure to obtain a token is reported as a warning on stderr rather
        than raised; the ``xsrf_token`` property raises later if it is needed.

        Raises:
            RuntimeError: If the initial portal root request fails with an
                HTTP error.
        """
        try:
            self._request("GET", PORTAL_ROOT, headers={"Accept": "text/html,application/xhtml+xml"})
        except urllib.error.HTTPError as exc:
            raise RuntimeError(f"Failed to bootstrap CERT-Bund session: HTTP {exc.code}") from exc

        # First attempt to obtain CSRF token directly.
        self._attempt_csrf_fetch()
        if _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN"):
            return

        # If the token is still missing, trigger the search endpoint once (likely 403)
        # to make the portal materialise JSESSIONID, then retry token acquisition.
        try:
            probe = {"page": 0, "size": 1, "sort": ["published,desc"]}
            self._post_json(SEARCH_ENDPOINT, probe, include_token=False)
        except (urllib.error.HTTPError, ValueError):
            # The probe matters only for the cookies it sets, so an HTTP error or a
            # body that is not valid JSON must not abort the bootstrap.
            pass

        self._attempt_csrf_fetch()

        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            self._xsrf_token = token
        else:
            print(
                "[certbund] warning: automatic XSRF token retrieval failed. "
                "Supply --xsrf-token or reuse a browser-exported cookies file.",
                file=sys.stderr,
            )

    def _attempt_csrf_fetch(self) -> None:
        """Request the CSRF endpoint, ignoring HTTP errors.

        The response body is discarded; the call exists for whatever cookies
        the opener's cookie jar records from the response.
        """
        headers = {
            "Accept": "application/json, text/plain, */*",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        try:
            self._request("GET", CSRF_ENDPOINT, headers=headers)
        except urllib.error.HTTPError:
            pass

    def _request(self, method: str, url: str, data: Optional[bytes] = None, headers: Optional[Dict[str, str]] = None) -> bytes:
        """Send one HTTP request through the cookie-aware opener and return the body.

        Default ``User-Agent`` and ``Accept`` headers are applied first, then
        *headers*, so caller-supplied values override the defaults.

        Raises:
            urllib.error.HTTPError: Propagated from the opener on HTTP errors.
        """
        request = urllib.request.Request(url, data=data, method=method)

        merged_headers = {
            "User-Agent": USER_AGENT,
            "Accept": "application/json",
        }
        for key, value in merged_headers.items():
            request.add_header(key, value)
        for key, value in (headers or {}).items():
            request.add_header(key, value)

        # Use the response as a context manager so the connection is released
        # even when read() raises.
        with self.opener.open(request, timeout=60) as response:
            return response.read()

    def _post_json(self, url: str, payload: Dict[str, Any], include_token: bool = True) -> Dict[str, Any]:
        """POST *payload* as JSON and return the decoded JSON response.

        Args:
            url: Target endpoint.
            payload: Object serialised as the request body.
            include_token: When true, send the XSRF token as ``X-XSRF-TOKEN``
                (which raises ``RuntimeError`` if no token is available).
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        if include_token:
            headers["X-XSRF-TOKEN"] = self.xsrf_token

        body = json.dumps(payload).encode("utf-8")
        raw = self._request("POST", url, data=body, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _get_json(self, url: str) -> Any:
        """GET *url* with the XSRF token header and return the decoded JSON response."""
        headers = {
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": PORTAL_ROOT,
            "X-XSRF-TOKEN": self.xsrf_token,
        }
        raw = self._request("GET", url, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _persist_cookies(self) -> None:
        """Save the cookie jar to ``cookie_path``; do nothing when no path was configured."""
        if not self.cookie_path:
            return

        self.cookie_path.parent.mkdir(parents=True, exist_ok=True)
        self.cookie_jar.save(self.cookie_path, ignore_discard=True, ignore_expires=True)
 | 
						|
 | 
						|
 | 
						|
def _extract_cookie_value(jar: MozillaCookieJar, name: str) -> Optional[str]:
 | 
						|
    for cookie in jar:
 | 
						|
        if cookie.name == name:
 | 
						|
            return cookie.value
 | 
						|
    return None
 | 
						|
 | 
						|
 | 
						|
def _write_pretty_json(path: Path, document: Any) -> None:
 | 
						|
    path.parent.mkdir(parents=True, exist_ok=True)
 | 
						|
    with path.open("w", encoding="utf-8") as handle:
 | 
						|
        json.dump(document, handle, ensure_ascii=False, indent=2, sort_keys=True)
 | 
						|
        handle.write("\n")
 | 
						|
 | 
						|
 | 
						|
def scan_artifacts(root: Path) -> List[Dict[str, Any]]:
    """Collect manifest records for every snapshot found under *root*.

    Looks for ``search/certbund-search-page-*.json`` and
    ``export/certbund-export-*.json``. Search records come first, and files
    within each directory are processed in sorted path order. A missing
    directory contributes nothing.
    """
    search_dir, export_dir = root / "search", root / "export"
    found: List[Dict[str, Any]] = []

    if search_dir.exists():
        found.extend(
            _build_search_record(snapshot)
            for snapshot in sorted(search_dir.glob("certbund-search-page-*.json"))
        )

    if export_dir.exists():
        found.extend(
            _build_export_record(snapshot)
            for snapshot in sorted(export_dir.glob("certbund-export-*.json"))
        )

    return found
 | 
						|
 | 
						|
 | 
						|
def _build_search_record(path: Path) -> Dict[str, Any]:
    """Build the manifest record for one captured search page.

    Reads the JSON file at *path*, counts the entries under ``content`` and
    derives the covered date range from each entry's publication timestamp
    (first truthy value of ``published``, ``publishedAt``, ``datePublished``,
    ``published_date``).

    Returns:
        A dict with ``type``, ``path``, ``source``, ``itemCount``, ``from``,
        ``to`` and ``capturedAt``. ``from``/``to`` are ISO-8601 strings, or
        ``None`` when no usable timestamps are present.
    """
    with path.open("r", encoding="utf-8") as handle:
        data = json.load(handle)

    content = data.get("content") or []
    published_values: List[str] = []
    for item in content:
        published = (
            item.get("published")
            or item.get("publishedAt")
            or item.get("datePublished")
            or item.get("published_date")
        )
        # Keep only non-empty strings. An empty string used to be collected here
        # and filtered out later, leaving nothing to index and raising IndexError.
        if isinstance(published, str) and published:
            published_values.append(published)

    range_from = range_to = None
    if published_values:
        try:
            ordered = sorted(_parse_iso_timestamp(value) for value in published_values)
        except (TypeError, ValueError):
            # ValueError: a timestamp could not be parsed.
            # TypeError: naive and timezone-aware datetimes cannot be ordered together.
            # Either way the range is unknown, so both bounds stay None.
            pass
        else:
            range_from = ordered[0].isoformat()
            range_to = ordered[-1].isoformat()

    return {
        "type": "search",
        "path": path,
        "source": "feedser.cert-bund.search",
        "itemCount": len(content),
        "from": range_from,
        "to": range_to,
        "capturedAt": _timestamp_from_stat(path),
    }
 | 
						|
 | 
						|
 | 
						|
def _build_export_record(path: Path) -> Dict[str, Any]:
    """Build the manifest record for one per-year export file.

    The covered range is the full calendar year taken from the file name
    (``<year>-01-01`` to ``<year>-12-31``). Both bounds are ``None`` when the
    name carries no year. ``itemCount`` is always ``None`` for exports.
    """
    year = _extract_year_from_filename(path.name)
    if year is None:
        bounds = (None, None)
    else:
        bounds = (f"{year}-01-01", f"{year}-12-31")
    range_start, range_end = bounds

    return {
        "type": "export",
        "path": path,
        "source": "feedser.cert-bund.export",
        "itemCount": None,
        "from": range_start,
        "to": range_end,
        "capturedAt": _timestamp_from_stat(path),
    }
 | 
						|
 | 
						|
 | 
						|
def _timestamp_from_stat(path: Path) -> str:
    """Return the modification time of *path* as a timezone-aware UTC ISO-8601 string."""
    modified_epoch = path.stat().st_mtime
    modified_at = dt.datetime.fromtimestamp(modified_epoch, tz=UTC)
    return modified_at.isoformat()
 | 
						|
 | 
						|
 | 
						|
def _extract_year_from_filename(name: str) -> Optional[int]:
 | 
						|
    stem = Path(name).stem
 | 
						|
    parts = stem.split("-")
 | 
						|
    if parts and parts[-1].isdigit() and len(parts[-1]) == 4:
 | 
						|
        return int(parts[-1])
 | 
						|
    return None
 | 
						|
 | 
						|
 | 
						|
def _parse_iso_timestamp(value: str) -> dt.datetime:
 | 
						|
    try:
 | 
						|
        return dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
 | 
						|
    except ValueError:
 | 
						|
        # Fallback for formats like 2025-10-14T06:24:49
 | 
						|
        return dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=UTC)
 | 
						|
 | 
						|
 | 
						|
def build_manifest(root: Path, records: Iterable[Dict[str, Any]], manifest_path: Path) -> None:
    """Write the offline manifest plus SHA-256 sidecar files for all *records*.

    For every record the artefact is hashed, described by an entry whose
    ``path`` is POSIX-style and relative to *root*, and given a sidecar named
    ``<artefact name>.sha256``. Entries are sorted by path and written to
    *manifest_path* together with a ``generatedAt`` UTC timestamp.

    Args:
        root: Directory the manifest paths are made relative to.
        records: Records as produced by ``scan_artifacts``.
        manifest_path: Destination of the manifest JSON document.
    """
    artifacts = []
    for record in records:
        artifact = record["path"]
        relative = PurePosixPath(artifact.relative_to(root).as_posix())
        digest = hashlib.sha256(artifact.read_bytes()).hexdigest()
        byte_count = artifact.stat().st_size

        artifacts.append(
            {
                "source": record["source"],
                "type": record["type"],
                "path": str(relative),
                "sha256": digest,
                "sizeBytes": byte_count,
                "capturedAt": record["capturedAt"],
                "from": record.get("from"),
                "to": record.get("to"),
                "itemCount": record.get("itemCount"),
            }
        )

        # Artefact sidecars append ".sha256" to the full file name (x.json -> x.json.sha256).
        sidecar = artifact.with_suffix(artifact.suffix + ".sha256")
        _write_sha_file(sidecar, digest, artifact.name)

    artifacts = sorted(artifacts, key=lambda entry: entry["path"])

    # _write_pretty_json creates the parent directory and emits indented,
    # key-sorted UTF-8 JSON with a trailing newline.
    _write_pretty_json(
        manifest_path,
        {
            "source": "feedser.cert-bund",
            "generatedAt": dt.datetime.now(tz=UTC).isoformat(),
            "artifacts": artifacts,
        },
    )

    # NOTE: unlike artefact sidecars, the manifest sidecar replaces the suffix
    # (manifest.json -> manifest.sha256) instead of appending to it.
    manifest_digest = hashlib.sha256(manifest_path.read_bytes()).hexdigest()
    _write_sha_file(manifest_path.with_suffix(".sha256"), manifest_digest, manifest_path.name)

    print(f"[certbund] manifest generated → {manifest_path}")
 | 
						|
 | 
						|
 | 
						|
def _write_sha_file(path: Path, digest: str, filename: str) -> None:
 | 
						|
    path.parent.mkdir(parents=True, exist_ok=True)
 | 
						|
    with path.open("w", encoding="utf-8") as handle:
 | 
						|
        handle.write(f"{digest}  {filename}\n")
 | 
						|
 | 
						|
 | 
						|
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the capture script.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Namespace with ``output``, ``start_year``, ``end_year``, ``page_size``,
        ``max_pages``, ``cookie_file``, ``xsrf_token``, ``skip_fetch`` and
        ``no_bootstrap``.
    """
    # --end-year defaults to the current UTC year, evaluated at parse time.
    current_year = dt.datetime.now(tz=UTC).year

    parser = argparse.ArgumentParser(
        description="Capture CERT-Bund search/export snapshots for Offline Kit packaging.",
    )
    add = parser.add_argument

    # Output location and export year range.
    add("--output", default="seed-data/cert-bund", help="Destination directory for artefacts.")
    add("--start-year", type=int, default=2014, help="First year (inclusive) for export snapshots.")
    add("--end-year", type=int, default=current_year, help="Last year (inclusive) for export snapshots.")

    # Search pagination.
    add("--page-size", type=int, default=100, help="Search page size.")
    add("--max-pages", type=int, default=12, help="Maximum number of search result pages to capture.")

    # Session handling.
    add("--cookie-file", type=Path, help="Path to a Netscape cookie file to reuse/persist session cookies.")
    add("--xsrf-token", help="Optional explicit XSRF token value (overrides cookie discovery).")

    # Behaviour switches.
    add(
        "--skip-fetch",
        action="store_true",
        help="Skip HTTP fetches and only regenerate manifest from existing files.",
    )
    add(
        "--no-bootstrap",
        action="store_true",
        help="Do not attempt automatic session bootstrap (use with --skip-fetch or pre-populated cookies).",
    )

    return parser.parse_args(argv)
 | 
						|
 | 
						|
 | 
						|
def main(argv: Optional[List[str]] = None) -> int:
    """Run the capture workflow: optionally fetch snapshots, then build the manifest.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv``.

    Returns:
        ``0`` when a manifest was generated, ``1`` when no artefacts were found
        under the output directory.

    Raises:
        SystemExit: If ``--start-year`` is greater than ``--end-year``.
    """
    args = parse_args(argv)
    output_dir = Path(args.output).expanduser().resolve()

    if not args.skip_fetch:
        # Validate before creating the client: its constructor may already
        # contact the portal, so a bad year range should fail without network traffic.
        if args.start_year > args.end_year:
            raise SystemExit("start-year cannot be greater than end-year.")

        client = CertBundClient(
            cookie_file=args.cookie_file,
            xsrf_token=args.xsrf_token,
            auto_bootstrap=not args.no_bootstrap,
        )

        client.fetch_search_pages(output_dir / "search", args.page_size, args.max_pages)
        client.fetch_exports(output_dir / "export", args.start_year, args.end_year)

    records = scan_artifacts(output_dir)
    if not records:
        print(
            "[certbund] no artefacts discovered. Fetch data first or point --output to the dataset directory.",
            file=sys.stderr,
        )
        return 1

    manifest_path = output_dir / "manifest" / "certbund-offline-manifest.json"
    build_manifest(output_dir, records, manifest_path)
    return 0
 | 
						|
 | 
						|
 | 
						|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
 |