using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Microservice.Streaming;
using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Models;
using StellaOps.Router.Transport.InMemory;
using Xunit;

namespace StellaOps.Gateway.WebService.Tests;

/// <summary>
/// Tests for the streaming payload and options models.
/// </summary>
public class StreamingTests
{
    /// <summary>
    /// Values assigned through the object initializer are read back unchanged.
    /// </summary>
    [Fact]
    public void StreamDataPayload_HasRequiredProperties()
    {
        var payload = new StreamDataPayload
        {
            CorrelationId = Guid.NewGuid(),
            Data = new byte[] { 1, 2, 3 },
            EndOfStream = true,
            SequenceNumber = 5
        };

        Assert.NotEqual(Guid.Empty, payload.CorrelationId);
        Assert.Equal(3, payload.Data.Length);
        Assert.True(payload.EndOfStream);
        Assert.Equal(5, payload.SequenceNumber);
    }

    /// <summary>
    /// Pins the values exposed by <c>StreamingOptions.Default</c>.
    /// </summary>
    [Fact]
    public void StreamingOptions_HasDefaultValues()
    {
        var options = StreamingOptions.Default;

        Assert.Equal(64 * 1024, options.ChunkSize);
        Assert.Equal(100, options.MaxConcurrentStreams);
        Assert.Equal(TimeSpan.FromMinutes(5), options.StreamIdleTimeout);
        Assert.Equal(16, options.ChannelCapacity);
    }
}

/// <summary>
/// Tests for <c>StreamingRequestBodyStream</c>, which is constructed here over a
/// channel reader of <c>StreamChunk</c> items.
/// </summary>
public class StreamingRequestBodyStreamTests
{
    /// <summary>
    /// A single read returns the bytes of the first data chunk.
    /// </summary>
    [Fact]
    public async Task ReadAsync_ReturnsDataFromChannel()
    {
        // Arrange
        var channel = Channel.CreateUnbounded<StreamChunk>();
        using var stream = new StreamingRequestBodyStream(channel.Reader, CancellationToken.None);

        var testData = new byte[] { 1, 2, 3, 4, 5 };
        await channel.Writer.WriteAsync(new StreamChunk { Data = testData, SequenceNumber = 0 });
        await channel.Writer.WriteAsync(new StreamChunk { Data = [], EndOfStream = true, SequenceNumber = 1 });
        channel.Writer.Complete();

        // Act
        var buffer = new byte[10];
        var bytesRead = await stream.ReadAsync(buffer);

        // Assert
        Assert.Equal(5, bytesRead);
        Assert.Equal(testData, buffer[..5]);
    }

    /// <summary>
    /// When the only chunk is an empty end-of-stream marker, a read returns zero bytes.
    /// </summary>
    [Fact]
    public async Task ReadAsync_ReturnsZeroAtEndOfStream()
    {
        // Arrange
        var channel = Channel.CreateUnbounded<StreamChunk>();
        using var stream = new StreamingRequestBodyStream(channel.Reader, CancellationToken.None);

        await channel.Writer.WriteAsync(new StreamChunk { Data = [], EndOfStream = true, SequenceNumber = 0 });
        channel.Writer.Complete();

        // Act
        var buffer = new byte[10];
        var bytesRead = await stream.ReadAsync(buffer);

        // Assert
        Assert.Equal(0, bytesRead);
    }

    /// <summary>
    /// Copying the stream concatenates the data of consecutive chunks in order.
    /// </summary>
    [Fact]
    public async Task ReadAsync_HandlesMultipleChunks()
    {
        // Arrange
        var channel = Channel.CreateUnbounded<StreamChunk>();
        using var stream = new StreamingRequestBodyStream(channel.Reader, CancellationToken.None);

        await channel.Writer.WriteAsync(new StreamChunk { Data = [1, 2, 3], SequenceNumber = 0 });
        await channel.Writer.WriteAsync(new StreamChunk { Data = [4, 5, 6], SequenceNumber = 1 });
        await channel.Writer.WriteAsync(new StreamChunk { Data = [], EndOfStream = true, SequenceNumber = 2 });
        channel.Writer.Complete();

        // Act
        using var memStream = new MemoryStream();
        await stream.CopyToAsync(memStream);

        // Assert
        var result = memStream.ToArray();
        Assert.Equal(6, result.Length);
        Assert.Equal(new byte[] { 1, 2, 3, 4, 5, 6 }, result);
    }

