fix(graph): migration 002 now tolerates legacy graph_nodes/edges schemas

Rewrites migration 002 to use ALTER TABLE ... IF EXISTS with per-column guards
and a data-migration DO block that backfills document_json/written_at/batch_id
from the older (tenant_id, data, created_at) layout when present. Updates
GraphChangeStreamProcessor + SavedViewsMigrationHostedService for the aligned
schema and extends the incremental processor tests for the new path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-13 21:57:54 +03:00
parent 257e29355b
commit d613f8f2a4
4 changed files with 172 additions and 36 deletions

View File

@@ -1,6 +1,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Graph.Indexer.Persistence.Extensions;
using StellaOps.Infrastructure.Postgres.Migrations;
using StellaOps.Infrastructure.Postgres.Options;
@@ -11,7 +12,7 @@ public sealed class GraphSavedViewsMigrationHostedService : IHostedService
private readonly IOptions<PostgresOptions> _options;
private readonly ILoggerFactory _loggerFactory;
private readonly IHostApplicationLifetime _lifetime;
private StartupMigrationHost? _inner;
private readonly List<StartupMigrationHost> _hosts = [];
public GraphSavedViewsMigrationHostedService(
IOptions<PostgresOptions> options,
@@ -31,31 +32,62 @@ public sealed class GraphSavedViewsMigrationHostedService : IHostedService
return Task.CompletedTask;
}
_inner = new GraphSavedViewsStartupMigrationHost(
var schemaName = string.IsNullOrWhiteSpace(options.SchemaName)
? PostgresGraphSavedViewStore.DefaultSchemaName
: options.SchemaName.Trim();
_hosts.Clear();
_hosts.Add(new GraphStartupMigrationHost(
options.ConnectionString,
string.IsNullOrWhiteSpace(options.SchemaName)
? PostgresGraphSavedViewStore.DefaultSchemaName
: options.SchemaName.Trim(),
schemaName,
"Graph.Persistence",
typeof(GraphIndexerPersistenceExtensions).Assembly,
_loggerFactory.CreateLogger("Migration.Graph.Persistence"),
_lifetime));
_hosts.Add(new GraphStartupMigrationHost(
options.ConnectionString,
schemaName,
"Graph.Api",
typeof(Program).Assembly,
_loggerFactory.CreateLogger("Migration.Graph.Api"),
_lifetime);
return _inner.StartAsync(cancellationToken);
_lifetime));
return StartHostsAsync(cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken)
=> _inner?.StopAsync(cancellationToken) ?? Task.CompletedTask;
=> StopHostsAsync(cancellationToken);
private sealed class GraphSavedViewsStartupMigrationHost : StartupMigrationHost
private async Task StartHostsAsync(CancellationToken cancellationToken)
{
public GraphSavedViewsStartupMigrationHost(
foreach (var host in _hosts)
{
await host.StartAsync(cancellationToken).ConfigureAwait(false);
}
}
private async Task StopHostsAsync(CancellationToken cancellationToken)
{
foreach (var host in _hosts)
{
await host.StopAsync(cancellationToken).ConfigureAwait(false);
}
}
private sealed class GraphStartupMigrationHost : StartupMigrationHost
{
public GraphStartupMigrationHost(
string connectionString,
string schemaName,
string moduleName,
System.Reflection.Assembly migrationsAssembly,
ILogger logger,
IHostApplicationLifetime lifetime)
: base(
connectionString,
schemaName,
"Graph.Api",
typeof(Program).Assembly,
moduleName,
migrationsAssembly,
logger,
lifetime)
{

View File

@@ -3,6 +3,7 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Graph.Indexer.Ingestion.Sbom;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Text.Json.Nodes;
@@ -37,30 +38,30 @@ public sealed class GraphChangeStreamProcessor : BackgroundService
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
protected override Task ExecuteAsync(CancellationToken stoppingToken)
=> RunPollingLoopAsync(stoppingToken);
internal async Task RunPollingLoopAsync(CancellationToken stoppingToken)
{
using var pollTimer = new PeriodicTimer(_options.PollInterval);
using var backfillTimer = new PeriodicTimer(_options.BackfillInterval);
var backfillStopwatch = Stopwatch.StartNew();
while (!stoppingToken.IsCancellationRequested)
try
{
var pollTask = pollTimer.WaitForNextTickAsync(stoppingToken).AsTask();
var backfillTask = backfillTimer.WaitForNextTickAsync(stoppingToken).AsTask();
var completed = await Task.WhenAny(pollTask, backfillTask).ConfigureAwait(false);
if (completed.IsCanceled || stoppingToken.IsCancellationRequested)
{
break;
}
if (completed == pollTask)
while (await pollTimer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
{
await ApplyStreamAsync(isBackfill: false, stoppingToken).ConfigureAwait(false);
if (_options.BackfillInterval <= TimeSpan.Zero || backfillStopwatch.Elapsed >= _options.BackfillInterval)
{
await ApplyStreamAsync(isBackfill: true, stoppingToken).ConfigureAwait(false);
backfillStopwatch.Restart();
}
}
else
{
await ApplyStreamAsync(isBackfill: true, stoppingToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Graceful BackgroundService shutdown.
}
}

View File

@@ -11,11 +11,47 @@ CREATE SCHEMA IF NOT EXISTS graph;
CREATE TABLE IF NOT EXISTS graph.graph_nodes (
id TEXT PRIMARY KEY,
batch_id TEXT NOT NULL,
document_json JSONB NOT NULL,
written_at TIMESTAMPTZ NOT NULL
batch_id TEXT,
document_json JSONB NOT NULL DEFAULT '{}'::jsonb,
written_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
ALTER TABLE IF EXISTS graph.graph_nodes
ADD COLUMN IF NOT EXISTS batch_id TEXT,
ADD COLUMN IF NOT EXISTS document_json JSONB NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS written_at TIMESTAMPTZ NOT NULL DEFAULT NOW();
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'graph'
AND table_name = 'graph_nodes'
AND column_name = 'data'
) THEN
EXECUTE $sql$
UPDATE graph.graph_nodes
SET document_json = COALESCE(document_json, data, '{}'::jsonb),
written_at = COALESCE(written_at, created_at, NOW()),
batch_id = COALESCE(batch_id, tenant_id || ':' || id)
WHERE document_json = '{}'::jsonb
OR batch_id IS NULL
OR written_at IS NULL
$sql$;
ELSE
EXECUTE $sql$
UPDATE graph.graph_nodes
SET document_json = COALESCE(document_json, '{}'::jsonb),
written_at = COALESCE(written_at, NOW()),
batch_id = COALESCE(batch_id, id)
WHERE document_json = '{}'::jsonb
OR batch_id IS NULL
OR written_at IS NULL
$sql$;
END IF;
END $$;
CREATE INDEX IF NOT EXISTS idx_graph_nodes_batch_id ON graph.graph_nodes (batch_id);
CREATE INDEX IF NOT EXISTS idx_graph_nodes_written_at ON graph.graph_nodes (written_at);
@@ -25,13 +61,49 @@ CREATE INDEX IF NOT EXISTS idx_graph_nodes_written_at ON graph.graph_nodes (writ
CREATE TABLE IF NOT EXISTS graph.graph_edges (
id TEXT PRIMARY KEY,
batch_id TEXT NOT NULL,
batch_id TEXT,
source_id TEXT NOT NULL,
target_id TEXT NOT NULL,
document_json JSONB NOT NULL,
written_at TIMESTAMPTZ NOT NULL
document_json JSONB NOT NULL DEFAULT '{}'::jsonb,
written_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
ALTER TABLE IF EXISTS graph.graph_edges
ADD COLUMN IF NOT EXISTS batch_id TEXT,
ADD COLUMN IF NOT EXISTS document_json JSONB NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS written_at TIMESTAMPTZ NOT NULL DEFAULT NOW();
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'graph'
AND table_name = 'graph_edges'
AND column_name = 'data'
) THEN
EXECUTE $sql$
UPDATE graph.graph_edges
SET document_json = COALESCE(document_json, data, '{}'::jsonb),
written_at = COALESCE(written_at, created_at, NOW()),
batch_id = COALESCE(batch_id, tenant_id || ':' || id)
WHERE document_json = '{}'::jsonb
OR batch_id IS NULL
OR written_at IS NULL
$sql$;
ELSE
EXECUTE $sql$
UPDATE graph.graph_edges
SET document_json = COALESCE(document_json, '{}'::jsonb),
written_at = COALESCE(written_at, NOW()),
batch_id = COALESCE(batch_id, id)
WHERE document_json = '{}'::jsonb
OR batch_id IS NULL
OR written_at IS NULL
$sql$;
END IF;
END $$;
CREATE INDEX IF NOT EXISTS idx_graph_edges_batch_id ON graph.graph_edges (batch_id);
CREATE INDEX IF NOT EXISTS idx_graph_edges_source_id ON graph.graph_edges (source_id);
CREATE INDEX IF NOT EXISTS idx_graph_edges_target_id ON graph.graph_edges (target_id);

View File

@@ -14,7 +14,7 @@ namespace StellaOps.Graph.Indexer.Tests;
public sealed class GraphChangeStreamProcessorTests
{
[Trait("Category", TestCategories.Unit)]
[Fact]
[Fact]
public async Task ApplyStreamAsync_SkipsDuplicates_AndRetries()
{
var tenant = "tenant-a";
@@ -55,6 +55,37 @@ public sealed class GraphChangeStreamProcessorTests
Assert.True(writer.SucceededAfterRetry);
}
[Trait("Category", TestCategories.Unit)]
[Fact]
public async Task RunPollingLoopAsync_WithLongBackfillInterval_DoesNotThrowConcurrencyErrors()
{
var changeSource = new FakeChangeSource(Array.Empty<GraphChangeEvent>());
var backfillSource = new FakeChangeSource(Array.Empty<GraphChangeEvent>());
var store = new InMemoryIdempotencyStore();
var writer = new FlakyWriter(failFirst: false);
using var metrics = new GraphBackfillMetrics();
var options = Options.Create(new GraphChangeStreamOptions
{
PollInterval = TimeSpan.FromMilliseconds(5),
BackfillInterval = TimeSpan.FromMinutes(5),
RetryBackoff = TimeSpan.FromMilliseconds(5),
MaxRetryAttempts = 1
});
var processor = new GraphChangeStreamProcessor(
changeSource,
backfillSource,
writer,
store,
options,
metrics,
NullLogger<GraphChangeStreamProcessor>.Instance);
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(35));
await processor.RunPollingLoopAsync(cts.Token);
}
private sealed class FakeChangeSource : IGraphChangeEventSource, IGraphBackfillSource
{
private readonly IReadOnlyList<GraphChangeEvent> _events;