Harden remaining runtime transport lifecycles

This commit is contained in:
master
2026-04-06 00:24:16 +03:00
parent 751546084e
commit fc798a1573
29 changed files with 311 additions and 107 deletions

23
src/Workflow/AGENTS.md Normal file
View File

@@ -0,0 +1,23 @@
# AGENTS - Workflow Module
## Working Directory
- `src/Workflow/**` (workflow abstractions, engine, storage backends, signaling, and tests).
## Required Reading
- `docs/README.md`
- `docs/07_HIGH_LEVEL_ARCHITECTURE.md`
- `docs/modules/platform/architecture-overview.md`
- `docs/modules/release-orchestrator/modules/workflow-engine.md`
## Engineering Rules
- Preserve deterministic workflow state transitions, event ordering, and projection behavior across storage backends.
- Keep PostgreSQL, signaling, and engine changes compatible at the contract level unless the sprint explicitly scopes a cross-backend divergence.
- Runtime storage code must follow the shared transport attribution rules: stable client identity, pooled connections, and no anonymous long-lived transports.
- Avoid cross-module edits unless the active sprint explicitly allows them.
## Testing & Verification
- Workflow backend/storage changes must run the targeted backend test project, not just the broader module solution.
- Prefer focused integration coverage for persistence, signaling, wake-outbox, and projection flows when touching storage or transport behavior.
## Sprint Discipline
- Track task state in the active sprint file and record blockers or contract changes in `Decisions & Risks`.

View File

@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("StellaOps.Workflow.DataStore.PostgreSQL.Tests")]

View File

@@ -7,6 +7,11 @@ public sealed class PostgresWorkflowBackendOptions
public const string SectionName = $"{WorkflowBackendOptions.SectionName}:Postgres";
public string ConnectionStringName { get; set; } = "WorkflowPostgres";
public string? ApplicationName { get; set; }
public bool Pooling { get; set; } = true;
public int MinPoolSize { get; set; } = 1;
public int MaxPoolSize { get; set; } = 100;
public int? ConnectionIdleLifetimeSeconds { get; set; } = 300;
public string SchemaName { get; set; } = "srd_wfklw";
public string RuntimeStatesTableName { get; set; } = "wf_runtime_states";
public string HostedJobLocksTableName { get; set; } = "wf_host_locks";

View File

@@ -18,6 +18,7 @@ public sealed class PostgresWorkflowDatabase(
PostgresWorkflowMutationSessionAccessor sessionAccessor,
IWorkflowMutationScopeAccessor scopeAccessor)
{
private const string DefaultApplicationName = "stellaops-workflow";
private readonly PostgresWorkflowBackendOptions postgres = options.Value;
internal PostgresWorkflowBackendOptions Options => postgres;
@@ -86,14 +87,36 @@ public sealed class PostgresWorkflowDatabase(
}
internal async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken = default)
{
var connection = new NpgsqlConnection(BuildConnectionString());
await connection.OpenAsync(cancellationToken);
return connection;
}
internal string BuildConnectionString()
{
var connectionString = configuration.GetConnectionString(postgres.ConnectionStringName)
?? throw new InvalidOperationException(
$"PostgreSQL workflow backend requires connection string '{postgres.ConnectionStringName}'.");
var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync(cancellationToken);
return connection;
var normalizedMinPoolSize = Math.Max(postgres.MinPoolSize, 0);
var normalizedMaxPoolSize = Math.Max(postgres.MaxPoolSize, normalizedMinPoolSize);
var builder = new NpgsqlConnectionStringBuilder(connectionString)
{
ApplicationName = string.IsNullOrWhiteSpace(postgres.ApplicationName)
? DefaultApplicationName
: postgres.ApplicationName.Trim(),
Pooling = postgres.Pooling,
MinPoolSize = normalizedMinPoolSize,
MaxPoolSize = normalizedMaxPoolSize,
};
if (postgres.ConnectionIdleLifetimeSeconds.HasValue && postgres.ConnectionIdleLifetimeSeconds.Value > 0)
{
builder.ConnectionIdleLifetime = postgres.ConnectionIdleLifetimeSeconds.Value;
}
return builder.ToString();
}
}

View File

@@ -0,0 +1,78 @@
using System.Collections.Generic;
using StellaOps.Workflow.DataStore.PostgreSQL;
using StellaOps.Workflow.Engine.Services;
using FluentAssertions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Npgsql;
using NUnit.Framework;
namespace StellaOps.Workflow.DataStore.PostgreSQL.Tests;
[TestFixture]
public class PostgresWorkflowDatabaseTests
{
[Test]
public void BuildConnectionString_WhenApplicationNameIsOmitted_UsesWorkflowDefaults()
{
var options = new PostgresWorkflowBackendOptions
{
ConnectionStringName = "WorkflowPostgres",
MinPoolSize = 2,
MaxPoolSize = 9,
ConnectionIdleLifetimeSeconds = 123,
};
var connectionString = CreateDatabase(options).BuildConnectionString();
var builder = new NpgsqlConnectionStringBuilder(connectionString);
builder.ApplicationName.Should().Be("stellaops-workflow");
builder.Pooling.Should().BeTrue();
builder.MinPoolSize.Should().Be(2);
builder.MaxPoolSize.Should().Be(9);
builder.ConnectionIdleLifetime.Should().Be(123);
}
[Test]
public void BuildConnectionString_WhenExplicitSettingsAreProvided_AppliesNormalizedValues()
{
var options = new PostgresWorkflowBackendOptions
{
ConnectionStringName = "WorkflowPostgres",
ApplicationName = "workflow-projection-tests",
Pooling = false,
MinPoolSize = 8,
MaxPoolSize = 3,
};
var connectionString = CreateDatabase(options).BuildConnectionString();
var builder = new NpgsqlConnectionStringBuilder(connectionString);
builder.ApplicationName.Should().Be("workflow-projection-tests");
builder.Pooling.Should().BeFalse();
builder.MinPoolSize.Should().Be(8);
builder.MaxPoolSize.Should().Be(8);
}
private static PostgresWorkflowDatabase CreateDatabase(PostgresWorkflowBackendOptions options)
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
[$"ConnectionStrings:{options.ConnectionStringName}"] =
"Host=localhost;Database=workflow;Username=workflow;Password=workflow",
})
.Build();
return new PostgresWorkflowDatabase(
configuration,
Options.Create(options),
new PostgresWorkflowMutationSessionAccessor(),
new WorkflowMutationScopeAccessor());
}
}