Add gateway registration resync and slim HELLO transport frames

Introduce GatewayRegistrationResyncService to recover stale registrations,
extract IGatewayTransportClient interface, add EndpointsUpdate and
RegistrationResyncRequest frame types, and expand transport test coverage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-06 08:51:48 +03:00
parent 4d82c346e3
commit e3e87942c7
19 changed files with 796 additions and 91 deletions

View File

@@ -107,13 +107,16 @@ RegisterGatewayTransportIfEnabled("tls", bootstrapOptions.Transports.Tls.Enabled
RegisterGatewayTransportIfEnabled("messaging", bootstrapOptions.Transports.Messaging.Enabled, "Gateway:Transports:Messaging");
builder.Services.AddSingleton<GatewayTransportClient>();
builder.Services.AddSingleton<IGatewayTransportClient>(sp => sp.GetRequiredService<GatewayTransportClient>());
builder.Services.AddSingleton<ITransportClient>(sp => sp.GetRequiredService<GatewayTransportClient>());
builder.Services.AddSingleton<GatewayRegistrationResyncService>();
builder.Services.AddSingleton(new GatewayRouteCatalog(bootstrapOptions.Routes));
builder.Services.AddSingleton<IOpenApiDocumentGenerator, OpenApiDocumentGenerator>();
builder.Services.AddSingleton<IRouterOpenApiDocumentCache, RouterOpenApiDocumentCache>();
builder.Services.AddHostedService<GatewayHostedService>();
builder.Services.AddSingleton<GatewayHostedService>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<GatewayHostedService>());
builder.Services.AddHostedService<GatewayHealthMonitorService>();
builder.Services.AddSingleton<IDpopReplayCache, InMemoryDpopReplayCache>();
@@ -215,6 +218,26 @@ if (bootstrapOptions.OpenApi.Enabled)
app.MapRouterOpenApi();
}
app.MapPost(
"/api/v1/gateway/administration/router/resync",
async (
GatewayRegistrationResyncRequest? request,
GatewayRegistrationResyncService resyncService,
CancellationToken cancellationToken) =>
{
var result = await resyncService.RequestResyncAsync(
request?.ConnectionId,
string.IsNullOrWhiteSpace(request?.Reason)
? "administration-request"
: request!.Reason!,
force: true,
cancellationToken);
return result.MatchedCount == 0
? Results.NotFound(result)
: Results.Ok(result);
});
app.UseWhen(
context => !GatewayRoutes.IsSystemPath(context.Request.Path),
branch =>

View File

@@ -20,7 +20,8 @@ public sealed class GatewayHostedService : IHostedService
private readonly TlsTransportServer? _tlsServer;
private readonly MessagingTransportServer? _messagingServer;
private readonly IGlobalRoutingState _routingState;
private readonly GatewayTransportClient _transportClient;
private readonly IGatewayTransportClient _transportClient;
private readonly GatewayRegistrationResyncService _registrationResyncService;
private readonly IEffectiveClaimsStore _claimsStore;
private readonly IRouterOpenApiDocumentCache? _openApiCache;
private readonly IOptions<GatewayOptions> _options;
@@ -33,7 +34,8 @@ public sealed class GatewayHostedService : IHostedService
public GatewayHostedService(
IGlobalRoutingState routingState,
GatewayTransportClient transportClient,
IGatewayTransportClient transportClient,
GatewayRegistrationResyncService registrationResyncService,
IEffectiveClaimsStore claimsStore,
IOptions<GatewayOptions> options,
GatewayServiceStatus status,
@@ -48,6 +50,7 @@ public sealed class GatewayHostedService : IHostedService
_messagingServer = messagingServer;
_routingState = routingState;
_transportClient = transportClient;
_registrationResyncService = registrationResyncService;
_claimsStore = claimsStore;
_options = options;
_status = status;
@@ -110,6 +113,7 @@ public sealed class GatewayHostedService : IHostedService
{
_messagingServer.OnHelloReceived += HandleMessagingHello;
_messagingServer.OnHeartbeatReceived += HandleMessagingHeartbeat;
_messagingServer.OnEndpointsUpdated += HandleMessagingEndpointsUpdated;
_messagingServer.OnResponseReceived += HandleMessagingResponse;
_messagingServer.OnConnectionClosed += HandleMessagingDisconnection;
await _messagingServer.StartAsync(cancellationToken);
@@ -149,6 +153,7 @@ public sealed class GatewayHostedService : IHostedService
await _messagingServer.StopAsync(cancellationToken);
_messagingServer.OnHelloReceived -= HandleMessagingHello;
_messagingServer.OnHeartbeatReceived -= HandleMessagingHeartbeat;
_messagingServer.OnEndpointsUpdated -= HandleMessagingEndpointsUpdated;
_messagingServer.OnResponseReceived -= HandleMessagingResponse;
_messagingServer.OnConnectionClosed -= HandleMessagingDisconnection;
}
@@ -487,32 +492,50 @@ public sealed class GatewayHostedService : IHostedService
#region Messaging Transport Event Handlers
private Task HandleMessagingHello(ConnectionState state, HelloPayload payload)
private async Task HandleMessagingHello(ConnectionState state, HelloPayload payload)
{
// The MessagingTransportServer already built the ConnectionState with TransportType.Messaging
// We need to add it to the routing state and update the claims store
_routingState.AddConnection(state);
_claimsStore.UpdateFromMicroservice(payload.Instance.ServiceName, payload.Endpoints);
_openApiCache?.Invalidate();
if (payload.Endpoints.Count > 0)
{
_claimsStore.UpdateFromMicroservice(payload.Instance.ServiceName, payload.Endpoints);
_openApiCache?.Invalidate();
_registrationResyncService.ClearPending(state.ConnectionId);
_logger.LogInformation(
"Messaging connection registered: {ConnectionId} service={ServiceName} version={Version}",
state.ConnectionId,
state.Instance.ServiceName,
state.Instance.Version);
return Task.CompletedTask;
_logger.LogInformation(
"Messaging connection registered: {ConnectionId} service={ServiceName} version={Version}",
state.ConnectionId,
state.Instance.ServiceName,
state.Instance.Version);
}
else
{
await _registrationResyncService.RequestResyncAsync(
state.ConnectionId,
"service-startup",
force: true,
CancellationToken.None);
}
}
private Task HandleMessagingHeartbeat(ConnectionState state, HeartbeatPayload payload)
private async Task HandleMessagingHeartbeat(ConnectionState state, HeartbeatPayload payload)
{
var knownConnection = _routingState.GetConnection(state.ConnectionId);
if (knownConnection is null)
{
_routingState.AddConnection(state);
await _registrationResyncService.RequestResyncAsync(
state.ConnectionId,
"gateway-state-miss",
force: false,
CancellationToken.None);
return;
}
_routingState.UpdateConnection(state.ConnectionId, conn =>
{
conn.LastHeartbeatUtc = DateTime.UtcNow;
conn.Status = payload.Status;
});
return Task.CompletedTask;
}
private Task HandleMessagingResponse(ConnectionState state, Frame frame)
@@ -521,8 +544,18 @@ public sealed class GatewayHostedService : IHostedService
return Task.CompletedTask;
}
private Task HandleMessagingEndpointsUpdated(ConnectionState state, EndpointsUpdatePayload payload)
{
_routingState.AddConnection(state);
_claimsStore.UpdateFromMicroservice(state.Instance.ServiceName, payload.Endpoints);
_openApiCache?.Invalidate();
_registrationResyncService.ClearPending(state.ConnectionId);
return Task.CompletedTask;
}
private Task HandleMessagingDisconnection(string connectionId)
{
_registrationResyncService.ClearPending(connectionId);
HandleDisconnect(connectionId);
return Task.CompletedTask;
}

View File

@@ -0,0 +1,3 @@
namespace StellaOps.Gateway.WebService.Services;
public sealed record GatewayRegistrationResyncRequest(string? ConnectionId, string? Reason);

View File

@@ -0,0 +1,129 @@
using System.Collections.Concurrent;
using System.Text.Json;
using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Models;
namespace StellaOps.Gateway.WebService.Services;
public sealed class GatewayRegistrationResyncService
{
private static readonly TimeSpan RequestCooldown = TimeSpan.FromSeconds(15);
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};
private readonly IGlobalRoutingState _routingState;
private readonly IGatewayTransportClient _transportClient;
private readonly TimeProvider _timeProvider;
private readonly ILogger<GatewayRegistrationResyncService> _logger;
private readonly ConcurrentDictionary<string, DateTimeOffset> _lastRequestUtc = new(StringComparer.Ordinal);
public GatewayRegistrationResyncService(
IGlobalRoutingState routingState,
IGatewayTransportClient transportClient,
TimeProvider timeProvider,
ILogger<GatewayRegistrationResyncService> logger)
{
_routingState = routingState;
_transportClient = transportClient;
_timeProvider = timeProvider;
_logger = logger;
}
public async Task<GatewayRegistrationResyncResult> RequestResyncAsync(
string? connectionId,
string reason,
bool force,
CancellationToken cancellationToken)
{
var connections = string.IsNullOrWhiteSpace(connectionId)
? _routingState.GetAllConnections()
: _routingState.GetAllConnections()
.Where(connection => string.Equals(connection.ConnectionId, connectionId, StringComparison.Ordinal))
.ToArray();
var requestedCount = 0;
var skippedCount = 0;
foreach (var connection in connections)
{
if (await TryRequestResyncAsync(connection, reason, force, cancellationToken))
{
requestedCount++;
}
else
{
skippedCount++;
}
}
return new GatewayRegistrationResyncResult
{
ConnectionId = connectionId,
Reason = reason,
MatchedCount = connections.Count,
RequestedCount = requestedCount,
SkippedCount = skippedCount
};
}
public void ClearPending(string connectionId)
{
_lastRequestUtc.TryRemove(connectionId, out _);
}
private async Task<bool> TryRequestResyncAsync(
ConnectionState connection,
string reason,
bool force,
CancellationToken cancellationToken)
{
var nowUtc = _timeProvider.GetUtcNow();
if (!force &&
_lastRequestUtc.TryGetValue(connection.ConnectionId, out var lastRequestUtc) &&
nowUtc - lastRequestUtc < RequestCooldown)
{
return false;
}
var payload = new RegistrationResyncRequestPayload
{
Reason = reason
};
var frame = new Frame
{
Type = FrameType.ResyncRequest,
CorrelationId = null,
Payload = JsonSerializer.SerializeToUtf8Bytes(payload, JsonOptions)
};
await _transportClient.SendFrameAsync(connection, frame, cancellationToken);
_lastRequestUtc[connection.ConnectionId] = nowUtc;
_logger.LogInformation(
"Requested endpoint metadata replay for {ConnectionId} ({ServiceName}/{Version}): {Reason}",
connection.ConnectionId,
connection.Instance.ServiceName,
connection.Instance.Version,
reason);
return true;
}
}
public sealed record GatewayRegistrationResyncResult
{
public string? ConnectionId { get; init; }
public required string Reason { get; init; }
public int MatchedCount { get; init; }
public int RequestedCount { get; init; }
public int SkippedCount { get; init; }
}

