Fix Valkey Pub/Sub silent consumer death with 4-layer defense
Root cause: Known StackExchange.Redis bug — Pub/Sub subscriptions silently die without triggering ConnectionFailed (SE.Redis #1586, redis #7855). The consumer loop blocks forever on a dead subscription with _subscribed=true and no fallback poll. Layer 1 — Randomized fallback poll (safety net): QueueWaitTimeoutSeconds default changed from 0 (infinite) to 15. Actual wait is randomized within [15s, 30s) per iteration (whole seconds, upper bound exclusive). 30 services × 1 poll per ~22s average wait ≈ 1.4 polls/sec (negligible). Even if Pub/Sub dies, consumers wake up via semaphore timeout. Layer 2 — Connection event hooks (reactive recovery): ConnectionFailed resets _subscribed=false + logs warning. ConnectionRestored resets _subscribed=false + releases semaphore to wake consumer immediately for re-subscription. Guards against duplicate event registration. Layer 3 — Proactive re-subscription timer (preemptive defense): After each successful subscribe, schedules a one-shot timer at random 5-15 minutes to force _subscribed=false. This preempts the known silent unsubscribe bug where ConnectionFailed never fires. Re-subscribe is cheap (one SUBSCRIBE command). Layer 4 — TCP keepalive + command timeouts (OS-level detection): KeepAlive=60s on StackExchange.Redis ConfigurationOptions. SyncTimeout=15s, AsyncTimeout=15s prevent hung commands. CorrelationTracker cleanup interval reduced from 30s to 5s. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -40,17 +40,18 @@ public class ValkeyTransportOptions
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the queue wait timeout in seconds.
|
||||
/// This is the maximum time a consumer loop will block between XREADGROUP polls
|
||||
/// when no Valkey pub/sub notification arrives.
|
||||
/// This is the base time a consumer loop will block between XREADGROUP polls
|
||||
/// when no Valkey pub/sub notification arrives. The actual wait is randomized
|
||||
/// within [base, 2×base) seconds (upper bound exclusive) to spread load across services.
|
||||
/// <para>
|
||||
/// Default: <c>0</c> (pure event-driven mode — consumers only wake on pub/sub
|
||||
/// notifications, no fallback polling). Set to a positive value (e.g. <c>5</c>)
|
||||
/// to add a safety-net poll interval for environments where pub/sub notifications
|
||||
/// may be unreliable.
|
||||
/// Default: <c>15</c> (safety-net poll every 15-30s). This defends against
|
||||
/// a known StackExchange.Redis bug where Pub/Sub subscriptions silently die
|
||||
/// without triggering ConnectionFailed (see: SE.Redis #1586, redis #7855).
|
||||
/// Set to <c>0</c> for pure event-driven mode (NOT RECOMMENDED).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Configurable via compose env var <c>VALKEY_QUEUE_WAIT_TIMEOUT</c>.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public int QueueWaitTimeoutSeconds { get; set; } = 0;
|
||||
public int QueueWaitTimeoutSeconds { get; set; } = 15;
|
||||
}
|
||||
|
||||
@@ -73,6 +73,9 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable
|
||||
config.AbortOnConnectFail = _options.AbortOnConnectFail;
|
||||
config.ConnectTimeout = (int)_options.InitializationTimeout.TotalMilliseconds;
|
||||
config.SyncTimeout = 15_000; // 15s — prevents commands from hanging indefinitely
|
||||
// TCP keepalive: detect dead connections at the OS level.
|
||||
// Workaround for redis/redis#7855 — clients don't detect lost connections.
|
||||
config.KeepAlive = 60; // Send TCP keepalive every 60s
|
||||
config.AsyncTimeout = 15_000; // 15s — async command timeout
|
||||
config.ConnectRetry = _options.ConnectRetry;
|
||||
|
||||
|
||||
@@ -48,6 +48,8 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
||||
private ChannelMessageQueue? _subscription;
|
||||
private volatile bool _subscribed;
|
||||
private volatile bool _groupInitialized;
|
||||
private volatile bool _eventsHooked;
|
||||
private ITimer? _resubscribeTimer;
|
||||
private bool _disposed;
|
||||
|
||||
private string NotificationChannel => $"notify:q:{_queueOptions.QueueName}";
|
||||
@@ -414,13 +416,15 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
||||
{
|
||||
await EnsureNotificationSubscriptionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Use the configured timeout from ValkeyTransportOptions if set,
|
||||
// overriding the caller-provided timeout.
|
||||
// QueueWaitTimeoutSeconds=0 means pure event-driven (no fallback poll).
|
||||
// Use the configured timeout from ValkeyTransportOptions if set.
|
||||
// Randomize within [base, 2×base) seconds (upper bound exclusive) to spread load across 30+ services
|
||||
// and avoid thundering herd on Valkey.
|
||||
// This also defends against the known StackExchange.Redis Pub/Sub bug
|
||||
// where subscriptions silently die (SE.Redis #1586, redis #7855).
|
||||
var configuredSeconds = _transportOptions.QueueWaitTimeoutSeconds;
|
||||
var effectiveTimeout = configuredSeconds <= 0
|
||||
? Timeout.InfiniteTimeSpan
|
||||
: TimeSpan.FromSeconds(configuredSeconds);
|
||||
: TimeSpan.FromSeconds(configuredSeconds + Random.Shared.Next(0, configuredSeconds));
|
||||
|
||||
// Wait for a pub/sub signal or timeout (fallback for missed notifications).
|
||||
try
|
||||
@@ -442,6 +446,17 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
||||
{
|
||||
if (_subscribed) return;
|
||||
|
||||
// Hook connection lifecycle events (once) to detect silent Pub/Sub death.
|
||||
// Known StackExchange.Redis issue: Pub/Sub subscriptions can silently die
|
||||
// without triggering ConnectionFailed (SE.Redis #1586, redis #7855).
|
||||
if (!_eventsHooked)
|
||||
{
|
||||
var connection = await _connectionFactory.GetConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
connection.ConnectionFailed += OnConnectionFailed;
|
||||
connection.ConnectionRestored += OnConnectionRestored;
|
||||
_eventsHooked = true;
|
||||
}
|
||||
|
||||
var subscriber = await _connectionFactory.GetSubscriberAsync(cancellationToken).ConfigureAwait(false);
|
||||
_subscription = await subscriber.SubscribeAsync(RedisChannel.Literal(NotificationChannel)).ConfigureAwait(false);
|
||||
_subscription.OnMessage(_ =>
|
||||
@@ -451,7 +466,22 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
||||
});
|
||||
_subscribed = true;
|
||||
|
||||
_logger?.LogDebug("Subscribed to queue notifications on channel {Channel}", NotificationChannel);
|
||||
// Schedule proactive re-subscription in 5-15 minutes (randomized jitter).
|
||||
// This preempts the known silent unsubscribe bug where ConnectionFailed
|
||||
// never fires but the OnMessage handler stops receiving notifications.
|
||||
var resubDelayMinutes = 5 + Random.Shared.Next(0, 11); // 5-15 min
|
||||
_resubscribeTimer?.Dispose();
|
||||
_resubscribeTimer = _timeProvider.CreateTimer(_ =>
|
||||
{
|
||||
_subscribed = false;
|
||||
_logger?.LogInformation(
|
||||
"Proactive Pub/Sub re-subscription triggered after {Minutes}min jitter on channel {Channel}",
|
||||
resubDelayMinutes, NotificationChannel);
|
||||
try { _notificationSignal.Release(); }
|
||||
catch (SemaphoreFullException) { /* already signaled */ }
|
||||
}, null, TimeSpan.FromMinutes(resubDelayMinutes), Timeout.InfiniteTimeSpan);
|
||||
|
||||
_logger?.LogDebug("Subscribed to queue notifications on channel {Channel} (re-sub in {Minutes}min)", NotificationChannel, resubDelayMinutes);
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -459,6 +489,21 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Handles the connection's <c>ConnectionFailed</c> event: clears the
/// subscribed flag so the subscription is re-established the next time the
/// subscription check runs, and logs the failure details.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Failure details; the failure type and endpoint are logged.</param>
private void OnConnectionFailed(object? sender, ConnectionFailedEventArgs e)
{
    // Invalidate the subscription before logging so the next check re-subscribes.
    _subscribed = false;

    if (_logger is { } log)
    {
        log.LogWarning(
            "Valkey connection failed ({FailureType}) on {EndPoint} — will re-subscribe on next poll",
            e.FailureType,
            e.EndPoint);
    }
}
|
||||
|
||||
/// <summary>
/// Handles the connection's <c>ConnectionRestored</c> event: clears the
/// subscribed flag to force a re-subscribe, logs the restored endpoint, and
/// releases the notification signal so a waiting consumer wakes up right away.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event details; only the endpoint is logged.</param>
private void OnConnectionRestored(object? sender, ConnectionFailedEventArgs e)
{
    // The previous subscription is treated as stale once the connection comes back.
    _subscribed = false;

    if (_logger is { } log)
    {
        log.LogInformation(
            "Valkey connection restored on {EndPoint} — forcing re-subscribe",
            e.EndPoint);
    }

    try
    {
        _notificationSignal.Release();
    }
    catch (SemaphoreFullException)
    {
        // Already signaled: a wake-up is pending, nothing more to do.
    }
}
|
||||
|
||||
#pragma warning disable CS1998
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
@@ -469,6 +514,8 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
||||
|
||||
_disposed = true;
|
||||
|
||||
_resubscribeTimer?.Dispose();
|
||||
|
||||
if (_subscription is not null)
|
||||
{
|
||||
try { _subscription.Unsubscribe(); }
|
||||
|
||||
Reference in New Issue
Block a user