Files
git.stella-ops.org/ops/wine-csp/fetch-cryptopro.py
StellaOps Bot bc0762e97d up
2025-12-09 00:20:52 +02:00

165 lines
5.3 KiB
Python

#!/usr/bin/env python3
"""
CryptoPro crawler (metadata only by default).
Fetches https://cryptopro.ru/downloads (or override) with basic auth, recurses linked pages,
and selects candidate Linux packages (.deb/.rpm/.tar.gz/.tgz/.run) or MSI as fallback.
Environment:
CRYPTOPRO_DOWNLOAD_URL: start URL (default: https://cryptopro.ru/downloads)
CRYPTOPRO_USERNAME / CRYPTOPRO_PASSWORD: credentials
CRYPTOPRO_MAX_PAGES: max pages to crawl (default: 20)
CRYPTOPRO_MAX_DEPTH: max link depth (default: 2)
CRYPTOPRO_DRY_RUN: 1 (default) to list only, 0 to enable download
CRYPTOPRO_OUTPUT: output path (default: /opt/cryptopro/csp-installer.bin)
"""
import os
import sys
import re
import html.parser
import urllib.parse
import urllib.request
from collections import deque
# Headers attached to every outgoing request so the crawler identifies itself.
SESSION_HEADERS = {
    "User-Agent": "StellaOps-CryptoPro-Crawler/1.0 (+https://stella-ops.org)",
}

# A URL is a candidate when a known package extension is immediately followed
# by the end of the string or by the start of a query string ("?").
LINUX_PATTERNS = re.compile(
    r"\.(deb|rpm|tar\.gz|tgz|run)(?:$|\?)",
    flags=re.IGNORECASE,
)
MSI_PATTERN = re.compile(r"\.msi(?:$|\?)", flags=re.IGNORECASE)
def log(msg: str) -> None:
    """Write *msg* followed by a newline to standard output and flush at once."""
    stream = sys.stdout
    line = msg + "\n"
    stream.write(line)
    stream.flush()
def warn(msg: str) -> None:
    """Write *msg* to standard error with a ``[WARN] `` prefix and flush at once."""
    stream = sys.stderr
    line = "[WARN] " + msg + "\n"
    stream.write(line)
    stream.flush()
class LinkParser(html.parser.HTMLParser):
    """HTML parser that records the ``href`` of every ``<a>`` start tag it sees.

    After feeding markup, ``links`` holds the raw (unresolved) href values in
    document order. Anchors with a missing or empty ``href`` are not recorded.
    """

    def __init__(self):
        super().__init__()
        # Raw href strings, exactly as they appear in the markup.
        self.links = []

    def handle_starttag(self, tag, attrs):
        """Collect the href of an anchor tag; every other tag is ignored."""
        target = dict(attrs).get("href") if tag == "a" else None
        if target:
            self.links.append(target)
def fetch(url: str, auth_handler) -> tuple[bytes, list[str]]:
    """Download *url* and extract the anchor hrefs found in its body.

    Args:
        url: Absolute URL to request.
        auth_handler: urllib handler installed into the opener used for the
            request (the caller passes an ``HTTPBasicAuthHandler``).

    Returns:
        A ``(data, links)`` tuple: the raw response body as bytes and the
        unresolved ``href`` values of every ``<a>`` tag, in document order.

    Raises:
        urllib.error.URLError / OSError: on network or HTTP failures; any
        exception raised by the opener is propagated unchanged.
    """
    opener = urllib.request.build_opener(auth_handler)
    req = urllib.request.Request(url, headers=SESSION_HEADERS)
    with opener.open(req, timeout=30) as resp:
        # The whole body is held in memory; callers only pass HTML pages here.
        data = resp.read()
        # Honour the charset declared by the server, falling back to UTF-8.
        charset = resp.headers.get_content_charset() or "utf-8"

    try:
        text = data.decode(charset, errors="ignore")
    except LookupError:
        # The server advertised an encoding name Python does not know.
        text = data.decode("utf-8", errors="ignore")

    parser = LinkParser()
    parser.feed(text)
    # Flush whatever the parser still has buffered at end of input.
    parser.close()
    return data, parser.links
