// Source: git.stella-ops.org/src/Policy/StellaOps.Policy.Engine/EffectiveDecisionMap/MessagingEffectiveDecisionMap.cs
// Commit: 999e26a48e "up" (StellaOps Bot), 2025-12-13 02:22:15 +02:00
// Repository web view reported: 429 lines, 17 KiB, C#.

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Policy.Engine.Options;
using StellaOps.Policy.Engine.Telemetry;
namespace StellaOps.Policy.Engine.EffectiveDecisionMap;
/// <summary>
/// Transport-agnostic effective decision map using StellaOps.Messaging abstractions.
/// Works with any configured transport (Valkey, PostgreSQL, InMemory).
/// </summary>
/// <remarks>
/// Storage layout per (tenant, snapshot):
/// <list type="bullet">
/// <item><description>one cache item per asset, key <c>edm:entry:{tenant}:{snapshot}:{asset}</c>;</description></item>
/// <item><description>one sorted index of asset ids, key <c>edm:index:{tenant}:{snapshot}</c>, scored by
/// <c>EvaluatedAt</c> in Unix milliseconds;</description></item>
/// <item><description>one version counter, key <c>edm:version:{tenant}:{snapshot}</c>.</description></item>
/// </list>
/// Entries and the index expire independently, so the index may list assets whose entries are gone;
/// readers skip such misses.
/// </remarks>
internal sealed class MessagingEffectiveDecisionMap : IEffectiveDecisionMap
{
    private const string EntryKeyPrefix = "edm:entry";
    private const string IndexKeyPrefix = "edm:index";
    private const string VersionKeyPrefix = "edm:version";

    // Extra lifetime given to a snapshot's asset index beyond the entry TTL used when writing,
    // so the index does not vanish before the entries it references.
    private static readonly TimeSpan IndexTtlGrace = TimeSpan.FromMinutes(5);

    // Extra minutes added to DefaultTtlMinutes for the lifetime of a version counter.
    private const int VersionTtlGraceMinutes = 10;

    private readonly IDistributedCache<string, EffectiveDecisionEntry> _entryCache;
    private readonly ISortedIndex<string, string> _assetIndex;
    private readonly IDistributedCache<string, long> _versionCache;
    private readonly ILogger<MessagingEffectiveDecisionMap> _logger;
    private readonly EffectiveDecisionMapOptions _options;
    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Creates the map and its backing entry cache, asset index and version cache.
    /// </summary>
    /// <param name="cacheFactory">Factory for the entry and version caches.</param>
    /// <param name="sortedIndexFactory">Factory for the per-snapshot asset index.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="options">Policy engine options; defaults are used when absent.</param>
    /// <param name="timeProvider">Clock used to compute TTLs and summary timestamps.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="cacheFactory"/>, <paramref name="sortedIndexFactory"/>, <paramref name="logger"/>
    /// or <paramref name="timeProvider"/> is null.
    /// </exception>
    public MessagingEffectiveDecisionMap(
        IDistributedCacheFactory cacheFactory,
        ISortedIndexFactory sortedIndexFactory,
        ILogger<MessagingEffectiveDecisionMap> logger,
        IOptions<PolicyEngineOptions> options,
        TimeProvider timeProvider)
    {
        ArgumentNullException.ThrowIfNull(cacheFactory);
        ArgumentNullException.ThrowIfNull(sortedIndexFactory);

        // NOTE(review): the entry cache uses KeyPrefix "edm:entries" while the keys themselves start with
        // "edm:entry"; confirm InvalidateByPatternAsync applies the KeyPrefix to patterns the same way it
        // does to keys, otherwise InvalidateTenantAsync will match nothing.
        _entryCache = cacheFactory.Create<string, EffectiveDecisionEntry>(new CacheOptions { KeyPrefix = "edm:entries" });
        _assetIndex = sortedIndexFactory.Create<string, string>("edm-asset-index");
        _versionCache = cacheFactory.Create<string, long>(new CacheOptions { KeyPrefix = "edm:versions" });
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        // Tolerate both a null IOptions and a null Value by falling back to default options.
        _options = options?.Value?.EffectiveDecisionMap ?? new EffectiveDecisionMapOptions();
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
    }

