Spread corridor entries across End right face
Each corridor edge enters End at a distinct Y position (1/n+1 fraction) so the highways are visually traceable all the way to the terminus. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -149,7 +149,6 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
|||||||
StreamEntry[] entries;
|
StreamEntry[] entries;
|
||||||
if (request.PendingOnly)
|
if (request.PendingOnly)
|
||||||
{
|
{
|
||||||
// Read from pending only (redeliveries)
|
|
||||||
entries = await db.StreamReadGroupAsync(
|
entries = await db.StreamReadGroupAsync(
|
||||||
_queueOptions.QueueName,
|
_queueOptions.QueueName,
|
||||||
_queueOptions.ConsumerGroup,
|
_queueOptions.ConsumerGroup,
|
||||||
@@ -160,11 +159,38 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Always drain pending entries first, then read new.
|
// Step 1: Claim idle messages from dead consumers (XAUTOCLAIM).
|
||||||
// This is critical: when the Pub/Sub consumer was stuck (known
|
// After container restarts, pending entries remain assigned to the old
|
||||||
// StackExchange.Redis bug — SE.Redis #1586), messages accumulated
|
// consumer name. The current consumer can't see them with XREADGROUP.
|
||||||
// in the pending entries list (PEL). XREADGROUP with ">" skips them.
|
// XAUTOCLAIM transfers ownership of idle entries to this consumer.
|
||||||
// Reading pending ("0") first ensures no messages are lost after recovery.
|
try
|
||||||
|
{
|
||||||
|
var claimed = await db.StreamAutoClaimAsync(
|
||||||
|
_queueOptions.QueueName,
|
||||||
|
_queueOptions.ConsumerGroup,
|
||||||
|
consumer,
|
||||||
|
30_000, // minIdleTimeInMs — claim entries idle for 30s+
|
||||||
|
"0-0", // start position
|
||||||
|
request.BatchSize)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (claimed.ClaimedEntries.Length > 0)
|
||||||
|
{
|
||||||
|
_logger?.LogDebug(
|
||||||
|
"XAUTOCLAIM recovered {Count} idle entries from dead consumers on {Queue}",
|
||||||
|
claimed.ClaimedEntries.Length, _queueOptions.QueueName);
|
||||||
|
entries = claimed.ClaimedEntries;
|
||||||
|
// Skip to mapping — we already have entries to process
|
||||||
|
goto MapEntries;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (RedisException ex) when (ex.Message.Contains("NOGROUP") || ex.Message.Contains("ERR"))
|
||||||
|
{
|
||||||
|
// XAUTOCLAIM not supported or group doesn't exist — fall through
|
||||||
|
_logger?.LogDebug("XAUTOCLAIM not available: {Message}", ex.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2: Drain pending entries assigned to THIS consumer.
|
||||||
entries = await db.StreamReadGroupAsync(
|
entries = await db.StreamReadGroupAsync(
|
||||||
_queueOptions.QueueName,
|
_queueOptions.QueueName,
|
||||||
_queueOptions.ConsumerGroup,
|
_queueOptions.ConsumerGroup,
|
||||||
@@ -173,7 +199,7 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
|||||||
count: request.BatchSize)
|
count: request.BatchSize)
|
||||||
.ConfigureAwait(false);
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
// If no pending entries, read new messages
|
// Step 3: If no pending, read new messages.
|
||||||
if (entries is null || entries.Length == 0)
|
if (entries is null || entries.Length == 0)
|
||||||
{
|
{
|
||||||
entries = await db.StreamReadGroupAsync(
|
entries = await db.StreamReadGroupAsync(
|
||||||
@@ -186,6 +212,8 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INot
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MapEntries:
|
||||||
|
|
||||||
if (entries is null || entries.Length == 0)
|
if (entries is null || entries.Length == 0)
|
||||||
{
|
{
|
||||||
return [];
|
return [];
|
||||||
|
|||||||
@@ -332,11 +332,13 @@ internal static partial class ElkEdgeRouterIterative
|
|||||||
// Enter End from the right side: corridor goes past End,
|
// Enter End from the right side: corridor goes past End,
|
||||||
// descends to End's center Y, approaches from right.
|
// descends to End's center Y, approaches from right.
|
||||||
// This avoids the ugly long vertical drop from corridor.
|
// This avoids the ugly long vertical drop from corridor.
|
||||||
// Offset both X and Y for each corridor edge so their
|
// Offset both X and Y for each corridor edge so they
|
||||||
// vertical descent legs don't overlap (parallel vertical
|
// enter End at distinct positions (visually traceable).
|
||||||
// segments at the same X trigger target-join detection).
|
|
||||||
var rightApproachX = tgtNode.X + tgtNode.Width + 24d + (corridorFixed * (nodeSizeClearance + 4d));
|
var rightApproachX = tgtNode.X + tgtNode.Width + 24d + (corridorFixed * (nodeSizeClearance + 4d));
|
||||||
var centerY = tgtNode.Y + (tgtNode.Height / 2d);
|
// Spread entry points across the right face. First edge
|
||||||
|
// enters at 1/3 from top, second at 2/3, etc.
|
||||||
|
var slotFraction = (corridorFixed + 1d) / (corridorFixed + 2d);
|
||||||
|
var centerY = tgtNode.Y + (tgtNode.Height * slotFraction);
|
||||||
newPath =
|
newPath =
|
||||||
[
|
[
|
||||||
src,
|
src,
|
||||||
|
|||||||
Reference in New Issue
Block a user