Add unit tests for VexLens normalizer, CPE parser, product mapper, and PURL parser
- Implemented comprehensive tests for VexLensNormalizer including format detection and normalization scenarios. - Added tests for CpeParser covering CPE 2.3 and 2.2 formats, invalid inputs, and canonical key generation. - Created tests for ProductMapper to validate parsing and matching logic across different strictness levels. - Developed tests for PurlParser to ensure correct parsing of various PURL formats and validation of identifiers. - Introduced stubs for Monaco editor and worker to facilitate testing in the web application. - Updated project file for the test project to include necessary dependencies.
This commit is contained in:
@@ -1,11 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Documents;
|
||||
|
||||
/// <summary>
/// Stub record for document storage. (Placeholder for full implementation)
/// </summary>
public sealed record DocumentRecord
{
    /// <summary>Document identifier; empty until assigned.</summary>
    public string Id { get; init; } = string.Empty;

    /// <summary>Tenant identifier the document belongs to.</summary>
    public string TenantId { get; init; } = string.Empty;

    /// <summary>Name of the source the document was ingested from.</summary>
    public string Source { get; init; } = string.Empty;
}
|
||||
@@ -1,8 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo;
|
||||
|
||||
/// <summary>
/// Stub interface for document storage. (Placeholder for full implementation)
/// </summary>
public interface IDocumentStore
{
    // Intentionally empty: exists so dependents can compile against the
    // abstraction before the Mongo-backed implementation lands.
}
|
||||
@@ -1,8 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo;
|
||||
|
||||
/// <summary>
/// Stub interface for source state repository. (Placeholder for full implementation)
/// </summary>
public interface ISourceStateRepository
{
    // Intentionally empty: exists so dependents can compile against the
    // abstraction before the repository implementation lands.
}
|
||||
@@ -1,10 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo;
|
||||
|
||||
/// <summary>
/// Stub options for MongoDB storage. (Placeholder for full implementation)
/// </summary>
public sealed class MongoStorageOptions
{
    /// <summary>MongoDB connection string.</summary>
    public string ConnectionString { get; set; } = string.Empty;

    /// <summary>Name of the MongoDB database to use.</summary>
    public string DatabaseName { get; set; } = string.Empty;
}
|
||||
@@ -1,313 +0,0 @@
|
||||
using System.Security.Cryptography;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Driver;
|
||||
using MongoDB.Driver.GridFS;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Service for migrating raw payloads from GridFS to S3-compatible object storage.
/// Supports single and batch migration plus post-migration hash verification.
/// </summary>
public sealed class GridFsMigrationService
{
    private readonly IGridFSBucket _gridFs;
    private readonly IObjectStore _objectStore;
    private readonly IMigrationTracker _migrationTracker;
    // NOTE(review): _options is assigned but not read anywhere in this class —
    // confirm whether it is needed (e.g. for size limits) or can be dropped.
    private readonly ObjectStorageOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<GridFsMigrationService> _logger;

