Resolve Concelier/Excititor merge conflicts

This commit is contained in:
root
2025-10-20 14:19:25 +03:00
2687 changed files with 212646 additions and 85913 deletions

View File

@@ -0,0 +1,390 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.Time.Testing;
using StellaOps.Scanner.Queue;
using Xunit;
namespace StellaOps.Scanner.Queue.Tests;
public sealed class QueueLeaseIntegrationTests
{
private readonly ScannerQueueOptions _options = new()
{
MaxDeliveryAttempts = 3,
RetryInitialBackoff = TimeSpan.FromMilliseconds(1),
RetryMaxBackoff = TimeSpan.FromMilliseconds(5),
DefaultLeaseDuration = TimeSpan.FromSeconds(5)
};
[Fact]
public async Task Enqueue_ShouldDeduplicate_ByIdempotencyKey()
{
var clock = new FakeTimeProvider();
var queue = new InMemoryScanQueue(_options, clock);
var payload = new byte[] { 1, 2, 3 };
var message = new ScanQueueMessage("job-1", payload)
{
IdempotencyKey = "idem-1"
};
var first = await queue.EnqueueAsync(message);
first.Deduplicated.Should().BeFalse();
var second = await queue.EnqueueAsync(message);
second.Deduplicated.Should().BeTrue();
}
[Fact]
public async Task Lease_ShouldExposeTraceId_FromQueuedMessage()
{
var clock = new FakeTimeProvider();
var queue = new InMemoryScanQueue(_options, clock);
var payload = new byte[] { 9 };
var message = new ScanQueueMessage("job-trace", payload)
{
TraceId = "trace-123"
};
await queue.EnqueueAsync(message);
var lease = await LeaseSingleAsync(queue, consumer: "worker-trace");
lease.Should().NotBeNull();
lease!.TraceId.Should().Be("trace-123");
}
[Fact]
public async Task Lease_Acknowledge_ShouldRemoveFromQueue()
{
var clock = new FakeTimeProvider();
var queue = new InMemoryScanQueue(_options, clock);
var message = new ScanQueueMessage("job-ack", new byte[] { 42 });
await queue.EnqueueAsync(message);
var lease = await LeaseSingleAsync(queue, consumer: "worker-1");
lease.Should().NotBeNull();
await lease!.AcknowledgeAsync();
var afterAck = await queue.LeaseAsync(new QueueLeaseRequest("worker-1", 1, TimeSpan.FromSeconds(1)));
afterAck.Should().BeEmpty();
}
[Fact]
public async Task Release_WithRetry_ShouldDeadLetterAfterMaxAttempts()
{
var clock = new FakeTimeProvider();
var queue = new InMemoryScanQueue(_options, clock);
var message = new ScanQueueMessage("job-retry", new byte[] { 5 });
await queue.EnqueueAsync(message);
for (var attempt = 1; attempt <= _options.MaxDeliveryAttempts; attempt++)
{
var lease = await LeaseSingleAsync(queue, consumer: $"worker-{attempt}");
lease.Should().NotBeNull();
await lease!.ReleaseAsync(QueueReleaseDisposition.Retry);
}
queue.DeadLetters.Should().ContainSingle(dead => dead.JobId == "job-retry");
}
[Fact]
public async Task Retry_ShouldIncreaseAttemptOnNextLease()
{
var clock = new FakeTimeProvider();
var queue = new InMemoryScanQueue(_options, clock);
await queue.EnqueueAsync(new ScanQueueMessage("job-retry-attempt", new byte[] { 77 }));
var firstLease = await LeaseSingleAsync(queue, "worker-retry");
firstLease.Should().NotBeNull();
firstLease!.Attempt.Should().Be(1);
await firstLease.ReleaseAsync(QueueReleaseDisposition.Retry);
var secondLease = await LeaseSingleAsync(queue, "worker-retry");
secondLease.Should().NotBeNull();
secondLease!.Attempt.Should().Be(2);
}
private static async Task<IScanQueueLease?> LeaseSingleAsync(InMemoryScanQueue queue, string consumer)
{
var leases = await queue.LeaseAsync(new QueueLeaseRequest(consumer, 1, TimeSpan.FromSeconds(1)));
return leases.FirstOrDefault();
}
private sealed class InMemoryScanQueue : IScanQueue
{
private readonly ScannerQueueOptions _options;
private readonly TimeProvider _timeProvider;
private readonly ConcurrentQueue<QueueEntry> _ready = new();
private readonly ConcurrentDictionary<string, QueueEntry> _idempotency = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, QueueEntry> _inFlight = new(StringComparer.Ordinal);
private readonly List<QueueEntry> _deadLetters = new();
private long _sequence;
public InMemoryScanQueue(ScannerQueueOptions options, TimeProvider timeProvider)
{
_options = options;
_timeProvider = timeProvider;
}
public IReadOnlyList<QueueEntry> DeadLetters => _deadLetters;
public ValueTask<QueueEnqueueResult> EnqueueAsync(ScanQueueMessage message, CancellationToken cancellationToken = default)
{
var token = message.IdempotencyKey ?? message.JobId;
if (_idempotency.TryGetValue(token, out var existing))
{
return ValueTask.FromResult(new QueueEnqueueResult(existing.SequenceId, true));
}
var entry = new QueueEntry(
sequenceId: Interlocked.Increment(ref _sequence).ToString(),
jobId: message.JobId,
payload: message.Payload.ToArray(),
idempotencyKey: token,
attempt: 1,
enqueuedAt: _timeProvider.GetUtcNow(),
traceId: message.TraceId,
attributes: message.Attributes is null
? new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(0, StringComparer.Ordinal))
: new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(message.Attributes, StringComparer.Ordinal)));
_idempotency[token] = entry;
_ready.Enqueue(entry);
return ValueTask.FromResult(new QueueEnqueueResult(entry.SequenceId, false));
}
public ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(QueueLeaseRequest request, CancellationToken cancellationToken = default)
{
var now = _timeProvider.GetUtcNow();
var leases = new List<IScanQueueLease>(request.BatchSize);
while (leases.Count < request.BatchSize && _ready.TryDequeue(out var entry))
{
entry.Attempt = Math.Max(entry.Attempt, entry.Deliveries + 1);
entry.Deliveries = entry.Attempt;
entry.LastLeaseAt = now;
_inFlight[entry.SequenceId] = entry;
var lease = new InMemoryLease(
this,
entry,
request.Consumer,
now,
request.LeaseDuration);
leases.Add(lease);
}
return ValueTask.FromResult<IReadOnlyList<IScanQueueLease>>(leases);
}
public ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(QueueClaimOptions options, CancellationToken cancellationToken = default)
{
var now = _timeProvider.GetUtcNow();
var leases = _inFlight.Values
.Where(entry => now - entry.LastLeaseAt >= options.MinIdleTime)
.Take(options.BatchSize)
.Select(entry => new InMemoryLease(this, entry, options.ClaimantConsumer, now, _options.DefaultLeaseDuration))
.Cast<IScanQueueLease>()
.ToList();
return ValueTask.FromResult<IReadOnlyList<IScanQueueLease>>(leases);
}
internal Task AcknowledgeAsync(QueueEntry entry)
{
_inFlight.TryRemove(entry.SequenceId, out _);
_idempotency.TryRemove(entry.IdempotencyKey, out _);
return Task.CompletedTask;
}
internal Task<DateTimeOffset> RenewAsync(QueueEntry entry, TimeSpan leaseDuration)
{
var expires = _timeProvider.GetUtcNow().Add(leaseDuration);
entry.LeaseExpiresAt = expires;
return Task.FromResult(expires);
}
internal Task ReleaseAsync(QueueEntry entry, QueueReleaseDisposition disposition)
{
if (disposition == QueueReleaseDisposition.Retry && entry.Attempt >= _options.MaxDeliveryAttempts)
{
return DeadLetterAsync(entry, $"max-delivery-attempts:{entry.Attempt}");
}
if (disposition == QueueReleaseDisposition.Retry)
{
entry.Attempt++;
_ready.Enqueue(entry);
}
else
{
_idempotency.TryRemove(entry.IdempotencyKey, out _);
}
_inFlight.TryRemove(entry.SequenceId, out _);
return Task.CompletedTask;
}
internal Task DeadLetterAsync(QueueEntry entry, string reason)
{
entry.DeadLetterReason = reason;
_inFlight.TryRemove(entry.SequenceId, out _);
_idempotency.TryRemove(entry.IdempotencyKey, out _);
_deadLetters.Add(entry);
return Task.CompletedTask;
}
private sealed class InMemoryLease : IScanQueueLease
{
private readonly InMemoryScanQueue _owner;
private readonly QueueEntry _entry;
private int _completed;
public InMemoryLease(
InMemoryScanQueue owner,
QueueEntry entry,
string consumer,
DateTimeOffset now,
TimeSpan leaseDuration)
{
_owner = owner;
_entry = entry;
Consumer = consumer;
MessageId = entry.SequenceId;
JobId = entry.JobId;
Payload = entry.Payload;
Attempt = entry.Attempt;
EnqueuedAt = entry.EnqueuedAt;
LeaseExpiresAt = now.Add(leaseDuration);
IdempotencyKey = entry.IdempotencyKey;
TraceId = entry.TraceId;
Attributes = entry.Attributes;
}
public string MessageId { get; }
public string JobId { get; }
public ReadOnlyMemory<byte> Payload { get; }
public int Attempt { get; }
public DateTimeOffset EnqueuedAt { get; }
public DateTimeOffset LeaseExpiresAt { get; private set; }
public string Consumer { get; }
public string? IdempotencyKey { get; }
public string? TraceId { get; }
public IReadOnlyDictionary<string, string> Attributes { get; }
public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
{
if (TryComplete())
{
return _owner.AcknowledgeAsync(_entry);
}
return Task.CompletedTask;
}
public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
{
return RenewInternalAsync(leaseDuration);
}
public Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
{
if (TryComplete())
{
return _owner.ReleaseAsync(_entry, disposition);
}
return Task.CompletedTask;
}
public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
{
if (TryComplete())
{
return _owner.DeadLetterAsync(_entry, reason);
}
return Task.CompletedTask;
}
private async Task RenewInternalAsync(TimeSpan leaseDuration)
{
var expires = await _owner.RenewAsync(_entry, leaseDuration).ConfigureAwait(false);
LeaseExpiresAt = expires;
}
private bool TryComplete()
=> Interlocked.CompareExchange(ref _completed, 1, 0) == 0;
}
internal sealed class QueueEntry
{
public QueueEntry(
string sequenceId,
string jobId,
byte[] payload,
string idempotencyKey,
int attempt,
DateTimeOffset enqueuedAt,
string? traceId,
IReadOnlyDictionary<string, string> attributes)
{
SequenceId = sequenceId;
JobId = jobId;
Payload = payload;
IdempotencyKey = idempotencyKey;
Attempt = attempt;
EnqueuedAt = enqueuedAt;
LastLeaseAt = enqueuedAt;
TraceId = traceId;
Attributes = attributes;
}
public string SequenceId { get; }
public string JobId { get; }
public byte[] Payload { get; }
public string IdempotencyKey { get; }
public int Attempt { get; set; }
public int Deliveries { get; set; }
public DateTimeOffset EnqueuedAt { get; }
public DateTimeOffset LeaseExpiresAt { get; set; }
public DateTimeOffset LastLeaseAt { get; set; }
public string? TraceId { get; }
public IReadOnlyDictionary<string, string> Attributes { get; }
public string? DeadLetterReason { get; set; }
}
}
}

View File

@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.12.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../StellaOps.Scanner.Queue/StellaOps.Scanner.Queue.csproj" />
</ItemGroup>
</Project>