// Source listing metadata: 436 lines, 18 KiB, C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text.Json;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using MongoDB.Bson;
|
|
using StellaOps.Feedser.Source.CertBund.Configuration;
|
|
using StellaOps.Feedser.Source.CertBund.Internal;
|
|
using StellaOps.Feedser.Source.Common;
|
|
using StellaOps.Feedser.Source.Common.Fetch;
|
|
using StellaOps.Feedser.Source.Common.Html;
|
|
using StellaOps.Feedser.Storage.Mongo;
|
|
using StellaOps.Feedser.Storage.Mongo.Advisories;
|
|
using StellaOps.Feedser.Storage.Mongo.Documents;
|
|
using StellaOps.Feedser.Storage.Mongo.Dtos;
|
|
using StellaOps.Plugin;
|
|
|
|
namespace StellaOps.Feedser.Source.CertBund;
|
|
|
|
/// <summary>
/// Feed connector for the CERT-Bund source. Work is split into three stages
/// (<see cref="FetchAsync"/>, <see cref="ParseAsync"/>, <see cref="MapAsync"/>) and the
/// progress between stages is carried in a <see cref="CertBundCursor"/> that is loaded from
/// and written back to <see cref="ISourceStateRepository"/>.
/// </summary>
public sealed class CertBundConnector : IFeedConnector
{
    // Metadata key read in ParseAsync to recover the portal URI of a stored document.
    private const string PortalUriMetadataKey = "certbund.portalUri";

    // Schema identifier stamped on every DTO record written by ParseAsync.
    private const string DtoSchemaVersion = "cert-bund.detail.v1";

    private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
    {
        PropertyNameCaseInsensitive = true,
        DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
    };

    private readonly CertBundFeedClient _feedClient;
    private readonly CertBundDetailParser _detailParser;
    private readonly SourceFetchService _fetchService;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly IDocumentStore _documentStore;
    private readonly IDtoStore _dtoStore;
    private readonly IAdvisoryStore _advisoryStore;
    private readonly ISourceStateRepository _stateRepository;
    private readonly CertBundOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly CertBundDiagnostics _diagnostics;
    private readonly ILogger<CertBundConnector> _logger;

