using System; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; using StellaOps.Scheduler.Models; namespace StellaOps.Scheduler.Queue; public sealed class PlannerQueueMessage { [JsonConstructor] public PlannerQueueMessage( Run run, ImpactSet impactSet, Schedule? schedule = null, string? correlationId = null) { Run = run ?? throw new ArgumentNullException(nameof(run)); ImpactSet = impactSet ?? throw new ArgumentNullException(nameof(impactSet)); if (schedule is not null && string.IsNullOrWhiteSpace(schedule.Id)) { throw new ArgumentException("Schedule must have a valid identifier.", nameof(schedule)); } if (!string.IsNullOrWhiteSpace(correlationId)) { correlationId = correlationId!.Trim(); } Schedule = schedule; CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? null : correlationId; } public Run Run { get; } public ImpactSet ImpactSet { get; } public Schedule? Schedule { get; } public string? CorrelationId { get; } public string IdempotencyKey => Run.Id; public string TenantId => Run.TenantId; public string? ScheduleId => Run.ScheduleId; } public sealed class RunnerSegmentQueueMessage { private readonly ReadOnlyCollection _imageDigests; private readonly IReadOnlyDictionary _attributes; private readonly IReadOnlyDictionary _surfaceManifests; [JsonConstructor] public RunnerSegmentQueueMessage( string segmentId, string runId, string tenantId, IReadOnlyList imageDigests, string? scheduleId = null, int? ratePerSecond = null, bool usageOnly = true, IReadOnlyDictionary? attributes = null, string? correlationId = null, IReadOnlyDictionary? surfaceManifests = null) { if (string.IsNullOrWhiteSpace(segmentId)) { throw new ArgumentException("Segment identifier must be provided.", nameof(segmentId)); } if (string.IsNullOrWhiteSpace(runId)) { throw new ArgumentException("Run identifier must be provided.", nameof(runId)); } if (string.IsNullOrWhiteSpace(tenantId)) { throw new ArgumentException("Tenant identifier must be provided.", nameof(tenantId)); } SegmentId = segmentId; RunId = runId; TenantId = tenantId; ScheduleId = string.IsNullOrWhiteSpace(scheduleId) ? null : scheduleId; RatePerSecond = ratePerSecond; UsageOnly = usageOnly; CorrelationId = string.IsNullOrWhiteSpace(correlationId) ? null : correlationId; _imageDigests = new ReadOnlyCollection(NormalizeDigests(imageDigests)); _attributes = attributes is null ? EmptyReadOnlyDictionary.Instance : new ReadOnlyDictionary(new Dictionary(attributes, StringComparer.Ordinal)); _surfaceManifests = surfaceManifests is null ? EmptyReadOnlyDictionary.Instance : new ReadOnlyDictionary(new Dictionary(surfaceManifests, StringComparer.Ordinal)); } public string SegmentId { get; } public string RunId { get; } public string TenantId { get; } public string? ScheduleId { get; } public int? RatePerSecond { get; } public bool UsageOnly { get; } public string? CorrelationId { get; } public IReadOnlyList ImageDigests => _imageDigests; public IReadOnlyDictionary Attributes => _attributes; [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public IReadOnlyDictionary SurfaceManifests => _surfaceManifests; public string IdempotencyKey => SegmentId; private static List NormalizeDigests(IReadOnlyList digests) { if (digests is null) { throw new ArgumentNullException(nameof(digests)); } var list = new List(); foreach (var digest in digests) { if (string.IsNullOrWhiteSpace(digest)) { continue; } list.Add(digest.Trim()); } if (list.Count == 0) { throw new ArgumentException("At least one image digest must be provided.", nameof(digests)); } return list; } private sealed class EmptyReadOnlyDictionary where TKey : notnull { public static readonly IReadOnlyDictionary Instance = new ReadOnlyDictionary(new Dictionary(0, EqualityComparer.Default)); } } public readonly record struct SchedulerQueueEnqueueResult(string MessageId, bool Deduplicated); public sealed class SchedulerQueueLeaseRequest { public SchedulerQueueLeaseRequest(string consumer, int batchSize, TimeSpan leaseDuration) { if (string.IsNullOrWhiteSpace(consumer)) { throw new ArgumentException("Consumer identifier must be provided.", nameof(consumer)); } if (batchSize <= 0) { throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive."); } if (leaseDuration <= TimeSpan.Zero) { throw new ArgumentOutOfRangeException(nameof(leaseDuration), leaseDuration, "Lease duration must be positive."); } Consumer = consumer; BatchSize = batchSize; LeaseDuration = leaseDuration; } public string Consumer { get; } public int BatchSize { get; } public TimeSpan LeaseDuration { get; } } public sealed class SchedulerQueueClaimOptions { public SchedulerQueueClaimOptions(string claimantConsumer, int batchSize, TimeSpan minIdleTime) { if (string.IsNullOrWhiteSpace(claimantConsumer)) { throw new ArgumentException("Consumer identifier must be provided.", nameof(claimantConsumer)); } if (batchSize <= 0) { throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive."); } if (minIdleTime < TimeSpan.Zero) { throw new ArgumentOutOfRangeException(nameof(minIdleTime), minIdleTime, "Idle time cannot be negative."); } ClaimantConsumer = claimantConsumer; BatchSize = batchSize; MinIdleTime = minIdleTime; } public string ClaimantConsumer { get; } public int BatchSize { get; } public TimeSpan MinIdleTime { get; } } /// /// Minimal pointer to a Surface.FS manifest associated with an image digest. /// Kept local to avoid coupling queue contracts to worker assemblies. /// public sealed record SurfaceManifestPointer { public SurfaceManifestPointer(string manifestDigest, string? tenant) { ManifestDigest = manifestDigest ?? throw new ArgumentNullException(nameof(manifestDigest)); Tenant = tenant; } [JsonPropertyName("manifestDigest")] public string ManifestDigest { get; init; } [JsonPropertyName("tenant")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? Tenant { get; init; } } public enum SchedulerQueueReleaseDisposition { Retry, Abandon } public interface ISchedulerQueue { ValueTask EnqueueAsync(TMessage message, CancellationToken cancellationToken = default); ValueTask>> LeaseAsync(SchedulerQueueLeaseRequest request, CancellationToken cancellationToken = default); ValueTask>> ClaimExpiredAsync(SchedulerQueueClaimOptions options, CancellationToken cancellationToken = default); } public interface ISchedulerQueueLease { string MessageId { get; } int Attempt { get; } DateTimeOffset EnqueuedAt { get; } DateTimeOffset LeaseExpiresAt { get; } string Consumer { get; } string TenantId { get; } string RunId { get; } string? ScheduleId { get; } string? SegmentId { get; } string? CorrelationId { get; } string IdempotencyKey { get; } IReadOnlyDictionary Attributes { get; } TMessage Message { get; } Task AcknowledgeAsync(CancellationToken cancellationToken = default); Task RenewAsync(TimeSpan leaseDuration, CancellationToken cancellationToken = default); Task ReleaseAsync(SchedulerQueueReleaseDisposition disposition, CancellationToken cancellationToken = default); Task DeadLetterAsync(string reason, CancellationToken cancellationToken = default); } public interface ISchedulerPlannerQueue : ISchedulerQueue { } public interface ISchedulerRunnerQueue : ISchedulerQueue { }