using System.Linq;
using System.Net;
using System.Net.Http.Headers;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Zastava.Core.Configuration;
using StellaOps.Zastava.Core.Contracts;
using StellaOps.Zastava.Core.Diagnostics;
using StellaOps.Zastava.Core.Security;
using StellaOps.Zastava.Core.Serialization;
using StellaOps.Zastava.Observer.Configuration;

namespace StellaOps.Zastava.Observer.Backend;

/// <summary>
/// Publishes batches of runtime events to the backend.
/// </summary>
internal interface IRuntimeEventsClient
{
    /// <summary>Publishes a single batch of runtime events.</summary>
    Task<RuntimeEventPublishResult> PublishAsync(RuntimeEventsIngestRequest request, CancellationToken cancellationToken);
}

/// <summary>
/// HTTP implementation of <see cref="IRuntimeEventsClient"/>. Serializes the batch with
/// <c>ZastavaCanonicalJsonSerializer</c>, attaches a token obtained from the authority token provider,
/// POSTs it to the configured backend events path and records the backend latency metric.
/// </summary>
internal sealed class RuntimeEventsClient : IRuntimeEventsClient
{
    // Only used to read the backend's ingest response; outgoing payloads use the canonical serializer.
    private static readonly JsonSerializerOptions SerializerOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
        Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase, allowIntegerValues: false) },
    };

    // NOTE(review): the generic type arguments in this file (options types, logger category, event
    // envelope type) were missing from the reviewed text and were restored from context — confirm
    // ZastavaRuntimeOptions / ZastavaObserverOptions / RuntimeEventEnvelope match the project types.
    private readonly HttpClient httpClient;
    private readonly IZastavaAuthorityTokenProvider authorityTokenProvider;
    private readonly IOptionsMonitor<ZastavaRuntimeOptions> runtimeOptions;
    private readonly IOptionsMonitor<ZastavaObserverOptions> observerOptions;
    private readonly IZastavaRuntimeMetrics runtimeMetrics;
    private readonly TimeProvider timeProvider;
    private readonly ILogger<RuntimeEventsClient> logger;

    /// <summary>Creates the client; every dependency is required.</summary>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public RuntimeEventsClient(
        HttpClient httpClient,
        IZastavaAuthorityTokenProvider authorityTokenProvider,
        IOptionsMonitor<ZastavaRuntimeOptions> runtimeOptions,
        IOptionsMonitor<ZastavaObserverOptions> observerOptions,
        IZastavaRuntimeMetrics runtimeMetrics,
        TimeProvider timeProvider,
        ILogger<RuntimeEventsClient> logger)
    {
        this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
        this.authorityTokenProvider = authorityTokenProvider ?? throw new ArgumentNullException(nameof(authorityTokenProvider));
        this.runtimeOptions = runtimeOptions ?? throw new ArgumentNullException(nameof(runtimeOptions));
        this.observerOptions = observerOptions ?? throw new ArgumentNullException(nameof(observerOptions));
        this.runtimeMetrics = runtimeMetrics ?? throw new ArgumentNullException(nameof(runtimeMetrics));
        this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// POSTs <paramref name="request"/> to the backend events path.
    /// </summary>
    /// <returns>
    /// <see cref="RuntimeEventPublishResult.Empty"/> when the batch has no events; a successful result
    /// with the backend's accepted/duplicate counts (falling back to the batch size and 0 when the
    /// response body is empty); or a rate-limited result on HTTP 429 (Retry-After, default 5 seconds).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
    /// <exception cref="RuntimeEventsException">
    /// The backend answered with a non-success status other than 429, or the HTTP exchange failed with
    /// any exception other than caller-requested cancellation (wrapped with status 503).
    /// </exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    /// <remarks>Exceptions thrown while acquiring the authority token propagate unwrapped.</remarks>
    public async Task<RuntimeEventPublishResult> PublishAsync(RuntimeEventsIngestRequest request, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);

        // A null Events list is treated like an empty batch rather than failing with a NullReferenceException.
        if (request.Events is not { Count: > 0 })
        {
            return RuntimeEventPublishResult.Empty;
        }

        var authority = runtimeOptions.CurrentValue.Authority;
        var audience = authority.Audience?.FirstOrDefault() ?? "scanner";
        var scopes = authority.Scopes ?? Array.Empty<string>();
        var token = await authorityTokenProvider.GetAsync(audience, scopes, cancellationToken).ConfigureAwait(false);

        var requestPath = observerOptions.CurrentValue.Backend.EventsPath;
        using var httpRequest = new HttpRequestMessage(HttpMethod.Post, requestPath);
        var payload = ZastavaCanonicalJsonSerializer.SerializeToUtf8Bytes(request);
        httpRequest.Content = new ByteArrayContent(payload);
        httpRequest.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
        httpRequest.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        httpRequest.Headers.Authorization = CreateAuthorizationHeader(token);

        // Timed through the injected TimeProvider so latency is deterministic under a fake clock.
        var startTimestamp = timeProvider.GetTimestamp();

        // Latency is recorded once per call: as soon as a response arrives, or in the failure handler
        // when no response was obtained. Without this flag a failure while reading/parsing the body
        // would record a second, contradictory (success=false) sample for the same request.
        var latencyRecorded = false;

        try
        {
            using var response = await httpClient.SendAsync(httpRequest, cancellationToken).ConfigureAwait(false);
            RecordLatency(timeProvider.GetElapsedTime(startTimestamp).TotalMilliseconds, success: response.IsSuccessStatusCode);
            latencyRecorded = true;

            if (response.IsSuccessStatusCode)
            {
                var body = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
                var parsed = string.IsNullOrWhiteSpace(body)
                    ? null
                    : JsonSerializer.Deserialize<RuntimeEventsIngestResponse>(body, SerializerOptions);

                var accepted = parsed?.Accepted ?? request.Events.Count;
                var duplicates = parsed?.Duplicates ?? 0;
                logger.LogDebug("Published runtime events batch (batchId={BatchId}, accepted={Accepted}, duplicates={Duplicates}).", request.BatchId, accepted, duplicates);
                return RuntimeEventPublishResult.Successful(accepted, duplicates);
            }

            if (response.StatusCode == HttpStatusCode.TooManyRequests)
            {
                var retryAfter = ParseRetryAfter(response.Headers.RetryAfter, timeProvider) ?? TimeSpan.FromSeconds(5);
                logger.LogWarning("Runtime events publish rate limited (batchId={BatchId}, retryAfter={RetryAfter}).", request.BatchId, retryAfter);
                return RuntimeEventPublishResult.FromRateLimit(retryAfter);
            }

            var errorBody = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
            logger.LogWarning("Runtime events publish failed with status {Status} (batchId={BatchId}): {Payload}", (int)response.StatusCode, request.BatchId, Truncate(errorBody));
            throw new RuntimeEventsException($"Runtime events publish failed with status {(int)response.StatusCode}", response.StatusCode);
        }
        catch (RuntimeEventsException)
        {
            throw;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            if (!latencyRecorded)
            {
                RecordLatency(timeProvider.GetElapsedTime(startTimestamp).TotalMilliseconds, success: false);
            }

            logger.LogWarning(ex, "Runtime events publish encountered an exception (batchId={BatchId}).", request.BatchId);
            throw new RuntimeEventsException("Runtime events publish failed due to network error.", HttpStatusCode.ServiceUnavailable, ex);
        }
    }

    /// <summary>
    /// Builds the Authorization header from the token; a "dpop" token type (any casing) is sent with the
    /// "DPoP" scheme, any other token type is used verbatim as the scheme.
    /// </summary>
    private AuthenticationHeaderValue CreateAuthorizationHeader(ZastavaOperationalToken token)
    {
        var scheme = string.Equals(token.TokenType, "dpop", StringComparison.OrdinalIgnoreCase)
            ? "DPoP"
            : token.TokenType;
        return new AuthenticationHeaderValue(scheme, token.AccessToken);
    }

    /// <summary>
    /// Records one backend-latency sample tagged with the runtime's default tags plus
    /// <c>endpoint=runtime-events</c> and <c>success=true|false</c>.
    /// </summary>
    private void RecordLatency(double elapsedMs, bool success)
    {
        var tags = runtimeMetrics.DefaultTags
            .Concat(new[]
            {
                new KeyValuePair<string, object?>("endpoint", "runtime-events"),
                new KeyValuePair<string, object?>("success", success ? "true" : "false"),
            })
            .ToArray();

        runtimeMetrics.BackendLatencyMs.Record(elapsedMs, tags);
    }

    /// <summary>
    /// Converts a Retry-After header into a delay. A delta value is returned as-is; an absolute date is
    /// converted relative to <paramref name="timeProvider"/>'s current UTC time and clamped at zero.
    /// </summary>
    /// <returns>The delay, or <c>null</c> when the header is absent or carries neither form.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="timeProvider"/> is <c>null</c>.</exception>
    internal static TimeSpan? ParseRetryAfter(RetryConditionHeaderValue? retryAfter, TimeProvider timeProvider)
    {
        ArgumentNullException.ThrowIfNull(timeProvider);

        if (retryAfter is null)
        {
            return null;
        }

        if (retryAfter.Delta is { } delta)
        {
            return delta;
        }

        if (retryAfter.Date is { } date)
        {
            var remaining = date - timeProvider.GetUtcNow();
            return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
        }

        return null;
    }

    /// <summary>
    /// Shortens <paramref name="value"/> for logging: strings longer than <paramref name="maxLength"/>
    /// chars are cut and suffixed with "...". Null or empty input yields an empty string.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxLength"/> is negative and <paramref name="value"/> is non-empty.
    /// </exception>
    internal static string Truncate(string? value, int maxLength = 512)
    {
        if (string.IsNullOrEmpty(value))
        {
            return string.Empty;
        }

        ArgumentOutOfRangeException.ThrowIfNegative(maxLength);

        if (value.Length <= maxLength)
        {
            return value;
        }

        // Never cut between the two UTF-16 code units of a surrogate pair; a lone high surrogate
        // would make the logged text invalid Unicode.
        var cut = maxLength;
        if (cut > 0 && char.IsHighSurrogate(value[cut - 1]))
        {
            cut--;
        }

        return value[..cut] + "...";
    }
}

