feat: Implement approvals workflow and notifications integration
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled

- Added approvals orchestration with persistence and workflow scaffolding.
- Integrated notifications insights and staged resume hooks.
- Introduced approval coordinator and policy notification bridge with unit tests.
- Added approval decision API with resume requeue and persisted plan snapshots.
- Documented the Excitor consensus API beta and provided JSON sample payload.
- Created analyzers to flag usage of deprecated merge service APIs.
- Implemented logging for artifact uploads and approval decision service.
- Added tests for PackRunApprovalDecisionService and related components.
This commit is contained in:
master
2025-11-06 08:48:13 +02:00
parent 21a2759412
commit dd217b4546
98 changed files with 3883 additions and 2381 deletions

View File

@@ -110,18 +110,20 @@ public sealed class FilePackRunStateStore : IPackRunStateStore
return result;
}
private sealed record StateDocument(
string RunId,
string PlanHash,
TaskPackPlanFailurePolicy FailurePolicy,
DateTimeOffset CreatedAt,
DateTimeOffset UpdatedAt,
IReadOnlyList<StepDocument> Steps)
{
public static StateDocument FromDomain(PackRunState state)
{
var steps = state.Steps.Values
.OrderBy(step => step.StepId, StringComparer.Ordinal)
private sealed record StateDocument(
string RunId,
string PlanHash,
TaskPackPlan Plan,
TaskPackPlanFailurePolicy FailurePolicy,
DateTimeOffset RequestedAt,
DateTimeOffset CreatedAt,
DateTimeOffset UpdatedAt,
IReadOnlyList<StepDocument> Steps)
{
public static StateDocument FromDomain(PackRunState state)
{
var steps = state.Steps.Values
.OrderBy(step => step.StepId, StringComparer.Ordinal)
.Select(step => new StepDocument(
step.StepId,
step.Kind,
@@ -137,15 +139,17 @@ public sealed class FilePackRunStateStore : IPackRunStateStore
step.StatusReason))
.ToList();
return new StateDocument(
state.RunId,
state.PlanHash,
state.FailurePolicy,
state.CreatedAt,
state.UpdatedAt,
steps);
}
return new StateDocument(
state.RunId,
state.PlanHash,
state.Plan,
state.FailurePolicy,
state.RequestedAt,
state.CreatedAt,
state.UpdatedAt,
steps);
}
public PackRunState ToDomain()
{
var steps = Steps.ToDictionary(
@@ -165,14 +169,16 @@ public sealed class FilePackRunStateStore : IPackRunStateStore
step.StatusReason),
StringComparer.Ordinal);
return new PackRunState(
RunId,
PlanHash,
FailurePolicy,
CreatedAt,
UpdatedAt,
steps);
}
return new PackRunState(
RunId,
PlanHash,
Plan,
FailurePolicy,
RequestedAt,
CreatedAt,
UpdatedAt,
steps);
}
}
private sealed record StepDocument(

View File

@@ -4,13 +4,13 @@ using StellaOps.AirGap.Policy;
using StellaOps.TaskRunner.Core.Execution;
using StellaOps.TaskRunner.Core.Planning;
using StellaOps.TaskRunner.Core.TaskPacks;
namespace StellaOps.TaskRunner.Infrastructure.Execution;
public sealed class FilesystemPackRunDispatcher : IPackRunJobDispatcher
{
private readonly string queuePath;
private readonly string archivePath;
namespace StellaOps.TaskRunner.Infrastructure.Execution;
public sealed class FilesystemPackRunDispatcher : IPackRunJobDispatcher, IPackRunJobScheduler
{
private readonly string queuePath;
private readonly string archivePath;
private readonly TaskPackManifestLoader manifestLoader = new();
private readonly TaskPackPlanner planner;
private readonly JsonSerializerOptions serializerOptions = new(JsonSerializerDefaults.Web);
@@ -30,11 +30,11 @@ public sealed class FilesystemPackRunDispatcher : IPackRunJobDispatcher
.OrderBy(path => path, StringComparer.Ordinal)
.ToArray();
foreach (var file in files)
{
cancellationToken.ThrowIfCancellationRequested();
try
foreach (var file in files)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
var jobJson = await File.ReadAllTextAsync(file, cancellationToken).ConfigureAwait(false);
var job = JsonSerializer.Deserialize<JobEnvelope>(jobJson, serializerOptions);
@@ -43,38 +43,69 @@ public sealed class FilesystemPackRunDispatcher : IPackRunJobDispatcher
continue;
}
var manifestPath = ResolvePath(queuePath, job.ManifestPath);
var inputsPath = job.InputsPath is null ? null : ResolvePath(queuePath, job.InputsPath);
var manifest = manifestLoader.Load(manifestPath);
var inputs = await LoadInputsAsync(inputsPath, cancellationToken).ConfigureAwait(false);
var planResult = planner.Plan(manifest, inputs);
if (!planResult.Success || planResult.Plan is null)
{
throw new InvalidOperationException($"Failed to plan pack for run {job.RunId}: {string.Join(';', planResult.Errors.Select(e => e.Message))}");
}
var archiveFile = Path.Combine(archivePath, Path.GetFileName(file));
File.Move(file, archiveFile, overwrite: true);
var requestedAt = job.RequestedAt ?? DateTimeOffset.UtcNow;
return new PackRunExecutionContext(job.RunId ?? Guid.NewGuid().ToString("n"), planResult.Plan, requestedAt);
}
catch (Exception ex)
{
TaskPackPlan? plan = job.Plan;
if (plan is null)
{
if (string.IsNullOrWhiteSpace(job.ManifestPath))
{
continue;
}
var manifestPath = ResolvePath(queuePath, job.ManifestPath);
var inputsPath = job.InputsPath is null ? null : ResolvePath(queuePath, job.InputsPath);
var manifest = manifestLoader.Load(manifestPath);
var inputs = await LoadInputsAsync(inputsPath, cancellationToken).ConfigureAwait(false);
var planResult = planner.Plan(manifest, inputs);
if (!planResult.Success || planResult.Plan is null)
{
throw new InvalidOperationException($"Failed to plan pack for run {job.RunId}: {string.Join(';', planResult.Errors.Select(e => e.Message))}");
}
plan = planResult.Plan;
}
var archiveFile = Path.Combine(archivePath, Path.GetFileName(file));
File.Move(file, archiveFile, overwrite: true);
var requestedAt = job.RequestedAt ?? DateTimeOffset.UtcNow;
var runId = string.IsNullOrWhiteSpace(job.RunId) ? Guid.NewGuid().ToString("n") : job.RunId;
return new PackRunExecutionContext(runId, plan, requestedAt);
}
catch (Exception ex)
{
var failedPath = file + ".failed";
File.Move(file, failedPath, overwrite: true);
Console.Error.WriteLine($"Failed to dequeue job '{file}': {ex.Message}");
}
}
return null;
}
private static string ResolvePath(string root, string relative)
=> Path.IsPathRooted(relative) ? relative : Path.Combine(root, relative);
private static async Task<IDictionary<string, JsonNode?>> LoadInputsAsync(string? path, CancellationToken cancellationToken)
return null;
}
public async Task ScheduleAsync(PackRunExecutionContext context, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(context);
var envelope = new JobEnvelope(
context.RunId,
ManifestPath: null,
InputsPath: null,
context.RequestedAt,
context.Plan);
Directory.CreateDirectory(queuePath);
var safeRunId = string.IsNullOrWhiteSpace(context.RunId) ? Guid.NewGuid().ToString("n") : SanitizeFileName(context.RunId);
var fileName = $"{safeRunId}-{DateTimeOffset.UtcNow:yyyyMMddHHmmssfff}.json";
var path = Path.Combine(queuePath, fileName);
var json = JsonSerializer.Serialize(envelope, serializerOptions);
await File.WriteAllTextAsync(path, json, cancellationToken).ConfigureAwait(false);
}
private static string ResolvePath(string root, string relative)
=> Path.IsPathRooted(relative) ? relative : Path.Combine(root, relative);
private static async Task<IDictionary<string, JsonNode?>> LoadInputsAsync(string? path, CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(path) || !File.Exists(path))
{
@@ -92,7 +123,23 @@ public sealed class FilesystemPackRunDispatcher : IPackRunJobDispatcher
pair => pair.Key,
pair => pair.Value,
StringComparer.Ordinal);
}
private sealed record JobEnvelope(string? RunId, string ManifestPath, string? InputsPath, DateTimeOffset? RequestedAt);
}
}
private sealed record JobEnvelope(
string? RunId,
string? ManifestPath,
string? InputsPath,
DateTimeOffset? RequestedAt,
TaskPackPlan? Plan);
private static string SanitizeFileName(string value)
{
var safe = value.Trim();
foreach (var invalid in Path.GetInvalidFileNameChars())
{
safe = safe.Replace(invalid, '_');
}
return safe;
}
}

