diff --git a/devops/compose/docker-compose.stella-ops.yml b/devops/compose/docker-compose.stella-ops.yml
index 41bc7f11f..baa792a05 100644
--- a/devops/compose/docker-compose.stella-ops.yml
+++ b/devops/compose/docker-compose.stella-ops.yml
@@ -61,6 +61,7 @@ x-router-microservice-defaults: &router-microservice-defaults
Router__Messaging__HeartbeatInterval: "10s"
Router__Messaging__valkey__ConnectionString: "cache.stella-ops.local:6379"
Router__Messaging__valkey__Database: "0"
+ Router__Messaging__valkey__QueueWaitTimeoutSeconds: "${VALKEY_QUEUE_WAIT_TIMEOUT:-0}"
# Identity envelope verification (signed by gateway, verified by services)
Router__IdentityEnvelopeSigningKey: "${STELLAOPS_IDENTITY_ENVELOPE_SIGNING_KEY}"
@@ -339,6 +340,7 @@ services:
Gateway__Transports__Messaging__Database: "0"
Gateway__Transports__Messaging__valkey__ConnectionString: "cache.stella-ops.local:6379"
Gateway__Transports__Messaging__valkey__Database: "0"
+ Gateway__Transports__Messaging__valkey__QueueWaitTimeoutSeconds: "${VALKEY_QUEUE_WAIT_TIMEOUT:-0}"
Gateway__Transports__Messaging__RequestQueueTemplate: "router:requests:{service}"
Gateway__Transports__Messaging__ResponseQueueName: "router:responses"
Gateway__Transports__Messaging__ConsumerGroup: "router-gateway"
diff --git a/docs/implplan/SPRINT_20260310_019_DevOps_container_cpu_optimization.md b/docs/implplan/SPRINT_20260310_019_DevOps_container_cpu_optimization.md
index 3dcfc7a0e..34a1101b9 100644
--- a/docs/implplan/SPRINT_20260310_019_DevOps_container_cpu_optimization.md
+++ b/docs/implplan/SPRINT_20260310_019_DevOps_container_cpu_optimization.md
@@ -111,6 +111,24 @@ Task description:
Completion criteria:
- [x] Verified ValkeyMessageQueue already uses push-first pattern
+### WS-7 — Eliminate Valkey Queue Polling Fallback
+Status: DONE
+Dependency: none
+Owners: Developer
+Task description:
+- Remove hardcoded 1s PollingFallback and 1-5s notifiable timeout constants from QueueWaitExtensions.
+- Add configurable `QueueWaitTimeoutSeconds` to ValkeyTransportOptions (default: 0 = pure event-driven).
+- ValkeyMessageQueue.WaitForNotificationAsync uses configured timeout instead of caller-provided value.
+- Compose env var `VALKEY_QUEUE_WAIT_TIMEOUT` (default 0) controls the setting for all services.
+
+Completion criteria:
+- [x] QueueWaitTimeoutSeconds added to ValkeyTransportOptions with default 0
+- [x] ValkeyMessageQueue uses configured timeout (0 = Timeout.InfiniteTimeSpan)
+- [x] Hardcoded PollingFallback/MinimumNotifiableTimeout/MaximumNotifiableTimeout removed from QueueWaitExtensions
+- [x] Compose YAML updated for microservice defaults and gateway
+- [x] All 252 gateway tests pass
+- [x] Compose validates clean (45 services have the setting)
+
### WS-6 — GC Configuration
Status: DONE
Dependency: none
@@ -129,12 +147,14 @@ Completion criteria:
| Date (UTC) | Update | Owner |
| --- | --- | --- |
| 2026-03-10 | Sprint created. All workstreams completed. All 3 C# projects build clean. Compose validates clean. | Developer |
+| 2026-03-10 | WS-7 added: eliminated Valkey queue polling fallback. Default is now pure event-driven (QueueWaitTimeoutSeconds=0). | Developer |
## Decisions & Risks
- Resource limits are dev/QA defaults; production deployments should tune per hardware.
- GCDynamicAdaptationMode=1 requires .NET 8+; all services use .NET 8/9.
- Healthcheck interval override via HEALTHCHECK_INTERVAL env var for operator flexibility.
- Valkey pub/sub notifications are fire-and-forget; fallback timers ensure correctness if missed.
+- QueueWaitTimeoutSeconds defaults to 0 (pure event-driven). Set VALKEY_QUEUE_WAIT_TIMEOUT=5 to restore a 5s safety-net poll if pub/sub proves unreliable.
## Next Checkpoints
- Rebuild affected images (platform, jobengine, graph-indexer) after C# changes merge.
diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs
index 0d467e472..d843f9539 100644
--- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs
+++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/Options/ValkeyTransportOptions.cs
@@ -37,4 +37,20 @@ public class ValkeyTransportOptions
/// Gets or sets the prefix for idempotency keys.
///
public string IdempotencyKeyPrefix { get; set; } = "msgq:idem:";
+
+ ///
+ /// Gets or sets the queue wait timeout in seconds.
+ /// This is the maximum time a consumer loop will block between XREADGROUP polls
+ /// when no Valkey pub/sub notification arrives.
+ ///
+ /// Default: 0 (pure event-driven mode — consumers only wake on pub/sub
+ /// notifications, no fallback polling). Set to a positive value (e.g. 5)
+ /// to add a safety-net poll interval for environments where pub/sub notifications
+ /// may be unreliable.
+ ///
+ ///
+ /// Configurable via compose env var VALKEY_QUEUE_WAIT_TIMEOUT.
+ ///
+ ///
+ public int QueueWaitTimeoutSeconds { get; set; } = 0;
}
diff --git a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs
index e453b9484..047252be3 100644
--- a/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs
+++ b/src/Router/__Libraries/StellaOps.Messaging.Transport.Valkey/ValkeyMessageQueue.cs
@@ -414,10 +414,18 @@ public sealed class ValkeyMessageQueue : IMessageQueue, INot
{
await EnsureNotificationSubscriptionAsync(cancellationToken).ConfigureAwait(false);
+ // Use the configured timeout from ValkeyTransportOptions if set,
+ // overriding the caller-provided timeout.
+ // QueueWaitTimeoutSeconds=0 means pure event-driven (no fallback poll).
+ var configuredSeconds = _transportOptions.QueueWaitTimeoutSeconds;
+ var effectiveTimeout = configuredSeconds <= 0
+ ? Timeout.InfiniteTimeSpan
+ : TimeSpan.FromSeconds(configuredSeconds);
+
// Wait for a pub/sub signal or timeout (fallback for missed notifications).
try
{
- await _notificationSignal.WaitAsync(timeout, cancellationToken).ConfigureAwait(false);
+ await _notificationSignal.WaitAsync(effectiveTimeout, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
diff --git a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Extensions/QueueWaitExtensions.cs b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Extensions/QueueWaitExtensions.cs
index 8815ea905..7ee1f5a31 100644
--- a/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Extensions/QueueWaitExtensions.cs
+++ b/src/Router/__Libraries/StellaOps.Router.Transport.Messaging/Extensions/QueueWaitExtensions.cs
@@ -5,41 +5,20 @@ namespace StellaOps.Router.Transport.Messaging.Extensions;
///
/// Extension to efficiently await new messages on a queue.
/// Uses Pub/Sub wakeup when available,
-/// falls back to a short delay for non-notifiable queue implementations.
+/// falls back to a delay for non-notifiable queue implementations.
///
+///
+/// For Valkey queues the actual timeout is controlled by
+/// ValkeyTransportOptions.QueueWaitTimeoutSeconds (default 5s, 0 = pure event-driven).
+/// The timeout parameter passed here is used only as a hint for the queue implementation.
+///
internal static class QueueWaitExtensions
{
- private static readonly TimeSpan MinimumNotifiableTimeout = TimeSpan.FromSeconds(1);
- private static readonly TimeSpan MaximumNotifiableTimeout = TimeSpan.FromSeconds(5);
-
///
/// Fallback delay for queues that do not implement .
+ /// Only used by non-Valkey transports (e.g., in-memory test queues).
///
- private static readonly TimeSpan PollingFallback = TimeSpan.FromSeconds(1);
-
- ///
- /// Resolves the safety-net timeout for notifiable queues.
- /// The queue stays push-first; this timeout only covers missed notifications.
- ///
- internal static TimeSpan ResolveNotifiableTimeout(TimeSpan heartbeatInterval)
- {
- if (heartbeatInterval <= TimeSpan.Zero)
- {
- return MaximumNotifiableTimeout;
- }
-
- var derivedSeconds = Math.Max(
- MinimumNotifiableTimeout.TotalSeconds,
- Math.Floor(heartbeatInterval.TotalSeconds / 3d));
-
- var derivedTimeout = TimeSpan.FromSeconds(derivedSeconds);
- if (derivedTimeout > MaximumNotifiableTimeout)
- {
- return MaximumNotifiableTimeout;
- }
-
- return derivedTimeout;
- }
+ private static readonly TimeSpan NonNotifiableFallback = TimeSpan.FromSeconds(5);
///
/// Waits for new messages to become available on the queue.
@@ -52,10 +31,12 @@ internal static class QueueWaitExtensions
{
if (queue is INotifiableQueue notifiable)
{
- return notifiable.WaitForNotificationAsync(ResolveNotifiableTimeout(heartbeatInterval), cancellationToken);
+ // The timeout value is a hint; ValkeyMessageQueue overrides it
+ // with its own ValkeyTransportOptions.QueueWaitTimeoutSeconds.
+ return notifiable.WaitForNotificationAsync(heartbeatInterval, cancellationToken);
}
- return Task.Delay(PollingFallback, cancellationToken);
+ return Task.Delay(NonNotifiableFallback, cancellationToken);
}
public static Task WaitForMessagesAsync(
diff --git a/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Transport/QueueWaitExtensionsTests.cs b/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Transport/QueueWaitExtensionsTests.cs
index 8a2a1982e..796f0a0fe 100644
--- a/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Transport/QueueWaitExtensionsTests.cs
+++ b/src/Router/__Tests/StellaOps.Gateway.WebService.Tests/Transport/QueueWaitExtensionsTests.cs
@@ -7,24 +7,24 @@ namespace StellaOps.Gateway.WebService.Tests.Transport;
public sealed class QueueWaitExtensionsTests
{
[Fact]
- public async Task WaitForMessagesAsync_UsesHeartbeatDerivedTimeout_ForNotifiableQueues()
+ public async Task WaitForMessagesAsync_DelegatesToNotifiableQueue()
{
var queue = new RecordingNotifiableQueue();
await queue.WaitForMessagesAsync(TimeSpan.FromSeconds(10), CancellationToken.None);
- queue.LastTimeout.Should().Be(QueueWaitExtensions.ResolveNotifiableTimeout(TimeSpan.FromSeconds(10)));
+ // The heartbeat interval is passed as the timeout hint to the notifiable queue.
+ queue.LastTimeout.Should().Be(TimeSpan.FromSeconds(10));
}
- [Theory]
- [InlineData(10, 3)]
- [InlineData(45, 5)]
- [InlineData(1, 1)]
- public void ResolveNotifiableTimeout_ClampsToExpectedBounds(int heartbeatSeconds, int expectedSeconds)
+ [Fact]
+ public async Task WaitForMessagesAsync_ZeroHeartbeat_PassesZeroToNotifiableQueue()
{
- var timeout = QueueWaitExtensions.ResolveNotifiableTimeout(TimeSpan.FromSeconds(heartbeatSeconds));
+ var queue = new RecordingNotifiableQueue();
- timeout.Should().Be(TimeSpan.FromSeconds(expectedSeconds));
+ await queue.WaitForMessagesAsync(CancellationToken.None);
+
+ queue.LastTimeout.Should().Be(TimeSpan.Zero);
}
private sealed class RecordingNotifiableQueue : IMessageQueue, INotifiableQueue