using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Linq;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.IO;
using StellaOps.Concelier.Connector.Acsc.Configuration;
using StellaOps.Concelier.Connector.Acsc.Internal;
using StellaOps.Concelier.Connector.Common;
using StellaOps.Concelier.Connector.Common.Fetch;
using StellaOps.Concelier.Connector.Common.Html;
using StellaOps.Concelier.Storage.Mongo;
using StellaOps.Concelier.Storage.Mongo.Advisories;
using StellaOps.Concelier.Storage.Mongo.Documents;
using StellaOps.Concelier.Storage.Mongo.Dtos;
using StellaOps.Plugin;

namespace StellaOps.Concelier.Connector.Acsc;

/// <summary>
/// Feed connector for the ACSC source. Work is split into three stages that hand off
/// through the persisted cursor: <see cref="FetchAsync"/> downloads feed documents,
/// <see cref="ParseAsync"/> turns raw payloads into DTO records, and
/// <see cref="MapAsync"/> converts DTOs into advisories. <see cref="ProbeAsync"/>
/// decides whether the direct or the relay endpoint should be preferred.
/// </summary>
public sealed class AcscConnector : IFeedConnector
{
    /// <summary>Schema version stamped on every DTO record written by <see cref="ParseAsync"/>.</summary>
    private const string DtoSchemaVersion = "acsc.feed.v1";

    /// <summary>Tag used in diagnostics and logs when a document carries no feed slug.</summary>
    private const string UnknownFeedTag = "(unknown)";

    /// <summary>Timeout applied to the HTTP client used by <see cref="ProbeAsync"/>.</summary>
    private static readonly TimeSpan ProbeTimeout = TimeSpan.FromSeconds(15);

    private static readonly string[] AcceptHeaders =
    {
        "application/rss+xml",
        "application/atom+xml;q=0.9",
        "application/xml;q=0.8",
        "text/xml;q=0.7",
    };

    private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
    {
        PropertyNameCaseInsensitive = true,
        WriteIndented = false,
    };

    private readonly SourceFetchService _fetchService;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly IDocumentStore _documentStore;
    private readonly IDtoStore _dtoStore;
    private readonly IAdvisoryStore _advisoryStore;
    private readonly ISourceStateRepository _stateRepository;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly AcscOptions _options;
    private readonly AcscDiagnostics _diagnostics;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger _logger;
    private readonly HtmlContentSanitizer _htmlSanitizer = new();

