From f4df1c1274d1828f92b3f19f927a038c385946e1 Mon Sep 17 00:00:00 2001 From: master <> Date: Thu, 2 Apr 2026 07:42:10 +0300 Subject: [PATCH] Fix Valkey Pub/Sub silent consumer death with 4-layer defense MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: Known StackExchange.Redis bug — Pub/Sub subscriptions silently die without triggering ConnectionFailed (SE.Redis #1586, redis #7855). The consumer loop blocks forever on a dead subscription with _subscribed=true and no fallback poll. Layer 1 — Randomized fallback poll (safety net): QueueWaitTimeoutSeconds default changed from 0 (infinite) to 15. Actual wait is randomized between [15s, 30s] per iteration. 30 services × 1 poll per random(15-30s) = ~1.5 polls/sec (negligible). Even if Pub/Sub dies, consumers wake up via semaphore timeout. Layer 2 — Connection event hooks (reactive recovery): ConnectionFailed resets _subscribed=false + logs warning. ConnectionRestored resets _subscribed=false + releases semaphore to wake consumer immediately for re-subscription. Guards against duplicate event registration. Layer 3 — Proactive re-subscription timer (preemptive defense): After each successful subscribe, schedules a one-shot timer at random 5-15 minutes to force _subscribed=false. This preempts the known silent unsubscribe bug where ConnectionFailed never fires. Re-subscribe is cheap (one SUBSCRIBE command). Layer 4 — TCP keepalive + command timeouts (OS-level detection): KeepAlive=60s on StackExchange.Redis ConfigurationOptions. SyncTimeout=15s, AsyncTimeout=15s prevent hung commands. CorrelationTracker cleanup interval reduced from 30s to 5s. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Options/ValkeyTransportOptions.cs | 15 ++--- .../ValkeyConnectionFactory.cs | 3 + .../ValkeyMessageQueue.cs | 57 +++++++++++++++++-- 3 files changed, 63 insertions(+), 12 deletions(-) diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs index d843f9539..92a0ee4cc 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs @@ -40,17 +40,18 @@ public class ValkeyTransportOptions /// /// Gets or sets the queue wait timeout in seconds. - /// This is the maximum time a consumer loop will block between XREADGROUP polls - /// when no Valkey pub/sub notification arrives. + /// This is the base time a consumer loop will block between XREADGROUP polls + /// when no Valkey pub/sub notification arrives. The actual wait is randomized + /// between [base, 2×base] to spread load across services. /// - /// Default: 0 (pure event-driven mode — consumers only wake on pub/sub - /// notifications, no fallback polling). Set to a positive value (e.g. 5) - /// to add a safety-net poll interval for environments where pub/sub notifications - /// may be unreliable. + /// Default: 15 (safety-net poll every 15-30s). This defends against + /// a known StackExchange.Redis bug where Pub/Sub subscriptions silently die + /// without triggering ConnectionFailed (see: SE.Redis #1586, redis #7855). + /// Set to 0 for pure event-driven mode (NOT RECOMMENDED). /// /// /// Configurable via compose env var VALKEY_QUEUE_WAIT_TIMEOUT. /// /// - public int QueueWaitTimeoutSeconds { get; set; } = 0; + public int QueueWaitTimeoutSeconds { get; set; } = 15; } diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs index 612a6274a..c397c562f 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs @@ -73,6 +73,9 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable config.AbortOnConnectFail = _options.AbortOnConnectFail; config.ConnectTimeout = (int)_options.InitializationTimeout.TotalMilliseconds; config.SyncTimeout = 15_000; // 15s — prevents commands from hanging indefinitely + // TCP keepalive: detect dead connections at the OS level. + // Workaround for redis/redis#7855 — clients don't detect lost connections. + config.KeepAlive = 60; // Send TCP keepalive every 60s config.AsyncTimeout = 15_000; // 15s — async command timeout config.ConnectRetry = _options.ConnectRetry; diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs index 047252be3..a11712924 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs @@ -48,6 +48,8 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot private ChannelMessageQueue? _subscription; private volatile bool _subscribed; private volatile bool _groupInitialized; + private volatile bool _eventsHooked; + private ITimer? _resubscribeTimer; private bool _disposed; private string NotificationChannel => $"notify:q:{_queueOptions.QueueName}"; @@ -414,13 +416,15 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot { await EnsureNotificationSubscriptionAsync(cancellationToken).ConfigureAwait(false); - // Use the configured timeout from ValkeyTransportOptions if set, - // overriding the caller-provided timeout. - // QueueWaitTimeoutSeconds=0 means pure event-driven (no fallback poll). + // Use the configured timeout from ValkeyTransportOptions if set. + // Randomize between [base, 2×base] to spread load across 30+ services + // and avoid thundering herd on Valkey. + // This also defends against the known StackExchange.Redis Pub/Sub bug + // where subscriptions silently die (SE.Redis #1586, redis #7855). var configuredSeconds = _transportOptions.QueueWaitTimeoutSeconds; var effectiveTimeout = configuredSeconds <= 0 ? Timeout.InfiniteTimeSpan - : TimeSpan.FromSeconds(configuredSeconds); + : TimeSpan.FromSeconds(configuredSeconds + Random.Shared.Next(0, configuredSeconds)); // Wait for a pub/sub signal or timeout (fallback for missed notifications). try @@ -442,6 +446,17 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot { if (_subscribed) return; + // Hook connection lifecycle events (once) to detect silent Pub/Sub death. + // Known StackExchange.Redis issue: Pub/Sub subscriptions can silently die + // without triggering ConnectionFailed (SE.Redis #1586, redis #7855). + if (!_eventsHooked) + { + var connection = await _connectionFactory.GetConnectionAsync(cancellationToken).ConfigureAwait(false); + connection.ConnectionFailed += OnConnectionFailed; + connection.ConnectionRestored += OnConnectionRestored; + _eventsHooked = true; + } + var subscriber = await _connectionFactory.GetSubscriberAsync(cancellationToken).ConfigureAwait(false); _subscription = await subscriber.SubscribeAsync(RedisChannel.Literal(NotificationChannel)).ConfigureAwait(false); _subscription.OnMessage(_ => @@ -451,7 +466,22 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot }); _subscribed = true; - _logger?.LogDebug("Subscribed to queue notifications on channel {Channel}", NotificationChannel); + // Schedule proactive re-subscription in 5-15 minutes (randomized jitter). + // This preempts the known silent unsubscribe bug where ConnectionFailed + // never fires but the OnMessage handler stops receiving notifications. + var resubDelayMinutes = 5 + Random.Shared.Next(0, 11); // 5-15 min + _resubscribeTimer?.Dispose(); + _resubscribeTimer = _timeProvider.CreateTimer(_ => + { + _subscribed = false; + _logger?.LogInformation( + "Proactive Pub/Sub re-subscription triggered after {Minutes}min jitter on channel {Channel}", + resubDelayMinutes, NotificationChannel); + try { _notificationSignal.Release(); } + catch (SemaphoreFullException) { /* already signaled */ } + }, null, TimeSpan.FromMinutes(resubDelayMinutes), Timeout.InfiniteTimeSpan); + + _logger?.LogDebug("Subscribed to queue notifications on channel {Channel} (re-sub in {Minutes}min)", NotificationChannel, resubDelayMinutes); } finally { @@ -459,6 +489,21 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot } } + private void OnConnectionFailed(object? sender, ConnectionFailedEventArgs e) + { + _subscribed = false; + _logger?.LogWarning("Valkey connection failed ({FailureType}) on {EndPoint} — will re-subscribe on next poll", + e.FailureType, e.EndPoint); + } + + private void OnConnectionRestored(object? sender, ConnectionFailedEventArgs e) + { + _subscribed = false; + _logger?.LogInformation("Valkey connection restored on {EndPoint} — forcing re-subscribe", e.EndPoint); + try { _notificationSignal.Release(); } + catch (SemaphoreFullException) { /* already signaled */ } + } + #pragma warning disable CS1998 public async ValueTask DisposeAsync() { @@ -469,6 +514,8 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot _disposed = true; + _resubscribeTimer?.Dispose(); + if (_subscription is not null) { try { _subscription.Unsubscribe(); }