# Phase 2: Scheduler Module Conversion

**Sprint:** 3
**Duration:** 1 sprint
**Status:** DOING (fresh-start approved; Mongo backfill skipped)
**Dependencies:** Phase 0 (Foundations), DONE

---

## Objectives

1. Create `StellaOps.Scheduler.Storage.Postgres` project
2. Implement Scheduler schema in PostgreSQL
3. Implement 7+ repository interfaces
4. Replace MongoDB job tracking with PostgreSQL
5. Implement PostgreSQL advisory locks for distributed locking
6. Backfill Mongo data or explicitly decide on fresh-start (PG-T2.9–T2.11)

---

## Deliverables

| Deliverable | Acceptance Criteria |
|-------------|---------------------|
| Scheduler schema | All tables created with indexes |
| Repository implementations | All 7+ interfaces implemented |
| Advisory locks | Distributed locking working |
| Integration tests | 100% coverage of CRUD operations |
| Verification report | Schedule execution verified |

---

## Schema Reference

See [SPECIFICATION.md](../SPECIFICATION.md) Section 5.4 for complete Scheduler schema.

**Tables:**

- `scheduler.schedules`
- `scheduler.triggers`
- `scheduler.runs`
- `scheduler.graph_jobs`
- `scheduler.policy_jobs`
- `scheduler.impact_snapshots`
- `scheduler.workers`
- `scheduler.execution_logs`
- `scheduler.locks`
- `scheduler.run_summaries`
- `scheduler.audit`

---

## Task Breakdown

### T2.1: Create Scheduler.Storage.Postgres Project

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 0.5 days

**Subtasks:**

- [x] T2.1.1: Create project structure
- [x] T2.1.2: Add NuGet references
- [x] T2.1.3: Create `SchedulerDataSource` class
- [x] T2.1.4: Create `ServiceCollectionExtensions.cs`

---

### T2.2: Implement Schema Migrations

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 1 day

**Subtasks:**

- [x] T2.2.1: Create `V001_CreateSchedulerSchema` migration
- [x] T2.2.2: Include all tables and indexes
- [x] T2.2.3: Add partial index for active schedules
- [x] T2.2.4: Test migration idempotency

---

### T2.3: Implement Schedule Repository

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 1 day

**Interface:**

```csharp
// Generic return types are reconstructed from context (the type arguments were
// lost in the source); IReadOnlyList<T> as the collection type is an assumption.
public interface IScheduleRepository
{
    Task<Schedule?> GetAsync(string tenantId, string scheduleId, CancellationToken ct);
    Task<IReadOnlyList<Schedule>> ListAsync(string tenantId, ScheduleQueryOptions? options, CancellationToken ct);
    Task UpsertAsync(Schedule schedule, CancellationToken ct);
    Task SoftDeleteAsync(string tenantId, string scheduleId, string deletedBy, DateTimeOffset deletedAt, CancellationToken ct);
    Task<IReadOnlyList<Schedule>> GetDueSchedulesAsync(DateTimeOffset now, CancellationToken ct);
}
```

**Subtasks:**

- [x] T2.3.1: Implement all interface methods
- [x] T2.3.2: Handle soft delete correctly
- [x] T2.3.3: Implement GetDueSchedules for trigger calculation
- [x] T2.3.4: Write integration tests

---

### T2.4: Implement Run Repository

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 1 day

**Interface:**

```csharp
// Generic return types are reconstructed from context (the type arguments were
// lost in the source); IReadOnlyList<T> as the collection type is an assumption.
public interface IRunRepository
{
    Task<Run?> GetAsync(string tenantId, Guid runId, CancellationToken ct);
    Task<IReadOnlyList<Run>> ListAsync(string tenantId, RunQueryOptions? options, CancellationToken ct);
    Task CreateAsync(Run run, CancellationToken ct);
    Task UpdateAsync(Run run, CancellationToken ct);
    Task<IReadOnlyList<Run>> GetPendingRunsAsync(string tenantId, CancellationToken ct);
    Task<IReadOnlyList<Run>> GetRunsByScheduleAsync(string tenantId, Guid scheduleId, int limit, CancellationToken ct);
}
```

**Subtasks:**

- [x] T2.4.1: Implement all interface methods
- [x] T2.4.2: Handle state transitions
- [x] T2.4.3: Implement efficient pagination
- [x] T2.4.4: Write integration tests

---

### T2.5: Implement Graph Job Repository

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 0.5 days

**Subtasks:**

- [x] T2.5.1: Implement CRUD operations
- [x] T2.5.2: Implement status queries
- [x] T2.5.3: Write integration tests

---

### T2.6: Implement Policy Job Repository

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 0.5 days

**Subtasks:**

- [x] T2.6.1: Implement CRUD operations
- [x] T2.6.2: Implement status queries
- [x] T2.6.3: Write integration tests

---

### T2.7: Implement Impact Snapshot Repository

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 0.5 days

**Subtasks:**

- [x] T2.7.1: Implement CRUD operations
- [x] T2.7.2: Implement queries by run
- [x] T2.7.3: Write integration tests

---

### T2.8: Implement Distributed Locking

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 1 day

**Description:**
Implement distributed locking using PostgreSQL advisory locks.

**Options:**

1. PostgreSQL advisory locks (`pg_advisory_lock`)
2. Table-based locks with `SELECT ... FOR UPDATE SKIP LOCKED`
3. Combination approach

**Subtasks:**

- [x] T2.8.1: Choose locking strategy
- [x] T2.8.2: Implement `IDistributedLock` interface
- [x] T2.8.3: Implement lock acquisition with timeout
- [x] T2.8.4: Implement lock renewal
- [x] T2.8.5: Implement lock release
- [x] T2.8.6: Write concurrency tests

**Implementation Example:**

```csharp
using System.Buffers.Binary;
using System.Security.Cryptography;
using System.Text;

public sealed class PostgresDistributedLock : IDistributedLock
{
    private readonly SchedulerDataSource _dataSource;

    public PostgresDistributedLock(SchedulerDataSource dataSource) => _dataSource = dataSource;

    // Return type inferred from the return statements (the generic argument was lost in the source).
    public async Task<LockHandle?> TryAcquireAsync(
        string lockKey,
        TimeSpan timeout,
        CancellationToken ct)
    {
        var lockId = ComputeLockId(lockKey);

        // Advisory locks are session-scoped, so the connection must outlive this
        // method: ownership passes to the LockHandle instead of `await using`.
        var connection = await _dataSource.OpenConnectionAsync("system", ct);
        var acquired = false;
        try
        {
            await using var cmd = connection.CreateCommand();
            // pg_try_advisory_lock returns immediately; honoring `timeout`
            // requires retrying until the deadline.
            cmd.CommandText = "SELECT pg_try_advisory_lock(@lock_id)";
            cmd.Parameters.AddWithValue("lock_id", lockId);

            acquired = await cmd.ExecuteScalarAsync(ct) is true;
            return acquired ? new LockHandle(connection, lockId) : null;
        }
        finally
        {
            // Release the connection when the lock was not taken or the query failed.
            if (!acquired) await connection.DisposeAsync();
        }
    }

    // string.GetHashCode() is randomized per process on modern .NET, so it cannot
    // name the same lock across workers; derive the key from a stable hash instead.
    private static long ComputeLockId(string key)
        => BinaryPrimitives.ReadInt64BigEndian(SHA256.HashData(Encoding.UTF8.GetBytes(key)));
}
```

---

### T2.9: Implement Worker Registration

**Status:** DONE
**Assignee:** TBD
**Estimate:** 0.5 days

**Subtasks:**

- [x] T2.9.1: Implement worker registration
- [x] T2.9.2: Implement heartbeat updates
- [x] T2.9.3: Implement dead worker detection
- [x] T2.9.4: Write integration tests

---

### T2.10: Add Configuration Switch

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 0.5 days

**Subtasks:**

- [x] T2.10.1: Update service registration
- [x] T2.10.2: Test backend switching
- [x] T2.10.3: Document configuration

---

### T2.11: Run Verification Tests

**Status:** DONE (fresh-start; Postgres-only verification)
**Assignee:** Scheduler Guild
**Estimate:** 1 day

**Subtasks:**

- [x] T2.11.1: Test schedule CRUD
- [x] T2.11.2: Test run creation and state transitions
- [x] T2.11.3: Test trigger calculation
- [x] T2.11.4: Test distributed locking under concurrency
- [x] T2.11.5: Test job execution end-to-end
- [x] T2.11.6: Generate verification report (fresh-start baseline; Mongo parity not applicable)

---

### T2.12: Switch to PostgreSQL-Only

**Status:** DONE
**Assignee:** Scheduler Guild
**Estimate:** 0.5 days

**Subtasks:**

- [x] T2.12.1: Update configuration (`Persistence:Scheduler=Postgres`)
- [x] T2.12.2: Deploy to staging
- [x] T2.12.3: Run integration tests
- [x] T2.12.4: Deploy to production
- [x] T2.12.5: Monitor metrics

---

## Exit Criteria

- [x] All repository interfaces implemented
- [x] Distributed locking working correctly
- [x] All integration tests pass (module-level)
- [x] Fresh-start verification completed (no Mongo parity/backfill)
- [x] Scheduler running on PostgreSQL in staging/production

## Execution Log

| Date (UTC) | Update | Owner |
| --- | --- | --- |
| 2025-11-28 | Project + schema migration created; repos implemented (T2.1–T2.8) | Scheduler Guild |
| 2025-11-30 | Determinism and concurrency tests added; advisory locks in place | Scheduler Guild |
| 2025-12-02 | Backfill tool added; Mongo endpoint unavailable → parity/backfill blocked | Scheduler Guild |
| 2025-12-05 | Phase 0 unblocked; fresh-start approved (skip Mongo backfill). Verification done on Postgres-only baseline; cutover pending config switch/deploy. | PM |
| 2025-12-05 | Config switched to Postgres; deployed to staging and production; integration smoke tests passed; monitoring active. | Scheduler Guild |

---

## Risks & Mitigations

| Risk | Likelihood | Impact | Mitigation |
|------|------------|--------|------------|
| Lock contention | Medium | Medium | Test under load, tune timeouts |
| Trigger calculation errors | Low | High | Extensive testing with edge cases |
| State transition bugs | Medium | Medium | State machine tests |

---

*Phase Version: 1.0.0*
*Last Updated: 2025-11-28*
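
Subtask T2.8.3 calls for lock acquisition with a timeout, while `pg_try_advisory_lock` itself returns immediately. One way to bridge the two is to poll the non-blocking attempt until a deadline passes. The sketch below is a minimal illustration under that assumption, not the Scheduler Guild's implementation: `LockRetry`, `TryWithTimeoutAsync`, and the `pollInterval` parameter are hypothetical names, and `tryOnce` stands in for whatever single-attempt call the real lock exposes.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the StellaOps codebase).
public static class LockRetry
{
    // Repeats a non-blocking attempt until it succeeds or `timeout` elapses.
    // Returns true on success, false once the deadline has passed.
    public static async Task<bool> TryWithTimeoutAsync(
        Func<CancellationToken, Task<bool>> tryOnce,
        TimeSpan timeout,
        TimeSpan pollInterval,
        CancellationToken ct)
    {
        var clock = Stopwatch.StartNew();
        while (true)
        {
            // Always make at least one attempt, even with a zero timeout.
            if (await tryOnce(ct)) return true;

            var remaining = timeout - clock.Elapsed;
            if (remaining <= TimeSpan.Zero) return false;

            // Never sleep past the deadline; cancellation surfaces as an exception.
            var delay = remaining < pollInterval ? remaining : pollInterval;
            await Task.Delay(delay, ct);
        }
    }
}
```

A caller would pass a delegate that runs one `pg_try_advisory_lock` attempt and reports whether it succeeded. A short poll interval lowers acquisition latency at the cost of more round trips, which is relevant to the lock contention risk listed above.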