using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Concelier.Connector.Common.Http;
using StellaOps.Concelier.Connector.Common.Telemetry;
using StellaOps.Concelier.Storage.Mongo;
using StellaOps.Concelier.Storage.Mongo.Documents;

namespace StellaOps.Concelier.Connector.Common.Fetch;

/// <summary>
/// Executes HTTP fetches for connectors, capturing raw responses with metadata for downstream stages.
/// </summary>
public sealed class SourceFetchService
{
    private static readonly string[] DefaultAcceptHeaders = { "application/json" };

    /// <summary>Maximum number of characters of an error response body included in diagnostics.</summary>
    private const int ResponsePreviewMaxLength = 256;

    // NOTE(review): the generic type arguments on ILogger, IOptionsMonitor and IOptions were not
    // visible in the reviewed text; SourceFetchService, SourceHttpClientOptions and
    // MongoStorageOptions are inferred from usage — confirm against the project.
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly IDocumentStore _documentStore;
    private readonly ILogger<SourceFetchService> _logger;
    private readonly TimeProvider _timeProvider;
    private readonly IOptionsMonitor<SourceHttpClientOptions> _httpClientOptions;
    private readonly IOptions<MongoStorageOptions> _storageOptions;
    private readonly IJitterSource _jitterSource;

    /// <summary>
    /// Creates the fetch service.
    /// </summary>
    /// <param name="httpClientFactory">Factory used to create the named HTTP client for each attempt.</param>
    /// <param name="rawDocumentStorage">Storage that receives the raw response payload.</param>
    /// <param name="documentStore">Store that holds the document record for each fetched URI.</param>
    /// <param name="logger">Logger for fetch outcomes.</param>
    /// <param name="jitterSource">Jitter source handed to the retry policy.</param>
    /// <param name="timeProvider">Clock used for the fetch timestamp; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <param name="httpClientOptions">Per-client HTTP options. Declared optional, but must not be null.</param>
    /// <param name="storageOptions">Storage options. Declared optional, but must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// Any argument other than <paramref name="timeProvider"/> is null.
    /// </exception>
    public SourceFetchService(
        IHttpClientFactory httpClientFactory,
        RawDocumentStorage rawDocumentStorage,
        IDocumentStore documentStore,
        ILogger<SourceFetchService> logger,
        IJitterSource jitterSource,
        TimeProvider? timeProvider = null,
        IOptionsMonitor<SourceHttpClientOptions>? httpClientOptions = null,
        IOptions<MongoStorageOptions>? storageOptions = null)
    {
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _jitterSource = jitterSource ?? throw new ArgumentNullException(nameof(jitterSource));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _httpClientOptions = httpClientOptions ?? throw new ArgumentNullException(nameof(httpClientOptions));
        _storageOptions = storageOptions ?? throw new ArgumentNullException(nameof(storageOptions));
    }

    /// <summary>
    /// Fetches the requested resource, stores the raw payload, and upserts a document record for it.
    /// </summary>
    /// <param name="request">Describes the source, URI, method, client and conditional headers.</param>
    /// <param name="cancellationToken">Token observed by the HTTP call and the storage calls.</param>
    /// <returns>
    /// A not-modified result for HTTP 304; otherwise a success result wrapping the upserted record,
    /// which is written with status <c>PendingParse</c> and reuses the id of an existing record for
    /// the same source and URI when one is found.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="HttpRequestException">
    /// The response status is neither success nor 304. The exception carries the status code and a
    /// short preview of the response body.
    /// </exception>
    public async Task<SourceFetchResult> FetchAsync(SourceFetchRequest request, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);

        using var activity = SourceDiagnostics.StartFetch(request.SourceName, request.RequestUri, request.Method.Method, request.ClientName);
        var stopwatch = Stopwatch.StartNew();

