using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using StellaOps.Scanner.WebService.Domain;

namespace StellaOps.Scanner.WebService.Services;

// NOTE(review): generic type arguments in this file were reconstructed from usage.
// The payload value type (object?) is an assumption — confirm against ScanProgressEvent.

/// <summary>
/// Publishes progress events for a scan.
/// </summary>
public interface IScanProgressPublisher
{
    /// <summary>
    /// Records a progress event for <paramref name="scanId"/> and returns the event that was recorded.
    /// </summary>
    /// <param name="scanId">Scan the event belongs to.</param>
    /// <param name="state">State label carried by the event.</param>
    /// <param name="message">Optional message carried by the event.</param>
    /// <param name="data">Optional payload carried by the event.</param>
    /// <param name="correlationId">Optional correlation identifier carried by the event.</param>
    /// <returns>The recorded event.</returns>
    ScanProgressEvent Publish(
        ScanId scanId,
        string state,
        string? message = null,
        IReadOnlyDictionary<string, object?>? data = null,
        string? correlationId = null);
}

/// <summary>
/// Reads progress events for a scan.
/// </summary>
public interface IScanProgressReader
{
    /// <summary>
    /// Reports whether any progress stream is known for <paramref name="scanId"/>.
    /// </summary>
    bool Exists(ScanId scanId);

    /// <summary>
    /// Streams progress events for <paramref name="scanId"/> until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    IAsyncEnumerable<ScanProgressEvent> SubscribeAsync(ScanId scanId, CancellationToken cancellationToken);
}

/// <summary>
/// In-memory progress stream keyed by scan id (case-insensitive). Every published event is kept in a
/// per-scan history; each subscriber first replays that history and then receives events published
/// after it subscribed, exactly once and independently of other subscribers.
/// </summary>
public sealed class ScanProgressStream : IScanProgressPublisher, IScanProgressReader
{
    /// <summary>
    /// Per-scan state: the event history, the sequence counter and the live subscriber channels.
    /// All mutable state is guarded by <see cref="gate"/>.
    /// </summary>
    private sealed class ProgressChannel
    {
        private readonly object gate = new();
        private readonly List<ScanProgressEvent> history = new();
        private readonly List<Channel<ScanProgressEvent>> subscribers = new();
        private int sequence;

        /// <summary>
        /// Allocates the next sequence number, builds the event with <paramref name="factory"/>,
        /// stores it in the history and forwards it to every current subscriber.
        /// </summary>
        /// <param name="factory">Builds the event from its sequence number; invoked while the gate is held
        /// so sequence numbers and history order always agree.</param>
        /// <returns>The event that was appended.</returns>
        public ScanProgressEvent Append(Func<int, ScanProgressEvent> factory)
        {
            lock (gate)
            {
                var progressEvent = factory(++sequence);
                history.Add(progressEvent);

                // Fan out: every subscriber owns its own channel, so no subscriber can
                // consume an event that was meant for another one.
                foreach (var subscriber in subscribers)
                {
                    subscriber.Writer.TryWrite(progressEvent);
                }

                return progressEvent;
            }
        }

        /// <summary>
        /// Registers a new subscriber and captures the history at the same instant.
        /// </summary>
        /// <param name="snapshot">Events appended before the subscription was registered.</param>
        /// <returns>The channel that will receive every event appended after registration.</returns>
        public Channel<ScanProgressEvent> Subscribe(out IReadOnlyList<ScanProgressEvent> snapshot)
        {
            var subscriber = Channel.CreateUnbounded<ScanProgressEvent>(new UnboundedChannelOptions
            {
                // Continuations must not run inline: writes happen while the gate is held, and
                // subscriber code should never execute on the publisher's thread under that lock.
                AllowSynchronousContinuations = false,
                SingleReader = true,
                SingleWriter = false
            });

            // Snapshot and registration happen under one lock so an event is either in the
            // snapshot or delivered through the channel — never both, never neither.
            lock (gate)
            {
                snapshot = history.Count == 0 ? Array.Empty<ScanProgressEvent>() : history.ToArray();
                subscribers.Add(subscriber);
            }

            return subscriber;
        }

