// -----------------------------------------------------------------------------
// GreyQueueWatchdogService.cs
// Sprint: SPRINT_20260118_018_Unknowns_queue_enhancement
// Task: UQ-004 - Add timeout watchdog for stuck processing
// Description: Watchdog service to detect and handle stuck entries
// -----------------------------------------------------------------------------

using System.Diagnostics.Metrics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace StellaOps.Unknowns.Services;

/// <summary>
/// Watchdog service that detects and handles stuck entries in Processing status.
/// Each check pass alerts on entries processing longer than
/// <see cref="GreyQueueWatchdogOptions.ProcessingAlertThreshold"/> and, for entries
/// past <see cref="GreyQueueWatchdogOptions.ProcessingTimeout"/>, either forces a
/// retry with exponential backoff or marks them Failed once per-entry attempts
/// are exhausted.
/// </summary>
public sealed class GreyQueueWatchdogService : BackgroundService
{
    private readonly IGreyQueueRepository _repository;
    private readonly INotificationPublisher _notificationPublisher;
    private readonly GreyQueueWatchdogOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<GreyQueueWatchdogService> _logger;

    private static readonly Meter WatchdogMeter = new("StellaOps.Unknowns.Watchdog", "1.0.0");

    private static readonly Counter<long> StuckTotal = WatchdogMeter.CreateCounter<long>(
        "greyqueue_stuck_total", "entries", "Total number of stuck entries detected");

    private static readonly Counter<long> TimeoutTotal = WatchdogMeter.CreateCounter<long>(
        "greyqueue_timeout_total", "entries", "Total number of entries that timed out");

    private static readonly Counter<long> RetryTotal = WatchdogMeter.CreateCounter<long>(
        "greyqueue_watchdog_retry_total", "entries", "Total number of forced retries by watchdog");

    private static readonly Counter<long> FailedTotal = WatchdogMeter.CreateCounter<long>(
        "greyqueue_watchdog_failed_total", "entries", "Total number of entries moved to Failed by watchdog");

    // NOTE(review): Gauge<T> requires .NET 9; on earlier targets replace with
    // Meter.CreateObservableGauge and a callback — confirm the project's TFM.
    private static readonly Gauge<long> ProcessingCount = WatchdogMeter.CreateGauge<long>(
        "greyqueue_processing_count", "entries", "Current number of entries in Processing status");

