diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs index 92a0ee4cc..6aa27d7aa 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs @@ -53,5 +53,5 @@ public class ValkeyTransportOptions /// Configurable via compose env var VALKEY_QUEUE_WAIT_TIMEOUT. /// /// - public int QueueWaitTimeoutSeconds { get; set; } = 15; + public int QueueWaitTimeoutSeconds { get; set; } = 5; } diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs index a11712924..aa7be1c9e 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs @@ -160,14 +160,30 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot } else { - // Read new messages + // Always drain pending entries first, then read new. + // This is critical: when the Pub/Sub consumer was stuck (known + // StackExchange.Redis bug — SE.Redis #1586), messages accumulated + // in the pending entries list (PEL). XREADGROUP with ">" skips them. + // Reading pending ("0") first ensures no messages are lost after recovery. entries = await db.StreamReadGroupAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, consumer, - position: ">", + position: "0", count: request.BatchSize) .ConfigureAwait(false); + + // If no pending entries, read new messages + if (entries is null || entries.Length == 0) + { + entries = await db.StreamReadGroupAsync( + _queueOptions.QueueName, + _queueOptions.ConsumerGroup, + consumer, + position: ">", + count: request.BatchSize) + .ConfigureAwait(false); + } } if (entries is null || entries.Length == 0)