    /// <summary>
    /// Creates the connector. Every dependency except <paramref name="timeProvider"/> is required;
    /// the options instance is validated immediately via <c>AcscOptions.Validate()</c>.
    /// </summary>
    /// <param name="timeProvider">Clock to use; falls back to <see cref="TimeProvider.System"/> when null.</param>
    /// <exception cref="ArgumentNullException">A required dependency (or the options value) is null.</exception>
    public AcscConnector(
        SourceFetchService fetchService,
        RawDocumentStorage rawDocumentStorage,
        IDocumentStore documentStore,
        IDtoStore dtoStore,
        IAdvisoryStore advisoryStore,
        ISourceStateRepository stateRepository,
        IHttpClientFactory httpClientFactory,
        IOptions<AcscOptions> options,
        AcscDiagnostics diagnostics,
        TimeProvider? timeProvider,
        ILogger logger)
    {
        _fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
        _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value
            ?? throw new ArgumentNullException(nameof(options));
        _options.Validate();
        _diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Source name reported by this connector (taken from <c>AcscConnectorPlugin.SourceName</c>).</summary>
    public string SourceName => AcscConnectorPlugin.SourceName;

    /// <summary>True when a relay endpoint is present in the options.</summary>
    private bool IsRelayConfigured => _options.RelayEndpoint is not null;

    /// <summary>
    /// Fetches every enabled feed, trying endpoints in the order produced by
    /// <see cref="BuildFetchOrder"/>. Newly fetched documents are queued in the cursor's
    /// pending-documents set, and the per-feed "last published" timestamp is advanced when the
    /// downloaded payload contains a newer entry.
    /// </summary>
    /// <exception cref="AggregateException">
    /// At least one feed could not be fetched through any endpoint. The failure is recorded via
    /// <c>MarkFailureAsync</c> before throwing and the cursor is not updated.
    /// </exception>
    /// <exception cref="OperationCanceledException">The caller cancelled the operation.</exception>
    public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var now = _timeProvider.GetUtcNow();
        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        var lastPublished = new Dictionary<string, DateTimeOffset?>(cursor.LastPublishedByFeed, StringComparer.OrdinalIgnoreCase);
        var pendingDocuments = cursor.PendingDocuments.ToHashSet();
        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var failures = new List<(AcscFeedOptions Feed, Exception Error)>();
        var preferredEndpoint = ResolveInitialPreference(cursor);
        AcscEndpointPreference? successPreference = null;

        foreach (var feed in GetEnabledFeeds())
        {
            cancellationToken.ThrowIfCancellationRequested();

            Exception? lastError = null;
            var handled = false;

            foreach (var mode in BuildFetchOrder(preferredEndpoint))
            {
                cancellationToken.ThrowIfCancellationRequested();

                if (mode == AcscFetchMode.Relay && !IsRelayConfigured)
                {
                    continue;
                }

                var modeName = ModeName(mode);
                var targetUri = BuildFeedUri(feed, mode);
                var metadata = CreateMetadata(feed, cursor, modeName);

                // Reuse the validators of a previously stored copy so the server can answer 304.
                var existing = await _documentStore
                    .FindBySourceAndUriAsync(SourceName, targetUri.ToString(), cancellationToken)
                    .ConfigureAwait(false);

                var request = new SourceFetchRequest(AcscOptions.HttpClientName, SourceName, targetUri)
                {
                    Metadata = metadata,
                    ETag = existing?.Etag,
                    LastModified = existing?.LastModified,
                    AcceptHeaders = AcceptHeaders,
                    TimeoutOverride = _options.RequestTimeout,
                };

                try
                {
                    _diagnostics.FetchAttempt(feed.Slug, modeName);
                    var result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);

                    if (result.IsNotModified)
                    {
                        _diagnostics.FetchUnchanged(feed.Slug, modeName);

                        // A 304 only seeds the preference; a real download (below) overrides it.
                        successPreference ??= ToPreference(mode);
                        handled = true;
                        _logger.LogDebug("ACSC feed {Feed} returned 304 via {Mode}", feed.Slug, modeName);
                        break;
                    }

                    if (!result.IsSuccess || result.Document is null)
                    {
                        _diagnostics.FetchFailure(feed.Slug, modeName);
                        lastError = new InvalidOperationException($"Fetch returned no document for {targetUri}");
                        continue;
                    }

                    pendingDocuments.Add(result.Document.Id);
                    successPreference = ToPreference(mode);
                    handled = true;
                    _diagnostics.FetchSuccess(feed.Slug, modeName);
                    _logger.LogInformation(
                        "ACSC fetched {Feed} via {Mode} (documentId={DocumentId})",
                        feed.Slug,
                        modeName,
                        result.Document.Id);

                    var latestPublished = await TryComputeLatestPublishedAsync(result.Document, cancellationToken).ConfigureAwait(false);
                    if (latestPublished.HasValue)
                    {
                        // The stored value is nullable: a lifted '>' against null is always false,
                        // so a null entry must be treated explicitly as "no timestamp recorded yet".
                        var isNewer = !lastPublished.TryGetValue(feed.Slug, out var existingPublished)
                            || existingPublished is null
                            || latestPublished.Value > existingPublished.Value;

                        if (isNewer)
                        {
                            lastPublished[feed.Slug] = latestPublished.Value;
                            _diagnostics.CursorUpdated(feed.Slug);
                            _logger.LogDebug(
                                "ACSC feed {Feed} advanced published cursor to {Timestamp:O}",
                                feed.Slug,
                                latestPublished.Value);
                        }
                    }

                    break;
                }
                catch (HttpRequestException ex) when (ShouldRetryWithRelay(mode))
                {
                    lastError = ex;
                    _diagnostics.FetchFallback(feed.Slug, modeName, "http-request");
                    _logger.LogWarning(ex, "ACSC fetch via {Mode} failed for {Feed}; attempting relay fallback.", modeName, feed.Slug);
                    continue;
                }
                catch (TaskCanceledException ex) when (ShouldRetryWithRelay(mode) && !cancellationToken.IsCancellationRequested)
                {
                    // Only a cancellation the caller did NOT request is treated as a timeout.
                    lastError = ex;
                    _diagnostics.FetchFallback(feed.Slug, modeName, "timeout");
                    _logger.LogWarning(ex, "ACSC fetch via {Mode} timed out for {Feed}; attempting relay fallback.", modeName, feed.Slug);
                    continue;
                }
                catch (Exception ex) when (!IsCallerCancellation(ex, cancellationToken))
                {
                    // NOTE(review): exceptions on a relay attempt end the loop here even though
                    // BuildFetchOrder may list the direct endpoint next — confirm this is intended.
                    lastError = ex;
                    _diagnostics.FetchFailure(feed.Slug, modeName);
                    _logger.LogError(ex, "ACSC fetch failed for {Feed} via {Mode}", feed.Slug, modeName);
                    break;
                }
            }

            if (!handled && lastError is not null)
            {
                failures.Add((feed, lastError));
            }
        }

