This commit is contained in:
master
2025-10-19 10:38:55 +03:00
parent 8dc7273e27
commit aef7ffb535
250 changed files with 17967 additions and 66 deletions

View File

@@ -0,0 +1,15 @@
# StellaOps.Scanner.Queue — Agent Charter
## Mission
Deliver the scanner job queue backbone defined in `docs/ARCHITECTURE_SCANNER.md`, providing deterministic, offline-friendly leasing semantics for WebService producers and Worker consumers.
## Responsibilities
- Define queue abstractions with idempotent enqueue tokens, acknowledgement, lease renewal, and claim support.
- Ship first-party adapters for Redis Streams and NATS JetStream, respecting offline deployments and allow-listed hosts.
- Surface health probes, structured diagnostics, and metrics needed by Scanner WebService/Worker.
- Document operational expectations and configuration binding hooks.
## Interfaces & Dependencies
- Consumes shared configuration primitives from `StellaOps.Configuration`.
- Exposes dependency injection extensions for `StellaOps.DependencyInjection`.
- Targets `net10.0` (preview) and aligns with scanner DTOs once `StellaOps.Scanner.Core` lands.

View File

@@ -0,0 +1,20 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Scanner.Queue;
/// <summary>
/// Transport-agnostic contract for the scanner job queue: producers enqueue jobs,
/// consumers lease them, and stale leases can be claimed by another consumer.
/// </summary>
public interface IScanQueue
{
/// <summary>Adds <paramref name="message"/> to the queue.</summary>
/// <param name="message">The job to enqueue.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The transport's enqueue result for the message.</returns>
ValueTask<QueueEnqueueResult> EnqueueAsync(
ScanQueueMessage message,
CancellationToken cancellationToken = default);
/// <summary>Leases a batch of jobs as described by <paramref name="request"/>.</summary>
/// <param name="request">Lease parameters supplied by the consumer.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The leases obtained; the list may be empty.</returns>
ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(
QueueLeaseRequest request,
CancellationToken cancellationToken = default);
/// <summary>Takes over jobs whose previous lease appears to have lapsed.</summary>
/// <param name="options">Claim parameters supplied by the claimant.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>The leases obtained; the list may be empty.</returns>
ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(
QueueClaimOptions options,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Scanner.Queue;
/// <summary>
/// A job handed to a consumer, together with the operations that settle it
/// (acknowledge, renew, release, dead-letter).
/// </summary>
public interface IScanQueueLease
{
/// <summary>Transport-assigned identifier of the queued message.</summary>
string MessageId { get; }
/// <summary>Identifier of the scan job carried by the message.</summary>
string JobId { get; }
/// <summary>Raw job payload.</summary>
ReadOnlyMemory<byte> Payload { get; }
/// <summary>Delivery attempt number for this lease.</summary>
int Attempt { get; }
/// <summary>Time at which the job was enqueued.</summary>
DateTimeOffset EnqueuedAt { get; }
/// <summary>Time at which this lease is considered expired.</summary>
DateTimeOffset LeaseExpiresAt { get; }
/// <summary>Name of the consumer holding the lease.</summary>
string Consumer { get; }
/// <summary>Idempotency token recorded with the job, if any.</summary>
string? IdempotencyKey { get; }
/// <summary>Additional string attributes carried with the job.</summary>
IReadOnlyDictionary<string, string> Attributes { get; }
/// <summary>Marks the job as completed.</summary>
Task AcknowledgeAsync(CancellationToken cancellationToken = default);
/// <summary>Extends the lease by <paramref name="leaseDuration"/>.</summary>
Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default);
/// <summary>Gives the job up, handling it according to <paramref name="disposition"/>.</summary>
Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default);
/// <summary>Moves the job to dead-letter handling, recording <paramref name="reason"/>.</summary>
Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,644 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
namespace StellaOps.Scanner.Queue.Nats;
internal sealed class NatsScanQueue : IScanQueue, IAsyncDisposable
{
// Tag value passed to QueueMetrics for every metric recorded by this transport.
private const string TransportName = "nats";
// Serializer used for both publishing and fetching the byte[] payload.
private static readonly INatsSerializer<byte[]> PayloadSerializer = NatsRawSerializer<byte[]>.Default;
private readonly ScannerQueueOptions _queueOptions;
private readonly NatsQueueOptions _options;
private readonly ILogger<NatsScanQueue> _logger;
private readonly TimeProvider _timeProvider;
// Serializes lazy creation of the connection, the JetStream context, and the consumer.
private readonly SemaphoreSlim _connectionGate = new(1, 1);
private readonly Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>> _connectionFactory;
// Lazily created on first use; null until then.
private NatsConnection? _connection;
private NatsJSContext? _jsContext;
private INatsJSConsumer? _consumer;
// Set by DisposeAsync so repeated disposal is a no-op.
private bool _disposed;
/// <summary>
/// Creates a NATS JetStream backed scanner queue. No connection is opened here;
/// it is established lazily on first use.
/// </summary>
/// <param name="queueOptions">Transport-independent queue settings.</param>
/// <param name="options">NATS-specific settings; <c>Url</c> must be non-blank.</param>
/// <param name="logger">Logger for queue diagnostics.</param>
/// <param name="timeProvider">Clock source; falls back to <see cref="TimeProvider.System"/> when null.</param>
/// <param name="connectionFactory">Optional factory for the NATS connection; defaults to <c>new NatsConnection(opts)</c>.</param>
/// <exception cref="ArgumentNullException">A required argument is null.</exception>
/// <exception cref="InvalidOperationException">The NATS URL is missing or blank.</exception>
public NatsScanQueue(
    ScannerQueueOptions queueOptions,
    NatsQueueOptions options,
    ILogger<NatsScanQueue> logger,
    TimeProvider timeProvider,
    Func<NatsOpts, CancellationToken, ValueTask<NatsConnection>>? connectionFactory = null)
{
    ArgumentNullException.ThrowIfNull(queueOptions);
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(logger);

    if (string.IsNullOrWhiteSpace(options.Url))
    {
        throw new InvalidOperationException("NATS connection URL must be configured for the scanner queue.");
    }

    _queueOptions = queueOptions;
    _options = options;
    _logger = logger;
    _timeProvider = timeProvider ?? TimeProvider.System;
    _connectionFactory = connectionFactory
        ?? ((natsOpts, _) => new ValueTask<NatsConnection>(new NatsConnection(natsOpts)));
}
/// <summary>
/// Publishes <paramref name="message"/> to the configured JetStream subject, using the
/// idempotency key (or the job id when none is set) as the JetStream message id so the
/// server can de-duplicate repeated enqueues.
/// </summary>
/// <param name="message">The job to enqueue.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>
/// The stream sequence as the message id, flagged as a duplicate when the server
/// reported the publish as one.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="NatsJSApiException">The server rejected the publish.</exception>
public async ValueTask<QueueEnqueueResult> EnqueueAsync(
    ScanQueueMessage message,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(message);

    var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
    await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

    var idempotencyKey = message.IdempotencyKey ?? message.JobId;
    var headers = BuildHeaders(message, idempotencyKey);
    var publishOpts = new NatsJSPubOpts
    {
        MsgId = idempotencyKey,
        RetryAttempts = 0
    };

    var ack = await js.PublishAsync(
            _options.Subject,
            message.Payload.ToArray(),
            PayloadSerializer,
            publishOpts,
            headers,
            cancellationToken)
        .ConfigureAwait(false);

    if (ack.Duplicate)
    {
        _logger.LogDebug(
            "Duplicate NATS enqueue detected for job {JobId} (token {Token}).",
            message.JobId,
            idempotencyKey);
        QueueMetrics.RecordDeduplicated(TransportName);
        return new QueueEnqueueResult(ack.Seq.ToString(), true);
    }

    // PublishAsync does not throw on a server-side rejection; the error is carried in the
    // ack. Surface it instead of reporting a job as enqueued when it was not stored.
    // (Called after the duplicate branch because EnsureSuccess also throws on duplicates.)
    ack.EnsureSuccess();

    QueueMetrics.RecordEnqueued(TransportName);
    _logger.LogDebug(
        "Enqueued job {JobId} into NATS stream {Stream} with sequence {Sequence}.",
        message.JobId,
        ack.Stream,
        ack.Seq);
    return new QueueEnqueueResult(ack.Seq.ToString(), false);
}
/// <summary>
/// Fetches up to <c>request.BatchSize</c> messages from the durable consumer and wraps
/// each one in a lease for <c>request.Consumer</c>.
/// </summary>
/// <param name="request">Batch size, lease duration and consumer name.</param>
/// <param name="cancellationToken">Token used to cancel the fetch.</param>
/// <returns>The leases created from the fetched messages; may be empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
public async ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(
    QueueLeaseRequest request,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(request);

    var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
    var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

    var fetchOpts = new NatsJSFetchOpts
    {
        MaxMsgs = request.BatchSize,
        Expires = request.LeaseDuration,
        IdleHeartbeat = _options.IdleHeartbeat
    };

    var now = _timeProvider.GetUtcNow();
    var leases = new List<IScanQueueLease>(capacity: request.BatchSize);

    await foreach (var msg in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
    {
        var lease = CreateLease(msg, request.Consumer, now, request.LeaseDuration);
        if (lease is null)
        {
            // A message without the required envelope headers can never be processed.
            // Terminate it so it does not stay pending and get redelivered; this mirrors
            // the Redis transport, which acknowledges and deletes such entries.
            _logger.LogWarning(
                "NATS message {Sequence} is missing required metadata; terminating to avoid poison message.",
                msg.Metadata?.Sequence.Stream);
            await msg.AckTerminateAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
            continue;
        }

        leases.Add(lease);
    }

    return leases;
}
/// <summary>
/// Fetches messages from the durable consumer and leases those that have been delivered
/// before (delivery count greater than one) to <c>options.ClaimantConsumer</c>.
/// Messages on their first delivery are NAK'd with zero delay so another fetch can pick
/// them up.
/// </summary>
/// <param name="options">Batch size, minimum idle time and claimant consumer name.</param>
/// <param name="cancellationToken">Token used to cancel the fetch.</param>
/// <returns>The leases created for redelivered messages; may be empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
public async ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(
    QueueClaimOptions options,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(options);

    var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
    var consumer = await EnsureStreamAndConsumerAsync(js, cancellationToken).ConfigureAwait(false);

    var fetchOpts = new NatsJSFetchOpts
    {
        MaxMsgs = options.BatchSize,
        Expires = options.MinIdleTime,
        IdleHeartbeat = _options.IdleHeartbeat
    };

    var now = _timeProvider.GetUtcNow();
    var leases = new List<IScanQueueLease>(options.BatchSize);

    await foreach (var msg in consumer.FetchAsync(PayloadSerializer, fetchOpts, cancellationToken).ConfigureAwait(false))
    {
        var deliveries = (int)(msg.Metadata?.NumDelivered ?? 1);
        if (deliveries <= 1)
        {
            // Fresh message; surface back to queue and continue.
            // NOTE(review): the NAK presumably still counts as a delivery, so the message
            // would look "redelivered" on its next fetch - confirm this is intended.
            await msg.NakAsync(new AckOpts(), TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
            continue;
        }

        var lease = CreateLease(msg, options.ClaimantConsumer, now, _queueOptions.DefaultLeaseDuration);
        if (lease is null)
        {
            // Unmappable message: terminate it rather than leaving it pending, matching
            // how the Redis transport discards entries it cannot map.
            _logger.LogWarning(
                "NATS message {Sequence} is missing required metadata; terminating to unblock queue.",
                msg.Metadata?.Sequence.Stream);
            await msg.AckTerminateAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
            continue;
        }

        leases.Add(lease);
    }

    return leases;
}
/// <summary>
/// Disposes the NATS connection (if one was created) and the connection gate.
/// Calls after the first are no-ops.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (!_disposed)
    {
        _disposed = true;

        var openConnection = _connection;
        if (openConnection is not null)
        {
            await openConnection.DisposeAsync().ConfigureAwait(false);
        }

        _connectionGate.Dispose();
        GC.SuppressFinalize(this);
    }
}
/// <summary>
/// Acknowledges the leased message and records an ack metric. Does nothing when the
/// lease has already been settled.
/// </summary>
/// <param name="lease">The lease to acknowledge.</param>
/// <param name="cancellationToken">Token used to cancel the ack.</param>
internal async Task AcknowledgeAsync(
    NatsScanQueueLease lease,
    CancellationToken cancellationToken)
{
    var firstCompletion = lease.TryBeginCompletion();
    if (firstCompletion)
    {
        await lease.Message.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
        QueueMetrics.RecordAck(TransportName);

        _logger.LogDebug(
            "Acknowledged job {JobId} (seq {Seq}).",
            lease.JobId,
            lease.MessageId);
    }
}
/// <summary>
/// Sends an in-progress ack for the leased message and moves the lease's local expiry
/// to now plus <paramref name="leaseDuration"/>.
/// </summary>
/// <param name="lease">The lease to renew.</param>
/// <param name="leaseDuration">Duration added to the current time to form the new expiry.</param>
/// <param name="cancellationToken">Token used to cancel the ack.</param>
internal async Task RenewLeaseAsync(
    NatsScanQueueLease lease,
    TimeSpan leaseDuration,
    CancellationToken cancellationToken)
{
    await lease.Message.AckProgressAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);

    var renewedUntil = _timeProvider.GetUtcNow() + leaseDuration;
    lease.RefreshLease(renewedUntil);

    _logger.LogDebug(
        "Renewed NATS lease for job {JobId} until {Expires:u}.",
        lease.JobId,
        renewedUntil);
}
/// <summary>
/// Releases a lease. A retry is NAK'd with a backoff delay unless the attempt count has
/// reached the configured maximum, in which case the job is dead-lettered. Any other
/// disposition terminates the message.
/// </summary>
/// <param name="lease">The lease being released.</param>
/// <param name="disposition">Whether the job should be retried or abandoned.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
internal async Task ReleaseAsync(
    NatsScanQueueLease lease,
    QueueReleaseDisposition disposition,
    CancellationToken cancellationToken)
{
    var retryRequested = disposition == QueueReleaseDisposition.Retry;

    if (retryRequested && lease.Attempt >= _queueOptions.MaxDeliveryAttempts)
    {
        _logger.LogWarning(
            "Job {JobId} reached max delivery attempts ({Attempts}); shipping to dead-letter stream.",
            lease.JobId,
            lease.Attempt);

        // DeadLetterAsync performs its own completion check.
        await DeadLetterAsync(
            lease,
            $"max-delivery-attempts:{lease.Attempt}",
            cancellationToken).ConfigureAwait(false);
        return;
    }

    if (!lease.TryBeginCompletion())
    {
        return;
    }

    if (!retryRequested)
    {
        await lease.Message.AckTerminateAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);
        QueueMetrics.RecordAck(TransportName);
        _logger.LogInformation(
            "Abandoned job {JobId} after {Attempt} attempt(s).",
            lease.JobId,
            lease.Attempt);
        return;
    }

