feat(infra-postgres): detect explicit transaction control in migrations

Adds MigrationSqlTransactionClassifier to recognize migration SQL that opens
its own transactions (BEGIN/COMMIT/ROLLBACK) so MigrationRunner can skip
wrapping those files in an outer transaction. StartupMigrationHost now surfaces
a MigrationCategory indicator for runtime-aligned bootstrap. Test harness
extended with an explicit-transaction fixture and execution scenario coverage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-13 21:56:27 +03:00
parent 337aa58023
commit a393b6d6e1
12 changed files with 377 additions and 33 deletions

View File

@@ -57,6 +57,11 @@ public static class MigrationCategoryExtensions
return MigrationCategory.Seed;
}
if (name.Contains("rollback", StringComparison.OrdinalIgnoreCase))
{
return MigrationCategory.Release;
}
// Try to parse leading digits
var numericPrefix = new string(name.TakeWhile(char.IsDigit).ToArray());
if (int.TryParse(numericPrefix, out var prefix))

View File

@@ -278,6 +278,37 @@ public sealed class MigrationRunner : IMigrationRunner
_logger.LogInformation("Applying migration {Migration} ({Category}) for {Module}...", migration.Name, migration.Category, ModuleName);
var sw = Stopwatch.StartNew();
var quotedSchema = QuoteIdentifier(SchemaName);
if (MigrationSqlTransactionClassifier.UsesExplicitTransactionControl(migration.Content))
{
try
{
await using (var command = new NpgsqlCommand(migration.Content, connection))
{
command.CommandTimeout = timeoutSeconds;
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
await using var recordTransaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
await RecordMigrationAsync(
connection,
recordTransaction,
quotedSchema,
migration,
sw.ElapsedMilliseconds,
cancellationToken).ConfigureAwait(false);
await recordTransaction.CommitAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Applied migration {Migration} for {Module} in {Duration}ms.", migration.Name, ModuleName, sw.ElapsedMilliseconds);
return sw.ElapsedMilliseconds;
}
catch
{
await ResetTransactionStateAsync(connection, cancellationToken).ConfigureAwait(false);
throw;
}
}
await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
@@ -286,9 +317,8 @@ public sealed class MigrationRunner : IMigrationRunner
// Bind the search_path to the target module schema for this transaction.
// SET LOCAL scopes the change to the current transaction so that unqualified
// table names in migration SQL resolve to the module schema, not public.
var quotedSchemaLocal = QuoteIdentifier(SchemaName);
await using (var searchPathCommand = new NpgsqlCommand(
$"SET LOCAL search_path TO {quotedSchemaLocal}, public", connection, transaction))
$"SET LOCAL search_path TO {quotedSchema}, public", connection, transaction))
{
await searchPathCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
@@ -299,22 +329,13 @@ public sealed class MigrationRunner : IMigrationRunner
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
await using (var record = new NpgsqlCommand(
$"""
INSERT INTO {SchemaName}.schema_migrations (migration_name, category, checksum, duration_ms, applied_by)
VALUES (@name, @category, @checksum, @duration, @applied_by)
ON CONFLICT (migration_name) DO NOTHING;
""",
await RecordMigrationAsync(
connection,
transaction))
{
record.Parameters.AddWithValue("name", migration.Name);
record.Parameters.AddWithValue("category", migration.Category.ToString().ToLowerInvariant());
record.Parameters.AddWithValue("checksum", migration.Checksum);
record.Parameters.AddWithValue("duration", (int)sw.ElapsedMilliseconds);
record.Parameters.AddWithValue("applied_by", Environment.MachineName);
await record.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
transaction,
quotedSchema,
migration,
sw.ElapsedMilliseconds,
cancellationToken).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Applied migration {Migration} for {Module} in {Duration}ms.", migration.Name, ModuleName, sw.ElapsedMilliseconds);
@@ -327,6 +348,45 @@ public sealed class MigrationRunner : IMigrationRunner
}
}
private static async Task RecordMigrationAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
string quotedSchema,
PendingMigration migration,
long durationMs,
CancellationToken cancellationToken)
{
await using var record = new NpgsqlCommand(
$"""
INSERT INTO {quotedSchema}.schema_migrations (migration_name, category, checksum, duration_ms, applied_by)
VALUES (@name, @category, @checksum, @duration, @applied_by)
ON CONFLICT (migration_name) DO NOTHING;
""",
connection,
transaction);
record.Parameters.AddWithValue("name", migration.Name);
record.Parameters.AddWithValue("category", migration.Category.ToString().ToLowerInvariant());
record.Parameters.AddWithValue("checksum", migration.Checksum);
record.Parameters.AddWithValue("duration", (int)durationMs);
record.Parameters.AddWithValue("applied_by", Environment.MachineName);
await record.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task ResetTransactionStateAsync(
NpgsqlConnection connection,
CancellationToken cancellationToken)
{
try
{
await using var resetCommand = new NpgsqlCommand("ROLLBACK;", connection);
await resetCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
catch
{
// Best effort only: the original migration exception should remain the failure signal.
}
}
private async Task EnsureSchemaAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
{
var schemaName = QuoteIdentifier(SchemaName);

View File

@@ -0,0 +1,21 @@
using System.Text.RegularExpressions;
namespace StellaOps.Infrastructure.Postgres.Migrations;
internal static partial class MigrationSqlTransactionClassifier
{
public static bool UsesExplicitTransactionControl(string sql)
{
if (string.IsNullOrWhiteSpace(sql))
{
return false;
}
return ExplicitTransactionControlPattern().IsMatch(sql);
}
[GeneratedRegex(
@"^\s*(BEGIN(?:\s+TRANSACTION)?|START\s+TRANSACTION|COMMIT(?:\s+WORK)?|ROLLBACK(?:\s+WORK)?)\s*;\s*$",
RegexOptions.IgnoreCase | RegexOptions.Multiline | RegexOptions.CultureInvariant)]
private static partial Regex ExplicitTransactionControlPattern();
}

View File

@@ -365,6 +365,40 @@ public abstract class StartupMigrationHost : IHostedService
var sw = Stopwatch.StartNew();
var quotedSchema = QuoteIdentifier(_schemaName);
if (MigrationSqlTransactionClassifier.UsesExplicitTransactionControl(migration.Content))
{
try
{
await using (var migrationCommand = new NpgsqlCommand(migration.Content, connection))
{
migrationCommand.CommandTimeout = _options.MigrationTimeoutSeconds;
await migrationCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
await using var recordTransaction = await connection.BeginTransactionAsync(cancellationToken)
.ConfigureAwait(false);
await RecordMigrationAsync(
connection,
recordTransaction,
quotedSchema,
migration,
sw.ElapsedMilliseconds,
cancellationToken).ConfigureAwait(false);
await recordTransaction.CommitAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Migration: {Migration} completed in {Duration}ms.",
migration.Name, sw.ElapsedMilliseconds);
return;
}
catch
{
await ResetTransactionStateAsync(connection, cancellationToken).ConfigureAwait(false);
throw;
}
}
await using var transaction = await connection.BeginTransactionAsync(cancellationToken)
.ConfigureAwait(false);
@@ -387,23 +421,13 @@ public abstract class StartupMigrationHost : IHostedService
}
// Record migration
await using (var recordCommand = new NpgsqlCommand(
$"""
INSERT INTO {quotedSchema}.schema_migrations
(migration_name, category, checksum, duration_ms, applied_by)
VALUES (@name, @category, @checksum, @duration, @applied_by)
ON CONFLICT (migration_name) DO NOTHING
""",
await RecordMigrationAsync(
connection,
transaction))
{
recordCommand.Parameters.AddWithValue("name", migration.Name);
recordCommand.Parameters.AddWithValue("category", migration.Category.ToString().ToLowerInvariant());
recordCommand.Parameters.AddWithValue("checksum", migration.Checksum);
recordCommand.Parameters.AddWithValue("duration", (int)sw.ElapsedMilliseconds);
recordCommand.Parameters.AddWithValue("applied_by", Environment.MachineName);
await recordCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
transaction,
quotedSchema,
migration,
sw.ElapsedMilliseconds,
cancellationToken).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
@@ -418,6 +442,47 @@ public abstract class StartupMigrationHost : IHostedService
}
}
private static async Task RecordMigrationAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
string quotedSchema,
PendingMigration migration,
long durationMs,
CancellationToken cancellationToken)
{
await using var recordCommand = new NpgsqlCommand(
$"""
INSERT INTO {quotedSchema}.schema_migrations
(migration_name, category, checksum, duration_ms, applied_by)
VALUES (@name, @category, @checksum, @duration, @applied_by)
ON CONFLICT (migration_name) DO NOTHING
""",
connection,
transaction);
recordCommand.Parameters.AddWithValue("name", migration.Name);
recordCommand.Parameters.AddWithValue("category", migration.Category.ToString().ToLowerInvariant());
recordCommand.Parameters.AddWithValue("checksum", migration.Checksum);
recordCommand.Parameters.AddWithValue("duration", (int)durationMs);
recordCommand.Parameters.AddWithValue("applied_by", Environment.MachineName);
await recordCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task ResetTransactionStateAsync(
NpgsqlConnection connection,
CancellationToken cancellationToken)
{
try
{
await using var resetCommand = new NpgsqlCommand("ROLLBACK;", connection);
await resetCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
catch
{
// Best effort: if the connection is not currently inside a transaction,
// there is nothing to reset before the original migration exception surfaces.
}
}
private static long ComputeLockKey(string schemaName)
{
// Use a deterministic hash of the schema name as the lock key

View File

@@ -13,6 +13,7 @@ public partial class MigrationCategoryTests
[InlineData("004_add_audit_columns.sql", MigrationCategory.Startup)]
[InlineData("100_drop_legacy_auth_columns.sql", MigrationCategory.Release)]
[InlineData("101_migrate_user_roles.sql", MigrationCategory.Release)]
[InlineData("007_enable_rls_rollback.sql", MigrationCategory.Release)]
[InlineData("S001_default_admin_role.sql", MigrationCategory.Seed)]
[InlineData("S002_system_permissions.sql", MigrationCategory.Seed)]
[InlineData("DM001_BackfillTenantIds.sql", MigrationCategory.Data)]

View File

@@ -56,6 +56,19 @@ public sealed partial class StartupMigrationHostTests
return result is true;
}
private async Task<int> GetRowCountAsync(string schemaName, string tableName)
{
await using var conn = new NpgsqlConnection(ConnectionString);
await conn.OpenAsync();
await using var cmd = new NpgsqlCommand(
$"SELECT COUNT(*) FROM \"{schemaName}\".\"{tableName}\"",
conn);
var result = await cmd.ExecuteScalarAsync();
return Convert.ToInt32(result);
}
private async Task CorruptChecksumAsync(string schemaName, string migrationName)
{
await using var conn = new NpgsqlConnection(ConnectionString);

View File

@@ -1,4 +1,5 @@
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.Infrastructure.Postgres.Migrations;
using Xunit;
@@ -52,6 +53,53 @@ public sealed partial class StartupMigrationHostTests
finalCount.Should().Be(initialCount);
}
[Fact]
public async Task StartAsync_WithExplicitTransactionMigration_AppliesAndRecordsItAsync()
{
// Arrange
var schemaName = $"test_{Guid.NewGuid():N}"[..20];
var options = new StartupMigrationOptions { FailOnPendingReleaseMigrations = false };
var host = CreateTestHost(schemaName, options: options);
// Act
await host.StartAsync(CancellationToken.None);
// Assert
var migrations = await GetAppliedMigrationNamesAsync(schemaName);
migrations.Should().Contain("003_explicit_transaction.sql");
var markerTableExists = await TableExistsAsync(schemaName, "explicit_transaction_markers");
markerTableExists.Should().BeTrue();
var markerCount = await GetRowCountAsync(schemaName, "explicit_transaction_markers");
markerCount.Should().Be(1);
}
[Fact]
public async Task MigrationRunner_RunFromAssemblyAsync_WithExplicitTransactionMigration_AppliesAndRecordsItAsync()
{
// Arrange
var schemaName = $"test_{Guid.NewGuid():N}"[..20];
var runner = new MigrationRunner(
ConnectionString,
schemaName,
"TestRunner",
NullLogger<MigrationRunner>.Instance);
// Act
var result = await runner.RunFromAssemblyAsync(
typeof(StartupMigrationHostTests).Assembly,
resourcePrefix: "TestMigrations",
options: new MigrationRunOptions());
// Assert
result.Success.Should().BeTrue();
var migrations = await GetAppliedMigrationNamesAsync(schemaName);
migrations.Should().Contain("003_explicit_transaction.sql");
var markerCount = await GetRowCountAsync(schemaName, "explicit_transaction_markers");
markerCount.Should().Be(1);
}
[Fact]
public async Task StartAsync_CreatesSchemaAndMigrationsTableAsync()
{

View File

@@ -0,0 +1,12 @@
BEGIN;
CREATE TABLE IF NOT EXISTS explicit_transaction_markers (
id INT PRIMARY KEY,
note TEXT NOT NULL
);
INSERT INTO explicit_transaction_markers (id, note)
VALUES (1, 'applied')
ON CONFLICT (id) DO NOTHING;
COMMIT;