feat: Enhance Task Runner with simulation and failure policy support
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled

- Added tests for output projection and failure policy population in TaskPackPlanner.
- Introduced new failure policy manifest in TestManifests.
- Implemented simulation endpoints in the web service for task execution.
- Created TaskRunnerServiceOptions for configuration management.
- Updated appsettings.json to include TaskRunner configuration.
- Enhanced PackRunWorkerService to handle execution graphs and state management.
- Added support for parallel execution and conditional steps in the worker service.
- Updated documentation to reflect new features and changes in execution flow.
This commit is contained in:
master
2025-11-04 19:05:50 +02:00
parent 2eb6852d34
commit 3bd0955202
83 changed files with 15161 additions and 10678 deletions

View File

@@ -86,109 +86,287 @@ internal sealed class GraphJobService : IGraphJobService
}
/// <summary>
/// Lists the graph jobs of a tenant that match <paramref name="query"/> by delegating to the job store.
/// </summary>
/// <param name="tenantId">Tenant whose jobs are listed.</param>
/// <param name="query">Filter forwarded unchanged to the store.</param>
/// <param name="cancellationToken">Token forwarded to the store call.</param>
/// <returns>The collection produced by the store.</returns>
public async Task<GraphJobCollection> GetJobsAsync(string tenantId, GraphJobQuery query, CancellationToken cancellationToken)
    => await _store.GetJobsAsync(tenantId, query, cancellationToken);
