up
This commit is contained in:
76
src/StellaOps.Notify.Queue/Redis/RedisNotifyDeliveryLease.cs
Normal file
76
src/StellaOps.Notify.Queue/Redis/RedisNotifyDeliveryLease.cs
Normal file
@@ -0,0 +1,76 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Notify.Queue.Redis;
|
||||
|
||||
internal sealed class RedisNotifyDeliveryLease : INotifyQueueLease<NotifyDeliveryQueueMessage>
|
||||
{
|
||||
private readonly RedisNotifyDeliveryQueue _queue;
|
||||
private int _completed;
|
||||
|
||||
internal RedisNotifyDeliveryLease(
|
||||
RedisNotifyDeliveryQueue queue,
|
||||
string messageId,
|
||||
NotifyDeliveryQueueMessage message,
|
||||
int attempt,
|
||||
DateTimeOffset enqueuedAt,
|
||||
DateTimeOffset leaseExpiresAt,
|
||||
string consumer,
|
||||
string? idempotencyKey,
|
||||
string partitionKey)
|
||||
{
|
||||
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
|
||||
MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId));
|
||||
Message = message ?? throw new ArgumentNullException(nameof(message));
|
||||
Attempt = attempt;
|
||||
EnqueuedAt = enqueuedAt;
|
||||
LeaseExpiresAt = leaseExpiresAt;
|
||||
Consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
|
||||
IdempotencyKey = idempotencyKey ?? message.IdempotencyKey;
|
||||
PartitionKey = partitionKey ?? message.ChannelId;
|
||||
}
|
||||
|
||||
public string MessageId { get; }
|
||||
|
||||
public int Attempt { get; internal set; }
|
||||
|
||||
public DateTimeOffset EnqueuedAt { get; }
|
||||
|
||||
public DateTimeOffset LeaseExpiresAt { get; private set; }
|
||||
|
||||
public string Consumer { get; }
|
||||
|
||||
public string Stream => Message.Stream;
|
||||
|
||||
public string TenantId => Message.TenantId;
|
||||
|
||||
public string PartitionKey { get; }
|
||||
|
||||
public string IdempotencyKey { get; }
|
||||
|
||||
public string? TraceId => Message.TraceId;
|
||||
|
||||
public IReadOnlyDictionary<string, string> Attributes => Message.Attributes;
|
||||
|
||||
public NotifyDeliveryQueueMessage Message { get; }
|
||||
|
||||
public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
|
||||
=> _queue.AcknowledgeAsync(this, cancellationToken);
|
||||
|
||||
public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
|
||||
=> _queue.RenewLeaseAsync(this, leaseDuration, cancellationToken);
|
||||
|
||||
public Task ReleaseAsync(NotifyQueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
|
||||
=> _queue.ReleaseAsync(this, disposition, cancellationToken);
|
||||
|
||||
public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
|
||||
=> _queue.DeadLetterAsync(this, reason, cancellationToken);
|
||||
|
||||
internal bool TryBeginCompletion()
|
||||
=> Interlocked.CompareExchange(ref _completed, 1, 0) == 0;
|
||||
|
||||
internal void RefreshLease(DateTimeOffset expiresAt)
|
||||
=> LeaseExpiresAt = expiresAt;
|
||||
}
|
||||
788
src/StellaOps.Notify.Queue/Redis/RedisNotifyDeliveryQueue.cs
Normal file
788
src/StellaOps.Notify.Queue/Redis/RedisNotifyDeliveryQueue.cs
Normal file
@@ -0,0 +1,788 @@
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StackExchange.Redis;
|
||||
using StellaOps.Notify.Models;
|
||||
|
||||
namespace StellaOps.Notify.Queue.Redis;
|
||||
|
||||
internal sealed class RedisNotifyDeliveryQueue : INotifyDeliveryQueue, IAsyncDisposable
|
||||
{
|
||||
private const string TransportName = "redis";
|
||||
|
||||
private readonly NotifyDeliveryQueueOptions _options;
|
||||
private readonly NotifyRedisDeliveryQueueOptions _redisOptions;
|
||||
private readonly ILogger<RedisNotifyDeliveryQueue> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly Func<ConfigurationOptions, Task<IConnectionMultiplexer>> _connectionFactory;
|
||||
private readonly SemaphoreSlim _connectionLock = new(1, 1);
|
||||
private readonly SemaphoreSlim _groupLock = new(1, 1);
|
||||
private readonly ConcurrentDictionary<string, bool> _streamInitialized = new(StringComparer.Ordinal);
|
||||
|
||||
private IConnectionMultiplexer? _connection;
|
||||
private bool _disposed;
|
||||
|
||||
public RedisNotifyDeliveryQueue(
|
||||
NotifyDeliveryQueueOptions options,
|
||||
NotifyRedisDeliveryQueueOptions redisOptions,
|
||||
ILogger<RedisNotifyDeliveryQueue> logger,
|
||||
TimeProvider timeProvider,
|
||||
Func<ConfigurationOptions, Task<IConnectionMultiplexer>>? connectionFactory = null)
|
||||
{
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_redisOptions = redisOptions ?? throw new ArgumentNullException(nameof(redisOptions));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_connectionFactory = connectionFactory ?? (async config =>
|
||||
{
|
||||
var connection = await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false);
|
||||
return (IConnectionMultiplexer)connection;
|
||||
});
|
||||
|
||||
if (string.IsNullOrWhiteSpace(_redisOptions.ConnectionString))
|
||||
{
|
||||
throw new InvalidOperationException("Redis connection string must be configured for the Notify delivery queue.");
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask<NotifyQueueEnqueueResult> PublishAsync(
|
||||
NotifyDeliveryQueueMessage message,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(message);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var attempt = 1;
|
||||
var entries = BuildEntries(message, now, attempt);
|
||||
|
||||
var messageId = await AddToStreamAsync(
|
||||
db,
|
||||
_redisOptions.StreamName,
|
||||
entries)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var idempotencyKey = BuildIdempotencyKey(message.IdempotencyKey);
|
||||
var stored = await db.StringSetAsync(
|
||||
idempotencyKey,
|
||||
messageId,
|
||||
when: When.NotExists,
|
||||
expiry: _options.ClaimIdleThreshold)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (!stored)
|
||||
{
|
||||
await db.StreamDeleteAsync(
|
||||
_redisOptions.StreamName,
|
||||
new RedisValue[] { messageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var existing = await db.StringGetAsync(idempotencyKey).ConfigureAwait(false);
|
||||
var duplicateId = existing.IsNullOrEmpty ? messageId : existing;
|
||||
|
||||
NotifyQueueMetrics.RecordDeduplicated(TransportName, _redisOptions.StreamName);
|
||||
_logger.LogDebug(
|
||||
"Duplicate Notify delivery enqueue detected for delivery {DeliveryId}.",
|
||||
message.Delivery.DeliveryId);
|
||||
|
||||
return new NotifyQueueEnqueueResult(duplicateId.ToString()!, true);
|
||||
}
|
||||
|
||||
NotifyQueueMetrics.RecordEnqueued(TransportName, _redisOptions.StreamName);
|
||||
_logger.LogDebug(
|
||||
"Enqueued Notify delivery {DeliveryId} (channel {ChannelId}) into stream {Stream}.",
|
||||
message.Delivery.DeliveryId,
|
||||
message.ChannelId,
|
||||
_redisOptions.StreamName);
|
||||
|
||||
return new NotifyQueueEnqueueResult(messageId.ToString()!, false);
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<INotifyQueueLease<NotifyDeliveryQueueMessage>>> LeaseAsync(
|
||||
NotifyQueueLeaseRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var entries = await db.StreamReadGroupAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
request.Consumer,
|
||||
StreamPosition.NewMessages,
|
||||
request.BatchSize)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (entries is null || entries.Length == 0)
|
||||
{
|
||||
return Array.Empty<INotifyQueueLease<NotifyDeliveryQueueMessage>>();
|
||||
}
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var leases = new List<INotifyQueueLease<NotifyDeliveryQueueMessage>>(entries.Length);
|
||||
|
||||
foreach (var entry in entries)
|
||||
{
|
||||
var lease = TryMapLease(entry, request.Consumer, now, request.LeaseDuration, attemptOverride: null);
|
||||
if (lease is null)
|
||||
{
|
||||
await AckPoisonAsync(db, entry.Id).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
leases.Add(lease);
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<INotifyQueueLease<NotifyDeliveryQueueMessage>>> ClaimExpiredAsync(
|
||||
NotifyQueueClaimOptions options,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var pending = await db.StreamPendingMessagesAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
options.BatchSize,
|
||||
RedisValue.Null,
|
||||
(long)options.MinIdleTime.TotalMilliseconds)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (pending is null || pending.Length == 0)
|
||||
{
|
||||
return Array.Empty<INotifyQueueLease<NotifyDeliveryQueueMessage>>();
|
||||
}
|
||||
|
||||
var eligible = pending
|
||||
.Where(p => p.IdleTimeInMilliseconds >= options.MinIdleTime.TotalMilliseconds)
|
||||
.ToArray();
|
||||
|
||||
if (eligible.Length == 0)
|
||||
{
|
||||
return Array.Empty<INotifyQueueLease<NotifyDeliveryQueueMessage>>();
|
||||
}
|
||||
|
||||
var messageIds = eligible
|
||||
.Select(static p => (RedisValue)p.MessageId)
|
||||
.ToArray();
|
||||
|
||||
var entries = await db.StreamClaimAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
options.ClaimantConsumer,
|
||||
0,
|
||||
messageIds)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (entries is null || entries.Length == 0)
|
||||
{
|
||||
return Array.Empty<INotifyQueueLease<NotifyDeliveryQueueMessage>>();
|
||||
}
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var attemptLookup = eligible
|
||||
.Where(static info => !info.MessageId.IsNullOrEmpty)
|
||||
.ToDictionary(
|
||||
info => info.MessageId!.ToString(),
|
||||
info => (int)Math.Max(1, info.DeliveryCount),
|
||||
StringComparer.Ordinal);
|
||||
|
||||
var leases = new List<INotifyQueueLease<NotifyDeliveryQueueMessage>>(entries.Length);
|
||||
|
||||
foreach (var entry in entries)
|
||||
{
|
||||
attemptLookup.TryGetValue(entry.Id.ToString(), out var attempt);
|
||||
var lease = TryMapLease(entry, options.ClaimantConsumer, now, _options.DefaultLeaseDuration, attempt == 0 ? null : attempt);
|
||||
if (lease is null)
|
||||
{
|
||||
await AckPoisonAsync(db, entry.Id).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
leases.Add(lease);
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
if (_connection is not null)
|
||||
{
|
||||
await _connection.CloseAsync().ConfigureAwait(false);
|
||||
_connection.Dispose();
|
||||
}
|
||||
|
||||
_connectionLock.Dispose();
|
||||
_groupLock.Dispose();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
internal async Task AcknowledgeAsync(
|
||||
RedisNotifyDeliveryLease lease,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await db.StreamDeleteAsync(
|
||||
_redisOptions.StreamName,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
NotifyQueueMetrics.RecordAck(TransportName, _redisOptions.StreamName);
|
||||
_logger.LogDebug(
|
||||
"Acknowledged Notify delivery {DeliveryId} (message {MessageId}).",
|
||||
lease.Message.Delivery.DeliveryId,
|
||||
lease.MessageId);
|
||||
}
|
||||
|
||||
internal async Task RenewLeaseAsync(
|
||||
RedisNotifyDeliveryLease lease,
|
||||
TimeSpan leaseDuration,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await db.StreamClaimAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
lease.Consumer,
|
||||
0,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var expires = _timeProvider.GetUtcNow().Add(leaseDuration);
|
||||
lease.RefreshLease(expires);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Renewed Notify delivery lease {DeliveryId} until {Expires:u}.",
|
||||
lease.Message.Delivery.DeliveryId,
|
||||
expires);
|
||||
}
|
||||
|
||||
internal async Task ReleaseAsync(
|
||||
RedisNotifyDeliveryLease lease,
|
||||
NotifyQueueReleaseDisposition disposition,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (disposition == NotifyQueueReleaseDisposition.Retry
|
||||
&& lease.Attempt >= _options.MaxDeliveryAttempts)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Notify delivery {DeliveryId} reached max delivery attempts ({Attempts}); moving to dead-letter stream.",
|
||||
lease.Message.Delivery.DeliveryId,
|
||||
lease.Attempt);
|
||||
|
||||
await DeadLetterAsync(
|
||||
lease,
|
||||
$"max-delivery-attempts:{lease.Attempt}",
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
await db.StreamDeleteAsync(
|
||||
_redisOptions.StreamName,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (disposition == NotifyQueueReleaseDisposition.Retry)
|
||||
{
|
||||
NotifyQueueMetrics.RecordRetry(TransportName, _redisOptions.StreamName);
|
||||
|
||||
var delay = CalculateBackoff(lease.Attempt);
|
||||
if (delay > TimeSpan.Zero)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var entries = BuildEntries(lease.Message, now, lease.Attempt + 1);
|
||||
|
||||
await AddToStreamAsync(
|
||||
db,
|
||||
_redisOptions.StreamName,
|
||||
entries)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
NotifyQueueMetrics.RecordEnqueued(TransportName, _redisOptions.StreamName);
|
||||
_logger.LogInformation(
|
||||
"Retrying Notify delivery {DeliveryId} (attempt {Attempt}).",
|
||||
lease.Message.Delivery.DeliveryId,
|
||||
lease.Attempt + 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
NotifyQueueMetrics.RecordAck(TransportName, _redisOptions.StreamName);
|
||||
_logger.LogInformation(
|
||||
"Abandoned Notify delivery {DeliveryId} after {Attempt} attempt(s).",
|
||||
lease.Message.Delivery.DeliveryId,
|
||||
lease.Attempt);
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task DeadLetterAsync(
|
||||
RedisNotifyDeliveryLease lease,
|
||||
string reason,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await db.StreamAcknowledgeAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await db.StreamDeleteAsync(
|
||||
_redisOptions.StreamName,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await EnsureDeadLetterStreamAsync(db, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var entries = BuildDeadLetterEntries(lease, reason);
|
||||
await AddToStreamAsync(
|
||||
db,
|
||||
_redisOptions.DeadLetterStreamName,
|
||||
entries)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
NotifyQueueMetrics.RecordDeadLetter(TransportName, _redisOptions.DeadLetterStreamName);
|
||||
_logger.LogError(
|
||||
"Dead-lettered Notify delivery {DeliveryId} (attempt {Attempt}): {Reason}",
|
||||
lease.Message.Delivery.DeliveryId,
|
||||
lease.Attempt,
|
||||
reason);
|
||||
}
|
||||
|
||||
internal async ValueTask PingAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
_ = await db.PingAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<IDatabase> GetDatabaseAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_connection is { IsConnected: true })
|
||||
{
|
||||
return _connection.GetDatabase(_redisOptions.Database ?? -1);
|
||||
}
|
||||
|
||||
await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_connection is { IsConnected: true })
|
||||
{
|
||||
return _connection.GetDatabase(_redisOptions.Database ?? -1);
|
||||
}
|
||||
|
||||
var configuration = ConfigurationOptions.Parse(_redisOptions.ConnectionString!);
|
||||
configuration.AbortOnConnectFail = false;
|
||||
if (_redisOptions.Database.HasValue)
|
||||
{
|
||||
configuration.DefaultDatabase = _redisOptions.Database.Value;
|
||||
}
|
||||
|
||||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
timeoutCts.CancelAfter(_redisOptions.InitializationTimeout);
|
||||
|
||||
_connection = await _connectionFactory(configuration).WaitAsync(timeoutCts.Token).ConfigureAwait(false);
|
||||
return _connection.GetDatabase(_redisOptions.Database ?? -1);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureConsumerGroupAsync(
|
||||
IDatabase database,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (_streamInitialized.ContainsKey(_redisOptions.StreamName))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await _groupLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_streamInitialized.ContainsKey(_redisOptions.StreamName))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await database.StreamCreateConsumerGroupAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
StreamPosition.Beginning,
|
||||
createStream: true)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// group already exists
|
||||
}
|
||||
|
||||
_streamInitialized[_redisOptions.StreamName] = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_groupLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureDeadLetterStreamAsync(
|
||||
IDatabase database,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (_streamInitialized.ContainsKey(_redisOptions.DeadLetterStreamName))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await _groupLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_streamInitialized.ContainsKey(_redisOptions.DeadLetterStreamName))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await database.StreamCreateConsumerGroupAsync(
|
||||
_redisOptions.DeadLetterStreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
StreamPosition.Beginning,
|
||||
createStream: true)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
|
||||
_streamInitialized[_redisOptions.DeadLetterStreamName] = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_groupLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private NameValueEntry[] BuildEntries(
|
||||
NotifyDeliveryQueueMessage message,
|
||||
DateTimeOffset enqueuedAt,
|
||||
int attempt)
|
||||
{
|
||||
var json = NotifyCanonicalJsonSerializer.Serialize(message.Delivery);
|
||||
var attributeCount = message.Attributes.Count;
|
||||
|
||||
var entries = ArrayPool<NameValueEntry>.Shared.Rent(8 + attributeCount);
|
||||
var index = 0;
|
||||
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.Payload, json);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.DeliveryId, message.Delivery.DeliveryId);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelId, message.ChannelId);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelType, message.ChannelType.ToString());
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.Tenant, message.Delivery.TenantId);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.Attempt, attempt);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds());
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.IdempotencyKey, message.IdempotencyKey);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.TraceId, message.TraceId ?? string.Empty);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.PartitionKey, message.PartitionKey);
|
||||
|
||||
if (attributeCount > 0)
|
||||
{
|
||||
foreach (var kvp in message.Attributes)
|
||||
{
|
||||
entries[index++] = new NameValueEntry(
|
||||
NotifyQueueFields.AttributePrefix + kvp.Key,
|
||||
kvp.Value);
|
||||
}
|
||||
}
|
||||
|
||||
return entries.AsSpan(0, index).ToArray();
|
||||
}
|
||||
|
||||
private NameValueEntry[] BuildDeadLetterEntries(RedisNotifyDeliveryLease lease, string reason)
|
||||
{
|
||||
var json = NotifyCanonicalJsonSerializer.Serialize(lease.Message.Delivery);
|
||||
var attributes = lease.Message.Attributes;
|
||||
var attributeCount = attributes.Count;
|
||||
|
||||
var entries = ArrayPool<NameValueEntry>.Shared.Rent(9 + attributeCount);
|
||||
var index = 0;
|
||||
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.Payload, json);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.DeliveryId, lease.Message.Delivery.DeliveryId);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelId, lease.Message.ChannelId);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.ChannelType, lease.Message.ChannelType.ToString());
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.Tenant, lease.Message.Delivery.TenantId);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.Attempt, lease.Attempt);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.IdempotencyKey, lease.Message.IdempotencyKey);
|
||||
entries[index++] = new NameValueEntry("deadletter-reason", reason);
|
||||
entries[index++] = new NameValueEntry(NotifyQueueFields.TraceId, lease.Message.TraceId ?? string.Empty);
|
||||
|
||||
foreach (var kvp in attributes)
|
||||
{
|
||||
entries[index++] = new NameValueEntry(
|
||||
NotifyQueueFields.AttributePrefix + kvp.Key,
|
||||
kvp.Value);
|
||||
}
|
||||
|
||||
return entries.AsSpan(0, index).ToArray();
|
||||
}
|
||||
|
||||
private RedisNotifyDeliveryLease? TryMapLease(
|
||||
StreamEntry entry,
|
||||
string consumer,
|
||||
DateTimeOffset now,
|
||||
TimeSpan leaseDuration,
|
||||
int? attemptOverride)
|
||||
{
|
||||
if (entry.Values is null || entry.Values.Length == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
string? payload = null;
|
||||
string? deliveryId = null;
|
||||
string? channelId = null;
|
||||
string? channelTypeRaw = null;
|
||||
string? traceId = null;
|
||||
string? idempotency = null;
|
||||
string? partitionKey = null;
|
||||
long? enqueuedAtUnix = null;
|
||||
var attempt = attemptOverride ?? 1;
|
||||
var attributes = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
|
||||
foreach (var value in entry.Values)
|
||||
{
|
||||
var name = value.Name.ToString();
|
||||
var data = value.Value;
|
||||
if (name.Equals(NotifyQueueFields.Payload, StringComparison.Ordinal))
|
||||
{
|
||||
payload = data.ToString();
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.DeliveryId, StringComparison.Ordinal))
|
||||
{
|
||||
deliveryId = data.ToString();
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.ChannelId, StringComparison.Ordinal))
|
||||
{
|
||||
channelId = data.ToString();
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.ChannelType, StringComparison.Ordinal))
|
||||
{
|
||||
channelTypeRaw = data.ToString();
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.Attempt, StringComparison.Ordinal))
|
||||
{
|
||||
if (int.TryParse(data.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed))
|
||||
{
|
||||
attempt = Math.Max(parsed, attempt);
|
||||
}
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.EnqueuedAt, StringComparison.Ordinal))
|
||||
{
|
||||
if (long.TryParse(data.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var unix))
|
||||
{
|
||||
enqueuedAtUnix = unix;
|
||||
}
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.IdempotencyKey, StringComparison.Ordinal))
|
||||
{
|
||||
idempotency = data.ToString();
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.TraceId, StringComparison.Ordinal))
|
||||
{
|
||||
var text = data.ToString();
|
||||
traceId = string.IsNullOrWhiteSpace(text) ? null : text;
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.PartitionKey, StringComparison.Ordinal))
|
||||
{
|
||||
partitionKey = data.ToString();
|
||||
}
|
||||
else if (name.StartsWith(NotifyQueueFields.AttributePrefix, StringComparison.Ordinal))
|
||||
{
|
||||
attributes[name[NotifyQueueFields.AttributePrefix.Length..]] = data.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
if (payload is null || deliveryId is null || channelId is null || channelTypeRaw is null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
NotifyDelivery delivery;
|
||||
try
|
||||
{
|
||||
delivery = NotifyCanonicalJsonSerializer.Deserialize<NotifyDelivery>(payload);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"Failed to deserialize Notify delivery payload for entry {EntryId}.",
|
||||
entry.Id.ToString());
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!Enum.TryParse<NotifyChannelType>(channelTypeRaw, ignoreCase: true, out var channelType))
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Unknown channel type '{ChannelType}' for delivery {DeliveryId}; acknowledging as poison.",
|
||||
channelTypeRaw,
|
||||
deliveryId);
|
||||
return null;
|
||||
}
|
||||
|
||||
var attributeView = attributes.Count == 0
|
||||
? EmptyReadOnlyDictionary<string, string>.Instance
|
||||
: new ReadOnlyDictionary<string, string>(attributes);
|
||||
|
||||
var enqueuedAt = enqueuedAtUnix is null
|
||||
? now
|
||||
: DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value);
|
||||
|
||||
var message = new NotifyDeliveryQueueMessage(
|
||||
delivery,
|
||||
channelId,
|
||||
channelType,
|
||||
_redisOptions.StreamName,
|
||||
traceId,
|
||||
attributeView);
|
||||
|
||||
var leaseExpires = now.Add(leaseDuration);
|
||||
|
||||
return new RedisNotifyDeliveryLease(
|
||||
this,
|
||||
entry.Id.ToString(),
|
||||
message,
|
||||
attempt,
|
||||
enqueuedAt,
|
||||
leaseExpires,
|
||||
consumer,
|
||||
idempotency,
|
||||
partitionKey ?? channelId);
|
||||
}
|
||||
|
||||
private async Task AckPoisonAsync(IDatabase database, RedisValue messageId)
|
||||
{
|
||||
await database.StreamAcknowledgeAsync(
|
||||
_redisOptions.StreamName,
|
||||
_redisOptions.ConsumerGroup,
|
||||
new RedisValue[] { messageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await database.StreamDeleteAsync(
|
||||
_redisOptions.StreamName,
|
||||
new RedisValue[] { messageId })
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static async Task<RedisValue> AddToStreamAsync(
|
||||
IDatabase database,
|
||||
string stream,
|
||||
IReadOnlyList<NameValueEntry> entries)
|
||||
{
|
||||
return await database.StreamAddAsync(
|
||||
stream,
|
||||
entries.ToArray())
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private string BuildIdempotencyKey(string token)
|
||||
=> string.Concat(_redisOptions.IdempotencyKeyPrefix, token);
|
||||
|
||||
private TimeSpan CalculateBackoff(int attempt)
|
||||
{
|
||||
var initial = _options.RetryInitialBackoff > TimeSpan.Zero
|
||||
? _options.RetryInitialBackoff
|
||||
: TimeSpan.FromSeconds(1);
|
||||
|
||||
if (initial <= TimeSpan.Zero)
|
||||
{
|
||||
return TimeSpan.Zero;
|
||||
}
|
||||
|
||||
if (attempt <= 1)
|
||||
{
|
||||
return initial;
|
||||
}
|
||||
|
||||
var max = _options.RetryMaxBackoff > TimeSpan.Zero
|
||||
? _options.RetryMaxBackoff
|
||||
: initial;
|
||||
|
||||
var exponent = attempt - 1;
|
||||
var scaledTicks = initial.Ticks * Math.Pow(2, exponent - 1);
|
||||
var cappedTicks = Math.Min(max.Ticks, scaledTicks);
|
||||
var resultTicks = Math.Max(initial.Ticks, (long)cappedTicks);
|
||||
return TimeSpan.FromTicks(resultTicks);
|
||||
}
|
||||
}
|
||||
76
src/StellaOps.Notify.Queue/Redis/RedisNotifyEventLease.cs
Normal file
76
src/StellaOps.Notify.Queue/Redis/RedisNotifyEventLease.cs
Normal file
@@ -0,0 +1,76 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Notify.Queue.Redis;
|
||||
|
||||
internal sealed class RedisNotifyEventLease : INotifyQueueLease<NotifyQueueEventMessage>
|
||||
{
|
||||
private readonly RedisNotifyEventQueue _queue;
|
||||
private int _completed;
|
||||
|
||||
internal RedisNotifyEventLease(
|
||||
RedisNotifyEventQueue queue,
|
||||
NotifyRedisEventStreamOptions streamOptions,
|
||||
string messageId,
|
||||
NotifyQueueEventMessage message,
|
||||
int attempt,
|
||||
string consumer,
|
||||
DateTimeOffset enqueuedAt,
|
||||
DateTimeOffset leaseExpiresAt)
|
||||
{
|
||||
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
|
||||
StreamOptions = streamOptions ?? throw new ArgumentNullException(nameof(streamOptions));
|
||||
MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId));
|
||||
Message = message ?? throw new ArgumentNullException(nameof(message));
|
||||
Attempt = attempt;
|
||||
Consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
|
||||
EnqueuedAt = enqueuedAt;
|
||||
LeaseExpiresAt = leaseExpiresAt;
|
||||
}
|
||||
|
||||
internal NotifyRedisEventStreamOptions StreamOptions { get; }
|
||||
|
||||
public string MessageId { get; }
|
||||
|
||||
public int Attempt { get; }
|
||||
|
||||
public DateTimeOffset EnqueuedAt { get; }
|
||||
|
||||
public DateTimeOffset LeaseExpiresAt { get; private set; }
|
||||
|
||||
public string Consumer { get; }
|
||||
|
||||
public string Stream => StreamOptions.Stream;
|
||||
|
||||
public string TenantId => Message.TenantId;
|
||||
|
||||
public string? PartitionKey => Message.PartitionKey;
|
||||
|
||||
public string IdempotencyKey => Message.IdempotencyKey;
|
||||
|
||||
public string? TraceId => Message.TraceId;
|
||||
|
||||
public IReadOnlyDictionary<string, string> Attributes => Message.Attributes;
|
||||
|
||||
public NotifyQueueEventMessage Message { get; }
|
||||
|
||||
public Task AcknowledgeAsync(CancellationToken cancellationToken = default)
|
||||
=> _queue.AcknowledgeAsync(this, cancellationToken);
|
||||
|
||||
public Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default)
|
||||
=> _queue.RenewLeaseAsync(this, leaseDuration, cancellationToken);
|
||||
|
||||
public Task ReleaseAsync(NotifyQueueReleaseDisposition disposition, CancellationToken cancellationToken = default)
|
||||
=> _queue.ReleaseAsync(this, disposition, cancellationToken);
|
||||
|
||||
public Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default)
|
||||
=> _queue.DeadLetterAsync(this, reason, cancellationToken);
|
||||
|
||||
internal bool TryBeginCompletion()
|
||||
=> Interlocked.CompareExchange(ref _completed, 1, 0) == 0;
|
||||
|
||||
internal void RefreshLease(DateTimeOffset expiresAt)
|
||||
=> LeaseExpiresAt = expiresAt;
|
||||
}
|
||||
655
src/StellaOps.Notify.Queue/Redis/RedisNotifyEventQueue.cs
Normal file
655
src/StellaOps.Notify.Queue/Redis/RedisNotifyEventQueue.cs
Normal file
@@ -0,0 +1,655 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StackExchange.Redis;
|
||||
using StellaOps.Notify.Models;
|
||||
|
||||
namespace StellaOps.Notify.Queue.Redis;
|
||||
|
||||
internal sealed class RedisNotifyEventQueue : INotifyEventQueue, IAsyncDisposable
|
||||
{
|
||||
private const string TransportName = "redis";
|
||||
|
||||
private readonly NotifyEventQueueOptions _options;
|
||||
private readonly NotifyRedisEventQueueOptions _redisOptions;
|
||||
private readonly ILogger<RedisNotifyEventQueue> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly Func<ConfigurationOptions, Task<IConnectionMultiplexer>> _connectionFactory;
|
||||
private readonly SemaphoreSlim _connectionLock = new(1, 1);
|
||||
private readonly SemaphoreSlim _groupInitLock = new(1, 1);
|
||||
private readonly IReadOnlyDictionary<string, NotifyRedisEventStreamOptions> _streamsByName;
|
||||
private readonly ConcurrentDictionary<string, bool> _initializedStreams = new(StringComparer.Ordinal);
|
||||
|
||||
private IConnectionMultiplexer? _connection;
|
||||
private bool _disposed;
|
||||
|
||||
public RedisNotifyEventQueue(
|
||||
NotifyEventQueueOptions options,
|
||||
NotifyRedisEventQueueOptions redisOptions,
|
||||
ILogger<RedisNotifyEventQueue> logger,
|
||||
TimeProvider timeProvider,
|
||||
Func<ConfigurationOptions, Task<IConnectionMultiplexer>>? connectionFactory = null)
|
||||
{
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_redisOptions = redisOptions ?? throw new ArgumentNullException(nameof(redisOptions));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_connectionFactory = connectionFactory ?? (async config =>
|
||||
{
|
||||
var connection = await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false);
|
||||
return (IConnectionMultiplexer)connection;
|
||||
});
|
||||
|
||||
if (string.IsNullOrWhiteSpace(_redisOptions.ConnectionString))
|
||||
{
|
||||
throw new InvalidOperationException("Redis connection string must be configured for Notify event queue.");
|
||||
}
|
||||
|
||||
_streamsByName = _redisOptions.Streams.ToDictionary(
|
||||
stream => stream.Stream,
|
||||
stream => stream,
|
||||
StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
public async ValueTask<NotifyQueueEnqueueResult> PublishAsync(
|
||||
NotifyQueueEventMessage message,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(message);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var streamOptions = GetStreamOptions(message.Stream);
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
await EnsureStreamInitializedAsync(db, streamOptions, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var entries = BuildEntries(message, now, attempt: 1);
|
||||
|
||||
var messageId = await AddToStreamAsync(
|
||||
db,
|
||||
streamOptions,
|
||||
entries)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var idempotencyToken = string.IsNullOrWhiteSpace(message.IdempotencyKey)
|
||||
? message.Event.EventId.ToString("N")
|
||||
: message.IdempotencyKey;
|
||||
|
||||
var idempotencyKey = streamOptions.IdempotencyKeyPrefix + idempotencyToken;
|
||||
var stored = await db.StringSetAsync(
|
||||
idempotencyKey,
|
||||
messageId,
|
||||
when: When.NotExists,
|
||||
expiry: _redisOptions.IdempotencyWindow)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (!stored)
|
||||
{
|
||||
await db.StreamDeleteAsync(
|
||||
streamOptions.Stream,
|
||||
new RedisValue[] { messageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var existing = await db.StringGetAsync(idempotencyKey).ConfigureAwait(false);
|
||||
var duplicateId = existing.IsNullOrEmpty ? messageId : existing;
|
||||
|
||||
_logger.LogDebug(
|
||||
"Duplicate Notify event enqueue detected for idempotency token {Token}; returning existing stream id {StreamId}.",
|
||||
idempotencyToken,
|
||||
duplicateId.ToString());
|
||||
|
||||
NotifyQueueMetrics.RecordDeduplicated(TransportName, streamOptions.Stream);
|
||||
return new NotifyQueueEnqueueResult(duplicateId.ToString()!, true);
|
||||
}
|
||||
|
||||
NotifyQueueMetrics.RecordEnqueued(TransportName, streamOptions.Stream);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Enqueued Notify event {EventId} for tenant {Tenant} on stream {Stream} (id {StreamId}).",
|
||||
message.Event.EventId,
|
||||
message.TenantId,
|
||||
streamOptions.Stream,
|
||||
messageId.ToString());
|
||||
|
||||
return new NotifyQueueEnqueueResult(messageId.ToString()!, false);
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<INotifyQueueLease<NotifyQueueEventMessage>>> LeaseAsync(
|
||||
NotifyQueueLeaseRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var leases = new List<INotifyQueueLease<NotifyQueueEventMessage>>(request.BatchSize);
|
||||
|
||||
foreach (var streamOptions in _streamsByName.Values)
|
||||
{
|
||||
await EnsureStreamInitializedAsync(db, streamOptions, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var remaining = request.BatchSize - leases.Count;
|
||||
if (remaining <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
var entries = await db.StreamReadGroupAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
request.Consumer,
|
||||
StreamPosition.NewMessages,
|
||||
remaining)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (entries is null || entries.Length == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach (var entry in entries)
|
||||
{
|
||||
var lease = TryMapLease(
|
||||
streamOptions,
|
||||
entry,
|
||||
request.Consumer,
|
||||
now,
|
||||
request.LeaseDuration,
|
||||
attemptOverride: null);
|
||||
|
||||
if (lease is null)
|
||||
{
|
||||
await AckPoisonAsync(db, streamOptions, entry.Id).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
leases.Add(lease);
|
||||
|
||||
if (leases.Count >= request.BatchSize)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyList<INotifyQueueLease<NotifyQueueEventMessage>>> ClaimExpiredAsync(
|
||||
NotifyQueueClaimOptions options,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var leases = new List<INotifyQueueLease<NotifyQueueEventMessage>>(options.BatchSize);
|
||||
|
||||
foreach (var streamOptions in _streamsByName.Values)
|
||||
{
|
||||
await EnsureStreamInitializedAsync(db, streamOptions, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var pending = await db.StreamPendingMessagesAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
options.BatchSize,
|
||||
RedisValue.Null,
|
||||
(long)options.MinIdleTime.TotalMilliseconds)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (pending is null || pending.Length == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var eligible = pending
|
||||
.Where(p => p.IdleTimeInMilliseconds >= options.MinIdleTime.TotalMilliseconds)
|
||||
.ToArray();
|
||||
|
||||
if (eligible.Length == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var messageIds = eligible
|
||||
.Select(static p => (RedisValue)p.MessageId)
|
||||
.ToArray();
|
||||
|
||||
var entries = await db.StreamClaimAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
options.ClaimantConsumer,
|
||||
0,
|
||||
messageIds)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (entries is null || entries.Length == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var attemptById = eligible
|
||||
.Where(static info => !info.MessageId.IsNullOrEmpty)
|
||||
.ToDictionary(
|
||||
info => info.MessageId!.ToString(),
|
||||
info => (int)Math.Max(1, info.DeliveryCount),
|
||||
StringComparer.Ordinal);
|
||||
|
||||
foreach (var entry in entries)
|
||||
{
|
||||
var entryId = entry.Id.ToString();
|
||||
attemptById.TryGetValue(entryId, out var attempt);
|
||||
|
||||
var lease = TryMapLease(
|
||||
streamOptions,
|
||||
entry,
|
||||
options.ClaimantConsumer,
|
||||
now,
|
||||
_options.DefaultLeaseDuration,
|
||||
attempt == 0 ? null : attempt);
|
||||
|
||||
if (lease is null)
|
||||
{
|
||||
await AckPoisonAsync(db, streamOptions, entry.Id).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
leases.Add(lease);
|
||||
if (leases.Count >= options.BatchSize)
|
||||
{
|
||||
return leases;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return leases;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
if (_connection is not null)
|
||||
{
|
||||
await _connection.CloseAsync();
|
||||
_connection.Dispose();
|
||||
}
|
||||
|
||||
_connectionLock.Dispose();
|
||||
_groupInitLock.Dispose();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
internal async Task AcknowledgeAsync(
|
||||
RedisNotifyEventLease lease,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
var streamOptions = lease.StreamOptions;
|
||||
|
||||
await db.StreamAcknowledgeAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await db.StreamDeleteAsync(
|
||||
streamOptions.Stream,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
NotifyQueueMetrics.RecordAck(TransportName, streamOptions.Stream);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Acknowledged Notify event {EventId} on consumer {Consumer} (stream {Stream}, id {MessageId}).",
|
||||
lease.Message.Event.EventId,
|
||||
lease.Consumer,
|
||||
streamOptions.Stream,
|
||||
lease.MessageId);
|
||||
}
|
||||
|
||||
internal async Task RenewLeaseAsync(
|
||||
RedisNotifyEventLease lease,
|
||||
TimeSpan leaseDuration,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
var streamOptions = lease.StreamOptions;
|
||||
|
||||
await db.StreamClaimAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
lease.Consumer,
|
||||
0,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var expires = _timeProvider.GetUtcNow().Add(leaseDuration);
|
||||
lease.RefreshLease(expires);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Renewed Notify event lease for {EventId} until {Expires:u}.",
|
||||
lease.Message.Event.EventId,
|
||||
expires);
|
||||
}
|
||||
|
||||
internal Task ReleaseAsync(
|
||||
RedisNotifyEventLease lease,
|
||||
NotifyQueueReleaseDisposition disposition,
|
||||
CancellationToken cancellationToken)
|
||||
=> Task.FromException(new NotSupportedException("Retry/abandon is not supported for Notify event streams."));
|
||||
|
||||
internal async Task DeadLetterAsync(
|
||||
RedisNotifyEventLease lease,
|
||||
string reason,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!lease.TryBeginCompletion())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
var streamOptions = lease.StreamOptions;
|
||||
|
||||
await db.StreamAcknowledgeAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await db.StreamDeleteAsync(
|
||||
streamOptions.Stream,
|
||||
new RedisValue[] { lease.MessageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
_logger.LogWarning(
|
||||
"Dead-lettered Notify event {EventId} on stream {Stream} with reason '{Reason}'.",
|
||||
lease.Message.Event.EventId,
|
||||
streamOptions.Stream,
|
||||
reason);
|
||||
}
|
||||
|
||||
internal async ValueTask PingAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var db = await GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
|
||||
_ = await db.PingAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private NotifyRedisEventStreamOptions GetStreamOptions(string stream)
|
||||
{
|
||||
if (!_streamsByName.TryGetValue(stream, out var options))
|
||||
{
|
||||
throw new InvalidOperationException($"Stream '{stream}' is not configured for the Notify event queue.");
|
||||
}
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
private async Task<IDatabase> GetDatabaseAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_connection is { IsConnected: true })
|
||||
{
|
||||
return _connection.GetDatabase(_redisOptions.Database ?? -1);
|
||||
}
|
||||
|
||||
await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_connection is { IsConnected: true })
|
||||
{
|
||||
return _connection.GetDatabase(_redisOptions.Database ?? -1);
|
||||
}
|
||||
|
||||
var configuration = ConfigurationOptions.Parse(_redisOptions.ConnectionString!);
|
||||
configuration.AbortOnConnectFail = false;
|
||||
if (_redisOptions.Database.HasValue)
|
||||
{
|
||||
configuration.DefaultDatabase = _redisOptions.Database;
|
||||
}
|
||||
|
||||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
timeoutCts.CancelAfter(_redisOptions.InitializationTimeout);
|
||||
|
||||
_connection = await _connectionFactory(configuration).WaitAsync(timeoutCts.Token).ConfigureAwait(false);
|
||||
return _connection.GetDatabase(_redisOptions.Database ?? -1);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_connectionLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureStreamInitializedAsync(
|
||||
IDatabase database,
|
||||
NotifyRedisEventStreamOptions streamOptions,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (_initializedStreams.ContainsKey(streamOptions.Stream))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await _groupInitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_initializedStreams.ContainsKey(streamOptions.Stream))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await database.StreamCreateConsumerGroupAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
StreamPosition.Beginning,
|
||||
createStream: true)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// Consumer group already exists — nothing to do.
|
||||
}
|
||||
|
||||
_initializedStreams[streamOptions.Stream] = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_groupInitLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task<RedisValue> AddToStreamAsync(
|
||||
IDatabase database,
|
||||
NotifyRedisEventStreamOptions streamOptions,
|
||||
IReadOnlyList<NameValueEntry> entries)
|
||||
{
|
||||
return await database.StreamAddAsync(
|
||||
streamOptions.Stream,
|
||||
entries.ToArray(),
|
||||
maxLength: streamOptions.ApproximateMaxLength,
|
||||
useApproximateMaxLength: streamOptions.ApproximateMaxLength is not null)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private IReadOnlyList<NameValueEntry> BuildEntries(
|
||||
NotifyQueueEventMessage message,
|
||||
DateTimeOffset enqueuedAt,
|
||||
int attempt)
|
||||
{
|
||||
var payload = NotifyCanonicalJsonSerializer.Serialize(message.Event);
|
||||
|
||||
var entries = new List<NameValueEntry>(8 + message.Attributes.Count)
|
||||
{
|
||||
new(NotifyQueueFields.Payload, payload),
|
||||
new(NotifyQueueFields.EventId, message.Event.EventId.ToString("D")),
|
||||
new(NotifyQueueFields.Tenant, message.TenantId),
|
||||
new(NotifyQueueFields.Kind, message.Event.Kind),
|
||||
new(NotifyQueueFields.Attempt, attempt),
|
||||
new(NotifyQueueFields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds()),
|
||||
new(NotifyQueueFields.IdempotencyKey, message.IdempotencyKey),
|
||||
new(NotifyQueueFields.PartitionKey, message.PartitionKey ?? string.Empty),
|
||||
new(NotifyQueueFields.TraceId, message.TraceId ?? string.Empty)
|
||||
};
|
||||
|
||||
foreach (var kvp in message.Attributes)
|
||||
{
|
||||
entries.Add(new NameValueEntry(
|
||||
NotifyQueueFields.AttributePrefix + kvp.Key,
|
||||
kvp.Value));
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
private RedisNotifyEventLease? TryMapLease(
|
||||
NotifyRedisEventStreamOptions streamOptions,
|
||||
StreamEntry entry,
|
||||
string consumer,
|
||||
DateTimeOffset now,
|
||||
TimeSpan leaseDuration,
|
||||
int? attemptOverride)
|
||||
{
|
||||
if (entry.Values is null || entry.Values.Length == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
string? payloadJson = null;
|
||||
string? eventIdRaw = null;
|
||||
long? enqueuedAtUnix = null;
|
||||
string? idempotency = null;
|
||||
string? partitionKey = null;
|
||||
string? traceId = null;
|
||||
var attempt = attemptOverride ?? 1;
|
||||
var attributes = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
|
||||
foreach (var field in entry.Values)
|
||||
{
|
||||
var name = field.Name.ToString();
|
||||
var value = field.Value;
|
||||
if (name.Equals(NotifyQueueFields.Payload, StringComparison.Ordinal))
|
||||
{
|
||||
payloadJson = value.ToString();
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.EventId, StringComparison.Ordinal))
|
||||
{
|
||||
eventIdRaw = value.ToString();
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.Attempt, StringComparison.Ordinal))
|
||||
{
|
||||
if (int.TryParse(value.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed))
|
||||
{
|
||||
attempt = Math.Max(parsed, attempt);
|
||||
}
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.EnqueuedAt, StringComparison.Ordinal))
|
||||
{
|
||||
if (long.TryParse(value.ToString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var unix))
|
||||
{
|
||||
enqueuedAtUnix = unix;
|
||||
}
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.IdempotencyKey, StringComparison.Ordinal))
|
||||
{
|
||||
var text = value.ToString();
|
||||
idempotency = string.IsNullOrWhiteSpace(text) ? null : text;
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.PartitionKey, StringComparison.Ordinal))
|
||||
{
|
||||
var text = value.ToString();
|
||||
partitionKey = string.IsNullOrWhiteSpace(text) ? null : text;
|
||||
}
|
||||
else if (name.Equals(NotifyQueueFields.TraceId, StringComparison.Ordinal))
|
||||
{
|
||||
var text = value.ToString();
|
||||
traceId = string.IsNullOrWhiteSpace(text) ? null : text;
|
||||
}
|
||||
else if (name.StartsWith(NotifyQueueFields.AttributePrefix, StringComparison.Ordinal))
|
||||
{
|
||||
var key = name[NotifyQueueFields.AttributePrefix.Length..];
|
||||
attributes[key] = value.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
if (payloadJson is null || enqueuedAtUnix is null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
NotifyEvent notifyEvent;
|
||||
try
|
||||
{
|
||||
notifyEvent = NotifyCanonicalJsonSerializer.Deserialize<NotifyEvent>(payloadJson);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"Failed to deserialize Notify event payload for stream {Stream} entry {EntryId}.",
|
||||
streamOptions.Stream,
|
||||
entry.Id.ToString());
|
||||
return null;
|
||||
}
|
||||
|
||||
var attributeView = attributes.Count == 0
|
||||
? EmptyReadOnlyDictionary<string, string>.Instance
|
||||
: new ReadOnlyDictionary<string, string>(attributes);
|
||||
|
||||
var message = new NotifyQueueEventMessage(
|
||||
notifyEvent,
|
||||
streamOptions.Stream,
|
||||
idempotencyKey: idempotency ?? notifyEvent.EventId.ToString("N"),
|
||||
partitionKey: partitionKey,
|
||||
traceId: traceId,
|
||||
attributes: attributeView);
|
||||
|
||||
var enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value);
|
||||
var leaseExpiresAt = now.Add(leaseDuration);
|
||||
|
||||
return new RedisNotifyEventLease(
|
||||
this,
|
||||
streamOptions,
|
||||
entry.Id.ToString(),
|
||||
message,
|
||||
attempt,
|
||||
consumer,
|
||||
enqueuedAt,
|
||||
leaseExpiresAt);
|
||||
}
|
||||
|
||||
private async Task AckPoisonAsync(
|
||||
IDatabase database,
|
||||
NotifyRedisEventStreamOptions streamOptions,
|
||||
RedisValue messageId)
|
||||
{
|
||||
await database.StreamAcknowledgeAsync(
|
||||
streamOptions.Stream,
|
||||
streamOptions.ConsumerGroup,
|
||||
new RedisValue[] { messageId })
|
||||
.ConfigureAwait(false);
|
||||
|
||||
await database.StreamDeleteAsync(
|
||||
streamOptions.Stream,
|
||||
new RedisValue[] { messageId })
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user