Add integration tests for migration categories and execution
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
- Implemented MigrationCategoryTests to validate migration categorization for startup, release, seed, and data migrations. - Added tests for edge cases, including null, empty, and whitespace migration names. - Created StartupMigrationHostTests to verify the behavior of the migration host with real PostgreSQL instances using Testcontainers. - Included tests for migration execution, schema creation, and handling of pending release migrations. - Added SQL migration files for testing: creating a test table, adding a column, a release migration, and seeding data.
This commit is contained in:
946
docs/router/13-Step.md
Normal file
946
docs/router/13-Step.md
Normal file
@@ -0,0 +1,946 @@
|
||||
# Step 13: InMemory Transport Implementation
|
||||
|
||||
**Phase 3: Transport Layer**
|
||||
**Estimated Complexity:** Medium
|
||||
**Dependencies:** Step 12 (Request/Response Serialization)
|
||||
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
The InMemory transport provides a high-performance, zero-network transport for testing, local development, and same-process microservices. It serves as the reference implementation for the transport layer and must pass all protocol tests before any real transport implementation.
|
||||
|
||||
---
|
||||
|
||||
## Goals
|
||||
|
||||
1. Implement a fully-functional in-process transport without network overhead
|
||||
2. Serve as the reference implementation for transport protocol compliance
|
||||
3. Enable fast integration tests without network dependencies
|
||||
4. Support all frame types and streaming semantics
|
||||
5. Provide debugging hooks for protocol validation
|
||||
|
||||
---
|
||||
|
||||
## Core Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ InMemory Transport Hub │
|
||||
├─────────────────────────────────────────────────────────────┤
|
||||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
|
||||
│ │ Gateway Side │◄──►│ Channels │◄──►│Microservice │ │
|
||||
│ │ Client │ │ (Duplex) │ │ Server │ │
|
||||
│ └──────────────┘ └──────────────┘ └──────────────┘ │
|
||||
│ │
|
||||
│ Connection Registry Frame Queue Handler Dispatch │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Core Types
|
||||
|
||||
### InMemory Channel
|
||||
|
||||
```csharp
|
||||
namespace StellaOps.Router.Transport.InMemory;
|
||||
|
||||
/// <summary>
|
||||
/// Bidirectional in-memory channel for frame exchange.
|
||||
/// </summary>
|
||||
public sealed class InMemoryChannel : IAsyncDisposable
|
||||
{
|
||||
private readonly Channel<Frame> _gatewayToService;
|
||||
private readonly Channel<Frame> _serviceToGateway;
|
||||
private readonly CancellationTokenSource _cts;
|
||||
|
||||
public string ChannelId { get; }
|
||||
public string ServiceName { get; }
|
||||
public string InstanceId { get; }
|
||||
public ConnectionState State { get; private set; }
|
||||
public DateTimeOffset CreatedAt { get; }
|
||||
public DateTimeOffset LastActivityAt { get; private set; }
|
||||
|
||||
public InMemoryChannel(string serviceName, string instanceId)
|
||||
{
|
||||
ChannelId = Guid.NewGuid().ToString("N");
|
||||
ServiceName = serviceName;
|
||||
InstanceId = instanceId;
|
||||
CreatedAt = DateTimeOffset.UtcNow;
|
||||
LastActivityAt = CreatedAt;
|
||||
State = ConnectionState.Connecting;
|
||||
_cts = new CancellationTokenSource();
|
||||
|
||||
// Bounded channels to provide backpressure
|
||||
var options = new BoundedChannelOptions(1000)
|
||||
{
|
||||
FullMode = BoundedChannelFullMode.Wait,
|
||||
SingleReader = false,
|
||||
SingleWriter = false
|
||||
};
|
||||
|
||||
_gatewayToService = Channel.CreateBounded<Frame>(options);
|
||||
_serviceToGateway = Channel.CreateBounded<Frame>(options);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the writer for sending frames from gateway to service.
|
||||
/// </summary>
|
||||
public ChannelWriter<Frame> GatewayWriter => _gatewayToService.Writer;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the reader for receiving frames from gateway (service side).
|
||||
/// </summary>
|
||||
public ChannelReader<Frame> ServiceReader => _gatewayToService.Reader;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the writer for sending frames from service to gateway.
|
||||
/// </summary>
|
||||
public ChannelWriter<Frame> ServiceWriter => _serviceToGateway.Writer;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the reader for receiving frames from service (gateway side).
|
||||
/// </summary>
|
||||
public ChannelReader<Frame> GatewayReader => _serviceToGateway.Reader;
|
||||
|
||||
public void MarkConnected()
|
||||
{
|
||||
State = ConnectionState.Connected;
|
||||
LastActivityAt = DateTimeOffset.UtcNow;
|
||||
}
|
||||
|
||||
public void UpdateActivity()
|
||||
{
|
||||
LastActivityAt = DateTimeOffset.UtcNow;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
State = ConnectionState.Disconnected;
|
||||
_cts.Cancel();
|
||||
_gatewayToService.Writer.TryComplete();
|
||||
_serviceToGateway.Writer.TryComplete();
|
||||
_cts.Dispose();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### InMemory Hub
|
||||
|
||||
```csharp
|
||||
namespace StellaOps.Router.Transport.InMemory;
|
||||
|
||||
/// <summary>
|
||||
/// Central hub managing all InMemory transport connections.
|
||||
/// </summary>
|
||||
public sealed class InMemoryTransportHub : IDisposable
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, InMemoryChannel> _channels = new();
|
||||
private readonly ConcurrentDictionary<string, List<string>> _serviceChannels = new();
|
||||
private readonly ILogger<InMemoryTransportHub> _logger;
|
||||
|
||||
public InMemoryTransportHub(ILogger<InMemoryTransportHub> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new channel for a microservice connection.
|
||||
/// </summary>
|
||||
public InMemoryChannel CreateChannel(string serviceName, string instanceId)
|
||||
{
|
||||
var channel = new InMemoryChannel(serviceName, instanceId);
|
||||
|
||||
if (!_channels.TryAdd(channel.ChannelId, channel))
|
||||
{
|
||||
throw new InvalidOperationException($"Channel {channel.ChannelId} already exists");
|
||||
}
|
||||
|
||||
_serviceChannels.AddOrUpdate(
|
||||
serviceName,
|
||||
_ => new List<string> { channel.ChannelId },
|
||||
(_, list) => { lock (list) { list.Add(channel.ChannelId); } return list; }
|
||||
);
|
||||
|
||||
_logger.LogDebug(
|
||||
"Created InMemory channel {ChannelId} for {ServiceName}/{InstanceId}",
|
||||
channel.ChannelId, serviceName, instanceId);
|
||||
|
||||
return channel;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets a channel by ID.
|
||||
/// </summary>
|
||||
public InMemoryChannel? GetChannel(string channelId)
|
||||
{
|
||||
return _channels.TryGetValue(channelId, out var channel) ? channel : null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets all channels for a service.
|
||||
/// </summary>
|
||||
public IReadOnlyList<InMemoryChannel> GetServiceChannels(string serviceName)
|
||||
{
|
||||
if (!_serviceChannels.TryGetValue(serviceName, out var channelIds))
|
||||
return Array.Empty<InMemoryChannel>();
|
||||
|
||||
var result = new List<InMemoryChannel>();
|
||||
lock (channelIds)
|
||||
{
|
||||
foreach (var id in channelIds)
|
||||
{
|
||||
if (_channels.TryGetValue(id, out var channel) &&
|
||||
channel.State == ConnectionState.Connected)
|
||||
{
|
||||
result.Add(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a channel from the hub.
|
||||
/// </summary>
|
||||
public async Task RemoveChannelAsync(string channelId)
|
||||
{
|
||||
if (_channels.TryRemove(channelId, out var channel))
|
||||
{
|
||||
if (_serviceChannels.TryGetValue(channel.ServiceName, out var list))
|
||||
{
|
||||
lock (list) { list.Remove(channelId); }
|
||||
}
|
||||
|
||||
await channel.DisposeAsync();
|
||||
|
||||
_logger.LogDebug("Removed InMemory channel {ChannelId}", channelId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets all active channels.
|
||||
/// </summary>
|
||||
public IEnumerable<InMemoryChannel> GetAllChannels()
|
||||
{
|
||||
return _channels.Values.Where(c => c.State == ConnectionState.Connected);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
foreach (var channel in _channels.Values)
|
||||
{
|
||||
_ = channel.DisposeAsync();
|
||||
}
|
||||
_channels.Clear();
|
||||
_serviceChannels.Clear();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Gateway-Side Client
|
||||
|
||||
```csharp
|
||||
namespace StellaOps.Router.Transport.InMemory;
|
||||
|
||||
/// <summary>
|
||||
/// Gateway-side client for InMemory transport.
|
||||
/// </summary>
|
||||
public sealed class InMemoryTransportClient : ITransportClient
|
||||
{
|
||||
private readonly InMemoryTransportHub _hub;
|
||||
private readonly IPayloadSerializer _serializer;
|
||||
private readonly ILogger<InMemoryTransportClient> _logger;
|
||||
private readonly ConcurrentDictionary<string, TaskCompletionSource<ResponsePayload>> _pendingRequests = new();
|
||||
|
||||
public string TransportType => "InMemory";
|
||||
|
||||
public InMemoryTransportClient(
|
||||
InMemoryTransportHub hub,
|
||||
IPayloadSerializer serializer,
|
||||
ILogger<InMemoryTransportClient> logger)
|
||||
{
|
||||
_hub = hub;
|
||||
_serializer = serializer;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task<ResponsePayload> SendRequestAsync(
|
||||
string serviceName,
|
||||
RequestPayload request,
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var channels = _hub.GetServiceChannels(serviceName);
|
||||
if (channels.Count == 0)
|
||||
{
|
||||
throw new NoAvailableInstanceException(serviceName);
|
||||
}
|
||||
|
||||
// Simple round-robin selection (in production, use routing plugin)
|
||||
var channel = channels[Random.Shared.Next(channels.Count)];
|
||||
|
||||
var correlationId = Guid.NewGuid().ToString("N");
|
||||
var tcs = new TaskCompletionSource<ResponsePayload>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
_pendingRequests[correlationId] = tcs;
|
||||
|
||||
try
|
||||
{
|
||||
// Create and send request frame
|
||||
var frame = new Frame
|
||||
{
|
||||
Type = FrameType.Request,
|
||||
CorrelationId = correlationId,
|
||||
Payload = _serializer.SerializeRequest(request)
|
||||
};
|
||||
|
||||
await channel.GatewayWriter.WriteAsync(frame, cancellationToken);
|
||||
channel.UpdateActivity();
|
||||
|
||||
// Start listening for response
|
||||
_ = ListenForResponseAsync(channel, correlationId, cancellationToken);
|
||||
|
||||
// Wait for response with timeout
|
||||
using var timeoutCts = new CancellationTokenSource(timeout);
|
||||
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
|
||||
cancellationToken, timeoutCts.Token);
|
||||
|
||||
try
|
||||
{
|
||||
return await tcs.Task.WaitAsync(linkedCts.Token);
|
||||
}
|
||||
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
|
||||
{
|
||||
// Send cancel frame
|
||||
await SendCancelAsync(channel, correlationId);
|
||||
throw new TimeoutException($"Request to {serviceName} timed out after {timeout}");
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_pendingRequests.TryRemove(correlationId, out _);
|
||||
}
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<ResponsePayload> SendStreamingRequestAsync(
|
||||
string serviceName,
|
||||
IAsyncEnumerable<RequestPayload> requestChunks,
|
||||
TimeSpan timeout,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
var channels = _hub.GetServiceChannels(serviceName);
|
||||
if (channels.Count == 0)
|
||||
{
|
||||
throw new NoAvailableInstanceException(serviceName);
|
||||
}
|
||||
|
||||
var channel = channels[Random.Shared.Next(channels.Count)];
|
||||
var correlationId = Guid.NewGuid().ToString("N");
|
||||
|
||||
// Send all request chunks
|
||||
await foreach (var chunk in requestChunks.WithCancellation(cancellationToken))
|
||||
{
|
||||
var frame = new Frame
|
||||
{
|
||||
Type = FrameType.Request,
|
||||
CorrelationId = correlationId,
|
||||
Payload = _serializer.SerializeRequest(chunk),
|
||||
Flags = chunk.IsStreaming ? FrameFlags.None : FrameFlags.Final
|
||||
};
|
||||
|
||||
await channel.GatewayWriter.WriteAsync(frame, cancellationToken);
|
||||
channel.UpdateActivity();
|
||||
}
|
||||
|
||||
// Read response chunks
|
||||
await foreach (var frame in channel.GatewayReader.ReadAllAsync(cancellationToken))
|
||||
{
|
||||
if (frame.CorrelationId != correlationId)
|
||||
continue;
|
||||
|
||||
if (frame.Type == FrameType.Response)
|
||||
{
|
||||
var response = _serializer.DeserializeResponse(frame.Payload);
|
||||
yield return response;
|
||||
|
||||
if (response.IsFinalChunk || frame.Flags.HasFlag(FrameFlags.Final))
|
||||
yield break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ListenForResponseAsync(
|
||||
InMemoryChannel channel,
|
||||
string correlationId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await foreach (var frame in channel.GatewayReader.ReadAllAsync(cancellationToken))
|
||||
{
|
||||
if (frame.CorrelationId != correlationId)
|
||||
continue;
|
||||
|
||||
if (frame.Type == FrameType.Response)
|
||||
{
|
||||
var response = _serializer.DeserializeResponse(frame.Payload);
|
||||
|
||||
if (_pendingRequests.TryGetValue(correlationId, out var tcs))
|
||||
{
|
||||
tcs.TrySetResult(response);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected on cancellation
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SendCancelAsync(InMemoryChannel channel, string correlationId)
|
||||
{
|
||||
try
|
||||
{
|
||||
var cancelFrame = new Frame
|
||||
{
|
||||
Type = FrameType.Cancel,
|
||||
CorrelationId = correlationId,
|
||||
Payload = Array.Empty<byte>()
|
||||
};
|
||||
await channel.GatewayWriter.WriteAsync(cancelFrame);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to send cancel frame for {CorrelationId}", correlationId);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Microservice-Side Server
|
||||
|
||||
```csharp
|
||||
namespace StellaOps.Router.Transport.InMemory;
|
||||
|
||||
/// <summary>
|
||||
/// Microservice-side server for InMemory transport.
|
||||
/// </summary>
|
||||
public sealed class InMemoryTransportServer : ITransportServer
|
||||
{
|
||||
private readonly InMemoryTransportHub _hub;
|
||||
private readonly IPayloadSerializer _serializer;
|
||||
private readonly ILogger<InMemoryTransportServer> _logger;
|
||||
private InMemoryChannel? _channel;
|
||||
private CancellationTokenSource? _cts;
|
||||
private Task? _processingTask;
|
||||
|
||||
public string TransportType => "InMemory";
|
||||
public bool IsConnected => _channel?.State == ConnectionState.Connected;
|
||||
|
||||
public event Func<RequestPayload, CancellationToken, Task<ResponsePayload>>? OnRequest;
|
||||
public event Func<string, CancellationToken, Task>? OnCancel;
|
||||
|
||||
public InMemoryTransportServer(
|
||||
InMemoryTransportHub hub,
|
||||
IPayloadSerializer serializer,
|
||||
ILogger<InMemoryTransportServer> logger)
|
||||
{
|
||||
_hub = hub;
|
||||
_serializer = serializer;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task ConnectAsync(
|
||||
string serviceName,
|
||||
string instanceId,
|
||||
EndpointDescriptor[] endpoints,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
_channel = _hub.CreateChannel(serviceName, instanceId);
|
||||
_cts = new CancellationTokenSource();
|
||||
|
||||
// Send HELLO frame
|
||||
var helloPayload = new HelloPayload
|
||||
{
|
||||
ServiceName = serviceName,
|
||||
InstanceId = instanceId,
|
||||
Endpoints = endpoints,
|
||||
Metadata = new Dictionary<string, string>
|
||||
{
|
||||
["transport"] = "InMemory",
|
||||
["pid"] = Environment.ProcessId.ToString()
|
||||
}
|
||||
};
|
||||
|
||||
var helloFrame = new Frame
|
||||
{
|
||||
Type = FrameType.Hello,
|
||||
CorrelationId = Guid.NewGuid().ToString("N"),
|
||||
Payload = _serializer.SerializeHello(helloPayload)
|
||||
};
|
||||
|
||||
await _channel.ServiceWriter.WriteAsync(helloFrame, cancellationToken);
|
||||
|
||||
// Wait for HELLO response
|
||||
var response = await _channel.ServiceReader.ReadAsync(cancellationToken);
|
||||
if (response.Type != FrameType.Hello)
|
||||
{
|
||||
throw new ProtocolException($"Expected HELLO response, got {response.Type}");
|
||||
}
|
||||
|
||||
_channel.MarkConnected();
|
||||
_logger.LogInformation(
|
||||
"InMemory transport connected for {ServiceName}/{InstanceId}",
|
||||
serviceName, instanceId);
|
||||
|
||||
// Start processing loop
|
||||
_processingTask = ProcessFramesAsync(_cts.Token);
|
||||
}
|
||||
|
||||
private async Task ProcessFramesAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_channel == null) return;
|
||||
|
||||
try
|
||||
{
|
||||
await foreach (var frame in _channel.ServiceReader.ReadAllAsync(cancellationToken))
|
||||
{
|
||||
_channel.UpdateActivity();
|
||||
|
||||
switch (frame.Type)
|
||||
{
|
||||
case FrameType.Request:
|
||||
_ = HandleRequestAsync(frame, cancellationToken);
|
||||
break;
|
||||
|
||||
case FrameType.Cancel:
|
||||
if (OnCancel != null)
|
||||
{
|
||||
await OnCancel(frame.CorrelationId, cancellationToken);
|
||||
}
|
||||
break;
|
||||
|
||||
case FrameType.Heartbeat:
|
||||
await HandleHeartbeatAsync(frame);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected on shutdown
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing InMemory frames");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleRequestAsync(Frame frame, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_channel == null || OnRequest == null) return;
|
||||
|
||||
try
|
||||
{
|
||||
var request = _serializer.DeserializeRequest(frame.Payload);
|
||||
var response = await OnRequest(request, cancellationToken);
|
||||
|
||||
var responseFrame = new Frame
|
||||
{
|
||||
Type = FrameType.Response,
|
||||
CorrelationId = frame.CorrelationId,
|
||||
Payload = _serializer.SerializeResponse(response),
|
||||
Flags = FrameFlags.Final
|
||||
};
|
||||
|
||||
await _channel.ServiceWriter.WriteAsync(responseFrame, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error handling request {CorrelationId}", frame.CorrelationId);
|
||||
|
||||
// Send error response
|
||||
var errorResponse = new ResponsePayload
|
||||
{
|
||||
StatusCode = 500,
|
||||
Headers = new Dictionary<string, string>(),
|
||||
ErrorMessage = ex.Message,
|
||||
IsFinalChunk = true
|
||||
};
|
||||
|
||||
var errorFrame = new Frame
|
||||
{
|
||||
Type = FrameType.Response,
|
||||
CorrelationId = frame.CorrelationId,
|
||||
Payload = _serializer.SerializeResponse(errorResponse),
|
||||
Flags = FrameFlags.Final | FrameFlags.Error
|
||||
};
|
||||
|
||||
await _channel.ServiceWriter.WriteAsync(errorFrame, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleHeartbeatAsync(Frame frame)
|
||||
{
|
||||
if (_channel == null) return;
|
||||
|
||||
var pongFrame = new Frame
|
||||
{
|
||||
Type = FrameType.Heartbeat,
|
||||
CorrelationId = frame.CorrelationId,
|
||||
Payload = frame.Payload // Echo back
|
||||
};
|
||||
|
||||
await _channel.ServiceWriter.WriteAsync(pongFrame);
|
||||
}
|
||||
|
||||
public async Task DisconnectAsync()
|
||||
{
|
||||
_cts?.Cancel();
|
||||
|
||||
if (_processingTask != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _processingTask.WaitAsync(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
catch (TimeoutException)
|
||||
{
|
||||
_logger.LogWarning("InMemory processing task did not complete in time");
|
||||
}
|
||||
}
|
||||
|
||||
if (_channel != null)
|
||||
{
|
||||
await _hub.RemoveChannelAsync(_channel.ChannelId);
|
||||
}
|
||||
|
||||
_cts?.Dispose();
|
||||
}
|
||||
|
||||
public async Task SendHeartbeatAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_channel == null || _channel.State != ConnectionState.Connected)
|
||||
return;
|
||||
|
||||
var heartbeatFrame = new Frame
|
||||
{
|
||||
Type = FrameType.Heartbeat,
|
||||
CorrelationId = Guid.NewGuid().ToString("N"),
|
||||
Payload = BitConverter.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
|
||||
};
|
||||
|
||||
await _channel.ServiceWriter.WriteAsync(heartbeatFrame, cancellationToken);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Integration with Global Routing State
|
||||
|
||||
```csharp
|
||||
namespace StellaOps.Router.Transport.InMemory;
|
||||
|
||||
/// <summary>
|
||||
/// InMemory transport integration with gateway routing state.
|
||||
/// </summary>
|
||||
public sealed class InMemoryRoutingIntegration : IHostedService
|
||||
{
|
||||
private readonly InMemoryTransportHub _hub;
|
||||
private readonly IGlobalRoutingState _routingState;
|
||||
private readonly ILogger<InMemoryRoutingIntegration> _logger;
|
||||
private Timer? _syncTimer;
|
||||
|
||||
public InMemoryRoutingIntegration(
|
||||
InMemoryTransportHub hub,
|
||||
IGlobalRoutingState routingState,
|
||||
ILogger<InMemoryRoutingIntegration> logger)
|
||||
{
|
||||
_hub = hub;
|
||||
_routingState = routingState;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
// Sync InMemory channels with routing state periodically
|
||||
_syncTimer = new Timer(SyncChannels, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private void SyncChannels(object? state)
|
||||
{
|
||||
try
|
||||
{
|
||||
foreach (var channel in _hub.GetAllChannels())
|
||||
{
|
||||
var connection = new EndpointConnection
|
||||
{
|
||||
ServiceName = channel.ServiceName,
|
||||
InstanceId = channel.InstanceId,
|
||||
ConnectionId = channel.ChannelId,
|
||||
Transport = "InMemory",
|
||||
State = channel.State,
|
||||
LastHeartbeat = channel.LastActivityAt
|
||||
};
|
||||
|
||||
_routingState.UpdateConnection(connection);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error syncing InMemory channels");
|
||||
}
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_syncTimer?.Dispose();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Service Registration
|
||||
|
||||
```csharp
|
||||
namespace StellaOps.Router.Transport.InMemory;
|
||||
|
||||
public static class InMemoryTransportExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Adds InMemory transport to the gateway.
|
||||
/// </summary>
|
||||
public static IServiceCollection AddInMemoryTransport(this IServiceCollection services)
|
||||
{
|
||||
services.AddSingleton<InMemoryTransportHub>();
|
||||
services.AddSingleton<ITransportClient, InMemoryTransportClient>();
|
||||
services.AddHostedService<InMemoryRoutingIntegration>();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds InMemory transport to a microservice.
|
||||
/// </summary>
|
||||
public static IServiceCollection AddInMemoryMicroserviceTransport(
|
||||
this IServiceCollection services,
|
||||
Action<InMemoryTransportOptions>? configure = null)
|
||||
{
|
||||
var options = new InMemoryTransportOptions();
|
||||
configure?.Invoke(options);
|
||||
|
||||
services.AddSingleton(options);
|
||||
services.AddSingleton<ITransportServer, InMemoryTransportServer>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
public class InMemoryTransportOptions
|
||||
{
|
||||
public int MaxPendingRequests { get; set; } = 1000;
|
||||
public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromSeconds(30);
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Testing Utilities
|
||||
|
||||
```csharp
|
||||
namespace StellaOps.Router.Transport.InMemory.Testing;
|
||||
|
||||
/// <summary>
|
||||
/// Test fixture for InMemory transport testing.
|
||||
/// </summary>
|
||||
public sealed class InMemoryTransportFixture : IAsyncDisposable
|
||||
{
|
||||
private readonly InMemoryTransportHub _hub;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
|
||||
public InMemoryTransportHub Hub => _hub;
|
||||
|
||||
public InMemoryTransportFixture()
|
||||
{
|
||||
_loggerFactory = LoggerFactory.Create(b => b.AddConsole());
|
||||
_hub = new InMemoryTransportHub(_loggerFactory.CreateLogger<InMemoryTransportHub>());
|
||||
}
|
||||
|
||||
public InMemoryTransportClient CreateClient()
|
||||
{
|
||||
var serializer = new MessagePackPayloadSerializer();
|
||||
return new InMemoryTransportClient(
|
||||
_hub,
|
||||
serializer,
|
||||
_loggerFactory.CreateLogger<InMemoryTransportClient>());
|
||||
}
|
||||
|
||||
public InMemoryTransportServer CreateServer()
|
||||
{
|
||||
var serializer = new MessagePackPayloadSerializer();
|
||||
return new InMemoryTransportServer(
|
||||
_hub,
|
||||
serializer,
|
||||
_loggerFactory.CreateLogger<InMemoryTransportServer>());
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
_hub.Dispose();
|
||||
_loggerFactory.Dispose();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Unit Tests
|
||||
|
||||
```csharp
|
||||
public class InMemoryTransportTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task SimpleRequestResponse_Works()
|
||||
{
|
||||
await using var fixture = new InMemoryTransportFixture();
|
||||
var client = fixture.CreateClient();
|
||||
var server = fixture.CreateServer();
|
||||
|
||||
// Setup server
|
||||
server.OnRequest += (request, ct) => Task.FromResult(new ResponsePayload
|
||||
{
|
||||
StatusCode = 200,
|
||||
Headers = new Dictionary<string, string>(),
|
||||
Body = Encoding.UTF8.GetBytes($"Hello {request.Path}")
|
||||
});
|
||||
|
||||
await server.ConnectAsync("test-service", "instance-1", Array.Empty<EndpointDescriptor>(), default);
|
||||
|
||||
// Send request
|
||||
var response = await client.SendRequestAsync(
|
||||
"test-service",
|
||||
new RequestPayload
|
||||
{
|
||||
Method = "GET",
|
||||
Path = "/test",
|
||||
Headers = new Dictionary<string, string>(),
|
||||
Claims = new Dictionary<string, string>()
|
||||
},
|
||||
TimeSpan.FromSeconds(5),
|
||||
default);
|
||||
|
||||
Assert.Equal(200, response.StatusCode);
|
||||
Assert.Equal("Hello /test", Encoding.UTF8.GetString(response.Body!));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cancellation_SendsCancelFrame()
|
||||
{
|
||||
await using var fixture = new InMemoryTransportFixture();
|
||||
var client = fixture.CreateClient();
|
||||
var server = fixture.CreateServer();
|
||||
|
||||
var cancelReceived = new TaskCompletionSource<bool>();
|
||||
|
||||
server.OnRequest += async (request, ct) =>
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(30), ct);
|
||||
return new ResponsePayload { StatusCode = 200, Headers = new Dictionary<string, string>() };
|
||||
};
|
||||
|
||||
server.OnCancel += (correlationId, ct) =>
|
||||
{
|
||||
cancelReceived.TrySetResult(true);
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
await server.ConnectAsync("test-service", "instance-1", Array.Empty<EndpointDescriptor>(), default);
|
||||
|
||||
// Send request with short timeout
|
||||
await Assert.ThrowsAsync<TimeoutException>(() =>
|
||||
client.SendRequestAsync(
|
||||
"test-service",
|
||||
new RequestPayload { Method = "GET", Path = "/slow", Headers = new Dictionary<string, string>(), Claims = new Dictionary<string, string>() },
|
||||
TimeSpan.FromMilliseconds(100),
|
||||
default));
|
||||
|
||||
// Verify cancel was received
|
||||
var result = await cancelReceived.Task.WaitAsync(TimeSpan.FromSeconds(1));
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MultipleInstances_DistributesRequests()
|
||||
{
|
||||
await using var fixture = new InMemoryTransportFixture();
|
||||
var client = fixture.CreateClient();
|
||||
var server1 = fixture.CreateServer();
|
||||
var server2 = fixture.CreateServer();
|
||||
|
||||
var server1Count = 0;
|
||||
var server2Count = 0;
|
||||
|
||||
server1.OnRequest += (r, ct) =>
|
||||
{
|
||||
Interlocked.Increment(ref server1Count);
|
||||
return Task.FromResult(new ResponsePayload { StatusCode = 200, Headers = new Dictionary<string, string>() });
|
||||
};
|
||||
|
||||
server2.OnRequest += (r, ct) =>
|
||||
{
|
||||
Interlocked.Increment(ref server2Count);
|
||||
return Task.FromResult(new ResponsePayload { StatusCode = 200, Headers = new Dictionary<string, string>() });
|
||||
};
|
||||
|
||||
await server1.ConnectAsync("test-service", "instance-1", Array.Empty<EndpointDescriptor>(), default);
|
||||
await server2.ConnectAsync("test-service", "instance-2", Array.Empty<EndpointDescriptor>(), default);
|
||||
|
||||
// Send multiple requests
|
||||
for (int i = 0; i < 100; i++)
|
||||
{
|
||||
await client.SendRequestAsync(
|
||||
"test-service",
|
||||
new RequestPayload { Method = "GET", Path = "/test", Headers = new Dictionary<string, string>(), Claims = new Dictionary<string, string>() },
|
||||
TimeSpan.FromSeconds(5),
|
||||
default);
|
||||
}
|
||||
|
||||
// Both instances should have received requests
|
||||
Assert.True(server1Count > 0);
|
||||
Assert.True(server2Count > 0);
|
||||
Assert.Equal(100, server1Count + server2Count);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Deliverables
|
||||
|
||||
1. `StellaOps.Router.Transport.InMemory/InMemoryChannel.cs`
|
||||
2. `StellaOps.Router.Transport.InMemory/InMemoryTransportHub.cs`
|
||||
3. `StellaOps.Router.Transport.InMemory/InMemoryTransportClient.cs`
|
||||
4. `StellaOps.Router.Transport.InMemory/InMemoryTransportServer.cs`
|
||||
5. `StellaOps.Router.Transport.InMemory/InMemoryRoutingIntegration.cs`
|
||||
6. `StellaOps.Router.Transport.InMemory/InMemoryTransportExtensions.cs`
|
||||
7. `StellaOps.Router.Transport.InMemory.Testing/InMemoryTransportFixture.cs`
|
||||
8. Unit tests for all frame types
|
||||
9. Integration tests for request/response patterns
|
||||
10. Streaming tests
|
||||
|
||||
---
|
||||
|
||||
## Next Step
|
||||
|
||||
Proceed to [Step 14: TCP Transport Implementation](14-Step.md) to implement the primary production transport.
|
||||
Reference in New Issue
Block a user