up
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-11-29 02:40:21 +02:00
parent 887b0a1c67
commit 7e7be4d2fd
54 changed files with 4907 additions and 3 deletions

View File

@@ -0,0 +1,82 @@
namespace StellaOps.Scheduler.Storage.Postgres.Models;
/// <summary>
/// Represents a job history entity in the scheduler schema.
/// </summary>
public sealed class JobHistoryEntity
{
/// <summary>
/// Unique history entry identifier.
/// </summary>
public long Id { get; init; }
/// <summary>
/// Original job ID.
/// </summary>
public required Guid JobId { get; init; }
/// <summary>
/// Tenant this job belonged to.
/// </summary>
public required string TenantId { get; init; }
/// <summary>
/// Optional project identifier.
/// </summary>
public string? ProjectId { get; init; }
/// <summary>
/// Type of job that was executed.
/// </summary>
public required string JobType { get; init; }
/// <summary>
/// Final job status.
/// </summary>
public JobStatus Status { get; init; }
/// <summary>
/// Attempt number when archived.
/// </summary>
public int Attempt { get; init; }
/// <summary>
/// SHA256 digest of payload.
/// </summary>
public required string PayloadDigest { get; init; }
/// <summary>
/// Job result as JSON.
/// </summary>
public string? Result { get; init; }
/// <summary>
/// Reason for failure/cancellation.
/// </summary>
public string? Reason { get; init; }
/// <summary>
/// Worker that executed the job.
/// </summary>
public string? WorkerId { get; init; }
/// <summary>
/// Duration in milliseconds.
/// </summary>
public long? DurationMs { get; init; }
/// <summary>
/// When the job was created.
/// </summary>
public DateTimeOffset CreatedAt { get; init; }
/// <summary>
/// When the job completed.
/// </summary>
public DateTimeOffset CompletedAt { get; init; }
/// <summary>
/// When the job was archived to history.
/// </summary>
public DateTimeOffset ArchivedAt { get; init; }
}

View File

@@ -0,0 +1,37 @@
namespace StellaOps.Scheduler.Storage.Postgres.Models;
/// <summary>
/// Represents a distributed lock entity in the scheduler schema.
/// </summary>
public sealed class LockEntity
{
/// <summary>
/// Lock key (primary key).
/// </summary>
public required string LockKey { get; init; }
/// <summary>
/// Tenant this lock belongs to.
/// </summary>
public required string TenantId { get; init; }
/// <summary>
/// ID of the holder that acquired the lock.
/// </summary>
public required string HolderId { get; init; }
/// <summary>
/// When the lock was acquired.
/// </summary>
public DateTimeOffset AcquiredAt { get; init; }
/// <summary>
/// When the lock expires.
/// </summary>
public DateTimeOffset ExpiresAt { get; init; }
/// <summary>
/// Lock metadata as JSON.
/// </summary>
public string Metadata { get; init; } = "{}";
}

View File

@@ -0,0 +1,77 @@
namespace StellaOps.Scheduler.Storage.Postgres.Models;
/// <summary>
/// Represents a metrics entity in the scheduler schema.
/// </summary>
public sealed class MetricsEntity
{
/// <summary>
/// Unique metrics entry identifier.
/// </summary>
public long Id { get; init; }
/// <summary>
/// Tenant this metric belongs to.
/// </summary>
public required string TenantId { get; init; }
/// <summary>
/// Job type for this metric.
/// </summary>
public required string JobType { get; init; }
/// <summary>
/// Period start time.
/// </summary>
public DateTimeOffset PeriodStart { get; init; }
/// <summary>
/// Period end time.
/// </summary>
public DateTimeOffset PeriodEnd { get; init; }
/// <summary>
/// Number of jobs created in this period.
/// </summary>
public long JobsCreated { get; init; }
/// <summary>
/// Number of jobs completed in this period.
/// </summary>
public long JobsCompleted { get; init; }
/// <summary>
/// Number of jobs failed in this period.
/// </summary>
public long JobsFailed { get; init; }
/// <summary>
/// Number of jobs timed out in this period.
/// </summary>
public long JobsTimedOut { get; init; }
/// <summary>
/// Average duration in milliseconds.
/// </summary>
public long? AvgDurationMs { get; init; }
/// <summary>
/// 50th percentile duration.
/// </summary>
public long? P50DurationMs { get; init; }
/// <summary>
/// 95th percentile duration.
/// </summary>
public long? P95DurationMs { get; init; }
/// <summary>
/// 99th percentile duration.
/// </summary>
public long? P99DurationMs { get; init; }
/// <summary>
/// When this metric was created.
/// </summary>
public DateTimeOffset CreatedAt { get; init; }
}

View File

