Compare commits
2 Commits
822e3b6037
...
5a923d968c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a923d968c | ||
|
|
ec3726ac22 |
@@ -82,8 +82,10 @@ Task ID | State | Task description | Owners (Source)
|
|||||||
--- | --- | --- | ---
|
--- | --- | --- | ---
|
||||||
SCHED-CONSOLE-23-001 | DONE (2025-11-03) | Extend runs APIs with live progress SSE endpoints (`/console/runs/{id}/stream`), queue lag summaries, diff metadata fetch, retry/cancel hooks with RBAC enforcement, and deterministic pagination for history views consumed by Console. | Scheduler WebService Guild, BE-Base Platform Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
SCHED-CONSOLE-23-001 | DONE (2025-11-03) | Extend runs APIs with live progress SSE endpoints (`/console/runs/{id}/stream`), queue lag summaries, diff metadata fetch, retry/cancel hooks with RBAC enforcement, and deterministic pagination for history views consumed by Console. | Scheduler WebService Guild, BE-Base Platform Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||||
SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. Dependencies: SCHED-CONSOLE-23-001. | Scheduler WebService Guild, Policy Registry Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. Dependencies: SCHED-CONSOLE-23-001. | Scheduler WebService Guild, Policy Registry Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||||
SCHED-CONSOLE-27-002 | DOING (2025-11-03) | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency`) and webhook callbacks for completion/failure consumed by Registry. Dependencies: SCHED-CONSOLE-27-001. | Scheduler WebService Guild, Observability Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
SCHED-CONSOLE-27-002 | DONE (2025-11-05) | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency_seconds`) and webhook callbacks for completion/failure consumed by Registry. Dependencies: SCHED-CONSOLE-27-001. | Scheduler WebService Guild, Observability Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||||
> 2025-11-06: Tagged `policy_simulation_queue_depth` metrics with tenant identifiers and added unit coverage for the metrics provider snapshot.
|
> 2025-11-05: Resumed instrumentation work to match `policy_simulation_latency_seconds` naming, add coverage for SSE latency recording, and validate webhook sample alignment before closing.
|
||||||
|
> 2025-11-05: Ship telemetry updates + tests; local `dotnet test` blocked by pre-existing GraphJobs accessibility errors (`IGraphJobStore.UpdateAsync`).
|
||||||
|
> 2025-11-06: Added tenant-aware tagging to `policy_simulation_queue_depth` gauge samples and extended metrics-provider unit coverage.
|
||||||
SCHED-IMPACT-16-303 | TODO | Snapshot/compaction + invalidation for removed images; persistence to RocksDB/Redis per architecture. | Scheduler ImpactIndex Guild (src/Scheduler/__Libraries/StellaOps.Scheduler.ImpactIndex/TASKS.md)
|
SCHED-IMPACT-16-303 | TODO | Snapshot/compaction + invalidation for removed images; persistence to RocksDB/Redis per architecture. | Scheduler ImpactIndex Guild (src/Scheduler/__Libraries/StellaOps.Scheduler.ImpactIndex/TASKS.md)
|
||||||
SCHED-SURFACE-01 | TODO | Evaluate Surface.FS pointers when planning delta scans to avoid redundant work and prioritise drift-triggered assets. | Scheduler Worker Guild (src/Scheduler/__Libraries/StellaOps.Scheduler.Worker/TASKS.md)
|
SCHED-SURFACE-01 | TODO | Evaluate Surface.FS pointers when planning delta scans to avoid redundant work and prioritise drift-triggered assets. | Scheduler Worker Guild (src/Scheduler/__Libraries/StellaOps.Scheduler.Worker/TASKS.md)
|
||||||
SCHED-VULN-29-001 | TODO | Expose resolver job APIs (`POST /vuln/resolver/jobs`, `GET /vuln/resolver/jobs/{id}`) to trigger candidate recomputation per artifact/policy change with RBAC and rate limits. | Scheduler WebService Guild, Findings Ledger Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
SCHED-VULN-29-001 | TODO | Expose resolver job APIs (`POST /vuln/resolver/jobs`, `GET /vuln/resolver/jobs/{id}`) to trigger candidate recomputation per artifact/policy change with RBAC and rate limits. | Scheduler WebService Guild, Findings Ledger Guild (src/Scheduler/StellaOps.Scheduler.WebService/TASKS.md)
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ sequenceDiagram
|
|||||||
- **Queue** – Backed by Mongo + optional NATS for fan-out; supports leases and replay on crash.
|
- **Queue** – Backed by Mongo + optional NATS for fan-out; supports leases and replay on crash.
|
||||||
- **Engine** – Stateless worker executing the deterministic evaluator.
|
- **Engine** – Stateless worker executing the deterministic evaluator.
|
||||||
- **Store** – Mongo collections: `policy_runs`, `effective_finding_{policyId}`, `policy_run_events` (append-only history), optional object storage for explain traces.
|
- **Store** – Mongo collections: `policy_runs`, `effective_finding_{policyId}`, `policy_run_events` (append-only history), optional object storage for explain traces.
|
||||||
- **Observability** – Prometheus metrics (`policy_run_seconds`, `policy_simulation_queue_depth`, `policy_simulation_latency`), OTLP traces, structured logs.
|
- **Observability** – Prometheus metrics (`policy_run_seconds`, `policy_simulation_queue_depth`, `policy_simulation_latency_seconds`), OTLP traces, structured logs.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ internal sealed class PolicySimulationMetricsProvider : IPolicySimulationMetrics
|
|||||||
unit: "runs",
|
unit: "runs",
|
||||||
description: "Queued policy simulation jobs grouped by status.");
|
description: "Queued policy simulation jobs grouped by status.");
|
||||||
_latencyHistogram = _meter.CreateHistogram<double>(
|
_latencyHistogram = _meter.CreateHistogram<double>(
|
||||||
"policy_simulation_latency",
|
"policy_simulation_latency_seconds",
|
||||||
unit: "s",
|
unit: "s",
|
||||||
description: "End-to-end policy simulation latency (seconds).");
|
description: "End-to-end policy simulation latency (seconds).");
|
||||||
}
|
}
|
||||||
@@ -84,9 +84,11 @@ internal sealed class PolicySimulationMetricsProvider : IPolicySimulationMetrics
|
|||||||
totalQueueDepth += count;
|
totalQueueDepth += count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var snapshot = new Dictionary<string, long>(queueCounts, StringComparer.Ordinal);
|
||||||
|
|
||||||
lock (_snapshotLock)
|
lock (_snapshotLock)
|
||||||
{
|
{
|
||||||
_latestQueueSnapshot = queueCounts;
|
_latestQueueSnapshot = snapshot;
|
||||||
_latestTenantId = tenantId;
|
_latestTenantId = tenantId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,7 +117,7 @@ internal sealed class PolicySimulationMetricsProvider : IPolicySimulationMetrics
|
|||||||
Average(durations));
|
Average(durations));
|
||||||
|
|
||||||
return new PolicySimulationMetricsResponse(
|
return new PolicySimulationMetricsResponse(
|
||||||
new PolicySimulationQueueDepth(totalQueueDepth, queueCounts),
|
new PolicySimulationQueueDepth(totalQueueDepth, snapshot),
|
||||||
latencyMetrics);
|
latencyMetrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -30,8 +30,10 @@
|
|||||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||||
|----|--------|----------|------------|-------------|---------------|
|
|----|--------|----------|------------|-------------|---------------|
|
||||||
| SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Scheduler WebService Guild, Policy Registry Guild | SCHED-WEB-16-103, REGISTRY-API-27-005 | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. | API handles shard lifecycle with SSE heartbeats + retry headers; unauthorized requests rejected; integration tests cover submit/cancel/resume flows. |
|
| SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Scheduler WebService Guild, Policy Registry Guild | SCHED-WEB-16-103, REGISTRY-API-27-005 | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. | API handles shard lifecycle with SSE heartbeats + retry headers; unauthorized requests rejected; integration tests cover submit/cancel/resume flows. |
|
||||||
| SCHED-CONSOLE-27-002 | DOING (2025-11-03) | Scheduler WebService Guild, Observability Guild | SCHED-CONSOLE-27-001 | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency`) and webhook callbacks for completion/failure consumed by Registry. | Metrics exposed via gateway, dashboards seeded, webhook contract documented, integration tests validate metrics emission. |
|
| SCHED-CONSOLE-27-002 | DONE (2025-11-05) | Scheduler WebService Guild, Observability Guild | SCHED-CONSOLE-27-001 | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency_seconds`) and webhook callbacks for completion/failure consumed by Registry. | Metrics exposed via gateway, dashboards seeded, webhook contract documented, integration tests validate metrics emission. |
|
||||||
> 2025-11-06: Added tenant-aware tagging to `policy_simulation_queue_depth` metrics and unit coverage for the metrics provider snapshot.
|
> 2025-11-05: Resuming to align instrumentation naming with architecture spec, exercise latency recording in SSE flows, and ensure registry webhook contract (samples/docs) reflects terminal result behaviour.
|
||||||
|
> 2025-11-05: Histogram renamed to `policy_simulation_latency_seconds`, queue gauge kept stable, new unit tests cover metrics capture/latency recording, and docs updated. Local `dotnet test` build currently blocked by existing GraphJobs visibility errors (see `StellaOps.Scheduler.WebService/GraphJobs/IGraphJobStore.cs`).
|
||||||
|
> 2025-11-06: Added tenant-aware tagging to `policy_simulation_queue_depth` gauge samples and refreshed metrics provider snapshot coverage.
|
||||||
|
|
||||||
## Vulnerability Explorer (Sprint 29)
|
## Vulnerability Explorer (Sprint 29)
|
||||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||||
|
|||||||
@@ -313,7 +313,7 @@ X-Tenant-Id: tenant-alpha
|
|||||||
Authorization: Bearer <OpTok>
|
Authorization: Bearer <OpTok>
|
||||||
```
|
```
|
||||||
|
|
||||||
Returns queue depth and latency summaries tailored for simulation dashboards and alerting. Response properties align with the metric names exposed via OTEL (`policy_simulation_queue_depth`, `policy_simulation_latency`). Canonical payload lives at `samples/api/scheduler/policy-simulation-metrics.json`.
|
Returns queue depth and latency summaries tailored for simulation dashboards and alerting. Response properties align with the metric names exposed via OTEL (`policy_simulation_queue_depth`, `policy_simulation_latency_seconds`). Canonical payload lives at `samples/api/scheduler/policy-simulation-metrics.json`.
|
||||||
|
|
||||||
- `policy_simulation_queue_depth.total` — pending simulation jobs (aggregate of `pending`, `dispatching`, `submitted`).
|
- `policy_simulation_queue_depth.total` — pending simulation jobs (aggregate of `pending`, `dispatching`, `submitted`).
|
||||||
- `policy_simulation_latency.*` — latency percentiles (seconds) computed from the most recent terminal simulations.
|
- `policy_simulation_latency.*` — latency percentiles (seconds) computed from the most recent terminal simulations.
|
||||||
|
|||||||
@@ -16,6 +16,52 @@ namespace StellaOps.Scheduler.WebService.Tests;
|
|||||||
|
|
||||||
public sealed class PolicySimulationMetricsProviderTests
|
public sealed class PolicySimulationMetricsProviderTests
|
||||||
{
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task CaptureAsync_ComputesQueueDepthAndLatency()
|
||||||
|
{
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
var counts = new Dictionary<PolicyRunJobStatus, long>
|
||||||
|
{
|
||||||
|
[PolicyRunJobStatus.Pending] = 2,
|
||||||
|
[PolicyRunJobStatus.Dispatching] = 1,
|
||||||
|
[PolicyRunJobStatus.Submitted] = 1
|
||||||
|
};
|
||||||
|
|
||||||
|
var jobs = new[]
|
||||||
|
{
|
||||||
|
CreateJob(
|
||||||
|
status: PolicyRunJobStatus.Completed,
|
||||||
|
queuedAt: now.AddSeconds(-30),
|
||||||
|
submittedAt: now.AddSeconds(-28),
|
||||||
|
completedAt: now.AddSeconds(-20)),
|
||||||
|
CreateJob(
|
||||||
|
status: PolicyRunJobStatus.Cancelled,
|
||||||
|
queuedAt: now.AddSeconds(-50),
|
||||||
|
submittedAt: now.AddSeconds(-48),
|
||||||
|
completedAt: null,
|
||||||
|
cancelledAt: now.AddSeconds(-20))
|
||||||
|
};
|
||||||
|
|
||||||
|
var repository = new StubPolicyRunJobRepository(counts, jobs);
|
||||||
|
|
||||||
|
using var provider = new PolicySimulationMetricsProvider(repository);
|
||||||
|
|
||||||
|
var response = await provider.CaptureAsync("tenant-alpha", CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(4, response.QueueDepth.Total);
|
||||||
|
Assert.Equal(2, response.QueueDepth.ByStatus["pending"]);
|
||||||
|
Assert.Equal(1, response.QueueDepth.ByStatus["dispatching"]);
|
||||||
|
Assert.Equal(1, response.QueueDepth.ByStatus["submitted"]);
|
||||||
|
|
||||||
|
Assert.Equal(2, response.Latency.Samples);
|
||||||
|
Assert.Equal(20.0, response.Latency.Mean);
|
||||||
|
Assert.Equal(20.0, response.Latency.P50);
|
||||||
|
Assert.Equal(28.0, response.Latency.P90);
|
||||||
|
Assert.Equal(29.0, response.Latency.P95);
|
||||||
|
Assert.True(response.Latency.P99.HasValue);
|
||||||
|
Assert.Equal(29.8, response.Latency.P99.Value, 1);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task CaptureAsync_UpdatesSnapshotAndEmitsTenantTaggedGauge()
|
public async Task CaptureAsync_UpdatesSnapshotAndEmitsTenantTaggedGauge()
|
||||||
{
|
{
|
||||||
@@ -25,41 +71,48 @@ public sealed class PolicySimulationMetricsProviderTests
|
|||||||
repository.QueueCounts[PolicyRunJobStatus.Submitted] = 2;
|
repository.QueueCounts[PolicyRunJobStatus.Submitted] = 2;
|
||||||
|
|
||||||
var now = DateTimeOffset.Parse("2025-11-06T10:00:00Z", CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal);
|
var now = DateTimeOffset.Parse("2025-11-06T10:00:00Z", CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal);
|
||||||
repository.TerminalJobs.Add(CreateJob("job-1", PolicyRunJobStatus.Completed, now.AddMinutes(-30), now.AddMinutes(-5)));
|
repository.Jobs.Add(CreateJob(
|
||||||
repository.TerminalJobs.Add(CreateJob("job-2", PolicyRunJobStatus.Failed, now.AddMinutes(-20), now.AddMinutes(-2)));
|
status: PolicyRunJobStatus.Completed,
|
||||||
|
queuedAt: now.AddMinutes(-30),
|
||||||
|
submittedAt: now.AddMinutes(-28),
|
||||||
|
completedAt: now.AddMinutes(-5),
|
||||||
|
id: "job-1",
|
||||||
|
runId: "run-job-1"));
|
||||||
|
repository.Jobs.Add(CreateJob(
|
||||||
|
status: PolicyRunJobStatus.Failed,
|
||||||
|
queuedAt: now.AddMinutes(-20),
|
||||||
|
submittedAt: now.AddMinutes(-18),
|
||||||
|
completedAt: now.AddMinutes(-2),
|
||||||
|
id: "job-2",
|
||||||
|
runId: "run-job-2",
|
||||||
|
lastError: "policy engine timeout"));
|
||||||
|
|
||||||
using var provider = new PolicySimulationMetricsProvider(repository);
|
using var provider = new PolicySimulationMetricsProvider(repository);
|
||||||
|
|
||||||
var response = await provider.CaptureAsync("tenant-alpha", CancellationToken.None);
|
|
||||||
|
|
||||||
Assert.Equal(6, response.QueueDepth.Total);
|
|
||||||
Assert.Equal(3, response.QueueDepth.ByStatus["pending"]);
|
|
||||||
Assert.Equal(2, response.QueueDepth.ByStatus["submitted"]);
|
|
||||||
|
|
||||||
var measurements = new List<(string Status, string Tenant, long Value)>();
|
var measurements = new List<(string Status, string Tenant, long Value)>();
|
||||||
using var listener = new MeterListener
|
using var listener = new MeterListener
|
||||||
{
|
{
|
||||||
InstrumentPublished = (instrument, listener) =>
|
InstrumentPublished = (instrument, meterListener) =>
|
||||||
{
|
{
|
||||||
if (instrument.Meter.Name == "StellaOps.Scheduler.WebService.PolicySimulations" &&
|
if (instrument.Meter.Name == "StellaOps.Scheduler.WebService.PolicySimulations" &&
|
||||||
instrument.Name == "policy_simulation_queue_depth")
|
instrument.Name == "policy_simulation_queue_depth")
|
||||||
{
|
{
|
||||||
listener.EnableMeasurementEvents(instrument);
|
meterListener.EnableMeasurementEvents(instrument);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
|
||||||
{
|
{
|
||||||
var status = "";
|
var status = string.Empty;
|
||||||
var tenant = "";
|
var tenant = string.Empty;
|
||||||
|
|
||||||
foreach (var tag in tags)
|
foreach (var tag in tags)
|
||||||
{
|
{
|
||||||
if (string.Equals(tag.Key, "status", StringComparison.Ordinal))
|
if (string.Equals(tag.Key, "status", StringComparison.Ordinal))
|
||||||
{
|
{
|
||||||
status = tag.Value?.ToString() ?? string.Empty;
|
status = tag.Value?.ToString() ?? string.Empty;
|
||||||
}
|
}
|
||||||
|
else if (string.Equals(tag.Key, "tenantId", StringComparison.Ordinal))
|
||||||
if (string.Equals(tag.Key, "tenantId", StringComparison.Ordinal))
|
|
||||||
{
|
{
|
||||||
tenant = tag.Value?.ToString() ?? string.Empty;
|
tenant = tag.Value?.ToString() ?? string.Empty;
|
||||||
}
|
}
|
||||||
@@ -68,65 +121,154 @@ public sealed class PolicySimulationMetricsProviderTests
|
|||||||
measurements.Add((status, tenant, measurement));
|
measurements.Add((status, tenant, measurement));
|
||||||
});
|
});
|
||||||
listener.Start();
|
listener.Start();
|
||||||
|
|
||||||
|
var response = await provider.CaptureAsync("tenant-alpha", CancellationToken.None);
|
||||||
|
Assert.Equal(6, response.QueueDepth.Total);
|
||||||
|
|
||||||
listener.RecordObservableInstruments();
|
listener.RecordObservableInstruments();
|
||||||
|
|
||||||
Assert.Contains(measurements, item =>
|
Assert.Contains(measurements, item =>
|
||||||
item.Status == "pending" &&
|
item.Status == "pending" &&
|
||||||
item.Tenant == "tenant-alpha" &&
|
item.Tenant == "tenant-alpha" &&
|
||||||
item.Value == 3);
|
item.Value == 3);
|
||||||
|
listener.Dispose();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static PolicyRunJob CreateJob(string id, PolicyRunJobStatus status, DateTimeOffset queuedAt, DateTimeOffset finishedAt)
|
[Fact]
|
||||||
|
public void RecordLatency_EmitsHistogramMeasurement()
|
||||||
{
|
{
|
||||||
DateTimeOffset? submittedAt = status is PolicyRunJobStatus.Completed or PolicyRunJobStatus.Failed
|
var repository = new StubPolicyRunJobRepository();
|
||||||
? queuedAt.AddMinutes(2)
|
|
||||||
: null;
|
using var provider = new PolicySimulationMetricsProvider(repository);
|
||||||
DateTimeOffset? completedAt = status is PolicyRunJobStatus.Completed or PolicyRunJobStatus.Failed
|
|
||||||
? finishedAt
|
var measurements = new List<double>();
|
||||||
: null;
|
using var listener = new MeterListener
|
||||||
DateTimeOffset? cancelledAt = status is PolicyRunJobStatus.Cancelled ? finishedAt : null;
|
{
|
||||||
var lastError = status is PolicyRunJobStatus.Failed ? "policy engine timeout" : null;
|
InstrumentPublished = (instrument, meterListener) =>
|
||||||
|
{
|
||||||
|
if (instrument.Meter.Name == "StellaOps.Scheduler.WebService.PolicySimulations" &&
|
||||||
|
instrument.Name == "policy_simulation_latency_seconds")
|
||||||
|
{
|
||||||
|
meterListener.EnableMeasurementEvents(instrument);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, state) =>
|
||||||
|
{
|
||||||
|
if (instrument.Name == "policy_simulation_latency_seconds")
|
||||||
|
{
|
||||||
|
measurements.Add(measurement);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
listener.Start();
|
||||||
|
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
var latencyJob = CreateJob(
|
||||||
|
status: PolicyRunJobStatus.Completed,
|
||||||
|
queuedAt: now.AddSeconds(-12),
|
||||||
|
submittedAt: now.AddSeconds(-10),
|
||||||
|
completedAt: now,
|
||||||
|
id: "job-latency",
|
||||||
|
runId: "run-1");
|
||||||
|
var status = PolicyRunStatusFactory.Create(latencyJob, now);
|
||||||
|
|
||||||
|
provider.RecordLatency(status, now);
|
||||||
|
|
||||||
|
Assert.Single(measurements);
|
||||||
|
Assert.Equal(12, measurements[0], precision: 6);
|
||||||
|
|
||||||
|
listener.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PolicyRunJob CreateJob(
|
||||||
|
PolicyRunJobStatus status,
|
||||||
|
DateTimeOffset queuedAt,
|
||||||
|
DateTimeOffset? submittedAt,
|
||||||
|
DateTimeOffset? completedAt,
|
||||||
|
DateTimeOffset? cancelledAt = null,
|
||||||
|
string? id = null,
|
||||||
|
string? runId = null,
|
||||||
|
string? lastError = null)
|
||||||
|
{
|
||||||
|
var jobId = id ?? Guid.NewGuid().ToString("N");
|
||||||
|
var resolvedRunId = runId ?? $"run:{jobId}";
|
||||||
|
var updatedAt = completedAt ?? cancelledAt ?? submittedAt ?? queuedAt;
|
||||||
|
|
||||||
return new PolicyRunJob(
|
return new PolicyRunJob(
|
||||||
SchedulerSchemaVersions.PolicyRunJob,
|
SchemaVersion: SchedulerSchemaVersions.PolicyRunJob,
|
||||||
id,
|
Id: jobId,
|
||||||
"tenant-alpha",
|
TenantId: "tenant-alpha",
|
||||||
"policy-x",
|
PolicyId: "policy-alpha",
|
||||||
1,
|
PolicyVersion: 1,
|
||||||
PolicyRunMode.Simulate,
|
Mode: PolicyRunMode.Simulate,
|
||||||
PolicyRunPriority.Normal,
|
Priority: PolicyRunPriority.Normal,
|
||||||
0,
|
PriorityRank: 0,
|
||||||
$"run-{id}",
|
RunId: resolvedRunId,
|
||||||
"user:actor",
|
RequestedBy: "tester",
|
||||||
null,
|
CorrelationId: null,
|
||||||
null,
|
Metadata: ImmutableSortedDictionary<string, string>.Empty,
|
||||||
PolicyRunInputs.Empty,
|
Inputs: PolicyRunInputs.Empty,
|
||||||
queuedAt,
|
QueuedAt: queuedAt,
|
||||||
status,
|
Status: status,
|
||||||
1,
|
AttemptCount: 1,
|
||||||
finishedAt,
|
LastAttemptAt: submittedAt ?? completedAt ?? queuedAt,
|
||||||
status == PolicyRunJobStatus.Failed ? "policy engine timeout" : null,
|
LastError: lastError,
|
||||||
queuedAt,
|
CreatedAt: queuedAt,
|
||||||
finishedAt,
|
UpdatedAt: updatedAt,
|
||||||
finishedAt,
|
AvailableAt: queuedAt,
|
||||||
submittedAt,
|
SubmittedAt: submittedAt,
|
||||||
completedAt,
|
CompletedAt: completedAt,
|
||||||
null,
|
LeaseOwner: null,
|
||||||
null,
|
LeaseExpiresAt: null,
|
||||||
false,
|
CancellationRequested: false,
|
||||||
null,
|
CancellationRequestedAt: null,
|
||||||
null,
|
CancellationReason: null,
|
||||||
cancelledAt);
|
CancelledAt: cancelledAt);
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed class StubPolicyRunJobRepository : IPolicyRunJobRepository
|
private sealed class StubPolicyRunJobRepository : IPolicyRunJobRepository
|
||||||
{
|
{
|
||||||
public Dictionary<PolicyRunJobStatus, long> QueueCounts { get; } = new();
|
public StubPolicyRunJobRepository()
|
||||||
public List<PolicyRunJob> TerminalJobs { get; } = new();
|
|
||||||
|
|
||||||
public Task<long> CountAsync(string tenantId, PolicyRunMode mode, IReadOnlyCollection<PolicyRunJobStatus> statuses, CancellationToken cancellationToken = default)
|
|
||||||
{
|
{
|
||||||
var total = 0L;
|
}
|
||||||
|
|
||||||
|
public StubPolicyRunJobRepository(
|
||||||
|
IDictionary<PolicyRunJobStatus, long> counts,
|
||||||
|
IEnumerable<PolicyRunJob> jobs)
|
||||||
|
{
|
||||||
|
foreach (var pair in counts)
|
||||||
|
{
|
||||||
|
QueueCounts[pair.Key] = pair.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
Jobs.AddRange(jobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Dictionary<PolicyRunJobStatus, long> QueueCounts { get; } = new();
|
||||||
|
public List<PolicyRunJob> Jobs { get; } = new();
|
||||||
|
|
||||||
|
public Task InsertAsync(
|
||||||
|
PolicyRunJob job,
|
||||||
|
IClientSessionHandle? session = null,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
Jobs.Add(job);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<long> CountAsync(
|
||||||
|
string tenantId,
|
||||||
|
PolicyRunMode mode,
|
||||||
|
IReadOnlyCollection<PolicyRunJobStatus> statuses,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (statuses is null || statuses.Count == 0)
|
||||||
|
{
|
||||||
|
return Task.FromResult(QueueCounts.Values.Sum());
|
||||||
|
}
|
||||||
|
|
||||||
|
long total = 0;
|
||||||
foreach (var status in statuses)
|
foreach (var status in statuses)
|
||||||
{
|
{
|
||||||
if (QueueCounts.TryGetValue(status, out var count))
|
if (QueueCounts.TryGetValue(status, out var count))
|
||||||
@@ -148,28 +290,50 @@ public sealed class PolicySimulationMetricsProviderTests
|
|||||||
IClientSessionHandle? session = null,
|
IClientSessionHandle? session = null,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
IReadOnlyList<PolicyRunJob> filtered = TerminalJobs;
|
IEnumerable<PolicyRunJob> query = Jobs;
|
||||||
|
|
||||||
if (statuses is { Count: > 0 })
|
if (statuses is { Count: > 0 })
|
||||||
{
|
{
|
||||||
filtered = TerminalJobs.Where(job => statuses.Contains(job.Status)).ToList();
|
query = query.Where(job => statuses.Contains(job.Status));
|
||||||
}
|
}
|
||||||
|
|
||||||
return Task.FromResult(filtered);
|
if (queuedAfter is not null)
|
||||||
|
{
|
||||||
|
query = query.Where(job => (job.QueuedAt ?? job.CreatedAt) >= queuedAfter.Value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<PolicyRunJob?> GetAsync(string tenantId, string jobId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
var result = query.Take(limit).ToList().AsReadOnly();
|
||||||
|
return Task.FromResult<IReadOnlyList<PolicyRunJob>>(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<PolicyRunJob?> GetAsync(
|
||||||
|
string tenantId,
|
||||||
|
string jobId,
|
||||||
|
IClientSessionHandle? session = null,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
=> Task.FromResult<PolicyRunJob?>(Jobs.FirstOrDefault(job => job.Id == jobId));
|
||||||
|
|
||||||
|
public Task<PolicyRunJob?> GetByRunIdAsync(
|
||||||
|
string tenantId,
|
||||||
|
string runId,
|
||||||
|
IClientSessionHandle? session = null,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
=> Task.FromResult<PolicyRunJob?>(Jobs.FirstOrDefault(job => string.Equals(job.RunId, runId, StringComparison.Ordinal)));
|
||||||
|
|
||||||
|
public Task<PolicyRunJob?> LeaseAsync(
|
||||||
|
string leaseOwner,
|
||||||
|
DateTimeOffset now,
|
||||||
|
TimeSpan leaseDuration,
|
||||||
|
int maxAttempts,
|
||||||
|
IClientSessionHandle? session = null,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
=> Task.FromResult<PolicyRunJob?>(null);
|
=> Task.FromResult<PolicyRunJob?>(null);
|
||||||
|
|
||||||
public Task<PolicyRunJob?> GetByRunIdAsync(string tenantId, string runId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
public Task<bool> ReplaceAsync(
|
||||||
=> Task.FromResult<PolicyRunJob?>(null);
|
PolicyRunJob job,
|
||||||
|
string? expectedLeaseOwner = null,
|
||||||
public Task InsertAsync(PolicyRunJob job, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
IClientSessionHandle? session = null,
|
||||||
=> Task.CompletedTask;
|
CancellationToken cancellationToken = default)
|
||||||
|
|
||||||
public Task<PolicyRunJob?> LeaseAsync(string leaseOwner, DateTimeOffset now, TimeSpan leaseDuration, int maxAttempts, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
|
||||||
=> Task.FromResult<PolicyRunJob?>(null);
|
|
||||||
|
|
||||||
public Task<bool> ReplaceAsync(PolicyRunJob job, string? expectedLeaseOwner = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
|
||||||
=> Task.FromResult(true);
|
=> Task.FromResult(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user