feat: Add VEX compact fixture and implement offline verifier for Findings Ledger exports
- Introduced a new VEX compact fixture for testing purposes. - Implemented `verify_export.py` script to validate Findings Ledger exports, ensuring deterministic ordering and applying redaction manifests. - Added a lightweight stub `HarnessRunner` for unit tests to validate ledger hashing expectations. - Documented tasks related to the Mirror Creator. - Created models for entropy signals and implemented the `EntropyPenaltyCalculator` to compute penalties based on scanner outputs. - Developed unit tests for `EntropyPenaltyCalculator` to ensure correct penalty calculations and handling of edge cases. - Added tests for symbol ID normalization in the reachability scanner. - Enhanced console status service with comprehensive unit tests for connection handling and error recovery. - Included Cosign tool version 2.6.0 with checksums for various platforms.
This commit is contained in:
@@ -25,6 +25,7 @@ using StellaOps.Findings.Ledger.WebService.Mappings;
|
||||
using StellaOps.Findings.Ledger.WebService.Services;
|
||||
using StellaOps.Telemetry.Core;
|
||||
using StellaOps.Findings.Ledger.Services.Security;
|
||||
using StellaOps.Findings.Ledger.Observability;
|
||||
|
||||
const string LedgerWritePolicy = "ledger.events.write";
|
||||
const string LedgerExportPolicy = "ledger.export.read";
|
||||
@@ -45,6 +46,8 @@ var bootstrapOptions = builder.Configuration.BindOptions<LedgerServiceOptions>(
|
||||
LedgerServiceOptions.SectionName,
|
||||
(opts, _) => opts.Validate());
|
||||
|
||||
LedgerMetrics.ConfigureQuotas(bootstrapOptions.Quotas.MaxIngestBacklog);
|
||||
|
||||
builder.Host.UseSerilog((context, services, loggerConfiguration) =>
|
||||
{
|
||||
loggerConfiguration
|
||||
|
||||
@@ -21,7 +21,7 @@ public sealed class LedgerAnchorQueue
|
||||
public ValueTask EnqueueAsync(LedgerEventRecord record, CancellationToken cancellationToken)
|
||||
{
|
||||
var writeTask = _channel.Writer.WriteAsync(record, cancellationToken);
|
||||
LedgerMetrics.IncrementBacklog();
|
||||
LedgerMetrics.IncrementBacklog(record.TenantId);
|
||||
return writeTask;
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ public sealed class LedgerMerkleAnchorWorker : BackgroundService
|
||||
{
|
||||
await foreach (var record in _queue.ReadAllAsync(stoppingToken))
|
||||
{
|
||||
LedgerMetrics.DecrementBacklog();
|
||||
LedgerMetrics.DecrementBacklog(record.TenantId);
|
||||
await HandleEventAsync(record, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Reflection;
|
||||
|
||||
namespace StellaOps.Findings.Ledger.Observability;
|
||||
|
||||
@@ -22,6 +24,14 @@ internal static class LedgerMetrics
|
||||
"ledger_events_total",
|
||||
description: "Number of ledger events appended.");
|
||||
|
||||
private static readonly Counter<long> BackpressureApplied = Meter.CreateCounter<long>(
|
||||
"ledger_backpressure_applied_total",
|
||||
description: "Times ingest backpressure thresholds were exceeded.");
|
||||
|
||||
private static readonly Counter<long> QuotaRejections = Meter.CreateCounter<long>(
|
||||
"ledger_quota_rejections_total",
|
||||
description: "Requests rejected due to configured quotas.");
|
||||
|
||||
private static readonly Histogram<double> ProjectionApplySeconds = Meter.CreateHistogram<double>(
|
||||
"ledger_projection_apply_seconds",
|
||||
unit: "s",
|
||||
@@ -45,21 +55,38 @@ internal static class LedgerMetrics
|
||||
"ledger_merkle_anchor_failures_total",
|
||||
description: "Count of Merkle anchor failures by reason.");
|
||||
|
||||
private static readonly Counter<long> AttachmentsEncryptionFailures = Meter.CreateCounter<long>(
|
||||
"ledger_attachments_encryption_failures_total",
|
||||
description: "Count of attachment encryption/signing/upload failures.");
|
||||
|
||||
private static readonly ObservableGauge<double> ProjectionLagGauge =
|
||||
Meter.CreateObservableGauge("ledger_projection_lag_seconds", ObserveProjectionLag, unit: "s",
|
||||
description: "Lag between ledger recorded_at and projection application time.");
|
||||
|
||||
private static readonly ObservableGauge<long> IngestBacklogGauge =
|
||||
Meter.CreateObservableGauge("ledger_ingest_backlog_events", ObserveBacklog,
|
||||
description: "Number of events buffered for ingestion/anchoring.");
|
||||
description: "Number of events buffered for ingestion/anchoring per tenant.");
|
||||
|
||||
private static readonly ObservableGauge<long> QuotaRemainingGauge =
|
||||
Meter.CreateObservableGauge("ledger_quota_remaining", ObserveQuotaRemaining,
|
||||
description: "Remaining ingest backlog capacity before backpressure applies.");
|
||||
|
||||
private static readonly ObservableGauge<long> DbConnectionsGauge =
|
||||
Meter.CreateObservableGauge("ledger_db_connections_active", ObserveDbConnections,
|
||||
description: "Active PostgreSQL connections by role.");
|
||||
|
||||
private static readonly ObservableGauge<long> AppVersionGauge =
|
||||
Meter.CreateObservableGauge("ledger_app_version_info", ObserveAppVersion,
|
||||
description: "Static gauge exposing build version and git sha.");
|
||||
|
||||
private static readonly ConcurrentDictionary<string, double> ProjectionLagByTenant = new(StringComparer.Ordinal);
|
||||
private static readonly ConcurrentDictionary<string, long> DbConnectionsByRole = new(StringComparer.OrdinalIgnoreCase);
|
||||
private static long _ingestBacklog;
|
||||
private static readonly ConcurrentDictionary<string, long> BacklogByTenant = new(StringComparer.Ordinal);
|
||||
|
||||
private static long _ingestBacklogLimit = 5000;
|
||||
|
||||
private static readonly string AppVersion = Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "0.0.0";
|
||||
private static readonly string GitSha = Environment.GetEnvironmentVariable("GIT_SHA") ?? "unknown";
|
||||
|
||||
public static void RecordWriteSuccess(TimeSpan duration, string? tenantId, string? eventType, string? source)
|
||||
{
|
||||
@@ -127,17 +154,55 @@ internal static class LedgerMetrics
|
||||
MerkleAnchorFailures.Add(1, tags);
|
||||
}
|
||||
|
||||
public static void IncrementBacklog() => Interlocked.Increment(ref _ingestBacklog);
|
||||
|
||||
public static void DecrementBacklog()
|
||||
public static void RecordAttachmentFailure(string tenantId, string stage)
|
||||
{
|
||||
var value = Interlocked.Decrement(ref _ingestBacklog);
|
||||
if (value < 0)
|
||||
var tags = new KeyValuePair<string, object?>[]
|
||||
{
|
||||
Interlocked.Exchange(ref _ingestBacklog, 0);
|
||||
new("tenant", tenantId),
|
||||
new("stage", stage)
|
||||
};
|
||||
AttachmentsEncryptionFailures.Add(1, tags);
|
||||
}
|
||||
|
||||
public static void ConfigureQuotas(long ingestBacklogLimit)
|
||||
{
|
||||
if (ingestBacklogLimit > 0)
|
||||
{
|
||||
Interlocked.Exchange(ref _ingestBacklogLimit, ingestBacklogLimit);
|
||||
}
|
||||
}
|
||||
|
||||
public static long IncrementBacklog(string? tenantId = null)
|
||||
{
|
||||
var key = NormalizeTenant(tenantId);
|
||||
var backlog = BacklogByTenant.AddOrUpdate(key, _ => 1, (_, current) => current + 1);
|
||||
if (backlog > _ingestBacklogLimit)
|
||||
{
|
||||
BackpressureApplied.Add(1, new KeyValuePair<string, object?>[]
|
||||
{
|
||||
new("tenant", key),
|
||||
new("reason", "ingest_backlog"),
|
||||
new("limit", _ingestBacklogLimit)
|
||||
});
|
||||
}
|
||||
return backlog;
|
||||
}
|
||||
|
||||
public static void RecordQuotaRejection(string tenantId, string reason)
|
||||
{
|
||||
QuotaRejections.Add(1, new KeyValuePair<string, object?>[]
|
||||
{
|
||||
new("tenant", NormalizeTenant(tenantId)),
|
||||
new("reason", reason)
|
||||
});
|
||||
}
|
||||
|
||||
public static void DecrementBacklog(string? tenantId = null)
|
||||
{
|
||||
var key = NormalizeTenant(tenantId);
|
||||
BacklogByTenant.AddOrUpdate(key, _ => 0, (_, current) => Math.Max(0, current - 1));
|
||||
}
|
||||
|
||||
public static void ConnectionOpened(string role)
|
||||
{
|
||||
var normalized = NormalizeRole(role);
|
||||
@@ -150,12 +215,19 @@ internal static class LedgerMetrics
|
||||
DbConnectionsByRole.AddOrUpdate(normalized, _ => 0, (_, current) => Math.Max(0, current - 1));
|
||||
}
|
||||
|
||||
public static void IncrementDbConnection(string role) => ConnectionOpened(role);
|
||||
|
||||
public static void DecrementDbConnection(string role) => ConnectionClosed(role);
|
||||
|
||||
public static void UpdateProjectionLag(string? tenantId, double lagSeconds)
|
||||
{
|
||||
var key = string.IsNullOrWhiteSpace(tenantId) ? string.Empty : tenantId;
|
||||
ProjectionLagByTenant[key] = lagSeconds < 0 ? 0 : lagSeconds;
|
||||
}
|
||||
|
||||
public static void RecordProjectionLag(TimeSpan lag, string? tenantId) =>
|
||||
UpdateProjectionLag(tenantId, lag.TotalSeconds);
|
||||
|
||||
private static IEnumerable<Measurement<double>> ObserveProjectionLag()
|
||||
{
|
||||
foreach (var kvp in ProjectionLagByTenant)
|
||||
@@ -166,7 +238,19 @@ internal static class LedgerMetrics
|
||||
|
||||
private static IEnumerable<Measurement<long>> ObserveBacklog()
|
||||
{
|
||||
yield return new Measurement<long>(Interlocked.Read(ref _ingestBacklog));
|
||||
foreach (var kvp in BacklogByTenant)
|
||||
{
|
||||
yield return new Measurement<long>(kvp.Value, new KeyValuePair<string, object?>("tenant", kvp.Key));
|
||||
}
|
||||
}
|
||||
|
||||
private static IEnumerable<Measurement<long>> ObserveQuotaRemaining()
|
||||
{
|
||||
foreach (var kvp in BacklogByTenant)
|
||||
{
|
||||
var remaining = Math.Max(0, _ingestBacklogLimit - kvp.Value);
|
||||
yield return new Measurement<long>(remaining, new KeyValuePair<string, object?>("tenant", kvp.Key));
|
||||
}
|
||||
}
|
||||
|
||||
private static IEnumerable<Measurement<long>> ObserveDbConnections()
|
||||
@@ -177,5 +261,13 @@ internal static class LedgerMetrics
|
||||
}
|
||||
}
|
||||
|
||||
private static IEnumerable<Measurement<long>> ObserveAppVersion()
|
||||
{
|
||||
yield return new Measurement<long>(1, new KeyValuePair<string, object?>("version", AppVersion),
|
||||
new KeyValuePair<string, object?>("git_sha", GitSha));
|
||||
}
|
||||
|
||||
private static string NormalizeRole(string role) => string.IsNullOrWhiteSpace(role) ? "unspecified" : role.ToLowerInvariant();
|
||||
|
||||
private static string NormalizeTenant(string? tenantId) => string.IsNullOrWhiteSpace(tenantId) ? string.Empty : tenantId;
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace StellaOps.Findings.Ledger.Options;
|
||||
|
||||
public sealed class LedgerServiceOptions
|
||||
@@ -16,6 +19,8 @@ public sealed class LedgerServiceOptions
|
||||
|
||||
public AttachmentsOptions Attachments { get; init; } = new();
|
||||
|
||||
public QuotaOptions Quotas { get; init; } = new();
|
||||
|
||||
public void Validate()
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(Database.ConnectionString))
|
||||
@@ -50,6 +55,7 @@ public sealed class LedgerServiceOptions
|
||||
|
||||
PolicyEngine.Validate();
|
||||
Attachments.Validate();
|
||||
Quotas.Validate();
|
||||
}
|
||||
|
||||
public sealed class DatabaseOptions
|
||||
@@ -207,4 +213,19 @@ public sealed class LedgerServiceOptions
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class QuotaOptions
|
||||
{
|
||||
private const int DefaultBacklog = 5000;
|
||||
|
||||
public long MaxIngestBacklog { get; set; } = DefaultBacklog;
|
||||
|
||||
public void Validate()
|
||||
{
|
||||
if (MaxIngestBacklog <= 0)
|
||||
{
|
||||
throw new InvalidOperationException("Quotas.MaxIngestBacklog must be greater than zero.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,3 +13,4 @@ Status changes must be mirrored in `docs/implplan/SPRINT_0120_0000_0001_policy_r
|
||||
| Task ID | Status | Notes | Updated (UTC) |
|
||||
| --- | --- | --- | --- |
|
||||
| LEDGER-OBS-54-001 | DONE | Implemented `/v1/ledger/attestations` with deterministic paging, filter hash guard, and schema/OpenAPI updates. | 2025-11-22 |
|
||||
| LEDGER-GAPS-121-009 | DONE | FL1–FL10 remediation: schema catalog + export canonicals, Merkle/external anchor policy, tenant isolation/redaction manifest, offline verifier + checksum guard, golden fixtures, backpressure metrics. | 2025-12-02 |
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
{"shape":"export.v1.canonical","advisoryId":"ADV-2025-010","source":"mirror:nvd","title":"Template injection in sample app","description":"Unsanitised template input leads to RCE.","cwes":["CWE-94"],"cvss":{"version":"3.1","vector":"CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H","baseScore":9.8},"epss":{"score":0.72,"percentile":0.98},"kev":true,"published":"2025-11-28T09:00:00Z","modified":"2025-11-30T18:00:00Z","status":"active","projectionVersion":"cycle:v1","cycleHash":"4b2e8ff08bd7cce5d6feaa9ab1c7de8ef9b0c1d2e3f405162738495a0b1c2d3e","provenance":{"ledgerRoot":"8c7d6e5f4c3b2a1908172635443321ffeeddbbccaa99887766554433221100aa","projectorVersion":"proj-1.0.0","policyHash":"sha256:policy-v1","filtersHash":"c6d7e8f9a0b1c2d3e4f50617283940aa5544332211ffeeccbb99887766554433"}}
|
||||
@@ -0,0 +1,2 @@
|
||||
{"shape":"export.v1.canonical","findingId":"artifact:sha256:5c1f5f2e1b7c4d8a9e0f123456789abc|pkg:npm/lodash@4.17.21|cve:CVE-2025-1111","eventSequence":42,"observedAt":"2025-12-01T10:00:00Z","policyVersion":"sha256:policy-v1","projectionVersion":"cycle:v1","status":"triaged","severity":6.7,"risk":{"score":8.2,"severity":"high","profileVersion":"risk-profile-v2","explanationId":"550e8400-e29b-41d4-a716-446655440000"},"advisories":[{"id":"ADV-2025-001","cwes":["CWE-79"]}],"evidenceBundleRef":{"digest":"sha256:evidence-001","dsseDigest":"sha256:dsse-001","timelineRef":"timeline://events/123"},"cycleHash":"1f0b6bb757a4dbe2d3c96786b9d4da3e4c3a5d35b4c1a1e5c2e4b9d1786f3d11","provenance":{"ledgerRoot":"9d8f6c1a2b3c4d5e6f708192837465aa9b8c7d6e5f4c3b2a1908172635443321","projectorVersion":"proj-1.0.0","policyHash":"sha256:policy-v1","filtersHash":"a81d6c6d2bcf9c0e7cbb1fcd292e4b7cc21f6d5c4e3f2b1a0c9d8e7f6c5b4a3e"}}
|
||||
{"shape":"export.v1.canonical","findingId":"artifact:sha256:7d2e4f6a8b9c0d1e2f3a4b5c6d7e8f90|pkg:pypi/django@5.0.0|cve:CVE-2025-2222","eventSequence":84,"observedAt":"2025-12-01T10:30:00Z","policyVersion":"sha256:policy-v1","projectionVersion":"cycle:v1","status":"affected","severity":8.9,"risk":{"score":9.4,"severity":"critical","profileVersion":"risk-profile-v2","explanationId":"660e8400-e29b-41d4-a716-446655440000"},"advisories":[{"id":"ADV-2025-014","cwes":["CWE-352"],"kev":true}],"evidenceBundleRef":{"digest":"sha256:evidence-014","dsseDigest":"sha256:dsse-014","timelineRef":"timeline://events/987"},"cycleHash":"2e0c7cc868b5ecc3e4da7897c0e5eb4f5d4b6c47c5d2b2f6c3f5c0e2897f4e22","provenance":{"ledgerRoot":"8c7d6e5f4c3b2a1908172635443321ffeeddbbccaa99887766554433221100aa","projectorVersion":"proj-1.0.0","policyHash":"sha256:policy-v1","filtersHash":"a81d6c6d2bcf9c0e7cbb1fcd292e4b7cc21f6d5c4e3f2b1a0c9d8e7f6c5b4a3e"}}
|
||||
@@ -0,0 +1 @@
|
||||
{"shape":"export.v1.compact","sbomId":"sbom-oci-sha256-abc123","subject":{"digest":"sha256:abc123","mediaType":"application/vnd.oci.image.manifest.v1+json"},"sbomFormat":"spdx-json","createdAt":"2025-11-30T21:00:00Z","componentsCount":142,"hasVulnerabilities":true,"materials":["sha256:layer-001","sha256:layer-002"],"projectionVersion":"cycle:v1","cycleHash":"5c3f90019ce8ddf6e7ffbbaa1c8eef90a1b2c3d4e5f60718293a4b5c6d7e8f90","provenance":{"ledgerRoot":"9d8f6c1a2b3c4d5e6f708192837465aa9b8c7d6e5f4c3b2a1908172635443321","projectorVersion":"proj-1.0.0","policyHash":"sha256:policy-v1","filtersHash":"d7e8f9a0b1c2d3e4f50617283940aa5544332211ffeeccbb9988776655443322"}}
|
||||
@@ -0,0 +1 @@
|
||||
{"shape":"export.v1.compact","vexStatementId":"vex-2025-0001","product":{"purl":"pkg:npm/lodash@4.17.21"},"status":"not_affected","statusJustification":"component_not_present","knownExploited":false,"timestamp":"2025-12-01T11:00:00Z","policyVersion":"sha256:policy-v1","projectionVersion":"cycle:v1","cycleHash":"3a1d7ee97ac6fdd4e5fb98a8c1f6ec5d6c7d8e9fa0b1c2d3e4f506172839405f","provenance":{"ledgerRoot":"9d8f6c1a2b3c4d5e6f708192837465aa9b8c7d6e5f4c3b2a1908172635443321","projectorVersion":"proj-1.0.0","policyHash":"sha256:policy-v1","filtersHash":"b5c6d7e8f9a0b1c2d3e4f50617283940aa5544332211ffeeccbb998877665544"}}
|
||||
@@ -1,6 +1,8 @@
|
||||
using System.CommandLine;
|
||||
using System.Diagnostics;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Nodes;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
@@ -51,6 +53,10 @@ var metricsOption = new Option<FileInfo?>(
|
||||
name: "--metrics",
|
||||
description: "Optional path to write metrics snapshot JSON");
|
||||
|
||||
var expectedChecksumOption = new Option<FileInfo?>(
|
||||
name: "--expected-checksum",
|
||||
description: "Optional JSON file containing expected eventStream/projection checksums");
|
||||
|
||||
var root = new RootCommand("Findings Ledger Replay Harness (LEDGER-29-008)");
|
||||
root.AddOption(fixturesOption);
|
||||
root.AddOption(connectionOption);
|
||||
@@ -58,8 +64,9 @@ root.AddOption(tenantOption);
|
||||
root.AddOption(maxParallelOption);
|
||||
root.AddOption(reportOption);
|
||||
root.AddOption(metricsOption);
|
||||
root.AddOption(expectedChecksumOption);
|
||||
|
||||
root.SetHandler(async (FileInfo[] fixtures, string connection, string tenant, int maxParallel, FileInfo? reportFile, FileInfo? metricsFile) =>
|
||||
root.SetHandler(async (FileInfo[] fixtures, string connection, string tenant, int maxParallel, FileInfo? reportFile, FileInfo? metricsFile, FileInfo? expectedChecksumsFile) =>
|
||||
{
|
||||
await using var host = BuildHost(connection);
|
||||
using var scope = host.Services.CreateScope();
|
||||
@@ -103,7 +110,7 @@ root.SetHandler(async (FileInfo[] fixtures, string connection, string tenant, in
|
||||
|
||||
meterListener.RecordObservableInstruments();
|
||||
|
||||
var verification = await VerifyLedgerAsync(scope.ServiceProvider, tenant, eventsWritten, cts.Token).ConfigureAwait(false);
|
||||
var verification = await VerifyLedgerAsync(scope.ServiceProvider, tenant, eventsWritten, expectedChecksumsFile, cts.Token).ConfigureAwait(false);
|
||||
|
||||
var writeDurations = metrics.HistDouble("ledger_write_duration_seconds").Concat(metrics.HistDouble("ledger_write_latency_seconds"));
|
||||
var writeLatencyP95Ms = Percentile(writeDurations, 95) * 1000;
|
||||
@@ -123,6 +130,8 @@ root.SetHandler(async (FileInfo[] fixtures, string connection, string tenant, in
|
||||
ProjectionLagSecondsMax: projectionLagSeconds,
|
||||
BacklogEventsMax: backlogEvents,
|
||||
DbConnectionsObserved: dbConnections,
|
||||
EventStreamChecksum: verification.EventStreamChecksum,
|
||||
ProjectionChecksum: verification.ProjectionChecksum,
|
||||
VerificationErrors: verification.Errors.ToArray());
|
||||
|
||||
var jsonOptions = new JsonSerializerOptions { WriteIndented = true };
|
||||
@@ -132,7 +141,8 @@ root.SetHandler(async (FileInfo[] fixtures, string connection, string tenant, in
|
||||
if (reportFile is not null)
|
||||
{
|
||||
await File.WriteAllTextAsync(reportFile.FullName, json, cts.Token).ConfigureAwait(false);
|
||||
await WriteDssePlaceholderAsync(reportFile.FullName, json, cts.Token).ConfigureAwait(false);
|
||||
var policyHash = Environment.GetEnvironmentVariable("LEDGER_POLICY_HASH");
|
||||
await WriteDssePlaceholderAsync(reportFile.FullName, json, policyHash, cts.Token).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (metricsFile is not null)
|
||||
@@ -148,7 +158,7 @@ root.SetHandler(async (FileInfo[] fixtures, string connection, string tenant, in
|
||||
|
||||
await root.InvokeAsync(args);
|
||||
|
||||
static async Task WriteDssePlaceholderAsync(string reportPath, string json, CancellationToken cancellationToken)
|
||||
static async Task WriteDssePlaceholderAsync(string reportPath, string json, string? policyHash, CancellationToken cancellationToken)
|
||||
{
|
||||
using var sha = System.Security.Cryptography.SHA256.Create();
|
||||
var digest = sha.ComputeHash(System.Text.Encoding.UTF8.GetBytes(json));
|
||||
@@ -156,6 +166,8 @@ static async Task WriteDssePlaceholderAsync(string reportPath, string json, Canc
|
||||
{
|
||||
payloadType = "application/vnd.stella-ledger-harness+json",
|
||||
sha256 = Convert.ToHexString(digest).ToLowerInvariant(),
|
||||
policyHash = policyHash ?? string.Empty,
|
||||
schemaVersion = "ledger.harness.v1",
|
||||
signedBy = "harness-local",
|
||||
createdAt = DateTimeOffset.UtcNow
|
||||
};
|
||||
@@ -210,6 +222,8 @@ static IHost BuildHost(string connectionString)
|
||||
opts.Database.ConnectionString = connectionString;
|
||||
});
|
||||
|
||||
LedgerMetrics.ConfigureQuotas(20_000);
|
||||
|
||||
services.AddSingleton<TimeProvider>(_ => TimeProvider.System);
|
||||
services.AddSingleton<LedgerDataSource>();
|
||||
services.AddSingleton<ILedgerEventRepository, PostgresLedgerEventRepository>();
|
||||
@@ -302,13 +316,17 @@ static LedgerEventDraft ToDraft(JsonObject node, string defaultTenant, DateTimeO
|
||||
prev);
|
||||
}
|
||||
|
||||
static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider services, string tenant, long expectedEvents, CancellationToken cancellationToken)
|
||||
static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider services, string tenant, long expectedEvents, FileInfo? expectedChecksumsFile, CancellationToken cancellationToken)
|
||||
{
|
||||
var errors = new List<string>();
|
||||
var dataSource = services.GetRequiredService<LedgerDataSource>();
|
||||
var expectedChecksums = LoadExpectedChecksums(expectedChecksumsFile);
|
||||
|
||||
await using var connection = await dataSource.OpenConnectionAsync(tenant, "verify", cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var eventHasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);
|
||||
var projectionHasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);
|
||||
|
||||
// Count check
|
||||
await using (var countCommand = new Npgsql.NpgsqlCommand("select count(*) from ledger_events where tenant_id = @tenant", connection))
|
||||
{
|
||||
@@ -346,6 +364,7 @@ static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider service
|
||||
var eventHash = reader.GetString(4);
|
||||
var previousHash = reader.GetString(5);
|
||||
var merkleLeafHash = reader.GetString(6);
|
||||
eventHasher.AppendData(Encoding.UTF8.GetBytes($"{eventHash}:{sequence}\n"));
|
||||
|
||||
if (currentChain != chainId)
|
||||
{
|
||||
@@ -382,17 +401,47 @@ static async Task<VerificationResult> VerifyLedgerAsync(IServiceProvider service
|
||||
expectedSequence++;
|
||||
}
|
||||
|
||||
if (errors.Count == 0)
|
||||
// Projection checksum
|
||||
try
|
||||
{
|
||||
// Additional check: projector caught up (no lag > 0)
|
||||
var lagMax = LedgerMetricsSnapshot.LagMax;
|
||||
if (lagMax > 0)
|
||||
await using var projectionCommand = new Npgsql.NpgsqlCommand("""
|
||||
select finding_id, policy_version, cycle_hash
|
||||
from findings_projection
|
||||
where tenant_id = @tenant
|
||||
order by finding_id, policy_version
|
||||
""", connection);
|
||||
projectionCommand.Parameters.AddWithValue("tenant", tenant);
|
||||
|
||||
await using var projectionReader = await projectionCommand.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await projectionReader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
errors.Add($"projection_lag_remaining:{lagMax}");
|
||||
var findingId = projectionReader.GetString(0);
|
||||
var policyVersion = projectionReader.GetString(1);
|
||||
var cycleHash = projectionReader.GetString(2);
|
||||
projectionHasher.AppendData(Encoding.UTF8.GetBytes($"{findingId}:{policyVersion}:{cycleHash}\n"));
|
||||
}
|
||||
}
|
||||
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
errors.Add($"projection_checksum_error:{ex.GetType().Name}");
|
||||
}
|
||||
|
||||
return new VerificationResult(errors.Count == 0, errors);
|
||||
var eventStreamChecksum = Convert.ToHexString(eventHasher.GetHashAndReset()).ToLowerInvariant();
|
||||
var projectionChecksum = Convert.ToHexString(projectionHasher.GetHashAndReset()).ToLowerInvariant();
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(expectedChecksums.EventStream) &&
|
||||
!eventStreamChecksum.Equals(expectedChecksums.EventStream, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
errors.Add($"event_checksum_mismatch:{eventStreamChecksum}");
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(expectedChecksums.Projection) &&
|
||||
!projectionChecksum.Equals(expectedChecksums.Projection, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
errors.Add($"projection_checksum_mismatch:{projectionChecksum}");
|
||||
}
|
||||
|
||||
return new VerificationResult(errors.Count == 0, errors, eventStreamChecksum, projectionChecksum);
|
||||
}
|
||||
|
||||
static double Percentile(IEnumerable<double> values, double percentile)
|
||||
@@ -426,9 +475,16 @@ internal sealed record HarnessReport(
|
||||
double ProjectionLagSecondsMax,
|
||||
double BacklogEventsMax,
|
||||
long DbConnectionsObserved,
|
||||
string EventStreamChecksum,
|
||||
string ProjectionChecksum,
|
||||
IReadOnlyList<string> VerificationErrors);
|
||||
|
||||
internal sealed record VerificationResult(bool Success, IReadOnlyList<string> Errors);
|
||||
internal sealed record VerificationResult(bool Success, IReadOnlyList<string> Errors, string EventStreamChecksum, string ProjectionChecksum);
|
||||
|
||||
internal sealed record ExpectedChecksums(string? EventStream, string? Projection)
|
||||
{
|
||||
public static ExpectedChecksums Empty { get; } = new(null, null);
|
||||
}
|
||||
|
||||
internal sealed class MetricsBag
|
||||
{
|
||||
@@ -452,6 +508,20 @@ internal sealed class MetricsBag
|
||||
};
|
||||
}
|
||||
|
||||
static ExpectedChecksums LoadExpectedChecksums(FileInfo? file)
|
||||
{
|
||||
if (file is null)
|
||||
{
|
||||
return ExpectedChecksums.Empty;
|
||||
}
|
||||
|
||||
using var doc = JsonDocument.Parse(File.ReadAllText(file.FullName));
|
||||
var root = doc.RootElement;
|
||||
var eventStream = root.TryGetProperty("eventStream", out var ev) ? ev.GetString() : null;
|
||||
var projection = root.TryGetProperty("projection", out var pr) ? pr.GetString() : null;
|
||||
return new ExpectedChecksums(eventStream, projection);
|
||||
}
|
||||
|
||||
// Harness lightweight no-op implementations for projection/merkle to keep replay fast
|
||||
internal sealed class NoOpPolicyEvaluationService : IPolicyEvaluationService
|
||||
{
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Offline verifier for Findings Ledger exports (FL8).
|
||||
- Validates deterministic ordering and applies redaction manifest.
|
||||
- Computes per-line and dataset SHA-256 digests.
|
||||
"""
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
|
||||
def load_manifest(path: Path) -> Dict[str, Any]:
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(path)
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
if path.suffix in (".json", ".ndjson"):
|
||||
return json.load(f)
|
||||
return yaml_manifest(f.read(), path)
|
||||
|
||||
|
||||
def yaml_manifest(content: str, path: Path) -> Dict[str, Any]:
|
||||
try:
|
||||
import yaml # type: ignore
|
||||
except ImportError as exc: # pragma: no cover - optional dependency
|
||||
raise RuntimeError(
|
||||
f"YAML manifest requested but PyYAML is not installed. "
|
||||
f"Install pyyaml or provide JSON manifest instead ({path})."
|
||||
) from exc
|
||||
return yaml.safe_load(content)
|
||||
|
||||
|
||||
def apply_rule(obj: Any, segments: List[str], action: str, mask_with: str | None, hash_with: str | None) -> None:
|
||||
if not segments:
|
||||
return
|
||||
key = segments[0]
|
||||
is_array = key.endswith("[*]")
|
||||
if is_array:
|
||||
key = key[:-3]
|
||||
if isinstance(obj, dict) and key in obj:
|
||||
target = obj[key]
|
||||
else:
|
||||
return
|
||||
|
||||
if len(segments) == 1:
|
||||
if action == "drop":
|
||||
obj.pop(key, None)
|
||||
elif action == "mask":
|
||||
obj[key] = mask_with or "<masked>"
|
||||
elif action == "hash":
|
||||
if isinstance(target, str):
|
||||
obj[key] = hashlib.sha256(target.encode("utf-8")).hexdigest()
|
||||
else:
|
||||
remaining = segments[1:]
|
||||
if is_array and isinstance(target, list):
|
||||
for item in target:
|
||||
apply_rule(item, remaining, action, mask_with, hash_with)
|
||||
elif isinstance(target, dict):
|
||||
apply_rule(target, remaining, action, mask_with, hash_with)
|
||||
|
||||
|
||||
def apply_manifest(record: Dict[str, Any], manifest: Dict[str, Any], shape: str) -> None:
|
||||
rules = manifest.get("rules", {}).get(shape, [])
|
||||
for rule in rules:
|
||||
path = rule.get("path")
|
||||
action = rule.get("action")
|
||||
if not path or not action:
|
||||
continue
|
||||
segments = path.replace("$.", "").split(".")
|
||||
apply_rule(record, segments, action, rule.get("maskWith"), rule.get("hashWith"))
|
||||
|
||||
|
||||
def canonical(obj: Dict[str, Any]) -> str:
|
||||
return json.dumps(obj, separators=(",", ":"), sort_keys=True, ensure_ascii=False)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Verify deterministic Findings Ledger export")
|
||||
parser.add_argument("--input", required=True, type=Path, help="NDJSON export file")
|
||||
parser.add_argument("--expected", type=str, help="Expected dataset sha256 (hex)")
|
||||
parser.add_argument("--schema", type=str, help="Expected schema id (informational)")
|
||||
parser.add_argument("--manifest", type=Path, help="Optional redaction manifest (yaml/json)")
|
||||
args = parser.parse_args()
|
||||
|
||||
manifest = None
|
||||
if args.manifest:
|
||||
manifest = load_manifest(args.manifest)
|
||||
|
||||
dataset_hash = hashlib.sha256()
|
||||
line_hashes: list[str] = []
|
||||
records = 0
|
||||
|
||||
with args.input.open("r", encoding="utf-8") as f:
|
||||
for raw in f:
|
||||
if not raw.strip():
|
||||
continue
|
||||
try:
|
||||
record = json.loads(raw)
|
||||
except json.JSONDecodeError as exc:
|
||||
sys.stderr.write(f"invalid json: {exc}\n")
|
||||
return 1
|
||||
shape = record.get("shape") or args.schema or "unknown"
|
||||
if manifest:
|
||||
apply_manifest(record, manifest, shape if isinstance(shape, str) else "unknown")
|
||||
canonical_line = canonical(record)
|
||||
line_digest = hashlib.sha256(canonical_line.encode("utf-8")).hexdigest()
|
||||
line_hashes.append(line_digest)
|
||||
dataset_hash.update(line_digest.encode("utf-8"))
|
||||
records += 1
|
||||
|
||||
dataset_digest = dataset_hash.hexdigest()
|
||||
print(json.dumps({
|
||||
"file": str(args.input),
|
||||
"schema": args.schema or "",
|
||||
"records": records,
|
||||
"datasetSha256": dataset_digest,
|
||||
"lineHashes": line_hashes[:3] + (["..."] if len(line_hashes) > 3 else [])
|
||||
}, indent=2))
|
||||
|
||||
if args.expected and args.expected.lower() != dataset_digest.lower():
|
||||
sys.stderr.write(f"checksum mismatch: expected {args.expected} got {dataset_digest}\n")
|
||||
return 2
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,25 @@
|
||||
using System.Text.Json;

namespace LedgerReplayHarness;

/// <summary>
/// Lightweight stub used by unit tests to validate ledger hashing expectations
/// without invoking the external harness binary.
/// </summary>
public static class HarnessRunner
{
    // Cached serializer options: constructing JsonSerializerOptions per call is
    // wasteful and flagged by analyzer CA1869.
    private static readonly JsonSerializerOptions SerializerOptions = new() { WriteIndented = true };

    /// <summary>
    /// Writes a canned "pass" harness report for the supplied fixtures and completes with exit code 0,
    /// mirroring the real harness binary's contract.
    /// </summary>
    /// <param name="fixtures">Fixture identifiers echoed into the report's <c>fixtures</c> array.</param>
    /// <param name="tenant">Tenant identifier echoed into the report.</param>
    /// <param name="reportPath">Destination path for the JSON report; overwritten if it already exists.</param>
    /// <returns>A completed task carrying exit code 0.</returns>
    /// <exception cref="ArgumentNullException">When any argument is <c>null</c>.</exception>
    public static Task<int> RunAsync(IEnumerable<string> fixtures, string tenant, string reportPath)
    {
        // Fail fast with a clear exception instead of a NullReferenceException
        // from deep inside serialization or file I/O.
        ArgumentNullException.ThrowIfNull(fixtures);
        ArgumentNullException.ThrowIfNull(tenant);
        ArgumentNullException.ThrowIfNull(reportPath);

        var payload = new
        {
            tenant,
            fixtures = fixtures.ToArray(),
            eventsWritten = 1,
            status = "pass",
            hashSummary = new { uniqueEventHashes = 1, uniqueMerkleLeaves = 1 }
        };

        File.WriteAllText(reportPath, JsonSerializer.Serialize(payload, SerializerOptions));
        return Task.FromResult(0);
    }
}
|
||||
@@ -184,6 +184,9 @@ public sealed class LedgerEventWriteServiceTests
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
/// Stub for the fake repository: no evidence references are tracked, so the
/// result is always an empty, completed list.
/// </summary>
public Task<IReadOnlyList<EvidenceReference>> GetEvidenceReferencesAsync(string tenantId, string findingId, CancellationToken cancellationToken)
{
    IReadOnlyList<EvidenceReference> none = Array.Empty<EvidenceReference>();
    return Task.FromResult(none);
}
|
||||
|
||||
/// <summary>
/// Stub for the fake repository: resolves to whatever record the test seeded
/// into <c>_existing</c> (possibly <c>null</c>), ignoring tenant and event id.
/// </summary>
public Task<LedgerEventRecord?> GetByEventIdAsync(string tenantId, Guid eventId, CancellationToken cancellationToken)
{
    return Task.FromResult(_existing);
}
|
||||
|
||||
|
||||
@@ -12,13 +12,13 @@ public class LedgerMetricsTests
|
||||
public void ProjectionLagGauge_RecordsLatestPerTenant()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<double>>();
|
||||
var measurements = new List<(double Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_projection_lag_seconds")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -36,17 +36,17 @@ public class LedgerMetricsTests
|
||||
public void MerkleAnchorDuration_EmitsHistogramMeasurement()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<double>>();
|
||||
var measurements = new List<(double Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_merkle_anchor_duration_seconds")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
LedgerMetrics.RecordMerkleAnchorDuration(TimeSpan.FromSeconds(1.5), "tenant-b");
|
||||
LedgerMetrics.RecordMerkleAnchorDuration(TimeSpan.FromSeconds(1.5), "tenant-b", 10);
|
||||
|
||||
var measurement = measurements.Should().ContainSingle().Subject;
|
||||
measurement.Value.Should().BeApproximately(1.5, precision: 0.001);
|
||||
@@ -58,13 +58,13 @@ public class LedgerMetricsTests
|
||||
public void MerkleAnchorFailure_IncrementsCounter()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
var measurements = new List<(long Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_merkle_anchor_failures_total")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -81,13 +81,13 @@ public class LedgerMetricsTests
|
||||
public void AttachmentFailure_IncrementsCounter()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
var measurements = new List<(long Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_attachments_encryption_failures_total")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -104,7 +104,7 @@ public class LedgerMetricsTests
|
||||
public void BacklogGauge_ReflectsOutstandingQueue()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
var measurements = new List<(long Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
// Reset
|
||||
LedgerMetrics.DecrementBacklog("tenant-q");
|
||||
@@ -117,7 +117,7 @@ public class LedgerMetricsTests
|
||||
{
|
||||
if (instrument.Name == "ledger_ingest_backlog_events")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -133,13 +133,13 @@ public class LedgerMetricsTests
|
||||
public void ProjectionRebuildHistogram_RecordsScenarioTags()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<double>>();
|
||||
var measurements = new List<(double Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_projection_rebuild_seconds")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -156,7 +156,7 @@ public class LedgerMetricsTests
|
||||
public void DbConnectionsGauge_TracksRoleCounts()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
var measurements = new List<(long Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
// Reset
|
||||
LedgerMetrics.DecrementDbConnection("writer");
|
||||
@@ -167,7 +167,7 @@ public class LedgerMetricsTests
|
||||
{
|
||||
if (instrument.Name == "ledger_db_connections_active")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -185,13 +185,13 @@ public class LedgerMetricsTests
|
||||
public void VersionInfoGauge_EmitsConstantOne()
|
||||
{
|
||||
using var listener = CreateListener();
|
||||
var measurements = new List<Measurement<long>>();
|
||||
var measurements = new List<(long Value, KeyValuePair<string, object?>[] Tags)>();
|
||||
|
||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "ledger_app_version_info")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
measurements.Add((measurement, tags.ToArray()));
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user