// git.stella-ops.org/src/__Libraries/StellaOps.Provcache/WriteBehindQueue.Batch.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace StellaOps.Provcache;
public sealed partial class WriteBehindQueue
{
    /// <summary>
    /// Persists one batch of queued entries through the repository in a single
    /// bulk upsert. If the upsert throws, every item of the batch is either
    /// written back to the channel with an incremented retry count or dropped.
    /// </summary>
    /// <param name="batch">Items collected for this flush. An empty batch is a no-op.</param>
    /// <param name="cancellationToken">Token forwarded to the repository upsert.</param>
    /// <returns>A task that completes once the batch has been persisted, re-queued, or dropped.</returns>
    /// <remarks>
    /// Exceptions thrown by the upsert are not rethrown: the catch-all handler
    /// (which also catches cancellation exceptions) logs them and converts the
    /// failure into per-item retries or drops.
    /// </remarks>
    private async Task ProcessBatchAsync(List<WriteBehindItem> batch, CancellationToken cancellationToken)
    {
        // Nothing to persist: skip the repository call and do not count an empty batch.
        if (batch.Count == 0)
        {
            return;
        }

        var entries = batch.Select(b => b.Entry).ToList();
        using var activity = ProvcacheTelemetry.StartWriteBehindFlushActivity(batch.Count);

        try
        {
            await _repository.UpsertManyAsync(entries, cancellationToken).ConfigureAwait(false);

            Interlocked.Add(ref _totalPersisted, batch.Count);
            Interlocked.Increment(ref _totalBatches);
            ProvcacheTelemetry.RecordWriteBehind("ok", batch.Count);
            ProvcacheTelemetry.SetWriteBehindQueueSize((int)Interlocked.Read(ref _currentQueueDepth));

            _logger.LogDebug(
                "Write-behind batch persisted {Count} entries",
                batch.Count);
        }
        catch (Exception ex)
        {
            ProvcacheTelemetry.MarkError(activity, ex.Message);
            _logger.LogWarning(
                ex,
                "Write-behind batch failed for {Count} entries, scheduling retries",
                batch.Count);

            foreach (var item in batch)
            {
                // Retry budget exhausted: give up on this entry.
                if (item.RetryCount >= _options.WriteBehindMaxRetries)
                {
                    Interlocked.Increment(ref _totalFailed);
                    ProvcacheTelemetry.RecordWriteBehind("failed", 1);
                    _logger.LogError(
                        "Write-behind max retries exceeded for VeriKey {VeriKey}",
                        item.Entry.VeriKey);
                    continue;
                }

                var retryItem = item with { RetryCount = item.RetryCount + 1 };

                if (_channel.Writer.TryWrite(retryItem))
                {
                    // Count the retry only once the item is actually back in the
                    // channel, so a rejected write is not reported as both a
                    // retry and a failure.
                    Interlocked.Increment(ref _totalRetries);
                    ProvcacheTelemetry.RecordWriteBehind("retry", 1);

                    // Publish the depth returned by the increment itself instead of
                    // re-reading the counter, which could interleave with other updates.
                    var depthAfterEnqueue = Interlocked.Increment(ref _currentQueueDepth);
                    ProvcacheTelemetry.SetWriteBehindQueueSize((int)depthAfterEnqueue);
                    continue;
                }

                // The channel rejected the write, so the entry is dropped.
                // NOTE(review): if _channel is a System.Threading.Channels channel, TryWrite
                // also returns false once the writer is completed, not only when the queue
                // is full - confirm whether the log message should distinguish the two.
                Interlocked.Increment(ref _totalFailed);
                ProvcacheTelemetry.RecordWriteBehind("failed", 1);
                _logger.LogError(
                    "Write-behind queue full, dropping entry for VeriKey {VeriKey}",
                    item.Entry.VeriKey);
            }
        }
    }
}