    QueueMetrics.RecordRetry(TransportName);
    var backoff = CalculateBackoff(lease.Attempt);
    await lease.Message.NakAsync(new AckOpts(), backoff, cancellationToken).ConfigureAwait(false);
    _logger.LogWarning(
        "Rescheduled job {JobId} via NATS NAK with delay {Delay} (attempt {Attempt}).",
        lease.JobId,
        backoff,
        lease.Attempt);
}
/// <summary>
/// Copies the leased job to the dead-letter subject and then acknowledges the original
/// message. Does nothing when the lease has already been settled.
/// </summary>
/// <remarks>
/// The dead-letter copy is published and confirmed before the original is acknowledged.
/// If the publish fails, the original stays un-acked and can be redelivered instead of
/// being lost.
/// </remarks>
/// <param name="lease">The lease whose job is dead-lettered.</param>
/// <param name="reason">Reason stored in the <c>deadletter-reason</c> header.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <exception cref="NatsJSApiException">The server rejected the dead-letter publish.</exception>
internal async Task DeadLetterAsync(
    NatsScanQueueLease lease,
    string reason,
    CancellationToken cancellationToken)
{
    if (!lease.TryBeginCompletion())
    {
        return;
    }

    var js = await GetJetStreamAsync(cancellationToken).ConfigureAwait(false);
    await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);

    var headers = BuildDeadLetterHeaders(lease, reason);
    var deadLetterAck = await js.PublishAsync(
            _options.DeadLetterSubject,
            lease.Payload.ToArray(),
            PayloadSerializer,
            new NatsJSPubOpts(),
            headers,
            cancellationToken)
        .ConfigureAwait(false);

    // The publish reports server-side failures through the ack rather than by throwing.
    deadLetterAck.EnsureSuccess();

    // Only now is it safe to remove the job from the work stream.
    await lease.Message.AckAsync(new AckOpts(), cancellationToken).ConfigureAwait(false);

