Add StellaOps.Workflow engine: 14 libraries, WebService, 8 test projects

Extract product-agnostic workflow engine from Ablera.Serdica.Workflow into
standalone StellaOps.Workflow.* libraries targeting net10.0.

Libraries (14):
- Contracts, Abstractions (compiler, decompiler, expression runtime)
- Engine (execution, signaling, scheduling, projections, hosted services)
- ElkSharp (generic graph layout algorithm)
- Renderer.ElkSharp, Renderer.ElkJs, Renderer.Msagl, Renderer.Svg
- Signaling.Redis, Signaling.OracleAq
- DataStore.MongoDB, DataStore.PostgreSQL, DataStore.Oracle

WebService: ASP.NET Core Minimal API with 22 endpoints

Tests (8 projects, 109 tests pass):
- Engine.Tests (105 pass), WebService.Tests (4 E2E pass)
- Renderer.Tests, DataStore.MongoDB/Oracle/PostgreSQL.Tests
- Signaling.Redis.Tests, IntegrationTests.Shared

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-03-20 19:14:44 +02:00
parent e56f9a114a
commit f5b5f24d95
422 changed files with 85428 additions and 0 deletions

View File

@@ -0,0 +1 @@
global using StellaOps.Workflow.IntegrationTests.Shared.Performance;

View File

@@ -0,0 +1,81 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Workflow.Engine.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace StellaOps.Workflow.IntegrationTests.Shared.Performance;
internal static class WorkflowEnginePerformanceSupport
{
public static async Task<IAsyncDisposable> StartHostedServicesAsync(
IServiceProvider provider,
CancellationToken cancellationToken = default)
{
var services = provider.GetServices<IHostedService>().ToArray();
foreach (var service in services)
{
await service.StartAsync(cancellationToken);
}
return new HostedServicesHandle(services);
}
public static async Task<T> WithRuntimeServiceAsync<T>(
IServiceProvider provider,
Func<WorkflowRuntimeService, Task<T>> action)
{
using var scope = provider.CreateScope();
var runtimeService = scope.ServiceProvider.GetRequiredService<WorkflowRuntimeService>();
return await action(runtimeService);
}
public static async Task WithRuntimeServiceAsync(
IServiceProvider provider,
Func<WorkflowRuntimeService, Task> action)
{
using var scope = provider.CreateScope();
var runtimeService = scope.ServiceProvider.GetRequiredService<WorkflowRuntimeService>();
await action(runtimeService);
}
public static async Task<IReadOnlyList<TResult>> RunConcurrentAsync<TInput, TResult>(
IEnumerable<TInput> items,
int concurrency,
Func<TInput, Task<TResult>> action)
{
ArgumentOutOfRangeException.ThrowIfLessThan(concurrency, 1);
using var semaphore = new SemaphoreSlim(concurrency);
var tasks = items.Select(async item =>
{
await semaphore.WaitAsync();
try
{
return await action(item);
}
finally
{
semaphore.Release();
}
});
return await Task.WhenAll(tasks);
}
private sealed class HostedServicesHandle(IReadOnlyList<IHostedService> services) : IAsyncDisposable
{
public async ValueTask DisposeAsync()
{
for (var index = services.Count - 1; index >= 0; index--)
{
await services[index].StopAsync(CancellationToken.None);
}
}
}
}

View File

