up
Some checks failed
Build Test Deploy / docs (push) Has been cancelled
Build Test Deploy / deploy (push) Has been cancelled
Build Test Deploy / build-test (push) Has been cancelled
Build Test Deploy / authority-container (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Some checks failed
Build Test Deploy / docs (push) Has been cancelled
Build Test Deploy / deploy (push) Has been cancelled
Build Test Deploy / build-test (push) Has been cancelled
Build Test Deploy / authority-container (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
This commit is contained in:
@@ -1,45 +1,75 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.Net;
|
||||
using System.Linq;
|
||||
using System.Net.Http;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MongoDB.Bson;
|
||||
using StellaOps.Feedser.Models;
|
||||
using StellaOps.Feedser.Source.CertCc.Configuration;
|
||||
using StellaOps.Feedser.Source.CertCc.Internal;
|
||||
using StellaOps.Feedser.Source.Common;
|
||||
using StellaOps.Feedser.Source.Common.Fetch;
|
||||
using StellaOps.Feedser.Source.Common.Cursors;
|
||||
using StellaOps.Feedser.Storage.Mongo;
|
||||
using StellaOps.Feedser.Storage.Mongo.Advisories;
|
||||
using StellaOps.Feedser.Storage.Mongo.Documents;
|
||||
using StellaOps.Feedser.Storage.Mongo.Dtos;
|
||||
using StellaOps.Plugin;
|
||||
|
||||
namespace StellaOps.Feedser.Source.CertCc;
|
||||
|
||||
public sealed class CertCcConnector : IFeedConnector
|
||||
{
|
||||
private static readonly JsonSerializerOptions DtoSerializerOptions = new(JsonSerializerDefaults.Web)
|
||||
{
|
||||
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
|
||||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
|
||||
WriteIndented = false,
|
||||
};
|
||||
|
||||
private static readonly byte[] EmptyArrayPayload = Encoding.UTF8.GetBytes("[]");
|
||||
private static readonly string[] DetailEndpoints = { "note", "vendors", "vuls", "vendors-vuls" };
|
||||
|
||||
private readonly CertCcSummaryPlanner _summaryPlanner;
|
||||
private readonly SourceFetchService _fetchService;
|
||||
private readonly RawDocumentStorage _rawDocumentStorage;
|
||||
private readonly IDocumentStore _documentStore;
|
||||
private readonly IDtoStore _dtoStore;
|
||||
private readonly IAdvisoryStore _advisoryStore;
|
||||
private readonly ISourceStateRepository _stateRepository;
|
||||
private readonly CertCcOptions _options;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<CertCcConnector> _logger;
|
||||
private readonly CertCcDiagnostics _diagnostics;
|
||||
|
||||
public CertCcConnector(
|
||||
CertCcSummaryPlanner summaryPlanner,
|
||||
SourceFetchService fetchService,
|
||||
RawDocumentStorage rawDocumentStorage,
|
||||
IDocumentStore documentStore,
|
||||
IDtoStore dtoStore,
|
||||
IAdvisoryStore advisoryStore,
|
||||
ISourceStateRepository stateRepository,
|
||||
IOptions<CertCcOptions> options,
|
||||
CertCcDiagnostics diagnostics,
|
||||
TimeProvider? timeProvider,
|
||||
ILogger<CertCcConnector> logger)
|
||||
{
|
||||
_summaryPlanner = summaryPlanner ?? throw new ArgumentNullException(nameof(summaryPlanner));
|
||||
_fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
|
||||
_rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
|
||||
_documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
|
||||
_dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
|
||||
_advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
|
||||
_stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
|
||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
|
||||
_options.Validate();
|
||||
_diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
@@ -49,66 +79,595 @@ public sealed class CertCcConnector : IFeedConnector
|
||||
public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
|
||||
{
|
||||
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
|
||||
var plan = _summaryPlanner.CreatePlan(cursor.SummaryState);
|
||||
if (plan.Requests.Count == 0)
|
||||
|
||||
var pendingDocuments = cursor.PendingDocuments.ToHashSet();
|
||||
var pendingMappings = cursor.PendingMappings.ToHashSet();
|
||||
var pendingNotes = new HashSet<string>(cursor.PendingNotes, StringComparer.OrdinalIgnoreCase);
|
||||
var processedNotes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var remainingBudget = _options.MaxNotesPerFetch;
|
||||
|
||||
// Resume notes that previously failed before fetching new summaries.
|
||||
if (pendingNotes.Count > 0 && remainingBudget > 0)
|
||||
{
|
||||
await UpdateCursorAsync(cursor.WithSummaryState(plan.NextState).WithLastRun(_timeProvider.GetUtcNow()), cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
var replay = pendingNotes.ToArray();
|
||||
foreach (var noteId in replay)
|
||||
{
|
||||
if (remainingBudget <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (!processedNotes.Add(noteId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (await HasPendingDocumentBundleAsync(noteId, pendingDocuments, cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
pendingNotes.Remove(noteId);
|
||||
continue;
|
||||
}
|
||||
|
||||
await FetchNoteBundleAsync(noteId, null, pendingDocuments, pendingNotes, cancellationToken).ConfigureAwait(false);
|
||||
if (!pendingNotes.Contains(noteId))
|
||||
{
|
||||
remainingBudget--;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(5), ex.Message, cancellationToken).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var request in plan.Requests)
|
||||
var plan = _summaryPlanner.CreatePlan(cursor.SummaryState);
|
||||
_diagnostics.PlanEvaluated(plan.Window, plan.Requests.Count);
|
||||
|
||||
try
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
try
|
||||
foreach (var request in plan.Requests)
|
||||
{
|
||||
var uri = request.Uri;
|
||||
var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, uri.ToString(), cancellationToken).ConfigureAwait(false);
|
||||
var metadata = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
["certcc.scope"] = request.Scope.ToString().ToLowerInvariant(),
|
||||
["certcc.year"] = request.Year.ToString("D4"),
|
||||
};
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var shouldProcessNotes = remainingBudget > 0;
|
||||
|
||||
if (request.Month.HasValue)
|
||||
try
|
||||
{
|
||||
metadata["certcc.month"] = request.Month.Value.ToString("D2");
|
||||
_diagnostics.SummaryFetchAttempt(request.Scope);
|
||||
var metadata = BuildSummaryMetadata(request);
|
||||
var existingSummary = await _documentStore.FindBySourceAndUriAsync(SourceName, request.Uri.ToString(), cancellationToken).ConfigureAwait(false);
|
||||
var fetchRequest = new SourceFetchRequest(
|
||||
CertCcOptions.HttpClientName,
|
||||
SourceName,
|
||||
HttpMethod.Get,
|
||||
request.Uri,
|
||||
metadata,
|
||||
existingSummary?.Etag,
|
||||
existingSummary?.LastModified,
|
||||
null,
|
||||
new[] { "application/json" });
|
||||
|
||||
var result = await _fetchService.FetchAsync(fetchRequest, cancellationToken).ConfigureAwait(false);
|
||||
if (result.IsNotModified)
|
||||
{
|
||||
_diagnostics.SummaryFetchUnchanged(request.Scope);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!result.IsSuccess || result.Document is null)
|
||||
{
|
||||
_diagnostics.SummaryFetchFailure(request.Scope);
|
||||
continue;
|
||||
}
|
||||
|
||||
_diagnostics.SummaryFetchSuccess(request.Scope);
|
||||
|
||||
if (!shouldProcessNotes)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var noteTokens = await ReadSummaryNotesAsync(result.Document, cancellationToken).ConfigureAwait(false);
|
||||
foreach (var token in noteTokens)
|
||||
{
|
||||
if (remainingBudget <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
var noteId = TryNormalizeNoteToken(token, out var vuIdentifier);
|
||||
if (string.IsNullOrEmpty(noteId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!processedNotes.Add(noteId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
await FetchNoteBundleAsync(noteId, vuIdentifier, pendingDocuments, pendingNotes, cancellationToken).ConfigureAwait(false);
|
||||
if (!pendingNotes.Contains(noteId))
|
||||
{
|
||||
remainingBudget--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var fetchRequest = new SourceFetchRequest(CertCcOptions.HttpClientName, SourceName, uri)
|
||||
catch
|
||||
{
|
||||
Metadata = metadata,
|
||||
AcceptHeaders = new[] { "application/json" },
|
||||
ETag = existing?.Etag,
|
||||
LastModified = existing?.LastModified,
|
||||
};
|
||||
|
||||
var result = await _fetchService.FetchAsync(fetchRequest, cancellationToken).ConfigureAwait(false);
|
||||
if (result.IsNotModified)
|
||||
{
|
||||
_logger.LogDebug("CERT/CC summary {Uri} returned 304 Not Modified", uri);
|
||||
_diagnostics.SummaryFetchFailure(request.Scope);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "CERT/CC summary fetch failed for {Uri}", request.Uri);
|
||||
await _stateRepository.MarkFailureAsync(SourceName, _timeProvider.GetUtcNow(), TimeSpan.FromMinutes(5), ex.Message, cancellationToken).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var failureCursor = cursor
|
||||
.WithPendingSummaries(Array.Empty<Guid>())
|
||||
.WithPendingNotes(pendingNotes)
|
||||
.WithPendingDocuments(pendingDocuments)
|
||||
.WithPendingMappings(pendingMappings)
|
||||
.WithLastRun(now);
|
||||
|
||||
await UpdateCursorAsync(failureCursor, cancellationToken).ConfigureAwait(false);
|
||||
await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(5), ex.Message, cancellationToken).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
|
||||
var updatedCursor = cursor
|
||||
.WithSummaryState(plan.NextState)
|
||||
.WithLastRun(_timeProvider.GetUtcNow());
|
||||
.WithPendingSummaries(Array.Empty<Guid>())
|
||||
.WithPendingNotes(pendingNotes)
|
||||
.WithPendingDocuments(pendingDocuments)
|
||||
.WithPendingMappings(pendingMappings)
|
||||
.WithLastRun(now);
|
||||
|
||||
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
private async Task<bool> HasPendingDocumentBundleAsync(string noteId, HashSet<Guid> pendingDocuments, CancellationToken cancellationToken)
|
||||
{
|
||||
if (pendingDocuments.Count == 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
public Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
var required = new HashSet<string>(DetailEndpoints, StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
foreach (var documentId in pendingDocuments)
|
||||
{
|
||||
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
|
||||
if (document?.Metadata is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!document.Metadata.TryGetValue("certcc.noteId", out var metadataNoteId) ||
|
||||
!string.Equals(metadataNoteId, noteId, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var endpoint = document.Metadata.TryGetValue("certcc.endpoint", out var endpointValue)
|
||||
? endpointValue
|
||||
: "note";
|
||||
|
||||
required.Remove(endpoint);
|
||||
if (required.Count == 0)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_options.EnableDetailMapping)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (cursor.PendingDocuments.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var pendingDocuments = cursor.PendingDocuments.ToHashSet();
|
||||
var pendingMappings = cursor.PendingMappings.ToHashSet();
|
||||
|
||||
var groups = new Dictionary<string, NoteDocumentGroup>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
foreach (var documentId in cursor.PendingDocuments)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
|
||||
if (document is null)
|
||||
{
|
||||
pendingDocuments.Remove(documentId);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!TryGetMetadata(document, "certcc.noteId", out var noteId) || string.IsNullOrWhiteSpace(noteId))
|
||||
{
|
||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
|
||||
pendingDocuments.Remove(documentId);
|
||||
continue;
|
||||
}
|
||||
|
||||
var endpoint = TryGetMetadata(document, "certcc.endpoint", out var endpointValue)
|
||||
? endpointValue
|
||||
: "note";
|
||||
|
||||
var group = groups.TryGetValue(noteId, out var existing)
|
||||
? existing
|
||||
: (groups[noteId] = new NoteDocumentGroup(noteId));
|
||||
|
||||
group.Add(endpoint, document);
|
||||
}
|
||||
|
||||
foreach (var group in groups.Values)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
if (group.Note is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var noteBytes = await DownloadDocumentAsync(group.Note, cancellationToken).ConfigureAwait(false);
|
||||
var vendorsBytes = group.Vendors is null
|
||||
? EmptyArrayPayload
|
||||
: await DownloadDocumentAsync(group.Vendors, cancellationToken).ConfigureAwait(false);
|
||||
var vulsBytes = group.Vuls is null
|
||||
? EmptyArrayPayload
|
||||
: await DownloadDocumentAsync(group.Vuls, cancellationToken).ConfigureAwait(false);
|
||||
var vendorStatusesBytes = group.VendorStatuses is null
|
||||
? EmptyArrayPayload
|
||||
: await DownloadDocumentAsync(group.VendorStatuses, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var dto = CertCcNoteParser.Parse(noteBytes, vendorsBytes, vulsBytes, vendorStatusesBytes);
|
||||
var json = JsonSerializer.Serialize(dto, DtoSerializerOptions);
|
||||
var payload = MongoDB.Bson.BsonDocument.Parse(json);
|
||||
|
||||
_diagnostics.ParseSuccess(
|
||||
dto.Vendors.Count,
|
||||
dto.VendorStatuses.Count,
|
||||
dto.Vulnerabilities.Count);
|
||||
|
||||
var dtoRecord = new DtoRecord(
|
||||
Guid.NewGuid(),
|
||||
group.Note.Id,
|
||||
SourceName,
|
||||
"certcc.vince.note.v1",
|
||||
payload,
|
||||
_timeProvider.GetUtcNow());
|
||||
|
||||
await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
|
||||
await _documentStore.UpdateStatusAsync(group.Note.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
|
||||
pendingMappings.Add(group.Note.Id);
|
||||
pendingDocuments.Remove(group.Note.Id);
|
||||
|
||||
if (group.Vendors is not null)
|
||||
{
|
||||
await _documentStore.UpdateStatusAsync(group.Vendors.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
|
||||
pendingDocuments.Remove(group.Vendors.Id);
|
||||
}
|
||||
|
||||
if (group.Vuls is not null)
|
||||
{
|
||||
await _documentStore.UpdateStatusAsync(group.Vuls.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
|
||||
pendingDocuments.Remove(group.Vuls.Id);
|
||||
}
|
||||
|
||||
if (group.VendorStatuses is not null)
|
||||
{
|
||||
await _documentStore.UpdateStatusAsync(group.VendorStatuses.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
|
||||
pendingDocuments.Remove(group.VendorStatuses.Id);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_diagnostics.ParseFailure();
|
||||
_logger.LogError(ex, "CERT/CC parse failed for note {NoteId}", group.NoteId);
|
||||
if (group.Note is not null)
|
||||
{
|
||||
await _documentStore.UpdateStatusAsync(group.Note.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
|
||||
pendingDocuments.Remove(group.Note.Id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var updatedCursor = cursor
|
||||
.WithPendingDocuments(pendingDocuments)
|
||||
.WithPendingMappings(pendingMappings);
|
||||
|
||||
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_options.EnableDetailMapping)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (cursor.PendingMappings.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var pendingMappings = cursor.PendingMappings.ToHashSet();
|
||||
|
||||
foreach (var documentId in cursor.PendingMappings)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
|
||||
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (dtoRecord is null || document is null)
|
||||
{
|
||||
pendingMappings.Remove(documentId);
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var json = dtoRecord.Payload.ToJson();
|
||||
var dto = JsonSerializer.Deserialize<CertCcNoteDto>(json, DtoSerializerOptions);
|
||||
if (dto is null)
|
||||
{
|
||||
throw new InvalidOperationException($"CERT/CC DTO payload deserialized as null for document {documentId}.");
|
||||
}
|
||||
|
||||
var advisory = CertCcMapper.Map(dto, document, dtoRecord, SourceName);
|
||||
var affectedCount = advisory.AffectedPackages.Count;
|
||||
var normalizedRuleCount = advisory.AffectedPackages.Sum(static package => package.NormalizedVersions.Count);
|
||||
await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
|
||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
|
||||
_diagnostics.MapSuccess(affectedCount, normalizedRuleCount);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_diagnostics.MapFailure();
|
||||
_logger.LogError(ex, "CERT/CC mapping failed for document {DocumentId}", documentId);
|
||||
await _documentStore.UpdateStatusAsync(documentId, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
pendingMappings.Remove(documentId);
|
||||
}
|
||||
|
||||
var updatedCursor = cursor.WithPendingMappings(pendingMappings);
|
||||
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task FetchNoteBundleAsync(
|
||||
string noteId,
|
||||
string? vuIdentifier,
|
||||
HashSet<Guid> pendingDocuments,
|
||||
HashSet<string> pendingNotes,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var missingEndpoints = new List<(string Endpoint, HttpStatusCode? Status)>();
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var endpoint in DetailEndpoints)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var uri = BuildDetailUri(noteId, endpoint);
|
||||
var metadata = BuildDetailMetadata(noteId, vuIdentifier, endpoint);
|
||||
var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, uri.ToString(), cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var request = new SourceFetchRequest(CertCcOptions.HttpClientName, SourceName, uri)
|
||||
{
|
||||
Metadata = metadata,
|
||||
ETag = existing?.Etag,
|
||||
LastModified = existing?.LastModified,
|
||||
AcceptHeaders = new[] { "application/json" },
|
||||
};
|
||||
|
||||
SourceFetchResult result;
|
||||
_diagnostics.DetailFetchAttempt(endpoint);
|
||||
try
|
||||
{
|
||||
result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (HttpRequestException httpEx)
|
||||
{
|
||||
var status = httpEx.StatusCode ?? TryParseStatusCodeFromMessage(httpEx.Message);
|
||||
if (ShouldTreatAsMissing(status, endpoint))
|
||||
{
|
||||
_diagnostics.DetailFetchMissing(endpoint);
|
||||
missingEndpoints.Add((endpoint, status));
|
||||
continue;
|
||||
}
|
||||
|
||||
_diagnostics.DetailFetchFailure(endpoint);
|
||||
throw;
|
||||
}
|
||||
Guid documentId;
|
||||
if (result.IsSuccess && result.Document is not null)
|
||||
{
|
||||
_diagnostics.DetailFetchSuccess(endpoint);
|
||||
documentId = result.Document.Id;
|
||||
}
|
||||
else if (result.IsNotModified)
|
||||
{
|
||||
_diagnostics.DetailFetchUnchanged(endpoint);
|
||||
if (existing is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
documentId = existing.Id;
|
||||
}
|
||||
else
|
||||
{
|
||||
_diagnostics.DetailFetchFailure(endpoint);
|
||||
_logger.LogWarning(
|
||||
"CERT/CC detail endpoint {Endpoint} returned {StatusCode} for note {NoteId}; will retry.",
|
||||
endpoint,
|
||||
(int)result.StatusCode,
|
||||
noteId);
|
||||
|
||||
throw new HttpRequestException(
|
||||
$"CERT/CC endpoint '{endpoint}' returned {(int)result.StatusCode} ({result.StatusCode}) for note {noteId}.",
|
||||
null,
|
||||
result.StatusCode);
|
||||
}
|
||||
|
||||
pendingDocuments.Add(documentId);
|
||||
|
||||
if (_options.DetailRequestDelay > TimeSpan.Zero)
|
||||
{
|
||||
await Task.Delay(_options.DetailRequestDelay, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (missingEndpoints.Count > 0)
|
||||
{
|
||||
var formatted = string.Join(
|
||||
", ",
|
||||
missingEndpoints.Select(item =>
|
||||
item.Status.HasValue
|
||||
? $"{item.Endpoint} ({(int)item.Status.Value})"
|
||||
: item.Endpoint));
|
||||
|
||||
_logger.LogWarning(
|
||||
"CERT/CC detail fetch completed with missing endpoints for note {NoteId}: {Endpoints}",
|
||||
noteId,
|
||||
formatted);
|
||||
}
|
||||
|
||||
pendingNotes.Remove(noteId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "CERT/CC detail fetch failed for note {NoteId}", noteId);
|
||||
pendingNotes.Add(noteId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private static Dictionary<string, string> BuildSummaryMetadata(CertCcSummaryRequest request)
|
||||
{
|
||||
var metadata = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
["certcc.scope"] = request.Scope.ToString().ToLowerInvariant(),
|
||||
["certcc.year"] = request.Year.ToString("D4", CultureInfo.InvariantCulture),
|
||||
};
|
||||
|
||||
if (request.Month.HasValue)
|
||||
{
|
||||
metadata["certcc.month"] = request.Month.Value.ToString("D2", CultureInfo.InvariantCulture);
|
||||
}
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
private static Dictionary<string, string> BuildDetailMetadata(string noteId, string? vuIdentifier, string endpoint)
|
||||
{
|
||||
var metadata = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
["certcc.endpoint"] = endpoint,
|
||||
["certcc.noteId"] = noteId,
|
||||
};
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(vuIdentifier))
|
||||
{
|
||||
metadata["certcc.vuid"] = vuIdentifier;
|
||||
}
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
private async Task<IReadOnlyList<string>> ReadSummaryNotesAsync(DocumentRecord document, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!document.GridFsId.HasValue)
|
||||
{
|
||||
return Array.Empty<string>();
|
||||
}
|
||||
|
||||
var payload = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
|
||||
return CertCcSummaryParser.ParseNotes(payload);
|
||||
}
|
||||
|
||||
private async Task<byte[]> DownloadDocumentAsync(DocumentRecord document, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!document.GridFsId.HasValue)
|
||||
{
|
||||
throw new InvalidOperationException($"Document {document.Id} has no GridFS payload.");
|
||||
}
|
||||
|
||||
return await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private Uri BuildDetailUri(string noteId, string endpoint)
|
||||
{
|
||||
var suffix = endpoint switch
|
||||
{
|
||||
"note" => $"{noteId}/",
|
||||
"vendors" => $"{noteId}/vendors/",
|
||||
"vuls" => $"{noteId}/vuls/",
|
||||
"vendors-vuls" => $"{noteId}/vendors/vuls/",
|
||||
_ => $"{noteId}/",
|
||||
};
|
||||
|
||||
return new Uri(_options.BaseApiUri, suffix);
|
||||
}
|
||||
|
||||
private static string? TryNormalizeNoteToken(string token, out string? vuIdentifier)
|
||||
{
|
||||
vuIdentifier = null;
|
||||
if (string.IsNullOrWhiteSpace(token))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var trimmed = token.Trim();
|
||||
var digits = new string(trimmed.Where(char.IsDigit).ToArray());
|
||||
if (digits.Length == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
vuIdentifier = trimmed.StartsWith("vu", StringComparison.OrdinalIgnoreCase)
|
||||
? trimmed.Replace(" ", string.Empty, StringComparison.Ordinal)
|
||||
: $"VU#{digits}";
|
||||
|
||||
return digits;
|
||||
}
|
||||
|
||||
private static bool TryGetMetadata(DocumentRecord document, string key, out string value)
|
||||
{
|
||||
value = string.Empty;
|
||||
if (document.Metadata is null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!document.Metadata.TryGetValue(key, out var metadataValue))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
value = metadataValue;
|
||||
return true;
|
||||
}
|
||||
|
||||
private async Task<CertCcCursor> GetCursorAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
@@ -118,7 +677,103 @@ public sealed class CertCcConnector : IFeedConnector
|
||||
|
||||
private async Task UpdateCursorAsync(CertCcCursor cursor, CancellationToken cancellationToken)
|
||||
{
|
||||
var document = cursor.ToBsonDocument();
|
||||
await _stateRepository.UpdateCursorAsync(SourceName, document, _timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false);
|
||||
var completedAt = _timeProvider.GetUtcNow();
|
||||
await _stateRepository.UpdateCursorAsync(SourceName, cursor.ToBsonDocument(), completedAt, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private sealed class NoteDocumentGroup
|
||||
{
|
||||
public NoteDocumentGroup(string noteId)
|
||||
{
|
||||
NoteId = noteId;
|
||||
}
|
||||
|
||||
public string NoteId { get; }
|
||||
|
||||
public DocumentRecord? Note { get; private set; }
|
||||
|
||||
public DocumentRecord? Vendors { get; private set; }
|
||||
|
||||
public DocumentRecord? Vuls { get; private set; }
|
||||
|
||||
public DocumentRecord? VendorStatuses { get; private set; }
|
||||
|
||||
public void Add(string endpoint, DocumentRecord document)
|
||||
{
|
||||
switch (endpoint)
|
||||
{
|
||||
case "note":
|
||||
Note = document;
|
||||
break;
|
||||
case "vendors":
|
||||
Vendors = document;
|
||||
break;
|
||||
case "vuls":
|
||||
Vuls = document;
|
||||
break;
|
||||
case "vendors-vuls":
|
||||
VendorStatuses = document;
|
||||
break;
|
||||
default:
|
||||
Note ??= document;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static bool ShouldTreatAsMissing(HttpStatusCode? statusCode, string endpoint)
|
||||
{
|
||||
if (statusCode is null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (statusCode is HttpStatusCode.NotFound or HttpStatusCode.Gone)
|
||||
{
|
||||
return !string.Equals(endpoint, "note", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
// Treat vendors/vendors-vuls/vuls 403 as optional air-gapped responses.
|
||||
if (statusCode == HttpStatusCode.Forbidden && !string.Equals(endpoint, "note", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static HttpStatusCode? TryParseStatusCodeFromMessage(string? message)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(message))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
const string marker = "status ";
|
||||
var index = message.IndexOf(marker, StringComparison.OrdinalIgnoreCase);
|
||||
if (index < 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
index += marker.Length;
|
||||
var end = index;
|
||||
while (end < message.Length && char.IsDigit(message[end]))
|
||||
{
|
||||
end++;
|
||||
}
|
||||
|
||||
if (end == index)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if (int.TryParse(message[index..end], NumberStyles.Integer, CultureInfo.InvariantCulture, out var code) &&
|
||||
Enum.IsDefined(typeof(HttpStatusCode), code))
|
||||
{
|
||||
return (HttpStatusCode)code;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user