    /// <summary>
    /// The request body stream is readable, not writable and not seekable.
    /// </summary>
    [Fact]
    public void Stream_Properties_AreCorrect()
    {
        var channel = Channel.CreateUnbounded<StreamChunk>();
        using var stream = new StreamingRequestBodyStream(channel.Reader, CancellationToken.None);

        Assert.True(stream.CanRead);
        Assert.False(stream.CanWrite);
        Assert.False(stream.CanSeek);
    }

    /// <summary>
    /// Writing to the request body stream is rejected.
    /// </summary>
    [Fact]
    public void Write_ThrowsNotSupported()
    {
        var channel = Channel.CreateUnbounded<StreamChunk>();
        using var stream = new StreamingRequestBodyStream(channel.Reader, CancellationToken.None);

        Assert.Throws<NotSupportedException>(() => stream.Write([1, 2, 3], 0, 3));
    }
}

/// <summary>
/// Tests for <c>StreamingResponseBodyStream</c>, which is constructed here over a
/// channel writer of <c>StreamChunk</c> items and a chunk size.
/// </summary>
public class StreamingResponseBodyStreamTests
{
    /// <summary>
    /// Reads every chunk from <paramref name="reader"/> until the channel completes.
    /// </summary>
    /// <param name="reader">Reader side of the channel the stream under test writes to.</param>
    /// <returns>The chunks in the order they were read.</returns>
    private static async Task<List<StreamChunk>> ReadAllChunksAsync(ChannelReader<StreamChunk> reader)
    {
        var chunks = new List<StreamChunk>();
        await foreach (var chunk in reader.ReadAllAsync())
        {
            chunks.Add(chunk);
        }

        return chunks;
    }

    /// <summary>
    /// After a write and a flush, the written bytes are available as a non-final chunk.
    /// </summary>
    [Fact]
    public async Task WriteAsync_WritesToChannel()
    {
        // Arrange
        var channel = Channel.CreateUnbounded<StreamChunk>();
        await using var stream = new StreamingResponseBodyStream(channel.Writer, 1024, CancellationToken.None);
        var testData = new byte[] { 1, 2, 3, 4, 5 };

        // Act
        await stream.WriteAsync(testData);
        await stream.FlushAsync();

        // Assert
        Assert.True(channel.Reader.TryRead(out var chunk));
        Assert.Equal(testData, chunk!.Data);
        Assert.False(chunk.EndOfStream);
    }

    /// <summary>
    /// Completing the stream emits the pending data chunk followed by an end-of-stream chunk.
    /// </summary>
    [Fact]
    public async Task CompleteAsync_SendsEndOfStream()
    {
        // Arrange
        var channel = Channel.CreateUnbounded<StreamChunk>();
        await using var stream = new StreamingResponseBodyStream(channel.Writer, 1024, CancellationToken.None);

        // Act
        await stream.WriteAsync(new byte[] { 1, 2, 3 });
        await stream.CompleteAsync();

        // Assert - should have data chunk + end chunk
        var chunks = await ReadAllChunksAsync(channel.Reader);

        Assert.Equal(2, chunks.Count);
        Assert.False(chunks[0].EndOfStream);
        Assert.True(chunks[1].EndOfStream);
    }

    /// <summary>
    /// A write larger than the chunk size is split into chunk-sized pieces.
    /// </summary>
    [Fact]
    public async Task WriteAsync_ChunksLargeData()
    {
        // Arrange
        var chunkSize = 10;
        var channel = Channel.CreateUnbounded<StreamChunk>();
        await using var stream = new StreamingResponseBodyStream(channel.Writer, chunkSize, CancellationToken.None);

        var testData = new byte[25]; // Will need 3 chunks
        for (var i = 0; i < testData.Length; i++)
        {
            testData[i] = (byte)i;
        }

        // Act
        await stream.WriteAsync(testData);
        await stream.CompleteAsync();

        // Assert
        var chunks = await ReadAllChunksAsync(channel.Reader);

        // Should have 3 chunks (10+10+5) + 1 end-of-stream (with 0 data since remainder already flushed)
        Assert.Equal(4, chunks.Count);
        Assert.Equal(10, chunks[0].Data.Length);
        Assert.Equal(10, chunks[1].Data.Length);
        Assert.Equal(5, chunks[2].Data.Length);
        Assert.True(chunks[3].EndOfStream);
    }

