feat(scheduler): postgres + redis webhook rate limiter runtime

Sprint SPRINT_20260417_019_JobEngine_truthful_webhook_rate_limiter_runtime.

NoOpWebhookRateLimiter + RedisWebhookRateLimiter, service-collection
wiring, WebhookRateLimiterRuntimeTests, SCHED-WEB-16-104-WEBHOOKS doc.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-19 14:41:42 +03:00
parent 052de213e1
commit 70cbfcee72
5 changed files with 480 additions and 1 deletions

View File

@@ -0,0 +1,12 @@
using System;
namespace StellaOps.Scheduler.WebService.EventWebhooks;
/// <summary>
/// <see cref="IWebhookRateLimiter"/> implementation that performs no throttling:
/// every acquisition is granted and no state is kept.
/// </summary>
internal sealed class NoOpWebhookRateLimiter : IWebhookRateLimiter
{
    /// <summary>Grants the request unconditionally.</summary>
    /// <param name="key">Ignored.</param>
    /// <param name="limit">Ignored.</param>
    /// <param name="window">Ignored.</param>
    /// <param name="retryAfter">Always set to <see cref="TimeSpan.Zero"/>.</param>
    /// <returns>Always <see langword="true"/>.</returns>
    public bool TryAcquire(string key, int limit, TimeSpan window, out TimeSpan retryAfter)
    {
        // default(TimeSpan) is TimeSpan.Zero: there is never anything to wait for.
        retryAfter = default;
        return true;
    }
}

View File

@@ -0,0 +1,181 @@
using StackExchange.Redis;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.WebService.GraphJobs.Events;
using System;
using System.Globalization;
using System.Threading;
namespace StellaOps.Scheduler.WebService.EventWebhooks;
/// <summary>
/// <see cref="IWebhookRateLimiter"/> that keeps a sliding window of granted requests in a
/// Redis sorted set, so every process talking to the same Redis database shares one budget per key.
/// </summary>
/// <remarks>
/// Timestamps come from the injected <see cref="TimeProvider"/> of the calling process, not from
/// the Redis server. NOTE(review): instances with skewed clocks will therefore disagree slightly
/// about window boundaries — confirm this is acceptable for the deployment.
/// </remarks>
internal sealed class RedisWebhookRateLimiter : IWebhookRateLimiter, IDisposable
{
    // Runs atomically on the Redis server:
    //   1. drops sorted-set members whose score is strictly below (now - window);
    //   2. if the remaining count has reached the limit, returns {0, retry-ms}, where retry-ms is
    //      the time until the oldest remaining member leaves the window (clamped at 0; falls back
    //      to the full window when no oldest member can be read);
    //   3. otherwise records this request (score = now), refreshes the key TTL and returns {1, '0'}.
    private const string AcquireScript = """
        local key = KEYS[1]
        local now_ms = tonumber(ARGV[1])
        local window_ms = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local member = ARGV[4]
        local ttl_ms = tonumber(ARGV[5])
        local cutoff = now_ms - window_ms
        redis.call('ZREMRANGEBYSCORE', key, '-inf', '(' .. tostring(cutoff))
        local count = redis.call('ZCARD', key)
        if count >= limit then
        local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
        if oldest[2] ~= nil then
        local retry_ms = (tonumber(oldest[2]) + window_ms) - now_ms
        if retry_ms < 0 then
        retry_ms = 0
        end
        return {0, tostring(retry_ms)}
        end
        return {0, tostring(window_ms)}
        end
        redis.call('ZADD', key, now_ms, member)
        redis.call('PEXPIRE', key, ttl_ms)
        return {1, '0'}
        """;

    private readonly SchedulerRedisQueueOptions _options;
    private readonly IRedisConnectionFactory _connectionFactory;
    private readonly TimeProvider _timeProvider;

    // Serializes (re)connection attempts so concurrent callers do not each open a multiplexer.
    private readonly SemaphoreSlim _connectionGate = new(1, 1);
    private IConnectionMultiplexer? _connection;

    // Per-instance counter that keeps sorted-set members unique within the same millisecond.
    private long _sequence;
    private bool _disposed;

