Repair live canonical migrations and scanner cache bootstrap

This commit is contained in:
master
2026-03-09 21:56:41 +02:00
parent 00bf2fa99a
commit dfd22281ed
21 changed files with 1018 additions and 12 deletions

View File

@@ -0,0 +1,41 @@
-- 010_dead_letter_summary_ambiguity_fix.sql
-- Repairs get_actionable_dead_letter_summary so the sample-reason subquery
-- unambiguously references dead_letter_entries columns after projection aliasing.
--
-- What the function returns:
--   One row per (error_code, category) group of a tenant's dead-letter entries
--   whose status is 'pending', ordered by group size (largest first) and capped
--   at p_limit rows.
--
-- Why every column reference is alias-qualified (dle. / dle_sample.):
--   The RETURNS TABLE column names (error_code, category, ...) are also names of
--   dead_letter_entries columns used in the query, so each reference carries an
--   explicit table alias.
--
-- NOTE(review): the sample_reason subquery correlates on tenant_id and error_code
--   only, not on category. If one error_code can occur under more than one
--   category, the sample may come from a different category than the row it is
--   reported on -- confirm whether error_code implies category.
-- NOTE(review): ties on created_at (sample) or on COUNT(*) (final ordering) have
--   no secondary sort key, so the chosen row/order among ties is unspecified.
CREATE OR REPLACE FUNCTION get_actionable_dead_letter_summary(
-- Tenant whose pending dead-letter entries are summarized.
p_tenant_id TEXT,
-- Maximum number of summary rows returned; defaults to 10.
p_limit INTEGER DEFAULT 10
)
RETURNS TABLE (
-- Grouping key: error code of the entries in the group.
error_code TEXT,
-- Grouping key: error category of the entries in the group.
category error_category,
-- Number of pending entries in the group.
entry_count BIGINT,
-- Number of pending entries in the group with is_retryable set.
retryable_count BIGINT,
-- Earliest created_at among the entries in the group.
oldest_entry TIMESTAMPTZ,
-- failure_reason of the most recently created pending entry for the same
-- tenant and error_code.
sample_reason TEXT
) AS $$
BEGIN
RETURN QUERY
SELECT
dle.error_code,
dle.category,
COUNT(*)::BIGINT AS entry_count,
COUNT(*) FILTER (WHERE dle.is_retryable)::BIGINT AS retryable_count,
MIN(dle.created_at) AS oldest_entry,
(
SELECT dle_sample.failure_reason
FROM dead_letter_entries dle_sample
WHERE dle_sample.tenant_id = p_tenant_id
AND dle_sample.error_code = dle.error_code
AND dle_sample.status = 'pending'
ORDER BY dle_sample.created_at DESC
LIMIT 1
) AS sample_reason
FROM dead_letter_entries dle
WHERE dle.tenant_id = p_tenant_id
AND dle.status = 'pending'
GROUP BY dle.error_code, dle.category
ORDER BY COUNT(*) DESC
LIMIT p_limit;
END;
$$ LANGUAGE plpgsql STABLE;

View File

@@ -0,0 +1,25 @@
using StellaOps.JobEngine.Infrastructure.Postgres;
using StellaOps.TestKit;
namespace StellaOps.JobEngine.Tests;
/// <summary>
/// Verifies that the dead-letter summary repair migration is embedded in the
/// assembly that declares <see cref="JobEngineDataSource"/> and that its
/// sample-reason subquery uses alias-qualified column references.
/// </summary>
public sealed class DeadLetterSummaryMigrationTests
{
    /// <summary>File name of the migration script under test.</summary>
    private const string MigrationFileName = "010_dead_letter_summary_ambiguity_fix.sql";

