From 19b9c90a8d1f876f17768601ffa2abea3ef5b38c Mon Sep 17 00:00:00 2001
From: master <>
Date: Thu, 12 Mar 2026 13:12:54 +0200
Subject: [PATCH] Retry microservice startup and validate async Valkey connects

---
Reviewer notes (everything between the three-dash line above and the first
"diff --git" line is commentary and not part of the commit message):

* This patch was rebuilt from a copy in which line breaks, indentation, blank
  lines and everything inside angle brackets (generic type arguments, XML doc
  tags) had been lost. Hunk ranges were re-derived so that every hunk is
  internally consistent, but context and removed lines - in particular their
  indentation, the position of blank lines and the restored generic
  arguments - are reconstructions. TODO: confirm them against the target
  files before applying.
* The commit id on the first line and the post-image blob ids on the "index"
  lines belong to the original commit; they do not describe the revised
  content below.
* Revisions to the added code:
  - ValkeyConnectionFactory: a disconnected multiplexer is detached from the
    cache field before teardown, closed on a best-effort basis and always
    disposed, so a failing CloseAsync can no longer leave a dead connection
    cached or undisposed.
  - MicroserviceHostedService: NormalizeRetryDelay caps configured delays at
    int.MaxValue milliseconds so Task.Delay cannot reject the value and end
    the retry loop with an unlogged exception.
  - Tests: shared setup extracted into helpers, wait timeouts raised from 1 s
    to 5 s, the stop-timeout CancellationTokenSource is disposed, and a
    regression test covers a failing CloseAsync.
* Not changed, for the maintainer to decide: with the outer catch (Exception)
  removed from StartWhenApplicationReadyAsync, any non-cancellation exception
  thrown outside the inner try faults the background task without a log
  entry.

 .../ValkeyConnectionFactory.cs                |  29 ++-
 .../MicroserviceHostedService.cs              |  84 ++++++++-
 .../ValkeyConnectionFactoryTests.cs           |  67 ++++++++
 .../MicroserviceHostedServiceTests.cs         | 154 ++++++++++++++++++
 4 files changed, 328 insertions(+), 6 deletions(-)
 create mode 100644 src/Router/__Tests/StellaOps.Messaging.Transport.Valkey.Tests/ValkeyConnectionFactoryTests.cs
 create mode 100644 src/Router/__Tests/StellaOps.Microservice.Tests/MicroserviceHostedServiceTests.cs

diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs
index 475d0989f..e67773b3b 100644
--- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs
+++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyConnectionFactory.cs
@@ -25,7 +25,7 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable
         _options = options.Value;
         _logger = logger;
         _connectionFactory = connectionFactory ??
-            (config => Task.FromResult<IConnectionMultiplexer>(ConnectionMultiplexer.Connect(config)));
+            (async config => await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false));
     }
 
     /// <summary>
@@ -70,6 +70,33 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable
                 _logger?.LogDebug("Connecting to Valkey at {Endpoint}", _options.ConnectionString);
                 _connection = await _connectionFactory(config).ConfigureAwait(false);
 
+                if (!_connection.IsConnected)
+                {
+                    // Detach the dead multiplexer before tearing it down so a failing
+                    // close can neither leave it cached nor skip Dispose().
+                    var failedConnection = _connection;
+                    _connection = null;
+
+                    try
+                    {
+                        await failedConnection.CloseAsync().ConfigureAwait(false);
+                    }
+                    catch (Exception closeError)
+                    {
+                        // Best effort: the connect failure reported below is the actionable error.
+                        _logger?.LogDebug(closeError, "Failed to close disconnected Valkey connection");
+                    }
+                    finally
+                    {
+                        failedConnection.Dispose();
+                    }
+
+                    // NOTE(review): ConnectionString may embed credentials; confirm it is safe to surface here.
+                    throw new RedisConnectionException(
+                        ConnectionFailureType.UnableToConnect,
+                        $"Failed to connect to Valkey at {_options.ConnectionString}");
+                }
+
                 _logger?.LogInformation("Connected to Valkey");
             }
         }
diff --git a/src/Router/__Libraries/StellaOps.Microservice/MicroserviceHostedService.cs b/src/Router/__Libraries/StellaOps.Microservice/MicroserviceHostedService.cs
index 7604d79e9..63e12f4f2 100644
--- a/src/Router/__Libraries/StellaOps.Microservice/MicroserviceHostedService.cs
+++ b/src/Router/__Libraries/StellaOps.Microservice/MicroserviceHostedService.cs
@@ -1,5 +1,6 @@
 using Microsoft.Extensions.Hosting;
 using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
 
 namespace StellaOps.Microservice;
 
