From 9ae5936f8826f7ecd30d28893f2447359040313c Mon Sep 17 00:00:00 2001 From: master <> Date: Thu, 2 Apr 2026 19:10:36 +0300 Subject: [PATCH] Add diagnostic logging to consumer loop and guard for transport debugging --- .../MessagingTransportClient.cs | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs index 269d22829..bcb41bb91 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs @@ -235,23 +235,32 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr { try { + _logger.LogTrace("Consumer loop: calling LeaseAsync (batch={BatchSize})", _options.BatchSize); + var sw = System.Diagnostics.Stopwatch.StartNew(); + var leases = await _serviceIncomingQueue.LeaseAsync( new LeaseRequest { BatchSize = _options.BatchSize }, cancellationToken); - consecutiveFailures = 0; // Reset on success + sw.Stop(); + consecutiveFailures = 0; - // Process messages concurrently to prevent head-of-line blocking. - // One slow handler must not block delivery of other messages. if (leases.Count > 0) { + _logger.LogDebug("Consumer loop: leased {Count} messages in {ElapsedMs}ms, processing concurrently", + leases.Count, sw.ElapsedMilliseconds); + await Task.WhenAll(leases.Select(lease => ProcessLeaseWithGuardAsync(lease, HandleIncomingRequestAsync, cancellationToken))); + + _logger.LogDebug("Consumer loop: finished processing {Count} messages", leases.Count); } else { - // Wait for Pub/Sub notification instead of busy-polling. + _logger.LogTrace("Consumer loop: no messages, waiting for notification (lease took {ElapsedMs}ms)", + sw.ElapsedMilliseconds); await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken); + _logger.LogTrace("Consumer loop: notification received or timeout, resuming"); } } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) @@ -698,26 +707,30 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr CancellationToken cancellationToken) where TMessage : class { + var sw = System.Diagnostics.Stopwatch.StartNew(); try { using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); // 55s < 60s gateway timeout + timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); + _logger.LogDebug("Guard: processing message {MessageId}", lease.MessageId); await handler(lease, timeoutCts.Token); + sw.Stop(); + _logger.LogDebug("Guard: message {MessageId} processed in {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds); await lease.AcknowledgeAsync(cancellationToken); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { - // Graceful shutdown — let lease expire naturally + _logger.LogDebug("Guard: message {MessageId} cancelled (shutdown) after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds); } catch (OperationCanceledException) { - _logger.LogWarning("Message {MessageId} processing timed out after 55s", lease.MessageId); + _logger.LogWarning("Guard: message {MessageId} TIMED OUT after {ElapsedMs}ms (55s limit)", lease.MessageId, sw.ElapsedMilliseconds); await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); } catch (Exception ex) { - _logger.LogError(ex, "Error processing message {MessageId}", lease.MessageId); + _logger.LogError(ex, "Guard: message {MessageId} FAILED after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds); await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); } }