// -----------------------------------------------------------------------------
// PostgresReachabilityCache.cs
// Sprint: SPRINT_3700_0006_0001_incremental_cache (CACHE-004)
// Description: PostgreSQL implementation of IReachabilityCache.
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using Npgsql;
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Scanner.Reachability.Cache;
/// <summary>
/// PostgreSQL implementation of the reachability cache.
/// </summary>
public sealed class PostgresReachabilityCache : IReachabilityCache
{
// Source of the connections used by every cache operation (see OpenConnectionAsync).
private readonly Services.PostgresReachabilityDataSourceProvider _dataSourceProvider;
// Receives the debug/information messages emitted on cache hits, writes and invalidations.
private readonly ILogger _logger;
// Clock used to compute expires_at on write and the remaining TTL on read; falls back to TimeProvider.System.
private readonly TimeProvider _timeProvider;
/// <summary>
/// Creates a cache that wraps <paramref name="connectionString"/> in its own
/// <c>PostgresReachabilityDataSourceProvider</c> and delegates to the provider-based constructor.
/// </summary>
/// <param name="connectionString">Connection string handed to the data source provider.</param>
/// <param name="logger">Logger for cache diagnostics; must not be null.</param>
/// <param name="timeProvider">Optional clock; <see cref="TimeProvider.System"/> is used when omitted.</param>
public PostgresReachabilityCache(string connectionString, ILogger logger, TimeProvider? timeProvider = null)
    : this(
        dataSourceProvider: new Services.PostgresReachabilityDataSourceProvider(connectionString),
        logger: logger,
        timeProvider: timeProvider)
{
}
/// <summary>
/// Creates a cache on top of an existing data source provider.
/// </summary>
/// <param name="dataSourceProvider">Provider used to open connections; must not be null.</param>
/// <param name="logger">Logger for cache diagnostics; must not be null.</param>
/// <param name="timeProvider">Optional clock; <see cref="TimeProvider.System"/> is used when null.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="dataSourceProvider"/> or <paramref name="logger"/> is null.
/// </exception>
internal PostgresReachabilityCache(
    Services.PostgresReachabilityDataSourceProvider dataSourceProvider,
    ILogger logger,
    TimeProvider? timeProvider = null)
{
    // Validate in declaration order so the reported parameter name matches the first offender.
    ArgumentNullException.ThrowIfNull(dataSourceProvider);
    ArgumentNullException.ThrowIfNull(logger);

    _dataSourceProvider = dataSourceProvider;
    _logger = logger;
    _timeProvider = timeProvider is null ? TimeProvider.System : timeProvider;
}
/// <summary>
/// Loads the unexpired cache entry stored for a service and graph hash, together with all of its pairs.
/// </summary>
/// <remarks>
/// A hit is recorded through <c>UpdateStatsAsync</c>; a miss returns before any statistics are touched.
/// Expiry is filtered with the database clock (<c>NOW()</c>), while the returned time-to-live is
/// computed from the injected <see cref="TimeProvider"/>.
/// </remarks>
/// <param name="serviceId">Service whose cache entry is requested.</param>
/// <param name="graphHash">Graph hash the entry was stored under.</param>
/// <param name="cancellationToken">Token forwarded to every database call.</param>
/// <returns>The cached result, or <c>null</c> when no unexpired entry matches.</returns>
public async Task<CachedReachabilityResult?> GetAsync(
    string serviceId,
    string graphHash,
    CancellationToken cancellationToken = default)
{
    await using var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

    // Get cache entry
    const string entrySql = """
        SELECT id, cached_at, expires_at, entry_point_count, sink_count
        FROM reach_cache_entries
        WHERE service_id = @serviceId AND graph_hash = @graphHash
        AND (expires_at IS NULL OR expires_at > NOW())
        """;

    Guid entryId;
    DateTime cachedAt;
    DateTimeOffset? expiresAt;
    int entryPointCount;
    int sinkCount;

    // Scope the command and reader so both are disposed before the next command is issued:
    // the connection can only serve one active reader at a time.
    await using (var entryCmd = new NpgsqlCommand(entrySql, conn))
    {
        entryCmd.Parameters.AddWithValue("@serviceId", serviceId);
        entryCmd.Parameters.AddWithValue("@graphHash", graphHash);

        await using var entryReader = await entryCmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        if (!await entryReader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            return null; // Cache miss
        }

        entryId = entryReader.GetGuid(0);
        cachedAt = entryReader.GetDateTime(1);
        expiresAt = entryReader.IsDBNull(2) ? (DateTimeOffset?)null : entryReader.GetDateTime(2);
        entryPointCount = entryReader.GetInt32(3);
        sinkCount = entryReader.GetInt32(4);
    }

    // Get cached pairs
    const string pairsSql = """
        SELECT entry_method_key, sink_method_key, is_reachable, path_length, confidence, computed_at
        FROM reach_cache_pairs
        WHERE cache_entry_id = @entryId
        """;

    var pairs = new List<ReachablePairResult>();

    // Same scoping as above: the pairs reader must be released before UpdateStatsAsync
    // executes its command on this connection.
    await using (var pairsCmd = new NpgsqlCommand(pairsSql, conn))
    {
        pairsCmd.Parameters.AddWithValue("@entryId", entryId);

        await using var pairsReader = await pairsCmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        while (await pairsReader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            pairs.Add(new ReachablePairResult
            {
                EntryMethodKey = pairsReader.GetString(0),
                SinkMethodKey = pairsReader.GetString(1),
                IsReachable = pairsReader.GetBoolean(2),
                PathLength = pairsReader.IsDBNull(3) ? null : pairsReader.GetInt32(3),
                Confidence = pairsReader.GetDouble(4),
                ComputedAt = pairsReader.GetDateTime(5)
            });
        }
    }

    // Update stats
    await UpdateStatsAsync(conn, serviceId, isHit: true, cancellationToken: cancellationToken).ConfigureAwait(false);

    _logger.LogDebug("Cache hit for {ServiceId}, {PairCount} pairs", serviceId, pairs.Count);

    return new CachedReachabilityResult
    {
        ServiceId = serviceId,
        GraphHash = graphHash,
        CachedAt = cachedAt,
        TimeToLive = expiresAt.HasValue ? expiresAt.Value - _timeProvider.GetUtcNow() : null,
        ReachablePairs = pairs,
        EntryPointCount = entryPointCount,
        SinkCount = sinkCount
    };
}
/// <summary>
/// Replaces the cache entry for the entry's service and graph hash with the supplied data.
/// </summary>
/// <remarks>
/// The delete of the previous entry, the insert of the new entry and the bulk insert of its pairs
/// run inside one transaction. The statistics update and the log message happen only after the
/// commit has succeeded, so a failure there never attempts to roll back a completed transaction.
/// </remarks>
/// <param name="entry">Entry to store; must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the database calls.</param>
/// <exception cref="ArgumentNullException"><paramref name="entry"/> is null.</exception>
public async Task SetAsync(
    ReachabilityCacheEntry entry,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(entry);

    await using var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

    await using (var tx = await conn.BeginTransactionAsync(cancellationToken).ConfigureAwait(false))
    {
        try
        {
            // Delete existing entry for this service/hash
            const string deleteSql = """
                DELETE FROM reach_cache_entries
                WHERE service_id = @serviceId AND graph_hash = @graphHash
                """;
            await using var deleteCmd = new NpgsqlCommand(deleteSql, conn, tx);
            deleteCmd.Parameters.AddWithValue("@serviceId", entry.ServiceId);
            deleteCmd.Parameters.AddWithValue("@graphHash", entry.GraphHash);
            await deleteCmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);

            // Insert new cache entry
            var reachableCount = 0;
            var unreachableCount = 0;
            foreach (var pair in entry.ReachablePairs)
            {
                if (pair.IsReachable)
                {
                    reachableCount++;
                }
                else
                {
                    unreachableCount++;
                }
            }

            // No TTL means the entry never expires (expires_at stays NULL).
            var expiresAt = entry.TimeToLive.HasValue
                ? (object)_timeProvider.GetUtcNow().Add(entry.TimeToLive.Value)
                : DBNull.Value;

            const string insertEntrySql = """
                INSERT INTO reach_cache_entries
                (service_id, graph_hash, sbom_hash, entry_point_count, sink_count,
                pair_count, reachable_count, unreachable_count, expires_at)
                VALUES
                (@serviceId, @graphHash, @sbomHash, @entryPointCount, @sinkCount,
                @pairCount, @reachableCount, @unreachableCount, @expiresAt)
                RETURNING id
                """;
            await using var insertCmd = new NpgsqlCommand(insertEntrySql, conn, tx);
            insertCmd.Parameters.AddWithValue("@serviceId", entry.ServiceId);
            insertCmd.Parameters.AddWithValue("@graphHash", entry.GraphHash);
            insertCmd.Parameters.AddWithValue("@sbomHash", entry.SbomHash ?? (object)DBNull.Value);
            insertCmd.Parameters.AddWithValue("@entryPointCount", entry.EntryPointCount);
            insertCmd.Parameters.AddWithValue("@sinkCount", entry.SinkCount);
            insertCmd.Parameters.AddWithValue("@pairCount", entry.ReachablePairs.Count);
            insertCmd.Parameters.AddWithValue("@reachableCount", reachableCount);
            insertCmd.Parameters.AddWithValue("@unreachableCount", unreachableCount);
            insertCmd.Parameters.AddWithValue("@expiresAt", expiresAt);
            var entryId = (Guid)(await insertCmd.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false))!;

            // Insert pairs in batches
            if (entry.ReachablePairs.Count > 0)
            {
                await InsertPairsBatchAsync(conn, tx, entryId, entry.ReachablePairs, cancellationToken).ConfigureAwait(false);
            }

            await tx.CommitAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            try
            {
                // CancellationToken.None: the rollback must still run when the failure was a cancellation.
                await tx.RollbackAsync(CancellationToken.None).ConfigureAwait(false);
            }
            catch (Exception rollbackEx)
            {
                // Log instead of throwing so the original failure is the one that reaches the caller.
                _logger.LogWarning(
                    rollbackEx,
                    "Rollback failed while caching pairs for {ServiceId}, graph {Hash}",
                    entry.ServiceId, entry.GraphHash);
            }

            throw;
        }
    }

    // Update stats (after the commit, outside the transaction's error handling)
    await UpdateStatsAsync(conn, entry.ServiceId, isHit: false, entry.GraphHash, cancellationToken).ConfigureAwait(false);

    _logger.LogInformation(
        "Cached {PairCount} pairs for {ServiceId}, graph {Hash}",
        entry.ReachablePairs.Count, entry.ServiceId, entry.GraphHash);
}
/// <summary>
/// Looks up a single entry/sink pair among the unexpired cache entries of a service.
/// </summary>
/// <remarks>
/// When several unexpired entries contain the pair, the one belonging to the most recently
/// cached entry (<c>ORDER BY e.cached_at DESC LIMIT 1</c>) is returned.
/// </remarks>
/// <param name="serviceId">Service whose cache is searched.</param>
/// <param name="entryMethodKey">Entry method key of the pair.</param>
/// <param name="sinkMethodKey">Sink method key of the pair.</param>
/// <param name="cancellationToken">Token forwarded to the database calls.</param>
/// <returns>The cached pair, or <c>null</c> when no unexpired entry contains it.</returns>
public async Task<ReachablePairResult?> GetReachablePairAsync(
    string serviceId,
    string entryMethodKey,
    string sinkMethodKey,
    CancellationToken cancellationToken = default)
{
    await using var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

    const string sql = """
        SELECT p.is_reachable, p.path_length, p.confidence, p.computed_at
        FROM reach_cache_pairs p
        JOIN reach_cache_entries e ON p.cache_entry_id = e.id
        WHERE e.service_id = @serviceId
        AND p.entry_method_key = @entryKey
        AND p.sink_method_key = @sinkKey
        AND (e.expires_at IS NULL OR e.expires_at > NOW())
        ORDER BY e.cached_at DESC
        LIMIT 1
        """;

    await using var cmd = new NpgsqlCommand(sql, conn);
    cmd.Parameters.AddWithValue("@serviceId", serviceId);
    cmd.Parameters.AddWithValue("@entryKey", entryMethodKey);
    cmd.Parameters.AddWithValue("@sinkKey", sinkMethodKey);

    await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
    if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
    {
        return null;
    }

    // The keys are echoed from the arguments; only the remaining columns come from the row.
    return new ReachablePairResult
    {
        EntryMethodKey = entryMethodKey,
        SinkMethodKey = sinkMethodKey,
        IsReachable = reader.GetBoolean(0),
        PathLength = reader.IsDBNull(1) ? null : reader.GetInt32(1),
        Confidence = reader.GetDouble(2),
        ComputedAt = reader.GetDateTime(3)
    };
}
/// <summary>
/// Deletes every cache entry of a service and, when anything was deleted, stamps the
/// invalidation time in the statistics table.
/// </summary>
/// <param name="serviceId">Service whose cache entries are removed.</param>
/// <param name="affectedMethodKeys">
/// Currently ignored: the whole cache of the service is invalidated regardless of which
/// method keys are passed.
/// </param>
/// <param name="cancellationToken">Token forwarded to the database calls.</param>
/// <returns>The number of rows deleted from <c>reach_cache_entries</c>.</returns>
public async Task<int> InvalidateAsync(
    string serviceId,
    IEnumerable<string> affectedMethodKeys,
    CancellationToken cancellationToken = default)
{
    await using var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

    // For now, invalidate entire cache for service
    // More granular invalidation would require additional indices
    const string sql = """
        DELETE FROM reach_cache_entries
        WHERE service_id = @serviceId
        """;

    await using var cmd = new NpgsqlCommand(sql, conn);
    cmd.Parameters.AddWithValue("@serviceId", serviceId);

    var deleted = await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    if (deleted > 0)
    {
        await UpdateInvalidationTimeAsync(conn, serviceId, cancellationToken).ConfigureAwait(false);
        _logger.LogInformation("Invalidated {Count} cache entries for {ServiceId}", deleted, serviceId);
    }

    return deleted;
}
/// <summary>
/// Removes every cache entry of a service by delegating to <c>InvalidateAsync</c> with an
/// empty key list; the deleted-row count is discarded.
/// </summary>
/// <param name="serviceId">Service whose cache entries are removed.</param>
/// <param name="cancellationToken">Token forwarded to the database calls.</param>
public async Task InvalidateAllAsync(
    string serviceId,
    CancellationToken cancellationToken = default)
{
    await InvalidateAsync(serviceId, Array.Empty<string>(), cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Reads the statistics row of a service and the number of pairs held by its unexpired entries.
/// </summary>
/// <param name="serviceId">Service whose statistics are requested.</param>
/// <param name="cancellationToken">Token forwarded to the database calls.</param>
/// <returns>
/// The statistics for the service. When <c>reach_cache_stats</c> has no row for it, an instance
/// with only <c>ServiceId</c> set is returned and the pair count is not queried.
/// </returns>
public async Task<CacheStatistics> GetStatisticsAsync(
    string serviceId,
    CancellationToken cancellationToken = default)
{
    await using var conn = await OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

    const string sql = """
        SELECT total_hits, total_misses, full_recomputes, incremental_computes,
        current_graph_hash, last_populated_at, last_invalidated_at
        FROM reach_cache_stats
        WHERE service_id = @serviceId
        """;

    long hitCount;
    long missCount;
    string? currentGraphHash;
    DateTime? lastPopulatedAt;
    DateTime? lastInvalidatedAt;

    // Copy every column into locals while the reader is still positioned on the row;
    // once the reader is closed its values can no longer be read, and it must be closed
    // before the count query can run on the same connection.
    await using (var cmd = new NpgsqlCommand(sql, conn))
    {
        cmd.Parameters.AddWithValue("@serviceId", serviceId);

        await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            return new CacheStatistics { ServiceId = serviceId };
        }

        hitCount = reader.GetInt64(0);
        missCount = reader.GetInt64(1);
        currentGraphHash = reader.IsDBNull(4) ? null : reader.GetString(4);
        lastPopulatedAt = reader.IsDBNull(5) ? null : reader.GetDateTime(5);
        lastInvalidatedAt = reader.IsDBNull(6) ? null : reader.GetDateTime(6);
    }

    // Get cached pair count
    const string countSql = """
        SELECT COALESCE(SUM(pair_count), 0)
        FROM reach_cache_entries
        WHERE service_id = @serviceId AND (expires_at IS NULL OR expires_at > NOW())
        """;

    await using var countCmd = new NpgsqlCommand(countSql, conn);
    countCmd.Parameters.AddWithValue("@serviceId", serviceId);
    var pairCount = Convert.ToInt32(await countCmd.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false));

    return new CacheStatistics
    {
        ServiceId = serviceId,
        CachedPairCount = pairCount,
        HitCount = hitCount,
        MissCount = missCount,
        LastPopulatedAt = lastPopulatedAt,
        LastInvalidatedAt = lastInvalidatedAt,
        CurrentGraphHash = currentGraphHash
    };
}
/// <summary>
/// Bulk-inserts the pairs of a cache entry into <c>reach_cache_pairs</c> using a binary COPY.
/// </summary>
/// <param name="conn">Open connection the COPY is started on.</param>
/// <param name="tx">
/// Transaction of the caller. It is not referenced by the body; the COPY is issued directly on
/// <paramref name="conn"/>. NOTE(review): presumably it runs inside the transaction active on
/// that connection; confirm.
/// </param>
/// <param name="entryId">Id of the owning cache entry, written to every row.</param>
/// <param name="pairs">Pairs to insert, one row each.</param>
/// <param name="cancellationToken">Token forwarded to the importer calls.</param>
private async Task InsertPairsBatchAsync(
    NpgsqlConnection conn,
    NpgsqlTransaction tx,
    Guid entryId,
    IReadOnlyList<ReachablePairResult> pairs,
    CancellationToken cancellationToken)
{
    await using var writer = await conn.BeginBinaryImportAsync(
        "COPY reach_cache_pairs (cache_entry_id, entry_method_key, sink_method_key, is_reachable, path_length, confidence, computed_at) FROM STDIN (FORMAT BINARY)",
        cancellationToken).ConfigureAwait(false);

    foreach (var pair in pairs)
    {
        // Values must be written in the column order declared in the COPY statement.
        await writer.StartRowAsync(cancellationToken).ConfigureAwait(false);
        await writer.WriteAsync(entryId, NpgsqlTypes.NpgsqlDbType.Uuid, cancellationToken).ConfigureAwait(false);
        await writer.WriteAsync(pair.EntryMethodKey, NpgsqlTypes.NpgsqlDbType.Text, cancellationToken).ConfigureAwait(false);
        await writer.WriteAsync(pair.SinkMethodKey, NpgsqlTypes.NpgsqlDbType.Text, cancellationToken).ConfigureAwait(false);
        await writer.WriteAsync(pair.IsReachable, NpgsqlTypes.NpgsqlDbType.Boolean, cancellationToken).ConfigureAwait(false);

        if (pair.PathLength.HasValue)
        {
            await writer.WriteAsync(pair.PathLength.Value, NpgsqlTypes.NpgsqlDbType.Integer, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await writer.WriteNullAsync(cancellationToken).ConfigureAwait(false);
        }

        await writer.WriteAsync(pair.Confidence, NpgsqlTypes.NpgsqlDbType.Double, cancellationToken).ConfigureAwait(false);
        await writer.WriteAsync(pair.ComputedAt.UtcDateTime, NpgsqlTypes.NpgsqlDbType.TimestampTz, cancellationToken).ConfigureAwait(false);
    }

    // Without CompleteAsync the importer is disposed without committing the written rows.
    await writer.CompleteAsync(cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Invokes the <c>update_reach_cache_stats</c> database function for a service.
/// </summary>
/// <param name="conn">Open connection to run the call on.</param>
/// <param name="serviceId">Service whose statistics are updated.</param>
/// <param name="isHit">Value passed as the function's second argument.</param>
/// <param name="graphHash">Value passed as the fourth argument; SQL NULL when omitted.</param>
/// <param name="cancellationToken">Token forwarded to the command execution.</param>
private static async Task UpdateStatsAsync(
    NpgsqlConnection conn,
    string serviceId,
    bool isHit,
    string? graphHash = null,
    CancellationToken cancellationToken = default)
{
    // The third argument of the function is always passed as a literal NULL.
    const string statsCall = "SELECT update_reach_cache_stats(@serviceId, @isHit, NULL, @graphHash)";

    object graphHashValue = graphHash is null ? DBNull.Value : graphHash;

    await using var statsCmd = new NpgsqlCommand(statsCall, conn);
    var parameters = statsCmd.Parameters;
    parameters.AddWithValue("@serviceId", serviceId);
    parameters.AddWithValue("@isHit", isHit);
    parameters.AddWithValue("@graphHash", graphHashValue);

    _ = await statsCmd.ExecuteNonQueryAsync(cancellationToken);
}
/// <summary>
/// Sets <c>last_invalidated_at</c> and <c>updated_at</c> to the database's current time on the
/// statistics row of a service.
/// </summary>
/// <param name="conn">Open connection to run the update on.</param>
/// <param name="serviceId">Service whose statistics row is updated.</param>
/// <param name="cancellationToken">Token forwarded to the command execution.</param>
private static async Task UpdateInvalidationTimeAsync(
    NpgsqlConnection conn,
    string serviceId,
    CancellationToken cancellationToken)
{
    const string touchSql = """
        UPDATE reach_cache_stats
        SET last_invalidated_at = NOW(), updated_at = NOW()
        WHERE service_id = @serviceId
        """;

    await using var touchCmd = new NpgsqlCommand(touchSql, conn);
    touchCmd.Parameters.AddWithValue("@serviceId", serviceId);

    // The affected-row count is not needed; a missing stats row simply updates nothing.
    _ = await touchCmd.ExecuteNonQueryAsync(cancellationToken);
}
/// <summary>
/// Obtains a connection by forwarding to the data source provider's <c>OpenConnectionAsync</c>.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the provider.</param>
/// <returns>The connection every cache operation in this class runs its commands on.</returns>
private ValueTask<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
    => _dataSourceProvider.OpenConnectionAsync(cancellationToken);
}