up
This commit is contained in:
@@ -1,293 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text.Json;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using MongoDB.Bson;
|
|
||||||
using StellaOps.Feedser.Source.Common;
|
|
||||||
using StellaOps.Feedser.Source.Common.Fetch;
|
|
||||||
using StellaOps.Feedser.Source.Vndr.Oracle.Configuration;
|
|
||||||
using StellaOps.Feedser.Source.Vndr.Oracle.Internal;
|
|
||||||
using StellaOps.Feedser.Storage.Mongo;
|
|
||||||
using StellaOps.Feedser.Storage.Mongo.Advisories;
|
|
||||||
using StellaOps.Feedser.Storage.Mongo.Documents;
|
|
||||||
using StellaOps.Feedser.Storage.Mongo.Dtos;
|
|
||||||
using StellaOps.Feedser.Storage.Mongo.PsirtFlags;
|
|
||||||
using StellaOps.Plugin;
|
|
||||||
|
|
||||||
namespace StellaOps.Feedser.Source.Vndr.Oracle;
|
|
||||||
|
|
||||||
public sealed class OracleConnector : IFeedConnector
|
|
||||||
{
|
|
||||||
private static readonly JsonSerializerOptions SerializerOptions = new()
|
|
||||||
{
|
|
||||||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
|
|
||||||
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull,
|
|
||||||
};
|
|
||||||
|
|
||||||
private readonly SourceFetchService _fetchService;
|
|
||||||
private readonly RawDocumentStorage _rawDocumentStorage;
|
|
||||||
private readonly IDocumentStore _documentStore;
|
|
||||||
private readonly IDtoStore _dtoStore;
|
|
||||||
private readonly IAdvisoryStore _advisoryStore;
|
|
||||||
private readonly IPsirtFlagStore _psirtFlagStore;
|
|
||||||
private readonly ISourceStateRepository _stateRepository;
|
|
||||||
private readonly OracleOptions _options;
|
|
||||||
private readonly TimeProvider _timeProvider;
|
|
||||||
private readonly ILogger<OracleConnector> _logger;
|
|
||||||
|
|
||||||
public OracleConnector(
|
|
||||||
SourceFetchService fetchService,
|
|
||||||
RawDocumentStorage rawDocumentStorage,
|
|
||||||
IDocumentStore documentStore,
|
|
||||||
IDtoStore dtoStore,
|
|
||||||
IAdvisoryStore advisoryStore,
|
|
||||||
IPsirtFlagStore psirtFlagStore,
|
|
||||||
ISourceStateRepository stateRepository,
|
|
||||||
IOptions<OracleOptions> options,
|
|
||||||
TimeProvider? timeProvider,
|
|
||||||
ILogger<OracleConnector> logger)
|
|
||||||
{
|
|
||||||
_fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
|
|
||||||
_rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
|
|
||||||
_documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
|
|
||||||
_dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
|
|
||||||
_advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
|
|
||||||
_psirtFlagStore = psirtFlagStore ?? throw new ArgumentNullException(nameof(psirtFlagStore));
|
|
||||||
_stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
|
|
||||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
|
|
||||||
_options.Validate();
|
|
||||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
|
||||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
||||||
}
|
|
||||||
|
|
||||||
public string SourceName => VndrOracleConnectorPlugin.SourceName;
|
|
||||||
|
|
||||||
public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
|
|
||||||
var pendingDocuments = cursor.PendingDocuments.ToList();
|
|
||||||
var pendingMappings = cursor.PendingMappings.ToList();
|
|
||||||
var now = _timeProvider.GetUtcNow();
|
|
||||||
|
|
||||||
foreach (var uri in _options.AdvisoryUris)
|
|
||||||
{
|
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var advisoryId = DeriveAdvisoryId(uri);
|
|
||||||
var title = advisoryId.Replace('-', ' ');
|
|
||||||
var published = now;
|
|
||||||
|
|
||||||
var metadata = OracleDocumentMetadata.CreateMetadata(advisoryId, title, published);
|
|
||||||
var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, uri.ToString(), cancellationToken).ConfigureAwait(false);
|
|
||||||
|
|
||||||
var request = new SourceFetchRequest(OracleOptions.HttpClientName, SourceName, uri)
|
|
||||||
{
|
|
||||||
Metadata = metadata,
|
|
||||||
ETag = existing?.Etag,
|
|
||||||
LastModified = existing?.LastModified,
|
|
||||||
AcceptHeaders = new[] { "text/html", "application/xhtml+xml", "text/plain;q=0.5" },
|
|
||||||
};
|
|
||||||
|
|
||||||
var result = await _fetchService.FetchAsync(request, cancellationToken).ConfigureAwait(false);
|
|
||||||
if (!result.IsSuccess || result.Document is null)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!pendingDocuments.Contains(result.Document.Id))
|
|
||||||
{
|
|
||||||
pendingDocuments.Add(result.Document.Id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_options.RequestDelay > TimeSpan.Zero)
|
|
||||||
{
|
|
||||||
await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Oracle fetch failed for {Uri}", uri);
|
|
||||||
await _stateRepository.MarkFailureAsync(SourceName, _timeProvider.GetUtcNow(), TimeSpan.FromMinutes(10), ex.Message, cancellationToken).ConfigureAwait(false);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var updatedCursor = cursor
|
|
||||||
.WithPendingDocuments(pendingDocuments)
|
|
||||||
.WithPendingMappings(pendingMappings)
|
|
||||||
.WithLastProcessed(now);
|
|
||||||
|
|
||||||
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
|
|
||||||
if (cursor.PendingDocuments.Count == 0)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var pendingDocuments = cursor.PendingDocuments.ToList();
|
|
||||||
var pendingMappings = cursor.PendingMappings.ToList();
|
|
||||||
|
|
||||||
foreach (var documentId in cursor.PendingDocuments)
|
|
||||||
{
|
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
|
||||||
|
|
||||||
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
|
|
||||||
if (document is null)
|
|
||||||
{
|
|
||||||
pendingDocuments.Remove(documentId);
|
|
||||||
pendingMappings.Remove(documentId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!document.GridFsId.HasValue)
|
|
||||||
{
|
|
||||||
_logger.LogWarning("Oracle document {DocumentId} missing GridFS payload", document.Id);
|
|
||||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
|
|
||||||
pendingDocuments.Remove(documentId);
|
|
||||||
pendingMappings.Remove(documentId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
OracleDto dto;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var metadata = OracleDocumentMetadata.FromDocument(document);
|
|
||||||
var content = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
|
|
||||||
var html = System.Text.Encoding.UTF8.GetString(content);
|
|
||||||
dto = OracleParser.Parse(html, metadata);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Oracle parse failed for document {DocumentId}", document.Id);
|
|
||||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
|
|
||||||
pendingDocuments.Remove(documentId);
|
|
||||||
pendingMappings.Remove(documentId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var json = JsonSerializer.Serialize(dto, SerializerOptions);
|
|
||||||
var payload = BsonDocument.Parse(json);
|
|
||||||
var validatedAt = _timeProvider.GetUtcNow();
|
|
||||||
|
|
||||||
var existingDto = await _dtoStore.FindByDocumentIdAsync(document.Id, cancellationToken).ConfigureAwait(false);
|
|
||||||
var dtoRecord = existingDto is null
|
|
||||||
? new DtoRecord(Guid.NewGuid(), document.Id, SourceName, "oracle.advisory.v1", payload, validatedAt)
|
|
||||||
: existingDto with
|
|
||||||
{
|
|
||||||
Payload = payload,
|
|
||||||
SchemaVersion = "oracle.advisory.v1",
|
|
||||||
ValidatedAt = validatedAt,
|
|
||||||
};
|
|
||||||
|
|
||||||
await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
|
|
||||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
|
|
||||||
|
|
||||||
pendingDocuments.Remove(documentId);
|
|
||||||
if (!pendingMappings.Contains(documentId))
|
|
||||||
{
|
|
||||||
pendingMappings.Add(documentId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var updatedCursor = cursor
|
|
||||||
.WithPendingDocuments(pendingDocuments)
|
|
||||||
.WithPendingMappings(pendingMappings);
|
|
||||||
|
|
||||||
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
|
|
||||||
if (cursor.PendingMappings.Count == 0)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var pendingMappings = cursor.PendingMappings.ToList();
|
|
||||||
|
|
||||||
foreach (var documentId in cursor.PendingMappings)
|
|
||||||
{
|
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
|
||||||
|
|
||||||
var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
|
|
||||||
var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
|
|
||||||
|
|
||||||
if (dtoRecord is null || document is null)
|
|
||||||
{
|
|
||||||
pendingMappings.Remove(documentId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
OracleDto? dto;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var json = dtoRecord.Payload.ToJson();
|
|
||||||
dto = JsonSerializer.Deserialize<OracleDto>(json, SerializerOptions);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Oracle DTO deserialization failed for document {DocumentId}", documentId);
|
|
||||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
|
|
||||||
pendingMappings.Remove(documentId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dto is null)
|
|
||||||
{
|
|
||||||
_logger.LogWarning("Oracle DTO payload deserialized as null for document {DocumentId}", documentId);
|
|
||||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
|
|
||||||
pendingMappings.Remove(documentId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
var mappedAt = _timeProvider.GetUtcNow();
|
|
||||||
var (advisory, flag) = OracleMapper.Map(dto, SourceName, mappedAt);
|
|
||||||
await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
|
|
||||||
await _psirtFlagStore.UpsertAsync(flag, cancellationToken).ConfigureAwait(false);
|
|
||||||
await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
|
|
||||||
|
|
||||||
pendingMappings.Remove(documentId);
|
|
||||||
}
|
|
||||||
|
|
||||||
var updatedCursor = cursor.WithPendingMappings(pendingMappings);
|
|
||||||
await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<OracleCursor> GetCursorAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var record = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
|
|
||||||
return OracleCursor.FromBson(record?.Cursor);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task UpdateCursorAsync(OracleCursor cursor, CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var completedAt = _timeProvider.GetUtcNow();
|
|
||||||
await _stateRepository.UpdateCursorAsync(SourceName, cursor.ToBsonDocument(), completedAt, cancellationToken).ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static string DeriveAdvisoryId(Uri uri)
|
|
||||||
{
|
|
||||||
var segments = uri.Segments;
|
|
||||||
if (segments.Length == 0)
|
|
||||||
{
|
|
||||||
return uri.AbsoluteUri;
|
|
||||||
}
|
|
||||||
|
|
||||||
var slug = segments[^1].Trim('/');
|
|
||||||
if (string.IsNullOrWhiteSpace(slug))
|
|
||||||
{
|
|
||||||
return uri.AbsoluteUri;
|
|
||||||
}
|
|
||||||
|
|
||||||
return slug.Replace('.', '-');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
using System;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using StellaOps.Plugin;
|
|
||||||
|
|
||||||
namespace StellaOps.Feedser.Source.Vndr.Oracle;
|
|
||||||
|
|
||||||
public sealed class VndrOracleConnectorPlugin : IConnectorPlugin
|
|
||||||
{
|
|
||||||
public const string SourceName = "vndr-oracle";
|
|
||||||
|
|
||||||
public string Name => SourceName;
|
|
||||||
|
|
||||||
public bool IsAvailable(IServiceProvider services)
|
|
||||||
=> services.GetService<OracleConnector>() is not null;
|
|
||||||
|
|
||||||
public IFeedConnector Create(IServiceProvider services)
|
|
||||||
{
|
|
||||||
ArgumentNullException.ThrowIfNull(services);
|
|
||||||
return services.GetRequiredService<OracleConnector>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -20,4 +20,5 @@
|
|||||||
|Author ops guidance for resilience tuning|Docs/Feedser|Plumb Authority client resilience options|**TODO** – Extend operator/quickstart docs with recommended retry profiles, offline-tolerance guidance, and monitoring cues.|
|
|Author ops guidance for resilience tuning|Docs/Feedser|Plumb Authority client resilience options|**TODO** – Extend operator/quickstart docs with recommended retry profiles, offline-tolerance guidance, and monitoring cues.|
|
||||||
|Document authority bypass logging patterns|Docs/Feedser|FSR3 logging|**TODO** – Capture new audit log fields (bypass, remote IP, subject) in operator docs and add troubleshooting guidance for cron bypasses.|
|
|Document authority bypass logging patterns|Docs/Feedser|FSR3 logging|**TODO** – Capture new audit log fields (bypass, remote IP, subject) in operator docs and add troubleshooting guidance for cron bypasses.|
|
||||||
|Update Feedser operator guide for enforcement cutoff|Docs/Feedser|FSR1 rollout|**TODO** – Add `allowAnonymousFallback` sunset timeline and checklist to operator guide / runbooks before 2025-12-31 enforcement.|
|
|Update Feedser operator guide for enforcement cutoff|Docs/Feedser|FSR1 rollout|**TODO** – Add `allowAnonymousFallback` sunset timeline and checklist to operator guide / runbooks before 2025-12-31 enforcement.|
|
||||||
|
|Rename plugin drop directory to namespaced path|BE-Base|Plugins|**TODO** – Point Feedser source/exporter build outputs to `StellaOps.Feedser.PluginBinaries`, update PluginHost defaults/search patterns to match, ensure Offline Kit packaging/tests expect the new folder, and document migration guidance for operators.|
|
||||||
|Authority resilience adoption|Feedser WebService, Docs|Plumb Authority client resilience options|**BLOCKED (2025-10-10)** – Roll out retry/offline knobs to deployment docs and confirm CLI parity once LIB5 lands; unblock after resilience options wired and tested.|
|
|Authority resilience adoption|Feedser WebService, Docs|Plumb Authority client resilience options|**BLOCKED (2025-10-10)** – Roll out retry/offline knobs to deployment docs and confirm CLI parity once LIB5 lands; unblock after resilience options wired and tested.|
|
||||||
|
|||||||
Reference in New Issue
Block a user