From 7be7978580435fc5616051f22b05012e1eb8b759 Mon Sep 17 00:00:00 2001 From: master <> Date: Mon, 6 Apr 2026 08:53:22 +0300 Subject: [PATCH] Add Workflow cancel contracts and projection store improvements Introduce WorkflowCancelContracts, update IWorkflowRuntimeApi with cancel support, and refine Postgres/Mongo projection store serialization. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../IWorkflowRuntimeApi.cs | 4 ++ .../WorkflowBusinessReferenceExtensions.cs | 69 +++++++++++++++++-- .../WorkflowCancelContracts.cs | 20 ++++++ .../MongoWorkflowProjectionStore.cs | 27 +++++--- .../PostgresWorkflowJson.cs | 19 +++-- .../PostgresWorkflowProjectionStore.cs | 10 +-- .../Projections/WorkflowProjectionStore.cs | 14 ++-- 7 files changed, 131 insertions(+), 32 deletions(-) create mode 100644 src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowCancelContracts.cs diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/IWorkflowRuntimeApi.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/IWorkflowRuntimeApi.cs index 7ef9122d9..fb847772b 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/IWorkflowRuntimeApi.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/IWorkflowRuntimeApi.cs @@ -46,4 +46,8 @@ public interface IWorkflowRuntimeApi Task RaiseExternalSignalAsync( WorkflowSignalRaiseRequest request, CancellationToken cancellationToken = default); + + Task CancelWorkflowAsync( + WorkflowCancelRequest request, + CancellationToken cancellationToken = default); } diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/WorkflowBusinessReferenceExtensions.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/WorkflowBusinessReferenceExtensions.cs index 57c7ebc76..b494d2b06 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/WorkflowBusinessReferenceExtensions.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Abstractions/WorkflowBusinessReferenceExtensions.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Globalization; using System.Linq; +using System.Security.Cryptography; using System.Text; using System.Text.Json; @@ -11,6 +12,9 @@ namespace StellaOps.Workflow.Abstractions; public static class WorkflowBusinessReferenceExtensions { + public const int MaxBusinessReferenceKeyLength = 128; + private const int CompactBusinessReferenceHashLength = 16; + public static WorkflowBusinessReference? NormalizeBusinessReference(WorkflowBusinessReference? businessReference) { if (businessReference is null) @@ -29,7 +33,7 @@ public static class WorkflowBusinessReferenceExtensions var key = string.IsNullOrWhiteSpace(businessReference.Key) ? BuildCanonicalBusinessReferenceKey(normalizedParts) - : ConvertBusinessReferenceValueToString(businessReference.Key); + : NormalizeBusinessReferenceKey(ConvertBusinessReferenceValueToString(businessReference.Key)); if (string.IsNullOrWhiteSpace(key) && normalizedParts.Count == 0) { @@ -72,7 +76,36 @@ public static class WorkflowBusinessReferenceExtensions builder.Append(Uri.EscapeDataString(normalizedParts[index].Value!)); } - return builder.ToString(); + return NormalizeBusinessReferenceKey(builder.ToString()); + } + + public static string? NormalizeBusinessReferenceKey(string? key) + { + if (string.IsNullOrWhiteSpace(key)) + { + return null; + } + + if (key.Length <= MaxBusinessReferenceKeyLength) + { + return key; + } + + var marker = $"|h={ComputeBusinessReferenceHash(key)}|"; + var availableCharacters = MaxBusinessReferenceKeyLength - marker.Length; + if (availableCharacters <= 0) + { + return marker.Length > MaxBusinessReferenceKeyLength + ? marker[..MaxBusinessReferenceKeyLength] + : marker; + } + + var prefixLength = availableCharacters / 2; + var suffixLength = availableCharacters - prefixLength; + return string.Concat( + key.AsSpan(0, prefixLength), + marker, + key.AsSpan(key.Length - suffixLength, suffixLength)); } public static bool MatchesBusinessReferenceFilter( @@ -81,8 +114,9 @@ public static class WorkflowBusinessReferenceExtensions IDictionary parts) { var normalizedReference = NormalizeBusinessReference(businessReference); - if (!string.IsNullOrWhiteSpace(key) - && !string.Equals(normalizedReference?.Key, key, StringComparison.Ordinal)) + var normalizedFilterKey = NormalizeBusinessReferenceKey(key); + if (!string.IsNullOrWhiteSpace(normalizedFilterKey) + && !string.Equals(normalizedReference?.Key, normalizedFilterKey, StringComparison.Ordinal)) { return false; } @@ -175,11 +209,32 @@ public static class WorkflowBusinessReferenceExtensions { return value switch { - IDictionary dictionary => new Dictionary(dictionary, StringComparer.OrdinalIgnoreCase), + IDictionary dictionary => CopyLastWins(dictionary), JsonElement element when element.ValueKind == JsonValueKind.Object => - JsonSerializer.Deserialize>(element.GetRawText()) - ?? new Dictionary(StringComparer.OrdinalIgnoreCase), + CopyLastWins( + JsonSerializer.Deserialize>(element.GetRawText()) + ?? new Dictionary()), _ => new Dictionary(StringComparer.OrdinalIgnoreCase), }; } + + private static Dictionary CopyLastWins(IEnumerable> source) + { + var result = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var pair in source) + { + if (!string.IsNullOrWhiteSpace(pair.Key)) + { + result[pair.Key] = pair.Value; + } + } + + return result; + } + + private static string ComputeBusinessReferenceHash(string key) + { + var hashBytes = SHA256.HashData(Encoding.UTF8.GetBytes(key)); + return Convert.ToHexString(hashBytes[..(CompactBusinessReferenceHashLength / 2)]); + } } diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowCancelContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowCancelContracts.cs new file mode 100644 index 000000000..551d35cd5 --- /dev/null +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowCancelContracts.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; + +namespace StellaOps.Workflow.Contracts; + +public sealed record WorkflowCancelRequest +{ + public required string WorkflowInstanceId { get; init; } + public string SignalName { get; init; } = "cancel"; + public string? CleanupWorkflowName { get; init; } + public IDictionary CleanupPayload { get; init; } = new Dictionary(); +} + +public sealed record WorkflowCancelResponse +{ + public required string WorkflowInstanceId { get; init; } + public required bool Cancelled { get; init; } + public string? SignalId { get; init; } + public bool CleanupStarted { get; init; } + public string? CleanupWorkflowInstanceId { get; init; } +} diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowProjectionStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowProjectionStore.cs index 3e6456bd0..57cb22337 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowProjectionStore.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowProjectionStore.cs @@ -96,6 +96,7 @@ public sealed class MongoWorkflowProjectionStore( WorkflowTasksGetRequest request, CancellationToken cancellationToken = default) { + var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey); var filter = FilterDefinition.Empty; if (!string.IsNullOrWhiteSpace(request.WorkflowName)) { @@ -112,9 +113,9 @@ public sealed class MongoWorkflowProjectionStore( filter &= Builders.Filter.Eq(x => x.WorkflowInstanceId, request.WorkflowInstanceId); } - if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) + if (!string.IsNullOrWhiteSpace(businessReferenceKey)) { - filter &= Builders.Filter.Eq(x => x.BusinessReferenceKey, request.BusinessReferenceKey); + filter &= Builders.Filter.Eq(x => x.BusinessReferenceKey, businessReferenceKey); } if (!string.IsNullOrWhiteSpace(request.Assignee)) @@ -133,7 +134,7 @@ public sealed class MongoWorkflowProjectionStore( cancellationToken); var summaries = taskDocuments .Select(MapTaskSummary) - .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) + .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); if (request.CandidateRoles.Count == 0) @@ -445,6 +446,7 @@ public sealed class MongoWorkflowProjectionStore( WorkflowInstancesGetRequest request, CancellationToken cancellationToken = default) { + var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey); var filter = FilterDefinition.Empty; if (!string.IsNullOrWhiteSpace(request.WorkflowName)) { @@ -456,9 +458,9 @@ public sealed class MongoWorkflowProjectionStore( filter &= Builders.Filter.Eq(x => x.WorkflowVersion, request.WorkflowVersion); } - if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) + if (!string.IsNullOrWhiteSpace(businessReferenceKey)) { - filter &= Builders.Filter.Eq(x => x.BusinessReferenceKey, request.BusinessReferenceKey); + filter &= Builders.Filter.Eq(x => x.BusinessReferenceKey, businessReferenceKey); } if (!string.IsNullOrWhiteSpace(request.Status)) @@ -473,7 +475,7 @@ public sealed class MongoWorkflowProjectionStore( return instanceDocuments .Select(MapInstanceSummary) - .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) + .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); } @@ -912,9 +914,16 @@ public sealed class MongoWorkflowProjectionStore( private static IReadOnlyDictionary ToPublicTaskPayload( IReadOnlyDictionary payload) { - return payload - .Where(x => !string.Equals(x.Key, WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey, StringComparison.OrdinalIgnoreCase)) - .ToDictionary(x => x.Key, x => x.Value, StringComparer.OrdinalIgnoreCase); + var result = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var item in payload) + { + if (!string.Equals(item.Key, WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey, StringComparison.OrdinalIgnoreCase)) + { + result[item.Key] = item.Value; + } + } + + return result; } private static string? TryReadProjectionWorkflowInstanceId( diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowJson.cs b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowJson.cs index b8c5fd1cc..0b7c09ebe 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowJson.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowJson.cs @@ -89,12 +89,19 @@ internal static class PostgresWorkflowJson public static IReadOnlyDictionary ToPublicTaskPayload( IReadOnlyDictionary payload) { - return payload - .Where(x => !string.Equals( - x.Key, - WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey, - StringComparison.OrdinalIgnoreCase)) - .ToDictionary(x => x.Key, x => x.Value, StringComparer.OrdinalIgnoreCase); + var result = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var item in payload) + { + if (!string.Equals( + item.Key, + WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey, + StringComparison.OrdinalIgnoreCase)) + { + result[item.Key] = item.Value; + } + } + + return result; } public static string? TryReadProjectionWorkflowInstanceId(IReadOnlyDictionary payload) diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowProjectionStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowProjectionStore.cs index d89d33479..4a12cc6e2 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowProjectionStore.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowProjectionStore.cs @@ -192,6 +192,7 @@ public sealed class PostgresWorkflowProjectionStore( WorkflowTasksGetRequest request, CancellationToken cancellationToken) { + var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey); await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken); await using var command = database.CreateCommand( scope.Connection, @@ -228,7 +229,7 @@ public sealed class PostgresWorkflowProjectionStore( command.Parameters.Add("workflow_name", NpgsqlDbType.Text).Value = (object?)request.WorkflowName ?? DBNull.Value; command.Parameters.Add("workflow_version", NpgsqlDbType.Text).Value = (object?)request.WorkflowVersion ?? DBNull.Value; command.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value; - command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)request.BusinessReferenceKey ?? DBNull.Value; + command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)businessReferenceKey ?? DBNull.Value; command.Parameters.Add("assignee", NpgsqlDbType.Text).Value = (object?)request.Assignee ?? DBNull.Value; command.Parameters.Add("status", NpgsqlDbType.Text).Value = (object?)request.Status ?? DBNull.Value; @@ -240,7 +241,7 @@ public sealed class PostgresWorkflowProjectionStore( } var filtered = tasks - .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) + .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); if (request.CandidateRoles.Count == 0) { @@ -531,6 +532,7 @@ public sealed class PostgresWorkflowProjectionStore( WorkflowInstancesGetRequest request, CancellationToken cancellationToken) { + var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey); await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken); await using var command = database.CreateCommand( scope.Connection, @@ -552,7 +554,7 @@ public sealed class PostgresWorkflowProjectionStore( """); command.Parameters.Add("workflow_name", NpgsqlDbType.Text).Value = (object?)request.WorkflowName ?? DBNull.Value; command.Parameters.Add("workflow_version", NpgsqlDbType.Text).Value = (object?)request.WorkflowVersion ?? DBNull.Value; - command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)request.BusinessReferenceKey ?? DBNull.Value; + command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)businessReferenceKey ?? DBNull.Value; command.Parameters.Add("status", NpgsqlDbType.Text).Value = (object?)request.Status ?? DBNull.Value; var instances = new List(); @@ -574,7 +576,7 @@ public sealed class PostgresWorkflowProjectionStore( } return instances - .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) + .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); } diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs index b96be12b3..19aeb9e5b 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs @@ -107,6 +107,7 @@ public sealed class WorkflowProjectionStore( WorkflowTasksGetRequest request, CancellationToken cancellationToken = default) { + var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey); var query = dbContext.WorkflowTasks.AsNoTracking().AsQueryable(); if (!string.IsNullOrWhiteSpace(request.WorkflowName)) @@ -124,9 +125,9 @@ public sealed class WorkflowProjectionStore( query = query.Where(x => x.WorkflowInstanceId == request.WorkflowInstanceId); } - if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) + if (!string.IsNullOrWhiteSpace(businessReferenceKey)) { - query = query.Where(x => x.BusinessReferenceKey == request.BusinessReferenceKey); + query = query.Where(x => x.BusinessReferenceKey == businessReferenceKey); } if (!string.IsNullOrWhiteSpace(request.Assignee)) @@ -145,7 +146,7 @@ public sealed class WorkflowProjectionStore( var summaries = tasks .Select(MapTaskSummary) - .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) + .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); if (request.CandidateRoles.Count == 0) @@ -478,6 +479,7 @@ public sealed class WorkflowProjectionStore( WorkflowInstancesGetRequest request, CancellationToken cancellationToken = default) { + var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey); var query = dbContext.WorkflowInstances.AsNoTracking().AsQueryable(); if (!string.IsNullOrWhiteSpace(request.WorkflowName)) @@ -490,9 +492,9 @@ public sealed class WorkflowProjectionStore( query = query.Where(x => x.WorkflowVersion == request.WorkflowVersion); } - if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) + if (!string.IsNullOrWhiteSpace(businessReferenceKey)) { - query = query.Where(x => x.BusinessReferenceKey == request.BusinessReferenceKey); + query = query.Where(x => x.BusinessReferenceKey == businessReferenceKey); } if (!string.IsNullOrWhiteSpace(request.Status)) @@ -515,7 +517,7 @@ public sealed class WorkflowProjectionStore( .ToArrayAsync(cancellationToken); return instances - .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) + .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); }