using StellaOps.TimelineIndexer.Core.Abstractions;
using StellaOps.TimelineIndexer.Core.Models;
using System.Collections.Concurrent;
using System.Diagnostics.Metrics;
using System.Linq;

namespace StellaOps.TimelineIndexer.Worker;

/// <summary>
/// Background consumer that reads timeline events from configured subscribers and persists them via the ingestion service.
/// </summary>
/// <remarks>
/// One consumer loop is started per subscriber and all loops share a single in-memory
/// set of (tenant, event id) keys, so an event delivered more than once during the
/// lifetime of this instance is only handed to the ingestion service once. A key is
/// released again when ingestion of that event fails, so a later redelivery is retried
/// instead of being counted as a duplicate.
/// </remarks>
/// <param name="subscribers">Event sources to consume; each one is read concurrently.</param>
/// <param name="ingestionService">Service each previously unseen envelope is handed to.</param>
/// <param name="logger">Logger for per-event diagnostics.</param>
/// <param name="timeProvider">Clock used for lag measurement; defaults to <see cref="TimeProvider.System"/>.</param>
public sealed class TimelineIngestionWorker(
    IEnumerable<ITimelineEventSubscriber> subscribers,
    ITimelineIngestionService ingestionService,
    ILogger<TimelineIngestionWorker> logger,
    TimeProvider? timeProvider = null) : BackgroundService
{
    private static readonly Meter Meter = new("StellaOps.TimelineIndexer", "1.0.0");

    // Events the ingestion service reported as newly inserted.
    private static readonly Counter<long> IngestedCounter = Meter.CreateCounter<long>("timeline.ingested");

    // Events skipped by the in-memory set or reported as not inserted by the ingestion service.
    private static readonly Counter<long> DuplicateCounter = Meter.CreateCounter<long>("timeline.duplicates");

    // Events whose ingestion threw an exception other than shutdown cancellation.
    private static readonly Counter<long> FailedCounter = Meter.CreateCounter<long>("timeline.failed");

    // Seconds between the envelope's OccurredAt and the moment the insert was confirmed.
    private static readonly Histogram<double> LagHistogram = Meter.CreateHistogram<double>("timeline.ingest.lag.seconds");

    private readonly IEnumerable<ITimelineEventSubscriber> _subscribers = subscribers;
    private readonly ITimelineIngestionService _ingestion = ingestionService;
    private readonly ILogger<TimelineIngestionWorker> _logger = logger;

    // Keys of events already claimed by a consumer loop of this instance.
    // NOTE(review): successfully processed keys are never evicted, so this grows with
    // the number of distinct events seen - confirm that is acceptable for long-running hosts.
    private readonly ConcurrentDictionary<(string tenant, string eventId), byte> _sessionSeen = new();

    private readonly TimeProvider _timeProvider = timeProvider ?? TimeProvider.System;

    /// <summary>
    /// Starts one consumer loop per subscriber and completes when all of them have finished.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host on shutdown; passed to every consumer loop.</param>
    /// <returns>A task that completes once every consumer loop has completed.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // ToArray forces the lazy Select so every loop is started before WhenAll is built.
        var tasks = _subscribers.Select(subscriber => ConsumeAsync(subscriber, stoppingToken)).ToArray();
        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Reads envelopes from a single subscriber until its stream ends or shutdown is requested,
    /// de-duplicating against the shared in-memory set and recording metrics for each outcome.
    /// </summary>
    /// <param name="subscriber">The event source to drain.</param>
    /// <param name="cancellationToken">Token passed to both the subscription and the ingestion call.</param>
    private async Task ConsumeAsync(ITimelineEventSubscriber subscriber, CancellationToken cancellationToken)
    {
        await foreach (var envelope in subscriber.SubscribeAsync(cancellationToken))
        {
            var key = (envelope.TenantId, envelope.EventId);

            // TryAdd is the atomic claim: only the first loop to see this key proceeds.
            if (!_sessionSeen.TryAdd(key, 0))
            {
                DuplicateCounter.Add(1);
                _logger.LogDebug("Skipped duplicate timeline event {EventId} for tenant {Tenant}", envelope.EventId, envelope.TenantId);
                continue;
            }

            try
            {
                var result = await _ingestion.IngestAsync(envelope, cancellationToken).ConfigureAwait(false);
                if (result.Inserted)
                {
                    IngestedCounter.Add(1);
                    LagHistogram.Record((_timeProvider.GetUtcNow() - envelope.OccurredAt).TotalSeconds);
                    _logger.LogInformation("Ingested timeline event {EventId} from {Source} (tenant {Tenant})", envelope.EventId, envelope.Source, envelope.TenantId);
                }
                else
                {
                    DuplicateCounter.Add(1);
                    _logger.LogDebug("Store reported duplicate for event {EventId} tenant {Tenant}", envelope.EventId, envelope.TenantId);
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Respect shutdown. The event was not persisted, so do not leave it marked as seen.
                _sessionSeen.TryRemove(key, out _);
                break;
            }
            catch (Exception ex)
            {
                // Release the claim: the event was never stored, so a redelivery must be
                // attempted again rather than being skipped and counted as a duplicate.
                _sessionSeen.TryRemove(key, out _);
                FailedCounter.Add(1);
                _logger.LogError(ex, "Failed to ingest timeline event {EventId} for tenant {Tenant}", envelope.EventId, envelope.TenantId);
            }
        }
    }
}