Retry microservice startup and validate async Valkey connects
This commit is contained in:
@@ -25,7 +25,7 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
_connectionFactory = connectionFactory ??
|
||||
(config => Task.FromResult<IConnectionMultiplexer>(ConnectionMultiplexer.Connect(config)));
|
||||
(async config => await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -70,6 +70,16 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable
|
||||
|
||||
_logger?.LogDebug("Connecting to Valkey at {Endpoint}", _options.ConnectionString);
|
||||
_connection = await _connectionFactory(config).ConfigureAwait(false);
|
||||
if (!_connection.IsConnected)
|
||||
{
|
||||
await _connection.CloseAsync().ConfigureAwait(false);
|
||||
_connection.Dispose();
|
||||
_connection = null;
|
||||
throw new RedisConnectionException(
|
||||
ConnectionFailureType.UnableToConnect,
|
||||
$"Failed to connect to Valkey at {_options.ConnectionString}");
|
||||
}
|
||||
|
||||
_logger?.LogInformation("Connected to Valkey");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace StellaOps.Microservice;
|
||||
|
||||
@@ -11,6 +12,7 @@ public sealed class MicroserviceHostedService : IHostedService
|
||||
private readonly IRouterConnectionManager _connectionManager;
|
||||
private readonly IHostApplicationLifetime _applicationLifetime;
|
||||
private readonly ILogger<MicroserviceHostedService> _logger;
|
||||
private readonly StellaMicroserviceOptions _options;
|
||||
private readonly SchemaProviderDiscoveryDiagnostics? _schemaDiagnostics;
|
||||
private readonly CancellationTokenSource _startupCts = new();
|
||||
private Task? _startupTask;
|
||||
@@ -23,11 +25,13 @@ public sealed class MicroserviceHostedService : IHostedService
|
||||
IRouterConnectionManager connectionManager,
|
||||
IHostApplicationLifetime applicationLifetime,
|
||||
ILogger<MicroserviceHostedService> logger,
|
||||
IOptions<StellaMicroserviceOptions> options,
|
||||
SchemaProviderDiscoveryDiagnostics? schemaDiagnostics = null)
|
||||
{
|
||||
_connectionManager = connectionManager;
|
||||
_applicationLifetime = applicationLifetime;
|
||||
_logger = logger;
|
||||
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
|
||||
_schemaDiagnostics = schemaDiagnostics;
|
||||
}
|
||||
|
||||
@@ -93,24 +97,72 @@ public sealed class MicroserviceHostedService : IHostedService
|
||||
|
||||
/// <summary>
/// Waits for the application-started signal and then starts the router connection manager,
/// retrying failed attempts with exponential backoff until one succeeds or
/// <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <param name="cancellationToken">
/// Cancels the wait for application start, the start attempts and the backoff delays.
/// </param>
/// <returns>A task that completes once startup has succeeded or has been cancelled.</returns>
private async Task StartWhenApplicationReadyAsync(CancellationToken cancellationToken)
{
    // Non-positive configured delays fall back to 1 second (initial) and 1 minute (max).
    var retryDelay = NormalizeRetryDelay(_options.ReconnectBackoffInitial, TimeSpan.FromSeconds(1));
    var maxRetryDelay = NormalizeRetryDelay(_options.ReconnectBackoffMax, TimeSpan.FromMinutes(1));
    if (maxRetryDelay < retryDelay)
    {
        // Keep the invariant retryDelay <= maxRetryDelay that the backoff step relies on.
        maxRetryDelay = retryDelay;
    }

    try
    {
        await WaitForApplicationStartedAsync(cancellationToken);

        // Every start attempt happens inside the loop so that the first failure is retried
        // like any other and a successful start is never issued twice.
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await _connectionManager.StartAsync(cancellationToken);
                _isStarted = true;
                _logger.LogInformation("Stella microservice started");
                return;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                _logger.LogDebug("Stella microservice startup cancelled");
                return;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(
                    ex,
                    "Stella microservice startup failed. Retrying in {RetryDelay}.",
                    retryDelay);

                // Stop the connection manager before the next attempt, then back off.
                await ResetPartialStartupAsync().ConfigureAwait(false);
                await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false);

                // Double the delay, capped at maxRetryDelay. Comparing against the remaining
                // headroom avoids the long overflow that "Ticks * 2" could produce.
                retryDelay = retryDelay.Ticks > maxRetryDelay.Ticks - retryDelay.Ticks
                    ? maxRetryDelay
                    : TimeSpan.FromTicks(retryDelay.Ticks * 2);
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Reached when cancellation interrupts the application-started wait or a backoff delay.
        _logger.LogDebug("Stella microservice startup cancelled");
    }
}
|
||||
|
||||
/// <summary>
/// Asks the connection manager to stop after a failed start attempt.
/// </summary>
/// <remarks>
/// The stop call is best-effort: any exception it throws is logged at debug level and
/// swallowed. It must not stop the host, because the caller is about to retry startup.
/// </remarks>
/// <returns>A task that completes when the stop attempt has finished, successfully or not.</returns>
private async Task ResetPartialStartupAsync()
{
    try
    {
        // CancellationToken.None: the stop call is not tied to any caller-supplied token.
        await _connectionManager.StopAsync(CancellationToken.None).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Failed to reset partial Stella microservice startup state");
    }
}
|
||||
|
||||
/// <summary>
/// Returns <paramref name="configuredDelay"/> when it is strictly positive; otherwise
/// returns <paramref name="fallback"/>.
/// </summary>
/// <param name="configuredDelay">The delay taken from configuration.</param>
/// <param name="fallback">The value to use when the configured delay is zero or negative.</param>
/// <returns>The delay to use.</returns>
private static TimeSpan NormalizeRetryDelay(TimeSpan configuredDelay, TimeSpan fallback)
{
    if (configuredDelay <= TimeSpan.Zero)
    {
        return fallback;
    }

    return configuredDelay;
}
|
||||
|
||||
private Task WaitForApplicationStartedAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_applicationLifetime.ApplicationStarted.IsCancellationRequested)
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Moq;
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace StellaOps.Messaging.Transport.Valkey.Tests;
|
||||
|
||||
/// <summary>
/// Tests for how <see cref="ValkeyConnectionFactory"/> treats the multiplexer returned by its
/// injected connection delegate.
/// </summary>
public sealed class ValkeyConnectionFactoryTests
{
    /// <summary>
    /// A multiplexer that reports <c>IsConnected == false</c> must be closed and disposed, and
    /// the failure must surface as a <see cref="RedisConnectionException"/> whose message
    /// contains the configured endpoint.
    /// </summary>
    [Fact]
    public async Task GetConnectionAsync_Throws_WhenFactoryReturnsDisconnectedConnection()
    {
        // Arrange: a multiplexer that never reports itself as connected.
        var disconnected = new Mock<IConnectionMultiplexer>();
        disconnected
            .SetupGet(m => m.IsConnected)
            .Returns(false);
        disconnected
            .Setup(m => m.CloseAsync(It.IsAny<bool>()))
            .Returns(Task.CompletedTask);

        var transportOptions = new ValkeyTransportOptions
        {
            ConnectionString = "cache.stella-ops.local:6379"
        };

        await using var factory = new ValkeyConnectionFactory(
            Options.Create(transportOptions),
            NullLogger<ValkeyConnectionFactory>.Instance,
            _ => Task.FromResult(disconnected.Object));

        // Act
        var connect = () => factory.GetConnectionAsync(CancellationToken.None).AsTask();

        // Assert: the failure names the endpoint and the dead multiplexer was cleaned up once.
        await connect.Should()
            .ThrowAsync<RedisConnectionException>()
            .WithMessage("*cache.stella-ops.local:6379*");

        disconnected.Verify(m => m.CloseAsync(It.IsAny<bool>()), Times.Once);
        disconnected.Verify(m => m.Dispose(), Times.Once);
    }
}
|
||||
@@ -0,0 +1,148 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Moq;
|
||||
using StellaOps.TestKit;
|
||||
|
||||
namespace StellaOps.Microservice.Tests;
|
||||
|
||||
/// <summary>
/// Tests for the startup retry behaviour of <see cref="MicroserviceHostedService"/>.
/// </summary>
public sealed class MicroserviceHostedServiceTests
{
    /// <summary>
    /// Two failing start attempts followed by a successful one must not request a host stop.
    /// </summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task StartAsync_RetriesTransientFailures_WithoutStoppingHost()
    {
        var attempts = 0;
        var connected = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        var connectionManager = new Mock<IRouterConnectionManager>();
        connectionManager
            .Setup(manager => manager.StartAsync(It.IsAny<CancellationToken>()))
            .Returns<CancellationToken>(_ =>
            {
                // The first two attempts fail; the third succeeds and releases the test.
                var attempt = Interlocked.Increment(ref attempts);
                if (attempt < 3)
                {
                    throw new InvalidOperationException($"transient-{attempt}");
                }

                connected.TrySetResult();
                return Task.CompletedTask;
            });
        connectionManager
            .Setup(manager => manager.StopAsync(It.IsAny<CancellationToken>()))
            .Returns(Task.CompletedTask);

        var lifetime = new TestHostApplicationLifetime();
        var service = CreateService(connectionManager, lifetime);

        await service.StartAsync(CancellationToken.None);
        lifetime.SignalStarted();

        await connected.Task.WaitAsync(TimeSpan.FromSeconds(1));
        await service.StopAsync(CancellationToken.None);

        attempts.Should().BeGreaterThanOrEqualTo(3);
        lifetime.StopRequested.Should().BeFalse();
        connectionManager.Verify(manager => manager.StopAsync(It.IsAny<CancellationToken>()), Times.AtLeast(3));
    }

