using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Feedser.Models;
using StellaOps.Feedser.Source.CertCc.Configuration;
using StellaOps.Feedser.Source.CertCc.Internal;
using StellaOps.Feedser.Source.Common;
using StellaOps.Feedser.Source.Common.Fetch;
using StellaOps.Feedser.Storage.Mongo;
using StellaOps.Feedser.Storage.Mongo.Advisories;
using StellaOps.Feedser.Storage.Mongo.Documents;
using StellaOps.Feedser.Storage.Mongo.Dtos;
using StellaOps.Plugin;

namespace StellaOps.Feedser.Source.CertCc;

/// <summary>
/// Feed connector for the CERT/CC source. Work is split into three stages:
/// <see cref="FetchAsync"/> downloads summary documents and the per-note detail documents,
/// <see cref="ParseAsync"/> combines the detail documents of a note into one DTO record, and
/// <see cref="MapAsync"/> converts stored DTO records into advisories. Progress between stages
/// is carried in a cursor that is read from and written to the source state repository.
/// </summary>
// NOTE(review): generic type arguments appear to be missing throughout this copy of the file
// (for example "IOptions options", "new HashSet(...)", "new Dictionary(...)", "Array.Empty()",
// "JsonSerializer.Deserialize(json, ...)", "Task> ReadSummaryNotesAsync", and several
// "async Task" methods that return a value). This looks like an extraction artifact; the code
// is left untouched here - confirm against the authoritative copy before building.
public sealed class CertCcConnector : IFeedConnector
{
    // Serializer settings used both when ParseAsync writes the DTO to JSON and when MapAsync
    // reads it back, so the two stages agree on the payload shape.
    private static readonly JsonSerializerOptions DtoSerializerOptions = new(JsonSerializerDefaults.Web)
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        WriteIndented = false,
    };

    // UTF-8 bytes of "[]"; ParseAsync passes this to the parser in place of an optional detail
    // document (vendors / vuls / vendors-vuls) that is absent from a note group.
    private static readonly byte[] EmptyArrayPayload = Encoding.UTF8.GetBytes("[]");

    // Endpoint keys fetched for every note, in this order. BuildDetailUri maps each key to a
    // URL suffix and NoteDocumentGroup.Add maps each key to a slot.
    private static readonly string[] DetailEndpoints = { "note", "vendors", "vuls", "vendors-vuls" };

    private readonly CertCcSummaryPlanner _summaryPlanner;
    private readonly SourceFetchService _fetchService;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly IDocumentStore _documentStore;
    private readonly IDtoStore _dtoStore;
    private readonly IAdvisoryStore _advisoryStore;
    private readonly ISourceStateRepository _stateRepository;
    private readonly CertCcOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger _logger;
    private readonly CertCcDiagnostics _diagnostics;

    /// <summary>
    /// Creates the connector and captures its collaborators.
    /// </summary>
    /// <param name="timeProvider">Clock to use; when null, <see cref="TimeProvider.System"/> is used.</param>
    /// <exception cref="ArgumentNullException">
    /// Any argument other than <paramref name="timeProvider"/> is null, or the options wrapper's Value is null.
    /// </exception>
    /// <remarks>The resolved options instance has its Validate() method called before the constructor returns.</remarks>
    public CertCcConnector(
        CertCcSummaryPlanner summaryPlanner,
        SourceFetchService fetchService,
        RawDocumentStorage rawDocumentStorage,
        IDocumentStore documentStore,
        IDtoStore dtoStore,
        IAdvisoryStore advisoryStore,
        ISourceStateRepository stateRepository,
        IOptions options,
        CertCcDiagnostics diagnostics,
        TimeProvider? timeProvider,
        ILogger logger)
    {
        _summaryPlanner = summaryPlanner ?? throw new ArgumentNullException(nameof(summaryPlanner));
        _fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
        _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        // Both a null wrapper and a null Value are reported against the "options" parameter.
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
        _options.Validate();
        _diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Source identifier, taken from <c>CertCcConnectorPlugin.SourceName</c>.</summary>
    public string SourceName => CertCcConnectorPlugin.SourceName;

    /// <summary>
    /// Fetch stage. First retries notes that the cursor lists as pending, then walks the summary
    /// requests produced by the planner and fetches the detail bundle of each note they mention.
    /// The number of note bundles fetched in one call is bounded by the MaxNotesPerFetch option.
    /// </summary>
    /// <param name="services">Not referenced by this method.</param>
    /// <param name="cancellationToken">Flowed to every store, fetch and state call.</param>
    /// <remarks>
    /// On normal completion the cursor is saved with the planner's next summary state. When the
    /// summary loop throws, the cursor is saved with the current pending sets but without
    /// advancing the summary state, the failure is recorded on the state repository, and the
    /// exception is rethrown.
    /// </remarks>
    public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        var pendingDocuments = cursor.PendingDocuments.ToHashSet();
        // pendingMappings is not modified in this method; it is only carried into the saved cursor.
        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var pendingNotes = new HashSet(cursor.PendingNotes, StringComparer.OrdinalIgnoreCase);
        // Notes already handled during this call, so a note is not fetched twice when it is both
        // pending and listed in a summary (or listed in several summaries).
        var processedNotes = new HashSet(StringComparer.OrdinalIgnoreCase);
        var now = _timeProvider.GetUtcNow();
        var remainingBudget = _options.MaxNotesPerFetch;

        // Resume notes that previously failed before fetching new summaries.
        if (pendingNotes.Count > 0 && remainingBudget > 0)
        {
            // Snapshot: pendingNotes is mutated inside the loop.
            var replay = pendingNotes.ToArray();
            foreach (var noteId in replay)
            {
                if (remainingBudget <= 0)
                {
                    break;
                }

                try
                {
                    if (!processedNotes.Add(noteId))
                    {
                        continue;
                    }

                    // Every detail endpoint already has a pending document for this note: drop it
                    // from the retry set without refetching and without spending budget.
                    if (await HasPendingDocumentBundleAsync(noteId, pendingDocuments, cancellationToken).ConfigureAwait(false))
                    {
                        pendingNotes.Remove(noteId);
                        continue;
                    }

                    await FetchNoteBundleAsync(noteId, null, pendingDocuments, pendingNotes, cancellationToken).ConfigureAwait(false);
                    // FetchNoteBundleAsync removes the note from pendingNotes when it returns normally.
                    if (!pendingNotes.Contains(noteId))
                    {
                        remainingBudget--;
                    }
                }
                catch (Exception ex)
                {
                    // NOTE(review): unlike the summary-loop handler further down, this path does not
                    // save the cursor before rethrowing, so changes made to the pending sets during
                    // this call are not persisted - confirm this is intended.
                    // NOTE(review): the 5-minute TimeSpan is presumably a retry backoff - confirm
                    // against ISourceStateRepository.MarkFailureAsync.
                    await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(5), ex.Message, cancellationToken).ConfigureAwait(false);
                    throw;
                }
            }
        }

        var plan = _summaryPlanner.CreatePlan(cursor.SummaryState);
        _diagnostics.PlanEvaluated(plan.Window, plan.Requests.Count);

        try
        {
            foreach (var request in plan.Requests)
            {
                cancellationToken.ThrowIfCancellationRequested();
                // Evaluated before the fetch: with no budget left the summary is still requested and
                // its diagnostics recorded, but its notes are not read.
                var shouldProcessNotes = remainingBudget > 0;
                try
                {
                    _diagnostics.SummaryFetchAttempt(request.Scope);
                    var metadata = BuildSummaryMetadata(request);
                    // A previously stored document for the same URI supplies the ETag / LastModified
                    // values passed to the fetch request.
                    var existingSummary = await _documentStore.FindBySourceAndUriAsync(SourceName, request.Uri.ToString(), cancellationToken).ConfigureAwait(false);
                    var fetchRequest = new SourceFetchRequest(
                        CertCcOptions.HttpClientName,
                        SourceName,
                        HttpMethod.Get,
                        request.Uri,
                        metadata,
                        existingSummary?.Etag,
                        existingSummary?.LastModified,
                        null,
                        new[] { "application/json" });
                    var result = await _fetchService.FetchAsync(fetchRequest, cancellationToken).ConfigureAwait(false);
                    if (result.IsNotModified)
                    {
                        _diagnostics.SummaryFetchUnchanged(request.Scope);
                        continue;
                    }

                    // A non-success result is counted and skipped; it does not abort the run.
                    if (!result.IsSuccess || result.Document is null)
                    {
                        _diagnostics.SummaryFetchFailure(request.Scope);
                        continue;
                    }

                    _diagnostics.SummaryFetchSuccess(request.Scope);
                    if (!shouldProcessNotes)
                    {
                        continue;
                    }

                    var noteTokens = await ReadSummaryNotesAsync(result.Document, cancellationToken).ConfigureAwait(false);
                    foreach (var token in noteTokens)
                    {
                        if (remainingBudget <= 0)
                        {
                            break;
                        }

                        var noteId = TryNormalizeNoteToken(token, out var vuIdentifier);
                        if (string.IsNullOrEmpty(noteId))
                        {
                            continue;
                        }

                        if (!processedNotes.Add(noteId))
                        {
                            continue;
                        }

                        await FetchNoteBundleAsync(noteId, vuIdentifier, pendingDocuments, pendingNotes, cancellationToken).ConfigureAwait(false);
                        if (!pendingNotes.Contains(noteId))
                        {
                            remainingBudget--;
                        }
                    }
                }
                catch
                {
                    // Records a summary failure for any exception raised while handling this request,
                    // including one thrown after SummaryFetchSuccess was already recorded.
                    _diagnostics.SummaryFetchFailure(request.Scope);
                    throw;
                }
            }
        }
        catch (Exception ex)
        {
            // Save what was collected so far; the summary state is deliberately not advanced here
            // (no WithSummaryState call), in contrast to the success path below.
            var failureCursor = cursor
                .WithPendingSummaries(Array.Empty())
                .WithPendingNotes(pendingNotes)
                .WithPendingDocuments(pendingDocuments)
                .WithPendingMappings(pendingMappings)
                .WithLastRun(now);
            await UpdateCursorAsync(failureCursor, cancellationToken).ConfigureAwait(false);
            await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(5), ex.Message, cancellationToken).ConfigureAwait(false);
            throw;
        }

        var updatedCursor = cursor
            .WithSummaryState(plan.NextState)
            .WithPendingSummaries(Array.Empty())
            .WithPendingNotes(pendingNotes)
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings)
            .WithLastRun(now);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Reports whether the pending documents already contain, for the given note, one document
    /// for every entry in <c>DetailEndpoints</c>.
    /// </summary>
    /// <remarks>
    /// Each pending document is loaded from the document store and matched on its
    /// "certcc.noteId" metadata (case-insensitive). A matching document without
    /// "certcc.endpoint" metadata is counted as the "note" endpoint.
    /// </remarks>
    private async Task HasPendingDocumentBundleAsync(string noteId, HashSet pendingDocuments, CancellationToken cancellationToken)
    {
        if (pendingDocuments.Count == 0)
        {
            return false;
        }

        // Endpoints still unaccounted for; entries are removed as matching documents are found.
        var required = new HashSet(DetailEndpoints, StringComparer.OrdinalIgnoreCase);
        foreach (var documentId in pendingDocuments)
        {
            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document?.Metadata is null)
            {
                continue;
            }

            if (!document.Metadata.TryGetValue("certcc.noteId", out var metadataNoteId)
                || !string.Equals(metadataNoteId, noteId, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            var endpoint = document.Metadata.TryGetValue("certcc.endpoint", out var endpointValue)
                ? endpointValue
                : "note";
            required.Remove(endpoint);
            if (required.Count == 0)
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Parse stage. Groups the cursor's pending documents by note, parses each complete-enough
    /// group into a DTO, stores it, and queues the note document for mapping.
    /// </summary>
    /// <param name="services">Not referenced by this method.</param>
    /// <param name="cancellationToken">Checked once per document and once per group, and flowed to store calls.</param>
    /// <remarks>
    /// Returns without doing anything when the EnableDetailMapping option is off or when the
    /// cursor has no pending documents. A failure while parsing one group is logged and does not
    /// stop the remaining groups.
    /// </remarks>
    public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        if (!_options.EnableDetailMapping)
        {
            return;
        }

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingDocuments.Count == 0)
        {
            return;
        }

        var pendingDocuments = cursor.PendingDocuments.ToHashSet();
        var pendingMappings = cursor.PendingMappings.ToHashSet();
        // Note id (case-insensitive) -> the detail documents collected for that note.
        var groups = new Dictionary(StringComparer.OrdinalIgnoreCase);

        foreach (var documentId in cursor.PendingDocuments)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                // The document no longer exists in the store; stop tracking it.
                pendingDocuments.Remove(documentId);
                continue;
            }

            if (!TryGetMetadata(document, "certcc.noteId", out var noteId) || string.IsNullOrWhiteSpace(noteId))
            {
                // Without a note id the document cannot be grouped: mark it failed and drop it.
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingDocuments.Remove(documentId);
                continue;
            }

            var endpoint = TryGetMetadata(document, "certcc.endpoint", out var endpointValue)
                ? endpointValue
                : "note";
            var group = groups.TryGetValue(noteId, out var existing)
                ? existing
                : (groups[noteId] = new NoteDocumentGroup(noteId));
            group.Add(endpoint, document);
        }

        foreach (var group in groups.Values)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // A group without its "note" document is skipped; its documents stay in the pending set.
            if (group.Note is null)
            {
                continue;
            }

            try
            {
                var noteBytes = await DownloadDocumentAsync(group.Note, cancellationToken).ConfigureAwait(false);
                var vendorsBytes = group.Vendors is null
                    ? EmptyArrayPayload
                    : await DownloadDocumentAsync(group.Vendors, cancellationToken).ConfigureAwait(false);
                var vulsBytes = group.Vuls is null
                    ? EmptyArrayPayload
                    : await DownloadDocumentAsync(group.Vuls, cancellationToken).ConfigureAwait(false);
                var vendorStatusesBytes = group.VendorStatuses is null
                    ? EmptyArrayPayload
                    : await DownloadDocumentAsync(group.VendorStatuses, cancellationToken).ConfigureAwait(false);
                var dto = CertCcNoteParser.Parse(noteBytes, vendorsBytes, vulsBytes, vendorStatusesBytes);
                // The DTO is stored as BSON by round-tripping through its JSON form.
                var json = JsonSerializer.Serialize(dto, DtoSerializerOptions);
                var payload = MongoDB.Bson.BsonDocument.Parse(json);
                _diagnostics.ParseSuccess(
                    dto.Vendors.Count,
                    dto.VendorStatuses.Count,
                    dto.Vulnerabilities.Count);
                // The DTO record is keyed to the note document; MapAsync looks it up by that id.
                var dtoRecord = new DtoRecord(
                    Guid.NewGuid(),
                    group.Note.Id,
                    SourceName,
                    "certcc.vince.note.v1",
                    payload,
                    _timeProvider.GetUtcNow());
                await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
                await _documentStore.UpdateStatusAsync(group.Note.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
                pendingMappings.Add(group.Note.Id);
                pendingDocuments.Remove(group.Note.Id);
                // Auxiliary documents are folded into the note's DTO, so they are marked Mapped
                // directly and are not queued for the map stage themselves.
                if (group.Vendors is not null)
                {
                    await _documentStore.UpdateStatusAsync(group.Vendors.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
                    pendingDocuments.Remove(group.Vendors.Id);
                }

                if (group.Vuls is not null)
                {
                    await _documentStore.UpdateStatusAsync(group.Vuls.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
                    pendingDocuments.Remove(group.Vuls.Id);
                }

                if (group.VendorStatuses is not null)
                {
                    await _documentStore.UpdateStatusAsync(group.VendorStatuses.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
                    pendingDocuments.Remove(group.VendorStatuses.Id);
                }
            }
            catch (Exception ex)
            {
                _diagnostics.ParseFailure();
                _logger.LogError(ex, "CERT/CC parse failed for note {NoteId}", group.NoteId);
                // NOTE(review): only the note document is marked failed and removed here; the
                // group's vendors / vuls / vendors-vuls documents remain in the pending set and
                // will be skipped by the "Note is null" guard on later runs - confirm intended.
                // The null check below is already guaranteed by the guard before the try block.
                if (group.Note is not null)
                {
                    await _documentStore.UpdateStatusAsync(group.Note.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                    pendingDocuments.Remove(group.Note.Id);
                }
            }
        }

        var updatedCursor = cursor
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Map stage. For each document id queued in the cursor's pending mappings, loads the stored
    /// DTO, maps it to an advisory and upserts it.
    /// </summary>
    /// <param name="services">Not referenced by this method.</param>
    /// <param name="cancellationToken">Checked once per document and flowed to store calls.</param>
    /// <remarks>
    /// Returns without doing anything when the EnableDetailMapping option is off or when nothing
    /// is queued. Every processed id is removed from the pending set whether mapping succeeded
    /// or failed; a failure is logged and the document is marked Failed.
    /// </remarks>
    public async Task
        MapAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        if (!_options.EnableDetailMapping)
        {
            return;
        }

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingMappings.Count == 0)
        {
            return;
        }

        var pendingMappings = cursor.PendingMappings.ToHashSet();
        foreach (var documentId in cursor.PendingMappings)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (dtoRecord is null || document is null)
            {
                // Nothing to map from; drop the id without changing any document status.
                pendingMappings.Remove(documentId);
                continue;
            }

            try
            {
                var json = dtoRecord.Payload.ToJson();
                var dto = JsonSerializer.Deserialize(json, DtoSerializerOptions);
                if (dto is null)
                {
                    throw new InvalidOperationException($"CERT/CC DTO payload deserialized as null for document {documentId}.");
                }

                var advisory = CertCcMapper.Map(dto, document, dtoRecord, SourceName);
                // Counts are captured for diagnostics only.
                var affectedCount = advisory.AffectedPackages.Length;
                var normalizedRuleCount = advisory.AffectedPackages.Sum(static package => package.NormalizedVersions.Length);
                await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
                _diagnostics.MapSuccess(affectedCount, normalizedRuleCount);
            }
            catch (Exception ex)
            {
                _diagnostics.MapFailure();
                _logger.LogError(ex, "CERT/CC mapping failed for document {DocumentId}", documentId);
                await _documentStore.UpdateStatusAsync(documentId, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            }

            pendingMappings.Remove(documentId);
        }

        var updatedCursor = cursor.WithPendingMappings(pendingMappings);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Fetches every detail endpoint of one note and records the resulting document ids in
    /// <paramref name="pendingDocuments"/>.
    /// </summary>
    /// <param name="noteId">Note identifier used to build the endpoint URIs and metadata.</param>
    /// <param name="vuIdentifier">Optional identifier stored as "certcc.vuid" metadata; callers replaying pending notes pass null.</param>
    /// <param name="pendingDocuments">Receives the id of each fetched (or unchanged, already stored) document.</param>
    /// <param name="pendingNotes">The note is removed on normal completion and added when an exception escapes.</param>
    /// <param name="cancellationToken">Checked before each endpoint and flowed to fetch, store and delay calls.</param>
    /// <remarks>
    /// An <see cref="HttpRequestException"/> whose status satisfies <c>ShouldTreatAsMissing</c>
    /// is tolerated and the endpoint is skipped. Any other failure is logged, the note is added
    /// to <paramref name="pendingNotes"/>, and the exception is rethrown.
    /// </remarks>
    private async Task FetchNoteBundleAsync(
        string noteId,
        string?
        vuIdentifier,
        HashSet pendingDocuments,
        HashSet pendingNotes,
        CancellationToken cancellationToken)
    {
        // Endpoints that were tolerated as missing, kept only for the summary warning below.
        var missingEndpoints = new List<(string Endpoint, HttpStatusCode? Status)>();
        try
        {
            foreach (var endpoint in DetailEndpoints)
            {
                cancellationToken.ThrowIfCancellationRequested();
                var uri = BuildDetailUri(noteId, endpoint);
                var metadata = BuildDetailMetadata(noteId, vuIdentifier, endpoint);
                var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, uri.ToString(), cancellationToken).ConfigureAwait(false);
                var request = new SourceFetchRequest(CertCcOptions.HttpClientName, SourceName, uri)
                {
                    Metadata = metadata,
                    ETag = existing?.Etag,
                    LastModified = existing?.LastModified,
                    AcceptHeaders = new[] { "application/json" },
                };

                SourceFetchResult result;
                _diagnostics.DetailFetchAttempt(endpoint);
                try
                {
                    result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
                }
                catch (HttpRequestException httpEx)
                {
                    // Prefer the exception's own status code; otherwise try to recover one from
                    // the message text.
                    var status = httpEx.StatusCode ?? TryParseStatusCodeFromMessage(httpEx.Message);
                    if (ShouldTreatAsMissing(status, endpoint))
                    {
                        _diagnostics.DetailFetchMissing(endpoint);
                        missingEndpoints.Add((endpoint, status));
                        continue;
                    }

                    _diagnostics.DetailFetchFailure(endpoint);
                    throw;
                }

                Guid documentId;
                if (result.IsSuccess && result.Document is not null)
                {
                    _diagnostics.DetailFetchSuccess(endpoint);
                    documentId = result.Document.Id;
                }
                else if (result.IsNotModified)
                {
                    _diagnostics.DetailFetchUnchanged(endpoint);
                    // Unchanged with no stored document: there is no id to queue. This "continue"
                    // also skips the request delay at the bottom of the loop.
                    if (existing is null)
                    {
                        continue;
                    }

                    // Re-queue the stored document so the unchanged endpoint is still part of the bundle.
                    documentId = existing.Id;
                }
                else
                {
                    _diagnostics.DetailFetchFailure(endpoint);
                    _logger.LogWarning(
                        "CERT/CC detail endpoint {Endpoint} returned {StatusCode} for note {NoteId}; will retry.",
                        endpoint,
                        (int)result.StatusCode,
                        noteId);
                    // Thrown outside the inner try, so it goes straight to the outer handler,
                    // which re-queues the note.
                    throw new HttpRequestException(
                        $"CERT/CC endpoint '{endpoint}' returned {(int)result.StatusCode} ({result.StatusCode}) for note {noteId}.",
                        null,
                        result.StatusCode);
                }

                pendingDocuments.Add(documentId);
                // Optional pause between detail requests, controlled by the DetailRequestDelay option.
                if (_options.DetailRequestDelay > TimeSpan.Zero)
                {
                    await
                        Task.Delay(_options.DetailRequestDelay, cancellationToken).ConfigureAwait(false);
                }
            }

            if (missingEndpoints.Count > 0)
            {
                var formatted = string.Join(
                    ", ",
                    missingEndpoints.Select(item => item.Status.HasValue ? $"{item.Endpoint} ({(int)item.Status.Value})" : item.Endpoint));
                _logger.LogWarning(
                    "CERT/CC detail fetch completed with missing endpoints for note {NoteId}: {Endpoints}",
                    noteId,
                    formatted);
            }

            pendingNotes.Remove(noteId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "CERT/CC detail fetch failed for note {NoteId}", noteId);
            // Remember the note so a later FetchAsync call retries it.
            pendingNotes.Add(noteId);
            throw;
        }
    }

    /// <summary>
    /// Builds the metadata attached to a summary fetch: "certcc.scope" (lower-cased scope name),
    /// "certcc.year" (four digits) and, when the request has a month, "certcc.month" (two digits).
    /// </summary>
    private static Dictionary BuildSummaryMetadata(CertCcSummaryRequest request)
    {
        var metadata = new Dictionary(StringComparer.OrdinalIgnoreCase)
        {
            ["certcc.scope"] = request.Scope.ToString().ToLowerInvariant(),
            ["certcc.year"] = request.Year.ToString("D4", CultureInfo.InvariantCulture),
        };

        if (request.Month.HasValue)
        {
            metadata["certcc.month"] = request.Month.Value.ToString("D2", CultureInfo.InvariantCulture);
        }

        return metadata;
    }

    /// <summary>
    /// Builds the metadata attached to a detail fetch: "certcc.endpoint", "certcc.noteId" and,
    /// when <paramref name="vuIdentifier"/> is not blank, "certcc.vuid". The parse stage and
    /// <c>HasPendingDocumentBundleAsync</c> read the first two keys back from stored documents.
    /// </summary>
    private static Dictionary BuildDetailMetadata(string noteId, string?
        vuIdentifier, string endpoint)
    {
        var metadata = new Dictionary(StringComparer.OrdinalIgnoreCase)
        {
            ["certcc.endpoint"] = endpoint,
            ["certcc.noteId"] = noteId,
        };

        if (!string.IsNullOrWhiteSpace(vuIdentifier))
        {
            metadata["certcc.vuid"] = vuIdentifier;
        }

        return metadata;
    }

    /// <summary>
    /// Downloads a fetched summary document and returns the note tokens that
    /// <c>CertCcSummaryParser.ParseNotes</c> extracts from it. Returns an empty result when the
    /// document has no GridFS payload id.
    /// </summary>
    // NOTE(review): the return type below reads "Task>" - its generic arguments appear to have
    // been lost; confirm the intended type against the authoritative source.
    private async Task> ReadSummaryNotesAsync(DocumentRecord document, CancellationToken cancellationToken)
    {
        if (!document.GridFsId.HasValue)
        {
            return Array.Empty();
        }

        var payload = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
        return CertCcSummaryParser.ParseNotes(payload);
    }

    /// <summary>
    /// Downloads the raw payload of a stored document.
    /// </summary>
    /// <exception cref="InvalidOperationException">The document has no GridFS payload id.</exception>
    private async Task DownloadDocumentAsync(DocumentRecord document, CancellationToken cancellationToken)
    {
        if (!document.GridFsId.HasValue)
        {
            throw new InvalidOperationException($"Document {document.Id} has no GridFS payload.");
        }

        return await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Builds the URI of one detail endpoint for a note by resolving a relative suffix against
    /// the BaseApiUri option. An unrecognised endpoint key resolves to the same URI as "note".
    /// </summary>
    private Uri BuildDetailUri(string noteId, string endpoint)
    {
        var suffix = endpoint switch
        {
            "note" => $"{noteId}/",
            "vendors" => $"{noteId}/vendors/",
            "vuls" => $"{noteId}/vuls/",
            "vendors-vuls" => $"{noteId}/vendors/vuls/",
            _ => $"{noteId}/",
        };

        return new Uri(_options.BaseApiUri, suffix);
    }

    /// <summary>
    /// Turns a raw summary token into a note id made of the token's digit characters.
    /// </summary>
    /// <param name="token">Raw token from a summary document.</param>
    /// <param name="vuIdentifier">
    /// Set to null when no id can be produced. Otherwise: the trimmed token with spaces removed
    /// when it starts with "vu" (case-insensitive), or "VU#" followed by the digits.
    /// </param>
    /// <returns>The digits of the token in their original order, or null when the token is blank or has no digits.</returns>
    // For example, "VU#123456" yields "123456" with vuIdentifier "VU#123456", and " 123 " yields
    // "123" with vuIdentifier "VU#123".
    // NOTE(review): char.IsDigit also accepts non-ASCII decimal digits, which would flow into the
    // request URI - confirm whether ASCII-only matching is wanted.
    private static string? TryNormalizeNoteToken(string token, out string? vuIdentifier)
    {
        vuIdentifier = null;
        if (string.IsNullOrWhiteSpace(token))
        {
            return null;
        }

        var trimmed = token.Trim();
        var digits = new string(trimmed.Where(char.IsDigit).ToArray());
        if (digits.Length == 0)
        {
            return null;
        }

        vuIdentifier = trimmed.StartsWith("vu", StringComparison.OrdinalIgnoreCase)
            ?
            trimmed.Replace(" ", string.Empty, StringComparison.Ordinal)
            : $"VU#{digits}";
        return digits;
    }

    /// <summary>
    /// Reads one metadata value from a document. Returns false and sets
    /// <paramref name="value"/> to the empty string when the document has no metadata or the key
    /// is absent.
    /// </summary>
    private static bool TryGetMetadata(DocumentRecord document, string key, out string value)
    {
        value = string.Empty;
        if (document.Metadata is null)
        {
            return false;
        }

        if (!document.Metadata.TryGetValue(key, out var metadataValue))
        {
            return false;
        }

        value = metadataValue;
        return true;
    }

    /// <summary>
    /// Loads this source's state record and converts its cursor document with
    /// <c>CertCcCursor.FromBson</c>; null is passed when no record exists.
    /// </summary>
    private async Task GetCursorAsync(CancellationToken cancellationToken)
    {
        var record = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
        return CertCcCursor.FromBson(record?.Cursor);
    }

    /// <summary>
    /// Persists the cursor for this source, stamped with the current time from the injected
    /// time provider.
    /// </summary>
    private async Task UpdateCursorAsync(CertCcCursor cursor, CancellationToken cancellationToken)
    {
        var completedAt = _timeProvider.GetUtcNow();
        await _stateRepository.UpdateCursorAsync(SourceName, cursor.ToBsonDocument(), completedAt, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Holds the detail documents collected for one note during the parse stage, one slot per
    /// endpoint.
    /// </summary>
    private sealed class NoteDocumentGroup
    {
        public NoteDocumentGroup(string noteId)
        {
            NoteId = noteId;
        }

        public string NoteId { get; }

        // Document for the "note" endpoint; ParseAsync skips the group while this is null.
        public DocumentRecord? Note { get; private set; }

        // Document for the "vendors" endpoint.
        public DocumentRecord? Vendors { get; private set; }

        // Document for the "vuls" endpoint.
        public DocumentRecord? Vuls { get; private set; }

        // Document for the "vendors-vuls" endpoint.
        public DocumentRecord? VendorStatuses { get; private set; }

        /// <summary>
        /// Stores a document in the slot for its endpoint key, replacing any earlier document in
        /// that slot. An unrecognised key fills the note slot only when it is still empty.
        /// </summary>
        public void Add(string endpoint, DocumentRecord document)
        {
            switch (endpoint)
            {
                case "note":
                    Note = document;
                    break;
                case "vendors":
                    Vendors = document;
                    break;
                case "vuls":
                    Vuls = document;
                    break;
                case "vendors-vuls":
                    VendorStatuses = document;
                    break;
                default:
                    Note ??= document;
                    break;
            }
        }
    }

    /// <summary>
    /// Decides whether a failed detail request can be tolerated as "endpoint not available".
    /// Returns true for 404, 410 and 403 on any endpoint other than "note"; returns false for the
    /// "note" endpoint, for any other status, and when no status is known.
    /// </summary>
    private static bool ShouldTreatAsMissing(HttpStatusCode? statusCode, string endpoint)
    {
        if (statusCode is null)
        {
            return false;
        }

        if (statusCode is HttpStatusCode.NotFound or HttpStatusCode.Gone)
        {
            return !string.Equals(endpoint, "note", StringComparison.OrdinalIgnoreCase);
        }

        // Treat vendors/vendors-vuls/vuls 403 as optional air-gapped responses.
        if (statusCode == HttpStatusCode.Forbidden && !string.Equals(endpoint, "note", StringComparison.OrdinalIgnoreCase))
        {
            return true;
        }

        return false;
    }

    /// <summary>
    /// Best-effort extraction of an HTTP status code from an exception message. Looks for the
    /// first occurrence of "status " (case-insensitive) and reads the run of digits that follows.
    /// </summary>
    /// <returns>
    /// The parsed code when it is a defined <see cref="HttpStatusCode"/> value; otherwise null
    /// (blank message, marker absent, no digits after the marker, or an undefined/unparseable number).
    /// </returns>
    private static HttpStatusCode? TryParseStatusCodeFromMessage(string? message)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            return null;
        }

        const string marker = "status ";
        var index = message.IndexOf(marker, StringComparison.OrdinalIgnoreCase);
        if (index < 0)
        {
            return null;
        }

        // Move past the marker, then extend "end" across the digits that follow it.
        index += marker.Length;
        var end = index;
        while (end < message.Length && char.IsDigit(message[end]))
        {
            end++;
        }

        if (end == index)
        {
            return null;
        }

        if (int.TryParse(message[index..end], NumberStyles.Integer, CultureInfo.InvariantCulture, out var code)
            && Enum.IsDefined(typeof(HttpStatusCode), code))
        {
            return (HttpStatusCode)code;
        }

        return null;
    }
}