Files
git.stella-ops.org/src/__Libraries/StellaOps.HybridLogicalClock/HybridLogicalClock.cs
2026-01-27 08:23:42 +02:00

318 lines
9.7 KiB
C#

// -----------------------------------------------------------------------------
// HybridLogicalClock.cs
// Sprint: SPRINT_20260105_002_001_LB_hlc_core_library
// Task: HLC-003 - Implement HybridLogicalClock class with Tick/Receive/Current
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
namespace StellaOps.HybridLogicalClock;
/// <summary>
/// Implementation of the Hybrid Logical Clock (HLC) algorithm for deterministic,
/// monotonic timestamp generation across distributed nodes.
/// </summary>
/// <remarks>
/// <para>
/// The HLC algorithm combines physical (wall-clock) time with a logical counter:
/// - Physical time provides approximate real-time ordering
/// - Logical counter ensures monotonicity when physical time doesn't advance
/// - Node ID provides stable tie-breaking for concurrent events
/// </para>
/// <para>
/// On local event or send:
/// <code>
/// l' = l
/// l = max(l, physical_clock())
/// if l == l':
///     c = c + 1
/// else:
///     c = 0
/// return (l, node_id, c)
/// </code>
/// </para>
/// <para>
/// On receive(m_l, m_c):
/// <code>
/// l' = l
/// l = max(l', m_l, physical_clock())
/// if l == l' == m_l:
///     c = max(c, m_c) + 1
/// elif l == l':
///     c = c + 1
/// elif l == m_l:
///     c = m_c + 1
/// else:
///     c = 0
/// return (l, node_id, c)
/// </code>
/// </para>
/// <para>
/// Whenever incrementing the logical counter would exceed <see cref="int.MaxValue"/>,
/// the physical component is advanced by one millisecond and the counter restarts at
/// zero. All reads and writes of the clock state happen under a private lock.
/// </para>
/// </remarks>
public sealed class HybridLogicalClock : IHybridLogicalClock
{
    private readonly TimeProvider _timeProvider;
    private readonly string _nodeId;
    private readonly IHlcStateStore _stateStore;
    private readonly TimeSpan _maxClockSkew;
    private readonly ILogger<HybridLogicalClock> _logger;

    // Clock state: physical component in Unix milliseconds plus the logical counter.
    // Both fields are only read or written while holding _lock.
    private long _lastPhysicalTime;
    private int _logicalCounter;
    private readonly object _lock = new();

    /// <inheritdoc/>
    public string NodeId => _nodeId;

    /// <inheritdoc/>
    public HlcTimestamp Current
    {
        get
        {
            lock (_lock)
            {
                return SnapshotLocked();
            }
        }
    }

    /// <summary>
    /// Creates a new Hybrid Logical Clock instance.
    /// </summary>
    /// <param name="timeProvider">Time provider for wall-clock time</param>
    /// <param name="nodeId">Unique identifier for this node (e.g., "scheduler-east-1")</param>
    /// <param name="stateStore">Persistent storage for clock state</param>
    /// <param name="logger">Logger for diagnostics</param>
    /// <param name="maxClockSkew">Maximum allowed clock skew (default: 1 minute)</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="timeProvider"/>, <paramref name="nodeId"/>, <paramref name="stateStore"/>
    /// or <paramref name="logger"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException"><paramref name="nodeId"/> is empty or whitespace.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxClockSkew"/> is negative.</exception>
    public HybridLogicalClock(
        TimeProvider timeProvider,
        string nodeId,
        IHlcStateStore stateStore,
        ILogger<HybridLogicalClock> logger,
        TimeSpan? maxClockSkew = null)
    {
        ArgumentNullException.ThrowIfNull(timeProvider);
        ArgumentException.ThrowIfNullOrWhiteSpace(nodeId);
        ArgumentNullException.ThrowIfNull(stateStore);
        ArgumentNullException.ThrowIfNull(logger);

        // A negative threshold would make every Receive() call fail, so reject it up front.
        var resolvedSkew = maxClockSkew ?? TimeSpan.FromMinutes(1);
        ArgumentOutOfRangeException.ThrowIfLessThan(resolvedSkew, TimeSpan.Zero, nameof(maxClockSkew));

        _timeProvider = timeProvider;
        _nodeId = nodeId;
        _stateStore = stateStore;
        _logger = logger;
        _maxClockSkew = resolvedSkew;

        // Initialize to 0 so first Tick() will advance physical time and reset counter
        // This follows the standard HLC algorithm where l starts at 0
        _lastPhysicalTime = 0;
        _logicalCounter = 0;

        _logger.LogInformation(
            "HLC initialized for node {NodeId} with max skew {MaxSkew}",
            _nodeId,
            _maxClockSkew);
    }

