Add MaxConcurrentJobs semaphore to prevent Concelier sync overload
Problem: Triggering sync on all 21+ advisory sources simultaneously fires 21+ background fetch jobs that all compete for DB connections, HTTP connections, and CPU. This overwhelms the service, causing 504 gateway timeouts on subsequent API calls.

Fix: Add a SemaphoreSlim in JobCoordinator.ExecuteJobAsync gated by MaxConcurrentJobs (default: 6). When more than 6 jobs are triggered concurrently, excess jobs queue behind the semaphore rather than all executing at once.

- JobSchedulerOptions: new MaxConcurrentJobs property (default 6)
- JobCoordinator: SemaphoreSlim wraps ExecuteJobAsync, extracted ExecuteJobCoreAsync for the actual execution logic
- Configurable via appsettings: JobScheduler:MaxConcurrentJobs

The lease-based per-job deduplication still prevents the same job kind from running twice. This new limit caps total concurrent jobs across all kinds.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -22,6 +22,7 @@ public sealed class JobCoordinator : IJobCoordinator
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly JobDiagnostics _diagnostics;
|
||||
private readonly string _holderId;
|
||||
private readonly SemaphoreSlim _concurrencyGate;
|
||||
|
||||
public JobCoordinator(
|
||||
IOptions<JobSchedulerOptions> optionsAccessor,
|
||||
@@ -42,6 +43,7 @@ public sealed class JobCoordinator : IJobCoordinator
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
_diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
|
||||
_holderId = BuildHolderId();
|
||||
_concurrencyGate = new SemaphoreSlim(_options.MaxConcurrentJobs, _options.MaxConcurrentJobs);
|
||||
}
|
||||
|
||||
public async Task<JobTriggerResult> TriggerAsync(string kind, IReadOnlyDictionary<string, object?>? parameters, string trigger, CancellationToken cancellationToken)
|
||||
@@ -437,6 +439,27 @@ public sealed class JobCoordinator : IJobCoordinator
|
||||
IReadOnlyDictionary<string, object?> parameters,
|
||||
string trigger,
|
||||
CancellationTokenSource linkedTokenSource)
|
||||
{
|
||||
// Limit concurrent job execution to prevent resource exhaustion (DB connections, HTTP, CPU).
|
||||
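// The semaphore is bounded by MaxConcurrentJobs (default: 6).
// NOTE(review): the wait below observes linkedTokenSource.Token, so a job can be cancelled while it is
// still queued. In that case ExecuteJobCoreAsync (which disposes linkedTokenSource) is never reached;
// confirm the caller handles the resulting OperationCanceledException and any remaining cleanup.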
|
||||
await _concurrencyGate.WaitAsync(linkedTokenSource.Token).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
await ExecuteJobCoreAsync(definition, lease, run, parameters, trigger, linkedTokenSource).ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_concurrencyGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ExecuteJobCoreAsync(
|
||||
JobDefinition definition,
|
||||
JobLease lease,
|
||||
JobRunSnapshot run,
|
||||
IReadOnlyDictionary<string, object?> parameters,
|
||||
string trigger,
|
||||
CancellationTokenSource linkedTokenSource)
|
||||
{
|
||||
using (linkedTokenSource)
|
||||
{
|
||||
|
||||
@@ -9,4 +9,11 @@ public sealed class JobSchedulerOptions
|
||||
public TimeSpan DefaultTimeout { get; set; } = TimeSpan.FromMinutes(15);
|
||||
|
||||
public TimeSpan DefaultLeaseDuration { get; set; } = TimeSpan.FromMinutes(5);
|
||||
|
||||
/// <summary>
|
||||
/// Maximum number of jobs that can execute concurrently.
|
||||
/// When the limit is reached, additional jobs wait for a free slot before they start executing.
|
||||
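/// Default: 6 (balances throughput vs. resource pressure).
/// Must be at least 1: the value is used as the maximum count of a SemaphoreSlim, which rejects non-positive values.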
|
||||
/// </summary>
|
||||
public int MaxConcurrentJobs { get; set; } = 6;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user