Orchestrator decomposition: replace JobEngine with release-orchestrator + workflow services
- Remove jobengine and jobengine-worker containers from docker-compose - Create release-orchestrator service (120 endpoints) with full auth, tenant, and infrastructure DI - Wire workflow engine to PostgreSQL with definition store (wf_definitions table) - Deploy 4 canonical workflow definitions on startup (release-promotion, scan-execution, advisory-refresh, compliance-sweep) - Fix workflow definition JSON to match canonical contract schema (set-state, call-transport, decision) - Add WorkflowClient to release-orchestrator for starting workflow instances on promotion - Add WorkflowTriggerClient + endpoint to scheduler for triggering workflows from system schedules - Update gateway routes from jobengine.stella-ops.local to release-orchestrator.stella-ops.local - Remove Platform.Database dependency on JobEngine.Infrastructure - Fix workflow csproj duplicate Content items (EmbeddedResource + SDK default) - System-managed schedules with source column, SystemScheduleBootstrap, inline edit UI Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,105 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
|
||||
using System.Collections.Immutable;
|
||||
|
||||
namespace StellaOps.Scheduler.WebService.Bootstrap;
|
||||
|
||||
/// <summary>
|
||||
/// Creates system-managed schedules on startup for each tenant.
|
||||
/// Missing schedules are inserted; existing ones are left untouched.
|
||||
/// </summary>
|
||||
internal sealed class SystemScheduleBootstrap : BackgroundService
|
||||
{
|
||||
private static readonly (string Slug, string Name, string Cron, ScheduleMode Mode, SelectorScope Scope)[] SystemSchedules =
|
||||
[
|
||||
("nightly-vuln-scan", "Nightly Vulnerability Scan", "0 2 * * *", ScheduleMode.AnalysisOnly, SelectorScope.AllImages),
|
||||
("advisory-refresh", "Continuous Advisory Refresh", "0 */4 * * *", ScheduleMode.ContentRefresh, SelectorScope.AllImages),
|
||||
("weekly-compliance-sweep", "Weekly Compliance Sweep", "0 3 * * 0", ScheduleMode.AnalysisOnly, SelectorScope.AllImages),
|
||||
("epss-score-update", "EPSS Score Update", "0 6 * * *", ScheduleMode.ContentRefresh, SelectorScope.AllImages),
|
||||
("reachability-reeval", "Reachability Re-evaluation", "0 5 * * 1-5", ScheduleMode.AnalysisOnly, SelectorScope.AllImages),
|
||||
("registry-sync", "Registry Sync", "0 */2 * * *", ScheduleMode.ContentRefresh, SelectorScope.AllImages),
|
||||
];
|
||||
|
||||
// TODO: Replace with real multi-tenant resolution when available.
|
||||
private static readonly string[] Tenants = ["demo-prod"];
|
||||
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly ILogger<SystemScheduleBootstrap> _logger;
|
||||
|
||||
public SystemScheduleBootstrap(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<SystemScheduleBootstrap> logger)
|
||||
{
|
||||
_scopeFactory = scopeFactory;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
// Allow the rest of the host to start before we hit the database.
|
||||
await Task.Yield();
|
||||
|
||||
try
|
||||
{
|
||||
await using var scope = _scopeFactory.CreateAsyncScope();
|
||||
var repository = scope.ServiceProvider.GetRequiredService<IScheduleRepository>();
|
||||
|
||||
foreach (var tenantId in Tenants)
|
||||
{
|
||||
await EnsureSystemSchedulesAsync(repository, tenantId, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogError(ex, "System schedule bootstrap failed.");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureSystemSchedulesAsync(
|
||||
IScheduleRepository repository,
|
||||
string tenantId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
|
||||
foreach (var (slug, name, cron, mode, selectorScope) in SystemSchedules)
|
||||
{
|
||||
var scheduleId = $"sys-{tenantId}-{slug}";
|
||||
|
||||
var existing = await repository.GetAsync(tenantId, scheduleId, cancellationToken).ConfigureAwait(false);
|
||||
if (existing is not null)
|
||||
{
|
||||
_logger.LogDebug("System schedule {ScheduleId} already exists for tenant {TenantId}, skipping.", scheduleId, tenantId);
|
||||
continue;
|
||||
}
|
||||
|
||||
var selection = new Selector(selectorScope, tenantId);
|
||||
|
||||
var schedule = new Schedule(
|
||||
id: scheduleId,
|
||||
tenantId: tenantId,
|
||||
name: name,
|
||||
enabled: true,
|
||||
cronExpression: cron,
|
||||
timezone: "UTC",
|
||||
mode: mode,
|
||||
selection: selection,
|
||||
onlyIf: null,
|
||||
notify: null,
|
||||
limits: null,
|
||||
createdAt: now,
|
||||
createdBy: "system-bootstrap",
|
||||
updatedAt: now,
|
||||
updatedBy: "system-bootstrap",
|
||||
subscribers: null,
|
||||
schemaVersion: SchedulerSchemaVersions.Schedule,
|
||||
source: "system");
|
||||
|
||||
await repository.UpsertAsync(schedule, cancellationToken).ConfigureAwait(false);
|
||||
_logger.LogInformation("Created system schedule {ScheduleId} ({Name}) for tenant {TenantId}.", scheduleId, name, tenantId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ using StellaOps.Scheduler.WebService.EventWebhooks;
|
||||
using StellaOps.Scheduler.WebService.FailureSignatures;
|
||||
using StellaOps.Scheduler.WebService.GraphJobs;
|
||||
using StellaOps.Scheduler.WebService.GraphJobs.Events;
|
||||
using StellaOps.Scheduler.WebService.Bootstrap;
|
||||
using StellaOps.Scheduler.WebService.Hosting;
|
||||
using StellaOps.Scheduler.WebService.Observability;
|
||||
using StellaOps.Scheduler.WebService.Options;
|
||||
@@ -28,8 +29,12 @@ using StellaOps.Scheduler.WebService.PolicyRuns;
|
||||
using StellaOps.Scheduler.WebService.PolicySimulations;
|
||||
using StellaOps.Scheduler.WebService.Runs;
|
||||
using StellaOps.Scheduler.WebService.Schedules;
|
||||
using StellaOps.Scheduler.WebService.Scripts;
|
||||
using StellaOps.Scheduler.WebService.Exceptions;
|
||||
using StellaOps.Scheduler.WebService.VulnerabilityResolverJobs;
|
||||
using StellaOps.ReleaseOrchestrator.Scripts;
|
||||
using StellaOps.ReleaseOrchestrator.Scripts.Persistence;
|
||||
using StellaOps.ReleaseOrchestrator.Scripts.Search;
|
||||
using StellaOps.Scheduler.Worker.Exceptions;
|
||||
using StellaOps.Scheduler.Worker.Observability;
|
||||
using StellaOps.Scheduler.Worker.Options;
|
||||
@@ -118,6 +123,23 @@ else
|
||||
builder.Services.AddSingleton<ISchedulerAuditService, InMemorySchedulerAuditService>();
|
||||
builder.Services.AddSingleton<IPolicyRunService, InMemoryPolicyRunService>();
|
||||
}
|
||||
// Scripts registry (shares the same Postgres options as Scheduler)
|
||||
builder.Services.AddSingleton<ScriptsDataSource>();
|
||||
builder.Services.AddSingleton<IScriptStore, PostgresScriptStore>();
|
||||
builder.Services.AddSingleton<ISearchIndexer, InMemorySearchIndexer>();
|
||||
builder.Services.AddSingleton<IScriptValidator, ScriptValidator>();
|
||||
builder.Services.AddSingleton<ILanguageValidator, CSharpScriptValidator>();
|
||||
builder.Services.AddSingleton<ILanguageValidator, PythonScriptValidator>();
|
||||
builder.Services.AddSingleton<ILanguageValidator, TypeScriptScriptValidator>();
|
||||
builder.Services.AddSingleton<IScriptRegistry, ScriptRegistry>();
|
||||
|
||||
// Workflow engine HTTP client (starts workflow instances for system schedules)
|
||||
builder.Services.AddHttpClient<StellaOps.Scheduler.WebService.Workflow.WorkflowTriggerClient>((sp, client) =>
|
||||
{
|
||||
client.BaseAddress = new Uri(
|
||||
builder.Configuration["Workflow:BaseAddress"] ?? "http://workflow.stella-ops.local");
|
||||
});
|
||||
|
||||
builder.Services.AddSingleton<IGraphJobCompletionPublisher, GraphJobEventPublisher>();
|
||||
builder.Services.AddSingleton<IResolverJobService, InMemoryResolverJobService>();
|
||||
if (cartographerOptions.Webhook.Enabled)
|
||||
@@ -147,6 +169,7 @@ builder.Services.AddSingleton<IExpiringDigestService>(NullExpiringDigestService.
|
||||
builder.Services.AddSingleton<IExpiringAlertService>(NullExpiringAlertService.Instance);
|
||||
builder.Services.AddHostedService<ExceptionLifecycleWorker>();
|
||||
builder.Services.AddHostedService<ExpiringNotificationWorker>();
|
||||
builder.Services.AddHostedService<SystemScheduleBootstrap>();
|
||||
|
||||
var schedulerOptions = builder.Configuration.GetSection("Scheduler").Get<SchedulerOptions>() ?? new SchedulerOptions();
|
||||
schedulerOptions.Validate();
|
||||
@@ -290,11 +313,13 @@ app.MapGet("/readyz", () => Results.Json(new { status = "ready" }))
|
||||
app.MapGraphJobEndpoints();
|
||||
ResolverJobEndpointExtensions.MapResolverJobEndpoints(app);
|
||||
app.MapScheduleEndpoints();
|
||||
StellaOps.Scheduler.WebService.Workflow.WorkflowTriggerEndpoints.MapWorkflowTriggerEndpoints(app);
|
||||
app.MapRunEndpoints();
|
||||
app.MapFailureSignatureEndpoints();
|
||||
app.MapPolicyRunEndpoints();
|
||||
app.MapPolicySimulationEndpoints();
|
||||
app.MapSchedulerEventWebhookEndpoints();
|
||||
app.MapScriptsEndpoints();
|
||||
|
||||
// Refresh Router endpoint cache
|
||||
app.TryRefreshStellaRouterEndpoints(routerEnabled);
|
||||
|
||||
@@ -17,7 +17,8 @@ internal sealed record ScheduleCreateRequest(
|
||||
[property: JsonPropertyName("notify")] ScheduleNotify? Notify = null,
|
||||
[property: JsonPropertyName("limits")] ScheduleLimits? Limits = null,
|
||||
[property: JsonPropertyName("subscribers")] ImmutableArray<string>? Subscribers = null,
|
||||
[property: JsonPropertyName("enabled")] bool Enabled = true);
|
||||
[property: JsonPropertyName("enabled")] bool Enabled = true,
|
||||
[property: JsonPropertyName("source")] string? Source = null);
|
||||
|
||||
internal sealed record ScheduleUpdateRequest(
|
||||
[property: JsonPropertyName("name")] string? Name,
|
||||
|
||||
@@ -41,6 +41,10 @@ internal static class ScheduleEndpoints
|
||||
.WithName("UpdateSchedule")
|
||||
.WithDescription(_t("scheduler.schedule.update_description"))
|
||||
.RequireAuthorization(SchedulerPolicies.Operate);
|
||||
group.MapDelete("/{scheduleId}", DeleteScheduleAsync)
|
||||
.WithName("DeleteSchedule")
|
||||
.WithDescription("Soft-deletes a schedule. System-managed schedules cannot be deleted.")
|
||||
.RequireAuthorization(SchedulerPolicies.Operate);
|
||||
group.MapPost("/{scheduleId}/pause", PauseScheduleAsync)
|
||||
.WithName("PauseSchedule")
|
||||
.WithDescription(_t("scheduler.schedule.pause_description"))
|
||||
@@ -265,6 +269,69 @@ internal static class ScheduleEndpoints
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task<IResult> DeleteScheduleAsync(
|
||||
HttpContext httpContext,
|
||||
string scheduleId,
|
||||
[FromServices] ITenantContextAccessor tenantAccessor,
|
||||
[FromServices] IScopeAuthorizer scopeAuthorizer,
|
||||
[FromServices] IScheduleRepository repository,
|
||||
[FromServices] ISchedulerAuditService auditService,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
scopeAuthorizer.EnsureScope(httpContext, WriteScope);
|
||||
var tenant = tenantAccessor.GetTenant(httpContext);
|
||||
|
||||
var existing = await repository.GetAsync(tenant.TenantId, scheduleId, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
if (existing is null)
|
||||
{
|
||||
return Results.NotFound();
|
||||
}
|
||||
|
||||
if (string.Equals(existing.Source, "system", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return Results.Conflict(new { error = "System-managed schedules cannot be deleted." });
|
||||
}
|
||||
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var actor = SchedulerEndpointHelpers.ResolveActorId(httpContext);
|
||||
var deleted = await repository.SoftDeleteAsync(tenant.TenantId, scheduleId, actor, now, cancellationToken).ConfigureAwait(false);
|
||||
if (!deleted)
|
||||
{
|
||||
return Results.NotFound();
|
||||
}
|
||||
|
||||
await auditService.WriteAsync(
|
||||
new SchedulerAuditEvent(
|
||||
tenant.TenantId,
|
||||
"scheduler",
|
||||
"delete",
|
||||
SchedulerEndpointHelpers.ResolveAuditActor(httpContext),
|
||||
ScheduleId: scheduleId,
|
||||
Metadata: new Dictionary<string, string>
|
||||
{
|
||||
["deletedAt"] = now.ToString("O", CultureInfo.InvariantCulture)
|
||||
}),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return Results.NoContent();
|
||||
}
|
||||
catch (UnauthorizedAccessException ex)
|
||||
{
|
||||
return Results.Json(new { error = ex.Message }, statusCode: StatusCodes.Status401Unauthorized);
|
||||
}
|
||||
catch (InvalidOperationException ex)
|
||||
{
|
||||
return Results.Json(new { error = ex.Message }, statusCode: StatusCodes.Status403Forbidden);
|
||||
}
|
||||
catch (Exception ex) when (ex is ArgumentException or ValidationException)
|
||||
{
|
||||
return Results.BadRequest(new { error = ex.Message });
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task<IResult> PauseScheduleAsync(
|
||||
HttpContext httpContext,
|
||||
string scheduleId,
|
||||
@@ -309,7 +376,8 @@ internal static class ScheduleEndpoints
|
||||
existing.CreatedBy,
|
||||
now,
|
||||
SchedulerEndpointHelpers.ResolveActorId(httpContext),
|
||||
existing.SchemaVersion);
|
||||
existing.SchemaVersion,
|
||||
existing.Source);
|
||||
|
||||
await repository.UpsertAsync(updated, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
await auditService.WriteAsync(
|
||||
@@ -385,7 +453,8 @@ internal static class ScheduleEndpoints
|
||||
existing.CreatedBy,
|
||||
now,
|
||||
SchedulerEndpointHelpers.ResolveActorId(httpContext),
|
||||
existing.SchemaVersion);
|
||||
existing.SchemaVersion,
|
||||
existing.Source);
|
||||
|
||||
await repository.UpsertAsync(updated, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
await auditService.WriteAsync(
|
||||
@@ -461,7 +530,8 @@ internal static class ScheduleEndpoints
|
||||
existing.CreatedBy,
|
||||
updatedAt,
|
||||
actor,
|
||||
existing.SchemaVersion);
|
||||
existing.SchemaVersion,
|
||||
existing.Source);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
<ProjectReference Include="../../Router/__Libraries/StellaOps.Messaging/StellaOps.Messaging.csproj" />
|
||||
<ProjectReference Include="../../Router/__Libraries/StellaOps.Router.AspNet/StellaOps.Router.AspNet.csproj" />
|
||||
<ProjectReference Include="../../__Libraries/StellaOps.Localization/StellaOps.Localization.csproj" />
|
||||
<ProjectReference Include="../../ReleaseOrchestrator/__Libraries/StellaOps.ReleaseOrchestrator.Scripts/StellaOps.ReleaseOrchestrator.Scripts.csproj" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<EmbeddedResource Include="Translations\*.json" />
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
using System.Net.Http.Json;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
|
||||
namespace StellaOps.Scheduler.WebService.Workflow;
|
||||
|
||||
/// <summary>
|
||||
/// HTTP client for triggering workflow instances from the scheduler.
|
||||
/// Maps system schedule names to workflow definitions.
|
||||
/// </summary>
|
||||
public sealed class WorkflowTriggerClient(
|
||||
HttpClient httpClient,
|
||||
ILogger<WorkflowTriggerClient> logger)
|
||||
{
|
||||
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
|
||||
{
|
||||
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Maps system schedule names to workflow definition names.
|
||||
/// </summary>
|
||||
private static readonly Dictionary<string, string> ScheduleToWorkflow = new(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
["Nightly Vulnerability Scan"] = "scan-execution",
|
||||
["Continuous Advisory Refresh"] = "advisory-refresh",
|
||||
["Weekly Compliance Sweep"] = "compliance-sweep",
|
||||
["EPSS Score Update"] = "scan-execution",
|
||||
["Reachability Re-evaluation"] = "scan-execution",
|
||||
["Registry Sync"] = "scan-execution",
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Tries to resolve a workflow name for the given schedule name.
|
||||
/// </summary>
|
||||
public static string? ResolveWorkflowName(string scheduleName)
|
||||
{
|
||||
return ScheduleToWorkflow.TryGetValue(scheduleName, out var wf) ? wf : null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Starts a workflow instance for the given schedule.
|
||||
/// </summary>
|
||||
public async Task<WorkflowStartResult?> TriggerAsync(
|
||||
string scheduleName,
|
||||
string tenantId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var workflowName = ResolveWorkflowName(scheduleName);
|
||||
if (workflowName is null)
|
||||
{
|
||||
logger.LogDebug("No workflow mapping for schedule {ScheduleName}", scheduleName);
|
||||
return null;
|
||||
}
|
||||
|
||||
var request = new
|
||||
{
|
||||
workflowName,
|
||||
payload = new Dictionary<string, object?>
|
||||
{
|
||||
["triggeredBy"] = "scheduler",
|
||||
["scheduleName"] = scheduleName,
|
||||
["tenantId"] = tenantId,
|
||||
["triggeredAt"] = DateTime.UtcNow.ToString("O"),
|
||||
},
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
var response = await httpClient.PostAsJsonAsync(
|
||||
"/api/workflow/start", request, JsonOptions, cancellationToken);
|
||||
|
||||
if (response.IsSuccessStatusCode)
|
||||
{
|
||||
var result = await response.Content.ReadFromJsonAsync<WorkflowStartResult>(
|
||||
JsonOptions, cancellationToken);
|
||||
|
||||
logger.LogInformation(
|
||||
"Triggered workflow {WorkflowName} for schedule {ScheduleName} → instance {InstanceId}",
|
||||
workflowName, scheduleName, result?.WorkflowInstanceId);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
var body = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
logger.LogWarning(
|
||||
"Workflow trigger failed for {ScheduleName} → {WorkflowName}: {StatusCode} {Body}",
|
||||
scheduleName, workflowName, response.StatusCode, body);
|
||||
return null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to trigger workflow for schedule {ScheduleName}", scheduleName);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record WorkflowStartResult
|
||||
{
|
||||
public string? WorkflowInstanceId { get; init; }
|
||||
public string? WorkflowName { get; init; }
|
||||
public string? WorkflowVersion { get; init; }
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using StellaOps.Auth.ServerIntegration.Tenancy;
|
||||
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
|
||||
using StellaOps.Scheduler.WebService.Security;
|
||||
|
||||
namespace StellaOps.Scheduler.WebService.Workflow;
|
||||
|
||||
/// <summary>
|
||||
/// Endpoints for triggering workflow instances from system schedules.
|
||||
/// </summary>
|
||||
internal static class WorkflowTriggerEndpoints
|
||||
{
|
||||
public static IEndpointRouteBuilder MapWorkflowTriggerEndpoints(this IEndpointRouteBuilder routes)
|
||||
{
|
||||
routes.MapPost("/api/v1/scheduler/schedules/{scheduleId}/trigger-workflow", TriggerWorkflowAsync)
|
||||
.WithName("TriggerScheduleWorkflow")
|
||||
.WithDescription("Trigger a workflow instance for a system-managed schedule")
|
||||
.WithTags("Schedules")
|
||||
.RequireAuthorization(SchedulerPolicies.Operate)
|
||||
.RequireTenant();
|
||||
|
||||
return routes;
|
||||
}
|
||||
|
||||
private static async Task<IResult> TriggerWorkflowAsync(
|
||||
string scheduleId,
|
||||
[FromServices] IStellaOpsTenantAccessor tenant,
|
||||
[FromServices] ScheduleRepository scheduleRepo,
|
||||
[FromServices] WorkflowTriggerClient workflowClient,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var schedule = await scheduleRepo.GetAsync(tenant.TenantId!, scheduleId, cancellationToken);
|
||||
if (schedule is null)
|
||||
{
|
||||
return Results.NotFound(new { error = "Schedule not found" });
|
||||
}
|
||||
|
||||
var workflowName = WorkflowTriggerClient.ResolveWorkflowName(schedule.Name);
|
||||
if (workflowName is null)
|
||||
{
|
||||
return Results.BadRequest(new
|
||||
{
|
||||
error = "no_workflow_mapping",
|
||||
message = $"Schedule '{schedule.Name}' does not have a workflow mapping",
|
||||
});
|
||||
}
|
||||
|
||||
var result = await workflowClient.TriggerAsync(schedule.Name, tenant.TenantId!, cancellationToken);
|
||||
if (result is null)
|
||||
{
|
||||
return Results.StatusCode(502);
|
||||
}
|
||||
|
||||
return Results.Ok(new
|
||||
{
|
||||
scheduleId = schedule.Id,
|
||||
scheduleName = schedule.Name,
|
||||
workflowName,
|
||||
workflowInstanceId = result.WorkflowInstanceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -25,7 +25,8 @@ public sealed record Schedule
|
||||
DateTimeOffset updatedAt,
|
||||
string updatedBy,
|
||||
ImmutableArray<string>? subscribers = null,
|
||||
string? schemaVersion = null)
|
||||
string? schemaVersion = null,
|
||||
string source = "user")
|
||||
: this(
|
||||
id,
|
||||
tenantId,
|
||||
@@ -43,7 +44,8 @@ public sealed record Schedule
|
||||
createdBy,
|
||||
updatedAt,
|
||||
updatedBy,
|
||||
schemaVersion)
|
||||
schemaVersion,
|
||||
source)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -65,7 +67,8 @@ public sealed record Schedule
|
||||
string createdBy,
|
||||
DateTimeOffset updatedAt,
|
||||
string updatedBy,
|
||||
string? schemaVersion = null)
|
||||
string? schemaVersion = null,
|
||||
string source = "user")
|
||||
{
|
||||
Id = Validation.EnsureId(id, nameof(id));
|
||||
TenantId = Validation.EnsureTenantId(tenantId, nameof(tenantId));
|
||||
@@ -88,6 +91,7 @@ public sealed record Schedule
|
||||
UpdatedAt = Validation.NormalizeTimestamp(updatedAt);
|
||||
UpdatedBy = Validation.EnsureSimpleIdentifier(updatedBy, nameof(updatedBy));
|
||||
SchemaVersion = SchedulerSchemaVersions.EnsureSchedule(schemaVersion);
|
||||
Source = string.IsNullOrWhiteSpace(source) ? "user" : source.Trim();
|
||||
|
||||
if (Selection.TenantId is not null && !string.Equals(Selection.TenantId, TenantId, StringComparison.Ordinal))
|
||||
{
|
||||
@@ -129,6 +133,8 @@ public sealed record Schedule
|
||||
public DateTimeOffset UpdatedAt { get; }
|
||||
|
||||
public string UpdatedBy { get; }
|
||||
|
||||
public string Source { get; } = "user";
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,124 @@
|
||||
-- 004_create_scripts_schema.sql
|
||||
-- Creates the scripts schema for the multi-language script registry.
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS scripts;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS scripts.scripts (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT,
|
||||
language TEXT NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
entry_point TEXT,
|
||||
version INT NOT NULL DEFAULT 1,
|
||||
dependencies JSONB NOT NULL DEFAULT '[]',
|
||||
tags TEXT[] NOT NULL DEFAULT '{}',
|
||||
variables JSONB NOT NULL DEFAULT '[]',
|
||||
visibility TEXT NOT NULL DEFAULT 'private',
|
||||
owner_id TEXT NOT NULL,
|
||||
team_id TEXT,
|
||||
content_hash TEXT NOT NULL,
|
||||
is_sample BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
sample_category TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_scripts_owner ON scripts.scripts (owner_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_scripts_lang ON scripts.scripts (language);
|
||||
CREATE INDEX IF NOT EXISTS idx_scripts_vis ON scripts.scripts (visibility);
|
||||
CREATE INDEX IF NOT EXISTS idx_scripts_sample ON scripts.scripts (is_sample) WHERE is_sample = TRUE;
|
||||
CREATE INDEX IF NOT EXISTS idx_scripts_updated ON scripts.scripts (updated_at DESC NULLS LAST, created_at DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS scripts.script_versions (
|
||||
script_id TEXT NOT NULL REFERENCES scripts.scripts(id) ON DELETE CASCADE,
|
||||
version INT NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
content_hash TEXT NOT NULL,
|
||||
dependencies JSONB NOT NULL DEFAULT '[]',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
created_by TEXT NOT NULL,
|
||||
change_note TEXT,
|
||||
PRIMARY KEY (script_id, version)
|
||||
);
|
||||
|
||||
-- Seed sample scripts (matching frontend expectations)
|
||||
|
||||
INSERT INTO scripts.scripts (id, name, description, language, content, version, tags, variables, visibility, owner_id, content_hash, is_sample, sample_category, created_at, updated_at)
|
||||
VALUES
|
||||
(
|
||||
'scr-001',
|
||||
'Pre-deploy Health Check',
|
||||
'Validates service health endpoints before deployment proceeds. Checks HTTP status, response time, and dependency connectivity.',
|
||||
'bash',
|
||||
E'#!/bin/bash\n# Pre-deploy health check script\nset -euo pipefail\n\nSERVICE_URL=\"${SERVICE_URL:-http://localhost:8080}\"\nTIMEOUT=${TIMEOUT:-10}\n\necho \"Checking health at $SERVICE_URL/health...\"\nHTTP_CODE=$(curl -s -o /dev/null -w \"%{http_code}\" --max-time \"$TIMEOUT\" \"$SERVICE_URL/health\")\n\nif [ \"$HTTP_CODE\" -eq 200 ]; then\n echo \"Health check passed (HTTP $HTTP_CODE)\"\n exit 0\nelse\n echo \"Health check failed (HTTP $HTTP_CODE)\"\n exit 1\nfi',
|
||||
3,
|
||||
ARRAY['health-check', 'pre-deploy', 'infrastructure'],
|
||||
'[{"name":"SERVICE_URL","description":"Target service URL for health check","isRequired":true,"defaultValue":"http://localhost:8080","isSecret":false},{"name":"TIMEOUT","description":"Request timeout in seconds","isRequired":false,"defaultValue":"10","isSecret":false}]'::jsonb,
|
||||
'organization',
|
||||
'admin',
|
||||
'sha256:a1b2c3d4e5f6',
|
||||
TRUE,
|
||||
'deployment',
|
||||
'2026-01-10T08:00:00Z',
|
||||
'2026-03-15T14:30:00Z'
|
||||
),
|
||||
(
|
||||
'scr-002',
|
||||
'Database Migration Validator',
|
||||
'Validates pending database migrations against schema constraints and checks for backward compatibility.',
|
||||
'python',
|
||||
E'\"\"\"Database migration validator.\"\"\"\nimport sys\nimport hashlib\n\ndef validate_migration(migration_path: str) -> bool:\n \"\"\"Validate a single migration file.\"\"\"\n with open(migration_path, ''r'') as f:\n content = f.read()\n\n destructive_ops = [''DROP TABLE'', ''DROP COLUMN'', ''TRUNCATE'']\n for op in destructive_ops:\n if op in content.upper():\n print(f\"WARNING: Destructive operation found: {op}\")\n return False\n\n checksum = hashlib.sha256(content.encode()).hexdigest()\n print(f\"Migration checksum: {checksum[:16]}\")\n return True\n\nif __name__ == ''__main__'':\n path = sys.argv[1] if len(sys.argv) > 1 else ''migrations/''\n result = validate_migration(path)\n sys.exit(0 if result else 1)',
|
||||
2,
|
||||
ARRAY['database', 'migration', 'validation'],
|
||||
'[{"name":"DB_CONNECTION","description":"Database connection string","isRequired":true,"isSecret":true},{"name":"MIGRATION_DIR","description":"Path to migrations directory","isRequired":false,"defaultValue":"migrations/","isSecret":false}]'::jsonb,
|
||||
'team',
|
||||
'admin',
|
||||
'sha256:b2c3d4e5f6a7',
|
||||
TRUE,
|
||||
'database',
|
||||
'2026-02-01T10:00:00Z',
|
||||
'2026-03-10T09:15:00Z'
|
||||
),
|
||||
(
|
||||
'scr-003',
|
||||
'Release Notes Generator',
|
||||
'Generates release notes from git commit history between two tags, grouped by conventional commit type.',
|
||||
'typescript',
|
||||
E'/**\n * Release notes generator.\n * Parses conventional commits and groups them by type.\n */\ninterface CommitEntry {\n hash: string;\n type: string;\n scope?: string;\n message: string;\n}\n\nfunction parseConventionalCommit(line: string): CommitEntry | null {\n const match = line.match(/^(\\w+)(\\((\\w+)\\))?:\\s+(.+)$/);\n if (!match) return null;\n return { hash: '''', type: match[1], scope: match[3], message: match[4] };\n}\n\nconsole.log(''Release notes generator ready.'');',
|
||||
1,
|
||||
ARRAY['release-notes', 'git', 'automation'],
|
||||
'[]'::jsonb,
|
||||
'public',
|
||||
'admin',
|
||||
'sha256:c3d4e5f6a7b8',
|
||||
TRUE,
|
||||
'release',
|
||||
'2026-03-01T12:00:00Z',
|
||||
'2026-03-01T12:00:00Z'
|
||||
),
|
||||
(
|
||||
'scr-004',
|
||||
'Container Image Scan Wrapper',
|
||||
'Wraps Trivy container scanning with custom policy checks and outputs results in SARIF format.',
|
||||
'csharp',
|
||||
E'// Container image scan wrapper\nusing System;\nusing System.Diagnostics;\nusing System.Text.Json;\n\nvar imageRef = Environment.GetEnvironmentVariable(\"IMAGE_REF\")\n ?? throw new InvalidOperationException(\"IMAGE_REF not set\");\n\nvar severityThreshold = Environment.GetEnvironmentVariable(\"SEVERITY_THRESHOLD\") ?? \"HIGH\";\n\nConsole.WriteLine($\"Scanning {imageRef} with threshold {severityThreshold}...\");',
|
||||
5,
|
||||
ARRAY['security', 'scanning', 'trivy', 'container'],
|
||||
'[{"name":"IMAGE_REF","description":"Container image reference to scan","isRequired":true,"isSecret":false},{"name":"SEVERITY_THRESHOLD","description":"Minimum severity to report","isRequired":false,"defaultValue":"HIGH","isSecret":false}]'::jsonb,
|
||||
'organization',
|
||||
'admin',
|
||||
'sha256:d4e5f6a7b8c9',
|
||||
FALSE,
|
||||
NULL,
|
||||
'2026-01-20T16:00:00Z',
|
||||
'2026-03-20T11:45:00Z'
|
||||
)
|
||||
ON CONFLICT (id) DO NOTHING;
|
||||
|
||||
-- Seed version history for each script
|
||||
INSERT INTO scripts.script_versions (script_id, version, content, content_hash, dependencies, created_at, created_by, change_note)
|
||||
SELECT id, version, content, content_hash, dependencies, created_at, owner_id, 'Current version'
|
||||
FROM scripts.scripts
|
||||
WHERE id IN ('scr-001','scr-002','scr-003','scr-004')
|
||||
ON CONFLICT (script_id, version) DO NOTHING;
|
||||
@@ -0,0 +1,6 @@
|
||||
-- Migration: 005_add_source_column
|
||||
-- Adds source tracking for system-managed vs user-created schedules.
|
||||
|
||||
ALTER TABLE scheduler.schedules ADD COLUMN IF NOT EXISTS source TEXT NOT NULL DEFAULT 'user';
|
||||
|
||||
COMMENT ON COLUMN scheduler.schedules.source IS 'Origin: system (auto-managed), user (manual), integration (plugin-created)';
|
||||
@@ -22,15 +22,23 @@ VALUES
|
||||
ON CONFLICT (tenant_id, name) DO NOTHING;
|
||||
|
||||
-- ============================================================================
|
||||
-- Schedules
|
||||
-- Schedules (system-managed)
|
||||
-- ============================================================================
|
||||
|
||||
INSERT INTO scheduler.schedules (id, tenant_id, name, description, enabled, cron_expression, mode, selection, created_by, updated_by)
|
||||
INSERT INTO scheduler.schedules (id, tenant_id, name, description, enabled, cron_expression, timezone, mode, selection, created_by, updated_by, source)
|
||||
VALUES
|
||||
('demo-sched-001', 'demo-prod', 'production-scan', 'Production artifact scanning schedule', true,
|
||||
'0 2 * * *', 'analysisonly', '{"tags": ["production"], "registries": ["ghcr.io"]}'::jsonb, 'admin', 'admin'),
|
||||
('demo-sched-002', 'demo-prod', 'staging-scan', 'Staging artifact scanning schedule', true,
|
||||
'0 3 * * *', 'contentrefresh', '{"tags": ["staging"], "registries": ["ghcr.io"]}'::jsonb, 'admin', 'admin')
|
||||
('sys-demo-prod-nightly-vuln-scan', 'demo-prod', 'Nightly Vulnerability Scan', 'System-managed nightly vulnerability scan of all images', true,
|
||||
'0 2 * * *', 'UTC', 'analysisonly', '{"scope": "all-images"}'::jsonb, 'system-bootstrap', 'system-bootstrap', 'system'),
|
||||
('sys-demo-prod-advisory-refresh', 'demo-prod', 'Continuous Advisory Refresh', 'System-managed advisory feed refresh every 4 hours', true,
|
||||
'0 */4 * * *', 'UTC', 'contentrefresh', '{"scope": "all-images"}'::jsonb, 'system-bootstrap', 'system-bootstrap', 'system'),
|
||||
('sys-demo-prod-weekly-compliance-sweep', 'demo-prod', 'Weekly Compliance Sweep', 'System-managed weekly compliance sweep on Sundays', true,
|
||||
'0 3 * * 0', 'UTC', 'analysisonly', '{"scope": "all-images"}'::jsonb, 'system-bootstrap', 'system-bootstrap', 'system'),
|
||||
('sys-demo-prod-epss-score-update', 'demo-prod', 'EPSS Score Update', 'System-managed daily EPSS score refresh', true,
|
||||
'0 6 * * *', 'UTC', 'contentrefresh', '{"scope": "all-images"}'::jsonb, 'system-bootstrap', 'system-bootstrap', 'system'),
|
||||
('sys-demo-prod-reachability-reeval', 'demo-prod', 'Reachability Re-evaluation', 'System-managed weekday reachability analysis', true,
|
||||
'0 5 * * 1-5', 'UTC', 'analysisonly', '{"scope": "all-images"}'::jsonb, 'system-bootstrap', 'system-bootstrap', 'system'),
|
||||
('sys-demo-prod-registry-sync', 'demo-prod', 'Registry Sync', 'System-managed registry sync every 2 hours', true,
|
||||
'0 */2 * * *', 'UTC', 'contentrefresh', '{"scope": "all-images"}'::jsonb, 'system-bootstrap', 'system-bootstrap', 'system')
|
||||
ON CONFLICT DO NOTHING;
|
||||
|
||||
-- ============================================================================
|
||||
@@ -77,22 +85,22 @@ ON CONFLICT (tenant_id, idempotency_key) DO NOTHING;
|
||||
|
||||
INSERT INTO scheduler.runs (id, tenant_id, schedule_id, trigger, state, stats, reason, created_at, started_at, finished_at, deltas)
|
||||
VALUES
|
||||
('demo-run-001', 'demo-prod', 'demo-sched-001',
|
||||
'{"type": "scheduled", "triggerId": "daily-vulnerability-scan"}'::jsonb,
|
||||
('demo-run-001', 'demo-prod', 'sys-demo-prod-nightly-vuln-scan',
|
||||
'{"type": "scheduled", "triggerId": "nightly-vuln-scan"}'::jsonb,
|
||||
'completed',
|
||||
'{"findingCount": 127, "criticalCount": 3, "highCount": 12, "newFindingCount": 5, "componentCount": 842}'::jsonb,
|
||||
'{"code": "completed", "message": "Scan completed successfully"}'::jsonb,
|
||||
NOW() - INTERVAL '2 hours', NOW() - INTERVAL '2 hours', NOW() - INTERVAL '1 hour 45 minutes',
|
||||
'{"added": 5, "removed": 2, "unchanged": 120}'::jsonb),
|
||||
('demo-run-002', 'demo-prod', 'demo-sched-001',
|
||||
'{"type": "scheduled", "triggerId": "daily-vulnerability-scan"}'::jsonb,
|
||||
('demo-run-002', 'demo-prod', 'sys-demo-prod-nightly-vuln-scan',
|
||||
'{"type": "scheduled", "triggerId": "nightly-vuln-scan"}'::jsonb,
|
||||
'completed',
|
||||
'{"findingCount": 122, "criticalCount": 2, "highCount": 11, "newFindingCount": 0, "componentCount": 840}'::jsonb,
|
||||
'{"code": "completed", "message": "Scan completed successfully"}'::jsonb,
|
||||
NOW() - INTERVAL '26 hours', NOW() - INTERVAL '26 hours', NOW() - INTERVAL '25 hours 50 minutes',
|
||||
'{"added": 0, "removed": 3, "unchanged": 122}'::jsonb),
|
||||
('demo-run-003', 'demo-prod', 'demo-sched-002',
|
||||
'{"type": "scheduled", "triggerId": "staging-scan"}'::jsonb,
|
||||
('demo-run-003', 'demo-prod', 'sys-demo-prod-registry-sync',
|
||||
'{"type": "scheduled", "triggerId": "registry-sync"}'::jsonb,
|
||||
'error',
|
||||
'{"findingCount": 0, "criticalCount": 0, "highCount": 0, "newFindingCount": 0, "componentCount": 0}'::jsonb,
|
||||
'{"code": "timeout", "message": "Registry connection timed out after 300s"}'::jsonb,
|
||||
|
||||
@@ -30,11 +30,11 @@ public sealed class ScheduleRepository : RepositoryBase<SchedulerDataSource>, IS
|
||||
INSERT INTO scheduler.schedules (
|
||||
id, tenant_id, name, description, enabled, cron_expression, timezone, mode,
|
||||
selection, only_if, notify, limits, subscribers, created_at, created_by,
|
||||
updated_at, updated_by, deleted_at, deleted_by, schema_version)
|
||||
updated_at, updated_by, deleted_at, deleted_by, schema_version, source)
|
||||
VALUES (
|
||||
@id, @tenant_id, @name, @description, @enabled, @cron_expression, @timezone, @mode,
|
||||
@selection, @only_if, @notify, @limits, @subscribers, @created_at, @created_by,
|
||||
@updated_at, @updated_by, NULL, NULL, @schema_version)
|
||||
@updated_at, @updated_by, NULL, NULL, @schema_version, @source)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
name = EXCLUDED.name,
|
||||
description = EXCLUDED.description,
|
||||
@@ -74,6 +74,7 @@ public sealed class ScheduleRepository : RepositoryBase<SchedulerDataSource>, IS
|
||||
AddParameter(command, "updated_at", schedule.UpdatedAt);
|
||||
AddParameter(command, "updated_by", schedule.UpdatedBy);
|
||||
AddParameter(command, "schema_version", schedule.SchemaVersion ?? (object)DBNull.Value);
|
||||
AddParameter(command, "source", schedule.Source);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
@@ -178,6 +179,7 @@ public sealed class ScheduleRepository : RepositoryBase<SchedulerDataSource>, IS
|
||||
reader.GetString(reader.GetOrdinal("created_by")),
|
||||
DateTime.SpecifyKind(reader.GetDateTime(reader.GetOrdinal("updated_at")), DateTimeKind.Utc),
|
||||
reader.GetString(reader.GetOrdinal("updated_by")),
|
||||
GetNullableString(reader, reader.GetOrdinal("schema_version")));
|
||||
GetNullableString(reader, reader.GetOrdinal("schema_version")),
|
||||
source: GetNullableString(reader, reader.GetOrdinal("source")) ?? "user");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user