using System.Security.Cryptography;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Bson;
using StellaOps.Concelier.Connector.Common.Fetch;
using StellaOps.Concelier.Storage.Mongo;
using StellaOps.Concelier.Storage.Mongo.Documents;

namespace StellaOps.Concelier.Connector.Common.State;

/// <summary>
/// Persists raw documents and cursor state for connectors that require manual seeding.
/// </summary>
public sealed class SourceStateSeedProcessor
{
    private readonly IDocumentStore _documentStore;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly ISourceStateRepository _stateRepository;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a seed processor.
    /// </summary>
    /// <param name="documentStore">Store used to look up and upsert document records.</param>
    /// <param name="rawDocumentStorage">Storage used to upload and delete raw payloads.</param>
    /// <param name="stateRepository">Repository used to read and update the source cursor.</param>
    /// <param name="timeProvider">Clock used when the specification carries no completion time; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <param name="logger">Optional logger; a no-op logger is used when omitted.</param>
    /// <exception cref="ArgumentNullException">A required dependency is <c>null</c>.</exception>
    public SourceStateSeedProcessor(
        IDocumentStore documentStore,
        RawDocumentStorage rawDocumentStorage,
        ISourceStateRepository stateRepository,
        TimeProvider? timeProvider = null,
        ILogger? logger = null)
    {
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? NullLogger.Instance;
    }

    /// <summary>
    /// Stores every document in the specification, then merges the resulting pending
    /// document/mapping identifiers and known advisories into the source cursor.
    /// </summary>
    /// <param name="specification">Seed description; its <c>Source</c> must be non-empty.</param>
    /// <param name="cancellationToken">Token checked before each document and passed to all storage calls.</param>
    /// <returns>Counts and identifiers describing what this run added.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="specification"/> or its source is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">The specification source is empty.</exception>
    public async Task<SourceStateSeedResult> ProcessAsync(SourceStateSeedSpecification specification, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(specification);
        ArgumentException.ThrowIfNullOrEmpty(specification.Source);

        var completedAt = specification.CompletedAt ?? _timeProvider.GetUtcNow();

        var documentIds = new List<Guid>();
        var pendingDocumentIds = new HashSet<Guid>();
        var pendingMappingIds = new HashSet<Guid>();
        var knownAdvisories = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

        AppendRange(knownAdvisories, specification.KnownAdvisories);

        if (specification.Cursor is { } cursorSeed)
        {
            AppendRange(pendingDocumentIds, cursorSeed.PendingDocuments);
            AppendRange(pendingMappingIds, cursorSeed.PendingMappings);
            AppendRange(knownAdvisories, cursorSeed.KnownAdvisories);
        }

        foreach (var document in specification.Documents ?? Array.Empty<SourceStateSeedDocument>())
        {
            cancellationToken.ThrowIfCancellationRequested();

            await ProcessDocumentAsync(
                    specification.Source,
                    document,
                    completedAt,
                    documentIds,
                    pendingDocumentIds,
                    pendingMappingIds,
                    knownAdvisories,
                    cancellationToken)
                .ConfigureAwait(false);
        }

        var state = await _stateRepository.TryGetAsync(specification.Source, cancellationToken).ConfigureAwait(false);
        var cursor = state?.Cursor ?? new BsonDocument();

        // Each merge returns only the entries that were not already present in the cursor.
        var newlyPendingDocuments = MergeGuidArray(cursor, "pendingDocuments", pendingDocumentIds);
        var newlyPendingMappings = MergeGuidArray(cursor, "pendingMappings", pendingMappingIds);
        var newlyKnownAdvisories = MergeStringArray(cursor, "knownAdvisories", knownAdvisories);

        if (specification.Cursor is { } cursorSpec)
        {
            if (cursorSpec.LastModifiedCursor.HasValue)
            {
                cursor["lastModifiedCursor"] = cursorSpec.LastModifiedCursor.Value.UtcDateTime;
            }

            if (cursorSpec.LastFetchAt.HasValue)
            {
                cursor["lastFetchAt"] = cursorSpec.LastFetchAt.Value.UtcDateTime;
            }

            if (cursorSpec.Additional is not null)
            {
                // Applied after the merges above, so an explicit entry overwrites any same-named field.
                foreach (var kvp in cursorSpec.Additional)
                {
                    cursor[kvp.Key] = kvp.Value;
                }
            }
        }

        cursor["lastSeededAt"] = completedAt.UtcDateTime;

        await _stateRepository.UpdateCursorAsync(specification.Source, cursor, completedAt, cancellationToken).ConfigureAwait(false);

        _logger.LogInformation(
            "Seeded {Documents} document(s) for {Source}. pendingDocuments+= {PendingDocuments}, pendingMappings+= {PendingMappings}, knownAdvisories+= {KnownAdvisories}",
            documentIds.Count,
            specification.Source,
            newlyPendingDocuments.Count,
            newlyPendingMappings.Count,
            newlyKnownAdvisories.Count);

        return new SourceStateSeedResult(
            DocumentsProcessed: documentIds.Count,
            PendingDocumentsAdded: newlyPendingDocuments.Count,
            PendingMappingsAdded: newlyPendingMappings.Count,
            DocumentIds: documentIds.AsReadOnly(),
            PendingDocumentIds: newlyPendingDocuments,
            PendingMappingIds: newlyPendingMappings,
            KnownAdvisoriesAdded: newlyKnownAdvisories,
            CompletedAt: completedAt);
    }