    /// <summary>
    /// When every start attempt fails, stopping the service must complete within the timeout
    /// and must not request a host stop.
    /// </summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task StopAsync_CancelsRetryLoop_WhenStartupNeverSucceeds()
    {
        var attempts = 0;
        var secondAttemptObserved = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        var connectionManager = new Mock<IRouterConnectionManager>();
        connectionManager
            .Setup(manager => manager.StartAsync(It.IsAny<CancellationToken>()))
            .Returns<CancellationToken>(_ =>
            {
                // Signal once a retry has actually happened, then keep failing.
                if (Interlocked.Increment(ref attempts) >= 2)
                {
                    secondAttemptObserved.TrySetResult();
                }

                throw new InvalidOperationException("router unavailable");
            });
        connectionManager
            .Setup(manager => manager.StopAsync(It.IsAny<CancellationToken>()))
            .Returns(Task.CompletedTask);

        var lifetime = new TestHostApplicationLifetime();
        var service = CreateService(connectionManager, lifetime);

        await service.StartAsync(CancellationToken.None);
        lifetime.SignalStarted();

        await secondAttemptObserved.Task.WaitAsync(TimeSpan.FromSeconds(1));

        // Dispose the timer-backed source instead of leaking it for the rest of the test run.
        using var stopTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(1));
        await service.StopAsync(stopTimeout.Token);

