From b9b2ac8b98aac905e4df4b9d65b83b330781176e Mon Sep 17 00:00:00 2001 From: master <> Date: Thu, 2 Apr 2026 08:45:38 +0300 Subject: [PATCH] Drain pending entries before reading new in XREADGROUP consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of messages lost after Pub/Sub recovery: XREADGROUP with position ">" only reads NEW messages. When the consumer was stuck (Pub/Sub dead), messages accumulated in the pending entries list (PEL) but were never acknowledged. After re-subscription, the consumer resumed with ">" and skipped all pending entries. Fix: Always read pending entries (position "0") first. If none pending, then read new (position ">"). This is the standard Redis Streams pattern for reliable consumption — ensures no messages are lost even after consumer failures. This explains why /canonical worked but /advisory-sources didn't: /canonical requests were made AFTER the consumer recovered (new), while /advisory-sources requests were made DURING the dead window (pending). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Options/ValkeyTransportOptions.cs | 2 +- .../ValkeyMessageQueue.cs | 20 +++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs index 92a0ee4cc..6aa27d7aa 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs @@ -53,5 +53,5 @@ public class ValkeyTransportOptions /// Configurable via compose env var VALKEY_QUEUE_WAIT_TIMEOUT. /// /// - public int QueueWaitTimeoutSeconds { get; set; } = 15; + public int QueueWaitTimeoutSeconds { get; set; } = 5; } diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs index a11712924..aa7be1c9e 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs @@ -160,14 +160,30 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot } else { - // Read new messages + // Always drain pending entries first, then read new. + // This is critical: when the Pub/Sub consumer was stuck (known + // StackExchange.Redis bug — SE.Redis #1586), messages accumulated + // in the pending entries list (PEL). XREADGROUP with ">" skips them. + // Reading pending ("0") first ensures no messages are lost after recovery. entries = await db.StreamReadGroupAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, consumer, - position: ">", + position: "0", count: request.BatchSize) .ConfigureAwait(false); + + // If no pending entries, read new messages + if (entries is null || entries.Length == 0) + { + entries = await db.StreamReadGroupAsync( + _queueOptions.QueueName, + _queueOptions.ConsumerGroup, + consumer, + position: ">", + count: request.BatchSize) + .ConfigureAwait(false); + } } if (entries is null || entries.Length == 0)