diff --git a/src/Router/StellaOps.Gateway.WebService/Program.cs b/src/Router/StellaOps.Gateway.WebService/Program.cs index 4d6759b38..8cb43f8a9 100644 --- a/src/Router/StellaOps.Gateway.WebService/Program.cs +++ b/src/Router/StellaOps.Gateway.WebService/Program.cs @@ -107,13 +107,16 @@ RegisterGatewayTransportIfEnabled("tls", bootstrapOptions.Transports.Tls.Enabled RegisterGatewayTransportIfEnabled("messaging", bootstrapOptions.Transports.Messaging.Enabled, "Gateway:Transports:Messaging"); builder.Services.AddSingleton(); +builder.Services.AddSingleton(sp => sp.GetRequiredService()); builder.Services.AddSingleton(sp => sp.GetRequiredService()); +builder.Services.AddSingleton(); builder.Services.AddSingleton(new GatewayRouteCatalog(bootstrapOptions.Routes)); builder.Services.AddSingleton(); builder.Services.AddSingleton(); -builder.Services.AddHostedService(); +builder.Services.AddSingleton(); +builder.Services.AddHostedService(sp => sp.GetRequiredService()); builder.Services.AddHostedService(); builder.Services.AddSingleton(); @@ -215,6 +218,26 @@ if (bootstrapOptions.OpenApi.Enabled) app.MapRouterOpenApi(); } +app.MapPost( + "/api/v1/gateway/administration/router/resync", + async ( + GatewayRegistrationResyncRequest? request, + GatewayRegistrationResyncService resyncService, + CancellationToken cancellationToken) => + { + var result = await resyncService.RequestResyncAsync( + request?.ConnectionId, + string.IsNullOrWhiteSpace(request?.Reason) + ? "administration-request" + : request!.Reason!, + force: true, + cancellationToken); + + return result.MatchedCount == 0 + ? Results.NotFound(result) + : Results.Ok(result); + }); + app.UseWhen( context => !GatewayRoutes.IsSystemPath(context.Request.Path), branch => diff --git a/src/Router/StellaOps.Gateway.WebService/Services/GatewayHostedService.cs b/src/Router/StellaOps.Gateway.WebService/Services/GatewayHostedService.cs index 065f83e58..5d150ed21 100644 --- a/src/Router/StellaOps.Gateway.WebService/Services/GatewayHostedService.cs +++ b/src/Router/StellaOps.Gateway.WebService/Services/GatewayHostedService.cs @@ -20,7 +20,8 @@ public sealed class GatewayHostedService : IHostedService private readonly TlsTransportServer? _tlsServer; private readonly MessagingTransportServer? _messagingServer; private readonly IGlobalRoutingState _routingState; - private readonly GatewayTransportClient _transportClient; + private readonly IGatewayTransportClient _transportClient; + private readonly GatewayRegistrationResyncService _registrationResyncService; private readonly IEffectiveClaimsStore _claimsStore; private readonly IRouterOpenApiDocumentCache? _openApiCache; private readonly IOptions _options; @@ -33,7 +34,8 @@ public sealed class GatewayHostedService : IHostedService public GatewayHostedService( IGlobalRoutingState routingState, - GatewayTransportClient transportClient, + IGatewayTransportClient transportClient, + GatewayRegistrationResyncService registrationResyncService, IEffectiveClaimsStore claimsStore, IOptions options, GatewayServiceStatus status, @@ -48,6 +50,7 @@ public sealed class GatewayHostedService : IHostedService _messagingServer = messagingServer; _routingState = routingState; _transportClient = transportClient; + _registrationResyncService = registrationResyncService; _claimsStore = claimsStore; _options = options; _status = status; @@ -110,6 +113,7 @@ public sealed class GatewayHostedService : IHostedService { _messagingServer.OnHelloReceived += HandleMessagingHello; _messagingServer.OnHeartbeatReceived += HandleMessagingHeartbeat; + _messagingServer.OnEndpointsUpdated += HandleMessagingEndpointsUpdated; _messagingServer.OnResponseReceived += HandleMessagingResponse; _messagingServer.OnConnectionClosed += HandleMessagingDisconnection; await _messagingServer.StartAsync(cancellationToken); @@ -149,6 +153,7 @@ public sealed class GatewayHostedService : IHostedService await _messagingServer.StopAsync(cancellationToken); _messagingServer.OnHelloReceived -= HandleMessagingHello; _messagingServer.OnHeartbeatReceived -= HandleMessagingHeartbeat; + _messagingServer.OnEndpointsUpdated -= HandleMessagingEndpointsUpdated; _messagingServer.OnResponseReceived -= HandleMessagingResponse; _messagingServer.OnConnectionClosed -= HandleMessagingDisconnection; } @@ -487,32 +492,50 @@ public sealed class GatewayHostedService : IHostedService #region Messaging Transport Event Handlers - private Task HandleMessagingHello(ConnectionState state, HelloPayload payload) + private async Task HandleMessagingHello(ConnectionState state, HelloPayload payload) { - // The MessagingTransportServer already built the ConnectionState with TransportType.Messaging - // We need to add it to the routing state and update the claims store _routingState.AddConnection(state); - _claimsStore.UpdateFromMicroservice(payload.Instance.ServiceName, payload.Endpoints); - _openApiCache?.Invalidate(); + if (payload.Endpoints.Count > 0) + { + _claimsStore.UpdateFromMicroservice(payload.Instance.ServiceName, payload.Endpoints); + _openApiCache?.Invalidate(); + _registrationResyncService.ClearPending(state.ConnectionId); - _logger.LogInformation( - "Messaging connection registered: {ConnectionId} service={ServiceName} version={Version}", - state.ConnectionId, - state.Instance.ServiceName, - state.Instance.Version); - - return Task.CompletedTask; + _logger.LogInformation( + "Messaging connection registered: {ConnectionId} service={ServiceName} version={Version}", + state.ConnectionId, + state.Instance.ServiceName, + state.Instance.Version); + } + else + { + await _registrationResyncService.RequestResyncAsync( + state.ConnectionId, + "service-startup", + force: true, + CancellationToken.None); + } } - private Task HandleMessagingHeartbeat(ConnectionState state, HeartbeatPayload payload) + private async Task HandleMessagingHeartbeat(ConnectionState state, HeartbeatPayload payload) { + var knownConnection = _routingState.GetConnection(state.ConnectionId); + if (knownConnection is null) + { + _routingState.AddConnection(state); + await _registrationResyncService.RequestResyncAsync( + state.ConnectionId, + "gateway-state-miss", + force: false, + CancellationToken.None); + return; + } + _routingState.UpdateConnection(state.ConnectionId, conn => { conn.LastHeartbeatUtc = DateTime.UtcNow; conn.Status = payload.Status; }); - - return Task.CompletedTask; } private Task HandleMessagingResponse(ConnectionState state, Frame frame) @@ -521,8 +544,18 @@ public sealed class GatewayHostedService : IHostedService return Task.CompletedTask; } + private Task HandleMessagingEndpointsUpdated(ConnectionState state, EndpointsUpdatePayload payload) + { + _routingState.AddConnection(state); + _claimsStore.UpdateFromMicroservice(state.Instance.ServiceName, payload.Endpoints); + _openApiCache?.Invalidate(); + _registrationResyncService.ClearPending(state.ConnectionId); + return Task.CompletedTask; + } + private Task HandleMessagingDisconnection(string connectionId) { + _registrationResyncService.ClearPending(connectionId); HandleDisconnect(connectionId); return Task.CompletedTask; } diff --git a/src/Router/StellaOps.Gateway.WebService/Services/GatewayRegistrationResyncRequest.cs b/src/Router/StellaOps.Gateway.WebService/Services/GatewayRegistrationResyncRequest.cs new file mode 100644 index 000000000..8a48e7c66 --- /dev/null +++ b/src/Router/StellaOps.Gateway.WebService/Services/GatewayRegistrationResyncRequest.cs @@ -0,0 +1,3 @@ +namespace StellaOps.Gateway.WebService.Services; + +public sealed record GatewayRegistrationResyncRequest(string? ConnectionId, string? Reason); diff --git a/src/Router/StellaOps.Gateway.WebService/Services/GatewayRegistrationResyncService.cs b/src/Router/StellaOps.Gateway.WebService/Services/GatewayRegistrationResyncService.cs new file mode 100644 index 000000000..fbee72260 --- /dev/null +++ b/src/Router/StellaOps.Gateway.WebService/Services/GatewayRegistrationResyncService.cs @@ -0,0 +1,129 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using StellaOps.Router.Common.Abstractions; +using StellaOps.Router.Common.Enums; +using StellaOps.Router.Common.Models; + +namespace StellaOps.Gateway.WebService.Services; + +public sealed class GatewayRegistrationResyncService +{ + private static readonly TimeSpan RequestCooldown = TimeSpan.FromSeconds(15); + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + WriteIndented = false + }; + + private readonly IGlobalRoutingState _routingState; + private readonly IGatewayTransportClient _transportClient; + private readonly TimeProvider _timeProvider; + private readonly ILogger _logger; + private readonly ConcurrentDictionary _lastRequestUtc = new(StringComparer.Ordinal); + + public GatewayRegistrationResyncService( + IGlobalRoutingState routingState, + IGatewayTransportClient transportClient, + TimeProvider timeProvider, + ILogger logger) + { + _routingState = routingState; + _transportClient = transportClient; + _timeProvider = timeProvider; + _logger = logger; + } + + public async Task RequestResyncAsync( + string? connectionId, + string reason, + bool force, + CancellationToken cancellationToken) + { + var connections = string.IsNullOrWhiteSpace(connectionId) + ? _routingState.GetAllConnections() + : _routingState.GetAllConnections() + .Where(connection => string.Equals(connection.ConnectionId, connectionId, StringComparison.Ordinal)) + .ToArray(); + + var requestedCount = 0; + var skippedCount = 0; + + foreach (var connection in connections) + { + if (await TryRequestResyncAsync(connection, reason, force, cancellationToken)) + { + requestedCount++; + } + else + { + skippedCount++; + } + } + + return new GatewayRegistrationResyncResult + { + ConnectionId = connectionId, + Reason = reason, + MatchedCount = connections.Count, + RequestedCount = requestedCount, + SkippedCount = skippedCount + }; + } + + public void ClearPending(string connectionId) + { + _lastRequestUtc.TryRemove(connectionId, out _); + } + + private async Task TryRequestResyncAsync( + ConnectionState connection, + string reason, + bool force, + CancellationToken cancellationToken) + { + var nowUtc = _timeProvider.GetUtcNow(); + if (!force && + _lastRequestUtc.TryGetValue(connection.ConnectionId, out var lastRequestUtc) && + nowUtc - lastRequestUtc < RequestCooldown) + { + return false; + } + + var payload = new RegistrationResyncRequestPayload + { + Reason = reason + }; + + var frame = new Frame + { + Type = FrameType.ResyncRequest, + CorrelationId = null, + Payload = JsonSerializer.SerializeToUtf8Bytes(payload, JsonOptions) + }; + + await _transportClient.SendFrameAsync(connection, frame, cancellationToken); + _lastRequestUtc[connection.ConnectionId] = nowUtc; + + _logger.LogInformation( + "Requested endpoint metadata replay for {ConnectionId} ({ServiceName}/{Version}): {Reason}", + connection.ConnectionId, + connection.Instance.ServiceName, + connection.Instance.Version, + reason); + + return true; + } +} + +public sealed record GatewayRegistrationResyncResult +{ + public string? ConnectionId { get; init; } + + public required string Reason { get; init; } + + public int MatchedCount { get; init; } + + public int RequestedCount { get; init; } + + public int SkippedCount { get; init; } +} diff --git a/src/Router/StellaOps.Gateway.WebService/Services/GatewayTransportClient.cs b/src/Router/StellaOps.Gateway.WebService/Services/GatewayTransportClient.cs index 35baa1f2f..a9945ca23 100644 --- a/src/Router/StellaOps.Gateway.WebService/Services/GatewayTransportClient.cs +++ b/src/Router/StellaOps.Gateway.WebService/Services/GatewayTransportClient.cs @@ -11,7 +11,7 @@ using System.Threading.Channels; namespace StellaOps.Gateway.WebService.Services; -public sealed class GatewayTransportClient : ITransportClient +public sealed class GatewayTransportClient : IGatewayTransportClient { private readonly TcpTransportServer? _tcpServer; private readonly TlsTransportServer? _tlsServer; @@ -49,7 +49,7 @@ public sealed class GatewayTransportClient : ITransportClient try { - await SendFrameAsync(connection, frame, cancellationToken); + await DispatchFrameAsync(connection, frame, cancellationToken); using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); timeoutCts.CancelAfter(timeout); @@ -71,7 +71,7 @@ public sealed class GatewayTransportClient : ITransportClient Payload = ReadOnlyMemory.Empty }; - await SendFrameAsync(connection, frame, CancellationToken.None); + await DispatchFrameAsync(connection, frame, CancellationToken.None); } public async Task SendStreamingAsync( @@ -102,7 +102,7 @@ public sealed class GatewayTransportClient : ITransportClient try { - await SendFrameAsync(connection, headerFrame, cancellationToken); + await DispatchFrameAsync(connection, headerFrame, cancellationToken); await StreamRequestBodyAsync(connection, correlationId, requestBody, limits, cancellationToken); using var responseStream = new MemoryStream(); @@ -142,7 +142,12 @@ public sealed class GatewayTransportClient : ITransportClient _logger.LogDebug("No pending request for correlation ID {CorrelationId}", frame.CorrelationId); } - private async Task SendFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken) + public Task SendFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken) + { + return DispatchFrameAsync(connection, frame, cancellationToken); + } + + private async Task DispatchFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken) { switch (connection.TransportType) { @@ -211,7 +216,7 @@ public sealed class GatewayTransportClient : ITransportClient CorrelationId = correlationId, Payload = new ReadOnlyMemory(buffer, 0, bytesRead) }; - await SendFrameAsync(connection, dataFrame, cancellationToken); + await DispatchFrameAsync(connection, dataFrame, cancellationToken); } var endFrame = new Frame @@ -220,7 +225,7 @@ public sealed class GatewayTransportClient : ITransportClient CorrelationId = correlationId, Payload = ReadOnlyMemory.Empty }; - await SendFrameAsync(connection, endFrame, cancellationToken); + await DispatchFrameAsync(connection, endFrame, cancellationToken); } finally { diff --git a/src/Router/StellaOps.Gateway.WebService/Services/IGatewayTransportClient.cs b/src/Router/StellaOps.Gateway.WebService/Services/IGatewayTransportClient.cs new file mode 100644 index 000000000..2e5b9e297 --- /dev/null +++ b/src/Router/StellaOps.Gateway.WebService/Services/IGatewayTransportClient.cs @@ -0,0 +1,11 @@ +using StellaOps.Router.Common.Abstractions; +using StellaOps.Router.Common.Models; + +namespace StellaOps.Gateway.WebService.Services; + +public interface IGatewayTransportClient : ITransportClient +{ + Task SendFrameAsync(ConnectionState connection, Frame frame, CancellationToken cancellationToken); + + void HandleResponseFrame(Frame frame); +} diff --git a/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs b/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs index 748b3836b..51978ef8f 100644 --- a/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs +++ b/src/Router/__Libraries/StellaOps.Microservice/RouterConnectionManager.cs @@ -101,6 +101,7 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa if (_microserviceTransport is not null) { _microserviceTransport.OnRequestReceived += HandleRequestReceivedAsync; + _microserviceTransport.OnResyncRequested += HandleResyncRequestedAsync; } // Get schema definitions from generated provider and runtime discovery provider. @@ -128,7 +129,12 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa { // Listen for transport death to trigger automatic reconnection _microserviceTransport.TransportDied += OnTransportDied; - await SendRegistrationRefreshAsync(cancellationToken); + await _microserviceTransport.ConnectAsync( + CreateInstanceDescriptor(), + _endpoints, + _schemas, + _openApiInfo, + cancellationToken); } else { @@ -159,6 +165,7 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa finally { _microserviceTransport.OnRequestReceived -= HandleRequestReceivedAsync; + _microserviceTransport.OnResyncRequested -= HandleResyncRequestedAsync; } } @@ -308,7 +315,12 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa if (_microserviceTransport is null || _endpoints is null) return; - await SendRegistrationRefreshAsync(_cts.Token); + await _microserviceTransport.ConnectAsync( + CreateInstanceDescriptor(), + _endpoints, + _schemas, + _openApiInfo, + _cts.Token); _logger.LogInformation( "Messaging transport reconnected for {ServiceName}/{Version}", @@ -327,44 +339,24 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa private async Task HeartbeatLoopAsync(CancellationToken cancellationToken) { var nextHeartbeatDueUtc = DateTime.UtcNow + _options.HeartbeatInterval; - var nextRegistrationRefreshDueUtc = DateTime.UtcNow + _options.RegistrationRefreshInterval; while (!cancellationToken.IsCancellationRequested) { try { var nowUtc = DateTime.UtcNow; - var delay = Min(nextHeartbeatDueUtc, nextRegistrationRefreshDueUtc) - nowUtc; + var delay = nextHeartbeatDueUtc - nowUtc; if (delay > TimeSpan.Zero) { await Task.Delay(delay, cancellationToken); nowUtc = DateTime.UtcNow; } - if (_microserviceTransport is not null && - _endpoints is not null && - nowUtc >= nextRegistrationRefreshDueUtc) - { - try - { - await SendRegistrationRefreshAsync(cancellationToken); - _logger.LogDebug( - "Sent periodic HELLO refresh for {ServiceName}/{Version}", - _options.ServiceName, - _options.Version); - } - catch (Exception ex) - { - _logger.LogWarning(ex, "Failed to send periodic HELLO refresh"); - } - - nextRegistrationRefreshDueUtc = nowUtc + _options.RegistrationRefreshInterval; - } - if (_microserviceTransport is not null && nowUtc >= nextHeartbeatDueUtc) { var heartbeat = new HeartbeatPayload { + Instance = CreateInstanceDescriptor(), InstanceId = _options.InstanceId, Status = _currentStatus, InFlightRequestCount = _inFlightRequestCount, @@ -407,15 +399,22 @@ public sealed class RouterConnectionManager : IRouterConnectionManager, IDisposa } } - private async Task SendRegistrationRefreshAsync(CancellationToken cancellationToken) + private async Task HandleResyncRequestedAsync( + RegistrationResyncRequestPayload request, + CancellationToken cancellationToken) { if (_microserviceTransport is null || _endpoints is null) { return; } - await _microserviceTransport.ConnectAsync( - CreateInstanceDescriptor(), + _logger.LogInformation( + "Router requested endpoint metadata replay for {ServiceName}/{Version}: {Reason}", + _options.ServiceName, + _options.Version, + request.Reason); + + await _microserviceTransport.SendEndpointsUpdateAsync( _endpoints, _schemas, _openApiInfo, diff --git a/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs b/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs index 106a45e7f..184dc9802 100644 --- a/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs +++ b/src/Router/__Libraries/StellaOps.Router.Common/Abstractions/IMicroserviceTransport.cs @@ -49,6 +49,18 @@ public interface IMicroserviceTransport /// Cancellation token. Task SendHeartbeatAsync(HeartbeatPayload heartbeat, CancellationToken cancellationToken); + /// + /// Sends the current endpoint/schema/OpenAPI metadata after the router explicitly requests a replay. + /// + Task SendEndpointsUpdateAsync( + IReadOnlyList endpoints, + IReadOnlyDictionary? schemas, + ServiceOpenApiInfo? openApiInfo, + CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + /// /// Event raised when a REQUEST frame is received from the gateway. /// @@ -59,6 +71,11 @@ public interface IMicroserviceTransport /// event Func? OnCancelReceived; + /// + /// Event raised when the router explicitly requests a fresh endpoint metadata replay. + /// + event Func? OnResyncRequested { add { } remove { } } + /// /// Raised when the transport connection is permanently lost and cannot recover. /// Consumers should reconnect by calling again. diff --git a/src/Router/__Libraries/StellaOps.Router.Common/Enums/FrameType.cs b/src/Router/__Libraries/StellaOps.Router.Common/Enums/FrameType.cs index 0d1fab006..120e202a8 100644 --- a/src/Router/__Libraries/StellaOps.Router.Common/Enums/FrameType.cs +++ b/src/Router/__Libraries/StellaOps.Router.Common/Enums/FrameType.cs @@ -40,6 +40,11 @@ public enum FrameType /// Cancel, + /// + /// Gateway-to-service control frame requesting a fresh endpoint metadata replay. + /// + ResyncRequest, + /// /// Optional frame for updating endpoint metadata at runtime. /// diff --git a/src/Router/__Libraries/StellaOps.Router.Common/Models/EndpointsUpdatePayload.cs b/src/Router/__Libraries/StellaOps.Router.Common/Models/EndpointsUpdatePayload.cs new file mode 100644 index 000000000..803170ccc --- /dev/null +++ b/src/Router/__Libraries/StellaOps.Router.Common/Models/EndpointsUpdatePayload.cs @@ -0,0 +1,23 @@ +namespace StellaOps.Router.Common.Models; + +/// +/// Payload for the EndpointsUpdate frame sent by a microservice after the router requests a metadata replay. +/// +public sealed record EndpointsUpdatePayload +{ + /// + /// Gets the endpoints registered by this instance. + /// + public required IReadOnlyList Endpoints { get; init; } + + /// + /// Gets the schema definitions for request/response validation. + /// Keys are schema IDs referenced by EndpointDescriptor.SchemaInfo. + /// + public IReadOnlyDictionary Schemas { get; init; } = new Dictionary(); + + /// + /// Gets the OpenAPI metadata for this service. + /// + public ServiceOpenApiInfo? OpenApiInfo { get; init; } +} diff --git a/src/Router/__Libraries/StellaOps.Router.Common/Models/HeartbeatPayload.cs b/src/Router/__Libraries/StellaOps.Router.Common/Models/HeartbeatPayload.cs index 929325b33..6033d63d4 100644 --- a/src/Router/__Libraries/StellaOps.Router.Common/Models/HeartbeatPayload.cs +++ b/src/Router/__Libraries/StellaOps.Router.Common/Models/HeartbeatPayload.cs @@ -7,6 +7,11 @@ namespace StellaOps.Router.Common.Models; /// public sealed record HeartbeatPayload { + /// + /// Gets the instance descriptor. + /// + public InstanceDescriptor? Instance { get; init; } + /// /// Gets the instance ID. /// diff --git a/src/Router/__Libraries/StellaOps.Router.Common/Models/RegistrationResyncRequestPayload.cs b/src/Router/__Libraries/StellaOps.Router.Common/Models/RegistrationResyncRequestPayload.cs new file mode 100644 index 000000000..ad9783af6 --- /dev/null +++ b/src/Router/__Libraries/StellaOps.Router.Common/Models/RegistrationResyncRequestPayload.cs @@ -0,0 +1,12 @@ +namespace StellaOps.Router.Common.Models; + +/// +/// Payload for the ResyncRequest frame sent by the router when it needs a fresh endpoint metadata replay. +/// +public sealed record RegistrationResyncRequestPayload +{ + /// + /// Gets the reason for the replay request. + /// + public string Reason { get; init; } = "router-requested-resync"; +} diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs index 1d72a7e29..ac808d44a 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs @@ -51,6 +51,9 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr /// public event Func? OnCancelReceived; + /// + public event Func? OnResyncRequested; + /// /// Initializes a new instance of the class. /// @@ -100,9 +103,7 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr var helloPayload = new HelloPayload { Instance = instance, - Endpoints = endpoints, - Schemas = schemas ?? new Dictionary(), - OpenApiInfo = openApiInfo + Endpoints = [] }; var helloMessage = new RpcRequestMessage @@ -129,7 +130,7 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr else { _logger.LogDebug( - "Refreshed messaging registration for {ServiceName}/{Version} instance {InstanceId} on connection {ConnectionId}", + "Replayed slim messaging HELLO for {ServiceName}/{Version} instance {InstanceId} on connection {ConnectionId}", instance.ServiceName, instance.Version, instance.InstanceId, @@ -235,32 +236,20 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr { try { - _logger.LogWarning("[DIAG] Consumer loop: calling LeaseAsync (batch={BatchSize})", _options.BatchSize); - var sw = System.Diagnostics.Stopwatch.StartNew(); - var leases = await _serviceIncomingQueue.LeaseAsync( new LeaseRequest { BatchSize = _options.BatchSize }, cancellationToken); - sw.Stop(); consecutiveFailures = 0; if (leases.Count > 0) { - _logger.LogWarning("[DIAG] Consumer loop: leased {Count} messages in {ElapsedMs}ms, processing concurrently", - leases.Count, sw.ElapsedMilliseconds); - await Task.WhenAll(leases.Select(lease => ProcessLeaseWithGuardAsync(lease, HandleIncomingRequestAsync, cancellationToken))); - - _logger.LogWarning("[DIAG] Consumer loop: finished processing {Count} messages", leases.Count); } else { - _logger.LogWarning("[DIAG] Consumer loop: no messages, waiting for notification (lease took {ElapsedMs}ms)", - sw.ElapsedMilliseconds); await _serviceIncomingQueue.WaitForMessagesAsync(_options.HeartbeatInterval, cancellationToken); - _logger.LogWarning("[DIAG] Consumer loop: notification received or timeout, resuming"); } } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) @@ -299,6 +288,12 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr return; } + if (message.FrameType == FrameType.ResyncRequest) + { + await HandleResyncRequestAsync(message, cancellationToken); + return; + } + if (message.FrameType is not (FrameType.Request or FrameType.RequestStreamData)) { return; @@ -387,6 +382,27 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr } } + private async Task HandleResyncRequestAsync( + RpcResponseMessage message, + CancellationToken cancellationToken) + { + if (OnResyncRequested is null) + { + _logger.LogDebug( + "Ignoring router resync request for {ConnectionId}; no replay handler is registered", + message.ConnectionId); + return; + } + + var request = string.IsNullOrEmpty(message.PayloadBase64) + ? new RegistrationResyncRequestPayload() + : JsonSerializer.Deserialize( + Convert.FromBase64String(message.PayloadBase64), + _jsonOptions) ?? new RegistrationResyncRequestPayload(); + + await OnResyncRequested(request, cancellationToken); + } + /// public async Task SendRequestAsync( ConnectionState connection, @@ -553,6 +569,38 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr await _requestQueue.EnqueueAsync(message, cancellationToken: cancellationToken); } + /// + public async Task SendEndpointsUpdateAsync( + IReadOnlyList endpoints, + IReadOnlyDictionary? schemas, + ServiceOpenApiInfo? openApiInfo, + CancellationToken cancellationToken) + { + if (_requestQueue is null || _connectionId is null) + { + throw new InvalidOperationException("Not connected"); + } + + var payload = new EndpointsUpdatePayload + { + Endpoints = endpoints, + Schemas = schemas ?? new Dictionary(), + OpenApiInfo = openApiInfo + }; + + var message = new RpcRequestMessage + { + CorrelationId = Guid.NewGuid().ToString("N"), + ConnectionId = _connectionId, + TargetService = "gateway", + FrameType = FrameType.EndpointsUpdate, + PayloadBase64 = Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(payload, _jsonOptions)), + SenderInstanceId = _instance?.InstanceId + }; + + await _requestQueue.EnqueueAsync(message, cancellationToken: cancellationToken); + } + /// public async Task DisconnectAsync() { @@ -707,33 +755,29 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr CancellationToken cancellationToken) where TMessage : class { - var sw = System.Diagnostics.Stopwatch.StartNew(); try { using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); timeoutCts.CancelAfter(TimeSpan.FromSeconds(55)); - _logger.LogWarning("[DIAG] Guard: processing message {MessageId}", lease.MessageId); // WaitAsync abandons handlers that ignore CancellationToken (e.g., // StackExchange.Redis commands with their own internal timeout). // The handler continues in background but the consumer loop is unblocked. await handler(lease, timeoutCts.Token).WaitAsync(timeoutCts.Token); - sw.Stop(); - _logger.LogWarning("[DIAG] Guard: message {MessageId} processed in {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds); await lease.AcknowledgeAsync(cancellationToken); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { - _logger.LogWarning("[DIAG] Guard: message {MessageId} cancelled (shutdown) after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds); + // Graceful shutdown. } catch (OperationCanceledException) { - _logger.LogWarning("Guard: message {MessageId} TIMED OUT after {ElapsedMs}ms (55s limit)", lease.MessageId, sw.ElapsedMilliseconds); + _logger.LogWarning("Guard: message {MessageId} timed out after 55s", lease.MessageId); await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); } catch (Exception ex) { - _logger.LogError(ex, "Guard: message {MessageId} FAILED after {ElapsedMs}ms", lease.MessageId, sw.ElapsedMilliseconds); + _logger.LogError(ex, "Guard: message {MessageId} failed", lease.MessageId); await TryReleaseAsync(lease, ReleaseDisposition.Retry, cancellationToken); } } diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs index 2f584b14a..aee72ceb6 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportServer.cs @@ -44,6 +44,11 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable /// public event Func? OnHeartbeatReceived; + /// + /// Event raised when a microservice replays endpoint metadata after a router resync request. + /// + public event Func? OnEndpointsUpdated; + /// /// Event raised when a RESPONSE frame is received. /// @@ -229,6 +234,10 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable await HandleHeartbeatMessageAsync(message, cancellationToken); break; + case FrameType.EndpointsUpdate: + await HandleEndpointsUpdateMessageAsync(message, cancellationToken); + break; + case FrameType.Response: case FrameType.ResponseStreamData: // Response from microservice to gateway - route to pending request @@ -302,17 +311,37 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable private async Task HandleHeartbeatMessageAsync(RpcRequestMessage message, CancellationToken cancellationToken) { + var payload = JsonSerializer.Deserialize( + Convert.FromBase64String(message.PayloadBase64), _jsonOptions); + if (!_connections.TryGetValue(message.ConnectionId, out var state)) { - _logger.LogWarning("Heartbeat from unknown connection {ConnectionId}", message.ConnectionId); - return; + if (payload?.Instance is null) + { + _logger.LogWarning("Heartbeat from unknown connection {ConnectionId}", message.ConnectionId); + return; + } + + state = new ConnectionState + { + ConnectionId = message.ConnectionId, + Instance = payload.Instance, + Status = payload.Status, + LastHeartbeatUtc = DateTime.UtcNow, + TransportType = TransportType.Messaging + }; + + _connections[message.ConnectionId] = state; + + _logger.LogInformation( + "Heartbeat discovered unknown messaging connection {ConnectionId} for {ServiceName}/{Version}; awaiting endpoint replay", + message.ConnectionId, + state.Instance.ServiceName, + state.Instance.Version); } state.LastHeartbeatUtc = DateTime.UtcNow; - var payload = JsonSerializer.Deserialize( - Convert.FromBase64String(message.PayloadBase64), _jsonOptions); - if (payload is not null) { state.Status = payload.Status; @@ -326,6 +355,58 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable } } + private async Task HandleEndpointsUpdateMessageAsync( + RpcRequestMessage message, + CancellationToken cancellationToken) + { + if (!_connections.TryGetValue(message.ConnectionId, out var existing)) + { + _logger.LogWarning( + "Endpoint metadata replay arrived for unknown connection {ConnectionId}", + message.ConnectionId); + return; + } + + var payload = JsonSerializer.Deserialize( + Convert.FromBase64String(message.PayloadBase64), + _jsonOptions); + + if (payload is null) + { + _logger.LogWarning("Invalid EndpointsUpdate payload from {ConnectionId}", message.ConnectionId); + return; + } + + var state = new ConnectionState + { + ConnectionId = existing.ConnectionId, + Instance = existing.Instance, + Status = existing.Status, + LastHeartbeatUtc = DateTime.UtcNow, + TransportType = existing.TransportType, + Schemas = payload.Schemas, + OpenApiInfo = payload.OpenApiInfo + }; + + foreach (var endpoint in payload.Endpoints) + { + state.Endpoints[(endpoint.Method, endpoint.Path)] = endpoint; + } + + _connections[message.ConnectionId] = state; + + _logger.LogInformation( + "Endpoint metadata replay received from {ServiceName}/{Version} instance {InstanceId} via messaging", + state.Instance.ServiceName, + state.Instance.Version, + state.Instance.InstanceId); + + if (OnEndpointsUpdated is not null) + { + await OnEndpointsUpdated(state, payload); + } + } + /// /// Sends a request frame to a microservice via messaging. /// diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md index e7457b350..752fcebc6 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md @@ -5,6 +5,6 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol | Task ID | Status | Notes | | --- | --- | --- | | RTR-MSG-002 | DONE | `docs-archived/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - periodic HELLO re-registration now reuses the existing messaging connection/receive loop instead of minting duplicate logical connections. | -| RVM-06 | DONE | Updated messaging microservice HELLO payload to include schemas/OpenAPI metadata via the new schema-aware `IMicroserviceTransport.ConnectAsync(...)` overload. | +| RVM-06 | DONE | Messaging HELLO now stays slim and identity-only; the router requests full endpoint/schema/OpenAPI replay explicitly via `ResyncRequest` and `EndpointsUpdate` frames when startup or administration needs metadata. | | REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/StellaOps.Router.Transport.Messaging.md. | | REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. | diff --git a/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Integration/MessagingTransportIntegrationTests.cs b/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Integration/MessagingTransportIntegrationTests.cs index 62c92b854..a3ab4e146 100644 --- a/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Integration/MessagingTransportIntegrationTests.cs +++ b/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Integration/MessagingTransportIntegrationTests.cs @@ -60,11 +60,17 @@ public sealed class MessagingTransportIntegrationTests tcpServer, tlsServer, messagingServer); + var resyncService = new GatewayRegistrationResyncService( + routingState.Object, + transportClient, + TimeProvider.System, + NullLogger.Instance); // Act & Assert - construction should succeed with messaging server var hostedService = new GatewayHostedService( routingState.Object, transportClient, + resyncService, claimsStore.Object, gatewayOptions, new GatewayServiceStatus(), @@ -96,11 +102,17 @@ public sealed class MessagingTransportIntegrationTests tcpServer, tlsServer, messagingServer: null); + var resyncService = new GatewayRegistrationResyncService( + routingState.Object, + transportClient, + TimeProvider.System, + NullLogger.Instance); // Act & Assert - construction should succeed without messaging server var hostedService = new GatewayHostedService( routingState.Object, transportClient, + resyncService, claimsStore.Object, gatewayOptions, new GatewayServiceStatus(), diff --git a/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Services/GatewayRegistrationResyncServiceTests.cs b/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Services/GatewayRegistrationResyncServiceTests.cs new file mode 100644 index 000000000..382e6636b --- /dev/null +++ b/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Services/GatewayRegistrationResyncServiceTests.cs @@ -0,0 +1,124 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using StellaOps.Gateway.WebService.Services; +using StellaOps.Router.Common.Abstractions; +using StellaOps.Router.Common.Enums; +using StellaOps.Router.Common.Models; + +namespace StellaOps.Gateway.WebService.Tests.Services; + +public sealed class GatewayRegistrationResyncServiceTests +{ + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + [Fact] + public async Task RequestResyncAsync_TargetConnection_SendsResyncFrame() + { + var connection = CreateConnectionState("conn-1"); + var routingState = new Mock(); + routingState.Setup(state => state.GetAllConnections()).Returns([connection]); + + var transportClient = new Mock(); + Frame? capturedFrame = null; + transportClient + .Setup(client => client.SendFrameAsync(connection, It.IsAny(), It.IsAny())) + .Callback((_, frame, _) => capturedFrame = frame) + .Returns(Task.CompletedTask); + + var service = new GatewayRegistrationResyncService( + routingState.Object, + transportClient.Object, + TimeProvider.System, + NullLogger.Instance); + + var result = await service.RequestResyncAsync("conn-1", "unit-test", force: true, CancellationToken.None); + + result.MatchedCount.Should().Be(1); + result.RequestedCount.Should().Be(1); + capturedFrame.Should().NotBeNull(); + capturedFrame!.Type.Should().Be(FrameType.ResyncRequest); + + var payload = JsonSerializer.Deserialize(capturedFrame.Payload.Span, JsonOptions); + payload.Should().NotBeNull(); + payload!.Reason.Should().Be("unit-test"); + } + + [Fact] + public async Task RequestResyncAsync_UsesCooldownUnlessForced() + { + var connection = CreateConnectionState("conn-1"); + var routingState = new Mock(); + routingState.Setup(state => state.GetAllConnections()).Returns([connection]); + + var transportClient = new Mock(); + transportClient + .Setup(client => client.SendFrameAsync(connection, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var service = new GatewayRegistrationResyncService( + routingState.Object, + transportClient.Object, + TimeProvider.System, + NullLogger.Instance); + + var first = await service.RequestResyncAsync("conn-1", "first", force: false, CancellationToken.None); + var second = await service.RequestResyncAsync("conn-1", "second", force: false, CancellationToken.None); + var third = await service.RequestResyncAsync("conn-1", "forced", force: true, CancellationToken.None); + + first.RequestedCount.Should().Be(1); + second.RequestedCount.Should().Be(0); + third.RequestedCount.Should().Be(1); + + transportClient.Verify( + client => client.SendFrameAsync(connection, It.IsAny(), It.IsAny()), + Times.Exactly(2)); + } + + [Fact] + public async Task ClearPending_AllowsImmediateRetry() + { + var connection = CreateConnectionState("conn-1"); + var routingState = new Mock(); + routingState.Setup(state => state.GetAllConnections()).Returns([connection]); + + var transportClient = new Mock(); + transportClient + .Setup(client => client.SendFrameAsync(connection, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var service = new GatewayRegistrationResyncService( + routingState.Object, + transportClient.Object, + TimeProvider.System, + NullLogger.Instance); + + await service.RequestResyncAsync("conn-1", "first", force: false, CancellationToken.None); + service.ClearPending("conn-1"); + var result = await service.RequestResyncAsync("conn-1", "second", force: false, CancellationToken.None); + + result.RequestedCount.Should().Be(1); + transportClient.Verify( + client => client.SendFrameAsync(connection, It.IsAny(), It.IsAny()), + Times.Exactly(2)); + } + + private static ConnectionState CreateConnectionState(string connectionId) + { + return new ConnectionState + { + ConnectionId = connectionId, + Instance = new InstanceDescriptor + { + InstanceId = "svc-1", + ServiceName = "timelineindexer", + Version = "1.0.0", + Region = "local" + }, + TransportType = TransportType.Messaging + }; + } +} diff --git a/src/Router/__Tests/StellaOps.Microservice.Tests/RouterConnectionManagerTests.cs b/src/Router/__Tests/StellaOps.Microservice.Tests/RouterConnectionManagerTests.cs index 1c9e407d5..b5085236b 100644 --- a/src/Router/__Tests/StellaOps.Microservice.Tests/RouterConnectionManagerTests.cs +++ b/src/Router/__Tests/StellaOps.Microservice.Tests/RouterConnectionManagerTests.cs @@ -388,7 +388,7 @@ public sealed class RouterConnectionManagerTests : IDisposable [Trait("Category", TestCategories.Unit)] [Fact] - public async Task StartAsync_ReplaysHelloWithinRegistrationRefreshInterval() + public async Task StartAsync_DoesNotReplayHelloWithinRegistrationRefreshInterval() { // Arrange _options.Routers.Add(new RouterEndpointConfig @@ -398,7 +398,6 @@ public sealed class RouterConnectionManagerTests : IDisposable TransportType = TransportType.InMemory }); - var registrationReplayObserved = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var connectCount = 0; _transportMock .Setup(t => t.ConnectAsync( @@ -409,10 +408,7 @@ public sealed class RouterConnectionManagerTests : IDisposable It.IsAny())) .Callback(() => { - if (Interlocked.Increment(ref connectCount) >= 2) - { - registrationReplayObserved.TrySetResult(); - } + Interlocked.Increment(ref connectCount); }) .Returns(Task.CompletedTask); @@ -420,11 +416,54 @@ public sealed class RouterConnectionManagerTests : IDisposable // Act await manager.StartAsync(CancellationToken.None); - await registrationReplayObserved.Task.WaitAsync(TimeSpan.FromSeconds(2)); + await Task.Delay(150); await manager.StopAsync(CancellationToken.None); // Assert - connectCount.Should().BeGreaterThanOrEqualTo(2); + connectCount.Should().Be(1); + } + + [Trait("Category", TestCategories.Unit)] + [Fact] + public async Task ResyncRequest_ReplaysEndpointMetadataWithoutReconnect() + { + // Arrange + _options.Routers.Add(new RouterEndpointConfig + { + Host = "localhost", + Port = 5000, + TransportType = TransportType.InMemory + }); + + var endpoints = new List + { + new() { ServiceName = "test-service", Version = "1.0.0", Method = "GET", Path = "/api/users" } + }; + + _discoveryProviderMock.Setup(d => d.DiscoverEndpoints()).Returns(endpoints); + + var transport = new RecordingMicroserviceTransport(); + using var manager = new RouterConnectionManager( + Options.Create(_options), + _discoveryProviderMock.Object, + _requestDispatcherMock.Object, + transport, + NullLogger.Instance); + + // Act + await manager.StartAsync(CancellationToken.None); + await transport.RaiseResyncRequestedAsync(new RegistrationResyncRequestPayload + { + Reason = "unit-test" + }); + await manager.StopAsync(CancellationToken.None); + + // Assert + transport.ConnectCount.Should().Be(1); + transport.EndpointUpdateCount.Should().Be(1); + transport.LastEndpointUpdate.Should().NotBeNull(); + transport.LastEndpointUpdate!.Endpoints.Should().ContainSingle(); + transport.LastEndpointUpdate.Endpoints[0].Path.Should().Be("/api/users"); } #endregion @@ -471,4 +510,69 @@ public sealed class RouterConnectionManagerTests : IDisposable public IReadOnlyDictionary DiscoverSchemaDefinitions() => _schemas; } + + private sealed class RecordingMicroserviceTransport : IMicroserviceTransport + { + public int ConnectCount { get; private set; } + + public int EndpointUpdateCount { get; private set; } + + public EndpointsUpdatePayload? LastEndpointUpdate { get; private set; } + + public event Func>? OnRequestReceived; + + public event Func? OnCancelReceived; + + public event Func? OnResyncRequested; + + public event Action? TransportDied; + + public Task ConnectAsync( + InstanceDescriptor instance, + IReadOnlyList endpoints, + CancellationToken cancellationToken) + { + ConnectCount++; + return Task.CompletedTask; + } + + public Task ConnectAsync( + InstanceDescriptor instance, + IReadOnlyList endpoints, + IReadOnlyDictionary? schemas, + ServiceOpenApiInfo? openApiInfo, + CancellationToken cancellationToken) + { + ConnectCount++; + return Task.CompletedTask; + } + + public Task DisconnectAsync() => Task.CompletedTask; + + public Task SendHeartbeatAsync(HeartbeatPayload heartbeat, CancellationToken cancellationToken) => Task.CompletedTask; + + public Task SendEndpointsUpdateAsync( + IReadOnlyList endpoints, + IReadOnlyDictionary? schemas, + ServiceOpenApiInfo? openApiInfo, + CancellationToken cancellationToken) + { + EndpointUpdateCount++; + LastEndpointUpdate = new EndpointsUpdatePayload + { + Endpoints = endpoints, + Schemas = schemas ?? new Dictionary(), + OpenApiInfo = openApiInfo + }; + + return Task.CompletedTask; + } + + public Task RaiseResyncRequestedAsync(RegistrationResyncRequestPayload request) + { + return OnResyncRequested is null + ? Task.CompletedTask + : OnResyncRequested.Invoke(request, CancellationToken.None); + } + } } diff --git a/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs b/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs index e102573cf..6a703c79f 100644 --- a/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs +++ b/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs @@ -89,7 +89,7 @@ public sealed class MessagingTransportQueueOptionsTests [Trait("Category", TestCategories.Unit)] [Fact] - public async Task MessagingTransportClient_ConnectAsync_IncludesSchemasAndOpenApiInfoInHelloPayload() + public async Task MessagingTransportClient_ConnectAsync_SendsSlimHelloPayload() { var options = Options.Create(new MessagingTransportOptions { @@ -152,7 +152,82 @@ public sealed class MessagingTransportQueueOptionsTests new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }); payload.Should().NotBeNull(); - payload!.Schemas.Should().ContainKey(schemaId); + payload!.Instance.ServiceName.Should().Be("timelineindexer"); + payload.Endpoints.Should().BeEmpty(); + payload.Schemas.Should().BeEmpty(); + payload.OpenApiInfo.Should().BeNull(); + } + + [Trait("Category", TestCategories.Unit)] + [Fact] + public async Task MessagingTransportClient_SendEndpointsUpdateAsync_IncludesSchemasAndOpenApiInfoInUpdatePayload() + { + var options = Options.Create(new MessagingTransportOptions + { + ConsumerGroup = "timelineindexer-test", + BatchSize = 1 + }); + + var queueFactory = new RecordingQueueFactory(); + var client = new MessagingTransportClient( + queueFactory, + options, + NullLogger.Instance); + + var instance = new InstanceDescriptor + { + InstanceId = "timelineindexer-1", + ServiceName = "timelineindexer", + Version = "1.0.0", + Region = "local" + }; + + var endpoints = + new[] + { + new EndpointDescriptor + { + ServiceName = "timelineindexer", + Version = "1.0.0", + Method = "GET", + Path = "/api/v1/timeline" + } + }; + + var schemaId = "TimelineEvent"; + var schemas = new Dictionary + { + [schemaId] = new SchemaDefinition + { + SchemaId = schemaId, + SchemaJson = "{\"type\":\"object\"}", + ETag = "abc123" + } + }; + + await client.ConnectAsync(instance, endpoints, CancellationToken.None); + await client.SendEndpointsUpdateAsync( + endpoints, + schemas, + new ServiceOpenApiInfo + { + Title = "timelineindexer", + Description = "Timeline service" + }, + CancellationToken.None); + await client.DisconnectAsync(); + + var updateMessage = queueFactory.EnqueuedMessages + .OfType() + .First(message => message.FrameType == Common.Enums.FrameType.EndpointsUpdate); + + var payload = JsonSerializer.Deserialize( + Convert.FromBase64String(updateMessage.PayloadBase64), + new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }); + + payload.Should().NotBeNull(); + payload!.Endpoints.Should().ContainSingle(); + payload.Schemas.Should().ContainKey(schemaId); payload.Schemas[schemaId].SchemaJson.Should().Be("{\"type\":\"object\"}"); payload.OpenApiInfo.Should().NotBeNull(); payload.OpenApiInfo!.Title.Should().Be("timelineindexer");