using System.Collections.Concurrent;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Frames;
using StellaOps.Router.Common.Models;
namespace StellaOps.Microservice;
/// <summary>
/// Manages connections to router gateways.
/// </summary>
/// <remarks>
/// On start the manager discovers endpoints, records one connection state per configured
/// router, connects the optional microservice transport and runs a background heartbeat
/// loop until it is stopped or disposed.
/// </remarks>
public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposable
{
    private readonly StellaMicroserviceOptions _options;
    private readonly IEndpointDiscoveryProvider _endpointDiscovery;
    private readonly RequestDispatcher _requestDispatcher;

    // Optional: when null, no transport connection is made and no heartbeats are sent.
    private readonly IMicroserviceTransport? _microserviceTransport;

    // Optional source of schema definitions; an empty dictionary is used when absent.
    private readonly IGeneratedEndpointProvider? _generatedProvider;
    private readonly ILogger _logger;

    // Connection state per configured router, keyed by "host:port".
    private readonly ConcurrentDictionary _connections = new();

    // Cancelled by StopAsync/Dispose to end the heartbeat loop.
    private readonly CancellationTokenSource _cts = new();

    private IReadOnlyList? _endpoints;
    private IReadOnlyDictionary? _schemas;
    private ServiceOpenApiInfo? _openApiInfo;
    private Task? _heartbeatTask;
    private bool _disposed;

    // The three values below are set by callers and read by the heartbeat loop,
    // which runs on a thread-pool thread.
    private volatile InstanceHealthStatus _currentStatus = InstanceHealthStatus.Healthy;
    private int _inFlightRequestCount;
    private double _errorRate;

    /// <summary>
    /// Gets a snapshot of the connection states currently tracked by this manager.
    /// </summary>
    public IReadOnlyList Connections => [.. _connections.Values];

    /// <summary>
    /// Initializes a new instance of the <see cref="RouterConnectionManager"/> class.
    /// </summary>
    /// <param name="options">Microservice options; the wrapped value is read once here.</param>
    /// <param name="endpointDiscovery">Provider used to discover endpoints on start.</param>
    /// <param name="requestDispatcher">Dispatcher that handles incoming request frames.</param>
    /// <param name="microserviceTransport">Transport to the gateway, or <c>null</c> to run without one.</param>
    /// <param name="logger">Logger for diagnostics.</param>
    /// <param name="generatedProvider">Optional provider of schema definitions.</param>
    /// <exception cref="ArgumentNullException">
    /// A required dependency (<paramref name="options"/>, <paramref name="endpointDiscovery"/>,
    /// <paramref name="requestDispatcher"/> or <paramref name="logger"/>) is <c>null</c>.
    /// </exception>
    public RouterConnectionManager(
        IOptions options,
        IEndpointDiscoveryProvider endpointDiscovery,
        RequestDispatcher requestDispatcher,
        IMicroserviceTransport? microserviceTransport,
        ILogger logger,
        IGeneratedEndpointProvider? generatedProvider = null)
    {
        // Fail at construction with a named parameter rather than with a
        // NullReferenceException on first use.
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(endpointDiscovery);
        ArgumentNullException.ThrowIfNull(requestDispatcher);
        ArgumentNullException.ThrowIfNull(logger);

        _options = options.Value;
        _endpointDiscovery = endpointDiscovery;
        _requestDispatcher = requestDispatcher;
        _microserviceTransport = microserviceTransport;
        _generatedProvider = generatedProvider;
        _logger = logger;
    }

    /// <summary>
    /// Gets or sets the current health status reported by this instance.
    /// </summary>
    public InstanceHealthStatus CurrentStatus
    {
        get => _currentStatus;
        set => _currentStatus = value;
    }

    /// <summary>
    /// Gets or sets the count of in-flight requests.
    /// </summary>
    public int InFlightRequestCount
    {
        // Volatile access so the heartbeat thread observes the latest write,
        // matching the volatile status field.
        get => Volatile.Read(ref _inFlightRequestCount);
        set => Volatile.Write(ref _inFlightRequestCount, value);
    }

    /// <summary>
    /// Gets or sets the error rate (0.0 to 1.0).
    /// </summary>
    /// <remarks>The value is stored as given; no range check is applied.</remarks>
    public double ErrorRate
    {
        // A double field cannot be declared volatile; Volatile.Read/Write gives the same
        // visibility and avoids torn 64-bit reads on 32-bit platforms.
        get => Volatile.Read(ref _errorRate);
        set => Volatile.Write(ref _errorRate, value);
    }

    /// <summary>
    /// Validates options, discovers endpoints and schemas, registers every configured router,
    /// connects the transport (when one is configured) and starts the heartbeat loop.
    /// </summary>
    /// <param name="cancellationToken">Token that cancels startup.</param>
    /// <exception cref="ObjectDisposedException">The manager has been disposed.</exception>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        _options.Validate();

        _logger.LogInformation(
            "Starting router connection manager for {ServiceName}/{Version}",
            _options.ServiceName,
            _options.Version);

        // Discover endpoints
        _endpoints = _endpointDiscovery.DiscoverEndpoints();
        _logger.LogInformation("Discovered {EndpointCount} endpoints", _endpoints.Count);

        // Wire request handling before transport connect to avoid a race after HELLO.
        if (_microserviceTransport is not null)
        {
            // Remove-then-add keeps exactly one subscription even if StartAsync is called
            // again; a duplicate subscription would dispatch every request twice.
            _microserviceTransport.OnRequestReceived -= HandleRequestReceivedAsync;
            _microserviceTransport.OnRequestReceived += HandleRequestReceivedAsync;
        }

        // Get schema definitions from generated provider
        _schemas = _generatedProvider?.GetSchemaDefinitions()
            ?? new Dictionary();
        _logger.LogInformation("Discovered {SchemaCount} schemas", _schemas.Count);

        // Build OpenAPI info from options
        _openApiInfo = new ServiceOpenApiInfo
        {
            Title = _options.ServiceName,
            Description = _options.ServiceDescription,
            Contact = _options.ContactInfo
        };

        // Connect to each router
        foreach (var router in _options.Routers)
        {
            await ConnectToRouterAsync(router, cancellationToken);
        }

        // Establish transport connection to the gateway (InMemory/TCP/RabbitMQ/etc).
        if (_microserviceTransport is not null)
        {
            var instance = new InstanceDescriptor
            {
                InstanceId = _options.InstanceId,
                ServiceName = _options.ServiceName,
                Version = _options.Version,
                Region = _options.Region
            };
            await _microserviceTransport.ConnectAsync(instance, _endpoints, cancellationToken);
        }
        else
        {
            _logger.LogWarning("No microservice transport configured; skipping transport connection.");
        }

        // Start heartbeat task (only one loop at a time).
        if (_heartbeatTask is null || _heartbeatTask.IsCompleted)
        {
            // Read the token here, not inside the lambda: if Dispose runs first, the
            // ObjectDisposedException surfaces to the caller instead of being lost in
            // an unobserved background task.
            var heartbeatToken = _cts.Token;
            _heartbeatTask = Task.Run(() => HeartbeatLoopAsync(heartbeatToken), CancellationToken.None);
        }
    }

    /// <summary>
    /// Cancels the heartbeat loop, disconnects the transport, detaches the request handler,
    /// waits for the heartbeat task and clears the tracked connections.
    /// </summary>
    /// <param name="cancellationToken">Token that bounds the wait for the heartbeat task.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping router connection manager");

        try
        {
            await _cts.CancelAsync();
        }
        catch (ObjectDisposedException)
        {
            // Dispose() already cancelled the source before disposing it, so the loop has
            // been signalled; continue so the transport is still disconnected and detached.
        }

        if (_microserviceTransport is not null)
        {
            try
            {
                await _microserviceTransport.DisconnectAsync();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to disconnect transport");
            }
            finally
            {
                _microserviceTransport.OnRequestReceived -= HandleRequestReceivedAsync;
            }
        }

        if (_heartbeatTask is not null)
        {
            try
            {
                await _heartbeatTask.WaitAsync(cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Expected
            }
        }

        _connections.Clear();
    }

    /// <summary>
    /// Handles a frame received from the transport: converts it to a request, dispatches it
    /// and returns the response frame. A frame that cannot be converted yields a 400 response.
    /// </summary>
    /// <param name="frame">The incoming frame.</param>
    /// <param name="cancellationToken">Token passed to the dispatcher.</param>
    /// <returns>
    /// The response frame, carrying the incoming correlation ID whenever the incoming
    /// frame has one.
    /// </returns>
    private async Task HandleRequestReceivedAsync(Frame frame, CancellationToken cancellationToken)
    {
        var request = FrameConverter.ToRequestFrame(frame);
        if (request is null)
        {
            _logger.LogWarning(
                "Received invalid request frame: type={FrameType}, correlationId={CorrelationId}",
                frame.Type,
                frame.CorrelationId ?? "(null)");

            var error = new ResponseFrame
            {
                // A fresh ID is generated when the bad frame carried no correlation ID.
                RequestId = frame.CorrelationId ?? Guid.NewGuid().ToString("N"),
                StatusCode = 400,
                Headers = new Dictionary
                {
                    ["Content-Type"] = "text/plain; charset=utf-8"
                },
                Payload = Encoding.UTF8.GetBytes("Invalid request frame")
            };

            var errorFrame = FrameConverter.ToFrame(error);
            return frame.CorrelationId is null
                ? errorFrame
                : errorFrame with { CorrelationId = frame.CorrelationId };
        }

        var response = await _requestDispatcher.DispatchAsync(request, cancellationToken);
        var responseFrame = FrameConverter.ToFrame(response);

        // Ensure correlation ID matches the incoming request for transport-level matching.
        return frame.CorrelationId is null
            ? responseFrame
            : responseFrame with { CorrelationId = frame.CorrelationId };
    }

    /// <summary>
    /// Records a connection state for <paramref name="router"/> (keyed by "host:port") with
    /// the discovered endpoints, schemas and OpenAPI info, retrying with exponential backoff
    /// if an attempt throws.
    /// </summary>
    /// <param name="router">The router endpoint configuration.</param>
    /// <param name="cancellationToken">
    /// Token that stops retrying. If it is already cancelled on entry the method returns
    /// without registering anything; if it is cancelled during a backoff delay the delay throws.
    /// </param>
    private async Task ConnectToRouterAsync(RouterEndpointConfig router, CancellationToken cancellationToken)
    {
        var connectionId = $"{router.Host}:{router.Port}";
        var backoff = _options.ReconnectBackoffInitial;

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                _logger.LogInformation(
                    "Connecting to router at {Host}:{Port} via {Transport}",
                    router.Host,
                    router.Port,
                    router.TransportType);

                // Create connection state
                var instance = new InstanceDescriptor
                {
                    InstanceId = _options.InstanceId,
                    ServiceName = _options.ServiceName,
                    Version = _options.Version,
                    Region = _options.Region
                };

                var state = new ConnectionState
                {
                    ConnectionId = connectionId,
                    Instance = instance,
                    Status = InstanceHealthStatus.Healthy,
                    LastHeartbeatUtc = DateTime.UtcNow,
                    TransportType = router.TransportType,
                    Schemas = _schemas ?? new Dictionary(),
                    OpenApiInfo = _openApiInfo
                };

                // Register endpoints
                foreach (var endpoint in _endpoints ?? [])
                {
                    state.Endpoints[(endpoint.Method, endpoint.Path)] = endpoint;
                }

                _connections[connectionId] = state;

                // For InMemory transport, connectivity is handled via the transport client
                // Real transports will establish actual network connections here
                _logger.LogInformation(
                    "Connected to router at {Host}:{Port}, registered {EndpointCount} endpoints",
                    router.Host,
                    router.Port,
                    _endpoints?.Count ?? 0);

                return;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Failed to connect to router at {Host}:{Port}, retrying in {Backoff}",
                    router.Host,
                    router.Port,
                    backoff);

                await Task.Delay(backoff, cancellationToken);

                // Exponential backoff, capped at the configured maximum.
                backoff = TimeSpan.FromTicks(Math.Min(
                    backoff.Ticks * 2,
                    _options.ReconnectBackoffMax.Ticks));
            }
        }
    }

    /// <summary>
    /// Background loop: after each heartbeat interval, sends a heartbeat with the current
    /// status and metrics through the transport (when configured) and refreshes the local
    /// heartbeat timestamp of every tracked connection.
    /// </summary>
    /// <param name="cancellationToken">Token that ends the loop.</param>
    private async Task HeartbeatLoopAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(_options.HeartbeatInterval, cancellationToken);

                // Snapshot status and metrics through the properties so the values come
                // from synchronized reads.
                var status = CurrentStatus;
                var inFlight = InFlightRequestCount;
                var errorRate = ErrorRate;

                // Build heartbeat payload with current status and metrics
                var heartbeat = new HeartbeatPayload
                {
                    InstanceId = _options.InstanceId,
                    Status = status,
                    InFlightRequestCount = inFlight,
                    ErrorRate = errorRate,
                    TimestampUtc = DateTime.UtcNow
                };

                // Send heartbeat via transport
                if (_microserviceTransport is not null)
                {
                    try
                    {
                        await _microserviceTransport.SendHeartbeatAsync(heartbeat, cancellationToken);
                        _logger.LogDebug(
                            "Sent heartbeat: status={Status}, inflight={InFlight}, errorRate={ErrorRate:P1}",
                            heartbeat.Status,
                            heartbeat.InFlightRequestCount,
                            heartbeat.ErrorRate);
                    }
                    catch (Exception ex)
                        when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
                    {
                        // Cancellation caused by shutdown is not a send failure: the filter
                        // lets it fall through to the outer handler, which ends the loop.
                        _logger.LogWarning(ex, "Failed to send heartbeat");
                    }
                }

                // Update connection state local heartbeat times (one timestamp per pass).
                var heartbeatTime = DateTime.UtcNow;
                foreach (var connection in _connections.Values)
                {
                    connection.LastHeartbeatUtc = heartbeatTime;
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Unexpected error in heartbeat loop");
            }
        }
    }

    /// <summary>
    /// Detaches the request handler from the transport, cancels the heartbeat loop and
    /// releases the cancellation source. Calls after the first are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Detach here as well as in StopAsync: if StopAsync was never called, the transport
        // would otherwise keep this instance subscribed. Removing an absent handler is a no-op.
        if (_microserviceTransport is not null)
        {
            _microserviceTransport.OnRequestReceived -= HandleRequestReceivedAsync;
        }

        try
        {
            _cts.Cancel();
        }
        finally
        {
            // Release the source even if a cancellation callback throws.
            _cts.Dispose();
        }
    }
}