using System.Collections.Immutable;
using System.Diagnostics;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Policy.Engine.Options;
using StellaOps.Policy.Engine.Telemetry;
using StellaOps.Policy.Storage.Postgres.Models;
using StellaOps.Policy.Storage.Postgres.Repositories;
using StackExchange.Redis;
namespace StellaOps.Policy.Engine.ExceptionCache;
/// <summary>
/// Redis-backed exception effective cache with warm/invalidation support.
/// </summary>
/// <remarks>
/// Key structure:
/// <list type="bullet">
/// <item>Entry by asset: <c>stellaops:exc:{tenant}:a:{asset}:{advisory|all}</c> -> JSON array of entries.</item>
/// <item>Index by exception: <c>stellaops:exc:{tenant}:idx:e:{exceptionId}</c> -> set of asset keys holding that exception.</item>
/// <item>Version: <c>stellaops:exc:{tenant}:v</c> -> integer version, incremented on every mutation.</item>
/// <item>Stats: <c>stellaops:exc:{tenant}:stats</c> -> hash with <c>lastWarmAt</c> / <c>lastWarmCount</c>.</item>
/// </list>
/// Key scans run against every connected primary endpoint. Multi-key reads and deletes are issued as
/// pipelined single-key commands, so the cache also works when keys are spread over cluster hash slots.
/// </remarks>
internal sealed class RedisExceptionEffectiveCache : IExceptionEffectiveCache
{
    private const string KeyPrefix = "stellaops:exc";

    /// <summary>Maximum number of asset keys whose payloads are read when computing a summary.</summary>
    private const int SummaryScanLimit = 1000;

    /// <summary>Extra lifetime given to exception index sets so they outlive the asset keys they point to.</summary>
    private static readonly TimeSpan IndexTtlGrace = TimeSpan.FromMinutes(5);

