Drain pending entries before reading new in XREADGROUP consumer

Root cause of messages lost after Pub/Sub recovery: XREADGROUP with
position ">" only reads NEW messages. When the consumer was stuck
(Pub/Sub dead), messages accumulated in the pending entries list (PEL)
but were never acknowledged. After re-subscription, the consumer
resumed with ">" and skipped all pending entries.

Fix: Always read pending entries (position "0") first. If none pending,
then read new (position ">"). This is the standard Redis Streams
pattern for reliable consumption — ensures no messages are lost even
after consumer failures.

This explains why /canonical worked but /advisory-sources didn't:
/canonical requests were made AFTER the consumer recovered (new), while
/advisory-sources requests were made DURING the dead window (pending).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-02 08:45:38 +03:00
parent dc4d69c6be
commit b9b2ac8b98
2 changed files with 19 additions and 3 deletions

View File

@@ -53,5 +53,5 @@ public class ValkeyTransportOptions
/// Configurable via compose env var <c>VALKEY_QUEUE_WAIT_TIMEOUT</c>.
/// </para>
/// </summary>
public int QueueWaitTimeoutSeconds { get; set; } = 15;
public int QueueWaitTimeoutSeconds { get; set; } = 5;
}

View File

@@ -160,14 +160,30 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
}
else
{
// Read new messages
// Always drain pending entries first, then read new.
// This is critical: when the Pub/Sub consumer was stuck (known
// StackExchange.Redis bug — SE.Redis #1586), messages accumulated
// in the pending entries list (PEL). XREADGROUP with ">" skips them.
// Reading pending ("0") first ensures no messages are lost after recovery.
entries = await db.StreamReadGroupAsync(
_queueOptions.QueueName,
_queueOptions.ConsumerGroup,
consumer,
position: ">",
position: "0",
count: request.BatchSize)
.ConfigureAwait(false);
// If no pending entries, read new messages
if (entries is null || entries.Length == 0)
{
entries = await db.StreamReadGroupAsync(
_queueOptions.QueueName,
_queueOptions.ConsumerGroup,
consumer,
position: ">",
count: request.BatchSize)
.ConfigureAwait(false);
}
}
if (entries is null || entries.Length == 0)