View File

@@ -11,7 +11,7 @@ using System.Threading.Channels;
namespace StellaOps.Gateway.WebService.Services;
public sealed class GatewayTransportClient : ITransportClient
public sealed class GatewayTransportClient : IGatewayTransportClient
{
private readonly TcpTransportServer? _tcpServer;
private readonly TlsTransportServer? _tlsServer;
@@ -49,7 +49,7 @@ public sealed class GatewayTransportClient : ITransportClient
try
{
await SendFrameAsync(connection, frame, cancellationToken);
await DispatchFrameAsync(connection, frame, cancellationToken);
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(timeout);
@@ -71,7 +71,7 @@ public sealed class GatewayTransportClient : ITransportClient
Payload = ReadOnlyMemory<byte>.Empty
};
await SendFrameAsync(connection, frame, CancellationToken.None);
await DispatchFrameAsync(connection, frame, CancellationToken.None);
}
public async Task SendStreamingAsync(
@@ -102,7 +102,7 @@ public sealed class GatewayTransportClient : ITransportClient
try
{
await SendFrameAsync(connection, headerFrame, cancellationToken);
await DispatchFrameAsync(connection, headerFrame, cancellationToken);
await StreamRequestBodyAsync(connection, correlationId, requestBody, limits, cancellationToken);
using var responseStream = new MemoryStream();
@@ -142,7 +142,12 @@ public sealed class GatewayTransportClient : ITransportClient
_logger.LogDebug("No pending request for correlation ID {CorrelationId}", frame.CorrelationId);
}
private async Task SendFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken)
public Task SendFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken)
{
return DispatchFrameAsync(connection, frame, cancellationToken);
}
private async Task DispatchFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken)
{
switch (connection.TransportType)
{
@@ -211,7 +216,7 @@ public sealed class GatewayTransportClient : ITransportClient
CorrelationId = correlationId,
Payload = new ReadOnlyMemory<byte>(buffer, 0, bytesRead)
};
await SendFrameAsync(connection, dataFrame, cancellationToken);
await DispatchFrameAsync(connection, dataFrame, cancellationToken);
}
var endFrame = new Frame
@@ -220,7 +225,7 @@ public sealed class GatewayTransportClient : ITransportClient
CorrelationId = correlationId,
Payload = ReadOnlyMemory<byte>.Empty
};
await SendFrameAsync(connection, endFrame, cancellationToken);
await DispatchFrameAsync(connection, endFrame, cancellationToken);
}
finally
{

View File

@@ -0,0 +1,11 @@
using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Common.Models;
namespace StellaOps.Gateway.WebService.Services;
public interface IGatewayTransportClient : ITransportClient
{
Task SendFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken);
void HandleResponseFrame(Frame frame);
}

