using System;
using System.Collections.Generic;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.Notify.Models;
using StellaOps.Notify.Queue;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Worker.Events;
using StellaOps.Scheduler.Worker.Execution;
using Xunit;

namespace StellaOps.Scheduler.Worker.Tests;

/// <summary>
/// Verifies that <see cref="SchedulerEventPublisher"/> enqueues the expected notify events,
/// using an in-memory queue that records every published message.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments in this file (JSON node types, dictionary and list
/// element types, the queue lease return types, and the logger category) were restored from how
/// the values are used here. Confirm the lease element type and the logger category against the
/// INotifyEventQueue and SchedulerEventPublisher declarations.
/// </remarks>
public sealed class SchedulerEventPublisherTests
{
    /// <summary>
    /// Publishing a report-ready event enqueues exactly one message whose kind, tenant, scope
    /// and JSON payload reflect the run and the runner result.
    /// </summary>
    [Fact]
    public async Task PublishReportReadyAsync_EnqueuesNotifyEvent()
    {
        var queue = new RecordingNotifyEventQueue();
        var options = new NotifyEventQueueOptions();
        var publisher = new SchedulerEventPublisher(
            queue,
            options,
            TimeProvider.System,
            NullLogger<SchedulerEventPublisher>.Instance);

        var run = CreateRun();
        var message = CreateMessage(run);
        var delta = new DeltaSummary(
            run.Id,
            newFindings: 2,
            newCriticals: 1,
            newHigh: 1,
            newMedium: 0,
            newLow: 0);
        var result = CreateRunnerImageResult(run.Id, delta);
        var impact = new ImpactImage(run.Id, "registry", "repository");

        await publisher.PublishReportReadyAsync(run, message, result, impact, CancellationToken.None);

        Assert.Single(queue.Messages);
        var notifyEvent = queue.Messages[0].Event;
        Assert.Equal(NotifyEventKinds.ScannerReportReady, notifyEvent.Kind);
        Assert.Equal(run.TenantId, notifyEvent.Tenant);
        Assert.NotNull(notifyEvent.Scope);
        Assert.Equal("repository", notifyEvent.Scope!.Repo);

        var payload = Assert.IsType<JsonObject>(notifyEvent.Payload);
        Assert.Equal(result.Report.ReportId, payload["reportId"]!.GetValue<string>());
        Assert.Equal("warn", payload["verdict"]!.GetValue<string>());

        var deltaNode = Assert.IsType<JsonObject>(payload["delta"]);
        Assert.Equal(1, deltaNode["newCritical"]!.GetValue<int>());
    }

    /// <summary>
    /// Publishing a rescan delta enqueues exactly one rescan-delta event whose
    /// <c>impactedDigests</c> array starts with the digest of the supplied delta.
    /// </summary>
    [Fact]
    public async Task PublishRescanDeltaAsync_EnqueuesDeltaEvent()
    {
        var queue = new RecordingNotifyEventQueue();
        var options = new NotifyEventQueueOptions();
        var publisher = new SchedulerEventPublisher(
            queue,
            options,
            TimeProvider.System,
            NullLogger<SchedulerEventPublisher>.Instance);

        var run = CreateRun();
        var message = CreateMessage(run);
        var delta = new DeltaSummary(run.Id, 1, 1, 0, 0, 0);
        var impactLookup = new Dictionary<string, ImpactImage>
        {
            [run.Id] = new ImpactImage(run.Id, "registry", "repository")
        };

        await publisher.PublishRescanDeltaAsync(run, message, new[] { delta }, impactLookup, CancellationToken.None);

        Assert.Single(queue.Messages);
        var notifyEvent = queue.Messages[0].Event;
        Assert.Equal(NotifyEventKinds.SchedulerRescanDelta, notifyEvent.Kind);

        var payload = Assert.IsType<JsonObject>(notifyEvent.Payload);
        var digests = Assert.IsType<JsonArray>(payload["impactedDigests"]);
        Assert.Equal(run.Id, digests[0]!.GetValue<string>());
    }

