// -----------------------------------------------------------------------------
// PostgresHlcStateStore.cs
// Sprint: SPRINT_20260105_002_001_LB_hlc_core_library
// Task: HLC-005 - Implement PostgresHlcStateStore with atomic update semantics
// -----------------------------------------------------------------------------

using Microsoft.Extensions.Logging;
using Npgsql;

namespace StellaOps.HybridLogicalClock;

/// <summary>
/// PostgreSQL implementation of HLC state store for production deployments.
/// </summary>
/// <remarks>
/// <para>
/// Uses atomic upsert with conditional update to ensure:
/// - State is never rolled back (only forward updates accepted)
/// - Concurrent saves from same node are handled correctly
/// - Node restarts resume from persisted state
/// </para>
/// <para>
/// Required schema:
/// <code>
/// CREATE TABLE scheduler.hlc_state (
///     node_id TEXT PRIMARY KEY,
///     physical_time BIGINT NOT NULL,
///     logical_counter INT NOT NULL,
///     updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
/// );
/// </code>
/// </para>
/// <para>
/// The schema and table names are interpolated into the SQL text (identifiers
/// cannot be bound as parameters), so the constructor only accepts plain
/// unquoted identifiers: ASCII letters, digits and underscores, not starting
/// with a digit.
/// </para>
/// </remarks>
public sealed class PostgresHlcStateStore : IHlcStateStore
{
    private readonly NpgsqlDataSource _dataSource;
    private readonly ILogger<PostgresHlcStateStore> _logger;
    private readonly string _schema;
    private readonly string _tableName;

    /// <summary>
    /// Creates a new PostgreSQL HLC state store.
    /// </summary>
    /// <param name="dataSource">Npgsql data source</param>
    /// <param name="logger">Logger</param>
    /// <param name="schema">Database schema (default: "scheduler")</param>
    /// <param name="tableName">Table name (default: "hlc_state")</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="dataSource"/> or <paramref name="logger"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="schema"/> or <paramref name="tableName"/> is null, blank,
    /// or not a plain SQL identifier.
    /// </exception>
    public PostgresHlcStateStore(
        NpgsqlDataSource dataSource,
        ILogger<PostgresHlcStateStore> logger,
        string schema = "scheduler",
        string tableName = "hlc_state")
    {
        _dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        // Both names end up inside SQL text, so reject anything that is not a
        // simple identifier before it can reach a command.
        _schema = ValidateIdentifier(schema, nameof(schema));
        _tableName = ValidateIdentifier(tableName, nameof(tableName));
    }

    /// <summary>
    /// Loads the persisted HLC state for a node.
    /// </summary>
    /// <param name="nodeId">Node whose state row is read</param>
    /// <param name="ct">Cancellation token</param>
    /// <returns>The stored timestamp, or null when no row exists for the node</returns>
    /// <exception cref="ArgumentException"><paramref name="nodeId"/> is null or blank.</exception>
    public async Task<HlcTimestamp?> LoadAsync(string nodeId, CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(nodeId);

        var sql = $"""
            SELECT physical_time, logical_counter
            FROM {_schema}.{_tableName}
            WHERE node_id = @node_id
            """;

        await using var connection = await _dataSource.OpenConnectionAsync(ct);
        await using var cmd = new NpgsqlCommand(sql, connection);
        cmd.Parameters.AddWithValue("node_id", nodeId);

        await using var reader = await cmd.ExecuteReaderAsync(ct);

        if (!await reader.ReadAsync(ct))
        {
            _logger.LogDebug("No HLC state found for node {NodeId}", nodeId);
            return null;
        }

        var physicalTime = reader.GetInt64(0);
        var logicalCounter = reader.GetInt32(1);

        var timestamp = new HlcTimestamp
        {
            PhysicalTime = physicalTime,
            NodeId = nodeId,
            LogicalCounter = logicalCounter
        };

        _logger.LogDebug("Loaded HLC state for node {NodeId}: {Timestamp}", nodeId, timestamp);

        return timestamp;
    }

    /// <summary>
    /// Persists the HLC state for the timestamp's node. An existing row is only
    /// overwritten when the new (physical_time, logical_counter) pair is
    /// strictly greater than the stored one.
    /// </summary>
    /// <param name="timestamp">Timestamp to persist; its node id selects the row</param>
    /// <param name="ct">Cancellation token</param>
    /// <exception cref="ArgumentException">The timestamp's node id is null or blank.</exception>
    public async Task SaveAsync(HlcTimestamp timestamp, CancellationToken ct = default)
    {
        // Same node id contract as LoadAsync.
        ArgumentException.ThrowIfNullOrWhiteSpace(timestamp.NodeId);

        // Atomic upsert with conditional update (only update if new state is greater)
        var sql = $"""
            INSERT INTO {_schema}.{_tableName} (node_id, physical_time, logical_counter, updated_at)
            VALUES (@node_id, @physical_time, @logical_counter, NOW())
            ON CONFLICT (node_id) DO UPDATE
            SET physical_time = EXCLUDED.physical_time,
                logical_counter = EXCLUDED.logical_counter,
                updated_at = NOW()
            WHERE
                -- Only update if new timestamp is greater (maintains monotonicity)
                EXCLUDED.physical_time > {_schema}.{_tableName}.physical_time
                OR (
                    EXCLUDED.physical_time = {_schema}.{_tableName}.physical_time
                    AND EXCLUDED.logical_counter > {_schema}.{_tableName}.logical_counter
                )
            """;

        await using var connection = await _dataSource.OpenConnectionAsync(ct);
        await using var cmd = new NpgsqlCommand(sql, connection);

        cmd.Parameters.AddWithValue("node_id", timestamp.NodeId);
        cmd.Parameters.AddWithValue("physical_time", timestamp.PhysicalTime);
        cmd.Parameters.AddWithValue("logical_counter", timestamp.LogicalCounter);

        var rowsAffected = await cmd.ExecuteNonQueryAsync(ct);

        // Zero rows means the conflict branch ran and its WHERE clause rejected
        // the update, i.e. the stored pair was not strictly smaller.
        if (rowsAffected > 0)
        {
            _logger.LogDebug("Saved HLC state for node {NodeId}: {Timestamp}", timestamp.NodeId, timestamp);
        }
        else
        {
            _logger.LogDebug(
                "HLC state not updated for node {NodeId}: {Timestamp} (existing state is newer)",
                timestamp.NodeId,
                timestamp);
        }
    }

