docs consolidation and others
This commit is contained in:
@@ -0,0 +1,228 @@
|
||||
// -----------------------------------------------------------------------------
|
||||
// PostgresHlcStateStore.cs
|
||||
// Sprint: SPRINT_20260105_002_001_LB_hlc_core_library
|
||||
// Task: HLC-005 - Implement PostgresHlcStateStore with atomic update semantics
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
|
||||
namespace StellaOps.HybridLogicalClock;
|
||||
|
||||
/// <summary>
|
||||
/// PostgreSQL implementation of HLC state store for production deployments.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Uses atomic upsert with conditional update to ensure:
|
||||
/// - State is never rolled back (only forward updates accepted)
|
||||
/// - Concurrent saves from same node are handled correctly
|
||||
/// - Node restarts resume from persisted state
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Required schema:
|
||||
/// <code>
|
||||
/// CREATE TABLE scheduler.hlc_state (
|
||||
/// node_id TEXT PRIMARY KEY,
|
||||
/// physical_time BIGINT NOT NULL,
|
||||
/// logical_counter INT NOT NULL,
|
||||
/// updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
/// );
|
||||
/// </code>
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class PostgresHlcStateStore : IHlcStateStore
|
||||
{
|
||||
private readonly NpgsqlDataSource _dataSource;
|
||||
private readonly ILogger<PostgresHlcStateStore> _logger;
|
||||
private readonly string _schema;
|
||||
private readonly string _tableName;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new PostgreSQL HLC state store.
|
||||
/// </summary>
|
||||
/// <param name="dataSource">Npgsql data source</param>
|
||||
/// <param name="logger">Logger</param>
|
||||
/// <param name="schema">Database schema (default: "scheduler")</param>
|
||||
/// <param name="tableName">Table name (default: "hlc_state")</param>
|
||||
public PostgresHlcStateStore(
|
||||
NpgsqlDataSource dataSource,
|
||||
ILogger<PostgresHlcStateStore> logger,
|
||||
string schema = "scheduler",
|
||||
string tableName = "hlc_state")
|
||||
{
|
||||
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_schema = schema;
|
||||
_tableName = tableName;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task<HlcTimestamp?> LoadAsync(string nodeId, CancellationToken ct = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(nodeId);
|
||||
|
||||
var sql = $"""
|
||||
SELECT physical_time, logical_counter
|
||||
FROM {_schema}.{_tableName}
|
||||
WHERE node_id = @node_id
|
||||
""";
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(ct);
|
||||
await using var cmd = new NpgsqlCommand(sql, connection);
|
||||
cmd.Parameters.AddWithValue("node_id", nodeId);
|
||||
|
||||
await using var reader = await cmd.ExecuteReaderAsync(ct);
|
||||
|
||||
if (!await reader.ReadAsync(ct))
|
||||
{
|
||||
_logger.LogDebug("No HLC state found for node {NodeId}", nodeId);
|
||||
return null;
|
||||
}
|
||||
|
||||
var physicalTime = reader.GetInt64(0);
|
||||
var logicalCounter = reader.GetInt32(1);
|
||||
|
||||
var timestamp = new HlcTimestamp
|
||||
{
|
||||
PhysicalTime = physicalTime,
|
||||
NodeId = nodeId,
|
||||
LogicalCounter = logicalCounter
|
||||
};
|
||||
|
||||
_logger.LogDebug("Loaded HLC state for node {NodeId}: {Timestamp}", nodeId, timestamp);
|
||||
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task SaveAsync(HlcTimestamp timestamp, CancellationToken ct = default)
|
||||
{
|
||||
// Atomic upsert with conditional update (only update if new state is greater)
|
||||
var sql = $"""
|
||||
INSERT INTO {_schema}.{_tableName} (node_id, physical_time, logical_counter, updated_at)
|
||||
VALUES (@node_id, @physical_time, @logical_counter, NOW())
|
||||
ON CONFLICT (node_id) DO UPDATE SET
|
||||
physical_time = EXCLUDED.physical_time,
|
||||
logical_counter = EXCLUDED.logical_counter,
|
||||
updated_at = NOW()
|
||||
WHERE
|
||||
-- Only update if new timestamp is greater (maintains monotonicity)
|
||||
EXCLUDED.physical_time > {_schema}.{_tableName}.physical_time
|
||||
OR (
|
||||
EXCLUDED.physical_time = {_schema}.{_tableName}.physical_time
|
||||
AND EXCLUDED.logical_counter > {_schema}.{_tableName}.logical_counter
|
||||
)
|
||||
""";
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(ct);
|
||||
await using var cmd = new NpgsqlCommand(sql, connection);
|
||||
cmd.Parameters.AddWithValue("node_id", timestamp.NodeId);
|
||||
cmd.Parameters.AddWithValue("physical_time", timestamp.PhysicalTime);
|
||||
cmd.Parameters.AddWithValue("logical_counter", timestamp.LogicalCounter);
|
||||
|
||||
var rowsAffected = await cmd.ExecuteNonQueryAsync(ct);
|
||||
|
||||
if (rowsAffected > 0)
|
||||
{
|
||||
_logger.LogDebug("Saved HLC state for node {NodeId}: {Timestamp}", timestamp.NodeId, timestamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"HLC state not updated for node {NodeId}: {Timestamp} (existing state is newer)",
|
||||
timestamp.NodeId,
|
||||
timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Ensures the HLC state table exists in the database.
|
||||
/// </summary>
|
||||
/// <param name="ct">Cancellation token</param>
|
||||
public async Task EnsureTableExistsAsync(CancellationToken ct = default)
|
||||
{
|
||||
var sql = $"""
|
||||
CREATE SCHEMA IF NOT EXISTS {_schema};
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {_schema}.{_tableName} (
|
||||
node_id TEXT PRIMARY KEY,
|
||||
physical_time BIGINT NOT NULL,
|
||||
logical_counter INT NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_{_tableName}_updated
|
||||
ON {_schema}.{_tableName}(updated_at DESC);
|
||||
""";
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(ct);
|
||||
await using var cmd = new NpgsqlCommand(sql, connection);
|
||||
await cmd.ExecuteNonQueryAsync(ct);
|
||||
|
||||
_logger.LogInformation("Ensured HLC state table exists: {Schema}.{Table}", _schema, _tableName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets all stored states (for monitoring/debugging).
|
||||
/// </summary>
|
||||
/// <param name="ct">Cancellation token</param>
|
||||
/// <returns>Dictionary of node IDs to their HLC states</returns>
|
||||
public async Task<IReadOnlyDictionary<string, HlcTimestamp>> GetAllStatesAsync(CancellationToken ct = default)
|
||||
{
|
||||
var sql = $"""
|
||||
SELECT node_id, physical_time, logical_counter
|
||||
FROM {_schema}.{_tableName}
|
||||
ORDER BY updated_at DESC
|
||||
""";
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(ct);
|
||||
await using var cmd = new NpgsqlCommand(sql, connection);
|
||||
|
||||
var results = new Dictionary<string, HlcTimestamp>(StringComparer.Ordinal);
|
||||
|
||||
await using var reader = await cmd.ExecuteReaderAsync(ct);
|
||||
while (await reader.ReadAsync(ct))
|
||||
{
|
||||
var nodeId = reader.GetString(0);
|
||||
results[nodeId] = new HlcTimestamp
|
||||
{
|
||||
NodeId = nodeId,
|
||||
PhysicalTime = reader.GetInt64(1),
|
||||
LogicalCounter = reader.GetInt32(2)
|
||||
};
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Deletes stale HLC states for nodes that haven't updated in the specified duration.
|
||||
/// </summary>
|
||||
/// <param name="staleDuration">Duration after which a state is considered stale</param>
|
||||
/// <param name="ct">Cancellation token</param>
|
||||
/// <returns>Number of deleted states</returns>
|
||||
public async Task<int> CleanupStaleStatesAsync(TimeSpan staleDuration, CancellationToken ct = default)
|
||||
{
|
||||
var sql = $"""
|
||||
DELETE FROM {_schema}.{_tableName}
|
||||
WHERE updated_at < NOW() - @stale_interval
|
||||
""";
|
||||
|
||||
await using var connection = await _dataSource.OpenConnectionAsync(ct);
|
||||
await using var cmd = new NpgsqlCommand(sql, connection);
|
||||
cmd.Parameters.AddWithValue("stale_interval", staleDuration);
|
||||
|
||||
var rowsDeleted = await cmd.ExecuteNonQueryAsync(ct);
|
||||
|
||||
if (rowsDeleted > 0)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Cleaned up {Count} stale HLC states (older than {StaleDuration})",
|
||||
rowsDeleted,
|
||||
staleDuration);
|
||||
}
|
||||
|
||||
return rowsDeleted;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user