using System.Collections.Concurrent; using System.Net.WebSockets; using System.Text; using System.Text.Json; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Routing; using StellaOps.Notify.Models; namespace StellaOps.Notifier.WebService.Endpoints; /// /// WebSocket live feed for real-time incident updates. /// public static class IncidentLiveFeed { private static readonly ConcurrentDictionary> _tenantSubscriptions = new(); public static IEndpointRouteBuilder MapIncidentLiveFeed(this IEndpointRouteBuilder app) { app.Map("/api/v2/incidents/live", HandleWebSocketAsync); return app; } private static async Task HandleWebSocketAsync(HttpContext context) { if (!context.WebSockets.IsWebSocketRequest) { context.Response.StatusCode = StatusCodes.Status400BadRequest; await context.Response.WriteAsJsonAsync(new { error = new { code = "websocket_required", message = "This endpoint requires a WebSocket connection.", traceId = context.TraceIdentifier } }); return; } var tenantId = context.Request.Headers["X-StellaOps-Tenant"].ToString(); if (string.IsNullOrWhiteSpace(tenantId)) { // Try query string fallback for WebSocket clients that can't set headers tenantId = context.Request.Query["tenant"].ToString(); } if (string.IsNullOrWhiteSpace(tenantId)) { context.Response.StatusCode = StatusCodes.Status400BadRequest; await context.Response.WriteAsJsonAsync(new { error = new { code = "tenant_missing", message = "X-StellaOps-Tenant header or 'tenant' query parameter is required.", traceId = context.TraceIdentifier } }); return; } using var webSocket = await context.WebSockets.AcceptWebSocketAsync(); var subscriptions = _tenantSubscriptions.GetOrAdd(tenantId, _ => new ConcurrentBag()); subscriptions.Add(webSocket); try { // Send connection acknowledgment var ackMessage = JsonSerializer.Serialize(new { type = "connected", tenantId, timestamp = DateTimeOffset.UtcNow }); await SendMessageAsync(webSocket, ackMessage, context.RequestAborted); // Keep connection alive and handle incoming messages await ReceiveMessagesAsync(webSocket, tenantId, context.RequestAborted); } finally { // Remove from subscriptions var newBag = new ConcurrentBag( subscriptions.Where(s => s != webSocket && s.State == WebSocketState.Open)); _tenantSubscriptions.TryUpdate(tenantId, newBag, subscriptions); } } private static async Task ReceiveMessagesAsync(WebSocket webSocket, string tenantId, CancellationToken cancellationToken) { var buffer = new byte[4096]; while (webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested) { try { var result = await webSocket.ReceiveAsync(new ArraySegment(buffer), cancellationToken); if (result.MessageType == WebSocketMessageType.Close) { await webSocket.CloseAsync( WebSocketCloseStatus.NormalClosure, "Client initiated close", cancellationToken); break; } if (result.MessageType == WebSocketMessageType.Text) { var message = Encoding.UTF8.GetString(buffer, 0, result.Count); await HandleClientMessageAsync(webSocket, tenantId, message, cancellationToken); } } catch (WebSocketException) { break; } catch (OperationCanceledException) { break; } } } private static async Task HandleClientMessageAsync(WebSocket webSocket, string tenantId, string message, CancellationToken cancellationToken) { try { using var doc = JsonDocument.Parse(message); var root = doc.RootElement; if (root.TryGetProperty("type", out var typeElement)) { var type = typeElement.GetString(); switch (type) { case "ping": var pongResponse = JsonSerializer.Serialize(new { type = "pong", timestamp = DateTimeOffset.UtcNow }); await SendMessageAsync(webSocket, pongResponse, cancellationToken); break; case "subscribe": // Handle filter subscriptions (e.g., specific rule IDs, kinds) var subResponse = JsonSerializer.Serialize(new { type = "subscribed", tenantId, timestamp = DateTimeOffset.UtcNow }); await SendMessageAsync(webSocket, subResponse, cancellationToken); break; default: var errorResponse = JsonSerializer.Serialize(new { type = "error", message = $"Unknown message type: {type}" }); await SendMessageAsync(webSocket, errorResponse, cancellationToken); break; } } } catch (JsonException) { var errorResponse = JsonSerializer.Serialize(new { type = "error", message = "Invalid JSON message" }); await SendMessageAsync(webSocket, errorResponse, cancellationToken); } } private static async Task SendMessageAsync(WebSocket webSocket, string message, CancellationToken cancellationToken) { if (webSocket.State != WebSocketState.Open) { return; } var bytes = Encoding.UTF8.GetBytes(message); await webSocket.SendAsync( new ArraySegment(bytes), WebSocketMessageType.Text, endOfMessage: true, cancellationToken); } /// /// Broadcasts an incident update to all connected clients for the specified tenant. /// public static async Task BroadcastIncidentUpdateAsync( string tenantId, NotifyDelivery delivery, string updateType, CancellationToken cancellationToken = default) { if (!_tenantSubscriptions.TryGetValue(tenantId, out var subscriptions)) { return; } var message = JsonSerializer.Serialize(new { type = "incident_update", updateType, // created, updated, acknowledged, delivered, failed timestamp = DateTimeOffset.UtcNow, incident = new { deliveryId = delivery.DeliveryId, tenantId = delivery.TenantId, ruleId = delivery.RuleId, actionId = delivery.ActionId, eventId = delivery.EventId.ToString(), kind = delivery.Kind, status = delivery.Status.ToString(), statusReason = delivery.StatusReason, attemptCount = delivery.Attempts.Length, createdAt = delivery.CreatedAt, sentAt = delivery.SentAt, completedAt = delivery.CompletedAt } }); var deadSockets = new List(); foreach (var socket in subscriptions) { if (socket.State != WebSocketState.Open) { deadSockets.Add(socket); continue; } try { await SendMessageAsync(socket, message, cancellationToken); } catch { deadSockets.Add(socket); } } // Clean up dead sockets if (deadSockets.Count > 0) { var newBag = new ConcurrentBag( subscriptions.Where(s => !deadSockets.Contains(s))); _tenantSubscriptions.TryUpdate(tenantId, newBag, subscriptions); } } /// /// Broadcasts incident statistics update to all connected clients for the specified tenant. /// public static async Task BroadcastStatsUpdateAsync( string tenantId, int total, int pending, int delivered, int failed, CancellationToken cancellationToken = default) { if (!_tenantSubscriptions.TryGetValue(tenantId, out var subscriptions)) { return; } var message = JsonSerializer.Serialize(new { type = "stats_update", timestamp = DateTimeOffset.UtcNow, stats = new { total, pending, delivered, failed } }); foreach (var socket in subscriptions.Where(s => s.State == WebSocketState.Open)) { try { await SendMessageAsync(socket, message, cancellationToken); } catch { // Ignore send failures } } } /// /// Gets the count of active WebSocket connections for a tenant. /// public static int GetConnectionCount(string tenantId) { if (_tenantSubscriptions.TryGetValue(tenantId, out var subscriptions)) { return subscriptions.Count(s => s.State == WebSocketState.Open); } return 0; } }