Fix router messaging re-registration stability

This commit is contained in:
master
2026-03-07 03:48:46 +02:00
parent 28932d4a85
commit 2ff0e1f86b
9 changed files with 466 additions and 116 deletions

View File

@@ -19,49 +19,45 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState
/// <inheritdoc />
public void AddConnection(ConnectionState connection)
{
_connections[connection.ConnectionId] = connection;
// Index all endpoints
foreach (var endpoint in connection.Endpoints.Values)
lock (_indexLock)
{
var key = (endpoint.Method, endpoint.Path);
RemoveConnectionInternal(connection.ConnectionId);
// Add to endpoint index
var connectionIds = _endpointIndex.GetOrAdd(key, _ => []);
connectionIds.Add(connection.ConnectionId);
var staleConnectionIds = _connections.Values
.Where(existing =>
existing.ConnectionId != connection.ConnectionId &&
IsSameServiceInstance(existing, connection))
.Select(existing => existing.ConnectionId)
.ToList();
// Create path matcher if not exists
_pathMatchers.GetOrAdd(key, _ => new PathMatcher(endpoint.Path));
foreach (var staleConnectionId in staleConnectionIds)
{
RemoveConnectionInternal(staleConnectionId);
}
_connections[connection.ConnectionId] = connection;
// Index all endpoints
foreach (var endpoint in connection.Endpoints.Values)
{
var key = (endpoint.Method, endpoint.Path);
// Add to endpoint index
var connectionIds = _endpointIndex.GetOrAdd(key, _ => []);
connectionIds.Add(connection.ConnectionId);
// Create path matcher if not exists
_pathMatchers.GetOrAdd(key, _ => new PathMatcher(endpoint.Path));
}
}
}
/// <inheritdoc />
public void RemoveConnection(string connectionId)
{
if (_connections.TryRemove(connectionId, out var connection))
lock (_indexLock)
{
// Remove from endpoint index
foreach (var endpoint in connection.Endpoints.Values)
{
var key = (endpoint.Method, endpoint.Path);
if (_endpointIndex.TryGetValue(key, out var connectionIds))
{
// ConcurrentBag doesn't support removal, so we need to rebuild
lock (_indexLock)
{
var remaining = connectionIds.Where(id => id != connectionId).ToList();
if (remaining.Count == 0)
{
_endpointIndex.TryRemove(key, out _);
_pathMatchers.TryRemove(key, out _);
}
else
{
_endpointIndex[key] = new ConcurrentBag<string>(remaining);
}
}
}
}
RemoveConnectionInternal(connectionId);
}
}
@@ -100,7 +96,7 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState
// Get first connection with this endpoint
if (_endpointIndex.TryGetValue((m, p), out var connectionIds))
{
foreach (var connectionId in connectionIds)
foreach (var connectionId in connectionIds.Distinct(StringComparer.Ordinal))
{
if (_connections.TryGetValue(connectionId, out var conn) &&
conn.Endpoints.TryGetValue((m, p), out var endpoint))
@@ -135,7 +131,7 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState
if (!_endpointIndex.TryGetValue((m, p), out var connectionIds))
continue;
foreach (var connectionId in connectionIds)
foreach (var connectionId in connectionIds.Distinct(StringComparer.Ordinal))
{
if (!_connections.TryGetValue(connectionId, out var conn))
continue;
@@ -157,4 +153,45 @@ internal sealed class InMemoryRoutingState : IGlobalRoutingState
return result;
}
/// <summary>
/// Removes a connection from the connection table and strips its id from every
/// endpoint index entry that the connection had registered.
/// </summary>
/// <remarks>
/// Callers visible in this file invoke this method while holding <c>_indexLock</c>.
/// </remarks>
/// <param name="connectionId">Id of the connection to drop. Unknown ids are ignored.</param>
private void RemoveConnectionInternal(string connectionId)
{
    if (_connections.TryRemove(connectionId, out var removed))
    {
        foreach (var descriptor in removed.Endpoints.Values)
        {
            var indexKey = (descriptor.Method, descriptor.Path);

            if (_endpointIndex.TryGetValue(indexKey, out var indexedIds))
            {
                // The bag cannot remove a single item, so build the surviving id list
                // (de-duplicated) and swap the whole bag.
                var survivors = indexedIds
                    .Where(candidate => !string.Equals(candidate, connectionId, StringComparison.Ordinal))
                    .Distinct(StringComparer.Ordinal)
                    .ToList();

                if (survivors.Count > 0)
                {
                    _endpointIndex[indexKey] = new ConcurrentBag<string>(survivors);
                }
                else
                {
                    // Nobody serves this endpoint any more: drop the index entry and its matcher.
                    _endpointIndex.TryRemove(indexKey, out _);
                    _pathMatchers.TryRemove(indexKey, out _);
                }
            }
        }
    }
}
/// <summary>
/// Tells whether two connection states describe the same service instance on the same transport.
/// </summary>
/// <param name="existing">A connection already tracked by the routing state.</param>
/// <param name="candidate">The connection being registered.</param>
/// <returns>
/// <c>true</c> when transport type, service name (case-insensitive), version, instance id
/// and region (case-insensitive) all match; otherwise <c>false</c>.
/// </returns>
private static bool IsSameServiceInstance(ConnectionState existing, ConnectionState candidate)
{
    if (existing.TransportType != candidate.TransportType)
    {
        return false;
    }

    var known = existing.Instance;
    var incoming = candidate.Instance;

    var sameService = string.Equals(known.ServiceName, incoming.ServiceName, StringComparison.OrdinalIgnoreCase);
    var sameVersion = string.Equals(known.Version, incoming.Version, StringComparison.Ordinal);
    var sameInstanceId = string.Equals(known.InstanceId, incoming.InstanceId, StringComparison.Ordinal);
    var sameRegion = string.Equals(known.Region, incoming.Region, StringComparison.OrdinalIgnoreCase);

    return sameService && sameVersion && sameInstanceId && sameRegion;
}
}

View File

@@ -27,7 +27,8 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
private readonly CorrelationTracker _correlationTracker;
private readonly JsonSerializerOptions _jsonOptions;
private readonly ConcurrentDictionary<string, CancellationTokenSource> _inflightHandlers = new();
private readonly CancellationTokenSource _clientCts = new();
private readonly SemaphoreSlim _lifecycleGate = new(1, 1);
private CancellationTokenSource _connectionCts = new();
private IMessageQueue<RpcRequestMessage>? _requestQueue;
private IMessageQueue<RpcResponseMessage>? _responseQueue;
private IMessageQueue<RpcResponseMessage>? _serviceIncomingQueue;
@@ -87,64 +88,57 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
{
ObjectDisposedException.ThrowIf(_disposed, this);
_connectionId = Guid.NewGuid().ToString("N");
_instance = instance;
await _lifecycleGate.WaitAsync(cancellationToken);
// Create request queue (for sending to gateway)
_requestQueue = _queueFactory.Create<RpcRequestMessage>(new MessageQueueOptions
try
{
QueueName = _options.GetGatewayControlQueueName(),
ConsumerGroup = _options.ConsumerGroup,
ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}"
});
_instance = instance;
var openedFreshConnection = EnsureConnectionInfrastructure(instance);
// Create response queue (for receiving gateway responses)
_responseQueue = _queueFactory.Create<RpcResponseMessage>(new MessageQueueOptions
// Send HELLO frame
var helloPayload = new HelloPayload
{
Instance = instance,
Endpoints = endpoints,
Schemas = schemas ?? new Dictionary<string, SchemaDefinition>(),
OpenApiInfo = openApiInfo
};
var helloMessage = new RpcRequestMessage
{
CorrelationId = Guid.NewGuid().ToString("N"),
ConnectionId = _connectionId!,
TargetService = "gateway",
FrameType = FrameType.Hello,
PayloadBase64 = Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(helloPayload, _jsonOptions)),
SenderInstanceId = instance.InstanceId
};
await _requestQueue!.EnqueueAsync(helloMessage, cancellationToken: cancellationToken);
if (openedFreshConnection)
{
_logger.LogInformation(
"Connected as {ServiceName}/{Version} instance {InstanceId} with {EndpointCount} endpoints via messaging",
instance.ServiceName,
instance.Version,
instance.InstanceId,
endpoints.Count);
}
else
{
_logger.LogDebug(
"Refreshed messaging registration for {ServiceName}/{Version} instance {InstanceId} on connection {ConnectionId}",
instance.ServiceName,
instance.Version,
instance.InstanceId,
_connectionId);
}
}
finally
{
QueueName = _options.ResponseQueueName,
ConsumerGroup = _options.ConsumerGroup,
ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}"
});
// Create service-specific incoming queue (for receiving requests from gateway)
_serviceIncomingQueue = _queueFactory.Create<RpcResponseMessage>(new MessageQueueOptions
{
QueueName = _options.GetRequestQueueName(instance.ServiceName),
ConsumerGroup = _options.ConsumerGroup,
ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}",
DefaultLeaseDuration = _options.LeaseDuration
});
// Send HELLO frame
var helloPayload = new HelloPayload
{
Instance = instance,
Endpoints = endpoints,
Schemas = schemas ?? new Dictionary<string, SchemaDefinition>(),
OpenApiInfo = openApiInfo
};
var helloMessage = new RpcRequestMessage
{
CorrelationId = Guid.NewGuid().ToString("N"),
ConnectionId = _connectionId,
TargetService = "gateway",
FrameType = FrameType.Hello,
PayloadBase64 = Convert.ToBase64String(JsonSerializer.SerializeToUtf8Bytes(helloPayload, _jsonOptions)),
SenderInstanceId = instance.InstanceId
};
await _requestQueue.EnqueueAsync(helloMessage, cancellationToken: cancellationToken);
_logger.LogInformation(
"Connected as {ServiceName}/{Version} instance {InstanceId} with {EndpointCount} endpoints via messaging",
instance.ServiceName,
instance.Version,
instance.InstanceId,
endpoints.Count);
// Start receiving responses and requests
_receiveTask = Task.Run(() => ReceiveLoopAsync(_clientCts.Token), CancellationToken.None);
_lifecycleGate.Release();
}
}
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
@@ -569,28 +563,40 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
/// <inheritdoc />
public async Task DisconnectAsync()
{
if (_connectionId is null) return;
await _lifecycleGate.WaitAsync();
// Cancel all inflight handlers
foreach (var kvp in _inflightHandlers)
try
{
try { kvp.Value.Cancel(); }
catch (ObjectDisposedException) { }
if (_connectionId is null && _receiveTask is null)
{
return;
}
CancelInflightHandlers();
await _connectionCts.CancelAsync();
if (_receiveTask is not null)
{
try { await _receiveTask; }
catch (OperationCanceledException) { }
}
_connectionCts.Dispose();
_connectionCts = new CancellationTokenSource();
_requestQueue = null;
_responseQueue = null;
_serviceIncomingQueue = null;
_receiveTask = null;
_connectionId = null;
_instance = null;
_transportDead = false;
_logger.LogInformation("Disconnected from messaging transport");
}
_inflightHandlers.Clear();
await _clientCts.CancelAsync();
if (_receiveTask is not null)
finally
{
try { await _receiveTask; }
catch (OperationCanceledException) { }
_lifecycleGate.Release();
}
_connectionId = null;
_instance = null;
_logger.LogInformation("Disconnected from messaging transport");
}
private static Frame DecodeFrame(FrameType frameType, string? correlationId, string payloadBase64)
@@ -611,15 +617,90 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
if (_disposed) return;
_disposed = true;
foreach (var kvp in _inflightHandlers)
{
try { kvp.Value.Cancel(); }
catch (ObjectDisposedException) { }
}
_inflightHandlers.Clear();
_clientCts.Cancel();
_clientCts.Dispose();
CancelInflightHandlers();
_connectionCts.Cancel();
_connectionCts.Dispose();
_lifecycleGate.Dispose();
_correlationTracker.Dispose();
}
/// <summary>
/// Ensures the queues and the receive loop backing the current logical connection exist,
/// rebuilding them only when something is missing or unusable.
/// </summary>
/// <remarks>
/// <c>ConnectAsync</c> calls this while holding <c>_lifecycleGate</c>. The connection id is
/// kept when one already exists, so a rebuild does not mint a new logical connection.
/// </remarks>
/// <param name="instance">Instance descriptor used to name the queue consumers.</param>
/// <returns>
/// <c>true</c> when queues and the receive loop were (re)created; <c>false</c> when the
/// existing infrastructure was reused untouched.
/// </returns>
private bool EnsureConnectionInfrastructure(InstanceDescriptor instance)
{
    var requiresFreshConnection =
        _connectionId is null ||
        _requestQueue is null ||
        _responseQueue is null ||
        _serviceIncomingQueue is null ||
        _receiveTask is null ||
        _receiveTask.IsCompleted ||
        _transportDead ||
        _connectionCts.IsCancellationRequested;

    if (!requiresFreshConnection)
    {
        return false;
    }

    // A rebuild can be requested while the previous loop is still alive (for example the
    // transport was flagged dead but the loop has not returned yet). Stop that loop so two
    // receive loops never consume side by side.
    var previousLoopStillRunning = _receiveTask is { IsCompleted: false };
    if (previousLoopStillRunning || _connectionCts.IsCancellationRequested)
    {
        var previousCts = _connectionCts;
        _connectionCts = new CancellationTokenSource();

        if (previousLoopStillRunning)
        {
            // Not disposed here: the old loop may still be observing this token.
            previousCts.Cancel();
        }
        else
        {
            previousCts.Dispose();
        }
    }

    _connectionId ??= Guid.NewGuid().ToString("N");
    _requestQueue = CreateRequestQueue(instance);
    _responseQueue = CreateResponseQueue(instance);
    _serviceIncomingQueue = CreateServiceIncomingQueue(instance);
    _transportDead = false;

    // Capture the token now. Reading the field inside the lambda would happen later on a
    // pool thread, after the source may have been swapped or disposed.
    var receiveToken = _connectionCts.Token;
    _receiveTask = Task.Run(() => ReceiveLoopAsync(receiveToken), CancellationToken.None);
    return true;
}
/// <summary>
/// Creates the queue used to send frames to the gateway control queue.
/// </summary>
/// <param name="instance">Instance whose service name and id form the consumer name.</param>
/// <returns>The queue produced by the queue factory.</returns>
private IMessageQueue<RpcRequestMessage> CreateRequestQueue(InstanceDescriptor instance)
{
    var queueOptions = new MessageQueueOptions
    {
        QueueName = _options.GetGatewayControlQueueName(),
        ConsumerGroup = _options.ConsumerGroup,
        ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}"
    };

    var queue = _queueFactory.Create<RpcRequestMessage>(queueOptions);
    return queue;
}
/// <summary>
/// Creates the queue bound to the configured response queue name.
/// </summary>
/// <param name="instance">Instance whose service name and id form the consumer name.</param>
/// <returns>The queue produced by the queue factory.</returns>
private IMessageQueue<RpcResponseMessage> CreateResponseQueue(InstanceDescriptor instance)
{
    var queueOptions = new MessageQueueOptions
    {
        QueueName = _options.ResponseQueueName,
        ConsumerGroup = _options.ConsumerGroup,
        ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}"
    };

    var queue = _queueFactory.Create<RpcResponseMessage>(queueOptions);
    return queue;
}
/// <summary>
/// Creates the service-specific queue named after the instance's service, using the
/// configured lease duration as the default lease.
/// </summary>
/// <param name="instance">Instance whose service name selects the queue and, with the id, forms the consumer name.</param>
/// <returns>The queue produced by the queue factory.</returns>
private IMessageQueue<RpcResponseMessage> CreateServiceIncomingQueue(InstanceDescriptor instance)
{
    var queueOptions = new MessageQueueOptions
    {
        QueueName = _options.GetRequestQueueName(instance.ServiceName),
        ConsumerGroup = _options.ConsumerGroup,
        ConsumerName = $"{instance.ServiceName}-{instance.InstanceId}",
        DefaultLeaseDuration = _options.LeaseDuration
    };

    var queue = _queueFactory.Create<RpcResponseMessage>(queueOptions);
    return queue;
}
/// <summary>
/// Cancels and disposes the cancellation source of every in-flight handler that is
/// currently tracked, removing each one from the tracking dictionary.
/// </summary>
/// <remarks>
/// Entries are removed one at a time with <c>TryRemove</c> instead of enumerating and then
/// clearing, so a handler registered while this runs is never dropped without being
/// cancelled. A failure while cancelling one handler does not stop the others.
/// </remarks>
private void CancelInflightHandlers()
{
    foreach (var correlationId in _inflightHandlers.Keys)
    {
        if (!_inflightHandlers.TryRemove(correlationId, out var handlerCts))
        {
            // Already removed elsewhere (for example by the handler finishing).
            continue;
        }

        try
        {
            handlerCts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The handler finished and disposed its source first; nothing left to cancel.
        }
        catch (AggregateException ex)
        {
            // Cancel() surfaces exceptions thrown by registered callbacks; keep going.
            _logger.LogWarning(ex, "Cancellation callback failed for in-flight handler {CorrelationId}", correlationId);
        }
        finally
        {
            // Disposing an already disposed source is a no-op.
            handlerCts.Dispose();
        }
    }
}
}

