up
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Findings Ledger CI / build-test (push) Has been cancelled
Findings Ledger CI / migration-validation (push) Has been cancelled
Scanner Analyzers / Discover Analyzers (push) Has been cancelled
Signals Reachability Scoring & Events / reachability-smoke (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
cryptopro-linux-csp / build-and-test (push) Has been cancelled
Scanner Analyzers / Validate Test Fixtures (push) Has been cancelled
Signals CI & Image / signals-ci (push) Has been cancelled
sm-remote-ci / build-and-test (push) Has been cancelled
Findings Ledger CI / generate-manifest (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Scanner Analyzers / Build Analyzers (push) Has been cancelled
Scanner Analyzers / Test Language Analyzers (push) Has been cancelled
Scanner Analyzers / Verify Deterministic Output (push) Has been cancelled
Signals Reachability Scoring & Events / sign-and-upload (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-12-09 09:38:09 +02:00
parent bc0762e97d
commit 108d1c64b3
193 changed files with 7265 additions and 13029 deletions

View File

@@ -11,6 +11,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoContracts = StellaOps.Concelier.Storage.Mongo;
using StorageContracts = StellaOps.Concelier.Storage.Contracts;
using StellaOps.Concelier.Connector.Common.Http;
using StellaOps.Concelier.Connector.Common.Telemetry;
using StellaOps.Concelier.Core.Aoc;
@@ -32,6 +33,7 @@ public sealed class SourceFetchService
private readonly IHttpClientFactory _httpClientFactory;
private readonly RawDocumentStorage _rawDocumentStorage;
private readonly MongoContracts.IDocumentStore _documentStore;
private readonly StorageContracts.IStorageDocumentStore _storageDocumentStore;
private readonly ILogger<SourceFetchService> _logger;
private readonly TimeProvider _timeProvider;
private readonly IOptionsMonitor<SourceHttpClientOptions> _httpClientOptions;
@@ -46,6 +48,7 @@ public sealed class SourceFetchService
IHttpClientFactory httpClientFactory,
RawDocumentStorage rawDocumentStorage,
MongoContracts.IDocumentStore documentStore,
StorageContracts.IStorageDocumentStore storageDocumentStore,
ILogger<SourceFetchService> logger,
IJitterSource jitterSource,
IAdvisoryRawWriteGuard guard,
@@ -58,6 +61,7 @@ public sealed class SourceFetchService
_httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
_rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
_documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
_storageDocumentStore = storageDocumentStore ?? throw new ArgumentNullException(nameof(storageDocumentStore));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_jitterSource = jitterSource ?? throw new ArgumentNullException(nameof(jitterSource));
_guard = guard ?? throw new ArgumentNullException(nameof(guard));
@@ -69,6 +73,36 @@ public sealed class SourceFetchService
_connectorVersion = typeof(SourceFetchService).Assembly.GetName().Version?.ToString() ?? "0.0.0";
}
// Backward-compatible constructor until all callers provide the storage document contract explicitly.
public SourceFetchService(
IHttpClientFactory httpClientFactory,
RawDocumentStorage rawDocumentStorage,
MongoContracts.IDocumentStore documentStore,
ILogger<SourceFetchService> logger,
IJitterSource jitterSource,
IAdvisoryRawWriteGuard guard,
IAdvisoryLinksetMapper linksetMapper,
ICryptoHash hash,
TimeProvider? timeProvider = null,
IOptionsMonitor<SourceHttpClientOptions>? httpClientOptions = null,
IOptions<MongoContracts.MongoStorageOptions>? storageOptions = null)
: this(
httpClientFactory,
rawDocumentStorage,
documentStore,
documentStore as StorageContracts.IStorageDocumentStore
?? throw new ArgumentNullException(nameof(documentStore), "Document store must implement IStorageDocumentStore"),
logger,
jitterSource,
guard,
linksetMapper,
hash,
timeProvider,
httpClientOptions,
storageOptions)
{
}
public async Task<SourceFetchResult> FetchAsync(SourceFetchRequest request, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
@@ -147,7 +181,7 @@ public sealed class SourceFetchService
}
}
var existing = await _documentStore.FindBySourceAndUriAsync(request.SourceName, request.RequestUri.ToString(), cancellationToken).ConfigureAwait(false);
var existing = await _storageDocumentStore.FindBySourceAndUriAsync(request.SourceName, request.RequestUri.ToString(), cancellationToken).ConfigureAwait(false);
var recordId = existing?.Id ?? Guid.NewGuid();
var payloadId = await _rawDocumentStorage.UploadAsync(
@@ -159,7 +193,7 @@ public sealed class SourceFetchService
cancellationToken,
recordId).ConfigureAwait(false);
var record = new MongoContracts.DocumentRecord(
var record = new StorageContracts.StorageDocument(
recordId,
request.SourceName,
request.RequestUri.ToString(),
@@ -173,9 +207,10 @@ public sealed class SourceFetchService
response.Content.Headers.LastModified,
payloadId,
expiresAt,
Payload: contentBytes);
Payload: contentBytes,
FetchedAt: fetchedAt);
var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
var upserted = await _storageDocumentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, contentBytes.LongLength, rateLimitRemaining);
activity?.SetStatus(ActivityStatusCode.Ok);
_logger.LogInformation("Fetched {Source} document {Uri} (sha256={Sha})", request.SourceName, request.RequestUri, contentHash);

