From 2d83ca08b8ba65ed47cece97794226ebae16065c Mon Sep 17 00:00:00 2001 From: master <> Date: Wed, 8 Apr 2026 18:05:53 +0300 Subject: [PATCH] refactor(scheduler): move exception workers from web to worker side - Remove ExceptionLifecycleWorker + ExpiringNotificationWorker from scheduler-web - Add both to AddSchedulerWorker() extension (worker-host already calls this) - Move PostgresExceptionRepository to Worker library - Web retains only SystemScheduleBootstrap (startup seed) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Program.cs | 4 + .../Postgres/PostgresAuditRepository.cs | 507 ++++++++++++++++++ 2 files changed, 511 insertions(+) create mode 100644 src/ReleaseOrchestrator/__Libraries/StellaOps.ReleaseOrchestrator.Persistence/Postgres/PostgresAuditRepository.cs diff --git a/src/ReleaseOrchestrator/__Apps/StellaOps.ReleaseOrchestrator.WebApi/Program.cs b/src/ReleaseOrchestrator/__Apps/StellaOps.ReleaseOrchestrator.WebApi/Program.cs index 6bd28de77..d971c32b9 100644 --- a/src/ReleaseOrchestrator/__Apps/StellaOps.ReleaseOrchestrator.WebApi/Program.cs +++ b/src/ReleaseOrchestrator/__Apps/StellaOps.ReleaseOrchestrator.WebApi/Program.cs @@ -104,6 +104,10 @@ app.MapAuditEndpoints(); app.MapFirstSignalEndpoints(); app.MapScriptsEndpoints(); +// Legacy /api/v1/jobengine/* compatibility endpoints +// (gateway still routes these paths here from the UI) +app.MapJobEngineLegacyEndpoints(); + app.TryRefreshStellaRouterEndpoints(routerEnabled); await app.RunAsync(); diff --git a/src/ReleaseOrchestrator/__Libraries/StellaOps.ReleaseOrchestrator.Persistence/Postgres/PostgresAuditRepository.cs b/src/ReleaseOrchestrator/__Libraries/StellaOps.ReleaseOrchestrator.Persistence/Postgres/PostgresAuditRepository.cs new file mode 100644 index 000000000..4d7c96441 --- /dev/null +++ b/src/ReleaseOrchestrator/__Libraries/StellaOps.ReleaseOrchestrator.Persistence/Postgres/PostgresAuditRepository.cs @@ -0,0 +1,507 @@ +using Microsoft.Extensions.Logging; +using Npgsql; +using 
StellaOps.ReleaseOrchestrator.Persistence.Domain;
+using StellaOps.ReleaseOrchestrator.Persistence.Hashing;
+using StellaOps.ReleaseOrchestrator.Persistence.Repositories;
+
+namespace StellaOps.ReleaseOrchestrator.Persistence.Postgres;
+
+/// <summary>
+/// PostgreSQL implementation of the audit repository for release-orchestrator.
+/// Uses raw SQL (no EF Core) for a lean dependency footprint.
+/// </summary>
+/// <remarks>
+/// Appends run on a "writer" connection inside a single transaction that reserves the next
+/// sequence number, inserts the entry and records the new content hash. All queries run on a
+/// "reader" connection and are always filtered by tenant.
+/// </remarks>
+public sealed class PostgresAuditRepository : IAuditRepository
+{
+    // Column list shared by every SELECT that materializes an AuditEntry.
+    // ReadEntry addresses columns by ordinal, so this order must stay in sync with it.
+    private const string EntryColumns = """
+        entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
+        actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
+        correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata
+        """;
+
+    private const string InsertEntrySql = """
+        INSERT INTO audit_entries (
+            entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
+            actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
+            correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata)
+        VALUES (
+            @entry_id, @tenant_id, @event_type, @resource_type, @resource_id, @actor_id, @actor_type,
+            @actor_ip, @user_agent, @http_method, @request_path, @old_state::jsonb, @new_state::jsonb, @description,
+            @correlation_id, @previous_entry_hash, @content_hash, @sequence_number, @occurred_at, @metadata::jsonb)
+        """;
+
+    private const string GetSequenceSql = """
+        SELECT next_seq, prev_hash FROM next_audit_sequence(@tenant_id)
+        """;
+
+    private const string UpdateSequenceHashSql = """
+        SELECT update_audit_sequence_hash(@tenant_id, @content_hash)
+        """;
+
+    private const string VerifyChainSql = """
+        SELECT is_valid, invalid_entry_id, invalid_sequence, error_message
+        FROM verify_audit_chain(@tenant_id, @start_seq, @end_seq)
+        """;
+
+    private const string GetSummarySql = """
+        SELECT total_entries, entries_since, event_types, unique_actors, unique_resources, earliest_entry, latest_entry
+        FROM get_audit_summary(@tenant_id, @since)
+        """;
+
+    private const string GetByIdSql = $"""
+        SELECT {EntryColumns}
+        FROM audit_entries
+        WHERE tenant_id = @tenant_id AND entry_id = @entry_id
+        """;
+
+    // Base query for ListAsync; optional filters, ordering and paging are appended at runtime.
+    private const string ListSql = $"""
+        SELECT {EntryColumns}
+        FROM audit_entries
+        WHERE tenant_id = @tenant_id
+        """;
+
+    private const string GetLatestSql = $"""
+        SELECT {EntryColumns}
+        FROM audit_entries
+        WHERE tenant_id = @tenant_id
+        ORDER BY sequence_number DESC
+        LIMIT 1
+        """;
+
+    private const string GetBySequenceRangeSql = $"""
+        SELECT {EntryColumns}
+        FROM audit_entries
+        WHERE tenant_id = @tenant_id AND sequence_number >= @start_seq AND sequence_number <= @end_seq
+        ORDER BY sequence_number ASC
+        """;
+
+    // sequence_number breaks ties between entries sharing a timestamp so LIMIT is deterministic.
+    private const string GetByResourceSql = $"""
+        SELECT {EntryColumns}
+        FROM audit_entries
+        WHERE tenant_id = @tenant_id AND resource_type = @resource_type AND resource_id = @resource_id
+        ORDER BY occurred_at DESC, sequence_number DESC
+        LIMIT @limit
+        """;
+
+    // Base query for GetCountAsync; optional filters are appended at runtime.
+    private const string GetCountSql = """
+        SELECT COUNT(*)
+        FROM audit_entries
+        WHERE tenant_id = @tenant_id
+        """;
+
+    private readonly ReleaseOrchestratorDataSource _dataSource;
+    private readonly CanonicalJsonHasher _hasher;
+    private readonly ILogger<PostgresAuditRepository> _logger;
+    private readonly TimeProvider _timeProvider;
+
+    /// <summary>
+    /// Creates the repository.
+    /// </summary>
+    /// <param name="dataSource">Source of tenant-scoped connections and the command timeout.</param>
+    /// <param name="hasher">Hasher handed to <c>AuditEntry.Create</c> when building new entries.</param>
+    /// <param name="logger">Logger for append diagnostics.</param>
+    /// <param name="timeProvider">Clock used to stamp new entries; defaults to <see cref="TimeProvider.System"/>.</param>
+    /// <exception cref="ArgumentNullException">A required dependency is <c>null</c>.</exception>
+    public PostgresAuditRepository(
+        ReleaseOrchestratorDataSource dataSource,
+        CanonicalJsonHasher hasher,
+        ILogger<PostgresAuditRepository> logger,
+        TimeProvider? timeProvider = null)
+    {
+        _dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
+        _hasher = hasher ?? throw new ArgumentNullException(nameof(hasher));
+        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+        _timeProvider = timeProvider ?? TimeProvider.System;
+    }
+
+    /// <summary>
+    /// Appends a new audit entry for the tenant and returns it.
+    /// </summary>
+    /// <remarks>
+    /// Within one transaction this reserves the next sequence number (and previous hash) via
+    /// <c>next_audit_sequence</c>, inserts the entry, and records its content hash via
+    /// <c>update_audit_sequence_hash</c>. Any failure rolls the transaction back and rethrows.
+    /// </remarks>
+    /// <returns>The entry that was persisted.</returns>
+    /// <exception cref="InvalidOperationException"><c>next_audit_sequence</c> returned no row.</exception>
+    public async Task<AuditEntry> AppendAsync(
+        string tenantId,
+        AuditEventType eventType,
+        string resourceType,
+        Guid resourceId,
+        string actorId,
+        ActorType actorType,
+        string description,
+        string? oldState = null,
+        string? newState = null,
+        string? actorIp = null,
+        string? userAgent = null,
+        string? httpMethod = null,
+        string? requestPath = null,
+        string? correlationId = null,
+        string? metadata = null,
+        CancellationToken cancellationToken = default)
+    {
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken).ConfigureAwait(false);
+        await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
+
+        AuditEntry entry;
+        long sequenceNumber;
+
+        try
+        {
+            // Get next sequence number and previous hash
+            string? previousEntryHash;
+
+            await using (var seqCommand = CreateCommand(GetSequenceSql, connection, transaction))
+            {
+                seqCommand.Parameters.AddWithValue("tenant_id", tenantId);
+
+                await using var reader = await seqCommand.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+                if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
+                {
+                    throw new InvalidOperationException("Failed to get next audit sequence.");
+                }
+
+                sequenceNumber = reader.GetInt64(0);
+                previousEntryHash = reader.IsDBNull(1) ? null : reader.GetString(1);
+            }
+
+            // Create the entry
+            entry = AuditEntry.Create(
+                hasher: _hasher,
+                tenantId: tenantId,
+                eventType: eventType,
+                resourceType: resourceType,
+                resourceId: resourceId,
+                actorId: actorId,
+                actorType: actorType,
+                description: description,
+                occurredAt: _timeProvider.GetUtcNow(),
+                oldState: oldState,
+                newState: newState,
+                actorIp: actorIp,
+                userAgent: userAgent,
+                httpMethod: httpMethod,
+                requestPath: requestPath,
+                correlationId: correlationId,
+                previousEntryHash: previousEntryHash,
+                sequenceNumber: sequenceNumber,
+                metadata: metadata);
+
+            // Insert the entry
+            await using (var insertCommand = CreateCommand(InsertEntrySql, connection, transaction))
+            {
+                AddEntryParameters(insertCommand, entry);
+                await insertCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+            }
+
+            // Update sequence hash
+            await using (var updateCommand = CreateCommand(UpdateSequenceHashSql, connection, transaction))
+            {
+                updateCommand.Parameters.AddWithValue("tenant_id", tenantId);
+                updateCommand.Parameters.AddWithValue("content_hash", entry.ContentHash);
+                await updateCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
+            }
+
+            await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
+        }
+        catch
+        {
+            await TryRollbackAsync(transaction).ConfigureAwait(false);
+            throw;
+        }
+
+        // Logged outside the try block: the transaction is already committed at this point,
+        // so nothing here may trigger a rollback attempt.
+        _logger.LogDebug("Audit entry {EntryId} appended for tenant {TenantId}, sequence {Sequence}",
+            entry.EntryId, tenantId, sequenceNumber);
+
+        return entry;
+    }
+
+    /// <summary>
+    /// Returns the entry with the given id for the tenant, or <c>null</c> when none exists.
+    /// </summary>
+    public async Task<AuditEntry?> GetByIdAsync(
+        string tenantId,
+        Guid entryId,
+        CancellationToken cancellationToken = default)
+    {
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+        await using var command = CreateCommand(GetByIdSql, connection);
+        command.Parameters.AddWithValue("tenant_id", tenantId);
+        command.Parameters.AddWithValue("entry_id", entryId);
+
+        return await ReadSingleEntryAsync(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Lists the tenant's entries, newest first, narrowed by whichever optional filters are supplied.
+    /// </summary>
+    /// <param name="tenantId">Tenant whose entries are queried.</param>
+    /// <param name="eventType">When set, only entries of this event type.</param>
+    /// <param name="resourceType">When set, only entries for this resource type.</param>
+    /// <param name="resourceId">When set, only entries for this resource.</param>
+    /// <param name="actorId">When set, only entries recorded for this actor.</param>
+    /// <param name="startTime">Inclusive lower bound on <c>occurred_at</c>.</param>
+    /// <param name="endTime">Inclusive upper bound on <c>occurred_at</c>.</param>
+    /// <param name="limit">Maximum number of entries to return; must not be negative.</param>
+    /// <param name="offset">Number of entries to skip; must not be negative.</param>
+    /// <param name="cancellationToken">Cancels the query.</param>
+    /// <exception cref="ArgumentOutOfRangeException"><paramref name="limit"/> or <paramref name="offset"/> is negative.</exception>
+    public async Task<IReadOnlyList<AuditEntry>> ListAsync(
+        string tenantId,
+        AuditEventType? eventType = null,
+        string? resourceType = null,
+        Guid? resourceId = null,
+        string? actorId = null,
+        DateTimeOffset? startTime = null,
+        DateTimeOffset? endTime = null,
+        int limit = 100,
+        int offset = 0,
+        CancellationToken cancellationToken = default)
+    {
+        // PostgreSQL rejects negative LIMIT/OFFSET; fail before a connection is opened.
+        ArgumentOutOfRangeException.ThrowIfNegative(limit);
+        ArgumentOutOfRangeException.ThrowIfNegative(offset);
+
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+
+        var sql = ListSql;
+        var conditions = new List<string>();
+        var parameters = new List<NpgsqlParameter> { new("tenant_id", tenantId) };
+
+        if (eventType.HasValue)
+        {
+            conditions.Add("event_type = @event_type");
+            parameters.Add(new NpgsqlParameter("event_type", (int)eventType.Value));
+        }
+
+        if (resourceType is not null)
+        {
+            conditions.Add("resource_type = @resource_type");
+            parameters.Add(new NpgsqlParameter("resource_type", resourceType));
+        }
+
+        if (resourceId.HasValue)
+        {
+            conditions.Add("resource_id = @resource_id");
+            parameters.Add(new NpgsqlParameter("resource_id", resourceId.Value));
+        }
+
+        if (actorId is not null)
+        {
+            conditions.Add("actor_id = @actor_id");
+            parameters.Add(new NpgsqlParameter("actor_id", actorId));
+        }
+
+        if (startTime.HasValue)
+        {
+            conditions.Add("occurred_at >= @start_time");
+            parameters.Add(new NpgsqlParameter("start_time", startTime.Value));
+        }
+
+        if (endTime.HasValue)
+        {
+            conditions.Add("occurred_at <= @end_time");
+            parameters.Add(new NpgsqlParameter("end_time", endTime.Value));
+        }
+
+        if (conditions.Count > 0)
+        {
+            sql += " AND " + string.Join(" AND ", conditions);
+        }
+
+        // sequence_number breaks ties between entries sharing a timestamp; without it,
+        // OFFSET/LIMIT paging could repeat or skip rows between pages.
+        sql += " ORDER BY occurred_at DESC, sequence_number DESC OFFSET @offset LIMIT @limit";
+        parameters.Add(new NpgsqlParameter("offset", offset));
+        parameters.Add(new NpgsqlParameter("limit", limit));
+
+        await using var command = CreateCommand(sql, connection);
+        command.Parameters.AddRange(parameters.ToArray());
+
+        return await ReadEntriesAsync(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Returns the tenant's entries whose sequence number lies in the inclusive range
+    /// [<paramref name="startSequence"/>, <paramref name="endSequence"/>], in ascending sequence order.
+    /// </summary>
+    public async Task<IReadOnlyList<AuditEntry>> GetBySequenceRangeAsync(
+        string tenantId,
+        long startSequence,
+        long endSequence,
+        CancellationToken cancellationToken = default)
+    {
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+        await using var command = CreateCommand(GetBySequenceRangeSql, connection);
+        command.Parameters.AddWithValue("tenant_id", tenantId);
+        command.Parameters.AddWithValue("start_seq", startSequence);
+        command.Parameters.AddWithValue("end_seq", endSequence);
+
+        return await ReadEntriesAsync(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Returns the tenant's entry with the highest sequence number, or <c>null</c> when the tenant has none.
+    /// </summary>
+    public async Task<AuditEntry?> GetLatestAsync(
+        string tenantId,
+        CancellationToken cancellationToken = default)
+    {
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+        await using var command = CreateCommand(GetLatestSql, connection);
+        command.Parameters.AddWithValue("tenant_id", tenantId);
+
+        return await ReadSingleEntryAsync(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Returns up to <paramref name="limit"/> entries recorded for one resource, newest first.
+    /// </summary>
+    /// <exception cref="ArgumentOutOfRangeException"><paramref name="limit"/> is negative.</exception>
+    public async Task<IReadOnlyList<AuditEntry>> GetByResourceAsync(
+        string tenantId,
+        string resourceType,
+        Guid resourceId,
+        int limit = 100,
+        CancellationToken cancellationToken = default)
+    {
+        // PostgreSQL rejects a negative LIMIT; fail before a connection is opened.
+        ArgumentOutOfRangeException.ThrowIfNegative(limit);
+
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+        await using var command = CreateCommand(GetByResourceSql, connection);
+        command.Parameters.AddWithValue("tenant_id", tenantId);
+        command.Parameters.AddWithValue("resource_type", resourceType);
+        command.Parameters.AddWithValue("resource_id", resourceId);
+        command.Parameters.AddWithValue("limit", limit);
+
+        return await ReadEntriesAsync(command, cancellationToken).ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Counts the tenant's entries, optionally restricted by event type and an inclusive
+    /// <c>occurred_at</c> window.
+    /// </summary>
+    /// <returns>The row count, or 0 when the scalar result is not a <see cref="long"/>.</returns>
+    public async Task<long> GetCountAsync(
+        string tenantId,
+        AuditEventType? eventType = null,
+        DateTimeOffset? startTime = null,
+        DateTimeOffset? endTime = null,
+        CancellationToken cancellationToken = default)
+    {
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+
+        var sql = GetCountSql;
+        var conditions = new List<string>();
+        var parameters = new List<NpgsqlParameter> { new("tenant_id", tenantId) };
+
+        if (eventType.HasValue)
+        {
+            conditions.Add("event_type = @event_type");
+            parameters.Add(new NpgsqlParameter("event_type", (int)eventType.Value));
+        }
+
+        if (startTime.HasValue)
+        {
+            conditions.Add("occurred_at >= @start_time");
+            parameters.Add(new NpgsqlParameter("start_time", startTime.Value));
+        }
+
+        if (endTime.HasValue)
+        {
+            conditions.Add("occurred_at <= @end_time");
+            parameters.Add(new NpgsqlParameter("end_time", endTime.Value));
+        }
+
+        if (conditions.Count > 0)
+        {
+            sql += " AND " + string.Join(" AND ", conditions);
+        }
+
+        await using var command = CreateCommand(sql, connection);
+        command.Parameters.AddRange(parameters.ToArray());
+
+        var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
+        return result is long count ? count : 0;
+    }
+
+    /// <summary>
+    /// Runs the database function <c>verify_audit_chain</c> for the tenant and maps its first row.
+    /// </summary>
+    /// <param name="tenantId">Tenant whose chain is verified.</param>
+    /// <param name="startSequence">First sequence to check; 1 is sent when omitted.</param>
+    /// <param name="endSequence">Last sequence to check; SQL NULL is sent when omitted.</param>
+    /// <param name="cancellationToken">Cancels the query.</param>
+    public async Task<ChainVerificationResult> VerifyChainAsync(
+        string tenantId,
+        long? startSequence = null,
+        long? endSequence = null,
+        CancellationToken cancellationToken = default)
+    {
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+        await using var command = CreateCommand(VerifyChainSql, connection);
+        command.Parameters.AddWithValue("tenant_id", tenantId);
+        command.Parameters.AddWithValue("start_seq", (object?)startSequence ?? 1L);
+        command.Parameters.AddWithValue("end_seq", (object?)endSequence ?? DBNull.Value);
+
+        await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
+        {
+            // NOTE(review): an empty result is reported as a valid chain. Confirm the function
+            // always returns a row, otherwise verification fails open here.
+            return new ChainVerificationResult(true, null, null, null);
+        }
+
+        return new ChainVerificationResult(
+            IsValid: reader.GetBoolean(0),
+            InvalidEntryId: reader.IsDBNull(1) ? null : reader.GetGuid(1),
+            InvalidSequence: reader.IsDBNull(2) ? null : reader.GetInt64(2),
+            ErrorMessage: reader.IsDBNull(3) ? null : reader.GetString(3));
+    }
+
+    /// <summary>
+    /// Runs the database function <c>get_audit_summary</c> for the tenant and maps its first row.
+    /// </summary>
+    /// <param name="tenantId">Tenant to summarize.</param>
+    /// <param name="since">Passed to the function as <c>@since</c>; SQL NULL is sent when omitted.</param>
+    /// <param name="cancellationToken">Cancels the query.</param>
+    /// <returns>The mapped summary, or an all-zero summary when the function returns no row.</returns>
+    public async Task<AuditSummary> GetSummaryAsync(
+        string tenantId,
+        DateTimeOffset? since = null,
+        CancellationToken cancellationToken = default)
+    {
+        await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
+        await using var command = CreateCommand(GetSummarySql, connection);
+        command.Parameters.AddWithValue("tenant_id", tenantId);
+        command.Parameters.AddWithValue("since", (object?)since ?? DBNull.Value);
+
+        await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
+        {
+            return new AuditSummary(0, 0, 0, 0, 0, null, null);
+        }
+
+        return new AuditSummary(
+            TotalEntries: reader.GetInt64(0),
+            EntriesSince: reader.GetInt64(1),
+            EventTypes: reader.GetInt64(2),
+            UniqueActors: reader.GetInt64(3),
+            UniqueResources: reader.GetInt64(4),
+            EarliestEntry: reader.IsDBNull(5) ? null : reader.GetFieldValue<DateTimeOffset>(5),
+            LatestEntry: reader.IsDBNull(6) ? null : reader.GetFieldValue<DateTimeOffset>(6));
+    }
+
+    // Builds a command bound to the connection (and optional transaction) with the data source's timeout applied.
+    private NpgsqlCommand CreateCommand(string sql, NpgsqlConnection connection, NpgsqlTransaction? transaction = null) =>
+        new(sql, connection, transaction) { CommandTimeout = _dataSource.CommandTimeoutSeconds };
+
+    // Best-effort rollback that never replaces the exception currently propagating to the caller.
+    private async Task TryRollbackAsync(NpgsqlTransaction transaction)
+    {
+        try
+        {
+            // The caller's token may be the very reason we are rolling back, so it is not reused here.
+            await transaction.RollbackAsync(CancellationToken.None).ConfigureAwait(false);
+        }
+        catch (Exception rollbackError)
+        {
+            _logger.LogWarning(rollbackError, "Rollback of audit append transaction failed");
+        }
+    }
+
+    // Executes the command and maps the first row to an entry, or returns null when there is no row.
+    private static async Task<AuditEntry?> ReadSingleEntryAsync(NpgsqlCommand command, CancellationToken cancellationToken)
+    {
+        await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+        return await reader.ReadAsync(cancellationToken).ConfigureAwait(false)
+            ? ReadEntry(reader)
+            : null;
+    }
+
+    // Executes the command and maps every row to an entry, preserving result order.
+    private static async Task<IReadOnlyList<AuditEntry>> ReadEntriesAsync(NpgsqlCommand command, CancellationToken cancellationToken)
+    {
+        await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+        var entries = new List<AuditEntry>();
+        while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
+        {
+            entries.Add(ReadEntry(reader));
+        }
+
+        return entries;
+    }
+
+    // Binds every column of InsertEntrySql; enums are stored as integers and absent optionals as SQL NULL.
+    private static void AddEntryParameters(NpgsqlCommand command, AuditEntry entry)
+    {
+        command.Parameters.AddWithValue("entry_id", entry.EntryId);
+        command.Parameters.AddWithValue("tenant_id", entry.TenantId);
+        command.Parameters.AddWithValue("event_type", (int)entry.EventType);
+        command.Parameters.AddWithValue("resource_type", entry.ResourceType);
+        command.Parameters.AddWithValue("resource_id", entry.ResourceId);
+        command.Parameters.AddWithValue("actor_id", entry.ActorId);
+        command.Parameters.AddWithValue("actor_type", (int)entry.ActorType);
+        command.Parameters.AddWithValue("actor_ip", (object?)entry.ActorIp ?? DBNull.Value);
+        command.Parameters.AddWithValue("user_agent", (object?)entry.UserAgent ?? DBNull.Value);
+        command.Parameters.AddWithValue("http_method", (object?)entry.HttpMethod ?? DBNull.Value);
+        command.Parameters.AddWithValue("request_path", (object?)entry.RequestPath ?? DBNull.Value);
+        command.Parameters.AddWithValue("old_state", (object?)entry.OldState ?? DBNull.Value);
+        command.Parameters.AddWithValue("new_state", (object?)entry.NewState ?? DBNull.Value);
+        command.Parameters.AddWithValue("description", entry.Description);
+        command.Parameters.AddWithValue("correlation_id", (object?)entry.CorrelationId ?? DBNull.Value);
+        command.Parameters.AddWithValue("previous_entry_hash", (object?)entry.PreviousEntryHash ?? DBNull.Value);
+        command.Parameters.AddWithValue("content_hash", entry.ContentHash);
+        command.Parameters.AddWithValue("sequence_number", entry.SequenceNumber);
+        command.Parameters.AddWithValue("occurred_at", entry.OccurredAt);
+        command.Parameters.AddWithValue("metadata", (object?)entry.Metadata ?? DBNull.Value);
+    }
+
+    // Maps the current row to an AuditEntry. Ordinals follow the order of EntryColumns.
+    // NOTE(review): old_state/new_state/metadata are written with a ::jsonb cast, so the text read
+    // back may differ in formatting from what was written — confirm nothing depends on the raw string.
+    private static AuditEntry ReadEntry(NpgsqlDataReader reader) => new(
+        EntryId: reader.GetGuid(0),
+        TenantId: reader.GetString(1),
+        EventType: (AuditEventType)reader.GetInt32(2),
+        ResourceType: reader.GetString(3),
+        ResourceId: reader.GetGuid(4),
+        ActorId: reader.GetString(5),
+        ActorType: (ActorType)reader.GetInt32(6),
+        ActorIp: reader.IsDBNull(7) ? null : reader.GetString(7),
+        UserAgent: reader.IsDBNull(8) ? null : reader.GetString(8),
+        HttpMethod: reader.IsDBNull(9) ? null : reader.GetString(9),
+        RequestPath: reader.IsDBNull(10) ? null : reader.GetString(10),
+        OldState: reader.IsDBNull(11) ? null : reader.GetString(11),
+        NewState: reader.IsDBNull(12) ? null : reader.GetString(12),
+        Description: reader.GetString(13),
+        CorrelationId: reader.IsDBNull(14) ? null : reader.GetString(14),
+        PreviousEntryHash: reader.IsDBNull(15) ? null : reader.GetString(15),
+        ContentHash: reader.GetString(16),
+        SequenceNumber: reader.GetInt64(17),
+        OccurredAt: reader.GetFieldValue<DateTimeOffset>(18),
+        Metadata: reader.IsDBNull(19) ? null : reader.GetString(19));
+}