/// <summary>Request body sent to the backend runtime-events endpoint.</summary>
internal sealed record RuntimeEventsIngestRequest
{
    /// <summary>Optional batch identifier, echoed in log messages.</summary>
    [JsonPropertyName("batchId")]
    public string? BatchId { get; init; }

    /// <summary>Events in this batch; defaults to an empty list.</summary>
    [JsonPropertyName("events")]
    public IReadOnlyList<RuntimeEventEnvelope> Events { get; init; } = Array.Empty<RuntimeEventEnvelope>();
}

/// <summary>Response body returned by the backend for a successful ingest.</summary>
internal sealed record RuntimeEventsIngestResponse
{
    /// <summary>Number of events the backend accepted.</summary>
    [JsonPropertyName("accepted")]
    public int Accepted { get; init; }

    /// <summary>Number of events the backend reported as duplicates.</summary>
    [JsonPropertyName("duplicates")]
    public int Duplicates { get; init; }
}

/// <summary>Outcome of a <see cref="IRuntimeEventsClient.PublishAsync"/> call that did not throw.</summary>
internal readonly record struct RuntimeEventPublishResult(
    bool Success,
    bool RateLimited,
    TimeSpan RetryAfter,
    int Accepted,
    int Duplicates)
{
    /// <summary>Result for an empty batch: success with zero counts.</summary>
    public static RuntimeEventPublishResult Empty => new(true, false, TimeSpan.Zero, 0, 0);

    /// <summary>Successful publish with the given counts.</summary>
    public static RuntimeEventPublishResult Successful(int accepted, int duplicates)
        => new(true, false, TimeSpan.Zero, accepted, duplicates);

    /// <summary>Unsuccessful, rate-limited publish that should be retried after <paramref name="retryAfter"/>.</summary>
    public static RuntimeEventPublishResult FromRateLimit(TimeSpan retryAfter)
        => new(false, true, retryAfter, 0, 0);
}

/// <summary>Raised when publishing runtime events to the backend fails.</summary>
internal sealed class RuntimeEventsException : Exception
{
    /// <summary>Creates the exception with the HTTP status associated with the failure.</summary>
    public RuntimeEventsException(string message, HttpStatusCode statusCode, Exception? innerException = null)
        : base(message, innerException)
    {
        StatusCode = statusCode;
    }

    /// <summary>Backend status code, or 503 when no response was obtained.</summary>
    public HttpStatusCode StatusCode { get; }
}