173 lines
5.7 KiB
C#
173 lines
5.7 KiB
C#
// <copyright file="OfflineHlcManager.cs" company="StellaOps">
|
|
// Copyright (c) StellaOps. Licensed under BUSL-1.1.
|
|
// </copyright>
|
|
|
|
using System.Security.Cryptography;
|
|
using System.Text;
|
|
using Microsoft.Extensions.Logging;
|
|
using StellaOps.AirGap.Sync.Models;
|
|
using StellaOps.AirGap.Sync.Stores;
|
|
using StellaOps.Canonical.Json;
|
|
using StellaOps.Determinism;
|
|
using StellaOps.HybridLogicalClock;
|
|
|
|
namespace StellaOps.AirGap.Sync.Services;
|
|
|
|
/// <summary>
|
|
/// Interface for offline HLC management.
|
|
/// </summary>
|
|
public interface IOfflineHlcManager
|
|
{
|
|
/// <summary>
|
|
/// Enqueues a job locally while offline, maintaining the local chain.
|
|
/// </summary>
|
|
/// <typeparam name="T">The payload type.</typeparam>
|
|
/// <param name="payload">The job payload.</param>
|
|
/// <param name="idempotencyKey">The idempotency key for deterministic job ID.</param>
|
|
/// <param name="partitionKey">Optional partition key.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
/// <returns>The enqueue result.</returns>
|
|
Task<OfflineEnqueueResult> EnqueueOfflineAsync<T>(
|
|
T payload,
|
|
string idempotencyKey,
|
|
string? partitionKey = null,
|
|
CancellationToken cancellationToken = default) where T : notnull;
|
|
|
|
/// <summary>
|
|
/// Gets the current node's job log for export.
|
|
/// </summary>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
/// <returns>The node job log, or null if empty.</returns>
|
|
Task<NodeJobLog?> GetNodeJobLogAsync(CancellationToken cancellationToken = default);
|
|
|
|
/// <summary>
|
|
/// Gets the node ID.
|
|
/// </summary>
|
|
string NodeId { get; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// Manages HLC operations for offline/air-gap scenarios.
|
|
/// </summary>
|
|
public sealed class OfflineHlcManager : IOfflineHlcManager
|
|
{
|
|
private readonly IHybridLogicalClock _hlc;
|
|
private readonly IOfflineJobLogStore _jobLogStore;
|
|
private readonly IGuidProvider _guidProvider;
|
|
private readonly ILogger<OfflineHlcManager> _logger;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="OfflineHlcManager"/> class.
|
|
/// </summary>
|
|
public OfflineHlcManager(
|
|
IHybridLogicalClock hlc,
|
|
IOfflineJobLogStore jobLogStore,
|
|
IGuidProvider guidProvider,
|
|
ILogger<OfflineHlcManager> logger)
|
|
{
|
|
_hlc = hlc ?? throw new ArgumentNullException(nameof(hlc));
|
|
_jobLogStore = jobLogStore ?? throw new ArgumentNullException(nameof(jobLogStore));
|
|
_guidProvider = guidProvider ?? throw new ArgumentNullException(nameof(guidProvider));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
/// <inheritdoc/>
|
|
public string NodeId => _hlc.NodeId;
|
|
|
|
/// <inheritdoc/>
|
|
public async Task<OfflineEnqueueResult> EnqueueOfflineAsync<T>(
|
|
T payload,
|
|
string idempotencyKey,
|
|
string? partitionKey = null,
|
|
CancellationToken cancellationToken = default) where T : notnull
|
|
{
|
|
ArgumentNullException.ThrowIfNull(payload);
|
|
ArgumentException.ThrowIfNullOrWhiteSpace(idempotencyKey);
|
|
|
|
// 1. Generate HLC timestamp
|
|
var tHlc = _hlc.Tick();
|
|
|
|
// 2. Compute deterministic job ID from idempotency key
|
|
var jobId = ComputeDeterministicJobId(idempotencyKey);
|
|
|
|
// 3. Serialize and hash payload
|
|
var payloadJson = CanonJson.Serialize(payload);
|
|
var payloadHash = SHA256.HashData(Encoding.UTF8.GetBytes(payloadJson));
|
|
|
|
// 4. Get previous chain link
|
|
var prevLink = await _jobLogStore.GetLastLinkAsync(NodeId, cancellationToken)
|
|
.ConfigureAwait(false);
|
|
|
|
// 5. Compute chain link
|
|
var link = ComputeLink(prevLink, jobId, tHlc, payloadHash);
|
|
|
|
// 6. Create and store entry
|
|
var entry = new OfflineJobLogEntry
|
|
{
|
|
NodeId = NodeId,
|
|
THlc = tHlc,
|
|
JobId = jobId,
|
|
PartitionKey = partitionKey,
|
|
Payload = payloadJson,
|
|
PayloadHash = payloadHash,
|
|
PrevLink = prevLink,
|
|
Link = link,
|
|
EnqueuedAt = DateTimeOffset.UtcNow
|
|
};
|
|
|
|
await _jobLogStore.AppendAsync(entry, cancellationToken).ConfigureAwait(false);
|
|
|
|
_logger.LogInformation(
|
|
"Enqueued offline job {JobId} with HLC {THlc} on node {NodeId}",
|
|
jobId, tHlc, NodeId);
|
|
|
|
return new OfflineEnqueueResult
|
|
{
|
|
THlc = tHlc,
|
|
JobId = jobId,
|
|
Link = link,
|
|
NodeId = NodeId
|
|
};
|
|
}
|
|
|
|
/// <inheritdoc/>
|
|
public Task<NodeJobLog?> GetNodeJobLogAsync(CancellationToken cancellationToken = default)
|
|
=> _jobLogStore.GetNodeJobLogAsync(NodeId, cancellationToken);
|
|
|
|
/// <summary>
|
|
/// Computes deterministic job ID from idempotency key.
|
|
/// </summary>
|
|
private Guid ComputeDeterministicJobId(string idempotencyKey)
|
|
{
|
|
var hash = SHA256.HashData(Encoding.UTF8.GetBytes(idempotencyKey));
|
|
// Use first 16 bytes of SHA-256 as deterministic GUID
|
|
return new Guid(hash.AsSpan(0, 16));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Computes chain link: Hash(prev_link || job_id || t_hlc || payload_hash).
|
|
/// </summary>
|
|
internal static byte[] ComputeLink(
|
|
byte[]? prevLink,
|
|
Guid jobId,
|
|
HlcTimestamp tHlc,
|
|
byte[] payloadHash)
|
|
{
|
|
using var hasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);
|
|
|
|
// Previous link (or 32 zero bytes for first entry)
|
|
hasher.AppendData(prevLink ?? new byte[32]);
|
|
|
|
// Job ID as bytes
|
|
hasher.AppendData(jobId.ToByteArray());
|
|
|
|
// HLC timestamp as UTF-8 bytes
|
|
hasher.AppendData(Encoding.UTF8.GetBytes(tHlc.ToSortableString()));
|
|
|
|
// Payload hash
|
|
hasher.AppendData(payloadHash);
|
|
|
|
return hasher.GetHashAndReset();
|
|
}
|
|
}
|