    QueueMetrics.RecordDeadLetter(TransportName);
    _logger.LogError(
        "Dead-lettered job {JobId} (attempt {Attempt}): {Reason}",
        lease.JobId,
        lease.Attempt,
        reason);
}
/// <summary>
/// Returns the cached JetStream context, creating it under the connection gate on first use.
/// </summary>
/// <param name="cancellationToken">Token used to cancel connection setup or the gate wait.</param>
private async Task<NatsJSContext> GetJetStreamAsync(CancellationToken cancellationToken)
{
    var cached = _jsContext;
    if (cached is not null)
    {
        return cached;
    }

    // Resolve the connection before taking the gate; EnsureConnectionAsync takes the same gate.
    var natsConnection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);

    await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        if (_jsContext is null)
        {
            _jsContext = new NatsJSContext(natsConnection);
        }

        return _jsContext;
    }
    finally
    {
        _connectionGate.Release();
    }
}
/// <summary>
/// Returns the cached durable consumer. On first use, under the connection gate, it
/// ensures the work and dead-letter streams exist and then creates the consumer,
/// falling back to fetching the existing one when creation fails with an API error.
/// </summary>
/// <param name="js">JetStream context used for stream and consumer management.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
private async ValueTask<INatsJSConsumer> EnsureStreamAndConsumerAsync(
    NatsJSContext js,
    CancellationToken cancellationToken)
{
    if (_consumer is { } cachedConsumer)
    {
        return cachedConsumer;
    }

    await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Re-check under the gate: another caller may have finished initialization.
        if (_consumer is null)
        {
            await EnsureStreamAsync(js, cancellationToken).ConfigureAwait(false);
            await EnsureDeadLetterStreamAsync(js, cancellationToken).ConfigureAwait(false);
            _consumer = await CreateOrFetchAsync().ConfigureAwait(false);
        }

        return _consumer;
    }
    finally
    {
        _connectionGate.Release();
    }

    // Tries to create the durable consumer; on an API error, looks up the existing one.
    async Task<INatsJSConsumer> CreateOrFetchAsync()
    {
        var durableConfig = new ConsumerConfig
        {
            DurableName = _options.DurableConsumer,
            AckPolicy = ConsumerConfigAckPolicy.Explicit,
            ReplayPolicy = ConsumerConfigReplayPolicy.Instant,
            DeliverPolicy = ConsumerConfigDeliverPolicy.All,
            AckWait = ToNanoseconds(_options.AckWait),
            MaxAckPending = _options.MaxInFlight,
            MaxDeliver = Math.Max(1, _queueOptions.MaxDeliveryAttempts),
            FilterSubjects = new[] { _options.Subject }
        };

        try
        {
            return await js.CreateConsumerAsync(
                    _options.Stream,
                    durableConfig,
                    cancellationToken)
                .ConfigureAwait(false);
        }
        catch (NatsJSApiException apiEx)
        {
            _logger.LogDebug(apiEx,
                "CreateConsumerAsync failed with code {Code}; attempting to fetch existing durable consumer {Durable}.",
                apiEx.Error?.Code,
                _options.DurableConsumer);

            return await js.GetConsumerAsync(
                    _options.Stream,
                    _options.DurableConsumer,
                    cancellationToken)
                .ConfigureAwait(false);
        }
    }
}
/// <summary>
/// Returns the cached NATS connection, creating and connecting it under the connection
/// gate on first use.
/// </summary>
/// <param name="cancellationToken">Token used to cancel the gate wait and passed to the connection factory.</param>
private async Task<NatsConnection> EnsureConnectionAsync(CancellationToken cancellationToken)
{
    if (_connection is { } established)
    {
        return established;
    }

    await _connectionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Re-check under the gate: another caller may have connected in the meantime.
        if (_connection is null)
        {
            var natsOpts = new NatsOpts
            {
                Url = _options.Url!,
                Name = "stellaops-scanner-queue",
                CommandTimeout = TimeSpan.FromSeconds(10),
                RequestTimeout = TimeSpan.FromSeconds(20),
                PingInterval = TimeSpan.FromSeconds(30)
            };

            _connection = await _connectionFactory(natsOpts, cancellationToken).ConfigureAwait(false);
            await _connection.ConnectAsync().ConfigureAwait(false);
        }

        return _connection;
    }
    finally
    {
        _connectionGate.Release();
    }
}
/// <summary>
/// Ensures the work stream exists, creating it (work-queue retention, file storage, no
/// limits) when the lookup reports that it is missing.
/// </summary>
/// <remarks>
/// Only a "not found" (404) API error triggers creation. Other API errors propagate to
/// the caller instead of being masked by a create attempt.
/// </remarks>
/// <param name="js">JetStream context used for stream management.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
private async Task EnsureStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
{
    try
    {
        await js.GetStreamAsync(
                _options.Stream,
                new StreamInfoRequest(),
                cancellationToken)
            .ConfigureAwait(false);
    }
    catch (NatsJSApiException apiEx) when (apiEx.Error?.Code == 404)
    {
        var config = new StreamConfig(
            name: _options.Stream,
            subjects: new[] { _options.Subject })
        {
            Retention = StreamConfigRetention.Workqueue,
            Storage = StreamConfigStorage.File,
            MaxConsumers = -1,
            MaxMsgs = -1,
            MaxBytes = -1,
            MaxAge = 0
        };

        await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
        _logger.LogInformation("Created NATS JetStream stream {Stream} ({Subject}).", _options.Stream, _options.Subject);
    }
}
/// <summary>
/// Ensures the dead-letter stream exists, creating it with the configured dead-letter
/// retention as its maximum age when the lookup reports that it is missing.
/// </summary>
/// <remarks>
/// Only a "not found" (404) API error triggers creation. Other API errors propagate to
/// the caller instead of being masked by a create attempt.
/// </remarks>
/// <param name="js">JetStream context used for stream management.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
private async Task EnsureDeadLetterStreamAsync(NatsJSContext js, CancellationToken cancellationToken)
{
    try
    {
        await js.GetStreamAsync(
                _options.DeadLetterStream,
                new StreamInfoRequest(),
                cancellationToken)
            .ConfigureAwait(false);
    }
    catch (NatsJSApiException apiEx) when (apiEx.Error?.Code == 404)
    {
        var config = new StreamConfig(
            name: _options.DeadLetterStream,
            subjects: new[] { _options.DeadLetterSubject })
        {
            Retention = StreamConfigRetention.Workqueue,
            Storage = StreamConfigStorage.File,
            MaxConsumers = -1,
            MaxMsgs = -1,
            MaxBytes = -1,
            MaxAge = ToNanoseconds(_queueOptions.DeadLetter.Retention)
        };

        await js.CreateStreamAsync(config, cancellationToken).ConfigureAwait(false);
        _logger.LogInformation("Created NATS dead-letter stream {Stream} ({Subject}).", _options.DeadLetterStream, _options.DeadLetterSubject);
    }
}
/// <summary>Establishes the connection if needed and pings the NATS server.</summary>
/// <param name="cancellationToken">Token used to cancel the ping.</param>
internal async ValueTask PingAsync(CancellationToken cancellationToken)
{
    var nats = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);

    // The ping result is not needed; only completion matters.
    _ = await nats.PingAsync(cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Builds a lease from a fetched JetStream message, or returns null when the message has
/// no headers or no job id header.
/// </summary>
/// <param name="message">The fetched message.</param>
/// <param name="consumer">Name of the consumer receiving the lease.</param>
/// <param name="now">Reference time used for the lease expiry and as the enqueue-time fallback.</param>
/// <param name="leaseDuration">Duration added to <paramref name="now"/> to form the expiry.</param>
private NatsScanQueueLease? CreateLease(
    NatsJSMsg<byte[]> message,
    string consumer,
    DateTimeOffset now,
    TimeSpan leaseDuration)
{
    var headers = message.Headers;
    if (headers is null
        || !headers.TryGetValue(QueueEnvelopeFields.JobId, out var jobIdValues)
        || jobIdValues.Count == 0)
    {
        return null;
    }

    var jobId = jobIdValues[0]!;

    string? idempotencyKey = null;
    if (headers.TryGetValue(QueueEnvelopeFields.IdempotencyKey, out var idemValues) && idemValues.Count > 0)
    {
        idempotencyKey = idemValues[0];
    }

    // Enqueue time travels as Unix milliseconds; fall back to "now" when absent or unparsable.
    var enqueuedAt = now;
    if (headers.TryGetValue(QueueEnvelopeFields.EnqueuedAt, out var enqueuedValues)
        && enqueuedValues.Count > 0
        && long.TryParse(enqueuedValues[0], out var unixMilliseconds))
    {
        enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(unixMilliseconds);
    }

    var attempt = 1;
    if (headers.TryGetValue(QueueEnvelopeFields.Attempt, out var attemptValues)
        && attemptValues.Count > 0
        && int.TryParse(attemptValues[0], out var headerAttempt))
    {
        attempt = headerAttempt;
    }

    // Use the server's delivery count when it exceeds the header value (clamped to int).
    if (message.Metadata?.NumDelivered is ulong delivered && delivered > 0)
    {
        var deliveredClamped = (int)Math.Min(delivered, (ulong)int.MaxValue);
        attempt = Math.Max(attempt, deliveredClamped);
    }

    var messageId = message.Metadata?.Sequence.Stream.ToString() ?? Guid.NewGuid().ToString("n");

    return new NatsScanQueueLease(
        this,
        message,
        messageId,
        jobId,
        message.Data ?? Array.Empty<byte>(),
        attempt,
        enqueuedAt,
        now.Add(leaseDuration),
        consumer,
        idempotencyKey,
        ExtractAttributes(headers));
}
/// <summary>
/// Collects every header whose name starts with the attribute prefix into a read-only
/// dictionary keyed by the name with the prefix removed. Only the first value of each
/// header is kept.
/// </summary>
/// <param name="headers">Headers of the fetched message.</param>
/// <returns>The extracted attributes, or a shared empty dictionary when there are none.</returns>
private static IReadOnlyDictionary<string, string> ExtractAttributes(NatsHeaders headers)
{
    var prefixLength = QueueEnvelopeFields.AttributePrefix.Length;
    var collected = new Dictionary<string, string>(StringComparer.Ordinal);

    foreach (var header in headers)
    {
        var isAttribute = header.Key.StartsWith(QueueEnvelopeFields.AttributePrefix, StringComparison.Ordinal);
        if (isAttribute && header.Value.Count > 0)
        {
            collected[header.Key[prefixLength..]] = header.Value[0]!;
        }
    }

    if (collected.Count == 0)
    {
        return EmptyReadOnlyDictionary<string, string>.Instance;
    }

    return new ReadOnlyDictionary<string, string>(collected);
}
/// <summary>
/// Builds the envelope headers for a newly enqueued message: job id, idempotency key,
/// attempt "1", enqueue time in Unix milliseconds, the optional trace id, and one
/// prefixed header per attribute.
/// </summary>
/// <param name="message">The message being enqueued.</param>
/// <param name="idempotencyKey">The idempotency token written to the headers.</param>
private NatsHeaders BuildHeaders(ScanQueueMessage message, string idempotencyKey)
{
    var envelope = new NatsHeaders();
    envelope.Add(QueueEnvelopeFields.JobId, message.JobId);
    envelope.Add(QueueEnvelopeFields.IdempotencyKey, idempotencyKey);
    envelope.Add(QueueEnvelopeFields.Attempt, "1");
    envelope.Add(QueueEnvelopeFields.EnqueuedAt, _timeProvider.GetUtcNow().ToUnixTimeMilliseconds().ToString());

    if (!string.IsNullOrEmpty(message.TraceId))
    {
        envelope.Add(QueueEnvelopeFields.TraceId, message.TraceId!);
    }

    if (message.Attributes is { } attributes)
    {
        foreach (var attribute in attributes)
        {
            envelope.Add(QueueEnvelopeFields.AttributePrefix + attribute.Key, attribute.Value);
        }
    }

    return envelope;
}
/// <summary>
/// Builds the headers for a dead-letter copy: the lease's envelope fields, a
/// <c>deadletter-reason</c> header, and one prefixed header per attribute.
/// </summary>
/// <param name="lease">The lease being dead-lettered.</param>
/// <param name="reason">Reason stored in the <c>deadletter-reason</c> header.</param>
private NatsHeaders BuildDeadLetterHeaders(NatsScanQueueLease lease, string reason)
{
    var envelope = new NatsHeaders();
    envelope.Add(QueueEnvelopeFields.JobId, lease.JobId);
    // Fall back to the job id when the lease carries no idempotency key.
    envelope.Add(QueueEnvelopeFields.IdempotencyKey, lease.IdempotencyKey ?? lease.JobId);
    envelope.Add(QueueEnvelopeFields.Attempt, lease.Attempt.ToString());
    envelope.Add(QueueEnvelopeFields.EnqueuedAt, lease.EnqueuedAt.ToUnixTimeMilliseconds().ToString());
    envelope.Add("deadletter-reason", reason);

    foreach (var attribute in lease.Attributes)
    {
        envelope.Add(QueueEnvelopeFields.AttributePrefix + attribute.Key, attribute.Value);
    }

    return envelope;
}
/// <summary>
/// Computes the retry delay for <paramref name="attempt"/>. The base delay is the NATS
/// retry delay when positive, otherwise the queue's initial backoff. From attempt 2
/// onward the delay is <c>base * 2^(attempt - 2)</c>, so attempts 1 and 2 both yield the
/// base delay. The result is capped at the maximum backoff and never falls below the base.
/// </summary>
/// <param name="attempt">Delivery attempt of the lease being retried.</param>
/// <returns>The delay, or <see cref="TimeSpan.Zero"/> when no positive base delay is configured.</returns>
private TimeSpan CalculateBackoff(int attempt)
{
    var baseDelay = _options.RetryDelay;
    if (baseDelay <= TimeSpan.Zero)
    {
        baseDelay = _queueOptions.RetryInitialBackoff;
    }

    if (baseDelay <= TimeSpan.Zero)
    {
        return TimeSpan.Zero;
    }

    if (attempt <= 1)
    {
        return baseDelay;
    }

    // A non-positive maximum means "no growth": the ceiling is the base delay itself.
    var ceiling = _queueOptions.RetryMaxBackoff;
    if (ceiling <= TimeSpan.Zero)
    {
        ceiling = baseDelay;
    }

    double grownTicks = baseDelay.Ticks * Math.Pow(2, attempt - 2);
    double boundedTicks = Math.Min((double)ceiling.Ticks, grownTicks);
    long finalTicks = Math.Max(baseDelay.Ticks, (long)boundedTicks);
    return TimeSpan.FromTicks(finalTicks);
}
/// <summary>
/// Converts a <see cref="TimeSpan"/> to whole nanoseconds (one tick is 100 ns).
/// Non-positive spans map to 0. Spans too large to represent saturate at
/// <see cref="long.MaxValue"/> instead of silently wrapping to a garbage value.
/// </summary>
/// <param name="timeSpan">The span to convert.</param>
/// <returns>The span in nanoseconds, clamped to the range [0, <see cref="long.MaxValue"/>].</returns>
private static long ToNanoseconds(TimeSpan timeSpan)
{
    const long NanosecondsPerTick = 100L;

    if (timeSpan <= TimeSpan.Zero)
    {
        return 0;
    }

    // Guard the multiplication: e.g. TimeSpan.MaxValue would overflow a long.
    return timeSpan.Ticks > long.MaxValue / NanosecondsPerTick
        ? long.MaxValue
        : timeSpan.Ticks * NanosecondsPerTick;
}
/// <summary>
/// Holds one shared, empty read-only dictionary per key/value type pair, so callers
/// with nothing to return do not allocate.
/// </summary>
private static class EmptyReadOnlyDictionary<TKey, TValue>
    where TKey : notnull
{
    /// <summary>The shared empty instance.</summary>
    public static readonly IReadOnlyDictionary<TKey, TValue> Instance = CreateEmpty();

    private static ReadOnlyDictionary<TKey, TValue> CreateEmpty()
    {
        var backing = new Dictionary<TKey, TValue>(0, EqualityComparer<TKey>.Default);
        return new ReadOnlyDictionary<TKey, TValue>(backing);
    }
}
}

View File

@@ -0,0 +1,78 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NATS.Client.JetStream;
namespace StellaOps.Scanner.Queue.Nats;
/// <summary>
/// Lease over a single JetStream message. Settlement operations are forwarded to the
/// owning <see cref="NatsScanQueue"/>; a one-shot completion flag ensures at most one
/// settlement proceeds.
/// </summary>
internal sealed class NatsScanQueueLease : IScanQueueLease
{
    private readonly NatsScanQueue _owner;

    // 0 = still open, 1 = a settlement has started.
    private int _settled;