@@ -0,0 +1,483 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
using NUnit.Framework;
namespace StellaOps.Workflow.IntegrationTests.Shared.Performance;
public static class WorkflowPerformanceCategories
{
public const string Latency = "WorkflowPerfLatency";
public const string Throughput = "WorkflowPerfThroughput";
public const string Smoke = "WorkflowPerfSmoke";
public const string Nightly = "WorkflowPerfNightly";
public const string Soak = "WorkflowPerfSoak";
public const string Capacity = "WorkflowPerfCapacity";
public const string Comparison = "WorkflowPerfComparison";
}
public sealed record WorkflowPerformanceRunResult
{
public required string ScenarioName { get; init; }
public required string Tier { get; init; }
public required string EnvironmentName { get; init; }
public required DateTime StartedAtUtc { get; init; }
public required DateTime CompletedAtUtc { get; init; }
public required int OperationCount { get; init; }
public required int Concurrency { get; init; }
public required WorkflowPerformanceCounters Counters { get; init; }
public required WorkflowPerformanceResourceSnapshot ResourceSnapshot { get; init; }
public required Dictionary<string, string> Metadata { get; init; }
public WorkflowPerformanceLatencySummary? LatencySummary { get; init; }
public Dictionary<string, WorkflowPerformanceLatencySummary>? PhaseLatencySummaries { get; init; }
public WorkflowPerformanceBaselineComparison? BaselineComparison { get; init; }
public WorkflowPerformanceBackendMetrics? BackendMetrics { get; init; }
public double DurationMilliseconds =>
(CompletedAtUtc - StartedAtUtc).TotalMilliseconds;
public double ThroughputPerSecond =>
DurationMilliseconds <= 0
? 0
: OperationCount / (DurationMilliseconds / 1000d);
}
public sealed record WorkflowPerformanceCounters
{
public int WorkflowsStarted { get; init; }
public int TasksActivated { get; init; }
public int TasksCompleted { get; init; }
public int SignalsPublished { get; init; }
public int SignalsProcessed { get; init; }
public int SignalsIgnored { get; init; }
public int DeadLetteredSignals { get; init; }
public int RuntimeConflicts { get; init; }
public int Failures { get; init; }
public int StuckInstances { get; init; }
}
public sealed record WorkflowPerformanceLatencySummary
{
public required int SampleCount { get; init; }
public required double AverageMilliseconds { get; init; }
public required double P50Milliseconds { get; init; }
public required double P95Milliseconds { get; init; }
public required double P99Milliseconds { get; init; }
public required double MaxMilliseconds { get; init; }
public static WorkflowPerformanceLatencySummary? FromSamples(IEnumerable<TimeSpan> samples)
{
var ordered = samples
.Select(sample => sample.TotalMilliseconds)
.OrderBy(value => value)
.ToArray();
if (ordered.Length == 0)
{
return null;
}
return new WorkflowPerformanceLatencySummary
{
SampleCount = ordered.Length,
AverageMilliseconds = ordered.Average(),
P50Milliseconds = Percentile(ordered, 0.50),
P95Milliseconds = Percentile(ordered, 0.95),
P99Milliseconds = Percentile(ordered, 0.99),
MaxMilliseconds = ordered[^1],
};
}
private static double Percentile(double[] ordered, double percentile)
{
if (ordered.Length == 1)
{
return ordered[0];
}
var position = (ordered.Length - 1) * percentile;
var lowerIndex = (int)Math.Floor(position);
var upperIndex = (int)Math.Ceiling(position);
if (lowerIndex == upperIndex)
{
return ordered[lowerIndex];
}
var weight = position - lowerIndex;
return ordered[lowerIndex] + ((ordered[upperIndex] - ordered[lowerIndex]) * weight);
}
}
public sealed record WorkflowPerformanceResourceSnapshot
{
public required long WorkingSetBytes { get; init; }
public required long PrivateMemoryBytes { get; init; }
public required string MachineName { get; init; }
public required string FrameworkDescription { get; init; }
public required string OsDescription { get; init; }
public static WorkflowPerformanceResourceSnapshot CaptureCurrent()
{
var process = Process.GetCurrentProcess();
return new WorkflowPerformanceResourceSnapshot
{
WorkingSetBytes = process.WorkingSet64,
PrivateMemoryBytes = process.PrivateMemorySize64,
MachineName = Environment.MachineName,
FrameworkDescription = RuntimeInformation.FrameworkDescription,
OsDescription = RuntimeInformation.OSDescription,
};
}
}
public sealed record WorkflowPerformanceBaselineComparison
{
public required string Status { get; init; }
public string? BaselineJsonPath { get; init; }
public double? ThroughputDeltaPercent { get; init; }
public double? AverageLatencyDeltaPercent { get; init; }
public double? P95LatencyDeltaPercent { get; init; }
public double? MaxLatencyDeltaPercent { get; init; }
public static WorkflowPerformanceBaselineComparison Missing()
{
return new WorkflowPerformanceBaselineComparison
{
Status = "Missing",
};
}
public static WorkflowPerformanceBaselineComparison Compare(
string baselineJsonPath,
WorkflowPerformanceRunResult baseline,
WorkflowPerformanceRunResult current)
{
return new WorkflowPerformanceBaselineComparison
{
Status = "Compared",
BaselineJsonPath = baselineJsonPath,
ThroughputDeltaPercent = CalculateDeltaPercent(baseline.ThroughputPerSecond, current.ThroughputPerSecond),
AverageLatencyDeltaPercent = CalculateDeltaPercent(
baseline.LatencySummary?.AverageMilliseconds,
current.LatencySummary?.AverageMilliseconds),
P95LatencyDeltaPercent = CalculateDeltaPercent(
baseline.LatencySummary?.P95Milliseconds,
current.LatencySummary?.P95Milliseconds),
MaxLatencyDeltaPercent = CalculateDeltaPercent(
baseline.LatencySummary?.MaxMilliseconds,
current.LatencySummary?.MaxMilliseconds),
};
}
private static double? CalculateDeltaPercent(double? baseline, double? current)
{
if (!baseline.HasValue || !current.HasValue)
{
return null;
}
if (Math.Abs(baseline.Value) < 0.0001d)
{
return null;
}
return ((current.Value - baseline.Value) / baseline.Value) * 100d;
}
}
public sealed record WorkflowPerformanceBackendMetrics
{
public required string BackendName { get; init; }
public string? InstanceName { get; init; }
public string? HostName { get; init; }
public string? Version { get; init; }
public Dictionary<string, long> CounterDeltas { get; init; } = [];
public Dictionary<string, long> DurationDeltas { get; init; } = [];
public Dictionary<string, string> Metadata { get; init; } = [];
public IReadOnlyList<WorkflowPerformanceWaitMetric> TopWaitDeltas { get; init; } = [];
}
public sealed record WorkflowPerformanceWaitMetric
{
public required string Name { get; init; }
public required long TotalCount { get; init; }
public required long DurationMicroseconds { get; init; }
}
public static class WorkflowPerformanceArtifactWriter
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
WriteIndented = true,
};
public static WorkflowPerformanceArtifactPaths Write(WorkflowPerformanceRunResult result)
{
var baseDirectory = Path.Combine(
TestContext.CurrentContext.WorkDirectory,
"TestResults",
"workflow-performance",
SanitizePathSegment(result.Tier));
Directory.CreateDirectory(baseDirectory);
var resultWithComparison = result with
{
BaselineComparison = LoadBaselineComparison(baseDirectory, result),
};
var fileStem = $"{result.StartedAtUtc:yyyyMMddTHHmmssfff}-{SanitizePathSegment(result.ScenarioName)}";
var jsonPath = Path.Combine(baseDirectory, $"{fileStem}.json");
var markdownPath = Path.Combine(baseDirectory, $"{fileStem}.md");
File.WriteAllText(jsonPath, JsonSerializer.Serialize(resultWithComparison, JsonOptions));
File.WriteAllText(markdownPath, BuildMarkdown(resultWithComparison));
TestContext.AddTestAttachment(jsonPath);
TestContext.AddTestAttachment(markdownPath);
TestContext.Progress.WriteLine($"Performance artifacts: {jsonPath}");
return new WorkflowPerformanceArtifactPaths
{
JsonPath = jsonPath,
MarkdownPath = markdownPath,
};
}
private static string BuildMarkdown(WorkflowPerformanceRunResult result)
{
var builder = new StringBuilder();
builder.AppendLine($"# {result.ScenarioName}");
builder.AppendLine();
builder.AppendLine($"- Tier: `{result.Tier}`");
builder.AppendLine($"- Environment: `{result.EnvironmentName}`");
builder.AppendLine($"- Started: `{result.StartedAtUtc:O}`");
builder.AppendLine($"- Completed: `{result.CompletedAtUtc:O}`");
builder.AppendLine($"- Duration ms: `{result.DurationMilliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- Operations: `{result.OperationCount}`");
builder.AppendLine($"- Concurrency: `{result.Concurrency}`");
builder.AppendLine($"- Throughput/sec: `{result.ThroughputPerSecond.ToString("F2", CultureInfo.InvariantCulture)}`");
if (result.BaselineComparison is not null)
{
builder.AppendLine();
builder.AppendLine("## Baseline Comparison");
builder.AppendLine();
builder.AppendLine($"- Status: `{result.BaselineComparison.Status}`");
if (!string.IsNullOrWhiteSpace(result.BaselineComparison.BaselineJsonPath))
{
builder.AppendLine($"- BaselineJsonPath: `{result.BaselineComparison.BaselineJsonPath}`");
}
if (result.BaselineComparison.ThroughputDeltaPercent.HasValue)
{
builder.AppendLine($"- Throughput delta %: `{result.BaselineComparison.ThroughputDeltaPercent.Value.ToString("F2", CultureInfo.InvariantCulture)}`");
}
if (result.BaselineComparison.AverageLatencyDeltaPercent.HasValue)
{
builder.AppendLine($"- Avg latency delta %: `{result.BaselineComparison.AverageLatencyDeltaPercent.Value.ToString("F2", CultureInfo.InvariantCulture)}`");
}
if (result.BaselineComparison.P95LatencyDeltaPercent.HasValue)
{
builder.AppendLine($"- P95 latency delta %: `{result.BaselineComparison.P95LatencyDeltaPercent.Value.ToString("F2", CultureInfo.InvariantCulture)}`");
}
if (result.BaselineComparison.MaxLatencyDeltaPercent.HasValue)
{
builder.AppendLine($"- Max latency delta %: `{result.BaselineComparison.MaxLatencyDeltaPercent.Value.ToString("F2", CultureInfo.InvariantCulture)}`");
}
}
builder.AppendLine();
builder.AppendLine("## Counters");
builder.AppendLine();
builder.AppendLine($"- WorkflowsStarted: `{result.Counters.WorkflowsStarted}`");
builder.AppendLine($"- TasksActivated: `{result.Counters.TasksActivated}`");
builder.AppendLine($"- TasksCompleted: `{result.Counters.TasksCompleted}`");
builder.AppendLine($"- SignalsPublished: `{result.Counters.SignalsPublished}`");
builder.AppendLine($"- SignalsProcessed: `{result.Counters.SignalsProcessed}`");
builder.AppendLine($"- SignalsIgnored: `{result.Counters.SignalsIgnored}`");
builder.AppendLine($"- DeadLetteredSignals: `{result.Counters.DeadLetteredSignals}`");
builder.AppendLine($"- RuntimeConflicts: `{result.Counters.RuntimeConflicts}`");
builder.AppendLine($"- Failures: `{result.Counters.Failures}`");
builder.AppendLine($"- StuckInstances: `{result.Counters.StuckInstances}`");
if (result.LatencySummary is not null)
{
builder.AppendLine();
builder.AppendLine("## Latency");
builder.AppendLine();
builder.AppendLine($"- Samples: `{result.LatencySummary.SampleCount}`");
builder.AppendLine($"- Avg ms: `{result.LatencySummary.AverageMilliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- P50 ms: `{result.LatencySummary.P50Milliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- P95 ms: `{result.LatencySummary.P95Milliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- P99 ms: `{result.LatencySummary.P99Milliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- Max ms: `{result.LatencySummary.MaxMilliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
}
if (result.PhaseLatencySummaries is not null && result.PhaseLatencySummaries.Count > 0)
{
builder.AppendLine();
builder.AppendLine("## Phase Latency");
builder.AppendLine();
foreach (var phase in result.PhaseLatencySummaries.OrderBy(item => item.Key, StringComparer.Ordinal))
{
builder.AppendLine($"### {phase.Key}");
builder.AppendLine();
builder.AppendLine($"- Samples: `{phase.Value.SampleCount}`");
builder.AppendLine($"- Average ms: `{phase.Value.AverageMilliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- P50 ms: `{phase.Value.P50Milliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- P95 ms: `{phase.Value.P95Milliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- P99 ms: `{phase.Value.P99Milliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine($"- Max ms: `{phase.Value.MaxMilliseconds.ToString("F2", CultureInfo.InvariantCulture)}`");
builder.AppendLine();
}
}
builder.AppendLine();
builder.AppendLine("## Resources");
builder.AppendLine();
builder.AppendLine($"- WorkingSetBytes: `{result.ResourceSnapshot.WorkingSetBytes}`");
builder.AppendLine($"- PrivateMemoryBytes: `{result.ResourceSnapshot.PrivateMemoryBytes}`");
builder.AppendLine($"- MachineName: `{result.ResourceSnapshot.MachineName}`");
builder.AppendLine($"- Framework: `{result.ResourceSnapshot.FrameworkDescription}`");
builder.AppendLine($"- OS: `{result.ResourceSnapshot.OsDescription}`");
if (result.BackendMetrics is not null)
{
builder.AppendLine();
builder.AppendLine("## Backend Metrics");
builder.AppendLine();
builder.AppendLine($"- BackendName: `{result.BackendMetrics.BackendName}`");
if (!string.IsNullOrWhiteSpace(result.BackendMetrics.InstanceName))
{
builder.AppendLine($"- InstanceName: `{result.BackendMetrics.InstanceName}`");
}
if (!string.IsNullOrWhiteSpace(result.BackendMetrics.HostName))
{
builder.AppendLine($"- HostName: `{result.BackendMetrics.HostName}`");
}
if (!string.IsNullOrWhiteSpace(result.BackendMetrics.Version))
{
builder.AppendLine($"- Version: `{result.BackendMetrics.Version}`");
}
if (result.BackendMetrics.CounterDeltas.Count > 0)
{
builder.AppendLine();
builder.AppendLine("### Counter Deltas");
builder.AppendLine();
foreach (var pair in result.BackendMetrics.CounterDeltas.OrderBy(pair => pair.Key, StringComparer.OrdinalIgnoreCase))
{
builder.AppendLine($"- {pair.Key}: `{pair.Value}`");
}
}
if (result.BackendMetrics.DurationDeltas.Count > 0)
{
builder.AppendLine();
builder.AppendLine("### Duration Deltas");
builder.AppendLine();
foreach (var pair in result.BackendMetrics.DurationDeltas.OrderBy(pair => pair.Key, StringComparer.OrdinalIgnoreCase))
{
builder.AppendLine($"- {pair.Key}: `{pair.Value}`");
}
}
if (result.BackendMetrics.TopWaitDeltas.Count > 0)
{
builder.AppendLine();
builder.AppendLine("### Top Wait Deltas");
builder.AppendLine();
foreach (var wait in result.BackendMetrics.TopWaitDeltas)
{
builder.AppendLine($"- {wait.Name}: count=`{wait.TotalCount}`, duration_micro=`{wait.DurationMicroseconds}`");
}
}
if (result.BackendMetrics.Metadata.Count > 0)
{
builder.AppendLine();
builder.AppendLine("### Backend Metadata");
builder.AppendLine();
foreach (var pair in result.BackendMetrics.Metadata.OrderBy(pair => pair.Key, StringComparer.Ordinal))
{
builder.AppendLine($"- {pair.Key}: `{pair.Value}`");
}
}
}
if (result.Metadata.Count > 0)
{
builder.AppendLine();
builder.AppendLine("## Metadata");
builder.AppendLine();
foreach (var pair in result.Metadata.OrderBy(pair => pair.Key, StringComparer.Ordinal))
{
builder.AppendLine($"- {pair.Key}: `{pair.Value}`");
}
}
return builder.ToString();
}
private static string SanitizePathSegment(string value)
{
var invalidCharacters = Path.GetInvalidFileNameChars();
var sanitized = new string(value
.Select(character => invalidCharacters.Contains(character) ? '-' : character)
.ToArray());
return sanitized.Replace(' ', '-');
}
private static WorkflowPerformanceBaselineComparison LoadBaselineComparison(
string baseDirectory,
WorkflowPerformanceRunResult current)
{
var scenarioSuffix = $"-{SanitizePathSegment(current.ScenarioName)}.json";
var baselinePath = Directory
.EnumerateFiles(baseDirectory, "*.json", SearchOption.TopDirectoryOnly)
.Where(path => path.EndsWith(scenarioSuffix, StringComparison.OrdinalIgnoreCase))
.OrderByDescending(path => path, StringComparer.OrdinalIgnoreCase)
.FirstOrDefault();
if (baselinePath is null)
{
return WorkflowPerformanceBaselineComparison.Missing();
}
try
{
var baseline = JsonSerializer.Deserialize<WorkflowPerformanceRunResult>(
File.ReadAllText(baselinePath),
JsonOptions);
return baseline is null
? WorkflowPerformanceBaselineComparison.Missing()
: WorkflowPerformanceBaselineComparison.Compare(baselinePath, baseline, current);
}
catch
{
return WorkflowPerformanceBaselineComparison.Missing();
}
}
}
public sealed record WorkflowPerformanceArtifactPaths
{
public required string JsonPath { get; init; }
public required string MarkdownPath { get; init; }
}

View File

@@ -0,0 +1,340 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using NUnit.Framework;
namespace StellaOps.Workflow.IntegrationTests.Shared.Performance;
internal sealed record WorkflowPerformanceComparisonMatrixResult
{
public required string MatrixName { get; init; }
public required DateTime GeneratedAtUtc { get; init; }
public required IReadOnlyList<string> Columns { get; init; }
public required IReadOnlyList<WorkflowPerformanceComparisonMatrixRow> Rows { get; init; }
public required IReadOnlyList<WorkflowPerformanceComparisonSource> Sources { get; init; }
public required IReadOnlyList<WorkflowPerformanceComparisonIntegrityCheck> IntegrityChecks { get; init; }
}
internal sealed record WorkflowPerformanceComparisonMatrixRow
{
public required string Section { get; init; }
public required string Metric { get; init; }
public required string Unit { get; init; }
public required Dictionary<string, double> Values { get; init; }
}
internal sealed record WorkflowPerformanceComparisonSource
{
public required string Column { get; init; }
public required string LatencyScenarioName { get; init; }
public required string LatencyArtifactPath { get; init; }
public required string ThroughputScenarioName { get; init; }
public required string ThroughputArtifactPath { get; init; }
}
internal sealed record WorkflowPerformanceComparisonIntegrityCheck
{
public required string Column { get; init; }
public required bool Passed { get; init; }
public required IReadOnlyList<string> Checks { get; init; }
}
internal sealed record WorkflowPerformanceComparisonArtifactPaths
{
public required string JsonPath { get; init; }
public required string MarkdownPath { get; init; }
}
internal static class WorkflowPerformanceComparisonMatrixWriter
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
WriteIndented = true,
};
private static readonly ComparisonProfile[] Profiles =
[
new("Oracle", "oracle-aq-signal-roundtrip-latency-serial", "oracle-aq-signal-roundtrip-throughput-parallel"),
new("PostgreSQL", "postgres-signal-roundtrip-latency-serial", "postgres-signal-roundtrip-throughput-parallel"),
new("Mongo", "mongo-signal-roundtrip-latency-serial", "mongo-signal-roundtrip-throughput-parallel"),
new("Oracle+Redis", "oracle-redis-signal-roundtrip-latency-serial", "oracle-redis-signal-roundtrip-throughput-parallel"),
new("PostgreSQL+Redis", "postgres-redis-signal-roundtrip-latency-serial", "postgres-redis-signal-roundtrip-throughput-parallel"),
new("Mongo+Redis", "mongo-redis-signal-roundtrip-latency-serial", "mongo-redis-signal-roundtrip-throughput-parallel"),
];
public static WorkflowPerformanceComparisonMatrixResult BuildSixProfileSignalMatrix()
{
var scenarios = Profiles
.Select(profile => LoadScenarioPair(profile))
.ToArray();
return new WorkflowPerformanceComparisonMatrixResult
{
MatrixName = "workflow-backend-signal-roundtrip-six-profile-matrix",
GeneratedAtUtc = DateTime.UtcNow,
Columns = Profiles.Select(profile => profile.ColumnName).ToArray(),
Rows =
[
CreateRow("Serial Latency", "End-to-end avg", "ms", scenarios, scenario => scenario.Latency.LatencySummary?.AverageMilliseconds),
CreateRow("Serial Latency", "End-to-end p95", "ms", scenarios, scenario => scenario.Latency.LatencySummary?.P95Milliseconds),
CreateRow("Serial Latency", "Start avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Latency, "start")),
CreateRow("Serial Latency", "Signal publish avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Latency, "signalPublish")),
CreateRow("Serial Latency", "Signal to first completion avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Latency, "signalToFirstCompletion")),
CreateRow("Serial Latency", "Signal to completion avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Latency, "signalToCompletion")),
CreateRow("Serial Latency", "Drain-to-idle overhang avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Latency, "drainToIdleOverhang")),
CreateRow("Parallel Throughput", "Throughput", "ops/s", scenarios, scenario => scenario.Throughput.ThroughputPerSecond),
CreateRow("Parallel Throughput", "End-to-end avg", "ms", scenarios, scenario => scenario.Throughput.LatencySummary?.AverageMilliseconds),
CreateRow("Parallel Throughput", "End-to-end p95", "ms", scenarios, scenario => scenario.Throughput.LatencySummary?.P95Milliseconds),
CreateRow("Parallel Throughput", "Start avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Throughput, "start")),
CreateRow("Parallel Throughput", "Signal publish avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Throughput, "signalPublish")),
CreateRow("Parallel Throughput", "Signal to completion avg", "ms", scenarios, scenario => GetPhaseAverage(scenario.Throughput, "signalToCompletion")),
],
Sources = scenarios.Select(scenario => new WorkflowPerformanceComparisonSource
{
Column = scenario.Profile.ColumnName,
LatencyScenarioName = scenario.Profile.LatencyScenarioName,
LatencyArtifactPath = scenario.LatencyPath,
ThroughputScenarioName = scenario.Profile.ThroughputScenarioName,
ThroughputArtifactPath = scenario.ThroughputPath,
}).ToArray(),
IntegrityChecks = scenarios.Select(ValidateScenarioPair).ToArray(),
};
}
public static WorkflowPerformanceComparisonArtifactPaths Write(WorkflowPerformanceComparisonMatrixResult result)
{
var baseDirectory = Path.Combine(
TestContext.CurrentContext.WorkDirectory,
"TestResults",
"workflow-performance",
WorkflowPerformanceCategories.Comparison);
Directory.CreateDirectory(baseDirectory);
var fileStem = $"{result.GeneratedAtUtc:yyyyMMddTHHmmssfff}-{SanitizePathSegment(result.MatrixName)}";
var jsonPath = Path.Combine(baseDirectory, $"{fileStem}.json");
var markdownPath = Path.Combine(baseDirectory, $"{fileStem}.md");
File.WriteAllText(jsonPath, JsonSerializer.Serialize(result, JsonOptions));
File.WriteAllText(markdownPath, BuildMarkdown(result));
TestContext.AddTestAttachment(jsonPath);
TestContext.AddTestAttachment(markdownPath);
return new WorkflowPerformanceComparisonArtifactPaths
{
JsonPath = jsonPath,
MarkdownPath = markdownPath,
};
}
private static WorkflowPerformanceComparisonIntegrityCheck ValidateScenarioPair(LoadedScenarioPair pair)
{
var checks = new List<string>();
checks.Add(BuildIntegrityCheck("latency.failures", pair.Latency.Counters.Failures == 0, pair.Latency.Counters.Failures));
checks.Add(BuildIntegrityCheck("latency.deadLetteredSignals", pair.Latency.Counters.DeadLetteredSignals == 0, pair.Latency.Counters.DeadLetteredSignals));
checks.Add(BuildIntegrityCheck("latency.runtimeConflicts", pair.Latency.Counters.RuntimeConflicts == 0, pair.Latency.Counters.RuntimeConflicts));
checks.Add(BuildIntegrityCheck("latency.stuckInstances", pair.Latency.Counters.StuckInstances == 0, pair.Latency.Counters.StuckInstances));
checks.Add(BuildIntegrityCheck("latency.workflowsStarted", pair.Latency.Counters.WorkflowsStarted == pair.Latency.OperationCount, pair.Latency.Counters.WorkflowsStarted));
checks.Add(BuildIntegrityCheck("latency.signalsPublished", pair.Latency.Counters.SignalsPublished == pair.Latency.OperationCount, pair.Latency.Counters.SignalsPublished));
checks.Add(BuildIntegrityCheck("latency.signalsProcessed", pair.Latency.Counters.SignalsProcessed == pair.Latency.OperationCount, pair.Latency.Counters.SignalsProcessed));
checks.Add(BuildIntegrityCheck("throughput.failures", pair.Throughput.Counters.Failures == 0, pair.Throughput.Counters.Failures));
checks.Add(BuildIntegrityCheck("throughput.deadLetteredSignals", pair.Throughput.Counters.DeadLetteredSignals == 0, pair.Throughput.Counters.DeadLetteredSignals));
checks.Add(BuildIntegrityCheck("throughput.runtimeConflicts", pair.Throughput.Counters.RuntimeConflicts == 0, pair.Throughput.Counters.RuntimeConflicts));
checks.Add(BuildIntegrityCheck("throughput.stuckInstances", pair.Throughput.Counters.StuckInstances == 0, pair.Throughput.Counters.StuckInstances));
checks.Add(BuildIntegrityCheck("throughput.workflowsStarted", pair.Throughput.Counters.WorkflowsStarted == pair.Throughput.OperationCount, pair.Throughput.Counters.WorkflowsStarted));
checks.Add(BuildIntegrityCheck("throughput.signalsPublished", pair.Throughput.Counters.SignalsPublished == pair.Throughput.OperationCount, pair.Throughput.Counters.SignalsPublished));
checks.Add(BuildIntegrityCheck("throughput.signalsProcessed", pair.Throughput.Counters.SignalsProcessed == pair.Throughput.OperationCount, pair.Throughput.Counters.SignalsProcessed));
return new WorkflowPerformanceComparisonIntegrityCheck
{
Column = pair.Profile.ColumnName,
Passed = checks.All(check => check.EndsWith(":passed", StringComparison.Ordinal)),
Checks = checks,
};
}
private static string BuildIntegrityCheck(string name, bool success, int actualValue)
{
return $"{name}:{actualValue}:{(success ? "passed" : "failed")}";
}
private static WorkflowPerformanceComparisonMatrixRow CreateRow(
string section,
string metric,
string unit,
IReadOnlyList<LoadedScenarioPair> scenarios,
Func<LoadedScenarioPair, double?> valueSelector)
{
var values = new Dictionary<string, double>(StringComparer.Ordinal);
foreach (var scenario in scenarios)
{
var value = valueSelector(scenario)
?? throw new InvalidOperationException(
$"Matrix metric '{metric}' for column '{scenario.Profile.ColumnName}' is missing from source artifact.");
values[scenario.Profile.ColumnName] = value;
}
return new WorkflowPerformanceComparisonMatrixRow
{
Section = section,
Metric = metric,
Unit = unit,
Values = values,
};
}
private static LoadedScenarioPair LoadScenarioPair(ComparisonProfile profile)
{
var latencyPath = FindLatestArtifactPath(profile.LatencyScenarioName);
var throughputPath = FindLatestArtifactPath(profile.ThroughputScenarioName);
var latency = JsonSerializer.Deserialize<WorkflowPerformanceRunResult>(File.ReadAllText(latencyPath), JsonOptions)
?? throw new InvalidOperationException($"Unable to deserialize performance artifact '{latencyPath}'.");
var throughput = JsonSerializer.Deserialize<WorkflowPerformanceRunResult>(File.ReadAllText(throughputPath), JsonOptions)
?? throw new InvalidOperationException($"Unable to deserialize performance artifact '{throughputPath}'.");
if (!string.Equals(latency.ScenarioName, profile.LatencyScenarioName, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Artifact '{latencyPath}' contained scenario '{latency.ScenarioName}' instead of expected '{profile.LatencyScenarioName}'.");
}
if (!string.Equals(throughput.ScenarioName, profile.ThroughputScenarioName, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Artifact '{throughputPath}' contained scenario '{throughput.ScenarioName}' instead of expected '{profile.ThroughputScenarioName}'.");
}
return new LoadedScenarioPair(profile, latency, latencyPath, throughput, throughputPath);
}
private static string FindLatestArtifactPath(string scenarioName)
{
var baseDirectory = Path.Combine(
TestContext.CurrentContext.WorkDirectory,
"TestResults",
"workflow-performance");
var fileSuffix = $"-{SanitizePathSegment(scenarioName)}.json";
var path = Directory
.EnumerateFiles(baseDirectory, "*.json", SearchOption.AllDirectories)
.Where(filePath => filePath.EndsWith(fileSuffix, StringComparison.OrdinalIgnoreCase))
.OrderByDescending(filePath => File.GetLastWriteTimeUtc(filePath))
.FirstOrDefault();
return path ?? throw new InvalidOperationException(
$"Required performance artifact for scenario '{scenarioName}' was not found under '{baseDirectory}'.");
}
private static double? GetPhaseAverage(WorkflowPerformanceRunResult result, string phaseName)
{
return result.PhaseLatencySummaries is not null
&& result.PhaseLatencySummaries.TryGetValue(phaseName, out var summary)
? summary.AverageMilliseconds
: null;
}
private static string BuildMarkdown(WorkflowPerformanceComparisonMatrixResult result)
{
var builder = new StringBuilder();
builder.AppendLine($"# {result.MatrixName}");
builder.AppendLine();
builder.AppendLine($"- GeneratedAtUtc: `{result.GeneratedAtUtc:O}`");
builder.AppendLine("- SourcePolicy: `artifact-driven-only`");
builder.AppendLine("- Guarantee: every matrix cell is read from a measured JSON artifact; no hand-entered metric values are allowed.");
builder.AppendLine();
foreach (var sectionGroup in result.Rows.GroupBy(row => row.Section, StringComparer.Ordinal))
{
builder.AppendLine($"## {sectionGroup.Key}");
builder.AppendLine();
builder.Append("| Metric | Unit |");
foreach (var column in result.Columns)
{
builder.Append(' ').Append(column).Append(" |");
}
builder.AppendLine();
builder.Append("| --- | --- |");
foreach (var _ in result.Columns)
{
builder.Append(" ---: |");
}
builder.AppendLine();
foreach (var row in sectionGroup)
{
builder.Append("| ").Append(row.Metric).Append(" | ").Append(row.Unit).Append(" |");
foreach (var column in result.Columns)
{
builder.Append(' ')
.Append(row.Values[column].ToString("F2", CultureInfo.InvariantCulture))
.Append(" |");
}
builder.AppendLine();
}
builder.AppendLine();
}
builder.AppendLine("## Integrity");
builder.AppendLine();
foreach (var check in result.IntegrityChecks)
{
builder.AppendLine($"### {check.Column}");
builder.AppendLine();
builder.AppendLine($"- Passed: `{check.Passed}`");
foreach (var item in check.Checks)
{
builder.AppendLine($"- {item}");
}
builder.AppendLine();
}
builder.AppendLine("## Sources");
builder.AppendLine();
foreach (var source in result.Sources)
{
builder.AppendLine($"### {source.Column}");
builder.AppendLine();
builder.AppendLine($"- Latency scenario: `{source.LatencyScenarioName}`");
builder.AppendLine($"- Latency artifact: `{source.LatencyArtifactPath}`");
builder.AppendLine($"- Throughput scenario: `{source.ThroughputScenarioName}`");
builder.AppendLine($"- Throughput artifact: `{source.ThroughputArtifactPath}`");
builder.AppendLine();
}
return builder.ToString();
}
private static string SanitizePathSegment(string value)
{
var invalidCharacters = Path.GetInvalidFileNameChars();
var sanitized = new string(value
.Select(character => invalidCharacters.Contains(character) ? '-' : character)
.ToArray());
return sanitized.Replace(' ', '-');
}
private sealed record ComparisonProfile(
string ColumnName,
string LatencyScenarioName,
string ThroughputScenarioName);
private sealed record LoadedScenarioPair(
ComparisonProfile Profile,
WorkflowPerformanceRunResult Latency,
string LatencyPath,
WorkflowPerformanceRunResult Throughput,
string ThroughputPath);
}

View File

@@ -0,0 +1,35 @@
using System.Linq;
using FluentAssertions;
using NUnit.Framework;
namespace StellaOps.Workflow.IntegrationTests.Shared.Performance;
[TestFixture]
[Category("Integration")]
[NonParallelizable]
public class WorkflowPerformanceComparisonMatrixTests
{
[Test]
[Category(WorkflowPerformanceCategories.Comparison)]
public void WorkflowBackendSignalRoundTripMatrix_WhenLatestArtifactsAreLoaded_ShouldWriteSixProfileComparison()
{
var result = WorkflowPerformanceComparisonMatrixWriter.BuildSixProfileSignalMatrix();
var artifacts = WorkflowPerformanceComparisonMatrixWriter.Write(result);
result.Columns.Should().ContainInOrder(
"Oracle",
"PostgreSQL",
"Mongo",
"Oracle+Redis",
"PostgreSQL+Redis",
"Mongo+Redis");
result.Rows.Should().NotBeEmpty();
result.Rows.Select(row => row.Section).Should().Contain("Serial Latency");
result.Rows.Select(row => row.Section).Should().Contain("Parallel Throughput");
result.IntegrityChecks.Should().OnlyContain(check => check.Passed);
artifacts.JsonPath.Should().NotBeNullOrWhiteSpace();
artifacts.MarkdownPath.Should().NotBeNullOrWhiteSpace();
}
}

View File

@@ -0,0 +1,32 @@
using System;
namespace StellaOps.Workflow.IntegrationTests.Shared.Performance;
public sealed record WorkflowSignalDrainTelemetry
{
public required int ProcessedCount { get; init; }
public required int TotalRounds { get; init; }
public required int IdleEmptyRounds { get; init; }
public required DateTime StartedAtUtc { get; init; }
public required DateTime CompletedAtUtc { get; init; }
public DateTime? FirstProcessedAtUtc { get; init; }
public DateTime? LastProcessedAtUtc { get; init; }
public double DurationMilliseconds =>
(CompletedAtUtc - StartedAtUtc).TotalMilliseconds;
public double? TimeToFirstProcessedMilliseconds =>
FirstProcessedAtUtc.HasValue
? (FirstProcessedAtUtc.Value - StartedAtUtc).TotalMilliseconds
: null;
public double? TimeToLastProcessedMilliseconds =>
LastProcessedAtUtc.HasValue
? (LastProcessedAtUtc.Value - StartedAtUtc).TotalMilliseconds
: null;
public double? DrainToIdleOverhangMilliseconds =>
LastProcessedAtUtc.HasValue
? (CompletedAtUtc - LastProcessedAtUtc.Value).TotalMilliseconds
: null;
}

View File

@@ -0,0 +1,35 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ManagePackageVersionsCentrally>false</ManagePackageVersionsCentrally>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<NoWarn>CS8601;CS8602;CS8604;NU1015</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="NUnit" Version="4.2.2" />
</ItemGroup>
<!-- TODO: These test files reference Serdica platform-specific bootstrap types (AddWorkflowPlatformServices,
Engine.Studio, IAuthorizationService, HealthCheckService, RedisDockerFixture) that are not in this repository.
Re-enable when those platform services are ported to Stella. -->
<ItemGroup>
<Compile Remove="WorkflowPlatformBootstrapTests.cs" />
<Compile Remove="WorkflowPlatformRedisSignalDriverBootstrapTests.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.Abstractions\StellaOps.Workflow.Abstractions.csproj" />
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.Contracts\StellaOps.Workflow.Contracts.csproj" />
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.Engine\StellaOps.Workflow.Engine.csproj" />
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.DataStore.MongoDB\StellaOps.Workflow.DataStore.MongoDB.csproj" />
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.DataStore.PostgreSQL\StellaOps.Workflow.DataStore.PostgreSQL.csproj" />
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.DataStore.Oracle\StellaOps.Workflow.DataStore.Oracle.csproj" />
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.Signaling.Redis\StellaOps.Workflow.Signaling.Redis.csproj" />
<ProjectReference Include="..\..\__Libraries\StellaOps.Workflow.Signaling.OracleAq\StellaOps.Workflow.Signaling.OracleAq.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,132 @@
using System.Collections.Generic;
using StellaOps.Workflow.Abstractions;
using StellaOps.Workflow.Contracts;
namespace StellaOps.Workflow.IntegrationTests.Shared.TransportProbes;
public sealed class TransportProbeWorkflowRequest
{
[WorkflowBusinessId]
public required string ProbeKey { get; init; }
}
public sealed class LegacyRabbitOutcomeProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly LegacyRabbitAddress ProbeAddress = new("integration.legacy.probe");
public string WorkflowName => "IntegrationLegacyRabbitOutcomeProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Legacy Rabbit Outcome Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(startRequest => new
{
probeKey = startRequest.ProbeKey,
outcome = "pending",
})
.StartWith(flow => flow
.Call(
"Invoke Legacy Rabbit",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() },
WorkflowHandledBranchAction.Complete,
WorkflowHandledBranchAction.Complete)
.Set("outcome", _ => "success")
.Complete())
.Build();
}
public sealed class MicroserviceOutcomeProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly Address ProbeAddress = new("integration-probe", "probe.microservice");
public string WorkflowName => "IntegrationMicroserviceOutcomeProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Microservice Outcome Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(startRequest => new
{
probeKey = startRequest.ProbeKey,
outcome = "pending",
})
.StartWith(flow => flow
.Call(
"Invoke Microservice",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() },
WorkflowHandledBranchAction.Complete,
WorkflowHandledBranchAction.Complete)
.Set("outcome", _ => "success")
.Complete())
.Build();
}
public sealed class GraphqlOutcomeProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly GraphqlAddress ProbeAddress = new(
"stella",
"query IntegrationProbe($probeKey: String!) { probe(probeKey: $probeKey) { ok } }",
"IntegrationProbe");
public string WorkflowName => "IntegrationGraphqlOutcomeProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Graphql Outcome Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(startRequest => new
{
probeKey = startRequest.ProbeKey,
outcome = "pending",
})
.StartWith(flow => flow
.QueryGraphql(
"Invoke Graphql",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() },
WorkflowHandledBranchAction.Complete,
WorkflowHandledBranchAction.Complete)
.Set("outcome", _ => "success")
.Complete())
.Build();
}
public sealed class HttpOutcomeProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly HttpAddress ProbeAddress = new("integration-probe", "/probe/http");
public string WorkflowName => "IntegrationHttpOutcomeProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Http Outcome Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(startRequest => new
{
probeKey = startRequest.ProbeKey,
outcome = "pending",
})
.StartWith(flow => flow
.Call(
"Invoke Http",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() },
WorkflowHandledBranchAction.Complete,
WorkflowHandledBranchAction.Complete)
.Set("outcome", _ => "success")
.Complete())
.Build();
}

