refactor: JobEngine cleanup + crypto compose refactor + sprint plans + timeline merge prep
- Remove zombie JobEngine WebService (no container runs it) - Remove dangling STELLAOPS_JOBENGINE_URL, replace with RELEASE_ORCHESTRATOR_URL - Update Timeline audit paths to release-orchestrator - Extract smremote to docker-compose.crypto-provider.smremote.yml - Rename crypto compose files for consistent naming - Add crypto provider health probe API (CP-001) + tenant preferences (CP-002) - Create sprint plans: crypto picker, VulnExplorer merge, scheduler plugins - Timeline merge prep: ingestion worker relocated to infrastructure lib Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -143,7 +143,7 @@ public sealed class HttpUnifiedAuditEventProvider : IUnifiedAuditEventProvider
|
||||
{
|
||||
var uri = BuildUri(
|
||||
options.JobEngineBaseUrl,
|
||||
"/api/v1/jobengine/audit",
|
||||
"/api/v1/release-orchestrator/audit",
|
||||
new Dictionary<string, string?> { ["limit"] = options.FetchLimitPerModule.ToString(CultureInfo.InvariantCulture) });
|
||||
|
||||
if (uri is null)
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Cryptography.Audit;
|
||||
using System.Linq;
|
||||
|
||||
namespace StellaOps.Timeline.WebService.Audit;
|
||||
|
||||
/// <summary>
|
||||
/// Logs authorization outcomes for timeline read/write operations (merged from timeline-indexer).
|
||||
/// </summary>
|
||||
public sealed class TimelineAuthorizationAuditSink(ILogger<TimelineAuthorizationAuditSink> logger) : IAuthEventSink
|
||||
{
|
||||
public ValueTask WriteAsync(AuthEventRecord record, CancellationToken cancellationToken)
|
||||
{
|
||||
logger.LogInformation(
|
||||
"Auth {Outcome} for {EventType} tenant={Tenant} scopes={Scopes} subject={Subject} correlation={Correlation}",
|
||||
record.Outcome,
|
||||
record.EventType,
|
||||
record.Tenant.Value ?? "<none>",
|
||||
record.Scopes.Any() ? string.Join(" ", record.Scopes) : "<none>",
|
||||
record.Subject?.SubjectId.Value ?? "<unknown>",
|
||||
record.CorrelationId ?? "<none>");
|
||||
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -352,8 +352,8 @@ public sealed record UnifiedAuditQuery
|
||||
public sealed record UnifiedAuditModuleEndpointsOptions
|
||||
{
|
||||
public string AuthorityBaseUrl { get; set; } = "http://authority.stella-ops.local";
|
||||
public string JobEngineBaseUrl { get; set; } = "http://jobengine.stella-ops.local";
|
||||
public string PolicyBaseUrl { get; set; } = "http://policy-gateway.stella-ops.local";
|
||||
public string JobEngineBaseUrl { get; set; } = "http://release-orchestrator.stella-ops.local";
|
||||
public string PolicyBaseUrl { get; set; } = "http://policy-engine.stella-ops.local";
|
||||
public string EvidenceLockerBaseUrl { get; set; } = "http://evidencelocker.stella-ops.local";
|
||||
public string NotifyBaseUrl { get; set; } = "http://notify.stella-ops.local";
|
||||
public int FetchLimitPerModule { get; set; } = 250;
|
||||
|
||||
@@ -20,7 +20,7 @@ public static class ExportEndpoints
|
||||
/// </summary>
|
||||
public static void MapExportEndpoints(this IEndpointRouteBuilder app)
|
||||
{
|
||||
var group = app.MapGroup("/api/v1/timeline")
|
||||
var group = app.MapGroup("/api/v1/timeline/hlc")
|
||||
.WithTags("Export")
|
||||
.RequireAuthorization(TimelinePolicies.Write)
|
||||
.RequireTenant();
|
||||
@@ -85,7 +85,7 @@ public static class ExportEndpoints
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return TypedResults.Accepted(
|
||||
$"/api/v1/timeline/export/{operation.ExportId}",
|
||||
$"/api/v1/timeline/hlc/export/{operation.ExportId}",
|
||||
new ExportInitiatedResponse
|
||||
{
|
||||
ExportId = operation.ExportId,
|
||||
|
||||
@@ -20,7 +20,7 @@ public static class ReplayEndpoints
|
||||
/// </summary>
|
||||
public static void MapReplayEndpoints(this IEndpointRouteBuilder app)
|
||||
{
|
||||
var group = app.MapGroup("/api/v1/timeline")
|
||||
var group = app.MapGroup("/api/v1/timeline/hlc")
|
||||
.WithTags("Replay")
|
||||
.RequireAuthorization(TimelinePolicies.Write)
|
||||
.RequireTenant();
|
||||
@@ -84,7 +84,7 @@ public static class ReplayEndpoints
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return TypedResults.Accepted(
|
||||
$"/api/v1/timeline/replay/{operation.ReplayId}",
|
||||
$"/api/v1/timeline/hlc/replay/{operation.ReplayId}",
|
||||
new ReplayInitiatedResponse
|
||||
{
|
||||
ReplayId = operation.ReplayId,
|
||||
|
||||
@@ -19,8 +19,8 @@ public static class TimelineEndpoints
|
||||
/// </summary>
|
||||
public static void MapTimelineEndpoints(this IEndpointRouteBuilder app)
|
||||
{
|
||||
var group = app.MapGroup("/api/v1/timeline")
|
||||
.WithTags("Timeline")
|
||||
var group = app.MapGroup("/api/v1/timeline/hlc")
|
||||
.WithTags("Timeline HLC")
|
||||
.RequireAuthorization(TimelinePolicies.Read)
|
||||
.RequireTenant();
|
||||
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using StellaOps.Auth.Abstractions;
|
||||
using StellaOps.Cryptography.Audit;
|
||||
using StellaOps.Localization;
|
||||
using StellaOps.Auth.ServerIntegration;
|
||||
using StellaOps.Auth.ServerIntegration.Tenancy;
|
||||
@@ -8,10 +10,20 @@ using StellaOps.Timeline.Core;
|
||||
using StellaOps.Timeline.WebService.Audit;
|
||||
using StellaOps.Timeline.WebService.Endpoints;
|
||||
using StellaOps.Timeline.WebService.Security;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.DependencyInjection;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Options;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
using static StellaOps.Localization.T;
|
||||
|
||||
// Aliases to disambiguate indexer types from Timeline.Core types (different interfaces/models, same simple name)
|
||||
using IndexerQueryService = StellaOps.TimelineIndexer.Core.Abstractions.ITimelineQueryService;
|
||||
using IndexerQueryOptions = StellaOps.TimelineIndexer.Core.Models.TimelineQueryOptions;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
// Add services
|
||||
// ── Timeline (HLC) services ─────────────────────────────────────────────────
|
||||
builder.Services.AddStellaOpsEventing(builder.Configuration);
|
||||
builder.Services.AddTimelineServices(builder.Configuration);
|
||||
builder.Services.AddSingleton(TimeProvider.System);
|
||||
@@ -23,11 +35,11 @@ builder.Services.Configure<UnifiedAuditModuleEndpointsOptions>(options =>
|
||||
?? options.AuthorityBaseUrl;
|
||||
|
||||
options.JobEngineBaseUrl = builder.Configuration["UnifiedAudit:Sources:JobEngine"]
|
||||
?? builder.Configuration["STELLAOPS_JOBENGINE_URL"]
|
||||
?? builder.Configuration["STELLAOPS_RELEASE_ORCHESTRATOR_URL"]
|
||||
?? options.JobEngineBaseUrl;
|
||||
|
||||
options.PolicyBaseUrl = builder.Configuration["UnifiedAudit:Sources:Policy"]
|
||||
?? builder.Configuration["STELLAOPS_POLICY_GATEWAY_URL"]
|
||||
?? builder.Configuration["STELLAOPS_POLICY_ENGINE_URL"]
|
||||
?? options.PolicyBaseUrl;
|
||||
|
||||
options.EvidenceLockerBaseUrl = builder.Configuration["UnifiedAudit:Sources:EvidenceLocker"]
|
||||
@@ -61,6 +73,21 @@ builder.Services.AddSingleton<IngestAuditEventStore>();
|
||||
builder.Services.AddSingleton<IUnifiedAuditEventProvider, CompositeUnifiedAuditEventProvider>();
|
||||
builder.Services.AddSingleton<IUnifiedAuditAggregationService, UnifiedAuditAggregationService>();
|
||||
|
||||
// ── Timeline Indexer services (merged from timeline-indexer-web) ─────────────
|
||||
builder.Configuration.AddEnvironmentVariables(prefix: "TIMELINE_");
|
||||
builder.Services.AddTimelineIndexerPostgres(builder.Configuration);
|
||||
builder.Services.AddSingleton<IAuthEventSink, TimelineAuthorizationAuditSink>();
|
||||
|
||||
// ── Timeline Indexer ingestion worker (merged from timeline-indexer-worker) ──
|
||||
builder.Services.AddOptions<TimelineIngestionOptions>()
|
||||
.Bind(builder.Configuration.GetSection("Ingestion"));
|
||||
builder.Services.AddSingleton<TimelineEnvelopeParser>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, NatsTimelineEventSubscriber>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, RedisTimelineEventSubscriber>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, NullTimelineEventSubscriber>();
|
||||
builder.Services.AddHostedService<TimelineIngestionWorker>();
|
||||
|
||||
// ── Shared infrastructure ───────────────────────────────────────────────────
|
||||
builder.Services.AddEndpointsApiExplorer();
|
||||
builder.Services.AddSwaggerGen(options =>
|
||||
{
|
||||
@@ -68,7 +95,7 @@ builder.Services.AddSwaggerGen(options =>
|
||||
{
|
||||
Title = "StellaOps Timeline API",
|
||||
Version = "v1",
|
||||
Description = "Unified event timeline API for querying, replaying, and exporting HLC-ordered events"
|
||||
Description = "Unified event timeline API for querying, replaying, and exporting HLC-ordered events plus timeline indexer query/evidence endpoints"
|
||||
});
|
||||
});
|
||||
|
||||
@@ -82,6 +109,7 @@ builder.Services.AddAuthorization(options =>
|
||||
{
|
||||
options.AddStellaOpsScopePolicy(TimelinePolicies.Read, StellaOpsScopes.TimelineRead);
|
||||
options.AddStellaOpsScopePolicy(TimelinePolicies.Write, StellaOpsScopes.TimelineWrite);
|
||||
options.AddObservabilityResourcePolicies();
|
||||
});
|
||||
|
||||
builder.Services.AddStellaOpsCors(builder.Environment, builder.Configuration);
|
||||
@@ -118,15 +146,128 @@ app.TryUseStellaRouter(routerEnabled);
|
||||
// Map endpoints
|
||||
await app.LoadTranslationsAsync();
|
||||
|
||||
// Timeline HLC endpoints (original timeline-web)
|
||||
app.MapTimelineEndpoints();
|
||||
app.MapReplayEndpoints();
|
||||
app.MapExportEndpoints();
|
||||
app.MapUnifiedAuditEndpoints();
|
||||
app.MapHealthEndpoints();
|
||||
|
||||
// Timeline Indexer endpoints (merged from timeline-indexer-web)
|
||||
// Mount under /api/v1 prefix (matching gateway route) and bare prefix for direct access
|
||||
MapTimelineIndexerEndpoints(app.MapGroup("/api/v1").RequireTenant(), routeNamePrefix: "timeline_api_v1");
|
||||
MapTimelineIndexerEndpoints(app.MapGroup(string.Empty).RequireTenant(), routeNamePrefix: "timeline");
|
||||
|
||||
app.TryRefreshStellaRouterEndpoints(routerEnabled);
|
||||
await app.RunAsync().ConfigureAwait(false);
|
||||
|
||||
// ── Indexer endpoint definitions (ported from timeline-indexer-web Program.cs) ──
|
||||
|
||||
static string GetTenantId(HttpContext ctx)
|
||||
{
|
||||
if (ctx.Request.Headers.TryGetValue("X-Tenant", out var header) && !string.IsNullOrWhiteSpace(header))
|
||||
{
|
||||
return header!;
|
||||
}
|
||||
|
||||
var tenant = ctx.User.FindFirst("tenant")?.Value;
|
||||
if (!string.IsNullOrWhiteSpace(tenant))
|
||||
{
|
||||
return tenant!;
|
||||
}
|
||||
|
||||
throw new InvalidOperationException("Tenant not provided");
|
||||
}
|
||||
|
||||
static void MapTimelineIndexerEndpoints(RouteGroupBuilder routes, string routeNamePrefix)
|
||||
{
|
||||
routes.MapGet("/timeline", async (
|
||||
HttpContext ctx,
|
||||
IndexerQueryService service,
|
||||
[FromQuery] string? eventType,
|
||||
[FromQuery] string? source,
|
||||
[FromQuery] string? correlationId,
|
||||
[FromQuery] string? traceId,
|
||||
[FromQuery] string? severity,
|
||||
[FromQuery] DateTimeOffset? since,
|
||||
[FromQuery] long? after,
|
||||
[FromQuery] int? limit,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
var tenantId = GetTenantId(ctx);
|
||||
var options = new IndexerQueryOptions
|
||||
{
|
||||
EventType = eventType,
|
||||
Source = source,
|
||||
CorrelationId = correlationId,
|
||||
TraceId = traceId,
|
||||
Severity = severity,
|
||||
Since = since,
|
||||
AfterEventSeq = after,
|
||||
Limit = limit ?? 100
|
||||
};
|
||||
var items = await service.QueryAsync(tenantId, options, cancellationToken).ConfigureAwait(false);
|
||||
return Results.Ok(items);
|
||||
})
|
||||
.WithName($"{routeNamePrefix}_query")
|
||||
.WithSummary("List timeline events")
|
||||
.WithDescription(_t("timelineindexer.timeline.query_description"))
|
||||
.WithTags("timeline-indexer")
|
||||
.Produces<IReadOnlyList<TimelineEventView>>(StatusCodes.Status200OK)
|
||||
.Produces(StatusCodes.Status401Unauthorized)
|
||||
.RequireAuthorization(StellaOpsResourceServerPolicies.TimelineRead);
|
||||
|
||||
routes.MapGet("/timeline/{eventId}", async (
|
||||
HttpContext ctx,
|
||||
IndexerQueryService service,
|
||||
string eventId,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
var tenantId = GetTenantId(ctx);
|
||||
var item = await service.GetAsync(tenantId, eventId, cancellationToken).ConfigureAwait(false);
|
||||
return item is null ? Results.NotFound() : Results.Ok(item);
|
||||
})
|
||||
.WithName($"{routeNamePrefix}_get_by_id")
|
||||
.WithSummary("Get timeline event")
|
||||
.WithDescription(_t("timelineindexer.timeline.get_by_id_description"))
|
||||
.WithTags("timeline-indexer")
|
||||
.Produces<TimelineEventView>(StatusCodes.Status200OK)
|
||||
.Produces(StatusCodes.Status404NotFound)
|
||||
.Produces(StatusCodes.Status401Unauthorized)
|
||||
.RequireAuthorization(StellaOpsResourceServerPolicies.TimelineRead);
|
||||
|
||||
routes.MapGet("/timeline/{eventId}/evidence", async (
|
||||
HttpContext ctx,
|
||||
IndexerQueryService service,
|
||||
string eventId,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
var tenantId = GetTenantId(ctx);
|
||||
var evidence = await service.GetEvidenceAsync(tenantId, eventId, cancellationToken).ConfigureAwait(false);
|
||||
return evidence is null ? Results.NotFound() : Results.Ok(evidence);
|
||||
})
|
||||
.WithName($"{routeNamePrefix}_get_evidence")
|
||||
.WithSummary("Get event evidence")
|
||||
.WithDescription(_t("timelineindexer.timeline.get_evidence_description"))
|
||||
.WithTags("timeline-indexer")
|
||||
.Produces<TimelineEvidenceView>(StatusCodes.Status200OK)
|
||||
.Produces(StatusCodes.Status404NotFound)
|
||||
.Produces(StatusCodes.Status401Unauthorized)
|
||||
.RequireAuthorization(StellaOpsResourceServerPolicies.TimelineRead);
|
||||
|
||||
routes.MapPost("/timeline/events", () =>
|
||||
Results.Accepted("/timeline/events", new TimelineIngestAcceptedResponse("indexed")))
|
||||
.WithName($"{routeNamePrefix}_ingest_event")
|
||||
.WithSummary("Ingest timeline event")
|
||||
.WithDescription(_t("timelineindexer.timeline.ingest_description"))
|
||||
.WithTags("timeline-indexer")
|
||||
.Produces<TimelineIngestAcceptedResponse>(StatusCodes.Status202Accepted)
|
||||
.Produces(StatusCodes.Status401Unauthorized)
|
||||
.RequireAuthorization(StellaOpsResourceServerPolicies.TimelineWrite);
|
||||
}
|
||||
|
||||
public sealed record TimelineIngestAcceptedResponse(string Status);
|
||||
|
||||
namespace StellaOps.Timeline.WebService
|
||||
{
|
||||
public partial class Program { }
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\__Libraries\StellaOps.Timeline.Core\StellaOps.Timeline.Core.csproj" />
|
||||
<ProjectReference Include="..\__Libraries\StellaOps.TimelineIndexer.Core\StellaOps.TimelineIndexer.Core.csproj" />
|
||||
<ProjectReference Include="..\__Libraries\StellaOps.TimelineIndexer.Infrastructure\StellaOps.TimelineIndexer.Infrastructure.csproj" />
|
||||
<ProjectReference Include="..\..\__Libraries\StellaOps.Eventing\StellaOps.Eventing.csproj" />
|
||||
<ProjectReference Include="..\..\__Libraries\StellaOps.HybridLogicalClock\StellaOps.HybridLogicalClock.csproj" />
|
||||
<ProjectReference Include="..\..\Router\__Libraries\StellaOps.Microservice\StellaOps.Microservice.csproj" />
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"_meta": { "locale": "en-US", "namespace": "timelineindexer", "version": "1.0" },
|
||||
|
||||
"timelineindexer.timeline.query_description": "Returns timeline events filtered by tenant and optional query parameters.",
|
||||
"timelineindexer.timeline.get_by_id_description": "Returns a single timeline event by event identifier for the current tenant.",
|
||||
"timelineindexer.timeline.get_evidence_description": "Returns evidence linkage for a timeline event, including bundle and attestation references.",
|
||||
"timelineindexer.timeline.ingest_description": "Queues an event ingestion request for asynchronous timeline indexing."
|
||||
}
|
||||
@@ -3,9 +3,12 @@ using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.DependencyInjection;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Options;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
using StellaOps.TimelineIndexer.Worker;
|
||||
using StellaOps.Worker.Health;
|
||||
|
||||
// NOTE: This worker service is dormant -- its logic has been merged into
|
||||
// StellaOps.Timeline.WebService (timeline-web). This Program.cs is kept for
|
||||
// reference and standalone testing but is no longer deployed as a container.
|
||||
|
||||
var builder = WebApplication.CreateSlimBuilder(args);
|
||||
|
||||
builder.Configuration.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
|
||||
@@ -21,7 +24,8 @@ builder.Services.AddSingleton<TimelineEnvelopeParser>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, NatsTimelineEventSubscriber>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, RedisTimelineEventSubscriber>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, NullTimelineEventSubscriber>();
|
||||
builder.Services.AddHostedService<TimelineIngestionWorker>();
|
||||
// Use fully qualified name to avoid ambiguity with Infrastructure.Subscriptions.TimelineIngestionWorker
|
||||
builder.Services.AddHostedService<StellaOps.TimelineIndexer.Infrastructure.Subscriptions.TimelineIngestionWorker>();
|
||||
builder.Services.AddWorkerHealthChecks();
|
||||
|
||||
var app = builder.Build();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
using System.Collections.Concurrent;
|
||||
@@ -8,11 +9,13 @@ using System.Linq;
|
||||
namespace StellaOps.TimelineIndexer.Worker;
|
||||
|
||||
/// <summary>
|
||||
/// Background consumer that reads timeline events from configured subscribers and persists them via the ingestion service.
|
||||
/// NOTE: This worker class is dormant -- the authoritative copy now lives in
|
||||
/// StellaOps.TimelineIndexer.Infrastructure.Subscriptions.TimelineIngestionWorker.
|
||||
/// Kept here for reference only.
|
||||
/// </summary>
|
||||
public sealed class TimelineIngestionWorker(
|
||||
IEnumerable<ITimelineEventSubscriber> subscribers,
|
||||
ITimelineIngestionService ingestionService,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<TimelineIngestionWorker> logger,
|
||||
TimeProvider? timeProvider = null) : BackgroundService
|
||||
{
|
||||
@@ -23,7 +26,7 @@ public sealed class TimelineIngestionWorker(
|
||||
private static readonly Histogram<double> LagHistogram = Meter.CreateHistogram<double>("timeline.ingest.lag.seconds");
|
||||
|
||||
private readonly IEnumerable<ITimelineEventSubscriber> _subscribers = subscribers;
|
||||
private readonly ITimelineIngestionService _ingestion = ingestionService;
|
||||
private readonly IServiceScopeFactory _scopeFactory = scopeFactory;
|
||||
private readonly ILogger<TimelineIngestionWorker> _logger = logger;
|
||||
private readonly ConcurrentDictionary<(string tenant, string eventId), byte> _sessionSeen = new();
|
||||
private readonly TimeProvider _timeProvider = timeProvider ?? TimeProvider.System;
|
||||
@@ -48,7 +51,10 @@ public sealed class TimelineIngestionWorker(
|
||||
|
||||
try
|
||||
{
|
||||
var result = await _ingestion.IngestAsync(envelope, cancellationToken).ConfigureAwait(false);
|
||||
await using var scope = _scopeFactory.CreateAsyncScope();
|
||||
var ingestion = scope.ServiceProvider.GetRequiredService<ITimelineIngestionService>();
|
||||
|
||||
var result = await ingestion.IngestAsync(envelope, cancellationToken).ConfigureAwait(false);
|
||||
if (result.Inserted)
|
||||
{
|
||||
IngestedCounter.Add(1);
|
||||
@@ -63,7 +69,6 @@ public sealed class TimelineIngestionWorker(
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// Respect shutdown.
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" PrivateAssets="all" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||
<PackageReference Include="NATS.Client.Core" />
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Linq;
|
||||
|
||||
namespace StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
|
||||
/// <summary>
|
||||
/// Background consumer that reads timeline events from configured subscribers and persists them via the ingestion service.
|
||||
/// Moved from StellaOps.TimelineIndexer.Worker to enable hosting inside the unified timeline-web service.
|
||||
/// Uses IServiceScopeFactory to resolve scoped ITimelineIngestionService per ingestion call.
|
||||
/// </summary>
|
||||
public sealed class TimelineIngestionWorker(
|
||||
IEnumerable<ITimelineEventSubscriber> subscribers,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<TimelineIngestionWorker> logger,
|
||||
TimeProvider? timeProvider = null) : BackgroundService
|
||||
{
|
||||
private static readonly Meter Meter = new("StellaOps.TimelineIndexer", "1.0.0");
|
||||
private static readonly Counter<long> IngestedCounter = Meter.CreateCounter<long>("timeline.ingested");
|
||||
private static readonly Counter<long> DuplicateCounter = Meter.CreateCounter<long>("timeline.duplicates");
|
||||
private static readonly Counter<long> FailedCounter = Meter.CreateCounter<long>("timeline.failed");
|
||||
private static readonly Histogram<double> LagHistogram = Meter.CreateHistogram<double>("timeline.ingest.lag.seconds");
|
||||
|
||||
private readonly IEnumerable<ITimelineEventSubscriber> _subscribers = subscribers;
|
||||
private readonly IServiceScopeFactory _scopeFactory = scopeFactory;
|
||||
private readonly ILogger<TimelineIngestionWorker> _logger = logger;
|
||||
private readonly ConcurrentDictionary<(string tenant, string eventId), byte> _sessionSeen = new();
|
||||
private readonly TimeProvider _timeProvider = timeProvider ?? TimeProvider.System;
|
||||
|
||||
protected override Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var tasks = _subscribers.Select(subscriber => ConsumeAsync(subscriber, stoppingToken)).ToArray();
|
||||
return Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
private async Task ConsumeAsync(ITimelineEventSubscriber subscriber, CancellationToken cancellationToken)
|
||||
{
|
||||
await foreach (var envelope in subscriber.SubscribeAsync(cancellationToken))
|
||||
{
|
||||
var key = (envelope.TenantId, envelope.EventId);
|
||||
if (!_sessionSeen.TryAdd(key, 0))
|
||||
{
|
||||
DuplicateCounter.Add(1);
|
||||
_logger.LogDebug("Skipped duplicate timeline event {EventId} for tenant {Tenant}", envelope.EventId, envelope.TenantId);
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Create a scope to resolve the scoped ITimelineIngestionService
|
||||
await using var scope = _scopeFactory.CreateAsyncScope();
|
||||
var ingestion = scope.ServiceProvider.GetRequiredService<ITimelineIngestionService>();
|
||||
|
||||
var result = await ingestion.IngestAsync(envelope, cancellationToken).ConfigureAwait(false);
|
||||
if (result.Inserted)
|
||||
{
|
||||
IngestedCounter.Add(1);
|
||||
LagHistogram.Record((_timeProvider.GetUtcNow() - envelope.OccurredAt).TotalSeconds);
|
||||
_logger.LogInformation("Ingested timeline event {EventId} from {Source} (tenant {Tenant})", envelope.EventId, envelope.Source, envelope.TenantId);
|
||||
}
|
||||
else
|
||||
{
|
||||
DuplicateCounter.Add(1);
|
||||
_logger.LogDebug("Store reported duplicate for event {EventId} tenant {Tenant}", envelope.EventId, envelope.TenantId);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// Respect shutdown.
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
FailedCounter.Add(1);
|
||||
_logger.LogError(ex, "Failed to ingest timeline event {EventId} for tenant {Tenant}", envelope.EventId, envelope.TenantId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -45,7 +45,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
await SeedEventsAsync(correlationId, 5);
|
||||
|
||||
// Act
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/{correlationId}");
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/hlc/{correlationId}");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.OK);
|
||||
@@ -60,7 +60,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
public async Task GetTimeline_Returns404_WhenCorrelationNotFound()
|
||||
{
|
||||
// Act
|
||||
var response = await _client.GetAsync("/api/v1/timeline/nonexistent-correlation");
|
||||
var response = await _client.GetAsync("/api/v1/timeline/hlc/nonexistent-correlation");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.NotFound);
|
||||
@@ -74,7 +74,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
await SeedEventsAsync(correlationId, 10);
|
||||
|
||||
// Act
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/{correlationId}");
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/hlc/{correlationId}");
|
||||
var timeline = await response.Content.ReadFromJsonAsync<TimelineResponse>();
|
||||
|
||||
// Assert
|
||||
@@ -90,7 +90,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
await SeedEventsAsync(correlationId, 20);
|
||||
|
||||
// Act - Get first page
|
||||
var response1 = await _client.GetAsync($"/api/v1/timeline/{correlationId}?limit=10");
|
||||
var response1 = await _client.GetAsync($"/api/v1/timeline/hlc/{correlationId}?limit=10");
|
||||
var page1 = await response1.Content.ReadFromJsonAsync<TimelineResponse>();
|
||||
|
||||
// Assert
|
||||
@@ -108,7 +108,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
await SeedEventsWithServicesAsync(correlationId);
|
||||
|
||||
// Act
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/{correlationId}?services=Scheduler");
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/hlc/{correlationId}?services=Scheduler");
|
||||
var timeline = await response.Content.ReadFromJsonAsync<TimelineResponse>();
|
||||
|
||||
// Assert
|
||||
@@ -124,7 +124,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
await SeedEventsAsync(correlationId, 5);
|
||||
|
||||
// Act
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/{correlationId}/critical-path");
|
||||
var response = await _client.GetAsync($"/api/v1/timeline/hlc/{correlationId}/critical-path");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.OK);
|
||||
@@ -150,7 +150,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
public async Task GetTimeline_ReturnsBadRequest_WhenFromHlcIsInvalid()
|
||||
{
|
||||
// Act
|
||||
var response = await _client.GetAsync("/api/v1/timeline/nonexistent-correlation?fromHlc=invalid-hlc&toHlc=1700000000000-node-000001");
|
||||
var response = await _client.GetAsync("/api/v1/timeline/hlc/nonexistent-correlation?fromHlc=invalid-hlc&toHlc=1700000000000-node-000001");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.BadRequest);
|
||||
@@ -168,7 +168,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
|
||||
// Act
|
||||
var initiateResponse = await _client.PostAsJsonAsync(
|
||||
$"/api/v1/timeline/{correlationId}/replay",
|
||||
$"/api/v1/timeline/hlc/{correlationId}/replay",
|
||||
new ReplayRequest { Mode = "dry-run" });
|
||||
|
||||
// Assert initiate
|
||||
@@ -178,7 +178,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
initiated!.ReplayId.Should().NotBeNullOrWhiteSpace();
|
||||
|
||||
// Assert status endpoint is queryable from a separate request scope
|
||||
var statusResponse = await _client.GetAsync($"/api/v1/timeline/replay/{initiated.ReplayId}");
|
||||
var statusResponse = await _client.GetAsync($"/api/v1/timeline/hlc/replay/{initiated.ReplayId}");
|
||||
statusResponse.StatusCode.Should().Be(HttpStatusCode.OK);
|
||||
var statusPayload = await statusResponse.Content.ReadFromJsonAsync<ReplayStatusResponse>();
|
||||
statusPayload.Should().NotBeNull();
|
||||
@@ -190,7 +190,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
public async Task ExportStatus_ReturnsNotFound_ForUnknownExportId()
|
||||
{
|
||||
// Act
|
||||
var response = await _client.GetAsync("/api/v1/timeline/export/unknown-export-id");
|
||||
var response = await _client.GetAsync("/api/v1/timeline/hlc/export/unknown-export-id");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.NotFound);
|
||||
@@ -201,7 +201,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
public async Task ExportDownload_ReturnsNotFound_ForUnknownExportId()
|
||||
{
|
||||
// Act
|
||||
var response = await _client.GetAsync("/api/v1/timeline/export/unknown-export-id/download");
|
||||
var response = await _client.GetAsync("/api/v1/timeline/hlc/export/unknown-export-id/download");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.NotFound);
|
||||
@@ -217,7 +217,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
|
||||
// Act
|
||||
var initiateResponse = await _client.PostAsJsonAsync(
|
||||
$"/api/v1/timeline/{correlationId}/export",
|
||||
$"/api/v1/timeline/hlc/{correlationId}/export",
|
||||
new ExportRequest { Format = "ndjson", SignBundle = false });
|
||||
|
||||
// Assert initiate
|
||||
@@ -229,7 +229,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
ExportStatusResponse? status = null;
|
||||
for (var attempt = 0; attempt < 20; attempt++)
|
||||
{
|
||||
var statusResponse = await _client.GetAsync($"/api/v1/timeline/export/{initiated.ExportId}");
|
||||
var statusResponse = await _client.GetAsync($"/api/v1/timeline/hlc/export/{initiated.ExportId}");
|
||||
if (statusResponse.StatusCode == HttpStatusCode.OK)
|
||||
{
|
||||
status = await statusResponse.Content.ReadFromJsonAsync<ExportStatusResponse>();
|
||||
@@ -245,7 +245,7 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
status.Should().NotBeNull();
|
||||
status!.Status.Should().Be("COMPLETED");
|
||||
|
||||
var downloadResponse = await _client.GetAsync($"/api/v1/timeline/export/{initiated.ExportId}/download");
|
||||
var downloadResponse = await _client.GetAsync($"/api/v1/timeline/hlc/export/{initiated.ExportId}/download");
|
||||
downloadResponse.StatusCode.Should().Be(HttpStatusCode.OK);
|
||||
var bundleContent = await downloadResponse.Content.ReadAsStringAsync();
|
||||
bundleContent.Should().Contain(correlationId);
|
||||
@@ -401,6 +401,8 @@ public sealed class TimelineWebApplicationFactory : WebApplicationFactory<Stella
|
||||
config.AddInMemoryCollection(new Dictionary<string, string?>
|
||||
{
|
||||
["Authority:ResourceServer:Authority"] = "http://localhost",
|
||||
// Required by TimelineIndexer Postgres registration (merged from timeline-indexer)
|
||||
["Postgres:Timeline:ConnectionString"] = "Host=localhost;Port=5432;Database=timeline_test;Username=test;Password=test",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -416,6 +418,28 @@ public sealed class TimelineWebApplicationFactory : WebApplicationFactory<Stella
|
||||
services.RemoveAll<IConfigureOptions<JwtBearerOptions>>();
|
||||
services.RemoveAll<IPostConfigureOptions<JwtBearerOptions>>();
|
||||
|
||||
// Remove indexer Postgres services that require a real DB connection.
|
||||
// The migration hosted service, data source, and migration runner are
|
||||
// registered by AddTimelineIndexerPostgres and will fail without Postgres.
|
||||
var indexerHostedServices = services
|
||||
.Where(d => d.ServiceType == typeof(Microsoft.Extensions.Hosting.IHostedService)
|
||||
&& d.ImplementationType?.FullName?.Contains("TimelineIndexerMigration") == true)
|
||||
.ToList();
|
||||
foreach (var descriptor in indexerHostedServices)
|
||||
{
|
||||
services.Remove(descriptor);
|
||||
}
|
||||
|
||||
// Also remove the ingestion worker (no transport in tests)
|
||||
var ingestionWorkers = services
|
||||
.Where(d => d.ServiceType == typeof(Microsoft.Extensions.Hosting.IHostedService)
|
||||
&& d.ImplementationType?.FullName?.Contains("TimelineIngestionWorker") == true)
|
||||
.ToList();
|
||||
foreach (var descriptor in ingestionWorkers)
|
||||
{
|
||||
services.Remove(descriptor);
|
||||
}
|
||||
|
||||
services.AddSingleton<ITimelineEventStore, InMemoryTimelineEventStore>();
|
||||
services.AddSingleton<ITimelineEventEmitter, NoOpTimelineEventEmitter>();
|
||||
services.AddSingleton<IHybridLogicalClock>(_ =>
|
||||
|
||||
@@ -25,6 +25,8 @@ public sealed class TimelineStartupRegistrationTests
|
||||
["Eventing:UseInMemoryStore"] = "false",
|
||||
["Eventing:ConnectionString"] = "Host=localhost;Port=5432;Database=timeline;Username=postgres;Password=postgres",
|
||||
["Authority:ResourceServer:Authority"] = "http://localhost",
|
||||
// Required by TimelineIndexer Postgres registration (merged from timeline-indexer)
|
||||
["Postgres:Timeline:ConnectionString"] = "Host=localhost;Port=5432;Database=timeline;Username=postgres;Password=postgres",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user