using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using StellaOps.Notify.Models;

namespace StellaOps.Notify.Queue.Nats;

/// <summary>
/// NATS JetStream implementation of <see cref="INotifyDeliveryQueue"/>.
/// Deliveries are published to the configured subject/stream, leased through a durable pull consumer,
/// retried via NAK with an exponential backoff, and copied to the dead-letter stream once retries run out.
/// The connection, JetStream context, streams and consumer are all created lazily on first use.
/// </summary>
internal sealed class NatsNotifyDeliveryQueue : INotifyDeliveryQueue, IAsyncDisposable
{
    private const string TransportName = "nats";

    // Payloads are already UTF-8 encoded canonical JSON, so they are sent/received with the raw byte serializer.
    private static readonly INatsSerializer<byte[]> PayloadSerializer = NatsRawSerializer<byte[]>.Default;

    private readonly NotifyDeliveryQueueOptions _queueOptions;
    private readonly NotifyNatsDeliveryQueueOptions _options;
    private readonly ILogger _logger;
    private readonly TimeProvider _timeProvider;

    // Guards lazy creation of _connection, _jsContext and _consumer. SemaphoreSlim is not reentrant,
    // so no method may call another gate-taking method while holding it.
    private readonly SemaphoreSlim _connectionGate = new(1, 1);
    private readonly Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>> _connectionFactory;

    private NatsConnection? _connection;
    private NatsJSContext? _jsContext;
    private INatsJSConsumer? _consumer;
    private bool _disposed;

    /// <summary>
    /// Creates the queue. Only validates configuration; no connection is opened here.
    /// </summary>
    /// <param name="queueOptions">Transport-agnostic queue settings (lease duration, retry backoff, max attempts).</param>
    /// <param name="options">NATS-specific settings (URL, stream/subject names, consumer tuning).</param>
    /// <param name="logger">Logger for queue diagnostics.</param>
    /// <param name="timeProvider">Clock for lease expiry and enqueue timestamps; <see cref="TimeProvider.System"/> when null.</param>
    /// <param name="connectionFactory">Optional factory for the underlying connection; defaults to <c>new NatsConnection(opts)</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queueOptions"/>, <paramref name="options"/> or <paramref name="logger"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The NATS URL, stream or subject is not configured.</exception>
    public NatsNotifyDeliveryQueue(
        NotifyDeliveryQueueOptions queueOptions,
        NotifyNatsDeliveryQueueOptions options,
        ILogger logger,
        TimeProvider timeProvider,
        Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>>? connectionFactory = null)
    {
        _queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _connectionFactory = connectionFactory ?? ((opts, token) => new ValueTask<NatsConnection>(new NatsConnection(opts)));

        if (string.IsNullOrWhiteSpace(_options.Url))
        {
            throw new InvalidOperationException("NATS connection URL must be configured for the Notify delivery queue.");
        }

        if (string.IsNullOrWhiteSpace(_options.Stream) || string.IsNullOrWhiteSpace(_options.Subject))
        {
            throw new InvalidOperationException("NATS stream and subject must be configured for the Notify delivery queue.");
        }
    }

