using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scanner.Worker.Diagnostics;
using StellaOps.Scanner.Worker.Options;
using StellaOps.Scanner.Worker.Processing;

namespace StellaOps.Scanner.Worker.Hosting;

/// <summary>
/// Hosted service that repeatedly leases scan jobs from an <see cref="IScanJobSource"/>, runs each one
/// through <see cref="ScanJobProcessor"/> while a <see cref="LeaseHeartbeatService"/> loop runs for the
/// lease, and then completes, abandons, or poisons the lease depending on the outcome.
/// </summary>
/// <remarks>
/// The concurrency limit (<c>MaxConcurrentJobs</c>) is re-read from the options monitor on every loop
/// iteration; the polling back-off settings are read once when the loop starts. When the host stops,
/// the loop stops leasing new work and waits for the jobs already in flight before returning.
/// </remarks>
public sealed partial class ScannerWorkerHostedService : BackgroundService
{
    private readonly IScanJobSource _jobSource;
    private readonly ScanJobProcessor _processor;
    private readonly LeaseHeartbeatService _heartbeatService;
    private readonly ScannerWorkerMetrics _metrics;
    private readonly TimeProvider _timeProvider;
    private readonly IOptionsMonitor<ScannerWorkerOptions> _options;
    private readonly ILogger<ScannerWorkerHostedService> _logger;
    private readonly IDelayScheduler _delayScheduler;

