save checkpoint
This commit is contained in:
@@ -2,6 +2,8 @@
|
||||
|
||||
using Microsoft.AspNetCore.Http.HttpResults;
|
||||
using StellaOps.Timeline.Core;
|
||||
using StellaOps.Timeline.Core.Export;
|
||||
using StellaOps.HybridLogicalClock;
|
||||
|
||||
namespace StellaOps.Timeline.WebService.Endpoints;
|
||||
|
||||
@@ -35,6 +37,7 @@ public static class ExportEndpoints
|
||||
string correlationId,
|
||||
ExportRequest request,
|
||||
ITimelineQueryService queryService,
|
||||
ITimelineBundleBuilder bundleBuilder,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(correlationId))
|
||||
@@ -42,6 +45,21 @@ public static class ExportEndpoints
|
||||
return TypedResults.BadRequest("Correlation ID is required");
|
||||
}
|
||||
|
||||
if (!IsSupportedFormat(request.Format))
|
||||
{
|
||||
return TypedResults.BadRequest("Format must be either 'ndjson' or 'json'.");
|
||||
}
|
||||
|
||||
if (!TryParseHlc(request.FromHlc, "fromHlc", out var fromHlc, out var fromParseError))
|
||||
{
|
||||
return TypedResults.BadRequest(fromParseError);
|
||||
}
|
||||
|
||||
if (!TryParseHlc(request.ToHlc, "toHlc", out var toHlc, out var toParseError))
|
||||
{
|
||||
return TypedResults.BadRequest(toParseError);
|
||||
}
|
||||
|
||||
// Validate the correlation exists
|
||||
var result = await queryService.GetByCorrelationIdAsync(correlationId, new TimelineQueryOptions { Limit = 1 }, cancellationToken);
|
||||
if (result.Events.Count == 0)
|
||||
@@ -49,60 +67,108 @@ public static class ExportEndpoints
|
||||
return TypedResults.BadRequest($"No events found for correlation ID: {correlationId}");
|
||||
}
|
||||
|
||||
// TODO: Queue export job
|
||||
var exportId = Guid.NewGuid().ToString("N")[..16];
|
||||
var operation = await bundleBuilder.InitiateExportAsync(
|
||||
correlationId,
|
||||
new Core.Export.ExportRequest
|
||||
{
|
||||
Format = request.Format.ToLowerInvariant(),
|
||||
SignBundle = request.SignBundle,
|
||||
FromHlc = fromHlc,
|
||||
ToHlc = toHlc,
|
||||
IncludePayloads = true
|
||||
},
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return TypedResults.Accepted(
|
||||
$"/api/v1/timeline/export/{exportId}",
|
||||
$"/api/v1/timeline/export/{operation.ExportId}",
|
||||
new ExportInitiatedResponse
|
||||
{
|
||||
ExportId = exportId,
|
||||
CorrelationId = correlationId,
|
||||
Format = request.Format,
|
||||
SignBundle = request.SignBundle,
|
||||
Status = "INITIATED",
|
||||
ExportId = operation.ExportId,
|
||||
CorrelationId = operation.CorrelationId,
|
||||
Format = operation.Format,
|
||||
SignBundle = operation.SignBundle,
|
||||
Status = MapStatus(operation.Status),
|
||||
EstimatedEventCount = result.TotalCount
|
||||
});
|
||||
}
|
||||
|
||||
private static async Task<Results<Ok<ExportStatusResponse>, NotFound>> GetExportStatusAsync(
|
||||
string exportId,
|
||||
ITimelineBundleBuilder bundleBuilder,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
// TODO: Integrate with export state store
|
||||
await Task.CompletedTask;
|
||||
var operation = await bundleBuilder.GetExportStatusAsync(exportId, cancellationToken).ConfigureAwait(false);
|
||||
if (operation is null)
|
||||
{
|
||||
return TypedResults.NotFound();
|
||||
}
|
||||
|
||||
return TypedResults.Ok(new ExportStatusResponse
|
||||
{
|
||||
ExportId = exportId,
|
||||
Status = "COMPLETED",
|
||||
Format = "ndjson",
|
||||
EventCount = 100,
|
||||
FileSizeBytes = 45678,
|
||||
CreatedAt = DateTimeOffset.UtcNow.AddMinutes(-1),
|
||||
CompletedAt = DateTimeOffset.UtcNow.AddSeconds(-30)
|
||||
ExportId = operation.ExportId,
|
||||
Status = MapStatus(operation.Status),
|
||||
Format = operation.Format,
|
||||
EventCount = operation.EventCount,
|
||||
FileSizeBytes = operation.FileSizeBytes,
|
||||
CreatedAt = operation.CreatedAt,
|
||||
CompletedAt = operation.CompletedAt,
|
||||
Error = operation.Error
|
||||
});
|
||||
}
|
||||
|
||||
private static async Task<Results<FileStreamHttpResult, NotFound>> DownloadExportAsync(
|
||||
string exportId,
|
||||
ITimelineBundleBuilder bundleBuilder,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
// TODO: Integrate with export storage
|
||||
await Task.CompletedTask;
|
||||
|
||||
// Return stub for now - real implementation would stream from storage
|
||||
var stubContent = """
|
||||
{"event_id":"abc123","correlation_id":"scan-1","kind":"ENQUEUE"}
|
||||
{"event_id":"def456","correlation_id":"scan-1","kind":"EXECUTE"}
|
||||
""";
|
||||
var stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(stubContent));
|
||||
var bundle = await bundleBuilder.GetExportBundleAsync(exportId, cancellationToken).ConfigureAwait(false);
|
||||
if (bundle is null)
|
||||
{
|
||||
return TypedResults.NotFound();
|
||||
}
|
||||
|
||||
return TypedResults.File(
|
||||
stream,
|
||||
contentType: "application/x-ndjson",
|
||||
fileDownloadName: $"timeline-{exportId}.ndjson");
|
||||
bundle.Content,
|
||||
contentType: bundle.ContentType,
|
||||
fileDownloadName: bundle.FileName);
|
||||
}
|
||||
|
||||
private static bool IsSupportedFormat(string format) =>
|
||||
string.Equals(format, "ndjson", StringComparison.OrdinalIgnoreCase) ||
|
||||
string.Equals(format, "json", StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
private static bool TryParseHlc(
|
||||
string? rawValue,
|
||||
string parameterName,
|
||||
out HlcTimestamp? parsedValue,
|
||||
out string error)
|
||||
{
|
||||
parsedValue = null;
|
||||
error = string.Empty;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(rawValue))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (HlcTimestamp.TryParse(rawValue, out var hlc))
|
||||
{
|
||||
parsedValue = hlc;
|
||||
return true;
|
||||
}
|
||||
|
||||
error = $"Invalid {parameterName} value '{rawValue}'. Expected format '{{physicalTime13}}-{{nodeId}}-{{counter6}}'.";
|
||||
return false;
|
||||
}
|
||||
|
||||
private static string MapStatus(ExportStatus status) => status switch
|
||||
{
|
||||
ExportStatus.Initiated => "INITIATED",
|
||||
ExportStatus.InProgress => "IN_PROGRESS",
|
||||
ExportStatus.Completed => "COMPLETED",
|
||||
ExportStatus.Failed => "FAILED",
|
||||
_ => "UNKNOWN"
|
||||
};
|
||||
}
|
||||
|
||||
// DTOs
|
||||
|
||||
@@ -49,12 +49,27 @@ public static class ReplayEndpoints
|
||||
return TypedResults.BadRequest("Correlation ID is required");
|
||||
}
|
||||
|
||||
if (!IsSupportedMode(request.Mode))
|
||||
{
|
||||
return TypedResults.BadRequest("Mode must be either 'dry-run' or 'verify'.");
|
||||
}
|
||||
|
||||
if (!TryParseHlc(request.FromHlc, "fromHlc", out var fromHlc, out var fromParseError))
|
||||
{
|
||||
return TypedResults.BadRequest(fromParseError);
|
||||
}
|
||||
|
||||
if (!TryParseHlc(request.ToHlc, "toHlc", out var toHlc, out var toParseError))
|
||||
{
|
||||
return TypedResults.BadRequest(toParseError);
|
||||
}
|
||||
|
||||
// Convert API request to domain request
|
||||
var domainRequest = new Core.Replay.ReplayRequest
|
||||
{
|
||||
Mode = request.Mode,
|
||||
FromHlc = ParseHlc(request.FromHlc),
|
||||
ToHlc = ParseHlc(request.ToHlc)
|
||||
Mode = request.Mode.ToLowerInvariant(),
|
||||
FromHlc = fromHlc,
|
||||
ToHlc = toHlc
|
||||
};
|
||||
|
||||
// Initiate replay via orchestrator
|
||||
@@ -129,14 +144,32 @@ public static class ReplayEndpoints
|
||||
return deleted ? TypedResults.Ok() : TypedResults.NotFound();
|
||||
}
|
||||
|
||||
private static HlcTimestamp? ParseHlc(string? hlcString)
|
||||
private static bool IsSupportedMode(string mode) =>
|
||||
string.Equals(mode, "dry-run", StringComparison.OrdinalIgnoreCase) ||
|
||||
string.Equals(mode, "verify", StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
private static bool TryParseHlc(
|
||||
string? hlcString,
|
||||
string parameterName,
|
||||
out HlcTimestamp? parsedValue,
|
||||
out string error)
|
||||
{
|
||||
parsedValue = null;
|
||||
error = string.Empty;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(hlcString))
|
||||
{
|
||||
return null;
|
||||
return true;
|
||||
}
|
||||
|
||||
return HlcTimestamp.TryParse(hlcString, out var hlc) ? hlc : null;
|
||||
if (HlcTimestamp.TryParse(hlcString, out var hlc))
|
||||
{
|
||||
parsedValue = hlc;
|
||||
return true;
|
||||
}
|
||||
|
||||
error = $"Invalid {parameterName} value '{hlcString}'. Expected format '{{physicalTime13}}-{{nodeId}}-{{counter6}}'.";
|
||||
return false;
|
||||
}
|
||||
|
||||
private static string MapStatus(ReplayStatus status) => status switch
|
||||
|
||||
@@ -28,7 +28,7 @@ public static class TimelineEndpoints
|
||||
.WithDescription("Get the critical path (longest latency stages) for a correlation");
|
||||
}
|
||||
|
||||
private static async Task<Results<Ok<TimelineResponse>, NotFound>> GetTimelineAsync(
|
||||
private static async Task<Results<Ok<TimelineResponse>, NotFound, BadRequest<string>>> GetTimelineAsync(
|
||||
string correlationId,
|
||||
ITimelineQueryService queryService,
|
||||
int? limit,
|
||||
@@ -39,12 +39,22 @@ public static class TimelineEndpoints
|
||||
string? kinds,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!TryParseHlc(fromHlc, "fromHlc", out var parsedFromHlc, out var fromParseError))
|
||||
{
|
||||
return TypedResults.BadRequest(fromParseError);
|
||||
}
|
||||
|
||||
if (!TryParseHlc(toHlc, "toHlc", out var parsedToHlc, out var toParseError))
|
||||
{
|
||||
return TypedResults.BadRequest(toParseError);
|
||||
}
|
||||
|
||||
var options = new TimelineQueryOptions
|
||||
{
|
||||
Limit = limit ?? 100,
|
||||
Offset = offset ?? 0,
|
||||
FromHlc = !string.IsNullOrEmpty(fromHlc) ? HlcTimestamp.Parse(fromHlc) : null,
|
||||
ToHlc = !string.IsNullOrEmpty(toHlc) ? HlcTimestamp.Parse(toHlc) : null,
|
||||
FromHlc = parsedFromHlc,
|
||||
ToHlc = parsedToHlc,
|
||||
Services = !string.IsNullOrEmpty(services)
|
||||
? services.Split(',', StringSplitOptions.RemoveEmptyEntries).ToList()
|
||||
: null,
|
||||
@@ -109,6 +119,30 @@ public static class TimelineEndpoints
|
||||
}).ToList()
|
||||
});
|
||||
}
|
||||
|
||||
private static bool TryParseHlc(
|
||||
string? rawValue,
|
||||
string parameterName,
|
||||
out HlcTimestamp? parsedValue,
|
||||
out string error)
|
||||
{
|
||||
parsedValue = null;
|
||||
error = string.Empty;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(rawValue))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (HlcTimestamp.TryParse(rawValue, out var hlc))
|
||||
{
|
||||
parsedValue = hlc;
|
||||
return true;
|
||||
}
|
||||
|
||||
error = $"Invalid {parameterName} value '{rawValue}'. Expected format '{{physicalTime13}}-{{nodeId}}-{{counter6}}'.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// DTOs
|
||||
|
||||
@@ -30,11 +30,11 @@ public static class ServiceCollectionExtensions
|
||||
// Register query service
|
||||
services.TryAddScoped<ITimelineQueryService, TimelineQueryService>();
|
||||
|
||||
// Register replay orchestrator
|
||||
services.TryAddScoped<ITimelineReplayOrchestrator, TimelineReplayOrchestrator>();
|
||||
// Replay/export operations keep in-memory operation state and must survive request scopes.
|
||||
services.TryAddSingleton<ITimelineReplayOrchestrator, TimelineReplayOrchestrator>();
|
||||
|
||||
// Register export bundle builder
|
||||
services.TryAddScoped<ITimelineBundleBuilder, TimelineBundleBuilder>();
|
||||
services.TryAddSingleton<ITimelineBundleBuilder, TimelineBundleBuilder>();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
@@ -137,6 +137,112 @@ public sealed class TimelineApiIntegrationTests : IClassFixture<TimelineWebAppli
|
||||
response.StatusCode.Should().Be(HttpStatusCode.OK);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Intent", "Operational")]
|
||||
public async Task GetTimeline_ReturnsBadRequest_WhenFromHlcIsInvalid()
|
||||
{
|
||||
// Act
|
||||
var response = await _client.GetAsync("/api/v1/timeline/nonexistent-correlation?fromHlc=invalid-hlc&toHlc=1700000000000-node-000001");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.BadRequest);
|
||||
var payload = await response.Content.ReadAsStringAsync();
|
||||
payload.Should().Contain("Invalid fromHlc value");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Intent", "Operational")]
|
||||
public async Task ReplayStatus_IsReachableAfterReplayInitiation()
|
||||
{
|
||||
// Arrange
|
||||
const string correlationId = "test-corr-replay-lifecycle";
|
||||
await SeedEventsAsync(correlationId, 3);
|
||||
|
||||
// Act
|
||||
var initiateResponse = await _client.PostAsJsonAsync(
|
||||
$"/api/v1/timeline/{correlationId}/replay",
|
||||
new ReplayRequest { Mode = "dry-run" });
|
||||
|
||||
// Assert initiate
|
||||
initiateResponse.StatusCode.Should().Be(HttpStatusCode.Accepted);
|
||||
var initiated = await initiateResponse.Content.ReadFromJsonAsync<ReplayInitiatedResponse>();
|
||||
initiated.Should().NotBeNull();
|
||||
initiated!.ReplayId.Should().NotBeNullOrWhiteSpace();
|
||||
|
||||
// Assert status endpoint is queryable from a separate request scope
|
||||
var statusResponse = await _client.GetAsync($"/api/v1/timeline/replay/{initiated.ReplayId}");
|
||||
statusResponse.StatusCode.Should().Be(HttpStatusCode.OK);
|
||||
var statusPayload = await statusResponse.Content.ReadFromJsonAsync<ReplayStatusResponse>();
|
||||
statusPayload.Should().NotBeNull();
|
||||
statusPayload!.ReplayId.Should().Be(initiated.ReplayId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Intent", "Operational")]
|
||||
public async Task ExportStatus_ReturnsNotFound_ForUnknownExportId()
|
||||
{
|
||||
// Act
|
||||
var response = await _client.GetAsync("/api/v1/timeline/export/unknown-export-id");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.NotFound);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Intent", "Operational")]
|
||||
public async Task ExportDownload_ReturnsNotFound_ForUnknownExportId()
|
||||
{
|
||||
// Act
|
||||
var response = await _client.GetAsync("/api/v1/timeline/export/unknown-export-id/download");
|
||||
|
||||
// Assert
|
||||
response.StatusCode.Should().Be(HttpStatusCode.NotFound);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Intent", "Operational")]
|
||||
public async Task ExportLifecycle_DownloadReturnsGeneratedBundle()
|
||||
{
|
||||
// Arrange
|
||||
const string correlationId = "test-corr-export-lifecycle";
|
||||
await SeedEventsAsync(correlationId, 3);
|
||||
|
||||
// Act
|
||||
var initiateResponse = await _client.PostAsJsonAsync(
|
||||
$"/api/v1/timeline/{correlationId}/export",
|
||||
new ExportRequest { Format = "ndjson", SignBundle = false });
|
||||
|
||||
// Assert initiate
|
||||
initiateResponse.StatusCode.Should().Be(HttpStatusCode.Accepted);
|
||||
var initiated = await initiateResponse.Content.ReadFromJsonAsync<ExportInitiatedResponse>();
|
||||
initiated.Should().NotBeNull();
|
||||
initiated!.ExportId.Should().NotBeNullOrWhiteSpace();
|
||||
|
||||
ExportStatusResponse? status = null;
|
||||
for (var attempt = 0; attempt < 20; attempt++)
|
||||
{
|
||||
var statusResponse = await _client.GetAsync($"/api/v1/timeline/export/{initiated.ExportId}");
|
||||
if (statusResponse.StatusCode == HttpStatusCode.OK)
|
||||
{
|
||||
status = await statusResponse.Content.ReadFromJsonAsync<ExportStatusResponse>();
|
||||
if (status?.Status == "COMPLETED")
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
await Task.Delay(25);
|
||||
}
|
||||
|
||||
status.Should().NotBeNull();
|
||||
status!.Status.Should().Be("COMPLETED");
|
||||
|
||||
var downloadResponse = await _client.GetAsync($"/api/v1/timeline/export/{initiated.ExportId}/download");
|
||||
downloadResponse.StatusCode.Should().Be(HttpStatusCode.OK);
|
||||
var bundleContent = await downloadResponse.Content.ReadAsStringAsync();
|
||||
bundleContent.Should().Contain(correlationId);
|
||||
}
|
||||
|
||||
private async Task SeedEventsAsync(string correlationId, int count)
|
||||
{
|
||||
using var scope = _factory.Services.CreateScope();
|
||||
@@ -268,4 +374,3 @@ internal sealed class NoOpTimelineEventEmitter : ITimelineEventEmitter
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
using System.Net;
|
||||
using FluentAssertions;
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
using Microsoft.AspNetCore.Mvc.Testing;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Timeline.WebService.Tests;
|
||||
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class TimelineStartupRegistrationTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Startup_WithPostgresStoreConfiguration_BuildsHost()
|
||||
{
|
||||
await using var factory = new WebApplicationFactory<StellaOps.Timeline.WebService.Program>()
|
||||
.WithWebHostBuilder(builder =>
|
||||
{
|
||||
builder.UseEnvironment("Development");
|
||||
builder.ConfigureAppConfiguration((_, config) =>
|
||||
{
|
||||
config.AddInMemoryCollection(new Dictionary<string, string?>
|
||||
{
|
||||
["Eventing:ServiceName"] = "timeline-tests",
|
||||
["Eventing:UseInMemoryStore"] = "false",
|
||||
["Eventing:ConnectionString"] = "Host=localhost;Port=5432;Database=timeline;Username=postgres;Password=postgres",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
using var client = factory.CreateClient();
|
||||
|
||||
var response = await client.GetAsync("/does-not-exist");
|
||||
|
||||
response.StatusCode.Should().Be(HttpStatusCode.NotFound);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user