using System.Collections.Generic;
using System.Collections.Immutable;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Xml.Linq;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Excititor.Connectors.Abstractions;
using StellaOps.Excititor.Connectors.RedHat.CSAF.Configuration;
using StellaOps.Excititor.Connectors.RedHat.CSAF.Metadata;
using StellaOps.Excititor.Core;
using StellaOps.Excititor.Storage.Mongo;

namespace StellaOps.Excititor.Connectors.RedHat.CSAF;

/// <summary>
/// Connector that reads the ROLIE feed named by the Red Hat provider metadata, downloads the
/// documents linked from its entries, stores each previously unseen document in the raw sink
/// and records the processed digests and latest entry timestamp in the connector state repository.
/// </summary>
public sealed class RedHatCsafConnector : VexConnectorBase
{
    private readonly RedHatProviderMetadataLoader _metadataLoader;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IVexConnectorStateRepository _stateRepository;

    /// <summary>
    /// Creates the connector.
    /// </summary>
    /// <param name="descriptor">Connector descriptor, forwarded to the base class.</param>
    /// <param name="metadataLoader">Loader for the Red Hat provider metadata.</param>
    /// <param name="httpClientFactory">Factory used to create the named HTTP client for feed and document requests.</param>
    /// <param name="stateRepository">Repository holding the persisted connector state.</param>
    /// <param name="logger">Logger, forwarded to the base class.</param>
    /// <param name="timeProvider">Time provider, forwarded to the base class.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="metadataLoader"/>, <paramref name="httpClientFactory"/> or
    /// <paramref name="stateRepository"/> is <see langword="null"/>.
    /// </exception>
    // NOTE(review): the logger parameter type is reproduced exactly as found; confirm whether a
    // generic logger type was intended.
    public RedHatCsafConnector(
        VexConnectorDescriptor descriptor,
        RedHatProviderMetadataLoader metadataLoader,
        IHttpClientFactory httpClientFactory,
        IVexConnectorStateRepository stateRepository,
        ILogger logger,
        TimeProvider timeProvider)
        : base(descriptor, logger, timeProvider)
    {
        _metadataLoader = metadataLoader ?? throw new ArgumentNullException(nameof(metadataLoader));
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
    }

