Files
git.stella-ops.org/src/StellaOps.Feedser.Source.CertCc/CertCcConnector.cs
master 0f1b203fde
Some checks failed
Build Test Deploy / build-test (push) Has been cancelled
Build Test Deploy / authority-container (push) Has been cancelled
Build Test Deploy / docs (push) Has been cancelled
Build Test Deploy / deploy (push) Has been cancelled
tam
2025-10-12 20:42:07 +00:00

780 lines
29 KiB
C#

using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Feedser.Models;
using StellaOps.Feedser.Source.CertCc.Configuration;
using StellaOps.Feedser.Source.CertCc.Internal;
using StellaOps.Feedser.Source.Common;
using StellaOps.Feedser.Source.Common.Fetch;
using StellaOps.Feedser.Storage.Mongo;
using StellaOps.Feedser.Storage.Mongo.Advisories;
using StellaOps.Feedser.Storage.Mongo.Documents;
using StellaOps.Feedser.Storage.Mongo.Dtos;
using StellaOps.Plugin;
namespace StellaOps.Feedser.Source.CertCc;
public sealed class CertCcConnector : IFeedConnector
{
// JSON settings shared by ParseAsync (DTO -> JSON for storage) and MapAsync (stored JSON -> DTO),
// so both directions use the same naming and null-handling rules.
private static readonly JsonSerializerOptions DtoSerializerOptions = new(JsonSerializerDefaults.Web)
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    WriteIndented = false,
};

// UTF-8 "[]" handed to the note parser in place of a supplementary document that is not queued.
private static readonly byte[] EmptyArrayPayload = Encoding.UTF8.GetBytes("[]");

// Detail endpoints fetched for each note, in request order. "note" is mandatory; the others may be
// reported as missing (see ShouldTreatAsMissing).
private static readonly string[] DetailEndpoints = new[] { "note", "vendors", "vuls", "vendors-vuls" };

// Summary planning and HTTP fetching.
private readonly CertCcSummaryPlanner _summaryPlanner;
private readonly SourceFetchService _fetchService;

// Storage: raw payloads, document records, parsed DTOs, advisories and per-source cursor state.
private readonly RawDocumentStorage _rawDocumentStorage;
private readonly IDocumentStore _documentStore;
private readonly IDtoStore _dtoStore;
private readonly IAdvisoryStore _advisoryStore;
private readonly ISourceStateRepository _stateRepository;

