// -----------------------------------------------------------------------------
// ValkeyRateLimitStore.cs
// Sprint: SPRINT_1200_001_001_router_rate_limiting_core
// Task: 1.3 - Valkey-Backed Environment Rate Limiter
// Description: Valkey-backed fixed-window rate limit store with atomic Lua script
// -----------------------------------------------------------------------------

using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace StellaOps.Router.Gateway.RateLimit;

/// <summary>
/// Valkey-backed fixed-window rate limit store.
/// </summary>
/// <remarks>
/// Every call runs a single Lua script that increments one counter per rule and reports
/// whether all rules are still within their limits. The connection is created lazily on
/// first use and re-created when the current multiplexer reports it is not connected.
/// </remarks>
public sealed class ValkeyRateLimitStore : IValkeyRateLimitStore, IDisposable
{
    /// <summary>Number of elements the Lua script returns: allowed, retryAfter, count, limit, window.</summary>
    private const int ScriptReplyLength = 5;

    // Script contract:
    //   ARGV[1] = bucket, ARGV[2] = key, ARGV[3] = rule count,
    //   followed by (windowSeconds, limit) pairs, one pair per rule.
    //   Counter key  = bucket:key:windowSeconds:windowStart, where windowStart is derived from server TIME.
    //   The counter gets an expiry of windowSeconds + 1 the first time it is incremented.
    //   Every rule's counter is incremented, even when an earlier rule is already over its limit.
    //   Reply        = {allowed(1|0), retryAfter, count, limit, window}. The last three describe the
    //   first rule when allowed, otherwise the violated rule with the largest retry-after.
    // NOTE(review): counter keys are built inside the script and no KEYS are passed; confirm this is
    // acceptable for the deployment topology (clustered servers normally require declared keys).
    private const string RateLimitScript = @"
local bucket = ARGV[1]
local key = ARGV[2]
local ruleCount = tonumber(ARGV[3])

local nowSec = tonumber(redis.call('TIME')[1])

local allowed = 1
local maxRetryAfter = 0
local outCount = 0
local outLimit = 0
local outWindow = 0

for i = 0, ruleCount - 1 do
    local windowSec = tonumber(ARGV[4 + (i * 2)])
    local limit = tonumber(ARGV[5 + (i * 2)])

    local windowStart = nowSec - (nowSec % windowSec)
    local counterKey = bucket .. ':' .. key .. ':' .. windowSec .. ':' .. windowStart

    local count = redis.call('INCR', counterKey)
    if count == 1 then
        redis.call('EXPIRE', counterKey, windowSec + 1)
    end

    if i == 0 then
        outCount = count
        outLimit = limit
        outWindow = windowSec
    end

    if count > limit then
        allowed = 0
        local retryAfter = windowSec - (nowSec - windowStart)
        if retryAfter > maxRetryAfter then
            maxRetryAfter = retryAfter
            outCount = count
            outLimit = limit
            outWindow = windowSec
        end
    end
end

if allowed == 1 then
    return {1, 0, outCount, outLimit, outWindow}
end

return {0, maxRetryAfter, outCount, outLimit, outWindow}
";

    private readonly string _connectionString;
    private readonly string _bucket;
    private readonly ILogger? _logger;
    private readonly SemaphoreSlim _connectionLock = new(1, 1);
    private IConnectionMultiplexer? _connection;
    private bool _disposed;

    /// <summary>
    /// Creates a store that connects lazily using <paramref name="connectionString"/>.
    /// </summary>
    /// <param name="connectionString">Connection string handed to the multiplexer on first use.</param>
    /// <param name="bucket">Prefix for every counter key; must not be null, empty or whitespace.</param>
    /// <param name="logger">Optional logger for connection events and script anomalies.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="bucket"/> is null, empty or whitespace.</exception>
    public ValkeyRateLimitStore(string connectionString, string bucket, ILogger? logger = null)
    {
        _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
        _bucket = string.IsNullOrWhiteSpace(bucket)
            ? throw new ArgumentException("Bucket is required", nameof(bucket))
            : bucket;
        _logger = logger;
    }