View File

@@ -4,6 +4,7 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol
| Task ID | Status | Notes |
| --- | --- | --- |
| RTR-MSG-002 | DONE | `docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - periodic HELLO re-registration now reuses the existing messaging connection/receive loop instead of minting duplicate logical connections. |
| RVM-06 | DONE | Updated messaging microservice HELLO payload to include schemas/OpenAPI metadata via the new schema-aware `IMicroserviceTransport.ConnectAsync(...)` overload. |
| REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/StellaOps.Router.Transport.Messaging.md. |
| REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. |

View File

@@ -158,6 +158,56 @@ public sealed class MessagingTransportQueueOptionsTests
payload.OpenApiInfo!.Title.Should().Be("timelineindexer");
}
/// <summary>
/// Connecting twice with the same instance must reuse the three queues created by the first
/// connect and send both HELLO frames under a single connection id.
/// </summary>
[Trait("Category", TestCategories.Unit)]
[Fact]
public async Task MessagingTransportClient_ConnectAsync_WhenAlreadyConnected_ReusesQueuesAndLogicalConnection()
{
    // Arrange
    var recordingFactory = new RecordingQueueFactory();
    var transportOptions = Options.Create(new MessagingTransportOptions
    {
        ConsumerGroup = "timelineindexer-test",
        BatchSize = 1
    });
    var transportClient = new MessagingTransportClient(
        recordingFactory,
        transportOptions,
        NullLogger<MessagingTransportClient>.Instance);

    var descriptor = new InstanceDescriptor
    {
        InstanceId = "timelineindexer-1",
        ServiceName = "timelineindexer",
        Version = "1.0.0",
        Region = "local"
    };

    var timelineEndpoint = new EndpointDescriptor
    {
        ServiceName = "timelineindexer",
        Version = "1.0.0",
        Method = "GET",
        Path = "/api/v1/timeline"
    };
    EndpointDescriptor[] registeredEndpoints = [timelineEndpoint];

    // Act: the second connect is a re-registration on the same client.
    await transportClient.ConnectAsync(descriptor, registeredEndpoints, CancellationToken.None);
    await transportClient.ConnectAsync(descriptor, registeredEndpoints, CancellationToken.None);
    await transportClient.DisconnectAsync();

    // Assert
    queueFactoryShouldHaveCreatedThreeQueues();

    var helloFrames = recordingFactory.EnqueuedMessages
        .OfType<RpcRequestMessage>()
        .Where(frame => frame.FrameType == Common.Enums.FrameType.Hello)
        .ToList();
    helloFrames.Should().HaveCount(2);

    var distinctConnectionIds = helloFrames.Select(frame => frame.ConnectionId).Distinct();
    distinctConnectionIds.Should().ContainSingle();

    void queueFactoryShouldHaveCreatedThreeQueues() =>
        recordingFactory.CreatedQueues.Should().HaveCount(3);
}
[Trait("Category", TestCategories.Unit)]
[Fact]
public void MessagingTransportOptions_DefaultControlQueue_DoesNotCollideWithGatewayServiceQueue()

View File

@@ -4,6 +4,7 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol
| Task ID | Status | Notes |
| --- | --- | --- |
| RTR-MSG-002 | DONE | `docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - deterministic transport regressions now cover HELLO re-registration reuse via the xUnit v3 runner. |
| RVM-06 | DONE | Added messaging HELLO payload coverage to verify schema/openapi metadata propagation from microservice transport client. |
| REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Tests/StellaOps.Router.Common.Tests/StellaOps.Router.Common.Tests.md. |
| REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. |

