using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Linq; using System.Threading; using System.Threading.Tasks; using FluentAssertions; using Microsoft.Extensions.Time.Testing; using StellaOps.Scanner.Queue; using Xunit; using StellaOps.TestKit; namespace StellaOps.Scanner.Queue.Tests; public sealed class QueueLeaseIntegrationTests { private readonly ScannerQueueOptions _options = new() { MaxDeliveryAttempts = 3, RetryInitialBackoff = TimeSpan.FromMilliseconds(1), RetryMaxBackoff = TimeSpan.FromMilliseconds(5), DefaultLeaseDuration = TimeSpan.FromSeconds(5) }; [Trait("Category", TestCategories.Unit)] [Fact] public async Task Enqueue_ShouldDeduplicate_ByIdempotencyKey() { var clock = new FakeTimeProvider(); var queue = new InMemoryScanQueue(_options, clock); var payload = new byte[] { 1, 2, 3 }; var message = new ScanQueueMessage("job-1", payload) { IdempotencyKey = "idem-1" }; var first = await queue.EnqueueAsync(message); first.Deduplicated.Should().BeFalse(); var second = await queue.EnqueueAsync(message); second.Deduplicated.Should().BeTrue(); } [Trait("Category", TestCategories.Unit)] [Fact] public async Task Lease_ShouldExposeTraceId_FromQueuedMessage() { var clock = new FakeTimeProvider(); var queue = new InMemoryScanQueue(_options, clock); var payload = new byte[] { 9 }; var message = new ScanQueueMessage("job-trace", payload) { TraceId = "trace-123" }; await queue.EnqueueAsync(message); var lease = await LeaseSingleAsync(queue, consumer: "worker-trace"); lease.Should().NotBeNull(); lease!.TraceId.Should().Be("trace-123"); } [Trait("Category", TestCategories.Unit)] [Fact] public async Task Lease_Acknowledge_ShouldRemoveFromQueue() { var clock = new FakeTimeProvider(); var queue = new InMemoryScanQueue(_options, clock); var message = new ScanQueueMessage("job-ack", new byte[] { 42 }); await queue.EnqueueAsync(message); var lease = await LeaseSingleAsync(queue, consumer: "worker-1"); lease.Should().NotBeNull(); await lease!.AcknowledgeAsync(); var afterAck = await queue.LeaseAsync(new QueueLeaseRequest("worker-1", 1, TimeSpan.FromSeconds(1))); afterAck.Should().BeEmpty(); } [Trait("Category", TestCategories.Unit)] [Fact] public async Task Release_WithRetry_ShouldDeadLetterAfterMaxAttempts() { var clock = new FakeTimeProvider(); var queue = new InMemoryScanQueue(_options, clock); var message = new ScanQueueMessage("job-retry", new byte[] { 5 }); await queue.EnqueueAsync(message); for (var attempt = 1; attempt <= _options.MaxDeliveryAttempts; attempt++) { var lease = await LeaseSingleAsync(queue, consumer: $"worker-{attempt}"); lease.Should().NotBeNull(); await lease!.ReleaseAsync(QueueReleaseDisposition.Retry); } queue.DeadLetters.Should().ContainSingle(dead => dead.JobId == "job-retry"); } [Trait("Category", TestCategories.Unit)] [Fact] public async Task Retry_ShouldIncreaseAttemptOnNextLease() { var clock = new FakeTimeProvider(); var queue = new InMemoryScanQueue(_options, clock); await queue.EnqueueAsync(new ScanQueueMessage("job-retry-attempt", new byte[] { 77 })); var firstLease = await LeaseSingleAsync(queue, "worker-retry"); firstLease.Should().NotBeNull(); firstLease!.Attempt.Should().Be(1); await firstLease.ReleaseAsync(QueueReleaseDisposition.Retry); var secondLease = await LeaseSingleAsync(queue, "worker-retry"); secondLease.Should().NotBeNull(); secondLease!.Attempt.Should().Be(2); } private static async Task LeaseSingleAsync(InMemoryScanQueue queue, string consumer) { var leases = await queue.LeaseAsync(new QueueLeaseRequest(consumer, 1, TimeSpan.FromSeconds(1))); return leases.FirstOrDefault(); } private sealed class InMemoryScanQueue : IScanQueue { private readonly ScannerQueueOptions _options; private readonly TimeProvider _timeProvider; private readonly ConcurrentQueue _ready = new(); private readonly ConcurrentDictionary _idempotency = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _inFlight = new(StringComparer.Ordinal); private readonly List _deadLetters = new(); private long _sequence; public InMemoryScanQueue(ScannerQueueOptions options, TimeProvider timeProvider) { _options = options; _timeProvider = timeProvider; } public IReadOnlyList DeadLetters => _deadLetters; public ValueTask EnqueueAsync(ScanQueueMessage message, CancellationToken cancellationToken = default) { var token = message.IdempotencyKey ?? message.JobId; if (_idempotency.TryGetValue(token, out var existing)) { return ValueTask.FromResult(new QueueEnqueueResult(existing.SequenceId, true)); } var entry = new QueueEntry( sequenceId: Interlocked.Increment(ref _sequence).ToString(), jobId: message.JobId, payload: message.Payload.ToArray(), idempotencyKey: token, attempt: 1, enqueuedAt: _timeProvider.GetUtcNow(), traceId: message.TraceId, attributes: message.Attributes is null ? new ReadOnlyDictionary(new Dictionary(0, StringComparer.Ordinal)) : new ReadOnlyDictionary(new Dictionary(message.Attributes, StringComparer.Ordinal))); _idempotency[token] = entry; _ready.Enqueue(entry); return ValueTask.FromResult(new QueueEnqueueResult(entry.SequenceId, false)); } public ValueTask> LeaseAsync(QueueLeaseRequest request, CancellationToken cancellationToken = default) { var now = _timeProvider.GetUtcNow(); var leases = new List(request.BatchSize); while (leases.Count < request.BatchSize && _ready.TryDequeue(out var entry)) { entry.Attempt = Math.Max(entry.Attempt, entry.Deliveries + 1); entry.Deliveries = entry.Attempt; entry.LastLeaseAt = now; _inFlight[entry.SequenceId] = entry; var lease = new InMemoryLease( this, entry, request.Consumer, now, request.LeaseDuration); leases.Add(lease); } return ValueTask.FromResult>(leases); } public ValueTask> ClaimExpiredLeasesAsync(QueueClaimOptions options, CancellationToken cancellationToken = default) { var now = _timeProvider.GetUtcNow(); var leases = _inFlight.Values .Where(entry => now - entry.LastLeaseAt >= options.MinIdleTime) .Take(options.BatchSize) .Select(entry => new InMemoryLease(this, entry, options.ClaimantConsumer, now, _options.DefaultLeaseDuration)) .Cast() .ToList(); return ValueTask.FromResult>(leases); } internal Task AcknowledgeAsync(QueueEntry entry) { _inFlight.TryRemove(entry.SequenceId, out _); _idempotency.TryRemove(entry.IdempotencyKey, out _); return Task.CompletedTask; } internal Task RenewAsync(QueueEntry entry, TimeSpan leaseDuration) { var expires = _timeProvider.GetUtcNow().Add(leaseDuration); entry.LeaseExpiresAt = expires; return Task.FromResult(expires); } internal Task ReleaseAsync(QueueEntry entry, QueueReleaseDisposition disposition) { if (disposition == QueueReleaseDisposition.Retry && entry.Attempt >= _options.MaxDeliveryAttempts) { return DeadLetterAsync(entry, $"max-delivery-attempts:{entry.Attempt}"); } if (disposition == QueueReleaseDisposition.Retry) { entry.Attempt++; _ready.Enqueue(entry); } else { _idempotency.TryRemove(entry.IdempotencyKey, out _); } _inFlight.TryRemove(entry.SequenceId, out _); return Task.CompletedTask; } internal Task DeadLetterAsync(QueueEntry entry, string reason) { entry.DeadLetterReason = reason; _inFlight.TryRemove(entry.SequenceId, out _); _idempotency.TryRemove(entry.IdempotencyKey, out _); _deadLetters.Add(entry); return Task.CompletedTask; } private sealed class InMemoryLease : IScanQueueLease { private readonly InMemoryScanQueue _owner; private readonly QueueEntry _entry; private int _completed; public InMemoryLease( InMemoryScanQueue owner, QueueEntry entry, string consumer, DateTimeOffset now, TimeSpan leaseDuration) { _owner = owner; _entry = entry; Consumer = consumer; MessageId = entry.SequenceId; JobId = entry.JobId; Payload = entry.Payload; Attempt = entry.Attempt; EnqueuedAt = entry.EnqueuedAt; LeaseExpiresAt = now.Add(leaseDuration); IdempotencyKey = entry.IdempotencyKey; TraceId = entry.TraceId; Attributes = entry.Attributes; } public string MessageId { get; } public string JobId { get; } public ReadOnlyMemory Payload { get; } public int Attempt { get; } public DateTimeOffset EnqueuedAt { get; } public DateTimeOffset LeaseExpiresAt { get; private set; } public string Consumer { get; } public string? IdempotencyKey { get; } public string? TraceId { get; } public IReadOnlyDictionary Attributes { get; } public Task AcknowledgeAsync(CancellationToken cancellationToken = default) { if (TryComplete()) { return _owner.AcknowledgeAsync(_entry); } return Task.CompletedTask; } public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default) { return RenewInternalAsync(leaseDuration); } public Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default) { if (TryComplete()) { return _owner.ReleaseAsync(_entry, disposition); } return Task.CompletedTask; } public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default) { if (TryComplete()) { return _owner.DeadLetterAsync(_entry, reason); } return Task.CompletedTask; } private async Task RenewInternalAsync(TimeSpan leaseDuration) { var expires = await _owner.RenewAsync(_entry, leaseDuration).ConfigureAwait(false); LeaseExpiresAt = expires; } private bool TryComplete() => Interlocked.CompareExchange(ref _completed, 1, 0) == 0; } internal sealed class QueueEntry { public QueueEntry( string sequenceId, string jobId, byte[] payload, string idempotencyKey, int attempt, DateTimeOffset enqueuedAt, string? traceId, IReadOnlyDictionary attributes) { SequenceId = sequenceId; JobId = jobId; Payload = payload; IdempotencyKey = idempotencyKey; Attempt = attempt; EnqueuedAt = enqueuedAt; LastLeaseAt = enqueuedAt; TraceId = traceId; Attributes = attributes; } public string SequenceId { get; } public string JobId { get; } public byte[] Payload { get; } public string IdempotencyKey { get; } public int Attempt { get; set; } public int Deliveries { get; set; } public DateTimeOffset EnqueuedAt { get; } public DateTimeOffset LeaseExpiresAt { get; set; } public DateTimeOffset LastLeaseAt { get; set; } public string? TraceId { get; } public IReadOnlyDictionary Attributes { get; } public string? DeadLetterReason { get; set; } } } }