up
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
sdk-generator-smoke / sdk-smoke (push) Has been cancelled
SDK Publish & Sign / sdk-publish (push) Has been cancelled
api-governance / spectral-lint (push) Has been cancelled
oas-ci / oas-validate (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-11-27 07:46:56 +02:00
parent d63af51f84
commit ea970ead2a
302 changed files with 43161 additions and 1534 deletions

View File

@@ -0,0 +1,252 @@
using System.Collections.Concurrent;
using Cronos;
using Microsoft.Extensions.Logging;
using StellaOps.Notify.Models;
using StellaOps.Notify.Storage.Mongo.Repositories;
using StellaOps.Notifier.Worker.Channels;
namespace StellaOps.Notifier.Worker.Digest;
/// <summary>
/// Default implementation of the digest schedule runner.
/// </summary>
public sealed class DigestScheduleRunner : IDigestScheduleRunner
{
private readonly IDigestGenerator _digestGenerator;
private readonly INotifyChannelRepository _channelRepository;
private readonly IReadOnlyDictionary<NotifyChannelType, INotifyChannelAdapter> _channelAdapters;
private readonly TimeProvider _timeProvider;
private readonly ILogger<DigestScheduleRunner> _logger;
// In-memory schedule store (in production, would use a repository)
private readonly ConcurrentDictionary<string, DigestSchedule> _schedules = new();
private readonly ConcurrentDictionary<string, DateTimeOffset> _lastRunTimes = new();
public DigestScheduleRunner(
IDigestGenerator digestGenerator,
INotifyChannelRepository channelRepository,
IEnumerable<INotifyChannelAdapter> channelAdapters,
TimeProvider timeProvider,
ILogger<DigestScheduleRunner> logger)
{
_digestGenerator = digestGenerator ?? throw new ArgumentNullException(nameof(digestGenerator));
_channelRepository = channelRepository ?? throw new ArgumentNullException(nameof(channelRepository));
_channelAdapters = BuildAdapterMap(channelAdapters);
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<int> ProcessDueDigestsAsync(CancellationToken cancellationToken = default)
{
var now = _timeProvider.GetUtcNow();
var processed = 0;
foreach (var schedule in _schedules.Values.Where(s => s.Enabled))
{
cancellationToken.ThrowIfCancellationRequested();
try
{
if (IsDue(schedule, now))
{
await ProcessScheduleAsync(schedule, now, cancellationToken).ConfigureAwait(false);
processed++;
}
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to process digest schedule {ScheduleId} for tenant {TenantId}",
schedule.ScheduleId, schedule.TenantId);
}
}
return processed;
}
public DateTimeOffset? GetNextScheduledTime(DigestSchedule schedule, DateTimeOffset? after = null)
{
ArgumentNullException.ThrowIfNull(schedule);
var referenceTime = after ?? _timeProvider.GetUtcNow();
try
{
var timeZone = TimeZoneInfo.FindSystemTimeZoneById(schedule.TimeZone);
if (!string.IsNullOrWhiteSpace(schedule.CronExpression))
{
var cron = CronExpression.Parse(schedule.CronExpression);
var next = cron.GetNextOccurrence(referenceTime.UtcDateTime, timeZone);
return next.HasValue
? new DateTimeOffset(next.Value, timeZone.GetUtcOffset(next.Value))
: null;
}
// Default period-based scheduling
return schedule.Period switch
{
DigestPeriod.Hourly => referenceTime.AddHours(1).Date.AddHours(referenceTime.Hour + 1),
DigestPeriod.Daily => referenceTime.Date.AddDays(1).AddHours(9), // 9 AM next day
DigestPeriod.Weekly => GetNextWeekday(referenceTime, DayOfWeek.Monday).AddHours(9),
_ => null
};
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Failed to calculate next scheduled time for {ScheduleId}",
schedule.ScheduleId);
return null;
}
}
/// <summary>
/// Registers a digest schedule.
/// </summary>
public void RegisterSchedule(DigestSchedule schedule)
{
ArgumentNullException.ThrowIfNull(schedule);
_schedules[schedule.ScheduleId] = schedule;
_logger.LogInformation(
"Registered digest schedule {ScheduleId} for tenant {TenantId}",
schedule.ScheduleId, schedule.TenantId);
}
/// <summary>
/// Unregisters a digest schedule.
/// </summary>
public void UnregisterSchedule(string scheduleId)
{
_schedules.TryRemove(scheduleId, out _);
_lastRunTimes.TryRemove(scheduleId, out _);
}
private bool IsDue(DigestSchedule schedule, DateTimeOffset now)
{
// Check if we've run recently
if (_lastRunTimes.TryGetValue(schedule.ScheduleId, out var lastRun))
{
var minInterval = schedule.Period switch
{
DigestPeriod.Hourly => TimeSpan.FromMinutes(55),
DigestPeriod.Daily => TimeSpan.FromHours(23),
DigestPeriod.Weekly => TimeSpan.FromDays(6.5),
_ => TimeSpan.FromHours(1)
};
if (now - lastRun < minInterval)
{
return false;
}
}
var nextScheduled = GetNextScheduledTime(schedule, _lastRunTimes.GetValueOrDefault(schedule.ScheduleId));
return nextScheduled.HasValue && now >= nextScheduled.Value;
}
private async Task ProcessScheduleAsync(
DigestSchedule schedule,
DateTimeOffset now,
CancellationToken cancellationToken)
{
_logger.LogDebug("Processing digest schedule {ScheduleId}", schedule.ScheduleId);
// Calculate period
var (periodStart, periodEnd) = CalculatePeriod(schedule, now);
// Generate digest
var digest = await _digestGenerator.GenerateAsync(
schedule, periodStart, periodEnd, cancellationToken).ConfigureAwait(false);
// Record run time
_lastRunTimes[schedule.ScheduleId] = now;
// Skip if no events
if (digest.Status == NotifyDigestStatus.Skipped || digest.EventCount == 0)
{
_logger.LogDebug(
"Skipping empty digest {DigestId} for schedule {ScheduleId}",
digest.DigestId, schedule.ScheduleId);
return;
}
// Format content
var content = await _digestGenerator.FormatAsync(
digest, schedule.TemplateId, cancellationToken).ConfigureAwait(false);
// Get channel and send
var channel = await _channelRepository.GetAsync(
schedule.TenantId, schedule.ChannelId, cancellationToken).ConfigureAwait(false);
if (channel is null)
{
_logger.LogWarning(
"Channel {ChannelId} not found for digest schedule {ScheduleId}",
schedule.ChannelId, schedule.ScheduleId);
return;
}
if (!_channelAdapters.TryGetValue(channel.Type, out var adapter))
{
_logger.LogWarning(
"No adapter found for channel type {ChannelType}",
channel.Type);
return;
}
var rendered = NotifyDeliveryRendered.Create(
channelType: channel.Type,
format: NotifyDeliveryFormat.Json,
target: channel.Config?.Target ?? string.Empty,
title: $"Notification Digest: {schedule.Name}",
body: content,
locale: "en-us");
var result = await adapter.SendAsync(channel, rendered, cancellationToken).ConfigureAwait(false);
if (result.Success)
{
_logger.LogInformation(
"Sent digest {DigestId} via channel {ChannelId}: {EventCount} events",
digest.DigestId, schedule.ChannelId, digest.EventCount);
}
else
{
_logger.LogWarning(
"Failed to send digest {DigestId}: {Reason}",
digest.DigestId, result.Reason);
}
}
private static (DateTimeOffset Start, DateTimeOffset End) CalculatePeriod(
DigestSchedule schedule,
DateTimeOffset now)
{
return schedule.Period switch
{
DigestPeriod.Hourly => (now.AddHours(-1), now),
DigestPeriod.Daily => (now.Date.AddDays(-1), now.Date),
DigestPeriod.Weekly => (now.Date.AddDays(-7), now.Date),
_ => (now.AddHours(-1), now)
};
}
private static DateTimeOffset GetNextWeekday(DateTimeOffset from, DayOfWeek target)
{
var daysUntil = ((int)target - (int)from.DayOfWeek + 7) % 7;
if (daysUntil == 0) daysUntil = 7;
return from.Date.AddDays(daysUntil);
}
private static IReadOnlyDictionary<NotifyChannelType, INotifyChannelAdapter> BuildAdapterMap(
IEnumerable<INotifyChannelAdapter> adapters)
{
var builder = new Dictionary<NotifyChannelType, INotifyChannelAdapter>();
foreach (var adapter in adapters)
{
builder[adapter.ChannelType] = adapter;
}
return builder;
}
}