using Dapper; using Microsoft.Extensions.Logging; using StellaOps.Messaging.Abstractions; namespace StellaOps.Messaging.Transport.Postgres; /// /// PostgreSQL implementation of . /// Uses INSERT ... ON CONFLICT DO NOTHING for atomic claiming. /// public sealed class PostgresIdempotencyStore : IIdempotencyStore { private readonly PostgresConnectionFactory _connectionFactory; private readonly string _name; private readonly ILogger? _logger; private readonly TimeProvider _timeProvider; private bool _tableInitialized; public PostgresIdempotencyStore( PostgresConnectionFactory connectionFactory, string name, ILogger? logger = null, TimeProvider? timeProvider = null) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _name = name ?? throw new ArgumentNullException(nameof(name)); _logger = logger; _timeProvider = timeProvider ?? TimeProvider.System; } /// public string ProviderName => "postgres"; private string TableName => $"{_connectionFactory.Schema}.idempotency_{_name.ToLowerInvariant().Replace("-", "_")}"; /// public async ValueTask TryClaimAsync( string key, string value, TimeSpan window, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(key); ArgumentNullException.ThrowIfNull(value); await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false); await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var expiresAt = now.Add(window); // Clean up expired entries first var cleanupSql = $@"DELETE FROM {TableName} WHERE expires_at < @Now"; await conn.ExecuteAsync(new CommandDefinition(cleanupSql, new { Now = now.UtcDateTime }, cancellationToken: cancellationToken)) .ConfigureAwait(false); // Try to insert var sql = $@" INSERT INTO {TableName} (key, value, expires_at) VALUES (@Key, @Value, @ExpiresAt) ON CONFLICT (key) DO NOTHING RETURNING TRUE"; var result = await conn.ExecuteScalarAsync( new CommandDefinition(sql, new { Key = key, Value = value, ExpiresAt = expiresAt.UtcDateTime }, cancellationToken: cancellationToken)) .ConfigureAwait(false); if (result == true) { return IdempotencyResult.Claimed(); } // Key already exists, get existing value var existingSql = $@"SELECT value FROM {TableName} WHERE key = @Key"; var existingValue = await conn.ExecuteScalarAsync( new CommandDefinition(existingSql, new { Key = key }, cancellationToken: cancellationToken)) .ConfigureAwait(false); return IdempotencyResult.Duplicate(existingValue ?? string.Empty); } /// public async ValueTask ExistsAsync(string key, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(key); await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false); await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var sql = $@"SELECT EXISTS(SELECT 1 FROM {TableName} WHERE key = @Key AND expires_at > @Now)"; return await conn.ExecuteScalarAsync( new CommandDefinition(sql, new { Key = key, Now = now.UtcDateTime }, cancellationToken: cancellationToken)) .ConfigureAwait(false); } /// public async ValueTask GetAsync(string key, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(key); await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false); await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); var now = _timeProvider.GetUtcNow(); var sql = $@"SELECT value FROM {TableName} WHERE key = @Key AND expires_at > @Now"; return await conn.ExecuteScalarAsync( new CommandDefinition(sql, new { Key = key, Now = now.UtcDateTime }, cancellationToken: cancellationToken)) .ConfigureAwait(false); } /// public async ValueTask ReleaseAsync(string key, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(key); await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false); await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); var sql = $@"DELETE FROM {TableName} WHERE key = @Key"; var deleted = await conn.ExecuteAsync( new CommandDefinition(sql, new { Key = key }, cancellationToken: cancellationToken)) .ConfigureAwait(false); return deleted > 0; } /// public async ValueTask ExtendAsync( string key, TimeSpan extension, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(key); await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false); await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); var sql = $@" UPDATE {TableName} SET expires_at = expires_at + @Extension WHERE key = @Key"; var updated = await conn.ExecuteAsync( new CommandDefinition(sql, new { Key = key, Extension = extension }, cancellationToken: cancellationToken)) .ConfigureAwait(false); return updated > 0; } private async ValueTask EnsureTableExistsAsync(CancellationToken cancellationToken) { if (_tableInitialized) return; await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); var safeName = _name.ToLowerInvariant().Replace("-", "_"); var sql = $@" CREATE TABLE IF NOT EXISTS {TableName} ( key TEXT PRIMARY KEY, value TEXT NOT NULL, expires_at TIMESTAMPTZ NOT NULL ); CREATE INDEX IF NOT EXISTS idx_{safeName}_expires ON {TableName} (expires_at);"; await conn.ExecuteAsync(new CommandDefinition(sql, cancellationToken: cancellationToken)).ConfigureAwait(false); _tableInitialized = true; } } /// /// Factory for creating PostgreSQL idempotency store instances. /// public sealed class PostgresIdempotencyStoreFactory : IIdempotencyStoreFactory { private readonly PostgresConnectionFactory _connectionFactory; private readonly ILoggerFactory? _loggerFactory; private readonly TimeProvider _timeProvider; public PostgresIdempotencyStoreFactory( PostgresConnectionFactory connectionFactory, ILoggerFactory? loggerFactory = null, TimeProvider? timeProvider = null) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _loggerFactory = loggerFactory; _timeProvider = timeProvider ?? TimeProvider.System; } /// public string ProviderName => "postgres"; /// public IIdempotencyStore Create(string name) { ArgumentNullException.ThrowIfNull(name); return new PostgresIdempotencyStore( _connectionFactory, name, _loggerFactory?.CreateLogger(), _timeProvider); } }