Add diagnostic logging to consumer loop and guard for transport debugging
This commit is contained in:
@@ -235,23 +235,32 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
{
|
||||
try
|
||||
{
|
||||
_logger.LogTrace("Consumer loop: calling LeaseAsync (batch={BatchSize})", _options.BatchSize);
|
||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||
|
||||
var leases = await _serviceIncomingQueue.LeaseAsync(
|
||||
new LeaseRequest { BatchSize = _options.BatchSize },
|
||||
cancellationToken);
|
||||
|
||||
consecutiveFailures = 0; // Reset on success
|
||||
sw.Stop();
|
||||
consecutiveFailures = 0;
|
||||
|
||||
// Process messages concurrently to prevent head-of-line blocking.
|
||||
// One slow handler must not block delivery of other messages.
|
||||
if (leases.Count > 0)
|
||||
{
|
||||
_logger.LogDebug("Consumer loop: leased {Count} messages in {ElapsedMs}ms, processing concurrently",
|
||||
leases.Count, sw.ElapsedMilliseconds);
|
||||
|
||||
await Task.WhenAll(leases.Select(lease =>
|
||||
ProcessLeaseWithGuardAsync(lease, HandleIncomingRequestAsync, cancellationToken)));
|
||||
|
||||
_logger.LogDebug("Consumer loop: finished processing {Count} messages", leases.Count);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Wait for Pub/Sub notification instead of busy-polling.
|
||||
_logger.LogTrace("Consumer loop: no messages, waiting for notification (lease took {ElapsedMs}ms)",
|
||||
sw.ElapsedMilliseconds);
|
||||
await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
|
||||
_logger.LogTrace("Consumer loop: notification received or timeout, resuming");
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
@@ -698,26 +707,30 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
CancellationToken cancellationToken)
|
||||
where TMessage : class
|
||||
{
|
||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||
try
|
||||
{
|
||||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout
|
||||
timeoutCts.CancelAfter(TimeSpan.FromSeconds(55));
|
||||
|
||||
_logger.LogDebug("Guard: processing message {MessageId}", lease.MessageId);
|
||||
await handler(lease, timeoutCts.Token);
|
||||
sw.Stop();
|
||||
_logger.LogDebug("Guard: message {MessageId} processed in {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds);
|
||||
await lease.AcknowledgeAsync(cancellationToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// Graceful shutdown — let lease expire naturally
|
||||
_logger.LogDebug("Guard: message {MessageId} cancelled (shutdown) after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
_logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId);
|
||||
_logger.LogWarning("Guard: message {MessageId} TIMED OUT after {ElapsedMs}ms (55s limit)", lease.MessageId, sw.ElapsedMilliseconds);
|
||||
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId);
|
||||
_logger.LogError(ex, "Guard: message {MessageId} FAILED after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds);
|
||||
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user