namespace StellaOps.Findings.Ledger.Infrastructure.Postgres; using System.Text; using System.Text.Json; using Npgsql; using NpgsqlTypes; using StellaOps.Findings.Ledger.Domain; using StellaOps.Findings.Ledger.Infrastructure.Snapshot; /// /// PostgreSQL implementation of snapshot repository. /// public sealed class PostgresSnapshotRepository : ISnapshotRepository { private readonly NpgsqlDataSource _dataSource; private readonly JsonSerializerOptions _jsonOptions; public PostgresSnapshotRepository(NpgsqlDataSource dataSource) { _dataSource = dataSource; _jsonOptions = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase, WriteIndented = false }; } public async Task CreateAsync( string tenantId, CreateSnapshotInput input, long currentSequence, DateTimeOffset currentTimestamp, CancellationToken ct = default) { var snapshotId = Guid.NewGuid(); var createdAt = DateTimeOffset.UtcNow; var expiresAt = input.ExpiresIn.HasValue ? createdAt.Add(input.ExpiresIn.Value) : (DateTimeOffset?)null; var sequenceNumber = input.AtSequence ?? currentSequence; var timestamp = input.AtTimestamp ?? currentTimestamp; var initialStats = new SnapshotStatistics(0, 0, 0, 0, 0, 0); var metadataJson = input.Metadata != null ? JsonSerializer.Serialize(input.Metadata, _jsonOptions) : null; var entityTypesJson = input.IncludeEntityTypes != null ? JsonSerializer.Serialize(input.IncludeEntityTypes.Select(e => e.ToString()).ToList(), _jsonOptions) : null; const string sql = """ INSERT INTO ledger_snapshots ( tenant_id, snapshot_id, label, description, status, created_at, expires_at, sequence_number, snapshot_timestamp, findings_count, vex_statements_count, advisories_count, sboms_count, events_count, size_bytes, merkle_root, dsse_digest, metadata, include_entity_types, sign_requested ) VALUES ( @tenantId, @snapshotId, @label, @description, @status, @createdAt, @expiresAt, @sequenceNumber, @timestamp, @findingsCount, @vexCount, @advisoriesCount, @sbomsCount, @eventsCount, @sizeBytes, @merkleRoot, @dsseDigest, @metadata::jsonb, @entityTypes::jsonb, @sign ) """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("snapshotId", snapshotId); cmd.Parameters.AddWithValue("label", (object?)input.Label ?? DBNull.Value); cmd.Parameters.AddWithValue("description", (object?)input.Description ?? DBNull.Value); cmd.Parameters.AddWithValue("status", SnapshotStatus.Creating.ToString()); cmd.Parameters.AddWithValue("createdAt", createdAt); cmd.Parameters.AddWithValue("expiresAt", (object?)expiresAt ?? DBNull.Value); cmd.Parameters.AddWithValue("sequenceNumber", sequenceNumber); cmd.Parameters.AddWithValue("timestamp", timestamp); cmd.Parameters.AddWithValue("findingsCount", initialStats.FindingsCount); cmd.Parameters.AddWithValue("vexCount", initialStats.VexStatementsCount); cmd.Parameters.AddWithValue("advisoriesCount", initialStats.AdvisoriesCount); cmd.Parameters.AddWithValue("sbomsCount", initialStats.SbomsCount); cmd.Parameters.AddWithValue("eventsCount", initialStats.EventsCount); cmd.Parameters.AddWithValue("sizeBytes", initialStats.SizeBytes); cmd.Parameters.AddWithValue("merkleRoot", DBNull.Value); cmd.Parameters.AddWithValue("dsseDigest", DBNull.Value); cmd.Parameters.AddWithValue("metadata", (object?)metadataJson ?? DBNull.Value); cmd.Parameters.AddWithValue("entityTypes", (object?)entityTypesJson ?? DBNull.Value); cmd.Parameters.AddWithValue("sign", input.Sign); await cmd.ExecuteNonQueryAsync(ct); return new LedgerSnapshot( tenantId, snapshotId, input.Label, input.Description, SnapshotStatus.Creating, createdAt, expiresAt, sequenceNumber, timestamp, initialStats, null, null, input.Metadata); } public async Task GetByIdAsync( string tenantId, Guid snapshotId, CancellationToken ct = default) { const string sql = """ SELECT tenant_id, snapshot_id, label, description, status, created_at, expires_at, sequence_number, snapshot_timestamp, findings_count, vex_statements_count, advisories_count, sboms_count, events_count, size_bytes, merkle_root, dsse_digest, metadata FROM ledger_snapshots WHERE tenant_id = @tenantId AND snapshot_id = @snapshotId """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("snapshotId", snapshotId); await using var reader = await cmd.ExecuteReaderAsync(ct); if (!await reader.ReadAsync(ct)) return null; return MapSnapshot(reader); } public async Task<(IReadOnlyList Snapshots, string? NextPageToken)> ListAsync( SnapshotListQuery query, CancellationToken ct = default) { var sql = new StringBuilder(""" SELECT tenant_id, snapshot_id, label, description, status, created_at, expires_at, sequence_number, snapshot_timestamp, findings_count, vex_statements_count, advisories_count, sboms_count, events_count, size_bytes, merkle_root, dsse_digest, metadata FROM ledger_snapshots WHERE tenant_id = @tenantId """); var parameters = new List { new("tenantId", query.TenantId) }; if (query.Status.HasValue) { sql.Append(" AND status = @status"); parameters.Add(new NpgsqlParameter("status", query.Status.Value.ToString())); } if (query.CreatedAfter.HasValue) { sql.Append(" AND created_at >= @createdAfter"); parameters.Add(new NpgsqlParameter("createdAfter", query.CreatedAfter.Value)); } if (query.CreatedBefore.HasValue) { sql.Append(" AND created_at < @createdBefore"); parameters.Add(new NpgsqlParameter("createdBefore", query.CreatedBefore.Value)); } if (!string.IsNullOrEmpty(query.PageToken)) { if (Guid.TryParse(query.PageToken, out var lastId)) { sql.Append(" AND snapshot_id > @lastId"); parameters.Add(new NpgsqlParameter("lastId", lastId)); } } sql.Append(" ORDER BY created_at DESC, snapshot_id"); sql.Append(" LIMIT @limit"); parameters.Add(new NpgsqlParameter("limit", query.PageSize + 1)); await using var cmd = _dataSource.CreateCommand(sql.ToString()); cmd.Parameters.AddRange(parameters.ToArray()); var snapshots = new List(); await using var reader = await cmd.ExecuteReaderAsync(ct); while (await reader.ReadAsync(ct) && snapshots.Count < query.PageSize) { snapshots.Add(MapSnapshot(reader)); } string? nextPageToken = null; if (await reader.ReadAsync(ct)) { nextPageToken = snapshots.Last().SnapshotId.ToString(); } return (snapshots, nextPageToken); } public async Task UpdateStatusAsync( string tenantId, Guid snapshotId, SnapshotStatus newStatus, CancellationToken ct = default) { const string sql = """ UPDATE ledger_snapshots SET status = @status, updated_at = @updatedAt WHERE tenant_id = @tenantId AND snapshot_id = @snapshotId """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("snapshotId", snapshotId); cmd.Parameters.AddWithValue("status", newStatus.ToString()); cmd.Parameters.AddWithValue("updatedAt", DateTimeOffset.UtcNow); return await cmd.ExecuteNonQueryAsync(ct) > 0; } public async Task UpdateStatisticsAsync( string tenantId, Guid snapshotId, SnapshotStatistics statistics, CancellationToken ct = default) { const string sql = """ UPDATE ledger_snapshots SET findings_count = @findingsCount, vex_statements_count = @vexCount, advisories_count = @advisoriesCount, sboms_count = @sbomsCount, events_count = @eventsCount, size_bytes = @sizeBytes, updated_at = @updatedAt WHERE tenant_id = @tenantId AND snapshot_id = @snapshotId """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("snapshotId", snapshotId); cmd.Parameters.AddWithValue("findingsCount", statistics.FindingsCount); cmd.Parameters.AddWithValue("vexCount", statistics.VexStatementsCount); cmd.Parameters.AddWithValue("advisoriesCount", statistics.AdvisoriesCount); cmd.Parameters.AddWithValue("sbomsCount", statistics.SbomsCount); cmd.Parameters.AddWithValue("eventsCount", statistics.EventsCount); cmd.Parameters.AddWithValue("sizeBytes", statistics.SizeBytes); cmd.Parameters.AddWithValue("updatedAt", DateTimeOffset.UtcNow); return await cmd.ExecuteNonQueryAsync(ct) > 0; } public async Task SetMerkleRootAsync( string tenantId, Guid snapshotId, string merkleRoot, string? dsseDigest, CancellationToken ct = default) { const string sql = """ UPDATE ledger_snapshots SET merkle_root = @merkleRoot, dsse_digest = @dsseDigest, updated_at = @updatedAt WHERE tenant_id = @tenantId AND snapshot_id = @snapshotId """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("snapshotId", snapshotId); cmd.Parameters.AddWithValue("merkleRoot", merkleRoot); cmd.Parameters.AddWithValue("dsseDigest", (object?)dsseDigest ?? DBNull.Value); cmd.Parameters.AddWithValue("updatedAt", DateTimeOffset.UtcNow); return await cmd.ExecuteNonQueryAsync(ct) > 0; } public async Task ExpireSnapshotsAsync( DateTimeOffset cutoff, CancellationToken ct = default) { const string sql = """ UPDATE ledger_snapshots SET status = @expiredStatus, updated_at = @updatedAt WHERE expires_at IS NOT NULL AND expires_at < @cutoff AND status = @availableStatus """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("expiredStatus", SnapshotStatus.Expired.ToString()); cmd.Parameters.AddWithValue("availableStatus", SnapshotStatus.Available.ToString()); cmd.Parameters.AddWithValue("cutoff", cutoff); cmd.Parameters.AddWithValue("updatedAt", DateTimeOffset.UtcNow); return await cmd.ExecuteNonQueryAsync(ct); } public async Task DeleteAsync( string tenantId, Guid snapshotId, CancellationToken ct = default) { const string sql = """ UPDATE ledger_snapshots SET status = @deletedStatus, updated_at = @updatedAt WHERE tenant_id = @tenantId AND snapshot_id = @snapshotId """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("snapshotId", snapshotId); cmd.Parameters.AddWithValue("deletedStatus", SnapshotStatus.Deleted.ToString()); cmd.Parameters.AddWithValue("updatedAt", DateTimeOffset.UtcNow); return await cmd.ExecuteNonQueryAsync(ct) > 0; } public async Task GetLatestAsync( string tenantId, CancellationToken ct = default) { const string sql = """ SELECT tenant_id, snapshot_id, label, description, status, created_at, expires_at, sequence_number, snapshot_timestamp, findings_count, vex_statements_count, advisories_count, sboms_count, events_count, size_bytes, merkle_root, dsse_digest, metadata FROM ledger_snapshots WHERE tenant_id = @tenantId AND status = @status ORDER BY created_at DESC LIMIT 1 """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("status", SnapshotStatus.Available.ToString()); await using var reader = await cmd.ExecuteReaderAsync(ct); if (!await reader.ReadAsync(ct)) return null; return MapSnapshot(reader); } public async Task ExistsAsync( string tenantId, Guid snapshotId, CancellationToken ct = default) { const string sql = """ SELECT 1 FROM ledger_snapshots WHERE tenant_id = @tenantId AND snapshot_id = @snapshotId LIMIT 1 """; await using var cmd = _dataSource.CreateCommand(sql); cmd.Parameters.AddWithValue("tenantId", tenantId); cmd.Parameters.AddWithValue("snapshotId", snapshotId); await using var reader = await cmd.ExecuteReaderAsync(ct); return await reader.ReadAsync(ct); } private LedgerSnapshot MapSnapshot(NpgsqlDataReader reader) { var metadataJson = reader.IsDBNull(reader.GetOrdinal("metadata")) ? null : reader.GetString(reader.GetOrdinal("metadata")); Dictionary? metadata = null; if (!string.IsNullOrEmpty(metadataJson)) { metadata = JsonSerializer.Deserialize>(metadataJson, _jsonOptions); } return new LedgerSnapshot( TenantId: reader.GetString(reader.GetOrdinal("tenant_id")), SnapshotId: reader.GetGuid(reader.GetOrdinal("snapshot_id")), Label: reader.IsDBNull(reader.GetOrdinal("label")) ? null : reader.GetString(reader.GetOrdinal("label")), Description: reader.IsDBNull(reader.GetOrdinal("description")) ? null : reader.GetString(reader.GetOrdinal("description")), Status: Enum.Parse(reader.GetString(reader.GetOrdinal("status"))), CreatedAt: reader.GetFieldValue(reader.GetOrdinal("created_at")), ExpiresAt: reader.IsDBNull(reader.GetOrdinal("expires_at")) ? null : reader.GetFieldValue(reader.GetOrdinal("expires_at")), SequenceNumber: reader.GetInt64(reader.GetOrdinal("sequence_number")), Timestamp: reader.GetFieldValue(reader.GetOrdinal("snapshot_timestamp")), Statistics: new SnapshotStatistics( FindingsCount: reader.GetInt64(reader.GetOrdinal("findings_count")), VexStatementsCount: reader.GetInt64(reader.GetOrdinal("vex_statements_count")), AdvisoriesCount: reader.GetInt64(reader.GetOrdinal("advisories_count")), SbomsCount: reader.GetInt64(reader.GetOrdinal("sboms_count")), EventsCount: reader.GetInt64(reader.GetOrdinal("events_count")), SizeBytes: reader.GetInt64(reader.GetOrdinal("size_bytes"))), MerkleRoot: reader.IsDBNull(reader.GetOrdinal("merkle_root")) ? null : reader.GetString(reader.GetOrdinal("merkle_root")), DsseDigest: reader.IsDBNull(reader.GetOrdinal("dsse_digest")) ? null : reader.GetString(reader.GetOrdinal("dsse_digest")), Metadata: metadata); } }