View File

@@ -0,0 +1,86 @@
using FluentAssertions;
using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Models;
using StellaOps.Router.Gateway.State;
namespace StellaOps.Router.Gateway.Tests.Routing;
/// <summary>
/// Regression tests for how <see cref="InMemoryRoutingState"/> handles repeated registrations
/// of the same service instance.
/// </summary>
[Trait("Category", "Unit")]
public sealed class InMemoryRoutingStateTests
{
    /// <summary>
    /// A second registration of the same instance under a new connection id replaces the first one.
    /// </summary>
    [Fact]
    public void AddConnection_WhenSameInstanceReconnects_ReplacesTheStaleConnection()
    {
        // Arrange
        var routingState = new InMemoryRoutingState();
        var stale = CreateConnection(
            connectionId: "conn-stale",
            instanceId: "integrations-1",
            endpointPath: "/api/v1/integrations");
        var fresh = CreateConnection(
            connectionId: "conn-fresh",
            instanceId: "integrations-1",
            endpointPath: "/api/v1/integrations");

        // Act
        routingState.AddConnection(stale);
        routingState.AddConnection(fresh);

        // Assert
        var tracked = routingState.GetAllConnections();
        tracked.Should().ContainSingle()
            .Which.ConnectionId.Should().Be("conn-fresh");

        routingState.GetConnection("conn-stale").Should().BeNull();

        var serving = routingState.GetConnectionsFor("integrations", "1.0.0", "GET", "/api/v1/integrations");
        serving.Should().ContainSingle()
            .Which.ConnectionId.Should().Be("conn-fresh");
    }