@@ -0,0 +1,72 @@
namespace StellaOps.Scheduler.Storage.Postgres.Models;
/// <summary>
/// Worker status values.
/// </summary>
public static class WorkerStatus
{
public const string Active = "active";
public const string Draining = "draining";
public const string Stopped = "stopped";
}
/// <summary>
/// Represents a worker entity in the scheduler schema.
/// </summary>
public sealed class WorkerEntity
{
/// <summary>
/// Unique worker identifier.
/// </summary>
public required string Id { get; init; }
/// <summary>
/// Optional tenant this worker is dedicated to.
/// </summary>
public string? TenantId { get; init; }
/// <summary>
/// Hostname of the worker.
/// </summary>
public required string Hostname { get; init; }
/// <summary>
/// Process ID of the worker.
/// </summary>
public int? ProcessId { get; init; }
/// <summary>
/// Job types this worker can process.
/// </summary>
public string[] JobTypes { get; init; } = [];
/// <summary>
/// Maximum concurrent jobs this worker can handle.
/// </summary>
public int MaxConcurrentJobs { get; init; } = 1;
/// <summary>
/// Current number of jobs being processed.
/// </summary>
public int CurrentJobs { get; init; }
/// <summary>
/// Worker status.
/// </summary>
public string Status { get; init; } = WorkerStatus.Active;
/// <summary>
/// Last heartbeat timestamp.
/// </summary>
public DateTimeOffset LastHeartbeatAt { get; init; }
/// <summary>
/// When the worker was registered.
/// </summary>
public DateTimeOffset RegisteredAt { get; init; }
/// <summary>
/// Worker metadata as JSON.
/// </summary>
public string Metadata { get; init; } = "{}";
}

View File