    /// <summary>
    /// Locates the embedded migration script, reads it, and asserts that the
    /// qualified subquery fragments are present verbatim.
    /// </summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void DeadLetterSummaryFixMigration_EmbedsQualifiedSubqueryColumns()
    {
        // Arrange
        var assembly = typeof(JobEngineDataSource).Assembly;

        // Manifest resource names are either the bare logical name or a
        // namespace/folder-qualified name ending in the file name, depending on
        // how the project declares the resource. Matching on the suffix keeps
        // the test valid for both, and Assert.Single still fails when the
        // script is missing (or embedded more than once).
        var resourceName = Assert.Single(
            assembly.GetManifestResourceNames(),
            name => name.EndsWith(MigrationFileName, StringComparison.Ordinal));

        // Act
        using var stream = assembly.GetManifestResourceStream(resourceName);
        Assert.NotNull(stream);
        using var reader = new StreamReader(stream!);
        var sql = reader.ReadToEnd();

        // Assert
        Assert.Contains("FROM dead_letter_entries dle_sample", sql, StringComparison.Ordinal);
        Assert.Contains("dle_sample.error_code = dle.error_code", sql, StringComparison.Ordinal);
        Assert.Contains("dle_sample.status = 'pending'", sql, StringComparison.Ordinal);
    }
}

View File

@@ -106,7 +106,7 @@ public sealed class RunRepository : RepositoryBase<SchedulerDataSource>, IRunRep
if (!options.States.IsDefaultOrEmpty)
{
filters.Add("state = ANY(@states)");
filters.Add("state = ANY(CAST(@states AS scheduler.run_state[]))");
}
if (options.CreatedAfter is not null)
@@ -174,7 +174,7 @@ public sealed class RunRepository : RepositoryBase<SchedulerDataSource>, IRunRep
const string sql = """
SELECT *
FROM scheduler.runs
WHERE state = @state
WHERE state = CAST(@state AS scheduler.run_state)
ORDER BY created_at ASC
LIMIT @limit
""";
@@ -197,15 +197,18 @@ public sealed class RunRepository : RepositoryBase<SchedulerDataSource>, IRunRep
AddParameter(command, "id", run.Id);
AddParameter(command, "tenant_id", run.TenantId);
AddParameter(command, "schedule_id", run.ScheduleId ?? (object)DBNull.Value);
AddParameter(command, "trigger", Serialize(run.Trigger));
AddParameter(command, "state", run.State.ToString().ToLowerInvariant());
AddParameter(command, "stats", Serialize(run.Stats));
AddParameter(command, "reason", Serialize(run.Reason));
AddJsonbParameter(command, "trigger", Serialize(run.Trigger));
command.Parameters.Add(new NpgsqlParameter<string>("state", run.State.ToString().ToLowerInvariant())
{
DataTypeName = "scheduler.run_state"
});
AddJsonbParameter(command, "stats", Serialize(run.Stats));
AddJsonbParameter(command, "reason", Serialize(run.Reason));
AddParameter(command, "created_at", run.CreatedAt);
AddParameter(command, "started_at", run.StartedAt ?? (object)DBNull.Value);
AddParameter(command, "finished_at", run.FinishedAt ?? (object)DBNull.Value);
AddParameter(command, "error", run.Error ?? (object)DBNull.Value);
AddParameter(command, "deltas", Serialize(run.Deltas));
AddJsonbParameter(command, "deltas", Serialize(run.Deltas));
AddParameter(command, "retry_of", run.RetryOf ?? (object)DBNull.Value);
AddParameter(command, "schema_version", run.SchemaVersion ?? (object)DBNull.Value);
}

View File

@@ -0,0 +1,99 @@
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Persistence.Postgres;
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
using StellaOps.TestKit;
using Xunit;
namespace StellaOps.Scheduler.Persistence.Postgres.Tests;
/// <summary>
/// Tests for <see cref="RunRepository"/> queries that filter on run state,
/// executed against the shared scheduler Postgres fixture.
/// </summary>
[Collection(SchedulerPostgresCollection.Name)]
public sealed class RunRepositoryTests : IAsyncLifetime
{
    private readonly SchedulerPostgresFixture _postgres;

    public RunRepositoryTests(SchedulerPostgresFixture fixture)
    {
        _postgres = fixture;
    }

