Files
git.stella-ops.org/src/Scheduler/StellaOps.Scheduler.WebService/PolicySimulations/PolicySimulationMetricsProvider.cs
master 5a923d968c
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
feat: Implement PackRunApprovalDecisionService for handling approval decisions
- Added PackRunApprovalDecisionService to manage approval workflows for pack runs.
- Introduced PackRunApprovalDecisionRequest and PackRunApprovalDecisionResult records.
- Implemented logic to apply approval decisions and schedule run resumes based on approvals.
- Updated related tests to validate approval decision functionality.

test: Enhance tests for PackRunApprovalDecisionService

- Created PackRunApprovalDecisionServiceTests to cover various approval scenarios.
- Added in-memory stores for approvals and states to facilitate testing.
- Validated behavior for applying approvals, including handling missing states.

test: Add FilesystemPackRunArtifactUploaderTests for artifact uploads

- Implemented tests for FilesystemPackRunArtifactUploader to ensure correct file handling.
- Verified that missing files are recorded without exceptions and outputs are written as expected.

fix: Update PackRunState creation to include plan reference

- Modified PackRunState creation logic to include the plan in the state.

chore: Refactor service registration in Program.cs

- Updated service registrations in Program.cs to include new approval store and dispatcher services.
- Ensured proper dependency injection for PackRunApprovalDecisionService.

chore: Enhance TaskRunnerServiceOptions for approval store paths

- Added ApprovalStorePath and other paths to TaskRunnerServiceOptions for better configuration.

chore: Update PackRunWorkerService to handle artifact uploads

- Integrated artifact uploading into PackRunWorkerService upon successful run completion.

docs: Update TASKS.md for sprint progress

- Documented progress on approvals workflow and related tasks in TASKS.md.
2025-11-06 11:09:00 +02:00

244 lines
8.1 KiB
C#

using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
namespace StellaOps.Scheduler.WebService.PolicySimulations;
/// <summary>
/// Produces a point-in-time metrics snapshot for policy simulations belonging to a tenant.
/// </summary>
internal interface IPolicySimulationMetricsProvider
{
    /// <summary>
    /// Captures the current policy simulation metrics for the given tenant.
    /// </summary>
    /// <param name="tenantId">Identifier of the tenant whose metrics are requested.</param>
    /// <param name="cancellationToken">Token used to cancel the capture.</param>
    /// <returns>A task that yields the captured <see cref="PolicySimulationMetricsResponse"/>.</returns>
    Task<PolicySimulationMetricsResponse> CaptureAsync(
        string tenantId,
        CancellationToken cancellationToken);
}
/// <summary>
/// Records latency observations for policy simulation runs.
/// </summary>
internal interface IPolicySimulationMetricsRecorder
{
    /// <summary>
    /// Records a latency measurement derived from a policy run status.
    /// </summary>
    /// <param name="status">Run status carrying the timestamps the latency is derived from.</param>
    /// <param name="observedAt">Time at which the status was observed.</param>
    void RecordLatency(
        PolicyRunStatus status,
        DateTimeOffset observedAt);
}
/// <summary>
/// Computes policy simulation queue depth and latency metrics from the policy run job repository and
/// publishes them through a <see cref="Meter"/>: an observable queue-depth gauge and a latency histogram.
/// </summary>
internal sealed class PolicySimulationMetricsProvider : IPolicySimulationMetricsProvider, IPolicySimulationMetricsRecorder, IDisposable
{
    /// <summary>Maximum number of recent terminal jobs requested when computing latency statistics.</summary>
    private const int LatencySampleSize = 200;

    /// <summary>Job statuses that are counted towards the queue depth.</summary>
    private static readonly PolicyRunJobStatus[] QueueStatuses =
    {
        PolicyRunJobStatus.Pending,
        PolicyRunJobStatus.Dispatching,
        PolicyRunJobStatus.Submitted,
    };

