using System.Linq;
using System.Net;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Zastava.Observer.Backend;
using StellaOps.Zastava.Observer.Configuration;
using StellaOps.Zastava.Observer.Runtime;

namespace StellaOps.Zastava.Observer.Worker;

/// <summary>
/// Background service that reads runtime events from the buffer, groups them into batches and
/// publishes each batch through the runtime facts client and the runtime events client.
/// A batch is flushed when it reaches the configured size or when the flush timer elapses.
/// </summary>
internal sealed class RuntimeEventDispatchService : BackgroundService
{
    // NOTE(review): the generic type arguments of IOptionsMonitor, ILogger, List and IEnumerable
    // are not visible in the reviewed copy of this file; those declarations are reproduced as seen.
    // Confirm them against the real declarations before merging.
    private readonly IRuntimeEventBuffer buffer;
    private readonly IRuntimeEventsClient eventsClient;
    private readonly IRuntimeFactsClient runtimeFactsClient;
    private readonly IOptionsMonitor observerOptions;
    private readonly TimeProvider timeProvider;
    private readonly ILogger logger;

    /// <summary>
    /// Creates the dispatch service.
    /// </summary>
    /// <exception cref="ArgumentNullException">Thrown when any dependency is <c>null</c>.</exception>
    public RuntimeEventDispatchService(
        IRuntimeEventBuffer buffer,
        IRuntimeEventsClient eventsClient,
        IRuntimeFactsClient runtimeFactsClient,
        IOptionsMonitor observerOptions,
        TimeProvider timeProvider,
        ILogger logger)
    {
        this.buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
        this.eventsClient = eventsClient ?? throw new ArgumentNullException(nameof(eventsClient));
        this.runtimeFactsClient = runtimeFactsClient ?? throw new ArgumentNullException(nameof(runtimeFactsClient));
        this.observerOptions = observerOptions ?? throw new ArgumentNullException(nameof(observerOptions));
        this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Main loop: waits for either the next buffered item or the flush timer, whichever completes
    /// first, and flushes the accumulated batch when it is full or the timer fires.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var batch = new List();
        var enumerator = buffer.ReadAllAsync(stoppingToken).GetAsyncEnumerator(stoppingToken);

        // The pending MoveNextAsync call is kept across iterations so that a flush-timer wake-up
        // does not start a second, overlapping MoveNextAsync on the same enumerator.
        Task<bool>? moveNextTask = null;
        Task? flushDelayTask = null;
        CancellationTokenSource? flushDelayCts = null;

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                moveNextTask ??= enumerator.MoveNextAsync().AsTask();

                // The timer only runs while there is something waiting to be flushed.
                if (batch.Count > 0 && flushDelayTask is null)
                {
                    StartFlushTimer(ref flushDelayTask, ref flushDelayCts, stoppingToken);
                }

                Task completedTask;
                if (flushDelayTask is null)
                {
                    completedTask = await Task.WhenAny(moveNextTask).ConfigureAwait(false);
                }
                else
                {
                    completedTask = await Task.WhenAny(moveNextTask, flushDelayTask).ConfigureAwait(false);
                }

                if (completedTask == moveNextTask)
                {
                    // Awaiting here surfaces any exception or cancellation from the enumerator.
                    if (!await moveNextTask.ConfigureAwait(false))
                    {
                        // The buffer has no more items; leave the loop and let finally flush.
                        break;
                    }

                    var item = enumerator.Current;
                    batch.Add(item);
                    moveNextTask = null;

                    var options = observerOptions.CurrentValue;
                    var batchSize = Math.Clamp(options.PublishBatchSize, 1, 512);
                    if (batch.Count >= batchSize)
                    {
                        ResetFlushTimer(ref flushDelayTask, ref flushDelayCts);
                        await FlushAsync(batch, stoppingToken).ConfigureAwait(false);
                    }
                }
                else
                {
                    // flush timer triggered
                    ResetFlushTimer(ref flushDelayTask, ref flushDelayCts);
                    if (batch.Count > 0)
                    {
                        await FlushAsync(batch, stoppingToken).ConfigureAwait(false);
                    }
                }
            }
        }
        finally
        {
            ResetFlushTimer(ref flushDelayTask, ref flushDelayCts);

            // A final flush is attempted only when the loop ended without cancellation.
            if (batch.Count > 0 && !stoppingToken.IsCancellationRequested)
            {
                await FlushAsync(batch, stoppingToken).ConfigureAwait(false);
            }

            // Let an in-flight MoveNextAsync finish before disposing the enumerator; its outcome
            // is irrelevant at this point, so failures are deliberately swallowed.
            if (moveNextTask is not null)
            {
                try
                {
                    await moveNextTask.ConfigureAwait(false);
                }
                catch
                {
                    /* ignored */
                }
            }

            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Publishes the batch (runtime facts first, then the runtime events request) and settles
    /// every item: completed on success, requeued on rate limiting, rejection or failure.
    /// The batch list is always cleared before returning.
    /// </summary>
    /// <param name="batch">Items to publish; emptied by this method.</param>
    /// <param name="cancellationToken">Cancels publishing, requeueing and back-off delays.</param>
    private async Task FlushAsync(List batch, CancellationToken cancellationToken)
    {
        if (batch.Count == 0)
        {
            return;
        }

        try
        {
            var envelopes = batch.Select(item => item.Envelope).ToArray();
            var factsPublished = await TryPublishRuntimeFactsAsync(envelopes, cancellationToken).ConfigureAwait(false);

            var request = new RuntimeEventsIngestRequest
            {
                BatchId = $"obs-{timeProvider.GetUtcNow():yyyyMMddTHHmmssfff}-{Guid.NewGuid():N}",
                Events = envelopes
            };

            var result = await eventsClient.PublishAsync(request, cancellationToken).ConfigureAwait(false);
            if (result.Success)
            {
                foreach (var item in batch)
                {
                    await item.CompleteAsync().ConfigureAwait(false);
                }

                logger.LogInformation("Runtime events batch published (batchId={BatchId}, accepted={Accepted}, duplicates={Duplicates}, runtimeFacts={FactsPublished}).",
                    request.BatchId,
                    result.Accepted,
                    result.Duplicates,
                    factsPublished);
            }
            else if (result.RateLimited)
            {
                await RequeueBatchAsync(batch, cancellationToken).ConfigureAwait(false);
                await DelayAsync(result.RetryAfter, cancellationToken).ConfigureAwait(false);
            }
            else
            {
                // Neither accepted nor rate limited. Without this branch the batch would reach
                // batch.Clear() below without being completed or requeued, silently losing the
                // events. Handle it like the other failed-publish paths: requeue and back off.
                logger.LogWarning("Runtime events batch was not accepted (batchId={BatchId}); batch will be retried.", request.BatchId);
                await RequeueBatchAsync(batch, cancellationToken).ConfigureAwait(false);
                await DelayAsync(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false);
            }
        }
        catch (RuntimeEventsException ex) when (!cancellationToken.IsCancellationRequested)
        {
            logger.LogWarning(ex, "Runtime events publish failed (status={StatusCode}); batch will be retried.", (int)ex.StatusCode);
            await RequeueBatchAsync(batch, cancellationToken).ConfigureAwait(false);

            // Back off longer when the backend reports it is unavailable.
            var backoff = ex.StatusCode == HttpStatusCode.ServiceUnavailable
                ? TimeSpan.FromSeconds(5)
                : TimeSpan.FromSeconds(2);
            await DelayAsync(backoff, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            logger.LogWarning(ex, "Runtime events publish encountered an unexpected error; batch will be retried.");
            await RequeueBatchAsync(batch, cancellationToken).ConfigureAwait(false);
            await DelayAsync(TimeSpan.FromSeconds(5), cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            // Every handled path above has already completed or requeued the items.
            batch.Clear();
        }
    }

    /// <summary>
    /// Builds a runtime facts request from the envelopes and publishes it.
    /// </summary>
    /// <returns>
    /// <c>true</c> when a request was published; <c>false</c> when there are no envelopes or
    /// <c>RuntimeFactsBuilder.Build</c> returned <c>null</c>.
    /// </returns>
    /// <remarks>
    /// Failures are logged and rethrown (unless cancellation was requested, in which case they
    /// propagate unlogged) so that the caller's exception handling requeues the batch.
    /// </remarks>
    private async Task<bool> TryPublishRuntimeFactsAsync(RuntimeEventEnvelope[] envelopes, CancellationToken cancellationToken)
    {
        if (envelopes.Length == 0)
        {
            return false;
        }

        var options = observerOptions.CurrentValue.Reachability;
        var request = RuntimeFactsBuilder.Build(envelopes, options);
        if (request is null)
        {
            return false;
        }

        try
        {
            await runtimeFactsClient.PublishAsync(request, cancellationToken).ConfigureAwait(false);
            logger.LogDebug("Published {Count} runtime facts (callgraphId={CallgraphId}).", request.Events.Count, request.CallgraphId);
            return true;
        }
        catch (RuntimeFactsException ex) when (!cancellationToken.IsCancellationRequested)
        {
            logger.LogWarning(ex, "Runtime facts publish failed; batch will be retried.");
            throw;
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            logger.LogWarning(ex, "Runtime facts publish encountered an unexpected error; batch will be retried.");
            throw;
        }
    }

    /// <summary>
    /// Requeues every item in the batch. An item whose requeue fails for a reason other than
    /// cancellation is logged and completed instead, i.e. dropped.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// Rethrown when a requeue is interrupted because <paramref name="cancellationToken"/> was cancelled.
    /// </exception>
    private async Task RequeueBatchAsync(IEnumerable batch, CancellationToken cancellationToken)
    {
        foreach (var item in batch)
        {
            try
            {
                await item.RequeueAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                throw;
            }
            catch (Exception ex)
            {
                logger.LogWarning(ex, "Failed to requeue runtime event {EventId}; dropping.", item.Envelope.Event.EventId);
                await item.CompleteAsync().ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Waits for <paramref name="delay"/>; returns immediately for non-positive delays and
    /// returns quietly (without throwing) when the wait is cut short by cancellation.
    /// </summary>
    private async Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken)
    {
        if (delay <= TimeSpan.Zero)
        {
            return;
        }

        try
        {
            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
        }
    }

    /// <summary>
    /// Starts a flush timer using the configured interval, clamped to 0.1–30 seconds. The timer's
    /// token source is linked to <paramref name="stoppingToken"/>.
    /// </summary>
    private void StartFlushTimer(ref Task? flushTask, ref CancellationTokenSource? cts, CancellationToken stoppingToken)
    {
        var options = observerOptions.CurrentValue;
        var flushIntervalSeconds = Math.Clamp(options.PublishFlushIntervalSeconds, 0.1, 30);
        var flushInterval = TimeSpan.FromSeconds(flushIntervalSeconds);

        cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
        flushTask = Task.Delay(flushInterval, cts.Token);
    }

    /// <summary>
    /// Cancels and disposes the current flush timer, if any, and clears both references so the
    /// main loop can start a fresh timer later.
    /// </summary>
    private void ResetFlushTimer(ref Task? flushTask, ref CancellationTokenSource? cts)
    {
        if (cts is not null)
        {
            try
            {
                cts.Cancel();
            }
            catch
            {
                /* ignore */
            }

            cts.Dispose();
            cts = null;
        }

        flushTask = null;
    }
}