Add Canonical JSON serialization library with tests and documentation
- Implemented CanonJson class for deterministic JSON serialization and hashing. - Added unit tests for CanonJson functionality, covering various scenarios including key sorting, handling of nested objects, arrays, and special characters. - Created project files for the Canonical JSON library and its tests, including necessary package references. - Added README.md for library usage and API reference. - Introduced RabbitMqIntegrationFactAttribute for conditional RabbitMQ integration tests.
This commit is contained in:
@@ -0,0 +1,391 @@
|
||||
// -----------------------------------------------------------------------------
|
||||
// PostgresReachabilityCache.cs
|
||||
// Sprint: SPRINT_3700_0006_0001_incremental_cache (CACHE-004)
|
||||
// Description: PostgreSQL implementation of IReachabilityCache.
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Data;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
|
||||
namespace StellaOps.Scanner.Reachability.Cache;
|
||||
|
||||
/// <summary>
|
||||
/// PostgreSQL implementation of the reachability cache.
|
||||
/// </summary>
|
||||
public sealed class PostgresReachabilityCache : IReachabilityCache
|
||||
{
|
||||
private readonly string _connectionString;
|
||||
private readonly ILogger<PostgresReachabilityCache> _logger;
|
||||
|
||||
public PostgresReachabilityCache(
|
||||
string connectionString,
|
||||
ILogger<PostgresReachabilityCache> logger)
|
||||
{
|
||||
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<CachedReachabilityResult?> GetAsync(
|
||||
string serviceId,
|
||||
string graphHash,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await using var conn = new NpgsqlConnection(_connectionString);
|
||||
await conn.OpenAsync(cancellationToken);
|
||||
|
||||
// Get cache entry
|
||||
const string entrySql = """
|
||||
SELECT id, cached_at, expires_at, entry_point_count, sink_count
|
||||
FROM reach_cache_entries
|
||||
WHERE service_id = @serviceId AND graph_hash = @graphHash
|
||||
AND (expires_at IS NULL OR expires_at > NOW())
|
||||
""";
|
||||
|
||||
await using var entryCmd = new NpgsqlCommand(entrySql, conn);
|
||||
entryCmd.Parameters.AddWithValue("@serviceId", serviceId);
|
||||
entryCmd.Parameters.AddWithValue("@graphHash", graphHash);
|
||||
|
||||
await using var entryReader = await entryCmd.ExecuteReaderAsync(cancellationToken);
|
||||
|
||||
if (!await entryReader.ReadAsync(cancellationToken))
|
||||
{
|
||||
return null; // Cache miss
|
||||
}
|
||||
|
||||
var entryId = entryReader.GetGuid(0);
|
||||
var cachedAt = entryReader.GetDateTime(1);
|
||||
var expiresAt = entryReader.IsDBNull(2) ? (DateTimeOffset?)null : entryReader.GetDateTime(2);
|
||||
var entryPointCount = entryReader.GetInt32(3);
|
||||
var sinkCount = entryReader.GetInt32(4);
|
||||
|
||||
await entryReader.CloseAsync();
|
||||
|
||||
// Get cached pairs
|
||||
const string pairsSql = """
|
||||
SELECT entry_method_key, sink_method_key, is_reachable, path_length, confidence, computed_at
|
||||
FROM reach_cache_pairs
|
||||
WHERE cache_entry_id = @entryId
|
||||
""";
|
||||
|
||||
await using var pairsCmd = new NpgsqlCommand(pairsSql, conn);
|
||||
pairsCmd.Parameters.AddWithValue("@entryId", entryId);
|
||||
|
||||
var pairs = new List<ReachablePairResult>();
|
||||
await using var pairsReader = await pairsCmd.ExecuteReaderAsync(cancellationToken);
|
||||
|
||||
while (await pairsReader.ReadAsync(cancellationToken))
|
||||
{
|
||||
pairs.Add(new ReachablePairResult
|
||||
{
|
||||
EntryMethodKey = pairsReader.GetString(0),
|
||||
SinkMethodKey = pairsReader.GetString(1),
|
||||
IsReachable = pairsReader.GetBoolean(2),
|
||||
PathLength = pairsReader.IsDBNull(3) ? null : pairsReader.GetInt32(3),
|
||||
Confidence = pairsReader.GetDouble(4),
|
||||
ComputedAt = pairsReader.GetDateTime(5)
|
||||
});
|
||||
}
|
||||
|
||||
// Update stats
|
||||
await UpdateStatsAsync(conn, serviceId, isHit: true, cancellationToken: cancellationToken);
|
||||
|
||||
_logger.LogDebug("Cache hit for {ServiceId}, {PairCount} pairs", serviceId, pairs.Count);
|
||||
|
||||
return new CachedReachabilityResult
|
||||
{
|
||||
ServiceId = serviceId,
|
||||
GraphHash = graphHash,
|
||||
CachedAt = cachedAt,
|
||||
TimeToLive = expiresAt.HasValue ? expiresAt.Value - DateTimeOffset.UtcNow : null,
|
||||
ReachablePairs = pairs,
|
||||
EntryPointCount = entryPointCount,
|
||||
SinkCount = sinkCount
|
||||
};
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task SetAsync(
|
||||
ReachabilityCacheEntry entry,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entry);
|
||||
|
||||
await using var conn = new NpgsqlConnection(_connectionString);
|
||||
await conn.OpenAsync(cancellationToken);
|
||||
await using var tx = await conn.BeginTransactionAsync(cancellationToken);
|
||||
|
||||
try
|
||||
{
|
||||
// Delete existing entry for this service/hash
|
||||
const string deleteSql = """
|
||||
DELETE FROM reach_cache_entries
|
||||
WHERE service_id = @serviceId AND graph_hash = @graphHash
|
||||
""";
|
||||
|
||||
await using var deleteCmd = new NpgsqlCommand(deleteSql, conn, tx);
|
||||
deleteCmd.Parameters.AddWithValue("@serviceId", entry.ServiceId);
|
||||
deleteCmd.Parameters.AddWithValue("@graphHash", entry.GraphHash);
|
||||
await deleteCmd.ExecuteNonQueryAsync(cancellationToken);
|
||||
|
||||
// Insert new cache entry
|
||||
var reachableCount = 0;
|
||||
var unreachableCount = 0;
|
||||
foreach (var pair in entry.ReachablePairs)
|
||||
{
|
||||
if (pair.IsReachable) reachableCount++;
|
||||
else unreachableCount++;
|
||||
}
|
||||
|
||||
var expiresAt = entry.TimeToLive.HasValue
|
||||
? (object)DateTimeOffset.UtcNow.Add(entry.TimeToLive.Value)
|
||||
: DBNull.Value;
|
||||
|
||||
const string insertEntrySql = """
|
||||
INSERT INTO reach_cache_entries
|
||||
(service_id, graph_hash, sbom_hash, entry_point_count, sink_count,
|
||||
pair_count, reachable_count, unreachable_count, expires_at)
|
||||
VALUES
|
||||
(@serviceId, @graphHash, @sbomHash, @entryPointCount, @sinkCount,
|
||||
@pairCount, @reachableCount, @unreachableCount, @expiresAt)
|
||||
RETURNING id
|
||||
""";
|
||||
|
||||
await using var insertCmd = new NpgsqlCommand(insertEntrySql, conn, tx);
|
||||
insertCmd.Parameters.AddWithValue("@serviceId", entry.ServiceId);
|
||||
insertCmd.Parameters.AddWithValue("@graphHash", entry.GraphHash);
|
||||
insertCmd.Parameters.AddWithValue("@sbomHash", entry.SbomHash ?? (object)DBNull.Value);
|
||||
insertCmd.Parameters.AddWithValue("@entryPointCount", entry.EntryPointCount);
|
||||
insertCmd.Parameters.AddWithValue("@sinkCount", entry.SinkCount);
|
||||
insertCmd.Parameters.AddWithValue("@pairCount", entry.ReachablePairs.Count);
|
||||
insertCmd.Parameters.AddWithValue("@reachableCount", reachableCount);
|
||||
insertCmd.Parameters.AddWithValue("@unreachableCount", unreachableCount);
|
||||
insertCmd.Parameters.AddWithValue("@expiresAt", expiresAt);
|
||||
|
||||
var entryId = (Guid)(await insertCmd.ExecuteScalarAsync(cancellationToken))!;
|
||||
|
||||
// Insert pairs in batches
|
||||
if (entry.ReachablePairs.Count > 0)
|
||||
{
|
||||
await InsertPairsBatchAsync(conn, tx, entryId, entry.ReachablePairs, cancellationToken);
|
||||
}
|
||||
|
||||
await tx.CommitAsync(cancellationToken);
|
||||
|
||||
// Update stats
|
||||
await UpdateStatsAsync(conn, entry.ServiceId, isHit: false, entry.GraphHash, cancellationToken);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Cached {PairCount} pairs for {ServiceId}, graph {Hash}",
|
||||
entry.ReachablePairs.Count, entry.ServiceId, entry.GraphHash);
|
||||
}
|
||||
catch
|
||||
{
|
||||
await tx.RollbackAsync(cancellationToken);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<ReachablePairResult?> GetReachablePairAsync(
|
||||
string serviceId,
|
||||
string entryMethodKey,
|
||||
string sinkMethodKey,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await using var conn = new NpgsqlConnection(_connectionString);
|
||||
await conn.OpenAsync(cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
SELECT p.is_reachable, p.path_length, p.confidence, p.computed_at
|
||||
FROM reach_cache_pairs p
|
||||
JOIN reach_cache_entries e ON p.cache_entry_id = e.id
|
||||
WHERE e.service_id = @serviceId
|
||||
AND p.entry_method_key = @entryKey
|
||||
AND p.sink_method_key = @sinkKey
|
||||
AND (e.expires_at IS NULL OR e.expires_at > NOW())
|
||||
ORDER BY e.cached_at DESC
|
||||
LIMIT 1
|
||||
""";
|
||||
|
||||
await using var cmd = new NpgsqlCommand(sql, conn);
|
||||
cmd.Parameters.AddWithValue("@serviceId", serviceId);
|
||||
cmd.Parameters.AddWithValue("@entryKey", entryMethodKey);
|
||||
cmd.Parameters.AddWithValue("@sinkKey", sinkMethodKey);
|
||||
|
||||
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
|
||||
|
||||
if (!await reader.ReadAsync(cancellationToken))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return new ReachablePairResult
|
||||
{
|
||||
EntryMethodKey = entryMethodKey,
|
||||
SinkMethodKey = sinkMethodKey,
|
||||
IsReachable = reader.GetBoolean(0),
|
||||
PathLength = reader.IsDBNull(1) ? null : reader.GetInt32(1),
|
||||
Confidence = reader.GetDouble(2),
|
||||
ComputedAt = reader.GetDateTime(3)
|
||||
};
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<int> InvalidateAsync(
|
||||
string serviceId,
|
||||
IEnumerable<string> affectedMethodKeys,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await using var conn = new NpgsqlConnection(_connectionString);
|
||||
await conn.OpenAsync(cancellationToken);
|
||||
|
||||
// For now, invalidate entire cache for service
|
||||
// More granular invalidation would require additional indices
|
||||
const string sql = """
|
||||
DELETE FROM reach_cache_entries
|
||||
WHERE service_id = @serviceId
|
||||
""";
|
||||
|
||||
await using var cmd = new NpgsqlCommand(sql, conn);
|
||||
cmd.Parameters.AddWithValue("@serviceId", serviceId);
|
||||
|
||||
var deleted = await cmd.ExecuteNonQueryAsync(cancellationToken);
|
||||
|
||||
if (deleted > 0)
|
||||
{
|
||||
await UpdateInvalidationTimeAsync(conn, serviceId, cancellationToken);
|
||||
_logger.LogInformation("Invalidated {Count} cache entries for {ServiceId}", deleted, serviceId);
|
||||
}
|
||||
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task InvalidateAllAsync(
|
||||
string serviceId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await InvalidateAsync(serviceId, Array.Empty<string>(), cancellationToken);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<CacheStatistics> GetStatisticsAsync(
|
||||
string serviceId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await using var conn = new NpgsqlConnection(_connectionString);
|
||||
await conn.OpenAsync(cancellationToken);
|
||||
|
||||
const string sql = """
|
||||
SELECT total_hits, total_misses, full_recomputes, incremental_computes,
|
||||
current_graph_hash, last_populated_at, last_invalidated_at
|
||||
FROM reach_cache_stats
|
||||
WHERE service_id = @serviceId
|
||||
""";
|
||||
|
||||
await using var cmd = new NpgsqlCommand(sql, conn);
|
||||
cmd.Parameters.AddWithValue("@serviceId", serviceId);
|
||||
|
||||
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
|
||||
|
||||
if (!await reader.ReadAsync(cancellationToken))
|
||||
{
|
||||
return new CacheStatistics { ServiceId = serviceId };
|
||||
}
|
||||
|
||||
// Get cached pair count
|
||||
await reader.CloseAsync();
|
||||
|
||||
const string countSql = """
|
||||
SELECT COALESCE(SUM(pair_count), 0)
|
||||
FROM reach_cache_entries
|
||||
WHERE service_id = @serviceId AND (expires_at IS NULL OR expires_at > NOW())
|
||||
""";
|
||||
|
||||
await using var countCmd = new NpgsqlCommand(countSql, conn);
|
||||
countCmd.Parameters.AddWithValue("@serviceId", serviceId);
|
||||
var pairCount = Convert.ToInt32(await countCmd.ExecuteScalarAsync(cancellationToken));
|
||||
|
||||
return new CacheStatistics
|
||||
{
|
||||
ServiceId = serviceId,
|
||||
CachedPairCount = pairCount,
|
||||
HitCount = reader.GetInt64(0),
|
||||
MissCount = reader.GetInt64(1),
|
||||
LastPopulatedAt = reader.IsDBNull(5) ? null : reader.GetDateTime(5),
|
||||
LastInvalidatedAt = reader.IsDBNull(6) ? null : reader.GetDateTime(6),
|
||||
CurrentGraphHash = reader.IsDBNull(4) ? null : reader.GetString(4)
|
||||
};
|
||||
}
|
||||
|
||||
private async Task InsertPairsBatchAsync(
|
||||
NpgsqlConnection conn,
|
||||
NpgsqlTransaction tx,
|
||||
Guid entryId,
|
||||
IReadOnlyList<ReachablePairResult> pairs,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using var writer = await conn.BeginBinaryImportAsync(
|
||||
"COPY reach_cache_pairs (cache_entry_id, entry_method_key, sink_method_key, is_reachable, path_length, confidence, computed_at) FROM STDIN (FORMAT BINARY)",
|
||||
cancellationToken);
|
||||
|
||||
foreach (var pair in pairs)
|
||||
{
|
||||
await writer.StartRowAsync(cancellationToken);
|
||||
await writer.WriteAsync(entryId, NpgsqlTypes.NpgsqlDbType.Uuid, cancellationToken);
|
||||
await writer.WriteAsync(pair.EntryMethodKey, NpgsqlTypes.NpgsqlDbType.Text, cancellationToken);
|
||||
await writer.WriteAsync(pair.SinkMethodKey, NpgsqlTypes.NpgsqlDbType.Text, cancellationToken);
|
||||
await writer.WriteAsync(pair.IsReachable, NpgsqlTypes.NpgsqlDbType.Boolean, cancellationToken);
|
||||
|
||||
if (pair.PathLength.HasValue)
|
||||
await writer.WriteAsync(pair.PathLength.Value, NpgsqlTypes.NpgsqlDbType.Integer, cancellationToken);
|
||||
else
|
||||
await writer.WriteNullAsync(cancellationToken);
|
||||
|
||||
await writer.WriteAsync(pair.Confidence, NpgsqlTypes.NpgsqlDbType.Double, cancellationToken);
|
||||
await writer.WriteAsync(pair.ComputedAt.UtcDateTime, NpgsqlTypes.NpgsqlDbType.TimestampTz, cancellationToken);
|
||||
}
|
||||
|
||||
await writer.CompleteAsync(cancellationToken);
|
||||
}
|
||||
|
||||
private static async Task UpdateStatsAsync(
|
||||
NpgsqlConnection conn,
|
||||
string serviceId,
|
||||
bool isHit,
|
||||
string? graphHash = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
const string sql = "SELECT update_reach_cache_stats(@serviceId, @isHit, NULL, @graphHash)";
|
||||
|
||||
await using var cmd = new NpgsqlCommand(sql, conn);
|
||||
cmd.Parameters.AddWithValue("@serviceId", serviceId);
|
||||
cmd.Parameters.AddWithValue("@isHit", isHit);
|
||||
cmd.Parameters.AddWithValue("@graphHash", graphHash ?? (object)DBNull.Value);
|
||||
|
||||
await cmd.ExecuteNonQueryAsync(cancellationToken);
|
||||
}
|
||||
|
||||
private static async Task UpdateInvalidationTimeAsync(
|
||||
NpgsqlConnection conn,
|
||||
string serviceId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
UPDATE reach_cache_stats
|
||||
SET last_invalidated_at = NOW(), updated_at = NOW()
|
||||
WHERE service_id = @serviceId
|
||||
""";
|
||||
|
||||
await using var cmd = new NpgsqlCommand(sql, conn);
|
||||
cmd.Parameters.AddWithValue("@serviceId", serviceId);
|
||||
|
||||
await cmd.ExecuteNonQueryAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user