feat: Update policy simulation metrics and tests to align with new naming conventions and enhance telemetry coverage
This commit is contained in:
@@ -81,9 +81,11 @@ Summary: Scheduling & Automation focus on Scheduler (phase I).
|
||||
Task ID | State | Task description | Owners (Source)
|
||||
--- | --- | --- | ---
|
||||
SCHED-CONSOLE-23-001 | DONE (2025-11-03) | Extend runs APIs with live progress SSE endpoints (`/console/runs/{id}/stream`), queue lag summaries, diff metadata fetch, retry/cancel hooks with RBAC enforcement, and deterministic pagination for history views consumed by Console. | Scheduler WebService Guild, BE-Base Platform Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||
SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. Dependencies: SCHED-CONSOLE-23-001. | Scheduler WebService Guild, Policy Registry Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||
SCHED-CONSOLE-27-002 | DOING (2025-11-03) | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency`) and webhook callbacks for completion/failure consumed by Registry. Dependencies: SCHED-CONSOLE-27-001. | Scheduler WebService Guild, Observability Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||
SCHED-IMPACT-16-303 | TODO | Snapshot/compaction + invalidation for removed images; persistence to RocksDB/Redis per architecture. | Scheduler ImpactIndex Guild (src/Scheduler/__Libraries/StellaOps.Scheduler.ImpactIndex/TASKS.md)
|
||||
SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. Dependencies: SCHED-CONSOLE-23-001. | Scheduler WebService Guild, Policy Registry Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||
SCHED-CONSOLE-27-002 | DONE (2025-11-05) | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency_seconds`) and webhook callbacks for completion/failure consumed by Registry. Dependencies: SCHED-CONSOLE-27-001. | Scheduler WebService Guild, Observability Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||
> 2025-11-05: Resumed instrumentation work to match `policy_simulation_latency_seconds` naming, add coverage for SSE latency recording, and validate webhook sample alignment before closing.
|
||||
> 2025-11-05: Ship telemetry updates + tests; local `dotnet test` blocked by pre-existing GraphJobs accessibility errors (`IGraphJobStore.UpdateAsync`).
|
||||
SCHED-IMPACT-16-303 | TODO | Snapshot/compaction + invalidation for removed images; persistence to RocksDB/Redis per architecture. | Scheduler ImpactIndex Guild (src/Scheduler/__Libraries/StellaOps.Scheduler.ImpactIndex/TASKS.md)
|
||||
SCHED-SURFACE-01 | TODO | Evaluate Surface.FS pointers when planning delta scans to avoid redundant work and prioritise drift-triggered assets. | Scheduler Worker Guild (src/Scheduler/__Libraries/StellaOps.Scheduler.Worker/TASKS.md)
|
||||
SCHED-VULN-29-001 | TODO | Expose resolver job APIs (`POST /vuln/resolver/jobs`, `GET /vuln/resolver/jobs/{id}`) to trigger candidate recomputation per artifact/policy change with RBAC and rate limits. | Scheduler WebService Guild, Findings Ledger Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||
SCHED-VULN-29-002 | TODO | Provide projector lag metrics endpoint and webhook notifications for backlog breaches consumed by DevOps dashboards. Dependencies: SCHED-VULN-29-001. | Scheduler WebService Guild, Observability Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||
|
||||
@@ -81,7 +81,7 @@ sequenceDiagram
|
||||
- **Queue** – Backed by Mongo + optional NATS for fan-out; supports leases and replay on crash.
|
||||
- **Engine** – Stateless worker executing the deterministic evaluator.
|
||||
- **Store** – Mongo collections: `policy_runs`, `effective_finding_{policyId}`, `policy_run_events` (append-only history), optional object storage for explain traces.
|
||||
- **Observability** – Prometheus metrics (`policy_run_seconds`, `policy_simulation_queue_depth`, `policy_simulation_latency`), OTLP traces, structured logs.
|
||||
- **Observability** – Prometheus metrics (`policy_run_seconds`, `policy_simulation_queue_depth`, `policy_simulation_latency_seconds`), OTLP traces, structured logs.
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ internal sealed class PolicySimulationMetricsProvider : IPolicySimulationMetrics
|
||||
unit: "runs",
|
||||
description: "Queued policy simulation jobs grouped by status.");
|
||||
_latencyHistogram = _meter.CreateHistogram<double>(
|
||||
"policy_simulation_latency",
|
||||
"policy_simulation_latency_seconds",
|
||||
unit: "s",
|
||||
description: "End-to-end policy simulation latency (seconds).");
|
||||
}
|
||||
|
||||
@@ -29,10 +29,12 @@
|
||||
## Policy Studio (Sprint 27)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Scheduler WebService Guild, Policy Registry Guild | SCHED-WEB-16-103, REGISTRY-API-27-005 | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. | API handles shard lifecycle with SSE heartbeats + retry headers; unauthorized requests rejected; integration tests cover submit/cancel/resume flows. |
|
||||
| SCHED-CONSOLE-27-002 | DOING (2025-11-03) | Scheduler WebService Guild, Observability Guild | SCHED-CONSOLE-27-001 | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency`) and webhook callbacks for completion/failure consumed by Registry. | Metrics exposed via gateway, dashboards seeded, webhook contract documented, integration tests validate metrics emission. |
|
||||
|
||||
## Vulnerability Explorer (Sprint 29)
|
||||
| SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Scheduler WebService Guild, Policy Registry Guild | SCHED-WEB-16-103, REGISTRY-API-27-005 | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. | API handles shard lifecycle with SSE heartbeats + retry headers; unauthorized requests rejected; integration tests cover submit/cancel/resume flows. |
|
||||
| SCHED-CONSOLE-27-002 | DONE (2025-11-05) | Scheduler WebService Guild, Observability Guild | SCHED-CONSOLE-27-001 | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency_seconds`) and webhook callbacks for completion/failure consumed by Registry. | Metrics exposed via gateway, dashboards seeded, webhook contract documented, integration tests validate metrics emission. |
|
||||
> 2025-11-05: Resuming to align instrumentation naming with architecture spec, exercise latency recording in SSE flows, and ensure registry webhook contract (samples/docs) reflects terminal result behaviour.
|
||||
> 2025-11-05: Histogram renamed to `policy_simulation_latency_seconds`, queue gauge kept stable, new unit tests cover metrics capture/latency recording, and docs updated. Local `dotnet test` build currently blocked by existing GraphJobs visibility errors (see `StellaOps.Scheduler.WebService/GraphJobs/IGraphJobStore.cs`).
|
||||
|
||||
## Vulnerability Explorer (Sprint 29)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-VULN-29-001 | TODO | Scheduler WebService Guild, Findings Ledger Guild | SCHED-WEB-16-103, SBOM-VULN-29-001 | Expose resolver job APIs (`POST /vuln/resolver/jobs`, `GET /vuln/resolver/jobs/{id}`) to trigger candidate recomputation per artifact/policy change with RBAC and rate limits. | Resolver APIs documented; integration tests cover submit/status/cancel; unauthorized requests rejected. |
|
||||
|
||||
@@ -313,7 +313,7 @@ X-Tenant-Id: tenant-alpha
|
||||
Authorization: Bearer <OpTok>
|
||||
```
|
||||
|
||||
Returns queue depth and latency summaries tailored for simulation dashboards and alerting. Response properties align with the metric names exposed via OTEL (`policy_simulation_queue_depth`, `policy_simulation_latency`). Canonical payload lives at `samples/api/scheduler/policy-simulation-metrics.json`.
|
||||
Returns queue depth and latency summaries tailored for simulation dashboards and alerting. Response properties align with the metric names exposed via OTEL (`policy_simulation_queue_depth`, `policy_simulation_latency_seconds`). Canonical payload lives at `samples/api/scheduler/policy-simulation-metrics.json`.
|
||||
|
||||
- `policy_simulation_queue_depth.total` — pending simulation jobs (aggregate of `pending`, `dispatching`, `submitted`).
|
||||
- `policy_simulation_latency.*` — latency percentiles (seconds) computed from the most recent terminal simulations.
|
||||
|
||||
@@ -0,0 +1,242 @@
|
||||
using System.Collections.Immutable;
|
||||
using System.Diagnostics.Metrics;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Repositories;
|
||||
using StellaOps.Scheduler.WebService.PolicySimulations;
|
||||
|
||||
namespace StellaOps.Scheduler.WebService.Tests;
|
||||
|
||||
public sealed class PolicySimulationMetricsProviderTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task CaptureAsync_ComputesQueueDepthAndLatency()
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var queueCounts = new Dictionary<PolicyRunJobStatus, long>
|
||||
{
|
||||
[PolicyRunJobStatus.Pending] = 2,
|
||||
[PolicyRunJobStatus.Dispatching] = 1,
|
||||
[PolicyRunJobStatus.Submitted] = 1
|
||||
};
|
||||
|
||||
var jobs = new List<PolicyRunJob>
|
||||
{
|
||||
CreateJob(
|
||||
status: PolicyRunJobStatus.Completed,
|
||||
queuedAt: now.AddSeconds(-30),
|
||||
submittedAt: now.AddSeconds(-28),
|
||||
completedAt: now.AddSeconds(-20)),
|
||||
CreateJob(
|
||||
status: PolicyRunJobStatus.Cancelled,
|
||||
queuedAt: now.AddSeconds(-50),
|
||||
submittedAt: now.AddSeconds(-48),
|
||||
completedAt: null,
|
||||
cancelledAt: now.AddSeconds(-20))
|
||||
};
|
||||
|
||||
await using var provider = new PolicySimulationMetricsProvider(
|
||||
new StubPolicyRunJobRepository(queueCounts, jobs));
|
||||
|
||||
var response = await provider.CaptureAsync("tenant-alpha", CancellationToken.None);
|
||||
|
||||
Assert.Equal(4, response.QueueDepth.Total);
|
||||
Assert.Equal(2, response.QueueDepth.ByStatus["pending"]);
|
||||
Assert.Equal(1, response.QueueDepth.ByStatus["dispatching"]);
|
||||
Assert.Equal(1, response.QueueDepth.ByStatus["submitted"]);
|
||||
|
||||
Assert.Equal(2, response.Latency.Samples);
|
||||
Assert.Equal(20.0, response.Latency.Mean);
|
||||
Assert.Equal(20.0, response.Latency.P50);
|
||||
Assert.Equal(28.0, response.Latency.P90);
|
||||
Assert.Equal(29.0, response.Latency.P95);
|
||||
Assert.Equal(30.0, response.Latency.P99);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RecordLatency_EmitsHistogramMeasurement()
|
||||
{
|
||||
var repo = new StubPolicyRunJobRepository(
|
||||
counts: new Dictionary<PolicyRunJobStatus, long>(),
|
||||
jobs: Array.Empty<PolicyRunJob>());
|
||||
|
||||
using var provider = new PolicySimulationMetricsProvider(repo);
|
||||
|
||||
var measurements = new List<double>();
|
||||
using var listener = new MeterListener
|
||||
{
|
||||
InstrumentPublished = (instrument, meterListener) =>
|
||||
{
|
||||
if (instrument.Meter.Name == "StellaOps.Scheduler.WebService.PolicySimulations" &&
|
||||
instrument.Name == "policy_simulation_latency_seconds")
|
||||
{
|
||||
meterListener.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||
{
|
||||
if (instrument.Name == "policy_simulation_latency_seconds")
|
||||
{
|
||||
measurements.Add(measurement);
|
||||
}
|
||||
});
|
||||
|
||||
listener.Start();
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var status = new PolicyRunStatus(
|
||||
runId: "run-1",
|
||||
tenantId: "tenant-alpha",
|
||||
policyId: "policy-alpha",
|
||||
policyVersion: 1,
|
||||
mode: PolicyRunMode.Simulate,
|
||||
status: PolicyRunExecutionStatus.Succeeded,
|
||||
priority: PolicyRunPriority.Normal,
|
||||
queuedAt: now.AddSeconds(-12),
|
||||
startedAt: now.AddSeconds(-10),
|
||||
finishedAt: now,
|
||||
stats: PolicyRunStats.Empty,
|
||||
inputs: PolicyRunInputs.Empty,
|
||||
determinismHash: null,
|
||||
errorCode: null,
|
||||
error: null,
|
||||
attempts: 1,
|
||||
traceId: null,
|
||||
explainUri: null,
|
||||
metadata: ImmutableSortedDictionary<string, string>.Empty,
|
||||
cancellationRequested: false,
|
||||
cancellationRequestedAt: null,
|
||||
cancellationReason: null,
|
||||
schemaVersion: null);
|
||||
|
||||
provider.RecordLatency(status, now);
|
||||
|
||||
listener.Dispose();
|
||||
|
||||
Assert.Single(measurements);
|
||||
Assert.Equal(12, measurements[0], precision: 6);
|
||||
}
|
||||
|
||||
private static PolicyRunJob CreateJob(
|
||||
PolicyRunJobStatus status,
|
||||
DateTimeOffset queuedAt,
|
||||
DateTimeOffset submittedAt,
|
||||
DateTimeOffset? completedAt,
|
||||
DateTimeOffset? cancelledAt = null)
|
||||
{
|
||||
var id = Guid.NewGuid().ToString("N");
|
||||
var runId = $"run:{id}";
|
||||
var updatedAt = completedAt ?? cancelledAt ?? submittedAt;
|
||||
|
||||
return new PolicyRunJob(
|
||||
SchemaVersion: SchedulerSchemaVersions.PolicyRunJob,
|
||||
Id: id,
|
||||
TenantId: "tenant-alpha",
|
||||
PolicyId: "policy-alpha",
|
||||
PolicyVersion: 1,
|
||||
Mode: PolicyRunMode.Simulate,
|
||||
Priority: PolicyRunPriority.Normal,
|
||||
PriorityRank: 0,
|
||||
RunId: runId,
|
||||
RequestedBy: "tester",
|
||||
CorrelationId: null,
|
||||
Metadata: ImmutableSortedDictionary<string, string>.Empty,
|
||||
Inputs: PolicyRunInputs.Empty,
|
||||
QueuedAt: queuedAt,
|
||||
Status: status,
|
||||
AttemptCount: 1,
|
||||
LastAttemptAt: submittedAt,
|
||||
LastError: null,
|
||||
CreatedAt: queuedAt,
|
||||
UpdatedAt: updatedAt,
|
||||
AvailableAt: queuedAt,
|
||||
SubmittedAt: submittedAt,
|
||||
CompletedAt: completedAt,
|
||||
LeaseOwner: null,
|
||||
LeaseExpiresAt: null,
|
||||
CancellationRequested: false,
|
||||
CancellationRequestedAt: null,
|
||||
CancellationReason: null,
|
||||
CancelledAt: cancelledAt);
|
||||
}
|
||||
|
||||
private sealed class StubPolicyRunJobRepository : IPolicyRunJobRepository
|
||||
{
|
||||
private readonly IReadOnlyDictionary<PolicyRunJobStatus, long> _counts;
|
||||
private readonly IReadOnlyList<PolicyRunJob> _jobs;
|
||||
|
||||
public StubPolicyRunJobRepository(
|
||||
IReadOnlyDictionary<PolicyRunJobStatus, long> counts,
|
||||
IReadOnlyList<PolicyRunJob> jobs)
|
||||
{
|
||||
_counts = counts;
|
||||
_jobs = jobs;
|
||||
}
|
||||
|
||||
public Task<long> CountAsync(
|
||||
string tenantId,
|
||||
PolicyRunMode mode,
|
||||
IReadOnlyCollection<PolicyRunJobStatus> statuses,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
long total = 0;
|
||||
foreach (var status in statuses)
|
||||
{
|
||||
if (_counts.TryGetValue(status, out var value))
|
||||
{
|
||||
total += value;
|
||||
}
|
||||
}
|
||||
|
||||
return Task.FromResult(total);
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<PolicyRunJob>> ListAsync(
|
||||
string tenantId,
|
||||
string? policyId = null,
|
||||
PolicyRunMode? mode = null,
|
||||
IReadOnlyCollection<PolicyRunJobStatus>? statuses = null,
|
||||
DateTimeOffset? queuedAfter = null,
|
||||
int limit = 50,
|
||||
MongoDB.Driver.IClientSessionHandle? session = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult(_jobs);
|
||||
|
||||
Task IPolicyRunJobRepository.InsertAsync(
|
||||
PolicyRunJob job,
|
||||
MongoDB.Driver.IClientSessionHandle? session,
|
||||
CancellationToken cancellationToken)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
Task<PolicyRunJob?> IPolicyRunJobRepository.GetAsync(
|
||||
string tenantId,
|
||||
string jobId,
|
||||
MongoDB.Driver.IClientSessionHandle? session,
|
||||
CancellationToken cancellationToken)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
Task<PolicyRunJob?> IPolicyRunJobRepository.GetByRunIdAsync(
|
||||
string tenantId,
|
||||
string runId,
|
||||
MongoDB.Driver.IClientSessionHandle? session,
|
||||
CancellationToken cancellationToken)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
Task<PolicyRunJob?> IPolicyRunJobRepository.LeaseAsync(
|
||||
string leaseOwner,
|
||||
DateTimeOffset now,
|
||||
TimeSpan leaseDuration,
|
||||
int maxAttempts,
|
||||
MongoDB.Driver.IClientSessionHandle? session,
|
||||
CancellationToken cancellationToken)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
Task<bool> IPolicyRunJobRepository.ReplaceAsync(
|
||||
PolicyRunJob job,
|
||||
string? expectedLeaseOwner,
|
||||
MongoDB.Driver.IClientSessionHandle? session,
|
||||
CancellationToken cancellationToken)
|
||||
=> throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user