up
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
SDK Publish & Sign / sdk-publish (push) Has been cancelled
sdk-generator-smoke / sdk-smoke (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-11-27 08:51:10 +02:00
parent ea970ead2a
commit c34fb7256d
126 changed files with 18553 additions and 693 deletions

View File

@@ -0,0 +1,298 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using StellaOps.Notifier.Worker.DeadLetter;
using StellaOps.Notifier.Worker.Observability;
namespace StellaOps.Notifier.Worker.Retention;
/// <summary>
/// Default implementation of retention policy service.
/// </summary>
/// <remarks>
/// Tenant policies and the most recent execution per tenant are kept in in-memory
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> instances owned by this object.
/// The only data this implementation purges is dead-letter entries, through
/// <see cref="IDeadLetterService"/>; the other categories are reported with a count of zero.
/// </remarks>
public sealed class DefaultRetentionPolicyService : IRetentionPolicyService
{
    private const string CancelledMessage = "Operation was cancelled";

    private readonly ConcurrentDictionary<string, RetentionPolicy> _policies = new();
    private readonly ConcurrentDictionary<string, RetentionCleanupExecution> _lastExecutions = new();
    private readonly IDeadLetterService _deadLetterService;
    private readonly TimeProvider _timeProvider;
    private readonly INotifyMetrics? _metrics;
    private readonly ILogger<DefaultRetentionPolicyService> _logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="deadLetterService">Dead-letter store used for statistics and purging.</param>
    /// <param name="timeProvider">Clock source; falls back to <see cref="TimeProvider.System"/> when null.</param>
    /// <param name="logger">Logger for cleanup and policy-change messages.</param>
    /// <param name="metrics">Optional metrics sink; nothing is recorded when null.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="deadLetterService"/> or <paramref name="logger"/> is null.
    /// </exception>
    public DefaultRetentionPolicyService(
        IDeadLetterService deadLetterService,
        TimeProvider timeProvider,
        ILogger<DefaultRetentionPolicyService> logger,
        INotifyMetrics? metrics = null)
    {
        _deadLetterService = deadLetterService ?? throw new ArgumentNullException(nameof(deadLetterService));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _metrics = metrics;
    }

    /// <summary>
    /// Returns the policy stored for the tenant, or <see cref="RetentionPolicy.Default"/>
    /// when none has been set.
    /// </summary>
    /// <param name="tenantId">Tenant identifier; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Not observed; the lookup completes synchronously.</param>
    /// <returns>A completed task carrying the effective policy.</returns>
    /// <exception cref="ArgumentException"><paramref name="tenantId"/> is null or whitespace.</exception>
    public Task<RetentionPolicy> GetPolicyAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        // Only touch RetentionPolicy.Default when there is no stored policy, so the
        // common "policy exists" path does not evaluate the fallback at all.
        var policy = _policies.TryGetValue(tenantId, out var configured)
            ? configured
            : RetentionPolicy.Default;

        return Task.FromResult(policy);
    }

    /// <summary>
    /// Stores (or replaces) the policy for the tenant and logs the new delivery and audit retention.
    /// </summary>
    /// <param name="tenantId">Tenant identifier; must not be null or whitespace.</param>
    /// <param name="policy">Policy to store; must not be null.</param>
    /// <param name="cancellationToken">Not observed; the update completes synchronously.</param>
    /// <exception cref="ArgumentException"><paramref name="tenantId"/> is null or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="policy"/> is null.</exception>
    public Task SetPolicyAsync(
        string tenantId,
        RetentionPolicy policy,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        ArgumentNullException.ThrowIfNull(policy);

        _policies[tenantId] = policy;

        _logger.LogInformation(
            "Updated retention policy for tenant {TenantId}: DeliveryRetention={DeliveryRetention}, AuditRetention={AuditRetention}",
            tenantId, policy.DeliveryRetention, policy.AuditRetention);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Runs cleanup for one tenant using its effective policy and records the outcome as
    /// the tenant's last execution.
    /// </summary>
    /// <remarks>
    /// Exceptions raised by the cleanup itself, including cancellation, are not propagated:
    /// they are logged and reported through a result with <c>Success = false</c>.
    /// </remarks>
    /// <param name="tenantId">Tenant identifier; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token passed through to the underlying purge.</param>
    /// <returns>The cleanup result, successful or not.</returns>
    /// <exception cref="ArgumentException"><paramref name="tenantId"/> is null or whitespace.</exception>
    public async Task<RetentionCleanupResult> ExecuteCleanupAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        var executionId = Guid.NewGuid().ToString("N");
        var startedAt = _timeProvider.GetUtcNow();
        var policy = await GetPolicyAsync(tenantId, cancellationToken).ConfigureAwait(false);

        var execution = new RetentionCleanupExecution
        {
            ExecutionId = executionId,
            TenantId = tenantId,
            StartedAt = startedAt,
            Status = RetentionCleanupStatus.Running,
            PolicyUsed = policy
        };

        // Publish the "Running" state before doing any work so it is visible to
        // GetLastExecutionAsync while the cleanup is in flight.
        _lastExecutions[tenantId] = execution;

        _logger.LogInformation(
            "Starting retention cleanup {ExecutionId} for tenant {TenantId}",
            executionId, tenantId);

        try
        {
            var counts = await ExecuteCleanupInternalAsync(tenantId, policy, cancellationToken)
                .ConfigureAwait(false);

            var completedAt = _timeProvider.GetUtcNow();
            var duration = completedAt - startedAt;

            _lastExecutions[tenantId] = execution with
            {
                CompletedAt = completedAt,
                Status = RetentionCleanupStatus.Completed,
                Counts = counts
            };

            _logger.LogInformation(
                "Completed retention cleanup {ExecutionId} for tenant {TenantId}: {Total} items deleted in {Duration}ms",
                executionId, tenantId, counts.Total, duration.TotalMilliseconds);

            return new RetentionCleanupResult
            {
                TenantId = tenantId,
                Success = true,
                ExecutedAt = startedAt,
                Duration = duration,
                Counts = counts
            };
        }
        catch (OperationCanceledException)
        {
            var cancelled = RecordUnsuccessfulExecution(
                execution, RetentionCleanupStatus.Cancelled, CancelledMessage);

            _logger.LogWarning(
                "Retention cleanup {ExecutionId} for tenant {TenantId} was cancelled",
                executionId, tenantId);

            return cancelled;
        }
        catch (Exception ex)
        {
            var failed = RecordUnsuccessfulExecution(
                execution, RetentionCleanupStatus.Failed, ex.Message);

            _logger.LogError(ex,
                "Retention cleanup {ExecutionId} for tenant {TenantId} failed",
                executionId, tenantId);

            return failed;
        }
    }

    /// <summary>
    /// Runs cleanup sequentially for every tenant that has a policy stored through
    /// <see cref="SetPolicyAsync"/>. Tenants relying on the default policy are not visited.
    /// </summary>
    /// <param name="cancellationToken">Checked before each tenant is processed.</param>
    /// <returns>One result per processed tenant, in processing order.</returns>
    /// <exception cref="OperationCanceledException">
    /// Cancellation was requested before a tenant's cleanup was started.
    /// </exception>
    public async Task<IReadOnlyList<RetentionCleanupResult>> ExecuteCleanupAllAsync(
        CancellationToken cancellationToken = default)
    {
        // Snapshot the keys so policies added while the run is in progress do not affect it.
        var tenantIds = _policies.Keys.ToArray();
        var results = new List<RetentionCleanupResult>();

        foreach (var tenantId in tenantIds)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var result = await ExecuteCleanupAsync(tenantId, cancellationToken).ConfigureAwait(false);
            results.Add(result);
        }

        var successful = results.Count(r => r.Success);

        _logger.LogInformation(
            "Completed retention cleanup for {Count} tenants: {Successful} successful, {Failed} failed",
            results.Count, successful, results.Count - successful);

        return results;
    }

    /// <summary>
    /// Returns the most recently recorded execution for the tenant, or null when no
    /// cleanup has been recorded for it on this instance.
    /// </summary>
    /// <param name="tenantId">Tenant identifier; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Not observed; the lookup completes synchronously.</param>
    /// <exception cref="ArgumentException"><paramref name="tenantId"/> is null or whitespace.</exception>
    public Task<RetentionCleanupExecution?> GetLastExecutionAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        _lastExecutions.TryGetValue(tenantId, out var execution);
        return Task.FromResult(execution);
    }

    /// <summary>
    /// Computes the cutoff timestamp for every data category and an estimate of how many
    /// dead-letter entries a cleanup would remove. Nothing is deleted.
    /// </summary>
    /// <remarks>
    /// Only <see cref="RetentionCleanupCounts.DeadLetterEntries"/> is estimated; the other
    /// counts are left at zero.
    /// </remarks>
    /// <param name="tenantId">Tenant identifier; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token passed to the dead-letter statistics query.</param>
    /// <returns>The preview, including the policy applied and the per-category cutoffs.</returns>
    /// <exception cref="ArgumentException"><paramref name="tenantId"/> is null or whitespace.</exception>
    public async Task<RetentionCleanupPreview> PreviewCleanupAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        var policy = await GetPolicyAsync(tenantId, cancellationToken).ConfigureAwait(false);
        var now = _timeProvider.GetUtcNow();

        var cutoffDates = new Dictionary<string, DateTimeOffset>
        {
            ["Deliveries"] = now - policy.DeliveryRetention,
            ["AuditEntries"] = now - policy.AuditRetention,
            ["DeadLetterEntries"] = now - policy.DeadLetterRetention,
            ["StormData"] = now - policy.StormDataRetention,
            ["InboxMessages"] = now - policy.InboxRetention,
            ["Events"] = now - policy.EventHistoryRetention
        };

        // Get estimated dead-letter count
        var deadLetterStats = await _deadLetterService.GetStatsAsync(tenantId, cancellationToken)
            .ConfigureAwait(false);

        // Estimate counts based on age distribution (simplified - in production would query actual counts)
        var estimatedCounts = new RetentionCleanupCounts
        {
            DeadLetterEntries = EstimateExpiredCount(deadLetterStats, policy.DeadLetterRetention, now)
        };

        return new RetentionCleanupPreview
        {
            TenantId = tenantId,
            PreviewedAt = now,
            EstimatedCounts = estimatedCounts,
            PolicyApplied = policy,
            CutoffDates = cutoffDates
        };
    }

    /// <summary>
    /// Stores a terminal, unsuccessful state for <paramref name="execution"/> and builds the
    /// matching result. A single completion timestamp is used for both, so the stored
    /// <c>CompletedAt</c> and the returned <c>Duration</c> always agree.
    /// </summary>
    private RetentionCleanupResult RecordUnsuccessfulExecution(
        RetentionCleanupExecution execution,
        RetentionCleanupStatus status,
        string error)
    {
        var completedAt = _timeProvider.GetUtcNow();

        _lastExecutions[execution.TenantId] = execution with
        {
            CompletedAt = completedAt,
            Status = status,
            Error = error
        };

        return new RetentionCleanupResult
        {
            TenantId = execution.TenantId,
            Success = false,
            Error = error,
            ExecutedAt = execution.StartedAt,
            Duration = completedAt - execution.StartedAt,
            Counts = new RetentionCleanupCounts()
        };
    }

    /// <summary>
    /// Performs the actual purge for one tenant and returns the per-category counts.
    /// A metric is recorded only when at least one dead-letter entry was purged.
    /// </summary>
    private async Task<RetentionCleanupCounts> ExecuteCleanupInternalAsync(
        string tenantId,
        RetentionPolicy policy,
        CancellationToken cancellationToken)
    {
        // Purge expired dead-letter entries
        var deadLetterCount = await _deadLetterService.PurgeExpiredAsync(
            tenantId,
            policy.DeadLetterRetention,
            cancellationToken).ConfigureAwait(false);

        if (deadLetterCount > 0)
        {
            _metrics?.RecordRetentionCleanup(tenantId, "DeadLetter", deadLetterCount);
        }

        // In a full implementation, we would also clean up:
        // - Delivery records from delivery store
        // - Audit log entries from audit store
        // - Storm tracking data from storm store
        // - Inbox messages from inbox store
        // - Event history from event store
        // For now, return counts with just dead-letter cleanup
        return new RetentionCleanupCounts
        {
            DeadLetterEntries = deadLetterCount
        };
    }

    /// <summary>
    /// Estimates how many dead-letter entries are older than the retention cutoff, assuming
    /// entries are spread evenly between the oldest and newest timestamps in
    /// <paramref name="stats"/>.
    /// </summary>
    /// <returns>
    /// Zero when there are no entries or none is older than the cutoff; the total count when
    /// all entries share one timestamp that is past the cutoff; otherwise the total count
    /// scaled by the expired fraction of the time span, truncated towards zero.
    /// </returns>
    private static int EstimateExpiredCount(DeadLetterStats stats, TimeSpan retention, DateTimeOffset now)
    {
        if (!stats.OldestEntryAt.HasValue)
        {
            return 0;
        }

        var cutoff = now - retention;
        if (stats.OldestEntryAt.Value >= cutoff)
        {
            // Even the oldest entry is still inside the retention window.
            return 0;
        }

        // Rough estimation - assume linear distribution
        if (!stats.NewestEntryAt.HasValue || stats.TotalCount == 0)
        {
            return 0;
        }

        var totalSpan = stats.NewestEntryAt.Value - stats.OldestEntryAt.Value;
        if (totalSpan.TotalSeconds <= 0)
        {
            // No measurable spread: every entry is as old as the (expired) oldest one.
            return stats.TotalCount;
        }

        var expiredSpan = cutoff - stats.OldestEntryAt.Value;

        // Clamp so a cutoff later than the newest entry yields "all entries", never more.
        var ratio = Math.Clamp(expiredSpan.TotalSeconds / totalSpan.TotalSeconds, 0, 1);
        return (int)(stats.TotalCount * ratio);
    }
}