// Configuration, clock and telemetry.
private readonly CertCcOptions _options;
private readonly TimeProvider _timeProvider;
private readonly ILogger<CertCcConnector> _logger;
private readonly CertCcDiagnostics _diagnostics;
/// <summary>
/// Creates the CERT/CC connector from its injected collaborators and validates the bound options.
/// </summary>
/// <param name="summaryPlanner">Produces the summary requests evaluated on each fetch.</param>
/// <param name="fetchService">Performs the HTTP fetches and stores the resulting raw documents.</param>
/// <param name="rawDocumentStorage">Reads stored raw payloads back for parsing.</param>
/// <param name="documentStore">Document record store (lookups and status updates).</param>
/// <param name="dtoStore">Store for parsed note DTOs.</param>
/// <param name="advisoryStore">Store for mapped advisories.</param>
/// <param name="stateRepository">Holds the connector cursor and failure state.</param>
/// <param name="options">Connector options; <c>Value</c> is validated immediately.</param>
/// <param name="diagnostics">Metric/diagnostic sink for fetch, parse and map outcomes.</param>
/// <param name="timeProvider">Clock used for timestamps; falls back to <c>TimeProvider.System</c> when null.</param>
/// <param name="logger">Logger for warnings and errors.</param>
/// <exception cref="ArgumentNullException">A required argument, or <c>options.Value</c>, is null.</exception>
/// <remarks>Any exception raised by <c>CertCcOptions.Validate</c> propagates to the caller.</remarks>
public CertCcConnector(
    CertCcSummaryPlanner summaryPlanner,
    SourceFetchService fetchService,
    RawDocumentStorage rawDocumentStorage,
    IDocumentStore documentStore,
    IDtoStore dtoStore,
    IAdvisoryStore advisoryStore,
    ISourceStateRepository stateRepository,
    IOptions<CertCcOptions> options,
    CertCcDiagnostics diagnostics,
    TimeProvider? timeProvider,
    ILogger<CertCcConnector> logger)
{
    ArgumentNullException.ThrowIfNull(summaryPlanner);
    ArgumentNullException.ThrowIfNull(fetchService);
    ArgumentNullException.ThrowIfNull(rawDocumentStorage);
    ArgumentNullException.ThrowIfNull(documentStore);
    ArgumentNullException.ThrowIfNull(dtoStore);
    ArgumentNullException.ThrowIfNull(advisoryStore);
    ArgumentNullException.ThrowIfNull(stateRepository);
    ArgumentNullException.ThrowIfNull(options);

    // The options are resolved and validated before diagnostics and logger are null-checked.
    var resolvedOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
    resolvedOptions.Validate();

    ArgumentNullException.ThrowIfNull(diagnostics);
    ArgumentNullException.ThrowIfNull(logger);

    _summaryPlanner = summaryPlanner;
    _fetchService = fetchService;
    _rawDocumentStorage = rawDocumentStorage;
    _documentStore = documentStore;
    _dtoStore = dtoStore;
    _advisoryStore = advisoryStore;
    _stateRepository = stateRepository;
    _options = resolvedOptions;
    _diagnostics = diagnostics;
    _timeProvider = timeProvider ?? TimeProvider.System;
    _logger = logger;
}
/// <summary>
/// Source key this connector uses for its documents, DTO records and cursor state (taken from the plugin).
/// </summary>
public string SourceName
{
    get { return CertCcConnectorPlugin.SourceName; }
}
/// <summary>
/// Fetches CERT/CC summary documents and the per-note detail bundles they list, queuing the fetched raw
/// documents for parsing.
/// </summary>
/// <param name="services">Not used by this connector.</param>
/// <param name="cancellationToken">Checked between requests and flowed to every store and fetch call.</param>
/// <remarks>
/// Notes left pending by an earlier failed run are replayed first. Then each summary request from the planner is
/// fetched and the notes it lists are fetched, until <c>MaxNotesPerFetch</c> notes have completed. On success the
/// cursor advances to the plan's next summary state.
/// On any failure other than cancellation through <paramref name="cancellationToken"/>, in either phase:
/// the pending note/document/mapping queues are persisted without advancing the summary state, the failure is
/// recorded via <c>MarkFailureAsync</c> with a five-minute interval, and the exception is rethrown.
/// </remarks>
public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
{
    var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
    var pendingDocuments = cursor.PendingDocuments.ToHashSet();
    var pendingMappings = cursor.PendingMappings.ToHashSet();
    var pendingNotes = new HashSet<string>(cursor.PendingNotes, StringComparer.OrdinalIgnoreCase);
    var processedNotes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    var now = _timeProvider.GetUtcNow();

    // Notes that may still be fetched this run; a note only consumes budget once it leaves the pending set.
    var remainingBudget = _options.MaxNotesPerFetch;

    // Resume notes that previously failed before fetching new summaries.
    if (pendingNotes.Count > 0 && remainingBudget > 0)
    {
        try
        {
            // Iterate a snapshot: the bundle check and FetchNoteBundleAsync both mutate pendingNotes.
            foreach (var noteId in pendingNotes.ToArray())
            {
                if (remainingBudget <= 0)
                {
                    break;
                }

                if (!processedNotes.Add(noteId))
                {
                    continue;
                }

                // Every detail endpoint for this note is already queued for parsing, so it needs no refetch.
                if (await HasPendingDocumentBundleAsync(noteId, pendingDocuments, cancellationToken).ConfigureAwait(false))
                {
                    pendingNotes.Remove(noteId);
                    continue;
                }

                await FetchNoteBundleAsync(noteId, null, pendingDocuments, pendingNotes, cancellationToken).ConfigureAwait(false);
                if (!pendingNotes.Contains(noteId))
                {
                    remainingBudget--;
                }
            }
        }
        catch (Exception ex) when (!IsCallerCancellation(ex))
        {
            // Persist the queues as well, so documents fetched for notes replayed before the failure stay queued.
            await RecordFailureAsync(ex).ConfigureAwait(false);
            throw;
        }
    }

    var plan = _summaryPlanner.CreatePlan(cursor.SummaryState);
    _diagnostics.PlanEvaluated(plan.Window, plan.Requests.Count);

    try
    {
        foreach (var request in plan.Requests)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Captured before the fetch: once the budget is spent, summaries are still fetched but not read.
            var shouldProcessNotes = remainingBudget > 0;
            try
            {
                _diagnostics.SummaryFetchAttempt(request.Scope);
                var metadata = BuildSummaryMetadata(request);
                var existingSummary = await _documentStore.FindBySourceAndUriAsync(SourceName, request.Uri.ToString(), cancellationToken).ConfigureAwait(false);

                // Passes the ETag/Last-Modified stored for the previous summary document, when one exists.
                var fetchRequest = new SourceFetchRequest(
                    CertCcOptions.HttpClientName,
                    SourceName,
                    HttpMethod.Get,
                    request.Uri,
                    metadata,
                    existingSummary?.Etag,
                    existingSummary?.LastModified,
                    null,
                    new[] { "application/json" });
                var result = await _fetchService.FetchAsync(fetchRequest, cancellationToken).ConfigureAwait(false);
                if (result.IsNotModified)
                {
                    _diagnostics.SummaryFetchUnchanged(request.Scope);
                    continue;
                }

                if (!result.IsSuccess || result.Document is null)
                {
                    _diagnostics.SummaryFetchFailure(request.Scope);
                    continue;
                }

                _diagnostics.SummaryFetchSuccess(request.Scope);
                if (!shouldProcessNotes)
                {
                    continue;
                }

                var noteTokens = await ReadSummaryNotesAsync(result.Document, cancellationToken).ConfigureAwait(false);
                foreach (var token in noteTokens)
                {
                    if (remainingBudget <= 0)
                    {
                        break;
                    }

                    var noteId = TryNormalizeNoteToken(token, out var vuIdentifier);
                    if (string.IsNullOrEmpty(noteId))
                    {
                        continue;
                    }

                    // Skip notes already handled during this run (replayed or listed by an earlier summary).
                    if (!processedNotes.Add(noteId))
                    {
                        continue;
                    }

                    await FetchNoteBundleAsync(noteId, vuIdentifier, pendingDocuments, pendingNotes, cancellationToken).ConfigureAwait(false);
                    if (!pendingNotes.Contains(noteId))
                    {
                        remainingBudget--;
                    }
                }
            }
            catch (Exception ex) when (!IsCallerCancellation(ex))
            {
                _diagnostics.SummaryFetchFailure(request.Scope);
                throw;
            }
        }
    }
    catch (Exception ex) when (!IsCallerCancellation(ex))
    {
        await RecordFailureAsync(ex).ConfigureAwait(false);
        throw;
    }

    var updatedCursor = cursor
        .WithSummaryState(plan.NextState)
        .WithPendingSummaries(Array.Empty<Guid>())
        .WithPendingNotes(pendingNotes)
        .WithPendingDocuments(pendingDocuments)
        .WithPendingMappings(pendingMappings)
        .WithLastRun(now);
    await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);

    // True only for cancellation requested through this method's token. Other OperationCanceledExceptions
    // (e.g. an HTTP timeout surfacing as TaskCanceledException) are still handled as failures.
    bool IsCallerCancellation(Exception exception)
        => exception is OperationCanceledException && cancellationToken.IsCancellationRequested;

    // Persists the in-memory work queues without advancing the summary state, then records the failure.
    async Task RecordFailureAsync(Exception error)
    {
        var failureCursor = cursor
            .WithPendingSummaries(Array.Empty<Guid>())
            .WithPendingNotes(pendingNotes)
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings)
            .WithLastRun(now);
        await UpdateCursorAsync(failureCursor, cancellationToken).ConfigureAwait(false);
        await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(5), error.Message, cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Checks whether the pending-document queue already contains a document for every detail endpoint of a note.