    public GreyQueueWatchdogService(
        IGreyQueueRepository repository,
        INotificationPublisher notificationPublisher,
        IOptions<GreyQueueWatchdogOptions> options,
        TimeProvider timeProvider,
        ILogger<GreyQueueWatchdogService> logger)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _notificationPublisher = notificationPublisher ?? throw new ArgumentNullException(nameof(notificationPublisher));
        _options = options?.Value ?? new GreyQueueWatchdogOptions();
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Main loop: performs one check per <see cref="GreyQueueWatchdogOptions.CheckInterval"/>
    /// until the host signals shutdown. A failed check is logged and does not stop
    /// the loop; cancellation exits cleanly.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Grey Queue Watchdog starting with interval {Interval}, alert threshold {AlertThreshold}, timeout {Timeout}",
            _options.CheckInterval,
            _options.ProcessingAlertThreshold,
            _options.ProcessingTimeout);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await CheckProcessingEntriesAsync(stoppingToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _logger.LogError(ex, "Watchdog check failed");
            }

            try
            {
                await Task.Delay(_options.CheckInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Normal shutdown path: exit the loop instead of surfacing a
                // cancellation fault out of ExecuteAsync.
                break;
            }
        }
    }

    /// <summary>
    /// One watchdog pass: records the Processing gauge, alerts on entries past
    /// the alert threshold, and hands entries past the timeout to
    /// <see cref="HandleTimeoutAsync"/>.
    /// </summary>
    private async Task CheckProcessingEntriesAsync(CancellationToken ct)
    {
        var now = _timeProvider.GetUtcNow();
        var processingEntries = await _repository.GetByStatusAsync(GreyQueueStatus.Processing, ct);

        ProcessingCount.Record(processingEntries.Count);

        foreach (var entry in processingEntries)
        {
            // Fall back to CreatedAt for entries that have never been picked up.
            var processingDuration = now - (entry.LastProcessedAt ?? entry.CreatedAt);

            // Stuck: exceeded the alert threshold but not yet the hard timeout.
            if (processingDuration >= _options.ProcessingAlertThreshold &&
                processingDuration < _options.ProcessingTimeout)
            {
                _logger.LogWarning(
                    "Entry {EntryId} has been processing for {Duration}",
                    entry.Id,
                    processingDuration);

                StuckTotal.Add(1);

                await _notificationPublisher.PublishAsync(new StuckProcessingAlert
                {
                    EntryId = entry.Id,
                    BomRef = entry.BomRef,
                    ProcessingDuration = processingDuration,
                    AlertedAt = now
                }, ct);
            }
            // Timed out: force a retry or fail the entry.
            else if (processingDuration >= _options.ProcessingTimeout)
            {
                await HandleTimeoutAsync(entry, processingDuration, ct);
            }
        }
    }

    /// <summary>
    /// Handles a timed-out entry: marks it Failed when its per-entry attempt
    /// budget is exhausted, otherwise schedules a forced retry with exponential
    /// backoff (base delay doubled per prior attempt).
    /// </summary>
    private async Task HandleTimeoutAsync(
        GreyQueueEntry entry,
        TimeSpan processingDuration,
        CancellationToken ct)
    {
        var now = _timeProvider.GetUtcNow();

        _logger.LogWarning(
            "Entry {EntryId} has timed out after {Duration}. Attempts: {Attempts}/{MaxAttempts}",
            entry.Id,
            processingDuration,
            entry.ProcessingAttempts,
            entry.MaxAttempts);

        TimeoutTotal.Add(1);

        // NOTE(review): the cut-off uses the entry's own MaxAttempts;
        // GreyQueueWatchdogOptions.MaxAttempts is never read — confirm which is
        // authoritative.
        if (entry.ProcessingAttempts >= entry.MaxAttempts)
        {
            _logger.LogError(
                "Entry {EntryId} has exceeded max attempts ({MaxAttempts}), marking as Failed",
                entry.Id,
                entry.MaxAttempts);

            await _repository.UpdateStatusAsync(entry.Id, GreyQueueStatus.Failed, ct);
            FailedTotal.Add(1);

            await _notificationPublisher.PublishAsync(new EntryFailedNotification
            {
                EntryId = entry.Id,
                BomRef = entry.BomRef,
                Reason = $"Timed out after {entry.ProcessingAttempts} attempts",
                FailedAt = now
            }, ct);
        }
        else
        {
            _logger.LogInformation(
                "Forcing retry for entry {EntryId} (attempt {Attempt})",
                entry.Id,
                entry.ProcessingAttempts + 1);

            // Exponential backoff: base delay * 2^attempts.
            var backoffMultiplier = Math.Pow(2, entry.ProcessingAttempts);
            var nextProcessingAt = now.AddMinutes(_options.BaseRetryDelayMinutes * backoffMultiplier);

            await _repository.ForceRetryAsync(entry.Id, nextProcessingAt, ct);
            RetryTotal.Add(1);

            await _notificationPublisher.PublishAsync(new ForcedRetryNotification
            {
                EntryId = entry.Id,
                BomRef = entry.BomRef,
                AttemptNumber = entry.ProcessingAttempts + 1,
                NextProcessingAt = nextProcessingAt
            }, ct);
        }
    }

    /// <summary>
    /// Manually triggers a retry for a stuck entry.
    /// </summary>
    /// <param name="entryId">Identifier of the entry to retry.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <exception cref="InvalidOperationException">
    /// The entry does not exist, or is not in Processing or Failed status.
    /// </exception>
    public async Task ManualRetryAsync(Guid entryId, CancellationToken ct = default)
    {
        var entry = await _repository.GetByIdAsync(entryId, ct);
        if (entry == null)
        {
            throw new InvalidOperationException($"Entry {entryId} not found");
        }

        if (entry.Status != GreyQueueStatus.Processing && entry.Status != GreyQueueStatus.Failed)
        {
            throw new InvalidOperationException($"Entry {entryId} is not stuck (status: {entry.Status})");
        }

        _logger.LogInformation("Manual retry triggered for entry {EntryId}", entryId);

        // Retry immediately (nextProcessingAt = now); presumably ForceRetryAsync
        // also resets the entry's status — TODO confirm against the repository impl.
        var now = _timeProvider.GetUtcNow();
        await _repository.ForceRetryAsync(entryId, now, ct);
    }

    /// <summary>
    /// Gets current watchdog statistics computed from the live Processing set.
    /// </summary>
    public async Task<WatchdogStats> GetStatsAsync(CancellationToken ct = default)
    {
        var now = _timeProvider.GetUtcNow();
        var processingEntries = await _repository.GetByStatusAsync(GreyQueueStatus.Processing, ct);

        var stuckCount = 0;
        var timedOutCount = 0;
        var oldestProcessingDuration = TimeSpan.Zero;

        foreach (var entry in processingEntries)
        {
            var duration = now - (entry.LastProcessedAt ?? entry.CreatedAt);

            if (duration > oldestProcessingDuration)
            {
                oldestProcessingDuration = duration;
            }

            // Timed-out entries are counted separately from merely stuck ones.
            if (duration >= _options.ProcessingTimeout)
            {
                timedOutCount++;
            }
            else if (duration >= _options.ProcessingAlertThreshold)
            {
                stuckCount++;
            }
        }

        return new WatchdogStats
        {
            TotalProcessing = processingEntries.Count,
            StuckCount = stuckCount,
            TimedOutCount = timedOutCount,
            OldestProcessingDuration = oldestProcessingDuration,
            CheckedAt = now
        };
    }
}

