Add unit tests and implementations for MongoDB index models and OpenAPI metadata
- Implemented `MongoIndexModelTests` to verify index models for various stores. - Created `OpenApiMetadataFactory` with methods to generate OpenAPI metadata. - Added tests for `OpenApiMetadataFactory` to ensure expected defaults and URL overrides. - Introduced `ObserverSurfaceSecrets` and `WebhookSurfaceSecrets` for managing secrets. - Developed `RuntimeSurfaceFsClient` and `WebhookSurfaceFsClient` for manifest retrieval. - Added dependency injection tests for `SurfaceEnvironmentRegistration` in both Observer and Webhook contexts. - Implemented tests for secret resolution in `ObserverSurfaceSecretsTests` and `WebhookSurfaceSecretsTests`. - Created `EnsureLinkNotMergeCollectionsMigrationTests` to validate MongoDB migration logic. - Added project files for MongoDB tests and NuGet package mirroring.
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
// Temporary shim for compilers that do not surface System.Runtime.CompilerServices.IsExternalInit
|
||||
// (needed for record types). Remove when toolchain natively provides the type.
|
||||
namespace System.Runtime.CompilerServices;
|
||||
|
||||
internal static class IsExternalInit
|
||||
{
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
{"tenant": "tenant-a", "chain_id": "c8d6f7f1-58f8-4c2d-8d92-f9b8790a0001", "sequence_no": 1, "event_id": "c0e6d9b4-1d89-4b07-b622-1c7b6d111001", "event_type": "finding.assignment", "policy_version": "2025.01", "finding_id": "F-001", "artifact_id": "artifact-1", "actor_id": "system", "actor_type": "system", "occurred_at": "2025-01-01T00:00:00Z", "recorded_at": "2025-01-01T00:00:01Z", "payload": {"comment": "seed event"}, "previous_hash": "0000000000000000000000000000000000000000000000000000000000000000", "event_hash": "0d95f63532b6488407e8fd2e837edb3e9bfc8a2defde232aca99dbfd518558c6", "merkle_leaf_hash": "d08d4da76da50fbe4274a394c73fcaae0180fd591238d224bc7d5efee2ad3696"}
|
||||
{"tenant": "tenant-a", "chain_id": "c8d6f7f1-58f8-4c2d-8d92-f9b8790a0001", "sequence_no": 2, "event_id": "c0e6d9b4-1d89-4b07-b622-1c7b6d111002", "event_type": "finding.comment", "policy_version": "2025.01", "finding_id": "F-001", "artifact_id": "artifact-1", "actor_id": "analyst", "actor_type": "operator", "occurred_at": "2025-01-01T00:00:10Z", "recorded_at": "2025-01-01T00:00:11Z", "payload": {"comment": "follow-up"}, "previous_hash": "PLACEHOLDER", "event_hash": "0e77979af948be38de028a2497f15529473ae5aeb0a95f5d9d648efc8afb9fa3", "merkle_leaf_hash": "2854050efba048f2674ba27fd7dc2f1b65e90e150098bfeeb4fc6e23334c3790"}
|
||||
@@ -0,0 +1,15 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<LangVersion>preview</LangVersion>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\StellaOps.Findings.Ledger.csproj" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
@@ -0,0 +1,502 @@
|
||||
using System.CommandLine;
|
||||
using System.Diagnostics;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Nodes;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Findings.Ledger.Domain;
|
||||
using StellaOps.Findings.Ledger.Hashing;
|
||||
using StellaOps.Findings.Ledger.Infrastructure;
|
||||
using StellaOps.Findings.Ledger.Infrastructure.Merkle;
|
||||
using StellaOps.Findings.Ledger.Infrastructure.Postgres;
|
||||
using StellaOps.Findings.Ledger.Infrastructure.Projection;
|
||||
using StellaOps.Findings.Ledger.Options;
|
||||
using StellaOps.Findings.Ledger.Observability;
|
||||
using StellaOps.Findings.Ledger.Services;
|
||||
|
||||
// Command-line options
|
||||
var fixturesOption = new Option<FileInfo[]>(
|
||||
name: "--fixture",
|
||||
description: "NDJSON fixtures containing canonical ledger envelopes (sequence-ordered)")
|
||||
{
|
||||
IsRequired = true
|
||||
};
|
||||
fixturesOption.AllowMultipleArgumentsPerToken = true;
|
||||
|
||||
var connectionOption = new Option<string>(
|
||||
name: "--connection",
|
||||
description: "PostgreSQL connection string for ledger DB")
|
||||
{
|
||||
IsRequired = true
|
||||
};
|
||||
|
||||
var tenantOption = new Option<string>(
|
||||
name: "--tenant",
|
||||
getDefaultValue: () => "tenant-a",
|
||||
description: "Tenant identifier for appended events");
|
||||
|
||||
var maxParallelOption = new Option<int>(
|
||||
name: "--maxParallel",
|
||||
getDefaultValue: () => 4,
|
||||
description: "Maximum concurrent append operations");
|
||||
|
||||
var reportOption = new Option<FileInfo?>(
|
||||
name: "--report",
|
||||
description: "Path to write harness report JSON (with DSSE placeholder)");
|
||||
|
||||
var metricsOption = new Option<FileInfo?>(
|
||||
name: "--metrics",
|
||||
description: "Optional path to write metrics snapshot JSON");
|
||||
|
||||
var root = new RootCommand("Findings Ledger Replay Harness (LEDGER-29-008)");
|
||||
root.AddOption(fixturesOption);
|
||||
root.AddOption(connectionOption);
|
||||
root.AddOption(tenantOption);
|
||||
root.AddOption(maxParallelOption);
|
||||
root.AddOption(reportOption);
|
||||
root.AddOption(metricsOption);
|
||||
|
||||
root.SetHandler(async (FileInfo[] fixtures, string connection, string tenant, int maxParallel, FileInfo? reportFile, FileInfo? metricsFile) =>
|
||||
{
|
||||
await using var host = BuildHost(connection);
|
||||
using var scope = host.Services.CreateScope();
|
||||
|
||||
var writeService = scope.ServiceProvider.GetRequiredService<ILedgerEventWriteService>();
|
||||
var projectionWorker = scope.ServiceProvider.GetRequiredService<LedgerProjectionWorker>();
|
||||
var anchorWorker = scope.ServiceProvider.GetRequiredService<LedgerMerkleAnchorWorker>();
|
||||
var logger = scope.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger("Harness");
|
||||
var timeProvider = scope.ServiceProvider.GetRequiredService<TimeProvider>();
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
var projectionTask = projectionWorker.StartAsync(cts.Token);
|
||||
var anchorTask = anchorWorker.StartAsync(cts.Token);
|
||||
|
||||
var (meterListener, metrics) = CreateMeterListener();
|
||||
|
||||
var sw = Stopwatch.StartNew();
|
||||
long eventsWritten = 0;
|
||||
|
||||
await Parallel.ForEachAsync(fixtures, new ParallelOptions { MaxDegreeOfParallelism = maxParallel, CancellationToken = cts.Token }, async (file, token) =>
|
||||
{
|
||||
await foreach (var draft in ReadDraftsAsync(file, tenant, timeProvider, token))
|
||||
{
|
||||
var result = await writeService.AppendAsync(draft, token).ConfigureAwait(false);
|
||||
if (result.Status is LedgerWriteStatus.ValidationFailed or LedgerWriteStatus.Conflict)
|
||||
{
|
||||
throw new InvalidOperationException($"Append failed for {draft.EventId}: {string.Join(",", result.Errors)} ({result.ConflictCode})");
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref eventsWritten);
|
||||
if (eventsWritten % 50_000 == 0)
|
||||
{
|
||||
logger.LogInformation("Appended {Count} events...", eventsWritten);
|
||||
}
|
||||
}
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
// Wait for projector to catch up
|
||||
await Task.Delay(TimeSpan.FromSeconds(2), cts.Token);
|
||||
sw.Stop();
|
||||
|
||||
meterListener.RecordObservableInstruments();
|
||||
|
||||
var verification = await VerifyLedgerAsync(scope.ServiceProvider, tenant, eventsWritten, cts.Token).ConfigureAwait(false);
|
||||
|
||||
var writeLatencyP95Ms = Percentile(metrics.HistDouble("ledger_write_latency_seconds"), 95) * 1000;
|
||||
var rebuildP95Ms = Percentile(metrics.HistDouble("ledger_projection_rebuild_seconds"), 95) * 1000;
|
||||
var projectionLagSeconds = metrics.GaugeDouble("ledger_projection_lag_seconds").DefaultIfEmpty(0).Max();
|
||||
var backlogEvents = metrics.GaugeLong("ledger_ingest_backlog_events").DefaultIfEmpty(0).Max();
|
||||
var dbConnections = metrics.GaugeLong("ledger_db_connections_active").DefaultIfEmpty(0).Sum();
|
||||
|
||||
var report = new HarnessReport(
|
||||
tenant,
|
||||
fixtures.Select(f => f.FullName).ToArray(),
|
||||
eventsWritten,
|
||||
sw.Elapsed.TotalSeconds,
|
||||
status: verification.Success ? "pass" : "fail",
|
||||
WriteLatencyP95Ms: writeLatencyP95Ms,
|
||||
ProjectionRebuildP95Ms: rebuildP95Ms,
|
||||
ProjectionLagSecondsMax: projectionLagSeconds,
|
||||
BacklogEventsMax: backlogEvents,
|
||||
DbConnectionsObserved: dbConnections,
|
||||
VerificationErrors: verification.Errors.ToArray());
|
||||
|
||||
var jsonOptions = new JsonSerializerOptions { WriteIndented = true };
|
||||
var json = JsonSerializer.Serialize(report, jsonOptions);
|
||||
Console.WriteLine(json);
|
||||
|
||||
if (reportFile is not null)
|
||||
{
|
||||
await File.WriteAllTextAsync(reportFile.FullName, json, cts.Token).ConfigureAwait(false);
|
||||
await WriteDssePlaceholderAsync(reportFile.FullName, json, cts.Token).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (metricsFile is not null)
|
||||
{
|
||||
var snapshot = metrics.ToSnapshot();
|
||||
var metricsJson = JsonSerializer.Serialize(snapshot, jsonOptions);
|
||||
await File.WriteAllTextAsync(metricsFile.FullName, metricsJson, cts.Token).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
cts.Cancel();
|
||||
await Task.WhenAll(projectionTask, anchorTask).WaitAsync(TimeSpan.FromSeconds(5));
|
||||
}, fixturesOption, connectionOption, tenantOption, maxParallelOption, reportOption, metricsOption);
|
||||
|
||||
await root.InvokeAsync(args);
|
||||
|
||||
static async Task WriteDssePlaceholderAsync(string reportPath, string json, CancellationToken cancellationToken)
|
||||
{
|
||||
using var sha = System.Security.Cryptography.SHA256.Create();
|
||||
var digest = sha.ComputeHash(System.Text.Encoding.UTF8.GetBytes(json));
|
||||
var sig = new
|
||||
{
|
||||
payloadType = "application/vnd.stella-ledger-harness+json",
|
||||
sha256 = Convert.ToHexString(digest).ToLowerInvariant(),
|
||||
signedBy = "harness-local",
|
||||
createdAt = DateTimeOffset.UtcNow
|
||||
};
|
||||
|
||||
var sigJson = JsonSerializer.Serialize(sig, new JsonSerializerOptions { WriteIndented = true });
|
||||
await File.WriteAllTextAsync(reportPath + ".sig", sigJson, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
static (MeterListener Listener, MetricsBag Bag) CreateMeterListener()
|
||||
{
|
||||
var bag = new MetricsBag();
|
||||
var listener = new MeterListener
|
||||
{
|
||||
InstrumentPublished = (instrument, meterListener) =>
|
||||
{
|
||||
if (instrument.Meter.Name == "StellaOps.Findings.Ledger")
|
||||
{
|
||||
meterListener.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, _) =>
|
||||
{
|
||||
bag.Add(instrument, measurement, tags);
|
||||
});
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, _) =>
|
||||
{
|
||||
bag.Add(instrument, measurement, tags);
|
||||
});
|
||||
|
||||
listener.Start();
|
||||
return (listener, bag);
|
||||
}
|
||||
|
||||
static IHost BuildHost(string connectionString)
|
||||
{
|
||||
return Host.CreateDefaultBuilder()
|
||||
.ConfigureLogging(logging =>
|
||||
{
|
||||
logging.ClearProviders();
|
||||
logging.AddSimpleConsole(options =>
|
||||
{
|
||||
options.SingleLine = true;
|
||||
options.TimestampFormat = "HH:mm:ss ";
|
||||
});
|
||||
})
|
||||
.ConfigureServices(services =>
|
||||
{
|
||||
services.Configure<LedgerServiceOptions>(opts =>
|
||||
{
|
||||
opts.Database.ConnectionString = connectionString;
|
||||
});
|
||||
|
||||
services.AddSingleton<TimeProvider>(_ => TimeProvider.System);
|
||||
services.AddSingleton<LedgerDataSource>();
|
||||
services.AddSingleton<ILedgerEventRepository, PostgresLedgerEventRepository>();
|
||||
services.AddSingleton<IFindingProjectionRepository, NoOpProjectionRepository>();
|
||||
services.AddSingleton<ILedgerEventStream, PostgresLedgerEventStream>();
|
||||
services.AddSingleton<IPolicyEvaluationService, NoOpPolicyEvaluationService>();
|
||||
services.AddSingleton<IMerkleAnchorRepository, NoOpMerkleAnchorRepository>();
|
||||
services.AddSingleton<LedgerAnchorQueue>();
|
||||
services.AddSingleton<IMerkleAnchorScheduler, QueueMerkleAnchorScheduler>();
|
||||
services.AddSingleton<LedgerMerkleAnchorWorker>();
|
||||
services.AddSingleton<LedgerProjectionWorker>();
|
||||
services.AddSingleton<ILedgerEventWriteService, LedgerEventWriteService>();
|
||||
})
|
||||
.Build();
|
||||
}
|
||||
|
||||
static async IAsyncEnumerable<LedgerEventDraft> ReadDraftsAsync(FileInfo file, string tenant, TimeProvider timeProvider, [EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await using var stream = file.OpenRead();
|
||||
using var reader = new StreamReader(stream);
|
||||
var recordedAtBase = timeProvider.GetUtcNow();
|
||||
|
||||
while (!reader.EndOfStream)
|
||||
{
|
||||
var line = await reader.ReadLineAsync().ConfigureAwait(false);
|
||||
if (string.IsNullOrWhiteSpace(line))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var node = JsonNode.Parse(line)?.AsObject();
|
||||
if (node is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
yield return ToDraft(node, tenant, recordedAtBase);
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
}
|
||||
}
|
||||
|
||||
static LedgerEventDraft ToDraft(JsonObject node, string defaultTenant, DateTimeOffset recordedAtBase)
|
||||
{
|
||||
string required(string name) => node[name]?.GetValue<string>() ?? throw new InvalidOperationException($"{name} missing");
|
||||
|
||||
var tenantId = node.TryGetPropertyValue("tenant", out var tenantNode)
|
||||
? tenantNode!.GetValue<string>()
|
||||
: defaultTenant;
|
||||
|
||||
var chainId = Guid.Parse(required("chain_id"));
|
||||
var sequence = node["sequence_no"]?.GetValue<long>() ?? node["sequence"]?.GetValue<long>() ?? throw new InvalidOperationException("sequence_no missing");
|
||||
var eventId = Guid.Parse(required("event_id"));
|
||||
var eventType = required("event_type");
|
||||
var policyVersion = required("policy_version");
|
||||
var findingId = required("finding_id");
|
||||
var artifactId = required("artifact_id");
|
||||
var sourceRunId = node.TryGetPropertyValue("source_run_id", out var sourceRunNode) && sourceRunNode is not null && !string.IsNullOrWhiteSpace(sourceRunNode.GetValue<string>())
|
||||
? Guid.Parse(sourceRunNode!.GetValue<string>())
|
||||
: null;
|
||||
var actorId = required("actor_id");
|
||||
var actorType = required("actor_type");
|
||||
var occurredAt = DateTimeOffset.Parse(required("occurred_at"));
|
||||
var recordedAt = node.TryGetPropertyValue("recorded_at", out var recordedAtNode) && recordedAtNode is not null
|
||||
? DateTimeOffset.Parse(recordedAtNode.GetValue<string>())
|
||||
: recordedAtBase;
|
||||
|
||||
var payload = node.TryGetPropertyValue("payload", out var payloadNode) && payloadNode is JsonObject payloadObj
|
||||
? payloadObj
|
||||
: throw new InvalidOperationException("payload missing");
|
||||
|
||||
var canonicalEnvelope = LedgerCanonicalJsonSerializer.Canonicalize(payload);
|
||||
var prev = node.TryGetPropertyValue("previous_hash", out var prevNode) ? prevNode?.GetValue<string>() : null;
|
||||
|
||||
return new LedgerEventDraft(
|
||||
tenantId,
|
||||
chainId,
|
||||
sequence,
|
||||
eventId,
|
||||
eventType,
|
||||
policyVersion,
|
||||
findingId,
|
||||
artifactId,
|
||||
sourceRunId,
|
||||
actorId,
|
||||
actorType,
|
||||
occurredAt,
|
||||
recordedAt,
|
||||
payload,
|
||||
canonicalEnvelope,
|
||||
prev);
|
||||
}
|
||||
|
||||
static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider services, string tenant, long expectedEvents, CancellationToken cancellationToken)
|
||||
{
|
||||
var errors = new List<string>();
|
||||
var dataSource = services.GetRequiredService<LedgerDataSource>();
|
||||
|
||||
await using var connection = await dataSource.OpenConnectionAsync(tenant, "verify", cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Count check
|
||||
await using (var countCommand = new Npgsql.NpgsqlCommand("select count(*) from ledger_events where tenant_id = @tenant", connection))
|
||||
{
|
||||
countCommand.Parameters.AddWithValue("tenant", tenant);
|
||||
var count = (long)await countCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (count < expectedEvents)
|
||||
{
|
||||
errors.Add($"event_count_mismatch:{count}/{expectedEvents}");
|
||||
}
|
||||
}
|
||||
|
||||
// Sequence and hash verification
|
||||
const string query = """
|
||||
select chain_id, sequence_no, event_id, event_body, event_hash, previous_hash, merkle_leaf_hash
|
||||
from ledger_events
|
||||
where tenant_id = @tenant
|
||||
order by chain_id, sequence_no
|
||||
""";
|
||||
|
||||
await using var command = new Npgsql.NpgsqlCommand(query, connection);
|
||||
command.Parameters.AddWithValue("tenant", tenant);
|
||||
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
Guid? currentChain = null;
|
||||
long expectedSequence = 1;
|
||||
string? prevHash = null;
|
||||
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
var chainId = reader.GetGuid(0);
|
||||
var sequence = reader.GetInt64(1);
|
||||
var eventId = reader.GetGuid(2);
|
||||
var eventBodyJson = reader.GetString(3);
|
||||
var eventHash = reader.GetString(4);
|
||||
var previousHash = reader.GetString(5);
|
||||
var merkleLeafHash = reader.GetString(6);
|
||||
|
||||
if (currentChain != chainId)
|
||||
{
|
||||
currentChain = chainId;
|
||||
expectedSequence = 1;
|
||||
prevHash = LedgerEventConstants.EmptyHash;
|
||||
}
|
||||
|
||||
if (sequence != expectedSequence)
|
||||
{
|
||||
errors.Add($"sequence_gap:{chainId}:{sequence}");
|
||||
}
|
||||
|
||||
if (!string.Equals(previousHash, prevHash, StringComparison.Ordinal))
|
||||
{
|
||||
errors.Add($"previous_hash_mismatch:{chainId}:{sequence}");
|
||||
}
|
||||
|
||||
var node = JsonNode.Parse(eventBodyJson)?.AsObject() ?? new JsonObject();
|
||||
var canonical = LedgerCanonicalJsonSerializer.Canonicalize(node);
|
||||
var hashResult = LedgerHashing.ComputeHashes(canonical, sequence);
|
||||
|
||||
if (!string.Equals(hashResult.EventHash, eventHash, StringComparison.Ordinal))
|
||||
{
|
||||
errors.Add($"event_hash_mismatch:{eventId}");
|
||||
}
|
||||
|
||||
if (!string.Equals(hashResult.MerkleLeafHash, merkleLeafHash, StringComparison.Ordinal))
|
||||
{
|
||||
errors.Add($"merkle_leaf_mismatch:{eventId}");
|
||||
}
|
||||
|
||||
prevHash = eventHash;
|
||||
expectedSequence++;
|
||||
}
|
||||
|
||||
if (errors.Count == 0)
|
||||
{
|
||||
// Additional check: projector caught up (no lag > 0)
|
||||
var lagMax = LedgerMetricsSnapshot.LagMax;
|
||||
if (lagMax > 0)
|
||||
{
|
||||
errors.Add($"projection_lag_remaining:{lagMax}");
|
||||
}
|
||||
}
|
||||
|
||||
return new VerificationResult(errors.Count == 0, errors);
|
||||
}
|
||||
|
||||
static double Percentile(IEnumerable<double> values, double percentile)
|
||||
{
|
||||
var data = values.Where(v => !double.IsNaN(v)).OrderBy(v => v).ToArray();
|
||||
if (data.Length == 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
var rank = (percentile / 100.0) * (data.Length - 1);
|
||||
var lowerIndex = (int)Math.Floor(rank);
|
||||
var upperIndex = (int)Math.Ceiling(rank);
|
||||
if (lowerIndex == upperIndex)
|
||||
{
|
||||
return data[lowerIndex];
|
||||
}
|
||||
|
||||
var fraction = rank - lowerIndex;
|
||||
return data[lowerIndex] + (data[upperIndex] - data[lowerIndex]) * fraction;
|
||||
}
|
||||
|
||||
internal sealed record HarnessReport(
|
||||
string Tenant,
|
||||
IReadOnlyList<string> Fixtures,
|
||||
long EventsWritten,
|
||||
double DurationSeconds,
|
||||
string Status,
|
||||
double WriteLatencyP95Ms,
|
||||
double ProjectionRebuildP95Ms,
|
||||
double ProjectionLagSecondsMax,
|
||||
double BacklogEventsMax,
|
||||
long DbConnectionsObserved,
|
||||
IReadOnlyList<string> VerificationErrors);
|
||||
|
||||
internal sealed record VerificationResult(bool Success, IReadOnlyList<string> Errors);
|
||||
|
||||
internal sealed class MetricsBag
|
||||
{
|
||||
private readonly List<(string Name, double Value)> doubles = new();
|
||||
private readonly List<(string Name, long Value)> longs = new();
|
||||
|
||||
public void Add(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object?>> _)
|
||||
=> doubles.Add((instrument.Name, value));
|
||||
|
||||
public void Add(Instrument instrument, long value, ReadOnlySpan<KeyValuePair<string, object?>> _)
|
||||
=> longs.Add((instrument.Name, value));
|
||||
|
||||
public IEnumerable<double> HistDouble(string name) => doubles.Where(d => d.Name == name).Select(d => d.Value);
|
||||
public IEnumerable<double> GaugeDouble(string name) => doubles.Where(d => d.Name == name).Select(d => d.Value);
|
||||
public IEnumerable<long> GaugeLong(string name) => longs.Where(l => l.Name == name).Select(l => l.Value);
|
||||
|
||||
public object ToSnapshot() => new
|
||||
{
|
||||
doubles = doubles.GroupBy(x => x.Name).ToDictionary(g => g.Key, g => g.Select(v => v.Value).ToArray()),
|
||||
longs = longs.GroupBy(x => x.Name).ToDictionary(g => g.Key, g => g.Select(v => v.Value).ToArray())
|
||||
};
|
||||
}
|
||||
|
||||
// Harness lightweight no-op implementations for projection/merkle to keep replay fast
|
||||
internal sealed class NoOpPolicyEvaluationService : IPolicyEvaluationService
|
||||
{
|
||||
public Task<PolicyEvaluationResult> EvaluateAsync(LedgerEventRecord record, FindingProjection? current, CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(new PolicyEvaluationResult("noop", record.OccurredAt, record.RecordedAt, current?.Status ?? "new"));
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class NoOpProjectionRepository : IFindingProjectionRepository
|
||||
{
|
||||
public Task<FindingProjection?> GetAsync(string tenantId, string findingId, string policyVersion, CancellationToken cancellationToken) =>
|
||||
Task.FromResult<FindingProjection?>(null);
|
||||
|
||||
public Task InsertActionAsync(FindingAction action, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
|
||||
public Task InsertHistoryAsync(FindingHistory history, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
|
||||
public Task SaveCheckpointAsync(ProjectionCheckpoint checkpoint, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
|
||||
public Task<ProjectionCheckpoint> GetCheckpointAsync(CancellationToken cancellationToken) =>
|
||||
Task.FromResult(new ProjectionCheckpoint(DateTimeOffset.MinValue, Guid.Empty, DateTimeOffset.MinValue));
|
||||
|
||||
public Task UpsertAsync(FindingProjection projection, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
|
||||
public Task EnsureIndexesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
}
|
||||
|
||||
internal sealed class NoOpMerkleAnchorRepository : IMerkleAnchorRepository
|
||||
{
|
||||
public Task InsertAsync(string tenantId, Guid anchorId, DateTimeOffset windowStart, DateTimeOffset windowEnd, long sequenceStart, long sequenceEnd, string rootHash, long leafCount, DateTime anchoredAt, string? anchorReference, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
|
||||
public Task<MerkleAnchor?> GetLatestAsync(string tenantId, CancellationToken cancellationToken) =>
|
||||
Task.FromResult<MerkleAnchor?>(null);
|
||||
}
|
||||
|
||||
internal sealed class QueueMerkleAnchorScheduler : IMerkleAnchorScheduler
|
||||
{
|
||||
private readonly LedgerAnchorQueue _queue;
|
||||
|
||||
public QueueMerkleAnchorScheduler(LedgerAnchorQueue queue)
|
||||
{
|
||||
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
|
||||
}
|
||||
|
||||
public Task EnqueueAsync(LedgerEventRecord record, CancellationToken cancellationToken)
|
||||
=> _queue.EnqueueAsync(record, cancellationToken).AsTask();
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
import json
|
||||
import sys
|
||||
from hashlib import sha256
|
||||
|
||||
EMPTY_PREV = "0" * 64
|
||||
|
||||
|
||||
def canonical(obj):
|
||||
return json.dumps(obj, separators=(",", ":"), sort_keys=True)
|
||||
|
||||
|
||||
def hash_event(payload, sequence_no):
|
||||
canonical_json = canonical(payload).encode()
|
||||
event_hash = sha256(canonical_json + str(sequence_no).encode()).hexdigest()
|
||||
merkle_leaf = sha256(event_hash.encode()).hexdigest()
|
||||
return event_hash, merkle_leaf
|
||||
|
||||
|
||||
def main(path):
|
||||
out_lines = []
|
||||
last_hash = {}
|
||||
with open(path, "r") as f:
|
||||
events = [json.loads(line) for line in f if line.strip()]
|
||||
events.sort(key=lambda e: (e["chain_id"], e["sequence_no"]))
|
||||
for e in events:
|
||||
prev = e.get("previous_hash") or last_hash.get(e["chain_id"], EMPTY_PREV)
|
||||
payload = e.get("payload") or e
|
||||
event_hash, leaf = hash_event(payload, e["sequence_no"])
|
||||
e["event_hash"] = event_hash
|
||||
e["merkle_leaf_hash"] = leaf
|
||||
e["previous_hash"] = prev
|
||||
last_hash[e["chain_id"]] = event_hash
|
||||
out_lines.append(json.dumps(e))
|
||||
with open(path, "w") as f:
|
||||
for line in out_lines:
|
||||
f.write(line + "\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 2:
|
||||
print("usage: compute_hashes.py <ndjson>")
|
||||
sys.exit(1)
|
||||
main(sys.argv[1])
|
||||
@@ -0,0 +1,37 @@
|
||||
using System.Text.Json;
|
||||
using LedgerReplayHarness;
|
||||
using FluentAssertions;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Findings.Ledger.Tests;
|
||||
|
||||
public class HarnessRunnerTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task HarnessRunner_WritesReportAndValidatesHashes()
|
||||
{
|
||||
var fixturePath = Path.Combine(AppContext.BaseDirectory, "fixtures", "sample.ndjson");
|
||||
var tempReport = Path.GetTempFileName();
|
||||
|
||||
try
|
||||
{
|
||||
var exitCode = await HarnessRunner.RunAsync(new[] { fixturePath }, "tenant-test", tempReport);
|
||||
exitCode.Should().Be(0);
|
||||
|
||||
var json = await File.ReadAllTextAsync(tempReport);
|
||||
using var doc = JsonDocument.Parse(json);
|
||||
doc.RootElement.GetProperty("eventsWritten").GetInt64().Should().BeGreaterThan(0);
|
||||
doc.RootElement.GetProperty("status").GetString().Should().Be("pass");
|
||||
doc.RootElement.GetProperty("tenant").GetString().Should().Be("tenant-test");
|
||||
doc.RootElement.GetProperty("hashSummary").GetProperty("uniqueEventHashes").GetInt32().Should().Be(1);
|
||||
doc.RootElement.GetProperty("hashSummary").GetProperty("uniqueMerkleLeaves").GetInt32().Should().Be(1);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (File.Exists(tempReport))
|
||||
{
|
||||
File.Delete(tempReport);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,223 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Linq;
|
||||
using FluentAssertions;
|
||||
using StellaOps.Findings.Ledger.Observability;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Findings.Ledger.Tests;
|
||||
|
||||
public class LedgerMetricsTests
|
||||
{
|
||||
[Fact]
|
||||
public void ProjectionLagGauge_RecordsLatestPerTenant()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<double>>();
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_projection_lag_seconds")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
LedgerMetrics.RecordProjectionLag(TimeSpan.FromSeconds(42), "tenant-a");
|
||||
|
||||
listener.RecordObservableInstruments();
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().BeApproximately(42, precision: 0.001);
|
||||
measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value)
|
||||
.Should().Contain(new KeyValuePair<string, object?>("tenant", "tenant-a"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MerkleAnchorDuration_EmitsHistogramMeasurement()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<double>>();
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_merkle_anchor_duration_seconds")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
LedgerMetrics.RecordMerkleAnchorDuration(TimeSpan.FromSeconds(1.5), "tenant-b");
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().BeApproximately(1.5, precision: 0.001);
|
||||
measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value)
|
||||
.Should().Contain(new KeyValuePair<string, object?>("tenant", "tenant-b"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MerkleAnchorFailure_IncrementsCounter()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_merkle_anchor_failures_total")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
LedgerMetrics.RecordMerkleAnchorFailure("tenant-c", "persist_failure");
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().Be(1);
|
||||
var tags = measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
|
||||
tags.Should().Contain(new KeyValuePair<string, object?>("tenant", "tenant-c"));
|
||||
tags.Should().Contain(new KeyValuePair<string, object?>("reason", "persist_failure"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AttachmentFailure_IncrementsCounter()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_attachments_encryption_failures_total")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
LedgerMetrics.RecordAttachmentFailure("tenant-d", "encrypt");
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().Be(1);
|
||||
var tags = measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
|
||||
tags.Should().Contain(new KeyValuePair<string, object?>("tenant", "tenant-d"));
|
||||
tags.Should().Contain(new KeyValuePair<string, object?>("stage", "encrypt"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BacklogGauge_ReflectsOutstandingQueue()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
|
||||
// Reset
|
||||
LedgerMetrics.DecrementBacklog("tenant-q");
|
||||
|
||||
LedgerMetrics.IncrementBacklog("tenant-q");
|
||||
LedgerMetrics.IncrementBacklog("tenant-q");
|
||||
LedgerMetrics.DecrementBacklog("tenant-q");
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_ingest_backlog_events")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
listener.RecordObservableInstruments();
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().Be(1);
|
||||
measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value)
|
||||
.Should().Contain(new KeyValuePair<string, object?>("tenant", "tenant-q"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProjectionRebuildHistogram_RecordsScenarioTags()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<double>>();
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_projection_rebuild_seconds")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
LedgerMetrics.RecordProjectionRebuild(TimeSpan.FromSeconds(3.2), "tenant-r", "replay");
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().BeApproximately(3.2, 0.001);
|
||||
var tags = measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
|
||||
tags.Should().Contain(new KeyValuePair<string, object?>("tenant", "tenant-r"));
|
||||
tags.Should().Contain(new KeyValuePair<string, object?>("scenario", "replay"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DbConnectionsGauge_TracksRoleCounts()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
|
||||
// Reset
|
||||
LedgerMetrics.DecrementDbConnection("writer");
|
||||
|
||||
LedgerMetrics.IncrementDbConnection("writer");
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_db_connections_active")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
listener.RecordObservableInstruments();
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().Be(1);
|
||||
measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value)
|
||||
.Should().Contain(new KeyValuePair<string, object?>("role", "writer"));
|
||||
|
||||
LedgerMetrics.DecrementDbConnection("writer");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void VersionInfoGauge_EmitsConstantOne()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_app_version_info")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
listener.RecordObservableInstruments();
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().Be(1);
|
||||
var tags = measurement.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
|
||||
tags.Should().ContainKey("version");
|
||||
tags.Should().ContainKey("git_sha");
|
||||
}
|
||||
|
||||
private static MeterListener CreateListener()
|
||||
{
|
||||
var listener = new MeterListener
|
||||
{
|
||||
InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == "StellaOps.Findings.Ledger")
|
||||
{
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
listener.Start();
|
||||
return listener;
|
||||
}
|
||||
}
|
||||
148
src/Findings/tools/LedgerReplayHarness/HarnessRunner.cs
Normal file
148
src/Findings/tools/LedgerReplayHarness/HarnessRunner.cs
Normal file
@@ -0,0 +1,148 @@
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Nodes;
|
||||
using StellaOps.Findings.Ledger.Domain;
|
||||
using StellaOps.Findings.Ledger.Hashing;
|
||||
|
||||
namespace LedgerReplayHarness;
|
||||
|
||||
public sealed class HarnessRunner
|
||||
{
|
||||
private readonly ILedgerClient _client;
|
||||
private readonly int _maxParallel;
|
||||
|
||||
public HarnessRunner(ILedgerClient client, int maxParallel = 4)
|
||||
{
|
||||
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||
_maxParallel = maxParallel <= 0 ? 1 : maxParallel;
|
||||
}
|
||||
|
||||
public async Task<int> RunAsync(IEnumerable<string> fixtures, string tenant, string reportPath, CancellationToken cancellationToken)
|
||||
{
|
||||
if (fixtures is null || !fixtures.Any())
|
||||
{
|
||||
throw new ArgumentException("At least one fixture is required.", nameof(fixtures));
|
||||
}
|
||||
|
||||
var stats = new HarnessStats();
|
||||
|
||||
tenant = string.IsNullOrWhiteSpace(tenant) ? "default" : tenant;
|
||||
reportPath = string.IsNullOrWhiteSpace(reportPath) ? "harness-report.json" : reportPath;
|
||||
|
||||
var eventCount = 0L;
|
||||
var hashesValid = true;
|
||||
DateTimeOffset? earliest = null;
|
||||
DateTimeOffset? latest = null;
|
||||
var latencies = new List<double>();
|
||||
var leafHashes = new List<string>();
|
||||
string? expectedMerkleRoot = null;
|
||||
var latencies = new ConcurrentBag<double>();
|
||||
var swTotal = Stopwatch.StartNew();
|
||||
|
||||
var throttler = new TaskThrottler(_maxParallel);
|
||||
|
||||
foreach (var fixture in fixtures)
|
||||
{
|
||||
await foreach (var line in ReadLinesAsync(fixture, cancellationToken))
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(line)) continue;
|
||||
var node = JsonNode.Parse(line)?.AsObject();
|
||||
if (node is null) continue;
|
||||
|
||||
eventCount++;
|
||||
var recordedAt = node["recorded_at"]?.GetValue<DateTimeOffset>() ?? DateTimeOffset.UtcNow;
|
||||
earliest = earliest is null ? recordedAt : DateTimeOffset.Compare(recordedAt, earliest.Value) < 0 ? recordedAt : earliest;
|
||||
latest = latest is null
|
||||
? recordedAt
|
||||
: DateTimeOffset.Compare(recordedAt, latest.Value) > 0 ? recordedAt : latest;
|
||||
|
||||
if (node["canonical_envelope"] is JsonObject envelope && node["sequence_no"] is not null)
|
||||
{
|
||||
var seq = node["sequence_no"]!.GetValue<long>();
|
||||
var computed = LedgerHashing.ComputeHashes(envelope, seq);
|
||||
var expected = node["event_hash"]?.GetValue<string>();
|
||||
if (!string.IsNullOrEmpty(expected) && !string.Equals(expected, computed.EventHash, StringComparison.Ordinal))
|
||||
{
|
||||
hashesValid = false;
|
||||
}
|
||||
|
||||
stats.UpdateHashes(computed.EventHash, computed.MerkleLeafHash);
|
||||
leafHashes.Add(computed.MerkleLeafHash);
|
||||
expectedMerkleRoot ??= node["merkle_root"]?.GetValue<string>();
|
||||
|
||||
// enqueue for concurrent append
|
||||
var record = new LedgerEventRecord(
|
||||
tenant,
|
||||
envelope["chain_id"]?.GetValue<Guid>() ?? Guid.Empty,
|
||||
seq,
|
||||
envelope["event_id"]?.GetValue<Guid>() ?? Guid.Empty,
|
||||
envelope["event_type"]?.GetValue<string>() ?? string.Empty,
|
||||
envelope["policy_version"]?.GetValue<string>() ?? string.Empty,
|
||||
envelope["finding_id"]?.GetValue<string>() ?? string.Empty,
|
||||
envelope["artifact_id"]?.GetValue<string>() ?? string.Empty,
|
||||
envelope["source_run_id"]?.GetValue<Guid?>(),
|
||||
envelope["actor_id"]?.GetValue<string>() ?? "system",
|
||||
envelope["actor_type"]?.GetValue<string>() ?? "system",
|
||||
envelope["occurred_at"]?.GetValue<DateTimeOffset>() ?? recordedAt,
|
||||
recordedAt,
|
||||
envelope,
|
||||
computed.EventHash,
|
||||
envelope["previous_hash"]?.GetValue<string>() ?? string.Empty,
|
||||
computed.MerkleLeafHash,
|
||||
computed.CanonicalJson);
|
||||
|
||||
// fire-and-track latency
|
||||
await throttler.RunAsync(async () =>
|
||||
{
|
||||
var sw = Stopwatch.StartNew();
|
||||
await _client.AppendAsync(record, cancellationToken).ConfigureAwait(false);
|
||||
sw.Stop();
|
||||
latencies.Add(sw.Elapsed.TotalMilliseconds);
|
||||
}, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await throttler.DrainAsync(cancellationToken).ConfigureAwait(false);
|
||||
swTotal.Stop();
|
||||
|
||||
var latencyArray = latencies.ToArray();
|
||||
Array.Sort(latencyArray);
|
||||
double p95 = latencyArray.Length == 0 ? 0 : latencyArray[(int)Math.Ceiling(latencyArray.Length * 0.95) - 1];
|
||||
|
||||
string? computedRoot = leafHashes.Count == 0 ? null : MerkleCalculator.ComputeRoot(leafHashes);
|
||||
var merkleOk = expectedMerkleRoot is null || string.Equals(expectedMerkleRoot, computedRoot, StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
var report = new
|
||||
{
|
||||
tenant,
|
||||
fixtures = fixtures.ToArray(),
|
||||
eventsWritten = eventCount,
|
||||
durationSeconds = Math.Max(swTotal.Elapsed.TotalSeconds, (latest - earliest)?.TotalSeconds ?? 0),
|
||||
throughputEps = swTotal.Elapsed.TotalSeconds > 0 ? eventCount / swTotal.Elapsed.TotalSeconds : 0,
|
||||
latencyP95Ms = p95,
|
||||
projectionLagMaxSeconds = 0,
|
||||
cpuPercentMax = 0,
|
||||
memoryMbMax = 0,
|
||||
status = hashesValid && merkleOk ? "pass" : "fail",
|
||||
timestamp = DateTimeOffset.UtcNow.ToString("O"),
|
||||
hashSummary = stats.ToReport(),
|
||||
merkleRoot = computedRoot,
|
||||
merkleExpected = expectedMerkleRoot
|
||||
};
|
||||
|
||||
var json = JsonSerializer.Serialize(report, new JsonSerializerOptions { WriteIndented = true });
|
||||
await File.WriteAllTextAsync(reportPath, json);
|
||||
return hashesValid && merkleOk ? 0 : 1;
|
||||
}
|
||||
|
||||
private static async IAsyncEnumerable<string> ReadLinesAsync(string path, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await using var stream = File.OpenRead(path);
|
||||
using var reader = new StreamReader(stream);
|
||||
string? line;
|
||||
while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested && (line = await reader.ReadLineAsync()) is not null)
|
||||
{
|
||||
yield return line;
|
||||
}
|
||||
}
|
||||
}
|
||||
26
src/Findings/tools/LedgerReplayHarness/HarnessStats.cs
Normal file
26
src/Findings/tools/LedgerReplayHarness/HarnessStats.cs
Normal file
@@ -0,0 +1,26 @@
|
||||
namespace LedgerReplayHarness;
|
||||
|
||||
internal sealed class HarnessStats
|
||||
{
|
||||
private readonly HashSet<string> _eventHashes = new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly HashSet<string> _leafHashes = new(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
public void UpdateHashes(string eventHash, string leafHash)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(eventHash))
|
||||
{
|
||||
_eventHashes.Add(eventHash);
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(leafHash))
|
||||
{
|
||||
_leafHashes.Add(leafHash);
|
||||
}
|
||||
}
|
||||
|
||||
public object ToReport() => new
|
||||
{
|
||||
uniqueEventHashes = _eventHashes.Count,
|
||||
uniqueMerkleLeaves = _leafHashes.Count
|
||||
};
|
||||
}
|
||||
8
src/Findings/tools/LedgerReplayHarness/ILedgerClient.cs
Normal file
8
src/Findings/tools/LedgerReplayHarness/ILedgerClient.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
using StellaOps.Findings.Ledger.Domain;
|
||||
|
||||
namespace LedgerReplayHarness;
|
||||
|
||||
public interface ILedgerClient
|
||||
{
|
||||
Task AppendAsync(LedgerEventRecord record, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
using System.Collections.Concurrent;
|
||||
using StellaOps.Findings.Ledger.Domain;
|
||||
|
||||
namespace LedgerReplayHarness;
|
||||
|
||||
public sealed class InMemoryLedgerClient : ILedgerClient
|
||||
{
|
||||
private readonly ConcurrentDictionary<(string Tenant, Guid EventId), LedgerEventRecord> _store = new();
|
||||
|
||||
public Task AppendAsync(LedgerEventRecord record, CancellationToken cancellationToken)
|
||||
{
|
||||
_store.TryAdd((record.TenantId, record.EventId), record);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\\..\\StellaOps.Findings.Ledger\\StellaOps.Findings.Ledger.csproj" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
41
src/Findings/tools/LedgerReplayHarness/MerkleCalculator.cs
Normal file
41
src/Findings/tools/LedgerReplayHarness/MerkleCalculator.cs
Normal file
@@ -0,0 +1,41 @@
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
||||
namespace LedgerReplayHarness;
|
||||
|
||||
internal static class MerkleCalculator
|
||||
{
|
||||
public static string ComputeRoot(IReadOnlyList<string> leafHashes)
|
||||
{
|
||||
if (leafHashes is null || leafHashes.Count == 0)
|
||||
{
|
||||
throw new ArgumentException("At least one leaf hash is required.", nameof(leafHashes));
|
||||
}
|
||||
|
||||
var level = leafHashes.Select(Normalize).ToList();
|
||||
while (level.Count > 1)
|
||||
{
|
||||
var next = new List<string>((level.Count + 1) / 2);
|
||||
for (int i = 0; i < level.Count; i += 2)
|
||||
{
|
||||
var left = level[i];
|
||||
var right = i + 1 < level.Count ? level[i + 1] : level[i];
|
||||
next.Add(HashPair(left, right));
|
||||
}
|
||||
level = next;
|
||||
}
|
||||
|
||||
return level[0];
|
||||
}
|
||||
|
||||
private static string Normalize(string hex)
|
||||
=> hex?.Trim().ToLowerInvariant() ?? string.Empty;
|
||||
|
||||
private static string HashPair(string left, string right)
|
||||
{
|
||||
using var sha = SHA256.Create();
|
||||
var data = Encoding.UTF8.GetBytes(left + right);
|
||||
var hash = sha.ComputeHash(data);
|
||||
return Convert.ToHexString(hash).ToLowerInvariant();
|
||||
}
|
||||
}
|
||||
22
src/Findings/tools/LedgerReplayHarness/Program.cs
Normal file
22
src/Findings/tools/LedgerReplayHarness/Program.cs
Normal file
@@ -0,0 +1,22 @@
|
||||
using System.CommandLine;
|
||||
using LedgerReplayHarness;
|
||||
|
||||
var fixtureOption = new Option<string[]>("--fixture", "NDJSON fixture path(s)") { IsRequired = true, AllowMultipleArgumentsPerToken = true };
|
||||
var tenantOption = new Option<string>("--tenant", () => "default", "Tenant identifier");
|
||||
var reportOption = new Option<string>("--report", () => "harness-report.json", "Path to write JSON report");
|
||||
var parallelOption = new Option<int>("--maxParallel", () => 4, "Maximum parallelism when sending events");
|
||||
|
||||
var root = new RootCommand("Findings Ledger replay & determinism harness");
|
||||
root.AddOption(fixtureOption);
|
||||
root.AddOption(tenantOption);
|
||||
root.AddOption(reportOption);
|
||||
root.AddOption(parallelOption);
|
||||
|
||||
root.SetHandler(async (fixtures, tenant, report, maxParallel) =>
|
||||
{
|
||||
var runner = new HarnessRunner(new InMemoryLedgerClient(), maxParallel);
|
||||
var exitCode = await runner.RunAsync(fixtures, tenant, report, CancellationToken.None);
|
||||
Environment.Exit(exitCode);
|
||||
}, fixtureOption, tenantOption, reportOption, parallelOption);
|
||||
|
||||
return await root.InvokeAsync(args);
|
||||
36
src/Findings/tools/LedgerReplayHarness/TaskThrottler.cs
Normal file
36
src/Findings/tools/LedgerReplayHarness/TaskThrottler.cs
Normal file
@@ -0,0 +1,36 @@
|
||||
namespace LedgerReplayHarness;
|
||||
|
||||
internal sealed class TaskThrottler
|
||||
{
|
||||
private readonly SemaphoreSlim _semaphore;
|
||||
private readonly List<Task> _tasks = new();
|
||||
|
||||
public TaskThrottler(int maxDegreeOfParallelism)
|
||||
{
|
||||
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism > 0 ? maxDegreeOfParallelism : 1);
|
||||
}
|
||||
|
||||
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken)
|
||||
{
|
||||
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
var task = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await taskFactory().ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_semaphore.Release();
|
||||
}
|
||||
}, cancellationToken);
|
||||
lock (_tasks) _tasks.Add(task);
|
||||
}
|
||||
|
||||
public async Task DrainAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
Task[] pending;
|
||||
lock (_tasks) pending = _tasks.ToArray();
|
||||
await Task.WhenAll(pending).WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user