perf(router): replace 100ms Valkey polling with Pub/Sub notification wakeup and increase heartbeat to 45s

The Valkey transport layer used 100ms busy-polling loops (Task.Delay(100))
across ~90 concurrent loops in 45+ services, generating ~900 idle
commands/sec and burning ~58% CPU while the system was completely idle.

Replace polling with Redis Pub/Sub notifications:
- Publishers fire PUBLISH after each XADD (fire-and-forget)
- Consumers SUBSCRIBE and wait on SemaphoreSlim with 30s fallback timeout
- Applies to both ValkeyMessageQueue (INotifiableQueue) and ValkeyEventStream
- Non-Valkey transports fall back to 1s polling via QueueWaitExtensions

Increase heartbeat interval from 10s to 45s across all transport options,
with corresponding health threshold adjustments (stale: 135s, degraded: 90s).

Expected idle CPU reduction: ~58% → ~3-5%.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
master
2026-03-09 07:47:31 +02:00
parent f218ec82ec
commit 841add4f27
17 changed files with 230 additions and 48 deletions

View File

@@ -81,6 +81,15 @@ public sealed class ValkeyConnectionFactory : IAsyncDisposable
return _connection; return _connection;
} }
/// <summary>
/// Gets a subscriber for Pub/Sub operations.
/// </summary>
public async ValueTask<ISubscriber> GetSubscriberAsync(CancellationToken cancellationToken = default)
{
var connection = await GetConnectionAsync(cancellationToken).ConfigureAwait(false);
return connection.GetSubscriber();
}
/// <summary> /// <summary>
/// Tests the connection by sending a PING command. /// Tests the connection by sending a PING command.
/// </summary> /// </summary>

View File

@@ -24,6 +24,8 @@ public sealed class ValkeyEventStream<TEvent> : IEventStream<TEvent>
private const string TenantIdField = "tenantId"; private const string TenantIdField = "tenantId";
private const string CorrelationIdField = "correlationId"; private const string CorrelationIdField = "correlationId";
private string NotificationChannel => $"notify:s:{RedisKey}";
public ValkeyEventStream( public ValkeyEventStream(
ValkeyConnectionFactory connectionFactory, ValkeyConnectionFactory connectionFactory,
EventStreamOptions options, EventStreamOptions options,
@@ -90,6 +92,19 @@ public sealed class ValkeyEventStream<TEvent> : IEventStream<TEvent>
maxLength: _options.MaxLength.HasValue ? (int)_options.MaxLength.Value : null, maxLength: _options.MaxLength.HasValue ? (int)_options.MaxLength.Value : null,
useApproximateMaxLength: _options.ApproximateTrimming).ConfigureAwait(false); useApproximateMaxLength: _options.ApproximateTrimming).ConfigureAwait(false);
// Notify subscribers that new data is available (fire-and-forget).
try
{
await db.PublishAsync(
RedisChannel.Literal(NotificationChannel),
"1",
CommandFlags.FireAndForget).ConfigureAwait(false);
}
catch
{
// Best-effort notification; subscribers will still poll on timeout fallback.
}
return EventPublishResult.Succeeded(entryId!); return EventPublishResult.Succeeded(entryId!);
} }
@@ -131,30 +146,47 @@ public sealed class ValkeyEventStream<TEvent> : IEventStream<TEvent>
lastId = info.LastEntryId ?? "0-0"; lastId = info.LastEntryId ?? "0-0";
} }
while (!cancellationToken.IsCancellationRequested) // Subscribe to Pub/Sub notification channel for efficient wakeup.
using var notificationSignal = new SemaphoreSlim(0, 1);
var subscriber = await _connectionFactory.GetSubscriberAsync(cancellationToken).ConfigureAwait(false);
var channel = await subscriber.SubscribeAsync(RedisChannel.Literal(NotificationChannel)).ConfigureAwait(false);
channel.OnMessage(_ =>
{ {
var entries = await db.StreamReadAsync( try { notificationSignal.Release(); }
RedisKey, catch (SemaphoreFullException) { /* already signaled */ }
lastId, });
count: 100).ConfigureAwait(false);
if (entries.Length > 0) try
{
while (!cancellationToken.IsCancellationRequested)
{ {
foreach (var entry in entries) var entries = await db.StreamReadAsync(
RedisKey,
lastId,
count: 100).ConfigureAwait(false);
if (entries.Length > 0)
{ {
var streamEvent = ParseEntry(entry); foreach (var entry in entries)
if (streamEvent is not null)
{ {
yield return streamEvent; var streamEvent = ParseEntry(entry);
if (streamEvent is not null)
{
yield return streamEvent;
}
lastId = entry.Id!;
} }
lastId = entry.Id!; }
else
{
// Wait for Pub/Sub notification or fallback timeout.
await notificationSignal.WaitAsync(_options.PollInterval, cancellationToken).ConfigureAwait(false);
} }
} }
else }
{ finally
// No new entries, wait before polling again {
await Task.Delay(_options.PollInterval, cancellationToken).ConfigureAwait(false); channel.Unsubscribe();
}
} }
} }