/// <summary>
/// Watchdog configuration.
/// </summary>
public sealed record GreyQueueWatchdogOptions
{
    /// <summary>Configuration section name.</summary>
    public const string SectionName = "Unknowns:Watchdog";

    /// <summary>How often to check for stuck entries.</summary>
    public TimeSpan CheckInterval { get; init; } = TimeSpan.FromMinutes(5);

    /// <summary>Duration after which to alert (1 hour default).</summary>
    public TimeSpan ProcessingAlertThreshold { get; init; } = TimeSpan.FromHours(1);

    /// <summary>Duration after which to force retry (4 hours default).</summary>
    public TimeSpan ProcessingTimeout { get; init; } = TimeSpan.FromHours(4);

    /// <summary>Maximum processing attempts before marking as Failed.</summary>
    public int MaxAttempts { get; init; } = 5;

    /// <summary>Base retry delay in minutes (used with exponential backoff).</summary>
    public double BaseRetryDelayMinutes { get; init; } = 15;
}

/// <summary>
/// Watchdog statistics.
/// </summary>
public sealed record WatchdogStats
{
    /// <summary>Total entries in Processing status.</summary>
    public int TotalProcessing { get; init; }

    /// <summary>Number of stuck entries (alert threshold exceeded).</summary>
    public int StuckCount { get; init; }

    /// <summary>Number of timed out entries.</summary>
    public int TimedOutCount { get; init; }

    /// <summary>Duration of oldest processing entry.</summary>
    public TimeSpan OldestProcessingDuration { get; init; }

    /// <summary>When stats were checked.</summary>
    public DateTimeOffset CheckedAt { get; init; }
}

#region Notifications

/// <summary>Stuck processing alert.</summary>
public sealed record StuckProcessingAlert
{
    public required Guid EntryId { get; init; }
    public required string BomRef { get; init; }
    public TimeSpan ProcessingDuration { get; init; }
    public DateTimeOffset AlertedAt { get; init; }
}

/// <summary>Entry failed notification.</summary>
public sealed record EntryFailedNotification
{
    public required Guid EntryId { get; init; }
    public required string BomRef { get; init; }
    public required string Reason { get; init; }
    public DateTimeOffset FailedAt { get; init; }
}

/// <summary>Forced retry notification.</summary>
public sealed record ForcedRetryNotification
{
    public required Guid EntryId { get; init; }
    public required string BomRef { get; init; }
    public int AttemptNumber { get; init; }
    public DateTimeOffset NextProcessingAt { get; init; }
}

#endregion

#region Repository Extensions

// These would be added to IGreyQueueRepository
public partial interface IGreyQueueRepository
{
    /// <summary>Gets entries by status.</summary>
    Task<IReadOnlyList<GreyQueueEntry>> GetByStatusAsync(GreyQueueStatus status, CancellationToken ct = default);

    /// <summary>Updates entry status.</summary>
    Task UpdateStatusAsync(Guid entryId, GreyQueueStatus status, CancellationToken ct = default);

    /// <summary>Forces a retry for an entry.</summary>
    Task ForceRetryAsync(Guid entryId, DateTimeOffset nextProcessingAt, CancellationToken ct = default);

    /// <summary>Gets entry by ID.</summary>
    Task<GreyQueueEntry?> GetByIdAsync(Guid entryId, CancellationToken ct = default);
}

#endregion