Reduce idle CPU across 62 containers (phase 1)

- Add resource limits (heavy/medium/light tiers) to all 59 .NET services
- Add .NET GC tuning (server/workstation GC, DATAS, conserve memory)
- Convert FirstSignalSnapshotWriter from 10s polling to Valkey pub/sub
- Convert EnvironmentSettingsRefreshService from 60s polling to Valkey pub/sub
- Consolidate GraphAnalytics dual timers to single timer with idle-skip
- Increase healthcheck interval from 30s to 60s (configurable)
- Reduce debug logging to Information on 4 high-traffic services

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
master
2026-03-10 02:16:19 +02:00
parent c0c0267ac9
commit 166745f9f9
12 changed files with 601 additions and 89 deletions

View File

@@ -8,37 +8,49 @@ namespace StellaOps.Graph.Indexer.Analytics;
public sealed class GraphAnalyticsHostedService : BackgroundService
{
private readonly IGraphAnalyticsPipeline _pipeline;
private readonly IGraphSnapshotProvider _snapshotProvider;
private readonly GraphAnalyticsOptions _options;
private readonly ILogger<GraphAnalyticsHostedService> _logger;
/// <summary>
/// Creates the hosted service. Every dependency is required; a missing dependency
/// (or an options wrapper whose <c>Value</c> is null) raises <see cref="ArgumentNullException"/>.
/// </summary>
/// <param name="pipeline">Analytics pipeline executed by the background loop.</param>
/// <param name="snapshotProvider">Provider queried for pending snapshots.</param>
/// <param name="options">Options wrapper; its <c>Value</c> is captured once here.</param>
/// <param name="logger">Logger for this service.</param>
public GraphAnalyticsHostedService(
    IGraphAnalyticsPipeline pipeline,
    IGraphSnapshotProvider snapshotProvider,
    IOptions<GraphAnalyticsOptions> options,
    ILogger<GraphAnalyticsHostedService> logger)
{
    // Validate in declaration order so the first missing dependency is the one reported.
    ArgumentNullException.ThrowIfNull(pipeline);
    ArgumentNullException.ThrowIfNull(snapshotProvider);
    if (options?.Value is not { } resolvedOptions)
    {
        throw new ArgumentNullException(nameof(options));
    }

    ArgumentNullException.ThrowIfNull(logger);

    _pipeline = pipeline;
    _snapshotProvider = snapshotProvider;
    _options = resolvedOptions;
    _logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var clusteringTimer = new PeriodicTimer(_options.ClusterInterval);
using var centralityTimer = new PeriodicTimer(_options.CentralityInterval);
var interval = _options.ClusterInterval < _options.CentralityInterval
? _options.ClusterInterval
: _options.CentralityInterval;
using var timer = new PeriodicTimer(interval);
while (!stoppingToken.IsCancellationRequested)
{
var clusteringTask = clusteringTimer.WaitForNextTickAsync(stoppingToken).AsTask();
var centralityTask = centralityTimer.WaitForNextTickAsync(stoppingToken).AsTask();
var completed = await Task.WhenAny(clusteringTask, centralityTask).ConfigureAwait(false);
if (completed.IsCanceled || stoppingToken.IsCancellationRequested)
if (!await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
{
break;
}
try
{
if (_options.SkipWhenIdle)
{
var pending = await _snapshotProvider.GetPendingSnapshotsAsync(stoppingToken).ConfigureAwait(false);
if (pending.Count == 0)
{
_logger.LogDebug("graph-indexer: skipping analytics pipeline, no pending snapshots");
continue;
}
}
await _pipeline.RunAsync(new GraphAnalyticsRunContext(ForceBackfill: false), stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException)

View File

@@ -28,4 +28,9 @@ public sealed class GraphAnalyticsOptions
/// Whether to also write cluster ids onto graph node documents (alongside overlays).
/// </summary>
public bool WriteClusterAssignmentsToNodes { get; set; } = true;
/// <summary>
/// When true, skips the analytics pipeline if no pending snapshots exist.
/// Defaults to <c>true</c>.
/// </summary>
public bool SkipWhenIdle { get; set; } = true;
}

View File

@@ -1,6 +1,9 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.JobEngine.Core.Domain.Events;
using StellaOps.JobEngine.Infrastructure.Services;
using StellaOps.Messaging.Transport.Valkey;
using StackExchange.Redis;
namespace StellaOps.JobEngine.Infrastructure.Events;
@@ -14,19 +17,22 @@ public sealed class JobEngineEventPublisher : IEventPublisher
private readonly IEventSigner? _eventSigner;
private readonly EventPublishOptions _options;
private readonly ILogger<JobEngineEventPublisher> _logger;
private readonly IServiceProvider? _serviceProvider;
public JobEngineEventPublisher(
IIdempotencyStore idempotencyStore,
INotifierBus notifierBus,
IOptions<EventPublishOptions> options,
ILogger<JobEngineEventPublisher> logger,
IEventSigner? eventSigner = null)
IEventSigner? eventSigner = null,
IServiceProvider? serviceProvider = null)
{
_idempotencyStore = idempotencyStore;
_notifierBus = notifierBus;
_eventSigner = eventSigner;
_options = options.Value;
_logger = logger;
_serviceProvider = serviceProvider;
}
public async Task<bool> PublishAsync(EventEnvelope envelope, CancellationToken cancellationToken = default)
@@ -48,6 +54,14 @@ public sealed class JobEngineEventPublisher : IEventPublisher
await PublishWithRetryAsync(channel, message, cancellationToken);
// Fire Valkey notification for job-lifecycle events to wake
// FirstSignalSnapshotWriter immediately instead of waiting for
// its fallback poll interval.
if (channel == "orch.jobs")
{
await TryNotifyFirstSignalDirtyAsync().ConfigureAwait(false);
}
JobEngineMetrics.EventPublished(envelope.TenantId, envelope.EventType.ToEventTypeName());
_logger.LogInformation(
@@ -206,6 +220,40 @@ public sealed class JobEngineEventPublisher : IEventPublisher
System.Net.Http.HttpRequestException or
System.IO.IOException;
}
/// <summary>
/// Best-effort wake-up for <see cref="FirstSignalSnapshotWriter"/>: publishes a marker
/// message on its Valkey pub/sub channel using fire-and-forget command flags.
/// Does nothing when no service provider was injected or when no
/// <see cref="ValkeyConnectionFactory"/> is registered. Any exception is logged at
/// Debug level and swallowed, so this call can never fail the event publish.
/// </summary>
private async Task TryNotifyFirstSignalDirtyAsync()
{
    try
    {
        // The provider and the factory are both optional; without either there is nothing to notify.
        if (_serviceProvider?.GetService(typeof(ValkeyConnectionFactory)) is not ValkeyConnectionFactory valkeyFactory)
        {
            return;
        }

        var publisher = await valkeyFactory.GetSubscriberAsync().ConfigureAwait(false);
        var wakeChannel = RedisChannel.Literal(FirstSignalSnapshotWriter.NotificationChannel);

        await publisher
            .PublishAsync(wakeChannel, "1", CommandFlags.FireAndForget)
            .ConfigureAwait(false);
    }
    catch (Exception notifyError)
    {
        _logger.LogDebug(
            notifyError,
            "Failed to publish first-signal dirty notification (fire-and-forget); snapshot writer will use fallback timer.");
    }
}
}
/// <summary>

View File

@@ -28,6 +28,7 @@ public sealed class FirstSignalSnapshotWriterOptions
public bool Enabled { get; set; }
public string? TenantId { get; set; }
public int PollIntervalSeconds { get; set; } = 10;
public int FallbackPollIntervalSeconds { get; set; } = 60;
public int MaxRunsPerTick { get; set; } = 50;
public int LookbackMinutes { get; set; } = 60;
}

View File

@@ -7,23 +7,40 @@ using Microsoft.Extensions.Options;
using StellaOps.JobEngine.Core.Domain;
using StellaOps.JobEngine.Infrastructure.Options;
using StellaOps.JobEngine.Infrastructure.Repositories;
using StellaOps.Messaging.Transport.Valkey;
using StackExchange.Redis;
namespace StellaOps.JobEngine.Infrastructure.Services;
public sealed class FirstSignalSnapshotWriter : BackgroundService
{
/// <summary>
/// Valkey pub/sub channel used to notify this writer that new job-lifecycle
/// data is available and it should wake up immediately.
/// </summary>
internal const string NotificationChannel = "notify:firstsignal:dirty";
private readonly IServiceScopeFactory _scopeFactory;
private readonly IServiceProvider _serviceProvider;
private readonly FirstSignalSnapshotWriterOptions _options;
private readonly ILogger<FirstSignalSnapshotWriter> _logger;
private readonly TimeProvider _timeProvider;
/// <summary>
/// Semaphore used for notification-based wakeup. Starts at 0 permits.
/// Released (up to 1) when a Valkey pub/sub notification arrives.
/// </summary>
private readonly SemaphoreSlim _notificationSignal = new(0, 1);
public FirstSignalSnapshotWriter(
IServiceScopeFactory scopeFactory,
IServiceProvider serviceProvider,
IOptions<FirstSignalOptions> options,
ILogger<FirstSignalSnapshotWriter> logger,
TimeProvider? timeProvider = null)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.SnapshotWriter;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? TimeProvider.System;
@@ -48,13 +65,35 @@ public sealed class FirstSignalSnapshotWriter : BackgroundService
var tenantId = _options.TenantId.Trim();
var lookback = TimeSpan.FromMinutes(Math.Max(1, _options.LookbackMinutes));
var pollInterval = TimeSpan.FromSeconds(Math.Max(1, _options.PollIntervalSeconds));
var fallbackInterval = TimeSpan.FromSeconds(Math.Max(1, _options.FallbackPollIntervalSeconds));
var maxRuns = Math.Max(1, _options.MaxRunsPerTick);
using var timer = new PeriodicTimer(pollInterval);
// Try to subscribe to Valkey pub/sub for immediate wake-up notifications.
await TrySubscribeToValkeyNotificationsAsync(stoppingToken).ConfigureAwait(false);
while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
using var timer = new PeriodicTimer(fallbackInterval);
while (!stoppingToken.IsCancellationRequested)
{
// Wait for either a Valkey notification or the fallback timer to fire.
try
{
await Task.WhenAny(
_notificationSignal.WaitAsync(stoppingToken),
timer.WaitForNextTickAsync(stoppingToken).AsTask()
).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
// Drain the semaphore to avoid duplicate wakeups from queued notifications.
while (_notificationSignal.Wait(0))
{
// Intentionally empty: draining any extra permits.
}
try
{
await WarmTenantAsync(tenantId, lookback, maxRuns, stoppingToken).ConfigureAwait(false);
@@ -70,6 +109,50 @@ public sealed class FirstSignalSnapshotWriter : BackgroundService
}
}
/// <summary>
/// Best-effort subscription to <see cref="NotificationChannel"/>. Each received message
/// releases the notification semaphore. When no <see cref="ValkeyConnectionFactory"/>
/// is registered, or subscribing throws, a warning is logged and the method returns
/// without a subscription.
/// </summary>
/// <param name="cancellationToken">Passed to the connection factory when obtaining the subscriber.</param>
private async Task TrySubscribeToValkeyNotificationsAsync(CancellationToken cancellationToken)
{
    try
    {
        if (_serviceProvider.GetService<ValkeyConnectionFactory>() is not { } valkeyFactory)
        {
            _logger.LogWarning(
                "ValkeyConnectionFactory not available; FirstSignalSnapshotWriter will use timer-only mode " +
                "(fallback interval {Interval}s).",
                _options.FallbackPollIntervalSeconds);
            return;
        }

        var valkeySubscriber = await valkeyFactory.GetSubscriberAsync(cancellationToken).ConfigureAwait(false);
        var messageQueue = await valkeySubscriber
            .SubscribeAsync(RedisChannel.Literal(NotificationChannel))
            .ConfigureAwait(false);

        messageQueue.OnMessage(_ => SignalWakeUp());

        _logger.LogInformation(
            "FirstSignalSnapshotWriter subscribed to Valkey channel {Channel} for immediate wake-up notifications.",
            NotificationChannel);
    }
    catch (Exception subscribeError)
    {
        _logger.LogWarning(
            subscribeError,
            "Failed to subscribe to Valkey channel {Channel}; FirstSignalSnapshotWriter will use timer-only mode " +
            "(fallback interval {Interval}s).",
            NotificationChannel,
            _options.FallbackPollIntervalSeconds);
    }

    // Releases one permit; a full semaphore means a wake-up is already pending.
    void SignalWakeUp()
    {
        try
        {
            _notificationSignal.Release();
        }
        catch (SemaphoreFullException)
        {
            /* already signaled */
        }
    }
}
private async Task WarmTenantAsync(
string tenantId,
TimeSpan lookback,

View File

@@ -27,6 +27,7 @@
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Infrastructure.Postgres\StellaOps.Infrastructure.Postgres.csproj" />
<ProjectReference Include="..\..\..\Telemetry\StellaOps.Telemetry.Core\StellaOps.Telemetry.Core\StellaOps.Telemetry.Core.csproj"/>
<ProjectReference Include="..\..\..\Router/__Libraries/StellaOps.Messaging\StellaOps.Messaging.csproj" />
<ProjectReference Include="..\..\..\Router/__Libraries/StellaOps.Messaging.Transport.Valkey\StellaOps.Messaging.Transport.Valkey.csproj" />
</ItemGroup>
<ItemGroup>

View File

@@ -1,6 +1,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using StellaOps.Auth.Abstractions;
using StellaOps.Auth.ServerIntegration;
using StellaOps.Infrastructure.Postgres.Migrations;
@@ -255,6 +256,15 @@ builder.Services.AddSingleton<ITranslationBundleProvider>(sp => sp.GetRequiredSe
// Environment settings composer (3-layer merge: env vars -> YAML -> DB)
builder.Services.AddSingleton<EnvironmentSettingsComposer>();
builder.Services.AddSingleton<SetupStateDetector>();
// Valkey/Redis connection for pub/sub notifications (environment settings dirty signal)
var redisCs = builder.Configuration["ConnectionStrings:Redis"];
if (!string.IsNullOrWhiteSpace(redisCs))
{
builder.Services.AddSingleton<IConnectionMultiplexer>(
sp => ConnectionMultiplexer.Connect(redisCs));
}
builder.Services.AddHostedService<EnvironmentSettingsRefreshService>();
builder.Services.AddSingleton<IScoreEvaluationService, ScoreEvaluationService>();

View File

@@ -4,52 +4,130 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using StellaOps.Platform.WebService.Options;
namespace StellaOps.Platform.WebService.Services;
/// <summary>
/// Background service that periodically invalidates the <see cref="IEnvironmentSettingsStore"/>
/// cache so DB-layer changes are picked up without restart.
/// Background service that invalidates the <see cref="IEnvironmentSettingsStore"/>
/// cache when notified via Valkey pub/sub or on a fallback periodic timer (default 300s).
/// </summary>
public sealed class EnvironmentSettingsRefreshService : BackgroundService
{
private readonly IEnvironmentSettingsStore _store;
private readonly IOptionsMonitor<PlatformServiceOptions> _optionsMonitor;
private readonly ILogger<EnvironmentSettingsRefreshService> _logger;
private readonly IConnectionMultiplexer? _connectionMultiplexer;
private readonly SemaphoreSlim _notificationSignal = new(0, 1);
private const int DefaultFallbackSeconds = 300;
private static readonly RedisChannel DirtyChannel =
RedisChannel.Literal("notify:platform:envsettings:dirty");
private ISubscriber? _subscriber;
public EnvironmentSettingsRefreshService(
IEnvironmentSettingsStore store,
IOptionsMonitor<PlatformServiceOptions> optionsMonitor,
ILogger<EnvironmentSettingsRefreshService> logger)
ILogger<EnvironmentSettingsRefreshService> logger,
IConnectionMultiplexer? connectionMultiplexer = null)
{
_store = store;
_optionsMonitor = optionsMonitor;
_logger = logger;
_connectionMultiplexer = connectionMultiplexer;
}
/// <summary>
/// Main loop: invalidates the <see cref="IEnvironmentSettingsStore"/> cache whenever a
/// Valkey dirty notification arrives or the fallback timer ticks, until
/// <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <remarks>
/// The Valkey subscription is best-effort: when no connection multiplexer was injected,
/// or subscribing throws, the loop runs on the fallback timer alone. The fallback interval
/// is read once at startup from <c>Cache.EnvironmentSettingsRefreshSeconds</c>; values
/// of zero or less fall back to <see cref="DefaultFallbackSeconds"/>.
/// </remarks>
/// <param name="stoppingToken">Signals host shutdown; ends the loop.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("EnvironmentSettingsRefreshService started");

    // Subscribe to Valkey dirty notifications (best-effort)
    try
    {
        if (_connectionMultiplexer is not null)
        {
            _subscriber = _connectionMultiplexer.GetSubscriber();
            await _subscriber.SubscribeAsync(DirtyChannel, (_, _) =>
            {
                // Release the semaphore to wake the loop immediately.
                // CurrentCount check avoids SemaphoreFullException when multiple
                // notifications arrive before the loop drains.
                if (_notificationSignal.CurrentCount == 0)
                {
                    try { _notificationSignal.Release(); }
                    catch (SemaphoreFullException) { /* already signalled */ }
                }
            }).ConfigureAwait(false);

            _logger.LogInformation(
                "EnvironmentSettingsRefreshService subscribed to Valkey channel {Channel}",
                DirtyChannel);
        }
        else
        {
            _logger.LogInformation(
                "EnvironmentSettingsRefreshService running without Valkey subscription (fallback timer only)");
        }
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "EnvironmentSettingsRefreshService failed to subscribe to Valkey; falling back to timer-only mode");
    }

    // Determine fallback interval
    var seconds = _optionsMonitor.CurrentValue.Cache.EnvironmentSettingsRefreshSeconds;
    if (seconds <= 0)
    {
        seconds = DefaultFallbackSeconds;
    }

    using var timer = new PeriodicTimer(TimeSpan.FromSeconds(seconds));

    // Pending waits are kept across iterations and re-armed only after they complete:
    //  - PeriodicTimer permits a single in-flight WaitForNextTickAsync; starting a second
    //    one while the first is still pending throws InvalidOperationException.
    //  - A SemaphoreSlim waiter left behind by a previous iteration would absorb a later
    //    Release(), so the notification would never wake the current iteration.
    Task? signalTask = null;
    Task<bool>? tickTask = null;

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            signalTask ??= _notificationSignal.WaitAsync(stoppingToken);
            tickTask ??= timer.WaitForNextTickAsync(stoppingToken).AsTask();

            await Task.WhenAny(signalTask, tickTask).ConfigureAwait(false);

            if (signalTask.IsCompleted)
            {
                // Await to observe cancellation/faults, then allow a fresh wait next time.
                await signalTask.ConfigureAwait(false);
                signalTask = null;
            }

            if (tickTask.IsCompleted)
            {
                var ticked = await tickTask.ConfigureAwait(false);
                tickTask = null;
                if (!ticked)
                {
                    // The timer reports false once it has been disposed; nothing left to wait on.
                    break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            break;
        }

        if (stoppingToken.IsCancellationRequested)
        {
            break;
        }

        _store.InvalidateCache();
        _logger.LogDebug("Environment settings cache invalidated");
    }

    _logger.LogInformation("EnvironmentSettingsRefreshService stopped");
}
/// <summary>
/// Unsubscribes from <see cref="DirtyChannel"/> when a subscriber was obtained, then
/// delegates to the base stop logic. A failure to unsubscribe is logged as a warning
/// and does not prevent the base stop from running.
/// </summary>
/// <param name="cancellationToken">Forwarded to the base <c>StopAsync</c>.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
    if (_subscriber is { } activeSubscriber)
    {
        try
        {
            await activeSubscriber.UnsubscribeAsync(DirtyChannel).ConfigureAwait(false);
        }
        catch (Exception unsubscribeError)
        {
            _logger.LogWarning(unsubscribeError, "Error unsubscribing from Valkey channel during shutdown");
        }
    }

    await base.StopAsync(cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Disposes the notification semaphore owned by this service, then runs the base class disposal.
/// </summary>
public override void Dispose()
{
_notificationSignal.Dispose();
base.Dispose();
}
}

View File

@@ -4,6 +4,7 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
using StackExchange.Redis;
using StellaOps.Platform.Database.EfCore.Context;
using StellaOps.Platform.Database.Postgres;
@@ -19,10 +20,13 @@ public sealed class PostgresEnvironmentSettingsStore : IEnvironmentSettingsStore
{
private readonly NpgsqlDataSource _dataSource;
private readonly ILogger<PostgresEnvironmentSettingsStore> _logger;
private readonly ISubscriber? _subscriber;
private volatile IReadOnlyDictionary<string, string>? _cache;
private readonly object _cacheLock = new();
private const int DefaultCommandTimeoutSeconds = 30;
private static readonly RedisChannel DirtyChannel =
RedisChannel.Literal("notify:platform:envsettings:dirty");
private const string UpsertSql = """
INSERT INTO platform.environment_settings (key, value, updated_at, updated_by)
@@ -32,10 +36,12 @@ public sealed class PostgresEnvironmentSettingsStore : IEnvironmentSettingsStore
public PostgresEnvironmentSettingsStore(
NpgsqlDataSource dataSource,
ILogger<PostgresEnvironmentSettingsStore>? logger = null)
ILogger<PostgresEnvironmentSettingsStore>? logger = null,
IConnectionMultiplexer? connectionMultiplexer = null)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger<PostgresEnvironmentSettingsStore>.Instance;
_subscriber = connectionMultiplexer?.GetSubscriber();
}
public async Task<IReadOnlyDictionary<string, string>> GetAllAsync(CancellationToken ct = default)
@@ -107,6 +113,7 @@ public sealed class PostgresEnvironmentSettingsStore : IEnvironmentSettingsStore
ct).ConfigureAwait(false);
InvalidateCache();
PublishDirtyNotification();
_logger.LogInformation("Environment setting {Key} updated by {UpdatedBy}", key, updatedBy);
}
@@ -129,6 +136,7 @@ public sealed class PostgresEnvironmentSettingsStore : IEnvironmentSettingsStore
dbContext.EnvironmentSettings.Remove(entity);
var rows = await dbContext.SaveChangesAsync(ct).ConfigureAwait(false);
InvalidateCache();
PublishDirtyNotification();
_logger.LogInformation("Environment setting {Key} deleted ({Rows} rows affected)", key, rows);
}
@@ -145,4 +153,17 @@ public sealed class PostgresEnvironmentSettingsStore : IEnvironmentSettingsStore
_cache = null;
}
}
/// <summary>
/// Best-effort publish of a marker message on <see cref="DirtyChannel"/> after a setting
/// has been written or deleted. Does nothing when no connection multiplexer was supplied.
/// A failure is logged at Debug level and never propagates to the caller.
/// </summary>
private void PublishDirtyNotification()
{
    if (_subscriber is null)
    {
        return;
    }

    try
    {
        // Fire-and-forget: the command is queued without waiting for a reply, so the
        // returned task carries no result worth awaiting and is discarded explicitly.
        _ = _subscriber.PublishAsync(DirtyChannel, "1", CommandFlags.FireAndForget);
    }
    catch (Exception ex)
    {
        // Valkey notification is best-effort.
        // The background refresh service will still pick up changes on the fallback timer.
        _logger.LogDebug(
            ex,
            "Failed to publish environment settings dirty notification (fire-and-forget); refresh service will use fallback timer.");
    }
}
}

View File

@@ -11,6 +11,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" />
<PackageReference Include="Microsoft.EntityFrameworkCore" />
<PackageReference Include="StackExchange.Redis" />
</ItemGroup>
<ItemGroup>