// -----------------------------------------------------------------------------
// RetryStatePersistenceTests.cs
// Sprint: SPRINT_5100_0009_0009_notify_tests
// Task: NOTIFY-5100-011
// Description: Model S1 retry state persistence tests for Notify delivery storage
// -----------------------------------------------------------------------------

using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Notify.Persistence.Postgres.Models;
using StellaOps.Notify.Persistence.Postgres.Repositories;
using StellaOps.TestKit;
using Xunit;

namespace StellaOps.Notify.Persistence.Postgres.Tests;

/// <summary>
/// Retry state persistence tests for Notify delivery storage operations.
/// Implements Model S1 (Storage/Postgres) test requirements:
/// - Failed notification → retry state saved
/// - Retry state persists across queries
/// - Retry on next poll behavior
/// </summary>
/// <remarks>
/// Several tests assert retry details only when the delivery is still
/// <see cref="DeliveryStatus.Pending"/> after a failure, because the exact
/// status transition is decided by the repository and is not pinned here.
/// </remarks>
[Collection(NotifyPostgresCollection.Name)]
[Trait("Category", TestCategories.Integration)]
[Trait("Category", "RetryPersistence")]
public sealed class RetryStatePersistenceTests : IAsyncLifetime
{
    private readonly NotifyPostgresFixture _fixture;
    private NotifyDataSource _dataSource = null!;
    private DeliveryRepository _deliveryRepository = null!;
    private ChannelRepository _channelRepository = null!;

    // A fresh tenant per test-class instance keeps rows from different tests apart.
    private readonly string _tenantId = Guid.NewGuid().ToString();
    private Guid _channelId;

    /// <summary>
    /// Creates the test class on top of the shared Postgres collection fixture.
    /// </summary>
    /// <param name="fixture">Collection fixture that provides schema access and SQL execution.</param>
    public RetryStatePersistenceTests(NotifyPostgresFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Truncates the notify tables, builds the data source and repositories, and
    /// creates the email channel that every delivery in this class points at.
    /// </summary>
    public async ValueTask InitializeAsync()
    {
        await _fixture.ExecuteSqlAsync(
            "TRUNCATE TABLE notify.audit, notify.deliveries, notify.digests, notify.channels RESTART IDENTITY CASCADE;");

        var options = _fixture.Fixture.CreateOptions();
        options.SchemaName = _fixture.SchemaName;

        // NullLogger<T>.Instance satisfies both ILogger and ILogger<T> parameters,
        // so it works regardless of which one the constructors declare.
        _dataSource = new NotifyDataSource(Options.Create(options), NullLogger<NotifyDataSource>.Instance);
        _deliveryRepository = new DeliveryRepository(_dataSource, NullLogger<DeliveryRepository>.Instance);
        _channelRepository = new ChannelRepository(_dataSource, NullLogger<ChannelRepository>.Instance);

        // Create a channel for deliveries
        _channelId = Guid.NewGuid();
        await _channelRepository.CreateAsync(new ChannelEntity
        {
            Id = _channelId,
            TenantId = _tenantId,
            Name = "test-email",
            ChannelType = ChannelType.Email,
            Enabled = true
        });
    }

    /// <summary>
    /// No per-test teardown is performed.
    /// </summary>
    // NOTE(review): _dataSource is never disposed here — confirm whether NotifyDataSource
    // owns pooled connections that should be released per test class.
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;

    /// <summary>
    /// A failure with a retry delay records the attempt and error message and,
    /// if the delivery stays pending, a retry time in the future.
    /// </summary>
    [Fact]
    public async Task MarkFailed_WithRetry_SavesNextRetryTime()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 3);
        await _deliveryRepository.CreateAsync(delivery);
        var retryDelay = TimeSpan.FromMinutes(5);

        // Act
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Connection timeout", retryDelay);

        // Assert
        var fetched = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        fetched.Should().NotBeNull();
        fetched!.Attempt.Should().BeGreaterThan(0);
        fetched.ErrorMessage.Should().Be("Connection timeout");

