Files
git.stella-ops.org/src/StellaOps.Feedser.Storage.Mongo/MongoBootstrapper.cs
Vladimir Moushkov d0c95cf328
Some checks failed
Build Test Deploy / build-test (push) Has been cancelled
Build Test Deploy / docs (push) Has been cancelled
Build Test Deploy / deploy (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
UP
2025-10-09 18:59:17 +03:00

309 lines
13 KiB
C#

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Feedser.Storage.Mongo.Migrations;
namespace StellaOps.Feedser.Storage.Mongo;
/// <summary>
/// Ensures required collections and indexes exist before the service begins processing.
/// </summary>
/// <remarks>
/// <see cref="InitializeAsync"/> creates any missing collection listed in
/// <see cref="RequiredCollections"/>, ensures the indexes for each collection, and then
/// runs the <see cref="MongoMigrationRunner"/>. The bootstrapper is written so that it can
/// be executed on every start-up: existing collections are skipped and index creation is
/// re-issued with the same names and options each time.
/// </remarks>
public sealed class MongoBootstrapper
{
    /// <summary>Bucket prefix used to derive the GridFS files collection name (<c>documents.files</c>).</summary>
    private const string RawDocumentBucketName = "documents";

    /// <summary>Name of the <c>expiresAt</c> index on the document collection when retention (TTL) is enabled.</summary>
    private const string DocumentExpiresTtlIndexName = "document_expiresAt_ttl";

    /// <summary>Name of the <c>expiresAt</c> index on the document collection when retention (TTL) is disabled.</summary>
    private const string DocumentExpiresPlainIndexName = "document_expiresAt";

    /// <summary>Collections that must exist before index creation and migrations run.</summary>
    private static readonly string[] RequiredCollections =
    {
        MongoStorageDefaults.Collections.Source,
        MongoStorageDefaults.Collections.SourceState,
        MongoStorageDefaults.Collections.Document,
        MongoStorageDefaults.Collections.Dto,
        MongoStorageDefaults.Collections.Advisory,
        MongoStorageDefaults.Collections.Alias,
        MongoStorageDefaults.Collections.Affected,
        MongoStorageDefaults.Collections.Reference,
        MongoStorageDefaults.Collections.KevFlag,
        MongoStorageDefaults.Collections.RuFlags,
        MongoStorageDefaults.Collections.JpFlags,
        MongoStorageDefaults.Collections.PsirtFlags,
        MongoStorageDefaults.Collections.MergeEvent,
        MongoStorageDefaults.Collections.ExportState,
        MongoStorageDefaults.Collections.ChangeHistory,
        MongoStorageDefaults.Collections.Locks,
        MongoStorageDefaults.Collections.Jobs,
        MongoStorageDefaults.Collections.Migrations,
    };

    private readonly IMongoDatabase _database;
    private readonly MongoStorageOptions _options;
    private readonly ILogger<MongoBootstrapper> _logger;
    private readonly MongoMigrationRunner _migrationRunner;

    /// <summary>
    /// Creates a bootstrapper bound to the given database, storage options, logger and migration runner.
    /// </summary>
    /// <param name="database">Database in which collections and indexes are ensured.</param>
    /// <param name="options">Storage options; <c>RawDocumentRetention</c> controls whether TTL indexes are created.</param>
    /// <param name="logger">Logger used for bootstrap progress messages.</param>
    /// <param name="migrationRunner">Runner invoked after collections and indexes are in place.</param>
    /// <exception cref="ArgumentNullException">
    /// Any argument is <c>null</c>, or <paramref name="options"/> carries a <c>null</c> value.
    /// </exception>
    public MongoBootstrapper(
        IMongoDatabase database,
        IOptions<MongoStorageOptions> options,
        ILogger<MongoBootstrapper> logger,
        MongoMigrationRunner migrationRunner)
    {
        _database = database ?? throw new ArgumentNullException(nameof(database));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _migrationRunner = migrationRunner ?? throw new ArgumentNullException(nameof(migrationRunner));
    }

    /// <summary>
    /// Creates missing collections, ensures all indexes, then runs pending migrations.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to every database call.</param>
    /// <returns>A task that completes once collections, indexes and migrations are done.</returns>
    public async Task InitializeAsync(CancellationToken cancellationToken)
    {
        var existingCollections = await ListCollectionsAsync(cancellationToken).ConfigureAwait(false);

        foreach (var collectionName in RequiredCollections)
        {
            if (existingCollections.Contains(collectionName))
            {
                continue;
            }

            try
            {
                await _database.CreateCollectionAsync(collectionName, cancellationToken: cancellationToken).ConfigureAwait(false);
                _logger.LogInformation("Created Mongo collection {Collection}", collectionName);
            }
            catch (MongoCommandException ex) when (ex.CodeName == "NamespaceExists")
            {
                // The collection appeared between the listing above and this create call
                // (e.g. another instance bootstrapping at the same time). The desired end
                // state is reached, so this is not an error.
                _logger.LogDebug("Mongo collection {Collection} already exists; skipping creation", collectionName);
            }
        }

        // Each task targets a different collection, so the index builds are issued concurrently.
        await Task.WhenAll(
            EnsureLocksIndexesAsync(cancellationToken),
            EnsureJobsIndexesAsync(cancellationToken),
            EnsureAdvisoryIndexesAsync(cancellationToken),
            EnsureDocumentsIndexesAsync(cancellationToken),
            EnsureDtoIndexesAsync(cancellationToken),
            EnsureAliasIndexesAsync(cancellationToken),
            EnsureAffectedIndexesAsync(cancellationToken),
            EnsureReferenceIndexesAsync(cancellationToken),
            EnsureSourceStateIndexesAsync(cancellationToken),
            EnsurePsirtFlagIndexesAsync(cancellationToken),
            EnsureChangeHistoryIndexesAsync(cancellationToken),
            EnsureGridFsIndexesAsync(cancellationToken)).ConfigureAwait(false);

        // Migrations run last so they can rely on the collections and indexes ensured above.
        await _migrationRunner.RunAsync(cancellationToken).ConfigureAwait(false);

        _logger.LogInformation("Mongo bootstrapper completed");
    }

    /// <summary>
    /// Returns the names of all collections currently in the database as an ordinal (case-sensitive) set.
    /// </summary>
    private async Task<HashSet<string>> ListCollectionsAsync(CancellationToken cancellationToken)
    {
        using var cursor = await _database.ListCollectionNamesAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
        var list = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
        return new HashSet<string>(list, StringComparer.Ordinal);
    }

    /// <summary>
    /// Drops the named index from <paramref name="collection"/>, treating "index not found" as success.
    /// </summary>
    /// <remarks>Any other command failure is propagated to the caller.</remarks>
    private static async Task DropIndexIfExistsAsync(
        IMongoCollection<BsonDocument> collection,
        string indexName,
        CancellationToken cancellationToken)
    {
        try
        {
            await collection.Indexes.DropOneAsync(indexName, cancellationToken).ConfigureAwait(false);
        }
        catch (MongoCommandException ex) when (ex.CodeName == "IndexNotFound")
        {
            // Deliberately ignored: the index is already absent, which is the desired state.
        }
    }

    /// <summary>
    /// Ensures the TTL index on <c>ttlAt</c> for the locks collection.
    /// </summary>
    private Task EnsureLocksIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Locks);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            // ExpireAfter = zero: the document expires at the instant stored in "ttlAt".
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("ttlAt"),
                new CreateIndexOptions { Name = "ttl_at_ttl", ExpireAfter = TimeSpan.Zero }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures the jobs indexes: newest-first by <c>createdAt</c>, and by <c>kind</c> / <c>status</c>
    /// each combined with descending <c>createdAt</c>.
    /// </summary>
    private Task EnsureJobsIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Jobs);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Descending("createdAt"),
                new CreateIndexOptions { Name = "jobs_createdAt_desc" }),
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("kind").Descending("createdAt"),
                new CreateIndexOptions { Name = "jobs_kind_createdAt" }),
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("status").Descending("createdAt"),
                new CreateIndexOptions { Name = "jobs_status_createdAt" }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures the advisory indexes: a unique index on <c>advisoryKey</c> plus descending
    /// indexes on <c>modified</c> and <c>published</c>.
    /// </summary>
    private Task EnsureAdvisoryIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Advisory);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("advisoryKey"),
                new CreateIndexOptions { Name = "advisory_key_unique", Unique = true }),
            new(
                Builders<BsonDocument>.IndexKeys.Descending("modified"),
                new CreateIndexOptions { Name = "advisory_modified_desc" }),
            new(
                Builders<BsonDocument>.IndexKeys.Descending("published"),
                new CreateIndexOptions { Name = "advisory_published_desc" }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures the document indexes: a unique index on (<c>sourceName</c>, <c>uri</c>), a descending
    /// index on <c>fetchedAt</c>, and a partial index on <c>expiresAt</c> that is a TTL index only
    /// when <c>RawDocumentRetention</c> is positive.
    /// </summary>
    /// <remarks>
    /// The <c>expiresAt</c> index uses a different name (and different options) depending on whether
    /// retention is enabled. Because both variants share the same key, the variant that does not match
    /// the current configuration is dropped first; otherwise index creation would conflict with the
    /// index left behind by a previous run that used the other setting.
    /// </remarks>
    private async Task EnsureDocumentsIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Document);
        var ttlEnabled = _options.RawDocumentRetention > TimeSpan.Zero;

        var currentExpiresName = ttlEnabled ? DocumentExpiresTtlIndexName : DocumentExpiresPlainIndexName;
        var staleExpiresName = ttlEnabled ? DocumentExpiresPlainIndexName : DocumentExpiresTtlIndexName;
        await DropIndexIfExistsAsync(collection, staleExpiresName, cancellationToken).ConfigureAwait(false);

        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("sourceName").Ascending("uri"),
                new CreateIndexOptions { Name = "document_source_uri_unique", Unique = true }),
            new(
                Builders<BsonDocument>.IndexKeys.Descending("fetchedAt"),
                new CreateIndexOptions { Name = "document_fetchedAt_desc" }),
        };

        var expiresOptions = new CreateIndexOptions<BsonDocument>
        {
            Name = currentExpiresName,
            // Only documents that actually carry "expiresAt" are indexed.
            PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("expiresAt", true),
        };

        if (ttlEnabled)
        {
            // ExpireAfter = zero: the document expires at the instant stored in "expiresAt".
            expiresOptions.ExpireAfter = TimeSpan.Zero;
        }

        indexes.Add(new CreateIndexModel<BsonDocument>(
            Builders<BsonDocument>.IndexKeys.Ascending("expiresAt"),
            expiresOptions));

        await collection.Indexes.CreateManyAsync(indexes, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Ensures the non-unique compound index on (<c>scheme</c>, <c>value</c>) for the alias collection.
    /// </summary>
    private Task EnsureAliasIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Alias);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("scheme").Ascending("value"),
                new CreateIndexOptions { Name = "alias_scheme_value", Unique = false }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures a partial TTL index on <c>metadata.expiresAt</c> for the raw-document GridFS files
    /// collection. Does nothing when <c>RawDocumentRetention</c> is zero or negative.
    /// </summary>
    private Task EnsureGridFsIndexesAsync(CancellationToken cancellationToken)
    {
        if (_options.RawDocumentRetention <= TimeSpan.Zero)
        {
            return Task.CompletedTask;
        }

        var collectionName = $"{RawDocumentBucketName}.files";
        var collection = _database.GetCollection<BsonDocument>(collectionName);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("metadata.expiresAt"),
                new CreateIndexOptions<BsonDocument>
                {
                    Name = "gridfs_files_expiresAt_ttl",
                    ExpireAfter = TimeSpan.Zero,
                    PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("metadata.expiresAt", true),
                }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures the affected-package indexes on (<c>platform</c>, <c>name</c>) and on <c>advisoryId</c>.
    /// </summary>
    private Task EnsureAffectedIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Affected);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("platform").Ascending("name"),
                new CreateIndexOptions { Name = "affected_platform_name" }),
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("advisoryId"),
                new CreateIndexOptions { Name = "affected_advisoryId" }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures the reference indexes on <c>url</c> and on <c>advisoryId</c>.
    /// </summary>
    private Task EnsureReferenceIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Reference);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("url"),
                new CreateIndexOptions { Name = "reference_url" }),
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("advisoryId"),
                new CreateIndexOptions { Name = "reference_advisoryId" }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures the unique index on <c>sourceName</c> for the source-state collection.
    /// </summary>
    private Task EnsureSourceStateIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.SourceState);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("sourceName"),
                new CreateIndexOptions { Name = "source_state_unique", Unique = true }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Ensures the DTO indexes on <c>documentId</c> and on (<c>sourceName</c>, descending <c>validatedAt</c>).
    /// </summary>
    private Task EnsureDtoIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.Dto);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("documentId"),
                new CreateIndexOptions { Name = "dto_documentId" }),
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("sourceName").Descending("validatedAt"),
                new CreateIndexOptions { Name = "dto_source_validated" }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }

    /// <summary>
    /// Removes the <c>psirt_advisoryKey_unique</c> index if it is present and ensures the
    /// <c>vendor</c> index on the PSIRT flags collection.
    /// </summary>
    private async Task EnsurePsirtFlagIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.PsirtFlags);

        // NOTE(review): presumably a legacy index from an earlier schema that must no longer
        // be enforced - confirm against the migration history.
        await DropIndexIfExistsAsync(collection, "psirt_advisoryKey_unique", cancellationToken).ConfigureAwait(false);

        var index = new CreateIndexModel<BsonDocument>(
            Builders<BsonDocument>.IndexKeys.Ascending("vendor"),
            new CreateIndexOptions { Name = "psirt_vendor" });

        await collection.Indexes.CreateOneAsync(index, cancellationToken: cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Ensures the change-history indexes: (<c>source</c>, <c>advisoryKey</c>, descending <c>capturedAt</c>),
    /// descending <c>capturedAt</c>, and <c>documentId</c>.
    /// </summary>
    private Task EnsureChangeHistoryIndexesAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<BsonDocument>(MongoStorageDefaults.Collections.ChangeHistory);
        var indexes = new List<CreateIndexModel<BsonDocument>>
        {
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("source").Ascending("advisoryKey").Descending("capturedAt"),
                new CreateIndexOptions { Name = "history_source_advisory_capturedAt" }),
            new(
                Builders<BsonDocument>.IndexKeys.Descending("capturedAt"),
                new CreateIndexOptions { Name = "history_capturedAt" }),
            new(
                Builders<BsonDocument>.IndexKeys.Ascending("documentId"),
                new CreateIndexOptions { Name = "history_documentId" }),
        };

        return collection.Indexes.CreateManyAsync(indexes, cancellationToken);
    }
}