audit remarks work
This commit is contained in:
83
src/Scheduler/Tools/Scheduler.Backfill/BackfillApp.cs
Normal file
83
src/Scheduler/Tools/Scheduler.Backfill/BackfillApp.cs
Normal file
@@ -0,0 +1,83 @@
|
||||
using System.CommandLine;
|
||||
|
||||
namespace Scheduler.Backfill;
|
||||
|
||||
/// <summary>
/// Command-line front end for the scheduler graph-job backfill tool.
/// </summary>
public static class BackfillApp
{
    /// <summary>
    /// Parses <paramref name="args"/>, builds <c>BackfillOptions</c>, and runs the backfill.
    /// </summary>
    /// <param name="args">Raw command-line arguments.</param>
    /// <returns>Process exit code: 0 on success, 1 on failure or cancellation.</returns>
    public static async Task<int> RunAsync(string[] args)
    {
        var pgOption = new Option<string?>("--pg")
        {
            Description = "PostgreSQL connection string (falls back to POSTGRES_CONNECTION_STRING)."
        };

        var batchOption = new Option<int>("--batch")
        {
            Description = "Batch size for inserts (min 50).",
            DefaultValueFactory = _ => 500
        };

        var sourceOption = new Option<FileInfo>("--source")
        {
            Description = "Path to NDJSON file containing GraphBuildJob payloads.",
            Required = true
        };

        var dryRunOption = new Option<bool>("--dry-run")
        {
            Description = "Validate and report without inserting rows."
        };

        var timeoutOption = new Option<int>("--timeout-seconds")
        {
            Description = "Cancel the backfill after the given number of seconds (0 disables).",
            DefaultValueFactory = _ => 0
        };

        var command = new RootCommand("Scheduler graph job backfill tool");
        command.Add(pgOption);
        command.Add(batchOption);
        command.Add(sourceOption);
        command.Add(dryRunOption);
        command.Add(timeoutOption);

        command.SetAction(async (parseResult, cancellationToken) =>
        {
            try
            {
                var pg = parseResult.GetValue(pgOption);
                var batch = parseResult.GetValue(batchOption);
                var source = parseResult.GetValue(sourceOption);
                var dryRun = parseResult.GetValue(dryRunOption);
                var timeoutSeconds = parseResult.GetValue(timeoutOption);

                // Defensive: Required = true should guarantee a value, but guard anyway.
                if (source is null)
                {
                    Console.Error.WriteLine("[FAIL] --source is required.");
                    return 1;
                }

                var options = BackfillOptions.From(pg, batch, dryRun, source.FullName, timeoutSeconds);
                var runner = new BackfillRunner(options, Console.WriteLine);

                // Single execution path: always link to the host token and arm the
                // timer only when a timeout was requested. This removes the
                // previously duplicated RunAsync invocation.
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                if (options.Timeout is { } timeout)
                {
                    cts.CancelAfter(timeout);
                }

                await runner.RunAsync(cts.Token).ConfigureAwait(false);
                return 0;
            }
            catch (OperationCanceledException)
            {
                // Report timeouts/Ctrl+C distinctly from genuine failures.
                Console.Error.WriteLine("[FAIL] Backfill cancelled (timeout elapsed or cancellation requested).");
                return 1;
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"[FAIL] {ex.Message}");
                return 1;
            }
        });

