using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text.Json; namespace StellaOps.FeatureFlags; /// /// Composite feature flag service that aggregates flags from multiple providers. /// Providers are checked in priority order; first match wins. /// public sealed class CompositeFeatureFlagService : IFeatureFlagService, IDisposable { private readonly IReadOnlyList _providers; private readonly FeatureFlagOptions _options; private readonly ILogger _logger; private readonly MemoryCache _cache; private readonly Subject _changeSubject; private readonly CancellationTokenSource _watchCts; private readonly List _watchTasks; private bool _disposed; /// /// Creates a new composite feature flag service. /// public CompositeFeatureFlagService( IEnumerable providers, IOptions options, ILogger logger) { _providers = providers.OrderBy(p => p.Priority).ToList(); _options = options.Value; _logger = logger; _cache = new MemoryCache(new MemoryCacheOptions()); _changeSubject = new Subject(); _watchCts = new CancellationTokenSource(); _watchTasks = []; // Start watching providers that support it StartWatching(); _logger.LogInformation( "CompositeFeatureFlagService initialized with {ProviderCount} providers: {Providers}", _providers.Count, string.Join(", ", _providers.Select(p => $"{p.Name}(priority={p.Priority})"))); } /// public IObservable OnFlagChanged => _changeSubject.AsObservable(); /// public Task IsEnabledAsync(string flagKey, CancellationToken ct = default) { return IsEnabledAsync(flagKey, FeatureFlagEvaluationContext.Empty, ct); } /// public async Task IsEnabledAsync( string flagKey, FeatureFlagEvaluationContext context, CancellationToken ct = default) { var result = await EvaluateAsync(flagKey, context, ct); return result.Enabled; } /// public async Task EvaluateAsync( string flagKey, FeatureFlagEvaluationContext? context = null, CancellationToken ct = default) { context ??= FeatureFlagEvaluationContext.Empty; // Check cache first var cacheKey = GetCacheKey(flagKey, context); if (_options.EnableCaching && _cache.TryGetValue(cacheKey, out FeatureFlagResult? cached) && cached is not null) { if (_options.EnableLogging) { _logger.LogDebug("Flag '{FlagKey}' returned from cache: {Enabled}", flagKey, cached.Enabled); } return cached; } // Try each provider in priority order foreach (var provider in _providers) { try { var result = await provider.TryGetFlagAsync(flagKey, context, ct); if (result is not null) { // Cache the result if (_options.EnableCaching) { _cache.Set(cacheKey, result, _options.CacheDuration); } if (_options.EnableLogging) { _logger.LogDebug( "Flag '{FlagKey}' evaluated by {Provider}: Enabled={Enabled}, Reason={Reason}", flagKey, provider.Name, result.Enabled, result.Reason); } return result; } } catch (Exception ex) { _logger.LogWarning(ex, "Provider {Provider} failed to evaluate flag '{FlagKey}'", provider.Name, flagKey); } } // No provider had the flag, return default var defaultResult = new FeatureFlagResult( flagKey, _options.DefaultValue, null, "Flag not found in any provider", "default"); if (_options.EnableLogging) { _logger.LogDebug( "Flag '{FlagKey}' not found, using default: {Default}", flagKey, _options.DefaultValue); } return defaultResult; } /// public async Task GetVariantAsync( string flagKey, T defaultValue, FeatureFlagEvaluationContext? context = null, CancellationToken ct = default) { var result = await EvaluateAsync(flagKey, context, ct); if (result.Variant is null) { return defaultValue; } try { // Handle direct type match if (result.Variant is T typedValue) { return typedValue; } // Handle JSON string variant if (result.Variant is string jsonString) { return JsonSerializer.Deserialize(jsonString) ?? defaultValue; } // Handle JsonElement variant if (result.Variant is JsonElement jsonElement) { return JsonSerializer.Deserialize(jsonElement.GetRawText()) ?? defaultValue; } // Try conversion return (T)Convert.ChangeType(result.Variant, typeof(T)); } catch (Exception ex) { _logger.LogWarning(ex, "Failed to convert variant for flag '{FlagKey}' to type {Type}", flagKey, typeof(T).Name); return defaultValue; } } /// public async Task> ListFlagsAsync(CancellationToken ct = default) { var allFlags = new Dictionary(); foreach (var provider in _providers) { try { var flags = await provider.ListFlagsAsync(ct); foreach (var flag in flags) { // First provider to define a flag wins (priority order) if (!allFlags.ContainsKey(flag.Key)) { allFlags[flag.Key] = flag; } } } catch (Exception ex) { _logger.LogWarning(ex, "Provider {Provider} failed to list flags", provider.Name); } } return allFlags.Values.OrderBy(f => f.Key).ToList(); } /// public void InvalidateCache(string? flagKey = null) { if (flagKey is null) { // Clear all cached values _cache.Compact(1.0); _logger.LogDebug("All feature flag cache entries invalidated"); } else { // We can't easily invalidate a single key with all contexts, // so we compact the entire cache _cache.Compact(1.0); _logger.LogDebug("Feature flag cache invalidated for key '{FlagKey}'", flagKey); } } private void StartWatching() { foreach (var provider in _providers.Where(p => p.SupportsWatch)) { var task = Task.Run(async () => { try { await foreach (var change in provider.WatchAsync(_watchCts.Token)) { // Invalidate cache for changed flag InvalidateCache(change.Key); // Publish change event _changeSubject.OnNext(change); if (_options.EnableLogging) { _logger.LogInformation( "Flag '{FlagKey}' changed from {OldValue} to {NewValue} (source: {Source})", change.Key, change.OldValue, change.NewValue, change.Source); } } } catch (OperationCanceledException) when (_watchCts.Token.IsCancellationRequested) { // Expected during shutdown } catch (Exception ex) { _logger.LogError(ex, "Error watching provider {Provider}", provider.Name); } }); _watchTasks.Add(task); } } private static string GetCacheKey(string flagKey, FeatureFlagEvaluationContext context) { // Include relevant context in cache key var contextHash = HashCode.Combine( context.UserId, context.TenantId, context.Environment); return $"ff:{flagKey}:{contextHash}"; } /// public void Dispose() { if (_disposed) return; _watchCts.Cancel(); _watchCts.Dispose(); try { Task.WaitAll([.. _watchTasks], TimeSpan.FromSeconds(5)); } catch (AggregateException) { // Ignore cancellation exceptions } _changeSubject.Dispose(); _cache.Dispose(); _disposed = true; } }