From 2ff0e1f86b84451f35605122b6bd5fe4020cdf94 Mon Sep 17 00:00:00 2001 From: master <> Date: Sat, 7 Mar 2026 03:48:46 +0200 Subject: [PATCH] Fix router messaging re-registration stability --- ...ions_messaging_reregistration_stability.md | 91 +++++++ docs/modules/router/architecture.md | 2 + .../State/InMemoryRoutingState.cs | 107 +++++--- .../MessagingTransportClient.cs | 243 ++++++++++++------ .../TASKS.md | 1 + .../MessagingTransportQueueOptionsTests.cs | 50 ++++ .../StellaOps.Router.Common.Tests/TASKS.md | 1 + .../Routing/InMemoryRoutingStateTests.cs | 86 +++++++ .../StellaOps.Router.Gateway.Tests/TASKS.md | 1 + 9 files changed, 466 insertions(+), 116 deletions(-) create mode 100644 docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md create mode 100644 src/Router/__Tests/StellaOps.Router.Gateway.Tests/Routing/InMemoryRoutingStateTests.cs diff --git a/docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md b/docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md new file mode 100644 index 000000000..90dac94c0 --- /dev/null +++ b/docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md @@ -0,0 +1,91 @@ +# Sprint 20260307-008 - Router Integrations Messaging Re-registration Stability + +## Topic & Scope +- Eliminate duplicate Router messaging registrations that destabilize authenticated `https://stella-ops.local/api/v1/integrations*` traffic after repeated HELLO re-registration. +- Fix the defect at the shared Router layer so re-registration refreshes an existing service connection instead of accumulating stale gateway routing entries. +- Validate the repaired behavior with focused Router tests and live Playwright verification of Setup Integrations routes on `https://stella-ops.local`. +- Working directory: `src/Router`. +- Expected evidence: targeted Router unit tests, scoped service rebuild/restart, live Playwright route/action verification, and sprint execution log updates. + +## Dependencies & Concurrency +- Upstream repro/evidence is tracked in `docs/implplan/SPRINT_20260306_003_FE_playwright_setup_reset_iteration_loop.md`. +- Safe parallelism: stay inside `src/Router` plus sprint/task-board updates; do not edit unrelated Web search files or Integrations persistence files owned by other active work. +- Runtime dependency: `stellaops-router-gateway` and `stellaops-integrations-web` must be rebuildable independently from the rest of the compose stack. + +## Documentation Prerequisites +- `docs/07_HIGH_LEVEL_ARCHITECTURE.md` +- `docs/modules/platform/architecture-overview.md` +- `docs/modules/router/README.md` +- `docs/modules/router/architecture.md` +- `docs/modules/router/openapi-aggregation.md` +- `docs/modules/router/schema-validation.md` +- `src/Router/AGENTS.md` +- `src/Router/__Libraries/StellaOps.Router.Gateway/AGENTS.md` +- `src/Router/StellaOps.Gateway.WebService/AGENTS.md` + +## Delivery Tracker + +### RTR-MSG-001 - Reproduce and explain duplicate messaging registrations +Status: DONE +Dependency: none +Owners: QA, Developer +Task description: +- Use live authenticated Playwright plus Router/gateway logs to explain why Setup Integrations requests can oscillate between working and stalling. +- Capture the repeated registration pattern and map it back to the Router client/server code path responsible for HELLO re-registration. + +Completion criteria: +- [x] Live evidence shows the affected `stella-ops.local` integrations route path and the corresponding Router/gateway behavior. +- [x] The repeated registration pattern is tied to specific Router source files and not left as a generic timing issue. +- [x] The scope boundary with the Web sprint is documented. + +### RTR-MSG-002 - Stop duplicate Router registrations at the source and gateway state +Status: DONE +Dependency: RTR-MSG-001 +Owners: Developer +Task description: +- Fix the shared Router messaging transport so `ConnectAsync(...)` re-registration does not spawn duplicate queues/receive loops or a fresh logical connection when the transport is already healthy. +- Harden gateway routing state so a reconnecting instance replaces stale registrations for the same service instance instead of accumulating them. + +Completion criteria: +- [x] Re-registration reuses or replaces the logical connection deterministically instead of accumulating duplicates. +- [x] Gateway routing state no longer retains stale connections for the same service instance after re-registration. +- [x] The fix stays offline-safe and deterministic. + +### RTR-MSG-003 - Rebuild targeted services and replay live integrations QA +Status: DONE +Dependency: RTR-MSG-002 +Owners: QA, Developer +Task description: +- Rebuild only the Router gateway and the live Integrations service components affected by the shared messaging transport fix. +- Replay the live Setup Integrations pages and actions with authenticated Playwright, including repeated requests, list/detail rendering, and onboarding navigation. + +Completion criteria: +- [x] Targeted Router tests pass against focused test runners for the affected classes. +- [x] `stellaops-router-gateway` and `stellaops-integrations-web` are restarted with the patched Router code. +- [x] Live Playwright confirms the integrations routes/actions stay functional without fallback timeout UI being the only thing keeping the page usable. + +## Execution Log +| Date (UTC) | Update | Owner | +| --- | --- | --- | +| 2026-03-07 | Sprint created after live Playwright on `https://stella-ops.local/setup/integrations/*` and Router log review exposed a stateful messaging defect outside Web scope. Direct Integrations HTTP calls were healthy, but Router logs showed the same `integrations` service instance repeatedly registering new messaging connection IDs over time. | QA | +| 2026-03-07 | Root cause confirmed in `RouterConnectionManager` periodic HELLO re-registration plus `MessagingTransportClient.ConnectAsync(...)`, which recreated queues, receive loops, and logical connection IDs on every refresh. | Developer | +| 2026-03-07 | Patched the shared Router transport so healthy HELLO re-registration reuses the existing logical connection, and hardened `InMemoryRoutingState` to replace stale same-instance registrations before rebuilding endpoint indexes. | Developer | +| 2026-03-07 | Added targeted xUnit v3 regressions for messaging re-registration reuse and gateway routing-state dedupe, then executed those classes directly through the test assembly runners because Microsoft Testing Platform ignored `dotnet test --filter` for these projects. | QA | +| 2026-03-07 | Rebuilt `stellaops/router-gateway:dev` and `stellaops/integrations-web:dev`, then force-recreated only `router-gateway` and `integrations-web` inside the live compose stack to replay the defect on patched runtime images. | Developer | +| 2026-03-07 | Live authenticated Playwright on `/setup/integrations`, `/setup/integrations/secrets`, `/setup/integrations/int-1`, and `/setup/integrations/onboarding/host` confirmed the SPA's own `/api/v1/integrations*` calls were returning `200` for list views, the missing-detail route rendered the intended explicit unavailable state with working back-navigation, and the host-provider selection advanced into the authentication step without stalling. | QA | +| 2026-03-07 | Timestamped Router logs showed the rebuilt `integrations-0eabab6a4e63421c9aa943f` instance re-HELLO at `2026-03-07T01:41:33Z` with the same logical connection id `a4627760b78c48228e62007d925df22a` first registered at `2026-03-07T01:36:41Z`, confirming the duplicate-registration defect is fixed for rebuilt clients. | QA | + +## Decisions & Risks +- Decision: this sprint stays inside `src/Router` plus required sprint/task-board updates only. +- Decision: the permanent fix must cover both sides of the behavior: the microservice transport must stop creating duplicate logical connections, and gateway routing state must fail safe when an older client re-registers anyway. +- Decision: targeted Router evidence uses the xUnit v3 test assembly executables (`*.Tests.exe -class ...`) because these projects run on Microsoft Testing Platform and ignore `dotnet test --filter`, which would otherwise hide the new regressions inside a full-suite pass count. +- Decision: raw unauthenticated probe calls to `/api/v1/integrations*` are not accepted as UI evidence for this sprint because the SPA attaches authenticated context differently than a naked fetch; live validation is based on Playwright-observed browser traffic plus Router logs. +- Risk: `docs/modules/gateway/architecture.md` and `docs/modules/gateway/openapi.md` are referenced by module charters but do not exist at the expected paths in this repo snapshot. +- Mitigation: follow the available canonical Router dossiers (`docs/modules/router/**`) and record the missing gateway doc paths here instead of inventing replacements during the bug-fix iteration. +- Risk: other long-running services in the current compose stack may also be using the same older Router transport behavior. +- Mitigation: harden gateway-side dedupe so the live stack benefits immediately after the targeted gateway rebuild even before every service image is refreshed. +- Risk: the shared client-side transport fix is live only in the rebuilt images from this sprint, so other services that still run older images can continue to mint fresh connection ids until later rollout iterations rebuild them. + +## Next Checkpoints +- 2026-03-07: rebuild the next scoped batch of messaging-client services so the shared transport fix rolls beyond the integrations path without attempting a full compose rebuild. +- 2026-03-07: continue Playwright-first page/action sweeps to surface the next live defect once the router registration churn for rebuilt services is confirmed clean. diff --git a/docs/modules/router/architecture.md b/docs/modules/router/architecture.md index 5df566e4b..7d620bab0 100644 --- a/docs/modules/router/architecture.md +++ b/docs/modules/router/architecture.md @@ -533,6 +533,8 @@ Gateway tracks: - Marks stale instances as Unhealthy - Uses health in routing decisions +Periodic HELLO re-registration is valid so a microservice can repopulate gateway state after a gateway restart, but it must refresh the existing logical transport connection instead of minting a second one. Gateway routing state also deduplicates by service instance identity (`ServiceName`, `Version`, `InstanceId`, transport) before re-indexing endpoints so repeated HELLO frames cannot accumulate stale route candidates. + --- ## Configuration diff --git a/src/Router/__Libraries/StellaOps.Router.Gateway/State/InMemoryRoutingState.cs b/src/Router/__Libraries/StellaOps.Router.Gateway/State/InMemoryRoutingState.cs index e0a5018fb..3ca294d6b 100644 --- a/src/Router/__Libraries/StellaOps.Router.Gateway/State/InMemoryRoutingState.cs +++ b/src/Router/__Libraries/StellaOps.Router.Gateway/State/InMemoryRoutingState.cs @@ -19,49 +19,45 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState /// public void AddConnection(ConnectionState connection) { - _connections[connection.ConnectionId] = connection; - - // Index all endpoints - foreach (var endpoint in connection.Endpoints.Values) + lock (_indexLock) { - var key = (endpoint.Method, endpoint.Path); + RemoveConnectionInternal(connection.ConnectionId); - // Add to endpoint index - var connectionIds = _endpointIndex.GetOrAdd(key, _ => []); - connectionIds.Add(connection.ConnectionId); + var staleConnectionIds = _connections.Values + .Where(existing => + existing.ConnectionId != connection.ConnectionId && + IsSameServiceInstance(existing, connection)) + .Select(existing => existing.ConnectionId) + .ToList(); - // Create path matcher if not exists - _pathMatchers.GetOrAdd(key, _ => new PathMatcher(endpoint.Path)); + foreach (var staleConnectionId in staleConnectionIds) + { + RemoveConnectionInternal(staleConnectionId); + } + + _connections[connection.ConnectionId] = connection; + + // Index all endpoints + foreach (var endpoint in connection.Endpoints.Values) + { + var key = (endpoint.Method, endpoint.Path); + + // Add to endpoint index + var connectionIds = _endpointIndex.GetOrAdd(key, _ => []); + connectionIds.Add(connection.ConnectionId); + + // Create path matcher if not exists + _pathMatchers.GetOrAdd(key, _ => new PathMatcher(endpoint.Path)); + } } } /// public void RemoveConnection(string connectionId) { - if (_connections.TryRemove(connectionId, out var connection)) + lock (_indexLock) { - // Remove from endpoint index - foreach (var endpoint in connection.Endpoints.Values) - { - var key = (endpoint.Method, endpoint.Path); - if (_endpointIndex.TryGetValue(key, out var connectionIds)) - { - // ConcurrentBag doesn't support removal, so we need to rebuild - lock (_indexLock) - { - var remaining = connectionIds.Where(id => id != connectionId).ToList(); - if (remaining.Count == 0) - { - _endpointIndex.TryRemove(key, out _); - _pathMatchers.TryRemove(key, out _); - } - else - { - _endpointIndex[key] = new ConcurrentBag(remaining); - } - } - } - } + RemoveConnectionInternal(connectionId); } } @@ -100,7 +96,7 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState // Get first connection with this endpoint if (_endpointIndex.TryGetValue((m, p), out var connectionIds)) { - foreach (var connectionId in connectionIds) + foreach (var connectionId in connectionIds.Distinct(StringComparer.Ordinal)) { if (_connections.TryGetValue(connectionId, out var conn) && conn.Endpoints.TryGetValue((m, p), out var endpoint)) @@ -135,7 +131,7 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState if (!_endpointIndex.TryGetValue((m, p), out var connectionIds)) continue; - foreach (var connectionId in connectionIds) + foreach (var connectionId in connectionIds.Distinct(StringComparer.Ordinal)) { if (!_connections.TryGetValue(connectionId, out var conn)) continue; @@ -157,4 +153,45 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState return result; } + + private void RemoveConnectionInternal(string connectionId) + { + if (!_connections.TryRemove(connectionId, out var connection)) + { + return; + } + + foreach (var endpoint in connection.Endpoints.Values) + { + var key = (endpoint.Method, endpoint.Path); + if (!_endpointIndex.TryGetValue(key, out var connectionIds)) + { + continue; + } + + var remaining = connectionIds + .Where(id => !string.Equals(id, connectionId, StringComparison.Ordinal)) + .Distinct(StringComparer.Ordinal) + .ToList(); + + if (remaining.Count == 0) + { + _endpointIndex.TryRemove(key, out _); + _pathMatchers.TryRemove(key, out _); + } + else + { + _endpointIndex[key] = new ConcurrentBag(remaining); + } + } + } + + private static bool IsSameServiceInstance(ConnectionState existing, ConnectionState candidate) + { + return existing.TransportType == candidate.TransportType && + string.Equals(existing.Instance.ServiceName, candidate.Instance.ServiceName, StringComparison.OrdinalIgnoreCase) && + string.Equals(existing.Instance.Version, candidate.Instance.Version, StringComparison.Ordinal) && + string.Equals(existing.Instance.InstanceId, candidate.Instance.InstanceId, StringComparison.Ordinal) && + string.Equals(existing.Instance.Region, candidate.Instance.Region, StringComparison.OrdinalIgnoreCase); + } } diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs index 638f6443e..704bfe13b 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/MessagingTransportClient.cs @@ -27,7 +27,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr private readonly CorrelationTracker _correlationTracker; private readonly JsonSerializerOptions _jsonOptions; private readonly ConcurrentDictionary _inflightHandlers = new(); - private readonly CancellationTokenSource _clientCts = new(); + private readonly SemaphoreSlim _lifecycleGate = new(1, 1); + private CancellationTokenSource _connectionCts = new(); private IMessageQueue? _requestQueue; private IMessageQueue? _responseQueue; private IMessageQueue? _serviceIncomingQueue; @@ -87,64 +88,57 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr { ObjectDisposedException.ThrowIf(_disposed, this); - _connectionId = Guid.NewGuid().ToString("N"); - _instance = instance; + await _lifecycleGate.WaitAsync(cancellationToken); - // Create request queue (for sending to gateway) - _requestQueue = _queueFactory.Create(new MessageQueueOptions + try { - QueueName = _options.GetGatewayControlQueueName(), - ConsumerGroup = _options.ConsumerGroup, - ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}" - }); + _instance = instance; + var openedFreshConnection = EnsureConnectionInfrastructure(instance); - // Create response queue (for receiving gateway responses) - _responseQueue = _queueFactory.Create(new MessageQueueOptions + // Send HELLO frame + var helloPayload = new HelloPayload + { + Instance = instance, + Endpoints = endpoints, + Schemas = schemas ?? new Dictionary(), + OpenApiInfo = openApiInfo + }; + + var helloMessage = new RpcRequestMessage + { + CorrelationId = Guid.NewGuid().ToString("N"), + ConnectionId = _connectionId!, + TargetService = "gateway", + FrameType = FrameType.Hello, + PayloadBase64 = Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(helloPayload, _jsonOptions)), + SenderInstanceId = instance.InstanceId + }; + + await _requestQueue!.EnqueueAsync(helloMessage, cancellationToken: cancellationToken); + + if (openedFreshConnection) + { + _logger.LogInformation( + "Connected as {ServiceName}/{Version} instance {InstanceId} with {EndpointCount} endpoints via messaging", + instance.ServiceName, + instance.Version, + instance.InstanceId, + endpoints.Count); + } + else + { + _logger.LogDebug( + "Refreshed messaging registration for {ServiceName}/{Version} instance {InstanceId} on connection {ConnectionId}", + instance.ServiceName, + instance.Version, + instance.InstanceId, + _connectionId); + } + } + finally { - QueueName = _options.ResponseQueueName, - ConsumerGroup = _options.ConsumerGroup, - ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}" - }); - - // Create service-specific incoming queue (for receiving requests from gateway) - _serviceIncomingQueue = _queueFactory.Create(new MessageQueueOptions - { - QueueName = _options.GetRequestQueueName(instance.ServiceName), - ConsumerGroup = _options.ConsumerGroup, - ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}", - DefaultLeaseDuration = _options.LeaseDuration - }); - - // Send HELLO frame - var helloPayload = new HelloPayload - { - Instance = instance, - Endpoints = endpoints, - Schemas = schemas ?? new Dictionary(), - OpenApiInfo = openApiInfo - }; - - var helloMessage = new RpcRequestMessage - { - CorrelationId = Guid.NewGuid().ToString("N"), - ConnectionId = _connectionId, - TargetService = "gateway", - FrameType = FrameType.Hello, - PayloadBase64 = Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(helloPayload, _jsonOptions)), - SenderInstanceId = instance.InstanceId - }; - - await _requestQueue.EnqueueAsync(helloMessage, cancellationToken: cancellationToken); - - _logger.LogInformation( - "Connected as {ServiceName}/{Version} instance {InstanceId} with {EndpointCount} endpoints via messaging", - instance.ServiceName, - instance.Version, - instance.InstanceId, - endpoints.Count); - - // Start receiving responses and requests - _receiveTask = Task.Run(() => ReceiveLoopAsync(_clientCts.Token), CancellationToken.None); + _lifecycleGate.Release(); + } } private async Task ReceiveLoopAsync(CancellationToken cancellationToken) @@ -569,28 +563,40 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr /// public async Task DisconnectAsync() { - if (_connectionId is null) return; + await _lifecycleGate.WaitAsync(); - // Cancel all inflight handlers - foreach (var kvp in _inflightHandlers) + try { - try { kvp.Value.Cancel(); } - catch (ObjectDisposedException) { } + if (_connectionId is null && _receiveTask is null) + { + return; + } + + CancelInflightHandlers(); + await _connectionCts.CancelAsync(); + + if (_receiveTask is not null) + { + try { await _receiveTask; } + catch (OperationCanceledException) { } + } + + _connectionCts.Dispose(); + _connectionCts = new CancellationTokenSource(); + _requestQueue = null; + _responseQueue = null; + _serviceIncomingQueue = null; + _receiveTask = null; + _connectionId = null; + _instance = null; + _transportDead = false; + + _logger.LogInformation("Disconnected from messaging transport"); } - _inflightHandlers.Clear(); - - await _clientCts.CancelAsync(); - - if (_receiveTask is not null) + finally { - try { await _receiveTask; } - catch (OperationCanceledException) { } + _lifecycleGate.Release(); } - - _connectionId = null; - _instance = null; - - _logger.LogInformation("Disconnected from messaging transport"); } private static Frame DecodeFrame(FrameType frameType, string? correlationId, string payloadBase64) @@ -611,15 +617,90 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr if (_disposed) return; _disposed = true; - foreach (var kvp in _inflightHandlers) - { - try { kvp.Value.Cancel(); } - catch (ObjectDisposedException) { } - } - _inflightHandlers.Clear(); - - _clientCts.Cancel(); - _clientCts.Dispose(); + CancelInflightHandlers(); + _connectionCts.Cancel(); + _connectionCts.Dispose(); + _lifecycleGate.Dispose(); _correlationTracker.Dispose(); } + + private bool EnsureConnectionInfrastructure(InstanceDescriptor instance) + { + var requiresFreshConnection = + _connectionId is null || + _requestQueue is null || + _responseQueue is null || + _serviceIncomingQueue is null || + _receiveTask is null || + _receiveTask.IsCompleted || + _transportDead || + _connectionCts.IsCancellationRequested; + + if (!requiresFreshConnection) + { + return false; + } + + if (_connectionCts.IsCancellationRequested) + { + _connectionCts.Dispose(); + _connectionCts = new CancellationTokenSource(); + } + + _connectionId ??= Guid.NewGuid().ToString("N"); + + _requestQueue = CreateRequestQueue(instance); + _responseQueue = CreateResponseQueue(instance); + _serviceIncomingQueue = CreateServiceIncomingQueue(instance); + _transportDead = false; + _receiveTask = Task.Run(() => ReceiveLoopAsync(_connectionCts.Token), CancellationToken.None); + + return true; + } + + private IMessageQueue CreateRequestQueue(InstanceDescriptor instance) + { + return _queueFactory.Create(new MessageQueueOptions + { + QueueName = _options.GetGatewayControlQueueName(), + ConsumerGroup = _options.ConsumerGroup, + ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}" + }); + } + + private IMessageQueue CreateResponseQueue(InstanceDescriptor instance) + { + return _queueFactory.Create(new MessageQueueOptions + { + QueueName = _options.ResponseQueueName, + ConsumerGroup = _options.ConsumerGroup, + ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}" + }); + } + + private IMessageQueue CreateServiceIncomingQueue(InstanceDescriptor instance) + { + return _queueFactory.Create(new MessageQueueOptions + { + QueueName = _options.GetRequestQueueName(instance.ServiceName), + ConsumerGroup = _options.ConsumerGroup, + ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}", + DefaultLeaseDuration = _options.LeaseDuration + }); + } + + private void CancelInflightHandlers() + { + foreach (var kvp in _inflightHandlers) + { + try + { + kvp.Value.Cancel(); + kvp.Value.Dispose(); + } + catch (ObjectDisposedException) { } + } + + _inflightHandlers.Clear(); + } } diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md index 3ce3cc709..7b20c8179 100644 --- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md +++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/TASKS.md @@ -4,6 +4,7 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol | Task ID | Status | Notes | | --- | --- | --- | +| RTR-MSG-002 | DONE | `docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - periodic HELLO re-registration now reuses the existing messaging connection/receive loop instead of minting duplicate logical connections. | | RVM-06 | DONE | Updated messaging microservice HELLO payload to include schemas/OpenAPI metadata via the new schema-aware `IMicroserviceTransport.ConnectAsync(...)` overload. | | REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/StellaOps.Router.Transport.Messaging.md. | | REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. | diff --git a/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs b/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs index 14cbac714..e102573cf 100644 --- a/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs +++ b/src/Router/__Tests/StellaOps.Router.Common.Tests/MessagingTransportQueueOptionsTests.cs @@ -158,6 +158,56 @@ public sealed class MessagingTransportQueueOptionsTests payload.OpenApiInfo!.Title.Should().Be("timelineindexer"); } + [Trait("Category", TestCategories.Unit)] + [Fact] + public async Task MessagingTransportClient_ConnectAsync_WhenAlreadyConnected_ReusesQueuesAndLogicalConnection() + { + var options = Options.Create(new MessagingTransportOptions + { + ConsumerGroup = "timelineindexer-test", + BatchSize = 1 + }); + + var queueFactory = new RecordingQueueFactory(); + var client = new MessagingTransportClient( + queueFactory, + options, + NullLogger.Instance); + + var instance = new InstanceDescriptor + { + InstanceId = "timelineindexer-1", + ServiceName = "timelineindexer", + Version = "1.0.0", + Region = "local" + }; + + EndpointDescriptor[] endpoints = + [ + new EndpointDescriptor + { + ServiceName = "timelineindexer", + Version = "1.0.0", + Method = "GET", + Path = "/api/v1/timeline" + } + ]; + + await client.ConnectAsync(instance, endpoints, CancellationToken.None); + await client.ConnectAsync(instance, endpoints, CancellationToken.None); + await client.DisconnectAsync(); + + queueFactory.CreatedQueues.Should().HaveCount(3); + + var helloMessages = queueFactory.EnqueuedMessages + .OfType() + .Where(message => message.FrameType == Common.Enums.FrameType.Hello) + .ToList(); + + helloMessages.Should().HaveCount(2); + helloMessages.Select(message => message.ConnectionId).Distinct().Should().ContainSingle(); + } + [Trait("Category", TestCategories.Unit)] [Fact] public void MessagingTransportOptions_DefaultControlQueue_DoesNotCollideWithGatewayServiceQueue() diff --git a/src/Router/__Tests/StellaOps.Router.Common.Tests/TASKS.md b/src/Router/__Tests/StellaOps.Router.Common.Tests/TASKS.md index 19696a0b0..9c847724b 100644 --- a/src/Router/__Tests/StellaOps.Router.Common.Tests/TASKS.md +++ b/src/Router/__Tests/StellaOps.Router.Common.Tests/TASKS.md @@ -4,6 +4,7 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol | Task ID | Status | Notes | | --- | --- | --- | +| RTR-MSG-002 | DONE | `docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - deterministic transport regressions now cover HELLO re-registration reuse via the xUnit v3 runner. | | RVM-06 | DONE | Added messaging HELLO payload coverage to verify schema/openapi metadata propagation from microservice transport client. | | REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Tests/StellaOps.Router.Common.Tests/StellaOps.Router.Common.Tests.md. | | REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. | diff --git a/src/Router/__Tests/StellaOps.Router.Gateway.Tests/Routing/InMemoryRoutingStateTests.cs b/src/Router/__Tests/StellaOps.Router.Gateway.Tests/Routing/InMemoryRoutingStateTests.cs new file mode 100644 index 000000000..ae9691a33 --- /dev/null +++ b/src/Router/__Tests/StellaOps.Router.Gateway.Tests/Routing/InMemoryRoutingStateTests.cs @@ -0,0 +1,86 @@ +using FluentAssertions; +using StellaOps.Router.Common.Enums; +using StellaOps.Router.Common.Models; +using StellaOps.Router.Gateway.State; + +namespace StellaOps.Router.Gateway.Tests.Routing; + +[Trait("Category", "Unit")] +public sealed class InMemoryRoutingStateTests +{ + [Fact] + public void AddConnection_WhenSameInstanceReconnects_ReplacesTheStaleConnection() + { + var state = new InMemoryRoutingState(); + var staleConnection = CreateConnection( + connectionId: "conn-stale", + instanceId: "integrations-1", + endpointPath: "/api/v1/integrations"); + var refreshedConnection = CreateConnection( + connectionId: "conn-fresh", + instanceId: "integrations-1", + endpointPath: "/api/v1/integrations"); + + state.AddConnection(staleConnection); + state.AddConnection(refreshedConnection); + + state.GetAllConnections().Should().ContainSingle() + .Which.ConnectionId.Should().Be("conn-fresh"); + state.GetConnection("conn-stale").Should().BeNull(); + state.GetConnectionsFor("integrations", "1.0.0", "GET", "/api/v1/integrations") + .Should().ContainSingle() + .Which.ConnectionId.Should().Be("conn-fresh"); + } + + [Fact] + public void AddConnection_WhenSameConnectionIdRefreshes_RebuildsEndpointIndexes() + { + var state = new InMemoryRoutingState(); + var originalConnection = CreateConnection( + connectionId: "conn-1", + instanceId: "integrations-1", + endpointPath: "/api/v1/integrations"); + var refreshedConnection = CreateConnection( + connectionId: "conn-1", + instanceId: "integrations-1", + endpointPath: "/api/v1/integrations/runtime-hosts"); + + state.AddConnection(originalConnection); + state.AddConnection(refreshedConnection); + + state.ResolveEndpoint("GET", "/api/v1/integrations").Should().BeNull(); + state.GetConnectionsFor("integrations", "1.0.0", "GET", "/api/v1/integrations/runtime-hosts") + .Should().ContainSingle() + .Which.ConnectionId.Should().Be("conn-1"); + } + + private static ConnectionState CreateConnection( + string connectionId, + string instanceId, + string endpointPath) + { + var endpoint = new EndpointDescriptor + { + ServiceName = "integrations", + Version = "1.0.0", + Method = "GET", + Path = endpointPath + }; + + var connection = new ConnectionState + { + ConnectionId = connectionId, + Instance = new InstanceDescriptor + { + InstanceId = instanceId, + ServiceName = "integrations", + Version = "1.0.0", + Region = "local" + }, + TransportType = TransportType.Messaging + }; + + connection.Endpoints[(endpoint.Method, endpoint.Path)] = endpoint; + return connection; + } +} diff --git a/src/Router/__Tests/StellaOps.Router.Gateway.Tests/TASKS.md b/src/Router/__Tests/StellaOps.Router.Gateway.Tests/TASKS.md index eb2831f10..3aaa96e1c 100644 --- a/src/Router/__Tests/StellaOps.Router.Gateway.Tests/TASKS.md +++ b/src/Router/__Tests/StellaOps.Router.Gateway.Tests/TASKS.md @@ -4,5 +4,6 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol | Task ID | Status | Notes | | --- | --- | --- | +| RTR-MSG-002 | DONE | `docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - routing-state regressions now cover stale same-instance registration replacement and endpoint index rebuilds. | | REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Tests/StellaOps.Router.Gateway.Tests/StellaOps.Router.Gateway.Tests.md. | | REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. |