    /// <summary>Creates a limiter; no Redis connection is opened until the first acquisition.</summary>
    /// <param name="options">Redis settings; <c>ConnectionString</c> must be non-blank.</param>
    /// <param name="connectionFactory">Factory used to open the Redis multiplexer on demand.</param>
    /// <param name="timeProvider">Clock used for window timestamps; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> or <paramref name="connectionFactory"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The connection string is missing or blank.</exception>
    public RedisWebhookRateLimiter(
        SchedulerRedisQueueOptions options,
        IRedisConnectionFactory connectionFactory,
        TimeProvider? timeProvider = null)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _timeProvider = timeProvider ?? TimeProvider.System;

        if (string.IsNullOrWhiteSpace(_options.ConnectionString))
        {
            throw new InvalidOperationException("Scheduler webhook rate limiting requires a Redis connection string.");
        }
    }

    /// <summary>Tries to record one request for <paramref name="key"/> inside the sliding window.</summary>
    /// <param name="key">Bucket identifier; prefixed with <c>scheduler:webhooks:ratelimit:</c> to form the Redis key.</param>
    /// <param name="limit">Maximum requests per window. Values of zero or less disable limiting (always granted, Redis is not contacted).</param>
    /// <param name="window">Length of the sliding window; must be positive.</param>
    /// <param name="retryAfter">Zero when granted; otherwise the delay reported by the script (never negative).</param>
    /// <returns><see langword="true"/> when the request fits in the window, otherwise <see langword="false"/>.</returns>
    /// <exception cref="ObjectDisposedException">The limiter has been disposed.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is zero or negative.</exception>
    /// <exception cref="InvalidOperationException">Redis returned a null, mis-shaped or non-numeric reply.</exception>
    public bool TryAcquire(string key, int limit, TimeSpan window, out TimeSpan retryAfter)
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(RedisWebhookRateLimiter));
        }

        if (limit <= 0)
        {
            retryAfter = TimeSpan.Zero;
            return true;
        }

        if (window <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(window), "Webhook rate limit window must be positive.");
        }

        var database = GetDatabase();
        var nowMilliseconds = _timeProvider.GetUtcNow().ToUnixTimeMilliseconds();
        var windowMilliseconds = checked((long)Math.Ceiling(window.TotalMilliseconds));

        // The key outlives the window by a factor of two, so an idle bucket expires on its own.
        var ttlMilliseconds = checked(windowMilliseconds * 2);

        // timestamp:pid:sequence — distinct per call so ZADD always adds a new member.
        var member = string.Create(
            CultureInfo.InvariantCulture,
            $"{nowMilliseconds}:{Environment.ProcessId}:{Interlocked.Increment(ref _sequence)}");

        var result = database.ScriptEvaluate(
            AcquireScript,
            [BuildRedisKey(key)],
            [nowMilliseconds, windowMilliseconds, limit, member, ttlMilliseconds]);

        if (result.IsNull)
        {
            throw new InvalidOperationException("Scheduler webhook rate limiter returned a null Redis result.");
        }

        var parts = (RedisResult[])result!;
        if (parts.Length != 2)
        {
            throw new InvalidOperationException("Scheduler webhook rate limiter returned an unexpected Redis result shape.");
        }

        var granted = string.Equals(parts[0].ToString(), "1", StringComparison.Ordinal);
        var retryAfterMilliseconds = ParseMilliseconds(parts[1]);
        retryAfter = TimeSpan.FromMilliseconds(retryAfterMilliseconds);
        return granted;
    }

    /// <summary>Disposes the Redis connection (if one was opened) and the connection gate. Safe to call repeatedly.</summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _connection?.Dispose();
        _connectionGate.Dispose();
    }

    /// <summary>
    /// Returns a database handle, opening a connection first when none exists or the current one
    /// reports itself disconnected.
    /// </summary>
    private IDatabase GetDatabase()
    {
        // Fast path: a live connection needs no locking.
        var current = _connection;
        if (current is { IsConnected: true })
        {
            return current.GetDatabase(_options.Database ?? -1);
        }

        _connectionGate.Wait();
        try
        {
            // Re-read under the gate: another caller may have reconnected while we waited.
            current = _connection;
            if (current is null || !current.IsConnected)
            {
                // The limiter interface is synchronous, so the async connect is awaited by blocking.
                var replacement = _connectionFactory
                    .ConnectAsync(CreateConfigurationOptions(), CancellationToken.None)
                    .GetAwaiter()
                    .GetResult();

                _connection = replacement;

                // Release the superseded multiplexer; otherwise every reconnect during an outage
                // would leave another orphaned connection behind.
                current?.Dispose();
                current = replacement;
            }
        }
        finally
        {
            _connectionGate.Release();
        }

        return current.GetDatabase(_options.Database ?? -1);
    }

    /// <summary>Builds the StackExchange.Redis configuration from the limiter options.</summary>
    private ConfigurationOptions CreateConfigurationOptions()
    {
        var configuration = ConfigurationOptions.Parse(_options.ConnectionString!);
        configuration.AbortOnConnectFail = false;
        configuration.ConnectTimeout = (int)_options.InitializationTimeout.TotalMilliseconds;
        configuration.ConnectRetry = 3;

        // Keep a client name supplied via the connection string; otherwise label the connection.
        configuration.ClientName ??= "stellaops-scheduler-webhooks";

        if (_options.Database is not null)
        {
            configuration.DefaultDatabase = _options.Database;
        }

        return configuration;
    }

    /// <summary>Namespaces the caller's bucket key under the limiter's Redis prefix.</summary>
    private static RedisKey BuildRedisKey(string key)
        => new($"scheduler:webhooks:ratelimit:{key}");

    /// <summary>Parses the script's retry-after reply as invariant-culture milliseconds, clamped to be non-negative.</summary>
    /// <exception cref="InvalidOperationException">The reply is not numeric.</exception>
    private static double ParseMilliseconds(RedisResult result)
    {
        if (!double.TryParse(result.ToString(), NumberStyles.Float, CultureInfo.InvariantCulture, out var milliseconds))
        {
            throw new InvalidOperationException("Scheduler webhook rate limiter returned a non-numeric retry-after value.");
        }

        return Math.Max(0d, milliseconds);
    }
}

