diff --git a/src/Graph/StellaOps.Graph.Api/Services/GraphSavedViewsMigrationHostedService.cs b/src/Graph/StellaOps.Graph.Api/Services/GraphSavedViewsMigrationHostedService.cs index 283d59274..704f27766 100644 --- a/src/Graph/StellaOps.Graph.Api/Services/GraphSavedViewsMigrationHostedService.cs +++ b/src/Graph/StellaOps.Graph.Api/Services/GraphSavedViewsMigrationHostedService.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using StellaOps.Graph.Indexer.Persistence.Extensions; using StellaOps.Infrastructure.Postgres.Migrations; using StellaOps.Infrastructure.Postgres.Options; @@ -11,7 +12,7 @@ public sealed class GraphSavedViewsMigrationHostedService : IHostedService private readonly IOptions _options; private readonly ILoggerFactory _loggerFactory; private readonly IHostApplicationLifetime _lifetime; - private StartupMigrationHost? _inner; + private readonly List _hosts = []; public GraphSavedViewsMigrationHostedService( IOptions options, @@ -31,31 +32,62 @@ public sealed class GraphSavedViewsMigrationHostedService : IHostedService return Task.CompletedTask; } - _inner = new GraphSavedViewsStartupMigrationHost( + var schemaName = string.IsNullOrWhiteSpace(options.SchemaName) + ? PostgresGraphSavedViewStore.DefaultSchemaName + : options.SchemaName.Trim(); + + _hosts.Clear(); + _hosts.Add(new GraphStartupMigrationHost( options.ConnectionString, - string.IsNullOrWhiteSpace(options.SchemaName) - ? PostgresGraphSavedViewStore.DefaultSchemaName - : options.SchemaName.Trim(), + schemaName, + "Graph.Persistence", + typeof(GraphIndexerPersistenceExtensions).Assembly, + _loggerFactory.CreateLogger("Migration.Graph.Persistence"), + _lifetime)); + _hosts.Add(new GraphStartupMigrationHost( + options.ConnectionString, + schemaName, + "Graph.Api", + typeof(Program).Assembly, _loggerFactory.CreateLogger("Migration.Graph.Api"), - _lifetime); - return _inner.StartAsync(cancellationToken); + _lifetime)); + + return StartHostsAsync(cancellationToken); } public Task StopAsync(CancellationToken cancellationToken) - => _inner?.StopAsync(cancellationToken) ?? Task.CompletedTask; + => StopHostsAsync(cancellationToken); - private sealed class GraphSavedViewsStartupMigrationHost : StartupMigrationHost + private async Task StartHostsAsync(CancellationToken cancellationToken) { - public GraphSavedViewsStartupMigrationHost( + foreach (var host in _hosts) + { + await host.StartAsync(cancellationToken).ConfigureAwait(false); + } + } + + private async Task StopHostsAsync(CancellationToken cancellationToken) + { + foreach (var host in _hosts) + { + await host.StopAsync(cancellationToken).ConfigureAwait(false); + } + } + + private sealed class GraphStartupMigrationHost : StartupMigrationHost + { + public GraphStartupMigrationHost( string connectionString, string schemaName, + string moduleName, + System.Reflection.Assembly migrationsAssembly, ILogger logger, IHostApplicationLifetime lifetime) : base( connectionString, schemaName, - "Graph.Api", - typeof(Program).Assembly, + moduleName, + migrationsAssembly, logger, lifetime) { diff --git a/src/Graph/StellaOps.Graph.Indexer/Incremental/GraphChangeStreamProcessor.cs b/src/Graph/StellaOps.Graph.Indexer/Incremental/GraphChangeStreamProcessor.cs index c93ad7958..90671f668 100644 --- a/src/Graph/StellaOps.Graph.Indexer/Incremental/GraphChangeStreamProcessor.cs +++ b/src/Graph/StellaOps.Graph.Indexer/Incremental/GraphChangeStreamProcessor.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StellaOps.Graph.Indexer.Ingestion.Sbom; +using System.Diagnostics; using System.Globalization; using System.Linq; using System.Text.Json.Nodes; @@ -37,30 +38,30 @@ public sealed class GraphChangeStreamProcessor : BackgroundService _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + protected override Task ExecuteAsync(CancellationToken stoppingToken) + => RunPollingLoopAsync(stoppingToken); + + internal async Task RunPollingLoopAsync(CancellationToken stoppingToken) { using var pollTimer = new PeriodicTimer(_options.PollInterval); - using var backfillTimer = new PeriodicTimer(_options.BackfillInterval); + var backfillStopwatch = Stopwatch.StartNew(); - while (!stoppingToken.IsCancellationRequested) + try { - var pollTask = pollTimer.WaitForNextTickAsync(stoppingToken).AsTask(); - var backfillTask = backfillTimer.WaitForNextTickAsync(stoppingToken).AsTask(); - - var completed = await Task.WhenAny(pollTask, backfillTask).ConfigureAwait(false); - if (completed.IsCanceled || stoppingToken.IsCancellationRequested) - { - break; - } - - if (completed == pollTask) + while (await pollTimer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false)) { await ApplyStreamAsync(isBackfill: false, stoppingToken).ConfigureAwait(false); + + if (_options.BackfillInterval <= TimeSpan.Zero || backfillStopwatch.Elapsed >= _options.BackfillInterval) + { + await ApplyStreamAsync(isBackfill: true, stoppingToken).ConfigureAwait(false); + backfillStopwatch.Restart(); + } } - else - { - await ApplyStreamAsync(isBackfill: true, stoppingToken).ConfigureAwait(false); - } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // Graceful BackgroundService shutdown. } } diff --git a/src/Graph/__Libraries/StellaOps.Graph.Indexer.Persistence/Migrations/002_efcore_repository_tables.sql b/src/Graph/__Libraries/StellaOps.Graph.Indexer.Persistence/Migrations/002_efcore_repository_tables.sql index 268746803..3e72c2865 100644 --- a/src/Graph/__Libraries/StellaOps.Graph.Indexer.Persistence/Migrations/002_efcore_repository_tables.sql +++ b/src/Graph/__Libraries/StellaOps.Graph.Indexer.Persistence/Migrations/002_efcore_repository_tables.sql @@ -11,11 +11,47 @@ CREATE SCHEMA IF NOT EXISTS graph; CREATE TABLE IF NOT EXISTS graph.graph_nodes ( id TEXT PRIMARY KEY, - batch_id TEXT NOT NULL, - document_json JSONB NOT NULL, - written_at TIMESTAMPTZ NOT NULL + batch_id TEXT, + document_json JSONB NOT NULL DEFAULT '{}'::jsonb, + written_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); +ALTER TABLE IF EXISTS graph.graph_nodes + ADD COLUMN IF NOT EXISTS batch_id TEXT, + ADD COLUMN IF NOT EXISTS document_json JSONB NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS written_at TIMESTAMPTZ NOT NULL DEFAULT NOW(); + +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_schema = 'graph' + AND table_name = 'graph_nodes' + AND column_name = 'data' + ) THEN + EXECUTE $sql$ + UPDATE graph.graph_nodes + SET document_json = COALESCE(document_json, data, '{}'::jsonb), + written_at = COALESCE(written_at, created_at, NOW()), + batch_id = COALESCE(batch_id, tenant_id || ':' || id) + WHERE document_json = '{}'::jsonb + OR batch_id IS NULL + OR written_at IS NULL + $sql$; + ELSE + EXECUTE $sql$ + UPDATE graph.graph_nodes + SET document_json = COALESCE(document_json, '{}'::jsonb), + written_at = COALESCE(written_at, NOW()), + batch_id = COALESCE(batch_id, id) + WHERE document_json = '{}'::jsonb + OR batch_id IS NULL + OR written_at IS NULL + $sql$; + END IF; +END $$; + CREATE INDEX IF NOT EXISTS idx_graph_nodes_batch_id ON graph.graph_nodes (batch_id); CREATE INDEX IF NOT EXISTS idx_graph_nodes_written_at ON graph.graph_nodes (written_at); @@ -25,13 +61,49 @@ CREATE INDEX IF NOT EXISTS idx_graph_nodes_written_at ON graph.graph_nodes (writ CREATE TABLE IF NOT EXISTS graph.graph_edges ( id TEXT PRIMARY KEY, - batch_id TEXT NOT NULL, + batch_id TEXT, source_id TEXT NOT NULL, target_id TEXT NOT NULL, - document_json JSONB NOT NULL, - written_at TIMESTAMPTZ NOT NULL + document_json JSONB NOT NULL DEFAULT '{}'::jsonb, + written_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); +ALTER TABLE IF EXISTS graph.graph_edges + ADD COLUMN IF NOT EXISTS batch_id TEXT, + ADD COLUMN IF NOT EXISTS document_json JSONB NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS written_at TIMESTAMPTZ NOT NULL DEFAULT NOW(); + +DO $$ +BEGIN + IF EXISTS ( + SELECT 1 + FROM information_schema.columns + WHERE table_schema = 'graph' + AND table_name = 'graph_edges' + AND column_name = 'data' + ) THEN + EXECUTE $sql$ + UPDATE graph.graph_edges + SET document_json = COALESCE(document_json, data, '{}'::jsonb), + written_at = COALESCE(written_at, created_at, NOW()), + batch_id = COALESCE(batch_id, tenant_id || ':' || id) + WHERE document_json = '{}'::jsonb + OR batch_id IS NULL + OR written_at IS NULL + $sql$; + ELSE + EXECUTE $sql$ + UPDATE graph.graph_edges + SET document_json = COALESCE(document_json, '{}'::jsonb), + written_at = COALESCE(written_at, NOW()), + batch_id = COALESCE(batch_id, id) + WHERE document_json = '{}'::jsonb + OR batch_id IS NULL + OR written_at IS NULL + $sql$; + END IF; +END $$; + CREATE INDEX IF NOT EXISTS idx_graph_edges_batch_id ON graph.graph_edges (batch_id); CREATE INDEX IF NOT EXISTS idx_graph_edges_source_id ON graph.graph_edges (source_id); CREATE INDEX IF NOT EXISTS idx_graph_edges_target_id ON graph.graph_edges (target_id); diff --git a/src/Graph/__Tests/StellaOps.Graph.Indexer.Tests/GraphChangeStreamProcessorTests.cs b/src/Graph/__Tests/StellaOps.Graph.Indexer.Tests/GraphChangeStreamProcessorTests.cs index 6c1b6e6b6..a1496d792 100644 --- a/src/Graph/__Tests/StellaOps.Graph.Indexer.Tests/GraphChangeStreamProcessorTests.cs +++ b/src/Graph/__Tests/StellaOps.Graph.Indexer.Tests/GraphChangeStreamProcessorTests.cs @@ -14,7 +14,7 @@ namespace StellaOps.Graph.Indexer.Tests; public sealed class GraphChangeStreamProcessorTests { [Trait("Category", TestCategories.Unit)] - [Fact] + [Fact] public async Task ApplyStreamAsync_SkipsDuplicates_AndRetries() { var tenant = "tenant-a"; @@ -55,6 +55,37 @@ public sealed class GraphChangeStreamProcessorTests Assert.True(writer.SucceededAfterRetry); } + [Trait("Category", TestCategories.Unit)] + [Fact] + public async Task RunPollingLoopAsync_WithLongBackfillInterval_DoesNotThrowConcurrencyErrors() + { + var changeSource = new FakeChangeSource(Array.Empty()); + var backfillSource = new FakeChangeSource(Array.Empty()); + var store = new InMemoryIdempotencyStore(); + var writer = new FlakyWriter(failFirst: false); + using var metrics = new GraphBackfillMetrics(); + + var options = Options.Create(new GraphChangeStreamOptions + { + PollInterval = TimeSpan.FromMilliseconds(5), + BackfillInterval = TimeSpan.FromMinutes(5), + RetryBackoff = TimeSpan.FromMilliseconds(5), + MaxRetryAttempts = 1 + }); + + var processor = new GraphChangeStreamProcessor( + changeSource, + backfillSource, + writer, + store, + options, + metrics, + NullLogger.Instance); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(35)); + await processor.RunPollingLoopAsync(cts.Token); + } + private sealed class FakeChangeSource : IGraphChangeEventSource, IGraphBackfillSource { private readonly IReadOnlyList _events;