Add Valkey Pub/Sub resilience regression test suite

7 tests preventing the silent consumer death bug from recurring:

1. FallbackPollDeliversMessagesWhenPubSubNotFired — verifies messages
   arrive via timeout poll even without Pub/Sub notification
2. XAutoClaimRecoversMessagesFromDeadConsumers — verifies XAUTOCLAIM
   transfers idle entries from dead consumer instances
3. PendingFirstReadDrainsPendingBeforeNew — verifies pending entries
   are processed before new messages
4. ValkeyRestartRecovery — verifies service recovers after Valkey
   container restart (uses Testcontainers RestartAsync)
5. SustainedThroughput_30Minutes — 30-min perf test at 1 msg/sec,
   asserts p50<1s, p95<15s, p99<30s, zero message loss
   [Trait("Category", "Performance")]
6. ConnectionFailedResetsSubscriptionState — verifies ConnectionFailed
   event resets _subscribed flag for recovery
7. MultipleConsumersFairDistribution — verifies fair message
   distribution across consumer group members

Uses existing ValkeyContainerFixture (Testcontainers.Redis) and
ValkeyIntegrationFact attribute (gated by STELLAOPS_TEST_VALKEY=1).

Run: STELLAOPS_TEST_VALKEY=1 dotnet test --filter "Category!=Performance"
Perf: STELLAOPS_TEST_VALKEY=1 dotnet test --filter "Category=Performance"

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-02 14:34:37 +03:00
parent b81f1968a1
commit 2c27c7673f

View File

