docs re-org, audit fixes, build fixes
This commit is contained in:
@@ -16,6 +16,7 @@ public sealed class LedgerExporter : ILedgerExporter
|
||||
private readonly ILedgerRepository _ledgerRepository;
|
||||
private readonly ILedgerExportRepository _exportRepository;
|
||||
private readonly ILogger<LedgerExporter> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
private static readonly JsonSerializerOptions JsonOptions = new()
|
||||
{
|
||||
@@ -32,11 +33,13 @@ public sealed class LedgerExporter : ILedgerExporter
|
||||
public LedgerExporter(
|
||||
ILedgerRepository ledgerRepository,
|
||||
ILedgerExportRepository exportRepository,
|
||||
ILogger<LedgerExporter> logger)
|
||||
ILogger<LedgerExporter> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_ledgerRepository = ledgerRepository;
|
||||
_exportRepository = exportRepository;
|
||||
_logger = logger;
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
@@ -44,7 +47,7 @@ public sealed class LedgerExporter : ILedgerExporter
|
||||
LedgerExport export,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var startTime = DateTimeOffset.UtcNow;
|
||||
var startTime = _timeProvider.GetUtcNow();
|
||||
|
||||
try
|
||||
{
|
||||
@@ -83,7 +86,7 @@ public sealed class LedgerExporter : ILedgerExporter
|
||||
export = export.Complete(outputUri, digest, sizeBytes, entries.Count);
|
||||
export = await _exportRepository.UpdateAsync(export, cancellationToken);
|
||||
|
||||
var duration = DateTimeOffset.UtcNow - startTime;
|
||||
var duration = _timeProvider.GetUtcNow() - startTime;
|
||||
OrchestratorMetrics.LedgerExportCompleted(export.TenantId, export.Format);
|
||||
OrchestratorMetrics.RecordLedgerExportDuration(export.TenantId, export.Format, duration.TotalSeconds);
|
||||
OrchestratorMetrics.RecordLedgerExportSize(export.TenantId, export.Format, sizeBytes);
|
||||
@@ -165,12 +168,12 @@ public sealed class LedgerExporter : ILedgerExporter
|
||||
return (content, digest);
|
||||
}
|
||||
|
||||
private static string GenerateJson(IReadOnlyList<RunLedgerEntry> entries)
|
||||
private string GenerateJson(IReadOnlyList<RunLedgerEntry> entries)
|
||||
{
|
||||
var exportData = new LedgerExportData
|
||||
{
|
||||
SchemaVersion = "1.0.0",
|
||||
ExportedAt = DateTimeOffset.UtcNow,
|
||||
ExportedAt = _timeProvider.GetUtcNow(),
|
||||
EntryCount = entries.Count,
|
||||
Entries = entries.Select(MapEntry).ToList()
|
||||
};
|
||||
|
||||
@@ -56,15 +56,18 @@ public sealed class PostgresDuplicateSuppressor : IDuplicateSuppressor
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly string _tenantId;
|
||||
private readonly ILogger<PostgresDuplicateSuppressor> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresDuplicateSuppressor(
|
||||
OrchestratorDataSource dataSource,
|
||||
string tenantId,
|
||||
ILogger<PostgresDuplicateSuppressor> logger)
|
||||
ILogger<PostgresDuplicateSuppressor> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_tenantId = tenantId ?? throw new ArgumentNullException(nameof(tenantId));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<bool> HasProcessedAsync(string scopeKey, string eventKey, CancellationToken cancellationToken)
|
||||
@@ -125,7 +128,7 @@ public sealed class PostgresDuplicateSuppressor : IDuplicateSuppressor
|
||||
command.Parameters.AddWithValue("event_key", eventKey);
|
||||
command.Parameters.AddWithValue("event_time", eventTime);
|
||||
command.Parameters.AddWithValue("batch_id", (object?)batchId ?? DBNull.Value);
|
||||
command.Parameters.AddWithValue("expires_at", DateTimeOffset.UtcNow + ttl);
|
||||
command.Parameters.AddWithValue("expires_at", _timeProvider.GetUtcNow() + ttl);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
@@ -143,7 +146,7 @@ public sealed class PostgresDuplicateSuppressor : IDuplicateSuppressor
|
||||
return;
|
||||
}
|
||||
|
||||
var expiresAt = DateTimeOffset.UtcNow + ttl;
|
||||
var expiresAt = _timeProvider.GetUtcNow() + ttl;
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(_tenantId, "writer", cancellationToken).ConfigureAwait(false);
|
||||
await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
@@ -108,13 +108,16 @@ public sealed class PostgresJobRepository : IJobRepository
|
||||
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresJobRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresJobRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresJobRepository> logger)
|
||||
ILogger<PostgresJobRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<Job?> GetByIdAsync(string tenantId, Guid jobId, CancellationToken cancellationToken)
|
||||
@@ -228,8 +231,8 @@ public sealed class PostgresJobRepository : IJobRepository
|
||||
command.Parameters.AddWithValue("lease_id", leaseId);
|
||||
command.Parameters.AddWithValue("worker_id", workerId);
|
||||
command.Parameters.AddWithValue("lease_until", leaseUntil);
|
||||
command.Parameters.AddWithValue("leased_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("now", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("leased_at", _timeProvider.GetUtcNow());
|
||||
command.Parameters.AddWithValue("now", _timeProvider.GetUtcNow());
|
||||
|
||||
if (jobType != null)
|
||||
{
|
||||
@@ -263,7 +266,7 @@ public sealed class PostgresJobRepository : IJobRepository
|
||||
command.Parameters.AddWithValue("job_id", jobId);
|
||||
command.Parameters.AddWithValue("lease_id", leaseId);
|
||||
command.Parameters.AddWithValue("new_lease_until", newLeaseUntil);
|
||||
command.Parameters.AddWithValue("now", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("now", _timeProvider.GetUtcNow());
|
||||
|
||||
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
return rows > 0;
|
||||
|
||||
@@ -13,6 +13,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
{
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresPackRegistryRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
private const string PackColumns = """
|
||||
pack_id, tenant_id, project_id, name, display_name, description,
|
||||
@@ -33,10 +34,12 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
|
||||
public PostgresPackRegistryRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresPackRegistryRepository> logger)
|
||||
ILogger<PostgresPackRegistryRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
// Pack CRUD
|
||||
@@ -264,7 +267,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("pack_id", packId);
|
||||
command.Parameters.AddWithValue("status", status.ToString().ToLowerInvariant());
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow.UtcDateTime);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow().UtcDateTime);
|
||||
command.Parameters.AddWithValue("updated_by", updatedBy);
|
||||
command.Parameters.AddWithValue("published_at", (object?)publishedAt?.UtcDateTime ?? DBNull.Value);
|
||||
command.Parameters.AddWithValue("published_by", (object?)publishedBy ?? DBNull.Value);
|
||||
@@ -534,7 +537,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("pack_version_id", packVersionId);
|
||||
command.Parameters.AddWithValue("status", status.ToString().ToLowerInvariant());
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow.UtcDateTime);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow().UtcDateTime);
|
||||
command.Parameters.AddWithValue("updated_by", updatedBy);
|
||||
command.Parameters.AddWithValue("published_at", (object?)publishedAt?.UtcDateTime ?? DBNull.Value);
|
||||
command.Parameters.AddWithValue("published_by", (object?)publishedBy ?? DBNull.Value);
|
||||
@@ -574,7 +577,7 @@ public sealed class PostgresPackRegistryRepository : IPackRegistryRepository
|
||||
command.Parameters.AddWithValue("signature_algorithm", signatureAlgorithm);
|
||||
command.Parameters.AddWithValue("signed_by", signedBy);
|
||||
command.Parameters.AddWithValue("signed_at", signedAt.UtcDateTime);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow.UtcDateTime);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow().UtcDateTime);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken);
|
||||
}
|
||||
|
||||
@@ -128,11 +128,16 @@ public sealed class PostgresPackRunRepository : IPackRunRepository
|
||||
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresPackRunRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresPackRunRepository(OrchestratorDataSource dataSource, ILogger<PostgresPackRunRepository> logger)
|
||||
public PostgresPackRunRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresPackRunRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<PackRun?> GetByIdAsync(string tenantId, Guid packRunId, CancellationToken cancellationToken)
|
||||
@@ -244,7 +249,7 @@ public sealed class PostgresPackRunRepository : IPackRunRepository
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken).ConfigureAwait(false);
|
||||
await using var command = new NpgsqlCommand(sql, connection);
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("lease_id", leaseId);
|
||||
command.Parameters.AddWithValue("task_runner_id", taskRunnerId);
|
||||
@@ -275,7 +280,7 @@ public sealed class PostgresPackRunRepository : IPackRunRepository
|
||||
command.Parameters.AddWithValue("pack_run_id", packRunId);
|
||||
command.Parameters.AddWithValue("lease_id", leaseId);
|
||||
command.Parameters.AddWithValue("new_lease_until", newLeaseUntil);
|
||||
command.Parameters.AddWithValue("now", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("now", _timeProvider.GetUtcNow());
|
||||
|
||||
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
return rows > 0;
|
||||
@@ -292,7 +297,7 @@ public sealed class PostgresPackRunRepository : IPackRunRepository
|
||||
command.Parameters.AddWithValue("lease_id", leaseId);
|
||||
command.Parameters.AddWithValue("status", StatusToString(newStatus));
|
||||
command.Parameters.AddWithValue("reason", (object?)reason ?? DBNull.Value);
|
||||
command.Parameters.AddWithValue("completed_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("completed_at", _timeProvider.GetUtcNow());
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -113,13 +113,16 @@ public sealed class PostgresQuotaRepository : IQuotaRepository
|
||||
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresQuotaRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresQuotaRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresQuotaRepository> logger)
|
||||
ILogger<PostgresQuotaRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<Quota?> GetByIdAsync(string tenantId, Guid quotaId, CancellationToken cancellationToken)
|
||||
@@ -229,7 +232,7 @@ public sealed class PostgresQuotaRepository : IQuotaRepository
|
||||
command.Parameters.AddWithValue("current_active", currentActive);
|
||||
command.Parameters.AddWithValue("current_hour_count", currentHourCount);
|
||||
command.Parameters.AddWithValue("current_hour_start", currentHourStart);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow());
|
||||
command.Parameters.AddWithValue("updated_by", updatedBy);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -245,7 +248,7 @@ public sealed class PostgresQuotaRepository : IQuotaRepository
|
||||
command.Parameters.AddWithValue("quota_id", quotaId);
|
||||
command.Parameters.AddWithValue("pause_reason", reason);
|
||||
command.Parameters.AddWithValue("quota_ticket", (object?)ticket ?? DBNull.Value);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow());
|
||||
command.Parameters.AddWithValue("updated_by", updatedBy);
|
||||
|
||||
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -263,7 +266,7 @@ public sealed class PostgresQuotaRepository : IQuotaRepository
|
||||
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("quota_id", quotaId);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow());
|
||||
command.Parameters.AddWithValue("updated_by", updatedBy);
|
||||
|
||||
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -281,7 +284,7 @@ public sealed class PostgresQuotaRepository : IQuotaRepository
|
||||
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("quota_id", quotaId);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow());
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
@@ -294,7 +297,7 @@ public sealed class PostgresQuotaRepository : IQuotaRepository
|
||||
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("quota_id", quotaId);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow());
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -69,13 +69,16 @@ public sealed class PostgresRunRepository : IRunRepository
|
||||
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresRunRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresRunRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresRunRepository> logger)
|
||||
ILogger<PostgresRunRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<Run?> GetByIdAsync(string tenantId, Guid runId, CancellationToken cancellationToken)
|
||||
@@ -149,7 +152,7 @@ public sealed class PostgresRunRepository : IRunRepository
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("run_id", runId);
|
||||
command.Parameters.AddWithValue("succeeded", succeeded);
|
||||
command.Parameters.AddWithValue("now", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("now", _timeProvider.GetUtcNow());
|
||||
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
|
||||
@@ -74,13 +74,16 @@ public sealed class PostgresSourceRepository : ISourceRepository
|
||||
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresSourceRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresSourceRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresSourceRepository> logger)
|
||||
ILogger<PostgresSourceRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<Source?> GetByIdAsync(string tenantId, Guid sourceId, CancellationToken cancellationToken)
|
||||
@@ -175,7 +178,7 @@ public sealed class PostgresSourceRepository : ISourceRepository
|
||||
command.Parameters.AddWithValue("source_id", sourceId);
|
||||
command.Parameters.AddWithValue("pause_reason", reason);
|
||||
command.Parameters.AddWithValue("pause_ticket", (object?)ticket ?? DBNull.Value);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow());
|
||||
command.Parameters.AddWithValue("updated_by", updatedBy);
|
||||
|
||||
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -193,7 +196,7 @@ public sealed class PostgresSourceRepository : ISourceRepository
|
||||
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("source_id", sourceId);
|
||||
command.Parameters.AddWithValue("updated_at", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("updated_at", _timeProvider.GetUtcNow());
|
||||
command.Parameters.AddWithValue("updated_by", updatedBy);
|
||||
|
||||
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
@@ -77,13 +77,16 @@ public sealed class PostgresThrottleRepository : IThrottleRepository
|
||||
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresThrottleRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresThrottleRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresThrottleRepository> logger)
|
||||
ILogger<PostgresThrottleRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<Throttle?> GetByIdAsync(string tenantId, Guid throttleId, CancellationToken cancellationToken)
|
||||
@@ -110,7 +113,7 @@ public sealed class PostgresThrottleRepository : IThrottleRepository
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("source_id", sourceId);
|
||||
command.Parameters.AddWithValue("now", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("now", _timeProvider.GetUtcNow());
|
||||
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
var throttles = new List<Throttle>();
|
||||
@@ -128,7 +131,7 @@ public sealed class PostgresThrottleRepository : IThrottleRepository
|
||||
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
|
||||
command.Parameters.AddWithValue("tenant_id", tenantId);
|
||||
command.Parameters.AddWithValue("job_type", jobType);
|
||||
command.Parameters.AddWithValue("now", DateTimeOffset.UtcNow);
|
||||
command.Parameters.AddWithValue("now", _timeProvider.GetUtcNow());
|
||||
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
var throttles = new List<Throttle>();
|
||||
|
||||
@@ -100,13 +100,16 @@ public sealed class PostgresWatermarkRepository : IWatermarkRepository
|
||||
|
||||
private readonly OrchestratorDataSource _dataSource;
|
||||
private readonly ILogger<PostgresWatermarkRepository> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public PostgresWatermarkRepository(
|
||||
OrchestratorDataSource dataSource,
|
||||
ILogger<PostgresWatermarkRepository> logger)
|
||||
ILogger<PostgresWatermarkRepository> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<Watermark?> GetByScopeKeyAsync(string tenantId, string scopeKey, CancellationToken cancellationToken)
|
||||
@@ -271,7 +274,7 @@ public sealed class PostgresWatermarkRepository : IWatermarkRepository
|
||||
int limit,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var thresholdTime = DateTimeOffset.UtcNow - lagThreshold;
|
||||
var thresholdTime = _timeProvider.GetUtcNow() - lagThreshold;
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
|
||||
await using var command = new NpgsqlCommand(SelectLaggingSql, connection);
|
||||
|
||||
@@ -152,7 +152,8 @@ public sealed record BackfillCheckpoint(
|
||||
int batchNumber,
|
||||
DateTimeOffset batchStart,
|
||||
DateTimeOffset batchEnd,
|
||||
int eventsInBatch)
|
||||
int eventsInBatch,
|
||||
DateTimeOffset startedAt)
|
||||
{
|
||||
return new BackfillCheckpoint(
|
||||
CheckpointId: Guid.NewGuid(),
|
||||
@@ -166,7 +167,7 @@ public sealed record BackfillCheckpoint(
|
||||
EventsSkipped: 0,
|
||||
EventsFailed: 0,
|
||||
BatchHash: null,
|
||||
StartedAt: DateTimeOffset.UtcNow,
|
||||
StartedAt: startedAt,
|
||||
CompletedAt: null,
|
||||
ErrorMessage: null);
|
||||
}
|
||||
@@ -174,7 +175,7 @@ public sealed record BackfillCheckpoint(
|
||||
/// <summary>
|
||||
/// Marks the checkpoint as complete.
|
||||
/// </summary>
|
||||
public BackfillCheckpoint Complete(int processed, int skipped, int failed, string? batchHash)
|
||||
public BackfillCheckpoint Complete(int processed, int skipped, int failed, string? batchHash, DateTimeOffset completedAt)
|
||||
{
|
||||
return this with
|
||||
{
|
||||
@@ -182,18 +183,18 @@ public sealed record BackfillCheckpoint(
|
||||
EventsSkipped = skipped,
|
||||
EventsFailed = failed,
|
||||
BatchHash = batchHash,
|
||||
CompletedAt = DateTimeOffset.UtcNow
|
||||
CompletedAt = completedAt
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Marks the checkpoint as failed.
|
||||
/// </summary>
|
||||
public BackfillCheckpoint Fail(string error)
|
||||
public BackfillCheckpoint Fail(string error, DateTimeOffset completedAt)
|
||||
{
|
||||
return this with
|
||||
{
|
||||
CompletedAt = DateTimeOffset.UtcNow,
|
||||
CompletedAt = completedAt,
|
||||
ErrorMessage = error
|
||||
};
|
||||
}
|
||||
|
||||
@@ -14,15 +14,18 @@ public sealed class FirstSignalSnapshotWriter : BackgroundService
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly FirstSignalSnapshotWriterOptions _options;
|
||||
private readonly ILogger<FirstSignalSnapshotWriter> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public FirstSignalSnapshotWriter(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
IOptions<FirstSignalOptions> options,
|
||||
ILogger<FirstSignalSnapshotWriter> logger)
|
||||
ILogger<FirstSignalSnapshotWriter> logger,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
|
||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.SnapshotWriter;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
@@ -76,7 +79,7 @@ public sealed class FirstSignalSnapshotWriter : BackgroundService
|
||||
var runRepository = scope.ServiceProvider.GetRequiredService<IRunRepository>();
|
||||
var firstSignalService = scope.ServiceProvider.GetRequiredService<CoreServices.IFirstSignalService>();
|
||||
|
||||
var createdAfter = DateTimeOffset.UtcNow.Subtract(lookback);
|
||||
var createdAfter = _timeProvider.GetUtcNow().Subtract(lookback);
|
||||
|
||||
var pending = await runRepository.ListAsync(
|
||||
tenantId,
|
||||
|
||||
@@ -36,13 +36,14 @@ public static class HealthEndpoints
|
||||
return app;
|
||||
}
|
||||
|
||||
private static IResult GetHealth()
|
||||
private static IResult GetHealth([FromServices] TimeProvider timeProvider)
|
||||
{
|
||||
return Results.Ok(new HealthResponse("ok", DateTimeOffset.UtcNow));
|
||||
return Results.Ok(new HealthResponse("ok", timeProvider.GetUtcNow()));
|
||||
}
|
||||
|
||||
private static async Task<IResult> GetReadiness(
|
||||
[FromServices] OrchestratorDataSource dataSource,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
@@ -53,14 +54,14 @@ public static class HealthEndpoints
|
||||
if (!dbHealthy)
|
||||
{
|
||||
return Results.Json(
|
||||
new ReadinessResponse("not_ready", DateTimeOffset.UtcNow, new Dictionary<string, string>
|
||||
new ReadinessResponse("not_ready", timeProvider.GetUtcNow(), new Dictionary<string, string>
|
||||
{
|
||||
["database"] = "unhealthy"
|
||||
}),
|
||||
statusCode: StatusCodes.Status503ServiceUnavailable);
|
||||
}
|
||||
|
||||
return Results.Ok(new ReadinessResponse("ready", DateTimeOffset.UtcNow, new Dictionary<string, string>
|
||||
return Results.Ok(new ReadinessResponse("ready", timeProvider.GetUtcNow(), new Dictionary<string, string>
|
||||
{
|
||||
["database"] = "healthy"
|
||||
}));
|
||||
@@ -68,7 +69,7 @@ public static class HealthEndpoints
|
||||
catch (Exception ex)
|
||||
{
|
||||
return Results.Json(
|
||||
new ReadinessResponse("not_ready", DateTimeOffset.UtcNow, new Dictionary<string, string>
|
||||
new ReadinessResponse("not_ready", timeProvider.GetUtcNow(), new Dictionary<string, string>
|
||||
{
|
||||
["database"] = $"error: {ex.Message}"
|
||||
}),
|
||||
@@ -76,14 +77,15 @@ public static class HealthEndpoints
|
||||
}
|
||||
}
|
||||
|
||||
private static IResult GetLiveness()
|
||||
private static IResult GetLiveness([FromServices] TimeProvider timeProvider)
|
||||
{
|
||||
// Liveness just checks the process is alive
|
||||
return Results.Ok(new HealthResponse("alive", DateTimeOffset.UtcNow));
|
||||
return Results.Ok(new HealthResponse("alive", timeProvider.GetUtcNow()));
|
||||
}
|
||||
|
||||
private static async Task<IResult> GetHealthDetails(
|
||||
[FromServices] OrchestratorDataSource dataSource,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var checks = new Dictionary<string, HealthCheckResult>();
|
||||
@@ -96,12 +98,12 @@ public static class HealthEndpoints
|
||||
checks["database"] = new HealthCheckResult(
|
||||
dbHealthy ? "healthy" : "unhealthy",
|
||||
dbHealthy ? null : "Connection test failed",
|
||||
DateTimeOffset.UtcNow);
|
||||
timeProvider.GetUtcNow());
|
||||
overallHealthy &= dbHealthy;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
checks["database"] = new HealthCheckResult("unhealthy", ex.Message, DateTimeOffset.UtcNow);
|
||||
checks["database"] = new HealthCheckResult("unhealthy", ex.Message, timeProvider.GetUtcNow());
|
||||
overallHealthy = false;
|
||||
}
|
||||
|
||||
@@ -114,7 +116,7 @@ public static class HealthEndpoints
|
||||
checks["memory"] = new HealthCheckResult(
|
||||
memoryHealthy ? "healthy" : "degraded",
|
||||
$"Used: {memoryUsedMb:F2} MB",
|
||||
DateTimeOffset.UtcNow);
|
||||
timeProvider.GetUtcNow());
|
||||
|
||||
// Thread pool check
|
||||
ThreadPool.GetAvailableThreads(out var workerThreads, out var completionPortThreads);
|
||||
@@ -124,11 +126,11 @@ public static class HealthEndpoints
|
||||
checks["threadPool"] = new HealthCheckResult(
|
||||
threadPoolHealthy ? "healthy" : "degraded",
|
||||
$"Worker threads available: {workerThreads}/{maxWorkerThreads}",
|
||||
DateTimeOffset.UtcNow);
|
||||
timeProvider.GetUtcNow());
|
||||
|
||||
var response = new HealthDetailsResponse(
|
||||
overallHealthy ? "healthy" : "unhealthy",
|
||||
DateTimeOffset.UtcNow,
|
||||
timeProvider.GetUtcNow(),
|
||||
checks);
|
||||
|
||||
return overallHealthy
|
||||
|
||||
@@ -55,10 +55,12 @@ public static class KpiEndpoints
|
||||
[FromQuery] DateTimeOffset? to,
|
||||
[FromQuery] string? tenant,
|
||||
[FromServices] IKpiCollector collector,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var start = from ?? DateTimeOffset.UtcNow.AddDays(-7);
|
||||
var end = to ?? DateTimeOffset.UtcNow;
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var start = from ?? now.AddDays(-7);
|
||||
var end = to ?? now;
|
||||
|
||||
var kpis = await collector.CollectAsync(start, end, tenant, ct);
|
||||
return Results.Ok(kpis);
|
||||
@@ -69,11 +71,13 @@ public static class KpiEndpoints
|
||||
[FromQuery] DateTimeOffset? to,
|
||||
[FromQuery] string? tenant,
|
||||
[FromServices] IKpiCollector collector,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var kpis = await collector.CollectAsync(
|
||||
from ?? DateTimeOffset.UtcNow.AddDays(-7),
|
||||
to ?? DateTimeOffset.UtcNow,
|
||||
from ?? now.AddDays(-7),
|
||||
to ?? now,
|
||||
tenant,
|
||||
ct);
|
||||
return Results.Ok(kpis.Reachability);
|
||||
@@ -84,11 +88,13 @@ public static class KpiEndpoints
|
||||
[FromQuery] DateTimeOffset? to,
|
||||
[FromQuery] string? tenant,
|
||||
[FromServices] IKpiCollector collector,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var kpis = await collector.CollectAsync(
|
||||
from ?? DateTimeOffset.UtcNow.AddDays(-7),
|
||||
to ?? DateTimeOffset.UtcNow,
|
||||
from ?? now.AddDays(-7),
|
||||
to ?? now,
|
||||
tenant,
|
||||
ct);
|
||||
return Results.Ok(kpis.Explainability);
|
||||
@@ -99,11 +105,13 @@ public static class KpiEndpoints
|
||||
[FromQuery] DateTimeOffset? to,
|
||||
[FromQuery] string? tenant,
|
||||
[FromServices] IKpiCollector collector,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var kpis = await collector.CollectAsync(
|
||||
from ?? DateTimeOffset.UtcNow.AddDays(-7),
|
||||
to ?? DateTimeOffset.UtcNow,
|
||||
from ?? now.AddDays(-7),
|
||||
to ?? now,
|
||||
tenant,
|
||||
ct);
|
||||
return Results.Ok(kpis.Runtime);
|
||||
@@ -114,11 +122,13 @@ public static class KpiEndpoints
|
||||
[FromQuery] DateTimeOffset? to,
|
||||
[FromQuery] string? tenant,
|
||||
[FromServices] IKpiCollector collector,
|
||||
[FromServices] TimeProvider timeProvider,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var now = timeProvider.GetUtcNow();
|
||||
var kpis = await collector.CollectAsync(
|
||||
from ?? DateTimeOffset.UtcNow.AddDays(-7),
|
||||
to ?? DateTimeOffset.UtcNow,
|
||||
from ?? now.AddDays(-7),
|
||||
to ?? now,
|
||||
tenant,
|
||||
ct);
|
||||
return Results.Ok(kpis.Replay);
|
||||
|
||||
Reference in New Issue
Block a user