    /// <summary>
    /// Uploads one seed document's payload, upserts its document record, and records the
    /// resulting identifier in the supplied accumulators.
    /// </summary>
    /// <param name="source">Source name the document belongs to.</param>
    /// <param name="document">Seed entry; must have a URI and non-empty content.</param>
    /// <param name="completedAt">Fallback fetch timestamp when the entry has none.</param>
    /// <param name="documentIds">Receives the upserted document identifier.</param>
    /// <param name="pendingDocumentIds">Receives the identifier when the entry asks for it.</param>
    /// <param name="pendingMappingIds">Receives the identifier when the entry asks for it.</param>
    /// <param name="knownAdvisories">Receives the entry's known identifiers.</param>
    /// <param name="cancellationToken">Token passed to all storage calls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="document"/> or its URI is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">The document URI is empty.</exception>
    /// <exception cref="InvalidOperationException">The document has no content bytes.</exception>
    private async Task ProcessDocumentAsync(
        string source,
        SourceStateSeedDocument document,
        DateTimeOffset completedAt,
        List<Guid> documentIds,
        HashSet<Guid> pendingDocumentIds,
        HashSet<Guid> pendingMappingIds,
        HashSet<string> knownAdvisories,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(document);
        ArgumentException.ThrowIfNullOrEmpty(document.Uri);

        if (document.Content is not { Length: > 0 })
        {
            throw new InvalidOperationException($"Seed entry for '{document.Uri}' is missing content bytes.");
        }

        // Work on a private copy so the hash and the uploaded bytes come from the same buffer.
        var payload = new byte[document.Content.Length];
        Buffer.BlockCopy(document.Content, 0, payload, 0, document.Content.Length);

        if (!document.Uri.Contains("://", StringComparison.Ordinal))
        {
            _logger.LogWarning("Seed document URI '{Uri}' does not appear to be absolute.", document.Uri);
        }

        var sha256 = Convert.ToHexString(SHA256.HashData(payload)).ToLowerInvariant();

        var existing = await _documentStore.FindBySourceAndUriAsync(source, document.Uri, cancellationToken).ConfigureAwait(false);

        // NOTE(review): the previous payload is deleted before the replacement is uploaded. If the
        // upload fails, the existing record is left referencing a deleted payload. Consider uploading
        // first - confirm against RawDocumentStorage semantics before reordering.
        if (existing?.GridFsId is { } oldGridId)
        {
            await _rawDocumentStorage.DeleteAsync(oldGridId, cancellationToken).ConfigureAwait(false);
        }

        var gridId = await _rawDocumentStorage.UploadAsync(
                source,
                document.Uri,
                payload,
                document.ContentType,
                document.ExpiresAt,
                cancellationToken)
            .ConfigureAwait(false);

        var headers = CloneDictionary(document.Headers);
        if (!string.IsNullOrWhiteSpace(document.ContentType))
        {
            headers ??= new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

            // An explicitly supplied content-type header wins over the document's ContentType.
            headers.TryAdd("content-type", document.ContentType!);
        }

        var metadata = CloneDictionary(document.Metadata);

        var record = new DocumentRecord(
            document.DocumentId ?? existing?.Id ?? Guid.NewGuid(),
            source,
            document.Uri,
            document.FetchedAt ?? completedAt,
            sha256,
            string.IsNullOrWhiteSpace(document.Status) ? DocumentStatuses.PendingParse : document.Status,
            document.ContentType,
            headers,
            metadata,
            document.Etag,
            document.LastModified,
            gridId,
            document.ExpiresAt);

        var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);

        documentIds.Add(upserted.Id);

        if (document.AddToPendingDocuments)
        {
            pendingDocumentIds.Add(upserted.Id);
        }

        if (document.AddToPendingMappings)
        {
            pendingMappingIds.Add(upserted.Id);
        }

