using System.Collections.Concurrent;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using StellaOps.Policy.Registry.Contracts;

namespace StellaOps.Policy.Registry.Services;

/// <summary>
/// Default implementation of batch simulation orchestrator.
/// Uses in-memory job queue with background processing.
/// </summary>
/// <remarks>
/// Jobs, results and idempotency mappings live only in this instance's memory.
/// A single background task drains the queue, so jobs are processed one at a
/// time in submission order. A job that has been cancelled is never modified
/// again by the background worker.
/// </remarks>
public sealed class BatchSimulationOrchestrator : IBatchSimulationOrchestrator, IDisposable
{
    /// <summary>How long the worker sleeps when the queue is empty.</summary>
    private const int IdlePollDelayMilliseconds = 100;

    /// <summary>Maximum time <see cref="Dispose"/> waits for the worker to stop.</summary>
    private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(5);

    private readonly IPolicySimulationService _simulationService;
    private readonly TimeProvider _timeProvider;
    private readonly ConcurrentDictionary<(Guid TenantId, string JobId), BatchSimulationJob> _jobs = new();

    // Each list is appended to by the worker and read by GetResultsAsync;
    // every access must hold a lock on the list instance itself.
    private readonly ConcurrentDictionary<(Guid TenantId, string JobId), List<BatchSimulationInputResult>> _results = new();

    // Keyed by tenant as well as key so one tenant can never observe or
    // overwrite another tenant's idempotency mapping.
    private readonly ConcurrentDictionary<(Guid TenantId, string IdempotencyKey), string> _idempotencyKeys = new();
    private readonly object _idempotencyGate = new();

    private readonly ConcurrentQueue<(Guid TenantId, string JobId, BatchSimulationRequest Request)> _jobQueue = new();
    private readonly CancellationTokenSource _disposalCts = new();

    // Captured once so the worker never touches _disposalCts.Token after the
    // source has been disposed (which would throw ObjectDisposedException).
    private readonly CancellationToken _stoppingToken;
    private readonly Task _processingTask;
    private int _disposed;

    /// <summary>
    /// Creates the orchestrator and starts its background processing task.
    /// </summary>
    /// <param name="simulationService">Service used to run each individual simulation.</param>
    /// <param name="timeProvider">Clock used for job timestamps; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="simulationService"/> is null.</exception>
    public BatchSimulationOrchestrator(
        IPolicySimulationService simulationService,
        TimeProvider? timeProvider = null)
    {
        _simulationService = simulationService ?? throw new ArgumentNullException(nameof(simulationService));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _stoppingToken = _disposalCts.Token;

        // Start background processing
        _processingTask = Task.Run(ProcessJobsAsync);
    }

