feat(notifier): security + deadletter runtime

Sprint SPRINT_20260416_011_Notify_truthful_security_deadletter_runtime.

- Migration 004 security_deadletter_runtime_state.
- DeadLetterRuntimeEntity + WebhookSecurityConfigEntity +
  WebhookValidationNonceEntity persistence models.
- PostgresDeadLetterService + PostgresDeadLetterHandler observability.
- PostgresTenantIsolationValidator + PostgresWebhookSecurityService.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-19 14:40:06 +03:00
parent 43d8398a5d
commit 45ebcb88b9
8 changed files with 2362 additions and 0 deletions

View File

@@ -0,0 +1,408 @@
using System.Collections.Immutable;
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using StellaOps.Notifier.Worker.Observability;
using StellaOps.Notifier.Worker.Storage;
using StellaOps.Notify.Persistence.Postgres.Models;
namespace StellaOps.Notifier.Worker.DeadLetter;
public sealed class PostgresDeadLetterService : PostgresNotifyRuntimeServiceBase, IDeadLetterService
{
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
private readonly TimeProvider _timeProvider;
private readonly INotifyMetrics? _metrics;
private readonly ILogger<PostgresDeadLetterService> _logger;
public PostgresDeadLetterService(
NotifyDurableStorageSupport support,
TimeProvider timeProvider,
ILogger<PostgresDeadLetterService> logger,
INotifyMetrics? metrics = null)
: base(support)
{
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics;
}
public Task<DeadLetterEntry> EnqueueAsync(
DeadLetterEnqueueRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
return InTenantDbContextAsync(
request.TenantId,
"writer",
async dbContext =>
{
var now = _timeProvider.GetUtcNow();
var entry = new DeadLetterRuntimeEntity
{
EntryId = $"dl-{Guid.NewGuid():N}"[..19],
TenantId = request.TenantId,
DeliveryId = request.DeliveryId,
EventId = request.EventId,
ChannelId = request.ChannelId,
ChannelType = request.ChannelType,
FailureReason = request.FailureReason,
FailureDetails = request.FailureDetails,
AttemptCount = request.AttemptCount,
CreatedAt = now,
FirstAttemptAt = request.LastAttemptAt ?? now,
LastAttemptAt = request.LastAttemptAt ?? now,
Status = DeadLetterStatus.Pending.ToString(),
RetryCount = 0,
Metadata = SerializeDictionary(request.Metadata),
OriginalPayload = request.OriginalPayload,
ExceptionType = null,
ExceptionMessage = null,
};
dbContext.DeadLetterEntries.Add(entry);
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_metrics?.RecordDeadLetter(request.TenantId, request.FailureReason, request.ChannelType);
_logger.LogWarning(
"Dead-lettered delivery {DeliveryId} for tenant {TenantId}: {Reason}",
request.DeliveryId,
request.TenantId,
request.FailureReason);
return MapEntry(entry);
},
cancellationToken);
}
public Task<DeadLetterEntry?> GetAsync(
string tenantId,
string entryId,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentException.ThrowIfNullOrWhiteSpace(entryId);
return InTenantDbContextAsync(
tenantId,
"reader",
async dbContext =>
{
var entity = await dbContext.DeadLetterEntries
.AsNoTracking()
.FirstOrDefaultAsync(row => row.EntryId == entryId, cancellationToken)
.ConfigureAwait(false);
return entity is null ? null : MapEntry(entity);
},
cancellationToken);
}
public Task<IReadOnlyList<DeadLetterEntry>> ListAsync(
string tenantId,
DeadLetterListOptions? options = null,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
options ??= new DeadLetterListOptions();
return InTenantDbContextAsync(
tenantId,
"reader",
async dbContext =>
{
var query = dbContext.DeadLetterEntries
.AsNoTracking()
.Where(row => row.TenantId == tenantId);
if (options.Status.HasValue)
{
var status = options.Status.Value.ToString();
query = query.Where(row => row.Status == status);
}
if (!string.IsNullOrWhiteSpace(options.ChannelId))
{
query = query.Where(row => row.ChannelId == options.ChannelId);
}
if (!string.IsNullOrWhiteSpace(options.ChannelType))
{
query = query.Where(row => row.ChannelType == options.ChannelType);
}
if (options.Since.HasValue)
{
query = query.Where(row => row.CreatedAt >= options.Since.Value);
}
if (options.Until.HasValue)
{
query = query.Where(row => row.CreatedAt <= options.Until.Value);
}
var rows = await query
.OrderByDescending(row => row.CreatedAt)
.Skip(options.Offset)
.Take(options.Limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return (IReadOnlyList<DeadLetterEntry>)rows.Select(MapEntry).ToList();
},
cancellationToken);
}
public Task<DeadLetterRetryResult> RetryAsync(
string tenantId,
string entryId,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentException.ThrowIfNullOrWhiteSpace(entryId);
return InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var entity = await dbContext.DeadLetterEntries
.FirstOrDefaultAsync(row => row.EntryId == entryId, cancellationToken)
.ConfigureAwait(false);
if (entity is null)
{
return new DeadLetterRetryResult
{
EntryId = entryId,
Success = false,
Error = "Entry not found",
};
}
var currentStatus = ParseStatus(entity.Status);
if (currentStatus is DeadLetterStatus.Retried or DeadLetterStatus.Resolved)
{
return new DeadLetterRetryResult
{
EntryId = entryId,
Success = false,
Error = $"Entry is already {currentStatus}",
};
}
var now = _timeProvider.GetUtcNow();
dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity
{
EntryId = entity.EntryId,
TenantId = entity.TenantId,
DeliveryId = entity.DeliveryId,
EventId = entity.EventId,
ChannelId = entity.ChannelId,
ChannelType = entity.ChannelType,
FailureReason = entity.FailureReason,
FailureDetails = entity.FailureDetails,
AttemptCount = entity.AttemptCount,
CreatedAt = entity.CreatedAt,
FirstAttemptAt = entity.FirstAttemptAt,
LastAttemptAt = entity.LastAttemptAt,
Status = DeadLetterStatus.Retried.ToString(),
RetryCount = entity.RetryCount + 1,
LastRetryAt = now,
Resolution = entity.Resolution,
ResolvedBy = entity.ResolvedBy,
ResolvedAt = entity.ResolvedAt,
Metadata = entity.Metadata,
OriginalPayload = entity.OriginalPayload,
ExceptionType = entity.ExceptionType,
ExceptionMessage = entity.ExceptionMessage,
});
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Retried dead-letter entry {EntryId} for tenant {TenantId}",
entryId,
tenantId);
return new DeadLetterRetryResult
{
EntryId = entryId,
Success = true,
RetriedAt = now,
NewDeliveryId = Guid.NewGuid().ToString("N"),
};
},
cancellationToken);
}
public async Task<IReadOnlyList<DeadLetterRetryResult>> RetryBatchAsync(
string tenantId,
IEnumerable<string> entryIds,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentNullException.ThrowIfNull(entryIds);
var results = new List<DeadLetterRetryResult>();
foreach (var entryId in entryIds)
{
results.Add(await RetryAsync(tenantId, entryId, cancellationToken).ConfigureAwait(false));
}
return results;
}
public Task ResolveAsync(
string tenantId,
string entryId,
string resolution,
string? resolvedBy = null,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentException.ThrowIfNullOrWhiteSpace(entryId);
ArgumentException.ThrowIfNullOrWhiteSpace(resolution);
return InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var entity = await dbContext.DeadLetterEntries
.FirstOrDefaultAsync(row => row.EntryId == entryId, cancellationToken)
.ConfigureAwait(false);
if (entity is null)
{
return;
}
dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity
{
EntryId = entity.EntryId,
TenantId = entity.TenantId,
DeliveryId = entity.DeliveryId,
EventId = entity.EventId,
ChannelId = entity.ChannelId,
ChannelType = entity.ChannelType,
FailureReason = entity.FailureReason,
FailureDetails = entity.FailureDetails,
AttemptCount = entity.AttemptCount,
CreatedAt = entity.CreatedAt,
FirstAttemptAt = entity.FirstAttemptAt,
LastAttemptAt = entity.LastAttemptAt,
Status = DeadLetterStatus.Resolved.ToString(),
RetryCount = entity.RetryCount,
LastRetryAt = entity.LastRetryAt,
Resolution = resolution,
ResolvedBy = resolvedBy,
ResolvedAt = _timeProvider.GetUtcNow(),
Metadata = entity.Metadata,
OriginalPayload = entity.OriginalPayload,
ExceptionType = entity.ExceptionType,
ExceptionMessage = entity.ExceptionMessage,
});
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Resolved dead-letter entry {EntryId} for tenant {TenantId}: {Resolution}",
entryId,
tenantId,
resolution);
},
cancellationToken);
}
public Task<int> PurgeExpiredAsync(
string tenantId,
TimeSpan maxAge,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
return InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var cutoff = _timeProvider.GetUtcNow() - maxAge;
return await dbContext.DeadLetterEntries
.Where(row => row.TenantId == tenantId && row.CreatedAt < cutoff)
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
},
cancellationToken);
}
public Task<DeadLetterStats> GetStatsAsync(
string tenantId,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
return InTenantDbContextAsync(
tenantId,
"reader",
async dbContext =>
{
var rows = await dbContext.DeadLetterEntries
.AsNoTracking()
.Where(row => row.TenantId == tenantId)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return new DeadLetterStats
{
TotalCount = rows.Count,
PendingCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Pending),
RetryingCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Retrying),
RetriedCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Retried),
ResolvedCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Resolved),
ExhaustedCount = rows.Count(row => ParseStatus(row.Status) == DeadLetterStatus.Exhausted),
ByChannel = rows.GroupBy(row => row.ChannelType).ToDictionary(group => group.Key, group => group.Count()),
ByReason = rows.GroupBy(row => row.FailureReason).ToDictionary(group => group.Key, group => group.Count()),
OldestEntryAt = rows.MinBy(row => row.CreatedAt)?.CreatedAt,
NewestEntryAt = rows.MaxBy(row => row.CreatedAt)?.CreatedAt,
};
},
cancellationToken);
}
private static DeadLetterEntry MapEntry(DeadLetterRuntimeEntity entity)
=> new()
{
EntryId = entity.EntryId,
TenantId = entity.TenantId,
DeliveryId = entity.DeliveryId,
EventId = entity.EventId ?? string.Empty,
ChannelId = entity.ChannelId ?? string.Empty,
ChannelType = entity.ChannelType,
FailureReason = entity.FailureReason,
FailureDetails = entity.FailureDetails,
AttemptCount = entity.AttemptCount,
CreatedAt = entity.CreatedAt,
LastAttemptAt = entity.LastAttemptAt,
Status = ParseStatus(entity.Status),
RetryCount = entity.RetryCount,
LastRetryAt = entity.LastRetryAt,
Resolution = entity.Resolution,
ResolvedBy = entity.ResolvedBy,
ResolvedAt = entity.ResolvedAt,
Metadata = DeserializeDictionary(entity.Metadata).ToImmutableDictionary(),
OriginalPayload = entity.OriginalPayload,
};
private static DeadLetterStatus ParseStatus(string? status)
=> Enum.TryParse<DeadLetterStatus>(status, true, out var parsed)
? parsed
: DeadLetterStatus.Pending;
private static string SerializeDictionary(IReadOnlyDictionary<string, string>? metadata)
=> JsonSerializer.Serialize(metadata ?? ImmutableDictionary<string, string>.Empty, JsonOptions);
private static Dictionary<string, string> DeserializeDictionary(string? json)
=> string.IsNullOrWhiteSpace(json)
? new Dictionary<string, string>()
: (JsonSerializer.Deserialize<Dictionary<string, string>>(json, JsonOptions) ?? new Dictionary<string, string>());
}

