From d16d7a169259d138c9e5eca66baa87f218c597e7 Mon Sep 17 00:00:00 2001 From: master <> Date: Tue, 10 Mar 2026 01:38:38 +0200 Subject: [PATCH] Repair live JobEngine runtime contracts --- ...e_control_and_jobengine_contract_repair.md | 92 ++++++++++++++ docs/modules/jobengine/architecture.md | 2 +- .../Postgres/JobEngineDataSource.cs | 50 +++++++- .../Postgres/PostgresDeadLetterRepository.cs | 8 +- .../Postgres/PostgresJobRepository.cs | 119 ++++++++++++------ .../PostgresPackRegistryRepository.cs | 57 +++++---- .../StellaOps.JobEngine.Infrastructure.csproj | 4 + .../migrations/009_packs_registry.sql | 105 ++++++++++++++++ .../PostgresDeadLetterRepositoryTests.cs | 24 ++++ .../SchemaConsistencyTests.cs | 27 ++++ .../StellaOps.JobEngine.WebService/Program.cs | 1 + 11 files changed, 416 insertions(+), 73 deletions(-) create mode 100644 docs/implplan/SPRINT_20260309_007_FE_live_release_control_and_jobengine_contract_repair.md create mode 100644 src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/009_packs_registry.sql create mode 100644 src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/DeadLetter/PostgresDeadLetterRepositoryTests.cs diff --git a/docs/implplan/SPRINT_20260309_007_FE_live_release_control_and_jobengine_contract_repair.md b/docs/implplan/SPRINT_20260309_007_FE_live_release_control_and_jobengine_contract_repair.md new file mode 100644 index 000000000..88ff98f72 --- /dev/null +++ b/docs/implplan/SPRINT_20260309_007_FE_live_release_control_and_jobengine_contract_repair.md @@ -0,0 +1,92 @@ +# Sprint 20260309-007 - FE Live Release Control And JobEngine Contract Repair + +## Topic & Scope +- Repair the post-rebuild live frontdoor drift where release-control and approvals routes still point at the retired `orchestrator` host instead of JobEngine, causing browser `404` failures on approval and release actions. +- Repair the live JobEngine SQL contract failures behind `/ops/operations/jobengine` and `/ops/operations/packs`: enum-vs-text status counts and missing `packs` schema search path. +- Remove the web approval queue's dependency on the broken legacy list route by keeping queue filtering on the v2 platform projection while retaining canonical approval detail/decision actions. +- Working directory: `src/Web/StellaOps.Web`. +- Allowed coordination edits: `src/JobEngine/StellaOps.JobEngine/**`, `devops/compose/router-gateway-local.json`, `devops/compose/router-gateway-local.reverseproxy.json`, `src/Router/StellaOps.Gateway.WebService/appsettings.json`, `docs/modules/jobengine/architecture.md`, `docs/modules/router/architecture.md`, `docs/implplan/SPRINT_20260309_007_FE_live_release_control_and_jobengine_contract_repair.md`. +- Expected evidence: focused frontend approval-client tests, targeted JobEngine verification, rebuilt/redeployed web and affected services, and refreshed live Playwright/browser probes. + +## Dependencies & Concurrency +- Depends on `SPRINT_20260309_001_Platform_scratch_setup_bootstrap_restore.md` for the rebuilt baseline, `SPRINT_20260309_003_Router_live_frontdoor_contract_repair.md` for prior frontdoor ownership fixes, and `SPRINT_20260309_005_JobEngine_live_scratch_reset_and_ops_scope_repair.md` for the restored JobEngine runtime. +- Safe parallelism: avoid unrelated search/reachability/component-revival work already in the tree; this pass is limited to release-control/approval routing, approval client behavior, and JobEngine repository runtime contracts. + +## Documentation Prerequisites +- `AGENTS.md` +- `src/Web/StellaOps.Web/AGENTS.md` +- `src/JobEngine/AGENTS.md` +- `docs/code-of-conduct/CODE_OF_CONDUCT.md` +- `docs/qa/feature-checks/FLOW.md` +- `docs/modules/jobengine/architecture.md` +- `docs/modules/router/architecture.md` + +## Delivery Tracker + +### LIVE-CONTRACT-007-001 - Restore release-control and approval frontdoor ownership +Status: DOING +Dependency: none +Owners: Developer, QA +Task description: +- Align the live compose router manifest and source router defaults so release-control and approval browser paths route to JobEngine, including the canonical `/api/v1/approvals` family and the still-supported legacy `/api/v1/release-orchestrator` paths used by some flows. +- Keep the mounted compose manifest and source appsettings in sync so the next scratch rebuild does not regress the same family. + +Completion criteria: +- [ ] The live compose router manifest points `/api/v1/release-orchestrator`, `/api/release-orchestrator`, `/api/releases`, `/api/approvals`, and `/api/v1/approvals` to JobEngine. +- [ ] Source router defaults are aligned to the same route ownership. +- [ ] Direct live probes no longer return `404` for the repaired release-control families. + +### LIVE-CONTRACT-007-002 - Keep the approval queue on canonical live contracts +Status: TODO +Dependency: LIVE-CONTRACT-007-001 +Owners: Developer, QA +Task description: +- Repair the Angular approval client so queue listing relies on the v2 releases approvals projection and client-side filtering instead of dropping to the legacy list route whenever the filter shape is richer than the v2 API. +- Preserve canonical `/api/v1/approvals` detail and decision actions, and make batch actions execute against live-supported approval decisions rather than dead batch endpoints. + +Completion criteria: +- [ ] Approval queue listing does not call `/api/v1/release-orchestrator/approvals`. +- [ ] Approval detail and decision actions continue to target live canonical approval endpoints. +- [ ] Focused frontend tests lock the repaired list and batch behaviors. + +### LIVE-CONTRACT-007-003 - Repair JobEngine SQL runtime contracts for ops pages +Status: DOING +Dependency: LIVE-CONTRACT-007-001 +Owners: Developer, QA +Task description: +- Fix the repository/runtime contract issues surfaced by the live stack: `jobs/summary` must compare against the PostgreSQL enum correctly, and the embedded pack-registry repository must operate inside the `packs` schema instead of assuming the default search path. +- Prefer durable repository/runtime fixes over UI workarounds. + +Completion criteria: +- [ ] `/api/v1/jobengine/jobs/summary` no longer throws `job_status = text`. +- [ ] `/api/v1/jobengine/registry/packs` no longer throws `relation "packs" does not exist`. +- [ ] The fix is documented in the JobEngine architecture dossier. + +### LIVE-CONTRACT-007-004 - Rebuild, redeploy, and rerun live Playwright verification +Status: TODO +Dependency: LIVE-CONTRACT-007-002 +Owners: QA +Task description: +- Rebuild the affected web and backend/runtime images, redeploy the live compose slice, rerun direct probes and the authenticated live route/action checks, and record the narrowed failure inventory for the next iteration. + +Completion criteria: +- [ ] Updated router/jobengine/web artifacts are rebuilt and redeployed without disturbing unrelated dirty work. +- [ ] Direct live probes for approvals and JobEngine routes succeed after redeploy. +- [ ] The live Playwright route/action sweep is rerun from the rebuilt stack and the remaining backlog is recorded. + +## Execution Log +| Date (UTC) | Update | Owner | +| --- | --- | --- | +| 2026-03-09 | Sprint created after the full rebuild and live sweep exposed a shared release-control frontdoor drift plus two confirmed JobEngine runtime SQL failures (`job_status = text` and missing `packs` relation). | Developer | +| 2026-03-09 | Resumed after the full stack rebuild. Confirmed the live `jobs/summary` and dead-letter summary failures come from raw SQL opening without the preserved `orchestrator` search path, and confirmed `/ops/operations/packs` is blocked by a missing startup-migrated `packs` schema contract. | Developer | + +## Decisions & Risks +- Decision: fix the browser frontdoor ownership in router config instead of teaching the UI to paper over wrong service bindings. The live compose gateway is part of the product contract. +- Decision: keep approval queue listing on the platform v2 projection and use client-side filtering for unsupported filter combinations rather than relying on the legacy list endpoint as a hidden fallback. +- Decision: fix JobEngine runtime behavior in repository/session code instead of masking the failures with empty-state UI. +- Risk: the doctor/context `503` cluster seen in the sweep may still remain after this pass because those failures appear to involve gateway instance health, not just route ownership. That should become the next iteration if still present after redeploy. + +## Next Checkpoints +- 2026-03-09: land router ownership and approval-client contract repairs. +- 2026-03-09: land JobEngine repository/runtime fixes. +- 2026-03-09: rebuild/redeploy the affected slice and rerun live Playwright verification. diff --git a/docs/modules/jobengine/architecture.md b/docs/modules/jobengine/architecture.md index 2ae17f47e..b5f63cae1 100644 --- a/docs/modules/jobengine/architecture.md +++ b/docs/modules/jobengine/architecture.md @@ -5,7 +5,7 @@ ## 1) Topology - **Orchestrator API (`StellaOps.JobEngine`).** Minimal API providing job state, throttling controls, replay endpoints, and dashboard data. Authenticated via Authority scopes (`orchestrator:*`). -- **Job ledger (PostgreSQL).** Tables `jobs`, `job_history`, `sources`, `quotas`, `throttles`, `incidents` (schema `orchestrator`). Append-only history ensures auditability. +- **Job ledger (PostgreSQL).** Tables `jobs`, `job_history`, `sources`, `quotas`, `throttles`, `incidents` (schema `orchestrator`). Startup migrations execute with PostgreSQL `search_path` bound to `orchestrator, public` so unqualified DDL lands in the module schema during scratch installs and resets. Append-only history ensures auditability. - **Queue abstraction.** Supports Valkey Streams or NATS JetStream (pluggable). Each job carries lease metadata and retry policy. - **Dashboard feeds.** SSE/GraphQL endpoints supply Console UI with job timelines, throughput, error distributions, and rate-limit status. diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/JobEngineDataSource.cs b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/JobEngineDataSource.cs index 0c65dca07..bcdd3d071 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/JobEngineDataSource.cs +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/JobEngineDataSource.cs @@ -13,6 +13,7 @@ namespace StellaOps.JobEngine.Infrastructure.Postgres; /// public sealed class JobEngineDataSource : IAsyncDisposable { + private const string DefaultSchemaName = JobEngineDbContextFactory.DefaultSchemaName; private readonly NpgsqlDataSource _dataSource; private readonly JobEngineServiceOptions.DatabaseOptions _options; private readonly ILogger _logger; @@ -49,7 +50,7 @@ public sealed class JobEngineDataSource : IAsyncDisposable /// Cancellation token. /// Open PostgreSQL connection. public Task OpenConnectionAsync(string tenantId, CancellationToken cancellationToken) - => OpenConnectionInternalAsync(tenantId, "unspecified", cancellationToken); + => OpenConnectionInternalAsync(tenantId, "unspecified", DefaultSchemaName, cancellationToken); /// /// Opens a connection with tenant context and role label configured. @@ -59,15 +60,40 @@ public sealed class JobEngineDataSource : IAsyncDisposable /// Cancellation token. /// Open PostgreSQL connection. public Task OpenConnectionAsync(string tenantId, string role, CancellationToken cancellationToken) - => OpenConnectionInternalAsync(tenantId, role, cancellationToken); + => OpenConnectionInternalAsync(tenantId, role, DefaultSchemaName, cancellationToken); - private async Task OpenConnectionInternalAsync(string tenantId, string role, CancellationToken cancellationToken) + /// + /// Opens a connection with tenant context, role label, and schema search path configured. + /// + /// Tenant identifier for session configuration. + /// Role label for metrics/logging (e.g., "reader", "writer"). + /// Schema name to prepend to the PostgreSQL search path. + /// Cancellation token. + /// Open PostgreSQL connection. + public Task OpenConnectionAsync(string tenantId, string role, string schemaName, CancellationToken cancellationToken) + => OpenConnectionInternalAsync(tenantId, role, ResolveSchemaName(schemaName), cancellationToken); + + internal static string ResolveSchemaName(string? schemaName) + { + if (string.IsNullOrWhiteSpace(schemaName)) + { + return DefaultSchemaName; + } + + return schemaName.Trim(); + } + + private async Task OpenConnectionInternalAsync( + string tenantId, + string role, + string? schemaName, + CancellationToken cancellationToken) { var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); try { - await ConfigureSessionAsync(connection, tenantId, cancellationToken).ConfigureAwait(false); + await ConfigureSessionAsync(connection, tenantId, schemaName, cancellationToken).ConfigureAwait(false); JobEngineMetrics.ConnectionOpened(role); connection.StateChange += (_, args) => { @@ -86,7 +112,11 @@ public sealed class JobEngineDataSource : IAsyncDisposable return connection; } - private async Task ConfigureSessionAsync(NpgsqlConnection connection, string tenantId, CancellationToken cancellationToken) + private async Task ConfigureSessionAsync( + NpgsqlConnection connection, + string tenantId, + string? schemaName, + CancellationToken cancellationToken) { try { @@ -105,6 +135,13 @@ public sealed class JobEngineDataSource : IAsyncDisposable tenantCommand.Parameters.AddWithValue("tenant", tenantId); await tenantCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } + + var quotedSchemaName = QuoteIdentifier(ResolveSchemaName(schemaName)); + await using var searchPathCommand = new NpgsqlCommand( + $"SET search_path TO {quotedSchemaName}, public;", + connection); + searchPathCommand.CommandTimeout = _options.CommandTimeoutSeconds; + await searchPathCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -116,4 +153,7 @@ public sealed class JobEngineDataSource : IAsyncDisposable throw; } } + + private static string QuoteIdentifier(string schemaName) + => $"\"{schemaName.Replace("\"", "\"\"", StringComparison.Ordinal)}\""; } diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresDeadLetterRepository.cs b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresDeadLetterRepository.cs index 5b54399bd..0f9b18618 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresDeadLetterRepository.cs +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresDeadLetterRepository.cs @@ -434,11 +434,11 @@ public sealed class PostgresDeadLetterRepository : IDeadLetterRepository limit, cancellationToken).ConfigureAwait(false); } - catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.UndefinedFunction) + catch (PostgresException ex) when (ShouldUseActionableSummaryFallback(ex.SqlState)) { _logger.LogWarning( ex, - "Dead-letter summary function missing; falling back to direct table aggregation for tenant {TenantId}.", + "Dead-letter summary function path is unavailable for tenant {TenantId}; falling back to direct table aggregation.", tenantId); return await ReadActionableSummaryAsync( @@ -458,6 +458,10 @@ public sealed class PostgresDeadLetterRepository : IDeadLetterRepository } } + internal static bool ShouldUseActionableSummaryFallback(string? sqlState) + => string.Equals(sqlState, PostgresErrorCodes.UndefinedFunction, StringComparison.Ordinal) + || string.Equals(sqlState, PostgresErrorCodes.AmbiguousColumn, StringComparison.Ordinal); + public async Task MarkExpiredAsync( int batchLimit, CancellationToken cancellationToken) diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresJobRepository.cs b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresJobRepository.cs index eeb2a0c5e..d72db14d3 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresJobRepository.cs +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresJobRepository.cs @@ -24,9 +24,8 @@ public sealed class PostgresJobRepository : IJobRepository lease_until, created_at, scheduled_at, leased_at, completed_at, not_before, reason, replay_of, created_by """; - // Note: Simple read queries (GetById, GetByIdempotencyKey, GetByRunId, GetExpiredLeases, List, Count) - // have been converted to EF Core LINQ. Raw SQL constants are retained only for operations requiring - // FOR UPDATE SKIP LOCKED, enum casts, or RETURNING clauses. + // Note: entity lookups (GetById, GetByIdempotencyKey, GetByRunId) use EF Core LINQ. + // Status-filtered queries stay on raw SQL so PostgreSQL enum comparisons remain explicit. private const string InsertJobSql = """ INSERT INTO jobs ( @@ -265,17 +264,30 @@ public sealed class PostgresJobRepository : IJobRepository public async Task> GetExpiredLeasesAsync(string tenantId, DateTimeOffset cutoff, int limit, CancellationToken cancellationToken) { await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false); - await using var dbContext = JobEngineDbContextFactory.Create(connection, _dataSource.CommandTimeoutSeconds, DefaultSchema); + var sql = $""" + SELECT {SelectJobColumns} + FROM jobs + WHERE tenant_id = @tenant_id + AND status = 'leased'::job_status + AND lease_until < @cutoff + ORDER BY lease_until + LIMIT @limit + """; - var entities = await dbContext.Jobs - .AsNoTracking() - .Where(j => j.TenantId == tenantId && j.Status == "leased" && j.LeaseUntil < cutoff) - .OrderBy(j => j.LeaseUntil) - .Take(limit) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); + await using var command = new NpgsqlCommand(sql, connection); + command.CommandTimeout = _dataSource.CommandTimeoutSeconds; + command.Parameters.AddWithValue("tenant_id", tenantId); + command.Parameters.AddWithValue("cutoff", cutoff); + command.Parameters.AddWithValue("limit", limit); - return entities.Select(MapJobEntity).ToList(); + await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); + var jobs = new List(); + while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + jobs.Add(MapJob(reader)); + } + + return jobs; } public async Task> ListAsync( @@ -290,46 +302,62 @@ public sealed class PostgresJobRepository : IJobRepository CancellationToken cancellationToken) { await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false); - await using var dbContext = JobEngineDbContextFactory.Create(connection, _dataSource.CommandTimeoutSeconds, DefaultSchema); - - IQueryable query = dbContext.Jobs - .AsNoTracking() - .Where(j => j.TenantId == tenantId); + var sql = new StringBuilder($""" + SELECT {SelectJobColumns} + FROM jobs + WHERE tenant_id = @tenant_id + """); + var parameters = new List + { + new("tenant_id", tenantId), + }; if (status.HasValue) { - var statusStr = StatusToString(status.Value); - query = query.Where(j => j.Status == statusStr); + sql.Append(" AND status = @status::job_status"); + parameters.Add(new("status", StatusToString(status.Value))); } if (!string.IsNullOrEmpty(jobType)) { - query = query.Where(j => j.JobType == jobType); + sql.Append(" AND job_type = @job_type"); + parameters.Add(new("job_type", jobType)); } if (!string.IsNullOrEmpty(projectId)) { - query = query.Where(j => j.ProjectId == projectId); + sql.Append(" AND project_id = @project_id"); + parameters.Add(new("project_id", projectId)); } if (createdAfter.HasValue) { - query = query.Where(j => j.CreatedAt >= createdAfter.Value); + sql.Append(" AND created_at >= @created_after"); + parameters.Add(new("created_after", createdAfter.Value)); } if (createdBefore.HasValue) { - query = query.Where(j => j.CreatedAt < createdBefore.Value); + sql.Append(" AND created_at < @created_before"); + parameters.Add(new("created_before", createdBefore.Value)); } - var entities = await query - .OrderByDescending(j => j.CreatedAt) - .Skip(offset) - .Take(limit) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); + sql.Append(" ORDER BY created_at DESC LIMIT @limit OFFSET @offset"); + parameters.Add(new("limit", limit)); + parameters.Add(new("offset", offset)); - return entities.Select(MapJobEntity).ToList(); + await using var command = new NpgsqlCommand(sql.ToString(), connection); + command.CommandTimeout = _dataSource.CommandTimeoutSeconds; + command.Parameters.AddRange(parameters.ToArray()); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); + var jobs = new List(); + while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + jobs.Add(MapJob(reader)); + } + + return jobs; } public async Task CountAsync( @@ -340,29 +368,40 @@ public sealed class PostgresJobRepository : IJobRepository CancellationToken cancellationToken) { await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false); - await using var dbContext = JobEngineDbContextFactory.Create(connection, _dataSource.CommandTimeoutSeconds, DefaultSchema); - - IQueryable query = dbContext.Jobs - .AsNoTracking() - .Where(j => j.TenantId == tenantId); + var sql = new StringBuilder(""" + SELECT COUNT(*) + FROM jobs + WHERE tenant_id = @tenant_id + """); + var parameters = new List + { + new("tenant_id", tenantId), + }; if (status.HasValue) { - var statusStr = StatusToString(status.Value); - query = query.Where(j => j.Status == statusStr); + sql.Append(" AND status = @status::job_status"); + parameters.Add(new("status", StatusToString(status.Value))); } if (!string.IsNullOrEmpty(jobType)) { - query = query.Where(j => j.JobType == jobType); + sql.Append(" AND job_type = @job_type"); + parameters.Add(new("job_type", jobType)); } if (!string.IsNullOrEmpty(projectId)) { - query = query.Where(j => j.ProjectId == projectId); + sql.Append(" AND project_id = @project_id"); + parameters.Add(new("project_id", projectId)); } - return await query.CountAsync(cancellationToken).ConfigureAwait(false); + await using var command = new NpgsqlCommand(sql.ToString(), connection); + command.CommandTimeout = _dataSource.CommandTimeoutSeconds; + command.Parameters.AddRange(parameters.ToArray()); + + var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false); + return Convert.ToInt32(result); } private static void AddJobParameters(NpgsqlCommand command, Job job) diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresPackRegistryRepository.cs b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresPackRegistryRepository.cs index f62bf207b..64c017bcd 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresPackRegistryRepository.cs +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/Postgres/PostgresPackRegistryRepository.cs @@ -11,6 +11,7 @@ namespace StellaOps.JobEngine.Infrastructure.Postgres; /// public sealed class PostgresPackRegistryRepository : IPackRegistryRepository { + private const string PackSchemaName = "packs"; private readonly JobEngineDataSource _dataSource; private readonly ILogger _logger; private readonly TimeProvider _timeProvider; @@ -49,7 +50,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository Guid packId, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id AND pack_id = @pack_id"; await using var command = new NpgsqlCommand(sql, connection); @@ -70,7 +71,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository string name, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id AND name = @name"; await using var command = new NpgsqlCommand(sql, connection); @@ -96,7 +97,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository int offset, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id"; var parameters = new List @@ -153,7 +154,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository string? tag, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = "SELECT COUNT(*) FROM packs WHERE tenant_id = @tenant_id"; var parameters = new List @@ -194,7 +195,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository public async Task CreatePackAsync(Pack pack, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(pack.TenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(pack.TenantId, cancellationToken); const string sql = """ INSERT INTO packs ( @@ -217,7 +218,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository public async Task UpdatePackAsync(Pack pack, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(pack.TenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(pack.TenantId, cancellationToken); const string sql = """ UPDATE packs SET @@ -251,7 +252,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository string? publishedBy, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken); const string sql = """ UPDATE packs SET @@ -280,7 +281,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository Guid packId, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken); const string sql = """ DELETE FROM packs @@ -305,7 +306,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository Guid packVersionId, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_version_id = @pack_version_id"; await using var command = new NpgsqlCommand(sql, connection); @@ -327,7 +328,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository string version, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id AND version = @version"; await using var command = new NpgsqlCommand(sql, connection); @@ -350,7 +351,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository bool includePrerelease, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $""" SELECT {VersionColumns} @@ -388,7 +389,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository int offset, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id"; @@ -425,7 +426,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository PackVersionStatus? status, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = "SELECT COUNT(*) FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id"; @@ -448,7 +449,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository public async Task CreateVersionAsync(PackVersion version, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(version.TenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(version.TenantId, cancellationToken); const string sql = """ INSERT INTO pack_versions ( @@ -477,7 +478,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository public async Task UpdateVersionAsync(PackVersion version, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(version.TenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(version.TenantId, cancellationToken); const string sql = """ UPDATE pack_versions SET @@ -518,7 +519,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository string? deprecationReason, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken); const string sql = """ UPDATE pack_versions SET @@ -557,7 +558,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository DateTimeOffset signedAt, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken); const string sql = """ UPDATE pack_versions SET @@ -587,7 +588,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository Guid packVersionId, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken); const string sql = """ UPDATE pack_versions SET download_count = download_count + 1 @@ -606,7 +607,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository Guid packVersionId, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken); + await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken); const string sql = """ DELETE FROM pack_versions @@ -632,7 +633,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository int limit, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $""" SELECT {PackColumns} @@ -674,7 +675,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository int offset, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $""" SELECT {PackColumns} @@ -707,7 +708,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository int limit, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $""" SELECT p.{PackColumns.Replace("pack_id", "p.pack_id")} @@ -743,7 +744,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository int limit, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); var sql = $""" SELECT {PackColumns} @@ -775,7 +776,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository Guid packId, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); const string sql = """ SELECT COALESCE(SUM(download_count), 0) @@ -795,7 +796,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository string tenantId, CancellationToken cancellationToken) { - await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken); + await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken); const string sql = """ SELECT @@ -940,4 +941,10 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository Metadata: reader.IsDBNull(28) ? null : reader.GetString(28), DownloadCount: reader.GetInt32(29)); } + + private Task OpenPackReaderConnectionAsync(string tenantId, CancellationToken cancellationToken) => + _dataSource.OpenConnectionAsync(tenantId, "reader", PackSchemaName, cancellationToken); + + private Task OpenPackWriterConnectionAsync(string tenantId, CancellationToken cancellationToken) => + _dataSource.OpenConnectionAsync(tenantId, "writer", PackSchemaName, cancellationToken); } diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/StellaOps.JobEngine.Infrastructure.csproj b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/StellaOps.JobEngine.Infrastructure.csproj index dab123668..11e63fc9d 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/StellaOps.JobEngine.Infrastructure.csproj +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/StellaOps.JobEngine.Infrastructure.csproj @@ -13,6 +13,10 @@ + + + + diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/009_packs_registry.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/009_packs_registry.sql new file mode 100644 index 000000000..c3f0688e5 --- /dev/null +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/009_packs_registry.sql @@ -0,0 +1,105 @@ +-- 009_packs_registry.sql +-- Startup migration for the JobEngine-backed pack registry projections used by /api/v1/jobengine/registry/* +-- Objects are fully qualified because the migration host search path remains orchestrator, public. + +CREATE SCHEMA IF NOT EXISTS packs; + +DO $$ BEGIN + CREATE TYPE packs.pack_status AS ENUM ( + 'draft', + 'published', + 'deprecated', + 'archived' + ); +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; + +DO $$ BEGIN + CREATE TYPE packs.pack_version_status AS ENUM ( + 'draft', + 'published', + 'deprecated', + 'archived' + ); +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; + +CREATE TABLE IF NOT EXISTS packs.packs ( + pack_id UUID NOT NULL, + tenant_id TEXT NOT NULL, + project_id TEXT, + name TEXT NOT NULL, + display_name TEXT NOT NULL, + description TEXT, + status packs.pack_status NOT NULL DEFAULT 'draft', + created_by TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_by TEXT, + metadata TEXT, + tags TEXT, + icon_uri TEXT, + version_count INTEGER NOT NULL DEFAULT 0, + latest_version TEXT, + published_at TIMESTAMPTZ, + published_by TEXT, + CONSTRAINT pk_pack_registry_packs PRIMARY KEY (tenant_id, pack_id), + CONSTRAINT uq_pack_registry_pack_name UNIQUE (tenant_id, name), + CONSTRAINT ck_pack_registry_version_count_non_negative CHECK (version_count >= 0) +); + +CREATE INDEX IF NOT EXISTS ix_pack_registry_packs_status_updated + ON packs.packs (tenant_id, status, updated_at DESC); + +CREATE INDEX IF NOT EXISTS ix_pack_registry_packs_project_status_updated + ON packs.packs (tenant_id, project_id, status, updated_at DESC); + +CREATE INDEX IF NOT EXISTS ix_pack_registry_packs_published + ON packs.packs (tenant_id, published_at DESC NULLS LAST, updated_at DESC); + +CREATE TABLE IF NOT EXISTS packs.pack_versions ( + pack_version_id UUID NOT NULL, + tenant_id TEXT NOT NULL, + pack_id UUID NOT NULL, + version TEXT NOT NULL, + sem_ver TEXT, + status packs.pack_version_status NOT NULL DEFAULT 'draft', + artifact_uri TEXT NOT NULL, + artifact_digest TEXT NOT NULL, + artifact_mime_type TEXT, + artifact_size_bytes BIGINT, + manifest_json TEXT, + manifest_digest TEXT, + release_notes TEXT, + min_engine_version TEXT, + dependencies TEXT, + created_by TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_by TEXT, + published_at TIMESTAMPTZ, + published_by TEXT, + deprecated_at TIMESTAMPTZ, + deprecated_by TEXT, + deprecation_reason TEXT, + signature_uri TEXT, + signature_algorithm TEXT, + signed_by TEXT, + signed_at TIMESTAMPTZ, + metadata TEXT, + download_count INTEGER NOT NULL DEFAULT 0, + CONSTRAINT pk_pack_registry_pack_versions PRIMARY KEY (tenant_id, pack_version_id), + CONSTRAINT uq_pack_registry_pack_version UNIQUE (tenant_id, pack_id, version), + CONSTRAINT ck_pack_registry_download_count_non_negative CHECK (download_count >= 0), + CONSTRAINT fk_pack_registry_pack_versions_pack + FOREIGN KEY (tenant_id, pack_id) + REFERENCES packs.packs (tenant_id, pack_id) + ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS ix_pack_registry_pack_versions_pack_status_created + ON packs.pack_versions (tenant_id, pack_id, status, created_at DESC); + +CREATE INDEX IF NOT EXISTS ix_pack_registry_pack_versions_status_published + ON packs.pack_versions (tenant_id, status, published_at DESC NULLS LAST, updated_at DESC); + +CREATE INDEX IF NOT EXISTS ix_pack_registry_pack_versions_downloads + ON packs.pack_versions (tenant_id, pack_id, download_count DESC); diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/DeadLetter/PostgresDeadLetterRepositoryTests.cs b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/DeadLetter/PostgresDeadLetterRepositoryTests.cs new file mode 100644 index 000000000..a2633ecc3 --- /dev/null +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/DeadLetter/PostgresDeadLetterRepositoryTests.cs @@ -0,0 +1,24 @@ +using Npgsql; +using StellaOps.JobEngine.Infrastructure.Postgres; + +namespace StellaOps.JobEngine.Tests.DeadLetter; + +public sealed class PostgresDeadLetterRepositoryTests +{ + [Theory] + [InlineData(PostgresErrorCodes.UndefinedFunction)] + [InlineData(PostgresErrorCodes.AmbiguousColumn)] + public void ShouldUseActionableSummaryFallback_ReturnsTrue_ForRecoverableLegacySqlStates(string sqlState) + { + Assert.True(PostgresDeadLetterRepository.ShouldUseActionableSummaryFallback(sqlState)); + } + + [Theory] + [InlineData(PostgresErrorCodes.UndefinedTable)] + [InlineData("XX000")] + [InlineData(null)] + public void ShouldUseActionableSummaryFallback_ReturnsFalse_ForNonFallbackSqlStates(string? sqlState) + { + Assert.False(PostgresDeadLetterRepository.ShouldUseActionableSummaryFallback(sqlState)); + } +} diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/SchemaConsistencyTests.cs b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/SchemaConsistencyTests.cs index e1c24809e..46f50b30f 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/SchemaConsistencyTests.cs +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Tests/SchemaConsistencyTests.cs @@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore; using StellaOps.JobEngine.Infrastructure.EfCore.CompiledModels; using StellaOps.JobEngine.Infrastructure.EfCore.Context; using StellaOps.JobEngine.Infrastructure.EfCore.Models; +using StellaOps.JobEngine.Infrastructure.Postgres; using StellaOps.TestKit; using Xunit; @@ -51,6 +52,32 @@ public sealed class SchemaConsistencyTests compiledSchemas.Should().ContainSingle().Which.Should().Be(JobEngineDbContext.DefaultSchemaName); } + [Trait("Category", TestCategories.Unit)] + [Fact] + public void DataSourceSchemaResolution_UsesOrchestratorForDefaultConnections() + { + JobEngineDataSource.ResolveSchemaName(null).Should().Be(JobEngineDbContext.DefaultSchemaName); + JobEngineDataSource.ResolveSchemaName(string.Empty).Should().Be(JobEngineDbContext.DefaultSchemaName); + JobEngineDataSource.ResolveSchemaName(" packs ").Should().Be("packs"); + } + + [Trait("Category", TestCategories.Unit)] + [Fact] + public void InfrastructureAssembly_EmbedsPackRegistryStartupMigration() + { + var assembly = typeof(JobEngineDataSource).Assembly; + assembly.GetManifestResourceNames().Should().Contain("009_packs_registry.sql"); + + using var stream = assembly.GetManifestResourceStream("009_packs_registry.sql"); + stream.Should().NotBeNull(); + + using var reader = new StreamReader(stream!); + var sql = reader.ReadToEnd(); + + sql.Should().Contain("CREATE SCHEMA IF NOT EXISTS packs;"); + sql.Should().Contain("CREATE TABLE IF NOT EXISTS packs.pack_versions"); + } + private static JobEngineDbContext CreateRuntimeContext() { var options = new DbContextOptionsBuilder() diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.WebService/Program.cs b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.WebService/Program.cs index c60d81cd4..c41ed096a 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.WebService/Program.cs +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.WebService/Program.cs @@ -189,6 +189,7 @@ app.MapWorkerEndpoints(); // Register quota governance and circuit breaker endpoints (per SPRINT_20260208_042) app.MapCircuitBreakerEndpoints(); +app.MapQuotaEndpoints(); app.MapQuotaGovernanceEndpoints(); // Register dead-letter queue management endpoints