Resolve Concelier/Excititor merge conflicts
This commit is contained in:
15
src/StellaOps.Scanner.Queue/AGENTS.md
Normal file
15
src/StellaOps.Scanner.Queue/AGENTS.md
Normal file
@@ -0,0 +1,15 @@
|
||||
# StellaOps.Scanner.Queue — Agent Charter
|
||||
|
||||
## Mission
|
||||
Deliver the scanner job queue backbone defined in `docs/ARCHITECTURE_SCANNER.md`, providing deterministic, offline-friendly leasing semantics for WebService producers and Worker consumers.
|
||||
|
||||
## Responsibilities
|
||||
- Define queue abstractions with idempotent enqueue tokens, acknowledgement, lease renewal, and claim support.
|
||||
- Ship first-party adapters for Redis Streams and NATS JetStream, respecting offline deployments and allow-listed hosts.
|
||||
- Surface health probes, structured diagnostics, and metrics needed by Scanner WebService/Worker.
|
||||
- Document operational expectations and configuration binding hooks.
|
||||
|
||||
## Interfaces & Dependencies
|
||||
- Consumes shared configuration primitives from `StellaOps.Configuration`.
|
||||
- Exposes dependency injection extensions for `StellaOps.DependencyInjection`.
|
||||
- Targets `net10.0` (preview) and aligns with scanner DTOs once `StellaOps.Scanner.Core` lands.
|
||||
20
src/StellaOps.Scanner.Queue/IScanQueue.cs
Normal file
20
src/StellaOps.Scanner.Queue/IScanQueue.cs
Normal file
@@ -0,0 +1,20 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
public interface IScanQueue
|
||||
{
|
||||
ValueTask<QueueEnqueueResult> EnqueueAsync(
|
||||
ScanQueueMessage message,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(
|
||||
QueueLeaseRequest request,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(
|
||||
QueueClaimOptions options,
|
||||
CancellationToken cancellationToken = default);
|
||||
}
|
||||
37
src/StellaOps.Scanner.Queue/IScanQueueLease.cs
Normal file
37
src/StellaOps.Scanner.Queue/IScanQueueLease.cs
Normal file
@@ -0,0 +1,37 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
public interface IScanQueueLease
|
||||
{
|
||||
string MessageId { get; }
|
||||
|
||||
string JobId { get; }
|
||||
|
||||
ReadOnlyMemory<byte> Payload { get; }
|
||||
|
||||
int Attempt { get; }
|
||||
|
||||
DateTimeOffset EnqueuedAt { get; }
|
||||
|
||||
DateTimeOffset LeaseExpiresAt { get; }
|
||||
|
||||
string Consumer { get; }
|
||||
|
||||
string? IdempotencyKey { get; }
|
||||
|
||||
string? TraceId { get; }
|
||||
|
||||
IReadOnlyDictionary<string, string> Attributes { get; }
|
||||
|
||||
Task AcknowledgeAsync(CancellationToken cancellationToken = default);
|
||||
|
||||
Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default);
|
||||
|
||||
Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default);
|
||||
|
||||
Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default);
|
||||
}
|
||||
654
src/StellaOps.Scanner.Queue/Nats/NatsScanQueue.cs
Normal file
654
src/StellaOps.Scanner.Queue/Nats/NatsScanQueue.cs
Normal file
@@ -0,0 +1,654 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Client.JetStream;
|
||||
using NATS.Client.JetStream.Models;
|
||||
|
||||
namespace StellaOps.Scanner.Queue.Nats;
|
||||
|
||||
internal sealed class NatsScanQueue : IScanQueue, IAsyncDisposable
|
||||
{
|
||||
private const string TransportName = "nats";
|
||||
|
||||
private static readonly INatsSerializer<byte[]> PayloadSerializer = NatsRawSerializer<byte[]>.Default;
|
||||
|
||||
private readonly ScannerQueueOptions _queueOptions;
|
||||
private readonly NatsQueueOptions _options;
|
||||
private readonly ILogger<NatsScanQueue> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly SemaphoreSlim _connectionGate = new(1, 1);
|
||||
private readonly Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>> _connectionFactory;
|
||||
|
||||
private NatsConnection? _connection;
|
||||
private NatsJSContext? _jsContext;
|
||||
private INatsJSConsumer? _consumer;
|
||||
private bool _disposed;
|
||||
|
||||
public NatsScanQueue(
|
||||
ScannerQueueOptions queueOptions,
|
||||
NatsQueueOptions options,
|
||||
ILogger<NatsScanQueue> logger,
|
||||
TimeProvider timeProvider,
|
||||
Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>>? connectionFactory = null)
|
||||
{
|
||||
_queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_connectionFactory = connectionFactory ?? ((opts, cancellationToken) => new ValueTask<NatsConnection>(new NatsConnection(opts)));
|
||||
|
||||
if (string.IsNullOrWhiteSpace(_options.Url))
|
||||
{
|
||||
throw new InvalidOperationException("NATS connection URL must be configured for the scanner queue.");
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask<QueueEnqueueResult> EnqueueAsync(
|
||||
ScanQueueMessage message,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(message);
|
||||
|
||||
var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var idempotencyKey = message.IdempotencyKey ?? message.JobId;
|
||||
var headers = BuildHeaders(message, idempotencyKey);
|
||||
var publishOpts = new NatsJSPubOpts
|
||||
{
|
||||
MsgId = idempotencyKey,
|
||||
RetryAttempts = 0
|
||||
};
|
||||
|
||||
var ack = await js.PublishAsync(
|
||||
_options.Subject,
|
||||
message.Payload.ToArray(),
|
||||
PayloadSerializer,
|
||||
publishOpts,
|
||||
headers,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (ack.Duplicate)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"Duplicate NATS enqueue detected for job {JobId} (token {Token}).",
|
||||
message.JobId,
|
||||
idempotencyKey);
|
||||
|
||||
QueueMetrics.RecordDeduplicated(TransportName);
|
||||
return new QueueEnqueueResult(ack.Seq.ToString(), true);
|
||||
}
|
||||
|
||||
QueueMetrics.RecordEnqueued(TransportName);
|
||||
_logger.LogDebug(
|
||||
"Enqueued job {JobId} into NATS stream {Stream} with sequence {Sequence}.",
|
||||
message.JobId,
|
||||
ack.Stream,
|
||||
ack.Seq);
|
||||
|
||||
return new QueueEnqueueResult(ack.Seq.ToString(), false);
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(
|
||||
QueueLeaseRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
|
||||
var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var fetchOpts = new NatsJSFetchOpts
|
||||
{
|
||||
MaxMsgs = request.BatchSize,
|
||||
Expires = request.LeaseDuration,
|
||||
IdleHeartbeat = _options.IdleHeartbeat
|
||||
};
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var leases = new List<IScanQueueLease>(capacity: request.BatchSize);
|
||||
|
||||
await foreach (var msg in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
var lease = CreateLease(msg, request.Consumer, now, request.LeaseDuration);
|
||||
if (lease is not null)
|
||||
{
|
||||
leases.Add(lease);
|
||||
}
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(
|
||||
QueueClaimOptions options,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
|
||||
var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
|
||||
var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var fetchOpts = new NatsJSFetchOpts
|
||||
{
|
||||
MaxMsgs = options.BatchSize,
|
||||
Expires = options.MinIdleTime,
|
||||
IdleHeartbeat = _options.IdleHeartbeat
|
||||
};
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var leases = new List<IScanQueueLease>(options.BatchSize);
|
||||
|
||||
await foreach (var msg in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
var deliveries = (int)(msg.Metadata?.NumDelivered ?? 1);
|
||||
if (deliveries <= 1)
|
||||
{
|
||||
// Fresh message; surface back to queue and continue.
|
||||
await msg.NakAsync(new AckOpts(), TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
var lease = CreateLease(msg, options.ClaimantConsumer, now, _queueOptions.DefaultLeaseDuration);
|
||||
if (lease is not null)
|
||||
{
|
||||
leases.Add(lease);
|
||||
}
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
|
||||
if (_connection is not null)
|
||||
{
|
||||
await _connection.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_connectionGate.Dispose();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
internal async Task AcknowledgeAsync(
|
||||
NatsScanQueueLease lease,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await lease.Message.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
|
||||
|
||||
QueueMetrics.RecordAck(TransportName);
|
||||
_logger.LogDebug(
|
||||
"Acknowledged job {JobId} (seq {Seq}).",
|
||||
lease.JobId,
|
||||
lease.MessageId);
|
||||
}
|
||||
|
||||
internal async Task RenewLeaseAsync(
|
||||
NatsScanQueueLease lease,
|
||||
TimeSpan leaseDuration,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await lease.Message.AckProgressAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
|
||||
var expires = _timeProvider.GetUtcNow().Add(leaseDuration);
|
||||
lease.RefreshLease(expires);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Renewed NATS lease for job {JobId} until {Expires:u}.",
|
||||
lease.JobId,
|
||||
expires);
|
||||
}
|
||||
|
||||
internal async Task ReleaseAsync(
|
||||
NatsScanQueueLease lease,
|
||||
QueueReleaseDisposition disposition,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (disposition == QueueReleaseDisposition.Retry
|
||||
&& lease.Attempt >= _queueOptions.MaxDeliveryAttempts)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Job {JobId} reached max delivery attempts ({Attempts}); shipping to dead-letter stream.",
|
||||
lease.JobId,
|
||||
lease.Attempt);
|
||||
|
||||
await DeadLetterAsync(
|
||||
lease,
|
||||
$"max-delivery-attempts:{lease.Attempt}",
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (disposition == QueueReleaseDisposition.Retry)
|
||||
{
|
||||
QueueMetrics.RecordRetry(TransportName);
|
||||
|
||||
var delay = CalculateBackoff(lease.Attempt);
|
||||
await lease.Message.NakAsync(new AckOpts(), delay, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
_logger.LogWarning(
|
||||
"Rescheduled job {JobId} via NATS NAK with delay {Delay} (attempt {Attempt}).",
|
||||
lease.JobId,
|
||||
delay,
|
||||
lease.Attempt);
|
||||
}
|
||||
else
|
||||
{
|
||||
await lease.Message.AckTerminateAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
|
||||
QueueMetrics.RecordAck(TransportName);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Abandoned job {JobId} after {Attempt} attempt(s).",
|
||||
lease.JobId,
|
||||
lease.Attempt);
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task DeadLetterAsync(
|
||||
NatsScanQueueLease lease,
|
||||
string reason,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await lease.Message.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var headers = BuildDeadLetterHeaders(lease, reason);
|
||||
await js.PublishAsync(
|
||||
_options.DeadLetterSubject,
|
||||
lease.Payload.ToArray(),
|
||||
PayloadSerializer,
|
||||
new NatsJSPubOpts(),
|
||||
headers,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
QueueMetrics.RecordDeadLetter(TransportName);
|
||||
_logger.LogError(
|
||||
"Dead-lettered job {JobId} (attempt {Attempt}): {Reason}",
|
||||
lease.JobId,
|
||||
lease.Attempt,
|
||||
reason);
|
||||
}
|
||||
|
||||
private async Task<NatsJSContext> GetJetStreamAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_jsContext is not null)
|
||||
{
|
||||
return _jsContext;
|
||||
}
|
||||
|
||||
var connection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
_jsContext ??= new NatsJSContext(connection);
|
||||
return _jsContext;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async ValueTask<INatsJSConsumer> EnsureStreamAndConsumerAsync(
|
||||
NatsJSContext js,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (_consumer is not null)
|
||||
{
|
||||
return _consumer;
|
||||
}
|
||||
|
||||
await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_consumer is not null)
|
||||
{
|
||||
return _consumer;
|
||||
}
|
||||
|
||||
await EnsureStreamAsync(js, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var consumerConfig = new ConsumerConfig
|
||||
{
|
||||
DurableName = _options.DurableConsumer,
|
||||
AckPolicy = ConsumerConfigAckPolicy.Explicit,
|
||||
ReplayPolicy = ConsumerConfigReplayPolicy.Instant,
|
||||
DeliverPolicy = ConsumerConfigDeliverPolicy.All,
|
||||
AckWait = ToNanoseconds(_options.AckWait),
|
||||
MaxAckPending = _options.MaxInFlight,
|
||||
MaxDeliver = Math.Max(1, _queueOptions.MaxDeliveryAttempts),
|
||||
FilterSubjects = new[] { _options.Subject }
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
_consumer = await js.CreateConsumerAsync(
|
||||
_options.Stream,
|
||||
consumerConfig,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (NatsJSApiException apiEx)
|
||||
{
|
||||
_logger.LogDebug(apiEx,
|
||||
"CreateConsumerAsync failed with code {Code}; attempting to fetch existing durable consumer {Durable}.",
|
||||
apiEx.Error?.Code,
|
||||
_options.DurableConsumer);
|
||||
|
||||
_consumer = await js.GetConsumerAsync(
|
||||
_options.Stream,
|
||||
_options.DurableConsumer,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return _consumer;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<NatsConnection> EnsureConnectionAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_connection is not null)
|
||||
{
|
||||
return _connection;
|
||||
}
|
||||
|
||||
await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_connection is not null)
|
||||
{
|
||||
return _connection;
|
||||
}
|
||||
|
||||
var opts = new NatsOpts
|
||||
{
|
||||
Url = _options.Url!,
|
||||
Name = "stellaops-scanner-queue",
|
||||
CommandTimeout = TimeSpan.FromSeconds(10),
|
||||
RequestTimeout = TimeSpan.FromSeconds(20),
|
||||
PingInterval = TimeSpan.FromSeconds(30)
|
||||
};
|
||||
|
||||
_connection = await _connectionFactory(opts, cancellationToken).ConfigureAwait(false);
|
||||
await _connection.ConnectAsync().ConfigureAwait(false);
|
||||
return _connection;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await js.GetStreamAsync(
|
||||
_options.Stream,
|
||||
new StreamInfoRequest(),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (NatsJSApiException)
|
||||
{
|
||||
var config = new StreamConfig(
|
||||
name: _options.Stream,
|
||||
subjects: new[] { _options.Subject })
|
||||
{
|
||||
Retention = StreamConfigRetention.Workqueue,
|
||||
Storage = StreamConfigStorage.File,
|
||||
MaxConsumers = -1,
|
||||
MaxMsgs = -1,
|
||||
MaxBytes = -1,
|
||||
MaxAge = 0
|
||||
};
|
||||
|
||||
await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
|
||||
_logger.LogInformation("Created NATS JetStream stream {Stream} ({Subject}).", _options.Stream, _options.Subject);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureDeadLetterStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await js.GetStreamAsync(
|
||||
_options.DeadLetterStream,
|
||||
new StreamInfoRequest(),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (NatsJSApiException)
|
||||
{
|
||||
var config = new StreamConfig(
|
||||
name: _options.DeadLetterStream,
|
||||
subjects: new[] { _options.DeadLetterSubject })
|
||||
{
|
||||
Retention = StreamConfigRetention.Workqueue,
|
||||
Storage = StreamConfigStorage.File,
|
||||
MaxConsumers = -1,
|
||||
MaxMsgs = -1,
|
||||
MaxBytes = -1,
|
||||
MaxAge = ToNanoseconds(_queueOptions.DeadLetter.Retention)
|
||||
};
|
||||
|
||||
await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
|
||||
_logger.LogInformation("Created NATS dead-letter stream {Stream} ({Subject}).", _options.DeadLetterStream, _options.DeadLetterSubject);
|
||||
}
|
||||
}
|
||||
|
||||
internal async ValueTask PingAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var connection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
await connection.PingAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private NatsScanQueueLease? CreateLease(
|
||||
NatsJSMsg<byte[]> message,
|
||||
string consumer,
|
||||
DateTimeOffset now,
|
||||
TimeSpan leaseDuration)
|
||||
{
|
||||
var headers = message.Headers;
|
||||
if (headers is null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!headers.TryGetValue(QueueEnvelopeFields.JobId, out var jobIdValues) || jobIdValues.Count == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var jobId = jobIdValues[0]!;
|
||||
var idempotencyKey = headers.TryGetValue(QueueEnvelopeFields.IdempotencyKey, out var idemValues) && idemValues.Count > 0
|
||||
? idemValues[0]
|
||||
: null;
|
||||
|
||||
var traceId = headers.TryGetValue(QueueEnvelopeFields.TraceId, out var traceValues) && traceValues.Count > 0
|
||||
? string.IsNullOrWhiteSpace(traceValues[0]) ? null : traceValues[0]
|
||||
: null;
|
||||
|
||||
var enqueuedAt = headers.TryGetValue(QueueEnvelopeFields.EnqueuedAt, out var enqueuedValues) && enqueuedValues.Count > 0
|
||||
&& long.TryParse(enqueuedValues[0], out var unix)
|
||||
? DateTimeOffset.FromUnixTimeMilliseconds(unix)
|
||||
: now;
|
||||
|
||||
var attempt = headers.TryGetValue(QueueEnvelopeFields.Attempt, out var attemptValues) && attemptValues.Count > 0
|
||||
&& int.TryParse(attemptValues[0], out var parsedAttempt)
|
||||
? parsedAttempt
|
||||
: 1;
|
||||
|
||||
if (message.Metadata?.NumDelivered is ulong delivered && delivered > 0)
|
||||
{
|
||||
var deliveredInt = delivered > int.MaxValue ? int.MaxValue : (int)delivered;
|
||||
if (deliveredInt > attempt)
|
||||
{
|
||||
attempt = deliveredInt;
|
||||
}
|
||||
}
|
||||
|
||||
var leaseExpires = now.Add(leaseDuration);
|
||||
var attributes = ExtractAttributes(headers);
|
||||
|
||||
var messageId = message.Metadata?.Sequence.Stream.ToString() ?? Guid.NewGuid().ToString("n");
|
||||
return new NatsScanQueueLease(
|
||||
this,
|
||||
message,
|
||||
messageId,
|
||||
jobId,
|
||||
message.Data ?? Array.Empty<byte>(),
|
||||
attempt,
|
||||
enqueuedAt,
|
||||
leaseExpires,
|
||||
consumer,
|
||||
idempotencyKey,
|
||||
traceId,
|
||||
attributes);
|
||||
}
|
||||
|
||||
private static IReadOnlyDictionary<string, string> ExtractAttributes(NatsHeaders headers)
|
||||
{
|
||||
var attributes = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
|
||||
foreach (var key in headers.Keys)
|
||||
{
|
||||
if (!key.StartsWith(QueueEnvelopeFields.AttributePrefix, StringComparison.Ordinal))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (headers.TryGetValue(key, out var values) && values.Count > 0)
|
||||
{
|
||||
attributes[key[QueueEnvelopeFields.AttributePrefix.Length..]] = values[0]!;
|
||||
}
|
||||
}
|
||||
|
||||
return attributes.Count == 0
|
||||
? EmptyReadOnlyDictionary<string, string>.Instance
|
||||
: new ReadOnlyDictionary<string, string>(attributes);
|
||||
}
|
||||
|
||||
private NatsHeaders BuildHeaders(ScanQueueMessage message, string idempotencyKey)
|
||||
{
|
||||
var headers = new NatsHeaders
|
||||
{
|
||||
{ QueueEnvelopeFields.JobId, message.JobId },
|
||||
{ QueueEnvelopeFields.IdempotencyKey, idempotencyKey },
|
||||
{ QueueEnvelopeFields.Attempt, "1" },
|
||||
{ QueueEnvelopeFields.EnqueuedAt, _timeProvider.GetUtcNow().ToUnixTimeMilliseconds().ToString() }
|
||||
};
|
||||
|
||||
if (!string.IsNullOrEmpty(message.TraceId))
|
||||
{
|
||||
headers.Add(QueueEnvelopeFields.TraceId, message.TraceId!);
|
||||
}
|
||||
|
||||
if (message.Attributes is not null)
|
||||
{
|
||||
foreach (var kvp in message.Attributes)
|
||||
{
|
||||
headers.Add(QueueEnvelopeFields.AttributePrefix + kvp.Key, kvp.Value);
|
||||
}
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
private NatsHeaders BuildDeadLetterHeaders(NatsScanQueueLease lease, string reason)
|
||||
{
|
||||
var headers = new NatsHeaders
|
||||
{
|
||||
{ QueueEnvelopeFields.JobId, lease.JobId },
|
||||
{ QueueEnvelopeFields.IdempotencyKey, lease.IdempotencyKey ?? lease.JobId },
|
||||
{ QueueEnvelopeFields.Attempt, lease.Attempt.ToString() },
|
||||
{ QueueEnvelopeFields.EnqueuedAt, lease.EnqueuedAt.ToUnixTimeMilliseconds().ToString() },
|
||||
{ "deadletter-reason", reason }
|
||||
};
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(lease.TraceId))
|
||||
{
|
||||
headers.Add(QueueEnvelopeFields.TraceId, lease.TraceId!);
|
||||
}
|
||||
|
||||
foreach (var kvp in lease.Attributes)
|
||||
{
|
||||
headers.Add(QueueEnvelopeFields.AttributePrefix + kvp.Key, kvp.Value);
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
private TimeSpan CalculateBackoff(int attempt)
|
||||
{
|
||||
var configuredInitial = _options.RetryDelay > TimeSpan.Zero
|
||||
? _options.RetryDelay
|
||||
: _queueOptions.RetryInitialBackoff;
|
||||
|
||||
if (configuredInitial <= TimeSpan.Zero)
|
||||
{
|
||||
return TimeSpan.Zero;
|
||||
}
|
||||
|
||||
if (attempt <= 1)
|
||||
{
|
||||
return configuredInitial;
|
||||
}
|
||||
|
||||
var max = _queueOptions.RetryMaxBackoff > TimeSpan.Zero
|
||||
? _queueOptions.RetryMaxBackoff
|
||||
: configuredInitial;
|
||||
|
||||
var exponent = attempt - 1;
|
||||
var scaledTicks = configuredInitial.Ticks * Math.Pow(2, exponent - 1);
|
||||
var cappedTicks = Math.Min(max.Ticks, scaledTicks);
|
||||
var resultTicks = Math.Max(configuredInitial.Ticks, (long)cappedTicks);
|
||||
return TimeSpan.FromTicks(resultTicks);
|
||||
}
|
||||
|
||||
private static long ToNanoseconds(TimeSpan timeSpan)
|
||||
=> timeSpan <= TimeSpan.Zero ? 0 : timeSpan.Ticks * 100L;
|
||||
|
||||
private static class EmptyReadOnlyDictionary<TKey, TValue>
|
||||
where TKey : notnull
|
||||
{
|
||||
public static readonly IReadOnlyDictionary<TKey, TValue> Instance =
|
||||
new ReadOnlyDictionary<TKey, TValue>(new Dictionary<TKey, TValue>(0, EqualityComparer<TKey>.Default));
|
||||
}
|
||||
}
|
||||
82
src/StellaOps.Scanner.Queue/Nats/NatsScanQueueLease.cs
Normal file
82
src/StellaOps.Scanner.Queue/Nats/NatsScanQueueLease.cs
Normal file
@@ -0,0 +1,82 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using NATS.Client.JetStream;
|
||||
|
||||
namespace StellaOps.Scanner.Queue.Nats;
|
||||
|
||||
internal sealed class NatsScanQueueLease : IScanQueueLease
|
||||
{
|
||||
private readonly NatsScanQueue _queue;
|
||||
private readonly NatsJSMsg<byte[]> _message;
|
||||
private int _completed;
|
||||
|
||||
internal NatsScanQueueLease(
|
||||
NatsScanQueue queue,
|
||||
NatsJSMsg<byte[]> message,
|
||||
string messageId,
|
||||
string jobId,
|
||||
byte[] payload,
|
||||
int attempt,
|
||||
DateTimeOffset enqueuedAt,
|
||||
DateTimeOffset leaseExpiresAt,
|
||||
string consumer,
|
||||
string? idempotencyKey,
|
||||
string? traceId,
|
||||
IReadOnlyDictionary<string, string> attributes)
|
||||
{
|
||||
_queue = queue;
|
||||
_message = message;
|
||||
MessageId = messageId;
|
||||
JobId = jobId;
|
||||
Payload = payload;
|
||||
Attempt = attempt;
|
||||
EnqueuedAt = enqueuedAt;
|
||||
LeaseExpiresAt = leaseExpiresAt;
|
||||
Consumer = consumer;
|
||||
IdempotencyKey = idempotencyKey;
|
||||
TraceId = traceId;
|
||||
Attributes = attributes;
|
||||
}
|
||||
|
||||
public string MessageId { get; }
|
||||
|
||||
public string JobId { get; }
|
||||
|
||||
public ReadOnlyMemory<byte> Payload { get; }
|
||||
|
||||
public int Attempt { get; internal set; }
|
||||
|
||||
public DateTimeOffset EnqueuedAt { get; }
|
||||
|
||||
public DateTimeOffset LeaseExpiresAt { get; private set; }
|
||||
|
||||
public string Consumer { get; }
|
||||
|
||||
public string? IdempotencyKey { get; }
|
||||
|
||||
public string? TraceId { get; }
|
||||
|
||||
public IReadOnlyDictionary<string, string> Attributes { get; }
|
||||
|
||||
internal NatsJSMsg<byte[]> Message => _message;
|
||||
|
||||
public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
|
||||
=> _queue.AcknowledgeAsync(this, cancellationToken);
|
||||
|
||||
public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
|
||||
=> _queue.RenewLeaseAsync(this, leaseDuration, cancellationToken);
|
||||
|
||||
public Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
|
||||
=> _queue.ReleaseAsync(this, disposition, cancellationToken);
|
||||
|
||||
public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
|
||||
=> _queue.DeadLetterAsync(this, reason, cancellationToken);
|
||||
|
||||
internal bool TryBeginCompletion()
|
||||
=> Interlocked.CompareExchange(ref _completed, 1, 0) == 0;
|
||||
|
||||
internal void RefreshLease(DateTimeOffset expiresAt)
|
||||
=> LeaseExpiresAt = expiresAt;
|
||||
}
|
||||
12
src/StellaOps.Scanner.Queue/QueueEnvelopeFields.cs
Normal file
12
src/StellaOps.Scanner.Queue/QueueEnvelopeFields.cs
Normal file
@@ -0,0 +1,12 @@
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
internal static class QueueEnvelopeFields
|
||||
{
|
||||
public const string Payload = "payload";
|
||||
public const string JobId = "jobId";
|
||||
public const string IdempotencyKey = "idempotency";
|
||||
public const string Attempt = "attempt";
|
||||
public const string EnqueuedAt = "enqueuedAt";
|
||||
public const string TraceId = "traceId";
|
||||
public const string AttributePrefix = "attr:";
|
||||
}
|
||||
28
src/StellaOps.Scanner.Queue/QueueMetrics.cs
Normal file
28
src/StellaOps.Scanner.Queue/QueueMetrics.cs
Normal file
@@ -0,0 +1,28 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
internal static class QueueMetrics
|
||||
{
|
||||
private const string TransportTagName = "transport";
|
||||
|
||||
private static readonly Meter Meter = new("StellaOps.Scanner.Queue");
|
||||
private static readonly Counter<long> EnqueuedCounter = Meter.CreateCounter<long>("scanner_queue_enqueued_total");
|
||||
private static readonly Counter<long> DeduplicatedCounter = Meter.CreateCounter<long>("scanner_queue_deduplicated_total");
|
||||
private static readonly Counter<long> AckCounter = Meter.CreateCounter<long>("scanner_queue_ack_total");
|
||||
private static readonly Counter<long> RetryCounter = Meter.CreateCounter<long>("scanner_queue_retry_total");
|
||||
private static readonly Counter<long> DeadLetterCounter = Meter.CreateCounter<long>("scanner_queue_deadletter_total");
|
||||
|
||||
public static void RecordEnqueued(string transport) => EnqueuedCounter.Add(1, BuildTags(transport));
|
||||
|
||||
public static void RecordDeduplicated(string transport) => DeduplicatedCounter.Add(1, BuildTags(transport));
|
||||
|
||||
public static void RecordAck(string transport) => AckCounter.Add(1, BuildTags(transport));
|
||||
|
||||
public static void RecordRetry(string transport) => RetryCounter.Add(1, BuildTags(transport));
|
||||
|
||||
public static void RecordDeadLetter(string transport) => DeadLetterCounter.Add(1, BuildTags(transport));
|
||||
|
||||
private static KeyValuePair<string, object?>[] BuildTags(string transport)
|
||||
=> new[] { new KeyValuePair<string, object?>(TransportTagName, transport) };
|
||||
}
|
||||
7
src/StellaOps.Scanner.Queue/QueueTransportKind.cs
Normal file
7
src/StellaOps.Scanner.Queue/QueueTransportKind.cs
Normal file
@@ -0,0 +1,7 @@
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
public enum QueueTransportKind
|
||||
{
|
||||
Redis,
|
||||
Nats
|
||||
}
|
||||
766
src/StellaOps.Scanner.Queue/Redis/RedisScanQueue.cs
Normal file
766
src/StellaOps.Scanner.Queue/Redis/RedisScanQueue.cs
Normal file
@@ -0,0 +1,766 @@
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace StellaOps.Scanner.Queue.Redis;
|
||||
|
||||
internal sealed class RedisScanQueue : IScanQueue, IAsyncDisposable
|
||||
{
|
||||
private const string TransportName = "redis";
|
||||
|
||||
private readonly ScannerQueueOptions _queueOptions;
|
||||
private readonly RedisQueueOptions _options;
|
||||
private readonly ILogger<RedisScanQueue> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly SemaphoreSlim _connectionLock = new(1, 1);
|
||||
private readonly SemaphoreSlim _groupInitLock = new(1, 1);
|
||||
private readonly Func<ConfigurationOptions, Task<IConnectionMultiplexer>> _connectionFactory;
|
||||
private IConnectionMultiplexer? _connection;
|
||||
private volatile bool _groupInitialized;
|
||||
private bool _disposed;
|
||||
|
||||
private string BuildIdempotencyKey(string key)
|
||||
=> string.Concat(_options.IdempotencyKeyPrefix, key);
|
||||
|
||||
public RedisScanQueue(
|
||||
ScannerQueueOptions queueOptions,
|
||||
RedisQueueOptions options,
|
||||
ILogger<RedisScanQueue> logger,
|
||||
TimeProvider timeProvider,
|
||||
Func<ConfigurationOptions, Task<IConnectionMultiplexer>>? connectionFactory = null)
|
||||
{
|
||||
_queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_connectionFactory = connectionFactory ?? (config => Task.FromResult<IConnectionMultiplexer>(ConnectionMultiplexer.Connect(config)));
|
||||
|
||||
if (string.IsNullOrWhiteSpace(_options.ConnectionString))
|
||||
{
|
||||
throw new InvalidOperationException("Redis connection string must be configured for the scanner queue.");
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask<QueueEnqueueResult> EnqueueAsync(
|
||||
ScanQueueMessage message,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(message);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var attempt = 1;
|
||||
var entries = BuildEntries(message, now, attempt);
|
||||
var messageId = await AddToStreamAsync(
|
||||
db,
|
||||
_options.StreamName,
|
||||
entries,
|
||||
_options.ApproximateMaxLength,
|
||||
_options.ApproximateMaxLength is not null)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var idempotencyToken = message.IdempotencyKey ?? message.JobId;
|
||||
var idempotencyKey = BuildIdempotencyKey(idempotencyToken);
|
||||
|
||||
var stored = await db.StringSetAsync(
|
||||
key: idempotencyKey,
|
||||
value: messageId,
|
||||
when: When.NotExists,
|
||||
expiry: _options.IdempotencyWindow)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (!stored)
|
||||
{
|
||||
// Duplicate enqueue – delete the freshly added entry and surface cached ID.
|
||||
await db.StreamDeleteAsync(
|
||||
_options.StreamName,
|
||||
new RedisValue[] { messageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var existing = await db.StringGetAsync(idempotencyKey).ConfigureAwait(false);
|
||||
var duplicateId = existing.IsNullOrEmpty ? messageId : existing;
|
||||
|
||||
_logger.LogDebug(
|
||||
"Duplicate queue enqueue detected for job {JobId} (token {Token}), returning existing stream id {StreamId}.",
|
||||
message.JobId,
|
||||
idempotencyToken,
|
||||
duplicateId.ToString());
|
||||
|
||||
QueueMetrics.RecordDeduplicated(TransportName);
|
||||
return new QueueEnqueueResult(duplicateId.ToString()!, true);
|
||||
}
|
||||
|
||||
_logger.LogDebug(
|
||||
"Enqueued job {JobId} into stream {Stream} with id {StreamId}.",
|
||||
message.JobId,
|
||||
_options.StreamName,
|
||||
messageId.ToString());
|
||||
|
||||
QueueMetrics.RecordEnqueued(TransportName);
|
||||
return new QueueEnqueueResult(messageId.ToString()!, false);
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(
|
||||
QueueLeaseRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var entries = await db.StreamReadGroupAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
request.Consumer,
|
||||
position: ">",
|
||||
count: request.BatchSize,
|
||||
flags: CommandFlags.None)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (entries is null || entries.Length == 0)
|
||||
{
|
||||
return Array.Empty<IScanQueueLease>();
|
||||
}
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var leases = new List<IScanQueueLease>(entries.Length);
|
||||
|
||||
foreach (var entry in entries)
|
||||
{
|
||||
var lease = TryMapLease(
|
||||
entry,
|
||||
request.Consumer,
|
||||
now,
|
||||
request.LeaseDuration,
|
||||
default);
|
||||
|
||||
if (lease is null)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Stream entry {StreamId} is missing required metadata; acknowledging to avoid poison message.",
|
||||
entry.Id.ToString());
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
new RedisValue[] { entry.Id })
|
||||
.ConfigureAwait(false);
|
||||
await db.StreamDeleteAsync(
|
||||
_options.StreamName,
|
||||
new RedisValue[] { entry.Id })
|
||||
.ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
leases.Add(lease);
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(
|
||||
QueueClaimOptions options,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var pending = await db.StreamPendingMessagesAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
options.BatchSize,
|
||||
RedisValue.Null,
|
||||
(long)options.MinIdleTime.TotalMilliseconds)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (pending is null || pending.Length == 0)
|
||||
{
|
||||
return Array.Empty<IScanQueueLease>();
|
||||
}
|
||||
|
||||
var eligible = pending
|
||||
.Where(p => p.IdleTimeInMilliseconds >= options.MinIdleTime.TotalMilliseconds)
|
||||
.ToArray();
|
||||
|
||||
if (eligible.Length == 0)
|
||||
{
|
||||
return Array.Empty<IScanQueueLease>();
|
||||
}
|
||||
|
||||
var messageIds = eligible
|
||||
.Select(static p => (RedisValue)p.MessageId)
|
||||
.ToArray();
|
||||
|
||||
var entries = await db.StreamClaimAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
options.ClaimantConsumer,
|
||||
0,
|
||||
messageIds,
|
||||
CommandFlags.None)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (entries is null || entries.Length == 0)
|
||||
{
|
||||
return Array.Empty<IScanQueueLease>();
|
||||
}
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var pendingById = Enumerable.ToDictionary<StreamPendingMessageInfo, string, StreamPendingMessageInfo>(
|
||||
eligible,
|
||||
static p => p.MessageId.IsNullOrEmpty ? string.Empty : p.MessageId.ToString(),
|
||||
static p => p,
|
||||
StringComparer.Ordinal);
|
||||
|
||||
var leases = new List<IScanQueueLease>(entries.Length);
|
||||
foreach (var entry in entries)
|
||||
{
|
||||
var entryIdValue = entry.Id;
|
||||
var entryId = entryIdValue.IsNullOrEmpty ? string.Empty : entryIdValue.ToString();
|
||||
var hasPending = pendingById.TryGetValue(entryId, out var pendingInfo);
|
||||
var attempt = hasPending
|
||||
? (int)Math.Max(1, pendingInfo.DeliveryCount)
|
||||
: 1;
|
||||
|
||||
var lease = TryMapLease(
|
||||
entry,
|
||||
options.ClaimantConsumer,
|
||||
now,
|
||||
_queueOptions.DefaultLeaseDuration,
|
||||
attempt);
|
||||
|
||||
if (lease is null)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Unable to map claimed stream entry {StreamId}; acknowledging to unblock queue.",
|
||||
entry.Id.ToString());
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
new RedisValue[] { entry.Id })
|
||||
.ConfigureAwait(false);
|
||||
await db.StreamDeleteAsync(
|
||||
_options.StreamName,
|
||||
new RedisValue[] { entry.Id })
|
||||
.ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
leases.Add(lease);
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
if (_connection is not null)
|
||||
{
|
||||
await _connection.CloseAsync();
|
||||
_connection.Dispose();
|
||||
}
|
||||
|
||||
_connectionLock.Dispose();
|
||||
_groupInitLock.Dispose();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
internal async Task AcknowledgeAsync(
|
||||
RedisScanQueueLease lease,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await db.StreamDeleteAsync(
|
||||
_options.StreamName,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Acknowledged job {JobId} ({MessageId}) on consumer {Consumer}.",
|
||||
lease.JobId,
|
||||
lease.MessageId,
|
||||
lease.Consumer);
|
||||
|
||||
QueueMetrics.RecordAck(TransportName);
|
||||
}
|
||||
|
||||
internal async Task RenewLeaseAsync(
|
||||
RedisScanQueueLease lease,
|
||||
TimeSpan leaseDuration,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await db.StreamClaimAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
lease.Consumer,
|
||||
0,
|
||||
new RedisValue[] { lease.MessageId },
|
||||
CommandFlags.None)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var expires = _timeProvider.GetUtcNow().Add(leaseDuration);
|
||||
lease.RefreshLease(expires);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Renewed lease for job {JobId} until {LeaseExpiry:u}.",
|
||||
lease.JobId,
|
||||
expires);
|
||||
}
|
||||
|
||||
internal async Task ReleaseAsync(
|
||||
RedisScanQueueLease lease,
|
||||
QueueReleaseDisposition disposition,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (disposition == QueueReleaseDisposition.Retry
|
||||
&& lease.Attempt >= _queueOptions.MaxDeliveryAttempts)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Job {JobId} reached max delivery attempts ({Attempts}); moving to dead-letter.",
|
||||
lease.JobId,
|
||||
lease.Attempt);
|
||||
|
||||
await DeadLetterAsync(
|
||||
lease,
|
||||
$"max-delivery-attempts:{lease.Attempt}",
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
await db.StreamDeleteAsync(
|
||||
_options.StreamName,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
QueueMetrics.RecordAck(TransportName);
|
||||
|
||||
if (disposition == QueueReleaseDisposition.Retry)
|
||||
{
|
||||
QueueMetrics.RecordRetry(TransportName);
|
||||
|
||||
var delay = CalculateBackoff(lease.Attempt);
|
||||
if (delay > TimeSpan.Zero)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"Delaying retry for job {JobId} by {Delay} (attempt {Attempt}).",
|
||||
lease.JobId,
|
||||
delay,
|
||||
lease.Attempt);
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
var requeueMessage = new ScanQueueMessage(lease.JobId, lease.Payload)
|
||||
{
|
||||
IdempotencyKey = lease.IdempotencyKey,
|
||||
Attributes = lease.Attributes,
|
||||
TraceId = lease.TraceId
|
||||
};
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var entries = BuildEntries(requeueMessage, now, lease.Attempt + 1);
|
||||
|
||||
await AddToStreamAsync(
|
||||
db,
|
||||
_options.StreamName,
|
||||
entries,
|
||||
_options.ApproximateMaxLength,
|
||||
_options.ApproximateMaxLength is not null)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
QueueMetrics.RecordEnqueued(TransportName);
|
||||
_logger.LogWarning(
|
||||
"Released job {JobId} for retry (attempt {Attempt}).",
|
||||
lease.JobId,
|
||||
lease.Attempt + 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Abandoned job {JobId} after {Attempt} attempt(s).",
|
||||
lease.JobId,
|
||||
lease.Attempt);
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task DeadLetterAsync(
|
||||
RedisScanQueueLease lease,
|
||||
string reason,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await db.StreamDeleteAsync(
|
||||
_options.StreamName,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var entries = BuildEntries(
|
||||
new ScanQueueMessage(lease.JobId, lease.Payload)
|
||||
{
|
||||
IdempotencyKey = lease.IdempotencyKey,
|
||||
Attributes = lease.Attributes,
|
||||
TraceId = lease.TraceId
|
||||
},
|
||||
now,
|
||||
lease.Attempt);
|
||||
|
||||
await AddToStreamAsync(
|
||||
db,
|
||||
_queueOptions.DeadLetter.StreamName,
|
||||
entries,
|
||||
null,
|
||||
false)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
_logger.LogError(
|
||||
"Dead-lettered job {JobId} (attempt {Attempt}): {Reason}",
|
||||
lease.JobId,
|
||||
lease.Attempt,
|
||||
reason);
|
||||
|
||||
QueueMetrics.RecordDeadLetter(TransportName);
|
||||
}
|
||||
|
||||
private async ValueTask<IDatabase> GetDatabaseAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_connection is not null)
|
||||
{
|
||||
return _connection.GetDatabase(_options.Database ?? -1);
|
||||
}
|
||||
|
||||
await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_connection is null)
|
||||
{
|
||||
var config = ConfigurationOptions.Parse(_options.ConnectionString!);
|
||||
config.AbortOnConnectFail = false;
|
||||
config.ConnectTimeout = (int)_options.InitializationTimeout.TotalMilliseconds;
|
||||
config.ConnectRetry = 3;
|
||||
if (_options.Database is not null)
|
||||
{
|
||||
config.DefaultDatabase = _options.Database;
|
||||
}
|
||||
|
||||
_connection = await _connectionFactory(config).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return _connection.GetDatabase(_options.Database ?? -1);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureConsumerGroupAsync(
|
||||
IDatabase database,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (_groupInitialized)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await _groupInitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_groupInitialized)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await database.StreamCreateConsumerGroupAsync(
|
||||
_options.StreamName,
|
||||
_options.ConsumerGroup,
|
||||
StreamPosition.Beginning,
|
||||
createStream: true)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// Already exists.
|
||||
}
|
||||
|
||||
_groupInitialized = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_groupInitLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private NameValueEntry[] BuildEntries(
|
||||
ScanQueueMessage message,
|
||||
DateTimeOffset enqueuedAt,
|
||||
int attempt)
|
||||
{
|
||||
var attributeCount = message.Attributes?.Count ?? 0;
|
||||
var entries = ArrayPool<NameValueEntry>.Shared.Rent(6 + attributeCount);
|
||||
var index = 0;
|
||||
|
||||
entries[index++] = new NameValueEntry(QueueEnvelopeFields.JobId, message.JobId);
|
||||
entries[index++] = new NameValueEntry(QueueEnvelopeFields.Attempt, attempt);
|
||||
entries[index++] = new NameValueEntry(QueueEnvelopeFields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds());
|
||||
entries[index++] = new NameValueEntry(
|
||||
QueueEnvelopeFields.IdempotencyKey,
|
||||
message.IdempotencyKey ?? message.JobId);
|
||||
entries[index++] = new NameValueEntry(
|
||||
QueueEnvelopeFields.Payload,
|
||||
message.Payload.ToArray());
|
||||
entries[index++] = new NameValueEntry(
|
||||
QueueEnvelopeFields.TraceId,
|
||||
message.TraceId ?? string.Empty);
|
||||
|
||||
if (attributeCount > 0)
|
||||
{
|
||||
foreach (var kvp in message.Attributes!)
|
||||
{
|
||||
entries[index++] = new NameValueEntry(
|
||||
QueueEnvelopeFields.AttributePrefix + kvp.Key,
|
||||
kvp.Value);
|
||||
}
|
||||
}
|
||||
|
||||
var result = entries.AsSpan(0, index).ToArray();
|
||||
ArrayPool<NameValueEntry>.Shared.Return(entries, clearArray: true);
|
||||
return result;
|
||||
}
|
||||
|
||||
private RedisScanQueueLease? TryMapLease(
|
||||
StreamEntry entry,
|
||||
string consumer,
|
||||
DateTimeOffset now,
|
||||
TimeSpan leaseDuration,
|
||||
int? attemptOverride)
|
||||
{
|
||||
if (entry.Values is null || entry.Values.Length == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
string? jobId = null;
|
||||
string? idempotency = null;
|
||||
long? enqueuedAtUnix = null;
|
||||
byte[]? payload = null;
|
||||
string? traceId = null;
|
||||
var attributes = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
var attempt = attemptOverride ?? 1;
|
||||
|
||||
foreach (var field in entry.Values)
|
||||
{
|
||||
var name = field.Name.ToString();
|
||||
if (name.Equals(QueueEnvelopeFields.JobId, StringComparison.Ordinal))
|
||||
{
|
||||
jobId = field.Value.ToString();
|
||||
}
|
||||
else if (name.Equals(QueueEnvelopeFields.IdempotencyKey, StringComparison.Ordinal))
|
||||
{
|
||||
idempotency = field.Value.ToString();
|
||||
}
|
||||
else if (name.Equals(QueueEnvelopeFields.EnqueuedAt, StringComparison.Ordinal))
|
||||
{
|
||||
if (long.TryParse(field.Value.ToString(), out var unix))
|
||||
{
|
||||
enqueuedAtUnix = unix;
|
||||
}
|
||||
}
|
||||
else if (name.Equals(QueueEnvelopeFields.Payload, StringComparison.Ordinal))
|
||||
{
|
||||
payload = (byte[]?)field.Value ?? Array.Empty<byte>();
|
||||
}
|
||||
else if (name.Equals(QueueEnvelopeFields.Attempt, StringComparison.Ordinal))
|
||||
{
|
||||
if (int.TryParse(field.Value.ToString(), out var parsedAttempt))
|
||||
{
|
||||
attempt = Math.Max(parsedAttempt, attempt);
|
||||
}
|
||||
}
|
||||
else if (name.Equals(QueueEnvelopeFields.TraceId, StringComparison.Ordinal))
|
||||
{
|
||||
var value = field.Value.ToString();
|
||||
traceId = string.IsNullOrWhiteSpace(value) ? null : value;
|
||||
}
|
||||
else if (name.StartsWith(QueueEnvelopeFields.AttributePrefix, StringComparison.Ordinal))
|
||||
{
|
||||
attributes[name[QueueEnvelopeFields.AttributePrefix.Length..]] = field.Value.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
if (jobId is null || payload is null || enqueuedAtUnix is null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value);
|
||||
var leaseExpires = now.Add(leaseDuration);
|
||||
|
||||
var attributeView = attributes.Count == 0
|
||||
? EmptyReadOnlyDictionary<string, string>.Instance
|
||||
: new ReadOnlyDictionary<string, string>(attributes);
|
||||
|
||||
return new RedisScanQueueLease(
|
||||
this,
|
||||
entry.Id.ToString(),
|
||||
jobId,
|
||||
payload,
|
||||
attempt,
|
||||
enqueuedAt,
|
||||
leaseExpires,
|
||||
consumer,
|
||||
idempotency,
|
||||
traceId,
|
||||
attributeView);
|
||||
}
|
||||
|
||||
private TimeSpan CalculateBackoff(int attempt)
|
||||
{
|
||||
var configuredInitial = _options.RetryInitialBackoff > TimeSpan.Zero
|
||||
? _options.RetryInitialBackoff
|
||||
: _queueOptions.RetryInitialBackoff;
|
||||
|
||||
var initial = configuredInitial > TimeSpan.Zero
|
||||
? configuredInitial
|
||||
: TimeSpan.Zero;
|
||||
|
||||
if (initial <= TimeSpan.Zero)
|
||||
{
|
||||
return TimeSpan.Zero;
|
||||
}
|
||||
|
||||
if (attempt <= 1)
|
||||
{
|
||||
return initial;
|
||||
}
|
||||
|
||||
var configuredMax = _queueOptions.RetryMaxBackoff > TimeSpan.Zero
|
||||
? _queueOptions.RetryMaxBackoff
|
||||
: initial;
|
||||
|
||||
var max = configuredMax <= TimeSpan.Zero
|
||||
? initial
|
||||
: configuredMax;
|
||||
|
||||
var exponent = attempt - 1;
|
||||
var scale = Math.Pow(2, exponent - 1);
|
||||
var scaledTicks = initial.Ticks * scale;
|
||||
var cappedTicks = Math.Min(max.Ticks, scaledTicks);
|
||||
|
||||
var resultTicks = Math.Max(initial.Ticks, (long)cappedTicks);
|
||||
return TimeSpan.FromTicks(resultTicks);
|
||||
}
|
||||
|
||||
private async Task<RedisValue> AddToStreamAsync(
|
||||
IDatabase database,
|
||||
RedisKey stream,
|
||||
NameValueEntry[] entries,
|
||||
int? maxLength,
|
||||
bool useApproximateLength)
|
||||
{
|
||||
var capacity = 4 + (entries.Length * 2);
|
||||
var args = new List<object>(capacity)
|
||||
{
|
||||
stream
|
||||
};
|
||||
|
||||
if (maxLength.HasValue)
|
||||
{
|
||||
args.Add("MAXLEN");
|
||||
if (useApproximateLength)
|
||||
{
|
||||
args.Add("~");
|
||||
}
|
||||
|
||||
args.Add(maxLength.Value);
|
||||
}
|
||||
|
||||
args.Add("*");
|
||||
for (var i = 0; i < entries.Length; i++)
|
||||
{
|
||||
args.Add(entries[i].Name);
|
||||
args.Add(entries[i].Value);
|
||||
}
|
||||
|
||||
var result = await database.ExecuteAsync("XADD", args.ToArray()).ConfigureAwait(false);
|
||||
return (RedisValue)result!;
|
||||
}
|
||||
|
||||
private static class EmptyReadOnlyDictionary<TKey, TValue>
|
||||
where TKey : notnull
|
||||
{
|
||||
public static readonly IReadOnlyDictionary<TKey, TValue> Instance =
|
||||
new ReadOnlyDictionary<TKey, TValue>(new Dictionary<TKey, TValue>(0, EqualityComparer<TKey>.Default));
|
||||
}
|
||||
|
||||
internal async ValueTask PingAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await db.ExecuteAsync("PING").ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
76
src/StellaOps.Scanner.Queue/Redis/RedisScanQueueLease.cs
Normal file
76
src/StellaOps.Scanner.Queue/Redis/RedisScanQueueLease.cs
Normal file
@@ -0,0 +1,76 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Scanner.Queue.Redis;
|
||||
|
||||
internal sealed class RedisScanQueueLease : IScanQueueLease
|
||||
{
|
||||
private readonly RedisScanQueue _queue;
|
||||
private int _completed;
|
||||
|
||||
internal RedisScanQueueLease(
|
||||
RedisScanQueue queue,
|
||||
string messageId,
|
||||
string jobId,
|
||||
byte[] payload,
|
||||
int attempt,
|
||||
DateTimeOffset enqueuedAt,
|
||||
DateTimeOffset leaseExpiresAt,
|
||||
string consumer,
|
||||
string? idempotencyKey,
|
||||
string? traceId,
|
||||
IReadOnlyDictionary<string, string> attributes)
|
||||
{
|
||||
_queue = queue;
|
||||
MessageId = messageId;
|
||||
JobId = jobId;
|
||||
Payload = payload;
|
||||
Attempt = attempt;
|
||||
EnqueuedAt = enqueuedAt;
|
||||
LeaseExpiresAt = leaseExpiresAt;
|
||||
Consumer = consumer;
|
||||
IdempotencyKey = idempotencyKey;
|
||||
TraceId = traceId;
|
||||
Attributes = attributes;
|
||||
}
|
||||
|
||||
public string MessageId { get; }
|
||||
|
||||
public string JobId { get; }
|
||||
|
||||
public ReadOnlyMemory<byte> Payload { get; }
|
||||
|
||||
public int Attempt { get; }
|
||||
|
||||
public DateTimeOffset EnqueuedAt { get; }
|
||||
|
||||
public DateTimeOffset LeaseExpiresAt { get; private set; }
|
||||
|
||||
public string Consumer { get; }
|
||||
|
||||
public string? IdempotencyKey { get; }
|
||||
|
||||
public string? TraceId { get; }
|
||||
|
||||
public IReadOnlyDictionary<string, string> Attributes { get; }
|
||||
|
||||
public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
|
||||
=> _queue.AcknowledgeAsync(this, cancellationToken);
|
||||
|
||||
public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
|
||||
=> _queue.RenewLeaseAsync(this, leaseDuration, cancellationToken);
|
||||
|
||||
public Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
|
||||
=> _queue.ReleaseAsync(this, disposition, cancellationToken);
|
||||
|
||||
public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
|
||||
=> _queue.DeadLetterAsync(this, reason, cancellationToken);
|
||||
|
||||
internal bool TryBeginCompletion()
|
||||
=> Interlocked.CompareExchange(ref _completed, 1, 0) == 0;
|
||||
|
||||
internal void RefreshLease(DateTimeOffset expiresAt)
|
||||
=> LeaseExpiresAt = expiresAt;
|
||||
}
|
||||
115
src/StellaOps.Scanner.Queue/ScanQueueContracts.cs
Normal file
115
src/StellaOps.Scanner.Queue/ScanQueueContracts.cs
Normal file
@@ -0,0 +1,115 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
public sealed class ScanQueueMessage
|
||||
{
|
||||
private readonly byte[] _payload;
|
||||
|
||||
public ScanQueueMessage(string jobId, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(jobId))
|
||||
{
|
||||
throw new ArgumentException("Job identifier must be provided.", nameof(jobId));
|
||||
}
|
||||
|
||||
JobId = jobId;
|
||||
_payload = CopyPayload(payload);
|
||||
}
|
||||
|
||||
public string JobId { get; }
|
||||
|
||||
public string? IdempotencyKey { get; init; }
|
||||
|
||||
public string? TraceId { get; init; }
|
||||
|
||||
public IReadOnlyDictionary<string, string>? Attributes { get; init; }
|
||||
|
||||
public ReadOnlyMemory<byte> Payload => _payload;
|
||||
|
||||
private static byte[] CopyPayload(ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
if (payload.Length == 0)
|
||||
{
|
||||
return Array.Empty<byte>();
|
||||
}
|
||||
|
||||
var copy = new byte[payload.Length];
|
||||
payload.Span.CopyTo(copy);
|
||||
return copy;
|
||||
}
|
||||
}
|
||||
|
||||
public readonly record struct QueueEnqueueResult(string MessageId, bool Deduplicated);
|
||||
|
||||
public sealed class QueueLeaseRequest
|
||||
{
|
||||
public QueueLeaseRequest(string consumer, int batchSize, TimeSpan leaseDuration)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(consumer))
|
||||
{
|
||||
throw new ArgumentException("Consumer name must be provided.", nameof(consumer));
|
||||
}
|
||||
|
||||
if (batchSize <= 0)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");
|
||||
}
|
||||
|
||||
if (leaseDuration <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(leaseDuration), leaseDuration, "Lease duration must be positive.");
|
||||
}
|
||||
|
||||
Consumer = consumer;
|
||||
BatchSize = batchSize;
|
||||
LeaseDuration = leaseDuration;
|
||||
}
|
||||
|
||||
public string Consumer { get; }
|
||||
|
||||
public int BatchSize { get; }
|
||||
|
||||
public TimeSpan LeaseDuration { get; }
|
||||
}
|
||||
|
||||
public sealed class QueueClaimOptions
|
||||
{
|
||||
public QueueClaimOptions(
|
||||
string claimantConsumer,
|
||||
int batchSize,
|
||||
TimeSpan minIdleTime)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(claimantConsumer))
|
||||
{
|
||||
throw new ArgumentException("Consumer must be provided.", nameof(claimantConsumer));
|
||||
}
|
||||
|
||||
if (batchSize <= 0)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");
|
||||
}
|
||||
|
||||
if (minIdleTime < TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(minIdleTime), minIdleTime, "Idle time cannot be negative.");
|
||||
}
|
||||
|
||||
ClaimantConsumer = claimantConsumer;
|
||||
BatchSize = batchSize;
|
||||
MinIdleTime = minIdleTime;
|
||||
}
|
||||
|
||||
public string ClaimantConsumer { get; }
|
||||
|
||||
public int BatchSize { get; }
|
||||
|
||||
public TimeSpan MinIdleTime { get; }
|
||||
}
|
||||
|
||||
public enum QueueReleaseDisposition
|
||||
{
|
||||
Retry,
|
||||
Abandon
|
||||
}
|
||||
55
src/StellaOps.Scanner.Queue/ScannerQueueHealthCheck.cs
Normal file
55
src/StellaOps.Scanner.Queue/ScannerQueueHealthCheck.cs
Normal file
@@ -0,0 +1,55 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Scanner.Queue.Nats;
|
||||
using StellaOps.Scanner.Queue.Redis;
|
||||
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
public sealed class ScannerQueueHealthCheck : IHealthCheck
|
||||
{
|
||||
private readonly IScanQueue _queue;
|
||||
private readonly ILogger<ScannerQueueHealthCheck> _logger;
|
||||
|
||||
public ScannerQueueHealthCheck(
|
||||
IScanQueue queue,
|
||||
ILogger<ScannerQueueHealthCheck> logger)
|
||||
{
|
||||
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async Task<HealthCheckResult> CheckHealthAsync(
|
||||
HealthCheckContext context,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
try
|
||||
{
|
||||
switch (_queue)
|
||||
{
|
||||
case RedisScanQueue redisQueue:
|
||||
await redisQueue.PingAsync(cancellationToken).ConfigureAwait(false);
|
||||
return HealthCheckResult.Healthy("Redis queue reachable.");
|
||||
|
||||
case NatsScanQueue natsQueue:
|
||||
await natsQueue.PingAsync(cancellationToken).ConfigureAwait(false);
|
||||
return HealthCheckResult.Healthy("NATS queue reachable.");
|
||||
|
||||
default:
|
||||
return HealthCheckResult.Healthy("Queue transport without dedicated ping returned healthy.");
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Scanner queue health check failed.");
|
||||
return new HealthCheckResult(
|
||||
context.Registration.FailureStatus,
|
||||
"Queue transport unreachable.",
|
||||
ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
92
src/StellaOps.Scanner.Queue/ScannerQueueOptions.cs
Normal file
92
src/StellaOps.Scanner.Queue/ScannerQueueOptions.cs
Normal file
@@ -0,0 +1,92 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
public sealed class ScannerQueueOptions
|
||||
{
|
||||
public QueueTransportKind Kind { get; set; } = QueueTransportKind.Redis;
|
||||
|
||||
public RedisQueueOptions Redis { get; set; } = new();
|
||||
|
||||
public NatsQueueOptions Nats { get; set; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Default lease duration applied when callers do not override the visibility timeout.
|
||||
/// </summary>
|
||||
public TimeSpan DefaultLeaseDuration { get; set; } = TimeSpan.FromMinutes(5);
|
||||
|
||||
/// <summary>
|
||||
/// Maximum number of times a message may be delivered before it is shunted to the dead-letter queue.
|
||||
/// </summary>
|
||||
public int MaxDeliveryAttempts { get; set; } = 5;
|
||||
|
||||
/// <summary>
|
||||
/// Options controlling retry/backoff/dead-letter handling.
|
||||
/// </summary>
|
||||
public DeadLetterQueueOptions DeadLetter { get; set; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Initial backoff applied when a job is retried after failure.
|
||||
/// </summary>
|
||||
public TimeSpan RetryInitialBackoff { get; set; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>
|
||||
/// Maximum backoff window applied for exponential retry.
|
||||
/// </summary>
|
||||
public TimeSpan RetryMaxBackoff { get; set; } = TimeSpan.FromMinutes(2);
|
||||
}
|
||||
|
||||
public sealed class RedisQueueOptions
|
||||
{
|
||||
public string? ConnectionString { get; set; }
|
||||
|
||||
public int? Database { get; set; }
|
||||
|
||||
public string StreamName { get; set; } = "scanner:jobs";
|
||||
|
||||
public string ConsumerGroup { get; set; } = "scanner-workers";
|
||||
|
||||
public string IdempotencyKeyPrefix { get; set; } = "scanner:jobs:idemp:";
|
||||
|
||||
public TimeSpan IdempotencyWindow { get; set; } = TimeSpan.FromHours(12);
|
||||
|
||||
public int? ApproximateMaxLength { get; set; }
|
||||
|
||||
public TimeSpan InitializationTimeout { get; set; } = TimeSpan.FromSeconds(30);
|
||||
|
||||
public TimeSpan ClaimIdleThreshold { get; set; } = TimeSpan.FromMinutes(10);
|
||||
|
||||
public TimeSpan PendingScanWindow { get; set; } = TimeSpan.FromMinutes(30);
|
||||
|
||||
public TimeSpan RetryInitialBackoff { get; set; } = TimeSpan.FromSeconds(5);
|
||||
}
|
||||
|
||||
public sealed class NatsQueueOptions
|
||||
{
|
||||
public string? Url { get; set; }
|
||||
|
||||
public string Stream { get; set; } = "SCANNER_JOBS";
|
||||
|
||||
public string Subject { get; set; } = "scanner.jobs";
|
||||
|
||||
public string DurableConsumer { get; set; } = "scanner-workers";
|
||||
|
||||
public int MaxInFlight { get; set; } = 64;
|
||||
|
||||
public TimeSpan AckWait { get; set; } = TimeSpan.FromMinutes(5);
|
||||
|
||||
public string DeadLetterStream { get; set; } = "SCANNER_JOBS_DEAD";
|
||||
|
||||
public string DeadLetterSubject { get; set; } = "scanner.jobs.dead";
|
||||
|
||||
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(10);
|
||||
|
||||
public TimeSpan IdleHeartbeat { get; set; } = TimeSpan.FromSeconds(30);
|
||||
}
|
||||
|
||||
public sealed class DeadLetterQueueOptions
|
||||
{
|
||||
public string StreamName { get; set; } = "scanner:jobs:dead";
|
||||
|
||||
public TimeSpan Retention { get; set; } = TimeSpan.FromDays(7);
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
using System;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using Microsoft.Extensions.Diagnostics.HealthChecks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Scanner.Queue.Nats;
|
||||
using StellaOps.Scanner.Queue.Redis;
|
||||
|
||||
namespace StellaOps.Scanner.Queue;
|
||||
|
||||
public static class ScannerQueueServiceCollectionExtensions
|
||||
{
|
||||
public static IServiceCollection AddScannerQueue(
|
||||
this IServiceCollection services,
|
||||
IConfiguration configuration,
|
||||
string sectionName = "scanner:queue")
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(services);
|
||||
ArgumentNullException.ThrowIfNull(configuration);
|
||||
|
||||
var options = new ScannerQueueOptions();
|
||||
configuration.GetSection(sectionName).Bind(options);
|
||||
|
||||
services.TryAddSingleton(TimeProvider.System);
|
||||
services.AddSingleton(options);
|
||||
|
||||
services.AddSingleton<IScanQueue>(sp =>
|
||||
{
|
||||
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
|
||||
var timeProvider = sp.GetService<TimeProvider>() ?? TimeProvider.System;
|
||||
|
||||
return options.Kind switch
|
||||
{
|
||||
QueueTransportKind.Redis => new RedisScanQueue(
|
||||
options,
|
||||
options.Redis,
|
||||
loggerFactory.CreateLogger<RedisScanQueue>(),
|
||||
timeProvider),
|
||||
QueueTransportKind.Nats => new NatsScanQueue(
|
||||
options,
|
||||
options.Nats,
|
||||
loggerFactory.CreateLogger<NatsScanQueue>(),
|
||||
timeProvider),
|
||||
_ => throw new InvalidOperationException($"Unsupported queue transport kind '{options.Kind}'.")
|
||||
};
|
||||
});
|
||||
|
||||
services.AddSingleton<ScannerQueueHealthCheck>();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
public static IHealthChecksBuilder AddScannerQueueHealthCheck(
|
||||
this IHealthChecksBuilder builder)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(builder);
|
||||
|
||||
builder.Services.TryAddSingleton<ScannerQueueHealthCheck>();
|
||||
builder.AddCheck<ScannerQueueHealthCheck>(
|
||||
name: "scanner-queue",
|
||||
failureStatus: HealthStatus.Unhealthy,
|
||||
tags: new[] { "scanner", "queue" });
|
||||
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
21
src/StellaOps.Scanner.Queue/StellaOps.Scanner.Queue.csproj
Normal file
21
src/StellaOps.Scanner.Queue/StellaOps.Scanner.Queue.csproj
Normal file
@@ -0,0 +1,21 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
|
||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="8.0.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.1" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
|
||||
<PackageReference Include="NATS.Client.Core" Version="2.0.0" />
|
||||
<PackageReference Include="NATS.Client.JetStream" Version="2.0.0" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
7
src/StellaOps.Scanner.Queue/TASKS.md
Normal file
7
src/StellaOps.Scanner.Queue/TASKS.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Scanner Queue Task Board (Sprint 9)
|
||||
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCANNER-QUEUE-09-401 | DONE (2025-10-19) | Scanner Queue Guild | — | Implement queue abstraction + Redis Streams adapter with ack/lease semantics, idempotency tokens, and deterministic job IDs. | Interfaces finalized; Redis adapter passes enqueue/dequeue/ack/claim lease tests; structured logs exercised. |
|
||||
| SCANNER-QUEUE-09-402 | DONE (2025-10-19) | Scanner Queue Guild | SCANNER-QUEUE-09-401 | Add pluggable backend support (Redis, NATS) with configuration binding, health probes, failover documentation. | NATS adapter + DI bindings delivered; health checks documented; configuration tests green. |
|
||||
| SCANNER-QUEUE-09-403 | DONE (2025-10-19) | Scanner Queue Guild | SCANNER-QUEUE-09-401 | Implement retry and dead-letter flow with structured metrics/logs for offline deployments. | Retry policy configurable; dead-letter queue persisted; metrics counters validated in integration tests. |
|
||||
Reference in New Issue
Block a user