View File

@@ -0,0 +1,236 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using StellaOps.Workflow.Abstractions;
using StellaOps.Workflow.Contracts;
namespace StellaOps.Workflow.IntegrationTests.Shared.TransportProbes;
internal static class TransportUnhandledProbeWorkflowSupport
{
public const string ProbeRoute = "integration/probes";
public const string ProbeTaskName = "Execute Probe";
public const string ProbeTaskType = "IntegrationProbe";
public static readonly string[] ProbeTaskRoles = ["INTEGRATION"];
public static Dictionary<string, JsonElement> BuildInitialState(
TransportProbeWorkflowRequest startRequest)
{
return new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase)
{
["probeKey"] = startRequest.ProbeKey,
["outcome"] = "pending",
}.AsWorkflowJsonDictionary();
}
public static WorkflowHumanTaskDefinition<TransportProbeWorkflowRequest> CreateProbeTask(
Action<WorkflowFlowBuilder<TransportProbeWorkflowRequest>> onComplete)
{
return WorkflowHumanTask.For<TransportProbeWorkflowRequest>(
ProbeTaskName,
ProbeTaskType,
ProbeRoute,
ProbeTaskRoles)
.WithPayload(context => new
{
probeKey = context.StateValues["probeKey"].Get<string>(),
})
.OnComplete(onComplete);
}
}
public sealed class LegacyRabbitUnhandledStartProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly LegacyRabbitAddress ProbeAddress = new("integration.legacy.probe");
public string WorkflowName => "IntegrationLegacyRabbitUnhandledStartProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Legacy Rabbit Unhandled Start Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.StartWith(flow => flow
.Call(
"Invoke Legacy Rabbit",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete())
.Build();
}
public sealed class LegacyRabbitUnhandledCompletionProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly LegacyRabbitAddress ProbeAddress = new("integration.legacy.probe");
public string WorkflowName => "IntegrationLegacyRabbitUnhandledCompletionProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Legacy Rabbit Unhandled Completion Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.AddTask(TransportUnhandledProbeWorkflowSupport.CreateProbeTask(flow => flow
.Call(
"Invoke Legacy Rabbit",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete()))
.StartWith(flow => flow.ActivateTask(TransportUnhandledProbeWorkflowSupport.ProbeTaskName))
.Build();
}
public sealed class MicroserviceUnhandledStartProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly Address ProbeAddress = new("integration-probe", "probe.microservice");
public string WorkflowName => "IntegrationMicroserviceUnhandledStartProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Microservice Unhandled Start Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.StartWith(flow => flow
.Call(
"Invoke Microservice",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete())
.Build();
}
public sealed class MicroserviceUnhandledCompletionProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly Address ProbeAddress = new("integration-probe", "probe.microservice");
public string WorkflowName => "IntegrationMicroserviceUnhandledCompletionProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Microservice Unhandled Completion Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.AddTask(TransportUnhandledProbeWorkflowSupport.CreateProbeTask(flow => flow
.Call(
"Invoke Microservice",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete()))
.StartWith(flow => flow.ActivateTask(TransportUnhandledProbeWorkflowSupport.ProbeTaskName))
.Build();
}
public sealed class GraphqlUnhandledStartProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly GraphqlAddress ProbeAddress = new(
"stella",
"query IntegrationProbe($probeKey: String!) { probe(probeKey: $probeKey) { ok } }",
"IntegrationProbe");
public string WorkflowName => "IntegrationGraphqlUnhandledStartProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Graphql Unhandled Start Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.StartWith(flow => flow
.QueryGraphql(
"Invoke Graphql",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete())
.Build();
}
public sealed class GraphqlUnhandledCompletionProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly GraphqlAddress ProbeAddress = new(
"stella",
"query IntegrationProbe($probeKey: String!) { probe(probeKey: $probeKey) { ok } }",
"IntegrationProbe");
public string WorkflowName => "IntegrationGraphqlUnhandledCompletionProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Graphql Unhandled Completion Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.AddTask(TransportUnhandledProbeWorkflowSupport.CreateProbeTask(flow => flow
.QueryGraphql(
"Invoke Graphql",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete()))
.StartWith(flow => flow.ActivateTask(TransportUnhandledProbeWorkflowSupport.ProbeTaskName))
.Build();
}
public sealed class HttpUnhandledStartProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly HttpAddress ProbeAddress = new("integration-probe", "/probe/http");
public string WorkflowName => "IntegrationHttpUnhandledStartProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Http Unhandled Start Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.StartWith(flow => flow
.Call(
"Invoke Http",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete())
.Build();
}
public sealed class HttpUnhandledCompletionProbeWorkflow
: IDeclarativeWorkflow<TransportProbeWorkflowRequest>
{
private static readonly HttpAddress ProbeAddress = new("integration-probe", "/probe/http");
public string WorkflowName => "IntegrationHttpUnhandledCompletionProbe";
public string WorkflowVersion => "1.0.0";
public string DisplayName => "Integration Http Unhandled Completion Probe";
public IReadOnlyCollection<string> WorkflowRoles => [];
public IReadOnlyCollection<WorkflowTaskDescriptor> Tasks => Spec.TaskDescriptors;
public WorkflowSpec<TransportProbeWorkflowRequest> Spec { get; } = WorkflowSpec.For<TransportProbeWorkflowRequest>()
.InitializeState(TransportUnhandledProbeWorkflowSupport.BuildInitialState)
.AddTask(TransportUnhandledProbeWorkflowSupport.CreateProbeTask(flow => flow
.Call(
"Invoke Http",
ProbeAddress,
context => new { probeKey = context.StateValues["probeKey"].Get<string>() })
.Set("outcome", _ => "success")
.Complete()))
.StartWith(flow => flow.ActivateTask(TransportUnhandledProbeWorkflowSupport.ProbeTaskName))
.Build();
}

