diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs
index d843f9539..92a0ee4cc 100644
--- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs
+++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs
@@ -40,17 +40,18 @@ public class ValkeyTransportOptions
 
     ///
     /// Gets or sets the queue wait timeout in seconds.
-    /// This is the maximum time a consumer loop will block between XREADGROUP polls
-    /// when no Valkey pub/sub notification arrives.
+    /// This is the base time a consumer loop will block between XREADGROUP polls
+    /// when no Valkey pub/sub notification arrives. The actual wait is randomized
+    /// between [base, 2×base] to spread load across services.
     ///
-    /// Default: 0 (pure event-driven mode — consumers only wake on pub/sub
-    /// notifications, no fallback polling). Set to a positive value (e.g. 5)
-    /// to add a safety-net poll interval for environments where pub/sub notifications
-    /// may be unreliable.
+    /// Default: 15 (safety-net poll every 15-30s). This defends against
+    /// a known StackExchange.Redis bug where Pub/Sub subscriptions silently die
+    /// without triggering ConnectionFailed (see: SE.Redis #1586, redis #7855).
+    /// Set to 0 for pure event-driven mode (NOT RECOMMENDED).
     ///
     ///
     /// Configurable via compose env var VALKEY_QUEUE_WAIT_TIMEOUT.
     ///
     ///
-    public int QueueWaitTimeoutSeconds { get; set; } = 0;
+    public int QueueWaitTimeoutSeconds { get; set; } = 15;
 }
diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs
index 612a6274a..c397c562f 100644
--- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs
+++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs
@@ -73,6 +73,10 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable
             config.AbortOnConnectFail = _options.AbortOnConnectFail;
             config.ConnectTimeout = (int)_options.InitializationTimeout.TotalMilliseconds;
             config.SyncTimeout = 15_000; // 15s — prevents commands from hanging indefinitely
+            // Heartbeat: interval (seconds) at which StackExchange.Redis pings an idle connection
+            // to check it is still valid. NOTE(review): this is the library's own ping, not an
+            // OS-level TCP keepalive. Mitigation for redis/redis#7855 — clients don't detect lost connections.
+            config.KeepAlive = 60; // 60s is also the documented library default; pinned explicitly
             config.AsyncTimeout = 15_000; // 15s — async command timeout
             config.ConnectRetry = _options.ConnectRetry;
 
diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs
index 047252be3..a11712924 100644
--- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs
+++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs
@@ -48,6 +48,8 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot
     private ChannelMessageQueue? _subscription;
     private volatile bool _subscribed;
     private volatile bool _groupInitialized;
+    private volatile bool _eventsHooked;
+    private ITimer? _resubscribeTimer;
     private bool _disposed;
 
     private string NotificationChannel => $"notify:q:{_queueOptions.QueueName}";
