Bind startup migrations to module schema search path
This commit is contained in:
@@ -91,6 +91,7 @@ public abstract class StartupMigrationHost : IHostedService
|
||||
{
|
||||
// Step 2: Ensure schema and migrations table exist
|
||||
await EnsureSchemaAsync(connection, cancellationToken).ConfigureAwait(false);
|
||||
await SetSearchPathAsync(connection, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureMigrationsTableAsync(connection, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Step 3: Load and categorize migrations
|
||||
@@ -237,17 +238,28 @@ public abstract class StartupMigrationHost : IHostedService
|
||||
|
||||
private async Task EnsureSchemaAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
|
||||
{
|
||||
var quotedSchema = QuoteIdentifier(_schemaName);
|
||||
await using var command = new NpgsqlCommand(
|
||||
$"CREATE SCHEMA IF NOT EXISTS {_schemaName}",
|
||||
$"CREATE SCHEMA IF NOT EXISTS {quotedSchema}",
|
||||
connection);
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task SetSearchPathAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
|
||||
{
|
||||
var quotedSchema = QuoteIdentifier(_schemaName);
|
||||
await using var command = new NpgsqlCommand(
|
||||
$"SET search_path TO {quotedSchema}, public",
|
||||
connection);
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task EnsureMigrationsTableAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
|
||||
{
|
||||
var quotedSchema = QuoteIdentifier(_schemaName);
|
||||
await using var command = new NpgsqlCommand(
|
||||
$"""
|
||||
CREATE TABLE IF NOT EXISTS {_schemaName}.schema_migrations (
|
||||
CREATE TABLE IF NOT EXISTS {quotedSchema}.schema_migrations (
|
||||
migration_name TEXT PRIMARY KEY,
|
||||
category TEXT NOT NULL DEFAULT 'startup',
|
||||
checksum TEXT NOT NULL,
|
||||
@@ -258,7 +270,7 @@ public abstract class StartupMigrationHost : IHostedService
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_schema_migrations_applied_at
|
||||
ON {_schemaName}.schema_migrations(applied_at DESC);
|
||||
ON {quotedSchema}.schema_migrations(applied_at DESC);
|
||||
""",
|
||||
connection);
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -269,9 +281,10 @@ public abstract class StartupMigrationHost : IHostedService
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var result = new Dictionary<string, AppliedMigration>(StringComparer.Ordinal);
|
||||
var quotedSchema = QuoteIdentifier(_schemaName);
|
||||
|
||||
await using var command = new NpgsqlCommand(
|
||||
$"SELECT migration_name, category, checksum, applied_at FROM {_schemaName}.schema_migrations",
|
||||
$"SELECT migration_name, category, checksum, applied_at FROM {quotedSchema}.schema_migrations",
|
||||
connection);
|
||||
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -350,6 +363,7 @@ public abstract class StartupMigrationHost : IHostedService
|
||||
migration.Name, migration.Category);
|
||||
|
||||
var sw = Stopwatch.StartNew();
|
||||
var quotedSchema = QuoteIdentifier(_schemaName);
|
||||
|
||||
await using var transaction = await connection.BeginTransactionAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
@@ -366,7 +380,7 @@ public abstract class StartupMigrationHost : IHostedService
|
||||
// Record migration
|
||||
await using (var recordCommand = new NpgsqlCommand(
|
||||
$"""
|
||||
INSERT INTO {_schemaName}.schema_migrations
|
||||
INSERT INTO {quotedSchema}.schema_migrations
|
||||
(migration_name, category, checksum, duration_ms, applied_by)
|
||||
VALUES (@name, @category, @checksum, @duration, @applied_by)
|
||||
ON CONFLICT (migration_name) DO NOTHING
|
||||
@@ -434,6 +448,12 @@ public abstract class StartupMigrationHost : IHostedService
|
||||
return lastSlash >= 0 ? resourceName[(lastSlash + 1)..] : resourceName;
|
||||
}
|
||||
|
||||
private static string QuoteIdentifier(string identifier)
|
||||
{
|
||||
var escaped = identifier.Replace("\"", "\"\"", StringComparison.Ordinal);
|
||||
return $"\"{escaped}\"";
|
||||
}
|
||||
|
||||
private record AppliedMigration(string Name, string Category, string Checksum, DateTimeOffset AppliedAt);
|
||||
private record PendingMigration(string Name, string ResourceName, MigrationCategory Category, string Checksum, string Content);
|
||||
}
|
||||
|
||||
@@ -33,6 +33,29 @@ public sealed partial class StartupMigrationHostTests
|
||||
return result is true;
|
||||
}
|
||||
|
||||
private async Task<bool> ColumnExistsAsync(string schemaName, string tableName, string columnName)
|
||||
{
|
||||
await using var conn = new NpgsqlConnection(ConnectionString);
|
||||
await conn.OpenAsync();
|
||||
|
||||
await using var cmd = new NpgsqlCommand(
|
||||
"""
|
||||
SELECT EXISTS(
|
||||
SELECT 1
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = @schema
|
||||
AND table_name = @table
|
||||
AND column_name = @column)
|
||||
""",
|
||||
conn);
|
||||
cmd.Parameters.AddWithValue("schema", schemaName);
|
||||
cmd.Parameters.AddWithValue("table", tableName);
|
||||
cmd.Parameters.AddWithValue("column", columnName);
|
||||
|
||||
var result = await cmd.ExecuteScalarAsync();
|
||||
return result is true;
|
||||
}
|
||||
|
||||
private async Task CorruptChecksumAsync(string schemaName, string migrationName)
|
||||
{
|
||||
await using var conn = new NpgsqlConnection(ConnectionString);
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
using FluentAssertions;
|
||||
using Npgsql;
|
||||
using StellaOps.Infrastructure.Postgres.Migrations;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Infrastructure.Postgres.Tests.Migrations;
|
||||
|
||||
public sealed partial class StartupMigrationHostTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task StartAsync_WithPublicNameCollision_AppliesMigrationsInTargetSchema()
|
||||
{
|
||||
// Arrange
|
||||
var schemaName = $"test_{Guid.NewGuid():N}"[..20];
|
||||
var options = new StartupMigrationOptions { FailOnPendingReleaseMigrations = false };
|
||||
|
||||
await using (var conn = new NpgsqlConnection(ConnectionString))
|
||||
{
|
||||
await conn.OpenAsync();
|
||||
await using var seed = new NpgsqlCommand(
|
||||
"""
|
||||
DROP TABLE IF EXISTS public.test_table;
|
||||
CREATE TABLE public.test_table (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
""",
|
||||
conn);
|
||||
await seed.ExecuteNonQueryAsync();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var host = CreateTestHost(schemaName, options: options);
|
||||
|
||||
// Act
|
||||
await host.StartAsync(CancellationToken.None);
|
||||
|
||||
// Assert
|
||||
(await TableExistsAsync(schemaName, "test_table")).Should().BeTrue();
|
||||
(await ColumnExistsAsync(schemaName, "test_table", "description")).Should().BeTrue();
|
||||
(await ColumnExistsAsync("public", "test_table", "description")).Should().BeFalse();
|
||||
}
|
||||
finally
|
||||
{
|
||||
await using var conn = new NpgsqlConnection(ConnectionString);
|
||||
await conn.OpenAsync();
|
||||
await using var cleanup = new NpgsqlCommand("DROP TABLE IF EXISTS public.test_table;", conn);
|
||||
await cleanup.ExecuteNonQueryAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user