        try
        {
            var sendResult = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
            using var response = sendResult.Response;

            // Measured once the headers have arrived; the body download is not included.
            var duration = stopwatch.Elapsed;
            activity?.SetTag("http.status_code", (int)response.StatusCode);
            activity?.SetTag("http.retry.count", sendResult.Attempts - 1);

            var rateLimitRemaining = TryGetHeaderValue(response.Headers, "x-ratelimit-remaining");

            if (response.StatusCode == HttpStatusCode.NotModified)
            {
                _logger.LogDebug("Source {Source} returned 304 Not Modified for {Uri}", request.SourceName, request.RequestUri);
                SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
                activity?.SetStatus(ActivityStatusCode.Ok);
                return SourceFetchResult.NotModified(response.StatusCode);
            }

            if (!response.IsSuccessStatusCode)
            {
                var body = await ReadResponsePreviewAsync(response, cancellationToken).ConfigureAwait(false);
                SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
                activity?.SetStatus(ActivityStatusCode.Error, body);
                throw CreateFetchFailure(request, response.StatusCode, body);
            }

            var contentBytes = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);
            var sha256 = Convert.ToHexString(SHA256.HashData(contentBytes)).ToLowerInvariant();
            var fetchedAt = _timeProvider.GetUtcNow();
            var contentType = response.Content.Headers.ContentType?.ToString();

            var storageOptions = _storageOptions.Value;
            var retention = storageOptions.RawDocumentRetention;
            DateTimeOffset? expiresAt = null;
            if (retention > TimeSpan.Zero)
            {
                // A negative grace period is treated as zero.
                var grace = storageOptions.RawDocumentRetentionTtlGrace >= TimeSpan.Zero
                    ? storageOptions.RawDocumentRetentionTtlGrace
                    : TimeSpan.Zero;

                try
                {
                    expiresAt = fetchedAt.Add(retention).Add(grace);
                }
                catch (ArgumentOutOfRangeException)
                {
                    // Retention overflows the representable range; clamp to the maximum.
                    expiresAt = DateTimeOffset.MaxValue;
                }
            }

            var gridFsId = await _rawDocumentStorage.UploadAsync(
                request.SourceName,
                request.RequestUri.ToString(),
                contentBytes,
                contentType,
                expiresAt,
                cancellationToken).ConfigureAwait(false);

            var headers = CreateHeaderDictionary(response);

            // Copy the caller's metadata so the request object is never mutated.
            var metadata = request.Metadata is null
                ? new Dictionary<string, string>(StringComparer.Ordinal)
                : new Dictionary<string, string>(request.Metadata, StringComparer.Ordinal);
            metadata["attempts"] = sendResult.Attempts.ToString(CultureInfo.InvariantCulture);
            metadata["fetchedAt"] = fetchedAt.ToString("O", CultureInfo.InvariantCulture);

            var existing = await _documentStore.FindBySourceAndUriAsync(request.SourceName, request.RequestUri.ToString(), cancellationToken).ConfigureAwait(false);
            var recordId = existing?.Id ?? Guid.NewGuid();

            var record = new DocumentRecord(
                recordId,
                request.SourceName,
                request.RequestUri.ToString(),
                fetchedAt,
                sha256,
                DocumentStatuses.PendingParse,
                contentType,
                headers,
                metadata,
                response.Headers.ETag?.Tag,
                response.Content.Headers.LastModified,
                gridFsId,
                expiresAt);

