// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
using System.Data;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
namespace StellaOps.Eventing.Outbox;
///
/// Background service that processes the transactional outbox for reliable event delivery.
///
public sealed class TimelineOutboxProcessor : BackgroundService
{
private readonly NpgsqlDataSource _dataSource;
private readonly IOptions _options;
private readonly ILogger _logger;
///
/// Initializes a new instance of the class.
///
public TimelineOutboxProcessor(
NpgsqlDataSource dataSource,
IOptions options,
ILogger logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
///
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Value.EnableOutbox)
{
_logger.LogInformation("Outbox processing disabled");
return;
}
_logger.LogInformation(
"Starting outbox processor with batch size {BatchSize} and interval {Interval}",
_options.Value.OutboxBatchSize,
_options.Value.OutboxInterval);
while (!stoppingToken.IsCancellationRequested)
{
try
{
var processedCount = await ProcessBatchAsync(stoppingToken).ConfigureAwait(false);
if (processedCount > 0)
{
_logger.LogDebug("Processed {Count} outbox entries", processedCount);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox batch");
}
await Task.Delay(_options.Value.OutboxInterval, stoppingToken).ConfigureAwait(false);
}
_logger.LogInformation("Outbox processor stopped");
}
private async Task ProcessBatchAsync(CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken).ConfigureAwait(false);
try
{
// Select and lock pending entries
const string selectSql = """
SELECT id, event_id, retry_count
FROM timeline.outbox
WHERE status = 'PENDING'
OR (status = 'FAILED' AND next_retry_at <= NOW())
ORDER BY id
LIMIT @batch_size
FOR UPDATE SKIP LOCKED
""";
await using var selectCmd = new NpgsqlCommand(selectSql, connection, transaction);
selectCmd.Parameters.AddWithValue("@batch_size", _options.Value.OutboxBatchSize);
var entries = new List<(long Id, string EventId, int RetryCount)>();
await using (var reader = await selectCmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false))
{
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
entries.Add((
reader.GetInt64(0),
reader.GetString(1),
reader.GetInt32(2)));
}
}
if (entries.Count == 0)
{
await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
return 0;
}
// Process each entry (in real implementation, this would forward to downstream consumers)
var completedIds = new List();
foreach (var entry in entries)
{
try
{
// TODO: Forward event to downstream consumers
// For now, just mark as completed
completedIds.Add(entry.Id);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to process outbox entry {Id}", entry.Id);
await MarkAsFailedAsync(connection, transaction, entry.Id, entry.RetryCount, ex.Message, cancellationToken).ConfigureAwait(false);
}
}
// Mark completed entries
if (completedIds.Count > 0)
{
const string completeSql = """
UPDATE timeline.outbox
SET status = 'COMPLETED', updated_at = NOW()
WHERE id = ANY(@ids)
""";
await using var completeCmd = new NpgsqlCommand(completeSql, connection, transaction);
completeCmd.Parameters.AddWithValue("@ids", completedIds.ToArray());
await completeCmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
return completedIds.Count;
}
catch
{
await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
throw;
}
}
private static async Task MarkAsFailedAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
long id,
int retryCount,
string errorMessage,
CancellationToken cancellationToken)
{
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 5 retries
var nextRetryDelay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
var maxRetries = 5;
var newStatus = retryCount >= maxRetries ? "FAILED" : "PENDING";
const string sql = """
UPDATE timeline.outbox
SET status = @status,
retry_count = @retry_count,
next_retry_at = @next_retry_at,
error_message = @error_message,
updated_at = NOW()
WHERE id = @id
""";
await using var cmd = new NpgsqlCommand(sql, connection, transaction);
cmd.Parameters.AddWithValue("@id", id);
cmd.Parameters.AddWithValue("@status", newStatus);
cmd.Parameters.AddWithValue("@retry_count", retryCount + 1);
cmd.Parameters.AddWithValue("@next_retry_at", DateTimeOffset.UtcNow.Add(nextRetryDelay));
cmd.Parameters.AddWithValue("@error_message", errorMessage);
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}