        // If retry is scheduled (not yet exhausted attempts), should have next retry time
        if (fetched.Status == DeliveryStatus.Pending)
        {
            fetched.NextRetryAt.Should().NotBeNull();
            fetched.NextRetryAt.Should().BeAfter(DateTimeOffset.UtcNow);
        }
    }

    /// <summary>
    /// A single failure on a delivery with <c>MaxAttempts = 1</c> ends in the failed state.
    /// </summary>
    [Fact]
    public async Task MarkFailed_AttemptsExhausted_StatusIsFailed()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 1);
        await _deliveryRepository.CreateAsync(delivery);

        // Act - Fail once (exhausts max attempts)
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Permanent failure", TimeSpan.Zero);

        // Assert
        var fetched = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        fetched.Should().NotBeNull();
        fetched!.Status.Should().Be(DeliveryStatus.Failed);
        fetched.FailedAt.Should().NotBeNull();
        fetched.ErrorMessage.Should().Be("Permanent failure");
    }

    /// <summary>
    /// Repeated reads of the same failed delivery return the same attempt count,
    /// error message, and next retry time.
    /// </summary>
    [Fact]
    public async Task RetryState_PersistsAcrossQueries()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 5);
        await _deliveryRepository.CreateAsync(delivery);
        var retryDelay = TimeSpan.FromMinutes(10);
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Transient error", retryDelay);

        // Act - Query multiple times
        var fetched1 = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        var fetched2 = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        var fetched3 = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);

        // Assert - Every read must find the row; fail with a clear message instead of a
        // NullReferenceException if one does not.
        fetched1.Should().NotBeNull();
        fetched2.Should().NotBeNull();
        fetched3.Should().NotBeNull();

        // Assert - All queries should return same retry state
        fetched1!.Attempt.Should().Be(fetched2!.Attempt);
        fetched2.Attempt.Should().Be(fetched3!.Attempt);
        fetched1.ErrorMessage.Should().Be(fetched2.ErrorMessage);
        fetched2.ErrorMessage.Should().Be(fetched3.ErrorMessage);
        fetched1.NextRetryAt.Should().Be(fetched2.NextRetryAt);
        fetched2.NextRetryAt.Should().Be(fetched3.NextRetryAt);
    }

    /// <summary>
    /// Failing a delivery more than once raises its attempt count.
    /// </summary>
    [Fact]
    public async Task MultipleFailures_AttemptCountIncreases()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 5);
        await _deliveryRepository.CreateAsync(delivery);

        // Act - Fail multiple times
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Error 1", TimeSpan.FromSeconds(1));
        var afterFirst = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        afterFirst.Should().NotBeNull();

        // Only fail again when the first failure left the delivery scheduled for retry.
        var secondFailureRecorded =
            afterFirst!.Status == DeliveryStatus.Pending && afterFirst.NextRetryAt.HasValue;
        if (secondFailureRecorded)
        {
            await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Error 2", TimeSpan.FromSeconds(1));
        }

        // Assert
        var fetched = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        fetched.Should().NotBeNull();
        fetched!.Attempt.Should().BeGreaterThanOrEqualTo(1);

        // The lower bound above already holds after one failure; when a second failure
        // was recorded the count must have moved past the first snapshot.
        if (secondFailureRecorded)
        {
            fetched.Attempt.Should().BeGreaterThan(afterFirst.Attempt);
        }
    }

    /// <summary>
    /// A failed delivery that is still pending and whose retry time has passed
    /// shows up in the pending query.
    /// </summary>
    [Fact]
    public async Task GetPending_IncludesRetryableDeliveries()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 3);
        await _deliveryRepository.CreateAsync(delivery);

        // Fail with short retry delay
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Transient error", TimeSpan.Zero);
        var fetched = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        fetched.Should().NotBeNull();

        // Act - Query pending deliveries (should include retryable ones)
        if (fetched!.Status == DeliveryStatus.Pending)
        {
            var pending = await _deliveryRepository.GetPendingAsync(_tenantId);

            // Assert - Delivery should be in pending list if retry time has passed
            if (fetched.NextRetryAt is null || fetched.NextRetryAt <= DateTimeOffset.UtcNow)
            {
                pending.Should().Contain(d => d.Id == delivery.Id);
            }
        }
    }

    /// <summary>
    /// The error message passed to <c>MarkFailedAsync</c> is read back unchanged.
    /// </summary>
    [Fact]
    public async Task ErrorMessage_PreservedAfterRetry()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 3);
        await _deliveryRepository.CreateAsync(delivery);
        var errorMessage = "SMTP connection refused: 10.0.0.1:587";

        // Act
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, errorMessage, TimeSpan.FromMinutes(5));

        // Assert
        var fetched = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        fetched.Should().NotBeNull();
        fetched!.ErrorMessage.Should().Be(errorMessage);
    }

    /// <summary>
    /// A delivery that fails once and is then queued, sent, and delivered ends in
    /// the delivered state with a delivery timestamp.
    /// </summary>
    [Fact]
    public async Task SuccessfulDelivery_ClearsRetryState()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 3);
        await _deliveryRepository.CreateAsync(delivery);

        // Fail first
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Transient error", TimeSpan.FromMinutes(1));

        // Then succeed
        await _deliveryRepository.MarkQueuedAsync(_tenantId, delivery.Id);
        await _deliveryRepository.MarkSentAsync(_tenantId, delivery.Id);
        await _deliveryRepository.MarkDeliveredAsync(_tenantId, delivery.Id);

        // Assert
        var fetched = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        fetched.Should().NotBeNull();
        fetched!.Status.Should().Be(DeliveryStatus.Delivered);
        fetched.DeliveredAt.Should().NotBeNull();

        // NOTE(review): the test name promises cleared retry state, but NextRetryAt and
        // ErrorMessage are not asserted — confirm the repository contract before adding them.
    }

    /// <summary>
    /// Failing one delivery leaves a second, successfully delivered one without an error message.
    /// </summary>
    [Fact]
    public async Task MultipleDeliveries_RetryStatesAreIndependent()
    {
        // Arrange
        var delivery1 = CreateDelivery(maxAttempts: 3, recipient: "user1@example.com");
        var delivery2 = CreateDelivery(maxAttempts: 3, recipient: "user2@example.com");
        await _deliveryRepository.CreateAsync(delivery1);
        await _deliveryRepository.CreateAsync(delivery2);

        // Act - Fail delivery1, succeed delivery2
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery1.Id, "Error for user1", TimeSpan.FromMinutes(5));
        await _deliveryRepository.MarkQueuedAsync(_tenantId, delivery2.Id);
        await _deliveryRepository.MarkSentAsync(_tenantId, delivery2.Id);
        await _deliveryRepository.MarkDeliveredAsync(_tenantId, delivery2.Id);

        // Assert
        var fetched1 = await _deliveryRepository.GetByIdAsync(_tenantId, delivery1.Id);
        var fetched2 = await _deliveryRepository.GetByIdAsync(_tenantId, delivery2.Id);
        fetched1.Should().NotBeNull();
        fetched2.Should().NotBeNull();

        fetched1!.ErrorMessage.Should().Be("Error for user1");
        fetched2!.Status.Should().Be(DeliveryStatus.Delivered);
        fetched2.ErrorMessage.Should().BeNull();
    }

    /// <summary>
    /// A failed delivery that is still pending is returned by the pending status
    /// query with a non-zero attempt count.
    /// </summary>
    [Fact]
    public async Task GetByStatus_ReturnsPendingWithRetries()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 5);
        await _deliveryRepository.CreateAsync(delivery);
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Transient", TimeSpan.Zero);
        var fetched = await _deliveryRepository.GetByIdAsync(_tenantId, delivery.Id);
        fetched.Should().NotBeNull();

        // Act - Query by pending status
        if (fetched!.Status == DeliveryStatus.Pending)
        {
            var pending = await _deliveryRepository.GetByStatusAsync(_tenantId, DeliveryStatus.Pending);

            // Assert
            pending.Should().Contain(d => d.Id == delivery.Id);
            var foundDelivery = pending.First(d => d.Id == delivery.Id);
            foundDelivery.Attempt.Should().BeGreaterThan(0);
        }
    }

    /// <summary>
    /// A delivery whose only attempt failed is returned by the failed status query
    /// with its failure timestamp and error message.
    /// </summary>
    [Fact]
    public async Task GetByStatus_ReturnsFailed()
    {
        // Arrange
        var delivery = CreateDelivery(maxAttempts: 1);
        await _deliveryRepository.CreateAsync(delivery);
        await _deliveryRepository.MarkFailedAsync(_tenantId, delivery.Id, "Permanent failure", TimeSpan.Zero);

        // Act
        var failed = await _deliveryRepository.GetByStatusAsync(_tenantId, DeliveryStatus.Failed);

        // Assert
        failed.Should().Contain(d => d.Id == delivery.Id);
        var foundDelivery = failed.First(d => d.Id == delivery.Id);
        foundDelivery.FailedAt.Should().NotBeNull();
        foundDelivery.ErrorMessage.Should().Be("Permanent failure");
    }

    /// <summary>
    /// Builds an unsaved pending delivery for this class's tenant and channel.
    /// </summary>
    /// <param name="maxAttempts">Value assigned to <c>MaxAttempts</c> on the entity.</param>
    /// <param name="recipient">Recipient address; <c>"test@example.com"</c> is used when null.</param>
    /// <returns>A new <see cref="DeliveryEntity"/> with a freshly generated id.</returns>
    private DeliveryEntity CreateDelivery(int maxAttempts = 3, string? recipient = null)
    {
        return new DeliveryEntity
        {
            Id = Guid.NewGuid(),
            TenantId = _tenantId,
            ChannelId = _channelId,
            Recipient = recipient ?? "test@example.com",
            EventType = "test.event",
            EventPayload = """{"test": true}""",
            Status = DeliveryStatus.Pending,
            MaxAttempts = maxAttempts
        };
    }
}