@@ -0,0 +1,145 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for distributed lock operations.
/// </summary>
public sealed class DistributedLockRepository : RepositoryBase<SchedulerDataSource>, IDistributedLockRepository
{
/// <summary>
/// Creates a new distributed lock repository.
/// </summary>
public DistributedLockRepository(SchedulerDataSource dataSource, ILogger<DistributedLockRepository> logger)
: base(dataSource, logger)
{
}
/// <inheritdoc />
public async Task<bool> TryAcquireAsync(string tenantId, string lockKey, string holderId, TimeSpan duration, CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO scheduler.locks (lock_key, tenant_id, holder_id, expires_at)
VALUES (@lock_key, @tenant_id, @holder_id, NOW() + @duration)
ON CONFLICT (lock_key) DO UPDATE SET
holder_id = EXCLUDED.holder_id,
tenant_id = EXCLUDED.tenant_id,
acquired_at = NOW(),
expires_at = NOW() + EXCLUDED.expires_at - EXCLUDED.acquired_at
WHERE scheduler.locks.expires_at < NOW()
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "lock_key", lockKey);
AddParameter(command, "tenant_id", tenantId);
AddParameter(command, "holder_id", holderId);
AddParameter(command, "duration", duration);
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<LockEntity?> GetAsync(string lockKey, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT lock_key, tenant_id, holder_id, acquired_at, expires_at, metadata
FROM scheduler.locks
WHERE lock_key = @lock_key AND expires_at > NOW()
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "lock_key", lockKey);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
return await reader.ReadAsync(cancellationToken).ConfigureAwait(false) ? MapLock(reader) : null;
}
/// <inheritdoc />
public async Task<bool> ExtendAsync(string lockKey, string holderId, TimeSpan extension, CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.locks
SET expires_at = expires_at + @extension
WHERE lock_key = @lock_key AND holder_id = @holder_id AND expires_at > NOW()
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "lock_key", lockKey);
AddParameter(command, "holder_id", holderId);
AddParameter(command, "extension", extension);
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> ReleaseAsync(string lockKey, string holderId, CancellationToken cancellationToken = default)
{
const string sql = """
DELETE FROM scheduler.locks
WHERE lock_key = @lock_key AND holder_id = @holder_id
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "lock_key", lockKey);
AddParameter(command, "holder_id", holderId);
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<int> CleanupExpiredAsync(CancellationToken cancellationToken = default)
{
const string sql = "DELETE FROM scheduler.locks WHERE expires_at < NOW()";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<LockEntity>> ListByTenantAsync(string tenantId, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT lock_key, tenant_id, holder_id, acquired_at, expires_at, metadata
FROM scheduler.locks
WHERE tenant_id = @tenant_id AND expires_at > NOW()
ORDER BY acquired_at DESC
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "tenant_id", tenantId);
var results = new List<LockEntity>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
results.Add(MapLock(reader));
}
return results;
}
private static LockEntity MapLock(NpgsqlDataReader reader) => new()
{
LockKey = reader.GetString(reader.GetOrdinal("lock_key")),
TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
HolderId = reader.GetString(reader.GetOrdinal("holder_id")),
AcquiredAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("acquired_at")),
ExpiresAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("expires_at")),
Metadata = reader.GetString(reader.GetOrdinal("metadata"))
};
}

View File

@@ -0,0 +1,39 @@
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// Repository interface for distributed lock operations.
/// </summary>
public interface IDistributedLockRepository
{
/// <summary>
/// Tries to acquire a lock.
/// </summary>
Task<bool> TryAcquireAsync(string tenantId, string lockKey, string holderId, TimeSpan duration, CancellationToken cancellationToken = default);
/// <summary>
/// Gets a lock by key.
/// </summary>
Task<LockEntity?> GetAsync(string lockKey, CancellationToken cancellationToken = default);
/// <summary>
/// Extends a lock.
/// </summary>
Task<bool> ExtendAsync(string lockKey, string holderId, TimeSpan extension, CancellationToken cancellationToken = default);
/// <summary>
/// Releases a lock.
/// </summary>
Task<bool> ReleaseAsync(string lockKey, string holderId, CancellationToken cancellationToken = default);
/// <summary>
/// Cleans up expired locks.
/// </summary>
Task<int> CleanupExpiredAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Lists all locks for a tenant.
/// </summary>
Task<IReadOnlyList<LockEntity>> ListByTenantAsync(string tenantId, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,61 @@
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// Repository interface for job history operations.
/// </summary>
public interface IJobHistoryRepository
{
/// <summary>
/// Archives a completed job.
/// </summary>
Task<JobHistoryEntity> ArchiveAsync(JobHistoryEntity history, CancellationToken cancellationToken = default);
/// <summary>
/// Gets history for a specific job.
/// </summary>
Task<IReadOnlyList<JobHistoryEntity>> GetByJobIdAsync(Guid jobId, CancellationToken cancellationToken = default);
/// <summary>
/// Lists job history for a tenant.
/// </summary>
Task<IReadOnlyList<JobHistoryEntity>> ListAsync(
string tenantId,
int limit = 100,
int offset = 0,
CancellationToken cancellationToken = default);
/// <summary>
/// Lists job history by type.
/// </summary>
Task<IReadOnlyList<JobHistoryEntity>> ListByJobTypeAsync(
string tenantId,
string jobType,
int limit = 100,
CancellationToken cancellationToken = default);
/// <summary>
/// Lists job history by status.
/// </summary>
Task<IReadOnlyList<JobHistoryEntity>> ListByStatusAsync(
string tenantId,
JobStatus status,
int limit = 100,
CancellationToken cancellationToken = default);
/// <summary>
/// Lists job history in a time range.
/// </summary>
Task<IReadOnlyList<JobHistoryEntity>> ListByTimeRangeAsync(
string tenantId,
DateTimeOffset from,
DateTimeOffset to,
int limit = 1000,
CancellationToken cancellationToken = default);
/// <summary>
/// Deletes old history entries.
/// </summary>
Task<int> DeleteOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,45 @@
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// Repository interface for metrics operations.
/// </summary>
public interface IMetricsRepository
{
/// <summary>
/// Records or updates metrics for a period.
/// </summary>
Task<MetricsEntity> UpsertAsync(MetricsEntity metrics, CancellationToken cancellationToken = default);
/// <summary>
/// Gets metrics for a tenant and job type.
/// </summary>
Task<IReadOnlyList<MetricsEntity>> GetAsync(
string tenantId,
string jobType,
DateTimeOffset from,
DateTimeOffset to,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets aggregated metrics for a tenant.
/// </summary>
Task<IReadOnlyList<MetricsEntity>> GetByTenantAsync(
string tenantId,
DateTimeOffset from,
DateTimeOffset to,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets latest metrics for all job types.
/// </summary>
Task<IReadOnlyList<MetricsEntity>> GetLatestAsync(
string tenantId,
CancellationToken cancellationToken = default);
/// <summary>
/// Deletes old metrics.
/// </summary>
Task<int> DeleteOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,59 @@
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// Repository interface for trigger operations.
/// </summary>
public interface ITriggerRepository
{
/// <summary>
/// Gets a trigger by ID.
/// </summary>
Task<TriggerEntity?> GetByIdAsync(string tenantId, Guid id, CancellationToken cancellationToken = default);
/// <summary>
/// Gets a trigger by name.
/// </summary>
Task<TriggerEntity?> GetByNameAsync(string tenantId, string name, CancellationToken cancellationToken = default);
/// <summary>
/// Lists all triggers for a tenant.
/// </summary>
Task<IReadOnlyList<TriggerEntity>> ListAsync(string tenantId, CancellationToken cancellationToken = default);
/// <summary>
/// Gets triggers that are due to fire.
/// </summary>
Task<IReadOnlyList<TriggerEntity>> GetDueTriggersAsync(int limit = 100, CancellationToken cancellationToken = default);
/// <summary>
/// Creates a new trigger.
/// </summary>
Task<TriggerEntity> CreateAsync(TriggerEntity trigger, CancellationToken cancellationToken = default);
/// <summary>
/// Updates a trigger.
/// </summary>
Task<bool> UpdateAsync(TriggerEntity trigger, CancellationToken cancellationToken = default);
/// <summary>
/// Records a trigger fire event.
/// </summary>
Task<bool> RecordFireAsync(string tenantId, Guid triggerId, Guid jobId, DateTimeOffset? nextFireAt, CancellationToken cancellationToken = default);
/// <summary>
/// Records a trigger misfire.
/// </summary>
Task<bool> RecordMisfireAsync(string tenantId, Guid triggerId, CancellationToken cancellationToken = default);
/// <summary>
/// Enables or disables a trigger.
/// </summary>
Task<bool> SetEnabledAsync(string tenantId, Guid id, bool enabled, CancellationToken cancellationToken = default);
/// <summary>
/// Deletes a trigger.
/// </summary>
Task<bool> DeleteAsync(string tenantId, Guid id, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,54 @@
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// Repository interface for worker operations.
/// </summary>
public interface IWorkerRepository
{
/// <summary>
/// Gets a worker by ID.
/// </summary>
Task<WorkerEntity?> GetByIdAsync(string id, CancellationToken cancellationToken = default);
/// <summary>
/// Lists all workers.
/// </summary>
Task<IReadOnlyList<WorkerEntity>> ListAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Lists workers by status.
/// </summary>
Task<IReadOnlyList<WorkerEntity>> ListByStatusAsync(string status, CancellationToken cancellationToken = default);
/// <summary>
/// Gets workers for a specific tenant.
/// </summary>
Task<IReadOnlyList<WorkerEntity>> GetByTenantIdAsync(string tenantId, CancellationToken cancellationToken = default);
/// <summary>
/// Registers a new worker or updates existing.
/// </summary>
Task<WorkerEntity> UpsertAsync(WorkerEntity worker, CancellationToken cancellationToken = default);
/// <summary>
/// Updates worker heartbeat.
/// </summary>
Task<bool> HeartbeatAsync(string id, int currentJobs, CancellationToken cancellationToken = default);
/// <summary>
/// Updates worker status.
/// </summary>
Task<bool> SetStatusAsync(string id, string status, CancellationToken cancellationToken = default);
/// <summary>
/// Removes a worker.
/// </summary>
Task<bool> DeleteAsync(string id, CancellationToken cancellationToken = default);
/// <summary>
/// Gets stale workers (no heartbeat in duration).
/// </summary>
Task<IReadOnlyList<WorkerEntity>> GetStaleWorkersAsync(TimeSpan staleDuration, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,244 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for job history operations.
/// </summary>
public sealed class JobHistoryRepository : RepositoryBase<SchedulerDataSource>, IJobHistoryRepository
{
/// <summary>
/// Creates a new job history repository.
/// </summary>
public JobHistoryRepository(SchedulerDataSource dataSource, ILogger<JobHistoryRepository> logger)
: base(dataSource, logger)
{
}
/// <inheritdoc />
public async Task<JobHistoryEntity> ArchiveAsync(JobHistoryEntity history, CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO scheduler.job_history (
job_id, tenant_id, project_id, job_type, status, attempt, payload_digest,
result, reason, worker_id, duration_ms, created_at, completed_at
)
VALUES (
@job_id, @tenant_id, @project_id, @job_type, @status::scheduler.job_status, @attempt, @payload_digest,
@result::jsonb, @reason, @worker_id, @duration_ms, @created_at, @completed_at
)
RETURNING *
""";
await using var connection = await DataSource.OpenConnectionAsync(history.TenantId, "writer", cancellationToken)
.ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "job_id", history.JobId);
AddParameter(command, "tenant_id", history.TenantId);
AddParameter(command, "project_id", history.ProjectId);
AddParameter(command, "job_type", history.JobType);
AddParameter(command, "status", history.Status.ToString().ToLowerInvariant());
AddParameter(command, "attempt", history.Attempt);
AddParameter(command, "payload_digest", history.PayloadDigest);
AddJsonbParameter(command, "result", history.Result);
AddParameter(command, "reason", history.Reason);
AddParameter(command, "worker_id", history.WorkerId);
AddParameter(command, "duration_ms", history.DurationMs);
AddParameter(command, "created_at", history.CreatedAt);
AddParameter(command, "completed_at", history.CompletedAt);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
return MapJobHistory(reader);
}
/// <inheritdoc />
public async Task<IReadOnlyList<JobHistoryEntity>> GetByJobIdAsync(Guid jobId, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, job_id, tenant_id, project_id, job_type, status, attempt, payload_digest,
result, reason, worker_id, duration_ms, created_at, completed_at, archived_at
FROM scheduler.job_history
WHERE job_id = @job_id
ORDER BY archived_at DESC
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "job_id", jobId);
var results = new List<JobHistoryEntity>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
results.Add(MapJobHistory(reader));
}
return results;
}
/// <inheritdoc />
public async Task<IReadOnlyList<JobHistoryEntity>> ListAsync(
string tenantId,
int limit = 100,
int offset = 0,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, job_id, tenant_id, project_id, job_type, status, attempt, payload_digest,
result, reason, worker_id, duration_ms, created_at, completed_at, archived_at
FROM scheduler.job_history
WHERE tenant_id = @tenant_id
ORDER BY completed_at DESC
LIMIT @limit OFFSET @offset
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "limit", limit);
AddParameter(cmd, "offset", offset);
},
MapJobHistory,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<JobHistoryEntity>> ListByJobTypeAsync(
string tenantId,
string jobType,
int limit = 100,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, job_id, tenant_id, project_id, job_type, status, attempt, payload_digest,
result, reason, worker_id, duration_ms, created_at, completed_at, archived_at
FROM scheduler.job_history
WHERE tenant_id = @tenant_id AND job_type = @job_type
ORDER BY completed_at DESC
LIMIT @limit
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "job_type", jobType);
AddParameter(cmd, "limit", limit);
},
MapJobHistory,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<JobHistoryEntity>> ListByStatusAsync(
string tenantId,
JobStatus status,
int limit = 100,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, job_id, tenant_id, project_id, job_type, status, attempt, payload_digest,
result, reason, worker_id, duration_ms, created_at, completed_at, archived_at
FROM scheduler.job_history
WHERE tenant_id = @tenant_id AND status = @status::scheduler.job_status
ORDER BY completed_at DESC
LIMIT @limit
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "status", status.ToString().ToLowerInvariant());
AddParameter(cmd, "limit", limit);
},
MapJobHistory,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<JobHistoryEntity>> ListByTimeRangeAsync(
string tenantId,
DateTimeOffset from,
DateTimeOffset to,
int limit = 1000,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, job_id, tenant_id, project_id, job_type, status, attempt, payload_digest,
result, reason, worker_id, duration_ms, created_at, completed_at, archived_at
FROM scheduler.job_history
WHERE tenant_id = @tenant_id AND completed_at >= @from AND completed_at < @to
ORDER BY completed_at DESC
LIMIT @limit
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "from", from);
AddParameter(cmd, "to", to);
AddParameter(cmd, "limit", limit);
},
MapJobHistory,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<int> DeleteOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default)
{
const string sql = "DELETE FROM scheduler.job_history WHERE archived_at < @cutoff";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "cutoff", cutoff);
return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static JobHistoryEntity MapJobHistory(NpgsqlDataReader reader) => new()
{
Id = reader.GetInt64(reader.GetOrdinal("id")),
JobId = reader.GetGuid(reader.GetOrdinal("job_id")),
TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
ProjectId = GetNullableString(reader, reader.GetOrdinal("project_id")),
JobType = reader.GetString(reader.GetOrdinal("job_type")),
Status = ParseJobStatus(reader.GetString(reader.GetOrdinal("status"))),
Attempt = reader.GetInt32(reader.GetOrdinal("attempt")),
PayloadDigest = reader.GetString(reader.GetOrdinal("payload_digest")),
Result = GetNullableString(reader, reader.GetOrdinal("result")),
Reason = GetNullableString(reader, reader.GetOrdinal("reason")),
WorkerId = GetNullableString(reader, reader.GetOrdinal("worker_id")),
DurationMs = reader.IsDBNull(reader.GetOrdinal("duration_ms")) ? null : reader.GetInt64(reader.GetOrdinal("duration_ms")),
CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at")),
CompletedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("completed_at")),
ArchivedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("archived_at"))
};
private static JobStatus ParseJobStatus(string status) => status switch
{
"pending" => JobStatus.Pending,
"scheduled" => JobStatus.Scheduled,
"leased" => JobStatus.Leased,
"running" => JobStatus.Running,
"succeeded" => JobStatus.Succeeded,
"failed" => JobStatus.Failed,
"canceled" => JobStatus.Canceled,
"timed_out" => JobStatus.TimedOut,
_ => throw new ArgumentException($"Unknown job status: {status}", nameof(status))
};
}