View File

@@ -0,0 +1,67 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.WebService.GraphJobs.Events;
using StellaOps.Scheduler.WebService.Options;
using System;
namespace StellaOps.Scheduler.WebService.EventWebhooks;
/// <summary>
/// Dependency-injection wiring that selects the <see cref="IWebhookRateLimiter"/> implementation
/// from the host environment and configuration.
/// </summary>
internal static class WebhookRateLimiterServiceCollectionExtensions
{
    /// <summary>
    /// Replaces any existing <see cref="IWebhookRateLimiter"/> registration with, in order of precedence:
    /// the in-memory limiter in the <c>Testing</c> environment, the no-op limiter when both inbound
    /// webhooks are disabled, or the Redis-backed limiter otherwise.
    /// </summary>
    /// <param name="services">Service collection to modify.</param>
    /// <param name="configuration">Source of the <c>Scheduler:Events</c> and <c>scheduler:queue</c> sections.</param>
    /// <param name="environment">Host environment used to detect <c>Testing</c>.</param>
    /// <returns>The same <paramref name="services"/> instance.</returns>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A webhook is enabled outside <c>Testing</c> but the queue is not Redis or has no connection string.
    /// </exception>
    public static IServiceCollection AddSchedulerWebhookRateLimiter(
        this IServiceCollection services,
        IConfiguration configuration,
        IHostEnvironment environment)
    {
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(configuration);
        ArgumentNullException.ThrowIfNull(environment);

        // Start from a clean slate so exactly one limiter registration remains.
        services.RemoveAll<IWebhookRateLimiter>();

        if (environment.IsEnvironment("Testing"))
        {
            return services.AddSingleton<IWebhookRateLimiter, InMemoryWebhookRateLimiter>();
        }

        if (!IsAnyInboundWebhookEnabled(configuration))
        {
            return services.AddSingleton<IWebhookRateLimiter, NoOpWebhookRateLimiter>();
        }

        var limiterOptions = ResolveRedisOptions(configuration);
        return services.AddSingleton<IWebhookRateLimiter>(provider => new RedisWebhookRateLimiter(
            limiterOptions,
            provider.GetRequiredService<IRedisConnectionFactory>(),
            provider.GetService<TimeProvider>()));
    }

