- Added detailed task completion records for KMS interface implementation and CLI support for file-based keys. - Documented security enhancements including Argon2id password hashing, audit event contracts, and rate limiting configurations. - Included scoped service support and integration updates for the Plugin platform, ensuring proper DI handling and testing coverage.
		
			
				
	
	
		
			84 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			84 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
"""
Static validation for the telemetry storage stack configuration.

Checks the Prometheus, Tempo, and Loki configuration snippets to ensure:
- mutual TLS is enabled end-to-end
- tenant override files are referenced
- multitenancy flags are set
- retention/limit defaults exist for __default__ tenant entries

This script is intended to back `DEVOPS-OBS-50-002` and can run in CI
before publishing bundles or rolling out staging updates.
"""
 | 
						|
 | 
						|
from __future__ import annotations
 | 
						|
 | 
						|
import sys
 | 
						|
from pathlib import Path
 | 
						|
 | 
						|
# Repository root: three directory levels above the folder holding this script.
REPO_ROOT = Path(__file__).resolve().parents[3]

# Main configuration file for each telemetry storage backend.
PROMETHEUS_PATH = REPO_ROOT.joinpath("deploy/telemetry/storage/prometheus.yaml")
TEMPO_PATH = REPO_ROOT.joinpath("deploy/telemetry/storage/tempo.yaml")
LOKI_PATH = REPO_ROOT.joinpath("deploy/telemetry/storage/loki.yaml")

# Per-tenant override files checked alongside the Tempo and Loki configs.
TEMPO_OVERRIDES_PATH = REPO_ROOT.joinpath(
    "deploy/telemetry/storage/tenants/tempo-overrides.yaml"
)
LOKI_OVERRIDES_PATH = REPO_ROOT.joinpath(
    "deploy/telemetry/storage/tenants/loki-overrides.yaml"
)
 | 
						|
 | 
						|
 | 
						|
def read(path: Path) -> str:
    """Return the UTF-8 decoded text of a required configuration file.

    Args:
        path: Location of the configuration file to load.

    Returns:
        The full contents of the file as a string.

    Raises:
        FileNotFoundError: If ``path`` does not exist or is not a regular
            file (for example, when it points at a directory).
    """
    # is_file() rather than exists(): a directory at this path would pass an
    # exists() check and then fail inside read_text() with a different OSError
    # subclass, which main() does not turn into a clean validation failure.
    if not path.is_file():
        raise FileNotFoundError(f"Required configuration file missing: {path}")
    return path.read_text(encoding="utf-8")
 | 
						|
 | 
						|
 | 
						|
def assert_contains(haystack: str, needle: str, path: Path) -> None:
    """Require that ``needle`` occurs as a substring of ``haystack``.

    Args:
        haystack: Text to search (the contents of a configuration file).
        needle: Exact snippet that must appear somewhere in ``haystack``.
        path: File the text came from; used only in the error message.

    Raises:
        AssertionError: If ``needle`` is not found. Raised explicitly rather
            than via an ``assert`` statement, so it still fires under ``-O``.
    """
    if needle in haystack:
        return
    raise AssertionError(f"{path} is missing required snippet: {needle!r}")
 | 
						|
 | 
						|
 | 
						|
def validate_prometheus() -> None:
    """Check the Prometheus config for its TLS and authorization keys.

    Each check is a plain substring match against the file text; the YAML is
    not parsed, so the position of a key within the document is not verified.

    Raises:
        FileNotFoundError: If the Prometheus configuration file is missing.
        AssertionError: On the first required snippet that is absent.
    """
    prometheus_text = read(PROMETHEUS_PATH)
    # Checked in this order; the first missing snippet aborts validation.
    required_snippets = (
        "tls_config:",
        "ca_file:",
        "cert_file:",
        "key_file:",
        "authorization:",
        "credentials_file:",
    )
    for snippet in required_snippets:
        assert_contains(prometheus_text, snippet, PROMETHEUS_PATH)
 | 
						|
 | 
						|
 | 
						|
def validate_tempo() -> None:
    """Check the Tempo config and its tenant overrides file.

    The main config must contain the multitenancy flag, the client-certificate
    requirement, and a reference to a per-tenant override config. The
    overrides file must contain a ``__default__`` entry and the two limit
    keys. All checks are substring matches on the raw file text.

    Raises:
        FileNotFoundError: If either file is missing.
        AssertionError: On the first required snippet that is absent.
    """
    tempo_text = read(TEMPO_PATH)
    for snippet in (
        "multitenancy_enabled: true",
        "require_client_cert: true",
        "per_tenant_override_config",
    ):
        assert_contains(tempo_text, snippet, TEMPO_PATH)

    # The overrides file is only read once the main config has passed.
    overrides_text = read(TEMPO_OVERRIDES_PATH)
    for snippet in (
        "__default__",
        "traces_per_second_limit",
        "max_bytes_per_trace",
    ):
        assert_contains(overrides_text, snippet, TEMPO_OVERRIDES_PATH)
 | 
						|
 | 
						|
 | 
						|
def validate_loki() -> None:
    """Check the Loki config and its tenant overrides file.

    The main config must contain ``auth_enabled: true`` and a reference to a
    per-tenant override config. The overrides file must contain a
    ``__default__`` entry and a ``retention_period`` key. All checks are
    substring matches on the raw file text.

    Raises:
        FileNotFoundError: If either file is missing.
        AssertionError: On the first required snippet that is absent.
    """
    loki_text = read(LOKI_PATH)
    for snippet in ("auth_enabled: true", "per_tenant_override_config"):
        assert_contains(loki_text, snippet, LOKI_PATH)

    # The overrides file is only read once the main config has passed.
    overrides_text = read(LOKI_OVERRIDES_PATH)
    for snippet in ("__default__", "retention_period"):
        assert_contains(overrides_text, snippet, LOKI_OVERRIDES_PATH)
 | 
						|
 | 
						|
 | 
						|
def main() -> int:
    """Run every validator and report the outcome.

    Returns:
        ``0`` when all checks pass (success message on stdout), ``1`` when a
        validator raises ``AssertionError`` or ``FileNotFoundError`` (failure
        message on stderr). Any other exception propagates to the caller.
    """
    validators = (validate_prometheus, validate_tempo, validate_loki)
    try:
        # Stops at the first failing validator; later ones are not run.
        for validator in validators:
            validator()
    except (AssertionError, FileNotFoundError) as exc:
        print(f"[❌] telemetry storage validation failed: {exc}", file=sys.stderr)
        return 1
    else:
        print("[✓] telemetry storage configuration meets multi-tenant guard rails.")
        return 0
 | 
						|
 | 
						|
 | 
						|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
 |