using System.Text.Json;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Evidence.Core;
using StellaOps.Infrastructure.Postgres.Repositories;
namespace StellaOps.Evidence.Persistence.Postgres;
///
/// PostgreSQL implementation of .
/// Stores evidence records with content-addressed IDs and tenant isolation via RLS.
///
public sealed class PostgresEvidenceStore : RepositoryBase, IEvidenceStore
{
private readonly string _tenantId;
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
///
/// Creates a new PostgreSQL evidence store for the specified tenant.
///
/// Evidence data source.
/// Tenant identifier for RLS.
/// Logger instance.
public PostgresEvidenceStore(
EvidenceDataSource dataSource,
string tenantId,
ILogger logger)
: base(dataSource, logger)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
_tenantId = tenantId;
}
///
public async Task StoreAsync(IEvidence evidence, CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(evidence);
const string sql = """
INSERT INTO evidence.records (
evidence_id, subject_node_id, evidence_type, payload,
payload_schema_ver, external_cid, provenance, signatures, tenant_id
) VALUES (
@evidenceId, @subjectNodeId, @evidenceType, @payload,
@payloadSchemaVer, @externalCid, @provenance, @signatures, @tenantId
)
ON CONFLICT (evidence_id) DO NOTHING
RETURNING evidence_id
""";
await using var connection = await DataSource.OpenConnectionAsync(_tenantId, "writer", ct)
.ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddEvidenceParameters(command, evidence);
var result = await command.ExecuteScalarAsync(ct).ConfigureAwait(false);
// If result is null, row already existed (idempotent)
return evidence.EvidenceId;
}
///
public async Task StoreBatchAsync(IEnumerable evidenceRecords, CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(evidenceRecords);
var records = evidenceRecords.ToList();
if (records.Count == 0)
{
return 0;
}
await using var connection = await DataSource.OpenConnectionAsync(_tenantId, "writer", ct)
.ConfigureAwait(false);
await using var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false);
var storedCount = 0;
foreach (var evidence in records)
{
const string sql = """
INSERT INTO evidence.records (
evidence_id, subject_node_id, evidence_type, payload,
payload_schema_ver, external_cid, provenance, signatures, tenant_id
) VALUES (
@evidenceId, @subjectNodeId, @evidenceType, @payload,
@payloadSchemaVer, @externalCid, @provenance, @signatures, @tenantId
)
ON CONFLICT (evidence_id) DO NOTHING
""";
await using var command = new NpgsqlCommand(sql, connection, transaction)
{
CommandTimeout = CommandTimeoutSeconds
};
AddEvidenceParameters(command, evidence);
var affected = await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
if (affected > 0)
{
storedCount++;
}
}
await transaction.CommitAsync(ct).ConfigureAwait(false);
return storedCount;
}
///
public async Task GetByIdAsync(string evidenceId, CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(evidenceId);
const string sql = """
SELECT evidence_id, subject_node_id, evidence_type, payload,
payload_schema_ver, external_cid, provenance, signatures
FROM evidence.records
WHERE evidence_id = @evidenceId
AND tenant_id = @tenantId
""";
return await QuerySingleOrDefaultAsync(
_tenantId,
sql,
cmd =>
{
AddParameter(cmd, "@evidenceId", evidenceId);
AddParameter(cmd, "@tenantId", Guid.Parse(_tenantId));
},
MapEvidence,
ct).ConfigureAwait(false);
}
///
public async Task> GetBySubjectAsync(
string subjectNodeId,
EvidenceType? typeFilter = null,
CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(subjectNodeId);
var sql = """
SELECT evidence_id, subject_node_id, evidence_type, payload,
payload_schema_ver, external_cid, provenance, signatures
FROM evidence.records
WHERE subject_node_id = @subjectNodeId
AND tenant_id = @tenantId
""";
if (typeFilter.HasValue)
{
sql += " AND evidence_type = @evidenceType";
}
sql += " ORDER BY created_at DESC";
return await QueryAsync(
_tenantId,
sql,
cmd =>
{
AddParameter(cmd, "@subjectNodeId", subjectNodeId);
AddParameter(cmd, "@tenantId", Guid.Parse(_tenantId));
if (typeFilter.HasValue)
{
AddParameter(cmd, "@evidenceType", (short)typeFilter.Value);
}
},
MapEvidence,
ct).ConfigureAwait(false);
}
///
public async Task> GetByTypeAsync(
EvidenceType evidenceType,
int limit = 100,
CancellationToken ct = default)
{
const string sql = """
SELECT evidence_id, subject_node_id, evidence_type, payload,
payload_schema_ver, external_cid, provenance, signatures
FROM evidence.records
WHERE evidence_type = @evidenceType
AND tenant_id = @tenantId
ORDER BY created_at DESC
LIMIT @limit
""";
return await QueryAsync(
_tenantId,
sql,
cmd =>
{
AddParameter(cmd, "@evidenceType", (short)evidenceType);
AddParameter(cmd, "@tenantId", Guid.Parse(_tenantId));
AddParameter(cmd, "@limit", limit);
},
MapEvidence,
ct).ConfigureAwait(false);
}
///
public async Task ExistsAsync(string subjectNodeId, EvidenceType type, CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(subjectNodeId);
const string sql = """
SELECT EXISTS(
SELECT 1 FROM evidence.records
WHERE subject_node_id = @subjectNodeId
AND evidence_type = @evidenceType
AND tenant_id = @tenantId
)
""";
var result = await ExecuteScalarAsync(
_tenantId,
sql,
cmd =>
{
AddParameter(cmd, "@subjectNodeId", subjectNodeId);
AddParameter(cmd, "@evidenceType", (short)type);
AddParameter(cmd, "@tenantId", Guid.Parse(_tenantId));
},
ct).ConfigureAwait(false);
return result;
}
///
public async Task DeleteAsync(string evidenceId, CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(evidenceId);
const string sql = """
DELETE FROM evidence.records
WHERE evidence_id = @evidenceId
AND tenant_id = @tenantId
""";
var affected = await ExecuteAsync(
_tenantId,
sql,
cmd =>
{
AddParameter(cmd, "@evidenceId", evidenceId);
AddParameter(cmd, "@tenantId", Guid.Parse(_tenantId));
},
ct).ConfigureAwait(false);
return affected > 0;
}
///
public async Task CountBySubjectAsync(string subjectNodeId, CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(subjectNodeId);
const string sql = """
SELECT COUNT(*)
FROM evidence.records
WHERE subject_node_id = @subjectNodeId
AND tenant_id = @tenantId
""";
var result = await ExecuteScalarAsync(
_tenantId,
sql,
cmd =>
{
AddParameter(cmd, "@subjectNodeId", subjectNodeId);
AddParameter(cmd, "@tenantId", Guid.Parse(_tenantId));
},
ct).ConfigureAwait(false);
return (int)result;
}
private void AddEvidenceParameters(NpgsqlCommand command, IEvidence evidence)
{
AddParameter(command, "@evidenceId", evidence.EvidenceId);
AddParameter(command, "@subjectNodeId", evidence.SubjectNodeId);
AddParameter(command, "@evidenceType", (short)evidence.EvidenceType);
command.Parameters.Add(new NpgsqlParameter("@payload", NpgsqlDbType.Bytea)
{
TypedValue = evidence.Payload.ToArray()
});
AddParameter(command, "@payloadSchemaVer", evidence.PayloadSchemaVersion);
AddParameter(command, "@externalCid", evidence.ExternalPayloadCid);
AddJsonbParameter(command, "@provenance", JsonSerializer.Serialize(evidence.Provenance, JsonOptions));
AddJsonbParameter(command, "@signatures", JsonSerializer.Serialize(evidence.Signatures, JsonOptions));
AddParameter(command, "@tenantId", Guid.Parse(_tenantId));
}
private static IEvidence MapEvidence(NpgsqlDataReader reader)
{
var evidenceId = reader.GetString(0);
var subjectNodeId = reader.GetString(1);
var evidenceType = (EvidenceType)reader.GetInt16(2);
var payload = reader.GetFieldValue(3);
var payloadSchemaVer = reader.GetString(4);
var externalCid = GetNullableString(reader, 5);
var provenanceJson = reader.GetString(6);
var signaturesJson = reader.GetString(7);
var provenance = JsonSerializer.Deserialize(provenanceJson, JsonOptions)
?? throw new InvalidOperationException($"Failed to deserialize provenance for evidence {evidenceId}");
var signatures = JsonSerializer.Deserialize>(signaturesJson, JsonOptions)
?? [];
return new EvidenceRecord
{
EvidenceId = evidenceId,
SubjectNodeId = subjectNodeId,
EvidenceType = evidenceType,
Payload = payload,
PayloadSchemaVersion = payloadSchemaVer,
ExternalPayloadCid = externalCid,
Provenance = provenance,
Signatures = signatures
};
}
}