using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; using MongoDB.Driver; using StellaOps.Signals.Models; namespace StellaOps.Signals.Persistence; public sealed class MongoUnknownsRepository : IUnknownsRepository { private readonly IMongoCollection collection; public MongoUnknownsRepository(IMongoCollection collection) { this.collection = collection ?? throw new ArgumentNullException(nameof(collection)); } public async Task UpsertAsync(string subjectKey, IEnumerable items, CancellationToken cancellationToken) { ArgumentException.ThrowIfNullOrWhiteSpace(subjectKey); ArgumentNullException.ThrowIfNull(items); // deterministic replace per subject to keep the registry stable await collection.DeleteManyAsync(doc => doc.SubjectKey == subjectKey, cancellationToken).ConfigureAwait(false); var batch = items.ToList(); if (batch.Count == 0) { return; } await collection.InsertManyAsync(batch, cancellationToken: cancellationToken).ConfigureAwait(false); } public async Task> GetBySubjectAsync(string subjectKey, CancellationToken cancellationToken) { ArgumentException.ThrowIfNullOrWhiteSpace(subjectKey); var cursor = await collection.FindAsync(doc => doc.SubjectKey == subjectKey, cancellationToken: cancellationToken).ConfigureAwait(false); return await cursor.ToListAsync(cancellationToken).ConfigureAwait(false); } public async Task CountBySubjectAsync(string subjectKey, CancellationToken cancellationToken) { ArgumentException.ThrowIfNullOrWhiteSpace(subjectKey); var count = await collection.CountDocumentsAsync(doc => doc.SubjectKey == subjectKey, cancellationToken: cancellationToken).ConfigureAwait(false); return (int)count; } }