    /// <summary>
    /// Creates the migration service.
    /// </summary>
    /// <exception cref="ArgumentNullException">Thrown when any required dependency is null.</exception>
    public GridFsMigrationService(
        IGridFSBucket gridFs,
        IObjectStore objectStore,
        IMigrationTracker migrationTracker,
        IOptions<ObjectStorageOptions> options,
        TimeProvider timeProvider,
        ILogger<GridFsMigrationService> logger)
    {
        _gridFs = gridFs ?? throw new ArgumentNullException(nameof(gridFs));
        _objectStore = objectStore ?? throw new ArgumentNullException(nameof(objectStore));
        _migrationTracker = migrationTracker ?? throw new ArgumentNullException(nameof(migrationTracker));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        // Unlike the other dependencies, a null time provider falls back to the
        // system clock rather than throwing.
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Migrates a single GridFS document to object storage.
    /// </summary>
    /// <param name="gridFsId">GridFS ObjectId (hex string) of the document.</param>
    /// <param name="tenantId">Tenant the payload belongs to; used for bucket selection.</param>
    /// <param name="sourceId">Originating source identifier, recorded in provenance.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A <see cref="MigrationResult"/> describing the outcome; failures are
    /// captured in the result rather than thrown (except argument validation).</returns>
    /// <exception cref="ArgumentException">Thrown when any identifier is null or whitespace.</exception>
    public async Task<MigrationResult> MigrateAsync(
        string gridFsId,
        string tenantId,
        string sourceId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(gridFsId);
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        ArgumentException.ThrowIfNullOrWhiteSpace(sourceId);

        // Idempotency: skip documents the tracker already shows as migrated.
        if (await _migrationTracker.IsMigratedAsync(gridFsId, cancellationToken).ConfigureAwait(false))
        {
            _logger.LogDebug("GridFS {GridFsId} already migrated, skipping", gridFsId);
            return MigrationResult.AlreadyMigrated(gridFsId);
        }

        try
        {
            // Download the full payload from GridFS into memory.
            var objectId = ObjectId.Parse(gridFsId);
            using var downloadStream = new MemoryStream();
            await _gridFs.DownloadToStreamAsync(objectId, downloadStream, cancellationToken: cancellationToken)
                .ConfigureAwait(false);

            var data = downloadStream.ToArray();
            // Fix: previously the SHA-256 of the payload was computed here and
            // never used — a wasted full-payload hash. Hashing for the stored
            // pointer happens downstream (verification compares against
            // reference.Pointer.Sha256 in VerifyMigrationAsync).

            // Look up GridFS file metadata (filename, upload time) for provenance.
            var filter = Builders<GridFSFileInfo>.Filter.Eq("_id", objectId);
            var fileInfo = await _gridFs.Find(filter)
                .FirstOrDefaultAsync(cancellationToken)
                .ConfigureAwait(false);

            // Prefer the original upload time; fall back to "now" if metadata is missing.
            var ingestedAt = fileInfo?.UploadDateTime ?? _timeProvider.GetUtcNow().UtcDateTime;

            // Build provenance, including a transformation record marking this migration.
            var provenance = new ProvenanceMetadata
            {
                SourceId = sourceId,
                IngestedAt = new DateTimeOffset(ingestedAt, TimeSpan.Zero),
                TenantId = tenantId,
                OriginalFormat = DetectFormat(fileInfo?.Filename),
                OriginalSize = data.Length,
                GridFsLegacyId = gridFsId,
                Transformations =
                [
                    new TransformationRecord
                    {
                        Type = TransformationType.Migration,
                        Timestamp = _timeProvider.GetUtcNow(),
                        Agent = "concelier-gridfs-migration-v1"
                    }
                ]
            };

            // Store in object storage.
            var reference = await _objectStore.StoreAsync(
                tenantId,
                data,
                provenance,
                GetContentType(fileInfo?.Filename),
                cancellationToken).ConfigureAwait(false);

            // Record the migration so future runs treat this document as done.
            await _migrationTracker.RecordMigrationAsync(
                gridFsId,
                reference.Pointer,
                MigrationStatus.Migrated,
                cancellationToken).ConfigureAwait(false);

            _logger.LogInformation(
                "Migrated GridFS {GridFsId} to {Bucket}/{Key}, size {Size} bytes",
                gridFsId, reference.Pointer.Bucket, reference.Pointer.Key, data.Length);

            return MigrationResult.Success(gridFsId, reference);
        }
        catch (GridFSFileNotFoundException)
        {
            _logger.LogWarning("GridFS file not found: {GridFsId}", gridFsId);
            return MigrationResult.NotFound(gridFsId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to migrate GridFS {GridFsId}", gridFsId);
            return MigrationResult.Failed(gridFsId, ex.Message);
        }
    }

    /// <summary>
    /// Verifies a migrated document by comparing hashes: the stored object's
    /// integrity is checked and the original GridFS payload's SHA-256 must match
    /// the recorded pointer hash. On success the tracker record is marked verified.
    /// </summary>
    /// <param name="gridFsId">GridFS ObjectId (hex string) of the original document.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>True when both checks pass; false otherwise.</returns>
    /// <exception cref="ArgumentException">Thrown when <paramref name="gridFsId"/> is null or whitespace.</exception>
    public async Task<bool> VerifyMigrationAsync(
        string gridFsId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(gridFsId);

        var record = await _migrationTracker.GetByGridFsIdAsync(gridFsId, cancellationToken)
            .ConfigureAwait(false);

        if (record is null)
        {
            _logger.LogWarning("No migration record found for {GridFsId}", gridFsId);
            return false;
        }

        // Download the original from GridFS to re-hash it.
        var objectId = ObjectId.Parse(gridFsId);
        using var downloadStream = new MemoryStream();

        try
        {
            await _gridFs.DownloadToStreamAsync(objectId, downloadStream, cancellationToken: cancellationToken)
                .ConfigureAwait(false);
        }
        catch (GridFSFileNotFoundException)
        {
            _logger.LogWarning("Original GridFS file not found for verification: {GridFsId}", gridFsId);
            return false;
        }

        var originalHash = ComputeSha256(downloadStream.ToArray());

        // Build a minimal reference (provenance fields are not used by integrity checks).
        var reference = PayloadReference.CreateObjectStorage(record.Pointer, new ProvenanceMetadata
        {
            SourceId = string.Empty,
            IngestedAt = record.MigratedAt,
            TenantId = string.Empty,
        });

        var verified = await _objectStore.VerifyIntegrityAsync(reference, cancellationToken)
            .ConfigureAwait(false);

        // Hash comparison is case-insensitive since hex casing may differ between producers.
        if (verified && string.Equals(originalHash, record.Pointer.Sha256, StringComparison.OrdinalIgnoreCase))
        {
            await _migrationTracker.MarkVerifiedAsync(gridFsId, cancellationToken).ConfigureAwait(false);
            _logger.LogInformation("Verified migration for {GridFsId}", gridFsId);
            return true;
        }

        _logger.LogWarning(
            "Verification failed for {GridFsId}: original hash {Original}, stored hash {Stored}",
            gridFsId, originalHash, record.Pointer.Sha256);

        return false;
    }

    /// <summary>
    /// Batches migration of multiple GridFS documents. Runs sequentially and
    /// stops early (without throwing) if cancellation is requested.
    /// </summary>
    /// <param name="requests">Migration requests to process.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Aggregated results for the requests processed before cancellation.</returns>
    public async Task<BatchMigrationResult> MigrateBatchAsync(
        IEnumerable<GridFsMigrationRequest> requests,
        CancellationToken cancellationToken = default)
    {
        var results = new List<MigrationResult>();

        foreach (var request in requests)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            var result = await MigrateAsync(
                request.GridFsId,
                request.TenantId,
                request.SourceId,
                cancellationToken).ConfigureAwait(false);

            results.Add(result);
        }

        return new BatchMigrationResult(results);
    }

    /// <summary>Computes the lowercase hex SHA-256 of the data.</summary>
    private static string ComputeSha256(byte[] data)
    {
        var hash = SHA256.HashData(data);
        return Convert.ToHexStringLower(hash);
    }

    /// <summary>Maps a filename extension to an original-format tag; null when unknown.</summary>
    private static OriginalFormat? DetectFormat(string? filename)
    {
        if (string.IsNullOrEmpty(filename))
        {
            return null;
        }

        return Path.GetExtension(filename).ToLowerInvariant() switch
        {
            ".json" => OriginalFormat.Json,
            ".xml" => OriginalFormat.Xml,
            ".csv" => OriginalFormat.Csv,
            ".ndjson" => OriginalFormat.Ndjson,
            ".yaml" or ".yml" => OriginalFormat.Yaml,
            _ => null
        };
    }

    /// <summary>Maps a filename extension to a MIME type; octet-stream when unknown.</summary>
    private static string GetContentType(string? filename)
    {
        if (string.IsNullOrEmpty(filename))
        {
            return "application/octet-stream";
        }

        return Path.GetExtension(filename).ToLowerInvariant() switch
        {
            ".json" => "application/json",
            ".xml" => "application/xml",
            ".csv" => "text/csv",
            ".ndjson" => "application/x-ndjson",
            ".yaml" or ".yml" => "application/x-yaml",
            _ => "application/octet-stream"
        };
    }
}
|
||||
|
||||
/// <summary>
/// Request to migrate a GridFS document.
/// </summary>
/// <param name="GridFsId">GridFS ObjectId (hex string) of the document to migrate.</param>
/// <param name="TenantId">Tenant the payload belongs to; used for bucket selection.</param>
/// <param name="SourceId">Originating source identifier, recorded in provenance.</param>
public sealed record GridFsMigrationRequest(
    string GridFsId,
    string TenantId,
    string SourceId);
|
||||
|
||||
/// <summary>
/// Result of a single migration.
/// </summary>
public sealed record MigrationResult
{
    /// <summary>GridFS ObjectId the migration was attempted for.</summary>
    public required string GridFsId { get; init; }

    /// <summary>Terminal status of the migration attempt.</summary>
    public required MigrationResultStatus Status { get; init; }

    /// <summary>Reference to the migrated payload; populated only on success.</summary>
    public PayloadReference? Reference { get; init; }

    /// <summary>Failure detail; populated only for failed migrations.</summary>
    public string? ErrorMessage { get; init; }

    /// <summary>Creates a successful result carrying the stored payload reference.</summary>
    public static MigrationResult Success(string gridFsId, PayloadReference reference)
    {
        return new MigrationResult
        {
            GridFsId = gridFsId,
            Status = MigrationResultStatus.Success,
            Reference = reference,
        };
    }

    /// <summary>Creates a result indicating the document was migrated in an earlier run.</summary>
    public static MigrationResult AlreadyMigrated(string gridFsId)
    {
        return new MigrationResult
        {
            GridFsId = gridFsId,
            Status = MigrationResultStatus.AlreadyMigrated,
        };
    }