View File

@@ -0,0 +1,149 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using StellaOps.Workflow.Engine.Exceptions;
using StellaOps.Workflow.Abstractions;
using StellaOps.Workflow.Contracts;
using StellaOps.Workflow.Engine.Constants;
using StellaOps.Workflow.Engine.Services;
using FluentAssertions;
using NUnit.Framework;
namespace StellaOps.Workflow.IntegrationTests.Shared;
public static class WorkflowIntegrationAssertions
{
public static async Task<StartWorkflowResponse> StartWorkflowOrFailWithInnerAsync(
WorkflowRuntimeService runtimeService,
StartWorkflowRequest request)
{
try
{
return await runtimeService.StartWorkflowAsync(request);
}
catch (BaseResultException exception) when (exception.InnerException is not null)
{
Assert.Fail(exception.InnerException.ToString());
throw;
}
}
public static async Task<WorkflowTaskSummary> GetSingleOpenTaskAsync(
WorkflowRuntimeService runtimeService,
string workflowInstanceId,
string? actorId = null,
IReadOnlyCollection<string>? actorRoles = null)
{
return (await runtimeService.GetTasksAsync(new WorkflowTasksGetRequest
{
WorkflowInstanceId = workflowInstanceId,
ActorId = actorId,
ActorRoles = actorRoles ?? Array.Empty<string>(),
Status = "Open",
})).Tasks.Should().ContainSingle().Subject;
}
public static async Task<WorkflowInstanceGetResponse> GetInstanceAsync(
WorkflowRuntimeService runtimeService,
string workflowInstanceId,
string? actorId = null,
IReadOnlyCollection<string>? actorRoles = null)
{
return await runtimeService.GetInstanceAsync(new WorkflowInstanceGetRequest
{
WorkflowInstanceId = workflowInstanceId,
ActorId = actorId,
ActorRoles = actorRoles ?? Array.Empty<string>(),
});
}
public static async Task<WorkflowInstanceSummary> GetSingleInstanceSummaryAsync(
WorkflowRuntimeService runtimeService,
string workflowName,
string? businessReferenceKey = null,
string? status = null)
{
return (await runtimeService.GetInstancesAsync(new WorkflowInstancesGetRequest
{
WorkflowName = workflowName,
BusinessReferenceKey = businessReferenceKey,
Status = status,
})).Instances.Should().ContainSingle().Subject;
}
public static async Task<WorkflowRuntimeStateRecord> GetRuntimeRecordAsync(
IWorkflowRuntimeStateStore runtimeStateStore,
string workflowInstanceId)
{
var runtimeRecord = await runtimeStateStore.GetAsync(workflowInstanceId);
runtimeRecord.Should().NotBeNull();
return runtimeRecord!;
}
public static async Task<WorkflowTaskSummary> GetSingleTaskByNameAsync(
WorkflowRuntimeService runtimeService,
string workflowInstanceId,
string taskName,
string? actorId = null,
IReadOnlyCollection<string>? actorRoles = null,
string? status = "Open")
{
return (await runtimeService.GetTasksAsync(new WorkflowTasksGetRequest
{
WorkflowInstanceId = workflowInstanceId,
ActorId = actorId,
ActorRoles = actorRoles ?? Array.Empty<string>(),
Status = status,
})).Tasks.Should().ContainSingle(x => string.Equals(x.TaskName, taskName, StringComparison.Ordinal)).Subject;
}
public static void AssertCompleted(WorkflowInstanceGetResponse instance)
{
instance.Instance.Status.Should().Be("Completed");
instance.Instance.RuntimeProvider.Should().Be(WorkflowRuntimeProviderNames.Engine);
instance.Instance.RuntimeStatus.Should().Be("Finished");
instance.RuntimeState.Should().NotBeNull();
instance.RuntimeState!.RuntimeStatus.Should().Be("Finished");
}
public static void AssertRunning(WorkflowInstanceGetResponse instance)
{
instance.Instance.Status.Should().Be("Open");
instance.Instance.RuntimeProvider.Should().Be(WorkflowRuntimeProviderNames.Engine);
instance.RuntimeState.Should().NotBeNull();
instance.RuntimeState!.RuntimeStatus.Should().NotBe("Finished");
}
public static void AssertContinuationInstance(
WorkflowInstanceSummary instance,
string workflowName,
string expectedStatus)
{
instance.WorkflowName.Should().Be(workflowName);
instance.Status.Should().Be(expectedStatus);
instance.RuntimeProvider.Should().Be(WorkflowRuntimeProviderNames.Engine);
}
public static void AssertTaskNames(
WorkflowInstanceGetResponse instance,
params string[] taskNames)
{
instance.Tasks.Select(x => x.TaskName).Should().BeEquivalentTo(taskNames);
}
public static void AssertTransportFailure(BaseResultException exception)
{
exception.MessageId.Should().Be(MessageKeys.WorkflowTransportFailed);
exception.InnerException.Should().BeNull();
}
public static void AssertRuntimeTimeout(BaseResultException exception)
{
exception.MessageId.Should().Be(MessageKeys.WorkflowRuntimeFailed);
exception.InnerException.Should().BeOfType<TimeoutException>();
}
}

