using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Feedser.Source.CertBund.Configuration;
using StellaOps.Feedser.Source.CertBund.Internal;
using StellaOps.Feedser.Source.Common;
using StellaOps.Feedser.Source.Common.Fetch;
using StellaOps.Feedser.Source.Common.Html;
using StellaOps.Feedser.Storage.Mongo;
using StellaOps.Feedser.Storage.Mongo.Advisories;
using StellaOps.Feedser.Storage.Mongo.Documents;
using StellaOps.Feedser.Storage.Mongo.Dtos;
using StellaOps.Plugin;

namespace StellaOps.Feedser.Source.CertBund;

/// <summary>
/// Feed connector for CERT-Bund advisories. Work is split into three stages whose progress is
/// persisted in a <see cref="CertBundCursor"/>:
/// <see cref="FetchAsync"/> downloads advisory detail documents listed in the feed,
/// <see cref="ParseAsync"/> converts raw documents into DTOs, and
/// <see cref="MapAsync"/> maps DTOs into advisories.
/// </summary>
public sealed class CertBundConnector : IFeedConnector
{
    // Document metadata key holding the advisory's portal URI; ParseAsync falls back to the document URI when absent.
    private const string PortalUriMetadataKey = "certbund.portalUri";

    // Web defaults (camelCase) plus case-insensitive reads; used both to build the stored DTO and to read it back.
    private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
    {
        PropertyNameCaseInsensitive = true,
        DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
    };

    private readonly CertBundFeedClient _feedClient;
    private readonly CertBundDetailParser _detailParser;
    private readonly SourceFetchService _fetchService;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly IDocumentStore _documentStore;
    private readonly IDtoStore _dtoStore;
    private readonly IAdvisoryStore _advisoryStore;
    private readonly ISourceStateRepository _stateRepository;
    private readonly CertBundOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly CertBundDiagnostics _diagnostics;
    private readonly ILogger<CertBundConnector> _logger;

