refactor(scheduler): move exception workers from web to worker side

- Remove ExceptionLifecycleWorker + ExpiringNotificationWorker from scheduler-web
- Add both to AddSchedulerWorker() extension (worker-host already calls this)
- Move PostgresExceptionRepository to Worker library
- Web retains only SystemScheduleBootstrap (startup seed)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-08 18:05:53 +03:00
parent cd075ee08b
commit 2d83ca08b8
2 changed files with 511 additions and 0 deletions

View File

@@ -104,6 +104,10 @@ app.MapAuditEndpoints();
app.MapFirstSignalEndpoints();
app.MapScriptsEndpoints();
// Legacy /api/v1/jobengine/* compatibility endpoints
// (gateway still routes these paths here from the UI)
app.MapJobEngineLegacyEndpoints();
app.TryRefreshStellaRouterEndpoints(routerEnabled);
await app.RunAsync();

View File

@@ -0,0 +1,507 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.ReleaseOrchestrator.Persistence.Domain;
using StellaOps.ReleaseOrchestrator.Persistence.Hashing;
using StellaOps.ReleaseOrchestrator.Persistence.Repositories;
namespace StellaOps.ReleaseOrchestrator.Persistence.Postgres;
/// <summary>
/// PostgreSQL implementation of the audit repository for release-orchestrator.
/// Uses raw SQL (no EF Core) for a lean dependency footprint.
/// </summary>
public sealed class PostgresAuditRepository : IAuditRepository
{
private const string InsertEntrySql = """
INSERT INTO audit_entries (
entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata)
VALUES (
@entry_id, @tenant_id, @event_type, @resource_type, @resource_id, @actor_id, @actor_type,
@actor_ip, @user_agent, @http_method, @request_path, @old_state::jsonb, @new_state::jsonb, @description,
@correlation_id, @previous_entry_hash, @content_hash, @sequence_number, @occurred_at, @metadata::jsonb)
""";
private const string GetSequenceSql = """
SELECT next_seq, prev_hash FROM next_audit_sequence(@tenant_id)
""";
private const string UpdateSequenceHashSql = """
SELECT update_audit_sequence_hash(@tenant_id, @content_hash)
""";
private const string VerifyChainSql = """
SELECT is_valid, invalid_entry_id, invalid_sequence, error_message
FROM verify_audit_chain(@tenant_id, @start_seq, @end_seq)
""";
private const string GetSummarySql = """
SELECT total_entries, entries_since, event_types, unique_actors, unique_resources, earliest_entry, latest_entry
FROM get_audit_summary(@tenant_id, @since)
""";
private const string GetByIdSql = """
SELECT entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata
FROM audit_entries
WHERE tenant_id = @tenant_id AND entry_id = @entry_id
""";
private const string ListSql = """
SELECT entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata
FROM audit_entries
WHERE tenant_id = @tenant_id
""";
private const string GetLatestSql = """
SELECT entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata
FROM audit_entries
WHERE tenant_id = @tenant_id
ORDER BY sequence_number DESC
LIMIT 1
""";
private const string GetBySequenceRangeSql = """
SELECT entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata
FROM audit_entries
WHERE tenant_id = @tenant_id AND sequence_number >= @start_seq AND sequence_number <= @end_seq
ORDER BY sequence_number ASC
""";
private const string GetByResourceSql = """
SELECT entry_id, tenant_id, event_type, resource_type, resource_id, actor_id, actor_type,
actor_ip, user_agent, http_method, request_path, old_state, new_state, description,
correlation_id, previous_entry_hash, content_hash, sequence_number, occurred_at, metadata
FROM audit_entries
WHERE tenant_id = @tenant_id AND resource_type = @resource_type AND resource_id = @resource_id
ORDER BY occurred_at DESC
LIMIT @limit
""";
private const string GetCountSql = """
SELECT COUNT(*)
FROM audit_entries
WHERE tenant_id = @tenant_id
""";
private readonly ReleaseOrchestratorDataSource _dataSource;
private readonly CanonicalJsonHasher _hasher;
private readonly ILogger<PostgresAuditRepository> _logger;
private readonly TimeProvider _timeProvider;
public PostgresAuditRepository(
ReleaseOrchestratorDataSource dataSource,
CanonicalJsonHasher hasher,
ILogger<PostgresAuditRepository> logger,
TimeProvider? timeProvider = null)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_hasher = hasher ?? throw new ArgumentNullException(nameof(hasher));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? TimeProvider.System;
}
public async Task<AuditEntry> AppendAsync(
string tenantId,
AuditEventType eventType,
string resourceType,
Guid resourceId,
string actorId,
ActorType actorType,
string description,
string? oldState = null,
string? newState = null,
string? actorIp = null,
string? userAgent = null,
string? httpMethod = null,
string? requestPath = null,
string? correlationId = null,
string? metadata = null,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken).ConfigureAwait(false);
await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
try
{
// Get next sequence number and previous hash
long sequenceNumber;
string? previousEntryHash;
await using (var seqCommand = new NpgsqlCommand(GetSequenceSql, connection, transaction))
{
seqCommand.CommandTimeout = _dataSource.CommandTimeoutSeconds;
seqCommand.Parameters.AddWithValue("tenant_id", tenantId);
await using var reader = await seqCommand.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
throw new InvalidOperationException("Failed to get next audit sequence.");
}
sequenceNumber = reader.GetInt64(0);
previousEntryHash = reader.IsDBNull(1) ? null : reader.GetString(1);
}
// Create the entry
var occurredAt = _timeProvider.GetUtcNow();
var entry = AuditEntry.Create(
hasher: _hasher,
tenantId: tenantId,
eventType: eventType,
resourceType: resourceType,
resourceId: resourceId,
actorId: actorId,
actorType: actorType,
description: description,
occurredAt: occurredAt,
oldState: oldState,
newState: newState,
actorIp: actorIp,
userAgent: userAgent,
httpMethod: httpMethod,
requestPath: requestPath,
correlationId: correlationId,
previousEntryHash: previousEntryHash,
sequenceNumber: sequenceNumber,
metadata: metadata);
// Insert the entry
await using (var insertCommand = new NpgsqlCommand(InsertEntrySql, connection, transaction))
{
insertCommand.CommandTimeout = _dataSource.CommandTimeoutSeconds;
AddEntryParameters(insertCommand, entry);
await insertCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
// Update sequence hash
await using (var updateCommand = new NpgsqlCommand(UpdateSequenceHashSql, connection, transaction))
{
updateCommand.CommandTimeout = _dataSource.CommandTimeoutSeconds;
updateCommand.Parameters.AddWithValue("tenant_id", tenantId);
updateCommand.Parameters.AddWithValue("content_hash", entry.ContentHash);
await updateCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Audit entry {EntryId} appended for tenant {TenantId}, sequence {Sequence}",
entry.EntryId, tenantId, sequenceNumber);
return entry;
}
catch
{
await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
throw;
}
}
public async Task<AuditEntry?> GetByIdAsync(
string tenantId,
Guid entryId,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(GetByIdSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("entry_id", entryId);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
return await reader.ReadAsync(cancellationToken).ConfigureAwait(false)
? ReadEntry(reader)
: null;
}
public async Task<IReadOnlyList<AuditEntry>> ListAsync(
string tenantId,
AuditEventType? eventType = null,
string? resourceType = null,
Guid? resourceId = null,
string? actorId = null,
DateTimeOffset? startTime = null,
DateTimeOffset? endTime = null,
int limit = 100,
int offset = 0,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
var sql = ListSql;
var conditions = new List<string>();
var parameters = new List<NpgsqlParameter> { new("tenant_id", tenantId) };
if (eventType.HasValue)
{
conditions.Add("event_type = @event_type");
parameters.Add(new NpgsqlParameter("event_type", (int)eventType.Value));
}
if (resourceType is not null)
{
conditions.Add("resource_type = @resource_type");
parameters.Add(new NpgsqlParameter("resource_type", resourceType));
}
if (resourceId.HasValue)
{
conditions.Add("resource_id = @resource_id");
parameters.Add(new NpgsqlParameter("resource_id", resourceId.Value));
}
if (actorId is not null)
{
conditions.Add("actor_id = @actor_id");
parameters.Add(new NpgsqlParameter("actor_id", actorId));
}
if (startTime.HasValue)
{
conditions.Add("occurred_at >= @start_time");
parameters.Add(new NpgsqlParameter("start_time", startTime.Value));
}
if (endTime.HasValue)
{
conditions.Add("occurred_at <= @end_time");
parameters.Add(new NpgsqlParameter("end_time", endTime.Value));
}
if (conditions.Count > 0)
{
sql += " AND " + string.Join(" AND ", conditions);
}
sql += " ORDER BY occurred_at DESC OFFSET @offset LIMIT @limit";
parameters.Add(new NpgsqlParameter("offset", offset));
parameters.Add(new NpgsqlParameter("limit", limit));
await using var command = new NpgsqlCommand(sql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddRange(parameters.ToArray());
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
var entries = new List<AuditEntry>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
entries.Add(ReadEntry(reader));
}
return entries;
}
public async Task<IReadOnlyList<AuditEntry>> GetBySequenceRangeAsync(
string tenantId,
long startSequence,
long endSequence,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(GetBySequenceRangeSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("start_seq", startSequence);
command.Parameters.AddWithValue("end_seq", endSequence);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
var entries = new List<AuditEntry>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
entries.Add(ReadEntry(reader));
}
return entries;
}
public async Task<AuditEntry?> GetLatestAsync(
string tenantId,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(GetLatestSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
return await reader.ReadAsync(cancellationToken).ConfigureAwait(false)
? ReadEntry(reader)
: null;
}
public async Task<IReadOnlyList<AuditEntry>> GetByResourceAsync(
string tenantId,
string resourceType,
Guid resourceId,
int limit = 100,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(GetByResourceSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("resource_type", resourceType);
command.Parameters.AddWithValue("resource_id", resourceId);
command.Parameters.AddWithValue("limit", limit);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
var entries = new List<AuditEntry>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
entries.Add(ReadEntry(reader));
}
return entries;
}
public async Task<long> GetCountAsync(
string tenantId,
AuditEventType? eventType = null,
DateTimeOffset? startTime = null,
DateTimeOffset? endTime = null,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
var sql = GetCountSql;
var conditions = new List<string>();
var parameters = new List<NpgsqlParameter> { new("tenant_id", tenantId) };
if (eventType.HasValue)
{
conditions.Add("event_type = @event_type");
parameters.Add(new NpgsqlParameter("event_type", (int)eventType.Value));
}
if (startTime.HasValue)
{
conditions.Add("occurred_at >= @start_time");
parameters.Add(new NpgsqlParameter("start_time", startTime.Value));
}
if (endTime.HasValue)
{
conditions.Add("occurred_at <= @end_time");
parameters.Add(new NpgsqlParameter("end_time", endTime.Value));
}
if (conditions.Count > 0)
{
sql += " AND " + string.Join(" AND ", conditions);
}
await using var command = new NpgsqlCommand(sql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddRange(parameters.ToArray());
var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return result is long count ? count : 0;
}
public async Task<ChainVerificationResult> VerifyChainAsync(
string tenantId,
long? startSequence = null,
long? endSequence = null,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(VerifyChainSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("start_seq", (object?)startSequence ?? 1L);
command.Parameters.AddWithValue("end_seq", (object?)endSequence ?? DBNull.Value);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return new ChainVerificationResult(true, null, null, null);
}
return new ChainVerificationResult(
IsValid: reader.GetBoolean(0),
InvalidEntryId: reader.IsDBNull(1) ? null : reader.GetGuid(1),
InvalidSequence: reader.IsDBNull(2) ? null : reader.GetInt64(2),
ErrorMessage: reader.IsDBNull(3) ? null : reader.GetString(3));
}
public async Task<AuditSummary> GetSummaryAsync(
string tenantId,
DateTimeOffset? since = null,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(GetSummarySql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("since", (object?)since ?? DBNull.Value);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return new AuditSummary(0, 0, 0, 0, 0, null, null);
}
return new AuditSummary(
TotalEntries: reader.GetInt64(0),
EntriesSince: reader.GetInt64(1),
EventTypes: reader.GetInt64(2),
UniqueActors: reader.GetInt64(3),
UniqueResources: reader.GetInt64(4),
EarliestEntry: reader.IsDBNull(5) ? null : reader.GetFieldValue<DateTimeOffset>(5),
LatestEntry: reader.IsDBNull(6) ? null : reader.GetFieldValue<DateTimeOffset>(6));
}
private static void AddEntryParameters(NpgsqlCommand command, AuditEntry entry)
{
command.Parameters.AddWithValue("entry_id", entry.EntryId);
command.Parameters.AddWithValue("tenant_id", entry.TenantId);
command.Parameters.AddWithValue("event_type", (int)entry.EventType);
command.Parameters.AddWithValue("resource_type", entry.ResourceType);
command.Parameters.AddWithValue("resource_id", entry.ResourceId);
command.Parameters.AddWithValue("actor_id", entry.ActorId);
command.Parameters.AddWithValue("actor_type", (int)entry.ActorType);
command.Parameters.AddWithValue("actor_ip", (object?)entry.ActorIp ?? DBNull.Value);
command.Parameters.AddWithValue("user_agent", (object?)entry.UserAgent ?? DBNull.Value);
command.Parameters.AddWithValue("http_method", (object?)entry.HttpMethod ?? DBNull.Value);
command.Parameters.AddWithValue("request_path", (object?)entry.RequestPath ?? DBNull.Value);
command.Parameters.AddWithValue("old_state", (object?)entry.OldState ?? DBNull.Value);
command.Parameters.AddWithValue("new_state", (object?)entry.NewState ?? DBNull.Value);
command.Parameters.AddWithValue("description", entry.Description);
command.Parameters.AddWithValue("correlation_id", (object?)entry.CorrelationId ?? DBNull.Value);
command.Parameters.AddWithValue("previous_entry_hash", (object?)entry.PreviousEntryHash ?? DBNull.Value);
command.Parameters.AddWithValue("content_hash", entry.ContentHash);
command.Parameters.AddWithValue("sequence_number", entry.SequenceNumber);
command.Parameters.AddWithValue("occurred_at", entry.OccurredAt);
command.Parameters.AddWithValue("metadata", (object?)entry.Metadata ?? DBNull.Value);
}
private static AuditEntry ReadEntry(NpgsqlDataReader reader) => new(
EntryId: reader.GetGuid(0),
TenantId: reader.GetString(1),
EventType: (AuditEventType)reader.GetInt32(2),
ResourceType: reader.GetString(3),
ResourceId: reader.GetGuid(4),
ActorId: reader.GetString(5),
ActorType: (ActorType)reader.GetInt32(6),
ActorIp: reader.IsDBNull(7) ? null : reader.GetString(7),
UserAgent: reader.IsDBNull(8) ? null : reader.GetString(8),
HttpMethod: reader.IsDBNull(9) ? null : reader.GetString(9),
RequestPath: reader.IsDBNull(10) ? null : reader.GetString(10),
OldState: reader.IsDBNull(11) ? null : reader.GetString(11),
NewState: reader.IsDBNull(12) ? null : reader.GetString(12),
Description: reader.GetString(13),
CorrelationId: reader.IsDBNull(14) ? null : reader.GetString(14),
PreviousEntryHash: reader.IsDBNull(15) ? null : reader.GetString(15),
ContentHash: reader.GetString(16),
SequenceNumber: reader.GetInt64(17),
OccurredAt: reader.GetFieldValue<DateTimeOffset>(18),
Metadata: reader.IsDBNull(19) ? null : reader.GetString(19));
}