/// <summary>
/// Marks a build or overlay graph job as completed, failed, or cancelled, persists the result,
/// and publishes a completion notification.
/// </summary>
/// <param name="tenantId">Tenant that owns the job.</param>
/// <param name="request">Completion details: job type and id, terminal status, optional timestamp, result URI, correlation id and error.</param>
/// <param name="cancellationToken">Token forwarded to the store and publisher calls.</param>
/// <returns>The response built from the job returned by the store update.</returns>
/// <exception cref="ValidationException">The requested status is not terminal, or the job type is not Build/Overlay.</exception>
/// <exception cref="KeyNotFoundException">No job with the requested id exists for the tenant.</exception>
public async Task<GraphJobResponse> CompleteJobAsync(string tenantId, GraphJobCompletionRequest request, CancellationToken cancellationToken)
{
// Only terminal statuses are accepted as a completion.
if (request.Status is not (GraphJobStatus.Completed or GraphJobStatus.Failed or GraphJobStatus.Cancelled))
{
throw new ValidationException("Completion requires status completed, failed, or cancelled.");
}
// Use the service clock when the caller supplied no timestamp; otherwise convert the supplied one to UTC.
var occurredAt = request.OccurredAt == default ? _clock.UtcNow : request.OccurredAt.ToUniversalTime();
switch (request.JobType)
{
case GraphJobQueryType.Build:
{
var existing = await _store.GetBuildJobAsync(tenantId, request.JobId, cancellationToken);
if (existing is null)
{
throw new KeyNotFoundException($"Graph build job '{request.JobId}' not found.");
}
var current = existing;
// A job that never reported Running is moved to Running first, keeping its attempt count.
if (current.Status is GraphJobStatus.Pending or GraphJobStatus.Queued)
{
current = GraphJobStateMachine.EnsureTransition(current, GraphJobStatus.Running, occurredAt, attempts: current.Attempts);
}
// The terminal transition always counts as one more attempt.
var updated = GraphJobStateMachine.EnsureTransition(current, request.Status, occurredAt, attempts: current.Attempts + 1, errorMessage: request.Error);
var metadata = MergeMetadata(updated.Metadata, request.ResultUri);
// Rebuild the job so request-supplied snapshot/correlation ids and merged metadata are applied.
// Note: a whitespace-only request value trims to "" and is used as-is; only null falls back to the stored value.
var normalized = new GraphBuildJob(
id: updated.Id,
tenantId: updated.TenantId,
sbomId: updated.SbomId,
sbomVersionId: updated.SbomVersionId,
sbomDigest: updated.SbomDigest,
graphSnapshotId: request.GraphSnapshotId?.Trim() ?? updated.GraphSnapshotId,
status: updated.Status,
trigger: updated.Trigger,
attempts: updated.Attempts,
cartographerJobId: updated.CartographerJobId,
correlationId: request.CorrelationId?.Trim() ?? updated.CorrelationId,
createdAt: updated.CreatedAt,
startedAt: updated.StartedAt,
completedAt: updated.CompletedAt,
error: updated.Error,
metadata: metadata,
schemaVersion: updated.SchemaVersion);
// Unconditional write: no check that the stored job is still in the state that was read above.
var stored = await _store.UpdateAsync(normalized, cancellationToken);
var response = GraphJobResponse.From(stored);
// The notification carries the raw (untrimmed) request values, not the stored ones.
await PublishCompletionAsync(tenantId, GraphJobQueryType.Build, request.Status, occurredAt, response, request.ResultUri, request.CorrelationId, request.Error, cancellationToken);
return response;
}
case GraphJobQueryType.Overlay:
{
var existing = await _store.GetOverlayJobAsync(tenantId, request.JobId, cancellationToken);
if (existing is null)
{
throw new KeyNotFoundException($"Graph overlay job '{request.JobId}' not found.");
}
var current = existing;
// Same Pending/Queued -> Running pre-step as the build branch.
if (current.Status is GraphJobStatus.Pending or GraphJobStatus.Queued)
{
current = GraphJobStateMachine.EnsureTransition(current, GraphJobStatus.Running, occurredAt, attempts: current.Attempts);
}
var updated = GraphJobStateMachine.EnsureTransition(current, request.Status, occurredAt, attempts: current.Attempts + 1, errorMessage: request.Error);
var metadata = MergeMetadata(updated.Metadata, request.ResultUri);
// Unlike the build branch, the overlay keeps its stored graph snapshot id; only the correlation id can be overridden.
var normalized = new GraphOverlayJob(
id: updated.Id,
tenantId: updated.TenantId,
graphSnapshotId: updated.GraphSnapshotId,
buildJobId: updated.BuildJobId,
overlayKind: updated.OverlayKind,
overlayKey: updated.OverlayKey,
subjects: updated.Subjects,
status: updated.Status,
trigger: updated.Trigger,
attempts: updated.Attempts,
correlationId: request.CorrelationId?.Trim() ?? updated.CorrelationId,
createdAt: updated.CreatedAt,
startedAt: updated.StartedAt,
completedAt: updated.CompletedAt,
error: updated.Error,
metadata: metadata,
schemaVersion: updated.SchemaVersion);
var stored = await _store.UpdateAsync(normalized, cancellationToken);
var response = GraphJobResponse.From(stored);
await PublishCompletionAsync(tenantId, GraphJobQueryType.Overlay, request.Status, occurredAt, response, request.ResultUri, request.CorrelationId, request.Error, cancellationToken);
return response;
}
default:
throw new ValidationException("Unsupported job type.");
}
}
public async Task<OverlayLagMetricsResponse> GetOverlayLagMetricsAsync(string tenantId, CancellationToken cancellationToken)
{
return await _store.GetJobsAsync(tenantId, query, cancellationToken);
}
/// <summary>
/// Validates a completion request, loads the targeted build or overlay job, and hands it to the
/// matching internal completion routine.
/// </summary>
/// <param name="tenantId">Tenant that owns the job.</param>
/// <param name="request">Completion details; string fields are trimmed and blank values are treated as absent.</param>
/// <param name="cancellationToken">Token forwarded to the store and the completion routine.</param>
/// <returns>The response produced by the internal completion routine.</returns>
/// <exception cref="ValidationException">The requested status is not terminal, or the job type is not Build/Overlay.</exception>
/// <exception cref="KeyNotFoundException">No job with the requested id exists for the tenant.</exception>
public async Task<GraphJobResponse> CompleteJobAsync(string tenantId, GraphJobCompletionRequest request, CancellationToken cancellationToken)
{
    var isTerminalStatus = request.Status is GraphJobStatus.Completed or GraphJobStatus.Failed or GraphJobStatus.Cancelled;
    if (!isTerminalStatus)
    {
        throw new ValidationException("Completion requires status completed, failed, or cancelled.");
    }

    // A missing timestamp falls back to the service clock; a supplied one is converted to UTC.
    var occurredAt = request.OccurredAt != default ? request.OccurredAt.ToUniversalTime() : _clock.UtcNow;

    var snapshotId = Normalize(request.GraphSnapshotId);
    var correlation = Normalize(request.CorrelationId);
    var uri = Normalize(request.ResultUri);

    // An error message is only carried forward for failed jobs.
    string? failure = null;
    if (request.Status == GraphJobStatus.Failed)
    {
        failure = Normalize(request.Error);
    }

    var jobType = request.JobType;

    if (jobType == GraphJobQueryType.Build)
    {
        var buildJob = await _store.GetBuildJobAsync(tenantId, request.JobId, cancellationToken).ConfigureAwait(false);
        if (buildJob is null)
        {
            throw new KeyNotFoundException($"Graph build job '{request.JobId}' not found.");
        }

        return await CompleteBuildJobInternal(tenantId, buildJob, request.Status, occurredAt, snapshotId, correlation, uri, failure, cancellationToken)
            .ConfigureAwait(false);
    }

    if (jobType == GraphJobQueryType.Overlay)
    {
        var overlayJob = await _store.GetOverlayJobAsync(tenantId, request.JobId, cancellationToken).ConfigureAwait(false);
        if (overlayJob is null)
        {
            throw new KeyNotFoundException($"Graph overlay job '{request.JobId}' not found.");
        }

        return await CompleteOverlayJobInternal(tenantId, overlayJob, request.Status, occurredAt, snapshotId, correlation, uri, failure, cancellationToken)
            .ConfigureAwait(false);
    }

    throw new ValidationException("Unsupported job type.");
}
/// <summary>
/// Applies a completion to a build job using a conditional store update, retrying against the
/// store's latest copy when the conditional update is rejected.
/// </summary>
/// <remarks>
/// At most three update attempts are made. If every attempt is rejected, or the prepared
/// transition reports no changes, the most recently observed job is returned without publishing.
/// </remarks>
/// <param name="tenantId">Tenant passed to the completion publisher.</param>
/// <param name="current">Job snapshot the first transition is computed from.</param>
/// <param name="requestedStatus">Terminal status requested by the caller.</param>
/// <param name="occurredAt">Timestamp applied to the transition and the published event.</param>
/// <param name="graphSnapshotId">Optional snapshot id overriding the stored one.</param>
/// <param name="correlationId">Optional correlation id overriding the stored one.</param>
/// <param name="resultUri">Optional result URI to record in metadata.</param>
/// <param name="error">Optional error message for the transition.</param>
/// <param name="cancellationToken">Token forwarded to the store and publisher.</param>
/// <returns>A response built from the stored job, or from the latest observed job when nothing was written.</returns>
private async Task<GraphJobResponse> CompleteBuildJobInternal(
    string tenantId,
    GraphBuildJob current,
    GraphJobStatus requestedStatus,
    DateTimeOffset occurredAt,
    string? graphSnapshotId,
    string? correlationId,
    string? resultUri,
    string? error,
    CancellationToken cancellationToken)
{
    const int MaxUpdateAttempts = 3;
    var snapshot = current;

    for (var remaining = MaxUpdateAttempts; remaining > 0; remaining--)
    {
        var plan = PrepareBuildTransition(snapshot, requestedStatus, occurredAt, graphSnapshotId, correlationId, resultUri, error);
        if (!plan.HasChanges)
        {
            // Nothing to write: fall through and report the snapshot as it stands.
            break;
        }

        var outcome = await _store.UpdateAsync(plan.Job, plan.ExpectedStatus, cancellationToken).ConfigureAwait(false);
        if (!outcome.Updated)
        {
            // The store rejected the conditional write; recompute from the job it handed back.
            snapshot = outcome.Job;
            continue;
        }

        var persisted = outcome.Job;
        var result = GraphJobResponse.From(persisted);
        if (plan.ShouldPublish)
        {
            // Publish from the persisted job so the event matches what was stored.
            await PublishCompletionAsync(
                tenantId,
                GraphJobQueryType.Build,
                persisted.Status,
                occurredAt,
                result,
                ExtractResultUri(result),
                persisted.CorrelationId,
                persisted.Error,
                cancellationToken).ConfigureAwait(false);
        }

        return result;
    }

    return GraphJobResponse.From(snapshot);
}
/// <summary>
/// Applies a completion to an overlay job using a conditional store update, retrying against the
/// store's latest copy when the conditional update is rejected.
/// </summary>
/// <remarks>
/// At most three update attempts are made. If every attempt is rejected, or the prepared
/// transition reports no changes, the most recently observed job is returned without publishing.
/// </remarks>
/// <param name="tenantId">Tenant passed to the completion publisher.</param>
/// <param name="current">Job snapshot the first transition is computed from.</param>
/// <param name="requestedStatus">Terminal status requested by the caller.</param>
/// <param name="occurredAt">Timestamp applied to the transition and the published event.</param>
/// <param name="graphSnapshotId">Optional snapshot id overriding the stored one.</param>
/// <param name="correlationId">Optional correlation id overriding the stored one.</param>
/// <param name="resultUri">Optional result URI to record in metadata.</param>
/// <param name="error">Optional error message for the transition.</param>
/// <param name="cancellationToken">Token forwarded to the store and publisher.</param>
/// <returns>A response built from the stored job, or from the latest observed job when nothing was written.</returns>
private async Task<GraphJobResponse> CompleteOverlayJobInternal(
    string tenantId,
    GraphOverlayJob current,
    GraphJobStatus requestedStatus,
    DateTimeOffset occurredAt,
    string? graphSnapshotId,
    string? correlationId,
    string? resultUri,
    string? error,
    CancellationToken cancellationToken)
{
    const int MaxUpdateAttempts = 3;
    var snapshot = current;

    for (var remaining = MaxUpdateAttempts; remaining > 0; remaining--)
    {
        var plan = PrepareOverlayTransition(snapshot, requestedStatus, occurredAt, graphSnapshotId, correlationId, resultUri, error);
        if (!plan.HasChanges)
        {
            // Nothing to write: fall through and report the snapshot as it stands.
            break;
        }

        var outcome = await _store.UpdateAsync(plan.Job, plan.ExpectedStatus, cancellationToken).ConfigureAwait(false);
        if (!outcome.Updated)
        {
            // The store rejected the conditional write; recompute from the job it handed back.
            snapshot = outcome.Job;
            continue;
        }

        var persisted = outcome.Job;
        var result = GraphJobResponse.From(persisted);
        if (plan.ShouldPublish)
        {
            // Publish from the persisted job so the event matches what was stored.
            await PublishCompletionAsync(
                tenantId,
                GraphJobQueryType.Overlay,
                persisted.Status,
                occurredAt,
                result,
                ExtractResultUri(result),
                persisted.CorrelationId,
                persisted.Error,
                cancellationToken).ConfigureAwait(false);
        }

        return result;
    }

    return GraphJobResponse.From(snapshot);
}
/// <summary>
/// Computes, without touching the store, the build job that a completion request should produce
/// from <paramref name="current"/>.
/// </summary>
/// <param name="current">Job snapshot the transition starts from; its status becomes the expected status of the conditional write.</param>
/// <param name="requestedStatus">Status requested by the caller.</param>
/// <param name="occurredAt">Timestamp handed to the state machine.</param>
/// <param name="graphSnapshotId">Replaces the job's snapshot id when not null.</param>
/// <param name="correlationId">Replaces the job's correlation id when not null.</param>
/// <param name="resultUri">Recorded in metadata when non-empty and different from the stored value.</param>
/// <param name="error">Error message handed to the state machine.</param>
/// <returns>
/// The candidate job, the status the stored job is expected to have, whether the candidate differs
/// from <paramref name="current"/>, and whether a completion should be published (only when the status changed).
/// </returns>
private static CompletionTransition<GraphBuildJob> PrepareBuildTransition(
    GraphBuildJob current,
    GraphJobStatus requestedStatus,
    DateTimeOffset occurredAt,
    string? graphSnapshotId,
    string? correlationId,
    string? resultUri,
    string? error)
{
    // Pending/queued jobs pass through Running first, keeping their attempt count.
    GraphBuildJob running = current.Status is GraphJobStatus.Pending or GraphJobStatus.Queued
        ? GraphJobStateMachine.EnsureTransition(current, GraphJobStatus.Running, occurredAt, attempts: current.Attempts)
        : current;

    // Only an actual status change counts as a new attempt, so a repeated request leaves the count alone.
    var attemptsAfter = running.Attempts;
    if (running.Status != requestedStatus)
    {
        attemptsAfter += 1;
    }

    var terminal = GraphJobStateMachine.EnsureTransition(running, requestedStatus, occurredAt, attempts: attemptsAfter, errorMessage: error);

    // Merge the result URI only when it is missing or differs from what is already stored.
    var mergedMetadata = terminal.Metadata;
    if (resultUri is { Length: > 0 }
        && !(mergedMetadata.TryGetValue("resultUri", out var storedUri) && string.Equals(storedUri, resultUri, StringComparison.Ordinal)))
    {
        mergedMetadata = MergeMetadata(mergedMetadata, resultUri);
    }

    var candidate = new GraphBuildJob(
        id: terminal.Id,
        tenantId: terminal.TenantId,
        sbomId: terminal.SbomId,
        sbomVersionId: terminal.SbomVersionId,
        sbomDigest: terminal.SbomDigest,
        graphSnapshotId: graphSnapshotId ?? terminal.GraphSnapshotId,
        status: terminal.Status,
        trigger: terminal.Trigger,
        attempts: terminal.Attempts,
        cartographerJobId: terminal.CartographerJobId,
        correlationId: correlationId ?? terminal.CorrelationId,
        createdAt: terminal.CreatedAt,
        startedAt: terminal.StartedAt,
        completedAt: terminal.CompletedAt,
        error: terminal.Error,
        metadata: mergedMetadata,
        schemaVersion: terminal.SchemaVersion);

    var differs = !candidate.Equals(current);
    var statusChanged = differs && current.Status != candidate.Status;
    return new CompletionTransition<GraphBuildJob>(candidate, current.Status, differs, statusChanged);
}
/// <summary>
/// Computes, without touching the store, the overlay job that a completion request should produce
/// from <paramref name="current"/>.
/// </summary>
/// <param name="current">Job snapshot the transition starts from; its status becomes the expected status of the conditional write.</param>
/// <param name="requestedStatus">Status requested by the caller.</param>
/// <param name="occurredAt">Timestamp handed to the state machine.</param>
/// <param name="graphSnapshotId">Replaces the job's snapshot id when not null.</param>
/// <param name="correlationId">Replaces the job's correlation id when not null.</param>
/// <param name="resultUri">Recorded in metadata when non-empty and different from the stored value.</param>
/// <param name="error">Error message handed to the state machine.</param>
/// <returns>
/// The candidate job, the status the stored job is expected to have, whether the candidate differs
/// from <paramref name="current"/>, and whether a completion should be published (only when the status changed).
/// </returns>
private static CompletionTransition<GraphOverlayJob> PrepareOverlayTransition(
    GraphOverlayJob current,
    GraphJobStatus requestedStatus,
    DateTimeOffset occurredAt,
    string? graphSnapshotId,
    string? correlationId,
    string? resultUri,
    string? error)
{
    // Pending/queued jobs pass through Running first, keeping their attempt count.
    GraphOverlayJob running = current.Status is GraphJobStatus.Pending or GraphJobStatus.Queued
        ? GraphJobStateMachine.EnsureTransition(current, GraphJobStatus.Running, occurredAt, attempts: current.Attempts)
        : current;

    // Only an actual status change counts as a new attempt, so a repeated request leaves the count alone.
    var attemptsAfter = running.Attempts;
    if (running.Status != requestedStatus)
    {
        attemptsAfter += 1;
    }

    var terminal = GraphJobStateMachine.EnsureTransition(running, requestedStatus, occurredAt, attempts: attemptsAfter, errorMessage: error);

    // Merge the result URI only when it is missing or differs from what is already stored.
    var mergedMetadata = terminal.Metadata;
    if (resultUri is { Length: > 0 }
        && !(mergedMetadata.TryGetValue("resultUri", out var storedUri) && string.Equals(storedUri, resultUri, StringComparison.Ordinal)))
    {
        mergedMetadata = MergeMetadata(mergedMetadata, resultUri);
    }

    var candidate = new GraphOverlayJob(
        id: terminal.Id,
        tenantId: terminal.TenantId,
        graphSnapshotId: graphSnapshotId ?? terminal.GraphSnapshotId,
        buildJobId: terminal.BuildJobId,
        overlayKind: terminal.OverlayKind,
        overlayKey: terminal.OverlayKey,
        subjects: terminal.Subjects,
        status: terminal.Status,
        trigger: terminal.Trigger,
        attempts: terminal.Attempts,
        correlationId: correlationId ?? terminal.CorrelationId,
        createdAt: terminal.CreatedAt,
        startedAt: terminal.StartedAt,
        completedAt: terminal.CompletedAt,
        error: terminal.Error,
        metadata: mergedMetadata,
        schemaVersion: terminal.SchemaVersion);

    var differs = !candidate.Equals(current);
    var statusChanged = differs && current.Status != candidate.Status;
    return new CompletionTransition<GraphOverlayJob>(candidate, current.Status, differs, statusChanged);
}
/// <summary>
/// Trims <paramref name="value"/> and maps null, empty, or whitespace-only input to <c>null</c>.
/// </summary>
/// <param name="value">Raw text from the request; may be null.</param>
/// <returns>The trimmed text, or <c>null</c> when there is no non-whitespace content.</returns>
private static string? Normalize(string? value)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        return null;
    }

    return value.Trim();
}
/// <summary>
/// Reads the <c>resultUri</c> metadata entry from the job carried by <paramref name="response"/>.
/// </summary>
/// <param name="response">Response whose payload is inspected.</param>
/// <returns>
/// The stored value when the payload is a build or overlay job whose metadata contains
/// <c>resultUri</c>; otherwise <c>null</c>.
/// </returns>
private static string? ExtractResultUri(GraphJobResponse response)
{
    var payload = response.Payload;

    if (payload is GraphBuildJob build && build.Metadata.TryGetValue("resultUri", out var buildUri))
    {
        return buildUri;
    }

    if (payload is GraphOverlayJob overlay && overlay.Metadata.TryGetValue("resultUri", out var overlayUri))
    {
        return overlayUri;
    }

    return null;
}
/// <summary>
/// Result of preparing a completion for a job of type <typeparamref name="TJob"/>.
/// </summary>
/// <param name="Job">Candidate job to persist.</param>
/// <param name="ExpectedStatus">Status of the snapshot the candidate was computed from; passed to the store as the expected status of the conditional update.</param>
/// <param name="HasChanges">False when the candidate equals the snapshot, in which case no write is attempted.</param>
/// <param name="ShouldPublish">True only when the candidate differs and its status differs from the snapshot's status.</param>
private sealed record CompletionTransition<TJob>(TJob Job, GraphJobStatus ExpectedStatus, bool HasChanges, bool ShouldPublish)
where TJob : class;
public async Task<OverlayLagMetricsResponse> GetOverlayLagMetricsAsync(string tenantId, CancellationToken cancellationToken)
{
var now = _clock.UtcNow;
var overlayJobs = await _store.GetOverlayJobsAsync(tenantId, cancellationToken);

View File

@@ -0,0 +1,8 @@
namespace StellaOps.Scheduler.WebService.GraphJobs;
/// <summary>
/// Outcome of a conditional job update: whether the write happened, and the job that goes with that outcome.
/// </summary>
/// <typeparam name="TJob">Job type carried by the result.</typeparam>
/// <param name="Updated">True when the write was applied.</param>
/// <param name="Job">The job associated with the outcome.</param>
internal readonly record struct GraphJobUpdateResult<TJob>(bool Updated, TJob Job) where TJob : class
{
    /// <summary>Creates a result flagged as updated that carries <paramref name="job"/>.</summary>
    public static GraphJobUpdateResult<TJob> UpdatedResult(TJob job)
    {
        return new GraphJobUpdateResult<TJob>(Updated: true, Job: job);
    }

    /// <summary>Creates a result flagged as not updated that carries <paramref name="job"/>.</summary>
    public static GraphJobUpdateResult<TJob> NotUpdated(TJob job)
    {
        return new GraphJobUpdateResult<TJob>(Updated: false, Job: job);
    }
}

View File

@@ -14,9 +14,9 @@ public interface IGraphJobStore
/// <summary>Looks up one overlay job by tenant and job id; the nullable return type allows for no match.</summary>
ValueTask<GraphOverlayJob?> GetOverlayJobAsync(string tenantId, string jobId, CancellationToken cancellationToken);
/// <summary>Stores the given build job without any precondition and returns the stored job.</summary>
ValueTask<GraphBuildJob> UpdateAsync(GraphBuildJob job, CancellationToken cancellationToken);
/// <summary>Stores the given overlay job without any precondition and returns the stored job.</summary>
ValueTask<GraphOverlayJob> UpdateAsync(GraphOverlayJob job, CancellationToken cancellationToken);
/// <summary>
/// Conditionally stores the given build job, reporting through the result whether the write was applied.
/// </summary>
/// <param name="job">Job to store.</param>
/// <param name="expectedStatus">Status the currently stored job is expected to have for the write to be applied.</param>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
ValueTask<GraphJobUpdateResult<GraphBuildJob>> UpdateAsync(GraphBuildJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken);
/// <summary>
/// Conditionally stores the given overlay job, reporting through the result whether the write was applied.
/// </summary>
/// <param name="job">Job to store.</param>
/// <param name="expectedStatus">Status the currently stored job is expected to have for the write to be applied.</param>
/// <param name="cancellationToken">Token for cancelling the operation.</param>
ValueTask<GraphJobUpdateResult<GraphOverlayJob>> UpdateAsync(GraphOverlayJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken);
/// <summary>Returns the overlay jobs of a tenant.</summary>
ValueTask<IReadOnlyCollection<GraphOverlayJob>> GetOverlayJobsAsync(string tenantId, CancellationToken cancellationToken);
}

