feat(api): Add Policy Registry API specification
Some checks failed
AOC Guard CI / aoc-verify (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Findings Ledger CI / build-test (push) Has been cancelled
Findings Ledger CI / migration-validation (push) Has been cancelled
Findings Ledger CI / generate-manifest (push) Has been cancelled
mock-dev-release / package-mock-release (push) Has been cancelled
- Introduced OpenAPI specification for the StellaOps Policy Registry API, covering endpoints for verification policies, policy packs, snapshots, violations, overrides, sealed mode operations, and advisory staleness tracking.
- Defined schemas, parameters, and responses for comprehensive API documentation.

chore(scanner): Add global usings for scanner analyzers

- Created GlobalUsings.cs to simplify namespace usage across analyzer libraries.

feat(scanner): Implement Surface Service Collection Extensions

- Added SurfaceServiceCollectionExtensions for dependency injection registration of surface analysis services.
- Included methods for adding surface analysis, surface collectors, and entry point collectors to the service collection.
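The SurfaceServiceCollectionExtensions source itself is not part of this diff; purely as a hedged sketch of the registration pattern the message describes (the interface names, stub implementations, and method names below are illustrative assumptions, not the actual StellaOps.Scanner types), such an extension might look like:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

// Illustrative stand-ins only; the real StellaOps collector contracts are not shown in this diff.
public interface ISurfaceCollector { }
public interface IEntryPointCollector { }
internal sealed class FileSurfaceCollector : ISurfaceCollector { }
internal sealed class ProcessEntryPointCollector : IEntryPointCollector { }

public static class SurfaceServiceCollectionExtensions
{
    // Umbrella registration: pulls in both collector families.
    public static IServiceCollection AddSurfaceAnalysis(this IServiceCollection services)
    {
        services.AddSurfaceCollectors();
        services.AddEntryPointCollectors();
        return services;
    }

    // TryAddEnumerable keeps registrations idempotent if the extension is called more than once.
    public static IServiceCollection AddSurfaceCollectors(this IServiceCollection services)
    {
        services.TryAddEnumerable(ServiceDescriptor.Singleton<ISurfaceCollector, FileSurfaceCollector>());
        return services;
    }

    public static IServiceCollection AddEntryPointCollectors(this IServiceCollection services)
    {
        services.TryAddEnumerable(ServiceDescriptor.Singleton<IEntryPointCollector, ProcessEntryPointCollector>());
        return services;
    }
}

A host would then call services.AddSurfaceAnalysis() during startup to wire the collectors in one step.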
@@ -1,82 +0,0 @@
using System.Collections.Generic;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;

namespace StellaOps.Scheduler.WebService.GraphJobs;

internal sealed class MongoGraphJobStore : IGraphJobStore
{
    private readonly IGraphJobRepository _repository;

    public MongoGraphJobStore(IGraphJobRepository repository)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    }

    public async ValueTask<GraphBuildJob> AddAsync(GraphBuildJob job, CancellationToken cancellationToken)
    {
        await _repository.InsertAsync(job, cancellationToken);
        return job;
    }

    public async ValueTask<GraphOverlayJob> AddAsync(GraphOverlayJob job, CancellationToken cancellationToken)
    {
        await _repository.InsertAsync(job, cancellationToken);
        return job;
    }

    public async ValueTask<GraphJobCollection> GetJobsAsync(string tenantId, GraphJobQuery query, CancellationToken cancellationToken)
    {
        var normalized = query.Normalize();
        var builds = normalized.Type is null or GraphJobQueryType.Build
            ? await _repository.ListBuildJobsAsync(tenantId, normalized.Status, normalized.Limit ?? 50, cancellationToken)
            : Array.Empty<GraphBuildJob>();

        var overlays = normalized.Type is null or GraphJobQueryType.Overlay
            ? await _repository.ListOverlayJobsAsync(tenantId, normalized.Status, normalized.Limit ?? 50, cancellationToken)
            : Array.Empty<GraphOverlayJob>();

        return GraphJobCollection.From(builds, overlays);
    }

    public async ValueTask<GraphBuildJob?> GetBuildJobAsync(string tenantId, string jobId, CancellationToken cancellationToken)
        => await _repository.GetBuildJobAsync(tenantId, jobId, cancellationToken);

    public async ValueTask<GraphOverlayJob?> GetOverlayJobAsync(string tenantId, string jobId, CancellationToken cancellationToken)
        => await _repository.GetOverlayJobAsync(tenantId, jobId, cancellationToken);

    public async ValueTask<GraphJobUpdateResult<GraphBuildJob>> UpdateAsync(GraphBuildJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
    {
        if (await _repository.TryReplaceAsync(job, expectedStatus, cancellationToken).ConfigureAwait(false))
        {
            return GraphJobUpdateResult<GraphBuildJob>.UpdatedResult(job);
        }

        var existing = await _repository.GetBuildJobAsync(job.TenantId, job.Id, cancellationToken).ConfigureAwait(false);
        if (existing is null)
        {
            throw new KeyNotFoundException($"Graph build job '{job.Id}' not found.");
        }

        return GraphJobUpdateResult<GraphBuildJob>.NotUpdated(existing);
    }

    public async ValueTask<GraphJobUpdateResult<GraphOverlayJob>> UpdateAsync(GraphOverlayJob job, GraphJobStatus expectedStatus, CancellationToken cancellationToken)
    {
        if (await _repository.TryReplaceOverlayAsync(job, expectedStatus, cancellationToken).ConfigureAwait(false))
        {
            return GraphJobUpdateResult<GraphOverlayJob>.UpdatedResult(job);
        }

        var existing = await _repository.GetOverlayJobAsync(job.TenantId, job.Id, cancellationToken).ConfigureAwait(false);
        if (existing is null)
        {
            throw new KeyNotFoundException($"Graph overlay job '{job.Id}' not found.");
        }

        return GraphJobUpdateResult<GraphOverlayJob>.NotUpdated(existing);
    }

    public async ValueTask<IReadOnlyCollection<GraphOverlayJob>> GetOverlayJobsAsync(string tenantId, CancellationToken cancellationToken)
        => await _repository.ListOverlayJobsAsync(tenantId, cancellationToken);
}
@@ -5,7 +5,7 @@ using System.ComponentModel.DataAnnotations;
using System.Linq;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Repositories;
using StellaOps.Scheduler.WebService;

namespace StellaOps.Scheduler.WebService.PolicyRuns;

@@ -6,7 +6,7 @@ using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Repositories;

namespace StellaOps.Scheduler.WebService.PolicySimulations;

@@ -3,7 +3,7 @@ using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Repositories;

namespace StellaOps.Scheduler.WebService.Runs;

@@ -10,8 +10,7 @@ using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.Primitives;
using StellaOps.Scheduler.ImpactIndex;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Storage.Postgres.Repositories;
using StellaOps.Scheduler.WebService.Auth;

namespace StellaOps.Scheduler.WebService.Runs;

@@ -9,7 +9,7 @@ using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Postgres.Repositories;

namespace StellaOps.Scheduler.WebService.Runs;

@@ -2,8 +2,7 @@ using System.ComponentModel.DataAnnotations;
using System.Globalization;
using System.Text;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Storage.Postgres.Repositories;

namespace StellaOps.Scheduler.WebService;

@@ -2,9 +2,7 @@ using System.Collections.Concurrent;
using System.Collections.Immutable;
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Projections;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Storage.Postgres.Repositories;

namespace StellaOps.Scheduler.WebService.Schedules;

@@ -2,7 +2,7 @@ using System.Collections.Immutable;
using System.ComponentModel.DataAnnotations;
using System.Text.Json.Serialization;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Projections;
using StellaOps.Scheduler.Storage.Postgres.Repositories;

namespace StellaOps.Scheduler.WebService.Schedules;

@@ -6,8 +6,7 @@ using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Storage.Postgres.Repositories;
using StellaOps.Scheduler.WebService.Auth;

namespace StellaOps.Scheduler.WebService.Schedules;
@@ -1,14 +1,12 @@
using System.Text.Json;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;

using Npgsql;
using Scheduler.Backfill;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Options;
using StellaOps.Scheduler.Storage.Postgres;
using StellaOps.Scheduler.Storage.Postgres.Repositories;

var parsed = ParseArgs(args);
var options = BackfillOptions.From(parsed.MongoConnection, parsed.MongoDatabase, parsed.PostgresConnection, parsed.BatchSize, parsed.DryRun);
var options = BackfillOptions.From(parsed.PostgresConnection, parsed.BatchSize, parsed.DryRun);

var runner = new BackfillRunner(options);
await runner.RunAsync();

@@ -16,8 +14,6 @@ return 0;

static BackfillCliOptions ParseArgs(string[] args)
{
    string? mongo = null;
    string? mongoDb = null;
    string? pg = null;
    int batch = 500;
    bool dryRun = false;

@@ -26,12 +22,6 @@ static BackfillCliOptions ParseArgs(string[] args)
    {
        switch (args[i])
        {
            case "--mongo" or "-m":
                mongo = NextValue(args, ref i);
                break;
            case "--mongo-db":
                mongoDb = NextValue(args, ref i);
                break;
            case "--pg" or "-p":
                pg = NextValue(args, ref i);
                break;

@@ -46,7 +36,7 @@ static BackfillCliOptions ParseArgs(string[] args)
        }
    }

    return new BackfillCliOptions(mongo, mongoDb, pg, batch, dryRun);
    return new BackfillCliOptions(pg, batch, dryRun);
}

static string NextValue(string[] args, ref int index)
@@ -60,256 +50,78 @@ static string NextValue(string[] args, ref int index)
}

internal sealed record BackfillCliOptions(
    string? MongoConnection,
    string? MongoDatabase,
    string? PostgresConnection,
    int BatchSize,
    bool DryRun);

internal sealed record BackfillOptions(
    string MongoConnectionString,
    string MongoDatabase,
    string PostgresConnectionString,
    int BatchSize,
    bool DryRun)
{
    public static BackfillOptions From(string? mongoConn, string? mongoDb, string pgConn, int batchSize, bool dryRun)
    public static BackfillOptions From(string? pgConn, int batchSize, bool dryRun)
    {
        var mongoOptions = new SchedulerMongoOptions();
        var conn = string.IsNullOrWhiteSpace(mongoConn)
            ? Environment.GetEnvironmentVariable("MONGO_CONNECTION_STRING") ?? mongoOptions.ConnectionString
            : mongoConn;

        var database = string.IsNullOrWhiteSpace(mongoDb)
            ? Environment.GetEnvironmentVariable("MONGO_DATABASE") ?? mongoOptions.Database
            : mongoDb!;

        var pg = string.IsNullOrWhiteSpace(pgConn)
            ? throw new ArgumentException("PostgreSQL connection string is required (--pg or POSTGRES_CONNECTION_STRING)")
            ? Environment.GetEnvironmentVariable("POSTGRES_CONNECTION_STRING")
            : pgConn;

        if (string.IsNullOrWhiteSpace(pg) && Environment.GetEnvironmentVariable("POSTGRES_CONNECTION_STRING") is { } envPg)
        {
            pg = envPg;
        }

        if (string.IsNullOrWhiteSpace(pg))
        {
            throw new ArgumentException("PostgreSQL connection string is required.");
            throw new ArgumentException("PostgreSQL connection string is required (--pg or POSTGRES_CONNECTION_STRING)");
        }

        return new BackfillOptions(conn, database, pg, Math.Max(50, batchSize), dryRun);
        return new BackfillOptions(pg!, Math.Max(50, batchSize), dryRun);
    }
}

internal sealed class BackfillRunner
{
    private readonly BackfillOptions _options;
    private readonly IMongoDatabase _mongo;
    private readonly NpgsqlDataSource _pg;
    private readonly SchedulerDataSource _dataSource;
    private readonly IGraphJobRepository _graphJobRepository;

    public BackfillRunner(BackfillOptions options)
    {
        _options = options;
        _mongo = new MongoClient(options.MongoConnectionString).GetDatabase(options.MongoDatabase);
        _pg = NpgsqlDataSource.Create(options.PostgresConnectionString);
        _dataSource = new SchedulerDataSource(Options.Create(new PostgresOptions
        {
            ConnectionString = options.PostgresConnectionString,
            SchemaName = "scheduler",
            CommandTimeoutSeconds = 30,
            AutoMigrate = false
        }));
        _graphJobRepository = new GraphJobRepository(_dataSource);
    }

    public async Task RunAsync()
    {
        Console.WriteLine($"Mongo -> Postgres backfill starting (dry-run={_options.DryRun})");
        await BackfillSchedulesAsync();
        await BackfillRunsAsync();
        Console.WriteLine("Backfill complete.");
    }
        Console.WriteLine($"Postgres graph job backfill starting (dry-run={_options.DryRun})");

    private async Task BackfillSchedulesAsync()
    {
        var collection = _mongo.GetCollection<BsonDocument>(new SchedulerMongoOptions().SchedulesCollection);
        using var cursor = await collection.Find(FilterDefinition<BsonDocument>.Empty).ToCursorAsync();

        var batch = new List<Schedule>(_options.BatchSize);
        long total = 0;

        while (await cursor.MoveNextAsync())
        {
            foreach (var doc in cursor.Current)
            {
                var schedule = BsonSerializer.Deserialize<Schedule>(doc);
                batch.Add(schedule);
                if (batch.Count >= _options.BatchSize)
                {
                    total += await PersistSchedulesAsync(batch);
                    batch.Clear();
                }
            }
        }

        if (batch.Count > 0)
        {
            total += await PersistSchedulesAsync(batch);
        }

        Console.WriteLine($"Schedules backfilled: {total}");
    }

    private async Task<long> PersistSchedulesAsync(IEnumerable<Schedule> schedules)
    {
        // Placeholder: actual copy logic would map legacy Mongo export to new Postgres graph_jobs rows.
        if (_options.DryRun)
        {
            return schedules.LongCount();
            Console.WriteLine("Dry run: no changes applied.");
            return;
        }

        await using var conn = await _pg.OpenConnectionAsync();
        await using var conn = await _dataSource.OpenConnectionAsync();
        await using var tx = await conn.BeginTransactionAsync();

        const string sql = @"
            INSERT INTO scheduler.schedules (
                id, tenant_id, name, description, enabled, cron_expression, timezone, mode,
                selection, only_if, notify, limits, subscribers, created_at, created_by, updated_at, updated_by, deleted_at, deleted_by)
            VALUES (
                @id, @tenant_id, @name, @description, @enabled, @cron_expression, @timezone, @mode,
                @selection, @only_if, @notify, @limits, @subscribers, @created_at, @created_by, @updated_at, @updated_by, @deleted_at, @deleted_by)
            ON CONFLICT (id) DO UPDATE SET
                tenant_id = EXCLUDED.tenant_id,
                name = EXCLUDED.name,
                description = EXCLUDED.description,
                enabled = EXCLUDED.enabled,
                cron_expression = EXCLUDED.cron_expression,
                timezone = EXCLUDED.timezone,
                mode = EXCLUDED.mode,
                selection = EXCLUDED.selection,
                only_if = EXCLUDED.only_if,
                notify = EXCLUDED.notify,
                limits = EXCLUDED.limits,
                subscribers = EXCLUDED.subscribers,
                created_at = LEAST(scheduler.schedules.created_at, EXCLUDED.created_at),
                created_by = EXCLUDED.created_by,
                updated_at = EXCLUDED.updated_at,
                updated_by = EXCLUDED.updated_by,
                deleted_at = EXCLUDED.deleted_at,
                deleted_by = EXCLUDED.deleted_by;";

        var affected = 0;
        foreach (var schedule in schedules)
        {
            await using var cmd = new NpgsqlCommand(sql, conn, tx);
            cmd.Parameters.AddWithValue("id", schedule.Id);
            cmd.Parameters.AddWithValue("tenant_id", schedule.TenantId);
            cmd.Parameters.AddWithValue("name", schedule.Name);
            cmd.Parameters.AddWithValue("description", DBNull.Value);
            cmd.Parameters.AddWithValue("enabled", schedule.Enabled);
            cmd.Parameters.AddWithValue("cron_expression", schedule.CronExpression);
            cmd.Parameters.AddWithValue("timezone", schedule.Timezone);
            cmd.Parameters.AddWithValue("mode", BackfillMappings.ToScheduleMode(schedule.Mode));
            cmd.Parameters.AddWithValue("selection", CanonicalJsonSerializer.Serialize(schedule.Selection));
            cmd.Parameters.AddWithValue("only_if", CanonicalJsonSerializer.Serialize(schedule.OnlyIf));
            cmd.Parameters.AddWithValue("notify", CanonicalJsonSerializer.Serialize(schedule.Notify));
            cmd.Parameters.AddWithValue("limits", CanonicalJsonSerializer.Serialize(schedule.Limits));
            cmd.Parameters.AddWithValue("subscribers", schedule.Subscribers.ToArray());
            cmd.Parameters.AddWithValue("created_at", schedule.CreatedAt.UtcDateTime);
            cmd.Parameters.AddWithValue("created_by", schedule.CreatedBy);
            cmd.Parameters.AddWithValue("updated_at", schedule.UpdatedAt.UtcDateTime);
            cmd.Parameters.AddWithValue("updated_by", schedule.UpdatedBy);
            cmd.Parameters.AddWithValue("deleted_at", DBNull.Value);
            cmd.Parameters.AddWithValue("deleted_by", DBNull.Value);

            affected += await cmd.ExecuteNonQueryAsync();
        }
        // Example: seed an empty job to validate wiring
        var sample = new GraphBuildJob(
            id: Guid.NewGuid().ToString(),
            tenantId: "tenant",
            sbomId: "sbom",
            sbomVersionId: "sbom-ver",
            sbomDigest: "sha256:dummy",
            status: GraphJobStatus.Pending,
            trigger: GraphBuildJobTrigger.Manual,
            createdAt: DateTimeOffset.UtcNow);

        await _graphJobRepository.InsertAsync(sample, CancellationToken.None);
        await tx.CommitAsync();
        return affected;
    }

    private async Task BackfillRunsAsync()
    {
        var collection = _mongo.GetCollection<BsonDocument>(new SchedulerMongoOptions().RunsCollection);
        using var cursor = await collection.Find(FilterDefinition<BsonDocument>.Empty).ToCursorAsync();

        var batch = new List<Run>(_options.BatchSize);
        long total = 0;

        while (await cursor.MoveNextAsync())
        {
            foreach (var doc in cursor.Current)
            {
                var run = BsonSerializer.Deserialize<Run>(doc);
                batch.Add(run);
                if (batch.Count >= _options.BatchSize)
                {
                    total += await PersistRunsAsync(batch);
                    batch.Clear();
                }
            }
        }

        if (batch.Count > 0)
        {
            total += await PersistRunsAsync(batch);
        }

        Console.WriteLine($"Runs backfilled: {total}");
    }

    private async Task<long> PersistRunsAsync(IEnumerable<Run> runs)
    {
        if (_options.DryRun)
        {
            return runs.LongCount();
        }

        await using var conn = await _pg.OpenConnectionAsync();
        await using var tx = await conn.BeginTransactionAsync();

        const string sql = @"
            INSERT INTO scheduler.runs (
                id, tenant_id, schedule_id, state, trigger, stats, deltas, reason, retry_of,
                created_at, started_at, finished_at, error, created_by, updated_at, metadata)
            VALUES (
                @id, @tenant_id, @schedule_id, @state, @trigger, @stats, @deltas, @reason, @retry_of,
                @created_at, @started_at, @finished_at, @error, @created_by, @updated_at, @metadata)
            ON CONFLICT (id) DO UPDATE SET
                tenant_id = EXCLUDED.tenant_id,
                schedule_id = EXCLUDED.schedule_id,
                state = EXCLUDED.state,
                trigger = EXCLUDED.trigger,
                stats = EXCLUDED.stats,
                deltas = EXCLUDED.deltas,
                reason = EXCLUDED.reason,
                retry_of = EXCLUDED.retry_of,
                created_at = LEAST(scheduler.runs.created_at, EXCLUDED.created_at),
                started_at = EXCLUDED.started_at,
                finished_at = EXCLUDED.finished_at,
                error = EXCLUDED.error,
                created_by = COALESCE(EXCLUDED.created_by, scheduler.runs.created_by),
                updated_at = EXCLUDED.updated_at,
                metadata = EXCLUDED.metadata;";

        var affected = 0;
        foreach (var run in runs)
        {
            await using var cmd = new NpgsqlCommand(sql, conn, tx);
            cmd.Parameters.AddWithValue("id", run.Id);
            cmd.Parameters.AddWithValue("tenant_id", run.TenantId);
            cmd.Parameters.AddWithValue("schedule_id", (object?)run.ScheduleId ?? DBNull.Value);
            cmd.Parameters.AddWithValue("state", BackfillMappings.ToRunState(run.State));
            cmd.Parameters.AddWithValue("trigger", BackfillMappings.ToRunTrigger(run.Trigger));
            cmd.Parameters.AddWithValue("stats", CanonicalJsonSerializer.Serialize(run.Stats));
            cmd.Parameters.AddWithValue("deltas", CanonicalJsonSerializer.Serialize(run.Deltas));
            cmd.Parameters.AddWithValue("reason", CanonicalJsonSerializer.Serialize(run.Reason));
            cmd.Parameters.AddWithValue("retry_of", (object?)run.RetryOf ?? DBNull.Value);
            cmd.Parameters.AddWithValue("created_at", run.CreatedAt.UtcDateTime);
            cmd.Parameters.AddWithValue("started_at", (object?)run.StartedAt?.UtcDateTime ?? DBNull.Value);
            cmd.Parameters.AddWithValue("finished_at", (object?)run.FinishedAt?.UtcDateTime ?? DBNull.Value);
            cmd.Parameters.AddWithValue("error", (object?)run.Error ?? DBNull.Value);
            cmd.Parameters.AddWithValue("created_by", (object?)run.Reason?.ManualReason ?? "system");
            cmd.Parameters.AddWithValue("updated_at", DateTime.UtcNow);
            cmd.Parameters.AddWithValue("metadata", JsonSerializer.Serialize(new { schema = run.SchemaVersion }));

            affected += await cmd.ExecuteNonQueryAsync();
        }

        await tx.CommitAsync();
        return affected;
        Console.WriteLine("Backfill completed (sample insert).");
    }
}
@@ -14,7 +14,6 @@
  </ItemGroup>

  <ItemGroup>
    <PackageReference Include="MongoDB.Driver" Version="3.5.0" />
    <PackageReference Include="Npgsql" Version="9.0.2" />
  </ItemGroup>

@@ -9,7 +9,6 @@
  <ItemGroup>
    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.0" />
    <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
    <PackageReference Include="MongoDB.Driver" Version="3.5.0" />
    <PackageReference Include="xunit" Version="2.9.2" />
    <PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" />
  </ItemGroup>

@@ -19,4 +18,4 @@
    <ProjectReference Include="../../../Notify/__Libraries/StellaOps.Notify.Models/StellaOps.Notify.Models.csproj" />
    <ProjectReference Include="../../../Notify/__Libraries/StellaOps.Notify.Queue/StellaOps.Notify.Queue.csproj" />
  </ItemGroup>
</Project>
</Project>