            var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);

            SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, contentBytes.LongLength, rateLimitRemaining);
            activity?.SetStatus(ActivityStatusCode.Ok);
            _logger.LogInformation("Fetched {Source} document {Uri} (sha256={Sha})", request.SourceName, request.RequestUri, sha256);
            return SourceFetchResult.Success(upserted, response.StatusCode);
        }
        catch (Exception ex)
        {
            // Mark the activity as failed for every failure (HTTP, cancellation, storage), then rethrow unchanged.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }

    /// <summary>
    /// Fetches the requested resource and returns its content and headers without persisting anything.
    /// </summary>
    /// <param name="request">Describes the source, URI, method, client and conditional headers.</param>
    /// <param name="cancellationToken">Token observed by the HTTP call.</param>
    /// <returns>
    /// A not-modified result for HTTP 304; otherwise a success result carrying the body bytes,
    /// ETag, Last-Modified, content type, attempt count and a flattened header dictionary.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="HttpRequestException">
    /// The response status is neither success nor 304. The exception carries the status code and a
    /// short preview of the response body.
    /// </exception>
    public async Task<SourceFetchContentResult> FetchContentAsync(SourceFetchRequest request, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);

        using var activity = SourceDiagnostics.StartFetch(request.SourceName, request.RequestUri, request.Method.Method, request.ClientName);
        var stopwatch = Stopwatch.StartNew();

        try
        {
            var sendResult = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
            using var response = sendResult.Response;

            // Measured once the headers have arrived; the body download is not included.
            var duration = stopwatch.Elapsed;
            activity?.SetTag("http.status_code", (int)response.StatusCode);
            activity?.SetTag("http.retry.count", sendResult.Attempts - 1);

            var rateLimitRemaining = TryGetHeaderValue(response.Headers, "x-ratelimit-remaining");

            if (response.StatusCode == HttpStatusCode.NotModified)
            {
                _logger.LogDebug("Source {Source} returned 304 Not Modified for {Uri}", request.SourceName, request.RequestUri);
                SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
                activity?.SetStatus(ActivityStatusCode.Ok);
                return SourceFetchContentResult.NotModified(response.StatusCode, sendResult.Attempts);
            }

            if (!response.IsSuccessStatusCode)
            {
                var body = await ReadResponsePreviewAsync(response, cancellationToken).ConfigureAwait(false);
                SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
                activity?.SetStatus(ActivityStatusCode.Error, body);
                throw CreateFetchFailure(request, response.StatusCode, body);
            }

            var contentBytes = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);
            var headers = CreateHeaderDictionary(response);

            SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength ?? contentBytes.LongLength, rateLimitRemaining);
            activity?.SetStatus(ActivityStatusCode.Ok);

            return SourceFetchContentResult.Success(
                response.StatusCode,
                contentBytes,
                response.Headers.ETag?.Tag,
                response.Content.Headers.LastModified,
                response.Content.Headers.ContentType?.ToString(),
                sendResult.Attempts,
                headers);
        }
        catch (Exception ex)
        {
            // Mark the activity as failed for every failure (HTTP, cancellation, ...), then rethrow unchanged.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }

    /// <summary>
    /// Sends the request through the retry policy and reports how many attempts were made.
    /// </summary>
    /// <remarks>
    /// A new request message and a new named client are created for every attempt; the request's
    /// timeout override, when present, is applied to that attempt's client.
    /// </remarks>
    private async Task<SourceFetchSendResult> SendAsync(SourceFetchRequest request, HttpCompletionOption completionOption, CancellationToken cancellationToken)
    {
        var attemptCount = 0;
        var options = _httpClientOptions.Get(request.ClientName);

        var response = await SourceRetryPolicy.SendWithRetryAsync(
            () => CreateHttpRequestMessage(request),
            async (httpRequest, ct) =>
            {
                attemptCount++;
                var client = _httpClientFactory.CreateClient(request.ClientName);
                if (request.TimeoutOverride.HasValue)
                {
                    client.Timeout = request.TimeoutOverride.Value;
                }

                return await client.SendAsync(httpRequest, completionOption, ct).ConfigureAwait(false);
            },
            maxAttempts: options.MaxAttempts,
            baseDelay: options.BaseDelay,
            _jitterSource,
            context => SourceDiagnostics.RecordRetry(
                request.SourceName,
                request.ClientName,
                context.Response?.StatusCode,
                context.Attempt,
                context.Delay),
            cancellationToken).ConfigureAwait(false);

        return new SourceFetchSendResult(response, attemptCount);
    }

    /// <summary>
    /// Builds the outgoing HTTP message for a fetch request.
    /// </summary>
    /// <remarks>
    /// Accept values that are blank or fail to parse are skipped; if none remain,
    /// <c>application/json</c> is used. <c>If-None-Match</c> is added only when the request's ETag
    /// parses, and <c>If-Modified-Since</c> only when a last-modified value is present.
    /// </remarks>
    internal static HttpRequestMessage CreateHttpRequestMessage(SourceFetchRequest request)
    {
        var httpRequest = new HttpRequestMessage(request.Method, request.RequestUri);
        var acceptValues = request.AcceptHeaders is { Count: > 0 } headers
            ? headers
            : DefaultAcceptHeaders;

        httpRequest.Headers.Accept.Clear();
        var added = false;
        foreach (var mediaType in acceptValues)
        {
            if (string.IsNullOrWhiteSpace(mediaType))
            {
                continue;
            }

            if (MediaTypeWithQualityHeaderValue.TryParse(mediaType, out var headerValue))
            {
                httpRequest.Headers.Accept.Add(headerValue);
                added = true;
            }
        }

        if (!added)
        {
            httpRequest.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(DefaultAcceptHeaders[0]));
        }

        if (!string.IsNullOrWhiteSpace(request.ETag)
            && System.Net.Http.Headers.EntityTagHeaderValue.TryParse(request.ETag, out var etag))
        {
            httpRequest.Headers.IfNoneMatch.Add(etag);
        }

        if (request.LastModified.HasValue)
        {
            httpRequest.Headers.IfModifiedSince = request.LastModified.Value;
        }

        return httpRequest;
    }

    /// <summary>
    /// Creates the exception thrown for a non-success response, attaching the status code so
    /// callers can inspect <see cref="HttpRequestException.StatusCode"/> instead of parsing the message.
    /// </summary>
    private static HttpRequestException CreateFetchFailure(SourceFetchRequest request, HttpStatusCode statusCode, string bodyPreview)
    {
        return new HttpRequestException(
            $"Fetch failed with status {(int)statusCode} {statusCode} from {request.RequestUri}. Body preview: {bodyPreview}",
            inner: null,
            statusCode: statusCode);
    }

    /// <summary>
    /// Reads the response body as UTF-8 and returns at most the first
    /// <see cref="ResponsePreviewMaxLength"/> characters.
    /// </summary>
    /// <returns>
    /// The preview text, or an empty string when the body cannot be read. Cancellation requested
    /// through <paramref name="cancellationToken"/> is not swallowed.
    /// </returns>
    private static async Task<string> ReadResponsePreviewAsync(HttpResponseMessage response, CancellationToken cancellationToken)
    {
        try
        {
            var buffer = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);
            var preview = Encoding.UTF8.GetString(buffer);
            return preview.Length > ResponsePreviewMaxLength
                ? preview[..ResponsePreviewMaxLength]
                : preview;
        }
        catch (Exception) when (!cancellationToken.IsCancellationRequested)
        {
            // The preview is best-effort diagnostics; a failed read must not mask the HTTP failure.
            return string.Empty;
        }
    }

    /// <summary>Returns the first value of the named response header, or null when it is absent.</summary>
    private static string? TryGetHeaderValue(HttpResponseHeaders headers, string name)
    {
        return headers.TryGetValues(name, out var values)
            ? values.FirstOrDefault()
            : null;
    }

    /// <summary>
    /// Flattens response and content headers into one case-insensitive dictionary, joining
    /// multiple values with commas. Content headers overwrite response headers of the same name.
    /// </summary>
    private static Dictionary<string, string> CreateHeaderDictionary(HttpResponseMessage response)
    {
        var headers = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

        foreach (var header in response.Headers)
        {
            headers[header.Key] = string.Join(",", header.Value);
        }

        foreach (var header in response.Content.Headers)
        {
            headers[header.Key] = string.Join(",", header.Value);
        }

        return headers;
    }

    /// <summary>The final response of a send operation together with the number of attempts made.</summary>
    private readonly record struct SourceFetchSendResult(HttpResponseMessage Response, int Attempts);
}