diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs index e67773b3b..612a6274a 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs @@ -44,7 +44,18 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable { if (_connection is not null && _connection.IsConnected) { - return _connection; + // Verify the connection can actually execute commands (not zombie). + // IsConnected only checks the socket, not command execution ability. + try + { + await _connection.GetDatabase().PingAsync().ConfigureAwait(false); + return _connection; + } + catch + { + _logger?.LogWarning("Valkey connection PING failed — reconnecting"); + // Fall through to reconnect + } } await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false); @@ -61,6 +72,8 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable var config = ConfigurationOptions.Parse(_options.ConnectionString); config.AbortOnConnectFail = _options.AbortOnConnectFail; config.ConnectTimeout = (int)_options.InitializationTimeout.TotalMilliseconds; + config.SyncTimeout = 15_000; // 15s — prevents commands from hanging indefinitely + config.AsyncTimeout = 15_000; // 15s — async command timeout config.ConnectRetry = _options.ConnectRetry; if (_options.Database.HasValue) diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Protocol/CorrelationTracker.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Protocol/CorrelationTracker.cs index 2f97ce3b9..5598fe551 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Protocol/CorrelationTracker.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Protocol/CorrelationTracker.cs @@ -20,8 +20,9 @@ public sealed class CorrelationTracker : IDisposable public CorrelationTracker(TimeProvider? timeProvider = null) { _timeProvider = timeProvider ?? TimeProvider.System; - // Cleanup expired requests every 30 seconds - _cleanupTimer = _timeProvider.CreateTimer(CleanupExpiredRequests, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30)); + // Cleanup expired requests every 5 seconds (was 30s — too slow under sustained load, + // causing pending request dictionary to grow and degrade lookup performance) + _cleanupTimer = _timeProvider.CreateTimer(CleanupExpiredRequests, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)); } ///