View File

@@ -0,0 +1,454 @@
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using StellaOps.Notifier.Worker.Storage;
using StellaOps.Notify.Persistence.EfCore.Context;
using StellaOps.Notify.Persistence.Postgres.Models;
using ServiceDeadLetterStatus = StellaOps.Notifier.Worker.DeadLetter.DeadLetterStatus;
namespace StellaOps.Notifier.Worker.Observability;
public sealed class PostgresDeadLetterHandler : PostgresNotifyRuntimeServiceBase, IDeadLetterHandler
{
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
private readonly TimeProvider _timeProvider;
private readonly ILogger<PostgresDeadLetterHandler> _logger;
public PostgresDeadLetterHandler(
NotifyDurableStorageSupport support,
TimeProvider timeProvider,
ILogger<PostgresDeadLetterHandler> logger)
: base(support)
{
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Task<DeadLetteredDelivery> DeadLetterAsync(
string tenantId,
string deliveryId,
DeadLetterReason reason,
string channelType,
object? payload = null,
Exception? exception = null,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentException.ThrowIfNullOrWhiteSpace(deliveryId);
ArgumentException.ThrowIfNullOrWhiteSpace(channelType);
return InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var now = _timeProvider.GetUtcNow();
var entity = new DeadLetterRuntimeEntity
{
EntryId = $"dl-{Guid.NewGuid():N}"[..19],
TenantId = tenantId,
DeliveryId = deliveryId,
EventId = deliveryId,
ChannelId = channelType,
ChannelType = channelType,
FailureReason = reason.ToString(),
FailureDetails = exception?.Message,
AttemptCount = 0,
CreatedAt = now,
FirstAttemptAt = now,
LastAttemptAt = now,
Status = ServiceDeadLetterStatus.Pending.ToString(),
RetryCount = 0,
Metadata = "{}",
OriginalPayload = SerializePayload(payload),
ExceptionType = exception?.GetType().FullName,
ExceptionMessage = exception?.Message,
};
dbContext.DeadLetterEntries.Add(entity);
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_logger.LogWarning(
"Dead-lettered delivery {DeliveryId} for tenant {TenantId}: {Reason}",
deliveryId,
tenantId,
reason);
return MapDeadLetteredDelivery(entity);
},
cancellationToken);
}
public Task<IReadOnlyList<DeadLetteredDelivery>> GetAsync(
string tenantId,
DeadLetterQuery? query = null,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
query ??= new DeadLetterQuery();
return InTenantDbContextAsync(
tenantId,
"reader",
async dbContext =>
{
var rows = dbContext.DeadLetterEntries
.AsNoTracking()
.Where(row => row.TenantId == tenantId);
if (!string.IsNullOrWhiteSpace(query.Id))
{
rows = rows.Where(row => row.EntryId == query.Id);
}
if (query.Reason.HasValue)
{
var reason = query.Reason.Value.ToString();
rows = rows.Where(row => row.FailureReason == reason);
}
if (!string.IsNullOrWhiteSpace(query.ChannelType))
{
rows = rows.Where(row => row.ChannelType == query.ChannelType);
}
if (query.Status.HasValue)
{
var statuses = MapStatuses(query.Status.Value);
rows = rows.Where(row => statuses.Contains(row.Status));
}
if (query.After.HasValue)
{
rows = rows.Where(row => row.CreatedAt > query.After.Value);
}
if (query.Before.HasValue)
{
rows = rows.Where(row => row.CreatedAt < query.Before.Value);
}
var entities = await rows
.OrderByDescending(row => row.CreatedAt)
.Skip(query.Offset)
.Take(query.Limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return (IReadOnlyList<DeadLetteredDelivery>)entities.Select(MapDeadLetteredDelivery).ToList();
},
cancellationToken);
}
public Task<DeadLetterRetryResult> RetryAsync(
string tenantId,
string deadLetterId,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentException.ThrowIfNullOrWhiteSpace(deadLetterId);
return InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var entity = await dbContext.DeadLetterEntries
.FirstOrDefaultAsync(row => row.EntryId == deadLetterId, cancellationToken)
.ConfigureAwait(false);
if (entity is null)
{
return new DeadLetterRetryResult
{
DeadLetterId = deadLetterId,
Success = false,
Error = "Not found",
NewStatus = DeadLetterStatus.Pending,
};
}
var currentStatus = MapStatus(entity.Status);
if (currentStatus is DeadLetterStatus.Retried or DeadLetterStatus.Discarded)
{
return new DeadLetterRetryResult
{
DeadLetterId = deadLetterId,
Success = false,
Error = "Entry is already terminal",
NewStatus = currentStatus,
};
}
dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity
{
EntryId = entity.EntryId,
TenantId = entity.TenantId,
DeliveryId = entity.DeliveryId,
EventId = entity.EventId,
ChannelId = entity.ChannelId,
ChannelType = entity.ChannelType,
FailureReason = entity.FailureReason,
FailureDetails = entity.FailureDetails,
AttemptCount = entity.AttemptCount,
CreatedAt = entity.CreatedAt,
FirstAttemptAt = entity.FirstAttemptAt,
LastAttemptAt = entity.LastAttemptAt,
Status = ServiceDeadLetterStatus.Retried.ToString(),
RetryCount = entity.RetryCount + 1,
LastRetryAt = _timeProvider.GetUtcNow(),
Resolution = entity.Resolution,
ResolvedBy = entity.ResolvedBy,
ResolvedAt = entity.ResolvedAt,
Metadata = entity.Metadata,
OriginalPayload = entity.OriginalPayload,
ExceptionType = entity.ExceptionType,
ExceptionMessage = entity.ExceptionMessage,
});
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
return new DeadLetterRetryResult
{
DeadLetterId = deadLetterId,
Success = true,
NewStatus = DeadLetterStatus.Retried,
};
},
cancellationToken);
}
public async Task<DeadLetterBulkRetryResult> RetryBulkAsync(
string tenantId,
DeadLetterQuery? query = null,
CancellationToken cancellationToken = default)
{
var deadLetters = await GetAsync(tenantId, query, cancellationToken).ConfigureAwait(false);
var results = new List<DeadLetterRetryResult>();
foreach (var deadLetter in deadLetters.Where(candidate => candidate.Status == DeadLetterStatus.Pending))
{
results.Add(await RetryAsync(tenantId, deadLetter.DeadLetterId, cancellationToken).ConfigureAwait(false));
}
return new DeadLetterBulkRetryResult
{
Total = results.Count,
Succeeded = results.Count(result => result.Success),
Failed = results.Count(result => !result.Success),
Results = results,
};
}
public Task<bool> DiscardAsync(
string tenantId,
string deadLetterId,
string? reason = null,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentException.ThrowIfNullOrWhiteSpace(deadLetterId);
return InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var entity = await dbContext.DeadLetterEntries
.FirstOrDefaultAsync(row => row.EntryId == deadLetterId, cancellationToken)
.ConfigureAwait(false);
if (entity is null)
{
return false;
}
dbContext.Entry(entity).CurrentValues.SetValues(new DeadLetterRuntimeEntity
{
EntryId = entity.EntryId,
TenantId = entity.TenantId,
DeliveryId = entity.DeliveryId,
EventId = entity.EventId,
ChannelId = entity.ChannelId,
ChannelType = entity.ChannelType,
FailureReason = entity.FailureReason,
FailureDetails = entity.FailureDetails,
AttemptCount = entity.AttemptCount,
CreatedAt = entity.CreatedAt,
FirstAttemptAt = entity.FirstAttemptAt,
LastAttemptAt = entity.LastAttemptAt,
Status = ServiceDeadLetterStatus.Resolved.ToString(),
RetryCount = entity.RetryCount,
LastRetryAt = entity.LastRetryAt,
Resolution = reason,
ResolvedBy = null,
ResolvedAt = _timeProvider.GetUtcNow(),
Metadata = entity.Metadata,
OriginalPayload = entity.OriginalPayload,
ExceptionType = entity.ExceptionType,
ExceptionMessage = entity.ExceptionMessage,
});
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
return true;
},
cancellationToken);
}
public Task<DeadLetterStats> GetStatsAsync(
string? tenantId = null,
CancellationToken cancellationToken = default)
{
if (!string.IsNullOrWhiteSpace(tenantId))
{
return InTenantDbContextAsync(
tenantId,
"reader",
dbContext => BuildStatsAsync(dbContext, tenantId, cancellationToken),
cancellationToken);
}
return InSystemDbContextAsync(
dbContext => BuildStatsAsync(dbContext, null, cancellationToken),
cancellationToken);
}
public Task<int> PurgeAsync(
string? tenantId,
TimeSpan olderThan,
CancellationToken cancellationToken = default)
{
if (!string.IsNullOrWhiteSpace(tenantId))
{
return InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var cutoff = _timeProvider.GetUtcNow() - olderThan;
return await dbContext.DeadLetterEntries
.Where(row => row.TenantId == tenantId && row.CreatedAt < cutoff)
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
},
cancellationToken);
}
return InSystemDbContextAsync(
async dbContext =>
{
var cutoff = _timeProvider.GetUtcNow() - olderThan;
return await dbContext.DeadLetterEntries
.Where(row => row.CreatedAt < cutoff)
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
},
cancellationToken);
}
private Task<DeadLetterStats> BuildStatsAsync(
NotifyDbContext dbContext,
string? tenantId,
CancellationToken cancellationToken)
{
var query = dbContext.DeadLetterEntries.AsNoTracking().AsQueryable();
if (!string.IsNullOrWhiteSpace(tenantId))
{
query = query.Where(row => row.TenantId == tenantId);
}
return BuildStatsCoreAsync(query, tenantId, cancellationToken);
}
private async Task<DeadLetterStats> BuildStatsCoreAsync(
IQueryable<DeadLetterRuntimeEntity> query,
string? tenantId,
CancellationToken cancellationToken)
{
var rows = await query.ToListAsync(cancellationToken).ConfigureAwait(false);
return new DeadLetterStats
{
Timestamp = _timeProvider.GetUtcNow(),
TenantId = tenantId,
TotalCount = rows.Count,
PendingCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Pending),
RetryingCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Retrying),
RetriedCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Retried),
DiscardedCount = rows.Count(row => MapStatus(row.Status) == DeadLetterStatus.Discarded),
ByReason = rows.GroupBy(row => MapReason(row.FailureReason)).ToDictionary(group => group.Key, group => group.Count()),
ByChannel = rows.GroupBy(row => row.ChannelType).ToDictionary(group => group.Key, group => group.Count()),
OldestDeadLetterAt = rows.MinBy(row => row.CreatedAt)?.CreatedAt,
NewestDeadLetterAt = rows.MaxBy(row => row.CreatedAt)?.CreatedAt,
};
}
private static IReadOnlyCollection<string> MapStatuses(DeadLetterStatus status)
=> status switch
{
DeadLetterStatus.Discarded => new[] { ServiceDeadLetterStatus.Resolved.ToString(), ServiceDeadLetterStatus.Exhausted.ToString() },
DeadLetterStatus.Retrying => new[] { ServiceDeadLetterStatus.Retrying.ToString() },
DeadLetterStatus.Retried => new[] { ServiceDeadLetterStatus.Retried.ToString() },
_ => new[] { ServiceDeadLetterStatus.Pending.ToString() },
};
private static DeadLetteredDelivery MapDeadLetteredDelivery(DeadLetterRuntimeEntity entity)
=> new()
{
DeadLetterId = entity.EntryId,
TenantId = entity.TenantId,
DeliveryId = entity.DeliveryId,
ChannelType = entity.ChannelType,
Reason = MapReason(entity.FailureReason),
ReasonDetails = entity.FailureDetails,
OriginalPayload = DeserializePayload(entity.OriginalPayload),
ExceptionType = entity.ExceptionType,
ExceptionMessage = entity.ExceptionMessage,
AttemptCount = entity.AttemptCount,
FirstAttemptAt = entity.FirstAttemptAt,
DeadLetteredAt = entity.CreatedAt,
LastRetryAt = entity.LastRetryAt,
RetryCount = entity.RetryCount,
Status = MapStatus(entity.Status),
DiscardReason = entity.Resolution,
};
private static DeadLetterReason MapReason(string? failureReason)
=> Enum.TryParse<DeadLetterReason>(failureReason, true, out var parsed)
? parsed
: DeadLetterReason.UnknownError;
private static DeadLetterStatus MapStatus(string? status)
=> status switch
{
nameof(ServiceDeadLetterStatus.Retrying) => DeadLetterStatus.Retrying,
nameof(ServiceDeadLetterStatus.Retried) => DeadLetterStatus.Retried,
nameof(ServiceDeadLetterStatus.Resolved) => DeadLetterStatus.Discarded,
nameof(ServiceDeadLetterStatus.Exhausted) => DeadLetterStatus.Discarded,
_ => DeadLetterStatus.Pending,
};
private static string? SerializePayload(object? payload)
{
if (payload is null)
{
return null;
}
return payload is string text ? text : JsonSerializer.Serialize(payload, JsonOptions);
}
private static object? DeserializePayload(string? payload)
{
if (string.IsNullOrWhiteSpace(payload))
{
return null;
}
try
{
return JsonSerializer.Deserialize<object>(payload, JsonOptions);
}
catch
{
return payload;
}
}
}