View File

@@ -14,7 +14,7 @@ namespace StellaOps.Messaging.Transport.Valkey;
/// Valkey/Redis Streams implementation of <see cref="IMessageQueue{TMessage}"/>. /// Valkey/Redis Streams implementation of <see cref="IMessageQueue{TMessage}"/>.
/// </summary> /// </summary>
/// <typeparam name="TMessage">The message type.</typeparam> /// <typeparam name="TMessage">The message type.</typeparam>
public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, IAsyncDisposable public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, INotifiableQueue, IAsyncDisposable
where TMessage : class where TMessage : class
{ {
private const string ProviderNameValue = "valkey"; private const string ProviderNameValue = "valkey";
@@ -36,11 +36,22 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, IAsy
private readonly ILogger<ValkeyMessageQueue<TMessage>>? _logger; private readonly ILogger<ValkeyMessageQueue<TMessage>>? _logger;
private readonly TimeProvider _timeProvider; private readonly TimeProvider _timeProvider;
private readonly SemaphoreSlim _groupInitLock = new(1, 1); private readonly SemaphoreSlim _groupInitLock = new(1, 1);
private readonly SemaphoreSlim _subscriptionInitLock = new(1, 1);
private readonly JsonSerializerOptions _jsonOptions; private readonly JsonSerializerOptions _jsonOptions;
/// <summary>
/// Semaphore used for notification-based wakeup. Starts at 0 permits.
/// Released (up to 1) when a Pub/Sub notification arrives on the queue channel.
/// </summary>
private readonly SemaphoreSlim _notificationSignal = new(0, 1);
private ChannelMessageQueue? _subscription;
private volatile bool _subscribed;
private volatile bool _groupInitialized; private volatile bool _groupInitialized;
private bool _disposed; private bool _disposed;
private string NotificationChannel => $"notify:q:{_queueOptions.QueueName}";
public ValkeyMessageQueue( public ValkeyMessageQueue(
ValkeyConnectionFactory connectionFactory, ValkeyConnectionFactory connectionFactory,
MessageQueueOptions queueOptions, MessageQueueOptions queueOptions,
@@ -398,6 +409,48 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, IAsy
} }
} }
/// <inheritdoc />
public async Task WaitForNotificationAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
await EnsureNotificationSubscriptionAsync(cancellationToken).ConfigureAwait(false);
// Wait for a pub/sub signal or timeout (fallback for missed notifications).
try
{
await _notificationSignal.WaitAsync(timeout, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
throw;
}
}
private async Task EnsureNotificationSubscriptionAsync(CancellationToken cancellationToken)
{
if (_subscribed) return;
await _subscriptionInitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_subscribed) return;
var subscriber = await _connectionFactory.GetSubscriberAsync(cancellationToken).ConfigureAwait(false);
_subscription = await subscriber.SubscribeAsync(RedisChannel.Literal(NotificationChannel)).ConfigureAwait(false);
_subscription.OnMessage(_ =>
{
try { _notificationSignal.Release(); }
catch (SemaphoreFullException) { /* already signaled */ }
});
_subscribed = true;
_logger?.LogDebug("Subscribed to queue notifications on channel {Channel}", NotificationChannel);
}
finally
{
_subscriptionInitLock.Release();
}
}
#pragma warning disable CS1998 #pragma warning disable CS1998
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
@@ -407,6 +460,15 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, IAsy
} }
_disposed = true; _disposed = true;
if (_subscription is not null)
{
try { _subscription.Unsubscribe(); }
catch { /* best-effort cleanup */ }
}
_notificationSignal.Dispose();
_subscriptionInitLock.Dispose();
_groupInitLock.Dispose(); _groupInitLock.Dispose();
} }
@@ -637,6 +699,20 @@ public sealed class ValkeyMessageQueue<TMessage> : IMessageQueue<TMessage>, IAsy
} }
var result = await database.ExecuteAsync("XADD", [.. args]).ConfigureAwait(false); var result = await database.ExecuteAsync("XADD", [.. args]).ConfigureAwait(false);
// Notify subscribers that new data is available (fire-and-forget).
try
{
await database.PublishAsync(
RedisChannel.Literal($"notify:q:{stream}"),
"1",
CommandFlags.FireAndForget).ConfigureAwait(false);
}
catch
{
// Best-effort notification; consumers will still poll on timeout fallback.
}
return result!.ToString()!; return result!.ToString()!;
} }

