# StellaOps.HybridLogicalClock

A Hybrid Logical Clock (HLC) library for deterministic, monotonic job ordering across distributed nodes. HLC combines physical (wall-clock) time with logical counters to provide causally-ordered timestamps even under clock skew.

## Overview

### Problem Statement

Distributed systems face challenges with event ordering:

- Wall-clock timestamps are susceptible to clock skew between nodes
- Logical clocks alone don't provide real-time context
- Concurrent events from different nodes need deterministic tie-breaking

### Solution

HLC addresses these by combining:

- **Physical time** (Unix milliseconds UTC) for real-time context
- **Logical counter** for events at the same millisecond
- **Node ID** for deterministic tie-breaking across nodes

## Installation

Reference the project in your `.csproj` (adjust the relative path to match your project's location):

```xml
<ItemGroup>
  <ProjectReference Include="..\__Libraries\StellaOps.HybridLogicalClock\StellaOps.HybridLogicalClock.csproj" />
</ItemGroup>
```

## Quick Start

### Basic Usage

```csharp
using System.Diagnostics;
using StellaOps.HybridLogicalClock;

// Create a clock instance
// (`logger` is a logger instance from your logging setup)
var clock = new HybridLogicalClock(
    TimeProvider.System,
    nodeId: "scheduler-east-1",
    stateStore: new InMemoryHlcStateStore(),
    logger: logger);

// Generate timestamps for local events
var ts1 = clock.Tick(); // e.g., 1704067200000-scheduler-east-1-000000
var ts2 = clock.Tick(); // e.g., 1704067200000-scheduler-east-1-000001 (same millisecond, so the counter increments)

// Timestamps from the same clock are always monotonically increasing
Debug.Assert(ts2 > ts1);

// When receiving a message from another node
var remoteTs = HlcTimestamp.Parse("1704067200100-scheduler-west-1-000005");
var mergedTs = clock.Receive(remoteTs); // Merges clocks; returns a timestamp greater than both the local state and remoteTs
```

### Dependency Injection

```csharp
// Program.cs or Startup.cs

// Option 1: In-memory state (development/testing)
services.AddHybridLogicalClock(
    nodeId: Environment.MachineName,
    maxClockSkew: TimeSpan.FromMinutes(1));

// Option 2: PostgreSQL persistence (production)
services.AddHybridLogicalClock<PostgresHlcStateStore>(
    nodeId: Environment.MachineName,
    maxClockSkew: TimeSpan.FromMinutes(1));

// Option 3: Custom state store factory
services.AddHybridLogicalClock(
    nodeId: Environment.MachineName,
    stateStoreFactory: sp => new PostgresHlcStateStore(
        sp.GetRequiredService<NpgsqlDataSource>(),
        sp.GetRequiredService<ILogger<PostgresHlcStateStore>>()),
    maxClockSkew: TimeSpan.FromMinutes(1));
```

Then inject the clock:

```csharp
public class JobScheduler(IHybridLogicalClock clock)
{
    public void EnqueueJob(Job job)
    {
        job.EnqueuedAt = clock.Tick();
        // Jobs are now globally ordered across all scheduler nodes
    }
}
```

## Core Types

### HlcTimestamp

A readonly record struct representing an HLC timestamp:

```csharp
public readonly record struct HlcTimestamp : IComparable<HlcTimestamp>
{
    public required long PhysicalTime { get; init; }  // Unix milliseconds UTC
    public required string NodeId { get; init; }      // e.g., "scheduler-east-1"
    public required int LogicalCounter { get; init; } // Events at same millisecond

    // CompareTo, Parse, ToSortableString and the other members below are omitted here
}
```

**Key Methods:**

| Method | Description |
|--------|-------------|
| `ToSortableString()` | Returns `"1704067200000-scheduler-east-1-000042"` |
| `Parse(string)` | Parses from sortable string format |
| `TryParse(string, out HlcTimestamp)` | Parses without throwing on invalid input |
| `ToDateTimeOffset()` | Converts physical time to `DateTimeOffset` |
| `CompareTo(HlcTimestamp)` | Total ordering comparison |
| `IsBefore(HlcTimestamp)` | Returns true if causally before |
| `IsAfter(HlcTimestamp)` | Returns true if causally after |
| `IsConcurrent(HlcTimestamp)` | Returns true if physical time and logical counter match but node IDs differ |

**Comparison Operators:**

```csharp
if (ts1 < ts2) { /* ts1 is ordered before ts2 */ }
if (ts1 > ts2) { /* ts1 is ordered after ts2 */ }
if (ts1 <= ts2) { /* ts1 is ordered at or before ts2 */ }
if (ts1 >= ts2) { /* ts1 is ordered at or after ts2 */ }
```
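The string format and ordering described above can be sketched as follows. This is a minimal, self-contained illustration, not the library's code: a plain tuple stands in for `HlcTimestamp`, the six-digit zero-padded counter is inferred from the examples, and the comparison order (physical time, then logical counter, then node ID compared ordinally) is an assumption consistent with the `IsConcurrent` description. `FormatSketch`, `TryParseSketch`, and `CompareSketch` are hypothetical names. Because node IDs such as `scheduler-east-1` contain hyphens, parsing splits only on the first and last `-`.

```csharp
using System;
using System.Globalization;

// Hypothetical helpers; (Pt, Node, C) stands in for HlcTimestamp's
// PhysicalTime, NodeId, and LogicalCounter.
string FormatSketch((long Pt, string Node, int C) ts) =>
    FormattableString.Invariant($"{ts.Pt}-{ts.Node}-{ts.C:D6}"); // counter padded to 6 digits (assumed)

bool TryParseSketch(string s, out (long Pt, string Node, int C) ts)
{
    ts = default;
    // Physical time ends at the first '-', the counter starts after the last '-';
    // everything in between is the node ID, which may itself contain '-'.
    int first = s.IndexOf('-');
    int last = s.LastIndexOf('-');
    if (first <= 0 || last <= first + 1 || last == s.Length - 1)
        return false;

    if (!long.TryParse(s.AsSpan(0, first), NumberStyles.None, CultureInfo.InvariantCulture, out long pt) ||
        !int.TryParse(s.AsSpan(last + 1), NumberStyles.None, CultureInfo.InvariantCulture, out int c))
        return false;

    ts = (pt, s.Substring(first + 1, last - first - 1), c);
    return true;
}

int CompareSketch((long Pt, string Node, int C) a, (long Pt, string Node, int C) b)
{
    // Assumed total order: physical time, then logical counter, then node ID (ordinal).
    int cmp = a.Pt.CompareTo(b.Pt);
    if (cmp != 0) return cmp;
    cmp = a.C.CompareTo(b.C);
    return cmp != 0 ? cmp : string.CompareOrdinal(a.Node, b.Node);
}
```

Under this assumed order, comparing the sortable strings as text is not equivalent to `CompareSketch` when two timestamps share a physical time but have different node IDs, because the node ID precedes the counter in the string: `1704067200000-scheduler-west-1-000001` sorts after `1704067200000-scheduler-east-1-000005` as text, but before it under `CompareSketch`.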

### IHybridLogicalClock

The main clock interface:

```csharp
public interface IHybridLogicalClock
{
    HlcTimestamp Tick();                       // Generate timestamp for local event
    HlcTimestamp Receive(HlcTimestamp remote); // Merge with remote timestamp
    HlcTimestamp Current { get; }              // Current clock state
    string NodeId { get; }                     // This node's identifier
}
```

### IHlcStateStore

Persistence interface for clock state:

```csharp
public interface IHlcStateStore
{
    Task<HlcTimestamp?> LoadAsync(string nodeId, CancellationToken ct = default);
    Task SaveAsync(HlcTimestamp timestamp, CancellationToken ct = default);
}
```

**Implementations:**

- `InMemoryHlcStateStore` - For development and testing
- `PostgresHlcStateStore` - For production with durable persistence

## Persistence

### PostgreSQL Schema

```sql
CREATE TABLE scheduler.hlc_state (
    node_id         TEXT PRIMARY KEY,
    physical_time   BIGINT NOT NULL,
    logical_counter INT NOT NULL,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_hlc_state_updated ON scheduler.hlc_state(updated_at DESC);
```

### PostgresHlcStateStore

Uses an atomic upsert with a conditional update, so a node's persisted state never moves backwards (monotonicity is preserved):

```csharp
var stateStore = new PostgresHlcStateStore(
    dataSource,
    logger,
    schemaName: "scheduler", // default
    tableName: "hlc_state"   // default
);
```
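A conditional upsert of this kind might look like the following. The SQL is a hypothetical sketch against the schema above (the parameter names `@node_id`, `@physical_time`, and `@logical_counter` are assumptions), not the statement `PostgresHlcStateStore` actually runs; the hypothetical `ShouldOverwrite` helper restates its `WHERE` guard in C#.

```csharp
using System;

// Hypothetical upsert: insert the node's row, or overwrite it only when the
// incoming (physical_time, logical_counter) pair is strictly greater.
const string SaveSqlSketch = """
    INSERT INTO scheduler.hlc_state AS s (node_id, physical_time, logical_counter, updated_at)
    VALUES (@node_id, @physical_time, @logical_counter, NOW())
    ON CONFLICT (node_id) DO UPDATE
    SET physical_time   = EXCLUDED.physical_time,
        logical_counter = EXCLUDED.logical_counter,
        updated_at      = NOW()
    WHERE (s.physical_time, s.logical_counter)
        < (EXCLUDED.physical_time, EXCLUDED.logical_counter);
    """;

// The same guard in C#: ValueTuple compares element by element, so
// (physical, counter) pairs order lexicographically, like PostgreSQL row values.
bool ShouldOverwrite((long Pt, int C) stored, (long Pt, int C) incoming) =>
    incoming.CompareTo(stored) > 0;
```

Because the guard compares against whatever is already stored, a delayed save carrying an older timestamp becomes a no-op instead of moving the persisted state backwards.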

## Serialization

### JSON (System.Text.Json)

Two converters are provided:

1. **String format** (default): compact and sortable:

   ```json
   "1704067200000-scheduler-east-1-000042"
   ```

2. **Object format**: explicit properties:

   ```json
   {
     "physicalTime": 1704067200000,
     "nodeId": "scheduler-east-1",
     "logicalCounter": 42
   }
   ```

Register one of the converters:

```csharp
using System.Text.Json;

var options = new JsonSerializerOptions();
options.Converters.Add(new HlcTimestampJsonConverter());       // String format
// or
options.Converters.Add(new HlcTimestampObjectJsonConverter()); // Object format
```

### Database (Npgsql/Dapper)

Extension methods for reading/writing HLC timestamps:

```csharp
// NpgsqlCommand extension: bind an HLC timestamp parameter
await using (var insert = dataSource.CreateCommand("INSERT INTO events (timestamp) VALUES (@ts)"))
{
    insert.AddHlcTimestamp("ts", timestamp);
    await insert.ExecuteNonQueryAsync();
}

// NpgsqlDataReader extension: read HLC timestamps from a query result
await using var select = dataSource.CreateCommand("SELECT timestamp FROM events");
await using var reader = await select.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
    var ts = reader.GetHlcTimestamp("timestamp");
    var nullableTs = reader.GetHlcTimestampOrNull("timestamp"); // for nullable columns
}
```

Dapper type handlers:

```csharp
// Register handlers at startup
HlcTypeHandlerRegistration.Register(services);

// Then use normally with Dapper
var results = await connection.QueryAsync<MyEntity>(
    "SELECT * FROM events WHERE timestamp > @since",
    new { since = sinceTimestamp });
```

## Clock Skew Handling

`Receive` rejects a remote timestamp whose clock skew exceeds `maxClockSkew` by throwing `HlcClockSkewException`:

```csharp
var clock = new HybridLogicalClock(
    timeProvider,
    nodeId,
    stateStore,
    logger,
    maxClockSkew: TimeSpan.FromMinutes(1)); // Default: 1 minute

try
{
    var merged = clock.Receive(remoteTimestamp);
}
catch (HlcClockSkewException ex)
{
    // Remote clock differs by more than maxClockSkew
    logger.LogWarning(
        "Clock skew detected: {Actual} exceeds threshold {Max}",
        ex.ActualSkew, ex.MaxAllowedSkew);
}
```
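A skew check of this kind can be sketched as a pure function. This is an assumption about how the comparison might work (absolute distance between the remote physical time and the local wall clock), not the library's implementation; `ExceedsSkew` is a hypothetical name.

```csharp
using System;

// Hypothetical check: how far is the remote physical time from the local wall clock?
// Uses the absolute difference, so a remote clock that is too far behind is
// rejected as well as one that is too far ahead.
bool ExceedsSkew(long remotePhysicalMs, DateTimeOffset localNow, TimeSpan maxClockSkew, out TimeSpan actualSkew)
{
    long diffMs = Math.Abs(remotePhysicalMs - localNow.ToUnixTimeMilliseconds());
    actualSkew = TimeSpan.FromMilliseconds(diffMs);
    return actualSkew > maxClockSkew; // exactly maxClockSkew is still accepted
}
```

Only a remote clock that is ahead can pull the local physical component forward (see the receive rule under Algorithm), so an implementation could reasonably check only that direction.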

## Recovery from Restart

After a node restart, initialize the clock from persisted state before generating any timestamps:

```csharp
var clock = new HybridLogicalClock(timeProvider, nodeId, stateStore, logger);

// Load last persisted state
bool recovered = await clock.InitializeFromStateAsync();
if (recovered)
{
    logger.LogInformation("Clock recovered from state: {Current}", clock.Current);
}

// The first tick after restart is guaranteed to be greater than the last persisted timestamp
var ts = clock.Tick();
```
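
## Testing

### FakeTimeProvider

For deterministic testing, use a fake time provider. The `Microsoft.Extensions.TimeProvider.Testing` package ships a ready-made `FakeTimeProvider` (namespace `Microsoft.Extensions.Time.Testing`); a minimal hand-rolled equivalent:

```csharp
using System;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.HybridLogicalClock;
using Xunit;

public class FakeTimeProvider : TimeProvider
{
    // Fixed start instant so every test run sees the same time
    private DateTimeOffset _now = DateTimeOffset.FromUnixTimeMilliseconds(1704067200000);

    public override DateTimeOffset GetUtcNow() => _now;

    public void SetUtcNow(DateTimeOffset value) => _now = value;
    public void Advance(TimeSpan duration) => _now = _now.Add(duration);
}

public class HybridLogicalClockTests
{
    [Fact]
    public void Tick_Advances_Counter()
    {
        var timeProvider = new FakeTimeProvider();
        // NullLogger discards log output
        var clock = new HybridLogicalClock(
            timeProvider, "test", new InMemoryHlcStateStore(), NullLogger<HybridLogicalClock>.Instance);

        // Time is frozen, so both ticks share a millisecond and the counter increments
        var ts1 = clock.Tick();
        var ts2 = clock.Tick();

        Assert.Equal(0, ts1.LogicalCounter);
        Assert.Equal(1, ts2.LogicalCounter);
    }
}
```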

## Algorithm

The update rules follow Kulkarni et al. (see References). Here `l` and `c` are this node's physical component and logical counter, and `m_l` and `m_c` are the same fields of a received timestamp.

### On Local Event (Tick)

```
l' = l
l = max(l', physical_clock())
if l == l':
    c = c + 1
else:
    c = 0
return (l, node_id, c)
```

### On Receive

```
l' = l
l = max(l', m_l, physical_clock())
if l == l' == m_l:
    c = max(c, m_c) + 1
elif l == l':
    c = c + 1
elif l == m_l:
    c = m_c + 1
else:
    c = 0
return (l, node_id, c)
```
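The two update rules can be written as pure functions over the clock state `(l, c)`. This is an illustrative sketch of the pseudocode above, not the library's implementation: it leaves out the node ID, locking, skew checks, and persistence, and `TickSketch`/`ReceiveSketch` are hypothetical names.

```csharp
using System;

// State (L, C) = (physical component l, logical counter c); pt = physical_clock() in Unix ms.
(long L, int C) TickSketch((long L, int C) s, long pt)
{
    long l = Math.Max(s.L, pt);
    // Same physical component as before: bump the counter; otherwise reset it.
    int c = l == s.L ? s.C + 1 : 0;
    return (l, c);
}

(long L, int C) ReceiveSketch((long L, int C) s, (long L, int C) m, long pt)
{
    long l = Math.Max(Math.Max(s.L, m.L), pt);
    int c;
    if (l == s.L && l == m.L) c = Math.Max(s.C, m.C) + 1; // local and remote tie
    else if (l == s.L) c = s.C + 1;                       // local state is ahead
    else if (l == m.L) c = m.C + 1;                       // remote timestamp is ahead
    else c = 0;                                           // wall clock is ahead of both
    return (l, c);
}

var state = (L: 0L, C: 0);
state = TickSketch(state, 1000);               // (1000, 0)
state = TickSketch(state, 1000);               // (1000, 1): same millisecond
state = TickSketch(state, 999);                // (1000, 2): wall clock stepped back
state = ReceiveSketch(state, (1005, 3), 1001); // (1005, 4): remote is ahead
Console.WriteLine(state);                      // prints (1005, 4)
```

Starting `state` from the persisted `(physical_time, logical_counter)` instead of `(0, 0)` is what makes the first tick after a restart exceed the last persisted value: `l` can only stay equal (and then `c` grows) or increase.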

## Performance

Approximate throughput measured on typical hardware (actual figures vary with CPU, runtime, and contention):

| Operation | Throughput (ops/sec) |
|-----------|----------------------|
| Tick (single-thread) | > 100,000 |
| Tick (multi-thread) | > 50,000 |
| Receive | > 50,000 |
| Parse | > 500,000 |
| ToSortableString | > 500,000 |
| CompareTo | > 10,000,000 |

Memory: `HlcTimestamp` is a value type (struct), which keeps allocations minimal.

## References

- [Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases](https://cse.buffalo.edu/tech-reports/2014-04.pdf) - Kulkarni et al.
- [Time, Clocks, and the Ordering of Events in a Distributed System](https://lamport.azurewebsites.net/pubs/time-clocks.pdf) - Lamport