@@ -11,6 +12,7 @@ public sealed class MicroserviceHostedService : IHostedService
     private readonly IRouterConnectionManager _connectionManager;
     private readonly IHostApplicationLifetime _applicationLifetime;
     private readonly ILogger<MicroserviceHostedService> _logger;
+    private readonly StellaMicroserviceOptions _options;
     private readonly SchemaProviderDiscoveryDiagnostics? _schemaDiagnostics;
     private readonly CancellationTokenSource _startupCts = new();
     private Task? _startupTask;
@@ -23,11 +25,13 @@ public sealed class MicroserviceHostedService : IHostedService
         IRouterConnectionManager connectionManager,
         IHostApplicationLifetime applicationLifetime,
         ILogger<MicroserviceHostedService> logger,
+        IOptions<StellaMicroserviceOptions> options,
         SchemaProviderDiscoveryDiagnostics? schemaDiagnostics = null)
     {
         _connectionManager = connectionManager;
         _applicationLifetime = applicationLifetime;
         _logger = logger;
+        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
         _schemaDiagnostics = schemaDiagnostics;
     }
 
@@ -93,24 +97,94 @@ public sealed class MicroserviceHostedService : IHostedService
 
     private async Task StartWhenApplicationReadyAsync(CancellationToken cancellationToken)
     {
+        // Non-positive or over-long configured values are normalised first; the cap is
+        // never allowed to be smaller than the first delay.
+        var retryDelay = NormalizeRetryDelay(_options.ReconnectBackoffInitial, TimeSpan.FromSeconds(1));
+        var maxRetryDelay = NormalizeRetryDelay(_options.ReconnectBackoffMax, TimeSpan.FromMinutes(1));
+        if (maxRetryDelay < retryDelay)
+        {
+            maxRetryDelay = retryDelay;
+        }
+
         try
         {
             await WaitForApplicationStartedAsync(cancellationToken);
-            await _connectionManager.StartAsync(cancellationToken);
-            _isStarted = true;
-            _logger.LogInformation("Stella microservice started");
+
+            // Keep retrying until the connection manager starts or the caller cancels.
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                try
+                {
+                    await _connectionManager.StartAsync(cancellationToken);
+                    _isStarted = true;
+                    _logger.LogInformation("Stella microservice started");
+                    return;
+                }
+                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+                {
+                    _logger.LogDebug("Stella microservice startup cancelled");
+                    return;
+                }
+                catch (Exception ex)
+                {
+                    _logger.LogWarning(
+                        ex,
+                        "Stella microservice startup failed. Retrying in {RetryDelay}.",
+                        retryDelay);
+
+                    await ResetPartialStartupAsync().ConfigureAwait(false);
+
+                    // Cancellation during the wait surfaces to the outer handler below.
+                    await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false);
+
+                    // Exponential backoff, capped at maxRetryDelay.
+                    retryDelay = TimeSpan.FromTicks(Math.Min(
+                        retryDelay.Ticks * 2,
+                        maxRetryDelay.Ticks));
+                }
+            }
         }
         catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
         {
             _logger.LogDebug("Stella microservice startup cancelled");
         }
+    }
+
+    /// <summary>
+    /// Stops the connection manager after a failed start attempt. Failures are logged
+    /// at debug level and otherwise ignored.
+    /// </summary>
+    private async Task ResetPartialStartupAsync()
+    {
+        try
+        {
+            await _connectionManager.StopAsync(CancellationToken.None).ConfigureAwait(false);
+        }
         catch (Exception ex)
         {
-            _logger.LogError(ex, "Stella microservice failed to start");
-            _applicationLifetime.StopApplication();
+            _logger.LogDebug(ex, "Failed to reset partial Stella microservice startup state");
         }
     }
 