    /// <summary>
    /// Creates the connector. The supplied options are validated eagerly via <c>CertBundOptions.Validate()</c>.
    /// </summary>
    /// <param name="timeProvider">Clock source; <see cref="TimeProvider.System"/> is used when null.</param>
    /// <exception cref="ArgumentNullException">Any dependency other than <paramref name="timeProvider"/> is null,
    /// or <paramref name="options"/> yields a null value.</exception>
    public CertBundConnector(
        CertBundFeedClient feedClient,
        CertBundDetailParser detailParser,
        SourceFetchService fetchService,
        RawDocumentStorage rawDocumentStorage,
        IDocumentStore documentStore,
        IDtoStore dtoStore,
        IAdvisoryStore advisoryStore,
        ISourceStateRepository stateRepository,
        IOptions<CertBundOptions> options,
        CertBundDiagnostics diagnostics,
        TimeProvider? timeProvider,
        ILogger<CertBundConnector> logger)
    {
        _feedClient = feedClient ?? throw new ArgumentNullException(nameof(feedClient));
        _detailParser = detailParser ?? throw new ArgumentNullException(nameof(detailParser));
        _fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
        _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value
            ?? throw new ArgumentNullException(nameof(options));
        _options.Validate();
        _diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    public string SourceName => CertBundConnectorPlugin.SourceName;

    /// <summary>
    /// Loads the CERT-Bund feed and fetches detail documents for advisories not yet recorded as known,
    /// queuing each newly fetched document for parsing. At most <c>MaxAdvisoriesPerFetch</c> details are
    /// fetched per call.
    /// </summary>
    /// <param name="services">Host service provider; only null-checked here.</param>
    /// <param name="cancellationToken">Observed between feed items and passed to every I/O call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
    /// <remarks>
    /// A feed or detail fetch error (other than cancellation of <paramref name="cancellationToken"/>) is logged,
    /// recorded as a source failure with <c>FailureBackoff</c>, and rethrown.
    /// </remarks>
    public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        var now = _timeProvider.GetUtcNow();

        IReadOnlyList<CertBundFeedItem> feedItems;
        _diagnostics.FeedFetchAttempt();
        try
        {
            feedItems = await _feedClient.LoadAsync(cancellationToken).ConfigureAwait(false);
            _diagnostics.FeedFetchSuccess(feedItems.Count);
        }
        catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
        {
            _logger.LogError(ex, "CERT-Bund feed fetch failed");
            _diagnostics.FeedFetchFailure();
            await _stateRepository.MarkFailureAsync(SourceName, now, _options.FailureBackoff, ex.Message, cancellationToken).ConfigureAwait(false);
            throw;
        }

        var coverageDays = CalculateCoverageDays(feedItems, now);
        _diagnostics.RecordFeedCoverage(coverageDays);

        if (feedItems.Count == 0)
        {
            await UpdateCursorAsync(cursor.WithLastFetch(now), cancellationToken).ConfigureAwait(false);
            return;
        }

        var pendingDocuments = cursor.PendingDocuments.ToHashSet();
        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var knownAdvisories = new HashSet<string>(cursor.KnownAdvisories, StringComparer.OrdinalIgnoreCase);

        var processed = 0;
        var alreadyKnown = 0;
        var notModified = 0;
        var detailFailures = 0;
        var truncated = false;
        var latestPublished = cursor.LastPublished ?? DateTimeOffset.MinValue;

        // Newest first, so hitting MaxAdvisoriesPerFetch defers the oldest unknown advisories to a later cycle.
        foreach (var item in feedItems.OrderByDescending(static i => i.Published))
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (knownAdvisories.Contains(item.AdvisoryId))
            {
                alreadyKnown++;
                continue;
            }

            if (processed >= _options.MaxAdvisoriesPerFetch)
            {
                truncated = true;
                break;
            }

            try
            {
                _diagnostics.DetailFetchAttempt();

                // Reuse validators from a previously stored copy so the fetch can be conditional.
                var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, item.DetailUri.ToString(), cancellationToken).ConfigureAwait(false);
                var request = new SourceFetchRequest(CertBundOptions.HttpClientName, SourceName, item.DetailUri)
                {
                    AcceptHeaders = new[] { "application/json", "text/json" },
                    Metadata = CertBundDocumentMetadata.CreateMetadata(item),
                    ETag = existing?.Etag,
                    LastModified = existing?.LastModified,
                    TimeoutOverride = _options.RequestTimeout,
                };

                var result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
                if (result.IsNotModified)
                {
                    _diagnostics.DetailFetchNotModified();
                    notModified++;
                    knownAdvisories.Add(item.AdvisoryId);
                    continue;
                }

                if (!result.IsSuccess || result.Document is null)
                {
                    _diagnostics.DetailFetchFailure("skipped");
                    detailFailures++;
                    continue;
                }

                _diagnostics.DetailFetchSuccess();
                pendingDocuments.Add(result.Document.Id);
                // A freshly fetched document must be re-parsed before it can be mapped again.
                pendingMappings.Remove(result.Document.Id);
                knownAdvisories.Add(item.AdvisoryId);
                processed++;

                if (_options.RequestDelay > TimeSpan.Zero)
                {
                    await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
            {
                _logger.LogError(ex, "CERT-Bund detail fetch failed for {AdvisoryId}", item.AdvisoryId);
                _diagnostics.DetailFetchFailure("exception");
                // NOTE(review): the cursor is not persisted before rethrowing, so documents enqueued earlier in
                // this cycle are not recorded as pending; confirm a later cycle (which may receive 304) recovers them.
                await _stateRepository.MarkFailureAsync(SourceName, now, _options.FailureBackoff, ex.Message, cancellationToken).ConfigureAwait(false);
                throw;
            }

            // Only reached for successfully enqueued items; skipped/not-modified items `continue` above.
            if (item.Published > latestPublished)
            {
                latestPublished = item.Published;
            }
        }

        _diagnostics.DetailFetchEnqueued(processed);

        // The feed is non-empty here (the empty case returned earlier), so a cycle summary is always logged.
        _logger.LogInformation(
            "CERT-Bund fetch cycle: feed items {FeedItems}, enqueued {Enqueued}, already known {Known}, not modified {NotModified}, detail failures {DetailFailures}, pending documents {PendingDocuments}, pending mappings {PendingMappings}, truncated {Truncated}, coverageDays={CoverageDays}",
            feedItems.Count,
            processed,
            alreadyKnown,
            notModified,
            detailFailures,
            pendingDocuments.Count,
            pendingMappings.Count,
            truncated,
            coverageDays ?? double.NaN);

        // Cap the known-advisory set, keeping the IDs that sort last (ordinal, case-insensitive).
        // NOTE(review): this only retains the most recent advisories if IDs sort chronologically — confirm ID format.
        var trimmedKnown = knownAdvisories.Count > _options.MaxKnownAdvisories
            ? knownAdvisories
                .OrderByDescending(id => id, StringComparer.OrdinalIgnoreCase)
                .Take(_options.MaxKnownAdvisories)
                .ToArray()
            : knownAdvisories.ToArray();

        var updatedCursor = cursor
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings)
            .WithKnownAdvisories(trimmedKnown)
            .WithLastPublished(latestPublished == DateTimeOffset.MinValue ? cursor.LastPublished : latestPublished)
            .WithLastFetch(now);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Parses every pending document: downloads its raw payload, runs the detail parser, stores the resulting
    /// DTO (schema <c>cert-bund.detail.v1</c>) and moves the document to the pending-mapping queue.
    /// </summary>
    /// <param name="services">Host service provider; only null-checked here.</param>
    /// <param name="cancellationToken">Observed between documents and passed to every I/O call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
    /// <remarks>
    /// Documents without a stored payload or whose payload fails to parse are marked Failed and dropped from
    /// both queues. A payload download error is logged and rethrown without updating the cursor.
    /// </remarks>
    public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingDocuments.Count == 0)
        {
            return;
        }

        var remainingDocuments = cursor.PendingDocuments.ToHashSet();
        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var now = _timeProvider.GetUtcNow();
        var parsedCount = 0;
        var failedCount = 0;

        foreach (var documentId in cursor.PendingDocuments)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                remainingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                continue;
            }

            if (!document.GridFsId.HasValue)
            {
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                remainingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                _diagnostics.ParseFailure("missing_payload");
                failedCount++;
                continue;
            }

            byte[] payload;
            try
            {
                payload = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
            {
                _logger.LogError(ex, "CERT-Bund unable to download document {DocumentId}", document.Id);
                _diagnostics.ParseFailure("download_failed");
                throw;
            }

            CertBundAdvisoryDto dto;
            try
            {
                // Fall back to the document URI when portal metadata is missing or blank. A dictionary indexer
                // would throw on a missing key and wrongly fail the document as a parse error.
                var portalUri = document.Metadata is not null
                    && document.Metadata.TryGetValue(PortalUriMetadataKey, out var portalValue)
                    && !string.IsNullOrWhiteSpace(portalValue)
                        ? portalValue
                        : document.Uri;

                dto = _detailParser.Parse(new Uri(document.Uri), new Uri(portalUri), payload);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "CERT-Bund failed to parse advisory detail {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                remainingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                _diagnostics.ParseFailure("parse_error");
                failedCount++;
                continue;
            }

            _diagnostics.ParseSuccess(dto.Products.Count, dto.CveIds.Count);
            parsedCount++;

            // Round-trip through JSON so the stored BSON uses the same property names MapAsync reads back.
            var bson = BsonDocument.Parse(JsonSerializer.Serialize(dto, SerializerOptions));
            var dtoRecord = new DtoRecord(Guid.NewGuid(), document.Id, SourceName, "cert-bund.detail.v1", bson, now);
            await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);

            remainingDocuments.Remove(documentId);
            pendingMappings.Add(document.Id);
        }

        _logger.LogInformation(
            "CERT-Bund parse cycle: parsed {Parsed}, failures {Failures}, remaining documents {RemainingDocuments}, pending mappings {PendingMappings}",
            parsedCount,
            failedCount,
            remainingDocuments.Count,
            pendingMappings.Count);

        var updatedCursor = cursor
            .WithPendingDocuments(remainingDocuments)
            .WithPendingMappings(pendingMappings);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Maps every document awaiting mapping: loads its stored DTO, converts it with <c>CertBundMapper.Map</c>,
    /// upserts the advisory and marks the document Mapped.
    /// </summary>
    /// <param name="services">Host service provider; only null-checked here.</param>
    /// <param name="cancellationToken">Observed between documents and passed to every I/O call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
    /// <remarks>
    /// A document whose DTO is missing, cannot be deserialized, deserializes to null, or fails mapping is marked
    /// Failed. Every processed document leaves the pending-mapping queue, whether it succeeded or failed.
    /// </remarks>
    public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingMappings.Count == 0)
        {
            return;
        }

        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var mappedCount = 0;
        var failedCount = 0;

        foreach (var documentId in cursor.PendingMappings)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                pendingMappings.Remove(documentId);
                continue;
            }

            var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (dtoRecord is null)
            {
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("missing_dto");
                failedCount++;
                continue;
            }

            CertBundAdvisoryDto? dto;
            try
            {
                dto = JsonSerializer.Deserialize<CertBundAdvisoryDto>(dtoRecord.Payload.ToJson(), SerializerOptions);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "CERT-Bund failed to deserialize DTO for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("deserialize_failed");
                failedCount++;
                continue;
            }

            if (dto is null)
            {
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("null_dto");
                failedCount++;
                continue;
            }

            try
            {
                var advisory = CertBundMapper.Map(dto, document, dtoRecord.ValidatedAt);
                await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapSuccess(advisory.AffectedPackages.Length, advisory.Aliases.Length);
                mappedCount++;
            }
            catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
            {
                _logger.LogError(ex, "CERT-Bund mapping failed for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("exception");
                failedCount++;
            }
        }

        _logger.LogInformation(
            "CERT-Bund map cycle: mapped {Mapped}, failures {Failures}, remaining pending mappings {PendingMappings}",
            mappedCount,
            failedCount,
            pendingMappings.Count);

        var updatedCursor = cursor.WithPendingMappings(pendingMappings);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Returns the number of days between the oldest feed item's publish time and <paramref name="fetchedAt"/>.
    /// </summary>
    /// <returns>
    /// Null when the feed is empty, the oldest item's timestamp is <see cref="DateTimeOffset.MinValue"/>
    /// (presumably "unknown"), or the oldest item lies in the future relative to <paramref name="fetchedAt"/>.
    /// </returns>
    private static double? CalculateCoverageDays(IReadOnlyList<CertBundFeedItem> items, DateTimeOffset fetchedAt)
    {
        if (items is null || items.Count == 0)
        {
            return null;
        }

        var oldest = items.Min(static item => item.Published);
        if (oldest == DateTimeOffset.MinValue)
        {
            return null;
        }

        var span = fetchedAt - oldest;
        return span >= TimeSpan.Zero ? span.TotalDays : null;
    }

    /// <summary>
    /// True when <paramref name="exception"/> is the cooperative cancellation requested through
    /// <paramref name="cancellationToken"/>. Such exceptions must propagate without being logged or recorded
    /// as source failures. Timeouts that surface as <see cref="OperationCanceledException"/> while the token is
    /// still live are not matched.
    /// </summary>
    private static bool IsCancellation(Exception exception, CancellationToken cancellationToken)
        => exception is OperationCanceledException && cancellationToken.IsCancellationRequested;

    /// <summary>
    /// Loads this source's cursor, or <see cref="CertBundCursor.Empty"/> when no state has been stored yet.
    /// </summary>
    private async Task<CertBundCursor> GetCursorAsync(CancellationToken cancellationToken)
    {
        var state = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
        return state is null ? CertBundCursor.Empty : CertBundCursor.FromBson(state.Cursor);
    }

    /// <summary>
    /// Persists <paramref name="cursor"/>. The completion time is the cursor's last fetch time when set,
    /// otherwise the current time.
    /// </summary>
    private Task UpdateCursorAsync(CertBundCursor cursor, CancellationToken cancellationToken)
    {
        var document = cursor.ToBsonDocument();
        var completedAt = cursor.LastFetchAt ?? _timeProvider.GetUtcNow();
        return _stateRepository.UpdateCursorAsync(SourceName, document, completedAt, cancellationToken);
    }
}