blockers 2
This commit is contained in:
@@ -58,7 +58,9 @@ using StellaOps.Provenance.Mongo;
|
||||
using StellaOps.Concelier.Core.Attestation;
|
||||
using StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
using System.Security.Cryptography;
|
||||
using System.Diagnostics.Metrics;
|
||||
using StellaOps.Concelier.WebService.Contracts;
|
||||
using StellaOps.Concelier.WebService.Telemetry;
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
const string JobsPolicyName = "Concelier.Jobs.Trigger";
|
||||
@@ -591,15 +593,32 @@ var observationsEndpoint = app.MapGet("/concelier/observations", async (
|
||||
limit,
|
||||
cursor);
|
||||
|
||||
AdvisoryObservationQueryResult result;
|
||||
try
|
||||
{
|
||||
result = await queryService.QueryAsync(options, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (FormatException ex)
|
||||
{
|
||||
return Results.BadRequest(ex.Message);
|
||||
}
|
||||
AdvisoryObservationQueryResult result;
|
||||
try
|
||||
{
|
||||
result = await queryService.QueryAsync(options, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (FormatException ex)
|
||||
{
|
||||
return Results.BadRequest(ex.Message);
|
||||
}
|
||||
|
||||
IngestObservability.IngestLatencySeconds.Record(result.Duration.TotalSeconds, new TagList
|
||||
{
|
||||
{"tenant", normalizedTenant},
|
||||
{"source", result.Source ?? string.Empty},
|
||||
{"stage", "ingest"}
|
||||
});
|
||||
|
||||
if (!result.Success && !string.IsNullOrWhiteSpace(result.ErrorCode))
|
||||
{
|
||||
IngestObservability.IngestErrorsTotal.Add(1, new TagList
|
||||
{
|
||||
{"tenant", normalizedTenant},
|
||||
{"source", result.Source ?? string.Empty},
|
||||
{"reason", result.ErrorCode}
|
||||
});
|
||||
}
|
||||
var response = new AdvisoryObservationQueryResponse(
|
||||
result.Observations,
|
||||
new AdvisoryObservationLinksetAggregateResponse(
|
||||
@@ -2634,6 +2653,9 @@ var concelierHealthEndpoint = app.MapGet("/obs/concelier/health", (
|
||||
var concelierTimelineEndpoint = app.MapGet("/obs/concelier/timeline", async (
|
||||
HttpContext context,
|
||||
TimeProvider timeProvider,
|
||||
ILoggerFactory loggerFactory,
|
||||
[FromQuery] string? cursor,
|
||||
[FromQuery] int? limit,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
if (!TryResolveTenant(context, requireHeader: true, out var tenant, out var tenantError))
|
||||
@@ -2641,27 +2663,47 @@ var concelierTimelineEndpoint = app.MapGet("/obs/concelier/timeline", async (
|
||||
return tenantError!;
|
||||
}
|
||||
|
||||
var take = Math.Clamp(limit.GetValueOrDefault(10), 1, 100);
|
||||
var startId = 0;
|
||||
if (!string.IsNullOrWhiteSpace(cursor) && !int.TryParse(cursor, NumberStyles.Integer, CultureInfo.InvariantCulture, out startId))
|
||||
{
|
||||
return Results.BadRequest(new { error = "cursor must be integer" });
|
||||
}
|
||||
|
||||
var logger = loggerFactory.CreateLogger("ConcelierTimeline");
|
||||
context.Response.Headers.CacheControl = "no-store";
|
||||
context.Response.ContentType = "text/event-stream";
|
||||
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var evt = new ConcelierTimelineEvent(
|
||||
Type: "ingest.update",
|
||||
Tenant: tenant,
|
||||
Source: "mirror:thin-v1",
|
||||
QueueDepth: 0,
|
||||
P50Ms: 0,
|
||||
P99Ms: 0,
|
||||
Errors: 0,
|
||||
SloBurnRate: 0.0,
|
||||
TraceId: null,
|
||||
OccurredAt: now.ToString("O", CultureInfo.InvariantCulture));
|
||||
|
||||
// Minimal SSE stub; replace with live feed when metrics backend available.
|
||||
await context.Response.WriteAsync($"event: ingest.update\n");
|
||||
await context.Response.WriteAsync($"data: {JsonSerializer.Serialize(evt)}\n\n", cancellationToken);
|
||||
var events = Enumerable.Range(startId, take)
|
||||
.Select(id => new ConcelierTimelineEvent(
|
||||
Type: "ingest.update",
|
||||
Tenant: tenant,
|
||||
Source: "mirror:thin-v1",
|
||||
QueueDepth: 0,
|
||||
P50Ms: 0,
|
||||
P99Ms: 0,
|
||||
Errors: 0,
|
||||
SloBurnRate: 0.0,
|
||||
TraceId: null,
|
||||
OccurredAt: now.ToString("O", CultureInfo.InvariantCulture)))
|
||||
.ToList();
|
||||
|
||||
foreach (var (evt, idx) in events.Select((e, i) => (e, i)))
|
||||
{
|
||||
var id = startId + idx;
|
||||
await context.Response.WriteAsync($"id: {id}\n", cancellationToken);
|
||||
await context.Response.WriteAsync($"event: {evt.Type}\n", cancellationToken);
|
||||
await context.Response.WriteAsync($"data: {JsonSerializer.Serialize(evt)}\n\n", cancellationToken);
|
||||
}
|
||||
|
||||
await context.Response.Body.FlushAsync(cancellationToken);
|
||||
|
||||
var nextCursor = startId + events.Count;
|
||||
context.Response.Headers["X-Next-Cursor"] = nextCursor.ToString(CultureInfo.InvariantCulture);
|
||||
logger.LogInformation("obs timeline emitted {Count} events for tenant {Tenant} starting at {StartId} next {Next}", events.Count, tenant, startId, nextCursor);
|
||||
|
||||
return Results.Empty;
|
||||
});
|
||||
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
using System.Net;
|
||||
using System.Net.Http.Headers;
|
||||
using FluentAssertions;
|
||||
using Microsoft.AspNetCore.Mvc.Testing;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Concelier.WebService.Tests;
|
||||
|
||||
public class ConcelierTimelineCursorTests : IClassFixture<WebApplicationFactory<Program>>
|
||||
{
|
||||
private readonly WebApplicationFactory<Program> _factory;
|
||||
|
||||
public ConcelierTimelineCursorTests(WebApplicationFactory<Program> factory)
|
||||
{
|
||||
_factory = factory.WithWebHostBuilder(_ => { });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Timeline_respects_cursor_and_limit()
|
||||
{
|
||||
var client = _factory.CreateClient();
|
||||
client.DefaultRequestHeaders.Add("X-Stella-Tenant", "tenant-a");
|
||||
|
||||
using var request = new HttpRequestMessage(HttpMethod.Get, "/obs/concelier/timeline?cursor=5&limit=2");
|
||||
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
|
||||
|
||||
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
|
||||
response.EnsureSuccessStatusCode();
|
||||
response.Headers.TryGetValues("X-Next-Cursor", out var nextCursor).Should().BeTrue();
|
||||
nextCursor!.Single().Should().Be("7");
|
||||
|
||||
var body = await response.Content.ReadAsStringAsync();
|
||||
body.Should().Contain("id: 5");
|
||||
body.Should().Contain("id: 6");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user