feat: Implement Policy Engine Evaluation Service and Cache with unit tests
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Temp commit to debug
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
namespace StellaOps.Scheduler.WebService.GraphJobs;
|
||||
|
||||
internal readonly record struct GraphJobUpdateResult<TJob>(bool Updated, TJob Job) where TJob : class
|
||||
{
|
||||
public static GraphJobUpdateResult<TJob> UpdatedResult(TJob job) => new(true, job);
|
||||
|
||||
public static GraphJobUpdateResult<TJob> NotUpdated(TJob job) => new(false, job);
|
||||
}
|
||||
namespace StellaOps.Scheduler.WebService.GraphJobs;
|
||||
|
||||
internal readonly record struct GraphJobUpdateResult<TJob>(bool Updated, TJob Job) where TJob : class
|
||||
{
|
||||
public static GraphJobUpdateResult<TJob> UpdatedResult(TJob job) => new(true, job);
|
||||
|
||||
public static GraphJobUpdateResult<TJob> NotUpdated(TJob job) => new(false, job);
|
||||
}
|
||||
|
||||
@@ -1,42 +1,42 @@
|
||||
# Scheduler WebService Task Board (Sprint 16)
|
||||
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
|
||||
## Policy Engine v2 (Sprint 20)
|
||||
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
> 2025-10-29: Added `/api/v1/scheduler/policy/runs` create/list/get endpoints with in-memory queue, scope/tenant enforcement, and contract docs (`docs/SCHED-WEB-20-001-POLICY-RUNS.md`). Tests cover happy path + auth failures.
|
||||
> 2025-10-26: Use canonical request/response samples from `samples/api/scheduler/policy-*.json`; serializer contract defined in `src/Scheduler/__Libraries/StellaOps.Scheduler.Models/docs/SCHED-MODELS-20-001-POLICY-RUNS.md`.
|
||||
| SCHED-WEB-20-002 | BLOCKED (waiting on SCHED-WORKER-20-301) | Scheduler WebService Guild | SCHED-WEB-20-001, SCHED-WORKER-20-301 | Provide simulation trigger endpoint returning diff preview metadata and job state for UI/CLI consumption. | Simulation endpoint returns deterministic diffs metadata; rate limits enforced; tests cover concurrency. |
|
||||
> 2025-10-29: WebService requires Worker policy job orchestration + Policy Engine diff callbacks (POLICY-ENGINE-20-003/006) to provide simulation previews. Awaiting completion of SCHED-WORKER-20-301 before wiring API.
|
||||
|
||||
## Graph Explorer v1 (Sprint 21)
|
||||
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-WEB-21-004 | DONE (2025-11-04) | Scheduler WebService Guild, Scheduler Storage Guild | SCHED-WEB-21-001, SCHED-STORAGE-16-201 | Persist graph job lifecycle to Mongo storage and publish `scheduler.graph.job.completed@1` events + outbound webhook to Cartographer. | Storage repositories updated; events emitted; webhook payload documented; integration tests cover storage + event flow. **Note:** Events currently log JSON envelopes while the shared platform bus is provisioned. Cartographer webhook now posts JSON payloads when configured; replace inline logging with bus publisher once the shared event transport is online. |
|
||||
> 2025-10-30: Implemented Redis-backed publisher (`Scheduler:Events:GraphJobs`) emitting `scheduler.graph.job.completed@1` to configured stream with optional logging fallback; docs/configs to be validated with DevOps before closing.
|
||||
> 2025-11-04: Resumed SCHED-WEB-21-004 to finalize Mongo lifecycle persistence guards, graph completion events, and Cartographer webhook verification.
|
||||
> 2025-11-04: SCHED-WEB-21-004 completed – lifecycle stored in Mongo with optimistic concurrency, completion events/webhooks emitted once per transition, and result URI metadata refreshed idempotently with unit/integration coverage.
|
||||
|
||||
## StellaOps Console (Sprint 23)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-CONSOLE-23-001 | DONE (2025-11-03) | Scheduler WebService Guild, BE-Base Platform Guild | SCHED-WEB-16-103, SCHED-WEB-20-001 | Extend runs APIs with live progress SSE endpoints (`/console/runs/{id}/stream`), queue lag summaries, diff metadata fetch, retry/cancel hooks with RBAC enforcement, and deterministic pagination for history views consumed by Console. | SSE emits heartbeats/backoff headers, progress payload schema documented, unauthorized actions blocked in integration tests, metrics/logs expose queue lag + correlation IDs. |
|
||||
|
||||
## Policy Studio (Sprint 27)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Scheduler WebService Guild, Policy Registry Guild | SCHED-WEB-16-103, REGISTRY-API-27-005 | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. | API handles shard lifecycle with SSE heartbeats + retry headers; unauthorized requests rejected; integration tests cover submit/cancel/resume flows. |
|
||||
| SCHED-CONSOLE-27-002 | DOING (2025-11-03) | Scheduler WebService Guild, Observability Guild | SCHED-CONSOLE-27-001 | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency`) and webhook callbacks for completion/failure consumed by Registry. | Metrics exposed via gateway, dashboards seeded, webhook contract documented, integration tests validate metrics emission. |
|
||||
|
||||
## Vulnerability Explorer (Sprint 29)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-VULN-29-001 | TODO | Scheduler WebService Guild, Findings Ledger Guild | SCHED-WEB-16-103, SBOM-VULN-29-001 | Expose resolver job APIs (`POST /vuln/resolver/jobs`, `GET /vuln/resolver/jobs/{id}`) to trigger candidate recomputation per artifact/policy change with RBAC and rate limits. | Resolver APIs documented; integration tests cover submit/status/cancel; unauthorized requests rejected. |
|
||||
| SCHED-VULN-29-002 | TODO | Scheduler WebService Guild, Observability Guild | SCHED-VULN-29-001 | Provide projector lag metrics endpoint and webhook notifications for backlog breaches consumed by DevOps dashboards. | Lag metrics exposed; webhook events triggered on thresholds; docs updated. |
|
||||
|
||||
## Notes
|
||||
- 2025-10-27: Minimal API host now wires Authority, health endpoints, and restart-only plug-in discovery per architecture §§1–2.
|
||||
# Scheduler WebService Task Board (Sprint 16)
|
||||
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
|
||||
## Policy Engine v2 (Sprint 20)
|
||||
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
> 2025-10-29: Added `/api/v1/scheduler/policy/runs` create/list/get endpoints with in-memory queue, scope/tenant enforcement, and contract docs (`docs/SCHED-WEB-20-001-POLICY-RUNS.md`). Tests cover happy path + auth failures.
|
||||
> 2025-10-26: Use canonical request/response samples from `samples/api/scheduler/policy-*.json`; serializer contract defined in `src/Scheduler/__Libraries/StellaOps.Scheduler.Models/docs/SCHED-MODELS-20-001-POLICY-RUNS.md`.
|
||||
| SCHED-WEB-20-002 | BLOCKED (waiting on SCHED-WORKER-20-301) | Scheduler WebService Guild | SCHED-WEB-20-001, SCHED-WORKER-20-301 | Provide simulation trigger endpoint returning diff preview metadata and job state for UI/CLI consumption. | Simulation endpoint returns deterministic diffs metadata; rate limits enforced; tests cover concurrency. |
|
||||
> 2025-10-29: WebService requires Worker policy job orchestration + Policy Engine diff callbacks (POLICY-ENGINE-20-003/006) to provide simulation previews. Awaiting completion of SCHED-WORKER-20-301 before wiring API.
|
||||
|
||||
## Graph Explorer v1 (Sprint 21)
|
||||
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-WEB-21-004 | DONE (2025-11-04) | Scheduler WebService Guild, Scheduler Storage Guild | SCHED-WEB-21-001, SCHED-STORAGE-16-201 | Persist graph job lifecycle to Mongo storage and publish `scheduler.graph.job.completed@1` events + outbound webhook to Cartographer. | Storage repositories updated; events emitted; webhook payload documented; integration tests cover storage + event flow. **Note:** Events currently log JSON envelopes while the shared platform bus is provisioned. Cartographer webhook now posts JSON payloads when configured; replace inline logging with bus publisher once the shared event transport is online. |
|
||||
> 2025-10-30: Implemented Redis-backed publisher (`Scheduler:Events:GraphJobs`) emitting `scheduler.graph.job.completed@1` to configured stream with optional logging fallback; docs/configs to be validated with DevOps before closing.
|
||||
> 2025-11-04: Resumed SCHED-WEB-21-004 to finalize Mongo lifecycle persistence guards, graph completion events, and Cartographer webhook verification.
|
||||
> 2025-11-04: SCHED-WEB-21-004 completed – lifecycle stored in Mongo with optimistic concurrency, completion events/webhooks emitted once per transition, and result URI metadata refreshed idempotently with unit/integration coverage.
|
||||
|
||||
## StellaOps Console (Sprint 23)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-CONSOLE-23-001 | DONE (2025-11-03) | Scheduler WebService Guild, BE-Base Platform Guild | SCHED-WEB-16-103, SCHED-WEB-20-001 | Extend runs APIs with live progress SSE endpoints (`/console/runs/{id}/stream`), queue lag summaries, diff metadata fetch, retry/cancel hooks with RBAC enforcement, and deterministic pagination for history views consumed by Console. | SSE emits heartbeats/backoff headers, progress payload schema documented, unauthorized actions blocked in integration tests, metrics/logs expose queue lag + correlation IDs. |
|
||||
|
||||
## Policy Studio (Sprint 27)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-CONSOLE-27-001 | DONE (2025-11-03) | Scheduler WebService Guild, Policy Registry Guild | SCHED-WEB-16-103, REGISTRY-API-27-005 | Provide policy batch simulation orchestration endpoints (`/policies/simulations` POST/GET) exposing run creation, shard status, SSE progress, cancellation, and retries with RBAC enforcement. | API handles shard lifecycle with SSE heartbeats + retry headers; unauthorized requests rejected; integration tests cover submit/cancel/resume flows. |
|
||||
| SCHED-CONSOLE-27-002 | DOING (2025-11-03) | Scheduler WebService Guild, Observability Guild | SCHED-CONSOLE-27-001 | Emit telemetry endpoints/metrics (`policy_simulation_queue_depth`, `policy_simulation_latency`) and webhook callbacks for completion/failure consumed by Registry. | Metrics exposed via gateway, dashboards seeded, webhook contract documented, integration tests validate metrics emission. |
|
||||
|
||||
## Vulnerability Explorer (Sprint 29)
|
||||
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|
||||
|----|--------|----------|------------|-------------|---------------|
|
||||
| SCHED-VULN-29-001 | TODO | Scheduler WebService Guild, Findings Ledger Guild | SCHED-WEB-16-103, SBOM-VULN-29-001 | Expose resolver job APIs (`POST /vuln/resolver/jobs`, `GET /vuln/resolver/jobs/{id}`) to trigger candidate recomputation per artifact/policy change with RBAC and rate limits. | Resolver APIs documented; integration tests cover submit/status/cancel; unauthorized requests rejected. |
|
||||
| SCHED-VULN-29-002 | TODO | Scheduler WebService Guild, Observability Guild | SCHED-VULN-29-001 | Provide projector lag metrics endpoint and webhook notifications for backlog breaches consumed by DevOps dashboards. | Lag metrics exposed; webhook events triggered on thresholds; docs updated. |
|
||||
|
||||
## Notes
|
||||
- 2025-10-27: Minimal API host now wires Authority, health endpoints, and restart-only plug-in discovery per architecture §§1–2.
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
|
||||
## Policy simulations
|
||||
|
||||
`/api/v1/scheduler/policies/simulations` orchestrates Policy Engine runs in `simulate` mode without mutating persisted findings.
|
||||
|
||||
- **Create** — `POST /api/v1/scheduler/policies/simulations` (scope `policy:simulate`) enqueues a simulation for `policyId`/`policyVersion`, respecting optional `metadata` and structured `inputs` (`sbomSet`, `advisoryCursor`, `vexCursor`, `captureExplain`). Returns `201 Created` with `simulation.runId` and status `queued`.
|
||||
- **List/Get** — `GET /api/v1/scheduler/policies/simulations` and `/.../{simulationId}` expose `PolicyRunStatus` documents filtered to `mode=simulate`, including attempt counts, stats, and cancellation markers.
|
||||
- **Cancel** — `POST /.../{simulationId}/cancel` records `cancellationRequested=true` (optional reason, timestamp) and immediately reflects the updated status; workers honour the flag on the next lease cycle.
|
||||
- **Retry** — `POST /.../{simulationId}/retry` clones a terminal simulation (cancelled/failed/succeeded) into a fresh job preserving inputs/metadata. Non-terminal runs yield `409 Conflict`.
|
||||
- **Stream** — `GET /.../{simulationId}/stream` emits SSE events (`initial`, `status`, `queueLag`, `heartbeat`, `completed`) with the latest `PolicyRunStatus`, enabling Console to render shard progress and cancellation state in real time.
|
||||
|
||||
Simulation APIs share the same deterministic pagination/metadata contracts as policy runs and surface queue depth snapshots via the existing scheduler queue metrics.
|
||||
@@ -1,70 +1,70 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Repositories;
|
||||
using StellaOps.Scheduler.WebService.GraphJobs;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.Storage.Mongo.Tests.Integration;
|
||||
|
||||
public sealed class GraphJobStoreTests
|
||||
{
|
||||
private static readonly DateTimeOffset OccurredAt = new(2025, 11, 4, 10, 30, 0, TimeSpan.Zero);
|
||||
|
||||
[Fact]
|
||||
public async Task UpdateAsync_SucceedsWhenExpectedStatusMatches()
|
||||
{
|
||||
using var harness = new SchedulerMongoTestHarness();
|
||||
var repository = new GraphJobRepository(harness.Context);
|
||||
var store = new MongoGraphJobStore(repository);
|
||||
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var running = GraphJobStateMachine.EnsureTransition(initial, GraphJobStatus.Running, OccurredAt, attempts: initial.Attempts);
|
||||
var completed = GraphJobStateMachine.EnsureTransition(running, GraphJobStatus.Completed, OccurredAt, attempts: running.Attempts + 1);
|
||||
|
||||
var updateResult = await store.UpdateAsync(completed, GraphJobStatus.Pending, CancellationToken.None);
|
||||
|
||||
Assert.True(updateResult.Updated);
|
||||
var persisted = await store.GetBuildJobAsync(initial.TenantId, initial.Id, CancellationToken.None);
|
||||
Assert.NotNull(persisted);
|
||||
Assert.Equal(GraphJobStatus.Completed, persisted!.Status);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UpdateAsync_ReturnsExistingWhenExpectedStatusMismatch()
|
||||
{
|
||||
using var harness = new SchedulerMongoTestHarness();
|
||||
var repository = new GraphJobRepository(harness.Context);
|
||||
var store = new MongoGraphJobStore(repository);
|
||||
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var running = GraphJobStateMachine.EnsureTransition(initial, GraphJobStatus.Running, OccurredAt, attempts: initial.Attempts);
|
||||
var completed = GraphJobStateMachine.EnsureTransition(running, GraphJobStatus.Completed, OccurredAt, attempts: running.Attempts + 1);
|
||||
|
||||
await store.UpdateAsync(completed, GraphJobStatus.Pending, CancellationToken.None);
|
||||
|
||||
var result = await store.UpdateAsync(completed, GraphJobStatus.Pending, CancellationToken.None);
|
||||
|
||||
Assert.False(result.Updated);
|
||||
Assert.Equal(GraphJobStatus.Completed, result.Job.Status);
|
||||
}
|
||||
|
||||
private static GraphBuildJob CreateBuildJob()
|
||||
{
|
||||
var digest = "sha256:" + new string('b', 64);
|
||||
return new GraphBuildJob(
|
||||
id: "gbj_store_test",
|
||||
tenantId: "tenant-store",
|
||||
sbomId: "sbom-alpha",
|
||||
sbomVersionId: "sbom-alpha-v1",
|
||||
sbomDigest: digest,
|
||||
status: GraphJobStatus.Pending,
|
||||
trigger: GraphBuildJobTrigger.SbomVersion,
|
||||
createdAt: OccurredAt,
|
||||
metadata: null);
|
||||
}
|
||||
}
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Repositories;
|
||||
using StellaOps.Scheduler.WebService.GraphJobs;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.Storage.Mongo.Tests.Integration;
|
||||
|
||||
public sealed class GraphJobStoreTests
|
||||
{
|
||||
private static readonly DateTimeOffset OccurredAt = new(2025, 11, 4, 10, 30, 0, TimeSpan.Zero);
|
||||
|
||||
[Fact]
|
||||
public async Task UpdateAsync_SucceedsWhenExpectedStatusMatches()
|
||||
{
|
||||
using var harness = new SchedulerMongoTestHarness();
|
||||
var repository = new GraphJobRepository(harness.Context);
|
||||
var store = new MongoGraphJobStore(repository);
|
||||
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var running = GraphJobStateMachine.EnsureTransition(initial, GraphJobStatus.Running, OccurredAt, attempts: initial.Attempts);
|
||||
var completed = GraphJobStateMachine.EnsureTransition(running, GraphJobStatus.Completed, OccurredAt, attempts: running.Attempts + 1);
|
||||
|
||||
var updateResult = await store.UpdateAsync(completed, GraphJobStatus.Pending, CancellationToken.None);
|
||||
|
||||
Assert.True(updateResult.Updated);
|
||||
var persisted = await store.GetBuildJobAsync(initial.TenantId, initial.Id, CancellationToken.None);
|
||||
Assert.NotNull(persisted);
|
||||
Assert.Equal(GraphJobStatus.Completed, persisted!.Status);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UpdateAsync_ReturnsExistingWhenExpectedStatusMismatch()
|
||||
{
|
||||
using var harness = new SchedulerMongoTestHarness();
|
||||
var repository = new GraphJobRepository(harness.Context);
|
||||
var store = new MongoGraphJobStore(repository);
|
||||
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var running = GraphJobStateMachine.EnsureTransition(initial, GraphJobStatus.Running, OccurredAt, attempts: initial.Attempts);
|
||||
var completed = GraphJobStateMachine.EnsureTransition(running, GraphJobStatus.Completed, OccurredAt, attempts: running.Attempts + 1);
|
||||
|
||||
await store.UpdateAsync(completed, GraphJobStatus.Pending, CancellationToken.None);
|
||||
|
||||
var result = await store.UpdateAsync(completed, GraphJobStatus.Pending, CancellationToken.None);
|
||||
|
||||
Assert.False(result.Updated);
|
||||
Assert.Equal(GraphJobStatus.Completed, result.Job.Status);
|
||||
}
|
||||
|
||||
private static GraphBuildJob CreateBuildJob()
|
||||
{
|
||||
var digest = "sha256:" + new string('b', 64);
|
||||
return new GraphBuildJob(
|
||||
id: "gbj_store_test",
|
||||
tenantId: "tenant-store",
|
||||
sbomId: "sbom-alpha",
|
||||
sbomVersionId: "sbom-alpha-v1",
|
||||
sbomDigest: digest,
|
||||
status: GraphJobStatus.Pending,
|
||||
trigger: GraphBuildJobTrigger.SbomVersion,
|
||||
createdAt: OccurredAt,
|
||||
metadata: null);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,218 +1,218 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.WebService.GraphJobs;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.WebService.Tests;
|
||||
|
||||
public sealed class GraphJobServiceTests
|
||||
{
|
||||
private static readonly DateTimeOffset FixedTime = new(2025, 11, 4, 12, 0, 0, TimeSpan.Zero);
|
||||
|
||||
[Fact]
|
||||
public async Task CompleteBuildJob_PersistsMetadataAndPublishesOnce()
|
||||
{
|
||||
var store = new TrackingGraphJobStore();
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var clock = new FixedClock(FixedTime);
|
||||
var publisher = new RecordingPublisher();
|
||||
var webhook = new RecordingWebhookClient();
|
||||
var service = new GraphJobService(store, clock, publisher, webhook);
|
||||
|
||||
var request = new GraphJobCompletionRequest
|
||||
{
|
||||
JobId = initial.Id,
|
||||
JobType = GraphJobQueryType.Build,
|
||||
Status = GraphJobStatus.Completed,
|
||||
OccurredAt = FixedTime,
|
||||
GraphSnapshotId = "graph_snap_final ",
|
||||
ResultUri = "oras://cartographer/bundle ",
|
||||
CorrelationId = "corr-123 "
|
||||
};
|
||||
|
||||
var response = await service.CompleteJobAsync(initial.TenantId, request, CancellationToken.None);
|
||||
|
||||
Assert.Equal(GraphJobStatus.Completed, response.Status);
|
||||
Assert.Equal(1, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
|
||||
var stored = await store.GetBuildJobAsync(initial.TenantId, initial.Id, CancellationToken.None);
|
||||
Assert.NotNull(stored);
|
||||
Assert.Equal("graph_snap_final", stored!.GraphSnapshotId);
|
||||
Assert.Equal("corr-123", stored.CorrelationId);
|
||||
Assert.True(stored.Metadata.TryGetValue("resultUri", out var resultUri));
|
||||
Assert.Equal("oras://cartographer/bundle", resultUri);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CompleteBuildJob_IsIdempotentWhenAlreadyCompleted()
|
||||
{
|
||||
var store = new TrackingGraphJobStore();
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var clock = new FixedClock(FixedTime);
|
||||
var publisher = new RecordingPublisher();
|
||||
var webhook = new RecordingWebhookClient();
|
||||
var service = new GraphJobService(store, clock, publisher, webhook);
|
||||
|
||||
var request = new GraphJobCompletionRequest
|
||||
{
|
||||
JobId = initial.Id,
|
||||
JobType = GraphJobQueryType.Build,
|
||||
Status = GraphJobStatus.Completed,
|
||||
OccurredAt = FixedTime,
|
||||
GraphSnapshotId = "graph_snap_final",
|
||||
ResultUri = "oras://cartographer/bundle",
|
||||
CorrelationId = "corr-123"
|
||||
};
|
||||
|
||||
await service.CompleteJobAsync(initial.TenantId, request, CancellationToken.None);
|
||||
var updateCountAfterFirst = store.BuildUpdateCount;
|
||||
|
||||
var secondResponse = await service.CompleteJobAsync(initial.TenantId, request, CancellationToken.None);
|
||||
|
||||
Assert.Equal(GraphJobStatus.Completed, secondResponse.Status);
|
||||
Assert.Equal(updateCountAfterFirst, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CompleteBuildJob_UpdatesResultUriWithoutReemittingEvent()
|
||||
{
|
||||
var store = new TrackingGraphJobStore();
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var clock = new FixedClock(FixedTime);
|
||||
var publisher = new RecordingPublisher();
|
||||
var webhook = new RecordingWebhookClient();
|
||||
var service = new GraphJobService(store, clock, publisher, webhook);
|
||||
|
||||
var firstRequest = new GraphJobCompletionRequest
|
||||
{
|
||||
JobId = initial.Id,
|
||||
JobType = GraphJobQueryType.Build,
|
||||
Status = GraphJobStatus.Completed,
|
||||
OccurredAt = FixedTime,
|
||||
GraphSnapshotId = "graph_snap_final",
|
||||
ResultUri = null,
|
||||
CorrelationId = "corr-123"
|
||||
};
|
||||
|
||||
await service.CompleteJobAsync(initial.TenantId, firstRequest, CancellationToken.None);
|
||||
Assert.Equal(1, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
|
||||
var secondRequest = firstRequest with
|
||||
{
|
||||
ResultUri = "oras://cartographer/bundle-v2",
|
||||
OccurredAt = FixedTime.AddSeconds(30)
|
||||
};
|
||||
|
||||
var response = await service.CompleteJobAsync(initial.TenantId, secondRequest, CancellationToken.None);
|
||||
|
||||
Assert.Equal(GraphJobStatus.Completed, response.Status);
|
||||
Assert.Equal(2, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
|
||||
var stored = await store.GetBuildJobAsync(initial.TenantId, initial.Id, CancellationToken.None);
|
||||
Assert.NotNull(stored);
|
||||
Assert.True(stored!.Metadata.TryGetValue("resultUri", out var resultUri));
|
||||
Assert.Equal("oras://cartographer/bundle-v2", resultUri);
|
||||
}
|
||||
|
||||
private static GraphBuildJob CreateBuildJob()
|
||||
{
|
||||
var digest = "sha256:" + new string('a', 64);
|
||||
return new GraphBuildJob(
|
||||
id: "gbj_test",
|
||||
tenantId: "tenant-alpha",
|
||||
sbomId: "sbom-alpha",
|
||||
sbomVersionId: "sbom-alpha-v1",
|
||||
sbomDigest: digest,
|
||||
status: GraphJobStatus.Pending,
|
||||
trigger: GraphBuildJobTrigger.SbomVersion,
|
||||
createdAt: FixedTime,
|
||||
metadata: null);
|
||||
}
|
||||
|
||||
private sealed class TrackingGraphJobStore : IGraphJobStore
|
||||
{
|
||||
private readonly InMemoryGraphJobStore _inner = new();
|
||||
|
||||
public int BuildUpdateCount { get; private set; }
|
||||
|
||||
public int OverlayUpdateCount { get; private set; }
|
||||
|
||||
public ValueTask<GraphBuildJob> AddAsync(GraphBuildJob job, CancellationToken cancellationToken)
|
||||
=> _inner.AddAsync(job, cancellationToken);
|
||||
|
||||
public ValueTask<GraphOverlayJob> AddAsync(GraphOverlayJob job, CancellationToken cancellationToken)
|
||||
=> _inner.AddAsync(job, cancellationToken);
|
||||
|
||||
public ValueTask<GraphJobCollection> GetJobsAsync(string tenantId, GraphJobQuery query, CancellationToken cancellationToken)
|
||||
=> _inner.GetJobsAsync(tenantId, query, cancellationToken);
|
||||
|
||||
public ValueTask<GraphBuildJob?> GetBuildJobAsync(string tenantId, string jobId, CancellationToken cancellationToken)
|
||||
=> _inner.GetBuildJobAsync(tenantId, jobId, cancellationToken);
|
||||
|
||||
public ValueTask<GraphOverlayJob?> GetOverlayJobAsync(string tenantId, string jobId, CancellationToken cancellationToken)
|
||||
=> _inner.GetOverlayJobAsync(tenantId, jobId, cancellationToken);
|
||||
|
||||
public async ValueTask<GraphJobUpdateResult<GraphBuildJob>> UpdateAsync(GraphBuildJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
|
||||
{
|
||||
BuildUpdateCount++;
|
||||
return await _inner.UpdateAsync(job, expectedStatus, cancellationToken);
|
||||
}
|
||||
|
||||
public async ValueTask<GraphJobUpdateResult<GraphOverlayJob>> UpdateAsync(GraphOverlayJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
|
||||
{
|
||||
OverlayUpdateCount++;
|
||||
return await _inner.UpdateAsync(job, expectedStatus, cancellationToken);
|
||||
}
|
||||
|
||||
public ValueTask<IReadOnlyCollection<GraphOverlayJob>> GetOverlayJobsAsync(string tenantId, CancellationToken cancellationToken)
|
||||
=> _inner.GetOverlayJobsAsync(tenantId, cancellationToken);
|
||||
}
|
||||
|
||||
private sealed class RecordingPublisher : IGraphJobCompletionPublisher
|
||||
{
|
||||
public List<GraphJobCompletionNotification> Notifications { get; } = new();
|
||||
|
||||
public Task PublishAsync(GraphJobCompletionNotification notification, CancellationToken cancellationToken)
|
||||
{
|
||||
Notifications.Add(notification);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RecordingWebhookClient : ICartographerWebhookClient
|
||||
{
|
||||
public List<GraphJobCompletionNotification> Notifications { get; } = new();
|
||||
|
||||
public Task NotifyAsync(GraphJobCompletionNotification notification, CancellationToken cancellationToken)
|
||||
{
|
||||
Notifications.Add(notification);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class FixedClock : ISystemClock
|
||||
{
|
||||
public FixedClock(DateTimeOffset utcNow)
|
||||
{
|
||||
UtcNow = utcNow;
|
||||
}
|
||||
|
||||
public DateTimeOffset UtcNow { get; set; }
|
||||
}
|
||||
}
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.WebService.GraphJobs;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.WebService.Tests;
|
||||
|
||||
public sealed class GraphJobServiceTests
|
||||
{
|
||||
private static readonly DateTimeOffset FixedTime = new(2025, 11, 4, 12, 0, 0, TimeSpan.Zero);
|
||||
|
||||
[Fact]
|
||||
public async Task CompleteBuildJob_PersistsMetadataAndPublishesOnce()
|
||||
{
|
||||
var store = new TrackingGraphJobStore();
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var clock = new FixedClock(FixedTime);
|
||||
var publisher = new RecordingPublisher();
|
||||
var webhook = new RecordingWebhookClient();
|
||||
var service = new GraphJobService(store, clock, publisher, webhook);
|
||||
|
||||
var request = new GraphJobCompletionRequest
|
||||
{
|
||||
JobId = initial.Id,
|
||||
JobType = GraphJobQueryType.Build,
|
||||
Status = GraphJobStatus.Completed,
|
||||
OccurredAt = FixedTime,
|
||||
GraphSnapshotId = "graph_snap_final ",
|
||||
ResultUri = "oras://cartographer/bundle ",
|
||||
CorrelationId = "corr-123 "
|
||||
};
|
||||
|
||||
var response = await service.CompleteJobAsync(initial.TenantId, request, CancellationToken.None);
|
||||
|
||||
Assert.Equal(GraphJobStatus.Completed, response.Status);
|
||||
Assert.Equal(1, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
|
||||
var stored = await store.GetBuildJobAsync(initial.TenantId, initial.Id, CancellationToken.None);
|
||||
Assert.NotNull(stored);
|
||||
Assert.Equal("graph_snap_final", stored!.GraphSnapshotId);
|
||||
Assert.Equal("corr-123", stored.CorrelationId);
|
||||
Assert.True(stored.Metadata.TryGetValue("resultUri", out var resultUri));
|
||||
Assert.Equal("oras://cartographer/bundle", resultUri);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CompleteBuildJob_IsIdempotentWhenAlreadyCompleted()
|
||||
{
|
||||
var store = new TrackingGraphJobStore();
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var clock = new FixedClock(FixedTime);
|
||||
var publisher = new RecordingPublisher();
|
||||
var webhook = new RecordingWebhookClient();
|
||||
var service = new GraphJobService(store, clock, publisher, webhook);
|
||||
|
||||
var request = new GraphJobCompletionRequest
|
||||
{
|
||||
JobId = initial.Id,
|
||||
JobType = GraphJobQueryType.Build,
|
||||
Status = GraphJobStatus.Completed,
|
||||
OccurredAt = FixedTime,
|
||||
GraphSnapshotId = "graph_snap_final",
|
||||
ResultUri = "oras://cartographer/bundle",
|
||||
CorrelationId = "corr-123"
|
||||
};
|
||||
|
||||
await service.CompleteJobAsync(initial.TenantId, request, CancellationToken.None);
|
||||
var updateCountAfterFirst = store.BuildUpdateCount;
|
||||
|
||||
var secondResponse = await service.CompleteJobAsync(initial.TenantId, request, CancellationToken.None);
|
||||
|
||||
Assert.Equal(GraphJobStatus.Completed, secondResponse.Status);
|
||||
Assert.Equal(updateCountAfterFirst, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CompleteBuildJob_UpdatesResultUriWithoutReemittingEvent()
|
||||
{
|
||||
var store = new TrackingGraphJobStore();
|
||||
var initial = CreateBuildJob();
|
||||
await store.AddAsync(initial, CancellationToken.None);
|
||||
|
||||
var clock = new FixedClock(FixedTime);
|
||||
var publisher = new RecordingPublisher();
|
||||
var webhook = new RecordingWebhookClient();
|
||||
var service = new GraphJobService(store, clock, publisher, webhook);
|
||||
|
||||
var firstRequest = new GraphJobCompletionRequest
|
||||
{
|
||||
JobId = initial.Id,
|
||||
JobType = GraphJobQueryType.Build,
|
||||
Status = GraphJobStatus.Completed,
|
||||
OccurredAt = FixedTime,
|
||||
GraphSnapshotId = "graph_snap_final",
|
||||
ResultUri = null,
|
||||
CorrelationId = "corr-123"
|
||||
};
|
||||
|
||||
await service.CompleteJobAsync(initial.TenantId, firstRequest, CancellationToken.None);
|
||||
Assert.Equal(1, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
|
||||
var secondRequest = firstRequest with
|
||||
{
|
||||
ResultUri = "oras://cartographer/bundle-v2",
|
||||
OccurredAt = FixedTime.AddSeconds(30)
|
||||
};
|
||||
|
||||
var response = await service.CompleteJobAsync(initial.TenantId, secondRequest, CancellationToken.None);
|
||||
|
||||
Assert.Equal(GraphJobStatus.Completed, response.Status);
|
||||
Assert.Equal(2, store.BuildUpdateCount);
|
||||
Assert.Single(publisher.Notifications);
|
||||
Assert.Single(webhook.Notifications);
|
||||
|
||||
var stored = await store.GetBuildJobAsync(initial.TenantId, initial.Id, CancellationToken.None);
|
||||
Assert.NotNull(stored);
|
||||
Assert.True(stored!.Metadata.TryGetValue("resultUri", out var resultUri));
|
||||
Assert.Equal("oras://cartographer/bundle-v2", resultUri);
|
||||
}
|
||||
|
||||
private static GraphBuildJob CreateBuildJob()
|
||||
{
|
||||
var digest = "sha256:" + new string('a', 64);
|
||||
return new GraphBuildJob(
|
||||
id: "gbj_test",
|
||||
tenantId: "tenant-alpha",
|
||||
sbomId: "sbom-alpha",
|
||||
sbomVersionId: "sbom-alpha-v1",
|
||||
sbomDigest: digest,
|
||||
status: GraphJobStatus.Pending,
|
||||
trigger: GraphBuildJobTrigger.SbomVersion,
|
||||
createdAt: FixedTime,
|
||||
metadata: null);
|
||||
}
|
||||
|
||||
private sealed class TrackingGraphJobStore : IGraphJobStore
|
||||
{
|
||||
private readonly InMemoryGraphJobStore _inner = new();
|
||||
|
||||
public int BuildUpdateCount { get; private set; }
|
||||
|
||||
public int OverlayUpdateCount { get; private set; }
|
||||
|
||||
public ValueTask<GraphBuildJob> AddAsync(GraphBuildJob job, CancellationToken cancellationToken)
|
||||
=> _inner.AddAsync(job, cancellationToken);
|
||||
|
||||
public ValueTask<GraphOverlayJob> AddAsync(GraphOverlayJob job, CancellationToken cancellationToken)
|
||||
=> _inner.AddAsync(job, cancellationToken);
|
||||
|
||||
public ValueTask<GraphJobCollection> GetJobsAsync(string tenantId, GraphJobQuery query, CancellationToken cancellationToken)
|
||||
=> _inner.GetJobsAsync(tenantId, query, cancellationToken);
|
||||
|
||||
public ValueTask<GraphBuildJob?> GetBuildJobAsync(string tenantId, string jobId, CancellationToken cancellationToken)
|
||||
=> _inner.GetBuildJobAsync(tenantId, jobId, cancellationToken);
|
||||
|
||||
public ValueTask<GraphOverlayJob?> GetOverlayJobAsync(string tenantId, string jobId, CancellationToken cancellationToken)
|
||||
=> _inner.GetOverlayJobAsync(tenantId, jobId, cancellationToken);
|
||||
|
||||
public async ValueTask<GraphJobUpdateResult<GraphBuildJob>> UpdateAsync(GraphBuildJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
|
||||
{
|
||||
BuildUpdateCount++;
|
||||
return await _inner.UpdateAsync(job, expectedStatus, cancellationToken);
|
||||
}
|
||||
|
||||
public async ValueTask<GraphJobUpdateResult<GraphOverlayJob>> UpdateAsync(GraphOverlayJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
|
||||
{
|
||||
OverlayUpdateCount++;
|
||||
return await _inner.UpdateAsync(job, expectedStatus, cancellationToken);
|
||||
}
|
||||
|
||||
public ValueTask<IReadOnlyCollection<GraphOverlayJob>> GetOverlayJobsAsync(string tenantId, CancellationToken cancellationToken)
|
||||
=> _inner.GetOverlayJobsAsync(tenantId, cancellationToken);
|
||||
}
|
||||
|
||||
private sealed class RecordingPublisher : IGraphJobCompletionPublisher
|
||||
{
|
||||
public List<GraphJobCompletionNotification> Notifications { get; } = new();
|
||||
|
||||
public Task PublishAsync(GraphJobCompletionNotification notification, CancellationToken cancellationToken)
|
||||
{
|
||||
Notifications.Add(notification);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RecordingWebhookClient : ICartographerWebhookClient
|
||||
{
|
||||
public List<GraphJobCompletionNotification> Notifications { get; } = new();
|
||||
|
||||
public Task NotifyAsync(GraphJobCompletionNotification notification, CancellationToken cancellationToken)
|
||||
{
|
||||
Notifications.Add(notification);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class FixedClock : ISystemClock
|
||||
{
|
||||
public FixedClock(DateTimeOffset utcNow)
|
||||
{
|
||||
UtcNow = utcNow;
|
||||
}
|
||||
|
||||
public DateTimeOffset UtcNow { get; set; }
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user