feat: Add CVSS receipt management endpoints and related functionality
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled

- Introduced new API endpoints for creating, retrieving, amending, and listing CVSS receipts.
- Updated IPolicyEngineClient interface to include methods for CVSS receipt operations.
- Implemented PolicyEngineClient to handle CVSS receipt requests.
- Enhanced Program.cs to map new CVSS receipt routes with appropriate authorization.
- Added necessary models and contracts for CVSS receipt requests and responses.
- Integrated Postgres document store for managing CVSS receipts and related data.
- Updated database schema with new migrations for source documents and payload storage.
- Refactored existing components to support new CVSS functionality.
This commit is contained in:
StellaOps Bot
2025-12-07 00:43:14 +02:00
parent 0de92144d2
commit 53889d85e7
67 changed files with 17207 additions and 16293 deletions

View File

@@ -1,366 +1,366 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Concelier.Connector.Common;
using StellaOps.Concelier.Connector.Common.Fetch;
using StellaOps.Concelier.Connector.Vndr.Oracle.Configuration;
using StellaOps.Concelier.Connector.Vndr.Oracle.Internal;
using StellaOps.Concelier.Storage.Mongo;
using StellaOps.Concelier.Storage.Mongo.Advisories;
using StellaOps.Concelier.Storage.Mongo.Documents;
using StellaOps.Concelier.Storage.Mongo.Dtos;
using StellaOps.Concelier.Storage.Mongo.PsirtFlags;
using StellaOps.Plugin;
namespace StellaOps.Concelier.Connector.Vndr.Oracle;
public sealed class OracleConnector : IFeedConnector
{
private static readonly JsonSerializerOptions SerializerOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
};
private readonly SourceFetchService _fetchService;
private readonly RawDocumentStorage _rawDocumentStorage;
private readonly IDocumentStore _documentStore;
private readonly IDtoStore _dtoStore;
private readonly IAdvisoryStore _advisoryStore;
private readonly IPsirtFlagStore _psirtFlagStore;
private readonly ISourceStateRepository _stateRepository;
private readonly OracleCalendarFetcher _calendarFetcher;
private readonly OracleOptions _options;
private readonly TimeProvider _timeProvider;
private readonly ILogger<OracleConnector> _logger;
public OracleConnector(
SourceFetchService fetchService,
RawDocumentStorage rawDocumentStorage,
IDocumentStore documentStore,
IDtoStore dtoStore,
IAdvisoryStore advisoryStore,
IPsirtFlagStore psirtFlagStore,
ISourceStateRepository stateRepository,
OracleCalendarFetcher calendarFetcher,
IOptions<OracleOptions> options,
TimeProvider? timeProvider,
ILogger<OracleConnector> logger)
{
_fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
_rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
_documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
_dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
_advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
_psirtFlagStore = psirtFlagStore ?? throw new ArgumentNullException(nameof(psirtFlagStore));
_stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
_calendarFetcher = calendarFetcher ?? throw new ArgumentNullException(nameof(calendarFetcher));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
_options.Validate();
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public string SourceName => VndrOracleConnectorPlugin.SourceName;
public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
{
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
var pendingDocuments = cursor.PendingDocuments.ToList();
var pendingMappings = cursor.PendingMappings.ToList();
var fetchCache = new Dictionary<string, OracleFetchCacheEntry>(cursor.FetchCache, StringComparer.OrdinalIgnoreCase);
var touchedResources = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var now = _timeProvider.GetUtcNow();
var advisoryUris = await ResolveAdvisoryUrisAsync(cancellationToken).ConfigureAwait(false);
foreach (var uri in advisoryUris)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
var cacheKey = uri.AbsoluteUri;
touchedResources.Add(cacheKey);
var advisoryId = DeriveAdvisoryId(uri);
var title = advisoryId.Replace('-', ' ');
var published = now;
var metadata = OracleDocumentMetadata.CreateMetadata(advisoryId, title, published);
var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, uri.ToString(), cancellationToken).ConfigureAwait(false);
var request = new SourceFetchRequest(OracleOptions.HttpClientName, SourceName, uri)
{
Metadata = metadata,
ETag = existing?.Etag,
LastModified = existing?.LastModified,
AcceptHeaders = new[] { "text/html", "application/xhtml+xml", "text/plain;q=0.5" },
};
var result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccess || result.Document is null)
{
continue;
}
var cacheEntry = OracleFetchCacheEntry.FromDocument(result.Document);
if (existing is not null
&& string.Equals(existing.Status, DocumentStatuses.Mapped, StringComparison.Ordinal)
&& cursor.TryGetFetchCache(cacheKey, out var cached)
&& cached.Matches(result.Document))
{
_logger.LogDebug("Oracle advisory {AdvisoryId} unchanged; skipping parse/map", advisoryId);
await _documentStore.UpdateStatusAsync(result.Document.Id, existing.Status, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(result.Document.Id);
pendingMappings.Remove(result.Document.Id);
fetchCache[cacheKey] = cacheEntry;
continue;
}
fetchCache[cacheKey] = cacheEntry;
if (!pendingDocuments.Contains(result.Document.Id))
{
pendingDocuments.Add(result.Document.Id);
}
if (_options.RequestDelay > TimeSpan.Zero)
{
await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Oracle fetch failed for {Uri}", uri);
await _stateRepository.MarkFailureAsync(SourceName, _timeProvider.GetUtcNow(), TimeSpan.FromMinutes(10), ex.Message, cancellationToken).ConfigureAwait(false);
throw;
}
}
if (fetchCache.Count > 0 && touchedResources.Count > 0)
{
var stale = fetchCache.Keys.Where(key => !touchedResources.Contains(key)).ToArray();
foreach (var key in stale)
{
fetchCache.Remove(key);
}
}
var updatedCursor = cursor
.WithPendingDocuments(pendingDocuments)
.WithPendingMappings(pendingMappings)
.WithFetchCache(fetchCache)
.WithLastProcessed(now);
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
{
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
if (cursor.PendingDocuments.Count == 0)
{
return;
}
var pendingDocuments = cursor.PendingDocuments.ToList();
var pendingMappings = cursor.PendingMappings.ToList();
foreach (var documentId in cursor.PendingDocuments)
{
cancellationToken.ThrowIfCancellationRequested();
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
if (document is null)
{
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
if (!document.GridFsId.HasValue)
{
_logger.LogWarning("Oracle document {DocumentId} missing GridFS payload", document.Id);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
OracleDto dto;
try
{
var metadata = OracleDocumentMetadata.FromDocument(document);
var content = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
var html = System.Text.Encoding.UTF8.GetString(content);
dto = OracleParser.Parse(html, metadata);
}
catch (Exception ex)
{
_logger.LogError(ex, "Oracle parse failed for document {DocumentId}", document.Id);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
if (!OracleDtoValidator.TryNormalize(dto, out var normalized, out var validationError))
{
_logger.LogWarning("Oracle validation failed for document {DocumentId}: {Reason}", document.Id, validationError ?? "unknown");
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
dto = normalized;
var json = JsonSerializer.Serialize(dto, SerializerOptions);
var payload = BsonDocument.Parse(json);
var validatedAt = _timeProvider.GetUtcNow();
var existingDto = await _dtoStore.FindByDocumentIdAsync(document.Id, cancellationToken).ConfigureAwait(false);
var dtoRecord = existingDto is null
? new DtoRecord(Guid.NewGuid(), document.Id, SourceName, "oracle.advisory.v1", payload, validatedAt)
: existingDto with
{
Payload = payload,
SchemaVersion = "oracle.advisory.v1",
ValidatedAt = validatedAt,
};
await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
if (!pendingMappings.Contains(documentId))
{
pendingMappings.Add(documentId);
}
}
var updatedCursor = cursor
.WithPendingDocuments(pendingDocuments)
.WithPendingMappings(pendingMappings);
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
{
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
if (cursor.PendingMappings.Count == 0)
{
return;
}
var pendingMappings = cursor.PendingMappings.ToList();
foreach (var documentId in cursor.PendingMappings)
{
cancellationToken.ThrowIfCancellationRequested();
var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
if (dtoRecord is null || document is null)
{
pendingMappings.Remove(documentId);
continue;
}
OracleDto? dto;
try
{
var json = dtoRecord.Payload.ToJson();
dto = JsonSerializer.Deserialize<OracleDto>(json, SerializerOptions);
}
catch (Exception ex)
{
_logger.LogError(ex, "Oracle DTO deserialization failed for document {DocumentId}", documentId);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingMappings.Remove(documentId);
continue;
}
if (dto is null)
{
_logger.LogWarning("Oracle DTO payload deserialized as null for document {DocumentId}", documentId);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingMappings.Remove(documentId);
continue;
}
var mappedAt = _timeProvider.GetUtcNow();
var (advisory, flag) = OracleMapper.Map(dto, document, dtoRecord, SourceName, mappedAt);
await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
await _psirtFlagStore.UpsertAsync(flag, cancellationToken).ConfigureAwait(false);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
pendingMappings.Remove(documentId);
}
var updatedCursor = cursor.WithPendingMappings(pendingMappings);
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
private async Task<OracleCursor> GetCursorAsync(CancellationToken cancellationToken)
{
var record = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
return OracleCursor.FromBson(record?.Cursor);
}
private async Task UpdateCursorAsync(OracleCursor cursor, CancellationToken cancellationToken)
{
var completedAt = _timeProvider.GetUtcNow();
await _stateRepository.UpdateCursorAsync(SourceName, cursor.ToBsonDocument(), completedAt, cancellationToken).ConfigureAwait(false);
}
private async Task<IReadOnlyCollection<Uri>> ResolveAdvisoryUrisAsync(CancellationToken cancellationToken)
{
var uris = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
foreach (var uri in _options.AdvisoryUris)
{
if (uri is not null)
{
uris.Add(uri.AbsoluteUri);
}
}
var calendarUris = await _calendarFetcher.GetAdvisoryUrisAsync(cancellationToken).ConfigureAwait(false);
foreach (var uri in calendarUris)
{
uris.Add(uri.AbsoluteUri);
}
return uris
.Select(static value => new Uri(value, UriKind.Absolute))
.OrderBy(static value => value.AbsoluteUri, StringComparer.OrdinalIgnoreCase)
.ToArray();
}
private static string DeriveAdvisoryId(Uri uri)
{
var segments = uri.Segments;
if (segments.Length == 0)
{
return uri.AbsoluteUri;
}
var slug = segments[^1].Trim('/');
if (string.IsNullOrWhiteSpace(slug))
{
return uri.AbsoluteUri;
}
return slug.Replace('.', '-');
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using StellaOps.Concelier.Connector.Common;
using StellaOps.Concelier.Connector.Common.Fetch;
using StellaOps.Concelier.Connector.Vndr.Oracle.Configuration;
using StellaOps.Concelier.Connector.Vndr.Oracle.Internal;
using StellaOps.Concelier.Storage.Mongo;
using StellaOps.Concelier.Storage.Mongo.Advisories;
using StellaOps.Concelier.Storage.Mongo.Documents;
using StellaOps.Concelier.Storage.Mongo.Dtos;
using StellaOps.Concelier.Storage.Mongo.PsirtFlags;
using StellaOps.Plugin;
namespace StellaOps.Concelier.Connector.Vndr.Oracle;
public sealed class OracleConnector : IFeedConnector
{
private static readonly JsonSerializerOptions SerializerOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
};
private readonly SourceFetchService _fetchService;
private readonly RawDocumentStorage _rawDocumentStorage;
private readonly IDocumentStore _documentStore;
private readonly IDtoStore _dtoStore;
private readonly IAdvisoryStore _advisoryStore;
private readonly IPsirtFlagStore _psirtFlagStore;
private readonly ISourceStateRepository _stateRepository;
private readonly OracleCalendarFetcher _calendarFetcher;
private readonly OracleOptions _options;
private readonly TimeProvider _timeProvider;
private readonly ILogger<OracleConnector> _logger;
public OracleConnector(
SourceFetchService fetchService,
RawDocumentStorage rawDocumentStorage,
IDocumentStore documentStore,
IDtoStore dtoStore,
IAdvisoryStore advisoryStore,
IPsirtFlagStore psirtFlagStore,
ISourceStateRepository stateRepository,
OracleCalendarFetcher calendarFetcher,
IOptions<OracleOptions> options,
TimeProvider? timeProvider,
ILogger<OracleConnector> logger)
{
_fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
_rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
_documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
_dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
_advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
_psirtFlagStore = psirtFlagStore ?? throw new ArgumentNullException(nameof(psirtFlagStore));
_stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
_calendarFetcher = calendarFetcher ?? throw new ArgumentNullException(nameof(calendarFetcher));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
_options.Validate();
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public string SourceName => VndrOracleConnectorPlugin.SourceName;
public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
{
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
var pendingDocuments = cursor.PendingDocuments.ToList();
var pendingMappings = cursor.PendingMappings.ToList();
var fetchCache = new Dictionary<string, OracleFetchCacheEntry>(cursor.FetchCache, StringComparer.OrdinalIgnoreCase);
var touchedResources = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var now = _timeProvider.GetUtcNow();
var advisoryUris = await ResolveAdvisoryUrisAsync(cancellationToken).ConfigureAwait(false);
foreach (var uri in advisoryUris)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
var cacheKey = uri.AbsoluteUri;
touchedResources.Add(cacheKey);
var advisoryId = DeriveAdvisoryId(uri);
var title = advisoryId.Replace('-', ' ');
var published = now;
var metadata = OracleDocumentMetadata.CreateMetadata(advisoryId, title, published);
var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, uri.ToString(), cancellationToken).ConfigureAwait(false);
var request = new SourceFetchRequest(OracleOptions.HttpClientName, SourceName, uri)
{
Metadata = metadata,
ETag = existing?.Etag,
LastModified = existing?.LastModified,
AcceptHeaders = new[] { "text/html", "application/xhtml+xml", "text/plain;q=0.5" },
};
var result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccess || result.Document is null)
{
continue;
}
var cacheEntry = OracleFetchCacheEntry.FromDocument(result.Document);
if (existing is not null
&& string.Equals(existing.Status, DocumentStatuses.Mapped, StringComparison.Ordinal)
&& cursor.TryGetFetchCache(cacheKey, out var cached)
&& cached.Matches(result.Document))
{
_logger.LogDebug("Oracle advisory {AdvisoryId} unchanged; skipping parse/map", advisoryId);
await _documentStore.UpdateStatusAsync(result.Document.Id, existing.Status, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(result.Document.Id);
pendingMappings.Remove(result.Document.Id);
fetchCache[cacheKey] = cacheEntry;
continue;
}
fetchCache[cacheKey] = cacheEntry;
if (!pendingDocuments.Contains(result.Document.Id))
{
pendingDocuments.Add(result.Document.Id);
}
if (_options.RequestDelay > TimeSpan.Zero)
{
await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Oracle fetch failed for {Uri}", uri);
await _stateRepository.MarkFailureAsync(SourceName, _timeProvider.GetUtcNow(), TimeSpan.FromMinutes(10), ex.Message, cancellationToken).ConfigureAwait(false);
throw;
}
}
if (fetchCache.Count > 0 && touchedResources.Count > 0)
{
var stale = fetchCache.Keys.Where(key => !touchedResources.Contains(key)).ToArray();
foreach (var key in stale)
{
fetchCache.Remove(key);
}
}
var updatedCursor = cursor
.WithPendingDocuments(pendingDocuments)
.WithPendingMappings(pendingMappings)
.WithFetchCache(fetchCache)
.WithLastProcessed(now);
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
{
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
if (cursor.PendingDocuments.Count == 0)
{
return;
}
var pendingDocuments = cursor.PendingDocuments.ToList();
var pendingMappings = cursor.PendingMappings.ToList();
foreach (var documentId in cursor.PendingDocuments)
{
cancellationToken.ThrowIfCancellationRequested();
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
if (document is null)
{
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
if (!document.PayloadId.HasValue)
{
_logger.LogWarning("Oracle document {DocumentId} missing GridFS payload", document.Id);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
OracleDto dto;
try
{
var metadata = OracleDocumentMetadata.FromDocument(document);
var content = await _rawDocumentStorage.DownloadAsync(document.PayloadId.Value, cancellationToken).ConfigureAwait(false);
var html = System.Text.Encoding.UTF8.GetString(content);
dto = OracleParser.Parse(html, metadata);
}
catch (Exception ex)
{
_logger.LogError(ex, "Oracle parse failed for document {DocumentId}", document.Id);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
if (!OracleDtoValidator.TryNormalize(dto, out var normalized, out var validationError))
{
_logger.LogWarning("Oracle validation failed for document {DocumentId}: {Reason}", document.Id, validationError ?? "unknown");
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
pendingMappings.Remove(documentId);
continue;
}
dto = normalized;
var json = JsonSerializer.Serialize(dto, SerializerOptions);
var payload = BsonDocument.Parse(json);
var validatedAt = _timeProvider.GetUtcNow();
var existingDto = await _dtoStore.FindByDocumentIdAsync(document.Id, cancellationToken).ConfigureAwait(false);
var dtoRecord = existingDto is null
? new DtoRecord(Guid.NewGuid(), document.Id, SourceName, "oracle.advisory.v1", payload, validatedAt)
: existingDto with
{
Payload = payload,
SchemaVersion = "oracle.advisory.v1",
ValidatedAt = validatedAt,
};
await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
pendingDocuments.Remove(documentId);
if (!pendingMappings.Contains(documentId))
{
pendingMappings.Add(documentId);
}
}
var updatedCursor = cursor
.WithPendingDocuments(pendingDocuments)
.WithPendingMappings(pendingMappings);
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
{
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
if (cursor.PendingMappings.Count == 0)
{
return;
}
var pendingMappings = cursor.PendingMappings.ToList();
foreach (var documentId in cursor.PendingMappings)
{
cancellationToken.ThrowIfCancellationRequested();
var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
if (dtoRecord is null || document is null)
{
pendingMappings.Remove(documentId);
continue;
}
OracleDto? dto;
try
{
var json = dtoRecord.Payload.ToJson();
dto = JsonSerializer.Deserialize<OracleDto>(json, SerializerOptions);
}
catch (Exception ex)
{
_logger.LogError(ex, "Oracle DTO deserialization failed for document {DocumentId}", documentId);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingMappings.Remove(documentId);
continue;
}
if (dto is null)
{
_logger.LogWarning("Oracle DTO payload deserialized as null for document {DocumentId}", documentId);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
pendingMappings.Remove(documentId);
continue;
}
var mappedAt = _timeProvider.GetUtcNow();
var (advisory, flag) = OracleMapper.Map(dto, document, dtoRecord, SourceName, mappedAt);
await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
await _psirtFlagStore.UpsertAsync(flag, cancellationToken).ConfigureAwait(false);
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
pendingMappings.Remove(documentId);
}
var updatedCursor = cursor.WithPendingMappings(pendingMappings);
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
private async Task<OracleCursor> GetCursorAsync(CancellationToken cancellationToken)
{
var record = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
return OracleCursor.FromBson(record?.Cursor);
}
private async Task UpdateCursorAsync(OracleCursor cursor, CancellationToken cancellationToken)
{
var completedAt = _timeProvider.GetUtcNow();
await _stateRepository.UpdateCursorAsync(SourceName, cursor.ToBsonDocument(), completedAt, cancellationToken).ConfigureAwait(false);
}
private async Task<IReadOnlyCollection<Uri>> ResolveAdvisoryUrisAsync(CancellationToken cancellationToken)
{
var uris = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
foreach (var uri in _options.AdvisoryUris)
{
if (uri is not null)
{
uris.Add(uri.AbsoluteUri);
}
}
var calendarUris = await _calendarFetcher.GetAdvisoryUrisAsync(cancellationToken).ConfigureAwait(false);
foreach (var uri in calendarUris)
{
uris.Add(uri.AbsoluteUri);
}
return uris
.Select(static value => new Uri(value, UriKind.Absolute))
.OrderBy(static value => value.AbsoluteUri, StringComparer.OrdinalIgnoreCase)
.ToArray();
}
private static string DeriveAdvisoryId(Uri uri)
{
var segments = uri.Segments;
if (segments.Length == 0)
{
return uri.AbsoluteUri;
}
var slug = segments[^1].Trim('/');
if (string.IsNullOrWhiteSpace(slug))
{
return uri.AbsoluteUri;
}
return slug.Replace('.', '-');
}
}