using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using StellaOps.HybridLogicalClock;

namespace StellaOps.Scheduler.Queue.Nats;

/// <summary>
/// Base class for scheduler queues backed by a NATS JetStream stream and a durable pull consumer.
/// Handles lazy connection/stream/consumer creation, publishing, leasing, acknowledgement,
/// retry with backoff, dead-lettering and an in-process approximate depth metric.
/// </summary>
/// <typeparam name="TMessage">Message type carried by the queue.</typeparam>
internal abstract class NatsSchedulerQueueBase<TMessage> : ISchedulerQueue<TMessage>, IAsyncDisposable, ISchedulerQueueTransportDiagnostics
{
    private const string TransportName = "nats";

    private static readonly INatsSerializer<byte[]> PayloadSerializer = NatsRawSerializer<byte[]>.Default;

    private readonly SchedulerQueueOptions _queueOptions;
    private readonly SchedulerNatsQueueOptions _natsOptions;
    private readonly SchedulerNatsStreamOptions _streamOptions;
    private readonly INatsSchedulerQueuePayload<TMessage> _payload;
    private readonly ILogger _logger;
    private readonly TimeProvider _timeProvider;
    private readonly IHybridLogicalClock? _hlc;

    // Single gate guarding lazy creation of the connection, JetStream context and consumer.
    private readonly SemaphoreSlim _connectionGate = new(1, 1);
    private readonly Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>> _connectionFactory;

    private NatsConnection? _connection;
    private NatsJSContext? _jsContext;
    private INatsJSConsumer? _consumer;
    private bool _disposed;

    // Counter maintained by this process only (enqueue/ack/terminate); not read back from the server.
    private long _approximateDepth;

