save progress

This commit is contained in:
StellaOps Bot
2026-01-02 15:52:31 +02:00
parent 2dec7e6a04
commit f46bde5575
174 changed files with 20793 additions and 8307 deletions

View File

@@ -0,0 +1,303 @@
using System.Globalization;
using System.Text.Json.Nodes;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Hashing;
namespace LedgerReplayHarness;
/// <summary>
/// Converts one raw fixture JSON object into a <c>LedgerEventDraft</c>.
/// Uses the Try-pattern throughout: malformed input yields <c>false</c> plus a
/// short error string instead of an exception.
/// </summary>
internal static class HarnessDraftParser
{
/// <summary>
/// Attempts to build a draft from <paramref name="node"/>.
/// Fallbacks: a missing/blank "tenant" uses <paramref name="defaultTenant"/>;
/// a missing "recorded_at" uses <paramref name="recordedAtBase"/>.
/// All other fields validated below are required.
/// </summary>
/// <param name="node">Parsed JSON object for a single fixture line.</param>
/// <param name="defaultTenant">Tenant id applied when the node omits "tenant".</param>
/// <param name="recordedAtBase">Timestamp applied when the node omits "recorded_at".</param>
/// <param name="draft">Resulting draft; only meaningful when the method returns true.</param>
/// <param name="error">Short failure reason; empty on success.</param>
/// <returns>True when every required field parsed successfully.</returns>
public static bool TryParseDraft(
JsonObject node,
string defaultTenant,
DateTimeOffset recordedAtBase,
out LedgerEventDraft draft,
out string error)
{
// Out params must be assigned on every path; draft is only valid when we return true.
draft = default!;
error = string.Empty;
// "tenant" is optional — silently fall back rather than failing.
if (!TryGetRequiredString(node, "tenant", out var tenantId))
{
tenantId = defaultTenant;
}
if (!TryGetRequiredGuid(node, "chain_id", out var chainId, out error))
{
return false;
}
// Sequence accepts either "sequence_no" or legacy "sequence" (see TryGetRequiredLong).
if (!TryGetRequiredLong(node, out var sequence, out error))
{
return false;
}
if (!TryGetRequiredGuid(node, "event_id", out var eventId, out error))
{
return false;
}
if (!TryGetRequiredString(node, "event_type", out var eventType))
{
error = "event_type missing";
return false;
}
if (!TryGetRequiredString(node, "policy_version", out var policyVersion))
{
error = "policy_version missing";
return false;
}
if (!TryGetRequiredString(node, "finding_id", out var findingId))
{
error = "finding_id missing";
return false;
}
if (!TryGetRequiredString(node, "artifact_id", out var artifactId))
{
error = "artifact_id missing";
return false;
}
// "source_run_id" is optional, but if present and non-blank it must be a valid GUID.
Guid? sourceRunId = null;
if (node.TryGetPropertyValue("source_run_id", out var sourceRunNode) &&
sourceRunNode is not null &&
TryGetString(sourceRunNode, out var sourceRunValue) &&
!string.IsNullOrWhiteSpace(sourceRunValue))
{
if (!Guid.TryParse(sourceRunValue, out var parsedSourceRun))
{
error = "source_run_id invalid";
return false;
}
sourceRunId = parsedSourceRun;
}
if (!TryGetRequiredString(node, "actor_id", out var actorId))
{
error = "actor_id missing";
return false;
}
if (!TryGetRequiredString(node, "actor_type", out var actorType))
{
error = "actor_type missing";
return false;
}
if (!TryGetRequiredDateTime(node, "occurred_at", out var occurredAt, out error))
{
return false;
}
// "recorded_at" is optional; when absent use the caller-supplied base timestamp,
// but when present it must parse cleanly.
DateTimeOffset recordedAt;
if (node.TryGetPropertyValue("recorded_at", out var recordedAtNode) && recordedAtNode is not null)
{
if (!TryGetDateTime(recordedAtNode, out recordedAt))
{
error = "recorded_at invalid";
return false;
}
}
else
{
recordedAt = recordedAtBase;
}
// Payload must be a JSON object (not a scalar/array).
if (!node.TryGetPropertyValue("payload", out var payloadNode) || payloadNode is not JsonObject payload)
{
error = "payload missing";
return false;
}
// Canonicalize via the project serializer (presumably produces a deterministic
// key-ordered form for hashing — confirm against LedgerCanonicalJsonSerializer).
// Any throw is converted into a Try-style failure carrying the exception type name.
JsonObject canonicalEnvelope;
try
{
canonicalEnvelope = LedgerCanonicalJsonSerializer.Canonicalize(payload);
}
catch (Exception ex)
{
error = $"payload canonicalize failed: {ex.GetType().Name}";
return false;
}
// "previous_hash" is optional; a missing/non-string value becomes null rather than an error.
var prev = node.TryGetPropertyValue("previous_hash", out var prevNode) && prevNode is not null && TryGetString(prevNode, out var prevValue)
? prevValue
: null;
draft = new LedgerEventDraft(
tenantId,
chainId,
sequence,
eventId,
eventType,
policyVersion,
findingId,
artifactId,
sourceRunId,
actorId,
actorType,
occurredAt,
recordedAt,
payload,
canonicalEnvelope,
prev);
return true;
}
/// <summary>
/// Reads property <paramref name="name"/> as a non-blank string.
/// Returns false when the property is missing, null, not a string, or whitespace-only.
/// </summary>
private static bool TryGetRequiredString(JsonObject node, string name, out string value)
{
value = string.Empty;
if (!node.TryGetPropertyValue(name, out var nodeValue) || nodeValue is null)
{
return false;
}
if (!TryGetString(nodeValue, out value))
{
return false;
}
return !string.IsNullOrWhiteSpace(value);
}
/// <summary>
/// Extracts a string from a JsonValue; non-string underlying values make
/// GetValue&lt;string&gt; throw, which is swallowed and reported as false.
/// </summary>
private static bool TryGetString(JsonNode node, out string value)
{
value = string.Empty;
if (node is JsonValue jsonValue)
{
try
{
value = jsonValue.GetValue<string>();
return true;
}
catch
{
return false;
}
}
return false;
}
/// <summary>
/// Reads property <paramref name="name"/> as a GUID, distinguishing
/// "missing" from "invalid" in the error string.
/// </summary>
private static bool TryGetRequiredGuid(JsonObject node, string name, out Guid value, out string error)
{
value = Guid.Empty;
error = string.Empty;
if (!TryGetRequiredString(node, name, out var stringValue))
{
error = $"{name} missing";
return false;
}
if (!Guid.TryParse(stringValue, out value))
{
error = $"{name} invalid";
return false;
}
return true;
}
/// <summary>
/// Reads the sequence number, preferring "sequence_no" and falling back to
/// "sequence". A present-but-unparsable value fails immediately (no fallback).
/// </summary>
private static bool TryGetRequiredLong(JsonObject node, out long value, out string error)
{
value = 0;
error = string.Empty;
if (node.TryGetPropertyValue("sequence_no", out var seqNode) && seqNode is not null)
{
if (TryGetLong(seqNode, out value))
{
return true;
}
error = "sequence_no invalid";
return false;
}
if (node.TryGetPropertyValue("sequence", out var altNode) && altNode is not null)
{
if (TryGetLong(altNode, out value))
{
return true;
}
error = "sequence invalid";
return false;
}
error = "sequence_no missing";
return false;
}
/// <summary>
/// Extracts a long from a JsonValue; TryGetValue can still throw for some
/// node shapes, so the call is additionally guarded by a catch.
/// </summary>
private static bool TryGetLong(JsonNode node, out long value)
{
value = 0;
if (node is JsonValue jsonValue)
{
try
{
return jsonValue.TryGetValue(out value);
}
catch
{
return false;
}
}
return false;
}
/// <summary>
/// Reads property <paramref name="name"/> as a required round-trip (ISO 8601)
/// timestamp, distinguishing "missing" from "invalid".
/// </summary>
private static bool TryGetRequiredDateTime(JsonObject node, string name, out DateTimeOffset value, out string error)
{
value = default;
error = string.Empty;
if (!TryGetRequiredString(node, name, out var stringValue))
{
error = $"{name} missing";
return false;
}
if (!DateTimeOffset.TryParse(stringValue, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out value))
{
error = $"{name} invalid";
return false;
}
return true;
}
/// <summary>
/// Extracts a DateTimeOffset from a JsonValue: first via the native
/// TryGetValue conversion, then by parsing the underlying string with
/// invariant-culture round-trip rules.
/// </summary>
private static bool TryGetDateTime(JsonNode node, out DateTimeOffset value)
{
value = default;
if (node is JsonValue jsonValue)
{
try
{
if (jsonValue.TryGetValue(out value))
{
return true;
}
}
catch
{
return false;
}
if (jsonValue.TryGetValue(out string? stringValue) &&
!string.IsNullOrWhiteSpace(stringValue) &&
DateTimeOffset.TryParse(stringValue, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out value))
{
return true;
}
}
return false;
}
}

