using System;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Mongo2Go;
using MongoDB.Driver;
using StellaOps.Concelier.Storage.Mongo.Migrations;
using StellaOps.Concelier.Storage.Mongo.Orchestrator;

namespace StellaOps.Concelier.Storage.Mongo.Tests;

/// <summary>
/// Integration tests for <see cref="MongoOrchestratorRegistryStore"/>, executed against a
/// Mongo2Go instance that is started in <see cref="InitializeAsync"/> and disposed in
/// <see cref="DisposeAsync"/>.
/// </summary>
public sealed class MongoOrchestratorRegistryStoreTests : IAsyncLifetime
{
    private MongoDbRunner _runner = null!;
    private IMongoDatabase _database = null!;
    private MongoOrchestratorRegistryStore _store = null!;

    /// <summary>
    /// Starts a single-node replica-set MongoDB runner, applies the orchestrator collection
    /// migration to the "orch-store-tests" database, and builds the store under test.
    /// </summary>
    /// <returns>A task that completes once the migration has been applied and the store is ready.</returns>
    public async Task InitializeAsync()
    {
        _runner = MongoDbRunner.Start(singleNodeReplSet: true);
        var client = new MongoClient(_runner.ConnectionString);
        _database = client.GetDatabase("orch-store-tests");

        // Ensure collections/indexes are present. Awaited rather than blocked on, so this
        // Task-returning lifecycle hook never ties up a thread waiting for async work.
        var migration = new EnsureOrchestratorCollectionsMigration();
        await migration.ApplyAsync(_database, CancellationToken.None);

        // NOTE(review): the GetCollection calls below carry no explicit type argument in this
        // view of the file - confirm the document type arguments are present in the real source.
        _store = new MongoOrchestratorRegistryStore(
            _database.GetCollection(MongoStorageDefaults.Collections.OrchestratorRegistry),
            _database.GetCollection(MongoStorageDefaults.Collections.OrchestratorCommands),
            _database.GetCollection(MongoStorageDefaults.Collections.OrchestratorHeartbeats));
    }

    /// <summary>
    /// Disposes the MongoDB runner created by <see cref="InitializeAsync"/>.
    /// </summary>
    /// <returns>A completed task; disposal of the runner is synchronous.</returns>
    public Task DisposeAsync()
    {
        // Null-conditional so teardown does not throw if the runner was never assigned.
        _runner?.Dispose();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Upserts a registry record and verifies that fetching it by tenant and connector id
    /// returns the same connector id, cron expression, burst value and airgap flag.
    /// </summary>
    [Fact]
    public async Task UpsertAndFetchRegistryRoundTrips()
    {
        var record = new OrchestratorRegistryRecord(
            Tenant: "tenant-a",
            ConnectorId: "icscisa",
            Source: "icscisa",
            Capabilities: new[] { "observations", "linksets" },
            AuthRef: "secret:concelier/icscisa/api-key",
            Schedule: new OrchestratorSchedule("*/30 * * * *", "UTC", 1, 120),
            RatePolicy: new OrchestratorRatePolicy(60, 10, 30),
            ArtifactKinds: new[] { "raw-advisory", "linkset" },
            LockKey: "concelier:tenant-a:icscisa",
            EgressGuard: new OrchestratorEgressGuard(new[] { "icscert.kisa.or.kr" }, true),
            CreatedAt: ParseTimestamp("2025-11-20T00:00:00Z"),
            UpdatedAt: ParseTimestamp("2025-11-21T00:00:00Z"));

        await _store.UpsertAsync(record, CancellationToken.None);
        var fetched = await _store.GetAsync("tenant-a", "icscisa", CancellationToken.None);

        fetched.Should().NotBeNull();
        fetched!.ConnectorId.Should().Be("icscisa");
        fetched.Schedule.Cron.Should().Be("*/30 * * * *");
        fetched.RatePolicy.Burst.Should().Be(10);
        fetched.EgressGuard.AirgapMode.Should().BeTrue();
    }

    /// <summary>
    /// Enqueues two commands for the same run in reverse sequence order and verifies that
    /// the pending commands come back with sequence 1 before sequence 2.
    /// </summary>
    [Fact]
    public async Task EnqueueAndReadCommandsOrdersBySequence()
    {
        var runId = Guid.NewGuid();
        var first = new OrchestratorCommandRecord(
            Tenant: "tenant-a",
            ConnectorId: "icscisa",
            RunId: runId,
            Sequence: 1,
            Command: OrchestratorCommandKind.Pause,
            Throttle: null,
            Backfill: null,
            CreatedAt: ParseTimestamp("2025-11-20T00:00:00Z"),
            ExpiresAt: null);
        var second = new OrchestratorCommandRecord(
            Tenant: "tenant-a",
            ConnectorId: "icscisa",
            RunId: runId,
            Sequence: 2,
            Command: OrchestratorCommandKind.Backfill,
            Throttle: null,
            Backfill: new OrchestratorBackfillRange("2024-01-01T00:00:00Z", "2024-02-01T00:00:00Z"),
            CreatedAt: ParseTimestamp("2025-11-20T00:01:00Z"),
            ExpiresAt: null);

        // Deliberately enqueue out of order so the assertion exercises the store's ordering.
        await _store.EnqueueCommandAsync(second, CancellationToken.None);
        await _store.EnqueueCommandAsync(first, CancellationToken.None);

        var commands = await _store.GetPendingCommandsAsync("tenant-a", "icscisa", runId, afterSequence: 0, CancellationToken.None);

        commands.Select(c => c.Sequence).Should().ContainInOrder(1, 2);
        commands.Last().Backfill!.FromCursor.Should().Be("2024-01-01T00:00:00Z");
    }

    /// <summary>
    /// Appends one heartbeat through the store and verifies that the heartbeats collection
    /// then contains exactly one document.
    /// </summary>
    [Fact]
    public async Task AppendsHeartbeats()
    {
        var heartbeat = new OrchestratorHeartbeatRecord(
            Tenant: "tenant-a",
            ConnectorId: "icscisa",
            RunId: Guid.NewGuid(),
            Sequence: 5,
            Status: OrchestratorHeartbeatStatus.Running,
            Progress: 42,
            QueueDepth: 7,
            LastArtifactHash: "abc",
            LastArtifactKind: "normalized",
            ErrorCode: null,
            RetryAfterSeconds: null,
            TimestampUtc: ParseTimestamp("2025-11-21T00:00:00Z"));

        await _store.AppendHeartbeatAsync(heartbeat, CancellationToken.None);

        // NOTE(review): GetCollection / FilterDefinition appear without type arguments in this
        // view of the file - confirm the document type arguments are present in the real source.
        var count = await _database
            .GetCollection(MongoStorageDefaults.Collections.OrchestratorHeartbeats)
            .CountDocumentsAsync(FilterDefinition.Empty);

        count.Should().Be(1);
    }

    /// <summary>
    /// Parses a fixture timestamp with the invariant culture so the test data does not depend
    /// on the current culture of the machine running the tests.
    /// </summary>
    /// <param name="value">Timestamp text, e.g. "2025-11-20T00:00:00Z".</param>
    /// <returns>The parsed <see cref="DateTimeOffset"/>.</returns>
    private static DateTimeOffset ParseTimestamp(string value) =>
        DateTimeOffset.Parse(value, System.Globalization.CultureInfo.InvariantCulture);
}