View File

@@ -0,0 +1,253 @@
using System.Collections.Generic;
using StellaOps.Workflow.Abstractions;
using StellaOps.Workflow.Engine.Execution;
using StellaOps.Workflow.Engine.Hosting;
using StellaOps.Workflow.Engine.Scheduling;
using StellaOps.Workflow.Engine.Signaling;
using StellaOps.Workflow.DataStore.MongoDB;
using StellaOps.Workflow.Engine.Services;
using StellaOps.Workflow.Engine.Studio;
using StellaOps.Workflow.DataStore.PostgreSQL;
using FluentAssertions;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;
using NUnit.Framework;
namespace StellaOps.Workflow.IntegrationTests.Shared;
[TestFixture]
[Category("Integration")]
public class WorkflowPlatformBootstrapTests
{
[Test]
public void AddWorkflowPlatformServices_WhenMinimumConfigurationProvided_ShouldBuildServiceProvider()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
["RabbitConfig:HostName"] = "localhost",
["RabbitConfig:UserName"] = "guest",
["RabbitConfig:Password"] = "guest",
["RabbitConfig:Port"] = "5672",
["RabbitConfig:Exchange"] = "workflow",
["RabbitConfig:RequestQueueName"] = "workflow.request",
["MicroserviceConfig:SectionName"] = "Workflow",
["MicroserviceConfig:ExchangeName"] = "workflow",
["ConnectionStrings:DefaultConnection"] = "DATA SOURCE=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=orcl1)));USER ID=srd_wfklw;PASSWORD=srd_wfklw",
["PluginsConfig:PluginsPrefix"] = "StellaOps.Workflow",
["PluginsConfig:PluginsDirectory"] = @"..\..\StellaOps.Workflow\PluginBinaries",
["PluginsConfig:PluginsOrder:0"] = "StellaOps.Workflow.Engine.AssignPermissions.Generic",
["PluginsConfig:PluginsOrder:1"] = "StellaOps.Workflow.DataStore.Oracle",
["PluginsConfig:PluginsOrder:2"] = "StellaOps.Workflow.Engine.Transport.Microservice",
["PluginsConfig:PluginsOrder:3"] = "StellaOps.Workflow.Engine.Transport.LegacyRabbit",
["PluginsConfig:PluginsOrder:4"] = "StellaOps.Workflow.Engine.Transport.GraphQL",
["PluginsConfig:PluginsOrder:5"] = "StellaOps.Workflow.Engine.Transport.Http",
["PluginsConfig:PluginsOrder:6"] = "StellaOps.Workflow.Engine.Workflows.Bulstrad",
})
.Build();
var services = new ServiceCollection();
services.AddLogging();
services.AddWorkflowPlatformServices(configuration);
services.Replace(ServiceDescriptor.Scoped<IWorkflowMicroserviceTransport, NullWorkflowMicroserviceTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowLegacyRabbitTransport, NullWorkflowLegacyRabbitTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowGraphqlTransport, NullWorkflowGraphqlTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowHttpTransport, NullWorkflowHttpTransport>());
using var provider = services.BuildServiceProvider();
provider.GetRequiredService<ISerdicaWorkflowCatalog>().Should().NotBeNull();
provider
.GetRequiredService<ISerdicaWorkflowCatalog>()
.GetDefinition("ApproveApplication", "1.0.0")
.Should()
.NotBeNull();
provider.GetRequiredService<WorkflowRuntimeService>().Should().NotBeNull();
provider.GetRequiredService<WorkflowDiagramService>().Should().NotBeNull();
provider.GetRequiredService<IWorkflowRuntimeOrchestrator>().Should().BeOfType<ConfiguredWorkflowRuntimeOrchestrator>();
provider.GetRequiredService<IOptions<WorkflowRuntimeOptions>>().Value.DefaultProvider.Should().Be(WorkflowRuntimeProviderNames.Engine);
provider.GetRequiredService<IWorkflowSignalBus>().Should().BeOfType<WorkflowSignalBusBridge>();
provider.GetRequiredService<IWorkflowScheduleBus>().Should().BeOfType<WorkflowScheduleBusBridge>();
provider.GetRequiredService<IWorkflowSignalStore>().Should().BeOfType<OracleAqWorkflowSignalBus>();
provider.GetRequiredService<IWorkflowSignalDriver>().Should().BeOfType<OracleAqWorkflowSignalBus>();
provider.GetRequiredService<IWorkflowSignalScheduler>().Should().BeOfType<OracleAqWorkflowScheduleBus>();
var runtimeStateStore = provider.GetRequiredService<IWorkflowRuntimeStateStore>();
runtimeStateStore.GetType().FullName.Should().Be("StellaOps.Workflow.DataStore.Oracle.OracleWorkflowRuntimeStateStore");
runtimeStateStore.GetType().Assembly.GetName().Name.Should().Be("StellaOps.Workflow.DataStore.Oracle");
var hostedJobLockService = provider.GetRequiredService<IWorkflowHostedJobLockService>();
hostedJobLockService.GetType().FullName.Should().Be("StellaOps.Workflow.DataStore.Oracle.OracleWorkflowHostedJobLockService");
hostedJobLockService.GetType().Assembly.GetName().Name.Should().Be("StellaOps.Workflow.DataStore.Oracle");
provider.GetRequiredService<IAuthorizationService>().Should().NotBeNull();
provider.GetRequiredService<HealthCheckService>().Should().NotBeNull();
}
[Test]
public void WorkflowDefinitionCatalog_ShouldContainApproveApplicationVersionedDefinition()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
["RabbitConfig:HostName"] = "localhost",
["RabbitConfig:UserName"] = "guest",
["RabbitConfig:Password"] = "guest",
["RabbitConfig:Port"] = "5672",
["RabbitConfig:Exchange"] = "workflow",
["RabbitConfig:RequestQueueName"] = "workflow.request",
["MicroserviceConfig:SectionName"] = "Workflow",
["MicroserviceConfig:ExchangeName"] = "workflow",
["ConnectionStrings:DefaultConnection"] = "DATA SOURCE=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=orcl1)));USER ID=srd_wfklw;PASSWORD=srd_wfklw",
["PluginsConfig:PluginsPrefix"] = "StellaOps.Workflow",
["PluginsConfig:PluginsDirectory"] = @"..\..\StellaOps.Workflow\PluginBinaries",
["PluginsConfig:PluginsOrder:0"] = "StellaOps.Workflow.Engine.AssignPermissions.Generic",
["PluginsConfig:PluginsOrder:1"] = "StellaOps.Workflow.DataStore.Oracle",
["PluginsConfig:PluginsOrder:2"] = "StellaOps.Workflow.Engine.Transport.Microservice",
["PluginsConfig:PluginsOrder:3"] = "StellaOps.Workflow.Engine.Transport.LegacyRabbit",
["PluginsConfig:PluginsOrder:4"] = "StellaOps.Workflow.Engine.Transport.GraphQL",
["PluginsConfig:PluginsOrder:5"] = "StellaOps.Workflow.Engine.Transport.Http",
["PluginsConfig:PluginsOrder:6"] = "StellaOps.Workflow.Engine.Workflows.Bulstrad",
})
.Build();
var services = new ServiceCollection();
services.AddLogging();
services.AddWorkflowPlatformServices(configuration);
services.Replace(ServiceDescriptor.Scoped<IWorkflowMicroserviceTransport, NullWorkflowMicroserviceTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowLegacyRabbitTransport, NullWorkflowLegacyRabbitTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowGraphqlTransport, NullWorkflowGraphqlTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowHttpTransport, NullWorkflowHttpTransport>());
using var provider = services.BuildServiceProvider();
provider
.GetRequiredService<ISerdicaWorkflowCatalog>()
.GetDefinition("ApproveApplication", "1.0.0")
.Should()
.NotBeNull();
}
[Test]
public void AddWorkflowPlatformServices_WhenPostgresBackendPluginIsSelected_ShouldBuildBackendNeutralProvider()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
["WorkflowBackend:Provider"] = WorkflowBackendNames.Postgres,
["WorkflowBackend:Postgres:ConnectionStringName"] = "WorkflowPostgres",
["WorkflowBackend:Postgres:SchemaName"] = "wf_bootstrap_test",
["ConnectionStrings:WorkflowPostgres"] = "Host=localhost;Port=5432;Database=workflow;Username=postgres;Password=postgres",
["RabbitConfig:HostName"] = "localhost",
["RabbitConfig:UserName"] = "guest",
["RabbitConfig:Password"] = "guest",
["RabbitConfig:Port"] = "5672",
["RabbitConfig:Exchange"] = "workflow",
["RabbitConfig:RequestQueueName"] = "workflow.request",
["MicroserviceConfig:SectionName"] = "Workflow",
["MicroserviceConfig:ExchangeName"] = "workflow",
["ConnectionStrings:DefaultConnection"] = "DATA SOURCE=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=orcl1)));USER ID=srd_wfklw;PASSWORD=srd_wfklw",
["PluginsConfig:PluginsPrefix"] = "StellaOps.Workflow",
["PluginsConfig:PluginsDirectory"] = @"..\..\StellaOps.Workflow\PluginBinaries",
["PluginsConfig:PluginsOrder:0"] = "StellaOps.Workflow.Engine.AssignPermissions.Generic",
["PluginsConfig:PluginsOrder:1"] = "StellaOps.Workflow.DataStore.PostgreSQL",
["PluginsConfig:PluginsOrder:2"] = "StellaOps.Workflow.Engine.Transport.Microservice",
["PluginsConfig:PluginsOrder:3"] = "StellaOps.Workflow.Engine.Transport.LegacyRabbit",
["PluginsConfig:PluginsOrder:4"] = "StellaOps.Workflow.Engine.Transport.GraphQL",
["PluginsConfig:PluginsOrder:5"] = "StellaOps.Workflow.Engine.Transport.Http",
["PluginsConfig:PluginsOrder:6"] = "StellaOps.Workflow.Engine.Workflows.Bulstrad",
})
.Build();
var services = new ServiceCollection();
services.AddLogging();
services.AddWorkflowPlatformServices(configuration);
services.Replace(ServiceDescriptor.Scoped<IWorkflowMicroserviceTransport, NullWorkflowMicroserviceTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowLegacyRabbitTransport, NullWorkflowLegacyRabbitTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowGraphqlTransport, NullWorkflowGraphqlTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowHttpTransport, NullWorkflowHttpTransport>());
using var provider = services.BuildServiceProvider();
provider.GetRequiredService<IOptions<WorkflowBackendOptions>>().Value.Provider.Should().Be(WorkflowBackendNames.Postgres);
var runtimeStateStore = provider.GetRequiredService<IWorkflowRuntimeStateStore>();
runtimeStateStore.GetType().FullName.Should().Be(typeof(PostgresWorkflowRuntimeStateStore).FullName);
runtimeStateStore.GetType().Assembly.GetName().Name.Should().Be("StellaOps.Workflow.DataStore.PostgreSQL");
var hostedJobLockService = provider.GetRequiredService<IWorkflowHostedJobLockService>();
hostedJobLockService.GetType().FullName.Should().Be(typeof(PostgresWorkflowHostedJobLockService).FullName);
hostedJobLockService.GetType().Assembly.GetName().Name.Should().Be("StellaOps.Workflow.DataStore.PostgreSQL");
provider.GetRequiredService<IWorkflowProjectionStore>().GetType().FullName.Should().Be(typeof(PostgresWorkflowProjectionStore).FullName);
provider.GetRequiredService<IWorkflowProjectionRetentionStore>().GetType().FullName.Should().Be(typeof(PostgresWorkflowProjectionRetentionStore).FullName);
provider.GetRequiredService<IWorkflowMutationCoordinator>().GetType().FullName.Should().Be(typeof(PostgresWorkflowMutationCoordinator).FullName);
provider.GetRequiredService<IWorkflowSignalBus>().Should().BeOfType<WorkflowSignalBusBridge>();
provider.GetRequiredService<IWorkflowScheduleBus>().Should().BeOfType<WorkflowScheduleBusBridge>();
provider.GetRequiredService<IWorkflowSignalStore>().GetType().FullName.Should().Be(typeof(PostgresWorkflowSignalStore).FullName);
provider.GetRequiredService<IWorkflowSignalDriver>().GetType().FullName.Should().Be(typeof(PostgresWorkflowSignalBus).FullName);
provider.GetRequiredService<IWorkflowSignalScheduler>().GetType().FullName.Should().Be(typeof(PostgresWorkflowScheduleBus).FullName);
provider.GetRequiredService<HealthCheckService>().Should().NotBeNull();
}
[Test]
public void AddWorkflowPlatformServices_WhenMongoBackendPluginIsSelected_ShouldBuildBackendNeutralProvider()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
["WorkflowBackend:Provider"] = WorkflowBackendNames.Mongo,
[$"{WorkflowStoreMongoOptions.SectionName}:ConnectionStringName"] = "WorkflowMongo",
[$"{WorkflowStoreMongoOptions.SectionName}:DatabaseName"] = "wf_bootstrap_test",
["ConnectionStrings:WorkflowMongo"] = "mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true",
["RabbitConfig:HostName"] = "localhost",
["RabbitConfig:UserName"] = "guest",
["RabbitConfig:Password"] = "guest",
["RabbitConfig:Port"] = "5672",
["RabbitConfig:Exchange"] = "workflow",
["RabbitConfig:RequestQueueName"] = "workflow.request",
["MicroserviceConfig:SectionName"] = "Workflow",
["MicroserviceConfig:ExchangeName"] = "workflow",
["ConnectionStrings:DefaultConnection"] = "DATA SOURCE=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=orcl1)));USER ID=srd_wfklw;PASSWORD=srd_wfklw",
["PluginsConfig:PluginsPrefix"] = "StellaOps.Workflow",
["PluginsConfig:PluginsDirectory"] = @"..\..\StellaOps.Workflow\PluginBinaries",
["PluginsConfig:PluginsOrder:0"] = "StellaOps.Workflow.Engine.AssignPermissions.Generic",
["PluginsConfig:PluginsOrder:1"] = "StellaOps.Workflow.DataStore.MongoDB",
["PluginsConfig:PluginsOrder:2"] = "StellaOps.Workflow.Engine.Transport.Microservice",
["PluginsConfig:PluginsOrder:3"] = "StellaOps.Workflow.Engine.Transport.LegacyRabbit",
["PluginsConfig:PluginsOrder:4"] = "StellaOps.Workflow.Engine.Transport.GraphQL",
["PluginsConfig:PluginsOrder:5"] = "StellaOps.Workflow.Engine.Transport.Http",
["PluginsConfig:PluginsOrder:6"] = "StellaOps.Workflow.Engine.Workflows.Bulstrad",
})
.Build();
var services = new ServiceCollection();
services.AddLogging();
services.AddWorkflowPlatformServices(configuration);
services.Replace(ServiceDescriptor.Scoped<IWorkflowMicroserviceTransport, NullWorkflowMicroserviceTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowLegacyRabbitTransport, NullWorkflowLegacyRabbitTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowGraphqlTransport, NullWorkflowGraphqlTransport>());
services.Replace(ServiceDescriptor.Scoped<IWorkflowHttpTransport, NullWorkflowHttpTransport>());
using var provider = services.BuildServiceProvider();
provider.GetRequiredService<IOptions<WorkflowBackendOptions>>().Value.Provider.Should().Be(WorkflowBackendNames.Mongo);
provider.GetRequiredService<IWorkflowRuntimeStateStore>().GetType().FullName.Should().Be(typeof(MongoWorkflowRuntimeStateStore).FullName);
provider.GetRequiredService<IWorkflowHostedJobLockService>().GetType().FullName.Should().Be(typeof(MongoWorkflowHostedJobLockService).FullName);
provider.GetRequiredService<IWorkflowProjectionStore>().GetType().FullName.Should().Be(typeof(MongoWorkflowProjectionStore).FullName);
provider.GetRequiredService<IWorkflowProjectionRetentionStore>().GetType().FullName.Should().Be(typeof(MongoWorkflowProjectionRetentionStore).FullName);
provider.GetRequiredService<IWorkflowMutationCoordinator>().GetType().FullName.Should().Be(typeof(MongoWorkflowMutationCoordinator).FullName);
provider.GetRequiredService<IWorkflowSignalBus>().Should().BeOfType<WorkflowSignalBusBridge>();
provider.GetRequiredService<IWorkflowScheduleBus>().Should().BeOfType<WorkflowScheduleBusBridge>();
provider.GetRequiredService<IWorkflowSignalStore>().GetType().FullName.Should().Be(typeof(MongoWorkflowSignalStore).FullName);
provider.GetRequiredService<IWorkflowSignalDriver>().GetType().FullName.Should().Be(typeof(MongoWorkflowSignalBus).FullName);
provider.GetRequiredService<IWorkflowSignalScheduler>().GetType().FullName.Should().Be(typeof(MongoWorkflowScheduleBus).FullName);
provider.GetRequiredService<HealthCheckService>().Should().NotBeNull();
}
}

