using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using StellaOps.Excititor.Connectors.Abstractions;
using StellaOps.Excititor.Connectors.SUSE.RancherVEXHub.Authentication;
using StellaOps.Excititor.Connectors.SUSE.RancherVEXHub.Configuration;
using StellaOps.Excititor.Connectors.SUSE.RancherVEXHub.Events;
using StellaOps.Excititor.Connectors.SUSE.RancherVEXHub.Metadata;
using StellaOps.Excititor.Connectors.SUSE.RancherVEXHub.State;
using StellaOps.Excititor.Core;

namespace StellaOps.Excititor.Connectors.SUSE.RancherVEXHub;

/// <summary>
/// VEX connector for the SUSE Rancher VEX Hub. It loads the hub discovery metadata,
/// streams event batches from the event client, downloads the document referenced by
/// each event, stores it in the raw sink, and persists a checkpoint (cursor, last
/// published timestamp, digest history) once the event stream has been drained.
/// </summary>
public sealed class RancherHubConnector : VexConnectorBase
{
    private static readonly VexConnectorDescriptor StaticDescriptor = new(
        id: "excititor:suse.rancher",
        kind: VexProviderKind.Hub,
        displayName: "SUSE Rancher VEX Hub")
    {
        Tags = ImmutableArray.Create("hub", "suse", "offline"),
    };

    private readonly RancherHubMetadataLoader _metadataLoader;
    private readonly RancherHubEventClient _eventClient;
    private readonly RancherHubCheckpointManager _checkpointManager;
    private readonly RancherHubTokenProvider _tokenProvider;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IEnumerable<IVexConnectorOptionsValidator<RancherHubConnectorOptions>> _validators;

    // Populated by ValidateAsync; FetchAsync refuses to run until _options is set.
    private RancherHubConnectorOptions? _options;
    private RancherHubMetadataResult? _metadata;

    /// <summary>
    /// Creates the connector.
    /// </summary>
    /// <param name="metadataLoader">Loads the hub discovery metadata.</param>
    /// <param name="eventClient">Streams event batches from the hub.</param>
    /// <param name="checkpointManager">Loads and saves the ingestion checkpoint.</param>
    /// <param name="tokenProvider">Supplies access tokens for authenticated document downloads.</param>
    /// <param name="httpClientFactory">Factory for the HTTP client used to download documents.</param>
    /// <param name="logger">Logger passed to the base connector.</param>
    /// <param name="timeProvider">Time source passed to the base connector.</param>
    /// <param name="validators">Optional option validators; treated as empty when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException">A required collaborator is <c>null</c>.</exception>
    public RancherHubConnector(
        RancherHubMetadataLoader metadataLoader,
        RancherHubEventClient eventClient,
        RancherHubCheckpointManager checkpointManager,
        RancherHubTokenProvider tokenProvider,
        IHttpClientFactory httpClientFactory,
        ILogger<RancherHubConnector> logger,
        TimeProvider timeProvider,
        IEnumerable<IVexConnectorOptionsValidator<RancherHubConnectorOptions>>? validators = null)
        : base(StaticDescriptor, logger, timeProvider)
    {
        _metadataLoader = metadataLoader ?? throw new ArgumentNullException(nameof(metadataLoader));
        _eventClient = eventClient ?? throw new ArgumentNullException(nameof(eventClient));
        _checkpointManager = checkpointManager ?? throw new ArgumentNullException(nameof(checkpointManager));
        _tokenProvider = tokenProvider ?? throw new ArgumentNullException(nameof(tokenProvider));
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _validators = validators ?? Array.Empty<IVexConnectorOptionsValidator<RancherHubConnectorOptions>>();
    }

    /// <summary>
    /// Binds the connector options from <paramref name="settings"/>, loads the hub discovery
    /// metadata, caches both on the instance, and logs a "validate" event.
    /// </summary>
    public override async ValueTask ValidateAsync(VexConnectorSettings settings, CancellationToken cancellationToken)
    {
        _options = VexConnectorOptionsBinder.Bind(
            Descriptor,
            settings,
            validators: _validators);

        _metadata = await _metadataLoader.LoadAsync(_options, cancellationToken).ConfigureAwait(false);

        LogConnectorEvent(LogLevel.Information, "validate", "Rancher hub discovery loaded.", new Dictionary<string, object?>
        {
            ["discoveryUri"] = _options.DiscoveryUri.ToString(),
            ["subscriptionUri"] = _metadata.Metadata.Subscription.EventsUri.ToString(),
            ["requiresAuth"] = _metadata.Metadata.Subscription.RequiresAuthentication,
            ["fromOffline"] = _metadata.FromOfflineSnapshot,
        });
    }

