using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Globalization; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using StackExchange.Redis; using StellaOps.Notify.Models; namespace StellaOps.Notify.Queue.Redis; internal sealed class RedisNotifyEventQueue : INotifyEventQueue, IAsyncDisposable { private const string TransportName = "redis"; private readonly NotifyEventQueueOptions _options; private readonly NotifyRedisEventQueueOptions _redisOptions; private readonly ILogger _logger; private readonly TimeProvider _timeProvider; private readonly Func> _connectionFactory; private readonly SemaphoreSlim _connectionLock = new(1, 1); private readonly SemaphoreSlim _groupInitLock = new(1, 1); private readonly IReadOnlyDictionary _streamsByName; private readonly ConcurrentDictionary _initializedStreams = new(StringComparer.Ordinal); private IConnectionMultiplexer? _connection; private bool _disposed; public RedisNotifyEventQueue( NotifyEventQueueOptions options, NotifyRedisEventQueueOptions redisOptions, ILogger logger, TimeProvider timeProvider, Func>? connectionFactory = null) { _options = options ?? throw new ArgumentNullException(nameof(options)); _redisOptions = redisOptions ?? throw new ArgumentNullException(nameof(redisOptions)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _timeProvider = timeProvider ?? TimeProvider.System; _connectionFactory = connectionFactory ?? (async config => { var connection = await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false); return (IConnectionMultiplexer)connection; }); if (string.IsNullOrWhiteSpace(_redisOptions.ConnectionString)) { throw new InvalidOperationException("Redis connection string must be configured for Notify event queue."); } _streamsByName = _redisOptions.Streams.ToDictionary( stream => stream.Stream, stream => stream, StringComparer.Ordinal); } public async ValueTask PublishAsync( NotifyQueueEventMessage message, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(message); cancellationToken.ThrowIfCancellationRequested(); var streamOptions = GetStreamOptions(message.Stream); var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await EnsureStreamInitializedAsync(db, streamOptions, cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var entries = BuildEntries(message, now, attempt: 1); var messageId = await AddToStreamAsync( db, streamOptions, entries) .ConfigureAwait(false); var idempotencyToken = string.IsNullOrWhiteSpace(message.IdempotencyKey) ? message.Event.EventId.ToString("N") : message.IdempotencyKey; var idempotencyKey = streamOptions.IdempotencyKeyPrefix + idempotencyToken; var stored = await db.StringSetAsync( idempotencyKey, messageId, when: When.NotExists, expiry: _redisOptions.IdempotencyWindow) .ConfigureAwait(false); if (!stored) { await db.StreamDeleteAsync( streamOptions.Stream, new RedisValue[] { messageId }) .ConfigureAwait(false); var existing = await db.StringGetAsync(idempotencyKey).ConfigureAwait(false); var duplicateId = existing.IsNullOrEmpty ? messageId : existing; _logger.LogDebug( "Duplicate Notify event enqueue detected for idempotency token {Token}; returning existing stream id {StreamId}.", idempotencyToken, duplicateId.ToString()); NotifyQueueMetrics.RecordDeduplicated(TransportName, streamOptions.Stream); return new NotifyQueueEnqueueResult(duplicateId.ToString()!, true); } NotifyQueueMetrics.RecordEnqueued(TransportName, streamOptions.Stream); _logger.LogDebug( "Enqueued Notify event {EventId} for tenant {Tenant} on stream {Stream} (id {StreamId}).", message.Event.EventId, message.TenantId, streamOptions.Stream, messageId.ToString()); return new NotifyQueueEnqueueResult(messageId.ToString()!, false); } public async ValueTask>> LeaseAsync( NotifyQueueLeaseRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); cancellationToken.ThrowIfCancellationRequested(); var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var leases = new List>(request.BatchSize); foreach (var streamOptions in _streamsByName.Values) { await EnsureStreamInitializedAsync(db, streamOptions, cancellationToken).ConfigureAwait(false); var remaining = request.BatchSize - leases.Count; if (remaining <= 0) { break; } var entries = await db.StreamReadGroupAsync( streamOptions.Stream, streamOptions.ConsumerGroup, request.Consumer, StreamPosition.NewMessages, remaining) .ConfigureAwait(false); if (entries is null || entries.Length == 0) { continue; } foreach (var entry in entries) { var lease = TryMapLease( streamOptions, entry, request.Consumer, now, request.LeaseDuration, attemptOverride: null); if (lease is null) { await AckPoisonAsync(db, streamOptions, entry.Id).ConfigureAwait(false); continue; } leases.Add(lease); if (leases.Count >= request.BatchSize) { break; } } } return leases; } public async ValueTask>> ClaimExpiredAsync( NotifyQueueClaimOptions options, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(options); cancellationToken.ThrowIfCancellationRequested(); var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var leases = new List>(options.BatchSize); foreach (var streamOptions in _streamsByName.Values) { await EnsureStreamInitializedAsync(db, streamOptions, cancellationToken).ConfigureAwait(false); var pending = await db.StreamPendingMessagesAsync( streamOptions.Stream, streamOptions.ConsumerGroup, options.BatchSize, RedisValue.Null, (long)options.MinIdleTime.TotalMilliseconds) .ConfigureAwait(false); if (pending is null || pending.Length == 0) { continue; } var eligible = pending .Where(p => p.IdleTimeInMilliseconds >= options.MinIdleTime.TotalMilliseconds) .ToArray(); if (eligible.Length == 0) { continue; } var messageIds = eligible .Select(static p => (RedisValue)p.MessageId) .ToArray(); var entries = await db.StreamClaimAsync( streamOptions.Stream, streamOptions.ConsumerGroup, options.ClaimantConsumer, 0, messageIds) .ConfigureAwait(false); if (entries is null || entries.Length == 0) { continue; } var attemptById = eligible .Where(static info => !info.MessageId.IsNullOrEmpty) .ToDictionary( info => info.MessageId!.ToString(), info => (int)Math.Max(1, info.DeliveryCount), StringComparer.Ordinal); foreach (var entry in entries) { var entryId = entry.Id.ToString(); attemptById.TryGetValue(entryId, out var attempt); var lease = TryMapLease( streamOptions, entry, options.ClaimantConsumer, now, _options.DefaultLeaseDuration, attempt == 0 ? null : attempt); if (lease is null) { await AckPoisonAsync(db, streamOptions, entry.Id).ConfigureAwait(false); continue; } leases.Add(lease); if (leases.Count >= options.BatchSize) { return leases; } } } return leases; } public async ValueTask DisposeAsync() { if (_disposed) { return; } _disposed = true; if (_connection is not null) { await _connection.CloseAsync(); _connection.Dispose(); } _connectionLock.Dispose(); _groupInitLock.Dispose(); GC.SuppressFinalize(this); } internal async Task AcknowledgeAsync( RedisNotifyEventLease lease, CancellationToken cancellationToken) { if (!lease.TryBeginCompletion()) { return; } var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); var streamOptions = lease.StreamOptions; await db.StreamAcknowledgeAsync( streamOptions.Stream, streamOptions.ConsumerGroup, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); await db.StreamDeleteAsync( streamOptions.Stream, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); NotifyQueueMetrics.RecordAck(TransportName, streamOptions.Stream); _logger.LogDebug( "Acknowledged Notify event {EventId} on consumer {Consumer} (stream {Stream}, id {MessageId}).", lease.Message.Event.EventId, lease.Consumer, streamOptions.Stream, lease.MessageId); } internal async Task RenewLeaseAsync( RedisNotifyEventLease lease, TimeSpan leaseDuration, CancellationToken cancellationToken) { var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); var streamOptions = lease.StreamOptions; await db.StreamClaimAsync( streamOptions.Stream, streamOptions.ConsumerGroup, lease.Consumer, 0, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); var expires = _timeProvider.GetUtcNow().Add(leaseDuration); lease.RefreshLease(expires); _logger.LogDebug( "Renewed Notify event lease for {EventId} until {Expires:u}.", lease.Message.Event.EventId, expires); } internal Task ReleaseAsync( RedisNotifyEventLease lease, NotifyQueueReleaseDisposition disposition, CancellationToken cancellationToken) => Task.FromException(new NotSupportedException("Retry/abandon is not supported for Notify event streams.")); internal async Task DeadLetterAsync( RedisNotifyEventLease lease, string reason, CancellationToken cancellationToken) { if (!lease.TryBeginCompletion()) { return; } var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); var streamOptions = lease.StreamOptions; await db.StreamAcknowledgeAsync( streamOptions.Stream, streamOptions.ConsumerGroup, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); await db.StreamDeleteAsync( streamOptions.Stream, new RedisValue[] { lease.MessageId }) .ConfigureAwait(false); _logger.LogWarning( "Dead-lettered Notify event {EventId} on stream {Stream} with reason '{Reason}'.", lease.Message.Event.EventId, streamOptions.Stream, reason); } internal async ValueTask PingAsync(CancellationToken cancellationToken) { var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false); _ = await db.PingAsync().ConfigureAwait(false); } private NotifyRedisEventStreamOptions GetStreamOptions(string stream) { if (!_streamsByName.TryGetValue(stream, out var options)) { throw new InvalidOperationException($"Stream '{stream}' is not configured for the Notify event queue."); } return options; } private async Task GetDatabaseAsync(CancellationToken cancellationToken) { if (_connection is { IsConnected: true }) { return _connection.GetDatabase(_redisOptions.Database ?? -1); } await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (_connection is { IsConnected: true }) { return _connection.GetDatabase(_redisOptions.Database ?? -1); } var configuration = ConfigurationOptions.Parse(_redisOptions.ConnectionString!); configuration.AbortOnConnectFail = false; if (_redisOptions.Database.HasValue) { configuration.DefaultDatabase = _redisOptions.Database; } using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); timeoutCts.CancelAfter(_redisOptions.InitializationTimeout); _connection = await _connectionFactory(configuration).WaitAsync(timeoutCts.Token).ConfigureAwait(false); return _connection.GetDatabase(_redisOptions.Database ?? -1); } finally { _connectionLock.Release(); } } private async Task EnsureStreamInitializedAsync( IDatabase database, NotifyRedisEventStreamOptions streamOptions, CancellationToken cancellationToken) { if (_initializedStreams.ContainsKey(streamOptions.Stream)) { return; } await _groupInitLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (_initializedStreams.ContainsKey(streamOptions.Stream)) { return; } try { await database.StreamCreateConsumerGroupAsync( streamOptions.Stream, streamOptions.ConsumerGroup, StreamPosition.Beginning, createStream: true) .ConfigureAwait(false); } catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase)) { // Consumer group already exists — nothing to do. } _initializedStreams[streamOptions.Stream] = true; } finally { _groupInitLock.Release(); } } private static async Task AddToStreamAsync( IDatabase database, NotifyRedisEventStreamOptions streamOptions, IReadOnlyList entries) { return await database.StreamAddAsync( streamOptions.Stream, entries.ToArray(), maxLength: streamOptions.ApproximateMaxLength, useApproximateMaxLength: streamOptions.ApproximateMaxLength is not null) .ConfigureAwait(false); } private IReadOnlyList BuildEntries( NotifyQueueEventMessage message, DateTimeOffset enqueuedAt, int attempt) { var payload = NotifyCanonicalJsonSerializer.Serialize(message.Event); var entries = new List(8 + message.Attributes.Count) { new(NotifyQueueFields.Payload, payload), new(NotifyQueueFields.EventId, message.Event.EventId.ToString("D")), new(NotifyQueueFields.Tenant, message.TenantId), new(NotifyQueueFields.Kind, message.Event.Kind), new(NotifyQueueFields.Attempt, attempt), new(NotifyQueueFields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds()), new(NotifyQueueFields.IdempotencyKey, message.IdempotencyKey), new(NotifyQueueFields.PartitionKey, message.PartitionKey ?? string.Empty), new(NotifyQueueFields.TraceId, message.TraceId ?? string.Empty) }; foreach (var kvp in message.Attributes) { entries.Add(new NameValueEntry( NotifyQueueFields.AttributePrefix + kvp.Key, kvp.Value)); } return entries; } private RedisNotifyEventLease? TryMapLease( NotifyRedisEventStreamOptions streamOptions, StreamEntry entry, string consumer, DateTimeOffset now, TimeSpan leaseDuration, int? attemptOverride) { if (entry.Values is null || entry.Values.Length == 0) { return null; } string? payloadJson = null; string? eventIdRaw = null; long? enqueuedAtUnix = null; string? idempotency = null; string? partitionKey = null; string? traceId = null; var attempt = attemptOverride ?? 1; var attributes = new Dictionary(StringComparer.Ordinal); foreach (var field in entry.Values) { var name = field.Name.ToString(); var value = field.Value; if (name.Equals(NotifyQueueFields.Payload, StringComparison.Ordinal)) { payloadJson = value.ToString(); } else if (name.Equals(NotifyQueueFields.EventId, StringComparison.Ordinal)) { eventIdRaw = value.ToString(); } else if (name.Equals(NotifyQueueFields.Attempt, StringComparison.Ordinal)) { if (int.TryParse(value.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed)) { attempt = Math.Max(parsed, attempt); } } else if (name.Equals(NotifyQueueFields.EnqueuedAt, StringComparison.Ordinal)) { if (long.TryParse(value.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var unix)) { enqueuedAtUnix = unix; } } else if (name.Equals(NotifyQueueFields.IdempotencyKey, StringComparison.Ordinal)) { var text = value.ToString(); idempotency = string.IsNullOrWhiteSpace(text) ? null : text; } else if (name.Equals(NotifyQueueFields.PartitionKey, StringComparison.Ordinal)) { var text = value.ToString(); partitionKey = string.IsNullOrWhiteSpace(text) ? null : text; } else if (name.Equals(NotifyQueueFields.TraceId, StringComparison.Ordinal)) { var text = value.ToString(); traceId = string.IsNullOrWhiteSpace(text) ? null : text; } else if (name.StartsWith(NotifyQueueFields.AttributePrefix, StringComparison.Ordinal)) { var key = name[NotifyQueueFields.AttributePrefix.Length..]; attributes[key] = value.ToString(); } } if (payloadJson is null || enqueuedAtUnix is null) { return null; } NotifyEvent notifyEvent; try { notifyEvent = NotifyCanonicalJsonSerializer.Deserialize(payloadJson); } catch (Exception ex) { _logger.LogWarning( ex, "Failed to deserialize Notify event payload for stream {Stream} entry {EntryId}.", streamOptions.Stream, entry.Id.ToString()); return null; } var attributeView = attributes.Count == 0 ? EmptyReadOnlyDictionary.Instance : new ReadOnlyDictionary(attributes); var message = new NotifyQueueEventMessage( notifyEvent, streamOptions.Stream, idempotencyKey: idempotency ?? notifyEvent.EventId.ToString("N"), partitionKey: partitionKey, traceId: traceId, attributes: attributeView); var enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value); var leaseExpiresAt = now.Add(leaseDuration); return new RedisNotifyEventLease( this, streamOptions, entry.Id.ToString(), message, attempt, consumer, enqueuedAt, leaseExpiresAt); } private async Task AckPoisonAsync( IDatabase database, NotifyRedisEventStreamOptions streamOptions, RedisValue messageId) { await database.StreamAcknowledgeAsync( streamOptions.Stream, streamOptions.ConsumerGroup, new RedisValue[] { messageId }) .ConfigureAwait(false); await database.StreamDeleteAsync( streamOptions.Stream, new RedisValue[] { messageId }) .ConfigureAwait(false); } }