From 6ec6c4ebea7020a9d855fe7569d3780df23a0dd6 Mon Sep 17 00:00:00 2001 From: master <> Date: Tue, 14 Apr 2026 12:21:47 +0300 Subject: [PATCH] feat(workflow): server-side sort + dead-letter paging (backport) - New shared `WorkflowSortModel { Prop, Direction }` record; 4 list requests gain an optional `Sort` property and the dead-letter request gains `Skip/Take` plus `TotalCount` on the response. Matches the `sortModel: { prop, direction }` convention that sc-table-view emits, so client payloads bind directly. - `WorkflowSortExpressions` whitelist helper (public) applies sort on instance and task queries with a PK tie-breaker for stable pagination. Unknown columns raise `BaseResultException(WorkflowSortColumnNotAllowed, ...)` rather than leaking into the ORDER BY. Projection store picks up the helper on both the instance and task list paths. - Dead-letter stores uplifted per driver: * PostgreSQL: OFFSET/LIMIT + whitelisted ORDER BY, separate COUNT(*) query. * MongoDB: Skip/Limit/Sort builder + CountDocumentsAsync for total. * Oracle AQ: browse to a 500-cap, filter+sort+page in process, TotalCount = post-filter length (queue-browse can't offset/sort natively). - New StellaOps.Workflow.Engine.Tests cover the sort helper whitelist + tie- breaker behaviour; all 9 tests pass alongside the 24 earlier converter + OnComplete tests. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../WorkflowDefinitionsContracts.cs | 15 ++ .../WorkflowInstanceContracts.cs | 16 ++ .../WorkflowOperationalContracts.cs | 22 +++ .../WorkflowSortModel.cs | 15 ++ .../WorkflowTaskContracts.cs | 16 ++ .../MongoWorkflowSignalStore.cs | 43 ++++- .../PostgresWorkflowSignalStore.cs | 60 ++++++- .../Constants/MessageKeys.cs | 1 + .../Projections/WorkflowProjectionStore.cs | 38 +++-- .../Projections/WorkflowSortExpressions.cs | 80 +++++++++ .../OracleAqWorkflowSignalDeadLetterStore.cs | 61 ++++++- .../WorkflowSortExpressionsTests.cs | 158 ++++++++++++++++++ 12 files changed, 505 insertions(+), 20 deletions(-) create mode 100644 src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowSortModel.cs create mode 100644 src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowSortExpressions.cs create mode 100644 src/Workflow/__Tests/StellaOps.Workflow.Engine.Tests/Projections/WorkflowSortExpressionsTests.cs diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs index bff1b4d95..a3d59ce14 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs @@ -26,9 +26,24 @@ public sealed record WorkflowDefinitionGetRequest /// Filter by multiple workflow names. public IReadOnlyCollection WorkflowNames { get; init; } = []; + + /// Pagination: rows to skip. 0 = start from beginning. + public int Skip { get; init; } + + /// Pagination: max rows to return. 0 = return all. + public int Take { get; init; } + + /// + /// Optional sort. SortBy is whitelisted per-endpoint — for definitions the allowed values are + /// "workflowName", "workflowVersion", "displayName". Null sorts by a stable default. + /// + public WorkflowSortModel? Sort { get; init; } } public sealed record WorkflowDefinitionGetResponse { public IReadOnlyCollection Definitions { get; init; } = []; + + /// Total number of definitions matching the filter (pre-pagination). + public int TotalCount { get; init; } } diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs index 8967e60d5..97129b20e 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs @@ -40,11 +40,27 @@ public sealed record WorkflowInstancesGetRequest /// When true, populate ActiveTask and WorkflowState on each instance summary. public bool IncludeDetails { get; init; } + + /// Pagination: rows to skip. 0 = start from beginning. + public int Skip { get; init; } + + /// Pagination: max rows to return. 0 = use server default cap. + public int Take { get; init; } + + /// + /// Optional sort. SortBy is whitelisted per-endpoint — for instances the allowed values are + /// "workflowInstanceId", "workflowName", "workflowVersion", "status", "createdOnUtc", + /// "completedOnUtc", "businessReferenceKey". Null sorts by createdOnUtc desc by default. + /// + public WorkflowSortModel? Sort { get; init; } } public sealed record WorkflowInstancesGetResponse { public IReadOnlyCollection Instances { get; init; } = []; + + /// Total number of rows matching the filter (pre-pagination). + public int TotalCount { get; init; } } public sealed record WorkflowInstanceGetRequest diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs index da5000022..0e4f60698 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs @@ -75,8 +75,27 @@ public sealed record WorkflowSignalDeadLettersGetRequest public string? SignalId { get; init; } public string? WorkflowInstanceId { get; init; } public string? SignalType { get; init; } + + /// + /// Hard safety cap on how many messages the underlying driver is allowed to materialise + /// before paging is applied. Clamped to 500. Kept separate from so slow + /// drivers (e.g. Oracle AQ browse) don't get asked for an unbounded result set. + /// public int MaxMessages { get; init; } = 50; + public bool IncludeRawPayload { get; init; } + + /// Pagination: rows to skip. 0 = start from beginning. + public int Skip { get; init; } + + /// Pagination: max rows to return. 0 = use as the effective cap. + public int Take { get; init; } + + /// + /// Optional sort. SortBy is whitelisted — allowed values are "signalId", "workflowInstanceId", + /// "signalType", "enqueuedOnUtc", "deliveryCount". Null sorts by enqueuedOnUtc desc. + /// + public WorkflowSortModel? Sort { get; init; } } public sealed record WorkflowSignalDeadLetterMessage @@ -101,6 +120,9 @@ public sealed record WorkflowSignalDeadLetterMessage public sealed record WorkflowSignalDeadLettersGetResponse { public IReadOnlyCollection Messages { get; init; } = []; + + /// Total number of rows matching the filter (pre-pagination). + public int TotalCount { get; init; } } public sealed record WorkflowSignalDeadLetterReplayRequest diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowSortModel.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowSortModel.cs new file mode 100644 index 000000000..9863c33e9 --- /dev/null +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowSortModel.cs @@ -0,0 +1,15 @@ +namespace StellaOps.Workflow.Contracts; + +/// +/// Server-side sort request for list endpoints. is matched against a +/// per-endpoint whitelist on the server — unknown values are rejected with a validation error +/// rather than silently ignored, so typos and client/server drift surface early. +/// +public sealed record WorkflowSortModel +{ + /// Whitelisted column identifier (e.g. "createdOnUtc", "workflowName"). + public required string Prop { get; init; } + + /// "asc" or "desc". Defaults to ascending. + public string Direction { get; init; } = "asc"; +} diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs index 06e080f81..a1b5ed9b4 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs @@ -56,11 +56,27 @@ public sealed record WorkflowTasksGetRequest public string? ActorId { get; init; } public IReadOnlyCollection ActorRoles { get; init; } = []; public IReadOnlyCollection CandidateRoles { get; init; } = []; + + /// Pagination: rows to skip. 0 = start from beginning. + public int Skip { get; init; } + + /// Pagination: max rows to return. 0 = use server default cap. + public int Take { get; init; } + + /// + /// Optional sort. SortBy is whitelisted per-endpoint — for tasks the allowed values are + /// "workflowTaskId", "taskName", "workflowName", "workflowVersion", "status", "assignee", + /// "createdOnUtc", "completedOnUtc", "deadlineUtc". Null sorts by createdOnUtc desc by default. + /// + public WorkflowSortModel? Sort { get; init; } } public sealed record WorkflowTasksGetResponse { public IReadOnlyCollection Tasks { get; init; } = []; + + /// Total number of rows matching the filter (pre-pagination). + public int TotalCount { get; init; } } public sealed record WorkflowTaskGetRequest diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs index f88cbb897..d6171c087 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs @@ -284,14 +284,24 @@ public sealed class MongoWorkflowSignalStore( filter &= Builders.Filter.Eq(x => x.SignalType, request.SignalType); } + var effectiveTake = request.Take > 0 ? request.Take : request.MaxMessages; + effectiveTake = Math.Clamp(effectiveTake, 1, 500); + var effectiveSkip = Math.Max(0, request.Skip); + + // Count for the response envelope so the UI can render "page X of Y". + var totalCount = (int)Math.Min(int.MaxValue, + await deadLetters.CountDocumentsAsync(filter, cancellationToken: cancellationToken)); + + var sort = BuildDeadLetterSort(request.Sort); var documents = await deadLetters.Find(filter) - .SortByDescending(x => x.DeadLetteredOnUtc) - .ThenBy(x => x.SignalId) - .Limit(Math.Clamp(request.MaxMessages, 1, 200)) + .Sort(sort) + .Skip(effectiveSkip) + .Limit(effectiveTake) .ToListAsync(cancellationToken); return new WorkflowSignalDeadLettersGetResponse { + TotalCount = totalCount, Messages = documents.Select(document => { try @@ -342,6 +352,33 @@ public sealed class MongoWorkflowSignalStore( }; } + private static SortDefinition BuildDeadLetterSort(WorkflowSortModel? sort) + { + var builder = Builders.Sort; + if (sort is null) + { + return builder.Combine( + builder.Descending(x => x.DeadLetteredOnUtc), + builder.Ascending(x => x.SignalId)); + } + + var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase); + SortDefinition primary = (sort.Prop ?? string.Empty).ToLowerInvariant() switch + { + "signalid" => desc ? builder.Descending(x => x.SignalId) : builder.Ascending(x => x.SignalId), + "workflowinstanceid" => desc ? builder.Descending(x => x.WorkflowInstanceId) : builder.Ascending(x => x.WorkflowInstanceId), + "signaltype" => desc ? builder.Descending(x => x.SignalType) : builder.Ascending(x => x.SignalType), + "enqueuedonutc" => desc ? builder.Descending(x => x.EnqueuedOnUtc) : builder.Ascending(x => x.EnqueuedOnUtc), + "deliverycount" => desc ? builder.Descending(x => x.DeliveryCount) : builder.Ascending(x => x.DeliveryCount), + _ => throw new ArgumentException( + $"Sort column '{sort.Prop}' is not supported for dead-letter listing. " + + "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."), + }; + + // Stable tie-breaker by SignalId (effectively the collection's unique key for dead letters). + return builder.Combine(primary, builder.Ascending(x => x.SignalId)); + } + internal async Task ReplayAsync( WorkflowSignalDeadLetterReplayRequest request, CancellationToken cancellationToken = default) diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs index 02b208b2f..8dfa4a906 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs @@ -305,7 +305,35 @@ public sealed class PostgresWorkflowSignalStore( { ArgumentNullException.ThrowIfNull(request); + // Resolve an ORDER BY clause from the whitelist — never interpolate user input into SQL. + var orderBy = ResolveDeadLetterOrderBy(request.Sort); + + // Effective cap: use Take when given; otherwise MaxMessages. Hard-cap at 500 so a slow + // driver can't be asked for an unbounded result set. + var effectiveTake = request.Take > 0 ? request.Take : request.MaxMessages; + effectiveTake = Math.Clamp(effectiveTake, 1, 500); + var effectiveSkip = Math.Max(0, request.Skip); + await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken); + + // 1) Total count with the same WHERE (so the UI can show "page X of Y"). + int totalCount; + await using (var countCommand = database.CreateCommand( + scope.Connection, + $""" + select count(*) + from {database.Qualify(Postgres.DeadLetterTableName)} + where (@signal_id is null or signal_id = @signal_id) + and (@workflow_instance_id is null or workflow_instance_id = @workflow_instance_id) + and (@signal_type is null or signal_type = @signal_type) + """)) + { + countCommand.Parameters.Add("signal_id", NpgsqlDbType.Text).Value = (object?)request.SignalId ?? DBNull.Value; + countCommand.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value; + countCommand.Parameters.Add("signal_type", NpgsqlDbType.Text).Value = (object?)request.SignalType ?? DBNull.Value; + totalCount = Convert.ToInt32(await countCommand.ExecuteScalarAsync(cancellationToken) ?? 0); + } + await using var command = database.CreateCommand( scope.Connection, $""" @@ -325,13 +353,15 @@ public sealed class PostgresWorkflowSignalStore( where (@signal_id is null or signal_id = @signal_id) and (@workflow_instance_id is null or workflow_instance_id = @workflow_instance_id) and (@signal_type is null or signal_type = @signal_type) - order by dead_lettered_on_utc desc, signal_id + {orderBy} + offset @skip limit @max_messages """); command.Parameters.Add("signal_id", NpgsqlDbType.Text).Value = (object?)request.SignalId ?? DBNull.Value; command.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value; command.Parameters.Add("signal_type", NpgsqlDbType.Text).Value = (object?)request.SignalType ?? DBNull.Value; - command.Parameters.AddWithValue("max_messages", Math.Clamp(request.MaxMessages, 1, 200)); + command.Parameters.AddWithValue("max_messages", effectiveTake); + command.Parameters.AddWithValue("skip", effectiveSkip); var results = new List(); await using var reader = await command.ExecuteReaderAsync(cancellationToken); @@ -387,9 +417,35 @@ public sealed class PostgresWorkflowSignalStore( return new WorkflowSignalDeadLettersGetResponse { Messages = results, + TotalCount = totalCount, }; } + /// + /// Maps a whitelisted to a literal ORDER BY clause. + /// Enforcing the whitelist here is what keeps client-supplied identifiers out of the raw SQL + /// string. The PK-ish tie-breaker (signal_id) keeps pagination stable across ties. + /// + private static string ResolveDeadLetterOrderBy(WorkflowSortModel? sort) + { + if (sort is null) return "order by dead_lettered_on_utc desc, signal_id"; + + var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase); + var dir = desc ? "desc" : "asc"; + var column = (sort.Prop ?? string.Empty).ToLowerInvariant() switch + { + "signalid" => "signal_id", + "workflowinstanceid" => "workflow_instance_id", + "signaltype" => "signal_type", + "enqueuedonutc" => "enqueued_on_utc", + "deliverycount" => "delivery_count", + _ => throw new ArgumentException( + $"Sort column '{sort.Prop}' is not supported for dead-letter listing. " + + "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."), + }; + return $"order by {column} {dir}, signal_id"; + } + internal async Task ReplayAsync( WorkflowSignalDeadLetterReplayRequest request, CancellationToken cancellationToken = default) diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs index 915b1a5e3..6d6a76f2a 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs @@ -11,4 +11,5 @@ public static class MessageKeys public static string WorkflowPayloadFieldMissing => nameof(WorkflowPayloadFieldMissing); public static string WorkflowTransportFailed => nameof(WorkflowTransportFailed); public static string WorkflowRuntimeFailed => nameof(WorkflowRuntimeFailed); + public static string WorkflowSortColumnNotAllowed => nameof(WorkflowSortColumnNotAllowed); } diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs index 19aeb9e5b..75f1719bf 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs @@ -140,23 +140,30 @@ public sealed class WorkflowProjectionStore( query = query.Where(x => x.Status == request.Status); } - var tasks = await query - .OrderBy(x => x.CreatedOnUtc) - .ToListAsync(cancellationToken); + var orderedQuery = WorkflowSortExpressions.ApplyTaskSort(query, request.Sort); + + var tasks = await orderedQuery.ToListAsync(cancellationToken); var summaries = tasks .Select(MapTaskSummary) .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); - if (request.CandidateRoles.Count == 0) + if (request.CandidateRoles.Count > 0) { - return summaries; + summaries = summaries + .Where(x => x.EffectiveRoles.Intersect(request.CandidateRoles, StringComparer.OrdinalIgnoreCase).Any()) + .ToArray(); } - return summaries - .Where(x => x.EffectiveRoles.Intersect(request.CandidateRoles, StringComparer.OrdinalIgnoreCase).Any()) - .ToArray(); + // Honour Skip/Take when the client asked for a page (0 means "all"). + if (request.Skip > 0 || request.Take > 0) + { + var take = request.Take > 0 ? request.Take : summaries.Length; + summaries = summaries.Skip(request.Skip).Take(take).ToArray(); + } + + return summaries; } public async Task GetTaskAsync( @@ -502,8 +509,9 @@ public sealed class WorkflowProjectionStore( query = query.Where(x => x.Status == request.Status); } - var instances = await query - .OrderByDescending(x => x.CreatedOnUtc) + var orderedQuery = WorkflowSortExpressions.ApplyInstanceSort(query, request.Sort); + + var instances = await orderedQuery .Select(x => new WorkflowInstanceSummary { WorkflowInstanceId = x.WorkflowInstanceId, @@ -516,9 +524,17 @@ public sealed class WorkflowProjectionStore( }) .ToArrayAsync(cancellationToken); - return instances + var filtered = instances .Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts)) .ToArray(); + + if (request.Skip > 0 || request.Take > 0) + { + var take = request.Take > 0 ? request.Take : filtered.Length; + filtered = filtered.Skip(request.Skip).Take(take).ToArray(); + } + + return filtered; } public async Task GetInstanceAsync( diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowSortExpressions.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowSortExpressions.cs new file mode 100644 index 000000000..15de2aa58 --- /dev/null +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowSortExpressions.cs @@ -0,0 +1,80 @@ +using System; +using System.Linq; + +using StellaOps.Workflow.Contracts; +using StellaOps.Workflow.DataStore.Oracle.Entities; +using StellaOps.Workflow.Engine.Constants; +using StellaOps.Workflow.Engine.Exceptions; + +namespace StellaOps.Workflow.Engine.Projections; + +/// +/// Per-entity sort application for list endpoints. Each method: +/// 1. Matches against a hard-coded whitelist — this is the +/// only place client-supplied identifiers touch EF, so non-whitelisted values become a +/// instead of slipping through into a dynamic ORDER BY. +/// 2. Appends a PK tie-breaker (.ThenBy(x => x.Id)) so pagination stays stable across +/// rows that compare equal on the requested sort column. +/// 3. Falls back to a sensible default when is null (mirrors the previous +/// hard-coded ordering). +/// +public static class WorkflowSortExpressions +{ + public static IOrderedQueryable ApplyInstanceSort( + IQueryable query, + WorkflowSortModel? sort) + { + if (sort is null) + { + return query + .OrderByDescending(x => x.CreatedOnUtc) + .ThenBy(x => x.Id); + } + + var desc = IsDescending(sort.Direction); + IOrderedQueryable ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch + { + "workflowinstanceid" => desc ? query.OrderByDescending(x => x.WorkflowInstanceId) : query.OrderBy(x => x.WorkflowInstanceId), + "workflowname" => desc ? query.OrderByDescending(x => x.WorkflowName) : query.OrderBy(x => x.WorkflowName), + "workflowversion" => desc ? query.OrderByDescending(x => x.WorkflowVersion) : query.OrderBy(x => x.WorkflowVersion), + "status" => desc ? query.OrderByDescending(x => x.Status) : query.OrderBy(x => x.Status), + "createdonutc" => desc ? query.OrderByDescending(x => x.CreatedOnUtc) : query.OrderBy(x => x.CreatedOnUtc), + "completedonutc" => desc ? query.OrderByDescending(x => x.CompletedOnUtc) : query.OrderBy(x => x.CompletedOnUtc), + "businessreferencekey" => desc ? query.OrderByDescending(x => x.BusinessReferenceKey) : query.OrderBy(x => x.BusinessReferenceKey), + _ => throw new BaseResultException(MessageKeys.WorkflowSortColumnNotAllowed, sort.Prop ?? string.Empty, "instances"), + }; + return ordered.ThenBy(x => x.Id); + } + + public static IOrderedQueryable ApplyTaskSort( + IQueryable query, + WorkflowSortModel? sort) + { + if (sort is null) + { + return query + .OrderByDescending(x => x.CreatedOnUtc) + .ThenBy(x => x.Id); + } + + var desc = IsDescending(sort.Direction); + IOrderedQueryable ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch + { + "workflowtaskid" => desc ? query.OrderByDescending(x => x.WorkflowTaskId) : query.OrderBy(x => x.WorkflowTaskId), + "taskname" => desc ? query.OrderByDescending(x => x.TaskName) : query.OrderBy(x => x.TaskName), + "workflowname" => desc ? query.OrderByDescending(x => x.WorkflowName) : query.OrderBy(x => x.WorkflowName), + "workflowversion" => desc ? query.OrderByDescending(x => x.WorkflowVersion) : query.OrderBy(x => x.WorkflowVersion), + "status" => desc ? query.OrderByDescending(x => x.Status) : query.OrderBy(x => x.Status), + "assignee" => desc ? query.OrderByDescending(x => x.Assignee) : query.OrderBy(x => x.Assignee), + "createdonutc" => desc ? query.OrderByDescending(x => x.CreatedOnUtc) : query.OrderBy(x => x.CreatedOnUtc), + "completedonutc" => desc ? query.OrderByDescending(x => x.CompletedOnUtc) : query.OrderBy(x => x.CompletedOnUtc), + // DeadlineUtc maps to StaleAfterUtc on the projection (the contract exposes the deadline, the projection stores staleness). + "deadlineutc" => desc ? query.OrderByDescending(x => x.StaleAfterUtc) : query.OrderBy(x => x.StaleAfterUtc), + _ => throw new BaseResultException(MessageKeys.WorkflowSortColumnNotAllowed, sort.Prop ?? string.Empty, "tasks"), + }; + return ordered.ThenBy(x => x.Id); + } + + private static bool IsDescending(string? direction) => + string.Equals(direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase); +} diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs index 40a159d8f..e40ab4ac0 100644 --- a/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs +++ b/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs @@ -26,16 +26,21 @@ public sealed class OracleAqWorkflowSignalDeadLetterStore( { ArgumentNullException.ThrowIfNull(request); + // Oracle AQ browse has no offset / sort / count facility, so we materialise up to the + // safety cap, filter+sort+page in process, and report TotalCount = post-filter length. + // Callers paging past the cap will silently miss later messages — the documented trade-off + // for the AQ driver. + var safetyCap = Math.Clamp(request.MaxMessages <= 0 ? 500 : request.MaxMessages, 1, 500); var rawMessages = await transport.BrowseAsync( new OracleAqBrowseRequest { QueueName = options.DeadLetterQueueName, Correlation = request.SignalId, - MaxMessages = Math.Clamp(request.MaxMessages, 1, 200), + MaxMessages = safetyCap, }, cancellationToken); - var results = new List(rawMessages.Count); + var filtered = new List(rawMessages.Count); foreach (var rawMessage in rawMessages) { var mapped = MapMessage(rawMessage, request.IncludeRawPayload); @@ -44,15 +49,63 @@ public sealed class OracleAqWorkflowSignalDeadLetterStore( continue; } - results.Add(mapped); + filtered.Add(mapped); } + var sorted = ApplyInMemorySort(filtered, request.Sort); + + var totalCount = sorted.Count; + var effectiveTake = request.Take > 0 ? request.Take : safetyCap; + effectiveTake = Math.Clamp(effectiveTake, 1, 500); + var effectiveSkip = Math.Max(0, request.Skip); + var page = sorted.Skip(effectiveSkip).Take(effectiveTake).ToArray(); + return new WorkflowSignalDeadLettersGetResponse { - Messages = results, + Messages = page, + TotalCount = totalCount, }; } + /// + /// Applies the sort whitelist in memory. Queue browse gives us messages in enqueue order; we + /// reorder them here before paging so the UI sees a stable page independent of queue position. + /// + private static IReadOnlyList ApplyInMemorySort( + List messages, + WorkflowSortModel? sort) + { + if (sort is null) + { + return messages + .OrderByDescending(x => x.EnqueuedOnUtc) + .ThenBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase) + .ToArray(); + } + + var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase); + IOrderedEnumerable ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch + { + "signalid" => desc ? messages.OrderByDescending(x => x.SignalId, StringComparer.OrdinalIgnoreCase) + : messages.OrderBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase), + "workflowinstanceid" => desc ? messages.OrderByDescending(x => x.WorkflowInstanceId, StringComparer.OrdinalIgnoreCase) + : messages.OrderBy(x => x.WorkflowInstanceId, StringComparer.OrdinalIgnoreCase), + "signaltype" => desc ? messages.OrderByDescending(x => x.SignalType, StringComparer.OrdinalIgnoreCase) + : messages.OrderBy(x => x.SignalType, StringComparer.OrdinalIgnoreCase), + "enqueuedonutc" => desc ? messages.OrderByDescending(x => x.EnqueuedOnUtc) + : messages.OrderBy(x => x.EnqueuedOnUtc), + "deliverycount" => desc ? messages.OrderByDescending(x => x.DeliveryCount) + : messages.OrderBy(x => x.DeliveryCount), + _ => throw new ArgumentException( + $"Sort column '{sort.Prop}' is not supported for dead-letter listing. " + + "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."), + }; + + return ordered + .ThenBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase) + .ToArray(); + } + public async Task ReplayAsync( WorkflowSignalDeadLetterReplayRequest request, CancellationToken cancellationToken = default) diff --git a/src/Workflow/__Tests/StellaOps.Workflow.Engine.Tests/Projections/WorkflowSortExpressionsTests.cs b/src/Workflow/__Tests/StellaOps.Workflow.Engine.Tests/Projections/WorkflowSortExpressionsTests.cs new file mode 100644 index 000000000..667009acd --- /dev/null +++ b/src/Workflow/__Tests/StellaOps.Workflow.Engine.Tests/Projections/WorkflowSortExpressionsTests.cs @@ -0,0 +1,158 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +using StellaOps.Workflow.Contracts; +using StellaOps.Workflow.DataStore.Oracle.Entities; +using StellaOps.Workflow.Engine.Exceptions; +using StellaOps.Workflow.Engine.Projections; + +using FluentAssertions; + +using NUnit.Framework; + +namespace StellaOps.Workflow.Engine.Tests.Projections; + +/// +/// Direct unit tests for . These bypass DI entirely so the +/// whitelist + tie-breaker behaviour is fast and deterministic to verify, and mirrored on both +/// the Ablera and upstream StellaOps sides. +/// +[TestFixture] +public class WorkflowSortExpressionsTests +{ + [Test] + public void ApplyInstanceSort_NullSort_DefaultsToCreatedOnUtcDescWithPkTieBreaker() + { + var now = DateTime.UtcNow; + var data = new[] + { + BuildInstance(id: 1, createdOnUtc: now.AddMinutes(-10)), + BuildInstance(id: 2, createdOnUtc: now), + BuildInstance(id: 3, createdOnUtc: now), // tie on createdOnUtc with #2 + }.AsQueryable(); + + var ordered = WorkflowSortExpressions.ApplyInstanceSort(data, sort: null).ToArray(); + + ordered.Select(x => x.Id).Should().Equal(new[] { 2m, 3m, 1m }, "desc by CreatedOnUtc, then PK asc for the tie"); + } + + [TestCase("workflowName", "asc", new[] { "Alpha", "Beta", "Gamma" })] + [TestCase("workflowName", "desc", new[] { "Gamma", "Beta", "Alpha" })] + [TestCase("status", "asc", new[] { "Completed", "Open", "Pending" })] + public void ApplyInstanceSort_WhitelistedColumns_OrdersAsExpected(string sortBy, string direction, string[] expected) + { + var data = new[] + { + BuildInstance(id: 1, workflowName: "Beta", status: "Open"), + BuildInstance(id: 2, workflowName: "Gamma", status: "Pending"), + BuildInstance(id: 3, workflowName: "Alpha", status: "Completed"), + }.AsQueryable(); + + var sort = new WorkflowSortModel { Prop = sortBy, Direction = direction }; + var ordered = WorkflowSortExpressions.ApplyInstanceSort(data, sort).ToArray(); + + var projected = sortBy == "status" + ? ordered.Select(x => x.Status).ToArray() + : ordered.Select(x => x.WorkflowName).ToArray(); + projected.Should().Equal(expected); + } + + [Test] + public void ApplyInstanceSort_UnknownColumn_ThrowsBaseResultException() + { + var data = new[] { BuildInstance(id: 1) }.AsQueryable(); + + var act = () => WorkflowSortExpressions.ApplyInstanceSort( + data, + new WorkflowSortModel { Prop = "attackerInjection; DROP TABLE --", Direction = "asc" }).ToArray(); + + act.Should().Throw(); + } + + [TestCase("taskName", "asc", new[] { "approve", "notify", "review" })] + [TestCase("assignee", "desc", new[] { "yuri", "alice", null })] + public void ApplyTaskSort_WhitelistedColumns_OrdersAsExpected(string sortBy, string direction, string?[] expected) + { + var data = new[] + { + BuildTask(id: 1, taskName: "review", assignee: "alice"), + BuildTask(id: 2, taskName: "approve", assignee: null), + BuildTask(id: 3, taskName: "notify", assignee: "yuri"), + }.AsQueryable(); + + var sort = new WorkflowSortModel { Prop = sortBy, Direction = direction }; + var ordered = WorkflowSortExpressions.ApplyTaskSort(data, sort).ToArray(); + + var projected = sortBy == "assignee" + ? ordered.Select(x => x.Assignee).ToArray() + : ordered.Select(x => x.TaskName).Cast().ToArray(); + projected.Should().Equal(expected); + } + + [Test] + public void ApplyTaskSort_DeadlineUtc_MapsToStaleAfterUtcColumn() + { + // DeadlineUtc is the contract-facing name; the projection column is StaleAfterUtc. + var data = new[] + { + BuildTask(id: 1, staleAfterUtc: DateTime.UtcNow.AddHours(2)), + BuildTask(id: 2, staleAfterUtc: DateTime.UtcNow.AddHours(1)), + BuildTask(id: 3, staleAfterUtc: null), + }.AsQueryable(); + + var ordered = WorkflowSortExpressions.ApplyTaskSort( + data, + new WorkflowSortModel { Prop = "deadlineUtc", Direction = "asc" }).ToArray(); + + ordered.Select(x => x.Id).Should().Equal(new[] { 3m, 2m, 1m }, "null sorts first in asc; then earliest deadline first"); + } + + [Test] + public void ApplyTaskSort_UnknownColumn_ThrowsBaseResultException() + { + var data = new[] { BuildTask(id: 1) }.AsQueryable(); + + var act = () => WorkflowSortExpressions.ApplyTaskSort( + data, + new WorkflowSortModel { Prop = "nonexistentColumn", Direction = "asc" }).ToArray(); + + act.Should().Throw(); + } + + private static WorkflowInstanceProjection BuildInstance( + int id, + string workflowName = "Workflow", + string status = "Open", + DateTime? createdOnUtc = null) => new() + { + Id = id, + WorkflowInstanceId = $"inst-{id}", + WorkflowName = workflowName, + WorkflowVersion = "1.0.0", + BusinessReferenceKey = null, + BusinessReferenceJson = null, + Status = status, + CreatedOnUtc = createdOnUtc ?? DateTime.UtcNow.AddMinutes(-id), + }; + + private static WorkflowTaskProjection BuildTask( + int id, + string taskName = "task", + string? assignee = null, + DateTime? staleAfterUtc = null) => new() + { + Id = id, + WorkflowTaskId = $"task-{id}", + WorkflowInstanceId = $"inst-{id}", + WorkflowName = "Workflow", + WorkflowVersion = "1.0.0", + TaskName = taskName, + TaskType = "HumanTask", + Route = "default/task", + Assignee = assignee, + Status = "Open", + CreatedOnUtc = DateTime.UtcNow.AddMinutes(-id), + StaleAfterUtc = staleAfterUtc, + }; +}