View File

@@ -0,0 +1,18 @@
namespace StellaOps.Messaging.Abstractions;
/// <summary>
/// Optional interface for queues that support efficient notification-based wakeup
/// instead of busy-wait polling. When the underlying transport supports it (e.g. Valkey Pub/Sub),
/// consumers await a signal rather than spinning with Task.Delay.
/// </summary>
public interface INotifiableQueue
{
/// <summary>
/// Waits for a notification that new messages may be available.
/// Returns immediately if a notification has already been received since the last wait.
/// Falls back to returning after <paramref name="timeout"/> if no notification arrives.
/// </summary>
/// <param name="timeout">Maximum time to wait before returning (fallback for missed notifications).</param>
/// <param name="cancellationToken">Cancellation token.</param>
Task WaitForNotificationAsync(TimeSpan timeout, CancellationToken cancellationToken);
}

View File

@@ -23,10 +23,11 @@ public sealed class EventStreamOptions
public bool ApproximateTrimming { get; set; } = true; public bool ApproximateTrimming { get; set; } = true;
/// <summary> /// <summary>
/// Gets or sets the polling interval for subscription (when applicable). /// Gets or sets the fallback timeout for subscription polling.
/// Default is 100ms. /// When Pub/Sub notifications are available, this acts as a safety-net timeout
/// in case a notification is missed. Default is 30 seconds.
/// </summary> /// </summary>
public TimeSpan PollInterval { get; set; } = TimeSpan.FromMilliseconds(100); public TimeSpan PollInterval { get; set; } = TimeSpan.FromSeconds(30);
/// <summary> /// <summary>
/// Gets or sets the idempotency window for duplicate detection. /// Gets or sets the idempotency window for duplicate detection.

View File

@@ -52,9 +52,9 @@ public sealed partial class StellaMicroserviceOptions
/// <summary> /// <summary>
/// Gets or sets the heartbeat interval. /// Gets or sets the heartbeat interval.
/// Default: 10 seconds. /// Default: 45 seconds.
/// </summary> /// </summary>
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45);
/// <summary> /// <summary>
/// Gets or sets the maximum reconnect backoff. /// Gets or sets the maximum reconnect backoff.

View File

