Initial commit (history squashed)

2025-10-07 10:14:21 +03:00
committed by Vladimir Moushkov
commit 6cbfd47ecd
621 changed files with 54480 additions and 0 deletions

View File

@@ -0,0 +1,29 @@
# AGENTS
## Role
Canonical persistence for raw documents, DTOs, canonical advisories, jobs, and state. Provides repositories and bootstrapper for collections/indexes.
## Scope
- Collections (MongoStorageDefaults): source, source_state, document, dto, advisory, alias, affected, reference, kev_flag, ru_flags, jp_flags, psirt_flags, merge_event, export_state, locks, jobs; GridFS bucket fs.documents; field names include ttlAt (locks), sourceName, uri, advisoryKey.
- Records: SourceState (cursor, lastSuccess/error, failCount, backoffUntil), JobRun, MergeEvent, ExportState, Advisory documents mirroring Models with embedded arrays when practical.
- Bootstrapper: create collections, indexes (unique advisoryKey, scheme/value, platform/name, published, modified), TTL on locks, and validate connectivity for /ready health probes.
- Job store: create, read, mark completed/failed; compute durations; recent/last queries; active by status.
- Advisory store: CRUD for canonical advisories; query by key/alias and list for exporters with deterministic paging.
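The bootstrapper duties above can be sketched as follows. The collection and field names (`advisory`, `advisoryKey`, `locks`, `ttlAt`) come from this document; the helper class itself is an illustrative assumption, not the shipped bootstrapper:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

// Sketch only: the unique advisoryKey index plus the TTL index on locks.ttlAt.
internal static class BootstrapSketch
{
    public static async Task EnsureIndexesAsync(IMongoDatabase database, CancellationToken ct)
    {
        var advisories = database.GetCollection<BsonDocument>("advisory");
        await advisories.Indexes.CreateOneAsync(
            new CreateIndexModel<BsonDocument>(
                Builders<BsonDocument>.IndexKeys.Ascending("advisoryKey"),
                new CreateIndexOptions { Unique = true }),
            cancellationToken: ct);

        var locks = database.GetCollection<BsonDocument>("locks");
        await locks.Indexes.CreateOneAsync(
            new CreateIndexModel<BsonDocument>(
                Builders<BsonDocument>.IndexKeys.Ascending("ttlAt"),
                // ExpireAfter = TimeSpan.Zero lets Mongo evict a lock document
                // as soon as its ttlAt timestamp passes.
                new CreateIndexOptions { ExpireAfter = TimeSpan.Zero }),
            cancellationToken: ct);
    }
}
```

The same pattern extends to the remaining indexes (scheme/value, platform/name, published, modified) listed in the Bootstrapper bullet.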
## Participants
- Core jobs read/write runs and leases; WebService /ready pings database; /jobs APIs query runs/definitions.
- Source connectors store raw docs, DTOs, and mapped canonical advisories with provenance; update SourceState cursor/backoff.
- Exporters read advisories and write export_state.
## Interfaces & contracts
- IMongoDatabase injected; MongoUrl from options; database name from options or MongoUrl or default "feedser".
- Repositories expose async methods with CancellationToken; deterministic sorting.
- All date/time values stored as UTC; identifiers normalized.
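The database-name precedence above (options, then the connection string, then the default) can be sketched as — the helper name is hypothetical, only `MongoUrl.DatabaseName` is real driver API:

```csharp
using MongoDB.Driver;

internal static class MongoNaming
{
    // Sketch: explicit options value wins, then the database segment of the
    // MongoUrl, then the documented default "feedser".
    public static string ResolveDatabaseName(string? optionsDatabaseName, MongoUrl url)
    {
        if (!string.IsNullOrWhiteSpace(optionsDatabaseName))
        {
            return optionsDatabaseName;
        }

        return string.IsNullOrWhiteSpace(url.DatabaseName) ? "feedser" : url.DatabaseName;
    }
}
```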
## In/Out of scope
In: persistence, bootstrap, indexes, basic query helpers.
Out: business mapping logic, HTTP, packaging.
## Observability & security expectations
- Log collection/index creation; warn on existing mismatches.
- Timeouts and retry policies; avoid unbounded scans; page reads.
- Do not log DSNs with credentials; redact in diagnostics.
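A minimal sketch of the redaction rule, assuming redaction happens before any diagnostic sink sees the DSN (the helper name is hypothetical; `MongoUrlBuilder` and its `Password` property are real driver API):

```csharp
using MongoDB.Driver;

internal static class DsnRedaction
{
    // Sketch: strip the password before a connection string reaches logs.
    public static string Redact(string connectionString)
    {
        var builder = new MongoUrlBuilder(connectionString);
        if (!string.IsNullOrEmpty(builder.Password))
        {
            builder.Password = "***";
        }
        return builder.ToMongoUrl().ToString();
    }
}
```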
## Tests
- Author and review coverage in `../StellaOps.Feedser.Storage.Mongo.Tests`.
- Shared fixtures (e.g., `MongoIntegrationFixture`, `ConnectorTestHarness`) live in `../StellaOps.Feedser.Testing`.
- Keep fixtures deterministic; match new cases to real-world advisories or regression scenarios.


@@ -0,0 +1,27 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.Advisories;
[BsonIgnoreExtraElements]
public sealed class AdvisoryDocument
{
[BsonId]
public string Id { get; set; } = string.Empty;
[BsonElement("advisoryKey")]
public string AdvisoryKey
{
get => Id;
set => Id = value;
}
[BsonElement("payload")]
public BsonDocument Payload { get; set; } = new();
[BsonElement("modified")]
public DateTime Modified { get; set; }
[BsonElement("published")]
public DateTime? Published { get; set; }
}


@@ -0,0 +1,245 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Feedser.Models;
namespace StellaOps.Feedser.Storage.Mongo.Advisories;
public sealed class AdvisoryStore : IAdvisoryStore
{
private readonly IMongoCollection<AdvisoryDocument> _collection;
private readonly ILogger<AdvisoryStore> _logger;
public AdvisoryStore(IMongoDatabase database, ILogger<AdvisoryStore> logger)
{
_collection = (database ?? throw new ArgumentNullException(nameof(database)))
.GetCollection<AdvisoryDocument>(MongoStorageDefaults.Collections.Advisory);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task UpsertAsync(Advisory advisory, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(advisory);
var payload = CanonicalJsonSerializer.Serialize(advisory);
var document = new AdvisoryDocument
{
AdvisoryKey = advisory.AdvisoryKey,
Payload = BsonDocument.Parse(payload),
Modified = advisory.Modified?.UtcDateTime ?? DateTime.UtcNow,
Published = advisory.Published?.UtcDateTime,
};
var options = new ReplaceOptions { IsUpsert = true };
await _collection.ReplaceOneAsync(x => x.AdvisoryKey == advisory.AdvisoryKey, document, options, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Upserted advisory {AdvisoryKey}", advisory.AdvisoryKey);
}
public async Task<Advisory?> FindAsync(string advisoryKey, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(advisoryKey);
var document = await _collection.Find(x => x.AdvisoryKey == advisoryKey)
.FirstOrDefaultAsync(cancellationToken)
.ConfigureAwait(false);
return document is null ? null : Deserialize(document.Payload);
}
public async Task<IReadOnlyList<Advisory>> GetRecentAsync(int limit, CancellationToken cancellationToken)
{
if (limit <= 0)
{
throw new ArgumentOutOfRangeException(nameof(limit), limit, "Limit must be positive.");
}
var documents = await _collection.Find(FilterDefinition<AdvisoryDocument>.Empty)
.SortByDescending(x => x.Modified)
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return documents.Select(static doc => Deserialize(doc.Payload)).ToArray();
}
public async IAsyncEnumerable<Advisory> StreamAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
var options = new FindOptions<AdvisoryDocument>
{
Sort = Builders<AdvisoryDocument>.Sort.Ascending(static doc => doc.AdvisoryKey),
};
using var cursor = await _collection.FindAsync(
FilterDefinition<AdvisoryDocument>.Empty,
options,
cancellationToken)
.ConfigureAwait(false);
while (await cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false))
{
foreach (var document in cursor.Current)
{
cancellationToken.ThrowIfCancellationRequested();
yield return Deserialize(document.Payload);
}
}
}
private static Advisory Deserialize(BsonDocument payload)
{
ArgumentNullException.ThrowIfNull(payload);
var advisoryKey = payload.GetValue("advisoryKey", defaultValue: null)?.AsString
?? throw new InvalidOperationException("advisoryKey missing from payload.");
var title = payload.GetValue("title", defaultValue: null)?.AsString ?? advisoryKey;
string? summary = payload.TryGetValue("summary", out var summaryValue) && summaryValue.IsString ? summaryValue.AsString : null;
string? language = payload.TryGetValue("language", out var languageValue) && languageValue.IsString ? languageValue.AsString : null;
DateTimeOffset? published = TryReadDateTime(payload, "published");
DateTimeOffset? modified = TryReadDateTime(payload, "modified");
string? severity = payload.TryGetValue("severity", out var severityValue) && severityValue.IsString ? severityValue.AsString : null;
var exploitKnown = payload.TryGetValue("exploitKnown", out var exploitValue) && exploitValue.IsBoolean && exploitValue.AsBoolean;
var aliases = payload.TryGetValue("aliases", out var aliasValue) && aliasValue is BsonArray aliasArray
? aliasArray.OfType<BsonValue>().Where(static x => x.IsString).Select(static x => x.AsString)
: Array.Empty<string>();
var references = payload.TryGetValue("references", out var referencesValue) && referencesValue is BsonArray referencesArray
? referencesArray.OfType<BsonDocument>().Select(DeserializeReference).ToArray()
: Array.Empty<AdvisoryReference>();
var affectedPackages = payload.TryGetValue("affectedPackages", out var affectedValue) && affectedValue is BsonArray affectedArray
? affectedArray.OfType<BsonDocument>().Select(DeserializeAffectedPackage).ToArray()
: Array.Empty<AffectedPackage>();
var cvssMetrics = payload.TryGetValue("cvssMetrics", out var cvssValue) && cvssValue is BsonArray cvssArray
? cvssArray.OfType<BsonDocument>().Select(DeserializeCvssMetric).ToArray()
: Array.Empty<CvssMetric>();
var provenance = payload.TryGetValue("provenance", out var provenanceValue) && provenanceValue is BsonArray provenanceArray
? provenanceArray.OfType<BsonDocument>().Select(DeserializeProvenance).ToArray()
: Array.Empty<AdvisoryProvenance>();
return new Advisory(
advisoryKey,
title,
summary,
language,
published,
modified,
severity,
exploitKnown,
aliases,
references,
affectedPackages,
cvssMetrics,
provenance);
}
private static AdvisoryReference DeserializeReference(BsonDocument document)
{
var url = document.GetValue("url", defaultValue: null)?.AsString
?? throw new InvalidOperationException("reference.url missing from payload.");
string? kind = document.TryGetValue("kind", out var kindValue) && kindValue.IsString ? kindValue.AsString : null;
string? sourceTag = document.TryGetValue("sourceTag", out var sourceTagValue) && sourceTagValue.IsString ? sourceTagValue.AsString : null;
string? summary = document.TryGetValue("summary", out var summaryValue) && summaryValue.IsString ? summaryValue.AsString : null;
var provenance = document.TryGetValue("provenance", out var provenanceValue) && provenanceValue.IsBsonDocument
? DeserializeProvenance(provenanceValue.AsBsonDocument)
: AdvisoryProvenance.Empty;
return new AdvisoryReference(url, kind, sourceTag, summary, provenance);
}
private static AffectedPackage DeserializeAffectedPackage(BsonDocument document)
{
var type = document.GetValue("type", defaultValue: null)?.AsString
?? throw new InvalidOperationException("affectedPackages.type missing from payload.");
var identifier = document.GetValue("identifier", defaultValue: null)?.AsString
?? throw new InvalidOperationException("affectedPackages.identifier missing from payload.");
string? platform = document.TryGetValue("platform", out var platformValue) && platformValue.IsString ? platformValue.AsString : null;
var versionRanges = document.TryGetValue("versionRanges", out var rangesValue) && rangesValue is BsonArray rangesArray
? rangesArray.OfType<BsonDocument>().Select(DeserializeVersionRange).ToArray()
: Array.Empty<AffectedVersionRange>();
var statuses = document.TryGetValue("statuses", out var statusesValue) && statusesValue is BsonArray statusesArray
? statusesArray.OfType<BsonDocument>().Select(DeserializeStatus).ToArray()
: Array.Empty<AffectedPackageStatus>();
var provenance = document.TryGetValue("provenance", out var provenanceValue) && provenanceValue is BsonArray provenanceArray
? provenanceArray.OfType<BsonDocument>().Select(DeserializeProvenance).ToArray()
: Array.Empty<AdvisoryProvenance>();
return new AffectedPackage(type, identifier, platform, versionRanges, statuses, provenance);
}
private static AffectedVersionRange DeserializeVersionRange(BsonDocument document)
{
var rangeKind = document.GetValue("rangeKind", defaultValue: null)?.AsString
?? throw new InvalidOperationException("versionRanges.rangeKind missing from payload.");
string? introducedVersion = document.TryGetValue("introducedVersion", out var introducedValue) && introducedValue.IsString ? introducedValue.AsString : null;
string? fixedVersion = document.TryGetValue("fixedVersion", out var fixedValue) && fixedValue.IsString ? fixedValue.AsString : null;
string? lastAffectedVersion = document.TryGetValue("lastAffectedVersion", out var lastValue) && lastValue.IsString ? lastValue.AsString : null;
string? rangeExpression = document.TryGetValue("rangeExpression", out var expressionValue) && expressionValue.IsString ? expressionValue.AsString : null;
var provenance = document.TryGetValue("provenance", out var provenanceValue) && provenanceValue.IsBsonDocument
? DeserializeProvenance(provenanceValue.AsBsonDocument)
: AdvisoryProvenance.Empty;
return new AffectedVersionRange(rangeKind, introducedVersion, fixedVersion, lastAffectedVersion, rangeExpression, provenance);
}
private static AffectedPackageStatus DeserializeStatus(BsonDocument document)
{
var status = document.GetValue("status", defaultValue: null)?.AsString
?? throw new InvalidOperationException("statuses.status missing from payload.");
var provenance = document.TryGetValue("provenance", out var provenanceValue) && provenanceValue.IsBsonDocument
? DeserializeProvenance(provenanceValue.AsBsonDocument)
: AdvisoryProvenance.Empty;
return new AffectedPackageStatus(status, provenance);
}
private static CvssMetric DeserializeCvssMetric(BsonDocument document)
{
var version = document.GetValue("version", defaultValue: null)?.AsString
?? throw new InvalidOperationException("cvssMetrics.version missing from payload.");
var vector = document.GetValue("vector", defaultValue: null)?.AsString
?? throw new InvalidOperationException("cvssMetrics.vector missing from payload.");
var baseScore = document.TryGetValue("baseScore", out var scoreValue) && scoreValue.IsNumeric ? scoreValue.ToDouble() : 0d;
var baseSeverity = document.GetValue("baseSeverity", defaultValue: null)?.AsString
?? throw new InvalidOperationException("cvssMetrics.baseSeverity missing from payload.");
var provenance = document.TryGetValue("provenance", out var provenanceValue) && provenanceValue.IsBsonDocument
? DeserializeProvenance(provenanceValue.AsBsonDocument)
: AdvisoryProvenance.Empty;
return new CvssMetric(version, vector, baseScore, baseSeverity, provenance);
}
private static AdvisoryProvenance DeserializeProvenance(BsonDocument document)
{
var source = document.GetValue("source", defaultValue: null)?.AsString
?? throw new InvalidOperationException("provenance.source missing from payload.");
var kind = document.GetValue("kind", defaultValue: null)?.AsString
?? throw new InvalidOperationException("provenance.kind missing from payload.");
string? value = document.TryGetValue("value", out var valueElement) && valueElement.IsString ? valueElement.AsString : null;
var recordedAt = TryConvertDateTime(document.GetValue("recordedAt", defaultValue: null));
return new AdvisoryProvenance(source, kind, value ?? string.Empty, recordedAt ?? DateTimeOffset.UtcNow);
}
private static DateTimeOffset? TryReadDateTime(BsonDocument document, string field)
=> document.TryGetValue(field, out var value) ? TryConvertDateTime(value) : null;
private static DateTimeOffset? TryConvertDateTime(BsonValue? value)
{
if (value is null)
{
return null;
}
return value switch
{
BsonDateTime dateTime => DateTime.SpecifyKind(dateTime.ToUniversalTime(), DateTimeKind.Utc),
BsonString stringValue when DateTimeOffset.TryParse(stringValue.AsString, out var parsed) => parsed.ToUniversalTime(),
_ => null,
};
}
}


@@ -0,0 +1,14 @@
using StellaOps.Feedser.Models;
namespace StellaOps.Feedser.Storage.Mongo.Advisories;
public interface IAdvisoryStore
{
Task UpsertAsync(Advisory advisory, CancellationToken cancellationToken);
Task<Advisory?> FindAsync(string advisoryKey, CancellationToken cancellationToken);
Task<IReadOnlyList<Advisory>> GetRecentAsync(int limit, CancellationToken cancellationToken);
IAsyncEnumerable<Advisory> StreamAsync(CancellationToken cancellationToken);
}
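As a usage note, an exporter-style consumer of `StreamAsync` (a hypothetical caller, not part of this commit) might look like:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Feedser.Storage.Mongo.Advisories;

internal static class ExportSketch
{
    public static async Task RunAsync(IAdvisoryStore store, CancellationToken ct)
    {
        // StreamAsync yields advisories in ascending advisoryKey order (see
        // AdvisoryStore), which keeps export output deterministic across runs.
        await foreach (var advisory in store.StreamAsync(ct))
        {
            Console.WriteLine(advisory.AdvisoryKey);
        }
    }
}
```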


@@ -0,0 +1,43 @@
using System;
using System.Collections.Generic;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.ChangeHistory;
[BsonIgnoreExtraElements]
public sealed class ChangeHistoryDocument
{
[BsonId]
public Guid Id { get; set; }
[BsonElement("source")]
public string SourceName { get; set; } = string.Empty;
[BsonElement("advisoryKey")]
public string AdvisoryKey { get; set; } = string.Empty;
[BsonElement("documentId")]
public Guid DocumentId { get; set; }
[BsonElement("documentSha256")]
public string DocumentSha256 { get; set; } = string.Empty;
[BsonElement("currentHash")]
public string CurrentHash { get; set; } = string.Empty;
[BsonElement("previousHash")]
public string? PreviousHash { get; set; }
[BsonElement("currentSnapshot")]
public string CurrentSnapshot { get; set; } = string.Empty;
[BsonElement("previousSnapshot")]
public string? PreviousSnapshot { get; set; }
[BsonElement("changes")]
public List<BsonDocument> Changes { get; set; } = new();
[BsonElement("capturedAt")]
public DateTime CapturedAt { get; set; }
}


@@ -0,0 +1,70 @@
using System;
using System.Collections.Generic;
using MongoDB.Bson;
namespace StellaOps.Feedser.Storage.Mongo.ChangeHistory;
internal static class ChangeHistoryDocumentExtensions
{
public static ChangeHistoryDocument ToDocument(this ChangeHistoryRecord record)
{
var changes = new List<BsonDocument>(record.Changes.Count);
foreach (var change in record.Changes)
{
changes.Add(new BsonDocument
{
["field"] = change.Field,
["type"] = change.ChangeType,
["previous"] = change.PreviousValue is null ? BsonNull.Value : new BsonString(change.PreviousValue),
["current"] = change.CurrentValue is null ? BsonNull.Value : new BsonString(change.CurrentValue),
});
}
return new ChangeHistoryDocument
{
Id = record.Id,
SourceName = record.SourceName,
AdvisoryKey = record.AdvisoryKey,
DocumentId = record.DocumentId,
DocumentSha256 = record.DocumentSha256,
CurrentHash = record.CurrentHash,
PreviousHash = record.PreviousHash,
CurrentSnapshot = record.CurrentSnapshot,
PreviousSnapshot = record.PreviousSnapshot,
Changes = changes,
CapturedAt = record.CapturedAt.UtcDateTime,
};
}
public static ChangeHistoryRecord ToRecord(this ChangeHistoryDocument document)
{
var changes = new List<ChangeHistoryFieldChange>(document.Changes.Count);
foreach (var change in document.Changes)
{
var previousValue = change.TryGetValue("previous", out var previousBson) && previousBson is not BsonNull
? previousBson.AsString
: null;
var currentValue = change.TryGetValue("current", out var currentBson) && currentBson is not BsonNull
? currentBson.AsString
: null;
var fieldName = change.GetValue("field", "").AsString;
var changeType = change.GetValue("type", "").AsString;
changes.Add(new ChangeHistoryFieldChange(fieldName, changeType, previousValue, currentValue));
}
var capturedAtUtc = DateTime.SpecifyKind(document.CapturedAt, DateTimeKind.Utc);
return new ChangeHistoryRecord(
document.Id,
document.SourceName,
document.AdvisoryKey,
document.DocumentId,
document.DocumentSha256,
document.CurrentHash,
document.PreviousHash,
document.CurrentSnapshot,
document.PreviousSnapshot,
changes,
new DateTimeOffset(capturedAtUtc));
}
}


@@ -0,0 +1,24 @@
using System;
namespace StellaOps.Feedser.Storage.Mongo.ChangeHistory;
public sealed record ChangeHistoryFieldChange
{
public ChangeHistoryFieldChange(string field, string changeType, string? previousValue, string? currentValue)
{
ArgumentException.ThrowIfNullOrEmpty(field);
ArgumentException.ThrowIfNullOrEmpty(changeType);
Field = field;
ChangeType = changeType;
PreviousValue = previousValue;
CurrentValue = currentValue;
}
public string Field { get; }
public string ChangeType { get; }
public string? PreviousValue { get; }
public string? CurrentValue { get; }
}


@@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
namespace StellaOps.Feedser.Storage.Mongo.ChangeHistory;
public sealed class ChangeHistoryRecord
{
public ChangeHistoryRecord(
Guid id,
string sourceName,
string advisoryKey,
Guid documentId,
string documentSha256,
string currentHash,
string? previousHash,
string currentSnapshot,
string? previousSnapshot,
IReadOnlyList<ChangeHistoryFieldChange> changes,
DateTimeOffset capturedAt)
{
ArgumentException.ThrowIfNullOrEmpty(sourceName);
ArgumentException.ThrowIfNullOrEmpty(advisoryKey);
ArgumentException.ThrowIfNullOrEmpty(documentSha256);
ArgumentException.ThrowIfNullOrEmpty(currentHash);
ArgumentException.ThrowIfNullOrEmpty(currentSnapshot);
ArgumentNullException.ThrowIfNull(changes);
Id = id;
SourceName = sourceName;
AdvisoryKey = advisoryKey;
DocumentId = documentId;
DocumentSha256 = documentSha256;
CurrentHash = currentHash;
PreviousHash = previousHash;
CurrentSnapshot = currentSnapshot;
PreviousSnapshot = previousSnapshot;
Changes = changes;
CapturedAt = capturedAt;
}
public Guid Id { get; }
public string SourceName { get; }
public string AdvisoryKey { get; }
public Guid DocumentId { get; }
public string DocumentSha256 { get; }
public string CurrentHash { get; }
public string? PreviousHash { get; }
public string CurrentSnapshot { get; }
public string? PreviousSnapshot { get; }
public IReadOnlyList<ChangeHistoryFieldChange> Changes { get; }
public DateTimeOffset CapturedAt { get; }
}


@@ -0,0 +1,12 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Feedser.Storage.Mongo.ChangeHistory;
public interface IChangeHistoryStore
{
Task AddAsync(ChangeHistoryRecord record, CancellationToken cancellationToken);
Task<IReadOnlyList<ChangeHistoryRecord>> GetRecentAsync(string sourceName, string advisoryKey, int limit, CancellationToken cancellationToken);
}


@@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.ChangeHistory;
public sealed class MongoChangeHistoryStore : IChangeHistoryStore
{
private readonly IMongoCollection<ChangeHistoryDocument> _collection;
private readonly ILogger<MongoChangeHistoryStore> _logger;
public MongoChangeHistoryStore(IMongoDatabase database, ILogger<MongoChangeHistoryStore> logger)
{
_collection = (database ?? throw new ArgumentNullException(nameof(database)))
.GetCollection<ChangeHistoryDocument>(MongoStorageDefaults.Collections.ChangeHistory);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task AddAsync(ChangeHistoryRecord record, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(record);
var document = record.ToDocument();
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Recorded change history for {Source}/{Advisory} with hash {Hash}", record.SourceName, record.AdvisoryKey, record.CurrentHash);
}
public async Task<IReadOnlyList<ChangeHistoryRecord>> GetRecentAsync(string sourceName, string advisoryKey, int limit, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(sourceName);
ArgumentException.ThrowIfNullOrEmpty(advisoryKey);
if (limit <= 0)
{
limit = 10;
}
var cursor = await _collection.Find(x => x.SourceName == sourceName && x.AdvisoryKey == advisoryKey)
.SortByDescending(x => x.CapturedAt)
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
var records = new List<ChangeHistoryRecord>(cursor.Count);
foreach (var document in cursor)
{
records.Add(document.ToRecord());
}
return records;
}
}


@@ -0,0 +1,130 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.Documents;
[BsonIgnoreExtraElements]
public sealed class DocumentDocument
{
[BsonId]
public Guid Id { get; set; }
[BsonElement("sourceName")]
public string SourceName { get; set; } = string.Empty;
[BsonElement("uri")]
public string Uri { get; set; } = string.Empty;
[BsonElement("fetchedAt")]
public DateTime FetchedAt { get; set; }
[BsonElement("sha256")]
public string Sha256 { get; set; } = string.Empty;
[BsonElement("status")]
public string Status { get; set; } = string.Empty;
[BsonElement("contentType")]
[BsonIgnoreIfNull]
public string? ContentType { get; set; }
[BsonElement("headers")]
[BsonIgnoreIfNull]
public BsonDocument? Headers { get; set; }
[BsonElement("metadata")]
[BsonIgnoreIfNull]
public BsonDocument? Metadata { get; set; }
[BsonElement("etag")]
[BsonIgnoreIfNull]
public string? Etag { get; set; }
[BsonElement("lastModified")]
[BsonIgnoreIfNull]
public DateTime? LastModified { get; set; }
[BsonElement("expiresAt")]
[BsonIgnoreIfNull]
public DateTime? ExpiresAt { get; set; }
[BsonElement("gridFsId")]
[BsonIgnoreIfNull]
public ObjectId? GridFsId { get; set; }
}
internal static class DocumentDocumentExtensions
{
public static DocumentDocument FromRecord(DocumentRecord record)
{
return new DocumentDocument
{
Id = record.Id,
SourceName = record.SourceName,
Uri = record.Uri,
FetchedAt = record.FetchedAt.UtcDateTime,
Sha256 = record.Sha256,
Status = record.Status,
ContentType = record.ContentType,
Headers = ToBson(record.Headers),
Metadata = ToBson(record.Metadata),
Etag = record.Etag,
LastModified = record.LastModified?.UtcDateTime,
GridFsId = record.GridFsId,
ExpiresAt = record.ExpiresAt?.UtcDateTime,
};
}
public static DocumentRecord ToRecord(this DocumentDocument document)
{
IReadOnlyDictionary<string, string>? headers = null;
if (document.Headers is not null)
{
headers = document.Headers.Elements.ToDictionary(
static e => e.Name,
static e => e.Value?.ToString() ?? string.Empty,
StringComparer.Ordinal);
}
IReadOnlyDictionary<string, string>? metadata = null;
if (document.Metadata is not null)
{
metadata = document.Metadata.Elements.ToDictionary(
static e => e.Name,
static e => e.Value?.ToString() ?? string.Empty,
StringComparer.Ordinal);
}
return new DocumentRecord(
document.Id,
document.SourceName,
document.Uri,
DateTime.SpecifyKind(document.FetchedAt, DateTimeKind.Utc),
document.Sha256,
document.Status,
document.ContentType,
headers,
metadata,
document.Etag,
document.LastModified.HasValue ? DateTime.SpecifyKind(document.LastModified.Value, DateTimeKind.Utc) : null,
document.GridFsId,
document.ExpiresAt.HasValue ? DateTime.SpecifyKind(document.ExpiresAt.Value, DateTimeKind.Utc) : null);
}
private static BsonDocument? ToBson(IReadOnlyDictionary<string, string>? values)
{
if (values is null)
{
return null;
}
var document = new BsonDocument();
foreach (var kvp in values)
{
document[kvp.Key] = kvp.Value;
}
return document;
}
}


@@ -0,0 +1,22 @@
using MongoDB.Bson;
namespace StellaOps.Feedser.Storage.Mongo.Documents;
public sealed record DocumentRecord(
Guid Id,
string SourceName,
string Uri,
DateTimeOffset FetchedAt,
string Sha256,
string Status,
string? ContentType,
IReadOnlyDictionary<string, string>? Headers,
IReadOnlyDictionary<string, string>? Metadata,
string? Etag,
DateTimeOffset? LastModified,
ObjectId? GridFsId,
DateTimeOffset? ExpiresAt = null)
{
public DocumentRecord WithStatus(string status)
=> this with { Status = status };
}


@@ -0,0 +1,66 @@
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Documents;
public sealed class DocumentStore : IDocumentStore
{
private readonly IMongoCollection<DocumentDocument> _collection;
private readonly ILogger<DocumentStore> _logger;
public DocumentStore(IMongoDatabase database, ILogger<DocumentStore> logger)
{
_collection = (database ?? throw new ArgumentNullException(nameof(database)))
.GetCollection<DocumentDocument>(MongoStorageDefaults.Collections.Document);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<DocumentRecord> UpsertAsync(DocumentRecord record, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(record);
var document = DocumentDocumentExtensions.FromRecord(record);
var filter = Builders<DocumentDocument>.Filter.Eq(x => x.SourceName, record.SourceName)
& Builders<DocumentDocument>.Filter.Eq(x => x.Uri, record.Uri);
var options = new FindOneAndReplaceOptions<DocumentDocument>
{
IsUpsert = true,
ReturnDocument = ReturnDocument.After,
};
var replaced = await _collection.FindOneAndReplaceAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Upserted document {Source}/{Uri}", record.SourceName, record.Uri);
return (replaced ?? document).ToRecord();
}
public async Task<DocumentRecord?> FindBySourceAndUriAsync(string sourceName, string uri, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(sourceName);
ArgumentException.ThrowIfNullOrEmpty(uri);
var filter = Builders<DocumentDocument>.Filter.Eq(x => x.SourceName, sourceName)
& Builders<DocumentDocument>.Filter.Eq(x => x.Uri, uri);
var document = await _collection.Find(filter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document?.ToRecord();
}
public async Task<DocumentRecord?> FindAsync(Guid id, CancellationToken cancellationToken)
{
var document = await _collection.Find(x => x.Id == id).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document?.ToRecord();
}
public async Task<bool> UpdateStatusAsync(Guid id, string status, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(status);
var update = Builders<DocumentDocument>.Update
.Set(x => x.Status, status)
.Set(x => x.LastModified, DateTime.UtcNow);
var result = await _collection.UpdateOneAsync(x => x.Id == id, update, cancellationToken: cancellationToken).ConfigureAwait(false);
return result.MatchedCount > 0;
}
}


@@ -0,0 +1,12 @@
namespace StellaOps.Feedser.Storage.Mongo.Documents;
public interface IDocumentStore
{
Task<DocumentRecord> UpsertAsync(DocumentRecord record, CancellationToken cancellationToken);
Task<DocumentRecord?> FindBySourceAndUriAsync(string sourceName, string uri, CancellationToken cancellationToken);
Task<DocumentRecord?> FindAsync(Guid id, CancellationToken cancellationToken);
Task<bool> UpdateStatusAsync(Guid id, string status, CancellationToken cancellationToken);
}


@@ -0,0 +1,49 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.Dtos;
[BsonIgnoreExtraElements]
public sealed class DtoDocument
{
[BsonId]
public Guid Id { get; set; }
[BsonElement("documentId")]
public Guid DocumentId { get; set; }
[BsonElement("sourceName")]
public string SourceName { get; set; } = string.Empty;
[BsonElement("schemaVersion")]
public string SchemaVersion { get; set; } = string.Empty;
[BsonElement("payload")]
public BsonDocument Payload { get; set; } = new();
[BsonElement("validatedAt")]
public DateTime ValidatedAt { get; set; }
}
internal static class DtoDocumentExtensions
{
public static DtoDocument FromRecord(DtoRecord record)
=> new()
{
Id = record.Id,
DocumentId = record.DocumentId,
SourceName = record.SourceName,
SchemaVersion = record.SchemaVersion,
Payload = record.Payload ?? new BsonDocument(),
ValidatedAt = record.ValidatedAt.UtcDateTime,
};
public static DtoRecord ToRecord(this DtoDocument document)
=> new(
document.Id,
document.DocumentId,
document.SourceName,
document.SchemaVersion,
document.Payload,
DateTime.SpecifyKind(document.ValidatedAt, DateTimeKind.Utc));
}


@@ -0,0 +1,11 @@
using MongoDB.Bson;
namespace StellaOps.Feedser.Storage.Mongo.Dtos;
public sealed record DtoRecord(
Guid Id,
Guid DocumentId,
string SourceName,
string SchemaVersion,
BsonDocument Payload,
DateTimeOffset ValidatedAt);


@@ -0,0 +1,55 @@
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Dtos;
public sealed class DtoStore : IDtoStore
{
private readonly IMongoCollection<DtoDocument> _collection;
private readonly ILogger<DtoStore> _logger;
public DtoStore(IMongoDatabase database, ILogger<DtoStore> logger)
{
_collection = (database ?? throw new ArgumentNullException(nameof(database)))
.GetCollection<DtoDocument>(MongoStorageDefaults.Collections.Dto);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<DtoRecord> UpsertAsync(DtoRecord record, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(record);
var document = DtoDocumentExtensions.FromRecord(record);
var filter = Builders<DtoDocument>.Filter.Eq(x => x.DocumentId, record.DocumentId)
& Builders<DtoDocument>.Filter.Eq(x => x.SourceName, record.SourceName);
var options = new FindOneAndReplaceOptions<DtoDocument>
{
IsUpsert = true,
ReturnDocument = ReturnDocument.After,
};
var replaced = await _collection.FindOneAndReplaceAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Upserted DTO for {Source}/{DocumentId}", record.SourceName, record.DocumentId);
return (replaced ?? document).ToRecord();
}
public async Task<DtoRecord?> FindByDocumentIdAsync(Guid documentId, CancellationToken cancellationToken)
{
var document = await _collection.Find(x => x.DocumentId == documentId)
.FirstOrDefaultAsync(cancellationToken)
.ConfigureAwait(false);
return document?.ToRecord();
}
public async Task<IReadOnlyList<DtoRecord>> GetBySourceAsync(string sourceName, int limit, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(sourceName);
var documents = await _collection.Find(x => x.SourceName == sourceName)
.SortByDescending(x => x.ValidatedAt)
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return documents.Select(static x => x.ToRecord()).ToArray();
}
}

@@ -0,0 +1,10 @@
namespace StellaOps.Feedser.Storage.Mongo.Dtos;
public interface IDtoStore
{
Task<DtoRecord> UpsertAsync(DtoRecord record, CancellationToken cancellationToken);
Task<DtoRecord?> FindByDocumentIdAsync(Guid documentId, CancellationToken cancellationToken);
Task<IReadOnlyList<DtoRecord>> GetBySourceAsync(string sourceName, int limit, CancellationToken cancellationToken);
}

@@ -0,0 +1,63 @@
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.Exporting;
[BsonIgnoreExtraElements]
public sealed class ExportStateDocument
{
[BsonId]
public string Id { get; set; } = string.Empty;
[BsonElement("baseExportId")]
public string? BaseExportId { get; set; }
[BsonElement("baseDigest")]
public string? BaseDigest { get; set; }
[BsonElement("lastFullDigest")]
public string? LastFullDigest { get; set; }
[BsonElement("lastDeltaDigest")]
public string? LastDeltaDigest { get; set; }
[BsonElement("exportCursor")]
public string? ExportCursor { get; set; }
[BsonElement("targetRepo")]
public string? TargetRepository { get; set; }
[BsonElement("exporterVersion")]
public string? ExporterVersion { get; set; }
[BsonElement("updatedAt")]
public DateTime UpdatedAt { get; set; }
}
internal static class ExportStateDocumentExtensions
{
public static ExportStateDocument FromRecord(ExportStateRecord record)
=> new()
{
Id = record.Id,
BaseExportId = record.BaseExportId,
BaseDigest = record.BaseDigest,
LastFullDigest = record.LastFullDigest,
LastDeltaDigest = record.LastDeltaDigest,
ExportCursor = record.ExportCursor,
TargetRepository = record.TargetRepository,
ExporterVersion = record.ExporterVersion,
UpdatedAt = record.UpdatedAt.UtcDateTime,
};
public static ExportStateRecord ToRecord(this ExportStateDocument document)
=> new(
document.Id,
document.BaseExportId,
document.BaseDigest,
document.LastFullDigest,
document.LastDeltaDigest,
document.ExportCursor,
document.TargetRepository,
document.ExporterVersion,
DateTime.SpecifyKind(document.UpdatedAt, DateTimeKind.Utc));
}

@@ -0,0 +1,102 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Feedser.Storage.Mongo.Exporting;
/// <summary>
/// Helper for exporters to read and persist their export metadata in Mongo-backed storage.
/// </summary>
public sealed class ExportStateManager
{
private readonly IExportStateStore _store;
private readonly TimeProvider _timeProvider;
public ExportStateManager(IExportStateStore store, TimeProvider? timeProvider = null)
{
_store = store ?? throw new ArgumentNullException(nameof(store));
_timeProvider = timeProvider ?? TimeProvider.System;
}
public Task<ExportStateRecord?> GetAsync(string exporterId, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(exporterId);
return _store.FindAsync(exporterId, cancellationToken);
}
public async Task<ExportStateRecord> StoreFullExportAsync(
string exporterId,
string exportId,
string exportDigest,
string? cursor,
string? targetRepository,
string exporterVersion,
CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(exporterId);
ArgumentException.ThrowIfNullOrEmpty(exportId);
ArgumentException.ThrowIfNullOrEmpty(exportDigest);
ArgumentException.ThrowIfNullOrEmpty(exporterVersion);
var existing = await _store.FindAsync(exporterId, cancellationToken).ConfigureAwait(false);
var repository = string.IsNullOrWhiteSpace(targetRepository) ? existing?.TargetRepository : targetRepository;
var now = _timeProvider.GetUtcNow();
var baseExportId = existing?.BaseExportId ?? exportId;
var baseDigest = existing?.BaseDigest ?? exportDigest;
var record = existing is null
? new ExportStateRecord(
exporterId,
baseExportId,
baseDigest,
exportDigest,
LastDeltaDigest: null,
ExportCursor: cursor ?? exportDigest,
TargetRepository: repository,
ExporterVersion: exporterVersion,
UpdatedAt: now)
: existing with
{
BaseExportId = baseExportId,
BaseDigest = baseDigest,
LastFullDigest = exportDigest,
LastDeltaDigest = null,
ExportCursor = cursor ?? existing.ExportCursor,
TargetRepository = repository,
ExporterVersion = exporterVersion,
UpdatedAt = now,
};
return await _store.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
}
public async Task<ExportStateRecord> StoreDeltaExportAsync(
string exporterId,
string deltaDigest,
string? cursor,
string exporterVersion,
CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(exporterId);
ArgumentException.ThrowIfNullOrEmpty(deltaDigest);
ArgumentException.ThrowIfNullOrEmpty(exporterVersion);
var existing = await _store.FindAsync(exporterId, cancellationToken).ConfigureAwait(false);
if (existing is null)
{
throw new InvalidOperationException($"Full export state missing for '{exporterId}'.");
}
var now = _timeProvider.GetUtcNow();
var record = existing with
{
LastDeltaDigest = deltaDigest,
ExportCursor = cursor ?? existing.ExportCursor,
ExporterVersion = exporterVersion,
UpdatedAt = now,
};
return await _store.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
}
}

@@ -0,0 +1,12 @@
namespace StellaOps.Feedser.Storage.Mongo.Exporting;
public sealed record ExportStateRecord(
string Id,
string? BaseExportId,
string? BaseDigest,
string? LastFullDigest,
string? LastDeltaDigest,
string? ExportCursor,
string? TargetRepository,
string? ExporterVersion,
DateTimeOffset UpdatedAt);

@@ -0,0 +1,43 @@
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Exporting;
public sealed class ExportStateStore : IExportStateStore
{
private readonly IMongoCollection<ExportStateDocument> _collection;
private readonly ILogger<ExportStateStore> _logger;
public ExportStateStore(IMongoDatabase database, ILogger<ExportStateStore> logger)
{
_collection = (database ?? throw new ArgumentNullException(nameof(database)))
.GetCollection<ExportStateDocument>(MongoStorageDefaults.Collections.ExportState);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<ExportStateRecord> UpsertAsync(ExportStateRecord record, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(record);
var document = ExportStateDocumentExtensions.FromRecord(record);
var options = new FindOneAndReplaceOptions<ExportStateDocument>
{
IsUpsert = true,
ReturnDocument = ReturnDocument.After,
};
var replaced = await _collection.FindOneAndReplaceAsync<ExportStateDocument, ExportStateDocument>(
x => x.Id == record.Id,
document,
options,
cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Stored export state {StateId}", record.Id);
return (replaced ?? document).ToRecord();
}
public async Task<ExportStateRecord?> FindAsync(string id, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(id);
var document = await _collection.Find(x => x.Id == id).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document?.ToRecord();
}
}

@@ -0,0 +1,8 @@
namespace StellaOps.Feedser.Storage.Mongo.Exporting;
public interface IExportStateStore
{
Task<ExportStateRecord> UpsertAsync(ExportStateRecord record, CancellationToken cancellationToken);
Task<ExportStateRecord?> FindAsync(string id, CancellationToken cancellationToken);
}

@@ -0,0 +1,14 @@
using MongoDB.Bson;
namespace StellaOps.Feedser.Storage.Mongo;
public interface ISourceStateRepository
{
Task<SourceStateRecord?> TryGetAsync(string sourceName, CancellationToken cancellationToken);
Task<SourceStateRecord> UpsertAsync(SourceStateRecord record, CancellationToken cancellationToken);
Task<SourceStateRecord?> UpdateCursorAsync(string sourceName, BsonDocument cursor, DateTimeOffset completedAt, CancellationToken cancellationToken);
Task<SourceStateRecord?> MarkFailureAsync(string sourceName, DateTimeOffset failedAt, TimeSpan? backoff, string? failureReason, CancellationToken cancellationToken);
}

@@ -0,0 +1,38 @@
using MongoDB.Bson.Serialization.Attributes;
using StellaOps.Feedser.Core.Jobs;
namespace StellaOps.Feedser.Storage.Mongo;
[BsonIgnoreExtraElements]
public sealed class JobLeaseDocument
{
[BsonId]
public string Key { get; set; } = string.Empty;
[BsonElement("holder")]
public string Holder { get; set; } = string.Empty;
[BsonElement("acquiredAt")]
public DateTime AcquiredAt { get; set; }
[BsonElement("heartbeatAt")]
public DateTime HeartbeatAt { get; set; }
[BsonElement("leaseMs")]
public long LeaseMs { get; set; }
[BsonElement("ttlAt")]
public DateTime TtlAt { get; set; }
}
internal static class JobLeaseDocumentExtensions
{
public static JobLease ToLease(this JobLeaseDocument document)
=> new(
document.Key,
document.Holder,
DateTime.SpecifyKind(document.AcquiredAt, DateTimeKind.Utc),
DateTime.SpecifyKind(document.HeartbeatAt, DateTimeKind.Utc),
TimeSpan.FromMilliseconds(document.LeaseMs),
DateTime.SpecifyKind(document.TtlAt, DateTimeKind.Utc));
}

@@ -0,0 +1,119 @@
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using StellaOps.Feedser.Core.Jobs;
namespace StellaOps.Feedser.Storage.Mongo;
[BsonIgnoreExtraElements]
public sealed class JobRunDocument
{
[BsonId]
[BsonGuidRepresentation(GuidRepresentation.Standard)]
public Guid Id { get; set; }
[BsonElement("kind")]
public string Kind { get; set; } = string.Empty;
[BsonElement("status")]
public string Status { get; set; } = JobRunStatus.Pending.ToString();
[BsonElement("trigger")]
public string Trigger { get; set; } = string.Empty;
[BsonElement("parameters")]
public BsonDocument Parameters { get; set; } = new();
[BsonElement("parametersHash")]
[BsonIgnoreIfNull]
public string? ParametersHash { get; set; }
[BsonElement("createdAt")]
public DateTime CreatedAt { get; set; }
[BsonElement("startedAt")]
[BsonIgnoreIfNull]
public DateTime? StartedAt { get; set; }
[BsonElement("completedAt")]
[BsonIgnoreIfNull]
public DateTime? CompletedAt { get; set; }
[BsonElement("error")]
[BsonIgnoreIfNull]
public string? Error { get; set; }
[BsonElement("timeoutMs")]
[BsonIgnoreIfNull]
public long? TimeoutMs { get; set; }
[BsonElement("leaseMs")]
[BsonIgnoreIfNull]
public long? LeaseMs { get; set; }
}
internal static class JobRunDocumentExtensions
{
public static JobRunDocument FromRequest(JobRunCreateRequest request, Guid id)
{
return new JobRunDocument
{
Id = id,
Kind = request.Kind,
Status = JobRunStatus.Pending.ToString(),
Trigger = request.Trigger,
Parameters = request.Parameters is { Count: > 0 }
? BsonDocument.Parse(JsonSerializer.Serialize(request.Parameters))
: new BsonDocument(),
ParametersHash = request.ParametersHash,
CreatedAt = request.CreatedAt.UtcDateTime,
TimeoutMs = request.Timeout?.MillisecondsFromTimespan(),
LeaseMs = request.LeaseDuration?.MillisecondsFromTimespan(),
};
}
public static JobRunSnapshot ToSnapshot(this JobRunDocument document)
{
var parameters = document.Parameters?.ToDictionary() ?? new Dictionary<string, object?>();
return new JobRunSnapshot(
document.Id,
document.Kind,
Enum.Parse<JobRunStatus>(document.Status, ignoreCase: true),
DateTime.SpecifyKind(document.CreatedAt, DateTimeKind.Utc),
document.StartedAt.HasValue ? DateTime.SpecifyKind(document.StartedAt.Value, DateTimeKind.Utc) : null,
document.CompletedAt.HasValue ? DateTime.SpecifyKind(document.CompletedAt.Value, DateTimeKind.Utc) : null,
document.Trigger,
document.ParametersHash,
document.Error,
document.TimeoutMs?.MillisecondsToTimespan(),
document.LeaseMs?.MillisecondsToTimespan(),
parameters);
}
public static Dictionary<string, object?> ToDictionary(this BsonDocument document)
{
return document.Elements.ToDictionary(
static element => element.Name,
static element => element.Value switch
{
BsonString s => (object?)s.AsString,
BsonBoolean b => b.AsBoolean,
BsonInt32 i => i.AsInt32,
BsonInt64 l => l.AsInt64,
BsonDouble d => d.AsDouble,
BsonNull => null,
BsonArray array => array.Select(v => v.IsBsonDocument ? ToDictionary(v.AsBsonDocument) : (object?)v.ToString()).ToArray(),
BsonDocument doc => ToDictionary(doc),
_ => element.Value.ToString(),
});
}
private static long MillisecondsFromTimespan(this TimeSpan timeSpan)
=> (long)timeSpan.TotalMilliseconds;
private static TimeSpan MillisecondsToTimespan(this long milliseconds)
=> TimeSpan.FromMilliseconds(milliseconds);
}

@@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Feedser.Storage.Mongo.JpFlags;
public interface IJpFlagStore
{
Task UpsertAsync(JpFlagRecord record, CancellationToken cancellationToken);
Task<JpFlagRecord?> FindAsync(string advisoryKey, CancellationToken cancellationToken);
}

@@ -0,0 +1,54 @@
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.JpFlags;
[BsonIgnoreExtraElements]
public sealed class JpFlagDocument
{
// AdvisoryKey is the document _id; [BsonId] determines the stored name, so an element name attribute here would be misleading.
[BsonId]
public string AdvisoryKey { get; set; } = string.Empty;
[BsonElement("sourceName")]
public string SourceName { get; set; } = string.Empty;
[BsonElement("category")]
[BsonIgnoreIfNull]
public string? Category { get; set; }
[BsonElement("vendorStatus")]
[BsonIgnoreIfNull]
public string? VendorStatus { get; set; }
[BsonElement("recordedAt")]
public DateTime RecordedAt { get; set; }
}
internal static class JpFlagDocumentExtensions
{
public static JpFlagDocument FromRecord(JpFlagRecord record)
{
ArgumentNullException.ThrowIfNull(record);
return new JpFlagDocument
{
AdvisoryKey = record.AdvisoryKey,
SourceName = record.SourceName,
Category = record.Category,
VendorStatus = record.VendorStatus,
RecordedAt = record.RecordedAt.UtcDateTime,
};
}
public static JpFlagRecord ToRecord(this JpFlagDocument document)
{
ArgumentNullException.ThrowIfNull(document);
return new JpFlagRecord(
document.AdvisoryKey,
document.SourceName,
document.Category,
document.VendorStatus,
DateTime.SpecifyKind(document.RecordedAt, DateTimeKind.Utc));
}
}

@@ -0,0 +1,15 @@
namespace StellaOps.Feedser.Storage.Mongo.JpFlags;
/// <summary>
/// Captures Japan-specific enrichment flags derived from JVN payloads.
/// </summary>
public sealed record JpFlagRecord(
string AdvisoryKey,
string SourceName,
string? Category,
string? VendorStatus,
DateTimeOffset RecordedAt)
{
public JpFlagRecord WithRecordedAt(DateTimeOffset recordedAt)
=> this with { RecordedAt = recordedAt.ToUniversalTime() };
}

@@ -0,0 +1,39 @@
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.JpFlags;
public sealed class JpFlagStore : IJpFlagStore
{
private readonly IMongoCollection<JpFlagDocument> _collection;
private readonly ILogger<JpFlagStore> _logger;
public JpFlagStore(IMongoDatabase database, ILogger<JpFlagStore> logger)
{
ArgumentNullException.ThrowIfNull(database);
ArgumentNullException.ThrowIfNull(logger);
_collection = database.GetCollection<JpFlagDocument>(MongoStorageDefaults.Collections.JpFlags);
_logger = logger;
}
public async Task UpsertAsync(JpFlagRecord record, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(record);
var document = JpFlagDocumentExtensions.FromRecord(record);
var filter = Builders<JpFlagDocument>.Filter.Eq(x => x.AdvisoryKey, record.AdvisoryKey);
var options = new ReplaceOptions { IsUpsert = true };
await _collection.ReplaceOneAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Upserted jp_flag for {AdvisoryKey}", record.AdvisoryKey);
}
public async Task<JpFlagRecord?> FindAsync(string advisoryKey, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(advisoryKey);
var filter = Builders<JpFlagDocument>.Filter.Eq(x => x.AdvisoryKey, advisoryKey);
var document = await _collection.Find(filter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document?.ToRecord();
}
}

@@ -0,0 +1,37 @@
# Mongo Schema Migration Playbook
This module owns the persistent shape of Feedser's MongoDB database. Upgrades must be deterministic and safe to run on live replicas. The `MongoMigrationRunner` executes idempotent migrations on startup immediately after the bootstrapper completes its collection and index checks.
## Execution Path
1. `StellaOps.Feedser.WebService` calls `MongoBootstrapper.InitializeAsync()` during startup.
2. Once collections and baseline indexes are ensured, the bootstrapper invokes `MongoMigrationRunner.RunAsync()`.
3. Each `IMongoMigration` implementation is sorted by its `Id` (ordinal compare) and executed exactly once. Completion is recorded in the `schema_migrations` collection.
4. Failures surface during startup and prevent the service from serving traffic, matching our "fail-fast" requirement for storage incompatibilities.
## Creating a Migration
1. Implement `IMongoMigration` under `StellaOps.Feedser.Storage.Mongo.Migrations`. Use a monotonically increasing identifier such as `yyyyMMdd_description`.
2. Keep the body idempotent: query state first, drop/re-create indexes only when a mismatch is detected, and avoid multi-document transactions unless required.
3. Add the migration to DI in `ServiceCollectionExtensions` so it flows into the runner.
4. Write an integration test that exercises the migration against a Mongo2Go instance to validate behaviour.
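The recipe above can be sketched as a minimal migration. Only the `IMongoMigration` contract (`Id`, `Description`, `ApplyAsync`) comes from this module; the `advisory` collection name and index shape below are illustrative assumptions, not a migration Feedser actually ships.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Migrations;
// Illustrative sketch: ensure a hypothetical ascending index on advisory.modified.
internal sealed class EnsureAdvisoryModifiedIndexMigration : IMongoMigration
{
// Monotonically increasing, ordinal-sortable identifier per the playbook convention.
public string Id => "20241101_advisory_modified_index";
public string Description => "Ensure advisory.modified index exists";
public async Task ApplyAsync(IMongoDatabase database, CancellationToken cancellationToken)
{
var collection = database.GetCollection<BsonDocument>("advisory");
var keys = Builders<BsonDocument>.IndexKeys.Ascending("modified");
// Creating an index that already exists with the same keys and name is a no-op,
// which keeps the migration idempotent without querying index state first.
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<BsonDocument>(keys, new CreateIndexOptions { Name = "advisory_modified" }),
cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
```

Registering the class in `ServiceCollectionExtensions` is all that is needed for the runner to pick it up on the next boot.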
## Current Migrations
| Id | Description |
| --- | --- |
| `20241005_document_expiry_indexes` | Ensures `document` collection uses the correct TTL/partial index depending on raw document retention settings. |
| `20241005_gridfs_expiry_indexes` | Aligns the GridFS `documents.files` TTL index with retention settings. |
## Operator Runbook
- `schema_migrations` records each applied migration (`_id`, `description`, `appliedAt`). Review this collection when auditing upgrades.
- To re-run a migration in a lab, delete the corresponding document from `schema_migrations` and restart the service. **Do not** do this in production unless the migration body is known to be idempotent and safe.
- When changing retention settings (`RawDocumentRetention`), deploy the new configuration and restart Feedser. The migration runner will adjust indexes on the next boot.
- If migrations fail, restart with `Logging__LogLevel__StellaOps.Feedser.Storage.Mongo.Migrations=Debug` to surface diagnostic output. Remediate underlying index/collection drift before retrying.
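For the lab-only re-run described above, a mongosh session might look like the following; the `feedser` database name is the module default and the migration id is taken from the table above — adjust both for your environment.

```
# Lab-only: force a single migration to re-run on the next service start.
mongosh feedser --eval '
  db.schema_migrations.deleteOne({ _id: "20241005_document_expiry_indexes" });
  printjson(db.schema_migrations.find().sort({ _id: 1 }).toArray());
'
# Then restart the Feedser service so MongoMigrationRunner re-applies it.
```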
## Validating an Upgrade
1. Run `dotnet test --filter MongoMigrationRunnerTests` to exercise integration coverage.
2. In staging, execute `db.schema_migrations.find().sort({_id:1})` to verify applied migrations and timestamps.
3. Inspect index shapes: `db.document.getIndexes()` and `db.documents.files.getIndexes()` for TTL/partial filter alignment.

@@ -0,0 +1,8 @@
namespace StellaOps.Feedser.Storage.Mongo.MergeEvents;
public interface IMergeEventStore
{
Task AppendAsync(MergeEventRecord record, CancellationToken cancellationToken);
Task<IReadOnlyList<MergeEventRecord>> GetRecentAsync(string advisoryKey, int limit, CancellationToken cancellationToken);
}

@@ -0,0 +1,48 @@
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.MergeEvents;
[BsonIgnoreExtraElements]
public sealed class MergeEventDocument
{
[BsonId]
public Guid Id { get; set; }
[BsonElement("advisoryKey")]
public string AdvisoryKey { get; set; } = string.Empty;
[BsonElement("beforeHash")]
public byte[] BeforeHash { get; set; } = Array.Empty<byte>();
[BsonElement("afterHash")]
public byte[] AfterHash { get; set; } = Array.Empty<byte>();
[BsonElement("mergedAt")]
public DateTime MergedAt { get; set; }
[BsonElement("inputDocuments")]
public List<Guid> InputDocuments { get; set; } = new();
}
internal static class MergeEventDocumentExtensions
{
public static MergeEventDocument FromRecord(MergeEventRecord record)
=> new()
{
Id = record.Id,
AdvisoryKey = record.AdvisoryKey,
BeforeHash = record.BeforeHash,
AfterHash = record.AfterHash,
MergedAt = record.MergedAt.UtcDateTime,
InputDocuments = record.InputDocumentIds.ToList(),
};
public static MergeEventRecord ToRecord(this MergeEventDocument document)
=> new(
document.Id,
document.AdvisoryKey,
document.BeforeHash,
document.AfterHash,
DateTime.SpecifyKind(document.MergedAt, DateTimeKind.Utc),
document.InputDocuments);
}

@@ -0,0 +1,9 @@
namespace StellaOps.Feedser.Storage.Mongo.MergeEvents;
public sealed record MergeEventRecord(
Guid Id,
string AdvisoryKey,
byte[] BeforeHash,
byte[] AfterHash,
DateTimeOffset MergedAt,
IReadOnlyList<Guid> InputDocumentIds);

@@ -0,0 +1,36 @@
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.MergeEvents;
public sealed class MergeEventStore : IMergeEventStore
{
private readonly IMongoCollection<MergeEventDocument> _collection;
private readonly ILogger<MergeEventStore> _logger;
public MergeEventStore(IMongoDatabase database, ILogger<MergeEventStore> logger)
{
_collection = (database ?? throw new ArgumentNullException(nameof(database)))
.GetCollection<MergeEventDocument>(MongoStorageDefaults.Collections.MergeEvent);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task AppendAsync(MergeEventRecord record, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(record);
var document = MergeEventDocumentExtensions.FromRecord(record);
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Appended merge event {MergeId} for {AdvisoryKey}", record.Id, record.AdvisoryKey);
}
public async Task<IReadOnlyList<MergeEventRecord>> GetRecentAsync(string advisoryKey, int limit, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(advisoryKey);
var documents = await _collection.Find(x => x.AdvisoryKey == advisoryKey)
.SortByDescending(x => x.MergedAt)
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return documents.Select(static x => x.ToRecord()).ToArray();
}
}

@@ -0,0 +1,146 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Migrations;
internal sealed class EnsureDocumentExpiryIndexesMigration : IMongoMigration
{
private readonly MongoStorageOptions _options;
public EnsureDocumentExpiryIndexesMigration(IOptions<MongoStorageOptions> options)
{
ArgumentNullException.ThrowIfNull(options);
_options = options.Value;
}
public string Id => "20241005_document_expiry_indexes";
public string Description => "Ensure document.expiresAt index matches configured retention";
public async Task ApplyAsync(IMongoDatabase database, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(database);
var needsTtl = _options.RawDocumentRetention > TimeSpan.Zero;
var collection = database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Document);
using var cursor = await collection.Indexes.ListAsync(cancellationToken).ConfigureAwait(false);
var indexes = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
var ttlIndex = indexes.FirstOrDefault(x => TryGetName(x, out var name) && string.Equals(name, "document_expiresAt_ttl", StringComparison.Ordinal));
var nonTtlIndex = indexes.FirstOrDefault(x => TryGetName(x, out var name) && string.Equals(name, "document_expiresAt", StringComparison.Ordinal));
if (needsTtl)
{
var shouldRebuild = ttlIndex is null || !IndexMatchesTtlExpectations(ttlIndex);
if (shouldRebuild)
{
if (ttlIndex is not null)
{
await collection.Indexes.DropOneAsync("document_expiresAt_ttl", cancellationToken).ConfigureAwait(false);
}
if (nonTtlIndex is not null)
{
await collection.Indexes.DropOneAsync("document_expiresAt", cancellationToken).ConfigureAwait(false);
}
var options = new CreateIndexOptions<BsonDocument>
{
Name = "document_expiresAt_ttl",
ExpireAfter = TimeSpan.Zero,
PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("expiresAt", true),
};
var keys = Builders<BsonDocument>.IndexKeys.Ascending("expiresAt");
await collection.Indexes.CreateOneAsync(new CreateIndexModel<BsonDocument>(keys, options), cancellationToken: cancellationToken).ConfigureAwait(false);
}
else if (nonTtlIndex is not null)
{
await collection.Indexes.DropOneAsync("document_expiresAt", cancellationToken).ConfigureAwait(false);
}
}
else
{
if (ttlIndex is not null)
{
await collection.Indexes.DropOneAsync("document_expiresAt_ttl", cancellationToken).ConfigureAwait(false);
}
var shouldRebuild = nonTtlIndex is null || !IndexMatchesNonTtlExpectations(nonTtlIndex);
if (shouldRebuild)
{
if (nonTtlIndex is not null)
{
await collection.Indexes.DropOneAsync("document_expiresAt", cancellationToken).ConfigureAwait(false);
}
var options = new CreateIndexOptions<BsonDocument>
{
Name = "document_expiresAt",
PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("expiresAt", true),
};
var keys = Builders<BsonDocument>.IndexKeys.Ascending("expiresAt");
await collection.Indexes.CreateOneAsync(new CreateIndexModel<BsonDocument>(keys, options), cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
}
private static bool IndexMatchesTtlExpectations(BsonDocument index)
{
if (!index.TryGetValue("expireAfterSeconds", out var expireAfter) || expireAfter.ToDouble() != 0)
{
return false;
}
if (!index.TryGetValue("partialFilterExpression", out var partialFilter) || partialFilter is not BsonDocument partialDoc)
{
return false;
}
if (!partialDoc.TryGetValue("expiresAt", out var expiresAtRule) || expiresAtRule is not BsonDocument expiresAtDoc)
{
return false;
}
return expiresAtDoc.Contains("$exists") && expiresAtDoc["$exists"].ToBoolean();
}
private static bool IndexMatchesNonTtlExpectations(BsonDocument index)
{
if (index.Contains("expireAfterSeconds"))
{
return false;
}
if (!index.TryGetValue("partialFilterExpression", out var partialFilter) || partialFilter is not BsonDocument partialDoc)
{
return false;
}
if (!partialDoc.TryGetValue("expiresAt", out var expiresAtRule) || expiresAtRule is not BsonDocument expiresAtDoc)
{
return false;
}
return expiresAtDoc.Contains("$exists") && expiresAtDoc["$exists"].ToBoolean();
}
private static bool TryGetName(BsonDocument index, out string name)
{
if (index.TryGetValue("name", out var value) && value.IsString)
{
name = value.AsString;
return true;
}
name = string.Empty;
return false;
}
}

@@ -0,0 +1,95 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Migrations;
internal sealed class EnsureGridFsExpiryIndexesMigration : IMongoMigration
{
private readonly MongoStorageOptions _options;
public EnsureGridFsExpiryIndexesMigration(IOptions<MongoStorageOptions> options)
{
ArgumentNullException.ThrowIfNull(options);
_options = options.Value;
}
public string Id => "20241005_gridfs_expiry_indexes";
public string Description => "Ensure GridFS metadata.expiresAt TTL index reflects retention settings";
public async Task ApplyAsync(IMongoDatabase database, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(database);
var needsTtl = _options.RawDocumentRetention > TimeSpan.Zero;
var collection = database.GetCollection<BsonDocument>("documents.files");
using var cursor = await collection.Indexes.ListAsync(cancellationToken).ConfigureAwait(false);
var indexes = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
var ttlIndex = indexes.FirstOrDefault(x => TryGetName(x, out var name) && string.Equals(name, "gridfs_files_expiresAt_ttl", StringComparison.Ordinal));
if (needsTtl)
{
var shouldRebuild = ttlIndex is null || !IndexMatchesTtlExpectations(ttlIndex);
if (shouldRebuild)
{
if (ttlIndex is not null)
{
await collection.Indexes.DropOneAsync("gridfs_files_expiresAt_ttl", cancellationToken).ConfigureAwait(false);
}
var keys = Builders<BsonDocument>.IndexKeys.Ascending("metadata.expiresAt");
var options = new CreateIndexOptions<BsonDocument>
{
Name = "gridfs_files_expiresAt_ttl",
ExpireAfter = TimeSpan.Zero,
PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("metadata.expiresAt", true),
};
await collection.Indexes.CreateOneAsync(new CreateIndexModel<BsonDocument>(keys, options), cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
else if (ttlIndex is not null)
{
await collection.Indexes.DropOneAsync("gridfs_files_expiresAt_ttl", cancellationToken).ConfigureAwait(false);
}
}
private static bool IndexMatchesTtlExpectations(BsonDocument index)
{
if (!index.TryGetValue("expireAfterSeconds", out var expireAfter) || expireAfter.ToDouble() != 0)
{
return false;
}
if (!index.TryGetValue("partialFilterExpression", out var partialFilter) || partialFilter is not BsonDocument partialDoc)
{
return false;
}
if (!partialDoc.TryGetValue("metadata.expiresAt", out var expiresAtRule) || expiresAtRule is not BsonDocument expiresAtDoc)
{
return false;
}
return expiresAtDoc.Contains("$exists") && expiresAtDoc["$exists"].ToBoolean();
}
private static bool TryGetName(BsonDocument index, out string name)
{
if (index.TryGetValue("name", out var value) && value.IsString)
{
name = value.AsString;
return true;
}
name = string.Empty;
return false;
}
}

@@ -0,0 +1,24 @@
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Migrations;
/// <summary>
/// Represents a single, idempotent MongoDB migration.
/// </summary>
public interface IMongoMigration
{
/// <summary>
/// Unique identifier for the migration. Sorting is performed using ordinal comparison.
/// </summary>
string Id { get; }
/// <summary>
/// Short description surfaced in logs to aid runbooks.
/// </summary>
string Description { get; }
/// <summary>
/// Executes the migration.
/// </summary>
Task ApplyAsync(IMongoDatabase database, CancellationToken cancellationToken);
}

@@ -0,0 +1,18 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.Migrations;
[BsonIgnoreExtraElements]
internal sealed class MongoMigrationDocument
{
[BsonId]
public string Id { get; set; } = string.Empty;
[BsonElement("description")]
[BsonIgnoreIfNull]
public string? Description { get; set; }
[BsonElement("appliedAt")]
public DateTime AppliedAtUtc { get; set; }
}

@@ -0,0 +1,102 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.Migrations;
/// <summary>
/// Executes pending schema migrations tracked inside MongoDB to keep upgrades deterministic.
/// </summary>
public sealed class MongoMigrationRunner
{
private readonly IMongoDatabase _database;
private readonly IReadOnlyList<IMongoMigration> _migrations;
private readonly ILogger<MongoMigrationRunner> _logger;
private readonly TimeProvider _timeProvider;
public MongoMigrationRunner(
IMongoDatabase database,
IEnumerable<IMongoMigration> migrations,
ILogger<MongoMigrationRunner> logger,
TimeProvider? timeProvider = null)
{
_database = database ?? throw new ArgumentNullException(nameof(database));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? TimeProvider.System;
_migrations = (migrations ?? throw new ArgumentNullException(nameof(migrations)))
.OrderBy(m => m.Id, StringComparer.Ordinal)
.ToArray();
}
public async Task RunAsync(CancellationToken cancellationToken)
{
if (_migrations.Count == 0)
{
return;
}
var collection = _database.GetCollection<MongoMigrationDocument>(MongoStorageDefaults.Collections.Migrations);
await EnsureCollectionExistsAsync(_database, cancellationToken).ConfigureAwait(false);
var appliedIds = await LoadAppliedMigrationIdsAsync(collection, cancellationToken).ConfigureAwait(false);
foreach (var migration in _migrations)
{
if (appliedIds.Contains(migration.Id))
{
continue;
}
_logger.LogInformation("Applying Mongo migration {MigrationId}: {Description}", migration.Id, migration.Description);
try
{
await migration.ApplyAsync(_database, cancellationToken).ConfigureAwait(false);
var document = new MongoMigrationDocument
{
Id = migration.Id,
Description = string.IsNullOrWhiteSpace(migration.Description) ? null : migration.Description,
AppliedAtUtc = _timeProvider.GetUtcNow().UtcDateTime,
};
await collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Mongo migration {MigrationId} applied", migration.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Mongo migration {MigrationId} failed", migration.Id);
throw;
}
}
}
private static async Task<HashSet<string>> LoadAppliedMigrationIdsAsync(
IMongoCollection<MongoMigrationDocument> collection,
CancellationToken cancellationToken)
{
using var cursor = await collection.FindAsync(FilterDefinition<MongoMigrationDocument>.Empty, cancellationToken: cancellationToken).ConfigureAwait(false);
var applied = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
var set = new HashSet<string>(StringComparer.Ordinal);
foreach (var document in applied)
{
if (!string.IsNullOrWhiteSpace(document.Id))
{
set.Add(document.Id);
}
}
return set;
}
private static async Task EnsureCollectionExistsAsync(IMongoDatabase database, CancellationToken cancellationToken)
{
using var cursor = await database.ListCollectionNamesAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
var names = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
if (!names.Contains(MongoStorageDefaults.Collections.Migrations, StringComparer.Ordinal))
{
await database.CreateCollectionAsync(MongoStorageDefaults.Collections.Migrations, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
}
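The runner above discovers `IMongoMigration` implementations (the interface itself is not shown in this file); from the runner's usage one can infer it exposes `Id`, `Description`, and `ApplyAsync(IMongoDatabase, CancellationToken)`. A hypothetical migration sketch under that assumption — the `schemaVersion` field and the id string are illustrative, not part of this module:

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

namespace StellaOps.Feedser.Storage.Mongo.Migrations;

/// <summary>Hypothetical migration: backfills a schemaVersion marker on advisories.</summary>
internal sealed class AdvisorySchemaVersionMigration : IMongoMigration
{
    // The runner sorts migrations by Id with StringComparer.Ordinal,
    // so a sortable date prefix keeps execution order deterministic.
    public string Id => "20250101-advisory-schema-version";

    public string Description => "Stamp schemaVersion=1 on advisory documents that lack it.";

    public async Task ApplyAsync(IMongoDatabase database, CancellationToken cancellationToken)
    {
        var advisories = database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Advisory);
        var filter = Builders<BsonDocument>.Filter.Exists("schemaVersion", false);
        var update = Builders<BsonDocument>.Update.Set("schemaVersion", 1);
        await advisories.UpdateManyAsync(filter, update, cancellationToken: cancellationToken)
            .ConfigureAwait(false);
    }
}
```

Because applied ids are recorded in `schema_migrations` after a successful `ApplyAsync`, migrations should be idempotent anyway (the `Exists` filter here makes a re-run a no-op) in case the process dies between applying and recording.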


@@ -0,0 +1,306 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Feedser.Storage.Mongo.Migrations;
namespace StellaOps.Feedser.Storage.Mongo;
/// <summary>
/// Ensures required collections and indexes exist before the service begins processing.
/// </summary>
public sealed class MongoBootstrapper
{
private const string RawDocumentBucketName = "documents";
private static readonly string[] RequiredCollections =
{
MongoStorageDefaults.Collections.Source,
MongoStorageDefaults.Collections.SourceState,
MongoStorageDefaults.Collections.Document,
MongoStorageDefaults.Collections.Dto,
MongoStorageDefaults.Collections.Advisory,
MongoStorageDefaults.Collections.Alias,
MongoStorageDefaults.Collections.Affected,
MongoStorageDefaults.Collections.Reference,
MongoStorageDefaults.Collections.KevFlag,
MongoStorageDefaults.Collections.RuFlags,
MongoStorageDefaults.Collections.JpFlags,
MongoStorageDefaults.Collections.PsirtFlags,
MongoStorageDefaults.Collections.MergeEvent,
MongoStorageDefaults.Collections.ExportState,
MongoStorageDefaults.Collections.ChangeHistory,
MongoStorageDefaults.Collections.Locks,
MongoStorageDefaults.Collections.Jobs,
MongoStorageDefaults.Collections.Migrations,
};
private readonly IMongoDatabase _database;
private readonly MongoStorageOptions _options;
private readonly ILogger<MongoBootstrapper> _logger;
private readonly MongoMigrationRunner _migrationRunner;
public MongoBootstrapper(
IMongoDatabase database,
IOptions<MongoStorageOptions> options,
ILogger<MongoBootstrapper> logger,
MongoMigrationRunner migrationRunner)
{
_database = database ?? throw new ArgumentNullException(nameof(database));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_migrationRunner = migrationRunner ?? throw new ArgumentNullException(nameof(migrationRunner));
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
var existingCollections = await ListCollectionsAsync(cancellationToken).ConfigureAwait(false);
foreach (var collectionName in RequiredCollections)
{
if (!existingCollections.Contains(collectionName))
{
await _database.CreateCollectionAsync(collectionName, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Created Mongo collection {Collection}", collectionName);
}
}
await Task.WhenAll(
EnsureLocksIndexesAsync(cancellationToken),
EnsureJobsIndexesAsync(cancellationToken),
EnsureAdvisoryIndexesAsync(cancellationToken),
EnsureDocumentsIndexesAsync(cancellationToken),
EnsureDtoIndexesAsync(cancellationToken),
EnsureAliasIndexesAsync(cancellationToken),
EnsureAffectedIndexesAsync(cancellationToken),
EnsureReferenceIndexesAsync(cancellationToken),
EnsureSourceStateIndexesAsync(cancellationToken),
EnsurePsirtFlagIndexesAsync(cancellationToken),
EnsureChangeHistoryIndexesAsync(cancellationToken),
EnsureGridFsIndexesAsync(cancellationToken)).ConfigureAwait(false);
await _migrationRunner.RunAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Mongo bootstrapper completed");
}
private async Task<HashSet<string>> ListCollectionsAsync(CancellationToken cancellationToken)
{
using var cursor = await _database.ListCollectionNamesAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
var list = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
return new HashSet<string>(list, StringComparer.Ordinal);
}
private Task EnsureLocksIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Locks);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("ttlAt"),
new CreateIndexOptions { Name = "ttl_at_ttl", ExpireAfter = TimeSpan.Zero }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureJobsIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Jobs);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Descending("createdAt"),
new CreateIndexOptions { Name = "jobs_createdAt_desc" }),
new(
Builders<BsonDocument>.IndexKeys.Ascending("kind").Descending("createdAt"),
new CreateIndexOptions { Name = "jobs_kind_createdAt" }),
new(
Builders<BsonDocument>.IndexKeys.Ascending("status").Descending("createdAt"),
new CreateIndexOptions { Name = "jobs_status_createdAt" }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureAdvisoryIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Advisory);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("advisoryKey"),
new CreateIndexOptions { Name = "advisory_key_unique", Unique = true }),
new(
Builders<BsonDocument>.IndexKeys.Descending("modified"),
new CreateIndexOptions { Name = "advisory_modified_desc" }),
new(
Builders<BsonDocument>.IndexKeys.Descending("published"),
new CreateIndexOptions { Name = "advisory_published_desc" }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureDocumentsIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Document);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("sourceName").Ascending("uri"),
new CreateIndexOptions { Name = "document_source_uri_unique", Unique = true }),
new(
Builders<BsonDocument>.IndexKeys.Descending("fetchedAt"),
new CreateIndexOptions { Name = "document_fetchedAt_desc" }),
};
var expiresKey = Builders<BsonDocument>.IndexKeys.Ascending("expiresAt");
var expiresOptions = new CreateIndexOptions<BsonDocument>
{
Name = _options.RawDocumentRetention > TimeSpan.Zero ? "document_expiresAt_ttl" : "document_expiresAt",
PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("expiresAt", true),
};
if (_options.RawDocumentRetention > TimeSpan.Zero)
{
expiresOptions.ExpireAfter = TimeSpan.Zero;
}
indexes.Add(new CreateIndexModel<BsonDocument>(expiresKey, expiresOptions));
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureAliasIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Alias);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("scheme").Ascending("value"),
new CreateIndexOptions { Name = "alias_scheme_value", Unique = false }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureGridFsIndexesAsync(CancellationToken cancellationToken)
{
if (_options.RawDocumentRetention <= TimeSpan.Zero)
{
return Task.CompletedTask;
}
var collectionName = $"{RawDocumentBucketName}.files";
var collection = _database.GetCollection<BsonDocument>(collectionName);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("metadata.expiresAt"),
new CreateIndexOptions<BsonDocument>
{
Name = "gridfs_files_expiresAt_ttl",
ExpireAfter = TimeSpan.Zero,
PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("metadata.expiresAt", true),
}),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureAffectedIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Affected);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("platform").Ascending("name"),
new CreateIndexOptions { Name = "affected_platform_name" }),
new(
Builders<BsonDocument>.IndexKeys.Ascending("advisoryId"),
new CreateIndexOptions { Name = "affected_advisoryId" }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureReferenceIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Reference);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("url"),
new CreateIndexOptions { Name = "reference_url" }),
new(
Builders<BsonDocument>.IndexKeys.Ascending("advisoryId"),
new CreateIndexOptions { Name = "reference_advisoryId" }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureSourceStateIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.SourceState);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("sourceName"),
new CreateIndexOptions { Name = "source_state_unique", Unique = true }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureDtoIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Dto);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("documentId"),
new CreateIndexOptions { Name = "dto_documentId" }),
new(
Builders<BsonDocument>.IndexKeys.Ascending("sourceName").Descending("validatedAt"),
new CreateIndexOptions { Name = "dto_source_validated" }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsurePsirtFlagIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.PsirtFlags);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("advisoryKey"),
new CreateIndexOptions { Name = "psirt_advisoryKey_unique", Unique = true }),
new(
Builders<BsonDocument>.IndexKeys.Ascending("vendor"),
new CreateIndexOptions { Name = "psirt_vendor" }),
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
private Task EnsureChangeHistoryIndexesAsync(CancellationToken cancellationToken)
{
var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.ChangeHistory);
var indexes = new List<CreateIndexModel<BsonDocument>>
{
new(
Builders<BsonDocument>.IndexKeys.Ascending("source").Ascending("advisoryKey").Descending("capturedAt"),
new CreateIndexOptions { Name = "history_source_advisory_capturedAt" }),
new(
Builders<BsonDocument>.IndexKeys.Descending("capturedAt"),
new CreateIndexOptions { Name = "history_capturedAt" }),
new(
Builders<BsonDocument>.IndexKeys.Ascending("documentId"),
new CreateIndexOptions { Name = "history_documentId" })
};
return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
}
}
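A minimal host-wiring sketch showing when the bootstrapper runs relative to the migration runner. Only `MongoStorageOptions`, `MongoBootstrapper`, and `MongoMigrationRunner` come from this module; the DI registration shape is an assumption (a real host would likely use a `services.AddMongoStorage(...)` extension not shown here):

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using StellaOps.Feedser.Storage.Mongo;
using StellaOps.Feedser.Storage.Mongo.Migrations;

var services = new ServiceCollection();
services.AddLogging();
services.Configure<MongoStorageOptions>(o =>
    o.ConnectionString = "mongodb://localhost:27017/feedser"); // illustrative endpoint

services.AddSingleton<IMongoDatabase>(sp =>
{
    var options = sp.GetRequiredService<IOptions<MongoStorageOptions>>().Value;
    options.EnsureValid(); // fail fast on misconfiguration
    return new MongoClient(options.ConnectionString).GetDatabase(options.GetDatabaseName());
});

// IMongoMigration implementations would be registered here; with none
// registered the runner resolves an empty sequence and returns immediately.
services.AddSingleton<MongoMigrationRunner>();
services.AddSingleton<MongoBootstrapper>();

await using var provider = services.BuildServiceProvider();

// Collections and indexes are created first, then pending migrations run —
// matching the order inside InitializeAsync.
await provider.GetRequiredService<MongoBootstrapper>()
    .InitializeAsync(CancellationToken.None);
```

Running `InitializeAsync` before the service accepts traffic is what lets the `/ready` probe treat a reachable, fully indexed database as the readiness signal.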


@@ -0,0 +1,192 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using StellaOps.Feedser.Core.Jobs;
namespace StellaOps.Feedser.Storage.Mongo;
public sealed class MongoJobStore : IJobStore
{
private static readonly string PendingStatus = JobRunStatus.Pending.ToString();
private static readonly string RunningStatus = JobRunStatus.Running.ToString();
private readonly IMongoCollection<JobRunDocument> _collection;
private readonly ILogger<MongoJobStore> _logger;
public MongoJobStore(IMongoCollection<JobRunDocument> collection, ILogger<MongoJobStore> logger)
{
_collection = collection ?? throw new ArgumentNullException(nameof(collection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<JobRunSnapshot> CreateAsync(JobRunCreateRequest request, CancellationToken cancellationToken)
{
var runId = Guid.NewGuid();
var document = JobRunDocumentExtensions.FromRequest(request, runId);
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Created job run {RunId} for {Kind} with trigger {Trigger}", runId, request.Kind, request.Trigger);
return document.ToSnapshot();
}
public async Task<JobRunSnapshot?> TryStartAsync(Guid runId, DateTimeOffset startedAt, CancellationToken cancellationToken)
{
var filter = Builders<JobRunDocument>.Filter.Eq(x => x.Id, runId)
& Builders<JobRunDocument>.Filter.Eq(x => x.Status, PendingStatus);
var update = Builders<JobRunDocument>.Update
.Set(x => x.Status, RunningStatus)
.Set(x => x.StartedAt, startedAt.UtcDateTime);
var result = await _collection.FindOneAndUpdateAsync(
filter,
update,
new FindOneAndUpdateOptions<JobRunDocument>
{
ReturnDocument = ReturnDocument.After,
},
cancellationToken).ConfigureAwait(false);
if (result is null)
{
_logger.LogDebug("Failed to start job run {RunId}; status transition rejected", runId);
return null;
}
return result.ToSnapshot();
}
public async Task<JobRunSnapshot?> TryCompleteAsync(Guid runId, JobRunCompletion completion, CancellationToken cancellationToken)
{
var filter = Builders<JobRunDocument>.Filter.Eq(x => x.Id, runId)
& Builders<JobRunDocument>.Filter.In(x => x.Status, new[] { PendingStatus, RunningStatus });
var update = Builders<JobRunDocument>.Update
.Set(x => x.Status, completion.Status.ToString())
.Set(x => x.CompletedAt, completion.CompletedAt.UtcDateTime)
.Set(x => x.Error, completion.Error);
var result = await _collection.FindOneAndUpdateAsync(
filter,
update,
new FindOneAndUpdateOptions<JobRunDocument>
{
ReturnDocument = ReturnDocument.After,
},
cancellationToken).ConfigureAwait(false);
if (result is null)
{
_logger.LogWarning("Failed to mark job run {RunId} as {Status}", runId, completion.Status);
return null;
}
return result.ToSnapshot();
}
public async Task<JobRunSnapshot?> FindAsync(Guid runId, CancellationToken cancellationToken)
{
var cursor = await _collection.FindAsync(x => x.Id == runId, cancellationToken: cancellationToken).ConfigureAwait(false);
var document = await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document?.ToSnapshot();
}
public async Task<IReadOnlyList<JobRunSnapshot>> GetRecentRunsAsync(string? kind, int limit, CancellationToken cancellationToken)
{
if (limit <= 0)
{
return Array.Empty<JobRunSnapshot>();
}
var filter = string.IsNullOrWhiteSpace(kind)
? Builders<JobRunDocument>.Filter.Empty
: Builders<JobRunDocument>.Filter.Eq(x => x.Kind, kind);
var cursor = await _collection.Find(filter)
.SortByDescending(x => x.CreatedAt)
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return cursor.Select(static doc => doc.ToSnapshot()).ToArray();
}
public async Task<IReadOnlyList<JobRunSnapshot>> GetActiveRunsAsync(CancellationToken cancellationToken)
{
var filter = Builders<JobRunDocument>.Filter.In(x => x.Status, new[] { PendingStatus, RunningStatus });
var cursor = await _collection.Find(filter)
.SortByDescending(x => x.CreatedAt)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return cursor.Select(static doc => doc.ToSnapshot()).ToArray();
}
public async Task<JobRunSnapshot?> GetLastRunAsync(string kind, CancellationToken cancellationToken)
{
var cursor = await _collection.Find(x => x.Kind == kind)
.SortByDescending(x => x.CreatedAt)
.Limit(1)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return cursor.FirstOrDefault()?.ToSnapshot();
}
public async Task<IReadOnlyDictionary<string, JobRunSnapshot>> GetLastRunsAsync(IEnumerable<string> kinds, CancellationToken cancellationToken)
{
if (kinds is null)
{
throw new ArgumentNullException(nameof(kinds));
}
var kindList = kinds
.Where(static kind => !string.IsNullOrWhiteSpace(kind))
.Select(static kind => kind.Trim())
.Distinct(StringComparer.Ordinal)
.ToArray();
if (kindList.Length == 0)
{
return new Dictionary<string, JobRunSnapshot>(StringComparer.Ordinal);
}
var matchStage = new BsonDocument("$match", new BsonDocument("kind", new BsonDocument("$in", new BsonArray(kindList))));
var sortStage = new BsonDocument("$sort", new BsonDocument("createdAt", -1));
var groupStage = new BsonDocument("$group", new BsonDocument
{
{ "_id", "$kind" },
{ "document", new BsonDocument("$first", "$$ROOT") }
});
var pipeline = new[] { matchStage, sortStage, groupStage };
var aggregate = await _collection.Aggregate<BsonDocument>(pipeline)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
var results = new Dictionary<string, JobRunSnapshot>(StringComparer.Ordinal);
foreach (var element in aggregate)
{
if (!element.TryGetValue("_id", out var idValue) || idValue.BsonType != BsonType.String)
{
continue;
}
if (!element.TryGetValue("document", out var documentValue) || documentValue.BsonType != BsonType.Document)
{
continue;
}
var document = BsonSerializer.Deserialize<JobRunDocument>(documentValue.AsBsonDocument);
results[idValue.AsString] = document.ToSnapshot();
}
return results;
}
}
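The intended run lifecycle is create → start → complete, with `TryStartAsync`/`TryCompleteAsync` acting as guarded state transitions (they return `null` when the document is no longer in an eligible status). A hedged sketch of a caller — `JobRunCreateRequest`, `JobRunSnapshot`, and `JobRunCompletion` member names (`Kind`, `Trigger`, `RunId`, `Status`, `CompletedAt`, `Error`) and the terminal `JobRunStatus` values are assumptions inferred from this store's usage, not definitions from this file:

```csharp
// Hypothetical tracked execution wrapper around IJobStore.
async Task<bool> ExecuteTrackedAsync(
    IJobStore store,
    Func<CancellationToken, Task> job,
    CancellationToken ct)
{
    var snapshot = await store.CreateAsync(
        new JobRunCreateRequest(Kind: "nvd:fetch", Trigger: "scheduler"), ct); // assumed ctor

    // Only a Pending run can be started; null means another worker won the race.
    var started = await store.TryStartAsync(snapshot.RunId, DateTimeOffset.UtcNow, ct);
    if (started is null)
    {
        return false;
    }

    try
    {
        await job(ct);
        await store.TryCompleteAsync(
            snapshot.RunId,
            new JobRunCompletion(JobRunStatus.Succeeded, DateTimeOffset.UtcNow, Error: null), ct);
        return true;
    }
    catch (Exception ex)
    {
        await store.TryCompleteAsync(
            snapshot.RunId,
            new JobRunCompletion(JobRunStatus.Failed, DateTimeOffset.UtcNow, ex.Message), ct);
        throw;
    }
}
```

Note that `TryCompleteAsync` also accepts runs still in `Pending`, which lets a scheduler mark a never-started run as cancelled or failed without first transitioning it to `Running`.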


@@ -0,0 +1,116 @@
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using StellaOps.Feedser.Core.Jobs;
namespace StellaOps.Feedser.Storage.Mongo;
public sealed class MongoLeaseStore : ILeaseStore
{
private readonly IMongoCollection<JobLeaseDocument> _collection;
private readonly ILogger<MongoLeaseStore> _logger;
public MongoLeaseStore(IMongoCollection<JobLeaseDocument> collection, ILogger<MongoLeaseStore> logger)
{
_collection = collection ?? throw new ArgumentNullException(nameof(collection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<JobLease?> TryAcquireAsync(string key, string holder, TimeSpan leaseDuration, DateTimeOffset now, CancellationToken cancellationToken)
{
var nowUtc = now.UtcDateTime;
var ttlUtc = nowUtc.Add(leaseDuration);
var filter = Builders<JobLeaseDocument>.Filter.Eq(x => x.Key, key)
& Builders<JobLeaseDocument>.Filter.Or(
Builders<JobLeaseDocument>.Filter.Lte(x => x.TtlAt, nowUtc),
Builders<JobLeaseDocument>.Filter.Eq(x => x.Holder, holder));
var update = Builders<JobLeaseDocument>.Update
.Set(x => x.Holder, holder)
.Set(x => x.AcquiredAt, nowUtc)
.Set(x => x.HeartbeatAt, nowUtc)
.Set(x => x.LeaseMs, (long)leaseDuration.TotalMilliseconds)
.Set(x => x.TtlAt, ttlUtc);
var options = new FindOneAndUpdateOptions<JobLeaseDocument>
{
ReturnDocument = ReturnDocument.After,
};
var updated = await _collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken).ConfigureAwait(false);
if (updated is not null)
{
_logger.LogDebug("Lease {Key} acquired by {Holder}", key, holder);
return updated.ToLease();
}
try
{
var document = new JobLeaseDocument
{
Key = key,
Holder = holder,
AcquiredAt = nowUtc,
HeartbeatAt = nowUtc,
LeaseMs = (long)leaseDuration.TotalMilliseconds,
TtlAt = ttlUtc,
};
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Lease {Key} inserted for {Holder}", key, holder);
return document.ToLease();
}
catch (MongoWriteException ex) when (ex.WriteError.Category == ServerErrorCategory.DuplicateKey)
{
_logger.LogDebug(ex, "Lease {Key} already held by another process", key);
return null;
}
}
public async Task<JobLease?> HeartbeatAsync(string key, string holder, TimeSpan leaseDuration, DateTimeOffset now, CancellationToken cancellationToken)
{
var nowUtc = now.UtcDateTime;
var ttlUtc = nowUtc.Add(leaseDuration);
var filter = Builders<JobLeaseDocument>.Filter.Eq(x => x.Key, key)
& Builders<JobLeaseDocument>.Filter.Eq(x => x.Holder, holder);
var update = Builders<JobLeaseDocument>.Update
.Set(x => x.HeartbeatAt, nowUtc)
.Set(x => x.LeaseMs, (long)leaseDuration.TotalMilliseconds)
.Set(x => x.TtlAt, ttlUtc);
var updated = await _collection.FindOneAndUpdateAsync(
filter,
update,
new FindOneAndUpdateOptions<JobLeaseDocument>
{
ReturnDocument = ReturnDocument.After,
},
cancellationToken).ConfigureAwait(false);
if (updated is null)
{
_logger.LogDebug("Heartbeat rejected for lease {Key} held by {Holder}", key, holder);
}
return updated?.ToLease();
}
public async Task<bool> ReleaseAsync(string key, string holder, CancellationToken cancellationToken)
{
var result = await _collection.DeleteOneAsync(
Builders<JobLeaseDocument>.Filter.Eq(x => x.Key, key)
& Builders<JobLeaseDocument>.Filter.Eq(x => x.Holder, holder),
cancellationToken).ConfigureAwait(false);
if (result.DeletedCount == 0)
{
_logger.LogDebug("Lease {Key} not released by {Holder}; no matching document", key, holder);
return false;
}
_logger.LogDebug("Lease {Key} released by {Holder}", key, holder);
return true;
}
}
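The store implements the usual distributed-lock pattern: acquire (or re-acquire an expired/owned lease), heartbeat at a fraction of the lease duration to push `ttlAt` forward, and release in a `finally`. A usage sketch — the key, holder format, and durations are illustrative:

```csharp
// Hypothetical wrapper that runs `work` only while holding the named lease.
async Task WithLeaseAsync(
    ILeaseStore leases,
    Func<CancellationToken, Task> work,
    CancellationToken ct)
{
    const string key = "scheduler:nvd:fetch";
    var holder = $"{Environment.MachineName}:{Environment.ProcessId}";
    var duration = TimeSpan.FromMinutes(2);

    var lease = await leases.TryAcquireAsync(key, holder, duration, DateTimeOffset.UtcNow, ct);
    if (lease is null)
    {
        return; // another holder owns a live lease (duplicate-key path above)
    }

    using var pumpCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var pump = Task.Run(async () =>
    {
        try
        {
            while (true)
            {
                // Heartbeat well before expiry; a null result would mean the
                // lease was lost (e.g. stolen after the TTL elapsed).
                await Task.Delay(duration / 3, pumpCts.Token).ConfigureAwait(false);
                await leases.HeartbeatAsync(key, holder, duration, DateTimeOffset.UtcNow, pumpCts.Token)
                    .ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // normal shutdown of the heartbeat pump
        }
    });

    try
    {
        await work(ct).ConfigureAwait(false);
    }
    finally
    {
        pumpCts.Cancel();
        await pump.ConfigureAwait(false);
        await leases.ReleaseAsync(key, holder, CancellationToken.None).ConfigureAwait(false);
    }
}
```

The `Lte(x => x.TtlAt, nowUtc)` branch in `TryAcquireAsync` is what makes expired leases stealable even though the TTL index on `ttlAt` may not have physically deleted the document yet (Mongo's TTL monitor runs roughly once a minute).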


@@ -0,0 +1,112 @@
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo;
public sealed class MongoSourceStateRepository : ISourceStateRepository
{
private readonly IMongoCollection<SourceStateDocument> _collection;
private const int MaxFailureReasonLength = 1024;
private readonly ILogger<MongoSourceStateRepository> _logger;
public MongoSourceStateRepository(IMongoDatabase database, ILogger<MongoSourceStateRepository> logger)
{
_collection = (database ?? throw new ArgumentNullException(nameof(database)))
.GetCollection<SourceStateDocument>(MongoStorageDefaults.Collections.SourceState);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<SourceStateRecord?> TryGetAsync(string sourceName, CancellationToken cancellationToken)
{
var cursor = await _collection.FindAsync(x => x.SourceName == sourceName, cancellationToken: cancellationToken).ConfigureAwait(false);
var document = await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document?.ToRecord();
}
public async Task<SourceStateRecord> UpsertAsync(SourceStateRecord record, CancellationToken cancellationToken)
{
var document = SourceStateDocumentExtensions.FromRecord(record with { UpdatedAt = DateTimeOffset.UtcNow });
await _collection.ReplaceOneAsync(
x => x.SourceName == record.SourceName,
document,
new ReplaceOptions { IsUpsert = true },
cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Upserted source state for {Source}", record.SourceName);
return document.ToRecord();
}
public async Task<SourceStateRecord?> UpdateCursorAsync(string sourceName, BsonDocument cursor, DateTimeOffset completedAt, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(sourceName);
var update = Builders<SourceStateDocument>.Update
.Set(x => x.Cursor, cursor ?? new BsonDocument())
.Set(x => x.LastSuccess, completedAt.UtcDateTime)
.Set(x => x.FailCount, 0)
.Set(x => x.BackoffUntil, (DateTime?)null)
.Set(x => x.LastFailureReason, null)
.Set(x => x.UpdatedAt, DateTime.UtcNow)
.SetOnInsert(x => x.SourceName, sourceName);
var options = new FindOneAndUpdateOptions<SourceStateDocument>
{
ReturnDocument = ReturnDocument.After,
IsUpsert = true,
};
var document = await _collection
.FindOneAndUpdateAsync<SourceStateDocument, SourceStateDocument>(
x => x.SourceName == sourceName,
update,
options,
cancellationToken)
.ConfigureAwait(false);
return document?.ToRecord();
}
public async Task<SourceStateRecord?> MarkFailureAsync(string sourceName, DateTimeOffset failedAt, TimeSpan? backoff, string? failureReason, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(sourceName);
var reasonValue = NormalizeFailureReason(failureReason);
var update = Builders<SourceStateDocument>.Update
.Inc(x => x.FailCount, 1)
.Set(x => x.LastFailure, failedAt.UtcDateTime)
.Set(x => x.BackoffUntil, backoff.HasValue ? failedAt.UtcDateTime.Add(backoff.Value) : null)
.Set(x => x.LastFailureReason, reasonValue)
.Set(x => x.UpdatedAt, DateTime.UtcNow)
.SetOnInsert(x => x.SourceName, sourceName);
var options = new FindOneAndUpdateOptions<SourceStateDocument>
{
ReturnDocument = ReturnDocument.After,
IsUpsert = true,
};
var document = await _collection
.FindOneAndUpdateAsync<SourceStateDocument, SourceStateDocument>(
x => x.SourceName == sourceName,
update,
options,
cancellationToken)
.ConfigureAwait(false);
return document?.ToRecord();
}
private static string? NormalizeFailureReason(string? reason)
{
if (string.IsNullOrWhiteSpace(reason))
{
return null;
}
var trimmed = reason.Trim();
if (trimmed.Length <= MaxFailureReasonLength)
{
return trimmed;
}
return trimmed[..MaxFailureReasonLength];
}
}
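Connectors are expected to call `UpdateCursorAsync` after a successful fetch (which atomically resets `failCount`, `backoffUntil`, and the failure reason) and `MarkFailureAsync` otherwise. A sketch of that flow — the exponential-backoff policy and the `FailCount` property name on `SourceStateRecord` are assumptions; only the repository calls come from this file:

```csharp
using MongoDB.Bson;

// Hypothetical per-fetch bookkeeping for a connector named "nvd".
async Task RecordFetchResultAsync(
    ISourceStateRepository states,
    bool succeeded,
    BsonDocument cursor,
    CancellationToken ct)
{
    var now = DateTimeOffset.UtcNow;
    if (succeeded)
    {
        // Advances the cursor and clears failCount/backoffUntil/lastFailureReason.
        await states.UpdateCursorAsync("nvd", cursor, now, ct);
        return;
    }

    // Capped exponential backoff derived from the persisted failure count.
    var current = await states.TryGetAsync("nvd", ct);
    var failures = (current?.FailCount ?? 0) + 1; // FailCount name assumed
    var backoff = TimeSpan.FromMinutes(Math.Min(Math.Pow(2, failures), 60));
    await states.MarkFailureAsync("nvd", now, backoff, "fetch failed", ct);
}
```

A scheduler would then skip any source whose `backoffUntil` lies in the future, which is why `MarkFailureAsync` stores `failedAt + backoff` rather than the raw interval.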


@@ -0,0 +1,28 @@
namespace StellaOps.Feedser.Storage.Mongo;
public static class MongoStorageDefaults
{
public const string DefaultDatabaseName = "feedser";
public static class Collections
{
public const string Source = "source";
public const string SourceState = "source_state";
public const string Document = "document";
public const string Dto = "dto";
public const string Advisory = "advisory";
public const string Alias = "alias";
public const string Affected = "affected";
public const string Reference = "reference";
public const string KevFlag = "kev_flag";
public const string RuFlags = "ru_flags";
public const string JpFlags = "jp_flags";
public const string PsirtFlags = "psirt_flags";
public const string MergeEvent = "merge_event";
public const string ExportState = "export_state";
public const string Locks = "locks";
public const string Jobs = "jobs";
public const string Migrations = "schema_migrations";
public const string ChangeHistory = "source_change_history";
}
}


@@ -0,0 +1,78 @@
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo;
public sealed class MongoStorageOptions
{
public string ConnectionString { get; set; } = string.Empty;
public string? DatabaseName { get; set; }
public TimeSpan CommandTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Retention period for raw documents (document + DTO + GridFS payloads).
/// Set to <see cref="TimeSpan.Zero"/> to disable automatic expiry.
/// </summary>
public TimeSpan RawDocumentRetention { get; set; } = TimeSpan.FromDays(45);
/// <summary>
/// Additional grace period applied on top of <see cref="RawDocumentRetention"/> before TTL purges old rows.
/// Allows the retention background service to delete GridFS blobs first.
/// </summary>
public TimeSpan RawDocumentRetentionTtlGrace { get; set; } = TimeSpan.FromDays(1);
/// <summary>
/// Interval between retention sweeps. Only used when <see cref="RawDocumentRetention"/> is greater than zero.
/// </summary>
public TimeSpan RawDocumentRetentionSweepInterval { get; set; } = TimeSpan.FromHours(6);
public string GetDatabaseName()
{
if (!string.IsNullOrWhiteSpace(DatabaseName))
{
return DatabaseName.Trim();
}
if (!string.IsNullOrWhiteSpace(ConnectionString))
{
var url = MongoUrl.Create(ConnectionString);
if (!string.IsNullOrWhiteSpace(url.DatabaseName))
{
return url.DatabaseName;
}
}
return MongoStorageDefaults.DefaultDatabaseName;
}
public void EnsureValid()
{
if (string.IsNullOrWhiteSpace(ConnectionString))
{
throw new InvalidOperationException("Mongo connection string is not configured.");
}
if (CommandTimeout <= TimeSpan.Zero)
{
throw new InvalidOperationException("Command timeout must be greater than zero.");
}
if (RawDocumentRetention < TimeSpan.Zero)
{
throw new InvalidOperationException("Raw document retention cannot be negative.");
}
if (RawDocumentRetentionTtlGrace < TimeSpan.Zero)
{
throw new InvalidOperationException("Raw document retention TTL grace cannot be negative.");
}
if (RawDocumentRetention > TimeSpan.Zero && RawDocumentRetentionSweepInterval <= TimeSpan.Zero)
{
throw new InvalidOperationException("Raw document retention sweep interval must be positive when retention is enabled.");
}
_ = GetDatabaseName();
}
}
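A small binding sketch showing the validation and database-name resolution order (`DatabaseName` → connection-string database → the `"feedser"` default). The `"Mongo"` section name is illustrative; the actual configuration key used by the host is not defined in this file:

```csharp
using Microsoft.Extensions.Configuration;

var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        ["Mongo:ConnectionString"] = "mongodb://localhost:27017", // no db segment
        ["Mongo:DatabaseName"] = "feedser",
        ["Mongo:RawDocumentRetention"] = "30.00:00:00", // 30 days; "0" disables expiry
    })
    .Build();

var options = new MongoStorageOptions();
configuration.GetSection("Mongo").Bind(options);

options.EnsureValid(); // throws InvalidOperationException on bad values
var dbName = options.GetDatabaseName(); // "feedser" via DatabaseName here
```

Setting `RawDocumentRetention` to zero both disables the TTL behavior on `document.expiresAt` and skips the GridFS `metadata.expiresAt` TTL index in the bootstrapper above.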


@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("StellaOps.Feedser.Storage.Mongo.Tests")]


@@ -0,0 +1,11 @@
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Feedser.Storage.Mongo.PsirtFlags;
public interface IPsirtFlagStore
{
Task UpsertAsync(PsirtFlagRecord record, CancellationToken cancellationToken);
Task<PsirtFlagRecord?> FindAsync(string advisoryKey, CancellationToken cancellationToken);
}


@@ -0,0 +1,52 @@
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo.PsirtFlags;
[BsonIgnoreExtraElements]
public sealed class PsirtFlagDocument
{
    // Mapped to _id by [BsonId]; a separate "advisoryKey" element attribute
    // would be ignored by the driver, so the key is stored only as _id.
    [BsonId]
    public string AdvisoryKey { get; set; } = string.Empty;
[BsonElement("vendor")]
public string Vendor { get; set; } = string.Empty;
[BsonElement("sourceName")]
public string SourceName { get; set; } = string.Empty;
[BsonElement("advisoryIdText")]
public string AdvisoryIdText { get; set; } = string.Empty;
[BsonElement("flaggedAt")]
public DateTime FlaggedAt { get; set; }
}
internal static class PsirtFlagDocumentExtensions
{
public static PsirtFlagDocument FromRecord(PsirtFlagRecord record)
{
ArgumentNullException.ThrowIfNull(record);
return new PsirtFlagDocument
{
AdvisoryKey = string.IsNullOrWhiteSpace(record.AdvisoryKey) ? record.AdvisoryIdText : record.AdvisoryKey,
Vendor = record.Vendor,
SourceName = record.SourceName,
AdvisoryIdText = record.AdvisoryIdText,
FlaggedAt = record.FlaggedAt.UtcDateTime,
};
}
public static PsirtFlagRecord ToRecord(this PsirtFlagDocument document)
{
ArgumentNullException.ThrowIfNull(document);
return new PsirtFlagRecord(
document.AdvisoryKey,
document.Vendor,
document.SourceName,
document.AdvisoryIdText,
DateTime.SpecifyKind(document.FlaggedAt, DateTimeKind.Utc));
}
}


@@ -0,0 +1,15 @@
namespace StellaOps.Feedser.Storage.Mongo.PsirtFlags;
/// <summary>
/// Describes a PSIRT precedence flag for a canonical advisory.
/// </summary>
public sealed record PsirtFlagRecord(
string AdvisoryKey,
string Vendor,
string SourceName,
string AdvisoryIdText,
DateTimeOffset FlaggedAt)
{
public PsirtFlagRecord WithFlaggedAt(DateTimeOffset flaggedAt)
=> this with { FlaggedAt = flaggedAt.ToUniversalTime() };
}


@@ -0,0 +1,50 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
namespace StellaOps.Feedser.Storage.Mongo.PsirtFlags;
public sealed class PsirtFlagStore : IPsirtFlagStore
{
private readonly IMongoCollection<PsirtFlagDocument> _collection;
private readonly ILogger<PsirtFlagStore> _logger;
public PsirtFlagStore(IMongoDatabase database, ILogger<PsirtFlagStore> logger)
{
ArgumentNullException.ThrowIfNull(database);
ArgumentNullException.ThrowIfNull(logger);
_collection = database.GetCollection<PsirtFlagDocument>(MongoStorageDefaults.Collections.PsirtFlags);
_logger = logger;
}
public async Task UpsertAsync(PsirtFlagRecord record, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(record);
ArgumentException.ThrowIfNullOrEmpty(record.AdvisoryKey);
var document = PsirtFlagDocumentExtensions.FromRecord(record);
var filter = Builders<PsirtFlagDocument>.Filter.Eq(x => x.AdvisoryKey, record.AdvisoryKey);
var options = new ReplaceOptions { IsUpsert = true };
try
{
await _collection.ReplaceOneAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Upserted PSIRT flag for {AdvisoryKey}", record.AdvisoryKey);
}
catch (MongoWriteException ex) when (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
_logger.LogWarning(ex, "Duplicate PSIRT flag detected for {AdvisoryKey}", record.AdvisoryKey);
}
}
public async Task<PsirtFlagRecord?> FindAsync(string advisoryKey, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(advisoryKey);
var filter = Builders<PsirtFlagDocument>.Filter.Eq(x => x.AdvisoryKey, advisoryKey);
var document = await _collection.Find(filter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document?.ToRecord();
}
}


@@ -0,0 +1,155 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using MongoDB.Driver.GridFS;
using StellaOps.Feedser.Storage.Mongo.Documents;
using StellaOps.Feedser.Storage.Mongo.Dtos;
namespace StellaOps.Feedser.Storage.Mongo;
/// <summary>
/// Periodically purges expired raw documents, associated DTO payloads, and GridFS content.
/// Complements TTL indexes by ensuring deterministic cleanup before Mongo's background sweeper runs.
/// </summary>
internal sealed class RawDocumentRetentionService : BackgroundService
{
private readonly IMongoCollection<DocumentDocument> _documents;
private readonly IMongoCollection<DtoDocument> _dtos;
private readonly GridFSBucket _bucket;
private readonly MongoStorageOptions _options;
private readonly ILogger<RawDocumentRetentionService> _logger;
private readonly TimeProvider _timeProvider;
public RawDocumentRetentionService(
IMongoDatabase database,
IOptions<MongoStorageOptions> options,
ILogger<RawDocumentRetentionService> logger,
TimeProvider? timeProvider = null)
{
ArgumentNullException.ThrowIfNull(database);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(logger);
_documents = database.GetCollection<DocumentDocument>(MongoStorageDefaults.Collections.Document);
_dtos = database.GetCollection<DtoDocument>(MongoStorageDefaults.Collections.Dto);
_bucket = new GridFSBucket(database, new GridFSBucketOptions
{
BucketName = "documents",
ReadConcern = database.Settings.ReadConcern,
WriteConcern = database.Settings.WriteConcern,
});
_options = options.Value;
_logger = logger;
_timeProvider = timeProvider ?? TimeProvider.System;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (_options.RawDocumentRetention <= TimeSpan.Zero)
{
_logger.LogInformation("Raw document retention disabled; purge service idle.");
return;
}
var sweepInterval = _options.RawDocumentRetentionSweepInterval > TimeSpan.Zero
? _options.RawDocumentRetentionSweepInterval
: TimeSpan.FromHours(6);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await SweepExpiredDocumentsAsync(stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Raw document retention sweep failed");
}
try
{
await Task.Delay(sweepInterval, stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
}
}
internal async Task<int> SweepExpiredDocumentsAsync(CancellationToken cancellationToken)
{
var grace = _options.RawDocumentRetentionTtlGrace >= TimeSpan.Zero
? _options.RawDocumentRetentionTtlGrace
: TimeSpan.Zero;
var threshold = _timeProvider.GetUtcNow() + grace;
var filterBuilder = Builders<DocumentDocument>.Filter;
var filter = filterBuilder.And(
filterBuilder.Ne(doc => doc.ExpiresAt, null),
filterBuilder.Lte(doc => doc.ExpiresAt, threshold.UtcDateTime));
var removed = 0;
while (!cancellationToken.IsCancellationRequested)
{
var batch = await _documents
.Find(filter)
.SortBy(doc => doc.ExpiresAt)
.Limit(200)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
if (batch.Count == 0)
{
break;
}
foreach (var document in batch)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}
await PurgeDocumentAsync(document, cancellationToken).ConfigureAwait(false);
removed++;
}
}
if (removed > 0)
{
_logger.LogInformation("Purged {Count} expired raw documents (threshold <= {Threshold})", removed, threshold);
}
return removed;
}
private async Task PurgeDocumentAsync(DocumentDocument document, CancellationToken cancellationToken)
{
if (document.GridFsId.HasValue)
{
try
{
await _bucket.DeleteAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
}
catch (GridFSFileNotFoundException)
{
// already removed or TTL swept
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to delete GridFS payload {GridFsId} for document {DocumentId}", document.GridFsId, document.Id);
}
}
await _dtos.DeleteManyAsync(x => x.DocumentId == document.Id, cancellationToken).ConfigureAwait(false);
await _documents.DeleteOneAsync(x => x.Id == document.Id, cancellationToken).ConfigureAwait(false);
}
}
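The sweep above selects documents whose `expiresAt` falls at or before `now + grace` and deletes them in batches of 200 until none remain, so cleanup stays deterministic even before Mongo's TTL sweeper fires. A minimal in-memory model of that loop in Python (a sketch of the control flow only; the real service also deletes GridFS payloads and DTOs per document):

```python
from datetime import datetime, timedelta, timezone

def sweep_expired(documents, now, grace=timedelta(), batch_size=200):
    """Model of SweepExpiredDocumentsAsync: remove documents whose
    'expiresAt' is at or before now + grace, oldest first, in bounded
    batches. `documents` is a mutable list of dicts."""
    threshold = now + max(grace, timedelta())  # negative grace clamps to zero
    removed = 0
    while True:
        batch = sorted(
            (d for d in documents
             if d.get("expiresAt") is not None and d["expiresAt"] <= threshold),
            key=lambda d: d["expiresAt"],
        )[:batch_size]
        if not batch:
            break
        for doc in batch:
            documents.remove(doc)  # stands in for GridFS + DTO + document deletes
            removed += 1
    return removed
```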


@@ -0,0 +1,88 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using StellaOps.Feedser.Core.Jobs;
using StellaOps.Feedser.Storage.Mongo.Advisories;
using StellaOps.Feedser.Storage.Mongo.ChangeHistory;
using StellaOps.Feedser.Storage.Mongo.Documents;
using StellaOps.Feedser.Storage.Mongo.Dtos;
using StellaOps.Feedser.Storage.Mongo.Exporting;
using StellaOps.Feedser.Storage.Mongo.JpFlags;
using StellaOps.Feedser.Storage.Mongo.MergeEvents;
using StellaOps.Feedser.Storage.Mongo.PsirtFlags;
using StellaOps.Feedser.Storage.Mongo.Migrations;
namespace StellaOps.Feedser.Storage.Mongo;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddMongoStorage(this IServiceCollection services, Action<MongoStorageOptions> configureOptions)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configureOptions);
services.AddOptions<MongoStorageOptions>()
.Configure(configureOptions)
.PostConfigure(static options => options.EnsureValid());
services.TryAddSingleton(TimeProvider.System);
services.AddSingleton<IMongoClient>(static sp =>
{
var options = sp.GetRequiredService<IOptions<MongoStorageOptions>>().Value;
return new MongoClient(options.ConnectionString);
});
services.AddSingleton(static sp =>
{
var options = sp.GetRequiredService<IOptions<MongoStorageOptions>>().Value;
var client = sp.GetRequiredService<IMongoClient>();
var settings = new MongoDatabaseSettings
{
ReadConcern = ReadConcern.Majority,
WriteConcern = WriteConcern.WMajority,
ReadPreference = ReadPreference.PrimaryPreferred,
};
var database = client.GetDatabase(options.GetDatabaseName(), settings);
var writeConcern = database.Settings.WriteConcern.With(wTimeout: options.CommandTimeout);
return database.WithWriteConcern(writeConcern);
});
services.AddSingleton<MongoBootstrapper>();
services.AddSingleton<IJobStore, MongoJobStore>();
services.AddSingleton<ILeaseStore, MongoLeaseStore>();
services.AddSingleton<ISourceStateRepository, MongoSourceStateRepository>();
services.AddSingleton<IDocumentStore, DocumentStore>();
services.AddSingleton<IDtoStore, DtoStore>();
services.AddSingleton<IAdvisoryStore, AdvisoryStore>();
services.AddSingleton<IChangeHistoryStore, MongoChangeHistoryStore>();
services.AddSingleton<IJpFlagStore, JpFlagStore>();
services.AddSingleton<IPsirtFlagStore, PsirtFlagStore>();
services.AddSingleton<IMergeEventStore, MergeEventStore>();
services.AddSingleton<IExportStateStore, ExportStateStore>();
services.TryAddSingleton<ExportStateManager>();
services.AddSingleton<IMongoCollection<JobRunDocument>>(static sp =>
{
var database = sp.GetRequiredService<IMongoDatabase>();
return database.GetCollection<JobRunDocument>(MongoStorageDefaults.Collections.Jobs);
});
services.AddSingleton<IMongoCollection<JobLeaseDocument>>(static sp =>
{
var database = sp.GetRequiredService<IMongoDatabase>();
return database.GetCollection<JobLeaseDocument>(MongoStorageDefaults.Collections.Locks);
});
services.AddHostedService<RawDocumentRetentionService>();
services.AddSingleton<MongoMigrationRunner>();
services.AddSingleton<IMongoMigration, EnsureDocumentExpiryIndexesMigration>();
services.AddSingleton<IMongoMigration, EnsureGridFsExpiryIndexesMigration>();
return services;
}
}


@@ -0,0 +1,73 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
namespace StellaOps.Feedser.Storage.Mongo;
[BsonIgnoreExtraElements]
public sealed class SourceStateDocument
{
[BsonId]
public string SourceName { get; set; } = string.Empty;
[BsonElement("enabled")]
public bool Enabled { get; set; } = true;
[BsonElement("paused")]
public bool Paused { get; set; }
[BsonElement("cursor")]
public BsonDocument Cursor { get; set; } = new();
[BsonElement("lastSuccess")]
[BsonIgnoreIfNull]
public DateTime? LastSuccess { get; set; }
[BsonElement("lastFailure")]
[BsonIgnoreIfNull]
public DateTime? LastFailure { get; set; }
[BsonElement("failCount")]
public int FailCount { get; set; }
[BsonElement("backoffUntil")]
[BsonIgnoreIfNull]
public DateTime? BackoffUntil { get; set; }
[BsonElement("updatedAt")]
public DateTime UpdatedAt { get; set; }
[BsonElement("lastFailureReason")]
[BsonIgnoreIfNull]
public string? LastFailureReason { get; set; }
}
internal static class SourceStateDocumentExtensions
{
public static SourceStateDocument FromRecord(SourceStateRecord record)
=> new()
{
SourceName = record.SourceName,
Enabled = record.Enabled,
Paused = record.Paused,
Cursor = record.Cursor ?? new BsonDocument(),
LastSuccess = record.LastSuccess?.UtcDateTime,
LastFailure = record.LastFailure?.UtcDateTime,
FailCount = record.FailCount,
BackoffUntil = record.BackoffUntil?.UtcDateTime,
UpdatedAt = record.UpdatedAt.UtcDateTime,
LastFailureReason = record.LastFailureReason,
};
public static SourceStateRecord ToRecord(this SourceStateDocument document)
=> new(
document.SourceName,
document.Enabled,
document.Paused,
document.Cursor ?? new BsonDocument(),
document.LastSuccess.HasValue ? DateTime.SpecifyKind(document.LastSuccess.Value, DateTimeKind.Utc) : null,
document.LastFailure.HasValue ? DateTime.SpecifyKind(document.LastFailure.Value, DateTimeKind.Utc) : null,
document.FailCount,
document.BackoffUntil.HasValue ? DateTime.SpecifyKind(document.BackoffUntil.Value, DateTimeKind.Utc) : null,
DateTime.SpecifyKind(document.UpdatedAt, DateTimeKind.Utc),
document.LastFailureReason);
}


@@ -0,0 +1,15 @@
using MongoDB.Bson;
namespace StellaOps.Feedser.Storage.Mongo;
public sealed record SourceStateRecord(
string SourceName,
bool Enabled,
bool Paused,
BsonDocument Cursor,
DateTimeOffset? LastSuccess,
DateTimeOffset? LastFailure,
int FailCount,
DateTimeOffset? BackoffUntil,
DateTimeOffset UpdatedAt,
string? LastFailureReason);


@@ -0,0 +1,19 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Feedser.Storage.Mongo;
public static class SourceStateRepositoryExtensions
{
public static Task<SourceStateRecord?> MarkFailureAsync(
this ISourceStateRepository repository,
string sourceName,
DateTimeOffset failedAt,
TimeSpan? backoff,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(repository);
return repository.MarkFailureAsync(sourceName, failedAt, backoff, failureReason: null, cancellationToken);
}
}
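This extension simply forwards to the richer overload with `failureReason: null`. The failure bookkeeping it drives on `SourceState` (increment `failCount`, record `lastFailure`, schedule `backoffUntil`, keep a length-guarded `lastFailureReason`, and reset everything on success) can be sketched as follows in Python; field names mirror the document, but the helpers and the 512-character guard are assumptions for illustration:

```python
from datetime import datetime, timedelta, timezone

def mark_failure(state, failed_at, backoff=None, reason=None, max_reason_len=512):
    """Model of MarkFailureAsync: bump failCount, record the failure
    time, optionally open a backoff window, and store a truncated
    failure reason."""
    state["failCount"] = state.get("failCount", 0) + 1
    state["lastFailure"] = failed_at
    state["backoffUntil"] = failed_at + backoff if backoff else None
    state["lastFailureReason"] = reason[:max_reason_len] if reason else None
    state["updatedAt"] = failed_at
    return state

def mark_success(state, succeeded_at):
    """Model of the success path: failure bookkeeping resets."""
    state.update(failCount=0, lastSuccess=succeeded_at,
                 backoffUntil=None, lastFailureReason=None,
                 updatedAt=succeeded_at)
    return state
```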


@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<LangVersion>preview</LangVersion>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.22.0" />
<PackageReference Include="MongoDB.Driver.GridFS" Version="2.22.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\StellaOps.Feedser.Core\StellaOps.Feedser.Core.csproj" />
<ProjectReference Include="..\StellaOps.Feedser.Models\StellaOps.Feedser.Models.csproj" />
</ItemGroup>
</Project>


@@ -0,0 +1,15 @@
# TASKS
| Task | Owner(s) | Depends on | Notes |
|---|---|---|---|
|MongoBootstrapper to create collections/indexes|BE-Storage|Storage.Mongo|DONE `MongoBootstrapper` ensures collections & indexes incl. TTL on locks.ttlAt.|
|SourceState repository (get/set/backoff)|BE-Conn-Base|Storage.Mongo|DONE implemented `MongoSourceStateRepository`.|
|Document/DTO stores with SHA/metadata|BE-Conn-Base|Storage.Mongo|DONE DocumentStore and DtoStore provide upsert/status lookups.|
|AdvisoryStore (GetAllAsync etc.)|BE-Export|Models|DONE AdvisoryStore handles upsert + recent/advisory fetches.|
|Job store (runs/active/recent)|BE-Core|Storage.Mongo|DONE `MongoJobStore` covers create/start/complete queries.|
|Alias and reference secondary indexes|BE-Storage|Models|DONE bootstrapper builds alias/reference indexes.|
|MergeEvent store|BE-Merge|Models|DONE MergeEventStore appends/retrieves recent events.|
|ExportState store|BE-Export|Exporters|DONE ExportStateStore upserts and retrieves exporter metadata.|
|Performance tests for large advisories|QA|Storage.Mongo|DONE `AdvisoryStorePerformanceTests` exercises large payload upsert/find throughput budgets.|
|Migration playbook for schema/index changes|BE-Storage|Storage.Mongo|DONE `MongoMigrationRunner` executes `IMongoMigration` steps recorded in `schema_migrations`; see `MIGRATIONS.md`.|
|Raw document retention/TTL strategy|BE-Storage|Storage.Mongo|DONE retention options flow into `RawDocumentRetentionService` and TTL migrations for `document`/GridFS indexes.|
|Persist last failure reason in SourceState|BE-Storage|Storage.Mongo|DONE `MongoSourceStateRepository.MarkFailureAsync` stores `lastFailureReason` with length guard + reset on success.|