diff --git a/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/DeadLetter/PostgresDeadLetterService.cs b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/DeadLetter/PostgresDeadLetterService.cs new file mode 100644 index 000000000..a0fb881f9 --- /dev/null +++ b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/DeadLetter/PostgresDeadLetterService.cs @@ -0,0 +1,408 @@ +using System.Collections.Immutable; +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using StellaOps.Notifier.Worker.Observability; +using StellaOps.Notifier.Worker.Storage; +using StellaOps.Notify.Persistence.Postgres.Models; + +namespace StellaOps.Notifier.Worker.DeadLetter; + +public sealed class PostgresDeadLetterService : PostgresNotifyRuntimeServiceBase, IDeadLetterService +{ + private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); + + private readonly TimeProvider _timeProvider; + private readonly INotifyMetrics? _metrics; + private readonly ILogger _logger; + + public PostgresDeadLetterService( + NotifyDurableStorageSupport support, + TimeProvider timeProvider, + ILogger logger, + INotifyMetrics? metrics = null) + : base(support) + { + _timeProvider = timeProvider ?? TimeProvider.System; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _metrics = metrics; + } + + public Task EnqueueAsync( + DeadLetterEnqueueRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + + return InTenantDbContextAsync( + request.TenantId, + "writer", + async dbContext => + { + var now = _timeProvider.GetUtcNow(); + var entry = new DeadLetterRuntimeEntity + { + EntryId = $"dl-{Guid.NewGuid():N}"[..19], + TenantId = request.TenantId, + DeliveryId = request.DeliveryId, + EventId = request.EventId, + ChannelId = request.ChannelId, + ChannelType = request.ChannelType, + FailureReason = request.FailureReason, + FailureDetails = request.FailureDetails, + AttemptCount = request.AttemptCount, + CreatedAt = now, + FirstAttemptAt = request.LastAttemptAt ?? now, + LastAttemptAt = request.LastAttemptAt ?? now, + Status = DeadLetterStatus.Pending.ToString(), + RetryCount = 0, + Metadata = SerializeDictionary(request.Metadata), + OriginalPayload = request.OriginalPayload, + ExceptionType = null, + ExceptionMessage = null, + }; + + dbContext.DeadLetterEntries.Add(entry); + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _metrics?.RecordDeadLetter(request.TenantId, request.FailureReason, request.ChannelType); + _logger.LogWarning( + "Dead-lettered delivery {DeliveryId} for tenant {TenantId}: {Reason}", + request.DeliveryId, + request.TenantId, + request.FailureReason); + + return MapEntry(entry); + }, + cancellationToken); + } + + public Task GetAsync( + string tenantId, + string entryId, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + ArgumentException.ThrowIfNullOrWhiteSpace(entryId); + + return InTenantDbContextAsync( + tenantId, + "reader", + async dbContext => + { + var entity = await dbContext.DeadLetterEntries + .AsNoTracking() + .FirstOrDefaultAsync(row => row.EntryId == entryId, cancellationToken) + .ConfigureAwait(false); + return entity is null ? null : MapEntry(entity); + }, + cancellationToken); + } + + public Task> ListAsync( + string tenantId, + DeadLetterListOptions? options = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + options ??= new DeadLetterListOptions(); + + return InTenantDbContextAsync( + tenantId, + "reader", + async dbContext => + { + var query = dbContext.DeadLetterEntries + .AsNoTracking() + .Where(row => row.TenantId == tenantId); + + if (options.Status.HasValue) + { + var status = options.Status.Value.ToString(); + query = query.Where(row => row.Status == status); + } + + if (!string.IsNullOrWhiteSpace(options.ChannelId)) + { + query = query.Where(row => row.ChannelId == options.ChannelId); + } + + if (!string.IsNullOrWhiteSpace(options.ChannelType)) + { + query = query.Where(row => row.ChannelType == options.ChannelType); + } + + if (options.Since.HasValue) + { + query = query.Where(row => row.CreatedAt >= options.Since.Value); + } + + if (options.Until.HasValue) + { + query = query.Where(row => row.CreatedAt <= options.Until.Value); + } + + var rows = await query + .OrderByDescending(row => row.CreatedAt) + .Skip(options.Offset) + .Take(options.Limit) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + return (IReadOnlyList)rows.Select(MapEntry).ToList(); + }, + cancellationToken); + } + + public Task RetryAsync( + string tenantId, + string entryId, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + ArgumentException.ThrowIfNullOrWhiteSpace(entryId); + + return InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var entity = await dbContext.DeadLetterEntries + .FirstOrDefaultAsync(row => row.EntryId == entryId, cancellationToken) + .ConfigureAwait(false); + + if (entity is null) + { + return new DeadLetterRetryResult + { + EntryId = entryId, + Success = false, + Error = "Entry not found", + }; + } + + var currentStatus = ParseStatus(entity.Status); + if (currentStatus is DeadLetterStatus.Retried or DeadLetterStatus.Resolved) + { + return new DeadLetterRetryResult + { + EntryId = entryId, + Success = false, + Error = $"Entry is already {currentStatus}", + }; + } + + var now = _timeProvider.GetUtcNow(); + dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity + { + EntryId = entity.EntryId, + TenantId = entity.TenantId, + DeliveryId = entity.DeliveryId, + EventId = entity.EventId, + ChannelId = entity.ChannelId, + ChannelType = entity.ChannelType, + FailureReason = entity.FailureReason, + FailureDetails = entity.FailureDetails, + AttemptCount = entity.AttemptCount, + CreatedAt = entity.CreatedAt, + FirstAttemptAt = entity.FirstAttemptAt, + LastAttemptAt = entity.LastAttemptAt, + Status = DeadLetterStatus.Retried.ToString(), + RetryCount = entity.RetryCount + 1, + LastRetryAt = now, + Resolution = entity.Resolution, + ResolvedBy = entity.ResolvedBy, + ResolvedAt = entity.ResolvedAt, + Metadata = entity.Metadata, + OriginalPayload = entity.OriginalPayload, + ExceptionType = entity.ExceptionType, + ExceptionMessage = entity.ExceptionMessage, + }); + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _logger.LogInformation( + "Retried dead-letter entry {EntryId} for tenant {TenantId}", + entryId, + tenantId); + + return new DeadLetterRetryResult + { + EntryId = entryId, + Success = true, + RetriedAt = now, + NewDeliveryId = Guid.NewGuid().ToString("N"), + }; + }, + cancellationToken); + } + + public async Task> RetryBatchAsync( + string tenantId, + IEnumerable entryIds, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + ArgumentNullException.ThrowIfNull(entryIds); + + var results = new List(); + foreach (var entryId in entryIds) + { + results.Add(await RetryAsync(tenantId, entryId, cancellationToken).ConfigureAwait(false)); + } + + return results; + } + + public Task ResolveAsync( + string tenantId, + string entryId, + string resolution, + string? resolvedBy = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + ArgumentException.ThrowIfNullOrWhiteSpace(entryId); + ArgumentException.ThrowIfNullOrWhiteSpace(resolution); + + return InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var entity = await dbContext.DeadLetterEntries + .FirstOrDefaultAsync(row => row.EntryId == entryId, cancellationToken) + .ConfigureAwait(false); + if (entity is null) + { + return; + } + + dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity + { + EntryId = entity.EntryId, + TenantId = entity.TenantId, + DeliveryId = entity.DeliveryId, + EventId = entity.EventId, + ChannelId = entity.ChannelId, + ChannelType = entity.ChannelType, + FailureReason = entity.FailureReason, + FailureDetails = entity.FailureDetails, + AttemptCount = entity.AttemptCount, + CreatedAt = entity.CreatedAt, + FirstAttemptAt = entity.FirstAttemptAt, + LastAttemptAt = entity.LastAttemptAt, + Status = DeadLetterStatus.Resolved.ToString(), + RetryCount = entity.RetryCount, + LastRetryAt = entity.LastRetryAt, + Resolution = resolution, + ResolvedBy = resolvedBy, + ResolvedAt = _timeProvider.GetUtcNow(), + Metadata = entity.Metadata, + OriginalPayload = entity.OriginalPayload, + ExceptionType = entity.ExceptionType, + ExceptionMessage = entity.ExceptionMessage, + }); + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _logger.LogInformation( + "Resolved dead-letter entry {EntryId} for tenant {TenantId}: {Resolution}", + entryId, + tenantId, + resolution); + }, + cancellationToken); + } + + public Task PurgeExpiredAsync( + string tenantId, + TimeSpan maxAge, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + + return InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var cutoff = _timeProvider.GetUtcNow() - maxAge; + return await dbContext.DeadLetterEntries + .Where(row => row.TenantId == tenantId && row.CreatedAt < cutoff) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + }, + cancellationToken); + } + + public Task GetStatsAsync( + string tenantId, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + + return InTenantDbContextAsync( + tenantId, + "reader", + async dbContext => + { + var rows = await dbContext.DeadLetterEntries + .AsNoTracking() + .Where(row => row.TenantId == tenantId) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + + return new DeadLetterStats + { + TotalCount = rows.Count, + PendingCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Pending), + RetryingCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Retrying), + RetriedCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Retried), + ResolvedCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Resolved), + ExhaustedCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Exhausted), + ByChannel = rows.GroupBy(row => row.ChannelType).ToDictionary(group => group.Key, group => group.Count()), + ByReason = rows.GroupBy(row => row.FailureReason).ToDictionary(group => group.Key, group => group.Count()), + OldestEntryAt = rows.MinBy(row => row.CreatedAt)?.CreatedAt, + NewestEntryAt = rows.MaxBy(row => row.CreatedAt)?.CreatedAt, + }; + }, + cancellationToken); + } + + private static DeadLetterEntry MapEntry(DeadLetterRuntimeEntity entity) + => new() + { + EntryId = entity.EntryId, + TenantId = entity.TenantId, + DeliveryId = entity.DeliveryId, + EventId = entity.EventId ?? string.Empty, + ChannelId = entity.ChannelId ?? string.Empty, + ChannelType = entity.ChannelType, + FailureReason = entity.FailureReason, + FailureDetails = entity.FailureDetails, + AttemptCount = entity.AttemptCount, + CreatedAt = entity.CreatedAt, + LastAttemptAt = entity.LastAttemptAt, + Status = ParseStatus(entity.Status), + RetryCount = entity.RetryCount, + LastRetryAt = entity.LastRetryAt, + Resolution = entity.Resolution, + ResolvedBy = entity.ResolvedBy, + ResolvedAt = entity.ResolvedAt, + Metadata = DeserializeDictionary(entity.Metadata).ToImmutableDictionary(), + OriginalPayload = entity.OriginalPayload, + }; + + private static DeadLetterStatus ParseStatus(string? status) + => Enum.TryParse(status, true, out var parsed) + ? parsed + : DeadLetterStatus.Pending; + + private static string SerializeDictionary(IReadOnlyDictionary? metadata) + => JsonSerializer.Serialize(metadata ?? ImmutableDictionary.Empty, JsonOptions); + + private static Dictionary DeserializeDictionary(string? json) + => string.IsNullOrWhiteSpace(json) + ? new Dictionary() + : (JsonSerializer.Deserialize>(json, JsonOptions) ?? new Dictionary()); +} diff --git a/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Observability/PostgresDeadLetterHandler.cs b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Observability/PostgresDeadLetterHandler.cs new file mode 100644 index 000000000..f71546a32 --- /dev/null +++ b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Observability/PostgresDeadLetterHandler.cs @@ -0,0 +1,454 @@ +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using StellaOps.Notifier.Worker.Storage; +using StellaOps.Notify.Persistence.EfCore.Context; +using StellaOps.Notify.Persistence.Postgres.Models; +using ServiceDeadLetterStatus = StellaOps.Notifier.Worker.DeadLetter.DeadLetterStatus; + +namespace StellaOps.Notifier.Worker.Observability; + +public sealed class PostgresDeadLetterHandler : PostgresNotifyRuntimeServiceBase, IDeadLetterHandler +{ + private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); + + private readonly TimeProvider _timeProvider; + private readonly ILogger _logger; + + public PostgresDeadLetterHandler( + NotifyDurableStorageSupport support, + TimeProvider timeProvider, + ILogger logger) + : base(support) + { + _timeProvider = timeProvider ?? TimeProvider.System; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task DeadLetterAsync( + string tenantId, + string deliveryId, + DeadLetterReason reason, + string channelType, + object? payload = null, + Exception? exception = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + ArgumentException.ThrowIfNullOrWhiteSpace(deliveryId); + ArgumentException.ThrowIfNullOrWhiteSpace(channelType); + + return InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var now = _timeProvider.GetUtcNow(); + var entity = new DeadLetterRuntimeEntity + { + EntryId = $"dl-{Guid.NewGuid():N}"[..19], + TenantId = tenantId, + DeliveryId = deliveryId, + EventId = deliveryId, + ChannelId = channelType, + ChannelType = channelType, + FailureReason = reason.ToString(), + FailureDetails = exception?.Message, + AttemptCount = 0, + CreatedAt = now, + FirstAttemptAt = now, + LastAttemptAt = now, + Status = ServiceDeadLetterStatus.Pending.ToString(), + RetryCount = 0, + Metadata = "{}", + OriginalPayload = SerializePayload(payload), + ExceptionType = exception?.GetType().FullName, + ExceptionMessage = exception?.Message, + }; + + dbContext.DeadLetterEntries.Add(entity); + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _logger.LogWarning( + "Dead-lettered delivery {DeliveryId} for tenant {TenantId}: {Reason}", + deliveryId, + tenantId, + reason); + + return MapDeadLetteredDelivery(entity); + }, + cancellationToken); + } + + public Task> GetAsync( + string tenantId, + DeadLetterQuery? query = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + query ??= new DeadLetterQuery(); + + return InTenantDbContextAsync( + tenantId, + "reader", + async dbContext => + { + var rows = dbContext.DeadLetterEntries + .AsNoTracking() + .Where(row => row.TenantId == tenantId); + + if (!string.IsNullOrWhiteSpace(query.Id)) + { + rows = rows.Where(row => row.EntryId == query.Id); + } + + if (query.Reason.HasValue) + { + var reason = query.Reason.Value.ToString(); + rows = rows.Where(row => row.FailureReason == reason); + } + + if (!string.IsNullOrWhiteSpace(query.ChannelType)) + { + rows = rows.Where(row => row.ChannelType == query.ChannelType); + } + + if (query.Status.HasValue) + { + var statuses = MapStatuses(query.Status.Value); + rows = rows.Where(row => statuses.Contains(row.Status)); + } + + if (query.After.HasValue) + { + rows = rows.Where(row => row.CreatedAt > query.After.Value); + } + + if (query.Before.HasValue) + { + rows = rows.Where(row => row.CreatedAt < query.Before.Value); + } + + var entities = await rows + .OrderByDescending(row => row.CreatedAt) + .Skip(query.Offset) + .Take(query.Limit) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + return (IReadOnlyList)entities.Select(MapDeadLetteredDelivery).ToList(); + }, + cancellationToken); + } + + public Task RetryAsync( + string tenantId, + string deadLetterId, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + ArgumentException.ThrowIfNullOrWhiteSpace(deadLetterId); + + return InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var entity = await dbContext.DeadLetterEntries + .FirstOrDefaultAsync(row => row.EntryId == deadLetterId, cancellationToken) + .ConfigureAwait(false); + if (entity is null) + { + return new DeadLetterRetryResult + { + DeadLetterId = deadLetterId, + Success = false, + Error = "Not found", + NewStatus = DeadLetterStatus.Pending, + }; + } + + var currentStatus = MapStatus(entity.Status); + if (currentStatus is DeadLetterStatus.Retried or DeadLetterStatus.Discarded) + { + return new DeadLetterRetryResult + { + DeadLetterId = deadLetterId, + Success = false, + Error = "Entry is already terminal", + NewStatus = currentStatus, + }; + } + + dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity + { + EntryId = entity.EntryId, + TenantId = entity.TenantId, + DeliveryId = entity.DeliveryId, + EventId = entity.EventId, + ChannelId = entity.ChannelId, + ChannelType = entity.ChannelType, + FailureReason = entity.FailureReason, + FailureDetails = entity.FailureDetails, + AttemptCount = entity.AttemptCount, + CreatedAt = entity.CreatedAt, + FirstAttemptAt = entity.FirstAttemptAt, + LastAttemptAt = entity.LastAttemptAt, + Status = ServiceDeadLetterStatus.Retried.ToString(), + RetryCount = entity.RetryCount + 1, + LastRetryAt = _timeProvider.GetUtcNow(), + Resolution = entity.Resolution, + ResolvedBy = entity.ResolvedBy, + ResolvedAt = entity.ResolvedAt, + Metadata = entity.Metadata, + OriginalPayload = entity.OriginalPayload, + ExceptionType = entity.ExceptionType, + ExceptionMessage = entity.ExceptionMessage, + }); + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + return new DeadLetterRetryResult + { + DeadLetterId = deadLetterId, + Success = true, + NewStatus = DeadLetterStatus.Retried, + }; + }, + cancellationToken); + } + + public async Task RetryBulkAsync( + string tenantId, + DeadLetterQuery? query = null, + CancellationToken cancellationToken = default) + { + var deadLetters = await GetAsync(tenantId, query, cancellationToken).ConfigureAwait(false); + var results = new List(); + foreach (var deadLetter in deadLetters.Where(candidate => candidate.Status == DeadLetterStatus.Pending)) + { + results.Add(await RetryAsync(tenantId, deadLetter.DeadLetterId, cancellationToken).ConfigureAwait(false)); + } + + return new DeadLetterBulkRetryResult + { + Total = results.Count, + Succeeded = results.Count(result => result.Success), + Failed = results.Count(result => !result.Success), + Results = results, + }; + } + + public Task DiscardAsync( + string tenantId, + string deadLetterId, + string? reason = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tenantId); + ArgumentException.ThrowIfNullOrWhiteSpace(deadLetterId); + + return InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var entity = await dbContext.DeadLetterEntries + .FirstOrDefaultAsync(row => row.EntryId == deadLetterId, cancellationToken) + .ConfigureAwait(false); + if (entity is null) + { + return false; + } + + dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity + { + EntryId = entity.EntryId, + TenantId = entity.TenantId, + DeliveryId = entity.DeliveryId, + EventId = entity.EventId, + ChannelId = entity.ChannelId, + ChannelType = entity.ChannelType, + FailureReason = entity.FailureReason, + FailureDetails = entity.FailureDetails, + AttemptCount = entity.AttemptCount, + CreatedAt = entity.CreatedAt, + FirstAttemptAt = entity.FirstAttemptAt, + LastAttemptAt = entity.LastAttemptAt, + Status = ServiceDeadLetterStatus.Resolved.ToString(), + RetryCount = entity.RetryCount, + LastRetryAt = entity.LastRetryAt, + Resolution = reason, + ResolvedBy = null, + ResolvedAt = _timeProvider.GetUtcNow(), + Metadata = entity.Metadata, + OriginalPayload = entity.OriginalPayload, + ExceptionType = entity.ExceptionType, + ExceptionMessage = entity.ExceptionMessage, + }); + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + return true; + }, + cancellationToken); + } + + public Task GetStatsAsync( + string? tenantId = null, + CancellationToken cancellationToken = default) + { + if (!string.IsNullOrWhiteSpace(tenantId)) + { + return InTenantDbContextAsync( + tenantId, + "reader", + dbContext => BuildStatsAsync(dbContext, tenantId, cancellationToken), + cancellationToken); + } + + return InSystemDbContextAsync( + dbContext => BuildStatsAsync(dbContext, null, cancellationToken), + cancellationToken); + } + + public Task PurgeAsync( + string? tenantId, + TimeSpan olderThan, + CancellationToken cancellationToken = default) + { + if (!string.IsNullOrWhiteSpace(tenantId)) + { + return InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var cutoff = _timeProvider.GetUtcNow() - olderThan; + return await dbContext.DeadLetterEntries + .Where(row => row.TenantId == tenantId && row.CreatedAt < cutoff) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + }, + cancellationToken); + } + + return InSystemDbContextAsync( + async dbContext => + { + var cutoff = _timeProvider.GetUtcNow() - olderThan; + return await dbContext.DeadLetterEntries + .Where(row => row.CreatedAt < cutoff) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + }, + cancellationToken); + } + + private Task BuildStatsAsync( + NotifyDbContext dbContext, + string? tenantId, + CancellationToken cancellationToken) + { + var query = dbContext.DeadLetterEntries.AsNoTracking().AsQueryable(); + if (!string.IsNullOrWhiteSpace(tenantId)) + { + query = query.Where(row => row.TenantId == tenantId); + } + + return BuildStatsCoreAsync(query, tenantId, cancellationToken); + } + + private async Task BuildStatsCoreAsync( + IQueryable query, + string? tenantId, + CancellationToken cancellationToken) + { + var rows = await query.ToListAsync(cancellationToken).ConfigureAwait(false); + + return new DeadLetterStats + { + Timestamp = _timeProvider.GetUtcNow(), + TenantId = tenantId, + TotalCount = rows.Count, + PendingCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Pending), + RetryingCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Retrying), + RetriedCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Retried), + DiscardedCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Discarded), + ByReason = rows.GroupBy(row => MapReason(row.FailureReason)).ToDictionary(group => group.Key, group => group.Count()), + ByChannel = rows.GroupBy(row => row.ChannelType).ToDictionary(group => group.Key, group => group.Count()), + OldestDeadLetterAt = rows.MinBy(row => row.CreatedAt)?.CreatedAt, + NewestDeadLetterAt = rows.MaxBy(row => row.CreatedAt)?.CreatedAt, + }; + } + + private static IReadOnlyCollection MapStatuses(DeadLetterStatus status) + => status switch + { + DeadLetterStatus.Discarded => new[] { ServiceDeadLetterStatus.Resolved.ToString(), ServiceDeadLetterStatus.Exhausted.ToString() }, + DeadLetterStatus.Retrying => new[] { ServiceDeadLetterStatus.Retrying.ToString() }, + DeadLetterStatus.Retried => new[] { ServiceDeadLetterStatus.Retried.ToString() }, + _ => new[] { ServiceDeadLetterStatus.Pending.ToString() }, + }; + + private static DeadLetteredDelivery MapDeadLetteredDelivery(DeadLetterRuntimeEntity entity) + => new() + { + DeadLetterId = entity.EntryId, + TenantId = entity.TenantId, + DeliveryId = entity.DeliveryId, + ChannelType = entity.ChannelType, + Reason = MapReason(entity.FailureReason), + ReasonDetails = entity.FailureDetails, + OriginalPayload = DeserializePayload(entity.OriginalPayload), + ExceptionType = entity.ExceptionType, + ExceptionMessage = entity.ExceptionMessage, + AttemptCount = entity.AttemptCount, + FirstAttemptAt = entity.FirstAttemptAt, + DeadLetteredAt = entity.CreatedAt, + LastRetryAt = entity.LastRetryAt, + RetryCount = entity.RetryCount, + Status = MapStatus(entity.Status), + DiscardReason = entity.Resolution, + }; + + private static DeadLetterReason MapReason(string? failureReason) + => Enum.TryParse(failureReason, true, out var parsed) + ? parsed + : DeadLetterReason.UnknownError; + + private static DeadLetterStatus MapStatus(string? status) + => status switch + { + nameof(ServiceDeadLetterStatus.Retrying) => DeadLetterStatus.Retrying, + nameof(ServiceDeadLetterStatus.Retried) => DeadLetterStatus.Retried, + nameof(ServiceDeadLetterStatus.Resolved) => DeadLetterStatus.Discarded, + nameof(ServiceDeadLetterStatus.Exhausted) => DeadLetterStatus.Discarded, + _ => DeadLetterStatus.Pending, + }; + + private static string? SerializePayload(object? payload) + { + if (payload is null) + { + return null; + } + + return payload is string text ? text : JsonSerializer.Serialize(payload, JsonOptions); + } + + private static object? DeserializePayload(string? payload) + { + if (string.IsNullOrWhiteSpace(payload)) + { + return null; + } + + try + { + return JsonSerializer.Deserialize(payload, JsonOptions); + } + catch + { + return payload; + } + } +} diff --git a/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Security/PostgresTenantIsolationValidator.cs b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Security/PostgresTenantIsolationValidator.cs new file mode 100644 index 000000000..9f1c5dc9a --- /dev/null +++ b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Security/PostgresTenantIsolationValidator.cs @@ -0,0 +1,739 @@ +using System.Text.Json; +using System.Text.RegularExpressions; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StellaOps.Notifier.Worker.Storage; +using StellaOps.Notify.Persistence.Postgres.Models; + +namespace StellaOps.Notifier.Worker.Security; + +public sealed class PostgresTenantIsolationValidator : PostgresNotifyRuntimeServiceBase, ITenantIsolationValidator +{ + private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); + + private readonly TenantIsolationOptions _options; + private readonly TimeProvider _timeProvider; + private readonly ILogger _logger; + private readonly IReadOnlyList _adminPatterns; + + public PostgresTenantIsolationValidator( + NotifyDurableStorageSupport support, + IOptions options, + TimeProvider timeProvider, + ILogger logger) + : base(support) + { + _options = options?.Value ?? new TenantIsolationOptions(); + _timeProvider = timeProvider ?? TimeProvider.System; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _adminPatterns = _options.AdminTenantPatterns + .Select(pattern => new Regex(pattern, RegexOptions.Compiled | RegexOptions.IgnoreCase)) + .ToList(); + } + + public async Task ValidateResourceAccessAsync( + string tenantId, + string resourceType, + string resourceId, + TenantAccessOperation operation, + CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(tenantId)) + { + return TenantValidationResult.Denied("Tenant ID is required for validation."); + } + + if (IsAdminTenant(tenantId)) + { + return TenantValidationResult.Allowed(TenantValidationType.SystemResource); + } + + if (_options.SystemResourceTypes.Contains(resourceType)) + { + return TenantValidationResult.Allowed(TenantValidationType.SystemResource); + } + + var ownership = await InSystemDbContextAsync( + dbContext => dbContext.TenantResourceOwnership + .AsNoTracking() + .FirstOrDefaultAsync( + row => row.ResourceType == resourceType && row.ResourceId == resourceId, + cancellationToken), + cancellationToken) + .ConfigureAwait(false); + + if (ownership is null) + { + return TenantValidationResult.Allowed(TenantValidationType.ResourceNotFound); + } + + if (string.Equals(ownership.TenantId, tenantId, StringComparison.Ordinal)) + { + return TenantValidationResult.Allowed(TenantValidationType.SameTenant); + } + + if (_options.AllowCrossTenantGrants) + { + var now = _timeProvider.GetUtcNow(); + var grant = await InSystemDbContextAsync( + dbContext => dbContext.CrossTenantGrants + .AsNoTracking() + .FirstOrDefaultAsync( + row => row.OwnerTenantId == ownership.TenantId && + row.TargetTenantId == tenantId && + row.ResourceType == resourceType && + row.ResourceId == resourceId && + row.IsActive && + (row.ExpiresAt == null || row.ExpiresAt > now), + cancellationToken), + cancellationToken) + .ConfigureAwait(false); + + if (grant is not null && HasOperationFlag(grant.AllowedOperations, operation)) + { + return TenantValidationResult.CrossTenantAllowed(grant.GrantId); + } + } + + await RecordViolationAsync( + tenantId, + ownership.TenantId, + resourceType, + resourceId, + operation, + cancellationToken) + .ConfigureAwait(false); + + return TenantValidationResult.Denied( + $"Tenant {tenantId} does not have access to {resourceType}/{resourceId} owned by tenant {ownership.TenantId}"); + } + + public Task ValidateDeliveryAsync( + string tenantId, + string deliveryId, + CancellationToken cancellationToken = default) + => ValidateResourceAccessAsync(tenantId, "delivery", deliveryId, TenantAccessOperation.Read, cancellationToken); + + public Task ValidateChannelAsync( + string tenantId, + string channelId, + CancellationToken cancellationToken = default) + => ValidateResourceAccessAsync(tenantId, "channel", channelId, TenantAccessOperation.Read, cancellationToken); + + public Task ValidateTemplateAsync( + string tenantId, + string templateId, + CancellationToken cancellationToken = default) + => ValidateResourceAccessAsync(tenantId, "template", templateId, TenantAccessOperation.Read, cancellationToken); + + public Task ValidateSubscriptionAsync( + string tenantId, + string subscriptionId, + CancellationToken cancellationToken = default) + => ValidateResourceAccessAsync(tenantId, "subscription", subscriptionId, TenantAccessOperation.Read, cancellationToken); + + public Task ValidateCrossTenantAccessAsync( + string sourceTenantId, + string targetTenantId, + string resourceType, + string resourceId, + CancellationToken cancellationToken = default) + { + if (string.Equals(sourceTenantId, targetTenantId, StringComparison.Ordinal)) + { + return Task.FromResult(TenantValidationResult.Allowed(TenantValidationType.SameTenant)); + } + + return ValidateResourceAccessAsync(sourceTenantId, resourceType, resourceId, TenantAccessOperation.Read, cancellationToken); + } + + public Task RegisterResourceAsync( + string tenantId, + string resourceType, + string resourceId, + CancellationToken cancellationToken = default) + => InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var existing = await dbContext.TenantResourceOwnership + .FirstOrDefaultAsync( + row => row.ResourceType == resourceType && row.ResourceId == resourceId, + cancellationToken) + .ConfigureAwait(false); + + var entity = new TenantResourceOwnershipEntity + { + Id = existing?.Id ?? Guid.NewGuid(), + TenantId = tenantId, + ResourceType = resourceType, + ResourceId = resourceId, + Metadata = existing?.Metadata ?? "{}", + RegisteredAt = existing?.RegisteredAt ?? _timeProvider.GetUtcNow(), + }; + + if (existing is null) + { + dbContext.TenantResourceOwnership.Add(entity); + } + else + { + dbContext.Entry(existing).CurrentValues.SetValues(entity); + } + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _logger.LogDebug( + "Registered resource {ResourceType}/{ResourceId} for tenant {TenantId}.", + resourceType, + resourceId, + tenantId); + }, + cancellationToken); + + public Task UnregisterResourceAsync( + string resourceType, + string resourceId, + CancellationToken cancellationToken = default) + => InSystemDbContextAsync( + async dbContext => + { + await dbContext.TenantResourceOwnership + .Where(row => row.ResourceType == resourceType && row.ResourceId == resourceId) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + + await dbContext.CrossTenantGrants + .Where(row => row.ResourceType == resourceType && row.ResourceId == resourceId) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + }, + cancellationToken); + + public Task> GetTenantResourcesAsync( + string tenantId, + string? resourceType = null, + CancellationToken cancellationToken = default) + => InTenantDbContextAsync( + tenantId, + "reader", + async dbContext => + { + var query = dbContext.TenantResourceOwnership + .AsNoTracking() + .Where(row => row.TenantId == tenantId); + + if (!string.IsNullOrWhiteSpace(resourceType)) + { + query = query.Where(row => row.ResourceType == resourceType); + } + + return (IReadOnlyList)await query + .OrderBy(row => row.ResourceType) + .ThenBy(row => row.ResourceId) + .Select(row => new TenantResource + { + TenantId = row.TenantId, + ResourceType = row.ResourceType, + ResourceId = row.ResourceId, + RegisteredAt = row.RegisteredAt, + Metadata = DeserializeDictionary(row.Metadata), + }) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + }, + cancellationToken); + + public Task GrantCrossTenantAccessAsync( + string ownerTenantId, + string targetTenantId, + string resourceType, + string resourceId, + TenantAccessOperation allowedOperations, + DateTimeOffset? expiresAt, + string grantedBy, + CancellationToken cancellationToken = default) + { + if (!_options.AllowCrossTenantGrants) + { + throw new InvalidOperationException("Cross-tenant grants are disabled."); + } + + if (expiresAt.HasValue) + { + var maxExpiry = _timeProvider.GetUtcNow() + _options.MaxGrantDuration; + if (expiresAt > maxExpiry) + { + expiresAt = maxExpiry; + } + } + + return InSystemDbContextAsync( + async dbContext => + { + var existing = await dbContext.CrossTenantGrants + .FirstOrDefaultAsync( + row => row.OwnerTenantId == ownerTenantId && + row.TargetTenantId == targetTenantId && + row.ResourceType == resourceType && + row.ResourceId == resourceId, + cancellationToken) + .ConfigureAwait(false); + + var entity = new CrossTenantGrantEntity + { + GrantId = existing?.GrantId ?? $"grant-{Guid.NewGuid():N}"[..20], + OwnerTenantId = ownerTenantId, + TargetTenantId = targetTenantId, + ResourceType = resourceType, + ResourceId = resourceId, + AllowedOperations = (int)allowedOperations, + GrantedAt = existing?.GrantedAt ?? _timeProvider.GetUtcNow(), + GrantedBy = existing?.GrantedBy ?? grantedBy, + ExpiresAt = expiresAt, + IsActive = true, + RevokedAt = null, + RevokedBy = null, + }; + + if (existing is null) + { + dbContext.CrossTenantGrants.Add(entity); + } + else + { + dbContext.Entry(existing).CurrentValues.SetValues(entity); + } + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _logger.LogInformation( + "Granted cross-tenant access for {ResourceType}/{ResourceId} from {OwnerTenant} to {TargetTenant} by {GrantedBy}.", + resourceType, + resourceId, + ownerTenantId, + targetTenantId, + grantedBy); + }, + cancellationToken); + } + + public Task RevokeCrossTenantAccessAsync( + string ownerTenantId, + string targetTenantId, + string resourceType, + string resourceId, + string revokedBy, + CancellationToken cancellationToken = default) + => InSystemDbContextAsync( + async dbContext => + { + var existing = await dbContext.CrossTenantGrants + .FirstOrDefaultAsync( + row => row.OwnerTenantId == ownerTenantId && + row.TargetTenantId == targetTenantId && + row.ResourceType == resourceType && + row.ResourceId == resourceId, + cancellationToken) + .ConfigureAwait(false); + + if (existing is null) + { + return; + } + + dbContext.Entry(existing).CurrentValues.SetValues(new CrossTenantGrantEntity + { + GrantId = existing.GrantId, + OwnerTenantId = existing.OwnerTenantId, + TargetTenantId = existing.TargetTenantId, + ResourceType = existing.ResourceType, + ResourceId = existing.ResourceId, + AllowedOperations = existing.AllowedOperations, + GrantedAt = existing.GrantedAt, + GrantedBy = existing.GrantedBy, + ExpiresAt = existing.ExpiresAt, + IsActive = false, + RevokedAt = _timeProvider.GetUtcNow(), + RevokedBy = revokedBy, + }); + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _logger.LogInformation( + "Revoked cross-tenant access {GrantId} for {ResourceType}/{ResourceId} by {RevokedBy}.", + existing.GrantId, + resourceType, + resourceId, + revokedBy); + }, + cancellationToken); + + public Task> GetViolationsAsync( + string? tenantId = null, + DateTimeOffset? since = null, + CancellationToken cancellationToken = default) + => InSystemDbContextAsync( + async dbContext => + { + var query = dbContext.TenantIsolationViolations.AsNoTracking().AsQueryable(); + if (!string.IsNullOrWhiteSpace(tenantId)) + { + query = query.Where(row => row.RequestingTenantId == tenantId); + } + + if (since.HasValue) + { + query = query.Where(row => row.OccurredAt >= since.Value); + } + + return (IReadOnlyList)await query + .OrderByDescending(row => row.OccurredAt) + .Select(row => new TenantViolation + { + ViolationId = row.ViolationId, + RequestingTenantId = row.RequestingTenantId, + ResourceOwnerTenantId = row.ResourceOwnerTenantId, + ResourceType = row.ResourceType, + ResourceId = row.ResourceId, + Operation = (TenantAccessOperation)row.Operation, + OccurredAt = row.OccurredAt, + Context = row.Context, + Severity = (ViolationSeverity)row.Severity, + }) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + }, + cancellationToken); + + public async Task RunFuzzTestAsync( + TenantFuzzTestConfig config, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(config); + + var startTime = _timeProvider.GetUtcNow(); + var random = config.Seed.HasValue ? new Random(config.Seed.Value) : new Random(); + var failures = new List(); + var totalTests = 0; + var passedTests = 0; + var testResources = new List<(string TenantId, string ResourceType, string ResourceId)>(); + + foreach (var tenantId in config.TenantIds) + { + foreach (var resourceType in config.ResourceTypes) + { + for (var index = 0; index < 3; index++) + { + var resourceId = $"test-{resourceType}-{tenantId}-{index}"; + await RegisterResourceAsync(tenantId, resourceType, resourceId, cancellationToken).ConfigureAwait(false); + testResources.Add((tenantId, resourceType, resourceId)); + } + } + } + + try + { + for (var index = 0; index < config.Iterations; index++) + { + var resource = testResources[random.Next(testResources.Count)]; + totalTests++; + + var result = await ValidateResourceAccessAsync( + resource.TenantId, + resource.ResourceType, + resource.ResourceId, + TenantAccessOperation.Read, + cancellationToken) + .ConfigureAwait(false); + + if (result.IsAllowed) + { + passedTests++; + } + else + { + failures.Add(new FuzzTestFailure + { + TestCase = "Same tenant access", + Expected = "Allowed", + Actual = $"Denied: {result.DenialReason}", + Input = new Dictionary + { + ["tenantId"] = resource.TenantId, + ["resourceType"] = resource.ResourceType, + ["resourceId"] = resource.ResourceId, + }, + }); + } + } + + for (var index = 0; index < config.Iterations; index++) + { + var resource = testResources[random.Next(testResources.Count)]; + var differentTenant = config.TenantIds + .Where(candidate => !string.Equals(candidate, resource.TenantId, StringComparison.Ordinal)) + .OrderBy(_ => random.Next()) + .FirstOrDefault(); + + if (differentTenant is null) + { + continue; + } + + totalTests++; + var result = await ValidateResourceAccessAsync( + differentTenant, + resource.ResourceType, + resource.ResourceId, + TenantAccessOperation.Read, + cancellationToken) + .ConfigureAwait(false); + + if (!result.IsAllowed) + { + passedTests++; + } + else + { + failures.Add(new FuzzTestFailure + { + TestCase = "Cross-tenant access without grant", + Expected = "Denied", + Actual = "Allowed", + Input = new Dictionary + { + ["requestingTenantId"] = differentTenant, + ["ownerTenantId"] = resource.TenantId, + ["resourceType"] = resource.ResourceType, + ["resourceId"] = resource.ResourceId, + }, + }); + } + } + + if (config.TestCrossTenantGrants && _options.AllowCrossTenantGrants) + { + for (var index = 0; index < config.Iterations / 2; index++) + { + var resource = testResources[random.Next(testResources.Count)]; + var differentTenant = config.TenantIds + .Where(candidate => !string.Equals(candidate, resource.TenantId, StringComparison.Ordinal)) + .OrderBy(_ => random.Next()) + .FirstOrDefault(); + + if (differentTenant is null) + { + continue; + } + + await GrantCrossTenantAccessAsync( + resource.TenantId, + differentTenant, + resource.ResourceType, + resource.ResourceId, + TenantAccessOperation.Read, + null, + "fuzz-test", + cancellationToken) + .ConfigureAwait(false); + + totalTests++; + var result = await ValidateResourceAccessAsync( + differentTenant, + resource.ResourceType, + resource.ResourceId, + TenantAccessOperation.Read, + cancellationToken) + .ConfigureAwait(false); + + if (result.IsAllowed && result.IsCrossTenant) + { + passedTests++; + } + else + { + failures.Add(new FuzzTestFailure + { + TestCase = "Cross-tenant access with grant", + Expected = "Allowed (cross-tenant)", + Actual = result.IsAllowed + ? "Allowed (not marked cross-tenant)" + : $"Denied: {result.DenialReason}", + Input = new Dictionary + { + ["requestingTenantId"] = differentTenant, + ["ownerTenantId"] = resource.TenantId, + ["resourceType"] = resource.ResourceType, + ["resourceId"] = resource.ResourceId, + }, + }); + } + + await RevokeCrossTenantAccessAsync( + resource.TenantId, + differentTenant, + resource.ResourceType, + resource.ResourceId, + "fuzz-test", + cancellationToken) + .ConfigureAwait(false); + } + } + + if (config.TestEdgeCases) + { + totalTests++; + var emptyResult = await ValidateResourceAccessAsync( + string.Empty, + "delivery", + "test-resource", + TenantAccessOperation.Read, + cancellationToken) + .ConfigureAwait(false); + if (!emptyResult.IsAllowed || emptyResult.ValidationType == TenantValidationType.Denied) + { + passedTests++; + } + else + { + failures.Add(new FuzzTestFailure + { + TestCase = "Empty tenant ID", + Expected = "Denied or handled gracefully", + Actual = "Allowed", + }); + } + + totalTests++; + _ = await ValidateResourceAccessAsync( + config.TenantIds[0], + "delivery", + "non-existent-resource", + TenantAccessOperation.Read, + cancellationToken) + .ConfigureAwait(false); + passedTests++; + } + } + finally + { + foreach (var resource in testResources) + { + await UnregisterResourceAsync(resource.ResourceType, resource.ResourceId, cancellationToken).ConfigureAwait(false); + } + } + + var executionTime = _timeProvider.GetUtcNow() - startTime; + return new TenantFuzzTestResult + { + AllPassed = failures.Count == 0, + TotalTests = totalTests, + PassedTests = passedTests, + FailedTests = failures.Count, + Failures = failures, + ExecutionTime = executionTime, + }; + } + + private bool IsAdminTenant(string tenantId) + => _adminPatterns.Any(pattern => pattern.IsMatch(tenantId)); + + private Task RecordViolationAsync( + string requestingTenantId, + string ownerTenantId, + string resourceType, + string resourceId, + TenantAccessOperation operation, + CancellationToken cancellationToken) + { + if (!_options.RecordViolations) + { + return Task.CompletedTask; + } + + return InSystemDbContextAsync( + async dbContext => + { + var violation = new TenantIsolationViolationEntity + { + ViolationId = $"vio-{Guid.NewGuid():N}"[..16], + RequestingTenantId = requestingTenantId, + ResourceOwnerTenantId = ownerTenantId, + ResourceType = resourceType, + ResourceId = resourceId, + Operation = (int)operation, + OccurredAt = _timeProvider.GetUtcNow(), + Context = null, + Severity = (int)DetermineSeverity(operation), + }; + + dbContext.TenantIsolationViolations.Add(violation); + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + if (_options.LogViolations) + { + _logger.LogWarning( + "Tenant isolation violation: Tenant {RequestingTenant} attempted {Operation} on {ResourceType}/{ResourceId} owned by tenant {OwnerTenant}.", + requestingTenantId, + operation, + resourceType, + resourceId, + ownerTenantId); + } + + var cutoff = _timeProvider.GetUtcNow() - _options.ViolationRetentionPeriod; + await dbContext.TenantIsolationViolations + .Where(row => row.OccurredAt < cutoff) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + + var totalViolations = await dbContext.TenantIsolationViolations.CountAsync(cancellationToken).ConfigureAwait(false); + if (totalViolations > _options.MaxViolationsRetained) + { + var toRemove = totalViolations - _options.MaxViolationsRetained; + var oldestIds = await dbContext.TenantIsolationViolations + .OrderBy(row => row.OccurredAt) + .ThenBy(row => row.ViolationId) + .Take(toRemove) + .Select(row => row.ViolationId) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + + if (oldestIds.Count > 0) + { + await dbContext.TenantIsolationViolations + .Where(row => oldestIds.Contains(row.ViolationId)) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + } + } + }, + cancellationToken); + } + + private static ViolationSeverity DetermineSeverity(TenantAccessOperation operation) + => operation switch + { + TenantAccessOperation.Delete => ViolationSeverity.Critical, + TenantAccessOperation.Write => ViolationSeverity.High, + TenantAccessOperation.Execute => ViolationSeverity.High, + TenantAccessOperation.Share => ViolationSeverity.Medium, + _ => ViolationSeverity.Low, + }; + + private static bool HasOperationFlag(int grantedOperations, TenantAccessOperation requestedOperation) + => ((TenantAccessOperation)grantedOperations).HasFlag(requestedOperation); + + private static IReadOnlyDictionary DeserializeDictionary(string? json) + { + if (string.IsNullOrWhiteSpace(json)) + { + return new Dictionary(); + } + + return JsonSerializer.Deserialize>(json, JsonOptions) ?? new Dictionary(); + } +} diff --git a/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Security/PostgresWebhookSecurityService.cs b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Security/PostgresWebhookSecurityService.cs new file mode 100644 index 000000000..902208283 --- /dev/null +++ b/src/Notifier/StellaOps.Notifier/StellaOps.Notifier.Worker/Security/PostgresWebhookSecurityService.cs @@ -0,0 +1,541 @@ +using System.Net; +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Npgsql; +using StellaOps.Notifier.Worker.Storage; +using StellaOps.Notify.Persistence.Postgres.Models; + +namespace StellaOps.Notifier.Worker.Security; + +public sealed class PostgresWebhookSecurityService : PostgresNotifyRuntimeServiceBase, IWebhookSecurityService +{ + private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); + + private readonly WebhookSecurityOptions _options; + private readonly TimeProvider _timeProvider; + private readonly ILogger _logger; + + public PostgresWebhookSecurityService( + NotifyDurableStorageSupport support, + IOptions options, + TimeProvider timeProvider, + ILogger logger) + : base(support) + { + _options = options?.Value ?? new WebhookSecurityOptions(); + _timeProvider = timeProvider ?? TimeProvider.System; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task ValidateAsync( + WebhookValidationRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + + var errors = new List(); + var warnings = new List(); + var passed = WebhookValidationChecks.None; + var failed = WebhookValidationChecks.None; + + var config = await GetConfigAsync(request.TenantId, request.ChannelId, cancellationToken).ConfigureAwait(false); + if (config is null) + { + warnings.Add("No webhook security configuration found; skipping validation."); + return WebhookValidationResult.Valid(WebhookValidationChecks.All, warnings); + } + + if (!config.Enabled) + { + warnings.Add("Webhook security configuration is disabled."); + return WebhookValidationResult.Valid(WebhookValidationChecks.All, warnings); + } + + if (config.RequireSignature) + { + if (string.IsNullOrEmpty(request.Signature)) + { + errors.Add("Missing signature header."); + failed |= WebhookValidationChecks.SignatureValid; + } + else + { + var expectedSignature = GenerateSignature( + request.Body, + config.SecretKey, + config.Algorithm, + config.SignatureFormat, + config.SignaturePrefix); + if (!CompareSignatures(request.Signature, expectedSignature)) + { + errors.Add("Invalid signature."); + failed |= WebhookValidationChecks.SignatureValid; + } + else + { + passed |= WebhookValidationChecks.SignatureValid; + } + } + } + else + { + passed |= WebhookValidationChecks.SignatureValid; + } + + if (config.EnforceIpAllowlist && !string.IsNullOrWhiteSpace(request.SourceIp)) + { + if (!IsIpAllowedInternal(request.SourceIp, config.AllowedIps)) + { + errors.Add($"Source IP {request.SourceIp} not in allowlist."); + failed |= WebhookValidationChecks.IpAllowed; + } + else + { + passed |= WebhookValidationChecks.IpAllowed; + } + } + else + { + passed |= WebhookValidationChecks.IpAllowed; + } + + if (_options.EnableReplayProtection && request.Timestamp.HasValue) + { + var now = _timeProvider.GetUtcNow(); + var age = now - request.Timestamp.Value; + + if (age > config.MaxRequestAge) + { + errors.Add($"Request too old ({age.TotalSeconds:F0}s > {config.MaxRequestAge.TotalSeconds:F0}s)."); + failed |= WebhookValidationChecks.NotExpired; + } + else if (age < TimeSpan.FromSeconds(-30)) + { + errors.Add("Request timestamp is in the future."); + failed |= WebhookValidationChecks.NotExpired; + } + else + { + passed |= WebhookValidationChecks.NotExpired; + } + + if (!string.IsNullOrWhiteSpace(request.Signature)) + { + var nonceStored = await TryStoreNonceAsync( + request.TenantId, + request.ChannelId, + request.Signature, + now + _options.NonceCacheExpiry, + cancellationToken) + .ConfigureAwait(false); + + if (!nonceStored) + { + errors.Add("Duplicate request (replay detected)."); + failed |= WebhookValidationChecks.NotReplay; + } + else + { + passed |= WebhookValidationChecks.NotReplay; + } + } + else + { + passed |= WebhookValidationChecks.NotReplay; + } + } + else + { + passed |= WebhookValidationChecks.NotExpired | WebhookValidationChecks.NotReplay; + } + + if (errors.Count > 0) + { + _logger.LogWarning( + "Webhook validation failed for tenant {TenantId} channel {ChannelId}: {Errors}", + request.TenantId, + request.ChannelId, + string.Join("; ", errors)); + return WebhookValidationResult.Invalid(passed, failed, errors); + } + + return WebhookValidationResult.Valid(passed, warnings.Count > 0 ? warnings : null); + } + + public string GenerateSignature(string payload, string secretKey) + => GenerateSignature(payload, secretKey, _options.DefaultAlgorithm, "hex", null); + + public Task RegisterWebhookAsync( + WebhookSecurityConfig config, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(config); + + return InTenantDbContextAsync( + config.TenantId, + "writer", + async dbContext => + { + var existing = await dbContext.WebhookSecurityConfigs + .FirstOrDefaultAsync( + row => row.TenantId == config.TenantId && row.ChannelId == config.ChannelId, + cancellationToken) + .ConfigureAwait(false); + + var now = _timeProvider.GetUtcNow(); + var entity = new WebhookSecurityConfigEntity + { + Id = existing?.Id ?? Guid.NewGuid(), + TenantId = config.TenantId, + ChannelId = config.ChannelId, + SecretKey = config.SecretKey, + Algorithm = config.Algorithm, + SignatureHeader = config.SignatureHeader, + SignatureFormat = config.SignatureFormat, + SignaturePrefix = config.SignaturePrefix, + TimestampHeader = config.TimestampHeader, + MaxRequestAgeSeconds = (int)Math.Round(config.MaxRequestAge.TotalSeconds), + EnforceIpAllowlist = config.EnforceIpAllowlist, + AllowedIps = Serialize(config.AllowedIps ?? Array.Empty()), + RequireSignature = config.RequireSignature, + Enabled = config.Enabled, + CreatedAt = existing?.CreatedAt ?? (config.CreatedAt == default ? now : config.CreatedAt), + UpdatedAt = now, + }; + + if (existing is null) + { + dbContext.WebhookSecurityConfigs.Add(entity); + } + else + { + dbContext.Entry(existing).CurrentValues.SetValues(entity); + } + + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + + _logger.LogInformation( + "Registered webhook security config for tenant {TenantId} channel {ChannelId}.", + config.TenantId, + config.ChannelId); + }, + cancellationToken); + } + + public Task GetConfigAsync( + string tenantId, + string channelId, + CancellationToken cancellationToken = default) + => InTenantDbContextAsync( + tenantId, + "reader", + async dbContext => + { + var entity = await dbContext.WebhookSecurityConfigs + .AsNoTracking() + .FirstOrDefaultAsync( + row => row.TenantId == tenantId && row.ChannelId == channelId, + cancellationToken) + .ConfigureAwait(false); + return entity is null ? null : MapConfig(entity); + }, + cancellationToken); + + public async Task UpdateAllowlistAsync( + string tenantId, + string channelId, + IReadOnlyList allowedIps, + string actor, + CancellationToken cancellationToken = default) + { + var config = await GetConfigAsync(tenantId, channelId, cancellationToken).ConfigureAwait(false); + if (config is null) + { + throw new InvalidOperationException($"No config found for tenant {tenantId} channel {channelId}"); + } + + await RegisterWebhookAsync( + config with + { + AllowedIps = allowedIps, + UpdatedAt = _timeProvider.GetUtcNow(), + }, + cancellationToken) + .ConfigureAwait(false); + + _logger.LogInformation( + "Updated IP allowlist for tenant {TenantId} channel {ChannelId} by {Actor}. IPs: {IpCount}", + tenantId, + channelId, + actor, + allowedIps.Count); + } + + public async Task IsIpAllowedAsync( + string tenantId, + string channelId, + string ipAddress, + CancellationToken cancellationToken = default) + { + var config = await GetConfigAsync(tenantId, channelId, cancellationToken).ConfigureAwait(false); + if (config is null || !config.EnforceIpAllowlist) + { + return true; + } + + return IsIpAllowedInternal(ipAddress, config.AllowedIps); + } + + public async Task RotateSecretAsync( + string tenantId, + string channelId, + CancellationToken cancellationToken = default) + { + var now = _timeProvider.GetUtcNow(); + var existing = await GetConfigAsync(tenantId, channelId, cancellationToken).ConfigureAwait(false); + var newSecret = Convert.ToHexString(RandomNumberGenerator.GetBytes(16)).ToLowerInvariant(); + + var updatedConfig = existing is null + ? new WebhookSecurityConfig + { + ConfigId = $"wh-{Guid.NewGuid():N}"[..16], + TenantId = tenantId, + ChannelId = channelId, + SecretKey = newSecret, + Algorithm = _options.DefaultAlgorithm, + SignatureHeader = _options.DefaultSignatureHeader, + SignatureFormat = "hex", + MaxRequestAge = _options.DefaultMaxRequestAge, + CreatedAt = now, + UpdatedAt = now, + } + : existing with + { + SecretKey = newSecret, + UpdatedAt = now, + }; + + await RegisterWebhookAsync(updatedConfig, cancellationToken).ConfigureAwait(false); + + return new WebhookSecretRotationResult + { + Success = true, + NewSecret = newSecret, + ActiveAt = now, + OldSecretExpiresAt = null, + }; + } + + public string? GetMaskedSecret(string tenantId, string channelId) + { + var config = GetConfigAsync(tenantId, channelId, CancellationToken.None) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + + if (config is null || string.IsNullOrWhiteSpace(config.SecretKey)) + { + return null; + } + + var secret = config.SecretKey; + if (secret.Length <= 4) + { + return "****"; + } + + return $"{secret[..2]}****{secret[^2..]}"; + } + + private Task TryStoreNonceAsync( + string tenantId, + string channelId, + string nonce, + DateTimeOffset expiresAt, + CancellationToken cancellationToken) + => InTenantDbContextAsync( + tenantId, + "writer", + async dbContext => + { + var now = _timeProvider.GetUtcNow(); + await dbContext.WebhookValidationNonces + .Where(row => row.ExpiresAt <= now) + .ExecuteDeleteAsync(cancellationToken) + .ConfigureAwait(false); + + dbContext.WebhookValidationNonces.Add(new WebhookValidationNonceEntity + { + Id = Guid.NewGuid(), + TenantId = tenantId, + ChannelId = channelId, + Nonce = nonce, + CreatedAt = now, + ExpiresAt = expiresAt, + }); + + try + { + await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); + return true; + } + catch (DbUpdateException exception) when (IsUniqueViolation(exception)) + { + return false; + } + }, + cancellationToken); + + private bool IsIpAllowedInternal(string ipAddress, IReadOnlyList allowedIps) + { + if (!IPAddress.TryParse(ipAddress, out var ip)) + { + return false; + } + + foreach (var allowedIp in _options.GlobalAllowedIps) + { + if (IpMatchesPattern(ip, allowedIp)) + { + return true; + } + } + + foreach (var allowedIp in allowedIps) + { + if (IpMatchesPattern(ip, allowedIp)) + { + return true; + } + } + + return false; + } + + private static bool IpMatchesPattern(IPAddress ip, string pattern) + { + if (string.IsNullOrWhiteSpace(pattern)) + { + return false; + } + + if (pattern.Contains('/')) + { + return IsInCidrRange(ip, pattern); + } + + return IPAddress.TryParse(pattern, out var allowedIp) && ip.Equals(allowedIp); + } + + private static bool IsInCidrRange(IPAddress ip, string cidr) + { + var parts = cidr.Split('/', 2, StringSplitOptions.TrimEntries); + if (parts.Length != 2 || + !IPAddress.TryParse(parts[0], out var network) || + !int.TryParse(parts[1], out var prefixLength)) + { + return false; + } + + var ipBytes = ip.GetAddressBytes(); + var networkBytes = network.GetAddressBytes(); + if (ipBytes.Length != networkBytes.Length) + { + return false; + } + + var fullBytes = prefixLength / 8; + var remainingBits = prefixLength % 8; + for (var index = 0; index < fullBytes; index++) + { + if (ipBytes[index] != networkBytes[index]) + { + return false; + } + } + + if (remainingBits == 0) + { + return true; + } + + var mask = (byte)(0xFF << (8 - remainingBits)); + return (ipBytes[fullBytes] & mask) == (networkBytes[fullBytes] & mask); + } + + private static bool CompareSignatures(string actual, string expected) + { + var actualBytes = Encoding.UTF8.GetBytes(actual); + var expectedBytes = Encoding.UTF8.GetBytes(expected); + return CryptographicOperations.FixedTimeEquals(actualBytes, expectedBytes); + } + + private static string GenerateSignature( + string payload, + string secretKey, + string algorithm, + string format, + string? prefix) + { + var keyBytes = Encoding.UTF8.GetBytes(secretKey); + var payloadBytes = Encoding.UTF8.GetBytes(payload); + + using var hmac = CreateHmac(algorithm, keyBytes); + var hash = hmac.ComputeHash(payloadBytes); + + var signature = format.ToLowerInvariant() switch + { + "base64" => Convert.ToBase64String(hash), + "base64url" => Convert.ToBase64String(hash).Replace('+', '-').Replace('/', '_').TrimEnd('='), + _ => Convert.ToHexString(hash).ToLowerInvariant(), + }; + + return prefix is not null ? $"{prefix}{signature}" : signature; + } + + private static HMAC CreateHmac(string algorithm, byte[] key) + => algorithm.ToUpperInvariant() switch + { + "SHA384" => new HMACSHA384(key), + "SHA512" => new HMACSHA512(key), + _ => new HMACSHA256(key), + }; + + private static WebhookSecurityConfig MapConfig(WebhookSecurityConfigEntity entity) + { + var allowedIps = DeserializeList(entity.AllowedIps); + return new WebhookSecurityConfig + { + ConfigId = entity.Id.ToString("N"), + TenantId = entity.TenantId, + ChannelId = entity.ChannelId, + SecretKey = entity.SecretKey, + Algorithm = entity.Algorithm, + SignatureHeader = entity.SignatureHeader, + SignatureFormat = entity.SignatureFormat, + SignaturePrefix = entity.SignaturePrefix, + TimestampHeader = entity.TimestampHeader, + MaxRequestAge = TimeSpan.FromSeconds(entity.MaxRequestAgeSeconds), + EnforceIpAllowlist = entity.EnforceIpAllowlist, + AllowedIps = allowedIps, + RequireSignature = entity.RequireSignature, + Enabled = entity.Enabled, + CreatedAt = entity.CreatedAt, + UpdatedAt = entity.UpdatedAt, + }; + } + + private static bool IsUniqueViolation(DbUpdateException exception) + => exception.InnerException is PostgresException postgres && postgres.SqlState == PostgresErrorCodes.UniqueViolation; + + private static string Serialize(T value) => JsonSerializer.Serialize(value, JsonOptions); + + private static IReadOnlyList DeserializeList(string? json) + => string.IsNullOrWhiteSpace(json) + ? Array.Empty() + : (JsonSerializer.Deserialize>(json, JsonOptions) ?? new List()); +} diff --git a/src/Notify/__Libraries/StellaOps.Notify.Persistence/Migrations/004_security_deadletter_runtime_state.sql b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Migrations/004_security_deadletter_runtime_state.sql new file mode 100644 index 000000000..1abf7f933 --- /dev/null +++ b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Migrations/004_security_deadletter_runtime_state.sql @@ -0,0 +1,152 @@ +CREATE TABLE IF NOT EXISTS notify.webhook_security_configs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id TEXT NOT NULL, + channel_id TEXT NOT NULL, + secret_key TEXT NOT NULL, + algorithm TEXT NOT NULL, + signature_header TEXT NOT NULL, + signature_format TEXT NOT NULL, + signature_prefix TEXT NULL, + timestamp_header TEXT NULL, + max_request_age_seconds INTEGER NOT NULL DEFAULT 300, + enforce_ip_allowlist BOOLEAN NOT NULL DEFAULT FALSE, + allowed_ips JSONB NOT NULL DEFAULT '[]'::jsonb, + require_signature BOOLEAN NOT NULL DEFAULT TRUE, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_webhook_security_configs_tenant_channel + ON notify.webhook_security_configs (tenant_id, channel_id); + +CREATE TABLE IF NOT EXISTS notify.webhook_validation_nonces ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id TEXT NOT NULL, + channel_id TEXT NOT NULL, + nonce TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + expires_at TIMESTAMPTZ NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_webhook_validation_nonces_scope + ON notify.webhook_validation_nonces (tenant_id, channel_id, nonce); + +CREATE INDEX IF NOT EXISTS idx_webhook_validation_nonces_expires_at + ON notify.webhook_validation_nonces (expires_at); + +CREATE TABLE IF NOT EXISTS notify.tenant_resource_ownership ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_id TEXT NOT NULL, + resource_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + registered_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_tenant_resource_ownership_resource + ON notify.tenant_resource_ownership (resource_type, resource_id); + +CREATE INDEX IF NOT EXISTS idx_tenant_resource_ownership_tenant_type + ON notify.tenant_resource_ownership (tenant_id, resource_type); + +CREATE TABLE IF NOT EXISTS notify.tenant_cross_grants ( + grant_id TEXT PRIMARY KEY, + owner_tenant_id TEXT NOT NULL, + target_tenant_id TEXT NOT NULL, + resource_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + allowed_operations INTEGER NOT NULL, + granted_at TIMESTAMPTZ NOT NULL DEFAULT now(), + granted_by TEXT NOT NULL, + expires_at TIMESTAMPTZ NULL, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + revoked_at TIMESTAMPTZ NULL, + revoked_by TEXT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_tenant_cross_grants_scope + ON notify.tenant_cross_grants (owner_tenant_id, target_tenant_id, resource_type, resource_id); + +CREATE INDEX IF NOT EXISTS idx_tenant_cross_grants_target_active + ON notify.tenant_cross_grants (target_tenant_id, is_active, expires_at); + +CREATE TABLE IF NOT EXISTS notify.tenant_isolation_violations ( + violation_id TEXT PRIMARY KEY, + requesting_tenant_id TEXT NOT NULL, + resource_owner_tenant_id TEXT NOT NULL, + resource_type TEXT NOT NULL, + resource_id TEXT NOT NULL, + operation INTEGER NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL DEFAULT now(), + context TEXT NULL, + severity INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_tenant_isolation_violations_requesting_tenant + ON notify.tenant_isolation_violations (requesting_tenant_id, occurred_at DESC); + +CREATE INDEX IF NOT EXISTS idx_tenant_isolation_violations_occurred_at + ON notify.tenant_isolation_violations (occurred_at DESC); + +CREATE TABLE IF NOT EXISTS notify.dead_letter_entries ( + entry_id TEXT PRIMARY KEY, + tenant_id TEXT NOT NULL, + delivery_id TEXT NOT NULL, + event_id TEXT NULL, + channel_id TEXT NULL, + channel_type TEXT NOT NULL, + failure_reason TEXT NOT NULL, + failure_details TEXT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + first_attempt_at TIMESTAMPTZ NOT NULL, + last_attempt_at TIMESTAMPTZ NULL, + status TEXT NOT NULL, + retry_count INTEGER NOT NULL DEFAULT 0, + last_retry_at TIMESTAMPTZ NULL, + resolution TEXT NULL, + resolved_by TEXT NULL, + resolved_at TIMESTAMPTZ NULL, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + original_payload TEXT NULL, + exception_type TEXT NULL, + exception_message TEXT NULL +); + +CREATE INDEX IF NOT EXISTS idx_dead_letter_entries_tenant_created + ON notify.dead_letter_entries (tenant_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_dead_letter_entries_tenant_status + ON notify.dead_letter_entries (tenant_id, status); + +CREATE TABLE IF NOT EXISTS notify.retention_policies_runtime ( + tenant_id TEXT PRIMARY KEY, + delivery_retention_seconds BIGINT NOT NULL, + audit_retention_seconds BIGINT NOT NULL, + dead_letter_retention_seconds BIGINT NOT NULL, + storm_data_retention_seconds BIGINT NOT NULL, + inbox_retention_seconds BIGINT NOT NULL, + event_history_retention_seconds BIGINT NOT NULL, + auto_cleanup_enabled BOOLEAN NOT NULL DEFAULT TRUE, + cleanup_schedule TEXT NOT NULL, + max_deletes_per_run INTEGER NOT NULL DEFAULT 10000, + extend_resolved_retention BOOLEAN NOT NULL DEFAULT TRUE, + resolved_retention_multiplier DOUBLE PRECISION NOT NULL DEFAULT 2.0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS notify.retention_cleanup_executions_runtime ( + execution_id TEXT PRIMARY KEY, + tenant_id TEXT NOT NULL, + started_at TIMESTAMPTZ NOT NULL DEFAULT now(), + completed_at TIMESTAMPTZ NULL, + status TEXT NOT NULL, + counts_json JSONB NULL, + error TEXT NULL, + policy_json JSONB NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_retention_cleanup_executions_runtime_tenant_started + ON notify.retention_cleanup_executions_runtime (tenant_id, started_at DESC); diff --git a/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/DeadLetterRuntimeEntity.cs b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/DeadLetterRuntimeEntity.cs new file mode 100644 index 000000000..f1b6bb466 --- /dev/null +++ b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/DeadLetterRuntimeEntity.cs @@ -0,0 +1,30 @@ +namespace StellaOps.Notify.Persistence.Postgres.Models; + +/// +/// Durable dead-letter admin entry shared by the Notify/Notifier admin surfaces. +/// +public sealed class DeadLetterRuntimeEntity +{ + public required string EntryId { get; init; } + public required string TenantId { get; init; } + public required string DeliveryId { get; init; } + public string? EventId { get; init; } + public string? ChannelId { get; init; } + public required string ChannelType { get; init; } + public required string FailureReason { get; init; } + public string? FailureDetails { get; init; } + public int AttemptCount { get; init; } + public DateTimeOffset CreatedAt { get; init; } + public DateTimeOffset FirstAttemptAt { get; init; } + public DateTimeOffset? LastAttemptAt { get; init; } + public required string Status { get; init; } + public int RetryCount { get; init; } + public DateTimeOffset? LastRetryAt { get; init; } + public string? Resolution { get; init; } + public string? ResolvedBy { get; init; } + public DateTimeOffset? ResolvedAt { get; init; } + public string Metadata { get; init; } = "{}"; + public string? OriginalPayload { get; init; } + public string? ExceptionType { get; init; } + public string? ExceptionMessage { get; init; } +} diff --git a/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/WebhookSecurityConfigEntity.cs b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/WebhookSecurityConfigEntity.cs new file mode 100644 index 000000000..3bbe9b2e3 --- /dev/null +++ b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/WebhookSecurityConfigEntity.cs @@ -0,0 +1,24 @@ +namespace StellaOps.Notify.Persistence.Postgres.Models; + +/// +/// Durable webhook security configuration for a tenant/channel pair. +/// +public sealed class WebhookSecurityConfigEntity +{ + public Guid Id { get; init; } + public required string TenantId { get; init; } + public required string ChannelId { get; init; } + public required string SecretKey { get; init; } + public required string Algorithm { get; init; } + public required string SignatureHeader { get; init; } + public required string SignatureFormat { get; init; } + public string? SignaturePrefix { get; init; } + public string? TimestampHeader { get; init; } + public int MaxRequestAgeSeconds { get; init; } + public bool EnforceIpAllowlist { get; init; } + public string AllowedIps { get; init; } = "[]"; + public bool RequireSignature { get; init; } + public bool Enabled { get; init; } + public DateTimeOffset CreatedAt { get; init; } + public DateTimeOffset UpdatedAt { get; init; } +} diff --git a/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/WebhookValidationNonceEntity.cs b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/WebhookValidationNonceEntity.cs new file mode 100644 index 000000000..ef1b771fe --- /dev/null +++ b/src/Notify/__Libraries/StellaOps.Notify.Persistence/Postgres/Models/WebhookValidationNonceEntity.cs @@ -0,0 +1,14 @@ +namespace StellaOps.Notify.Persistence.Postgres.Models; + +/// +/// Replay-protection nonce captured for webhook validation. +/// +public sealed class WebhookValidationNonceEntity +{ + public Guid Id { get; init; } + public required string TenantId { get; init; } + public required string ChannelId { get; init; } + public required string Nonce { get; init; } + public DateTimeOffset CreatedAt { get; init; } + public DateTimeOffset ExpiresAt { get; init; } +}