From db967a54f83a4b65af90f7ad1f5864780ec5a6ca Mon Sep 17 00:00:00 2001 From: master <> Date: Wed, 1 Apr 2026 00:22:25 +0300 Subject: [PATCH] Add MaxConcurrentJobs semaphore to prevent Concelier sync overload Problem: Triggering sync on all 21+ advisory sources simultaneously fires 21 background fetch jobs that all compete for DB connections, HTTP connections, and CPU. This overwhelms the service, causing 504 gateway timeouts on subsequent API calls. Fix: Add a SemaphoreSlim in JobCoordinator.ExecuteJobAsync gated by MaxConcurrentJobs (default: 6). When more than 6 jobs are triggered concurrently, excess jobs queue behind the semaphore rather than all executing at once. - JobSchedulerOptions: new MaxConcurrentJobs property (default 6) - JobCoordinator: SemaphoreSlim wraps ExecuteJobAsync, extracted ExecuteJobCoreAsync for the actual execution logic - Configurable via appsettings: JobScheduler:MaxConcurrentJobs The lease-based per-job deduplication still prevents the same job kind from running twice. This new limit caps total concurrent jobs across all kinds. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Jobs/JobCoordinator.cs | 23 +++++++++++++++++++ .../Jobs/JobSchedulerOptions.cs | 7 ++++++ 2 files changed, 30 insertions(+) diff --git a/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobCoordinator.cs b/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobCoordinator.cs index e3ba7f1f2..be6df50c2 100644 --- a/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobCoordinator.cs +++ b/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobCoordinator.cs @@ -22,6 +22,7 @@ public sealed class JobCoordinator : IJobCoordinator private readonly TimeProvider _timeProvider; private readonly JobDiagnostics _diagnostics; private readonly string _holderId; + private readonly SemaphoreSlim _concurrencyGate; public JobCoordinator( IOptions optionsAccessor, @@ -42,6 +43,7 @@ public sealed class JobCoordinator : IJobCoordinator _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider)); _diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics)); _holderId = BuildHolderId(); + _concurrencyGate = new SemaphoreSlim(_options.MaxConcurrentJobs, _options.MaxConcurrentJobs); } public async Task TriggerAsync(string kind, IReadOnlyDictionary? parameters, string trigger, CancellationToken cancellationToken) @@ -437,6 +439,27 @@ public sealed class JobCoordinator : IJobCoordinator IReadOnlyDictionary parameters, string trigger, CancellationTokenSource linkedTokenSource) + { + // Limit concurrent job execution to prevent resource exhaustion (DB connections, HTTP, CPU). + // The semaphore is bounded by MaxConcurrentJobs (default: 6). + await _concurrencyGate.WaitAsync(linkedTokenSource.Token).ConfigureAwait(false); + try + { + await ExecuteJobCoreAsync(definition, lease, run, parameters, trigger, linkedTokenSource).ConfigureAwait(false); + } + finally + { + _concurrencyGate.Release(); + } + } + + private async Task ExecuteJobCoreAsync( + JobDefinition definition, + JobLease lease, + JobRunSnapshot run, + IReadOnlyDictionary parameters, + string trigger, + CancellationTokenSource linkedTokenSource) { using (linkedTokenSource) { diff --git a/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobSchedulerOptions.cs b/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobSchedulerOptions.cs index b5db137af..383701b9b 100644 --- a/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobSchedulerOptions.cs +++ b/src/Concelier/__Libraries/StellaOps.Concelier.Core/Jobs/JobSchedulerOptions.cs @@ -9,4 +9,11 @@ public sealed class JobSchedulerOptions public TimeSpan DefaultTimeout { get; set; } = TimeSpan.FromMinutes(15); public TimeSpan DefaultLeaseDuration { get; set; } = TimeSpan.FromMinutes(5); + + /// + /// Maximum number of jobs that can execute concurrently. + /// When exceeded, new triggers will queue behind the semaphore. + /// Default: 6 (balances throughput vs. resource pressure). + /// + public int MaxConcurrentJobs { get; set; } = 6; }