@@ -414,13 +416,17 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot
     {
         await EnsureNotificationSubscriptionAsync(cancellationToken).ConfigureAwait(false);
 
-        // Use the configured timeout from ValkeyTransportOptions if set,
-        // overriding the caller-provided timeout.
-        // QueueWaitTimeoutSeconds=0 means pure event-driven (no fallback poll).
+        // Use the configured timeout from ValkeyTransportOptions if set.
+        // Randomize between [base, 2×base] to spread load across 30+ services
+        // and avoid thundering herd on Valkey.
+        // This also defends against the known StackExchange.Redis Pub/Sub bug
+        // where subscriptions silently die (SE.Redis #1586, redis #7855).
+        // The random upper bound is exclusive, hence +1 so 2×base is reachable; the
+        // 64-bit overload keeps the sum from overflowing for very large settings.
         var configuredSeconds = _transportOptions.QueueWaitTimeoutSeconds;
         var effectiveTimeout = configuredSeconds <= 0
             ? Timeout.InfiniteTimeSpan
-            : TimeSpan.FromSeconds(configuredSeconds);
+            : TimeSpan.FromSeconds(configuredSeconds + Random.Shared.NextInt64(0, configuredSeconds + 1L));
 
         // Wait for a pub/sub signal or timeout (fallback for missed notifications).
         try
@@ -442,6 +448,33 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot
         {
             if (_subscribed) return;
 
+            // Hook connection lifecycle events (once) to detect silent Pub/Sub death.
+            // Known StackExchange.Redis issue: Pub/Sub subscriptions can silently die
+            // without triggering ConnectionFailed (SE.Redis #1586, redis #7855).
+            if (!_eventsHooked)
+            {
+                var connection = await _connectionFactory.GetConnectionAsync(cancellationToken).ConfigureAwait(false);
+                connection.ConnectionFailed += OnConnectionFailed;
+                connection.ConnectionRestored += OnConnectionRestored;
+                _eventsHooked = true;
+            }
+
+            // Detach the previous queue (if any) before subscribing again. Otherwise each
+            // forced re-subscription leaves the old ChannelMessageQueue and its OnMessage
+            // handler attached, so handlers pile up for the lifetime of the process.
+            // The short gap without a subscription is covered by the fallback poll.
+            if (_subscription is not null)
+            {
+                var staleSubscription = _subscription;
+                _subscription = null;
+                try { await staleSubscription.UnsubscribeAsync().ConfigureAwait(false); }
+                catch (Exception ex)
+                {
+                    // Best effort: the connection itself may be what is broken.
+                    _logger?.LogDebug(ex, "Failed to unsubscribe stale queue on channel {Channel}", NotificationChannel);
+                }
+            }
+
             var subscriber = await _connectionFactory.GetSubscriberAsync(cancellationToken).ConfigureAwait(false);
             _subscription = await subscriber.SubscribeAsync(RedisChannel.Literal(NotificationChannel)).ConfigureAwait(false);
             _subscription.OnMessage(_ =>
@@ -451,7 +484,23 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot
             });
             _subscribed = true;
 
-            _logger?.LogDebug("Subscribed to queue notifications on channel {Channel}", NotificationChannel);
+            // Schedule proactive re-subscription in 5-15 minutes (randomized jitter).
+            // This preempts the known silent unsubscribe bug where ConnectionFailed
+            // never fires but the OnMessage handler stops receiving notifications.
+            var resubDelayMinutes = 5 + Random.Shared.Next(0, 11); // 5-15 min
+            _resubscribeTimer?.Dispose();
+            _resubscribeTimer = _timeProvider.CreateTimer(_ =>
+            {
+                if (_disposed) return; // the callback can race with DisposeAsync
+                _subscribed = false;
+                _logger?.LogInformation(
+                    "Proactive Pub/Sub re-subscription triggered after {Minutes}min jitter on channel {Channel}",
+                    resubDelayMinutes, NotificationChannel);
+                try { _notificationSignal.Release(); }
+                catch (SemaphoreFullException) { /* already signaled */ }
+            }, null, TimeSpan.FromMinutes(resubDelayMinutes), Timeout.InfiniteTimeSpan);
+
+            _logger?.LogDebug("Subscribed to queue notifications on channel {Channel} (re-sub in {Minutes}min)", NotificationChannel, resubDelayMinutes);
         }
         finally
         {
@@ -459,6 +508,27 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot
         }
     }
 
+    // Connection lost: mark the subscription stale so the next poll re-subscribes.
+    private void OnConnectionFailed(object? sender, ConnectionFailedEventArgs e)
+    {
+        if (_disposed) return; // handler stays hooked after disposal; nothing left to do
+
+        _subscribed = false;
+        _logger?.LogWarning("Valkey connection failed ({FailureType}) on {EndPoint} — will re-subscribe on next poll",
+            e.FailureType, e.EndPoint);
+    }
+
+    // Connection back: mark the subscription stale and wake the consumer so it re-subscribes now.
+    private void OnConnectionRestored(object? sender, ConnectionFailedEventArgs e)
+    {
+        if (_disposed) return; // the signal may already be disposed; do not touch it
+
+        _subscribed = false;
+        _logger?.LogInformation("Valkey connection restored on {EndPoint} — forcing re-subscribe", e.EndPoint);
+        try { _notificationSignal.Release(); }
+        catch (SemaphoreFullException) { /* already signaled */ }
+    }
+
 #pragma warning disable CS1998
     public async ValueTask DisposeAsync()
     {
@@ -469,6 +539,10 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot
 
         _disposed = true;
 
+        // NOTE(review): ConnectionFailed/ConnectionRestored are not unhooked here because the
+        // connection is not kept in a field; the handlers return early once _disposed is set.
+        _resubscribeTimer?.Dispose();
+
         if (_subscription is not null)
         {
             try { _subscription.Unsubscribe(); }