        return await command.Parse(args).InvokeAsync().ConfigureAwait(false);
    }
}
|
||||
148
src/Scheduler/Tools/Scheduler.Backfill/BackfillRunner.cs
Normal file
148
src/Scheduler/Tools/Scheduler.Backfill/BackfillRunner.cs
Normal file
@@ -0,0 +1,148 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Infrastructure.Postgres.Options;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Persistence.Postgres;
|
||||
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
|
||||
|
||||
namespace Scheduler.Backfill;
|
||||
|
||||
/// <summary>
/// Immutable, validated configuration for a single backfill run.
/// </summary>
public sealed record BackfillOptions(
    string PostgresConnectionString,
    int BatchSize,
    bool DryRun,
    string SourcePath,
    TimeSpan? Timeout)
{
    /// <summary>
    /// Builds options from raw CLI values: falls back to the
    /// POSTGRES_CONNECTION_STRING environment variable for the connection
    /// string, clamps the batch size to a floor of 50, and treats a
    /// non-positive timeout as "no timeout".
    /// </summary>
    /// <exception cref="ArgumentException">No connection string or no source path was supplied.</exception>
    public static BackfillOptions From(string? pgConn, int batchSize, bool dryRun, string sourcePath, int timeoutSeconds)
    {
        var connection = pgConn;
        if (string.IsNullOrWhiteSpace(connection))
        {
            connection = Environment.GetEnvironmentVariable("POSTGRES_CONNECTION_STRING");
        }

        if (string.IsNullOrWhiteSpace(connection))
        {
            throw new ArgumentException("PostgreSQL connection string is required (--pg or POSTGRES_CONNECTION_STRING)");
        }

        if (string.IsNullOrWhiteSpace(sourcePath))
        {
            throw new ArgumentException("Source file path is required (--source)");
        }

        TimeSpan? timeout = null;
        if (timeoutSeconds > 0)
        {
            timeout = TimeSpan.FromSeconds(timeoutSeconds);
        }

        return new BackfillOptions(connection, Math.Max(50, batchSize), dryRun, sourcePath, timeout);
    }
}
|
||||
|
||||
/// <summary>
/// Streams <c>GraphBuildJob</c> records from an NDJSON source file and inserts
/// them into the scheduler Postgres store in fixed-size batches.
/// </summary>
public sealed class BackfillRunner
{
    private readonly BackfillOptions _options;
    private readonly Action<string> _log;
    private readonly SchedulerDataSource _dataSource;
    private readonly IGraphJobRepository _graphJobRepository;

    /// <summary>
    /// Creates a runner for <paramref name="options"/>. Progress messages are
    /// sent to <paramref name="log"/>; pass <c>null</c> to discard them.
    /// </summary>
    public BackfillRunner(BackfillOptions options, Action<string>? log = null)
    {
        _options = options;
        _log = log ?? (_ => { });

        // AutoMigrate stays off on purpose: this tool only inserts rows and
        // must never alter the schema of the target database.
        _dataSource = new SchedulerDataSource(Options.Create(new PostgresOptions
        {
            ConnectionString = options.PostgresConnectionString,
            SchemaName = "scheduler",
            CommandTimeoutSeconds = 30,
            AutoMigrate = false
        }), NullLogger<SchedulerDataSource>.Instance);
        _graphJobRepository = new GraphJobRepository(_dataSource);
    }

    /// <summary>
    /// Runs the backfill: reads jobs line-by-line, accumulates them into
    /// batches of <c>BackfillOptions.BatchSize</c>, and processes each batch
    /// (insert, or report-only when dry-run is enabled).
    /// </summary>
    /// <exception cref="FileNotFoundException">The source file does not exist.</exception>
    /// <exception cref="InvalidOperationException">A line is not a valid GraphBuildJob payload.</exception>
    public async Task RunAsync(CancellationToken cancellationToken)
    {
        if (!File.Exists(_options.SourcePath))
        {
            throw new FileNotFoundException($"Source file '{_options.SourcePath}' does not exist.", _options.SourcePath);
        }

        _log($"Graph job backfill starting (dry-run={_options.DryRun}, batch={_options.BatchSize}).");

        var batch = new List<GraphBuildJob>(_options.BatchSize);
        var total = 0;
        var inserted = 0;

        await foreach (var job in ReadJobsAsync(_options.SourcePath, cancellationToken))
        {
            batch.Add(job);
            total++;

            if (batch.Count >= _options.BatchSize)
            {
                inserted += await ProcessBatchAsync(batch, cancellationToken).ConfigureAwait(false);
                batch.Clear();
            }
        }

        // Flush the final partial batch.
        if (batch.Count > 0)
        {
            inserted += await ProcessBatchAsync(batch, cancellationToken).ConfigureAwait(false);
        }

        _log($"Backfill completed. Jobs processed: {total}. Jobs inserted: {inserted}.");
    }

    // Inserts one batch (or just reports it in dry-run mode) and returns the
    // number of rows actually inserted.
    private async Task<int> ProcessBatchAsync(List<GraphBuildJob> batch, CancellationToken cancellationToken)
    {
        if (_options.DryRun)
        {
            _log($"Dry run: would insert {batch.Count} jobs.");
            return 0;
        }

        foreach (var job in batch)
        {
            await _graphJobRepository.InsertAsync(job, cancellationToken).ConfigureAwait(false);
        }

        _log($"Inserted {batch.Count} jobs.");
        return batch.Count;
    }

