feat: Implement runner execution pipeline with planner dispatch and execution services

- Introduced RunnerBackgroundService to handle execution of runner segments.
- Added RunnerExecutionService for processing segments and aggregating results.
- Implemented PlannerQueueDispatchService to manage dispatching of planner messages.
- Created PlannerQueueDispatcherBackgroundService for leasing and processing planner queue messages.
- Developed ScannerReportClient for interacting with the scanner service.
- Enhanced observability with SchedulerWorkerMetrics for tracking planner and runner performance.
- Added comprehensive documentation for the new runner execution pipeline and observability metrics.
- Implemented event emission for rescan activity and scanner report readiness.
Vladimir Moushkov
2025-10-27 18:57:35 +02:00
parent 730354a1af
commit 4d932cc1ba
42 changed files with 3981 additions and 184 deletions


@@ -4,9 +4,9 @@ This file describe implementation of Stella Ops (docs/README.md). Implementation
| --- | --- | --- | --- | --- | --- | --- |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | DOING (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-201 | Planner loop (cron/event triggers, leases, fairness). |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-202 | ImpactIndex targeting and shard planning. |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | TODO | Scheduler Worker Guild | SCHED-WORKER-16-203 | Runner execution invoking Scanner analysis/content refresh. |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | TODO | Scheduler Worker Guild | SCHED-WORKER-16-204 | Emit rescan/report events for Notify/UI. |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | TODO | Scheduler Worker Guild | SCHED-WORKER-16-205 | Metrics/telemetry for Scheduler planners/runners. |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-203 | Runner execution invoking Scanner analysis/content refresh. |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-204 | Emit rescan/report events for Notify/UI. |
| Sprint 16 | Scheduler Intelligence | src/StellaOps.Scheduler.Worker/TASKS.md | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-205 | Metrics/telemetry for Scheduler planners/runners. |
| Sprint 17 | Symbol Intelligence & Forensics | ops/offline-kit/TASKS.md | BLOCKED (2025-10-26) | Offline Kit Guild, DevOps Guild | DEVOPS-OFFLINE-17-004 | Run mirror_debug_store.py once release artefacts exist and archive verification evidence with the Offline Kit. |
| Sprint 17 | Symbol Intelligence & Forensics | ops/devops/TASKS.md | BLOCKED (2025-10-26) | DevOps Guild | DEVOPS-REL-17-004 | Ensure release workflow publishes `out/release/debug` (build-id tree + manifest) and fails when symbols are missing. |
> DOCS-AOC-19-004: Architecture overview & policy-engine docs refreshed 2025-10-26 — reuse new AOC boundary diagram + metrics guidance.


@@ -66,6 +66,21 @@ Scanner WebService can emit signed `scanner.report.*` events to Redis Streams wh
Helm values mirror the same knobs under each service's `env` map (see `deploy/helm/stellaops/values-*.yaml`).
### Scheduler worker configuration
Every Compose profile now provisions the `scheduler-worker` container (backed by the
`StellaOps.Scheduler.Worker.Host` entrypoint). The environment placeholders exposed
in the `.env` samples match the options bound by `AddSchedulerWorker`:
- `SCHEDULER_QUEUE_KIND` - queue transport (`Nats` or `Redis`).
- `SCHEDULER_QUEUE_NATS_URL` - NATS connection string used by planner/runner consumers.
- `SCHEDULER_STORAGE_DATABASE` - MongoDB database name for scheduler state.
- `SCHEDULER_SCANNER_BASEADDRESS` - base URL the runner uses when invoking Scanner's
`/api/v1/reports` (defaults to the in-cluster `http://scanner-web:8444`).
Helm deployments inherit the same defaults from `services.scheduler-worker.env` in
`values.yaml`; override them per environment as needed.
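The Compose and Helm files pass these values to the container as double-underscore variables such as `SCHEDULER__QUEUE__KIND`. The .NET environment-variable configuration provider treats `__` as the section separator `:`, which is how the values reach the `Scheduler:Storage` and `Scheduler:Worker` sections read by the host. A minimal sketch of that mapping follows; the helper name `EnvKeyMapper.ToConfigKey` is hypothetical and exists only for illustration, since the real translation happens inside the framework.

```csharp
using System;

// Hypothetical helper for illustration only; the actual translation is done
// by the .NET environment-variable configuration provider.
public static class EnvKeyMapper
{
    // Converts e.g. "SCHEDULER__QUEUE__KIND" to "SCHEDULER:QUEUE:KIND".
    // Configuration keys are case-insensitive, so the result addresses the
    // same value as "Scheduler:Queue:Kind".
    public static string ToConfigKey(string envName)
    {
        ArgumentNullException.ThrowIfNull(envName);
        return envName.Replace("__", ":", StringComparison.Ordinal);
    }
}
```

For example, `EnvKeyMapper.ToConfigKey("SCHEDULER__WORKER__RUNNER__SCANNER__BASEADDRESS")` yields `SCHEDULER:WORKER:RUNNER:SCANNER:BASEADDRESS`, a key under the `Scheduler:Worker` section.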
### Front-door network hand-off
`docker-compose.prod.yaml` adds a `frontdoor` network so operators can attach Traefik, Envoy, or an on-prem load balancer that terminates TLS. Override `FRONTDOOR_NETWORK` in `prod.env` if your reverse proxy uses a different bridge name. Attach only the externally reachable services (Authority, Signer, Attestor, Concelier, Scanner Web, Notify Web, UI) to that network—internal infrastructure (Mongo, MinIO, RustFS, NATS) stays on the private `stellaops` network.


@@ -170,8 +170,8 @@ services:
labels: *release-labels
scanner-worker:
image: registry.stella-ops.org/stellaops/scanner-worker@sha256:eea5d6cfe7835950c5ec7a735a651f2f0d727d3e470cf9027a4a402ea89c4fb5
restart: unless-stopped
image: registry.stella-ops.org/stellaops/scanner-worker@sha256:eea5d6cfe7835950c5ec7a735a651f2f0d727d3e470cf9027a4a402ea89c4fb5
restart: unless-stopped
depends_on:
- scanner-web
- rustfs
@@ -182,10 +182,30 @@ services:
SCANNER__ARTIFACTSTORE__ENDPOINT: "http://rustfs:8080/api/v1"
SCANNER__ARTIFACTSTORE__BUCKET: "scanner-artifacts"
SCANNER__ARTIFACTSTORE__TIMEOUTSECONDS: "30"
SCANNER__QUEUE__BROKER: "${SCANNER_QUEUE_BROKER}"
networks:
- stellaops
labels: *release-labels
SCANNER__QUEUE__BROKER: "${SCANNER_QUEUE_BROKER}"
networks:
- stellaops
labels: *release-labels
scheduler-worker:
image: registry.stella-ops.org/stellaops/scheduler-worker:2025.10.0-edge
restart: unless-stopped
depends_on:
- mongo
- nats
- scanner-web
command:
- "dotnet"
- "StellaOps.Scheduler.Worker.Host.dll"
environment:
SCHEDULER__QUEUE__KIND: "${SCHEDULER_QUEUE_KIND:-Nats}"
SCHEDULER__QUEUE__NATS__URL: "${SCHEDULER_QUEUE_NATS_URL:-nats://nats:4222}"
SCHEDULER__STORAGE__CONNECTIONSTRING: "mongodb://${MONGO_INITDB_ROOT_USERNAME}:${MONGO_INITDB_ROOT_PASSWORD}@mongo:27017"
SCHEDULER__STORAGE__DATABASE: "${SCHEDULER_STORAGE_DATABASE:-stellaops_scheduler}"
SCHEDULER__WORKER__RUNNER__SCANNER__BASEADDRESS: "${SCHEDULER_SCANNER_BASEADDRESS:-http://scanner-web:8444}"
networks:
- stellaops
labels: *release-labels
notify-web:
image: ${NOTIFY_WEB_IMAGE:-registry.stella-ops.org/stellaops/notify-web:2025.09.2}


@@ -168,8 +168,8 @@ services:
labels: *release-labels
scanner-worker:
image: registry.stella-ops.org/stellaops/scanner-worker@sha256:92dda42f6f64b2d9522104a5c9ffb61d37b34dd193132b68457a259748008f37
restart: unless-stopped
image: registry.stella-ops.org/stellaops/scanner-worker@sha256:92dda42f6f64b2d9522104a5c9ffb61d37b34dd193132b68457a259748008f37
restart: unless-stopped
depends_on:
- scanner-web
- rustfs
@@ -181,9 +181,29 @@ services:
SCANNER__ARTIFACTSTORE__BUCKET: "scanner-artifacts"
SCANNER__ARTIFACTSTORE__TIMEOUTSECONDS: "30"
SCANNER__QUEUE__BROKER: "${SCANNER_QUEUE_BROKER}"
networks:
- stellaops
labels: *release-labels
networks:
- stellaops
labels: *release-labels
scheduler-worker:
image: registry.stella-ops.org/stellaops/scheduler-worker:2025.10.0-edge
restart: unless-stopped
depends_on:
- mongo
- nats
- scanner-web
command:
- "dotnet"
- "StellaOps.Scheduler.Worker.Host.dll"
environment:
SCHEDULER__QUEUE__KIND: "${SCHEDULER_QUEUE_KIND:-Nats}"
SCHEDULER__QUEUE__NATS__URL: "${SCHEDULER_QUEUE_NATS_URL:-nats://nats:4222}"
SCHEDULER__STORAGE__CONNECTIONSTRING: "mongodb://${MONGO_INITDB_ROOT_USERNAME}:${MONGO_INITDB_ROOT_PASSWORD}@mongo:27017"
SCHEDULER__STORAGE__DATABASE: "${SCHEDULER_STORAGE_DATABASE:-stellaops_scheduler}"
SCHEDULER__WORKER__RUNNER__SCANNER__BASEADDRESS: "${SCHEDULER_SCANNER_BASEADDRESS:-http://scanner-web:8444}"
networks:
- stellaops
labels: *release-labels
notify-web:
image: ${NOTIFY_WEB_IMAGE:-registry.stella-ops.org/stellaops/notify-web:2025.10.0-edge}


@@ -193,6 +193,26 @@ services:
- stellaops
labels: *release-labels
scheduler-worker:
image: registry.stella-ops.org/stellaops/scheduler-worker:2025.10.0-edge
restart: unless-stopped
depends_on:
- mongo
- nats
- scanner-web
command:
- "dotnet"
- "StellaOps.Scheduler.Worker.Host.dll"
environment:
SCHEDULER__QUEUE__KIND: "${SCHEDULER_QUEUE_KIND:-Nats}"
SCHEDULER__QUEUE__NATS__URL: "${SCHEDULER_QUEUE_NATS_URL:-nats://nats:4222}"
SCHEDULER__STORAGE__CONNECTIONSTRING: "mongodb://${MONGO_INITDB_ROOT_USERNAME}:${MONGO_INITDB_ROOT_PASSWORD}@mongo:27017"
SCHEDULER__STORAGE__DATABASE: "${SCHEDULER_STORAGE_DATABASE:-stellaops_scheduler}"
SCHEDULER__WORKER__RUNNER__SCANNER__BASEADDRESS: "${SCHEDULER_SCANNER_BASEADDRESS:-http://scanner-web:8444}"
networks:
- stellaops
labels: *release-labels
notify-web:
image: ${NOTIFY_WEB_IMAGE:-registry.stella-ops.org/stellaops/notify-web:2025.09.2}
restart: unless-stopped


@@ -168,8 +168,8 @@ services:
labels: *release-labels
scanner-worker:
image: registry.stella-ops.org/stellaops/scanner-worker@sha256:32e25e76386eb9ea8bee0a1ad546775db9a2df989fab61ac877e351881960dab
restart: unless-stopped
image: registry.stella-ops.org/stellaops/scanner-worker@sha256:32e25e76386eb9ea8bee0a1ad546775db9a2df989fab61ac877e351881960dab
restart: unless-stopped
depends_on:
- scanner-web
- rustfs
@@ -180,10 +180,30 @@ services:
SCANNER__ARTIFACTSTORE__ENDPOINT: "http://rustfs:8080/api/v1"
SCANNER__ARTIFACTSTORE__BUCKET: "scanner-artifacts"
SCANNER__ARTIFACTSTORE__TIMEOUTSECONDS: "30"
SCANNER__QUEUE__BROKER: "${SCANNER_QUEUE_BROKER}"
networks:
- stellaops
labels: *release-labels
SCANNER__QUEUE__BROKER: "${SCANNER_QUEUE_BROKER}"
networks:
- stellaops
labels: *release-labels
scheduler-worker:
image: registry.stella-ops.org/stellaops/scheduler-worker:2025.10.0-edge
restart: unless-stopped
depends_on:
- mongo
- nats
- scanner-web
command:
- "dotnet"
- "StellaOps.Scheduler.Worker.Host.dll"
environment:
SCHEDULER__QUEUE__KIND: "${SCHEDULER_QUEUE_KIND:-Nats}"
SCHEDULER__QUEUE__NATS__URL: "${SCHEDULER_QUEUE_NATS_URL:-nats://nats:4222}"
SCHEDULER__STORAGE__CONNECTIONSTRING: "mongodb://${MONGO_INITDB_ROOT_USERNAME}:${MONGO_INITDB_ROOT_PASSWORD}@mongo:27017"
SCHEDULER__STORAGE__DATABASE: "${SCHEDULER_STORAGE_DATABASE:-stellaops_scheduler}"
SCHEDULER__WORKER__RUNNER__SCANNER__BASEADDRESS: "${SCHEDULER_SCANNER_BASEADDRESS:-http://scanner-web:8444}"
networks:
- stellaops
labels: *release-labels
notify-web:
image: ${NOTIFY_WEB_IMAGE:-registry.stella-ops.org/stellaops/notify-web:2025.09.2}


@@ -23,3 +23,7 @@ SCANNER_EVENTS_DSN=
SCANNER_EVENTS_STREAM=stella.events
SCANNER_EVENTS_PUBLISH_TIMEOUT_SECONDS=5
SCANNER_EVENTS_MAX_STREAM_LENGTH=10000
SCHEDULER_QUEUE_KIND=Nats
SCHEDULER_QUEUE_NATS_URL=nats://nats:4222
SCHEDULER_STORAGE_DATABASE=stellaops_scheduler
SCHEDULER_SCANNER_BASEADDRESS=http://scanner-web:8444


@@ -22,3 +22,7 @@ SCANNER_EVENTS_DSN=
SCANNER_EVENTS_STREAM=stella.events
SCANNER_EVENTS_PUBLISH_TIMEOUT_SECONDS=5
SCANNER_EVENTS_MAX_STREAM_LENGTH=10000
SCHEDULER_QUEUE_KIND=Nats
SCHEDULER_QUEUE_NATS_URL=nats://nats:4222
SCHEDULER_STORAGE_DATABASE=stellaops_scheduler
SCHEDULER_SCANNER_BASEADDRESS=http://scanner-web:8444


@@ -25,5 +25,9 @@ SCANNER_EVENTS_DSN=
SCANNER_EVENTS_STREAM=stella.events
SCANNER_EVENTS_PUBLISH_TIMEOUT_SECONDS=5
SCANNER_EVENTS_MAX_STREAM_LENGTH=10000
SCHEDULER_QUEUE_KIND=Nats
SCHEDULER_QUEUE_NATS_URL=nats://nats:4222
SCHEDULER_STORAGE_DATABASE=stellaops_scheduler
SCHEDULER_SCANNER_BASEADDRESS=http://scanner-web:8444
# External reverse proxy (Traefik, Envoy, etc.) that terminates TLS.
FRONTDOOR_NETWORK=stellaops_frontdoor


@@ -22,3 +22,7 @@ SCANNER_EVENTS_DSN=
SCANNER_EVENTS_STREAM=stella.events
SCANNER_EVENTS_PUBLISH_TIMEOUT_SECONDS=5
SCANNER_EVENTS_MAX_STREAM_LENGTH=10000
SCHEDULER_QUEUE_KIND=Nats
SCHEDULER_QUEUE_NATS_URL=nats://nats:4222
SCHEDULER_STORAGE_DATABASE=stellaops_scheduler
SCHEDULER_SCANNER_BASEADDRESS=http://scanner-web:8444


@@ -34,4 +34,16 @@ telemetry:
metricsPort: 9464
resources: {}
services: {}
services:
scheduler-worker:
image: registry.stella-ops.org/stellaops/scheduler-worker:2025.10.0-edge
replicas: 1
command:
- dotnet
- StellaOps.Scheduler.Worker.Host.dll
env:
SCHEDULER__QUEUE__KIND: Nats
SCHEDULER__QUEUE__NATS__URL: nats://nats:4222
SCHEDULER__STORAGE__CONNECTIONSTRING: mongodb://scheduler-mongo:27017
SCHEDULER__STORAGE__DATABASE: stellaops_scheduler
SCHEDULER__WORKER__RUNNER__SCANNER__BASEADDRESS: http://scanner-web:8444


@@ -123,6 +123,7 @@ Update `docs/assets/dashboards/` with screenshots when Grafana capture pipeline
- [CLI AOC commands](../cli/cli-reference.md)
- [Concelier architecture](../ARCHITECTURE_CONCELIER.md)
- [Excititor architecture](../ARCHITECTURE_EXCITITOR.md)
- [Scheduler Worker observability guide](../ops/scheduler-worker-operations.md)
---


@@ -0,0 +1,261 @@
{
"title": "Scheduler Worker Planning & Rescan",
"uid": "scheduler-worker-observability",
"schemaVersion": 38,
"version": 1,
"editable": true,
"timezone": "",
"graphTooltip": 0,
"time": {
"from": "now-24h",
"to": "now"
},
"templating": {
"list": [
{
"name": "datasource",
"type": "datasource",
"query": "prometheus",
"hide": 0,
"refresh": 1,
"current": {}
},
{
"name": "mode",
"label": "Mode",
"type": "query",
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"query": "label_values(scheduler_planner_runs_total, mode)",
"refresh": 1,
"multi": true,
"includeAll": true,
"allValue": ".*",
"current": {
"selected": false,
"text": "All",
"value": ".*"
}
}
]
},
"annotations": {
"list": []
},
"panels": [
{
"id": 1,
"title": "Planner Runs per Status",
"type": "timeseries",
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {
"unit": "ops",
"displayName": "{{status}}"
},
"overrides": []
},
"options": {
"legend": {
"displayMode": "table",
"placement": "bottom"
}
},
"targets": [
{
"expr": "sum by (status) (rate(scheduler_planner_runs_total{mode=~\"$mode\"}[5m]))",
"legendFormat": "{{status}}",
"refId": "A"
}
],
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
}
},
{
"id": 2,
"title": "Planner Latency P95 (s)",
"type": "timeseries",
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {
"unit": "s"
},
"overrides": []
},
"options": {
"legend": {
"displayMode": "table",
"placement": "bottom"
}
},
"targets": [
{
"expr": "histogram_quantile(0.95, sum by (le) (rate(scheduler_planner_latency_seconds_bucket{mode=~\"$mode\"}[5m])))",
"legendFormat": "p95",
"refId": "A"
}
],
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
}
},
{
"id": 3,
"title": "Runner Segments per Status",
"type": "timeseries",
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {
"unit": "ops",
"displayName": "{{status}}"
},
"overrides": []
},
"options": {
"legend": {
"displayMode": "table",
"placement": "bottom"
}
},
"targets": [
{
"expr": "sum by (status) (rate(scheduler_runner_segments_total{mode=~\"$mode\"}[5m]))",
"legendFormat": "{{status}}",
"refId": "A"
}
],
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
}
},
{
"id": 4,
"title": "New Findings per Severity",
"type": "timeseries",
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {
"unit": "ops",
"displayName": "{{severity}}"
},
"overrides": []
},
"options": {
"legend": {
"displayMode": "table",
"placement": "bottom"
}
},
"targets": [
{
"expr": "sum(rate(scheduler_runner_delta_critical_total{mode=~\"$mode\"}[5m]))",
"legendFormat": "critical",
"refId": "A"
},
{
"expr": "sum(rate(scheduler_runner_delta_high_total{mode=~\"$mode\"}[5m]))",
"legendFormat": "high",
"refId": "B"
},
{
"expr": "sum(rate(scheduler_runner_delta_total{mode=~\"$mode\"}[5m]))",
"legendFormat": "total",
"refId": "C"
}
],
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
}
},
{
"id": 5,
"title": "Runner Backlog by Schedule",
"type": "table",
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {
"displayName": "{{scheduleId}}",
"unit": "none"
},
"overrides": []
},
"options": {
"showHeader": true
},
"targets": [
{
"expr": "max by (scheduleId) (scheduler_runner_backlog{mode=~\"$mode\"})",
"format": "table",
"refId": "A"
}
],
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 16
}
},
{
"id": 6,
"title": "Active Runs",
"type": "stat",
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {
"unit": "none"
},
"overrides": []
},
"options": {
"orientation": "horizontal",
"textMode": "value"
},
"targets": [
{
"expr": "sum(scheduler_runs_active{mode=~\"$mode\"})",
"refId": "A"
}
],
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 16
}
}
]
}


@@ -0,0 +1,82 @@
# Scheduler Worker Observability & Runbook
## Purpose
Monitor planner and runner health for the Scheduler Worker (Sprint 16 telemetry). The new .NET meters surface queue throughput, latency, backlog, and delta severities so operators can detect stalled runs before rescan SLAs slip.
> **Grafana note:** Import `docs/ops/scheduler-worker-grafana-dashboard.json` into the Prometheus-backed Grafana stack that scrapes the OpenTelemetry Collector.
---
## Key metrics
| Metric | Use case | Suggested query |
| --- | --- | --- |
| `scheduler_planner_runs_total{status}` | Planner throughput & failure ratio | `sum by (status) (rate(scheduler_planner_runs_total[5m]))` |
| `scheduler_planner_latency_seconds_bucket` | Planning latency (p95 / p99) | `histogram_quantile(0.95, sum by (le) (rate(scheduler_planner_latency_seconds_bucket[5m])))` |
| `scheduler_runner_segments_total{status}` | Runner success vs retries | `sum by (status) (rate(scheduler_runner_segments_total[5m]))` |
| `scheduler_runner_delta_{critical,high,total}` | Newly-detected findings | `sum(rate(scheduler_runner_delta_critical_total[5m]))` |
| `scheduler_runner_backlog{scheduleId}` | Remaining digests awaiting runner | `max by (scheduleId) (scheduler_runner_backlog)` |
| `scheduler_runs_active{mode}` | Active runs in-flight | `sum(scheduler_runs_active)` |
Reference queries power the bundled Grafana dashboard panels. Use the `mode` template variable to focus on `analysisOnly` versus `contentRefresh` schedules.
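The counters and histograms in the table are the kind of instruments that `System.Diagnostics.Metrics` produces. The sketch below shows how a planner run could be recorded with `mode` and `status` tags. It is an assumption-laden illustration, not the `SchedulerWorkerMetrics` implementation from this commit: the class name `PlannerMetricsSketch`, the meter name `Example.Scheduler.Worker`, and the exact instrument names are hypothetical, and the exporter in use may append or expect suffixes such as `_total` or `_bucket`.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Illustrative only: class, meter, and instrument names are assumptions.
public sealed class PlannerMetricsSketch : IDisposable
{
    private readonly Meter _meter = new("Example.Scheduler.Worker");
    private readonly Counter<long> _plannerRuns;
    private readonly Histogram<double> _plannerLatency;

    public PlannerMetricsSketch()
    {
        _plannerRuns = _meter.CreateCounter<long>("scheduler_planner_runs_total");
        _plannerLatency = _meter.CreateHistogram<double>(
            "scheduler_planner_latency_seconds", unit: "s");
    }

    // Records one planner run: increments the run counter with mode/status
    // tags and adds the observed duration to the latency histogram.
    public void RecordRun(string mode, string status, double seconds)
    {
        var modeTag = new KeyValuePair<string, object?>("mode", mode);
        var statusTag = new KeyValuePair<string, object?>("status", status);
        _plannerRuns.Add(1, modeTag, statusTag);
        _plannerLatency.Record(seconds, modeTag);
    }

    public void Dispose() => _meter.Dispose();
}
```

Tagging the counter with `status` is what makes a query such as `sum by (status) (rate(...))` meaningful, and the `mode` tag is what the dashboard's `mode` variable filters on.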
---
## Grafana dashboard
1. Import `docs/ops/scheduler-worker-grafana-dashboard.json` (UID `scheduler-worker-observability`).
2. Point the `datasource` variable to the Prometheus instance scraping the collector. Optional: pin the `mode` variable to a specific schedule mode.
3. Panels included:
- **Planner Runs per Status** - visualises success vs failure ratio.
- **Planner Latency P95** - highlights degradations in ImpactIndex or Mongo lookups.
- **Runner Segments per Status** - shows retry pressure and queue health.
- **New Findings per Severity** - rolls up delta counters (critical/high/total).
- **Runner Backlog by Schedule** - tabulates outstanding digests per schedule.
- **Active Runs** - stat panel showing the current number of in-flight runs.
Capture screenshots once Grafana provisioning completes and store them under `docs/assets/dashboards/` (pending automation ticket OBS-157).
---
## Prometheus alerts
Import `docs/ops/scheduler-worker-prometheus-rules.yaml` into your Prometheus rule configuration. The bundle defines:
- **SchedulerPlannerFailuresHigh** - more than 5% of planner runs failed for 10 minutes. Page SRE.
- **SchedulerPlannerLatencyHigh** - planner p95 latency remains above 45s for 10 minutes. Investigate ImpactIndex, Mongo, and Feedser/Vexer event queues.
- **SchedulerRunnerBacklogGrowing** - backlog exceeded 500 images for 15 minutes. Inspect runner workers, Scanner availability, and rate limiting.
- **SchedulerRunStuck** - active run count stayed flat for 30 minutes while remaining non-zero. Check stuck segments, expired leases, and scanner retries.
Hook these alerts into the existing Observability notification pathway (`observability-pager` routing key) and ensure `service=scheduler-worker` is mapped to the on-call rotation.
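To make the thresholds concrete, the two less obvious conditions can be restated as plain predicates. This is a simplified sketch under stated assumptions: the class and method names are hypothetical, each method looks at a single series, and the `for:` hold duration that Prometheus applies before firing is not modelled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical restatement of two alert expressions; not part of the rule pack.
public static class AlertConditionSketch
{
    // SchedulerPlannerFailuresHigh: failed rate / total rate > 0.05.
    // With no traffic the ratio is undefined, so the condition is false.
    public static bool FailureRatioHigh(double failedPerSecond, double totalPerSecond)
        => totalPerSecond > 0 && failedPerSecond / totalPerSecond > 0.05;

    // SchedulerRunStuck: the active-run gauge is non-zero and its maximum
    // equals its minimum across the sampled window (it never moved).
    public static bool RunStuck(IReadOnlyList<double> activeRunSamples)
        => activeRunSamples.Count > 0
           && activeRunSamples[activeRunSamples.Count - 1] > 0
           && activeRunSamples.Max() == activeRunSamples.Min();
}
```

For instance, 0.3 failed runs per second out of 5 total is a 6% ratio and satisfies the first condition, while a gauge that reads 2, 3, 2 across the window does not count as stuck because it changed.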
---
## Runbook snapshot
1. **Planner failure/latency:**
- Check Planner logs for ImpactIndex or Mongo exceptions.
- Verify Feedser/Vexer webhook health; requeue events if necessary.
- If planner is overwhelmed, temporarily reduce schedule parallelism via `stella scheduler schedule update`.
2. **Runner backlog spike:**
- Confirm Scanner WebService health (`/healthz`).
- Inspect runner queue for stuck segments; consider increasing runner workers or scaling scanner capacity.
- Review rate limits (schedule limits, ImpactIndex throughput) before changing global throttles.
3. **Stuck runs:**
- Use `stella scheduler runs list --state running` to identify affected runs.
- Drill into Grafana panel “Runner Backlog by Schedule” to see offending schedule IDs.
- If a segment will not progress, use `stella scheduler segments release --segment <id>` to force retry after resolving root cause.
4. **Unexpected critical deltas:**
- Correlate `scheduler_runner_delta_critical_total` spikes with Notify events (`scheduler.rescan.delta`).
- Pivot to Scanner report links for impacted digests and confirm they match upstream advisories/policies.
Document incidents and mitigation in `ops/runbooks/INCIDENT_LOG.md` (per SRE policy) and attach Grafana screenshots for post-mortems.
---
## Checklist
- [ ] Grafana dashboard imported and wired to Prometheus datasource.
- [ ] Prometheus alert rules deployed (see above).
- [ ] Runbook linked from on-call rotation portal.
- [ ] Observability Guild sign-off captured for Sprint 16 telemetry (OWNER: @obs-guild).


@@ -0,0 +1,42 @@
groups:
- name: scheduler-worker
interval: 30s
rules:
- alert: SchedulerPlannerFailuresHigh
expr: sum(rate(scheduler_planner_runs_total{status="failed"}[5m]))
/
sum(rate(scheduler_planner_runs_total[5m])) > 0.05
for: 10m
labels:
severity: critical
service: scheduler-worker
annotations:
summary: "Planner failure ratio above 5%"
description: "More than 5% of planning runs are failing. Inspect scheduler logs and ImpactIndex connectivity before queues back up."
- alert: SchedulerPlannerLatencyHigh
expr: histogram_quantile(0.95, sum by (le) (rate(scheduler_planner_latency_seconds_bucket[5m]))) > 45
for: 10m
labels:
severity: warning
service: scheduler-worker
annotations:
summary: "Planner latency p95 above 45s"
description: "Planning latency p95 stayed above 45 seconds for 10 minutes. Check ImpactIndex, Mongo, or external selectors to prevent missed SLAs."
- alert: SchedulerRunnerBacklogGrowing
expr: max_over_time(scheduler_runner_backlog[15m]) > 500
for: 15m
labels:
severity: warning
service: scheduler-worker
annotations:
summary: "Runner backlog above 500 images"
description: "Runner backlog exceeded 500 images over the last 15 minutes. Verify runner workers, scanner availability, and rate limits."
- alert: SchedulerRunStuck
expr: sum(scheduler_runs_active) > 0 and max_over_time(scheduler_runs_active[30m]) == min_over_time(scheduler_runs_active[30m])
for: 30m
labels:
severity: warning
service: scheduler-worker
annotations:
summary: "Scheduler runs stuck without progress"
description: "Active runs count has remained flat for 30 minutes. Investigate stuck segments or scanner timeouts."


@@ -161,6 +161,7 @@ Provision the following secrets/configs (names can be overridden via Helm values
- [ ] Tempo and Loki report tenant activity (`/api/status`).
- [ ] Retention policy tested by uploading sample data and verifying expiry.
- [ ] Alerts wired into SLO evaluator (DEVOPS-OBS-51-001).
- [ ] Component rule packs imported (e.g. `docs/ops/scheduler-worker-prometheus-rules.yaml`).
---


@@ -0,0 +1,32 @@
using System.Diagnostics;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Storage.Mongo;
using StellaOps.Scheduler.Worker.DependencyInjection;
var builder = Host.CreateApplicationBuilder(args);
builder.Logging.Configure(options =>
{
options.ActivityTrackingOptions = ActivityTrackingOptions.TraceId
| ActivityTrackingOptions.SpanId
| ActivityTrackingOptions.ParentId;
});
builder.Services.AddSchedulerQueues(builder.Configuration);
var storageSection = builder.Configuration.GetSection("Scheduler:Storage");
if (storageSection.Exists())
{
builder.Services.AddSchedulerMongoStorage(storageSection);
}
builder.Services.AddSchedulerWorker(builder.Configuration.GetSection("Scheduler:Worker"));
var host = builder.Build();
await host.RunAsync();
public partial class Program;


@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\StellaOps.Scheduler.Queue\StellaOps.Scheduler.Queue.csproj" />
<ProjectReference Include="..\StellaOps.Scheduler.Storage.Mongo\StellaOps.Scheduler.Storage.Mongo.csproj" />
<ProjectReference Include="..\StellaOps.Scheduler.Worker\StellaOps.Scheduler.Worker.csproj" />
</ItemGroup>
</Project>


@@ -1,5 +1,4 @@
global using System.Collections.Immutable;
global using Moq;
global using StellaOps.Scheduler.ImpactIndex;
global using StellaOps.Scheduler.Models;
global using StellaOps.Scheduler.Worker;


@@ -0,0 +1,120 @@
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Worker.Execution;
using StellaOps.Scheduler.Worker.Options;
namespace StellaOps.Scheduler.Worker.Tests;
public sealed class HttpScannerReportClientTests
{
[Fact]
public async Task ExecuteAsync_WhenReportReturnsFindings_ProducesDeltaSummary()
{
var handler = new StubHttpMessageHandler(request =>
{
if (request.RequestUri?.AbsolutePath.EndsWith("/api/v1/reports", StringComparison.OrdinalIgnoreCase) == true)
{
var payload = new
{
report = new
{
reportId = "report-123",
imageDigest = "sha256:abc",
generatedAt = DateTimeOffset.UtcNow,
verdict = "warn",
policy = new { revisionId = "rev-1", digest = "digest-1" },
summary = new { total = 3, blocked = 2, warned = 1, ignored = 0, quieted = 0 }
},
dsse = new
{
payloadType = "application/vnd.in-toto+json",
payload = "eyJkYXRhIjoidGVzdCJ9",
signatures = new[] { new { keyId = "test", algorithm = "ed25519", signature = "c2ln" } }
}
};
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK)
{
Content = JsonContent.Create(payload)
});
}
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK));
});
var httpClient = new HttpClient(handler)
{
BaseAddress = new Uri("https://scanner.example")
};
var options = Microsoft.Extensions.Options.Options.Create(new SchedulerWorkerOptions());
options.Value.Runner.Scanner.BaseAddress = httpClient.BaseAddress;
options.Value.Runner.Scanner.EnableContentRefresh = false;
var client = new HttpScannerReportClient(httpClient, options, NullLogger<HttpScannerReportClient>.Instance);
var result = await client.ExecuteAsync(
new ScannerReportRequest("tenant-1", "run-1", "sha256:abc", ScheduleMode.AnalysisOnly, true, new Dictionary<string, string>()),
CancellationToken.None);
Assert.Equal("sha256:abc", result.ImageDigest);
Assert.NotNull(result.Delta);
Assert.Equal(3, result.Delta!.NewFindings);
Assert.Equal(2, result.Delta.NewCriticals);
Assert.Equal(1, result.Delta.NewHigh);
Assert.Equal(0, result.Delta.NewMedium);
Assert.Equal(0, result.Delta.NewLow);
Assert.Equal("report-123", result.Report.ReportId);
Assert.Equal("rev-1", result.Report.PolicyRevisionId);
Assert.NotNull(result.Dsse);
}
[Fact]
public async Task ExecuteAsync_WhenReportFails_RetriesAndThrows()
{
var callCount = 0;
var handler = new StubHttpMessageHandler(_ =>
{
callCount++;
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.InternalServerError));
});
var httpClient = new HttpClient(handler)
{
BaseAddress = new Uri("https://scanner.example")
};
var options = Microsoft.Extensions.Options.Options.Create(new SchedulerWorkerOptions());
options.Value.Runner.Scanner.BaseAddress = httpClient.BaseAddress;
options.Value.Runner.Scanner.EnableContentRefresh = false;
options.Value.Runner.Scanner.MaxRetryAttempts = 2;
options.Value.Runner.Scanner.RetryBaseDelay = TimeSpan.FromMilliseconds(1);
var client = new HttpScannerReportClient(httpClient, options, NullLogger<HttpScannerReportClient>.Instance);
await Assert.ThrowsAsync<HttpRequestException>(() => client.ExecuteAsync(
new ScannerReportRequest("tenant-1", "run-1", "sha256:abc", ScheduleMode.AnalysisOnly, true, new Dictionary<string, string>()),
CancellationToken.None));
Assert.Equal(3, callCount);
}
private sealed class StubHttpMessageHandler : HttpMessageHandler
{
private readonly Func<HttpRequestMessage, Task<HttpResponseMessage>> _handler;
public StubHttpMessageHandler(Func<HttpRequestMessage, Task<HttpResponseMessage>> handler)
{
_handler = handler;
}
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
=> _handler(request);
}
}
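The second test above expects three calls when `MaxRetryAttempts` is 2, which is consistent with one initial attempt followed by two retries. The `HttpScannerReportClient` implementation is not part of this excerpt, so the following is only a sketch of a retry loop that would produce that count. The name `RetrySketch.ExecuteAsync` and the exponential backoff schedule are assumptions, not the client's confirmed behaviour.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical retry helper; the backoff schedule is an assumption.
public static class RetrySketch
{
    // Runs the operation once, then retries up to maxRetryAttempts times
    // when it throws HttpRequestException. The final failure propagates.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxRetryAttempts,
        TimeSpan baseDelay,
        CancellationToken cancellationToken = default)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken).ConfigureAwait(false);
            }
            catch (HttpRequestException) when (attempt < maxRetryAttempts)
            {
                // Assumed backoff: baseDelay, then 2x, 4x, and so on.
                var delay = TimeSpan.FromMilliseconds(
                    baseDelay.TotalMilliseconds * Math.Pow(2, attempt));
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
```

With `maxRetryAttempts` set to 2 and an operation that always throws `HttpRequestException`, the operation runs three times before the exception reaches the caller, matching the `callCount` assertion in the test.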


@@ -1,4 +1,9 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Scheduler.ImpactIndex;
namespace StellaOps.Scheduler.Worker.Tests;
@@ -9,17 +14,20 @@ public sealed class ImpactTargetingServiceTests
{
var selector = new Selector(SelectorScope.AllImages, tenantId: "tenant-alpha");
var expected = CreateEmptyImpactSet(selector, usageOnly: false);
IEnumerable<string>? capturedKeys = null;
var mockIndex = new Mock<IImpactIndex>(MockBehavior.Strict);
mockIndex
.Setup(index => index.ResolveByPurlsAsync(
It.Is<IEnumerable<string>>(keys => keys.SequenceEqual(new[] { "pkg:npm/a", "pkg:npm/b" })),
false,
selector,
It.IsAny<CancellationToken>()))
.ReturnsAsync(expected);
var index = new StubImpactIndex
{
OnResolveByPurls = (purls, usageOnly, sel, _) =>
{
capturedKeys = purls.ToArray();
Assert.False(usageOnly);
Assert.Equal(selector, sel);
return ValueTask.FromResult(expected);
}
};
var service = new ImpactTargetingService(mockIndex.Object);
var service = new ImpactTargetingService(index);
var result = await service.ResolveByPurlsAsync(
new[] { "pkg:npm/a", "pkg:npm/A ", null!, "pkg:npm/b" },
@@ -27,21 +35,21 @@ public sealed class ImpactTargetingServiceTests
selector);
Assert.Equal(expected, result);
mockIndex.VerifyAll();
Assert.Equal(new[] { "pkg:npm/a", "pkg:npm/b" }, capturedKeys);
}
[Fact]
public async Task ResolveByVulnerabilitiesAsync_ReturnsEmptyWhenNoIds()
{
var selector = new Selector(SelectorScope.AllImages, tenantId: "tenant-alpha");
var mockIndex = new Mock<IImpactIndex>(MockBehavior.Strict);
var service = new ImpactTargetingService(mockIndex.Object);
var index = new StubImpactIndex();
var service = new ImpactTargetingService(index);
var result = await service.ResolveByVulnerabilitiesAsync(Array.Empty<string>(), usageOnly: true, selector);
Assert.Empty(result.Images);
Assert.True(result.UsageOnly);
mockIndex.Verify(index => index.ResolveByVulnerabilitiesAsync(It.IsAny<IEnumerable<string>>(), It.IsAny<bool>(), It.IsAny<Selector>(), It.IsAny<CancellationToken>()), Times.Never);
Assert.Null(index.LastVulnerabilityIds);
}
[Fact]
@@ -50,16 +58,20 @@ public sealed class ImpactTargetingServiceTests
var selector = new Selector(SelectorScope.AllImages, tenantId: "tenant-alpha");
var expected = CreateEmptyImpactSet(selector, usageOnly: true);
var mockIndex = new Mock<IImpactIndex>();
mockIndex
.Setup(index => index.ResolveAllAsync(selector, true, It.IsAny<CancellationToken>()))
.Returns(new ValueTask<ImpactSet>(expected));
var index = new StubImpactIndex
{
OnResolveAll = (sel, usageOnly, _) =>
{
Assert.Equal(selector, sel);
Assert.True(usageOnly);
return ValueTask.FromResult(expected);
}
};
var service = new ImpactTargetingService(mockIndex.Object);
var service = new ImpactTargetingService(index);
var result = await service.ResolveAllAsync(selector, usageOnly: true);
Assert.Equal(expected, result);
mockIndex.VerifyAll();
}
[Fact]
@@ -77,10 +89,7 @@ public sealed class ImpactTargetingServiceTests
namespaces: new[] { "team-a" },
tags: new[] { "v1" },
usedByEntrypoint: false,
labels: new[]
{
KeyValuePair.Create("env", "prod")
}),
labels: new[] { KeyValuePair.Create("env", "prod") }),
new ImpactImage(
"sha256:111",
"registry-1",
@@ -100,17 +109,12 @@ public sealed class ImpactTargetingServiceTests
snapshotId: "snap-1",
schemaVersion: SchedulerSchemaVersions.ImpactSet);
var mockIndex = new Mock<IImpactIndex>(MockBehavior.Strict);
mockIndex
.Setup(index => index.ResolveByPurlsAsync(
It.IsAny<IEnumerable<string>>(),
false,
selector,
It.IsAny<CancellationToken>()))
.ReturnsAsync(indexResult);
var service = new ImpactTargetingService(mockIndex.Object);
var index = new StubImpactIndex
{
OnResolveByPurls = (_, _, _, _) => ValueTask.FromResult(indexResult)
};
var service = new ImpactTargetingService(index);
var result = await service.ResolveByPurlsAsync(new[] { "pkg:npm/a" }, usageOnly: false, selector);
Assert.Single(result.Images);
@@ -163,16 +167,12 @@ public sealed class ImpactTargetingServiceTests
snapshotId: null,
schemaVersion: SchedulerSchemaVersions.ImpactSet);
var mockIndex = new Mock<IImpactIndex>(MockBehavior.Strict);
mockIndex
.Setup(index => index.ResolveByPurlsAsync(
It.IsAny<IEnumerable<string>>(),
true,
selector,
It.IsAny<CancellationToken>()))
.ReturnsAsync(indexResult);
var index = new StubImpactIndex
{
OnResolveByPurls = (_, _, _, _) => ValueTask.FromResult(indexResult)
};
var service = new ImpactTargetingService(mockIndex.Object);
var service = new ImpactTargetingService(index);
var result = await service.ResolveByPurlsAsync(new[] { "pkg:npm/a" }, usageOnly: true, selector);
Assert.Single(result.Images);
@@ -180,8 +180,7 @@ public sealed class ImpactTargetingServiceTests
}
private static ImpactSet CreateEmptyImpactSet(Selector selector, bool usageOnly)
{
return new ImpactSet(
=> new(
selector,
ImmutableArray<ImpactImage>.Empty,
usageOnly,
@@ -189,5 +188,30 @@ public sealed class ImpactTargetingServiceTests
0,
snapshotId: null,
schemaVersion: SchedulerSchemaVersions.ImpactSet);
private sealed class StubImpactIndex : IImpactIndex
{
public Func<IEnumerable<string>, bool, Selector, CancellationToken, ValueTask<ImpactSet>>? OnResolveByPurls { get; set; }
public Func<IEnumerable<string>, bool, Selector, CancellationToken, ValueTask<ImpactSet>>? OnResolveByVulnerabilities { get; set; }
public Func<Selector, bool, CancellationToken, ValueTask<ImpactSet>>? OnResolveAll { get; set; }
public IEnumerable<string>? LastVulnerabilityIds { get; private set; }
public ValueTask<ImpactSet> ResolveByPurlsAsync(IEnumerable<string> purls, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
=> OnResolveByPurls?.Invoke(purls, usageOnly, selector, cancellationToken)
?? ValueTask.FromResult(CreateEmptyImpactSet(selector, usageOnly));
public ValueTask<ImpactSet> ResolveByVulnerabilitiesAsync(IEnumerable<string> vulnerabilityIds, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
{
LastVulnerabilityIds = vulnerabilityIds;
return OnResolveByVulnerabilities?.Invoke(vulnerabilityIds, usageOnly, selector, cancellationToken)
?? ValueTask.FromResult(CreateEmptyImpactSet(selector, usageOnly));
}
public ValueTask<ImpactSet> ResolveAllAsync(Selector selector, bool usageOnly, CancellationToken cancellationToken = default)
=> OnResolveAll?.Invoke(selector, usageOnly, cancellationToken)
?? ValueTask.FromResult(CreateEmptyImpactSet(selector, usageOnly));
}
}
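
The first test above passes `"pkg:npm/a"`, `"pkg:npm/A "`, `null` and `"pkg:npm/b"` and expects the index to receive only `"pkg:npm/a"` and `"pkg:npm/b"`. The body of `ImpactTargetingService` is not part of this excerpt, so the following is only a minimal sketch of a normalization step that would satisfy that assertion. `PurlKeyNormalizer` and `Normalize` are hypothetical names, and the case-insensitive comparison is an assumption inferred from the test data.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: the real normalization inside ImpactTargetingService
// is not shown in this diff. This only reproduces what the test asserts.
public static class PurlKeyNormalizer
{
    // Trims each key, skips null or blank entries, and keeps the first
    // spelling of keys that differ only by case (assumed from the test data).
    public static string[] Normalize(IEnumerable<string> keys)
    {
        var seen = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
        var result = new List<string>();

        foreach (var key in keys)
        {
            if (string.IsNullOrWhiteSpace(key))
            {
                continue; // drops null, empty, and whitespace-only entries
            }

            var trimmed = key.Trim();
            if (seen.Add(trimmed))
            {
                result.Add(trimmed); // first occurrence wins, input order kept
            }
        }

        return result.ToArray();
    }
}
```

Keeping the first occurrence and preserving input order makes the captured keys deterministic, which is what lets the test compare against a fixed array.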


@@ -1,11 +1,16 @@
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Storage.Mongo.Projections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using MongoDB.Driver;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Storage.Mongo.Projections;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Worker.Options;
using StellaOps.Scheduler.Worker.Planning;
using StellaOps.Scheduler.Worker.Observability;
namespace StellaOps.Scheduler.Worker.Tests;
@@ -18,56 +23,34 @@ public sealed class PlannerExecutionServiceTests
var run = CreateRun(schedule.Id);
var impactSet = CreateImpactSet(schedule.Selection, images: 2);
var scheduleRepository = new Mock<IScheduleRepository>();
scheduleRepository
.Setup(repo => repo.GetAsync(run.TenantId, run.ScheduleId!, null, It.IsAny<CancellationToken>()))
.ReturnsAsync(schedule);
var scheduleRepository = new StubScheduleRepository(schedule);
var runRepository = new InMemoryRunRepository(run);
var snapshotRepository = new RecordingImpactSnapshotRepository();
var runSummaryService = new RecordingRunSummaryService();
var targetingService = new StubImpactTargetingService(impactSet);
var plannerQueue = new RecordingPlannerQueue();
var runRepository = new Mock<IRunRepository>();
runRepository
.Setup(repo => repo.UpdateAsync(It.IsAny<Run>(), null, It.IsAny<CancellationToken>()))
.ReturnsAsync(true);
using var metrics = new SchedulerWorkerMetrics();
var snapshotRepository = new Mock<IImpactSnapshotRepository>();
snapshotRepository
.Setup(repo => repo.UpsertAsync(It.IsAny<ImpactSet>(), null, It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
var runSummaryService = new Mock<IRunSummaryService>();
runSummaryService
.Setup(service => service.ProjectAsync(It.IsAny<Run>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(default(RunSummaryProjection)!));
var targetingService = new Mock<IImpactTargetingService>();
targetingService
.Setup(service => service.ResolveAllAsync(schedule.Selection, true, It.IsAny<CancellationToken>()))
.Returns(new ValueTask<ImpactSet>(impactSet));
var plannerQueue = new Mock<ISchedulerPlannerQueue>();
plannerQueue
.Setup(queue => queue.EnqueueAsync(It.IsAny<PlannerQueueMessage>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new SchedulerQueueEnqueueResult("msg-1", false));
var options = new SchedulerWorkerOptions();
var service = new PlannerExecutionService(
scheduleRepository.Object,
runRepository.Object,
snapshotRepository.Object,
runSummaryService.Object,
targetingService.Object,
plannerQueue.Object,
options,
scheduleRepository,
runRepository,
snapshotRepository,
runSummaryService,
targetingService,
plannerQueue,
new SchedulerWorkerOptions(),
TimeProvider.System,
CreateLogger());
metrics,
NullLogger<PlannerExecutionService>.Instance);
var result = await service.ProcessAsync(run, CancellationToken.None);
Assert.Equal(PlannerExecutionStatus.Enqueued, result.Status);
Assert.NotNull(result.UpdatedRun);
Assert.Single(plannerQueue.Messages);
Assert.NotNull(snapshotRepository.LastSnapshot);
Assert.Equal(RunState.Queued, result.UpdatedRun!.State);
Assert.Equal(impactSet.Images.Length, result.UpdatedRun.Stats.Queued);
plannerQueue.Verify(queue => queue.EnqueueAsync(It.IsAny<PlannerQueueMessage>(), It.IsAny<CancellationToken>()), Times.Once);
snapshotRepository.Verify(repo => repo.UpsertAsync(It.IsAny<ImpactSet>(), null, It.IsAny<CancellationToken>()), Times.Once);
}
[Fact]
@@ -77,96 +60,75 @@ public sealed class PlannerExecutionServiceTests
var run = CreateRun(schedule.Id);
var impactSet = CreateImpactSet(schedule.Selection, images: 0);
var scheduleRepository = new Mock<IScheduleRepository>();
scheduleRepository
.Setup(repo => repo.GetAsync(run.TenantId, run.ScheduleId!, null, It.IsAny<CancellationToken>()))
.ReturnsAsync(schedule);
var runRepository = new Mock<IRunRepository>();
runRepository
.Setup(repo => repo.UpdateAsync(It.IsAny<Run>(), null, It.IsAny<CancellationToken>()))
.ReturnsAsync(true);
var snapshotRepository = new Mock<IImpactSnapshotRepository>();
var runSummaryService = new Mock<IRunSummaryService>();
runSummaryService
.Setup(service => service.ProjectAsync(It.IsAny<Run>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(default(RunSummaryProjection)!));
var targetingService = new Mock<IImpactTargetingService>();
targetingService
.Setup(service => service.ResolveAllAsync(schedule.Selection, true, It.IsAny<CancellationToken>()))
.Returns(new ValueTask<ImpactSet>(impactSet));
var plannerQueue = new Mock<ISchedulerPlannerQueue>();
var options = new SchedulerWorkerOptions();
var service = new PlannerExecutionService(
scheduleRepository.Object,
runRepository.Object,
snapshotRepository.Object,
runSummaryService.Object,
targetingService.Object,
plannerQueue.Object,
options,
TimeProvider.System,
CreateLogger());
var service = CreateService(schedule, run, impactSet, out var plannerQueue);
var result = await service.ProcessAsync(run, CancellationToken.None);
Assert.Equal(PlannerExecutionStatus.CompletedWithoutWork, result.Status);
Assert.NotNull(result.UpdatedRun);
Assert.Equal(RunState.Completed, result.UpdatedRun!.State);
plannerQueue.Verify(queue => queue.EnqueueAsync(It.IsAny<PlannerQueueMessage>(), It.IsAny<CancellationToken>()), Times.Never);
Assert.Empty(plannerQueue.Messages);
}
[Fact]
public async Task ProcessAsync_WhenScheduleMissing_MarksRunAsFailed()
{
var run = CreateRun(scheduleId: "missing");
var scheduleRepository = new StubScheduleRepository(); // empty repository
var runRepository = new InMemoryRunRepository(run);
var snapshotRepository = new RecordingImpactSnapshotRepository();
var runSummaryService = new RecordingRunSummaryService();
var targetingService = new StubImpactTargetingService(CreateImpactSet(new Selector(SelectorScope.AllImages, run.TenantId), 0));
var plannerQueue = new RecordingPlannerQueue();
var scheduleRepository = new Mock<IScheduleRepository>();
scheduleRepository
.Setup(repo => repo.GetAsync(run.TenantId, run.ScheduleId!, null, It.IsAny<CancellationToken>()))
.ReturnsAsync((Schedule?)null);
var runRepository = new Mock<IRunRepository>();
runRepository
.Setup(repo => repo.UpdateAsync(It.IsAny<Run>(), null, It.IsAny<CancellationToken>()))
.ReturnsAsync(true);
var snapshotRepository = new Mock<IImpactSnapshotRepository>();
var runSummaryService = new Mock<IRunSummaryService>();
runSummaryService
.Setup(service => service.ProjectAsync(It.IsAny<Run>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(default(RunSummaryProjection)!));
var targetingService = new Mock<IImpactTargetingService>();
var plannerQueue = new Mock<ISchedulerPlannerQueue>();
using var metrics = new SchedulerWorkerMetrics();
var service = new PlannerExecutionService(
scheduleRepository.Object,
runRepository.Object,
snapshotRepository.Object,
runSummaryService.Object,
targetingService.Object,
plannerQueue.Object,
scheduleRepository,
runRepository,
snapshotRepository,
runSummaryService,
targetingService,
plannerQueue,
new SchedulerWorkerOptions(),
TimeProvider.System,
CreateLogger());
metrics,
NullLogger<PlannerExecutionService>.Instance);
var result = await service.ProcessAsync(run, CancellationToken.None);
Assert.Equal(PlannerExecutionStatus.Failed, result.Status);
Assert.NotNull(result.UpdatedRun);
Assert.Equal(RunState.Error, result.UpdatedRun!.State);
targetingService.Verify(service => service.ResolveAllAsync(It.IsAny<Selector>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Never);
plannerQueue.Verify(queue => queue.EnqueueAsync(It.IsAny<PlannerQueueMessage>(), It.IsAny<CancellationToken>()), Times.Never);
Assert.Empty(plannerQueue.Messages);
}
private static PlannerExecutionService CreateService(
Schedule schedule,
Run run,
ImpactSet impactSet,
out RecordingPlannerQueue plannerQueue)
{
var scheduleRepository = new StubScheduleRepository(schedule);
var runRepository = new InMemoryRunRepository(run);
var snapshotRepository = new RecordingImpactSnapshotRepository();
var runSummaryService = new RecordingRunSummaryService();
var targetingService = new StubImpactTargetingService(impactSet);
plannerQueue = new RecordingPlannerQueue();
return new PlannerExecutionService(
scheduleRepository,
runRepository,
snapshotRepository,
runSummaryService,
targetingService,
plannerQueue,
new SchedulerWorkerOptions(),
TimeProvider.System,
new SchedulerWorkerMetrics(),
NullLogger<PlannerExecutionService>.Instance);
}
private static Run CreateRun(string scheduleId)
{
return new Run(
=> new(
id: "run_001",
tenantId: "tenant-alpha",
trigger: RunTrigger.Cron,
@@ -174,11 +136,9 @@ public sealed class PlannerExecutionServiceTests
stats: RunStats.Empty,
createdAt: DateTimeOffset.UtcNow.AddMinutes(-5),
scheduleId: scheduleId);
}
private static Schedule CreateSchedule()
{
return new Schedule(
=> new(
id: "sch_001",
tenantId: "tenant-alpha",
name: "Nightly",
@@ -195,7 +155,6 @@ public sealed class PlannerExecutionServiceTests
updatedAt: DateTimeOffset.UtcNow.AddHours(-1),
updatedBy: "system",
subscribers: ImmutableArray<string>.Empty);
}
private static ImpactSet CreateImpactSet(Selector selector, int images)
{
@@ -219,8 +178,144 @@ public sealed class PlannerExecutionServiceTests
schemaVersion: SchedulerSchemaVersions.ImpactSet);
}
private static ILogger<PlannerExecutionService> CreateLogger()
private sealed class StubScheduleRepository : IScheduleRepository
{
return LoggerFactory.Create(builder => { }).CreateLogger<PlannerExecutionService>();
private readonly Dictionary<(string TenantId, string ScheduleId), Schedule> _store;
public StubScheduleRepository(params Schedule[] schedules)
{
_store = schedules.ToDictionary(schedule => (schedule.TenantId, schedule.Id), schedule => schedule);
}
public Task UpsertAsync(Schedule schedule, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_store[(schedule.TenantId, schedule.Id)] = schedule;
return Task.CompletedTask;
}
public Task<Schedule?> GetAsync(string tenantId, string scheduleId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_store.TryGetValue((tenantId, scheduleId), out var schedule);
return Task.FromResult(schedule);
}
public Task<IReadOnlyList<Schedule>> ListAsync(string tenantId, ScheduleQueryOptions? options = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<Schedule>>(_store.Values.Where(schedule => schedule.TenantId == tenantId).ToArray());
public Task<bool> SoftDeleteAsync(string tenantId, string scheduleId, string deletedBy, DateTimeOffset deletedAt, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult(_store.Remove((tenantId, scheduleId)));
}
private sealed class InMemoryRunRepository : IRunRepository
{
private readonly ConcurrentDictionary<(string Tenant, string RunId), Run> _runs = new();
public InMemoryRunRepository(params Run[] runs)
{
foreach (var run in runs)
{
_runs[(run.TenantId, run.Id)] = run;
}
}
public Task InsertAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_runs[(run.TenantId, run.Id)] = run;
return Task.CompletedTask;
}
public Task<bool> UpdateAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_runs[(run.TenantId, run.Id)] = run;
return Task.FromResult(true);
}
public Task<Run?> GetAsync(string tenantId, string runId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_runs.TryGetValue((tenantId, runId), out var run);
return Task.FromResult(run);
}
public Task<IReadOnlyList<Run>> ListAsync(string tenantId, RunQueryOptions? options = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<Run>>(_runs.Values.Where(run => run.TenantId == tenantId).ToArray());
public Task<IReadOnlyList<Run>> ListByStateAsync(RunState state, int limit = 50, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<Run>>(_runs.Values.Where(run => run.State == state).Take(limit).ToArray());
}
private sealed class RecordingImpactSnapshotRepository : IImpactSnapshotRepository
{
public ImpactSet? LastSnapshot { get; private set; }
public Task UpsertAsync(ImpactSet snapshot, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
LastSnapshot = snapshot;
return Task.CompletedTask;
}
public Task<ImpactSet?> GetBySnapshotIdAsync(string snapshotId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<ImpactSet?>(null);
public Task<ImpactSet?> GetLatestBySelectorAsync(Selector selector, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<ImpactSet?>(null);
}
private sealed class RecordingRunSummaryService : IRunSummaryService
{
public Run? LastRun { get; private set; }
public Task<RunSummaryProjection> ProjectAsync(Run run, CancellationToken cancellationToken = default)
{
LastRun = run;
return Task.FromResult(new RunSummaryProjection(
run.TenantId,
run.ScheduleId ?? string.Empty,
DateTimeOffset.UtcNow,
null,
ImmutableArray<RunSummarySnapshot>.Empty,
new RunSummaryCounters(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)));
}
public Task<RunSummaryProjection?> GetAsync(string tenantId, string scheduleId, CancellationToken cancellationToken = default)
=> Task.FromResult<RunSummaryProjection?>(null);
public Task<IReadOnlyList<RunSummaryProjection>> ListAsync(string tenantId, CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<RunSummaryProjection>>(Array.Empty<RunSummaryProjection>());
}
private sealed class StubImpactTargetingService : IImpactTargetingService
{
private readonly ImpactSet _result;
public StubImpactTargetingService(ImpactSet result)
{
_result = result;
}
public ValueTask<ImpactSet> ResolveByPurlsAsync(IEnumerable<string> productKeys, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
=> new(_result);
public ValueTask<ImpactSet> ResolveByVulnerabilitiesAsync(IEnumerable<string> vulnerabilityIds, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
=> new(_result);
public ValueTask<ImpactSet> ResolveAllAsync(Selector selector, bool usageOnly, CancellationToken cancellationToken = default)
=> new(_result);
}
private sealed class RecordingPlannerQueue : ISchedulerPlannerQueue
{
public List<PlannerQueueMessage> Messages { get; } = new();
public ValueTask<SchedulerQueueEnqueueResult> EnqueueAsync(PlannerQueueMessage message, CancellationToken cancellationToken = default)
{
Messages.Add(message);
return ValueTask.FromResult(new SchedulerQueueEnqueueResult(Guid.NewGuid().ToString(), Deduplicated: false));
}
public ValueTask<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>> LeaseAsync(SchedulerQueueLeaseRequest request, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
public ValueTask<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>> ClaimExpiredAsync(SchedulerQueueClaimOptions options, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
}
}
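
The three `ProcessAsync` tests above pin down three outcomes: a run with impacted images is enqueued, a run with an empty impact set completes without work, and a run whose schedule cannot be found fails. The sketch below restates only that decision table. The type and member names are hypothetical, and the actual `PlannerExecutionService` (not shown in this excerpt) also persists the run, stores the snapshot, and records metrics.

```csharp
using System;

// Hypothetical names throughout; this mirrors the asserted outcomes only.
public enum PlannerOutcomeSketch
{
    Enqueued,
    CompletedWithoutWork,
    Failed
}

public static class PlannerOutcomeRules
{
    // A missing schedule fails the run, an empty impact set completes it
    // without queueing anything, and any other case is enqueued.
    public static PlannerOutcomeSketch Decide(bool scheduleFound, int impactedImageCount)
    {
        if (impactedImageCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(impactedImageCount));
        }

        if (!scheduleFound)
        {
            return PlannerOutcomeSketch.Failed;
        }

        return impactedImageCount == 0
            ? PlannerOutcomeSketch.CompletedWithoutWork
            : PlannerOutcomeSketch.Enqueued;
    }
}
```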


@@ -0,0 +1,170 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Worker.Planning;
using StellaOps.Scheduler.Worker.Options;
namespace StellaOps.Scheduler.Worker.Tests;
public sealed class PlannerQueueDispatchServiceTests
{
[Fact]
public async Task DispatchAsync_EnqueuesRunnerSegmentsDeterministically()
{
var run = CreateRun();
var schedule = CreateSchedule(parallelism: 2, maxJobs: 4, ratePerSecond: 11);
var impactSet = CreateImpactSet(run.TenantId, count: 5);
var message = new PlannerQueueMessage(run, impactSet, schedule, correlationId: "corr-123");
var shardPlanner = new ImpactShardPlanner();
var runnerQueue = new RecordingRunnerQueue();
var service = new PlannerQueueDispatchService(
shardPlanner,
runnerQueue,
new SchedulerWorkerOptions(),
NullLogger<PlannerQueueDispatchService>.Instance);
var result = await service.DispatchAsync(message, CancellationToken.None);
Assert.Equal(PlannerQueueDispatchStatus.DispatchCompleted, result.Status);
Assert.Equal(2, result.SegmentCount);
Assert.Equal(4, runnerQueue.Messages.Sum(msg => msg.ImageDigests.Count));
Assert.All(runnerQueue.Messages, msg => Assert.Equal(run.Id, msg.RunId));
Assert.All(runnerQueue.Messages, msg => Assert.Equal(run.TenantId, msg.TenantId));
Assert.All(runnerQueue.Messages, msg => Assert.Equal(run.ScheduleId, msg.ScheduleId));
Assert.All(runnerQueue.Messages, msg => Assert.Equal(impactSet.UsageOnly, msg.UsageOnly));
Assert.All(runnerQueue.Messages, msg => Assert.Equal(11, msg.RatePerSecond));
Assert.Collection(
runnerQueue.Messages.OrderBy(msg => msg.SegmentId),
first =>
{
Assert.Equal($"{run.Id}:0000", first.SegmentId);
Assert.Equal(2, first.ImageDigests.Count);
},
second =>
{
Assert.Equal($"{run.Id}:0001", second.SegmentId);
Assert.Equal(2, second.ImageDigests.Count);
});
}
[Fact]
public async Task DispatchAsync_NoImages_ReturnsNoWork()
{
var run = CreateRun();
var schedule = CreateSchedule();
var impactSet = new ImpactSet(
new Selector(SelectorScope.AllImages, run.TenantId),
ImmutableArray<ImpactImage>.Empty,
usageOnly: true,
DateTimeOffset.UtcNow,
total: 0,
snapshotId: null,
schemaVersion: SchedulerSchemaVersions.ImpactSet);
var message = new PlannerQueueMessage(run, impactSet, schedule);
var shardPlanner = new StubImpactShardPlanner(ImmutableArray<ImpactShard>.Empty);
var runnerQueue = new RecordingRunnerQueue();
var service = new PlannerQueueDispatchService(
shardPlanner,
runnerQueue,
new SchedulerWorkerOptions(),
NullLogger<PlannerQueueDispatchService>.Instance);
var result = await service.DispatchAsync(message, CancellationToken.None);
Assert.Equal(PlannerQueueDispatchStatus.NoWork, result.Status);
Assert.Empty(runnerQueue.Messages);
}
private static Run CreateRun()
=> new(
id: "run-123",
tenantId: "tenant-abc",
trigger: RunTrigger.Cron,
state: RunState.Queued,
stats: new RunStats(candidates: 6, deduped: 5, queued: 5),
createdAt: DateTimeOffset.UtcNow.AddMinutes(-5),
scheduleId: "sched-789");
private static Schedule CreateSchedule(int? parallelism = null, int? maxJobs = null, int? ratePerSecond = null)
=> new(
id: "sched-789",
tenantId: "tenant-abc",
name: "Nightly",
enabled: true,
cronExpression: "0 2 * * *",
timezone: "UTC",
mode: ScheduleMode.AnalysisOnly,
selection: new Selector(SelectorScope.AllImages, tenantId: "tenant-abc"),
onlyIf: ScheduleOnlyIf.Default,
notify: ScheduleNotify.Default,
limits: new ScheduleLimits(maxJobs, ratePerSecond, parallelism),
createdAt: DateTimeOffset.UtcNow.AddDays(-1),
createdBy: "system",
updatedAt: DateTimeOffset.UtcNow.AddHours(-1),
updatedBy: "system",
subscribers: ImmutableArray<string>.Empty);
private static ImpactSet CreateImpactSet(string tenantId, int count)
{
var selector = new Selector(SelectorScope.AllImages, tenantId);
var images = Enumerable.Range(0, count)
.Select(index => new ImpactImage(
imageDigest: $"sha256:{index:D64}",
registry: "registry.example.com",
repository: "service/api",
namespaces: new[] { "team-a" },
tags: new[] { $"v{index}" },
usedByEntrypoint: index % 2 == 0))
.ToImmutableArray();
return new ImpactSet(
selector,
images,
usageOnly: true,
generatedAt: DateTimeOffset.UtcNow.AddMinutes(-2),
total: count,
snapshotId: "snapshot-xyz",
schemaVersion: SchedulerSchemaVersions.ImpactSet);
}
private sealed class RecordingRunnerQueue : ISchedulerRunnerQueue
{
public List<RunnerSegmentQueueMessage> Messages { get; } = new();
public ValueTask<SchedulerQueueEnqueueResult> EnqueueAsync(RunnerSegmentQueueMessage message, CancellationToken cancellationToken = default)
{
Messages.Add(message);
return ValueTask.FromResult(new SchedulerQueueEnqueueResult(Guid.NewGuid().ToString(), Deduplicated: false));
}
public ValueTask<IReadOnlyList<ISchedulerQueueLease<RunnerSegmentQueueMessage>>> LeaseAsync(
SchedulerQueueLeaseRequest request,
CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
public ValueTask<IReadOnlyList<ISchedulerQueueLease<RunnerSegmentQueueMessage>>> ClaimExpiredAsync(
SchedulerQueueClaimOptions options,
CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
}
private sealed class StubImpactShardPlanner : IImpactShardPlanner
{
private readonly ImmutableArray<ImpactShard> _result;
public StubImpactShardPlanner(ImmutableArray<ImpactShard> result)
{
_result = result;
}
public ImmutableArray<ImpactShard> PlanShards(ImpactSet impactSet, int? maxJobs, int? parallelism) => _result;
}
}
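
`DispatchAsync_EnqueuesRunnerSegmentsDeterministically` expects five impacted images with `maxJobs: 4` and `parallelism: 2` to become two segments of two digests each, with IDs `run-123:0000` and `run-123:0001`. The real `ImpactShardPlanner` is not shown in this excerpt, so the following is a sketch of one capping and splitting scheme that yields those numbers. `SegmentSketch` and `SegmentPlannerSketch` are hypothetical, and the even contiguous split is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types; the real planner and queue message types differ.
public sealed record SegmentSketch(string SegmentId, IReadOnlyList<string> ImageDigests);

public static class SegmentPlannerSketch
{
    // Caps the digest list at maxJobs, then splits it into at most
    // `parallelism` contiguous segments whose sizes differ by at most one.
    public static IReadOnlyList<SegmentSketch> Plan(
        string runId,
        IReadOnlyList<string> digests,
        int? maxJobs,
        int? parallelism)
    {
        var capped = maxJobs is > 0
            ? digests.Take(maxJobs.Value).ToList()
            : digests.ToList();

        if (capped.Count == 0)
        {
            return Array.Empty<SegmentSketch>();
        }

        var shardCount = Math.Min(parallelism is > 0 ? parallelism.Value : 1, capped.Count);
        var baseSize = capped.Count / shardCount;
        var remainder = capped.Count % shardCount;

        var segments = new List<SegmentSketch>(shardCount);
        var offset = 0;
        for (var i = 0; i < shardCount; i++)
        {
            // The first `remainder` segments absorb one extra digest each.
            var size = baseSize + (i < remainder ? 1 : 0);
            segments.Add(new SegmentSketch($"{runId}:{i:D4}", capped.GetRange(offset, size)));
            offset += size;
        }

        return segments;
    }
}
```

The zero-padded index in the segment ID keeps ordinal string ordering aligned with numeric ordering, which is why the test can sort by `SegmentId` before asserting on each segment.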


@@ -0,0 +1,327 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Storage.Mongo.Projections;
using StellaOps.Scheduler.Worker.Events;
using StellaOps.Scheduler.Worker.Execution;
using StellaOps.Scheduler.Worker.Observability;
namespace StellaOps.Scheduler.Worker.Tests;
public sealed class RunnerExecutionServiceTests
{
[Fact]
public async Task ExecuteAsync_UpdatesRunStatsAndDeltas()
{
var run = CreateRun();
var repository = new InMemoryRunRepository(run);
var summaryService = new RecordingRunSummaryService();
var impactRepository = new InMemoryImpactSnapshotRepository(run.Id,
new[]
{
CreateImpactImage("sha256:1111111111111111111111111111111111111111111111111111111111111111", "registry-1", "repo-1"),
CreateImpactImage("sha256:2222222222222222222222222222222222222222222222222222222222222222", "registry-1", "repo-2")
});
var scannerClient = new StubScannerReportClient(new Dictionary<string, RunnerImageResult>
{
["sha256:1111111111111111111111111111111111111111111111111111111111111111"] = CreateRunnerImageResult(
"sha256:1111111111111111111111111111111111111111111111111111111111111111",
new DeltaSummary(
"sha256:1111111111111111111111111111111111111111111111111111111111111111",
newFindings: 2,
newCriticals: 1,
newHigh: 0,
newMedium: 1,
newLow: 0,
kevHits: ImmutableArray.Create("CVE-2025-0001"),
topFindings: ImmutableArray.Create(new DeltaFinding("pkg:purl", "CVE-2025-0001", SeverityRank.Critical)),
reportUrl: "https://scanner/reports/1",
attestation: null,
detectedAt: DateTimeOffset.UtcNow)),
["sha256:2222222222222222222222222222222222222222222222222222222222222222"] = CreateRunnerImageResult(
"sha256:2222222222222222222222222222222222222222222222222222222222222222",
delta: null)
});
var eventPublisher = new RecordingSchedulerEventPublisher();
using var metrics = new SchedulerWorkerMetrics();
var service = new RunnerExecutionService(
repository,
summaryService,
impactRepository,
scannerClient,
eventPublisher,
metrics,
TimeProvider.System,
NullLogger<RunnerExecutionService>.Instance);
var message = new RunnerSegmentQueueMessage(
segmentId: "run-123:0000",
runId: run.Id,
tenantId: run.TenantId,
imageDigests: new[]
{
"sha256:1111111111111111111111111111111111111111111111111111111111111111",
"sha256:2222222222222222222222222222222222222222222222222222222222222222"
},
scheduleId: run.ScheduleId,
ratePerSecond: null,
usageOnly: true,
attributes: new Dictionary<string, string>
{
["scheduleMode"] = ScheduleMode.AnalysisOnly.ToString(),
["impactSnapshotId"] = $"impact::{run.Id}"
},
correlationId: "corr-xyz");
var result = await service.ExecuteAsync(message, CancellationToken.None);
Assert.Equal(RunnerSegmentExecutionStatus.Completed, result.Status);
Assert.True(result.RunCompleted);
Assert.Equal(1, result.DeltaImages);
var persisted = repository.GetSnapshot(run.TenantId, run.Id);
Assert.NotNull(persisted);
Assert.Equal(2, persisted!.Stats.Completed);
Assert.Equal(1, persisted.Stats.Deltas);
Assert.Equal(1, persisted.Stats.NewCriticals);
Assert.Equal(1, persisted.Stats.NewMedium);
Assert.Contains(persisted.Deltas, delta => delta.ImageDigest == "sha256:1111111111111111111111111111111111111111111111111111111111111111");
Assert.Equal(persisted, summaryService.LastProjected);
Assert.Equal(2, eventPublisher.ReportReady.Count);
Assert.Single(eventPublisher.RescanDeltaPayloads);
}
[Fact]
public async Task ExecuteAsync_WhenRunMissing_ReturnsRunMissing()
{
var repository = new InMemoryRunRepository();
var impactRepository = new InMemoryImpactSnapshotRepository("run-123", Array.Empty<ImpactImage>());
var eventPublisher = new RecordingSchedulerEventPublisher();
using var metrics = new SchedulerWorkerMetrics();
var service = new RunnerExecutionService(
repository,
new RecordingRunSummaryService(),
impactRepository,
new StubScannerReportClient(new Dictionary<string, RunnerImageResult>()),
eventPublisher,
metrics,
TimeProvider.System,
NullLogger<RunnerExecutionService>.Instance);
var message = new RunnerSegmentQueueMessage(
segmentId: "run-123:0000",
runId: "run-123",
tenantId: "tenant-abc",
imageDigests: new[] { "sha256:3333333333333333333333333333333333333333333333333333333333333333" },
scheduleId: "sched-1",
ratePerSecond: null,
usageOnly: true,
attributes: new Dictionary<string, string>(),
correlationId: null);
var result = await service.ExecuteAsync(message, CancellationToken.None);
Assert.Equal(RunnerSegmentExecutionStatus.RunMissing, result.Status);
}
private static Run CreateRun()
=> new(
id: "run-123",
tenantId: "tenant-abc",
trigger: RunTrigger.Cron,
state: RunState.Queued,
stats: new RunStats(
candidates: 4,
deduped: 4,
queued: 2,
completed: 0,
deltas: 0,
newCriticals: 0,
newHigh: 0,
newMedium: 0,
newLow: 0),
createdAt: DateTimeOffset.UtcNow.AddMinutes(-10),
scheduleId: "sched-1");
private static ImpactImage CreateImpactImage(string digest, string registry, string repository)
=> new(
imageDigest: digest,
registry: registry,
repository: repository,
namespaces: null,
tags: null,
usedByEntrypoint: false,
labels: null);
private static RunnerImageResult CreateRunnerImageResult(string digest, DeltaSummary? delta)
{
var newTotal = delta?.NewFindings ?? 0;
var summary = new RunnerReportSummary(
Total: newTotal,
Blocked: delta?.NewCriticals ?? 0,
Warned: delta?.NewHigh ?? 0,
Ignored: delta?.NewLow ?? 0,
Quieted: 0);
var snapshot = new RunnerReportSnapshot(
ReportId: $"report-{digest[^4..]}",
ImageDigest: digest,
Verdict: "warn",
GeneratedAt: DateTimeOffset.UtcNow,
Summary: summary,
PolicyRevisionId: "pol-rev",
PolicyDigest: "pol-digest");
return new RunnerImageResult(
digest,
delta,
ContentRefreshed: false,
snapshot,
Dsse: null);
}
private sealed class InMemoryRunRepository : IRunRepository
{
private readonly ConcurrentDictionary<(string Tenant, string RunId), Run> _runs = new();
public InMemoryRunRepository(params Run[] runs)
{
foreach (var run in runs)
{
_runs[(run.TenantId, run.Id)] = run;
}
}
public Task InsertAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_runs[(run.TenantId, run.Id)] = run;
return Task.CompletedTask;
}
public Task<bool> UpdateAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_runs[(run.TenantId, run.Id)] = run;
return Task.FromResult(true);
}
public Task<Run?> GetAsync(string tenantId, string runId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
{
_runs.TryGetValue((tenantId, runId), out var run);
return Task.FromResult(run);
}
public Task<IReadOnlyList<Run>> ListAsync(string tenantId, RunQueryOptions? options = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<Run>>(_runs.Values.Where(run => run.TenantId == tenantId).ToArray());
public Task<IReadOnlyList<Run>> ListByStateAsync(RunState state, int limit = 50, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<Run>>(_runs.Values.Where(run => run.State == state).Take(limit).ToArray());
public Run? GetSnapshot(string tenantId, string runId)
{
_runs.TryGetValue((tenantId, runId), out var run);
return run;
}
}
private sealed class InMemoryImpactSnapshotRepository : IImpactSnapshotRepository
{
private readonly string _snapshotId;
private readonly ImpactSet _snapshot;
public InMemoryImpactSnapshotRepository(string runId, IEnumerable<ImpactImage> images)
{
_snapshotId = $"impact::{runId}";
var imageArray = images.ToImmutableArray();
_snapshot = new ImpactSet(
new Selector(SelectorScope.AllImages, "tenant-abc"),
imageArray,
usageOnly: true,
generatedAt: DateTimeOffset.UtcNow,
total: imageArray.Length);
}
public Task UpsertAsync(ImpactSet snapshot, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.CompletedTask;
public Task<ImpactSet?> GetBySnapshotIdAsync(string snapshotId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<ImpactSet?>(string.Equals(snapshotId, _snapshotId, StringComparison.Ordinal) ? _snapshot : null);
public Task<ImpactSet?> GetLatestBySelectorAsync(Selector selector, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
=> Task.FromResult<ImpactSet?>(_snapshot);
}
private sealed class RecordingSchedulerEventPublisher : ISchedulerEventPublisher
{
public List<(Run run, RunnerImageResult result)> ReportReady { get; } = new();
public List<(Run run, IReadOnlyList<DeltaSummary> deltas)> RescanDeltaPayloads { get; } = new();
public Task PublishReportReadyAsync(Run run, RunnerSegmentQueueMessage message, RunnerImageResult result, ImpactImage? impactImage, CancellationToken cancellationToken)
{
ReportReady.Add((run, result));
return Task.CompletedTask;
}
public Task PublishRescanDeltaAsync(Run run, RunnerSegmentQueueMessage message, IReadOnlyList<DeltaSummary> deltas, IReadOnlyDictionary<string, ImpactImage> impactLookup, CancellationToken cancellationToken)
{
RescanDeltaPayloads.Add((run, deltas));
return Task.CompletedTask;
}
}
private sealed class RecordingRunSummaryService : IRunSummaryService
{
public Run? LastProjected { get; private set; }
public Task<RunSummaryProjection> ProjectAsync(Run run, CancellationToken cancellationToken = default)
{
LastProjected = run;
return Task.FromResult(new RunSummaryProjection(
run.TenantId,
run.ScheduleId ?? string.Empty,
DateTimeOffset.UtcNow,
null,
ImmutableArray<RunSummarySnapshot>.Empty,
new RunSummaryCounters(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)));
}
public Task<RunSummaryProjection?> GetAsync(string tenantId, string scheduleId, CancellationToken cancellationToken = default)
=> Task.FromResult<RunSummaryProjection?>(null);
public Task<IReadOnlyList<RunSummaryProjection>> ListAsync(string tenantId, CancellationToken cancellationToken = default)
=> Task.FromResult<IReadOnlyList<RunSummaryProjection>>(Array.Empty<RunSummaryProjection>());
}
private sealed class StubScannerReportClient : IScannerReportClient
{
private readonly IReadOnlyDictionary<string, RunnerImageResult> _responses;
public StubScannerReportClient(IReadOnlyDictionary<string, RunnerImageResult> responses)
{
_responses = responses;
}
public Task<RunnerImageResult> ExecuteAsync(ScannerReportRequest request, CancellationToken cancellationToken = default)
{
if (_responses.TryGetValue(request.ImageDigest, out var result))
{
return Task.FromResult(result);
}
return Task.FromResult(CreateRunnerImageResult(request.ImageDigest, delta: null));
}
}
}


@@ -0,0 +1,140 @@
using System;
using System.Collections.Generic;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.Notify.Models;
using StellaOps.Notify.Queue;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Worker.Events;
using StellaOps.Scheduler.Worker.Execution;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests;
public sealed class SchedulerEventPublisherTests
{
[Fact]
public async Task PublishReportReadyAsync_EnqueuesNotifyEvent()
{
var queue = new RecordingNotifyEventQueue();
var options = new NotifyEventQueueOptions();
var publisher = new SchedulerEventPublisher(queue, options, TimeProvider.System, NullLogger<SchedulerEventPublisher>.Instance);
var run = CreateRun();
var message = CreateMessage(run);
var delta = new DeltaSummary(
run.Id,
newFindings: 2,
newCriticals: 1,
newHigh: 1,
newMedium: 0,
newLow: 0);
var result = CreateRunnerImageResult(run.Id, delta);
var impact = new ImpactImage(run.Id, "registry", "repository");
await publisher.PublishReportReadyAsync(run, message, result, impact, CancellationToken.None);
Assert.Single(queue.Messages);
var notifyEvent = queue.Messages[0].Event;
Assert.Equal(NotifyEventKinds.ScannerReportReady, notifyEvent.Kind);
Assert.Equal(run.TenantId, notifyEvent.Tenant);
Assert.NotNull(notifyEvent.Scope);
Assert.Equal("repository", notifyEvent.Scope!.Repo);
var payload = Assert.IsType<JsonObject>(notifyEvent.Payload);
Assert.Equal(result.Report.ReportId, payload["reportId"]!.GetValue<string>());
Assert.Equal("warn", payload["verdict"]!.GetValue<string>());
var deltaNode = Assert.IsType<JsonObject>(payload["delta"]);
Assert.Equal(1, deltaNode["newCritical"]!.GetValue<int>());
}
[Fact]
public async Task PublishRescanDeltaAsync_EnqueuesDeltaEvent()
{
var queue = new RecordingNotifyEventQueue();
var options = new NotifyEventQueueOptions();
var publisher = new SchedulerEventPublisher(queue, options, TimeProvider.System, NullLogger<SchedulerEventPublisher>.Instance);
var run = CreateRun();
var message = CreateMessage(run);
var delta = new DeltaSummary(run.Id, 1, 1, 0, 0, 0);
var impactLookup = new Dictionary<string, ImpactImage>
{
[run.Id] = new ImpactImage(run.Id, "registry", "repository")
};
await publisher.PublishRescanDeltaAsync(run, message, new[] { delta }, impactLookup, CancellationToken.None);
Assert.Single(queue.Messages);
var notifyEvent = queue.Messages[0].Event;
Assert.Equal(NotifyEventKinds.SchedulerRescanDelta, notifyEvent.Kind);
var payload = Assert.IsType<JsonObject>(notifyEvent.Payload);
var digests = Assert.IsType<JsonArray>(payload["impactedDigests"]);
Assert.Equal(run.Id, digests[0]!.GetValue<string>());
}
// The run ID doubles as the image digest throughout these tests, so one constant covers both.
private const string SampleDigest = "sha256:1111111111111111111111111111111111111111111111111111111111111111";
private static Run CreateRun()
=> new(
id: SampleDigest,
tenantId: "tenant-1",
trigger: RunTrigger.Cron,
state: RunState.Running,
stats: new RunStats(queued: 1, completed: 0),
createdAt: DateTimeOffset.UtcNow,
scheduleId: "schedule-1");
private static RunnerSegmentQueueMessage CreateMessage(Run run)
=> new(
segmentId: $"{run.Id}:0000",
runId: run.Id,
tenantId: run.TenantId,
imageDigests: new[] { run.Id },
scheduleId: run.ScheduleId,
ratePerSecond: null,
usageOnly: true,
attributes: new Dictionary<string, string>(StringComparer.Ordinal)
{
["scheduleMode"] = ScheduleMode.AnalysisOnly.ToString()
});
private static RunnerImageResult CreateRunnerImageResult(string digest, DeltaSummary? delta)
{
var summary = new RunnerReportSummary(
Total: delta?.NewFindings ?? 0,
Blocked: delta?.NewCriticals ?? 0,
Warned: delta?.NewHigh ?? 0,
Ignored: delta?.NewLow ?? 0,
Quieted: 0);
var snapshot = new RunnerReportSnapshot(
ReportId: $"report-{digest[^4..]}",
ImageDigest: digest,
Verdict: "warn",
GeneratedAt: DateTimeOffset.UtcNow,
Summary: summary,
PolicyRevisionId: null,
PolicyDigest: null);
return new RunnerImageResult(digest, delta, ContentRefreshed: false, snapshot, Dsse: null);
}
private sealed class RecordingNotifyEventQueue : INotifyEventQueue
{
public List<NotifyQueueEventMessage> Messages { get; } = new();
public ValueTask<NotifyQueueEnqueueResult> PublishAsync(NotifyQueueEventMessage message, CancellationToken cancellationToken = default)
{
Messages.Add(message);
return ValueTask.FromResult(new NotifyQueueEnqueueResult(Guid.NewGuid().ToString("N"), false));
}
public ValueTask<IReadOnlyList<INotifyQueueLease<NotifyQueueEventMessage>>> LeaseAsync(NotifyQueueLeaseRequest request, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
public ValueTask<IReadOnlyList<INotifyQueueLease<NotifyQueueEventMessage>>> ClaimExpiredAsync(NotifyQueueClaimOptions options, CancellationToken cancellationToken = default)
=> throw new NotSupportedException();
}
}


@@ -3,15 +3,19 @@
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<UseConcelierTestInfra>false</UseConcelierTestInfra>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.0" />
<PackageReference Include="Moq" Version="4.20.70" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0-rc.2.25502.107" />
<PackageReference Include="MongoDB.Driver" Version="3.5.0" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../StellaOps.Scheduler.Worker/StellaOps.Scheduler.Worker.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.Models/StellaOps.Scheduler.Models.csproj" />
<ProjectReference Include="../StellaOps.Notify.Models/StellaOps.Notify.Models.csproj" />
<ProjectReference Include="../StellaOps.Notify.Queue/StellaOps.Notify.Queue.csproj" />
</ItemGroup>
</Project>


@@ -0,0 +1,59 @@
using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StellaOps.Notify.Queue;
using StellaOps.Scheduler.Worker.Events;
using StellaOps.Scheduler.Worker.Execution;
using StellaOps.Scheduler.Worker.Options;
using StellaOps.Scheduler.Worker.Observability;
using StellaOps.Scheduler.Worker.Planning;
namespace StellaOps.Scheduler.Worker.DependencyInjection;
public static class SchedulerWorkerServiceCollectionExtensions
{
public static IServiceCollection AddSchedulerWorker(this IServiceCollection services, IConfiguration configuration)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configuration);
services
.AddOptions<SchedulerWorkerOptions>()
.Bind(configuration)
.PostConfigure(options => options.Validate());
services.AddSingleton(TimeProvider.System);
services.AddSingleton<SchedulerWorkerMetrics>();
services.AddSingleton<IImpactTargetingService, ImpactTargetingService>();
services.AddSingleton<IImpactShardPlanner, ImpactShardPlanner>();
services.AddSingleton<IPlannerQueueDispatchService, PlannerQueueDispatchService>();
services.AddSingleton<PlannerExecutionService>();
services.AddSingleton<IRunnerExecutionService, RunnerExecutionService>();
services.AddSingleton<ISchedulerEventPublisher>(sp =>
{
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
var queue = sp.GetService<INotifyEventQueue>();
var queueOptions = sp.GetService<NotifyEventQueueOptions>();
var timeProvider = sp.GetRequiredService<TimeProvider>();
if (queue is null || queueOptions is null)
{
return new NullSchedulerEventPublisher(loggerFactory.CreateLogger<NullSchedulerEventPublisher>());
}
return new SchedulerEventPublisher(
queue,
queueOptions,
timeProvider,
loggerFactory.CreateLogger<SchedulerEventPublisher>());
});
services.AddHttpClient<IScannerReportClient, HttpScannerReportClient>();
services.AddHostedService<PlannerBackgroundService>();
services.AddHostedService<PlannerQueueDispatcherBackgroundService>();
services.AddHostedService<RunnerBackgroundService>();
return services;
}
}


@@ -0,0 +1,501 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StellaOps.Notify.Models;
using StellaOps.Notify.Queue;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Worker.Execution;
namespace StellaOps.Scheduler.Worker.Events;
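/// <summary>
/// Publishes scheduler worker events (report ready, rescan delta) for Notify/UI consumers.
/// </summary>
internal interface ISchedulerEventPublisher
{
/// <summary>Publishes a report-ready event for a single scanned image.</summary>
Task PublishReportReadyAsync(
Run run,
RunnerSegmentQueueMessage message,
RunnerImageResult result,
ImpactImage? impactImage,
CancellationToken cancellationToken);
/// <summary>Publishes one rescan-delta event that aggregates the supplied deltas.</summary>
Task PublishRescanDeltaAsync(
Run run,
RunnerSegmentQueueMessage message,
IReadOnlyList<DeltaSummary> deltas,
IReadOnlyDictionary<string, ImpactImage> impactLookup,
CancellationToken cancellationToken);
}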
internal sealed class SchedulerEventPublisher : ISchedulerEventPublisher
{
private const string Source = "scheduler.worker";
private readonly INotifyEventQueue _queue;
private readonly NotifyEventQueueOptions _queueOptions;
private readonly TimeProvider _timeProvider;
private readonly ILogger<SchedulerEventPublisher> _logger;
private readonly string _stream;
public SchedulerEventPublisher(
INotifyEventQueue queue,
NotifyEventQueueOptions queueOptions,
TimeProvider timeProvider,
ILogger<SchedulerEventPublisher> logger)
{
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
_queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_stream = ResolveStream(queueOptions);
}
public async Task PublishReportReadyAsync(
Run run,
RunnerSegmentQueueMessage message,
RunnerImageResult result,
ImpactImage? impactImage,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(run);
ArgumentNullException.ThrowIfNull(message);
ArgumentNullException.ThrowIfNull(result);
if (result.Report is null)
{
return;
}
var now = _timeProvider.GetUtcNow();
var occurredAt = result.Report.GeneratedAt == default ? now : result.Report.GeneratedAt;
var scope = BuildScope(result.ImageDigest, impactImage);
var payload = BuildReportPayload(result);
var attributes = BuildReportAttributes(run, message, result, impactImage);
var notifyEvent = NotifyEvent.Create(
eventId: Guid.NewGuid(),
kind: NotifyEventKinds.ScannerReportReady,
tenant: run.TenantId,
ts: occurredAt,
payload: payload,
scope: scope,
version: "1",
actor: Source,
attributes: attributes);
await PublishAsync(notifyEvent, run, message, cancellationToken).ConfigureAwait(false);
}
public async Task PublishRescanDeltaAsync(
Run run,
RunnerSegmentQueueMessage message,
IReadOnlyList<DeltaSummary> deltas,
IReadOnlyDictionary<string, ImpactImage> impactLookup,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(run);
ArgumentNullException.ThrowIfNull(message);
ArgumentNullException.ThrowIfNull(deltas);
ArgumentNullException.ThrowIfNull(impactLookup);
if (deltas.Count == 0)
{
return;
}
var now = _timeProvider.GetUtcNow();
var payload = BuildRescanPayload(run, deltas);
var attributes = BuildRescanAttributes(run, message, deltas, impactLookup);
var notifyEvent = NotifyEvent.Create(
eventId: Guid.NewGuid(),
kind: NotifyEventKinds.SchedulerRescanDelta,
tenant: run.TenantId,
ts: now,
payload: payload,
version: "1",
actor: Source,
attributes: attributes);
await PublishAsync(notifyEvent, run, message, cancellationToken).ConfigureAwait(false);
}
private async Task PublishAsync(
NotifyEvent notifyEvent,
Run run,
RunnerSegmentQueueMessage message,
CancellationToken cancellationToken)
{
var partitionKey = string.IsNullOrWhiteSpace(run.ScheduleId) ? run.Id : run.ScheduleId!;
var traceId = string.IsNullOrWhiteSpace(message.CorrelationId) ? null : message.CorrelationId!.Trim();
var queueAttributes = new Dictionary<string, string>(StringComparer.Ordinal)
{
["source"] = Source
};
try
{
var queueMessage = new NotifyQueueEventMessage(
notifyEvent,
_stream,
partitionKey: partitionKey,
traceId: traceId,
attributes: queueAttributes);
await _queue.PublishAsync(queueMessage, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
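// Publishing is best-effort: failures are logged and swallowed rather than propagated to the caller.
catch (Exception ex)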
{
_logger.LogError(
ex,
"Failed to publish scheduler event {EventKind} for run {RunId}.",
notifyEvent.Kind,
run.Id);
}
}
private static NotifyEventScope BuildScope(string imageDigest, ImpactImage? impactImage)
{
var repo = impactImage?.Repository;
if (string.IsNullOrWhiteSpace(repo))
{
repo = "(unknown)";
}
var @namespace = impactImage?.Namespaces.IsDefaultOrEmpty == false
? impactImage.Namespaces[0]
: impactImage?.Registry;
return NotifyEventScope.Create(
@namespace: string.IsNullOrWhiteSpace(@namespace) ? null : @namespace,
repo: repo,
digest: imageDigest);
}
private static JsonObject BuildReportPayload(RunnerImageResult result)
{
var payload = new JsonObject
{
["reportId"] = result.Report.ReportId,
["verdict"] = string.IsNullOrWhiteSpace(result.Report.Verdict)
? "warn"
: result.Report.Verdict.ToLowerInvariant()
};
if (result.Report.GeneratedAt != default)
{
payload["generatedAt"] = JsonValue.Create(result.Report.GeneratedAt.ToUniversalTime());
}
payload["summary"] = BuildSummaryNode(result.Report.Summary);
payload["delta"] = BuildDeltaNode(result.Delta);
payload["links"] = BuildLinksNode(result.Delta);
if (!string.IsNullOrWhiteSpace(result.Report.PolicyRevisionId) ||
!string.IsNullOrWhiteSpace(result.Report.PolicyDigest))
{
var policy = new JsonObject();
if (!string.IsNullOrWhiteSpace(result.Report.PolicyRevisionId))
{
policy["revisionId"] = result.Report.PolicyRevisionId;
}
if (!string.IsNullOrWhiteSpace(result.Report.PolicyDigest))
{
policy["digest"] = result.Report.PolicyDigest;
}
payload["policy"] = policy;
}
if (result.Report.Summary.Quieted > 0)
{
payload["quietedFindingCount"] = result.Report.Summary.Quieted;
}
if (result.Dsse is not null)
{
payload["dsse"] = BuildDsseNode(result.Dsse);
}
return payload;
}
private static JsonObject BuildSummaryNode(RunnerReportSummary summary)
{
return new JsonObject
{
["total"] = summary.Total,
["blocked"] = summary.Blocked,
["warned"] = summary.Warned,
["ignored"] = summary.Ignored,
["quieted"] = summary.Quieted
};
}
private static JsonObject BuildDeltaNode(DeltaSummary? delta)
{
var node = new JsonObject
{
["newCritical"] = delta?.NewCriticals ?? 0,
["newHigh"] = delta?.NewHigh ?? 0
};
var kevArray = new JsonArray();
if (delta is not null && !delta.KevHits.IsDefaultOrEmpty)
{
foreach (var kev in delta.KevHits)
{
kevArray.Add(kev);
}
}
node["kev"] = kevArray;
return node;
}
private static JsonObject BuildLinksNode(DeltaSummary? delta)
{
var links = new JsonObject();
if (delta is not null && !string.IsNullOrWhiteSpace(delta.ReportUrl))
{
links["ui"] = delta.ReportUrl;
}
if (delta?.Attestation?.Uuid is { Length: > 0 } uuid)
{
links["rekor"] = uuid;
}
return links;
}
private static JsonObject BuildDsseNode(RunnerDsseEnvelope envelope)
{
var node = new JsonObject
{
["payloadType"] = envelope.PayloadType,
["payload"] = envelope.Payload
};
if (envelope.Signatures.Count > 0)
{
var signatures = new JsonArray();
foreach (var signature in envelope.Signatures)
{
signatures.Add(new JsonObject
{
["keyId"] = signature.KeyId,
["algorithm"] = signature.Algorithm,
["signature"] = signature.Signature
});
}
node["signatures"] = signatures;
}
else
{
node["signatures"] = new JsonArray();
}
return node;
}
private static IEnumerable<KeyValuePair<string, string>> BuildReportAttributes(
Run run,
RunnerSegmentQueueMessage message,
RunnerImageResult result,
ImpactImage? impactImage)
{
var attributes = new Dictionary<string, string>(StringComparer.Ordinal)
{
["source"] = Source,
["runId"] = run.Id,
["segmentId"] = message.SegmentId,
["trigger"] = run.Trigger.ToString(),
["scheduleId"] = run.ScheduleId ?? string.Empty,
["reportId"] = result.Report.ReportId,
["verdict"] = string.IsNullOrWhiteSpace(result.Report.Verdict)
? "warn"
: result.Report.Verdict.ToLowerInvariant()
};
if (!string.IsNullOrWhiteSpace(message.CorrelationId))
{
attributes["correlationId"] = message.CorrelationId!;
}
if (!string.IsNullOrWhiteSpace(result.Report.PolicyRevisionId))
{
attributes["policyRevisionId"] = result.Report.PolicyRevisionId!;
}
if (!string.IsNullOrWhiteSpace(result.Report.PolicyDigest))
{
attributes["policyDigest"] = result.Report.PolicyDigest!;
}
if (impactImage is not null)
{
attributes["registry"] = impactImage.Registry;
attributes["repository"] = impactImage.Repository;
}
if (result.Delta is not null)
{
attributes["deltaImages"] = "1";
attributes["deltaNewCritical"] = result.Delta.NewCriticals.ToString();
attributes["deltaNewHigh"] = result.Delta.NewHigh.ToString();
}
return attributes;
}
private static JsonObject BuildRescanPayload(Run run, IReadOnlyList<DeltaSummary> deltas)
{
var totalFindings = deltas.Sum(delta => delta.NewFindings);
// Fall back to the delta count so a non-empty delta list never reports a total of zero.
if (totalFindings <= 0)
{
totalFindings = deltas.Count;
}
var payload = new JsonObject
{
["scheduleId"] = string.IsNullOrWhiteSpace(run.ScheduleId) ? string.Empty : run.ScheduleId,
["impactedDigests"] = new JsonArray(deltas.Select(delta => JsonValue.Create(delta.ImageDigest)).ToArray()),
["summary"] = new JsonObject
{
["newCritical"] = deltas.Sum(delta => delta.NewCriticals),
["newHigh"] = deltas.Sum(delta => delta.NewHigh),
["total"] = totalFindings
}
};
var reason = BuildReason(run.Reason);
if (!string.IsNullOrWhiteSpace(reason))
{
payload["reason"] = reason;
}
return payload;
}
private static IEnumerable<KeyValuePair<string, string>> BuildRescanAttributes(
Run run,
RunnerSegmentQueueMessage message,
IReadOnlyList<DeltaSummary> deltas,
IReadOnlyDictionary<string, ImpactImage> impactLookup)
{
var attributes = new Dictionary<string, string>(StringComparer.Ordinal)
{
["source"] = Source,
["runId"] = run.Id,
["segmentId"] = message.SegmentId,
["trigger"] = run.Trigger.ToString(),
["scheduleId"] = run.ScheduleId ?? string.Empty,
["deltaCount"] = deltas.Count.ToString()
};
if (!string.IsNullOrWhiteSpace(message.CorrelationId))
{
attributes["correlationId"] = message.CorrelationId!;
}
if (impactLookup.Count > 0)
{
var repositories = deltas
.Select(delta => impactLookup.TryGetValue(delta.ImageDigest, out var impact) ? impact.Repository : null)
.Where(repo => !string.IsNullOrWhiteSpace(repo))
.Distinct(StringComparer.Ordinal)
.ToArray();
if (repositories.Length > 0)
{
attributes["repositories"] = string.Join(",", repositories);
}
}
return attributes;
}
private static string? BuildReason(RunReason reason)
{
if (!string.IsNullOrWhiteSpace(reason.ManualReason))
{
return $"manual:{reason.ManualReason}";
}
if (!string.IsNullOrWhiteSpace(reason.FeedserExportId))
{
return $"feedser:{reason.FeedserExportId}";
}
if (!string.IsNullOrWhiteSpace(reason.VexerExportId))
{
return $"vexer:{reason.VexerExportId}";
}
return null;
}
private static string ResolveStream(NotifyEventQueueOptions options)
{
return options.Transport switch
{
NotifyQueueTransportKind.Nats => string.IsNullOrWhiteSpace(options.Nats.Subject)
? "notify.events"
: options.Nats.Subject,
_ => options.Redis.Streams.Count > 0 && !string.IsNullOrWhiteSpace(options.Redis.Streams[0].Stream)
? options.Redis.Streams[0].Stream
: "notify:events"
};
}
}
internal sealed class NullSchedulerEventPublisher : ISchedulerEventPublisher
{
private readonly ILogger<NullSchedulerEventPublisher> _logger;
private int _hasWarned;
public NullSchedulerEventPublisher(ILogger<NullSchedulerEventPublisher> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Task PublishReportReadyAsync(
Run run,
RunnerSegmentQueueMessage message,
RunnerImageResult result,
ImpactImage? impactImage,
CancellationToken cancellationToken)
{
WarnOnce();
return Task.CompletedTask;
}
public Task PublishRescanDeltaAsync(
Run run,
RunnerSegmentQueueMessage message,
IReadOnlyList<DeltaSummary> deltas,
IReadOnlyDictionary<string, ImpactImage> impactLookup,
CancellationToken cancellationToken)
{
WarnOnce();
return Task.CompletedTask;
}
private void WarnOnce()
{
if (Interlocked.Exchange(ref _hasWarned, 1) == 0)
{
_logger.LogWarning("Notify event queue not configured; scheduler events will not be published.");
}
}
}


@@ -0,0 +1,319 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Json;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Worker.Options;
namespace StellaOps.Scheduler.Worker.Execution;
internal sealed class HttpScannerReportClient : IScannerReportClient
{
private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
};
private readonly HttpClient _httpClient;
private readonly IOptions<SchedulerWorkerOptions> _options;
private readonly ILogger<HttpScannerReportClient> _logger;
public HttpScannerReportClient(
HttpClient httpClient,
IOptions<SchedulerWorkerOptions> options,
ILogger<HttpScannerReportClient> logger)
{
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<RunnerImageResult> ExecuteAsync(
ScannerReportRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
var settings = _options.Value.Runner.Scanner;
ConfigureHttpClientBaseAddress(settings);
if (request.Mode == ScheduleMode.ContentRefresh && settings.EnableContentRefresh)
{
await TriggerContentRefreshAsync(request, settings, cancellationToken).ConfigureAwait(false);
}
var report = await FetchReportAsync(request, settings, cancellationToken).ConfigureAwait(false);
var reportSnapshot = BuildReportSnapshot(report, request.ImageDigest);
var delta = BuildDeltaSummary(report, request.ImageDigest);
var dsse = BuildDsseEnvelope(report);
return new RunnerImageResult(
request.ImageDigest,
delta,
ContentRefreshed: request.Mode == ScheduleMode.ContentRefresh && settings.EnableContentRefresh,
reportSnapshot,
dsse);
}
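// HttpClient.BaseAddress can only be changed before the first request is sent;
// changing it afterwards throws InvalidOperationException.
private void ConfigureHttpClientBaseAddress(SchedulerWorkerOptions.RunnerOptions.ScannerOptions settings)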
{
if (settings.BaseAddress is not null && _httpClient.BaseAddress != settings.BaseAddress)
{
_httpClient.BaseAddress = settings.BaseAddress;
}
}
private async Task TriggerContentRefreshAsync(
ScannerReportRequest request,
SchedulerWorkerOptions.RunnerOptions.ScannerOptions settings,
CancellationToken cancellationToken)
{
try
{
using var response = await _httpClient.PostAsJsonAsync(
settings.ScansPath,
new ScanSubmitRequest(new ScanTargetRequest(null, request.ImageDigest)),
SerializerOptions,
cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
_logger.LogWarning(
"Scanner content refresh submission returned status {StatusCode} for digest {Digest}.",
(int)response.StatusCode,
request.ImageDigest);
}
}
catch (HttpRequestException ex)
{
_logger.LogWarning(
ex,
"Scanner content refresh submission failed for digest {Digest}. Proceeding with report request.",
request.ImageDigest);
}
}
private async Task<ReportResponse> FetchReportAsync(
ScannerReportRequest request,
SchedulerWorkerOptions.RunnerOptions.ScannerOptions settings,
CancellationToken cancellationToken)
{
var payload = new ReportRequest { ImageDigest = request.ImageDigest };
HttpResponseMessage? response = null;
var attempt = 0;
while (true)
{
attempt++;
try
{
response = await _httpClient.PostAsJsonAsync(
settings.ReportsPath,
payload,
SerializerOptions,
cancellationToken)
.ConfigureAwait(false);
response.EnsureSuccessStatusCode();
var report = await response.Content.ReadFromJsonAsync<ReportResponse>(SerializerOptions, cancellationToken)
.ConfigureAwait(false);
if (report is null)
{
throw new InvalidOperationException("Scanner response payload was empty.");
}
return report;
}
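// Retry transient failures only; a cancelled token propagates immediately instead of being retried.
catch (Exception ex) when (IsTransient(ex) && !cancellationToken.IsCancellationRequested && attempt <= settings.MaxRetryAttempts)
{
// Exponential backoff: RetryBaseDelay * 2^(attempt - 1).
var delay = TimeSpan.FromMilliseconds(settings.RetryBaseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));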
_logger.LogWarning(
ex,
"Scanner report attempt {Attempt} failed for digest {Digest}; retrying in {Delay}.",
attempt,
request.ImageDigest,
delay);
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
finally
{
response?.Dispose();
}
}
}
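// TaskCanceledException counts as transient because HttpClient raises it when a request times out.
private static bool IsTransient(Exception exception)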
=> exception is HttpRequestException or TaskCanceledException;
private static RunnerReportSnapshot BuildReportSnapshot(ReportResponse report, string fallbackDigest)
{
var document = report.Report ?? new ReportDocument();
var summary = document.Summary ?? new ReportSummary();
return new RunnerReportSnapshot(
string.IsNullOrWhiteSpace(document.ReportId) ? Guid.NewGuid().ToString("N") : document.ReportId,
string.IsNullOrWhiteSpace(document.ImageDigest) ? fallbackDigest : document.ImageDigest,
string.IsNullOrWhiteSpace(document.Verdict) ? "warn" : document.Verdict,
document.GeneratedAt,
new RunnerReportSummary(
summary.Total,
summary.Blocked,
summary.Warned,
summary.Ignored,
summary.Quieted),
document.Policy?.RevisionId,
document.Policy?.Digest);
}
private static RunnerDsseEnvelope? BuildDsseEnvelope(ReportResponse report)
{
if (report.Dsse is null || string.IsNullOrWhiteSpace(report.Dsse.PayloadType))
{
return null;
}
var signatures = report.Dsse.Signatures is null
? Array.Empty<RunnerDsseSignature>()
: report.Dsse.Signatures
.Where(signature => signature is not null)
.Select(signature => new RunnerDsseSignature(
signature!.KeyId,
signature.Algorithm,
signature.Signature))
.ToArray();
return new RunnerDsseEnvelope(
report.Dsse.PayloadType,
report.Dsse.Payload,
signatures);
}
private static DeltaSummary? BuildDeltaSummary(ReportResponse report, string imageDigest)
{
if (report?.Report?.Summary is null)
{
return null;
}
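// The report summary carries only policy-verdict counts, so blocked, warned and ignored
// stand in for new critical, high and low findings respectively.
var summary = report.Report.Summary;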
var blocked = summary.Blocked;
var warned = summary.Warned;
var ignored = summary.Ignored;
var newFindings = blocked + warned;
if (newFindings == 0 && ignored == 0)
{
return null;
}
return new DeltaSummary(
imageDigest,
newFindings,
newCriticals: blocked,
newHigh: warned,
newMedium: 0,
newLow: ignored,
kevHits: Array.Empty<string>(),
topFindings: Array.Empty<DeltaFinding>(),
reportUrl: null,
attestation: null,
detectedAt: report.Report.GeneratedAt == default ? null : report.Report.GeneratedAt);
}
private sealed record ReportRequest
{
[JsonPropertyName("imageDigest")]
public string ImageDigest { get; init; } = string.Empty;
}
private sealed record ReportResponse
{
[JsonPropertyName("report")]
public ReportDocument Report { get; init; } = new();
[JsonPropertyName("dsse")]
public DsseEnvelope? Dsse { get; init; }
}
private sealed record ReportDocument
{
[JsonPropertyName("reportId")]
public string ReportId { get; init; } = string.Empty;
[JsonPropertyName("imageDigest")]
public string ImageDigest { get; init; } = string.Empty;
[JsonPropertyName("generatedAt")]
public DateTimeOffset GeneratedAt { get; init; }
[JsonPropertyName("verdict")]
public string Verdict { get; init; } = string.Empty;
[JsonPropertyName("policy")]
public ReportPolicy Policy { get; init; } = new();
[JsonPropertyName("summary")]
public ReportSummary Summary { get; init; } = new();
}
private sealed record ReportPolicy
{
[JsonPropertyName("revisionId")]
public string? RevisionId { get; init; }
[JsonPropertyName("digest")]
public string? Digest { get; init; }
}
private sealed record ReportSummary
{
[JsonPropertyName("total")]
public int Total { get; init; }
[JsonPropertyName("blocked")]
public int Blocked { get; init; }
[JsonPropertyName("warned")]
public int Warned { get; init; }
[JsonPropertyName("ignored")]
public int Ignored { get; init; }
[JsonPropertyName("quieted")]
public int Quieted { get; init; }
}
private sealed record DsseEnvelope
{
[JsonPropertyName("payloadType")]
public string PayloadType { get; init; } = string.Empty;
[JsonPropertyName("payload")]
public string Payload { get; init; } = string.Empty;
[JsonPropertyName("signatures")]
public IReadOnlyList<DsseSignature> Signatures { get; init; } = Array.Empty<DsseSignature>();
}
private sealed record DsseSignature
{
[JsonPropertyName("keyId")]
public string KeyId { get; init; } = string.Empty;
[JsonPropertyName("algorithm")]
public string Algorithm { get; init; } = string.Empty;
[JsonPropertyName("signature")]
public string Signature { get; init; } = string.Empty;
}
private sealed record ScanSubmitRequest(ScanTargetRequest Image);
private sealed record ScanTargetRequest(string? Reference, string? Digest);
}
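
Review note: the private records above map the Scanner report payload through `[JsonPropertyName]` attributes. The following is a minimal sketch of how such a payload could be deserialized with System.Text.Json. The record copies are trimmed down, and the `ReportJsonSketch` class and the sample JSON are hypothetical, not taken from a real Scanner response.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical, trimmed-down copies of the private records in the diff.
public sealed record ReportSummary
{
    [JsonPropertyName("total")]
    public int Total { get; init; }
    [JsonPropertyName("blocked")]
    public int Blocked { get; init; }
    [JsonPropertyName("warned")]
    public int Warned { get; init; }
}

public sealed record ReportDocument
{
    [JsonPropertyName("reportId")]
    public string ReportId { get; init; } = string.Empty;
    [JsonPropertyName("generatedAt")]
    public DateTimeOffset GeneratedAt { get; init; }
    [JsonPropertyName("summary")]
    public ReportSummary Summary { get; init; } = new();
}

public sealed record ReportResponse
{
    [JsonPropertyName("report")]
    public ReportDocument Report { get; init; } = new();
}

public static class ReportJsonSketch
{
    // Sample payload invented for illustration only.
    public const string SampleJson =
        "{\"report\":{\"reportId\":\"r-1\",\"generatedAt\":\"2025-10-27T16:00:00Z\"," +
        "\"summary\":{\"total\":7,\"blocked\":2,\"warned\":3}}}";

    // Deserializes the payload; a literal JSON null is rejected explicitly.
    public static ReportResponse Parse(string json) =>
        JsonSerializer.Deserialize<ReportResponse>(json)
            ?? throw new InvalidOperationException("Empty report payload.");
}
```

Fields absent from the payload keep their initializer defaults, which is why the records in the diff initialize strings to `string.Empty` and nested documents to `new()`.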

@@ -0,0 +1,145 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Worker.Options;
namespace StellaOps.Scheduler.Worker.Execution;
internal sealed class RunnerBackgroundService : BackgroundService
{
private readonly ISchedulerRunnerQueue _runnerQueue;
private readonly IRunnerExecutionService _executionService;
private readonly SchedulerWorkerOptions _options;
private readonly ILogger<RunnerBackgroundService> _logger;
public RunnerBackgroundService(
ISchedulerRunnerQueue runnerQueue,
IRunnerExecutionService executionService,
SchedulerWorkerOptions options,
ILogger<RunnerBackgroundService> logger)
{
_runnerQueue = runnerQueue ?? throw new ArgumentNullException(nameof(runnerQueue));
_executionService = executionService ?? throw new ArgumentNullException(nameof(executionService));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var execOptions = _options.Runner.Execution;
var leaseRequest = new SchedulerQueueLeaseRequest(execOptions.ConsumerName, execOptions.BatchSize, execOptions.LeaseDuration);
_logger.LogInformation("Runner execution loop started with consumer {Consumer}.", execOptions.ConsumerName);
while (!stoppingToken.IsCancellationRequested)
{
IReadOnlyList<ISchedulerQueueLease<RunnerSegmentQueueMessage>> leases;
try
{
leases = await _runnerQueue.LeaseAsync(leaseRequest, stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Runner execution failed to lease segments; backing off.");
await DelayAsync(execOptions.IdleDelay, stoppingToken).ConfigureAwait(false);
continue;
}
if (leases.Count == 0)
{
await DelayAsync(execOptions.IdleDelay, stoppingToken).ConfigureAwait(false);
continue;
}
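// Leases are processed sequentially; ExecutionOptions.MaxConcurrentSegments is not consulted in this loop.
foreach (var lease in leases)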
{
await ProcessLeaseAsync(lease, execOptions.LeaseDuration, stoppingToken).ConfigureAwait(false);
}
}
_logger.LogInformation("Runner execution loop stopping.");
}
private async Task ProcessLeaseAsync(
ISchedulerQueueLease<RunnerSegmentQueueMessage> lease,
TimeSpan leaseDuration,
CancellationToken cancellationToken)
{
try
{
var result = await _executionService.ExecuteAsync(lease.Message, cancellationToken).ConfigureAwait(false);
await lease.AcknowledgeAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug(
"Runner segment {SegmentId} processed; status={Status} processed={Processed} deltaImages={DeltaImages} runCompleted={RunCompleted}.",
lease.SegmentId,
result.Status,
result.ProcessedImages,
result.DeltaImages,
result.RunCompleted);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(
ex,
"Runner segment {SegmentId} failed on attempt {Attempt}; releasing for retry.",
lease.SegmentId,
lease.Attempt);
try
{
await lease.ReleaseAsync(SchedulerQueueReleaseDisposition.Retry, cancellationToken).ConfigureAwait(false);
}
catch (Exception releaseEx) when (releaseEx is not OperationCanceledException)
{
_logger.LogError(
releaseEx,
"Failed to release runner segment {SegmentId}; attempting lease renewal.",
lease.SegmentId);
try
{
await lease.RenewAsync(leaseDuration, cancellationToken).ConfigureAwait(false);
}
catch (Exception renewEx) when (renewEx is not OperationCanceledException)
{
_logger.LogCritical(
renewEx,
"Unable to renew runner segment {SegmentId}; acknowledging to avoid tight failure loop.",
lease.SegmentId);
await lease.AcknowledgeAsync(cancellationToken).ConfigureAwait(false);
}
}
}
}
private static async Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken)
{
if (delay <= TimeSpan.Zero)
{
return;
}
try
{
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}
}
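
Review note: `ProcessLeaseAsync` above falls back in three steps when a segment fails: release for retry, then renew the lease, then acknowledge as a last resort. The sketch below reproduces that ladder against a fake lease so the ordering can be checked in isolation. `ILeaseSketch`, `FakeLease`, and `LeaseFallbackSketch` are hypothetical stand-ins, and cancellation handling is left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for ISchedulerQueueLease<T>; only the calls used above are modelled.
public interface ILeaseSketch
{
    Task AcknowledgeAsync();
    Task ReleaseForRetryAsync();
    Task RenewAsync(TimeSpan duration);
}

public sealed class FakeLease : ILeaseSketch
{
    private readonly bool _releaseFails;
    private readonly bool _renewFails;

    // Records the order in which lease operations were attempted.
    public List<string> Calls { get; } = new();

    public FakeLease(bool releaseFails, bool renewFails)
    {
        _releaseFails = releaseFails;
        _renewFails = renewFails;
    }

    public Task AcknowledgeAsync()
    {
        Calls.Add("ack");
        return Task.CompletedTask;
    }

    public Task ReleaseForRetryAsync()
    {
        Calls.Add("release");
        return _releaseFails
            ? Task.FromException(new InvalidOperationException("release failed"))
            : Task.CompletedTask;
    }

    public Task RenewAsync(TimeSpan duration)
    {
        Calls.Add("renew");
        return _renewFails
            ? Task.FromException(new InvalidOperationException("renew failed"))
            : Task.CompletedTask;
    }
}

public static class LeaseFallbackSketch
{
    // Success: acknowledge. Failure: release; if that fails, renew; if that fails, acknowledge.
    public static async Task ProcessAsync(ILeaseSketch lease, Func<Task> work, TimeSpan leaseDuration)
    {
        try
        {
            await work().ConfigureAwait(false);
            await lease.AcknowledgeAsync().ConfigureAwait(false);
        }
        catch (Exception)
        {
            try
            {
                await lease.ReleaseForRetryAsync().ConfigureAwait(false);
            }
            catch (Exception)
            {
                try
                {
                    await lease.RenewAsync(leaseDuration).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    await lease.AcknowledgeAsync().ConfigureAwait(false);
                }
            }
        }
    }
}
```

In this sketch, as in the diff, a failure of the final acknowledge is not caught and would propagate to the caller.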

@@ -0,0 +1,376 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Worker.Events;
using StellaOps.Scheduler.Worker.Observability;
namespace StellaOps.Scheduler.Worker.Execution;
public interface IRunnerExecutionService
{
Task<RunnerSegmentExecutionResult> ExecuteAsync(RunnerSegmentQueueMessage message, CancellationToken cancellationToken = default);
}
internal sealed class RunnerExecutionService : IRunnerExecutionService
{
private static readonly IReadOnlyDictionary<string, ImpactImage> EmptyImpactLookup =
new Dictionary<string, ImpactImage>(0, StringComparer.Ordinal);
private readonly IRunRepository _runRepository;
private readonly IRunSummaryService _runSummaryService;
private readonly IImpactSnapshotRepository _impactSnapshotRepository;
private readonly IScannerReportClient _scannerClient;
private readonly ISchedulerEventPublisher _eventPublisher;
private readonly SchedulerWorkerMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<RunnerExecutionService> _logger;
public RunnerExecutionService(
IRunRepository runRepository,
IRunSummaryService runSummaryService,
IImpactSnapshotRepository impactSnapshotRepository,
IScannerReportClient scannerClient,
ISchedulerEventPublisher eventPublisher,
SchedulerWorkerMetrics metrics,
TimeProvider? timeProvider,
ILogger<RunnerExecutionService> logger)
{
_runRepository = runRepository ?? throw new ArgumentNullException(nameof(runRepository));
_runSummaryService = runSummaryService ?? throw new ArgumentNullException(nameof(runSummaryService));
_impactSnapshotRepository = impactSnapshotRepository ?? throw new ArgumentNullException(nameof(impactSnapshotRepository));
_scannerClient = scannerClient ?? throw new ArgumentNullException(nameof(scannerClient));
_eventPublisher = eventPublisher ?? throw new ArgumentNullException(nameof(eventPublisher));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<RunnerSegmentExecutionResult> ExecuteAsync(
RunnerSegmentQueueMessage message,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(message);
cancellationToken.ThrowIfCancellationRequested();
var scheduleMode = ResolveScheduleMode(message.Attributes);
var modeLabel = scheduleMode.ToString().ToLowerInvariant();
var run = await _runRepository
.GetAsync(message.TenantId, message.RunId, cancellationToken: cancellationToken)
.ConfigureAwait(false);
if (run is null)
{
_logger.LogWarning(
"Runner segment {SegmentId} references missing run {RunId} for tenant {TenantId}.",
message.SegmentId,
message.RunId,
message.TenantId);
_metrics.RecordRunnerSegment(modeLabel, RunnerSegmentExecutionStatus.RunMissing.ToString(), 0, 0);
return RunnerSegmentExecutionResult.RunMissing(message.RunId);
}
var now = _timeProvider.GetUtcNow();
var accumulator = new StatsAccumulator(run.Stats);
var impactLookup = await LoadImpactLookupAsync(run, message, cancellationToken).ConfigureAwait(false);
var deltaSummaries = new List<DeltaSummary>();
var imageContexts = new List<ImageExecutionContext>(message.ImageDigests.Count);
foreach (var digest in message.ImageDigests)
{
impactLookup.TryGetValue(digest, out var impactImage);
var request = new ScannerReportRequest(
message.TenantId,
message.RunId,
digest,
scheduleMode,
message.UsageOnly,
message.Attributes);
RunnerImageResult result;
try
{
result = await _scannerClient.ExecuteAsync(request, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(
ex,
"Scanner execution failed for run {RunId} digest {Digest}; propagating failure for retry.",
message.RunId,
digest);
throw;
}
accumulator.Record(result);
imageContexts.Add(new ImageExecutionContext(result, impactImage));
if (result.Delta is { } delta && HasMeaningfulDelta(delta))
{
deltaSummaries.Add(delta);
}
}
var updatedStats = accumulator.Build();
var startedAt = run.StartedAt ?? now;
var completed = updatedStats.Completed >= updatedStats.Queued && updatedStats.Queued > 0;
var finishedAt = completed ? now : run.FinishedAt;
var newState = completed ? RunState.Completed : RunState.Running;
var deltas = run.Deltas.ToList();
deltas.AddRange(deltaSummaries);
var updatedRun = new Run(
run.Id,
run.TenantId,
run.Trigger,
newState,
updatedStats,
run.CreatedAt,
run.Reason,
run.ScheduleId,
startedAt,
finishedAt,
error: null,
deltas,
run.SchemaVersion);
var persisted = await _runRepository.UpdateAsync(updatedRun, cancellationToken: cancellationToken).ConfigureAwait(false);
if (!persisted)
{
_logger.LogWarning("Failed to persist run {RunId} after processing runner segment {SegmentId}.", run.Id, message.SegmentId);
}
if (persisted && !string.IsNullOrWhiteSpace(updatedRun.ScheduleId))
{
try
{
await _runSummaryService.ProjectAsync(updatedRun, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogWarning(ex, "Failed to project run summary for run {RunId}.", run.Id);
}
}
var segmentStatus = persisted ? RunnerSegmentExecutionStatus.Completed.ToString() : "persist_failed";
_metrics.RecordRunnerSegment(modeLabel, segmentStatus, accumulator.Processed, accumulator.DeltaImages);
if (deltaSummaries.Count > 0)
{
_metrics.RecordDeltaSummaries(modeLabel, deltaSummaries);
}
var remaining = Math.Max(updatedRun.Stats.Queued - updatedRun.Stats.Completed, 0);
_metrics.UpdateBacklog(modeLabel, updatedRun.ScheduleId, remaining);
if (completed && persisted)
{
var duration = (updatedRun.FinishedAt ?? now) - (updatedRun.StartedAt ?? updatedRun.CreatedAt);
_metrics.RecordRunCompletion(modeLabel, "completed", duration);
}
if (persisted)
{
foreach (var context in imageContexts)
{
await _eventPublisher.PublishReportReadyAsync(
updatedRun,
message,
context.Result,
context.ImpactImage,
cancellationToken)
.ConfigureAwait(false);
}
if (deltaSummaries.Count > 0)
{
await _eventPublisher.PublishRescanDeltaAsync(
updatedRun,
message,
deltaSummaries,
impactLookup,
cancellationToken)
.ConfigureAwait(false);
}
}
return RunnerSegmentExecutionResult.Success(
updatedRun,
accumulator.Processed,
accumulator.DeltaImages,
completed,
deltaSummaries);
}
private async Task<IReadOnlyDictionary<string, ImpactImage>> LoadImpactLookupAsync(
Run run,
RunnerSegmentQueueMessage message,
CancellationToken cancellationToken)
{
var snapshotId = ResolveSnapshotId(run, message);
if (string.IsNullOrWhiteSpace(snapshotId))
{
return EmptyImpactLookup;
}
try
{
var snapshot = await _impactSnapshotRepository
.GetBySnapshotIdAsync(snapshotId, cancellationToken: cancellationToken)
.ConfigureAwait(false);
if (snapshot?.Images.Length > 0)
{
var map = new Dictionary<string, ImpactImage>(snapshot.Images.Length, StringComparer.Ordinal);
foreach (var image in snapshot.Images)
{
// Keep the first occurrence of each digest.
map.TryAdd(image.ImageDigest, image);
}
return map;
}
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogWarning(
ex,
"Failed to load impact snapshot {SnapshotId} for run {RunId}.",
snapshotId,
run.Id);
}
return EmptyImpactLookup;
}
private static string? ResolveSnapshotId(Run run, RunnerSegmentQueueMessage message)
{
if (message.Attributes.TryGetValue("impactSnapshotId", out var snapshotId) &&
!string.IsNullOrWhiteSpace(snapshotId))
{
return snapshotId.Trim();
}
return string.IsNullOrWhiteSpace(run.Id) ? null : $"impact::{run.Id}";
}
private static ScheduleMode ResolveScheduleMode(IReadOnlyDictionary<string, string> attributes)
{
if (attributes.TryGetValue("scheduleMode", out var mode) &&
Enum.TryParse<ScheduleMode>(mode, ignoreCase: true, out var parsed))
{
return parsed;
}
return ScheduleMode.AnalysisOnly;
}
private readonly record struct ImageExecutionContext(
RunnerImageResult Result,
ImpactImage? ImpactImage);
private static bool HasMeaningfulDelta(DeltaSummary delta)
{
if (delta is null)
{
return false;
}
if (delta.NewFindings > 0 ||
delta.NewCriticals > 0 ||
delta.NewHigh > 0 ||
delta.NewMedium > 0 ||
delta.NewLow > 0)
{
return true;
}
return !delta.KevHits.IsDefaultOrEmpty;
}
private sealed class StatsAccumulator
{
private readonly RunStats _baseStats;
public StatsAccumulator(RunStats baseStats)
{
_baseStats = baseStats ?? RunStats.Empty;
}
public int Processed { get; private set; }
public int DeltaImages { get; private set; }
private int _newCriticals;
private int _newHigh;
private int _newMedium;
private int _newLow;
public void Record(RunnerImageResult result)
{
Processed++;
if (result.Delta is { } delta && HasMeaningfulDelta(delta))
{
DeltaImages++;
_newCriticals += delta.NewCriticals;
_newHigh += delta.NewHigh;
_newMedium += delta.NewMedium;
_newLow += delta.NewLow;
}
}
public RunStats Build()
{
return new RunStats(
_baseStats.Candidates,
_baseStats.Deduped,
_baseStats.Queued,
_baseStats.Completed + Processed,
_baseStats.Deltas + DeltaImages,
_baseStats.NewCriticals + _newCriticals,
_baseStats.NewHigh + _newHigh,
_baseStats.NewMedium + _newMedium,
_baseStats.NewLow + _newLow);
}
}
}
public sealed record RunnerSegmentExecutionResult(
RunnerSegmentExecutionStatus Status,
Run? UpdatedRun,
int ProcessedImages,
int DeltaImages,
bool RunCompleted,
IReadOnlyList<DeltaSummary> DeltaSummaries)
{
public static RunnerSegmentExecutionResult Success(Run updatedRun, int processedImages, int deltaImages, bool runCompleted, IReadOnlyList<DeltaSummary> deltas)
=> new(RunnerSegmentExecutionStatus.Completed, updatedRun, processedImages, deltaImages, runCompleted, deltas);
public static RunnerSegmentExecutionResult RunMissing(string runId)
=> new(RunnerSegmentExecutionStatus.RunMissing, null, 0, 0, false, Array.Empty<DeltaSummary>());
}
public enum RunnerSegmentExecutionStatus
{
Completed,
RunMissing
}
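
Review note: `RunnerExecutionService` marks a run as completed once `Completed >= Queued` with `Queued > 0`, and reports the backlog as `max(Queued - Completed, 0)`. The sketch below works through that arithmetic. `RunProgressSketch` and its `Progress` type are hypothetical simplifications of `RunStats` that model only the two counters involved.

```csharp
using System;

public static class RunProgressSketch
{
    // Simplified stand-in for RunStats: only Queued and Completed are modelled.
    public readonly record struct Progress(int Queued, int Completed);

    // Adds the images processed by one segment to the running total.
    public static Progress Apply(Progress baseline, int processedInSegment) =>
        baseline with { Completed = baseline.Completed + processedInSegment };

    // A run with nothing queued is never reported as completed by this rule.
    public static bool IsCompleted(Progress p) => p.Completed >= p.Queued && p.Queued > 0;

    // The clamp keeps the backlog non-negative if Completed overshoots Queued.
    public static int Remaining(Progress p) => Math.Max(p.Queued - p.Completed, 0);
}
```

For example, a run with 10 queued and 6 completed images that receives a 4-image segment reaches `Completed = 10`, so the rule reports completion and a backlog of 0.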

@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Worker.Execution;
public interface IScannerReportClient
{
Task<RunnerImageResult> ExecuteAsync(ScannerReportRequest request, CancellationToken cancellationToken = default);
}
public sealed record ScannerReportRequest(
string TenantId,
string RunId,
string ImageDigest,
ScheduleMode Mode,
bool UsageOnly,
IReadOnlyDictionary<string, string> Attributes);
public sealed record RunnerImageResult(
string ImageDigest,
DeltaSummary? Delta,
bool ContentRefreshed,
RunnerReportSnapshot Report,
RunnerDsseEnvelope? Dsse);
public sealed record RunnerReportSnapshot(
string ReportId,
string ImageDigest,
string Verdict,
DateTimeOffset GeneratedAt,
RunnerReportSummary Summary,
string? PolicyRevisionId,
string? PolicyDigest);
public sealed record RunnerReportSummary(
int Total,
int Blocked,
int Warned,
int Ignored,
int Quieted);
public sealed record RunnerDsseEnvelope(
string PayloadType,
string Payload,
IReadOnlyList<RunnerDsseSignature> Signatures);
public sealed record RunnerDsseSignature(
string KeyId,
string Algorithm,
string Signature);

@@ -0,0 +1,210 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Worker.Observability;
public sealed class SchedulerWorkerMetrics : IDisposable
{
public const string MeterName = "StellaOps.Scheduler.Worker";
private readonly Meter _meter;
private readonly Counter<long> _plannerRunsTotal;
private readonly Histogram<double> _plannerLatencySeconds;
private readonly Counter<long> _runnerSegmentsTotal;
private readonly Counter<long> _runnerImagesTotal;
private readonly Counter<long> _runnerDeltaCriticalTotal;
private readonly Counter<long> _runnerDeltaHighTotal;
private readonly Counter<long> _runnerDeltaFindingsTotal;
private readonly Counter<long> _runnerKevHitsTotal;
private readonly Histogram<double> _runDurationSeconds;
private readonly UpDownCounter<long> _runsActive;
private readonly ConcurrentDictionary<string, long> _backlog = new(StringComparer.Ordinal);
private readonly ObservableGauge<long> _backlogGauge;
private bool _disposed;
public SchedulerWorkerMetrics()
{
_meter = new Meter(MeterName);
_plannerRunsTotal = _meter.CreateCounter<long>(
"scheduler_planner_runs_total",
unit: "count",
description: "Planner runs grouped by status and mode.");
_plannerLatencySeconds = _meter.CreateHistogram<double>(
"scheduler_planner_latency_seconds",
unit: "s",
description: "Latency between run creation and planner processing grouped by mode and status.");
_runnerSegmentsTotal = _meter.CreateCounter<long>(
"scheduler_runner_segments_total",
unit: "count",
description: "Runner segments processed grouped by status and mode.");
_runnerImagesTotal = _meter.CreateCounter<long>(
"scheduler_runner_images_total",
unit: "count",
description: "Images processed by runner grouped by mode and delta outcome.");
_runnerDeltaCriticalTotal = _meter.CreateCounter<long>(
"scheduler_runner_delta_critical_total",
unit: "count",
description: "Critical findings observed by runner grouped by mode.");
_runnerDeltaHighTotal = _meter.CreateCounter<long>(
"scheduler_runner_delta_high_total",
unit: "count",
description: "High findings observed by runner grouped by mode.");
_runnerDeltaFindingsTotal = _meter.CreateCounter<long>(
"scheduler_runner_delta_total",
unit: "count",
description: "Total findings observed by runner grouped by mode.");
_runnerKevHitsTotal = _meter.CreateCounter<long>(
"scheduler_runner_delta_kev_total",
unit: "count",
description: "KEV hits observed by runner grouped by mode.");
_runDurationSeconds = _meter.CreateHistogram<double>(
"scheduler_run_duration_seconds",
unit: "s",
description: "End-to-end run durations grouped by mode and result.");
_runsActive = _meter.CreateUpDownCounter<long>(
"scheduler_runs_active",
unit: "count",
description: "Active scheduler runs grouped by mode.");
_backlogGauge = _meter.CreateObservableGauge<long>(
"scheduler_runner_backlog",
ObserveBacklog,
unit: "images",
description: "Remaining images queued for runner processing grouped by mode and schedule.");
}
public void RecordPlannerResult(string mode, string status, TimeSpan latency, int imageCount)
{
var tags = new[]
{
new KeyValuePair<string, object?>("mode", mode),
new KeyValuePair<string, object?>("status", status)
};
_plannerRunsTotal.Add(1, tags);
_plannerLatencySeconds.Record(Math.Max(latency.TotalSeconds, 0d), tags);
if (status.Equals("enqueued", StringComparison.OrdinalIgnoreCase) && imageCount > 0)
{
_runsActive.Add(1, new[] { new KeyValuePair<string, object?>("mode", mode) });
}
}
public void RecordRunnerSegment(string mode, string status, int processedImages, int deltaImages)
{
var tags = new[]
{
new KeyValuePair<string, object?>("mode", mode),
new KeyValuePair<string, object?>("status", status)
};
_runnerSegmentsTotal.Add(1, tags);
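// The delta tag is segment-level: all processed images are counted under "true" when any image in the segment produced a delta.
var imageTags = new[]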
{
new KeyValuePair<string, object?>("mode", mode),
new KeyValuePair<string, object?>("delta", deltaImages > 0 ? "true" : "false")
};
_runnerImagesTotal.Add(processedImages, imageTags);
}
public void RecordDeltaSummaries(string mode, IReadOnlyList<DeltaSummary> deltas)
{
if (deltas.Count == 0)
{
return;
}
var tags = new[] { new KeyValuePair<string, object?>("mode", mode) };
foreach (var delta in deltas)
{
if (delta.NewCriticals > 0)
{
_runnerDeltaCriticalTotal.Add(delta.NewCriticals, tags);
}
if (delta.NewHigh > 0)
{
_runnerDeltaHighTotal.Add(delta.NewHigh, tags);
}
if (delta.NewFindings > 0)
{
_runnerDeltaFindingsTotal.Add(delta.NewFindings, tags);
}
if (!delta.KevHits.IsDefaultOrEmpty)
{
_runnerKevHitsTotal.Add(delta.KevHits.Length, tags);
}
}
}
public void RecordRunCompletion(string mode, string result, TimeSpan? duration, bool decrementActive = true)
{
var tags = new[]
{
new KeyValuePair<string, object?>("mode", mode),
new KeyValuePair<string, object?>("result", result)
};
if (duration is { } runDuration)
{
_runDurationSeconds.Record(Math.Max(runDuration.TotalSeconds, 0d), tags);
}
if (decrementActive)
{
_runsActive.Add(-1, new[] { new KeyValuePair<string, object?>("mode", mode) });
}
}
public void UpdateBacklog(string mode, string? scheduleId, long backlog)
{
var key = BuildBacklogKey(mode, scheduleId);
if (backlog <= 0)
{
_backlog.TryRemove(key, out _);
}
else
{
_backlog[key] = backlog;
}
}
private IEnumerable<Measurement<long>> ObserveBacklog()
{
foreach (var entry in _backlog)
{
var (mode, scheduleId) = SplitBacklogKey(entry.Key);
yield return new Measurement<long>(
entry.Value,
new KeyValuePair<string, object?>("mode", mode),
new KeyValuePair<string, object?>("scheduleId", scheduleId ?? string.Empty));
}
}
private static string BuildBacklogKey(string mode, string? scheduleId)
=> $"{mode}|{scheduleId ?? string.Empty}";
private static (string Mode, string? ScheduleId) SplitBacklogKey(string key)
{
var parts = key.Split('|', 2);
return parts.Length == 2
? (parts[0], string.IsNullOrEmpty(parts[1]) ? null : parts[1])
: (key, null);
}
public void Dispose()
{
if (_disposed)
{
return;
}
_meter.Dispose();
_disposed = true;
}
}
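
Review note: the counters above can be observed in-process with `MeterListener` from `System.Diagnostics.Metrics`, which is handy in unit tests. The sketch below creates its own meter and counter rather than using `SchedulerWorkerMetrics`. The meter name `Sketch.Scheduler.Worker` and the `MeterListenerSketch` class are hypothetical; the instrument name and tag keys mirror the ones in the diff.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class MeterListenerSketch
{
    // Hypothetical meter name used only in this sketch.
    public const string SketchMeterName = "Sketch.Scheduler.Worker";

    // Emits `segments` increments and returns the sum seen by the listener.
    public static long CountSegments(int segments)
    {
        using var meter = new Meter(SketchMeterName);
        var counter = meter.CreateCounter<long>("scheduler_runner_segments_total", unit: "count");

        long observed = 0;
        using var listener = new MeterListener();

        // Subscribe only to instruments published by the sketch meter.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == SketchMeterName)
            {
                l.EnableMeasurementEvents(instrument);
            }
        };

        // Counter measurements are delivered synchronously from Add.
        listener.SetMeasurementEventCallback<long>(
            (instrument, value, tags, state) => observed += value);
        listener.Start();

        for (var i = 0; i < segments; i++)
        {
            counter.Add(
                1,
                new KeyValuePair<string, object?>("mode", "analysisonly"),
                new KeyValuePair<string, object?>("status", "Completed"));
        }

        return observed;
    }
}
```

Observable instruments such as the backlog gauge are not pushed this way; a listener would have to call `RecordObservableInstruments()` to poll them.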

@@ -9,9 +9,12 @@ public sealed class SchedulerWorkerOptions
{
public PlannerOptions Planner { get; set; } = new();
public RunnerOptions Runner { get; set; } = new();
public void Validate()
{
Planner.Validate();
Runner.Validate();
}
public sealed class PlannerOptions
@@ -79,4 +82,188 @@ public sealed class SchedulerWorkerOptions
}
}
}
public sealed class RunnerOptions
{
public DispatchOptions Dispatch { get; set; } = new();
public ExecutionOptions Execution { get; set; } = new();
public ScannerOptions Scanner { get; set; } = new();
public void Validate()
{
Dispatch.Validate();
Execution.Validate();
Scanner.Validate();
}
public sealed class DispatchOptions
{
/// <summary>
/// Consumer name used when leasing planner queue messages to dispatch runner segments.
/// </summary>
public string ConsumerName { get; set; } = "scheduler-runner-dispatch";
/// <summary>
/// Maximum number of planner messages claimed per lease.
/// </summary>
public int BatchSize { get; set; } = 5;
/// <summary>
/// Duration of the lease held while dispatching runner segments.
/// </summary>
public TimeSpan LeaseDuration { get; set; } = TimeSpan.FromMinutes(5);
/// <summary>
/// Delay applied between polls when no planner messages are available.
/// </summary>
public TimeSpan IdleDelay { get; set; } = TimeSpan.FromSeconds(10);
public void Validate()
{
if (string.IsNullOrWhiteSpace(ConsumerName))
{
throw new InvalidOperationException("Runner dispatch consumer name must be configured.");
}
if (BatchSize <= 0)
{
throw new InvalidOperationException("Runner dispatch batch size must be greater than zero.");
}
if (LeaseDuration <= TimeSpan.Zero)
{
throw new InvalidOperationException("Runner dispatch lease duration must be greater than zero.");
}
if (IdleDelay < TimeSpan.Zero)
{
throw new InvalidOperationException("Runner dispatch idle delay cannot be negative.");
}
}
}
public sealed class ExecutionOptions
{
/// <summary>
/// Consumer name used when leasing runner segment messages.
/// </summary>
public string ConsumerName { get; set; } = "scheduler-runner";
/// <summary>
/// Maximum number of runner segments leased per poll.
/// </summary>
public int BatchSize { get; set; } = 5;
/// <summary>
/// Lease duration granted while processing a runner segment.
/// </summary>
public TimeSpan LeaseDuration { get; set; } = TimeSpan.FromMinutes(5);
/// <summary>
/// Delay applied between polls when no runner segments are available.
/// </summary>
public TimeSpan IdleDelay { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Maximum number of runner segments processed concurrently.
/// </summary>
public int MaxConcurrentSegments { get; set; } = Environment.ProcessorCount;
/// <summary>
/// Timeout applied to scanner requests per image digest.
/// </summary>
public TimeSpan ReportTimeout { get; set; } = TimeSpan.FromSeconds(60);
public void Validate()
{
if (string.IsNullOrWhiteSpace(ConsumerName))
{
throw new InvalidOperationException("Runner execution consumer name must be configured.");
}
if (BatchSize <= 0)
{
throw new InvalidOperationException("Runner execution batch size must be greater than zero.");
}
if (LeaseDuration <= TimeSpan.Zero)
{
throw new InvalidOperationException("Runner execution lease duration must be greater than zero.");
}
if (IdleDelay < TimeSpan.Zero)
{
throw new InvalidOperationException("Runner execution idle delay cannot be negative.");
}
if (MaxConcurrentSegments <= 0)
{
throw new InvalidOperationException("Runner execution max concurrent segments must be greater than zero.");
}
if (ReportTimeout <= TimeSpan.Zero)
{
throw new InvalidOperationException("Runner execution report timeout must be greater than zero.");
}
}
}
public sealed class ScannerOptions
{
/// <summary>
/// Base address for Scanner WebService API calls.
/// </summary>
public Uri? BaseAddress { get; set; }
/// <summary>
/// Relative path to the reports endpoint.
/// </summary>
public string ReportsPath { get; set; } = "/api/v1/reports";
/// <summary>
/// Relative path to the scans endpoint (content refresh).
/// </summary>
public string ScansPath { get; set; } = "/api/v1/scans";
/// <summary>
/// Whether the runner should attempt a content refresh before requesting the report when running in content-refresh mode.
/// </summary>
public bool EnableContentRefresh { get; set; } = true;
/// <summary>
/// Maximum number of scanner retries for transient failures.
/// </summary>
public int MaxRetryAttempts { get; set; } = 3;
/// <summary>
/// Base delay applied between retries for transient failures.
/// </summary>
public TimeSpan RetryBaseDelay { get; set; } = TimeSpan.FromSeconds(2);
public void Validate()
{
if (string.IsNullOrWhiteSpace(ReportsPath))
{
throw new InvalidOperationException("Runner scanner reports path must be configured.");
}
if (string.IsNullOrWhiteSpace(ScansPath))
{
throw new InvalidOperationException("Runner scanner scans path must be configured.");
}
if (MaxRetryAttempts < 0)
{
throw new InvalidOperationException("Runner scanner retry attempts cannot be negative.");
}
if (RetryBaseDelay < TimeSpan.Zero)
{
throw new InvalidOperationException("Runner scanner retry delay cannot be negative.");
}
}
}
}
}
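
Review note: each options class above validates itself by throwing `InvalidOperationException`, so a host can fail fast at startup. The sketch below shows that pattern on a reduced, hypothetical copy of `ExecutionOptions` carrying two of its rules. `ExecutionOptionsSketch` and `OptionsValidationSketch` are illustrative names, not part of the commit.

```csharp
using System;

// Hypothetical, reduced copy of ExecutionOptions with two of its validation rules.
public sealed class ExecutionOptionsSketch
{
    public int BatchSize { get; set; } = 5;
    public TimeSpan IdleDelay { get; set; } = TimeSpan.FromSeconds(5);

    public void Validate()
    {
        if (BatchSize <= 0)
        {
            throw new InvalidOperationException("Runner execution batch size must be greater than zero.");
        }
        if (IdleDelay < TimeSpan.Zero)
        {
            throw new InvalidOperationException("Runner execution idle delay cannot be negative.");
        }
    }
}

public static class OptionsValidationSketch
{
    // Returns null when the options are valid, otherwise the validation message.
    public static string? TryValidate(ExecutionOptionsSketch options)
    {
        try
        {
            options.Validate();
            return null;
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }
}
```

Note that an idle delay of exactly zero passes validation. Combined with `DelayAsync` in `RunnerBackgroundService`, which returns immediately for non-positive delays, that setting would make the runner poll an empty queue without pausing.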

@@ -5,6 +5,7 @@ using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Worker.Options;
using StellaOps.Scheduler.Worker.Observability;
namespace StellaOps.Scheduler.Worker.Planning;
@@ -18,6 +19,7 @@ internal sealed class PlannerExecutionService
private readonly ISchedulerPlannerQueue _plannerQueue;
private readonly SchedulerWorkerOptions _options;
private readonly TimeProvider _timeProvider;
private readonly SchedulerWorkerMetrics _metrics;
private readonly ILogger<PlannerExecutionService> _logger;
public PlannerExecutionService(
@@ -29,6 +31,7 @@ internal sealed class PlannerExecutionService
ISchedulerPlannerQueue plannerQueue,
SchedulerWorkerOptions options,
TimeProvider? timeProvider,
SchedulerWorkerMetrics metrics,
ILogger<PlannerExecutionService> logger)
{
_scheduleRepository = scheduleRepository ?? throw new ArgumentNullException(nameof(scheduleRepository));
@@ -39,6 +42,7 @@ internal sealed class PlannerExecutionService
_plannerQueue = plannerQueue ?? throw new ArgumentNullException(nameof(plannerQueue));
_options = options ?? throw new ArgumentNullException(nameof(options));
_timeProvider = timeProvider ?? TimeProvider.System;
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@@ -52,6 +56,9 @@ internal sealed class PlannerExecutionService
return new PlannerExecutionResult(PlannerExecutionStatus.Skipped, run);
}
var plannerStartedAt = _timeProvider.GetUtcNow();
var plannerLatency = plannerStartedAt - run.CreatedAt;
if (string.IsNullOrWhiteSpace(run.ScheduleId))
{
_logger.LogWarning("Run {RunId} has no scheduleId; marking as failed.", run.Id);
@@ -63,6 +70,7 @@ internal sealed class PlannerExecutionService
};
await PersistRunAsync(failed, cancellationToken).ConfigureAwait(false);
_metrics.RecordPlannerResult("unknown", "failed", plannerLatency, 0);
return new PlannerExecutionResult(
PlannerExecutionStatus.Failed,
failed,
@@ -92,6 +100,7 @@ internal sealed class PlannerExecutionService
};
await PersistRunAsync(failed, cancellationToken).ConfigureAwait(false);
_metrics.RecordPlannerResult("unknown", "failed", plannerLatency, 0);
return new PlannerExecutionResult(
PlannerExecutionStatus.Failed,
failed,
@@ -113,6 +122,7 @@ internal sealed class PlannerExecutionService
}
var usageOnly = schedule.Mode != ScheduleMode.ContentRefresh;
var modeLabel = schedule.Mode.ToString().ToLowerInvariant();
ImpactSet impactSet;
try
@@ -133,6 +143,7 @@ internal sealed class PlannerExecutionService
};
await PersistRunAsync(failed, cancellationToken).ConfigureAwait(false);
_metrics.RecordPlannerResult(modeLabel, "failed", plannerLatency, 0);
return new PlannerExecutionResult(
PlannerExecutionStatus.Failed,
failed,
@@ -176,6 +187,9 @@ internal sealed class PlannerExecutionService
await PersistRunAsync(completed, cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Run {RunId} produced no impacted images; marking Completed.", run.Id);
_metrics.RecordPlannerResult(modeLabel, "no_work", plannerLatency, 0);
_metrics.UpdateBacklog(modeLabel, run.ScheduleId, 0);
_metrics.RecordRunCompletion(modeLabel, "completed", TimeSpan.Zero, decrementActive: false);
return new PlannerExecutionResult(
PlannerExecutionStatus.CompletedWithoutWork,
completed,
@@ -212,6 +226,8 @@ internal sealed class PlannerExecutionService
snapshot.Images.Length,
run.TenantId,
schedule.Id);
_metrics.RecordPlannerResult(modeLabel, "enqueued", plannerLatency, snapshot.Images.Length);
_metrics.UpdateBacklog(modeLabel, run.ScheduleId, snapshot.Images.Length);
return new PlannerExecutionResult(
PlannerExecutionStatus.Enqueued,

@@ -0,0 +1,212 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Worker.Options;
namespace StellaOps.Scheduler.Worker.Planning;
public interface IPlannerQueueDispatchService
{
Task<PlannerQueueDispatchResult> DispatchAsync(PlannerQueueMessage message, CancellationToken cancellationToken = default);
}
internal sealed class PlannerQueueDispatchService : IPlannerQueueDispatchService
{
private readonly IImpactShardPlanner _shardPlanner;
private readonly ISchedulerRunnerQueue _runnerQueue;
private readonly SchedulerWorkerOptions _options;
private readonly ILogger<PlannerQueueDispatchService> _logger;
public PlannerQueueDispatchService(
IImpactShardPlanner shardPlanner,
ISchedulerRunnerQueue runnerQueue,
SchedulerWorkerOptions options,
ILogger<PlannerQueueDispatchService> logger)
{
_shardPlanner = shardPlanner ?? throw new ArgumentNullException(nameof(shardPlanner));
_runnerQueue = runnerQueue ?? throw new ArgumentNullException(nameof(runnerQueue));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<PlannerQueueDispatchResult> DispatchAsync(
PlannerQueueMessage message,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(message);
cancellationToken.ThrowIfCancellationRequested();
var run = message.Run;
if (run is null)
{
throw new InvalidOperationException("Planner queue message did not include a run payload.");
}
var impactSet = message.ImpactSet ?? throw new InvalidOperationException("Planner queue message did not include an impact set.");
if (impactSet.Images.Length == 0)
{
_logger.LogDebug("Skipping dispatch for run {RunId} because impact set is empty.", run.Id);
return PlannerQueueDispatchResult.NoWork(run.Id);
}
var schedule = message.Schedule;
var limits = schedule?.Limits ?? ScheduleLimits.Default;
var shards = _shardPlanner.PlanShards(impactSet, limits.MaxJobs, limits.Parallelism);
if (shards.Length == 0)
{
_logger.LogDebug(
"Planner dispatch produced no shards for run {RunId}; maxJobs={MaxJobs} parallelism={Parallelism}.",
run.Id,
limits.MaxJobs,
limits.Parallelism);
return PlannerQueueDispatchResult.NoWork(run.Id);
}
var shardCount = shards.Length;
var enqueueTasks = new List<Task>(shardCount);
var attributes = BuildCommonAttributes(run, impactSet, shardCount, schedule);
foreach (var shard in shards)
{
var segmentId = $"{run.Id}:{shard.Index:D4}";
var digests = shard.Images.Select(static image => image.ImageDigest).ToArray();
if (digests.Length == 0)
{
continue;
}
var segmentAttributes = MergeAttributes(attributes, shard, schedule);
var runnerMessage = new RunnerSegmentQueueMessage(
segmentId,
run.Id,
run.TenantId,
digests,
run.ScheduleId,
limits.RatePerSecond,
impactSet.UsageOnly,
segmentAttributes,
message.CorrelationId);
enqueueTasks.Add(_runnerQueue.EnqueueAsync(runnerMessage, cancellationToken).AsTask());
}
if (enqueueTasks.Count == 0)
{
_logger.LogWarning("No runner segments were generated for run {RunId} despite non-empty impact set.", run.Id);
return PlannerQueueDispatchResult.NoWork(run.Id);
}
await Task.WhenAll(enqueueTasks).ConfigureAwait(false);
_logger.LogInformation(
"Run {RunId} dispatched {SegmentCount} runner segments covering {ImageCount} images.",
run.Id,
enqueueTasks.Count,
impactSet.Images.Length);
return PlannerQueueDispatchResult.Success(run.Id, enqueueTasks.Count, impactSet.Images.Length);
}
private static IReadOnlyDictionary<string, string> BuildCommonAttributes(
Run run,
ImpactSet impactSet,
int shardCount,
Schedule? schedule)
{
var map = new Dictionary<string, string>(StringComparer.Ordinal)
{
["runId"] = run.Id,
["tenantId"] = run.TenantId,
["usageOnly"] = impactSet.UsageOnly ? "true" : "false",
["shardCount"] = shardCount.ToString(),
["totalImages"] = impactSet.Images.Length.ToString()
};
if (!string.IsNullOrWhiteSpace(impactSet.SnapshotId))
{
map["impactSnapshotId"] = impactSet.SnapshotId!;
}
if (impactSet.GeneratedAt != default)
{
map["impactGeneratedAt"] = impactSet.GeneratedAt.UtcDateTime.ToString("O");
}
if (!string.IsNullOrWhiteSpace(run.ScheduleId))
{
map["scheduleId"] = run.ScheduleId!;
}
if (schedule is not null)
{
map["scheduleMode"] = schedule.Mode.ToString();
if (!string.IsNullOrWhiteSpace(schedule.Name))
{
map["scheduleName"] = schedule.Name;
}
if (schedule.Limits.RatePerSecond is { } rate && rate > 0)
{
map["ratePerSecond"] = rate.ToString();
}
if (schedule.Limits.Parallelism is { } parallelism && parallelism > 0)
{
map["parallelism"] = parallelism.ToString();
}
if (schedule.Limits.MaxJobs is { } maxJobs && maxJobs > 0)
{
map["maxJobs"] = maxJobs.ToString();
}
}
return map;
}
private static IReadOnlyDictionary<string, string> MergeAttributes(
IReadOnlyDictionary<string, string> common,
ImpactShard shard,
Schedule? schedule)
{
if (shard.Images.Length == 0)
{
return common;
}
var map = new Dictionary<string, string>(common, StringComparer.Ordinal)
{
["shardIndex"] = shard.Index.ToString(),
["shardSize"] = shard.Images.Length.ToString()
};
if (schedule?.Mode == ScheduleMode.ContentRefresh)
{
var entrypointCount = shard.Images.Count(static image => image.UsedByEntrypoint);
map["entrypointCount"] = entrypointCount.ToString();
}
return map;
}
}
public readonly record struct PlannerQueueDispatchResult(
string RunId,
PlannerQueueDispatchStatus Status,
int SegmentCount,
int ImageCount)
{
public static PlannerQueueDispatchResult Success(string runId, int segmentCount, int imageCount)
=> new(runId, PlannerQueueDispatchStatus.DispatchCompleted, segmentCount, imageCount);
public static PlannerQueueDispatchResult NoWork(string runId)
=> new(runId, PlannerQueueDispatchStatus.NoWork, 0, 0);
}
public enum PlannerQueueDispatchStatus
{
NoWork,
DispatchCompleted
}

@@ -0,0 +1,145 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.Worker.Options;
namespace StellaOps.Scheduler.Worker.Planning;
internal sealed class PlannerQueueDispatcherBackgroundService : BackgroundService
{
private readonly ISchedulerPlannerQueue _plannerQueue;
private readonly IPlannerQueueDispatchService _dispatchService;
private readonly SchedulerWorkerOptions _options;
private readonly ILogger<PlannerQueueDispatcherBackgroundService> _logger;
public PlannerQueueDispatcherBackgroundService(
ISchedulerPlannerQueue plannerQueue,
IPlannerQueueDispatchService dispatchService,
SchedulerWorkerOptions options,
ILogger<PlannerQueueDispatcherBackgroundService> logger)
{
_plannerQueue = plannerQueue ?? throw new ArgumentNullException(nameof(plannerQueue));
_dispatchService = dispatchService ?? throw new ArgumentNullException(nameof(dispatchService));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var dispatchOptions = _options.Runner.Dispatch;
var consumer = dispatchOptions.ConsumerName;
var leaseRequest = new SchedulerQueueLeaseRequest(consumer, dispatchOptions.BatchSize, dispatchOptions.LeaseDuration);
_logger.LogInformation("Planner dispatcher loop started with consumer {Consumer}.", consumer);
while (!stoppingToken.IsCancellationRequested)
{
IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>> leases;
try
{
leases = await _plannerQueue.LeaseAsync(leaseRequest, stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Planner dispatcher failed to lease messages; backing off.");
await DelayAsync(dispatchOptions.IdleDelay, stoppingToken).ConfigureAwait(false);
continue;
}
if (leases.Count == 0)
{
await DelayAsync(dispatchOptions.IdleDelay, stoppingToken).ConfigureAwait(false);
continue;
}
foreach (var lease in leases)
{
await ProcessLeaseAsync(lease, dispatchOptions.LeaseDuration, stoppingToken).ConfigureAwait(false);
}
}
_logger.LogInformation("Planner dispatcher loop stopping.");
}
private async Task ProcessLeaseAsync(
ISchedulerQueueLease<PlannerQueueMessage> lease,
TimeSpan leaseDuration,
CancellationToken cancellationToken)
{
try
{
var result = await _dispatchService.DispatchAsync(lease.Message, cancellationToken).ConfigureAwait(false);
await lease.AcknowledgeAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug(
"Dispatched planner message {MessageId} for run {RunId}; status={Status} segments={Segments} images={Images}.",
lease.MessageId,
result.RunId,
result.Status,
result.SegmentCount,
result.ImageCount);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
_logger.LogError(
ex,
"Planner dispatch failed for message {MessageId} (run {RunId}); releasing for retry.",
lease.MessageId,
lease.RunId);
try
{
await lease.ReleaseAsync(SchedulerQueueReleaseDisposition.Retry, cancellationToken).ConfigureAwait(false);
}
catch (Exception releaseEx) when (releaseEx is not OperationCanceledException)
{
_logger.LogWarning(
releaseEx,
"Failed to release planner message {MessageId}; attempting lease renewal.",
lease.MessageId);
try
{
await lease.RenewAsync(leaseDuration, cancellationToken).ConfigureAwait(false);
}
catch (Exception renewEx) when (renewEx is not OperationCanceledException)
{
_logger.LogError(
renewEx,
"Lease renewal also failed for planner message {MessageId}; acknowledging to avoid tight loop.",
lease.MessageId);
await lease.AcknowledgeAsync(cancellationToken).ConfigureAwait(false);
}
}
}
}
private static async Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken)
{
if (delay <= TimeSpan.Zero)
{
return;
}
try
{
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
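catch (OperationCanceledException)
{
// Shutdown was requested mid-delay; swallow so the calling loop can observe the token and exit cleanly.
}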
}
}

@@ -9,7 +9,10 @@
<ProjectReference Include="../StellaOps.Scheduler.Models/StellaOps.Scheduler.Models.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.Storage.Mongo/StellaOps.Scheduler.Storage.Mongo.csproj" />
<ProjectReference Include="../StellaOps.Scheduler.Queue/StellaOps.Scheduler.Queue.csproj" />
<ProjectReference Include="../StellaOps.Notify.Models/StellaOps.Notify.Models.csproj" />
<ProjectReference Include="../StellaOps.Notify.Queue/StellaOps.Notify.Queue.csproj" />
<PackageReference Include="Cronos" Version="0.10.0" />
<PackageReference Include="System.Threading.RateLimiting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.0-rc.2.25502.107" />
</ItemGroup>
</Project>

@@ -4,13 +4,15 @@
|----|--------|----------|------------|-------------|---------------|
| SCHED-WORKER-16-201 | DOING (2025-10-27) | Scheduler Worker Guild | SCHED-QUEUE-16-401 | Planner loop (cron + event triggers) with lease management, fairness, and rate limiting (§6). | Planner integration tests cover cron/event triggers; rate limits enforced; logs include run IDs. |
| SCHED-WORKER-16-202 | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-IMPACT-16-301 | Wire ImpactIndex targeting (ResolveByPurls/vulns), dedupe, shard planning. | Targeting tests confirm correct image selection; dedupe documented; shards evenly distributed. |
| SCHED-WORKER-16-203 | TODO | Scheduler Worker Guild | SCHED-WORKER-16-202 | Runner execution: call Scanner `/reports` (analysis-only) or `/scans` when configured; collect deltas; handle retries. | Runner tests stub Scanner; retries/backoff validated; deltas aggregated deterministically. |
| SCHED-WORKER-16-204 | TODO | Scheduler Worker Guild | SCHED-WORKER-16-203 | Emit events (`scheduler.rescan.delta`, `scanner.report.ready`) for Notify/UI with summaries. | Events published to queue; payload schema documented; integration tests verify consumption. |
| SCHED-WORKER-16-205 | TODO | Scheduler Worker Guild | SCHED-WORKER-16-201 | Metrics/telemetry: run stats, queue depth, planner latency, delta counts. | Metrics exported per spec; dashboards updated; alerts configured. |
| SCHED-WORKER-16-203 | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-202 | Runner execution: call Scanner `/reports` (analysis-only) or `/scans` when configured; collect deltas; handle retries. | Runner tests stub Scanner; retries/backoff validated; deltas aggregated deterministically. |
| SCHED-WORKER-16-204 | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-203 | Emit events (`scheduler.rescan.delta`, `scanner.report.ready`) for Notify/UI with summaries. | Events published to queue; payload schema documented; integration tests verify consumption. |
| SCHED-WORKER-16-205 | DONE (2025-10-27) | Scheduler Worker Guild | SCHED-WORKER-16-201 | Metrics/telemetry: run stats, queue depth, planner latency, delta counts. | Metrics exported per spec; dashboards updated; alerts configured. |
> 2025-10-27: Impact targeting sanitizes selector-constrained results, dedupes digests, and documents shard planning in `docs/SCHED-WORKER-16-202-IMPACT-TARGETING.md`.
> 2025-10-27: Planner loop processes Planning runs via PlannerExecutionService; documented in docs/SCHED-WORKER-16-201-PLANNER.md.
> 2025-10-27: Runner dispatcher + execution service documented in docs/SCHED-WORKER-16-203-RUNNER.md; queue pipeline now drives scanner invocations, aggregates deltas back into run stats, and `AddSchedulerWorker` wires the background services into the host.
## Policy Engine v2 (Sprint 20)
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |

@@ -0,0 +1,54 @@
# SCHED-WORKER-16-203 — Runner Execution Pipeline
_Sprint 16 · Scheduler Worker Guild_
This increment brings the scheduler runner online. The worker now consumes the
planner queue, shards impact sets into deterministic runner segments, executes
them against Scanner, and aggregates deltas back into run state.
## Planner queue dispatch
`PlannerQueueDispatchService` consumes `PlannerQueueMessage` payloads and uses
`ImpactShardPlanner` to slice the associated `ImpactSet`. Each shard yields a
stable `RunnerSegmentQueueMessage`:
- `segmentId` is `{runId}:{shardIndex:D4}` for idempotency.
- Attributes include schedule mode, limits, shard size, and usage hints.
- `RatePerSecond` carries through schedule limits so execution can pace calls.
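As a rough illustration of the identifier format above, the sketch below builds segment IDs for a few shard indexes. `BuildSegmentId` and the `run-42` identifier are hypothetical names used only for this example; only the `{runId}:{shardIndex:D4}` pattern comes from the dispatch service.

```csharp
using System;
using System.Linq;

// Hypothetical helper (not part of the commit): formats a segment ID as
// "{runId}:{shardIndex:D4}", zero-padding the shard index to four digits.
static string BuildSegmentId(string runId, int shardIndex) => $"{runId}:{shardIndex:D4}";

// The same (runId, shardIndex) pair always yields the same ID, so a
// redelivered planner message regenerates identical segment IDs.
var segmentIds = Enumerable.Range(0, 3)
    .Select(index => BuildSegmentId("run-42", index))
    .ToArray();

foreach (var id in segmentIds)
{
    Console.WriteLine(id); // prints run-42:0000, then run-42:0001, then run-42:0002
}
```

`D4` sets a minimum width, so shard indexes above 9999 still format correctly with more digits.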
`PlannerQueueDispatcherBackgroundService` leases planner queue messages in
batches, invokes the dispatch service, acknowledges each message on success, and
releases it for retry on failure.
## Runner execution
`RunnerBackgroundService` leases runner segments and hands them to
`RunnerExecutionService`. The execution service:
1. Loads the target `Run` (marking `StartedAt` when the first segment is processed).
2. Calls `IScannerReportClient` (`HttpScannerReportClient`) for each digest
according to schedule mode (`analysis-only` vs. `content-refresh`) and usage
flag. A light retry/backoff loop shields transient failures.
3. Aggregates output into `DeltaSummary` records, updating cumulative stats
(`Completed`, `Deltas`, severity counters) in a deterministic manner.
4. Persists the updated run and projects schedule summaries when the update
succeeds.
5. Signals completion when cumulative `Completed >= Queued`.
Segment processing is idempotent: the same segment re-creates the same delta
summaries and stat deltas. Failures bubble up so the queue retry policy can apply
exponential backoff.
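The retry/backoff loop mentioned in step 2 is not shown in this document, so the following is only a minimal sketch of the general technique. `WithRetryAsync`, the attempt count, the delays, and the choice to treat `HttpRequestException` as transient are all assumptions, not the actual `RunnerExecutionService` implementation.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: retries an async call on HttpRequestException with
// exponential backoff (baseDelay, 2x baseDelay, 4x baseDelay, ...).
static async Task<T> WithRetryAsync<T>(
    Func<CancellationToken, Task<T>> action,
    int maxAttempts,
    TimeSpan baseDelay,
    CancellationToken cancellationToken)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            return await action(cancellationToken).ConfigureAwait(false);
        }
        catch (HttpRequestException) when (attempt < maxAttempts)
        {
            // Transient failure with attempts remaining: wait, then retry.
            var delay = TimeSpan.FromMilliseconds(
                baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));
            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
        }
        // On the final attempt the filter is false, so the exception
        // propagates to the caller (and from there to the queue retry policy).
    }
}

// Stand-in for a scanner call that fails twice before succeeding.
var calls = 0;
var result = await WithRetryAsync(
    _ =>
    {
        calls++;
        return calls < 3
            ? Task.FromException<string>(new HttpRequestException("simulated transient failure"))
            : Task.FromResult("report-ready");
    },
    maxAttempts: 4,
    baseDelay: TimeSpan.FromMilliseconds(10),
    cancellationToken: CancellationToken.None);

Console.WriteLine($"{result} after {calls} calls"); // report-ready after 3 calls
```

Keeping the in-process retry count small leaves longer outages to the queue-level retry described above.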
## Tests
- `PlannerQueueDispatchServiceTests` verify shard sizing, attribute emission, and
deterministic `segmentId` construction with schedule limits.
- `RunnerExecutionServiceTests` cover stat aggregation, delta persistence, and
missing-run handling. Scanner interactions are stubbed via `IScannerReportClient`.
## Follow-ups
- `AddSchedulerWorker(configuration)` registers impact targeting, planner
dispatch, runner execution, and the three hosted services. Call it after
`AddSchedulerQueues` and `AddSchedulerMongoStorage` when bootstrapping the
worker host.
- Extend execution metrics (Sprint 16-205) before exposing Prometheus counters.

@@ -0,0 +1,36 @@
# SCHED-WORKER-16-204 — Platform Events
_Sprint 16 · Scheduler Worker Guild_
The runner now emits canonical platform events so Notify/UI can surface
rescan activity in near real time.
## Event emission
- `scheduler.rescan.delta@1` — published once per runner segment when that
segment produced at least one meaningful delta (new critical/high findings or
KEV hits). Payload batches all impacted digests for the segment and includes
severity totals. Reason strings (manual trigger, Feedser/Vexer exports) flow
from the run reason when present.
- `scanner.report.ready@1` — published for every image the runner processes.
The payload mirrors the Scanner contract (verdict, summary buckets, DSSE
envelope) and surfaces delta counts/links when available. Scope information
is derived from the impact snapshot so notify rules can match on registry and
repository.
Events are formatted using `NotifyEvent` envelopes and published via the
configured Notify queue transport. When Notify is not configured, the worker
logs once and suppresses event emission.
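The emission rules above can be sketched as a small decision over one segment's results. The tuple shape, the digests, and `IsMeaningful` are hypothetical stand-ins for illustration; the real payloads follow the schemas listed under "Payload references".

```csharp
using System;
using System.Linq;

// Hypothetical predicate: a delta is "meaningful" when it adds critical/high
// findings or KEV hits, as described for scheduler.rescan.delta@1.
static bool IsMeaningful((string Digest, int NewCritical, int NewHigh, int KevHits) d)
    => d.NewCritical > 0 || d.NewHigh > 0 || d.KevHits > 0;

// Hypothetical per-image results for a single runner segment.
var segment = new (string Digest, int NewCritical, int NewHigh, int KevHits)[]
{
    ("sha256:aaa", 1, 0, 0),
    ("sha256:bbb", 0, 0, 0),
    ("sha256:ccc", 0, 2, 1),
};

// scanner.report.ready@1: one event for every processed image.
var reportReadyEvents = segment.Length;

// scheduler.rescan.delta@1: at most one event per segment, batching the
// impacted digests together with severity totals.
var impacted = segment.Where(IsMeaningful).ToArray();
var emitRescanDelta = impacted.Length > 0;
var totalCritical = impacted.Sum(d => d.NewCritical);
var totalHigh = impacted.Sum(d => d.NewHigh);
var totalKev = impacted.Sum(d => d.KevHits);

Console.WriteLine($"report.ready events: {reportReadyEvents}");
Console.WriteLine($"rescan.delta emitted: {emitRescanDelta}, digests: {impacted.Length}");
Console.WriteLine($"critical={totalCritical} high={totalHigh} kev={totalKev}");
```

A segment whose images all report zero new critical, high, and KEV findings would still produce its `scanner.report.ready@1` events but no `scheduler.rescan.delta@1` event.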
## Payload references
- Schema source: `docs/events/scheduler.rescan.delta@1.json`
- Schema source: `docs/events/scanner.report.ready@1.json`
- Sample payloads: `docs/events/samples/*.sample.json`
## Metrics tie-in
Event emission complements the new observability counters introduced in
`SCHED-WORKER-16-205` (runner segment totals, delta counts, backlog gauge) so
that operators can correlate queue depth with downstream notifications.

@@ -0,0 +1,43 @@
# SCHED-WORKER-16-205 — Scheduler Worker Observability
_Sprint 16 · Scheduler Worker Guild_
The scheduler worker now exposes first-class metrics covering planner latency,
runner throughput, and backlog health.
## Meter: `StellaOps.Scheduler.Worker`
| Metric | Type | Tags | Description |
| --- | --- | --- | --- |
| `scheduler_planner_runs_total` | Counter | `mode`, `status` | Planner outcomes (`enqueued`, `no_work`, `failed`). |
| `scheduler_planner_latency_seconds` | Histogram | `mode`, `status` | Time between run creation and planner completion. |
| `scheduler_runner_segments_total` | Counter | `mode`, `status` | Runner segments processed (`Completed`, `persist_failed`, `RunMissing`). |
| `scheduler_runner_images_total` | Counter | `mode`, `delta` | Images processed per mode, split by whether a delta was observed. |
| `scheduler_runner_delta_total` | Counter | `mode` | Total new findings observed. |
| `scheduler_runner_delta_critical_total` | Counter | `mode` | Critical findings observed. |
| `scheduler_runner_delta_high_total` | Counter | `mode` | High findings observed. |
| `scheduler_runner_delta_kev_total` | Counter | `mode` | KEV hits surfaced across runner segments. |
| `scheduler_run_duration_seconds` | Histogram | `mode`, `result` | End-to-end run durations (currently recorded for successful completions). |
| `scheduler_runs_active` | Up/down counter | `mode` | Active runs in-flight. |
| `scheduler_runner_backlog` | Observable gauge | `mode`, `scheduleId` | Remaining images awaiting runner processing per schedule. |
## Instrumentation notes
- Planner records latency once a run transitions out of `Planning`. `no_work`
completions emit zero-duration runs without incrementing the active counter.
- Runner updates backlog after every segment and decrements the active counter
when a run reaches `Completed`.
- Delta counters aggregate per severity and KEV hit; they only increment when
`DeltaSummary` reports meaningful changes.
- Metrics are emitted regardless of Notify availability so operators can track
queue pressure even in air-gapped deployments.
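For readers unfamiliar with `System.Diagnostics.Metrics`, the sketch below shows how a few of the instruments in the table could be declared and recorded. The meter and instrument names are taken from the table; the tag values, the in-process `MeterListener`, and the overall wiring are illustrative assumptions rather than the contents of `SchedulerWorkerMetrics`. `CreateUpDownCounter` requires .NET 7 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Meter and instrument names come from the table above; the rest is illustrative.
using var meter = new Meter("StellaOps.Scheduler.Worker");
var plannerRuns = meter.CreateCounter<long>("scheduler_planner_runs_total");
var plannerLatency = meter.CreateHistogram<double>("scheduler_planner_latency_seconds", unit: "s");
var runsActive = meter.CreateUpDownCounter<long>("scheduler_runs_active");

// Collect measurements in-process so the sketch is observable without an exporter.
var plannerRunTotal = 0L;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "StellaOps.Scheduler.Worker")
    {
        l.EnableMeasurementEvents(instrument);
    }
};
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
{
    if (instrument.Name == "scheduler_planner_runs_total")
    {
        plannerRunTotal += value;
    }
});
listener.Start();

// Assumed tag values; the document only names the tag keys.
var mode = new KeyValuePair<string, object?>("mode", "analysis-only");
var status = new KeyValuePair<string, object?>("status", "enqueued");

plannerRuns.Add(1, mode, status);           // one planner outcome
plannerLatency.Record(0.42, mode, status);  // planner latency in seconds
runsActive.Add(1, mode);                    // run becomes active
runsActive.Add(-1, mode);                   // run reaches Completed

Console.WriteLine(plannerRunTotal); // 1
```

In a real host, an OpenTelemetry or Prometheus exporter would normally subscribe to the meter instead of a hand-written listener.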
## Dashboards & alerts
- **Grafana dashboard:** `docs/ops/scheduler-worker-grafana-dashboard.json`
(import into Prometheus-backed Grafana). Panels mirror the metrics above with
mode filters.
- **Prometheus rules:** `docs/ops/scheduler-worker-prometheus-rules.yaml`
provides planner failure/latency, backlog, and stuck-run alerts.
- **Operations guide:** see `docs/ops/scheduler-worker-operations.md` for
runbook steps, alert context, and dashboard wiring instructions.