View File

@@ -0,0 +1,76 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
namespace StellaOps.Concelier.Storage.Contracts;
/// <summary>
/// Postgres-native storage document contract (Mongo-free).
/// </summary>
public sealed record StorageDocument(
Guid Id,
string SourceName,
string Uri,
DateTimeOffset CreatedAt,
string Sha256,
string Status,
string? ContentType,
IReadOnlyDictionary<string, string>? Headers,
IReadOnlyDictionary<string, string>? Metadata,
string? Etag,
DateTimeOffset? LastModified,
Guid? PayloadId,
DateTimeOffset? ExpiresAt,
byte[]? Payload,
DateTimeOffset? FetchedAt);
public interface IStorageDocumentStore
{
Task<StorageDocument?> FindBySourceAndUriAsync(string sourceName, string uri, CancellationToken cancellationToken);
Task<StorageDocument?> FindAsync(Guid id, CancellationToken cancellationToken);
Task<StorageDocument> UpsertAsync(StorageDocument record, CancellationToken cancellationToken);
Task UpdateStatusAsync(Guid id, string status, CancellationToken cancellationToken);
}
/// <summary>
/// Postgres-native DTO storage contract using JSON payloads.
/// </summary>
public sealed record StorageDto(
Guid Id,
Guid DocumentId,
string SourceName,
string Format,
JsonDocument Payload,
DateTimeOffset CreatedAt,
string SchemaVersion,
DateTimeOffset ValidatedAt);
public interface IStorageDtoStore
{
Task<StorageDto> UpsertAsync(StorageDto record, CancellationToken cancellationToken);
Task<StorageDto?> FindByDocumentIdAsync(Guid documentId, CancellationToken cancellationToken);
Task<IReadOnlyList<StorageDto>> GetBySourceAsync(string sourceName, int limit, CancellationToken cancellationToken);
}
/// <summary>
/// Cursor/state contract for ingestion sources without Mongo/Bson dependencies.
/// </summary>
public sealed record SourceCursorState(
string SourceName,
bool Enabled,
bool Paused,
JsonDocument? Cursor,
DateTimeOffset? LastSuccess,
DateTimeOffset? LastFailure,
int FailCount,
DateTimeOffset? BackoffUntil,
DateTimeOffset UpdatedAt,
string? LastFailureReason);
public interface ISourceStateStore
{
Task<SourceCursorState?> TryGetAsync(string sourceName, CancellationToken cancellationToken);
Task UpdateCursorAsync(string sourceName, JsonDocument cursor, DateTimeOffset completedAt, CancellationToken cancellationToken);
Task MarkFailureAsync(string sourceName, DateTimeOffset now, TimeSpan backoff, string reason, CancellationToken cancellationToken);
Task UpsertAsync(SourceCursorState record, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,125 @@
using System;
using System.Text.Json;
using MongoDB.Bson;
using MongoDB.Bson.IO;
using Contracts = StellaOps.Concelier.Storage.Contracts;
using MongoContracts = StellaOps.Concelier.Storage.Mongo;
namespace StellaOps.Concelier.Storage.Postgres;
internal static class ContractsMappingExtensions
{
private static readonly JsonWriterSettings RelaxedJsonSettings = new()
{
OutputMode = JsonOutputMode.RelaxedExtendedJson
};
internal static Contracts.StorageDocument ToStorageDocument(this MongoContracts.DocumentRecord record)
{
return new Contracts.StorageDocument(
record.Id,
record.SourceName,
record.Uri,
record.CreatedAt,
record.Sha256,
record.Status,
record.ContentType,
record.Headers,
record.Metadata,
record.Etag,
record.LastModified,
record.PayloadId,
record.ExpiresAt,
record.Payload,
record.FetchedAt);
}
internal static MongoContracts.DocumentRecord ToMongoDocumentRecord(this Contracts.StorageDocument record)
{
return new MongoContracts.DocumentRecord(
record.Id,
record.SourceName,
record.Uri,
record.CreatedAt,
record.Sha256,
record.Status,
record.ContentType,
record.Headers,
record.Metadata,
record.Etag,
record.LastModified,
record.PayloadId,
record.ExpiresAt,
record.Payload,
record.FetchedAt);
}
internal static Contracts.StorageDto ToStorageDto(this MongoContracts.DtoRecord record)
{
var json = record.Payload.ToJson(RelaxedJsonSettings);
var payload = JsonDocument.Parse(json);
return new Contracts.StorageDto(
record.Id,
record.DocumentId,
record.SourceName,
record.Format,
payload,
record.CreatedAt,
record.SchemaVersion,
record.ValidatedAt);
}
internal static MongoContracts.DtoRecord ToMongoDtoRecord(this Contracts.StorageDto record)
{
var json = record.Payload.RootElement.GetRawText();
var bson = BsonDocument.Parse(json);
return new MongoContracts.DtoRecord(
record.Id,
record.DocumentId,
record.SourceName,
record.Format,
bson,
record.CreatedAt,
record.SchemaVersion,
record.ValidatedAt);
}
internal static Contracts.SourceCursorState ToStorageCursorState(this MongoContracts.SourceStateRecord record)
{
var cursorJson = record.Cursor is null ? null : record.Cursor.ToJson(RelaxedJsonSettings);
var cursor = cursorJson is null ? null : JsonDocument.Parse(cursorJson);
return new Contracts.SourceCursorState(
record.SourceName,
record.Enabled,
record.Paused,
cursor,
record.LastSuccess,
record.LastFailure,
record.FailCount,
record.BackoffUntil,
record.UpdatedAt,
record.LastFailureReason);
}
internal static MongoContracts.SourceStateRecord ToMongoSourceStateRecord(this Contracts.SourceCursorState record)
{
var bsonCursor = record.Cursor is null ? null : BsonDocument.Parse(record.Cursor.RootElement.GetRawText());
return new MongoContracts.SourceStateRecord(
record.SourceName,
record.Enabled,
record.Paused,
bsonCursor,
record.LastSuccess,
record.LastFailure,
record.FailCount,
record.BackoffUntil,
record.UpdatedAt,
record.LastFailureReason);
}
internal static BsonDocument ToBsonDocument(this JsonDocument document)
{
ArgumentNullException.ThrowIfNull(document);
return BsonDocument.Parse(document.RootElement.GetRawText());
}
}

View File

@@ -1,14 +1,15 @@
using System.Text.Json;
using StellaOps.Concelier.Storage.Mongo;
using Contracts = StellaOps.Concelier.Storage.Contracts;
using StellaOps.Concelier.Storage.Postgres.Models;
using StellaOps.Concelier.Storage.Postgres.Repositories;
namespace StellaOps.Concelier.Storage.Postgres;
/// <summary>
/// Postgres-backed implementation that satisfies the legacy IDocumentStore contract.
/// Postgres-backed implementation that satisfies the legacy IDocumentStore contract and the new Postgres-native storage contract.
/// </summary>
public sealed class PostgresDocumentStore : IDocumentStore
public sealed class PostgresDocumentStore : IDocumentStore, Contracts.IStorageDocumentStore
{
private readonly IDocumentRepository _repository;
private readonly ISourceRepository _sourceRepository;
@@ -64,6 +65,18 @@ public sealed class PostgresDocumentStore : IDocumentStore
await _repository.UpdateStatusAsync(id, status, cancellationToken).ConfigureAwait(false);
}
async Task<Contracts.StorageDocument?> Contracts.IStorageDocumentStore.FindBySourceAndUriAsync(string sourceName, string uri, CancellationToken cancellationToken)
=> (await FindBySourceAndUriAsync(sourceName, uri, cancellationToken).ConfigureAwait(false))?.ToStorageDocument();
async Task<Contracts.StorageDocument?> Contracts.IStorageDocumentStore.FindAsync(Guid id, CancellationToken cancellationToken)
=> (await FindAsync(id, cancellationToken).ConfigureAwait(false))?.ToStorageDocument();
async Task<Contracts.StorageDocument> Contracts.IStorageDocumentStore.UpsertAsync(Contracts.StorageDocument record, CancellationToken cancellationToken)
=> (await UpsertAsync(record.ToMongoDocumentRecord(), cancellationToken).ConfigureAwait(false)).ToStorageDocument();
Task Contracts.IStorageDocumentStore.UpdateStatusAsync(Guid id, string status, CancellationToken cancellationToken)
=> UpdateStatusAsync(id, status, cancellationToken);
private DocumentRecord Map(DocumentRecordEntity row)
{
return new DocumentRecord(

View File

@@ -1,10 +1,13 @@
using System.Linq;
using System.Text.Json;
using Dapper;
using StellaOps.Concelier.Storage.Mongo;
using Contracts = StellaOps.Concelier.Storage.Contracts;
using StellaOps.Concelier.Storage.Postgres;
namespace StellaOps.Concelier.Storage.Postgres.Repositories;
internal sealed class PostgresDtoStore : IDtoStore
internal sealed class PostgresDtoStore : IDtoStore, Contracts.IStorageDtoStore
{
private readonly ConcelierDataSource _dataSource;
private readonly JsonSerializerOptions _jsonOptions = new(JsonSerializerDefaults.General)
@@ -92,6 +95,17 @@ internal sealed class PostgresDtoStore : IDtoStore
row.ValidatedAt);
}
async Task<Contracts.StorageDto> Contracts.IStorageDtoStore.UpsertAsync(Contracts.StorageDto record, CancellationToken cancellationToken)
=> (await UpsertAsync(record.ToMongoDtoRecord(), cancellationToken).ConfigureAwait(false)).ToStorageDto();
async Task<Contracts.StorageDto?> Contracts.IStorageDtoStore.FindByDocumentIdAsync(Guid documentId, CancellationToken cancellationToken)
=> (await FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false))?.ToStorageDto();
async Task<IReadOnlyList<Contracts.StorageDto>> Contracts.IStorageDtoStore.GetBySourceAsync(string sourceName, int limit, CancellationToken cancellationToken)
=> (await GetBySourceAsync(sourceName, limit, cancellationToken).ConfigureAwait(false))
.Select(dto => dto.ToStorageDto())
.ToArray();
private sealed record DtoRow(
Guid Id,
Guid DocumentId,

View File

@@ -4,14 +4,15 @@ using System.Collections.Generic;
using MongoDB.Bson;
using StellaOps.Concelier.Storage.Postgres.Models;
using StellaOps.Concelier.Storage.Postgres.Repositories;
using Contracts = StellaOps.Concelier.Storage.Contracts;
using MongoContracts = StellaOps.Concelier.Storage.Mongo;
namespace StellaOps.Concelier.Storage.Postgres;
/// <summary>
/// Adapter that satisfies the legacy source state contract using PostgreSQL storage.
/// Adapter that satisfies the legacy source state contract using PostgreSQL storage and provides a Postgres-native cursor contract.
/// </summary>
public sealed class PostgresSourceStateAdapter : MongoContracts.ISourceStateRepository
public sealed class PostgresSourceStateAdapter : MongoContracts.ISourceStateRepository, Contracts.ISourceStateStore
{
private readonly ISourceRepository _sourceRepository;
private readonly Repositories.ISourceStateRepository _stateRepository;
@@ -134,6 +135,18 @@ public sealed class PostgresSourceStateAdapter : MongoContracts.ISourceStateRepo
_ = await _stateRepository.UpsertAsync(entity, cancellationToken).ConfigureAwait(false);
}
async Task<Contracts.SourceCursorState?> Contracts.ISourceStateStore.TryGetAsync(string sourceName, CancellationToken cancellationToken)
=> (await TryGetAsync(sourceName, cancellationToken).ConfigureAwait(false))?.ToStorageCursorState();
Task Contracts.ISourceStateStore.UpdateCursorAsync(string sourceName, JsonDocument cursor, DateTimeOffset completedAt, CancellationToken cancellationToken)
=> UpdateCursorAsync(sourceName, cursor.ToBsonDocument(), completedAt, cancellationToken);
Task Contracts.ISourceStateStore.MarkFailureAsync(string sourceName, DateTimeOffset now, TimeSpan backoff, string reason, CancellationToken cancellationToken)
=> MarkFailureAsync(sourceName, now, backoff, reason, cancellationToken);
Task Contracts.ISourceStateStore.UpsertAsync(Contracts.SourceCursorState record, CancellationToken cancellationToken)
=> UpsertAsync(record.ToMongoSourceStateRecord(), cancellationToken);
private async Task<SourceEntity> EnsureSourceAsync(string sourceName, CancellationToken cancellationToken)
{
var existing = await _sourceRepository.GetByKeyAsync(sourceName, cancellationToken).ConfigureAwait(false);