    internal NatsScanQueueLease(
        NatsScanQueue queue,
        NatsJSMsg<byte[]> message,
        string messageId,
        string jobId,
        byte[] payload,
        int attempt,
        DateTimeOffset enqueuedAt,
        DateTimeOffset leaseExpiresAt,
        string consumer,
        string? idempotencyKey,
        IReadOnlyDictionary<string, string> attributes)
    {
        _owner = queue;
        Message = message;
        MessageId = messageId;
        JobId = jobId;
        Payload = payload;
        Attempt = attempt;
        EnqueuedAt = enqueuedAt;
        LeaseExpiresAt = leaseExpiresAt;
        Consumer = consumer;
        IdempotencyKey = idempotencyKey;
        Attributes = attributes;
    }

    /// <inheritdoc />
    public string MessageId { get; }

    /// <inheritdoc />
    public string JobId { get; }

    /// <inheritdoc />
    public ReadOnlyMemory<byte> Payload { get; }

    /// <inheritdoc />
    public int Attempt { get; internal set; }

    /// <inheritdoc />
    public DateTimeOffset EnqueuedAt { get; }

    /// <inheritdoc />
    public DateTimeOffset LeaseExpiresAt { get; private set; }

    /// <inheritdoc />
    public string Consumer { get; }

    /// <inheritdoc />
    public string? IdempotencyKey { get; }

    /// <inheritdoc />
    public IReadOnlyDictionary<string, string> Attributes { get; }

    /// <summary>The underlying JetStream message, used by the queue to ack or nak.</summary>
    internal NatsJSMsg<byte[]> Message { get; }

    /// <inheritdoc />
    public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
    {
        return _owner.AcknowledgeAsync(this, cancellationToken);
    }

    /// <inheritdoc />
    public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
    {
        return _owner.RenewLeaseAsync(this, leaseDuration, cancellationToken);
    }

    /// <inheritdoc />
    public Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
    {
        return _owner.ReleaseAsync(this, disposition, cancellationToken);
    }

    /// <inheritdoc />
    public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
    {
        return _owner.DeadLetterAsync(this, reason, cancellationToken);
    }

    /// <summary>
    /// Atomically flips the completion flag. Returns true only for the first caller.
    /// </summary>
    internal bool TryBeginCompletion()
    {
        return Interlocked.Exchange(ref _settled, 1) == 0;
    }

    /// <summary>Records a new local expiry after a successful renewal.</summary>
    internal void RefreshLease(DateTimeOffset expiresAt)
    {
        LeaseExpiresAt = expiresAt;
    }
}

View File

@@ -0,0 +1,12 @@
namespace StellaOps.Scanner.Queue;
/// <summary>
/// Names of the envelope fields the queue transports attach to each message.
/// </summary>
internal static class QueueEnvelopeFields
{
public const string Payload = "payload";
public const string JobId = "jobId";
public const string IdempotencyKey = "idempotency";
public const string Attempt = "attempt";
// Written as Unix milliseconds by the NATS transport.
public const string EnqueuedAt = "enqueuedAt";
public const string TraceId = "traceId";
// Prefix for per-message attribute fields; the attribute name follows the prefix.
// NOTE(review): the NATS transport uses this prefix inside header names, and a colon
// may not be a legal character in a NATS header name - confirm against the client.
public const string AttributePrefix = "attr:";
}

View File

@@ -0,0 +1,28 @@
using System.Diagnostics.Metrics;
namespace StellaOps.Scanner.Queue;
/// <summary>
/// Counters for scanner queue activity, published on the <c>StellaOps.Scanner.Queue</c>
/// meter. Every measurement carries a <c>transport</c> tag naming the transport.
/// </summary>
internal static class QueueMetrics
{
    private const string TransportTagName = "transport";

    private static readonly Meter Meter = new("StellaOps.Scanner.Queue");
    private static readonly Counter<long> EnqueuedCounter = Meter.CreateCounter<long>("scanner_queue_enqueued_total");
    private static readonly Counter<long> DeduplicatedCounter = Meter.CreateCounter<long>("scanner_queue_deduplicated_total");
    private static readonly Counter<long> AckCounter = Meter.CreateCounter<long>("scanner_queue_ack_total");
    private static readonly Counter<long> RetryCounter = Meter.CreateCounter<long>("scanner_queue_retry_total");
    private static readonly Counter<long> DeadLetterCounter = Meter.CreateCounter<long>("scanner_queue_deadletter_total");

    /// <summary>Counts one successful enqueue for <paramref name="transport"/>.</summary>
    public static void RecordEnqueued(string transport) => EnqueuedCounter.Add(1, BuildTag(transport));

    /// <summary>Counts one enqueue that was recognized as a duplicate.</summary>
    public static void RecordDeduplicated(string transport) => DeduplicatedCounter.Add(1, BuildTag(transport));

    /// <summary>Counts one acknowledged message.</summary>
    public static void RecordAck(string transport) => AckCounter.Add(1, BuildTag(transport));

    /// <summary>Counts one message scheduled for retry.</summary>
    public static void RecordRetry(string transport) => RetryCounter.Add(1, BuildTag(transport));

    /// <summary>Counts one message moved to dead-letter handling.</summary>
    public static void RecordDeadLetter(string transport) => DeadLetterCounter.Add(1, BuildTag(transport));

    // The single-tag Counter<T>.Add overload takes the pair by value, so recording a
    // metric no longer allocates a one-element array on every call.
    private static KeyValuePair<string, object?> BuildTag(string transport)
        => new(TransportTagName, transport);
}

View File

@@ -0,0 +1,7 @@
namespace StellaOps.Scanner.Queue;
/// <summary>Identifies which transport backs the scanner queue.</summary>
public enum QueueTransportKind
{
/// <summary>Redis-backed queue.</summary>
Redis,
/// <summary>NATS-backed queue.</summary>
Nats
}

View File

@@ -0,0 +1,764 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace StellaOps.Scanner.Queue.Redis;
internal sealed class RedisScanQueue : IScanQueue, IAsyncDisposable
{
// Tag value passed to QueueMetrics for every metric recorded by this transport.
private const string TransportName = "redis";
private readonly ScannerQueueOptions _queueOptions;
private readonly RedisQueueOptions _options;
private readonly ILogger<RedisScanQueue> _logger;
private readonly TimeProvider _timeProvider;
// Presumably guards lazy creation of the connection - usage is outside this view; confirm.
private readonly SemaphoreSlim _connectionLock = new(1, 1);
// Presumably guards one-time consumer group creation - usage is outside this view; confirm.
private readonly SemaphoreSlim _groupInitLock = new(1, 1);
private readonly Func<ConfigurationOptions, Task<IConnectionMultiplexer>> _connectionFactory;
// Null until a connection has been created.
private IConnectionMultiplexer? _connection;
private volatile bool _groupInitialized;
// Set by DisposeAsync so repeated disposal is a no-op.
private bool _disposed;
/// <summary>Prepends the configured idempotency key prefix to <paramref name="key"/>.</summary>
private string BuildIdempotencyKey(string key)
{
    return _options.IdempotencyKeyPrefix + key;
}
/// <summary>
/// Creates a Redis Streams backed scanner queue. No connection is opened here; the
/// connection factory is invoked later, when a connection is first needed.
/// </summary>
/// <param name="queueOptions">Transport-independent queue settings.</param>
/// <param name="options">Redis-specific settings; <c>ConnectionString</c> must be non-blank.</param>
/// <param name="logger">Logger for queue diagnostics.</param>
/// <param name="timeProvider">Clock source; falls back to <see cref="TimeProvider.System"/> when null.</param>
/// <param name="connectionFactory">
/// Optional factory for the multiplexer; defaults to <c>ConnectionMultiplexer.ConnectAsync</c>.
/// </param>
/// <exception cref="ArgumentNullException">A required argument is null.</exception>
/// <exception cref="InvalidOperationException">The connection string is missing or blank.</exception>
public RedisScanQueue(
    ScannerQueueOptions queueOptions,
    RedisQueueOptions options,
    ILogger<RedisScanQueue> logger,
    TimeProvider timeProvider,
    Func<ConfigurationOptions, Task<IConnectionMultiplexer>>? connectionFactory = null)
{
    _queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _timeProvider = timeProvider ?? TimeProvider.System;

    // Connect asynchronously: the synchronous Connect would block the calling thread
    // for the whole handshake even though the factory is awaited by its caller.
    _connectionFactory = connectionFactory
        ?? (async config => await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false));

