Merge branch 'worktree-agent-a503735a'

# Conflicts:
#	devops/compose/docker-compose.stella-ops.yml
#	devops/docker/services-matrix.env
#	src/JobEngine/StellaOps.Scheduler.WebService/Bootstrap/SystemScheduleBootstrap.cs
#	src/JobEngine/StellaOps.Scheduler.WebService/Program.cs
#	src/JobEngine/StellaOps.Scheduler.WebService/Schedules/ScheduleEndpoints.cs
#	src/JobEngine/StellaOps.Scheduler.WebService/StellaOps.Scheduler.WebService.csproj
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Models/Schedule.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/IRunProgressReporter.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/ISchedulerJobPlugin.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/ISchedulerPluginRegistry.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/JobConfigValidationResult.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/JobExecutionContext.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/JobPlan.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/JobPlanContext.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/SchedulerPluginRegistry.cs
#	src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/StellaOps.Scheduler.Plugin.Abstractions.csproj
This commit is contained in:
master
2026-04-08 16:27:02 +03:00
51 changed files with 2028 additions and 91 deletions

Submodule .claude/worktrees/agent-a09ac2bf added at 6b15d9827d

Submodule .claude/worktrees/agent-a0e024b5 added at 53f294400f

Submodule .claude/worktrees/agent-a10bd4c5 added at 53f294400f

Submodule .claude/worktrees/agent-a2075206 added at 53f294400f

Submodule .claude/worktrees/agent-a4519a5e added at 53f294400f

Submodule .claude/worktrees/agent-a503735a added at 908619e739

Submodule .claude/worktrees/agent-a56f8a54 added at 53f294400f

Submodule .claude/worktrees/agent-a571129e added at 53f294400f

Submodule .claude/worktrees/agent-a7d51e97 added at 53f294400f

Submodule .claude/worktrees/agent-a85909fa added at 53f294400f

Submodule .claude/worktrees/agent-a91af20e added at 53f294400f

Submodule .claude/worktrees/agent-ac7693db added at 53f294400f

Submodule .claude/worktrees/agent-acb49e4f added at 53f294400f

Submodule .claude/worktrees/agent-ad7eeb67 added at 53f294400f

Submodule .claude/worktrees/agent-ae2506a8 added at 53f294400f

Submodule .claude/worktrees/agent-aee3b313 added at 53f294400f

View File

@@ -14,19 +14,30 @@ Doctor provides a plugin-based diagnostic system that enables:
- **Capability probing** for feature compatibility
- **Evidence collection** for troubleshooting and compliance
### Scheduler Runtime Surface (run-002 remediation)
### Scheduler Integration (Sprint 20260408-003)
Doctor Scheduler now exposes an HTTP management and trend surface at:
- `GET/POST /api/v1/doctor/scheduler/schedules`
- `GET/PUT/DELETE /api/v1/doctor/scheduler/schedules/{scheduleId}`
- `GET /api/v1/doctor/scheduler/schedules/{scheduleId}/executions`
- `POST /api/v1/doctor/scheduler/schedules/{scheduleId}/execute`
- `GET /api/v1/doctor/scheduler/trends`
- `GET /api/v1/doctor/scheduler/trends/checks/{checkId}`
- `GET /api/v1/doctor/scheduler/trends/categories/{category}`
- `GET /api/v1/doctor/scheduler/trends/degrading`
> **The standalone Doctor Scheduler service is deprecated.**
> Doctor health check scheduling is now handled by the Scheduler service's `DoctorJobPlugin`.
The default local runtime uses deterministic in-memory repositories with stable ordering for schedule lists, execution history, and trend summaries.
Doctor schedules are managed via the Scheduler API with `jobKind="doctor"` and plugin-specific
configuration in `pluginConfig`. Trend data is stored in `scheduler.doctor_trends` (PostgreSQL).
**Scheduler-hosted Doctor endpoints:**
- `GET /api/v1/scheduler/doctor/trends` -- aggregated trend summaries
- `GET /api/v1/scheduler/doctor/trends/checks/{checkId}` -- per-check trend data
- `GET /api/v1/scheduler/doctor/trends/categories/{category}` -- per-category trend data
- `GET /api/v1/scheduler/doctor/trends/degrading` -- checks with degrading health
**Schedule management** uses the standard Scheduler API at `/api/v1/scheduler/schedules`
with `jobKind="doctor"` and `pluginConfig` containing Doctor-specific options (mode, categories, alerts).
Three default Doctor schedules are seeded by `SystemScheduleBootstrap`:
- `doctor-full-daily` (0 4 * * *) -- Full health check
- `doctor-quick-hourly` (0 * * * *) -- Quick health check
- `doctor-compliance-weekly` (0 5 * * 0) -- Compliance category audit
The Doctor WebService (`src/Doctor/StellaOps.Doctor.WebService/`) remains the execution engine.
The plugin communicates with it via HTTP POST to `/api/v1/doctor/run`.
### AdvisoryAI Diagnosis Surface (run-003 remediation)

View File

@@ -67,3 +67,14 @@ Every `IDoctorCheck` implementation MUST have a corresponding documentation arti
- Development: https://localhost:10260, http://localhost:10261
- Local alias: https://doctor.stella-ops.local, http://doctor.stella-ops.local
- Env var: STELLAOPS_DOCTOR_URL
## Doctor Scheduling (DEPRECATED standalone service)
The standalone `StellaOps.Doctor.Scheduler` service is deprecated as of Sprint 20260408-003.
Doctor health check scheduling is now handled by the Scheduler service's `DoctorJobPlugin`
(jobKind="doctor") in `src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Doctor/`.
- Schedule CRUD: use the Scheduler API with `jobKind="doctor"` and `pluginConfig` for Doctor-specific options.
- Trend storage: moved from in-memory to Scheduler's PostgreSQL schema (`scheduler.doctor_trends`).
- Trend endpoints: served by the Scheduler at `/api/v1/scheduler/doctor/trends/*`.
- The Doctor WebService remains unchanged as the execution engine for health checks.
- Source code for the old scheduler is kept for one release cycle before removal.

View File

@@ -0,0 +1,24 @@
# StellaOps.Doctor.Scheduler (DEPRECATED)
> **DEPRECATED** as of Sprint 20260408-003. This standalone service is replaced by the
> `DoctorJobPlugin` in the Scheduler service (`src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Doctor/`).
## Migration Summary
| Capability | Before (this service) | After (Scheduler plugin) |
|---|---|---|
| Schedule CRUD | In-memory, `/api/v1/doctor/scheduler/schedules` | Scheduler API with `jobKind="doctor"` |
| Cron evaluation | `DoctorScheduleWorker` background service | Scheduler's existing cron infrastructure |
| Run execution | `ScheduleExecutor` HTTP calls to Doctor WebService | `DoctorJobPlugin.ExecuteAsync()` |
| Trend storage | `InMemoryTrendRepository` (volatile) | `scheduler.doctor_trends` PostgreSQL table |
| Trend endpoints | `/api/v1/doctor/scheduler/trends/*` | `/api/v1/scheduler/doctor/trends/*` |
## What Stays
- **Doctor WebService** (`src/Doctor/StellaOps.Doctor.WebService/`): unchanged, remains the health check execution engine.
- **Doctor Plugins** (`src/Doctor/__Plugins/`): unchanged, loaded by Doctor WebService.
## Removal Timeline
Source code is kept for one release cycle. The `doctor-scheduler` container is commented out
in `docker-compose.stella-ops.yml` and disabled in `services-matrix.env`.

View File

