Repair live JobEngine runtime contracts

This commit is contained in:
master
2026-03-10 01:38:38 +02:00
parent 7be7295597
commit d16d7a1692
11 changed files with 416 additions and 73 deletions

View File

@@ -13,6 +13,7 @@ namespace StellaOps.JobEngine.Infrastructure.Postgres;
/// </summary>
public sealed class JobEngineDataSource : IAsyncDisposable
{
private const string DefaultSchemaName = JobEngineDbContextFactory.DefaultSchemaName;
private readonly NpgsqlDataSource _dataSource;
private readonly JobEngineServiceOptions.DatabaseOptions _options;
private readonly ILogger<JobEngineDataSource> _logger;
@@ -49,7 +50,7 @@ public sealed class JobEngineDataSource : IAsyncDisposable
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Open PostgreSQL connection.</returns>
public Task<NpgsqlConnection> OpenConnectionAsync(string tenantId, CancellationToken cancellationToken)
=> OpenConnectionInternalAsync(tenantId, "unspecified", cancellationToken);
=> OpenConnectionInternalAsync(tenantId, "unspecified", DefaultSchemaName, cancellationToken);
/// <summary>
/// Opens a connection with tenant context and role label configured.
@@ -59,15 +60,40 @@ public sealed class JobEngineDataSource : IAsyncDisposable
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Open PostgreSQL connection.</returns>
public Task<NpgsqlConnection> OpenConnectionAsync(string tenantId, string role, CancellationToken cancellationToken)
=> OpenConnectionInternalAsync(tenantId, role, cancellationToken);
=> OpenConnectionInternalAsync(tenantId, role, DefaultSchemaName, cancellationToken);
private async Task<NpgsqlConnection> OpenConnectionInternalAsync(string tenantId, string role, CancellationToken cancellationToken)
/// <summary>
/// Opens a connection with tenant context, role label, and schema search path configured.
/// </summary>
/// <param name="tenantId">Tenant identifier for session configuration.</param>
/// <param name="role">Role label for metrics/logging (e.g., "reader", "writer").</param>
/// <param name="schemaName">Schema name to prepend to the PostgreSQL search path.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Open PostgreSQL connection.</returns>
public Task<NpgsqlConnection> OpenConnectionAsync(string tenantId, string role, string schemaName, CancellationToken cancellationToken)
=> OpenConnectionInternalAsync(tenantId, role, ResolveSchemaName(schemaName), cancellationToken);
internal static string ResolveSchemaName(string? schemaName)
{
if (string.IsNullOrWhiteSpace(schemaName))
{
return DefaultSchemaName;
}
return schemaName.Trim();
}
private async Task<NpgsqlConnection> OpenConnectionInternalAsync(
string tenantId,
string role,
string? schemaName,
CancellationToken cancellationToken)
{
var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
try
{
await ConfigureSessionAsync(connection, tenantId, cancellationToken).ConfigureAwait(false);
await ConfigureSessionAsync(connection, tenantId, schemaName, cancellationToken).ConfigureAwait(false);
JobEngineMetrics.ConnectionOpened(role);
connection.StateChange += (_, args) =>
{
@@ -86,7 +112,11 @@ public sealed class JobEngineDataSource : IAsyncDisposable
return connection;
}
private async Task ConfigureSessionAsync(NpgsqlConnection connection, string tenantId, CancellationToken cancellationToken)
private async Task ConfigureSessionAsync(
NpgsqlConnection connection,
string tenantId,
string? schemaName,
CancellationToken cancellationToken)
{
try
{
@@ -105,6 +135,13 @@ public sealed class JobEngineDataSource : IAsyncDisposable
tenantCommand.Parameters.AddWithValue("tenant", tenantId);
await tenantCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
var quotedSchemaName = QuoteIdentifier(ResolveSchemaName(schemaName));
await using var searchPathCommand = new NpgsqlCommand(
$"SET search_path TO {quotedSchemaName}, public;",
connection);
searchPathCommand.CommandTimeout = _options.CommandTimeoutSeconds;
await searchPathCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -116,4 +153,7 @@ public sealed class JobEngineDataSource : IAsyncDisposable
throw;
}
}
private static string QuoteIdentifier(string schemaName)
=> $"\"{schemaName.Replace("\"", "\"\"", StringComparison.Ordinal)}\"";
}

View File