    /// <summary>
    /// Registers a new batch job in <see cref="BatchJobStatus.Pending"/> state and queues it for processing.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the job.</param>
    /// <param name="request">Batch request; when it carries a non-empty idempotency key that this tenant
    /// has already used, the previously created job is returned instead of creating a new one.</param>
    /// <param name="cancellationToken">Not observed; submission completes synchronously.</param>
    /// <returns>The newly created job, or the existing job for a repeated idempotency key.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    public Task<BatchSimulationJob> SubmitBatchAsync(
        Guid tenantId,
        BatchSimulationRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);

        var idempotencyKey = string.IsNullOrEmpty(request.IdempotencyKey) ? null : request.IdempotencyKey;
        if (idempotencyKey is null)
        {
            return Task.FromResult(CreateAndEnqueueJob(tenantId, request));
        }

        // Check-then-register must be atomic, otherwise two concurrent
        // submissions with the same key would both create a job.
        lock (_idempotencyGate)
        {
            if (_idempotencyKeys.TryGetValue((tenantId, idempotencyKey), out var existingJobId)
                && _jobs.TryGetValue((tenantId, existingJobId), out var existingJob))
            {
                return Task.FromResult(existingJob);
            }

            var job = CreateAndEnqueueJob(tenantId, request);
            _idempotencyKeys[(tenantId, idempotencyKey)] = job.JobId;
            return Task.FromResult(job);
        }
    }

    /// <summary>
    /// Looks up a job by tenant and identifier.
    /// </summary>
    /// <returns>The current job snapshot, or <c>null</c> when no such job exists for the tenant.</returns>
    public Task<BatchSimulationJob?> GetJobAsync(
        Guid tenantId,
        string jobId,
        CancellationToken cancellationToken = default)
    {
        _jobs.TryGetValue((tenantId, jobId), out var job);
        return Task.FromResult(job);
    }

    /// <summary>
    /// Lists a tenant's jobs, newest first, optionally filtered by status.
    /// </summary>
    /// <param name="tenantId">Tenant whose jobs are listed.</param>
    /// <param name="status">When set, only jobs in this status are returned.</param>
    /// <param name="pageSize">Maximum number of jobs in the returned page.</param>
    /// <param name="pageToken">Offset token from a previous page; missing or unparsable tokens start at the beginning.</param>
    /// <param name="cancellationToken">Not observed; the listing completes synchronously.</param>
    /// <returns>One page of jobs, the total match count, and a token for the next page when more remain.</returns>
    public Task<BatchSimulationJobList> ListJobsAsync(
        Guid tenantId,
        BatchJobStatus? status = null,
        int pageSize = 20,
        string? pageToken = null,
        CancellationToken cancellationToken = default)
    {
        var query = _jobs.Values.Where(j => j.TenantId == tenantId);

        if (status.HasValue)
        {
            query = query.Where(j => j.Status == status.Value);
        }

        var items = query
            .OrderByDescending(j => j.CreatedAt)
            .ToList();

        var skip = ParseOffset(pageToken);
        var pagedItems = items.Skip(skip).Take(pageSize).ToList();

        return Task.FromResult(new BatchSimulationJobList
        {
            Items = pagedItems,
            NextPageToken = NextOffsetToken(skip, pagedItems.Count, items.Count),
            TotalCount = items.Count
        });
    }

    /// <summary>
    /// Marks a pending or running job as cancelled.
    /// </summary>
    /// <returns><c>true</c> when the job was moved to <see cref="BatchJobStatus.Cancelled"/>;
    /// <c>false</c> when the job does not exist or is no longer pending/running.</returns>
    public Task<bool> CancelJobAsync(
        Guid tenantId,
        string jobId,
        CancellationToken cancellationToken = default)
    {
        (Guid TenantId, string JobId) key = (tenantId, jobId);

        // Compare-and-swap loop: the worker may publish a progress update
        // between our read and our write, in which case we re-read and retry
        // instead of overwriting its update (or having ours overwritten).
        while (_jobs.TryGetValue(key, out var job))
        {
            if (job.Status is not (BatchJobStatus.Pending or BatchJobStatus.Running))
            {
                return Task.FromResult(false);
            }

            var cancelledJob = job with
            {
                Status = BatchJobStatus.Cancelled,
                CompletedAt = _timeProvider.GetUtcNow()
            };

            if (_jobs.TryUpdate(key, cancelledJob, job))
            {
                return Task.FromResult(true);
            }
        }

        return Task.FromResult(false);
    }

    /// <summary>
    /// Returns one page of per-input results for a job.
    /// </summary>
    /// <param name="tenantId">Tenant that owns the job.</param>
    /// <param name="jobId">Job whose results are requested.</param>
    /// <param name="pageSize">Maximum number of results in the returned page.</param>
    /// <param name="pageToken">Offset token from a previous page; missing or unparsable tokens start at the beginning.</param>
    /// <param name="cancellationToken">Not observed; the lookup completes synchronously.</param>
    /// <returns>The requested page, or <c>null</c> when the job is unknown for the tenant.
    /// The summary is populated only once the job status is <see cref="BatchJobStatus.Completed"/>.</returns>
    public Task<BatchSimulationResults?> GetResultsAsync(
        Guid tenantId,
        string jobId,
        int pageSize = 100,
        string? pageToken = null,
        CancellationToken cancellationToken = default)
    {
        if (!_jobs.TryGetValue((tenantId, jobId), out var job)
            || !_results.TryGetValue((tenantId, jobId), out var results))
        {
            return Task.FromResult<BatchSimulationResults?>(null);
        }

        var skip = ParseOffset(pageToken);

        List<BatchSimulationInputResult> pagedResults;
        int totalResults;
        BatchSimulationSummary? summary;

        // The worker appends to this list concurrently; enumerate under the
        // same lock it uses so we never observe a list that is mid-mutation.
        lock (results)
        {
            totalResults = results.Count;
            pagedResults = results.Skip(skip).Take(pageSize).ToList();
            summary = job.Status == BatchJobStatus.Completed ? ComputeSummary(results) : null;
        }

        return Task.FromResult<BatchSimulationResults?>(new BatchSimulationResults
        {
            JobId = jobId,
            Results = pagedResults,
            Summary = summary,
            NextPageToken = NextOffsetToken(skip, pagedResults.Count, totalResults)
        });
    }

    /// <summary>
    /// Builds the initial job record, registers it with an empty result list and enqueues it.
    /// </summary>
    private BatchSimulationJob CreateAndEnqueueJob(Guid tenantId, BatchSimulationRequest request)
    {
        var now = _timeProvider.GetUtcNow();
        var jobId = GenerateJobId(tenantId, now);

        var job = new BatchSimulationJob
        {
            JobId = jobId,
            TenantId = tenantId,
            PackId = request.PackId,
            Status = BatchJobStatus.Pending,
            Description = request.Description,
            TotalInputs = request.Inputs.Count,
            ProcessedInputs = 0,
            SucceededInputs = 0,
            FailedInputs = 0,
            CreatedAt = now,
            Progress = new BatchJobProgress
            {
                PercentComplete = 0,
                EstimatedRemainingSeconds = null,
                CurrentBatchIndex = 0,
                TotalBatches = 1
            }
        };

        // Register job and result list before enqueueing so the worker can
        // always find both when it dequeues the item.
        _jobs[(tenantId, jobId)] = job;
        _results[(tenantId, jobId)] = [];

        // Queue job for processing
        _jobQueue.Enqueue((tenantId, jobId, request));

        return job;
    }

    /// <summary>
    /// Background loop: dequeues jobs one at a time until disposal is requested,
    /// polling every <see cref="IdlePollDelayMilliseconds"/> ms while the queue is empty.
    /// </summary>
    private async Task ProcessJobsAsync()
    {
        while (!_stoppingToken.IsCancellationRequested)
        {
            if (!_jobQueue.TryDequeue(out var item))
            {
                await Task.Delay(IdlePollDelayMilliseconds, _stoppingToken)
                    .ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
                continue;
            }

            var (tenantId, jobId, request) = item;

            // Check if job was cancelled while it was still queued
            if (_jobs.TryGetValue((tenantId, jobId), out var job) && job.Status == BatchJobStatus.Cancelled)
            {
                continue;
            }

            try
            {
                await ProcessJobAsync(tenantId, jobId, request, _stoppingToken);
            }
            catch (Exception)
            {
                // One broken job must not terminate the worker, otherwise every
                // later submission would stay Pending forever. Surface the
                // failure on the job itself and keep draining the queue.
                UpdateJob(tenantId, jobId, j => j with
                {
                    Status = BatchJobStatus.Failed,
                    CompletedAt = _timeProvider.GetUtcNow()
                });
            }
        }
    }

    /// <summary>
    /// Runs every input of a job through the simulation service in order, recording a
    /// result per input and publishing progress after each one.
    /// </summary>
    /// <remarks>
    /// Stops early when the job is cancelled, when <paramref name="cancellationToken"/> is
    /// signalled, or on the first failed input when the request options disable
    /// continue-on-error. A cancelled job keeps its cancelled state; otherwise the job ends
    /// as Failed when nothing succeeded and at least one input failed, and Completed in all
    /// other cases.
    /// </remarks>
    private async Task ProcessJobAsync(
        Guid tenantId,
        string jobId,
        BatchSimulationRequest request,
        CancellationToken cancellationToken)
    {
        var startedAt = _timeProvider.GetUtcNow();
        var results = _results[(tenantId, jobId)];

        // Update job to running; a job cancelled in the meantime is left untouched.
        if (!UpdateJob(tenantId, jobId, job => job with
            {
                Status = BatchJobStatus.Running,
                StartedAt = startedAt
            }))
        {
            return;
        }

        // With no options supplied this evaluates to false, i.e. processing
        // continues after a failed input.
        bool stopOnError = !request.Options?.ContinueOnError ?? false;

        int processed = 0;
        int succeeded = 0;
        int failed = 0;

        foreach (var input in request.Inputs)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            // Check if job was cancelled
            if (_jobs.TryGetValue((tenantId, jobId), out var currentJob) && currentJob.Status == BatchJobStatus.Cancelled)
            {
                return;
            }

            bool inputSucceeded;
            BatchSimulationInputResult inputResult;

            try
            {
                var simRequest = new SimulationRequest
                {
                    Input = input.Input,
                    Options = request.Options is not null ? new SimulationOptions
                    {
                        Trace = request.Options.IncludeTrace,
                        Explain = request.Options.IncludeExplain
                    } : null
                };

                var response = await _simulationService.SimulateAsync(
                    tenantId,
                    request.PackId,
                    simRequest,
                    cancellationToken);

                inputSucceeded = response.Success;
                inputResult = new BatchSimulationInputResult
                {
                    InputId = input.InputId,
                    Success = response.Success,
                    Response = response,
                    DurationMilliseconds = response.DurationMilliseconds
                };
            }
            catch (Exception ex)
            {
                // A throwing simulation is recorded as a failed input rather
                // than failing the whole batch.
                inputSucceeded = false;
                inputResult = new BatchSimulationInputResult
                {
                    InputId = input.InputId,
                    Success = false,
                    Error = ex.Message,
                    DurationMilliseconds = 0
                };
            }

            lock (results)
            {
                results.Add(inputResult);
            }

            // Count the input as processed before deciding whether to abort, so
            // ProcessedInputs always equals SucceededInputs + FailedInputs.
            processed++;
            if (inputSucceeded)
            {
                succeeded++;
            }
            else
            {
                failed++;
            }

            // Update progress
            var progress = (double)processed / request.Inputs.Count * 100;
            var stillActive = UpdateJob(tenantId, jobId, job => job with
            {
                ProcessedInputs = processed,
                SucceededInputs = succeeded,
                FailedInputs = failed,
                Progress = new BatchJobProgress
                {
                    PercentComplete = progress,
                    CurrentBatchIndex = processed,
                    TotalBatches = request.Inputs.Count
                }
            });

            if (!stillActive)
            {
                // Cancelled while this input was running; leave the cancelled state as is.
                return;
            }

            if (!inputSucceeded && stopOnError)
            {
                break;
            }
        }

        // Finalize job
        var completedAt = _timeProvider.GetUtcNow();
        var finalStatus = failed > 0 && succeeded == 0
            ? BatchJobStatus.Failed
            : BatchJobStatus.Completed;

        UpdateJob(tenantId, jobId, job => job with
        {
            Status = finalStatus,
            ProcessedInputs = processed,
            SucceededInputs = succeeded,
            FailedInputs = failed,
            CompletedAt = completedAt,
            Progress = new BatchJobProgress
            {
                PercentComplete = 100,
                CurrentBatchIndex = processed,
                TotalBatches = request.Inputs.Count
            }
        });
    }

    /// <summary>
    /// Atomically applies <paramref name="update"/> to the stored job.
    /// </summary>
    /// <returns><c>true</c> when the update was stored; <c>false</c> when the job does not
    /// exist or has been cancelled (a cancelled job is never modified by the worker).</returns>
    private bool UpdateJob(Guid tenantId, string jobId, Func<BatchSimulationJob, BatchSimulationJob> update)
    {
        (Guid TenantId, string JobId) key = (tenantId, jobId);

        // Compare-and-swap so a concurrent CancelJobAsync is never silently
        // overwritten by a stale read-modify-write from the worker.
        while (_jobs.TryGetValue(key, out var current))
        {
            if (current.Status == BatchJobStatus.Cancelled)
            {
                return false;
            }

            if (_jobs.TryUpdate(key, update(current), current))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Aggregates per-input results into totals: success/failure counts, violation counts
    /// (overall and per severity, severity compared case-insensitively) and durations.
    /// </summary>
    private static BatchSimulationSummary ComputeSummary(List<BatchSimulationInputResult> results)
    {
        var totalViolations = 0;
        var severityCounts = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        long totalDuration = 0;

        foreach (var result in results)
        {
            totalDuration += result.DurationMilliseconds;

            if (result.Response?.Summary?.ViolationsFound > 0)
            {
                totalViolations += result.Response.Summary.ViolationsFound;

                foreach (var (severity, count) in result.Response.Summary.ViolationsBySeverity)
                {
                    severityCounts[severity] = severityCounts.GetValueOrDefault(severity) + count;
                }
            }
        }

        return new BatchSimulationSummary
        {
            TotalInputs = results.Count,
            Succeeded = results.Count(r => r.Success),
            Failed = results.Count(r => !r.Success),
            TotalViolations = totalViolations,
            ViolationsBySeverity = severityCounts,
            TotalDurationMilliseconds = totalDuration,
            AverageDurationMilliseconds = results.Count > 0 ? (double)totalDuration / results.Count : 0
        };
    }

    /// <summary>
    /// Produces a job id of the form <c>batch_</c> followed by the first 16 lowercase hex
    /// characters of a SHA-256 hash over tenant, timestamp and a fresh GUID.
    /// </summary>
    private static string GenerateJobId(Guid tenantId, DateTimeOffset timestamp)
    {
        var content = $"{tenantId}:{timestamp.ToUnixTimeMilliseconds()}:{Guid.NewGuid()}";
        var hash = SHA256.HashData(Encoding.UTF8.GetBytes(content));
        return $"batch_{Convert.ToHexString(hash)[..16].ToLowerInvariant()}";
    }

    /// <summary>
    /// Parses an offset page token. Missing, unparsable or negative tokens map to offset 0.
    /// </summary>
    private static int ParseOffset(string? pageToken)
    {
        if (!string.IsNullOrEmpty(pageToken)
            && int.TryParse(pageToken, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offset))
        {
            // A negative offset would otherwise yield a nonsensical next-page token.
            return Math.Max(0, offset);
        }

        return 0;
    }

    /// <summary>
    /// Returns the token for the page following the current one, or <c>null</c> when the
    /// current page reaches the end of the collection.
    /// </summary>
    private static string? NextOffsetToken(int skip, int pageCount, int totalCount)
    {
        var nextOffset = skip + pageCount;
        return nextOffset < totalCount
            ? nextOffset.ToString(CultureInfo.InvariantCulture)
            : null;
    }

    /// <summary>
    /// Signals the background worker to stop, waits up to <see cref="ShutdownTimeout"/> for it,
    /// and releases the cancellation source. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _disposalCts.Cancel();

        try
        {
            _processingTask.Wait(ShutdownTimeout);
        }
        catch (AggregateException)
        {
            // The worker faulted or was cancelled; there is nothing to recover
            // during disposal and Dispose must not throw.
        }

        _disposalCts.Dispose();
    }
}