using Microsoft.Extensions.Logging;
using StellaOps.Messaging;
using StellaOps.Provcache.Events;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace StellaOps.Provcache.Invalidation;

public sealed partial class FeedEpochInvalidator
{
    /// <summary>
    /// Consumes the feed epoch event stream and handles each event until the
    /// stream ends or <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// A failure while handling a single event is counted and logged, and the
    /// loop moves on to the next event. A failure of the loop itself is logged
    /// and rethrown. Cancellation requested through
    /// <paramref name="cancellationToken"/> ends the method without an error.
    /// </remarks>
    /// <param name="cancellationToken">Token that signals shutdown of the processing loop.</param>
    private async Task ProcessEventsAsync(CancellationToken cancellationToken)
    {
        try
        {
            // Start from latest events.
            await foreach (var streamEvent in _eventStream.SubscribeAsync(StreamPosition.End, cancellationToken))
            {
                try
                {
                    await HandleEventAsync(streamEvent.Event, cancellationToken);

                    Interlocked.Increment(ref _eventsProcessed);
                    _lastEventAt = _timeProvider.GetUtcNow();
                }
                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                {
                    // Shutdown was requested while this event was in flight. Let the
                    // cancellation reach the outer handler instead of counting and
                    // logging it as a processing error.
                    throw;
                }
                catch (Exception ex)
                {
                    // A single bad event must not stop the loop: record it and continue.
                    Interlocked.Increment(ref _errors);
                    _logger.LogError(
                        ex,
                        "Error processing FeedEpochAdvancedEvent {EventId} for feed {FeedId}",
                        streamEvent.Event.EventId,
                        streamEvent.Event.FeedId);
                }
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Fatal error in FeedEpochInvalidator event processing loop");
            throw;
        }
    }

    /// <summary>
    /// Handles one feed epoch advancement: issues an invalidation request for
    /// the event's new epoch, adds the affected entry count to the running
    /// total, and records telemetry.
    /// </summary>
    /// <param name="event">The feed epoch advancement event to handle.</param>
    /// <param name="cancellationToken">Token passed through to the invalidation call.</param>
    private async Task HandleEventAsync(FeedEpochAdvancedEvent @event, CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing feed epoch advancement: FeedId={FeedId}, PreviousEpoch={PreviousEpoch}, NewEpoch={NewEpoch}",
            @event.FeedId,
            @event.PreviousEpoch,
            @event.NewEpoch);

        // Invalidate entries with feed_epoch older than the new epoch.
        // The feed_epoch in cache entries is formatted as "feed:epoch" (e.g., "cve:2024-12-24T12:00:00Z").
        // NOTE(review): the request below is built from the bare NewEpoch, while the
        // formatted "feed:epoch" value is only used in the reason text. Confirm that
        // ByFeedEpochOlderThan expects the bare epoch.
        var request = InvalidationRequest.ByFeedEpochOlderThan(
            @event.NewEpoch,
            $"Feed {FormatFeedEpoch(@event.FeedId, @event.NewEpoch)} advanced");

        var result = await _provcacheService.InvalidateByAsync(request, cancellationToken);

        Interlocked.Add(ref _entriesInvalidated, result.EntriesAffected);

        _logger.LogInformation(
            "Feed epoch advancement invalidated {Count} cache entries for feed {FeedId} epoch {NewEpoch}",
            result.EntriesAffected,
            @event.FeedId,
            @event.NewEpoch);

        // Record telemetry.
        ProvcacheTelemetry.RecordInvalidation("feed_epoch", result.EntriesAffected);
    }

    /// <summary>
    /// Formats a feed epoch identifier.
    /// </summary>
    /// <param name="feedId">The feed identifier placed before the colon.</param>
    /// <param name="epoch">The epoch value placed after the colon.</param>
    /// <returns>The string <c>feedId:epoch</c>.</returns>
    private static string FormatFeedEpoch(string feedId, string epoch)
    {
        return $"{feedId}:{epoch}";
    }
}