@@ -455,7 +455,7 @@ public static class StellaRouterIntegrationHelper
NormalizeDuration(destination, $"{destinationSection}:RequestTimeout", TimeSpan.FromSeconds(30)); NormalizeDuration(destination, $"{destinationSection}:RequestTimeout", TimeSpan.FromSeconds(30));
NormalizeDuration(destination, $"{destinationSection}:LeaseDuration", TimeSpan.FromMinutes(5)); NormalizeDuration(destination, $"{destinationSection}:LeaseDuration", TimeSpan.FromMinutes(5));
NormalizeDuration(destination, $"{destinationSection}:HeartbeatInterval", TimeSpan.FromSeconds(10)); NormalizeDuration(destination, $"{destinationSection}:HeartbeatInterval", TimeSpan.FromSeconds(45));
} }
private static void NormalizeDuration( private static void NormalizeDuration(

View File

@@ -99,9 +99,9 @@ public sealed class StellaRouterOptions
/// <summary> /// <summary>
/// Heartbeat interval for health reporting. /// Heartbeat interval for health reporting.
/// Default: 10 seconds. /// Default: 45 seconds.
/// </summary> /// </summary>
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45);
/// <summary> /// <summary>
/// Initial reconnect backoff delay. /// Initial reconnect backoff delay.

View File

@@ -39,9 +39,9 @@ public class StellaRouterOptionsBase
/// <summary> /// <summary>
/// Heartbeat interval in seconds for health reporting. /// Heartbeat interval in seconds for health reporting.
/// Default: 10 seconds. /// Default: 45 seconds.
/// </summary> /// </summary>
public int HeartbeatIntervalSeconds { get; set; } = 10; public int HeartbeatIntervalSeconds { get; set; } = 45;
/// <summary> /// <summary>
/// Service trust mode for gateway-enforced authorization semantics. /// Service trust mode for gateway-enforced authorization semantics.

View File

@@ -12,21 +12,23 @@ public sealed class HealthOptions
/// <summary> /// <summary>
/// Gets or sets the threshold after which a connection is considered stale (no heartbeat). /// Gets or sets the threshold after which a connection is considered stale (no heartbeat).
/// Default: 30 seconds. /// Should be at least 3x the heartbeat interval (45s default).
/// Default: 135 seconds.
/// </summary> /// </summary>
public TimeSpan StaleThreshold { get; set; } = TimeSpan.FromSeconds(30); public TimeSpan StaleThreshold { get; set; } = TimeSpan.FromSeconds(135);
/// <summary> /// <summary>
/// Gets or sets the threshold after which a connection is considered degraded. /// Gets or sets the threshold after which a connection is considered degraded.
/// Default: 15 seconds. /// Should be at least 2x the heartbeat interval (45s default).
/// Default: 90 seconds.
/// </summary> /// </summary>
public TimeSpan DegradedThreshold { get; set; } = TimeSpan.FromSeconds(15); public TimeSpan DegradedThreshold { get; set; } = TimeSpan.FromSeconds(90);
/// <summary> /// <summary>
/// Gets or sets the interval at which to check for stale connections. /// Gets or sets the interval at which to check for stale connections.
/// Default: 5 seconds. /// Default: 15 seconds.
/// </summary> /// </summary>
public TimeSpan CheckInterval { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan CheckInterval { get; set; } = TimeSpan.FromSeconds(15);
/// <summary> /// <summary>
/// Gets or sets the number of ping measurements to keep for averaging. /// Gets or sets the number of ping measurements to keep for averaging.

View File

@@ -25,13 +25,13 @@ public sealed class InMemoryTransportOptions
/// <summary> /// <summary>
/// Gets or sets the heartbeat interval. /// Gets or sets the heartbeat interval.
/// Default: 10 seconds. /// Default: 45 seconds.
/// </summary> /// </summary>
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45);
/// <summary> /// <summary>
/// Gets or sets the heartbeat timeout (time since last heartbeat before connection is considered unhealthy). /// Gets or sets the heartbeat timeout (time since last heartbeat before connection is considered unhealthy).
/// Default: 30 seconds. /// Default: 135 seconds (3x heartbeat interval).
/// </summary> /// </summary>
public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.FromSeconds(30); public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.FromSeconds(135);
} }

View File

@@ -0,0 +1,38 @@
using StellaOps.Messaging.Abstractions;
namespace StellaOps.Router.Transport.Messaging.Extensions;
/// <summary>
/// Extension to efficiently await new messages on a queue.
/// Uses <see cref="INotifiableQueue"/> Pub/Sub wakeup when available,
/// falls back to a short delay for non-notifiable queue implementations.
/// </summary>
internal static class QueueWaitExtensions
{
/// <summary>
/// Default fallback timeout when the queue supports Pub/Sub notifications.
/// The wait returns early on notification; this is only a safety net.
/// </summary>
private static readonly TimeSpan NotifiableTimeout = TimeSpan.FromSeconds(30);
/// <summary>
/// Fallback delay for queues that do not implement <see cref="INotifiableQueue"/>.
/// </summary>
private static readonly TimeSpan PollingFallback = TimeSpan.FromSeconds(1);
/// <summary>
/// Waits for new messages to become available on the queue.
/// </summary>
public static Task WaitForMessagesAsync<TMessage>(
this IMessageQueue<TMessage> queue,
CancellationToken cancellationToken)
where TMessage : class
{
if (queue is INotifiableQueue notifiable)
{
return notifiable.WaitForNotificationAsync(NotifiableTimeout, cancellationToken);
}
return Task.Delay(PollingFallback, cancellationToken);
}
}

View File