    // Lazily parses the NDJSON file one line at a time. Blank lines are
    // skipped, but every physical line (including blanks) advances the line
    // counter so error messages match the file as seen in an editor.
    private static async IAsyncEnumerable<GraphBuildJob> ReadJobsAsync(string path, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
    {
        using var stream = File.OpenRead(path);
        using var reader = new StreamReader(stream);
        var lineNumber = 0;

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
            if (line is null)
            {
                break;
            }

            lineNumber++;

            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            GraphBuildJob job;
            try
            {
                job = CanonicalJsonSerializer.Deserialize<GraphBuildJob>(line);
            }
            catch (JsonException ex)
            {
                // FIX: propagate the JsonException as InnerException; the
                // original rethrow discarded it, hiding byte/position details.
                throw new InvalidOperationException($"Failed to parse GraphBuildJob on line {lineNumber}: {ex.Message}", ex);
            }

            yield return job;
        }
    }
}
|
||||
@@ -1,130 +1,3 @@
|
||||
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Npgsql;
|
||||
using Scheduler.Backfill;
|
||||
using StellaOps.Scheduler.Models;
|
||||
using StellaOps.Scheduler.Persistence.Postgres;
|
||||
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
|
||||
using StellaOps.Infrastructure.Postgres.Options;
|
||||
|
||||
// Legacy entry point: parse CLI args, normalize them into validated options,
// then run the backfill once and exit with code 0.
var parsed = ParseArgs(args);
var options = BackfillOptions.From(parsed.PostgresConnection, parsed.BatchSize, parsed.DryRun);

var runner = new BackfillRunner(options);
// NOTE(review): no CancellationToken is threaded through here; interrupting
// the process is the only way to stop a run — confirm this is acceptable.
await runner.RunAsync();
return 0;
|
||||
|
||||
// Minimal hand-rolled argument parser for the legacy backfill CLI.
// Unknown arguments are silently ignored.
static BackfillCliOptions ParseArgs(string[] args)
{
    string? pg = null;
    var batch = 500;
    var dryRun = false;

    var i = 0;
    while (i < args.Length)
    {
        var current = args[i];
        if (current is "--pg" or "-p")
        {
            pg = NextValue(args, ref i);
        }
        else if (current == "--batch")
        {
            // Fall back to the 500 default when the value is missing or unparseable.
            if (!int.TryParse(NextValue(args, ref i), out batch))
            {
                batch = 500;
            }
        }
        else if (current == "--dry-run")
        {
            dryRun = true;
        }

        i++;
    }

    return new BackfillCliOptions(pg, batch, dryRun);
}

// Returns the argument following `index` and advances it; returns an empty
// string (without advancing) when there is no following argument.
static string NextValue(string[] args, ref int index)
{
    var next = index + 1;
    if (next >= args.Length)
    {
        return string.Empty;
    }

    index = next;
    return args[index];
}

// Raw CLI values prior to validation/normalization.
internal sealed record BackfillCliOptions(
    string? PostgresConnection,
    int BatchSize,
    bool DryRun);
|
||||
|
||||
// Legacy normalized options for the pre-System.CommandLine version of the tool.
internal sealed record BackfillOptions(
    string PostgresConnectionString,
    int BatchSize,
    bool DryRun)
{
    // Resolves the connection string from the CLI value or the
    // POSTGRES_CONNECTION_STRING environment variable, and clamps the batch
    // size to a floor of 50.
    public static BackfillOptions From(string? pgConn, int batchSize, bool dryRun)
    {
        var connection = pgConn;
        if (string.IsNullOrWhiteSpace(connection))
        {
            connection = Environment.GetEnvironmentVariable("POSTGRES_CONNECTION_STRING");
        }

        if (string.IsNullOrWhiteSpace(connection))
        {
            throw new ArgumentException("PostgreSQL connection string is required (--pg or POSTGRES_CONNECTION_STRING)");
        }

        var clampedBatch = batchSize < 50 ? 50 : batchSize;
        return new BackfillOptions(connection!, clampedBatch, dryRun);
    }
}
|
||||
|
||||
// Legacy runner: validates Postgres wiring with a single sample insert.
// Superseded by the NDJSON-streaming implementation.
internal sealed class BackfillRunner
{
    private readonly BackfillOptions _options;
    // NOTE(review): _pg is created but never used or disposed anywhere in this
    // class — looks like dead wiring; confirm before keeping.
    private readonly NpgsqlDataSource _pg;
    private readonly SchedulerDataSource _dataSource;
    private readonly IGraphJobRepository _graphJobRepository;