    /// <summary>
    /// Initializes the queue. No network activity happens here; the connection is opened on first use.
    /// </summary>
    /// <param name="queueOptions">Transport-independent queue options (delivery attempts, backoff, dead-letter switch).</param>
    /// <param name="natsOptions">NATS connection options; <c>Url</c> must be non-blank.</param>
    /// <param name="streamOptions">Stream, subject, durable consumer and dead-letter names.</param>
    /// <param name="payload">Adapter that serializes messages and extracts their identifying fields.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="timeProvider">Clock; falls back to <see cref="TimeProvider.System"/> when null.</param>
    /// <param name="hlc">Optional hybrid logical clock; when supplied a timestamp header is attached on enqueue.</param>
    /// <param name="connectionFactory">Optional factory for the NATS connection; defaults to <c>new NatsConnection(opts)</c>.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    /// <exception cref="InvalidOperationException">The NATS URL is null or whitespace.</exception>
    protected NatsSchedulerQueueBase(
        SchedulerQueueOptions queueOptions,
        SchedulerNatsQueueOptions natsOptions,
        SchedulerNatsStreamOptions streamOptions,
        INatsSchedulerQueuePayload<TMessage> payload,
        ILogger logger,
        TimeProvider timeProvider,
        IHybridLogicalClock? hlc = null,
        Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>>? connectionFactory = null)
    {
        _queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
        _natsOptions = natsOptions ?? throw new ArgumentNullException(nameof(natsOptions));
        _streamOptions = streamOptions ?? throw new ArgumentNullException(nameof(streamOptions));
        _payload = payload ?? throw new ArgumentNullException(nameof(payload));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _hlc = hlc;
        _connectionFactory = connectionFactory
            ?? ((opts, cancellationToken) => new ValueTask<NatsConnection>(new NatsConnection(opts)));

        if (string.IsNullOrWhiteSpace(_natsOptions.Url))
        {
            throw new InvalidOperationException("NATS connection URL must be configured for the scheduler queue.");
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to the configured subject, using the payload's idempotency key
    /// as the JetStream message id.
    /// </summary>
    /// <param name="message">Message to enqueue.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The stream sequence (as an invariant-culture string) and whether the publish was reported as a duplicate.</returns>
    public async ValueTask<SchedulerQueueEnqueueResult> EnqueueAsync(
        TMessage message,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(message);

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
        await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

        var payloadBytes = _payload.Serialize(message);
        var idempotencyKey = _payload.GetIdempotencyKey(message);

        // Generate HLC timestamp if clock is available
        var hlcTimestamp = _hlc?.Tick();

        var headers = BuildHeaders(message, idempotencyKey, hlcTimestamp);

        var publishOptions = new NatsJSPubOpts
        {
            MsgId = idempotencyKey,
            RetryAttempts = 0
        };

        var ack = await js.PublishAsync(
                _streamOptions.Subject,
                payloadBytes,
                PayloadSerializer,
                publishOptions,
                headers,
                cancellationToken)
            .ConfigureAwait(false);

        // The sequence is an identifier handed back to callers, so format it culture-independently.
        var sequence = ack.Seq.ToString(CultureInfo.InvariantCulture);

        if (ack.Duplicate)
        {
            SchedulerQueueMetrics.RecordDeduplicated(TransportName, _payload.QueueName);
            _logger.LogDebug(
                "Duplicate enqueue detected for scheduler {Queue} message idempotency key {Key}; sequence {Sequence} reused.",
                _payload.QueueName,
                idempotencyKey,
                ack.Seq);

            // A duplicate adds nothing to the stream: republish the current depth without incrementing.
            PublishDepth();
            return new SchedulerQueueEnqueueResult(sequence, true);
        }

        SchedulerQueueMetrics.RecordEnqueued(TransportName, _payload.QueueName);
        _logger.LogDebug(
            "Enqueued scheduler {Queue} message into stream {Stream} with sequence {Sequence}.",
            _payload.QueueName,
            ack.Stream,
            ack.Seq);

        IncrementDepth();
        return new SchedulerQueueEnqueueResult(sequence, false);
    }

    /// <summary>
    /// Fetches up to <c>request.BatchSize</c> messages from the durable consumer and wraps each usable one in a lease.
    /// </summary>
    /// <param name="request">Batch size, lease duration and consumer name.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The leases created; messages for which <see cref="CreateLease"/> returns null are omitted.</returns>
    public async ValueTask<IReadOnlyList<ISchedulerQueueLease<TMessage>>> LeaseAsync(
        SchedulerQueueLeaseRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
        var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

        var fetchOpts = new NatsJSFetchOpts
        {
            MaxMsgs = request.BatchSize,
            Expires = request.LeaseDuration,
            IdleHeartbeat = _natsOptions.IdleHeartbeat
        };

        // One timestamp for the whole batch so all leases share the same expiry base.
        var now = _timeProvider.GetUtcNow();
        var leases = new List<ISchedulerQueueLease<TMessage>>(request.BatchSize);

        await foreach (var message in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
        {
            var lease = CreateLease(message, request.Consumer, now, request.LeaseDuration);
            if (lease is not null)
            {
                leases.Add(lease);
            }
        }

        PublishDepth();
        return leases;
    }

    /// <summary>
    /// Fetches messages and leases only those that have been delivered more than once.
    /// First-delivery messages are NAK'd with zero delay and skipped.
    /// </summary>
    /// <param name="options">Batch size, minimum idle time (used as the fetch expiry) and claimant consumer name.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Leases for redelivered messages, using the default lease duration from the queue options.</returns>
    public async ValueTask<IReadOnlyList<ISchedulerQueueLease<TMessage>>> ClaimExpiredAsync(
        SchedulerQueueClaimOptions options,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(options);

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
        var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

        var fetchOpts = new NatsJSFetchOpts
        {
            MaxMsgs = options.BatchSize,
            Expires = options.MinIdleTime,
            IdleHeartbeat = _natsOptions.IdleHeartbeat
        };

        var now = _timeProvider.GetUtcNow();
        var leases = new List<ISchedulerQueueLease<TMessage>>(options.BatchSize);

        await foreach (var message in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
        {
            var deliveries = (int)(message.Metadata?.NumDelivered ?? 1);
            if (deliveries <= 1)
            {
                // Not a redelivery: hand it straight back instead of claiming it.
                await message.NakAsync(new AckOpts(), TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
                continue;
            }

            var lease = CreateLease(message, options.ClaimantConsumer, now, _queueOptions.DefaultLeaseDuration);
            if (lease is not null)
            {
                leases.Add(lease);
            }
        }

        PublishDepth();
        return leases;
    }

    /// <summary>
    /// Disposes the NATS connection (if one was opened) and the gate, and removes the depth metric.
    /// Subsequent calls are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (_connection is not null)
        {
            await _connection.DisposeAsync().ConfigureAwait(false);
        }

        _connectionGate.Dispose();
        SchedulerQueueMetrics.RemoveDepth(TransportName, _payload.QueueName);
        GC.SuppressFinalize(this);
    }

    /// <summary>Opens the connection if necessary and pings the server.</summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    public async ValueTask PingAsync(CancellationToken cancellationToken)
    {
        var connection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);
        await connection.PingAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Acks the leased message and decrements the depth counter; no-op if the lease was already completed.</summary>
    internal async Task AcknowledgeAsync(NatsSchedulerQueueLease<TMessage> lease, CancellationToken cancellationToken)
    {
        if (!lease.TryBeginCompletion())
        {
            return;
        }

        await lease.RawMessage.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
        SchedulerQueueMetrics.RecordAck(TransportName, _payload.QueueName);
        DecrementDepth();
    }

    /// <summary>Sends an in-progress ack for the leased message and moves the lease expiry to now + <paramref name="leaseDuration"/>.</summary>
    internal async Task RenewAsync(NatsSchedulerQueueLease<TMessage> lease, TimeSpan leaseDuration, CancellationToken cancellationToken)
    {
        await lease.RawMessage.AckProgressAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
        lease.RefreshLease(_timeProvider.GetUtcNow().Add(leaseDuration));
    }

    /// <summary>
    /// Releases a lease. <c>Retry</c> NAKs the message with a backoff delay, or dead-letters it once
    /// <c>MaxDeliveryAttempts</c> is reached; any other disposition terminates the message.
    /// </summary>
    internal async Task ReleaseAsync(NatsSchedulerQueueLease<TMessage> lease, SchedulerQueueReleaseDisposition disposition, CancellationToken cancellationToken)
    {
        if (disposition == SchedulerQueueReleaseDisposition.Retry
            && lease.Attempt >= _queueOptions.MaxDeliveryAttempts)
        {
            // Out of attempts: DeadLetterAsync performs its own completion check.
            await DeadLetterAsync(lease, $"max-delivery-attempts:{lease.Attempt}", cancellationToken).ConfigureAwait(false);
            return;
        }

        if (!lease.TryBeginCompletion())
        {
            return;
        }

        if (disposition == SchedulerQueueReleaseDisposition.Retry)
        {
            SchedulerQueueMetrics.RecordRetry(TransportName, _payload.QueueName);

            // Backoff is computed for the upcoming attempt, before the lease's counter is bumped.
            var delay = CalculateBackoff(lease.Attempt + 1);
            lease.IncrementAttempt();
            await lease.RawMessage.NakAsync(new AckOpts(), delay, cancellationToken).ConfigureAwait(false);

            _logger.LogWarning(
                "Requeued scheduler {Queue} message {RunId} with delay {Delay} (attempt {Attempt}).",
                _payload.QueueName,
                lease.RunId,
                delay,
                lease.Attempt);
        }
        else
        {
            await lease.RawMessage.AckTerminateAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
            SchedulerQueueMetrics.RecordAck(TransportName, _payload.QueueName);
            DecrementDepth();

            _logger.LogInformation(
                "Abandoned scheduler {Queue} message {RunId} after {Attempt} attempt(s).",
                _payload.QueueName,
                lease.RunId,
                lease.Attempt);
        }

        PublishDepth();
    }

    /// <summary>
    /// Removes the leased message from the work queue. When dead-lettering is enabled the payload is first
    /// published to the dead-letter subject; otherwise it is acked and dropped with a warning.
    /// No-op if the lease was already completed.
    /// </summary>
    /// <param name="lease">Lease whose message is being dead-lettered.</param>
    /// <param name="reason">Reason recorded in the <c>reason</c> header and in the log.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    internal async Task DeadLetterAsync(NatsSchedulerQueueLease<TMessage> lease, string reason, CancellationToken cancellationToken)
    {
        if (!lease.TryBeginCompletion())
        {
            return;
        }

        if (!_queueOptions.DeadLetterEnabled)
        {
            await lease.RawMessage.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
            DecrementDepth();

            _logger.LogWarning(
                "Dropped scheduler {Queue} message {RunId} after {Attempt} attempt(s); dead-letter disabled. Reason: {Reason}",
                _payload.QueueName,
                lease.RunId,
                lease.Attempt,
                reason);

            PublishDepth();
            return;
        }

        var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
        await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);

        var headers = BuildDeadLetterHeaders(lease, reason);
        await js.PublishAsync(
                _streamOptions.DeadLetterSubject,
                lease.Payload,
                PayloadSerializer,
                new NatsJSPubOpts(),
                headers,
                cancellationToken)
            .ConfigureAwait(false);

        // Ack the source message only after the dead-letter copy has been published: acking first
        // would lose the message entirely if the publish above failed.
        await lease.RawMessage.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
        DecrementDepth();

        SchedulerQueueMetrics.RecordDeadLetter(TransportName, _payload.QueueName);
        _logger.LogError(
            "Dead-lettered scheduler {Queue} message {RunId} after {Attempt} attempt(s): {Reason}",
            _payload.QueueName,
            lease.RunId,
            lease.Attempt,
            reason);

        PublishDepth();
    }

    /// <summary>Returns the cached JetStream context, creating it (and the connection) on first use.</summary>
    private async Task<NatsJSContext> GetJetStreamAsync(CancellationToken cancellationToken)
    {
        if (_jsContext is not null)
        {
            return _jsContext;
        }

        // The connection is resolved before taking the gate because EnsureConnectionAsync takes the same gate.
        var connection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);

        await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            _jsContext ??= new NatsJSContext(connection);
            return _jsContext;
        }
        finally
        {
            _connectionGate.Release();
        }
    }

    /// <summary>
    /// Returns the cached durable consumer, first making sure the main stream, the dead-letter stream
    /// and the consumer exist.
    /// </summary>
    private async ValueTask<INatsJSConsumer> EnsureStreamAndConsumerAsync(NatsJSContext js, CancellationToken cancellationToken)
    {
        if (_consumer is not null)
        {
            return _consumer;
        }

        await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Re-check under the gate: another caller may have created the consumer while we waited.
            if (_consumer is not null)
            {
                return _consumer;
            }

            await EnsureStreamAsync(js, cancellationToken).ConfigureAwait(false);
            await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);

            var consumerConfig = new ConsumerConfig
            {
                DurableName = _streamOptions.DurableConsumer,
                AckPolicy = ConsumerConfigAckPolicy.Explicit,
                ReplayPolicy = ConsumerConfigReplayPolicy.Instant,
                DeliverPolicy = ConsumerConfigDeliverPolicy.All,
                AckWait = ToNanoseconds(_streamOptions.AckWait),
                MaxAckPending = Math.Max(1, _streamOptions.MaxAckPending),
                MaxDeliver = Math.Max(1, _queueOptions.MaxDeliveryAttempts),
                FilterSubjects = new[] { _streamOptions.Subject }
            };

            try
            {
                _consumer = await js.CreateConsumerAsync(
                        _streamOptions.Stream,
                        consumerConfig,
                        cancellationToken)
                    .ConfigureAwait(false);
            }
            catch (NatsJSApiException apiEx)
            {
                // Creation was rejected by the API; fall back to looking up the durable by name.
                _logger.LogDebug(apiEx,
                    "CreateConsumerAsync failed with code {Code}; attempting to reuse durable {Durable}.",
                    apiEx.Error?.Code,
                    _streamOptions.DurableConsumer);

                _consumer = await js.GetConsumerAsync(
                        _streamOptions.Stream,
                        _streamOptions.DurableConsumer,
                        cancellationToken)
                    .ConfigureAwait(false);
            }

            return _consumer;
        }
        finally
        {
            _connectionGate.Release();
        }
    }

    /// <summary>Looks up the main stream and creates it when the lookup fails with a JetStream API error.</summary>
    private async Task EnsureStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
    {
        try
        {
            await js.GetStreamAsync(
                    _streamOptions.Stream,
                    new StreamInfoRequest(),
                    cancellationToken)
                .ConfigureAwait(false);
        }
        catch (NatsJSApiException)
        {
            var config = new StreamConfig(
                name: _streamOptions.Stream,
                subjects: new[] { _streamOptions.Subject })
            {
                Retention = StreamConfigRetention.Workqueue,
                Storage = StreamConfigStorage.File,
                MaxConsumers = -1,
                MaxMsgs = -1,
                MaxBytes = -1,
                MaxAge = 0
            };

            await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
            _logger.LogInformation(
                "Created NATS JetStream stream {Stream} ({Subject}) for scheduler {Queue} queue.",
                _streamOptions.Stream,
                _streamOptions.Subject,
                _payload.QueueName);
        }
    }

    /// <summary>
    /// Looks up the dead-letter stream and creates it when the lookup fails with a JetStream API error.
    /// Does nothing when the dead-letter stream or subject name is blank.
    /// </summary>
    private async Task EnsureDeadLetterStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(_streamOptions.DeadLetterStream)
            || string.IsNullOrWhiteSpace(_streamOptions.DeadLetterSubject))
        {
            return;
        }

        try
        {
            await js.GetStreamAsync(
                    _streamOptions.DeadLetterStream,
                    new StreamInfoRequest(),
                    cancellationToken)
                .ConfigureAwait(false);
        }
        catch (NatsJSApiException)
        {
            var config = new StreamConfig(
                name: _streamOptions.DeadLetterStream,
                subjects: new[] { _streamOptions.DeadLetterSubject })
            {
                Retention = StreamConfigRetention.Workqueue,
                Storage = StreamConfigStorage.File,
                MaxConsumers = -1,
                MaxMsgs = -1,
                MaxBytes = -1,
                MaxAge = 0
            };

            await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
            _logger.LogInformation(
                "Created NATS JetStream dead-letter stream {Stream} ({Subject}) for scheduler {Queue} queue.",
                _streamOptions.DeadLetterStream,
                _streamOptions.DeadLetterSubject,
                _payload.QueueName);
        }
    }

    /// <summary>
    /// Returns the cached connection, creating and connecting it on first use.
    /// The connection is cached only after <c>ConnectAsync</c> succeeds.
    /// </summary>
    private async Task<NatsConnection> EnsureConnectionAsync(CancellationToken cancellationToken)
    {
        if (_connection is not null)
        {
            return _connection;
        }

        await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Re-check under the gate: another caller may have connected while we waited.
            if (_connection is not null)
            {
                return _connection;
            }

            var options = new NatsOpts
            {
                Url = _natsOptions.Url!,
                Name = $"stellaops-scheduler-{_payload.QueueName}-queue",
                CommandTimeout = TimeSpan.FromSeconds(10),
                RequestTimeout = TimeSpan.FromSeconds(20),
                PingInterval = TimeSpan.FromSeconds(30)
            };

            var connection = await _connectionFactory(options, cancellationToken).ConfigureAwait(false);
            try
            {
                await connection.ConnectAsync().ConfigureAwait(false);
            }
            catch
            {
                // Do not cache (or leak) a connection that failed its initial connect;
                // the next caller gets a fresh attempt.
                await connection.DisposeAsync().ConfigureAwait(false);
                throw;
            }

            _connection = connection;
            return connection;
        }
        finally
        {
            _connectionGate.Release();
        }
    }

    /// <summary>
    /// Builds a lease from a fetched JetStream message.
    /// </summary>
    /// <param name="message">Raw message as fetched from the consumer.</param>
    /// <param name="consumer">Consumer name recorded on the lease.</param>
    /// <param name="now">Timestamp used as the lease start and as the enqueue-time fallback.</param>
    /// <param name="leaseDuration">Lease length added to <paramref name="now"/>.</param>
    /// <returns>
    /// The lease, or null when the payload is empty or cannot be deserialized. In the null case this
    /// method does not ack, NAK or terminate the raw message.
    /// </returns>
    private NatsSchedulerQueueLease<TMessage>? CreateLease(
        NatsJSMsg<byte[]> message,
        string consumer,
        DateTimeOffset now,
        TimeSpan leaseDuration)
    {
        var payload = message.Data ?? ReadOnlyMemory<byte>.Empty;
        if (payload.IsEmpty)
        {
            return null;
        }

        // Copy once; the same buffer feeds the deserializer and is stored on the lease.
        var payloadBytes = payload.ToArray();

        TMessage deserialized;
        try
        {
            deserialized = _payload.Deserialize(payloadBytes);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to deserialize scheduler {Queue} payload from NATS sequence {Sequence}.", _payload.QueueName, message.Metadata?.Sequence);
            return null;
        }

        var attempt = (int)(message.Metadata?.NumDelivered ?? 1);
        if (attempt <= 0)
        {
            attempt = 1;
        }

        var headers = message.Headers ?? new NatsHeaders();

        // The header is written as invariant-culture Unix milliseconds by BuildHeaders; fall back to
        // "now" when it is missing or unparsable.
        var enqueuedAt = headers.TryGetValue(SchedulerQueueFields.EnqueuedAt, out var enqueuedValues)
            && enqueuedValues.Count > 0
            && long.TryParse(enqueuedValues[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out var unix)
            ? DateTimeOffset.FromUnixTimeMilliseconds(unix)
            : now;

        // Parse HLC timestamp if present
        HlcTimestamp? hlcTimestamp = null;
        if (headers.TryGetValue(SchedulerQueueFields.HlcTimestamp, out var hlcValues)
            && hlcValues.Count > 0
            && HlcTimestamp.TryParse(hlcValues[0], out var parsedHlc))
        {
            hlcTimestamp = parsedHlc;
        }

        var leaseExpires = now.Add(leaseDuration);
        var runId = _payload.GetRunId(deserialized);
        var tenantId = _payload.GetTenantId(deserialized);
        var scheduleId = _payload.GetScheduleId(deserialized);
        var segmentId = _payload.GetSegmentId(deserialized);
        var correlationId = _payload.GetCorrelationId(deserialized);

        // Snapshot the attributes into a read-only, ordinal-keyed copy so the lease is isolated
        // from later changes to the source dictionary.
        var attributes = _payload.GetAttributes(deserialized) ?? new Dictionary<string, string>();
        var attributeView = attributes.Count == 0
            ? EmptyReadOnlyDictionary<string, string>.Instance
            : new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(attributes, StringComparer.Ordinal));

        return new NatsSchedulerQueueLease<TMessage>(
            this,
            message,
            payloadBytes,
            _payload.GetIdempotencyKey(deserialized),
            runId,
            tenantId,
            scheduleId,
            segmentId,
            correlationId,
            attributeView,
            deserialized,
            attempt,
            enqueuedAt,
            leaseExpires,
            consumer,
            hlcTimestamp);
    }

    /// <summary>
    /// Builds the headers attached to an enqueued message: idempotency key, run/tenant ids, queue kind,
    /// enqueue time (Unix milliseconds), plus optional HLC timestamp, schedule/segment/correlation ids
    /// and prefixed attributes.
    /// </summary>
    private NatsHeaders BuildHeaders(TMessage message, string idempotencyKey, HlcTimestamp? hlcTimestamp = null)
    {
        var headers = new NatsHeaders
        {
            { SchedulerQueueFields.IdempotencyKey, idempotencyKey },
            { SchedulerQueueFields.RunId, _payload.GetRunId(message) },
            { SchedulerQueueFields.TenantId, _payload.GetTenantId(message) },
            { SchedulerQueueFields.QueueKind, _payload.QueueName },
            // Wire value: always formatted with the invariant culture so any consumer can parse it.
            { SchedulerQueueFields.EnqueuedAt, _timeProvider.GetUtcNow().ToUnixTimeMilliseconds().ToString(CultureInfo.InvariantCulture) }
        };

        // Include HLC timestamp if available
        if (hlcTimestamp.HasValue)
        {
            headers.Add(SchedulerQueueFields.HlcTimestamp, hlcTimestamp.Value.ToSortableString());
        }

        var scheduleId = _payload.GetScheduleId(message);
        if (!string.IsNullOrWhiteSpace(scheduleId))
        {
            headers.Add(SchedulerQueueFields.ScheduleId, scheduleId);
        }

        var segmentId = _payload.GetSegmentId(message);
        if (!string.IsNullOrWhiteSpace(segmentId))
        {
            headers.Add(SchedulerQueueFields.SegmentId, segmentId);
        }

        var correlationId = _payload.GetCorrelationId(message);
        if (!string.IsNullOrWhiteSpace(correlationId))
        {
            headers.Add(SchedulerQueueFields.CorrelationId, correlationId);
        }

        var attributes = _payload.GetAttributes(message);
        if (attributes is not null)
        {
            foreach (var kvp in attributes)
            {
                headers.Add(SchedulerQueueFields.AttributePrefix + kvp.Key, kvp.Value);
            }
        }

        return headers;
    }

    /// <summary>Builds the headers for a dead-letter copy: run/tenant ids, queue kind, reason and any non-blank optional ids.</summary>
    private NatsHeaders BuildDeadLetterHeaders(NatsSchedulerQueueLease<TMessage> lease, string reason)
    {
        var headers = new NatsHeaders
        {
            { SchedulerQueueFields.RunId, lease.RunId },
            { SchedulerQueueFields.TenantId, lease.TenantId },
            { SchedulerQueueFields.QueueKind, _payload.QueueName },
            { "reason", reason }
        };

        if (!string.IsNullOrWhiteSpace(lease.ScheduleId))
        {
            headers.Add(SchedulerQueueFields.ScheduleId, lease.ScheduleId);
        }

        if (!string.IsNullOrWhiteSpace(lease.CorrelationId))
        {
            headers.Add(SchedulerQueueFields.CorrelationId, lease.CorrelationId);
        }

        if (!string.IsNullOrWhiteSpace(lease.SegmentId))
        {
            headers.Add(SchedulerQueueFields.SegmentId, lease.SegmentId);
        }

        return headers;
    }

    /// <summary>
    /// Computes the redelivery delay for <paramref name="attempt"/>: the initial backoff doubled per attempt
    /// beyond the second, capped at the configured maximum and never below the initial backoff.
    /// Returns <see cref="TimeSpan.Zero"/> when no positive initial backoff is configured.
    /// </summary>
    private TimeSpan CalculateBackoff(int attempt)
    {
        var initial = _queueOptions.RetryInitialBackoff > TimeSpan.Zero
            ? _queueOptions.RetryInitialBackoff
            : _streamOptions.RetryDelay;

        if (initial <= TimeSpan.Zero)
        {
            return TimeSpan.Zero;
        }

        if (attempt <= 1)
        {
            return initial;
        }

        var max = _queueOptions.RetryMaxBackoff > TimeSpan.Zero
            ? _queueOptions.RetryMaxBackoff
            : initial;

        // Note: attempts 1 and 2 both yield the initial backoff (2^0); doubling starts at attempt 3.
        var exponent = attempt - 1;
        var scaledTicks = initial.Ticks * Math.Pow(2, exponent - 1);
        var cappedTicks = Math.Min(max.Ticks, scaledTicks);

        return TimeSpan.FromTicks((long)Math.Max(initial.Ticks, cappedTicks));
    }

    /// <summary>Converts a duration to whole nanoseconds; non-positive durations map to 0.</summary>
    private static long ToNanoseconds(TimeSpan value)
        => value <= TimeSpan.Zero ? 0 : (long)(value.TotalMilliseconds * 1_000_000.0);

    /// <summary>Holds one shared empty read-only dictionary per key/value type pair.</summary>
    private sealed class EmptyReadOnlyDictionary<TKey, TValue>
        where TKey : notnull
    {
        public static readonly IReadOnlyDictionary<TKey, TValue> Instance =
            new ReadOnlyDictionary<TKey, TValue>(new Dictionary<TKey, TValue>(0, EqualityComparer<TKey>.Default));
    }

    /// <summary>Increments the approximate depth and publishes the new value.</summary>
    private void IncrementDepth()
    {
        var depth = Interlocked.Increment(ref _approximateDepth);
        SchedulerQueueMetrics.RecordDepth(TransportName, _payload.QueueName, depth);
    }

    /// <summary>Decrements the approximate depth, clamping at zero, and publishes the new value.</summary>
    private void DecrementDepth()
    {
        var depth = Interlocked.Decrement(ref _approximateDepth);
        if (depth < 0)
        {
            // The counter only sees this process's enqueues, so acks of messages published elsewhere
            // can drive it negative. Reset it to zero only if it still holds the value we observed
            // (so a concurrent increment is not overwritten) and report zero: Interlocked.Exchange /
            // CompareExchange return the previous value, which must not be published as the depth.
            Interlocked.CompareExchange(ref _approximateDepth, 0, depth);
            depth = 0;
        }

        SchedulerQueueMetrics.RecordDepth(TransportName, _payload.QueueName, depth);
    }

    /// <summary>Publishes the current approximate depth without changing it.</summary>
    private void PublishDepth()
    {
        var depth = Volatile.Read(ref _approximateDepth);
        SchedulerQueueMetrics.RecordDepth(TransportName, _payload.QueueName, depth);
    }
}