#!/usr/bin/env python3
"""Tenant isolation smoke test for DEVOPS-OBS-50-002.

The script assumes the telemetry storage stack (Tempo + Loki) is running with
mutual TLS enabled and enforces `X-Scope-OrgID` multi-tenancy. It performs the
following checks:

1. Pushes a trace via the collector OTLP/HTTP endpoint and verifies it is
   retrievable from Tempo when using the matching tenant header, but not when
   querying as a different tenant.
2. Pushes a log entry to Loki with a tenant header and verifies it is only
   visible to the matching tenant.

The goal is to provide a deterministic CI-friendly check that our storage
configuration preserves tenant isolation guard rails before promoting bundles.
"""

from __future__ import annotations

import argparse
import json
import ssl
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
from pathlib import Path


def _load_context(ca_file: Path, cert_file: Path, key_file: Path) -> ssl.SSLContext:
    """Build the mTLS client context used for every request in this script.

    Args:
        ca_file: CA bundle used to validate the server certificate chain.
        cert_file: Client certificate presented to the server.
        key_file: Private key matching ``cert_file``.

    Returns:
        An ``ssl.SSLContext`` requiring TLS 1.2 or newer with the client
        certificate loaded.
    """
    context = ssl.create_default_context(cafile=str(ca_file))
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # NOTE(review): hostname verification is switched off here; the chain is
    # still validated against ``ca_file``. Presumably this accommodates
    # certificates whose SANs do not cover the local endpoints - confirm.
    context.check_hostname = False
    context.load_cert_chain(certfile=str(cert_file), keyfile=str(key_file))
    return context


def _post_json(url: str, payload: dict, context: ssl.SSLContext, headers: dict | None = None) -> None:
    """POST ``payload`` as compact JSON to ``url``.

    Args:
        url: Target URL.
        payload: JSON-serialisable request body.
        context: TLS context used for the connection.
        headers: Extra headers; they override the defaults on key collision.

    Raises:
        RuntimeError: If the response status is not in the 2xx range.
        urllib.error.URLError: Propagated from ``urlopen`` on transport or
            HTTP-level failures.
    """
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "User-Agent": "stellaops-tenant-smoke/1.0",
            **(headers or {}),
        },
    )
    with urllib.request.urlopen(request, context=context, timeout=10) as response:
        status = response.status
    if status // 100 != 2:
        raise RuntimeError(f"POST {url} returned HTTP {status}")


def _get(url: str, context: ssl.SSLContext, headers: dict | None = None) -> tuple[int, str]:
    """GET ``url`` and return ``(status, body)`` without raising on HTTP errors.

    HTTP error responses are converted into a normal return value so callers
    can assert on denial status codes. Other transport failures still
    propagate from ``urlopen``.

    Args:
        url: Target URL.
        context: TLS context used for the connection.
        headers: Extra headers; they override the defaults on key collision.

    Returns:
        The HTTP status code and the UTF-8 decoded response body.
    """
    request = urllib.request.Request(
        url,
        method="GET",
        headers={
            "User-Agent": "stellaops-tenant-smoke/1.0",
            **(headers or {}),
        },
    )
    try:
        with urllib.request.urlopen(request, context=context, timeout=10) as response:
            return response.status, response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8") if exc.fp else ""
        return exc.code, body


def _payload_trace(trace_id: str, tenant: str) -> dict:
    """Return an OTLP/JSON trace export body containing one span.

    Args:
        trace_id: Hex-encoded trace ID placed on the span.
        tenant: Tenant ID recorded in the ``tenant.id`` resource attribute.
    """
    return {
        "resourceSpans": [
            {
                "resource": {
                    "attributes": [
                        {"key": "service.name", "value": {"stringValue": "tenant-smoke"}},
                        {"key": "tenant.id", "value": {"stringValue": tenant}},
                    ]
                },
                "scopeSpans": [
                    {
                        "scope": {"name": "tenant-smoke"},
                        "spans": [
                            {
                                "traceId": trace_id,
                                "spanId": "0000000000000001",
                                "name": "tenant-check",
                                "kind": 1,
                                "startTimeUnixNano": "1730500000000000000",
                                "endTimeUnixNano": "1730500000500000000",
                                "status": {"code": 0},
                            }
                        ],
                    }
                ],
            }
        ]
    }


def _payload_log(ts_ns: int, tenant: str, marker: str) -> dict:
    """Return an OTLP/JSON log export body containing one record.

    Args:
        ts_ns: Record timestamp in nanoseconds since the Unix epoch.
        tenant: Tenant ID recorded in the ``tenant.id`` resource attribute and
            in the log body.
        marker: Unique token embedded in the log body so the record can be
            recognised in query results.
    """
    return {
        "resourceLogs": [
            {
                "resource": {
                    "attributes": [
                        {"key": "service.name", "value": {"stringValue": "tenant-smoke"}},
                        {"key": "tenant.id", "value": {"stringValue": tenant}},
                    ]
                },
                "scopeLogs": [
                    {
                        "scope": {"name": "tenant-smoke"},
                        "logRecords": [
                            {
                                "timeUnixNano": str(ts_ns),
                                "severityNumber": 9,
                                "severityText": "Info",
                                "body": {"stringValue": f"tenant={tenant} marker={marker}"},
                            }
                        ],
                    }
                ],
            }
        ]
    }


def _new_trace_id() -> str:
    """Return a random trace ID as 32 lowercase hex characters (16 bytes).

    OTLP defines the trace ID as a 16-byte value, hex-encoded in OTLP/JSON.
    A UUID4 is exactly 16 bytes and is never all zeros because its version
    bits are always set.
    """
    return uuid.uuid4().hex