    // The fixtures reuse this digest-shaped value as the run id, so the same string also
    // serves as the image digest in the queue message, delta, and impact image.
    private const string SampleDigest = "sha256:1111111111111111111111111111111111111111111111111111111111111111";

    /// <summary>Builds a running, cron-triggered run for tenant <c>tenant-1</c> with one queued item.</summary>
    private static Run CreateRun()
        => new(
            id: SampleDigest,
            tenantId: "tenant-1",
            trigger: RunTrigger.Cron,
            state: RunState.Running,
            stats: new RunStats(queued: 1, completed: 0),
            createdAt: DateTimeOffset.UtcNow,
            scheduleId: "schedule-1");

    /// <summary>
    /// Builds a single-segment queue message for <paramref name="run"/> whose only image digest
    /// is the run id.
    /// </summary>
    /// <param name="run">Run that supplies the ids, tenant and schedule for the message.</param>
    private static RunnerSegmentQueueMessage CreateMessage(Run run)
        => new(
            segmentId: $"{run.Id}:0000",
            runId: run.Id,
            tenantId: run.TenantId,
            imageDigests: new[] { run.Id },
            scheduleId: run.ScheduleId,
            ratePerSecond: null,
            usageOnly: true,
            attributes: new Dictionary<string, string>(StringComparer.Ordinal)
            {
                ["scheduleMode"] = ScheduleMode.AnalysisOnly.ToString()
            });

    /// <summary>
    /// Builds a runner result with a "warn" verdict whose report summary counters are copied
    /// from <paramref name="delta"/>, or are zero when it is <c>null</c>.
    /// </summary>
    /// <param name="digest">Image digest; its last four characters become the report id suffix.</param>
    /// <param name="delta">Optional delta used to populate the summary counters.</param>
    private static RunnerImageResult CreateRunnerImageResult(string digest, DeltaSummary? delta)
    {
        var summary = new RunnerReportSummary(
            Total: delta?.NewFindings ?? 0,
            Blocked: delta?.NewCriticals ?? 0,
            Warned: delta?.NewHigh ?? 0,
            Ignored: delta?.NewLow ?? 0,
            Quieted: 0);

        var snapshot = new RunnerReportSnapshot(
            ReportId: $"report-{digest[^4..]}",
            ImageDigest: digest,
            Verdict: "warn",
            GeneratedAt: DateTimeOffset.UtcNow,
            Summary: summary,
            PolicyRevisionId: null,
            PolicyDigest: null);

        return new RunnerImageResult(digest, delta, ContentRefreshed: false, snapshot, Dsse: null);
    }

    /// <summary>
    /// In-memory <see cref="INotifyEventQueue"/> that records published messages.
    /// Leasing and claiming are not needed by these tests and throw <see cref="NotSupportedException"/>.
    /// </summary>
    private sealed class RecordingNotifyEventQueue : INotifyEventQueue
    {
        /// <summary>Messages passed to <see cref="PublishAsync"/>, in call order.</summary>
        public List<NotifyQueueEventMessage> Messages { get; } = new();

        /// <summary>Records <paramref name="message"/> and completes synchronously with a fresh message id.</summary>
        public ValueTask<NotifyQueueEnqueueResult> PublishAsync(
            NotifyQueueEventMessage message,
            CancellationToken cancellationToken = default)
        {
            Messages.Add(message);
            return ValueTask.FromResult(new NotifyQueueEnqueueResult(Guid.NewGuid().ToString("N"), false));
        }

        /// <summary>Not supported by this test double.</summary>
        public ValueTask<IReadOnlyList<INotifyQueueLease<NotifyQueueEventMessage>>> LeaseAsync(
            NotifyQueueLeaseRequest request,
            CancellationToken cancellationToken = default)
            => throw new NotSupportedException();

        /// <summary>Not supported by this test double.</summary>
        public ValueTask<IReadOnlyList<INotifyQueueLease<NotifyQueueEventMessage>>> ClaimExpiredAsync(
            NotifyQueueClaimOptions options,
            CancellationToken cancellationToken = default)
            => throw new NotSupportedException();
    }
}