using System; using System.Buffers; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Globalization; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using StackExchange.Redis; using StellaOps.Notify.Models; namespace StellaOps.Notify.Queue.Redis; internal sealed class RedisNotifyDeliveryQueue : INotifyDeliveryQueue, IAsyncDisposable { private const string TransportName = "redis"; private readonly NotifyDeliveryQueueOptions _options; private readonly NotifyRedisDeliveryQueueOptions _redisOptions; private readonly ILogger _logger; private readonly TimeProvider _timeProvider; private readonly Func> _connectionFactory; private readonly SemaphoreSlim _connectionLock = new(1, 1); private readonly SemaphoreSlim _groupLock = new(1, 1); private readonly ConcurrentDictionary _streamInitialized = new(StringComparer.Ordinal); private IConnectionMultiplexer? _connection; private bool _disposed; public RedisNotifyDeliveryQueue( NotifyDeliveryQueueOptions options, NotifyRedisDeliveryQueueOptions redisOptions, ILogger logger, TimeProvider timeProvider, Func>? connectionFactory = null) { _options = options ?? throw new ArgumentNullException(nameof(options)); _redisOptions = redisOptions ?? throw new ArgumentNullException(nameof(redisOptions)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _timeProvider = timeProvider ?? TimeProvider.System; _connectionFactory = connectionFactory ?? (async config => { var connection = await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false); return (IConnectionMultiplexer)connection; }); if (string.IsNullOrWhiteSpace(_redisOptions.ConnectionString)) { throw new InvalidOperationException("Redis connection string must be configured for the Notify delivery queue."); } } public async ValueTask PublishAsync( NotifyDeliveryQueueMessage message, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(message); cancellationToken.ThrowIfCancellationRequested(); var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var attempt = 1; var entries = BuildEntries(message, now, attempt); var messageId = await AddToStreamAsync( db, _redisOptions.StreamName, entries) .ConfigureAwait(false); var idempotencyKey = BuildIdempotencyKey(message.IdempotencyKey); var stored = await db.StringSetAsync( idempotencyKey, messageId, when: When.NotExists, expiry: _options.ClaimIdleThreshold) .ConfigureAwait(false); if (!stored) { await db.StreamDeleteAsync( _redisOptions.StreamName, new RedisValue[] { messageId }) .ConfigureAwait(false); var existing = await db.StringGetAsync(idempotencyKey).ConfigureAwait(false); var duplicateId = existing.IsNullOrEmpty ? messageId : existing; NotifyQueueMetrics.RecordDeduplicated(TransportName, _redisOptions.StreamName); _logger.LogDebug( "Duplicate Notify delivery enqueue detected for delivery {DeliveryId}.", message.Delivery.DeliveryId); return new NotifyQueueEnqueueResult(duplicateId.ToString()!, true); } NotifyQueueMetrics.RecordEnqueued(TransportName, _redisOptions.StreamName); _logger.LogDebug( "Enqueued Notify delivery {DeliveryId} (channel {ChannelId}) into stream {Stream}.", message.Delivery.DeliveryId, message.ChannelId, _redisOptions.StreamName); return new NotifyQueueEnqueueResult(messageId.ToString()!, false); } public async ValueTask>> LeaseAsync( NotifyQueueLeaseRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); cancellationToken.ThrowIfCancellationRequested(); var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false); var entries = await db.StreamReadGroupAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, request.Consumer, StreamPosition.NewMessages, request.BatchSize) .ConfigureAwait(false); if (entries is null || entries.Length == 0) { return Array.Empty>(); } var now = _timeProvider.GetUtcNow(); var leases = new List>(entries.Length); foreach (var entry in entries) { var lease = TryMapLease(entry, request.Consumer, now, request.LeaseDuration, attemptOverride: null); if (lease is null) { await AckPoisonAsync(db, entry.Id).ConfigureAwait(false); continue; } leases.Add(lease); } return leases; } public async ValueTask>> ClaimExpiredAsync( NotifyQueueClaimOptions options, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(options); cancellationToken.ThrowIfCancellationRequested(); var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false); var pending = await db.StreamPendingMessagesAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, options.BatchSize, RedisValue.Null, (long)options.MinIdleTime.TotalMilliseconds) .ConfigureAwait(false); if (pending is null || pending.Length == 0) { return Array.Empty>(); } var eligible = pending .Where(p => p.IdleTimeInMilliseconds >= options.MinIdleTime.TotalMilliseconds) .ToArray(); if (eligible.Length == 0) { return Array.Empty>(); } var messageIds = eligible .Select(static p => (RedisValue)p.MessageId) .ToArray(); var entries = await db.StreamClaimAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, options.ClaimantConsumer, 0, messageIds) .ConfigureAwait(false); if (entries is null || entries.Length == 0) { return Array.Empty>(); } var now = _timeProvider.GetUtcNow(); var attemptLookup = eligible .Where(static info => !info.MessageId.IsNullOrEmpty) .ToDictionary( info => info.MessageId!.ToString(), info => (int)Math.Max(1, info.DeliveryCount), StringComparer.Ordinal); var leases = new List>(entries.Length); foreach (var entry in entries) { attemptLookup.TryGetValue(entry.Id.ToString(), out var attempt); var lease = TryMapLease(entry, options.ClaimantConsumer, now, _options.DefaultLeaseDuration, attempt == 0 ? null : attempt); if (lease is null) { await AckPoisonAsync(db, entry.Id).ConfigureAwait(false); continue; } leases.Add(lease); } return leases; } public async ValueTask DisposeAsync() { if (_disposed) { return; } _disposed = true; if (_connection is not null) { await _connection.CloseAsync().ConfigureAwait(false); _connection.Dispose(); } _connectionLock.Dispose(); _groupLock.Dispose(); GC.SuppressFinalize(this); } internal async Task AcknowledgeAsync( RedisNotifyDeliveryLease lease, CancellationToken cancellationToken) { if (!lease.TryBeginCompletion()) { return; } var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await db.StreamAcknowledgeAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); await db.StreamDeleteAsync( _redisOptions.StreamName, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); NotifyQueueMetrics.RecordAck(TransportName, _redisOptions.StreamName); _logger.LogDebug( "Acknowledged Notify delivery {DeliveryId} (message {MessageId}).", lease.Message.Delivery.DeliveryId, lease.MessageId); } internal async Task RenewLeaseAsync( RedisNotifyDeliveryLease lease, TimeSpan leaseDuration, CancellationToken cancellationToken) { var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await db.StreamClaimAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, lease.Consumer, 0, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); var expires = _timeProvider.GetUtcNow().Add(leaseDuration); lease.RefreshLease(expires); _logger.LogDebug( "Renewed Notify delivery lease {DeliveryId} until {Expires:u}.", lease.Message.Delivery.DeliveryId, expires); } internal async Task ReleaseAsync( RedisNotifyDeliveryLease lease, NotifyQueueReleaseDisposition disposition, CancellationToken cancellationToken) { if (disposition == NotifyQueueReleaseDisposition.Retry && lease.Attempt >= _options.MaxDeliveryAttempts) { _logger.LogWarning( "Notify delivery {DeliveryId} reached max delivery attempts ({Attempts}); moving to dead-letter stream.", lease.Message.Delivery.DeliveryId, lease.Attempt); await DeadLetterAsync( lease, $"max-delivery-attempts:{lease.Attempt}", cancellationToken).ConfigureAwait(false); return; } if (!lease.TryBeginCompletion()) { return; } var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await db.StreamAcknowledgeAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); await db.StreamDeleteAsync( _redisOptions.StreamName, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); if (disposition == NotifyQueueReleaseDisposition.Retry) { NotifyQueueMetrics.RecordRetry(TransportName, _redisOptions.StreamName); var delay = CalculateBackoff(lease.Attempt); if (delay > TimeSpan.Zero) { try { await Task.Delay(delay, cancellationToken).ConfigureAwait(false); } catch (TaskCanceledException) { return; } } var now = _timeProvider.GetUtcNow(); var entries = BuildEntries(lease.Message, now, lease.Attempt + 1); await AddToStreamAsync( db, _redisOptions.StreamName, entries) .ConfigureAwait(false); NotifyQueueMetrics.RecordEnqueued(TransportName, _redisOptions.StreamName); _logger.LogInformation( "Retrying Notify delivery {DeliveryId} (attempt {Attempt}).", lease.Message.Delivery.DeliveryId, lease.Attempt + 1); } else { NotifyQueueMetrics.RecordAck(TransportName, _redisOptions.StreamName); _logger.LogInformation( "Abandoned Notify delivery {DeliveryId} after {Attempt} attempt(s).", lease.Message.Delivery.DeliveryId, lease.Attempt); } } internal async Task DeadLetterAsync( RedisNotifyDeliveryLease lease, string reason, CancellationToken cancellationToken) { if (!lease.TryBeginCompletion()) { return; } var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await db.StreamAcknowledgeAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); await db.StreamDeleteAsync( _redisOptions.StreamName, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); await EnsureDeadLetterStreamAsync(db, cancellationToken).ConfigureAwait(false); var entries = BuildDeadLetterEntries(lease, reason); await AddToStreamAsync( db, _redisOptions.DeadLetterStreamName, entries) .ConfigureAwait(false); NotifyQueueMetrics.RecordDeadLetter(TransportName, _redisOptions.DeadLetterStreamName); _logger.LogError( "Dead-lettered Notify delivery {DeliveryId} (attempt {Attempt}): {Reason}", lease.Message.Delivery.DeliveryId, lease.Attempt, reason); } internal async ValueTask PingAsync(CancellationToken cancellationToken) { var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); _ = await db.PingAsync().ConfigureAwait(false); } private async Task GetDatabaseAsync(CancellationToken cancellationToken) { if (_connection is { IsConnected: true }) { return _connection.GetDatabase(_redisOptions.Database ?? -1); } await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (_connection is { IsConnected: true }) { return _connection.GetDatabase(_redisOptions.Database ?? -1); } var configuration = ConfigurationOptions.Parse(_redisOptions.ConnectionString!); configuration.AbortOnConnectFail = false; if (_redisOptions.Database.HasValue) { configuration.DefaultDatabase = _redisOptions.Database.Value; } using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); timeoutCts.CancelAfter(_redisOptions.InitializationTimeout); _connection = await _connectionFactory(configuration).WaitAsync(timeoutCts.Token).ConfigureAwait(false); return _connection.GetDatabase(_redisOptions.Database ?? -1); } finally { _connectionLock.Release(); } } private async Task EnsureConsumerGroupAsync( IDatabase database, CancellationToken cancellationToken) { if (_streamInitialized.ContainsKey(_redisOptions.StreamName)) { return; } await _groupLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (_streamInitialized.ContainsKey(_redisOptions.StreamName)) { return; } try { await database.StreamCreateConsumerGroupAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, StreamPosition.Beginning, createStream: true) .ConfigureAwait(false); } catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase)) { // group already exists } _streamInitialized[_redisOptions.StreamName] = true; } finally { _groupLock.Release(); } } private async Task EnsureDeadLetterStreamAsync( IDatabase database, CancellationToken cancellationToken) { if (_streamInitialized.ContainsKey(_redisOptions.DeadLetterStreamName)) { return; } await _groupLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (_streamInitialized.ContainsKey(_redisOptions.DeadLetterStreamName)) { return; } try { await database.StreamCreateConsumerGroupAsync( _redisOptions.DeadLetterStreamName, _redisOptions.ConsumerGroup, StreamPosition.Beginning, createStream: true) .ConfigureAwait(false); } catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase)) { // ignore } _streamInitialized[_redisOptions.DeadLetterStreamName] = true; } finally { _groupLock.Release(); } } private NameValueEntry[] BuildEntries( NotifyDeliveryQueueMessage message, DateTimeOffset enqueuedAt, int attempt) { var json = NotifyCanonicalJsonSerializer.Serialize(message.Delivery); var attributeCount = message.Attributes.Count; var entries = ArrayPool.Shared.Rent(8 + attributeCount); var index = 0; entries[index++] = new NameValueEntry(NotifyQueueFields.Payload, json); entries[index++] = new NameValueEntry(NotifyQueueFields.DeliveryId, message.Delivery.DeliveryId); entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelId, message.ChannelId); entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelType, message.ChannelType.ToString()); entries[index++] = new NameValueEntry(NotifyQueueFields.Tenant, message.Delivery.TenantId); entries[index++] = new NameValueEntry(NotifyQueueFields.Attempt, attempt); entries[index++] = new NameValueEntry(NotifyQueueFields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds()); entries[index++] = new NameValueEntry(NotifyQueueFields.IdempotencyKey, message.IdempotencyKey); entries[index++] = new NameValueEntry(NotifyQueueFields.TraceId, message.TraceId ?? string.Empty); entries[index++] = new NameValueEntry(NotifyQueueFields.PartitionKey, message.PartitionKey); if (attributeCount > 0) { foreach (var kvp in message.Attributes) { entries[index++] = new NameValueEntry( NotifyQueueFields.AttributePrefix + kvp.Key, kvp.Value); } } return entries.AsSpan(0, index).ToArray(); } private NameValueEntry[] BuildDeadLetterEntries(RedisNotifyDeliveryLease lease, string reason) { var json = NotifyCanonicalJsonSerializer.Serialize(lease.Message.Delivery); var attributes = lease.Message.Attributes; var attributeCount = attributes.Count; var entries = ArrayPool.Shared.Rent(9 + attributeCount); var index = 0; entries[index++] = new NameValueEntry(NotifyQueueFields.Payload, json); entries[index++] = new NameValueEntry(NotifyQueueFields.DeliveryId, lease.Message.Delivery.DeliveryId); entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelId, lease.Message.ChannelId); entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelType, lease.Message.ChannelType.ToString()); entries[index++] = new NameValueEntry(NotifyQueueFields.Tenant, lease.Message.Delivery.TenantId); entries[index++] = new NameValueEntry(NotifyQueueFields.Attempt, lease.Attempt); entries[index++] = new NameValueEntry(NotifyQueueFields.IdempotencyKey, lease.Message.IdempotencyKey); entries[index++] = new NameValueEntry("deadletter-reason", reason); entries[index++] = new NameValueEntry(NotifyQueueFields.TraceId, lease.Message.TraceId ?? string.Empty); foreach (var kvp in attributes) { entries[index++] = new NameValueEntry( NotifyQueueFields.AttributePrefix + kvp.Key, kvp.Value); } return entries.AsSpan(0, index).ToArray(); } private RedisNotifyDeliveryLease? TryMapLease( StreamEntry entry, string consumer, DateTimeOffset now, TimeSpan leaseDuration, int? attemptOverride) { if (entry.Values is null || entry.Values.Length == 0) { return null; } string? payload = null; string? deliveryId = null; string? channelId = null; string? channelTypeRaw = null; string? traceId = null; string? idempotency = null; string? partitionKey = null; long? enqueuedAtUnix = null; var attempt = attemptOverride ?? 1; var attributes = new Dictionary(StringComparer.Ordinal); foreach (var value in entry.Values) { var name = value.Name.ToString(); var data = value.Value; if (name.Equals(NotifyQueueFields.Payload, StringComparison.Ordinal)) { payload = data.ToString(); } else if (name.Equals(NotifyQueueFields.DeliveryId, StringComparison.Ordinal)) { deliveryId = data.ToString(); } else if (name.Equals(NotifyQueueFields.ChannelId, StringComparison.Ordinal)) { channelId = data.ToString(); } else if (name.Equals(NotifyQueueFields.ChannelType, StringComparison.Ordinal)) { channelTypeRaw = data.ToString(); } else if (name.Equals(NotifyQueueFields.Attempt, StringComparison.Ordinal)) { if (int.TryParse(data.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed)) { attempt = Math.Max(parsed, attempt); } } else if (name.Equals(NotifyQueueFields.EnqueuedAt, StringComparison.Ordinal)) { if (long.TryParse(data.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var unix)) { enqueuedAtUnix = unix; } } else if (name.Equals(NotifyQueueFields.IdempotencyKey, StringComparison.Ordinal)) { idempotency = data.ToString(); } else if (name.Equals(NotifyQueueFields.TraceId, StringComparison.Ordinal)) { var text = data.ToString(); traceId = string.IsNullOrWhiteSpace(text) ? null : text; } else if (name.Equals(NotifyQueueFields.PartitionKey, StringComparison.Ordinal)) { partitionKey = data.ToString(); } else if (name.StartsWith(NotifyQueueFields.AttributePrefix, StringComparison.Ordinal)) { attributes[name[NotifyQueueFields.AttributePrefix.Length..]] = data.ToString(); } } if (payload is null || deliveryId is null || channelId is null || channelTypeRaw is null) { return null; } NotifyDelivery delivery; try { delivery = NotifyCanonicalJsonSerializer.Deserialize(payload); } catch (Exception ex) { _logger.LogWarning( ex, "Failed to deserialize Notify delivery payload for entry {EntryId}.", entry.Id.ToString()); return null; } if (!Enum.TryParse(channelTypeRaw, ignoreCase: true, out var channelType)) { _logger.LogWarning( "Unknown channel type '{ChannelType}' for delivery {DeliveryId}; acknowledging as poison.", channelTypeRaw, deliveryId); return null; } var attributeView = attributes.Count == 0 ? EmptyReadOnlyDictionary.Instance : new ReadOnlyDictionary(attributes); var enqueuedAt = enqueuedAtUnix is null ? now : DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value); var message = new NotifyDeliveryQueueMessage( delivery, channelId, channelType, _redisOptions.StreamName, traceId, attributeView); var leaseExpires = now.Add(leaseDuration); return new RedisNotifyDeliveryLease( this, entry.Id.ToString(), message, attempt, enqueuedAt, leaseExpires, consumer, idempotency, partitionKey ?? channelId); } private async Task AckPoisonAsync(IDatabase database, RedisValue messageId) { await database.StreamAcknowledgeAsync( _redisOptions.StreamName, _redisOptions.ConsumerGroup, new RedisValue[] { messageId }) .ConfigureAwait(false); await database.StreamDeleteAsync( _redisOptions.StreamName, new RedisValue[] { messageId }) .ConfigureAwait(false); } private static async Task AddToStreamAsync( IDatabase database, string stream, IReadOnlyList entries) { return await database.StreamAddAsync( stream, entries.ToArray()) .ConfigureAwait(false); } private string BuildIdempotencyKey(string token) => string.Concat(_redisOptions.IdempotencyKeyPrefix, token); private TimeSpan CalculateBackoff(int attempt) { var initial = _options.RetryInitialBackoff > TimeSpan.Zero ? _options.RetryInitialBackoff : TimeSpan.FromSeconds(1); if (initial <= TimeSpan.Zero) { return TimeSpan.Zero; } if (attempt <= 1) { return initial; } var max = _options.RetryMaxBackoff > TimeSpan.Zero ? _options.RetryMaxBackoff : initial; var exponent = attempt - 1; var scaledTicks = initial.Ticks * Math.Pow(2, exponent - 1); var cappedTicks = Math.Min(max.Ticks, scaledTicks); var resultTicks = Math.Max(initial.Ticks, (long)cappedTicks); return TimeSpan.FromTicks(resultTicks); } }