using System.Collections.Immutable;
using System.Text.Json.Nodes;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Graph.Indexer.Infrastructure;

namespace StellaOps.Graph.Indexer.Analytics;

/// <summary>
/// <see cref="IGraphSnapshotProvider"/> implementation backed by two MongoDB collections:
/// one that stores graph snapshots and one that records which snapshots have been processed.
/// Collection names and the batch size come from <see cref="MongoGraphSnapshotProviderOptions"/>.
/// </summary>
public sealed class MongoGraphSnapshotProvider : IGraphSnapshotProvider
{
    // Field names shared by the read path and the progress-marker write path.
    private const string SnapshotIdField = "snapshot_id";
    private const string TenantField = "tenant";

    private readonly IMongoCollection<BsonDocument> _snapshots;
    private readonly IMongoCollection<BsonDocument> _progress;
    private readonly MongoGraphSnapshotProviderOptions _options;

    /// <summary>
    /// Creates a provider that reads from and writes to the given database.
    /// </summary>
    /// <param name="database">Database that contains the snapshot and progress collections.</param>
    /// <param name="options">Optional settings; a default-constructed instance is used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="database"/> is <c>null</c>.</exception>
    public MongoGraphSnapshotProvider(IMongoDatabase database, MongoGraphSnapshotProviderOptions? options = null)
    {
        ArgumentNullException.ThrowIfNull(database);

        _options = options ?? new MongoGraphSnapshotProviderOptions();
        _snapshots = database.GetCollection<BsonDocument>(_options.SnapshotCollectionName);
        _progress = database.GetCollection<BsonDocument>(_options.ProgressCollectionName);
    }

    /// <summary>
    /// Returns up to <c>MaxBatch</c> snapshots whose <c>snapshot_id</c> is not present in the
    /// progress collection, newest <c>generated_at</c> first.
    /// </summary>
    /// <param name="cancellationToken">Token checked up front and passed to both queries.</param>
    /// <returns>The pending snapshots converted to <see cref="GraphAnalyticsSnapshot"/> instances.</returns>
    public async Task<IReadOnlyList<GraphAnalyticsSnapshot>> GetPendingSnapshotsAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // NOTE(review): every processed id is loaded and sent back inside the $nin filter, so the
        // cost of this call grows with the size of the progress collection.
        var processedIds = await _progress
            .Find(FilterDefinition<BsonDocument>.Empty)
            .Project(doc => doc[SnapshotIdField].AsString)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        // NOTE(review): MarkProcessedAsync keys progress rows by (snapshot_id, tenant), but this
        // exclusion uses snapshot_id alone - confirm snapshot ids are unique across tenants.
        var pendingFilter = Builders<BsonDocument>.Filter.Nin(SnapshotIdField, processedIds);

        // Sort and Limit are recorded as query options; writing Sort first only mirrors
        // the order in which the server applies them.
        var snapshots = await _snapshots
            .Find(pendingFilter)
            .Sort(Builders<BsonDocument>.Sort.Descending("generated_at"))
            .Limit(_options.MaxBatch)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        var result = new List<GraphAnalyticsSnapshot>(snapshots.Count);
        foreach (var snapshot in snapshots)
        {
            result.Add(ToAnalyticsSnapshot(snapshot));
        }

        return result;
    }

    /// <summary>
    /// Upserts a progress row for the given tenant and snapshot so that the snapshot id is
    /// excluded by later calls to <see cref="GetPendingSnapshotsAsync"/>.
    /// </summary>
    /// <param name="tenant">Tenant value stored on the progress row.</param>
    /// <param name="snapshotId">Snapshot identifier stored on the progress row.</param>
    /// <param name="cancellationToken">Token checked up front and passed to the update.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tenant"/> or <paramref name="snapshotId"/> is <c>null</c>.
    /// </exception>
    public async Task MarkProcessedAsync(string tenant, string snapshotId, CancellationToken cancellationToken)
    {
        // Empty strings remain allowed: the read path substitutes string.Empty for missing fields.
        ArgumentNullException.ThrowIfNull(tenant);
        ArgumentNullException.ThrowIfNull(snapshotId);
        cancellationToken.ThrowIfCancellationRequested();

        var filter = Builders<BsonDocument>.Filter.Eq(SnapshotIdField, snapshotId)
            & Builders<BsonDocument>.Filter.Eq(TenantField, tenant);

        // processed_at uses SetOnInsert so re-marking an existing row keeps its first timestamp.
        var update = Builders<BsonDocument>.Update
            .Set(SnapshotIdField, snapshotId)
            .Set(TenantField, tenant)
            .SetOnInsert("processed_at", DateTimeOffset.UtcNow.UtcDateTime);

        await _progress
            .UpdateOneAsync(filter, update, new UpdateOptions { IsUpsert = true }, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Maps one snapshot document to a <see cref="GraphAnalyticsSnapshot"/>.
    /// </summary>
    private static GraphAnalyticsSnapshot ToAnalyticsSnapshot(BsonDocument snapshot)
    {
        // Missing tenant / snapshot_id fields fall back to an empty string.
        var tenant = snapshot.GetValue(TenantField, string.Empty).AsString;
        var snapshotId = snapshot.GetValue(SnapshotIdField, string.Empty).AsString;

        // A missing or non-DateTime generated_at falls back to the current UTC time.
        DateTimeOffset generatedAt =
            snapshot.TryGetValue("generated_at", out var generated) && generated.BsonType == BsonType.DateTime
                ? DateTime.SpecifyKind(generated.ToUniversalTime(), DateTimeKind.Utc)
                : DateTimeOffset.UtcNow;

        var nodes = ReadJsonObjects(snapshot, "nodes");
        var edges = ReadJsonObjects(snapshot, "edges");

        return new GraphAnalyticsSnapshot(tenant, snapshotId, generatedAt, nodes, edges);
    }

    /// <summary>
    /// Reads the named field as an array of JSON objects; returns an empty array when the field
    /// is absent or is not a BSON array.
    /// </summary>
    private static ImmutableArray<JsonObject> ReadJsonObjects(BsonDocument snapshot, string fieldName)
    {
        if (snapshot.TryGetValue(fieldName, out var value) && value is BsonArray array)
        {
            // The direct cast is intentional: an element that is not a JSON object throws
            // instead of being dropped silently.
            return BsonJsonConverter.ToJsonArray(array)
                .Select(node => (JsonObject)node!)
                .ToImmutableArray();
        }

        return ImmutableArray<JsonObject>.Empty;
    }
}