@@ -4,6 +4,7 @@ using Microsoft.Extensions.Options;
using StellaOps.Messaging; using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions; using StellaOps.Messaging.Abstractions;
using StellaOps.Router.Common.Abstractions; using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Transport.Messaging.Extensions;
using StellaOps.Router.Common.Enums; using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Models; using StellaOps.Router.Common.Models;
using StellaOps.Router.Transport.Messaging.Options; using StellaOps.Router.Transport.Messaging.Options;
@@ -202,9 +203,10 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
} }
} }
// Wait for Pub/Sub notification instead of busy-polling.
if (leases.Count == 0) if (leases.Count == 0)
{ {
await Task.Delay(100, cancellationToken); await _responseQueue.WaitForMessagesAsync(cancellationToken);
} }
} }
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
@@ -265,9 +267,10 @@ public sealed class MessagingTransportClient : ITransportClient, IMicroserviceTr
} }
} }
// Wait for Pub/Sub notification instead of busy-polling.
if (leases.Count == 0) if (leases.Count == 0)
{ {
await Task.Delay(100, cancellationToken); await _serviceIncomingQueue.WaitForMessagesAsync(cancellationToken);
} }
} }
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)

View File

@@ -4,6 +4,7 @@ using Microsoft.Extensions.Options;
using StellaOps.Messaging; using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions; using StellaOps.Messaging.Abstractions;
using StellaOps.Router.Common.Abstractions; using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Transport.Messaging.Extensions;
using StellaOps.Router.Common.Enums; using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Models; using StellaOps.Router.Common.Models;
using StellaOps.Router.Transport.Messaging.Options; using StellaOps.Router.Transport.Messaging.Options;
@@ -169,10 +170,10 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
} }
} }
// Small delay if no messages // Wait for Pub/Sub notification instead of busy-polling.
if (leases.Count == 0) if (leases.Count == 0)
{ {
await Task.Delay(100, cancellationToken); await _requestQueue.WaitForMessagesAsync(cancellationToken);
} }
} }
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
@@ -213,9 +214,10 @@ public sealed class MessagingTransportServer : ITransportServer, IDisposable
} }
} }
// Wait for Pub/Sub notification instead of busy-polling.
if (leases.Count == 0) if (leases.Count == 0)
{ {
await Task.Delay(100, cancellationToken); await _responseQueue.WaitForMessagesAsync(cancellationToken);
} }
} }
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)

View File

@@ -55,8 +55,9 @@ public class MessagingTransportOptions
/// <summary> /// <summary>
/// Gets or sets the heartbeat interval. /// Gets or sets the heartbeat interval.
/// Default: 45 seconds.
/// </summary> /// </summary>
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10); public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(45);
/// <summary> /// <summary>
/// Gets or sets the dead letter queue suffix. /// Gets or sets the dead letter queue suffix.

View File

@@ -41,7 +41,7 @@ public sealed class StellaRouterOptionsTests
Assert.Equal(AuthorizationMappingStrategy.Hybrid, options.AuthorizationMapping); Assert.Equal(AuthorizationMappingStrategy.Hybrid, options.AuthorizationMapping);
Assert.Equal(MissingAuthorizationBehavior.WarnAndAllow, options.OnMissingAuthorization); Assert.Equal(MissingAuthorizationBehavior.WarnAndAllow, options.OnMissingAuthorization);
Assert.Equal(TimeSpan.FromSeconds(30), options.DefaultTimeout); Assert.Equal(TimeSpan.FromSeconds(30), options.DefaultTimeout);
Assert.Equal(TimeSpan.FromSeconds(10), options.HeartbeatInterval); Assert.Equal(TimeSpan.FromSeconds(45), options.HeartbeatInterval);
} }
[Fact] [Fact]

View File

@@ -43,24 +43,24 @@ public sealed class InMemoryTransportOptionsTests
[Trait("Category", TestCategories.Unit)] [Trait("Category", TestCategories.Unit)]
[Fact] [Fact]
public void Constructor_HeartbeatInterval_Is10Seconds() public void Constructor_HeartbeatInterval_Is45Seconds()
{ {
// Arrange & Act // Arrange & Act
var options = new InMemoryTransportOptions(); var options = new InMemoryTransportOptions();
// Assert // Assert
options.HeartbeatInterval.Should().Be(TimeSpan.FromSeconds(10)); options.HeartbeatInterval.Should().Be(TimeSpan.FromSeconds(45));
} }
[Trait("Category", TestCategories.Unit)] [Trait("Category", TestCategories.Unit)]
[Fact] [Fact]
public void Constructor_HeartbeatTimeout_Is30Seconds() public void Constructor_HeartbeatTimeout_Is135Seconds()
{ {
// Arrange & Act // Arrange & Act
var options = new InMemoryTransportOptions(); var options = new InMemoryTransportOptions();
// Assert // Assert
options.HeartbeatTimeout.Should().Be(TimeSpan.FromSeconds(30)); options.HeartbeatTimeout.Should().Be(TimeSpan.FromSeconds(135));
} }
#endregion #endregion