#!/usr/bin/env python3
"""
Capture CERT-Bund search/export JSON snapshots and generate Offline Kit manifests.

The script can bootstrap a session against https://wid.cert-bund.de, fetch
paginated search results plus per-year export payloads, and emit a manifest
that records source, date range, SHA-256, and capture timestamps for each artefact.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import hashlib
|
|
import json
|
|
import os
|
|
from pathlib import Path, PurePosixPath
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from http.cookiejar import MozillaCookieJar
|
|
from typing import Any, Dict, Iterable, List, Optional
|
|
|
|
|
|
# Landing page of the CERT-Bund WID portal. It is requested first when a
# session is bootstrapped and is sent as the Referer header on API calls.
PORTAL_ROOT = "https://wid.cert-bund.de/portal/"
# Advisory search API; called with a POSTed JSON paging payload.
SEARCH_ENDPOINT = "https://wid.cert-bund.de/portal/api/securityadvisory/search"
# Advisory export API; called via GET with format/from/to query parameters.
EXPORT_ENDPOINT = "https://wid.cert-bund.de/portal/api/securityadvisory/export"
# Requested during bootstrap in an attempt to receive the XSRF-TOKEN cookie.
CSRF_ENDPOINT = "https://wid.cert-bund.de/portal/api/security/csrf"
# User-Agent header value attached to every outgoing request.
USER_AGENT = "StellaOps.CertBundOffline/0.1"

# Shorthand for the UTC timezone used when producing timestamps.
UTC = dt.timezone.utc
|
|
|
|
|
|
class CertBundClient:
    """Cookie-aware HTTP client for the CERT-Bund WID portal.

    Session cookies live in a ``MozillaCookieJar`` that can be loaded from and
    saved back to a Netscape-format cookie file. API calls carry the XSRF token
    in the ``X-XSRF-TOKEN`` header; the token is either supplied explicitly or
    read from the ``XSRF-TOKEN`` cookie. Search pages and per-year exports are
    written to disk as pretty-printed JSON.
    """

    def __init__(
        self,
        cookie_file: Optional[Path] = None,
        xsrf_token: Optional[str] = None,
        auto_bootstrap: bool = True,
    ) -> None:
        """Create the client and, when requested, bootstrap a portal session.

        Args:
            cookie_file: Optional cookie file. It is loaded if it exists and is
                rewritten after a successful fetch run.
            xsrf_token: Optional explicit token; takes precedence over the cookie.
            auto_bootstrap: When true and no explicit token was given, contact
                the portal immediately to try to obtain session cookies.

        Raises:
            RuntimeError: If bootstrapping is attempted and the portal root
                answers with an HTTP error status.
        """
        self.cookie_path = cookie_file
        self.cookie_jar = MozillaCookieJar()

        if self.cookie_path and self.cookie_path.exists():
            self.cookie_jar.load(self.cookie_path, ignore_discard=True, ignore_expires=True)

        self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cookie_jar))
        self.opener.addheaders = [("User-Agent", USER_AGENT)]

        self._xsrf_token = xsrf_token
        self.auto_bootstrap = auto_bootstrap

        if self.auto_bootstrap and not self._xsrf_token:
            self._bootstrap()

    @property
    def xsrf_token(self) -> str:
        """Return the XSRF token, preferring the explicit value over the cookie.

        A token discovered in the cookie jar is cached on the instance.

        Raises:
            RuntimeError: If neither an explicit token nor an ``XSRF-TOKEN``
                cookie is available.
        """
        if self._xsrf_token:
            return self._xsrf_token

        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            self._xsrf_token = token
            return token

        raise RuntimeError(
            "CERT-Bund XSRF token not available. Provide --xsrf-token or a cookie file "
            "containing XSRF-TOKEN (see docs/ops/feedser-certbund-operations.md)."
        )

    def fetch_search_pages(
        self,
        destination: Path,
        page_size: int,
        max_pages: int,
    ) -> None:
        """Download up to *max_pages* search result pages into *destination*.

        Pages are requested newest-first (``published,desc``) starting at page 0.
        Page 0 is always written, even when it has no content; fetching stops at
        the first page without content. Cookies are persisted afterwards.

        Raises:
            RuntimeError: If the portal answers a page request with an HTTP error.
        """
        destination.mkdir(parents=True, exist_ok=True)

        for page in range(max_pages):
            payload = {
                "page": page,
                "size": page_size,
                "sort": ["published,desc"],
            }
            try:
                document = self._post_json(SEARCH_ENDPOINT, payload)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund search page {page}: HTTP {exc.code}. "
                    "Double-check the XSRF token or portal cookies."
                ) from exc

            content = document.get("content") or []
            # An empty page after the first one marks the end of the results
            # and is not written to disk.
            if not content and page > 0:
                break

            file_path = destination / f"certbund-search-page-{page:02d}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote search page {page:02d} → {file_path}")

            # Only reachable with an empty page 0: it has been recorded, so stop.
            if not content:
                break

        self._persist_cookies()

    def fetch_exports(self, destination: Path, start_year: int, end_year: int) -> None:
        """Download one JSON export per calendar year into *destination*.

        Both *start_year* and *end_year* are inclusive; each request covers
        1 January to 31 December of that year. Cookies are persisted afterwards.

        Raises:
            RuntimeError: If the portal answers an export request with an HTTP error.
        """
        destination.mkdir(parents=True, exist_ok=True)

        for year in range(start_year, end_year + 1):
            from_value = f"{year}-01-01"
            to_value = f"{year}-12-31"
            query = urllib.parse.urlencode({"format": "json", "from": from_value, "to": to_value})
            url = f"{EXPORT_ENDPOINT}?{query}"
            try:
                document = self._get_json(url)
            except urllib.error.HTTPError as exc:
                raise RuntimeError(
                    f"Failed to fetch CERT-Bund export for {year}: HTTP {exc.code}. "
                    "Ensure the XSRF token and cookies are valid."
                ) from exc

            file_path = destination / f"certbund-export-{year}.json"
            _write_pretty_json(file_path, document)
            print(f"[certbund] wrote export {year} → {file_path}")

        self._persist_cookies()

    def _bootstrap(self) -> None:
        """Try to establish a portal session that carries an XSRF-TOKEN cookie.

        Failure to obtain the token is reported as a warning on stderr rather
        than raised, so the caller can still supply a token by other means.

        Raises:
            RuntimeError: If the initial request to the portal root fails with
                an HTTP error.
        """
        try:
            self._request("GET", PORTAL_ROOT, headers={"Accept": "text/html,application/xhtml+xml"})
        except urllib.error.HTTPError as exc:
            raise RuntimeError(f"Failed to bootstrap CERT-Bund session: HTTP {exc.code}") from exc

        # First attempt to obtain CSRF token directly.
        self._attempt_csrf_fetch()

        if _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN"):
            return

        # If the token is still missing, trigger the search endpoint once (likely 403)
        # to make the portal materialise JSESSIONID, then retry token acquisition.
        try:
            payload = {"page": 0, "size": 1, "sort": ["published,desc"]}
            self._post_json(SEARCH_ENDPOINT, payload, include_token=False)
        except urllib.error.HTTPError:
            pass

        self._attempt_csrf_fetch()

        token = _extract_cookie_value(self.cookie_jar, "XSRF-TOKEN")
        if token:
            self._xsrf_token = token
        else:
            print(
                "[certbund] warning: automatic XSRF token retrieval failed. "
                "Supply --xsrf-token or reuse a browser-exported cookies file.",
                file=sys.stderr,
            )

    def _attempt_csrf_fetch(self) -> None:
        """Request the CSRF endpoint for its cookie side effect, ignoring HTTP errors."""
        headers = {
            "Accept": "application/json, text/plain, */*",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        try:
            self._request("GET", CSRF_ENDPOINT, headers=headers)
        except urllib.error.HTTPError:
            # Best effort: the caller checks the cookie jar afterwards.
            pass

    def _request(self, method: str, url: str, data: Optional[bytes] = None, headers: Optional[Dict[str, str]] = None) -> bytes:
        """Send one request through the cookie-aware opener and return the raw body.

        Args:
            method: HTTP method name.
            url: Absolute request URL.
            data: Optional request body.
            headers: Optional headers; they override the defaults set here.

        Raises:
            urllib.error.HTTPError: Left to propagate; callers in this class
                catch it where an error status is expected.
        """
        request = urllib.request.Request(url, data=data, method=method)
        default_headers = {
            "User-Agent": USER_AGENT,
            "Accept": "application/json",
        }
        for key, value in default_headers.items():
            request.add_header(key, value)

        if headers:
            for key, value in headers.items():
                request.add_header(key, value)

        # Close the response deterministically instead of leaving the
        # underlying connection to be reclaimed by garbage collection.
        with self.opener.open(request, timeout=60) as response:
            return response.read()

    def _post_json(self, url: str, payload: Dict[str, Any], include_token: bool = True) -> Dict[str, Any]:
        """POST *payload* as JSON and return the decoded JSON response.

        Args:
            url: Target endpoint.
            payload: JSON-serialisable request body.
            include_token: When false the ``X-XSRF-TOKEN`` header is omitted
                (used while bootstrapping, before a token exists).

        Raises:
            RuntimeError: If a token is required but not available.
        """
        data = json.dumps(payload).encode("utf-8")
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": "https://wid.cert-bund.de",
            "Referer": PORTAL_ROOT,
        }
        if include_token:
            headers["X-XSRF-TOKEN"] = self.xsrf_token

        raw = self._request("POST", url, data=data, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _get_json(self, url: str) -> Any:
        """GET *url* with the XSRF token header and return the decoded JSON body.

        Raises:
            RuntimeError: If no XSRF token is available.
        """
        headers = {
            "Accept": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "Referer": PORTAL_ROOT,
        }
        headers["X-XSRF-TOKEN"] = self.xsrf_token

        raw = self._request("GET", url, headers=headers)
        return json.loads(raw.decode("utf-8"))

    def _persist_cookies(self) -> None:
        """Write the cookie jar back to the cookie file, if one was configured."""
        if not self.cookie_path:
            return

        self.cookie_path.parent.mkdir(parents=True, exist_ok=True)
        self.cookie_jar.save(self.cookie_path, ignore_discard=True, ignore_expires=True)
|
|
|
|
|
|
def _extract_cookie_value(jar: MozillaCookieJar, name: str) -> Optional[str]:
|
|
for cookie in jar:
|
|
if cookie.name == name:
|
|
return cookie.value
|
|
return None
|
|
|
|
|
|
def _write_pretty_json(path: Path, document: Any) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
with path.open("w", encoding="utf-8") as handle:
|
|
json.dump(document, handle, ensure_ascii=False, indent=2, sort_keys=True)
|
|
handle.write("\n")
|
|
|
|
|
|
def scan_artifacts(root: Path) -> List[Dict[str, Any]]:
    """Collect manifest records for every snapshot file below *root*.

    Search pages (``search/certbund-search-page-*.json``) are listed first,
    followed by exports (``export/certbund-export-*.json``); each group is in
    sorted path order. A missing sub-directory simply contributes no records.
    """
    discovered: List[Dict[str, Any]] = []

    search_root = root / "search"
    if search_root.exists():
        search_files = sorted(search_root.glob("certbund-search-page-*.json"))
        discovered.extend(_build_search_record(snapshot) for snapshot in search_files)

    export_root = root / "export"
    if export_root.exists():
        export_files = sorted(export_root.glob("certbund-export-*.json"))
        discovered.extend(_build_export_record(snapshot) for snapshot in export_files)

    return discovered
|
|
|
|
|
|
def _build_search_record(path: Path) -> Dict[str, Any]:
    """Describe one stored search page for the manifest.

    The page's ``content`` list is scanned for a publication timestamp under
    the keys ``published``, ``publishedAt``, ``datePublished`` and
    ``published_date`` (first non-empty one wins per item). The earliest and
    latest parsed timestamps become the record's ``from``/``to`` range.

    The range is ``None`` when no usable timestamp exists or when any
    timestamp fails to parse. Timestamps without an offset are treated as UTC
    so they can be compared with offset-carrying ones.

    Returns:
        A dict with the keys ``type``, ``path``, ``source``, ``itemCount``,
        ``from``, ``to`` and ``capturedAt``.
    """
    with path.open("r", encoding="utf-8") as handle:
        data = json.load(handle)

    # Tolerate unexpected document shapes instead of failing with AttributeError.
    content = (data.get("content") or []) if isinstance(data, dict) else []
    if not isinstance(content, list):
        content = []

    published_values: List[str] = []
    for item in content:
        if not isinstance(item, dict):
            continue
        published = (
            item.get("published")
            or item.get("publishedAt")
            or item.get("datePublished")
            or item.get("published_date")
        )
        # Empty strings are dropped here so the range logic below never sees
        # a non-empty list that yields zero parsed timestamps.
        if isinstance(published, str) and published:
            published_values.append(published)

    range_from = range_to = None
    if published_values:
        try:
            parsed = [_parse_iso_timestamp(value) for value in published_values]
        except ValueError:
            parsed = []

        # Naive and aware datetimes cannot be ordered against each other;
        # interpret offset-less values as UTC before comparing.
        normalised = [
            stamp if stamp.tzinfo is not None else stamp.replace(tzinfo=dt.timezone.utc)
            for stamp in parsed
        ]
        if normalised:
            range_from = min(normalised).isoformat()
            range_to = max(normalised).isoformat()

    return {
        "type": "search",
        "path": path,
        "source": "feedser.cert-bund.search",
        "itemCount": len(content),
        "from": range_from,
        "to": range_to,
        "capturedAt": _timestamp_from_stat(path),
    }
|
|
|
|
|
|
def _build_export_record(path: Path) -> Dict[str, Any]:
    """Describe one stored yearly export for the manifest.

    The covered date range is derived from the four-digit year at the end of
    the file name; both bounds are ``None`` when no year can be extracted.
    ``itemCount`` is always ``None`` for exports.
    """
    year = _extract_year_from_filename(path.name)
    if year is None:
        first_day, last_day = None, None
    else:
        first_day, last_day = f"{year}-01-01", f"{year}-12-31"

    record: Dict[str, Any] = {
        "type": "export",
        "path": path,
        "source": "feedser.cert-bund.export",
        "itemCount": None,
        "from": first_day,
        "to": last_day,
    }
    record["capturedAt"] = _timestamp_from_stat(path)
    return record
|
|
|
|
|
|
def _timestamp_from_stat(path: Path) -> str:
|
|
stat = path.stat()
|
|
return dt.datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()
|
|
|
|
|
|
def _extract_year_from_filename(name: str) -> Optional[int]:
|
|
stem = Path(name).stem
|
|
parts = stem.split("-")
|
|
if parts and parts[-1].isdigit() and len(parts[-1]) == 4:
|
|
return int(parts[-1])
|
|
return None
|
|
|
|
|
|
def _parse_iso_timestamp(value: str) -> dt.datetime:
|
|
try:
|
|
return dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
# Fallback for formats like 2025-10-14T06:24:49
|
|
return dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=UTC)
|
|
|
|
|
|
def build_manifest(root: Path, records: Iterable[Dict[str, Any]], manifest_path: Path) -> None:
    """Write the offline manifest plus checksum side-car files.

    For every record a ``<file>.sha256`` file is written next to the artefact
    and an entry (source, type, POSIX-style path relative to *root*, SHA-256,
    size, capture time, date range, item count) is added to the manifest.
    Entries are sorted by path. The manifest itself gets a checksum file whose
    name is the manifest name with its suffix replaced by ``.sha256``.

    Raises:
        ValueError: If a record's path is not located below *root*.
    """
    artifacts = []
    for record in records:
        artifact_path = record["path"]
        relative = PurePosixPath(artifact_path.relative_to(root).as_posix())
        digest = hashlib.sha256(artifact_path.read_bytes()).hexdigest()
        size_bytes = artifact_path.stat().st_size

        artifacts.append(
            {
                "source": record["source"],
                "type": record["type"],
                "path": str(relative),
                "sha256": digest,
                "sizeBytes": size_bytes,
                "capturedAt": record["capturedAt"],
                "from": record.get("from"),
                "to": record.get("to"),
                "itemCount": record.get("itemCount"),
            }
        )

        # Side-car keeps the artefact's own suffix: foo.json -> foo.json.sha256
        sidecar = artifact_path.with_suffix(artifact_path.suffix + ".sha256")
        _write_sha_file(sidecar, digest, artifact_path.name)

    artifacts = sorted(artifacts, key=lambda entry: entry["path"])

    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    document = {
        "source": "feedser.cert-bund",
        "generatedAt": dt.datetime.now(tz=UTC).isoformat(),
        "artifacts": artifacts,
    }

    with manifest_path.open("w", encoding="utf-8") as handle:
        json.dump(document, handle, ensure_ascii=False, indent=2, sort_keys=True)
        handle.write("\n")

    # Hash the bytes actually written to disk, not the in-memory document.
    manifest_digest = hashlib.sha256(manifest_path.read_bytes()).hexdigest()
    _write_sha_file(manifest_path.with_suffix(".sha256"), manifest_digest, manifest_path.name)

    print(f"[certbund] manifest generated → {manifest_path}")
|
|
|
|
|
|
def _write_sha_file(path: Path, digest: str, filename: str) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
with path.open("w", encoding="utf-8") as handle:
|
|
handle.write(f"{digest} {filename}\n")
|
|
|
|
|
|
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the capture script.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The populated namespace. ``--end-year`` defaults to the current UTC year.
    """
    parser = argparse.ArgumentParser(
        description="Capture CERT-Bund search/export snapshots for Offline Kit packaging.",
    )

    # Declared in the order they should appear in --help.
    option_table = (
        ("--output", {"default": "seed-data/cert-bund", "help": "Destination directory for artefacts."}),
        ("--start-year", {"type": int, "default": 2014, "help": "First year (inclusive) for export snapshots."}),
        (
            "--end-year",
            {
                "type": int,
                "default": dt.datetime.now(tz=dt.timezone.utc).year,
                "help": "Last year (inclusive) for export snapshots.",
            },
        ),
        ("--page-size", {"type": int, "default": 100, "help": "Search page size."}),
        ("--max-pages", {"type": int, "default": 12, "help": "Maximum number of search result pages to capture."}),
        ("--cookie-file", {"type": Path, "help": "Path to a Netscape cookie file to reuse/persist session cookies."}),
        ("--xsrf-token", {"help": "Optional explicit XSRF token value (overrides cookie discovery)."}),
        (
            "--skip-fetch",
            {
                "action": "store_true",
                "help": "Skip HTTP fetches and only regenerate manifest from existing files.",
            },
        ),
        (
            "--no-bootstrap",
            {
                "action": "store_true",
                "help": "Do not attempt automatic session bootstrap (use with --skip-fetch or pre-populated cookies).",
            },
        ),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)

    return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Run the capture workflow and return the process exit code.

    Unless ``--skip-fetch`` is given, search pages and yearly exports are
    downloaded into ``<output>/search`` and ``<output>/export``. The manifest
    is then rebuilt from whatever snapshot files exist under the output
    directory.

    Args:
        argv: Argument list; ``None`` means use the process arguments.

    Returns:
        ``0`` on success, ``1`` when no artefacts were found to put in the manifest.

    Raises:
        SystemExit: If fetching is enabled and ``--start-year`` is greater
            than ``--end-year``.
    """
    args = parse_args(argv)
    output_dir = Path(args.output).expanduser().resolve()

    if not args.skip_fetch:
        start_year = args.start_year
        end_year = args.end_year
        # Reject an invalid range before the client is created: constructing
        # it may already contact the portal to bootstrap a session.
        if start_year > end_year:
            raise SystemExit("start-year cannot be greater than end-year.")

        client = CertBundClient(
            cookie_file=args.cookie_file,
            xsrf_token=args.xsrf_token,
            auto_bootstrap=not args.no_bootstrap,
        )

        client.fetch_search_pages(output_dir / "search", args.page_size, args.max_pages)
        client.fetch_exports(output_dir / "export", start_year, end_year)

    records = scan_artifacts(output_dir)
    if not records:
        print(
            "[certbund] no artefacts discovered. Fetch data first or point --output to the dataset directory.",
            file=sys.stderr,
        )
        return 1

    manifest_path = output_dir / "manifest" / "certbund-offline-manifest.json"
    build_manifest(output_dir, records, manifest_path)
    return 0
|
|
|
|
|
|
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|