    /// <summary>Creates a result indicating the GridFS file does not exist.</summary>
    public static MigrationResult NotFound(string gridFsId)
    {
        return new MigrationResult
        {
            GridFsId = gridFsId,
            Status = MigrationResultStatus.NotFound,
        };
    }

    /// <summary>Creates a failed result with the error detail.</summary>
    public static MigrationResult Failed(string gridFsId, string errorMessage)
    {
        return new MigrationResult
        {
            GridFsId = gridFsId,
            Status = MigrationResultStatus.Failed,
            ErrorMessage = errorMessage,
        };
    }
}
|
||||
|
||||
/// <summary>
/// Status of a migration result.
/// </summary>
public enum MigrationResultStatus
{
    /// <summary>Payload was migrated and recorded successfully.</summary>
    Success,

    /// <summary>Tracker showed the document was already migrated; skipped.</summary>
    AlreadyMigrated,

    /// <summary>The GridFS file does not exist.</summary>
    NotFound,

    /// <summary>Migration threw; see <see cref="MigrationResult.ErrorMessage"/>.</summary>
    Failed
}
|
||||
|
||||
/// <summary>
/// Result of a batch migration.
/// </summary>
/// <param name="Results">Per-document results in processing order.</param>
public sealed record BatchMigrationResult(IReadOnlyList<MigrationResult> Results)
{
    /// <summary>Total number of results in the batch.</summary>
    public int TotalCount => Results.Count;

    /// <summary>Number of migrations that completed successfully.</summary>
    public int SuccessCount => CountWithStatus(MigrationResultStatus.Success);

    /// <summary>Number of documents that had already been migrated.</summary>
    public int AlreadyMigratedCount => CountWithStatus(MigrationResultStatus.AlreadyMigrated);

    /// <summary>Number of GridFS files that could not be found.</summary>
    public int NotFoundCount => CountWithStatus(MigrationResultStatus.NotFound);

    /// <summary>Number of migrations that failed.</summary>
    public int FailedCount => CountWithStatus(MigrationResultStatus.Failed);

