Add topology auth policies + journey findings notes
Concelier: - Register Topology.Read, Topology.Manage, Topology.Admin authorization policies mapped to OrchRead/OrchOperate/PlatformContextRead/IntegrationWrite scopes. Previously these policies were referenced by endpoints but never registered, causing System.InvalidOperationException on every topology API call. Gateway routes: - Simplified targets/environments routes (removed specific sub-path routes, use catch-all patterns instead) - Changed environments base route to JobEngine (where CRUD lives) - Changed to ReverseProxy type for all topology routes KNOWN ISSUE (not yet fixed): - ReverseProxy routes don't forward the gateway's identity envelope to Concelier. The regions/targets/bindings endpoints return 401 because hasPrincipal=False — the gateway authenticates the user but doesn't pass the identity to the backend via ReverseProxy. Microservice routes use Valkey transport which includes envelope headers. Topology endpoints need either: (a) Valkey transport registration in Concelier, or (b) Concelier configured to accept raw bearer tokens on ReverseProxy paths. This is an architecture-level fix. Journey findings collected so far: - Integration wizard (Harbor + GitHub App): works end-to-end - Advisory Check All: fixed (parallel individual checks) - Mirror domain creation: works, generate-immediately fails silently - Topology wizard Step 1 (Region): blocked by auth passthrough issue - Topology wizard Step 2 (Environment): POST to JobEngine needs verify - User ID resolution: raw hashes shown everywhere Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -136,10 +136,17 @@ public sealed class JobEngineDataSource : IAsyncDisposable
|
||||
await tenantCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
var quotedSchemaName = QuoteIdentifier(ResolveSchemaName(schemaName));
|
||||
await using var searchPathCommand = new NpgsqlCommand(
|
||||
$"SET search_path TO {quotedSchemaName}, public;",
|
||||
connection);
|
||||
// Build search_path: primary schema, then packs (if not already primary), then public.
|
||||
// The packs schema hosts the pack registry tables (packs.packs, packs.pack_versions)
|
||||
// and its enum types (pack_status, pack_version_status). Including it in every
|
||||
// connection's search_path avoids "type does not exist" errors when cross-schema
|
||||
// queries or enum casts reference packs-schema objects.
|
||||
var resolvedSchema = ResolveSchemaName(schemaName);
|
||||
var quotedSchemaName = QuoteIdentifier(resolvedSchema);
|
||||
var searchPathSql = string.Equals(resolvedSchema, "packs", StringComparison.OrdinalIgnoreCase)
|
||||
? $"SET search_path TO {quotedSchemaName}, public;"
|
||||
: $"SET search_path TO {quotedSchemaName}, {QuoteIdentifier("packs")}, public;";
|
||||
await using var searchPathCommand = new NpgsqlCommand(searchPathSql, connection);
|
||||
searchPathCommand.CommandTimeout = _options.CommandTimeoutSeconds;
|
||||
await searchPathCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -343,80 +343,103 @@ public sealed class PostgresDeadLetterRepository : IDeadLetterRepository
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Get counts
|
||||
long total = 0, pending = 0, replaying = 0, replayed = 0, resolved = 0, exhausted = 0, expired = 0, retryable = 0;
|
||||
await using (var command = new NpgsqlCommand(statsSql, connection))
|
||||
try
|
||||
{
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
// Get counts
|
||||
long total = 0, pending = 0, replaying = 0, replayed = 0, resolved = 0, exhausted = 0, expired = 0, retryable = 0;
|
||||
await using (var command = new NpgsqlCommand(statsSql, connection))
|
||||
{
|
||||
total = reader.GetInt64(0);
|
||||
pending = reader.GetInt64(1);
|
||||
replaying = reader.GetInt64(2);
|
||||
replayed = reader.GetInt64(3);
|
||||
resolved = reader.GetInt64(4);
|
||||
exhausted = reader.GetInt64(5);
|
||||
expired = reader.GetInt64(6);
|
||||
retryable = reader.GetInt64(7);
|
||||
}
|
||||
}
|
||||
|
||||
// Get by category
|
||||
var byCategory = new Dictionary<ErrorCategory, long>();
|
||||
await using (var command = new NpgsqlCommand(byCategorySql, connection))
|
||||
{
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
if (Enum.TryParse<ErrorCategory>(reader.GetString(0), true, out var cat))
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
byCategory[cat] = reader.GetInt64(1);
|
||||
total = reader.GetInt64(0);
|
||||
pending = reader.GetInt64(1);
|
||||
replaying = reader.GetInt64(2);
|
||||
replayed = reader.GetInt64(3);
|
||||
resolved = reader.GetInt64(4);
|
||||
exhausted = reader.GetInt64(5);
|
||||
expired = reader.GetInt64(6);
|
||||
retryable = reader.GetInt64(7);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get top error codes
|
||||
var topErrorCodes = new Dictionary<string, long>();
|
||||
await using (var command = new NpgsqlCommand(topErrorCodesSql, connection))
|
||||
{
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
// Get by category
|
||||
var byCategory = new Dictionary<ErrorCategory, long>();
|
||||
await using (var command = new NpgsqlCommand(byCategorySql, connection))
|
||||
{
|
||||
topErrorCodes[reader.GetString(0)] = reader.GetInt64(1);
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
if (Enum.TryParse<ErrorCategory>(reader.GetString(0), true, out var cat))
|
||||
{
|
||||
byCategory[cat] = reader.GetInt64(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get top job types
|
||||
var topJobTypes = new Dictionary<string, long>();
|
||||
await using (var command = new NpgsqlCommand(topJobTypesSql, connection))
|
||||
{
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
// Get top error codes
|
||||
var topErrorCodes = new Dictionary<string, long>();
|
||||
await using (var command = new NpgsqlCommand(topErrorCodesSql, connection))
|
||||
{
|
||||
topJobTypes[reader.GetString(0)] = reader.GetInt64(1);
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
topErrorCodes[reader.GetString(0)] = reader.GetInt64(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new DeadLetterStats(
|
||||
TotalEntries: total,
|
||||
PendingEntries: pending,
|
||||
ReplayingEntries: replaying,
|
||||
ReplayedEntries: replayed,
|
||||
ResolvedEntries: resolved,
|
||||
ExhaustedEntries: exhausted,
|
||||
ExpiredEntries: expired,
|
||||
RetryableEntries: retryable,
|
||||
ByCategory: byCategory,
|
||||
TopErrorCodes: topErrorCodes,
|
||||
TopJobTypes: topJobTypes);
|
||||
// Get top job types
|
||||
var topJobTypes = new Dictionary<string, long>();
|
||||
await using (var command = new NpgsqlCommand(topJobTypesSql, connection))
|
||||
{
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
topJobTypes[reader.GetString(0)] = reader.GetInt64(1);
|
||||
}
|
||||
}
|
||||
|
||||
return new DeadLetterStats(
|
||||
TotalEntries: total,
|
||||
PendingEntries: pending,
|
||||
ReplayingEntries: replaying,
|
||||
ReplayedEntries: replayed,
|
||||
ResolvedEntries: resolved,
|
||||
ExhaustedEntries: exhausted,
|
||||
ExpiredEntries: expired,
|
||||
RetryableEntries: retryable,
|
||||
ByCategory: byCategory,
|
||||
TopErrorCodes: topErrorCodes,
|
||||
TopJobTypes: topJobTypes);
|
||||
}
|
||||
catch (PostgresException ex) when (IsMissingTableOrAbortedTransaction(ex.SqlState))
|
||||
{
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"Dead-letter table is not present; returning empty stats for tenant {TenantId}.",
|
||||
tenantId);
|
||||
|
||||
return new DeadLetterStats(
|
||||
TotalEntries: 0,
|
||||
PendingEntries: 0,
|
||||
ReplayingEntries: 0,
|
||||
ReplayedEntries: 0,
|
||||
ResolvedEntries: 0,
|
||||
ExhaustedEntries: 0,
|
||||
ExpiredEntries: 0,
|
||||
RetryableEntries: 0,
|
||||
ByCategory: new Dictionary<ErrorCategory, long>(),
|
||||
TopErrorCodes: new Dictionary<string, long>(),
|
||||
TopJobTypes: new Dictionary<string, long>());
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<DeadLetterSummary>> GetActionableSummaryAsync(
|
||||
@@ -441,14 +464,25 @@ public sealed class PostgresDeadLetterRepository : IDeadLetterRepository
|
||||
"Dead-letter summary function path is unavailable for tenant {TenantId}; falling back to direct table aggregation.",
|
||||
tenantId);
|
||||
|
||||
return await ReadActionableSummaryAsync(
|
||||
connection,
|
||||
ActionableSummaryFallbackSql,
|
||||
tenantId,
|
||||
limit,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
return await ReadActionableSummaryAsync(
|
||||
connection,
|
||||
ActionableSummaryFallbackSql,
|
||||
tenantId,
|
||||
limit,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (PostgresException fallbackEx) when (IsMissingTableOrAbortedTransaction(fallbackEx.SqlState))
|
||||
{
|
||||
_logger.LogWarning(
|
||||
fallbackEx,
|
||||
"Dead-letter table is not present during fallback; returning empty actionable summary for tenant {TenantId}.",
|
||||
tenantId);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.UndefinedTable)
|
||||
catch (PostgresException ex) when (IsMissingTableOrAbortedTransaction(ex.SqlState))
|
||||
{
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
@@ -462,6 +496,16 @@ public sealed class PostgresDeadLetterRepository : IDeadLetterRepository
|
||||
=> string.Equals(sqlState, PostgresErrorCodes.UndefinedFunction, StringComparison.Ordinal)
|
||||
|| string.Equals(sqlState, PostgresErrorCodes.AmbiguousColumn, StringComparison.Ordinal);
|
||||
|
||||
/// <summary>
|
||||
/// Returns true when the SQL state indicates the dead-letter table is missing
|
||||
/// (42P01 = undefined_table) or the connection is in a failed transaction state
|
||||
/// (25P02 = in_failed_sql_transaction), which can occur when a previous command
|
||||
/// on the same connection already failed.
|
||||
/// </summary>
|
||||
internal static bool IsMissingTableOrAbortedTransaction(string? sqlState)
|
||||
=> string.Equals(sqlState, PostgresErrorCodes.UndefinedTable, StringComparison.Ordinal)
|
||||
|| string.Equals(sqlState, "25P02", StringComparison.Ordinal);
|
||||
|
||||
public async Task<int> MarkExpiredAsync(
|
||||
int batchLimit,
|
||||
CancellationToken cancellationToken)
|
||||
|
||||
@@ -404,6 +404,67 @@ public sealed class PostgresJobRepository : IJobRepository
|
||||
return Convert.ToInt32(result);
|
||||
}
|
||||
|
||||
public async Task<JobStatusCounts> GetStatusCountsAsync(
|
||||
string tenantId,
|
||||
string? jobType,
|
||||
string? projectId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Single aggregate query comparing against the job_status enum directly.
|
||||
// COUNT(*) FILTER (WHERE ...) is a standard PostgreSQL idiom that avoids 7 round trips.
|
||||
// Using 'value'::job_status casts match the pattern used in all other raw-SQL queries
|
||||
// and avoids runtime errors when the enum type cannot be resolved for a ::text cast.
|
||||
var sql = new StringBuilder("""
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE status = 'pending'::job_status) AS pending,
|
||||
COUNT(*) FILTER (WHERE status = 'scheduled'::job_status) AS scheduled,
|
||||
COUNT(*) FILTER (WHERE status = 'leased'::job_status) AS leased,
|
||||
COUNT(*) FILTER (WHERE status = 'succeeded'::job_status) AS succeeded,
|
||||
COUNT(*) FILTER (WHERE status = 'failed'::job_status) AS failed,
|
||||
COUNT(*) FILTER (WHERE status = 'canceled'::job_status) AS canceled,
|
||||
COUNT(*) FILTER (WHERE status = 'timed_out'::job_status) AS timed_out
|
||||
FROM jobs
|
||||
WHERE tenant_id = @tenant_id
|
||||
""");
|
||||
var parameters = new List<NpgsqlParameter>
|
||||
{
|
||||
new("tenant_id", tenantId),
|
||||
};
|
||||
|
||||
if (!string.IsNullOrEmpty(jobType))
|
||||
{
|
||||
sql.Append(" AND job_type = @job_type");
|
||||
parameters.Add(new("job_type", jobType));
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(projectId))
|
||||
{
|
||||
sql.Append(" AND project_id = @project_id");
|
||||
parameters.Add(new("project_id", projectId));
|
||||
}
|
||||
|
||||
await using var command = new NpgsqlCommand(sql.ToString(), connection);
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddRange(parameters.ToArray());
|
||||
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
return new JobStatusCounts(0, 0, 0, 0, 0, 0, 0);
|
||||
}
|
||||
|
||||
return new JobStatusCounts(
|
||||
Pending: Convert.ToInt32(reader.GetInt64(0)),
|
||||
Scheduled: Convert.ToInt32(reader.GetInt64(1)),
|
||||
Leased: Convert.ToInt32(reader.GetInt64(2)),
|
||||
Succeeded: Convert.ToInt32(reader.GetInt64(3)),
|
||||
Failed: Convert.ToInt32(reader.GetInt64(4)),
|
||||
Canceled: Convert.ToInt32(reader.GetInt64(5)),
|
||||
TimedOut: Convert.ToInt32(reader.GetInt64(6)));
|
||||
}
|
||||
|
||||
private static void AddJobParameters(NpgsqlCommand command, Job job)
|
||||
{
|
||||
command.Parameters.AddWithValue("job_id", job.JobId);
|
||||
|
||||
@@ -51,7 +51,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
|
||||
var sql = $"SELECT {PackColumns} FROM {PackSchemaName}.packs WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
|
||||
|
||||
await using var command = new NpgsqlCommand(sql, connection);
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
@@ -72,7 +72,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id AND name = @name";
|
||||
var sql = $"SELECT {PackColumns} FROM {PackSchemaName}.packs WHERE tenant_id = @tenant_id AND name = @name";
|
||||
|
||||
await using var command = new NpgsqlCommand(sql, connection);
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
@@ -99,7 +99,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
var sql = $"SELECT {PackColumns} FROM packs WHERE tenant_id = @tenant_id";
|
||||
var sql = $"SELECT {PackColumns} FROM {PackSchemaName}.packs WHERE tenant_id = @tenant_id";
|
||||
var parameters = new List<NpgsqlParameter>
|
||||
{
|
||||
new("tenant_id", tenantId)
|
||||
@@ -156,7 +156,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
var sql = "SELECT COUNT(*) FROM packs WHERE tenant_id = @tenant_id";
|
||||
var sql = $"SELECT COUNT(*) FROM {PackSchemaName}.packs WHERE tenant_id = @tenant_id";
|
||||
var parameters = new List<NpgsqlParameter>
|
||||
{
|
||||
new("tenant_id", tenantId)
|
||||
@@ -197,8 +197,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(pack.TenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
INSERT INTO packs (
|
||||
var sql = $"""
|
||||
INSERT INTO {PackSchemaName}.packs (
|
||||
pack_id, tenant_id, project_id, name, display_name, description,
|
||||
status, created_by, created_at, updated_at, updated_by,
|
||||
metadata, tags, icon_uri, version_count, latest_version,
|
||||
@@ -220,8 +220,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(pack.TenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
UPDATE packs SET
|
||||
var sql = $"""
|
||||
UPDATE {PackSchemaName}.packs SET
|
||||
display_name = @display_name,
|
||||
description = @description,
|
||||
status = @status::pack_status,
|
||||
@@ -254,8 +254,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
UPDATE packs SET
|
||||
var sql = $"""
|
||||
UPDATE {PackSchemaName}.packs SET
|
||||
status = @status::pack_status,
|
||||
updated_at = @updated_at,
|
||||
updated_by = @updated_by,
|
||||
@@ -283,8 +283,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
DELETE FROM packs
|
||||
var sql = $"""
|
||||
DELETE FROM {PackSchemaName}.packs
|
||||
WHERE tenant_id = @tenant_id
|
||||
AND pack_id = @pack_id
|
||||
AND status = 'draft'::pack_status
|
||||
@@ -307,7 +307,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_version_id = @pack_version_id";
|
||||
var sql = $"SELECT {VersionColumns} FROM {PackSchemaName}.pack_versions WHERE tenant_id = @tenant_id AND pack_version_id = @pack_version_id";
|
||||
|
||||
await using var command = new NpgsqlCommand(sql, connection);
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
@@ -329,7 +329,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id AND version = @version";
|
||||
var sql = $"SELECT {VersionColumns} FROM {PackSchemaName}.pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id AND version = @version";
|
||||
|
||||
await using var command = new NpgsqlCommand(sql, connection);
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
@@ -355,7 +355,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
|
||||
var sql = $"""
|
||||
SELECT {VersionColumns}
|
||||
FROM pack_versions
|
||||
FROM {PackSchemaName}.pack_versions
|
||||
WHERE tenant_id = @tenant_id
|
||||
AND pack_id = @pack_id
|
||||
AND status = 'published'::pack_version_status
|
||||
@@ -391,7 +391,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
var sql = $"SELECT {VersionColumns} FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
|
||||
var sql = $"SELECT {VersionColumns} FROM {PackSchemaName}.pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
|
||||
|
||||
if (status.HasValue)
|
||||
{
|
||||
@@ -428,7 +428,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
var sql = "SELECT COUNT(*) FROM pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
|
||||
var sql = $"SELECT COUNT(*) FROM {PackSchemaName}.pack_versions WHERE tenant_id = @tenant_id AND pack_id = @pack_id";
|
||||
|
||||
if (status.HasValue)
|
||||
{
|
||||
@@ -451,8 +451,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(version.TenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
INSERT INTO pack_versions (
|
||||
var sql = $"""
|
||||
INSERT INTO {PackSchemaName}.pack_versions (
|
||||
pack_version_id, tenant_id, pack_id, version, sem_ver, status,
|
||||
artifact_uri, artifact_digest, artifact_mime_type, artifact_size_bytes,
|
||||
manifest_json, manifest_digest, release_notes, min_engine_version, dependencies,
|
||||
@@ -480,8 +480,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(version.TenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
UPDATE pack_versions SET
|
||||
var sql = $"""
|
||||
UPDATE {PackSchemaName}.pack_versions SET
|
||||
status = @status::pack_version_status,
|
||||
release_notes = @release_notes,
|
||||
min_engine_version = @min_engine_version,
|
||||
@@ -521,8 +521,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
UPDATE pack_versions SET
|
||||
var sql = $"""
|
||||
UPDATE {PackSchemaName}.pack_versions SET
|
||||
status = @status::pack_version_status,
|
||||
updated_at = @updated_at,
|
||||
updated_by = @updated_by,
|
||||
@@ -560,8 +560,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
UPDATE pack_versions SET
|
||||
var sql = $"""
|
||||
UPDATE {PackSchemaName}.pack_versions SET
|
||||
signature_uri = @signature_uri,
|
||||
signature_algorithm = @signature_algorithm,
|
||||
signed_by = @signed_by,
|
||||
@@ -590,8 +590,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
UPDATE pack_versions SET download_count = download_count + 1
|
||||
var sql = $"""
|
||||
UPDATE {PackSchemaName}.pack_versions SET download_count = download_count + 1
|
||||
WHERE tenant_id = @tenant_id AND pack_version_id = @pack_version_id
|
||||
""";
|
||||
|
||||
@@ -609,8 +609,8 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackWriterConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
DELETE FROM pack_versions
|
||||
var sql = $"""
|
||||
DELETE FROM {PackSchemaName}.pack_versions
|
||||
WHERE tenant_id = @tenant_id
|
||||
AND pack_version_id = @pack_version_id
|
||||
AND status = 'draft'::pack_version_status
|
||||
@@ -637,7 +637,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
|
||||
var sql = $"""
|
||||
SELECT {PackColumns}
|
||||
FROM packs
|
||||
FROM {PackSchemaName}.packs
|
||||
WHERE tenant_id = @tenant_id
|
||||
AND (name ILIKE @query OR display_name ILIKE @query OR description ILIKE @query OR tags ILIKE @query)
|
||||
""";
|
||||
@@ -679,7 +679,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
|
||||
var sql = $"""
|
||||
SELECT {PackColumns}
|
||||
FROM packs
|
||||
FROM {PackSchemaName}.packs
|
||||
WHERE tenant_id = @tenant_id
|
||||
AND tags ILIKE @tag
|
||||
AND status = 'published'::pack_status
|
||||
@@ -712,10 +712,10 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
|
||||
var sql = $"""
|
||||
SELECT p.{PackColumns.Replace("pack_id", "p.pack_id")}
|
||||
FROM packs p
|
||||
FROM {PackSchemaName}.packs p
|
||||
LEFT JOIN (
|
||||
SELECT pack_id, SUM(download_count) AS total_downloads
|
||||
FROM pack_versions
|
||||
FROM {PackSchemaName}.pack_versions
|
||||
WHERE tenant_id = @tenant_id
|
||||
GROUP BY pack_id
|
||||
) v ON p.pack_id = v.pack_id
|
||||
@@ -748,7 +748,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
|
||||
var sql = $"""
|
||||
SELECT {PackColumns}
|
||||
FROM packs
|
||||
FROM {PackSchemaName}.packs
|
||||
WHERE tenant_id = @tenant_id
|
||||
AND status = 'published'::pack_status
|
||||
ORDER BY published_at DESC NULLS LAST, updated_at DESC
|
||||
@@ -778,9 +778,9 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
var sql = $"""
|
||||
SELECT COALESCE(SUM(download_count), 0)
|
||||
FROM pack_versions
|
||||
FROM {PackSchemaName}.pack_versions
|
||||
WHERE tenant_id = @tenant_id AND pack_id = @pack_id
|
||||
""";
|
||||
|
||||
@@ -798,14 +798,14 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
await using var connection = await OpenPackReaderConnectionAsync(tenantId, cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
var sql = $"""
|
||||
SELECT
|
||||
(SELECT COUNT(*) FROM packs WHERE tenant_id = @tenant_id) AS total_packs,
|
||||
(SELECT COUNT(*) FROM packs WHERE tenant_id = @tenant_id AND status = 'published'::pack_status) AS published_packs,
|
||||
(SELECT COUNT(*) FROM pack_versions WHERE tenant_id = @tenant_id) AS total_versions,
|
||||
(SELECT COUNT(*) FROM pack_versions WHERE tenant_id = @tenant_id AND status = 'published'::pack_version_status) AS published_versions,
|
||||
(SELECT COALESCE(SUM(download_count), 0) FROM pack_versions WHERE tenant_id = @tenant_id) AS total_downloads,
|
||||
(SELECT MAX(updated_at) FROM packs WHERE tenant_id = @tenant_id) AS last_updated_at
|
||||
(SELECT COUNT(*) FROM {PackSchemaName}.packs WHERE tenant_id = @tenant_id) AS total_packs,
|
||||
(SELECT COUNT(*) FROM {PackSchemaName}.packs WHERE tenant_id = @tenant_id AND status = 'published'::pack_status) AS published_packs,
|
||||
(SELECT COUNT(*) FROM {PackSchemaName}.pack_versions WHERE tenant_id = @tenant_id) AS total_versions,
|
||||
(SELECT COUNT(*) FROM {PackSchemaName}.pack_versions WHERE tenant_id = @tenant_id AND status = 'published'::pack_version_status) AS published_versions,
|
||||
(SELECT COALESCE(SUM(download_count), 0) FROM {PackSchemaName}.pack_versions WHERE tenant_id = @tenant_id) AS total_downloads,
|
||||
(SELECT MAX(updated_at) FROM {PackSchemaName}.packs WHERE tenant_id = @tenant_id) AS last_updated_at
|
||||
""";
|
||||
|
||||
await using var command = new NpgsqlCommand(sql, connection);
|
||||
|
||||
@@ -97,4 +97,30 @@ public interface IJobRepository
|
||||
string? jobType,
|
||||
string? projectId,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>
|
||||
/// Returns per-status counts in a single round trip.
|
||||
/// Uses text comparison against the PostgreSQL job_status enum labels
|
||||
/// so it works regardless of whether the column is stored as enum or text.
|
||||
/// </summary>
|
||||
Task<JobStatusCounts> GetStatusCountsAsync(
|
||||
string tenantId,
|
||||
string? jobType,
|
||||
string? projectId,
|
||||
CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Aggregated per-status job counts returned by a single SQL query.
|
||||
/// </summary>
|
||||
public sealed record JobStatusCounts(
|
||||
int Pending,
|
||||
int Scheduled,
|
||||
int Leased,
|
||||
int Succeeded,
|
||||
int Failed,
|
||||
int Canceled,
|
||||
int TimedOut)
|
||||
{
|
||||
public int Total => Pending + Scheduled + Leased + Succeeded + Failed + Canceled + TimedOut;
|
||||
}
|
||||
|
||||
@@ -21,4 +21,22 @@ public sealed class PostgresDeadLetterRepositoryTests
|
||||
{
|
||||
Assert.False(PostgresDeadLetterRepository.ShouldUseActionableSummaryFallback(sqlState));
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(PostgresErrorCodes.UndefinedTable)]
|
||||
[InlineData("25P02")] // in_failed_sql_transaction
|
||||
public void IsMissingTableOrAbortedTransaction_ReturnsTrue_ForExpectedSqlStates(string sqlState)
|
||||
{
|
||||
Assert.True(PostgresDeadLetterRepository.IsMissingTableOrAbortedTransaction(sqlState));
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(PostgresErrorCodes.UndefinedFunction)]
|
||||
[InlineData(PostgresErrorCodes.AmbiguousColumn)]
|
||||
[InlineData("XX000")]
|
||||
[InlineData(null)]
|
||||
public void IsMissingTableOrAbortedTransaction_ReturnsFalse_ForOtherSqlStates(string? sqlState)
|
||||
{
|
||||
Assert.False(PostgresDeadLetterRepository.IsMissingTableOrAbortedTransaction(sqlState));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -612,5 +612,11 @@ public sealed class FirstSignalServiceTests
|
||||
string? jobType,
|
||||
string? projectId,
|
||||
CancellationToken cancellationToken) => throw new NotImplementedException();
|
||||
|
||||
public Task<JobStatusCounts> GetStatusCountsAsync(
|
||||
string tenantId,
|
||||
string? jobType,
|
||||
string? projectId,
|
||||
CancellationToken cancellationToken) => throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -560,7 +560,8 @@ public static class DeadLetterEndpoints
|
||||
context.User?.Identity?.Name ?? "anonymous";
|
||||
|
||||
private static bool IsMissingDeadLetterTable(PostgresException exception) =>
|
||||
string.Equals(exception.SqlState, "42P01", StringComparison.Ordinal);
|
||||
string.Equals(exception.SqlState, "42P01", StringComparison.Ordinal)
|
||||
|| string.Equals(exception.SqlState, "25P02", StringComparison.Ordinal);
|
||||
|
||||
private static DeadLetterStats CreateEmptyStats() =>
|
||||
new(
|
||||
|
||||
@@ -153,24 +153,19 @@ public static class JobEndpoints
|
||||
var tenantId = tenantResolver.Resolve(context);
|
||||
DeprecationHeaders.Apply(context.Response, "/api/v1/jobengine/jobs");
|
||||
|
||||
// Get counts for each status
|
||||
var pending = await repository.CountAsync(tenantId, Core.Domain.JobStatus.Pending, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
var scheduled = await repository.CountAsync(tenantId, Core.Domain.JobStatus.Scheduled, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
var leased = await repository.CountAsync(tenantId, Core.Domain.JobStatus.Leased, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
var succeeded = await repository.CountAsync(tenantId, Core.Domain.JobStatus.Succeeded, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
var failed = await repository.CountAsync(tenantId, Core.Domain.JobStatus.Failed, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
var canceled = await repository.CountAsync(tenantId, Core.Domain.JobStatus.Canceled, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
var timedOut = await repository.CountAsync(tenantId, Core.Domain.JobStatus.TimedOut, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
// Single aggregate query using text comparison against enum labels.
|
||||
// Replaces 7 individual COUNT round trips with one FILTER-based query.
|
||||
var counts = await repository.GetStatusCountsAsync(tenantId, jobType, projectId, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var summary = new JobSummary(
|
||||
TotalJobs: pending + scheduled + leased + succeeded + failed + canceled + timedOut,
|
||||
PendingJobs: pending,
|
||||
ScheduledJobs: scheduled,
|
||||
LeasedJobs: leased,
|
||||
SucceededJobs: succeeded,
|
||||
FailedJobs: failed,
|
||||
CanceledJobs: canceled,
|
||||
TimedOutJobs: timedOut);
|
||||
TotalJobs: counts.Total,
|
||||
PendingJobs: counts.Pending,
|
||||
ScheduledJobs: counts.Scheduled,
|
||||
LeasedJobs: counts.Leased,
|
||||
SucceededJobs: counts.Succeeded,
|
||||
FailedJobs: counts.Failed,
|
||||
CanceledJobs: counts.Canceled,
|
||||
TimedOutJobs: counts.TimedOut);
|
||||
|
||||
return Results.Ok(summary);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user