devops folders consolidate
This commit is contained in:
@@ -0,0 +1,421 @@
|
||||
// -----------------------------------------------------------------------------
|
||||
// ValkeyPackageIdfService.cs
|
||||
// Sprint: SPRINT_20260125_001_Concelier_linkset_correlation_v2
|
||||
// Task: CORR-V2-007
|
||||
// Description: Valkey-backed implementation of IPackageIdfService
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
using System.Diagnostics;
|
||||
using System.Globalization;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace StellaOps.Concelier.Cache.Valkey;
|
||||
|
||||
/// <summary>
/// Valkey-backed implementation of <see cref="IPackageIdfService"/>.
/// Provides caching for package IDF (Inverse Document Frequency) weights
/// used in linkset correlation scoring.
/// </summary>
/// <remarks>
/// <para>
/// This service caches pre-computed IDF weights with hourly refresh.
/// On cache miss, it returns null to signal the caller should use uniform weights.
/// </para>
/// <para>
/// Key features:
/// - Batch operations for efficient multi-package lookups
/// - Graceful degradation on Valkey errors (returns null, logs warning)
/// - TTL-based expiration with configurable refresh intervals
/// - OpenTelemetry metrics for monitoring cache performance
/// </para>
/// </remarks>
public sealed class ValkeyPackageIdfService : IPackageIdfService
{
    private readonly ConcelierCacheConnectionFactory _connectionFactory;
    private readonly ConcelierCacheOptions _cacheOptions;
    private readonly PackageIdfOptions _idfOptions;
    private readonly PackageIdfMetrics? _metrics;
    private readonly ILogger<ValkeyPackageIdfService>? _logger;

