using System.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Infrastructure;
using StellaOps.Findings.Ledger.Infrastructure.Policy;
using StellaOps.Findings.Ledger.Options;
using StellaOps.Findings.Ledger.Observability;
using StellaOps.Findings.Ledger.Services;

namespace StellaOps.Findings.Ledger.Infrastructure.Projection;

/// <summary>
/// Background worker that reads ledger events in batches from <see cref="ILedgerEventStream"/>,
/// applies finding events to the projection store via <see cref="IFindingProjectionRepository"/>,
/// and saves a <see cref="ProjectionCheckpoint"/> after every event it handles.
/// </summary>
public sealed class LedgerProjectionWorker : BackgroundService
{
    private readonly ILedgerEventStream _eventStream;
    private readonly IFindingProjectionRepository _repository;
    private readonly IPolicyEvaluationService _policyEvaluationService;
    private readonly TimeProvider _timeProvider;
    private readonly LedgerServiceOptions.ProjectionOptions _options;
    private readonly ILogger<LedgerProjectionWorker> _logger;

    /// <summary>Creates the projection worker.</summary>
    /// <param name="eventStream">Source of ledger event batches.</param>
    /// <param name="repository">Store for projections, history, actions and the replay checkpoint.</param>
    /// <param name="policyEvaluationService">Evaluates each finding event against the current projection.</param>
    /// <param name="options">Service options; only the <c>Projection</c> section is read.</param>
    /// <param name="timeProvider">Clock used for checkpoint timestamps and lag measurement.</param>
    /// <param name="logger">Logger for progress and failures.</param>
    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
    public LedgerProjectionWorker(
        ILedgerEventStream eventStream,
        IFindingProjectionRepository repository,
        IPolicyEvaluationService policyEvaluationService,
        IOptions<LedgerServiceOptions> options,
        TimeProvider timeProvider,
        ILogger<LedgerProjectionWorker> logger)
    {
        _eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream));
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _policyEvaluationService = policyEvaluationService ?? throw new ArgumentNullException(nameof(policyEvaluationService));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value.Projection;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Loads the persisted checkpoint, then repeatedly reads the next batch relative to it and
    /// projects each event in order until <paramref name="stoppingToken"/> is cancelled.
    /// A failure on any event aborts the rest of the batch, waits <c>IdleDelay</c>, and re-reads
    /// from the in-memory checkpoint.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        ProjectionCheckpoint checkpoint;
        try
        {
            checkpoint = await _repository.GetCheckpointAsync(stoppingToken).ConfigureAwait(false);
        }
        catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
        {
            // Without a checkpoint there is no known position to resume from, so the worker fails fast.
            _logger.LogError(ex, "Failed to load ledger projection checkpoint.");
            throw;
        }

        while (!stoppingToken.IsCancellationRequested)
        {
            IReadOnlyList<LedgerEventRecord> batch;
            try
            {
                batch = await _eventStream.ReadNextBatchAsync(checkpoint, _options.BatchSize, stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to read ledger events for projection replay.");
                await DelayAsync(stoppingToken).ConfigureAwait(false);
                continue;
            }

            if (batch.Count == 0)
            {
                await DelayAsync(stoppingToken).ConfigureAwait(false);
                continue;
            }

            var batchStopwatch = Stopwatch.StartNew();
            // Rebuild metrics are attributed to the tenant of the first event in the batch.
            var batchTenant = batch[0].TenantId;
            var batchFailed = false;

            foreach (var record in batch)
            {
                using var scope = _logger.BeginScope(new Dictionary<string, object?>
                {
                    ["tenant"] = record.TenantId,
                    ["chainId"] = record.ChainId,
                    ["eventId"] = record.EventId,
                    ["eventType"] = record.EventType,
                    ["policyVersion"] = record.PolicyVersion
                });
                using var activity = LedgerTelemetry.StartProjectionApply(record);
                var applyStopwatch = Stopwatch.StartNew();

                if (!LedgerEventConstants.IsFindingEvent(record.EventType))
                {
                    // Non-finding events are not projected, but the checkpoint still moves past them.
                    // The save is guarded like the finding path: an unhandled repository failure here
                    // would otherwise escape ExecuteAsync and stop the worker.
                    try
                    {
                        checkpoint = AdvanceCheckpoint(checkpoint, record);
                        await _repository.SaveCheckpointAsync(checkpoint, stoppingToken).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        LedgerTelemetry.MarkError(activity, "projection_cancelled");
                        return;
                    }
                    catch (Exception ex)
                    {
                        LedgerTelemetry.MarkError(activity, "projection_failed");
                        _logger.LogError(ex, "Failed to save ledger projection checkpoint after skipping event {EventId} for tenant {TenantId}.", record.EventId, record.TenantId);
                        batchFailed = true;
                        await DelayAsync(stoppingToken).ConfigureAwait(false);
                        break;
                    }

                    _logger.LogInformation("Skipped non-finding ledger event {EventId} type {EventType} during projection.", record.EventId, record.EventType);
                    continue;
                }

                try
                {
                    var evaluationStatus = await ApplyAsync(record, stoppingToken).ConfigureAwait(false);

                    checkpoint = AdvanceCheckpoint(checkpoint, record);
                    await _repository.SaveCheckpointAsync(checkpoint, stoppingToken).ConfigureAwait(false);

                    _logger.LogInformation(
                        "Projected ledger event {EventId} for tenant {Tenant} chain {ChainId} seq {Sequence} finding {FindingId}.",
                        record.EventId,
                        record.TenantId,
                        record.ChainId,
                        record.SequenceNumber,
                        record.FindingId);
                    activity?.SetStatus(ActivityStatusCode.Ok);

                    applyStopwatch.Stop();
                    var now = _timeProvider.GetUtcNow();
                    // Clamp to zero so clock skew between writer and projector never reports negative lag.
                    var lagSeconds = Math.Max(0, (now - record.RecordedAt).TotalSeconds);
                    LedgerMetrics.RecordProjectionApply(
                        applyStopwatch.Elapsed,
                        lagSeconds,
                        record.TenantId,
                        record.EventType,
                        record.PolicyVersion,
                        evaluationStatus ?? string.Empty);
                    LedgerTimeline.EmitProjectionUpdated(_logger, record, evaluationStatus, evidenceBundleRef: null);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    LedgerTelemetry.MarkError(activity, "projection_cancelled");
                    return;
                }
                catch (Exception ex)
                {
                    LedgerTelemetry.MarkError(activity, "projection_failed");
                    _logger.LogError(ex, "Failed to project ledger event {EventId} for tenant {TenantId}.", record.EventId, record.TenantId);
                    batchFailed = true;
                    await DelayAsync(stoppingToken).ConfigureAwait(false);
                    break;
                }
            }

            batchStopwatch.Stop();
            if (!batchFailed)
            {
                LedgerMetrics.RecordProjectionRebuild(batchStopwatch.Elapsed, batchTenant, "replay");
            }
        }
    }

    /// <summary>
    /// Returns a copy of <paramref name="checkpoint"/> positioned at <paramref name="record"/>
    /// and stamped with the current time from the injected <see cref="TimeProvider"/>.
    /// </summary>
    private ProjectionCheckpoint AdvanceCheckpoint(ProjectionCheckpoint checkpoint, LedgerEventRecord record) =>
        checkpoint with
        {
            LastRecordedAt = record.RecordedAt,
            LastEventId = record.EventId,
            UpdatedAt = _timeProvider.GetUtcNow()
        };

    /// <summary>
    /// Projects a single finding event:
    /// <list type="number">
    /// <item>loads the current projection;</item>
    /// <item>evaluates policy against it;</item>
    /// <item>reduces the event into a new projection;</item>
    /// <item>persists the projection, its history entry, and any resulting action.</item>
    /// </list>
    /// </summary>
    /// <param name="record">The ledger event to apply.</param>
    /// <param name="cancellationToken">Cancellation for repository and policy calls.</param>
    /// <returns>The policy evaluation status for the event.</returns>
    private async Task<string?> ApplyAsync(LedgerEventRecord record, CancellationToken cancellationToken)
    {
        var current = await _repository.GetAsync(record.TenantId, record.FindingId, record.PolicyVersion, cancellationToken).ConfigureAwait(false);
        var evaluation = await _policyEvaluationService.EvaluateAsync(record, current, cancellationToken).ConfigureAwait(false);
        var result = LedgerProjectionReducer.Reduce(record, current, evaluation);

        await _repository.UpsertAsync(result.Projection, cancellationToken).ConfigureAwait(false);
        await _repository.InsertHistoryAsync(result.History, cancellationToken).ConfigureAwait(false);

        if (result.Action is not null)
        {
            await _repository.InsertActionAsync(result.Action, cancellationToken).ConfigureAwait(false);
        }

        return evaluation.Status;
    }

    /// <summary>
    /// Waits for the configured <c>IdleDelay</c>. Returns early, without throwing,
    /// if <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    private async Task DelayAsync(CancellationToken cancellationToken)
    {
        try
        {
            await Task.Delay(_options.IdleDelay, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown during a back-off is expected; callers re-check the token themselves.
        }
    }
}