using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using StellaOps.Messaging; using StellaOps.Messaging.Abstractions; using StellaOps.Router.Common.Models; using StellaOps.Router.Transport.Messaging; using StellaOps.Router.Transport.Messaging.Options; using StellaOps.Router.Transport.Messaging.Protocol; using StellaOps.TestKit; using System.Text.Json; namespace StellaOps.Router.Common.Tests; public sealed class MessagingTransportQueueOptionsTests { [Trait("Category", TestCategories.Unit)] [Fact] public async Task MessagingTransportServer_StartAsync_UsesConfiguredConsumerGroup() { var options = Options.Create(new MessagingTransportOptions { ConsumerGroup = "router-gateway-test", BatchSize = 1 }); var queueFactory = new RecordingQueueFactory(); var server = new MessagingTransportServer( queueFactory, options, NullLogger.Instance); await server.StartAsync(CancellationToken.None); await server.StopAsync(CancellationToken.None); var requestQueue = queueFactory.CreatedQueues.Single(q => q.MessageType == typeof(RpcRequestMessage) && q.Options.QueueName == options.Value.GetGatewayControlQueueName()); var responseQueue = queueFactory.CreatedQueues.Single(q => q.MessageType == typeof(RpcResponseMessage) && q.Options.QueueName == options.Value.ResponseQueueName); requestQueue.Options.ConsumerGroup.Should().Be("router-gateway-test"); responseQueue.Options.ConsumerGroup.Should().Be("router-gateway-test"); } [Trait("Category", TestCategories.Unit)] [Fact] public async Task MessagingTransportClient_ConnectAsync_UsesConfiguredConsumerGroup() { var options = Options.Create(new MessagingTransportOptions { ConsumerGroup = "timelineindexer-test", BatchSize = 1 }); var queueFactory = new RecordingQueueFactory(); var client = new MessagingTransportClient( queueFactory, options, NullLogger.Instance); var instance = new InstanceDescriptor { InstanceId = "timelineindexer-1", ServiceName = "timelineindexer", Version = "1.0.0", Region = "local" }; await client.ConnectAsync( instance, [ new EndpointDescriptor { ServiceName = "timelineindexer", Version = "1.0.0", Method = "GET", Path = "/api/v1/timeline" } ], CancellationToken.None); await client.DisconnectAsync(); queueFactory.CreatedQueues.Should().NotBeEmpty(); queueFactory.CreatedQueues.Should().OnlyContain(q => q.Options.ConsumerGroup == "timelineindexer-test"); } [Trait("Category", TestCategories.Unit)] [Fact] public async Task MessagingTransportClient_ConnectAsync_IncludesSchemasAndOpenApiInfoInHelloPayload() { var options = Options.Create(new MessagingTransportOptions { ConsumerGroup = "timelineindexer-test", BatchSize = 1 }); var queueFactory = new RecordingQueueFactory(); var client = new MessagingTransportClient( queueFactory, options, NullLogger.Instance); var instance = new InstanceDescriptor { InstanceId = "timelineindexer-1", ServiceName = "timelineindexer", Version = "1.0.0", Region = "local" }; var schemaId = "TimelineEvent"; var schemas = new Dictionary { [schemaId] = new SchemaDefinition { SchemaId = schemaId, SchemaJson = "{\"type\":\"object\"}", ETag = "abc123" } }; await client.ConnectAsync( instance, [ new EndpointDescriptor { ServiceName = "timelineindexer", Version = "1.0.0", Method = "GET", Path = "/api/v1/timeline" } ], schemas, new ServiceOpenApiInfo { Title = "timelineindexer", Description = "Timeline service" }, CancellationToken.None); await client.DisconnectAsync(); var helloMessage = queueFactory.EnqueuedMessages .OfType() .First(message => message.FrameType == Common.Enums.FrameType.Hello); var payload = JsonSerializer.Deserialize( Convert.FromBase64String(helloMessage.PayloadBase64), new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }); payload.Should().NotBeNull(); payload!.Schemas.Should().ContainKey(schemaId); payload.Schemas[schemaId].SchemaJson.Should().Be("{\"type\":\"object\"}"); payload.OpenApiInfo.Should().NotBeNull(); payload.OpenApiInfo!.Title.Should().Be("timelineindexer"); } [Trait("Category", TestCategories.Unit)] [Fact] public void MessagingTransportOptions_DefaultControlQueue_DoesNotCollideWithGatewayServiceQueue() { var options = new MessagingTransportOptions(); var controlQueue = options.GetGatewayControlQueueName(); var gatewayServiceQueue = options.GetRequestQueueName("gateway"); controlQueue.Should().NotBe(gatewayServiceQueue); controlQueue.Should().Be("router:requests:gateway-control"); } private sealed class RecordingQueueFactory : IMessageQueueFactory { public string ProviderName => "test"; public List CreatedQueues { get; } = new(); public List EnqueuedMessages { get; } = new(); public IMessageQueue Create(MessageQueueOptions options) where TMessage : class { CreatedQueues.Add(new CreatedQueue(typeof(TMessage), CloneOptions(options))); return new NoOpMessageQueue(options.QueueName, message => EnqueuedMessages.Add(message)); } private static MessageQueueOptions CloneOptions(MessageQueueOptions options) { return new MessageQueueOptions { QueueName = options.QueueName, ConsumerGroup = options.ConsumerGroup, ConsumerName = options.ConsumerName, DeadLetterQueue = options.DeadLetterQueue, DefaultLeaseDuration = options.DefaultLeaseDuration, MaxDeliveryAttempts = options.MaxDeliveryAttempts, IdempotencyWindow = options.IdempotencyWindow, ApproximateMaxLength = options.ApproximateMaxLength, RetryInitialBackoff = options.RetryInitialBackoff, RetryMaxBackoff = options.RetryMaxBackoff, RetryBackoffMultiplier = options.RetryBackoffMultiplier }; } } private sealed class NoOpMessageQueue : IMessageQueue where TMessage : class { private readonly Action? _onEnqueue; public NoOpMessageQueue(string queueName, Action? onEnqueue = null) { QueueName = queueName; _onEnqueue = onEnqueue; } public string ProviderName => "test"; public string QueueName { get; } public ValueTask EnqueueAsync( TMessage message, EnqueueOptions? options = null, CancellationToken cancellationToken = default) { _onEnqueue?.Invoke(message); return ValueTask.FromResult(EnqueueResult.Succeeded(Guid.NewGuid().ToString("N"))); } public ValueTask>> LeaseAsync( LeaseRequest request, CancellationToken cancellationToken = default) { return ValueTask.FromResult>>([]); } public ValueTask>> ClaimExpiredAsync( ClaimRequest request, CancellationToken cancellationToken = default) { return ValueTask.FromResult>>([]); } public ValueTask GetPendingCountAsync(CancellationToken cancellationToken = default) { return ValueTask.FromResult(0L); } } private sealed record CreatedQueue(Type MessageType, MessageQueueOptions Options); }