        if (failures.Count > 0)
        {
            var failureReason = string.Join("; ", failures.Select(f => $"{f.Feed.Slug}: {f.Error.Message}"));
            await _stateRepository
                .MarkFailureAsync(SourceName, now, _options.FailureBackoff, failureReason, cancellationToken)
                .ConfigureAwait(false);
            throw new AggregateException(
                $"ACSC fetch failed for {failures.Count} feed(s): {failureReason}",
                failures.Select(f => f.Error));
        }

        // Configuration constraints win over whatever endpoint happened to succeed.
        var updatedPreference = successPreference ?? preferredEndpoint;
        if (_options.ForceRelay)
        {
            updatedPreference = AcscEndpointPreference.Relay;
        }
        else if (!IsRelayConfigured)
        {
            updatedPreference = AcscEndpointPreference.Direct;
        }

        var updatedCursor = cursor
            .WithPreferredEndpoint(updatedPreference)
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings)
            .WithLastPublished(lastPublished);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Parses every pending document: downloads the raw payload, runs <c>AcscFeedParser.Parse</c>,
    /// stores the result as a DTO record and moves the document to the pending-mappings set.
    /// Documents that are missing, have no payload, or fail to download/parse are dropped from
    /// both pending sets (and marked <c>Failed</c> when the document record exists).
    /// </summary>
    /// <exception cref="OperationCanceledException">The caller cancelled the operation.</exception>
    public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingDocuments.Count == 0)
        {
            return;
        }

        var pendingDocuments = cursor.PendingDocuments.ToList();
        var pendingMappings = cursor.PendingMappings.ToHashSet();

        // Marks the stored document as failed and removes the id from both pending sets.
        async Task FailAndDropAsync(DocumentRecord failedDocument, Guid pendingId)
        {
            await _documentStore.UpdateStatusAsync(failedDocument.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            pendingDocuments.Remove(pendingId);
            pendingMappings.Remove(pendingId);
        }

        foreach (var documentId in cursor.PendingDocuments)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                pendingDocuments.Remove(documentId);
                pendingMappings.Remove(documentId);
                continue;
            }

            var metadata = AcscDocumentMetadata.FromDocument(document);
            var feedTag = string.IsNullOrWhiteSpace(metadata.FeedSlug) ? UnknownFeedTag : metadata.FeedSlug;
            _diagnostics.ParseAttempt(feedTag);

            if (!document.GridFsId.HasValue)
            {
                _diagnostics.ParseFailure(feedTag, "missingPayload");
                _logger.LogWarning("ACSC document {DocumentId} missing GridFS payload (feed={Feed})", document.Id, feedTag);
                await FailAndDropAsync(document, documentId).ConfigureAwait(false);
                continue;
            }

            byte[] rawBytes;
            try
            {
                rawBytes = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (!IsCallerCancellation(ex, cancellationToken))
            {
                _diagnostics.ParseFailure(feedTag, "download");
                _logger.LogError(ex, "ACSC failed to download payload for document {DocumentId} (feed={Feed})", document.Id, feedTag);
                await FailAndDropAsync(document, documentId).ConfigureAwait(false);
                continue;
            }

            try
            {
                var parsedAt = _timeProvider.GetUtcNow();
                var dto = AcscFeedParser.Parse(rawBytes, metadata.FeedSlug, parsedAt, _htmlSanitizer);

                // Round-trip through JSON to obtain the BSON payload stored with the DTO record.
                var json = JsonSerializer.Serialize(dto, SerializerOptions);
                var payload = BsonDocument.Parse(json);

                var existingDto = await _dtoStore.FindByDocumentIdAsync(document.Id, cancellationToken).ConfigureAwait(false);
                var dtoRecord = existingDto is null
                    ? new DtoRecord(Guid.NewGuid(), document.Id, SourceName, DtoSchemaVersion, payload, parsedAt)
                    : existingDto with
                    {
                        Payload = payload,
                        SchemaVersion = DtoSchemaVersion,
                        ValidatedAt = parsedAt,
                    };

                await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);

                pendingDocuments.Remove(documentId);
                pendingMappings.Add(document.Id);

                _diagnostics.ParseSuccess(feedTag);
                _logger.LogInformation(
                    "ACSC parsed document {DocumentId} (feed={Feed}, entries={EntryCount})",
                    document.Id,
                    feedTag,
                    dto.Entries.Count);
            }
            catch (Exception ex) when (!IsCallerCancellation(ex, cancellationToken))
            {
                _diagnostics.ParseFailure(feedTag, "parse");
                _logger.LogError(ex, "ACSC parse failed for document {DocumentId} (feed={Feed})", document.Id, feedTag);
                await FailAndDropAsync(document, documentId).ConfigureAwait(false);
            }
        }

        var updatedCursor = cursor
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(pendingMappings);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Maps every document in the pending-mappings set: the stored DTO payload is deserialized,
    /// converted by <c>AcscMapper.Map</c>, and each resulting advisory is upserted. The document is
    /// then marked <c>Mapped</c> (or <c>Failed</c> when the DTO cannot be deserialized) and removed
    /// from the pending set.
    /// </summary>
    public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingMappings.Count == 0)
        {
            return;
        }

        var pendingMappings = cursor.PendingMappings.ToHashSet();
        var documentIds = cursor.PendingMappings.ToList();

        foreach (var documentId in documentIds)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);

            if (dtoRecord is null || document is null)
            {
                pendingMappings.Remove(documentId);
                continue;
            }

            AcscFeedDto? feed;
            try
            {
                var dtoJson = dtoRecord.Payload.ToJson(new JsonWriterSettings
                {
                    OutputMode = JsonOutputMode.RelaxedExtendedJson,
                });
                feed = JsonSerializer.Deserialize<AcscFeedDto>(dtoJson, SerializerOptions);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "ACSC mapping failed to deserialize DTO for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                continue;
            }

            if (feed is null)
            {
                _logger.LogWarning("ACSC mapping encountered null DTO payload for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                continue;
            }

            var mappedAt = _timeProvider.GetUtcNow();
            var advisories = AcscMapper.Map(feed, document, dtoRecord, SourceName, mappedAt);

            if (advisories.Count > 0)
            {
                foreach (var advisory in advisories)
                {
                    await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
                }

                _diagnostics.MapSuccess(advisories.Count);
                _logger.LogInformation(
                    "ACSC mapped {Count} advisories from document {DocumentId} (feed={Feed})",
                    advisories.Count,
                    document.Id,
                    feed.FeedSlug ?? UnknownFeedTag);
            }
            else
            {
                _logger.LogInformation(
                    "ACSC mapping produced no advisories for document {DocumentId} (feed={Feed})",
                    document.Id,
                    feed.FeedSlug ?? UnknownFeedTag);
            }

            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
            pendingMappings.Remove(documentId);
        }

        var updatedCursor = cursor.WithPendingMappings(pendingMappings);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Probes the direct endpoint of the first enabled feed (HEAD, falling back to GET on 405)
    /// and stores the resulting endpoint preference in the cursor. <c>ForceRelay</c> pins the
    /// preference to relay and a missing relay endpoint pins it to direct, both without probing.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// The caller cancelled the operation; cancellation is not interpreted as a probe failure.
    /// </exception>
    public async Task ProbeAsync(CancellationToken cancellationToken)
    {
        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);

        if (_options.ForceRelay)
        {
            if (cursor.PreferredEndpoint != AcscEndpointPreference.Relay)
            {
                await UpdateCursorAsync(cursor.WithPreferredEndpoint(AcscEndpointPreference.Relay), cancellationToken).ConfigureAwait(false);
            }

            return;
        }

        if (!IsRelayConfigured)
        {
            if (cursor.PreferredEndpoint != AcscEndpointPreference.Direct)
            {
                await UpdateCursorAsync(cursor.WithPreferredEndpoint(AcscEndpointPreference.Direct), cancellationToken).ConfigureAwait(false);
            }

            return;
        }

        var feed = GetEnabledFeeds().FirstOrDefault();
        if (feed is null)
        {
            return;
        }

        var httpClient = _httpClientFactory.CreateClient(AcscOptions.HttpClientName);
        httpClient.Timeout = ProbeTimeout;

        var directUri = BuildFeedUri(feed, AcscFetchMode.Direct);

        try
        {
            using var headRequest = new HttpRequestMessage(HttpMethod.Head, directUri);
            using var response = await httpClient
                .SendAsync(headRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken)
                .ConfigureAwait(false);

            if (response.IsSuccessStatusCode)
            {
                if (cursor.PreferredEndpoint != AcscEndpointPreference.Direct)
                {
                    await UpdateCursorAsync(cursor.WithPreferredEndpoint(AcscEndpointPreference.Direct), cancellationToken).ConfigureAwait(false);
                    _logger.LogInformation(
                        "ACSC probe succeeded via direct endpoint ({StatusCode}); relay preference cleared.",
                        (int)response.StatusCode);
                }

                return;
            }

            if (response.StatusCode == HttpStatusCode.MethodNotAllowed)
            {
                // The server rejects HEAD; retry with GET but read headers only.
                using var probeRequest = new HttpRequestMessage(HttpMethod.Get, directUri);
                using var probeResponse = await httpClient
                    .SendAsync(probeRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken)
                    .ConfigureAwait(false);

                if (probeResponse.IsSuccessStatusCode)
                {
                    if (cursor.PreferredEndpoint != AcscEndpointPreference.Direct)
                    {
                        await UpdateCursorAsync(cursor.WithPreferredEndpoint(AcscEndpointPreference.Direct), cancellationToken).ConfigureAwait(false);
                        _logger.LogInformation(
                            "ACSC probe succeeded via direct endpoint after GET fallback ({StatusCode}).",
                            (int)probeResponse.StatusCode);
                    }

                    return;
                }
            }

            _logger.LogWarning(
                "ACSC direct probe returned HTTP {StatusCode}; relay preference enabled.",
                (int)response.StatusCode);
        }
        catch (Exception ex) when (!IsCallerCancellation(ex, cancellationToken))
        {
            _logger.LogWarning(ex, "ACSC direct probe failed; relay preference will be enabled.");
        }

        if (cursor.PreferredEndpoint != AcscEndpointPreference.Relay)
        {
            await UpdateCursorAsync(cursor.WithPreferredEndpoint(AcscEndpointPreference.Relay), cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// True when <paramref name="exception"/> is a cancellation that the caller asked for through
    /// <paramref name="cancellationToken"/>; such exceptions should propagate, not be handled as failures.
    /// </summary>
    private static bool IsCallerCancellation(Exception exception, CancellationToken cancellationToken)
        => exception is OperationCanceledException && cancellationToken.IsCancellationRequested;

    /// <summary>Translates the fetch mode that succeeded into the endpoint preference to persist.</summary>
    private static AcscEndpointPreference ToPreference(AcscFetchMode mode) => mode switch
    {
        AcscFetchMode.Relay => AcscEndpointPreference.Relay,
        _ => AcscEndpointPreference.Direct,
    };

    /// <summary>
    /// A failed direct attempt may be retried through the relay only when fallback is enabled,
    /// a relay is configured and relay is not already forced.
    /// </summary>
    private bool ShouldRetryWithRelay(AcscFetchMode mode)
        => mode == AcscFetchMode.Direct
            && _options.EnableRelayFallback
            && IsRelayConfigured
            && !_options.ForceRelay;

    /// <summary>
    /// Yields the fetch modes to attempt, in order, for the given preference. With
    /// <c>ForceRelay</c> only the relay is yielded (nothing when no relay is configured); without a
    /// relay only direct is yielded; otherwise the preferred mode comes first and the other mode
    /// follows when <c>EnableRelayFallback</c> is set.
    /// </summary>
    private IEnumerable<AcscFetchMode> BuildFetchOrder(AcscEndpointPreference preference)
    {
        if (_options.ForceRelay)
        {
            if (IsRelayConfigured)
            {
                yield return AcscFetchMode.Relay;
            }

            yield break;
        }

        if (!IsRelayConfigured)
        {
            yield return AcscFetchMode.Direct;
            yield break;
        }

        var preferRelay = preference == AcscEndpointPreference.Auto
            ? _options.PreferRelayByDefault
            : preference == AcscEndpointPreference.Relay;

        var primary = preferRelay ? AcscFetchMode.Relay : AcscFetchMode.Direct;
        var secondary = preferRelay ? AcscFetchMode.Direct : AcscFetchMode.Relay;

        yield return primary;

        if (_options.EnableRelayFallback)
        {
            yield return secondary;
        }
    }

    /// <summary>
    /// Picks the endpoint preference to start a fetch run with: configuration constraints first,
    /// then the preference stored in the cursor, then the configured default.
    /// </summary>
    private AcscEndpointPreference ResolveInitialPreference(AcscCursor cursor)
    {
        if (_options.ForceRelay)
        {
            return AcscEndpointPreference.Relay;
        }

        if (!IsRelayConfigured)
        {
            return AcscEndpointPreference.Direct;
        }

        if (cursor.PreferredEndpoint != AcscEndpointPreference.Auto)
        {
            return cursor.PreferredEndpoint;
        }

        return _options.PreferRelayByDefault
            ? AcscEndpointPreference.Relay
            : AcscEndpointPreference.Direct;
    }

    /// <summary>
    /// Downloads the stored payload and returns the newest timestamp found on any
    /// <c>item</c>/<c>entry</c> element, or null when there is no payload, the payload is empty,
    /// no timestamp parses, or the XML cannot be loaded (logged as a warning).
    /// </summary>
    private async Task<DateTimeOffset?> TryComputeLatestPublishedAsync(DocumentRecord document, CancellationToken cancellationToken)
    {
        if (!document.GridFsId.HasValue)
        {
            return null;
        }

        var rawBytes = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
        if (rawBytes.Length == 0)
        {
            return null;
        }

        try
        {
            using var memoryStream = new MemoryStream(rawBytes, writable: false);
            var xml = XDocument.Load(memoryStream, LoadOptions.None);

            DateTimeOffset? latest = null;
            foreach (var element in xml.Descendants())
            {
                if (!IsEntryElement(element.Name.LocalName))
                {
                    continue;
                }

                var published = ExtractPublished(element);
                if (published.HasValue && (latest is null || published.Value > latest.Value))
                {
                    latest = published;
                }
            }

            return latest;
        }
        catch (Exception ex)
        {
            _logger.LogWarning(
                ex,
                "ACSC failed to derive published cursor for document {DocumentId} ({Uri})",
                document.Id,
                document.Uri);
            return null;
        }
    }

    /// <summary>True for element local names <c>item</c> or <c>entry</c> (case-insensitive).</summary>
    private static bool IsEntryElement(string localName)
        => string.Equals(localName, "item", StringComparison.OrdinalIgnoreCase)
            || string.Equals(localName, "entry", StringComparison.OrdinalIgnoreCase);

    /// <summary>
    /// Returns the first child timestamp of <paramref name="element"/> that parses, converted to UTC,
    /// or null when none parses. Values without an offset are assumed to be UTC.
    /// </summary>
    private static DateTimeOffset? ExtractPublished(XElement element)
    {
        foreach (var candidate in EnumerateTimestampNames(element))
        {
            if (DateTimeOffset.TryParse(
                    candidate.Value,
                    CultureInfo.InvariantCulture,
                    DateTimeStyles.AllowWhiteSpaces | DateTimeStyles.AssumeUniversal,
                    out var parsed))
            {
                return parsed.ToUniversalTime();
            }
        }

        return null;
    }

    /// <summary>
    /// Yields the direct children named <c>pubDate</c>, <c>published</c>, <c>updated</c> or
    /// <c>date</c> (case-insensitive, namespace ignored) in document order.
    /// </summary>
    private static IEnumerable<XElement> EnumerateTimestampNames(XElement element)
    {
        foreach (var child in element.Elements())
        {
            var localName = child.Name.LocalName;
            if (string.Equals(localName, "pubDate", StringComparison.OrdinalIgnoreCase)
                || string.Equals(localName, "published", StringComparison.OrdinalIgnoreCase)
                || string.Equals(localName, "updated", StringComparison.OrdinalIgnoreCase)
                || string.Equals(localName, "date", StringComparison.OrdinalIgnoreCase))
            {
                yield return child;
            }
        }
    }

    /// <summary>
    /// Builds the metadata attached to a fetch request: feed slug, fetch mode and, when the cursor
    /// holds one, the feed's last published timestamp in round-trip ("O") format.
    /// </summary>
    private Dictionary<string, string> CreateMetadata(AcscFeedOptions feed, AcscCursor cursor, string mode)
    {
        var metadata = new Dictionary<string, string>(StringComparer.Ordinal)
        {
            ["acsc.feed.slug"] = feed.Slug,
            ["acsc.fetch.mode"] = mode,
        };

        if (cursor.LastPublishedByFeed.TryGetValue(feed.Slug, out var published) && published.HasValue)
        {
            metadata["acsc.cursor.lastPublished"] = published.Value.ToString("O");
        }

        return metadata;
    }

    /// <summary>
    /// Resolves the feed's relative path against the relay endpoint (relay mode with a relay
    /// configured) or the base endpoint (every other case).
    /// </summary>
    private Uri BuildFeedUri(AcscFeedOptions feed, AcscFetchMode mode)
    {
        var baseUri = mode switch
        {
            AcscFetchMode.Relay when IsRelayConfigured => _options.RelayEndpoint!,
            _ => _options.BaseEndpoint,
        };

        return new Uri(baseUri, feed.RelativePath);
    }

    /// <summary>Configured feeds that are non-null and enabled.</summary>
    private IEnumerable<AcscFeedOptions> GetEnabledFeeds()
        => _options.Feeds.Where(feed => feed is { Enabled: true });

    /// <summary>Loads the persisted cursor, or <c>AcscCursor.Empty</c> when no state is stored yet.</summary>
    private async Task<AcscCursor> GetCursorAsync(CancellationToken cancellationToken)
    {
        var state = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
        return state is null ? AcscCursor.Empty : AcscCursor.FromBson(state.Cursor);
    }

    /// <summary>Persists <paramref name="cursor"/> with the current time as the completion timestamp.</summary>
    private Task UpdateCursorAsync(AcscCursor cursor, CancellationToken cancellationToken)
    {
        var document = cursor.ToBsonDocument();
        var completedAt = _timeProvider.GetUtcNow();
        return _stateRepository.UpdateCursorAsync(SourceName, document, completedAt, cancellationToken);
    }

    /// <summary>Name of the fetch mode as used in diagnostics, logs and request metadata.</summary>
    private static string ModeName(AcscFetchMode mode) => mode switch
    {
        AcscFetchMode.Relay => "relay",
        _ => "direct",
    };

    /// <summary>Endpoint through which a single fetch attempt is made.</summary>
    private enum AcscFetchMode
    {
        Direct = 0,
        Relay = 1,
    }
}