View File

@@ -0,0 +1,14 @@
namespace LedgerReplayHarness;
/// <summary>
/// Raised when a replay fixture file contains a line that cannot be parsed.
/// The message is prefixed with "&lt;file name&gt;:&lt;line&gt; " so failures
/// point straight at the offending fixture line.
/// </summary>
public sealed class HarnessFixtureException : Exception
{
    public HarnessFixtureException(string fixturePath, int lineNumber, string message, Exception? innerException = null)
        : base(FormatMessage(fixturePath, lineNumber, message), innerException)
    {
        FixturePath = fixturePath;
        LineNumber = lineNumber;
    }

    /// <summary>Full path of the fixture file that failed to parse.</summary>
    public string FixturePath { get; }

    /// <summary>1-based line number of the offending line.</summary>
    public int LineNumber { get; }

    // Builds the "<file name>:<line> <message>" prefix shown in the exception message.
    private static string FormatMessage(string fixturePath, int lineNumber, string message)
        => $"{Path.GetFileName(fixturePath)}:{lineNumber} {message}";
}

View File

@@ -0,0 +1,55 @@
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Text.Json.Nodes;
using StellaOps.Findings.Ledger.Domain;
namespace LedgerReplayHarness;
/// <summary>
/// Streams ledger event drafts from a newline-delimited JSON (NDJSON) fixture file.
/// Blank lines are skipped; any malformed line aborts the stream with a
/// <see cref="HarnessFixtureException"/> carrying the fixture path and 1-based line number.
/// </summary>
internal static class HarnessFixtureReader
{
    /// <summary>
    /// Lazily reads <paramref name="file"/> line by line and yields one draft per JSON object line.
    /// </summary>
    /// <param name="file">NDJSON fixture file to read.</param>
    /// <param name="tenant">Fallback tenant id applied when a line omits "tenant".</param>
    /// <param name="timeProvider">Clock sampled once up front as the fallback recorded_at base,
    /// so all drafts from one file share the same base timestamp.</param>
    /// <param name="cancellationToken">Cancels both the underlying read and the enumeration.</param>
    /// <exception cref="HarnessFixtureException">Invalid JSON, a non-object line, or draft validation failure.</exception>
    public static async IAsyncEnumerable<LedgerEventDraft> ReadDraftsAsync(
        FileInfo file,
        string tenant,
        TimeProvider timeProvider,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await using var stream = file.OpenRead();
        using var reader = new StreamReader(stream);
        var recordedAtBase = timeProvider.GetUtcNow();
        string? line;
        var lineNumber = 0;
        // Fix: flow the token into ReadLineAsync (.NET 7+) so a pending read can be
        // cancelled mid-line; previously the token was only observed between lines.
        while ((line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false)) is not null)
        {
            cancellationToken.ThrowIfCancellationRequested();
            lineNumber++;
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }
            JsonObject? node;
            try
            {
                node = JsonNode.Parse(line)?.AsObject();
            }
            catch (JsonException ex)
            {
                throw new HarnessFixtureException(file.FullName, lineNumber, "invalid json", ex);
            }
            if (node is null)
            {
                throw new HarnessFixtureException(file.FullName, lineNumber, "expected json object");
            }
            if (!HarnessDraftParser.TryParseDraft(node, tenant, recordedAtBase, out var draft, out var error))
            {
                throw new HarnessFixtureException(file.FullName, lineNumber, error);
            }
            yield return draft;
        }
    }
}

