Add Workflow cancel contracts and projection store improvements

Introduce WorkflowCancelContracts, update IWorkflowRuntimeApi with cancel
support, and refine Postgres/Mongo projection store serialization.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-06 08:53:22 +03:00
parent 7dd22e4b16
commit 7be7978580
7 changed files with 131 additions and 32 deletions

View File

@@ -46,4 +46,8 @@ public interface IWorkflowRuntimeApi
Task<WorkflowSignalRaiseResponse> RaiseExternalSignalAsync( Task<WorkflowSignalRaiseResponse> RaiseExternalSignalAsync(
WorkflowSignalRaiseRequest request, WorkflowSignalRaiseRequest request,
CancellationToken cancellationToken = default); CancellationToken cancellationToken = default);
Task<WorkflowCancelResponse> CancelWorkflowAsync(
WorkflowCancelRequest request,
CancellationToken cancellationToken = default);
} }

View File

@@ -2,6 +2,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.Linq; using System.Linq;
using System.Security.Cryptography;
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
@@ -11,6 +12,9 @@ namespace StellaOps.Workflow.Abstractions;
public static class WorkflowBusinessReferenceExtensions public static class WorkflowBusinessReferenceExtensions
{ {
public const int MaxBusinessReferenceKeyLength = 128;
private const int CompactBusinessReferenceHashLength = 16;
public static WorkflowBusinessReference? NormalizeBusinessReference(WorkflowBusinessReference? businessReference) public static WorkflowBusinessReference? NormalizeBusinessReference(WorkflowBusinessReference? businessReference)
{ {
if (businessReference is null) if (businessReference is null)
@@ -29,7 +33,7 @@ public static class WorkflowBusinessReferenceExtensions
var key = string.IsNullOrWhiteSpace(businessReference.Key) var key = string.IsNullOrWhiteSpace(businessReference.Key)
? BuildCanonicalBusinessReferenceKey(normalizedParts) ? BuildCanonicalBusinessReferenceKey(normalizedParts)
: ConvertBusinessReferenceValueToString(businessReference.Key); : NormalizeBusinessReferenceKey(ConvertBusinessReferenceValueToString(businessReference.Key));
if (string.IsNullOrWhiteSpace(key) && normalizedParts.Count == 0) if (string.IsNullOrWhiteSpace(key) && normalizedParts.Count == 0)
{ {
@@ -72,7 +76,36 @@ public static class WorkflowBusinessReferenceExtensions
builder.Append(Uri.EscapeDataString(normalizedParts[index].Value!)); builder.Append(Uri.EscapeDataString(normalizedParts[index].Value!));
} }
return builder.ToString(); return NormalizeBusinessReferenceKey(builder.ToString());
}
public static string? NormalizeBusinessReferenceKey(string? key)
{
if (string.IsNullOrWhiteSpace(key))
{
return null;
}
if (key.Length <= MaxBusinessReferenceKeyLength)
{
return key;
}
var marker = $"|h={ComputeBusinessReferenceHash(key)}|";
var availableCharacters = MaxBusinessReferenceKeyLength - marker.Length;
if (availableCharacters <= 0)
{
return marker.Length > MaxBusinessReferenceKeyLength
? marker[..MaxBusinessReferenceKeyLength]
: marker;
}
var prefixLength = availableCharacters / 2;
var suffixLength = availableCharacters - prefixLength;
return string.Concat(
key.AsSpan(0, prefixLength),
marker,
key.AsSpan(key.Length - suffixLength, suffixLength));
} }
public static bool MatchesBusinessReferenceFilter( public static bool MatchesBusinessReferenceFilter(
@@ -81,8 +114,9 @@ public static class WorkflowBusinessReferenceExtensions
IDictionary<string, object?> parts) IDictionary<string, object?> parts)
{ {
var normalizedReference = NormalizeBusinessReference(businessReference); var normalizedReference = NormalizeBusinessReference(businessReference);
if (!string.IsNullOrWhiteSpace(key) var normalizedFilterKey = NormalizeBusinessReferenceKey(key);
&& !string.Equals(normalizedReference?.Key, key, StringComparison.Ordinal)) if (!string.IsNullOrWhiteSpace(normalizedFilterKey)
&& !string.Equals(normalizedReference?.Key, normalizedFilterKey, StringComparison.Ordinal))
{ {
return false; return false;
} }
@@ -175,11 +209,32 @@ public static class WorkflowBusinessReferenceExtensions
{ {
return value switch return value switch
{ {
IDictionary<string, object?> dictionary => new Dictionary<string, object?>(dictionary, StringComparer.OrdinalIgnoreCase), IDictionary<string, object?> dictionary => CopyLastWins(dictionary),
JsonElement element when element.ValueKind == JsonValueKind.Object => JsonElement element when element.ValueKind == JsonValueKind.Object =>
JsonSerializer.Deserialize<Dictionary<string, object?>>(element.GetRawText()) CopyLastWins(
?? new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase), JsonSerializer.Deserialize<Dictionary<string, object?>>(element.GetRawText())
?? new Dictionary<string, object?>()),
_ => new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase), _ => new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase),
}; };
} }
private static Dictionary<string, object?> CopyLastWins(IEnumerable<KeyValuePair<string, object?>> source)
{
var result = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
foreach (var pair in source)
{
if (!string.IsNullOrWhiteSpace(pair.Key))
{
result[pair.Key] = pair.Value;
}
}
return result;
}
private static string ComputeBusinessReferenceHash(string key)
{
var hashBytes = SHA256.HashData(Encoding.UTF8.GetBytes(key));
return Convert.ToHexString(hashBytes[..(CompactBusinessReferenceHashLength / 2)]);
}
} }

