//
// Copyright (c) StellaOps. Licensed under AGPL-3.0-or-later.
//

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using StackExchange.Redis;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue.Redis;
using StellaOps.TestKit;
using Testcontainers.Redis;
using Xunit;

namespace StellaOps.Scheduler.Queue.Tests;

/// <summary>
/// Integration tests for scheduler queues.
/// </summary>
/// <remarks>
/// HLC integration has been moved to the enqueue/dequeue services layer.
/// These tests verify basic queue functionality.
/// </remarks>
[Trait("Category", TestCategories.Integration)]
public sealed class HlcQueueIntegrationTests : IAsyncLifetime
{
    private readonly RedisContainer _redis;

    // Set by InitializeAsync when the Redis container could not be started because
    // Docker looks unavailable; null when the container is up.
    private string? _skipReason;

    public HlcQueueIntegrationTests()
    {
        _redis = new RedisBuilder().Build();
    }

    /// <summary>
    /// Starts the Redis container. A failure classified by <see cref="IsDockerUnavailable"/>
    /// is recorded as the skip reason instead of failing fixture setup; any other
    /// exception propagates.
    /// </summary>
    public async ValueTask InitializeAsync()
    {
        try
        {
            await _redis.StartAsync();
        }
        catch (Exception ex) when (IsDockerUnavailable(ex))
        {
            _skipReason = $"Docker engine is not available for Redis-backed tests: {ex.Message}";
        }
    }

    /// <summary>
    /// Disposes the Redis container, unless startup was recorded as skipped.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // The container never started in this case, so disposal is deliberately not attempted.
        if (_skipReason is not null)
        {
            return;
        }