        lifetime.StopRequested.Should().BeFalse();
        attempts.Should().BeGreaterThanOrEqualTo(2);
    }

    /// <summary>
    /// Builds the service under test with 10 ms initial and 20 ms maximum reconnect backoff.
    /// </summary>
    private static MicroserviceHostedService CreateService(
        Mock<IRouterConnectionManager> connectionManager,
        TestHostApplicationLifetime lifetime)
    {
        var options = Options.Create(new StellaMicroserviceOptions
        {
            ServiceName = "test-service",
            Version = "1.0.0",
            Region = "local",
            ReconnectBackoffInitial = TimeSpan.FromMilliseconds(10),
            ReconnectBackoffMax = TimeSpan.FromMilliseconds(20)
        });

        return new MicroserviceHostedService(
            connectionManager.Object,
            lifetime,
            NullLogger<MicroserviceHostedService>.Instance,
            options);
    }

    /// <summary>
    /// Minimal <see cref="IHostApplicationLifetime"/> whose started signal is raised manually
    /// and which records whether <see cref="StopApplication"/> was called.
    /// </summary>
    private sealed class TestHostApplicationLifetime : IHostApplicationLifetime
    {
        private readonly CancellationTokenSource _started = new();
        private readonly CancellationTokenSource _stopping = new();
        private readonly CancellationTokenSource _stopped = new();

        public CancellationToken ApplicationStarted => _started.Token;

        public CancellationToken ApplicationStopping => _stopping.Token;

        public CancellationToken ApplicationStopped => _stopped.Token;

        /// <summary>True once <see cref="StopApplication"/> has been called.</summary>
        public bool StopRequested { get; private set; }

        public void StopApplication()
        {
            StopRequested = true;
            if (!_stopping.IsCancellationRequested)
            {
                _stopping.Cancel();
            }

            if (!_stopped.IsCancellationRequested)
            {
                _stopped.Cancel();
            }
        }

        /// <summary>Raises the application-started signal; later calls do nothing.</summary>
        public void SignalStarted()
        {
            if (!_started.IsCancellationRequested)
            {
                _started.Cancel();
            }
        }
    }
}
|
||||
Reference in New Issue
Block a user