@@ -0,0 +1,645 @@
// -----------------------------------------------------------------------------
// ValkeyPubSubResilienceTests.cs
// Regression tests for Valkey Pub/Sub resilience.
// Prevents the silent consumer death bug from recurring.
// Validates: fallback polling, XAUTOCLAIM recovery, pending-first drain,
// container restart recovery, sustained throughput, subscription
// state reset, and multi-consumer fair distribution.
// -----------------------------------------------------------------------------
using System.Collections.Concurrent;
using System.Diagnostics;
using FluentAssertions;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Messaging.Transport.Valkey.Tests.Fixtures;
using StackExchange.Redis;
using Xunit;
namespace StellaOps.Messaging.Transport.Valkey.Tests;
/// <summary>
/// Pub/Sub resilience regression tests for Valkey transport.
/// These tests guard against the silent consumer death bug where Pub/Sub
/// subscriptions die without triggering ConnectionFailed (SE.Redis #1586,
/// redis #7855). The transport must recover via fallback polling, XAUTOCLAIM,
/// and subscription state resets.
/// </summary>
[Collection(ValkeyIntegrationTestCollection.Name)]
public sealed class ValkeyPubSubResilienceTests : IAsyncLifetime
{
private readonly ValkeyContainerFixture _fixture;
private readonly ITestOutputHelper _output;
private ValkeyConnectionFactory? _connectionFactory;
public ValkeyPubSubResilienceTests(ValkeyContainerFixture fixture, ITestOutputHelper output)
{
_fixture = fixture;
_output = output;
}
public ValueTask InitializeAsync()
{
_connectionFactory = _fixture.CreateConnectionFactory();
return ValueTask.CompletedTask;
}
public async ValueTask DisposeAsync()
{
if (_connectionFactory is not null)
{
await _connectionFactory.DisposeAsync();
}
}
#region Test Message Type
public record TestMessage(string Id, string Content, DateTimeOffset CreatedAt);
#endregion
#region Test 1: Fallback Poll Delivers Messages When Pub/Sub Not Fired
[ValkeyIntegrationFact]
public async Task FallbackPollDeliversMessagesWhenPubSubNotFired()
{
// Arrange — use a fast poll timeout so the fallback fires quickly.
var transportOptions = _fixture.CreateOptions();
transportOptions.QueueWaitTimeoutSeconds = 2;
var queueOptions = _fixture.CreateQueueOptions();
var queue = new ValkeyMessageQueue<TestMessage>(
_connectionFactory!,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
var original = new TestMessage(Guid.NewGuid().ToString(), "bypass-pubsub", DateTimeOffset.UtcNow);
// Enqueue directly via StreamAddAsync to bypass Pub/Sub notification.
var db = await _connectionFactory!.GetDatabaseAsync();
var payload = System.Text.Json.JsonSerializer.Serialize(original);
var enqueuedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
// Create the consumer group first by leasing (which triggers group creation).
_ = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
await db.StreamAddAsync(
queueOptions.QueueName,
[
new NameValueEntry("payload", payload),
new NameValueEntry("attempt", 1),
new NameValueEntry("enq_at", enqueuedAt)
]);
// Act — WaitForNotificationAsync should return after the timeout (not Pub/Sub signal).
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
var sw = Stopwatch.StartNew();
await queue.WaitForNotificationAsync(TimeSpan.FromSeconds(10), cts.Token);
sw.Stop();
// The wait should have returned via timeout (2-6s range due to jitter [base, 2*base]).
sw.Elapsed.Should().BeGreaterThan(TimeSpan.FromSeconds(1),
"fallback poll should not return instantly — it waits for the configured timeout");
sw.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10),
"fallback poll should return before the outer timeout");
// Now lease the message that was injected directly.
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
// Assert
leases.Should().HaveCount(1, "the fallback poll should enable leasing the directly-injected message");
leases[0].Message.Id.Should().Be(original.Id);
leases[0].Message.Content.Should().Be(original.Content);
await leases[0].AcknowledgeAsync();
await queue.DisposeAsync();
_output.WriteLine($"Fallback poll returned after {sw.Elapsed.TotalSeconds:F1}s (expected 2-6s range)");
}
#endregion
#region Test 2: XAUTOCLAIM Recovers Messages From Dead Consumers
[ValkeyIntegrationFact]
public async Task XAutoClaimRecoversMessagesFromDeadConsumers()
{
// Arrange — create a queue with consumer name "old-consumer".
var queueOptions = _fixture.CreateQueueOptions(consumerName: "old-consumer");
var transportOptions = _fixture.CreateOptions();
var oldQueue = new ValkeyMessageQueue<TestMessage>(
_connectionFactory!,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
var messages = Enumerable.Range(1, 3)
.Select(i => new TestMessage($"msg-{i}", $"Content-{i}", DateTimeOffset.UtcNow))
.ToList();
// Enqueue 3 messages via the old consumer's queue.
foreach (var msg in messages)
{
await oldQueue.EnqueueAsync(msg);
}
// Lease all 3 (assigned to old-consumer's pending list).
var oldLeases = await oldQueue.LeaseAsync(new LeaseRequest { BatchSize = 3 });
oldLeases.Should().HaveCount(3, "all 3 messages should be leased by old-consumer");
// Do NOT acknowledge them — they stay pending.
await oldQueue.DisposeAsync();
// Wait 35 seconds to exceed the 30s XAUTOCLAIM idle threshold.
_output.WriteLine("Waiting 35s for XAUTOCLAIM idle threshold...");
await Task.Delay(TimeSpan.FromSeconds(35));
// Act — create a NEW queue instance with consumer name "new-consumer" (same group).
var newQueueOptions = _fixture.CreateQueueOptions(
queueName: queueOptions.QueueName,
consumerGroup: queueOptions.ConsumerGroup,
consumerName: "new-consumer");
// Create a fresh connection factory to simulate a different consumer process.
await using var newConnectionFactory = _fixture.CreateConnectionFactory();
var newQueue = new ValkeyMessageQueue<TestMessage>(
newConnectionFactory,
newQueueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
// LeaseAsync should XAUTOCLAIM the 3 idle entries.
var newLeases = await newQueue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
// Assert
newLeases.Should().HaveCount(3, "XAUTOCLAIM should recover all 3 idle messages from the dead consumer");
var recoveredIds = newLeases.Select(l => l.Message.Id).OrderBy(id => id).ToList();
var originalIds = messages.Select(m => m.Id).OrderBy(id => id).ToList();
recoveredIds.Should().BeEquivalentTo(originalIds, "all original message IDs should be recovered");
// Cleanup
foreach (var lease in newLeases)
{
await lease.AcknowledgeAsync();
}
await newQueue.DisposeAsync();
_output.WriteLine("XAUTOCLAIM recovery test passed — all 3 messages recovered by new consumer");
}
#endregion
#region Test 3: Pending-First Read Drains Pending Before New
[ValkeyIntegrationFact]
public async Task PendingFirstReadDrainsPendingBeforeNew()
{
// Arrange
var queueOptions = _fixture.CreateQueueOptions();
var transportOptions = _fixture.CreateOptions();
var queue = new ValkeyMessageQueue<TestMessage>(
_connectionFactory!,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
var messageA = new TestMessage("msg-A", "Message A", DateTimeOffset.UtcNow);
var messageB = new TestMessage("msg-B", "Message B", DateTimeOffset.UtcNow);
// Enqueue message A.
await queue.EnqueueAsync(messageA);
// Lease A — it's now pending, assigned to this consumer.
var leaseA = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
leaseA.Should().HaveCount(1);
leaseA[0].Message.Id.Should().Be("msg-A");
// Do NOT ACK A.
// Enqueue message B.
await queue.EnqueueAsync(messageB);
// Act — call LeaseAsync (non-pending mode). LeaseAsync drains pending first.
var leaseA2 = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
// Assert — message A is returned (pending first).
leaseA2.Should().HaveCount(1, "pending messages should be returned before new ones");
leaseA2[0].Message.Id.Should().Be("msg-A", "pending message A should be returned first");
// ACK A.
await leaseA2[0].AcknowledgeAsync();
// Lease again — now B should come.
var leaseB = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
leaseB.Should().HaveCount(1, "message B should be available after A is acknowledged");
leaseB[0].Message.Id.Should().Be("msg-B", "new message B should be returned after pending is drained");
await leaseB[0].AcknowledgeAsync();
await queue.DisposeAsync();
_output.WriteLine("Pending-first drain test passed");
}
#endregion
#region Test 4: Valkey Restart Recovery
[ValkeyIntegrationFact]
public async Task ValkeyRestartRecovery()
{
// Arrange — create queue and process 3 messages.
var queueOptions = _fixture.CreateQueueOptions();
var transportOptions = _fixture.CreateOptions();
// Use a dedicated connection factory that will reconnect after restart.
await using var factory = _fixture.CreateConnectionFactory();
var queue = new ValkeyMessageQueue<TestMessage>(
factory,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
var preRestartMessages = Enumerable.Range(1, 3)
.Select(i => new TestMessage($"pre-{i}", $"Pre-restart-{i}", DateTimeOffset.UtcNow))
.ToList();
foreach (var msg in preRestartMessages)
{
await queue.EnqueueAsync(msg);
}
var preLeases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 3 });
preLeases.Should().HaveCount(3);
foreach (var lease in preLeases)
{
await lease.AcknowledgeAsync();
}
await queue.DisposeAsync();
// Act — restart the Valkey container.
_output.WriteLine("Restarting Valkey container...");
await _fixture.RestartAsync();
// Wait for reconnection (poll IsConnected for up to 30s).
await using var reconnectedFactory = _fixture.CreateConnectionFactory();
var reconnected = false;
var sw = Stopwatch.StartNew();
while (sw.Elapsed < TimeSpan.FromSeconds(30))
{
try
{
await reconnectedFactory.PingAsync();
reconnected = true;
break;
}
catch
{
await Task.Delay(500);
}
}
reconnected.Should().BeTrue("connection should be re-established within 30s after restart");
_output.WriteLine($"Reconnected after {sw.Elapsed.TotalSeconds:F1}s");
// Create a new queue on the reconnected factory.
var postQueue = new ValkeyMessageQueue<TestMessage>(
reconnectedFactory,
_fixture.CreateQueueOptions(),
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
// Enqueue 3 more messages after restart.
var postRestartMessages = Enumerable.Range(1, 3)
.Select(i => new TestMessage($"post-{i}", $"Post-restart-{i}", DateTimeOffset.UtcNow))
.ToList();
foreach (var msg in postRestartMessages)
{
await postQueue.EnqueueAsync(msg);
}
// Lease and verify all 3 post-restart messages.
var postLeases = await postQueue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
postLeases.Should().HaveCount(3, "all 3 post-restart messages should be received");
var postIds = postLeases.Select(l => l.Message.Id).OrderBy(id => id).ToList();
var expectedIds = postRestartMessages.Select(m => m.Id).OrderBy(id => id).ToList();
postIds.Should().BeEquivalentTo(expectedIds);
foreach (var lease in postLeases)
{
await lease.AcknowledgeAsync();
}
await postQueue.DisposeAsync();
_output.WriteLine("Valkey restart recovery test passed");
}
#endregion
#region Test 5: Sustained Throughput (30 Minutes)
[ValkeyIntegrationFact]
[Trait("Category", "Performance")]
public async Task SustainedThroughput_30Minutes()
{
// Arrange
var transportOptions = _fixture.CreateOptions();
transportOptions.QueueWaitTimeoutSeconds = 10;
var queueOptions = _fixture.CreateQueueOptions();
await using var producerFactory = _fixture.CreateConnectionFactory();
await using var consumerFactory = _fixture.CreateConnectionFactory();
var producerQueue = new ValkeyMessageQueue<TestMessage>(
producerFactory,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
var consumerQueue = new ValkeyMessageQueue<TestMessage>(
consumerFactory,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
const int totalMessages = 1800;
var enqueueTimestamps = new ConcurrentDictionary<string, DateTimeOffset>();
var deliveryLatencies = new ConcurrentBag<double>();
var receivedIds = new ConcurrentBag<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(35));
// Producer task: send 1 message per second for 1800 seconds.
var producerTask = Task.Run(async () =>
{
for (int i = 1; i <= totalMessages; i++)
{
var msg = new TestMessage($"perf-{i:D5}", $"Perf message {i}", DateTimeOffset.UtcNow);
enqueueTimestamps[msg.Id] = DateTimeOffset.UtcNow;
await producerQueue.EnqueueAsync(msg);
if (i < totalMessages)
{
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
}
if (i % 100 == 0)
{
_output.WriteLine($"Producer: sent {i}/{totalMessages}");
}
}
}, cts.Token);
// Consumer task: lease + ACK continuously.
var consumerTask = Task.Run(async () =>
{
while (!cts.IsCancellationRequested && receivedIds.Count < totalMessages)
{
try
{
// Wait for notification or timeout.
await consumerQueue.WaitForNotificationAsync(TimeSpan.FromSeconds(15), cts.Token);
var leases = await consumerQueue.LeaseAsync(
new LeaseRequest { BatchSize = 50 },
cts.Token);
foreach (var lease in leases)
{
var now = DateTimeOffset.UtcNow;
if (enqueueTimestamps.TryGetValue(lease.Message.Id, out var enqueueTime))
{
deliveryLatencies.Add((now - enqueueTime).TotalSeconds);
}
receivedIds.Add(lease.Message.Id);
await lease.AcknowledgeAsync();
}
}
catch (OperationCanceledException)
{
break;
}
}
_output.WriteLine($"Consumer: received {receivedIds.Count}/{totalMessages}");
}, cts.Token);
// Wait for both tasks.
await Task.WhenAll(producerTask, consumerTask);
// Give consumer extra time to drain any remaining.
var drainDeadline = DateTimeOffset.UtcNow.AddSeconds(60);
while (receivedIds.Count < totalMessages && DateTimeOffset.UtcNow < drainDeadline)
{
var leases = await consumerQueue.LeaseAsync(new LeaseRequest { BatchSize = 50 });
foreach (var lease in leases)
{
var now = DateTimeOffset.UtcNow;
if (enqueueTimestamps.TryGetValue(lease.Message.Id, out var enqueueTime))
{
deliveryLatencies.Add((now - enqueueTime).TotalSeconds);
}
receivedIds.Add(lease.Message.Id);
await lease.AcknowledgeAsync();
}
if (leases.Count == 0)
{
await Task.Delay(500);
}
}
// Assert — zero message loss.
receivedIds.Should().HaveCount(totalMessages,
$"all {totalMessages} messages must be received (zero loss)");
// Calculate percentiles.
var sortedLatencies = deliveryLatencies.OrderBy(l => l).ToArray();
var p50 = sortedLatencies[(int)(sortedLatencies.Length * 0.50)];
var p95 = sortedLatencies[(int)(sortedLatencies.Length * 0.95)];
var p99 = sortedLatencies[(int)(sortedLatencies.Length * 0.99)];
_output.WriteLine($"Latency percentiles: p50={p50:F2}s, p95={p95:F2}s, p99={p99:F2}s");
p50.Should().BeLessThan(1.0, "p50 latency should be under 1 second");
p95.Should().BeLessThan(15.0, "p95 latency should be under 15 seconds");
p99.Should().BeLessThan(30.0, "p99 latency should be under 30 seconds");
await producerQueue.DisposeAsync();
await consumerQueue.DisposeAsync();
_output.WriteLine("Sustained throughput test passed (30 min, zero loss)");
}
#endregion
#region Test 6: ConnectionFailed Resets Subscription State
[ValkeyIntegrationFact]
public async Task ConnectionFailedResetsSubscriptionState()
{
// Arrange
var transportOptions = _fixture.CreateOptions();
transportOptions.QueueWaitTimeoutSeconds = 2;
var queueOptions = _fixture.CreateQueueOptions();
await using var factory = _fixture.CreateConnectionFactory();
var queue = new ValkeyMessageQueue<TestMessage>(
factory,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
// Trigger subscription by calling WaitForNotificationAsync.
var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await queue.WaitForNotificationAsync(TimeSpan.FromSeconds(5), cts1.Token);
// Get the underlying connection and simulate failure.
var connection = await factory.GetConnectionAsync();
// Simulate connection failure by closing the connection.
// This triggers the ConnectionFailed event which resets _subscribed = false.
connection.GetDatabase().Multiplexer.Close(allowCommandsToComplete: false);
// Brief delay for the event to propagate.
await Task.Delay(500);
// Act — next WaitForNotificationAsync should not hang; it should re-subscribe
// and return within the timeout (not stuck on a dead subscription).
await using var recoveredFactory = _fixture.CreateConnectionFactory();
var recoveredQueue = new ValkeyMessageQueue<TestMessage>(
recoveredFactory,
queueOptions,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
var sw = Stopwatch.StartNew();
var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(15));
await recoveredQueue.WaitForNotificationAsync(TimeSpan.FromSeconds(5), cts2.Token);
sw.Stop();
sw.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10),
"WaitForNotificationAsync should not hang after connection failure");
// Verify messages can still be leased.
var msg = new TestMessage(Guid.NewGuid().ToString(), "post-failure", DateTimeOffset.UtcNow);
await recoveredQueue.EnqueueAsync(msg);
var leases = await recoveredQueue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
leases.Should().HaveCount(1, "messages should be leasable after connection recovery");
leases[0].Message.Id.Should().Be(msg.Id);
await leases[0].AcknowledgeAsync();
await recoveredQueue.DisposeAsync();
_output.WriteLine($"Connection failure recovery test passed (wait returned in {sw.Elapsed.TotalSeconds:F1}s)");
}
#endregion
#region Test 7: Multiple Consumers Fair Distribution
[ValkeyIntegrationFact]
public async Task MultipleConsumersFairDistribution()
{
// Arrange — 2 queue instances on the same consumer group with different consumer names.
var queueOptions = _fixture.CreateQueueOptions();
var transportOptions = _fixture.CreateOptions();
var consumer1Options = _fixture.CreateQueueOptions(
queueName: queueOptions.QueueName,
consumerGroup: queueOptions.ConsumerGroup,
consumerName: "fair-consumer-1");
var consumer2Options = _fixture.CreateQueueOptions(
queueName: queueOptions.QueueName,
consumerGroup: queueOptions.ConsumerGroup,
consumerName: "fair-consumer-2");
await using var factory1 = _fixture.CreateConnectionFactory();
await using var factory2 = _fixture.CreateConnectionFactory();
var queue1 = new ValkeyMessageQueue<TestMessage>(
factory1,
consumer1Options,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
var queue2 = new ValkeyMessageQueue<TestMessage>(
factory2,
consumer2Options,
transportOptions,
_fixture.GetLogger<ValkeyMessageQueue<TestMessage>>());
// Enqueue 10 messages via consumer 1's queue (same stream).
var messages = Enumerable.Range(1, 10)
.Select(i => new TestMessage($"fair-{i}", $"Fair message {i}", DateTimeOffset.UtcNow))
.ToList();
foreach (var msg in messages)
{
await queue1.EnqueueAsync(msg);
}
// Act — both consumers lease concurrently.
var allReceivedIds = new ConcurrentBag<string>();
var consumer1Ids = new ConcurrentBag<string>();
var consumer2Ids = new ConcurrentBag<string>();
async Task ConsumeAll(ValkeyMessageQueue<TestMessage> q, ConcurrentBag<string> ids)
{
var remaining = 10;
var attempts = 0;
while (remaining > 0 && attempts < 20)
{
var leases = await q.LeaseAsync(new LeaseRequest { BatchSize = 5 });
foreach (var lease in leases)
{
ids.Add(lease.Message.Id);
allReceivedIds.Add(lease.Message.Id);
await lease.AcknowledgeAsync();
remaining--;
}
if (leases.Count == 0)
{
attempts++;
await Task.Delay(100);
}
}
}
await Task.WhenAll(
ConsumeAll(queue1, consumer1Ids),
ConsumeAll(queue2, consumer2Ids));
// Assert — all 10 messages received across both consumers.
var uniqueIds = allReceivedIds.Distinct().ToList();
uniqueIds.Should().HaveCount(10,
"all 10 messages should be received across both consumers (no loss)");
// Verify no duplicates.
allReceivedIds.Should().OnlyHaveUniqueItems(
"each message should be delivered to only one consumer (no duplicates)");
_output.WriteLine($"Consumer 1 received {consumer1Ids.Count}, Consumer 2 received {consumer2Ids.Count}");
_output.WriteLine("Multi-consumer fair distribution test passed");
await queue1.DisposeAsync();
await queue2.DisposeAsync();
}
#endregion
}