        await _redis.DisposeAsync();
    }

    /// <summary>
    /// A single planner message can be enqueued, leased back and acknowledged.
    /// </summary>
    [Fact]
    public async Task PlannerQueue_EnqueueAndLease_Works()
    {
        SkipIfUnavailable();

        var options = CreateOptions();
        await using var queue = CreatePlannerQueue(options);

        var message = CreatePlannerMessage();

        var enqueueResult = await queue.EnqueueAsync(message);
        enqueueResult.Deduplicated.Should().BeFalse();

        var leases = await queue.LeaseAsync(new SchedulerQueueLeaseRequest("planner-test", batchSize: 1, options.DefaultLeaseDuration));
        leases.Should().ContainSingle();

        var lease = leases[0];
        lease.Message.Run.Id.Should().Be(message.Run.Id);

        await lease.AcknowledgeAsync();
    }

    /// <summary>
    /// A single runner segment message can be enqueued, leased back and acknowledged.
    /// </summary>
    [Fact]
    public async Task RunnerQueue_EnqueueAndLease_Works()
    {
        SkipIfUnavailable();

        var options = CreateOptions();
        await using var queue = CreateRunnerQueue(options);

        var message = CreateRunnerMessage();

        await queue.EnqueueAsync(message);

        var leases = await queue.LeaseAsync(new SchedulerQueueLeaseRequest("runner-test", batchSize: 1, options.DefaultLeaseDuration));
        leases.Should().ContainSingle();

        var lease = leases[0];
        lease.Message.SegmentId.Should().Be(message.SegmentId);

        await lease.AcknowledgeAsync();
    }

    /// <summary>
    /// Five distinct planner messages are all returned by one lease request with a larger batch size.
    /// </summary>
    [Fact]
    public async Task PlannerQueue_MultipleMessages_AllLeased()
    {
        SkipIfUnavailable();

        var options = CreateOptions();
        await using var queue = CreatePlannerQueue(options);

        // Enqueue multiple messages
        for (int i = 0; i < 5; i++)
        {
            var msg = CreatePlannerMessage(suffix: i.ToString());
            await queue.EnqueueAsync(msg);
        }

        // Lease all messages
        var leases = await queue.LeaseAsync(new SchedulerQueueLeaseRequest("multi-consumer", batchSize: 10, options.DefaultLeaseDuration));
        leases.Should().HaveCount(5);

        foreach (var lease in leases)
        {
            await lease.AcknowledgeAsync();
        }
    }

    /// <summary>
    /// Enqueueing the same planner message twice reports the second enqueue as deduplicated
    /// and only one message is leased.
    /// </summary>
    [Fact]
    public async Task PlannerQueue_Idempotency_DuplicatesAreDetected()
    {
        SkipIfUnavailable();

        var options = CreateOptions();
        await using var queue = CreatePlannerQueue(options);

        var message = CreatePlannerMessage();

        // First enqueue
        var first = await queue.EnqueueAsync(message);
        first.Deduplicated.Should().BeFalse();

        // Second enqueue with same message should be deduplicated
        var second = await queue.EnqueueAsync(message);
        second.Deduplicated.Should().BeTrue();

        // Only one message should be leased
        var leases = await queue.LeaseAsync(new SchedulerQueueLeaseRequest("dedup-consumer", batchSize: 10, options.DefaultLeaseDuration));
        leases.Should().ContainSingle();

        await leases[0].AcknowledgeAsync();
    }

    /// <summary>
    /// Runner messages are leased in the same order in which they were enqueued.
    /// </summary>
    [Fact]
    public async Task RunnerQueue_Ordering_PreservedInLeases()
    {
        SkipIfUnavailable();

        var options = CreateOptions();
        await using var queue = CreateRunnerQueue(options);

        // Enqueue messages with sequential segment IDs
        var segmentIds = new List<string>();
        for (int i = 0; i < 5; i++)
        {
            var segmentId = $"segment-order-{i:D3}";
            segmentIds.Add(segmentId);
            await queue.EnqueueAsync(CreateRunnerMessage(segmentId));
        }

        // Lease all messages
        var leases = await queue.LeaseAsync(new SchedulerQueueLeaseRequest("order-consumer", batchSize: 10, options.DefaultLeaseDuration));
        leases.Should().HaveCount(5);

        // Verify ordering is preserved
        for (int i = 0; i < leases.Count; i++)
        {
            leases[i].Message.SegmentId.Should().Be(segmentIds[i]);
            await leases[i].AcknowledgeAsync();
        }
    }

    /// <summary>
    /// Builds queue options that point at the test Redis container. Every stream, consumer
    /// group and idempotency prefix embeds a fresh GUID so each call gets its own key space.
    /// </summary>
    private SchedulerQueueOptions CreateOptions()
    {
        var unique = Guid.NewGuid().ToString("N");

        return new SchedulerQueueOptions
        {
            Kind = SchedulerQueueTransportKind.Redis,
            DefaultLeaseDuration = TimeSpan.FromSeconds(30),
            MaxDeliveryAttempts = 5,
            RetryInitialBackoff = TimeSpan.FromMilliseconds(10),
            RetryMaxBackoff = TimeSpan.FromMilliseconds(50),
            Redis = new SchedulerRedisQueueOptions
            {
                ConnectionString = _redis.GetConnectionString(),
                Database = 0,
                InitializationTimeout = TimeSpan.FromSeconds(10),
                Planner = new RedisSchedulerStreamOptions
                {
                    Stream = $"scheduler:test:planner:{unique}",
                    ConsumerGroup = $"planner-test-{unique}",
                    DeadLetterStream = $"scheduler:test:planner:{unique}:dead",
                    IdempotencyKeyPrefix = $"scheduler:test:planner:{unique}:idemp:",
                    IdempotencyWindow = TimeSpan.FromMinutes(5)
                },
                Runner = new RedisSchedulerStreamOptions
                {
                    Stream = $"scheduler:test:runner:{unique}",
                    ConsumerGroup = $"runner-test-{unique}",
                    DeadLetterStream = $"scheduler:test:runner:{unique}:dead",
                    IdempotencyKeyPrefix = $"scheduler:test:runner:{unique}:idemp:",
                    IdempotencyWindow = TimeSpan.FromMinutes(5)
                }
            }
        };
    }

    /// <summary>
    /// Creates a planner queue over <paramref name="options"/> using a null logger, the
    /// system time provider and a real StackExchange.Redis connection.
    /// </summary>
    private static RedisSchedulerPlannerQueue CreatePlannerQueue(SchedulerQueueOptions options)
    {
        return new RedisSchedulerPlannerQueue(
            options,
            options.Redis,
            NullLogger<RedisSchedulerPlannerQueue>.Instance,
            TimeProvider.System,
            connectionFactory: async config => (IConnectionMultiplexer)await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false));
    }

    /// <summary>
    /// Creates a runner queue over <paramref name="options"/> using a null logger, the
    /// system time provider and a real StackExchange.Redis connection.
    /// </summary>
    private static RedisSchedulerRunnerQueue CreateRunnerQueue(SchedulerQueueOptions options)
    {
        return new RedisSchedulerRunnerQueue(
            options,
            options.Redis,
            NullLogger<RedisSchedulerRunnerQueue>.Instance,
            TimeProvider.System,
            connectionFactory: async config => (IConnectionMultiplexer)await ConnectionMultiplexer.ConnectAsync(config).ConfigureAwait(false));
    }

    /// <summary>
    /// Marks the current test as skipped (with the recorded reason) when the Redis
    /// container could not be started; otherwise does nothing.
    /// </summary>
    private void SkipIfUnavailable()
    {
        if (_skipReason is not null)
        {
            // Report a skip rather than returning early, so the run does not show a
            // green result for a test that verified nothing.
            Assert.Skip(_skipReason);
        }
    }

    /// <summary>
    /// Classifies a container start-up failure as "Docker unavailable".
    /// </summary>
    /// <param name="exception">The exception thrown while starting the container.</param>
    /// <returns>
    /// <c>true</c> when, after unwrapping nested <see cref="AggregateException"/> layers, the
    /// exception is a <see cref="TimeoutException"/> or its type name contains "Docker"
    /// (case-insensitive); otherwise <c>false</c>.
    /// </returns>
    private static bool IsDockerUnavailable(Exception exception)
    {
        while (exception is AggregateException aggregate && aggregate.InnerException is not null)
        {
            exception = aggregate.InnerException;
        }

        return exception is TimeoutException
            || exception.GetType().Name.Contains("Docker", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Builds a planner message around a manual run in the planning state.
    /// </summary>
    /// <param name="suffix">
    /// Optional suffix appended to the run id ("run-test-{suffix}") and correlation id so
    /// that several distinct messages can be produced; empty yields run id "run-test".
    /// </param>
    private static PlannerQueueMessage CreatePlannerMessage(string suffix = "")
    {
        var id = string.IsNullOrEmpty(suffix) ? "run-test" : $"run-test-{suffix}";

        // One timestamp for every field keeps the fixture internally consistent.
        var now = DateTimeOffset.UtcNow;

        var schedule = new Schedule(
            id: "sch-test",
            tenantId: "tenant-test",
            name: "Test",
            enabled: true,
            cronExpression: "0 0 * * *",
            timezone: "UTC",
            mode: ScheduleMode.AnalysisOnly,
            selection: new Selector(SelectorScope.AllImages, tenantId: "tenant-test"),
            onlyIf: ScheduleOnlyIf.Default,
            notify: ScheduleNotify.Default,
            limits: ScheduleLimits.Default,
            createdAt: now,
            createdBy: "tests",
            updatedAt: now,
            updatedBy: "tests");

        var run = new Run(
            id: id,
            tenantId: "tenant-test",
            trigger: RunTrigger.Manual,
            state: RunState.Planning,
            stats: RunStats.Empty,
            createdAt: now,
            reason: RunReason.Empty,
            scheduleId: schedule.Id);

        var impactSet = new ImpactSet(
            selector: new Selector(SelectorScope.AllImages, tenantId: "tenant-test"),
            images: new[]
            {
                new ImpactImage(
                    imageDigest: "sha256:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
                    registry: "registry",
                    repository: "repo",
                    namespaces: new[] { "prod" },
                    tags: new[] { "latest" })
            },
            usageOnly: true,
            generatedAt: now,
            total: 1);

        return new PlannerQueueMessage(run, impactSet, schedule, correlationId: $"corr-{suffix}");
    }

    /// <summary>
    /// Builds a runner segment message for run "run-test".
    /// </summary>
    /// <param name="segmentId">Segment id to use; defaults to "segment-test" when null.</param>
    private static RunnerSegmentQueueMessage CreateRunnerMessage(string? segmentId = null)
    {
        return new RunnerSegmentQueueMessage(
            segmentId: segmentId ?? "segment-test",
            runId: "run-test",
            tenantId: "tenant-test",
            imageDigests: new[] { "sha256:dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd" },
            scheduleId: "sch-test",
            ratePerSecond: 10,
            usageOnly: true,
            attributes: new Dictionary<string, string> { ["priority"] = "normal" },
            correlationId: "corr-runner");
    }
}