    /// <summary>Truncates all fixture tables when the test class is initialized.</summary>
    public ValueTask InitializeAsync()
    {
        return new ValueTask(_postgres.TruncateAllTablesAsync());
    }

    /// <summary>No per-test cleanup is performed.</summary>
    public ValueTask DisposeAsync()
    {
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Inserts one queued and one running run, then expects the single-state
    /// listing for <see cref="RunState.Queued"/> to return only the queued run.
    /// </summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task ListByStateAsync_ReturnsRunsMatchingEnumState()
    {
        // Arrange
        var sut = CreateRepository();
        var queued = CreateRun("tenant-a", RunState.Queued);
        var running = CreateRun("tenant-a", RunState.Running);
        await sut.InsertAsync(queued, CancellationToken.None);
        await sut.InsertAsync(running, CancellationToken.None);

        // Act
        var listed = await sut.ListByStateAsync(RunState.Queued, cancellationToken: CancellationToken.None);

        // Assert
        listed.Should().ContainSingle(candidate => candidate.Id == queued.Id);
        listed.Should().NotContain(candidate => candidate.Id == running.Id);
    }

    /// <summary>
    /// Inserts queued, running and completed runs for one schedule, then expects
    /// a multi-state filter on queued + running to exclude the completed run.
    /// </summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task ListAsync_FiltersByEnumStatesArray()
    {
        // Arrange
        const string tenantId = "tenant-b";
        const string scheduleId = "sched-1";

        var sut = CreateRepository();
        var queued = CreateRun(tenantId, RunState.Queued, scheduleId: scheduleId);
        var running = CreateRun(tenantId, RunState.Running, scheduleId: scheduleId);
        var completed = CreateRun(tenantId, RunState.Completed, scheduleId: scheduleId);

        foreach (var run in new[] { queued, running, completed })
        {
            await sut.InsertAsync(run, CancellationToken.None);
        }

        var query = new RunQueryOptions
        {
            ScheduleId = scheduleId,
            States = ImmutableArray.Create(RunState.Queued, RunState.Running),
            SortAscending = true,
            Limit = 10
        };

        // Act
        var listed = await sut.ListAsync(tenantId, query, CancellationToken.None);

        // Assert
        var expectedIds = new[] { queued.Id, running.Id };
        listed.Select(candidate => candidate.Id).Should().BeEquivalentTo(
            expectedIds,
            equivalency => equivalency.WithoutStrictOrdering());
        listed.Should().NotContain(candidate => candidate.Id == completed.Id);
    }

    /// <summary>
    /// Builds a repository over a data source configured from the fixture's
    /// options, with the schema name set to the scheduler default.
    /// </summary>
    private RunRepository CreateRepository()
    {
        var postgresOptions = _postgres.Fixture.CreateOptions();
        postgresOptions.SchemaName = SchedulerDataSource.DefaultSchemaName;

        var wrappedOptions = Options.Create(postgresOptions);
        var dataSource = new SchedulerDataSource(wrappedOptions, NullLogger<SchedulerDataSource>.Instance);

        return new RunRepository(dataSource, NullLogger<RunRepository>.Instance);
    }

    /// <summary>
    /// Creates a manually-triggered run with a fresh id in the requested state.
    /// The creation time is a fixed baseline plus 0-15 minutes derived from a
    /// new GUID's hash code.
    /// </summary>
    private static Run CreateRun(string tenantId, RunState state, string? scheduleId = null)
    {
        var baseline = new DateTimeOffset(2026, 3, 9, 10, 0, 0, TimeSpan.Zero);
        var minuteOffset = Guid.NewGuid().GetHashCode() & 15;

        return new Run(
            id: Guid.NewGuid().ToString("N"),
            tenantId: tenantId,
            trigger: RunTrigger.Manual,
            state: state,
            stats: new RunStats(candidates: 1, queued: 1),
            createdAt: baseline.AddMinutes(minuteOffset),
            scheduleId: scheduleId);
    }
}