View File

@@ -0,0 +1,739 @@
using System.Text.Json;
using System.Text.RegularExpressions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Notifier.Worker.Storage;
using StellaOps.Notify.Persistence.Postgres.Models;
namespace StellaOps.Notifier.Worker.Security;
public sealed class PostgresTenantIsolationValidator : PostgresNotifyRuntimeServiceBase, ITenantIsolationValidator
{
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
private readonly TenantIsolationOptions _options;
private readonly TimeProvider _timeProvider;
private readonly ILogger<PostgresTenantIsolationValidator> _logger;
private readonly IReadOnlyList<Regex> _adminPatterns;
public PostgresTenantIsolationValidator(
NotifyDurableStorageSupport support,
IOptions<TenantIsolationOptions> options,
TimeProvider timeProvider,
ILogger<PostgresTenantIsolationValidator> logger)
: base(support)
{
_options = options?.Value ?? new TenantIsolationOptions();
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_adminPatterns = _options.AdminTenantPatterns
.Select(pattern => new Regex(pattern, RegexOptions.Compiled | RegexOptions.IgnoreCase))
.ToList();
}
public async Task<TenantValidationResult> ValidateResourceAccessAsync(
string tenantId,
string resourceType,
string resourceId,
TenantAccessOperation operation,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
return TenantValidationResult.Denied("Tenant ID is required for validation.");
}
if (IsAdminTenant(tenantId))
{
return TenantValidationResult.Allowed(TenantValidationType.SystemResource);
}
if (_options.SystemResourceTypes.Contains(resourceType))
{
return TenantValidationResult.Allowed(TenantValidationType.SystemResource);
}
var ownership = await InSystemDbContextAsync(
dbContext => dbContext.TenantResourceOwnership
.AsNoTracking()
.FirstOrDefaultAsync(
row => row.ResourceType == resourceType && row.ResourceId == resourceId,
cancellationToken),
cancellationToken)
.ConfigureAwait(false);
if (ownership is null)
{
return TenantValidationResult.Allowed(TenantValidationType.ResourceNotFound);
}
if (string.Equals(ownership.TenantId, tenantId, StringComparison.Ordinal))
{
return TenantValidationResult.Allowed(TenantValidationType.SameTenant);
}
if (_options.AllowCrossTenantGrants)
{
var now = _timeProvider.GetUtcNow();
var grant = await InSystemDbContextAsync(
dbContext => dbContext.CrossTenantGrants
.AsNoTracking()
.FirstOrDefaultAsync(
row => row.OwnerTenantId == ownership.TenantId &&
row.TargetTenantId == tenantId &&
row.ResourceType == resourceType &&
row.ResourceId == resourceId &&
row.IsActive &&
(row.ExpiresAt == null || row.ExpiresAt > now),
cancellationToken),
cancellationToken)
.ConfigureAwait(false);
if (grant is not null && HasOperationFlag(grant.AllowedOperations, operation))
{
return TenantValidationResult.CrossTenantAllowed(grant.GrantId);
}
}
await RecordViolationAsync(
tenantId,
ownership.TenantId,
resourceType,
resourceId,
operation,
cancellationToken)
.ConfigureAwait(false);
return TenantValidationResult.Denied(
$"Tenant {tenantId} does not have access to {resourceType}/{resourceId} owned by tenant {ownership.TenantId}");
}
public Task<TenantValidationResult> ValidateDeliveryAsync(
string tenantId,
string deliveryId,
CancellationToken cancellationToken = default)
=> ValidateResourceAccessAsync(tenantId, "delivery", deliveryId, TenantAccessOperation.Read, cancellationToken);
public Task<TenantValidationResult> ValidateChannelAsync(
string tenantId,
string channelId,
CancellationToken cancellationToken = default)
=> ValidateResourceAccessAsync(tenantId, "channel", channelId, TenantAccessOperation.Read, cancellationToken);
public Task<TenantValidationResult> ValidateTemplateAsync(
string tenantId,
string templateId,
CancellationToken cancellationToken = default)
=> ValidateResourceAccessAsync(tenantId, "template", templateId, TenantAccessOperation.Read, cancellationToken);
public Task<TenantValidationResult> ValidateSubscriptionAsync(
string tenantId,
string subscriptionId,
CancellationToken cancellationToken = default)
=> ValidateResourceAccessAsync(tenantId, "subscription", subscriptionId, TenantAccessOperation.Read, cancellationToken);
public Task<TenantValidationResult> ValidateCrossTenantAccessAsync(
string sourceTenantId,
string targetTenantId,
string resourceType,
string resourceId,
CancellationToken cancellationToken = default)
{
if (string.Equals(sourceTenantId, targetTenantId, StringComparison.Ordinal))
{
return Task.FromResult(TenantValidationResult.Allowed(TenantValidationType.SameTenant));
}
return ValidateResourceAccessAsync(sourceTenantId, resourceType, resourceId, TenantAccessOperation.Read, cancellationToken);
}
public Task RegisterResourceAsync(
string tenantId,
string resourceType,
string resourceId,
CancellationToken cancellationToken = default)
=> InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var existing = await dbContext.TenantResourceOwnership
.FirstOrDefaultAsync(
row => row.ResourceType == resourceType && row.ResourceId == resourceId,
cancellationToken)
.ConfigureAwait(false);
var entity = new TenantResourceOwnershipEntity
{
Id = existing?.Id ?? Guid.NewGuid(),
TenantId = tenantId,
ResourceType = resourceType,
ResourceId = resourceId,
Metadata = existing?.Metadata ?? "{}",
RegisteredAt = existing?.RegisteredAt ?? _timeProvider.GetUtcNow(),
};
if (existing is null)
{
dbContext.TenantResourceOwnership.Add(entity);
}
else
{
dbContext.Entry(existing).CurrentValues.SetValues(entity);
}
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug(
"Registered resource {ResourceType}/{ResourceId} for tenant {TenantId}.",
resourceType,
resourceId,
tenantId);
},
cancellationToken);
public Task UnregisterResourceAsync(
string resourceType,
string resourceId,
CancellationToken cancellationToken = default)
=> InSystemDbContextAsync(
async dbContext =>
{
await dbContext.TenantResourceOwnership
.Where(row => row.ResourceType == resourceType && row.ResourceId == resourceId)
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
await dbContext.CrossTenantGrants
.Where(row => row.ResourceType == resourceType && row.ResourceId == resourceId)
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
},
cancellationToken);
public Task<IReadOnlyList<TenantResource>> GetTenantResourcesAsync(
string tenantId,
string? resourceType = null,
CancellationToken cancellationToken = default)
=> InTenantDbContextAsync(
tenantId,
"reader",
async dbContext =>
{
var query = dbContext.TenantResourceOwnership
.AsNoTracking()
.Where(row => row.TenantId == tenantId);
if (!string.IsNullOrWhiteSpace(resourceType))
{
query = query.Where(row => row.ResourceType == resourceType);
}
return (IReadOnlyList<TenantResource>)await query
.OrderBy(row => row.ResourceType)
.ThenBy(row => row.ResourceId)
.Select(row => new TenantResource
{
TenantId = row.TenantId,
ResourceType = row.ResourceType,
ResourceId = row.ResourceId,
RegisteredAt = row.RegisteredAt,
Metadata = DeserializeDictionary(row.Metadata),
})
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
},
cancellationToken);
public Task GrantCrossTenantAccessAsync(
string ownerTenantId,
string targetTenantId,
string resourceType,
string resourceId,
TenantAccessOperation allowedOperations,
DateTimeOffset? expiresAt,
string grantedBy,
CancellationToken cancellationToken = default)
{
if (!_options.AllowCrossTenantGrants)
{
throw new InvalidOperationException("Cross-tenant grants are disabled.");
}
if (expiresAt.HasValue)
{
var maxExpiry = _timeProvider.GetUtcNow() + _options.MaxGrantDuration;
if (expiresAt > maxExpiry)
{
expiresAt = maxExpiry;
}
}
return InSystemDbContextAsync(
async dbContext =>
{
var existing = await dbContext.CrossTenantGrants
.FirstOrDefaultAsync(
row => row.OwnerTenantId == ownerTenantId &&
row.TargetTenantId == targetTenantId &&
row.ResourceType == resourceType &&
row.ResourceId == resourceId,
cancellationToken)
.ConfigureAwait(false);
var entity = new CrossTenantGrantEntity
{
GrantId = existing?.GrantId ?? $"grant-{Guid.NewGuid():N}"[..20],
OwnerTenantId = ownerTenantId,
TargetTenantId = targetTenantId,
ResourceType = resourceType,
ResourceId = resourceId,
AllowedOperations = (int)allowedOperations,
GrantedAt = existing?.GrantedAt ?? _timeProvider.GetUtcNow(),
GrantedBy = existing?.GrantedBy ?? grantedBy,
ExpiresAt = expiresAt,
IsActive = true,
RevokedAt = null,
RevokedBy = null,
};
if (existing is null)
{
dbContext.CrossTenantGrants.Add(entity);
}
else
{
dbContext.Entry(existing).CurrentValues.SetValues(entity);
}
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Granted cross-tenant access for {ResourceType}/{ResourceId} from {OwnerTenant} to {TargetTenant} by {GrantedBy}.",
resourceType,
resourceId,
ownerTenantId,
targetTenantId,
grantedBy);
},
cancellationToken);
}
public Task RevokeCrossTenantAccessAsync(
string ownerTenantId,
string targetTenantId,
string resourceType,
string resourceId,
string revokedBy,
CancellationToken cancellationToken = default)
=> InSystemDbContextAsync(
async dbContext =>
{
var existing = await dbContext.CrossTenantGrants
.FirstOrDefaultAsync(
row => row.OwnerTenantId == ownerTenantId &&
row.TargetTenantId == targetTenantId &&
row.ResourceType == resourceType &&
row.ResourceId == resourceId,
cancellationToken)
.ConfigureAwait(false);
if (existing is null)
{
return;
}
dbContext.Entry(existing).CurrentValues.SetValues(new CrossTenantGrantEntity
{
GrantId = existing.GrantId,
OwnerTenantId = existing.OwnerTenantId,
TargetTenantId = existing.TargetTenantId,
ResourceType = existing.ResourceType,
ResourceId = existing.ResourceId,
AllowedOperations = existing.AllowedOperations,
GrantedAt = existing.GrantedAt,
GrantedBy = existing.GrantedBy,
ExpiresAt = existing.ExpiresAt,
IsActive = false,
RevokedAt = _timeProvider.GetUtcNow(),
RevokedBy = revokedBy,
});
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Revoked cross-tenant access {GrantId} for {ResourceType}/{ResourceId} by {RevokedBy}.",
existing.GrantId,
resourceType,
resourceId,
revokedBy);
},
cancellationToken);
public Task<IReadOnlyList<TenantViolation>> GetViolationsAsync(
string? tenantId = null,
DateTimeOffset? since = null,
CancellationToken cancellationToken = default)
=> InSystemDbContextAsync(
async dbContext =>
{
var query = dbContext.TenantIsolationViolations.AsNoTracking().AsQueryable();
if (!string.IsNullOrWhiteSpace(tenantId))
{
query = query.Where(row => row.RequestingTenantId == tenantId);
}
if (since.HasValue)
{
query = query.Where(row => row.OccurredAt >= since.Value);
}
return (IReadOnlyList<TenantViolation>)await query
.OrderByDescending(row => row.OccurredAt)
.Select(row => new TenantViolation
{
ViolationId = row.ViolationId,
RequestingTenantId = row.RequestingTenantId,
ResourceOwnerTenantId = row.ResourceOwnerTenantId,
ResourceType = row.ResourceType,
ResourceId = row.ResourceId,
Operation = (TenantAccessOperation)row.Operation,
OccurredAt = row.OccurredAt,
Context = row.Context,
Severity = (ViolationSeverity)row.Severity,
})
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
},
cancellationToken);
public async Task<TenantFuzzTestResult> RunFuzzTestAsync(
TenantFuzzTestConfig config,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(config);
var startTime = _timeProvider.GetUtcNow();
var random = config.Seed.HasValue ? new Random(config.Seed.Value) : new Random();
var failures = new List<FuzzTestFailure>();
var totalTests = 0;
var passedTests = 0;
var testResources = new List<(string TenantId, string ResourceType, string ResourceId)>();
foreach (var tenantId in config.TenantIds)
{
foreach (var resourceType in config.ResourceTypes)
{
for (var index = 0; index < 3; index++)
{
var resourceId = $"test-{resourceType}-{tenantId}-{index}";
await RegisterResourceAsync(tenantId, resourceType, resourceId, cancellationToken).ConfigureAwait(false);
testResources.Add((tenantId, resourceType, resourceId));
}
}
}
try
{
for (var index = 0; index < config.Iterations; index++)
{
var resource = testResources[random.Next(testResources.Count)];
totalTests++;
var result = await ValidateResourceAccessAsync(
resource.TenantId,
resource.ResourceType,
resource.ResourceId,
TenantAccessOperation.Read,
cancellationToken)
.ConfigureAwait(false);
if (result.IsAllowed)
{
passedTests++;
}
else
{
failures.Add(new FuzzTestFailure
{
TestCase = "Same tenant access",
Expected = "Allowed",
Actual = $"Denied: {result.DenialReason}",
Input = new Dictionary<string, string>
{
["tenantId"] = resource.TenantId,
["resourceType"] = resource.ResourceType,
["resourceId"] = resource.ResourceId,
},
});
}
}
for (var index = 0; index < config.Iterations; index++)
{
var resource = testResources[random.Next(testResources.Count)];
var differentTenant = config.TenantIds
.Where(candidate => !string.Equals(candidate, resource.TenantId, StringComparison.Ordinal))
.OrderBy(_ => random.Next())
.FirstOrDefault();
if (differentTenant is null)
{
continue;
}
totalTests++;
var result = await ValidateResourceAccessAsync(
differentTenant,
resource.ResourceType,
resource.ResourceId,
TenantAccessOperation.Read,
cancellationToken)
.ConfigureAwait(false);
if (!result.IsAllowed)
{
passedTests++;
}
else
{
failures.Add(new FuzzTestFailure
{
TestCase = "Cross-tenant access without grant",
Expected = "Denied",
Actual = "Allowed",
Input = new Dictionary<string, string>
{
["requestingTenantId"] = differentTenant,
["ownerTenantId"] = resource.TenantId,
["resourceType"] = resource.ResourceType,
["resourceId"] = resource.ResourceId,
},
});
}
}
if (config.TestCrossTenantGrants && _options.AllowCrossTenantGrants)
{
for (var index = 0; index < config.Iterations / 2; index++)
{
var resource = testResources[random.Next(testResources.Count)];
var differentTenant = config.TenantIds
.Where(candidate => !string.Equals(candidate, resource.TenantId, StringComparison.Ordinal))
.OrderBy(_ => random.Next())
.FirstOrDefault();
if (differentTenant is null)
{
continue;
}
await GrantCrossTenantAccessAsync(
resource.TenantId,
differentTenant,
resource.ResourceType,
resource.ResourceId,
TenantAccessOperation.Read,
null,
"fuzz-test",
cancellationToken)
.ConfigureAwait(false);
totalTests++;
var result = await ValidateResourceAccessAsync(
differentTenant,
resource.ResourceType,
resource.ResourceId,
TenantAccessOperation.Read,
cancellationToken)
.ConfigureAwait(false);
if (result.IsAllowed && result.IsCrossTenant)
{
passedTests++;
}
else
{
failures.Add(new FuzzTestFailure
{
TestCase = "Cross-tenant access with grant",
Expected = "Allowed (cross-tenant)",
Actual = result.IsAllowed
? "Allowed (not marked cross-tenant)"
: $"Denied: {result.DenialReason}",
Input = new Dictionary<string, string>
{
["requestingTenantId"] = differentTenant,
["ownerTenantId"] = resource.TenantId,
["resourceType"] = resource.ResourceType,
["resourceId"] = resource.ResourceId,
},
});
}
await RevokeCrossTenantAccessAsync(
resource.TenantId,
differentTenant,
resource.ResourceType,
resource.ResourceId,
"fuzz-test",
cancellationToken)
.ConfigureAwait(false);
}
}
if (config.TestEdgeCases)
{
totalTests++;
var emptyResult = await ValidateResourceAccessAsync(
string.Empty,
"delivery",
"test-resource",
TenantAccessOperation.Read,
cancellationToken)
.ConfigureAwait(false);
if (!emptyResult.IsAllowed || emptyResult.ValidationType == TenantValidationType.Denied)
{
passedTests++;
}
else
{
failures.Add(new FuzzTestFailure
{
TestCase = "Empty tenant ID",
Expected = "Denied or handled gracefully",
Actual = "Allowed",
});
}
totalTests++;
_ = await ValidateResourceAccessAsync(
config.TenantIds[0],
"delivery",
"non-existent-resource",
TenantAccessOperation.Read,
cancellationToken)
.ConfigureAwait(false);
passedTests++;
}
}
finally
{
foreach (var resource in testResources)
{
await UnregisterResourceAsync(resource.ResourceType, resource.ResourceId, cancellationToken).ConfigureAwait(false);
}
}
var executionTime = _timeProvider.GetUtcNow() - startTime;
return new TenantFuzzTestResult
{
AllPassed = failures.Count == 0,
TotalTests = totalTests,
PassedTests = passedTests,
FailedTests = failures.Count,
Failures = failures,
ExecutionTime = executionTime,
};
}
private bool IsAdminTenant(string tenantId)
=> _adminPatterns.Any(pattern => pattern.IsMatch(tenantId));
private Task RecordViolationAsync(
string requestingTenantId,
string ownerTenantId,
string resourceType,
string resourceId,
TenantAccessOperation operation,
CancellationToken cancellationToken)
{
if (!_options.RecordViolations)
{
return Task.CompletedTask;
}
return InSystemDbContextAsync(
async dbContext =>
{
var violation = new TenantIsolationViolationEntity
{
ViolationId = $"vio-{Guid.NewGuid():N}"[..16],
RequestingTenantId = requestingTenantId,
ResourceOwnerTenantId = ownerTenantId,
ResourceType = resourceType,
ResourceId = resourceId,
Operation = (int)operation,
OccurredAt = _timeProvider.GetUtcNow(),
Context = null,
Severity = (int)DetermineSeverity(operation),
};
dbContext.TenantIsolationViolations.Add(violation);
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
if (_options.LogViolations)
{
_logger.LogWarning(
"Tenant isolation violation: Tenant {RequestingTenant} attempted {Operation} on {ResourceType}/{ResourceId} owned by tenant {OwnerTenant}.",
requestingTenantId,
operation,
resourceType,
resourceId,
ownerTenantId);
}
var cutoff = _timeProvider.GetUtcNow() - _options.ViolationRetentionPeriod;
await dbContext.TenantIsolationViolations
.Where(row => row.OccurredAt < cutoff)
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
var totalViolations = await dbContext.TenantIsolationViolations.CountAsync(cancellationToken).ConfigureAwait(false);
if (totalViolations > _options.MaxViolationsRetained)
{
var toRemove = totalViolations - _options.MaxViolationsRetained;
var oldestIds = await dbContext.TenantIsolationViolations
.OrderBy(row => row.OccurredAt)
.ThenBy(row => row.ViolationId)
.Take(toRemove)
.Select(row => row.ViolationId)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
if (oldestIds.Count > 0)
{
await dbContext.TenantIsolationViolations
.Where(row => oldestIds.Contains(row.ViolationId))
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
}
}
},
cancellationToken);
}
private static ViolationSeverity DetermineSeverity(TenantAccessOperation operation)
=> operation switch
{
TenantAccessOperation.Delete => ViolationSeverity.Critical,
TenantAccessOperation.Write => ViolationSeverity.High,
TenantAccessOperation.Execute => ViolationSeverity.High,
TenantAccessOperation.Share => ViolationSeverity.Medium,
_ => ViolationSeverity.Low,
};
private static bool HasOperationFlag(int grantedOperations, TenantAccessOperation requestedOperation)
=> ((TenantAccessOperation)grantedOperations).HasFlag(requestedOperation);
private static IReadOnlyDictionary<string, string> DeserializeDictionary(string? json)
{
if (string.IsNullOrWhiteSpace(json))
{
return new Dictionary<string, string>();
}
return JsonSerializer.Deserialize<Dictionary<string, string>>(json, JsonOptions) ?? new Dictionary<string, string>();
}
}