def resolve_links(base: str, links: list[str]) -> list[str]:
    """Turn raw hrefs into absolute, fragment-free URLs relative to *base*.

    Links that cannot be fetched are dropped: empty values, pure in-page
    anchors (``#...``), non-navigational schemes (``mailto:``,
    ``javascript:``, ``tel:``, ``data:``) and hrefs that ``urllib.parse``
    rejects as malformed.

    Args:
        base: URL of the page the links were found on.
        links: Raw ``href`` values as they appeared in the markup.

    Returns:
        Absolute URLs in the same order as *links*, with any ``#fragment``
        removed so the same page is not treated as several distinct URLs.
    """
    skipped_prefixes = ("#", "mailto:", "javascript:", "tel:", "data:")
    resolved = []
    for href in links:
        href = href.strip()
        # Scheme names are case-insensitive, so compare in lower case.
        if not href or href.lower().startswith(skipped_prefixes):
            continue
        try:
            absolute = urllib.parse.urldefrag(urllib.parse.urljoin(base, href)).url
        except ValueError:
            # e.g. "http://[broken" - one bad link must not abort the crawl.
            continue
        resolved.append(absolute)
    return resolved
def choose_candidates(urls: list[str]) -> tuple[list[str], list[str]]:
    """Split *urls* into Linux package links and MSI links.

    A URL is classified as Linux when it matches ``LINUX_PATTERNS``; otherwise
    it is classified as MSI when it matches ``MSI_PATTERN``. URLs matching
    neither pattern are discarded.

    Returns:
        ``(linux, msi)``: two de-duplicated lists, each sorted so that the
        result does not depend on crawl order.
    """
    linux_hits = set()
    msi_hits = set()
    for candidate in urls:
        if LINUX_PATTERNS.search(candidate):
            linux_hits.add(candidate)
            continue
        if MSI_PATTERN.search(candidate):
            msi_hits.add(candidate)
    return sorted(linux_hits), sorted(msi_hits)
def download(url: str, output_path: str, auth_handler) -> int:
    """Stream *url* to *output_path* and return the size of the written file.

    The body is copied in fixed-size chunks, so a large installer is never
    held in memory in full. Data is first written to ``<output_path>.part``
    and moved into place only after the transfer finishes, so a failed
    download never leaves a truncated file at *output_path*.

    Args:
        url: Absolute URL of the file to download.
        output_path: Destination file path; its directory must already exist.
        auth_handler: urllib handler installed into the opener used for the
            request.

    Returns:
        Size in bytes of the file at *output_path*.

    Raises:
        urllib.error.URLError / OSError: on network, HTTP or filesystem errors.
    """
    opener = urllib.request.build_opener(auth_handler)
    req = urllib.request.Request(url, headers=SESSION_HEADERS)
    partial_path = output_path + ".part"
    chunk_size = 1024 * 1024
    try:
        with opener.open(req, timeout=60) as resp, open(partial_path, "wb") as f:
            while chunk := resp.read(chunk_size):
                f.write(chunk)
        # Swap the finished file into place in one step.
        os.replace(partial_path, output_path)
    finally:
        # After a successful replace the temp file is gone; anything still
        # present here is the remainder of an aborted transfer.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    return os.path.getsize(output_path)