View File

@@ -0,0 +1,33 @@
using System.Security.Cryptography;
using System.Text;
namespace LedgerReplayHarness;
/// <summary>
/// Numeric and checksum helpers shared by the replay harness.
/// </summary>
internal static class HarnessMath
{
    /// <summary>
    /// Computes a percentile of <paramref name="values"/> using linear interpolation
    /// between the two nearest ranks. NaN samples are ignored.
    /// </summary>
    /// <param name="values">Samples; may be empty.</param>
    /// <param name="percentile">Requested percentile in [0, 100]; out-of-range values are clamped.</param>
    /// <returns>The interpolated percentile, or 0 when there are no usable samples.</returns>
    public static double Percentile(IEnumerable<double> values, double percentile)
    {
        var data = values.Where(v => !double.IsNaN(v)).OrderBy(v => v).ToArray();
        if (data.Length == 0)
        {
            return 0;
        }
        // Fix: clamp out-of-range requests (e.g. 150 or -5), which previously produced
        // a rank outside the array and threw IndexOutOfRangeException.
        var clamped = Math.Clamp(percentile, 0.0, 100.0);
        var rank = (clamped / 100.0) * (data.Length - 1);
        var lowerIndex = (int)Math.Floor(rank);
        var upperIndex = (int)Math.Ceiling(rank);
        if (lowerIndex == upperIndex)
        {
            return data[lowerIndex];
        }
        // Interpolate between the two surrounding order statistics.
        var fraction = rank - lowerIndex;
        return data[lowerIndex] + (data[upperIndex] - data[lowerIndex]) * fraction;
    }

