using System.Buffers; using System.Collections.ObjectModel; using System.Text.Json; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StellaOps.Messaging.Abstractions; using StackExchange.Redis; using RedisStreamPosition = StackExchange.Redis.StreamPosition; namespace StellaOps.Messaging.Transport.Valkey; /// /// Valkey/Redis Streams implementation of . /// /// The message type. public sealed class ValkeyMessageQueue : IMessageQueue, IAsyncDisposable where TMessage : class { private const string ProviderNameValue = "valkey"; private static class Fields { public const string Payload = "payload"; public const string TenantId = "tenant"; public const string CorrelationId = "correlation"; public const string IdempotencyKey = "idem"; public const string Attempt = "attempt"; public const string EnqueuedAt = "enq_at"; public const string HeaderPrefix = "h:"; } private readonly ValkeyConnectionFactory _connectionFactory; private readonly MessageQueueOptions _queueOptions; private readonly ValkeyTransportOptions _transportOptions; private readonly ILogger>? _logger; private readonly TimeProvider _timeProvider; private readonly SemaphoreSlim _groupInitLock = new(1, 1); private readonly JsonSerializerOptions _jsonOptions; private volatile bool _groupInitialized; private bool _disposed; public ValkeyMessageQueue( ValkeyConnectionFactory connectionFactory, MessageQueueOptions queueOptions, ValkeyTransportOptions transportOptions, ILogger>? logger = null, TimeProvider? timeProvider = null, JsonSerializerOptions? jsonOptions = null) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions)); _transportOptions = transportOptions ?? throw new ArgumentNullException(nameof(transportOptions)); _logger = logger; _timeProvider = timeProvider ?? TimeProvider.System; _jsonOptions = jsonOptions ?? new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase, WriteIndented = false }; } /// public string ProviderName => ProviderNameValue; /// public string QueueName => _queueOptions.QueueName; /// public async ValueTask EnqueueAsync( TMessage message, EnqueueOptions? options = null, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(message); cancellationToken.ThrowIfCancellationRequested(); var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var entries = BuildEntries(message, now, 1, options); var messageId = await AddToStreamAsync( db, _queueOptions.QueueName, entries, _queueOptions.ApproximateMaxLength) .ConfigureAwait(false); // Handle idempotency if key provided if (!string.IsNullOrWhiteSpace(options?.IdempotencyKey)) { var idempotencyKey = BuildIdempotencyKey(options.IdempotencyKey); var stored = await db.StringSetAsync( idempotencyKey, messageId, when: When.NotExists, expiry: _queueOptions.IdempotencyWindow) .ConfigureAwait(false); if (!stored) { // Duplicate detected - delete the message we just added and return existing await db.StreamDeleteAsync(_queueOptions.QueueName, [(RedisValue)messageId]).ConfigureAwait(false); var existing = await db.StringGetAsync(idempotencyKey).ConfigureAwait(false); var existingId = existing.IsNullOrEmpty ? messageId : existing.ToString(); _logger?.LogDebug( "Duplicate enqueue detected for queue {Queue} with key {Key}; returning existing id {MessageId}", _queueOptions.QueueName, idempotencyKey, existingId); return EnqueueResult.Duplicate(existingId); } } _logger?.LogDebug("Enqueued message to {Queue} with id {MessageId}", _queueOptions.QueueName, messageId); return EnqueueResult.Succeeded(messageId); } /// public async ValueTask>> LeaseAsync( LeaseRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); cancellationToken.ThrowIfCancellationRequested(); var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false); var consumer = _queueOptions.ConsumerName ?? $"{Environment.MachineName}-{Environment.ProcessId}"; StreamEntry[] entries; if (request.PendingOnly) { // Read from pending only (redeliveries) entries = await db.StreamReadGroupAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, consumer, position: "0", count: request.BatchSize) .ConfigureAwait(false); } else { // Read new messages entries = await db.StreamReadGroupAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, consumer, position: ">", count: request.BatchSize) .ConfigureAwait(false); } if (entries is null || entries.Length == 0) { return []; } var now = _timeProvider.GetUtcNow(); var leaseDuration = request.LeaseDuration ?? _queueOptions.DefaultLeaseDuration; var leases = new List>(entries.Length); foreach (var entry in entries) { var lease = TryMapLease(entry, consumer, now, leaseDuration, attemptOverride: null); if (lease is null) { await HandlePoisonEntryAsync(db, entry.Id).ConfigureAwait(false); continue; } leases.Add(lease); } return leases; } /// public async ValueTask>> ClaimExpiredAsync( ClaimRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); cancellationToken.ThrowIfCancellationRequested(); var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await EnsureConsumerGroupAsync(db, cancellationToken).ConfigureAwait(false); var consumer = _queueOptions.ConsumerName ?? $"{Environment.MachineName}-{Environment.ProcessId}"; var pending = await db.StreamPendingMessagesAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, request.BatchSize, RedisValue.Null, (long)request.MinIdleTime.TotalMilliseconds) .ConfigureAwait(false); if (pending is null || pending.Length == 0) { return []; } var eligible = pending .Where(info => info.IdleTimeInMilliseconds >= request.MinIdleTime.TotalMilliseconds && info.DeliveryCount >= request.MinDeliveryAttempts) .ToArray(); if (eligible.Length == 0) { return []; } var messageIds = eligible.Select(info => (RedisValue)info.MessageId).ToArray(); var claimed = await db.StreamClaimAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, consumer, 0, messageIds) .ConfigureAwait(false); if (claimed is null || claimed.Length == 0) { return []; } var now = _timeProvider.GetUtcNow(); var leaseDuration = request.LeaseDuration ?? _queueOptions.DefaultLeaseDuration; var attemptLookup = eligible.ToDictionary( info => info.MessageId.IsNullOrEmpty ? string.Empty : info.MessageId.ToString(), info => (int)Math.Max(1, info.DeliveryCount), StringComparer.Ordinal); var leases = new List>(claimed.Length); foreach (var entry in claimed) { var entryId = entry.Id.ToString(); attemptLookup.TryGetValue(entryId, out var attempt); var lease = TryMapLease(entry, consumer, now, leaseDuration, attemptOverride: attempt); if (lease is null) { await HandlePoisonEntryAsync(db, entry.Id).ConfigureAwait(false); continue; } leases.Add(lease); } return leases; } /// public async ValueTask GetPendingCountAsync(CancellationToken cancellationToken = default) { var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); var info = await db.StreamPendingAsync(_queueOptions.QueueName, _queueOptions.ConsumerGroup).ConfigureAwait(false); return info.PendingMessageCount; } internal async ValueTask AcknowledgeAsync(ValkeyMessageLease lease, CancellationToken cancellationToken) { if (!lease.TryBeginCompletion()) { return; } var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await db.StreamAcknowledgeAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, [(RedisValue)lease.MessageId]) .ConfigureAwait(false); await db.StreamDeleteAsync(_queueOptions.QueueName, [(RedisValue)lease.MessageId]).ConfigureAwait(false); _logger?.LogDebug("Acknowledged message {MessageId} from queue {Queue}", lease.MessageId, _queueOptions.QueueName); } internal async ValueTask RenewLeaseAsync(ValkeyMessageLease lease, TimeSpan extension, CancellationToken cancellationToken) { var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); await db.StreamClaimAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, lease.Consumer, 0, [(RedisValue)lease.MessageId]) .ConfigureAwait(false); var expires = _timeProvider.GetUtcNow().Add(extension); lease.RefreshLease(expires); } internal async ValueTask ReleaseAsync( ValkeyMessageLease lease, ReleaseDisposition disposition, CancellationToken cancellationToken) { if (disposition == ReleaseDisposition.Retry && lease.Attempt >= _queueOptions.MaxDeliveryAttempts) { await DeadLetterAsync(lease, $"max-delivery-attempts:{lease.Attempt}", cancellationToken).ConfigureAwait(false); return; } if (!lease.TryBeginCompletion()) { return; } var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); // Acknowledge and delete the current entry await db.StreamAcknowledgeAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, [(RedisValue)lease.MessageId]) .ConfigureAwait(false); await db.StreamDeleteAsync(_queueOptions.QueueName, [(RedisValue)lease.MessageId]).ConfigureAwait(false); if (disposition == ReleaseDisposition.Retry) { lease.IncrementAttempt(); // Calculate backoff delay var backoff = CalculateBackoff(lease.Attempt); if (backoff > TimeSpan.Zero) { try { await Task.Delay(backoff, cancellationToken).ConfigureAwait(false); } catch (TaskCanceledException) { return; } } // Re-enqueue with incremented attempt var now = _timeProvider.GetUtcNow(); var entries = BuildEntries(lease.Message, now, lease.Attempt, null); await AddToStreamAsync(db, _queueOptions.QueueName, entries, _queueOptions.ApproximateMaxLength) .ConfigureAwait(false); _logger?.LogDebug("Retrying message {MessageId}, attempt {Attempt}", lease.MessageId, lease.Attempt); } } internal async ValueTask DeadLetterAsync(ValkeyMessageLease lease, string reason, CancellationToken cancellationToken) { if (!lease.TryBeginCompletion()) { return; } var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false); // Acknowledge and delete from main queue await db.StreamAcknowledgeAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, [(RedisValue)lease.MessageId]) .ConfigureAwait(false); await db.StreamDeleteAsync(_queueOptions.QueueName, [(RedisValue)lease.MessageId]).ConfigureAwait(false); // Move to dead-letter queue if configured if (!string.IsNullOrWhiteSpace(_queueOptions.DeadLetterQueue)) { var now = _timeProvider.GetUtcNow(); var entries = BuildEntries(lease.Message, now, lease.Attempt, null); await AddToStreamAsync(db, _queueOptions.DeadLetterQueue, entries, null).ConfigureAwait(false); _logger?.LogWarning( "Dead-lettered message {MessageId} after {Attempt} attempt(s): {Reason}", lease.MessageId, lease.Attempt, reason); } else { _logger?.LogWarning( "Dropped message {MessageId} after {Attempt} attempt(s); dead-letter queue not configured. Reason: {Reason}", lease.MessageId, lease.Attempt, reason); } } public async ValueTask DisposeAsync() { if (_disposed) { return; } _disposed = true; _groupInitLock.Dispose(); } private string BuildIdempotencyKey(string key) => $"{_transportOptions.IdempotencyKeyPrefix}{key}"; private TimeSpan CalculateBackoff(int attempt) { if (attempt <= 1) { return _queueOptions.RetryInitialBackoff; } var initial = _queueOptions.RetryInitialBackoff; var max = _queueOptions.RetryMaxBackoff; var multiplier = _queueOptions.RetryBackoffMultiplier; var scaledTicks = initial.Ticks * Math.Pow(multiplier, attempt - 1); var cappedTicks = Math.Min(max.Ticks, scaledTicks); return TimeSpan.FromTicks((long)Math.Max(initial.Ticks, cappedTicks)); } private async Task EnsureConsumerGroupAsync(IDatabase database, CancellationToken cancellationToken) { if (_groupInitialized) { return; } await _groupInitLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (_groupInitialized) { return; } try { await database.StreamCreateConsumerGroupAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, RedisStreamPosition.Beginning, createStream: true) .ConfigureAwait(false); } catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase)) { // Group already exists } _groupInitialized = true; } finally { _groupInitLock.Release(); } } private NameValueEntry[] BuildEntries(TMessage message, DateTimeOffset enqueuedAt, int attempt, EnqueueOptions? options) { var headerCount = options?.Headers?.Count ?? 0; var entries = ArrayPool.Shared.Rent(6 + headerCount); var index = 0; entries[index++] = new NameValueEntry(Fields.Payload, JsonSerializer.Serialize(message, _jsonOptions)); entries[index++] = new NameValueEntry(Fields.Attempt, attempt); entries[index++] = new NameValueEntry(Fields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds()); if (!string.IsNullOrWhiteSpace(options?.TenantId)) { entries[index++] = new NameValueEntry(Fields.TenantId, options.TenantId); } if (!string.IsNullOrWhiteSpace(options?.CorrelationId)) { entries[index++] = new NameValueEntry(Fields.CorrelationId, options.CorrelationId); } if (!string.IsNullOrWhiteSpace(options?.IdempotencyKey)) { entries[index++] = new NameValueEntry(Fields.IdempotencyKey, options.IdempotencyKey); } if (options?.Headers is not null) { foreach (var kvp in options.Headers) { entries[index++] = new NameValueEntry(Fields.HeaderPrefix + kvp.Key, kvp.Value); } } var result = entries.AsSpan(0, index).ToArray(); ArrayPool.Shared.Return(entries, clearArray: true); return result; } private ValkeyMessageLease? TryMapLease( StreamEntry entry, string consumer, DateTimeOffset now, TimeSpan leaseDuration, int? attemptOverride) { if (entry.Values is null || entry.Values.Length == 0) { return null; } string? payload = null; string? tenantId = null; string? correlationId = null; long? enqueuedAtUnix = null; var attempt = attemptOverride ?? 1; Dictionary? headers = null; foreach (var field in entry.Values) { var name = field.Name.ToString(); var value = field.Value; if (name.Equals(Fields.Payload, StringComparison.Ordinal)) { payload = value.ToString(); } else if (name.Equals(Fields.TenantId, StringComparison.Ordinal)) { tenantId = NormalizeOptional(value.ToString()); } else if (name.Equals(Fields.CorrelationId, StringComparison.Ordinal)) { correlationId = NormalizeOptional(value.ToString()); } else if (name.Equals(Fields.EnqueuedAt, StringComparison.Ordinal)) { if (long.TryParse(value.ToString(), out var unixMs)) { enqueuedAtUnix = unixMs; } } else if (name.Equals(Fields.Attempt, StringComparison.Ordinal)) { if (int.TryParse(value.ToString(), out var parsedAttempt)) { attempt = attemptOverride.HasValue ? Math.Max(attemptOverride.Value, parsedAttempt) : Math.Max(1, parsedAttempt); } } else if (name.StartsWith(Fields.HeaderPrefix, StringComparison.Ordinal)) { headers ??= new Dictionary(StringComparer.Ordinal); var key = name[Fields.HeaderPrefix.Length..]; headers[key] = value.ToString(); } } if (payload is null || enqueuedAtUnix is null) { return null; } TMessage message; try { message = JsonSerializer.Deserialize(payload, _jsonOptions)!; } catch { return null; } var enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value); var leaseExpires = now.Add(leaseDuration); IReadOnlyDictionary? headersView = headers is null || headers.Count == 0 ? null : new ReadOnlyDictionary(headers); return new ValkeyMessageLease( this, entry.Id.ToString(), message, attempt, enqueuedAt, leaseExpires, consumer, tenantId, correlationId, headersView); } private async Task HandlePoisonEntryAsync(IDatabase database, RedisValue entryId) { await database.StreamAcknowledgeAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, [entryId]) .ConfigureAwait(false); await database.StreamDeleteAsync(_queueOptions.QueueName, [entryId]).ConfigureAwait(false); _logger?.LogWarning("Removed poison entry {EntryId} from queue {Queue}", entryId, _queueOptions.QueueName); } private async Task AddToStreamAsync( IDatabase database, string stream, NameValueEntry[] entries, int? maxLength) { var capacity = 4 + (entries.Length * 2); var args = new List(capacity) { (RedisKey)stream }; if (maxLength.HasValue) { args.Add("MAXLEN"); args.Add("~"); args.Add(maxLength.Value); } args.Add("*"); foreach (var entry in entries) { args.Add((RedisValue)entry.Name); args.Add(entry.Value); } var result = await database.ExecuteAsync("XADD", [.. args]).ConfigureAwait(false); return result!.ToString()!; } private static string? NormalizeOptional(string? value) => string.IsNullOrWhiteSpace(value) ? null : value; }