Add unit tests for SBOM ingestion and transformation

- Implement `SbomIngestServiceCollectionExtensionsTests` to verify the SBOM ingestion pipeline exports snapshots correctly.
- Create `SbomIngestTransformerTests` to ensure the transformation produces the expected nodes and edges, including deduplication of license nodes and normalization of timestamps (a sketch of this assertion shape follows the list).
- Add `SbomSnapshotExporterTests` to test the export functionality for manifest, adjacency, nodes, and edges.
- Introduce `VexOverlayTransformerTests` to validate the transformation of VEX nodes and edges.
- Set up the test project file with the necessary dependencies and configuration.
- Include JSON fixture files used by the tests.
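
For context, a minimal sketch of the assertion shape `SbomIngestTransformerTests` covers (license-node deduplication and UTC timestamp normalization). xUnit is assumed as the test framework, and the `GraphNode` record plus the inline normalization step are illustrative stand-ins, not the project's actual transformer API:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Xunit;

// Illustrative stand-in for a graph node produced by the transformer; the real
// node/edge types live in the test project and its fixtures.
public sealed record GraphNode(string Kind, string Id, DateTimeOffset Timestamp);

public sealed class SbomTransformSketchTests
{
    [Fact]
    public void LicenseNodes_AreDeduplicated_AndTimestampsAreUtcNormalized()
    {
        // Two components referencing the same license should collapse into one license node.
        var componentLicenses = new[] { "MIT", "MIT" };
        var licenseNodes = componentLicenses
            .Select(id => new GraphNode("license", id, DateTimeOffset.Parse("2025-11-04T07:49:39+02:00")))
            .DistinctBy(n => n.Id)
            .ToList();

        Assert.Single(licenseNodes);

        // Timestamps are expected to come out normalized to UTC (offset zero).
        var normalized = licenseNodes.Select(n => n with { Timestamp = n.Timestamp.ToUniversalTime() });
        Assert.All(normalized, n => Assert.Equal(TimeSpan.Zero, n.Timestamp.Offset));
    }
}
```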
This commit is contained in: master
commit 2eb6852d34 (parent f72c5c513a), 2025-11-04 07:49:39 +02:00
491 changed files with 39445 additions and 3917 deletions


@@ -0,0 +1,81 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using StellaOps.Findings.Ledger.Options;
namespace StellaOps.Findings.Ledger.Infrastructure.Postgres;
public sealed class LedgerDataSource : IAsyncDisposable
{
private readonly NpgsqlDataSource _dataSource;
private readonly LedgerServiceOptions.DatabaseOptions _options;
private readonly ILogger<LedgerDataSource> _logger;
public LedgerDataSource(
IOptions<LedgerServiceOptions> options,
ILogger<LedgerDataSource> logger)
{
ArgumentNullException.ThrowIfNull(options);
_options = options.Value.Database;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
var builder = new NpgsqlDataSourceBuilder(_options.ConnectionString);
_dataSource = builder.Build();
}
public int CommandTimeoutSeconds => _options.CommandTimeoutSeconds;
public async ValueTask DisposeAsync()
{
await _dataSource.DisposeAsync().ConfigureAwait(false);
}
public Task<NpgsqlConnection> OpenConnectionAsync(string tenantId, CancellationToken cancellationToken)
=> OpenConnectionInternalAsync(tenantId, cancellationToken);
private async Task<NpgsqlConnection> OpenConnectionInternalAsync(string tenantId, CancellationToken cancellationToken)
{
var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
try
{
await ConfigureSessionAsync(connection, tenantId, cancellationToken).ConfigureAwait(false);
}
catch
{
await connection.DisposeAsync().ConfigureAwait(false);
throw;
}
return connection;
}
private async Task ConfigureSessionAsync(NpgsqlConnection connection, string tenantId, CancellationToken cancellationToken)
{
try
{
await using (var command = new NpgsqlCommand("SET TIME ZONE 'UTC';", connection))
{
command.CommandTimeout = _options.CommandTimeoutSeconds;
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
if (!string.IsNullOrWhiteSpace(tenantId))
{
await using var tenantCommand = new NpgsqlCommand("SELECT set_config('app.current_tenant', @tenant, false);", connection);
tenantCommand.CommandTimeout = _options.CommandTimeoutSeconds;
tenantCommand.Parameters.AddWithValue("tenant", tenantId);
await tenantCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
{
if (_logger.IsEnabled(LogLevel.Error))
{
_logger.LogError(ex, "Failed to configure PostgreSQL session for tenant {TenantId}.", tenantId);
}
throw;
}
}
}
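
For reference, a hedged usage sketch of `LedgerDataSource`: the wrapper class, query text, and tenant value below are illustrative, while `OpenConnectionAsync` and `CommandTimeoutSeconds` are the members defined above. A session opened this way already has `TIME ZONE 'UTC'` and `app.current_tenant` applied.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Npgsql;
using StellaOps.Findings.Ledger.Infrastructure.Postgres;

// Sketch only: how a repository might consume LedgerDataSource. The query text and
// helper class are illustrative; the data-source members are the ones defined above.
public static class LedgerDataSourceUsageSketch
{
    public static async Task<long> CountProjectionsAsync(
        LedgerDataSource dataSource,
        string tenantId,
        CancellationToken cancellationToken)
    {
        await using var connection = await dataSource
            .OpenConnectionAsync(tenantId, cancellationToken)
            .ConfigureAwait(false);

        // The session already runs in UTC with app.current_tenant set (see ConfigureSessionAsync).
        await using var command = new NpgsqlCommand(
            "SELECT count(*) FROM findings_projection WHERE tenant_id = current_setting('app.current_tenant');",
            connection);
        command.CommandTimeout = dataSource.CommandTimeoutSeconds;

        var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
        return result is long value ? value : 0L;
    }
}
```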


@@ -0,0 +1,318 @@
using System.Text.Json.Nodes;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Hashing;
namespace StellaOps.Findings.Ledger.Infrastructure.Postgres;
public sealed class PostgresFindingProjectionRepository : IFindingProjectionRepository
{
private const string GetProjectionSql = """
SELECT status,
severity,
labels,
current_event_id,
explain_ref,
policy_rationale,
updated_at,
cycle_hash
FROM findings_projection
WHERE tenant_id = @tenant_id
AND finding_id = @finding_id
AND policy_version = @policy_version
""";
private const string UpsertProjectionSql = """
INSERT INTO findings_projection (
tenant_id,
finding_id,
policy_version,
status,
severity,
labels,
current_event_id,
explain_ref,
policy_rationale,
updated_at,
cycle_hash)
VALUES (
@tenant_id,
@finding_id,
@policy_version,
@status,
@severity,
@labels,
@current_event_id,
@explain_ref,
@policy_rationale,
@updated_at,
@cycle_hash)
ON CONFLICT (tenant_id, finding_id, policy_version)
DO UPDATE SET
status = EXCLUDED.status,
severity = EXCLUDED.severity,
labels = EXCLUDED.labels,
current_event_id = EXCLUDED.current_event_id,
explain_ref = EXCLUDED.explain_ref,
policy_rationale = EXCLUDED.policy_rationale,
updated_at = EXCLUDED.updated_at,
cycle_hash = EXCLUDED.cycle_hash;
""";
private const string InsertHistorySql = """
INSERT INTO finding_history (
tenant_id,
finding_id,
policy_version,
event_id,
status,
severity,
actor_id,
comment,
occurred_at)
VALUES (
@tenant_id,
@finding_id,
@policy_version,
@event_id,
@status,
@severity,
@actor_id,
@comment,
@occurred_at)
ON CONFLICT (tenant_id, finding_id, event_id)
DO NOTHING;
""";
private const string InsertActionSql = """
INSERT INTO triage_actions (
tenant_id,
action_id,
event_id,
finding_id,
action_type,
payload,
created_at,
created_by)
VALUES (
@tenant_id,
@action_id,
@event_id,
@finding_id,
@action_type,
@payload,
@created_at,
@created_by)
ON CONFLICT (tenant_id, action_id)
DO NOTHING;
""";
private const string SelectCheckpointSql = """
SELECT last_recorded_at,
last_event_id,
updated_at
FROM ledger_projection_offsets
WHERE worker_id = @worker_id
""";
private const string UpsertCheckpointSql = """
INSERT INTO ledger_projection_offsets (
worker_id,
last_recorded_at,
last_event_id,
updated_at)
VALUES (
@worker_id,
@last_recorded_at,
@last_event_id,
@updated_at)
ON CONFLICT (worker_id)
DO UPDATE SET
last_recorded_at = EXCLUDED.last_recorded_at,
last_event_id = EXCLUDED.last_event_id,
updated_at = EXCLUDED.updated_at;
""";
private const string DefaultWorkerId = "default";
private readonly LedgerDataSource _dataSource;
private readonly TimeProvider _timeProvider;
private readonly ILogger<PostgresFindingProjectionRepository> _logger;
public PostgresFindingProjectionRepository(
LedgerDataSource dataSource,
TimeProvider timeProvider,
ILogger<PostgresFindingProjectionRepository> logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<FindingProjection?> GetAsync(string tenantId, string findingId, string policyVersion, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(GetProjectionSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("finding_id", findingId);
command.Parameters.AddWithValue("policy_version", policyVersion);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
var status = reader.GetString(0);
var severity = reader.IsDBNull(1) ? (decimal?)null : reader.GetDecimal(1);
var labelsJson = reader.GetFieldValue<string>(2);
var labels = JsonNode.Parse(labelsJson)?.AsObject() ?? new JsonObject();
var currentEventId = reader.GetGuid(3);
var explainRef = reader.IsDBNull(4) ? null : reader.GetString(4);
var rationaleJson = reader.IsDBNull(5) ? string.Empty : reader.GetFieldValue<string>(5);
JsonArray rationale;
if (string.IsNullOrWhiteSpace(rationaleJson))
{
rationale = new JsonArray();
}
else
{
rationale = JsonNode.Parse(rationaleJson) as JsonArray ?? new JsonArray();
}
var updatedAt = reader.GetFieldValue<DateTimeOffset>(6);
var cycleHash = reader.GetString(7);
return new FindingProjection(
tenantId,
findingId,
policyVersion,
status,
severity,
labels,
currentEventId,
explainRef,
rationale,
updatedAt,
cycleHash);
}
public async Task UpsertAsync(FindingProjection projection, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(projection);
await using var connection = await _dataSource.OpenConnectionAsync(projection.TenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(UpsertProjectionSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", projection.TenantId);
command.Parameters.AddWithValue("finding_id", projection.FindingId);
command.Parameters.AddWithValue("policy_version", projection.PolicyVersion);
command.Parameters.AddWithValue("status", projection.Status);
command.Parameters.AddWithValue("severity", projection.Severity.HasValue ? projection.Severity.Value : (object)DBNull.Value);
var labelsCanonical = LedgerCanonicalJsonSerializer.Canonicalize(projection.Labels);
var labelsJson = labelsCanonical.ToJsonString();
command.Parameters.Add(new NpgsqlParameter<string>("labels", NpgsqlDbType.Jsonb) { TypedValue = labelsJson });
command.Parameters.AddWithValue("current_event_id", projection.CurrentEventId);
command.Parameters.AddWithValue("explain_ref", projection.ExplainRef ?? (object)DBNull.Value);
var rationaleCanonical = LedgerCanonicalJsonSerializer.Canonicalize(projection.PolicyRationale);
var rationaleJson = rationaleCanonical.ToJsonString();
command.Parameters.Add(new NpgsqlParameter<string>("policy_rationale", NpgsqlDbType.Jsonb) { TypedValue = rationaleJson });
command.Parameters.AddWithValue("updated_at", projection.UpdatedAt);
command.Parameters.AddWithValue("cycle_hash", projection.CycleHash);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task InsertHistoryAsync(FindingHistoryEntry entry, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(entry);
await using var connection = await _dataSource.OpenConnectionAsync(entry.TenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(InsertHistorySql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", entry.TenantId);
command.Parameters.AddWithValue("finding_id", entry.FindingId);
command.Parameters.AddWithValue("policy_version", entry.PolicyVersion);
command.Parameters.AddWithValue("event_id", entry.EventId);
command.Parameters.AddWithValue("status", entry.Status);
command.Parameters.AddWithValue("severity", entry.Severity.HasValue ? entry.Severity.Value : (object)DBNull.Value);
command.Parameters.AddWithValue("actor_id", entry.ActorId);
command.Parameters.AddWithValue("comment", entry.Comment ?? (object)DBNull.Value);
command.Parameters.AddWithValue("occurred_at", entry.OccurredAt);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task InsertActionAsync(TriageActionEntry entry, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(entry);
await using var connection = await _dataSource.OpenConnectionAsync(entry.TenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(InsertActionSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", entry.TenantId);
command.Parameters.AddWithValue("action_id", entry.ActionId);
command.Parameters.AddWithValue("event_id", entry.EventId);
command.Parameters.AddWithValue("finding_id", entry.FindingId);
command.Parameters.AddWithValue("action_type", entry.ActionType);
var payloadJson = entry.Payload.ToJsonString();
command.Parameters.Add(new NpgsqlParameter<string>("payload", NpgsqlDbType.Jsonb) { TypedValue = payloadJson });
command.Parameters.AddWithValue("created_at", entry.CreatedAt);
command.Parameters.AddWithValue("created_by", entry.CreatedBy);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<ProjectionCheckpoint> GetCheckpointAsync(CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(string.Empty, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(SelectCheckpointSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("worker_id", DefaultWorkerId);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return ProjectionCheckpoint.Initial(_timeProvider);
}
var lastRecordedAt = reader.GetFieldValue<DateTimeOffset>(0);
var lastEventId = reader.GetGuid(1);
var updatedAt = reader.GetFieldValue<DateTimeOffset>(2);
return new ProjectionCheckpoint(lastRecordedAt, lastEventId, updatedAt);
}
public async Task SaveCheckpointAsync(ProjectionCheckpoint checkpoint, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(checkpoint);
await using var connection = await _dataSource.OpenConnectionAsync(string.Empty, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(UpsertCheckpointSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("worker_id", DefaultWorkerId);
command.Parameters.AddWithValue("last_recorded_at", checkpoint.LastRecordedAt);
command.Parameters.AddWithValue("last_event_id", checkpoint.LastEventId);
command.Parameters.AddWithValue("updated_at", checkpoint.UpdatedAt);
try
{
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
catch (PostgresException ex)
{
_logger.LogError(ex, "Failed to persist projection checkpoint.");
throw;
}
}
}
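
A hedged sketch of writing a projection row through the repository above; the argument order mirrors the `GetAsync` mapping in this file, and the concrete values (tenant, finding id, cycle hash) are illustrative only.

```csharp
using System;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Infrastructure.Postgres;

// Sketch only: assumes FindingProjection's constructor takes its arguments in the
// same order GetAsync uses when materializing a row; all literal values are illustrative.
public static class FindingProjectionUpsertSketch
{
    public static Task UpsertExampleAsync(
        PostgresFindingProjectionRepository repository,
        CancellationToken cancellationToken)
    {
        var projection = new FindingProjection(
            "tenant-a",
            "finding-123",
            "policy-v1",
            "affected",
            7.5m,
            new JsonObject { ["team"] = "runtime" },
            Guid.NewGuid(),
            null,
            new JsonArray("rule:severity-floor"),
            DateTimeOffset.UtcNow,
            "sha256:example-cycle-hash");

        return repository.UpsertAsync(projection, cancellationToken);
    }
}
```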


@@ -0,0 +1,221 @@
using System.Text.Json.Nodes;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Hashing;
namespace StellaOps.Findings.Ledger.Infrastructure.Postgres;
public sealed class PostgresLedgerEventRepository : ILedgerEventRepository
{
private const string SelectByEventIdSql = """
SELECT chain_id,
sequence_no,
event_type,
policy_version,
finding_id,
artifact_id,
source_run_id,
actor_id,
actor_type,
occurred_at,
recorded_at,
event_body,
event_hash,
previous_hash,
merkle_leaf_hash
FROM ledger_events
WHERE tenant_id = @tenant_id
AND event_id = @event_id
""";
private const string SelectChainHeadSql = """
SELECT sequence_no,
event_hash,
recorded_at
FROM ledger_events
WHERE tenant_id = @tenant_id
AND chain_id = @chain_id
ORDER BY sequence_no DESC
LIMIT 1
""";
private const string InsertEventSql = """
INSERT INTO ledger_events (
tenant_id,
chain_id,
sequence_no,
event_id,
event_type,
policy_version,
finding_id,
artifact_id,
source_run_id,
actor_id,
actor_type,
occurred_at,
recorded_at,
event_body,
event_hash,
previous_hash,
merkle_leaf_hash)
VALUES (
@tenant_id,
@chain_id,
@sequence_no,
@event_id,
@event_type,
@policy_version,
@finding_id,
@artifact_id,
@source_run_id,
@actor_id,
@actor_type,
@occurred_at,
@recorded_at,
@event_body,
@event_hash,
@previous_hash,
@merkle_leaf_hash)
""";
private readonly LedgerDataSource _dataSource;
private readonly ILogger<PostgresLedgerEventRepository> _logger;
public PostgresLedgerEventRepository(
LedgerDataSource dataSource,
ILogger<PostgresLedgerEventRepository> logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<LedgerEventRecord?> GetByEventIdAsync(string tenantId, Guid eventId, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(SelectByEventIdSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("event_id", eventId);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
return MapLedgerEventRecord(tenantId, eventId, reader);
}
public async Task<LedgerChainHead?> GetChainHeadAsync(string tenantId, Guid chainId, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(SelectChainHeadSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("chain_id", chainId);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
var sequenceNumber = reader.GetInt64(0);
var eventHash = reader.GetString(1);
var recordedAt = reader.GetFieldValue<DateTimeOffset>(2);
return new LedgerChainHead(sequenceNumber, eventHash, recordedAt);
}
public async Task AppendAsync(LedgerEventRecord record, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(record.TenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(InsertEventSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", record.TenantId);
command.Parameters.AddWithValue("chain_id", record.ChainId);
command.Parameters.AddWithValue("sequence_no", record.SequenceNumber);
command.Parameters.AddWithValue("event_id", record.EventId);
command.Parameters.AddWithValue("event_type", record.EventType);
command.Parameters.AddWithValue("policy_version", record.PolicyVersion);
command.Parameters.AddWithValue("finding_id", record.FindingId);
command.Parameters.AddWithValue("artifact_id", record.ArtifactId);
if (record.SourceRunId.HasValue)
{
command.Parameters.AddWithValue("source_run_id", record.SourceRunId.Value);
}
else
{
command.Parameters.AddWithValue("source_run_id", DBNull.Value);
}
command.Parameters.AddWithValue("actor_id", record.ActorId);
command.Parameters.AddWithValue("actor_type", record.ActorType);
command.Parameters.AddWithValue("occurred_at", record.OccurredAt);
command.Parameters.AddWithValue("recorded_at", record.RecordedAt);
var eventBody = record.EventBody.ToJsonString();
command.Parameters.Add(new NpgsqlParameter<string>("event_body", NpgsqlDbType.Jsonb) { TypedValue = eventBody });
command.Parameters.AddWithValue("event_hash", record.EventHash);
command.Parameters.AddWithValue("previous_hash", record.PreviousHash);
command.Parameters.AddWithValue("merkle_leaf_hash", record.MerkleLeafHash);
try
{
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
catch (PostgresException ex) when (string.Equals(ex.SqlState, PostgresErrorCodes.UniqueViolation, StringComparison.Ordinal))
{
throw new LedgerDuplicateEventException(record.EventId, ex);
}
}
internal static LedgerEventRecord MapLedgerEventRecord(string tenantId, Guid eventId, NpgsqlDataReader reader)
{
var chainId = reader.GetFieldValue<Guid>(0);
var sequenceNumber = reader.GetInt64(1);
var eventType = reader.GetString(2);
var policyVersion = reader.GetString(3);
var findingId = reader.GetString(4);
var artifactId = reader.GetString(5);
var sourceRunId = reader.IsDBNull(6) ? (Guid?)null : reader.GetGuid(6);
var actorId = reader.GetString(7);
var actorType = reader.GetString(8);
var occurredAt = reader.GetFieldValue<DateTimeOffset>(9);
var recordedAt = reader.GetFieldValue<DateTimeOffset>(10);
var eventBodyJson = reader.GetFieldValue<string>(11);
var eventBody = JsonNode.Parse(eventBodyJson)?.AsObject()
?? throw new InvalidOperationException("Failed to parse ledger event body.");
var eventHash = reader.GetString(12);
var previousHash = reader.GetString(13);
var merkleLeafHash = reader.GetString(14);
var canonicalEnvelope = LedgerCanonicalJsonSerializer.Canonicalize(eventBody);
var canonicalJson = LedgerCanonicalJsonSerializer.Serialize(canonicalEnvelope);
return new LedgerEventRecord(
tenantId,
chainId,
sequenceNumber,
eventId,
eventType,
policyVersion,
findingId,
artifactId,
sourceRunId,
actorId,
actorType,
occurredAt,
recordedAt,
eventBody,
eventHash,
previousHash,
merkleLeafHash,
canonicalJson);
}
}
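
A hedged sketch of the append flow against the event repository above; building the `LedgerEventRecord` itself (canonical body, hashes, next sequence number) is elided, and the chain-head lookup is shown only to indicate where that derivation would start.

```csharp
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Infrastructure.Postgres;

// Sketch only: the record is assumed to already carry the next sequence_no and the
// previous event's hash; duplicate appends are treated as idempotent.
public static class LedgerAppendSketch
{
    public static async Task AppendOnceAsync(
        PostgresLedgerEventRepository repository,
        LedgerEventRecord record,
        CancellationToken cancellationToken)
    {
        // GetChainHeadAsync returns null for a brand-new chain; real callers derive the
        // next sequence_no and previous_hash from this head before building the record.
        _ = await repository.GetChainHeadAsync(record.TenantId, record.ChainId, cancellationToken)
            .ConfigureAwait(false);

        try
        {
            await repository.AppendAsync(record, cancellationToken).ConfigureAwait(false);
        }
        catch (LedgerDuplicateEventException)
        {
            // Unique violation on event_id surfaced above: the event already exists, nothing to do.
        }
    }
}
```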


@@ -0,0 +1,130 @@
using System.Text.Json.Nodes;
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Hashing;
namespace StellaOps.Findings.Ledger.Infrastructure.Postgres;
public sealed class PostgresLedgerEventStream : ILedgerEventStream
{
private const string ReadEventsSql = """
SELECT tenant_id,
chain_id,
sequence_no,
event_id,
event_type,
policy_version,
finding_id,
artifact_id,
source_run_id,
actor_id,
actor_type,
occurred_at,
recorded_at,
event_body,
event_hash,
previous_hash,
merkle_leaf_hash
FROM ledger_events
WHERE recorded_at > @last_recorded_at
OR (recorded_at = @last_recorded_at AND event_id > @last_event_id)
ORDER BY recorded_at, event_id
LIMIT @page_size
""";
private readonly LedgerDataSource _dataSource;
private readonly ILogger<PostgresLedgerEventStream> _logger;
public PostgresLedgerEventStream(
LedgerDataSource dataSource,
ILogger<PostgresLedgerEventStream> logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<IReadOnlyList<LedgerEventRecord>> ReadNextBatchAsync(
ProjectionCheckpoint checkpoint,
int batchSize,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(checkpoint);
if (batchSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be greater than zero.");
}
var records = new List<LedgerEventRecord>(batchSize);
await using var connection = await _dataSource.OpenConnectionAsync(string.Empty, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(ReadEventsSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("last_recorded_at", checkpoint.LastRecordedAt);
command.Parameters.AddWithValue("last_event_id", checkpoint.LastEventId);
command.Parameters.AddWithValue("page_size", batchSize);
try
{
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
records.Add(MapLedgerEvent(reader));
}
}
catch (PostgresException ex)
{
_logger.LogError(ex, "Failed to read ledger event batch for projection replay.");
throw;
}
return records;
}
private static LedgerEventRecord MapLedgerEvent(NpgsqlDataReader reader)
{
var tenantId = reader.GetString(0);
var chainId = reader.GetFieldValue<Guid>(1);
var sequenceNumber = reader.GetInt64(2);
var eventId = reader.GetGuid(3);
var eventType = reader.GetString(4);
var policyVersion = reader.GetString(5);
var findingId = reader.GetString(6);
var artifactId = reader.GetString(7);
var sourceRunId = reader.IsDBNull(8) ? (Guid?)null : reader.GetGuid(8);
var actorId = reader.GetString(9);
var actorType = reader.GetString(10);
var occurredAt = reader.GetFieldValue<DateTimeOffset>(11);
var recordedAt = reader.GetFieldValue<DateTimeOffset>(12);
var eventBodyJson = reader.GetFieldValue<string>(13);
var eventBodyParsed = JsonNode.Parse(eventBodyJson)?.AsObject()
?? throw new InvalidOperationException("Failed to parse ledger event payload.");
var canonicalEnvelope = LedgerCanonicalJsonSerializer.Canonicalize(eventBodyParsed);
var canonicalJson = LedgerCanonicalJsonSerializer.Serialize(canonicalEnvelope);
var eventHash = reader.GetString(14);
var previousHash = reader.GetString(15);
var merkleLeafHash = reader.GetString(16);
return new LedgerEventRecord(
tenantId,
chainId,
sequenceNumber,
eventId,
eventType,
policyVersion,
findingId,
artifactId,
sourceRunId,
actorId,
actorType,
occurredAt,
recordedAt,
canonicalEnvelope,
eventHash,
previousHash,
merkleLeafHash,
canonicalJson);
}
}
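
A hedged sketch of a replay loop over the stream above: the batch size and projector callback are illustrative, while the checkpoint methods come from `PostgresFindingProjectionRepository` and the `(recorded_at, event_id)` cursor matches the keyset query in this file.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Findings.Ledger.Domain;
using StellaOps.Findings.Ledger.Infrastructure.Postgres;

// Sketch only: one replay pass. projectAsync stands in for whatever applies an event
// to the findings projection; the batch size of 500 is an arbitrary illustrative value.
public static class ProjectionReplaySketch
{
    public static async Task ReplayAsync(
        PostgresLedgerEventStream stream,
        PostgresFindingProjectionRepository repository,
        Func<LedgerEventRecord, CancellationToken, Task> projectAsync,
        TimeProvider timeProvider,
        CancellationToken cancellationToken)
    {
        var checkpoint = await repository.GetCheckpointAsync(cancellationToken).ConfigureAwait(false);

        while (!cancellationToken.IsCancellationRequested)
        {
            var batch = await stream.ReadNextBatchAsync(checkpoint, 500, cancellationToken).ConfigureAwait(false);
            if (batch.Count == 0)
            {
                break; // caught up with the ledger
            }

            foreach (var record in batch)
            {
                await projectAsync(record, cancellationToken).ConfigureAwait(false);
            }

            // Advance the (recorded_at, event_id) cursor to the last event in the batch.
            var last = batch[batch.Count - 1];
            checkpoint = new ProjectionCheckpoint(last.RecordedAt, last.EventId, timeProvider.GetUtcNow());
            await repository.SaveCheckpointAsync(checkpoint, cancellationToken).ConfigureAwait(false);
        }
    }
}
```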


@@ -0,0 +1,83 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Findings.Ledger.Infrastructure.Merkle;
namespace StellaOps.Findings.Ledger.Infrastructure.Postgres;
public sealed class PostgresMerkleAnchorRepository : IMerkleAnchorRepository
{
private const string InsertAnchorSql = """
INSERT INTO ledger_merkle_roots (
tenant_id,
anchor_id,
window_start,
window_end,
sequence_start,
sequence_end,
root_hash,
leaf_count,
anchored_at,
anchor_reference)
VALUES (
@tenant_id,
@anchor_id,
@window_start,
@window_end,
@sequence_start,
@sequence_end,
@root_hash,
@leaf_count,
@anchored_at,
@anchor_reference)
""";
private readonly LedgerDataSource _dataSource;
private readonly ILogger<PostgresMerkleAnchorRepository> _logger;
public PostgresMerkleAnchorRepository(
LedgerDataSource dataSource,
ILogger<PostgresMerkleAnchorRepository> logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task InsertAsync(
string tenantId,
Guid anchorId,
DateTimeOffset windowStart,
DateTimeOffset windowEnd,
long sequenceStart,
long sequenceEnd,
string rootHash,
int leafCount,
DateTimeOffset anchoredAt,
string? anchorReference,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(InsertAnchorSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("anchor_id", anchorId);
command.Parameters.AddWithValue("window_start", windowStart);
command.Parameters.AddWithValue("window_end", windowEnd);
command.Parameters.AddWithValue("sequence_start", sequenceStart);
command.Parameters.AddWithValue("sequence_end", sequenceEnd);
command.Parameters.AddWithValue("root_hash", rootHash);
command.Parameters.AddWithValue("leaf_count", leafCount);
command.Parameters.AddWithValue("anchored_at", anchoredAt);
command.Parameters.AddWithValue("anchor_reference", anchorReference ?? (object)DBNull.Value);
try
{
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
catch (PostgresException ex)
{
_logger.LogError(ex, "Failed to insert Merkle root for tenant {TenantId}.", tenantId);
throw;
}
}
}