using System.Net; using System.Text; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StellaOps.Router.Common.Abstractions; using StellaOps.Router.Common.Enums; using StellaOps.Router.Common.Models; using Xunit; namespace StellaOps.Router.Transport.Udp.Tests; public class UdpTransportTests { private static readonly int BasePort = 15100; private static int _portOffset; private static int GetNextPort() => BasePort + Interlocked.Increment(ref _portOffset); [Fact] public void UdpFrameProtocol_SerializeAndParse_RoundTrip() { // Arrange var originalFrame = new Frame { Type = FrameType.Request, CorrelationId = Guid.NewGuid().ToString("N"), Payload = Encoding.UTF8.GetBytes("Hello, UDP!") }; // Act var serialized = UdpFrameProtocol.SerializeFrame(originalFrame); var parsed = UdpFrameProtocol.ParseFrame(serialized); // Assert Assert.Equal(originalFrame.Type, parsed.Type); Assert.Equal(originalFrame.CorrelationId, parsed.CorrelationId); Assert.Equal(originalFrame.Payload.ToArray(), parsed.Payload.ToArray()); } [Fact] public void UdpFrameProtocol_ParseFrame_WithEmptyPayload() { // Arrange var originalFrame = new Frame { Type = FrameType.Hello, CorrelationId = Guid.NewGuid().ToString("N"), Payload = ReadOnlyMemory.Empty }; // Act var serialized = UdpFrameProtocol.SerializeFrame(originalFrame); var parsed = UdpFrameProtocol.ParseFrame(serialized); // Assert Assert.Equal(originalFrame.Type, parsed.Type); Assert.Empty(parsed.Payload.ToArray()); } [Fact] public void UdpFrameProtocol_ParseFrame_ThrowsOnTooSmallDatagram() { // Arrange var tooSmall = new byte[5]; // Less than 17 bytes (1 + 16) // Act & Assert Assert.Throws(() => UdpFrameProtocol.ParseFrame(tooSmall)); } [Fact] public void PayloadTooLargeException_HasCorrectProperties() { // Arrange & Act var exception = new PayloadTooLargeException(10000, 8192); // Assert Assert.Equal(10000, exception.ActualSize); Assert.Equal(8192, exception.MaxSize); Assert.Contains("10000", exception.Message); Assert.Contains("8192", exception.Message); } [Fact] public async Task UdpTransportServer_StartsAndStops() { // Arrange var port = GetNextPort(); var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportServer(opts => { opts.Port = port; opts.BindAddress = IPAddress.Loopback; }); await using var provider = services.BuildServiceProvider(); var server = provider.GetRequiredService(); // Act await server.StartAsync(CancellationToken.None); await Task.Delay(50); // Assert Assert.Equal(0, server.ConnectionCount); // Cleanup await server.StopAsync(CancellationToken.None); } [Fact] public async Task UdpTransportClient_ConnectsAndDisconnects() { // Arrange var port = GetNextPort(); var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportServer(opts => { opts.Port = port; opts.BindAddress = IPAddress.Loopback; }); services.AddUdpTransportClient(opts => { opts.Host = "127.0.0.1"; opts.Port = port; }); await using var provider = services.BuildServiceProvider(); var server = provider.GetRequiredService(); var client = provider.GetRequiredService(); await server.StartAsync(CancellationToken.None); await Task.Delay(50); // Act var instance = new InstanceDescriptor { InstanceId = "test-instance", ServiceName = "TestService", Version = "1.0.0", Region = "local" }; await client.ConnectAsync(instance, [], CancellationToken.None); await Task.Delay(100); // Assert Assert.Equal(1, server.ConnectionCount); // Cleanup await client.DisconnectAsync(); await server.StopAsync(CancellationToken.None); } [Fact] public async Task UdpTransport_RequestResponse_Works() { // Arrange var port = GetNextPort(); var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportServer(opts => { opts.Port = port; opts.BindAddress = IPAddress.Loopback; }); services.AddUdpTransportClient(opts => { opts.Host = "127.0.0.1"; opts.Port = port; }); await using var provider = services.BuildServiceProvider(); var server = provider.GetRequiredService(); var client = provider.GetRequiredService(); // Set up server to respond to requests server.OnFrame += (connectionId, frame) => { if (frame.Type == FrameType.Request) { var responseFrame = new Frame { Type = FrameType.Response, CorrelationId = frame.CorrelationId, Payload = Encoding.UTF8.GetBytes("Response data") }; _ = server.SendFrameAsync(connectionId, responseFrame); } }; await server.StartAsync(CancellationToken.None); await Task.Delay(50); var instance = new InstanceDescriptor { InstanceId = "test-instance", ServiceName = "TestService", Version = "1.0.0", Region = "local" }; await client.ConnectAsync(instance, [], CancellationToken.None); await Task.Delay(100); // Act var connectionState = new ConnectionState { ConnectionId = "test", Instance = instance, TransportType = TransportType.Udp }; var requestFrame = new Frame { Type = FrameType.Request, CorrelationId = Guid.NewGuid().ToString("N"), Payload = Encoding.UTF8.GetBytes("Request data") }; var response = await client.SendRequestAsync( connectionState, requestFrame, TimeSpan.FromSeconds(5), CancellationToken.None); // Assert Assert.Equal(FrameType.Response, response.Type); Assert.Equal("Response data", Encoding.UTF8.GetString(response.Payload.Span)); // Cleanup await client.DisconnectAsync(); await server.StopAsync(CancellationToken.None); } [Fact] public async Task UdpTransport_PayloadTooLarge_ThrowsException() { // Arrange var port = GetNextPort(); var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportServer(opts => { opts.Port = port; opts.BindAddress = IPAddress.Loopback; opts.MaxDatagramSize = 100; // Small limit for testing }); services.AddUdpTransportClient(opts => { opts.Host = "127.0.0.1"; opts.Port = port; opts.MaxDatagramSize = 100; // Small limit for testing }); await using var provider = services.BuildServiceProvider(); var server = provider.GetRequiredService(); var client = provider.GetRequiredService(); await server.StartAsync(CancellationToken.None); await Task.Delay(50); var instance = new InstanceDescriptor { InstanceId = "test-instance", ServiceName = "TestService", Version = "1.0.0", Region = "local" }; await client.ConnectAsync(instance, [], CancellationToken.None); await Task.Delay(100); // Act & Assert var connectionState = new ConnectionState { ConnectionId = "test", Instance = instance, TransportType = TransportType.Udp }; var largePayload = new byte[200]; // Exceeds 100 byte limit var requestFrame = new Frame { Type = FrameType.Request, CorrelationId = Guid.NewGuid().ToString("N"), Payload = largePayload }; await Assert.ThrowsAsync(() => client.SendRequestAsync( connectionState, requestFrame, TimeSpan.FromSeconds(5), CancellationToken.None)); // Cleanup await client.DisconnectAsync(); await server.StopAsync(CancellationToken.None); } [Fact] public async Task UdpTransport_StreamingNotSupported_ThrowsNotSupportedException() { // Arrange var port = GetNextPort(); var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportClient(opts => { opts.Host = "127.0.0.1"; opts.Port = port; }); await using var provider = services.BuildServiceProvider(); var client = provider.GetRequiredService(); var connectionState = new ConnectionState { ConnectionId = "test", Instance = new InstanceDescriptor { InstanceId = "test", ServiceName = "TestService", Version = "1.0.0", Region = "local" }, TransportType = TransportType.Udp }; var requestFrame = new Frame { Type = FrameType.Request, CorrelationId = Guid.NewGuid().ToString("N"), Payload = ReadOnlyMemory.Empty }; // Act & Assert await Assert.ThrowsAsync(() => client.SendStreamingAsync( connectionState, requestFrame, Stream.Null, _ => Task.CompletedTask, new PayloadLimits(), CancellationToken.None)); } [Fact] public async Task UdpTransport_Timeout_ThrowsTimeoutException() { // Arrange var port = GetNextPort(); var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportServer(opts => { opts.Port = port; opts.BindAddress = IPAddress.Loopback; }); services.AddUdpTransportClient(opts => { opts.Host = "127.0.0.1"; opts.Port = port; }); await using var provider = services.BuildServiceProvider(); var server = provider.GetRequiredService(); var client = provider.GetRequiredService(); // Server doesn't respond to requests (no OnFrame handler) await server.StartAsync(CancellationToken.None); await Task.Delay(50); var instance = new InstanceDescriptor { InstanceId = "test-instance", ServiceName = "TestService", Version = "1.0.0", Region = "local" }; await client.ConnectAsync(instance, [], CancellationToken.None); await Task.Delay(100); // Act & Assert var connectionState = new ConnectionState { ConnectionId = "test", Instance = instance, TransportType = TransportType.Udp }; var requestFrame = new Frame { Type = FrameType.Request, CorrelationId = Guid.NewGuid().ToString("N"), Payload = Encoding.UTF8.GetBytes("Test") }; await Assert.ThrowsAsync(() => client.SendRequestAsync( connectionState, requestFrame, TimeSpan.FromMilliseconds(100), // Short timeout CancellationToken.None)); // Cleanup await client.DisconnectAsync(); await server.StopAsync(CancellationToken.None); } [Fact] public void ServiceCollectionExtensions_RegistersServerCorrectly() { // Arrange var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportServer(opts => { opts.Port = 5102; }); // Act var provider = services.BuildServiceProvider(); var server = provider.GetService(); var udpServer = provider.GetService(); // Assert Assert.NotNull(server); Assert.NotNull(udpServer); Assert.Same(server, udpServer); } [Fact] public void ServiceCollectionExtensions_RegistersClientCorrectly() { // Arrange var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportClient(opts => { opts.Host = "127.0.0.1"; opts.Port = 5102; }); // Act var provider = services.BuildServiceProvider(); var client = provider.GetService(); var udpClient = provider.GetService(); var microserviceTransport = provider.GetService(); // Assert Assert.NotNull(client); Assert.NotNull(udpClient); Assert.NotNull(microserviceTransport); Assert.Same(client, udpClient); Assert.Same(microserviceTransport, udpClient); } [Fact] public async Task UdpTransport_HeartbeatSent() { // Arrange var port = GetNextPort(); var heartbeatReceived = new TaskCompletionSource(); var services = new ServiceCollection(); services.AddLogging(); services.AddUdpTransportServer(opts => { opts.Port = port; opts.BindAddress = IPAddress.Loopback; }); services.AddUdpTransportClient(opts => { opts.Host = "127.0.0.1"; opts.Port = port; }); await using var provider = services.BuildServiceProvider(); var server = provider.GetRequiredService(); var client = provider.GetRequiredService(); server.OnFrame += (connectionId, frame) => { if (frame.Type == FrameType.Heartbeat) { heartbeatReceived.TrySetResult(true); } }; await server.StartAsync(CancellationToken.None); await Task.Delay(50); var instance = new InstanceDescriptor { InstanceId = "test-instance", ServiceName = "TestService", Version = "1.0.0", Region = "local" }; await client.ConnectAsync(instance, [], CancellationToken.None); await Task.Delay(100); // Act await client.SendHeartbeatAsync(new HeartbeatPayload { InstanceId = "test-instance", Status = InstanceHealthStatus.Healthy }, CancellationToken.None); // Assert var received = await Task.WhenAny(heartbeatReceived.Task, Task.Delay(1000)); Assert.True(heartbeatReceived.Task.IsCompleted); // Cleanup await client.DisconnectAsync(); await server.StopAsync(CancellationToken.None); } }