    /// <summary>Creates the hosted service from its injected dependencies.</summary>
    /// <exception cref="ArgumentNullException">Any dependency is <see langword="null"/>.</exception>
    public ScannerWorkerHostedService(
        IScanJobSource jobSource,
        ScanJobProcessor processor,
        LeaseHeartbeatService heartbeatService,
        ScannerWorkerMetrics metrics,
        TimeProvider timeProvider,
        IDelayScheduler delayScheduler,
        IOptionsMonitor<ScannerWorkerOptions> options,
        ILogger<ScannerWorkerHostedService> logger)
    {
        _jobSource = jobSource ?? throw new ArgumentNullException(nameof(jobSource));
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        _heartbeatService = heartbeatService ?? throw new ArgumentNullException(nameof(heartbeatService));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _delayScheduler = delayScheduler ?? throw new ArgumentNullException(nameof(delayScheduler));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Main leasing loop: keeps up to <c>MaxConcurrentJobs</c> jobs running, backs off via the poll delay
    /// strategy whenever no lease is obtained, and drains in-flight jobs once <paramref name="stoppingToken"/>
    /// is cancelled.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var runningJobs = new HashSet<Task>();
        var delayStrategy = new PollDelayStrategy(_options.CurrentValue.Polling);

        WorkerStarted(_logger);

        while (!stoppingToken.IsCancellationRequested)
        {
            runningJobs.RemoveWhere(static task => task.IsCompleted);

            var options = _options.CurrentValue;

            // A non-positive limit would make Task.WhenAny run over an empty set and throw
            // ArgumentException, faulting the whole loop; treat it as "one job at a time".
            var maxConcurrentJobs = Math.Max(1, options.MaxConcurrentJobs);
            if (runningJobs.Count >= maxConcurrentJobs)
            {
                // At capacity: wait for any job to finish before leasing more. Each job runs with a
                // token linked to stoppingToken.
                var completed = await Task.WhenAny(runningJobs).ConfigureAwait(false);
                runningJobs.Remove(completed);
                continue;
            }

            IScanJobLease? lease = null;
            try
            {
                lease = await _jobSource.TryAcquireAsync(stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Scanner worker failed to acquire job lease; backing off.");
            }

            if (lease is null)
            {
                var delay = delayStrategy.NextDelay();
                try
                {
                    await _delayScheduler.DelayAsync(delay, stoppingToken).ConfigureAwait(false);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown arrived during the back-off: leave the loop normally so the in-flight
                    // jobs below are still awaited and the stopping message is logged.
                    break;
                }

                continue;
            }

            delayStrategy.Reset();
            runningJobs.Add(RunJobAsync(lease, stoppingToken));
        }

        if (runningJobs.Count > 0)
        {
            await Task.WhenAll(runningJobs).ConfigureAwait(false);
        }

        WorkerStopping(_logger);
    }

    /// <summary>
    /// Runs a single leased job: records queue latency, starts the lease heartbeat, executes the processor,
    /// and then completes the lease on success, abandons it on host shutdown, or abandons/poisons it on
    /// failure depending on <c>Queue.MaxAttempts</c>. The lease is always disposed afterwards.
    /// </summary>
    /// <param name="lease">The acquired job lease; owned and disposed by this method.</param>
    /// <param name="stoppingToken">Host shutdown token; the job token is linked to it.</param>
    private async Task RunJobAsync(IScanJobLease lease, CancellationToken stoppingToken)
    {
        var options = _options.CurrentValue;
        var jobStart = _timeProvider.GetUtcNow();
        var queueLatency = jobStart - lease.EnqueuedAtUtc;

        using var jobCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
        var jobToken = jobCts.Token;
        var context = new ScanJobContext(lease, _timeProvider, jobStart, jobToken);

        _metrics.RecordQueueLatency(context, queueLatency);
        JobAcquired(_logger, lease.JobId, lease.ScanId, lease.Attempt, queueLatency.TotalMilliseconds);

        var heartbeatTask = _heartbeatService.RunAsync(lease, jobToken);
        var heartbeatStopped = false;
        Exception? processingException = null;

        // Cancels the job token and waits for the heartbeat loop to exit, awaiting it at most once.
        // Never throws: cancellation is the expected way for the loop to end, and any other fault is
        // only logged, so a heartbeat problem can neither turn a successful job into a failed one nor
        // skip disposal of the lease.
        async Task StopHeartbeatAsync(bool logFaults)
        {
            if (heartbeatStopped)
            {
                return;
            }

            heartbeatStopped = true;
            jobCts.Cancel();
            try
            {
                await heartbeatTask.ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Expected: the heartbeat observes the job token that was just cancelled.
            }
            catch (Exception ex)
            {
                // When processing already failed, that failure is the one reported; a heartbeat fault
                // at that point is typically a consequence and would only add noise.
                if (logFaults)
                {
                    _logger.LogWarning(ex, "Heartbeat loop ended with an exception for job {JobId}.", lease.JobId);
                }
            }
        }

        try
        {
            await _processor.ExecuteAsync(context, jobToken).ConfigureAwait(false);

            // Stop renewing the lease before completing it.
            await StopHeartbeatAsync(logFaults: true).ConfigureAwait(false);
            await lease.CompleteAsync(stoppingToken).ConfigureAwait(false);

            var duration = _timeProvider.GetUtcNow() - jobStart;
            _metrics.RecordJobDuration(context, duration);
            _metrics.IncrementJobCompleted(context);
            JobCompleted(_logger, lease.JobId, lease.ScanId, duration.TotalMilliseconds);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            await StopHeartbeatAsync(logFaults: true).ConfigureAwait(false);
            try
            {
                await lease.AbandonAsync("host-stopping", CancellationToken.None).ConfigureAwait(false);
                JobAbandoned(_logger, lease.JobId, lease.ScanId);
            }
            catch (Exception releaseEx)
            {
                // Nothing awaits a faulted job task until shutdown, so report the failure here.
                _logger.LogError(releaseEx, "Failed to abandon job {JobId} (scan {ScanId}) during host shutdown.", lease.JobId, lease.ScanId);
            }
        }
        catch (Exception ex)
        {
            processingException = ex;
            var duration = _timeProvider.GetUtcNow() - jobStart;

            // Stop the heartbeat first so it cannot keep renewing a lease that is being released.
            await StopHeartbeatAsync(logFaults: false).ConfigureAwait(false);
            _metrics.RecordJobDuration(context, duration);

            var reason = ex.GetType().Name;
            var maxAttempts = options.Queue.MaxAttempts;
            try
            {
                if (lease.Attempt >= maxAttempts)
                {
                    await lease.PoisonAsync(reason, CancellationToken.None).ConfigureAwait(false);
                    _metrics.IncrementJobFailed(context, reason);
                    JobPoisoned(_logger, lease.JobId, lease.ScanId, lease.Attempt, maxAttempts, ex);
                }
                else
                {
                    await lease.AbandonAsync(reason, CancellationToken.None).ConfigureAwait(false);
                    JobAbandonedWithError(_logger, lease.JobId, lease.ScanId, lease.Attempt, maxAttempts, ex);
                }
            }
            catch (Exception releaseEx)
            {
                // Nothing awaits a faulted job task until shutdown, so report the failure here.
                _logger.LogError(releaseEx, "Failed to release lease for job {JobId} (scan {ScanId}) after {Reason}.", lease.JobId, lease.ScanId, reason);
            }
        }
        finally
        {
            // Safety net: a no-op when one of the paths above already stopped the heartbeat.
            await StopHeartbeatAsync(logFaults: processingException is null).ConfigureAwait(false);
            await lease.DisposeAsync().ConfigureAwait(false);
        }
    }

    [LoggerMessage(EventId = 2000, Level = LogLevel.Information, Message = "Scanner worker host started.")]
    private static partial void WorkerStarted(ILogger logger);

    [LoggerMessage(EventId = 2001, Level = LogLevel.Information, Message = "Scanner worker host stopping.")]
    private static partial void WorkerStopping(ILogger logger);

    [LoggerMessage(
        EventId = 2002,
        Level = LogLevel.Information,
        Message = "Leased job {JobId} (scan {ScanId}) attempt {Attempt}; queue latency {LatencyMs:F0} ms.")]
    private static partial void JobAcquired(ILogger logger, string jobId, string scanId, int attempt, double latencyMs);

    [LoggerMessage(
        EventId = 2003,
        Level = LogLevel.Information,
        Message = "Job {JobId} (scan {ScanId}) completed in {DurationMs:F0} ms.")]
    private static partial void JobCompleted(ILogger logger, string jobId, string scanId, double durationMs);

    [LoggerMessage(
        EventId = 2004,
        Level = LogLevel.Warning,
        Message = "Job {JobId} (scan {ScanId}) abandoned due to host shutdown.")]
    private static partial void JobAbandoned(ILogger logger, string jobId, string scanId);

    [LoggerMessage(
        EventId = 2005,
        Level = LogLevel.Warning,
        Message = "Job {JobId} (scan {ScanId}) attempt {Attempt}/{MaxAttempts} abandoned after failure; job will be retried.")]
    private static partial void JobAbandonedWithError(ILogger logger, string jobId, string scanId, int attempt, int maxAttempts, Exception exception);

    [LoggerMessage(
        EventId = 2006,
        Level = LogLevel.Error,
        Message = "Job {JobId} (scan {ScanId}) attempt {Attempt}/{MaxAttempts} exceeded retry budget; quarantining job.")]
    private static partial void JobPoisoned(ILogger logger, string jobId, string scanId, int attempt, int maxAttempts, Exception exception);
}