// -----------------------------------------------------------------------------
// ValkeyPubSubResilienceTests.cs
// Regression tests for Valkey Pub/Sub resilience.
// Prevents the silent consumer death bug from recurring.
// Validates: fallback polling, XAUTOCLAIM recovery, pending-first drain,
//            container restart recovery, sustained throughput, subscription
//            state reset, and multi-consumer fair distribution.
// -----------------------------------------------------------------------------

using System.Collections.Concurrent;
using System.Diagnostics;
using FluentAssertions;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Messaging.Transport.Valkey.Tests.Fixtures;
using StackExchange.Redis;
using Xunit;

namespace StellaOps.Messaging.Transport.Valkey.Tests;

/// <summary>
/// Pub/Sub resilience regression tests for Valkey transport.
/// These tests guard against the silent consumer death bug where Pub/Sub
/// subscriptions die without triggering ConnectionFailed (SE.Redis #1586,
/// redis #7855). The transport must recover via fallback polling, XAUTOCLAIM,
/// and subscription state resets.
/// </summary>
[Collection(ValkeyIntegrationTestCollection.Name)]
public sealed class ValkeyPubSubResilienceTests : IAsyncLifetime
{
    private readonly ValkeyContainerFixture _fixture;
    private readonly ITestOutputHelper _output;
    private ValkeyConnectionFactory? _connectionFactory;

    public ValkeyPubSubResilienceTests(ValkeyContainerFixture fixture, ITestOutputHelper output)
    {
        _fixture = fixture;
        _output = output;
    }

    /// <summary>Creates the shared connection factory before each test.</summary>
    public ValueTask InitializeAsync()
    {
        _connectionFactory = _fixture.CreateConnectionFactory();
        return ValueTask.CompletedTask;
    }

    /// <summary>Disposes the shared connection factory after each test.</summary>
    public async ValueTask DisposeAsync()
    {
        if (_connectionFactory is not null)
        {
            await _connectionFactory.DisposeAsync();
        }
    }

    #region Test Message Type

    /// <summary>Simple payload type used by all resilience tests.</summary>
    public record TestMessage(string Id, string Content, DateTimeOffset CreatedAt);

    #endregion

    #region Test 1: Fallback Poll Delivers Messages When Pub/Sub Not Fired

    /// <summary>
    /// A message injected directly via XADD (bypassing the Pub/Sub wake-up)
    /// must still be leasable after the fallback poll timeout elapses.
    /// </summary>
    [ValkeyIntegrationFact]
    public async Task FallbackPollDeliversMessagesWhenPubSubNotFired()
    {
        // Arrange — use a fast poll timeout so the fallback fires quickly.
        var transportOptions = _fixture.CreateOptions();
        transportOptions.QueueWaitTimeoutSeconds = 2;

        var queueOptions = _fixture.CreateQueueOptions();
        var queue = new ValkeyMessageQueue<TestMessage>(
            _connectionFactory!,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        var original = new TestMessage(Guid.NewGuid().ToString(), "bypass-pubsub", DateTimeOffset.UtcNow);

        // Enqueue directly via StreamAddAsync to bypass Pub/Sub notification.
        var db = await _connectionFactory!.GetDatabaseAsync();
        var payload = System.Text.Json.JsonSerializer.Serialize(original);
        var enqueuedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        // Create the consumer group first by leasing (which triggers group creation).
        _ = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });

        await db.StreamAddAsync(
            queueOptions.QueueName,
            [
                new NameValueEntry("payload", payload),
                new NameValueEntry("attempt", 1),
                new NameValueEntry("enq_at", enqueuedAt)
            ]);

        // Act — WaitForNotificationAsync should return after the timeout (not Pub/Sub signal).
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
        var sw = Stopwatch.StartNew();
        await queue.WaitForNotificationAsync(TimeSpan.FromSeconds(10), cts.Token);
        sw.Stop();