View File

@@ -0,0 +1,178 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for metrics operations.
/// </summary>
public sealed class MetricsRepository : RepositoryBase<SchedulerDataSource>, IMetricsRepository
{
/// <summary>
/// Creates a new metrics repository.
/// </summary>
public MetricsRepository(SchedulerDataSource dataSource, ILogger<MetricsRepository> logger)
: base(dataSource, logger)
{
}
/// <inheritdoc />
public async Task<MetricsEntity> UpsertAsync(MetricsEntity metrics, CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO scheduler.metrics (
tenant_id, job_type, period_start, period_end, jobs_created, jobs_completed,
jobs_failed, jobs_timed_out, avg_duration_ms, p50_duration_ms, p95_duration_ms, p99_duration_ms
)
VALUES (
@tenant_id, @job_type, @period_start, @period_end, @jobs_created, @jobs_completed,
@jobs_failed, @jobs_timed_out, @avg_duration_ms, @p50_duration_ms, @p95_duration_ms, @p99_duration_ms
)
ON CONFLICT (tenant_id, job_type, period_start) DO UPDATE SET
period_end = EXCLUDED.period_end,
jobs_created = EXCLUDED.jobs_created,
jobs_completed = EXCLUDED.jobs_completed,
jobs_failed = EXCLUDED.jobs_failed,
jobs_timed_out = EXCLUDED.jobs_timed_out,
avg_duration_ms = EXCLUDED.avg_duration_ms,
p50_duration_ms = EXCLUDED.p50_duration_ms,
p95_duration_ms = EXCLUDED.p95_duration_ms,
p99_duration_ms = EXCLUDED.p99_duration_ms
RETURNING *
""";
await using var connection = await DataSource.OpenConnectionAsync(metrics.TenantId, "writer", cancellationToken)
.ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "tenant_id", metrics.TenantId);
AddParameter(command, "job_type", metrics.JobType);
AddParameter(command, "period_start", metrics.PeriodStart);
AddParameter(command, "period_end", metrics.PeriodEnd);
AddParameter(command, "jobs_created", metrics.JobsCreated);
AddParameter(command, "jobs_completed", metrics.JobsCompleted);
AddParameter(command, "jobs_failed", metrics.JobsFailed);
AddParameter(command, "jobs_timed_out", metrics.JobsTimedOut);
AddParameter(command, "avg_duration_ms", metrics.AvgDurationMs);
AddParameter(command, "p50_duration_ms", metrics.P50DurationMs);
AddParameter(command, "p95_duration_ms", metrics.P95DurationMs);
AddParameter(command, "p99_duration_ms", metrics.P99DurationMs);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
return MapMetrics(reader);
}
/// <inheritdoc />
public async Task<IReadOnlyList<MetricsEntity>> GetAsync(
string tenantId,
string jobType,
DateTimeOffset from,
DateTimeOffset to,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, job_type, period_start, period_end, jobs_created, jobs_completed,
jobs_failed, jobs_timed_out, avg_duration_ms, p50_duration_ms, p95_duration_ms, p99_duration_ms, created_at
FROM scheduler.metrics
WHERE tenant_id = @tenant_id AND job_type = @job_type
AND period_start >= @from AND period_start < @to
ORDER BY period_start
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "job_type", jobType);
AddParameter(cmd, "from", from);
AddParameter(cmd, "to", to);
},
MapMetrics,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<MetricsEntity>> GetByTenantAsync(
string tenantId,
DateTimeOffset from,
DateTimeOffset to,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, job_type, period_start, period_end, jobs_created, jobs_completed,
jobs_failed, jobs_timed_out, avg_duration_ms, p50_duration_ms, p95_duration_ms, p99_duration_ms, created_at
FROM scheduler.metrics
WHERE tenant_id = @tenant_id AND period_start >= @from AND period_start < @to
ORDER BY period_start, job_type
""";
return await QueryAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "from", from);
AddParameter(cmd, "to", to);
},
MapMetrics,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<MetricsEntity>> GetLatestAsync(
string tenantId,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT DISTINCT ON (job_type) id, tenant_id, job_type, period_start, period_end,
jobs_created, jobs_completed, jobs_failed, jobs_timed_out,
avg_duration_ms, p50_duration_ms, p95_duration_ms, p99_duration_ms, created_at
FROM scheduler.metrics
WHERE tenant_id = @tenant_id
ORDER BY job_type, period_start DESC
""";
return await QueryAsync(
tenantId,
sql,
cmd => AddParameter(cmd, "tenant_id", tenantId),
MapMetrics,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<int> DeleteOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default)
{
const string sql = "DELETE FROM scheduler.metrics WHERE period_end < @cutoff";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "cutoff", cutoff);
return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static MetricsEntity MapMetrics(NpgsqlDataReader reader) => new()
{
Id = reader.GetInt64(reader.GetOrdinal("id")),
TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
JobType = reader.GetString(reader.GetOrdinal("job_type")),
PeriodStart = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("period_start")),
PeriodEnd = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("period_end")),
JobsCreated = reader.GetInt64(reader.GetOrdinal("jobs_created")),
JobsCompleted = reader.GetInt64(reader.GetOrdinal("jobs_completed")),
JobsFailed = reader.GetInt64(reader.GetOrdinal("jobs_failed")),
JobsTimedOut = reader.GetInt64(reader.GetOrdinal("jobs_timed_out")),
AvgDurationMs = reader.IsDBNull(reader.GetOrdinal("avg_duration_ms")) ? null : reader.GetInt64(reader.GetOrdinal("avg_duration_ms")),
P50DurationMs = reader.IsDBNull(reader.GetOrdinal("p50_duration_ms")) ? null : reader.GetInt64(reader.GetOrdinal("p50_duration_ms")),
P95DurationMs = reader.IsDBNull(reader.GetOrdinal("p95_duration_ms")) ? null : reader.GetInt64(reader.GetOrdinal("p95_duration_ms")),
P99DurationMs = reader.IsDBNull(reader.GetOrdinal("p99_duration_ms")) ? null : reader.GetInt64(reader.GetOrdinal("p99_duration_ms")),
CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at"))
};
}

View File

@@ -0,0 +1,301 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for trigger operations.
/// </summary>
public sealed class TriggerRepository : RepositoryBase<SchedulerDataSource>, ITriggerRepository
{
/// <summary>
/// Creates a new trigger repository.
/// </summary>
public TriggerRepository(SchedulerDataSource dataSource, ILogger<TriggerRepository> logger)
: base(dataSource, logger)
{
}
/// <inheritdoc />
public async Task<TriggerEntity?> GetByIdAsync(string tenantId, Guid id, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, name, description, job_type, job_payload, cron_expression, timezone,
enabled, next_fire_at, last_fire_at, last_job_id, fire_count, misfire_count,
metadata, created_at, updated_at, created_by
FROM scheduler.triggers
WHERE tenant_id = @tenant_id AND id = @id
""";
return await QuerySingleOrDefaultAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "id", id);
},
MapTrigger,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<TriggerEntity?> GetByNameAsync(string tenantId, string name, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, name, description, job_type, job_payload, cron_expression, timezone,
enabled, next_fire_at, last_fire_at, last_job_id, fire_count, misfire_count,
metadata, created_at, updated_at, created_by
FROM scheduler.triggers
WHERE tenant_id = @tenant_id AND name = @name
""";
return await QuerySingleOrDefaultAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "name", name);
},
MapTrigger,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<TriggerEntity>> ListAsync(string tenantId, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, name, description, job_type, job_payload, cron_expression, timezone,
enabled, next_fire_at, last_fire_at, last_job_id, fire_count, misfire_count,
metadata, created_at, updated_at, created_by
FROM scheduler.triggers
WHERE tenant_id = @tenant_id
ORDER BY name
""";
return await QueryAsync(
tenantId,
sql,
cmd => AddParameter(cmd, "tenant_id", tenantId),
MapTrigger,
cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task<IReadOnlyList<TriggerEntity>> GetDueTriggersAsync(int limit = 100, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, name, description, job_type, job_payload, cron_expression, timezone,
enabled, next_fire_at, last_fire_at, last_job_id, fire_count, misfire_count,
metadata, created_at, updated_at, created_by
FROM scheduler.triggers
WHERE enabled = TRUE AND next_fire_at <= NOW()
ORDER BY next_fire_at
LIMIT @limit
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "limit", limit);
var results = new List<TriggerEntity>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
results.Add(MapTrigger(reader));
}
return results;
}
/// <inheritdoc />
public async Task<TriggerEntity> CreateAsync(TriggerEntity trigger, CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO scheduler.triggers (
id, tenant_id, name, description, job_type, job_payload, cron_expression, timezone,
enabled, next_fire_at, metadata, created_by
)
VALUES (
@id, @tenant_id, @name, @description, @job_type, @job_payload::jsonb, @cron_expression, @timezone,
@enabled, @next_fire_at, @metadata::jsonb, @created_by
)
RETURNING *
""";
var id = trigger.Id == Guid.Empty ? Guid.NewGuid() : trigger.Id;
await using var connection = await DataSource.OpenConnectionAsync(trigger.TenantId, "writer", cancellationToken)
.ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "id", id);
AddParameter(command, "tenant_id", trigger.TenantId);
AddParameter(command, "name", trigger.Name);
AddParameter(command, "description", trigger.Description);
AddParameter(command, "job_type", trigger.JobType);
AddJsonbParameter(command, "job_payload", trigger.JobPayload);
AddParameter(command, "cron_expression", trigger.CronExpression);
AddParameter(command, "timezone", trigger.Timezone);
AddParameter(command, "enabled", trigger.Enabled);
AddParameter(command, "next_fire_at", trigger.NextFireAt);
AddJsonbParameter(command, "metadata", trigger.Metadata);
AddParameter(command, "created_by", trigger.CreatedBy);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
return MapTrigger(reader);
}
/// <inheritdoc />
public async Task<bool> UpdateAsync(TriggerEntity trigger, CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.triggers
SET name = @name,
description = @description,
job_type = @job_type,
job_payload = @job_payload::jsonb,
cron_expression = @cron_expression,
timezone = @timezone,
enabled = @enabled,
next_fire_at = @next_fire_at,
metadata = @metadata::jsonb
WHERE tenant_id = @tenant_id AND id = @id
""";
var rows = await ExecuteAsync(
trigger.TenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", trigger.TenantId);
AddParameter(cmd, "id", trigger.Id);
AddParameter(cmd, "name", trigger.Name);
AddParameter(cmd, "description", trigger.Description);
AddParameter(cmd, "job_type", trigger.JobType);
AddJsonbParameter(cmd, "job_payload", trigger.JobPayload);
AddParameter(cmd, "cron_expression", trigger.CronExpression);
AddParameter(cmd, "timezone", trigger.Timezone);
AddParameter(cmd, "enabled", trigger.Enabled);
AddParameter(cmd, "next_fire_at", trigger.NextFireAt);
AddJsonbParameter(cmd, "metadata", trigger.Metadata);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> RecordFireAsync(string tenantId, Guid triggerId, Guid jobId, DateTimeOffset? nextFireAt, CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.triggers
SET last_fire_at = NOW(),
last_job_id = @job_id,
next_fire_at = @next_fire_at,
fire_count = fire_count + 1
WHERE tenant_id = @tenant_id AND id = @id
""";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "id", triggerId);
AddParameter(cmd, "job_id", jobId);
AddParameter(cmd, "next_fire_at", nextFireAt);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> RecordMisfireAsync(string tenantId, Guid triggerId, CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.triggers
SET misfire_count = misfire_count + 1
WHERE tenant_id = @tenant_id AND id = @id
""";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "id", triggerId);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> SetEnabledAsync(string tenantId, Guid id, bool enabled, CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.triggers
SET enabled = @enabled
WHERE tenant_id = @tenant_id AND id = @id
""";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "id", id);
AddParameter(cmd, "enabled", enabled);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> DeleteAsync(string tenantId, Guid id, CancellationToken cancellationToken = default)
{
const string sql = "DELETE FROM scheduler.triggers WHERE tenant_id = @tenant_id AND id = @id";
var rows = await ExecuteAsync(
tenantId,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", tenantId);
AddParameter(cmd, "id", id);
},
cancellationToken).ConfigureAwait(false);
return rows > 0;
}
private static TriggerEntity MapTrigger(NpgsqlDataReader reader) => new()
{
Id = reader.GetGuid(reader.GetOrdinal("id")),
TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
Name = reader.GetString(reader.GetOrdinal("name")),
Description = GetNullableString(reader, reader.GetOrdinal("description")),
JobType = reader.GetString(reader.GetOrdinal("job_type")),
JobPayload = reader.GetString(reader.GetOrdinal("job_payload")),
CronExpression = reader.GetString(reader.GetOrdinal("cron_expression")),
Timezone = reader.GetString(reader.GetOrdinal("timezone")),
Enabled = reader.GetBoolean(reader.GetOrdinal("enabled")),
NextFireAt = GetNullableDateTimeOffset(reader, reader.GetOrdinal("next_fire_at")),
LastFireAt = GetNullableDateTimeOffset(reader, reader.GetOrdinal("last_fire_at")),
LastJobId = GetNullableGuid(reader, reader.GetOrdinal("last_job_id")),
FireCount = reader.GetInt64(reader.GetOrdinal("fire_count")),
MisfireCount = reader.GetInt32(reader.GetOrdinal("misfire_count")),
Metadata = reader.GetString(reader.GetOrdinal("metadata")),
CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at")),
UpdatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("updated_at")),
CreatedBy = GetNullableString(reader, reader.GetOrdinal("created_by"))
};
}

View File

@@ -0,0 +1,230 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Models;
namespace StellaOps.Scheduler.Storage.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for worker operations.
/// </summary>
public sealed class WorkerRepository : RepositoryBase<SchedulerDataSource>, IWorkerRepository
{
/// <summary>
/// Creates a new worker repository.
/// </summary>
public WorkerRepository(SchedulerDataSource dataSource, ILogger<WorkerRepository> logger)
: base(dataSource, logger)
{
}
/// <inheritdoc />
public async Task<WorkerEntity?> GetByIdAsync(string id, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, hostname, process_id, job_types, max_concurrent_jobs, current_jobs,
status, last_heartbeat_at, registered_at, metadata
FROM scheduler.workers
WHERE id = @id
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "id", id);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
return await reader.ReadAsync(cancellationToken).ConfigureAwait(false) ? MapWorker(reader) : null;
}
/// <inheritdoc />
public async Task<IReadOnlyList<WorkerEntity>> ListAsync(CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, hostname, process_id, job_types, max_concurrent_jobs, current_jobs,
status, last_heartbeat_at, registered_at, metadata
FROM scheduler.workers
ORDER BY registered_at DESC
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
var results = new List<WorkerEntity>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
results.Add(MapWorker(reader));
}
return results;
}
/// <inheritdoc />
public async Task<IReadOnlyList<WorkerEntity>> ListByStatusAsync(string status, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, hostname, process_id, job_types, max_concurrent_jobs, current_jobs,
status, last_heartbeat_at, registered_at, metadata
FROM scheduler.workers
WHERE status = @status
ORDER BY registered_at DESC
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "status", status);
var results = new List<WorkerEntity>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
results.Add(MapWorker(reader));
}
return results;
}
/// <inheritdoc />
public async Task<IReadOnlyList<WorkerEntity>> GetByTenantIdAsync(string tenantId, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, hostname, process_id, job_types, max_concurrent_jobs, current_jobs,
status, last_heartbeat_at, registered_at, metadata
FROM scheduler.workers
WHERE tenant_id = @tenant_id OR tenant_id IS NULL
ORDER BY registered_at DESC
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "tenant_id", tenantId);
var results = new List<WorkerEntity>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
results.Add(MapWorker(reader));
}
return results;
}
/// <inheritdoc />
public async Task<WorkerEntity> UpsertAsync(WorkerEntity worker, CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO scheduler.workers (id, tenant_id, hostname, process_id, job_types, max_concurrent_jobs, metadata)
VALUES (@id, @tenant_id, @hostname, @process_id, @job_types, @max_concurrent_jobs, @metadata::jsonb)
ON CONFLICT (id) DO UPDATE SET
tenant_id = EXCLUDED.tenant_id,
hostname = EXCLUDED.hostname,
process_id = EXCLUDED.process_id,
job_types = EXCLUDED.job_types,
max_concurrent_jobs = EXCLUDED.max_concurrent_jobs,
metadata = EXCLUDED.metadata,
last_heartbeat_at = NOW(),
status = 'active'
RETURNING *
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "id", worker.Id);
AddParameter(command, "tenant_id", worker.TenantId);
AddParameter(command, "hostname", worker.Hostname);
AddParameter(command, "process_id", worker.ProcessId);
AddTextArrayParameter(command, "job_types", worker.JobTypes);
AddParameter(command, "max_concurrent_jobs", worker.MaxConcurrentJobs);
AddJsonbParameter(command, "metadata", worker.Metadata);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
return MapWorker(reader);
}
/// <inheritdoc />
public async Task<bool> HeartbeatAsync(string id, int currentJobs, CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.workers
SET last_heartbeat_at = NOW(), current_jobs = @current_jobs
WHERE id = @id
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "id", id);
AddParameter(command, "current_jobs", currentJobs);
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> SetStatusAsync(string id, string status, CancellationToken cancellationToken = default)
{
const string sql = """
UPDATE scheduler.workers
SET status = @status
WHERE id = @id
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "id", id);
AddParameter(command, "status", status);
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<bool> DeleteAsync(string id, CancellationToken cancellationToken = default)
{
const string sql = "DELETE FROM scheduler.workers WHERE id = @id";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "id", id);
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
/// <inheritdoc />
public async Task<IReadOnlyList<WorkerEntity>> GetStaleWorkersAsync(TimeSpan staleDuration, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, hostname, process_id, job_types, max_concurrent_jobs, current_jobs,
status, last_heartbeat_at, registered_at, metadata
FROM scheduler.workers
WHERE status = 'active' AND last_heartbeat_at < NOW() - @stale_duration
ORDER BY last_heartbeat_at
""";
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "stale_duration", staleDuration);
var results = new List<WorkerEntity>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
results.Add(MapWorker(reader));
}
return results;
}
private static WorkerEntity MapWorker(NpgsqlDataReader reader) => new()
{
Id = reader.GetString(reader.GetOrdinal("id")),
TenantId = GetNullableString(reader, reader.GetOrdinal("tenant_id")),
Hostname = reader.GetString(reader.GetOrdinal("hostname")),
ProcessId = reader.IsDBNull(reader.GetOrdinal("process_id")) ? null : reader.GetInt32(reader.GetOrdinal("process_id")),
JobTypes = reader.IsDBNull(reader.GetOrdinal("job_types")) ? [] : reader.GetFieldValue<string[]>(reader.GetOrdinal("job_types")),
MaxConcurrentJobs = reader.GetInt32(reader.GetOrdinal("max_concurrent_jobs")),
CurrentJobs = reader.GetInt32(reader.GetOrdinal("current_jobs")),
Status = reader.GetString(reader.GetOrdinal("status")),
LastHeartbeatAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("last_heartbeat_at")),
RegisteredAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("registered_at")),
Metadata = reader.GetString(reader.GetOrdinal("metadata"))
};
}