feat(workflow): server-side sort + dead-letter paging (backport)
- New shared `WorkflowSortModel { Prop, Direction }` record; 4 list requests
gain an optional `Sort` property and the dead-letter request gains `Skip/Take`
plus `TotalCount` on the response. Matches the `sortModel: { prop, direction }`
convention that sc-table-view emits, so client payloads bind directly.
- `WorkflowSortExpressions` whitelist helper (public) applies sort on instance
and task queries with a PK tie-breaker for stable pagination. Unknown columns
raise `BaseResultException(WorkflowSortColumnNotAllowed, ...)` rather than
leaking into the ORDER BY. Projection store picks up the helper on both the
instance and task list paths.
- Dead-letter stores uplifted per driver:
* PostgreSQL: OFFSET/LIMIT + whitelisted ORDER BY, separate COUNT(*) query.
* MongoDB: Skip/Limit/Sort builder + CountDocumentsAsync for total.
* Oracle AQ: browse to a 500-cap, filter+sort+page in process, TotalCount =
post-filter length (queue-browse can't offset/sort natively).
- New StellaOps.Workflow.Engine.Tests cover the sort helper whitelist + tie-
breaker behaviour; all 9 tests pass alongside the 24 earlier converter +
OnComplete tests.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -26,9 +26,24 @@ public sealed record WorkflowDefinitionGetRequest
|
||||
|
||||
/// <summary>Filter by multiple workflow names.</summary>
|
||||
public IReadOnlyCollection<string> WorkflowNames { get; init; } = [];
|
||||
|
||||
/// <summary>Pagination: rows to skip. 0 = start from beginning.</summary>
|
||||
public int Skip { get; init; }
|
||||
|
||||
/// <summary>Pagination: max rows to return. 0 = return all.</summary>
|
||||
public int Take { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional sort. SortBy is whitelisted per-endpoint — for definitions the allowed values are
|
||||
/// "workflowName", "workflowVersion", "displayName". Null sorts by a stable default.
|
||||
/// </summary>
|
||||
public WorkflowSortModel? Sort { get; init; }
|
||||
}
|
||||
|
||||
public sealed record WorkflowDefinitionGetResponse
|
||||
{
|
||||
public IReadOnlyCollection<WorkflowDefinitionDescriptor> Definitions { get; init; } = [];
|
||||
|
||||
/// <summary>Total number of definitions matching the filter (pre-pagination).</summary>
|
||||
public int TotalCount { get; init; }
|
||||
}
|
||||
|
||||
@@ -40,11 +40,27 @@ public sealed record WorkflowInstancesGetRequest
|
||||
|
||||
/// <summary>When true, populate ActiveTask and WorkflowState on each instance summary.</summary>
|
||||
public bool IncludeDetails { get; init; }
|
||||
|
||||
/// <summary>Pagination: rows to skip. 0 = start from beginning.</summary>
|
||||
public int Skip { get; init; }
|
||||
|
||||
/// <summary>Pagination: max rows to return. 0 = use server default cap.</summary>
|
||||
public int Take { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional sort. SortBy is whitelisted per-endpoint — for instances the allowed values are
|
||||
/// "workflowInstanceId", "workflowName", "workflowVersion", "status", "createdOnUtc",
|
||||
/// "completedOnUtc", "businessReferenceKey". Null sorts by createdOnUtc desc by default.
|
||||
/// </summary>
|
||||
public WorkflowSortModel? Sort { get; init; }
|
||||
}
|
||||
|
||||
public sealed record WorkflowInstancesGetResponse
|
||||
{
|
||||
public IReadOnlyCollection<WorkflowInstanceSummary> Instances { get; init; } = [];
|
||||
|
||||
/// <summary>Total number of rows matching the filter (pre-pagination).</summary>
|
||||
public int TotalCount { get; init; }
|
||||
}
|
||||
|
||||
public sealed record WorkflowInstanceGetRequest
|
||||
|
||||
@@ -75,8 +75,27 @@ public sealed record WorkflowSignalDeadLettersGetRequest
|
||||
public string? SignalId { get; init; }
|
||||
public string? WorkflowInstanceId { get; init; }
|
||||
public string? SignalType { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Hard safety cap on how many messages the underlying driver is allowed to materialise
|
||||
/// before paging is applied. Clamped to 500. Kept separate from <see cref="Take"/> so slow
|
||||
/// drivers (e.g. Oracle AQ browse) don't get asked for an unbounded result set.
|
||||
/// </summary>
|
||||
public int MaxMessages { get; init; } = 50;
|
||||
|
||||
public bool IncludeRawPayload { get; init; }
|
||||
|
||||
/// <summary>Pagination: rows to skip. 0 = start from beginning.</summary>
|
||||
public int Skip { get; init; }
|
||||
|
||||
/// <summary>Pagination: max rows to return. 0 = use <see cref="MaxMessages"/> as the effective cap.</summary>
|
||||
public int Take { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional sort. SortBy is whitelisted — allowed values are "signalId", "workflowInstanceId",
|
||||
/// "signalType", "enqueuedOnUtc", "deliveryCount". Null sorts by enqueuedOnUtc desc.
|
||||
/// </summary>
|
||||
public WorkflowSortModel? Sort { get; init; }
|
||||
}
|
||||
|
||||
public sealed record WorkflowSignalDeadLetterMessage
|
||||
@@ -101,6 +120,9 @@ public sealed record WorkflowSignalDeadLetterMessage
|
||||
public sealed record WorkflowSignalDeadLettersGetResponse
|
||||
{
|
||||
public IReadOnlyCollection<WorkflowSignalDeadLetterMessage> Messages { get; init; } = [];
|
||||
|
||||
/// <summary>Total number of rows matching the filter (pre-pagination).</summary>
|
||||
public int TotalCount { get; init; }
|
||||
}
|
||||
|
||||
public sealed record WorkflowSignalDeadLetterReplayRequest
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
namespace StellaOps.Workflow.Contracts;
|
||||
|
||||
/// <summary>
|
||||
/// Server-side sort request for list endpoints. <see cref="SortBy"/> is matched against a
|
||||
/// per-endpoint whitelist on the server — unknown values are rejected with a validation error
|
||||
/// rather than silently ignored, so typos and client/server drift surface early.
|
||||
/// </summary>
|
||||
public sealed record WorkflowSortModel
|
||||
{
|
||||
/// <summary>Whitelisted column identifier (e.g. "createdOnUtc", "workflowName").</summary>
|
||||
public required string Prop { get; init; }
|
||||
|
||||
/// <summary>"asc" or "desc". Defaults to ascending.</summary>
|
||||
public string Direction { get; init; } = "asc";
|
||||
}
|
||||
@@ -56,11 +56,27 @@ public sealed record WorkflowTasksGetRequest
|
||||
public string? ActorId { get; init; }
|
||||
public IReadOnlyCollection<string> ActorRoles { get; init; } = [];
|
||||
public IReadOnlyCollection<string> CandidateRoles { get; init; } = [];
|
||||
|
||||
/// <summary>Pagination: rows to skip. 0 = start from beginning.</summary>
|
||||
public int Skip { get; init; }
|
||||
|
||||
/// <summary>Pagination: max rows to return. 0 = use server default cap.</summary>
|
||||
public int Take { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional sort. SortBy is whitelisted per-endpoint — for tasks the allowed values are
|
||||
/// "workflowTaskId", "taskName", "workflowName", "workflowVersion", "status", "assignee",
|
||||
/// "createdOnUtc", "completedOnUtc", "deadlineUtc". Null sorts by createdOnUtc desc by default.
|
||||
/// </summary>
|
||||
public WorkflowSortModel? Sort { get; init; }
|
||||
}
|
||||
|
||||
public sealed record WorkflowTasksGetResponse
|
||||
{
|
||||
public IReadOnlyCollection<WorkflowTaskSummary> Tasks { get; init; } = [];
|
||||
|
||||
/// <summary>Total number of rows matching the filter (pre-pagination).</summary>
|
||||
public int TotalCount { get; init; }
|
||||
}
|
||||
|
||||
public sealed record WorkflowTaskGetRequest
|
||||
|
||||
@@ -284,14 +284,24 @@ public sealed class MongoWorkflowSignalStore(
|
||||
filter &= Builders<WorkflowSignalDocument>.Filter.Eq(x => x.SignalType, request.SignalType);
|
||||
}
|
||||
|
||||
var effectiveTake = request.Take > 0 ? request.Take : request.MaxMessages;
|
||||
effectiveTake = Math.Clamp(effectiveTake, 1, 500);
|
||||
var effectiveSkip = Math.Max(0, request.Skip);
|
||||
|
||||
// Count for the response envelope so the UI can render "page X of Y".
|
||||
var totalCount = (int)Math.Min(int.MaxValue,
|
||||
await deadLetters.CountDocumentsAsync(filter, cancellationToken: cancellationToken));
|
||||
|
||||
var sort = BuildDeadLetterSort(request.Sort);
|
||||
var documents = await deadLetters.Find(filter)
|
||||
.SortByDescending(x => x.DeadLetteredOnUtc)
|
||||
.ThenBy(x => x.SignalId)
|
||||
.Limit(Math.Clamp(request.MaxMessages, 1, 200))
|
||||
.Sort(sort)
|
||||
.Skip(effectiveSkip)
|
||||
.Limit(effectiveTake)
|
||||
.ToListAsync(cancellationToken);
|
||||
|
||||
return new WorkflowSignalDeadLettersGetResponse
|
||||
{
|
||||
TotalCount = totalCount,
|
||||
Messages = documents.Select(document =>
|
||||
{
|
||||
try
|
||||
@@ -342,6 +352,33 @@ public sealed class MongoWorkflowSignalStore(
|
||||
};
|
||||
}
|
||||
|
||||
private static SortDefinition<WorkflowSignalDocument> BuildDeadLetterSort(WorkflowSortModel? sort)
|
||||
{
|
||||
var builder = Builders<WorkflowSignalDocument>.Sort;
|
||||
if (sort is null)
|
||||
{
|
||||
return builder.Combine(
|
||||
builder.Descending(x => x.DeadLetteredOnUtc),
|
||||
builder.Ascending(x => x.SignalId));
|
||||
}
|
||||
|
||||
var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
|
||||
SortDefinition<WorkflowSignalDocument> primary = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
|
||||
{
|
||||
"signalid" => desc ? builder.Descending(x => x.SignalId) : builder.Ascending(x => x.SignalId),
|
||||
"workflowinstanceid" => desc ? builder.Descending(x => x.WorkflowInstanceId) : builder.Ascending(x => x.WorkflowInstanceId),
|
||||
"signaltype" => desc ? builder.Descending(x => x.SignalType) : builder.Ascending(x => x.SignalType),
|
||||
"enqueuedonutc" => desc ? builder.Descending(x => x.EnqueuedOnUtc) : builder.Ascending(x => x.EnqueuedOnUtc),
|
||||
"deliverycount" => desc ? builder.Descending(x => x.DeliveryCount) : builder.Ascending(x => x.DeliveryCount),
|
||||
_ => throw new ArgumentException(
|
||||
$"Sort column '{sort.Prop}' is not supported for dead-letter listing. "
|
||||
+ "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."),
|
||||
};
|
||||
|
||||
// Stable tie-breaker by SignalId (effectively the collection's unique key for dead letters).
|
||||
return builder.Combine(primary, builder.Ascending(x => x.SignalId));
|
||||
}
|
||||
|
||||
internal async Task<WorkflowSignalDeadLetterReplayResponse> ReplayAsync(
|
||||
WorkflowSignalDeadLetterReplayRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
|
||||
@@ -305,7 +305,35 @@ public sealed class PostgresWorkflowSignalStore(
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
// Resolve an ORDER BY clause from the whitelist — never interpolate user input into SQL.
|
||||
var orderBy = ResolveDeadLetterOrderBy(request.Sort);
|
||||
|
||||
// Effective cap: use Take when given; otherwise MaxMessages. Hard-cap at 500 so a slow
|
||||
// driver can't be asked for an unbounded result set.
|
||||
var effectiveTake = request.Take > 0 ? request.Take : request.MaxMessages;
|
||||
effectiveTake = Math.Clamp(effectiveTake, 1, 500);
|
||||
var effectiveSkip = Math.Max(0, request.Skip);
|
||||
|
||||
await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken);
|
||||
|
||||
// 1) Total count with the same WHERE (so the UI can show "page X of Y").
|
||||
int totalCount;
|
||||
await using (var countCommand = database.CreateCommand(
|
||||
scope.Connection,
|
||||
$"""
|
||||
select count(*)
|
||||
from {database.Qualify(Postgres.DeadLetterTableName)}
|
||||
where (@signal_id is null or signal_id = @signal_id)
|
||||
and (@workflow_instance_id is null or workflow_instance_id = @workflow_instance_id)
|
||||
and (@signal_type is null or signal_type = @signal_type)
|
||||
"""))
|
||||
{
|
||||
countCommand.Parameters.Add("signal_id", NpgsqlDbType.Text).Value = (object?)request.SignalId ?? DBNull.Value;
|
||||
countCommand.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value;
|
||||
countCommand.Parameters.Add("signal_type", NpgsqlDbType.Text).Value = (object?)request.SignalType ?? DBNull.Value;
|
||||
totalCount = Convert.ToInt32(await countCommand.ExecuteScalarAsync(cancellationToken) ?? 0);
|
||||
}
|
||||
|
||||
await using var command = database.CreateCommand(
|
||||
scope.Connection,
|
||||
$"""
|
||||
@@ -325,13 +353,15 @@ public sealed class PostgresWorkflowSignalStore(
|
||||
where (@signal_id is null or signal_id = @signal_id)
|
||||
and (@workflow_instance_id is null or workflow_instance_id = @workflow_instance_id)
|
||||
and (@signal_type is null or signal_type = @signal_type)
|
||||
order by dead_lettered_on_utc desc, signal_id
|
||||
{orderBy}
|
||||
offset @skip
|
||||
limit @max_messages
|
||||
""");
|
||||
command.Parameters.Add("signal_id", NpgsqlDbType.Text).Value = (object?)request.SignalId ?? DBNull.Value;
|
||||
command.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value;
|
||||
command.Parameters.Add("signal_type", NpgsqlDbType.Text).Value = (object?)request.SignalType ?? DBNull.Value;
|
||||
command.Parameters.AddWithValue("max_messages", Math.Clamp(request.MaxMessages, 1, 200));
|
||||
command.Parameters.AddWithValue("max_messages", effectiveTake);
|
||||
command.Parameters.AddWithValue("skip", effectiveSkip);
|
||||
|
||||
var results = new List<WorkflowSignalDeadLetterMessage>();
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
||||
@@ -387,9 +417,35 @@ public sealed class PostgresWorkflowSignalStore(
|
||||
return new WorkflowSignalDeadLettersGetResponse
|
||||
{
|
||||
Messages = results,
|
||||
TotalCount = totalCount,
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Maps a whitelisted <see cref="WorkflowSortModel.SortBy"/> to a literal ORDER BY clause.
|
||||
/// Enforcing the whitelist here is what keeps client-supplied identifiers out of the raw SQL
|
||||
/// string. The PK-ish tie-breaker (signal_id) keeps pagination stable across ties.
|
||||
/// </summary>
|
||||
private static string ResolveDeadLetterOrderBy(WorkflowSortModel? sort)
|
||||
{
|
||||
if (sort is null) return "order by dead_lettered_on_utc desc, signal_id";
|
||||
|
||||
var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
|
||||
var dir = desc ? "desc" : "asc";
|
||||
var column = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
|
||||
{
|
||||
"signalid" => "signal_id",
|
||||
"workflowinstanceid" => "workflow_instance_id",
|
||||
"signaltype" => "signal_type",
|
||||
"enqueuedonutc" => "enqueued_on_utc",
|
||||
"deliverycount" => "delivery_count",
|
||||
_ => throw new ArgumentException(
|
||||
$"Sort column '{sort.Prop}' is not supported for dead-letter listing. "
|
||||
+ "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."),
|
||||
};
|
||||
return $"order by {column} {dir}, signal_id";
|
||||
}
|
||||
|
||||
internal async Task<WorkflowSignalDeadLetterReplayResponse> ReplayAsync(
|
||||
WorkflowSignalDeadLetterReplayRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
|
||||
@@ -11,4 +11,5 @@ public static class MessageKeys
|
||||
public static string WorkflowPayloadFieldMissing => nameof(WorkflowPayloadFieldMissing);
|
||||
public static string WorkflowTransportFailed => nameof(WorkflowTransportFailed);
|
||||
public static string WorkflowRuntimeFailed => nameof(WorkflowRuntimeFailed);
|
||||
public static string WorkflowSortColumnNotAllowed => nameof(WorkflowSortColumnNotAllowed);
|
||||
}
|
||||
|
||||
@@ -140,23 +140,30 @@ public sealed class WorkflowProjectionStore(
|
||||
query = query.Where(x => x.Status == request.Status);
|
||||
}
|
||||
|
||||
var tasks = await query
|
||||
.OrderBy(x => x.CreatedOnUtc)
|
||||
.ToListAsync(cancellationToken);
|
||||
var orderedQuery = WorkflowSortExpressions.ApplyTaskSort(query, request.Sort);
|
||||
|
||||
var tasks = await orderedQuery.ToListAsync(cancellationToken);
|
||||
|
||||
var summaries = tasks
|
||||
.Select(MapTaskSummary)
|
||||
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
|
||||
.ToArray();
|
||||
|
||||
if (request.CandidateRoles.Count == 0)
|
||||
if (request.CandidateRoles.Count > 0)
|
||||
{
|
||||
return summaries;
|
||||
summaries = summaries
|
||||
.Where(x => x.EffectiveRoles.Intersect(request.CandidateRoles, StringComparer.OrdinalIgnoreCase).Any())
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
return summaries
|
||||
.Where(x => x.EffectiveRoles.Intersect(request.CandidateRoles, StringComparer.OrdinalIgnoreCase).Any())
|
||||
.ToArray();
|
||||
// Honour Skip/Take when the client asked for a page (0 means "all").
|
||||
if (request.Skip > 0 || request.Take > 0)
|
||||
{
|
||||
var take = request.Take > 0 ? request.Take : summaries.Length;
|
||||
summaries = summaries.Skip(request.Skip).Take(take).ToArray();
|
||||
}
|
||||
|
||||
return summaries;
|
||||
}
|
||||
|
||||
public async Task<WorkflowTaskSummary?> GetTaskAsync(
|
||||
@@ -502,8 +509,9 @@ public sealed class WorkflowProjectionStore(
|
||||
query = query.Where(x => x.Status == request.Status);
|
||||
}
|
||||
|
||||
var instances = await query
|
||||
.OrderByDescending(x => x.CreatedOnUtc)
|
||||
var orderedQuery = WorkflowSortExpressions.ApplyInstanceSort(query, request.Sort);
|
||||
|
||||
var instances = await orderedQuery
|
||||
.Select(x => new WorkflowInstanceSummary
|
||||
{
|
||||
WorkflowInstanceId = x.WorkflowInstanceId,
|
||||
@@ -516,9 +524,17 @@ public sealed class WorkflowProjectionStore(
|
||||
})
|
||||
.ToArrayAsync(cancellationToken);
|
||||
|
||||
return instances
|
||||
var filtered = instances
|
||||
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
|
||||
.ToArray();
|
||||
|
||||
if (request.Skip > 0 || request.Take > 0)
|
||||
{
|
||||
var take = request.Take > 0 ? request.Take : filtered.Length;
|
||||
filtered = filtered.Skip(request.Skip).Take(take).ToArray();
|
||||
}
|
||||
|
||||
return filtered;
|
||||
}
|
||||
|
||||
public async Task<WorkflowInstanceSummary?> GetInstanceAsync(
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
|
||||
using StellaOps.Workflow.Contracts;
|
||||
using StellaOps.Workflow.DataStore.Oracle.Entities;
|
||||
using StellaOps.Workflow.Engine.Constants;
|
||||
using StellaOps.Workflow.Engine.Exceptions;
|
||||
|
||||
namespace StellaOps.Workflow.Engine.Projections;
|
||||
|
||||
/// <summary>
|
||||
/// Per-entity sort application for list endpoints. Each method:
|
||||
/// 1. Matches <see cref="WorkflowSortModel.SortBy"/> against a hard-coded whitelist — this is the
|
||||
/// only place client-supplied identifiers touch EF, so non-whitelisted values become a
|
||||
/// <see cref="BaseResultException"/> instead of slipping through into a dynamic ORDER BY.
|
||||
/// 2. Appends a PK tie-breaker (<c>.ThenBy(x => x.Id)</c>) so pagination stays stable across
|
||||
/// rows that compare equal on the requested sort column.
|
||||
/// 3. Falls back to a sensible default when <paramref name="sort"/> is null (mirrors the previous
|
||||
/// hard-coded ordering).
|
||||
/// </summary>
|
||||
public static class WorkflowSortExpressions
|
||||
{
|
||||
public static IOrderedQueryable<WorkflowInstanceProjection> ApplyInstanceSort(
|
||||
IQueryable<WorkflowInstanceProjection> query,
|
||||
WorkflowSortModel? sort)
|
||||
{
|
||||
if (sort is null)
|
||||
{
|
||||
return query
|
||||
.OrderByDescending(x => x.CreatedOnUtc)
|
||||
.ThenBy(x => x.Id);
|
||||
}
|
||||
|
||||
var desc = IsDescending(sort.Direction);
|
||||
IOrderedQueryable<WorkflowInstanceProjection> ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
|
||||
{
|
||||
"workflowinstanceid" => desc ? query.OrderByDescending(x => x.WorkflowInstanceId) : query.OrderBy(x => x.WorkflowInstanceId),
|
||||
"workflowname" => desc ? query.OrderByDescending(x => x.WorkflowName) : query.OrderBy(x => x.WorkflowName),
|
||||
"workflowversion" => desc ? query.OrderByDescending(x => x.WorkflowVersion) : query.OrderBy(x => x.WorkflowVersion),
|
||||
"status" => desc ? query.OrderByDescending(x => x.Status) : query.OrderBy(x => x.Status),
|
||||
"createdonutc" => desc ? query.OrderByDescending(x => x.CreatedOnUtc) : query.OrderBy(x => x.CreatedOnUtc),
|
||||
"completedonutc" => desc ? query.OrderByDescending(x => x.CompletedOnUtc) : query.OrderBy(x => x.CompletedOnUtc),
|
||||
"businessreferencekey" => desc ? query.OrderByDescending(x => x.BusinessReferenceKey) : query.OrderBy(x => x.BusinessReferenceKey),
|
||||
_ => throw new BaseResultException(MessageKeys.WorkflowSortColumnNotAllowed, sort.Prop ?? string.Empty, "instances"),
|
||||
};
|
||||
return ordered.ThenBy(x => x.Id);
|
||||
}
|
||||
|
||||
public static IOrderedQueryable<WorkflowTaskProjection> ApplyTaskSort(
|
||||
IQueryable<WorkflowTaskProjection> query,
|
||||
WorkflowSortModel? sort)
|
||||
{
|
||||
if (sort is null)
|
||||
{
|
||||
return query
|
||||
.OrderByDescending(x => x.CreatedOnUtc)
|
||||
.ThenBy(x => x.Id);
|
||||
}
|
||||
|
||||
var desc = IsDescending(sort.Direction);
|
||||
IOrderedQueryable<WorkflowTaskProjection> ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
|
||||
{
|
||||
"workflowtaskid" => desc ? query.OrderByDescending(x => x.WorkflowTaskId) : query.OrderBy(x => x.WorkflowTaskId),
|
||||
"taskname" => desc ? query.OrderByDescending(x => x.TaskName) : query.OrderBy(x => x.TaskName),
|
||||
"workflowname" => desc ? query.OrderByDescending(x => x.WorkflowName) : query.OrderBy(x => x.WorkflowName),
|
||||
"workflowversion" => desc ? query.OrderByDescending(x => x.WorkflowVersion) : query.OrderBy(x => x.WorkflowVersion),
|
||||
"status" => desc ? query.OrderByDescending(x => x.Status) : query.OrderBy(x => x.Status),
|
||||
"assignee" => desc ? query.OrderByDescending(x => x.Assignee) : query.OrderBy(x => x.Assignee),
|
||||
"createdonutc" => desc ? query.OrderByDescending(x => x.CreatedOnUtc) : query.OrderBy(x => x.CreatedOnUtc),
|
||||
"completedonutc" => desc ? query.OrderByDescending(x => x.CompletedOnUtc) : query.OrderBy(x => x.CompletedOnUtc),
|
||||
// DeadlineUtc maps to StaleAfterUtc on the projection (the contract exposes the deadline, the projection stores staleness).
|
||||
"deadlineutc" => desc ? query.OrderByDescending(x => x.StaleAfterUtc) : query.OrderBy(x => x.StaleAfterUtc),
|
||||
_ => throw new BaseResultException(MessageKeys.WorkflowSortColumnNotAllowed, sort.Prop ?? string.Empty, "tasks"),
|
||||
};
|
||||
return ordered.ThenBy(x => x.Id);
|
||||
}
|
||||
|
||||
private static bool IsDescending(string? direction) =>
|
||||
string.Equals(direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
@@ -26,16 +26,21 @@ public sealed class OracleAqWorkflowSignalDeadLetterStore(
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
// Oracle AQ browse has no offset / sort / count facility, so we materialise up to the
|
||||
// safety cap, filter+sort+page in process, and report TotalCount = post-filter length.
|
||||
// Callers paging past the cap will silently miss later messages — the documented trade-off
|
||||
// for the AQ driver.
|
||||
var safetyCap = Math.Clamp(request.MaxMessages <= 0 ? 500 : request.MaxMessages, 1, 500);
|
||||
var rawMessages = await transport.BrowseAsync(
|
||||
new OracleAqBrowseRequest
|
||||
{
|
||||
QueueName = options.DeadLetterQueueName,
|
||||
Correlation = request.SignalId,
|
||||
MaxMessages = Math.Clamp(request.MaxMessages, 1, 200),
|
||||
MaxMessages = safetyCap,
|
||||
},
|
||||
cancellationToken);
|
||||
|
||||
var results = new List<WorkflowSignalDeadLetterMessage>(rawMessages.Count);
|
||||
var filtered = new List<WorkflowSignalDeadLetterMessage>(rawMessages.Count);
|
||||
foreach (var rawMessage in rawMessages)
|
||||
{
|
||||
var mapped = MapMessage(rawMessage, request.IncludeRawPayload);
|
||||
@@ -44,15 +49,63 @@ public sealed class OracleAqWorkflowSignalDeadLetterStore(
|
||||
continue;
|
||||
}
|
||||
|
||||
results.Add(mapped);
|
||||
filtered.Add(mapped);
|
||||
}
|
||||
|
||||
var sorted = ApplyInMemorySort(filtered, request.Sort);
|
||||
|
||||
var totalCount = sorted.Count;
|
||||
var effectiveTake = request.Take > 0 ? request.Take : safetyCap;
|
||||
effectiveTake = Math.Clamp(effectiveTake, 1, 500);
|
||||
var effectiveSkip = Math.Max(0, request.Skip);
|
||||
var page = sorted.Skip(effectiveSkip).Take(effectiveTake).ToArray();
|
||||
|
||||
return new WorkflowSignalDeadLettersGetResponse
|
||||
{
|
||||
Messages = results,
|
||||
Messages = page,
|
||||
TotalCount = totalCount,
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Applies the sort whitelist in memory. Queue browse gives us messages in enqueue order; we
|
||||
/// reorder them here before paging so the UI sees a stable page independent of queue position.
|
||||
/// </summary>
|
||||
private static IReadOnlyList<WorkflowSignalDeadLetterMessage> ApplyInMemorySort(
|
||||
List<WorkflowSignalDeadLetterMessage> messages,
|
||||
WorkflowSortModel? sort)
|
||||
{
|
||||
if (sort is null)
|
||||
{
|
||||
return messages
|
||||
.OrderByDescending(x => x.EnqueuedOnUtc)
|
||||
.ThenBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase)
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
|
||||
IOrderedEnumerable<WorkflowSignalDeadLetterMessage> ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
|
||||
{
|
||||
"signalid" => desc ? messages.OrderByDescending(x => x.SignalId, StringComparer.OrdinalIgnoreCase)
|
||||
: messages.OrderBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase),
|
||||
"workflowinstanceid" => desc ? messages.OrderByDescending(x => x.WorkflowInstanceId, StringComparer.OrdinalIgnoreCase)
|
||||
: messages.OrderBy(x => x.WorkflowInstanceId, StringComparer.OrdinalIgnoreCase),
|
||||
"signaltype" => desc ? messages.OrderByDescending(x => x.SignalType, StringComparer.OrdinalIgnoreCase)
|
||||
: messages.OrderBy(x => x.SignalType, StringComparer.OrdinalIgnoreCase),
|
||||
"enqueuedonutc" => desc ? messages.OrderByDescending(x => x.EnqueuedOnUtc)
|
||||
: messages.OrderBy(x => x.EnqueuedOnUtc),
|
||||
"deliverycount" => desc ? messages.OrderByDescending(x => x.DeliveryCount)
|
||||
: messages.OrderBy(x => x.DeliveryCount),
|
||||
_ => throw new ArgumentException(
|
||||
$"Sort column '{sort.Prop}' is not supported for dead-letter listing. "
|
||||
+ "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."),
|
||||
};
|
||||
|
||||
return ordered
|
||||
.ThenBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase)
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
public async Task<WorkflowSignalDeadLetterReplayResponse> ReplayAsync(
|
||||
WorkflowSignalDeadLetterReplayRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
|
||||
@@ -0,0 +1,158 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
|
||||
using StellaOps.Workflow.Contracts;
|
||||
using StellaOps.Workflow.DataStore.Oracle.Entities;
|
||||
using StellaOps.Workflow.Engine.Exceptions;
|
||||
using StellaOps.Workflow.Engine.Projections;
|
||||
|
||||
using FluentAssertions;
|
||||
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace StellaOps.Workflow.Engine.Tests.Projections;
|
||||
|
||||
/// <summary>
|
||||
/// Direct unit tests for <see cref="WorkflowSortExpressions"/>. These bypass DI entirely so the
|
||||
/// whitelist + tie-breaker behaviour is fast and deterministic to verify, and mirrored on both
|
||||
/// the Ablera and upstream StellaOps sides.
|
||||
/// </summary>
|
||||
[TestFixture]
|
||||
public class WorkflowSortExpressionsTests
|
||||
{
|
||||
[Test]
|
||||
public void ApplyInstanceSort_NullSort_DefaultsToCreatedOnUtcDescWithPkTieBreaker()
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
var data = new[]
|
||||
{
|
||||
BuildInstance(id: 1, createdOnUtc: now.AddMinutes(-10)),
|
||||
BuildInstance(id: 2, createdOnUtc: now),
|
||||
BuildInstance(id: 3, createdOnUtc: now), // tie on createdOnUtc with #2
|
||||
}.AsQueryable();
|
||||
|
||||
var ordered = WorkflowSortExpressions.ApplyInstanceSort(data, sort: null).ToArray();
|
||||
|
||||
ordered.Select(x => x.Id).Should().Equal(new[] { 2m, 3m, 1m }, "desc by CreatedOnUtc, then PK asc for the tie");
|
||||
}
|
||||
|
||||
[TestCase("workflowName", "asc", new[] { "Alpha", "Beta", "Gamma" })]
|
||||
[TestCase("workflowName", "desc", new[] { "Gamma", "Beta", "Alpha" })]
|
||||
[TestCase("status", "asc", new[] { "Completed", "Open", "Pending" })]
|
||||
public void ApplyInstanceSort_WhitelistedColumns_OrdersAsExpected(string sortBy, string direction, string[] expected)
|
||||
{
|
||||
var data = new[]
|
||||
{
|
||||
BuildInstance(id: 1, workflowName: "Beta", status: "Open"),
|
||||
BuildInstance(id: 2, workflowName: "Gamma", status: "Pending"),
|
||||
BuildInstance(id: 3, workflowName: "Alpha", status: "Completed"),
|
||||
}.AsQueryable();
|
||||
|
||||
var sort = new WorkflowSortModel { Prop = sortBy, Direction = direction };
|
||||
var ordered = WorkflowSortExpressions.ApplyInstanceSort(data, sort).ToArray();
|
||||
|
||||
var projected = sortBy == "status"
|
||||
? ordered.Select(x => x.Status).ToArray()
|
||||
: ordered.Select(x => x.WorkflowName).ToArray();
|
||||
projected.Should().Equal(expected);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void ApplyInstanceSort_UnknownColumn_ThrowsBaseResultException()
|
||||
{
|
||||
var data = new[] { BuildInstance(id: 1) }.AsQueryable();
|
||||
|
||||
var act = () => WorkflowSortExpressions.ApplyInstanceSort(
|
||||
data,
|
||||
new WorkflowSortModel { Prop = "attackerInjection; DROP TABLE --", Direction = "asc" }).ToArray();
|
||||
|
||||
act.Should().Throw<BaseResultException>();
|
||||
}
|
||||
|
||||
[TestCase("taskName", "asc", new[] { "approve", "notify", "review" })]
|
||||
[TestCase("assignee", "desc", new[] { "yuri", "alice", null })]
|
||||
public void ApplyTaskSort_WhitelistedColumns_OrdersAsExpected(string sortBy, string direction, string?[] expected)
|
||||
{
|
||||
var data = new[]
|
||||
{
|
||||
BuildTask(id: 1, taskName: "review", assignee: "alice"),
|
||||
BuildTask(id: 2, taskName: "approve", assignee: null),
|
||||
BuildTask(id: 3, taskName: "notify", assignee: "yuri"),
|
||||
}.AsQueryable();
|
||||
|
||||
var sort = new WorkflowSortModel { Prop = sortBy, Direction = direction };
|
||||
var ordered = WorkflowSortExpressions.ApplyTaskSort(data, sort).ToArray();
|
||||
|
||||
var projected = sortBy == "assignee"
|
||||
? ordered.Select(x => x.Assignee).ToArray()
|
||||
: ordered.Select(x => x.TaskName).Cast<string?>().ToArray();
|
||||
projected.Should().Equal(expected);
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void ApplyTaskSort_DeadlineUtc_MapsToStaleAfterUtcColumn()
|
||||
{
|
||||
// DeadlineUtc is the contract-facing name; the projection column is StaleAfterUtc.
|
||||
var data = new[]
|
||||
{
|
||||
BuildTask(id: 1, staleAfterUtc: DateTime.UtcNow.AddHours(2)),
|
||||
BuildTask(id: 2, staleAfterUtc: DateTime.UtcNow.AddHours(1)),
|
||||
BuildTask(id: 3, staleAfterUtc: null),
|
||||
}.AsQueryable();
|
||||
|
||||
var ordered = WorkflowSortExpressions.ApplyTaskSort(
|
||||
data,
|
||||
new WorkflowSortModel { Prop = "deadlineUtc", Direction = "asc" }).ToArray();
|
||||
|
||||
ordered.Select(x => x.Id).Should().Equal(new[] { 3m, 2m, 1m }, "null sorts first in asc; then earliest deadline first");
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void ApplyTaskSort_UnknownColumn_ThrowsBaseResultException()
|
||||
{
|
||||
var data = new[] { BuildTask(id: 1) }.AsQueryable();
|
||||
|
||||
var act = () => WorkflowSortExpressions.ApplyTaskSort(
|
||||
data,
|
||||
new WorkflowSortModel { Prop = "nonexistentColumn", Direction = "asc" }).ToArray();
|
||||
|
||||
act.Should().Throw<BaseResultException>();
|
||||
}
|
||||
|
||||
private static WorkflowInstanceProjection BuildInstance(
|
||||
int id,
|
||||
string workflowName = "Workflow",
|
||||
string status = "Open",
|
||||
DateTime? createdOnUtc = null) => new()
|
||||
{
|
||||
Id = id,
|
||||
WorkflowInstanceId = $"inst-{id}",
|
||||
WorkflowName = workflowName,
|
||||
WorkflowVersion = "1.0.0",
|
||||
BusinessReferenceKey = null,
|
||||
BusinessReferenceJson = null,
|
||||
Status = status,
|
||||
CreatedOnUtc = createdOnUtc ?? DateTime.UtcNow.AddMinutes(-id),
|
||||
};
|
||||
|
||||
private static WorkflowTaskProjection BuildTask(
|
||||
int id,
|
||||
string taskName = "task",
|
||||
string? assignee = null,
|
||||
DateTime? staleAfterUtc = null) => new()
|
||||
{
|
||||
Id = id,
|
||||
WorkflowTaskId = $"task-{id}",
|
||||
WorkflowInstanceId = $"inst-{id}",
|
||||
WorkflowName = "Workflow",
|
||||
WorkflowVersion = "1.0.0",
|
||||
TaskName = taskName,
|
||||
TaskType = "HumanTask",
|
||||
Route = "default/task",
|
||||
Assignee = assignee,
|
||||
Status = "Open",
|
||||
CreatedOnUtc = DateTime.UtcNow.AddMinutes(-id),
|
||||
StaleAfterUtc = staleAfterUtc,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user