        AppendRange(knownAdvisories, document.KnownIdentifiers);
    }

    /// <summary>
    /// Copies a read-only dictionary into a mutable, case-insensitive one.
    /// </summary>
    /// <param name="values">Source entries; may be <c>null</c>.</param>
    /// <returns>The copy, or <c>null</c> when the source is <c>null</c> or empty.</returns>
    private static Dictionary<string, string>? CloneDictionary(IReadOnlyDictionary<string, string>? values)
    {
        if (values is null || values.Count == 0)
        {
            return null;
        }

        var clone = new Dictionary<string, string>(values.Count, StringComparer.OrdinalIgnoreCase);

        // Assign through the indexer so keys that differ only by case collapse (last one wins)
        // instead of throwing a duplicate-key ArgumentException.
        foreach (var (key, value) in values)
        {
            clone[key] = value;
        }

        return clone;
    }

    /// <summary>
    /// Merges GUIDs into the array stored under <paramref name="field"/> and rewrites the field
    /// as a sorted array of "D"-formatted strings.
    /// </summary>
    /// <param name="cursor">Cursor document to update in place.</param>
    /// <param name="field">Name of the array field.</param>
    /// <param name="additions">Identifiers to merge; <see cref="Guid.Empty"/> is ignored.</param>
    /// <returns>The identifiers that were not already present. Empty when there is nothing to add, in which case the cursor is left untouched.</returns>
    private static IReadOnlyCollection<Guid> MergeGuidArray(BsonDocument cursor, string field, IReadOnlyCollection<Guid> additions)
    {
        if (additions.Count == 0)
        {
            return Array.Empty<Guid>();
        }

        // Existing entries that do not parse as a GUID are dropped when the field is rewritten.
        var existing = cursor.TryGetValue(field, out var value) && value is BsonArray existingArray
            ? existingArray.Select(AsGuid).Where(static g => g != Guid.Empty).ToHashSet()
            : new HashSet<Guid>();

        var newlyAdded = new List<Guid>();
        foreach (var guid in additions)
        {
            if (guid == Guid.Empty)
            {
                continue;
            }

            if (existing.Add(guid))
            {
                newlyAdded.Add(guid);
            }
        }

        if (existing.Count > 0)
        {
            cursor[field] = new BsonArray(existing
                .Select(static g => g.ToString("D"))
                .OrderBy(static s => s, StringComparer.OrdinalIgnoreCase));
        }

        return newlyAdded.AsReadOnly();
    }

    /// <summary>
    /// Merges strings (case-insensitively) into the array stored under <paramref name="field"/>
    /// and rewrites the field as a sorted array.
    /// </summary>
    /// <param name="cursor">Cursor document to update in place.</param>
    /// <param name="field">Name of the array field.</param>
    /// <param name="additions">Values to merge; blank values are ignored and the rest are trimmed.</param>
    /// <returns>The trimmed values that were not already present. Empty when there is nothing to add, in which case the cursor is left untouched.</returns>
    private static IReadOnlyCollection<string> MergeStringArray(BsonDocument cursor, string field, IReadOnlyCollection<string> additions)
    {
        if (additions.Count == 0)
        {
            return Array.Empty<string>();
        }

        // Only BsonString elements are read; BsonValue.AsString would throw InvalidCastException
        // on anything else (e.g. BsonNull), so non-string entries are skipped instead.
        var existing = cursor.TryGetValue(field, out var value) && value is BsonArray existingArray
            ? existingArray.Select(static v => v is BsonString text ? text.Value : string.Empty)
                .Where(static s => !string.IsNullOrWhiteSpace(s))
                .ToHashSet(StringComparer.OrdinalIgnoreCase)
            : new HashSet<string>(StringComparer.OrdinalIgnoreCase);

        var newlyAdded = new List<string>();
        foreach (var entry in additions)
        {
            if (string.IsNullOrWhiteSpace(entry))
            {
                continue;
            }

            var normalized = entry.Trim();
            if (existing.Add(normalized))
            {
                newlyAdded.Add(normalized);
            }
        }

        if (existing.Count > 0)
        {
            cursor[field] = new BsonArray(existing
                .OrderBy(static s => s, StringComparer.OrdinalIgnoreCase));
        }

        return newlyAdded.AsReadOnly();
    }

    /// <summary>
    /// Parses a BSON value's string form as a GUID.
    /// </summary>
    /// <param name="value">Value to parse; may be <c>null</c>.</param>
    /// <returns>The parsed GUID, or <see cref="Guid.Empty"/> when the value is <c>null</c> or its string form is not a GUID.</returns>
    private static Guid AsGuid(BsonValue value)
    {
        if (value is null)
        {
            return Guid.Empty;
        }

        return Guid.TryParse(value.ToString(), out var parsed) ? parsed : Guid.Empty;
    }

    /// <summary>
    /// Adds every non-empty GUID from <paramref name="values"/> to <paramref name="target"/>.
    /// </summary>
    /// <param name="target">Set to add to.</param>
    /// <param name="values">Values to add; <c>null</c> is treated as empty.</param>
    private static void AppendRange(HashSet<Guid> target, IReadOnlyCollection<Guid>? values)
    {
        if (values is null)
        {
            return;
        }

        foreach (var guid in values)
        {
            if (guid != Guid.Empty)
            {
                target.Add(guid);
            }
        }
    }

    /// <summary>
    /// Adds every non-blank string from <paramref name="values"/> to <paramref name="target"/>, trimmed.
    /// </summary>
    /// <param name="target">Set to add to.</param>
    /// <param name="values">Values to add; <c>null</c> is treated as empty.</param>
    private static void AppendRange(HashSet<string> target, IReadOnlyCollection<string>? values)
    {
        if (values is null)
        {
            return;
        }

        foreach (var value in values)
        {
            if (string.IsNullOrWhiteSpace(value))
            {
                continue;
            }

            target.Add(value.Trim());
        }
    }
}