View File

@@ -0,0 +1,40 @@
using Microsoft.Extensions.Logging;
using StellaOps.TaskRunner.Core.Execution;
using StellaOps.TaskRunner.Core.Planning;
namespace StellaOps.TaskRunner.Infrastructure.Execution;
public sealed class LoggingPackRunArtifactUploader : IPackRunArtifactUploader
{
private readonly ILogger<LoggingPackRunArtifactUploader> _logger;
public LoggingPackRunArtifactUploader(ILogger<LoggingPackRunArtifactUploader> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Task UploadAsync(
PackRunExecutionContext context,
PackRunState state,
IReadOnlyList<TaskPackPlanOutput> outputs,
CancellationToken cancellationToken)
{
if (outputs.Count == 0)
{
return Task.CompletedTask;
}
foreach (var output in outputs)
{
var path = output.Path?.Value?.ToString() ?? "(dynamic)";
_logger.LogInformation(
"Pack run {RunId} scheduled artifact upload for output {Output} (type={Type}, path={Path}).",
context.RunId,
output.Name,
output.Type,
path);
}
return Task.CompletedTask;
}
}

View File

@@ -0,0 +1,117 @@
using Microsoft.Extensions.Logging;
using StellaOps.TaskRunner.Core.Execution;
using StellaOps.TaskRunner.Core.Planning;
namespace StellaOps.TaskRunner.Infrastructure.Execution;
public sealed class PackRunApprovalDecisionService
{
private readonly IPackRunApprovalStore _approvalStore;
private readonly IPackRunStateStore _stateStore;
private readonly IPackRunJobScheduler _scheduler;
private readonly ILogger<PackRunApprovalDecisionService> _logger;
public PackRunApprovalDecisionService(
IPackRunApprovalStore approvalStore,
IPackRunStateStore stateStore,
IPackRunJobScheduler scheduler,
ILogger<PackRunApprovalDecisionService> logger)
{
_approvalStore = approvalStore ?? throw new ArgumentNullException(nameof(approvalStore));
_stateStore = stateStore ?? throw new ArgumentNullException(nameof(stateStore));
_scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<PackRunApprovalDecisionResult> ApplyAsync(
PackRunApprovalDecisionRequest request,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.RunId);
ArgumentException.ThrowIfNullOrWhiteSpace(request.ApprovalId);
var runId = request.RunId.Trim();
var approvalId = request.ApprovalId.Trim();
var state = await _stateStore.GetAsync(runId, cancellationToken).ConfigureAwait(false);
if (state is null)
{
_logger.LogWarning("Approval decision for run {RunId} rejected run state not found.", runId);
return PackRunApprovalDecisionResult.NotFound;
}
var approvals = await _approvalStore.GetAsync(runId, cancellationToken).ConfigureAwait(false);
if (approvals.Count == 0)
{
_logger.LogWarning("Approval decision for run {RunId} rejected approval state not found.", runId);
return PackRunApprovalDecisionResult.NotFound;
}
var requestedAt = state.RequestedAt != default ? state.RequestedAt : state.CreatedAt;
var coordinator = PackRunApprovalCoordinator.Restore(state.Plan, approvals, requestedAt);
ApprovalActionResult actionResult;
var now = DateTimeOffset.UtcNow;
switch (request.Decision)
{
case PackRunApprovalDecisionType.Approved:
actionResult = coordinator.Approve(approvalId, request.ActorId ?? "system", now, request.Summary);
break;
case PackRunApprovalDecisionType.Rejected:
actionResult = coordinator.Reject(approvalId, request.ActorId ?? "system", now, request.Summary);
break;
case PackRunApprovalDecisionType.Expired:
actionResult = coordinator.Expire(approvalId, now, request.Summary);
break;
default:
throw new ArgumentOutOfRangeException(nameof(request.Decision), request.Decision, "Unsupported approval decision.");
}
await _approvalStore.UpdateAsync(runId, actionResult.State, cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Applied approval decision {Decision} for run {RunId} (approval {ApprovalId}, actor={ActorId}).",
request.Decision,
runId,
approvalId,
request.ActorId ?? "(system)");
if (actionResult.ShouldResumeRun && request.Decision == PackRunApprovalDecisionType.Approved)
{
var context = new PackRunExecutionContext(runId, state.Plan, requestedAt);
await _scheduler.ScheduleAsync(context, cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Scheduled run {RunId} for resume after approvals completed.", runId);
return PackRunApprovalDecisionResult.Resumed;
}
return PackRunApprovalDecisionResult.Applied;
}
}
public sealed record PackRunApprovalDecisionRequest(
string RunId,
string ApprovalId,
PackRunApprovalDecisionType Decision,
string? ActorId,
string? Summary);
public enum PackRunApprovalDecisionType
{
Approved,
Rejected,
Expired
}
public sealed record PackRunApprovalDecisionResult(string Status)
{
public static PackRunApprovalDecisionResult NotFound { get; } = new("not_found");
public static PackRunApprovalDecisionResult Applied { get; } = new("applied");
public static PackRunApprovalDecisionResult Resumed { get; } = new("resumed");
public bool ShouldResume => ReferenceEquals(this, Resumed);
}