View File

@@ -0,0 +1,541 @@
using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using StellaOps.Notifier.Worker.Storage;
using StellaOps.Notify.Persistence.Postgres.Models;
namespace StellaOps.Notifier.Worker.Security;
public sealed class PostgresWebhookSecurityService : PostgresNotifyRuntimeServiceBase, IWebhookSecurityService
{
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
private readonly WebhookSecurityOptions _options;
private readonly TimeProvider _timeProvider;
private readonly ILogger<PostgresWebhookSecurityService> _logger;
public PostgresWebhookSecurityService(
NotifyDurableStorageSupport support,
IOptions<WebhookSecurityOptions> options,
TimeProvider timeProvider,
ILogger<PostgresWebhookSecurityService> logger)
: base(support)
{
_options = options?.Value ?? new WebhookSecurityOptions();
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<WebhookValidationResult> ValidateAsync(
WebhookValidationRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
var errors = new List<string>();
var warnings = new List<string>();
var passed = WebhookValidationChecks.None;
var failed = WebhookValidationChecks.None;
var config = await GetConfigAsync(request.TenantId, request.ChannelId, cancellationToken).ConfigureAwait(false);
if (config is null)
{
warnings.Add("No webhook security configuration found; skipping validation.");
return WebhookValidationResult.Valid(WebhookValidationChecks.All, warnings);
}
if (!config.Enabled)
{
warnings.Add("Webhook security configuration is disabled.");
return WebhookValidationResult.Valid(WebhookValidationChecks.All, warnings);
}
if (config.RequireSignature)
{
if (string.IsNullOrEmpty(request.Signature))
{
errors.Add("Missing signature header.");
failed |= WebhookValidationChecks.SignatureValid;
}
else
{
var expectedSignature = GenerateSignature(
request.Body,
config.SecretKey,
config.Algorithm,
config.SignatureFormat,
config.SignaturePrefix);
if (!CompareSignatures(request.Signature, expectedSignature))
{
errors.Add("Invalid signature.");
failed |= WebhookValidationChecks.SignatureValid;
}
else
{
passed |= WebhookValidationChecks.SignatureValid;
}
}
}
else
{
passed |= WebhookValidationChecks.SignatureValid;
}
if (config.EnforceIpAllowlist && !string.IsNullOrWhiteSpace(request.SourceIp))
{
if (!IsIpAllowedInternal(request.SourceIp, config.AllowedIps))
{
errors.Add($"Source IP {request.SourceIp} not in allowlist.");
failed |= WebhookValidationChecks.IpAllowed;
}
else
{
passed |= WebhookValidationChecks.IpAllowed;
}
}
else
{
passed |= WebhookValidationChecks.IpAllowed;
}
if (_options.EnableReplayProtection && request.Timestamp.HasValue)
{
var now = _timeProvider.GetUtcNow();
var age = now - request.Timestamp.Value;
if (age > config.MaxRequestAge)
{
errors.Add($"Request too old ({age.TotalSeconds:F0}s > {config.MaxRequestAge.TotalSeconds:F0}s).");
failed |= WebhookValidationChecks.NotExpired;
}
else if (age < TimeSpan.FromSeconds(-30))
{
errors.Add("Request timestamp is in the future.");
failed |= WebhookValidationChecks.NotExpired;
}
else
{
passed |= WebhookValidationChecks.NotExpired;
}
if (!string.IsNullOrWhiteSpace(request.Signature))
{
var nonceStored = await TryStoreNonceAsync(
request.TenantId,
request.ChannelId,
request.Signature,
now + _options.NonceCacheExpiry,
cancellationToken)
.ConfigureAwait(false);
if (!nonceStored)
{
errors.Add("Duplicate request (replay detected).");
failed |= WebhookValidationChecks.NotReplay;
}
else
{
passed |= WebhookValidationChecks.NotReplay;
}
}
else
{
passed |= WebhookValidationChecks.NotReplay;
}
}
else
{
passed |= WebhookValidationChecks.NotExpired | WebhookValidationChecks.NotReplay;
}
if (errors.Count > 0)
{
_logger.LogWarning(
"Webhook validation failed for tenant {TenantId} channel {ChannelId}: {Errors}",
request.TenantId,
request.ChannelId,
string.Join("; ", errors));
return WebhookValidationResult.Invalid(passed, failed, errors);
}
return WebhookValidationResult.Valid(passed, warnings.Count > 0 ? warnings : null);
}
public string GenerateSignature(string payload, string secretKey)
=> GenerateSignature(payload, secretKey, _options.DefaultAlgorithm, "hex", null);
public Task RegisterWebhookAsync(
WebhookSecurityConfig config,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(config);
return InTenantDbContextAsync(
config.TenantId,
"writer",
async dbContext =>
{
var existing = await dbContext.WebhookSecurityConfigs
.FirstOrDefaultAsync(
row => row.TenantId == config.TenantId && row.ChannelId == config.ChannelId,
cancellationToken)
.ConfigureAwait(false);
var now = _timeProvider.GetUtcNow();
var entity = new WebhookSecurityConfigEntity
{
Id = existing?.Id ?? Guid.NewGuid(),
TenantId = config.TenantId,
ChannelId = config.ChannelId,
SecretKey = config.SecretKey,
Algorithm = config.Algorithm,
SignatureHeader = config.SignatureHeader,
SignatureFormat = config.SignatureFormat,
SignaturePrefix = config.SignaturePrefix,
TimestampHeader = config.TimestampHeader,
MaxRequestAgeSeconds = (int)Math.Round(config.MaxRequestAge.TotalSeconds),
EnforceIpAllowlist = config.EnforceIpAllowlist,
AllowedIps = Serialize(config.AllowedIps ?? Array.Empty<string>()),
RequireSignature = config.RequireSignature,
Enabled = config.Enabled,
CreatedAt = existing?.CreatedAt ?? (config.CreatedAt == default ? now : config.CreatedAt),
UpdatedAt = now,
};
if (existing is null)
{
dbContext.WebhookSecurityConfigs.Add(entity);
}
else
{
dbContext.Entry(existing).CurrentValues.SetValues(entity);
}
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Registered webhook security config for tenant {TenantId} channel {ChannelId}.",
config.TenantId,
config.ChannelId);
},
cancellationToken);
}
public Task<WebhookSecurityConfig?> GetConfigAsync(
string tenantId,
string channelId,
CancellationToken cancellationToken = default)
=> InTenantDbContextAsync(
tenantId,
"reader",
async dbContext =>
{
var entity = await dbContext.WebhookSecurityConfigs
.AsNoTracking()
.FirstOrDefaultAsync(
row => row.TenantId == tenantId && row.ChannelId == channelId,
cancellationToken)
.ConfigureAwait(false);
return entity is null ? null : MapConfig(entity);
},
cancellationToken);
public async Task UpdateAllowlistAsync(
string tenantId,
string channelId,
IReadOnlyList<string> allowedIps,
string actor,
CancellationToken cancellationToken = default)
{
var config = await GetConfigAsync(tenantId, channelId, cancellationToken).ConfigureAwait(false);
if (config is null)
{
throw new InvalidOperationException($"No config found for tenant {tenantId} channel {channelId}");
}
await RegisterWebhookAsync(
config with
{
AllowedIps = allowedIps,
UpdatedAt = _timeProvider.GetUtcNow(),
},
cancellationToken)
.ConfigureAwait(false);
_logger.LogInformation(
"Updated IP allowlist for tenant {TenantId} channel {ChannelId} by {Actor}. IPs: {IpCount}",
tenantId,
channelId,
actor,
allowedIps.Count);
}
public async Task<bool> IsIpAllowedAsync(
string tenantId,
string channelId,
string ipAddress,
CancellationToken cancellationToken = default)
{
var config = await GetConfigAsync(tenantId, channelId, cancellationToken).ConfigureAwait(false);
if (config is null || !config.EnforceIpAllowlist)
{
return true;
}
return IsIpAllowedInternal(ipAddress, config.AllowedIps);
}
public async Task<WebhookSecretRotationResult> RotateSecretAsync(
string tenantId,
string channelId,
CancellationToken cancellationToken = default)
{
var now = _timeProvider.GetUtcNow();
var existing = await GetConfigAsync(tenantId, channelId, cancellationToken).ConfigureAwait(false);
var newSecret = Convert.ToHexString(RandomNumberGenerator.GetBytes(16)).ToLowerInvariant();
var updatedConfig = existing is null
? new WebhookSecurityConfig
{
ConfigId = $"wh-{Guid.NewGuid():N}"[..16],
TenantId = tenantId,
ChannelId = channelId,
SecretKey = newSecret,
Algorithm = _options.DefaultAlgorithm,
SignatureHeader = _options.DefaultSignatureHeader,
SignatureFormat = "hex",
MaxRequestAge = _options.DefaultMaxRequestAge,
CreatedAt = now,
UpdatedAt = now,
}
: existing with
{
SecretKey = newSecret,
UpdatedAt = now,
};
await RegisterWebhookAsync(updatedConfig, cancellationToken).ConfigureAwait(false);
return new WebhookSecretRotationResult
{
Success = true,
NewSecret = newSecret,
ActiveAt = now,
OldSecretExpiresAt = null,
};
}
public string? GetMaskedSecret(string tenantId, string channelId)
{
var config = GetConfigAsync(tenantId, channelId, CancellationToken.None)
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
if (config is null || string.IsNullOrWhiteSpace(config.SecretKey))
{
return null;
}
var secret = config.SecretKey;
if (secret.Length <= 4)
{
return "****";
}
return $"{secret[..2]}****{secret[^2..]}";
}
private Task<bool> TryStoreNonceAsync(
string tenantId,
string channelId,
string nonce,
DateTimeOffset expiresAt,
CancellationToken cancellationToken)
=> InTenantDbContextAsync(
tenantId,
"writer",
async dbContext =>
{
var now = _timeProvider.GetUtcNow();
await dbContext.WebhookValidationNonces
.Where(row => row.ExpiresAt <= now)
.ExecuteDeleteAsync(cancellationToken)
.ConfigureAwait(false);
dbContext.WebhookValidationNonces.Add(new WebhookValidationNonceEntity
{
Id = Guid.NewGuid(),
TenantId = tenantId,
ChannelId = channelId,
Nonce = nonce,
CreatedAt = now,
ExpiresAt = expiresAt,
});
try
{
await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
return true;
}
catch (DbUpdateException exception) when (IsUniqueViolation(exception))
{
return false;
}
},
cancellationToken);
private bool IsIpAllowedInternal(string ipAddress, IReadOnlyList<string> allowedIps)
{
if (!IPAddress.TryParse(ipAddress, out var ip))
{
return false;
}
foreach (var allowedIp in _options.GlobalAllowedIps)
{
if (IpMatchesPattern(ip, allowedIp))
{
return true;
}
}
foreach (var allowedIp in allowedIps)
{
if (IpMatchesPattern(ip, allowedIp))
{
return true;
}
}
return false;
}
private static bool IpMatchesPattern(IPAddress ip, string pattern)
{
if (string.IsNullOrWhiteSpace(pattern))
{
return false;
}
if (pattern.Contains('/'))
{
return IsInCidrRange(ip, pattern);
}
return IPAddress.TryParse(pattern, out var allowedIp) && ip.Equals(allowedIp);
}
private static bool IsInCidrRange(IPAddress ip, string cidr)
{
var parts = cidr.Split('/', 2, StringSplitOptions.TrimEntries);
if (parts.Length != 2 ||
!IPAddress.TryParse(parts[0], out var network) ||
!int.TryParse(parts[1], out var prefixLength))
{
return false;
}
var ipBytes = ip.GetAddressBytes();
var networkBytes = network.GetAddressBytes();
if (ipBytes.Length != networkBytes.Length)
{
return false;
}
var fullBytes = prefixLength / 8;
var remainingBits = prefixLength % 8;
for (var index = 0; index < fullBytes; index++)
{
if (ipBytes[index] != networkBytes[index])
{
return false;
}
}
if (remainingBits == 0)
{
return true;
}
var mask = (byte)(0xFF << (8 - remainingBits));
return (ipBytes[fullBytes] & mask) == (networkBytes[fullBytes] & mask);
}
private static bool CompareSignatures(string actual, string expected)
{
var actualBytes = Encoding.UTF8.GetBytes(actual);
var expectedBytes = Encoding.UTF8.GetBytes(expected);
return CryptographicOperations.FixedTimeEquals(actualBytes, expectedBytes);
}
private static string GenerateSignature(
string payload,
string secretKey,
string algorithm,
string format,
string? prefix)
{
var keyBytes = Encoding.UTF8.GetBytes(secretKey);
var payloadBytes = Encoding.UTF8.GetBytes(payload);
using var hmac = CreateHmac(algorithm, keyBytes);
var hash = hmac.ComputeHash(payloadBytes);
var signature = format.ToLowerInvariant() switch
{
"base64" => Convert.ToBase64String(hash),
"base64url" => Convert.ToBase64String(hash).Replace('+', '-').Replace('/', '_').TrimEnd('='),
_ => Convert.ToHexString(hash).ToLowerInvariant(),
};
return prefix is not null ? $"{prefix}{signature}" : signature;
}
private static HMAC CreateHmac(string algorithm, byte[] key)
=> algorithm.ToUpperInvariant() switch
{
"SHA384" => new HMACSHA384(key),
"SHA512" => new HMACSHA512(key),
_ => new HMACSHA256(key),
};
private static WebhookSecurityConfig MapConfig(WebhookSecurityConfigEntity entity)
{
var allowedIps = DeserializeList(entity.AllowedIps);
return new WebhookSecurityConfig
{
ConfigId = entity.Id.ToString("N"),
TenantId = entity.TenantId,
ChannelId = entity.ChannelId,
SecretKey = entity.SecretKey,
Algorithm = entity.Algorithm,
SignatureHeader = entity.SignatureHeader,
SignatureFormat = entity.SignatureFormat,
SignaturePrefix = entity.SignaturePrefix,
TimestampHeader = entity.TimestampHeader,
MaxRequestAge = TimeSpan.FromSeconds(entity.MaxRequestAgeSeconds),
EnforceIpAllowlist = entity.EnforceIpAllowlist,
AllowedIps = allowedIps,
RequireSignature = entity.RequireSignature,
Enabled = entity.Enabled,
CreatedAt = entity.CreatedAt,
UpdatedAt = entity.UpdatedAt,
};
}
private static bool IsUniqueViolation(DbUpdateException exception)
=> exception.InnerException is PostgresException postgres && postgres.SqlState == PostgresErrorCodes.UniqueViolation;
private static string Serialize<T>(T value) => JsonSerializer.Serialize(value, JsonOptions);
private static IReadOnlyList<string> DeserializeList(string? json)
=> string.IsNullOrWhiteSpace(json)
? Array.Empty<string>()
: (JsonSerializer.Deserialize<List<string>>(json, JsonOptions) ?? new List<string>());
}