qa iteration 4
Add Valkey messaging transport auto-reconnection: - MessagingTransportClient: detect persistent Redis failures (5 consecutive) and exit processing loops instead of retrying forever with dead connection - IMicroserviceTransport: add TransportDied event to interface - RouterConnectionManager: listen for TransportDied, auto-reconnect after 2s - Fixes services becoming unreachable after Valkey blip during restarts Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -134,6 +134,9 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
|
||||
// Establish transport connection to the gateway (InMemory/TCP/RabbitMQ/etc).
|
||||
if (_microserviceTransport is not null)
|
||||
{
|
||||
// Listen for transport death to trigger automatic reconnection
|
||||
_microserviceTransport.TransportDied += OnTransportDied;
|
||||
|
||||
var instance = new InstanceDescriptor
|
||||
{
|
||||
InstanceId = _options.InstanceId,
|
||||
@@ -306,6 +309,56 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
|
||||
}
|
||||
}
|
||||
|
||||
private void OnTransportDied()
|
||||
{
|
||||
if (_disposed) return;
|
||||
|
||||
_logger.LogWarning(
|
||||
"Messaging transport died for {ServiceName}/{Version}. Scheduling reconnection...",
|
||||
_options.ServiceName, _options.Version);
|
||||
|
||||
// Fire-and-forget reconnection on a background thread
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(2000); // Brief pause before reconnecting
|
||||
|
||||
if (_disposed || _cts.IsCancellationRequested) return;
|
||||
|
||||
_logger.LogInformation("Attempting messaging transport reconnection...");
|
||||
|
||||
if (_microserviceTransport is null || _endpoints is null) return;
|
||||
|
||||
var instance = new InstanceDescriptor
|
||||
{
|
||||
InstanceId = _options.InstanceId,
|
||||
ServiceName = _options.ServiceName,
|
||||
Version = _options.Version,
|
||||
Region = _options.Region
|
||||
};
|
||||
|
||||
await _microserviceTransport.ConnectAsync(
|
||||
instance,
|
||||
_endpoints,
|
||||
_schemas,
|
||||
_openApiInfo,
|
||||
_cts.Token);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Messaging transport reconnected for {ServiceName}/{Version}",
|
||||
_options.ServiceName, _options.Version);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex,
|
||||
"Failed to reconnect messaging transport for {ServiceName}/{Version}. " +
|
||||
"Service will need manual restart.",
|
||||
_options.ServiceName, _options.Version);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async Task HeartbeatLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
|
||||
@@ -58,4 +58,10 @@ public interface IMicroserviceTransport
|
||||
/// Event raised when a CANCEL frame is received from the gateway.
|
||||
/// </summary>
|
||||
event Func<Guid, string?, Task>? OnCancelReceived;
|
||||
|
||||
/// <summary>
|
||||
/// Raised when the transport connection is permanently lost and cannot recover.
|
||||
/// Consumers should reconnect by calling <see cref="ConnectAsync"/> again.
|
||||
/// </summary>
|
||||
event Action? TransportDied { add { } remove { } }
|
||||
}
|
||||
|
||||
@@ -35,6 +35,13 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
private string? _connectionId;
|
||||
private InstanceDescriptor? _instance;
|
||||
private bool _disposed;
|
||||
private volatile bool _transportDead;
|
||||
|
||||
/// <summary>
|
||||
/// Max consecutive transport-level failures before the processing loops exit.
|
||||
/// After this threshold, the transport is considered dead and must be reconnected.
|
||||
/// </summary>
|
||||
private const int MaxConsecutiveTransportFailures = 5;
|
||||
|
||||
/// <inheritdoc />
|
||||
public event Func<Frame, CancellationToken, Task<Frame>>? OnRequestReceived;
|
||||
@@ -147,12 +154,26 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
var incomingTask = ProcessIncomingRequestsAsync(cancellationToken);
|
||||
|
||||
await Task.WhenAll(responseTask, incomingTask);
|
||||
|
||||
if (_transportDead && !cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogWarning("Messaging transport died. Raising TransportDied event for reconnection.");
|
||||
TransportDied?.Invoke();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Raised when the messaging transport connection is permanently lost.
|
||||
/// The <see cref="RouterConnectionManager"/> should listen for this and reconnect.
|
||||
/// </summary>
|
||||
public event Action? TransportDied;
|
||||
|
||||
private async Task ProcessResponsesAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_responseQueue is null) return;
|
||||
|
||||
var consecutiveFailures = 0;
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
@@ -161,6 +182,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
new LeaseRequest { BatchSize = _options.BatchSize },
|
||||
cancellationToken);
|
||||
|
||||
consecutiveFailures = 0; // Reset on success
|
||||
|
||||
foreach (var lease in leases)
|
||||
{
|
||||
try
|
||||
@@ -196,8 +219,20 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in response processing loop");
|
||||
await Task.Delay(1000, cancellationToken);
|
||||
consecutiveFailures++;
|
||||
_logger.LogError(ex, "Error in response processing loop (failure {Count})", consecutiveFailures);
|
||||
|
||||
if (consecutiveFailures >= MaxConsecutiveTransportFailures)
|
||||
{
|
||||
_logger.LogCritical(
|
||||
"Response processing loop exceeded {Max} consecutive failures. " +
|
||||
"Messaging transport is considered dead and will be reconnected.",
|
||||
MaxConsecutiveTransportFailures);
|
||||
_transportDead = true;
|
||||
break;
|
||||
}
|
||||
|
||||
await Task.Delay(Math.Min(1000 * consecutiveFailures, 10_000), cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -206,6 +241,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
{
|
||||
if (_serviceIncomingQueue is null) return;
|
||||
|
||||
var consecutiveFailures = 0;
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
@@ -214,6 +251,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
new LeaseRequest { BatchSize = _options.BatchSize },
|
||||
cancellationToken);
|
||||
|
||||
consecutiveFailures = 0; // Reset on success
|
||||
|
||||
foreach (var lease in leases)
|
||||
{
|
||||
try
|
||||
@@ -243,8 +282,20 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in incoming request processing loop");
|
||||
await Task.Delay(1000, cancellationToken);
|
||||
consecutiveFailures++;
|
||||
_logger.LogError(ex, "Error in incoming request processing loop (failure {Count})", consecutiveFailures);
|
||||
|
||||
if (consecutiveFailures >= MaxConsecutiveTransportFailures)
|
||||
{
|
||||
_logger.LogCritical(
|
||||
"Incoming request loop exceeded {Max} consecutive failures. " +
|
||||
"Messaging transport is considered dead and will be reconnected.",
|
||||
MaxConsecutiveTransportFailures);
|
||||
_transportDead = true;
|
||||
break;
|
||||
}
|
||||
|
||||
await Task.Delay(Math.Min(1000 * consecutiveFailures, 10_000), cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user