View File

@@ -0,0 +1,181 @@
namespace StellaOps.Notifier.Worker.Retention;
/// <summary>
/// Service for managing data retention policies and cleanup.
/// </summary>
public interface IRetentionPolicyService
{
/// <summary>
/// Gets the retention policy for a tenant.
/// </summary>
/// <param name="tenantId">Identifier of the tenant whose policy is requested.</param>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
/// <returns>The retention policy that applies to the tenant.</returns>
Task<RetentionPolicy> GetPolicyAsync(
string tenantId,
CancellationToken cancellationToken = default);
/// <summary>
/// Sets/updates the retention policy for a tenant.
/// </summary>
/// <param name="tenantId">Identifier of the tenant whose policy is being set.</param>
/// <param name="policy">The policy to apply to the tenant.</param>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
Task SetPolicyAsync(
string tenantId,
RetentionPolicy policy,
CancellationToken cancellationToken = default);
/// <summary>
/// Executes retention cleanup for a tenant.
/// </summary>
/// <param name="tenantId">Identifier of the tenant to clean up.</param>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
/// <returns>The outcome of the cleanup, including per-category counts.</returns>
Task<RetentionCleanupResult> ExecuteCleanupAsync(
string tenantId,
CancellationToken cancellationToken = default);
/// <summary>
/// Executes retention cleanup for all tenants.
/// </summary>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
/// <returns>One cleanup result per tenant that was processed.</returns>
Task<IReadOnlyList<RetentionCleanupResult>> ExecuteCleanupAllAsync(
CancellationToken cancellationToken = default);
/// <summary>
/// Gets the last cleanup execution details.
/// </summary>
/// <param name="tenantId">Identifier of the tenant to look up.</param>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
/// <returns>The last execution for the tenant, or null when there is none.</returns>
Task<RetentionCleanupExecution?> GetLastExecutionAsync(
string tenantId,
CancellationToken cancellationToken = default);
/// <summary>
/// Previews what would be cleaned up without actually deleting.
/// </summary>
/// <param name="tenantId">Identifier of the tenant to preview.</param>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
/// <returns>Estimated counts, the policy applied and the cutoff dates used.</returns>
Task<RetentionCleanupPreview> PreviewCleanupAsync(
string tenantId,
CancellationToken cancellationToken = default);
}
/// <summary>
/// Data retention policy configuration.
/// </summary>
/// <remarks>
/// All properties are init-only, so an instance cannot change after construction; use a
/// <c>with</c> expression to derive a modified copy.
/// </remarks>
public sealed record RetentionPolicy
{
    /// <summary>
    /// Retention period for delivery records.
    /// </summary>
    public TimeSpan DeliveryRetention { get; init; } = TimeSpan.FromDays(90);