    /// <summary>
    /// Stores a single decision entry and adds its asset to the snapshot index.
    /// </summary>
    /// <param name="tenantId">Tenant owning the snapshot.</param>
    /// <param name="snapshotId">Snapshot the entry belongs to.</param>
    /// <param name="entry">Entry to store; its <c>ExpiresAt</c> drives the TTL.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <exception cref="ArgumentNullException"><paramref name="entry"/> is null.</exception>
    public async Task SetAsync(
        string tenantId,
        string snapshotId,
        EffectiveDecisionEntry entry,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(entry);

        var entryKey = GetEntryKey(tenantId, snapshotId, entry.AssetId);
        var indexKey = GetIndexKey(tenantId, snapshotId);
        var ttl = ResolveTtl(entry, _timeProvider.GetUtcNow());

        await _entryCache.SetAsync(entryKey, entry, new CacheEntryOptions { TimeToLive = ttl }, cancellationToken)
            .ConfigureAwait(false);

        // Score by evaluation time so snapshot reads can return the newest decisions first.
        var score = entry.EvaluatedAt.ToUnixTimeMilliseconds();
        await _assetIndex.AddAsync(indexKey, entry.AssetId, score, cancellationToken).ConfigureAwait(false);

        // NOTE(review): this replaces the index expiration with *this* entry's TTL plus grace, which can
        // shorten it below the lifetime of longer-lived entries already indexed. Those entries would then
        // be invisible to GetAllForSnapshotAsync / InvalidateSnapshotAsync. Confirm whether intended.
        await _assetIndex.SetExpirationAsync(indexKey, ttl.Add(IndexTtlGrace), cancellationToken).ConfigureAwait(false);

        // NOTE(review): unlike SetBatchAsync and InvalidateAsync, a single set does not bump the map
        // version; confirm whether version-based change detection should observe single writes.
        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(1,
            new KeyValuePair<string, object?>("operation", "set"),
            new KeyValuePair<string, object?>("tenant_id", tenantId));
    }

    /// <summary>
    /// Stores a batch of decision entries, indexes their assets and increments the snapshot version.
    /// </summary>
    /// <param name="tenantId">Tenant owning the snapshot.</param>
    /// <param name="snapshotId">Snapshot the entries belong to.</param>
    /// <param name="entries">Entries to store (enumerated once).</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <remarks>
    /// The index expiration is set from the largest entry TTL in the batch plus a grace period.
    /// The version is incremented even when the batch is empty.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="entries"/> is null or contains a null item.</exception>
    public async Task SetBatchAsync(
        string tenantId,
        string snapshotId,
        IEnumerable<EffectiveDecisionEntry> entries,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(entries);

        var now = _timeProvider.GetUtcNow();
        var indexKey = GetIndexKey(tenantId, snapshotId);
        var maxTtl = TimeSpan.Zero;
        var indexElements = new List<ScoredElement<string>>();

        foreach (var entry in entries)
        {
            // Fail with a clear argument error rather than a NullReferenceException mid-batch.
            ArgumentNullException.ThrowIfNull(entry, nameof(entries));

            var ttl = ResolveTtl(entry, now);
            if (ttl > maxTtl)
            {
                maxTtl = ttl;
            }

            var entryKey = GetEntryKey(tenantId, snapshotId, entry.AssetId);
            await _entryCache.SetAsync(entryKey, entry, new CacheEntryOptions { TimeToLive = ttl }, cancellationToken)
                .ConfigureAwait(false);

            indexElements.Add(new ScoredElement<string>(entry.AssetId, entry.EvaluatedAt.ToUnixTimeMilliseconds()));
        }

        if (indexElements.Count > 0)
        {
            await _assetIndex.AddRangeAsync(indexKey, indexElements, cancellationToken).ConfigureAwait(false);
            await _assetIndex.SetExpirationAsync(indexKey, maxTtl.Add(IndexTtlGrace), cancellationToken).ConfigureAwait(false);
        }

        // Increment version after batch write
        await IncrementVersionAsync(tenantId, snapshotId, cancellationToken).ConfigureAwait(false);

        var count = indexElements.Count;
        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(count,
            new KeyValuePair<string, object?>("operation", "set_batch"),
            new KeyValuePair<string, object?>("tenant_id", tenantId));

        _logger.LogDebug("Set {Count} effective decisions for snapshot {SnapshotId}", count, snapshotId);
    }

