wip: doctor/cli/docs/api to vector db consolidation; api hardening for descriptions, tenant, and scopes; migrations and conversions of all DALs to EF v10

This commit is contained in:
master
2026-02-23 15:30:50 +02:00
parent bd8fee6ed8
commit e746577380
1424 changed files with 81225 additions and 25251 deletions

View File

@@ -1,32 +1,36 @@
// Copyright (c) StellaOps. Licensed under the BUSL-1.1.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Eventing.EfCore.Models;
using StellaOps.Eventing.Models;
using StellaOps.Eventing.Postgres;
using StellaOps.HybridLogicalClock;
using System.Data;
using System.Globalization;
namespace StellaOps.Eventing.Storage;
/// <summary>
/// PostgreSQL implementation of <see cref="ITimelineEventStore"/>.
/// PostgreSQL implementation of <see cref="ITimelineEventStore"/> backed by EF Core.
/// </summary>
public sealed class PostgresTimelineEventStore : ITimelineEventStore
{
private readonly NpgsqlDataSource _dataSource;
private readonly ILogger<PostgresTimelineEventStore> _logger;
private readonly EventingDataSource? _eventingDataSource;
/// <summary>
/// Initializes a new instance of the <see cref="PostgresTimelineEventStore"/> class.
/// Uses the raw NpgsqlDataSource (legacy DI path) or EventingDataSource (EF Core DI path).
/// </summary>
public PostgresTimelineEventStore(
NpgsqlDataSource dataSource,
ILogger<PostgresTimelineEventStore> logger)
ILogger<PostgresTimelineEventStore> logger,
EventingDataSource? eventingDataSource = null)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_eventingDataSource = eventingDataSource;
}
/// <inheritdoc/>
@@ -34,27 +38,17 @@ public sealed class PostgresTimelineEventStore : ITimelineEventStore
{
ArgumentNullException.ThrowIfNull(timelineEvent);
const string sql = """
INSERT INTO timeline.events (
event_id, t_hlc, ts_wall, service, trace_parent,
correlation_id, kind, payload, payload_digest,
engine_name, engine_version, engine_digest, dsse_sig, schema_version
) VALUES (
@event_id, @t_hlc, @ts_wall, @service, @trace_parent,
@correlation_id, @kind, @payload::jsonb, @payload_digest,
@engine_name, @engine_version, @engine_digest, @dsse_sig, @schema_version
)
ON CONFLICT (event_id) DO NOTHING
""";
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql, connection);
await using var dbContext = CreateDbContext(connection);
AddEventParameters(command, timelineEvent);
var entity = MapToEntity(timelineEvent);
dbContext.Events.Add(entity);
var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
if (rowsAffected == 0)
try
{
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
}
catch (DbUpdateException ex) when (IsUniqueViolation(ex))
{
_logger.LogDebug("Event {EventId} already exists (idempotent insert)", timelineEvent.EventId);
}
@@ -72,28 +66,25 @@ public sealed class PostgresTimelineEventStore : ITimelineEventStore
}
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken).ConfigureAwait(false);
await using var dbContext = CreateDbContext(connection);
await using var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
try
{
const string sql = """
INSERT INTO timeline.events (
event_id, t_hlc, ts_wall, service, trace_parent,
correlation_id, kind, payload, payload_digest,
engine_name, engine_version, engine_digest, dsse_sig, schema_version
) VALUES (
@event_id, @t_hlc, @ts_wall, @service, @trace_parent,
@correlation_id, @kind, @payload::jsonb, @payload_digest,
@engine_name, @engine_version, @engine_digest, @dsse_sig, @schema_version
)
ON CONFLICT (event_id) DO NOTHING
""";
foreach (var timelineEvent in eventList)
{
await using var command = new NpgsqlCommand(sql, connection, transaction);
AddEventParameters(command, timelineEvent);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
var entity = MapToEntity(timelineEvent);
dbContext.Events.Add(entity);
try
{
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
}
catch (DbUpdateException ex) when (IsUniqueViolation(ex))
{
// Idempotent: event already exists, detach and continue
dbContext.ChangeTracker.Clear();
}
}
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
@@ -116,24 +107,19 @@ public sealed class PostgresTimelineEventStore : ITimelineEventStore
{
ArgumentException.ThrowIfNullOrWhiteSpace(correlationId);
const string sql = """
SELECT event_id, t_hlc, ts_wall, service, trace_parent,
correlation_id, kind, payload, payload_digest,
engine_name, engine_version, engine_digest, dsse_sig, schema_version
FROM timeline.events
WHERE correlation_id = @correlation_id
ORDER BY t_hlc ASC
LIMIT @limit OFFSET @offset
""";
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql, connection);
await using var dbContext = CreateDbContext(connection);
command.Parameters.AddWithValue("@correlation_id", correlationId);
command.Parameters.AddWithValue("@limit", limit);
command.Parameters.AddWithValue("@offset", offset);
var entities = await dbContext.Events
.AsNoTracking()
.Where(e => e.CorrelationId == correlationId)
.OrderBy(e => e.THlc)
.Skip(offset)
.Take(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return await ExecuteQueryAsync(command, cancellationToken).ConfigureAwait(false);
return entities.Select(MapToDomain).ToList();
}
/// <inheritdoc/>
@@ -145,25 +131,22 @@ public sealed class PostgresTimelineEventStore : ITimelineEventStore
{
ArgumentException.ThrowIfNullOrWhiteSpace(correlationId);
const string sql = """
SELECT event_id, t_hlc, ts_wall, service, trace_parent,
correlation_id, kind, payload, payload_digest,
engine_name, engine_version, engine_digest, dsse_sig, schema_version
FROM timeline.events
WHERE correlation_id = @correlation_id
AND t_hlc >= @from_hlc
AND t_hlc <= @to_hlc
ORDER BY t_hlc ASC
""";
var fromStr = fromHlc.ToSortableString();
var toStr = toHlc.ToSortableString();
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql, connection);
await using var dbContext = CreateDbContext(connection);
command.Parameters.AddWithValue("@correlation_id", correlationId);
command.Parameters.AddWithValue("@from_hlc", fromHlc.ToSortableString());
command.Parameters.AddWithValue("@to_hlc", toHlc.ToSortableString());
var entities = await dbContext.Events
.AsNoTracking()
.Where(e => e.CorrelationId == correlationId
&& string.Compare(e.THlc, fromStr) >= 0
&& string.Compare(e.THlc, toStr) <= 0)
.OrderBy(e => e.THlc)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return await ExecuteQueryAsync(command, cancellationToken).ConfigureAwait(false);
return entities.Select(MapToDomain).ToList();
}
/// <inheritdoc/>
@@ -175,38 +158,26 @@ public sealed class PostgresTimelineEventStore : ITimelineEventStore
{
ArgumentException.ThrowIfNullOrWhiteSpace(service);
var sql = fromHlc.HasValue
? """
SELECT event_id, t_hlc, ts_wall, service, trace_parent,
correlation_id, kind, payload, payload_digest,
engine_name, engine_version, engine_digest, dsse_sig, schema_version
FROM timeline.events
WHERE service = @service AND t_hlc >= @from_hlc
ORDER BY t_hlc ASC
LIMIT @limit
"""
: """
SELECT event_id, t_hlc, ts_wall, service, trace_parent,
correlation_id, kind, payload, payload_digest,
engine_name, engine_version, engine_digest, dsse_sig, schema_version
FROM timeline.events
WHERE service = @service
ORDER BY t_hlc ASC
LIMIT @limit
""";
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql, connection);
await using var dbContext = CreateDbContext(connection);
command.Parameters.AddWithValue("@service", service);
command.Parameters.AddWithValue("@limit", limit);
IQueryable<TimelineEventEntity> query = dbContext.Events
.AsNoTracking()
.Where(e => e.Service == service);
if (fromHlc.HasValue)
{
command.Parameters.AddWithValue("@from_hlc", fromHlc.Value.ToSortableString());
var fromStr = fromHlc.Value.ToSortableString();
query = query.Where(e => string.Compare(e.THlc, fromStr) >= 0);
}
return await ExecuteQueryAsync(command, cancellationToken).ConfigureAwait(false);
var entities = await query
.OrderBy(e => e.THlc)
.Take(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return entities.Select(MapToDomain).ToList();
}
/// <inheritdoc/>
@@ -214,21 +185,15 @@ public sealed class PostgresTimelineEventStore : ITimelineEventStore
{
ArgumentException.ThrowIfNullOrWhiteSpace(eventId);
const string sql = """
SELECT event_id, t_hlc, ts_wall, service, trace_parent,
correlation_id, kind, payload, payload_digest,
engine_name, engine_version, engine_digest, dsse_sig, schema_version
FROM timeline.events
WHERE event_id = @event_id
""";
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql, connection);
await using var dbContext = CreateDbContext(connection);
command.Parameters.AddWithValue("@event_id", eventId);
var entity = await dbContext.Events
.AsNoTracking()
.FirstOrDefaultAsync(e => e.EventId == eventId, cancellationToken)
.ConfigureAwait(false);
var events = await ExecuteQueryAsync(command, cancellationToken).ConfigureAwait(false);
return events.Count > 0 ? events[0] : null;
return entity is null ? null : MapToDomain(entity);
}
/// <inheritdoc/>
@@ -236,78 +201,75 @@ public sealed class PostgresTimelineEventStore : ITimelineEventStore
{
ArgumentException.ThrowIfNullOrWhiteSpace(correlationId);
const string sql = """
SELECT COUNT(*) FROM timeline.events WHERE correlation_id = @correlation_id
""";
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql, connection);
await using var dbContext = CreateDbContext(connection);
command.Parameters.AddWithValue("@correlation_id", correlationId);
var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return Convert.ToInt64(result, CultureInfo.InvariantCulture);
return await dbContext.Events
.AsNoTracking()
.Where(e => e.CorrelationId == correlationId)
.LongCountAsync(cancellationToken)
.ConfigureAwait(false);
}
private static void AddEventParameters(NpgsqlCommand command, TimelineEvent e)
private EfCore.Context.EventingDbContext CreateDbContext(NpgsqlConnection connection)
{
command.Parameters.AddWithValue("@event_id", e.EventId);
command.Parameters.AddWithValue("@t_hlc", e.THlc.ToSortableString());
command.Parameters.AddWithValue("@ts_wall", e.TsWall);
command.Parameters.AddWithValue("@service", e.Service);
command.Parameters.AddWithValue("@trace_parent", (object?)e.TraceParent ?? DBNull.Value);
command.Parameters.AddWithValue("@correlation_id", e.CorrelationId);
command.Parameters.AddWithValue("@kind", e.Kind);
command.Parameters.AddWithValue("@payload", e.Payload);
command.Parameters.AddWithValue("@payload_digest", e.PayloadDigest);
command.Parameters.AddWithValue("@engine_name", e.EngineVersion.EngineName);
command.Parameters.AddWithValue("@engine_version", e.EngineVersion.Version);
command.Parameters.AddWithValue("@engine_digest", e.EngineVersion.SourceDigest);
command.Parameters.AddWithValue("@dsse_sig", (object?)e.DsseSig ?? DBNull.Value);
command.Parameters.AddWithValue("@schema_version", e.SchemaVersion);
var commandTimeout = _eventingDataSource?.CommandTimeoutSeconds ?? 30;
var schemaName = _eventingDataSource?.SchemaName ?? EventingDataSource.DefaultSchemaName;
return EventingDbContextFactory.Create(connection, commandTimeout, schemaName);
}
private static async Task<IReadOnlyList<TimelineEvent>> ExecuteQueryAsync(
NpgsqlCommand command,
CancellationToken cancellationToken)
private static TimelineEventEntity MapToEntity(TimelineEvent e)
{
var events = new List<TimelineEvent>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
return new TimelineEventEntity
{
events.Add(MapFromReader(reader));
}
return events;
}
private static TimelineEvent MapFromReader(NpgsqlDataReader reader)
{
var hlcString = reader.GetString(reader.GetOrdinal("t_hlc"));
return new TimelineEvent
{
EventId = reader.GetString(reader.GetOrdinal("event_id")),
THlc = HlcTimestamp.Parse(hlcString),
TsWall = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("ts_wall")),
Service = reader.GetString(reader.GetOrdinal("service")),
TraceParent = reader.IsDBNull(reader.GetOrdinal("trace_parent"))
? null
: reader.GetString(reader.GetOrdinal("trace_parent")),
CorrelationId = reader.GetString(reader.GetOrdinal("correlation_id")),
Kind = reader.GetString(reader.GetOrdinal("kind")),
Payload = reader.GetString(reader.GetOrdinal("payload")),
PayloadDigest = (byte[])reader.GetValue(reader.GetOrdinal("payload_digest")),
EngineVersion = new EngineVersionRef(
reader.GetString(reader.GetOrdinal("engine_name")),
reader.GetString(reader.GetOrdinal("engine_version")),
reader.GetString(reader.GetOrdinal("engine_digest"))),
DsseSig = reader.IsDBNull(reader.GetOrdinal("dsse_sig"))
? null
: reader.GetString(reader.GetOrdinal("dsse_sig")),
SchemaVersion = reader.GetInt32(reader.GetOrdinal("schema_version"))
EventId = e.EventId,
THlc = e.THlc.ToSortableString(),
TsWall = e.TsWall,
Service = e.Service,
TraceParent = e.TraceParent,
CorrelationId = e.CorrelationId,
Kind = e.Kind,
Payload = e.Payload,
PayloadDigest = e.PayloadDigest,
EngineName = e.EngineVersion.EngineName,
EngineVersion = e.EngineVersion.Version,
EngineDigest = e.EngineVersion.SourceDigest,
DsseSig = e.DsseSig,
SchemaVersion = e.SchemaVersion
};
}
private static TimelineEvent MapToDomain(TimelineEventEntity entity)
{
return new TimelineEvent
{
EventId = entity.EventId,
THlc = HlcTimestamp.Parse(entity.THlc),
TsWall = entity.TsWall,
Service = entity.Service,
TraceParent = entity.TraceParent,
CorrelationId = entity.CorrelationId,
Kind = entity.Kind,
Payload = entity.Payload,
PayloadDigest = entity.PayloadDigest,
EngineVersion = new EngineVersionRef(
entity.EngineName,
entity.EngineVersion,
entity.EngineDigest),
DsseSig = entity.DsseSig,
SchemaVersion = entity.SchemaVersion
};
}
private static bool IsUniqueViolation(DbUpdateException exception)
{
Exception? current = exception;
while (current is not null)
{
if (current is PostgresException { SqlState: PostgresErrorCodes.UniqueViolation })
return true;
current = current.InnerException;
}
return false;
}
}