View File

@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Collections.Generic;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.WebService.GraphJobs;
@@ -60,17 +61,37 @@ internal sealed class InMemoryGraphJobStore : IGraphJobStore
return ValueTask.FromResult<GraphOverlayJob?>(null);
}
/// <summary>
/// Stores <paramref name="job"/> under its id, overwriting any existing entry, and returns it.
/// </summary>
/// <param name="job">Build job to store.</param>
/// <param name="cancellationToken">Unused; the operation completes synchronously.</param>
/// <returns>A completed task carrying <paramref name="job"/>.</returns>
public ValueTask<GraphBuildJob> UpdateAsync(GraphBuildJob job, CancellationToken cancellationToken)
{
    _buildJobs[job.Id] = job;
    return new ValueTask<GraphBuildJob>(job);
}
/// <summary>
/// Stores <paramref name="job"/> under its id, overwriting any existing entry, and returns it.
/// </summary>
/// <param name="job">Overlay job to store.</param>
/// <param name="cancellationToken">Unused; the operation completes synchronously.</param>
/// <returns>A completed task carrying <paramref name="job"/>.</returns>
public ValueTask<GraphOverlayJob> UpdateAsync(GraphOverlayJob job, CancellationToken cancellationToken)
{
    _overlayJobs[job.Id] = job;
    return new ValueTask<GraphOverlayJob>(job);
}
/// <summary>
/// Replaces the stored build job with <paramref name="job"/> only when the stored job belongs to
/// the same tenant and currently has <paramref name="expectedStatus"/>.
/// </summary>
/// <remarks>
/// The status check and the write are performed as one compare-and-swap. Previously the check was
/// followed by a plain indexer assignment, so two concurrent callers could both pass the check and
/// both overwrite the entry, each believing it had won the transition.
/// NOTE(review): this relies on <c>_buildJobs</c> being a <c>ConcurrentDictionary</c> (the file imports
/// <c>System.Collections.Concurrent</c>); confirm against the field declaration.
/// </remarks>
/// <param name="job">Job to store.</param>
/// <param name="expectedStatus">Status the stored job must currently have.</param>
/// <param name="cancellationToken">Unused; the operation completes synchronously.</param>
/// <returns>
/// An updated result carrying <paramref name="job"/> when the swap succeeded; otherwise a
/// not-updated result carrying the job currently in the store.
/// </returns>
/// <exception cref="KeyNotFoundException">No job with this id exists for the job's tenant.</exception>
public ValueTask<GraphJobUpdateResult<GraphBuildJob>> UpdateAsync(GraphBuildJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
{
    while (_buildJobs.TryGetValue(job.Id, out var existing) && string.Equals(existing.TenantId, job.TenantId, StringComparison.Ordinal))
    {
        if (existing.Status != expectedStatus)
        {
            return ValueTask.FromResult(GraphJobUpdateResult<GraphBuildJob>.NotUpdated(existing));
        }

        // Swap only if the entry still holds the snapshot that was just validated.
        if (_buildJobs.TryUpdate(job.Id, job, existing))
        {
            return ValueTask.FromResult(GraphJobUpdateResult<GraphBuildJob>.UpdatedResult(job));
        }

        // Another writer replaced the entry between the read and the swap; re-read and re-check.
    }

    throw new KeyNotFoundException($"Graph build job '{job.Id}' not found.");
}
/// <summary>
/// Replaces the stored overlay job with <paramref name="job"/> only when the stored job belongs to
/// the same tenant and currently has <paramref name="expectedStatus"/>.
/// </summary>
/// <remarks>
/// The status check and the write are performed as one compare-and-swap. Previously the check was
/// followed by a plain indexer assignment, so two concurrent callers could both pass the check and
/// both overwrite the entry, each believing it had won the transition.
/// NOTE(review): this relies on <c>_overlayJobs</c> being a <c>ConcurrentDictionary</c> (the file imports
/// <c>System.Collections.Concurrent</c>); confirm against the field declaration.
/// </remarks>
/// <param name="job">Job to store.</param>
/// <param name="expectedStatus">Status the stored job must currently have.</param>
/// <param name="cancellationToken">Unused; the operation completes synchronously.</param>
/// <returns>
/// An updated result carrying <paramref name="job"/> when the swap succeeded; otherwise a
/// not-updated result carrying the job currently in the store.
/// </returns>
/// <exception cref="KeyNotFoundException">No job with this id exists for the job's tenant.</exception>
public ValueTask<GraphJobUpdateResult<GraphOverlayJob>> UpdateAsync(GraphOverlayJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
{
    while (_overlayJobs.TryGetValue(job.Id, out var existing) && string.Equals(existing.TenantId, job.TenantId, StringComparison.Ordinal))
    {
        if (existing.Status != expectedStatus)
        {
            return ValueTask.FromResult(GraphJobUpdateResult<GraphOverlayJob>.NotUpdated(existing));
        }

        // Swap only if the entry still holds the snapshot that was just validated.
        if (_overlayJobs.TryUpdate(job.Id, job, existing))
        {
            return ValueTask.FromResult(GraphJobUpdateResult<GraphOverlayJob>.UpdatedResult(job));
        }

        // Another writer replaced the entry between the read and the swap; re-read and re-check.
    }

    throw new KeyNotFoundException($"Graph overlay job '{job.Id}' not found.");
}
public ValueTask<IReadOnlyCollection<GraphOverlayJob>> GetOverlayJobsAsync(string tenantId, CancellationToken cancellationToken)
{

View File

@@ -1,4 +1,5 @@
using StellaOps.Scheduler.Models;
using System.Collections.Generic;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
namespace StellaOps.Scheduler.WebService.GraphJobs;
@@ -44,11 +45,37 @@ internal sealed class MongoGraphJobStore : IGraphJobStore
/// <summary>
/// Fetches one overlay job from the repository by tenant and job id.
/// </summary>
/// <param name="tenantId">Tenant that owns the job.</param>
/// <param name="jobId">Identifier of the overlay job.</param>
/// <param name="cancellationToken">Token forwarded to the repository.</param>
/// <returns>Whatever the repository returns for this tenant and id; the return type allows null.</returns>
public async ValueTask<GraphOverlayJob?> GetOverlayJobAsync(string tenantId, string jobId, CancellationToken cancellationToken)
{
    return await _repository.GetOverlayJobAsync(tenantId, jobId, cancellationToken);
}
/// <summary>
/// Replaces the stored build job through the repository, with no precondition on its current state.
/// </summary>
/// <param name="job">Build job to store.</param>
/// <param name="cancellationToken">Token forwarded to the repository.</param>
/// <returns>The job returned by the repository's replace call.</returns>
public async ValueTask<GraphBuildJob> UpdateAsync(GraphBuildJob job, CancellationToken cancellationToken)
{
    return await _repository.ReplaceAsync(job, cancellationToken);
}
/// <summary>
/// Replaces the stored overlay job through the repository, with no precondition on its current state.
/// </summary>
/// <param name="job">Overlay job to store.</param>
/// <param name="cancellationToken">Token forwarded to the repository.</param>
/// <returns>The job returned by the repository's replace call.</returns>
public async ValueTask<GraphOverlayJob> UpdateAsync(GraphOverlayJob job, CancellationToken cancellationToken)
{
    return await _repository.ReplaceAsync(job, cancellationToken);
}
/// <summary>
/// Asks the repository to replace the build job conditionally on <paramref name="expectedStatus"/>
/// and reports whether the replacement happened.
/// </summary>
/// <param name="job">Job to store.</param>
/// <param name="expectedStatus">Status passed to the repository as the replacement precondition.</param>
/// <param name="cancellationToken">Token forwarded to the repository.</param>
/// <returns>
/// An updated result carrying <paramref name="job"/> when the repository reports success;
/// otherwise a not-updated result carrying the job re-read from the repository.
/// </returns>
/// <exception cref="KeyNotFoundException">The replacement was not applied and the job can no longer be found.</exception>
public async ValueTask<GraphJobUpdateResult<GraphBuildJob>> UpdateAsync(GraphBuildJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
{
    var replaced = await _repository.TryReplaceAsync(job, expectedStatus, cancellationToken).ConfigureAwait(false);
    if (replaced)
    {
        return GraphJobUpdateResult<GraphBuildJob>.UpdatedResult(job);
    }

    // The replacement was rejected: hand back the stored copy so the caller can recompute from it.
    var stored = await _repository.GetBuildJobAsync(job.TenantId, job.Id, cancellationToken).ConfigureAwait(false);
    return stored is null
        ? throw new KeyNotFoundException($"Graph build job '{job.Id}' not found.")
        : GraphJobUpdateResult<GraphBuildJob>.NotUpdated(stored);
}
/// <summary>
/// Asks the repository to replace the overlay job conditionally on <paramref name="expectedStatus"/>
/// and reports whether the replacement happened.
/// </summary>
/// <param name="job">Job to store.</param>
/// <param name="expectedStatus">Status passed to the repository as the replacement precondition.</param>
/// <param name="cancellationToken">Token forwarded to the repository.</param>
/// <returns>
/// An updated result carrying <paramref name="job"/> when the repository reports success;
/// otherwise a not-updated result carrying the job re-read from the repository.
/// </returns>
/// <exception cref="KeyNotFoundException">The replacement was not applied and the job can no longer be found.</exception>
public async ValueTask<GraphJobUpdateResult<GraphOverlayJob>> UpdateAsync(GraphOverlayJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
{
    var replaced = await _repository.TryReplaceOverlayAsync(job, expectedStatus, cancellationToken).ConfigureAwait(false);
    if (replaced)
    {
        return GraphJobUpdateResult<GraphOverlayJob>.UpdatedResult(job);
    }

    // The replacement was rejected: hand back the stored copy so the caller can recompute from it.
    var stored = await _repository.GetOverlayJobAsync(job.TenantId, job.Id, cancellationToken).ConfigureAwait(false);
    return stored is null
        ? throw new KeyNotFoundException($"Graph overlay job '{job.Id}' not found.")
        : GraphJobUpdateResult<GraphOverlayJob>.NotUpdated(stored);
}
/// <summary>
/// Lists the overlay jobs of a tenant by delegating to the repository.
/// </summary>
/// <param name="tenantId">Tenant whose overlay jobs are listed.</param>
/// <param name="cancellationToken">Token forwarded to the repository.</param>
/// <returns>The collection returned by the repository.</returns>
public async ValueTask<IReadOnlyCollection<GraphOverlayJob>> GetOverlayJobsAsync(string tenantId, CancellationToken cancellationToken)
{
    return await _repository.ListOverlayJobsAsync(tenantId, cancellationToken);
}

View File

@@ -16,8 +16,10 @@
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|----|--------|----------|------------|-------------|---------------|
| SCHED-WEB-21-004 | DOING (2025-10-26) | Scheduler WebService Guild, Scheduler Storage Guild | SCHED-WEB-21-001, SCHED-STORAGE-16-201 | Persist graph job lifecycle to Mongo storage and publish `scheduler.graph.job.completed@1` events + outbound webhook to Cartographer. | Storage repositories updated; events emitted; webhook payload documented; integration tests cover storage + event flow. **Note:** Events currently log JSON envelopes while the shared platform bus is provisioned. Cartographer webhook now posts JSON payloads when configured; replace inline logging with bus publisher once the shared event transport is online. |
| SCHED-WEB-21-004 | DONE (2025-11-04) | Scheduler WebService Guild, Scheduler Storage Guild | SCHED-WEB-21-001, SCHED-STORAGE-16-201 | Persist graph job lifecycle to Mongo storage and publish `scheduler.graph.job.completed@1` events + outbound webhook to Cartographer. | Storage repositories updated; events emitted; webhook payload documented; integration tests cover storage + event flow. **Note:** Events currently log JSON envelopes while the shared platform bus is provisioned. Cartographer webhook now posts JSON payloads when configured; replace inline logging with bus publisher once the shared event transport is online. |
> 2025-10-30: Implemented Redis-backed publisher (`Scheduler:Events:GraphJobs`) emitting `scheduler.graph.job.completed@1` to configured stream with optional logging fallback; docs/configs to be validated with DevOps before closing.
> 2025-11-04: Resumed SCHED-WEB-21-004 to finalize Mongo lifecycle persistence guards, graph completion events, and Cartographer webhook verification.
> 2025-11-04: SCHED-WEB-21-004 completed: lifecycle stored in Mongo with optimistic concurrency, completion events/webhooks emitted once per transition, and result URI metadata refreshed idempotently with unit/integration coverage.
## StellaOps Console (Sprint 23)
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |

View File

@@ -97,7 +97,7 @@ Webhook invoked by Scheduler Worker once Cartographer finishes a build/overlay j
}
```
The endpoint advances the job through `running → terminal` transitions via `GraphJobStateMachine`, captures the latest correlation identifier, and stores the optional `resultUri` in metadata for downstream exports.
The endpoint advances the job through `running → terminal` transitions via `GraphJobStateMachine`, captures the latest correlation identifier, and stores the optional `resultUri` in metadata for downstream exports. Repeated notifications are idempotent: if the job already reached a terminal state, the response returns the stored snapshot without publishing another event. When a `resultUri` value changes, only the metadata is refreshed—events and webhooks are emitted once per successful status transition.
### `GET /graphs/overlays/lag`
Returns per-tenant overlay lag metrics (counts, min/max/average lag seconds, and last five completions with correlation IDs + result URIs). Requires `graph:read`.
@@ -131,7 +131,6 @@ Response example:
`StellaOps.Scheduler.WebService.Tests/GraphJobEndpointTests.cs` covers scope enforcement and the build-list happy path using the in-memory store. Future work should add overlay coverage once Cartographer adapters are available.
## Known gaps / TODO
- Persist jobs to Scheduler storage and publish `scheduler.graph.job.completed@1` events + outbound webhook to Cartographer (see new `SCHED-WEB-21-004`).
- Extend `GET /graphs/jobs` with pagination cursors shared with Cartographer/Console.
## Known gaps / TODO
- Extend `GET /graphs/jobs` with pagination cursors shared with Cartographer/Console.