Merge branch 'main' of https://git.stella-ops.org/stella-ops.org/git.stella-ops.org
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
using System.Text.Json.Serialization;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Excititor.Core.Evidence;
|
||||
using StellaOps.Excititor.Core.Storage;
|
||||
using StellaOps.Excititor.WebService.Services;
|
||||
using static Program;
|
||||
@@ -9,16 +11,22 @@ using static Program;
|
||||
namespace StellaOps.Excititor.WebService.Endpoints;
|
||||
|
||||
/// <summary>
|
||||
/// Attestation API endpoints (temporarily disabled while Mongo is removed and Postgres storage is adopted).
|
||||
/// Attestation API endpoints for listing and retrieving DSSE attestations.
|
||||
/// </summary>
|
||||
public static class AttestationEndpoints
|
||||
{
|
||||
public static void MapAttestationEndpoints(this WebApplication app)
|
||||
{
|
||||
// GET /attestations/vex/list
|
||||
app.MapGet("/attestations/vex/list", (
|
||||
app.MapGet("/attestations/vex/list", async (
|
||||
HttpContext context,
|
||||
IOptions<VexStorageOptions> storageOptions) =>
|
||||
[FromQuery] string? since,
|
||||
[FromQuery] string? until,
|
||||
[FromQuery] int? limit,
|
||||
[FromQuery] int? offset,
|
||||
IOptions<VexStorageOptions> storageOptions,
|
||||
[FromServices] IVexAttestationStore? attestationStore,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
var scopeResult = ScopeAuthorization.RequireScope(context, "vex.read");
|
||||
if (scopeResult is not null)
|
||||
@@ -26,22 +34,55 @@ public static class AttestationEndpoints
|
||||
return scopeResult;
|
||||
}
|
||||
|
||||
if (!TryResolveTenant(context, storageOptions.Value, requireHeader: false, out _, out var tenantError))
|
||||
if (!TryResolveTenant(context, storageOptions.Value, requireHeader: false, out var tenant, out var tenantError))
|
||||
{
|
||||
return tenantError;
|
||||
}
|
||||
|
||||
return Results.Problem(
|
||||
detail: "Attestation listing is temporarily unavailable during Postgres migration (Mongo/BSON removed).",
|
||||
statusCode: StatusCodes.Status503ServiceUnavailable,
|
||||
title: "Service unavailable");
|
||||
if (attestationStore is null)
|
||||
{
|
||||
return Results.Problem(
|
||||
detail: "Attestation store is not configured.",
|
||||
statusCode: StatusCodes.Status503ServiceUnavailable,
|
||||
title: "Service unavailable");
|
||||
}
|
||||
|
||||
var parsedSince = ParseTimestamp(since);
|
||||
var parsedUntil = ParseTimestamp(until);
|
||||
|
||||
var query = new VexAttestationQuery(
|
||||
tenant!,
|
||||
parsedSince,
|
||||
parsedUntil,
|
||||
limit ?? 100,
|
||||
offset ?? 0);
|
||||
|
||||
var result = await attestationStore.ListAsync(query, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var items = result.Items
|
||||
.Select(a => new AttestationListItemDto(
|
||||
a.AttestationId,
|
||||
a.ManifestId,
|
||||
a.MerkleRoot,
|
||||
a.ItemCount,
|
||||
a.AttestedAt))
|
||||
.ToList();
|
||||
|
||||
var response = new AttestationListResponse(
|
||||
items,
|
||||
result.TotalCount,
|
||||
result.HasMore);
|
||||
|
||||
return Results.Ok(response);
|
||||
}).WithName("ListVexAttestations");
|
||||
|
||||
// GET /attestations/vex/{attestationId}
|
||||
app.MapGet("/attestations/vex/{attestationId}", (
|
||||
app.MapGet("/attestations/vex/{attestationId}", async (
|
||||
HttpContext context,
|
||||
string attestationId,
|
||||
IOptions<VexStorageOptions> storageOptions) =>
|
||||
IOptions<VexStorageOptions> storageOptions,
|
||||
[FromServices] IVexAttestationStore? attestationStore,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
var scopeResult = ScopeAuthorization.RequireScope(context, "vex.read");
|
||||
if (scopeResult is not null)
|
||||
@@ -49,7 +90,7 @@ public static class AttestationEndpoints
|
||||
return scopeResult;
|
||||
}
|
||||
|
||||
if (!TryResolveTenant(context, storageOptions.Value, requireHeader: false, out _, out var tenantError))
|
||||
if (!TryResolveTenant(context, storageOptions.Value, requireHeader: false, out var tenant, out var tenantError))
|
||||
{
|
||||
return tenantError;
|
||||
}
|
||||
@@ -62,10 +103,69 @@ public static class AttestationEndpoints
|
||||
title: "Validation error");
|
||||
}
|
||||
|
||||
return Results.Problem(
|
||||
detail: "Attestation retrieval is temporarily unavailable during Postgres migration (Mongo/BSON removed).",
|
||||
statusCode: StatusCodes.Status503ServiceUnavailable,
|
||||
title: "Service unavailable");
|
||||
if (attestationStore is null)
|
||||
{
|
||||
return Results.Problem(
|
||||
detail: "Attestation store is not configured.",
|
||||
statusCode: StatusCodes.Status503ServiceUnavailable,
|
||||
title: "Service unavailable");
|
||||
}
|
||||
|
||||
var attestation = await attestationStore.FindByIdAsync(tenant!, attestationId, cancellationToken).ConfigureAwait(false);
|
||||
if (attestation is null)
|
||||
{
|
||||
return Results.NotFound(new
|
||||
{
|
||||
error = new { code = "ERR_ATTESTATION_NOT_FOUND", message = $"Attestation '{attestationId}' not found" }
|
||||
});
|
||||
}
|
||||
|
||||
var response = new AttestationDetailResponse(
|
||||
attestation.AttestationId,
|
||||
attestation.Tenant,
|
||||
attestation.ManifestId,
|
||||
attestation.MerkleRoot,
|
||||
attestation.DsseEnvelopeJson,
|
||||
attestation.DsseEnvelopeHash,
|
||||
attestation.ItemCount,
|
||||
attestation.AttestedAt,
|
||||
attestation.Metadata);
|
||||
|
||||
return Results.Ok(response);
|
||||
}).WithName("GetVexAttestation");
|
||||
}
|
||||
|
||||
/// <summary>
/// Parses an optional timestamp query value (the <c>since</c>/<c>until</c> filters) into a
/// <see cref="DateTimeOffset"/>.
/// </summary>
/// <param name="value">Raw query-string value; may be <c>null</c>, empty, or whitespace.</param>
/// <returns>
/// The parsed instant, or <c>null</c> when <paramref name="value"/> is blank or is not a
/// recognizable date/time (an unparseable filter is ignored rather than rejected).
/// </returns>
/// <remarks>
/// Parsing is culture-invariant, and input without an explicit offset is interpreted as UTC,
/// so the same request yields the same filter window regardless of the host's culture or time zone.
/// </remarks>
private static DateTimeOffset? ParseTimestamp(string? value)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        return null;
    }

    // Fully-qualified names keep this helper independent of the file's using directives.
    var recognized = DateTimeOffset.TryParse(
        value,
        System.Globalization.CultureInfo.InvariantCulture,
        System.Globalization.DateTimeStyles.AssumeUniversal,
        out var parsed);

    return recognized ? parsed : null;
}
|
||||
}
|
||||
|
||||
// Response DTOs
|
||||
/// <summary>
/// One row of the attestation list response; a projection of a stored attestation
/// without the DSSE envelope payload.
/// </summary>
/// <param name="AttestationId">Attestation identifier, serialized as <c>attestationId</c>.</param>
/// <param name="ManifestId">Identifier of the manifest the attestation covers, serialized as <c>manifestId</c>.</param>
/// <param name="MerkleRoot">Merkle root recorded for the attestation, serialized as <c>merkleRoot</c>.</param>
/// <param name="ItemCount">Number of manifest items, serialized as <c>itemCount</c>.</param>
/// <param name="AttestedAt">Attestation timestamp, serialized as <c>attestedAt</c>.</param>
public sealed record AttestationListItemDto(
    [property: JsonPropertyName("attestationId")] string AttestationId,
    [property: JsonPropertyName("manifestId")] string ManifestId,
    [property: JsonPropertyName("merkleRoot")] string MerkleRoot,
    [property: JsonPropertyName("itemCount")] int ItemCount,
    [property: JsonPropertyName("attestedAt")] DateTimeOffset AttestedAt);
|
||||
|
||||
/// <summary>
/// Response body for the attestation list endpoint: one page of items plus paging hints.
/// </summary>
/// <param name="Items">Attestation summaries in this page, serialized as <c>items</c>.</param>
/// <param name="TotalCount">Total count reported by the store for the query, serialized as <c>totalCount</c>.</param>
/// <param name="HasMore">Flag reported by the store indicating further results, serialized as <c>hasMore</c>.</param>
public sealed record AttestationListResponse(
    [property: JsonPropertyName("items")] IReadOnlyList<AttestationListItemDto> Items,
    [property: JsonPropertyName("totalCount")] int TotalCount,
    [property: JsonPropertyName("hasMore")] bool HasMore);
|
||||
|
||||
/// <summary>
/// Response body for retrieving a single attestation, including its DSSE envelope.
/// </summary>
/// <param name="AttestationId">Attestation identifier, serialized as <c>attestationId</c>.</param>
/// <param name="Tenant">Tenant that owns the attestation, serialized as <c>tenant</c>.</param>
/// <param name="ManifestId">Identifier of the covered manifest, serialized as <c>manifestId</c>.</param>
/// <param name="MerkleRoot">Merkle root recorded for the attestation, serialized as <c>merkleRoot</c>.</param>
/// <param name="DsseEnvelopeJson">DSSE envelope as a JSON string, serialized as <c>dsseEnvelopeJson</c>.</param>
/// <param name="DsseEnvelopeHash">Hash of the DSSE envelope, serialized as <c>dsseEnvelopeHash</c>.</param>
/// <param name="ItemCount">Number of manifest items, serialized as <c>itemCount</c>.</param>
/// <param name="AttestedAt">Attestation timestamp, serialized as <c>attestedAt</c>.</param>
/// <param name="Metadata">Additional key/value metadata, serialized as <c>metadata</c>.</param>
public sealed record AttestationDetailResponse(
    [property: JsonPropertyName("attestationId")] string AttestationId,
    [property: JsonPropertyName("tenant")] string Tenant,
    [property: JsonPropertyName("manifestId")] string ManifestId,
    [property: JsonPropertyName("merkleRoot")] string MerkleRoot,
    [property: JsonPropertyName("dsseEnvelopeJson")] string DsseEnvelopeJson,
    [property: JsonPropertyName("dsseEnvelopeHash")] string DsseEnvelopeHash,
    [property: JsonPropertyName("itemCount")] int ItemCount,
    [property: JsonPropertyName("attestedAt")] DateTimeOffset AttestedAt,
    [property: JsonPropertyName("metadata")] IReadOnlyDictionary<string, string> Metadata);