    /// <summary>
    /// Retention period for audit log entries.
    /// </summary>
    public TimeSpan AuditRetention { get; init; } = TimeSpan.FromDays(365);

    /// <summary>
    /// Retention period for dead-letter entries.
    /// </summary>
    public TimeSpan DeadLetterRetention { get; init; } = TimeSpan.FromDays(30);

    /// <summary>
    /// Retention period for storm tracking data.
    /// </summary>
    public TimeSpan StormDataRetention { get; init; } = TimeSpan.FromDays(7);

    /// <summary>
    /// Retention period for inbox messages.
    /// </summary>
    public TimeSpan InboxRetention { get; init; } = TimeSpan.FromDays(30);

    /// <summary>
    /// Retention period for event history.
    /// </summary>
    public TimeSpan EventHistoryRetention { get; init; } = TimeSpan.FromDays(30);

    /// <summary>
    /// Whether automatic cleanup is enabled.
    /// </summary>
    public bool AutoCleanupEnabled { get; init; } = true;

    /// <summary>
    /// Cron expression for automatic cleanup schedule.
    /// </summary>
    public string CleanupSchedule { get; init; } = "0 2 * * *"; // Daily at 2 AM

    /// <summary>
    /// Maximum records to delete per cleanup run.
    /// </summary>
    public int MaxDeletesPerRun { get; init; } = 10000;