    if (string.IsNullOrWhiteSpace(_options.ConnectionString))
    {
        throw new InvalidOperationException("Redis connection string must be configured for the scanner queue.");
    }
}
/// <summary>
/// Appends <paramref name="message"/> to the stream, then records its idempotency token
/// with a set-if-absent write. When the token already exists, the freshly appended entry
/// is deleted and the previously stored stream id is returned as a duplicate.
/// </summary>
/// <remarks>
/// NOTE(review): the entry is in the stream before the idempotency check runs, so a
/// consumer could presumably lease a duplicate in the window before it is deleted -
/// confirm whether this is acceptable.
/// </remarks>
/// <param name="message">The job to enqueue.</param>
/// <param name="cancellationToken">Checked on entry and passed to connection and group setup.</param>
/// <returns>The stream id of the stored entry, or of the original entry for a duplicate.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
public async ValueTask<QueueEnqueueResult> EnqueueAsync(
    ScanQueueMessage message,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(message);
    cancellationToken.ThrowIfCancellationRequested();

    var database = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
    var enqueuedAt = _timeProvider.GetUtcNow();
    await EnsureConsumerGroupAsync(database, cancellationToken).ConfigureAwait(false);

    // A new message always starts at attempt 1.
    var fields = BuildEntries(message, enqueuedAt, 1);
    var trimApproximately = _options.ApproximateMaxLength is not null;
    var messageId = await AddToStreamAsync(
            database,
            _options.StreamName,
            fields,
            _options.ApproximateMaxLength,
            trimApproximately)
        .ConfigureAwait(false);

    var idempotencyToken = message.IdempotencyKey ?? message.JobId;
    var idempotencyKey = BuildIdempotencyKey(idempotencyToken);

    var firstSeen = await database.StringSetAsync(
            key: idempotencyKey,
            value: messageId,
            when: When.NotExists,
            expiry: _options.IdempotencyWindow)
        .ConfigureAwait(false);

    if (firstSeen)
    {
        _logger.LogDebug(
            "Enqueued job {JobId} into stream {Stream} with id {StreamId}.",
            message.JobId,
            _options.StreamName,
            messageId.ToString());
        QueueMetrics.RecordEnqueued(TransportName);
        return new QueueEnqueueResult(messageId.ToString()!, false);
    }

    // Duplicate enqueue: remove the entry just added and report the cached id.
    await database.StreamDeleteAsync(
            _options.StreamName,
            new RedisValue[] { messageId })
        .ConfigureAwait(false);

    var existing = await database.StringGetAsync(idempotencyKey).ConfigureAwait(false);
    // The key may have expired between the two calls; fall back to the new id then.
    var duplicateId = existing.IsNullOrEmpty ? messageId : existing;

    _logger.LogDebug(
        "Duplicate queue enqueue detected for job {JobId} (token {Token}), returning existing stream id {StreamId}.",
        message.JobId,
        idempotencyToken,
        duplicateId.ToString());
    QueueMetrics.RecordDeduplicated(TransportName);
    return new QueueEnqueueResult(duplicateId.ToString()!, true);
}
/// <summary>
/// Reads up to <c>request.BatchSize</c> never-delivered entries (position <c>&gt;</c>) for
/// the consumer group and maps each to a lease. Entries that cannot be mapped are
/// acknowledged and deleted so they do not block the queue.
/// </summary>
/// <param name="request">Batch size, lease duration and consumer name.</param>
/// <param name="cancellationToken">Checked on entry and passed to connection and group setup.</param>
/// <returns>The leases created; may be empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
public async ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(
    QueueLeaseRequest request,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(request);
    cancellationToken.ThrowIfCancellationRequested();

    var database = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
    await EnsureConsumerGroupAsync(database, cancellationToken).ConfigureAwait(false);

    var batch = await database.StreamReadGroupAsync(
            _options.StreamName,
            _options.ConsumerGroup,
            request.Consumer,
            position: ">",
            count: request.BatchSize,
            flags: CommandFlags.None)
        .ConfigureAwait(false);

    if (batch is null || batch.Length == 0)
    {
        return Array.Empty<IScanQueueLease>();
    }

    var leasedAt = _timeProvider.GetUtcNow();
    var result = new List<IScanQueueLease>(batch.Length);

    foreach (var entry in batch)
    {
        var mapped = TryMapLease(
            entry,
            request.Consumer,
            leasedAt,
            request.LeaseDuration,
            default);

        if (mapped is not null)
        {
            result.Add(mapped);
            continue;
        }

        _logger.LogWarning(
            "Stream entry {StreamId} is missing required metadata; acknowledging to avoid poison message.",
            entry.Id.ToString());

        await database.StreamAcknowledgeAsync(
                _options.StreamName,
                _options.ConsumerGroup,
                new RedisValue[] { entry.Id })
            .ConfigureAwait(false);

        await database.StreamDeleteAsync(
                _options.StreamName,
                new RedisValue[] { entry.Id })
            .ConfigureAwait(false);
    }

    return result;
}
/// <summary>
/// Claims pending entries that have been idle for at least <c>options.MinIdleTime</c>
/// for <c>options.ClaimantConsumer</c> and maps them to leases. Entries that cannot be
/// mapped are acknowledged and deleted.
/// </summary>
/// <remarks>
/// The minimum idle time is also passed to XCLAIM. An entry that was renewed or taken
/// between the pending lookup and the claim is therefore skipped rather than stolen,
/// so fewer entries than were listed as pending may be returned.
/// </remarks>
/// <param name="options">Batch size, minimum idle time and claimant consumer name.</param>
/// <param name="cancellationToken">Checked on entry and passed to connection and group setup.</param>
/// <returns>The leases created for claimed entries; may be empty.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
public async ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(
    QueueClaimOptions options,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(options);
    cancellationToken.ThrowIfCancellationRequested();

    var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
    await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false);

    var minIdleMilliseconds = (long)options.MinIdleTime.TotalMilliseconds;

    // The fifth positional parameter of StreamPendingMessagesAsync is a minimum stream
    // ID, not an idle time, so the idle time is not passed here. Idleness is enforced
    // by the filter below and by XCLAIM.
    var pending = await db.StreamPendingMessagesAsync(
            _options.StreamName,
            _options.ConsumerGroup,
            options.BatchSize,
            RedisValue.Null)
        .ConfigureAwait(false);

    if (pending is null || pending.Length == 0)
    {
        return Array.Empty<IScanQueueLease>();
    }

    var eligible = pending
        .Where(p => p.IdleTimeInMilliseconds >= options.MinIdleTime.TotalMilliseconds)
        .ToArray();

    if (eligible.Length == 0)
    {
        return Array.Empty<IScanQueueLease>();
    }

    var messageIds = eligible
        .Select(static p => (RedisValue)p.MessageId)
        .ToArray();

    // Passing the real minimum idle time makes the claim atomic with the idleness check.
    var entries = await db.StreamClaimAsync(
            _options.StreamName,
            _options.ConsumerGroup,
            options.ClaimantConsumer,
            minIdleMilliseconds,
            messageIds,
            CommandFlags.None)
        .ConfigureAwait(false);

    if (entries is null || entries.Length == 0)
    {
        return Array.Empty<IScanQueueLease>();
    }

    var now = _timeProvider.GetUtcNow();
    var pendingById = Enumerable.ToDictionary<StreamPendingMessageInfo, string, StreamPendingMessageInfo>(
        eligible,
        static p => p.MessageId.IsNullOrEmpty ? string.Empty : p.MessageId.ToString(),
        static p => p,
        StringComparer.Ordinal);

    var leases = new List<IScanQueueLease>(entries.Length);
    foreach (var entry in entries)
    {
        var entryIdValue = entry.Id;
        var entryId = entryIdValue.IsNullOrEmpty ? string.Empty : entryIdValue.ToString();
        var hasPending = pendingById.TryGetValue(entryId, out var pendingInfo);

        // The attempt comes from the delivery count seen in the pending lookup (at least 1).
        var attempt = hasPending
            ? (int)Math.Max(1, pendingInfo.DeliveryCount)
            : 1;

        var lease = TryMapLease(
            entry,
            options.ClaimantConsumer,
            now,
            _queueOptions.DefaultLeaseDuration,
            attempt);

        if (lease is null)
        {
            _logger.LogWarning(
                "Unable to map claimed stream entry {StreamId}; acknowledging to unblock queue.",
                entry.Id.ToString());

            await db.StreamAcknowledgeAsync(
                    _options.StreamName,
                    _options.ConsumerGroup,
                    new RedisValue[] { entry.Id })
                .ConfigureAwait(false);

            await db.StreamDeleteAsync(
                    _options.StreamName,
                    new RedisValue[] { entry.Id })
                .ConfigureAwait(false);
            continue;
        }

        leases.Add(lease);
    }

    return leases;
}
/// <summary>
/// Closes and disposes the Redis connection (if one was created) and releases both
/// semaphores. Calls after the first are no-ops.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (_disposed)
    {
        return;
    }

    _disposed = true;

    if (_connection is not null)
    {
        // ConfigureAwait(false) for consistency with every other await in this library.
        await _connection.CloseAsync().ConfigureAwait(false);
        _connection.Dispose();
    }

    _connectionLock.Dispose();
    _groupInitLock.Dispose();
    GC.SuppressFinalize(this);
}
/// <summary>
/// Acknowledges the leased entry in the consumer group, deletes it from the stream, and
/// records an ack metric. Does nothing when the lease has already been settled.
/// </summary>
/// <param name="lease">The lease to acknowledge.</param>
/// <param name="cancellationToken">Passed to the database lookup.</param>
internal async Task AcknowledgeAsync(
    RedisScanQueueLease lease,
    CancellationToken cancellationToken)
{
    var firstCompletion = lease.TryBeginCompletion();
    if (!firstCompletion)
    {
        return;
    }

    var database = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
    var entryIds = new RedisValue[] { lease.MessageId };

    await database.StreamAcknowledgeAsync(
            _options.StreamName,
            _options.ConsumerGroup,
            entryIds)
        .ConfigureAwait(false);

    await database.StreamDeleteAsync(
            _options.StreamName,
            entryIds)
        .ConfigureAwait(false);

    _logger.LogDebug(
        "Acknowledged job {JobId} ({MessageId}) on consumer {Consumer}.",
        lease.JobId,
        lease.MessageId,
        lease.Consumer);

    QueueMetrics.RecordAck(TransportName);
}
/// <summary>
/// Renews a lease by re-claiming the entry for the consumer that already holds it, then
/// moves the lease's local expiry to now plus <paramref name="leaseDuration"/>.
/// </summary>
/// <remarks>
/// Uses the ids-only claim (XCLAIM ... JUSTID). A full claim would increment the
/// entry's delivery counter on every renewal, inflating the attempt number later derived
/// from it, and would also transfer the payload for no reason.
/// </remarks>
/// <param name="lease">The lease to renew.</param>
/// <param name="leaseDuration">Duration added to the current time to form the new expiry.</param>
/// <param name="cancellationToken">Passed to the database lookup.</param>
internal async Task RenewLeaseAsync(
    RedisScanQueueLease lease,
    TimeSpan leaseDuration,
    CancellationToken cancellationToken)
{
    var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);

    await db.StreamClaimIdsOnlyAsync(
            _options.StreamName,
            _options.ConsumerGroup,
            lease.Consumer,
            0,
            new RedisValue[] { lease.MessageId },
            CommandFlags.None)
        .ConfigureAwait(false);

    var expires = _timeProvider.GetUtcNow().Add(leaseDuration);
    lease.RefreshLease(expires);

    _logger.LogDebug(
        "Renewed lease for job {JobId} until {LeaseExpiry:u}.",
        lease.JobId,
        expires);
}
/// <summary>
/// Releases a lease without success. With <see cref="QueueReleaseDisposition.Retry"/> the job is
/// re-added to the stream as a new entry with an incremented attempt after a backoff delay, or
/// dead-lettered once the attempt count has reached the configured maximum. With any other
/// disposition the entry is simply acknowledged and deleted.
/// </summary>
/// <param name="lease">Lease being released.</param>
/// <param name="disposition">Whether the job should be retried or abandoned.</param>
/// <param name="cancellationToken">
/// Token observed while obtaining the database handle and during the backoff delay.
/// Cancelling the delay shortens the backoff; it never drops the job.
/// </param>
internal async Task ReleaseAsync(
    RedisScanQueueLease lease,
    QueueReleaseDisposition disposition,
    CancellationToken cancellationToken)
{
    if (disposition == QueueReleaseDisposition.Retry
        && lease.Attempt >= _queueOptions.MaxDeliveryAttempts)
    {
        _logger.LogWarning(
            "Job {JobId} reached max delivery attempts ({Attempts}); moving to dead-letter.",
            lease.JobId,
            lease.Attempt);

        // DeadLetterAsync performs its own completion guard.
        await DeadLetterAsync(
            lease,
            $"max-delivery-attempts:{lease.Attempt}",
            cancellationToken).ConfigureAwait(false);
        return;
    }

    if (!lease.TryBeginCompletion())
    {
        return;
    }

    var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
    var messageIds = new RedisValue[] { lease.MessageId };

    await db.StreamAcknowledgeAsync(
        _options.StreamName,
        _options.ConsumerGroup,
        messageIds)
        .ConfigureAwait(false);
    await db.StreamDeleteAsync(
        _options.StreamName,
        messageIds)
        .ConfigureAwait(false);

    QueueMetrics.RecordAck(TransportName);

    if (disposition == QueueReleaseDisposition.Retry)
    {
        QueueMetrics.RecordRetry(TransportName);

        var delay = CalculateBackoff(lease.Attempt);
        if (delay > TimeSpan.Zero)
        {
            _logger.LogDebug(
                "Delaying retry for job {JobId} by {Delay} (attempt {Attempt}).",
                lease.JobId,
                delay,
                lease.Attempt);

            try
            {
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // The original entry is already acknowledged and deleted, so returning here
                // would lose the job. Skip the rest of the backoff and requeue right away.
                _logger.LogDebug(
                    "Retry delay for job {JobId} was cancelled; requeueing immediately.",
                    lease.JobId);
            }
        }

        // The lease does not carry a trace id, so the requeued message has none.
        var requeueMessage = new ScanQueueMessage(lease.JobId, lease.Payload)
        {
            IdempotencyKey = lease.IdempotencyKey,
            Attributes = lease.Attributes,
            TraceId = null
        };

        var now = _timeProvider.GetUtcNow();
        var entries = BuildEntries(requeueMessage, now, lease.Attempt + 1);

        await AddToStreamAsync(
            db,
            _options.StreamName,
            entries,
            _options.ApproximateMaxLength,
            _options.ApproximateMaxLength is not null)
            .ConfigureAwait(false);

        QueueMetrics.RecordEnqueued(TransportName);
        _logger.LogWarning(
            "Released job {JobId} for retry (attempt {Attempt}).",
            lease.JobId,
            lease.Attempt + 1);
    }
    else
    {
        _logger.LogInformation(
            "Abandoned job {JobId} after {Attempt} attempt(s).",
            lease.JobId,
            lease.Attempt);
    }
}
/// <summary>
/// Moves the leased job to the dead-letter stream and removes it from the main stream.
/// Does nothing when completion has already begun for this lease.
/// </summary>
/// <param name="lease">Lease whose job is being dead-lettered.</param>
/// <param name="reason">Reason recorded in the error log entry (it is not written to the stream).</param>
/// <param name="cancellationToken">Token observed while obtaining the database handle.</param>
internal async Task DeadLetterAsync(
    RedisScanQueueLease lease,
    string reason,
    CancellationToken cancellationToken)
{
    if (!lease.TryBeginCompletion())
    {
        return;
    }

    var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);

    var now = _timeProvider.GetUtcNow();
    var entries = BuildEntries(
        new ScanQueueMessage(lease.JobId, lease.Payload)
        {
            IdempotencyKey = lease.IdempotencyKey,
            Attributes = lease.Attributes,
            TraceId = null
        },
        now,
        lease.Attempt);

    // Persist the dead-letter copy BEFORE removing the original entry. If this write fails
    // the entry stays pending in the main stream instead of being lost.
    await AddToStreamAsync(
        db,
        _queueOptions.DeadLetter.StreamName,
        entries,
        null,
        false)
        .ConfigureAwait(false);

    var messageIds = new RedisValue[] { lease.MessageId };
    await db.StreamAcknowledgeAsync(
        _options.StreamName,
        _options.ConsumerGroup,
        messageIds)
        .ConfigureAwait(false);
    await db.StreamDeleteAsync(
        _options.StreamName,
        messageIds)
        .ConfigureAwait(false);

    _logger.LogError(
        "Dead-lettered job {JobId} (attempt {Attempt}): {Reason}",
        lease.JobId,
        lease.Attempt,
        reason);
    QueueMetrics.RecordDeadLetter(TransportName);
}
/// <summary>
/// Returns a database handle, creating the shared connection on first use.
/// Connection creation is serialised through <c>_connectionLock</c>.
/// </summary>
/// <param name="cancellationToken">Token observed while waiting for the connection lock.</param>
private async ValueTask<IDatabase> GetDatabaseAsync(CancellationToken cancellationToken)
{
    // Fast path: a connection already exists, no locking needed.
    var existing = _connection;
    if (existing is not null)
    {
        return existing.GetDatabase(_options.Database ?? -1);
    }

    await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Re-check under the lock; another caller may have connected in the meantime.
        var connection = _connection;
        if (connection is null)
        {
            var configuration = ConfigurationOptions.Parse(_options.ConnectionString!);
            configuration.AbortOnConnectFail = false;
            configuration.ConnectTimeout = (int)_options.InitializationTimeout.TotalMilliseconds;
            configuration.ConnectRetry = 3;
            if (_options.Database is not null)
            {
                configuration.DefaultDatabase = _options.Database;
            }

            connection = await _connectionFactory(configuration).ConfigureAwait(false);
            _connection = connection;
        }

        return connection.GetDatabase(_options.Database ?? -1);
    }
    finally
    {
        _connectionLock.Release();
    }
}
/// <summary>
/// Creates the consumer group (and the stream, if missing) once per queue instance.
/// A BUSYGROUP error from the server is treated as "already exists".
/// </summary>
/// <param name="database">Database handle used to issue the create command.</param>
/// <param name="cancellationToken">Token observed while waiting for the initialisation lock.</param>
private async Task EnsureConsumerGroupAsync(
    IDatabase database,
    CancellationToken cancellationToken)
{
    if (!_groupInitialized)
    {
        await _groupInitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Re-check under the lock so only one caller issues the create command.
            if (!_groupInitialized)
            {
                try
                {
                    await database
                        .StreamCreateConsumerGroupAsync(
                            _options.StreamName,
                            _options.ConsumerGroup,
                            StreamPosition.Beginning,
                            createStream: true)
                        .ConfigureAwait(false);
                }
                catch (RedisServerException groupExists)
                    when (groupExists.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase))
                {
                    // The group was created earlier; nothing to do.
                }

                _groupInitialized = true;
            }
        }
        finally
        {
            _groupInitLock.Release();
        }
    }
}
/// <summary>
/// Builds the stream fields for a message: six envelope fields (job id, attempt, enqueue time
/// in Unix milliseconds, idempotency key, payload, trace id) followed by one prefixed field
/// per message attribute.
/// </summary>
/// <param name="message">Message to serialise.</param>
/// <param name="enqueuedAt">Timestamp written to the enqueued-at field.</param>
/// <param name="attempt">Delivery attempt number written to the attempt field.</param>
/// <returns>A newly allocated array owned by the caller.</returns>
private NameValueEntry[] BuildEntries(
    ScanQueueMessage message,
    DateTimeOffset enqueuedAt,
    int attempt)
{
    var attributes = message.Attributes;

    // A presized list replaces the previous ArrayPool rent/copy/return sequence: the pooled
    // buffer was never handed back if building an entry threw, and the result was copied
    // into a fresh array anyway, so pooling saved nothing.
    var entries = new List<NameValueEntry>(6 + (attributes?.Count ?? 0))
    {
        new NameValueEntry(QueueEnvelopeFields.JobId, message.JobId),
        new NameValueEntry(QueueEnvelopeFields.Attempt, attempt),
        new NameValueEntry(QueueEnvelopeFields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds()),
        // The job id doubles as the idempotency key when none was supplied.
        new NameValueEntry(QueueEnvelopeFields.IdempotencyKey, message.IdempotencyKey ?? message.JobId),
        new NameValueEntry(QueueEnvelopeFields.Payload, message.Payload.ToArray()),
        new NameValueEntry(QueueEnvelopeFields.TraceId, message.TraceId ?? string.Empty)
    };

    if (attributes is not null)
    {
        foreach (var kvp in attributes)
        {
            entries.Add(new NameValueEntry(
                QueueEnvelopeFields.AttributePrefix + kvp.Key,
                kvp.Value));
        }
    }

    return entries.ToArray();
}
/// <summary>
/// Converts a raw stream entry into a lease by reading the envelope fields and the prefixed attributes.
/// </summary>
/// <param name="entry">Stream entry to decode.</param>
/// <param name="consumer">Consumer name recorded on the lease.</param>
/// <param name="now">Current time; the lease expires at <paramref name="now"/> plus <paramref name="leaseDuration"/>.</param>
/// <param name="leaseDuration">Length of the lease.</param>
/// <param name="attemptOverride">
/// Lower bound for the attempt number (defaults to 1); the larger of this and the attempt stored
/// in the entry is used.
/// </param>
/// <returns>
/// The lease, or <c>null</c> when the entry has no fields or lacks a job id, payload, or
/// parseable enqueue timestamp.
/// </returns>
private RedisScanQueueLease? TryMapLease(
    StreamEntry entry,
    string consumer,
    DateTimeOffset now,
    TimeSpan leaseDuration,
    int? attemptOverride)
{
    if (entry.Values is null || entry.Values.Length == 0)
    {
        return null;
    }

    string? jobId = null;
    string? idempotency = null;
    long? enqueuedAtUnix = null;
    byte[]? payload = null;
    var attributes = new Dictionary<string, string>(StringComparer.Ordinal);
    var attempt = attemptOverride ?? 1;

    // Envelope numbers are machine-written, so parse them culture-invariantly.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    const System.Globalization.NumberStyles integerStyle = System.Globalization.NumberStyles.Integer;

    foreach (var field in entry.Values)
    {
        var name = field.Name.ToString();
        if (name.Equals(QueueEnvelopeFields.JobId, StringComparison.Ordinal))
        {
            jobId = field.Value.ToString();
        }
        else if (name.Equals(QueueEnvelopeFields.IdempotencyKey, StringComparison.Ordinal))
        {
            idempotency = field.Value.ToString();
        }
        else if (name.Equals(QueueEnvelopeFields.EnqueuedAt, StringComparison.Ordinal))
        {
            if (long.TryParse(field.Value.ToString(), integerStyle, invariant, out var unix))
            {
                enqueuedAtUnix = unix;
            }
        }
        else if (name.Equals(QueueEnvelopeFields.Payload, StringComparison.Ordinal))
        {
            payload = (byte[]?)field.Value ?? Array.Empty<byte>();
        }
        else if (name.Equals(QueueEnvelopeFields.Attempt, StringComparison.Ordinal))
        {
            if (int.TryParse(field.Value.ToString(), integerStyle, invariant, out var parsedAttempt))
            {
                // Never go below the caller-supplied attempt.
                attempt = Math.Max(parsedAttempt, attempt);
            }
        }
        else if (name.Equals(QueueEnvelopeFields.TraceId, StringComparison.Ordinal))
        {
            // The lease has no trace-id slot; the branch stays so this field is
            // recognised as an envelope field and skipped rather than examined further.
        }
        else if (name.StartsWith(QueueEnvelopeFields.AttributePrefix, StringComparison.Ordinal))
        {
            attributes[name[QueueEnvelopeFields.AttributePrefix.Length..]] = field.Value.ToString();
        }
    }

    if (jobId is null || payload is null || enqueuedAtUnix is null)
    {
        return null;
    }

    var enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value);
    var leaseExpires = now.Add(leaseDuration);
    var attributeView = attributes.Count == 0
        ? EmptyReadOnlyDictionary<string, string>.Instance
        : new ReadOnlyDictionary<string, string>(attributes);

    return new RedisScanQueueLease(
        this,
        entry.Id.ToString(),
        jobId,
        payload,
        attempt,
        enqueuedAt,
        leaseExpires,
        consumer,
        idempotency,
        attributeView);
}
/// <summary>
/// Computes the retry delay for the given attempt: the initial backoff for attempts 1 and 2,
/// doubling for each further attempt, clamped between the initial backoff and the maximum backoff.
/// Returns <see cref="TimeSpan.Zero"/> when no positive initial backoff is configured.
/// </summary>
/// <param name="attempt">Attempt number of the delivery that just failed.</param>
private TimeSpan CalculateBackoff(int attempt)
{
    // The transport-specific setting wins when positive; otherwise use the queue-wide one.
    var initial = _options.RetryInitialBackoff > TimeSpan.Zero
        ? _options.RetryInitialBackoff
        : _queueOptions.RetryInitialBackoff;

    if (initial <= TimeSpan.Zero)
    {
        return TimeSpan.Zero;
    }

    if (attempt <= 1)
    {
        return initial;
    }

    // A non-positive maximum collapses the ceiling to the initial delay.
    var ceiling = _queueOptions.RetryMaxBackoff > TimeSpan.Zero
        ? _queueOptions.RetryMaxBackoff
        : initial;

    var multiplier = Math.Pow(2, attempt - 2);
    var cappedTicks = Math.Min(ceiling.Ticks, initial.Ticks * multiplier);

    // Never return less than the initial delay, even when the ceiling is below it.
    return TimeSpan.FromTicks(Math.Max(initial.Ticks, (long)cappedTicks));
}
/// <summary>
/// Appends an entry to a stream by issuing a raw XADD with a server-generated id ("*"),
/// optionally with a MAXLEN (exact or approximate "~") trim clause.
/// </summary>
/// <param name="database">Database handle used to execute the command.</param>
/// <param name="stream">Target stream key.</param>
/// <param name="entries">Field/value pairs of the new entry.</param>
/// <param name="maxLength">When set, a MAXLEN clause with this value is added.</param>
/// <param name="useApproximateLength">Adds the "~" modifier to MAXLEN; ignored without <paramref name="maxLength"/>.</param>
/// <returns>The command result converted to a <see cref="RedisValue"/>.</returns>
private async Task<RedisValue> AddToStreamAsync(
    IDatabase database,
    RedisKey stream,
    NameValueEntry[] entries,
    int? maxLength,
    bool useApproximateLength)
{
    var arguments = new List<object>(4 + (entries.Length * 2)) { stream };

    if (maxLength is { } limit)
    {
        arguments.Add("MAXLEN");
        if (useApproximateLength)
        {
            arguments.Add("~");
        }

        arguments.Add(limit);
    }

    arguments.Add("*");

    foreach (var entry in entries)
    {
        arguments.Add(entry.Name);
        arguments.Add(entry.Value);
    }

    var reply = await database.ExecuteAsync("XADD", arguments.ToArray()).ConfigureAwait(false);
    return (RedisValue)reply!;
}
/// <summary>
/// Holds one shared, empty read-only dictionary per key/value type pair.
/// </summary>
private static class EmptyReadOnlyDictionary<TKey, TValue>
    where TKey : notnull
{
    /// <summary>The shared empty instance.</summary>
    public static readonly IReadOnlyDictionary<TKey, TValue> Instance = CreateEmpty();

    private static ReadOnlyDictionary<TKey, TValue> CreateEmpty()
    {
        var backing = new Dictionary<TKey, TValue>(0, EqualityComparer<TKey>.Default);
        return new ReadOnlyDictionary<TKey, TValue>(backing);
    }
}
/// <summary>
/// Sends a PING command to Redis; any failure surfaces as an exception.
/// </summary>
/// <param name="cancellationToken">Token observed while obtaining the database handle.</param>
internal async ValueTask PingAsync(CancellationToken cancellationToken)
{
    var database = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);

    // Only success or failure matters; the reply is discarded.
    _ = await database.ExecuteAsync("PING").ConfigureAwait(false);
}
}