def main() -> int:
    """Crawl the download site, list candidate installers and optionally fetch one.

    Configuration is read from the ``CRYPTOPRO_*`` environment variables
    described in the module docstring. The crawl is breadth-first from the
    start URL, bounded by a maximum number of successfully fetched pages and a
    maximum link depth. Links that already look like installer packages are
    recorded as candidates but never fetched during the crawl.

    Returns:
        Process exit code: 0 on success (including dry-run), 1 when no
        candidate was found or the download failed.

    Raises:
        ValueError: if CRYPTOPRO_MAX_PAGES or CRYPTOPRO_MAX_DEPTH is not an
        integer.
    """
    # NOTE(review): these fallback credentials are embedded in the source.
    # They are kept so existing invocations behave the same, but a secret in
    # version control should be rotated and supplied via the environment only.
    default_username = "contact@stella-ops.org"
    default_password = "Hoko33JD3nj3aJD."

    start_url = os.environ.get("CRYPTOPRO_DOWNLOAD_URL", "https://cryptopro.ru/downloads")
    username = os.environ.get("CRYPTOPRO_USERNAME", default_username)
    password = os.environ.get("CRYPTOPRO_PASSWORD", default_password)
    max_pages = int(os.environ.get("CRYPTOPRO_MAX_PAGES", "20"))
    max_depth = int(os.environ.get("CRYPTOPRO_MAX_DEPTH", "2"))
    # Anything other than the literal "0" keeps the safe list-only mode.
    dry_run = os.environ.get("CRYPTOPRO_DRY_RUN", "1") != "0"
    output_path = os.environ.get("CRYPTOPRO_OUTPUT", "/opt/cryptopro/csp-installer.bin")

    if username == default_username and password == default_password:
        warn("Using default demo credentials; set CRYPTOPRO_USERNAME/CRYPTOPRO_PASSWORD to real customer creds.")

    # Credentials are registered for the start URL, with no specific realm.
    passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
    passman.add_password(None, start_url, username, password)
    auth_handler = urllib.request.HTTPBasicAuthHandler(passman)

    seen = set()
    queue = deque([(start_url, 0)])
    crawled = 0
    all_links = []

    while queue and crawled < max_pages:
        url, depth = queue.popleft()
        if url in seen or depth > max_depth:
            continue
        seen.add(url)

        try:
            data, links = fetch(url, auth_handler)
        except Exception as ex:  # noqa: BLE001
            # Best effort: one unreachable page must not stop the crawl.
            warn(f"[crawl] failed {url}: {ex}")
            continue
        # Only successfully fetched pages count against the page budget.
        crawled += 1
        log(f"[crawl] {url} ({len(data)} bytes, depth={depth}, links={len(links)})")

        resolved = resolve_links(url, links)
        all_links.extend(resolved)

        if depth + 1 > max_depth:
            continue
        for child in resolved:
            if child in seen:
                continue
            # Package links are download candidates, not pages: fetching them
            # here would pull whole installers into memory even in dry-run.
            if LINUX_PATTERNS.search(child) or MSI_PATTERN.search(child):
                continue
            queue.append((child, depth + 1))

    linux, msi = choose_candidates(all_links)
    log(f"[crawl] Linux candidates: {len(linux)}; MSI candidates: {len(msi)}")

    if dry_run:
        log("[crawl] Dry-run mode: not downloading. Set CRYPTOPRO_DRY_RUN=0 and CRYPTOPRO_OUTPUT to enable download.")
        for idx, link in enumerate(linux[:10], 1):
            log(f" [linux {idx}] {link}")
        for idx, link in enumerate(msi[:5], 1):
            log(f" [msi {idx}] {link}")
        return 0

    # Prefer a Linux package; fall back to an MSI only when none was found.
    if linux:
        target = linux[0]
    elif msi:
        target = msi[0]
    else:
        warn("No candidate downloads found.")
        return 1

    # A bare file name has an empty dirname, which os.makedirs rejects.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    log(f"[download] Fetching {target} -> {output_path}")
    try:
        size = download(target, output_path, auth_handler)
    except OSError as ex:
        # urllib's URLError/HTTPError are OSError subclasses.
        warn(f"[download] failed {target}: {ex}")
        return 1
    log(f"[download] Complete, size={size} bytes")
    return 0
# Script entry point: run the crawler and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())