diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs
index 8e01204c7..269d22829 100644
--- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs
+++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs
@@ -179,32 +179,24 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
consecutiveFailures = 0; // Reset on success
- foreach (var lease in leases)
+ if (leases.Count > 0)
{
- try
- {
- // Check if this response is for us (our connection)
- if (lease.Message.ConnectionId == _connectionId ||
- string.IsNullOrEmpty(lease.Message.ConnectionId))
+ await Task.WhenAll(leases.Select(lease =>
+ ProcessLeaseWithGuardAsync(lease, (l, ct) =>
{
- var frame = DecodeFrame(
- lease.Message.FrameType,
- lease.Message.CorrelationId,
- lease.Message.PayloadBase64);
-
- _correlationTracker.TryCompleteRequest(lease.Message.CorrelationId, frame);
- }
- await lease.AcknowledgeAsync(cancellationToken);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error processing response {MessageId}", lease.MessageId);
- await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
- }
+ if (l.Message.ConnectionId == _connectionId ||
+ string.IsNullOrEmpty(l.Message.ConnectionId))
+ {
+ var frame = DecodeFrame(
+ l.Message.FrameType,
+ l.Message.CorrelationId,
+ l.Message.PayloadBase64);
+ _correlationTracker.TryCompleteRequest(l.Message.CorrelationId, frame);
+ }
+ return Task.CompletedTask;
+ }, cancellationToken)));
}
-
- // Wait for Pub/Sub notification instead of busy-polling.
- if (leases.Count == 0)
+ else
{
await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
}
@@ -249,27 +241,16 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
consecutiveFailures = 0; // Reset on success
- foreach (var lease in leases)
+ // Process messages concurrently to prevent head-of-line blocking.
+ // One slow handler must not block delivery of other messages.
+ if (leases.Count > 0)
{
- try
- {
- await HandleIncomingRequestAsync(lease, cancellationToken);
- await lease.AcknowledgeAsync(cancellationToken);
- }
- catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
- {
- break;
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error handling request {MessageId}", lease.MessageId);
- await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
- }
+ await Task.WhenAll(leases.Select(lease =>
+ ProcessLeaseWithGuardAsync(lease, HandleIncomingRequestAsync, cancellationToken)));
}
-
- // Wait for Pub/Sub notification instead of busy-polling.
- if (leases.Count == 0)
+ else
{
+ // Wait for Pub/Sub notification instead of busy-polling.
await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
}
}
@@ -706,4 +687,54 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
_inflightHandlers.Clear();
}
+
+    /// <summary>
+    /// Processes a single message lease with timeout guard and exception protection.
+    /// Prevents one slow or buggy handler from blocking the entire consumer loop.
+    /// </summary>
+    /// <typeparam name="TMessage">Message type of the lease.</typeparam>
+    /// <param name="lease">Lease to process; acknowledged when the handler succeeds, released for retry on timeout or error.</param>
+    /// <param name="handler">Callback invoked with the lease and a token that is cancelled after 55 seconds or when <paramref name="cancellationToken"/> is cancelled.</param>
+    /// <param name="cancellationToken">Token of the surrounding consumer loop; its cancellation is treated as shutdown.</param>
+    /// <returns>A task that completes once the lease has been acknowledged, released, or abandoned on shutdown.</returns>
+    private async Task ProcessLeaseWithGuardAsync<TMessage>(
+        IMessageLease<TMessage> lease,
+        Func<IMessageLease<TMessage>, CancellationToken, Task> handler,
+        CancellationToken cancellationToken)
+        where TMessage : class
+    {
+        try
+        {
+            using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+            timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout
+
+            try
+            {
+                await handler(lease, timeoutCts.Token);
+            }
+            catch (OperationCanceledException)
+                when (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+            {
+                // The linked source is cancelled but the loop token is not, so the
+                // per-message timer is what fired. Any other cancellation falls through
+                // to the outer handlers instead of being reported as a timeout.
+                _logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId);
+                await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
+                return;
+            }
+
+            await lease.AcknowledgeAsync(cancellationToken);
+        }
+        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+        {
+            // Graceful shutdown — let lease expire naturally
+        }
+        catch (Exception ex)
+        {
+            // Also covers cancellations caused by neither the timeout nor shutdown
+            // (for example one raised while acknowledging the lease).
+            _logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId);
+            await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
+        }
+    }
+
+    /// <summary>
+    /// Releases a lease with the given disposition on a best-effort basis.
+    /// A failed release is logged and swallowed so it cannot fault the caller.
+    /// </summary>
+    /// <typeparam name="TMessage">Message type of the lease.</typeparam>
+    /// <param name="lease">Lease to release.</param>
+    /// <param name="disposition">Disposition passed through to the lease's release call.</param>
+    /// <param name="cancellationToken">Token passed through to the lease's release call.</param>
+    private async Task TryReleaseAsync<TMessage>(
+        IMessageLease<TMessage> lease,
+        ReleaseDisposition disposition,
+        CancellationToken cancellationToken)
+        where TMessage : class
+    {
+        try
+        {
+            await lease.ReleaseAsync(disposition, cancellationToken);
+        }
+        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+        {
+            // Cancellation interrupted the release — nothing actionable to report
+        }
+        catch (Exception ex)
+        {
+            // Best-effort release — don't let a failed release crash the consumer loop,
+            // but leave a trace so repeated release failures are visible.
+            _logger.LogWarning(ex, "Failed to release message {MessageId}", lease.MessageId);
+        }
+    }
}
diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs
index 3dd754ff7..dbc24b23d 100644
--- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs
+++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs
@@ -156,23 +156,16 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration },
cancellationToken);
- foreach (var lease in leases)
+ // Process messages concurrently to prevent head-of-line blocking.
+ // One slow handler must not block delivery of other messages.
+ if (leases.Count > 0)
{
- try
- {
- await ProcessRequestMessageAsync(lease, cancellationToken);
- await lease.AcknowledgeAsync(cancellationToken);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error processing request {MessageId}", lease.MessageId);
- await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
- }
+ await Task.WhenAll(leases.Select(lease =>
+ ProcessLeaseWithGuardAsync(lease, ProcessRequestMessageAsync, cancellationToken)));
}
-
- // Wait for Pub/Sub notification instead of busy-polling.
- if (leases.Count == 0)
+ else
{
+ // Wait for Pub/Sub notification instead of busy-polling.
await _requestQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
}
}
@@ -200,22 +193,12 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration },
cancellationToken);
- foreach (var lease in leases)
+ if (leases.Count > 0)
{
- try
- {
- await ProcessResponseMessageAsync(lease, cancellationToken);
- await lease.AcknowledgeAsync(cancellationToken);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error processing response {MessageId}", lease.MessageId);
- await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
- }
+ await Task.WhenAll(leases.Select(lease =>
+ ProcessLeaseWithGuardAsync(lease, ProcessResponseMessageAsync, cancellationToken)));
}
-
- // Wait for Pub/Sub notification instead of busy-polling.
- if (leases.Count == 0)
+ else
{
await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
}
@@ -397,6 +380,57 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
};
}
+    /// <summary>
+    /// Processes a single message lease with timeout guard and exception protection.
+    /// Prevents one slow or buggy handler from blocking the entire consumer loop.
+    /// </summary>
+    /// <typeparam name="TMessage">Message type of the lease.</typeparam>
+    /// <param name="lease">Lease to process; acknowledged when the handler succeeds, released for retry on timeout or error.</param>
+    /// <param name="handler">Callback invoked with the lease and a token that is cancelled after 55 seconds or when <paramref name="cancellationToken"/> is cancelled.</param>
+    /// <param name="cancellationToken">Token of the surrounding consumer loop; its cancellation is treated as shutdown.</param>
+    /// <returns>A task that completes once the lease has been acknowledged, released, or abandoned on shutdown.</returns>
+    private async Task ProcessLeaseWithGuardAsync<TMessage>(
+        IMessageLease<TMessage> lease,
+        Func<IMessageLease<TMessage>, CancellationToken, Task> handler,
+        CancellationToken cancellationToken)
+        where TMessage : class
+    {
+        try
+        {
+            using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+            timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout
+
+            try
+            {
+                await handler(lease, timeoutCts.Token);
+            }
+            catch (OperationCanceledException)
+                when (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+            {
+                // Per-message timeout — handler took too long. The linked source is
+                // cancelled while the loop token is not, so the timer is what fired;
+                // other cancellations fall through to the outer handlers.
+                _logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId);
+                await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
+                return;
+            }
+
+            await lease.AcknowledgeAsync(cancellationToken);
+        }
+        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+        {
+            // Graceful shutdown — let lease expire naturally, XAUTOCLAIM will recover
+        }
+        catch (Exception ex)
+        {
+            // Also covers cancellations caused by neither the timeout nor shutdown
+            // (for example one raised while acknowledging the lease).
+            _logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId);
+            await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
+        }
+    }
+
+    /// <summary>
+    /// Releases a lease with the given disposition on a best-effort basis.
+    /// A failed release is logged and swallowed so it cannot fault the caller.
+    /// </summary>
+    /// <typeparam name="TMessage">Message type of the lease.</typeparam>
+    /// <param name="lease">Lease to release.</param>
+    /// <param name="disposition">Disposition passed through to the lease's release call.</param>
+    /// <param name="cancellationToken">Token passed through to the lease's release call.</param>
+    private async Task TryReleaseAsync<TMessage>(
+        IMessageLease<TMessage> lease,
+        ReleaseDisposition disposition,
+        CancellationToken cancellationToken)
+        where TMessage : class
+    {
+        try
+        {
+            await lease.ReleaseAsync(disposition, cancellationToken);
+        }
+        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+        {
+            // Cancellation interrupted the release — nothing actionable to report
+        }
+        catch (Exception ex)
+        {
+            // Best-effort release — don't let a failed release crash the consumer loop,
+            // but leave a trace so repeated release failures are visible.
+            _logger.LogWarning(ex, "Failed to release message {MessageId}", lease.MessageId);
+        }
+    }
+
///
public void Dispose()
{
diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs
index 17f097ff0..c2638b70a 100644
--- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs
+++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Options/MessagingTransportOptions.cs
@@ -59,6 +59,14 @@ public class MessagingTransportOptions
///
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45);
+ /// <summary>
+ /// Gets or sets the maximum number of messages processed concurrently per
+ /// consumer loop iteration. Prevents head-of-line blocking where one slow
+ /// handler blocks all other messages in the batch.
+ /// Default: 10 (matches BatchSize).
+ /// </summary>
+ /// <remarks>
+ /// NOTE(review): the consumer loops visible in this change start one task per leased
+ /// message via Task.WhenAll and do not read this option — confirm where it is enforced.
+ /// </remarks>
+ public int MaxConcurrentRequests { get; set; } = 10;
+
///
/// Gets or sets the dead letter queue suffix.
///