This commit is contained in:
master
2026-01-08 20:48:20 +02:00
7 changed files with 1504 additions and 9 deletions

View File

@@ -110,17 +110,38 @@ public sealed class WorkerRepository : RepositoryBase<SchedulerDataSource>, IWor
public async Task<WorkerEntity> UpsertAsync(WorkerEntity worker, CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO scheduler.workers (id, tenant_id, hostname, process_id, job_types, max_concurrent_jobs, metadata)
VALUES (@id, @tenant_id, @hostname, @process_id, @job_types, @max_concurrent_jobs, @metadata::jsonb)
INSERT INTO scheduler.workers (
id,
tenant_id,
hostname,
process_id,
job_types,
max_concurrent_jobs,
current_jobs,
status,
metadata
)
VALUES (
@id,
@tenant_id,
@hostname,
@process_id,
@job_types,
@max_concurrent_jobs,
@current_jobs,
@status,
@metadata::jsonb
)
ON CONFLICT (id) DO UPDATE SET
tenant_id = EXCLUDED.tenant_id,
hostname = EXCLUDED.hostname,
process_id = EXCLUDED.process_id,
job_types = EXCLUDED.job_types,
max_concurrent_jobs = EXCLUDED.max_concurrent_jobs,
current_jobs = EXCLUDED.current_jobs,
metadata = EXCLUDED.metadata,
last_heartbeat_at = NOW(),
status = 'active'
status = EXCLUDED.status
RETURNING *
""";
@@ -133,6 +154,8 @@ public sealed class WorkerRepository : RepositoryBase<SchedulerDataSource>, IWor
AddParameter(command, "process_id", worker.ProcessId);
AddTextArrayParameter(command, "job_types", worker.JobTypes);
AddParameter(command, "max_concurrent_jobs", worker.MaxConcurrentJobs);
AddParameter(command, "current_jobs", worker.CurrentJobs);
AddParameter(command, "status", worker.Status);
AddJsonbParameter(command, "metadata", worker.Metadata);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);

View File

@@ -12,6 +12,7 @@ namespace StellaOps.Scheduler.Persistence.Postgres.Tests;
public sealed class TriggerRepositoryTests : IAsyncLifetime
{
private readonly SchedulerPostgresFixture _fixture;
private readonly SchedulerDataSource _dataSource;
private readonly TriggerRepository _repository;
private readonly string _tenantId = Guid.NewGuid().ToString();
@@ -21,8 +22,8 @@ public sealed class TriggerRepositoryTests : IAsyncLifetime
var options = fixture.Fixture.CreateOptions();
options.SchemaName = fixture.SchemaName;
var dataSource = new SchedulerDataSource(Options.Create(options), NullLogger<SchedulerDataSource>.Instance);
_repository = new TriggerRepository(dataSource, NullLogger<TriggerRepository>.Instance);
_dataSource = new SchedulerDataSource(Options.Create(options), NullLogger<SchedulerDataSource>.Instance);
_repository = new TriggerRepository(_dataSource, NullLogger<TriggerRepository>.Instance);
}
public ValueTask InitializeAsync() => new(_fixture.TruncateAllTablesAsync());
@@ -141,6 +142,17 @@ public sealed class TriggerRepositoryTests : IAsyncLifetime
var trigger = CreateTrigger("fire-test", "* * * * *");
await _repository.CreateAsync(trigger);
var jobId = Guid.NewGuid();
var jobRepository = new JobRepository(_dataSource, NullLogger<JobRepository>.Instance);
await jobRepository.CreateAsync(new JobEntity
{
Id = jobId,
TenantId = _tenantId,
JobType = "scan",
PayloadDigest = "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
IdempotencyKey = $"job-{jobId}",
CreatedAt = DateTimeOffset.UtcNow,
Payload = "{}"
});
var nextFireAt = DateTimeOffset.UtcNow.AddMinutes(1);
// Act