+    /// <summary>
+    /// Returns <paramref name="configuredDelay"/> when it is positive, capped at
+    /// <c>int.MaxValue</c> milliseconds; otherwise returns <paramref name="fallback"/>.
+    /// </summary>
+    private static TimeSpan NormalizeRetryDelay(TimeSpan configuredDelay, TimeSpan fallback)
+    {
+        if (configuredDelay <= TimeSpan.Zero)
+        {
+            return fallback;
+        }
+
+        // Task.Delay throws ArgumentOutOfRangeException for longer delays on older runtimes,
+        // which would end the retry loop; the cap also keeps Ticks * 2 far from overflow.
+        var longestSupportedDelay = TimeSpan.FromMilliseconds(int.MaxValue);
+        return configuredDelay < longestSupportedDelay
+            ? configuredDelay
+            : longestSupportedDelay;
+    }
+
     private Task WaitForApplicationStartedAsync(CancellationToken cancellationToken)
     {
         if (_applicationLifetime.ApplicationStarted.IsCancellationRequested)
diff --git a/src/Router/__Tests/StellaOps.Messaging.Transport.Valkey.Tests/ValkeyConnectionFactoryTests.cs b/src/Router/__Tests/StellaOps.Messaging.Transport.Valkey.Tests/ValkeyConnectionFactoryTests.cs
new file mode 100644
index 000000000..9d535a5b9
--- /dev/null
+++ b/src/Router/__Tests/StellaOps.Messaging.Transport.Valkey.Tests/ValkeyConnectionFactoryTests.cs
@@ -0,0 +1,67 @@
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using Moq;
+using StackExchange.Redis;
+
+namespace StellaOps.Messaging.Transport.Valkey.Tests;
+
+public sealed class ValkeyConnectionFactoryTests
+{
+    private const string Endpoint = "cache.stella-ops.local:6379";
+
+    [Fact]
+    public async Task GetConnectionAsync_Throws_WhenFactoryReturnsDisconnectedConnection()
+    {
+        var multiplexer = CreateDisconnectedMultiplexer(Task.CompletedTask);
+
+        await using var factory = CreateFactory(multiplexer);
+
+        var act = () => factory.GetConnectionAsync(CancellationToken.None).AsTask();
+
+        await act.Should()
+            .ThrowAsync<RedisConnectionException>()
+            .WithMessage($"*{Endpoint}*");
+
+        multiplexer.Verify(connection => connection.CloseAsync(It.IsAny<bool>()), Times.Once);
+        multiplexer.Verify(connection => connection.Dispose(), Times.Once);
+    }
+
+    [Fact]
+    public async Task GetConnectionAsync_DisposesConnection_WhenCloseFails()
+    {
+        var multiplexer = CreateDisconnectedMultiplexer(
+            Task.FromException(new InvalidOperationException("close failed")));
+
+        await using var factory = CreateFactory(multiplexer);
+
+        var act = () => factory.GetConnectionAsync(CancellationToken.None).AsTask();
+
+        // The connect failure must still be reported, and the multiplexer released.
+        await act.Should().ThrowAsync<RedisConnectionException>();
+
+        multiplexer.Verify(connection => connection.Dispose(), Times.Once);
+    }
+
+    // Mock multiplexer that reports IsConnected == false and completes CloseAsync with closeResult.
+    private static Mock<IConnectionMultiplexer> CreateDisconnectedMultiplexer(Task closeResult)
+    {
+        var multiplexer = new Mock<IConnectionMultiplexer>();
+        multiplexer.SetupGet(connection => connection.IsConnected).Returns(false);
+        multiplexer.Setup(connection => connection.CloseAsync(It.IsAny<bool>())).Returns(closeResult);
+        return multiplexer;
+    }
+
+    // Factory under test whose connection delegate always hands back the given mock.
+    private static ValkeyConnectionFactory CreateFactory(Mock<IConnectionMultiplexer> multiplexer)
+    {
+        var options = Options.Create(new ValkeyTransportOptions
+        {
+            ConnectionString = Endpoint
+        });
+
+        return new ValkeyConnectionFactory(
+            options,
+            NullLogger<ValkeyConnectionFactory>.Instance,
+            _ => Task.FromResult(multiplexer.Object));
+    }
+}
diff --git a/src/Router/__Tests/StellaOps.Microservice.Tests/MicroserviceHostedServiceTests.cs b/src/Router/__Tests/StellaOps.Microservice.Tests/MicroserviceHostedServiceTests.cs
new file mode 100644
index 000000000..fd9eb1da7
--- /dev/null
+++ b/src/Router/__Tests/StellaOps.Microservice.Tests/MicroserviceHostedServiceTests.cs
@@ -0,0 +1,154 @@
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using Moq;
+using StellaOps.TestKit;
+
+namespace StellaOps.Microservice.Tests;
+
+public sealed class MicroserviceHostedServiceTests
+{
+    // Upper bound for waits; with the 10-20 ms backoff used here it is only reached
+    // when a test is genuinely stuck.
+    private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
+
+    [Trait("Category", TestCategories.Unit)]
+    [Fact]
+    public async Task StartAsync_RetriesTransientFailures_WithoutStoppingHost()
+    {
+        var attempts = 0;
+        var connected = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        var connectionManager = new Mock<IRouterConnectionManager>();
+        connectionManager
+            .Setup(manager => manager.StartAsync(It.IsAny<CancellationToken>()))
+            .Returns<CancellationToken>(_ =>
+            {
+                var attempt = Interlocked.Increment(ref attempts);
+                if (attempt < 3)
+                {
+                    throw new InvalidOperationException($"transient-{attempt}");
+                }
+
+                connected.TrySetResult();
+                return Task.CompletedTask;
+            });
+        connectionManager
+            .Setup(manager => manager.StopAsync(It.IsAny<CancellationToken>()))
+            .Returns(Task.CompletedTask);
+
+        var lifetime = new TestHostApplicationLifetime();
+        var service = CreateService(connectionManager.Object, lifetime);
+
+        await service.StartAsync(CancellationToken.None);
+        lifetime.SignalStarted();
+
+        await connected.Task.WaitAsync(TestTimeout);
+        await service.StopAsync(CancellationToken.None);
+
+        attempts.Should().BeGreaterThanOrEqualTo(3);
+        lifetime.StopRequested.Should().BeFalse();
+
+        // NOTE(review): two of the expected StopAsync calls come from the retry resets; the
+        // third presumably comes from service.StopAsync - confirm against its implementation.
+        connectionManager.Verify(manager => manager.StopAsync(It.IsAny<CancellationToken>()), Times.AtLeast(3));
+    }
+
+    [Trait("Category", TestCategories.Unit)]
+    [Fact]
+    public async Task StopAsync_CancelsRetryLoop_WhenStartupNeverSucceeds()
+    {
+        var attempts = 0;
+        var secondAttemptObserved = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        var connectionManager = new Mock<IRouterConnectionManager>();
+        connectionManager
+            .Setup(manager => manager.StartAsync(It.IsAny<CancellationToken>()))
+            .Returns<CancellationToken>(_ =>
+            {
+                if (Interlocked.Increment(ref attempts) >= 2)
+                {
+                    secondAttemptObserved.TrySetResult();
+                }
+
+                throw new InvalidOperationException("router unavailable");
+            });
+        connectionManager
+            .Setup(manager => manager.StopAsync(It.IsAny<CancellationToken>()))
+            .Returns(Task.CompletedTask);
+
+        var lifetime = new TestHostApplicationLifetime();
+        var service = CreateService(connectionManager.Object, lifetime);
+
+        await service.StartAsync(CancellationToken.None);
+        lifetime.SignalStarted();
+
+        await secondAttemptObserved.Task.WaitAsync(TestTimeout);
+
+        // Dispose the timeout source instead of leaking its timer.
+        using var stopTimeout = new CancellationTokenSource(TestTimeout);
+        await service.StopAsync(stopTimeout.Token);
+
+        lifetime.StopRequested.Should().BeFalse();
+        attempts.Should().BeGreaterThanOrEqualTo(2);
+    }
+
+    // Builds the service under test with a short 10-20 ms reconnect backoff.
+    private static MicroserviceHostedService CreateService(
+        IRouterConnectionManager connectionManager,
+        IHostApplicationLifetime lifetime)
+    {
+        var options = Options.Create(new StellaMicroserviceOptions
+        {
+            ServiceName = "test-service",
+            Version = "1.0.0",
+            Region = "local",
+            ReconnectBackoffInitial = TimeSpan.FromMilliseconds(10),
+            ReconnectBackoffMax = TimeSpan.FromMilliseconds(20)
+        });
+
+        return new MicroserviceHostedService(
+            connectionManager,
+            lifetime,
+            NullLogger<MicroserviceHostedService>.Instance,
+            options);
+    }
+
+    // Minimal IHostApplicationLifetime that records whether a stop was requested.
+    private sealed class TestHostApplicationLifetime : IHostApplicationLifetime
+    {
+        private readonly CancellationTokenSource _started = new();
+        private readonly CancellationTokenSource _stopping = new();
+        private readonly CancellationTokenSource _stopped = new();
+
+        public CancellationToken ApplicationStarted => _started.Token;
+
+        public CancellationToken ApplicationStopping => _stopping.Token;
+
+        public CancellationToken ApplicationStopped => _stopped.Token;
+
+        public bool StopRequested { get; private set; }
+
+        public void StopApplication()
+        {
+            StopRequested = true;
+            if (!_stopping.IsCancellationRequested)
+            {
+                _stopping.Cancel();
+            }
+
+            if (!_stopped.IsCancellationRequested)
+            {
+                _stopped.Cancel();
+            }
+        }
+
+        public void SignalStarted()
+        {
+            if (!_started.IsCancellationRequested)
+            {
+                _started.Cancel();
+            }
+        }
+    }
+}