Files
git.stella-ops.org/tools/certbund_offline_snapshot.py
2025-10-18 20:46:16 +03:00

445 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Capture CERT-Bund search/export JSON snapshots and generate Offline Kit manifests.
The script can bootstrap a session against https://wid.cert-bund.de, fetch
paginated search results plus per-year export payloads, and emit a manifest
that records source, date range, SHA-256, and capture timestamps for each artefact.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import json
import os
from pathlib import Path, PurePosixPath
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from http.cookiejar import MozillaCookieJar
from typing import Any, Dict, Iterable, List, Optional
# Landing page of the CERT-Bund WID portal; also sent as the Referer header.
PORTAL_ROOT = "https://wid.cert-bund.de/portal/"
# JSON API endpoints, all located below the portal root.
SEARCH_ENDPOINT = f"{PORTAL_ROOT}api/securityadvisory/search"
EXPORT_ENDPOINT = f"{PORTAL_ROOT}api/securityadvisory/export"
CSRF_ENDPOINT = f"{PORTAL_ROOT}api/security/csrf"
# Value of the User-Agent header sent with every request.
USER_AGENT = "StellaOps.CertBundOffline/0.1"
# Short alias used wherever timezone-aware timestamps are produced.
UTC = dt.timezone.utc
class CertBundClient:
    """Cookie-aware HTTP client for the CERT-Bund WID portal JSON endpoints.

    Cookies are kept in a ``MozillaCookieJar`` that can be pre-loaded from,
    and saved back to, a Netscape-format cookie file. API calls carry the
    XSRF token in the ``X-XSRF-TOKEN`` request header.
    """

    def __init__(
        self,
        cookie_file: Optional[Path] = None,
        xsrf_token: Optional[str] = None,
        auto_bootstrap: bool = True,
    ) -> None:
        """Create the client and optionally bootstrap a portal session.

        Args:
            cookie_file: Cookie file to load (when it exists) and to write
                back after a successful fetch run.
            xsrf_token: Explicit XSRF token; when given, no bootstrap is
                attempted and cookie discovery is skipped.
            auto_bootstrap: When true and no token was supplied, contact the
                portal immediately to obtain session cookies and a token.

        Raises:
            RuntimeError: If the bootstrap request to the portal root fails
                with an HTTP error.
        """
        self.cookie_path = cookie_file
        self.cookie_jar = MozillaCookieJar()
        if self.cookie_path and self.cookie_path.exists():
            self.cookie_jar.load(self.cookie_path, ignore_discard=True, ignore_expires=True)
        self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cookie_jar))
        self.opener.addheaders = [("User-Agent", USER_AGENT)]
        self._xsrf_token = xsrf_token
        self.auto_bootstrap = auto_bootstrap
        if self.auto_bootstrap and not self._xsrf_token:
            self._bootstrap()

    @property
    def xsrf_token(self) -> str:
        """Return the XSRF token, discovering it from the cookie jar if needed.

        Raises:
            RuntimeError: If no token was supplied and the jar holds no
                ``XSRF-TOKEN`` cookie.
        """
        if self._xsrf_token:
            return self._xsrf_token
        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            # Cache the discovered value so later calls skip the jar scan.
            self._xsrf_token = token
            return token
        raise RuntimeError(
            "CERT-Bund XSRF token not available. Provide --xsrf-token or a cookie file "
            "containing XSRF-TOKEN (see docs/ops/concelier-certbund-operations.md)."
        )

    def fetch_search_pages(
        self,
        destination: Path,
        page_size: int,
        max_pages: int,
    ) -> None:
        """Download up to *max_pages* search result pages into *destination*.

        Pages are requested newest-first and written as
        ``certbund-search-page-NN.json``. Paging stops early at the first
        page without content; an empty first page is still written so the
        snapshot records that the search returned nothing.

        Raises:
            RuntimeError: If the portal answers a page request with an HTTP
                error.
        """
        destination.mkdir(parents=True, exist_ok=True)
        for page in range(max_pages):
            payload = {
                "page": page,
                "size": page_size,
                "sort": ["published,desc"],
            }
            try:
                document = self._post_json(SEARCH_ENDPOINT, payload)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund search page {page}: HTTP {exc.code}. "
                    "Double-check the XSRF token or portal cookies."
                ) from exc
            content = document.get("content") or []
            if not content and page > 0:
                break
            file_path = destination / f"certbund-search-page-{page:02d}.json"
            _write_pretty_json(file_path, document)
            # Separator added: the page number used to run straight into the path.
            print(f"[certbund] wrote search page {page:02d} → {file_path}")
            if not content:
                break
        self._persist_cookies()

    def fetch_exports(self, destination: Path, start_year: int, end_year: int) -> None:
        """Download one export document per calendar year into *destination*.

        Each year from *start_year* to *end_year* (both inclusive) is
        requested as the range 1 January to 31 December and written as
        ``certbund-export-YYYY.json``.

        Raises:
            RuntimeError: If the portal answers an export request with an
                HTTP error.
        """
        destination.mkdir(parents=True, exist_ok=True)
        for year in range(start_year, end_year + 1):
            from_value = f"{year}-01-01"
            to_value = f"{year}-12-31"
            query = urllib.parse.urlencode({"format": "json", "from": from_value, "to": to_value})
            url = f"{EXPORT_ENDPOINT}?{query}"
            try:
                document = self._get_json(url)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund export for {year}: HTTP {exc.code}. "
                    "Ensure the XSRF token and cookies are valid."
                ) from exc
            file_path = destination / f"certbund-export-{year}.json"
            _write_pretty_json(file_path, document)
            # Separator added: the year used to run straight into the path.
            print(f"[certbund] wrote export {year} → {file_path}")
        self._persist_cookies()

    def _bootstrap(self) -> None:
        """Visit the portal and try to obtain an ``XSRF-TOKEN`` cookie.

        A failure to obtain the token is reported as a warning on stderr
        rather than raised, so the caller can still supply a token later.

        Raises:
            RuntimeError: If the initial request to the portal root fails
                with an HTTP error.
        """
        try:
            self._request("GET", PORTAL_ROOT, headers={"Accept": "text/html,application/xhtml+xml"})
        except urllib.error.HTTPError as exc:
            raise RuntimeError(f"Failed to bootstrap CERT-Bund session: HTTP {exc.code}") from exc
        # First attempt to obtain CSRF token directly.
        self._attempt_csrf_fetch()
        if _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN"):
            return
        # If the token is still missing, trigger the search endpoint once (likely 403)
        # to make the portal materialise JSESSIONID, then retry token acquisition.
        try:
            payload = {"page": 0, "size": 1, "sort": ["published,desc"]}
            self._post_json(SEARCH_ENDPOINT, payload, include_token=False)
        except urllib.error.HTTPError:
            # Expected: this request exists only for its cookie side effect.
            pass
        self._attempt_csrf_fetch()
        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            self._xsrf_token = token
        else:
            print(
                "[certbund] warning: automatic XSRF token retrieval failed. "
                "Supply --xsrf-token or reuse a browser-exported cookies file.",
                file=sys.stderr,
            )

    def _attempt_csrf_fetch(self) -> None:
        """Request the CSRF endpoint so the portal can set the token cookie.

        Best effort: HTTP errors are ignored because the caller inspects the
        cookie jar afterwards to see whether the attempt worked.
        """
        headers = {
            "Accept": "application/json, text/plain, */*",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        try:
            self._request("GET", CSRF_ENDPOINT, headers=headers)
        except urllib.error.HTTPError:
            pass

    def _request(self, method: str, url: str, data: Optional[bytes] = None, headers: Optional[Dict[str, str]] = None) -> bytes:
        """Send one HTTP request through the cookie-aware opener.

        Args:
            method: HTTP method name.
            url: Absolute request URL.
            data: Optional request body.
            headers: Extra headers; these override the defaults set here.

        Returns:
            The raw response body.
        """
        request = urllib.request.Request(url, data=data, method=method)
        default_headers = {
            "User-Agent": USER_AGENT,
            "Accept": "application/json",
        }
        for key, value in default_headers.items():
            request.add_header(key, value)
        if headers:
            for key, value in headers.items():
                request.add_header(key, value)
        # Close the response deterministically instead of leaving the
        # connection to be reclaimed by garbage collection.
        with self.opener.open(request, timeout=60) as response:
            return response.read()

    def _post_json(self, url: str, payload: Dict[str, Any], include_token: bool = True) -> Dict[str, Any]:
        """POST *payload* as JSON and return the decoded JSON response.

        Args:
            url: Absolute request URL.
            payload: Object serialised as the request body.
            include_token: When true, send the XSRF token header (which
                raises ``RuntimeError`` if no token is available).
        """
        data = json.dumps(payload).encode("utf-8")
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        if include_token:
            headers["X-XSRF-TOKEN"] = self.xsrf_token
        raw = self._request("POST", url, data=data, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _get_json(self, url: str) -> Any:
        """GET *url* with the XSRF token header and return the decoded JSON."""
        headers = {
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": PORTAL_ROOT,
        }
        headers["X-XSRF-TOKEN"] = self.xsrf_token
        raw = self._request("GET", url, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _persist_cookies(self) -> None:
        """Save the cookie jar to the configured cookie file, if there is one."""
        if not self.cookie_path:
            return
        self.cookie_path.parent.mkdir(parents=True, exist_ok=True)
        self.cookie_jar.save(self.cookie_path, ignore_discard=True, ignore_expires=True)
def _extract_cookie_value(jar: MozillaCookieJar, name: str) -> Optional[str]:
for cookie in jar:
if cookie.name == name:
return cookie.value
return None
def _write_pretty_json(path: Path, document: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
json.dump(document, handle, ensure_ascii=False, indent=2, sort_keys=True)
handle.write("\n")
def scan_artifacts(root: Path) -> List[Dict[str, Any]]:
    """Collect manifest records for every snapshot file found below *root*.

    Search pages under ``root/search`` come first, followed by exports under
    ``root/export``; each group is ordered by sorted file path. A missing
    directory simply contributes nothing.
    """
    discovered: List[Dict[str, Any]] = []

    search_root = root / "search"
    if search_root.exists():
        search_files = sorted(search_root.glob("certbund-search-page-*.json"))
        discovered.extend(_build_search_record(candidate) for candidate in search_files)

    export_root = root / "export"
    if export_root.exists():
        export_files = sorted(export_root.glob("certbund-export-*.json"))
        discovered.extend(_build_export_record(candidate) for candidate in export_files)

    return discovered
def _build_search_record(path: Path) -> Dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
content = data.get("content") or []
published_values: List[str] = []
for item in content:
published = (
item.get("published")
or item.get("publishedAt")
or item.get("datePublished")
or item.get("published_date")
)
if isinstance(published, str):
published_values.append(published)
if published_values:
try:
ordered = sorted(_parse_iso_timestamp(value) for value in published_values if value)
range_from = ordered[0].isoformat()
range_to = ordered[-1].isoformat()
except ValueError:
range_from = range_to = None
else:
range_from = range_to = None
return {
"type": "search",
"path": path,
"source": "concelier.cert-bund.search",
"itemCount": len(content),
"from": range_from,
"to": range_to,
"capturedAt": _timestamp_from_stat(path),
}
def _build_export_record(path: Path) -> Dict[str, Any]:
    """Describe one saved yearly export for the manifest.

    The covered range is the full calendar year named in the file name; when
    no year can be read from the name both bounds are ``None``. The item
    count is not inspected and is always ``None``.
    """
    year = _extract_year_from_filename(path.name)
    if year is None:
        lower_bound, upper_bound = None, None
    else:
        lower_bound, upper_bound = f"{year}-01-01", f"{year}-12-31"
    record: Dict[str, Any] = {
        "type": "export",
        "path": path,
        "source": "concelier.cert-bund.export",
        "itemCount": None,
        "from": lower_bound,
        "to": upper_bound,
    }
    record["capturedAt"] = _timestamp_from_stat(path)
    return record
def _timestamp_from_stat(path: Path) -> str:
stat = path.stat()
return dt.datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()
def _extract_year_from_filename(name: str) -> Optional[int]:
stem = Path(name).stem
parts = stem.split("-")
if parts and parts[-1].isdigit() and len(parts[-1]) == 4:
return int(parts[-1])
return None
def _parse_iso_timestamp(value: str) -> dt.datetime:
try:
return dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
# Fallback for formats like 2025-10-14T06:24:49
return dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=UTC)
def build_manifest(root: Path, records: Iterable[Dict[str, Any]], manifest_path: Path) -> None:
    """Write the manifest plus checksum sidecar files for the given records.

    For each record a ``<file>.sha256`` sidecar is written next to the
    artefact and an entry (root-relative POSIX path, SHA-256, size, capture
    time, date range, item count) is added to the manifest. Entries are
    sorted by path. The manifest itself gets a ``.sha256`` sidecar as well.
    """
    artifacts = []
    for record in records:
        artifact = record["path"]
        relative = PurePosixPath(artifact.relative_to(root).as_posix())
        digest = hashlib.sha256(artifact.read_bytes()).hexdigest()
        byte_count = artifact.stat().st_size
        artifacts.append(
            {
                "source": record["source"],
                "type": record["type"],
                "path": str(relative),
                "sha256": digest,
                "sizeBytes": byte_count,
                "capturedAt": record["capturedAt"],
                "from": record.get("from"),
                "to": record.get("to"),
                "itemCount": record.get("itemCount"),
            }
        )
        sidecar = artifact.with_suffix(artifact.suffix + ".sha256")
        _write_sha_file(sidecar, digest, artifact.name)

    artifacts = sorted(artifacts, key=lambda entry: entry["path"])
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    document = {
        "source": "concelier.cert-bund",
        "generatedAt": dt.datetime.now(tz=UTC).isoformat(),
        "artifacts": artifacts,
    }
    with manifest_path.open("w", encoding="utf-8") as sink:
        sink.write(json.dumps(document, ensure_ascii=False, indent=2, sort_keys=True))
        sink.write("\n")

    # Hash the bytes actually on disk so the sidecar matches the file.
    manifest_digest = hashlib.sha256(manifest_path.read_bytes()).hexdigest()
    _write_sha_file(manifest_path.with_suffix(".sha256"), manifest_digest, manifest_path.name)
    print(f"[certbund] manifest generated → {manifest_path}")
def _write_sha_file(path: Path, digest: str, filename: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
handle.write(f"{digest} {filename}\n")
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command line (or *argv* when given) into a namespace.

    The end year defaults to the current year in UTC, evaluated at call time.
    """
    this_year = dt.datetime.now(tz=UTC).year
    # Declared in the order the options should appear in --help.
    option_table = [
        ("--output", {"default": "seed-data/cert-bund", "help": "Destination directory for artefacts."}),
        (
            "--start-year",
            {"type": int, "default": 2014, "help": "First year (inclusive) for export snapshots."},
        ),
        (
            "--end-year",
            {"type": int, "default": this_year, "help": "Last year (inclusive) for export snapshots."},
        ),
        ("--page-size", {"type": int, "default": 100, "help": "Search page size."}),
        (
            "--max-pages",
            {"type": int, "default": 12, "help": "Maximum number of search result pages to capture."},
        ),
        (
            "--cookie-file",
            {"type": Path, "help": "Path to a Netscape cookie file to reuse/persist session cookies."},
        ),
        ("--xsrf-token", {"help": "Optional explicit XSRF token value (overrides cookie discovery)."}),
        (
            "--skip-fetch",
            {
                "action": "store_true",
                "help": "Skip HTTP fetches and only regenerate manifest from existing files.",
            },
        ),
        (
            "--no-bootstrap",
            {
                "action": "store_true",
                "help": "Do not attempt automatic session bootstrap (use with --skip-fetch or pre-populated cookies).",
            },
        ),
    ]
    cli = argparse.ArgumentParser(
        description="Capture CERT-Bund search/export snapshots for Offline Kit packaging.",
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
    """Run the snapshot tool: fetch (unless skipped), then build the manifest.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` when a manifest was written, ``1`` when no artefacts were
        found below the output directory.

    Raises:
        SystemExit: If fetching is requested with a start year later than
            the end year.
    """
    args = parse_args(argv)
    output_dir = Path(args.output).expanduser().resolve()
    if not args.skip_fetch:
        start_year = args.start_year
        end_year = args.end_year
        # Reject an impossible range before constructing the client, whose
        # constructor may already contact the portal to bootstrap a session.
        if start_year > end_year:
            raise SystemExit("start-year cannot be greater than end-year.")
        client = CertBundClient(
            cookie_file=args.cookie_file,
            xsrf_token=args.xsrf_token,
            auto_bootstrap=not args.no_bootstrap,
        )
        client.fetch_search_pages(output_dir / "search", args.page_size, args.max_pages)
        client.fetch_exports(output_dir / "export", start_year, end_year)
    records = scan_artifacts(output_dir)
    if not records:
        print(
            "[certbund] no artefacts discovered. Fetch data first or point --output to the dataset directory.",
            file=sys.stderr,
        )
        return 1
    manifest_path = output_dir / "manifest" / "certbund-offline-manifest.json"
    build_manifest(output_dir, records, manifest_path)
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())