using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scanner.Worker.Options;

namespace StellaOps.Scanner.Worker.Processing;

/// <summary>
/// Keeps a scan job lease alive by renewing it on a jittered interval derived from the
/// lease duration and the queue options, retrying failed renewals with the configured delays.
/// </summary>
public sealed class LeaseHeartbeatService
{
    // NOTE(review): stored but not read by any member of this class.
    private readonly TimeProvider _timeProvider;
    private readonly IOptionsMonitor<ScannerWorkerOptions> _options;
    private readonly IDelayScheduler _delayScheduler;
    private readonly ILogger<LeaseHeartbeatService> _logger;

    /// <summary>Creates the heartbeat service.</summary>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public LeaseHeartbeatService(
        TimeProvider timeProvider,
        IDelayScheduler delayScheduler,
        IOptionsMonitor<ScannerWorkerOptions> options,
        ILogger<LeaseHeartbeatService> logger)
    {
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _delayScheduler = delayScheduler ?? throw new ArgumentNullException(nameof(delayScheduler));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Renews <paramref name="lease"/> periodically until <paramref name="cancellationToken"/> is cancelled.
    /// Cancellation ends the loop normally, even when it interrupts a renewal or a retry delay.
    /// </summary>
    /// <param name="lease">The lease to keep alive.</param>
    /// <param name="cancellationToken">Stops the heartbeat loop when cancelled.</param>
    /// <exception cref="ArgumentNullException"><paramref name="lease"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The initial renewal and every configured retry failed without cancellation being requested.
    /// </exception>
    public async Task RunAsync(IScanJobLease lease, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(lease);

        // Return control to the caller before the first heartbeat cycle runs.
        await Task.Yield();

        while (!cancellationToken.IsCancellationRequested)
        {
            // Options are re-read every cycle so configuration changes apply to the next heartbeat.
            var options = _options.CurrentValue;
            var interval = ComputeInterval(options, lease);
            var delay = ApplyJitter(interval, options.Queue);

            try
            {
                await _delayScheduler.DelayAsync(delay, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            if (await TryRenewAsync(options, lease, cancellationToken).ConfigureAwait(false))
            {
                continue;
            }

            // TryRenewAsync also returns false when cancellation interrupts a renewal or a retry
            // delay; that is a normal stop, not an exhausted-retries failure.
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            _logger.LogError(
                "Job {JobId} (scan {ScanId}) lease renewal exhausted retries; cancelling processing.",
                lease.JobId,
                lease.ScanId);
            throw new InvalidOperationException("Lease renewal retries exhausted.");
        }
    }

    /// <summary>
    /// Computes the heartbeat interval as the lease duration divided by the safety factor
    /// (never less than 3), clamped to <c>MinHeartbeatInterval</c> / <c>MaxHeartbeatInterval</c>.
    /// </summary>
    private static TimeSpan ComputeInterval(ScannerWorkerOptions options, IScanJobLease lease)
    {
        var queue = options.Queue;

        // Math.Max also maps non-positive safety factors to 3.0.
        var safetyFactor = Math.Max(3.0, queue.HeartbeatSafetyFactor);
        var recommended = TimeSpan.FromTicks((long)(lease.LeaseDuration.Ticks / safetyFactor));

        if (recommended < queue.MinHeartbeatInterval)
        {
            return queue.MinHeartbeatInterval;
        }

        if (recommended > queue.MaxHeartbeatInterval)
        {
            return queue.MaxHeartbeatInterval;
        }

        return recommended;
    }

    /// <summary>
    /// Subtracts a random offset in [0, <c>MaxHeartbeatJitterMilliseconds</c>) from <paramref name="duration"/>.
    /// Jitter only ever shortens the delay. The result falls back to <c>MinHeartbeatInterval</c> when it
    /// drops below that minimum or is not positive.
    /// </summary>
    private static TimeSpan ApplyJitter(TimeSpan duration, ScannerWorkerOptions.QueueOptions queueOptions)
    {
        if (queueOptions.MaxHeartbeatJitterMilliseconds <= 0)
        {
            return duration;
        }

        var offsetMs = Random.Shared.NextDouble() * queueOptions.MaxHeartbeatJitterMilliseconds;
        var adjusted = duration - TimeSpan.FromMilliseconds(offsetMs);

        return adjusted < queueOptions.MinHeartbeatInterval || adjusted <= TimeSpan.Zero
            ? queueOptions.MinHeartbeatInterval
            : adjusted;
    }

    /// <summary>
    /// Renews the lease once. On failure, it waits each delay in
    /// <c>NormalizedHeartbeatRetryDelays</c> in order and retries after each one.
    /// </summary>
    /// <returns>
    /// <c>true</c> as soon as a renewal succeeds; <c>false</c> when every attempt failed or
    /// <paramref name="cancellationToken"/> was cancelled.
    /// </returns>
    private async Task<bool> TryRenewAsync(ScannerWorkerOptions options, IScanJobLease lease, CancellationToken cancellationToken)
    {
        try
        {
            await lease.RenewAsync(cancellationToken).ConfigureAwait(false);
            return true;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogWarning(
                ex,
                "Job {JobId} (scan {ScanId}) heartbeat failed; retrying.",
                lease.JobId,
                lease.ScanId);
        }

        // Materialized (only on the failure path) so a failed retry can report the *next* delay.
        var retryDelays = new List<TimeSpan>(options.Queue.NormalizedHeartbeatRetryDelays);

        for (var attempt = 0; attempt < retryDelays.Count; attempt++)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                return false;
            }

            try
            {
                await _delayScheduler.DelayAsync(retryDelays[attempt], cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                return false;
            }

            try
            {
                await lease.RenewAsync(cancellationToken).ConfigureAwait(false);
                return true;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                return false;
            }
            catch (Exception ex)
            {
                var nextAttempt = attempt + 1;
                if (nextAttempt < retryDelays.Count)
                {
                    _logger.LogWarning(
                        ex,
                        "Job {JobId} (scan {ScanId}) heartbeat retry failed; will retry after {Delay}.",
                        lease.JobId,
                        lease.ScanId,
                        retryDelays[nextAttempt]);
                }
                else
                {
                    _logger.LogWarning(
                        ex,
                        "Job {JobId} (scan {ScanId}) heartbeat retry failed; no retries remaining.",
                        lease.JobId,
                        lease.ScanId);
                }
            }
        }

        return false;
    }
}