diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs
index bff1b4d95..a3d59ce14 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowDefinitionsContracts.cs
@@ -26,9 +26,24 @@ public sealed record WorkflowDefinitionGetRequest
/// Filter by multiple workflow names.
public IReadOnlyCollection WorkflowNames { get; init; } = [];
+
+ /// Pagination: rows to skip. 0 = start from beginning.
+ public int Skip { get; init; }
+
+ /// Pagination: max rows to return. 0 = return all.
+ public int Take { get; init; }
+
+ ///
+ /// Optional sort. SortBy is whitelisted per-endpoint — for definitions the allowed values are
+ /// "workflowName", "workflowVersion", "displayName". Null sorts by a stable default.
+ ///
+ public WorkflowSortModel? Sort { get; init; }
}
public sealed record WorkflowDefinitionGetResponse
{
public IReadOnlyCollection Definitions { get; init; } = [];
+
+ /// Total number of definitions matching the filter (pre-pagination).
+ public int TotalCount { get; init; }
}
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs
index 8967e60d5..97129b20e 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowInstanceContracts.cs
@@ -40,11 +40,27 @@ public sealed record WorkflowInstancesGetRequest
/// When true, populate ActiveTask and WorkflowState on each instance summary.
public bool IncludeDetails { get; init; }
+
+ /// Pagination: rows to skip. 0 = start from beginning.
+ public int Skip { get; init; }
+
+ /// Pagination: max rows to return. 0 = use server default cap.
+ public int Take { get; init; }
+
+ ///
+ /// Optional sort. SortBy is whitelisted per-endpoint — for instances the allowed values are
+ /// "workflowInstanceId", "workflowName", "workflowVersion", "status", "createdOnUtc",
+ /// "completedOnUtc", "businessReferenceKey". Null sorts by createdOnUtc desc by default.
+ ///
+ public WorkflowSortModel? Sort { get; init; }
}
public sealed record WorkflowInstancesGetResponse
{
public IReadOnlyCollection Instances { get; init; } = [];
+
+ /// Total number of rows matching the filter (pre-pagination).
+ public int TotalCount { get; init; }
}
public sealed record WorkflowInstanceGetRequest
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs
index da5000022..0e4f60698 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowOperationalContracts.cs
@@ -75,8 +75,27 @@ public sealed record WorkflowSignalDeadLettersGetRequest
public string? SignalId { get; init; }
public string? WorkflowInstanceId { get; init; }
public string? SignalType { get; init; }
+
+ ///
+ /// Hard safety cap on how many messages the underlying driver is allowed to materialise
+ /// before paging is applied. Clamped to 500. Kept separate from so slow
+ /// drivers (e.g. Oracle AQ browse) don't get asked for an unbounded result set.
+ ///
public int MaxMessages { get; init; } = 50;
+
public bool IncludeRawPayload { get; init; }
+
+ /// Pagination: rows to skip. 0 = start from beginning.
+ public int Skip { get; init; }
+
+ /// Pagination: max rows to return. 0 = use as the effective cap.
+ public int Take { get; init; }
+
+ ///
+ /// Optional sort. SortBy is whitelisted — allowed values are "signalId", "workflowInstanceId",
+ /// "signalType", "enqueuedOnUtc", "deliveryCount". Null sorts by enqueuedOnUtc desc.
+ ///
+ public WorkflowSortModel? Sort { get; init; }
}
public sealed record WorkflowSignalDeadLetterMessage
@@ -101,6 +120,9 @@ public sealed record WorkflowSignalDeadLetterMessage
public sealed record WorkflowSignalDeadLettersGetResponse
{
public IReadOnlyCollection Messages { get; init; } = [];
+
+ /// Total number of rows matching the filter (pre-pagination).
+ public int TotalCount { get; init; }
}
public sealed record WorkflowSignalDeadLetterReplayRequest
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowSortModel.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowSortModel.cs
new file mode 100644
index 000000000..9863c33e9
--- /dev/null
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowSortModel.cs
@@ -0,0 +1,15 @@
+namespace StellaOps.Workflow.Contracts;
+
+///
+/// Server-side sort request for list endpoints. is matched against a
+/// per-endpoint whitelist on the server — unknown values are rejected with a validation error
+/// rather than silently ignored, so typos and client/server drift surface early.
+///
+public sealed record WorkflowSortModel
+{
+ /// Whitelisted column identifier (e.g. "createdOnUtc", "workflowName").
+ public required string Prop { get; init; }
+
+ /// "asc" or "desc". Defaults to ascending.
+ public string Direction { get; init; } = "asc";
+}
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs
index 06e080f81..a1b5ed9b4 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Contracts/WorkflowTaskContracts.cs
@@ -56,11 +56,27 @@ public sealed record WorkflowTasksGetRequest
public string? ActorId { get; init; }
public IReadOnlyCollection ActorRoles { get; init; } = [];
public IReadOnlyCollection CandidateRoles { get; init; } = [];
+
+ /// Pagination: rows to skip. 0 = start from beginning.
+ public int Skip { get; init; }
+
+ /// Pagination: max rows to return. 0 = use server default cap.
+ public int Take { get; init; }
+
+ ///
+ /// Optional sort. SortBy is whitelisted per-endpoint — for tasks the allowed values are
+ /// "workflowTaskId", "taskName", "workflowName", "workflowVersion", "status", "assignee",
+ /// "createdOnUtc", "completedOnUtc", "deadlineUtc". Null sorts by createdOnUtc desc by default.
+ ///
+ public WorkflowSortModel? Sort { get; init; }
}
public sealed record WorkflowTasksGetResponse
{
public IReadOnlyCollection Tasks { get; init; } = [];
+
+ /// Total number of rows matching the filter (pre-pagination).
+ public int TotalCount { get; init; }
}
public sealed record WorkflowTaskGetRequest
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs
index f88cbb897..d6171c087 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.MongoDB/MongoWorkflowSignalStore.cs
@@ -284,14 +284,24 @@ public sealed class MongoWorkflowSignalStore(
filter &= Builders.Filter.Eq(x => x.SignalType, request.SignalType);
}
+ var effectiveTake = request.Take > 0 ? request.Take : request.MaxMessages;
+ effectiveTake = Math.Clamp(effectiveTake, 1, 500);
+ var effectiveSkip = Math.Max(0, request.Skip);
+
+ // Count for the response envelope so the UI can render "page X of Y".
+ var totalCount = (int)Math.Min(int.MaxValue,
+ await deadLetters.CountDocumentsAsync(filter, cancellationToken: cancellationToken));
+
+ var sort = BuildDeadLetterSort(request.Sort);
var documents = await deadLetters.Find(filter)
- .SortByDescending(x => x.DeadLetteredOnUtc)
- .ThenBy(x => x.SignalId)
- .Limit(Math.Clamp(request.MaxMessages, 1, 200))
+ .Sort(sort)
+ .Skip(effectiveSkip)
+ .Limit(effectiveTake)
.ToListAsync(cancellationToken);
return new WorkflowSignalDeadLettersGetResponse
{
+ TotalCount = totalCount,
Messages = documents.Select(document =>
{
try
@@ -342,6 +352,33 @@ public sealed class MongoWorkflowSignalStore(
};
}
+ private static SortDefinition BuildDeadLetterSort(WorkflowSortModel? sort)
+ {
+ var builder = Builders.Sort;
+ if (sort is null)
+ {
+ return builder.Combine(
+ builder.Descending(x => x.DeadLetteredOnUtc),
+ builder.Ascending(x => x.SignalId));
+ }
+
+ var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
+ SortDefinition primary = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
+ {
+ "signalid" => desc ? builder.Descending(x => x.SignalId) : builder.Ascending(x => x.SignalId),
+ "workflowinstanceid" => desc ? builder.Descending(x => x.WorkflowInstanceId) : builder.Ascending(x => x.WorkflowInstanceId),
+ "signaltype" => desc ? builder.Descending(x => x.SignalType) : builder.Ascending(x => x.SignalType),
+ "enqueuedonutc" => desc ? builder.Descending(x => x.EnqueuedOnUtc) : builder.Ascending(x => x.EnqueuedOnUtc),
+ "deliverycount" => desc ? builder.Descending(x => x.DeliveryCount) : builder.Ascending(x => x.DeliveryCount),
+ _ => throw new ArgumentException(
+ $"Sort column '{sort.Prop}' is not supported for dead-letter listing. "
+ + "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."),
+ };
+
+ // Stable tie-breaker by SignalId (effectively the collection's unique key for dead letters).
+ return builder.Combine(primary, builder.Ascending(x => x.SignalId));
+ }
+
internal async Task ReplayAsync(
WorkflowSignalDeadLetterReplayRequest request,
CancellationToken cancellationToken = default)
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs
index 02b208b2f..8dfa4a906 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.DataStore.PostgreSQL/PostgresWorkflowSignalStore.cs
@@ -305,7 +305,35 @@ public sealed class PostgresWorkflowSignalStore(
{
ArgumentNullException.ThrowIfNull(request);
+ // Resolve an ORDER BY clause from the whitelist — never interpolate user input into SQL.
+ var orderBy = ResolveDeadLetterOrderBy(request.Sort);
+
+ // Effective cap: use Take when given; otherwise MaxMessages. Hard-cap at 500 so a slow
+ // driver can't be asked for an unbounded result set.
+ var effectiveTake = request.Take > 0 ? request.Take : request.MaxMessages;
+ effectiveTake = Math.Clamp(effectiveTake, 1, 500);
+ var effectiveSkip = Math.Max(0, request.Skip);
+
await using var scope = await database.OpenScopeAsync(requireTransaction: false, cancellationToken);
+
+ // 1) Total count with the same WHERE (so the UI can show "page X of Y").
+ int totalCount;
+ await using (var countCommand = database.CreateCommand(
+ scope.Connection,
+ $"""
+ select count(*)
+ from {database.Qualify(Postgres.DeadLetterTableName)}
+ where (@signal_id is null or signal_id = @signal_id)
+ and (@workflow_instance_id is null or workflow_instance_id = @workflow_instance_id)
+ and (@signal_type is null or signal_type = @signal_type)
+ """))
+ {
+ countCommand.Parameters.Add("signal_id", NpgsqlDbType.Text).Value = (object?)request.SignalId ?? DBNull.Value;
+ countCommand.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value;
+ countCommand.Parameters.Add("signal_type", NpgsqlDbType.Text).Value = (object?)request.SignalType ?? DBNull.Value;
+ totalCount = Convert.ToInt32(await countCommand.ExecuteScalarAsync(cancellationToken) ?? 0);
+ }
+
await using var command = database.CreateCommand(
scope.Connection,
$"""
@@ -325,13 +353,15 @@ public sealed class PostgresWorkflowSignalStore(
where (@signal_id is null or signal_id = @signal_id)
and (@workflow_instance_id is null or workflow_instance_id = @workflow_instance_id)
and (@signal_type is null or signal_type = @signal_type)
- order by dead_lettered_on_utc desc, signal_id
+ {orderBy}
+ offset @skip
limit @max_messages
""");
command.Parameters.Add("signal_id", NpgsqlDbType.Text).Value = (object?)request.SignalId ?? DBNull.Value;
command.Parameters.Add("workflow_instance_id", NpgsqlDbType.Text).Value = (object?)request.WorkflowInstanceId ?? DBNull.Value;
command.Parameters.Add("signal_type", NpgsqlDbType.Text).Value = (object?)request.SignalType ?? DBNull.Value;
- command.Parameters.AddWithValue("max_messages", Math.Clamp(request.MaxMessages, 1, 200));
+ command.Parameters.AddWithValue("max_messages", effectiveTake);
+ command.Parameters.AddWithValue("skip", effectiveSkip);
var results = new List();
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
@@ -387,9 +417,35 @@ public sealed class PostgresWorkflowSignalStore(
return new WorkflowSignalDeadLettersGetResponse
{
Messages = results,
+ TotalCount = totalCount,
};
}
+ ///
+ /// Maps a whitelisted to a literal ORDER BY clause.
+ /// Enforcing the whitelist here is what keeps client-supplied identifiers out of the raw SQL
+ /// string. The PK-ish tie-breaker (signal_id) keeps pagination stable across ties.
+ ///
+ private static string ResolveDeadLetterOrderBy(WorkflowSortModel? sort)
+ {
+ if (sort is null) return "order by dead_lettered_on_utc desc, signal_id";
+
+ var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
+ var dir = desc ? "desc" : "asc";
+ var column = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
+ {
+ "signalid" => "signal_id",
+ "workflowinstanceid" => "workflow_instance_id",
+ "signaltype" => "signal_type",
+ "enqueuedonutc" => "enqueued_on_utc",
+ "deliverycount" => "delivery_count",
+ _ => throw new ArgumentException(
+ $"Sort column '{sort.Prop}' is not supported for dead-letter listing. "
+ + "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."),
+ };
+ return $"order by {column} {dir}, signal_id";
+ }
+
internal async Task ReplayAsync(
WorkflowSignalDeadLetterReplayRequest request,
CancellationToken cancellationToken = default)
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs
index 915b1a5e3..6d6a76f2a 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Constants/MessageKeys.cs
@@ -11,4 +11,5 @@ public static class MessageKeys
public static string WorkflowPayloadFieldMissing => nameof(WorkflowPayloadFieldMissing);
public static string WorkflowTransportFailed => nameof(WorkflowTransportFailed);
public static string WorkflowRuntimeFailed => nameof(WorkflowRuntimeFailed);
+ public static string WorkflowSortColumnNotAllowed => nameof(WorkflowSortColumnNotAllowed);
}
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs
index 19aeb9e5b..75f1719bf 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowProjectionStore.cs
@@ -140,23 +140,30 @@ public sealed class WorkflowProjectionStore(
query = query.Where(x => x.Status == request.Status);
}
- var tasks = await query
- .OrderBy(x => x.CreatedOnUtc)
- .ToListAsync(cancellationToken);
+ var orderedQuery = WorkflowSortExpressions.ApplyTaskSort(query, request.Sort);
+
+ var tasks = await orderedQuery.ToListAsync(cancellationToken);
var summaries = tasks
.Select(MapTaskSummary)
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray();
- if (request.CandidateRoles.Count == 0)
+ if (request.CandidateRoles.Count > 0)
{
- return summaries;
+ summaries = summaries
+ .Where(x => x.EffectiveRoles.Intersect(request.CandidateRoles, StringComparer.OrdinalIgnoreCase).Any())
+ .ToArray();
}
- return summaries
- .Where(x => x.EffectiveRoles.Intersect(request.CandidateRoles, StringComparer.OrdinalIgnoreCase).Any())
- .ToArray();
+ // Honour Skip/Take when the client asked for a page (0 means "all").
+ if (request.Skip > 0 || request.Take > 0)
+ {
+ var take = request.Take > 0 ? request.Take : summaries.Length;
+ summaries = summaries.Skip(request.Skip).Take(take).ToArray();
+ }
+
+ return summaries;
}
public async Task GetTaskAsync(
@@ -502,8 +509,9 @@ public sealed class WorkflowProjectionStore(
query = query.Where(x => x.Status == request.Status);
}
- var instances = await query
- .OrderByDescending(x => x.CreatedOnUtc)
+ var orderedQuery = WorkflowSortExpressions.ApplyInstanceSort(query, request.Sort);
+
+ var instances = await orderedQuery
.Select(x => new WorkflowInstanceSummary
{
WorkflowInstanceId = x.WorkflowInstanceId,
@@ -516,9 +524,17 @@ public sealed class WorkflowProjectionStore(
})
.ToArrayAsync(cancellationToken);
- return instances
+ var filtered = instances
.Where(x => x.BusinessReference.MatchesBusinessReferenceFilter(businessReferenceKey, request.BusinessReferenceParts))
.ToArray();
+
+ if (request.Skip > 0 || request.Take > 0)
+ {
+ var take = request.Take > 0 ? request.Take : filtered.Length;
+ filtered = filtered.Skip(request.Skip).Take(take).ToArray();
+ }
+
+ return filtered;
}
public async Task GetInstanceAsync(
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowSortExpressions.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowSortExpressions.cs
new file mode 100644
index 000000000..15de2aa58
--- /dev/null
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Engine/Projections/WorkflowSortExpressions.cs
@@ -0,0 +1,80 @@
+using System;
+using System.Linq;
+
+using StellaOps.Workflow.Contracts;
+using StellaOps.Workflow.DataStore.Oracle.Entities;
+using StellaOps.Workflow.Engine.Constants;
+using StellaOps.Workflow.Engine.Exceptions;
+
+namespace StellaOps.Workflow.Engine.Projections;
+
+///
+/// Per-entity sort application for list endpoints. Each method:
+/// 1. Matches against a hard-coded whitelist — this is the
+/// only place client-supplied identifiers touch EF, so non-whitelisted values become a
+/// instead of slipping through into a dynamic ORDER BY.
+/// 2. Appends a PK tie-breaker (.ThenBy(x => x.Id)) so pagination stays stable across
+/// rows that compare equal on the requested sort column.
+/// 3. Falls back to a sensible default when is null (mirrors the previous
+/// hard-coded ordering).
+///
+public static class WorkflowSortExpressions
+{
+ public static IOrderedQueryable ApplyInstanceSort(
+ IQueryable query,
+ WorkflowSortModel? sort)
+ {
+ if (sort is null)
+ {
+ return query
+ .OrderByDescending(x => x.CreatedOnUtc)
+ .ThenBy(x => x.Id);
+ }
+
+ var desc = IsDescending(sort.Direction);
+ IOrderedQueryable ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
+ {
+ "workflowinstanceid" => desc ? query.OrderByDescending(x => x.WorkflowInstanceId) : query.OrderBy(x => x.WorkflowInstanceId),
+ "workflowname" => desc ? query.OrderByDescending(x => x.WorkflowName) : query.OrderBy(x => x.WorkflowName),
+ "workflowversion" => desc ? query.OrderByDescending(x => x.WorkflowVersion) : query.OrderBy(x => x.WorkflowVersion),
+ "status" => desc ? query.OrderByDescending(x => x.Status) : query.OrderBy(x => x.Status),
+ "createdonutc" => desc ? query.OrderByDescending(x => x.CreatedOnUtc) : query.OrderBy(x => x.CreatedOnUtc),
+ "completedonutc" => desc ? query.OrderByDescending(x => x.CompletedOnUtc) : query.OrderBy(x => x.CompletedOnUtc),
+ "businessreferencekey" => desc ? query.OrderByDescending(x => x.BusinessReferenceKey) : query.OrderBy(x => x.BusinessReferenceKey),
+ _ => throw new BaseResultException(MessageKeys.WorkflowSortColumnNotAllowed, sort.Prop ?? string.Empty, "instances"),
+ };
+ return ordered.ThenBy(x => x.Id);
+ }
+
+ public static IOrderedQueryable ApplyTaskSort(
+ IQueryable query,
+ WorkflowSortModel? sort)
+ {
+ if (sort is null)
+ {
+ return query
+ .OrderByDescending(x => x.CreatedOnUtc)
+ .ThenBy(x => x.Id);
+ }
+
+ var desc = IsDescending(sort.Direction);
+ IOrderedQueryable ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
+ {
+ "workflowtaskid" => desc ? query.OrderByDescending(x => x.WorkflowTaskId) : query.OrderBy(x => x.WorkflowTaskId),
+ "taskname" => desc ? query.OrderByDescending(x => x.TaskName) : query.OrderBy(x => x.TaskName),
+ "workflowname" => desc ? query.OrderByDescending(x => x.WorkflowName) : query.OrderBy(x => x.WorkflowName),
+ "workflowversion" => desc ? query.OrderByDescending(x => x.WorkflowVersion) : query.OrderBy(x => x.WorkflowVersion),
+ "status" => desc ? query.OrderByDescending(x => x.Status) : query.OrderBy(x => x.Status),
+ "assignee" => desc ? query.OrderByDescending(x => x.Assignee) : query.OrderBy(x => x.Assignee),
+ "createdonutc" => desc ? query.OrderByDescending(x => x.CreatedOnUtc) : query.OrderBy(x => x.CreatedOnUtc),
+ "completedonutc" => desc ? query.OrderByDescending(x => x.CompletedOnUtc) : query.OrderBy(x => x.CompletedOnUtc),
+ // DeadlineUtc maps to StaleAfterUtc on the projection (the contract exposes the deadline, the projection stores staleness).
+ "deadlineutc" => desc ? query.OrderByDescending(x => x.StaleAfterUtc) : query.OrderBy(x => x.StaleAfterUtc),
+ _ => throw new BaseResultException(MessageKeys.WorkflowSortColumnNotAllowed, sort.Prop ?? string.Empty, "tasks"),
+ };
+ return ordered.ThenBy(x => x.Id);
+ }
+
+ private static bool IsDescending(string? direction) =>
+ string.Equals(direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
+}
diff --git a/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs b/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs
index 40a159d8f..e40ab4ac0 100644
--- a/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs
+++ b/src/Workflow/__Libraries/StellaOps.Workflow.Signaling.OracleAq/OracleAqWorkflowSignalDeadLetterStore.cs
@@ -26,16 +26,21 @@ public sealed class OracleAqWorkflowSignalDeadLetterStore(
{
ArgumentNullException.ThrowIfNull(request);
+ // Oracle AQ browse has no offset / sort / count facility, so we materialise up to the
+ // safety cap, filter+sort+page in process, and report TotalCount = post-filter length.
+ // Callers paging past the cap will silently miss later messages — the documented trade-off
+ // for the AQ driver.
+ var safetyCap = Math.Clamp(request.MaxMessages <= 0 ? 500 : request.MaxMessages, 1, 500);
var rawMessages = await transport.BrowseAsync(
new OracleAqBrowseRequest
{
QueueName = options.DeadLetterQueueName,
Correlation = request.SignalId,
- MaxMessages = Math.Clamp(request.MaxMessages, 1, 200),
+ MaxMessages = safetyCap,
},
cancellationToken);
- var results = new List(rawMessages.Count);
+ var filtered = new List(rawMessages.Count);
foreach (var rawMessage in rawMessages)
{
var mapped = MapMessage(rawMessage, request.IncludeRawPayload);
@@ -44,15 +49,63 @@ public sealed class OracleAqWorkflowSignalDeadLetterStore(
continue;
}
- results.Add(mapped);
+ filtered.Add(mapped);
}
+ var sorted = ApplyInMemorySort(filtered, request.Sort);
+
+ var totalCount = sorted.Count;
+ var effectiveTake = request.Take > 0 ? request.Take : safetyCap;
+ effectiveTake = Math.Clamp(effectiveTake, 1, 500);
+ var effectiveSkip = Math.Max(0, request.Skip);
+ var page = sorted.Skip(effectiveSkip).Take(effectiveTake).ToArray();
+
return new WorkflowSignalDeadLettersGetResponse
{
- Messages = results,
+ Messages = page,
+ TotalCount = totalCount,
};
}
+ ///
+ /// Applies the sort whitelist in memory. Queue browse gives us messages in enqueue order; we
+ /// reorder them here before paging so the UI sees a stable page independent of queue position.
+ ///
+ private static IReadOnlyList ApplyInMemorySort(
+ List messages,
+ WorkflowSortModel? sort)
+ {
+ if (sort is null)
+ {
+ return messages
+ .OrderByDescending(x => x.EnqueuedOnUtc)
+ .ThenBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase)
+ .ToArray();
+ }
+
+ var desc = string.Equals(sort.Direction?.Trim(), "desc", StringComparison.OrdinalIgnoreCase);
+ IOrderedEnumerable ordered = (sort.Prop ?? string.Empty).ToLowerInvariant() switch
+ {
+ "signalid" => desc ? messages.OrderByDescending(x => x.SignalId, StringComparer.OrdinalIgnoreCase)
+ : messages.OrderBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase),
+ "workflowinstanceid" => desc ? messages.OrderByDescending(x => x.WorkflowInstanceId, StringComparer.OrdinalIgnoreCase)
+ : messages.OrderBy(x => x.WorkflowInstanceId, StringComparer.OrdinalIgnoreCase),
+ "signaltype" => desc ? messages.OrderByDescending(x => x.SignalType, StringComparer.OrdinalIgnoreCase)
+ : messages.OrderBy(x => x.SignalType, StringComparer.OrdinalIgnoreCase),
+ "enqueuedonutc" => desc ? messages.OrderByDescending(x => x.EnqueuedOnUtc)
+ : messages.OrderBy(x => x.EnqueuedOnUtc),
+ "deliverycount" => desc ? messages.OrderByDescending(x => x.DeliveryCount)
+ : messages.OrderBy(x => x.DeliveryCount),
+ _ => throw new ArgumentException(
+ $"Sort column '{sort.Prop}' is not supported for dead-letter listing. "
+ + "Allowed: signalId, workflowInstanceId, signalType, enqueuedOnUtc, deliveryCount."),
+ };
+
+ return ordered
+ .ThenBy(x => x.SignalId, StringComparer.OrdinalIgnoreCase)
+ .ToArray();
+ }
+
public async Task ReplayAsync(
WorkflowSignalDeadLetterReplayRequest request,
CancellationToken cancellationToken = default)
diff --git a/src/Workflow/__Tests/StellaOps.Workflow.Engine.Tests/Projections/WorkflowSortExpressionsTests.cs b/src/Workflow/__Tests/StellaOps.Workflow.Engine.Tests/Projections/WorkflowSortExpressionsTests.cs
new file mode 100644
index 000000000..667009acd
--- /dev/null
+++ b/src/Workflow/__Tests/StellaOps.Workflow.Engine.Tests/Projections/WorkflowSortExpressionsTests.cs
@@ -0,0 +1,158 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+using StellaOps.Workflow.Contracts;
+using StellaOps.Workflow.DataStore.Oracle.Entities;
+using StellaOps.Workflow.Engine.Exceptions;
+using StellaOps.Workflow.Engine.Projections;
+
+using FluentAssertions;
+
+using NUnit.Framework;
+
+namespace StellaOps.Workflow.Engine.Tests.Projections;
+
+///
+/// Direct unit tests for . These bypass DI entirely so the
+/// whitelist + tie-breaker behaviour is fast and deterministic to verify, and mirrored on both
+/// the Ablera and upstream StellaOps sides.
+///
+[TestFixture]
+public class WorkflowSortExpressionsTests
+{
+ [Test]
+ public void ApplyInstanceSort_NullSort_DefaultsToCreatedOnUtcDescWithPkTieBreaker()
+ {
+ var now = DateTime.UtcNow;
+ var data = new[]
+ {
+ BuildInstance(id: 1, createdOnUtc: now.AddMinutes(-10)),
+ BuildInstance(id: 2, createdOnUtc: now),
+ BuildInstance(id: 3, createdOnUtc: now), // tie on createdOnUtc with #2
+ }.AsQueryable();
+
+ var ordered = WorkflowSortExpressions.ApplyInstanceSort(data, sort: null).ToArray();
+
+ ordered.Select(x => x.Id).Should().Equal(new[] { 2m, 3m, 1m }, "desc by CreatedOnUtc, then PK asc for the tie");
+ }
+
+ [TestCase("workflowName", "asc", new[] { "Alpha", "Beta", "Gamma" })]
+ [TestCase("workflowName", "desc", new[] { "Gamma", "Beta", "Alpha" })]
+ [TestCase("status", "asc", new[] { "Completed", "Open", "Pending" })]
+ public void ApplyInstanceSort_WhitelistedColumns_OrdersAsExpected(string sortBy, string direction, string[] expected)
+ {
+ var data = new[]
+ {
+ BuildInstance(id: 1, workflowName: "Beta", status: "Open"),
+ BuildInstance(id: 2, workflowName: "Gamma", status: "Pending"),
+ BuildInstance(id: 3, workflowName: "Alpha", status: "Completed"),
+ }.AsQueryable();
+
+ var sort = new WorkflowSortModel { Prop = sortBy, Direction = direction };
+ var ordered = WorkflowSortExpressions.ApplyInstanceSort(data, sort).ToArray();
+
+ var projected = sortBy == "status"
+ ? ordered.Select(x => x.Status).ToArray()
+ : ordered.Select(x => x.WorkflowName).ToArray();
+ projected.Should().Equal(expected);
+ }
+
+ [Test]
+ public void ApplyInstanceSort_UnknownColumn_ThrowsBaseResultException()
+ {
+ var data = new[] { BuildInstance(id: 1) }.AsQueryable();
+
+ var act = () => WorkflowSortExpressions.ApplyInstanceSort(
+ data,
+ new WorkflowSortModel { Prop = "attackerInjection; DROP TABLE --", Direction = "asc" }).ToArray();
+
+ act.Should().Throw();
+ }
+
+ [TestCase("taskName", "asc", new[] { "approve", "notify", "review" })]
+ [TestCase("assignee", "desc", new[] { "yuri", "alice", null })]
+ public void ApplyTaskSort_WhitelistedColumns_OrdersAsExpected(string sortBy, string direction, string?[] expected)
+ {
+ var data = new[]
+ {
+ BuildTask(id: 1, taskName: "review", assignee: "alice"),
+ BuildTask(id: 2, taskName: "approve", assignee: null),
+ BuildTask(id: 3, taskName: "notify", assignee: "yuri"),
+ }.AsQueryable();
+
+ var sort = new WorkflowSortModel { Prop = sortBy, Direction = direction };
+ var ordered = WorkflowSortExpressions.ApplyTaskSort(data, sort).ToArray();
+
+ var projected = sortBy == "assignee"
+ ? ordered.Select(x => x.Assignee).ToArray()
+ : ordered.Select(x => x.TaskName).Cast().ToArray();
+ projected.Should().Equal(expected);
+ }
+
+ [Test]
+ public void ApplyTaskSort_DeadlineUtc_MapsToStaleAfterUtcColumn()
+ {
+ // DeadlineUtc is the contract-facing name; the projection column is StaleAfterUtc.
+ var data = new[]
+ {
+ BuildTask(id: 1, staleAfterUtc: DateTime.UtcNow.AddHours(2)),
+ BuildTask(id: 2, staleAfterUtc: DateTime.UtcNow.AddHours(1)),
+ BuildTask(id: 3, staleAfterUtc: null),
+ }.AsQueryable();
+
+ var ordered = WorkflowSortExpressions.ApplyTaskSort(
+ data,
+ new WorkflowSortModel { Prop = "deadlineUtc", Direction = "asc" }).ToArray();
+
+ ordered.Select(x => x.Id).Should().Equal(new[] { 3m, 2m, 1m }, "null sorts first in asc; then earliest deadline first");
+ }
+
+ [Test]
+ public void ApplyTaskSort_UnknownColumn_ThrowsBaseResultException()
+ {
+ var data = new[] { BuildTask(id: 1) }.AsQueryable();
+
+ var act = () => WorkflowSortExpressions.ApplyTaskSort(
+ data,
+ new WorkflowSortModel { Prop = "nonexistentColumn", Direction = "asc" }).ToArray();
+
+ act.Should().Throw();
+ }
+
+ private static WorkflowInstanceProjection BuildInstance(
+ int id,
+ string workflowName = "Workflow",
+ string status = "Open",
+ DateTime? createdOnUtc = null) => new()
+ {
+ Id = id,
+ WorkflowInstanceId = $"inst-{id}",
+ WorkflowName = workflowName,
+ WorkflowVersion = "1.0.0",
+ BusinessReferenceKey = null,
+ BusinessReferenceJson = null,
+ Status = status,
+ CreatedOnUtc = createdOnUtc ?? DateTime.UtcNow.AddMinutes(-id),
+ };
+
+ private static WorkflowTaskProjection BuildTask(
+ int id,
+ string taskName = "task",
+ string? assignee = null,
+ DateTime? staleAfterUtc = null) => new()
+ {
+ Id = id,
+ WorkflowTaskId = $"task-{id}",
+ WorkflowInstanceId = $"inst-{id}",
+ WorkflowName = "Workflow",
+ WorkflowVersion = "1.0.0",
+ TaskName = taskName,
+ TaskType = "HumanTask",
+ Route = "default/task",
+ Assignee = assignee,
+ Status = "Open",
+ CreatedOnUtc = DateTime.UtcNow.AddMinutes(-id),
+ StaleAfterUtc = staleAfterUtc,
+ };
+}