using System.Threading.Channels;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace StellaOps.Provcache;
/// <summary>
/// Background service that manages write-behind persistence for Provcache entries.
/// Batches writes to Postgres for efficiency and provides retry logic for transient failures.
/// </summary>
public sealed class WriteBehindQueue : BackgroundService, IWriteBehindQueue
{
    /// <summary>Pause applied after an unexpected processing-loop error before the next attempt.</summary>
    private static readonly TimeSpan ErrorBackoff = TimeSpan.FromMilliseconds(1000);

    private readonly Channel<WriteBehindItem> _channel;
    private readonly IProvcacheRepository _repository;
    private readonly ProvcacheOptions _options;
    private readonly ILogger<WriteBehindQueue> _logger;
    private readonly TimeProvider _timeProvider;

    // Metrics (always accessed through Interlocked).
    private long _totalEnqueued;
    private long _totalPersisted;
    private long _totalFailed;
    private long _totalRetries;
    private long _totalBatches;
    private long _currentQueueDepth;

    /// <summary>
    /// Creates the queue and its bounded backing channel.
    /// </summary>
    /// <param name="repository">Repository that receives the batched upserts.</param>
    /// <param name="options">Provcache options; capacity, batch size, flush interval and max retries are read from it.</param>
    /// <param name="logger">Logger used for lifecycle and failure messages.</param>
    /// <param name="timeProvider">Clock used for timestamps; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="repository"/>, <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public WriteBehindQueue(
        IProvcacheRepository repository,
        IOptions<ProvcacheOptions> options,
        ILogger<WriteBehindQueue> logger,
        TimeProvider? timeProvider = null)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _timeProvider = timeProvider ?? TimeProvider.System;

        // Bounded channel to provide backpressure: writers wait when the queue is full.
        _channel = Channel.CreateBounded<WriteBehindItem>(new BoundedChannelOptions(_options.WriteBehindQueueCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = false,
            SingleReader = true
        });
    }

    /// <inheritdoc />
    /// <remarks>
    /// Argument validation happens synchronously. The enqueue counters are rolled back if the
    /// channel write is cancelled or fails, so metrics only reflect items that entered the queue.
    /// </remarks>
    public ValueTask EnqueueAsync(ProvcacheEntry entry, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(entry);

        var item = new WriteBehindItem
        {
            Entry = entry,
            EnqueuedAt = _timeProvider.GetUtcNow(),
            RetryCount = 0
        };

        return EnqueueCoreAsync(item, cancellationToken);
    }

    /// <inheritdoc />
    public WriteBehindMetrics GetMetrics()
    {
        return new WriteBehindMetrics
        {
            TotalEnqueued = Interlocked.Read(ref _totalEnqueued),
            TotalPersisted = Interlocked.Read(ref _totalPersisted),
            TotalFailed = Interlocked.Read(ref _totalFailed),
            TotalRetries = Interlocked.Read(ref _totalRetries),
            TotalBatches = Interlocked.Read(ref _totalBatches),
            CurrentQueueDepth = Interlocked.Read(ref _currentQueueDepth),
            Timestamp = _timeProvider.GetUtcNow()
        };
    }

    /// <summary>
    /// Main processing loop that reads from the channel and batches writes.
    /// On shutdown, any partially read batch is flushed and the channel is drained.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Write-behind queue started with batch size {BatchSize}, interval {IntervalMs}ms",
            _options.WriteBehindBatchSize,
            _options.WriteBehindFlushIntervalMs);

        var batch = new List<WriteBehindItem>(_options.WriteBehindBatchSize);
        var flushInterval = TimeSpan.FromMilliseconds(_options.WriteBehindFlushIntervalMs);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Read items until batch is full or timeout.
                await FillBatchAsync(batch, flushInterval, stoppingToken).ConfigureAwait(false);

                if (batch.Count > 0)
                {
                    await ProcessBatchAsync(batch, stoppingToken).ConfigureAwait(false);

                    // Clear only after the batch was handled: items that were dequeued but not yet
                    // processed must survive an interruption, they are no longer in the channel.
                    batch.Clear();
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in write-behind queue processing loop");

                try
                {
                    await Task.Delay(ErrorBackoff, stoppingToken).ConfigureAwait(false); // Backoff on error
                }
                catch (OperationCanceledException)
                {
                    // Shutdown requested during the backoff; fall through to the drain below
                    // instead of letting the cancellation escape and skip it.
                    break;
                }
            }
        }

        // Flush whatever had already been dequeued when the loop was interrupted.
        if (batch.Count > 0)
        {
            await ProcessBatchAsync(batch, CancellationToken.None).ConfigureAwait(false);
            batch.Clear();
        }

        // Drain remaining items on shutdown.
        await DrainAsync(CancellationToken.None).ConfigureAwait(false);

        _logger.LogInformation("Write-behind queue stopped");
    }

    /// <summary>
    /// Writes the item to the channel, keeping the enqueue counters consistent with the outcome.
    /// </summary>
    private async ValueTask EnqueueCoreAsync(WriteBehindItem item, CancellationToken cancellationToken)
    {
        // Count before the write so the reader can never decrement the depth below zero.
        Interlocked.Increment(ref _totalEnqueued);
        Interlocked.Increment(ref _currentQueueDepth);
        PublishQueueDepth();

        try
        {
            await _channel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            // The item never entered the channel (cancelled while waiting for space, or the
            // write faulted): undo the optimistic accounting.
            Interlocked.Decrement(ref _totalEnqueued);
            Interlocked.Decrement(ref _currentQueueDepth);
            PublishQueueDepth();
            throw;
        }
    }

    /// <summary>
    /// Appends items from the channel to <paramref name="batch"/> until it reaches the configured
    /// batch size or the flush interval elapses. Cancellation of <paramref name="stoppingToken"/>
    /// propagates to the caller; items read so far remain in <paramref name="batch"/>.
    /// </summary>
    private async Task FillBatchAsync(
        List<WriteBehindItem> batch,
        TimeSpan flushInterval,
        CancellationToken stoppingToken)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
        cts.CancelAfter(flushInterval);

        try
        {
            while (batch.Count < _options.WriteBehindBatchSize)
            {
                var item = await _channel.Reader.ReadAsync(cts.Token).ConfigureAwait(false);
                batch.Add(item);
                Interlocked.Decrement(ref _currentQueueDepth);
            }
        }
        catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested)
        {
            // Timeout reached, process current batch.
        }
    }

    /// <summary>
    /// Processes a batch of items with retry logic. A failed upsert never throws: each item is
    /// either re-enqueued with an incremented retry count or counted as failed and dropped.
    /// </summary>
    /// <param name="batch">Items to persist in a single repository call.</param>
    /// <param name="cancellationToken">Token passed to the repository call.</param>
    private async Task ProcessBatchAsync(List<WriteBehindItem> batch, CancellationToken cancellationToken)
    {
        var entries = batch.Select(b => b.Entry).ToList();
        using var activity = ProvcacheTelemetry.StartWriteBehindFlushActivity(batch.Count);

        try
        {
            await _repository.UpsertManyAsync(entries, cancellationToken).ConfigureAwait(false);

            Interlocked.Add(ref _totalPersisted, batch.Count);
            Interlocked.Increment(ref _totalBatches);
            ProvcacheTelemetry.RecordWriteBehind("ok", batch.Count);
            PublishQueueDepth();

            _logger.LogDebug(
                "Write-behind batch persisted {Count} entries",
                batch.Count);
        }
        catch (Exception ex)
        {
            ProvcacheTelemetry.MarkError(activity, ex.Message);
            _logger.LogWarning(
                ex,
                "Write-behind batch failed for {Count} entries, scheduling retries",
                batch.Count);

            // Re-enqueue failed items for retry.
            foreach (var item in batch)
            {
                if (item.RetryCount < _options.WriteBehindMaxRetries)
                {
                    var retryItem = item with { RetryCount = item.RetryCount + 1 };
                    Interlocked.Increment(ref _totalRetries);
                    ProvcacheTelemetry.RecordWriteBehind("retry", 1);

                    // TryWrite rather than WriteAsync: this runs on the single reader, so waiting
                    // for free space here could never be satisfied by that same reader.
                    if (_channel.Writer.TryWrite(retryItem))
                    {
                        Interlocked.Increment(ref _currentQueueDepth);
                        PublishQueueDepth();
                    }
                    else
                    {
                        Interlocked.Increment(ref _totalFailed);
                        ProvcacheTelemetry.RecordWriteBehind("failed", 1);
                        _logger.LogError(
                            "Write-behind queue full, dropping entry for VeriKey {VeriKey}",
                            item.Entry.VeriKey);
                    }
                }
                else
                {
                    Interlocked.Increment(ref _totalFailed);
                    ProvcacheTelemetry.RecordWriteBehind("failed", 1);
                    _logger.LogError(
                        "Write-behind max retries exceeded for VeriKey {VeriKey}",
                        item.Entry.VeriKey);
                }
            }
        }
    }

    /// <summary>
    /// Drains remaining items from the queue during shutdown. Keeps going until the channel is
    /// empty, so items re-enqueued by a failed batch are retried as well.
    /// </summary>
    /// <param name="cancellationToken">Token passed to each batch that is processed.</param>
    private async Task DrainAsync(CancellationToken cancellationToken)
    {
        // Guard against a non-positive batch size so draining always makes progress.
        var batchSize = Math.Max(1, _options.WriteBehindBatchSize);
        var batch = new List<WriteBehindItem>(batchSize);

        while (true)
        {
            while (batch.Count < batchSize && _channel.Reader.TryRead(out var item))
            {
                batch.Add(item);
                Interlocked.Decrement(ref _currentQueueDepth);
            }

            if (batch.Count == 0)
            {
                break;
            }

            // A failed batch puts its retryable items back into the channel; the next pass picks
            // them up. Each pass increments RetryCount, so this ends once retries are exhausted.
            await ProcessBatchAsync(batch, cancellationToken).ConfigureAwait(false);
            batch.Clear();
        }

        _logger.LogInformation("Write-behind queue drained");
    }

    /// <summary>Reports the current queue depth counter to telemetry.</summary>
    private void PublishQueueDepth()
    {
        ProvcacheTelemetry.SetWriteBehindQueueSize((int)Interlocked.Read(ref _currentQueueDepth));
    }
}
/// <summary>
/// Interface for write-behind queue operations.
/// </summary>
public interface IWriteBehindQueue
{
/// <summary>
/// Enqueues an entry for asynchronous persistence.
/// </summary>
/// <param name="entry">The cache entry to persist.</param>
/// <param name="cancellationToken">Token that can cancel the enqueue operation.</param>
/// <returns>A <see cref="ValueTask"/> representing the enqueue operation.</returns>
ValueTask EnqueueAsync(ProvcacheEntry entry, CancellationToken cancellationToken = default);
/// <summary>
/// Gets current queue metrics.
/// </summary>
/// <returns>A <see cref="WriteBehindMetrics"/> value describing the queue.</returns>
WriteBehindMetrics GetMetrics();
}
/// <summary>
/// Item in the write-behind queue with retry metadata.
/// </summary>
internal sealed record WriteBehindItem
{
/// <summary>The cache entry waiting to be persisted.</summary>
public required ProvcacheEntry Entry { get; init; }
/// <summary>Timestamp taken when the item was created for enqueueing.</summary>
public required DateTimeOffset EnqueuedAt { get; init; }
/// <summary>Number of retries already scheduled for this item; 0 for a freshly enqueued entry.</summary>
public required int RetryCount { get; init; }
}
/// <summary>
/// Metrics for the write-behind queue.
/// </summary>
public sealed record WriteBehindMetrics
{
/// <summary>Total number of entries submitted to the queue for persistence.</summary>
public required long TotalEnqueued { get; init; }
/// <summary>Total number of entries written by successful batches.</summary>
public required long TotalPersisted { get; init; }
/// <summary>Total number of entries dropped, either because retries were exhausted or the queue was full on re-enqueue.</summary>
public required long TotalFailed { get; init; }
/// <summary>Total number of per-entry retry attempts scheduled after failed batches.</summary>
public required long TotalRetries { get; init; }
/// <summary>Total number of batches persisted successfully.</summary>
public required long TotalBatches { get; init; }
/// <summary>Value of the queue depth counter at the time the snapshot was taken.</summary>
public required long CurrentQueueDepth { get; init; }
/// <summary>Time at which the snapshot was taken.</summary>
public required DateTimeOffset Timestamp { get; init; }
}