diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs index aa7be1c9e..045eee07a 100644 --- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs +++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs @@ -149,7 +149,6 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot StreamEntry[] entries; if (request.PendingOnly) { - // Read from pending only (redeliveries) entries = await db.StreamReadGroupAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, @@ -160,11 +159,38 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot } else { - // Always drain pending entries first, then read new. - // This is critical: when the Pub/Sub consumer was stuck (known - // StackExchange.Redis bug — SE.Redis #1586), messages accumulated - // in the pending entries list (PEL). XREADGROUP with ">" skips them. - // Reading pending ("0") first ensures no messages are lost after recovery. + // Step 1: Claim idle messages from dead consumers (XAUTOCLAIM). + // After container restarts, pending entries remain assigned to the old + // consumer name. The current consumer can't see them with XREADGROUP. + // XAUTOCLAIM transfers ownership of idle entries to this consumer. + try + { + var claimed = await db.StreamAutoClaimAsync( + _queueOptions.QueueName, + _queueOptions.ConsumerGroup, + consumer, + 30_000, // minIdleTimeInMs — claim entries idle for 30s+ + "0-0", // start position + request.BatchSize) + .ConfigureAwait(false); + + if (claimed.ClaimedEntries.Length > 0) + { + _logger?.LogDebug( + "XAUTOCLAIM recovered {Count} idle entries from dead consumers on {Queue}", + claimed.ClaimedEntries.Length, _queueOptions.QueueName); + entries = claimed.ClaimedEntries; + // Skip to mapping — we already have entries to process + goto MapEntries; + } + } + catch (RedisException ex) when (ex.Message.Contains("NOGROUP") || ex.Message.Contains("ERR")) + { + // XAUTOCLAIM not supported or group doesn't exist — fall through + _logger?.LogDebug("XAUTOCLAIM not available: {Message}", ex.Message); + } + + // Step 2: Drain pending entries assigned to THIS consumer. entries = await db.StreamReadGroupAsync( _queueOptions.QueueName, _queueOptions.ConsumerGroup, @@ -173,7 +199,7 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot count: request.BatchSize) .ConfigureAwait(false); - // If no pending entries, read new messages + // Step 3: If no pending, read new messages. if (entries is null || entries.Length == 0) { entries = await db.StreamReadGroupAsync( @@ -186,6 +212,8 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot } } + MapEntries: + if (entries is null || entries.Length == 0) { return []; diff --git a/src/__Libraries/StellaOps.ElkSharp/ElkEdgeRouterIterative.WinnerRefinement.Hybrid.cs b/src/__Libraries/StellaOps.ElkSharp/ElkEdgeRouterIterative.WinnerRefinement.Hybrid.cs index 72463d688..947054a63 100644 --- a/src/__Libraries/StellaOps.ElkSharp/ElkEdgeRouterIterative.WinnerRefinement.Hybrid.cs +++ b/src/__Libraries/StellaOps.ElkSharp/ElkEdgeRouterIterative.WinnerRefinement.Hybrid.cs @@ -332,11 +332,13 @@ internal static partial class ElkEdgeRouterIterative // Enter End from the right side: corridor goes past End, // descends to End's center Y, approaches from right. // This avoids the ugly long vertical drop from corridor. - // Offset both X and Y for each corridor edge so their - // vertical descent legs don't overlap (parallel vertical - // segments at the same X trigger target-join detection). + // Offset both X and Y for each corridor edge so they + // enter End at distinct positions (visually traceable). var rightApproachX = tgtNode.X + tgtNode.Width + 24d + (corridorFixed * (nodeSizeClearance + 4d)); - var centerY = tgtNode.Y + (tgtNode.Height / 2d); + // Spread entry points across the right face. First edge + // enters at 1/3 from top, second at 2/3, etc. + var slotFraction = (corridorFixed + 1d) / (corridorFixed + 2d); + var centerY = tgtNode.Y + (tgtNode.Height * slotFraction); newPath = [ src,