/// </summary>
/// <param name="noteId">Note id, compared case-insensitively with each document's <c>certcc.noteId</c> metadata.</param>
/// <param name="pendingDocuments">Ids of the documents currently queued for parsing.</param>
/// <param name="cancellationToken">Flowed to each document lookup.</param>
/// <returns>
/// <c>true</c> as soon as every endpoint in <c>DetailEndpoints</c> is covered. A document without
/// <c>certcc.endpoint</c> metadata counts as the <c>note</c> endpoint.
/// </returns>
/// <remarks>Performs one document-store lookup per queued document until the bundle is complete.</remarks>
private async Task<bool> HasPendingDocumentBundleAsync(string noteId, HashSet<Guid> pendingDocuments, CancellationToken cancellationToken)
{
    // An empty queue cannot hold a bundle.
    if (pendingDocuments.Count == 0)
    {
        return false;
    }

    var outstandingEndpoints = new HashSet<string>(DetailEndpoints, StringComparer.OrdinalIgnoreCase);
    foreach (var candidateId in pendingDocuments)
    {
        var candidate = await _documentStore.FindAsync(candidateId, cancellationToken).ConfigureAwait(false);

        // Ignore missing documents, documents without metadata, and documents belonging to other notes.
        if (candidate is null
            || !TryGetMetadata(candidate, "certcc.noteId", out var candidateNoteId)
            || !string.Equals(candidateNoteId, noteId, StringComparison.OrdinalIgnoreCase))
        {
            continue;
        }

        var candidateEndpoint = TryGetMetadata(candidate, "certcc.endpoint", out var endpointValue)
            ? endpointValue
            : "note";
        outstandingEndpoints.Remove(candidateEndpoint);

        if (outstandingEndpoints.Count == 0)
        {
            return true;
        }
    }

    return false;
}
/// <summary>
/// Parses queued CERT/CC detail documents into note DTOs, one DTO per note, and queues the note for mapping.
/// </summary>
/// <param name="services">Not used by this connector.</param>
/// <param name="cancellationToken">Checked between documents/groups and flowed to every store call.</param>
/// <remarks>
/// Does nothing when <c>EnableDetailMapping</c> is off or no documents are pending. Pending documents are grouped
/// by their <c>certcc.noteId</c> metadata. Groups that lack a <c>note</c> document are skipped and stay queued.
/// <list type="bullet">
/// <item>On success: the note document moves to PendingMap and into the mapping queue; the supplementary
/// documents are marked Mapped.</item>
/// <item>On a parse failure: the note and its still-queued supplementary documents are marked Failed and dequeued.</item>
/// <item>Cancellation through <paramref name="cancellationToken"/> propagates without marking anything Failed.</item>
/// </list>
/// </remarks>
public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
{
    if (!_options.EnableDetailMapping)
    {
        return;
    }

    var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
    if (cursor.PendingDocuments.Count == 0)
    {
        return;
    }

    var pendingDocuments = cursor.PendingDocuments.ToHashSet();
    var pendingMappings = cursor.PendingMappings.ToHashSet();
    var groups = new Dictionary<string, NoteDocumentGroup>(StringComparer.OrdinalIgnoreCase);

    // Pass 1: load every pending document and bucket it by note id and endpoint.
    foreach (var documentId in cursor.PendingDocuments)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
        if (document is null)
        {
            // The document record is gone; drop it from the queue.
            pendingDocuments.Remove(documentId);
            continue;
        }

        if (!TryGetMetadata(document, "certcc.noteId", out var noteId) || string.IsNullOrWhiteSpace(noteId))
        {
            // Without a note id the document can never be grouped with its bundle.
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            pendingDocuments.Remove(documentId);
            continue;
        }

        var endpoint = TryGetMetadata(document, "certcc.endpoint", out var endpointValue)
            ? endpointValue
            : "note";
        if (!groups.TryGetValue(noteId, out var group))
        {
            group = new NoteDocumentGroup(noteId);
            groups[noteId] = group;
        }

        group.Add(endpoint, document);
    }

    // Pass 2: parse each group that has its primary note document.
    foreach (var group in groups.Values)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var note = group.Note;
        if (note is null)
        {
            // Supplementary documents without their note stay queued until the note document arrives.
            continue;
        }

        // Supplementary documents, in the order their statuses are updated.
        var supplementaryDocuments = new[] { group.Vendors, group.Vuls, group.VendorStatuses };
        try
        {
            // Missing supplementary documents are parsed as empty JSON arrays.
            var noteBytes = await DownloadDocumentAsync(note, cancellationToken).ConfigureAwait(false);
            var vendorsBytes = group.Vendors is null
                ? EmptyArrayPayload
                : await DownloadDocumentAsync(group.Vendors, cancellationToken).ConfigureAwait(false);
            var vulsBytes = group.Vuls is null
                ? EmptyArrayPayload
                : await DownloadDocumentAsync(group.Vuls, cancellationToken).ConfigureAwait(false);
            var vendorStatusesBytes = group.VendorStatuses is null
                ? EmptyArrayPayload
                : await DownloadDocumentAsync(group.VendorStatuses, cancellationToken).ConfigureAwait(false);

            var dto = CertCcNoteParser.Parse(noteBytes, vendorsBytes, vulsBytes, vendorStatusesBytes);
            var json = JsonSerializer.Serialize(dto, DtoSerializerOptions);
            var payload = BsonDocument.Parse(json);
            _diagnostics.ParseSuccess(
                dto.Vendors.Count,
                dto.VendorStatuses.Count,
                dto.Vulnerabilities.Count);

            var dtoRecord = new DtoRecord(
                Guid.NewGuid(),
                note.Id,
                SourceName,
                "certcc.vince.note.v1",
                payload,
                _timeProvider.GetUtcNow());
            await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);

            // The DTO is keyed to the note document, so only the note goes on to mapping.
            await _documentStore.UpdateStatusAsync(note.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
            pendingMappings.Add(note.Id);
            pendingDocuments.Remove(note.Id);

            // Supplementary content is folded into the note DTO, so those documents are finished here.
            foreach (var supplementary in supplementaryDocuments)
            {
                if (supplementary is null)
                {
                    continue;
                }

                await _documentStore.UpdateStatusAsync(supplementary.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
                pendingDocuments.Remove(supplementary.Id);
            }
        }
        catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            _diagnostics.ParseFailure();
            _logger.LogError(ex, "CERT/CC parse failed for note {NoteId}", group.NoteId);
            await _documentStore.UpdateStatusAsync(note.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            pendingDocuments.Remove(note.Id);

            // Fail the still-queued supplementary documents too. Once the note leaves the queue, their group has
            // no note on later parses. Without this they would stay queued (and be rescanned by every parse and
            // bundle check) until the note happened to be fetched again.
            foreach (var supplementary in supplementaryDocuments)
            {
                if (supplementary is null || !pendingDocuments.Contains(supplementary.Id))
                {
                    continue;
                }

                await _documentStore.UpdateStatusAsync(supplementary.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingDocuments.Remove(supplementary.Id);
            }
        }
    }

    var updatedCursor = cursor
        .WithPendingDocuments(pendingDocuments)
        .WithPendingMappings(pendingMappings);
    await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Maps parsed CERT/CC note DTOs into advisories for every document in the cursor's pending-mapping queue.
/// </summary>
/// <param name="services">Not used by this connector.</param>
/// <param name="cancellationToken">Checked between documents and flowed to every store call.</param>
/// <remarks>
/// Does nothing when <c>EnableDetailMapping</c> is off or nothing is pending. Every handled document leaves the
/// queue:
/// <list type="bullet">
/// <item>A successful mapping upserts the advisory and marks the document Mapped.</item>
/// <item>A mapping error is logged and the document is marked Failed.</item>
/// <item>A document whose DTO or record cannot be found is simply dropped.</item>
/// </list>
/// Cancellation through <paramref name="cancellationToken"/> propagates without marking the current document
/// Failed.
/// </remarks>
public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
{
    if (!_options.EnableDetailMapping)
    {
        return;
    }

    var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
    if (cursor.PendingMappings.Count == 0)
    {
        return;
    }

    var pendingMappings = cursor.PendingMappings.ToHashSet();
    foreach (var documentId in cursor.PendingMappings)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
        var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
        if (dtoRecord is null || document is null)
        {
            pendingMappings.Remove(documentId);
            continue;
        }

        try
        {
            // The DTO was stored as BSON parsed from System.Text.Json output; round-trip it back through JSON.
            var json = dtoRecord.Payload.ToJson();
            var dto = JsonSerializer.Deserialize<CertCcNoteDto>(json, DtoSerializerOptions)
                ?? throw new InvalidOperationException($"CERT/CC DTO payload deserialized as null for document {documentId}.");

            var advisory = CertCcMapper.Map(dto, document, dtoRecord, SourceName);
            var affectedCount = advisory.AffectedPackages.Length;
            var normalizedRuleCount = advisory.AffectedPackages.Sum(static package => package.NormalizedVersions.Length);
            await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
            _diagnostics.MapSuccess(affectedCount, normalizedRuleCount);
        }
        catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            _diagnostics.MapFailure();
            _logger.LogError(ex, "CERT/CC mapping failed for document {DocumentId}", documentId);
            await _documentStore.UpdateStatusAsync(documentId, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
        }

        pendingMappings.Remove(documentId);
    }

    var updatedCursor = cursor.WithPendingMappings(pendingMappings);
    await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Fetches every detail endpoint for one note and queues the resulting document ids for parsing.
/// </summary>
/// <param name="noteId">Numeric note identifier used to build the detail URIs and metadata.</param>
/// <param name="vuIdentifier">Optional VU# identifier stored in the document metadata.</param>
/// <param name="pendingDocuments">Receives the id of each fetched document, or of the stored document on 304.</param>
/// <param name="pendingNotes">
/// The note is removed when all endpoints are handled, and (re)added on failure so a later run replays it.
/// </param>
/// <param name="cancellationToken">Checked per endpoint and flowed to every store/fetch call and delay.</param>
/// <remarks>
/// A supplementary endpoint answering 403/404/410 is recorded as missing and skipped, whether that status arrives
/// as an <see cref="HttpRequestException"/> or as a non-success fetch result. Any other failure is rethrown.
/// </remarks>
private async Task FetchNoteBundleAsync(
    string noteId,
    string? vuIdentifier,
    HashSet<Guid> pendingDocuments,
    HashSet<string> pendingNotes,
    CancellationToken cancellationToken)
{
    var missingEndpoints = new List<(string Endpoint, HttpStatusCode? Status)>();
    try
    {
        foreach (var endpoint in DetailEndpoints)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var uri = BuildDetailUri(noteId, endpoint);
            var metadata = BuildDetailMetadata(noteId, vuIdentifier, endpoint);
            var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, uri.ToString(), cancellationToken).ConfigureAwait(false);
            var request = new SourceFetchRequest(CertCcOptions.HttpClientName, SourceName, uri)
            {
                Metadata = metadata,
                ETag = existing?.Etag,
                LastModified = existing?.LastModified,
                AcceptHeaders = new[] { "application/json" },
            };

            SourceFetchResult result;
            _diagnostics.DetailFetchAttempt(endpoint);
            try
            {
                result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
            }
            catch (HttpRequestException httpEx)
            {
                // Prefer the typed status code; fall back to parsing it out of the message text.
                var status = httpEx.StatusCode ?? TryParseStatusCodeFromMessage(httpEx.Message);
                if (ShouldTreatAsMissing(status, endpoint))
                {
                    _diagnostics.DetailFetchMissing(endpoint);
                    missingEndpoints.Add((endpoint, status));
                    continue;
                }

                _diagnostics.DetailFetchFailure(endpoint);
                throw;
            }

            Guid documentId;
            if (result.IsSuccess && result.Document is not null)
            {
                _diagnostics.DetailFetchSuccess(endpoint);
                documentId = result.Document.Id;
            }
            else if (result.IsNotModified)
            {
                _diagnostics.DetailFetchUnchanged(endpoint);
                if (existing is null)
                {
                    // No stored document to fall back on, so nothing to queue for this endpoint.
                    continue;
                }

                documentId = existing.Id;
            }
            else if (ShouldTreatAsMissing(result.StatusCode, endpoint))
            {
                // Same tolerance as the exception path above. An optional endpoint reporting 403/404/410 as a
                // result must not fail the whole note, which would otherwise be replayed, and fail again, on
                // every run.
                _diagnostics.DetailFetchMissing(endpoint);
                missingEndpoints.Add((endpoint, result.StatusCode));
                continue;
            }
            else
            {
                _diagnostics.DetailFetchFailure(endpoint);
                _logger.LogWarning(
                    "CERT/CC detail endpoint {Endpoint} returned {StatusCode} for note {NoteId}; will retry.",
                    endpoint,
                    (int)result.StatusCode,
                    noteId);
                throw new HttpRequestException(
                    $"CERT/CC endpoint '{endpoint}' returned {(int)result.StatusCode} ({result.StatusCode}) for note {noteId}.",
                    null,
                    result.StatusCode);
            }

            pendingDocuments.Add(documentId);

            // Optional configured pause between successive detail requests.
            if (_options.DetailRequestDelay > TimeSpan.Zero)
            {
                await Task.Delay(_options.DetailRequestDelay, cancellationToken).ConfigureAwait(false);
            }
        }

        if (missingEndpoints.Count > 0)
        {
            var formatted = string.Join(
                ", ",
                missingEndpoints.Select(item =>
                    item.Status.HasValue
                        ? $"{item.Endpoint} ({(int)item.Status.Value})"
                        : item.Endpoint));
            _logger.LogWarning(
                "CERT/CC detail fetch completed with missing endpoints for note {NoteId}: {Endpoints}",
                noteId,
                formatted);
        }

        pendingNotes.Remove(noteId);
    }
    catch (Exception ex)
    {
        // Cancellation via the caller's token is not logged as an error, but the note still stays queued.
        if (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            _logger.LogError(ex, "CERT/CC detail fetch failed for note {NoteId}", noteId);
        }

        pendingNotes.Add(noteId);
        throw;
    }
}
/// <summary>
/// Builds the metadata stored with a summary document: lower-cased scope, four-digit year and, when present, a
/// two-digit month.
/// </summary>
/// <param name="request">Summary request produced by the planner.</param>
/// <returns>A case-insensitive metadata dictionary.</returns>
private static Dictionary<string, string> BuildSummaryMetadata(CertCcSummaryRequest request)
{
    var scopeText = request.Scope.ToString().ToLowerInvariant();
    var yearText = request.Year.ToString("D4", CultureInfo.InvariantCulture);

    var summaryMetadata = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
    {
        ["certcc.scope"] = scopeText,
        ["certcc.year"] = yearText,
    };

    // Month is optional (year-scoped requests carry none).
    if (request.Month is { } month)
    {
        summaryMetadata["certcc.month"] = month.ToString("D2", CultureInfo.InvariantCulture);
    }

    return summaryMetadata;
}
/// <summary>
/// Builds the metadata stored with a detail document so it can later be regrouped by note and endpoint.
/// </summary>
/// <param name="noteId">Note identifier (<c>certcc.noteId</c>).</param>
/// <param name="vuIdentifier">Optional VU# identifier; stored as <c>certcc.vuid</c> only when not blank.</param>
/// <param name="endpoint">Detail endpoint name (<c>certcc.endpoint</c>).</param>
/// <returns>A case-insensitive metadata dictionary.</returns>
private static Dictionary<string, string> BuildDetailMetadata(string noteId, string? vuIdentifier, string endpoint)
{
    var detailMetadata = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
    detailMetadata["certcc.endpoint"] = endpoint;
    detailMetadata["certcc.noteId"] = noteId;

    if (string.IsNullOrWhiteSpace(vuIdentifier))
    {
        return detailMetadata;
    }

    detailMetadata["certcc.vuid"] = vuIdentifier;
    return detailMetadata;
}
/// <summary>
/// Downloads a stored summary payload and extracts the note tokens it lists.
/// </summary>
/// <param name="document">Summary document record.</param>
/// <param name="cancellationToken">Flowed to the raw-payload download.</param>
/// <returns>The note tokens, or an empty list when the document has no stored GridFS payload.</returns>
private async Task<IReadOnlyList<string>> ReadSummaryNotesAsync(DocumentRecord document, CancellationToken cancellationToken)
{
    if (document.GridFsId is not { } payloadId)
    {
        return Array.Empty<string>();
    }

    var summaryBytes = await _rawDocumentStorage.DownloadAsync(payloadId, cancellationToken).ConfigureAwait(false);
    return CertCcSummaryParser.ParseNotes(summaryBytes);
}
/// <summary>
/// Downloads the raw payload stored for a document.
/// </summary>
/// <param name="document">Document whose GridFS payload is read.</param>
/// <param name="cancellationToken">Flowed to the download.</param>
/// <returns>The payload bytes.</returns>
/// <exception cref="InvalidOperationException">The document has no GridFS payload id.</exception>
private async Task<byte[]> DownloadDocumentAsync(DocumentRecord document, CancellationToken cancellationToken)
{
    var payloadId = document.GridFsId
        ?? throw new InvalidOperationException($"Document {document.Id} has no GridFS payload.");
    return await _rawDocumentStorage.DownloadAsync(payloadId, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Resolves the detail URI for a note endpoint against <c>BaseApiUri</c>.
/// </summary>
/// <param name="noteId">Note identifier forming the first path segment.</param>
/// <param name="endpoint">Endpoint name; unrecognised names resolve to the note URI.</param>
/// <returns>The absolute detail URI.</returns>
/// <remarks>
/// NOTE(review): relative resolution replaces the last path segment of <c>BaseApiUri</c> unless it ends with '/';
/// presumably <c>CertCcOptions.Validate</c> enforces that - confirm.
/// </remarks>
private Uri BuildDetailUri(string noteId, string endpoint)
{
    string relativePath;
    switch (endpoint)
    {
        case "vendors":
            relativePath = $"{noteId}/vendors/";
            break;
        case "vuls":
            relativePath = $"{noteId}/vuls/";
            break;
        case "vendors-vuls":
            relativePath = $"{noteId}/vendors/vuls/";
            break;
        default:
            // "note" and any unrecognised endpoint.
            relativePath = $"{noteId}/";
            break;
    }

    return new Uri(_options.BaseApiUri, relativePath);
}
/// <summary>
/// Normalizes a note token read from a summary payload into the numeric note id used for detail requests.
/// </summary>
/// <param name="token">Raw token, e.g. <c>VU#123456</c> or <c>123456</c>.</param>
/// <param name="vuIdentifier">
/// When a note id is returned, one of two values: the trimmed token with spaces removed if it starts with "vu"
/// (any case), otherwise <c>VU#</c> followed by the digits. It is null whenever the method returns null.
/// </param>
/// <returns>
/// The ASCII digits of the token concatenated in order, or null when the token is blank or has no ASCII digits.
/// </returns>
private static string? TryNormalizeNoteToken(string token, out string? vuIdentifier)
{
    vuIdentifier = null;
    if (string.IsNullOrWhiteSpace(token))
    {
        return null;
    }

    var trimmed = token.Trim();

    // ASCII digits only. char.IsDigit also accepts other Unicode decimal digits (Arabic-Indic, full-width, ...),
    // which would otherwise leak into the detail request URI and the stored metadata.
    var digits = new string(trimmed.Where(static c => c is >= '0' and <= '9').ToArray());
    if (digits.Length == 0)
    {
        return null;
    }

    vuIdentifier = trimmed.StartsWith("vu", StringComparison.OrdinalIgnoreCase)
        ? trimmed.Replace(" ", string.Empty, StringComparison.Ordinal)
        : $"VU#{digits}";
    return digits;
}
/// <summary>
/// Reads a metadata value from a document, tolerating documents that carry no metadata at all.
/// </summary>
/// <param name="document">Document to inspect.</param>
/// <param name="key">Metadata key to look up.</param>
/// <param name="value">The stored value when found; otherwise <see cref="string.Empty"/>.</param>
/// <returns><c>true</c> when the document has metadata containing <paramref name="key"/>.</returns>
private static bool TryGetMetadata(DocumentRecord document, string key, out string value)
{
    if (document.Metadata is { } metadata && metadata.TryGetValue(key, out var found))
    {
        value = found;
        return true;
    }

    value = string.Empty;
    return false;
}
/// <summary>
/// Loads this connector's cursor from the source-state repository.
/// </summary>
/// <param name="cancellationToken">Flowed to the repository lookup.</param>
/// <returns>
/// The cursor rebuilt from the stored BSON. <c>FromBson</c> receives null when no state record exists.
/// </returns>
private async Task<CertCcCursor> GetCursorAsync(CancellationToken cancellationToken)
{
    var stateRecord = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
    var storedCursor = stateRecord?.Cursor;
    return CertCcCursor.FromBson(storedCursor);
}
/// <summary>
/// Persists the cursor for this connector, stamped with the current time from the injected clock.
/// </summary>
/// <param name="cursor">Cursor state to store.</param>
/// <param name="cancellationToken">Flowed to the repository update.</param>
private async Task UpdateCursorAsync(CertCcCursor cursor, CancellationToken cancellationToken)
{
    var stampedAt = _timeProvider.GetUtcNow();
    var serializedCursor = cursor.ToBsonDocument();
    await _stateRepository
        .UpdateCursorAsync(SourceName, serializedCursor, stampedAt, cancellationToken)
        .ConfigureAwait(false);
}
/// <summary>
/// Bucket of the queued raw documents belonging to a single note, with one slot per detail endpoint.
/// </summary>
private sealed class NoteDocumentGroup
{
    public NoteDocumentGroup(string noteId)
    {
        NoteId = noteId;
    }

    /// <summary>Note identifier shared by every document in the group.</summary>
    public string NoteId { get; }

    /// <summary>Document from the <c>note</c> endpoint.</summary>
    public DocumentRecord? Note { get; private set; }

    /// <summary>Document from the <c>vendors</c> endpoint.</summary>
    public DocumentRecord? Vendors { get; private set; }

    /// <summary>Document from the <c>vuls</c> endpoint.</summary>
    public DocumentRecord? Vuls { get; private set; }

    /// <summary>Document from the <c>vendors-vuls</c> endpoint.</summary>
    public DocumentRecord? VendorStatuses { get; private set; }

    /// <summary>
    /// Files <paramref name="document"/> under the slot for <paramref name="endpoint"/>, replacing any earlier
    /// document for that endpoint. A document with an unrecognised endpoint only fills the note slot while it is
    /// still empty.
    /// </summary>
    /// <param name="endpoint">Endpoint name from the document metadata, matched case-insensitively.</param>
    /// <param name="document">Document to file.</param>
    public void Add(string endpoint, DocumentRecord document)
    {
        // Case-insensitive, like the endpoint comparisons in HasPendingDocumentBundleAsync and
        // ShouldTreatAsMissing. Otherwise a differently cased value such as "Vendors" would fall through to the
        // fallback branch and could be treated as the note document.
        if (IsEndpoint(endpoint, "note"))
        {
            Note = document;
        }
        else if (IsEndpoint(endpoint, "vendors"))
        {
            Vendors = document;
        }
        else if (IsEndpoint(endpoint, "vuls"))
        {
            Vuls = document;
        }
        else if (IsEndpoint(endpoint, "vendors-vuls"))
        {
            VendorStatuses = document;
        }
        else
        {
            Note ??= document;
        }
    }

    // Ordinal, case-insensitive endpoint-name comparison.
    private static bool IsEndpoint(string actual, string expected)
        => string.Equals(actual, expected, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Decides whether a failed detail request should be tolerated as a missing optional endpoint.
/// </summary>
/// <param name="statusCode">HTTP status of the failure, if known.</param>
/// <param name="endpoint">Detail endpoint name; <c>note</c> (any case) is never optional.</param>
/// <returns>
/// <c>true</c> for 404 Not Found, 410 Gone or 403 Forbidden on any endpoint other than <c>note</c>; otherwise
/// <c>false</c>, including when the status is unknown.
/// </returns>
private static bool ShouldTreatAsMissing(HttpStatusCode? statusCode, string endpoint)
{
    // The primary note document is mandatory, whatever the status.
    if (string.Equals(endpoint, "note", StringComparison.OrdinalIgnoreCase))
    {
        return false;
    }

    // 404/410: the resource does not exist. 403: treated as an optional (e.g. air-gapped) response for the
    // supplementary endpoints. A null status matches none of these.
    return statusCode is HttpStatusCode.NotFound or HttpStatusCode.Gone or HttpStatusCode.Forbidden;
}
/// <summary>
/// Best-effort extraction of an HTTP status code from exception text of the form "... status 404 ...".
/// </summary>
/// <param name="message">Exception message; may be null.</param>
/// <returns>
/// The status named by the digits directly after the first "status " (case-insensitive), or null when the marker
/// is absent, no digits follow it, or the number is not a defined <see cref="HttpStatusCode"/> value.
/// </returns>
private static HttpStatusCode? TryParseStatusCodeFromMessage(string? message)
{
    if (string.IsNullOrWhiteSpace(message))
    {
        return null;
    }

    const string marker = "status ";
    var markerAt = message.IndexOf(marker, StringComparison.OrdinalIgnoreCase);
    if (markerAt < 0)
    {
        return null;
    }

    var digitsStart = markerAt + marker.Length;
    var digitCount = 0;
    while (digitsStart + digitCount < message.Length && char.IsDigit(message[digitsStart + digitCount]))
    {
        digitCount++;
    }

    if (digitCount == 0)
    {
        return null;
    }

    var digitText = message.Substring(digitsStart, digitCount);
    return (int.TryParse(digitText, NumberStyles.Integer, CultureInfo.InvariantCulture, out var numericCode)
            && Enum.IsDefined(typeof(HttpStatusCode), numericCode))
        ? (HttpStatusCode)numericCode
        : null;
}
}