        /// <summary>
        /// Removes a subscriber registered by <see cref="Subscribe"/> and completes its channel.
        /// </summary>
        public void Unsubscribe(Channel<ScanProgressEvent> subscriber)
        {
            lock (gate)
            {
                subscribers.Remove(subscriber);
            }

            subscriber.Writer.TryComplete();
        }
    }

    private static readonly IReadOnlyDictionary<string, object?> EmptyData =
        new ReadOnlyDictionary<string, object?>(
            new SortedDictionary<string, object?>(StringComparer.OrdinalIgnoreCase));

    private readonly ConcurrentDictionary<string, ProgressChannel> channels = new(StringComparer.OrdinalIgnoreCase);
    private readonly TimeProvider timeProvider;

    /// <summary>
    /// Creates a stream that timestamps events with <paramref name="timeProvider"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="timeProvider"/> is null.</exception>
    public ScanProgressStream(TimeProvider timeProvider)
    {
        this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
    }

    /// <summary>
    /// Returns true once at least one event has been published for <paramref name="scanId"/>.
    /// </summary>
    public bool Exists(ScanId scanId) => channels.ContainsKey(scanId.Value);

    /// <summary>
    /// Appends a progress event for <paramref name="scanId"/>, creating the per-scan stream on first use.
    /// </summary>
    /// <param name="scanId">Scan the event belongs to.</param>
    /// <param name="state">State label stored on the event.</param>
    /// <param name="message">Optional message stored on the event.</param>
    /// <param name="data">Optional payload; copied into a read-only dictionary sorted case-insensitively by key.</param>
    /// <param name="correlationId">Correlation id; when null, "{scanId}:{sequence:D4}" is used.</param>
    /// <returns>The event that was appended, carrying its per-scan sequence number.</returns>
    public ScanProgressEvent Publish(
        ScanId scanId,
        string state,
        string? message = null,
        IReadOnlyDictionary<string, object?>? data = null,
        string? correlationId = null)
    {
        var channel = channels.GetOrAdd(scanId.Value, static _ => new ProgressChannel());

        // Pure transformation of the caller's payload; no need to hold the per-scan lock for it.
        var payload = NormalizePayload(data);

        // The timestamp is taken inside Append's lock so it is read in sequence order.
        return channel.Append(sequence => new ScanProgressEvent(
            scanId,
            sequence,
            timeProvider.GetUtcNow(),
            state,
            message,
            correlationId ?? $"{scanId.Value}:{sequence:D4}",
            payload));
    }

    /// <summary>
    /// Replays the events already published for <paramref name="scanId"/> and then streams new ones
    /// as they are published. Yields nothing when no event has been published for the scan yet.
    /// </summary>
    /// <param name="scanId">Scan to follow.</param>
    /// <param name="cancellationToken">Cancels the wait for further events.</param>
    /// <exception cref="OperationCanceledException">The token was cancelled while waiting for events.</exception>
    public async IAsyncEnumerable<ScanProgressEvent> SubscribeAsync(
        ScanId scanId,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (!channels.TryGetValue(scanId.Value, out var channel))
        {
            yield break;
        }

        var subscription = channel.Subscribe(out var snapshot);
        try
        {
            foreach (var replayed in snapshot)
            {
                yield return replayed;
            }

            var reader = subscription.Reader;
            while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                while (reader.TryRead(out var live))
                {
                    yield return live;
                }
            }
        }
        finally
        {
            // Runs on cancellation and when the consumer stops enumerating early, so
            // abandoned subscriptions do not keep accumulating events.
            channel.Unsubscribe(subscription);
        }
    }

    /// <summary>
    /// Copies <paramref name="data"/> into a read-only dictionary ordered case-insensitively by key.
    /// Null or empty input maps to a shared empty instance; keys that differ only by case collapse
    /// to the last value enumerated.
    /// </summary>
    private static IReadOnlyDictionary<string, object?> NormalizePayload(IReadOnlyDictionary<string, object?>? data)
    {
        if (data is null || data.Count == 0)
        {
            return EmptyData;
        }

        var sorted = new SortedDictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
        foreach (var pair in data)
        {
            sorted[pair.Key] = pair.Value;
        }

        return new ReadOnlyDictionary<string, object?>(sorted);
    }
}