From 973cc8b3358e9b54569dfe55e21d2f908485d22d Mon Sep 17 00:00:00 2001 From: master <> Date: Fri, 6 Mar 2026 03:11:28 +0200 Subject: [PATCH] qa iteration 4 Add Valkey messaging transport auto-reconnection: - MessagingTransportClient: detect persistent Redis failures (5 consecutive) and exit processing loops instead of retrying forever with dead connection - IMicroserviceTransport: add TransportDied event to interface - RouterConnectionManager: listen for TransportDied, auto-reconnect after 2s - Fixes services becoming unreachable after Valkey blip during restarts Co-Authored-By: Claude Opus 4.6 (1M context) --- .../RouterConnectionManager.cs | 53 +++++++++++++++++ .../Abstractions/IMicroserviceTransport.cs | 6 ++ .../MessagingTransportClient.cs | 59 +++++++++++++++++-- 3 files changed, 114 insertions(+), 4 deletions(-) diff --git a/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs b/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs index 286c5233a..68cb972a6 100644 --- a/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs +++ b/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs @@ -134,6 +134,9 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa // Establish transport connection to the gateway (InMemory/TCP/RabbitMQ/etc). if (_microserviceTransport is not null) { + // Listen for transport death to trigger automatic reconnection + _microserviceTransport.TransportDied += OnTransportDied; + var instance = new InstanceDescriptor { InstanceId = _options.InstanceId, @@ -306,6 +309,56 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa } } + private void OnTransportDied() + { + if (_disposed) return; + + _logger.LogWarning( + "Messaging transport died for {ServiceName}/{Version}. Scheduling reconnection...", + _options.ServiceName, _options.Version); + + // Fire-and-forget reconnection on a background thread + _ = Task.Run(async () => + { + try + { + await Task.Delay(2000); // Brief pause before reconnecting + + if (_disposed || _cts.IsCancellationRequested) return; + + _logger.LogInformation("Attempting messaging transport reconnection..."); + + if (_microserviceTransport is null || _endpoints is null) return; + + var instance = new InstanceDescriptor + { + InstanceId = _options.InstanceId, + ServiceName = _options.ServiceName, + Version = _options.Version, + Region = _options.Region + }; + + await _microserviceTransport.ConnectAsync( + instance, + _endpoints, + _schemas, + _openApiInfo, + _cts.Token); + + _logger.LogInformation( + "Messaging transport reconnected for {ServiceName}/{Version}", + _options.ServiceName, _options.Version); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to reconnect messaging transport for {ServiceName}/{Version}. " + + "Service will need manual restart.", + _options.ServiceName, _options.Version); + } + }); + } + private async Task HeartbeatLoopAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) diff --git a/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs b/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs index 3a2efce6f..106a45e7f 100644 --- a/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs +++ b/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs @@ -58,4 +58,10 @@ public interface IMicroserviceTransport /// Event raised when a CANCEL frame is received from the gateway. /// event Func? OnCancelReceived; + + /// + /// Raised when the transport connection is permanently lost and cannot recover. + /// Consumers should reconnect by calling again. + /// + event Action? TransportDied { add { } remove { } } } diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs index f6db01ab7..638f6443e 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs @@ -35,6 +35,13 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr private string? _connectionId; private InstanceDescriptor? _instance; private bool _disposed; + private volatile bool _transportDead; + + /// + /// Max consecutive transport-level failures before the processing loops exit. + /// After this threshold, the transport is considered dead and must be reconnected. + /// + private const int MaxConsecutiveTransportFailures = 5; /// public event Func>? OnRequestReceived; @@ -147,12 +154,26 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr var incomingTask = ProcessIncomingRequestsAsync(cancellationToken); await Task.WhenAll(responseTask, incomingTask); + + if (_transportDead && !cancellationToken.IsCancellationRequested) + { + _logger.LogWarning("Messaging transport died. Raising TransportDied event for reconnection."); + TransportDied?.Invoke(); + } } + /// + /// Raised when the messaging transport connection is permanently lost. + /// The should listen for this and reconnect. + /// + public event Action? TransportDied; + private async Task ProcessResponsesAsync(CancellationToken cancellationToken) { if (_responseQueue is null) return; + var consecutiveFailures = 0; + while (!cancellationToken.IsCancellationRequested) { try @@ -161,6 +182,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr new LeaseRequest { BatchSize = _options.BatchSize }, cancellationToken); + consecutiveFailures = 0; // Reset on success + foreach (var lease in leases) { try @@ -196,8 +219,20 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr } catch (Exception ex) { - _logger.LogError(ex, "Error in response processing loop"); - await Task.Delay(1000, cancellationToken); + consecutiveFailures++; + _logger.LogError(ex, "Error in response processing loop (failure {Count})", consecutiveFailures); + + if (consecutiveFailures >= MaxConsecutiveTransportFailures) + { + _logger.LogCritical( + "Response processing loop exceeded {Max} consecutive failures. " + + "Messaging transport is considered dead and will be reconnected.", + MaxConsecutiveTransportFailures); + _transportDead = true; + break; + } + + await Task.Delay(Math.Min(1000 * consecutiveFailures, 10_000), cancellationToken); } } } @@ -206,6 +241,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr { if (_serviceIncomingQueue is null) return; + var consecutiveFailures = 0; + while (!cancellationToken.IsCancellationRequested) { try @@ -214,6 +251,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr new LeaseRequest { BatchSize = _options.BatchSize }, cancellationToken); + consecutiveFailures = 0; // Reset on success + foreach (var lease in leases) { try @@ -243,8 +282,20 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr } catch (Exception ex) { - _logger.LogError(ex, "Error in incoming request processing loop"); - await Task.Delay(1000, cancellationToken); + consecutiveFailures++; + _logger.LogError(ex, "Error in incoming request processing loop (failure {Count})", consecutiveFailures); + + if (consecutiveFailures >= MaxConsecutiveTransportFailures) + { + _logger.LogCritical( + "Incoming request loop exceeded {Max} consecutive failures. " + + "Messaging transport is considered dead and will be reconnected.", + MaxConsecutiveTransportFailures); + _transportDead = true; + break; + } + + await Task.Delay(Math.Min(1000 * consecutiveFailures, 10_000), cancellationToken); } } }