using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using StellaOps.Concelier.Models;
using StellaOps.Concelier.Storage.Mongo.Advisories;
using StellaOps.Concelier.Storage.Postgres.Advisories;
namespace StellaOps.Concelier.WebService.DualWrite;
/// <summary>
/// Dual-write advisory store that writes each advisory to MongoDB and then to PostgreSQL.
/// Used during migration to verify parity between backends.
/// </summary>
/// <remarks>
/// MongoDB is the primary store: its write is awaited first and any failure propagates to the caller.
/// PostgreSQL writes are best-effort: failures are logged as warnings and swallowed, except for a
/// cancellation requested through the caller's own token, which is propagated.
/// Read operations are always served from MongoDB.
/// </remarks>
public sealed class DualWriteAdvisoryStore : IAdvisoryStore
{
    // NOTE(review): generic type arguments appear to have been stripped from several declarations
    // in this file (the logger field/parameter and the return types of the read methods, e.g. "Task>").
    // They are left exactly as found here; restore them from IAdvisoryStore / AdvisoryStore - TODO confirm.
    private readonly AdvisoryStore _mongoStore;
    private readonly IPostgresAdvisoryStore _postgresStore;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a dual-write store over the given MongoDB and PostgreSQL stores.
    /// </summary>
    /// <param name="mongoStore">Primary store; receives every write and serves every read.</param>
    /// <param name="postgresStore">Secondary store; receives best-effort writes only.</param>
    /// <param name="logger">Logger used to report the outcome of the PostgreSQL write.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public DualWriteAdvisoryStore(
        AdvisoryStore mongoStore,
        IPostgresAdvisoryStore postgresStore,
        ILogger logger)
    {
        _mongoStore = mongoStore ?? throw new ArgumentNullException(nameof(mongoStore));
        _postgresStore = postgresStore ?? throw new ArgumentNullException(nameof(postgresStore));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Upserts the advisory into MongoDB, then attempts the same upsert against PostgreSQL.
    /// </summary>
    /// <param name="advisory">Advisory to persist.</param>
    /// <param name="cancellationToken">Token passed to both store calls.</param>
    /// <param name="session">Optional MongoDB session; forwarded to the MongoDB store only.</param>
    /// <exception cref="ArgumentNullException"><paramref name="advisory"/> is <c>null</c>.</exception>
    /// <exception cref="OperationCanceledException">
    /// The PostgreSQL write was cancelled after <paramref name="cancellationToken"/> was signalled.
    /// The MongoDB write has already completed at that point.
    /// </exception>
    public async Task UpsertAsync(Advisory advisory, CancellationToken cancellationToken, IClientSessionHandle? session = null)
    {
        // The advisory is dereferenced in both logging paths below, so reject null up front.
        ArgumentNullException.ThrowIfNull(advisory);

        // Primary write: not wrapped in try/catch, so any MongoDB failure surfaces to the caller.
        await _mongoStore.UpsertAsync(advisory, cancellationToken, session).ConfigureAwait(false);

        // Secondary write: best-effort. The MongoDB session is not forwarded to this call.
        try
        {
            await _postgresStore.UpsertAsync(advisory, sourceId: null, cancellationToken).ConfigureAwait(false);
            _logger.LogDebug("Dual-write success for advisory {AdvisoryKey}", advisory.AdvisoryKey);
        }
        catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
        {
            // The filter lets a caller-requested cancellation propagate instead of being
            // reported as a backend failure. Everything else is logged and swallowed because
            // MongoDB is the primary store during migration.
            _logger.LogWarning(ex, "Dual-write to PostgreSQL failed for advisory {AdvisoryKey}. MongoDB write succeeded.", advisory.AdvisoryKey);
        }
    }

    /// <summary>
    /// Looks up an advisory by key. Delegates directly to the MongoDB store; PostgreSQL is not consulted.
    /// </summary>
    /// <param name="advisoryKey">Key of the advisory to find.</param>
    /// <param name="cancellationToken">Token forwarded to the MongoDB store.</param>
    /// <param name="session">Optional MongoDB session forwarded to the MongoDB store.</param>
    /// <returns>The task returned by the MongoDB store's <c>FindAsync</c>.</returns>
    public Task FindAsync(string advisoryKey, CancellationToken cancellationToken, IClientSessionHandle? session = null)
    {
        // Reads never touch PostgreSQL while dual-write mode is active.
        return _mongoStore.FindAsync(advisoryKey, cancellationToken, session);
    }

    /// <summary>
    /// Fetches recent advisories. Delegates directly to the MongoDB store; PostgreSQL is not consulted.
    /// </summary>
    /// <param name="limit">Limit value forwarded unchanged to the MongoDB store.</param>
    /// <param name="cancellationToken">Token forwarded to the MongoDB store.</param>
    /// <param name="session">Optional MongoDB session forwarded to the MongoDB store.</param>
    /// <returns>The task returned by the MongoDB store's <c>GetRecentAsync</c>.</returns>
    public Task> GetRecentAsync(int limit, CancellationToken cancellationToken, IClientSessionHandle? session = null)
    {
        // Reads never touch PostgreSQL while dual-write mode is active.
        return _mongoStore.GetRecentAsync(limit, cancellationToken, session);
    }

    /// <summary>
    /// Streams advisories. Delegates directly to the MongoDB store; PostgreSQL is not consulted.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the MongoDB store.</param>
    /// <param name="session">Optional MongoDB session forwarded to the MongoDB store.</param>
    /// <returns>The async sequence returned by the MongoDB store's <c>StreamAsync</c>.</returns>
    public IAsyncEnumerable StreamAsync(CancellationToken cancellationToken, IClientSessionHandle? session = null)
    {
        // Reads never touch PostgreSQL while dual-write mode is active.
        return _mongoStore.StreamAsync(cancellationToken, session);
    }
}