Refactor code structure and optimize performance across multiple modules
This commit is contained in:
@@ -120,6 +120,53 @@ public sealed class GraphJobRepository : IGraphJobRepository
|
||||
public ValueTask<IReadOnlyCollection<GraphOverlayJob>> ListOverlayJobsAsync(string tenantId, CancellationToken cancellationToken)
|
||||
=> ListOverlayJobsAsync(tenantId, status: null, limit: 50, cancellationToken);
|
||||
|
||||
// Cross-tenant overloads for background services - scans all tenants
|
||||
public async ValueTask<IReadOnlyCollection<GraphBuildJob>> ListBuildJobsAsync(GraphJobStatus? status, int limit, CancellationToken cancellationToken)
|
||||
{
|
||||
var sql = "SELECT payload FROM scheduler.graph_jobs WHERE type=@Type";
|
||||
if (status is not null)
|
||||
{
|
||||
sql += " AND status=@Status";
|
||||
}
|
||||
sql += " ORDER BY created_at LIMIT @Limit";
|
||||
|
||||
await using var conn = await _dataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
var results = await conn.QueryAsync<string>(sql, new
|
||||
{
|
||||
Type = (short)GraphJobQueryType.Build,
|
||||
Status = status is not null ? (short)status : (short?)null,
|
||||
Limit = limit
|
||||
});
|
||||
|
||||
return results
|
||||
.Select(r => CanonicalJsonSerializer.Deserialize<GraphBuildJob>(r))
|
||||
.Where(r => r is not null)!
|
||||
.ToArray()!;
|
||||
}
|
||||
|
||||
public async ValueTask<IReadOnlyCollection<GraphOverlayJob>> ListOverlayJobsAsync(GraphJobStatus? status, int limit, CancellationToken cancellationToken)
|
||||
{
|
||||
var sql = "SELECT payload FROM scheduler.graph_jobs WHERE type=@Type";
|
||||
if (status is not null)
|
||||
{
|
||||
sql += " AND status=@Status";
|
||||
}
|
||||
sql += " ORDER BY created_at LIMIT @Limit";
|
||||
|
||||
await using var conn = await _dataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
var results = await conn.QueryAsync<string>(sql, new
|
||||
{
|
||||
Type = (short)GraphJobQueryType.Overlay,
|
||||
Status = status is not null ? (short)status : (short?)null,
|
||||
Limit = limit
|
||||
});
|
||||
|
||||
return results
|
||||
.Select(r => CanonicalJsonSerializer.Deserialize<GraphOverlayJob>(r))
|
||||
.Where(r => r is not null)!
|
||||
.ToArray()!;
|
||||
}
|
||||
|
||||
public async ValueTask<bool> TryReplaceAsync(GraphBuildJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = @"UPDATE scheduler.graph_jobs
|
||||
|
||||
@@ -19,4 +19,8 @@ public interface IGraphJobRepository
|
||||
ValueTask<IReadOnlyCollection<GraphBuildJob>> ListBuildJobsAsync(string tenantId, GraphJobStatus? status, int limit, CancellationToken cancellationToken);
|
||||
ValueTask<IReadOnlyCollection<GraphOverlayJob>> ListOverlayJobsAsync(string tenantId, GraphJobStatus? status, int limit, CancellationToken cancellationToken);
|
||||
ValueTask<IReadOnlyCollection<GraphOverlayJob>> ListOverlayJobsAsync(string tenantId, CancellationToken cancellationToken);
|
||||
|
||||
// Cross-tenant overloads for background services
|
||||
ValueTask<IReadOnlyCollection<GraphBuildJob>> ListBuildJobsAsync(GraphJobStatus? status, int limit, CancellationToken cancellationToken);
|
||||
ValueTask<IReadOnlyCollection<GraphOverlayJob>> ListOverlayJobsAsync(GraphJobStatus? status, int limit, CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
@@ -58,4 +58,9 @@ public interface IJobHistoryRepository
|
||||
/// Deletes old history entries.
|
||||
/// </summary>
|
||||
Task<int> DeleteOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default);
|
||||
|
||||
/// <summary>
|
||||
/// Gets recent failed jobs across all tenants for background indexing.
|
||||
/// </summary>
|
||||
Task<IReadOnlyList<JobHistoryEntity>> GetRecentFailedAsync(int limit, CancellationToken cancellationToken = default);
|
||||
}
|
||||
|
||||
@@ -210,6 +210,31 @@ public sealed class JobHistoryRepository : RepositoryBase<SchedulerDataSource>,
|
||||
return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<IReadOnlyList<JobHistoryEntity>> GetRecentFailedAsync(int limit, CancellationToken cancellationToken = default)
|
||||
{
|
||||
const string sql = """
|
||||
SELECT id, job_id, tenant_id, project_id, job_type, status, attempt, payload_digest,
|
||||
result, reason, worker_id, duration_ms, created_at, completed_at, archived_at
|
||||
FROM scheduler.job_history
|
||||
WHERE status = 'failed'::scheduler.job_status OR status = 'timed_out'::scheduler.job_status
|
||||
ORDER BY completed_at DESC
|
||||
LIMIT @limit
|
||||
""";
|
||||
|
||||
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
AddParameter(command, "limit", limit);
|
||||
|
||||
var results = new List<JobHistoryEntity>();
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
results.Add(MapJobHistory(reader));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private static JobHistoryEntity MapJobHistory(NpgsqlDataReader reader) => new()
|
||||
{
|
||||
Id = reader.GetInt64(reader.GetOrdinal("id")),
|
||||
|
||||
@@ -89,8 +89,7 @@ public sealed class PartitionMaintenanceWorker : BackgroundService
|
||||
|
||||
_logger.LogInformation("Starting partition maintenance cycle");
|
||||
|
||||
await using var conn = await _dataSource.GetConnectionAsync(ct);
|
||||
await conn.OpenAsync(ct);
|
||||
await using var conn = await _dataSource.OpenSystemConnectionAsync(ct);
|
||||
|
||||
foreach (var (schemaTable, _) in opts.ManagedTables)
|
||||
{
|
||||
|
||||
@@ -7,7 +7,6 @@ using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Queue;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Repositories;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Repositories.Services;
|
||||
using StellaOps.Scheduler.Worker.Events;
|
||||
using StellaOps.Scheduler.Worker.Observability;
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
@@ -94,10 +95,13 @@ public sealed class FailureSignatureIndexer : BackgroundService
|
||||
_logger.LogDebug("Starting failure indexing batch");
|
||||
|
||||
// Get recent failed jobs that haven't been indexed
|
||||
var failedJobs = await _historyRepository.GetRecentFailedJobsAsync(
|
||||
var historyEntries = await _historyRepository.GetRecentFailedAsync(
|
||||
_options.Value.BatchSize,
|
||||
ct);
|
||||
|
||||
// Convert history entries to failed job records
|
||||
var failedJobs = historyEntries.Select(ConvertToFailedJobRecord).ToList();
|
||||
|
||||
var indexed = 0;
|
||||
foreach (var job in failedJobs)
|
||||
{
|
||||
@@ -278,6 +282,58 @@ public sealed class FailureSignatureIndexer : BackgroundService
|
||||
|
||||
return (errorCode, ErrorCategory.Unknown);
|
||||
}
|
||||
|
||||
private static FailedJobRecord ConvertToFailedJobRecord(JobHistoryEntity entity)
|
||||
{
|
||||
// Try to extract additional details from the result JSON
|
||||
string? imageDigest = null;
|
||||
string? artifactDigest = null;
|
||||
string? repository = null;
|
||||
string? errorCode = null;
|
||||
string? scannerVersion = null;
|
||||
string? runtimeVersion = null;
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(entity.Result))
|
||||
{
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(entity.Result);
|
||||
var root = doc.RootElement;
|
||||
|
||||
if (root.TryGetProperty("imageDigest", out var imgProp))
|
||||
imageDigest = imgProp.GetString();
|
||||
if (root.TryGetProperty("artifactDigest", out var artProp))
|
||||
artifactDigest = artProp.GetString();
|
||||
if (root.TryGetProperty("repository", out var repoProp))
|
||||
repository = repoProp.GetString();
|
||||
if (root.TryGetProperty("errorCode", out var codeProp))
|
||||
errorCode = codeProp.GetString();
|
||||
if (root.TryGetProperty("scannerVersion", out var scanVerProp))
|
||||
scannerVersion = scanVerProp.GetString();
|
||||
if (root.TryGetProperty("runtimeVersion", out var rtVerProp))
|
||||
runtimeVersion = rtVerProp.GetString();
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
// Result is not valid JSON, ignore
|
||||
}
|
||||
}
|
||||
|
||||
return new FailedJobRecord
|
||||
{
|
||||
JobId = entity.JobId,
|
||||
TenantId = entity.TenantId,
|
||||
JobType = entity.JobType,
|
||||
ImageDigest = imageDigest,
|
||||
ArtifactDigest = artifactDigest,
|
||||
Repository = repository,
|
||||
Error = entity.Reason,
|
||||
ErrorCode = errorCode,
|
||||
ScannerVersion = scannerVersion,
|
||||
RuntimeVersion = runtimeVersion,
|
||||
FailedAt = entity.CompletedAt
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -3,7 +3,6 @@ using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Queue;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Repositories;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Repositories.Services;
|
||||
using StellaOps.Scheduler.Worker.Options;
|
||||
using StellaOps.Scheduler.Worker.Observability;
|
||||
|
||||
|
||||
@@ -202,11 +202,11 @@ internal sealed class PlannerQueueDispatchService : IPlannerQueueDispatchService
|
||||
return map;
|
||||
}
|
||||
|
||||
private async Task<IReadOnlyDictionary<string, SurfaceManifestPointer>> PrefetchManifestsAsync(
|
||||
private async Task<IReadOnlyDictionary<string, Queue.SurfaceManifestPointer>> PrefetchManifestsAsync(
|
||||
IReadOnlyList<string> digests,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var results = new Dictionary<string, SurfaceManifestPointer>(StringComparer.Ordinal);
|
||||
var results = new Dictionary<string, Queue.SurfaceManifestPointer>(StringComparer.Ordinal);
|
||||
|
||||
foreach (var digest in digests)
|
||||
{
|
||||
@@ -224,7 +224,7 @@ internal sealed class PlannerQueueDispatchService : IPlannerQueueDispatchService
|
||||
continue;
|
||||
}
|
||||
|
||||
var pointer = new SurfaceManifestPointer(digest, manifest.Tenant);
|
||||
var pointer = new Queue.SurfaceManifestPointer(digest, manifest.Tenant);
|
||||
results[digest] = pointer;
|
||||
_metrics.RecordSurfaceManifestPrefetch(result: "hit");
|
||||
}
|
||||
@@ -239,9 +239,7 @@ internal sealed class PlannerQueueDispatchService : IPlannerQueueDispatchService
|
||||
}
|
||||
}
|
||||
|
||||
return results.Count == 0
|
||||
? (IReadOnlyDictionary<string, SurfaceManifestPointer>)EmptyReadOnlyDictionary<string, SurfaceManifestPointer>.Instance
|
||||
: results;
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
using System.Diagnostics;
|
||||
using System.Net.Http.Json;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user