    /// <summary>
    /// Initialize clock from persisted state (call during startup).
    /// </summary>
    /// <remarks>
    /// The persisted timestamp is merged into the in-memory state with the same rules as
    /// a received timestamp (without the skew check), so the clock never moves backwards
    /// even if it has already been advanced before this method runs.
    /// </remarks>
    /// <param name="ct">Cancellation token</param>
    /// <returns>True if state was recovered, false if starting fresh</returns>
    public async Task<bool> InitializeFromStateAsync(CancellationToken ct = default)
    {
        var persistedState = await _stateStore.LoadAsync(_nodeId, ct).ConfigureAwait(false);
        if (!persistedState.HasValue)
        {
            _logger.LogInformation(
                "HLC for node {NodeId} starting fresh (no persisted state)",
                _nodeId);
            return false;
        }

        var persisted = persistedState.Value;
        lock (_lock)
        {
            // Start at least at the persisted time; if the resulting physical time equals
            // the persisted one, the counter continues after the persisted counter.
            MergeLocked(persisted.PhysicalTime, persisted.LogicalCounter, GetPhysicalNow());
        }

        _logger.LogInformation(
            "HLC for node {NodeId} recovered from persisted state: {Timestamp}",
            _nodeId,
            persisted);
        return true;
    }

    /// <inheritdoc/>
    public HlcTimestamp Tick()
    {
        HlcTimestamp timestamp;
        lock (_lock)
        {
            var physicalNow = GetPhysicalNow();
            if (physicalNow > _lastPhysicalTime)
            {
                // Physical time advanced - reset counter
                _lastPhysicalTime = physicalNow;
                _logicalCounter = 0;
            }
            else
            {
                // Physical time hasn't advanced - increment counter (overflow-safe)
                AdvanceCounterLocked(_lastPhysicalTime, _logicalCounter);
            }

            timestamp = SnapshotLocked();
        }

        // Persist state asynchronously (fire-and-forget with error logging)
        _ = PersistStateAsync(timestamp);
        return timestamp;
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Throws <see cref="HlcClockSkewException"/> when the remote physical time differs from
    /// the local wall clock (in either direction) by more than the configured maximum skew.
    /// The local state is left unchanged in that case.
    /// </remarks>
    public HlcTimestamp Receive(HlcTimestamp remote)
    {
        HlcTimestamp timestamp;
        lock (_lock)
        {
            var physicalNow = GetPhysicalNow();

            // Validate clock skew
            var skew = ComputeSkew(remote.PhysicalTime, physicalNow);
            if (skew > _maxClockSkew)
            {
                _logger.LogError(
                    "Clock skew of {Skew} from node {RemoteNode} exceeds threshold {MaxSkew}",
                    skew,
                    remote.NodeId,
                    _maxClockSkew);
                throw new HlcClockSkewException(skew, _maxClockSkew);
            }

            // Apply HLC receive algorithm
            MergeLocked(remote.PhysicalTime, remote.LogicalCounter, physicalNow);
            timestamp = SnapshotLocked();
        }

        // Persist state asynchronously
        _ = PersistStateAsync(timestamp);

        _logger.LogDebug(
            "HLC receive from {RemoteNode}: {RemoteTimestamp} -> {LocalTimestamp}",
            remote.NodeId,
            remote,
            timestamp);
        return timestamp;
    }

    /// <summary>
    /// Reads the wall clock from the injected time provider as Unix milliseconds.
    /// </summary>
    private long GetPhysicalNow() => _timeProvider.GetUtcNow().ToUnixTimeMilliseconds();

    /// <summary>
    /// Builds a timestamp from the current state. Caller must hold <c>_lock</c>.
    /// </summary>
    private HlcTimestamp SnapshotLocked() => new HlcTimestamp
    {
        PhysicalTime = _lastPhysicalTime,
        NodeId = _nodeId,
        LogicalCounter = _logicalCounter
    };

    /// <summary>
    /// Absolute difference between two Unix-millisecond values as a <see cref="TimeSpan"/>,
    /// clamped to <see cref="TimeSpan.MaxValue"/>.
    /// </summary>
    private static TimeSpan ComputeSkew(long remotePhysicalTime, long localPhysicalTime)
    {
        // Subtract in double precision: a long subtraction can wrap for extreme inputs, and
        // TimeSpan.FromMilliseconds throws OverflowException beyond the TimeSpan range. Either
        // would hide a skew violation behind an unrelated exception type.
        var deltaMs = Math.Abs((double)remotePhysicalTime - localPhysicalTime);
        return deltaMs >= TimeSpan.MaxValue.TotalMilliseconds
            ? TimeSpan.MaxValue
            : TimeSpan.FromMilliseconds(deltaMs);
    }

    /// <summary>
    /// Merges an external (remote or persisted) timestamp into the local state following
    /// the HLC receive rules. Caller must hold <c>_lock</c>.
    /// </summary>
    private void MergeLocked(long otherPhysicalTime, int otherCounter, long physicalNow)
    {
        // Find maximum physical time
        var maxPhysical = Math.Max(Math.Max(_lastPhysicalTime, otherPhysicalTime), physicalNow);

        if (maxPhysical == _lastPhysicalTime && maxPhysical == otherPhysicalTime)
        {
            // Local and other time are both max - take max counter and increment
            AdvanceCounterLocked(maxPhysical, Math.Max(_logicalCounter, otherCounter));
        }
        else if (maxPhysical == _lastPhysicalTime)
        {
            // Our time is max - just increment our counter
            AdvanceCounterLocked(maxPhysical, _logicalCounter);
        }
        else if (maxPhysical == otherPhysicalTime)
        {
            // Other time is max - take their counter and increment
            AdvanceCounterLocked(maxPhysical, otherCounter);
        }
        else
        {
            // Physical clock is max - reset counter
            _lastPhysicalTime = maxPhysical;
            _logicalCounter = 0;
        }
    }

    /// <summary>
    /// Sets the state to <paramref name="physicalTime"/> with counter
    /// <paramref name="counter"/> + 1. If that increment would overflow, advances the
    /// physical time by one millisecond and restarts the counter at zero instead.
    /// Caller must hold <c>_lock</c>.
    /// </summary>
    private void AdvanceCounterLocked(long physicalTime, int counter)
    {
        // Compare before incrementing so this works in both checked and unchecked builds.
        if (counter == int.MaxValue)
        {
            _logger.LogWarning(
                "HLC counter overflow for node {NodeId}, forcing time advance",
                _nodeId);

            // Force time advance to next millisecond
            _lastPhysicalTime = physicalTime + 1;
            _logicalCounter = 0;
            return;
        }

        _lastPhysicalTime = physicalTime;
        _logicalCounter = counter + 1;
    }

    /// <summary>
    /// Starts saving <paramref name="timestamp"/> to the state store without blocking the
    /// caller. Failures are logged as warnings and never propagated.
    /// </summary>
    /// <remarks>
    /// Saves are started outside the clock lock, so concurrent callers may start them in a
    /// different order than the timestamps were issued.
    /// NOTE(review): presumably the store keeps the highest timestamp - confirm.
    /// </remarks>
    private Task PersistStateAsync(HlcTimestamp timestamp)
    {
        try
        {
            var saveTask = _stateStore.SaveAsync(timestamp);

            // Fast path: skip the async state machine when the save already succeeded.
            if (saveTask.IsCompletedSuccessfully)
            {
                return Task.CompletedTask;
            }

            return PersistStateAsyncSlow(saveTask, timestamp);
        }
        catch (Exception ex)
        {
            // SaveAsync threw before returning a task.
            _logger.LogWarning(
                ex,
                "Failed to persist HLC state for node {NodeId}: {Timestamp}",
                _nodeId,
                timestamp);
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Awaits a save that has not completed successfully yet and logs any failure.
    /// </summary>
    private async Task PersistStateAsyncSlow(Task saveTask, HlcTimestamp timestamp)
    {
        try
        {
            await saveTask.ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(
                ex,
                "Failed to persist HLC state for node {NodeId}: {Timestamp}",
                _nodeId,
                timestamp);
        }
    }
}