using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Feedser.Source.CertFr.Configuration;
using StellaOps.Feedser.Source.CertFr.Internal;
using StellaOps.Feedser.Source.Common;
using StellaOps.Feedser.Source.Common.Fetch;
using StellaOps.Feedser.Storage.Mongo;
using StellaOps.Feedser.Storage.Mongo.Advisories;
using StellaOps.Feedser.Storage.Mongo.Documents;
using StellaOps.Feedser.Storage.Mongo.Dtos;
using StellaOps.Plugin;

namespace StellaOps.Feedser.Source.CertFr;

/// <summary>
/// Feed connector for the Cert-FR source. Work is split into three stages that hand off
/// to each other through a persisted <see cref="CertFrCursor"/>:
/// <see cref="FetchAsync"/> queues fetched documents, <see cref="ParseAsync"/> turns them
/// into DTO records, and <see cref="MapAsync"/> turns DTO records into advisories.
/// </summary>
public sealed class CertFrConnector : IFeedConnector
{
    /// <summary>Schema version stamped on every DTO record written by <see cref="ParseAsync"/>.</summary>
    private const string DtoSchemaVersion = "certfr.detail.v1";

    private static readonly JsonSerializerOptions SerializerOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
    };

    private readonly CertFrFeedClient _feedClient;
    private readonly SourceFetchService _fetchService;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly IDocumentStore _documentStore;
    private readonly IDtoStore _dtoStore;
    private readonly IAdvisoryStore _advisoryStore;
    private readonly ISourceStateRepository _stateRepository;
    private readonly CertFrOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<CertFrConnector> _logger;

    /// <summary>
    /// Creates the connector and validates its options via <c>CertFrOptions.Validate()</c>.
    /// </summary>
    /// <param name="feedClient">Client used to load the feed item list for a time window.</param>
    /// <param name="fetchService">Service used to fetch each item's detail document.</param>
    /// <param name="rawDocumentStorage">Storage the fetched document payload is downloaded from.</param>
    /// <param name="documentStore">Store of document records and their status.</param>
    /// <param name="dtoStore">Store of parsed DTO records.</param>
    /// <param name="advisoryStore">Store mapped advisories are upserted into.</param>
    /// <param name="stateRepository">Repository holding the cursor and failure state for this source.</param>
    /// <param name="options">Connector options; both the wrapper and its <c>Value</c> must be non-null.</param>
    /// <param name="timeProvider">Clock; <see cref="TimeProvider.System"/> is used when <c>null</c>.</param>
    /// <param name="logger">Logger for this connector.</param>
    /// <exception cref="ArgumentNullException">
    /// Any argument other than <paramref name="timeProvider"/> is <c>null</c>, or
    /// <paramref name="options"/> carries a <c>null</c> value.
    /// </exception>
    public CertFrConnector(
        CertFrFeedClient feedClient,
        SourceFetchService fetchService,
        RawDocumentStorage rawDocumentStorage,
        IDocumentStore documentStore,
        IDtoStore dtoStore,
        IAdvisoryStore advisoryStore,
        ISourceStateRepository stateRepository,
        IOptions<CertFrOptions> options,
        TimeProvider? timeProvider,
        ILogger<CertFrConnector> logger)
    {
        _feedClient = feedClient ?? throw new ArgumentNullException(nameof(feedClient));
        _fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
        _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value
            ?? throw new ArgumentNullException(nameof(options));
        _options.Validate();
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Source name under which documents, cursor and failures are recorded.</summary>
    public string SourceName => CertFrConnectorPlugin.SourceName;

    /// <summary>
    /// Loads the feed for the current window, fetches every item's detail document and
    /// queues the ids of new or changed documents on the cursor for <see cref="ParseAsync"/>.
    /// </summary>
    /// <remarks>
    /// The window ends at the current time. It starts at the cursor's last-published value
    /// minus <c>WindowOverlap</c>, but never earlier than now minus <c>InitialBackfill</c>.
    /// Any failure other than caller cancellation is logged, recorded through
    /// <c>MarkFailureAsync</c> and rethrown; in that case the cursor is not updated.
    /// </remarks>
    /// <param name="services">Not used by this connector.</param>
    /// <param name="cancellationToken">Token observed between items and passed to every awaited call.</param>
    public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);

        var now = _timeProvider.GetUtcNow();
        var windowEnd = now;
        var earliestStart = now - _options.InitialBackfill;
        var lastPublished = cursor.LastPublished ?? earliestStart;
        var windowStart = lastPublished - _options.WindowOverlap;
        if (windowStart < earliestStart)
        {
            windowStart = earliestStart;
        }

        IReadOnlyList<CertFrFeedItem> items;
        try
        {
            items = await _feedClient.LoadAsync(windowStart, windowEnd, cancellationToken).ConfigureAwait(false);
        }
        // Caller-requested cancellation is not a source failure: let it propagate untouched
        // instead of logging an error and writing failure state with a cancelled token.
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            _logger.LogError(ex, "Cert-FR feed load failed {Start:o}-{End:o}", windowStart, windowEnd);
            await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(10), ex.Message, cancellationToken).ConfigureAwait(false);
            throw;
        }

        if (items.Count == 0)
        {
            // Nothing published in the window: move the watermark to the window end.
            await UpdateCursorAsync(cursor.WithLastPublished(windowEnd), cancellationToken).ConfigureAwait(false);
            return;
        }

        var pendingDocuments = cursor.PendingDocuments.ToList();
        var pendingMappings = cursor.PendingMappings.ToList();
        var maxPublished = cursor.LastPublished ?? DateTimeOffset.MinValue;

        foreach (var item in items)
        {
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, item.DetailUri.ToString(), cancellationToken).ConfigureAwait(false);

                // Pass along the validators of the previously stored document, if any.
                var request = new SourceFetchRequest(CertFrOptions.HttpClientName, SourceName, item.DetailUri)
                {
                    Metadata = CertFrDocumentMetadata.CreateMetadata(item),
                    ETag = existing?.Etag,
                    LastModified = existing?.LastModified,
                    AcceptHeaders = new[] { "text/html", "application/xhtml+xml", "text/plain;q=0.5" },
                };

                var result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);

                // Every outcome that does not throw advances the watermark, including
                // items skipped below (not modified, unsuccessful, or without a document).
                if (item.Published > maxPublished)
                {
                    maxPublished = item.Published;
                }

                if (result.IsNotModified || !result.IsSuccess || result.Document is null)
                {
                    continue;
                }

                var alreadyMapped =
                    existing is not null
                    && string.Equals(existing.Sha256, result.Document.Sha256, StringComparison.OrdinalIgnoreCase)
                    && string.Equals(existing.Status, DocumentStatuses.Mapped, StringComparison.Ordinal);
                if (alreadyMapped)
                {
                    // Same content as the already-mapped document: re-apply the previous
                    // status to the fetched record and do not queue it again.
                    await _documentStore.UpdateStatusAsync(result.Document.Id, existing!.Status, cancellationToken).ConfigureAwait(false);
                    continue;
                }

                if (!pendingDocuments.Contains(result.Document.Id))
                {
                    pendingDocuments.Add(result.Document.Id);
                }

                if (_options.RequestDelay > TimeSpan.Zero)
                {
                    await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
                }
            }
            // See the feed-load handler above: cancellation bypasses failure bookkeeping.
            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
            {
                _logger.LogError(ex, "Cert-FR fetch failed for {Uri}", item.DetailUri);
                await _stateRepository.MarkFailureAsync(SourceName, _timeProvider.GetUtcNow(), TimeSpan.FromMinutes(5), ex.Message, cancellationToken).ConfigureAwait(false);
                throw;
            }
        }

        if (maxPublished == DateTimeOffset.MinValue)
        {
            // The watermark never left its sentinel value; keep the previous one,
            // or fall back to the window end when there was none.
            maxPublished = cursor.LastPublished ?? windowEnd;
        }

        var updatedCursor = cursor
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings)
            .WithLastPublished(maxPublished);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Parses every document queued on the cursor into a <see cref="CertFrDto"/>, stores it
    /// as a DTO record and moves the document id from the pending-documents list to the
    /// pending-mappings list.
    /// </summary>
    /// <remarks>
    /// A document whose record no longer exists is dropped from both lists. A document with
    /// no GridFS payload, unreadable metadata, or a download/parse error is marked
    /// <c>DocumentStatuses.Failed</c> and dropped from both lists. The cursor is written
    /// once, after the loop completes.
    /// </remarks>
    /// <param name="services">Not used by this connector.</param>
    /// <param name="cancellationToken">Token observed between documents and passed to every awaited call.</param>
    public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingDocuments.Count == 0)
        {
            return;
        }

        // Working copies: the loop enumerates the cursor's own collection and only
        // mutates these lists.
        var pendingDocuments = cursor.PendingDocuments.ToList();
        var pendingMappings = cursor.PendingMappings.ToList();

        foreach (var documentId in cursor.PendingDocuments)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                pendingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                continue;
            }

            if (!document.GridFsId.HasValue)
            {
                _logger.LogWarning("Cert-FR document {DocumentId} missing GridFS payload", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                continue;
            }

            CertFrDocumentMetadata metadata;
            try
            {
                metadata = CertFrDocumentMetadata.FromDocument(document);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Cert-FR metadata parse failed for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                continue;
            }

            CertFrDto dto;
            try
            {
                var content = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
                var html = System.Text.Encoding.UTF8.GetString(content);
                dto = CertFrParser.Parse(html, metadata);
            }
            // A cancelled download is not a parse failure: do not mark the document Failed
            // because the caller asked to stop; let the cancellation propagate.
            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
            {
                _logger.LogError(ex, "Cert-FR parse failed for advisory {AdvisoryId} ({Uri})", metadata.AdvisoryId, document.Uri);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                continue;
            }

            // The DTO is serialized to JSON and re-parsed as BSON for the record payload.
            var json = JsonSerializer.Serialize(dto, SerializerOptions);
            var payload = BsonDocument.Parse(json);
            var validatedAt = _timeProvider.GetUtcNow();

            // Reuse the existing record for this document when there is one.
            var existingDto = await _dtoStore.FindByDocumentIdAsync(document.Id, cancellationToken).ConfigureAwait(false);
            var dtoRecord = existingDto is null
                ? new DtoRecord(Guid.NewGuid(), document.Id, SourceName, DtoSchemaVersion, payload, validatedAt)
                : existingDto with
                {
                    Payload = payload,
                    SchemaVersion = DtoSchemaVersion,
                    ValidatedAt = validatedAt,
                };

            await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);

            pendingDocuments.Remove(documentId);
            if (!pendingMappings.Contains(documentId))
            {
                pendingMappings.Add(documentId);
            }
        }

        var updatedCursor = cursor
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Maps every DTO record queued on the cursor to an advisory, upserts it and marks the
    /// document <c>DocumentStatuses.Mapped</c>.
    /// </summary>
    /// <remarks>
    /// An id whose DTO record or document record is missing is dropped from the queue.
    /// A payload that fails to deserialize, or deserializes to <c>null</c>, marks the
    /// document <c>DocumentStatuses.Failed</c> and drops it from the queue. The cursor is
    /// written once, after the loop completes.
    /// </remarks>
    /// <param name="services">Not used by this connector.</param>
    /// <param name="cancellationToken">Token observed between documents and passed to every awaited call.</param>
    public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingMappings.Count == 0)
        {
            return;
        }

        // Working copy: the loop enumerates the cursor's own collection.
        var pendingMappings = cursor.PendingMappings.ToList();

        foreach (var documentId in cursor.PendingMappings)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (dtoRecord is null || document is null)
            {
                pendingMappings.Remove(documentId);
                continue;
            }

            CertFrDto? dto;
            try
            {
                var json = dtoRecord.Payload.ToJson();
                dto = JsonSerializer.Deserialize<CertFrDto>(json, SerializerOptions);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Cert-FR DTO deserialization failed for document {DocumentId}", documentId);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                continue;
            }

            if (dto is null)
            {
                _logger.LogWarning("Cert-FR DTO payload deserialized as null for document {DocumentId}", documentId);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                continue;
            }

            var mappedAt = _timeProvider.GetUtcNow();
            var advisory = CertFrMapper.Map(dto, SourceName, mappedAt);
            await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);

            pendingMappings.Remove(documentId);
        }

        var updatedCursor = cursor.WithPendingMappings(pendingMappings);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Reads the stored state for this source and converts its cursor document with
    /// <c>CertFrCursor.FromBson</c>; a missing state record passes <c>null</c> to it.
    /// </summary>
    private async Task<CertFrCursor> GetCursorAsync(CancellationToken cancellationToken)
    {
        var record = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
        return CertFrCursor.FromBson(record?.Cursor);
    }

    /// <summary>Persists <paramref name="cursor"/> for this source, stamped with the current time.</summary>
    private async Task UpdateCursorAsync(CertFrCursor cursor, CancellationToken cancellationToken)
    {
        var completedAt = _timeProvider.GetUtcNow();
        await _stateRepository.UpdateCursorAsync(SourceName, cursor.ToBsonDocument(), completedAt, cancellationToken).ConfigureAwait(false);
    }
}