save progress
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
using System.Globalization;
|
||||
using StellaOps.Concelier.Documents;
|
||||
|
||||
namespace StellaOps.Concelier.Connector.Common.Cursors;
|
||||
@@ -69,7 +70,11 @@ public sealed record TimeWindowCursorState(DateTimeOffset? LastWindowStart, Date
|
||||
return value.DocumentType switch
|
||||
{
|
||||
DocumentType.DateTime => new DateTimeOffset(value.ToUniversalTime(), TimeSpan.Zero),
|
||||
DocumentType.String when DateTimeOffset.TryParse(value.AsString, out var parsed) => parsed.ToUniversalTime(),
|
||||
DocumentType.String when DateTimeOffset.TryParse(
|
||||
value.AsString,
|
||||
CultureInfo.InvariantCulture,
|
||||
DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
|
||||
out var parsed) => parsed.ToUniversalTime(),
|
||||
_ => null,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.IO;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using StellaOps.Concelier.Storage;
|
||||
|
||||
namespace StellaOps.Concelier.Connector.Common.Fetch;
|
||||
@@ -9,12 +11,16 @@ namespace StellaOps.Concelier.Connector.Common.Fetch;
|
||||
/// </summary>
|
||||
public sealed class RawDocumentStorage
|
||||
{
|
||||
private readonly ConcurrentDictionary<Guid, byte[]> _blobs = new();
|
||||
private readonly IDocumentStore? _documentStore;
|
||||
private sealed record RawDocumentEntry(byte[] Payload, DateTimeOffset? ExpiresAt);
|
||||
|
||||
public RawDocumentStorage(IDocumentStore? documentStore = null)
|
||||
private readonly ConcurrentDictionary<Guid, RawDocumentEntry> _blobs = new();
|
||||
private readonly IDocumentStore? _documentStore;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public RawDocumentStorage(IDocumentStore? documentStore = null, TimeProvider? timeProvider = null)
|
||||
{
|
||||
_documentStore = documentStore;
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public Task<Guid> UploadAsync(
|
||||
@@ -37,20 +43,30 @@ public sealed class RawDocumentStorage
|
||||
ArgumentException.ThrowIfNullOrEmpty(sourceName);
|
||||
ArgumentException.ThrowIfNullOrEmpty(uri);
|
||||
ArgumentNullException.ThrowIfNull(content);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var id = documentId ?? Guid.NewGuid();
|
||||
var id = documentId ?? CreateDeterministicGuid($"{sourceName}:{uri}");
|
||||
var copy = new byte[content.Length];
|
||||
Buffer.BlockCopy(content, 0, copy, 0, content.Length);
|
||||
_blobs[id] = copy;
|
||||
_blobs[id] = new RawDocumentEntry(copy, ExpiresAt);
|
||||
await Task.CompletedTask.ConfigureAwait(false);
|
||||
return id;
|
||||
}
|
||||
|
||||
public async Task<byte[]> DownloadAsync(Guid id, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_blobs.TryGetValue(id, out var bytes))
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
if (_blobs.TryGetValue(id, out var entry))
|
||||
{
|
||||
return bytes;
|
||||
if (entry.ExpiresAt.HasValue && entry.ExpiresAt.Value <= _timeProvider.GetUtcNow())
|
||||
{
|
||||
_blobs.TryRemove(id, out _);
|
||||
}
|
||||
else
|
||||
{
|
||||
return entry.Payload;
|
||||
}
|
||||
}
|
||||
|
||||
if (_documentStore is not null)
|
||||
@@ -58,7 +74,7 @@ public sealed class RawDocumentStorage
|
||||
var record = await _documentStore.FindAsync(id, cancellationToken).ConfigureAwait(false);
|
||||
if (record?.Payload is { Length: > 0 })
|
||||
{
|
||||
_blobs[id] = record.Payload;
|
||||
_blobs[id] = new RawDocumentEntry(record.Payload, record.ExpiresAt);
|
||||
return record.Payload;
|
||||
}
|
||||
}
|
||||
@@ -68,7 +84,16 @@ public sealed class RawDocumentStorage
|
||||
|
||||
public async Task DeleteAsync(Guid id, CancellationToken cancellationToken)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
_blobs.TryRemove(id, out _);
|
||||
await Task.CompletedTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static Guid CreateDeterministicGuid(string value)
|
||||
{
|
||||
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(value ?? string.Empty));
|
||||
bytes[6] = (byte)((bytes[6] & 0x0F) | 0x50);
|
||||
bytes[8] = (byte)((bytes[8] & 0x3F) | 0x80);
|
||||
return new Guid(bytes.AsSpan(0, 16));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Text;
|
||||
using System.Security.Cryptography;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Concelier.Documents;
|
||||
@@ -182,7 +183,7 @@ public sealed class SourceFetchService
|
||||
}
|
||||
|
||||
var existing = await _storageDocumentStore.FindBySourceAndUriAsync(request.SourceName, request.RequestUri.ToString(), cancellationToken).ConfigureAwait(false);
|
||||
var recordId = existing?.Id ?? Guid.NewGuid();
|
||||
var recordId = existing?.Id ?? CreateDeterministicGuid($"{request.SourceName}:{request.RequestUri}");
|
||||
|
||||
var payloadId = await _rawDocumentStorage.UploadAsync(
|
||||
request.SourceName,
|
||||
@@ -701,6 +702,7 @@ public sealed class SourceFetchService
|
||||
maxAttempts: options.MaxAttempts,
|
||||
baseDelay: options.BaseDelay,
|
||||
_jitterSource,
|
||||
_timeProvider,
|
||||
context => SourceDiagnostics.RecordRetry(
|
||||
request.SourceName,
|
||||
request.ClientName,
|
||||
@@ -770,6 +772,14 @@ public sealed class SourceFetchService
|
||||
}
|
||||
}
|
||||
|
||||
private static Guid CreateDeterministicGuid(string value)
|
||||
{
|
||||
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(value ?? string.Empty));
|
||||
bytes[6] = (byte)((bytes[6] & 0x0F) | 0x50);
|
||||
bytes[8] = (byte)((bytes[8] & 0x3F) | 0x80);
|
||||
return new Guid(bytes.AsSpan(0, 16));
|
||||
}
|
||||
|
||||
private static string? TryGetHeaderValue(HttpResponseHeaders headers, string name)
|
||||
{
|
||||
if (headers.TryGetValues(name, out var values))
|
||||
|
||||
@@ -16,12 +16,14 @@ internal static class SourceRetryPolicy
|
||||
int maxAttempts,
|
||||
TimeSpan baseDelay,
|
||||
IJitterSource jitterSource,
|
||||
TimeProvider timeProvider,
|
||||
Action<SourceRetryAttemptContext>? onRetry,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(requestFactory);
|
||||
ArgumentNullException.ThrowIfNull(sender);
|
||||
ArgumentNullException.ThrowIfNull(jitterSource);
|
||||
ArgumentNullException.ThrowIfNull(timeProvider);
|
||||
|
||||
var attempt = 0;
|
||||
|
||||
@@ -48,7 +50,7 @@ internal static class SourceRetryPolicy
|
||||
var delay = ComputeDelay(
|
||||
baseDelay,
|
||||
attempt,
|
||||
GetRetryAfter(response),
|
||||
GetRetryAfter(response, timeProvider),
|
||||
jitterSource);
|
||||
onRetry?.Invoke(new SourceRetryAttemptContext(attempt, response, null, delay));
|
||||
response.Dispose();
|
||||
@@ -76,7 +78,7 @@ internal static class SourceRetryPolicy
|
||||
return status >= 500 && status < 600;
|
||||
}
|
||||
|
||||
private static TimeSpan ComputeDelay(TimeSpan baseDelay, int attempt, TimeSpan? retryAfter = null, IJitterSource? jitterSource = null)
|
||||
private static TimeSpan ComputeDelay(TimeSpan baseDelay, int attempt, TimeSpan? retryAfter, IJitterSource jitterSource)
|
||||
{
|
||||
if (retryAfter.HasValue && retryAfter.Value > TimeSpan.Zero)
|
||||
{
|
||||
@@ -84,8 +86,7 @@ internal static class SourceRetryPolicy
|
||||
}
|
||||
|
||||
var exponential = TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));
|
||||
var jitter = jitterSource?.Next(TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(250))
|
||||
?? TimeSpan.FromMilliseconds(Random.Shared.Next(50, 250));
|
||||
var jitter = jitterSource.Next(TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(250));
|
||||
return exponential + jitter;
|
||||
}
|
||||
|
||||
@@ -130,7 +131,7 @@ internal static class SourceRetryPolicy
|
||||
return false;
|
||||
}
|
||||
|
||||
private static TimeSpan? GetRetryAfter(HttpResponseMessage response)
|
||||
private static TimeSpan? GetRetryAfter(HttpResponseMessage response, TimeProvider timeProvider)
|
||||
{
|
||||
var retryAfter = response.Headers.RetryAfter;
|
||||
if (retryAfter is not null)
|
||||
@@ -142,7 +143,7 @@ internal static class SourceRetryPolicy
|
||||
|
||||
if (retryAfter.Date.HasValue)
|
||||
{
|
||||
var delta = retryAfter.Date.Value - DateTimeOffset.UtcNow;
|
||||
var delta = retryAfter.Date.Value - timeProvider.GetUtcNow();
|
||||
if (delta > TimeSpan.Zero)
|
||||
{
|
||||
return delta;
|
||||
@@ -168,7 +169,7 @@ internal static class SourceRetryPolicy
|
||||
if (long.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out var epochSeconds))
|
||||
{
|
||||
var resetTime = DateTimeOffset.FromUnixTimeSeconds(epochSeconds);
|
||||
var delta = resetTime - DateTimeOffset.UtcNow;
|
||||
var delta = resetTime - timeProvider.GetUtcNow();
|
||||
if (delta > TimeSpan.Zero)
|
||||
{
|
||||
return delta;
|
||||
|
||||
@@ -7,7 +7,7 @@ namespace StellaOps.Concelier.Connector.Common.Http;
|
||||
/// </summary>
|
||||
internal sealed class AllowlistedHttpMessageHandler : DelegatingHandler
|
||||
{
|
||||
private readonly IReadOnlyCollection<string> _allowedHosts;
|
||||
private readonly HashSet<string> _allowedHosts;
|
||||
|
||||
public AllowlistedHttpMessageHandler(SourceHttpClientOptions options)
|
||||
{
|
||||
@@ -18,7 +18,7 @@ internal sealed class AllowlistedHttpMessageHandler : DelegatingHandler
|
||||
throw new InvalidOperationException("Source HTTP client must configure at least one allowed host.");
|
||||
}
|
||||
|
||||
_allowedHosts = snapshot;
|
||||
_allowedHosts = new HashSet<string>(snapshot, StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
|
||||
|
||||
@@ -42,7 +42,7 @@ public sealed class PdfTextExtractor
|
||||
{
|
||||
page = document.GetPage(index);
|
||||
}
|
||||
catch (InvalidOperationException ex) when (ex.Message.Contains("empty stack", StringComparison.OrdinalIgnoreCase))
|
||||
catch (InvalidOperationException)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
@@ -70,7 +70,7 @@ public sealed class PdfTextExtractor
|
||||
text = FlattenWords(page.GetWords());
|
||||
}
|
||||
}
|
||||
catch (InvalidOperationException ex) when (ex.Message.Contains("empty stack", StringComparison.OrdinalIgnoreCase))
|
||||
catch (InvalidOperationException)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -97,7 +97,11 @@ public sealed class PdfTextExtractor
|
||||
|
||||
if (builder.Length == 0)
|
||||
{
|
||||
var raw = Encoding.ASCII.GetString(rawBytes);
|
||||
var raw = Encoding.UTF8.GetString(rawBytes);
|
||||
if (raw.Contains('\uFFFD', StringComparison.Ordinal))
|
||||
{
|
||||
raw = Encoding.Latin1.GetString(rawBytes);
|
||||
}
|
||||
var matches = Regex.Matches(raw, "\\(([^\\)]+)\\)", RegexOptions.CultureInvariant);
|
||||
foreach (Match match in matches)
|
||||
{
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using StellaOps.Concelier.Documents;
|
||||
@@ -144,7 +146,7 @@ public sealed class SourceStateSeedProcessor
|
||||
|
||||
var existing = await _documentStore.FindBySourceAndUriAsync(source, document.Uri, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var recordId = document.DocumentId ?? existing?.Id ?? Guid.NewGuid();
|
||||
var recordId = document.DocumentId ?? existing?.Id ?? CreateDeterministicGuid($"{source}:{document.Uri}");
|
||||
|
||||
if (existing?.PayloadId is { } oldGridId)
|
||||
{
|
||||
@@ -332,4 +334,11 @@ public sealed class SourceStateSeedProcessor
|
||||
}
|
||||
}
|
||||
|
||||
private static Guid CreateDeterministicGuid(string value)
|
||||
{
|
||||
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(value ?? string.Empty));
|
||||
bytes[6] = (byte)((bytes[6] & 0x0F) | 0x50);
|
||||
bytes[8] = (byte)((bytes[8] & 0x3F) | 0x80);
|
||||
return new Guid(bytes.AsSpan(0, 16));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="JsonSchema.Net" />
|
||||
|
||||
@@ -7,4 +7,4 @@ Source of truth: `docs/implplan/SPRINT_20251229_049_BE_csproj_audit_maint_tests.
|
||||
| --- | --- | --- |
|
||||
| AUDIT-0159-M | DONE | Maintainability audit for StellaOps.Concelier.Connector.Common. |
|
||||
| AUDIT-0159-T | DONE | Test coverage audit for StellaOps.Concelier.Connector.Common. |
|
||||
| AUDIT-0159-A | TODO | Pending approval for changes. |
|
||||
| AUDIT-0159-A | DONE | Determinism and telemetry fixes applied. |
|
||||
|
||||
Reference in New Issue
Block a user