    // Shared counting helper keeps the per-status properties uniform.
    private int CountWithStatus(MigrationResultStatus status)
        => Results.Count(r => r.Status == status);
}
|
||||
@@ -1,60 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Tracks GridFS to S3 migrations.
/// </summary>
public interface IMigrationTracker
{
    /// <summary>
    /// Records a migration attempt.
    /// </summary>
    /// <param name="gridFsId">GridFS ObjectId of the source document.</param>
    /// <param name="pointer">Pointer to the migrated object.</param>
    /// <param name="status">Initial status of the migration.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The persisted migration record.</returns>
    Task<MigrationRecord> RecordMigrationAsync(
        string gridFsId,
        ObjectPointer pointer,
        MigrationStatus status,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Updates a migration record status.
    /// </summary>
    /// <param name="gridFsId">GridFS ObjectId of the source document.</param>
    /// <param name="status">New status to set.</param>
    /// <param name="errorMessage">Optional error detail for failed migrations.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    Task UpdateStatusAsync(
        string gridFsId,
        MigrationStatus status,
        string? errorMessage = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Marks a migration as verified.
    /// </summary>
    /// <param name="gridFsId">GridFS ObjectId of the source document.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    Task MarkVerifiedAsync(
        string gridFsId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets a migration record by GridFS ID.
    /// </summary>
    /// <param name="gridFsId">GridFS ObjectId of the source document.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The record, or null if no migration was recorded.</returns>
    Task<MigrationRecord?> GetByGridFsIdAsync(
        string gridFsId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Lists pending migrations.
    /// </summary>
    /// <param name="limit">Maximum number of records to return.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    Task<IReadOnlyList<MigrationRecord>> ListPendingAsync(
        int limit = 100,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Lists migrations needing verification.
    /// </summary>
    /// <param name="limit">Maximum number of records to return.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    Task<IReadOnlyList<MigrationRecord>> ListNeedingVerificationAsync(
        int limit = 100,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Checks if a GridFS ID has been migrated.
    /// </summary>
    /// <param name="gridFsId">GridFS ObjectId of the source document.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>True when a migrated (or verified) record exists.</returns>
    Task<bool> IsMigratedAsync(
        string gridFsId,
        CancellationToken cancellationToken = default);
}
|
||||
@@ -1,98 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Abstraction for S3-compatible object storage operations.
/// </summary>
/// <remarks>
/// Size thresholds controlling inline vs. object storage are configured via
/// <see cref="ObjectStorageOptions"/>.
/// </remarks>
public interface IObjectStore
{
    /// <summary>
    /// Stores a payload, returning a reference (either inline or object storage).
    /// Automatically decides based on size thresholds.
    /// </summary>
    /// <param name="tenantId">Tenant identifier for bucket selection.</param>
    /// <param name="data">Payload data to store.</param>
    /// <param name="provenance">Provenance metadata for the payload.</param>
    /// <param name="contentType">MIME type of the content.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Reference to the stored payload.</returns>
    Task<PayloadReference> StoreAsync(
        string tenantId,
        ReadOnlyMemory<byte> data,
        ProvenanceMetadata provenance,
        string contentType = "application/json",
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Stores a payload from a stream.
    /// </summary>
    /// <param name="tenantId">Tenant identifier for bucket selection.</param>
    /// <param name="stream">Stream containing payload data.</param>
    /// <param name="provenance">Provenance metadata for the payload.</param>
    /// <param name="contentType">MIME type of the content.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Reference to the stored payload.</returns>
    Task<PayloadReference> StoreStreamAsync(
        string tenantId,
        Stream stream,
        ProvenanceMetadata provenance,
        string contentType = "application/json",
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Retrieves a payload by its reference.
    /// </summary>
    /// <param name="reference">Reference to the payload.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Payload data, or null if not found.</returns>
    Task<byte[]?> RetrieveAsync(
        PayloadReference reference,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Retrieves a payload as a stream.
    /// </summary>
    /// <param name="reference">Reference to the payload.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Stream containing payload data, or null if not found.</returns>
    Task<Stream?> RetrieveStreamAsync(
        PayloadReference reference,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Checks if an object exists.
    /// </summary>
    /// <param name="pointer">Object pointer to check.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>True if object exists.</returns>
    Task<bool> ExistsAsync(
        ObjectPointer pointer,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Deletes an object.
    /// </summary>
    /// <param name="pointer">Object pointer to delete.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    Task DeleteAsync(
        ObjectPointer pointer,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Ensures the tenant bucket exists.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    Task EnsureBucketExistsAsync(
        string tenantId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Verifies a payload's integrity by comparing its hash.
    /// </summary>
    /// <param name="reference">Reference to verify.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>True if hash matches.</returns>
    Task<bool> VerifyIntegrityAsync(
        PayloadReference reference,
        CancellationToken cancellationToken = default);
}
|
||||
@@ -1,63 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Record of a migration from GridFS to S3.
/// Persisted and queried via <see cref="IMigrationTracker"/>.
/// </summary>
public sealed record MigrationRecord
{
    /// <summary>
    /// Original GridFS ObjectId.
    /// </summary>
    public required string GridFsId { get; init; }

    /// <summary>
    /// Pointer to the migrated object.
    /// </summary>
    public required ObjectPointer Pointer { get; init; }

    /// <summary>
    /// Timestamp when migration was performed.
    /// </summary>
    public required DateTimeOffset MigratedAt { get; init; }

    /// <summary>
    /// Current status of the migration.
    /// </summary>
    public required MigrationStatus Status { get; init; }

    /// <summary>
    /// Timestamp when content hash was verified post-migration.
    /// Null until verification succeeds.
    /// </summary>
    public DateTimeOffset? VerifiedAt { get; init; }

    /// <summary>
    /// Whether GridFS tombstone still exists for rollback.
    /// </summary>
    public bool RollbackAvailable { get; init; } = true;

    /// <summary>
    /// Error message if migration failed.
    /// </summary>
    public string? ErrorMessage { get; init; }
}
|
||||
|
||||
/// <summary>
/// Status of a GridFS to S3 migration.
/// </summary>
public enum MigrationStatus
{
    /// <summary>Migration pending.</summary>
    Pending,

    /// <summary>Migration completed; object stored and recorded.</summary>
    Migrated,

    /// <summary>Migration verified via hash comparison.</summary>
    Verified,

    /// <summary>Migration failed.</summary>
    Failed,

    /// <summary>Original GridFS tombstoned.</summary>
    Tombstoned
}
|
||||
@@ -1,232 +0,0 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
using MongoDB.Driver;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// MongoDB-backed migration tracker for GridFS to S3 migrations.
/// Stores one document per migrated GridFS file in the
/// "object_storage_migrations" collection; statuses are persisted as
/// lowercase strings and parsed back case-insensitively.
/// </summary>
public sealed class MongoMigrationTracker : IMigrationTracker
{
    private const string CollectionName = "object_storage_migrations";

    private readonly IMongoCollection<MigrationDocument> _collection;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<MongoMigrationTracker> _logger;

    /// <summary>
    /// Creates the tracker over the given database.
    /// </summary>
    /// <exception cref="ArgumentNullException">Thrown when database or logger is null.
    /// A null time provider falls back to the system clock.</exception>
    public MongoMigrationTracker(
        IMongoDatabase database,
        TimeProvider timeProvider,
        ILogger<MongoMigrationTracker> logger)
    {
        ArgumentNullException.ThrowIfNull(database);
        _collection = database.GetCollection<MigrationDocument>(CollectionName);
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc/>
    public async Task<MigrationRecord> RecordMigrationAsync(
        string gridFsId,
        ObjectPointer pointer,
        MigrationStatus status,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(gridFsId);
        ArgumentNullException.ThrowIfNull(pointer);

        var now = _timeProvider.GetUtcNow();
        // Flatten the pointer into top-level fields for straightforward querying.
        var document = new MigrationDocument
        {
            GridFsId = gridFsId,
            Bucket = pointer.Bucket,
            Key = pointer.Key,
            Sha256 = pointer.Sha256,
            Size = pointer.Size,
            ContentType = pointer.ContentType,
            Encoding = pointer.Encoding.ToString().ToLowerInvariant(),
            MigratedAt = now.UtcDateTime,
            Status = status.ToString().ToLowerInvariant(),
            RollbackAvailable = true,
        };

        await _collection.InsertOneAsync(document, cancellationToken: cancellationToken)
            .ConfigureAwait(false);

        _logger.LogInformation(
            "Recorded migration for GridFS {GridFsId} to {Bucket}/{Key}",
            gridFsId, pointer.Bucket, pointer.Key);

        return ToRecord(document);
    }

    /// <inheritdoc/>
    public async Task UpdateStatusAsync(
        string gridFsId,
        MigrationStatus status,
        string? errorMessage = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(gridFsId);

        var filter = Builders<MigrationDocument>.Filter.Eq(d => d.GridFsId, gridFsId);
        // errorMessage is written unconditionally, so a null clears any prior error.
        var update = Builders<MigrationDocument>.Update
            .Set(d => d.Status, status.ToString().ToLowerInvariant())
            .Set(d => d.ErrorMessage, errorMessage);

        await _collection.UpdateOneAsync(filter, update, cancellationToken: cancellationToken)
            .ConfigureAwait(false);

        _logger.LogDebug("Updated migration status for {GridFsId} to {Status}", gridFsId, status);
    }

    /// <inheritdoc/>
    public async Task MarkVerifiedAsync(
        string gridFsId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(gridFsId);

        var now = _timeProvider.GetUtcNow();
        var filter = Builders<MigrationDocument>.Filter.Eq(d => d.GridFsId, gridFsId);
        var update = Builders<MigrationDocument>.Update
            .Set(d => d.Status, MigrationStatus.Verified.ToString().ToLowerInvariant())
            .Set(d => d.VerifiedAt, now.UtcDateTime);

        await _collection.UpdateOneAsync(filter, update, cancellationToken: cancellationToken)
            .ConfigureAwait(false);

        _logger.LogDebug("Marked migration as verified for {GridFsId}", gridFsId);
    }

    /// <inheritdoc/>
    public async Task<MigrationRecord?> GetByGridFsIdAsync(
        string gridFsId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(gridFsId);

        var filter = Builders<MigrationDocument>.Filter.Eq(d => d.GridFsId, gridFsId);
        var document = await _collection.Find(filter)
            .FirstOrDefaultAsync(cancellationToken)
            .ConfigureAwait(false);

        return document is null ? null : ToRecord(document);
    }

    /// <inheritdoc/>
    public async Task<IReadOnlyList<MigrationRecord>> ListPendingAsync(
        int limit = 100,
        CancellationToken cancellationToken = default)
    {
        var filter = Builders<MigrationDocument>.Filter.Eq(
            d => d.Status, MigrationStatus.Pending.ToString().ToLowerInvariant());

        var documents = await _collection.Find(filter)
            .Limit(limit)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        return documents.Select(ToRecord).ToList();
    }

    /// <inheritdoc/>
    public async Task<IReadOnlyList<MigrationRecord>> ListNeedingVerificationAsync(
        int limit = 100,
        CancellationToken cancellationToken = default)
    {
        // "Migrated" but not yet "Verified" documents are awaiting hash verification.
        var filter = Builders<MigrationDocument>.Filter.Eq(
            d => d.Status, MigrationStatus.Migrated.ToString().ToLowerInvariant());

        var documents = await _collection.Find(filter)
            .Limit(limit)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        return documents.Select(ToRecord).ToList();
    }

    /// <inheritdoc/>
    public async Task<bool> IsMigratedAsync(
        string gridFsId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(gridFsId);

        // Both "migrated" and "verified" count as done.
        var filter = Builders<MigrationDocument>.Filter.And(
            Builders<MigrationDocument>.Filter.Eq(d => d.GridFsId, gridFsId),
            Builders<MigrationDocument>.Filter.In(d => d.Status, new[]
            {
                MigrationStatus.Migrated.ToString().ToLowerInvariant(),
                MigrationStatus.Verified.ToString().ToLowerInvariant()
            }));

        var count = await _collection.CountDocumentsAsync(filter, cancellationToken: cancellationToken)
            .ConfigureAwait(false);

        return count > 0;
    }

    /// <summary>
    /// Converts a persisted document back to the public record shape,
    /// re-assembling the flattened pointer fields and parsing enum strings.
    /// </summary>
    private static MigrationRecord ToRecord(MigrationDocument document)
    {
        return new MigrationRecord
        {
            GridFsId = document.GridFsId,
            Pointer = new ObjectPointer
            {
                Bucket = document.Bucket,
                Key = document.Key,
                Sha256 = document.Sha256,
                Size = document.Size,
                ContentType = document.ContentType,
                Encoding = Enum.Parse<ContentEncoding>(document.Encoding, ignoreCase: true),
            },
            // Stored DateTimes are UTC; rehydrate as zero-offset DateTimeOffsets.
            MigratedAt = new DateTimeOffset(document.MigratedAt, TimeSpan.Zero),
            Status = Enum.Parse<MigrationStatus>(document.Status, ignoreCase: true),
            VerifiedAt = document.VerifiedAt.HasValue
                ? new DateTimeOffset(document.VerifiedAt.Value, TimeSpan.Zero)
                : null,
            RollbackAvailable = document.RollbackAvailable,
            ErrorMessage = document.ErrorMessage,
        };
    }

    /// <summary>
    /// Internal Mongo persistence shape for a migration record.
    /// </summary>
    [BsonIgnoreExtraElements]
    private sealed class MigrationDocument
    {
        [BsonId]
        [BsonRepresentation(BsonType.ObjectId)]
        public string? Id { get; set; }

        [BsonElement("gridFsId")]
        public required string GridFsId { get; set; }

        [BsonElement("bucket")]
        public required string Bucket { get; set; }

        [BsonElement("key")]
        public required string Key { get; set; }

        [BsonElement("sha256")]
        public required string Sha256 { get; set; }

        [BsonElement("size")]
        public required long Size { get; set; }

        [BsonElement("contentType")]
        public required string ContentType { get; set; }

        [BsonElement("encoding")]
        public required string Encoding { get; set; }

        [BsonElement("migratedAt")]
        public required DateTime MigratedAt { get; set; }

        [BsonElement("status")]
        public required string Status { get; set; }

        [BsonElement("verifiedAt")]
        public DateTime? VerifiedAt { get; set; }

        [BsonElement("rollbackAvailable")]
        public bool RollbackAvailable { get; set; }

        [BsonElement("errorMessage")]
        public string? ErrorMessage { get; set; }
    }
}
|
||||
@@ -1,52 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Deterministic pointer to an object in S3-compatible storage.
/// </summary>
public sealed record ObjectPointer
{
    /// <summary>
    /// S3 bucket name (tenant-prefixed).
    /// </summary>
    public required string Bucket { get; init; }

    /// <summary>
    /// Object key (deterministic, content-addressed).
    /// </summary>
    public required string Key { get; init; }

    /// <summary>
    /// SHA-256 hash of object content (hex encoded).
    /// NOTE(review): casing is not enforced here; consumers compare
    /// case-insensitively — confirm producers emit consistent casing.
    /// </summary>
    public required string Sha256 { get; init; }

    /// <summary>
    /// Object size in bytes.
    /// </summary>
    public required long Size { get; init; }

    /// <summary>
    /// MIME type of the object.
    /// </summary>
    public string ContentType { get; init; } = "application/octet-stream";

    /// <summary>
    /// Content encoding if compressed. Defaults to uncompressed.
    /// </summary>
    public ContentEncoding Encoding { get; init; } = ContentEncoding.Identity;
}
|
||||
|
||||
/// <summary>
/// Content encoding for stored objects.
/// </summary>
public enum ContentEncoding
{
    /// <summary>No compression.</summary>
    Identity,

    /// <summary>Gzip compression.</summary>
    Gzip,

    /// <summary>Zstandard compression.</summary>
    Zstd
}
|
||||
@@ -1,75 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Configuration options for S3-compatible object storage.
/// </summary>
public sealed class ObjectStorageOptions
{
    /// <summary>Configuration section name.</summary>
    public const string SectionName = "Concelier:ObjectStorage";

    /// <summary>S3-compatible endpoint URL (MinIO, AWS S3, etc.).</summary>
    public string Endpoint { get; set; } = "http://localhost:9000";

    /// <summary>Storage region (use 'us-east-1' for MinIO).</summary>
    public string Region { get; set; } = "us-east-1";

    /// <summary>Use path-style addressing (required for MinIO).</summary>
    public bool UsePathStyle { get; set; } = true;

    /// <summary>Prefix for tenant bucket names.</summary>
    public string BucketPrefix { get; set; } = "stellaops-concelier-";

    /// <summary>Maximum object size in bytes (default 5GB).</summary>
    public long MaxObjectSize { get; set; } = 5L * 1024 * 1024 * 1024;

    /// <summary>Objects larger than this (bytes) will be compressed. Default: 1MB.</summary>
    public int CompressionThreshold { get; set; } = 1024 * 1024;

    /// <summary>Objects smaller than this (bytes) will be stored inline. Default: 64KB.</summary>
    public int InlineThreshold { get; set; } = 64 * 1024;

    /// <summary>Whether object storage is enabled. When false, uses GridFS fallback.</summary>
    public bool Enabled { get; set; } = false;

    /// <summary>AWS access key ID (or MinIO access key).</summary>
    public string? AccessKeyId { get; set; }

    /// <summary>AWS secret access key (or MinIO secret key).</summary>
    public string? SecretAccessKey { get; set; }

    /// <summary>
    /// Gets the bucket name for a tenant: the tenant id lower-cased, with
    /// underscores replaced by hyphens, prefixed by <see cref="BucketPrefix"/>.
    /// </summary>
    /// <param name="tenantId">Tenant identifier; must be non-empty.</param>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="tenantId"/> is null or whitespace.
    /// </exception>
    public string GetBucketName(string tenantId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        // Bucket names must be lowercase and '_' is not valid in S3 bucket names.
        return $"{BucketPrefix}{tenantId.ToLowerInvariant().Replace('_', '-')}";
    }
}
|
||||
@@ -1,128 +0,0 @@
|
||||
using Amazon;
|
||||
using Amazon.Runtime;
|
||||
using Amazon.S3;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Extension methods for registering object storage services.
/// </summary>
public static class ObjectStorageServiceCollectionExtensions
{
    /// <summary>
    /// Adds object storage services for Concelier raw payload storage, binding
    /// <see cref="ObjectStorageOptions"/> from the configuration section
    /// <see cref="ObjectStorageOptions.SectionName"/>.
    /// </summary>
    public static IServiceCollection AddConcelierObjectStorage(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configuration);

        services.Configure<ObjectStorageOptions>(
            configuration.GetSection(ObjectStorageOptions.SectionName));

        return AddCoreServices(services);
    }

    /// <summary>
    /// Adds object storage services with explicitly configured options.
    /// </summary>
    public static IServiceCollection AddConcelierObjectStorage(
        this IServiceCollection services,
        Action<ObjectStorageOptions> configureOptions)
    {
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configureOptions);

        services.Configure(configureOptions);

        return AddCoreServices(services);
    }

    /// <summary>
    /// Registers the services shared by both overloads (previously duplicated
    /// verbatim in each): TimeProvider, the S3 client, the object store, and
    /// the GridFS migration components.
    /// </summary>
    private static IServiceCollection AddCoreServices(IServiceCollection services)
    {
        // Register TimeProvider if not already registered.
        services.TryAddSingleton(TimeProvider.System);

        // Register S3 client, built lazily from the bound options.
        services.TryAddSingleton<IAmazonS3>(sp =>
        {
            var options = sp.GetRequiredService<IOptions<ObjectStorageOptions>>().Value;
            return CreateS3Client(options);
        });

        // Register object store.
        services.TryAddSingleton<IObjectStore, S3ObjectStore>();

        // Register migration tracker.
        services.TryAddSingleton<IMigrationTracker, MongoMigrationTracker>();

        // Register migration service.
        services.TryAddSingleton<GridFsMigrationService>();

        return services;
    }

    /// <summary>
    /// Builds an S3 client from options: honors a custom endpoint when set, and
    /// uses explicit credentials only when both keys are present; otherwise the
    /// default credentials chain (env vars, IAM role, etc.) applies.
    /// </summary>
    private static AmazonS3Client CreateS3Client(ObjectStorageOptions options)
    {
        var config = new AmazonS3Config
        {
            RegionEndpoint = RegionEndpoint.GetBySystemName(options.Region),
            ForcePathStyle = options.UsePathStyle,
        };

        if (!string.IsNullOrEmpty(options.Endpoint))
        {
            config.ServiceURL = options.Endpoint;
        }

        if (!string.IsNullOrEmpty(options.AccessKeyId) &&
            !string.IsNullOrEmpty(options.SecretAccessKey))
        {
            var credentials = new BasicAWSCredentials(
                options.AccessKeyId,
                options.SecretAccessKey);
            return new AmazonS3Client(credentials, config);
        }

        // Use default credentials chain (env vars, IAM role, etc.)
        return new AmazonS3Client(config);
    }
}
|
||||
@@ -1,79 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Reference to a large payload stored in object storage (used in advisory_observations).
/// </summary>
public sealed record PayloadReference
{
    /// <summary>
    /// Discriminator for payload type.
    /// </summary>
    public const string TypeDiscriminator = "object-storage-ref";

    /// <summary>
    /// Type discriminator value.
    /// </summary>
    public string Type { get; init; } = TypeDiscriminator;

    /// <summary>
    /// Pointer to the object in storage. For inline payloads the bucket/key
    /// are empty strings (no object is uploaded).
    /// </summary>
    public required ObjectPointer Pointer { get; init; }

    /// <summary>
    /// Provenance metadata for the payload.
    /// </summary>
    public required ProvenanceMetadata Provenance { get; init; }

    /// <summary>
    /// If true, payload is small enough to be inline (not in object storage).
    /// </summary>
    public bool Inline { get; init; }

    /// <summary>
    /// Base64-encoded inline data (only if Inline=true and size less than threshold).
    /// </summary>
    public string? InlineData { get; init; }

    /// <summary>
    /// Creates a reference for inline data; the pointer carries an empty
    /// bucket/key because nothing is uploaded.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> or <paramref name="provenance"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="sha256"/> is null or whitespace.</exception>
    public static PayloadReference CreateInline(
        byte[] data,
        string sha256,
        ProvenanceMetadata provenance,
        string contentType = "application/octet-stream")
    {
        // Guard clauses match the validation style used elsewhere in this
        // storage layer; without them a null `data` surfaces as an opaque NRE
        // inside Convert.ToBase64String.
        ArgumentNullException.ThrowIfNull(data);
        ArgumentException.ThrowIfNullOrWhiteSpace(sha256);
        ArgumentNullException.ThrowIfNull(provenance);

        return new PayloadReference
        {
            Pointer = new ObjectPointer
            {
                Bucket = string.Empty,
                Key = string.Empty,
                Sha256 = sha256,
                Size = data.Length,
                ContentType = contentType,
                Encoding = ContentEncoding.Identity,
            },
            Provenance = provenance,
            Inline = true,
            InlineData = Convert.ToBase64String(data),
        };
    }

    /// <summary>
    /// Creates a reference for object storage data.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="pointer"/> or <paramref name="provenance"/> is null.</exception>
    public static PayloadReference CreateObjectStorage(
        ObjectPointer pointer,
        ProvenanceMetadata provenance)
    {
        ArgumentNullException.ThrowIfNull(pointer);
        ArgumentNullException.ThrowIfNull(provenance);

        return new PayloadReference
        {
            Pointer = pointer,
            Provenance = provenance,
            Inline = false,
            InlineData = null,
        };
    }
}
|
||||
@@ -1,86 +0,0 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// Provenance metadata preserved from original ingestion.
/// </summary>
public sealed record ProvenanceMetadata
{
    /// <summary>URI identifying the original data source.</summary>
    public required string SourceId { get; init; }

    /// <summary>UTC timestamp at which the payload was originally ingested.</summary>
    public required DateTimeOffset IngestedAt { get; init; }

    /// <summary>Tenant identifier used for multi-tenant isolation.</summary>
    public required string TenantId { get; init; }

    /// <summary>Format of the payload before normalization, when known.</summary>
    public OriginalFormat? OriginalFormat { get; init; }

    /// <summary>Payload size in bytes before any transformation, when known.</summary>
    public long? OriginalSize { get; init; }

    /// <summary>Transformations that have been applied to the payload.</summary>
    public IReadOnlyList<TransformationRecord> Transformations { get; init; } = [];

    /// <summary>GridFS ObjectId of the original document, kept for migration tracking.</summary>
    public string? GridFsLegacyId { get; init; }
}
|
||||
|
||||
/// <summary>
/// Original format of ingested data.
/// </summary>
public enum OriginalFormat
{
    /// <summary>JSON document.</summary>
    Json,

    /// <summary>XML document.</summary>
    Xml,

    /// <summary>Comma-separated values.</summary>
    Csv,

    /// <summary>Newline-delimited JSON.</summary>
    Ndjson,

    /// <summary>YAML document.</summary>
    Yaml
}
|
||||
|
||||
/// <summary>
/// Record of a transformation applied to the payload.
/// </summary>
public sealed record TransformationRecord
{
    /// <summary>Kind of transformation performed.</summary>
    public required TransformationType Type { get; init; }

    /// <summary>When the transformation was applied.</summary>
    public required DateTimeOffset Timestamp { get; init; }

    /// <summary>Agent or service that performed the transformation.</summary>
    public required string Agent { get; init; }
}
|
||||
|
||||
/// <summary>
/// Types of transformations that can be applied.
/// </summary>
public enum TransformationType
{
    /// <summary>Payload bytes were compressed.</summary>
    Compression,

    /// <summary>Payload content was normalized.</summary>
    Normalization,

    /// <summary>Sensitive content was redacted.</summary>
    Redaction,

    /// <summary>Payload was moved between storage backends.</summary>
    Migration
}
|
||||
@@ -1,320 +0,0 @@
|
||||
using System.IO.Compression;
|
||||
using System.Security.Cryptography;
|
||||
using Amazon.S3;
|
||||
using Amazon.S3.Model;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.ObjectStorage;
|
||||
|
||||
/// <summary>
/// S3-compatible object store implementation for raw advisory payloads.
/// Payloads below the inline threshold are embedded in the reference; larger
/// ones are uploaded under a deterministic, content-addressed key and
/// gzip-compressed once they reach the compression threshold.
/// </summary>
public sealed class S3ObjectStore : IObjectStore
{
    private readonly IAmazonS3 _s3;
    private readonly ObjectStorageOptions _options;
    private readonly TimeProvider _timeProvider; // injected but not read yet; kept for testable time access
    private readonly ILogger<S3ObjectStore> _logger;

    public S3ObjectStore(
        IAmazonS3 s3,
        IOptions<ObjectStorageOptions> options,
        TimeProvider timeProvider,
        ILogger<S3ObjectStore> logger)
    {
        _s3 = s3 ?? throw new ArgumentNullException(nameof(s3));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Stores a payload and returns a reference: inline for small payloads,
    /// otherwise an object-storage pointer (the tenant bucket is created on
    /// demand and the payload may be gzip-compressed first).
    /// </summary>
    public async Task<PayloadReference> StoreAsync(
        string tenantId,
        ReadOnlyMemory<byte> data,
        ProvenanceMetadata provenance,
        string contentType = "application/json",
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        ArgumentNullException.ThrowIfNull(provenance);

        var dataArray = data.ToArray();
        var sha256 = ComputeSha256(dataArray);

        // Use inline storage for small payloads.
        if (dataArray.Length < _options.InlineThreshold)
        {
            _logger.LogDebug(
                "Storing inline payload for tenant {TenantId}, size {Size} bytes",
                tenantId, dataArray.Length);

            return PayloadReference.CreateInline(dataArray, sha256, provenance, contentType);
        }

        // Store in S3.
        var bucket = _options.GetBucketName(tenantId);
        await EnsureBucketExistsAsync(tenantId, cancellationToken).ConfigureAwait(false);

        var shouldCompress = dataArray.Length >= _options.CompressionThreshold;
        var encoding = ContentEncoding.Identity;
        byte[] payloadToStore = dataArray;

        if (shouldCompress)
        {
            payloadToStore = CompressGzip(dataArray);
            encoding = ContentEncoding.Gzip;
            _logger.LogDebug(
                "Compressed payload from {OriginalSize} to {CompressedSize} bytes",
                dataArray.Length, payloadToStore.Length);
        }

        var key = GenerateKey(sha256, provenance.IngestedAt, contentType, encoding);

        var request = new PutObjectRequest
        {
            BucketName = bucket,
            Key = key,
            InputStream = new MemoryStream(payloadToStore),
            ContentType = encoding == ContentEncoding.Gzip ? "application/gzip" : contentType,
            AutoCloseStream = true,
        };

        // Attach integrity/provenance metadata so objects are self-describing.
        // Invariant culture keeps the size machine-readable on any host locale.
        request.Metadata["x-stellaops-sha256"] = sha256;
        request.Metadata["x-stellaops-original-size"] = dataArray.Length.ToString(System.Globalization.CultureInfo.InvariantCulture);
        request.Metadata["x-stellaops-encoding"] = encoding.ToString().ToLowerInvariant();
        request.Metadata["x-stellaops-source-id"] = provenance.SourceId;
        request.Metadata["x-stellaops-ingested-at"] = provenance.IngestedAt.ToString("O");

        await _s3.PutObjectAsync(request, cancellationToken).ConfigureAwait(false);

        _logger.LogDebug(
            "Stored object {Bucket}/{Key}, size {Size} bytes, encoding {Encoding}",
            bucket, key, payloadToStore.Length, encoding);

        // Sha256 is the hash of the ORIGINAL bytes; Size is the stored
        // (possibly compressed) length.
        var pointer = new ObjectPointer
        {
            Bucket = bucket,
            Key = key,
            Sha256 = sha256,
            Size = payloadToStore.Length,
            ContentType = contentType,
            Encoding = encoding,
        };

        return PayloadReference.CreateObjectStorage(pointer, provenance);
    }

    /// <summary>
    /// Stores a payload read from a stream. The stream is buffered fully in
    /// memory (the hash must be computed before upload) and then delegated to
    /// <see cref="StoreAsync"/>.
    /// </summary>
    public async Task<PayloadReference> StoreStreamAsync(
        string tenantId,
        Stream stream,
        ProvenanceMetadata provenance,
        string contentType = "application/json",
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        ArgumentNullException.ThrowIfNull(stream);
        ArgumentNullException.ThrowIfNull(provenance);

        // Read stream to memory for hash computation.
        using var memoryStream = new MemoryStream();
        await stream.CopyToAsync(memoryStream, cancellationToken).ConfigureAwait(false);
        var data = memoryStream.ToArray();

        return await StoreAsync(tenantId, data, provenance, contentType, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Retrieves the payload bytes (decompressed when applicable), or null if
    /// the object no longer exists.
    /// </summary>
    public async Task<byte[]?> RetrieveAsync(
        PayloadReference reference,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(reference);

        // Inline payloads decode straight from base64 — no S3 round-trip.
        if (reference.Inline && reference.InlineData is not null)
        {
            return Convert.FromBase64String(reference.InlineData);
        }

        var stream = await RetrieveStreamAsync(reference, cancellationToken).ConfigureAwait(false);
        if (stream is null)
        {
            return null;
        }

        using (stream)
        {
            using var memoryStream = new MemoryStream();
            await stream.CopyToAsync(memoryStream, cancellationToken).ConfigureAwait(false);
            return memoryStream.ToArray();
        }
    }

    /// <summary>
    /// Retrieves the payload as a stream (decompressed when applicable), or
    /// null if the object no longer exists. The caller owns disposal of the
    /// returned stream.
    /// </summary>
    public async Task<Stream?> RetrieveStreamAsync(
        PayloadReference reference,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(reference);

        // Handle inline data.
        if (reference.Inline && reference.InlineData is not null)
        {
            return new MemoryStream(Convert.FromBase64String(reference.InlineData));
        }

        var pointer = reference.Pointer;
        try
        {
            var response = await _s3.GetObjectAsync(pointer.Bucket, pointer.Key, cancellationToken)
                .ConfigureAwait(false);

            if (pointer.Encoding == ContentEncoding.Gzip)
            {
                // Decompress fully, then dispose the S3 response. Fix: the
                // GetObjectResponse was previously never disposed on this path
                // (only its wrapped ResponseStream, via the GZipStream).
                using (response)
                {
                    var decompressed = new MemoryStream();
                    using (var gzip = new GZipStream(response.ResponseStream, CompressionMode.Decompress))
                    {
                        await gzip.CopyToAsync(decompressed, cancellationToken).ConfigureAwait(false);
                    }
                    decompressed.Position = 0;
                    return decompressed;
                }
            }

            // Identity encoding: hand the network stream to the caller, who
            // takes over disposal.
            return response.ResponseStream;
        }
        catch (AmazonS3Exception ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
        {
            _logger.LogWarning("Object not found: {Bucket}/{Key}", pointer.Bucket, pointer.Key);
            return null;
        }
    }

    /// <summary>
    /// Returns true when the pointed-to object exists, via a metadata (HEAD)
    /// request; false on a 404.
    /// </summary>
    public async Task<bool> ExistsAsync(
        ObjectPointer pointer,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(pointer);

        try
        {
            var metadata = await _s3.GetObjectMetadataAsync(pointer.Bucket, pointer.Key, cancellationToken)
                .ConfigureAwait(false);
            return metadata.HttpStatusCode == System.Net.HttpStatusCode.OK;
        }
        catch (AmazonS3Exception ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
        {
            return false;
        }
    }

    /// <summary>
    /// Deletes the pointed-to object.
    /// </summary>
    public async Task DeleteAsync(
        ObjectPointer pointer,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(pointer);

        await _s3.DeleteObjectAsync(pointer.Bucket, pointer.Key, cancellationToken)
            .ConfigureAwait(false);

        _logger.LogDebug("Deleted object {Bucket}/{Key}", pointer.Bucket, pointer.Key);
    }

    /// <summary>
    /// Creates the tenant's bucket if it does not already exist.
    /// NOTE(review): the SDK's EnsureBucketExistsAsync extension takes no
    /// CancellationToken, so the token parameter is accepted but not forwarded.
    /// </summary>
    public async Task EnsureBucketExistsAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        var bucket = _options.GetBucketName(tenantId);

        try
        {
            await _s3.EnsureBucketExistsAsync(bucket).ConfigureAwait(false);
            _logger.LogDebug("Ensured bucket exists: {Bucket}", bucket);
        }
        catch (AmazonS3Exception ex)
        {
            _logger.LogError(ex, "Failed to ensure bucket exists: {Bucket}", bucket);
            throw;
        }
    }

    /// <summary>
    /// Re-downloads the payload and compares its SHA-256 against the pointer's
    /// recorded hash. Returns false if the object is missing or the hash differs.
    /// </summary>
    public async Task<bool> VerifyIntegrityAsync(
        PayloadReference reference,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(reference);

        var data = await RetrieveAsync(reference, cancellationToken).ConfigureAwait(false);
        if (data is null)
        {
            return false;
        }

        var computedHash = ComputeSha256(data);
        var matches = string.Equals(computedHash, reference.Pointer.Sha256, StringComparison.OrdinalIgnoreCase);

        if (!matches)
        {
            _logger.LogWarning(
                "Integrity check failed for {Bucket}/{Key}: expected {Expected}, got {Actual}",
                reference.Pointer.Bucket, reference.Pointer.Key,
                reference.Pointer.Sha256, computedHash);
        }

        return matches;
    }

    /// <summary>Computes the lowercase hex SHA-256 of the given bytes.</summary>
    private static string ComputeSha256(byte[] data)
    {
        var hash = SHA256.HashData(data);
        return Convert.ToHexStringLower(hash);
    }

    /// <summary>Gzip-compresses the given bytes at optimal compression level.</summary>
    private static byte[] CompressGzip(byte[] data)
    {
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Optimal, leaveOpen: true))
        {
            gzip.Write(data);
        }
        return output.ToArray();
    }

    /// <summary>
    /// Builds the deterministic object key:
    /// advisories/raw/YYYY/MM/DD/sha256-{first 16 hex chars}.{extension}.
    /// </summary>
    private static string GenerateKey(
        string sha256,
        DateTimeOffset ingestedAt,
        string contentType,
        ContentEncoding encoding)
    {
        var date = ingestedAt.UtcDateTime;
        var extension = GetExtension(contentType, encoding);

        // Format: advisories/raw/YYYY/MM/DD/sha256-{hash}.{extension}
        return $"advisories/raw/{date:yyyy}/{date:MM}/{date:dd}/sha256-{sha256[..16]}{extension}";
    }

    /// <summary>
    /// Maps the content type to a file extension, appending a compression
    /// suffix (.gz/.zst) when the payload is encoded.
    /// </summary>
    private static string GetExtension(string contentType, ContentEncoding encoding)
    {
        var baseExt = contentType switch
        {
            "application/json" => ".json",
            "application/xml" or "text/xml" => ".xml",
            "text/csv" => ".csv",
            "application/x-ndjson" => ".ndjson",
            "application/x-yaml" or "text/yaml" => ".yaml",
            _ => ".bin"
        };

        return encoding switch
        {
            ContentEncoding.Gzip => baseExt + ".gz",
            ContentEncoding.Zstd => baseExt + ".zst",
            _ => baseExt
        };
    }
}
|
||||
@@ -1,23 +0,0 @@
|
||||
<!-- StellaOps.Concelier.Storage.Mongo: MongoDB document storage plus S3-compatible object storage for raw advisory payloads. -->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<LangVersion>preview</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<!-- NuGet dependencies: AWS S3 client, Mongo driver, and Microsoft.Extensions abstractions for configuration/options/DI/logging. -->
<ItemGroup>
<PackageReference Include="AWSSDK.S3" Version="3.7.305.6" />
<PackageReference Include="MongoDB.Driver" Version="3.5.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.0" />
</ItemGroup>
<!-- Sibling Concelier projects supplying the raw and normalized advisory models. -->
<ItemGroup>
<ProjectReference Include="../StellaOps.Concelier.RawModels/StellaOps.Concelier.RawModels.csproj" />
<ProjectReference Include="../StellaOps.Concelier.Models/StellaOps.Concelier.Models.csproj" />
</ItemGroup>
</Project>
|
||||
Reference in New Issue
Block a user