    /// <summary>Characters with special meaning in Redis glob patterns (SCAN MATCH / KEYS).</summary>
    private static readonly char[] GlobSpecialChars = { '*', '?', '[', ']', '\\' };

    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        WriteIndented = false,
    };

    private readonly IConnectionMultiplexer _redis;
    private readonly IExceptionRepository _repository;
    private readonly ILogger<RedisExceptionEffectiveCache> _logger;
    private readonly ExceptionCacheOptions _options;
    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Creates the cache.
    /// </summary>
    /// <param name="redis">Redis connection used for all cache reads and writes.</param>
    /// <param name="repository">Exception store used to (re)populate the cache.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="options">Policy engine options. Default <see cref="ExceptionCacheOptions"/> are used when absent.</param>
    /// <param name="timeProvider">Clock used for TTLs and timestamps.</param>
    /// <exception cref="ArgumentNullException">A required dependency is null.</exception>
    public RedisExceptionEffectiveCache(
        IConnectionMultiplexer redis,
        IExceptionRepository repository,
        ILogger<RedisExceptionEffectiveCache> logger,
        IOptions<PolicyEngineOptions> options,
        TimeProvider timeProvider)
    {
        _redis = redis ?? throw new ArgumentNullException(nameof(redis));
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _options = options?.Value?.ExceptionCache ?? new ExceptionCacheOptions();
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
    }

    /// <summary>
    /// Returns the cached exceptions for <paramref name="assetId"/> that are effective at <paramref name="asOf"/>.
    /// </summary>
    /// <remarks>
    /// When <paramref name="advisoryId"/> is given, entries from the advisory-specific key come first. Entries from
    /// the asset's "all" key are always appended. The combined list is then ordered by descending priority
    /// (a stable sort, so equal priorities keep that order).
    /// NOTE(review): tenant-wide entries stored under asset "*" (see <see cref="WarmAsync"/>) are not consulted
    /// here. Confirm that callers query "*" separately.
    /// </remarks>
    /// <returns>
    /// The effective entries. <c>FromCache</c> is true when at least one key held a readable payload.
    /// </returns>
    public async Task<ExceptionCacheQueryResult> GetForAssetAsync(
        string tenantId,
        string assetId,
        string? advisoryId,
        DateTimeOffset asOf,
        CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var sw = Stopwatch.StartNew();
        var db = _redis.GetDatabase();
        var entries = new List<ExceptionCacheEntry>();
        var fromCache = false;

        if (advisoryId is not null)
        {
            var specificKey = GetAssetKey(tenantId, assetId, advisoryId);
            var specificEntries = DeserializeEntries(
                await db.StringGetAsync(specificKey).ConfigureAwait(false), specificKey);
            if (specificEntries is not null)
            {
                entries.AddRange(specificEntries);
                fromCache = true;
            }
        }

        // Exceptions not scoped to a specific advisory live under the "all" key.
        var allKey = GetAssetKey(tenantId, assetId, null);
        var allEntries = DeserializeEntries(await db.StringGetAsync(allKey).ConfigureAwait(false), allKey);
        if (allEntries is not null)
        {
            entries.AddRange(allEntries);
            fromCache = true;
        }

        var validEntries = FilterEffective(entries, asOf);
        var version = await GetVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);
        sw.Stop();

        PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, fromCache ? "hit" : "miss");

        return new ExceptionCacheQueryResult
        {
            Entries = validEntries,
            FromCache = fromCache,
            CacheVersion = version,
            QueryDurationMs = sw.ElapsedMilliseconds,
        };
    }

    /// <summary>
    /// Returns the effective "all"-advisory entries for each asset in <paramref name="assetIds"/>.
    /// </summary>
    /// <remarks>
    /// Reads are pipelined as one GET per key rather than a single MGET, so keys in different cluster slots work.
    /// </remarks>
    /// <returns>One result per asset id, keyed case-insensitively.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="assetIds"/> is null.</exception>
    public async Task<IReadOnlyDictionary<string, ExceptionCacheQueryResult>> GetBatchAsync(
        string tenantId,
        IReadOnlyList<string> assetIds,
        DateTimeOffset asOf,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(assetIds);
        cancellationToken.ThrowIfCancellationRequested();

        var results = new Dictionary<string, ExceptionCacheQueryResult>(StringComparer.OrdinalIgnoreCase);
        var db = _redis.GetDatabase();

        var keys = assetIds.Select(id => (RedisKey)GetAssetKey(tenantId, id, null)).ToArray();
        var values = await Task.WhenAll(keys.Select(key => db.StringGetAsync(key))).ConfigureAwait(false);
        var version = await GetVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);

        for (var i = 0; i < assetIds.Count; i++)
        {
            var cachedEntries = DeserializeEntries(values[i], keys[i]);

            results[assetIds[i]] = new ExceptionCacheQueryResult
            {
                Entries = cachedEntries is null
                    ? ImmutableArray<ExceptionCacheEntry>.Empty
                    : FilterEffective(cachedEntries, asOf),
                FromCache = cachedEntries is not null,
                CacheVersion = version,
                QueryDurationMs = 0,
            };
        }

        PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, "batch_get");
        return results;
    }

    /// <summary>
    /// Inserts or replaces a single entry, merging it into the entries already cached for the same asset/advisory key.
    /// </summary>
    /// <remarks>
    /// The key's TTL is the longest TTL of all entries it now holds. Using only the new entry's TTL could evict older,
    /// longer-lived entries early. The cache version is incremented, as for every other mutation.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="entry"/> is null.</exception>
    public async Task SetAsync(
        string tenantId,
        ExceptionCacheEntry entry,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(entry);
        cancellationToken.ThrowIfCancellationRequested();

        var db = _redis.GetDatabase();
        var assetKey = GetAssetKey(tenantId, entry.AssetId, entry.AdvisoryId);
        var exceptionIndexKey = GetExceptionIndexKey(tenantId, entry.ExceptionId);

        var existing = await db.StringGetAsync(assetKey).ConfigureAwait(false);
        var entries = DeserializeEntries(existing, assetKey) ?? new List<ExceptionCacheEntry>();

        // Replace any previous copy of this exception.
        entries.RemoveAll(e => e.ExceptionId == entry.ExceptionId);
        entries.Add(entry);

        var ttl = entries.Max(e => ComputeTtl(e));
        var json = JsonSerializer.Serialize(entries, JsonOptions);

        await Task.WhenAll(
            db.StringSetAsync(assetKey, json, ttl),
            db.SetAddAsync(exceptionIndexKey, assetKey),
            db.KeyExpireAsync(exceptionIndexKey, ttl + IndexTtlGrace)).ConfigureAwait(false);

        await IncrementVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, "set");
    }

    /// <summary>
    /// Writes <paramref name="entries"/> grouped by asset/advisory key and increments the cache version.
    /// </summary>
    /// <remarks>
    /// Each touched asset key is <b>replaced</b> with exactly the batch's entries for that key. Existing entries in
    /// those keys are not merged. The batch is awaited, so write failures surface to the caller.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="entries"/> is null.</exception>
    public async Task SetBatchAsync(
        string tenantId,
        IEnumerable<ExceptionCacheEntry> entries,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(entries);
        cancellationToken.ThrowIfCancellationRequested();

        var db = _redis.GetDatabase();
        var batch = db.CreateBatch();
        var pending = new List<Task>();
        var count = 0;

        foreach (var group in entries.GroupBy(e => GetAssetKey(tenantId, e.AssetId, e.AdvisoryId)))
        {
            var assetKey = group.Key;
            var assetEntries = group.ToList();

            // The key must outlive its longest-lived entry.
            var ttl = assetEntries.Max(e => ComputeTtl(e));
            pending.Add(batch.StringSetAsync(assetKey, JsonSerializer.Serialize(assetEntries, JsonOptions), ttl));

            foreach (var entry in assetEntries)
            {
                var exceptionIndexKey = GetExceptionIndexKey(tenantId, entry.ExceptionId);
                pending.Add(batch.SetAddAsync(exceptionIndexKey, assetKey));
                pending.Add(batch.KeyExpireAsync(exceptionIndexKey, ttl + IndexTtlGrace));
            }

            count += assetEntries.Count;
        }

        batch.Execute();
        await Task.WhenAll(pending).ConfigureAwait(false);

        await IncrementVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, "set_batch");
        _logger.LogDebug("Set {Count} exception cache entries for tenant {TenantId}", count, tenantId);
    }

    /// <summary>
    /// Removes <paramref name="exceptionId"/> from every asset key listed in its index set, then deletes the index
    /// and increments the cache version.
    /// </summary>
    public async Task InvalidateExceptionAsync(
        string tenantId,
        string exceptionId,
        CancellationToken cancellationToken = default)
    {
        var db = _redis.GetDatabase();
        var exceptionIndexKey = GetExceptionIndexKey(tenantId, exceptionId);

        var assetKeys = await db.SetMembersAsync(exceptionIndexKey).ConfigureAwait(false);

        foreach (var member in assetKeys)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var assetKey = (string)member!;
            var payload = await db.StringGetAsync(assetKey).ConfigureAwait(false);
            if (!payload.HasValue)
            {
                continue;
            }

            var entries = DeserializeEntries(payload, assetKey);
            if (entries is null)
            {
                // The payload is unreadable, so we cannot prove it is free of this exception. Drop the key.
                await db.KeyDeleteAsync(assetKey).ConfigureAwait(false);
                continue;
            }

            if (entries.RemoveAll(e => e.ExceptionId == exceptionId) == 0)
            {
                // Stale index membership (e.g. the key was replaced by a batch write). Nothing to rewrite.
                continue;
            }

            if (entries.Count > 0)
            {
                // A plain SET clears the key's TTL. KEEPTTL preserves it so the remaining entries still expire.
                await db.StringSetAsync(assetKey, JsonSerializer.Serialize(entries, JsonOptions), keepTtl: true)
                    .ConfigureAwait(false);
            }
            else
            {
                await db.KeyDeleteAsync(assetKey).ConfigureAwait(false);
            }
        }

        await db.KeyDeleteAsync(exceptionIndexKey).ConfigureAwait(false);
        await IncrementVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, "invalidate_exception");
        _logger.LogInformation(
            "Invalidated exception {ExceptionId} affecting {Count} assets for tenant {TenantId}",
            exceptionId, assetKeys.Length, tenantId);
    }

    /// <summary>
    /// Deletes every advisory variant cached for <paramref name="assetId"/> and increments the cache version.
    /// </summary>
    /// <remarks>
    /// Glob metacharacters in the ids are escaped. Asset ids containing ':' may still match longer asset ids
    /// sharing the same prefix. That over-invalidates, which is safe for a cache.
    /// </remarks>
    public async Task InvalidateAssetAsync(
        string tenantId,
        string assetId,
        CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var db = _redis.GetDatabase();
        var pattern = $"{KeyPrefix}:{EscapeGlob(tenantId)}:a:{EscapeGlob(assetId)}:*";
        var keys = ScanKeys(pattern);

        await DeleteKeysAsync(db, keys).ConfigureAwait(false);
        await IncrementVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, "invalidate_asset");
        _logger.LogDebug("Invalidated {Count} cache keys for asset {AssetId}", keys.Length, assetId);
    }

    /// <summary>
    /// Deletes every cache key of the tenant except its version key, then increments the version.
    /// </summary>
    /// <remarks>
    /// The version key is kept so the version stays monotonic. Deleting it would restart the counter at 0, and
    /// earlier version numbers would then be reused for different cache contents.
    /// </remarks>
    public async Task InvalidateTenantAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var versionKey = (RedisKey)GetVersionKey(tenantId);
        var keys = ScanKeys($"{KeyPrefix}:{EscapeGlob(tenantId)}:*")
            .Where(key => key != versionKey)
            .ToArray();

        await DeleteKeysAsync(_redis.GetDatabase(), keys).ConfigureAwait(false);
        await IncrementVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, "invalidate_tenant");
        _logger.LogInformation("Invalidated {Count} cache keys for tenant {TenantId}", keys.Length, tenantId);
    }

    /// <summary>
    /// Loads up to <c>MaxEntriesPerTenant</c> active exceptions for the tenant and writes them via
    /// <see cref="SetBatchAsync"/>. Afterwards it records the warm time and count in the stats hash.
    /// </summary>
    /// <exception cref="Exception">Failures other than cancellation are logged, recorded, and rethrown.</exception>
    public async Task WarmAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        using var activity = PolicyEngineTelemetry.ActivitySource.StartActivity(
            "exception.cache.warm", ActivityKind.Internal);
        activity?.SetTag("tenant_id", tenantId);

        var sw = Stopwatch.StartNew();
        var now = _timeProvider.GetUtcNow();

        _logger.LogInformation("Starting cache warm for tenant {TenantId}", tenantId);

        try
        {
            var exceptions = await _repository.GetAllAsync(
                tenantId,
                ExceptionStatus.Active,
                limit: _options.MaxEntriesPerTenant,
                offset: 0,
                cancellationToken: cancellationToken).ConfigureAwait(false);

            if (exceptions.Count == 0)
            {
                _logger.LogDebug("No active exceptions to warm for tenant {TenantId}", tenantId);
                return;
            }

            if (exceptions.Count >= _options.MaxEntriesPerTenant)
            {
                _logger.LogWarning(
                    "Exception cache warm for tenant {TenantId} reached the {Limit} entry limit; additional active exceptions were not cached",
                    tenantId, _options.MaxEntriesPerTenant);
            }

            var entries = new List<ExceptionCacheEntry>(exceptions.Count);
            foreach (var exception in exceptions)
            {
                entries.Add(new ExceptionCacheEntry
                {
                    ExceptionId = exception.Id.ToString(),
                    // Exceptions without a project are cached under the tenant-wide asset "*".
                    AssetId = string.IsNullOrWhiteSpace(exception.ProjectId) ? "*" : exception.ProjectId!,
                    AdvisoryId = null,
                    CveId = null,
                    DecisionOverride = "allow",
                    ExceptionType = "waiver",
                    Priority = 0,
                    EffectiveFrom = exception.CreatedAt,
                    ExpiresAt = exception.ExpiresAt,
                    CachedAt = now,
                    ExceptionName = exception.Name,
                });
            }

            await SetBatchAsync(tenantId, entries, cancellationToken).ConfigureAwait(false);

            sw.Stop();

            await UpdateWarmStatsAsync(tenantId, now, entries.Count).ConfigureAwait(false);

            PolicyEngineTelemetry.RecordExceptionCacheOperation(tenantId, "warm");
            _logger.LogInformation(
                "Warmed cache with {Count} entries from {ExceptionCount} exceptions for tenant {TenantId} in {Duration}ms",
                entries.Count, exceptions.Count, tenantId, sw.ElapsedMilliseconds);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to warm cache for tenant {TenantId}", tenantId);
            PolicyEngineTelemetry.RecordError("exception_cache_warm", tenantId);
            throw;
        }
    }

    /// <summary>
    /// Summarises the tenant's cache contents.
    /// </summary>
    /// <remarks>
    /// At most <see cref="SummaryScanLimit"/> asset keys are read, so entry, asset, type, and decision counts are
    /// sample-based for large tenants. <c>UniqueExceptions</c> counts all index keys.
    /// <c>ExpiringWithinHour</c> counts only entries that have not expired yet.
    /// </remarks>
    public async Task<ExceptionCacheSummary> GetSummaryAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var db = _redis.GetDatabase();
        var now = _timeProvider.GetUtcNow();
        var tenantGlob = EscapeGlob(tenantId);

        var assetKeys = ScanKeys($"{KeyPrefix}:{tenantGlob}:a:*");
        var exceptionKeys = ScanKeys($"{KeyPrefix}:{tenantGlob}:idx:e:*");

        var byType = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        var byDecision = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        var totalEntries = 0;
        var expiringWithinHour = 0;
        var uniqueAssets = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

        // Read the sampled keys in one pipelined round trip instead of one await per key.
        var sampledKeys = assetKeys.Take(SummaryScanLimit).ToArray();
        var payloads = await Task.WhenAll(sampledKeys.Select(key => db.StringGetAsync(key))).ConfigureAwait(false);

        for (var i = 0; i < sampledKeys.Length; i++)
        {
            var entries = DeserializeEntries(payloads[i], sampledKeys[i]);
            if (entries is null)
            {
                continue;
            }

            foreach (var entry in entries)
            {
                totalEntries++;
                uniqueAssets.Add(entry.AssetId);

                byType.TryGetValue(entry.ExceptionType, out var typeCount);
                byType[entry.ExceptionType] = typeCount + 1;

                byDecision.TryGetValue(entry.DecisionOverride, out var decisionCount);
                byDecision[entry.DecisionOverride] = decisionCount + 1;

                if (entry.ExpiresAt is { } expiresAt && expiresAt > now && expiresAt - now <= TimeSpan.FromHours(1))
                {
                    expiringWithinHour++;
                }
            }
        }

        var version = await GetVersionAsync(tenantId, cancellationToken).ConfigureAwait(false);

        return new ExceptionCacheSummary
        {
            TenantId = tenantId,
            TotalEntries = totalEntries,
            UniqueExceptions = exceptionKeys.Length,
            UniqueAssets = uniqueAssets.Count,
            ByType = byType,
            ByDecision = byDecision,
            ExpiringWithinHour = expiringWithinHour,
            CacheVersion = version,
            ComputedAt = now,
        };
    }

    /// <summary>
    /// Returns key counts for one tenant (or all tenants when <paramref name="tenantId"/> is null), plus the Redis
    /// <c>used_memory</c> of the first endpoint when available.
    /// </summary>
    public async Task<ExceptionCacheStats> GetStatsAsync(
        string? tenantId = null,
        CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var tenantGlob = tenantId is null ? "*" : EscapeGlob(tenantId);
        var entryCount = ScanKeys($"{KeyPrefix}:{tenantGlob}:a:*").Length;
        var tenantCount = ScanKeys($"{KeyPrefix}:{tenantGlob}:v").Length;

        long? memoryUsed = null;
        var server = _redis.GetServer(_redis.GetEndPoints().First());
        try
        {
            var info = await server.InfoAsync("memory").ConfigureAwait(false);
            var memorySection = info.FirstOrDefault(section => section.Key == "Memory");
            if (memorySection is not null)
            {
                var usedMemory = memorySection.FirstOrDefault(pair => pair.Key == "used_memory");
                if (usedMemory.Key is not null && long.TryParse(usedMemory.Value, out var bytes))
                {
                    memoryUsed = bytes;
                }
            }
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Best effort: INFO may be disabled or unavailable. Stats are still useful without memory usage.
            _logger.LogDebug(ex, "Redis memory info unavailable for exception cache stats");
        }

        return new ExceptionCacheStats
        {
            TotalEntries = entryCount,
            TotalTenants = tenantCount,
            MemoryUsedBytes = memoryUsed,
            HitCount = 0, // Would need to track separately
            MissCount = 0,
            LastWarmAt = null,
            LastInvalidationAt = null,
        };
    }

    /// <summary>
    /// Returns the tenant's current cache version, or 0 when no version key exists.
    /// </summary>
    public async Task<long> GetVersionAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        var db = _redis.GetDatabase();
        var version = await db.StringGetAsync(GetVersionKey(tenantId)).ConfigureAwait(false);
        return version.HasValue ? (long)version : 0;
    }

    /// <summary>
    /// Applies an exception lifecycle event to the cache. Created/activated events warm the exception.
    /// Expired/revoked/deleted events invalidate it. Updated events do both. Matching is case-insensitive,
    /// and unknown or null event types are logged and ignored.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="exceptionEvent"/> is null.</exception>
    public async Task HandleExceptionEventAsync(
        ExceptionEvent exceptionEvent,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(exceptionEvent);

        using var activity = PolicyEngineTelemetry.ActivitySource.StartActivity(
            "exception.cache.handle_event", ActivityKind.Internal);
        activity?.SetTag("tenant_id", exceptionEvent.TenantId);
        activity?.SetTag("event_type", exceptionEvent.EventType);
        activity?.SetTag("exception_id", exceptionEvent.ExceptionId);

        _logger.LogDebug(
            "Handling exception event {EventType} for exception {ExceptionId} tenant {TenantId}",
            exceptionEvent.EventType, exceptionEvent.ExceptionId, exceptionEvent.TenantId);

        switch (exceptionEvent.EventType?.ToLowerInvariant())
        {
            case "activated":
            case "created":
                await WarmExceptionAsync(exceptionEvent.TenantId, exceptionEvent.ExceptionId, cancellationToken)
                    .ConfigureAwait(false);
                break;

            case "expired":
            case "revoked":
            case "deleted":
                await InvalidateExceptionAsync(exceptionEvent.TenantId, exceptionEvent.ExceptionId, cancellationToken)
                    .ConfigureAwait(false);
                break;

            case "updated":
                await InvalidateExceptionAsync(exceptionEvent.TenantId, exceptionEvent.ExceptionId, cancellationToken)
                    .ConfigureAwait(false);
                await WarmExceptionAsync(exceptionEvent.TenantId, exceptionEvent.ExceptionId, cancellationToken)
                    .ConfigureAwait(false);
                break;

            default:
                _logger.LogWarning("Unknown exception event type: {EventType}", exceptionEvent.EventType);
                break;
        }

        PolicyEngineTelemetry.RecordExceptionCacheOperation(exceptionEvent.TenantId, $"event_{exceptionEvent.EventType}");
    }

    /// <summary>
    /// Loads one exception from the repository and, if it is active, merges it into its asset key.
    /// </summary>
    /// <remarks>
    /// Uses <see cref="SetAsync"/> (merge) rather than <see cref="SetBatchAsync"/>. A batch write replaces the
    /// whole asset key and would drop every other exception cached for the same asset.
    /// </remarks>
    private async Task WarmExceptionAsync(string tenantId, string exceptionId, CancellationToken cancellationToken)
    {
        if (!Guid.TryParse(exceptionId, out var exceptionGuid))
        {
            _logger.LogWarning("Unable to parse exception id {ExceptionId} for tenant {TenantId}", exceptionId, tenantId);
            return;
        }

        var exception = await _repository.GetByIdAsync(tenantId, exceptionGuid, cancellationToken)
            .ConfigureAwait(false);

        if (exception is null || exception.Status != ExceptionStatus.Active)
        {
            return;
        }

        var entry = new ExceptionCacheEntry
        {
            ExceptionId = exception.Id.ToString(),
            AssetId = string.IsNullOrWhiteSpace(exception.ProjectId) ? "*" : exception.ProjectId!,
            AdvisoryId = null,
            CveId = null,
            DecisionOverride = "allow",
            ExceptionType = "waiver",
            Priority = 0,
            EffectiveFrom = exception.CreatedAt,
            ExpiresAt = exception.ExpiresAt,
            CachedAt = _timeProvider.GetUtcNow(),
            ExceptionName = exception.Name,
        };

        await SetAsync(tenantId, entry, cancellationToken).ConfigureAwait(false);

        _logger.LogDebug(
            "Warmed cache with {Count} entries for exception {ExceptionId}",
            1, exceptionId);
    }

    /// <summary>
    /// Atomically increments the tenant's version key and returns the new value.
    /// </summary>
    /// <remarks>
    /// The TTL is only applied when the key has none (<see cref="ExpireWhen.HasNoExpiry"/>), so it is not extended
    /// by later increments.
    /// NOTE(review): once the key expires the version restarts at 1, possibly while long-lived asset keys remain
    /// cached. Confirm whether consumers rely on versions never repeating.
    /// </remarks>
    private async Task<long> IncrementVersionAsync(string tenantId, CancellationToken cancellationToken)
    {
        var db = _redis.GetDatabase();
        var versionKey = GetVersionKey(tenantId);

        var newVersion = await db.StringIncrementAsync(versionKey).ConfigureAwait(false);

        await db.KeyExpireAsync(versionKey, TimeSpan.FromMinutes(_options.DefaultTtlMinutes + 10), ExpireWhen.HasNoExpiry)
            .ConfigureAwait(false);

        return newVersion;
    }

    /// <summary>
    /// Records the last warm time (round-trip "O" format) and entry count in the tenant's stats hash.
    /// </summary>
    private async Task UpdateWarmStatsAsync(string tenantId, DateTimeOffset warmAt, int count)
    {
        var db = _redis.GetDatabase();
        HashEntry[] stats =
        {
            new("lastWarmAt", warmAt.ToString("O")),
            new("lastWarmCount", count.ToString()),
        };

        await db.HashSetAsync(GetStatsKey(tenantId), stats).ConfigureAwait(false);
    }

    /// <summary>
    /// TTL for an entry: the time remaining until <c>ExpiresAt</c> when that is in the future. Otherwise
    /// <c>DefaultTtlMinutes</c>, which also applies to entries that have already expired.
    /// </summary>
    private TimeSpan ComputeTtl(ExceptionCacheEntry entry)
    {
        if (entry.ExpiresAt.HasValue)
        {
            var ttl = entry.ExpiresAt.Value - _timeProvider.GetUtcNow();
            if (ttl > TimeSpan.Zero)
            {
                return ttl;
            }
        }

        return TimeSpan.FromMinutes(_options.DefaultTtlMinutes);
    }

    /// <summary>
    /// Keeps entries effective at <paramref name="asOf"/> (started, not yet expired), ordered by descending priority.
    /// </summary>
    private static ImmutableArray<ExceptionCacheEntry> FilterEffective(
        IEnumerable<ExceptionCacheEntry> entries,
        DateTimeOffset asOf) =>
        entries
            .Where(e => e.EffectiveFrom <= asOf && (e.ExpiresAt is null || e.ExpiresAt > asOf))
            .OrderByDescending(e => e.Priority)
            .ToImmutableArray();

    /// <summary>
    /// Deserializes a cached entry list.
    /// </summary>
    /// <returns>
    /// The list, or null when the key is missing or holds JSON null. An unreadable payload is logged and also
    /// returns null, so a corrupt key behaves like a cache miss instead of failing the caller.
    /// </returns>
    private List<ExceptionCacheEntry>? DeserializeEntries(RedisValue payload, RedisKey key)
    {
        if (!payload.HasValue)
        {
            return null;
        }

        try
        {
            return JsonSerializer.Deserialize<List<ExceptionCacheEntry>>((string)payload!, JsonOptions);
        }
        catch (JsonException ex)
        {
            _logger.LogWarning(ex, "Ignoring unreadable exception cache payload at {CacheKey}", key.ToString());
            return null;
        }
    }

    /// <summary>
    /// Returns the servers to scan. These are all connected primaries (in a cluster, keys are spread across them).
    /// If none is known, it falls back to the first endpoint so connection errors still surface.
    /// </summary>
    private IReadOnlyList<IServer> GetScanServers()
    {
        var endpoints = _redis.GetEndPoints();
        var primaries = endpoints
            .Select(endpoint => _redis.GetServer(endpoint))
            .Where(server => server.IsConnected && !server.IsReplica)
            .ToList();

        if (primaries.Count == 0)
        {
            primaries.Add(_redis.GetServer(endpoints.First()));
        }

        return primaries;
    }

    /// <summary>
    /// Returns the distinct keys matching the glob <paramref name="pattern"/> across <see cref="GetScanServers"/>.
    /// </summary>
    private RedisKey[] ScanKeys(string pattern) =>
        GetScanServers()
            .SelectMany(server => server.Keys(pattern: pattern))
            .Distinct()
            .ToArray();

    /// <summary>
    /// Deletes <paramref name="keys"/> with one pipelined DEL per key. A single multi-key DEL is rejected
    /// when the keys live in different cluster hash slots.
    /// </summary>
    private static async Task DeleteKeysAsync(IDatabase db, IReadOnlyCollection<RedisKey> keys)
    {
        if (keys.Count == 0)
        {
            return;
        }

        await Task.WhenAll(keys.Select(key => db.KeyDeleteAsync(key))).ConfigureAwait(false);
    }

    /// <summary>
    /// Backslash-escapes Redis glob metacharacters so an id is matched literally in a SCAN pattern.
    /// </summary>
    private static string EscapeGlob(string value) =>
        value.IndexOfAny(GlobSpecialChars) < 0
            ? value
            : string.Concat(value.Select(c => Array.IndexOf(GlobSpecialChars, c) >= 0 ? "\\" + c : c.ToString()));

    private static string GetAssetKey(string tenantId, string assetId, string? advisoryId) =>
        $"{KeyPrefix}:{tenantId}:a:{assetId}:{advisoryId ?? "all"}";

    private static string GetExceptionIndexKey(string tenantId, string exceptionId) =>
        $"{KeyPrefix}:{tenantId}:idx:e:{exceptionId}";

    private static string GetVersionKey(string tenantId) =>
        $"{KeyPrefix}:{tenantId}:v";

    private static string GetStatsKey(string tenantId) =>
        $"{KeyPrefix}:{tenantId}:stats";
}