View File

@@ -0,0 +1,72 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Scanner.Queue.Redis;
/// <summary>
/// Lease over a single Redis stream entry. All lease operations are forwarded to the owning
/// <see cref="RedisScanQueue"/>; the lease itself only tracks whether completion has begun
/// and the current expiry.
/// </summary>
internal sealed class RedisScanQueueLease : IScanQueueLease
{
    private readonly RedisScanQueue _queue;

    // 0 until the first completion attempt, 1 afterwards.
    private int _completed;

    internal RedisScanQueueLease(
        RedisScanQueue queue,
        string messageId,
        string jobId,
        byte[] payload,
        int attempt,
        DateTimeOffset enqueuedAt,
        DateTimeOffset leaseExpiresAt,
        string consumer,
        string? idempotencyKey,
        IReadOnlyDictionary<string, string> attributes)
    {
        _queue = queue;

        MessageId = messageId;
        JobId = jobId;
        Payload = payload;
        Attempt = attempt;
        EnqueuedAt = enqueuedAt;
        LeaseExpiresAt = leaseExpiresAt;
        Consumer = consumer;
        IdempotencyKey = idempotencyKey;
        Attributes = attributes;
    }

    /// <summary>Identifier of the underlying stream entry.</summary>
    public string MessageId { get; }