|
||||
|
||||
@@ -82,6 +82,9 @@ services.AddSingleton<IGraphOverlayStore>(sp =>
|
||||
});
|
||||
services.AddSingleton<IVexEvidenceLockerService, VexEvidenceLockerService>();
|
||||
services.AddSingleton<IVexEvidenceAttestor, StellaOps.Excititor.Attestation.Evidence.VexEvidenceAttestor>();
|
||||
// OBS-52/53/54: Attestation storage and timeline event recording
|
||||
services.TryAddSingleton<IVexAttestationStore, InMemoryVexAttestationStore>();
|
||||
services.TryAddSingleton<IVexTimelineEventRecorder, VexTimelineEventRecorder>();
|
||||
services.AddScoped<IVexIngestOrchestrator, VexIngestOrchestrator>();
|
||||
services.AddSingleton<VexStatementBackfillService>();
|
||||
services.AddOptions<ExcititorObservabilityOptions>()
|
||||
|
||||
@@ -12,6 +12,7 @@ using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Excititor.Core;
|
||||
using StellaOps.Excititor.Core.Orchestration;
|
||||
using StellaOps.Excititor.Core.Storage;
|
||||
using StellaOps.Excititor.Worker.Options;
|
||||
|
||||
namespace StellaOps.Excititor.Worker.Orchestration;
|
||||
@@ -19,10 +20,12 @@ namespace StellaOps.Excititor.Worker.Orchestration;
|
||||
/// <summary>
|
||||
/// Default implementation of <see cref="IVexWorkerOrchestratorClient"/>.
|
||||
/// Stores heartbeats and artifacts locally and, when configured, mirrors them to the Orchestrator worker API.
|
||||
/// Per EXCITITOR-ORCH-32/33: Uses append-only checkpoint store for deterministic persistence and replay.
|
||||
/// </summary>
|
||||
internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
{
|
||||
private readonly IVexConnectorStateRepository _stateRepository;
|
||||
private readonly IAppendOnlyCheckpointStore? _checkpointStore;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly IOptions<VexWorkerOrchestratorOptions> _options;
|
||||
private readonly ILogger<VexWorkerOrchestratorClient> _logger;
|
||||
@@ -36,9 +39,11 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
TimeProvider timeProvider,
|
||||
IOptions<VexWorkerOrchestratorOptions> options,
|
||||
ILogger<VexWorkerOrchestratorClient> logger,
|
||||
HttpClient? httpClient = null)
|
||||
HttpClient? httpClient = null,
|
||||
IAppendOnlyCheckpointStore? checkpointStore = null)
|
||||
{
|
||||
_stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
|
||||
_checkpointStore = checkpointStore;
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
@@ -150,6 +155,18 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
heartbeat.LastArtifactHash);
|
||||
}
|
||||
|
||||
// Log to append-only checkpoint store (EXCITITOR-ORCH-32/33)
|
||||
await LogCheckpointMutationAsync(
|
||||
context,
|
||||
CheckpointMutation.Heartbeat(
|
||||
context.RunId,
|
||||
timestamp,
|
||||
cursor: null,
|
||||
heartbeat.LastArtifactHash,
|
||||
heartbeat.LastArtifactKind,
|
||||
idempotencyKey: $"hb-{context.RunId}-{sequence}"),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await SendRemoteHeartbeatAsync(context, heartbeat, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -194,6 +211,17 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
artifact.Kind,
|
||||
artifact.ProviderId);
|
||||
|
||||
// Log to append-only checkpoint store (EXCITITOR-ORCH-32/33)
|
||||
await LogCheckpointMutationAsync(
|
||||
context,
|
||||
CheckpointMutation.Artifact(
|
||||
context.RunId,
|
||||
artifact.CreatedAt,
|
||||
artifact.Hash,
|
||||
artifact.Kind,
|
||||
idempotencyKey: $"artifact-{artifact.Hash}"),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await SendRemoteProgressForArtifactAsync(context, artifact, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -232,6 +260,19 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
result.ClaimsGenerated,
|
||||
duration);
|
||||
|
||||
// Log to append-only checkpoint store (EXCITITOR-ORCH-32/33)
|
||||
await LogCheckpointMutationAsync(
|
||||
context,
|
||||
CheckpointMutation.Completed(
|
||||
context.RunId,
|
||||
result.CompletedAt,
|
||||
result.LastCheckpoint,
|
||||
result.DocumentsProcessed,
|
||||
result.ClaimsGenerated,
|
||||
result.LastArtifactHash,
|
||||
idempotencyKey: $"complete-{context.RunId}"),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await SendRemoteCompletionAsync(context, result, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -271,6 +312,19 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
errorCode,
|
||||
retryAfterSeconds);
|
||||
|
||||
// Log to append-only checkpoint store (EXCITITOR-ORCH-32/33)
|
||||
await LogCheckpointMutationAsync(
|
||||
context,
|
||||
CheckpointMutation.Failed(
|
||||
context.RunId,
|
||||
now,
|
||||
errorCode,
|
||||
errorMessage,
|
||||
retryAfterSeconds,
|
||||
state.LastCheckpoint?.ToString("O"),
|
||||
idempotencyKey: $"fail-{context.RunId}"),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await SendRemoteCompletionAsync(
|
||||
context,
|
||||
new VexWorkerJobResult(0, 0, state.LastCheckpoint, state.LastArtifactHash, now),
|
||||
@@ -363,6 +417,20 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
context.ConnectorId,
|
||||
checkpoint.Cursor ?? "(none)",
|
||||
checkpoint.ProcessedDigests.Length);
|
||||
|
||||
// Log to append-only checkpoint store (EXCITITOR-ORCH-32/33)
|
||||
if (!string.IsNullOrEmpty(checkpoint.Cursor))
|
||||
{
|
||||
await LogCheckpointMutationAsync(
|
||||
context,
|
||||
CheckpointMutation.CursorUpdate(
|
||||
context.RunId,
|
||||
checkpoint.LastProcessedAt ?? now,
|
||||
checkpoint.Cursor,
|
||||
checkpoint.ProcessedDigests.Length,
|
||||
idempotencyKey: $"cursor-{context.RunId}-{checkpoint.Cursor}"),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask<VexWorkerCheckpoint?> LoadCheckpointAsync(
|
||||
@@ -647,6 +715,93 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
|
||||
|
||||
private string Serialize(object value) => JsonSerializer.Serialize(value, _serializerOptions);
|
||||
|
||||
/// <summary>
/// Appends a checkpoint mutation to the append-only checkpoint store (EXCITITOR-ORCH-32/33).
/// </summary>
/// <param name="context">Job context supplying the tenant, connector id, and run id.</param>
/// <param name="mutation">The mutation to append.</param>
/// <param name="cancellationToken">Token observed by the store call.</param>
/// <remarks>
/// This is best-effort: when no checkpoint store was injected the call is a no-op, and a store
/// failure is logged as a warning rather than rethrown. Cancellation requested through
/// <paramref name="cancellationToken"/> is the one exception; it propagates to the caller
/// instead of being reported as a store failure.
/// </remarks>
private async ValueTask LogCheckpointMutationAsync(
    VexWorkerJobContext context,
    CheckpointMutation mutation,
    CancellationToken cancellationToken)
{
    if (_checkpointStore is null)
    {
        return;
    }

    try
    {
        var result = await _checkpointStore.AppendAsync(
            context.Tenant,
            context.ConnectorId,
            mutation,
            cancellationToken).ConfigureAwait(false);

        if (_options.Value.EnableVerboseLogging)
        {
            _logger.LogDebug(
                "Checkpoint mutation logged: runId={RunId} type={Type} seq={Sequence} duplicate={IsDuplicate}",
                context.RunId,
                mutation.Type,
                result.SequenceNumber,
                result.WasDuplicate);
        }
    }
    // Let caller-requested cancellation flow through; everything else stays best-effort.
    catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
    {
        _logger.LogWarning(ex,
            "Failed to log checkpoint mutation for connector {ConnectorId}: {Type}",
            context.ConnectorId,
            mutation.Type);
    }
}
|
||||
|
||||
/// <summary>
/// Reads the append-only checkpoint mutation log for a connector (EXCITITOR-ORCH-32/33).
/// </summary>
/// <param name="tenant">Tenant identifier passed through to the store.</param>
/// <param name="connectorId">Connector identifier passed through to the store.</param>
/// <param name="sinceSequence">Optional sequence bound passed through to the store.</param>
/// <param name="limit">Maximum number of entries requested from the store.</param>
/// <param name="cancellationToken">Token observed by the store call.</param>
/// <returns>
/// The entries returned by the checkpoint store, or an empty list when no checkpoint store
/// was injected into this client.
/// </returns>
public async ValueTask<IReadOnlyList<CheckpointMutationEvent>> GetCheckpointMutationLogAsync(
    string tenant,
    string connectorId,
    long? sinceSequence = null,
    int limit = 100,
    CancellationToken cancellationToken = default)
{
    if (_checkpointStore is { } store)
    {
        return await store
            .GetMutationLogAsync(tenant, connectorId, sinceSequence, limit, cancellationToken)
            .ConfigureAwait(false);
    }

    return Array.Empty<CheckpointMutationEvent>();
}
|
||||
|
||||
/// <summary>
/// Asks the checkpoint store to replay a connector's mutations up to a sequence number
/// and return the resulting state (EXCITITOR-ORCH-32/33).
/// </summary>
/// <param name="tenant">Tenant identifier passed through to the store.</param>
/// <param name="connectorId">Connector identifier passed through to the store.</param>
/// <param name="upToSequence">Sequence number passed through to the store as the replay bound.</param>
/// <param name="cancellationToken">Token observed by the store call.</param>
/// <returns>
/// The state produced by the store, or <c>null</c> when no checkpoint store was injected
/// into this client.
/// </returns>
public async ValueTask<CheckpointState?> ReplayCheckpointToSequenceAsync(
    string tenant,
    string connectorId,
    long upToSequence,
    CancellationToken cancellationToken = default)
{
    if (_checkpointStore is { } store)
    {
        return await store
            .ReplayToSequenceAsync(tenant, connectorId, upToSequence, cancellationToken)
            .ConfigureAwait(false);
    }

    return null;
}
|
||||
|
||||
private sealed record ClaimRequest(string WorkerId, string? TaskRunnerId, string? JobType, int? LeaseSeconds, string? IdempotencyKey);
|
||||
|
||||
private sealed record ClaimResponse(
|
||||
|
||||
@@ -0,0 +1,178 @@
|
||||
using System.Collections.Immutable;
|
||||
|
||||
namespace StellaOps.Excititor.Core.Evidence;
|
||||
|
||||
/// <summary>
/// Persisted form of a VEX evidence attestation: the DSSE envelope plus the manifest
/// details it covers.
/// </summary>
public sealed record VexStoredAttestation
{
    /// <summary>
    /// Creates a stored attestation, normalizing its inputs.
    /// </summary>
    /// <remarks>
    /// Text arguments are trimmed and must not be blank, the tenant is lower-cased, the
    /// timestamp is converted to UTC, and missing metadata becomes an empty dictionary.
    /// </remarks>
    /// <exception cref="ArgumentException">A required text argument is null, empty, or whitespace.</exception>
    public VexStoredAttestation(
        string attestationId,
        string tenant,
        string manifestId,
        string merkleRoot,
        string dsseEnvelopeJson,
        string dsseEnvelopeHash,
        int itemCount,
        DateTimeOffset attestedAt,
        ImmutableDictionary<string, string>? metadata = null)
    {
        AttestationId = RequireText(attestationId, nameof(attestationId));

        var normalizedTenant = RequireText(tenant, nameof(tenant));
        Tenant = normalizedTenant.ToLowerInvariant();

        ManifestId = RequireText(manifestId, nameof(manifestId));
        MerkleRoot = RequireText(merkleRoot, nameof(merkleRoot));
        DsseEnvelopeJson = RequireText(dsseEnvelopeJson, nameof(dsseEnvelopeJson));
        DsseEnvelopeHash = RequireText(dsseEnvelopeHash, nameof(dsseEnvelopeHash));
        ItemCount = itemCount;
        AttestedAt = attestedAt.ToUniversalTime();

        if (metadata is null)
        {
            Metadata = ImmutableDictionary<string, string>.Empty;
        }
        else
        {
            Metadata = metadata;
        }
    }

    /// <summary>Unique identifier of the attestation (trimmed).</summary>
    public string AttestationId { get; }

    /// <summary>Owning tenant, trimmed and lower-cased.</summary>
    public string Tenant { get; }

    /// <summary>Identifier of the manifest this attestation covers.</summary>
    public string ManifestId { get; }

    /// <summary>Merkle root of the manifest items.</summary>
    public string MerkleRoot { get; }

    /// <summary>The DSSE envelope, as a JSON string.</summary>
    public string DsseEnvelopeJson { get; }

    /// <summary>SHA-256 hash of the DSSE envelope.</summary>
    public string DsseEnvelopeHash { get; }

    /// <summary>Number of items in the manifest.</summary>
    public int ItemCount { get; }

    /// <summary>Time the attestation was created, in UTC.</summary>
    public DateTimeOffset AttestedAt { get; }

    /// <summary>Additional key/value metadata; never null.</summary>
    public ImmutableDictionary<string, string> Metadata { get; }

    /// <summary>
    /// Builds a stored attestation from an attestation result, taking the tenant, manifest id,
    /// Merkle root, item count, and metadata from the result's signed manifest.
    /// </summary>
    /// <param name="result">The attestation result to convert.</param>
    /// <exception cref="ArgumentNullException"><paramref name="result"/> is null.</exception>
    public static VexStoredAttestation FromResult(VexEvidenceAttestationResult result)
    {
        ArgumentNullException.ThrowIfNull(result);

        return new VexStoredAttestation(
            attestationId: result.AttestationId,
            tenant: result.SignedManifest.Tenant,
            manifestId: result.SignedManifest.ManifestId,
            merkleRoot: result.SignedManifest.MerkleRoot,
            dsseEnvelopeJson: result.DsseEnvelopeJson,
            dsseEnvelopeHash: result.DsseEnvelopeHash,
            itemCount: result.SignedManifest.Items.Length,
            attestedAt: result.AttestedAt,
            metadata: result.SignedManifest.Metadata);
    }

    // Rejects blank text and returns the trimmed value otherwise.
    private static string RequireText(string value, string name)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException($"{name} must be provided.", name);
        }

        return value.Trim();
    }
}
|
||||
|
||||
/// <summary>
/// Filter and paging parameters for listing attestations.
/// </summary>
public sealed record VexAttestationQuery
{
    private const int MinLimit = 1;
    private const int MaxLimit = 1000;

    /// <summary>
    /// Creates a query, normalizing the tenant and clamping the paging values.
    /// </summary>
    /// <param name="tenant">Tenant to query; trimmed and lower-cased, must not be blank.</param>
    /// <param name="since">Optional lower time bound, stored as given.</param>
    /// <param name="until">Optional upper time bound, stored as given.</param>
    /// <param name="limit">Requested page size; forced into the range 1..1000.</param>
    /// <param name="offset">Requested offset; negative values become 0.</param>
    /// <exception cref="ArgumentException"><paramref name="tenant"/> is null, empty, or whitespace.</exception>
    public VexAttestationQuery(
        string tenant,
        DateTimeOffset? since = null,
        DateTimeOffset? until = null,
        int limit = 100,
        int offset = 0)
    {
        if (string.IsNullOrWhiteSpace(tenant))
        {
            throw new ArgumentException($"{nameof(tenant)} must be provided.", nameof(tenant));
        }

        Tenant = tenant.Trim().ToLowerInvariant();
        Since = since;
        Until = until;
        Limit = Math.Min(Math.Max(limit, MinLimit), MaxLimit);
        Offset = offset < 0 ? 0 : offset;
    }

    /// <summary>Tenant to query, trimmed and lower-cased.</summary>
    public string Tenant { get; }

    /// <summary>Optional lower time bound.</summary>
    public DateTimeOffset? Since { get; }

    /// <summary>Optional upper time bound.</summary>
    public DateTimeOffset? Until { get; }

    /// <summary>Page size, always within 1..1000.</summary>
    public int Limit { get; }

    /// <summary>Offset, never negative.</summary>
    public int Offset { get; }
}
|
||||
|
||||
/// <summary>
/// One page of attestations returned by a list query, with paging information.
/// </summary>
public sealed record VexAttestationListResult
{
    /// <summary>
    /// Creates a list result.
    /// </summary>
    /// <param name="items">Attestations in this page; a null value is replaced by an empty list.</param>
    /// <param name="totalCount">Total count value supplied by the producer of the result.</param>
    /// <param name="hasMore">Whether the producer reports further results.</param>
    public VexAttestationListResult(
        IReadOnlyList<VexStoredAttestation> items,
        int totalCount,
        bool hasMore)
    {
        if (items is null)
        {
            Items = Array.Empty<VexStoredAttestation>();
        }
        else
        {
            Items = items;
        }

        TotalCount = totalCount;
        HasMore = hasMore;
    }

    /// <summary>Attestations in this page; never null.</summary>
    public IReadOnlyList<VexStoredAttestation> Items { get; }

    /// <summary>Total count value as supplied at construction.</summary>
    public int TotalCount { get; }

    /// <summary>Whether further results were reported at construction.</summary>
    public bool HasMore { get; }
}
|
||||
|
||||
/// <summary>
/// Storage abstraction for VEX attestations.
/// </summary>
public interface IVexAttestationStore
{
    /// <summary>
    /// Saves an attestation to the store.
    /// </summary>
    /// <param name="attestation">The attestation to persist.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask SaveAsync(
        VexStoredAttestation attestation,
        CancellationToken cancellationToken);

    /// <summary>
    /// Looks up a single attestation by its identifier within a tenant.
    /// </summary>
    /// <param name="tenant">Tenant that owns the attestation.</param>
    /// <param name="attestationId">Identifier of the attestation.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>The attestation, or <c>null</c> when none is found.</returns>
    ValueTask<VexStoredAttestation?> FindByIdAsync(
        string tenant,
        string attestationId,
        CancellationToken cancellationToken);

    /// <summary>
    /// Looks up a single attestation by the manifest identifier it covers, within a tenant.
    /// </summary>
    /// <param name="tenant">Tenant that owns the attestation.</param>
    /// <param name="manifestId">Identifier of the manifest.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>The attestation, or <c>null</c> when none is found.</returns>
    ValueTask<VexStoredAttestation?> FindByManifestIdAsync(
        string tenant,
        string manifestId,
        CancellationToken cancellationToken);

    /// <summary>
    /// Lists attestations matching the query.
    /// </summary>
    /// <param name="query">Tenant, time bounds, and paging values.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>The matching items together with a total count and a has-more flag.</returns>
    ValueTask<VexAttestationListResult> ListAsync(
        VexAttestationQuery query,
        CancellationToken cancellationToken);

    /// <summary>
    /// Counts attestations for a tenant.
    /// </summary>
    /// <param name="tenant">Tenant whose attestations are counted.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask<int> CountAsync(
        string tenant,
        CancellationToken cancellationToken);
}
|
||||
@@ -0,0 +1,324 @@
|
||||
using System.Collections.Immutable;
|
||||
|
||||
namespace StellaOps.Excititor.Core.Evidence;
|
||||
|
||||
/// <summary>
/// Well-known event type names used when recording VEX evidence timeline events.
/// </summary>
public static class VexTimelineEventTypes
{
    // Attestation lifecycle.

    /// <summary>An attestation was created.</summary>
    public const string AttestationCreated = "vex.attestation.created";

    /// <summary>An attestation was verified.</summary>
    public const string AttestationVerified = "vex.attestation.verified";

    /// <summary>An attestation failed.</summary>
    public const string AttestationFailed = "vex.attestation.failed";

    // Manifest lifecycle.

    /// <summary>A manifest was built.</summary>
    public const string ManifestBuilt = "vex.manifest.built";

    /// <summary>A manifest was verified.</summary>
    public const string ManifestVerified = "vex.manifest.verified";

    /// <summary>Manifest verification failed.</summary>
    public const string ManifestVerificationFailed = "vex.manifest.verification_failed";

    // Evidence batches.

    /// <summary>An evidence batch was processed.</summary>
    public const string EvidenceBatchProcessed = "vex.evidence.batch_processed";

    /// <summary>An evidence batch failed.</summary>
    public const string EvidenceBatchFailed = "vex.evidence.batch_failed";

    // Evidence locker.

    /// <summary>The locker was sealed.</summary>
    public const string LockerSealed = "vex.locker.sealed";

    /// <summary>The locker was opened.</summary>
    public const string LockerOpened = "vex.locker.opened";
}
|
||||
|
||||
/// <summary>
/// A single entry on the VEX evidence timeline.
/// </summary>
public sealed record VexTimelineEvent
{
    /// <summary>
    /// Creates a timeline event, normalizing its inputs.
    /// </summary>
    /// <remarks>
    /// <paramref name="eventId"/>, <paramref name="eventType"/>, and <paramref name="tenant"/> are
    /// required and trimmed (the tenant is also lower-cased). Optional text values are trimmed,
    /// and blank ones become <c>null</c>. The timestamp is converted to UTC, and missing
    /// metadata becomes an empty dictionary.
    /// </remarks>
    /// <exception cref="ArgumentException">A required text argument is null, empty, or whitespace.</exception>
    public VexTimelineEvent(
        string eventId,
        string eventType,
        string tenant,
        DateTimeOffset occurredAt,
        string? manifestId = null,
        string? attestationId = null,
        string? merkleRoot = null,
        int? itemCount = null,
        string? errorCode = null,
        string? message = null,
        ImmutableDictionary<string, string>? metadata = null)
    {
        EventId = RequireText(eventId, nameof(eventId));
        EventType = RequireText(eventType, nameof(eventType));

        var normalizedTenant = RequireText(tenant, nameof(tenant));
        Tenant = normalizedTenant.ToLowerInvariant();

        OccurredAt = occurredAt.ToUniversalTime();
        ManifestId = OptionalText(manifestId);
        AttestationId = OptionalText(attestationId);
        MerkleRoot = OptionalText(merkleRoot);
        ItemCount = itemCount;
        ErrorCode = OptionalText(errorCode);
        Message = OptionalText(message);

        if (metadata is null)
        {
            Metadata = ImmutableDictionary<string, string>.Empty;
        }
        else
        {
            Metadata = metadata;
        }
    }

    /// <summary>Unique identifier of the event.</summary>
    public string EventId { get; }

    /// <summary>Type of the event (see <see cref="VexTimelineEventTypes"/>).</summary>
    public string EventType { get; }

    /// <summary>Owning tenant, trimmed and lower-cased.</summary>
    public string Tenant { get; }

    /// <summary>Time the event occurred, in UTC.</summary>
    public DateTimeOffset OccurredAt { get; }

    /// <summary>Related manifest identifier, if any.</summary>
    public string? ManifestId { get; }

    /// <summary>Related attestation identifier, if any.</summary>
    public string? AttestationId { get; }

    /// <summary>Merkle root, if applicable.</summary>
    public string? MerkleRoot { get; }

    /// <summary>Number of items involved, if applicable.</summary>
    public int? ItemCount { get; }

    /// <summary>Error code for failure events.</summary>
    public string? ErrorCode { get; }

    /// <summary>Human-readable message.</summary>
    public string? Message { get; }

    /// <summary>Additional key/value metadata; never null.</summary>
    public ImmutableDictionary<string, string> Metadata { get; }

    // Rejects blank text and returns the trimmed value otherwise.
    private static string RequireText(string value, string name)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException($"{name} must be provided.", name);
        }

        return value.Trim();
    }

    // Maps blank text to null and trims everything else.
    private static string? OptionalText(string? value)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            return null;
        }

        return value.Trim();
    }
}
|
||||
|
||||
/// <summary>
/// Records VEX evidence timeline events and lists them per tenant.
/// </summary>
public interface IVexTimelineEventRecorder
{
    /// <summary>
    /// Records a timeline event.
    /// </summary>
    /// <param name="evt">The event to record.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask RecordAsync(VexTimelineEvent evt, CancellationToken cancellationToken);