    /// <summary>Binds <c>Scheduler:Events</c> and reports whether either inbound webhook is enabled, filling in defaults for missing sections.</summary>
    private static bool IsAnyInboundWebhookEnabled(IConfiguration configuration)
    {
        var events = new SchedulerEventsOptions();
        configuration.GetSection("Scheduler:Events").Bind(events);

        events.Webhooks ??= new SchedulerInboundWebhooksOptions();
        events.Webhooks.Conselier ??= SchedulerWebhookOptions.CreateDefault("conselier");
        events.Webhooks.Excitor ??= SchedulerWebhookOptions.CreateDefault("excitor");

        return events.Webhooks.Conselier.Enabled || events.Webhooks.Excitor.Enabled;
    }

    /// <summary>Binds <c>scheduler:queue</c> and copies its Redis settings, failing fast when Redis is not configured.</summary>
    private static SchedulerRedisQueueOptions ResolveRedisOptions(IConfiguration configuration)
    {
        var queue = new SchedulerQueueOptions();
        configuration.GetSection("scheduler:queue").Bind(queue);

        var usesRedis = queue.Kind == SchedulerQueueTransportKind.Redis;
        if (!usesRedis || string.IsNullOrWhiteSpace(queue.Redis.ConnectionString))
        {
            throw new InvalidOperationException(
                "Scheduler inbound webhooks require scheduler:queue Redis connection outside the Testing environment.");
        }

        var redis = queue.Redis;
        return new SchedulerRedisQueueOptions
        {
            ConnectionString = redis.ConnectionString,
            Database = redis.Database,
            InitializationTimeout = redis.InitializationTimeout
        };
    }
}

View File

@@ -27,6 +27,12 @@ limit before queuing downstream processing.
`RateLimitWindowSeconds`).
* Requests over the limit return `429` and include a `Retry-After` header.
* Defaults: 120 requests / 60 seconds. Adjust via configuration.
* Outside the `Testing` environment, the limiter is distributed and backed by
the existing `scheduler:queue` Redis transport contract. `InMemoryWebhookRateLimiter`
remains test-only.
* If either inbound webhook remains enabled outside `Testing` without
`scheduler:queue.kind=Redis` and a Redis connection string, Scheduler startup
fails fast instead of falling back to process-local rate limiting.
## Configuration
@@ -44,6 +50,11 @@ Scheduler:
Enabled: true
HmacSecret: excitor-secret
RequireClientCertificate: false
queue:
kind: Redis
redis:
connectionString: localhost:6379
database: 0
```
## Response envelope
@@ -55,4 +66,3 @@ On success the webhook returns `202 Accepted` and a JSON body:
```
Failures return problem JSON with `error` describing the violation.

View File

@@ -0,0 +1,209 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.FileProviders;
using Microsoft.Extensions.Hosting;
using StackExchange.Redis;
using StellaOps.Scheduler.Queue;
using StellaOps.Scheduler.WebService.EventWebhooks;
using StellaOps.Scheduler.WebService.GraphJobs.Events;
using Testcontainers.Redis;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using StellaOps.TestKit;
namespace StellaOps.Scheduler.WebService.Tests;
/// <summary>
/// Covers the webhook rate limiter wiring (which implementation each environment/configuration
/// selects) and the cross-instance behavior of <see cref="RedisWebhookRateLimiter"/> against a
/// containerized Redis.
/// </summary>
public sealed class WebhookRateLimiterRuntimeTests : IAsyncLifetime
{
    // Shared across test instances so each Redis-backed test uses a distinct bucket key.
    private static long _keySequence;

