up
Some checks failed
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Policy Simulation / policy-simulate (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Signals CI & Image / signals-ci (push) Has been cancelled
Signals Reachability Scoring & Events / reachability-smoke (push) Has been cancelled
Signals Reachability Scoring & Events / sign-and-upload (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Scanner Analyzers / Discover Analyzers (push) Has been cancelled
Scanner Analyzers / Build Analyzers (push) Has been cancelled
Scanner Analyzers / Test Language Analyzers (push) Has been cancelled
Scanner Analyzers / Validate Test Fixtures (push) Has been cancelled
Scanner Analyzers / Verify Deterministic Output (push) Has been cancelled
Some checks failed
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Policy Simulation / policy-simulate (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Signals CI & Image / signals-ci (push) Has been cancelled
Signals Reachability Scoring & Events / reachability-smoke (push) Has been cancelled
Signals Reachability Scoring & Events / sign-and-upload (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Scanner Analyzers / Discover Analyzers (push) Has been cancelled
Scanner Analyzers / Build Analyzers (push) Has been cancelled
Scanner Analyzers / Test Language Analyzers (push) Has been cancelled
Scanner Analyzers / Validate Test Fixtures (push) Has been cancelled
Scanner Analyzers / Verify Deterministic Output (push) Has been cancelled
This commit is contained in:
@@ -0,0 +1,180 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Messaging;
|
||||
using StellaOps.Messaging.Abstractions;
|
||||
using StellaOps.Policy.Engine.Options;
|
||||
|
||||
namespace StellaOps.Policy.Engine.ReachabilityFacts;
|
||||
|
||||
/// <summary>
|
||||
/// Reachability facts overlay cache backed by <see cref="IDistributedCache{TValue}"/>.
|
||||
/// Supports any transport (InMemory, Valkey, PostgreSQL) via factory injection.
|
||||
/// </summary>
|
||||
public sealed class MessagingReachabilityFactsOverlayCache : IReachabilityFactsOverlayCache
|
||||
{
|
||||
private readonly IDistributedCache<ReachabilityFact> _cache;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<MessagingReachabilityFactsOverlayCache> _logger;
|
||||
private readonly TimeSpan _defaultTtl;
|
||||
|
||||
private long _totalRequests;
|
||||
private long _cacheHits;
|
||||
private long _cacheMisses;
|
||||
|
||||
public MessagingReachabilityFactsOverlayCache(
|
||||
IDistributedCacheFactory cacheFactory,
|
||||
ILogger<MessagingReachabilityFactsOverlayCache> logger,
|
||||
TimeProvider timeProvider,
|
||||
IOptions<PolicyEngineOptions> options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(cacheFactory);
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
|
||||
var cacheOptions = options?.Value.ReachabilityCache ?? new ReachabilityFactsCacheOptions();
|
||||
_defaultTtl = TimeSpan.FromMinutes(cacheOptions.DefaultTtlMinutes);
|
||||
|
||||
_cache = cacheFactory.Create<ReachabilityFact>(new CacheOptions
|
||||
{
|
||||
KeyPrefix = "rf:",
|
||||
DefaultTtl = _defaultTtl,
|
||||
});
|
||||
|
||||
_logger.LogInformation(
|
||||
"Initialized MessagingReachabilityFactsOverlayCache with provider {Provider}, TTL {Ttl}",
|
||||
_cache.ProviderName,
|
||||
_defaultTtl);
|
||||
}
|
||||
|
||||
public async Task<(ReachabilityFact? Fact, bool CacheHit)> GetAsync(
|
||||
ReachabilityFactKey key,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
Interlocked.Increment(ref _totalRequests);
|
||||
|
||||
var cacheKey = key.ToCacheKey();
|
||||
var result = await _cache.GetAsync(cacheKey, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (result.HasValue)
|
||||
{
|
||||
var fact = result.Value;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
|
||||
// Check if entry is still valid
|
||||
if (!fact.ExpiresAt.HasValue || fact.ExpiresAt.Value > now)
|
||||
{
|
||||
Interlocked.Increment(ref _cacheHits);
|
||||
return (fact, true);
|
||||
}
|
||||
|
||||
// Entry expired - remove it
|
||||
await _cache.InvalidateAsync(cacheKey, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _cacheMisses);
|
||||
return (null, false);
|
||||
}
|
||||
|
||||
public async Task<ReachabilityFactsBatch> GetBatchAsync(
|
||||
IReadOnlyList<ReachabilityFactKey> keys,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var found = new Dictionary<ReachabilityFactKey, ReachabilityFact>();
|
||||
var notFound = new List<ReachabilityFactKey>();
|
||||
var cacheHits = 0;
|
||||
var cacheMisses = 0;
|
||||
|
||||
foreach (var key in keys)
|
||||
{
|
||||
var (fact, hit) = await GetAsync(key, cancellationToken).ConfigureAwait(false);
|
||||
if (fact != null)
|
||||
{
|
||||
found[key] = fact;
|
||||
cacheHits++;
|
||||
}
|
||||
else
|
||||
{
|
||||
notFound.Add(key);
|
||||
cacheMisses++;
|
||||
}
|
||||
}
|
||||
|
||||
return new ReachabilityFactsBatch
|
||||
{
|
||||
Found = found,
|
||||
NotFound = notFound,
|
||||
CacheHits = cacheHits,
|
||||
CacheMisses = cacheMisses,
|
||||
};
|
||||
}
|
||||
|
||||
public async Task SetAsync(
|
||||
ReachabilityFactKey key,
|
||||
ReachabilityFact fact,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var cacheKey = key.ToCacheKey();
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var ttl = fact.ExpiresAt.HasValue && fact.ExpiresAt.Value > now
|
||||
? fact.ExpiresAt.Value - now
|
||||
: _defaultTtl;
|
||||
|
||||
if (ttl <= TimeSpan.Zero)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var options = new CacheEntryOptions { TimeToLive = ttl };
|
||||
await _cache.SetAsync(cacheKey, fact, options, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task SetBatchAsync(
|
||||
IReadOnlyDictionary<ReachabilityFactKey, ReachabilityFact> facts,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
|
||||
foreach (var (key, fact) in facts)
|
||||
{
|
||||
var cacheKey = key.ToCacheKey();
|
||||
var ttl = fact.ExpiresAt.HasValue && fact.ExpiresAt.Value > now
|
||||
? fact.ExpiresAt.Value - now
|
||||
: _defaultTtl;
|
||||
|
||||
if (ttl <= TimeSpan.Zero)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var options = new CacheEntryOptions { TimeToLive = ttl };
|
||||
await _cache.SetAsync(cacheKey, fact, options, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task InvalidateAsync(ReachabilityFactKey key, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var cacheKey = key.ToCacheKey();
|
||||
await _cache.InvalidateAsync(cacheKey, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task InvalidateTenantAsync(string tenantId, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// Pattern: rf:<tenantId>:*
|
||||
var pattern = $"{tenantId}:*";
|
||||
var count = await _cache.InvalidateByPatternAsync(pattern, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
_logger.LogDebug("Invalidated {Count} cache entries for tenant {TenantId}", count, tenantId);
|
||||
}
|
||||
|
||||
public ReachabilityFactsCacheStats GetStats()
|
||||
{
|
||||
return new ReachabilityFactsCacheStats
|
||||
{
|
||||
TotalRequests = Interlocked.Read(ref _totalRequests),
|
||||
CacheHits = Interlocked.Read(ref _cacheHits),
|
||||
CacheMisses = Interlocked.Read(ref _cacheMisses),
|
||||
ItemCount = 0, // Not available from IDistributedCache
|
||||
EvictionCount = 0, // Not available from IDistributedCache
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user