    /// <summary>
    /// Publishes a delivery to the work-queue subject. The message's idempotency key is sent as the
    /// JetStream message id, and a server-reported duplicate is surfaced in the result instead of failing.
    /// </summary>
    /// <param name="message">The delivery to enqueue.</param>
    /// <param name="cancellationToken">Cancels connection setup and the publish.</param>
    /// <returns>The stream sequence (as a string) and whether JetStream flagged the publish as a duplicate.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public async ValueTask<NotifyQueueEnqueueResult> PublishAsync(
        NotifyDeliveryQueueMessage message,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(message);

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);

        // Ensures the work-queue stream, the dead-letter stream and the durable consumer exist. After the
        // first successful call this returns the cached consumer, so publishes do not pay an extra
        // stream-lookup round-trip each time.
        await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

        var payload = Encoding.UTF8.GetBytes(NotifyCanonicalJsonSerializer.Serialize(message.Delivery));
        var headers = BuildHeaders(message);
        var publishOpts = new NatsJSPubOpts
        {
            MsgId = message.IdempotencyKey,
            RetryAttempts = 0
        };

        var ack = await js.PublishAsync(
                _options.Subject,
                payload,
                PayloadSerializer,
                publishOpts,
                headers,
                cancellationToken)
            .ConfigureAwait(false);

        var sequence = ack.Seq.ToString(CultureInfo.InvariantCulture);

        if (ack.Duplicate)
        {
            NotifyQueueMetrics.RecordDeduplicated(TransportName, _options.Stream);
            _logger.LogDebug(
                "Duplicate Notify delivery enqueue detected for delivery {DeliveryId}.",
                message.Delivery.DeliveryId);

            return new NotifyQueueEnqueueResult(sequence, true);
        }

        NotifyQueueMetrics.RecordEnqueued(TransportName, _options.Stream);
        _logger.LogDebug(
            "Enqueued Notify delivery {DeliveryId} into NATS stream {Stream} (sequence {Sequence}).",
            message.Delivery.DeliveryId,
            ack.Stream,
            ack.Seq);

        return new NotifyQueueEnqueueResult(sequence, false);
    }

    /// <summary>
    /// Fetches up to <c>request.BatchSize</c> messages from the durable consumer, waiting at most
    /// <c>request.LeaseDuration</c>. Messages that cannot be turned into a lease (empty or undecodable
    /// payload, missing or unknown channel headers) are acknowledged so they are not delivered again.
    /// </summary>
    /// <param name="request">Batch size, lease duration and the name of the leasing consumer.</param>
    /// <param name="cancellationToken">Cancels the fetch.</param>
    /// <returns>The leases obtained; possibly empty.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    public async ValueTask<IReadOnlyList<INotifyQueueLease<NotifyDeliveryQueueMessage>>> LeaseAsync(
        NotifyQueueLeaseRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
        var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

        var fetchOpts = new NatsJSFetchOpts
        {
            MaxMsgs = request.BatchSize,
            Expires = request.LeaseDuration,
            IdleHeartbeat = _options.IdleHeartbeat
        };

        var now = _timeProvider.GetUtcNow();
        var leases = new List<INotifyQueueLease<NotifyDeliveryQueueMessage>>(request.BatchSize);

        await foreach (var msg in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
        {
            var lease = CreateLease(msg, request.Consumer, now, request.LeaseDuration);
            if (lease is null)
            {
                // Unusable message: acknowledge it rather than letting it be redelivered indefinitely.
                await msg.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
                continue;
            }

            leases.Add(lease);
        }

        return leases;
    }

    /// <summary>
    /// Fetches messages and leases only those JetStream reports as delivered more than once (redeliveries).
    /// First-time deliveries are NAKed with no delay so they go back to the consumer for a regular lease.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>options.MinIdleTime</c> is only used as the fetch expiry; deciding when a message is
    /// "expired" is left to JetStream redelivery (consumer AckWait) — confirm this matches the claim contract.
    /// </remarks>
    /// <param name="options">Batch size, idle time and the name of the claiming consumer.</param>
    /// <param name="cancellationToken">Cancels the fetch.</param>
    /// <returns>The reclaimed leases, each with the default lease duration; possibly empty.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public async ValueTask<IReadOnlyList<INotifyQueueLease<NotifyDeliveryQueueMessage>>> ClaimExpiredAsync(
        NotifyQueueClaimOptions options,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(options);

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
        var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

        var fetchOpts = new NatsJSFetchOpts
        {
            MaxMsgs = options.BatchSize,
            Expires = options.MinIdleTime,
            IdleHeartbeat = _options.IdleHeartbeat
        };

        var now = _timeProvider.GetUtcNow();
        var leases = new List<INotifyQueueLease<NotifyDeliveryQueueMessage>>(options.BatchSize);

        await foreach (var msg in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
        {
            var deliveries = (int)(msg.Metadata?.NumDelivered ?? 1);
            if (deliveries <= 1)
            {
                // Not a redelivery: hand it straight back instead of claiming it.
                await msg.NakAsync(new AckOpts(), TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
                continue;
            }

            var lease = CreateLease(msg, options.ClaimantConsumer, now, _queueOptions.DefaultLeaseDuration);
            if (lease is null)
            {
                await msg.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
                continue;
            }

            leases.Add(lease);
        }

        return leases;
    }

    /// <summary>
    /// Disposes the NATS connection (if one was created) and the connection gate. Subsequent calls are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (_connection is not null)
        {
            await _connection.DisposeAsync().ConfigureAwait(false);
        }

        _connectionGate.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Acknowledges a leased message. Does nothing if the lease was already completed.
    /// </summary>
    internal async Task AcknowledgeAsync(
        NatsNotifyDeliveryLease lease,
        CancellationToken cancellationToken)
    {
        if (!lease.TryBeginCompletion())
        {
            return;
        }

        await lease.RawMessage.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
        NotifyQueueMetrics.RecordAck(TransportName, _options.Stream);

        _logger.LogDebug(
            "Acknowledged Notify delivery {DeliveryId} (sequence {Sequence}).",
            lease.Message.Delivery.DeliveryId,
            lease.MessageId);
    }

    /// <summary>
    /// Sends an in-progress acknowledgement to JetStream for the leased message and moves the lease's
    /// local expiry to now + <paramref name="leaseDuration"/>.
    /// </summary>
    internal async Task RenewLeaseAsync(
        NatsNotifyDeliveryLease lease,
        TimeSpan leaseDuration,
        CancellationToken cancellationToken)
    {
        await lease.RawMessage.AckProgressAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);

        var expires = _timeProvider.GetUtcNow().Add(leaseDuration);
        lease.RefreshLease(expires);

        _logger.LogDebug(
            "Renewed NATS lease for Notify delivery {DeliveryId} until {Expires:u}.",
            lease.Message.Delivery.DeliveryId,
            expires);
    }

    /// <summary>
    /// Releases a lease. For <see cref="NotifyQueueReleaseDisposition.Retry"/>, the message is dead-lettered
    /// once <c>MaxDeliveryAttempts</c> is reached, otherwise NAKed with a backoff delay. Any other disposition
    /// terminates the message (abandon). Does nothing if the lease was already completed.
    /// </summary>
    internal async Task ReleaseAsync(
        NatsNotifyDeliveryLease lease,
        NotifyQueueReleaseDisposition disposition,
        CancellationToken cancellationToken)
    {
        if (disposition == NotifyQueueReleaseDisposition.Retry
            && lease.Attempt >= _queueOptions.MaxDeliveryAttempts)
        {
            _logger.LogWarning(
                "Notify delivery {DeliveryId} reached max delivery attempts ({Attempts}); moving to dead-letter stream.",
                lease.Message.Delivery.DeliveryId,
                lease.Attempt);

            await DeadLetterAsync(
                    lease,
                    $"max-delivery-attempts:{lease.Attempt}",
                    cancellationToken)
                .ConfigureAwait(false);

            return;
        }

        if (!lease.TryBeginCompletion())
        {
            return;
        }

        if (disposition == NotifyQueueReleaseDisposition.Retry)
        {
            var delay = CalculateBackoff(lease.Attempt);
            await lease.RawMessage.NakAsync(new AckOpts(), delay, cancellationToken).ConfigureAwait(false);

            NotifyQueueMetrics.RecordRetry(TransportName, _options.Stream);
            _logger.LogInformation(
                "Scheduled Notify delivery {DeliveryId} for retry with delay {Delay} (attempt {Attempt}).",
                lease.Message.Delivery.DeliveryId,
                delay,
                lease.Attempt);
        }
        else
        {
            await lease.RawMessage.AckTerminateAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);

            NotifyQueueMetrics.RecordAck(TransportName, _options.Stream);
            _logger.LogInformation(
                "Abandoned Notify delivery {DeliveryId} after {Attempt} attempt(s).",
                lease.Message.Delivery.DeliveryId,
                lease.Attempt);
        }
    }

    /// <summary>
    /// Copies the leased delivery to the dead-letter subject (with a <c>deadletter-reason</c> header) and
    /// then terminates the original message. Does nothing if the lease was already completed.
    /// </summary>
    /// <param name="lease">The lease to dead-letter.</param>
    /// <param name="reason">Free-form reason recorded in the dead-letter headers and log.</param>
    /// <param name="cancellationToken">Cancels the publish and the terminate acknowledgement.</param>
    internal async Task DeadLetterAsync(
        NatsNotifyDeliveryLease lease,
        string reason,
        CancellationToken cancellationToken)
    {
        if (!lease.TryBeginCompletion())
        {
            return;
        }

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
        await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);

        var payload = Encoding.UTF8.GetBytes(NotifyCanonicalJsonSerializer.Serialize(lease.Message.Delivery));
        var headers = BuildDeadLetterHeaders(lease, reason);

        // Publish the dead-letter copy BEFORE terminating the original: if this publish fails, the original
        // is left un-acknowledged in the work-queue stream instead of being terminated and lost.
        await js.PublishAsync(
                _options.DeadLetterSubject,
                payload,
                PayloadSerializer,
                new NatsJSPubOpts(),
                headers,
                cancellationToken)
            .ConfigureAwait(false);

        await lease.RawMessage.AckTerminateAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);

        NotifyQueueMetrics.RecordDeadLetter(TransportName, _options.DeadLetterStream);
        _logger.LogError(
            "Dead-lettered Notify delivery {DeliveryId} (attempt {Attempt}): {Reason}",
            lease.Message.Delivery.DeliveryId,
            lease.Attempt,
            reason);
    }

    /// <summary>
    /// Ensures a connection exists and round-trips a PING to the server.
    /// </summary>
    internal async Task PingAsync(CancellationToken cancellationToken)
    {
        var connection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);
        await connection.PingAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Returns the cached JetStream context, creating it over the shared connection on first use.
    /// </summary>
    private async Task<NatsJSContext> GetJetStreamAsync(CancellationToken cancellationToken)
    {
        if (_jsContext is not null)
        {
            return _jsContext;
        }

        // EnsureConnectionAsync takes _connectionGate itself, so it must complete before we acquire the gate.
        var connection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);

        await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            _jsContext ??= new NatsJSContext(connection);
            return _jsContext;
        }
        finally
        {
            _connectionGate.Release();
        }
    }

    /// <summary>
    /// Returns the cached durable consumer. On first use, ensures the work-queue and dead-letter streams
    /// exist and creates the consumer, falling back to fetching an existing durable consumer if creation
    /// is rejected by the JetStream API.
    /// </summary>
    private async ValueTask<INatsJSConsumer> EnsureStreamAndConsumerAsync(
        NatsJSContext js,
        CancellationToken cancellationToken)
    {
        if (_consumer is not null)
        {
            return _consumer;
        }

        await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            if (_consumer is not null)
            {
                return _consumer;
            }

            await EnsureStreamAsync(js, cancellationToken).ConfigureAwait(false);
            await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);

            var consumerConfig = new ConsumerConfig
            {
                DurableName = _options.DurableConsumer,
                AckPolicy = ConsumerConfigAckPolicy.Explicit,
                ReplayPolicy = ConsumerConfigReplayPolicy.Instant,
                DeliverPolicy = ConsumerConfigDeliverPolicy.All,
                AckWait = ToNanoseconds(_options.AckWait),
                MaxAckPending = _options.MaxAckPending,
                MaxDeliver = Math.Max(1, _queueOptions.MaxDeliveryAttempts),
                FilterSubjects = new[] { _options.Subject }
            };

            try
            {
                _consumer = await js.CreateConsumerAsync(
                        _options.Stream,
                        consumerConfig,
                        cancellationToken)
                    .ConfigureAwait(false);
            }
            catch (NatsJSApiException apiEx)
            {
                _logger.LogDebug(
                    apiEx,
                    "CreateConsumerAsync failed with code {Code}; attempting to fetch existing durable consumer {Durable}.",
                    apiEx.Error?.Code,
                    _options.DurableConsumer);

                _consumer = await js.GetConsumerAsync(
                        _options.Stream,
                        _options.DurableConsumer,
                        cancellationToken)
                    .ConfigureAwait(false);
            }

            return _consumer;
        }
        finally
        {
            _connectionGate.Release();
        }
    }

    /// <summary>
    /// Returns the shared connection, creating and connecting it on first use. A connection whose
    /// <c>ConnectAsync</c> fails is disposed and not cached, so the next call starts over.
    /// </summary>
    private async Task<NatsConnection> EnsureConnectionAsync(CancellationToken cancellationToken)
    {
        if (_connection is not null)
        {
            return _connection;
        }

        await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            if (_connection is not null)
            {
                return _connection;
            }

            var opts = new NatsOpts
            {
                Url = _options.Url!,
                Name = "stellaops-notify-delivery",
                CommandTimeout = TimeSpan.FromSeconds(10),
                RequestTimeout = TimeSpan.FromSeconds(20),
                PingInterval = TimeSpan.FromSeconds(30)
            };

            var connection = await _connectionFactory(opts, cancellationToken).ConfigureAwait(false);
            try
            {
                await connection.ConnectAsync().ConfigureAwait(false);
            }
            catch
            {
                // Never cache (or leak) a connection that failed to connect.
                await connection.DisposeAsync().ConfigureAwait(false);
                throw;
            }

            _connection = connection;
            return connection;
        }
        finally
        {
            _connectionGate.Release();
        }
    }

    /// <summary>
    /// Creates the work-queue stream if the JetStream API reports it as missing (404).
    /// </summary>
    private async Task EnsureStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
    {
        try
        {
            await js.GetStreamAsync(_options.Stream, cancellationToken: cancellationToken).ConfigureAwait(false);
        }
        catch (NatsJSApiException ex) when (ex.Error?.Code == 404)
        {
            var config = CreateWorkQueueStreamConfig(_options.Stream, _options.Subject);
            await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
            _logger.LogInformation("Created NATS Notify delivery stream {Stream} ({Subject}).", _options.Stream, _options.Subject);
        }
    }

    /// <summary>
    /// Creates the dead-letter stream if the JetStream API reports it as missing (404).
    /// </summary>
    private async Task EnsureDeadLetterStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
    {
        try
        {
            await js.GetStreamAsync(_options.DeadLetterStream, cancellationToken: cancellationToken).ConfigureAwait(false);
        }
        catch (NatsJSApiException ex) when (ex.Error?.Code == 404)
        {
            var config = CreateWorkQueueStreamConfig(_options.DeadLetterStream, _options.DeadLetterSubject);
            await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
            _logger.LogInformation(
                "Created NATS Notify delivery dead-letter stream {Stream} ({Subject}).",
                _options.DeadLetterStream,
                _options.DeadLetterSubject);
        }
    }

    /// <summary>
    /// Builds the file-backed, work-queue-retention, unlimited-size stream configuration used for both
    /// the delivery stream and the dead-letter stream.
    /// </summary>
    private static StreamConfig CreateWorkQueueStreamConfig(string name, string subject) =>
        new StreamConfig(name: name, subjects: new[] { subject })
        {
            Retention = StreamConfigRetention.Workqueue,
            Storage = StreamConfigStorage.File,
            MaxConsumers = -1,
            MaxMsgs = -1,
            MaxBytes = -1
        };

    /// <summary>
    /// Turns a fetched JetStream message into a lease, or returns <c>null</c> when the payload is empty
    /// or undecodable, or the channel headers are missing or invalid (each logged as a warning).
    /// </summary>
    /// <remarks>
    /// The attempt number is the larger of the <c>Attempt</c> header and JetStream's delivery count.
    /// Header values take precedence over payload values for the delivery id and idempotency key.
    /// </remarks>
    private NatsNotifyDeliveryLease? CreateLease(
        NatsJSMsg<byte[]> message,
        string consumer,
        DateTimeOffset now,
        TimeSpan leaseDuration)
    {
        var payloadBytes = message.Data ?? Array.Empty<byte>();
        if (payloadBytes.Length == 0)
        {
            return null;
        }

        NotifyDelivery delivery;
        try
        {
            var json = Encoding.UTF8.GetString(payloadBytes);
            delivery = NotifyCanonicalJsonSerializer.Deserialize<NotifyDelivery>(json);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(
                ex,
                "Failed to deserialize Notify delivery payload for NATS message {Sequence}.",
                message.Metadata?.Sequence.Stream);
            return null;
        }

        var headers = message.Headers ?? new NatsHeaders();

        var deliveryId = TryGetHeader(headers, NotifyQueueFields.DeliveryId) ?? delivery.DeliveryId;
        var channelId = TryGetHeader(headers, NotifyQueueFields.ChannelId);
        var channelTypeRaw = TryGetHeader(headers, NotifyQueueFields.ChannelType);

        if (channelId is null || channelTypeRaw is null)
        {
            // Previously returned silently; the caller acknowledges the message, so record why.
            _logger.LogWarning(
                "Notify delivery {DeliveryId} is missing channel headers on NATS message {Sequence}; skipping it.",
                deliveryId,
                message.Metadata?.Sequence.Stream);
            return null;
        }

        if (!Enum.TryParse<NotifyChannelType>(channelTypeRaw, ignoreCase: true, out var channelType))
        {
            _logger.LogWarning("Unknown channel type '{ChannelType}' for delivery {DeliveryId}.", channelTypeRaw, deliveryId);
            return null;
        }

        var traceId = TryGetHeader(headers, NotifyQueueFields.TraceId);
        var idempotencyKey = TryGetHeader(headers, NotifyQueueFields.IdempotencyKey) ?? delivery.DeliveryId;

        // Enqueue time is stored as Unix milliseconds; fall back to "now" if absent or malformed.
        var enqueuedAt = TryGetHeader(headers, NotifyQueueFields.EnqueuedAt) is { } enqueuedRaw
            && long.TryParse(enqueuedRaw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var unix)
                ? DateTimeOffset.FromUnixTimeMilliseconds(unix)
                : now;

        var attempt = TryGetHeader(headers, NotifyQueueFields.Attempt) is { } attemptRaw
            && int.TryParse(attemptRaw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsedAttempt)
                ? parsedAttempt
                : 1;

        // JetStream's redelivery count is authoritative when it is higher than the header value.
        if (message.Metadata?.NumDelivered is ulong delivered && delivered > 0)
        {
            var deliveredInt = delivered > int.MaxValue ? int.MaxValue : (int)delivered;
            if (deliveredInt > attempt)
            {
                attempt = deliveredInt;
            }
        }

        var attributes = ExtractAttributes(headers);
        var leaseExpires = now.Add(leaseDuration);
        var messageId = message.Metadata?.Sequence.Stream.ToString() ?? Guid.NewGuid().ToString("n");

        var queueMessage = new NotifyDeliveryQueueMessage(
            delivery,
            channelId,
            channelType,
            _options.Subject,
            traceId,
            attributes);

        return new NatsNotifyDeliveryLease(
            this,
            message,
            messageId,
            queueMessage,
            attempt,
            consumer,
            enqueuedAt,
            leaseExpires,
            idempotencyKey);
    }

    /// <summary>
    /// Builds the headers for a newly published delivery (attempt "1", enqueue time in Unix milliseconds,
    /// optional trace id, and each attribute under <c>NotifyQueueFields.AttributePrefix</c>).
    /// </summary>
    private NatsHeaders BuildHeaders(NotifyDeliveryQueueMessage message)
    {
        var headers = new NatsHeaders
        {
            { NotifyQueueFields.DeliveryId, message.Delivery.DeliveryId },
            { NotifyQueueFields.ChannelId, message.ChannelId },
            { NotifyQueueFields.ChannelType, message.ChannelType.ToString() },
            { NotifyQueueFields.Tenant, message.Delivery.TenantId },
            { NotifyQueueFields.Attempt, "1" },
            { NotifyQueueFields.EnqueuedAt, _timeProvider.GetUtcNow().ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture) },
            { NotifyQueueFields.IdempotencyKey, message.IdempotencyKey },
            { NotifyQueueFields.PartitionKey, message.PartitionKey }
        };

        if (!string.IsNullOrWhiteSpace(message.TraceId))
        {
            headers.Add(NotifyQueueFields.TraceId, message.TraceId!);
        }

        foreach (var kvp in message.Attributes)
        {
            headers.Add(NotifyQueueFields.AttributePrefix + kvp.Key, kvp.Value);
        }

        return headers;
    }

    /// <summary>
    /// Builds the headers for a dead-letter copy: the lease's identity fields, its current attempt,
    /// a <c>deadletter-reason</c> header, optional trace id, and the original attributes.
    /// </summary>
    private NatsHeaders BuildDeadLetterHeaders(NatsNotifyDeliveryLease lease, string reason)
    {
        var headers = new NatsHeaders
        {
            { NotifyQueueFields.DeliveryId, lease.Message.Delivery.DeliveryId },
            { NotifyQueueFields.ChannelId, lease.Message.ChannelId },
            { NotifyQueueFields.ChannelType, lease.Message.ChannelType.ToString() },
            { NotifyQueueFields.Tenant, lease.Message.Delivery.TenantId },
            { NotifyQueueFields.Attempt, lease.Attempt.ToString(CultureInfo.InvariantCulture) },
            { NotifyQueueFields.IdempotencyKey, lease.Message.IdempotencyKey },
            { "deadletter-reason", reason }
        };

        if (!string.IsNullOrWhiteSpace(lease.Message.TraceId))
        {
            headers.Add(NotifyQueueFields.TraceId, lease.Message.TraceId!);
        }

        foreach (var kvp in lease.Message.Attributes)
        {
            headers.Add(NotifyQueueFields.AttributePrefix + kvp.Key, kvp.Value);
        }

        return headers;
    }

    /// <summary>
    /// Returns the first value of header <paramref name="key"/>, or <c>null</c> if it is absent,
    /// empty, or whitespace.
    /// </summary>
    private static string? TryGetHeader(NatsHeaders headers, string key)
    {
        if (headers.TryGetValue(key, out var values) && values.Count > 0)
        {
            var value = values[0];
            return string.IsNullOrWhiteSpace(value) ? null : value;
        }

        return null;
    }

    /// <summary>
    /// Collects headers that start with <c>NotifyQueueFields.AttributePrefix</c> into a read-only map keyed
    /// by the name with the prefix removed (first value only). Returns a shared empty map when there are none.
    /// </summary>
    private static IReadOnlyDictionary<string, string> ExtractAttributes(NatsHeaders headers)
    {
        var attributes = new Dictionary<string, string>(StringComparer.Ordinal);

        foreach (var key in headers.Keys)
        {
            if (!key.StartsWith(NotifyQueueFields.AttributePrefix, StringComparison.Ordinal))
            {
                continue;
            }

            if (headers.TryGetValue(key, out var values) && values.Count > 0)
            {
                attributes[key[NotifyQueueFields.AttributePrefix.Length..]] = values[0]!;
            }
        }

        return attributes.Count == 0
            ? EmptyReadOnlyDictionary<string, string>.Instance
            : new ReadOnlyDictionary<string, string>(attributes);
    }

    /// <summary>
    /// Computes the NAK delay for a retry. The base is <c>RetryInitialBackoff</c> (or the NATS
    /// <c>RetryDelay</c> when that is not positive); zero if neither is positive. Attempts 1 and 2 wait the
    /// base; attempt n &gt; 2 waits base * 2^(n-2), capped at <c>RetryMaxBackoff</c> (or the base when that
    /// is not positive), and never less than the base.
    /// </summary>
    private TimeSpan CalculateBackoff(int attempt)
    {
        var initial = _queueOptions.RetryInitialBackoff > TimeSpan.Zero
            ? _queueOptions.RetryInitialBackoff
            : _options.RetryDelay;

        if (initial <= TimeSpan.Zero)
        {
            return TimeSpan.Zero;
        }

        if (attempt <= 1)
        {
            return initial;
        }

        var max = _queueOptions.RetryMaxBackoff > TimeSpan.Zero
            ? _queueOptions.RetryMaxBackoff
            : initial;

        // NOTE(review): the exponent is attempt - 2, so attempt 2 waits the same as attempt 1. Kept to
        // preserve existing retry timing — confirm this is intended.
        var scaledTicks = initial.Ticks * Math.Pow(2, attempt - 2);

        // Cap in floating point and return max.Ticks exactly: for large attempts the scaled value exceeds
        // the long range, where an unchecked double -> long conversion does not saturate.
        var cappedTicks = scaledTicks >= max.Ticks ? max.Ticks : (long)scaledTicks;

        return TimeSpan.FromTicks(Math.Max(initial.Ticks, cappedTicks));
    }

    /// <summary>
    /// Converts a duration to nanoseconds for the consumer configuration (one tick is 100 ns).
    /// Non-positive durations map to 0.
    /// </summary>
    private static long ToNanoseconds(TimeSpan value)
        => value <= TimeSpan.Zero ? 0 : value.Ticks * 100L;

    /// <summary>
    /// Shared, empty, read-only dictionary instance per key/value type pair.
    /// </summary>
    private static class EmptyReadOnlyDictionary<TKey, TValue>
        where TKey : notnull
    {
        public static readonly IReadOnlyDictionary<TKey, TValue> Instance =
            new ReadOnlyDictionary<TKey, TValue>(new Dictionary<TKey, TValue>(0, EqualityComparer<TKey>.Default));
    }
}