feat: Add MongoIdempotencyStoreOptions for MongoDB configuration
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
feat: Implement BsonJsonConverter for converting BsonDocument and BsonArray to JSON fix: Update project file to include MongoDB.Bson package test: Add GraphOverlayExporterTests to validate NDJSON export functionality refactor: Refactor Program.cs in Attestation Tool for improved argument parsing and error handling docs: Update README for stella-forensic-verify with usage instructions and exit codes feat: Enhance HmacVerifier with clock skew and not-after checks feat: Add MerkleRootVerifier and ChainOfCustodyVerifier for additional verification methods fix: Update DenoRuntimeShim to correctly handle file paths feat: Introduce ComposerAutoloadData and related parsing in ComposerLockReader test: Add tests for Deno runtime execution and verification test: Enhance PHP package tests to include autoload data verification test: Add unit tests for HmacVerifier and verification logic
This commit is contained in:
@@ -0,0 +1,79 @@
|
||||
using System.Collections.Immutable;
|
||||
using System.Text.Json.Nodes;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.Graph.Indexer.Infrastructure;
|
||||
|
||||
namespace StellaOps.Graph.Indexer.Analytics;
|
||||
|
||||
public sealed class MongoGraphSnapshotProvider : IGraphSnapshotProvider
|
||||
{
|
||||
private readonly IMongoCollection<BsonDocument> _snapshots;
|
||||
private readonly IMongoCollection<BsonDocument> _progress;
|
||||
private readonly MongoGraphSnapshotProviderOptions _options;
|
||||
|
||||
public MongoGraphSnapshotProvider(IMongoDatabase database, MongoGraphSnapshotProviderOptions? options = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(database);
|
||||
_options = options ?? new MongoGraphSnapshotProviderOptions();
|
||||
_snapshots = database.GetCollection<BsonDocument>(_options.SnapshotCollectionName);
|
||||
_progress = database.GetCollection<BsonDocument>(_options.ProgressCollectionName);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<GraphAnalyticsSnapshot>> GetPendingSnapshotsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var processedIds = await _progress
|
||||
.Find(FilterDefinition<BsonDocument>.Empty)
|
||||
.Project(doc => doc["snapshot_id"].AsString)
|
||||
.ToListAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var filter = Builders<BsonDocument>.Filter.Nin("snapshot_id", processedIds);
|
||||
var snapshots = await _snapshots
|
||||
.Find(filter)
|
||||
.Limit(_options.MaxBatch)
|
||||
.Sort(Builders<BsonDocument>.Sort.Descending("generated_at"))
|
||||
.ToListAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var result = new List<GraphAnalyticsSnapshot>(snapshots.Count);
|
||||
foreach (var snapshot in snapshots)
|
||||
{
|
||||
var tenant = snapshot.GetValue("tenant", string.Empty).AsString;
|
||||
var snapshotId = snapshot.GetValue("snapshot_id", string.Empty).AsString;
|
||||
var generatedAt = snapshot.TryGetValue("generated_at", out var generated)
|
||||
&& generated.TryToUniversalTime(out var dt)
|
||||
? dt
|
||||
: DateTimeOffset.UtcNow;
|
||||
|
||||
var nodes = snapshot.TryGetValue("nodes", out var nodesValue) && nodesValue is BsonArray nodesArray
|
||||
? BsonJsonConverter.ToJsonArray(nodesArray).Select(n => (JsonObject)n!).ToImmutableArray()
|
||||
: ImmutableArray<JsonObject>.Empty;
|
||||
|
||||
var edges = snapshot.TryGetValue("edges", out var edgesValue) && edgesValue is BsonArray edgesArray
|
||||
? BsonJsonConverter.ToJsonArray(edgesArray).Select(n => (JsonObject)n!).ToImmutableArray()
|
||||
: ImmutableArray<JsonObject>.Empty;
|
||||
|
||||
result.Add(new GraphAnalyticsSnapshot(tenant, snapshotId, generatedAt, nodes, edges));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public async Task MarkProcessedAsync(string tenant, string snapshotId, CancellationToken cancellationToken)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var filter = Builders<BsonDocument>.Filter.Eq("snapshot_id", snapshotId)
|
||||
& Builders<BsonDocument>.Filter.Eq("tenant", tenant);
|
||||
|
||||
var update = Builders<BsonDocument>.Update.Set("snapshot_id", snapshotId)
|
||||
.Set("tenant", tenant)
|
||||
.SetOnInsert("processed_at", DateTimeOffset.UtcNow.UtcDateTime);
|
||||
|
||||
await _progress.UpdateOneAsync(filter, update, new UpdateOptions { IsUpsert = true }, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user