- Added Program.cs to set up the web application with Serilog for logging, health check endpoints, and a placeholder admission endpoint. - Configured Kestrel server to use TLS 1.3 and handle client certificates appropriately. - Created StellaOps.Zastava.Webhook.csproj with necessary dependencies including Serilog and Polly. - Documented tasks in TASKS.md for the Zastava Webhook project, outlining current work and exit criteria for each task.
391 lines
13 KiB
C#
391 lines
13 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Collections.ObjectModel;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using FluentAssertions;
|
|
using Microsoft.Extensions.Time.Testing;
|
|
using StellaOps.Scanner.Queue;
|
|
using Xunit;
|
|
|
|
namespace StellaOps.Scanner.Queue.Tests;
|
|
|
|
public sealed class QueueLeaseIntegrationTests
|
|
{
|
|
private readonly ScannerQueueOptions _options = new()
|
|
{
|
|
MaxDeliveryAttempts = 3,
|
|
RetryInitialBackoff = TimeSpan.FromMilliseconds(1),
|
|
RetryMaxBackoff = TimeSpan.FromMilliseconds(5),
|
|
DefaultLeaseDuration = TimeSpan.FromSeconds(5)
|
|
};
|
|
|
|
/// <summary>
/// Enqueuing the same message twice under one idempotency key must flag only the
/// second attempt as deduplicated.
/// </summary>
[Fact]
public async Task Enqueue_ShouldDeduplicate_ByIdempotencyKey()
{
    // Arrange
    var queue = new InMemoryScanQueue(_options, new FakeTimeProvider());
    var message = new ScanQueueMessage("job-1", new byte[] { 1, 2, 3 })
    {
        IdempotencyKey = "idem-1",
    };

    // Act + Assert: the first enqueue is accepted as new.
    var initial = await queue.EnqueueAsync(message);
    initial.Deduplicated.Should().BeFalse();

    // Act + Assert: the repeat is recognised as a duplicate.
    var repeat = await queue.EnqueueAsync(message);
    repeat.Deduplicated.Should().BeTrue();
}
|
|
|
|
/// <summary>
/// The trace id set on an enqueued message must be visible on the lease handed to a consumer.
/// </summary>
[Fact]
public async Task Lease_ShouldExposeTraceId_FromQueuedMessage()
{
    // Arrange
    var queue = new InMemoryScanQueue(_options, new FakeTimeProvider());
    var traced = new ScanQueueMessage("job-trace", new byte[] { 9 })
    {
        TraceId = "trace-123",
    };
    await queue.EnqueueAsync(traced);

    // Act
    var leased = await LeaseSingleAsync(queue, consumer: "worker-trace");

    // Assert
    leased.Should().NotBeNull();
    leased!.TraceId.Should().Be("trace-123");
}
|
|
|
|
/// <summary>
/// Once a lease has been acknowledged, a subsequent lease request must come back empty.
/// </summary>
[Fact]
public async Task Lease_Acknowledge_ShouldRemoveFromQueue()
{
    // Arrange
    var queue = new InMemoryScanQueue(_options, new FakeTimeProvider());
    await queue.EnqueueAsync(new ScanQueueMessage("job-ack", new byte[] { 42 }));

    var held = await LeaseSingleAsync(queue, consumer: "worker-1");
    held.Should().NotBeNull();

    // Act
    await held!.AcknowledgeAsync();

    // Assert
    var followUp = new QueueLeaseRequest("worker-1", 1, TimeSpan.FromSeconds(1));
    var remaining = await queue.LeaseAsync(followUp);
    remaining.Should().BeEmpty();
}
|
|
|
|
/// <summary>
/// Releasing a message for retry on every delivery must move it to the dead-letter list
/// once <c>MaxDeliveryAttempts</c> deliveries have been used up.
/// </summary>
[Fact]
public async Task Release_WithRetry_ShouldDeadLetterAfterMaxAttempts()
{
    // Arrange
    var queue = new InMemoryScanQueue(_options, new FakeTimeProvider());
    await queue.EnqueueAsync(new ScanQueueMessage("job-retry", new byte[] { 5 }));

    // Act: lease and release-for-retry once per allowed delivery attempt.
    var attempt = 1;
    while (attempt <= _options.MaxDeliveryAttempts)
    {
        var current = await LeaseSingleAsync(queue, consumer: $"worker-{attempt}");
        current.Should().NotBeNull();

        await current!.ReleaseAsync(QueueReleaseDisposition.Retry);
        attempt++;
    }

    // Assert
    queue.DeadLetters.Should().ContainSingle(dead => dead.JobId == "job-retry");
}
|
|
|
|
/// <summary>
/// A message released for retry must be leased again with its attempt counter advanced by one.
/// </summary>
[Fact]
public async Task Retry_ShouldIncreaseAttemptOnNextLease()
{
    // Arrange
    const string Worker = "worker-retry";
    var queue = new InMemoryScanQueue(_options, new FakeTimeProvider());
    await queue.EnqueueAsync(new ScanQueueMessage("job-retry-attempt", new byte[] { 77 }));

    // Act + Assert: first delivery is attempt 1.
    var initialLease = await LeaseSingleAsync(queue, Worker);
    initialLease.Should().NotBeNull();
    initialLease!.Attempt.Should().Be(1);

    await initialLease.ReleaseAsync(QueueReleaseDisposition.Retry);

    // Act + Assert: the redelivery is attempt 2.
    var retriedLease = await LeaseSingleAsync(queue, Worker);
    retriedLease.Should().NotBeNull();
    retriedLease!.Attempt.Should().Be(2);
}
|
|
|
|
/// <summary>
/// Requests a batch of one lease (one-second lease duration) for <paramref name="consumer"/>.
/// </summary>
/// <returns>The leased message, or <c>null</c> when the queue returned no leases.</returns>
private static async Task<IScanQueueLease?> LeaseSingleAsync(InMemoryScanQueue queue, string consumer)
{
    var request = new QueueLeaseRequest(consumer, 1, TimeSpan.FromSeconds(1));
    IReadOnlyList<IScanQueueLease> batch = await queue.LeaseAsync(request);

    return batch.Count > 0 ? batch[0] : null;
}
|
|
|
|
private sealed class InMemoryScanQueue : IScanQueue
|
|
{
|
|
private readonly ScannerQueueOptions _options;
|
|
private readonly TimeProvider _timeProvider;
|
|
private readonly ConcurrentQueue<QueueEntry> _ready = new();
|
|
private readonly ConcurrentDictionary<string, QueueEntry> _idempotency = new(StringComparer.Ordinal);
|
|
private readonly ConcurrentDictionary<string, QueueEntry> _inFlight = new(StringComparer.Ordinal);
|
|
private readonly List<QueueEntry> _deadLetters = new();
|
|
private long _sequence;
|
|
|
|
/// <summary>Creates an empty queue that uses the supplied options and clock.</summary>
/// <param name="options">Queue settings; read for the delivery limit and the default lease duration.</param>
/// <param name="timeProvider">Source of "now" for every timestamp the queue records.</param>
public InMemoryScanQueue(ScannerQueueOptions options, TimeProvider timeProvider)
    => (_options, _timeProvider) = (options, timeProvider);
|
|
|
|
/// <summary>Entries that have been dead-lettered, in the order they were added.</summary>
public IReadOnlyList<QueueEntry> DeadLetters
{
    get { return _deadLetters; }
}
|
|
|
|
/// <summary>
/// Adds <paramref name="message"/> to the ready queue unless an entry with the same
/// idempotency token is already tracked.
/// </summary>
/// <param name="message">Message to enqueue. Its payload and attributes are copied.</param>
/// <param name="cancellationToken">Not observed; the operation completes synchronously.</param>
/// <returns>
/// The sequence id of the tracked entry and a flag that is <c>true</c> when the message
/// was recognised as a duplicate and therefore not queued again.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
public ValueTask<QueueEnqueueResult> EnqueueAsync(ScanQueueMessage message, CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(message);

    // The job id doubles as the idempotency token when no explicit key is supplied.
    var token = message.IdempotencyKey ?? message.JobId;

    // Fast path: a known duplicate consumes no sequence number and copies no payload.
    if (_idempotency.TryGetValue(token, out var existing))
    {
        return ValueTask.FromResult(new QueueEnqueueResult(existing.SequenceId, true));
    }

    var entry = new QueueEntry(
        sequenceId: Interlocked.Increment(ref _sequence).ToString(),
        jobId: message.JobId,
        payload: message.Payload.ToArray(),
        idempotencyKey: token,
        attempt: 1,
        enqueuedAt: _timeProvider.GetUtcNow(),
        traceId: message.TraceId,
        attributes: message.Attributes is null
            ? new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(0, StringComparer.Ordinal))
            : new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(message.Attributes, StringComparer.Ordinal)));

    // GetOrAdd registers the entry atomically. A separate "check, then indexer write" would let
    // two concurrent callers with the same token both pass the check and both be queued.
    var tracked = _idempotency.GetOrAdd(token, entry);
    if (!ReferenceEquals(tracked, entry))
    {
        // Another caller registered this token first; report ours as a duplicate.
        return ValueTask.FromResult(new QueueEnqueueResult(tracked.SequenceId, true));
    }

    _ready.Enqueue(entry);
    return ValueTask.FromResult(new QueueEnqueueResult(entry.SequenceId, false));
}
|
|
|
|
/// <summary>
/// Dequeues up to <c>request.BatchSize</c> ready entries, marks each as in flight and
/// wraps it in a lease for <c>request.Consumer</c>.
/// </summary>
/// <param name="request">Consumer name, batch size and lease duration for this call.</param>
/// <param name="cancellationToken">Not observed; the operation completes synchronously.</param>
/// <returns>The leases created; empty when nothing was ready.</returns>
public ValueTask<IReadOnlyList<IScanQueueLease>> LeaseAsync(QueueLeaseRequest request, CancellationToken cancellationToken = default)
{
    var leasedAt = _timeProvider.GetUtcNow();
    var batch = new List<IScanQueueLease>(request.BatchSize);

    while (batch.Count < request.BatchSize)
    {
        if (!_ready.TryDequeue(out var next))
        {
            break;
        }

        // The attempt number never falls below "deliveries so far + 1"; the delivery
        // counter is then synchronised to it.
        var attempt = Math.Max(next.Attempt, next.Deliveries + 1);
        next.Attempt = attempt;
        next.Deliveries = attempt;
        next.LastLeaseAt = leasedAt;
        _inFlight[next.SequenceId] = next;

        // The lease is built after the counters are updated so it carries the new attempt.
        batch.Add(new InMemoryLease(this, next, request.Consumer, leasedAt, request.LeaseDuration));
    }

    return ValueTask.FromResult<IReadOnlyList<IScanQueueLease>>(batch);
}
|
|
|
|
/// <summary>
/// Hands out fresh leases for in-flight entries whose last lease is at least
/// <c>options.MinIdleTime</c> old, up to <c>options.BatchSize</c> entries.
/// </summary>
/// <param name="options">Minimum idle time, batch size and the consumer that claims the entries.</param>
/// <param name="cancellationToken">Not observed; the operation completes synchronously.</param>
/// <returns>
/// One lease per claimed entry, each lasting the configured default lease duration;
/// empty when no in-flight entry has been idle long enough.
/// </returns>
public ValueTask<IReadOnlyList<IScanQueueLease>> ClaimExpiredLeasesAsync(QueueClaimOptions options, CancellationToken cancellationToken = default)
{
    var now = _timeProvider.GetUtcNow();

    // Materialise the selection first so the mutation below does not run inside a lazy query.
    var idleEntries = _inFlight.Values
        .Where(entry => now - entry.LastLeaseAt >= options.MinIdleTime)
        .Take(options.BatchSize)
        .ToList();

    var leases = new List<IScanQueueLease>(idleEntries.Count);
    foreach (var entry in idleEntries)
    {
        // Restart the idle clock for the claimed entry. Without this the entry keeps its
        // stale timestamp and every later call would hand it to yet another claimant.
        entry.LastLeaseAt = now;
        leases.Add(new InMemoryLease(this, entry, options.ClaimantConsumer, now, _options.DefaultLeaseDuration));
    }

    return ValueTask.FromResult<IReadOnlyList<IScanQueueLease>>(leases);
}
|
|
|
|
/// <summary>
/// Completes <paramref name="entry"/>: it is dropped from the in-flight set and its
/// idempotency token is forgotten.
/// </summary>
/// <returns>An already-completed task.</returns>
internal Task AcknowledgeAsync(QueueEntry entry)
{
    var sequenceId = entry.SequenceId;
    var dedupKey = entry.IdempotencyKey;

    _inFlight.TryRemove(sequenceId, out _);
    _idempotency.TryRemove(dedupKey, out _);

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Sets the expiry recorded on <paramref name="entry"/> to the current time plus
/// <paramref name="leaseDuration"/>.
/// </summary>
/// <returns>A completed task carrying the new expiry.</returns>
internal Task<DateTimeOffset> RenewAsync(QueueEntry entry, TimeSpan leaseDuration)
{
    var renewedUntil = _timeProvider.GetUtcNow() + leaseDuration;
    entry.LeaseExpiresAt = renewedUntil;

    return Task.FromResult(renewedUntil);
}
|
|
|
|
/// <summary>
/// Gives up the lease on <paramref name="entry"/>.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item>A <see cref="QueueReleaseDisposition.Retry"/> release of an entry that has already
/// reached the configured maximum delivery attempts dead-letters it.</item>
/// <item>Any other <see cref="QueueReleaseDisposition.Retry"/> release bumps the attempt
/// counter and puts the entry back on the ready queue.</item>
/// <item>Every other disposition drops the entry and forgets its idempotency token.</item>
/// </list>
/// </remarks>
/// <returns>An already-completed task.</returns>
internal Task ReleaseAsync(QueueEntry entry, QueueReleaseDisposition disposition)
{
    if (disposition != QueueReleaseDisposition.Retry)
    {
        _inFlight.TryRemove(entry.SequenceId, out _);
        _idempotency.TryRemove(entry.IdempotencyKey, out _);
        return Task.CompletedTask;
    }

    if (entry.Attempt >= _options.MaxDeliveryAttempts)
    {
        return DeadLetterAsync(entry, $"max-delivery-attempts:{entry.Attempt}");
    }

    // Remove the in-flight registration BEFORE the entry becomes visible on the ready queue.
    // In the opposite order a concurrent LeaseAsync could dequeue the entry and register it
    // in-flight again, only for the removal here to wipe out that new registration.
    _inFlight.TryRemove(entry.SequenceId, out _);
    entry.Attempt++;
    _ready.Enqueue(entry);

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Records <paramref name="reason"/> on <paramref name="entry"/>, stops tracking it
/// (in-flight set and idempotency token) and appends it to the dead-letter list.
/// </summary>
/// <returns>An already-completed task.</returns>
internal Task DeadLetterAsync(QueueEntry entry, string reason)
{
    entry.DeadLetterReason = reason;

    var sequenceId = entry.SequenceId;
    var dedupKey = entry.IdempotencyKey;
    _inFlight.TryRemove(sequenceId, out _);
    _idempotency.TryRemove(dedupKey, out _);

    // NOTE(review): _deadLetters is a plain List<T>, unlike the concurrent collections used
    // for the other state; concurrent dead-lettering is not synchronised here.
    _deadLetters.Add(entry);

    return Task.CompletedTask;
}
|
|
|
|
/// <summary>
/// Lease over a single <see cref="QueueEntry"/>. The message details are captured when the
/// lease is created. Acknowledge, release and dead-letter are forwarded to the owning queue
/// at most once per lease instance; whichever of them is called first wins and later calls
/// do nothing.
/// </summary>
private sealed class InMemoryLease : IScanQueueLease
{
    private readonly InMemoryScanQueue _owner;
    private readonly QueueEntry _entry;

    // 0 while the lease is open; flipped to 1 by the first completing operation.
    private int _completed;

    /// <summary>Captures the entry's current state as a lease held by <paramref name="consumer"/>.</summary>
    /// <param name="owner">Queue that receives the completion and renewal calls.</param>
    /// <param name="entry">Entry being leased.</param>
    /// <param name="consumer">Name of the consumer holding the lease.</param>
    /// <param name="now">Lease start; the expiry is this value plus <paramref name="leaseDuration"/>.</param>
    /// <param name="leaseDuration">Initial lease length.</param>
    public InMemoryLease(
        InMemoryScanQueue owner,
        QueueEntry entry,
        string consumer,
        DateTimeOffset now,
        TimeSpan leaseDuration)
    {
        _owner = owner;
        _entry = entry;

        Consumer = consumer;
        LeaseExpiresAt = now + leaseDuration;

        // Copied from the entry at lease time.
        MessageId = entry.SequenceId;
        JobId = entry.JobId;
        Payload = entry.Payload;
        Attempt = entry.Attempt;
        EnqueuedAt = entry.EnqueuedAt;
        IdempotencyKey = entry.IdempotencyKey;
        TraceId = entry.TraceId;
        Attributes = entry.Attributes;
    }

    public string MessageId { get; }

    public string JobId { get; }

    public ReadOnlyMemory<byte> Payload { get; }

    public int Attempt { get; }

    public DateTimeOffset EnqueuedAt { get; }

    public DateTimeOffset LeaseExpiresAt { get; private set; }

    public string Consumer { get; }

    public string? IdempotencyKey { get; }

    public string? TraceId { get; }

    public IReadOnlyDictionary<string, string> Attributes { get; }

    /// <summary>Acknowledges the entry on the owning queue if this lease is still open.</summary>
    public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
        => TryComplete() ? _owner.AcknowledgeAsync(_entry) : Task.CompletedTask;

    /// <summary>Asks the owning queue for a new expiry and stores it on this lease.</summary>
    public async Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
    {
        LeaseExpiresAt = await _owner.RenewAsync(_entry, leaseDuration).ConfigureAwait(false);
    }

    /// <summary>Releases the entry with <paramref name="disposition"/> if this lease is still open.</summary>
    public Task ReleaseAsync(QueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
        => TryComplete() ? _owner.ReleaseAsync(_entry, disposition) : Task.CompletedTask;

    /// <summary>Dead-letters the entry with <paramref name="reason"/> if this lease is still open.</summary>
    public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
        => TryComplete() ? _owner.DeadLetterAsync(_entry, reason) : Task.CompletedTask;

    /// <summary>Atomically marks the lease completed.</summary>
    /// <returns><c>true</c> only for the first caller.</returns>
    private bool TryComplete()
    {
        var previous = Interlocked.CompareExchange(ref _completed, 1, 0);
        return previous == 0;
    }
}
|
|
|
|
/// <summary>
/// Mutable record for one queued message: identity and content are fixed at construction,
/// while the delivery counters, lease timestamps and dead-letter reason are updated by the queue.
/// </summary>
internal sealed class QueueEntry
{
    /// <summary>Creates an entry whose <see cref="LastLeaseAt"/> starts equal to <paramref name="enqueuedAt"/>.</summary>
    public QueueEntry(
        string sequenceId,
        string jobId,
        byte[] payload,
        string idempotencyKey,
        int attempt,
        DateTimeOffset enqueuedAt,
        string? traceId,
        IReadOnlyDictionary<string, string> attributes)
    {
        // Identity.
        (SequenceId, JobId, IdempotencyKey, TraceId) = (sequenceId, jobId, idempotencyKey, traceId);

        // Content.
        (Payload, Attributes) = (payload, attributes);

        // Delivery state; Deliveries, LeaseExpiresAt and DeadLetterReason keep their defaults.
        Attempt = attempt;
        EnqueuedAt = enqueuedAt;
        LastLeaseAt = enqueuedAt;
    }

    /// <summary>Identifier assigned by the queue.</summary>
    public string SequenceId { get; }

    public string JobId { get; }

    public byte[] Payload { get; }

    public string IdempotencyKey { get; }

    /// <summary>Attempt number reported on the next lease.</summary>
    public int Attempt { get; set; }

    /// <summary>Delivery counter maintained by the queue when leasing.</summary>
    public int Deliveries { get; set; }

    public DateTimeOffset EnqueuedAt { get; }

    public DateTimeOffset LeaseExpiresAt { get; set; }

    /// <summary>Time of the most recent lease; initialised to <see cref="EnqueuedAt"/>.</summary>
    public DateTimeOffset LastLeaseAt { get; set; }

    public string? TraceId { get; }

    public IReadOnlyDictionary<string, string> Attributes { get; }

    /// <summary>Reason recorded when the entry is dead-lettered; <c>null</c> until then.</summary>
    public string? DeadLetterReason { get; set; }
}
|
|
}
|
|
}
|