    /// <summary>
    /// Initializes a new instance of <see cref="ValkeyPackageIdfService"/>.
    /// </summary>
    /// <param name="connectionFactory">Factory providing the Valkey database connection. Required.</param>
    /// <param name="cacheOptions">General cache options (enable flag, key prefix). Falls back to defaults when null.</param>
    /// <param name="idfOptions">IDF-specific options (enable flag, TTLs, minimum weight threshold, normalization). Falls back to defaults when null.</param>
    /// <param name="metrics">Optional metrics sink; when null, no metrics are recorded and timing is skipped.</param>
    /// <param name="logger">Optional logger; when null, failures degrade silently.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="connectionFactory"/> is null.</exception>
    public ValkeyPackageIdfService(
        ConcelierCacheConnectionFactory connectionFactory,
        IOptions<ConcelierCacheOptions> cacheOptions,
        IOptions<PackageIdfOptions> idfOptions,
        PackageIdfMetrics? metrics = null,
        ILogger<ValkeyPackageIdfService>? logger = null)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _cacheOptions = cacheOptions?.Value ?? new ConcelierCacheOptions();
        _idfOptions = idfOptions?.Value ?? new PackageIdfOptions();
        _metrics = metrics;
        _logger = logger;
    }

    /// <inheritdoc />
    public bool IsEnabled => _cacheOptions.Enabled && _idfOptions.Enabled;

    /// <inheritdoc />
    public async Task<double?> GetIdfAsync(string packageName, CancellationToken cancellationToken = default)
    {
        if (!IsEnabled || string.IsNullOrWhiteSpace(packageName))
        {
            return null;
        }

        var sw = StartTiming();
        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
            var key = AdvisoryCacheKeys.IdfPackage(packageName, _cacheOptions.KeyPrefix);

            var cached = await db.StringGetAsync(key).ConfigureAwait(false);
            if (cached.HasValue && double.TryParse((string?)cached, NumberStyles.Float, CultureInfo.InvariantCulture, out var weight))
            {
                // Hit/miss counters are kept server-side so they survive process restarts.
                await db.StringIncrementAsync(AdvisoryCacheKeys.IdfStatsHits(_cacheOptions.KeyPrefix)).ConfigureAwait(false);
                _metrics?.RecordHit();
                _metrics?.RecordIdfWeight(weight);
                return weight;
            }

            await db.StringIncrementAsync(AdvisoryCacheKeys.IdfStatsMisses(_cacheOptions.KeyPrefix)).ConfigureAwait(false);
            _metrics?.RecordMiss();
            return null;
        }
        catch (Exception ex)
        {
            _logger?.LogWarning(ex, "Failed to get IDF for package {PackageName}", packageName);
            return null; // Graceful degradation: caller falls back to uniform weights.
        }
        finally
        {
            StopTiming(sw, "get");
        }
    }

    /// <inheritdoc />
    public async Task<IReadOnlyDictionary<string, double>> GetIdfBatchAsync(
        IEnumerable<string> packageNames,
        CancellationToken cancellationToken = default)
    {
        // Normalize the input once: drop blanks and duplicates; tolerate a null sequence.
        var names = packageNames?.Where(n => !string.IsNullOrWhiteSpace(n)).Distinct().ToArray()
            ?? Array.Empty<string>();

        if (!IsEnabled || names.Length == 0)
        {
            return new Dictionary<string, double>();
        }

        var sw = StartTiming();
        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
            var keys = names.Select(n => (RedisKey)AdvisoryCacheKeys.IdfPackage(n, _cacheOptions.KeyPrefix)).ToArray();

            // Single MGET round-trip; values align positionally with keys/names.
            var values = await db.StringGetAsync(keys).ConfigureAwait(false);

            var result = new Dictionary<string, double>(names.Length);
            var hits = 0;
            var misses = 0;

            for (var i = 0; i < names.Length; i++)
            {
                if (values[i].HasValue &&
                    double.TryParse((string?)values[i], NumberStyles.Float, CultureInfo.InvariantCulture, out var weight))
                {
                    result[names[i]] = weight;
                    hits++;
                    _metrics?.RecordIdfWeight(weight);
                }
                else
                {
                    // Missing entries are simply omitted; callers treat absence as "use uniform weight".
                    misses++;
                }
            }

            if (hits > 0) _metrics?.RecordHits(hits);
            if (misses > 0) _metrics?.RecordMisses(misses);

            return result;
        }
        catch (Exception ex)
        {
            _logger?.LogWarning(ex, "Failed to batch get IDF for {Count} packages", names.Length);
            return new Dictionary<string, double>(); // Graceful degradation.
        }
        finally
        {
            StopTiming(sw, "batch_get");
        }
    }

    /// <inheritdoc />
    public async Task SetIdfAsync(string packageName, double idfWeight, CancellationToken cancellationToken = default)
    {
        if (!IsEnabled || string.IsNullOrWhiteSpace(packageName))
        {
            return;
        }

        // Skip caching weights below threshold (very common packages contribute
        // negligible discrimination and would just bloat the keyspace).
        if (idfWeight < _idfOptions.MinIdfThreshold)
        {
            return;
        }

        var sw = StartTiming();
        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
            var key = AdvisoryCacheKeys.IdfPackage(packageName, _cacheOptions.KeyPrefix);
            // Fixed 6-decimal invariant formatting keeps stored values culture-stable and parseable.
            var value = idfWeight.ToString("F6", CultureInfo.InvariantCulture);

            await db.StringSetAsync(key, value, _idfOptions.IdfTtl).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger?.LogWarning(ex, "Failed to set IDF for package {PackageName}", packageName);
        }
        finally
        {
            StopTiming(sw, "set");
        }
    }

    /// <inheritdoc />
    public async Task SetIdfBatchAsync(
        IReadOnlyDictionary<string, double> idfWeights,
        CancellationToken cancellationToken = default)
    {
        if (!IsEnabled || idfWeights is null || idfWeights.Count == 0)
        {
            return;
        }

        var sw = StartTiming();
        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);

            var entries = idfWeights
                .Where(kv => !string.IsNullOrWhiteSpace(kv.Key) && kv.Value >= _idfOptions.MinIdfThreshold)
                .Select(kv => new KeyValuePair<RedisKey, RedisValue>(
                    AdvisoryCacheKeys.IdfPackage(kv.Key, _cacheOptions.KeyPrefix),
                    kv.Value.ToString("F6", CultureInfo.InvariantCulture)))
                .ToArray();

            if (entries.Length == 0)
            {
                return;
            }

            // Use a pipelined batch so all SETs (each with its own TTL) share one round-trip.
            var batch = db.CreateBatch();
            var tasks = new List<Task>(entries.Length);

            foreach (var entry in entries)
            {
                tasks.Add(batch.StringSetAsync(entry.Key, entry.Value, _idfOptions.IdfTtl));
            }

            batch.Execute();
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger?.LogWarning(ex, "Failed to batch set IDF for {Count} packages", idfWeights.Count);
        }
        finally
        {
            StopTiming(sw, "batch_set");
        }
    }

    /// <inheritdoc />
    public async Task UpdateCorpusStatsAsync(
        long corpusSize,
        IReadOnlyDictionary<string, long> documentFrequencies,
        CancellationToken cancellationToken = default)
    {
        if (!IsEnabled)
        {
            return;
        }

        // Guard against caller bugs instead of letting them surface as a swallowed NRE
        // in the catch-all below (which would log a misleading generic error).
        if (documentFrequencies is null)
        {
            _logger?.LogWarning("Skipping IDF corpus update: document frequencies dictionary is null");
            return;
        }

        // log(N / (1 + df)) is only meaningful for a positive corpus size; with N <= 0 the
        // formula produces -Infinity/NaN weights that silently poison normalization.
        if (corpusSize <= 0)
        {
            _logger?.LogWarning("Skipping IDF corpus update: corpus size {CorpusSize} is not positive", corpusSize);
            return;
        }

        var sw = StartTiming();
        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
            var prefix = _cacheOptions.KeyPrefix;

            // Update corpus size
            await db.StringSetAsync(
                AdvisoryCacheKeys.IdfCorpusSize(prefix),
                corpusSize.ToString(CultureInfo.InvariantCulture),
                _idfOptions.CorpusStatsTtl).ConfigureAwait(false);

            // Compute and cache IDF weights
            var idfWeights = new Dictionary<string, double>(documentFrequencies.Count);
            var maxIdf = 0.0;

            foreach (var (packageName, df) in documentFrequencies)
            {
                // IDF formula: log(N / (1 + df)); the +1 avoids division by zero for df == 0.
                var rawIdf = Math.Log((double)corpusSize / (1 + df));
                if (rawIdf > maxIdf) maxIdf = rawIdf;
                idfWeights[packageName] = rawIdf;
            }

            // Normalize to [0, 1] if configured (maxIdf > 0 guards against all-negative weights).
            if (_idfOptions.NormalizeScores && maxIdf > 0)
            {
                foreach (var key in idfWeights.Keys.ToArray())
                {
                    idfWeights[key] /= maxIdf;
                }
            }

            // Batch set the normalized IDF weights (sub-threshold weights are filtered there).
            await SetIdfBatchAsync(idfWeights, cancellationToken).ConfigureAwait(false);

            // Update document frequencies in a single pipelined batch.
            var batch = db.CreateBatch();
            var tasks = new List<Task>(documentFrequencies.Count);

            foreach (var (packageName, df) in documentFrequencies)
            {
                tasks.Add(batch.StringSetAsync(
                    AdvisoryCacheKeys.IdfDocumentFrequency(packageName, prefix),
                    df.ToString(CultureInfo.InvariantCulture),
                    _idfOptions.CorpusStatsTtl));
            }

            batch.Execute();
            await Task.WhenAll(tasks).ConfigureAwait(false);

            // Update last refresh timestamp using round-trip ("o") format for lossless parsing.
            await db.StringSetAsync(
                AdvisoryCacheKeys.IdfLastRefresh(prefix),
                DateTimeOffset.UtcNow.ToString("o", CultureInfo.InvariantCulture),
                _idfOptions.CorpusStatsTtl).ConfigureAwait(false);

            _metrics?.UpdateCorpusSize(corpusSize);
            _metrics?.UpdateCachedEntries(documentFrequencies.Count);
            _metrics?.RecordRefresh(documentFrequencies.Count);

            _logger?.LogInformation(
                "Updated IDF corpus: size={CorpusSize}, packages={PackageCount}",
                corpusSize,
                documentFrequencies.Count);
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Failed to update IDF corpus stats");
        }
        finally
        {
            StopTiming(sw, "refresh");
        }
    }

    /// <inheritdoc />
    public async Task<DateTimeOffset?> GetLastRefreshAsync(CancellationToken cancellationToken = default)
    {
        if (!IsEnabled)
        {
            return null;
        }

        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
            var key = AdvisoryCacheKeys.IdfLastRefresh(_cacheOptions.KeyPrefix);

            var cached = await db.StringGetAsync(key).ConfigureAwait(false);
            if (cached.HasValue &&
                DateTimeOffset.TryParse(cached, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out var timestamp))
            {
                return timestamp;
            }

            return null;
        }
        catch (Exception ex)
        {
            _logger?.LogWarning(ex, "Failed to get IDF last refresh timestamp");
            return null;
        }
    }

    /// <inheritdoc />
    public async Task InvalidateAsync(string packageName, CancellationToken cancellationToken = default)
    {
        if (!IsEnabled || string.IsNullOrWhiteSpace(packageName))
        {
            return;
        }

        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
            var prefix = _cacheOptions.KeyPrefix;

            // Remove both the cached IDF weight and its document-frequency counterpart.
            await Task.WhenAll(
                db.KeyDeleteAsync(AdvisoryCacheKeys.IdfPackage(packageName, prefix)),
                db.KeyDeleteAsync(AdvisoryCacheKeys.IdfDocumentFrequency(packageName, prefix))
            ).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger?.LogWarning(ex, "Failed to invalidate IDF for package {PackageName}", packageName);
        }
    }

    /// <inheritdoc />
    public async Task InvalidateAllAsync(CancellationToken cancellationToken = default)
    {
        if (!IsEnabled)
        {
            return;
        }

        try
        {
            var db = await _connectionFactory.GetDatabaseAsync(cancellationToken).ConfigureAwait(false);
            var prefix = _cacheOptions.KeyPrefix;

            // Delete stats keys
            await Task.WhenAll(
                db.KeyDeleteAsync(AdvisoryCacheKeys.IdfCorpusSize(prefix)),
                db.KeyDeleteAsync(AdvisoryCacheKeys.IdfLastRefresh(prefix)),
                db.KeyDeleteAsync(AdvisoryCacheKeys.IdfStatsHits(prefix)),
                db.KeyDeleteAsync(AdvisoryCacheKeys.IdfStatsMisses(prefix))
            ).ConfigureAwait(false);

            // Note: Scanning and deleting all idf:pkg:* keys would require SCAN,
            // which is expensive. For now, rely on TTL expiration.
            _logger?.LogInformation("Invalidated IDF stats; individual package keys will expire via TTL");
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Failed to invalidate all IDF cache");
        }
    }

    /// <summary>
    /// Starts a stopwatch for latency metrics, or returns null when metrics are disabled
    /// so the timing overhead is skipped entirely.
    /// </summary>
    private Stopwatch? StartTiming()
    {
        if (_metrics is null) return null;
        return Stopwatch.StartNew();
    }

    /// <summary>
    /// Stops the stopwatch started by <see cref="StartTiming"/> and records the elapsed
    /// milliseconds under the given operation label. No-op when timing was not started.
    /// </summary>
    private void StopTiming(Stopwatch? sw, string operation)
    {
        if (sw is null || _metrics is null) return;
        sw.Stop();
        _metrics.RecordLatency(sw.Elapsed.TotalMilliseconds, operation);
    }
}
|
||||
Reference in New Issue
Block a user