    // Created in InitializeAsync rather than a field initializer so that a failure to build the
    // container is handled by the same Docker-unavailable guard as a failure to start it.
    private RedisContainer? _redis;

    // Non-null when Docker could not be used; Redis-backed tests return early in that case.
    private string? _skipReason;

    /// <summary>Builds and starts the Redis container, recording a skip reason when Docker is unavailable.</summary>
    public async ValueTask InitializeAsync()
    {
        try
        {
            // NOTE(review): Testcontainers is understood to validate the Docker endpoint when the
            // container is built, so the build must sit inside the guarded region; otherwise the
            // unit tests in this class would also fail on machines without Docker.
            _redis = new RedisBuilder().Build();
            await _redis.StartAsync();
        }
        catch (Exception ex) when (IsDockerUnavailable(ex))
        {
            _skipReason = $"Docker engine is not available for Redis-backed tests: {ex.Message}";
        }
    }

    /// <summary>Disposes the Redis container unless Docker was unavailable or the container was never built.</summary>
    public async ValueTask DisposeAsync()
    {
        if (_skipReason is not null || _redis is null)
        {
            return;
        }

        await _redis.DisposeAsync().AsTask();
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void TestingRegistration_UsesInMemoryWebhookRateLimiter()
    {
        var services = CreateServices();

        services.AddSchedulerWebhookRateLimiter(
            BuildConfiguration(),
            new StaticHostEnvironment("Testing"));

        using var provider = services.BuildServiceProvider();
        Assert.IsType<InMemoryWebhookRateLimiter>(provider.GetRequiredService<IWebhookRateLimiter>());
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void ProductionRegistration_WithDisabledWebhooks_UsesNoOpLimiter()
    {
        var services = CreateServices();

        services.AddSchedulerWebhookRateLimiter(
            BuildConfiguration(new Dictionary<string, string?>
            {
                ["Scheduler:Events:Webhooks:Conselier:Enabled"] = "false",
                ["Scheduler:Events:Webhooks:Excitor:Enabled"] = "false"
            }),
            new StaticHostEnvironment("Production"));

        using var provider = services.BuildServiceProvider();
        Assert.IsType<NoOpWebhookRateLimiter>(provider.GetRequiredService<IWebhookRateLimiter>());
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void ProductionRegistration_WithoutRedisQueue_FailsFast()
    {
        var services = CreateServices();

        var exception = Assert.Throws<InvalidOperationException>(() =>
            services.AddSchedulerWebhookRateLimiter(
                BuildConfiguration(new Dictionary<string, string?>
                {
                    ["Scheduler:Events:Webhooks:Conselier:Enabled"] = "true",
                    ["Scheduler:Events:Webhooks:Conselier:HmacSecret"] = "conselier-secret",
                    ["Scheduler:Events:Webhooks:Excitor:Enabled"] = "true",
                    ["Scheduler:Events:Webhooks:Excitor:HmacSecret"] = "excitor-secret"
                }),
                new StaticHostEnvironment("Production")));

        Assert.Contains(
            "Scheduler inbound webhooks require scheduler:queue Redis connection outside the Testing environment.",
            exception.Message,
            StringComparison.Ordinal);
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void ProductionRegistration_WithRedisQueue_UsesRedisWebhookRateLimiter()
    {
        var services = CreateServices();

        services.AddSchedulerWebhookRateLimiter(
            BuildConfiguration(new Dictionary<string, string?>
            {
                ["Scheduler:Events:Webhooks:Conselier:Enabled"] = "true",
                ["Scheduler:Events:Webhooks:Conselier:HmacSecret"] = "conselier-secret",
                ["Scheduler:Events:Webhooks:Excitor:Enabled"] = "true",
                ["Scheduler:Events:Webhooks:Excitor:HmacSecret"] = "excitor-secret",
                ["scheduler:queue:kind"] = "Redis",
                ["scheduler:queue:redis:connectionString"] = "localhost:6379"
            }),
            new StaticHostEnvironment("Production"));

        // Resolving the limiter must not connect: the registered factory throws if it is used.
        using var provider = services.BuildServiceProvider();
        Assert.IsType<RedisWebhookRateLimiter>(provider.GetRequiredService<IWebhookRateLimiter>());
    }

    [Trait("Category", TestCategories.Integration)]
    [Fact]
    public void RedisLimiter_SharesSlidingWindowAcrossInstances()
    {
        if (SkipIfUnavailable())
        {
            return;
        }

        var redisOptions = CreateRedisOptions();
        using var firstLimiter = new RedisWebhookRateLimiter(redisOptions, new RedisConnectionFactory(), TimeProvider.System);
        using var secondLimiter = new RedisWebhookRateLimiter(redisOptions, new RedisConnectionFactory(), TimeProvider.System);
        var key = $"excitor-{Interlocked.Increment(ref _keySequence).ToString(CultureInfo.InvariantCulture)}";
        var window = TimeSpan.FromSeconds(60);

        // The first instance consumes the single slot...
        Assert.True(firstLimiter.TryAcquire(key, limit: 1, window, out var firstRetryAfter));
        Assert.Equal(TimeSpan.Zero, firstRetryAfter);

        // ...so a second, independent instance must be rejected for the same key.
        Assert.False(secondLimiter.TryAcquire(key, limit: 1, window, out var secondRetryAfter));
        Assert.True(secondRetryAfter > TimeSpan.Zero);
    }

    /// <summary>Service collection with logging, the system clock and a Redis factory that throws on use.</summary>
    private static ServiceCollection CreateServices()
    {
        var services = new ServiceCollection();
        services.AddLogging();
        services.AddSingleton(TimeProvider.System);
        services.AddSingleton<IRedisConnectionFactory, ThrowingRedisConnectionFactory>();
        return services;
    }

    /// <summary>Builds an in-memory configuration from the given key/value pairs (empty when omitted).</summary>
    private static IConfiguration BuildConfiguration(IDictionary<string, string?>? values = null)
    {
        return new ConfigurationBuilder()
            .AddInMemoryCollection(values ?? new Dictionary<string, string?>())
            .Build();
    }

    /// <summary>Limiter options pointing at the test container.</summary>
    /// <exception cref="InvalidOperationException">The container was never built.</exception>
    private SchedulerRedisQueueOptions CreateRedisOptions()
    {
        var redis = _redis
            ?? throw new InvalidOperationException("Redis test container has not been initialized.");

        return new()
        {
            ConnectionString = redis.GetConnectionString(),
            InitializationTimeout = TimeSpan.FromSeconds(30)
        };
    }

    /// <summary>Returns <see langword="true"/> when Redis-backed tests should return early because Docker was unavailable.</summary>
    private bool SkipIfUnavailable()
        => _skipReason is not null;

    /// <summary>
    /// Heuristic: after unwrapping nested <see cref="AggregateException"/>s, treats a timeout or any
    /// exception whose type name contains "Docker" as "Docker is not available".
    /// </summary>
    private static bool IsDockerUnavailable(Exception exception)
    {
        while (exception is AggregateException aggregate && aggregate.InnerException is not null)
        {
            exception = aggregate.InnerException;
        }

        return exception is TimeoutException
            || exception.GetType().Name.Contains("Docker", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>Minimal <see cref="IHostEnvironment"/> with a fixed environment name.</summary>
    private sealed class StaticHostEnvironment(string environmentName) : IHostEnvironment
    {
        public string EnvironmentName { get; set; } = environmentName;
        public string ApplicationName { get; set; } = "StellaOps.Scheduler.WebService";
        public string ContentRootPath { get; set; } = AppContext.BaseDirectory;
        public IFileProvider ContentRootFileProvider { get; set; } = new PhysicalFileProvider(AppContext.BaseDirectory);
    }

    /// <summary>Connection factory that fails any connection attempt, proving registration tests never touch Redis.</summary>
    private sealed class ThrowingRedisConnectionFactory : IRedisConnectionFactory
    {
        public Task<IConnectionMultiplexer> ConnectAsync(ConfigurationOptions options, CancellationToken cancellationToken)
            => throw new InvalidOperationException("Redis connection should not be established in this test.");
    }
}