    /// <summary>
    /// Streams hub events starting from the stored checkpoint and yields every document that
    /// was newly stored in the raw sink. The checkpoint is saved after the event stream has
    /// been fully consumed, and only when something changed.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The connector has not been validated yet.</exception>
    public override async IAsyncEnumerable<VexRawDocument> FetchAsync(
        VexConnectorContext context,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(context);

        var options = _options
            ?? throw new InvalidOperationException("Connector must be validated before fetch operations.");

        // Metadata can be missing if validation bound the options but the discovery load failed.
        _metadata ??= await _metadataLoader.LoadAsync(options, cancellationToken).ConfigureAwait(false);
        var metadata = _metadata;

        var checkpoint = await _checkpointManager.LoadAsync(Descriptor.Id, context, cancellationToken).ConfigureAwait(false);
        var digestHistory = checkpoint.Digests.ToList();
        var dedupeSet = new HashSet<string>(checkpoint.Digests, StringComparer.OrdinalIgnoreCase);
        var latestCursor = checkpoint.Cursor;
        var latestPublishedAt = checkpoint.LastPublishedAt ?? checkpoint.EffectiveSince;
        var stateChanged = false;

        LogConnectorEvent(LogLevel.Information, "fetch_start", "Starting Rancher hub event ingestion.", new Dictionary<string, object?>
        {
            ["since"] = checkpoint.EffectiveSince?.ToString("O"),
            ["cursor"] = checkpoint.Cursor,
            ["subscriptionUri"] = metadata.Metadata.Subscription.EventsUri.ToString(),
            ["offline"] = checkpoint.Cursor is null && options.PreferOfflineSnapshot,
        });

        await foreach (var batch in _eventClient.FetchEventBatchesAsync(
            options,
            metadata.Metadata,
            checkpoint.Cursor,
            checkpoint.EffectiveSince,
            metadata.Metadata.Subscription.Channels,
            cancellationToken).ConfigureAwait(false))
        {
            LogConnectorEvent(LogLevel.Debug, "batch", "Processing Rancher hub batch.", new Dictionary<string, object?>
            {
                ["cursor"] = batch.Cursor,
                ["nextCursor"] = batch.NextCursor,
                ["count"] = batch.Events.Length,
                ["offline"] = batch.FromOfflineSnapshot,
            });

            if (!string.IsNullOrWhiteSpace(batch.NextCursor)
                && !string.Equals(batch.NextCursor, latestCursor, StringComparison.Ordinal))
            {
                // Advance to the cursor the hub told us to resume from.
                latestCursor = batch.NextCursor;
                stateChanged = true;
            }
            else if (string.IsNullOrWhiteSpace(latestCursor) && !string.IsNullOrWhiteSpace(batch.Cursor))
            {
                // No resume cursor known yet: fall back to the cursor of the current batch.
                latestCursor = batch.Cursor;
            }

            foreach (var record in batch.Events)
            {
                cancellationToken.ThrowIfCancellationRequested();

                var result = await ProcessEventAsync(record, batch, context, dedupeSet, digestHistory, cancellationToken).ConfigureAwait(false);
                if (result.ProcessedDocument is not null)
                {
                    yield return result.ProcessedDocument;
                    stateChanged = true;

                    if (result.PublishedAt is { } published
                        && (latestPublishedAt is null || published > latestPublishedAt))
                    {
                        latestPublishedAt = published;
                    }
                }
                else if (result.Quarantined)
                {
                    stateChanged = true;
                }
            }
        }

        if (stateChanged
            || !string.Equals(latestCursor, checkpoint.Cursor, StringComparison.Ordinal)
            || latestPublishedAt != checkpoint.LastPublishedAt)
        {
            await _checkpointManager.SaveAsync(
                Descriptor.Id,
                latestCursor,
                latestPublishedAt,
                digestHistory.ToImmutableArray(),
                cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Not supported by this connector; always throws.
    /// </summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override ValueTask<VexClaimBatch> NormalizeAsync(VexRawDocument document, CancellationToken cancellationToken)
        => throw new NotSupportedException("RancherHubConnector relies on format-specific normalizers for CSAF/OpenVEX payloads.");

    /// <summary>
    /// Returns the hub metadata cached by the last successful load, or <c>null</c> when none is cached.
    /// </summary>
    public RancherHubMetadata? GetCachedMetadata() => _metadata?.Metadata;

    /// <summary>
    /// Handles one hub event: skips events already quarantined, quarantines events that are
    /// malformed, fail to download, or fail digest verification, and otherwise stores the
    /// downloaded document in the raw sink unless its digest was already seen.
    /// </summary>
    private async Task<EventProcessingResult> ProcessEventAsync(
        RancherHubEventRecord record,
        RancherHubEventBatch batch,
        VexConnectorContext context,
        HashSet<string> dedupeSet,
        List<string> digestHistory,
        CancellationToken cancellationToken)
    {
        var quarantineKey = BuildQuarantineKey(record);
        if (dedupeSet.Contains(quarantineKey))
        {
            return EventProcessingResult.QuarantinedOnly;
        }

        if (record.DocumentUri is null || string.IsNullOrWhiteSpace(record.Id))
        {
            await QuarantineAsync(record, batch, "missing documentUri or id", context, cancellationToken).ConfigureAwait(false);
            AddQuarantineDigest(quarantineKey, dedupeSet, digestHistory);
            return EventProcessingResult.QuarantinedOnly;
        }

        var client = _httpClientFactory.CreateClient(RancherHubConnectorOptions.HttpClientName);
        using var request = await CreateDocumentRequestAsync(record.DocumentUri, cancellationToken).ConfigureAwait(false);
        using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
        if (!response.IsSuccessStatusCode)
        {
            await QuarantineAsync(record, batch, $"document fetch failed ({(int)response.StatusCode} {response.StatusCode})", context, cancellationToken).ConfigureAwait(false);
            AddQuarantineDigest(quarantineKey, dedupeSet, digestHistory);
            return EventProcessingResult.QuarantinedOnly;
        }

        var contentBytes = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);

        // Events without a publication timestamp are stamped with the connector's current time.
        var publishedAt = record.PublishedAt ?? UtcNow();
        var metadata = BuildMetadata(builder => builder
            .Add("rancher.event.id", record.Id)
            .Add("rancher.event.type", record.Type)
            .Add("rancher.event.channel", record.Channel)
            .Add("rancher.event.published", publishedAt)
            .Add("rancher.event.cursor", batch.NextCursor ?? batch.Cursor)
            .Add("rancher.event.offline", batch.FromOfflineSnapshot ? "true" : "false")
            .Add("rancher.event.declaredDigest", record.DocumentDigest));

        var format = ResolveFormat(record.DocumentFormat);
        var document = CreateRawDocument(format, record.DocumentUri, contentBytes, metadata);

        if (!string.IsNullOrWhiteSpace(record.DocumentDigest))
        {
            var declared = NormalizeDigest(record.DocumentDigest);
            var computed = NormalizeDigest(document.Digest);
            if (!string.Equals(declared, computed, StringComparison.OrdinalIgnoreCase))
            {
                await QuarantineAsync(record, batch, $"digest mismatch (declared {record.DocumentDigest}, computed {document.Digest})", context, cancellationToken).ConfigureAwait(false);
                AddQuarantineDigest(quarantineKey, dedupeSet, digestHistory);
                return EventProcessingResult.QuarantinedOnly;
            }
        }

        if (!dedupeSet.Add(document.Digest))
        {
            // Same content was already ingested; nothing to store or report.
            return EventProcessingResult.Skipped;
        }

        digestHistory.Add(document.Digest);
        await context.RawSink.StoreAsync(document, cancellationToken).ConfigureAwait(false);
        return new EventProcessingResult(document, false, publishedAt);
    }

    /// <summary>
    /// Builds the GET request for a document, attaching an Authorization header when the
    /// cached subscription metadata requires authentication and a token is available.
    /// The caller owns (and must dispose) the returned request.
    /// </summary>
    private async Task<HttpRequestMessage> CreateDocumentRequestAsync(Uri documentUri, CancellationToken cancellationToken)
    {
        var request = new HttpRequestMessage(HttpMethod.Get, documentUri);
        try
        {
            if (_metadata?.Metadata.Subscription.RequiresAuthentication ?? false)
            {
                var token = await _tokenProvider.GetAccessTokenAsync(_options!, cancellationToken).ConfigureAwait(false);
                if (token is not null)
                {
                    var scheme = string.IsNullOrWhiteSpace(token.TokenType) ? "Bearer" : token.TokenType;
                    request.Headers.Authorization = new AuthenticationHeaderValue(scheme, token.Value);
                }
            }

            return request;
        }
        catch
        {
            // Ownership never reached the caller, so release the request here before rethrowing.
            request.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Stores the raw event JSON in the raw sink as a quarantine document annotated with
    /// <paramref name="reason"/>, then logs a "quarantine" warning.
    /// </summary>
    private async Task QuarantineAsync(
        RancherHubEventRecord record,
        RancherHubEventBatch batch,
        string reason,
        VexConnectorContext context,
        CancellationToken cancellationToken)
    {
        var metadata = BuildMetadata(builder => builder
            .Add("rancher.event.id", record.Id)
            .Add("rancher.event.type", record.Type)
            .Add("rancher.event.channel", record.Channel)
            .Add("rancher.event.quarantine", "true")
            .Add("rancher.event.error", reason)
            .Add("rancher.event.cursor", batch.NextCursor ?? batch.Cursor)
            .Add("rancher.event.offline", batch.FromOfflineSnapshot ? "true" : "false"));

        // Prefer the document URI; fall back to the events URI, then to the discovery URI.
        var sourceUri = record.DocumentUri ?? _metadata?.Metadata.Subscription.EventsUri ?? _options!.DiscoveryUri;
        var payload = Encoding.UTF8.GetBytes(record.RawJson);

        var document = CreateRawDocument(VexDocumentFormat.Csaf, sourceUri, payload, metadata);
        await context.RawSink.StoreAsync(document, cancellationToken).ConfigureAwait(false);

        LogConnectorEvent(LogLevel.Warning, "quarantine", "Rancher hub event moved to quarantine.", new Dictionary<string, object?>
        {
            ["eventId"] = record.Id ?? "(missing)",
            ["reason"] = reason,
        });
    }

    /// <summary>
    /// Records a quarantine key in both the dedupe set and the ordered digest history,
    /// adding it to the history only when it was not already in the set.
    /// </summary>
    private static void AddQuarantineDigest(string key, HashSet<string> dedupeSet, List<string> digestHistory)
    {
        if (dedupeSet.Add(key))
        {
            digestHistory.Add(key);
        }
    }

    /// <summary>
    /// Returns the dedupe key for a quarantined event: <c>quarantine:{id}</c> when the event
    /// has an id, otherwise <c>quarantine:</c> followed by the lower-case hex SHA-256 of the
    /// UTF-8 encoded raw event JSON.
    /// </summary>
    private static string BuildQuarantineKey(RancherHubEventRecord record)
    {
        if (!string.IsNullOrWhiteSpace(record.Id))
        {
            return $"quarantine:{record.Id}";
        }

        var hash = SHA256.HashData(Encoding.UTF8.GetBytes(record.RawJson));
        return $"quarantine:{Convert.ToHexString(hash).ToLowerInvariant()}";
    }

    /// <summary>
    /// Trims and lower-cases a digest and prefixes it with <c>sha256:</c> unless it already
    /// starts with that prefix. Null, empty, or whitespace input is returned unchanged.
    /// </summary>
    private static string NormalizeDigest(string digest)
    {
        if (string.IsNullOrWhiteSpace(digest))
        {
            return digest;
        }

        var trimmed = digest.Trim();
        return trimmed.StartsWith("sha256:", StringComparison.OrdinalIgnoreCase)
            ? trimmed.ToLowerInvariant()
            : $"sha256:{trimmed.ToLowerInvariant()}";
    }

    /// <summary>
    /// Maps the format name declared by an event to a <see cref="VexDocumentFormat"/>.
    /// Matching ignores case and surrounding whitespace; missing or unrecognised names map to CSAF.
    /// </summary>
    private static VexDocumentFormat ResolveFormat(string? format)
    {
        if (string.IsNullOrWhiteSpace(format))
        {
            return VexDocumentFormat.Csaf;
        }

        return format.Trim().ToLowerInvariant() switch
        {
            "csaf" or "csaf_json" or "json" => VexDocumentFormat.Csaf,
            "cyclonedx" or "cyclonedx_vex" => VexDocumentFormat.CycloneDx,
            "openvex" => VexDocumentFormat.OpenVex,
            "oci" or "oci_attestation" or "attestation" => VexDocumentFormat.OciAttestation,
            _ => VexDocumentFormat.Csaf,
        };
    }

    /// <summary>
    /// Outcome of processing one event: the stored document (if any), whether the event is
    /// in quarantine, and the publication timestamp used for the stored document.
    /// </summary>
    private sealed record EventProcessingResult(VexRawDocument? ProcessedDocument, bool Quarantined, DateTimeOffset? PublishedAt)
    {
        public static EventProcessingResult QuarantinedOnly { get; } = new(null, true, null);

        public static EventProcessingResult Skipped { get; } = new(null, false, null);
    }
}