    /// <summary>
    /// Validates connector settings. Nothing is checked here; the call completes immediately.
    /// </summary>
    /// <param name="settings">Connector settings (unused).</param>
    /// <param name="cancellationToken">Cancellation token (unused).</param>
    /// <returns>An already completed task.</returns>
    public override ValueTask ValidateAsync(VexConnectorSettings settings, CancellationToken cancellationToken)
    {
        // No connector-specific settings yet.
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Streams the documents referenced by the ROLIE feed that are newer than the effective
    /// "since" timestamp and whose digest has not been recorded before.
    /// </summary>
    /// <param name="context">Fetch context supplying the optional lower time bound and the raw sink.</param>
    /// <param name="cancellationToken">Token passed to every asynchronous call made by this method.</param>
    /// <returns>Each newly stored raw document, in ascending order of the feed entries' updated timestamp.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidOperationException">The provider metadata does not name a ROLIE feed.</exception>
    public override async IAsyncEnumerable<VexRawDocument> FetchAsync(
        VexConnectorContext context,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(context);

        var metadataResult = await _metadataLoader.LoadAsync(cancellationToken).ConfigureAwait(false);
        if (metadataResult.Provider.Discovery.RolIeService is null)
        {
            throw new InvalidOperationException("Red Hat provider metadata did not specify a ROLIE feed.");
        }

        var state = await _stateRepository.GetAsync(Descriptor.Id, cancellationToken).ConfigureAwait(false);

        // The effective lower bound is the later of the caller's value and the persisted one.
        var sinceTimestamp = context.Since;
        if (state?.LastUpdated is { } persisted && (sinceTimestamp is null || persisted > sinceTimestamp))
        {
            sinceTimestamp = persisted;
        }

        var knownDigests = state?.DocumentDigests ?? ImmutableArray<string>.Empty;

        // The list keeps insertion order for persistence; the set gives case-insensitive membership tests.
        var digestList = new List<string>(knownDigests);
        var digestSet = new HashSet<string>(knownDigests, StringComparer.OrdinalIgnoreCase);
        var latestUpdated = state?.LastUpdated ?? sinceTimestamp ?? DateTimeOffset.MinValue;
        var stateChanged = false;

        var entries = await FetchRolieEntriesAsync(metadataResult.Provider.Discovery.RolIeService, cancellationToken)
            .ConfigureAwait(false);

        foreach (var entry in entries)
        {
            if (sinceTimestamp is not null && entry.Updated is DateTimeOffset updated && updated <= sinceTimestamp)
            {
                continue;
            }

            if (entry.DocumentUri is null)
            {
                Logger.LogDebug("Skipping ROLIE entry {Id} because no document link was provided.", entry.Id);
                continue;
            }

            var rawDocument = await DownloadCsafDocumentAsync(entry, cancellationToken).ConfigureAwait(false);
            if (!digestSet.Add(rawDocument.Digest))
            {
                Logger.LogDebug(
                    "Skipping CSAF document {Uri} because digest {Digest} was already processed.",
                    rawDocument.SourceUri,
                    rawDocument.Digest);

                // Still move the watermark past this entry. Otherwise an entry whose timestamp
                // changed while its content did not would be downloaded again on every run.
                if (entry.Updated is DateTimeOffset duplicateUpdated && duplicateUpdated > latestUpdated)
                {
                    latestUpdated = duplicateUpdated;
                    stateChanged = true;
                }

                continue;
            }

            await context.RawSink.StoreAsync(rawDocument, cancellationToken).ConfigureAwait(false);
            digestList.Add(rawDocument.Digest);
            stateChanged = true;

            if (entry.Updated is DateTimeOffset entryUpdated && entryUpdated > latestUpdated)
            {
                latestUpdated = entryUpdated;
            }

            yield return rawDocument;
        }

        // State is written only once the whole feed has been enumerated: entries are processed in
        // ascending timestamp order, so a watermark saved part-way could hide unprocessed entries
        // that share the timestamp of the last stored one.
        if (stateChanged)
        {
            var newLastUpdated = latestUpdated == DateTimeOffset.MinValue ? state?.LastUpdated : latestUpdated;
            var updatedState = new VexConnectorState(
                Descriptor.Id,
                newLastUpdated,
                digestList.ToImmutableArray());

            await _stateRepository.SaveAsync(updatedState, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Not supported by this connector; always throws.
    /// </summary>
    /// <param name="document">Raw document (unused).</param>
    /// <param name="cancellationToken">Cancellation token (unused).</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    // NOTE(review): the return type is reproduced exactly as found; confirm it matches the
    // member being overridden.
    public override ValueTask NormalizeAsync(VexRawDocument document, CancellationToken cancellationToken)
    {
        // This connector relies on format-specific normalizers registered elsewhere.
        throw new NotSupportedException("RedHatCsafConnector does not perform in-line normalization; use the CSAF normalizer component.");
    }

    /// <summary>
    /// Downloads the feed at <paramref name="feedUri"/> and projects its <c>entry</c> elements.
    /// </summary>
    /// <param name="feedUri">Address of the ROLIE feed.</param>
    /// <param name="cancellationToken">Token for the HTTP request and the XML load.</param>
    /// <returns>
    /// Entries that have both an id and a parseable updated timestamp, sorted ascending by that
    /// timestamp; empty when the document has no root element.
    /// </returns>
    private async Task<IReadOnlyList<RolieEntry>> FetchRolieEntriesAsync(Uri feedUri, CancellationToken cancellationToken)
    {
        var client = _httpClientFactory.CreateClient(RedHatConnectorOptions.HttpClientName);
        using var response = await client.GetAsync(feedUri, cancellationToken).ConfigureAwait(false);
        response.EnsureSuccessStatusCode();

        await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
        var document = await XDocument.LoadAsync(stream, LoadOptions.None, cancellationToken).ConfigureAwait(false);

        // Child elements are looked up in the root element's own namespace.
        var ns = document.Root?.Name.Namespace ?? "http://www.w3.org/2005/Atom";

        var entries = document.Root?
            .Elements(ns + "entry")
            .Select(e => new RolieEntry(
                Id: (string?)e.Element(ns + "id"),
                Updated: ParseUpdated((string?)e.Element(ns + "updated")),
                DocumentUri: ParseDocumentLink(e, ns)))
            .Where(entry => entry.Id is not null && entry.Updated is not null)
            .OrderBy(entry => entry.Updated)
            .ToList() ?? new List<RolieEntry>();

        return entries;
    }

    /// <summary>
    /// Parses a feed timestamp with the invariant culture so the result does not depend on the
    /// host's regional settings; a value without an explicit offset is read as UTC.
    /// </summary>
    /// <param name="value">Text of the <c>updated</c> element, possibly <see langword="null"/>.</param>
    /// <returns>The parsed timestamp, or <see langword="null"/> when the text cannot be parsed.</returns>
    private static DateTimeOffset? ParseUpdated(string? value)
        => DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal, out var parsed)
            ? parsed
            : null;

    /// <summary>
    /// Returns the first usable document link of a feed entry.
    /// </summary>
    /// <param name="entry">The <c>entry</c> element.</param>
    /// <param name="ns">Namespace used to look up the <c>link</c> children.</param>
    /// <returns>
    /// The first absolute <c>href</c> found on a <c>link</c> whose <c>rel</c> is absent,
    /// <c>enclosure</c> or <c>alternate</c> (case-insensitive); otherwise <see langword="null"/>.
    /// </returns>
    private static Uri? ParseDocumentLink(XElement entry, XNamespace ns)
    {
        foreach (var link in entry.Elements(ns + "link"))
        {
            var rel = (string?)link.Attribute("rel");
            var href = (string?)link.Attribute("href");
            if (string.IsNullOrWhiteSpace(href))
            {
                continue;
            }

            var relAccepted = rel is null
                || rel.Equals("enclosure", StringComparison.OrdinalIgnoreCase)
                || rel.Equals("alternate", StringComparison.OrdinalIgnoreCase);

            // Relative or malformed hrefs are passed over; a later link may still match.
            if (relAccepted && Uri.TryCreate(href, UriKind.Absolute, out var uri))
            {
                return uri;
            }
        }

        return null;
    }

    /// <summary>
    /// Downloads the document an entry links to and wraps the bytes, together with entry
    /// metadata, in a raw document tagged with the CSAF format.
    /// </summary>
    /// <param name="entry">Feed entry to download.</param>
    /// <param name="cancellationToken">Token for the HTTP request and the body read.</param>
    /// <returns>The raw document produced by <c>CreateRawDocument</c>.</returns>
    /// <exception cref="InvalidOperationException">The entry has no document URI.</exception>
    private async Task<VexRawDocument> DownloadCsafDocumentAsync(RolieEntry entry, CancellationToken cancellationToken)
    {
        var documentUri = entry.DocumentUri ?? throw new InvalidOperationException("ROLIE entry missing document URI.");

        var client = _httpClientFactory.CreateClient(RedHatConnectorOptions.HttpClientName);
        using var response = await client.GetAsync(documentUri, cancellationToken).ConfigureAwait(false);
        response.EnsureSuccessStatusCode();

        var contentBytes = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);

        var metadata = BuildMetadata(builder => builder
            .Add("redhat.csaf.entryId", entry.Id)
            .Add("redhat.csaf.documentUri", documentUri.ToString())
            .Add("redhat.csaf.updated", entry.Updated?.ToString("O")));

        return CreateRawDocument(VexDocumentFormat.Csaf, documentUri, contentBytes, metadata);
    }

    /// <summary>
    /// One feed entry: its id, its updated timestamp and the document link chosen for it.
    /// </summary>
    private sealed record RolieEntry(string? Id, DateTimeOffset? Updated, Uri? DocumentUri);
}