View File

@@ -0,0 +1,137 @@
using System.Collections.Generic;
using StellaOps.Workflow.Abstractions;
using StellaOps.Workflow.Engine.Scheduling;
using StellaOps.Workflow.Engine.Signaling;
using StellaOps.Workflow.Signaling.Redis;
using StellaOps.Workflow.DataStore.Oracle;
using StellaOps.Workflow.DataStore.PostgreSQL;
using StellaOps.Workflow.Engine.Services;
using FluentAssertions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NUnit.Framework;
namespace StellaOps.Workflow.IntegrationTests.Shared;
[TestFixture]
[Category("Integration")]
public class WorkflowPlatformRedisSignalDriverBootstrapTests
{
private RedisDockerFixture? redisFixture;
[OneTimeSetUp]
public async Task OneTimeSetUpAsync()
{
redisFixture = new RedisDockerFixture();
await redisFixture.StartOrIgnoreAsync();
}
[OneTimeTearDown]
public void OneTimeTearDown()
{
redisFixture?.Dispose();
}
[Test]
public void AddWorkflowPlatformServices_WhenPostgresAndRedisSignalDriverAreSelected_ShouldBuildProvider()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
["WorkflowBackend:Provider"] = WorkflowBackendNames.Postgres,
["WorkflowBackend:Postgres:ConnectionStringName"] = "WorkflowPostgres",
["WorkflowBackend:Postgres:SchemaName"] = "wf_bootstrap_test",
["WorkflowSignalDriver:Provider"] = WorkflowSignalDriverNames.Redis,
["WorkflowSignalDriver:Redis:ChannelName"] = "stella:test:bootstrap",
["RedisConfig:ServerUrl"] = redisFixture!.ConnectionString,
["ConnectionStrings:WorkflowPostgres"] = "Host=localhost;Port=5432;Database=workflow;Username=postgres;Password=postgres",
["RabbitConfig:HostName"] = "localhost",
["RabbitConfig:UserName"] = "guest",
["RabbitConfig:Password"] = "guest",
["RabbitConfig:Port"] = "5672",
["RabbitConfig:Exchange"] = "workflow",
["RabbitConfig:RequestQueueName"] = "workflow.request",
["MicroserviceConfig:SectionName"] = "Workflow",
["MicroserviceConfig:ExchangeName"] = "workflow",
["ConnectionStrings:DefaultConnection"] = "DATA SOURCE=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=orcl1)));USER ID=srd_wfklw;PASSWORD=srd_wfklw",
["PluginsConfig:PluginsPrefix"] = "StellaOps.Workflow",
["PluginsConfig:PluginsDirectory"] = @"..\..\StellaOps.Workflow\PluginBinaries",
["PluginsConfig:PluginsOrder:0"] = "StellaOps.Workflow.Engine.AssignPermissions.Generic",
["PluginsConfig:PluginsOrder:1"] = "StellaOps.Workflow.DataStore.PostgreSQL",
["PluginsConfig:PluginsOrder:2"] = "StellaOps.Workflow.Signaling.Redis",
["PluginsConfig:PluginsOrder:3"] = "StellaOps.Workflow.Engine.Transport.Microservice",
["PluginsConfig:PluginsOrder:4"] = "StellaOps.Workflow.Engine.Transport.LegacyRabbit",
["PluginsConfig:PluginsOrder:5"] = "StellaOps.Workflow.Engine.Transport.GraphQL",
["PluginsConfig:PluginsOrder:6"] = "StellaOps.Workflow.Engine.Transport.Http",
["PluginsConfig:PluginsOrder:7"] = "StellaOps.Workflow.Engine.Workflows.Bulstrad",
})
.Build();
var services = new ServiceCollection();
services.AddLogging();
services.AddWorkflowPlatformServices(configuration);
using var provider = services.BuildServiceProvider();
provider.GetRequiredService<IWorkflowSignalBus>().Should().BeOfType<WorkflowSignalBusBridge>();
provider.GetRequiredService<IWorkflowScheduleBus>().Should().BeOfType<WorkflowScheduleBusBridge>();
provider.GetRequiredService<IWorkflowSignalStore>().GetType().FullName.Should().Be(typeof(PostgresWorkflowSignalStore).FullName);
provider.GetRequiredService<IWorkflowSignalClaimStore>().GetType().FullName.Should().Be(typeof(PostgresWorkflowSignalStore).FullName);
provider.GetRequiredService<IWorkflowSignalDriver>().GetType().FullName.Should().Be(typeof(RedisWorkflowSignalDriver).FullName);
provider.GetRequiredService<IWorkflowWakeOutbox>().Should().BeOfType<NullWorkflowWakeOutbox>();
provider.GetRequiredService<IWorkflowWakeOutboxReceiver>().Should().BeOfType<NullWorkflowWakeOutboxReceiver>();
provider.GetServices<IHostedService>()
.Should()
.NotContain(service => service.GetType().FullName == typeof(RedisWorkflowWakeOutboxPublisherHostedService).FullName);
}
[Test]
public void AddWorkflowPlatformServices_WhenOracleAndRedisSignalDriverAreSelected_ShouldBuildProvider()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
["WorkflowBackend:Provider"] = WorkflowBackendNames.Oracle,
["WorkflowSignalDriver:Provider"] = WorkflowSignalDriverNames.Redis,
["WorkflowSignalDriver:Redis:ChannelName"] = "stella:test:oracle:bootstrap",
["RedisConfig:ServerUrl"] = redisFixture!.ConnectionString,
["RabbitConfig:HostName"] = "localhost",
["RabbitConfig:UserName"] = "guest",
["RabbitConfig:Password"] = "guest",
["RabbitConfig:Port"] = "5672",
["RabbitConfig:Exchange"] = "workflow",
["RabbitConfig:RequestQueueName"] = "workflow.request",
["MicroserviceConfig:SectionName"] = "Workflow",
["MicroserviceConfig:ExchangeName"] = "workflow",
["ConnectionStrings:DefaultConnection"] = "DATA SOURCE=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=orcl1)));USER ID=srd_wfklw;PASSWORD=srd_wfklw",
["PluginsConfig:PluginsPrefix"] = "StellaOps.Workflow",
["PluginsConfig:PluginsDirectory"] = @"..\..\StellaOps.Workflow\PluginBinaries",
["PluginsConfig:PluginsOrder:0"] = "StellaOps.Workflow.Engine.AssignPermissions.Generic",
["PluginsConfig:PluginsOrder:1"] = "StellaOps.Workflow.DataStore.Oracle",
["PluginsConfig:PluginsOrder:2"] = "StellaOps.Workflow.Signaling.Redis",
})
.Build();
var services = new ServiceCollection();
services.AddLogging();
services.AddWorkflowPlatformServices(configuration);
using var provider = services.BuildServiceProvider();
provider.GetRequiredService<IWorkflowSignalBus>().Should().BeOfType<WorkflowSignalBusBridge>();
provider.GetRequiredService<IWorkflowScheduleBus>().Should().BeOfType<WorkflowScheduleBusBridge>();
provider.GetRequiredService<IWorkflowSignalStore>().GetType().FullName.Should().Be(typeof(OracleAqWorkflowSignalBus).FullName);
provider.GetRequiredService<IWorkflowSignalClaimStore>().GetType().FullName.Should().Be(typeof(OracleAqWorkflowSignalBus).FullName);
provider.GetRequiredService<IWorkflowSignalDriver>().GetType().FullName.Should().Be(typeof(RedisWorkflowSignalDriver).FullName);
provider.GetRequiredService<IWorkflowWakeOutbox>().Should().BeOfType<NullWorkflowWakeOutbox>();
provider.GetRequiredService<IWorkflowWakeOutboxReceiver>().Should().BeOfType<NullWorkflowWakeOutboxReceiver>();
provider.GetServices<IHostedService>()
.Should()
.NotContain(service => service.GetType().FullName == typeof(RedisWorkflowWakeOutboxPublisherHostedService).FullName);
}
}

