151 lines
5.4 KiB
C#
151 lines
5.4 KiB
C#
using System;
|
|
using System.ComponentModel.DataAnnotations;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Options;
|
|
using MongoDB.Driver;
|
|
using MongoDB.Driver.Core.Clusters;
|
|
using StellaOps.Vexer.Core;
|
|
|
|
namespace StellaOps.Vexer.Storage.Mongo;
|
|
|
|
public sealed class MongoVexExportStore : IVexExportStore
|
|
{
|
|
private readonly IMongoClient _client;
|
|
private readonly IMongoCollection<VexExportManifestRecord> _exports;
|
|
private readonly IMongoCollection<VexCacheEntryRecord> _cache;
|
|
private readonly VexMongoStorageOptions _options;
|
|
|
|
public MongoVexExportStore(
|
|
IMongoClient client,
|
|
IMongoDatabase database,
|
|
IOptions<VexMongoStorageOptions> options)
|
|
{
|
|
_client = client ?? throw new ArgumentNullException(nameof(client));
|
|
ArgumentNullException.ThrowIfNull(database);
|
|
ArgumentNullException.ThrowIfNull(options);
|
|
|
|
_options = options.Value;
|
|
Validator.ValidateObject(_options, new ValidationContext(_options), validateAllProperties: true);
|
|
|
|
VexMongoMappingRegistry.Register();
|
|
_exports = database.GetCollection<VexExportManifestRecord>(VexMongoCollectionNames.Exports);
|
|
_cache = database.GetCollection<VexCacheEntryRecord>(VexMongoCollectionNames.Cache);
|
|
}
|
|
|
|
public async ValueTask<VexExportManifest?> FindAsync(VexQuerySignature signature, VexExportFormat format, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentNullException.ThrowIfNull(signature);
|
|
var cacheId = VexCacheEntryRecord.CreateId(signature, format);
|
|
var cacheFilter = Builders<VexCacheEntryRecord>.Filter.Eq(x => x.Id, cacheId);
|
|
var cacheRecord = await _cache.Find(cacheFilter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
if (cacheRecord is null)
|
|
{
|
|
return null;
|
|
}
|
|
|
|
if (cacheRecord.ExpiresAt is DateTime expiresAt && expiresAt <= DateTime.UtcNow)
|
|
{
|
|
await _cache.DeleteOneAsync(cacheFilter, cancellationToken).ConfigureAwait(false);
|
|
return null;
|
|
}
|
|
|
|
var manifestId = VexExportManifestRecord.CreateId(signature, format);
|
|
var manifestFilter = Builders<VexExportManifestRecord>.Filter.Eq(x => x.Id, manifestId);
|
|
var manifest = await _exports.Find(manifestFilter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
if (manifest is null)
|
|
{
|
|
await _cache.DeleteOneAsync(cacheFilter, cancellationToken).ConfigureAwait(false);
|
|
return null;
|
|
}
|
|
|
|
if (!string.IsNullOrWhiteSpace(cacheRecord.ManifestId) &&
|
|
!string.Equals(cacheRecord.ManifestId, manifest.Id, StringComparison.Ordinal))
|
|
{
|
|
await _cache.DeleteOneAsync(cacheFilter, cancellationToken).ConfigureAwait(false);
|
|
return null;
|
|
}
|
|
|
|
return manifest.ToDomain();
|
|
}
|
|
|
|
public async ValueTask SaveAsync(VexExportManifest manifest, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentNullException.ThrowIfNull(manifest);
|
|
|
|
using var session = await _client.StartSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
|
|
var supportsTransactions = session.Client.Cluster.Description.Type != ClusterType.Standalone;
|
|
|
|
var startedTransaction = false;
|
|
if (supportsTransactions)
|
|
{
|
|
try
|
|
{
|
|
session.StartTransaction();
|
|
startedTransaction = true;
|
|
}
|
|
catch (NotSupportedException)
|
|
{
|
|
supportsTransactions = false;
|
|
}
|
|
}
|
|
|
|
try
|
|
{
|
|
var manifestRecord = VexExportManifestRecord.FromDomain(manifest);
|
|
var manifestFilter = Builders<VexExportManifestRecord>.Filter.Eq(x => x.Id, manifestRecord.Id);
|
|
|
|
await _exports
|
|
.ReplaceOneAsync(
|
|
session,
|
|
manifestFilter,
|
|
manifestRecord,
|
|
new ReplaceOptions { IsUpsert = true },
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
var cacheEntry = CreateCacheEntry(manifest);
|
|
var cacheRecord = VexCacheEntryRecord.FromDomain(cacheEntry);
|
|
var cacheFilter = Builders<VexCacheEntryRecord>.Filter.Eq(x => x.Id, cacheRecord.Id);
|
|
|
|
await _cache
|
|
.ReplaceOneAsync(
|
|
session,
|
|
cacheFilter,
|
|
cacheRecord,
|
|
new ReplaceOptions { IsUpsert = true },
|
|
cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
if (startedTransaction)
|
|
{
|
|
await session.CommitTransactionAsync(cancellationToken).ConfigureAwait(false);
|
|
}
|
|
}
|
|
catch
|
|
{
|
|
if (startedTransaction && session.IsInTransaction)
|
|
{
|
|
await session.AbortTransactionAsync(cancellationToken).ConfigureAwait(false);
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
|
|
private VexCacheEntry CreateCacheEntry(VexExportManifest manifest)
|
|
{
|
|
var expiresAt = manifest.CreatedAt + _options.ExportCacheTtl;
|
|
return new VexCacheEntry(
|
|
manifest.QuerySignature,
|
|
manifest.Format,
|
|
manifest.Artifact,
|
|
manifest.CreatedAt,
|
|
manifest.SizeBytes,
|
|
manifestId: manifest.ExportId,
|
|
gridFsObjectId: null,
|
|
expiresAt: expiresAt);
|
|
}
|
|
}
|