    /// <summary>
    /// Records an attestation created event.
    /// </summary>
    /// <param name="tenant">Tenant the event belongs to.</param>
    /// <param name="attestationId">Identifier of the created attestation.</param>
    /// <param name="manifestId">Identifier of the manifest the attestation covers.</param>
    /// <param name="merkleRoot">Merkle root associated with the attestation.</param>
    /// <param name="itemCount">Number of items involved.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask RecordAttestationCreatedAsync(
        string tenant,
        string attestationId,
        string manifestId,
        string merkleRoot,
        int itemCount,
        CancellationToken cancellationToken);

    /// <summary>
    /// Records the outcome of an attestation verification.
    /// </summary>
    /// <param name="tenant">Tenant the event belongs to.</param>
    /// <param name="attestationId">Identifier of the verified attestation.</param>
    /// <param name="isValid">Whether verification succeeded.</param>
    /// <param name="errorCode">Optional error code for a failed verification.</param>
    /// <param name="message">Optional human-readable message.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask RecordAttestationVerifiedAsync(
        string tenant,
        string attestationId,
        bool isValid,
        string? errorCode,
        string? message,
        CancellationToken cancellationToken);

    /// <summary>
    /// Records a manifest built event.
    /// </summary>
    /// <param name="tenant">Tenant the event belongs to.</param>
    /// <param name="manifestId">Identifier of the built manifest.</param>
    /// <param name="merkleRoot">Merkle root of the manifest.</param>
    /// <param name="itemCount">Number of items in the manifest.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask RecordManifestBuiltAsync(
        string tenant,
        string manifestId,
        string merkleRoot,
        int itemCount,
        CancellationToken cancellationToken);

    /// <summary>
    /// Records an evidence batch processed event.
    /// </summary>
    /// <param name="tenant">Tenant the event belongs to.</param>
    /// <param name="itemCount">Number of items in the batch.</param>
    /// <param name="manifestId">Optional identifier of a related manifest.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask RecordBatchProcessedAsync(
        string tenant,
        int itemCount,
        string? manifestId,
        CancellationToken cancellationToken);

    /// <summary>
    /// Lists timeline events for a tenant.
    /// </summary>
    /// <param name="tenant">Tenant whose events are listed.</param>
    /// <param name="eventType">Optional event type filter.</param>
    /// <param name="since">Optional lower time bound.</param>
    /// <param name="limit">Maximum number of events requested.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    ValueTask<IReadOnlyList<VexTimelineEvent>> ListEventsAsync(
        string tenant,
        string? eventType,
        DateTimeOffset? since,
        int limit,
        CancellationToken cancellationToken);
}
|
||||
|
||||
/// <summary>
|
||||
/// Default implementation of <see cref="IVexTimelineEventRecorder"/>.
|
||||
/// </summary>
|
||||
public sealed class VexTimelineEventRecorder : IVexTimelineEventRecorder
|
||||
{
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly List<VexTimelineEvent> _events = new();
|
||||
private readonly object _lock = new();
|
||||
private long _sequence;
|
||||
|
||||
public VexTimelineEventRecorder(TimeProvider? timeProvider = null)
|
||||
{
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public ValueTask RecordAsync(VexTimelineEvent evt, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(evt);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
_events.Add(evt);
|
||||
}
|
||||
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask RecordAttestationCreatedAsync(
|
||||
string tenant,
|
||||
string attestationId,
|
||||
string manifestId,
|
||||
string merkleRoot,
|
||||
int itemCount,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var evt = new VexTimelineEvent(
|
||||
CreateEventId(),
|
||||
VexTimelineEventTypes.AttestationCreated,
|
||||
tenant,
|
||||
_timeProvider.GetUtcNow(),
|
||||
manifestId,
|
||||
attestationId,
|
||||
merkleRoot,
|
||||
itemCount);
|
||||
|
||||
return RecordAsync(evt, cancellationToken);
|
||||
}
|
||||
|
||||
public ValueTask RecordAttestationVerifiedAsync(
|
||||
string tenant,
|
||||
string attestationId,
|
||||
bool isValid,
|
||||
string? errorCode,
|
||||
string? message,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var eventType = isValid
|
||||
? VexTimelineEventTypes.AttestationVerified
|
||||
: VexTimelineEventTypes.AttestationFailed;
|
||||
|
||||
var evt = new VexTimelineEvent(
|
||||
CreateEventId(),
|
||||
eventType,
|
||||
tenant,
|
||||
_timeProvider.GetUtcNow(),
|
||||
attestationId: attestationId,
|
||||
errorCode: errorCode,
|
||||
message: message);
|
||||
|
||||
return RecordAsync(evt, cancellationToken);
|
||||
}
|
||||
|
||||
public ValueTask RecordManifestBuiltAsync(
|
||||
string tenant,
|
||||
string manifestId,
|
||||
string merkleRoot,
|
||||
int itemCount,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var evt = new VexTimelineEvent(
|
||||
CreateEventId(),
|
||||
VexTimelineEventTypes.ManifestBuilt,
|
||||
tenant,
|
||||
_timeProvider.GetUtcNow(),
|
||||
manifestId,
|
||||
merkleRoot: merkleRoot,
|
||||
itemCount: itemCount);
|
||||
|
||||
return RecordAsync(evt, cancellationToken);
|
||||
}
|
||||
|
||||
public ValueTask RecordBatchProcessedAsync(
|
||||
string tenant,
|
||||
int itemCount,
|
||||
string? manifestId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var evt = new VexTimelineEvent(
|
||||
CreateEventId(),
|
||||
VexTimelineEventTypes.EvidenceBatchProcessed,
|
||||
tenant,
|
||||
_timeProvider.GetUtcNow(),
|
||||
manifestId,
|
||||
itemCount: itemCount);
|
||||
|
||||
return RecordAsync(evt, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
/// Lists recorded timeline events for a tenant, newest first.
/// </summary>
/// <param name="tenant">Tenant to match (case-insensitive ordinal comparison).</param>
/// <param name="eventType">Optional event type filter (case-insensitive); ignored when null or whitespace.</param>
/// <param name="since">Optional inclusive lower bound on the event's <c>OccurredAt</c>.</param>
/// <param name="limit">Maximum number of events to return; clamped to the range 1..1000.</param>
/// <param name="cancellationToken">Checked once before the events are read.</param>
/// <returns>The matching events ordered by <c>OccurredAt</c> descending.</returns>
public ValueTask<IReadOnlyList<VexTimelineEvent>> ListEventsAsync(
    string tenant,
    string? eventType,
    DateTimeOffset? since,
    int limit,
    CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    var filterByType = !string.IsNullOrWhiteSpace(eventType);
    var pageSize = Math.Clamp(limit, 1, 1000);

    IReadOnlyList<VexTimelineEvent> page;

    // The query is materialized inside the lock so the shared event collection is only enumerated while held.
    lock (_lock)
    {
        page = _events
            .Where(e =>
                string.Equals(e.Tenant, tenant, StringComparison.OrdinalIgnoreCase)
                && (!filterByType || string.Equals(e.EventType, eventType, StringComparison.OrdinalIgnoreCase))
                && (!since.HasValue || e.OccurredAt >= since.Value))
            .OrderByDescending(e => e.OccurredAt)
            .Take(pageSize)
            .ToList();
    }

    return ValueTask.FromResult(page);
}
|
||||
|
||||
/// <summary>
/// Builds an event identifier of the form <c>evt:{yyyyMMddHHmmss}:{sequence}</c>, where the
/// sequence is an atomically incremented counter padded to at least six digits.
/// </summary>
/// <returns>The new event identifier.</returns>
private string CreateEventId()
{
    var seq = Interlocked.Increment(ref _sequence);

    // Format with the invariant culture: an interpolated string would use the current culture,
    // whose default calendar may not be Gregorian and would change the rendered year.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    var stamp = _timeProvider.GetUtcNow().ToString("yyyyMMddHHmmss", invariant);

    return $"evt:{stamp}:{seq.ToString("D6", invariant)}";
}
|
||||
}
|
||||
@@ -0,0 +1,498 @@
|
||||
namespace StellaOps.Excititor.Core.Storage;
|
||||
|
||||
/// <summary>
/// Append-only checkpoint store for deterministic connector state persistence.
/// Per EXCITITOR-ORCH-32/33: Deterministic checkpoint persistence using Postgres append-only store.
/// Mutations are logged and never modified; current state is derived from the log.
/// </summary>
public interface IAppendOnlyCheckpointStore
{
    /// <summary>
    /// Appends a new checkpoint mutation for a connector.
    /// </summary>
    /// <remarks>
    /// Appending is idempotent with respect to <see cref="CheckpointMutation.IdempotencyKey"/>:
    /// a mutation whose key was already appended for the same tenant and connector is reported
    /// as a duplicate (<see cref="AppendCheckpointResult.WasDuplicate"/>) with the sequence number
    /// of the existing entry instead of being appended again. Mutations without a key are always appended.
    /// </remarks>
    /// <param name="tenant">Tenant identifier.</param>
    /// <param name="connectorId">Connector identifier.</param>
    /// <param name="mutation">The checkpoint mutation to append.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The append result with the mutation's sequence number and the resulting state.</returns>
    ValueTask<AppendCheckpointResult> AppendAsync(
        string tenant,
        string connectorId,
        CheckpointMutation mutation,
        CancellationToken cancellationToken);

    /// <summary>
    /// Gets the current checkpoint state (derived from mutation log).
    /// </summary>
    /// <param name="tenant">Tenant identifier.</param>
    /// <param name="connectorId">Connector identifier.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Current checkpoint state or null if none exists.</returns>
    ValueTask<CheckpointState?> GetCurrentStateAsync(
        string tenant,
        string connectorId,
        CancellationToken cancellationToken);

    /// <summary>
    /// Gets the mutation log for a connector (for audit/replay).
    /// </summary>
    /// <param name="tenant">Tenant identifier.</param>
    /// <param name="connectorId">Connector identifier.</param>
    /// <param name="sinceSequence">Return mutations after this sequence number (exclusive); null returns from the start of the log.</param>
    /// <param name="limit">Maximum number of mutations to return. Implementations may clamp this value.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>List of mutations in ascending sequence-number order.</returns>
    ValueTask<IReadOnlyList<CheckpointMutationEvent>> GetMutationLogAsync(
        string tenant,
        string connectorId,
        long? sinceSequence,
        int limit,
        CancellationToken cancellationToken);

    /// <summary>
    /// Replays mutations to reconstruct the state as of a specific sequence number.
    /// </summary>
    /// <param name="tenant">Tenant identifier.</param>
    /// <param name="connectorId">Connector identifier.</param>
    /// <param name="upToSequence">Replay mutations up to and including this sequence.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>State as of the specified sequence, or null when no mutation exists at or below it.</returns>
    ValueTask<CheckpointState?> ReplayToSequenceAsync(
        string tenant,
        string connectorId,
        long upToSequence,
        CancellationToken cancellationToken);
}
|
||||
|
||||
/// <summary>
/// Outcome of appending a checkpoint mutation. Instances are created through
/// <see cref="Appended"/>, <see cref="Duplicate"/> and <see cref="Failed"/>.
/// </summary>
public sealed record AppendCheckpointResult
{
    private AppendCheckpointResult(
        bool success,
        long sequenceNumber,
        bool wasDuplicate,
        CheckpointState currentState,
        string? errorMessage = null)
    {
        (Success, SequenceNumber, WasDuplicate, CurrentState, ErrorMessage) =
            (success, sequenceNumber, wasDuplicate, currentState, errorMessage);
    }

    /// <summary>True when the append succeeded (including the duplicate case).</summary>
    public bool Success { get; }

    /// <summary>Monotonic sequence number for this mutation; 0 for a failed append.</summary>
    public long SequenceNumber { get; }

    /// <summary>True when the mutation was already present and was not appended again.</summary>
    public bool WasDuplicate { get; }

    /// <summary>Checkpoint state after the mutation; <see cref="CheckpointState.Empty"/> for a failed append.</summary>
    public CheckpointState CurrentState { get; }

    /// <summary>Error message when the append failed; otherwise null.</summary>
    public string? ErrorMessage { get; }

    /// <summary>Creates a successful, non-duplicate result.</summary>
    public static AppendCheckpointResult Appended(long sequenceNumber, CheckpointState state)
    {
        return new AppendCheckpointResult(
            success: true,
            sequenceNumber: sequenceNumber,
            wasDuplicate: false,
            currentState: state);
    }

    /// <summary>Creates a successful result for a mutation that was already in the log.</summary>
    public static AppendCheckpointResult Duplicate(long sequenceNumber, CheckpointState state)
    {
        return new AppendCheckpointResult(
            success: true,
            sequenceNumber: sequenceNumber,
            wasDuplicate: true,
            currentState: state);
    }

    /// <summary>Creates a failed result carrying <paramref name="error"/> and an empty state.</summary>
    public static AppendCheckpointResult Failed(string error)
    {
        return new AppendCheckpointResult(
            success: false,
            sequenceNumber: 0,
            wasDuplicate: false,
            currentState: CheckpointState.Empty,
            errorMessage: error);
    }
}
|
||||
|
||||
/// <summary>
/// A checkpoint mutation that has not yet been appended to the log.
/// Use the static factories to build a mutation of a given <see cref="CheckpointMutationType"/>;
/// fields that do not apply to that type are left null.
/// </summary>
public sealed record CheckpointMutation(
    CheckpointMutationType Type,
    Guid RunId,
    DateTimeOffset Timestamp,
    string? Cursor,
    string? ArtifactHash,
    string? ArtifactKind,
    int? DocumentsProcessed,
    int? ClaimsGenerated,
    string? ErrorCode,
    string? ErrorMessage,
    int? RetryAfterSeconds,
    string? IdempotencyKey = null)
{
    /// <summary>
    /// Builds a <see cref="CheckpointMutationType.Heartbeat"/> mutation, optionally carrying
    /// a cursor and artifact details.
    /// </summary>
    public static CheckpointMutation Heartbeat(
        Guid runId,
        DateTimeOffset timestamp,
        string? cursor = null,
        string? artifactHash = null,
        string? artifactKind = null,
        string? idempotencyKey = null)
    {
        return new CheckpointMutation(
            Type: CheckpointMutationType.Heartbeat,
            RunId: runId,
            Timestamp: timestamp,
            Cursor: cursor,
            ArtifactHash: artifactHash,
            ArtifactKind: artifactKind,
            DocumentsProcessed: null,
            ClaimsGenerated: null,
            ErrorCode: null,
            ErrorMessage: null,
            RetryAfterSeconds: null,
            IdempotencyKey: idempotencyKey);
    }

    /// <summary>
    /// Builds a <see cref="CheckpointMutationType.CursorUpdate"/> mutation with the new cursor
    /// and an optional processed-document count.
    /// </summary>
    public static CheckpointMutation CursorUpdate(
        Guid runId,
        DateTimeOffset timestamp,
        string cursor,
        int? documentsProcessed = null,
        string? idempotencyKey = null)
    {
        return new CheckpointMutation(
            Type: CheckpointMutationType.CursorUpdate,
            RunId: runId,
            Timestamp: timestamp,
            Cursor: cursor,
            ArtifactHash: null,
            ArtifactKind: null,
            DocumentsProcessed: documentsProcessed,
            ClaimsGenerated: null,
            ErrorCode: null,
            ErrorMessage: null,
            RetryAfterSeconds: null,
            IdempotencyKey: idempotencyKey);
    }

    /// <summary>
    /// Builds a <see cref="CheckpointMutationType.Completed"/> mutation with the run's
    /// document and claim counts.
    /// </summary>
    public static CheckpointMutation Completed(
        Guid runId,
        DateTimeOffset timestamp,
        string? cursor,
        int documentsProcessed,
        int claimsGenerated,
        string? artifactHash = null,
        string? idempotencyKey = null)
    {
        return new CheckpointMutation(
            Type: CheckpointMutationType.Completed,
            RunId: runId,
            Timestamp: timestamp,
            Cursor: cursor,
            ArtifactHash: artifactHash,
            ArtifactKind: null,
            DocumentsProcessed: documentsProcessed,
            ClaimsGenerated: claimsGenerated,
            ErrorCode: null,
            ErrorMessage: null,
            RetryAfterSeconds: null,
            IdempotencyKey: idempotencyKey);
    }

    /// <summary>
    /// Builds a <see cref="CheckpointMutationType.Failed"/> mutation with the error details
    /// and an optional retry delay in seconds.
    /// </summary>
    public static CheckpointMutation Failed(
        Guid runId,
        DateTimeOffset timestamp,
        string errorCode,
        string? errorMessage = null,
        int? retryAfterSeconds = null,
        string? cursor = null,
        string? idempotencyKey = null)
    {
        return new CheckpointMutation(
            Type: CheckpointMutationType.Failed,
            RunId: runId,
            Timestamp: timestamp,
            Cursor: cursor,
            ArtifactHash: null,
            ArtifactKind: null,
            DocumentsProcessed: null,
            ClaimsGenerated: null,
            ErrorCode: errorCode,
            ErrorMessage: errorMessage,
            RetryAfterSeconds: retryAfterSeconds,
            IdempotencyKey: idempotencyKey);
    }

    /// <summary>
    /// Builds a <see cref="CheckpointMutationType.Artifact"/> mutation recording an artifact
    /// hash and kind.
    /// </summary>
    public static CheckpointMutation Artifact(
        Guid runId,
        DateTimeOffset timestamp,
        string artifactHash,
        string artifactKind,
        string? idempotencyKey = null)
    {
        return new CheckpointMutation(
            Type: CheckpointMutationType.Artifact,
            RunId: runId,
            Timestamp: timestamp,
            Cursor: null,
            ArtifactHash: artifactHash,
            ArtifactKind: artifactKind,
            DocumentsProcessed: null,
            ClaimsGenerated: null,
            ErrorCode: null,
            ErrorMessage: null,
            RetryAfterSeconds: null,
            IdempotencyKey: idempotencyKey);
    }
}
|
||||
|
||||
/// <summary>
/// Kinds of mutation that can be appended to a connector's checkpoint log.
/// </summary>
public enum CheckpointMutationType
{
    /// <summary>A run was started.</summary>
    Started = 0,

    /// <summary>Heartbeat or progress update from a run.</summary>
    Heartbeat = 1,

    /// <summary>The checkpoint cursor was updated.</summary>
    CursorUpdate = 2,

    /// <summary>An artifact was recorded.</summary>
    Artifact = 3,

    /// <summary>The run completed successfully.</summary>
    Completed = 4,

    /// <summary>The run failed.</summary>
    Failed = 5
}
|
||||
|
||||
/// <summary>
/// A checkpoint mutation as persisted in the log: the same fields as
/// <see cref="CheckpointMutation"/> plus the sequence number it was stored under.
/// </summary>
/// <param name="SequenceNumber">Sequence number of the persisted mutation.</param>
/// <param name="Type">Kind of mutation.</param>
/// <param name="RunId">Identifier of the run that produced the mutation.</param>
/// <param name="Timestamp">Timestamp carried by the mutation.</param>
/// <param name="Cursor">Checkpoint cursor, when the mutation carries one.</param>
/// <param name="ArtifactHash">Artifact hash, when the mutation carries one.</param>
/// <param name="ArtifactKind">Artifact kind, when the mutation carries one.</param>
/// <param name="DocumentsProcessed">Documents processed, when reported.</param>
/// <param name="ClaimsGenerated">Claims generated, when reported.</param>
/// <param name="ErrorCode">Error code, when the mutation carries one.</param>
/// <param name="ErrorMessage">Error message, when the mutation carries one.</param>
/// <param name="RetryAfterSeconds">Retry delay in seconds, when the mutation carries one.</param>
/// <param name="IdempotencyKey">Idempotency key supplied with the mutation, if any.</param>
public sealed record CheckpointMutationEvent(
    long SequenceNumber,
    CheckpointMutationType Type,
    Guid RunId,
    DateTimeOffset Timestamp,
    string? Cursor,
    string? ArtifactHash,
    string? ArtifactKind,
    int? DocumentsProcessed,
    int? ClaimsGenerated,
    string? ErrorCode,
    string? ErrorMessage,
    int? RetryAfterSeconds,
    string? IdempotencyKey);
|
||||
|
||||
/// <summary>
/// Current checkpoint state (derived from mutation log).
/// </summary>
/// <remarks>
/// Instances are immutable: <see cref="Apply"/> returns a new state and leaves the receiver unchanged.
/// </remarks>
public sealed record CheckpointState
{
    /// <summary>
    /// Creates a state from its individual components.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="connectorId"/> is null.</exception>
    public CheckpointState(
        string connectorId,
        string? cursor,
        DateTimeOffset lastUpdated,
        Guid? lastRunId,
        CheckpointMutationType? lastMutationType,
        string? lastArtifactHash,
        string? lastArtifactKind,
        int totalDocumentsProcessed,
        int totalClaimsGenerated,
        int successCount,
        int failureCount,
        string? lastErrorCode,
        DateTimeOffset? nextEligibleRun,
        long latestSequenceNumber)
    {
        ConnectorId = connectorId ?? throw new ArgumentNullException(nameof(connectorId));
        Cursor = cursor;
        LastUpdated = lastUpdated;
        LastRunId = lastRunId;
        LastMutationType = lastMutationType;
        LastArtifactHash = lastArtifactHash;
        LastArtifactKind = lastArtifactKind;
        TotalDocumentsProcessed = totalDocumentsProcessed;
        TotalClaimsGenerated = totalClaimsGenerated;
        SuccessCount = successCount;
        FailureCount = failureCount;
        LastErrorCode = lastErrorCode;
        NextEligibleRun = nextEligibleRun;
        LatestSequenceNumber = latestSequenceNumber;
    }

    /// <summary>
    /// Connector identifier.
    /// </summary>
    public string ConnectorId { get; }

    /// <summary>
    /// Current checkpoint cursor.
    /// </summary>
    public string? Cursor { get; }

    /// <summary>
    /// When the checkpoint was last updated.
    /// </summary>
    public DateTimeOffset LastUpdated { get; }

    /// <summary>
    /// Last run ID.
    /// </summary>
    public Guid? LastRunId { get; }

    /// <summary>
    /// Last mutation type.
    /// </summary>
    public CheckpointMutationType? LastMutationType { get; }

    /// <summary>
    /// Last artifact hash.
    /// </summary>
    public string? LastArtifactHash { get; }

    /// <summary>
    /// Last artifact kind.
    /// </summary>
    public string? LastArtifactKind { get; }

    /// <summary>
    /// Total documents processed across all runs.
    /// </summary>
    public int TotalDocumentsProcessed { get; }

    /// <summary>
    /// Total claims generated across all runs.
    /// </summary>
    public int TotalClaimsGenerated { get; }

    /// <summary>
    /// Number of successful runs.
    /// </summary>
    public int SuccessCount { get; }

    /// <summary>
    /// Number of failed runs.
    /// </summary>
    public int FailureCount { get; }

    /// <summary>
    /// Last error code (from most recent failure).
    /// </summary>
    public string? LastErrorCode { get; }

    /// <summary>
    /// When the connector is next eligible to run.
    /// </summary>
    public DateTimeOffset? NextEligibleRun { get; }

    /// <summary>
    /// Latest sequence number in the mutation log.
    /// </summary>
    public long LatestSequenceNumber { get; }

    /// <summary>
    /// Whether the connector is eligible to run now: true when no
    /// <see cref="NextEligibleRun"/> is set or <paramref name="now"/> has reached it.
    /// </summary>
    public bool IsEligibleToRun(DateTimeOffset now)
        => NextEligibleRun is null || now >= NextEligibleRun.Value;

    /// <summary>
    /// Empty state with an empty connector identifier. The instance is immutable and shared.
    /// </summary>
    public static CheckpointState Empty { get; } = Initial(string.Empty);

    /// <summary>
    /// Creates a new state for a connector with no cursor, zero counters and sequence number 0.
    /// </summary>
    public static CheckpointState Initial(string connectorId) => new(
        connectorId,
        cursor: null,
        lastUpdated: DateTimeOffset.MinValue,
        lastRunId: null,
        lastMutationType: null,
        lastArtifactHash: null,
        lastArtifactKind: null,
        totalDocumentsProcessed: 0,
        totalClaimsGenerated: 0,
        successCount: 0,
        failureCount: 0,
        lastErrorCode: null,
        nextEligibleRun: null,
        latestSequenceNumber: 0);

    /// <summary>
    /// Applies a mutation to produce a new state.
    /// </summary>
    /// <param name="mutation">The persisted mutation to fold into this state.</param>
    /// <returns>
    /// A new state whose last-updated time, run id, mutation type and sequence number come from
    /// <paramref name="mutation"/>; cursor and artifact fields are replaced only when the mutation
    /// carries a non-null value.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="mutation"/> is null.</exception>
    public CheckpointState Apply(CheckpointMutationEvent mutation)
    {
        ArgumentNullException.ThrowIfNull(mutation);

        var isCompleted = mutation.Type == CheckpointMutationType.Completed;
        var isFailed = mutation.Type == CheckpointMutationType.Failed;

        // A failure with a retry delay pushes eligibility out from the mutation's timestamp,
        // a completion clears it, and every other mutation leaves it as it was.
        DateTimeOffset? nextEligible;
        if (isFailed && mutation.RetryAfterSeconds.HasValue)
        {
            nextEligible = mutation.Timestamp.AddSeconds(mutation.RetryAfterSeconds.Value);
        }
        else if (isCompleted)
        {
            nextEligible = null;
        }
        else
        {
            nextEligible = NextEligibleRun;
        }

        return new CheckpointState(
            ConnectorId.Length > 0 ? ConnectorId : "unknown",
            mutation.Cursor ?? Cursor,
            mutation.Timestamp,
            mutation.RunId,
            mutation.Type,
            mutation.ArtifactHash ?? LastArtifactHash,
            mutation.ArtifactKind ?? LastArtifactKind,
            TotalDocumentsProcessed + (mutation.DocumentsProcessed ?? 0),
            TotalClaimsGenerated + (mutation.ClaimsGenerated ?? 0),
            isCompleted ? SuccessCount + 1 : SuccessCount,
            isFailed ? FailureCount + 1 : FailureCount,
            isFailed ? mutation.ErrorCode : LastErrorCode,
            nextEligible,
            mutation.SequenceNumber);
    }
}
|
||||
@@ -7,6 +7,7 @@ using System.Linq;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using StellaOps.Excititor.Core.Evidence;
|
||||
using StellaOps.Excititor.Core.Observations;
|
||||
|
||||
namespace StellaOps.Excititor.Core.Storage;
|
||||
@@ -708,3 +709,105 @@ public sealed class InMemoryVexObservationStore : IVexObservationStore
|
||||
return ValueTask.FromResult((long)count);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// In-memory attestation store for development and testing while Postgres backing is implemented.
/// </summary>
/// <remarks>
/// Attestations are keyed by tenant and attestation identifier; keys are trimmed and compared
/// case-insensitively, so saving and looking up agree on the same entry.
/// </remarks>
public sealed class InMemoryVexAttestationStore : IVexAttestationStore
{
    private readonly ConcurrentDictionary<string, VexStoredAttestation> _attestations = new(StringComparer.OrdinalIgnoreCase);

    /// <summary>
    /// Stores the attestation, replacing any existing entry with the same tenant and attestation id.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="attestation"/> is null.</exception>
    public ValueTask SaveAsync(VexStoredAttestation attestation, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(attestation);
        cancellationToken.ThrowIfCancellationRequested();

        _attestations[CreateKey(attestation.Tenant, attestation.AttestationId)] = attestation;
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Looks up an attestation by tenant and attestation id.
    /// </summary>
    /// <returns>The stored attestation, or null when either argument is blank or nothing matches.</returns>
    public ValueTask<VexStoredAttestation?> FindByIdAsync(string tenant, string attestationId, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        if (string.IsNullOrWhiteSpace(tenant) || string.IsNullOrWhiteSpace(attestationId))
        {
            return ValueTask.FromResult<VexStoredAttestation?>(null);
        }

        _attestations.TryGetValue(CreateKey(tenant, attestationId), out var attestation);
        return ValueTask.FromResult<VexStoredAttestation?>(attestation);
    }

    /// <summary>
    /// Looks up an attestation by tenant and manifest id.
    /// </summary>
    /// <returns>
    /// The matching attestation, or null when either argument is blank or nothing matches.
    /// When several attestations share the manifest, the most recently attested one is returned
    /// (ties broken by attestation id) so the result does not depend on dictionary enumeration order.
    /// </returns>
    public ValueTask<VexStoredAttestation?> FindByManifestIdAsync(string tenant, string manifestId, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        if (string.IsNullOrWhiteSpace(tenant) || string.IsNullOrWhiteSpace(manifestId))
        {
            return ValueTask.FromResult<VexStoredAttestation?>(null);
        }

        var normalizedTenant = tenant.Trim();
        var normalizedManifestId = manifestId.Trim();

        var result = _attestations.Values
            .Where(a => string.Equals(a.Tenant, normalizedTenant, StringComparison.OrdinalIgnoreCase)
                && string.Equals(a.ManifestId, normalizedManifestId, StringComparison.OrdinalIgnoreCase))
            .OrderByDescending(a => a.AttestedAt)
            .ThenBy(a => a.AttestationId, StringComparer.Ordinal)
            .FirstOrDefault();

        return ValueTask.FromResult<VexStoredAttestation?>(result);
    }

    /// <summary>
    /// Lists a tenant's attestations, newest first (ties broken by attestation id), applying the
    /// query's optional inclusive <c>Since</c>/<c>Until</c> bounds and its offset/limit paging.
    /// </summary>
    /// <returns>The requested page, the total number of matches, and whether more items follow the page.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="query"/> is null.</exception>
    public ValueTask<VexAttestationListResult> ListAsync(VexAttestationQuery query, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(query);
        cancellationToken.ThrowIfCancellationRequested();

        var filtered = _attestations.Values
            .Where(a => string.Equals(a.Tenant, query.Tenant, StringComparison.OrdinalIgnoreCase));

        if (query.Since.HasValue)
        {
            filtered = filtered.Where(a => a.AttestedAt >= query.Since.Value);
        }

        if (query.Until.HasValue)
        {
            filtered = filtered.Where(a => a.AttestedAt <= query.Until.Value);
        }

        var ordered = filtered
            .OrderByDescending(a => a.AttestedAt)
            .ThenBy(a => a.AttestationId, StringComparer.Ordinal)
            .ToList();

        // Skip/Take already treat negative values as zero; clamp explicitly so the
        // hasMore arithmetic below uses the offset that was actually applied.
        var offset = Math.Max(0, query.Offset);
        var limit = Math.Max(0, query.Limit);

        var totalCount = ordered.Count;
        var items = ordered
            .Skip(offset)
            .Take(limit)
            .ToList();

        var hasMore = offset + items.Count < totalCount;

        return ValueTask.FromResult(new VexAttestationListResult(items, totalCount, hasMore));
    }

    /// <summary>
    /// Counts the attestations stored for a tenant; returns 0 for a blank tenant.
    /// </summary>
    public ValueTask<int> CountAsync(string tenant, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        if (string.IsNullOrWhiteSpace(tenant))
        {
            return ValueTask.FromResult(0);
        }

        var normalizedTenant = tenant.Trim();
        var count = _attestations.Values
            .Count(a => string.Equals(a.Tenant, normalizedTenant, StringComparison.OrdinalIgnoreCase));

        return ValueTask.FromResult(count);
    }

    // Trimming happens here so that SaveAsync and FindByIdAsync always derive the same key;
    // case is handled by the dictionary's OrdinalIgnoreCase comparer.
    private static string CreateKey(string tenant, string attestationId)
        => $"{tenant?.Trim()}|{attestationId?.Trim()}";
}
|
||||
|
||||
@@ -0,0 +1,478 @@
|
||||
using System.Collections.Immutable;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
using StellaOps.Excititor.Core.Storage;
|
||||
using StellaOps.Infrastructure.Postgres.Repositories;
|
||||
|
||||
namespace StellaOps.Excititor.Storage.Postgres.Repositories;
|
||||
|
||||
/// <summary>
|
||||
/// PostgreSQL-backed append-only checkpoint store for deterministic connector state persistence.
|
||||
/// Per EXCITITOR-ORCH-32/33: Deterministic checkpoint persistence using Postgres append-only store.
|
||||
/// </summary>
|
||||
public sealed class PostgresAppendOnlyCheckpointStore : RepositoryBase<ExcititorDataSource>, IAppendOnlyCheckpointStore
|
||||
{
|
||||
private volatile bool _initialized;
|
||||
private readonly SemaphoreSlim _initLock = new(1, 1);
|
||||
|
||||
/// <summary>
/// Creates the store; both arguments are passed straight to the base repository.
/// </summary>
/// <param name="dataSource">Data source used to open tenant-scoped connections.</param>
/// <param name="logger">Logger handed to the base repository.</param>
public PostgresAppendOnlyCheckpointStore(
    ExcititorDataSource dataSource,
    ILogger<PostgresAppendOnlyCheckpointStore> logger)
    : base(dataSource, logger) { }
|
||||
|
||||
/// <summary>
/// Appends a checkpoint mutation to the log, refreshes the materialized state and returns it.
/// </summary>
/// <remarks>
/// When the mutation carries an idempotency key that is already in the log for this tenant and
/// connector, nothing is inserted and a duplicate result with the existing sequence number is returned.
/// </remarks>
/// <param name="tenant">Tenant identifier; must not be blank.</param>
/// <param name="connectorId">Connector identifier; must not be blank.</param>
/// <param name="mutation">Mutation to append. Its error message is truncated to 512 characters before storage.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The append result with the assigned (or existing) sequence number and the current state.</returns>
/// <exception cref="ArgumentException"><paramref name="tenant"/> or <paramref name="connectorId"/> is blank.</exception>
/// <exception cref="ArgumentNullException"><paramref name="mutation"/> is null.</exception>
/// <exception cref="InvalidOperationException">The insert did not return a sequence number.</exception>
public async ValueTask<AppendCheckpointResult> AppendAsync(
    string tenant,
    string connectorId,
    CheckpointMutation mutation,
    CancellationToken cancellationToken)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tenant);
    ArgumentException.ThrowIfNullOrWhiteSpace(connectorId);
    ArgumentNullException.ThrowIfNull(mutation);

    await EnsureTablesAsync(cancellationToken).ConfigureAwait(false);

    var idempotencyKey = mutation.IdempotencyKey;

    // Check for idempotency (duplicate mutation)
    if (!string.IsNullOrEmpty(idempotencyKey))
    {
        var existing = await FindByIdempotencyKeyAsync(tenant, connectorId, idempotencyKey, cancellationToken)
            .ConfigureAwait(false);
        if (existing is not null)
        {
            var currentState = await GetCurrentStateAsync(tenant, connectorId, cancellationToken).ConfigureAwait(false)
                ?? CheckpointState.Initial(connectorId);
            return AppendCheckpointResult.Duplicate(existing.SequenceNumber, currentState);
        }
    }

    // Insert mutation (sequence is auto-generated)
    const string insertSql = """
        INSERT INTO vex.checkpoint_mutations (
        tenant_id, connector_id, mutation_type, run_id, timestamp,
        cursor, artifact_hash, artifact_kind,
        documents_processed, claims_generated,
        error_code, error_message, retry_after_seconds,
        idempotency_key)
        VALUES (
        @tenant_id, @connector_id, @mutation_type, @run_id, @timestamp,
        @cursor, @artifact_hash, @artifact_kind,
        @documents_processed, @claims_generated,
        @error_code, @error_message, @retry_after_seconds,
        @idempotency_key)
        RETURNING sequence_number;
        """;

    long sequenceNumber;
    try
    {
        // The connection is scoped to this block so it is returned to the pool before the
        // helpers below open their own connections; holding it across them would need two
        // pooled connections per append.
        await using var connection = await DataSource.OpenConnectionAsync(tenant, "writer", cancellationToken).ConfigureAwait(false);
        await using var command = CreateCommand(insertSql, connection);
        AddParameter(command, "tenant_id", tenant);
        AddParameter(command, "connector_id", connectorId);
        AddParameter(command, "mutation_type", mutation.Type.ToString());
        AddParameter(command, "run_id", mutation.RunId);
        AddParameter(command, "timestamp", mutation.Timestamp.UtcDateTime);
        AddParameter(command, "cursor", mutation.Cursor);
        AddParameter(command, "artifact_hash", mutation.ArtifactHash);
        AddParameter(command, "artifact_kind", mutation.ArtifactKind);
        AddParameter(command, "documents_processed", mutation.DocumentsProcessed);
        AddParameter(command, "claims_generated", mutation.ClaimsGenerated);
        AddParameter(command, "error_code", mutation.ErrorCode);
        AddParameter(command, "error_message", Truncate(mutation.ErrorMessage, 512));
        AddParameter(command, "retry_after_seconds", mutation.RetryAfterSeconds);
        AddParameter(command, "idempotency_key", mutation.IdempotencyKey);

        var scalar = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);

        // Avoid a raw unboxing cast: it throws InvalidCastException for any integer width
        // other than bigint and NullReferenceException when no row comes back.
        sequenceNumber = scalar switch
        {
            long value => value,
            null or DBNull => throw new InvalidOperationException(
                "Checkpoint mutation insert did not return a sequence number."),
            _ => Convert.ToInt64(scalar, System.Globalization.CultureInfo.InvariantCulture),
        };
    }
    catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.UniqueViolation)
    {
        // A concurrent append with the same idempotency key can pass the check above and win
        // the insert. Report that case as a duplicate instead of surfacing the violation.
        // NOTE(review): this only triggers if the schema enforces uniqueness on the idempotency
        // key; the table definition is not visible here, so confirm.
        if (string.IsNullOrEmpty(idempotencyKey))
        {
            throw;
        }

        var winner = await FindByIdempotencyKeyAsync(tenant, connectorId, idempotencyKey, cancellationToken)
            .ConfigureAwait(false);
        if (winner is null)
        {
            throw;
        }

        var racedState = await GetCurrentStateAsync(tenant, connectorId, cancellationToken).ConfigureAwait(false)
            ?? CheckpointState.Initial(connectorId);
        return AppendCheckpointResult.Duplicate(winner.SequenceNumber, racedState);
    }

    // Update materialized state
    await UpdateMaterializedStateAsync(tenant, connectorId, cancellationToken).ConfigureAwait(false);

    var newState = await GetCurrentStateAsync(tenant, connectorId, cancellationToken).ConfigureAwait(false)
        ?? CheckpointState.Initial(connectorId);

    return AppendCheckpointResult.Appended(sequenceNumber, newState);
}
|
||||
|
||||
/// <summary>
/// Reads the materialized checkpoint state row for a tenant and connector.
/// </summary>
/// <param name="tenant">Tenant identifier; must not be blank.</param>
/// <param name="connectorId">Connector identifier; must not be blank.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The mapped state, or null when no row exists.</returns>
/// <exception cref="ArgumentException"><paramref name="tenant"/> or <paramref name="connectorId"/> is blank.</exception>
public async ValueTask<CheckpointState?> GetCurrentStateAsync(
    string tenant,
    string connectorId,
    CancellationToken cancellationToken)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tenant);
    ArgumentException.ThrowIfNullOrWhiteSpace(connectorId);

    await EnsureTablesAsync(cancellationToken).ConfigureAwait(false);

    const string selectStateSql = """
        SELECT connector_id, cursor, last_updated, last_run_id, last_mutation_type,
        last_artifact_hash, last_artifact_kind,
        total_documents_processed, total_claims_generated,
        success_count, failure_count, last_error_code,
        next_eligible_run, latest_sequence_number
        FROM vex.checkpoint_states
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id;
        """;

    await using var conn = await DataSource.OpenConnectionAsync(tenant, "reader", cancellationToken).ConfigureAwait(false);
    await using var cmd = CreateCommand(selectStateSql, conn);
    AddParameter(cmd, "tenant_id", tenant);
    AddParameter(cmd, "connector_id", connectorId);

    await using var row = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
    var found = await row.ReadAsync(cancellationToken).ConfigureAwait(false);

    return found ? MapState(row) : null;
}
|
||||
|
||||
/// <summary>
/// Reads a page of the mutation log for a tenant and connector in ascending sequence order.
/// </summary>
/// <param name="tenant">Tenant identifier; must not be blank.</param>
/// <param name="connectorId">Connector identifier; must not be blank.</param>
/// <param name="sinceSequence">When set, only mutations with a greater sequence number are returned.</param>
/// <param name="limit">Maximum number of rows; clamped to the range 1..1000.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The mapped mutations, possibly empty.</returns>
/// <exception cref="ArgumentException"><paramref name="tenant"/> or <paramref name="connectorId"/> is blank.</exception>
public async ValueTask<IReadOnlyList<CheckpointMutationEvent>> GetMutationLogAsync(
    string tenant,
    string connectorId,
    long? sinceSequence,
    int limit,
    CancellationToken cancellationToken)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tenant);
    ArgumentException.ThrowIfNullOrWhiteSpace(connectorId);
    var pageSize = Math.Clamp(limit, 1, 1000);

    await EnsureTablesAsync(cancellationToken).ConfigureAwait(false);

    const string selectSql = """
        SELECT sequence_number, mutation_type, run_id, timestamp,
        cursor, artifact_hash, artifact_kind,
        documents_processed, claims_generated,
        error_code, error_message, retry_after_seconds,
        idempotency_key
        FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id
        """;

    // The sequence filter is only added to the statement when a lower bound was supplied.
    var sinceFilter = sinceSequence.HasValue ? " AND sequence_number > @since_sequence" : string.Empty;
    var sql = selectSql + sinceFilter + " ORDER BY sequence_number ASC LIMIT @limit;";

    await using var conn = await DataSource.OpenConnectionAsync(tenant, "reader", cancellationToken).ConfigureAwait(false);
    await using var cmd = CreateCommand(sql, conn);
    AddParameter(cmd, "tenant_id", tenant);
    AddParameter(cmd, "connector_id", connectorId);
    AddParameter(cmd, "limit", pageSize);
    if (sinceSequence is { } afterSequence)
    {
        AddParameter(cmd, "since_sequence", afterSequence);
    }

    await using var rows = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);

    var log = new List<CheckpointMutationEvent>();
    while (await rows.ReadAsync(cancellationToken).ConfigureAwait(false))
    {
        log.Add(MapMutation(rows));
    }

    return log;
}
|
||||
|
||||
/// <summary>
/// Rebuilds the checkpoint state by applying, in ascending sequence order, every logged
/// mutation whose sequence number is at most <paramref name="upToSequence"/>.
/// </summary>
/// <param name="tenant">Tenant identifier; must not be blank.</param>
/// <param name="connectorId">Connector identifier; must not be blank.</param>
/// <param name="upToSequence">Inclusive upper bound on the sequence numbers to replay.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The replayed state, or null when its latest sequence number is not positive.</returns>
/// <exception cref="ArgumentException"><paramref name="tenant"/> or <paramref name="connectorId"/> is blank.</exception>
public async ValueTask<CheckpointState?> ReplayToSequenceAsync(
    string tenant,
    string connectorId,
    long upToSequence,
    CancellationToken cancellationToken)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tenant);
    ArgumentException.ThrowIfNullOrWhiteSpace(connectorId);

    await EnsureTablesAsync(cancellationToken).ConfigureAwait(false);

    const string replaySql = """
        SELECT sequence_number, mutation_type, run_id, timestamp,
        cursor, artifact_hash, artifact_kind,
        documents_processed, claims_generated,
        error_code, error_message, retry_after_seconds,
        idempotency_key
        FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id
        AND sequence_number <= @up_to_sequence
        ORDER BY sequence_number ASC;
        """;

    await using var conn = await DataSource.OpenConnectionAsync(tenant, "reader", cancellationToken).ConfigureAwait(false);
    await using var cmd = CreateCommand(replaySql, conn);
    AddParameter(cmd, "tenant_id", tenant);
    AddParameter(cmd, "connector_id", connectorId);
    AddParameter(cmd, "up_to_sequence", upToSequence);

    await using var rows = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);

    // Fold the log into a state, starting from the connector's initial state.
    var replayed = CheckpointState.Initial(connectorId);
    while (await rows.ReadAsync(cancellationToken).ConfigureAwait(false))
    {
        replayed = replayed.Apply(MapMutation(rows));
    }

    if (replayed.LatestSequenceNumber > 0)
    {
        return replayed;
    }

    return null;
}
|
||||
|
||||
/// <summary>
/// Looks up a logged mutation by tenant, connector and idempotency key.
/// </summary>
/// <param name="tenant">Tenant identifier.</param>
/// <param name="connectorId">Connector identifier.</param>
/// <param name="idempotencyKey">Idempotency key to match exactly.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The first matching mutation returned by the query, or null when none exists.</returns>
private async ValueTask<CheckpointMutationEvent?> FindByIdempotencyKeyAsync(
    string tenant,
    string connectorId,
    string idempotencyKey,
    CancellationToken cancellationToken)
{
    const string lookupSql = """
        SELECT sequence_number, mutation_type, run_id, timestamp,
        cursor, artifact_hash, artifact_kind,
        documents_processed, claims_generated,
        error_code, error_message, retry_after_seconds,
        idempotency_key
        FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND idempotency_key = @idempotency_key;
        """;

    await using var conn = await DataSource.OpenConnectionAsync(tenant, "reader", cancellationToken).ConfigureAwait(false);
    await using var cmd = CreateCommand(lookupSql, conn);
    AddParameter(cmd, "tenant_id", tenant);
    AddParameter(cmd, "connector_id", connectorId);
    AddParameter(cmd, "idempotency_key", idempotencyKey);

    await using var row = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
    var found = await row.ReadAsync(cancellationToken).ConfigureAwait(false);

    return found ? MapMutation(row) : null;
}
|
||||
|
||||
/// <summary>
/// Recomputes the <c>vex.checkpoint_states</c> row for a tenant/connector pair from
/// <c>vex.checkpoint_mutations</c> and upserts it in a single statement.
/// </summary>
/// <param name="tenant">Tenant used to open the writer connection and to filter the mutation log.</param>
/// <param name="connectorId">Connector whose materialized state is refreshed.</param>
/// <param name="cancellationToken">Token forwarded to every database call.</param>
private async ValueTask UpdateMaterializedStateAsync(
    string tenant,
    string connectorId,
    CancellationToken cancellationToken)
{
    await using var writerConnection = await DataSource
        .OpenConnectionAsync(tenant, "writer", cancellationToken)
        .ConfigureAwait(false);

    // Each projected column is its own scalar sub-query over the mutation log; on a
    // (tenant_id, connector_id) conflict every column is overwritten with the recomputed value.
    const string upsertSql = """
        INSERT INTO vex.checkpoint_states (
        tenant_id, connector_id, cursor, last_updated, last_run_id, last_mutation_type,
        last_artifact_hash, last_artifact_kind,
        total_documents_processed, total_claims_generated,
        success_count, failure_count, last_error_code,
        next_eligible_run, latest_sequence_number)
        SELECT
        @tenant_id,
        @connector_id,
        (SELECT cursor FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND cursor IS NOT NULL
        ORDER BY sequence_number DESC LIMIT 1),
        (SELECT MAX(timestamp) FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id),
        (SELECT run_id FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id
        ORDER BY sequence_number DESC LIMIT 1),
        (SELECT mutation_type FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id
        ORDER BY sequence_number DESC LIMIT 1),
        (SELECT artifact_hash FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND artifact_hash IS NOT NULL
        ORDER BY sequence_number DESC LIMIT 1),
        (SELECT artifact_kind FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND artifact_kind IS NOT NULL
        ORDER BY sequence_number DESC LIMIT 1),
        COALESCE((SELECT SUM(documents_processed) FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id), 0),
        COALESCE((SELECT SUM(claims_generated) FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id), 0),
        (SELECT COUNT(*) FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND mutation_type = 'Completed'),
        (SELECT COUNT(*) FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND mutation_type = 'Failed'),
        (SELECT error_code FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND mutation_type = 'Failed'
        ORDER BY sequence_number DESC LIMIT 1),
        (SELECT timestamp + (retry_after_seconds || ' seconds')::interval
        FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id AND mutation_type = 'Failed'
        AND retry_after_seconds IS NOT NULL
        ORDER BY sequence_number DESC LIMIT 1),
        (SELECT MAX(sequence_number) FROM vex.checkpoint_mutations
        WHERE tenant_id = @tenant_id AND connector_id = @connector_id)
        ON CONFLICT (tenant_id, connector_id) DO UPDATE SET
        cursor = EXCLUDED.cursor,
        last_updated = EXCLUDED.last_updated,
        last_run_id = EXCLUDED.last_run_id,
        last_mutation_type = EXCLUDED.last_mutation_type,
        last_artifact_hash = EXCLUDED.last_artifact_hash,
        last_artifact_kind = EXCLUDED.last_artifact_kind,
        total_documents_processed = EXCLUDED.total_documents_processed,
        total_claims_generated = EXCLUDED.total_claims_generated,
        success_count = EXCLUDED.success_count,
        failure_count = EXCLUDED.failure_count,
        last_error_code = EXCLUDED.last_error_code,
        next_eligible_run = EXCLUDED.next_eligible_run,
        latest_sequence_number = EXCLUDED.latest_sequence_number;
        """;

    await using var upsert = CreateCommand(upsertSql, writerConnection);
    AddParameter(upsert, "tenant_id", tenant);
    AddParameter(upsert, "connector_id", connectorId);

    await upsert.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
|
||||
|
||||
/// <summary>
/// Builds a <see cref="CheckpointState"/> from the current row of <paramref name="reader"/>,
/// reading columns 0 through 13 by ordinal.
/// </summary>
/// <param name="reader">Reader positioned on the row to map.</param>
/// <returns>The state built from the row; NULL columns fall back to <c>null</c>, zero or <see cref="DateTimeOffset.MinValue"/>.</returns>
private CheckpointState MapState(NpgsqlDataReader reader)
{
    // Small ordinal-based readers; each one checks for NULL before touching the typed getter.
    string? TextAt(int ordinal) => reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
    int CountAt(int ordinal) => reader.IsDBNull(ordinal) ? 0 : reader.GetInt32(ordinal);
    DateTimeOffset? InstantAt(int ordinal) => reader.IsDBNull(ordinal)
        ? (DateTimeOffset?)null
        : new DateTimeOffset(reader.GetDateTime(ordinal), TimeSpan.Zero);

    // Columns are read in ascending ordinal order.
    var id = reader.GetString(0);
    var position = TextAt(1);
    var updatedAt = InstantAt(2) ?? DateTimeOffset.MinValue;
    Guid? runId = reader.IsDBNull(3) ? (Guid?)null : reader.GetGuid(3);

    // A NULL or empty mutation type maps to no value; anything else must be a valid enum name.
    var mutationTypeText = TextAt(4);
    CheckpointMutationType? mutationType = string.IsNullOrEmpty(mutationTypeText)
        ? (CheckpointMutationType?)null
        : Enum.Parse<CheckpointMutationType>(mutationTypeText);

    var artifactHash = TextAt(5);
    var artifactKind = TextAt(6);
    var documents = CountAt(7);
    var claims = CountAt(8);
    var successes = CountAt(9);
    var failures = CountAt(10);
    var errorCode = TextAt(11);
    var nextRun = InstantAt(12);
    var sequence = reader.IsDBNull(13) ? 0L : reader.GetInt64(13);

    return new CheckpointState(
        id,
        position,
        updatedAt,
        runId,
        mutationType,
        artifactHash,
        artifactKind,
        documents,
        claims,
        successes,
        failures,
        errorCode,
        nextRun,
        sequence);
}
|
||||
|
||||
/// <summary>
/// Builds a <see cref="CheckpointMutationEvent"/> from the current row of <paramref name="reader"/>,
/// reading columns 0 through 12 by ordinal.
/// </summary>
/// <param name="reader">Reader positioned on the row to map.</param>
/// <returns>The mutation event for the row; NULL optional columns become <c>null</c>.</returns>
private CheckpointMutationEvent MapMutation(NpgsqlDataReader reader)
{
    // Ordinal-based readers for the nullable columns.
    string? TextAt(int ordinal) => reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
    int? NumberAt(int ordinal) => reader.IsDBNull(ordinal) ? (int?)null : reader.GetInt32(ordinal);

    // The first four columns are read without a NULL check.
    var sequence = reader.GetInt64(0);
    var kind = Enum.Parse<CheckpointMutationType>(reader.GetString(1));
    var run = reader.GetGuid(2);
    var occurredAt = new DateTimeOffset(reader.GetDateTime(3), TimeSpan.Zero);

    return new CheckpointMutationEvent(
        SequenceNumber: sequence,
        Type: kind,
        RunId: run,
        Timestamp: occurredAt,
        Cursor: TextAt(4),
        ArtifactHash: TextAt(5),
        ArtifactKind: TextAt(6),
        DocumentsProcessed: NumberAt(7),
        ClaimsGenerated: NumberAt(8),
        ErrorCode: TextAt(9),
        ErrorMessage: TextAt(10),
        RetryAfterSeconds: NumberAt(11),
        IdempotencyKey: TextAt(12));
}
|
||||
|
||||
/// <summary>
/// Creates the <c>vex.checkpoint_mutations</c> and <c>vex.checkpoint_states</c> tables (and the
/// mutation-log indexes) if they do not exist, then sets <c>_initialized</c> so later calls return immediately.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the lock wait and every database call.</param>
private async ValueTask EnsureTablesAsync(CancellationToken cancellationToken)
{
    // Fast path: skip taking the lock once the schema has been created.
    if (_initialized)
    {
        return;
    }

    await _initLock.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Re-check under the lock; another caller may have finished while this one was waiting.
        if (!_initialized)
        {
            await using var systemConnection = await DataSource
                .OpenSystemConnectionAsync(cancellationToken)
                .ConfigureAwait(false);

            // Append-only mutation log, plus a lookup index and a partial unique index on the idempotency key.
            const string mutationLogDdl = """
                CREATE TABLE IF NOT EXISTS vex.checkpoint_mutations (
                sequence_number bigserial PRIMARY KEY,
                tenant_id text NOT NULL,
                connector_id text NOT NULL,
                mutation_type text NOT NULL,
                run_id uuid NOT NULL,
                timestamp timestamptz NOT NULL,
                cursor text,
                artifact_hash text,
                artifact_kind text,
                documents_processed integer,
                claims_generated integer,
                error_code text,
                error_message text,
                retry_after_seconds integer,
                idempotency_key text,
                created_at timestamptz NOT NULL DEFAULT now()
                );

                CREATE INDEX IF NOT EXISTS idx_checkpoint_mutations_tenant_connector
                ON vex.checkpoint_mutations (tenant_id, connector_id, sequence_number);

                CREATE UNIQUE INDEX IF NOT EXISTS idx_checkpoint_mutations_idempotency
                ON vex.checkpoint_mutations (tenant_id, connector_id, idempotency_key)
                WHERE idempotency_key IS NOT NULL;
                """;

            await using var createMutationLog = CreateCommand(mutationLogDdl, systemConnection);
            await createMutationLog.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);

            // Materialized per-connector state, keyed by (tenant_id, connector_id).
            const string materializedStateDdl = """
                CREATE TABLE IF NOT EXISTS vex.checkpoint_states (
                tenant_id text NOT NULL,
                connector_id text NOT NULL,
                cursor text,
                last_updated timestamptz,
                last_run_id uuid,
                last_mutation_type text,
                last_artifact_hash text,
                last_artifact_kind text,
                total_documents_processed integer NOT NULL DEFAULT 0,
                total_claims_generated integer NOT NULL DEFAULT 0,
                success_count integer NOT NULL DEFAULT 0,
                failure_count integer NOT NULL DEFAULT 0,
                last_error_code text,
                next_eligible_run timestamptz,
                latest_sequence_number bigint NOT NULL DEFAULT 0,
                PRIMARY KEY (tenant_id, connector_id)
                );
                """;

            await using var createMaterializedState = CreateCommand(materializedStateDdl, systemConnection);
            await createMaterializedState.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);

            // Only flagged after both statements succeed, so a failure is retried on the next call.
            _initialized = true;
        }
    }
    finally
    {
        _initLock.Release();
    }
}
|
||||
|
||||
/// <summary>
/// Limits <paramref name="value"/> to at most <paramref name="maxLength"/> UTF-16 code units
/// without splitting a surrogate pair.
/// </summary>
/// <param name="value">Text to shorten; <c>null</c> and empty strings are returned unchanged.</param>
/// <param name="maxLength">Maximum number of UTF-16 code units to keep.</param>
/// <returns>
/// <paramref name="value"/> itself when it already fits; otherwise a prefix of at most
/// <paramref name="maxLength"/> code units (one fewer when the cut would fall inside a surrogate pair).
/// </returns>
private static string? Truncate(string? value, int maxLength)
{
    if (string.IsNullOrEmpty(value) || value.Length <= maxLength)
    {
        return value;
    }

    // Cutting between a high and a low surrogate would leave an unpaired surrogate at the end
    // of the result, which is not valid Unicode text and cannot be encoded as valid UTF-8.
    // Drop the dangling high surrogate instead so the result stays well-formed.
    var cut = maxLength;
    if (cut > 0 && char.IsHighSurrogate(value[cut - 1]) && char.IsLowSurrogate(value[cut]))
    {
        cut--;
    }

    return value[..cut];
}
|
||||
}
|
||||
@@ -36,6 +36,9 @@ public static class ServiceCollectionExtensions
|
||||
services.AddScoped<IVexRawStore, PostgresVexRawStore>();
|
||||
services.AddScoped<IVexConnectorStateRepository, PostgresConnectorStateRepository>();
|
||||
|
||||
// Register append-only checkpoint store for deterministic persistence (EXCITITOR-ORCH-32/33)
|
||||
services.AddScoped<IAppendOnlyCheckpointStore, PostgresAppendOnlyCheckpointStore>();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
@@ -59,6 +62,9 @@ public static class ServiceCollectionExtensions
|
||||
services.AddScoped<IVexRawStore, PostgresVexRawStore>();
|
||||
services.AddScoped<IVexConnectorStateRepository, PostgresConnectorStateRepository>();
|
||||
|
||||
// Register append-only checkpoint store for deterministic persistence (EXCITITOR-ORCH-32/33)
|
||||
services.AddScoped<IAppendOnlyCheckpointStore, PostgresAppendOnlyCheckpointStore>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user