    /// <summary>
    /// Whether to keep resolved/acknowledged deliveries longer.
    /// </summary>
    public bool ExtendResolvedRetention { get; init; } = true;

    /// <summary>
    /// Extension multiplier for resolved items (e.g., 2x = double the retention).
    /// </summary>
    public double ResolvedRetentionMultiplier { get; init; } = 2.0;

    /// <summary>
    /// Default policy with standard retention periods.
    /// </summary>
    /// <remarks>
    /// A single shared instance is returned instead of allocating a new record on every
    /// access. Sharing is safe because the record is immutable, and value equality with
    /// <c>new RetentionPolicy()</c> is unchanged.
    /// </remarks>
    public static RetentionPolicy Default { get; } = new();
}
/// <summary>
/// Result of a retention cleanup execution.
/// </summary>
public sealed record RetentionCleanupResult
{
/// <summary>Identifier of the tenant the cleanup ran for.</summary>
public required string TenantId { get; init; }
/// <summary>Whether the cleanup finished successfully.</summary>
public required bool Success { get; init; }
/// <summary>Error description for an unsuccessful cleanup; null when not set.</summary>
public string? Error { get; init; }
/// <summary>Timestamp at which the cleanup was executed.</summary>
public required DateTimeOffset ExecutedAt { get; init; }
/// <summary>How long the cleanup took.</summary>
public TimeSpan Duration { get; init; }
/// <summary>Per-category counts for this cleanup.</summary>
public required RetentionCleanupCounts Counts { get; init; }
}
/// <summary>
/// Counts of items deleted during retention cleanup.
/// </summary>
public sealed record RetentionCleanupCounts
{
/// <summary>Number of delivery records.</summary>
public int Deliveries { get; init; }
/// <summary>Number of audit log entries.</summary>
public int AuditEntries { get; init; }
/// <summary>Number of dead-letter entries.</summary>
public int DeadLetterEntries { get; init; }
/// <summary>Number of storm tracking data items.</summary>
public int StormData { get; init; }
/// <summary>Number of inbox messages.</summary>
public int InboxMessages { get; init; }
/// <summary>Number of event history items.</summary>
public int Events { get; init; }
/// <summary>Sum of all six per-category counts.</summary>
public int Total => Deliveries + AuditEntries + DeadLetterEntries + StormData + InboxMessages + Events;
}
/// <summary>
/// Details of a cleanup execution.
/// </summary>
public sealed record RetentionCleanupExecution
{
/// <summary>Identifier of this execution.</summary>
public required string ExecutionId { get; init; }
/// <summary>Identifier of the tenant the execution belongs to.</summary>
public required string TenantId { get; init; }
/// <summary>Timestamp at which the execution started.</summary>
public required DateTimeOffset StartedAt { get; init; }
/// <summary>Timestamp at which the execution ended; null when not set.</summary>
public DateTimeOffset? CompletedAt { get; init; }
/// <summary>Current status of the execution.</summary>
public required RetentionCleanupStatus Status { get; init; }
/// <summary>Per-category counts for the execution; null when not set.</summary>
public RetentionCleanupCounts? Counts { get; init; }
/// <summary>Error description; null when not set.</summary>
public string? Error { get; init; }
/// <summary>Policy the execution ran with; initialised to <see cref="RetentionPolicy.Default"/> when not supplied.</summary>
public RetentionPolicy PolicyUsed { get; init; } = RetentionPolicy.Default;
}
/// <summary>
/// Status of a cleanup execution.
/// </summary>
public enum RetentionCleanupStatus
{
/// <summary>The cleanup has started and no final outcome has been recorded yet.</summary>
Running,
/// <summary>The cleanup finished without error.</summary>
Completed,
/// <summary>The cleanup ended because of an error.</summary>
Failed,
/// <summary>The cleanup ended because it was cancelled.</summary>
Cancelled
}
/// <summary>
/// Preview of what would be cleaned up.
/// </summary>
public sealed record RetentionCleanupPreview
{
/// <summary>Identifier of the tenant the preview was computed for.</summary>
public required string TenantId { get; init; }
/// <summary>Timestamp at which the preview was computed.</summary>
public required DateTimeOffset PreviewedAt { get; init; }
/// <summary>Estimated per-category counts of items a cleanup would remove.</summary>
public required RetentionCleanupCounts EstimatedCounts { get; init; }
/// <summary>Policy used to compute the preview.</summary>
public required RetentionPolicy PolicyApplied { get; init; }
/// <summary>Cutoff timestamps, keyed by a string naming the data category.</summary>
public required IReadOnlyDictionary<string, DateTimeOffset> CutoffDates { get; init; }
}