    /// <summary>
    /// The response body stream is writable, not readable and not seekable.
    /// </summary>
    [Fact]
    public void Stream_Properties_AreCorrect()
    {
        var channel = Channel.CreateUnbounded<StreamChunk>();
        using var stream = new StreamingResponseBodyStream(channel.Writer, 1024, CancellationToken.None);

        Assert.False(stream.CanRead);
        Assert.True(stream.CanWrite);
        Assert.False(stream.CanSeek);
    }

    /// <summary>
    /// Reading from the response body stream is rejected.
    /// </summary>
    [Fact]
    public void Read_ThrowsNotSupported()
    {
        var channel = Channel.CreateUnbounded<StreamChunk>();
        using var stream = new StreamingResponseBodyStream(channel.Writer, 1024, CancellationToken.None);

        Assert.Throws<NotSupportedException>(() => stream.Read(new byte[10], 0, 10));
    }
}

/// <summary>
/// Tests for request streaming through <c>InMemoryTransportClient</c>.
/// </summary>
public class InMemoryTransportStreamingTests
{
    private readonly InMemoryConnectionRegistry _registry = new();
    private readonly InMemoryTransportOptions _options = new() { SimulatedLatency = TimeSpan.Zero };

    /// <summary>
    /// Creates a client bound to this test's registry and zero-latency options.
    /// </summary>
    private InMemoryTransportClient CreateClient()
    {
        // NullLogger<T> implements ILogger<T> (and therefore ILogger), so it fits
        // either kind of logger parameter on the client constructor.
        return new InMemoryTransportClient(
            _registry,
            Options.Create(_options),
            NullLogger<InMemoryTransportClient>.Instance);
    }

    /// <summary>
    /// Streaming a request produces a <c>Request</c> frame first and finishes with an
    /// empty <c>RequestStreamData</c> frame on the to-microservice channel.
    /// </summary>
    [Fact]
    public async Task SendStreamingAsync_SendsRequestStreamDataFrames()
    {
        // Arrange
        using var client = CreateClient();
        var instance = new InstanceDescriptor
        {
            InstanceId = "test-instance",
            ServiceName = "test-service",
            Version = "1.0.0",
            Region = "us-east-1"
        };

        await client.ConnectAsync(instance, [], CancellationToken.None);

        // Get connection ID via reflection
        var connectionIdField = client.GetType()
            .GetField(
                "_connectionId",
                System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
        var connectionId = connectionIdField?.GetValue(client)?.ToString();
        Assert.NotNull(connectionId);

        var channel = _registry.GetChannel(connectionId!);
        Assert.NotNull(channel);
        Assert.NotNull(channel!.State);

        // Create request body stream
        using var requestBody = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });

        // Create request frame
        var requestFrame = new Frame
        {
            Type = FrameType.Request,
            CorrelationId = Guid.NewGuid().ToString("N"),
            Payload = ReadOnlyMemory<byte>.Empty
        };

        var limits = PayloadLimits.Default;

        // Act - Start streaming (this will send frames to microservice)
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        // NOTE(review): the send is intentionally not awaited; presumably it waits for a
        // response that this test never sends - confirm against SendStreamingAsync.
        _ = client.SendStreamingAsync(
            channel.State!,
            requestFrame,
            requestBody,
            _ => Task.CompletedTask,
            limits,
            cts.Token);

        // Read the frames that were sent to microservice
        var frames = new List<Frame>();
        await foreach (var frame in channel.ToMicroservice.Reader.ReadAllAsync(cts.Token))
        {
            frames.Add(frame);
            if (frame.Type == FrameType.RequestStreamData && frame.Payload.Length == 0)
            {
                // End of stream - break
                break;
            }
        }

        // Assert - should have REQUEST header + data chunks + end-of-stream
        Assert.True(frames.Count >= 2, $"Expected at least 2 frames but got {frames.Count}.");
        Assert.Equal(FrameType.Request, frames[0].Type);
        Assert.Equal(FrameType.RequestStreamData, frames[^1].Type);
        Assert.Equal(0, frames[^1].Payload.Length); // End of stream marker
    }
}