    /// <summary>
    /// Increments the counters for <paramref name="key"/> under every rule and reports whether
    /// the request is still within all limits.
    /// </summary>
    /// <param name="key">Identity being limited; combined with the bucket to form counter keys.</param>
    /// <param name="rules">Rules to evaluate. An empty list yields an allowed result without contacting the server.</param>
    /// <param name="cancellationToken">Observed while waiting for the connection lock.</param>
    /// <returns>The decision plus the count, limit and window of the representative rule.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> or <paramref name="rules"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The store has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The script reply does not have the expected shape.</exception>
    // NOTE(review): the return type and the element type of `rules` appear without generic type
    // arguments in the reviewed text; the signature is kept as shown - confirm against IValkeyRateLimitStore.
    public async Task IncrementAndCheckAsync(
        string key,
        IReadOnlyList rules,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(key);
        ArgumentNullException.ThrowIfNull(rules);

        if (rules.Count == 0)
        {
            return new RateLimitStoreResult(
                Allowed: true,
                CurrentCount: 0,
                Limit: 0,
                WindowSeconds: 0,
                RetryAfterSeconds: 0);
        }

        var connection = await GetConnectionAsync(cancellationToken).ConfigureAwait(false);
        var db = connection.GetDatabase();

        // Deterministic ordering: smallest window first (used for representative headers when allowed).
        var orderedRules = rules
            .OrderBy(r => r.PerSeconds)
            .ThenBy(r => r.MaxRequests)
            .ToArray();

        // Layout must match the script: bucket, key, rule count, then (window, limit) pairs.
        var values = new RedisValue[3 + (orderedRules.Length * 2)];
        values[0] = _bucket;
        values[1] = key;
        values[2] = orderedRules.Length;

        var idx = 3;
        foreach (var rule in orderedRules)
        {
            values[idx++] = rule.PerSeconds;
            values[idx++] = rule.MaxRequests;
        }

        var raw = await db.ScriptEvaluateAsync(
            RateLimitScript,
            [],
            values).ConfigureAwait(false);

        // Fail with a descriptive error instead of a NullReferenceException / IndexOutOfRangeException
        // if the server hands back something other than the five-element array the script produces.
        var results = (RedisResult[]?)raw;
        if (results is null || results.Length < ScriptReplyLength)
        {
            throw new InvalidOperationException(
                "Valkey rate limit script returned an unexpected reply shape.");
        }

        var allowed = (int)results[0]! == 1;
        var retryAfter = (int)results[1]!;
        var currentCount = (long)results[2]!;
        var limit = (int)results[3]!;
        var windowSeconds = (int)results[4]!;

        if (!allowed && retryAfter <= 0)
        {
            // A denied request must always carry a positive retry hint; clamp and report.
            _logger?.LogWarning("Valkey rate limit script returned invalid retry_after ({RetryAfter}) for {Key}", retryAfter, key);
            retryAfter = 1;
        }

        return new RateLimitStoreResult(
            Allowed: allowed,
            CurrentCount: currentCount,
            Limit: limit,
            WindowSeconds: windowSeconds,
            RetryAfterSeconds: allowed ? 0 : retryAfter);
    }

    /// <summary>
    /// Closes and disposes the connection (if one was created) and the connection lock.
    /// Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Detach the field first so no later caller can pick up a disposed multiplexer.
        var connection = _connection;
        _connection = null;

        if (connection is not null)
        {
            connection.Close();
            connection.Dispose();
        }

        _connectionLock.Dispose();
    }

    /// <summary>
    /// Returns the current connection when it reports connected; otherwise (re)connects while
    /// holding the connection lock so only one caller performs the connect.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for the connection lock.</param>
    /// <exception cref="ObjectDisposedException">The store has been disposed.</exception>
    private async Task GetConnectionAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);

        var current = _connection;
        if (current is { IsConnected: true })
        {
            return current;
        }

        await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            ObjectDisposedException.ThrowIf(_disposed, this);

            // Re-check under the lock: another caller may have connected while we waited.
            current = _connection;
            if (current is { IsConnected: true })
            {
                return current;
            }

            // Clear the field before disposing so a failed connect below does not leave a
            // disposed multiplexer behind.
            _connection = null;
            current?.Dispose();

            // Parse once so the log line can omit the password that a connection string may carry.
            var options = ConfigurationOptions.Parse(_connectionString);
            _logger?.LogDebug("Connecting to Valkey at {Endpoint}", options.ToString(includePassword: false));

            var fresh = await ConnectionMultiplexer.ConnectAsync(options).ConfigureAwait(false);
            _connection = fresh;
            _logger?.LogInformation("Connected to Valkey");

            return fresh;
        }
        finally
        {
            _connectionLock.Release();
        }
    }
}