    /// <summary>Job statuses whose jobs are sampled for latency statistics.</summary>
    private static readonly PolicyRunJobStatus[] TerminalStatuses =
    {
        PolicyRunJobStatus.Completed,
        PolicyRunJobStatus.Failed,
        PolicyRunJobStatus.Cancelled,
    };

    private readonly IPolicyRunJobRepository _repository;
    private readonly TimeProvider _timeProvider;
    private readonly Meter _meter;
    private readonly ObservableGauge<long> _queueGauge;
    private readonly Histogram<double> _latencyHistogram;

    // Guards the (_latestQueueSnapshot, _latestTenantId) pair so the gauge callback reads a consistent pair.
    private readonly object _snapshotLock = new();
    private IReadOnlyDictionary<string, long> _latestQueueSnapshot = new Dictionary<string, long>(StringComparer.Ordinal);
    private string _latestTenantId = string.Empty;
    private bool _disposed;

    /// <summary>
    /// Creates the provider and registers its instruments on a dedicated meter.
    /// </summary>
    /// <param name="repository">Repository used to count and list policy run jobs.</param>
    /// <param name="timeProvider">Clock abstraction; defaults to <see cref="TimeProvider.System"/> when null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="repository"/> is null.</exception>
    public PolicySimulationMetricsProvider(IPolicyRunJobRepository repository, TimeProvider? timeProvider = null)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _meter = new Meter("StellaOps.Scheduler.WebService.PolicySimulations");
        _queueGauge = _meter.CreateObservableGauge<long>(
            "policy_simulation_queue_depth",
            ObserveQueueDepth,
            unit: "runs",
            description: "Queued policy simulation jobs grouped by status.");
        _latencyHistogram = _meter.CreateHistogram<double>(
            "policy_simulation_latency_seconds",
            unit: "s",
            description: "End-to-end policy simulation latency (seconds).");
    }

    /// <summary>
    /// Counts queued simulation jobs per status, samples recent terminal jobs for latency statistics,
    /// and stores the queue counts as the snapshot reported by the queue-depth gauge.
    /// </summary>
    /// <param name="tenantId">Tenant whose simulation jobs are measured; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token checked up front and passed to every repository call.</param>
    /// <returns>Queue depth (total and per lower-cased status name) plus latency percentiles and mean.</returns>
    /// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
    /// <exception cref="ArgumentException"><paramref name="tenantId"/> is null, empty, or whitespace.</exception>
    public async Task<PolicySimulationMetricsResponse> CaptureAsync(string tenantId, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        if (string.IsNullOrWhiteSpace(tenantId))
        {
            throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
        }

        // Built directly as the dictionary that gets published; it is never mutated after the loop,
        // so it can be shared with the gauge callback and the response without copying.
        var snapshot = new Dictionary<string, long>(QueueStatuses.Length, StringComparer.Ordinal);
        long totalQueueDepth = 0;

        foreach (var status in QueueStatuses)
        {
            var count = await _repository.CountAsync(
                tenantId,
                PolicyRunMode.Simulate,
                new[] { status },
                cancellationToken).ConfigureAwait(false);

            snapshot[status.ToString().ToLowerInvariant()] = count;
            totalQueueDepth += count;
        }

        lock (_snapshotLock)
        {
            _latestQueueSnapshot = snapshot;
            _latestTenantId = tenantId;
        }

        var recentJobs = await _repository.ListAsync(
            tenantId,
            policyId: null,
            mode: PolicyRunMode.Simulate,
            statuses: TerminalStatuses,
            queuedAfter: null,
            limit: LatencySampleSize,
            cancellationToken: cancellationToken).ConfigureAwait(false);

        // Read the clock once for the whole sample instead of once per job.
        var now = _timeProvider.GetUtcNow();

        // Negative values mark jobs without a usable start timestamp; drop them, then sort ascending
        // because Percentile expects ordered input.
        var durations = recentJobs
            .Select(job => CalculateLatencySeconds(job, now))
            .Where(duration => duration >= 0)
            .OrderBy(duration => duration)
            .ToArray();

        var latencyMetrics = new PolicySimulationLatencyMetrics(
            durations.Length,
            Percentile(durations, 0.50),
            Percentile(durations, 0.90),
            Percentile(durations, 0.95),
            Percentile(durations, 0.99),
            Average(durations));

        return new PolicySimulationMetricsResponse(
            new PolicySimulationQueueDepth(totalQueueDepth, snapshot),
            latencyMetrics);
    }

    /// <summary>
    /// Records the latency of a policy run in the latency histogram.
    /// </summary>
    /// <param name="status">Run status providing the queued and finished timestamps.</param>
    /// <param name="observedAt">Used as the end time when the status has no finished timestamp.</param>
    /// <exception cref="ArgumentNullException"><paramref name="status"/> is null.</exception>
    public void RecordLatency(PolicyRunStatus status, DateTimeOffset observedAt)
    {
        if (status is null)
        {
            throw new ArgumentNullException(nameof(status));
        }

        var latencySeconds = CalculateLatencySeconds(status, observedAt);

        // A negative result means the status had no queued timestamp; nothing is recorded then.
        if (latencySeconds >= 0)
        {
            _latencyHistogram.Record(latencySeconds);
        }
    }

    /// <summary>
    /// Gauge callback: emits one measurement per status in the latest snapshot, tagged with
    /// <c>status</c> and <c>tenantId</c> ("unknown" when no tenant has been captured yet).
    /// </summary>
    /// <remarks>
    /// NOTE(review): only the snapshot of the most recently captured tenant is retained, so measurements
    /// for previously captured tenants are no longer emitted. Confirm whether per-tenant snapshots are wanted.
    /// </remarks>
    private IEnumerable<Measurement<long>> ObserveQueueDepth()
    {
        IReadOnlyDictionary<string, long> snapshot;
        string tenantId;

        lock (_snapshotLock)
        {
            snapshot = _latestQueueSnapshot;
            tenantId = _latestTenantId;
        }

        tenantId = string.IsNullOrWhiteSpace(tenantId) ? "unknown" : tenantId;

        foreach (var pair in snapshot)
        {
            yield return new Measurement<long>(
                pair.Value,
                new KeyValuePair<string, object?>("status", pair.Key),
                new KeyValuePair<string, object?>("tenantId", tenantId));
        }
    }

    /// <summary>
    /// Computes a job's latency in seconds from its queued (or created) time to its completed,
    /// cancelled, or last-updated time.
    /// </summary>
    /// <param name="job">Job providing the timestamps.</param>
    /// <param name="now">Currently unused by this overload; mirrors the signature of the status overload.</param>
    /// <returns>-1 when the start timestamp is unset; otherwise the duration, with negatives clamped to 0.</returns>
    private static double CalculateLatencySeconds(PolicyRunJob job, DateTimeOffset now)
    {
        var started = job.QueuedAt ?? job.CreatedAt;
        var finished = job.CompletedAt ?? job.CancelledAt ?? job.UpdatedAt;

        if (started == default)
        {
            return -1;
        }

        var duration = (finished - started).TotalSeconds;
        return duration < 0 ? 0 : duration;
    }

    /// <summary>
    /// Computes a run's latency in seconds from its queued time to its finished time, falling back to
    /// <paramref name="now"/> when the run has no finished timestamp.
    /// </summary>
    /// <param name="status">Run status providing the timestamps.</param>
    /// <param name="now">End time used when <c>FinishedAt</c> is null.</param>
    /// <returns>-1 when the queued timestamp is unset; otherwise the duration, with negatives clamped to 0.</returns>
    private static double CalculateLatencySeconds(PolicyRunStatus status, DateTimeOffset now)
    {
        var started = status.QueuedAt;
        var finished = status.FinishedAt ?? now;

        if (started == default)
        {
            return -1;
        }

        var duration = (finished - started).TotalSeconds;
        return duration < 0 ? 0 : duration;
    }

    /// <summary>
    /// Returns the requested percentile of <paramref name="values"/> using linear interpolation between
    /// the two neighbouring positions, rounded to four decimals.
    /// </summary>
    /// <param name="values">Values already sorted in ascending order.</param>
    /// <param name="percentile">Fraction in the range 0..1 (for example 0.95).</param>
    /// <returns>The percentile, or null when <paramref name="values"/> is empty.</returns>
    private static double? Percentile(IReadOnlyList<double> values, double percentile)
    {
        if (values.Count == 0)
        {
            return null;
        }

        var position = percentile * (values.Count - 1);
        var lowerIndex = (int)Math.Floor(position);
        var upperIndex = (int)Math.Ceiling(position);

        // The position landed exactly on an element, so no interpolation is needed.
        if (lowerIndex == upperIndex)
        {
            return Math.Round(values[lowerIndex], 4);
        }

        var fraction = position - lowerIndex;
        var interpolated = values[lowerIndex] + (values[upperIndex] - values[lowerIndex]) * fraction;
        return Math.Round(interpolated, 4);
    }

    /// <summary>
    /// Returns the arithmetic mean of <paramref name="values"/> rounded to four decimals.
    /// </summary>
    /// <param name="values">Values to average.</param>
    /// <returns>The mean, or null when <paramref name="values"/> is empty.</returns>
    private static double? Average(IReadOnlyList<double> values)
    {
        if (values.Count == 0)
        {
            return null;
        }

        var sum = values.Sum();
        return Math.Round(sum / values.Count, 4);
    }

    /// <summary>
    /// Disposes the underlying meter. Subsequent calls are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _meter.Dispose();
        _disposed = true;
    }
}
/// <summary>
/// Response payload combining queue depth and latency metrics for policy simulations.
/// </summary>
/// <param name="QueueDepth">Queue depth section, serialized as <c>policy_simulation_queue_depth</c>.</param>
/// <param name="Latency">Latency section, serialized as <c>policy_simulation_latency</c>.</param>
internal sealed record PolicySimulationMetricsResponse(
    [property: JsonPropertyName("policy_simulation_queue_depth")]
    PolicySimulationQueueDepth QueueDepth,
    [property: JsonPropertyName("policy_simulation_latency")]
    PolicySimulationLatencyMetrics Latency);