    /// <summary>
    /// Ensures the HLC state table exists in the database.
    /// </summary>
    /// <remarks>
    /// Issues CREATE ... IF NOT EXISTS for the schema, the table and an index on
    /// updated_at in a single command.
    /// </remarks>
    /// <param name="ct">Cancellation token</param>
    public async Task EnsureTableExistsAsync(CancellationToken ct = default)
    {
        var sql = $"""
            CREATE SCHEMA IF NOT EXISTS {_schema};

            CREATE TABLE IF NOT EXISTS {_schema}.{_tableName} (
                node_id TEXT PRIMARY KEY,
                physical_time BIGINT NOT NULL,
                logical_counter INT NOT NULL,
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            );

            CREATE INDEX IF NOT EXISTS idx_{_tableName}_updated
            ON {_schema}.{_tableName}(updated_at DESC);
            """;

        await using var connection = await _dataSource.OpenConnectionAsync(ct);
        await using var cmd = new NpgsqlCommand(sql, connection);
        await cmd.ExecuteNonQueryAsync(ct);

        _logger.LogInformation("Ensured HLC state table exists: {Schema}.{Table}", _schema, _tableName);
    }

    /// <summary>
    /// Gets all stored states (for monitoring/debugging).
    /// </summary>
    /// <param name="ct">Cancellation token</param>
    /// <returns>Dictionary of node IDs to their HLC states</returns>
    public async Task<IReadOnlyDictionary<string, HlcTimestamp>> GetAllStatesAsync(CancellationToken ct = default)
    {
        var sql = $"""
            SELECT node_id, physical_time, logical_counter
            FROM {_schema}.{_tableName}
            ORDER BY updated_at DESC
            """;

        await using var connection = await _dataSource.OpenConnectionAsync(ct);
        await using var cmd = new NpgsqlCommand(sql, connection);

        // Node ids are compared ordinally (exact, case-sensitive match).
        var results = new Dictionary<string, HlcTimestamp>(StringComparer.Ordinal);

        await using var reader = await cmd.ExecuteReaderAsync(ct);
        while (await reader.ReadAsync(ct))
        {
            var nodeId = reader.GetString(0);
            results[nodeId] = new HlcTimestamp
            {
                NodeId = nodeId,
                PhysicalTime = reader.GetInt64(1),
                LogicalCounter = reader.GetInt32(2)
            };
        }

        return results;
    }

    /// <summary>
    /// Deletes stale HLC states for nodes that haven't updated in the specified duration.
    /// </summary>
    /// <param name="staleDuration">Duration after which a state is considered stale</param>
    /// <param name="ct">Cancellation token</param>
    /// <returns>Number of deleted states</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="staleDuration"/> is negative.
    /// </exception>
    public async Task<int> CleanupStaleStatesAsync(TimeSpan staleDuration, CancellationToken ct = default)
    {
        // A negative duration would move the cutoff (NOW() - interval) into the
        // future and match every row, wiping all persisted state.
        ArgumentOutOfRangeException.ThrowIfLessThan(staleDuration, TimeSpan.Zero);

        var sql = $"""
            DELETE FROM {_schema}.{_tableName}
            WHERE updated_at < NOW() - @stale_interval
            """;

        await using var connection = await _dataSource.OpenConnectionAsync(ct);
        await using var cmd = new NpgsqlCommand(sql, connection);
        cmd.Parameters.AddWithValue("stale_interval", staleDuration);

        var rowsDeleted = await cmd.ExecuteNonQueryAsync(ct);

        if (rowsDeleted > 0)
        {
            _logger.LogInformation(
                "Cleaned up {Count} stale HLC states (older than {StaleDuration})",
                rowsDeleted,
                staleDuration);
        }

        return rowsDeleted;
    }

    /// <summary>
    /// Returns <paramref name="value"/> unchanged when it is a plain SQL identifier
    /// (ASCII letters, digits and underscores, not starting with a digit);
    /// otherwise throws.
    /// </summary>
    /// <param name="value">Candidate schema or table name</param>
    /// <param name="paramName">Name of the constructor parameter being validated</param>
    /// <returns>The validated identifier</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="value"/> is null, blank, or contains a disallowed character.
    /// </exception>
    private static string ValidateIdentifier(string value, string paramName)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(value, paramName);

        for (var i = 0; i < value.Length; i++)
        {
            var c = value[i];

            // Digits are allowed everywhere except the first position.
            var allowed = c == '_' || (i == 0 ? char.IsAsciiLetter(c) : char.IsAsciiLetterOrDigit(c));
            if (!allowed)
            {
                throw new ArgumentException(
                    $"'{value}' is not a valid SQL identifier; use ASCII letters, digits and underscores, not starting with a digit.",
                    paramName);
            }
        }

        return value;
    }
}