View File

@@ -0,0 +1,411 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.Workflow.Abstractions;
using StellaOps.Workflow.Contracts;
namespace StellaOps.Workflow.IntegrationTests.Shared;
public sealed class WorkflowTransportScripts
{
public ScriptedWorkflowLegacyRabbitTransport LegacyRabbit { get; } = new();
public ScriptedWorkflowMicroserviceTransport Microservice { get; } = new();
public ScriptedWorkflowGraphqlTransport Graphql { get; } = new();
public ScriptedWorkflowHttpTransport Http { get; } = new();
}
public sealed class ScriptedWorkflowLegacyRabbitTransport
: ScriptedTransportBase<WorkflowLegacyRabbitRequest, WorkflowMicroserviceResponse>,
IWorkflowLegacyRabbitTransport
{
public ScriptedWorkflowLegacyRabbitTransport Respond(
string command,
object? payload,
WorkflowLegacyRabbitMode mode = WorkflowLegacyRabbitMode.Envelope)
{
EnqueueResponse(BuildKey(command, mode), new WorkflowMicroserviceResponse
{
Succeeded = true,
Payload = payload,
});
return this;
}
public ScriptedWorkflowLegacyRabbitTransport Fail(
string command,
string error,
WorkflowLegacyRabbitMode mode = WorkflowLegacyRabbitMode.Envelope,
object? payload = null)
{
EnqueueResponse(BuildKey(command, mode), new WorkflowMicroserviceResponse
{
Succeeded = false,
Error = error,
Payload = payload,
});
return this;
}
public ScriptedWorkflowLegacyRabbitTransport Throw(
string command,
Exception exception,
WorkflowLegacyRabbitMode mode = WorkflowLegacyRabbitMode.Envelope)
{
EnqueueException(BuildKey(command, mode), exception);
return this;
}
public ScriptedWorkflowLegacyRabbitTransport Timeout(
string command,
WorkflowLegacyRabbitMode mode = WorkflowLegacyRabbitMode.Envelope)
{
return Throw(
command,
new TimeoutException($"Timeout for legacy Rabbit command '{command}' in mode '{mode}'."),
mode);
}
public Task<WorkflowMicroserviceResponse> ExecuteAsync(
WorkflowLegacyRabbitRequest request,
CancellationToken cancellationToken = default)
{
return ExecuteAsync(request, BuildKey(request.Command, request.Mode));
}
protected override WorkflowMicroserviceResponse BuildMissingResponse(WorkflowLegacyRabbitRequest request, string key)
{
return new WorkflowMicroserviceResponse
{
Succeeded = false,
Error = $"No scripted legacy Rabbit response configured for {key}.",
};
}
private static string BuildKey(string command, WorkflowLegacyRabbitMode mode)
{
return $"{mode}:{command}";
}
}
public sealed class ScriptedWorkflowMicroserviceTransport
: ScriptedTransportBase<WorkflowMicroserviceRequest, WorkflowMicroserviceResponse>,
IWorkflowMicroserviceTransport
{
public ScriptedWorkflowMicroserviceTransport Respond(
string microserviceName,
string command,
object? payload)
{
EnqueueResponse(BuildKey(microserviceName, command), new WorkflowMicroserviceResponse
{
Succeeded = true,
Payload = payload,
});
return this;
}
public ScriptedWorkflowMicroserviceTransport Fail(
string microserviceName,
string command,
string error,
object? payload = null)
{
EnqueueResponse(BuildKey(microserviceName, command), new WorkflowMicroserviceResponse
{
Succeeded = false,
Error = error,
Payload = payload,
});
return this;
}
public ScriptedWorkflowMicroserviceTransport Throw(
string microserviceName,
string command,
Exception exception)
{
EnqueueException(BuildKey(microserviceName, command), exception);
return this;
}
public ScriptedWorkflowMicroserviceTransport Timeout(string microserviceName, string command)
{
return Throw(
microserviceName,
command,
new TimeoutException($"Timeout for microservice command '{microserviceName}.{command}'."));
}
public Task<WorkflowMicroserviceResponse> ExecuteAsync(
WorkflowMicroserviceRequest request,
CancellationToken cancellationToken = default)
{
return ExecuteAsync(request, BuildKey(request.MicroserviceName, request.Command));
}
protected override WorkflowMicroserviceResponse BuildMissingResponse(WorkflowMicroserviceRequest request, string key)
{
return new WorkflowMicroserviceResponse
{
Succeeded = false,
Error = $"No scripted microservice response configured for {key}.",
};
}
private static string BuildKey(string microserviceName, string command)
{
return $"{microserviceName}:{command}";
}
}
public sealed class ScriptedWorkflowGraphqlTransport
: ScriptedTransportBase<WorkflowGraphqlRequest, WorkflowGraphqlResponse>,
IWorkflowGraphqlTransport
{
public ScriptedWorkflowGraphqlTransport Respond(
string target,
string query,
object? payload,
string? operationName = null)
{
EnqueueResponse(BuildKey(target, query, operationName), new WorkflowGraphqlResponse
{
Succeeded = true,
JsonPayload = payload is null ? null : JsonSerializer.Serialize(payload),
});
return this;
}
public ScriptedWorkflowGraphqlTransport Fail(
string target,
string query,
string error,
string? operationName = null,
object? payload = null)
{
EnqueueResponse(BuildKey(target, query, operationName), new WorkflowGraphqlResponse
{
Succeeded = false,
Error = error,
JsonPayload = payload is null ? null : JsonSerializer.Serialize(payload),
});
return this;
}
public ScriptedWorkflowGraphqlTransport Throw(
string target,
string query,
Exception exception,
string? operationName = null)
{
EnqueueException(BuildKey(target, query, operationName), exception);
return this;
}
public ScriptedWorkflowGraphqlTransport Timeout(
string target,
string query,
string? operationName = null)
{
return Throw(
target,
query,
new TimeoutException($"Timeout for GraphQL request '{target}:{operationName ?? "<none>"}'."),
operationName);
}
public Task<WorkflowGraphqlResponse> ExecuteAsync(
WorkflowGraphqlRequest request,
CancellationToken cancellationToken = default)
{
return ExecuteAsync(request, BuildKey(request.Target, request.Query, request.OperationName));
}
protected override WorkflowGraphqlResponse BuildMissingResponse(WorkflowGraphqlRequest request, string key)
{
return new WorkflowGraphqlResponse
{
Succeeded = false,
Error = $"No scripted GraphQL response configured for {key}.",
};
}
private static string BuildKey(string target, string query, string? operationName)
{
return $"{target}:{operationName ?? "<none>"}:{query}";
}
}
public sealed class ScriptedWorkflowHttpTransport
: ScriptedTransportBase<WorkflowHttpRequest, WorkflowHttpResponse>,
IWorkflowHttpTransport
{
public ScriptedWorkflowHttpTransport Respond(
string target,
string path,
object? payload,
string method = "POST",
int statusCode = 200)
{
EnqueueResponse(BuildKey(target, method, path), new WorkflowHttpResponse
{
Succeeded = true,
StatusCode = statusCode,
JsonPayload = payload is null ? null : JsonSerializer.Serialize(payload),
});
return this;
}
public ScriptedWorkflowHttpTransport Fail(
string target,
string path,
string error,
string method = "POST",
int statusCode = 500,
object? payload = null)
{
EnqueueResponse(BuildKey(target, method, path), new WorkflowHttpResponse
{
Succeeded = false,
StatusCode = statusCode,
Error = error,
JsonPayload = payload is null ? null : JsonSerializer.Serialize(payload),
});
return this;
}
public ScriptedWorkflowHttpTransport Throw(
string target,
string path,
Exception exception,
string method = "POST")
{
EnqueueException(BuildKey(target, method, path), exception);
return this;
}
public ScriptedWorkflowHttpTransport Timeout(
string target,
string path,
string method = "POST")
{
return Throw(
target,
path,
new TimeoutException($"Timeout for HTTP request '{method} {target}:{path}'."),
method);
}
public Task<WorkflowHttpResponse> ExecuteAsync(
WorkflowHttpRequest request,
CancellationToken cancellationToken = default)
{
return ExecuteAsync(request, BuildKey(request.Target, request.Method, request.Path));
}
protected override WorkflowHttpResponse BuildMissingResponse(WorkflowHttpRequest request, string key)
{
return new WorkflowHttpResponse
{
Succeeded = false,
Error = $"No scripted HTTP response configured for {key}.",
};
}
private static string BuildKey(string target, string method, string path)
{
return $"{method.Trim().ToUpperInvariant()}:{target}:{path}";
}
}
public abstract class ScriptedTransportBase<TRequest, TResponse>
{
private readonly Dictionary<string, ScriptedCallSequence<TResponse>> scripts = new(StringComparer.OrdinalIgnoreCase);
private readonly object invocationSync = new();
public List<TRequest> Invocations { get; } = [];
protected void EnqueueResponse(string key, TResponse response)
{
GetSequence(key).EnqueueResponse(response);
}
protected void EnqueueException(string key, Exception exception)
{
GetSequence(key).EnqueueException(exception);
}
protected Task<TResponse> ExecuteAsync(TRequest request, string key)
{
lock (invocationSync)
{
Invocations.Add(request);
}
try
{
if (scripts.TryGetValue(key, out var sequence))
{
return Task.FromResult(sequence.ResolveNext());
}
return Task.FromResult(BuildMissingResponse(request, key));
}
catch (Exception exception)
{
return Task.FromException<TResponse>(exception);
}
}
protected abstract TResponse BuildMissingResponse(TRequest request, string key);
private ScriptedCallSequence<TResponse> GetSequence(string key)
{
if (!scripts.TryGetValue(key, out var sequence))
{
sequence = new ScriptedCallSequence<TResponse>();
scripts[key] = sequence;
}
return sequence;
}
}
internal sealed class ScriptedCallSequence<TResponse>
{
private readonly Queue<Func<TResponse>> outcomes = [];
private readonly object sync = new();
public void EnqueueResponse(TResponse response)
{
lock (sync)
{
outcomes.Enqueue(() => response);
}
}
public void EnqueueException(Exception exception)
{
lock (sync)
{
outcomes.Enqueue(() => throw exception);
}
}
public TResponse ResolveNext()
{
lock (sync)
{
if (outcomes.Count == 0)
{
throw new InvalidOperationException("No scripted transport outcomes are available.");
}
var next = outcomes.Count == 1 ? outcomes.Peek() : outcomes.Dequeue();
return next();
}
}
}