/// <summary>
/// Queue depth of policy simulation jobs, in total and broken down by status.
/// </summary>
/// <param name="Total">Total number of queued jobs, serialized as <c>total</c>.</param>
/// <param name="ByStatus">Job count keyed by status name, serialized as <c>by_status</c>.</param>
internal sealed record PolicySimulationQueueDepth(
    [property: JsonPropertyName("total")]
    long Total,
    [property: JsonPropertyName("by_status")]
    IReadOnlyDictionary<string, long> ByStatus);
/// <summary>
/// Latency statistics, in seconds, for a sample of policy simulation runs.
/// </summary>
/// <param name="Samples">Number of samples, serialized as <c>samples</c>.</param>
/// <param name="P50">50th percentile, serialized as <c>p50_seconds</c>; nullable.</param>
/// <param name="P90">90th percentile, serialized as <c>p90_seconds</c>; nullable.</param>
/// <param name="P95">95th percentile, serialized as <c>p95_seconds</c>; nullable.</param>
/// <param name="P99">99th percentile, serialized as <c>p99_seconds</c>; nullable.</param>
/// <param name="Mean">Mean latency, serialized as <c>mean_seconds</c>; nullable.</param>
internal sealed record PolicySimulationLatencyMetrics(
    [property: JsonPropertyName("samples")]
    int Samples,
    [property: JsonPropertyName("p50_seconds")]
    double? P50,
    [property: JsonPropertyName("p90_seconds")]
    double? P90,
    [property: JsonPropertyName("p95_seconds")]
    double? P95,
    [property: JsonPropertyName("p99_seconds")]
    double? P99,
    [property: JsonPropertyName("mean_seconds")]
    double? Mean);