up
This commit is contained in:
@@ -37,6 +37,7 @@ using MongoDB.Driver;
|
||||
using MongoDB.Bson;
|
||||
using Microsoft.Extensions.Caching.Memory;
|
||||
using StellaOps.Excititor.WebService.Contracts;
|
||||
using System.Globalization;
|
||||
using StellaOps.Excititor.WebService.Graph;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
@@ -1176,6 +1177,66 @@ app.MapGet("/obs/excititor/health", async (
|
||||
return Results.Ok(payload);
|
||||
});
|
||||
|
||||
// VEX timeline SSE (WEB-OBS-52-001)
|
||||
app.MapGet("/obs/excititor/timeline", async (
|
||||
HttpContext context,
|
||||
IOptions<VexMongoStorageOptions> storageOptions,
|
||||
TimeProvider timeProvider,
|
||||
ILoggerFactory loggerFactory,
|
||||
[FromQuery] string? cursor,
|
||||
[FromQuery] int? limit,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
if (!TryResolveTenant(context, storageOptions.Value, requireHeader: true, out var tenant, out var tenantError))
|
||||
{
|
||||
return tenantError;
|
||||
}
|
||||
|
||||
var logger = loggerFactory.CreateLogger("ExcititorTimeline");
|
||||
var take = Math.Clamp(limit.GetValueOrDefault(10), 1, 100);
|
||||
|
||||
var startId = 0;
|
||||
var candidateCursor = cursor ?? context.Request.Headers["Last-Event-ID"].FirstOrDefault();
|
||||
if (!string.IsNullOrWhiteSpace(candidateCursor) && !int.TryParse(candidateCursor, NumberStyles.Integer, CultureInfo.InvariantCulture, out startId))
|
||||
{
|
||||
return Results.BadRequest(new { error = "cursor must be integer" });
|
||||
}
|
||||
|
||||
context.Response.Headers.CacheControl = "no-store";
|
||||
context.Response.Headers["X-Accel-Buffering"] = "no";
|
||||
context.Response.ContentType = "text/event-stream";
|
||||
await context.Response.WriteAsync("retry: 5000\n\n", cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var events = Enumerable.Range(startId, take)
|
||||
.Select(id => new ExcititorTimelineEvent(
|
||||
Type: "evidence.update",
|
||||
Tenant: tenant,
|
||||
Source: "vex-runtime",
|
||||
Count: 0,
|
||||
Errors: 0,
|
||||
TraceId: null,
|
||||
OccurredAt: now.ToString("O", CultureInfo.InvariantCulture)))
|
||||
.ToList();
|
||||
|
||||
foreach (var (evt, idx) in events.Select((e, i) => (e, i)))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var id = startId + idx;
|
||||
await context.Response.WriteAsync($"id: {id}\n", cancellationToken).ConfigureAwait(false);
|
||||
await context.Response.WriteAsync($"event: {evt.Type}\n", cancellationToken).ConfigureAwait(false);
|
||||
await context.Response.WriteAsync($"data: {JsonSerializer.Serialize(evt)}\n\n", cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
await context.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var nextCursor = startId + events.Count;
|
||||
context.Response.Headers["X-Next-Cursor"] = nextCursor.ToString(CultureInfo.InvariantCulture);
|
||||
logger.LogInformation("obs excititor timeline emitted {Count} events for tenant {Tenant} start {Start} next {Next}", events.Count, tenant, startId, nextCursor);
|
||||
|
||||
return Results.Empty;
|
||||
}).WithName("GetExcititorTimeline");
|
||||
|
||||
IngestEndpoints.MapIngestEndpoints(app);
|
||||
ResolveEndpoint.MapResolveEndpoint(app);
|
||||
MirrorEndpoints.MapMirrorEndpoints(app);
|
||||
|
||||
Reference in New Issue
Block a user