    public BackfillRunner(BackfillOptions options)
    {
        _options = options;
        _pg = NpgsqlDataSource.Create(options.PostgresConnectionString);
        // AutoMigrate disabled: this tool must not alter the target schema.
        _dataSource = new SchedulerDataSource(Options.Create(new PostgresOptions
        {
            ConnectionString = options.PostgresConnectionString,
            SchemaName = "scheduler",
            CommandTimeoutSeconds = 30,
            AutoMigrate = false
        }), NullLogger<SchedulerDataSource>.Instance);
        _graphJobRepository = new GraphJobRepository(_dataSource);
    }

    // Runs the placeholder backfill: in dry-run mode it only logs; otherwise
    // it inserts one synthetic GraphBuildJob to prove end-to-end connectivity.
    public async Task RunAsync()
    {
        Console.WriteLine($"Postgres graph job backfill starting (dry-run={_options.DryRun})");

        // Placeholder: actual copy logic would map legacy export to new Postgres graph_jobs rows.
        if (_options.DryRun)
        {
            Console.WriteLine("Dry run: no changes applied.");
            return;
        }

        // NOTE(review): the repository insert below takes no reference to this
        // connection or transaction — presumably it opens its own connection,
        // which would make `tx` a no-op; verify against GraphJobRepository.
        await using var conn = await _dataSource.OpenSystemConnectionAsync(CancellationToken.None);
        await using var tx = await conn.BeginTransactionAsync();

        // Example: seed an empty job to validate wiring
        var sample = new GraphBuildJob(
            id: Guid.NewGuid().ToString(),
            tenantId: "tenant",
            sbomId: "sbom",
            sbomVersionId: "sbom-ver",
            sbomDigest: "sha256:dummy",
            status: GraphJobStatus.Pending,
            trigger: GraphBuildJobTrigger.Manual,
            createdAt: DateTimeOffset.UtcNow);

        await _graphJobRepository.InsertAsync(sample, CancellationToken.None);
        await tx.CommitAsync();
        Console.WriteLine("Backfill completed (sample insert).");
    }
}
|
||||
// Program entry point: delegate all parsing and execution to the
// System.CommandLine-based app and surface its exit code.
return await BackfillApp.RunAsync(args);
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Npgsql" />
|
||||
<PackageReference Include="System.CommandLine" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using FluentAssertions;
using Scheduler.Backfill;
using StellaOps.Scheduler.Models;
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.Backfill.Tests;
|
||||
|
||||
public sealed class BackfillOptionsTests
{
    [Fact]
    public void From_ClampsBatchSize()
    {
        // Batch sizes below the 50 floor must be clamped up.
        var options = BackfillOptions.From(
            pgConn: "Host=localhost;Username=stella;Password=secret;Database=scheduler",
            batchSize: 10,
            dryRun: true,
            sourcePath: "jobs.ndjson",
            timeoutSeconds: 0);

        options.BatchSize.Should().Be(50);
        // timeoutSeconds = 0 means "no timeout".
        options.Timeout.Should().BeNull();
    }

    [Fact]
    public async Task Runner_DryRun_ParsesNdjson()
    {
        var job = new GraphBuildJob(
            id: "job-1",
            tenantId: "tenant",
            sbomId: "sbom",
            sbomVersionId: "sbom-ver",
            sbomDigest: "sha256:abc",
            status: GraphJobStatus.Pending,
            trigger: GraphBuildJobTrigger.Manual,
            createdAt: new DateTimeOffset(2025, 1, 1, 0, 0, 0, TimeSpan.Zero));

        var json = CanonicalJsonSerializer.Serialize(job);

        var tempPath = Path.GetTempFileName();
        await File.WriteAllTextAsync(tempPath, json + Environment.NewLine);

        try
        {
            var options = new BackfillOptions(
                PostgresConnectionString: "Host=localhost;Username=stella;Password=secret;Database=scheduler",
                BatchSize: 50,
                DryRun: true,
                SourcePath: tempPath,
                Timeout: null);

            // Capture the runner's log output so the test asserts observable
            // behavior instead of merely "does not throw".
            var logs = new List<string>();
            var runner = new BackfillRunner(options, logs.Add);
            await runner.RunAsync(default);

            logs.Should().Contain(m => m.Contains("Dry run: would insert 1 jobs"));
            logs.Should().Contain(m => m.Contains("Jobs processed: 1"));
        }
        finally
        {
            File.Delete(tempPath);
        }
    }
}
|
||||
Reference in New Issue
Block a user