@@ -434,11 +434,11 @@ public sealed class PostgresDeadLetterRepository : IDeadLetterRepository
limit,
cancellationToken).ConfigureAwait(false);
}
catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.UndefinedFunction)
catch (PostgresException ex) when (ShouldUseActionableSummaryFallback(ex.SqlState))
{
_logger.LogWarning(
ex,
"Dead-letter summary function missing; falling back to direct table aggregation for tenant {TenantId}.",
"Dead-letter summary function path is unavailable for tenant {TenantId}; falling back to direct table aggregation.",
tenantId);
return await ReadActionableSummaryAsync(
@@ -458,6 +458,10 @@ public sealed class PostgresDeadLetterRepository : IDeadLetterRepository
}
}
internal static bool ShouldUseActionableSummaryFallback(string? sqlState)
=> string.Equals(sqlState, PostgresErrorCodes.UndefinedFunction, StringComparison.Ordinal)
|| string.Equals(sqlState, PostgresErrorCodes.AmbiguousColumn, StringComparison.Ordinal);
public async Task<int> MarkExpiredAsync(
int batchLimit,
CancellationToken cancellationToken)

View File

@@ -24,9 +24,8 @@ public sealed class PostgresJobRepository : IJobRepository
lease_until, created_at, scheduled_at, leased_at, completed_at, not_before, reason, replay_of, created_by
""";
// Note: Simple read queries (GetById, GetByIdempotencyKey, GetByRunId, GetExpiredLeases, List, Count)
// have been converted to EF Core LINQ. Raw SQL constants are retained only for operations requiring
// FOR UPDATE SKIP LOCKED, enum casts, or RETURNING clauses.
// Note: entity lookups (GetById, GetByIdempotencyKey, GetByRunId) use EF Core LINQ.
// Status-filtered queries stay on raw SQL so PostgreSQL enum comparisons remain explicit.
private const string InsertJobSql = """
INSERT INTO jobs (
@@ -265,17 +264,30 @@ public sealed class PostgresJobRepository : IJobRepository
public async Task<IReadOnlyList<Job>> GetExpiredLeasesAsync(string tenantId, DateTimeOffset cutoff, int limit, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var dbContext = JobEngineDbContextFactory.Create(connection, _dataSource.CommandTimeoutSeconds, DefaultSchema);
var sql = $"""
SELECT {SelectJobColumns}
FROM jobs
WHERE tenant_id = @tenant_id
AND status = 'leased'::job_status
AND lease_until < @cutoff
ORDER BY lease_until
LIMIT @limit
""";
var entities = await dbContext.Jobs
.AsNoTracking()
.Where(j => j.TenantId == tenantId && j.Status == "leased" && j.LeaseUntil < cutoff)
.OrderBy(j => j.LeaseUntil)
.Take(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("cutoff", cutoff);
command.Parameters.AddWithValue("limit", limit);
return entities.Select(MapJobEntity).ToList();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
var jobs = new List<Job>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
jobs.Add(MapJob(reader));
}
return jobs;
}
public async Task<IReadOnlyList<Job>> ListAsync(
@@ -290,46 +302,62 @@ public sealed class PostgresJobRepository : IJobRepository
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var dbContext = JobEngineDbContextFactory.Create(connection, _dataSource.CommandTimeoutSeconds, DefaultSchema);
IQueryable<JobEntity> query = dbContext.Jobs
.AsNoTracking()
.Where(j => j.TenantId == tenantId);
var sql = new StringBuilder($"""
SELECT {SelectJobColumns}
FROM jobs
WHERE tenant_id = @tenant_id
""");
var parameters = new List<NpgsqlParameter>
{
new("tenant_id", tenantId),
};
if (status.HasValue)
{
var statusStr = StatusToString(status.Value);
query = query.Where(j => j.Status == statusStr);
sql.Append(" AND status = @status::job_status");
parameters.Add(new("status", StatusToString(status.Value)));
}
if (!string.IsNullOrEmpty(jobType))
{
query = query.Where(j => j.JobType == jobType);
sql.Append(" AND job_type = @job_type");
parameters.Add(new("job_type", jobType));
}
if (!string.IsNullOrEmpty(projectId))
{
query = query.Where(j => j.ProjectId == projectId);
sql.Append(" AND project_id = @project_id");
parameters.Add(new("project_id", projectId));
}
if (createdAfter.HasValue)
{
query = query.Where(j => j.CreatedAt >= createdAfter.Value);
sql.Append(" AND created_at >= @created_after");
parameters.Add(new("created_after", createdAfter.Value));
}
if (createdBefore.HasValue)
{
query = query.Where(j => j.CreatedAt < createdBefore.Value);
sql.Append(" AND created_at < @created_before");
parameters.Add(new("created_before", createdBefore.Value));
}
var entities = await query
.OrderByDescending(j => j.CreatedAt)
.Skip(offset)
.Take(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
sql.Append(" ORDER BY created_at DESC LIMIT @limit OFFSET @offset");
parameters.Add(new("limit", limit));
parameters.Add(new("offset", offset));
return entities.Select(MapJobEntity).ToList();
await using var command = new NpgsqlCommand(sql.ToString(), connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddRange(parameters.ToArray());
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
var jobs = new List<Job>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
jobs.Add(MapJob(reader));
}
return jobs;
}
public async Task<int> CountAsync(
@@ -340,29 +368,40 @@ public sealed class PostgresJobRepository : IJobRepository
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var dbContext = JobEngineDbContextFactory.Create(connection, _dataSource.CommandTimeoutSeconds, DefaultSchema);
IQueryable<JobEntity> query = dbContext.Jobs
.AsNoTracking()
.Where(j => j.TenantId == tenantId);
var sql = new StringBuilder("""
SELECT COUNT(*)
FROM jobs
WHERE tenant_id = @tenant_id
""");
var parameters = new List<NpgsqlParameter>
{
new("tenant_id", tenantId),
};
if (status.HasValue)
{
var statusStr = StatusToString(status.Value);
query = query.Where(j => j.Status == statusStr);
sql.Append(" AND status = @status::job_status");
parameters.Add(new("status", StatusToString(status.Value)));
}
if (!string.IsNullOrEmpty(jobType))
{
query = query.Where(j => j.JobType == jobType);
sql.Append(" AND job_type = @job_type");
parameters.Add(new("job_type", jobType));
}
if (!string.IsNullOrEmpty(projectId))
{
query = query.Where(j => j.ProjectId == projectId);
sql.Append(" AND project_id = @project_id");
parameters.Add(new("project_id", projectId));
}
return await query.CountAsync(cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(sql.ToString(), connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddRange(parameters.ToArray());
var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return Convert.ToInt32(result);
}
private static void AddJobParameters(NpgsqlCommand command, Job job)

View File

@@ -11,6 +11,7 @@ namespace StellaOps.JobEngine.Infrastructure.Postgres;
/// </summary>
public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
{
private const string PackSchemaName = "packs";
private readonly JobEngineDataSource _dataSource;
private readonly ILogger<PostgresPackRegistryRepository> _logger;
private readonly TimeProvider _timeProvider;
@@ -49,7 +50,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
Guid packId,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
await using var command = new NpgsqlCommand(sql, connection);
@@ -70,7 +71,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
string name,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id AND name = @name";
await using var command = new NpgsqlCommand(sql, connection);
@@ -96,7 +97,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
int offset,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id";
var parameters = new List<NpgsqlParameter>
@@ -153,7 +154,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
string? tag,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = "SELECT COUNT(*) FROM packs WHERE tenant_id = @tenant_id";
var parameters = new List<NpgsqlParameter>
@@ -194,7 +195,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
public async Task CreatePackAsync(Pack pack, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(pack.TenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(pack.TenantId, cancellationToken);
const string sql = """
INSERT INTO packs (
@@ -217,7 +218,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
public async Task UpdatePackAsync(Pack pack, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(pack.TenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(pack.TenantId, cancellationToken);
const string sql = """
UPDATE packs SET
@@ -251,7 +252,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
string? publishedBy,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
const string sql = """
UPDATE packs SET
@@ -280,7 +281,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
Guid packId,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
const string sql = """
DELETE FROM packs
@@ -305,7 +306,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
Guid packVersionId,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_version_id = @pack_version_id";
await using var command = new NpgsqlCommand(sql, connection);
@@ -327,7 +328,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
string version,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id AND version = @version";
await using var command = new NpgsqlCommand(sql, connection);
@@ -350,7 +351,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
bool includePrerelease,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"""
SELECT {VersionColumns}
@@ -388,7 +389,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
int offset,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
@@ -425,7 +426,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
PackVersionStatus? status,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = "SELECT COUNT(*) FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
@@ -448,7 +449,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
public async Task CreateVersionAsync(PackVersion version, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(version.TenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(version.TenantId, cancellationToken);
const string sql = """
INSERT INTO pack_versions (
@@ -477,7 +478,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
public async Task UpdateVersionAsync(PackVersion version, CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(version.TenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(version.TenantId, cancellationToken);
const string sql = """
UPDATE pack_versions SET
@@ -518,7 +519,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
string? deprecationReason,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
const string sql = """
UPDATE pack_versions SET
@@ -557,7 +558,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
DateTimeOffset signedAt,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
const string sql = """
UPDATE pack_versions SET
@@ -587,7 +588,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
Guid packVersionId,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
const string sql = """
UPDATE pack_versions SET download_count = download_count + 1
@@ -606,7 +607,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
Guid packVersionId,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken);
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
const string sql = """
DELETE FROM pack_versions
@@ -632,7 +633,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
int limit,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"""
SELECT {PackColumns}
@@ -674,7 +675,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
int offset,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"""
SELECT {PackColumns}
@@ -707,7 +708,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
int limit,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"""
SELECT p.{PackColumns.Replace("pack_id", "p.pack_id")}
@@ -743,7 +744,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
int limit,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
var sql = $"""
SELECT {PackColumns}
@@ -775,7 +776,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
Guid packId,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
const string sql = """
SELECT COALESCE(SUM(download_count), 0)
@@ -795,7 +796,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
string tenantId,
CancellationToken cancellationToken)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken);
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
const string sql = """
SELECT
@@ -940,4 +941,10 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
Metadata: reader.IsDBNull(28) ? null : reader.GetString(28),
DownloadCount: reader.GetInt32(29));
}
private Task<NpgsqlConnection> OpenPackReaderConnectionAsync(string tenantId, CancellationToken cancellationToken) =>
_dataSource.OpenConnectionAsync(tenantId, "reader", PackSchemaName, cancellationToken);
private Task<NpgsqlConnection> OpenPackWriterConnectionAsync(string tenantId, CancellationToken cancellationToken) =>
_dataSource.OpenConnectionAsync(tenantId, "writer", PackSchemaName, cancellationToken);
}

View File

@@ -13,6 +13,10 @@
<EmbeddedResource Include="migrations\**\*.sql" LogicalName="%(RecursiveDir)%(Filename)%(Extension)" />
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="StellaOps.JobEngine.Tests" />
</ItemGroup>
<ItemGroup>
<!-- Prevent automatic compiled-model binding so non-default schemas can build runtime models. -->
<Compile Remove="EfCore\CompiledModels\JobEngineDbContextAssemblyAttributes.cs" />

View File

@@ -0,0 +1,105 @@
-- 009_packs_registry.sql
-- Startup migration for the JobEngine-backed pack registry projections used by /api/v1/jobengine/registry/*
-- Objects are fully qualified because the migration host search path remains orchestrator, public.
CREATE SCHEMA IF NOT EXISTS packs;
DO $$ BEGIN
CREATE TYPE packs.pack_status AS ENUM (
'draft',
'published',
'deprecated',
'archived'
);
EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$;
DO $$ BEGIN
CREATE TYPE packs.pack_version_status AS ENUM (
'draft',
'published',
'deprecated',
'archived'
);
EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$;
CREATE TABLE IF NOT EXISTS packs.packs (
pack_id UUID NOT NULL,
tenant_id TEXT NOT NULL,
project_id TEXT,
name TEXT NOT NULL,
display_name TEXT NOT NULL,
description TEXT,
status packs.pack_status NOT NULL DEFAULT 'draft',
created_by TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_by TEXT,
metadata TEXT,
tags TEXT,
icon_uri TEXT,
version_count INTEGER NOT NULL DEFAULT 0,
latest_version TEXT,
published_at TIMESTAMPTZ,
published_by TEXT,
CONSTRAINT pk_pack_registry_packs PRIMARY KEY (tenant_id, pack_id),
CONSTRAINT uq_pack_registry_pack_name UNIQUE (tenant_id, name),
CONSTRAINT ck_pack_registry_version_count_non_negative CHECK (version_count >= 0)
);
CREATE INDEX IF NOT EXISTS ix_pack_registry_packs_status_updated
ON packs.packs (tenant_id, status, updated_at DESC);
CREATE INDEX IF NOT EXISTS ix_pack_registry_packs_project_status_updated
ON packs.packs (tenant_id, project_id, status, updated_at DESC);
CREATE INDEX IF NOT EXISTS ix_pack_registry_packs_published
ON packs.packs (tenant_id, published_at DESC NULLS LAST, updated_at DESC);
CREATE TABLE IF NOT EXISTS packs.pack_versions (
pack_version_id UUID NOT NULL,
tenant_id TEXT NOT NULL,
pack_id UUID NOT NULL,
version TEXT NOT NULL,
sem_ver TEXT,
status packs.pack_version_status NOT NULL DEFAULT 'draft',
artifact_uri TEXT NOT NULL,
artifact_digest TEXT NOT NULL,
artifact_mime_type TEXT,
artifact_size_bytes BIGINT,
manifest_json TEXT,
manifest_digest TEXT,
release_notes TEXT,
min_engine_version TEXT,
dependencies TEXT,
created_by TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_by TEXT,
published_at TIMESTAMPTZ,
published_by TEXT,
deprecated_at TIMESTAMPTZ,
deprecated_by TEXT,
deprecation_reason TEXT,
signature_uri TEXT,
signature_algorithm TEXT,
signed_by TEXT,
signed_at TIMESTAMPTZ,
metadata TEXT,
download_count INTEGER NOT NULL DEFAULT 0,
CONSTRAINT pk_pack_registry_pack_versions PRIMARY KEY (tenant_id, pack_version_id),
CONSTRAINT uq_pack_registry_pack_version UNIQUE (tenant_id, pack_id, version),
CONSTRAINT ck_pack_registry_download_count_non_negative CHECK (download_count >= 0),
CONSTRAINT fk_pack_registry_pack_versions_pack
FOREIGN KEY (tenant_id, pack_id)
REFERENCES packs.packs (tenant_id, pack_id)
ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS ix_pack_registry_pack_versions_pack_status_created
ON packs.pack_versions (tenant_id, pack_id, status, created_at DESC);
CREATE INDEX IF NOT EXISTS ix_pack_registry_pack_versions_status_published
ON packs.pack_versions (tenant_id, status, published_at DESC NULLS LAST, updated_at DESC);
CREATE INDEX IF NOT EXISTS ix_pack_registry_pack_versions_downloads
ON packs.pack_versions (tenant_id, pack_id, download_count DESC);

View File

@@ -0,0 +1,24 @@
using Npgsql;
using StellaOps.JobEngine.Infrastructure.Postgres;
namespace StellaOps.JobEngine.Tests.DeadLetter;
public sealed class PostgresDeadLetterRepositoryTests
{
[Theory]
[InlineData(PostgresErrorCodes.UndefinedFunction)]
[InlineData(PostgresErrorCodes.AmbiguousColumn)]
public void ShouldUseActionableSummaryFallback_ReturnsTrue_ForRecoverableLegacySqlStates(string sqlState)
{
Assert.True(PostgresDeadLetterRepository.ShouldUseActionableSummaryFallback(sqlState));
}
[Theory]
[InlineData(PostgresErrorCodes.UndefinedTable)]
[InlineData("XX000")]
[InlineData(null)]
public void ShouldUseActionableSummaryFallback_ReturnsFalse_ForNonFallbackSqlStates(string? sqlState)
{
Assert.False(PostgresDeadLetterRepository.ShouldUseActionableSummaryFallback(sqlState));
}
}

View File

@@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore;
using StellaOps.JobEngine.Infrastructure.EfCore.CompiledModels;
using StellaOps.JobEngine.Infrastructure.EfCore.Context;
using StellaOps.JobEngine.Infrastructure.EfCore.Models;
using StellaOps.JobEngine.Infrastructure.Postgres;
using StellaOps.TestKit;
using Xunit;
@@ -51,6 +52,32 @@ public sealed class SchemaConsistencyTests
compiledSchemas.Should().ContainSingle().Which.Should().Be(JobEngineDbContext.DefaultSchemaName);
}
[Trait("Category", TestCategories.Unit)]
[Fact]
public void DataSourceSchemaResolution_UsesOrchestratorForDefaultConnections()
{
JobEngineDataSource.ResolveSchemaName(null).Should().Be(JobEngineDbContext.DefaultSchemaName);
JobEngineDataSource.ResolveSchemaName(string.Empty).Should().Be(JobEngineDbContext.DefaultSchemaName);
JobEngineDataSource.ResolveSchemaName(" packs ").Should().Be("packs");
}
[Trait("Category", TestCategories.Unit)]
[Fact]
public void InfrastructureAssembly_EmbedsPackRegistryStartupMigration()
{
var assembly = typeof(JobEngineDataSource).Assembly;
assembly.GetManifestResourceNames().Should().Contain("009_packs_registry.sql");
using var stream = assembly.GetManifestResourceStream("009_packs_registry.sql");
stream.Should().NotBeNull();
using var reader = new StreamReader(stream!);
var sql = reader.ReadToEnd();
sql.Should().Contain("CREATE SCHEMA IF NOT EXISTS packs;");
sql.Should().Contain("CREATE TABLE IF NOT EXISTS packs.pack_versions");
}
private static JobEngineDbContext CreateRuntimeContext()
{
var options = new DbContextOptionsBuilder<JobEngineDbContext>()

View File

@@ -189,6 +189,7 @@ app.MapWorkerEndpoints();
// Register quota governance and circuit breaker endpoints (per SPRINT_20260208_042)
app.MapCircuitBreakerEndpoints();
app.MapQuotaEndpoints();
app.MapQuotaGovernanceEndpoints();
// Register dead-letter queue management endpoints