def _assert_tenant_access(
    tempo_url: str,
    loki_url: str,
    collector_url: str,
    tenant: str,
    other_tenant: str,
    context: ssl.SSLContext,
) -> None:
    """Push one trace and one log, then check they are visible only to ``tenant``.

    Args:
        tempo_url: Tempo base URL (no trailing slash).
        loki_url: Loki base URL (no trailing slash).
        collector_url: Collector OTLP/HTTP base URL (no trailing slash).
        tenant: Tenant that must be able to read the pushed data.
        other_tenant: Tenant that must not be able to read the pushed data.
        context: TLS context used for every request.

    Raises:
        AssertionError: If the owning tenant cannot read its data, or the
            other tenant can.
        RuntimeError: If a push to the collector does not return 2xx.
    """
    # NOTE(review): the pushes carry the tenant only as a ``tenant.id``
    # resource attribute, not as an X-Scope-OrgID header; presumably the
    # collector maps it to the storage tenant - confirm against its config.
    trace_id = _new_trace_id()
    trace_payload = _payload_trace(trace_id, tenant)
    _post_json(f"{collector_url}/traces", trace_payload, context)

    log_marker = uuid.uuid4().hex[:12]
    timestamp_ns = int(time.time() * 1_000_000_000)
    log_payload = _payload_log(timestamp_ns, tenant, log_marker)
    _post_json(f"{collector_url}/logs", log_payload, context)

    # Allow background processing to flush to storage.
    time.sleep(2)

    tempo_headers = {"X-Scope-OrgID": tenant}
    tempo_status, tempo_body = _get(f"{tempo_url}/api/traces/{trace_id}", context, headers=tempo_headers)
    if tempo_status != 200:
        raise AssertionError(f"Tempo returned HTTP {tempo_status} for tenant {tenant}: {tempo_body}")
    if trace_id not in tempo_body:
        raise AssertionError("Tempo response missing expected trace data")

    other_status, _ = _get(
        f"{tempo_url}/api/traces/{trace_id}", context, headers={"X-Scope-OrgID": other_tenant}
    )
    # Any of these statuses counts as "not visible" to the other tenant.
    if other_status not in (401, 403, 404):
        raise AssertionError(
            f"Tempo should deny tenant {other_tenant}, received status {other_status}"
        )

    # NOTE(review): assumes the ingestion pipeline exposes the pushed record
    # under an ``app="tenant-smoke"`` stream label - confirm.
    log_query = urllib.parse.urlencode({"query": "{app=\"tenant-smoke\"}"})
    loki_status, loki_body = _get(
        f"{loki_url}/loki/api/v1/query?{log_query}", context, headers={"X-Scope-OrgID": tenant}
    )
    if loki_status != 200:
        raise AssertionError(f"Loki returned HTTP {loki_status} for tenant {tenant}: {loki_body}")
    if log_marker not in loki_body:
        raise AssertionError("Loki response missing expected log entry")

    other_log_status, other_log_body = _get(
        f"{loki_url}/loki/api/v1/query?{log_query}",
        context,
        headers={"X-Scope-OrgID": other_tenant},
    )
    # A 200 is acceptable for the other tenant as long as the marker is absent.
    if other_log_status == 200 and log_marker in other_log_body:
        raise AssertionError("Loki returned tenant data to the wrong org")
    if other_log_status not in (200, 401, 403):
        raise AssertionError(
            f"Unexpected Loki status when querying as {other_tenant}: {other_log_status}"
        )


def main() -> int:
    """Parse CLI arguments and run the tenant isolation checks.

    Returns:
        ``0`` when all checks pass, ``1`` when TLS material is missing. Failed
        checks propagate as exceptions from ``_assert_tenant_access``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--collector", default="https://localhost:4318/v1", help="Collector OTLP base URL")
    parser.add_argument("--tempo", default="https://localhost:3200", help="Tempo base URL")
    parser.add_argument("--loki", default="https://localhost:3100", help="Loki base URL")
    parser.add_argument("--tenant", default="dev", help="Primary tenant ID to test")
    parser.add_argument("--other-tenant", default="stage", help="Secondary tenant expected to be denied")
    parser.add_argument("--ca", type=Path, default=Path("deploy/telemetry/certs/ca.crt"), help="CA certificate path")
    parser.add_argument(
        "--cert", type=Path, default=Path("deploy/telemetry/certs/client.crt"), help="mTLS client certificate"
    )
    parser.add_argument(
        "--key", type=Path, default=Path("deploy/telemetry/certs/client.key"), help="mTLS client key"
    )
    args = parser.parse_args()

    # Fail early with a readable message rather than an SSL error later on.
    for path in (args.ca, args.cert, args.key):
        if not path.exists():
            print(f"[!] missing TLS material: {path}", file=sys.stderr)
            return 1

    context = _load_context(args.ca, args.cert, args.key)

    # Strip trailing slashes so the f-string URL joins never produce "//".
    collector_base = args.collector.rstrip("/")
    tempo_base = args.tempo.rstrip("/")
    loki_base = args.loki.rstrip("/")

    print(f"[*] Validating tenant isolation using tenant={args.tenant} and other={args.other_tenant}")
    _assert_tenant_access(
        tempo_base,
        loki_base,
        collector_base,
        tenant=args.tenant,
        other_tenant=args.other_tenant,
        context=context,
    )

    print("[✓] Tempo and Loki enforce tenant isolation with mTLS + scoped headers.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())