#!/usr/bin/env python3 """ Smoke test for the StellaOps OpenTelemetry Collector deployment. The script sends sample traces, metrics, and logs over OTLP/HTTP with mutual TLS and asserts that the collector accepted the payloads by checking its Prometheus metrics endpoint. """ from __future__ import annotations import argparse import json import ssl import sys import time import urllib.request from pathlib import Path TRACE_PAYLOAD = { "resourceSpans": [ { "resource": { "attributes": [ {"key": "service.name", "value": {"stringValue": "smoke-client"}}, {"key": "tenant.id", "value": {"stringValue": "dev"}}, ] }, "scopeSpans": [ { "scope": {"name": "smoke-test"}, "spans": [ { "traceId": "00000000000000000000000000000001", "spanId": "0000000000000001", "name": "smoke-span", "kind": 1, "startTimeUnixNano": "1730000000000000000", "endTimeUnixNano": "1730000000500000000", "status": {"code": 0}, } ], } ], } ] } METRIC_PAYLOAD = { "resourceMetrics": [ { "resource": { "attributes": [ {"key": "service.name", "value": {"stringValue": "smoke-client"}}, {"key": "tenant.id", "value": {"stringValue": "dev"}}, ] }, "scopeMetrics": [ { "scope": {"name": "smoke-test"}, "metrics": [ { "name": "smoke_gauge", "gauge": { "dataPoints": [ { "asDouble": 1.0, "timeUnixNano": "1730000001000000000", "attributes": [ {"key": "phase", "value": {"stringValue": "ingest"}} ], } ] }, } ], } ], } ] } LOG_PAYLOAD = { "resourceLogs": [ { "resource": { "attributes": [ {"key": "service.name", "value": {"stringValue": "smoke-client"}}, {"key": "tenant.id", "value": {"stringValue": "dev"}}, ] }, "scopeLogs": [ { "scope": {"name": "smoke-test"}, "logRecords": [ { "timeUnixNano": "1730000002000000000", "severityNumber": 9, "severityText": "Info", "body": {"stringValue": "StellaOps collector smoke log"}, } ], } ], } ] } def _load_context(ca: Path, cert: Path, key: Path) -> ssl.SSLContext: context = ssl.create_default_context(cafile=str(ca)) context.check_hostname = False context.verify_mode = ssl.CERT_REQUIRED context.load_cert_chain(certfile=str(cert), keyfile=str(key)) return context def _post_json(url: str, payload: dict, context: ssl.SSLContext) -> None: data = json.dumps(payload).encode("utf-8") request = urllib.request.Request( url, data=data, headers={ "Content-Type": "application/json", "User-Agent": "stellaops-otel-smoke/1.0", }, method="POST", ) with urllib.request.urlopen(request, context=context, timeout=10) as response: if response.status // 100 != 2: raise RuntimeError(f"{url} returned HTTP {response.status}") def _fetch_metrics(url: str, context: ssl.SSLContext) -> str: request = urllib.request.Request( url, headers={ "User-Agent": "stellaops-otel-smoke/1.0", }, ) with urllib.request.urlopen(request, context=context, timeout=10) as response: return response.read().decode("utf-8") def _assert_counter(metrics: str, metric_name: str) -> None: for line in metrics.splitlines(): if line.startswith(metric_name): try: _, value = line.split(" ") if float(value) > 0: return except ValueError: continue raise AssertionError(f"{metric_name} not incremented") def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--host", default="localhost", help="Collector host (default: %(default)s)") parser.add_argument("--otlp-port", type=int, default=4318, help="OTLP/HTTP port") parser.add_argument("--metrics-port", type=int, default=9464, help="Prometheus metrics port") parser.add_argument("--health-port", type=int, default=13133, help="Health check port") parser.add_argument("--ca", type=Path, default=Path("deploy/telemetry/certs/ca.crt"), help="CA certificate path") parser.add_argument("--cert", type=Path, default=Path("deploy/telemetry/certs/client.crt"), help="Client certificate path") parser.add_argument("--key", type=Path, default=Path("deploy/telemetry/certs/client.key"), help="Client key path") args = parser.parse_args() for path in (args.ca, args.cert, args.key): if not path.exists(): print(f"[!] missing TLS material: {path}", file=sys.stderr) return 1 context = _load_context(args.ca, args.cert, args.key) otlp_base = f"https://{args.host}:{args.otlp_port}/v1" print(f"[*] Sending OTLP traffic to {otlp_base}") _post_json(f"{otlp_base}/traces", TRACE_PAYLOAD, context) _post_json(f"{otlp_base}/metrics", METRIC_PAYLOAD, context) _post_json(f"{otlp_base}/logs", LOG_PAYLOAD, context) # Allow Prometheus exporter to update metrics time.sleep(2) metrics_url = f"https://{args.host}:{args.metrics_port}/metrics" print(f"[*] Fetching collector metrics from {metrics_url}") metrics = _fetch_metrics(metrics_url, context) _assert_counter(metrics, "otelcol_receiver_accepted_spans") _assert_counter(metrics, "otelcol_receiver_accepted_logs") _assert_counter(metrics, "otelcol_receiver_accepted_metric_points") print("[✓] Collector accepted traces, logs, and metrics.") return 0 if __name__ == "__main__": raise SystemExit(main())