namespace LedgerReplayHarness; internal sealed class TaskThrottler { private readonly SemaphoreSlim _semaphore; private readonly List _tasks = new(); public TaskThrottler(int maxDegreeOfParallelism) { _semaphore = new SemaphoreSlim(maxDegreeOfParallelism > 0 ? maxDegreeOfParallelism : 1); } public async Task RunAsync(Func taskFactory, CancellationToken cancellationToken) { await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); var task = Task.Run(async () => { try { await taskFactory().ConfigureAwait(false); } finally { _semaphore.Release(); } }, cancellationToken); lock (_tasks) _tasks.Add(task); } public async Task DrainAsync(CancellationToken cancellationToken) { Task[] pending; lock (_tasks) pending = _tasks.ToArray(); await Task.WhenAll(pending).WaitAsync(cancellationToken).ConfigureAwait(false); } }