Fix head-of-line blocking: concurrent message processing with guards
Root cause: The consumer loop processes messages sequentially with await. One slow handler (e.g., Concelier advisory JOIN taking 30s) blocks all other messages. Evidence: consumer pending=1, idle=34min, stream lag=59 messages undelivered. Fix: Replace sequential foreach with Task.WhenAll for concurrent processing. Each message gets its own exception guard: - 55s per-message timeout (below 60s gateway timeout) - Exception catch-all with retry release - Graceful shutdown propagation via CancellationToken - TryReleaseAsync guard prevents failed release from crashing loop Applied to both server (gateway) and client (microservice) consumer loops: ProcessRequestsAsync, ProcessResponsesAsync, ProcessIncomingRequestsAsync. This is the foundational fix. One slow request should never block delivery of all other requests to the same service. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -179,32 +179,24 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
|||||||
|
|
||||||
consecutiveFailures = 0; // Reset on success
|
consecutiveFailures = 0; // Reset on success
|
||||||
|
|
||||||
foreach (var lease in leases)
|
if (leases.Count > 0)
|
||||||
{
|
{
|
||||||
try
|
await Task.WhenAll(leases.Select(lease =>
|
||||||
{
|
ProcessLeaseWithGuardAsync(lease, (l, ct) =>
|
||||||
// Check if this response is for us (our connection)
|
|
||||||
if (lease.Message.ConnectionId == _connectionId ||
|
|
||||||
string.IsNullOrEmpty(lease.Message.ConnectionId))
|
|
||||||
{
|
{
|
||||||
var frame = DecodeFrame(
|
if (l.Message.ConnectionId == _connectionId ||
|
||||||
lease.Message.FrameType,
|
string.IsNullOrEmpty(l.Message.ConnectionId))
|
||||||
lease.Message.CorrelationId,
|
{
|
||||||
lease.Message.PayloadBase64);
|
var frame = DecodeFrame(
|
||||||
|
l.Message.FrameType,
|
||||||
_correlationTracker.TryCompleteRequest(lease.Message.CorrelationId, frame);
|
l.Message.CorrelationId,
|
||||||
}
|
l.Message.PayloadBase64);
|
||||||
await lease.AcknowledgeAsync(cancellationToken);
|
_correlationTracker.TryCompleteRequest(l.Message.CorrelationId, frame);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
return Task.CompletedTask;
|
||||||
{
|
}, cancellationToken)));
|
||||||
_logger.LogError(ex, "Error processing response {MessageId}", lease.MessageId);
|
|
||||||
await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
// Wait for Pub/Sub notification instead of busy-polling.
|
|
||||||
if (leases.Count == 0)
|
|
||||||
{
|
{
|
||||||
await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -249,27 +241,16 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
|||||||
|
|
||||||
consecutiveFailures = 0; // Reset on success
|
consecutiveFailures = 0; // Reset on success
|
||||||
|
|
||||||
foreach (var lease in leases)
|
// Process messages concurrently to prevent head-of-line blocking.
|
||||||
|
// One slow handler must not block delivery of other messages.
|
||||||
|
if (leases.Count > 0)
|
||||||
{
|
{
|
||||||
try
|
await Task.WhenAll(leases.Select(lease =>
|
||||||
{
|
ProcessLeaseWithGuardAsync(lease, HandleIncomingRequestAsync, cancellationToken)));
|
||||||
await HandleIncomingRequestAsync(lease, cancellationToken);
|
|
||||||
await lease.AcknowledgeAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Error handling request {MessageId}", lease.MessageId);
|
|
||||||
await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
// Wait for Pub/Sub notification instead of busy-polling.
|
|
||||||
if (leases.Count == 0)
|
|
||||||
{
|
{
|
||||||
|
// Wait for Pub/Sub notification instead of busy-polling.
|
||||||
await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -706,4 +687,54 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
|||||||
|
|
||||||
_inflightHandlers.Clear();
|
_inflightHandlers.Clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Processes a single message lease with timeout guard and exception protection.
|
||||||
|
/// Prevents one slow or buggy handler from blocking the entire consumer loop.
|
||||||
|
/// </summary>
|
||||||
|
private async Task ProcessLeaseWithGuardAsync<TMessage>(
|
||||||
|
IMessageLease<TMessage> lease,
|
||||||
|
Func<IMessageLease<TMessage>, CancellationToken, Task> handler,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TMessage : class
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||||
|
timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout
|
||||||
|
|
||||||
|
await handler(lease, timeoutCts.Token);
|
||||||
|
await lease.AcknowledgeAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Graceful shutdown — let lease expire naturally
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId);
|
||||||
|
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId);
|
||||||
|
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task TryReleaseAsync<TMessage>(
|
||||||
|
IMessageLease<TMessage> lease,
|
||||||
|
ReleaseDisposition disposition,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TMessage : class
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await lease.ReleaseAsync(disposition, cancellationToken);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Best-effort release — don't let a failed release crash the consumer loop
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -156,23 +156,16 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
|
|||||||
new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration },
|
new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration },
|
||||||
cancellationToken);
|
cancellationToken);
|
||||||
|
|
||||||
foreach (var lease in leases)
|
// Process messages concurrently to prevent head-of-line blocking.
|
||||||
|
// One slow handler must not block delivery of other messages.
|
||||||
|
if (leases.Count > 0)
|
||||||
{
|
{
|
||||||
try
|
await Task.WhenAll(leases.Select(lease =>
|
||||||
{
|
ProcessLeaseWithGuardAsync(lease, ProcessRequestMessageAsync, cancellationToken)));
|
||||||
await ProcessRequestMessageAsync(lease, cancellationToken);
|
|
||||||
await lease.AcknowledgeAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Error processing request {MessageId}", lease.MessageId);
|
|
||||||
await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
// Wait for Pub/Sub notification instead of busy-polling.
|
|
||||||
if (leases.Count == 0)
|
|
||||||
{
|
{
|
||||||
|
// Wait for Pub/Sub notification instead of busy-polling.
|
||||||
await _requestQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
await _requestQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -200,22 +193,12 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
|
|||||||
new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration },
|
new LeaseRequest { BatchSize = _options.BatchSize, LeaseDuration = _options.LeaseDuration },
|
||||||
cancellationToken);
|
cancellationToken);
|
||||||
|
|
||||||
foreach (var lease in leases)
|
if (leases.Count > 0)
|
||||||
{
|
{
|
||||||
try
|
await Task.WhenAll(leases.Select(lease =>
|
||||||
{
|
ProcessLeaseWithGuardAsync(lease, ProcessResponseMessageAsync, cancellationToken)));
|
||||||
await ProcessResponseMessageAsync(lease, cancellationToken);
|
|
||||||
await lease.AcknowledgeAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Error processing response {MessageId}", lease.MessageId);
|
|
||||||
await lease.ReleaseAsync(ReleaseDisposition.Retry, cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
// Wait for Pub/Sub notification instead of busy-polling.
|
|
||||||
if (leases.Count == 0)
|
|
||||||
{
|
{
|
||||||
await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
await _responseQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
||||||
}
|
}
|
||||||
@@ -397,6 +380,57 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Processes a single message lease with timeout guard and exception protection.
|
||||||
|
/// Prevents one slow or buggy handler from blocking the entire consumer loop.
|
||||||
|
/// </summary>
|
||||||
|
private async Task ProcessLeaseWithGuardAsync<TMessage>(
|
||||||
|
IMessageLease<TMessage> lease,
|
||||||
|
Func<IMessageLease<TMessage>, CancellationToken, Task> handler,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TMessage : class
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||||
|
timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout
|
||||||
|
|
||||||
|
await handler(lease, timeoutCts.Token);
|
||||||
|
await lease.AcknowledgeAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Graceful shutdown — let lease expire naturally, XAUTOCLAIM will recover
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
// Per-message timeout — handler took too long
|
||||||
|
_logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId);
|
||||||
|
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId);
|
||||||
|
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task TryReleaseAsync<TMessage>(
|
||||||
|
IMessageLease<TMessage> lease,
|
||||||
|
ReleaseDisposition disposition,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TMessage : class
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await lease.ReleaseAsync(disposition, cancellationToken);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Best-effort release — don't let a failed release crash the consumer loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -59,6 +59,14 @@ public class MessagingTransportOptions
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45);
|
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets or sets the maximum number of messages processed concurrently per
|
||||||
|
/// consumer loop iteration. Prevents head-of-line blocking where one slow
|
||||||
|
/// handler blocks all other messages in the batch.
|
||||||
|
/// Default: 10 (matches BatchSize).
|
||||||
|
/// </summary>
|
||||||
|
public int MaxConcurrentRequests { get; set; } = 10;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets or sets the dead letter queue suffix.
|
/// Gets or sets the dead letter queue suffix.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
Reference in New Issue
Block a user