using System; using System.IO; using System.Linq; using System.Text.Json; using System.Text.Json.Nodes; using System.Threading; using System.Threading.Tasks; using FluentAssertions; using Mongo2Go; using MongoDB.Bson; using MongoDB.Driver; using StellaOps.Graph.Indexer.Ingestion.Advisory; using Xunit; namespace StellaOps.Graph.Indexer.Tests; public sealed class MongoGraphDocumentWriterTests : IAsyncLifetime, IDisposable { private readonly MongoTestContext _context; private readonly MongoGraphDocumentWriter? _writer; private readonly IMongoCollection? _nodeCollection; private readonly IMongoCollection? _edgeCollection; public MongoGraphDocumentWriterTests() { _context = MongoTestContext.Create(); if (_context.SkipReason is null) { var database = _context.Database ?? throw new InvalidOperationException("MongoDB test context initialized without a database."); _writer = new MongoGraphDocumentWriter(database); _nodeCollection = database.GetCollection("graph_nodes"); _edgeCollection = database.GetCollection("graph_edges"); } } [SkippableFact] public async Task WriteAsync_upserts_nodes_and_edges() { Skip.If(_context.SkipReason is not null, _context.SkipReason ?? string.Empty); var writer = _writer!; var nodeCollection = _nodeCollection!; var edgeCollection = _edgeCollection!; var snapshot = LoadSnapshot(); var transformer = new AdvisoryLinksetTransformer(); var batch = transformer.Transform(snapshot); await writer.WriteAsync(batch, CancellationToken.None); var nodes = await nodeCollection .Find(FilterDefinition.Empty) .ToListAsync(); var edges = await edgeCollection .Find(FilterDefinition.Empty) .ToListAsync(); nodes.Should().HaveCount(batch.Nodes.Length); edges.Should().HaveCount(batch.Edges.Length); // Write the same batch again to ensure idempotency through upsert. await writer.WriteAsync(batch, CancellationToken.None); var nodesAfter = await nodeCollection .Find(Builders.Filter.Empty) .ToListAsync(); var edgesAfter = await edgeCollection .Find(Builders.Filter.Empty) .ToListAsync(); nodesAfter.Should().HaveCount(batch.Nodes.Length); edgesAfter.Should().HaveCount(batch.Edges.Length); } [SkippableFact] public async Task WriteAsync_replaces_existing_documents() { Skip.If(_context.SkipReason is not null, _context.SkipReason ?? string.Empty); var writer = _writer!; var edgeCollection = _edgeCollection!; var snapshot = LoadSnapshot(); var transformer = new AdvisoryLinksetTransformer(); var batch = transformer.Transform(snapshot); await writer.WriteAsync(batch, CancellationToken.None); // change provenance offset to ensure replacement occurs var snapshotJson = JsonSerializer.Serialize(snapshot); var document = JsonNode.Parse(snapshotJson)!.AsObject(); document["eventOffset"] = snapshot.EventOffset + 10; var mutated = document.Deserialize(new JsonSerializerOptions { PropertyNameCaseInsensitive = true })!; var mutatedBatch = transformer.Transform(mutated); await writer.WriteAsync(mutatedBatch, CancellationToken.None); var edges = await edgeCollection .Find(FilterDefinition.Empty) .ToListAsync(); edges.Should().HaveCount(1); edges.Single()["provenance"]["event_offset"].AsInt64.Should().Be(mutated.EventOffset); } private static AdvisoryLinksetSnapshot LoadSnapshot() { var path = Path.Combine(AppContext.BaseDirectory, "Fixtures", "v1", "linkset-snapshot.json"); return JsonSerializer.Deserialize(File.ReadAllText(path), new JsonSerializerOptions { PropertyNameCaseInsensitive = true })!; } public Task InitializeAsync() => Task.CompletedTask; public Task DisposeAsync() => _context.DisposeAsync().AsTask(); public void Dispose() { _context.Dispose(); } private sealed class MongoTestContext : IAsyncDisposable, IDisposable { private const string ExternalMongoEnv = "STELLAOPS_TEST_MONGO_URI"; private const string DefaultLocalMongo = "mongodb://127.0.0.1:27017"; private readonly bool _ownsDatabase; private readonly string? _databaseName; private MongoTestContext(IMongoClient? client, IMongoDatabase? database, MongoDbRunner? runner, bool ownsDatabase, string? skipReason) { Client = client; Database = database; Runner = runner; _ownsDatabase = ownsDatabase; _databaseName = database?.DatabaseNamespace.DatabaseName; SkipReason = skipReason; } public IMongoClient? Client { get; } public IMongoDatabase? Database { get; } public MongoDbRunner? Runner { get; } public string? SkipReason { get; } public static MongoTestContext Create() { // 1) Explicit override via env var (CI/local scripted). var uri = Environment.GetEnvironmentVariable(ExternalMongoEnv); if (TryCreateExternal(uri, out var externalContext)) { return externalContext!; } // 2) Try localhost default. if (TryCreateExternal(DefaultLocalMongo, out externalContext)) { return externalContext!; } // 3) Fallback to Mongo2Go embedded runner. if (TryCreateEmbedded(out var embeddedContext)) { return embeddedContext!; } return new MongoTestContext(null, null, null, ownsDatabase: false, skipReason: "MongoDB unavailable: set STELLAOPS_TEST_MONGO_URI or run mongod on 127.0.0.1:27017."); } public async ValueTask DisposeAsync() { if (Runner is not null) { Runner.Dispose(); return; } if (_ownsDatabase && Client is not null && _databaseName is not null) { await Client.DropDatabaseAsync(_databaseName).ConfigureAwait(false); } } public void Dispose() { Runner?.Dispose(); if (_ownsDatabase && Client is not null && _databaseName is not null) { Client.DropDatabase(_databaseName); } } private static bool TryCreateExternal(string? uri, out MongoTestContext? context) { context = null; if (string.IsNullOrWhiteSpace(uri)) { return false; } try { var client = new MongoClient(uri); var dbName = $"graph-indexer-tests-{Guid.NewGuid():N}"; var database = client.GetDatabase(dbName); // Ping to ensure connectivity. database.RunCommand(new BsonDocument("ping", 1)); context = new MongoTestContext(client, database, runner: null, ownsDatabase: true, skipReason: null); return true; } catch { return false; } } private static bool TryCreateEmbedded(out MongoTestContext? context) { context = null; try { var runner = MongoDbRunner.Start(singleNodeReplSet: true); var client = new MongoClient(runner.ConnectionString); var database = client.GetDatabase("graph-indexer-tests"); context = new MongoTestContext(client, database, runner, ownsDatabase: false, skipReason: null); return true; } catch { return false; } } } }