View File

@@ -0,0 +1,20 @@
using System.Collections.Generic;
namespace StellaOps.Workflow.Contracts;
public sealed record WorkflowCancelRequest
{
public required string WorkflowInstanceId { get; init; }
public string SignalName { get; init; } = "cancel";
public string? CleanupWorkflowName { get; init; }
public IDictionary<string, object?> CleanupPayload { get; init; } = new Dictionary<string, object?>();
}
public sealed record WorkflowCancelResponse
{
public required string WorkflowInstanceId { get; init; }
public required bool Cancelled { get; init; }
public string? SignalId { get; init; }
public bool CleanupStarted { get; init; }
public string? CleanupWorkflowInstanceId { get; init; }
}

View File

@@ -96,6 +96,7 @@ public sealed class MongoWorkflowProjectionStore(
WorkflowTasksGetRequest request, WorkflowTasksGetRequest request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey);
var filter = FilterDefinition<WorkflowTaskDocument>.Empty; var filter = FilterDefinition<WorkflowTaskDocument>.Empty;
if (!string.IsNullOrWhiteSpace(request.WorkflowName)) if (!string.IsNullOrWhiteSpace(request.WorkflowName))
{ {
@@ -112,9 +113,9 @@ public sealed class MongoWorkflowProjectionStore(
filter &= Builders<WorkflowTaskDocument>.Filter.Eq(x => x.WorkflowInstanceId, request.WorkflowInstanceId); filter &= Builders<WorkflowTaskDocument>.Filter.Eq(x => x.WorkflowInstanceId, request.WorkflowInstanceId);
} }
if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) if (!string.IsNullOrWhiteSpace(businessReferenceKey))
{ {
filter &= Builders<WorkflowTaskDocument>.Filter.Eq(x => x.BusinessReferenceKey, request.BusinessReferenceKey); filter &= Builders<WorkflowTaskDocument>.Filter.Eq(x => x.BusinessReferenceKey, businessReferenceKey);
} }
if (!string.IsNullOrWhiteSpace(request.Assignee)) if (!string.IsNullOrWhiteSpace(request.Assignee))
@@ -133,7 +134,7 @@ public sealed class MongoWorkflowProjectionStore(
cancellationToken); cancellationToken);
var summaries = taskDocuments var summaries = taskDocuments
.Select(MapTaskSummary) .Select(MapTaskSummary)
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray(); .ToArray();
if (request.CandidateRoles.Count == 0) if (request.CandidateRoles.Count == 0)
@@ -445,6 +446,7 @@ public sealed class MongoWorkflowProjectionStore(
WorkflowInstancesGetRequest request, WorkflowInstancesGetRequest request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey);
var filter = FilterDefinition<WorkflowInstanceDocument>.Empty; var filter = FilterDefinition<WorkflowInstanceDocument>.Empty;
if (!string.IsNullOrWhiteSpace(request.WorkflowName)) if (!string.IsNullOrWhiteSpace(request.WorkflowName))
{ {
@@ -456,9 +458,9 @@ public sealed class MongoWorkflowProjectionStore(
filter &= Builders<WorkflowInstanceDocument>.Filter.Eq(x => x.WorkflowVersion, request.WorkflowVersion); filter &= Builders<WorkflowInstanceDocument>.Filter.Eq(x => x.WorkflowVersion, request.WorkflowVersion);
} }
if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) if (!string.IsNullOrWhiteSpace(businessReferenceKey))
{ {
filter &= Builders<WorkflowInstanceDocument>.Filter.Eq(x => x.BusinessReferenceKey, request.BusinessReferenceKey); filter &= Builders<WorkflowInstanceDocument>.Filter.Eq(x => x.BusinessReferenceKey, businessReferenceKey);
} }
if (!string.IsNullOrWhiteSpace(request.Status)) if (!string.IsNullOrWhiteSpace(request.Status))
@@ -473,7 +475,7 @@ public sealed class MongoWorkflowProjectionStore(
return instanceDocuments return instanceDocuments
.Select(MapInstanceSummary) .Select(MapInstanceSummary)
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray(); .ToArray();
} }
@@ -912,9 +914,16 @@ public sealed class MongoWorkflowProjectionStore(
private static IReadOnlyDictionary<string, JsonElement> ToPublicTaskPayload( private static IReadOnlyDictionary<string, JsonElement> ToPublicTaskPayload(
IReadOnlyDictionary<string, JsonElement> payload) IReadOnlyDictionary<string, JsonElement> payload)
{ {
return payload var result = new Dictionary<string, JsonElement>(StringComparer.OrdinalIgnoreCase);
.Where(x => !string.Equals(x.Key, WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey, StringComparison.OrdinalIgnoreCase)) foreach (var item in payload)
.ToDictionary(x => x.Key, x => x.Value, StringComparer.OrdinalIgnoreCase); {
if (!string.Equals(item.Key, WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey, StringComparison.OrdinalIgnoreCase))
{
result[item.Key] = item.Value;
}
}
return result;
} }
private static string? TryReadProjectionWorkflowInstanceId( private static string? TryReadProjectionWorkflowInstanceId(

View File

@@ -89,12 +89,19 @@ internal static class PostgresWorkflowJson
public static IReadOnlyDictionary<string, JsonElement> ToPublicTaskPayload( public static IReadOnlyDictionary<string, JsonElement> ToPublicTaskPayload(
IReadOnlyDictionary<string, JsonElement> payload) IReadOnlyDictionary<string, JsonElement> payload)
{ {
return payload var result = new Dictionary<string, JsonElement>(StringComparer.OrdinalIgnoreCase);
.Where(x => !string.Equals( foreach (var item in payload)
x.Key, {
WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey, if (!string.Equals(
StringComparison.OrdinalIgnoreCase)) item.Key,
.ToDictionary(x => x.Key, x => x.Value, StringComparer.OrdinalIgnoreCase); WorkflowRuntimePayloadKeys.ProjectionWorkflowInstanceIdPayloadKey,
StringComparison.OrdinalIgnoreCase))
{
result[item.Key] = item.Value;
}
}
return result;
} }
public static string? TryReadProjectionWorkflowInstanceId(IReadOnlyDictionary<string, JsonElement> payload) public static string? TryReadProjectionWorkflowInstanceId(IReadOnlyDictionary<string, JsonElement> payload)

View File

@@ -192,6 +192,7 @@ public sealed class PostgresWorkflowProjectionStore(
WorkflowTasksGetRequest request, WorkflowTasksGetRequest request,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey);
await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken); await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken);
await using var command = database.CreateCommand( await using var command = database.CreateCommand(
scope.Connection, scope.Connection,
@@ -228,7 +229,7 @@ public sealed class PostgresWorkflowProjectionStore(
command.Parameters.Add("workflow_name", NpgsqlDbType.Text).Value = (object?)request.WorkflowName ?? DBNull.Value; command.Parameters.Add("workflow_name", NpgsqlDbType.Text).Value = (object?)request.WorkflowName ?? DBNull.Value;
command.Parameters.Add("workflow_version", NpgsqlDbType.Text).Value = (object?)request.WorkflowVersion ?? DBNull.Value; command.Parameters.Add("workflow_version", NpgsqlDbType.Text).Value = (object?)request.WorkflowVersion ?? DBNull.Value;
command.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value; command.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value;
command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)request.BusinessReferenceKey ?? DBNull.Value; command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)businessReferenceKey ?? DBNull.Value;
command.Parameters.Add("assignee", NpgsqlDbType.Text).Value = (object?)request.Assignee ?? DBNull.Value; command.Parameters.Add("assignee", NpgsqlDbType.Text).Value = (object?)request.Assignee ?? DBNull.Value;
command.Parameters.Add("status", NpgsqlDbType.Text).Value = (object?)request.Status ?? DBNull.Value; command.Parameters.Add("status", NpgsqlDbType.Text).Value = (object?)request.Status ?? DBNull.Value;
@@ -240,7 +241,7 @@ public sealed class PostgresWorkflowProjectionStore(
} }
var filtered = tasks var filtered = tasks
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray(); .ToArray();
if (request.CandidateRoles.Count == 0) if (request.CandidateRoles.Count == 0)
{ {
@@ -531,6 +532,7 @@ public sealed class PostgresWorkflowProjectionStore(
WorkflowInstancesGetRequest request, WorkflowInstancesGetRequest request,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey);
await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken); await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken);
await using var command = database.CreateCommand( await using var command = database.CreateCommand(
scope.Connection, scope.Connection,
@@ -552,7 +554,7 @@ public sealed class PostgresWorkflowProjectionStore(
"""); """);
command.Parameters.Add("workflow_name", NpgsqlDbType.Text).Value = (object?)request.WorkflowName ?? DBNull.Value; command.Parameters.Add("workflow_name", NpgsqlDbType.Text).Value = (object?)request.WorkflowName ?? DBNull.Value;
command.Parameters.Add("workflow_version", NpgsqlDbType.Text).Value = (object?)request.WorkflowVersion ?? DBNull.Value; command.Parameters.Add("workflow_version", NpgsqlDbType.Text).Value = (object?)request.WorkflowVersion ?? DBNull.Value;
command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)request.BusinessReferenceKey ?? DBNull.Value; command.Parameters.Add("business_reference_key", NpgsqlDbType.Text).Value = (object?)businessReferenceKey ?? DBNull.Value;
command.Parameters.Add("status", NpgsqlDbType.Text).Value = (object?)request.Status ?? DBNull.Value; command.Parameters.Add("status", NpgsqlDbType.Text).Value = (object?)request.Status ?? DBNull.Value;
var instances = new List<WorkflowInstanceSummary>(); var instances = new List<WorkflowInstanceSummary>();
@@ -574,7 +576,7 @@ public sealed class PostgresWorkflowProjectionStore(
} }
return instances return instances
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray(); .ToArray();
} }

