Rename Concelier Source modules to Connector

This commit is contained in:
2025-10-18 20:11:18 +03:00
parent 0137856fdb
commit 6524626230
789 changed files with 1489 additions and 1489 deletions

View File

@@ -0,0 +1,338 @@
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Concelier.Connector.Common.Http;
using StellaOps.Concelier.Connector.Common.Telemetry;
using StellaOps.Concelier.Storage.Mongo;
using StellaOps.Concelier.Storage.Mongo.Documents;
namespace StellaOps.Concelier.Connector.Common.Fetch;
/// <summary>
/// Executes HTTP fetches for connectors, capturing raw responses with metadata for downstream stages.
/// </summary>
public sealed class SourceFetchService
{
private static readonly string[] DefaultAcceptHeaders = { "application/json" };
private readonly IHttpClientFactory _httpClientFactory;
private readonly RawDocumentStorage _rawDocumentStorage;
private readonly IDocumentStore _documentStore;
private readonly ILogger<SourceFetchService> _logger;
private readonly TimeProvider _timeProvider;
private readonly IOptionsMonitor<SourceHttpClientOptions> _httpClientOptions;
private readonly IOptions<MongoStorageOptions> _storageOptions;
private readonly IJitterSource _jitterSource;
public SourceFetchService(
IHttpClientFactory httpClientFactory,
RawDocumentStorage rawDocumentStorage,
IDocumentStore documentStore,
ILogger<SourceFetchService> logger,
IJitterSource jitterSource,
TimeProvider? timeProvider = null,
IOptionsMonitor<SourceHttpClientOptions>? httpClientOptions = null,
IOptions<MongoStorageOptions>? storageOptions = null)
{
_httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
_rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
_documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_jitterSource = jitterSource ?? throw new ArgumentNullException(nameof(jitterSource));
_timeProvider = timeProvider ?? TimeProvider.System;
_httpClientOptions = httpClientOptions ?? throw new ArgumentNullException(nameof(httpClientOptions));
_storageOptions = storageOptions ?? throw new ArgumentNullException(nameof(storageOptions));
}
public async Task<SourceFetchResult> FetchAsync(SourceFetchRequest request, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
using var activity = SourceDiagnostics.StartFetch(request.SourceName, request.RequestUri, request.Method.Method, request.ClientName);
var stopwatch = Stopwatch.StartNew();
try
{
var sendResult = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var response = sendResult.Response;
using (response)
{
var duration = stopwatch.Elapsed;
activity?.SetTag("http.status_code", (int)response.StatusCode);
activity?.SetTag("http.retry.count", sendResult.Attempts - 1);
var rateLimitRemaining = TryGetHeaderValue(response.Headers, "x-ratelimit-remaining");
if (response.StatusCode == HttpStatusCode.NotModified)
{
_logger.LogDebug("Source {Source} returned 304 Not Modified for {Uri}", request.SourceName, request.RequestUri);
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
activity?.SetStatus(ActivityStatusCode.Ok);
return SourceFetchResult.NotModified(response.StatusCode);
}
if (!response.IsSuccessStatusCode)
{
var body = await ReadResponsePreviewAsync(response, cancellationToken).ConfigureAwait(false);
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
activity?.SetStatus(ActivityStatusCode.Error, body);
throw new HttpRequestException($"Fetch failed with status {(int)response.StatusCode} {response.StatusCode} from {request.RequestUri}. Body preview: {body}");
}
var contentBytes = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);
var sha256 = Convert.ToHexString(SHA256.HashData(contentBytes)).ToLowerInvariant();
var fetchedAt = _timeProvider.GetUtcNow();
var contentType = response.Content.Headers.ContentType?.ToString();
var storageOptions = _storageOptions.Value;
var retention = storageOptions.RawDocumentRetention;
DateTimeOffset? expiresAt = null;
if (retention > TimeSpan.Zero)
{
var grace = storageOptions.RawDocumentRetentionTtlGrace >= TimeSpan.Zero
? storageOptions.RawDocumentRetentionTtlGrace
: TimeSpan.Zero;
try
{
expiresAt = fetchedAt.Add(retention).Add(grace);
}
catch (ArgumentOutOfRangeException)
{
expiresAt = DateTimeOffset.MaxValue;
}
}
var gridFsId = await _rawDocumentStorage.UploadAsync(
request.SourceName,
request.RequestUri.ToString(),
contentBytes,
contentType,
expiresAt,
cancellationToken).ConfigureAwait(false);
var headers = CreateHeaderDictionary(response);
var metadata = request.Metadata is null
? new Dictionary<string, string>(StringComparer.Ordinal)
: new Dictionary<string, string>(request.Metadata, StringComparer.Ordinal);
metadata["attempts"] = sendResult.Attempts.ToString(CultureInfo.InvariantCulture);
metadata["fetchedAt"] = fetchedAt.ToString("O");
var existing = await _documentStore.FindBySourceAndUriAsync(request.SourceName, request.RequestUri.ToString(), cancellationToken).ConfigureAwait(false);
var recordId = existing?.Id ?? Guid.NewGuid();
var record = new DocumentRecord(
recordId,
request.SourceName,
request.RequestUri.ToString(),
fetchedAt,
sha256,
DocumentStatuses.PendingParse,
contentType,
headers,
metadata,
response.Headers.ETag?.Tag,
response.Content.Headers.LastModified,
gridFsId,
expiresAt);
var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, contentBytes.LongLength, rateLimitRemaining);
activity?.SetStatus(ActivityStatusCode.Ok);
_logger.LogInformation("Fetched {Source} document {Uri} (sha256={Sha})", request.SourceName, request.RequestUri, sha256);
return SourceFetchResult.Success(upserted, response.StatusCode);
}
}
catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}
public async Task<SourceFetchContentResult> FetchContentAsync(SourceFetchRequest request, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
using var activity = SourceDiagnostics.StartFetch(request.SourceName, request.RequestUri, request.Method.Method, request.ClientName);
var stopwatch = Stopwatch.StartNew();
try
{
_ = _httpClientOptions.Get(request.ClientName);
var sendResult = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var response = sendResult.Response;
using (response)
{
var duration = stopwatch.Elapsed;
activity?.SetTag("http.status_code", (int)response.StatusCode);
activity?.SetTag("http.retry.count", sendResult.Attempts - 1);
var rateLimitRemaining = TryGetHeaderValue(response.Headers, "x-ratelimit-remaining");
if (response.StatusCode == HttpStatusCode.NotModified)
{
_logger.LogDebug("Source {Source} returned 304 Not Modified for {Uri}", request.SourceName, request.RequestUri);
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
activity?.SetStatus(ActivityStatusCode.Ok);
return SourceFetchContentResult.NotModified(response.StatusCode, sendResult.Attempts);
}
if (!response.IsSuccessStatusCode)
{
var body = await ReadResponsePreviewAsync(response, cancellationToken).ConfigureAwait(false);
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength, rateLimitRemaining);
activity?.SetStatus(ActivityStatusCode.Error, body);
throw new HttpRequestException($"Fetch failed with status {(int)response.StatusCode} {response.StatusCode} from {request.RequestUri}. Body preview: {body}");
}
var contentBytes = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);
var headers = CreateHeaderDictionary(response);
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, response.Content.Headers.ContentLength ?? contentBytes.LongLength, rateLimitRemaining);
activity?.SetStatus(ActivityStatusCode.Ok);
return SourceFetchContentResult.Success(
response.StatusCode,
contentBytes,
response.Headers.ETag?.Tag,
response.Content.Headers.LastModified,
response.Content.Headers.ContentType?.ToString(),
sendResult.Attempts,
headers);
}
}
catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}
private async Task<SourceFetchSendResult> SendAsync(SourceFetchRequest request, HttpCompletionOption completionOption, CancellationToken cancellationToken)
{
var attemptCount = 0;
var options = _httpClientOptions.Get(request.ClientName);
var response = await SourceRetryPolicy.SendWithRetryAsync(
() => CreateHttpRequestMessage(request),
async (httpRequest, ct) =>
{
attemptCount++;
var client = _httpClientFactory.CreateClient(request.ClientName);
if (request.TimeoutOverride.HasValue)
{
client.Timeout = request.TimeoutOverride.Value;
}
return await client.SendAsync(httpRequest, completionOption, ct).ConfigureAwait(false);
},
maxAttempts: options.MaxAttempts,
baseDelay: options.BaseDelay,
_jitterSource,
context => SourceDiagnostics.RecordRetry(
request.SourceName,
request.ClientName,
context.Response?.StatusCode,
context.Attempt,
context.Delay),
cancellationToken).ConfigureAwait(false);
return new SourceFetchSendResult(response, attemptCount);
}
internal static HttpRequestMessage CreateHttpRequestMessage(SourceFetchRequest request)
{
var httpRequest = new HttpRequestMessage(request.Method, request.RequestUri);
var acceptValues = request.AcceptHeaders is { Count: > 0 } headers
? headers
: DefaultAcceptHeaders;
httpRequest.Headers.Accept.Clear();
var added = false;
foreach (var mediaType in acceptValues)
{
if (string.IsNullOrWhiteSpace(mediaType))
{
continue;
}
if (MediaTypeWithQualityHeaderValue.TryParse(mediaType, out var headerValue))
{
httpRequest.Headers.Accept.Add(headerValue);
added = true;
}
}
if (!added)
{
httpRequest.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(DefaultAcceptHeaders[0]));
}
if (!string.IsNullOrWhiteSpace(request.ETag))
{
if (System.Net.Http.Headers.EntityTagHeaderValue.TryParse(request.ETag, out var etag))
{
httpRequest.Headers.IfNoneMatch.Add(etag);
}
}
if (request.LastModified.HasValue)
{
httpRequest.Headers.IfModifiedSince = request.LastModified.Value;
}
return httpRequest;
}
private static async Task<string> ReadResponsePreviewAsync(HttpResponseMessage response, CancellationToken cancellationToken)
{
try
{
var buffer = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);
var preview = Encoding.UTF8.GetString(buffer);
return preview.Length > 256 ? preview[..256] : preview;
}
catch
{
return "<unavailable>";
}
}
private static string? TryGetHeaderValue(HttpResponseHeaders headers, string name)
{
if (headers.TryGetValues(name, out var values))
{
return values.FirstOrDefault();
}
return null;
}
private static Dictionary<string, string> CreateHeaderDictionary(HttpResponseMessage response)
{
var headers = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
foreach (var header in response.Headers)
{
headers[header.Key] = string.Join(",", header.Value);
}
foreach (var header in response.Content.Headers)
{
headers[header.Key] = string.Join(",", header.Value);
}
return headers;
}
private readonly record struct SourceFetchSendResult(HttpResponseMessage Response, int Attempts);
}