    /// <summary>Job identifier read from the entry.</summary>
    public string JobId { get; }

    /// <summary>Job payload bytes.</summary>
    public ReadOnlyMemory<byte> Payload { get; }

    /// <summary>Delivery attempt number of this lease.</summary>
    public int Attempt { get; }

    /// <summary>Time the entry was enqueued.</summary>
    public DateTimeOffset EnqueuedAt { get; }

    /// <summary>Current lease expiry; moved forward on renewal.</summary>
    public DateTimeOffset LeaseExpiresAt { get; private set; }

    /// <summary>Consumer holding the lease.</summary>
    public string Consumer { get; }

    /// <summary>Idempotency key read from the entry, if any.</summary>
    public string? IdempotencyKey { get; }

    /// <summary>Attributes read from the entry.</summary>
    public IReadOnlyDictionary<string, string> Attributes { get; }

    public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
    {
        return _queue.AcknowledgeAsync(this, cancellationToken);
    }

    public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
    {
        return _queue.RenewLeaseAsync(this, leaseDuration, cancellationToken);
    }

    public Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
    {
        return _queue.ReleaseAsync(this, disposition, cancellationToken);
    }

    public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
    {
        return _queue.DeadLetterAsync(this, reason, cancellationToken);
    }

    /// <summary>
    /// Atomically marks the lease as completing. Returns <c>true</c> for the first caller only.
    /// </summary>
    internal bool TryBeginCompletion()
    {
        var previous = Interlocked.Exchange(ref _completed, 1);
        return previous == 0;
    }

    /// <summary>Replaces the lease expiry with <paramref name="expiresAt"/>.</summary>
    internal void RefreshLease(DateTimeOffset expiresAt)
    {
        LeaseExpiresAt = expiresAt;
    }
}

View File

@@ -0,0 +1,115 @@
using System;
using System.Collections.Generic;
namespace StellaOps.Scanner.Queue;
/// <summary>
/// A job submitted to the scan queue. The payload is copied at construction, so later
/// changes to the caller's buffer do not affect the message.
/// </summary>
public sealed class ScanQueueMessage
{
    /// <summary>Creates a message for <paramref name="jobId"/> carrying a private copy of <paramref name="payload"/>.</summary>
    /// <exception cref="ArgumentException"><paramref name="jobId"/> is null, empty, or whitespace.</exception>
    public ScanQueueMessage(string jobId, ReadOnlyMemory<byte> payload)
    {
        if (string.IsNullOrWhiteSpace(jobId))
        {
            throw new ArgumentException("Job identifier must be provided.", nameof(jobId));
        }

        JobId = jobId;

        // ToArray always yields a detached array (the shared empty array for zero length).
        Payload = payload.ToArray();
    }

    /// <summary>Identifier of the job.</summary>
    public string JobId { get; }

    /// <summary>Optional idempotency key.</summary>
    public string? IdempotencyKey { get; init; }

    /// <summary>Optional trace identifier.</summary>
    public string? TraceId { get; init; }

    /// <summary>Optional string attributes attached to the message.</summary>
    public IReadOnlyDictionary<string, string>? Attributes { get; init; }

    /// <summary>The copied payload bytes.</summary>
    public ReadOnlyMemory<byte> Payload { get; }
}
/// <summary>
/// Value returned by <c>IScanQueue.EnqueueAsync</c>.
/// </summary>
/// <param name="MessageId">Message identifier reported for the enqueue call.</param>
/// <param name="Deduplicated">
/// Deduplication flag reported for the enqueue call. Presumably <c>true</c> when the message
/// matched an earlier one by idempotency key; confirm against the transport implementations.
/// </param>
public readonly record struct QueueEnqueueResult(string MessageId, bool Deduplicated);
/// <summary>
/// Validated parameters for leasing messages: who is leasing, how many, and for how long.
/// </summary>
public sealed class QueueLeaseRequest
{
    /// <summary>Creates a lease request, validating arguments in declaration order.</summary>
    /// <exception cref="ArgumentException"><paramref name="consumer"/> is null, empty, or whitespace.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="batchSize"/> or <paramref name="leaseDuration"/> is not positive.
    /// </exception>
    public QueueLeaseRequest(string consumer, int batchSize, TimeSpan leaseDuration)
    {
        Consumer = !string.IsNullOrWhiteSpace(consumer)
            ? consumer
            : throw new ArgumentException("Consumer name must be provided.", nameof(consumer));

        BatchSize = batchSize > 0
            ? batchSize
            : throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");

        LeaseDuration = leaseDuration > TimeSpan.Zero
            ? leaseDuration
            : throw new ArgumentOutOfRangeException(nameof(leaseDuration), leaseDuration, "Lease duration must be positive.");
    }

    /// <summary>Name of the consumer requesting the lease.</summary>
    public string Consumer { get; }

    /// <summary>Requested batch size (always positive).</summary>
    public int BatchSize { get; }

    /// <summary>Requested lease duration (always positive).</summary>
    public TimeSpan LeaseDuration { get; }
}
/// <summary>
/// Validated parameters for claiming leases held by other consumers: who claims, how many,
/// and the minimum idle time.
/// </summary>
public sealed class QueueClaimOptions
{
    /// <summary>Creates claim options, validating arguments in declaration order.</summary>
    /// <exception cref="ArgumentException"><paramref name="claimantConsumer"/> is null, empty, or whitespace.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="batchSize"/> is not positive, or <paramref name="minIdleTime"/> is negative.
    /// </exception>
    public QueueClaimOptions(
        string claimantConsumer,
        int batchSize,
        TimeSpan minIdleTime)
    {
        ClaimantConsumer = !string.IsNullOrWhiteSpace(claimantConsumer)
            ? claimantConsumer
            : throw new ArgumentException("Consumer must be provided.", nameof(claimantConsumer));

        BatchSize = batchSize > 0
            ? batchSize
            : throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");

        // Zero is allowed here; only negative idle times are rejected.
        MinIdleTime = minIdleTime >= TimeSpan.Zero
            ? minIdleTime
            : throw new ArgumentOutOfRangeException(nameof(minIdleTime), minIdleTime, "Idle time cannot be negative.");
    }

    /// <summary>Consumer that takes ownership of the claimed entries.</summary>
    public string ClaimantConsumer { get; }

    /// <summary>Requested batch size (always positive).</summary>
    public int BatchSize { get; }

    /// <summary>Minimum idle time (never negative).</summary>
    public TimeSpan MinIdleTime { get; }
}
/// <summary>
/// Tells the queue what to do with a job whose lease is released without success.
/// </summary>
public enum QueueReleaseDisposition
{
    /// <summary>Hand the job back for another delivery attempt.</summary>
    Retry = 0,

    /// <summary>Give up on the job without retrying it.</summary>
    Abandon = 1
}

View File

@@ -0,0 +1,55 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using StellaOps.Scanner.Queue.Nats;
using StellaOps.Scanner.Queue.Redis;
namespace StellaOps.Scanner.Queue;
/// <summary>
/// Health check that pings the configured queue transport. Transports without a dedicated
/// ping are reported as healthy.
/// </summary>
public sealed class ScannerQueueHealthCheck : IHealthCheck
{
    private readonly IScanQueue _queue;
    private readonly ILogger<ScannerQueueHealthCheck> _logger;