    /// <summary>
    /// Folds one ledger event into the running event-stream checksum by appending
    /// the UTF-8 bytes of "&lt;eventHash&gt;:&lt;sequence&gt;\n" to <paramref name="hasher"/>.
    /// </summary>
    public static void AppendEventStreamEntry(IncrementalHash hasher, string eventHash, long sequence)
    {
        var payload = $"{eventHash}:{sequence}\n";
        hasher.AppendData(Encoding.UTF8.GetBytes(payload));
    }
}

View File

@@ -9,6 +9,11 @@
<ItemGroup>
<ProjectReference Include="..\..\StellaOps.Findings.Ledger.csproj" />
</ItemGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>StellaOps.Findings.Ledger.ReplayHarness.Tests</_Parameter1>
</AssemblyAttribute>
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.CommandLine" />
</ItemGroup>

View File

@@ -1,7 +1,6 @@
using System.CommandLine;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
@@ -21,6 +20,8 @@ using StellaOps.Findings.Ledger.Options;
using StellaOps.Findings.Ledger.Observability;
using StellaOps.Findings.Ledger.Services;
using LedgerReplayHarness;
// Command-line options
var fixturesOption = new Option<FileInfo[]>("--fixture")
{
@@ -44,7 +45,12 @@ var tenantOption = new Option<string>("--tenant")
var maxParallelOption = new Option<int>("--maxParallel")
{
Description = "Maximum concurrent append operations",
DefaultValueFactory = _ => 4
DefaultValueFactory = _ => 1
};
var allowParallelOption = new Option<bool>("--allowParallel")
{
Description = "Allow non-deterministic parallel fixture ingestion"
};
var reportOption = new Option<FileInfo?>("--report")
@@ -67,6 +73,7 @@ root.Add(fixturesOption);
root.Add(connectionOption);
root.Add(tenantOption);
root.Add(maxParallelOption);
root.Add(allowParallelOption);
root.Add(reportOption);
root.Add(metricsOption);
root.Add(expectedChecksumOption);
@@ -77,6 +84,7 @@ root.SetAction(async (parseResult, ct) =>
var connection = parseResult.GetValue(connectionOption)!;
var tenant = parseResult.GetValue(tenantOption)!;
var maxParallel = parseResult.GetValue(maxParallelOption);
var allowParallel = parseResult.GetValue(allowParallelOption);
var reportFile = parseResult.GetValue(reportOption);
var metricsFile = parseResult.GetValue(metricsOption);
var expectedChecksumsFile = parseResult.GetValue(expectedChecksumOption);
@@ -96,25 +104,23 @@ root.SetAction(async (parseResult, ct) =>
var (meterListener, metrics) = CreateMeterListener();
var sw = Stopwatch.StartNew();
long eventsWritten = 0;
var eventsWritten = new LongCounter();
var orderedFixtures = fixtures.OrderBy(f => f.FullName, StringComparer.Ordinal).ToArray();
await Parallel.ForEachAsync(fixtures, new ParallelOptions { MaxDegreeOfParallelism = maxParallel, CancellationToken = cts.Token }, async (file, token) =>
if (allowParallel && maxParallel > 1)
{
await foreach (var draft in ReadDraftsAsync(file, tenant, timeProvider, token))
await Parallel.ForEachAsync(orderedFixtures, new ParallelOptions { MaxDegreeOfParallelism = maxParallel, CancellationToken = cts.Token }, async (file, token) =>
{
var result = await writeService.AppendAsync(draft, token).ConfigureAwait(false);
if (result.Status is LedgerWriteStatus.ValidationFailed or LedgerWriteStatus.Conflict)
{
throw new InvalidOperationException($"Append failed for {draft.EventId}: {string.Join(",", result.Errors)} ({result.ConflictCode})");
}
Interlocked.Increment(ref eventsWritten);
if (eventsWritten % 50_000 == 0)
{
logger.LogInformation("Appended {Count} events...", eventsWritten);
}
await AppendFixtureAsync(file, tenant, timeProvider, writeService, logger, eventsWritten, token).ConfigureAwait(false);
}).ConfigureAwait(false);
}
else
{
foreach (var file in orderedFixtures)
{
await AppendFixtureAsync(file, tenant, timeProvider, writeService, logger, eventsWritten, cts.Token).ConfigureAwait(false);
}
}).ConfigureAwait(false);
}
// Wait for projector to catch up
await Task.Delay(TimeSpan.FromSeconds(2), cts.Token);
@@ -122,19 +128,19 @@ root.SetAction(async (parseResult, ct) =>
meterListener.RecordObservableInstruments();
var verification = await VerifyLedgerAsync(scope.ServiceProvider, tenant, eventsWritten, expectedChecksumsFile, cts.Token).ConfigureAwait(false);
var verification = await VerifyLedgerAsync(scope.ServiceProvider, tenant, eventsWritten.Value, expectedChecksumsFile, cts.Token).ConfigureAwait(false);
var writeDurations = metrics.HistDouble("ledger_write_duration_seconds").Concat(metrics.HistDouble("ledger_write_latency_seconds"));
var writeLatencyP95Ms = Percentile(writeDurations, 95) * 1000;
var rebuildP95Ms = Percentile(metrics.HistDouble("ledger_projection_rebuild_seconds"), 95) * 1000;
var writeLatencyP95Ms = HarnessMath.Percentile(writeDurations, 95) * 1000;
var rebuildP95Ms = HarnessMath.Percentile(metrics.HistDouble("ledger_projection_rebuild_seconds"), 95) * 1000;
var projectionLagSeconds = metrics.GaugeDouble("ledger_projection_lag_seconds").DefaultIfEmpty(0).Max();
var backlogEvents = metrics.GaugeLong("ledger_ingest_backlog_events").DefaultIfEmpty(0).Max();
var dbConnections = metrics.GaugeLong("ledger_db_connections_active").DefaultIfEmpty(0).Sum();
var report = new HarnessReport(
tenant,
fixtures.Select(f => f.FullName).ToArray(),
eventsWritten,
orderedFixtures.Select(f => f.FullName).ToArray(),
eventsWritten.Value,
sw.Elapsed.TotalSeconds,
Status: verification.Success ? "pass" : "fail",
WriteLatencyP95Ms: writeLatencyP95Ms,
@@ -252,82 +258,31 @@ static IHost BuildHost(string connectionString)
.Build();
}
static async IAsyncEnumerable<LedgerEventDraft> ReadDraftsAsync(FileInfo file, string tenant, TimeProvider timeProvider, [EnumeratorCancellation] CancellationToken cancellationToken)
static async Task AppendFixtureAsync(
FileInfo file,
string tenant,
TimeProvider timeProvider,
ILedgerEventWriteService writeService,
ILogger logger,
LongCounter eventsWritten,
CancellationToken cancellationToken)
{
await using var stream = file.OpenRead();
using var reader = new StreamReader(stream);
var recordedAtBase = timeProvider.GetUtcNow();
string? line;
while ((line = await reader.ReadLineAsync().ConfigureAwait(false)) is not null)
await foreach (var draft in HarnessFixtureReader.ReadDraftsAsync(file, tenant, timeProvider, cancellationToken))
{
if (string.IsNullOrWhiteSpace(line))
var result = await writeService.AppendAsync(draft, cancellationToken).ConfigureAwait(false);
if (result.Status is LedgerWriteStatus.ValidationFailed or LedgerWriteStatus.Conflict)
{
continue;
throw new InvalidOperationException($"Append failed for {draft.EventId}: {string.Join(",", result.Errors)} ({result.ConflictCode})");
}
var node = JsonNode.Parse(line)?.AsObject();
if (node is null)
var total = Interlocked.Increment(ref eventsWritten.Value);
if (total % 50_000 == 0)
{
continue;
logger.LogInformation("Appended {Count} events...", total);
}
yield return ToDraft(node, tenant, recordedAtBase);
cancellationToken.ThrowIfCancellationRequested();
}
}
static LedgerEventDraft ToDraft(JsonObject node, string defaultTenant, DateTimeOffset recordedAtBase)
{
string required(string name) => node[name]?.GetValue<string>() ?? throw new InvalidOperationException($"{name} missing");
var tenantId = node.TryGetPropertyValue("tenant", out var tenantNode)
? tenantNode!.GetValue<string>()
: defaultTenant;
var chainId = Guid.Parse(required("chain_id"));
var sequence = node["sequence_no"]?.GetValue<long>() ?? node["sequence"]?.GetValue<long>() ?? throw new InvalidOperationException("sequence_no missing");
var eventId = Guid.Parse(required("event_id"));
var eventType = required("event_type");
var policyVersion = required("policy_version");
var findingId = required("finding_id");
var artifactId = required("artifact_id");
var sourceRunId = node.TryGetPropertyValue("source_run_id", out var sourceRunNode) && sourceRunNode is not null && !string.IsNullOrWhiteSpace(sourceRunNode.GetValue<string>())
? (Guid?)Guid.Parse(sourceRunNode!.GetValue<string>())
: null;
var actorId = required("actor_id");
var actorType = required("actor_type");
var occurredAt = DateTimeOffset.Parse(required("occurred_at"));
var recordedAt = node.TryGetPropertyValue("recorded_at", out var recordedAtNode) && recordedAtNode is not null
? DateTimeOffset.Parse(recordedAtNode.GetValue<string>())
: recordedAtBase;
var payload = node.TryGetPropertyValue("payload", out var payloadNode) && payloadNode is JsonObject payloadObj
? payloadObj
: throw new InvalidOperationException("payload missing");
var canonicalEnvelope = LedgerCanonicalJsonSerializer.Canonicalize(payload);
var prev = node.TryGetPropertyValue("previous_hash", out var prevNode) ? prevNode?.GetValue<string>() : null;
return new LedgerEventDraft(
tenantId,
chainId,
sequence,
eventId,
eventType,
policyVersion,
findingId,
artifactId,
sourceRunId,
actorId,
actorType,
occurredAt,
recordedAt,
payload,
canonicalEnvelope,
prev);
}
static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider services, string tenant, long expectedEvents, FileInfo? expectedChecksumsFile, CancellationToken cancellationToken)
{
var errors = new List<string>();
@@ -377,7 +332,7 @@ static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider service
var eventHash = reader.GetString(4);
var previousHash = reader.GetString(5);
var merkleLeafHash = reader.GetString(6);
eventHasher.AppendData(Encoding.UTF8.GetBytes($"{eventHash}:{sequence}\n"));
HarnessMath.AppendEventStreamEntry(eventHasher, eventHash, sequence);
if (currentChain != chainId)
{
@@ -457,26 +412,6 @@ static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider service
return new VerificationResult(errors.Count == 0, errors, eventStreamChecksum, projectionChecksum);
}
static double Percentile(IEnumerable<double> values, double percentile)
{
var data = values.Where(v => !double.IsNaN(v)).OrderBy(v => v).ToArray();
if (data.Length == 0)
{
return 0;
}
var rank = (percentile / 100.0) * (data.Length - 1);
var lowerIndex = (int)Math.Floor(rank);
var upperIndex = (int)Math.Ceiling(rank);
if (lowerIndex == upperIndex)
{
return data[lowerIndex];
}
var fraction = rank - lowerIndex;
return data[lowerIndex] + (data[upperIndex] - data[lowerIndex]) * fraction;
}
// Local function - must be before type declarations
static ExpectedChecksums LoadExpectedChecksums(FileInfo? file)
{
@@ -536,6 +471,11 @@ internal sealed class MetricsBag
};
}
// Mutable boxed counter shared across fixture-append workers. Value is a public
// field (not a property) because callers pass it by ref to Interlocked.Increment,
// which is impossible with a property getter/setter.
internal sealed class LongCounter
{
// Total events written; updated via Interlocked.Increment(ref counter.Value).
public long Value;
}
// Harness lightweight no-op implementations for projection/merkle to keep replay fast
internal sealed class NoOpPolicyEvaluationService : IPolicyEvaluationService
{