using System.Globalization;
using System.Runtime.CompilerServices;
using System.Text.Json;
using Dapper;
using Microsoft.Extensions.Logging;
using StellaOps.Messaging.Abstractions;

namespace StellaOps.Messaging.Transport.Postgres;

/// <summary>
/// PostgreSQL implementation of <see cref="IEventStream{TEvent}"/>.
/// Events are stored as JSONB rows in a per-stream table; subscriptions poll that
/// table (this class issues no LISTEN/NOTIFY commands).
/// </summary>
/// <typeparam name="TEvent">Event payload type; serialized to and from JSON.</typeparam>
public sealed class PostgresEventStream<TEvent> : IEventStream<TEvent>
    where TEvent : class
{
    /// <summary>Maximum number of rows fetched per poll in <see cref="SubscribeAsync"/>.</summary>
    private const int SubscribeBatchSize = 100;

    private readonly PostgresConnectionFactory _connectionFactory;
    private readonly EventStreamOptions _options;
    private readonly ILogger<PostgresEventStream<TEvent>>? _logger;
    private readonly JsonSerializerOptions _jsonOptions;
    private readonly TimeProvider _timeProvider;

    // Read and written by concurrent callers; volatile so a completed initialization
    // is observed promptly. The DDL itself is idempotent (IF NOT EXISTS).
    private volatile bool _tableInitialized;

    /// <summary>
    /// Creates a stream bound to the table derived from <paramref name="options"/>.
    /// </summary>
    /// <param name="connectionFactory">Source of open connections and of the schema name.</param>
    /// <param name="options">Stream name, optional max length and poll interval.</param>
    /// <param name="logger">Optional logger.</param>
    /// <param name="jsonOptions">Serializer options; defaults to camelCase, non-indented.</param>
    /// <param name="timeProvider">Clock used for event timestamps; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connectionFactory"/> or <paramref name="options"/> is null.
    /// </exception>
    public PostgresEventStream(
        PostgresConnectionFactory connectionFactory,
        EventStreamOptions options,
        ILogger<PostgresEventStream<TEvent>>? logger = null,
        JsonSerializerOptions? jsonOptions = null,
        TimeProvider? timeProvider = null)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger;
        _jsonOptions = jsonOptions ?? new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
            WriteIndented = false
        };
        _timeProvider = timeProvider ?? TimeProvider.System;
    }

    /// <inheritdoc />
    public string ProviderName => "postgres";

    /// <inheritdoc />
    public string StreamName => _options.StreamName;

    /// <summary>Stream name reduced to characters that are safe in an unquoted identifier.</summary>
    private string SafeStreamName => ToSafeIdentifier(_options.StreamName);

    /// <summary>Schema-qualified name of the table backing this stream.</summary>
    private string TableName => $"{_connectionFactory.Schema}.event_stream_{SafeStreamName}";

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public async ValueTask<EventPublishResult> PublishAsync(
        TEvent @event,
        EventPublishOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(@event);

        await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false);

        await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

        var now = _timeProvider.GetUtcNow();
        var eventJson = JsonSerializer.Serialize(@event, _jsonOptions);

        var sql = $@"
            INSERT INTO {TableName} (data, tenant_id, correlation_id, timestamp)
            VALUES (@Data::jsonb, @TenantId, @CorrelationId, @Timestamp)
            RETURNING id";

        var id = await conn.ExecuteScalarAsync<long>(
            new CommandDefinition(
                sql,
                new
                {
                    Data = eventJson,
                    TenantId = options?.TenantId,
                    CorrelationId = options?.CorrelationId,
                    Timestamp = now.UtcDateTime
                },
                cancellationToken: cancellationToken))
            .ConfigureAwait(false);

        // Auto-trim if configured.
        if (_options.MaxLength.HasValue)
        {
            await TrimInternalAsync(conn, _options.MaxLength.Value, cancellationToken).ConfigureAwait(false);
        }

        return EventPublishResult.Succeeded(FormatEntryId(now, id));
    }

    /// <inheritdoc />
    /// <remarks>Events are published one at a time, in enumeration order, via <see cref="PublishAsync"/>.</remarks>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    // NOTE(review): the return type must match IEventStream<TEvent>.PublishBatchAsync —
    // confirm it is IReadOnlyList<EventPublishResult>.
    public async ValueTask<IReadOnlyList<EventPublishResult>> PublishBatchAsync(
        IEnumerable<TEvent> events,
        EventPublishOptions? options = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(events);

        var results = new List<EventPublishResult>();
        foreach (var @event in events)
        {
            results.Add(await PublishAsync(@event, options, cancellationToken).ConfigureAwait(false));
        }

        return results;
    }

    /// <inheritdoc />
    /// <remarks>
    /// A position of <c>"0"</c> starts from the first row, <c>"$"</c> starts after the
    /// current highest row id, and any other value is parsed as a <c>timestamp-id</c>
    /// entry ID. When no rows are available the method waits for the configured poll
    /// interval before querying again.
    /// </remarks>
    public async IAsyncEnumerable<StreamEvent<TEvent>> SubscribeAsync(
        StreamPosition position,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false);

        var lastId = position.Value switch
        {
            "0" => 0L,
            "$" => await GetMaxIdAsync(cancellationToken).ConfigureAwait(false),
            _ => ParseEntryId(position.Value),
        };

        while (!cancellationToken.IsCancellationRequested)
        {
            // The batch is fully read and its connection released before anything is
            // yielded, so a slow consumer does not pin a connection.
            var batch = await ReadBatchAsync(lastId, cancellationToken).ConfigureAwait(false);

            if (batch.Count == 0)
            {
                // No new entries, wait before polling again.
                await Task.Delay(_options.PollInterval, cancellationToken).ConfigureAwait(false);
                continue;
            }

            foreach (var row in batch)
            {
                lastId = row.Id;

                var payload = JsonSerializer.Deserialize<TEvent>(row.Data, _jsonOptions);
                if (payload is null)
                {
                    _logger?.LogWarning(
                        "Skipping row {RowId} in stream {StreamName}: payload deserialized to null",
                        row.Id,
                        StreamName);
                    continue;
                }

                var timestamp = ToUtcOffset(row.Timestamp);
                yield return new StreamEvent<TEvent>(
                    FormatEntryId(timestamp, row.Id),
                    payload,
                    timestamp,
                    row.TenantId,
                    row.CorrelationId);
            }
        }
    }

    /// <inheritdoc />
    public async ValueTask<StreamInfo> GetInfoAsync(CancellationToken cancellationToken = default)
    {
        await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false);

        await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

        // Aliases equal the StreamInfoRow property names (compared case-insensitively by
        // Dapper), so mapping does not depend on DefaultTypeMap.MatchNamesWithUnderscores.
        var sql = $@"
            SELECT
                COUNT(*) AS Length,
                MIN(id) AS FirstId,
                MAX(id) AS LastId,
                MIN(timestamp) AS FirstTs,
                MAX(timestamp) AS LastTs
            FROM {TableName}";

        var info = await conn.QuerySingleAsync<StreamInfoRow>(
            new CommandDefinition(sql, cancellationToken: cancellationToken))
            .ConfigureAwait(false);

        string? firstEntryId = null;
        string? lastEntryId = null;
        DateTimeOffset? firstTs = null;
        DateTimeOffset? lastTs = null;

        // The aggregates are NULL for an empty table; entry IDs are only built when present.
        if (info.FirstId is { } firstId && info.FirstTs is { } firstRaw)
        {
            var first = ToUtcOffset(firstRaw);
            firstTs = first;
            firstEntryId = FormatEntryId(first, firstId);
        }

        if (info.LastId is { } lastId && info.LastTs is { } lastRaw)
        {
            var last = ToUtcOffset(lastRaw);
            lastTs = last;
            lastEntryId = FormatEntryId(last, lastId);
        }

        return new StreamInfo(info.Length, firstEntryId, lastEntryId, firstTs, lastTs);
    }

    /// <inheritdoc />
    /// <remarks><paramref name="approximate"/> is not used by this implementation.</remarks>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxLength"/> is negative.</exception>
    public async ValueTask<long> TrimAsync(
        long maxLength,
        bool approximate = true,
        CancellationToken cancellationToken = default)
    {
        // A negative value would otherwise reach the database as a negative OFFSET.
        ArgumentOutOfRangeException.ThrowIfNegative(maxLength);

        await EnsureTableExistsAsync(cancellationToken).ConfigureAwait(false);

        await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
        return await TrimInternalAsync(conn, maxLength, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Deletes every row except the <paramref name="maxLength"/> rows with the highest ids.
    /// </summary>
    /// <returns>The affected-row count reported by the DELETE.</returns>
    private async ValueTask<long> TrimInternalAsync(
        Npgsql.NpgsqlConnection conn,
        long maxLength,
        CancellationToken cancellationToken)
    {
        var sql = $@"
            WITH to_delete AS (
                SELECT id FROM {TableName}
                ORDER BY id DESC
                OFFSET @MaxLength
            )
            DELETE FROM {TableName}
            WHERE id IN (SELECT id FROM to_delete)";

        return await conn.ExecuteAsync(
            new CommandDefinition(sql, new { MaxLength = maxLength }, cancellationToken: cancellationToken))
            .ConfigureAwait(false);
    }

    /// <summary>Returns the highest row id in the stream table, or 0 when it is empty.</summary>
    private async ValueTask<long> GetMaxIdAsync(CancellationToken cancellationToken)
    {
        await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

        var sql = $@"SELECT COALESCE(MAX(id), 0) FROM {TableName}";
        return await conn.ExecuteScalarAsync<long>(
            new CommandDefinition(sql, cancellationToken: cancellationToken))
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Reads up to <see cref="SubscribeBatchSize"/> rows with an id greater than
    /// <paramref name="afterId"/>, in ascending id order, on a short-lived connection.
    /// </summary>
    private async ValueTask<List<EventRow>> ReadBatchAsync(long afterId, CancellationToken cancellationToken)
    {
        await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

        // Aliases equal the EventRow property names (compared case-insensitively by
        // Dapper), so mapping does not depend on DefaultTypeMap.MatchNamesWithUnderscores.
        var sql = $@"
            SELECT id, data, tenant_id AS TenantId, correlation_id AS CorrelationId, timestamp
            FROM {TableName}
            WHERE id > @LastId
            ORDER BY id
            LIMIT {SubscribeBatchSize}";

        var rows = await conn.QueryAsync<EventRow>(
            new CommandDefinition(sql, new { LastId = afterId }, cancellationToken: cancellationToken))
            .ConfigureAwait(false);

        return rows.AsList();
    }

    /// <summary>Creates the stream table and its timestamp index if they do not exist yet.</summary>
    private async ValueTask EnsureTableExistsAsync(CancellationToken cancellationToken)
    {
        if (_tableInitialized)
        {
            return;
        }

        await using var conn = await _connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

        var sql = $@"
            CREATE TABLE IF NOT EXISTS {TableName} (
                id BIGSERIAL PRIMARY KEY,
                data JSONB NOT NULL,
                tenant_id TEXT,
                correlation_id TEXT,
                timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
            );
            CREATE INDEX IF NOT EXISTS idx_{SafeStreamName}_timestamp ON {TableName} (timestamp);";

        await conn.ExecuteAsync(new CommandDefinition(sql, cancellationToken: cancellationToken)).ConfigureAwait(false);
        _tableInitialized = true;
    }

    /// <summary>
    /// Lower-cases <paramref name="streamName"/> and replaces every character that is not
    /// a letter, digit or underscore with an underscore. The name is interpolated into
    /// SQL text, so nothing that could terminate the identifier may pass through.
    /// Names made only of letters, digits, hyphens and underscores map to the same
    /// identifier as a plain lower-case + hyphen-to-underscore replacement.
    /// </summary>
    private static string ToSafeIdentifier(string streamName)
    {
        var chars = streamName.ToLowerInvariant().ToCharArray();
        for (var i = 0; i < chars.Length; i++)
        {
            if (!char.IsLetterOrDigit(chars[i]) && chars[i] != '_')
            {
                chars[i] = '_';
            }
        }

        return new string(chars);
    }

    /// <summary>Builds the <c>unixMilliseconds-rowId</c> entry ID using invariant formatting.</summary>
    private static string FormatEntryId(DateTimeOffset timestamp, long id) =>
        string.Create(CultureInfo.InvariantCulture, $"{timestamp.ToUnixTimeMilliseconds()}-{id}");

    /// <summary>
    /// Converts a database timestamp to a zero-offset <see cref="DateTimeOffset"/>.
    /// <see cref="DateTimeKind.Local"/> values are converted to UTC first, because the
    /// <see cref="DateTimeOffset"/> constructor rejects a local value paired with a
    /// non-matching offset; other kinds are taken as already being UTC.
    /// </summary>
    private static DateTimeOffset ToUtcOffset(DateTime value) => value.Kind switch
    {
        DateTimeKind.Local => new DateTimeOffset(value.ToUniversalTime(), TimeSpan.Zero),
        _ => new DateTimeOffset(DateTime.SpecifyKind(value, DateTimeKind.Utc), TimeSpan.Zero),
    };

    /// <summary>
    /// Extracts the row id from a <c>timestamp-id</c> entry ID.
    /// </summary>
    /// <returns>The parsed id, or 0 when the value is not in that format.</returns>
    private static long ParseEntryId(string entryId)
    {
        // Format is "timestamp-id"; the id follows the last dash.
        var dashIndex = entryId.LastIndexOf('-');
        if (dashIndex > 0
            && long.TryParse(
                entryId.AsSpan(dashIndex + 1),
                NumberStyles.Integer,
                CultureInfo.InvariantCulture,
                out var id))
        {
            return id;
        }

        return 0;
    }

    /// <summary>Row shape returned by the subscription query.</summary>
    private sealed class EventRow
    {
        public long Id { get; init; }
        public string Data { get; init; } = null!;
        public string? TenantId { get; init; }
        public string? CorrelationId { get; init; }
        public DateTime Timestamp { get; init; }
    }

    /// <summary>Row shape returned by the aggregate query in <see cref="GetInfoAsync"/>.</summary>
    private sealed class StreamInfoRow
    {
        public long Length { get; init; }
        public long? FirstId { get; init; }
        public long? LastId { get; init; }
        public DateTime? FirstTs { get; init; }
        public DateTime? LastTs { get; init; }
    }
}

/// <summary>
/// Factory for creating PostgreSQL event stream instances.
/// </summary>
public sealed class PostgresEventStreamFactory : IEventStreamFactory
{
    private readonly PostgresConnectionFactory _connectionFactory;
    private readonly ILoggerFactory? _loggerFactory;
    private readonly JsonSerializerOptions? _jsonOptions;
    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Creates a factory whose streams share the given connection factory, logger
    /// factory, serializer options and clock.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="connectionFactory"/> is null.</exception>
    public PostgresEventStreamFactory(
        PostgresConnectionFactory connectionFactory,
        ILoggerFactory? loggerFactory = null,
        JsonSerializerOptions? jsonOptions = null,
        TimeProvider? timeProvider = null)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _loggerFactory = loggerFactory;
        _jsonOptions = jsonOptions;
        _timeProvider = timeProvider ?? TimeProvider.System;
    }

    /// <inheritdoc />
    public string ProviderName => "postgres";

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public IEventStream<TEvent> Create<TEvent>(EventStreamOptions options)
        where TEvent : class
    {
        ArgumentNullException.ThrowIfNull(options);

        return new PostgresEventStream<TEvent>(
            _connectionFactory,
            options,
            _loggerFactory?.CreateLogger<PostgresEventStream<TEvent>>(),
            _jsonOptions,
            _timeProvider);
    }
}