View File

@@ -101,6 +101,7 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
if (_microserviceTransport is not null)
{
_microserviceTransport.OnRequestReceived += HandleRequestReceivedAsync;
_microserviceTransport.OnResyncRequested += HandleResyncRequestedAsync;
}
// Get schema definitions from generated provider and runtime discovery provider.
@@ -128,7 +129,12 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
{
// Listen for transport death to trigger automatic reconnection
_microserviceTransport.TransportDied += OnTransportDied;
await SendRegistrationRefreshAsync(cancellationToken);
await _microserviceTransport.ConnectAsync(
CreateInstanceDescriptor(),
_endpoints,
_schemas,
_openApiInfo,
cancellationToken);
}
else
{
@@ -159,6 +165,7 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
finally
{
_microserviceTransport.OnRequestReceived -= HandleRequestReceivedAsync;
_microserviceTransport.OnResyncRequested -= HandleResyncRequestedAsync;
}
}
@@ -308,7 +315,12 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
if (_microserviceTransport is null || _endpoints is null) return;
await SendRegistrationRefreshAsync(_cts.Token);
await _microserviceTransport.ConnectAsync(
CreateInstanceDescriptor(),
_endpoints,
_schemas,
_openApiInfo,
_cts.Token);
_logger.LogInformation(
"Messaging transport reconnected for {ServiceName}/{Version}",
@@ -327,44 +339,24 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
private async Task HeartbeatLoopAsync(CancellationToken cancellationToken)
{
var nextHeartbeatDueUtc = DateTime.UtcNow + _options.HeartbeatInterval;
var nextRegistrationRefreshDueUtc = DateTime.UtcNow + _options.RegistrationRefreshInterval;
while (!cancellationToken.IsCancellationRequested)
{
try
{
var nowUtc = DateTime.UtcNow;
var delay = Min(nextHeartbeatDueUtc, nextRegistrationRefreshDueUtc) - nowUtc;
var delay = nextHeartbeatDueUtc - nowUtc;
if (delay > TimeSpan.Zero)
{
await Task.Delay(delay, cancellationToken);
nowUtc = DateTime.UtcNow;
}
if (_microserviceTransport is not null &&
_endpoints is not null &&
nowUtc >= nextRegistrationRefreshDueUtc)
{
try
{
await SendRegistrationRefreshAsync(cancellationToken);
_logger.LogDebug(
"Sent periodic HELLO refresh for {ServiceName}/{Version}",
_options.ServiceName,
_options.Version);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to send periodic HELLO refresh");
}
nextRegistrationRefreshDueUtc = nowUtc + _options.RegistrationRefreshInterval;
}
if (_microserviceTransport is not null && nowUtc >= nextHeartbeatDueUtc)
{
var heartbeat = new HeartbeatPayload
{
Instance = CreateInstanceDescriptor(),
InstanceId = _options.InstanceId,
Status = _currentStatus,
InFlightRequestCount = _inFlightRequestCount,
@@ -407,15 +399,22 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa
}
}
private async Task SendRegistrationRefreshAsync(CancellationToken cancellationToken)
private async Task HandleResyncRequestedAsync(
RegistrationResyncRequestPayload request,
CancellationToken cancellationToken)
{
if (_microserviceTransport is null || _endpoints is null)
{
return;
}
await _microserviceTransport.ConnectAsync(
CreateInstanceDescriptor(),
_logger.LogInformation(
"Router requested endpoint metadata replay for {ServiceName}/{Version}: {Reason}",
_options.ServiceName,
_options.Version,
request.Reason);
await _microserviceTransport.SendEndpointsUpdateAsync(
_endpoints,
_schemas,
_openApiInfo,

View File

@@ -49,6 +49,18 @@ public interface IMicroserviceTransport
/// <param name="cancellationToken">Cancellation token.</param>
Task SendHeartbeatAsync(HeartbeatPayload heartbeat, CancellationToken cancellationToken);
/// <summary>
/// Sends the current endpoint/schema/OpenAPI metadata after the router explicitly requests a replay.
/// </summary>
Task SendEndpointsUpdateAsync(
IReadOnlyList<EndpointDescriptor> endpoints,
IReadOnlyDictionary<string, SchemaDefinition>? schemas,
ServiceOpenApiInfo? openApiInfo,
CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
/// <summary>
/// Event raised when a REQUEST frame is received from the gateway.
/// </summary>
@@ -59,6 +71,11 @@ public interface IMicroserviceTransport
/// </summary>
event Func<Guid, string?, Task>? OnCancelReceived;
/// <summary>
/// Event raised when the router explicitly requests a fresh endpoint metadata replay.
/// </summary>
event Func<RegistrationResyncRequestPayload, CancellationToken, Task>? OnResyncRequested { add { } remove { } }
/// <summary>
/// Raised when the transport connection is permanently lost and cannot recover.
/// Consumers should reconnect by calling <see cref="ConnectAsync"/> again.

View File

@@ -40,6 +40,11 @@ public enum FrameType
/// </summary>
Cancel,
/// <summary>
/// Gateway-to-service control frame requesting a fresh endpoint metadata replay.
/// </summary>
ResyncRequest,
/// <summary>
/// Optional frame for updating endpoint metadata at runtime.
/// </summary>

View File

@@ -0,0 +1,23 @@
namespace StellaOps.Router.Common.Models;
/// <summary>
/// Payload for the EndpointsUpdate frame sent by a microservice after the router requests a metadata replay.
/// </summary>
public sealed record EndpointsUpdatePayload
{
/// <summary>
/// Gets the endpoints registered by this instance.
/// </summary>
public required IReadOnlyList<EndpointDescriptor> Endpoints { get; init; }
/// <summary>
/// Gets the schema definitions for request/response validation.
/// Keys are schema IDs referenced by EndpointDescriptor.SchemaInfo.
/// </summary>
public IReadOnlyDictionary<string, SchemaDefinition> Schemas { get; init; } = new Dictionary<string, SchemaDefinition>();
/// <summary>
/// Gets the OpenAPI metadata for this service.
/// </summary>
public ServiceOpenApiInfo? OpenApiInfo { get; init; }
}

View File

@@ -7,6 +7,11 @@ namespace StellaOps.Router.Common.Models;
/// </summary>
public sealed record HeartbeatPayload
{
/// <summary>
/// Gets the instance descriptor.
/// </summary>
public InstanceDescriptor? Instance { get; init; }
/// <summary>
/// Gets the instance ID.
/// </summary>

View File

@@ -0,0 +1,12 @@
namespace StellaOps.Router.Common.Models;
/// <summary>
/// Payload for the ResyncRequest frame sent by the router when it needs a fresh endpoint metadata replay.
/// </summary>
public sealed record RegistrationResyncRequestPayload
{
/// <summary>
/// Gets the reason for the replay request.
/// </summary>
public string Reason { get; init; } = "router-requested-resync";
}

View File

@@ -51,6 +51,9 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
/// <inheritdoc />
public event Func<Guid, string?, Task>? OnCancelReceived;
/// <inheritdoc />
public event Func<RegistrationResyncRequestPayload, CancellationToken, Task>? OnResyncRequested;
/// <summary>
/// Initializes a new instance of the <see cref="MessagingTransportClient"/> class.
/// </summary>
@@ -100,9 +103,7 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
var helloPayload = new HelloPayload
{
Instance = instance,
Endpoints = endpoints,
Schemas = schemas ?? new Dictionary<string, SchemaDefinition>(),
OpenApiInfo = openApiInfo
Endpoints = []
};
var helloMessage = new RpcRequestMessage
@@ -129,7 +130,7 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
else
{
_logger.LogDebug(
"Refreshed messaging registration for {ServiceName}/{Version} instance {InstanceId} on connection {ConnectionId}",
"Replayed slim messaging HELLO for {ServiceName}/{Version} instance {InstanceId} on connection {ConnectionId}",
instance.ServiceName,
instance.Version,
instance.InstanceId,
@@ -235,32 +236,20 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
{
try
{
_logger.LogWarning("[DIAG] Consumer loop: calling LeaseAsync (batch={BatchSize})", _options.BatchSize);
var sw = System.Diagnostics.Stopwatch.StartNew();
var leases = await _serviceIncomingQueue.LeaseAsync(
new LeaseRequest { BatchSize = _options.BatchSize },
cancellationToken);
sw.Stop();
consecutiveFailures = 0;
if (leases.Count > 0)
{
_logger.LogWarning("[DIAG] Consumer loop: leased {Count} messages in {ElapsedMs}ms, processing concurrently",
leases.Count, sw.ElapsedMilliseconds);
await Task.WhenAll(leases.Select(lease =>
ProcessLeaseWithGuardAsync(lease, HandleIncomingRequestAsync, cancellationToken)));
_logger.LogWarning("[DIAG] Consumer loop: finished processing {Count} messages", leases.Count);
}
else
{
_logger.LogWarning("[DIAG] Consumer loop: no messages, waiting for notification (lease took {ElapsedMs}ms)",
sw.ElapsedMilliseconds);
await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken);
_logger.LogWarning("[DIAG] Consumer loop: notification received or timeout, resuming");
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
@@ -299,6 +288,12 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
return;
}
if (message.FrameType == FrameType.ResyncRequest)
{
await HandleResyncRequestAsync(message, cancellationToken);
return;
}
if (message.FrameType is not (FrameType.Request or FrameType.RequestStreamData))
{
return;
@@ -387,6 +382,27 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
}
}
private async Task HandleResyncRequestAsync(
RpcResponseMessage message,
CancellationToken cancellationToken)
{
if (OnResyncRequested is null)
{
_logger.LogDebug(
"Ignoring router resync request for {ConnectionId}; no replay handler is registered",
message.ConnectionId);
return;
}
var request = string.IsNullOrEmpty(message.PayloadBase64)
? new RegistrationResyncRequestPayload()
: JsonSerializer.Deserialize<RegistrationResyncRequestPayload>(
Convert.FromBase64String(message.PayloadBase64),
_jsonOptions) ?? new RegistrationResyncRequestPayload();
await OnResyncRequested(request, cancellationToken);
}
/// <inheritdoc />
public async Task<Frame> SendRequestAsync(
ConnectionState connection,
@@ -553,6 +569,38 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
await _requestQueue.EnqueueAsync(message, cancellationToken: cancellationToken);
}
/// <inheritdoc />
public async Task SendEndpointsUpdateAsync(
IReadOnlyList<EndpointDescriptor> endpoints,
IReadOnlyDictionary<string, SchemaDefinition>? schemas,
ServiceOpenApiInfo? openApiInfo,
CancellationToken cancellationToken)
{
if (_requestQueue is null || _connectionId is null)
{
throw new InvalidOperationException("Not connected");
}
var payload = new EndpointsUpdatePayload
{
Endpoints = endpoints,
Schemas = schemas ?? new Dictionary<string, SchemaDefinition>(),
OpenApiInfo = openApiInfo
};
var message = new RpcRequestMessage
{
CorrelationId = Guid.NewGuid().ToString("N"),
ConnectionId = _connectionId,
TargetService = "gateway",
FrameType = FrameType.EndpointsUpdate,
PayloadBase64 = Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload, _jsonOptions)),
SenderInstanceId = _instance?.InstanceId
};
await _requestQueue.EnqueueAsync(message, cancellationToken: cancellationToken);
}
/// <inheritdoc />
public async Task DisconnectAsync()
{
@@ -707,33 +755,29 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
CancellationToken cancellationToken)
where TMessage : class
{
var sw = System.Diagnostics.Stopwatch.StartNew();
try
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(55));
_logger.LogWarning("[DIAG] Guard: processing message {MessageId}", lease.MessageId);
// WaitAsync abandons handlers that ignore CancellationToken (e.g.,
// StackExchange.Redis commands with their own internal timeout).
// The handler continues in background but the consumer loop is unblocked.
await handler(lease, timeoutCts.Token).WaitAsync(timeoutCts.Token);
sw.Stop();
_logger.LogWarning("[DIAG] Guard: message {MessageId} processed in {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds);
await lease.AcknowledgeAsync(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
_logger.LogWarning("[DIAG] Guard: message {MessageId} cancelled (shutdown) after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds);
// Graceful shutdown.
}
catch (OperationCanceledException)
{
_logger.LogWarning("Guard: message {MessageId} TIMED OUT after {ElapsedMs}ms (55s limit)", lease.MessageId, sw.ElapsedMilliseconds);
_logger.LogWarning("Guard: message {MessageId} timed out after 55s", lease.MessageId);
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Guard: message {MessageId} FAILED after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds);
_logger.LogError(ex, "Guard: message {MessageId} failed", lease.MessageId);
await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken);
}
}

View File

@@ -44,6 +44,11 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
/// </summary>
public event Func<ConnectionState, HeartbeatPayload, Task>? OnHeartbeatReceived;
/// <summary>
/// Event raised when a microservice replays endpoint metadata after a router resync request.
/// </summary>
public event Func<ConnectionState, EndpointsUpdatePayload, Task>? OnEndpointsUpdated;
/// <summary>
/// Event raised when a RESPONSE frame is received.
/// </summary>
@@ -229,6 +234,10 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
await HandleHeartbeatMessageAsync(message, cancellationToken);
break;
case FrameType.EndpointsUpdate:
await HandleEndpointsUpdateMessageAsync(message, cancellationToken);
break;
case FrameType.Response:
case FrameType.ResponseStreamData:
// Response from microservice to gateway - route to pending request
@@ -302,17 +311,37 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
private async Task HandleHeartbeatMessageAsync(RpcRequestMessage message, CancellationToken cancellationToken)
{
var payload = JsonSerializer.Deserialize<HeartbeatPayload>(
Convert.FromBase64String(message.PayloadBase64), _jsonOptions);
if (!_connections.TryGetValue(message.ConnectionId, out var state))
{
_logger.LogWarning("Heartbeat from unknown connection {ConnectionId}", message.ConnectionId);
return;
if (payload?.Instance is null)
{
_logger.LogWarning("Heartbeat from unknown connection {ConnectionId}", message.ConnectionId);
return;
}
state = new ConnectionState
{
ConnectionId = message.ConnectionId,
Instance = payload.Instance,
Status = payload.Status,
LastHeartbeatUtc = DateTime.UtcNow,
TransportType = TransportType.Messaging
};
_connections[message.ConnectionId] = state;
_logger.LogInformation(
"Heartbeat discovered unknown messaging connection {ConnectionId} for {ServiceName}/{Version}; awaiting endpoint replay",
message.ConnectionId,
state.Instance.ServiceName,
state.Instance.Version);
}
state.LastHeartbeatUtc = DateTime.UtcNow;
var payload = JsonSerializer.Deserialize<HeartbeatPayload>(
Convert.FromBase64String(message.PayloadBase64), _jsonOptions);
if (payload is not null)
{
state.Status = payload.Status;
@@ -326,6 +355,58 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
}
}
private async Task HandleEndpointsUpdateMessageAsync(
RpcRequestMessage message,
CancellationToken cancellationToken)
{
if (!_connections.TryGetValue(message.ConnectionId, out var existing))
{
_logger.LogWarning(
"Endpoint metadata replay arrived for unknown connection {ConnectionId}",
message.ConnectionId);
return;
}
var payload = JsonSerializer.Deserialize<EndpointsUpdatePayload>(
Convert.FromBase64String(message.PayloadBase64),
_jsonOptions);
if (payload is null)
{
_logger.LogWarning("Invalid EndpointsUpdate payload from {ConnectionId}", message.ConnectionId);
return;
}
var state = new ConnectionState
{
ConnectionId = existing.ConnectionId,
Instance = existing.Instance,
Status = existing.Status,
LastHeartbeatUtc = DateTime.UtcNow,
TransportType = existing.TransportType,
Schemas = payload.Schemas,
OpenApiInfo = payload.OpenApiInfo
};
foreach (var endpoint in payload.Endpoints)
{
state.Endpoints[(endpoint.Method, endpoint.Path)] = endpoint;
}
_connections[message.ConnectionId] = state;
_logger.LogInformation(
"Endpoint metadata replay received from {ServiceName}/{Version} instance {InstanceId} via messaging",
state.Instance.ServiceName,
state.Instance.Version,
state.Instance.InstanceId);
if (OnEndpointsUpdated is not null)
{
await OnEndpointsUpdated(state, payload);
}
}
/// <summary>
/// Sends a request frame to a microservice via messaging.
/// </summary>

View File

@@ -5,6 +5,6 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol
| Task ID | Status | Notes |
| --- | --- | --- |
| RTR-MSG-002 | DONE | `docs-archived/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - periodic HELLO re-registration now reuses the existing messaging connection/receive loop instead of minting duplicate logical connections. |
| RVM-06 | DONE | Updated messaging microservice HELLO payload to include schemas/OpenAPI metadata via the new schema-aware `IMicroserviceTransport.ConnectAsync(...)` overload. |
| RVM-06 | DONE | Messaging HELLO now stays slim and identity-only; the router requests full endpoint/schema/OpenAPI replay explicitly via `ResyncRequest` and `EndpointsUpdate` frames when startup or administration needs metadata. |
| REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/StellaOps.Router.Transport.Messaging.md. |
| REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. |

View File

@@ -60,11 +60,17 @@ public sealed class MessagingTransportIntegrationTests
tcpServer,
tlsServer,
messagingServer);
var resyncService = new GatewayRegistrationResyncService(
routingState.Object,
transportClient,
TimeProvider.System,
NullLogger<GatewayRegistrationResyncService>.Instance);
// Act & Assert - construction should succeed with messaging server
var hostedService = new GatewayHostedService(
routingState.Object,
transportClient,
resyncService,
claimsStore.Object,
gatewayOptions,
new GatewayServiceStatus(),
@@ -96,11 +102,17 @@ public sealed class MessagingTransportIntegrationTests
tcpServer,
tlsServer,
messagingServer: null);
var resyncService = new GatewayRegistrationResyncService(
routingState.Object,
transportClient,
TimeProvider.System,
NullLogger<GatewayRegistrationResyncService>.Instance);
// Act & Assert - construction should succeed without messaging server
var hostedService = new GatewayHostedService(
routingState.Object,
transportClient,
resyncService,
claimsStore.Object,
gatewayOptions,
new GatewayServiceStatus(),

View File

@@ -0,0 +1,124 @@
using System.Text.Json;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using StellaOps.Gateway.WebService.Services;
using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Models;
namespace StellaOps.Gateway.WebService.Tests.Services;
public sealed class GatewayRegistrationResyncServiceTests
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
[Fact]
public async Task RequestResyncAsync_TargetConnection_SendsResyncFrame()
{
var connection = CreateConnectionState("conn-1");
var routingState = new Mock<IGlobalRoutingState>();
routingState.Setup(state => state.GetAllConnections()).Returns([connection]);
var transportClient = new Mock<IGatewayTransportClient>();
Frame? capturedFrame = null;
transportClient
.Setup(client => client.SendFrameAsync(connection, It.IsAny<Frame>(), It.IsAny<CancellationToken>()))
.Callback<ConnectionState, Frame, CancellationToken>((_, frame, _) => capturedFrame = frame)
.Returns(Task.CompletedTask);
var service = new GatewayRegistrationResyncService(
routingState.Object,
transportClient.Object,
TimeProvider.System,
NullLogger<GatewayRegistrationResyncService>.Instance);
var result = await service.RequestResyncAsync("conn-1", "unit-test", force: true, CancellationToken.None);
result.MatchedCount.Should().Be(1);
result.RequestedCount.Should().Be(1);
capturedFrame.Should().NotBeNull();
capturedFrame!.Type.Should().Be(FrameType.ResyncRequest);
var payload = JsonSerializer.Deserialize<RegistrationResyncRequestPayload>(capturedFrame.Payload.Span, JsonOptions);
payload.Should().NotBeNull();
payload!.Reason.Should().Be("unit-test");
}
[Fact]
public async Task RequestResyncAsync_UsesCooldownUnlessForced()
{
var connection = CreateConnectionState("conn-1");
var routingState = new Mock<IGlobalRoutingState>();
routingState.Setup(state => state.GetAllConnections()).Returns([connection]);
var transportClient = new Mock<IGatewayTransportClient>();
transportClient
.Setup(client => client.SendFrameAsync(connection, It.IsAny<Frame>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
var service = new GatewayRegistrationResyncService(
routingState.Object,
transportClient.Object,
TimeProvider.System,
NullLogger<GatewayRegistrationResyncService>.Instance);
var first = await service.RequestResyncAsync("conn-1", "first", force: false, CancellationToken.None);
var second = await service.RequestResyncAsync("conn-1", "second", force: false, CancellationToken.None);
var third = await service.RequestResyncAsync("conn-1", "forced", force: true, CancellationToken.None);
first.RequestedCount.Should().Be(1);
second.RequestedCount.Should().Be(0);
third.RequestedCount.Should().Be(1);
transportClient.Verify(
client => client.SendFrameAsync(connection, It.IsAny<Frame>(), It.IsAny<CancellationToken>()),
Times.Exactly(2));
}
[Fact]
public async Task ClearPending_AllowsImmediateRetry()
{
var connection = CreateConnectionState("conn-1");
var routingState = new Mock<IGlobalRoutingState>();
routingState.Setup(state => state.GetAllConnections()).Returns([connection]);
var transportClient = new Mock<IGatewayTransportClient>();
transportClient
.Setup(client => client.SendFrameAsync(connection, It.IsAny<Frame>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
var service = new GatewayRegistrationResyncService(
routingState.Object,
transportClient.Object,
TimeProvider.System,
NullLogger<GatewayRegistrationResyncService>.Instance);
await service.RequestResyncAsync("conn-1", "first", force: false, CancellationToken.None);
service.ClearPending("conn-1");
var result = await service.RequestResyncAsync("conn-1", "second", force: false, CancellationToken.None);
result.RequestedCount.Should().Be(1);
transportClient.Verify(
client => client.SendFrameAsync(connection, It.IsAny<Frame>(), It.IsAny<CancellationToken>()),
Times.Exactly(2));
}
private static ConnectionState CreateConnectionState(string connectionId)
{
return new ConnectionState
{
ConnectionId = connectionId,
Instance = new InstanceDescriptor
{
InstanceId = "svc-1",
ServiceName = "timelineindexer",
Version = "1.0.0",
Region = "local"
},
TransportType = TransportType.Messaging
};
}
}

View File

@@ -388,7 +388,7 @@ public sealed class RouterConnectionManagerTests : IDisposable
[Trait("Category", TestCategories.Unit)]
[Fact]
public async Task StartAsync_ReplaysHelloWithinRegistrationRefreshInterval()
public async Task StartAsync_DoesNotReplayHelloWithinRegistrationRefreshInterval()
{
// Arrange
_options.Routers.Add(new RouterEndpointConfig
@@ -398,7 +398,6 @@ public sealed class RouterConnectionManagerTests : IDisposable
TransportType = TransportType.InMemory
});
var registrationReplayObserved = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var connectCount = 0;
_transportMock
.Setup(t => t.ConnectAsync(
@@ -409,10 +408,7 @@ public sealed class RouterConnectionManagerTests : IDisposable
It.IsAny<CancellationToken>()))
.Callback(() =>
{
if (Interlocked.Increment(ref connectCount) >= 2)
{
registrationReplayObserved.TrySetResult();
}
Interlocked.Increment(ref connectCount);
})
.Returns(Task.CompletedTask);
@@ -420,11 +416,54 @@ public sealed class RouterConnectionManagerTests : IDisposable
// Act
await manager.StartAsync(CancellationToken.None);
await registrationReplayObserved.Task.WaitAsync(TimeSpan.FromSeconds(2));
await Task.Delay(150);
await manager.StopAsync(CancellationToken.None);
// Assert
connectCount.Should().BeGreaterThanOrEqualTo(2);
connectCount.Should().Be(1);
}
[Trait("Category", TestCategories.Unit)]
[Fact]
public async Task ResyncRequest_ReplaysEndpointMetadataWithoutReconnect()
{
// Arrange
_options.Routers.Add(new RouterEndpointConfig
{
Host = "localhost",
Port = 5000,
TransportType = TransportType.InMemory
});
var endpoints = new List<EndpointDescriptor>
{
new() { ServiceName = "test-service", Version = "1.0.0", Method = "GET", Path = "/api/users" }
};
_discoveryProviderMock.Setup(d => d.DiscoverEndpoints()).Returns(endpoints);
var transport = new RecordingMicroserviceTransport();
using var manager = new RouterConnectionManager(
Options.Create(_options),
_discoveryProviderMock.Object,
_requestDispatcherMock.Object,
transport,
NullLogger<RouterConnectionManager>.Instance);
// Act
await manager.StartAsync(CancellationToken.None);
await transport.RaiseResyncRequestedAsync(new RegistrationResyncRequestPayload
{
Reason = "unit-test"
});
await manager.StopAsync(CancellationToken.None);
// Assert
transport.ConnectCount.Should().Be(1);
transport.EndpointUpdateCount.Should().Be(1);
transport.LastEndpointUpdate.Should().NotBeNull();
transport.LastEndpointUpdate!.Endpoints.Should().ContainSingle();
transport.LastEndpointUpdate.Endpoints[0].Path.Should().Be("/api/users");
}
#endregion
@@ -471,4 +510,69 @@ public sealed class RouterConnectionManagerTests : IDisposable
public IReadOnlyDictionary<string, SchemaDefinition> DiscoverSchemaDefinitions() => _schemas;
}
private sealed class RecordingMicroserviceTransport : IMicroserviceTransport
{
public int ConnectCount { get; private set; }
public int EndpointUpdateCount { get; private set; }
public EndpointsUpdatePayload? LastEndpointUpdate { get; private set; }
public event Func<Frame, CancellationToken, Task<Frame>>? OnRequestReceived;
public event Func<Guid, string?, Task>? OnCancelReceived;
public event Func<RegistrationResyncRequestPayload, CancellationToken, Task>? OnResyncRequested;
public event Action? TransportDied;
public Task ConnectAsync(
InstanceDescriptor instance,
IReadOnlyList<EndpointDescriptor> endpoints,
CancellationToken cancellationToken)
{
ConnectCount++;
return Task.CompletedTask;
}
public Task ConnectAsync(
InstanceDescriptor instance,
IReadOnlyList<EndpointDescriptor> endpoints,
IReadOnlyDictionary<string, SchemaDefinition>? schemas,
ServiceOpenApiInfo? openApiInfo,
CancellationToken cancellationToken)
{
ConnectCount++;
return Task.CompletedTask;
}
public Task DisconnectAsync() => Task.CompletedTask;
public Task SendHeartbeatAsync(HeartbeatPayload heartbeat, CancellationToken cancellationToken) => Task.CompletedTask;
public Task SendEndpointsUpdateAsync(
IReadOnlyList<EndpointDescriptor> endpoints,
IReadOnlyDictionary<string, SchemaDefinition>? schemas,
ServiceOpenApiInfo? openApiInfo,
CancellationToken cancellationToken)
{
EndpointUpdateCount++;
LastEndpointUpdate = new EndpointsUpdatePayload
{
Endpoints = endpoints,
Schemas = schemas ?? new Dictionary<string, SchemaDefinition>(),
OpenApiInfo = openApiInfo
};
return Task.CompletedTask;
}
public Task RaiseResyncRequestedAsync(RegistrationResyncRequestPayload request)
{
return OnResyncRequested is null
? Task.CompletedTask
: OnResyncRequested.Invoke(request, CancellationToken.None);
}
}
}

View File

@@ -89,7 +89,7 @@ public sealed class MessagingTransportQueueOptionsTests
[Trait("Category", TestCategories.Unit)]
[Fact]
public async Task MessagingTransportClient_ConnectAsync_IncludesSchemasAndOpenApiInfoInHelloPayload()
public async Task MessagingTransportClient_ConnectAsync_SendsSlimHelloPayload()
{
var options = Options.Create(new MessagingTransportOptions
{
@@ -152,7 +152,82 @@ public sealed class MessagingTransportQueueOptionsTests
new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase });
payload.Should().NotBeNull();
payload!.Schemas.Should().ContainKey(schemaId);
payload!.Instance.ServiceName.Should().Be("timelineindexer");
payload.Endpoints.Should().BeEmpty();
payload.Schemas.Should().BeEmpty();
payload.OpenApiInfo.Should().BeNull();
}
[Trait("Category", TestCategories.Unit)]
[Fact]
public async Task MessagingTransportClient_SendEndpointsUpdateAsync_IncludesSchemasAndOpenApiInfoInUpdatePayload()
{
var options = Options.Create(new MessagingTransportOptions
{
ConsumerGroup = "timelineindexer-test",
BatchSize = 1
});
var queueFactory = new RecordingQueueFactory();
var client = new MessagingTransportClient(
queueFactory,
options,
NullLogger<MessagingTransportClient>.Instance);
var instance = new InstanceDescriptor
{
InstanceId = "timelineindexer-1",
ServiceName = "timelineindexer",
Version = "1.0.0",
Region = "local"
};
var endpoints =
new[]
{
new EndpointDescriptor
{
ServiceName = "timelineindexer",
Version = "1.0.0",
Method = "GET",
Path = "/api/v1/timeline"
}
};
var schemaId = "TimelineEvent";
var schemas = new Dictionary<string, SchemaDefinition>
{
[schemaId] = new SchemaDefinition
{
SchemaId = schemaId,
SchemaJson = "{\"type\":\"object\"}",
ETag = "abc123"
}
};
await client.ConnectAsync(instance, endpoints, CancellationToken.None);
await client.SendEndpointsUpdateAsync(
endpoints,
schemas,
new ServiceOpenApiInfo
{
Title = "timelineindexer",
Description = "Timeline service"
},
CancellationToken.None);
await client.DisconnectAsync();
var updateMessage = queueFactory.EnqueuedMessages
.OfType<RpcRequestMessage>()
.First(message => message.FrameType == Common.Enums.FrameType.EndpointsUpdate);
var payload = JsonSerializer.Deserialize<EndpointsUpdatePayload>(
Convert.FromBase64String(updateMessage.PayloadBase64),
new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase });
payload.Should().NotBeNull();
payload!.Endpoints.Should().ContainSingle();
payload.Schemas.Should().ContainKey(schemaId);
payload.Schemas[schemaId].SchemaJson.Should().Be("{\"type\":\"object\"}");
payload.OpenApiInfo.Should().NotBeNull();
payload.OpenApiInfo!.Title.Should().Be("timelineindexer");