up
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-11-28 20:55:22 +02:00
parent d040c001ac
commit 2548abc56f
231 changed files with 47468 additions and 68 deletions

View File

@@ -0,0 +1,172 @@
-- Scheduler Schema Migration 001: Initial Schema
-- Creates the scheduler schema for jobs, triggers, and workers
-- Create schema
CREATE SCHEMA IF NOT EXISTS scheduler;
-- Job status enum type
DO $$ BEGIN
CREATE TYPE scheduler.job_status AS ENUM (
'pending', 'scheduled', 'leased', 'running',
'succeeded', 'failed', 'canceled', 'timed_out'
);
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
-- Jobs table
CREATE TABLE IF NOT EXISTS scheduler.jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
project_id TEXT,
job_type TEXT NOT NULL,
status scheduler.job_status NOT NULL DEFAULT 'pending',
priority INT NOT NULL DEFAULT 0,
payload JSONB NOT NULL DEFAULT '{}',
payload_digest TEXT NOT NULL,
idempotency_key TEXT NOT NULL,
correlation_id TEXT,
attempt INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 3,
lease_id UUID,
worker_id TEXT,
lease_until TIMESTAMPTZ,
not_before TIMESTAMPTZ,
reason TEXT,
result JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
scheduled_at TIMESTAMPTZ,
leased_at TIMESTAMPTZ,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_by TEXT,
UNIQUE(tenant_id, idempotency_key)
);
CREATE INDEX idx_jobs_tenant_status ON scheduler.jobs(tenant_id, status);
CREATE INDEX idx_jobs_tenant_type ON scheduler.jobs(tenant_id, job_type);
CREATE INDEX idx_jobs_scheduled ON scheduler.jobs(tenant_id, status, not_before, priority DESC, created_at)
WHERE status = 'scheduled';
CREATE INDEX idx_jobs_leased ON scheduler.jobs(tenant_id, status, lease_until)
WHERE status = 'leased';
CREATE INDEX idx_jobs_project ON scheduler.jobs(tenant_id, project_id);
CREATE INDEX idx_jobs_correlation ON scheduler.jobs(correlation_id);
-- Triggers table (cron-based job triggers)
CREATE TABLE IF NOT EXISTS scheduler.triggers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT,
job_type TEXT NOT NULL,
job_payload JSONB NOT NULL DEFAULT '{}',
cron_expression TEXT NOT NULL,
timezone TEXT NOT NULL DEFAULT 'UTC',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
next_fire_at TIMESTAMPTZ,
last_fire_at TIMESTAMPTZ,
last_job_id UUID REFERENCES scheduler.jobs(id),
fire_count BIGINT NOT NULL DEFAULT 0,
misfire_count INT NOT NULL DEFAULT 0,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by TEXT,
UNIQUE(tenant_id, name)
);
CREATE INDEX idx_triggers_tenant_id ON scheduler.triggers(tenant_id);
CREATE INDEX idx_triggers_next_fire ON scheduler.triggers(enabled, next_fire_at)
WHERE enabled = TRUE;
CREATE INDEX idx_triggers_job_type ON scheduler.triggers(tenant_id, job_type);
-- Workers table (worker registration and heartbeat)
CREATE TABLE IF NOT EXISTS scheduler.workers (
id TEXT PRIMARY KEY,
tenant_id TEXT,
hostname TEXT NOT NULL,
process_id INT,
job_types TEXT[] NOT NULL DEFAULT '{}',
max_concurrent_jobs INT NOT NULL DEFAULT 1,
current_jobs INT NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'draining', 'stopped')),
last_heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata JSONB NOT NULL DEFAULT '{}'
);
CREATE INDEX idx_workers_status ON scheduler.workers(status);
CREATE INDEX idx_workers_heartbeat ON scheduler.workers(last_heartbeat_at);
CREATE INDEX idx_workers_tenant ON scheduler.workers(tenant_id);
-- Distributed locks using advisory locks wrapper
CREATE TABLE IF NOT EXISTS scheduler.locks (
lock_key TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL,
holder_id TEXT NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'
);
CREATE INDEX idx_locks_tenant ON scheduler.locks(tenant_id);
CREATE INDEX idx_locks_expires ON scheduler.locks(expires_at);
-- Job history (completed jobs archive)
CREATE TABLE IF NOT EXISTS scheduler.job_history (
id BIGSERIAL PRIMARY KEY,
job_id UUID NOT NULL,
tenant_id TEXT NOT NULL,
project_id TEXT,
job_type TEXT NOT NULL,
status scheduler.job_status NOT NULL,
attempt INT NOT NULL,
payload_digest TEXT NOT NULL,
result JSONB,
reason TEXT,
worker_id TEXT,
duration_ms BIGINT,
created_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ NOT NULL,
archived_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_job_history_tenant ON scheduler.job_history(tenant_id);
CREATE INDEX idx_job_history_job_id ON scheduler.job_history(job_id);
CREATE INDEX idx_job_history_type ON scheduler.job_history(tenant_id, job_type);
CREATE INDEX idx_job_history_completed ON scheduler.job_history(tenant_id, completed_at);
-- Metrics table (job execution metrics)
CREATE TABLE IF NOT EXISTS scheduler.metrics (
id BIGSERIAL PRIMARY KEY,
tenant_id TEXT NOT NULL,
job_type TEXT NOT NULL,
period_start TIMESTAMPTZ NOT NULL,
period_end TIMESTAMPTZ NOT NULL,
jobs_created BIGINT NOT NULL DEFAULT 0,
jobs_completed BIGINT NOT NULL DEFAULT 0,
jobs_failed BIGINT NOT NULL DEFAULT 0,
jobs_timed_out BIGINT NOT NULL DEFAULT 0,
avg_duration_ms BIGINT,
p50_duration_ms BIGINT,
p95_duration_ms BIGINT,
p99_duration_ms BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(tenant_id, job_type, period_start)
);
CREATE INDEX idx_metrics_tenant_period ON scheduler.metrics(tenant_id, period_start);
-- Function to update updated_at timestamp
CREATE OR REPLACE FUNCTION scheduler.update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Trigger for updated_at
CREATE TRIGGER trg_triggers_updated_at
BEFORE UPDATE ON scheduler.triggers
FOR EACH ROW EXECUTE FUNCTION scheduler.update_updated_at();

View File

@@ -0,0 +1,150 @@
namespace StellaOps.Scheduler.Storage.Postgres.Models;
/// <summary>
/// Job status values matching the PostgreSQL enum.
/// </summary>
public enum JobStatus
{
/// <summary>Job is pending.</summary>
Pending,
/// <summary>Job is scheduled.</summary>
Scheduled,
/// <summary>Job is leased to a worker.</summary>
Leased,
/// <summary>Job is running.</summary>
Running,
/// <summary>Job succeeded.</summary>
Succeeded,
/// <summary>Job failed.</summary>
Failed,
/// <summary>Job was canceled.</summary>
Canceled,
/// <summary>Job timed out.</summary>
TimedOut
}
/// <summary>
/// Represents a job entity in the scheduler schema.
/// </summary>
public sealed class JobEntity
{
/// <summary>
/// Unique job identifier.
/// </summary>
public required Guid Id { get; init; }
/// <summary>
/// Tenant this job belongs to.
/// </summary>
public required string TenantId { get; init; }
/// <summary>
/// Optional project identifier.
/// </summary>
public string? ProjectId { get; init; }
/// <summary>
/// Type of job to execute.
/// </summary>
public required string JobType { get; init; }
/// <summary>
/// Current job status.
/// </summary>
public JobStatus Status { get; init; } = JobStatus.Pending;
/// <summary>
/// Job priority (higher = more important).
/// </summary>
public int Priority { get; init; }
/// <summary>
/// Job payload as JSON.
/// </summary>
public string Payload { get; init; } = "{}";
/// <summary>
/// SHA256 digest of payload for deduplication.
/// </summary>
public required string PayloadDigest { get; init; }
/// <summary>
/// Idempotency key (unique per tenant).
/// </summary>
public required string IdempotencyKey { get; init; }
/// <summary>
/// Correlation ID for tracing.
/// </summary>
public string? CorrelationId { get; init; }
/// <summary>
/// Current attempt number.
/// </summary>
public int Attempt { get; init; }
/// <summary>
/// Maximum number of attempts.
/// </summary>
public int MaxAttempts { get; init; } = 3;
/// <summary>
/// Current lease ID if leased.
/// </summary>
public Guid? LeaseId { get; init; }
/// <summary>
/// Worker ID holding the lease.
/// </summary>
public string? WorkerId { get; init; }
/// <summary>
/// Lease expiration time.
/// </summary>
public DateTimeOffset? LeaseUntil { get; init; }
/// <summary>
/// Don't run before this time.
/// </summary>
public DateTimeOffset? NotBefore { get; init; }
/// <summary>
/// Reason for failure/cancellation.
/// </summary>
public string? Reason { get; init; }
/// <summary>
/// Job result as JSON.
/// </summary>
public string? Result { get; init; }
/// <summary>
/// When the job was created.
/// </summary>
public DateTimeOffset CreatedAt { get; init; }
/// <summary>
/// When the job was scheduled.
/// </summary>
public DateTimeOffset? ScheduledAt { get; init; }
/// <summary>
/// When the job was leased.
/// </summary>
public DateTimeOffset? LeasedAt { get; init; }
/// <summary>
/// When the job started running.
/// </summary>
public DateTimeOffset? StartedAt { get; init; }
/// <summary>
/// When the job completed.
/// </summary>
public DateTimeOffset? CompletedAt { get; init; }
/// <summary>
/// User who created the job.
/// </summary>
public string? CreatedBy { get; init; }
}

View File

@@ -0,0 +1,97 @@
namespace StellaOps.Scheduler.Storage.Postgres.Models;
/// <summary>
/// Represents a trigger entity in the scheduler schema.
/// </summary>
public sealed class TriggerEntity
{
/// <summary>
/// Unique trigger identifier.
/// </summary>
public required Guid Id { get; init; }
/// <summary>
/// Tenant this trigger belongs to.
/// </summary>
public required string TenantId { get; init; }
/// <summary>
/// Trigger name (unique per tenant).
/// </summary>
public required string Name { get; init; }
/// <summary>
/// Optional description.
/// </summary>
public string? Description { get; init; }
/// <summary>
/// Type of job to create when trigger fires.
/// </summary>
public required string JobType { get; init; }
/// <summary>
/// Job payload as JSON.
/// </summary>
public string JobPayload { get; init; } = "{}";
/// <summary>
/// Cron expression for scheduling.
/// </summary>
public required string CronExpression { get; init; }
/// <summary>
/// Timezone for cron evaluation.
/// </summary>
public string Timezone { get; init; } = "UTC";
/// <summary>
/// Trigger is enabled.
/// </summary>
public bool Enabled { get; init; } = true;
/// <summary>
/// Next scheduled fire time.
/// </summary>
public DateTimeOffset? NextFireAt { get; init; }
/// <summary>
/// Last time the trigger fired.
/// </summary>
public DateTimeOffset? LastFireAt { get; init; }
/// <summary>
/// ID of the last job created by this trigger.
/// </summary>
public Guid? LastJobId { get; init; }
/// <summary>
/// Total number of times the trigger has fired.
/// </summary>
public long FireCount { get; init; }
/// <summary>
/// Number of misfires.
/// </summary>
public int MisfireCount { get; init; }
/// <summary>
/// Trigger metadata as JSON.
/// </summary>
public string Metadata { get; init; } = "{}";
/// <summary>
/// When the trigger was created.
/// </summary>
public DateTimeOffset CreatedAt { get; init; }
/// <summary>
/// When the trigger was last updated.
/// </summary>
public DateTimeOffset UpdatedAt { get; init; }
/// <summary>
/// User who created the trigger.
/// </summary>
public string? CreatedBy { get; init; }
}

View File

@@ -0,0 +1,101 @@
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// Repository interface for job operations.
/// </summary>
public interface IJobRepository
{
/// <summary>
/// Creates a new job.
/// </summary>
Task<JobEntity> CreateAsync(JobEntity job, CancellationToken cancellationToken = default);
/// <summary>
/// Gets a job by ID.
/// </summary>
Task<JobEntity?> GetByIdAsync(string tenantId, Guid id, CancellationToken cancellationToken = default);
/// <summary>
/// Gets a job by idempotency key.
/// </summary>
Task<JobEntity?> GetByIdempotencyKeyAsync(string tenantId, string idempotencyKey, CancellationToken cancellationToken = default);
/// <summary>
/// Gets scheduled jobs ready to run.
/// </summary>
Task<IReadOnlyList<JobEntity>> GetScheduledJobsAsync(
string tenantId,
string[] jobTypes,
int limit = 10,
CancellationToken cancellationToken = default);
/// <summary>
/// Attempts to lease a job for processing.
/// Uses SELECT FOR UPDATE SKIP LOCKED for distributed locking.
/// </summary>
Task<JobEntity?> TryLeaseJobAsync(
string tenantId,
Guid jobId,
string workerId,
TimeSpan leaseDuration,
CancellationToken cancellationToken = default);
/// <summary>
/// Extends the lease on a job.
/// </summary>
Task<bool> ExtendLeaseAsync(
string tenantId,
Guid jobId,
Guid leaseId,
TimeSpan extension,
CancellationToken cancellationToken = default);
/// <summary>
/// Marks a job as completed successfully.
/// </summary>
Task<bool> CompleteAsync(
string tenantId,
Guid jobId,
Guid leaseId,
string? result = null,
CancellationToken cancellationToken = default);
/// <summary>
/// Marks a job as failed.
/// </summary>
Task<bool> FailAsync(
string tenantId,
Guid jobId,
Guid leaseId,
string reason,
bool retry = true,
CancellationToken cancellationToken = default);
/// <summary>
/// Cancels a job.
/// </summary>
Task<bool> CancelAsync(
string tenantId,
Guid jobId,
string reason,
CancellationToken cancellationToken = default);
/// <summary>
/// Recovers expired leases (for jobs that timed out).
/// </summary>
Task<int> RecoverExpiredLeasesAsync(
string tenantId,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets jobs by status.
/// </summary>
Task<IReadOnlyList<JobEntity>> GetByStatusAsync(
string tenantId,
JobStatus status,
int limit = 100,
int offset = 0,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,421 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for job operations.
/// </summary>
public sealed class JobRepository : RepositoryBase<SchedulerDataSource>, IJobRepository
{
/// <summary>
/// Creates a new job repository.
/// </summary>
public JobRepository(SchedulerDataSource dataSource, ILogger<JobRepository> logger)
: base(dataSource, logger)
{
}
/// <inheritdoc />
public async Task<JobEntity> CreateAsync(JobEntity job, CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO scheduler.jobs (
id, tenant_id, project_id, job_type, status, priority, payload, payload_digest,
idempotency_key, correlation_id, max_attempts, not_before, created_by
)
VALUES (
@id, @tenant_id, @project_id, @job_type, @status::scheduler.job_status, @priority, @payload::jsonb, @payload_digest,
@idempotency_key, @correlation_id, @max_attempts, @not_before, @created_by
)
RETURNING *
""";
await using var connection = await DataSource.OpenConnectionAsync(job.TenantId, "writer", cancellationToken)
.ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddJobParameters(command, job);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
return MapJob(reader);
}
/// <inheritdoc />
public async Task<JobEntity?> GetByIdAsync(string tenantId, Guid id, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT * FROM scheduler.jobs
WHERE tenant_id = @tenant_id AND id = @id
""";
return await QuerySingleOrDefaultAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "id", id);
},
MapJob,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<JobEntity?> GetByIdempotencyKeyAsync(string tenantId, string idempotencyKey, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT * FROM scheduler.jobs
WHERE tenant_id = @tenant_id AND idempotency_key = @idempotency_key
""";
return await QuerySingleOrDefaultAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "idempotency_key", idempotencyKey);
},
MapJob,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<JobEntity>> GetScheduledJobsAsync(
string tenantId,
string[] jobTypes,
int limit = 10,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT * FROM scheduler.jobs
WHERE tenant_id = @tenant_id
AND status = 'scheduled'
AND (not_before IS NULL OR not_before <= NOW())
AND job_type = ANY(@job_types)
ORDER BY priority DESC, created_at
LIMIT @limit
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddTextArrayParameter(cmd, "job_types", jobTypes);
AddParameter(cmd, "limit", limit);
},
MapJob,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<JobEntity?> TryLeaseJobAsync(
string tenantId,
Guid jobId,
string workerId,
TimeSpan leaseDuration,
CancellationToken cancellationToken = default)
{
var leaseId = Guid.NewGuid();
var leaseUntil = DateTimeOffset.UtcNow.Add(leaseDuration);
const string sql = """
UPDATE scheduler.jobs
SET status = 'leased'::scheduler.job_status,
lease_id = @lease_id,
worker_id = @worker_id,
lease_until = @lease_until,
leased_at = NOW(),
attempt = attempt + 1
WHERE tenant_id = @tenant_id
AND id = @job_id
AND status = 'scheduled'
AND (not_before IS NULL OR not_before <= NOW())
RETURNING *
""";
return await QuerySingleOrDefaultAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "job_id", jobId);
AddParameter(cmd, "lease_id", leaseId);
AddParameter(cmd, "worker_id", workerId);
AddParameter(cmd, "lease_until", leaseUntil);
},
MapJob,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<bool> ExtendLeaseAsync(
string tenantId,
Guid jobId,
Guid leaseId,
TimeSpan extension,
CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.jobs
SET lease_until = lease_until + @extension
WHERE tenant_id = @tenant_id
AND id = @job_id
AND lease_id = @lease_id
AND status = 'leased'
""";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "job_id", jobId);
AddParameter(cmd, "lease_id", leaseId);
AddParameter(cmd, "extension", extension);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> CompleteAsync(
string tenantId,
Guid jobId,
Guid leaseId,
string? result = null,
CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.jobs
SET status = 'succeeded'::scheduler.job_status,
result = @result::jsonb,
completed_at = NOW()
WHERE tenant_id = @tenant_id
AND id = @job_id
AND lease_id = @lease_id
AND status IN ('leased', 'running')
""";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "job_id", jobId);
AddParameter(cmd, "lease_id", leaseId);
AddJsonbParameter(cmd, "result", result);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> FailAsync(
string tenantId,
Guid jobId,
Guid leaseId,
string reason,
bool retry = true,
CancellationToken cancellationToken = default)
{
// If retry is allowed and attempts remaining, reschedule; otherwise mark as failed
var sql = retry
? """
UPDATE scheduler.jobs
SET status = CASE
WHEN attempt < max_attempts THEN 'scheduled'::scheduler.job_status
ELSE 'failed'::scheduler.job_status
END,
reason = @reason,
lease_id = NULL,
worker_id = NULL,
lease_until = NULL,
completed_at = CASE WHEN attempt >= max_attempts THEN NOW() ELSE NULL END
WHERE tenant_id = @tenant_id
AND id = @job_id
AND lease_id = @lease_id
"""
: """
UPDATE scheduler.jobs
SET status = 'failed'::scheduler.job_status,
reason = @reason,
completed_at = NOW()
WHERE tenant_id = @tenant_id
AND id = @job_id
AND lease_id = @lease_id
""";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "job_id", jobId);
AddParameter(cmd, "lease_id", leaseId);
AddParameter(cmd, "reason", reason);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> CancelAsync(
string tenantId,
Guid jobId,
string reason,
CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.jobs
SET status = 'canceled'::scheduler.job_status,
reason = @reason,
completed_at = NOW()
WHERE tenant_id = @tenant_id
AND id = @job_id
AND status IN ('pending', 'scheduled')
""";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "job_id", jobId);
AddParameter(cmd, "reason", reason);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<int> RecoverExpiredLeasesAsync(
string tenantId,
CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.jobs
SET status = CASE
WHEN attempt < max_attempts THEN 'scheduled'::scheduler.job_status
ELSE 'timed_out'::scheduler.job_status
END,
reason = 'Lease expired',
lease_id = NULL,
worker_id = NULL,
lease_until = NULL,
completed_at = CASE WHEN attempt >= max_attempts THEN NOW() ELSE NULL END
WHERE tenant_id = @tenant_id
AND status = 'leased'
AND lease_until < NOW()
""";
return await ExecuteAsync(
tenantId,
sql,
cmd => AddParameter(cmd, "tenant_id", tenantId),
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<JobEntity>> GetByStatusAsync(
string tenantId,
JobStatus status,
int limit = 100,
int offset = 0,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT * FROM scheduler.jobs
WHERE tenant_id = @tenant_id AND status = @status::scheduler.job_status
ORDER BY created_at DESC, id
LIMIT @limit OFFSET @offset
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "status", status.ToString().ToLowerInvariant());
AddParameter(cmd, "limit", limit);
AddParameter(cmd, "offset", offset);
},
MapJob,
cancellationToken).ConfigureAwait(false);
}
private static void AddJobParameters(NpgsqlCommand command, JobEntity job)
{
AddParameter(command, "id", job.Id);
AddParameter(command, "tenant_id", job.TenantId);
AddParameter(command, "project_id", job.ProjectId);
AddParameter(command, "job_type", job.JobType);
AddParameter(command, "status", job.Status.ToString().ToLowerInvariant());
AddParameter(command, "priority", job.Priority);
AddJsonbParameter(command, "payload", job.Payload);
AddParameter(command, "payload_digest", job.PayloadDigest);
AddParameter(command, "idempotency_key", job.IdempotencyKey);
AddParameter(command, "correlation_id", job.CorrelationId);
AddParameter(command, "max_attempts", job.MaxAttempts);
AddParameter(command, "not_before", job.NotBefore);
AddParameter(command, "created_by", job.CreatedBy);
}
private static JobEntity MapJob(NpgsqlDataReader reader) => new()
{
Id = reader.GetGuid(reader.GetOrdinal("id")),
TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
ProjectId = GetNullableString(reader, reader.GetOrdinal("project_id")),
JobType = reader.GetString(reader.GetOrdinal("job_type")),
Status = ParseJobStatus(reader.GetString(reader.GetOrdinal("status"))),
Priority = reader.GetInt32(reader.GetOrdinal("priority")),
Payload = reader.GetString(reader.GetOrdinal("payload")),
PayloadDigest = reader.GetString(reader.GetOrdinal("payload_digest")),
IdempotencyKey = reader.GetString(reader.GetOrdinal("idempotency_key")),
CorrelationId = GetNullableString(reader, reader.GetOrdinal("correlation_id")),
Attempt = reader.GetInt32(reader.GetOrdinal("attempt")),
MaxAttempts = reader.GetInt32(reader.GetOrdinal("max_attempts")),
LeaseId = GetNullableGuid(reader, reader.GetOrdinal("lease_id")),
WorkerId = GetNullableString(reader, reader.GetOrdinal("worker_id")),
LeaseUntil = GetNullableDateTimeOffset(reader, reader.GetOrdinal("lease_until")),
NotBefore = GetNullableDateTimeOffset(reader, reader.GetOrdinal("not_before")),
Reason = GetNullableString(reader, reader.GetOrdinal("reason")),
Result = GetNullableString(reader, reader.GetOrdinal("result")),
CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at")),
ScheduledAt = GetNullableDateTimeOffset(reader, reader.GetOrdinal("scheduled_at")),
LeasedAt = GetNullableDateTimeOffset(reader, reader.GetOrdinal("leased_at")),
StartedAt = GetNullableDateTimeOffset(reader, reader.GetOrdinal("started_at")),
CompletedAt = GetNullableDateTimeOffset(reader, reader.GetOrdinal("completed_at")),
CreatedBy = GetNullableString(reader, reader.GetOrdinal("created_by"))
};
private static JobStatus ParseJobStatus(string status) => status switch
{
"pending" => JobStatus.Pending,
"scheduled" => JobStatus.Scheduled,
"leased" => JobStatus.Leased,
"running" => JobStatus.Running,
"succeeded" => JobStatus.Succeeded,
"failed" => JobStatus.Failed,
"canceled" => JobStatus.Canceled,
"timed_out" => JobStatus.TimedOut,
_ => throw new ArgumentException($"Unknown job status: {status}", nameof(status))
};
}

View File

@@ -0,0 +1,38 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Infrastructure.Postgres.Connections;
using StellaOps.Infrastructure.Postgres.Options;
namespace StellaOps.Scheduler.Storage.Postgres;
/// <summary>
/// PostgreSQL data source for the Scheduler module.
/// Manages connections with tenant context for job scheduling and queue management.
/// </summary>
public sealed class SchedulerDataSource : DataSourceBase
{
/// <summary>
/// Default schema name for Scheduler tables.
/// </summary>
public const string DefaultSchemaName = "scheduler";
/// <summary>
/// Creates a new Scheduler data source.
/// </summary>
public SchedulerDataSource(IOptions<PostgresOptions> options, ILogger<SchedulerDataSource> logger)
: base(CreateOptions(options.Value), logger)
{
}
/// <inheritdoc />
protected override string ModuleName => "Scheduler";
private static PostgresOptions CreateOptions(PostgresOptions baseOptions)
{
if (string.IsNullOrWhiteSpace(baseOptions.SchemaName))
{
baseOptions.SchemaName = DefaultSchemaName;
}
return baseOptions;
}
}

View File

@@ -0,0 +1,53 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using StellaOps.Infrastructure.Postgres;
using StellaOps.Infrastructure.Postgres.Options;
using StellaOps.Scheduler.Storage.Postgres.Repositories;
namespace StellaOps.Scheduler.Storage.Postgres;
/// <summary>
/// Extension methods for configuring Scheduler PostgreSQL storage services.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds Scheduler PostgreSQL storage services.
/// </summary>
/// <param name="services">Service collection.</param>
/// <param name="configuration">Configuration root.</param>
/// <param name="sectionName">Configuration section name for PostgreSQL options.</param>
/// <returns>Service collection for chaining.</returns>
public static IServiceCollection AddSchedulerPostgresStorage(
this IServiceCollection services,
IConfiguration configuration,
string sectionName = "Postgres:Scheduler")
{
services.Configure<PostgresOptions>(sectionName, configuration.GetSection(sectionName));
services.AddSingleton<SchedulerDataSource>();
// Register repositories
services.AddScoped<IJobRepository, JobRepository>();
return services;
}
/// <summary>
/// Adds Scheduler PostgreSQL storage services with explicit options.
/// </summary>
/// <param name="services">Service collection.</param>
/// <param name="configureOptions">Options configuration action.</param>
/// <returns>Service collection for chaining.</returns>
public static IServiceCollection AddSchedulerPostgresStorage(
this IServiceCollection services,
Action<PostgresOptions> configureOptions)
{
services.Configure(configureOptions);
services.AddSingleton<SchedulerDataSource>();
// Register repositories
services.AddScoped<IJobRepository, JobRepository>();
return services;
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" ?>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<RootNamespace>StellaOps.Scheduler.Storage.Postgres</RootNamespace>
</PropertyGroup>
<ItemGroup>
<None Include="Migrations\**\*.sql" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Infrastructure.Postgres\StellaOps.Infrastructure.Postgres.csproj" />
</ItemGroup>
</Project>