    /// <summary>
    /// Creates the connector and validates its options.
    /// </summary>
    /// <param name="timeProvider">Clock to use; <see cref="TimeProvider.System"/> is used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException">
    /// Any dependency other than <paramref name="timeProvider"/> is <c>null</c>, or
    /// <paramref name="options"/> wraps a <c>null</c> value.
    /// </exception>
    public CertBundConnector(
        CertBundFeedClient feedClient,
        CertBundDetailParser detailParser,
        SourceFetchService fetchService,
        RawDocumentStorage rawDocumentStorage,
        IDocumentStore documentStore,
        IDtoStore dtoStore,
        IAdvisoryStore advisoryStore,
        ISourceStateRepository stateRepository,
        IOptions<CertBundOptions> options,
        CertBundDiagnostics diagnostics,
        TimeProvider? timeProvider,
        ILogger<CertBundConnector> logger)
    {
        _feedClient = feedClient ?? throw new ArgumentNullException(nameof(feedClient));
        _detailParser = detailParser ?? throw new ArgumentNullException(nameof(detailParser));
        _fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
        _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
        _options.Validate();
        _diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Name under which this connector's state and documents are stored.</summary>
    public string SourceName => CertBundConnectorPlugin.SourceName;

    /// <summary>
    /// Loads the feed, fetches the detail document of every advisory that is not yet known
    /// (newest first, at most <c>MaxAdvisoriesPerFetch</c> per call) and records the fetched
    /// documents as pending in the cursor.
    /// </summary>
    /// <param name="services">Service provider supplied by the host; only null-checked here.</param>
    /// <param name="cancellationToken">Token observed between items and passed to all I/O.</param>
    /// <remarks>
    /// A feed or detail fetch exception is logged, reported through
    /// <c>MarkFailureAsync</c> and rethrown. Cancellation requested through
    /// <paramref name="cancellationToken"/> is not treated as a failure and simply propagates.
    /// </remarks>
    public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        var now = _timeProvider.GetUtcNow();
        IReadOnlyList<CertBundFeedItem> feedItems;

        _diagnostics.FeedFetchAttempt();
        try
        {
            feedItems = await _feedClient.LoadAsync(cancellationToken).ConfigureAwait(false);
            _diagnostics.FeedFetchSuccess(feedItems.Count);
        }
        catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
        {
            _logger.LogError(ex, "CERT-Bund feed fetch failed");
            _diagnostics.FeedFetchFailure();
            await _stateRepository.MarkFailureAsync(SourceName, now, _options.FailureBackoff, ex.Message, cancellationToken).ConfigureAwait(false);
            throw;
        }

        var coverageDays = CalculateCoverageDays(feedItems, now);
        _diagnostics.RecordFeedCoverage(coverageDays);

        if (feedItems.Count == 0)
        {
            // Nothing to process; only the last-fetch timestamp moves forward.
            await UpdateCursorAsync(cursor.WithLastFetch(now), cancellationToken).ConfigureAwait(false);
            return;
        }

        var pendingDocuments = cursor.PendingDocuments.ToHashSet();
        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var knownAdvisories = new HashSet<string>(cursor.KnownAdvisories, StringComparer.OrdinalIgnoreCase);
        var processed = 0;
        var alreadyKnown = 0;
        var notModified = 0;
        var detailFailures = 0;
        var truncated = false;
        var latestPublished = cursor.LastPublished ?? DateTimeOffset.MinValue;

        foreach (var item in feedItems.OrderByDescending(static i => i.Published))
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (knownAdvisories.Contains(item.AdvisoryId))
            {
                alreadyKnown++;
                continue;
            }

            if (processed >= _options.MaxAdvisoriesPerFetch)
            {
                truncated = true;
                break;
            }

            try
            {
                _diagnostics.DetailFetchAttempt();

                // Reuse the validators of a previously stored copy so the request can be conditional.
                var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, item.DetailUri.ToString(), cancellationToken).ConfigureAwait(false);
                var request = new SourceFetchRequest(CertBundOptions.HttpClientName, SourceName, item.DetailUri)
                {
                    AcceptHeaders = new[] { "application/json", "text/json" },
                    Metadata = CertBundDocumentMetadata.CreateMetadata(item),
                    ETag = existing?.Etag,
                    LastModified = existing?.LastModified,
                    TimeoutOverride = _options.RequestTimeout,
                };

                var result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
                if (result.IsNotModified)
                {
                    _diagnostics.DetailFetchNotModified();
                    notModified++;
                    knownAdvisories.Add(item.AdvisoryId);
                    continue;
                }

                if (!result.IsSuccess || result.Document is null)
                {
                    // Not added to the known set, so the advisory is retried on a later cycle.
                    _diagnostics.DetailFetchFailure("skipped");
                    detailFailures++;
                    continue;
                }

                _diagnostics.DetailFetchSuccess();
                pendingDocuments.Add(result.Document.Id);
                // A freshly fetched document must be parsed again before it can be mapped.
                pendingMappings.Remove(result.Document.Id);
                knownAdvisories.Add(item.AdvisoryId);
                processed++;

                if (_options.RequestDelay > TimeSpan.Zero)
                {
                    await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
            {
                _logger.LogError(ex, "CERT-Bund detail fetch failed for {AdvisoryId}", item.AdvisoryId);
                _diagnostics.DetailFetchFailure("exception");
                await _stateRepository.MarkFailureAsync(SourceName, now, _options.FailureBackoff, ex.Message, cancellationToken).ConfigureAwait(false);

                // NOTE(review): rethrowing skips the cursor update below, so documents enqueued
                // earlier in this cycle are not recorded in the cursor — confirm this is intended.
                throw;
            }

            if (item.Published > latestPublished)
            {
                latestPublished = item.Published;
            }
        }

        _diagnostics.DetailFetchEnqueued(processed);

        // The feed is known to be non-empty here (empty feeds return early above).
        _logger.LogInformation(
            "CERT-Bund fetch cycle: feed items {FeedItems}, enqueued {Enqueued}, already known {Known}, not modified {NotModified}, detail failures {DetailFailures}, pending documents {PendingDocuments}, pending mappings {PendingMappings}, truncated {Truncated}, coverageDays={CoverageDays}",
            feedItems.Count,
            processed,
            alreadyKnown,
            notModified,
            detailFailures,
            pendingDocuments.Count,
            pendingMappings.Count,
            truncated,
            coverageDays ?? double.NaN);

        // Cap the known-advisory set; when over the limit, keep the IDs that sort highest
        // under a case-insensitive ordinal comparison.
        var trimmedKnown = knownAdvisories.Count > _options.MaxKnownAdvisories
            ? knownAdvisories.OrderByDescending(id => id, StringComparer.OrdinalIgnoreCase)
                .Take(_options.MaxKnownAdvisories)
                .ToArray()
            : knownAdvisories.ToArray();

        var updatedCursor = cursor
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings)
            .WithKnownAdvisories(trimmedKnown)
            .WithLastPublished(latestPublished == DateTimeOffset.MinValue ? cursor.LastPublished : latestPublished)
            .WithLastFetch(now);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Parses every pending document into a <see cref="CertBundAdvisoryDto"/>, stores it as a
    /// DTO record and moves the document from the pending-documents set to the
    /// pending-mappings set of the cursor.
    /// </summary>
    /// <param name="services">Service provider supplied by the host; only null-checked here.</param>
    /// <param name="cancellationToken">Token observed between documents and passed to all I/O.</param>
    /// <remarks>
    /// Documents that no longer exist are dropped from the cursor. Documents without a stored
    /// payload, or whose payload cannot be parsed, are marked failed and dropped. A payload
    /// download error is logged and rethrown.
    /// </remarks>
    public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingDocuments.Count == 0)
        {
            return;
        }

        var remainingDocuments = cursor.PendingDocuments.ToHashSet();
        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var now = _timeProvider.GetUtcNow();
        var parsedCount = 0;
        var failedCount = 0;

        foreach (var documentId in cursor.PendingDocuments)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                remainingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                continue;
            }

            if (!document.GridFsId.HasValue)
            {
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                remainingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                _diagnostics.ParseFailure("missing_payload");
                failedCount++;
                continue;
            }

            byte[] payload;
            try
            {
                payload = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
            {
                _logger.LogError(ex, "CERT-Bund unable to download document {DocumentId}", document.Id);
                _diagnostics.ParseFailure("download_failed");
                throw;
            }

            CertBundAdvisoryDto dto;
            try
            {
                // Fall back to the document URI when the metadata is absent, lacks the key,
                // or holds a blank value; a plain indexer lookup would throw on a missing key.
                // NOTE(review): assumes Metadata is a string-to-string dictionary — confirm.
                var portalUri = document.Uri;
                if (document.Metadata is { } metadata
                    && metadata.TryGetValue(PortalUriMetadataKey, out var storedPortalUri)
                    && !string.IsNullOrWhiteSpace(storedPortalUri))
                {
                    portalUri = storedPortalUri;
                }

                dto = _detailParser.Parse(new Uri(document.Uri), new Uri(portalUri), payload);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "CERT-Bund failed to parse advisory detail {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                remainingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                _diagnostics.ParseFailure("parse_error");
                failedCount++;
                continue;
            }

            _diagnostics.ParseSuccess(dto.Products.Count, dto.CveIds.Count);
            parsedCount++;

            var bson = BsonDocument.Parse(JsonSerializer.Serialize(dto, SerializerOptions));
            var dtoRecord = new DtoRecord(Guid.NewGuid(), document.Id, SourceName, DtoSchemaVersion, bson, now);
            await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);

            remainingDocuments.Remove(documentId);
            pendingMappings.Add(document.Id);
        }

        // At least one document was pending (the empty case returns early above).
        _logger.LogInformation(
            "CERT-Bund parse cycle: parsed {Parsed}, failures {Failures}, remaining documents {RemainingDocuments}, pending mappings {PendingMappings}",
            parsedCount,
            failedCount,
            remainingDocuments.Count,
            pendingMappings.Count);

        var updatedCursor = cursor
            .WithPendingDocuments(remainingDocuments)
            .WithPendingMappings(pendingMappings);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Maps the stored DTO of every pending document to an advisory, upserts it into the
    /// advisory store and removes the document from the cursor's pending-mappings set.
    /// </summary>
    /// <param name="services">Service provider supplied by the host; only null-checked here.</param>
    /// <param name="cancellationToken">Token observed between documents and passed to all I/O.</param>
    /// <remarks>
    /// Documents that no longer exist are dropped from the cursor. A missing or unreadable DTO,
    /// or a mapping/upsert exception, marks the document failed and drops it. Cancellation
    /// requested through <paramref name="cancellationToken"/> propagates without marking the
    /// document failed.
    /// </remarks>
    public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingMappings.Count == 0)
        {
            return;
        }

        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var mappedCount = 0;
        var failedCount = 0;

        foreach (var documentId in cursor.PendingMappings)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                pendingMappings.Remove(documentId);
                continue;
            }

            var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (dtoRecord is null)
            {
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("missing_dto");
                failedCount++;
                continue;
            }

            CertBundAdvisoryDto? dto;
            try
            {
                dto = JsonSerializer.Deserialize<CertBundAdvisoryDto>(dtoRecord.Payload.ToJson(), SerializerOptions);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "CERT-Bund failed to deserialize DTO for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("deserialize_failed");
                failedCount++;
                continue;
            }

            if (dto is null)
            {
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("null_dto");
                failedCount++;
                continue;
            }

            try
            {
                var advisory = CertBundMapper.Map(dto, document, dtoRecord.ValidatedAt);
                await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapSuccess(advisory.AffectedPackages.Length, advisory.Aliases.Length);
                mappedCount++;
            }
            catch (Exception ex) when (!IsCancellation(ex, cancellationToken))
            {
                // Cancellation is filtered out so an interrupted upsert does not mark a
                // healthy document as failed.
                _logger.LogError(ex, "CERT-Bund mapping failed for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                _diagnostics.MapFailure("exception");
                failedCount++;
            }
        }

        // At least one mapping was pending (the empty case returns early above).
        _logger.LogInformation(
            "CERT-Bund map cycle: mapped {Mapped}, failures {Failures}, remaining pending mappings {PendingMappings}",
            mappedCount,
            failedCount,
            pendingMappings.Count);

        var updatedCursor = cursor.WithPendingMappings(pendingMappings);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Returns <c>true</c> when <paramref name="exception"/> is an
    /// <see cref="OperationCanceledException"/> and <paramref name="cancellationToken"/> has
    /// been cancelled. An <see cref="OperationCanceledException"/> raised while the token is
    /// not cancelled (for example a timeout) is still reported as a failure by the callers.
    /// </summary>
    private static bool IsCancellation(Exception exception, CancellationToken cancellationToken)
        => exception is OperationCanceledException && cancellationToken.IsCancellationRequested;

    /// <summary>
    /// Computes the number of days between the oldest published feed item and
    /// <paramref name="fetchedAt"/>.
    /// </summary>
    /// <returns>
    /// The span in days, or <c>null</c> when there are no items, the oldest timestamp is
    /// <see cref="DateTimeOffset.MinValue"/>, or the oldest item lies after <paramref name="fetchedAt"/>.
    /// </returns>
    private static double? CalculateCoverageDays(IReadOnlyList<CertBundFeedItem> items, DateTimeOffset fetchedAt)
    {
        if (items is null || items.Count == 0)
        {
            return null;
        }

        var oldest = items.Min(static item => item.Published);
        if (oldest == DateTimeOffset.MinValue)
        {
            return null;
        }

        var span = fetchedAt - oldest;
        return span >= TimeSpan.Zero ? span.TotalDays : null;
    }

    /// <summary>
    /// Loads the persisted cursor for this source, or <see cref="CertBundCursor.Empty"/> when
    /// no state has been stored yet.
    /// </summary>
    private async Task<CertBundCursor> GetCursorAsync(CancellationToken cancellationToken)
    {
        var state = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
        return state is null ? CertBundCursor.Empty : CertBundCursor.FromBson(state.Cursor);
    }

    /// <summary>
    /// Persists <paramref name="cursor"/>, using its last-fetch timestamp as the completion
    /// time and falling back to the current time when the cursor has none.
    /// </summary>
    private Task UpdateCursorAsync(CertBundCursor cursor, CancellationToken cancellationToken)
    {
        var document = cursor.ToBsonDocument();
        var completedAt = cursor.LastFetchAt ?? _timeProvider.GetUtcNow();
        return _stateRepository.UpdateCursorAsync(SourceName, document, completedAt, cancellationToken);
    }
}
|