261 lines
9.0 KiB
C#
261 lines
9.0 KiB
C#
|
|
using StellaOps.Determinism;
|
|
using StellaOps.Scheduler.Models;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Immutable;
|
|
using System.ComponentModel.DataAnnotations;
|
|
|
|
namespace StellaOps.Scheduler.WebService.PolicyRuns;
|
|
|
|
internal sealed class InMemoryPolicyRunService : IPolicyRunService
|
|
{
|
|
private readonly ConcurrentDictionary<string, PolicyRunStatus> _runs = new(StringComparer.Ordinal);
|
|
private readonly List<PolicyRunStatus> _orderedRuns = new();
|
|
private readonly object _gate = new();
|
|
private readonly TimeProvider _timeProvider;
|
|
private readonly IGuidProvider _guidProvider;
|
|
|
|
public InMemoryPolicyRunService(TimeProvider? timeProvider = null, IGuidProvider? guidProvider = null)
|
|
{
|
|
_timeProvider = timeProvider ?? TimeProvider.System;
|
|
_guidProvider = guidProvider ?? SystemGuidProvider.Instance;
|
|
}
|
|
|
|
public Task<PolicyRunStatus> EnqueueAsync(string tenantId, PolicyRunRequest request, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
|
|
ArgumentNullException.ThrowIfNull(request);
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
var now = _timeProvider.GetUtcNow();
|
|
var runId = string.IsNullOrWhiteSpace(request.RunId)
|
|
? GenerateRunId(request.PolicyId, request.QueuedAt ?? now)
|
|
: request.RunId;
|
|
|
|
var queuedAt = request.QueuedAt ?? now;
|
|
|
|
var status = new PolicyRunStatus(
|
|
runId,
|
|
tenantId,
|
|
request.PolicyId ?? throw new ValidationException("policyId must be provided."),
|
|
request.PolicyVersion ?? throw new ValidationException("policyVersion must be provided."),
|
|
request.Mode,
|
|
PolicyRunExecutionStatus.Queued,
|
|
request.Priority,
|
|
queuedAt,
|
|
PolicyRunStats.Empty,
|
|
request.Inputs ?? PolicyRunInputs.Empty,
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
0,
|
|
null,
|
|
null,
|
|
request.Metadata ?? ImmutableSortedDictionary<string, string>.Empty,
|
|
cancellationRequested: false,
|
|
cancellationRequestedAt: null,
|
|
cancellationReason: null,
|
|
SchedulerSchemaVersions.PolicyRunStatus);
|
|
|
|
lock (_gate)
|
|
{
|
|
if (_runs.TryGetValue(runId, out var existing))
|
|
{
|
|
return Task.FromResult(existing);
|
|
}
|
|
|
|
_runs[runId] = status;
|
|
_orderedRuns.Add(status);
|
|
}
|
|
|
|
return Task.FromResult(status);
|
|
}
|
|
|
|
public Task<IReadOnlyList<PolicyRunStatus>> ListAsync(string tenantId, PolicyRunQueryOptions options, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
|
|
ArgumentNullException.ThrowIfNull(options);
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
List<PolicyRunStatus> snapshot;
|
|
lock (_gate)
|
|
{
|
|
snapshot = _orderedRuns
|
|
.Where(run => string.Equals(run.TenantId, tenantId, StringComparison.Ordinal))
|
|
.ToList();
|
|
}
|
|
|
|
if (options.PolicyId is { Length: > 0 } policyId)
|
|
{
|
|
snapshot = snapshot
|
|
.Where(run => string.Equals(run.PolicyId, policyId, StringComparison.OrdinalIgnoreCase))
|
|
.ToList();
|
|
}
|
|
|
|
if (options.Mode is { } mode)
|
|
{
|
|
snapshot = snapshot
|
|
.Where(run => run.Mode == mode)
|
|
.ToList();
|
|
}
|
|
|
|
if (options.Status is { } status)
|
|
{
|
|
snapshot = snapshot
|
|
.Where(run => run.Status == status)
|
|
.ToList();
|
|
}
|
|
|
|
if (options.QueuedAfter is { } since)
|
|
{
|
|
snapshot = snapshot
|
|
.Where(run => run.QueuedAt >= since)
|
|
.ToList();
|
|
}
|
|
|
|
var result = snapshot
|
|
.OrderByDescending(run => run.QueuedAt)
|
|
.ThenBy(run => run.RunId, StringComparer.Ordinal)
|
|
.Take(options.Limit)
|
|
.ToList();
|
|
|
|
return Task.FromResult<IReadOnlyList<PolicyRunStatus>>(result);
|
|
}
|
|
|
|
public Task<PolicyRunStatus?> GetAsync(string tenantId, string runId, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
if (!_runs.TryGetValue(runId, out var run))
|
|
{
|
|
return Task.FromResult<PolicyRunStatus?>(null);
|
|
}
|
|
|
|
if (!string.Equals(run.TenantId, tenantId, StringComparison.Ordinal))
|
|
{
|
|
return Task.FromResult<PolicyRunStatus?>(null);
|
|
}
|
|
|
|
return Task.FromResult<PolicyRunStatus?>(run);
|
|
}
|
|
|
|
public Task<PolicyRunStatus?> RequestCancellationAsync(string tenantId, string runId, string? reason, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
PolicyRunStatus? updated;
|
|
lock (_gate)
|
|
{
|
|
if (!_runs.TryGetValue(runId, out var existing) || !string.Equals(existing.TenantId, tenantId, StringComparison.Ordinal))
|
|
{
|
|
return Task.FromResult<PolicyRunStatus?>(null);
|
|
}
|
|
|
|
if (IsTerminal(existing.Status))
|
|
{
|
|
return Task.FromResult<PolicyRunStatus?>(existing);
|
|
}
|
|
|
|
var cancellationReason = NormalizeCancellationReason(reason);
|
|
var now = _timeProvider.GetUtcNow();
|
|
updated = existing with
|
|
{
|
|
Status = PolicyRunExecutionStatus.Cancelled,
|
|
FinishedAt = now,
|
|
CancellationRequested = true,
|
|
CancellationRequestedAt = now,
|
|
CancellationReason = cancellationReason
|
|
};
|
|
|
|
_runs[runId] = updated;
|
|
var index = _orderedRuns.FindIndex(status => string.Equals(status.RunId, runId, StringComparison.Ordinal));
|
|
if (index >= 0)
|
|
{
|
|
_orderedRuns[index] = updated;
|
|
}
|
|
}
|
|
|
|
return Task.FromResult<PolicyRunStatus?>(updated);
|
|
}
|
|
|
|
public async Task<PolicyRunStatus> RetryAsync(string tenantId, string runId, string? requestedBy, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
PolicyRunStatus existing;
|
|
lock (_gate)
|
|
{
|
|
if (!_runs.TryGetValue(runId, out var status) || !string.Equals(status.TenantId, tenantId, StringComparison.Ordinal))
|
|
{
|
|
throw new KeyNotFoundException($"Policy simulation {runId} was not found for tenant {tenantId}.");
|
|
}
|
|
|
|
if (!IsTerminal(status.Status))
|
|
{
|
|
throw new InvalidOperationException("Simulation is still in progress and cannot be retried.");
|
|
}
|
|
|
|
existing = status;
|
|
}
|
|
|
|
var metadataBuilder = (existing.Metadata ?? ImmutableSortedDictionary<string, string>.Empty).ToBuilder();
|
|
metadataBuilder["retry-of"] = runId;
|
|
var request = new PolicyRunRequest(
|
|
tenantId,
|
|
existing.PolicyId,
|
|
PolicyRunMode.Simulate,
|
|
existing.Inputs,
|
|
existing.Priority,
|
|
runId: null,
|
|
policyVersion: existing.PolicyVersion,
|
|
requestedBy: NormalizeActor(requestedBy),
|
|
queuedAt: _timeProvider.GetUtcNow(),
|
|
correlationId: null,
|
|
metadata: metadataBuilder.ToImmutable());
|
|
|
|
return await EnqueueAsync(tenantId, request, cancellationToken).ConfigureAwait(false);
|
|
}
|
|
|
|
private string GenerateRunId(string policyId, DateTimeOffset timestamp)
|
|
{
|
|
var normalizedPolicyId = string.IsNullOrWhiteSpace(policyId) ? "policy" : policyId.Trim();
|
|
var suffix = _guidProvider.NewGuid().ToString("N")[..8];
|
|
return $"run:{normalizedPolicyId}:{timestamp:yyyyMMddTHHmmssZ}:{suffix}";
|
|
}
|
|
|
|
private static bool IsTerminal(PolicyRunExecutionStatus status)
|
|
=> status is PolicyRunExecutionStatus.Succeeded or PolicyRunExecutionStatus.Failed or PolicyRunExecutionStatus.Cancelled;
|
|
|
|
private static string? NormalizeCancellationReason(string? value)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(value))
|
|
{
|
|
return null;
|
|
}
|
|
|
|
var trimmed = value.Trim();
|
|
const int maxLength = 512;
|
|
return trimmed.Length > maxLength ? trimmed[..maxLength] : trimmed;
|
|
}
|
|
|
|
private static string? NormalizeActor(string? actor)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(actor))
|
|
{
|
|
return null;
|
|
}
|
|
|
|
var trimmed = actor.Trim();
|
|
const int maxLength = 256;
|
|
return trimmed.Length > maxLength ? trimmed[..maxLength] : trimmed;
|
|
}
|
|
}
|