        // The wait should have returned via timeout (2-6s range due to jitter [base, 2*base]).
        sw.Elapsed.Should().BeGreaterThan(TimeSpan.FromSeconds(1),
            "fallback poll should not return instantly — it waits for the configured timeout");
        sw.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10),
            "fallback poll should return before the outer timeout");

        // Now lease the message that was injected directly.
        var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });

        // Assert
        leases.Should().HaveCount(1, "the fallback poll should enable leasing the directly-injected message");
        leases[0].Message.Id.Should().Be(original.Id);
        leases[0].Message.Content.Should().Be(original.Content);

        await leases[0].AcknowledgeAsync();
        await queue.DisposeAsync();

        _output.WriteLine($"Fallback poll returned after {sw.Elapsed.TotalSeconds:F1}s (expected 2-6s range)");
    }

    #endregion

    #region Test 2: XAUTOCLAIM Recovers Messages From Dead Consumers

    /// <summary>
    /// Messages left pending by a disposed ("dead") consumer must be claimed by
    /// a new consumer in the same group once the XAUTOCLAIM idle threshold passes.
    /// </summary>
    [ValkeyIntegrationFact]
    public async Task XAutoClaimRecoversMessagesFromDeadConsumers()
    {
        // Arrange — create a queue with consumer name "old-consumer".
        var queueOptions = _fixture.CreateQueueOptions(consumerName: "old-consumer");
        var transportOptions = _fixture.CreateOptions();

        var oldQueue = new ValkeyMessageQueue<TestMessage>(
            _connectionFactory!,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        var messages = Enumerable.Range(1, 3)
            .Select(i => new TestMessage($"msg-{i}", $"Content-{i}", DateTimeOffset.UtcNow))
            .ToList();

        // Enqueue 3 messages via the old consumer's queue.
        foreach (var msg in messages)
        {
            await oldQueue.EnqueueAsync(msg);
        }

        // Lease all 3 (assigned to old-consumer's pending list).
        var oldLeases = await oldQueue.LeaseAsync(new LeaseRequest { BatchSize = 3 });
        oldLeases.Should().HaveCount(3, "all 3 messages should be leased by old-consumer");

        // Do NOT acknowledge them — they stay pending.
        await oldQueue.DisposeAsync();

        // Wait 35 seconds to exceed the 30s XAUTOCLAIM idle threshold.
        _output.WriteLine("Waiting 35s for XAUTOCLAIM idle threshold...");
        await Task.Delay(TimeSpan.FromSeconds(35));

        // Act — create a NEW queue instance with consumer name "new-consumer" (same group).
        var newQueueOptions = _fixture.CreateQueueOptions(
            queueName: queueOptions.QueueName,
            consumerGroup: queueOptions.ConsumerGroup,
            consumerName: "new-consumer");

        // Create a fresh connection factory to simulate a different consumer process.
        await using var newConnectionFactory = _fixture.CreateConnectionFactory();
        var newQueue = new ValkeyMessageQueue<TestMessage>(
            newConnectionFactory,
            newQueueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        // LeaseAsync should XAUTOCLAIM the 3 idle entries.
        var newLeases = await newQueue.LeaseAsync(new LeaseRequest { BatchSize = 10 });

        // Assert
        newLeases.Should().HaveCount(3, "XAUTOCLAIM should recover all 3 idle messages from the dead consumer");

        var recoveredIds = newLeases.Select(l => l.Message.Id).OrderBy(id => id).ToList();
        var originalIds = messages.Select(m => m.Id).OrderBy(id => id).ToList();
        recoveredIds.Should().BeEquivalentTo(originalIds, "all original message IDs should be recovered");

        // Cleanup
        foreach (var lease in newLeases)
        {
            await lease.AcknowledgeAsync();
        }

        await newQueue.DisposeAsync();

        _output.WriteLine("XAUTOCLAIM recovery test passed — all 3 messages recovered by new consumer");
    }

    #endregion

    #region Test 3: Pending-First Read Drains Pending Before New

    /// <summary>
    /// LeaseAsync must return this consumer's un-ACKed pending entries before
    /// delivering newly enqueued messages.
    /// </summary>
    [ValkeyIntegrationFact]
    public async Task PendingFirstReadDrainsPendingBeforeNew()
    {
        // Arrange
        var queueOptions = _fixture.CreateQueueOptions();
        var transportOptions = _fixture.CreateOptions();
        var queue = new ValkeyMessageQueue<TestMessage>(
            _connectionFactory!,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        var messageA = new TestMessage("msg-A", "Message A", DateTimeOffset.UtcNow);
        var messageB = new TestMessage("msg-B", "Message B", DateTimeOffset.UtcNow);

        // Enqueue message A.
        await queue.EnqueueAsync(messageA);

        // Lease A — it's now pending, assigned to this consumer.
        var leaseA = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
        leaseA.Should().HaveCount(1);
        leaseA[0].Message.Id.Should().Be("msg-A");

        // Do NOT ACK A.

        // Enqueue message B.
        await queue.EnqueueAsync(messageB);

        // Act — call LeaseAsync (non-pending mode). LeaseAsync drains pending first.
        var leaseA2 = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });

        // Assert — message A is returned (pending first).
        leaseA2.Should().HaveCount(1, "pending messages should be returned before new ones");
        leaseA2[0].Message.Id.Should().Be("msg-A", "pending message A should be returned first");

        // ACK A.
        await leaseA2[0].AcknowledgeAsync();

        // Lease again — now B should come.
        var leaseB = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
        leaseB.Should().HaveCount(1, "message B should be available after A is acknowledged");
        leaseB[0].Message.Id.Should().Be("msg-B", "new message B should be returned after pending is drained");

        await leaseB[0].AcknowledgeAsync();
        await queue.DisposeAsync();

        _output.WriteLine("Pending-first drain test passed");
    }

    #endregion

    #region Test 4: Valkey Restart Recovery

    /// <summary>
    /// After the Valkey container is restarted, a fresh connection must be
    /// re-established and newly enqueued messages must flow end to end.
    /// </summary>
    [ValkeyIntegrationFact]
    public async Task ValkeyRestartRecovery()
    {
        // Arrange — create queue and process 3 messages.
        var queueOptions = _fixture.CreateQueueOptions();
        var transportOptions = _fixture.CreateOptions();

        // Use a dedicated connection factory that will reconnect after restart.
        await using var factory = _fixture.CreateConnectionFactory();
        var queue = new ValkeyMessageQueue<TestMessage>(
            factory,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        var preRestartMessages = Enumerable.Range(1, 3)
            .Select(i => new TestMessage($"pre-{i}", $"Pre-restart-{i}", DateTimeOffset.UtcNow))
            .ToList();

        foreach (var msg in preRestartMessages)
        {
            await queue.EnqueueAsync(msg);
        }

        var preLeases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 3 });
        preLeases.Should().HaveCount(3);
        foreach (var lease in preLeases)
        {
            await lease.AcknowledgeAsync();
        }

        await queue.DisposeAsync();

        // Act — restart the Valkey container.
        _output.WriteLine("Restarting Valkey container...");
        await _fixture.RestartAsync();

        // Wait for reconnection (poll IsConnected for up to 30s).
        await using var reconnectedFactory = _fixture.CreateConnectionFactory();
        var reconnected = false;
        var sw = Stopwatch.StartNew();
        while (sw.Elapsed < TimeSpan.FromSeconds(30))
        {
            try
            {
                await reconnectedFactory.PingAsync();
                reconnected = true;
                break;
            }
            catch
            {
                // Server still coming up — back off briefly and retry.
                await Task.Delay(500);
            }
        }

        reconnected.Should().BeTrue("connection should be re-established within 30s after restart");
        _output.WriteLine($"Reconnected after {sw.Elapsed.TotalSeconds:F1}s");

        // Create a new queue on the reconnected factory.
        var postQueue = new ValkeyMessageQueue<TestMessage>(
            reconnectedFactory,
            _fixture.CreateQueueOptions(),
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        // Enqueue 3 more messages after restart.
        var postRestartMessages = Enumerable.Range(1, 3)
            .Select(i => new TestMessage($"post-{i}", $"Post-restart-{i}", DateTimeOffset.UtcNow))
            .ToList();

        foreach (var msg in postRestartMessages)
        {
            await postQueue.EnqueueAsync(msg);
        }

        // Lease and verify all 3 post-restart messages.
        var postLeases = await postQueue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
        postLeases.Should().HaveCount(3, "all 3 post-restart messages should be received");

        var postIds = postLeases.Select(l => l.Message.Id).OrderBy(id => id).ToList();
        var expectedIds = postRestartMessages.Select(m => m.Id).OrderBy(id => id).ToList();
        postIds.Should().BeEquivalentTo(expectedIds);

        foreach (var lease in postLeases)
        {
            await lease.AcknowledgeAsync();
        }

        await postQueue.DisposeAsync();

        _output.WriteLine("Valkey restart recovery test passed");
    }

    #endregion

    #region Test 5: Sustained Throughput (30 Minutes)

    /// <summary>
    /// Produces 1 msg/s for 30 minutes on one connection while a second
    /// connection consumes; asserts zero message loss and latency percentiles.
    /// </summary>
    [ValkeyIntegrationFact]
    [Trait("Category", "Performance")]
    public async Task SustainedThroughput_30Minutes()
    {
        // Arrange
        var transportOptions = _fixture.CreateOptions();
        transportOptions.QueueWaitTimeoutSeconds = 10;

        var queueOptions = _fixture.CreateQueueOptions();

        await using var producerFactory = _fixture.CreateConnectionFactory();
        await using var consumerFactory = _fixture.CreateConnectionFactory();

        var producerQueue = new ValkeyMessageQueue<TestMessage>(
            producerFactory,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        var consumerQueue = new ValkeyMessageQueue<TestMessage>(
            consumerFactory,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        const int totalMessages = 1800;

        var enqueueTimestamps = new ConcurrentDictionary<string, DateTimeOffset>();
        var deliveryLatencies = new ConcurrentBag<double>();
        var receivedIds = new ConcurrentBag<string>();

        using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(35));

        // Producer task: send 1 message per second for 1800 seconds.
        var producerTask = Task.Run(async () =>
        {
            for (int i = 1; i <= totalMessages; i++)
            {
                var msg = new TestMessage($"perf-{i:D5}", $"Perf message {i}", DateTimeOffset.UtcNow);
                enqueueTimestamps[msg.Id] = DateTimeOffset.UtcNow;
                await producerQueue.EnqueueAsync(msg);

                if (i < totalMessages)
                {
                    await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
                }

                if (i % 100 == 0)
                {
                    _output.WriteLine($"Producer: sent {i}/{totalMessages}");
                }
            }
        }, cts.Token);

        // Consumer task: lease + ACK continuously.
        var consumerTask = Task.Run(async () =>
        {
            while (!cts.IsCancellationRequested && receivedIds.Count < totalMessages)
            {
                try
                {
                    // Wait for notification or timeout.
                    await consumerQueue.WaitForNotificationAsync(TimeSpan.FromSeconds(15), cts.Token);

                    var leases = await consumerQueue.LeaseAsync(
                        new LeaseRequest { BatchSize = 50 },
                        cts.Token);

                    foreach (var lease in leases)
                    {
                        var now = DateTimeOffset.UtcNow;
                        if (enqueueTimestamps.TryGetValue(lease.Message.Id, out var enqueueTime))
                        {
                            deliveryLatencies.Add((now - enqueueTime).TotalSeconds);
                        }

                        receivedIds.Add(lease.Message.Id);
                        await lease.AcknowledgeAsync();
                    }
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }

            _output.WriteLine($"Consumer: received {receivedIds.Count}/{totalMessages}");
        }, cts.Token);

        // Wait for both tasks.
        await Task.WhenAll(producerTask, consumerTask);

        // Give consumer extra time to drain any remaining.
        var drainDeadline = DateTimeOffset.UtcNow.AddSeconds(60);
        while (receivedIds.Count < totalMessages && DateTimeOffset.UtcNow < drainDeadline)
        {
            var leases = await consumerQueue.LeaseAsync(new LeaseRequest { BatchSize = 50 });
            foreach (var lease in leases)
            {
                var now = DateTimeOffset.UtcNow;
                if (enqueueTimestamps.TryGetValue(lease.Message.Id, out var enqueueTime))
                {
                    deliveryLatencies.Add((now - enqueueTime).TotalSeconds);
                }

                receivedIds.Add(lease.Message.Id);
                await lease.AcknowledgeAsync();
            }

            if (leases.Count == 0)
            {
                await Task.Delay(500);
            }
        }

        // Assert — zero message loss.
        receivedIds.Should().HaveCount(totalMessages,
            $"all {totalMessages} messages must be received (zero loss)");

        // Calculate percentiles.
        var sortedLatencies = deliveryLatencies.OrderBy(l => l).ToArray();
        var p50 = sortedLatencies[(int)(sortedLatencies.Length * 0.50)];
        var p95 = sortedLatencies[(int)(sortedLatencies.Length * 0.95)];
        var p99 = sortedLatencies[(int)(sortedLatencies.Length * 0.99)];

        _output.WriteLine($"Latency percentiles: p50={p50:F2}s, p95={p95:F2}s, p99={p99:F2}s");

        p50.Should().BeLessThan(1.0, "p50 latency should be under 1 second");
        p95.Should().BeLessThan(15.0, "p95 latency should be under 15 seconds");
        p99.Should().BeLessThan(30.0, "p99 latency should be under 30 seconds");

        await producerQueue.DisposeAsync();
        await consumerQueue.DisposeAsync();

        _output.WriteLine("Sustained throughput test passed (30 min, zero loss)");
    }

    #endregion

    #region Test 6: ConnectionFailed Resets Subscription State

    /// <summary>
    /// After a forced connection close, WaitForNotificationAsync must not hang
    /// on a dead subscription, and leasing must keep working on a new connection.
    /// </summary>
    [ValkeyIntegrationFact]
    public async Task ConnectionFailedResetsSubscriptionState()
    {
        // Arrange
        var transportOptions = _fixture.CreateOptions();
        transportOptions.QueueWaitTimeoutSeconds = 2;

        var queueOptions = _fixture.CreateQueueOptions();

        await using var factory = _fixture.CreateConnectionFactory();
        var queue = new ValkeyMessageQueue<TestMessage>(
            factory,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        // Trigger subscription by calling WaitForNotificationAsync.
        using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await queue.WaitForNotificationAsync(TimeSpan.FromSeconds(5), cts1.Token);

        // Get the underlying connection and simulate failure.
        var connection = await factory.GetConnectionAsync();

        // Simulate connection failure by closing the connection.
        // This triggers the ConnectionFailed event which resets _subscribed = false.
        connection.GetDatabase().Multiplexer.Close(allowCommandsToComplete: false);

        // Brief delay for the event to propagate.
        await Task.Delay(500);

        // Act — next WaitForNotificationAsync should not hang; it should re-subscribe
        // and return within the timeout (not stuck on a dead subscription).
        await using var recoveredFactory = _fixture.CreateConnectionFactory();
        var recoveredQueue = new ValkeyMessageQueue<TestMessage>(
            recoveredFactory,
            queueOptions,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        var sw = Stopwatch.StartNew();
        using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(15));
        await recoveredQueue.WaitForNotificationAsync(TimeSpan.FromSeconds(5), cts2.Token);
        sw.Stop();

        sw.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10),
            "WaitForNotificationAsync should not hang after connection failure");

        // Verify messages can still be leased.
        var msg = new TestMessage(Guid.NewGuid().ToString(), "post-failure", DateTimeOffset.UtcNow);
        await recoveredQueue.EnqueueAsync(msg);

        var leases = await recoveredQueue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
        leases.Should().HaveCount(1, "messages should be leasable after connection recovery");
        leases[0].Message.Id.Should().Be(msg.Id);

        await leases[0].AcknowledgeAsync();
        await recoveredQueue.DisposeAsync();

        _output.WriteLine($"Connection failure recovery test passed (wait returned in {sw.Elapsed.TotalSeconds:F1}s)");
    }

    #endregion

    #region Test 7: Multiple Consumers Fair Distribution

    /// <summary>
    /// Two consumers in the same group must receive all enqueued messages
    /// between them with no losses and no duplicate deliveries.
    /// </summary>
    [ValkeyIntegrationFact]
    public async Task MultipleConsumersFairDistribution()
    {
        // Arrange — 2 queue instances on the same consumer group with different consumer names.
        var queueOptions = _fixture.CreateQueueOptions();
        var transportOptions = _fixture.CreateOptions();

        var consumer1Options = _fixture.CreateQueueOptions(
            queueName: queueOptions.QueueName,
            consumerGroup: queueOptions.ConsumerGroup,
            consumerName: "fair-consumer-1");

        var consumer2Options = _fixture.CreateQueueOptions(
            queueName: queueOptions.QueueName,
            consumerGroup: queueOptions.ConsumerGroup,
            consumerName: "fair-consumer-2");

        await using var factory1 = _fixture.CreateConnectionFactory();
        await using var factory2 = _fixture.CreateConnectionFactory();

        var queue1 = new ValkeyMessageQueue<TestMessage>(
            factory1,
            consumer1Options,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        var queue2 = new ValkeyMessageQueue<TestMessage>(
            factory2,
            consumer2Options,
            transportOptions,
            _fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());

        // Enqueue 10 messages via consumer 1's queue (same stream).
        var messages = Enumerable.Range(1, 10)
            .Select(i => new TestMessage($"fair-{i}", $"Fair message {i}", DateTimeOffset.UtcNow))
            .ToList();

        foreach (var msg in messages)
        {
            await queue1.EnqueueAsync(msg);
        }

        // Act — both consumers lease concurrently.
        var allReceivedIds = new ConcurrentBag<string>();
        var consumer1Ids = new ConcurrentBag<string>();
        var consumer2Ids = new ConcurrentBag<string>();

        // Local helper: keep leasing until this consumer's share is drained or
        // 20 empty polls have elapsed (both consumers share the 10-message budget).
        async Task ConsumeAll(ValkeyMessageQueue<TestMessage> q, ConcurrentBag<string> ids)
        {
            var remaining = 10;
            var attempts = 0;
            while (remaining > 0 && attempts < 20)
            {
                var leases = await q.LeaseAsync(new LeaseRequest { BatchSize = 5 });
                foreach (var lease in leases)
                {
                    ids.Add(lease.Message.Id);
                    allReceivedIds.Add(lease.Message.Id);
                    await lease.AcknowledgeAsync();
                    remaining--;
                }

                if (leases.Count == 0)
                {
                    attempts++;
                    await Task.Delay(100);
                }
            }
        }

        await Task.WhenAll(
            ConsumeAll(queue1, consumer1Ids),
            ConsumeAll(queue2, consumer2Ids));

        // Assert — all 10 messages received across both consumers.
        var uniqueIds = allReceivedIds.Distinct().ToList();
        uniqueIds.Should().HaveCount(10,
            "all 10 messages should be received across both consumers (no loss)");

        // Verify no duplicates.
        allReceivedIds.Should().OnlyHaveUniqueItems(
            "each message should be delivered to only one consumer (no duplicates)");

        _output.WriteLine($"Consumer 1 received {consumer1Ids.Count}, Consumer 2 received {consumer2Ids.Count}");
        _output.WriteLine("Multi-consumer fair distribution test passed");

        await queue1.DisposeAsync();
        await queue2.DisposeAsync();
    }

    #endregion
}