    /// <summary>
    /// Reads a single decision entry.
    /// </summary>
    /// <returns>The entry, or <c>null</c> when the cache has no value for the asset.</returns>
    public async Task<EffectiveDecisionEntry?> GetAsync(
        string tenantId,
        string snapshotId,
        string assetId,
        CancellationToken cancellationToken = default)
    {
        var entryKey = GetEntryKey(tenantId, snapshotId, assetId);
        var result = await _entryCache.GetAsync(entryKey, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(1,
            new KeyValuePair<string, object?>("operation", "get"),
            new KeyValuePair<string, object?>("tenant_id", tenantId),
            new KeyValuePair<string, object?>("cache_hit", result.HasValue));

        return result.HasValue ? result.Value : null;
    }

    /// <summary>
    /// Reads several decision entries, reporting which assets were not found.
    /// </summary>
    /// <param name="assetIds">Assets to look up; each is read with a separate cache call.</param>
    /// <returns>Found entries keyed by asset id, the missing asset ids, and the current map version.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="assetIds"/> is null.</exception>
    public async Task<EffectiveDecisionQueryResult> GetBatchAsync(
        string tenantId,
        string snapshotId,
        IReadOnlyList<string> assetIds,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(assetIds);

        var entries = new Dictionary<string, EffectiveDecisionEntry>();
        var notFound = new List<string>();

        foreach (var assetId in assetIds)
        {
            var entryKey = GetEntryKey(tenantId, snapshotId, assetId);
            var result = await _entryCache.GetAsync(entryKey, cancellationToken).ConfigureAwait(false);

            if (result.HasValue && result.Value is not null)
            {
                entries[assetId] = result.Value;
            }
            else
            {
                notFound.Add(assetId);
            }
        }

        var version = await GetVersionAsync(tenantId, snapshotId, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(assetIds.Count,
            new KeyValuePair<string, object?>("operation", "get_batch"),
            new KeyValuePair<string, object?>("tenant_id", tenantId));

        return new EffectiveDecisionQueryResult
        {
            Entries = entries,
            NotFound = notFound,
            MapVersion = version,
            FromCache = true,
        };
    }

    /// <summary>
    /// Returns the snapshot's live entries ordered by <c>EvaluatedAt</c> descending, optionally filtered and paged.
    /// </summary>
    /// <param name="filter">
    /// Optional filter. <c>Offset</c> and <c>Limit</c> apply to the filtered sequence; non-positive values
    /// mean "not set".
    /// </param>
    /// <returns>The matching entries; empty when the snapshot index is empty or missing.</returns>
    public async Task<IReadOnlyList<EffectiveDecisionEntry>> GetAllForSnapshotAsync(
        string tenantId,
        string snapshotId,
        EffectiveDecisionFilter? filter = null,
        CancellationToken cancellationToken = default)
    {
        var indexKey = GetIndexKey(tenantId, snapshotId);

        // Whole index (rank 0 .. -1), newest EvaluatedAt first.
        var assetElements = await _assetIndex.GetByRankAsync(
            indexKey, 0, -1, SortOrder.Descending, cancellationToken).ConfigureAwait(false);

        if (assetElements.Count == 0)
        {
            return Array.Empty<EffectiveDecisionEntry>();
        }

        // Paging is applied while streaming over the index. This avoids buffering skipped entries and the
        // old `Limit + Offset` arithmetic, which under-returned for negative offsets and could overflow.
        var offset = filter is { Offset: > 0 } ? filter.Offset : 0;
        var limit = filter is { Limit: > 0 } ? filter.Limit : 0;
        var skipped = 0;
        var entries = new List<EffectiveDecisionEntry>();

        foreach (var element in assetElements)
        {
            var entryKey = GetEntryKey(tenantId, snapshotId, element.Element);
            var result = await _entryCache.GetAsync(entryKey, cancellationToken).ConfigureAwait(false);

            // The index can reference entries that have already expired or been invalidated.
            if (!result.HasValue || result.Value is null)
            {
                continue;
            }

            var entry = result.Value;

            if (filter is not null && !MatchesFilter(entry, filter))
            {
                continue;
            }

            if (skipped < offset)
            {
                skipped++;
                continue;
            }

            entries.Add(entry);

            if (limit > 0 && entries.Count >= limit)
            {
                break;
            }
        }

        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(1,
            new KeyValuePair<string, object?>("operation", "get_all"),
            new KeyValuePair<string, object?>("tenant_id", tenantId));

        return entries;
    }

    /// <summary>
    /// Computes status, severity and exception counts over all live entries of a snapshot.
    /// </summary>
    /// <remarks>Reads every entry of the snapshot; status and severity keys are grouped case-insensitively.</remarks>
    public async Task<EffectiveDecisionSummary> GetSummaryAsync(
        string tenantId,
        string snapshotId,
        CancellationToken cancellationToken = default)
    {
        var entries = await GetAllForSnapshotAsync(tenantId, snapshotId, null, cancellationToken)
            .ConfigureAwait(false);

        var statusCounts = entries
            .GroupBy(e => e.Status, StringComparer.OrdinalIgnoreCase)
            .ToDictionary(g => g.Key, g => g.Count(), StringComparer.OrdinalIgnoreCase);

        // Entries without a severity are excluded from the severity breakdown.
        var severityCounts = entries
            .Where(e => e.Severity is not null)
            .GroupBy(e => e.Severity!, StringComparer.OrdinalIgnoreCase)
            .ToDictionary(g => g.Key, g => g.Count(), StringComparer.OrdinalIgnoreCase);

        var version = await GetVersionAsync(tenantId, snapshotId, cancellationToken).ConfigureAwait(false);

        return new EffectiveDecisionSummary
        {
            SnapshotId = snapshotId,
            TotalAssets = entries.Count,
            StatusCounts = statusCounts,
            SeverityCounts = severityCounts,
            ExceptionCount = entries.Count(e => e.ExceptionId is not null),
            MapVersion = version,
            ComputedAt = _timeProvider.GetUtcNow(),
        };
    }

    /// <summary>
    /// Removes one asset's entry and index membership and increments the snapshot version.
    /// </summary>
    public async Task InvalidateAsync(
        string tenantId,
        string snapshotId,
        string assetId,
        CancellationToken cancellationToken = default)
    {
        var entryKey = GetEntryKey(tenantId, snapshotId, assetId);
        var indexKey = GetIndexKey(tenantId, snapshotId);

        await _entryCache.InvalidateAsync(entryKey, cancellationToken).ConfigureAwait(false);
        await _assetIndex.RemoveAsync(indexKey, assetId, cancellationToken).ConfigureAwait(false);
        await IncrementVersionAsync(tenantId, snapshotId, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(1,
            new KeyValuePair<string, object?>("operation", "invalidate"),
            new KeyValuePair<string, object?>("tenant_id", tenantId));
    }

    /// <summary>
    /// Removes every indexed entry of a snapshot, then deletes the snapshot index and its version counter.
    /// </summary>
    /// <remarks>After this call <see cref="GetVersionAsync"/> reports 0 for the snapshot.</remarks>
    public async Task InvalidateSnapshotAsync(
        string tenantId,
        string snapshotId,
        CancellationToken cancellationToken = default)
    {
        var indexKey = GetIndexKey(tenantId, snapshotId);

        // Get all asset IDs from the index
        var assetElements = await _assetIndex.GetByRankAsync(indexKey, 0, -1, cancellationToken: cancellationToken).ConfigureAwait(false);
        var count = assetElements.Count;

        foreach (var element in assetElements)
        {
            var entryKey = GetEntryKey(tenantId, snapshotId, element.Element);
            await _entryCache.InvalidateAsync(entryKey, cancellationToken).ConfigureAwait(false);
        }

        await _assetIndex.DeleteAsync(indexKey, cancellationToken).ConfigureAwait(false);

        var versionKey = GetVersionKey(tenantId, snapshotId);
        await _versionCache.InvalidateAsync(versionKey, cancellationToken).ConfigureAwait(false);

        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(count,
            new KeyValuePair<string, object?>("operation", "invalidate_snapshot"),
            new KeyValuePair<string, object?>("tenant_id", tenantId));

        _logger.LogInformation("Invalidated {Count} entries for snapshot {SnapshotId}", count, snapshotId);
    }

    /// <summary>
    /// Removes all entries and version counters of a tenant using key-pattern invalidation.
    /// </summary>
    /// <remarks>
    /// Snapshot indexes are not removed because the sorted-index abstraction has no pattern-based deletion.
    /// The leftover indexes reference missing entries, which readers skip, and they expire on their own TTL.
    /// </remarks>
    public async Task InvalidateTenantAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        // NOTE(review): tenantId is interpolated unescaped into a wildcard pattern. Confirm that tenant ids
        // cannot contain pattern metacharacters (e.g. '*', '?', '[') for the configured transport.
        var entryPattern = $"{EntryKeyPrefix}:{tenantId}:*";
        var entryCount = await _entryCache.InvalidateByPatternAsync(entryPattern, cancellationToken).ConfigureAwait(false);

        // Also drop the tenant's version counters, as InvalidateSnapshotAsync does for its snapshot.
        // Otherwise GetVersionAsync keeps reporting a pre-invalidation version for data that is gone.
        var versionPattern = $"{VersionKeyPrefix}:{tenantId}:*";
        await _versionCache.InvalidateByPatternAsync(versionPattern, cancellationToken).ConfigureAwait(false);

        // Note: ISortedIndex doesn't have pattern-based deletion, so we can't easily clean up indexes
        // This is a limitation of the abstraction - the Redis implementation handled this with KEYS scan
        PolicyEngineTelemetry.EffectiveDecisionMapOperations.Add(entryCount,
            new KeyValuePair<string, object?>("operation", "invalidate_tenant"),
            new KeyValuePair<string, object?>("tenant_id", tenantId));

        _logger.LogInformation("Invalidated {Count} entries for tenant {TenantId}", entryCount, tenantId);
    }

    /// <summary>
    /// Returns the current version of a snapshot's map, or 0 when no version is stored.
    /// </summary>
    public async Task<long> GetVersionAsync(
        string tenantId,
        string snapshotId,
        CancellationToken cancellationToken = default)
    {
        var versionKey = GetVersionKey(tenantId, snapshotId);
        var result = await _versionCache.GetAsync(versionKey, cancellationToken).ConfigureAwait(false);
        return result.HasValue ? result.Value : 0;
    }

    /// <summary>
    /// Increments a snapshot's version and stores it with a TTL of DefaultTtlMinutes plus a grace period.
    /// </summary>
    /// <returns>The new version.</returns>
    /// <remarks>
    /// NOTE(review): this is a read-then-write rather than an atomic increment. Concurrent writers can
    /// lose increments. The counter's TTL is also independent of entry TTLs, which may be longer.
    /// </remarks>
    public async Task<long> IncrementVersionAsync(
        string tenantId,
        string snapshotId,
        CancellationToken cancellationToken = default)
    {
        var versionKey = GetVersionKey(tenantId, snapshotId);
        var current = await GetVersionAsync(tenantId, snapshotId, cancellationToken).ConfigureAwait(false);
        var newVersion = current + 1;

        var cacheOptions = new CacheEntryOptions
        {
            TimeToLive = TimeSpan.FromMinutes(_options.DefaultTtlMinutes + VersionTtlGraceMinutes)
        };

        await _versionCache.SetAsync(versionKey, newVersion, cacheOptions, cancellationToken).ConfigureAwait(false);
        return newVersion;
    }

    /// <summary>
    /// Returns placeholder statistics with all counters zero.
    /// </summary>
    /// <remarks>
    /// The messaging abstractions expose no enumeration or memory queries, so real statistics would require
    /// transport-specific code. <paramref name="tenantId"/> is currently ignored.
    /// </remarks>
    public Task<EffectiveDecisionMapStats> GetStatsAsync(
        string? tenantId = null,
        CancellationToken cancellationToken = default)
    {
        return Task.FromResult(new EffectiveDecisionMapStats
        {
            TotalEntries = 0,
            TotalSnapshots = 0,
            MemoryUsedBytes = null,
            ExpiringWithinHour = 0,
            LastEvictionAt = null,
            LastEvictionCount = 0,
        });
    }

    // Cache TTL for an entry: the time left until entry.ExpiresAt, or DefaultTtlMinutes when that is not
    // positive (already expired or an unset/default ExpiresAt).
    private TimeSpan ResolveTtl(EffectiveDecisionEntry entry, DateTimeOffset now)
    {
        var remaining = entry.ExpiresAt - now;
        return remaining > TimeSpan.Zero ? remaining : TimeSpan.FromMinutes(_options.DefaultTtlMinutes);
    }

    // True when the entry satisfies every criterion set on the filter (paging is handled by the caller).
    private static bool MatchesFilter(EffectiveDecisionEntry entry, EffectiveDecisionFilter filter)
    {
        if (filter.Statuses?.Count > 0 &&
            !filter.Statuses.Contains(entry.Status, StringComparer.OrdinalIgnoreCase))
        {
            return false;
        }

        // When severities are requested, entries without a severity never match.
        if (filter.Severities?.Count > 0 &&
            (entry.Severity is null || !filter.Severities.Contains(entry.Severity, StringComparer.OrdinalIgnoreCase)))
        {
            return false;
        }

        if (filter.HasException == true && entry.ExceptionId is null)
        {
            return false;
        }

        if (filter.HasException == false && entry.ExceptionId is not null)
        {
            return false;
        }

        if (filter.MinAdvisoryCount.HasValue && entry.AdvisoryCount < filter.MinAdvisoryCount)
        {
            return false;
        }

        if (filter.MinHighSeverityCount.HasValue && entry.HighSeverityCount < filter.MinHighSeverityCount)
        {
            return false;
        }

        return true;
    }

    private static string GetEntryKey(string tenantId, string snapshotId, string assetId) =>
        $"{EntryKeyPrefix}:{tenantId}:{snapshotId}:{assetId}";

    private static string GetIndexKey(string tenantId, string snapshotId) =>
        $"{IndexKeyPrefix}:{tenantId}:{snapshotId}";

    private static string GetVersionKey(string tenantId, string snapshotId) =>
        $"{VersionKeyPrefix}:{tenantId}:{snapshotId}";
}