View File

@@ -107,6 +107,7 @@ public sealed class WorkflowProjectionStore(
WorkflowTasksGetRequest request, WorkflowTasksGetRequest request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey);
var query = dbContext.WorkflowTasks.AsNoTracking().AsQueryable(); var query = dbContext.WorkflowTasks.AsNoTracking().AsQueryable();
if (!string.IsNullOrWhiteSpace(request.WorkflowName)) if (!string.IsNullOrWhiteSpace(request.WorkflowName))
@@ -124,9 +125,9 @@ public sealed class WorkflowProjectionStore(
query = query.Where(x => x.WorkflowInstanceId == request.WorkflowInstanceId); query = query.Where(x => x.WorkflowInstanceId == request.WorkflowInstanceId);
} }
if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) if (!string.IsNullOrWhiteSpace(businessReferenceKey))
{ {
query = query.Where(x => x.BusinessReferenceKey == request.BusinessReferenceKey); query = query.Where(x => x.BusinessReferenceKey == businessReferenceKey);
} }
if (!string.IsNullOrWhiteSpace(request.Assignee)) if (!string.IsNullOrWhiteSpace(request.Assignee))
@@ -145,7 +146,7 @@ public sealed class WorkflowProjectionStore(
var summaries = tasks var summaries = tasks
.Select(MapTaskSummary) .Select(MapTaskSummary)
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray(); .ToArray();
if (request.CandidateRoles.Count == 0) if (request.CandidateRoles.Count == 0)
@@ -478,6 +479,7 @@ public sealed class WorkflowProjectionStore(
WorkflowInstancesGetRequest request, WorkflowInstancesGetRequest request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
var businessReferenceKey = WorkflowBusinessReferenceExtensions.NormalizeBusinessReferenceKey(request.BusinessReferenceKey);
var query = dbContext.WorkflowInstances.AsNoTracking().AsQueryable(); var query = dbContext.WorkflowInstances.AsNoTracking().AsQueryable();
if (!string.IsNullOrWhiteSpace(request.WorkflowName)) if (!string.IsNullOrWhiteSpace(request.WorkflowName))
@@ -490,9 +492,9 @@ public sealed class WorkflowProjectionStore(
query = query.Where(x => x.WorkflowVersion == request.WorkflowVersion); query = query.Where(x => x.WorkflowVersion == request.WorkflowVersion);
} }
if (!string.IsNullOrWhiteSpace(request.BusinessReferenceKey)) if (!string.IsNullOrWhiteSpace(businessReferenceKey))
{ {
query = query.Where(x => x.BusinessReferenceKey == request.BusinessReferenceKey); query = query.Where(x => x.BusinessReferenceKey == businessReferenceKey);
} }
if (!string.IsNullOrWhiteSpace(request.Status)) if (!string.IsNullOrWhiteSpace(request.Status))
@@ -515,7 +517,7 @@ public sealed class WorkflowProjectionStore(
.ToArrayAsync(cancellationToken); .ToArrayAsync(cancellationToken);
return instances return instances
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(request.BusinessReferenceKey, request.BusinessReferenceParts)) .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray(); .ToArray();
} }