    /// <summary>
    /// Re-adding the same connection id with different endpoints drops the old endpoint index entries.
    /// </summary>
    [Fact]
    public void AddConnection_WhenSameConnectionIdRefreshes_RebuildsEndpointIndexes()
    {
        // Arrange
        var routingState = new InMemoryRoutingState();
        var before = CreateConnection(
            connectionId: "conn-1",
            instanceId: "integrations-1",
            endpointPath: "/api/v1/integrations");
        var after = CreateConnection(
            connectionId: "conn-1",
            instanceId: "integrations-1",
            endpointPath: "/api/v1/integrations/runtime-hosts");

        // Act
        routingState.AddConnection(before);
        routingState.AddConnection(after);

        // Assert
        routingState.ResolveEndpoint("GET", "/api/v1/integrations").Should().BeNull();

        var serving = routingState.GetConnectionsFor("integrations", "1.0.0", "GET", "/api/v1/integrations/runtime-hosts");
        serving.Should().ContainSingle()
            .Which.ConnectionId.Should().Be("conn-1");
    }

    /// <summary>
    /// Builds a messaging connection for the "integrations" service exposing a single GET endpoint.
    /// </summary>
    /// <param name="connectionId">Connection id to assign.</param>
    /// <param name="instanceId">Instance id of the owning service instance.</param>
    /// <param name="endpointPath">Path of the single registered endpoint.</param>
    /// <returns>The populated connection state.</returns>
    private static ConnectionState CreateConnection(
        string connectionId,
        string instanceId,
        string endpointPath)
    {
        var descriptor = new EndpointDescriptor
        {
            ServiceName = "integrations",
            Version = "1.0.0",
            Method = "GET",
            Path = endpointPath
        };

        var owner = new InstanceDescriptor
        {
            InstanceId = instanceId,
            ServiceName = "integrations",
            Version = "1.0.0",
            Region = "local"
        };

        var state = new ConnectionState
        {
            ConnectionId = connectionId,
            Instance = owner,
            TransportType = TransportType.Messaging
        };

        var endpointKey = (descriptor.Method, descriptor.Path);
        state.Endpoints[endpointKey] = descriptor;
        return state;
    }
}

View File

@@ -4,5 +4,6 @@ Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_sol
| Task ID | Status | Notes |
| --- | --- | --- |
| RTR-MSG-002 | DONE | `docs/implplan/SPRINT_20260307_008_Router_integrations_messaging_reregistration_stability.md` - routing-state regressions now cover stale same-instance registration replacement and endpoint index rebuilds. |
| REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Router/__Tests/StellaOps.Router.Gateway.Tests/StellaOps.Router.Gateway.Tests.md. |
| REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. |