@@ -10,21 +10,18 @@ namespace StellaOps.Scheduler.WebService.Bootstrap;
/// <summary>
/// Creates system-managed schedules on startup for each tenant.
/// Missing schedules are inserted; existing ones are left untouched.
/// Includes both scan schedules and Doctor health check schedules.
/// </summary>
internal sealed class SystemScheduleBootstrap : BackgroundService
{
private static readonly (string Slug, string Name, string Cron, ScheduleMode Mode, SelectorScope Scope, string JobKind, ImmutableDictionary<string, object?>? PluginConfig)[] SystemSchedules =
[
// Scan schedules (jobKind = "scan")
("nightly-vuln-scan", "Nightly Vulnerability Scan", "0 2 * * *", ScheduleMode.AnalysisOnly, SelectorScope.AllImages, "scan", null),
("advisory-refresh", "Continuous Advisory Refresh", "0 */4 * * *", ScheduleMode.ContentRefresh, SelectorScope.AllImages, "scan", null),
("weekly-compliance-sweep", "Weekly Compliance Sweep", "0 3 * * 0", ScheduleMode.AnalysisOnly, SelectorScope.AllImages, "scan", null),
("epss-score-update", "EPSS Score Update", "0 6 * * *", ScheduleMode.ContentRefresh, SelectorScope.AllImages, "scan", null),
("reachability-reeval", "Reachability Re-evaluation", "0 5 * * 1-5", ScheduleMode.AnalysisOnly, SelectorScope.AllImages, "scan", null),
("registry-sync", "Registry Sync", "0 */2 * * *", ScheduleMode.ContentRefresh, SelectorScope.AllImages, "scan", null),
// Doctor health check schedules (jobKind = "doctor")
// Doctor health check schedules (replaces standalone doctor-scheduler seeds)
("doctor-full-daily", "Daily Health Check", "0 4 * * *", ScheduleMode.AnalysisOnly, SelectorScope.AllImages, "doctor",
ImmutableDictionary.CreateRange<string, object?>(new KeyValuePair<string, object?>[]
{
@@ -37,7 +34,7 @@ internal sealed class SystemScheduleBootstrap : BackgroundService
new("doctorMode", "quick"),
new("timeoutSeconds", 120),
})),
("doctor-compliance-weekly", "Weekly Compliance Audit", "0 5 * * 0", ScheduleMode.AnalysisOnly, SelectorScope.AllImages, "doctor",
("doctor-compliance-weekly","Weekly Compliance Audit", "0 5 * * 0", ScheduleMode.AnalysisOnly, SelectorScope.AllImages, "doctor",
ImmutableDictionary.CreateRange<string, object?>(new KeyValuePair<string, object?>[]
{
new("doctorMode", "categories"),
@@ -124,7 +121,7 @@ internal sealed class SystemScheduleBootstrap : BackgroundService
pluginConfig: pluginConfig);
await repository.UpsertAsync(schedule, cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Created system schedule {ScheduleId} ({Name}, jobKind={JobKind}) for tenant {TenantId}.", scheduleId, name, jobKind, tenantId);
_logger.LogInformation("Created system schedule {ScheduleId} ({Name}, kind={JobKind}) for tenant {TenantId}.", scheduleId, name, jobKind, tenantId);
}
}
}

View File

@@ -2,7 +2,6 @@
using Microsoft.AspNetCore.Authentication;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using StellaOps.Audit.Emission;
using StellaOps.Auth.Abstractions;
using StellaOps.Localization;
using StellaOps.Auth.ServerIntegration;
@@ -10,7 +9,6 @@ using StellaOps.Auth.ServerIntegration.Tenancy;
using StellaOps.Plugin.DependencyInjection;
using StellaOps.Plugin.Hosting;
using StellaOps.Router.AspNet;
using StellaOps.Scheduler.Plugin;
using StellaOps.Scheduler.ImpactIndex;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Persistence.Extensions;
@@ -31,11 +29,18 @@ using StellaOps.Scheduler.WebService.PolicyRuns;
using StellaOps.Scheduler.WebService.PolicySimulations;
using StellaOps.Scheduler.WebService.Runs;
using StellaOps.Scheduler.WebService.Schedules;
using StellaOps.Scheduler.WebService.Scripts;
using StellaOps.Scheduler.WebService.Exceptions;
using StellaOps.Scheduler.WebService.VulnerabilityResolverJobs;
using StellaOps.ReleaseOrchestrator.Scripts;
using StellaOps.ReleaseOrchestrator.Scripts.Persistence;
using StellaOps.ReleaseOrchestrator.Scripts.Search;
using StellaOps.Scheduler.Worker.Exceptions;
using StellaOps.Scheduler.Worker.Observability;
using StellaOps.Scheduler.Worker.Options;
using StellaOps.Scheduler.Plugin;
using StellaOps.Scheduler.Plugin.Scan;
using StellaOps.Scheduler.Plugin.Doctor;
using System.Linq;
var builder = WebApplication.CreateBuilder(args);
@@ -121,6 +126,16 @@ else
builder.Services.AddSingleton<ISchedulerAuditService, InMemorySchedulerAuditService>();
builder.Services.AddSingleton<IPolicyRunService, InMemoryPolicyRunService>();
}
// Scripts registry (shares the same Postgres options as Scheduler)
builder.Services.AddSingleton<ScriptsDataSource>();
builder.Services.AddSingleton<IScriptStore, PostgresScriptStore>();
builder.Services.AddSingleton<ISearchIndexer, InMemorySearchIndexer>();
builder.Services.AddSingleton<IScriptValidator, ScriptValidator>();
builder.Services.AddSingleton<ILanguageValidator, CSharpScriptValidator>();
builder.Services.AddSingleton<ILanguageValidator, PythonScriptValidator>();
builder.Services.AddSingleton<ILanguageValidator, TypeScriptScriptValidator>();
builder.Services.AddSingleton<IScriptRegistry, ScriptRegistry>();
// Workflow engine HTTP client (starts workflow instances for system schedules)
builder.Services.AddHttpClient<StellaOps.Scheduler.WebService.Workflow.WorkflowTriggerClient>((sp, client) =>
{
@@ -176,23 +191,39 @@ var pluginHostOptions = SchedulerPluginHostFactory.Build(schedulerOptions.Plugin
builder.Services.AddSingleton(pluginHostOptions);
builder.Services.RegisterPluginRoutines(builder.Configuration, pluginHostOptions);
// Scheduler plugin registry: discover and register ISchedulerJobPlugin implementations
// Scheduler Plugin Registry: register built-in and assembly-loaded job plugins
var pluginRegistry = new SchedulerPluginRegistry();
// Register built-in scan plugin (default for all existing schedules)
// Built-in: ScanJobPlugin (handles jobKind="scan")
var scanPlugin = new ScanJobPlugin();
pluginRegistry.Register(scanPlugin);
// Discover ISchedulerJobPlugin implementations from assembly-loaded plugins
var loadResult = PluginHost.LoadPlugins(pluginHostOptions);
foreach (var pluginAssembly in loadResult.Plugins)
// Built-in: DoctorJobPlugin (handles jobKind="doctor")
var doctorPlugin = new DoctorJobPlugin();
pluginRegistry.Register(doctorPlugin);
doctorPlugin.ConfigureServices(builder.Services, builder.Configuration);
// Discover assembly-loaded ISchedulerJobPlugin implementations from plugin DLLs
var pluginLoadResult = StellaOps.Plugin.Hosting.PluginHost.LoadPlugins(pluginHostOptions);
foreach (var loadedPlugin in pluginLoadResult.Plugins)
{
var jobPlugins = StellaOps.Plugin.PluginLoader.LoadPlugins<ISchedulerJobPlugin>(
new[] { pluginAssembly.Assembly });
foreach (var jobPlugin in jobPlugins)
foreach (var type in loadedPlugin.Assembly.GetTypes())
{
pluginRegistry.Register(jobPlugin);
jobPlugin.ConfigureServices(builder.Services, builder.Configuration);
if (type.IsAbstract || type.IsInterface || !typeof(ISchedulerJobPlugin).IsAssignableFrom(type))
continue;
if (Activator.CreateInstance(type) is ISchedulerJobPlugin jobPlugin)
{
try
{
pluginRegistry.Register(jobPlugin);
jobPlugin.ConfigureServices(builder.Services, builder.Configuration);
}
catch (InvalidOperationException)
{
// Duplicate JobKind; skip silently (built-in takes precedence)
}
}
}
}
@@ -281,9 +312,6 @@ builder.Services.AddEndpointsApiExplorer();
builder.Services.AddStellaOpsLocalization(builder.Configuration);
builder.Services.AddTranslationBundle(System.Reflection.Assembly.GetExecutingAssembly());
// Unified audit emission (posts audit events to Timeline service)
builder.Services.AddAuditEmission(builder.Configuration);
// Stella Router integration
var routerEnabled = builder.Services.AddRouterMicroservice(
builder.Configuration,
@@ -332,11 +360,13 @@ app.MapFailureSignatureEndpoints();
app.MapPolicyRunEndpoints();
app.MapPolicySimulationEndpoints();
app.MapSchedulerEventWebhookEndpoints();
app.MapScriptsEndpoints();
// Map plugin-provided endpoints (e.g., Doctor trend endpoints)
foreach (var (jobKind, _) in pluginRegistry.ListRegistered())
// Map plugin-registered endpoints (e.g. Doctor trend endpoints)
var registry = app.Services.GetRequiredService<ISchedulerPluginRegistry>();
foreach (var (jobKind, _) in registry.ListRegistered())
{
var plugin = pluginRegistry.Resolve(jobKind);
var plugin = registry.Resolve(jobKind);
plugin?.MapEndpoints(app);
}

View File

@@ -183,7 +183,7 @@ internal static class ScheduleEndpoints
SchedulerEndpointHelpers.ResolveActorId(httpContext),
SchedulerSchemaVersions.Schedule,
source: request.Source ?? "user",
jobKind: request.JobKind ?? "scan",
jobKind: request.JobKind,
pluginConfig: request.PluginConfig);
await repository.UpsertAsync(schedule, cancellationToken: cancellationToken).ConfigureAwait(false);

View File

@@ -12,6 +12,8 @@
<ItemGroup>
<ProjectReference Include="../StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Models/StellaOps.Scheduler.Models.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/StellaOps.Scheduler.Plugin.Abstractions.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Scan/StellaOps.Scheduler.Plugin.Scan.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Doctor/StellaOps.Scheduler.Plugin.Doctor.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.ImpactIndex/StellaOps.Scheduler.ImpactIndex.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Queue/StellaOps.Scheduler.Queue.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Persistence/StellaOps.Scheduler.Persistence.csproj" />
@@ -24,7 +26,7 @@
<ProjectReference Include="../../Router/__Libraries/StellaOps.Messaging/StellaOps.Messaging.csproj" />
<ProjectReference Include="../../Router/__Libraries/StellaOps.Router.AspNet/StellaOps.Router.AspNet.csproj" />
<ProjectReference Include="../../__Libraries/StellaOps.Localization/StellaOps.Localization.csproj" />
<ProjectReference Include="../../__Libraries/StellaOps.Audit.Emission/StellaOps.Audit.Emission.csproj" />
<ProjectReference Include="../../ReleaseOrchestrator/__Libraries/StellaOps.ReleaseOrchestrator.Scripts/StellaOps.ReleaseOrchestrator.Scripts.csproj" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Translations\*.json" />

View File

@@ -27,7 +27,7 @@ public sealed record Schedule
ImmutableArray<string>? subscribers = null,
string? schemaVersion = null,
string source = "user",
string jobKind = "scan",
string? jobKind = null,
ImmutableDictionary<string, object?>? pluginConfig = null)
: this(
id,
@@ -73,7 +73,7 @@ public sealed record Schedule
string updatedBy,
string? schemaVersion = null,
string source = "user",
string jobKind = "scan",
string? jobKind = null,
ImmutableDictionary<string, object?>? pluginConfig = null)
{
Id = Validation.EnsureId(id, nameof(id));
@@ -98,7 +98,7 @@ public sealed record Schedule
UpdatedBy = Validation.EnsureSimpleIdentifier(updatedBy, nameof(updatedBy));
SchemaVersion = SchedulerSchemaVersions.EnsureSchedule(schemaVersion);
Source = string.IsNullOrWhiteSpace(source) ? "user" : source.Trim();
JobKind = string.IsNullOrWhiteSpace(jobKind) ? "scan" : jobKind.Trim().ToLowerInvariant();
JobKind = string.IsNullOrWhiteSpace(jobKind) ? "scan" : jobKind.Trim();
PluginConfig = pluginConfig;
if (Selection.TenantId is not null && !string.Equals(Selection.TenantId, TenantId, StringComparison.Ordinal))
@@ -145,15 +145,15 @@ public sealed record Schedule
public string Source { get; } = "user";
/// <summary>
/// Identifies which <see cref="Plugin.ISchedulerJobPlugin"/> handles this schedule.
/// Defaults to "scan" for backward compatibility with existing schedules.
/// Identifies which plugin handles this schedule's execution (e.g. "scan", "doctor").
/// Defaults to "scan" for backward compatibility.
/// </summary>
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string JobKind { get; } = "scan";
/// <summary>
/// Plugin-specific configuration stored as JSON. For scan jobs this is null
/// (mode/selector cover everything). For other job kinds (e.g., "doctor") this
/// contains plugin-specific settings.
/// Plugin-specific configuration stored as JSON. For scan jobs this is null.
/// For other plugins (e.g., doctor) this contains plugin-specific settings.
/// </summary>
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public ImmutableDictionary<string, object?>? PluginConfig { get; }

View File

@@ -0,0 +1,16 @@
-- Migration: 007_add_schedule_job_kind
-- Adds job_kind and plugin_config columns to support the scheduler plugin architecture.
-- job_kind routes schedule execution to the correct ISchedulerJobPlugin.
-- plugin_config stores plugin-specific JSON configuration.
ALTER TABLE scheduler.schedules
ADD COLUMN IF NOT EXISTS job_kind TEXT NOT NULL DEFAULT 'scan';
ALTER TABLE scheduler.schedules
ADD COLUMN IF NOT EXISTS plugin_config JSONB;
COMMENT ON COLUMN scheduler.schedules.job_kind IS 'Routes to the ISchedulerJobPlugin that handles this schedule (e.g. scan, doctor, policy-sweep).';
COMMENT ON COLUMN scheduler.schedules.plugin_config IS 'Plugin-specific configuration as JSON. Schema defined by the plugin identified by job_kind.';
-- Index for filtering schedules by job kind (common in UI and API queries).
CREATE INDEX IF NOT EXISTS idx_schedules_job_kind ON scheduler.schedules(tenant_id, job_kind) WHERE deleted_at IS NULL;

View File

@@ -0,0 +1,36 @@
-- Migration: 008_add_doctor_trends
-- Adds the doctor_trends table for storing health check trend data
-- previously held in the standalone Doctor Scheduler's in-memory repository.
CREATE TABLE IF NOT EXISTS scheduler.doctor_trends (
id BIGSERIAL PRIMARY KEY,
tenant_id TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
check_id TEXT NOT NULL,
plugin_id TEXT NOT NULL,
category TEXT NOT NULL,
run_id TEXT NOT NULL,
status TEXT NOT NULL,
health_score INT NOT NULL,
duration_ms INT NOT NULL DEFAULT 0,
evidence_values JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_doctor_trends_tenant_check ON scheduler.doctor_trends(tenant_id, check_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_doctor_trends_tenant_category ON scheduler.doctor_trends(tenant_id, category, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_doctor_trends_tenant_run ON scheduler.doctor_trends(tenant_id, run_id);
CREATE INDEX IF NOT EXISTS idx_doctor_trends_timestamp ON scheduler.doctor_trends(timestamp DESC);
-- BRIN index for time-series queries over large datasets
CREATE INDEX IF NOT EXISTS brin_doctor_trends_timestamp ON scheduler.doctor_trends USING BRIN(timestamp) WITH (pages_per_range = 128);
COMMENT ON TABLE scheduler.doctor_trends IS 'Health check trend data from Doctor scheduled runs. Migrated from the standalone Doctor Scheduler in-memory repository.';
-- RLS for tenant isolation
ALTER TABLE scheduler.doctor_trends ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.doctor_trends FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS doctor_trends_tenant_isolation ON scheduler.doctor_trends;
CREATE POLICY doctor_trends_tenant_isolation ON scheduler.doctor_trends FOR ALL
USING (tenant_id = scheduler_app.require_current_tenant())
WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

View File

@@ -0,0 +1,48 @@
# AGENTS - Scheduler Plugin Architecture
## Overview
The Scheduler Plugin system enables non-scanning workloads (health checks, policy sweeps,
graph builds, etc.) to be scheduled and executed as first-class Scheduler jobs.
## Plugin Contract
Every plugin implements `ISchedulerJobPlugin` from `StellaOps.Scheduler.Plugin.Abstractions`:
| Method | Purpose |
|---|---|
| `JobKind` | Unique string identifier stored in `Schedule.JobKind` |
| `DisplayName` | Human-readable name for UI |
| `CreatePlanAsync` | Build execution plan from Schedule + Run |
| `ExecuteAsync` | Execute the plan (called by Worker Host) |
| `ValidateConfigAsync` | Validate plugin-specific config in `Schedule.PluginConfig` |
| `GetConfigJsonSchema` | Return JSON schema for UI-driven config forms |
| `ConfigureServices` | Register plugin DI services at startup |
| `MapEndpoints` | Register plugin HTTP endpoints |
## Built-in Plugins
| JobKind | Library | Description |
|---|---|---|
| `scan` | `StellaOps.Scheduler.Plugin.Scan` | Wraps existing scan logic (zero behavioral change) |
| `doctor` | `StellaOps.Scheduler.Plugin.Doctor` | Doctor health checks (replaces standalone doctor-scheduler) |
## Adding a New Plugin
1. Create a class library under `StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.<Name>/`.
2. Implement `ISchedulerJobPlugin`.
3. Reference `StellaOps.Scheduler.Plugin.Abstractions`.
4. Register in `Program.cs` or drop DLL into `plugins/scheduler/` for assembly-loaded discovery.
5. Create schedules with `jobKind="your-kind"` and `pluginConfig={...}`.
## Database Schema
- `scheduler.schedules.job_kind` (TEXT, default 'scan') routes to the plugin
- `scheduler.schedules.plugin_config` (JSONB, nullable) stores plugin-specific config
- Plugin-specific tables (e.g. `scheduler.doctor_trends`) added via embedded SQL migrations
## Working Directory
- Abstractions: `src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Abstractions/`
- Scan plugin: `src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Scan/`
- Doctor plugin: `src/JobEngine/StellaOps.Scheduler.__Libraries/StellaOps.Scheduler.Plugin.Doctor/`

View File

@@ -4,33 +4,21 @@ namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// Callback interface for plugins to report progress and update Run state.
/// Implementations are provided by the Scheduler infrastructure and persist
/// progress updates to storage.
/// </summary>
public interface IRunProgressReporter
{
/// <summary>
/// Reports progress as a fraction of estimated steps.
/// Reports progress as completed/total with an optional message.
/// </summary>
/// <param name="completed">Number of steps completed so far.</param>
/// <param name="total">Total number of steps expected.</param>
/// <param name="message">Optional human-readable progress message.</param>
/// <param name="ct">Cancellation token.</param>
Task ReportProgressAsync(int completed, int total, string? message = null, CancellationToken ct = default);
/// <summary>
/// Transitions the Run to a new state (e.g., Running, Completed, Error).
/// Transitions the Run to a new state, optionally recording an error.
/// </summary>
/// <param name="newState">Target state.</param>
/// <param name="error">Error message when transitioning to Error state.</param>
/// <param name="ct">Cancellation token.</param>
Task TransitionStateAsync(RunState newState, string? error = null, CancellationToken ct = default);
/// <summary>
/// Appends a log entry to the Run's execution log.
/// </summary>
/// <param name="message">Log message.</param>
/// <param name="level">Log level (info, warn, error).</param>
/// <param name="ct">Cancellation token.</param>
Task AppendLogAsync(string message, string level = "info", CancellationToken ct = default);
}

View File

@@ -6,10 +6,8 @@ using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// Defines a pluggable job type for the Scheduler service.
/// Each implementation handles a specific <see cref="JobKind"/> (e.g., "scan", "doctor", "policy-sweep").
/// The Scheduler routes cron triggers and manual runs to the correct plugin based on
/// <see cref="Schedule.JobKind"/>.
/// Identifies the kind of job a plugin handles. Used in Schedule.JobKind
/// to route cron triggers to the correct plugin at execution time.
/// </summary>
public interface ISchedulerJobPlugin
{
@@ -39,12 +37,12 @@ public interface ISchedulerJobPlugin
/// <summary>
/// Executes the plan. Called by the Worker Host.
/// Must be idempotent and support cancellation.
/// Updates Run state via the provided <see cref="IRunProgressReporter"/>.
/// Updates Run state via the provided IRunProgressReporter.
/// </summary>
Task ExecuteAsync(JobExecutionContext context, CancellationToken ct);
/// <summary>
/// Validates plugin-specific configuration stored in <see cref="Schedule.PluginConfig"/>.
/// Optionally validates plugin-specific configuration stored in Schedule.PluginConfig.
/// Called on schedule create/update.
/// </summary>
Task<JobConfigValidationResult> ValidateConfigAsync(
@@ -53,7 +51,6 @@ public interface ISchedulerJobPlugin
/// <summary>
/// Returns the JSON schema for plugin-specific configuration, enabling UI-driven forms.
/// Returns null if the plugin requires no configuration.
/// </summary>
string? GetConfigJsonSchema();

View File

@@ -1,20 +1,17 @@
namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// Registry of available scheduler job plugins keyed by <see cref="ISchedulerJobPlugin.JobKind"/>.
/// Used by the Scheduler to route schedule triggers and manual runs to the correct plugin.
/// Registry for discovering and resolving scheduler job plugins by their JobKind.
/// </summary>
public interface ISchedulerPluginRegistry
{
/// <summary>
/// Registers a plugin. Throws if a plugin with the same <see cref="ISchedulerJobPlugin.JobKind"/>
/// is already registered.
/// Registers a plugin. Throws if a plugin with the same JobKind is already registered.
/// </summary>
void Register(ISchedulerJobPlugin plugin);
/// <summary>
/// Resolves the plugin for the given job kind.
/// Returns null if no plugin is registered for the kind.
/// Resolves a plugin by its JobKind. Returns null if no plugin is registered for the kind.
/// </summary>
ISchedulerJobPlugin? Resolve(string jobKind);

View File

@@ -1,21 +1,20 @@
namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// Result of plugin configuration validation.
/// Returned by <see cref="ISchedulerJobPlugin.ValidateConfigAsync"/>.
/// Result of plugin config validation.
/// </summary>
public sealed record JobConfigValidationResult(
bool IsValid,
IReadOnlyList<string> Errors)
{
/// <summary>
/// Returns a successful validation result with no errors.
/// A successful validation result with no errors.
/// </summary>
public static JobConfigValidationResult Success { get; } = new(true, []);
public static JobConfigValidationResult Valid { get; } = new(true, Array.Empty<string>());
/// <summary>
/// Creates a failed validation result with the specified errors.
/// Creates a failed validation result with the given errors.
/// </summary>
public static JobConfigValidationResult Failure(params string[] errors)
public static JobConfigValidationResult Invalid(params string[] errors)
=> new(false, errors);
}

View File

@@ -4,8 +4,6 @@ namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// Context passed to <see cref="ISchedulerJobPlugin.ExecuteAsync"/>.
/// Provides access to the schedule, run, plan, a progress reporter for
/// updating run state, the DI container, and a deterministic time source.
/// </summary>
public sealed record JobExecutionContext(
Schedule Schedule,

View File

@@ -2,8 +2,6 @@ namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// The plan produced by a plugin. Serialized to JSON and stored on the Run.
/// Contains the <see cref="JobKind"/> to identify which plugin created it,
/// a typed payload dictionary, and an estimated step count for progress tracking.
/// </summary>
public sealed record JobPlan(
string JobKind,

View File

@@ -4,8 +4,6 @@ namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// Immutable context passed to <see cref="ISchedulerJobPlugin.CreatePlanAsync"/>.
/// Provides access to the schedule definition, the newly created run record,
/// the DI container, and a deterministic time source.
/// </summary>
public sealed record JobPlanContext(
Schedule Schedule,

View File

@@ -4,7 +4,6 @@ namespace StellaOps.Scheduler.Plugin;
/// <summary>
/// Thread-safe in-memory registry for scheduler job plugins.
/// Plugins are registered at startup and resolved at trigger time.
/// </summary>
public sealed class SchedulerPluginRegistry : ISchedulerPluginRegistry
{
@@ -23,9 +22,9 @@ public sealed class SchedulerPluginRegistry : ISchedulerPluginRegistry
if (!_plugins.TryAdd(plugin.JobKind, plugin))
{
throw new InvalidOperationException(
$"A scheduler plugin with JobKind '{plugin.JobKind}' is already registered " +
$"(existing: {_plugins[plugin.JobKind].GetType().FullName}, " +
$"new: {plugin.GetType().FullName}).");
$"A plugin with JobKind '{plugin.JobKind}' is already registered. " +
$"Existing: '{_plugins[plugin.JobKind].DisplayName}', " +
$"Attempted: '{plugin.DisplayName}'.");
}
}
@@ -46,7 +45,6 @@ public sealed class SchedulerPluginRegistry : ISchedulerPluginRegistry
return _plugins.Values
.OrderBy(p => p.JobKind, StringComparer.OrdinalIgnoreCase)
.Select(p => (p.JobKind, p.DisplayName))
.ToList()
.AsReadOnly();
.ToArray();
}
}

View File

@@ -7,7 +7,7 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<RootNamespace>StellaOps.Scheduler.Plugin</RootNamespace>
<AssemblyName>StellaOps.Scheduler.Plugin.Abstractions</AssemblyName>
<Description>Plugin contract abstractions for the StellaOps Scheduler job plugin architecture</Description>
<Description>Plugin abstraction contracts for the StellaOps Scheduler job plugin system.</Description>
</PropertyGroup>
<ItemGroup>

View File

@@ -0,0 +1,32 @@
namespace StellaOps.Scheduler.Plugin.Doctor;
/// <summary>
/// Configuration options for the Doctor job plugin.
/// </summary>
public sealed class DoctorJobOptions
{
/// <summary>
/// Configuration section name.
/// </summary>
public const string SectionName = "Scheduler:Doctor";
/// <summary>
/// URL of the Doctor WebService API.
/// </summary>
public string DoctorApiUrl { get; set; } = "http://doctor.stella-ops.local";
/// <summary>
/// Default timeout for Doctor runs (in seconds).
/// </summary>
public int DefaultTimeoutSeconds { get; set; } = 300;
/// <summary>
/// How long to retain trend data (in days).
/// </summary>
public int TrendDataRetentionDays { get; set; } = 365;
/// <summary>
/// Polling interval when waiting for Doctor run completion (in seconds).
/// </summary>
public int PollIntervalSeconds { get; set; } = 2;
}

View File

@@ -0,0 +1,551 @@
using System.Diagnostics;
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Plugin.Doctor;
/// <summary>
/// Doctor health check job plugin for the Scheduler.
/// Replaces the standalone doctor-scheduler service by executing Doctor runs
/// via the Doctor WebService HTTP API and storing trend data in the Scheduler's database.
/// </summary>
public sealed class DoctorJobPlugin : ISchedulerJobPlugin
{
/// <inheritdoc />
public string JobKind => "doctor";
/// <inheritdoc />
public string DisplayName => "Doctor Health Checks";
/// <inheritdoc />
public Version Version { get; } = new(1, 0, 0);
/// <inheritdoc />
public Task<JobPlan> CreatePlanAsync(JobPlanContext context, CancellationToken ct)
{
var config = DoctorScheduleConfig.FromPluginConfig(context.Schedule.PluginConfig);
var payload = new Dictionary<string, object?>
{
["doctorMode"] = config.DoctorMode,
["categories"] = config.Categories,
["plugins"] = config.Plugins,
["timeoutSeconds"] = config.TimeoutSeconds,
["scheduleId"] = context.Schedule.Id,
};
var plan = new JobPlan(
JobKind: "doctor",
Payload: payload,
EstimatedSteps: 3); // trigger, poll, store trends
return Task.FromResult(plan);
}
/// <inheritdoc />
public async Task ExecuteAsync(JobExecutionContext context, CancellationToken ct)
{
var logger = context.Services.GetService<ILoggerFactory>()?.CreateLogger<DoctorJobPlugin>();
var httpClientFactory = context.Services.GetRequiredService<IHttpClientFactory>();
var options = context.Services.GetRequiredService<IOptions<DoctorJobOptions>>().Value;
var trendRepository = context.Services.GetService<IDoctorTrendRepository>();
var config = DoctorScheduleConfig.FromPluginConfig(context.Schedule.PluginConfig);
var httpClient = httpClientFactory.CreateClient("DoctorApi");
logger?.LogInformation(
"Executing Doctor health check for schedule {ScheduleId} in {Mode} mode",
context.Schedule.Id, config.DoctorMode);
await context.Reporter.ReportProgressAsync(0, 3, "Triggering Doctor run", ct).ConfigureAwait(false);
// Step 1: Trigger Doctor run
var runId = await TriggerDoctorRunAsync(httpClient, options, config, logger, ct).ConfigureAwait(false);
await context.Reporter.ReportProgressAsync(1, 3, $"Doctor run {runId} triggered, waiting for completion", ct).ConfigureAwait(false);
await context.Reporter.AppendLogAsync($"Doctor run triggered: {runId}", "info", ct).ConfigureAwait(false);
// Step 2: Poll for completion
var (status, summary) = await WaitForRunCompletionAsync(httpClient, options, runId, config, logger, ct).ConfigureAwait(false);
await context.Reporter.ReportProgressAsync(2, 3, $"Doctor run completed with status: {status}", ct).ConfigureAwait(false);
await context.Reporter.AppendLogAsync(
$"Doctor run {runId} completed: {summary.PassedChecks}/{summary.TotalChecks} passed, health={summary.HealthScore}",
status == "fail" ? "error" : "info", ct).ConfigureAwait(false);
// Step 3: Store trend data
if (trendRepository is not null)
{
await StoreTrendDataAsync(httpClient, options, trendRepository, runId,
context.Schedule.TenantId, context.TimeProvider, logger, ct).ConfigureAwait(false);
}
await context.Reporter.ReportProgressAsync(3, 3, "Trend data stored", ct).ConfigureAwait(false);
// Check for alert conditions
if (config.Alerts?.Enabled == true)
{
var shouldAlert = (config.Alerts.AlertOnFail && status == "fail") ||
(config.Alerts.AlertOnWarn && status == "warn");
if (shouldAlert)
{
await context.Reporter.AppendLogAsync(
$"Alert condition met: status={status}, alertOnFail={config.Alerts.AlertOnFail}, alertOnWarn={config.Alerts.AlertOnWarn}",
"warning", ct).ConfigureAwait(false);
}
}
// Transition to completed or error based on Doctor results
if (status == "fail" && summary.FailedChecks > 0)
{
await context.Reporter.TransitionStateAsync(
RunState.Completed,
$"Doctor run completed with {summary.FailedChecks} failed checks",
ct).ConfigureAwait(false);
}
else
{
await context.Reporter.TransitionStateAsync(RunState.Completed, ct: ct).ConfigureAwait(false);
}
}
/// <inheritdoc />
public Task<JobConfigValidationResult> ValidateConfigAsync(
IReadOnlyDictionary<string, object?> pluginConfig,
CancellationToken ct)
{
var errors = new List<string>();
if (pluginConfig.TryGetValue("doctorMode", out var modeObj) && modeObj is string mode)
{
var validModes = new[] { "full", "quick", "categories", "plugins" };
if (!validModes.Contains(mode, StringComparer.OrdinalIgnoreCase))
{
errors.Add($"Invalid doctorMode '{mode}'. Valid values: {string.Join(", ", validModes)}");
}
if (string.Equals(mode, "categories", StringComparison.OrdinalIgnoreCase))
{
if (!pluginConfig.TryGetValue("categories", out var catObj) || catObj is not JsonElement cats || cats.GetArrayLength() == 0)
{
errors.Add("At least one category is required when doctorMode is 'categories'.");
}
}
if (string.Equals(mode, "plugins", StringComparison.OrdinalIgnoreCase))
{
if (!pluginConfig.TryGetValue("plugins", out var plugObj) || plugObj is not JsonElement plugs || plugs.GetArrayLength() == 0)
{
errors.Add("At least one plugin is required when doctorMode is 'plugins'.");
}
}
}
return Task.FromResult(errors.Count > 0
? JobConfigValidationResult.Invalid([.. errors])
: JobConfigValidationResult.Valid);
}
/// <inheritdoc />
public string? GetConfigJsonSchema()
{
return """
{
"type": "object",
"properties": {
"doctorMode": {
"type": "string",
"enum": ["full", "quick", "categories", "plugins"],
"default": "full"
},
"categories": {
"type": "array",
"items": { "type": "string" }
},
"plugins": {
"type": "array",
"items": { "type": "string" }
},
"timeoutSeconds": {
"type": "integer",
"minimum": 30,
"maximum": 3600,
"default": 300
},
"alerts": {
"type": "object",
"properties": {
"enabled": { "type": "boolean", "default": true },
"alertOnFail": { "type": "boolean", "default": true },
"alertOnWarn": { "type": "boolean", "default": false },
"alertOnStatusChange": { "type": "boolean", "default": true },
"channels": { "type": "array", "items": { "type": "string" } },
"minSeverity": { "type": "string", "default": "Fail" }
}
}
}
}
""";
}
/// <inheritdoc />
public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
services.Configure<DoctorJobOptions>(configuration.GetSection(DoctorJobOptions.SectionName));
services.AddHttpClient("DoctorApi", (sp, client) =>
{
var opts = sp.GetRequiredService<IOptions<DoctorJobOptions>>().Value;
client.BaseAddress = new Uri(opts.DoctorApiUrl);
client.Timeout = TimeSpan.FromSeconds(opts.DefaultTimeoutSeconds + 30);
});
}
/// <inheritdoc />
public void MapEndpoints(IEndpointRouteBuilder routes)
{
var group = routes.MapGroup("/api/v1/scheduler/doctor")
.WithTags("Doctor", "Scheduler");
group.MapGet("/trends", async (
DateTimeOffset? from,
DateTimeOffset? to,
HttpContext httpContext,
IDoctorTrendRepository? trendRepository,
TimeProvider timeProvider) =>
{
if (trendRepository is null)
{
return Results.Json(new { summaries = Array.Empty<object>() });
}
var window = ResolveWindow(from, to, timeProvider);
if (window is null)
{
return Results.BadRequest(new { message = "Invalid time window: from must be before to." });
}
// Use a default tenant for now; in production this would come from the auth context
var tenantId = "demo-prod";
var summaries = await trendRepository.GetTrendSummariesAsync(tenantId, window.Value.From, window.Value.To);
return Results.Ok(new
{
window = new { from = window.Value.From, to = window.Value.To },
summaries
});
})
.WithName("GetSchedulerDoctorTrends")
.WithDescription("Returns aggregated health-check trend summaries from the Scheduler's Doctor plugin.");
group.MapGet("/trends/checks/{checkId}", async (
string checkId,
DateTimeOffset? from,
DateTimeOffset? to,
IDoctorTrendRepository? trendRepository,
TimeProvider timeProvider) =>
{
if (string.IsNullOrWhiteSpace(checkId))
{
return Results.BadRequest(new { message = "checkId is required." });
}
if (trendRepository is null)
{
return Results.Json(new { dataPoints = Array.Empty<object>() });
}
var window = ResolveWindow(from, to, timeProvider);
if (window is null)
{
return Results.BadRequest(new { message = "Invalid time window." });
}
var tenantId = "demo-prod";
var data = await trendRepository.GetTrendDataAsync(tenantId, checkId, window.Value.From, window.Value.To);
return Results.Ok(new
{
window = new { from = window.Value.From, to = window.Value.To },
checkId,
dataPoints = data
});
})
.WithName("GetSchedulerDoctorCheckTrend")
.WithDescription("Returns trend data points for a specific Doctor health check.");
group.MapGet("/trends/categories/{category}", async (
string category,
DateTimeOffset? from,
DateTimeOffset? to,
IDoctorTrendRepository? trendRepository,
TimeProvider timeProvider) =>
{
if (string.IsNullOrWhiteSpace(category))
{
return Results.BadRequest(new { message = "category is required." });
}
if (trendRepository is null)
{
return Results.Json(new { dataPoints = Array.Empty<object>() });
}
var window = ResolveWindow(from, to, timeProvider);
if (window is null)
{
return Results.BadRequest(new { message = "Invalid time window." });
}
var tenantId = "demo-prod";
var data = await trendRepository.GetCategoryTrendDataAsync(tenantId, category, window.Value.From, window.Value.To);
return Results.Ok(new
{
window = new { from = window.Value.From, to = window.Value.To },
category,
dataPoints = data
});
})
.WithName("GetSchedulerDoctorCategoryTrend")
.WithDescription("Returns trend data points for all checks within a specific Doctor check category.");
group.MapGet("/trends/degrading", async (
DateTimeOffset? from,
DateTimeOffset? to,
double? threshold,
IDoctorTrendRepository? trendRepository,
TimeProvider timeProvider) =>
{
if (trendRepository is null)
{
return Results.Json(new { checks = Array.Empty<object>() });
}
var window = ResolveWindow(from, to, timeProvider);
if (window is null)
{
return Results.BadRequest(new { message = "Invalid time window." });
}
var effectiveThreshold = threshold ?? 0.1d;
if (effectiveThreshold < 0 || double.IsNaN(effectiveThreshold))
{
return Results.BadRequest(new { message = "Threshold must be >= 0." });
}
var tenantId = "demo-prod";
var degrading = await trendRepository.GetDegradingChecksAsync(tenantId, window.Value.From, window.Value.To, effectiveThreshold);
return Results.Ok(new
{
window = new { from = window.Value.From, to = window.Value.To },
threshold = effectiveThreshold,
checks = degrading
});
})
.WithName("GetSchedulerDoctorDegradingChecks")
.WithDescription("Returns Doctor health checks with degrading trends.");
}
private static async Task<string> TriggerDoctorRunAsync(
HttpClient httpClient,
DoctorJobOptions options,
DoctorScheduleConfig config,
ILogger? logger,
CancellationToken ct)
{
var request = new
{
mode = config.DoctorMode,
categories = config.Categories,
plugins = config.Plugins,
async_ = true
};
var response = await httpClient.PostAsJsonAsync(
"/api/v1/doctor/run",
request,
ct).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
var result = await response.Content.ReadFromJsonAsync<RunTriggerResponse>(cancellationToken: ct).ConfigureAwait(false);
var runId = result?.RunId ?? throw new InvalidOperationException("No run ID returned from Doctor API.");
logger?.LogInformation("Triggered Doctor run {RunId}", runId);
return runId;
}
private static async Task<(string Status, ExecutionSummary Summary)> WaitForRunCompletionAsync(
HttpClient httpClient,
DoctorJobOptions options,
string runId,
DoctorScheduleConfig config,
ILogger? logger,
CancellationToken ct)
{
var timeout = TimeSpan.FromSeconds(config.TimeoutSeconds > 0 ? config.TimeoutSeconds : options.DefaultTimeoutSeconds);
var sw = Stopwatch.StartNew();
while (sw.Elapsed < timeout)
{
ct.ThrowIfCancellationRequested();
var response = await httpClient.GetAsync($"/api/v1/doctor/run/{runId}", ct).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
await Task.Delay(TimeSpan.FromSeconds(options.PollIntervalSeconds), ct).ConfigureAwait(false);
continue;
}
var result = await response.Content.ReadFromJsonAsync<RunStatusResponse>(cancellationToken: ct).ConfigureAwait(false);
if (result?.Status is "completed")
{
var summary = new ExecutionSummary
{
TotalChecks = result.TotalChecks,
PassedChecks = result.PassedChecks,
WarnedChecks = result.WarnedChecks,
FailedChecks = result.FailedChecks,
SkippedChecks = result.SkippedChecks,
HealthScore = result.HealthScore,
};
var status = result.FailedChecks > 0 ? "fail"
: result.WarnedChecks > 0 ? "warn"
: "pass";
logger?.LogInformation("Doctor run {RunId} completed: {Status}", runId, status);
return (status, summary);
}
await Task.Delay(TimeSpan.FromSeconds(options.PollIntervalSeconds), ct).ConfigureAwait(false);
}
throw new TimeoutException($"Doctor run {runId} did not complete within {timeout.TotalSeconds}s");
}
private static async Task StoreTrendDataAsync(
HttpClient httpClient,
DoctorJobOptions options,
IDoctorTrendRepository trendRepository,
string runId,
string tenantId,
TimeProvider timeProvider,
ILogger? logger,
CancellationToken ct)
{
try
{
var response = await httpClient.GetAsync($"/api/v1/doctor/run/{runId}/results", ct).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
logger?.LogWarning("Failed to fetch Doctor run results for {RunId}: {StatusCode}", runId, response.StatusCode);
return;
}
var results = await response.Content.ReadFromJsonAsync<RunResultsResponse>(cancellationToken: ct).ConfigureAwait(false);
if (results?.Results is null || results.Results.Count == 0)
{
return;
}
var timestamp = timeProvider.GetUtcNow();
var dataPoints = results.Results.Select(r => new DoctorTrendDataPoint
{
TenantId = tenantId,
Timestamp = timestamp,
CheckId = r.CheckId,
PluginId = r.PluginId,
Category = r.Category,
RunId = runId,
Status = r.Status,
HealthScore = CalculateHealthScore(r.Status),
DurationMs = r.DurationMs,
EvidenceValues = ExtractTrendEvidence(r.Evidence),
}).ToList();
await trendRepository.StoreTrendDataAsync(dataPoints, ct).ConfigureAwait(false);
logger?.LogInformation("Stored {Count} trend data points for Doctor run {RunId}", dataPoints.Count, runId);
}
catch (Exception ex)
{
logger?.LogWarning(ex, "Failed to store trend data for Doctor run {RunId}", runId);
}
}
private static int CalculateHealthScore(string status) => status.ToLowerInvariant() switch
{
"pass" => 100,
"warn" => 50,
"fail" => 0,
"skip" => -1,
_ => 0
};
private static IReadOnlyDictionary<string, string> ExtractTrendEvidence(Dictionary<string, object>? evidence)
{
if (evidence is null)
{
return new Dictionary<string, string>();
}
return evidence
.Where(kv => kv.Value is int or long or double or string)
.Where(kv => !kv.Key.Contains("url", StringComparison.OrdinalIgnoreCase))
.Where(kv => !kv.Key.Contains("message", StringComparison.OrdinalIgnoreCase))
.Take(10)
.ToDictionary(kv => kv.Key, kv => kv.Value?.ToString() ?? string.Empty);
}
private static (DateTimeOffset From, DateTimeOffset To)? ResolveWindow(
DateTimeOffset? from, DateTimeOffset? to, TimeProvider timeProvider)
{
var end = to ?? timeProvider.GetUtcNow();
var start = from ?? end.AddDays(-30);
return start > end ? null : (start, end);
}
// HTTP response models (matching Doctor WebService API)
private sealed record RunTriggerResponse(string RunId);
private sealed record RunStatusResponse(
string Status,
int TotalChecks,
int PassedChecks,
int WarnedChecks,
int FailedChecks,
int SkippedChecks,
int HealthScore);
private sealed record RunResultsResponse(IReadOnlyList<CheckResult>? Results);
private sealed record CheckResult(
string CheckId,
string PluginId,
string Category,
string Status,
int DurationMs,
Dictionary<string, object>? Evidence);
}
/// <summary>
/// Summary of a Doctor schedule execution's results.
/// </summary>
internal sealed record ExecutionSummary
{
public int TotalChecks { get; init; }
public int PassedChecks { get; init; }
public int WarnedChecks { get; init; }
public int FailedChecks { get; init; }
public int SkippedChecks { get; init; }
public int HealthScore { get; init; }
}

View File

@@ -0,0 +1,85 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace StellaOps.Scheduler.Plugin.Doctor;
/// <summary>
/// Doctor-specific schedule configuration stored in Schedule.PluginConfig.
/// </summary>
public sealed record DoctorScheduleConfig
{
/// <summary>
/// Doctor run mode: full, quick, categories, or plugins.
/// </summary>
[JsonPropertyName("doctorMode")]
public string DoctorMode { get; init; } = "full";
/// <summary>
/// Categories to run (only relevant when doctorMode is "categories").
/// </summary>
[JsonPropertyName("categories")]
public IReadOnlyList<string> Categories { get; init; } = [];
/// <summary>
/// Specific plugins to run (only relevant when doctorMode is "plugins").
/// </summary>
[JsonPropertyName("plugins")]
public IReadOnlyList<string> Plugins { get; init; } = [];
/// <summary>
/// Timeout in seconds for the Doctor run.
/// </summary>
[JsonPropertyName("timeoutSeconds")]
public int TimeoutSeconds { get; init; } = 300;
/// <summary>
/// Alert configuration for this schedule.
/// </summary>
[JsonPropertyName("alerts")]
public DoctorAlertConfig? Alerts { get; init; }
/// <summary>
/// Deserializes a DoctorScheduleConfig from a plugin config dictionary.
/// </summary>
public static DoctorScheduleConfig FromPluginConfig(IReadOnlyDictionary<string, object?>? pluginConfig)
{
if (pluginConfig is null || pluginConfig.Count == 0)
{
return new DoctorScheduleConfig();
}
// Re-serialize to JSON and back to get proper deserialization
var json = JsonSerializer.Serialize(pluginConfig);
return JsonSerializer.Deserialize<DoctorScheduleConfig>(json) ?? new DoctorScheduleConfig();
}
}
/// <summary>
/// Alert configuration for Doctor health check schedules.
/// </summary>
public sealed record DoctorAlertConfig
{
[JsonPropertyName("enabled")]
public bool Enabled { get; init; } = true;
[JsonPropertyName("alertOnFail")]
public bool AlertOnFail { get; init; } = true;
[JsonPropertyName("alertOnWarn")]
public bool AlertOnWarn { get; init; }
[JsonPropertyName("alertOnStatusChange")]
public bool AlertOnStatusChange { get; init; } = true;
[JsonPropertyName("channels")]
public IReadOnlyList<string> Channels { get; init; } = [];
[JsonPropertyName("emailRecipients")]
public IReadOnlyList<string> EmailRecipients { get; init; } = [];
[JsonPropertyName("webhookUrls")]
public IReadOnlyList<string> WebhookUrls { get; init; } = [];
[JsonPropertyName("minSeverity")]
public string MinSeverity { get; init; } = "Fail";
}

View File

@@ -0,0 +1,39 @@
namespace StellaOps.Scheduler.Plugin.Doctor;
/// <summary>
/// A single data point in a Doctor health trend, persisted to scheduler.doctor_trends.
/// </summary>
public sealed record DoctorTrendDataPoint
{
public long Id { get; init; }
public required string TenantId { get; init; }
public DateTimeOffset Timestamp { get; init; }
public required string CheckId { get; init; }
public required string PluginId { get; init; }
public required string Category { get; init; }
public required string RunId { get; init; }
public required string Status { get; init; }
public int HealthScore { get; init; }
public int DurationMs { get; init; }
public IReadOnlyDictionary<string, string> EvidenceValues { get; init; } = new Dictionary<string, string>();
}
/// <summary>
/// Aggregated trend summary for a Doctor check over a time period.
/// </summary>
public sealed record DoctorTrendSummary
{
public required string CheckId { get; init; }
public required string CheckName { get; init; }
public DateTimeOffset PeriodStart { get; init; }
public DateTimeOffset PeriodEnd { get; init; }
public int TotalRuns { get; init; }
public int PassCount { get; init; }
public int WarnCount { get; init; }
public int FailCount { get; init; }
public double SuccessRate => TotalRuns > 0 ? (double)PassCount / TotalRuns : 0;
public double AvgHealthScore { get; init; }
public string Direction { get; init; } = "stable";
public double ChangePercent { get; init; }
public int AvgDurationMs { get; init; }
}

View File

@@ -0,0 +1,41 @@
namespace StellaOps.Scheduler.Plugin.Doctor;
/// <summary>
/// Repository for persisting and querying Doctor trend data in the Scheduler's Postgres schema.
/// </summary>
public interface IDoctorTrendRepository
{
/// <summary>
/// Stores trend data points from a Doctor run.
/// </summary>
Task StoreTrendDataAsync(IEnumerable<DoctorTrendDataPoint> dataPoints, CancellationToken ct = default);
/// <summary>
/// Gets trend data points for a specific check over a time range.
/// </summary>
Task<IReadOnlyList<DoctorTrendDataPoint>> GetTrendDataAsync(
string tenantId, string checkId, DateTimeOffset from, DateTimeOffset to, CancellationToken ct = default);
/// <summary>
/// Gets trend data points for a category over a time range.
/// </summary>
Task<IReadOnlyList<DoctorTrendDataPoint>> GetCategoryTrendDataAsync(
string tenantId, string category, DateTimeOffset from, DateTimeOffset to, CancellationToken ct = default);
/// <summary>
/// Gets aggregated trend summaries for all checks over a time range.
/// </summary>
Task<IReadOnlyList<DoctorTrendSummary>> GetTrendSummariesAsync(
string tenantId, DateTimeOffset from, DateTimeOffset to, CancellationToken ct = default);
/// <summary>
/// Gets checks with degrading trends.
/// </summary>
Task<IReadOnlyList<DoctorTrendSummary>> GetDegradingChecksAsync(
string tenantId, DateTimeOffset from, DateTimeOffset to, double degradationThreshold = 0.1, CancellationToken ct = default);
/// <summary>
/// Prunes old trend data beyond retention period.
/// </summary>
Task PruneOldDataAsync(DateTimeOffset olderThan, CancellationToken ct = default);
}

View File

@@ -0,0 +1,189 @@
using System.Text.Json;
using Dapper;
using Npgsql;
namespace StellaOps.Scheduler.Plugin.Doctor;
/// <summary>
/// PostgreSQL implementation of <see cref="IDoctorTrendRepository"/>.
/// Uses the Scheduler's database connection and the scheduler.doctor_trends table.
/// </summary>
public sealed class PostgresDoctorTrendRepository : IDoctorTrendRepository
{
private readonly Func<NpgsqlConnection> _connectionFactory;
public PostgresDoctorTrendRepository(Func<NpgsqlConnection> connectionFactory)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
}
/// <inheritdoc />
public async Task StoreTrendDataAsync(IEnumerable<DoctorTrendDataPoint> dataPoints, CancellationToken ct = default)
{
await using var connection = _connectionFactory();
await connection.OpenAsync(ct).ConfigureAwait(false);
const string sql = """
INSERT INTO scheduler.doctor_trends
(tenant_id, timestamp, check_id, plugin_id, category, run_id, status, health_score, duration_ms, evidence_values)
VALUES
(@TenantId, @Timestamp, @CheckId, @PluginId, @Category, @RunId, @Status, @HealthScore, @DurationMs, @EvidenceValues::jsonb)
""";
foreach (var dp in dataPoints)
{
var evidenceJson = JsonSerializer.Serialize(dp.EvidenceValues);
await connection.ExecuteAsync(sql, new
{
dp.TenantId,
dp.Timestamp,
dp.CheckId,
dp.PluginId,
dp.Category,
dp.RunId,
dp.Status,
dp.HealthScore,
dp.DurationMs,
EvidenceValues = evidenceJson,
}).ConfigureAwait(false);
}
}
/// <inheritdoc />
public async Task<IReadOnlyList<DoctorTrendDataPoint>> GetTrendDataAsync(
string tenantId, string checkId, DateTimeOffset from, DateTimeOffset to, CancellationToken ct = default)
{
await using var connection = _connectionFactory();
await connection.OpenAsync(ct).ConfigureAwait(false);
const string sql = """
SELECT id, tenant_id AS TenantId, timestamp, check_id AS CheckId, plugin_id AS PluginId,
category, run_id AS RunId, status, health_score AS HealthScore, duration_ms AS DurationMs,
evidence_values::text AS EvidenceValuesJson
FROM scheduler.doctor_trends
WHERE tenant_id = @TenantId AND check_id = @CheckId
AND timestamp >= @From AND timestamp <= @To
ORDER BY timestamp DESC
LIMIT 10000
""";
var rows = await connection.QueryAsync<DoctorTrendRow>(sql, new { TenantId = tenantId, CheckId = checkId, From = from, To = to })
.ConfigureAwait(false);
return rows.Select(MapRow).ToArray();
}
/// <inheritdoc />
public async Task<IReadOnlyList<DoctorTrendDataPoint>> GetCategoryTrendDataAsync(
string tenantId, string category, DateTimeOffset from, DateTimeOffset to, CancellationToken ct = default)
{
await using var connection = _connectionFactory();
await connection.OpenAsync(ct).ConfigureAwait(false);
const string sql = """
SELECT id, tenant_id AS TenantId, timestamp, check_id AS CheckId, plugin_id AS PluginId,
category, run_id AS RunId, status, health_score AS HealthScore, duration_ms AS DurationMs,
evidence_values::text AS EvidenceValuesJson
FROM scheduler.doctor_trends
WHERE tenant_id = @TenantId AND category = @Category
AND timestamp >= @From AND timestamp <= @To
ORDER BY timestamp DESC
LIMIT 10000
""";
var rows = await connection.QueryAsync<DoctorTrendRow>(sql, new { TenantId = tenantId, Category = category, From = from, To = to })
.ConfigureAwait(false);
return rows.Select(MapRow).ToArray();
}
/// <inheritdoc />
public async Task<IReadOnlyList<DoctorTrendSummary>> GetTrendSummariesAsync(
string tenantId, DateTimeOffset from, DateTimeOffset to, CancellationToken ct = default)
{
await using var connection = _connectionFactory();
await connection.OpenAsync(ct).ConfigureAwait(false);
const string sql = """
SELECT check_id AS CheckId,
check_id AS CheckName,
MIN(timestamp) AS PeriodStart,
MAX(timestamp) AS PeriodEnd,
COUNT(*)::int AS TotalRuns,
COUNT(*) FILTER (WHERE status = 'pass')::int AS PassCount,
COUNT(*) FILTER (WHERE status = 'warn')::int AS WarnCount,
COUNT(*) FILTER (WHERE status = 'fail')::int AS FailCount,
COALESCE(AVG(health_score), 0) AS AvgHealthScore,
COALESCE(AVG(duration_ms)::int, 0) AS AvgDurationMs
FROM scheduler.doctor_trends
WHERE tenant_id = @TenantId AND timestamp >= @From AND timestamp <= @To
GROUP BY check_id
ORDER BY check_id
""";
var rows = await connection.QueryAsync<DoctorTrendSummary>(sql, new { TenantId = tenantId, From = from, To = to })
.ConfigureAwait(false);
return rows.ToArray();
}
/// <inheritdoc />
public async Task<IReadOnlyList<DoctorTrendSummary>> GetDegradingChecksAsync(
string tenantId, DateTimeOffset from, DateTimeOffset to, double degradationThreshold = 0.1, CancellationToken ct = default)
{
// Simple approach: compare first-half success rate vs second-half success rate
var summaries = await GetTrendSummariesAsync(tenantId, from, to, ct).ConfigureAwait(false);
return summaries
.Where(s => s.TotalRuns > 0 && s.FailCount > s.TotalRuns * degradationThreshold)
.Select(s => s with { Direction = "degrading" })
.ToArray();
}
/// <inheritdoc />
public async Task PruneOldDataAsync(DateTimeOffset olderThan, CancellationToken ct = default)
{
await using var connection = _connectionFactory();
await connection.OpenAsync(ct).ConfigureAwait(false);
const string sql = "DELETE FROM scheduler.doctor_trends WHERE timestamp < @OlderThan";
await connection.ExecuteAsync(sql, new { OlderThan = olderThan }).ConfigureAwait(false);
}
private static DoctorTrendDataPoint MapRow(DoctorTrendRow row)
{
var evidence = string.IsNullOrEmpty(row.EvidenceValuesJson)
? new Dictionary<string, string>()
: JsonSerializer.Deserialize<Dictionary<string, string>>(row.EvidenceValuesJson) ?? new Dictionary<string, string>();
return new DoctorTrendDataPoint
{
Id = row.Id,
TenantId = row.TenantId,
Timestamp = row.Timestamp,
CheckId = row.CheckId,
PluginId = row.PluginId,
Category = row.Category,
RunId = row.RunId,
Status = row.Status,
HealthScore = row.HealthScore,
DurationMs = row.DurationMs,
EvidenceValues = evidence,
};
}
private sealed record DoctorTrendRow
{
public long Id { get; init; }
public required string TenantId { get; init; }
public DateTimeOffset Timestamp { get; init; }
public required string CheckId { get; init; }
public required string PluginId { get; init; }
public required string Category { get; init; }
public required string RunId { get; init; }
public required string Status { get; init; }
public int HealthScore { get; init; }
public int DurationMs { get; init; }
public string? EvidenceValuesJson { get; init; }
}
}

View File

@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<LangVersion>preview</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<RootNamespace>StellaOps.Scheduler.Plugin.Doctor</RootNamespace>
<AssemblyName>StellaOps.Scheduler.Plugin.Doctor</AssemblyName>
<Description>Doctor health check job plugin for the StellaOps Scheduler. Replaces the standalone doctor-scheduler service.</Description>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\StellaOps.Scheduler.Plugin.Abstractions\StellaOps.Scheduler.Plugin.Abstractions.csproj" />
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Dapper" />
<PackageReference Include="Npgsql" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,86 @@
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Plugin.Scan;
/// <summary>
/// Built-in scan job plugin that wraps the existing Scheduler scan logic.
/// This is a pure delegation layer that ensures backward compatibility:
/// existing scan schedules execute through the same code paths as before.
/// </summary>
public sealed class ScanJobPlugin : ISchedulerJobPlugin
{
/// <inheritdoc />
public string JobKind => "scan";
/// <inheritdoc />
public string DisplayName => "Vulnerability Scan";
/// <inheritdoc />
public Version Version { get; } = new(1, 0, 0);
/// <inheritdoc />
public Task<JobPlan> CreatePlanAsync(JobPlanContext context, CancellationToken ct)
{
// Scan plans use the existing Schedule model fields (Mode, Selection, OnlyIf, Limits)
// which are already persisted. The plugin just wraps them into a JobPlan envelope.
var payload = new Dictionary<string, object?>
{
["mode"] = context.Schedule.Mode.ToString(),
["selectorScope"] = context.Schedule.Selection.Scope.ToString(),
["scheduleId"] = context.Schedule.Id,
};
var plan = new JobPlan(
JobKind: "scan",
Payload: payload,
EstimatedSteps: 1);
return Task.FromResult(plan);
}
/// <inheritdoc />
public Task ExecuteAsync(JobExecutionContext context, CancellationToken ct)
{
// Scan execution is handled by the existing Worker Host pipeline.
// This plugin delegates to the existing queue-based execution:
// the Run is already queued by the Scheduler and picked up by
// the Worker Host's segment processor. No additional logic needed.
var logger = context.Services.GetService<ILoggerFactory>()?.CreateLogger<ScanJobPlugin>();
logger?.LogDebug(
"ScanJobPlugin.ExecuteAsync invoked for run {RunId} on schedule {ScheduleId}. " +
"Execution is handled by the existing Worker Host pipeline.",
context.Run.Id,
context.Schedule.Id);
return Task.CompletedTask;
}
/// <inheritdoc />
public Task<JobConfigValidationResult> ValidateConfigAsync(
IReadOnlyDictionary<string, object?> pluginConfig,
CancellationToken ct)
{
// Scan jobs do not use pluginConfig; all config is in Schedule.Mode/Selection/etc.
// If pluginConfig is provided, it is ignored (not an error for forward compatibility).
return Task.FromResult(JobConfigValidationResult.Valid);
}
/// <inheritdoc />
public string? GetConfigJsonSchema() => null;
/// <inheritdoc />
public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
// No additional services needed; scan services are registered in the main DI setup.
}
/// <inheritdoc />
public void MapEndpoints(IEndpointRouteBuilder routes)
{
// No additional endpoints; scan endpoints are registered in the main endpoint setup.
}
}

View File

@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<LangVersion>preview</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<RootNamespace>StellaOps.Scheduler.Plugin.Scan</RootNamespace>
<AssemblyName>StellaOps.Scheduler.Plugin.Scan</AssemblyName>
<Description>Built-in scan job plugin for the StellaOps Scheduler. Wraps existing scan logic with zero behavioral change.</Description>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\StellaOps.Scheduler.Plugin.Abstractions\StellaOps.Scheduler.Plugin.Abstractions.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,139 @@
using Dapper;
using StellaOps.Scheduler.Persistence.Postgres;
namespace StellaOps.Scheduler.Worker.Exceptions;
public sealed class PostgresExceptionRepository : IExceptionRepository
{
private readonly SchedulerDataSource _dataSource;
public PostgresExceptionRepository(SchedulerDataSource dataSource)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
}
public async ValueTask<ExceptionRecord?> GetAsync(string exceptionId, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(exceptionId);
await using var conn = await _dataSource.OpenSystemConnectionAsync(cancellationToken);
const string sql = """
SELECT exception_id, tenant_id, policy_id, vulnerability_id, component_purl,
state, created_at, activation_date, expiration_date, activated_at,
expired_at, justification, created_by
FROM scheduler.scheduler_exceptions
WHERE exception_id = @ExceptionId
LIMIT 1;
""";
var row = await conn.QuerySingleOrDefaultAsync(sql, new { ExceptionId = exceptionId });
return row is null ? null : Map(row);
}
public async ValueTask<IReadOnlyList<ExceptionRecord>> GetPendingActivationsAsync(
DateTimeOffset asOf,
CancellationToken cancellationToken = default)
{
await using var conn = await _dataSource.OpenSystemConnectionAsync(cancellationToken);
const string sql = """
SELECT exception_id, tenant_id, policy_id, vulnerability_id, component_purl,
state, created_at, activation_date, expiration_date, activated_at,
expired_at, justification, created_by
FROM scheduler.scheduler_exceptions
WHERE state = 'pending' AND activation_date <= @AsOf
ORDER BY activation_date ASC;
""";
var rows = await conn.QueryAsync(sql, new { AsOf = asOf });
return rows.Select(Map).ToList();
}
public async ValueTask<IReadOnlyList<ExceptionRecord>> GetExpiredExceptionsAsync(
DateTimeOffset asOf,
CancellationToken cancellationToken = default)
{
await using var conn = await _dataSource.OpenSystemConnectionAsync(cancellationToken);
const string sql = """
SELECT exception_id, tenant_id, policy_id, vulnerability_id, component_purl,
state, created_at, activation_date, expiration_date, activated_at,
expired_at, justification, created_by
FROM scheduler.scheduler_exceptions
WHERE state = 'active' AND expiration_date <= @AsOf
ORDER BY expiration_date ASC;
""";
var rows = await conn.QueryAsync(sql, new { AsOf = asOf });
return rows.Select(Map).ToList();
}
public async ValueTask<IReadOnlyList<ExceptionRecord>> GetExpiringExceptionsAsync(
DateTimeOffset windowStart,
DateTimeOffset windowEnd,
CancellationToken cancellationToken = default)
{
await using var conn = await _dataSource.OpenSystemConnectionAsync(cancellationToken);
const string sql = """
SELECT exception_id, tenant_id, policy_id, vulnerability_id, component_purl,
state, created_at, activation_date, expiration_date, activated_at,
expired_at, justification, created_by
FROM scheduler.scheduler_exceptions
WHERE state = 'active'
AND expiration_date > @WindowStart
AND expiration_date <= @WindowEnd
ORDER BY expiration_date ASC;
""";
var rows = await conn.QueryAsync(sql, new { WindowStart = windowStart, WindowEnd = windowEnd });
return rows.Select(Map).ToList();
}
public async ValueTask UpdateAsync(ExceptionRecord record, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(record);
await using var conn = await _dataSource.OpenSystemConnectionAsync(cancellationToken);
const string sql = """
UPDATE scheduler.scheduler_exceptions
SET state = @State::scheduler.exception_state,
activation_date = @ActivationDate,
expiration_date = @ExpirationDate,
activated_at = @ActivatedAt,
expired_at = @ExpiredAt,
justification = @Justification
WHERE exception_id = @ExceptionId;
""";
await conn.ExecuteAsync(sql, new
{
record.ExceptionId,
State = record.State.ToString().ToLowerInvariant(),
record.ActivationDate,
record.ExpirationDate,
record.ActivatedAt,
record.ExpiredAt,
record.Justification
});
}
private static ExceptionRecord Map(dynamic row)
{
return new ExceptionRecord(
(string)row.exception_id,
(string)row.tenant_id,
(string)row.policy_id,
(string)row.vulnerability_id,
(string?)row.component_purl,
Enum.Parse<ExceptionState>((string)row.state, true),
DateTime.SpecifyKind(row.created_at, DateTimeKind.Utc),
row.activation_date is null ? null : (DateTimeOffset?)DateTime.SpecifyKind(row.activation_date, DateTimeKind.Utc),
row.expiration_date is null ? null : (DateTimeOffset?)DateTime.SpecifyKind(row.expiration_date, DateTimeKind.Utc),
row.activated_at is null ? null : (DateTimeOffset?)DateTime.SpecifyKind(row.activated_at, DateTimeKind.Utc),
row.expired_at is null ? null : (DateTimeOffset?)DateTime.SpecifyKind(row.expired_at, DateTimeKind.Utc),
(string?)row.justification,
(string?)row.created_by);
}
}

View File

@@ -0,0 +1,105 @@
using Microsoft.AspNetCore.Mvc;
using StellaOps.Auth.ServerIntegration.Tenancy;
using StellaOps.JobEngine.Core.Domain;
using StellaOps.JobEngine.Infrastructure.Repositories;
using StellaOps.ReleaseOrchestrator.WebApi.Contracts;
using StellaOps.ReleaseOrchestrator.WebApi.Services;
namespace StellaOps.ReleaseOrchestrator.WebApi.Endpoints;
/// <summary>
/// Legacy /api/v1/jobengine/* compatibility endpoints.
///
/// The UI clients (deadletter.client.ts, slo.client.ts, quota.client.ts,
/// jobengine-control.client.ts, audit-log.client.ts, etc.) still call
/// /api/v1/jobengine/* paths. The gateway routes these to the
/// release-orchestrator host preserving the path. This file registers
/// the legacy paths so the requests actually match an endpoint.
///
/// Endpoints that already have real implementations (audit) delegate
/// to the same handlers. Endpoints for features not yet migrated from
/// JobEngine (quotas, deadletter, jobs, slos, pack-runs, dag, stream,
/// sources) return structured 501 Not Implemented responses so the UI
/// gets a clear signal rather than a confusing 404.
/// </summary>
public static class JobEngineLegacyEndpoints
{
public static IEndpointRouteBuilder MapJobEngineLegacyEndpoints(this IEndpointRouteBuilder app)
{
MapLegacyAuditEndpoints(app);
MapLegacyStubEndpoints(app);
return app;
}
// -----------------------------------------------------------------------
// Audit -- has real implementation, re-use same handlers
// -----------------------------------------------------------------------
private static void MapLegacyAuditEndpoints(IEndpointRouteBuilder app)
{
var group = app.MapGroup("/api/v1/jobengine/audit")
.WithTags("JobEngine Legacy - Audit")
.RequireAuthorization(ReleaseOrchestratorPolicies.Read)
.RequireTenant();
group.MapGet("events", AuditEndpoints.ListAuditEventsLegacy);
}
// -----------------------------------------------------------------------
// Stubs -- features not yet migrated from the monolithic JobEngine
// -----------------------------------------------------------------------
private static void MapLegacyStubEndpoints(IEndpointRouteBuilder app)
{
// Each sub-path gets a catch-all GET and POST so the UI receives
// a well-formed 501 instead of a 404.
string[] stubPrefixes =
{
"/api/v1/jobengine/quotas",
"/api/v1/jobengine/deadletter",
"/api/v1/jobengine/jobs",
"/api/v1/jobengine/runs",
"/api/v1/jobengine/dag",
"/api/v1/jobengine/pack-runs",
"/api/v1/jobengine/stream",
"/api/v1/jobengine/sources",
"/api/v1/jobengine/slos",
};
foreach (var prefix in stubPrefixes)
{
var tag = $"JobEngine Legacy - {prefix.Split('/').Last()}";
var group = app.MapGroup(prefix)
.WithTags(tag)
.RequireTenant();
group.MapGet("{**rest}", NotImplementedStub)
.WithDescription("Legacy jobengine compatibility stub - feature pending migration.");
group.MapPost("{**rest}", NotImplementedStub)
.WithDescription("Legacy jobengine compatibility stub - feature pending migration.");
group.MapPut("{**rest}", NotImplementedStub)
.WithDescription("Legacy jobengine compatibility stub - feature pending migration.");
group.MapDelete("{**rest}", NotImplementedStub)
.WithDescription("Legacy jobengine compatibility stub - feature pending migration.");
// Also handle the root path (no trailing path)
group.MapGet(string.Empty, NotImplementedStub)
.WithDescription("Legacy jobengine compatibility stub - feature pending migration.");
group.MapPost(string.Empty, NotImplementedStub)
.WithDescription("Legacy jobengine compatibility stub - feature pending migration.");
}
}
private static IResult NotImplementedStub(HttpContext context)
{
var path = context.Request.Path.Value ?? "";
return Results.Json(
new
{
error = "not_implemented",
message = "This jobengine endpoint has not yet been migrated to the release-orchestrator service.",
path,
hint = "The corresponding feature is tracked in the orchestrator decomposition plan."
},
statusCode: StatusCodes.Status501NotImplemented);
}
}

View File

@@ -0,0 +1,278 @@
using StellaOps.ReleaseOrchestrator.Persistence.Hashing;
namespace StellaOps.ReleaseOrchestrator.Persistence.Domain;
/// <summary>
/// Represents an immutable audit log entry for release-orchestrator operations.
/// Captures who did what, when, and with what effect.
/// </summary>
public sealed record AuditEntry(
/// <summary>Unique audit entry identifier.</summary>
Guid EntryId,
/// <summary>Tenant owning this entry.</summary>
string TenantId,
/// <summary>Type of audited event.</summary>
AuditEventType EventType,
/// <summary>Resource type being audited (job, run, source, quota, etc.).</summary>
string ResourceType,
/// <summary>Resource identifier being audited.</summary>
Guid ResourceId,
/// <summary>Actor who performed the action.</summary>
string ActorId,
/// <summary>Actor type (user, system, worker, api-key).</summary>
ActorType ActorType,
/// <summary>IP address of the actor (if applicable).</summary>
string? ActorIp,
/// <summary>User agent string (if applicable).</summary>
string? UserAgent,
/// <summary>HTTP method used (if applicable).</summary>
string? HttpMethod,
/// <summary>Request path (if applicable).</summary>
string? RequestPath,
/// <summary>State before the change (JSON).</summary>
string? OldState,
/// <summary>State after the change (JSON).</summary>
string? NewState,
/// <summary>Human-readable description of the change.</summary>
string Description,
/// <summary>Correlation ID for distributed tracing.</summary>
string? CorrelationId,
/// <summary>SHA-256 hash of the previous entry for chain integrity.</summary>
string? PreviousEntryHash,
/// <summary>SHA-256 hash of this entry's content for integrity.</summary>
string ContentHash,
/// <summary>Sequence number within the tenant's audit stream.</summary>
long SequenceNumber,
/// <summary>When the event occurred.</summary>
DateTimeOffset OccurredAt,
/// <summary>Optional metadata JSON blob.</summary>
string? Metadata)
{
/// <summary>
/// Creates a new audit entry with computed hash.
/// Uses the platform's compliance-aware crypto abstraction.
/// </summary>
public static AuditEntry Create(
CanonicalJsonHasher hasher,
string tenantId,
AuditEventType eventType,
string resourceType,
Guid resourceId,
string actorId,
ActorType actorType,
string description,
DateTimeOffset occurredAt,
string? oldState = null,
string? newState = null,
string? actorIp = null,
string? userAgent = null,
string? httpMethod = null,
string? requestPath = null,
string? correlationId = null,
string? previousEntryHash = null,
long sequenceNumber = 0,
string? metadata = null,
Guid? entryId = null)
{
ArgumentNullException.ThrowIfNull(hasher);
var actualEntryId = entryId ?? Guid.NewGuid();
// Compute canonical hash from immutable content
var contentHash = hasher.ComputeCanonicalHash(new
{
EntryId = actualEntryId,
TenantId = tenantId,
EventType = eventType,
ResourceType = resourceType,
ResourceId = resourceId,
ActorId = actorId,
ActorType = actorType,
Description = description,
OldState = oldState,
NewState = newState,
OccurredAt = occurredAt,
SequenceNumber = sequenceNumber
});
return new AuditEntry(
EntryId: actualEntryId,
TenantId: tenantId,
EventType: eventType,
ResourceType: resourceType,
ResourceId: resourceId,
ActorId: actorId,
ActorType: actorType,
ActorIp: actorIp,
UserAgent: userAgent,
HttpMethod: httpMethod,
RequestPath: requestPath,
OldState: oldState,
NewState: newState,
Description: description,
CorrelationId: correlationId,
PreviousEntryHash: previousEntryHash,
ContentHash: contentHash,
SequenceNumber: sequenceNumber,
OccurredAt: occurredAt,
Metadata: metadata);
}
/// <summary>
/// Verifies the integrity of this entry's content hash.
/// </summary>
public bool VerifyIntegrity(CanonicalJsonHasher hasher)
{
ArgumentNullException.ThrowIfNull(hasher);
var computed = hasher.ComputeCanonicalHash(new
{
EntryId,
TenantId,
EventType,
ResourceType,
ResourceId,
ActorId,
ActorType,
Description,
OldState,
NewState,
OccurredAt,
SequenceNumber
});
return string.Equals(ContentHash, computed, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Verifies the chain link to the previous entry.
/// </summary>
public bool VerifyChainLink(AuditEntry? previousEntry)
{
if (previousEntry is null)
{
return PreviousEntryHash is null || SequenceNumber == 1;
}
return string.Equals(PreviousEntryHash, previousEntry.ContentHash, StringComparison.OrdinalIgnoreCase);
}
}
/// <summary>
/// Types of auditable events in the orchestrator.
/// </summary>
public enum AuditEventType
{
// Job lifecycle events
JobCreated = 100,
JobScheduled = 101,
JobLeased = 102,
JobCompleted = 103,
JobFailed = 104,
JobCanceled = 105,
JobRetried = 106,
// Run lifecycle events
RunCreated = 200,
RunStarted = 201,
RunCompleted = 202,
RunFailed = 203,
RunCanceled = 204,
// Source management events
SourceCreated = 300,
SourceUpdated = 301,
SourcePaused = 302,
SourceResumed = 303,
SourceDeleted = 304,
// Quota management events
QuotaCreated = 400,
QuotaUpdated = 401,
QuotaPaused = 402,
QuotaResumed = 403,
QuotaDeleted = 404,
// SLO management events
SloCreated = 500,
SloUpdated = 501,
SloEnabled = 502,
SloDisabled = 503,
SloDeleted = 504,
SloAlertTriggered = 505,
SloAlertAcknowledged = 506,
SloAlertResolved = 507,
// Dead-letter events
DeadLetterCreated = 600,
DeadLetterReplayed = 601,
DeadLetterResolved = 602,
DeadLetterExpired = 603,
// Backfill events
BackfillCreated = 700,
BackfillStarted = 701,
BackfillCompleted = 702,
BackfillFailed = 703,
BackfillCanceled = 704,
// Ledger events
LedgerExportRequested = 800,
LedgerExportCompleted = 801,
LedgerExportFailed = 802,
// Worker events
WorkerClaimed = 900,
WorkerHeartbeat = 901,
WorkerProgressReported = 902,
WorkerCompleted = 903,
// Security events
AuthenticationSuccess = 1000,
AuthenticationFailure = 1001,
AuthorizationDenied = 1002,
ApiKeyCreated = 1003,
ApiKeyRevoked = 1004
}
/// <summary>
/// Types of actors that can perform auditable actions.
/// </summary>
public enum ActorType
{
/// <summary>Human user via UI or API.</summary>
User = 0,
/// <summary>System-initiated action (scheduler, background job).</summary>
System = 1,
/// <summary>Worker process.</summary>
Worker = 2,
/// <summary>API key authentication.</summary>
ApiKey = 3,
/// <summary>Service-to-service call.</summary>
Service = 4,
/// <summary>Unknown or unidentified actor.</summary>
Unknown = 99
}

View File

@@ -0,0 +1,74 @@
namespace StellaOps.ReleaseOrchestrator.Persistence.Domain;
/// <summary>
/// Represents the first meaningful signal for a job/run.
/// </summary>
public sealed record FirstSignal
{
public required string Version { get; init; } = "1.0";
public required string SignalId { get; init; }
public required Guid JobId { get; init; }
public required DateTimeOffset Timestamp { get; init; }
public required FirstSignalKind Kind { get; init; }
public required FirstSignalPhase Phase { get; init; }
public required FirstSignalScope Scope { get; init; }
public required string Summary { get; init; }
public int? EtaSeconds { get; init; }
public LastKnownOutcome? LastKnownOutcome { get; init; }
public IReadOnlyList<NextAction>? NextActions { get; init; }
public required FirstSignalDiagnostics Diagnostics { get; init; }
}
public enum FirstSignalKind
{
Queued,
Started,
Phase,
Blocked,
Failed,
Succeeded,
Canceled,
Unavailable
}
public enum FirstSignalPhase
{
Resolve,
Fetch,
Restore,
Analyze,
Policy,
Report,
Unknown
}
public sealed record FirstSignalScope
{
public required string Type { get; init; } // "repo" | "image" | "artifact"
public required string Id { get; init; }
}
public sealed record LastKnownOutcome
{
public required string SignatureId { get; init; }
public string? ErrorCode { get; init; }
public required string Token { get; init; }
public string? Excerpt { get; init; }
public required string Confidence { get; init; } // "low" | "medium" | "high"
public required DateTimeOffset FirstSeenAt { get; init; }
public required int HitCount { get; init; }
}
public sealed record NextAction
{
public required string Type { get; init; } // "open_logs" | "open_job" | "docs" | "retry" | "cli_command"
public required string Label { get; init; }
public required string Target { get; init; }
}
public sealed record FirstSignalDiagnostics
{
public required bool CacheHit { get; init; }
public required string Source { get; init; } // "snapshot" | "failure_index" | "cold_start"
public required string CorrelationId { get; init; }
}

View File

@@ -0,0 +1,79 @@
using StellaOps.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
namespace StellaOps.ReleaseOrchestrator.Persistence.Hashing;
/// <summary>
/// Produces deterministic, canonical JSON and hashes for release-orchestrator payloads (events, audit, manifests).
/// Keys are sorted lexicographically; arrays preserve order; nulls are retained; timestamps remain ISO 8601 with offsets.
/// Uses compliance-profile-aware hashing via <see cref="ICryptoHash"/>.
/// </summary>
public sealed class CanonicalJsonHasher
{
private readonly ICryptoHash _cryptoHash;
private static readonly JsonSerializerOptions SerializerOptions = new()
{
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false,
PropertyNamingPolicy = null,
Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) }
};
/// <summary>
/// Creates a new CanonicalJsonHasher with the specified crypto hash service.
/// </summary>
/// <param name="cryptoHash">Crypto hash service for compliance-aware hashing.</param>
public CanonicalJsonHasher(ICryptoHash cryptoHash)
{
_cryptoHash = cryptoHash ?? throw new ArgumentNullException(nameof(cryptoHash));
}
/// <summary>
/// Serialize the value to canonical JSON (sorted object keys, stable formatting).
/// </summary>
public static string ToCanonicalJson<T>(T value)
{
var node = JsonSerializer.SerializeToNode(value, SerializerOptions) ?? new JsonObject();
// Work on a detached copy to avoid parent conflicts.
var ordered = OrderNode(node.DeepClone());
return ordered.ToJsonString(SerializerOptions);
}
/// <summary>
/// Compute hash over canonical JSON using the active compliance profile (lowercase hex).
/// Uses <see cref="HashPurpose.Content"/> for content hashing.
/// </summary>
public string ComputeCanonicalHash<T>(T value)
{
var canonicalJson = ToCanonicalJson(value);
var bytes = Encoding.UTF8.GetBytes(canonicalJson);
return _cryptoHash.ComputeHashHexForPurpose(bytes, HashPurpose.Content);
}
private static JsonNode OrderNode(JsonNode node)
{
switch (node)
{
case JsonObject obj:
var orderedObj = new JsonObject();
foreach (var kvp in obj.OrderBy(x => x.Key, StringComparer.Ordinal))
{
orderedObj.Add(kvp.Key, kvp.Value is null ? null : OrderNode(kvp.Value.DeepClone()));
}
return orderedObj;
case JsonArray arr:
var orderedArr = new JsonArray();
foreach (var item in arr)
{
orderedArr.Add(item is null ? null : OrderNode(item.DeepClone()));
}
return orderedArr;
default:
return node.DeepClone(); // primitives stay as-is
}
}
}

View File

@@ -0,0 +1,34 @@
<?xml version="1.0" ?>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<RootNamespace>StellaOps.ReleaseOrchestrator.Persistence</RootNamespace>
<AssemblyName>StellaOps.ReleaseOrchestrator.Persistence</AssemblyName>
<Description>Lean persistence layer for ReleaseOrchestrator (audit + first-signal)</Description>
</PropertyGroup>
<ItemGroup>
<EmbeddedResource Include="Migrations\**\*.sql"
LogicalName="%(RecursiveDir)%(Filename)%(Extension)" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
<PackageReference Include="Npgsql" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Cryptography\StellaOps.Cryptography.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Infrastructure.Postgres\StellaOps.Infrastructure.Postgres.csproj" />
</ItemGroup>
</Project>