    public ScannerQueueHealthCheck(
        IScanQueue queue,
        ILogger<ScannerQueueHealthCheck> logger)
    {
        ArgumentNullException.ThrowIfNull(queue);
        ArgumentNullException.ThrowIfNull(logger);

        _queue = queue;
        _logger = logger;
    }

    /// <summary>
    /// Pings the transport and maps the outcome to a health result. Any exception raised by
    /// the ping is logged and reported with the registration's failure status.
    /// </summary>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            if (_queue is RedisScanQueue redis)
            {
                await redis.PingAsync(cancellationToken).ConfigureAwait(false);
                return HealthCheckResult.Healthy("Redis queue reachable.");
            }

            if (_queue is NatsScanQueue nats)
            {
                await nats.PingAsync(cancellationToken).ConfigureAwait(false);
                return HealthCheckResult.Healthy("NATS queue reachable.");
            }

            return HealthCheckResult.Healthy("Queue transport without dedicated ping returned healthy.");
        }
        catch (Exception failure)
        {
            _logger.LogError(failure, "Scanner queue health check failed.");
            return new HealthCheckResult(
                context.Registration.FailureStatus,
                "Queue transport unreachable.",
                failure);
        }
    }
}

View File

@@ -0,0 +1,92 @@
using System;
namespace StellaOps.Scanner.Queue;
/// <summary>
/// Root configuration for the scanner queue. <c>AddScannerQueue</c> binds an instance from
/// configuration and registers it as a singleton.
/// </summary>
public sealed class ScannerQueueOptions
{
/// <summary>
/// Transport to instantiate; <c>AddScannerQueue</c> switches on this value. Defaults to Redis.
/// </summary>
public QueueTransportKind Kind { get; set; } = QueueTransportKind.Redis;
/// <summary>
/// Settings passed to the Redis transport.
/// </summary>
public RedisQueueOptions Redis { get; set; } = new();
/// <summary>
/// Settings passed to the NATS transport.
/// </summary>
public NatsQueueOptions Nats { get; set; } = new();
/// <summary>
/// Default lease duration applied when callers do not override the visibility timeout.
/// </summary>
public TimeSpan DefaultLeaseDuration { get; set; } = TimeSpan.FromMinutes(5);
/// <summary>
/// Maximum number of times a message may be delivered before it is shunted to the dead-letter queue.
/// </summary>
public int MaxDeliveryAttempts { get; set; } = 5;
/// <summary>
/// Dead-letter destination settings (stream name and retention).
/// </summary>
public DeadLetterQueueOptions DeadLetter { get; set; } = new();
/// <summary>
/// Initial backoff applied when a job is retried after failure.
/// </summary>
public TimeSpan RetryInitialBackoff { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Maximum backoff window applied for exponential retry.
/// </summary>
public TimeSpan RetryMaxBackoff { get; set; } = TimeSpan.FromMinutes(2);
}
/// <summary>
/// Settings for the Redis Streams transport.
/// </summary>
public sealed class RedisQueueOptions
{
/// <summary>StackExchange.Redis connection string; parsed when the connection is first opened.</summary>
public string? ConnectionString { get; set; }
/// <summary>Database index to use; when unset the connection's default database is used.</summary>
public int? Database { get; set; }
/// <summary>Key of the stream that holds queued jobs.</summary>
public string StreamName { get; set; } = "scanner:jobs";
/// <summary>Name of the consumer group created on, and used with, the job stream.</summary>
public string ConsumerGroup { get; set; } = "scanner-workers";
// NOTE(review): presumably prepended to the idempotency key to form a Redis key — the consumer is not in this part of the file; confirm.
public string IdempotencyKeyPrefix { get; set; } = "scanner:jobs:idemp:";
// NOTE(review): presumably how long an idempotency key is remembered — confirm against the enqueue path.
public TimeSpan IdempotencyWindow { get; set; } = TimeSpan.FromHours(12);
/// <summary>When set, entries are added with an approximate MAXLEN trim of this length.</summary>
public int? ApproximateMaxLength { get; set; }
/// <summary>Used as the connect timeout when opening the Redis connection.</summary>
public TimeSpan InitializationTimeout { get; set; } = TimeSpan.FromSeconds(30);
// NOTE(review): presumably the idle time after which another consumer may claim a pending entry — confirm against the claim path.
public TimeSpan ClaimIdleThreshold { get; set; } = TimeSpan.FromMinutes(10);
// NOTE(review): usage is not visible in this part of the file; confirm what window this bounds.
public TimeSpan PendingScanWindow { get; set; } = TimeSpan.FromMinutes(30);
/// <summary>Initial retry backoff for this transport; when positive it takes precedence over the queue-wide value.</summary>
public TimeSpan RetryInitialBackoff { get; set; } = TimeSpan.FromSeconds(5);
}
/// <summary>
/// Settings for the NATS transport; an instance is passed to <c>NatsScanQueue</c> by <c>AddScannerQueue</c>.
/// </summary>
// NOTE(review): NatsScanQueue is defined elsewhere, so the per-property notes below are
// inferred from names and defaults only — confirm against that implementation.
public sealed class NatsQueueOptions
{
// Presumably the NATS server URL.
public string? Url { get; set; }
// Presumably the stream holding scan jobs.
public string Stream { get; set; } = "SCANNER_JOBS";
// Presumably the subject jobs are published to.
public string Subject { get; set; } = "scanner.jobs";
// Presumably the durable consumer name shared by workers.
public string DurableConsumer { get; set; } = "scanner-workers";
// Presumably an upper bound on unacknowledged in-flight messages.
public int MaxInFlight { get; set; } = 64;
// Presumably the acknowledgement wait before redelivery.
public TimeSpan AckWait { get; set; } = TimeSpan.FromMinutes(5);
// Presumably the stream receiving dead-lettered jobs.
public string DeadLetterStream { get; set; } = "SCANNER_JOBS_DEAD";
// Presumably the subject dead-lettered jobs are published to.
public string DeadLetterSubject { get; set; } = "scanner.jobs.dead";
// Presumably the delay applied before a retried job is redelivered.
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(10);
// Presumably the idle heartbeat interval for the consumer.
public TimeSpan IdleHeartbeat { get; set; } = TimeSpan.FromSeconds(30);
}
/// <summary>
/// Dead-letter destination settings.
/// </summary>
public sealed class DeadLetterQueueOptions
{
/// <summary>Key of the Redis stream that dead-lettered jobs are appended to.</summary>
public string StreamName { get; set; } = "scanner:jobs:dead";
// NOTE(review): presumably how long dead-lettered entries are kept; no consumer of this value is visible here — confirm.
public TimeSpan Retention { get; set; } = TimeSpan.FromDays(7);
}

View File

@@ -0,0 +1,67 @@
using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using StellaOps.Scanner.Queue.Nats;
using StellaOps.Scanner.Queue.Redis;
namespace StellaOps.Scanner.Queue;
/// <summary>
/// Dependency-injection helpers that register the scanner queue and its health check.
/// </summary>
public static class ScannerQueueServiceCollectionExtensions
{
    /// <summary>
    /// Binds <see cref="ScannerQueueOptions"/> from <paramref name="sectionName"/> and registers the
    /// options, a default <see cref="TimeProvider"/> (if none is registered), the configured
    /// <see cref="IScanQueue"/> transport, and <see cref="ScannerQueueHealthCheck"/> as singletons.
    /// </summary>
    /// <param name="services">Service collection to add registrations to.</param>
    /// <param name="configuration">Configuration root containing the queue section.</param>
    /// <param name="sectionName">Configuration section to bind; defaults to <c>scanner:queue</c>.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddScannerQueue(
        this IServiceCollection services,
        IConfiguration configuration,
        string sectionName = "scanner:queue")
    {
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configuration);

        // Options are bound once, eagerly, and shared with the queue factory below.
        var options = new ScannerQueueOptions();
        configuration.GetSection(sectionName).Bind(options);

        services.TryAddSingleton(TimeProvider.System);
        services.AddSingleton(options);
        services.AddSingleton<IScanQueue>(provider => CreateQueue(provider, options));
        services.AddSingleton<ScannerQueueHealthCheck>();

        return services;
    }

    /// <summary>
    /// Adds the <c>scanner-queue</c> health check, tagged <c>scanner</c> and <c>queue</c>,
    /// reporting <see cref="HealthStatus.Unhealthy"/> on failure.
    /// </summary>
    /// <param name="builder">Health checks builder to extend.</param>
    /// <returns>The same <paramref name="builder"/> instance, for chaining.</returns>
    public static IHealthChecksBuilder AddScannerQueueHealthCheck(
        this IHealthChecksBuilder builder)
    {
        ArgumentNullException.ThrowIfNull(builder);

        builder.Services.TryAddSingleton<ScannerQueueHealthCheck>();
        builder.AddCheck<ScannerQueueHealthCheck>(
            name: "scanner-queue",
            failureStatus: HealthStatus.Unhealthy,
            tags: new[] { "scanner", "queue" });

        return builder;
    }

    // Builds the transport selected by options.Kind; throws for an unknown kind.
    private static IScanQueue CreateQueue(IServiceProvider provider, ScannerQueueOptions options)
    {
        var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
        var timeProvider = provider.GetService<TimeProvider>() ?? TimeProvider.System;

        switch (options.Kind)
        {
            case QueueTransportKind.Redis:
                return new RedisScanQueue(
                    options,
                    options.Redis,
                    loggerFactory.CreateLogger<RedisScanQueue>(),
                    timeProvider);

            case QueueTransportKind.Nats:
                return new NatsScanQueue(
                    options,
                    options.Nats,
                    loggerFactory.CreateLogger<NatsScanQueue>(),
                    timeProvider);

            default:
                throw new InvalidOperationException($"Unsupported queue transport kind '{options.Kind}'.");
        }
    }
}

View File

@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<!-- Scanner queue library: targets net10.0 and references the Redis and NATS client packages used by the two queue transports. -->
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
</PropertyGroup>
<!-- Configuration, DI, health-check, logging and options abstractions, followed by the transport clients. -->
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.1" />
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
<PackageReference Include="NATS.Client.Core" Version="2.0.0" />
<PackageReference Include="NATS.Client.JetStream" Version="2.0.0" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,7 @@
# Scanner Queue Task Board (Sprint 9)
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|----|--------|----------|------------|-------------|---------------|
| SCANNER-QUEUE-09-401 | DONE (2025-10-19) | Scanner Queue Guild | — | Implement queue abstraction + Redis Streams adapter with ack/lease semantics, idempotency tokens, and deterministic job IDs. | Interfaces finalized; Redis adapter passes enqueue/dequeue/ack/claim lease tests; structured logs exercised. |
| SCANNER-QUEUE-09-402 | DONE (2025-10-19) | Scanner Queue Guild | SCANNER-QUEUE-09-401 | Add pluggable backend support (Redis, NATS) with configuration binding, health probes, failover documentation. | NATS adapter + DI bindings delivered; health checks documented; configuration tests green. |
| SCANNER-QUEUE-09-403 | DONE (2025-10-19) | Scanner Queue Guild | SCANNER-QUEUE-09-401 | Implement retry and dead-letter flow with structured metrics/logs for offline deployments. | Retry policy configurable; dead-letter queue persisted; metrics counters validated in integration tests. |