Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
api-governance / spectral-lint (push) Has been cancelled
oas-ci / oas-validate (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Policy Simulation / policy-simulate (push) Has been cancelled
SDK Publish & Sign / sdk-publish (push) Has been cancelled
350 lines
14 KiB
C#
350 lines
14 KiB
C#
using System.Collections.Concurrent;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
|
|
namespace StellaOps.Notifier.Worker.Observability;
|
|
|
|
/// <summary>
/// Contract for the dead-letter store: the place where notification deliveries end up
/// once they have failed permanently after all retries.
/// </summary>
public interface IDeadLetterHandler
{
    /// <summary>
    /// Moves a delivery to the dead-letter queue.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the delivery.</param>
    /// <param name="deliveryId">Identifier of the delivery that failed.</param>
    /// <param name="reason">Why the delivery is being dead-lettered.</param>
    /// <param name="channelType">Channel the delivery was addressed to.</param>
    /// <param name="payload">Optional original payload to keep alongside the record.</param>
    /// <param name="exception">Optional exception that caused the failure.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>The dead-letter record created for the delivery.</returns>
    Task<DeadLetteredDelivery> DeadLetterAsync(string tenantId, string deliveryId, DeadLetterReason reason, string channelType, object? payload = null, Exception? exception = null, CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets dead-lettered deliveries for a tenant.
    /// </summary>
    /// <param name="tenantId">Tenant whose records are listed.</param>
    /// <param name="query">Optional filter and paging settings.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>The matching dead-letter records.</returns>
    Task<IReadOnlyList<DeadLetteredDelivery>> GetAsync(string tenantId, DeadLetterQuery? query = null, CancellationToken cancellationToken = default);

    /// <summary>
    /// Retries a dead-lettered delivery.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the record.</param>
    /// <param name="deadLetterId">Identifier of the dead-letter record to retry.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>The outcome of the retry attempt.</returns>
    Task<DeadLetterRetryResult> RetryAsync(string tenantId, string deadLetterId, CancellationToken cancellationToken = default);

    /// <summary>
    /// Retries all matching dead-lettered deliveries.
    /// </summary>
    /// <param name="tenantId">Tenant whose records are retried.</param>
    /// <param name="query">Optional filter selecting which records to retry.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>Aggregate counts plus the per-record results.</returns>
    Task<DeadLetterBulkRetryResult> RetryBulkAsync(string tenantId, DeadLetterQuery? query = null, CancellationToken cancellationToken = default);

    /// <summary>
    /// Discards a dead-lettered delivery.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the record.</param>
    /// <param name="deadLetterId">Identifier of the dead-letter record to discard.</param>
    /// <param name="reason">Optional free-text explanation for the discard.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>A flag reporting whether the record was discarded.</returns>
    Task<bool> DiscardAsync(string tenantId, string deadLetterId, string? reason = null, CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets statistics about dead-lettered deliveries.
    /// </summary>
    /// <param name="tenantId">Tenant to report on; optional.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>A statistics snapshot.</returns>
    Task<DeadLetterStats> GetStatsAsync(string? tenantId = null, CancellationToken cancellationToken = default);

    /// <summary>
    /// Purges old dead-lettered deliveries.
    /// </summary>
    /// <param name="tenantId">Tenant to purge; may be <c>null</c>.</param>
    /// <param name="olderThan">Age threshold for removal.</param>
    /// <param name="cancellationToken">Token for cancelling the operation.</param>
    /// <returns>The number of records removed.</returns>
    Task<int> PurgeAsync(string? tenantId, TimeSpan olderThan, CancellationToken cancellationToken = default);
}
|
|
|
|
/// <summary>
/// Reason for dead-lettering.
/// </summary>
/// <remarks>
/// The numeric values are written out explicitly and equal the implicit
/// declaration-order values, so existing casts and serialized numbers are unaffected.
/// </remarks>
public enum DeadLetterReason
{
    MaxRetriesExceeded = 0,
    InvalidPayload = 1,
    ChannelUnavailable = 2,
    AuthenticationFailed = 3,
    RateLimited = 4,
    TemplateRenderFailed = 5,
    ConfigurationError = 6,
    UnknownError = 7
}
|
|
|
|
/// <summary>
/// Immutable snapshot of a delivery that has been moved to the dead-letter queue.
/// State changes are expressed by creating a modified copy (<c>with</c>).
/// </summary>
public sealed record DeadLetteredDelivery
{
    /// <summary>Identifier of this dead-letter record.</summary>
    public required string DeadLetterId { get; init; }

    /// <summary>Tenant that owns the delivery.</summary>
    public required string TenantId { get; init; }

    /// <summary>Identifier of the delivery that failed.</summary>
    public required string DeliveryId { get; init; }

    /// <summary>Channel the delivery was addressed to.</summary>
    public required string ChannelType { get; init; }

    /// <summary>Why the delivery was dead-lettered.</summary>
    public required DeadLetterReason Reason { get; init; }

    /// <summary>Optional free-text detail accompanying <see cref="Reason"/>.</summary>
    public string? ReasonDetails { get; init; }

    /// <summary>Optional original payload of the delivery.</summary>
    public object? OriginalPayload { get; init; }

    /// <summary>Optional type name of the exception that caused the failure.</summary>
    public string? ExceptionType { get; init; }

    /// <summary>Optional message of the exception that caused the failure.</summary>
    public string? ExceptionMessage { get; init; }

    /// <summary>Number of delivery attempts; defaults to 0 when not supplied.</summary>
    public int AttemptCount { get; init; }

    /// <summary>Timestamp of the first delivery attempt.</summary>
    public DateTimeOffset FirstAttemptAt { get; init; }

    /// <summary>Timestamp at which the delivery was dead-lettered.</summary>
    public DateTimeOffset DeadLetteredAt { get; init; }

    /// <summary>Timestamp of the most recent retry, or <c>null</c> when none is recorded.</summary>
    public DateTimeOffset? LastRetryAt { get; init; }

    /// <summary>Number of retries recorded for this record; defaults to 0.</summary>
    public int RetryCount { get; init; }

    /// <summary>Current status; a new record starts as <see cref="DeadLetterStatus.Pending"/>.</summary>
    public DeadLetterStatus Status { get; init; } = DeadLetterStatus.Pending;

    /// <summary>Optional explanation recorded when the record is discarded.</summary>
    public string? DiscardReason { get; init; }
}
|
|
|
|
/// <summary>
/// Status of a dead-lettered delivery.
/// </summary>
/// <remarks>
/// The numeric values are written out explicitly and equal the implicit
/// declaration-order values; <see cref="Pending"/> remains the zero/default value.
/// </remarks>
public enum DeadLetterStatus
{
    Pending = 0,
    Retrying = 1,
    Retried = 2,
    Discarded = 3
}
|
|
|
|
/// <summary>
/// Filter and paging settings for listing dead-lettered deliveries.
/// Every filter is optional; an unset filter does not restrict the result.
/// </summary>
public sealed record DeadLetterQuery
{
    /// <summary>Restricts results to this reason when set.</summary>
    public DeadLetterReason? Reason { get; init; }

    /// <summary>Restricts results to this channel type when set.</summary>
    public string? ChannelType { get; init; }

    /// <summary>Restricts results to this status when set.</summary>
    public DeadLetterStatus? Status { get; init; }

    /// <summary>Lower bound on the dead-lettered timestamp when set.</summary>
    public DateTimeOffset? After { get; init; }

    /// <summary>Upper bound on the dead-lettered timestamp when set.</summary>
    public DateTimeOffset? Before { get; init; }

    /// <summary>Maximum number of records to return. Default: 100.</summary>
    public int Limit { get; init; } = 100;

    /// <summary>Number of records to skip before the first returned one. Default: 0.</summary>
    public int Offset { get; init; }
}
|
|
|
|
/// <summary>
/// Outcome of retrying one dead-letter record.
/// </summary>
public sealed record DeadLetterRetryResult
{
    /// <summary>Identifier of the dead-letter record the retry was requested for.</summary>
    public required string DeadLetterId { get; init; }

    /// <summary>Whether the retry succeeded. Default: <c>false</c>.</summary>
    public bool Success { get; init; }

    /// <summary>Error description when the retry did not succeed; otherwise <c>null</c>.</summary>
    public string? Error { get; init; }

    /// <summary>Status reported for the record after the attempt.</summary>
    public DeadLetterStatus NewStatus { get; init; }
}
|
|
|
|
/// <summary>
/// Aggregate outcome of a bulk retry, together with the individual results.
/// </summary>
public sealed record DeadLetterBulkRetryResult
{
    /// <summary>Number of retries attempted.</summary>
    public int Total { get; init; }

    /// <summary>Number of retries that succeeded.</summary>
    public int Succeeded { get; init; }

    /// <summary>Number of retries that failed.</summary>
    public int Failed { get; init; }

    /// <summary>Per-record results; empty unless supplied.</summary>
    public IReadOnlyList<DeadLetterRetryResult> Results { get; init; } = [];
}
|
|
|
|
/// <summary>
/// Point-in-time counts describing the dead-letter queue.
/// </summary>
public sealed record DeadLetterStats
{
    /// <summary>When the statistics were computed.</summary>
    public DateTimeOffset Timestamp { get; init; }

    /// <summary>Tenant the statistics were requested for, or <c>null</c> when none was specified.</summary>
    public string? TenantId { get; init; }

    /// <summary>Total number of dead-letter records counted.</summary>
    public int TotalCount { get; init; }

    /// <summary>Records whose status is <see cref="DeadLetterStatus.Pending"/>.</summary>
    public int PendingCount { get; init; }

    /// <summary>Records whose status is <see cref="DeadLetterStatus.Retrying"/>.</summary>
    public int RetryingCount { get; init; }

    /// <summary>Records whose status is <see cref="DeadLetterStatus.Retried"/>.</summary>
    public int RetriedCount { get; init; }

    /// <summary>Records whose status is <see cref="DeadLetterStatus.Discarded"/>.</summary>
    public int DiscardedCount { get; init; }

    /// <summary>Record count per dead-letter reason; empty unless supplied.</summary>
    public IReadOnlyDictionary<DeadLetterReason, int> ByReason { get; init; } = new Dictionary<DeadLetterReason, int>();

    /// <summary>Record count per channel type; empty unless supplied.</summary>
    public IReadOnlyDictionary<string, int> ByChannel { get; init; } = new Dictionary<string, int>();

    /// <summary>Dead-lettered timestamp of the oldest record, or <c>null</c> when there is none.</summary>
    public DateTimeOffset? OldestDeadLetterAt { get; init; }

    /// <summary>Dead-lettered timestamp of the newest record, or <c>null</c> when there is none.</summary>
    public DateTimeOffset? NewestDeadLetterAt { get; init; }
}
|
|
|
|
/// <summary>
/// Configuration for dead-letter handling, bound from the
/// <see cref="SectionName"/> configuration section.
/// </summary>
public sealed class DeadLetterOptions
{
    /// <summary>Configuration section these options are read from.</summary>
    public const string SectionName = "Notifier:Observability:DeadLetter";

    /// <summary>Whether dead-letter handling is enabled. Default: <c>true</c>.</summary>
    public bool Enabled { get; set; } = true;

    /// <summary>Maximum number of retry attempts. Default: 3.</summary>
    public int MaxRetryAttempts { get; set; } = 3;

    /// <summary>Delay associated with retries. Default: 5 minutes.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromMinutes(5);

    /// <summary>Retention period for dead-letter records. Default: 30 days.</summary>
    public TimeSpan RetentionPeriod { get; set; } = TimeSpan.FromDays(30);

    /// <summary>Whether automatic purging is enabled. Default: <c>true</c>.</summary>
    public bool AutoPurge { get; set; } = true;

    /// <summary>Interval associated with automatic purging. Default: 24 hours.</summary>
    public TimeSpan PurgeInterval { get; set; } = TimeSpan.FromHours(24);

    /// <summary>Threshold associated with alerting. Default: 100.</summary>
    public int AlertThreshold { get; set; } = 100;
}
|
|
|
|
/// <summary>
/// In-memory implementation of dead-letter handler.
/// </summary>
/// <remarks>
/// Records are held in one <see cref="List{T}"/> per tenant inside a
/// <see cref="ConcurrentDictionary{TKey, TValue}"/>. Every read or write of a tenant's list is
/// performed while holding a lock on that list instance; records themselves are immutable and
/// are replaced, never mutated. Nothing is persisted.
/// </remarks>
public sealed class InMemoryDeadLetterHandler : IDeadLetterHandler
{
    private readonly ConcurrentDictionary<string, List<DeadLetteredDelivery>> _deadLetters = new();
    private readonly DeadLetterOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly INotifierMetrics? _metrics;
    private readonly ILogger<InMemoryDeadLetterHandler> _logger;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="options">Dead-letter options; default options are used when <c>null</c>.</param>
    /// <param name="timeProvider">Clock used for all timestamps; <see cref="TimeProvider.System"/> when <c>null</c>.</param>
    /// <param name="metrics">Optional metrics sink; no metrics are recorded when <c>null</c>.</param>
    /// <param name="logger">Logger; required.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is <c>null</c>.</exception>
    public InMemoryDeadLetterHandler(
        IOptions<DeadLetterOptions> options,
        TimeProvider timeProvider,
        INotifierMetrics? metrics,
        ILogger<InMemoryDeadLetterHandler> logger)
    {
        _options = options?.Value ?? new DeadLetterOptions();
        _timeProvider = timeProvider ?? TimeProvider.System;
        _metrics = metrics;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Creates a <see cref="DeadLetterStatus.Pending"/> record for the delivery, appends it to the
    /// tenant's list, records a metric (when a metrics sink is present) and logs a warning.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the delivery.</param>
    /// <param name="deliveryId">Identifier of the failed delivery.</param>
    /// <param name="reason">Why the delivery is dead-lettered.</param>
    /// <param name="channelType">Channel the delivery was addressed to.</param>
    /// <param name="payload">Optional payload stored on the record as-is.</param>
    /// <param name="exception">Optional exception; its type name and message are copied to the record.</param>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns>The record that was stored.</returns>
    public Task<DeadLetteredDelivery> DeadLetterAsync(
        string tenantId,
        string deliveryId,
        DeadLetterReason reason,
        string channelType,
        object? payload = null,
        Exception? exception = null,
        CancellationToken cancellationToken = default)
    {
        var now = _timeProvider.GetUtcNow();
        var deadLetter = new DeadLetteredDelivery
        {
            // "dl-" followed by the first 13 hex digits of a fresh GUID (16 characters in total).
            DeadLetterId = $"dl-{Guid.NewGuid():N}"[..16],
            TenantId = tenantId,
            DeliveryId = deliveryId,
            ChannelType = channelType,
            Reason = reason,
            ReasonDetails = exception?.Message,
            OriginalPayload = payload,
            ExceptionType = exception?.GetType().FullName,
            ExceptionMessage = exception?.Message,
            DeadLetteredAt = now,
            FirstAttemptAt = now,
            Status = DeadLetterStatus.Pending
        };

        var list = _deadLetters.GetOrAdd(tenantId, _ => []);
        lock (list)
        {
            list.Add(deadLetter);
        }

        _metrics?.RecordDeadLetter(tenantId, reason.ToString(), channelType);
        _logger.LogWarning("Dead-lettered delivery {DeliveryId} for tenant {TenantId}: {Reason}", deliveryId, tenantId, reason);

        return Task.FromResult(deadLetter);
    }

    /// <summary>
    /// Returns the tenant's records, newest first, after applying the optional filters and paging.
    /// </summary>
    /// <param name="tenantId">Tenant whose records are listed.</param>
    /// <param name="query">Optional filters and paging; without a query the newest 100 records are returned.</param>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns>A copy of the matching records; empty when the tenant has none.</returns>
    public Task<IReadOnlyList<DeadLetteredDelivery>> GetAsync(
        string tenantId,
        DeadLetterQuery? query = null,
        CancellationToken cancellationToken = default)
    {
        if (!_deadLetters.TryGetValue(tenantId, out var list))
        {
            return Task.FromResult<IReadOnlyList<DeadLetteredDelivery>>([]);
        }

        // Filter a private copy so the lock is not held while the query runs.
        IEnumerable<DeadLetteredDelivery> filtered = Snapshot(list);

        if (query is not null)
        {
            if (query.Reason.HasValue) filtered = filtered.Where(d => d.Reason == query.Reason.Value);
            if (!string.IsNullOrEmpty(query.ChannelType)) filtered = filtered.Where(d => d.ChannelType == query.ChannelType);
            if (query.Status.HasValue) filtered = filtered.Where(d => d.Status == query.Status.Value);
            if (query.After.HasValue) filtered = filtered.Where(d => d.DeadLetteredAt > query.After.Value);
            if (query.Before.HasValue) filtered = filtered.Where(d => d.DeadLetteredAt < query.Before.Value);
        }

        var result = filtered
            .OrderByDescending(d => d.DeadLetteredAt)
            .Skip(query?.Offset ?? 0)
            .Take(query?.Limit ?? 100)
            .ToList();

        return Task.FromResult<IReadOnlyList<DeadLetteredDelivery>>(result);
    }

    /// <summary>
    /// Marks a record as <see cref="DeadLetterStatus.Retried"/>, stamps the retry time and
    /// increments its retry count. No delivery is re-sent by this method itself.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the record.</param>
    /// <param name="deadLetterId">Identifier of the record to retry.</param>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns>
    /// A successful result with <see cref="DeadLetterStatus.Retried"/>, or a failed result with
    /// error <c>"Not found"</c> when the tenant or the record does not exist.
    /// </returns>
    public Task<DeadLetterRetryResult> RetryAsync(string tenantId, string deadLetterId, CancellationToken cancellationToken = default)
    {
        if (!_deadLetters.TryGetValue(tenantId, out var list))
        {
            return Task.FromResult(NotFound(deadLetterId));
        }

        // Look up and replace under ONE lock: the update must be based on the record as it is
        // now (not on an earlier snapshot that a concurrent discard may have superseded), and a
        // record purged in the meantime must be reported as not found rather than as a success.
        lock (list)
        {
            var index = list.FindIndex(d => d.DeadLetterId == deadLetterId);
            if (index < 0)
            {
                return Task.FromResult(NotFound(deadLetterId));
            }

            var current = list[index];
            list[index] = current with
            {
                Status = DeadLetterStatus.Retried,
                LastRetryAt = _timeProvider.GetUtcNow(),
                RetryCount = current.RetryCount + 1
            };
        }

        _logger.LogInformation("Retrying dead-lettered delivery {DeadLetterId} for tenant {TenantId}", deadLetterId, tenantId);
        return Task.FromResult(new DeadLetterRetryResult { DeadLetterId = deadLetterId, Success = true, NewStatus = DeadLetterStatus.Retried });
    }

    /// <summary>
    /// Retries the <see cref="DeadLetterStatus.Pending"/> records that match the query.
    /// </summary>
    /// <param name="tenantId">Tenant whose records are retried.</param>
    /// <param name="query">
    /// Optional filters and paging. Only pending records are ever retried, so a query that asks
    /// for any other status selects nothing. Paging is applied to the pending records.
    /// </param>
    /// <param name="cancellationToken">Passed through to <see cref="GetAsync"/> and <see cref="RetryAsync"/>.</param>
    /// <returns>Counts of attempted, succeeded and failed retries plus the individual results.</returns>
    public async Task<DeadLetterBulkRetryResult> RetryBulkAsync(string tenantId, DeadLetterQuery? query = null, CancellationToken cancellationToken = default)
    {
        if (query?.Status is { } requested && requested != DeadLetterStatus.Pending)
        {
            return new DeadLetterBulkRetryResult();
        }

        // Push the pending filter into the query so Limit/Offset page over pending records only;
        // filtering after paging would let non-pending records use up the page and leave
        // pending ones behind them unreachable.
        var pendingQuery = (query ?? new DeadLetterQuery()) with { Status = DeadLetterStatus.Pending };
        var deadLetters = await GetAsync(tenantId, pendingQuery, cancellationToken);

        var results = new List<DeadLetterRetryResult>(deadLetters.Count);
        foreach (var dl in deadLetters)
        {
            results.Add(await RetryAsync(tenantId, dl.DeadLetterId, cancellationToken));
        }

        var succeeded = results.Count(r => r.Success);
        return new DeadLetterBulkRetryResult
        {
            Total = results.Count,
            Succeeded = succeeded,
            Failed = results.Count - succeeded,
            Results = results
        };
    }

    /// <summary>
    /// Marks a record as <see cref="DeadLetterStatus.Discarded"/> and stores the discard reason.
    /// The record stays in the list.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the record.</param>
    /// <param name="deadLetterId">Identifier of the record to discard.</param>
    /// <param name="reason">Optional explanation stored on the record; logged as "No reason" when <c>null</c>.</param>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns><c>true</c> when the record was found and updated; otherwise <c>false</c>.</returns>
    public Task<bool> DiscardAsync(string tenantId, string deadLetterId, string? reason = null, CancellationToken cancellationToken = default)
    {
        if (!_deadLetters.TryGetValue(tenantId, out var list))
        {
            return Task.FromResult(false);
        }

        lock (list)
        {
            var index = list.FindIndex(d => d.DeadLetterId == deadLetterId);
            if (index < 0)
            {
                return Task.FromResult(false);
            }

            list[index] = list[index] with { Status = DeadLetterStatus.Discarded, DiscardReason = reason };
        }

        _logger.LogInformation("Discarded dead-lettered delivery {DeadLetterId} for tenant {TenantId}: {Reason}", deadLetterId, tenantId, reason ?? "No reason");
        return Task.FromResult(true);
    }

    /// <summary>
    /// Computes counts by status, reason and channel plus the oldest/newest dead-letter time.
    /// </summary>
    /// <param name="tenantId">Tenant to report on, or <c>null</c> to aggregate over every tenant.</param>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns>The statistics; all counts are zero and both timestamps <c>null</c> when there are no records.</returns>
    public Task<DeadLetterStats> GetStatsAsync(string? tenantId = null, CancellationToken cancellationToken = default)
    {
        // Copy each tenant list under its lock before aggregating; enumerating a List<T> while
        // another thread adds to or purges it can throw or yield inconsistent counts.
        List<DeadLetteredDelivery> all;
        if (tenantId is not null)
        {
            all = _deadLetters.TryGetValue(tenantId, out var tenantList) ? Snapshot(tenantList) : [];
        }
        else
        {
            all = [];
            foreach (var list in _deadLetters.Values)
            {
                lock (list)
                {
                    all.AddRange(list);
                }
            }
        }

        return Task.FromResult(new DeadLetterStats
        {
            Timestamp = _timeProvider.GetUtcNow(),
            TenantId = tenantId,
            TotalCount = all.Count,
            PendingCount = all.Count(d => d.Status == DeadLetterStatus.Pending),
            RetryingCount = all.Count(d => d.Status == DeadLetterStatus.Retrying),
            RetriedCount = all.Count(d => d.Status == DeadLetterStatus.Retried),
            DiscardedCount = all.Count(d => d.Status == DeadLetterStatus.Discarded),
            ByReason = all.GroupBy(d => d.Reason).ToDictionary(g => g.Key, g => g.Count()),
            ByChannel = all.GroupBy(d => d.ChannelType).ToDictionary(g => g.Key, g => g.Count()),
            OldestDeadLetterAt = all.MinBy(d => d.DeadLetteredAt)?.DeadLetteredAt,
            NewestDeadLetterAt = all.MaxBy(d => d.DeadLetteredAt)?.DeadLetteredAt
        });
    }

    /// <summary>
    /// Removes records dead-lettered before <c>now - olderThan</c>, regardless of their status.
    /// </summary>
    /// <param name="tenantId">Tenant to purge, or <c>null</c> to purge every tenant.</param>
    /// <param name="olderThan">Minimum age a record must exceed to be removed.</param>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns>The number of records removed.</returns>
    public Task<int> PurgeAsync(string? tenantId, TimeSpan olderThan, CancellationToken cancellationToken = default)
    {
        var cutoff = _timeProvider.GetUtcNow() - olderThan;
        var purged = 0;
        List<string> tenants = tenantId is not null ? [tenantId] : _deadLetters.Keys.ToList();

        foreach (var t in tenants)
        {
            if (!_deadLetters.TryGetValue(t, out var list))
            {
                continue;
            }

            lock (list)
            {
                purged += list.RemoveAll(d => d.DeadLetteredAt < cutoff);
            }
        }

        _logger.LogInformation("Purged {Count} dead-lettered deliveries older than {OlderThan}", purged, olderThan);
        return Task.FromResult(purged);
    }

    /// <summary>Copies a tenant list while holding its lock.</summary>
    private static List<DeadLetteredDelivery> Snapshot(List<DeadLetteredDelivery> list)
    {
        lock (list)
        {
            return list.ToList();
        }
    }

    /// <summary>Builds the failed result returned when the tenant or record does not exist.</summary>
    private static DeadLetterRetryResult NotFound(string deadLetterId) =>
        new() { DeadLetterId = deadLetterId, Success = false, Error = "Not found", NewStatus = DeadLetterStatus.Pending };
}
|