This commit is contained in:
StellaOps Bot
2026-01-06 21:03:06 +02:00
841 changed files with 15706 additions and 68106 deletions

View File

@@ -1,177 +1,171 @@
-- HLC Queue Chain: Hybrid Logical Clock Ordering with Cryptographic Sequence Proofs
-- SPRINT_20260105_002_002_SCHEDULER: SQC-002, SQC-003, SQC-004
--
-- Adds HLC-based ordering with hash chain at enqueue time for audit-safe job queue ordering.
-- See: Product Advisory "Audit-safe job queue ordering using monotonic timestamps"
BEGIN;

-- -----------------------------------------------------------------------------
-- 002_hlc_queue_chain.sql
-- Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
-- Tasks: SQC-002, SQC-003, SQC-004
-- Description: HLC-ordered scheduler queue with cryptographic chain linking
-- -----------------------------------------------------------------------------

-- ============================================================================
-- SQC-002: scheduler.scheduler_log - HLC-ordered, chain-linked jobs
-- ============================================================================
-- HLC-ordered, chain-linked job entries. This is the authoritative order.
-- Jobs are linked via: link_i = Hash(link_{i-1} || job_id || t_hlc || payload_hash)
CREATE TABLE IF NOT EXISTS scheduler.scheduler_log (
    -- Storage order (BIGSERIAL for monotonic insertion, not authoritative for ordering)
    seq_bigint BIGSERIAL PRIMARY KEY,

    -- Tenant isolation
    tenant_id TEXT NOT NULL,

    -- HLC timestamp: "1704067200000-scheduler-east-1-000042"
    -- This is the authoritative ordering key
    t_hlc TEXT NOT NULL,

    -- Optional queue partition for parallel processing.
    -- NOT NULL so uq_scheduler_log_order treats the default partition ('') as
    -- one partition; NULL values would never collide under a UNIQUE constraint.
    partition_key TEXT NOT NULL DEFAULT '',

    -- Job identifier (deterministic from payload using GUID v5)
    job_id UUID NOT NULL,

    -- SHA-256 of canonical JSON payload (32 bytes)
    payload_hash BYTEA NOT NULL CHECK (octet_length(payload_hash) = 32),

    -- Previous chain link (null for first entry in partition)
    prev_link BYTEA CHECK (prev_link IS NULL OR octet_length(prev_link) = 32),

    -- Current chain link: Hash(prev_link || job_id || t_hlc || payload_hash)
    link BYTEA NOT NULL CHECK (octet_length(link) = 32),

    -- Wall-clock timestamp for operational queries (not authoritative)
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Ensure unique HLC ordering within tenant/partition
    CONSTRAINT uq_scheduler_log_order UNIQUE (tenant_id, t_hlc, partition_key, job_id)
);

-- Primary query: get jobs by HLC order within tenant
CREATE INDEX IF NOT EXISTS idx_scheduler_log_tenant_hlc
    ON scheduler.scheduler_log (tenant_id, t_hlc ASC);

-- Partition-specific queries
CREATE INDEX IF NOT EXISTS idx_scheduler_log_partition
    ON scheduler.scheduler_log (tenant_id, partition_key, t_hlc ASC);

-- Job lookup by ID (idempotency checks)
CREATE INDEX IF NOT EXISTS idx_scheduler_log_job_id
    ON scheduler.scheduler_log (job_id);

-- Chain verification: find by link hash
CREATE INDEX IF NOT EXISTS idx_scheduler_log_link
    ON scheduler.scheduler_log (link);

-- Range queries for batch snapshots
CREATE INDEX IF NOT EXISTS idx_scheduler_log_created
    ON scheduler.scheduler_log (tenant_id, created_at DESC);

COMMENT ON TABLE scheduler.scheduler_log IS 'HLC-ordered scheduler queue with cryptographic chain linking for audit-safe job ordering';
COMMENT ON COLUMN scheduler.scheduler_log.t_hlc IS 'Hybrid Logical Clock timestamp: authoritative ordering key. Format: physicalTime13-nodeId-counter6';
COMMENT ON COLUMN scheduler.scheduler_log.link IS 'Chain link = SHA256(prev_link || job_id || t_hlc || payload_hash). Creates tamper-evident sequence.';

-- ============================================================================
-- SQC-003: scheduler.batch_snapshot - Audit anchors for job batches
-- ============================================================================
-- Captures chain state at specific points for audit anchors and attestation.
CREATE TABLE IF NOT EXISTS scheduler.batch_snapshot (
    -- Snapshot identifier (generated by the application, no DB default)
    batch_id UUID PRIMARY KEY,

    -- Tenant isolation
    tenant_id TEXT NOT NULL,

    -- HLC range covered by this snapshot (inclusive on both ends)
    range_start_t TEXT NOT NULL,
    range_end_t TEXT NOT NULL,

    -- Chain head at snapshot time (last link in range)
    head_link BYTEA NOT NULL CHECK (octet_length(head_link) = 32),

    -- Job count for quick validation
    job_count INT NOT NULL CHECK (job_count >= 0),

    -- Wall-clock timestamp
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Optional DSSE signature fields
    signed_by TEXT,   -- Key ID that signed
    signature BYTEA,  -- DSSE signature bytes

    -- A signature without a signer (or vice versa) is invalid
    CONSTRAINT chk_signature_requires_signer CHECK (
        (signature IS NULL AND signed_by IS NULL) OR
        (signature IS NOT NULL AND signed_by IS NOT NULL)
    )
);

-- Query snapshots by tenant and time
CREATE INDEX IF NOT EXISTS idx_batch_snapshot_tenant
    ON scheduler.batch_snapshot (tenant_id, created_at DESC);

-- Query snapshots by HLC range
CREATE INDEX IF NOT EXISTS idx_batch_snapshot_range
    ON scheduler.batch_snapshot (tenant_id, range_start_t, range_end_t);

COMMENT ON TABLE scheduler.batch_snapshot IS 'Audit anchors for scheduler job batches. Captures chain head at specific HLC ranges.';
COMMENT ON COLUMN scheduler.batch_snapshot.head_link IS 'Chain head (last link) at snapshot time. Can be verified by replaying chain.';

-- ============================================================================
-- SQC-004: scheduler.chain_heads - Per-partition chain head tracking
-- ============================================================================
-- Tracks the last chain link per tenant/partition for efficient append.
CREATE TABLE IF NOT EXISTS scheduler.chain_heads (
    -- Tenant isolation
    tenant_id TEXT NOT NULL,

    -- Partition (empty string for default partition)
    partition_key TEXT NOT NULL DEFAULT '',

    -- Last chain link in this partition
    last_link BYTEA NOT NULL CHECK (octet_length(last_link) = 32),

    -- Last HLC timestamp in this partition
    last_t_hlc TEXT NOT NULL,

    -- Wall-clock timestamp of last update
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Primary key: one head per tenant/partition
    PRIMARY KEY (tenant_id, partition_key)
);

-- Query chain heads by update time (for monitoring)
CREATE INDEX IF NOT EXISTS idx_chain_heads_updated
    ON scheduler.chain_heads (updated_at DESC);

COMMENT ON TABLE scheduler.chain_heads IS 'Tracks current chain head for each tenant/partition. Updated atomically with scheduler_log inserts.';
COMMENT ON COLUMN scheduler.chain_heads.last_link IS 'Current chain head. Used as prev_link for next enqueue.';

-- ============================================================================
-- Atomic upsert function for chain head updates
-- ============================================================================
CREATE OR REPLACE FUNCTION scheduler.upsert_chain_head(
    p_tenant_id TEXT,
    p_partition_key TEXT,
    p_new_link BYTEA,
    p_new_t_hlc TEXT
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
    -- Monotonicity guard: only advance the head when the new HLC sorts after
    -- the stored one. Assumes HLC strings compare in timestamp order
    -- (fixed-width components) -- TODO confirm against the HLC formatter.
    INSERT INTO scheduler.chain_heads (tenant_id, partition_key, last_link, last_t_hlc, updated_at)
    VALUES (p_tenant_id, p_partition_key, p_new_link, p_new_t_hlc, NOW())
    ON CONFLICT (tenant_id, partition_key)
    DO UPDATE SET
        last_link = EXCLUDED.last_link,
        last_t_hlc = EXCLUDED.last_t_hlc,
        updated_at = EXCLUDED.updated_at
    WHERE scheduler.chain_heads.last_t_hlc < EXCLUDED.last_t_hlc;
END;
$$;

COMMENT ON FUNCTION scheduler.upsert_chain_head IS 'Atomically updates chain head. Only updates if new HLC > current HLC (monotonicity).';

COMMIT;

View File

@@ -0,0 +1,58 @@
// -----------------------------------------------------------------------------
// BatchSnapshotEntity.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-005 - Entity for batch_snapshot table
// -----------------------------------------------------------------------------
namespace StellaOps.Scheduler.Persistence.Postgres.Models;
/// <summary>
/// Audit anchor for a batch of scheduler jobs; mirrors one row of the
/// scheduler.batch_snapshot table.
/// </summary>
public sealed record BatchSnapshotEntity
{
    /// <summary>Unique identifier of this snapshot.</summary>
    public required Guid BatchId { get; init; }

    /// <summary>Owning tenant (rows are isolated per tenant).</summary>
    public required string TenantId { get; init; }

    /// <summary>Inclusive lower bound of the HLC range this snapshot covers.</summary>
    public required string RangeStartT { get; init; }

    /// <summary>Inclusive upper bound of the HLC range this snapshot covers.</summary>
    public required string RangeEndT { get; init; }

    /// <summary>Chain head captured when the snapshot was taken (last link in range).</summary>
    public required byte[] HeadLink { get; init; }

    /// <summary>How many jobs fall inside the snapshot range.</summary>
    public required int JobCount { get; init; }

    /// <summary>Wall-clock time at which the snapshot was created.</summary>
    public required DateTimeOffset CreatedAt { get; init; }

    /// <summary>Identifier of the signing key, or null when the snapshot is unsigned.</summary>
    public string? SignedBy { get; init; }

    /// <summary>Raw DSSE signature bytes, or null when the snapshot is unsigned.</summary>
    public byte[]? Signature { get; init; }
}

View File

@@ -0,0 +1,38 @@
// -----------------------------------------------------------------------------
// ChainHeadEntity.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-005 - Entity for chain_heads table
// -----------------------------------------------------------------------------
namespace StellaOps.Scheduler.Persistence.Postgres.Models;
/// <summary>
/// Snapshot of the most recent chain link for one tenant/partition pair;
/// mirrors one row of the scheduler.chain_heads table.
/// </summary>
public sealed record ChainHeadEntity
{
    /// <summary>Owning tenant (rows are isolated per tenant).</summary>
    public required string TenantId { get; init; }

    /// <summary>Queue partition; the empty string denotes the default partition.</summary>
    public string PartitionKey { get; init; } = "";

    /// <summary>Hash of the newest chain link recorded for this partition.</summary>
    public required byte[] LastLink { get; init; }

    /// <summary>HLC timestamp of the newest entry in this partition.</summary>
    public required string LastTHlc { get; init; }

    /// <summary>Wall-clock time the head row was last written.</summary>
    public required DateTimeOffset UpdatedAt { get; init; }
}

View File

@@ -0,0 +1,60 @@
// -----------------------------------------------------------------------------
// SchedulerLogEntity.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-005 - Entity for scheduler_log table
// -----------------------------------------------------------------------------
namespace StellaOps.Scheduler.Persistence.Postgres.Models;
/// <summary>
/// One HLC-ordered, chain-linked entry of the scheduler log; mirrors a row
/// of the scheduler.scheduler_log table.
/// </summary>
public sealed record SchedulerLogEntity
{
    /// <summary>
    /// Database-assigned storage sequence (BIGSERIAL). Not an ordering key;
    /// remains 0 on entries that have not yet been persisted.
    /// </summary>
    public long SeqBigint { get; init; }

    /// <summary>Owning tenant (rows are isolated per tenant).</summary>
    public required string TenantId { get; init; }

    /// <summary>
    /// Authoritative ordering key: HLC timestamp string such as
    /// "1704067200000-scheduler-east-1-000042".
    /// </summary>
    public required string THlc { get; init; }

    /// <summary>Queue partition used for parallel processing; empty string by default.</summary>
    public string PartitionKey { get; init; } = "";

    /// <summary>Job identifier (deterministic from payload using GUID v5).</summary>
    public required Guid JobId { get; init; }

    /// <summary>SHA-256 digest of the canonical JSON payload (32 bytes).</summary>
    public required byte[] PayloadHash { get; init; }

    /// <summary>Link of the preceding entry; null when this is the first entry in the partition.</summary>
    public byte[]? PrevLink { get; init; }

    /// <summary>This entry's chain link: Hash(prev_link || job_id || t_hlc || payload_hash).</summary>
    public required byte[] Link { get; init; }

    /// <summary>Wall-clock insertion time, for operational queries only (not authoritative).</summary>
    public required DateTimeOffset CreatedAt { get; init; }
}

View File

@@ -0,0 +1,179 @@
// -----------------------------------------------------------------------------
// BatchSnapshotRepository.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-013 - Implement BatchSnapshotService
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Persistence.Postgres.Models;
namespace StellaOps.Scheduler.Persistence.Postgres.Repositories;
/// <summary>
/// PostgreSQL implementation of batch snapshot repository.
/// Persists and queries audit anchors over the scheduler.batch_snapshot table.
/// All SQL uses named Npgsql parameters; no values are interpolated.
/// </summary>
public sealed class BatchSnapshotRepository : RepositoryBase<SchedulerDataSource>, IBatchSnapshotRepository
{
    /// <summary>
    /// Creates a new batch snapshot repository.
    /// </summary>
    /// <param name="dataSource">Data source used to open tenant-scoped connections.</param>
    /// <param name="logger">Logger passed to the repository base.</param>
    public BatchSnapshotRepository(
        SchedulerDataSource dataSource,
        ILogger<BatchSnapshotRepository> logger)
        : base(dataSource, logger)
    {
    }

    /// <inheritdoc />
    public async Task InsertAsync(BatchSnapshotEntity snapshot, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(snapshot);

        const string sql = """
            INSERT INTO scheduler.batch_snapshot (
                batch_id, tenant_id, range_start_t, range_end_t,
                head_link, job_count, created_at, signed_by, signature
            ) VALUES (
                @batch_id, @tenant_id, @range_start_t, @range_end_t,
                @head_link, @job_count, @created_at, @signed_by, @signature
            )
            """;

        // Opens a "writer"-role connection scoped to the snapshot's tenant.
        await using var connection = await DataSource.OpenConnectionAsync(snapshot.TenantId, "writer", cancellationToken)
            .ConfigureAwait(false);
        await using var command = CreateCommand(sql, connection);
        AddParameter(command, "batch_id", snapshot.BatchId);
        AddParameter(command, "tenant_id", snapshot.TenantId);
        AddParameter(command, "range_start_t", snapshot.RangeStartT);
        AddParameter(command, "range_end_t", snapshot.RangeEndT);
        AddParameter(command, "head_link", snapshot.HeadLink);
        AddParameter(command, "job_count", snapshot.JobCount);
        AddParameter(command, "created_at", snapshot.CreatedAt);
        // SignedBy/Signature may be null (unsigned snapshot); AddParameter is
        // assumed to map null to DBNull -- TODO confirm in RepositoryBase.
        AddParameter(command, "signed_by", snapshot.SignedBy);
        AddParameter(command, "signature", snapshot.Signature);
        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<BatchSnapshotEntity?> GetByIdAsync(Guid batchId, CancellationToken cancellationToken = default)
    {
        const string sql = """
            SELECT batch_id, tenant_id, range_start_t, range_end_t,
                   head_link, job_count, created_at, signed_by, signature
            FROM scheduler.batch_snapshot
            WHERE batch_id = @batch_id
            """;

        // NOTE(review): tenantId is forced to null! here -- presumably a
        // cross-tenant primary-key lookup. Verify QuerySingleOrDefaultAsync
        // tolerates a null tenant and that this bypass of tenant scoping is
        // intentional.
        return await QuerySingleOrDefaultAsync(
            tenantId: null!,
            sql,
            cmd => AddParameter(cmd, "batch_id", batchId),
            MapBatchSnapshot,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<BatchSnapshotEntity>> GetByTenantAsync(
        string tenantId,
        int limit = 100,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        // Newest snapshots first; result size is capped by @limit.
        // NOTE(review): limit is not validated; a non-positive value reaches
        // the database -- consider guarding if callers can pass user input.
        const string sql = """
            SELECT batch_id, tenant_id, range_start_t, range_end_t,
                   head_link, job_count, created_at, signed_by, signature
            FROM scheduler.batch_snapshot
            WHERE tenant_id = @tenant_id
            ORDER BY created_at DESC
            LIMIT @limit
            """;

        return await QueryAsync(
            tenantId,
            sql,
            cmd =>
            {
                AddParameter(cmd, "tenant_id", tenantId);
                AddParameter(cmd, "limit", limit);
            },
            MapBatchSnapshot,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<BatchSnapshotEntity>> GetContainingHlcAsync(
        string tenantId,
        string tHlc,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        ArgumentException.ThrowIfNullOrWhiteSpace(tHlc);

        // A snapshot "contains" tHlc when range_start_t <= tHlc <= range_end_t
        // (both bounds inclusive, compared as text).
        const string sql = """
            SELECT batch_id, tenant_id, range_start_t, range_end_t,
                   head_link, job_count, created_at, signed_by, signature
            FROM scheduler.batch_snapshot
            WHERE tenant_id = @tenant_id
              AND range_start_t <= @t_hlc
              AND range_end_t >= @t_hlc
            ORDER BY created_at DESC
            """;

        return await QueryAsync(
            tenantId,
            sql,
            cmd =>
            {
                AddParameter(cmd, "tenant_id", tenantId);
                AddParameter(cmd, "t_hlc", tHlc);
            },
            MapBatchSnapshot,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<BatchSnapshotEntity?> GetLatestAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        // Latest = most recent created_at for the tenant.
        const string sql = """
            SELECT batch_id, tenant_id, range_start_t, range_end_t,
                   head_link, job_count, created_at, signed_by, signature
            FROM scheduler.batch_snapshot
            WHERE tenant_id = @tenant_id
            ORDER BY created_at DESC
            LIMIT 1
            """;

        return await QuerySingleOrDefaultAsync(
            tenantId,
            sql,
            cmd => AddParameter(cmd, "tenant_id", tenantId),
            MapBatchSnapshot,
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Maps the current row of <paramref name="reader"/> to a
    /// <see cref="BatchSnapshotEntity"/>. signed_by and signature are the only
    /// nullable columns and map to null properties.
    /// </summary>
    private static BatchSnapshotEntity MapBatchSnapshot(NpgsqlDataReader reader)
    {
        return new BatchSnapshotEntity
        {
            BatchId = reader.GetGuid(reader.GetOrdinal("batch_id")),
            TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
            RangeStartT = reader.GetString(reader.GetOrdinal("range_start_t")),
            RangeEndT = reader.GetString(reader.GetOrdinal("range_end_t")),
            HeadLink = reader.GetFieldValue<byte[]>(reader.GetOrdinal("head_link")),
            JobCount = reader.GetInt32(reader.GetOrdinal("job_count")),
            CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at")),
            SignedBy = reader.IsDBNull(reader.GetOrdinal("signed_by"))
                ? null
                : reader.GetString(reader.GetOrdinal("signed_by")),
            Signature = reader.IsDBNull(reader.GetOrdinal("signature"))
                ? null
                : reader.GetFieldValue<byte[]>(reader.GetOrdinal("signature"))
        };
    }
}

View File

@@ -0,0 +1,140 @@
// -----------------------------------------------------------------------------
// ChainHeadRepository.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-007 - PostgreSQL implementation for chain_heads repository
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Persistence.Postgres.Models;
namespace StellaOps.Scheduler.Persistence.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for chain head tracking operations over the
/// scheduler.chain_heads table. All SQL uses named Npgsql parameters.
/// </summary>
public sealed class ChainHeadRepository : RepositoryBase<SchedulerDataSource>, IChainHeadRepository
{
    /// <summary>
    /// Creates a new chain head repository.
    /// </summary>
    /// <param name="dataSource">Data source used to open tenant-scoped connections.</param>
    /// <param name="logger">Logger passed to the repository base.</param>
    public ChainHeadRepository(
        SchedulerDataSource dataSource,
        ILogger<ChainHeadRepository> logger)
        : base(dataSource, logger)
    {
    }

    /// <inheritdoc />
    public async Task<ChainHeadEntity?> GetAsync(
        string tenantId,
        string partitionKey,
        CancellationToken cancellationToken = default)
    {
        // One row per (tenant_id, partition_key); returns null when no chain
        // has been started for the pair.
        const string sql = """
            SELECT tenant_id, partition_key, last_link, last_t_hlc, updated_at
            FROM scheduler.chain_heads
            WHERE tenant_id = @tenant_id AND partition_key = @partition_key
            """;

        return await QuerySingleOrDefaultAsync(
            tenantId,
            sql,
            cmd =>
            {
                AddParameter(cmd, "tenant_id", tenantId);
                AddParameter(cmd, "partition_key", partitionKey);
            },
            MapChainHeadEntity,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<byte[]?> GetLastLinkAsync(
        string tenantId,
        string partitionKey,
        CancellationToken cancellationToken = default)
    {
        // Scalar fast path: fetches only last_link (used as prev_link when
        // appending), on a "reader"-role connection.
        const string sql = """
            SELECT last_link
            FROM scheduler.chain_heads
            WHERE tenant_id = @tenant_id AND partition_key = @partition_key
            """;

        await using var connection = await DataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken)
            .ConfigureAwait(false);
        await using var command = CreateCommand(sql, connection);
        AddParameter(command, "tenant_id", tenantId);
        AddParameter(command, "partition_key", partitionKey);
        // ExecuteScalarAsync returns null when no row matches and DBNull when
        // the column is NULL; both map to a null return here.
        var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
        return result is DBNull or null ? null : (byte[])result;
    }

    /// <inheritdoc />
    public async Task<bool> UpsertAsync(
        string tenantId,
        string partitionKey,
        byte[] newLink,
        string newTHlc,
        CancellationToken cancellationToken = default)
    {
        // Inline upsert mirroring the scheduler.upsert_chain_head SQL
        // function (inlined so the affected-row count can be observed).
        // Monotonicity guard: DO UPDATE ... WHERE only advances the head when
        // the new HLC string sorts after the stored one -- assumes HLC strings
        // compare in timestamp order (fixed-width components); TODO confirm.
        const string sql = """
            INSERT INTO scheduler.chain_heads (tenant_id, partition_key, last_link, last_t_hlc, updated_at)
            VALUES (@tenant_id, @partition_key, @new_link, @new_t_hlc, NOW())
            ON CONFLICT (tenant_id, partition_key)
            DO UPDATE SET
                last_link = EXCLUDED.last_link,
                last_t_hlc = EXCLUDED.last_t_hlc,
                updated_at = EXCLUDED.updated_at
            WHERE scheduler.chain_heads.last_t_hlc < EXCLUDED.last_t_hlc
            """;

        await using var connection = await DataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken)
            .ConfigureAwait(false);
        await using var command = CreateCommand(sql, connection);
        AddParameter(command, "tenant_id", tenantId);
        AddParameter(command, "partition_key", partitionKey);
        AddParameter(command, "new_link", newLink);
        AddParameter(command, "new_t_hlc", newTHlc);
        // 0 rows affected means the update was skipped by the monotonicity
        // guard (stale HLC); 1 means the head was inserted or advanced.
        var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
        return rowsAffected > 0;
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<ChainHeadEntity>> GetAllForTenantAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
    {
        // Deterministic ordering by partition_key for stable enumeration.
        const string sql = """
            SELECT tenant_id, partition_key, last_link, last_t_hlc, updated_at
            FROM scheduler.chain_heads
            WHERE tenant_id = @tenant_id
            ORDER BY partition_key
            """;

        return await QueryAsync(
            tenantId,
            sql,
            cmd => AddParameter(cmd, "tenant_id", tenantId),
            MapChainHeadEntity,
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Maps the current row of <paramref name="reader"/> to a
    /// <see cref="ChainHeadEntity"/>. All selected columns are non-nullable.
    /// </summary>
    private static ChainHeadEntity MapChainHeadEntity(NpgsqlDataReader reader)
    {
        return new ChainHeadEntity
        {
            TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
            PartitionKey = reader.GetString(reader.GetOrdinal("partition_key")),
            LastLink = reader.GetFieldValue<byte[]>(reader.GetOrdinal("last_link")),
            LastTHlc = reader.GetString(reader.GetOrdinal("last_t_hlc")),
            UpdatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("updated_at"))
        };
    }
}

View File

@@ -1,6 +1,8 @@
// <copyright file="IBatchSnapshotRepository.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under AGPL-3.0-or-later.
// </copyright>
// -----------------------------------------------------------------------------
// IBatchSnapshotRepository.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-013 - Implement BatchSnapshotService
// -----------------------------------------------------------------------------

using StellaOps.Scheduler.Persistence.Postgres.Models;

namespace StellaOps.Scheduler.Persistence.Postgres.Repositories;

/// <summary>
/// Repository contract for batch snapshot audit anchors
/// (scheduler.batch_snapshot).
/// </summary>
public interface IBatchSnapshotRepository
{
    /// <summary>
    /// Inserts a new batch snapshot.
    /// </summary>
    /// <param name="snapshot">The snapshot to insert.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A task representing the operation.</returns>
    Task InsertAsync(BatchSnapshotEntity snapshot, CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets a batch snapshot by ID.
    /// </summary>
    /// <param name="batchId">The batch identifier.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The snapshot if found; otherwise null.</returns>
    Task<BatchSnapshotEntity?> GetByIdAsync(Guid batchId, CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets batch snapshots for a tenant, ordered by creation time descending.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="limit">Maximum snapshots to return.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Snapshots for the tenant, newest first.</returns>
    Task<IReadOnlyList<BatchSnapshotEntity>> GetByTenantAsync(
        string tenantId,
        int limit = 100,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets batch snapshots whose HLC range contains a specific HLC timestamp.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="tHlc">The HLC timestamp to search for.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Snapshots containing the timestamp.</returns>
    Task<IReadOnlyList<BatchSnapshotEntity>> GetContainingHlcAsync(
        string tenantId,
        string tHlc,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets the latest batch snapshot for a tenant.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The most recent snapshot if any exists; otherwise null.</returns>
    Task<BatchSnapshotEntity?> GetLatestAsync(
        string tenantId,
        CancellationToken cancellationToken = default);
}

View File

@@ -1,47 +1,64 @@
// <copyright file="IChainHeadRepository.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under AGPL-3.0-or-later.
// </copyright>
// -----------------------------------------------------------------------------
// IChainHeadRepository.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-007 - Interface for chain_heads repository
// -----------------------------------------------------------------------------
using StellaOps.Scheduler.Persistence.Postgres.Models;
namespace StellaOps.Scheduler.Persistence.Postgres.Repositories;
/// <summary>
/// Repository interface for chain head operations.
/// Repository interface for chain head tracking operations.
/// </summary>
public interface IChainHeadRepository
{
/// <summary>
/// Gets the last chain link for a tenant/partition.
/// Gets the current chain head for a tenant/partition.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="partitionKey">Partition key (empty string for default).</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The last link bytes, or null if no chain exists.</returns>
/// <returns>Current chain head, or null if no entries exist.</returns>
Task<ChainHeadEntity?> GetAsync(
string tenantId,
string partitionKey,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets the last link hash for a tenant/partition.
/// Convenience method for chain linking operations.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="partitionKey">Partition key (empty string for default).</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Last link hash, or null if no entries exist.</returns>
Task<byte[]?> GetLastLinkAsync(
string tenantId,
string partitionKey,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets the full chain head for a tenant/partition.
/// Updates the chain head atomically with monotonicity check.
/// Only updates if new HLC > current HLC.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="partitionKey">Partition key (empty string for default).</param>
/// <param name="newLink">New chain link.</param>
/// <param name="newTHlc">New HLC timestamp.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The chain head, or null if no chain exists.</returns>
Task<ChainHead?> GetAsync(
/// <returns>True if updated, false if skipped due to monotonicity.</returns>
Task<bool> UpsertAsync(
string tenantId,
string partitionKey,
byte[] newLink,
string newTHlc,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets all chain heads for a tenant.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>All chain heads for the tenant.</returns>
Task<IReadOnlyList<ChainHead>> GetAllForTenantAsync(
Task<IReadOnlyList<ChainHeadEntity>> GetAllForTenantAsync(
string tenantId,
CancellationToken cancellationToken = default);
}

View File

@@ -1,6 +1,8 @@
// <copyright file="ISchedulerLogRepository.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under AGPL-3.0-or-later.
// </copyright>
// -----------------------------------------------------------------------------
// ISchedulerLogRepository.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-005 - Interface for scheduler_log repository
// -----------------------------------------------------------------------------
using StellaOps.Scheduler.Persistence.Postgres.Models;
@@ -12,98 +14,61 @@ namespace StellaOps.Scheduler.Persistence.Postgres.Repositories;
public interface ISchedulerLogRepository
{
/// <summary>
/// Inserts a log entry and atomically updates the chain head.
/// Inserts a new log entry and atomically updates the chain head.
/// </summary>
/// <param name="entry">The log entry to insert.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The sequence number of the inserted entry.</returns>
Task<long> InsertWithChainUpdateAsync(
SchedulerLogEntry entry,
/// <returns>The inserted entry with populated seq_bigint.</returns>
Task<SchedulerLogEntity> InsertWithChainUpdateAsync(
SchedulerLogEntity entry,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets log entries ordered by HLC timestamp.
/// Gets log entries by HLC order within a tenant/partition.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="partitionKey">Optional partition key (null for all partitions).</param>
/// <param name="limit">Maximum entries to return.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Log entries in HLC order.</returns>
Task<IReadOnlyList<SchedulerLogEntry>> GetByHlcOrderAsync(
Task<IReadOnlyList<SchedulerLogEntity>> GetByHlcOrderAsync(
string tenantId,
string? partitionKey,
int limit,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets log entries within an HLC timestamp range.
/// Gets log entries within an HLC range.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="startTHlc">Start timestamp (inclusive, null for unbounded).</param>
/// <param name="endTHlc">End timestamp (inclusive, null for unbounded).</param>
/// <param name="limit">Maximum entries to return (0 for unlimited).</param>
/// <param name="partitionKey">Optional partition key (null for all partitions).</param>
/// <param name="startTHlc">Start HLC (inclusive, null for no lower bound).</param>
/// <param name="endTHlc">End HLC (inclusive, null for no upper bound).</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Log entries in the specified range.</returns>
Task<IReadOnlyList<SchedulerLogEntry>> GetByHlcRangeAsync(
Task<IReadOnlyList<SchedulerLogEntity>> GetByHlcRangeAsync(
string tenantId,
string? startTHlc,
string? endTHlc,
int limit = 0,
string? partitionKey = null,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets log entries after an HLC timestamp (cursor-based pagination).
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="afterTHlc">Start after this timestamp (exclusive).</param>
/// <param name="limit">Maximum entries to return.</param>
/// <param name="partitionKey">Optional partition key (null for all partitions).</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Log entries after the specified timestamp.</returns>
Task<IReadOnlyList<SchedulerLogEntry>> GetAfterHlcAsync(
string tenantId,
string afterTHlc,
int limit,
string? partitionKey = null,
CancellationToken cancellationToken = default);
/// <summary>
/// Counts log entries within an HLC timestamp range.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="startTHlc">Start timestamp (inclusive, null for unbounded).</param>
/// <param name="endTHlc">End timestamp (inclusive, null for unbounded).</param>
/// <param name="partitionKey">Optional partition key (null for all partitions).</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Count of entries in the range.</returns>
Task<int> CountByHlcRangeAsync(
string tenantId,
string? startTHlc,
string? endTHlc,
string? partitionKey = null,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets a log entry by job ID.
/// </summary>
/// <param name="jobId">Job identifier.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The log entry if found.</returns>
Task<SchedulerLogEntry?> GetByJobIdAsync(
Task<SchedulerLogEntity?> GetByJobIdAsync(
Guid jobId,
CancellationToken cancellationToken = default);
/// <summary>
/// Checks if a job ID already exists in the log.
/// Gets a log entry by its chain link hash.
/// </summary>
/// <param name="tenantId">Tenant identifier.</param>
/// <param name="jobId">Job identifier.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>True if the job exists.</returns>
Task<bool> ExistsAsync(
Task<SchedulerLogEntity?> GetByLinkAsync(
byte[] link,
CancellationToken cancellationToken = default);
/// <summary>
/// Counts entries in an HLC range.
/// </summary>
Task<int> CountByHlcRangeAsync(
string tenantId,
Guid jobId,
string? startTHlc,
string? endTHlc,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,270 @@
// -----------------------------------------------------------------------------
// SchedulerLogRepository.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-006 - PostgreSQL implementation for scheduler_log repository
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Infrastructure.Postgres.Repositories;
using StellaOps.Scheduler.Persistence.Postgres.Models;
namespace StellaOps.Scheduler.Persistence.Postgres.Repositories;
/// <summary>
/// PostgreSQL repository for HLC-ordered scheduler log operations.
/// The scheduler_log table is the authoritative, append-only record of enqueued jobs;
/// ordering is defined by the sortable HLC timestamp string (t_hlc), with seq_bigint
/// as a deterministic tie-breaker for entries that share the same timestamp.
/// </summary>
public sealed class SchedulerLogRepository : RepositoryBase<SchedulerDataSource>, ISchedulerLogRepository
{
    // Single column list keeps every SELECT/RETURNING aligned with MapSchedulerLogEntry.
    private const string Columns =
        "seq_bigint, tenant_id, t_hlc, partition_key, job_id, payload_hash, prev_link, link, created_at";

    private readonly IChainHeadRepository _chainHeadRepository;

    /// <summary>
    /// Creates a new scheduler log repository.
    /// </summary>
    /// <param name="dataSource">Tenant-aware PostgreSQL data source.</param>
    /// <param name="logger">Logger instance.</param>
    /// <param name="chainHeadRepository">Repository used to advance the per-partition chain head.</param>
    public SchedulerLogRepository(
        SchedulerDataSource dataSource,
        ILogger<SchedulerLogRepository> logger,
        IChainHeadRepository chainHeadRepository)
        : base(dataSource, logger)
    {
        _chainHeadRepository = chainHeadRepository ?? throw new ArgumentNullException(nameof(chainHeadRepository));
    }

    /// <inheritdoc />
    public async Task<SchedulerLogEntity> InsertWithChainUpdateAsync(
        SchedulerLogEntity entry,
        CancellationToken cancellationToken = default)
    {
        const string sql = $"""
            INSERT INTO scheduler.scheduler_log (
                tenant_id, t_hlc, partition_key, job_id, payload_hash, prev_link, link
            )
            VALUES (
                @tenant_id, @t_hlc, @partition_key, @job_id, @payload_hash, @prev_link, @link
            )
            RETURNING {Columns}
            """;

        await using var connection = await DataSource.OpenConnectionAsync(entry.TenantId, "writer", cancellationToken)
            .ConfigureAwait(false);

        // Transaction scopes the log insert; the chain-head update is meant to commit with it.
        await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            await using var command = CreateCommand(sql, connection);
            command.Transaction = transaction;

            AddParameter(command, "tenant_id", entry.TenantId);
            AddParameter(command, "t_hlc", entry.THlc);
            AddParameter(command, "partition_key", entry.PartitionKey);
            AddParameter(command, "job_id", entry.JobId);
            AddParameter(command, "payload_hash", entry.PayloadHash);
            AddParameter(command, "prev_link", entry.PrevLink);
            AddParameter(command, "link", entry.Link);

            await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
            await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
            var result = MapSchedulerLogEntry(reader);
            await reader.CloseAsync().ConfigureAwait(false);

            // NOTE(review): UpsertAsync is not handed this connection/transaction. If the
            // chain-head repository opens its own connection, the log insert and the head
            // update are NOT actually atomic despite the transaction above — confirm
            // IChainHeadRepository enlists in the ambient connection, or extend its API
            // to accept the open transaction.
            await _chainHeadRepository.UpsertAsync(
                entry.TenantId,
                entry.PartitionKey,
                entry.Link,
                entry.THlc,
                cancellationToken).ConfigureAwait(false);

            await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
            return result;
        }
        catch
        {
            await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
            throw;
        }
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<SchedulerLogEntity>> GetByHlcOrderAsync(
        string tenantId,
        string? partitionKey,
        int limit,
        CancellationToken cancellationToken = default)
    {
        // seq_bigint tie-breaker makes the order fully deterministic when two entries
        // carry the same HLC timestamp (possible across scheduler nodes).
        var sql = partitionKey is not null
            ? $"""
              SELECT {Columns}
              FROM scheduler.scheduler_log
              WHERE tenant_id = @tenant_id AND partition_key = @partition_key
              ORDER BY t_hlc ASC, seq_bigint ASC
              LIMIT @limit
              """
            : $"""
              SELECT {Columns}
              FROM scheduler.scheduler_log
              WHERE tenant_id = @tenant_id
              ORDER BY t_hlc ASC, seq_bigint ASC
              LIMIT @limit
              """;

        return await QueryAsync(
            tenantId,
            sql,
            cmd =>
            {
                AddParameter(cmd, "tenant_id", tenantId);
                if (partitionKey is not null)
                {
                    AddParameter(cmd, "partition_key", partitionKey);
                }

                AddParameter(cmd, "limit", limit);
            },
            MapSchedulerLogEntry,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<SchedulerLogEntity>> GetByHlcRangeAsync(
        string tenantId,
        string? startTHlc,
        string? endTHlc,
        CancellationToken cancellationToken = default)
    {
        var whereClause = BuildRangeWhereClause(startTHlc, endTHlc);
        var sql = $"""
            SELECT {Columns}
            FROM scheduler.scheduler_log
            {whereClause}
            ORDER BY t_hlc ASC, seq_bigint ASC
            """;

        return await QueryAsync(
            tenantId,
            sql,
            cmd =>
            {
                AddParameter(cmd, "tenant_id", tenantId);
                if (startTHlc is not null)
                {
                    AddParameter(cmd, "start_t_hlc", startTHlc);
                }

                if (endTHlc is not null)
                {
                    AddParameter(cmd, "end_t_hlc", endTHlc);
                }
            },
            MapSchedulerLogEntry,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<SchedulerLogEntity?> GetByJobIdAsync(
        Guid jobId,
        CancellationToken cancellationToken = default)
    {
        const string sql = $"""
            SELECT {Columns}
            FROM scheduler.scheduler_log
            WHERE job_id = @job_id
            """;

        // Job IDs are globally unique, so the lookup does not require tenant context.
        return await QuerySingleOrDefaultAsync(
            tenantId: null!,
            sql,
            cmd => AddParameter(cmd, "job_id", jobId),
            MapSchedulerLogEntry,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<SchedulerLogEntity?> GetByLinkAsync(
        byte[] link,
        CancellationToken cancellationToken = default)
    {
        const string sql = $"""
            SELECT {Columns}
            FROM scheduler.scheduler_log
            WHERE link = @link
            """;

        // Chain links are SHA-256 digests; collisions are not expected, so a single
        // row (or none) is the only possible result.
        return await QuerySingleOrDefaultAsync(
            tenantId: null!,
            sql,
            cmd => AddParameter(cmd, "link", link),
            MapSchedulerLogEntry,
            cancellationToken).ConfigureAwait(false);
    }

    /// <inheritdoc />
    public async Task<int> CountByHlcRangeAsync(
        string tenantId,
        string? startTHlc,
        string? endTHlc,
        CancellationToken cancellationToken = default)
    {
        var whereClause = BuildRangeWhereClause(startTHlc, endTHlc);
        var sql = $"""
            SELECT COUNT(*)::INT
            FROM scheduler.scheduler_log
            {whereClause}
            """;

        await using var connection = await DataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken)
            .ConfigureAwait(false);
        await using var command = CreateCommand(sql, connection);

        AddParameter(command, "tenant_id", tenantId);
        if (startTHlc is not null)
        {
            AddParameter(command, "start_t_hlc", startTHlc);
        }

        if (endTHlc is not null)
        {
            AddParameter(command, "end_t_hlc", endTHlc);
        }

        // COUNT(*)::INT guarantees an int scalar; anything else maps to zero.
        var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
        return result is int count ? count : 0;
    }

    /// <summary>
    /// Builds the WHERE clause for an optional inclusive HLC range. Parameter names
    /// (@start_t_hlc / @end_t_hlc) must match what the callers bind.
    /// </summary>
    private static string BuildRangeWhereClause(string? startTHlc, string? endTHlc)
    {
        var whereClause = "WHERE tenant_id = @tenant_id";
        if (startTHlc is not null)
        {
            whereClause += " AND t_hlc >= @start_t_hlc";
        }

        if (endTHlc is not null)
        {
            whereClause += " AND t_hlc <= @end_t_hlc";
        }

        return whereClause;
    }

    /// <summary>
    /// Maps the current reader row (selected via <see cref="Columns"/>) to an entity.
    /// </summary>
    private static SchedulerLogEntity MapSchedulerLogEntry(NpgsqlDataReader reader)
    {
        return new SchedulerLogEntity
        {
            SeqBigint = reader.GetInt64(reader.GetOrdinal("seq_bigint")),
            TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
            THlc = reader.GetString(reader.GetOrdinal("t_hlc")),
            PartitionKey = reader.GetString(reader.GetOrdinal("partition_key")),
            JobId = reader.GetGuid(reader.GetOrdinal("job_id")),
            PayloadHash = reader.GetFieldValue<byte[]>(reader.GetOrdinal("payload_hash")),
            PrevLink = reader.IsDBNull(reader.GetOrdinal("prev_link"))
                ? null
                : reader.GetFieldValue<byte[]>(reader.GetOrdinal("prev_link")),
            Link = reader.GetFieldValue<byte[]>(reader.GetOrdinal("link")),
            CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at"))
        };
    }
}

View File

@@ -0,0 +1,160 @@
// -----------------------------------------------------------------------------
// SchedulerChainLinking.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-008 - Implement SchedulerChainLinking static class
// -----------------------------------------------------------------------------
using System.Security.Cryptography;
using System.Text;
using StellaOps.HybridLogicalClock;
namespace StellaOps.Scheduler.Persistence.Postgres;
/// <summary>
/// Static utility class for computing chain links in the scheduler queue.
/// Chain links provide tamper-evident sequence proofs per the advisory specification:
/// link_i = SHA-256(link_{i-1} || job_id || t_hlc || payload_hash).
/// </summary>
public static class SchedulerChainLinking
{
    /// <summary>
    /// Number of bytes in a chain link (SHA-256 = 32 bytes).
    /// </summary>
    public const int LinkSizeBytes = 32;

    // Shared all-zero sentinel used as the "previous link" for a chain's first entry.
    // Never mutated: IncrementalHash.AppendData only reads its argument.
    private static readonly byte[] ZeroPrevLink = new byte[LinkSizeBytes];

    /// <summary>
    /// Compute chain link per advisory specification:
    /// link_i = Hash(link_{i-1} || job_id || t_hlc || payload_hash)
    /// </summary>
    /// <param name="prevLink">Previous chain link, or null for first entry (uses 32 zero bytes).</param>
    /// <param name="jobId">Job identifier.</param>
    /// <param name="tHlc">HLC timestamp.</param>
    /// <param name="payloadHash">SHA-256 hash of canonical payload.</param>
    /// <returns>New chain link (32 bytes).</returns>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="payloadHash"/>, or a non-null <paramref name="prevLink"/>,
    /// is not exactly <see cref="LinkSizeBytes"/> bytes.
    /// </exception>
    public static byte[] ComputeLink(
        byte[]? prevLink,
        Guid jobId,
        HlcTimestamp tHlc,
        byte[] payloadHash)
    {
        ArgumentNullException.ThrowIfNull(payloadHash);
        if (payloadHash.Length != LinkSizeBytes)
        {
            throw new ArgumentException($"Payload hash must be {LinkSizeBytes} bytes", nameof(payloadHash));
        }

        // Robustness fix: a wrong-length previous link would otherwise be hashed
        // silently, producing a link that can never be re-verified against the chain.
        if (prevLink is not null && prevLink.Length != LinkSizeBytes)
        {
            throw new ArgumentException($"Previous link must be {LinkSizeBytes} bytes", nameof(prevLink));
        }

        using var hasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);

        // Previous link (or 32 zero bytes for the genesis entry).
        hasher.AppendData(prevLink ?? ZeroPrevLink);

        // Job ID using the standard .NET Guid byte layout (stable across platforms).
        hasher.AppendData(jobId.ToByteArray());

        // HLC timestamp as UTF-8 bytes of its sortable string form.
        hasher.AppendData(Encoding.UTF8.GetBytes(tHlc.ToSortableString()));

        // Payload hash.
        hasher.AppendData(payloadHash);

        return hasher.GetHashAndReset();
    }

    /// <summary>
    /// Compute chain link from a string HLC timestamp.
    /// </summary>
    /// <param name="prevLink">Previous chain link, or null for first entry.</param>
    /// <param name="jobId">Job identifier.</param>
    /// <param name="tHlcString">Sortable HLC timestamp string; must parse as an HLC value.</param>
    /// <param name="payloadHash">SHA-256 hash of canonical payload.</param>
    /// <returns>New chain link (32 bytes).</returns>
    public static byte[] ComputeLink(
        byte[]? prevLink,
        Guid jobId,
        string tHlcString,
        byte[] payloadHash)
    {
        var tHlc = HlcTimestamp.Parse(tHlcString);
        return ComputeLink(prevLink, jobId, tHlc, payloadHash);
    }

    /// <summary>
    /// Compute deterministic payload hash from canonical JSON.
    /// </summary>
    /// <param name="canonicalJson">RFC 8785 canonical JSON representation of payload.</param>
    /// <returns>SHA-256 hash (32 bytes).</returns>
    public static byte[] ComputePayloadHash(string canonicalJson)
    {
        ArgumentException.ThrowIfNullOrEmpty(canonicalJson);
        return SHA256.HashData(Encoding.UTF8.GetBytes(canonicalJson));
    }

    /// <summary>
    /// Compute deterministic payload hash from raw bytes.
    /// </summary>
    /// <param name="payload">Payload bytes.</param>
    /// <returns>SHA-256 hash (32 bytes).</returns>
    public static byte[] ComputePayloadHash(byte[] payload)
    {
        ArgumentNullException.ThrowIfNull(payload);
        return SHA256.HashData(payload);
    }

    /// <summary>
    /// Verify that a chain link is correctly computed.
    /// Comparison is constant-time to avoid leaking how many leading bytes matched.
    /// </summary>
    /// <param name="expectedLink">The stored link to verify.</param>
    /// <param name="prevLink">Previous chain link.</param>
    /// <param name="jobId">Job identifier.</param>
    /// <param name="tHlc">HLC timestamp.</param>
    /// <param name="payloadHash">Payload hash.</param>
    /// <returns>True if the link is valid.</returns>
    public static bool VerifyLink(
        byte[] expectedLink,
        byte[]? prevLink,
        Guid jobId,
        HlcTimestamp tHlc,
        byte[] payloadHash)
    {
        ArgumentNullException.ThrowIfNull(expectedLink);
        if (expectedLink.Length != LinkSizeBytes)
        {
            return false;
        }

        var computed = ComputeLink(prevLink, jobId, tHlc, payloadHash);
        return CryptographicOperations.FixedTimeEquals(expectedLink, computed);
    }

    /// <summary>
    /// Verify that a chain link is correctly computed (string HLC version).
    /// Returns false (instead of throwing) when the timestamp string does not parse.
    /// </summary>
    public static bool VerifyLink(
        byte[] expectedLink,
        byte[]? prevLink,
        Guid jobId,
        string tHlcString,
        byte[] payloadHash)
    {
        if (!HlcTimestamp.TryParse(tHlcString, out var tHlc))
        {
            return false;
        }

        return VerifyLink(expectedLink, prevLink, jobId, tHlc, payloadHash);
    }

    /// <summary>
    /// Create the genesis link (first link in a chain).
    /// Uses 32 zero bytes as the previous link.
    /// </summary>
    public static byte[] ComputeGenesisLink(
        Guid jobId,
        HlcTimestamp tHlc,
        byte[] payloadHash)
    {
        return ComputeLink(null, jobId, tHlc, payloadHash);
    }

    /// <summary>
    /// Formats a link as a lowercase hexadecimal string for display/logging.
    /// </summary>
    public static string ToHexString(byte[]? link)
    {
        if (link is null) return "(null)";
        return Convert.ToHexString(link).ToLowerInvariant();
    }
}

View File

@@ -28,7 +28,6 @@
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Infrastructure.Postgres\StellaOps.Infrastructure.Postgres.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Infrastructure.EfCore\StellaOps.Infrastructure.EfCore.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.HybridLogicalClock\StellaOps.HybridLogicalClock.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Canonical.Json\StellaOps.Canonical.Json.csproj" />
</ItemGroup>
<!-- Embed SQL migrations as resources -->

View File

@@ -0,0 +1,250 @@
// -----------------------------------------------------------------------------
// HlcJobRepositoryDecorator.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-019 - Update existing JobRepository to use HLC ordering optionally
// -----------------------------------------------------------------------------
using System.Buffers.Binary;
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Determinism;
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Persistence.Postgres;
using StellaOps.Scheduler.Persistence.Postgres.Models;
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
using StellaOps.Scheduler.Queue.Options;
namespace StellaOps.Scheduler.Queue.Decorators;
/// <summary>
/// Decorator for IJobRepository that adds HLC ordering and chain linking.
/// </summary>
/// <remarks>
/// This decorator implements the dual-write migration pattern:
/// - When EnableDualWrite=true: writes to both scheduler.jobs AND scheduler.scheduler_log
/// - When EnableHlcOrdering=true: uses HLC ordering from scheduler_log for dequeue
///
/// Migration phases:
///   Phase 1: DualWrite=true,  HlcOrdering=false (write both, read legacy)
///   Phase 2: DualWrite=true,  HlcOrdering=true  (write both, read HLC)
///   Phase 3: DualWrite=false, HlcOrdering=true  (write/read HLC only)
/// </remarks>
public sealed class HlcJobRepositoryDecorator : IJobRepository
{
    private readonly IJobRepository _inner;
    private readonly ISchedulerLogRepository _logRepository;
    private readonly IChainHeadRepository _chainHeadRepository;
    private readonly IHybridLogicalClock _hlc;
    private readonly IGuidProvider _guidProvider;
    private readonly HlcSchedulerOptions _options;
    private readonly ILogger<HlcJobRepositoryDecorator> _logger;

    /// <summary>
    /// Creates the decorator. All dependencies are required.
    /// </summary>
    public HlcJobRepositoryDecorator(
        IJobRepository inner,
        ISchedulerLogRepository logRepository,
        IChainHeadRepository chainHeadRepository,
        IHybridLogicalClock hlc,
        IGuidProvider guidProvider,
        IOptions<HlcSchedulerOptions> options,
        ILogger<HlcJobRepositoryDecorator> logger)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        _logRepository = logRepository ?? throw new ArgumentNullException(nameof(logRepository));
        _chainHeadRepository = chainHeadRepository ?? throw new ArgumentNullException(nameof(chainHeadRepository));
        _hlc = hlc ?? throw new ArgumentNullException(nameof(hlc));
        _guidProvider = guidProvider ?? throw new ArgumentNullException(nameof(guidProvider));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    public async Task<JobEntity> CreateAsync(JobEntity job, CancellationToken cancellationToken = default)
    {
        // Always create in the legacy table first; it remains the system of record
        // until Phase 3 of the migration.
        var created = await _inner.CreateAsync(job, cancellationToken);

        // Dual-write to scheduler_log if enabled. A failure here is logged but does
        // not fail the enqueue: the legacy write already succeeded.
        if (_options.EnableDualWrite)
        {
            try
            {
                await WriteToSchedulerLogAsync(created, cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(
                    ex,
                    "Failed to dual-write job {JobId} to scheduler_log for tenant {TenantId}",
                    created.Id,
                    created.TenantId);
                // Don't fail the operation - legacy write succeeded
            }
        }

        return created;
    }

    /// <inheritdoc />
    public Task<JobEntity?> GetByIdAsync(string tenantId, Guid id, CancellationToken cancellationToken = default)
        => _inner.GetByIdAsync(tenantId, id, cancellationToken);

    /// <inheritdoc />
    public Task<JobEntity?> GetByIdempotencyKeyAsync(string tenantId, string idempotencyKey, CancellationToken cancellationToken = default)
        => _inner.GetByIdempotencyKeyAsync(tenantId, idempotencyKey, cancellationToken);

    /// <inheritdoc />
    public async Task<IReadOnlyList<JobEntity>> GetScheduledJobsAsync(
        string tenantId,
        string[] jobTypes,
        int limit = 10,
        CancellationToken cancellationToken = default)
    {
        // If HLC ordering is enabled, the authoritative order comes from scheduler_log.
        if (_options.EnableHlcOrdering)
        {
            return await GetScheduledJobsByHlcAsync(tenantId, jobTypes, limit, cancellationToken);
        }

        return await _inner.GetScheduledJobsAsync(tenantId, jobTypes, limit, cancellationToken);
    }

    /// <inheritdoc />
    public Task<JobEntity?> TryLeaseJobAsync(
        string tenantId,
        Guid jobId,
        string workerId,
        TimeSpan leaseDuration,
        CancellationToken cancellationToken = default)
        => _inner.TryLeaseJobAsync(tenantId, jobId, workerId, leaseDuration, cancellationToken);

    /// <inheritdoc />
    public Task<bool> ExtendLeaseAsync(
        string tenantId,
        Guid jobId,
        Guid leaseId,
        TimeSpan extension,
        CancellationToken cancellationToken = default)
        => _inner.ExtendLeaseAsync(tenantId, jobId, leaseId, extension, cancellationToken);

    /// <inheritdoc />
    public Task<bool> CompleteAsync(
        string tenantId,
        Guid jobId,
        Guid leaseId,
        string? result = null,
        CancellationToken cancellationToken = default)
        => _inner.CompleteAsync(tenantId, jobId, leaseId, result, cancellationToken);

    /// <inheritdoc />
    public Task<bool> FailAsync(
        string tenantId,
        Guid jobId,
        Guid leaseId,
        string reason,
        bool retry = true,
        CancellationToken cancellationToken = default)
        => _inner.FailAsync(tenantId, jobId, leaseId, reason, retry, cancellationToken);

    /// <inheritdoc />
    public Task<bool> CancelAsync(
        string tenantId,
        Guid jobId,
        string reason,
        CancellationToken cancellationToken = default)
        => _inner.CancelAsync(tenantId, jobId, reason, cancellationToken);

    /// <inheritdoc />
    public Task<int> RecoverExpiredLeasesAsync(
        string tenantId,
        CancellationToken cancellationToken = default)
        => _inner.RecoverExpiredLeasesAsync(tenantId, cancellationToken);

    /// <inheritdoc />
    public Task<IReadOnlyList<JobEntity>> GetByStatusAsync(
        string tenantId,
        JobStatus status,
        int limit = 100,
        int offset = 0,
        CancellationToken cancellationToken = default)
        => _inner.GetByStatusAsync(tenantId, status, limit, offset, cancellationToken);

    /// <summary>
    /// Computes the HLC timestamp, payload hash, and chain link for a job and
    /// appends it to scheduler_log (which also advances the chain head).
    /// </summary>
    private async Task WriteToSchedulerLogAsync(JobEntity job, CancellationToken ct)
    {
        // 1. Get HLC timestamp
        var tHlc = _hlc.Tick();

        // 2. Compute payload hash
        var payloadHash = ComputePayloadHash(job);

        // 3. Get previous chain link
        // NOTE(review): reading the chain head and inserting the new link are two
        // separate round-trips, so two concurrent enqueues on the same partition can
        // read the same prev_link and fork the chain. Confirm a DB-side guard
        // (e.g. scheduler.upsert_chain_head monotonicity / a unique constraint)
        // rejects one writer, or serialize enqueues per partition.
        var partitionKey = _options.DefaultPartitionKey;
        var prevLink = await _chainHeadRepository.GetLastLinkAsync(job.TenantId, partitionKey, ct);

        // 4. Compute chain link
        var link = SchedulerChainLinking.ComputeLink(prevLink, job.Id, tHlc, payloadHash);

        // 5. Create log entry (InsertWithChainUpdateAsync updates chain head atomically)
        var entry = new SchedulerLogEntity
        {
            TenantId = job.TenantId,
            THlc = tHlc.ToSortableString(),
            PartitionKey = partitionKey,
            JobId = job.Id,
            PayloadHash = payloadHash,
            PrevLink = prevLink,
            Link = link,
            CreatedAt = DateTimeOffset.UtcNow
        };

        // 6. Insert with chain update (atomically inserts entry AND updates chain head)
        await _logRepository.InsertWithChainUpdateAsync(entry, ct);

        _logger.LogDebug(
            "Dual-wrote job {JobId} to scheduler_log with HLC {THlc} and link {Link}",
            job.Id,
            tHlc.ToSortableString(),
            Convert.ToHexString(link).ToLowerInvariant());
    }

    /// <summary>
    /// Dequeues jobs in HLC order: reads job IDs from scheduler_log, then hydrates
    /// and filters the full entities from the legacy table.
    /// </summary>
    private async Task<IReadOnlyList<JobEntity>> GetScheduledJobsByHlcAsync(
        string tenantId,
        string[] jobTypes,
        int limit,
        CancellationToken ct)
    {
        // TODO(review): entries are limited BEFORE filtering by status/job type, so a
        // page can come back short (or empty) while eligible jobs exist deeper in the
        // log. Acceptable for polling dequeuers that retry; otherwise page through
        // with a cursor until `limit` matches are collected.
        var logEntries = await _logRepository.GetByHlcOrderAsync(tenantId, null, limit, ct);
        if (logEntries.Count == 0)
        {
            return Array.Empty<JobEntity>();
        }

        // Fetch full job entities from the legacy table, preserving HLC order.
        var jobs = new List<JobEntity>();
        foreach (var entry in logEntries)
        {
            var job = await _inner.GetByIdAsync(tenantId, entry.JobId, ct);
            if (job is not null &&
                job.Status == JobStatus.Scheduled &&
                (jobTypes.Length == 0 || jobTypes.Contains(job.JobType)))
            {
                jobs.Add(job);
            }
        }

        return jobs;
    }

    /// <summary>
    /// Computes a deterministic SHA-256 digest over the fields that define the job's
    /// identity. Each field is length-prefixed so field boundaries are unambiguous:
    /// without prefixes, e.g. ("ab","c") and ("a","bc") would hash identically.
    /// NOTE: this digest differs from a plain concatenation scheme; any external
    /// tooling that recomputes payload hashes from job fields must use the same
    /// prefixed encoding.
    /// </summary>
    private static byte[] ComputePayloadHash(JobEntity job)
    {
        using var hasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);
        AppendLengthPrefixed(hasher, job.TenantId);
        AppendLengthPrefixed(hasher, job.JobType);
        AppendLengthPrefixed(hasher, job.IdempotencyKey ?? "");
        AppendLengthPrefixed(hasher, job.Payload ?? "");
        return hasher.GetHashAndReset();
    }

    // Appends a 4-byte little-endian UTF-8 byte count followed by the bytes.
    // Fixed endianness keeps the digest identical across architectures.
    private static void AppendLengthPrefixed(IncrementalHash hasher, string value)
    {
        var bytes = Encoding.UTF8.GetBytes(value);
        Span<byte> lengthPrefix = stackalloc byte[sizeof(int)];
        BinaryPrimitives.WriteInt32LittleEndian(lengthPrefix, bytes.Length);
        hasher.AppendData(lengthPrefix);
        hasher.AppendData(bytes);
    }
}

View File

@@ -0,0 +1,163 @@
# HLC Scheduler Queue Migration Guide
This guide explains how to enable Hybrid Logical Clock (HLC) ordering on existing Scheduler deployments.
## Overview
The HLC scheduler queue adds:
- Deterministic, monotonic job ordering via HLC timestamps
- Cryptographic chain proofs for audit/compliance
- Batch snapshots for checkpoint anchoring
## Prerequisites
Before enabling HLC ordering, ensure:
1. **Database migrations applied:**
- `scheduler.scheduler_log` table
- `scheduler.chain_heads` table
- `scheduler.batch_snapshot` table
- `scheduler.upsert_chain_head` function
2. **HLC library configured:**
- `StellaOps.HybridLogicalClock` package referenced
- `IHybridLogicalClock` registered in DI
3. **Feature flag options defined:**
- `HlcSchedulerOptions` section in configuration
## Migration Phases
### Phase 1: Dual-Write (Write both, Read legacy)
Configure:
```json
{
"Scheduler": {
"HlcOrdering": {
"EnableHlcOrdering": false,
"EnableDualWrite": true,
"NodeId": "scheduler-instance-01"
}
}
}
```
In this phase:
- Jobs are written to both `scheduler.jobs` AND `scheduler.scheduler_log`
- Reads/dequeue still use legacy ordering (`priority DESC, created_at`)
- Chain links are computed and stored for all new jobs
**Validation:**
- Verify `scheduler.scheduler_log` is being populated
- Run chain verification to confirm integrity
- Monitor for any performance impact
### Phase 2: Dual-Write (Write both, Read HLC)
Configure:
```json
{
"Scheduler": {
"HlcOrdering": {
"EnableHlcOrdering": true,
"EnableDualWrite": true,
"NodeId": "scheduler-instance-01",
"VerifyChainOnDequeue": true
}
}
}
```
In this phase:
- Jobs are written to both tables
- Reads/dequeue now use HLC ordering from `scheduler.scheduler_log`
- Chain verification is enabled for additional safety
**Validation:**
- Verify job processing order matches HLC timestamps
- Compare dequeue behavior between legacy and HLC
- Monitor chain verification metrics
### Phase 3: HLC Only
Configure:
```json
{
"Scheduler": {
"HlcOrdering": {
"EnableHlcOrdering": true,
"EnableDualWrite": false,
"NodeId": "scheduler-instance-01",
"VerifyChainOnDequeue": false
}
}
}
```
In this phase:
- Jobs are written only to `scheduler.scheduler_log`
- Legacy `scheduler.jobs` table is no longer used for new jobs
- Chain verification can be disabled for performance (optional)
## Configuration Reference
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `EnableHlcOrdering` | bool | false | Use HLC-based ordering for dequeue |
| `EnableDualWrite` | bool | false | Write to both legacy and HLC tables |
| `NodeId` | string | machine name | Unique ID for this scheduler instance |
| `VerifyChainOnDequeue` | bool | false | Verify chain integrity on each dequeue |
| `SignBatchSnapshots` | bool | false | Sign snapshots with DSSE |
| `DefaultPartitionKey` | string | "" | Default partition for unpartitioned jobs |
| `BatchSnapshotIntervalSeconds` | int | 0 | Auto-snapshot interval (0 = disabled) |
| `MaxClockSkewMs` | int | 1000 | Maximum tolerated clock skew |
## DI Registration
Register HLC scheduler services:
```csharp
services.AddHlcSchedulerQueue();
services.AddOptions<HlcSchedulerOptions>()
.Bind(configuration.GetSection(HlcSchedulerOptions.SectionName))
.ValidateDataAnnotations()
.ValidateOnStart();
```
## Rollback Procedure
If issues arise during migration:
1. **Phase 2 -> Phase 1:**
Set `EnableHlcOrdering: false` while keeping `EnableDualWrite: true`
2. **Phase 3 -> Phase 2:**
Set `EnableDualWrite: true` to resume writing to legacy table
3. **Full rollback:**
Set both `EnableHlcOrdering: false` and `EnableDualWrite: false`
## Monitoring
Key metrics to watch:
- `scheduler_hlc_enqueues_total` - Total HLC enqueue operations
- `scheduler_chain_verifications_total` - Chain verification operations
- `scheduler_chain_verification_failures_total` - Failed verifications
- `scheduler_batch_snapshots_total` - Batch snapshot operations
## Troubleshooting
### Chain verification failures
- Check for out-of-order inserts
- Verify `chain_heads` table consistency
- Check for concurrent enqueue race conditions
### Clock skew errors
- Increase `MaxClockSkewMs` if nodes have drift
- Consider NTP synchronization improvements
### Performance degradation
- Disable `VerifyChainOnDequeue` if overhead is high
- Increase `BatchSnapshotIntervalSeconds` to snapshot less often (or set it to 0 to disable automatic snapshots)
- Review index usage on `scheduler_log.t_hlc`

View File

@@ -0,0 +1,207 @@
// -----------------------------------------------------------------------------
// HlcSchedulerMetrics.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-022 - Metrics: scheduler_hlc_enqueues_total, scheduler_chain_verifications_total
// -----------------------------------------------------------------------------
using System.Collections.Generic;
using System.Diagnostics.Metrics;
namespace StellaOps.Scheduler.Queue.Metrics;
/// <summary>
/// Metrics for HLC scheduler queue operations: enqueue/dequeue counters,
/// chain verification outcomes, batch snapshot counts, and latency histograms.
/// All instruments are created on the <see cref="MeterName"/> meter.
/// </summary>
public sealed class HlcSchedulerMetrics : IDisposable
{
    /// <summary>
    /// Meter name for HLC scheduler metrics.
    /// </summary>
    public const string MeterName = "StellaOps.Scheduler.HlcQueue";

    private readonly Meter _meter;
    private readonly Counter<long> _enqueuesTotal;
    private readonly Counter<long> _enqueuesDuplicatesTotal;
    // Renamed from "_dequeueTot" so every counter field follows the "...Total" convention
    // used by the sibling fields above and below.
    private readonly Counter<long> _dequeuesTotal;
    private readonly Counter<long> _chainVerificationsTotal;
    private readonly Counter<long> _chainVerificationFailuresTotal;
    private readonly Counter<long> _batchSnapshotsTotal;
    private readonly Histogram<double> _enqueueLatencyMs;
    private readonly Histogram<double> _chainLinkComputeLatencyMs;
    private readonly Histogram<double> _verificationLatencyMs;

    /// <summary>
    /// Creates a new HLC scheduler metrics instance.
    /// </summary>
    /// <param name="meterFactory">
    /// Optional meter factory (DI-friendly). When null, a standalone <see cref="Meter"/>
    /// is created and owned by this instance.
    /// </param>
    public HlcSchedulerMetrics(IMeterFactory? meterFactory = null)
    {
        _meter = meterFactory?.Create(MeterName) ?? new Meter(MeterName);

        _enqueuesTotal = _meter.CreateCounter<long>(
            "scheduler_hlc_enqueues_total",
            unit: "{enqueue}",
            description: "Total number of HLC-ordered enqueue operations");

        _enqueuesDuplicatesTotal = _meter.CreateCounter<long>(
            "scheduler_hlc_enqueues_duplicates_total",
            unit: "{duplicate}",
            description: "Total number of duplicate enqueue attempts (idempotency hits)");

        _dequeuesTotal = _meter.CreateCounter<long>(
            "scheduler_hlc_dequeues_total",
            unit: "{dequeue}",
            description: "Total number of HLC-ordered dequeue operations");

        _chainVerificationsTotal = _meter.CreateCounter<long>(
            "scheduler_chain_verifications_total",
            unit: "{verification}",
            description: "Total number of chain verification operations");

        _chainVerificationFailuresTotal = _meter.CreateCounter<long>(
            "scheduler_chain_verification_failures_total",
            unit: "{failure}",
            description: "Total number of chain verification failures");

        _batchSnapshotsTotal = _meter.CreateCounter<long>(
            "scheduler_batch_snapshots_total",
            unit: "{snapshot}",
            description: "Total number of batch snapshots created");

        _enqueueLatencyMs = _meter.CreateHistogram<double>(
            "scheduler_hlc_enqueue_latency_ms",
            unit: "ms",
            description: "Latency of HLC enqueue operations in milliseconds");

        _chainLinkComputeLatencyMs = _meter.CreateHistogram<double>(
            "scheduler_chain_link_compute_latency_ms",
            unit: "ms",
            description: "Latency of chain link computation in milliseconds");

        _verificationLatencyMs = _meter.CreateHistogram<double>(
            "scheduler_chain_verification_latency_ms",
            unit: "ms",
            description: "Latency of chain verification operations in milliseconds");
    }

    /// <summary>
    /// Records an enqueue operation (count + latency, tagged by tenant and job type).
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="jobType">Type of job being enqueued.</param>
    /// <param name="latencyMs">Operation latency in milliseconds.</param>
    public void RecordEnqueue(string tenantId, string jobType, double latencyMs)
    {
        var tags = new KeyValuePair<string, object?>[]
        {
            new("tenant_id", tenantId),
            new("job_type", jobType)
        };
        _enqueuesTotal.Add(1, tags);
        _enqueueLatencyMs.Record(latencyMs, tags);
    }

    /// <summary>
    /// Records a duplicate enqueue attempt (idempotency hit).
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    public void RecordDuplicateEnqueue(string tenantId)
    {
        _enqueuesDuplicatesTotal.Add(1, new KeyValuePair<string, object?>("tenant_id", tenantId));
    }

    /// <summary>
    /// Records a dequeue operation.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="count">Number of jobs dequeued; added to the counter as-is.</param>
    public void RecordDequeue(string tenantId, int count)
    {
        _dequeuesTotal.Add(count, new KeyValuePair<string, object?>("tenant_id", tenantId));
    }

    /// <summary>
    /// Records a chain verification operation. Failures increment both the
    /// verification counter (with result=failure) and the dedicated failure counter.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="success">Whether verification succeeded.</param>
    /// <param name="entriesChecked">Number of entries verified (not currently emitted as a tag).</param>
    /// <param name="latencyMs">Operation latency in milliseconds.</param>
    public void RecordChainVerification(string tenantId, bool success, int entriesChecked, double latencyMs)
    {
        var tags = new KeyValuePair<string, object?>[]
        {
            new("tenant_id", tenantId),
            new("result", success ? "success" : "failure")
        };
        _chainVerificationsTotal.Add(1, tags);
        _verificationLatencyMs.Record(latencyMs, tags);

        if (!success)
        {
            _chainVerificationFailuresTotal.Add(1, new KeyValuePair<string, object?>("tenant_id", tenantId));
        }
    }

    /// <summary>
    /// Records a batch snapshot creation.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="jobCount">
    /// Number of jobs in the snapshot. Intentionally not emitted as a tag
    /// (unbounded tag cardinality); kept in the signature for callers and future use.
    /// </param>
    /// <param name="signed">Whether the snapshot was signed.</param>
    public void RecordBatchSnapshot(string tenantId, int jobCount, bool signed)
    {
        _batchSnapshotsTotal.Add(1,
            new KeyValuePair<string, object?>("tenant_id", tenantId),
            new KeyValuePair<string, object?>("signed", signed.ToString().ToLowerInvariant()));
    }

    /// <summary>
    /// Records chain link computation latency.
    /// </summary>
    /// <param name="latencyMs">Computation latency in milliseconds.</param>
    public void RecordChainLinkCompute(double latencyMs)
    {
        _chainLinkComputeLatencyMs.Record(latencyMs);
    }

    /// <inheritdoc />
    public void Dispose()
    {
        _meter.Dispose();
    }
}
/// <summary>
/// Static metric names for reference and configuration (e.g. dashboards,
/// alert rules, OTLP views). These mirror the instrument names created by
/// <see cref="HlcSchedulerMetrics"/> and must stay in sync with them.
/// </summary>
public static class HlcSchedulerMetricNames
{
    /// <summary>Total HLC enqueues.</summary>
    public const string EnqueuesTotal = "scheduler_hlc_enqueues_total";

    /// <summary>Total duplicate enqueue attempts (idempotency hits).</summary>
    public const string EnqueuesDuplicatesTotal = "scheduler_hlc_enqueues_duplicates_total";

    /// <summary>Total HLC dequeues.</summary>
    public const string DequeuesTotal = "scheduler_hlc_dequeues_total";

    /// <summary>Total chain verifications.</summary>
    public const string ChainVerificationsTotal = "scheduler_chain_verifications_total";

    /// <summary>Total chain verification failures.</summary>
    public const string ChainVerificationFailuresTotal = "scheduler_chain_verification_failures_total";

    /// <summary>Total batch snapshots created.</summary>
    public const string BatchSnapshotsTotal = "scheduler_batch_snapshots_total";

    /// <summary>Enqueue latency histogram (milliseconds).</summary>
    public const string EnqueueLatencyMs = "scheduler_hlc_enqueue_latency_ms";

    /// <summary>Chain link computation latency histogram (milliseconds).</summary>
    public const string ChainLinkComputeLatencyMs = "scheduler_chain_link_compute_latency_ms";

    /// <summary>Chain verification latency histogram (milliseconds).</summary>
    public const string VerificationLatencyMs = "scheduler_chain_verification_latency_ms";
}

View File

@@ -0,0 +1,65 @@
// -----------------------------------------------------------------------------
// BatchSnapshotResult.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-013 - Implement BatchSnapshotService
// -----------------------------------------------------------------------------
using StellaOps.HybridLogicalClock;
namespace StellaOps.Scheduler.Queue.Models;
/// <summary>
/// Result of creating a batch snapshot over a contiguous HLC range of the
/// scheduler log. Immutable DTO; the snapshot may optionally carry a DSSE signature.
/// </summary>
public sealed record BatchSnapshotResult
{
    /// <summary>
    /// Unique batch snapshot identifier.
    /// </summary>
    public required Guid BatchId { get; init; }

    /// <summary>
    /// Tenant this snapshot belongs to.
    /// </summary>
    public required string TenantId { get; init; }

    /// <summary>
    /// Start of the HLC range (inclusive).
    /// </summary>
    public required HlcTimestamp RangeStart { get; init; }

    /// <summary>
    /// End of the HLC range (inclusive).
    /// </summary>
    public required HlcTimestamp RangeEnd { get; init; }

    /// <summary>
    /// Chain head link at the end of this range (link of the last job in range).
    /// </summary>
    public required byte[] HeadLink { get; init; }

    /// <summary>
    /// Number of jobs included in this snapshot.
    /// </summary>
    public required int JobCount { get; init; }

    /// <summary>
    /// When the snapshot was created (wall clock; not used for ordering).
    /// </summary>
    public required DateTimeOffset CreatedAt { get; init; }

    /// <summary>
    /// Key ID of the signer; null when the snapshot is unsigned.
    /// </summary>
    public string? SignedBy { get; init; }

    /// <summary>
    /// DSSE signature bytes; null when the snapshot is unsigned.
    /// </summary>
    public byte[]? Signature { get; init; }

    /// <summary>
    /// True only when both <see cref="SignedBy"/> and <see cref="Signature"/> are present.
    /// </summary>
    public bool IsSigned => SignedBy is not null && Signature is not null;
}

View File

@@ -0,0 +1,125 @@
// -----------------------------------------------------------------------------
// ChainVerificationResult.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-015 - Implement chain verification
// -----------------------------------------------------------------------------
using StellaOps.Scheduler.Persistence.Postgres;
namespace StellaOps.Scheduler.Queue.Models;
/// <summary>
/// Result of chain verification over a range of scheduler log entries.
/// </summary>
public sealed record ChainVerificationResult
{
    /// <summary>
    /// Whether the chain is valid (no issues found).
    /// </summary>
    public required bool IsValid { get; init; }

    /// <summary>
    /// Number of entries checked.
    /// </summary>
    public required int EntriesChecked { get; init; }

    /// <summary>
    /// List of issues found during verification (empty when valid).
    /// </summary>
    public required IReadOnlyList<ChainVerificationIssue> Issues { get; init; }

    /// <summary>
    /// First valid entry's HLC timestamp (null if no entries).
    /// </summary>
    public string? FirstHlc { get; init; }

    /// <summary>
    /// Last valid entry's HLC timestamp (null if no entries).
    /// </summary>
    public string? LastHlc { get; init; }

    /// <summary>
    /// Head link after verification (null if no entries).
    /// </summary>
    public byte[]? HeadLink { get; init; }

    /// <summary>
    /// Get a human-readable summary of the verification result.
    /// </summary>
    public string GetSummary()
    {
        if (IsValid)
        {
            // HeadLink is documented as null when the verified range contained no
            // entries; guard it rather than passing null into ToHexString.
            var head = HeadLink is null ? "(none)" : SchedulerChainLinking.ToHexString(HeadLink);
            return $"Chain valid: {EntriesChecked} entries verified, range [{FirstHlc}, {LastHlc}], head {head}";
        }
        return $"Chain INVALID: {Issues.Count} issue(s) found in {EntriesChecked} entries";
    }
}
/// <summary>
/// Represents a single issue found during chain verification.
/// Immutable DTO collected into <see cref="ChainVerificationIssueType"/>-typed reports.
/// </summary>
public sealed record ChainVerificationIssue
{
    /// <summary>
    /// Job ID where the issue was found.
    /// </summary>
    public required Guid JobId { get; init; }

    /// <summary>
    /// HLC timestamp (sortable string form) of the problematic entry.
    /// </summary>
    public required string THlc { get; init; }

    /// <summary>
    /// Type of issue found.
    /// </summary>
    public required ChainVerificationIssueType IssueType { get; init; }

    /// <summary>
    /// Human-readable description of the issue.
    /// </summary>
    public required string Description { get; init; }

    /// <summary>
    /// Expected value (for comparison issues); null otherwise.
    /// </summary>
    public string? Expected { get; init; }

    /// <summary>
    /// Actual value found (for comparison issues); null otherwise.
    /// </summary>
    public string? Actual { get; init; }
}
/// <summary>
/// Types of chain verification issues. Each value corresponds to one invariant
/// of the link chain (link = Hash(prev_link || job_id || t_hlc || payload_hash)).
/// </summary>
public enum ChainVerificationIssueType
{
    /// <summary>
    /// The prev_link doesn't match the previous entry's link (broken chain continuity).
    /// </summary>
    PrevLinkMismatch,

    /// <summary>
    /// The stored link doesn't match the recomputed link (entry tampered or corrupted).
    /// </summary>
    LinkMismatch,

    /// <summary>
    /// The HLC timestamp is out of order relative to the preceding entry.
    /// </summary>
    HlcOrderViolation,

    /// <summary>
    /// The payload hash has invalid length.
    /// </summary>
    InvalidPayloadHash,

    /// <summary>
    /// The link has invalid length.
    /// </summary>
    InvalidLinkLength
}

View File

@@ -0,0 +1,65 @@
// -----------------------------------------------------------------------------
// SchedulerDequeueResult.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-010 - Implement HlcSchedulerDequeueService
// -----------------------------------------------------------------------------
using StellaOps.HybridLogicalClock;
namespace StellaOps.Scheduler.Queue.Models;
/// <summary>
/// Represents a dequeued job with its HLC ordering and chain proof.
/// Immutable DTO returned by the dequeue service.
/// </summary>
public sealed record SchedulerDequeueResult
{
    /// <summary>
    /// Job identifier.
    /// </summary>
    public required Guid JobId { get; init; }

    /// <summary>
    /// HLC timestamp that determines this job's position in the total order.
    /// </summary>
    public required HlcTimestamp Timestamp { get; init; }

    /// <summary>
    /// HLC timestamp as a sortable string (same value as <see cref="Timestamp"/>).
    /// </summary>
    public required string THlcString { get; init; }

    /// <summary>
    /// Tenant this job belongs to.
    /// </summary>
    public required string TenantId { get; init; }

    /// <summary>
    /// Queue partition for this job (empty string = default partition).
    /// </summary>
    public string PartitionKey { get; init; } = string.Empty;

    /// <summary>
    /// Chain link proving sequence position.
    /// </summary>
    public required byte[] Link { get; init; }

    /// <summary>
    /// Previous chain link (null for first entry in a partition chain).
    /// </summary>
    public byte[]? PrevLink { get; init; }

    /// <summary>
    /// SHA-256 hash of the canonical payload.
    /// </summary>
    public required byte[] PayloadHash { get; init; }

    /// <summary>
    /// Database sequence number for reference only (storage order, not authoritative).
    /// </summary>
    public long SeqBigint { get; init; }

    /// <summary>
    /// Wall-clock creation time (not authoritative for ordering; use <see cref="Timestamp"/>).
    /// </summary>
    public DateTimeOffset CreatedAt { get; init; }
}

View File

@@ -0,0 +1,49 @@
// -----------------------------------------------------------------------------
// SchedulerEnqueueResult.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-009 - Implement HlcSchedulerEnqueueService
// -----------------------------------------------------------------------------
using StellaOps.HybridLogicalClock;
namespace StellaOps.Scheduler.Queue.Models;
/// <summary>
/// Result of an HLC-ordered enqueue operation.
/// Contains the assigned HLC timestamp, job ID, and chain link.
/// </summary>
public sealed record SchedulerEnqueueResult
{
    /// <summary>
    /// HLC timestamp assigned at enqueue time.
    /// This determines the job's position in the total order.
    /// </summary>
    public required HlcTimestamp Timestamp { get; init; }

    /// <summary>
    /// Deterministic job ID computed from the payload.
    /// </summary>
    public required Guid JobId { get; init; }

    /// <summary>
    /// Chain link (SHA-256 hash) proving sequence position:
    /// link = Hash(prev_link || job_id || t_hlc || payload_hash).
    /// </summary>
    public required byte[] Link { get; init; }

    /// <summary>
    /// SHA-256 hash of the canonical payload.
    /// </summary>
    public required byte[] PayloadHash { get; init; }

    /// <summary>
    /// Previous chain link (null for first entry in partition).
    /// </summary>
    public byte[]? PrevLink { get; init; }

    /// <summary>
    /// Whether this was a duplicate submission (idempotency hit).
    /// If true, the existing job's values are returned instead of creating a new entry.
    /// </summary>
    public bool IsDuplicate { get; init; }
}

View File

@@ -0,0 +1,68 @@
// -----------------------------------------------------------------------------
// SchedulerJobPayload.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-009 - Implement HlcSchedulerEnqueueService
// -----------------------------------------------------------------------------
using System.Collections.Immutable;
namespace StellaOps.Scheduler.Queue.Models;
/// <summary>
/// Represents a job payload for HLC-ordered scheduling.
/// This is the caller-supplied input to the enqueue operation.
/// </summary>
public sealed record SchedulerJobPayload
{
    /// <summary>
    /// Tenant this job belongs to.
    /// </summary>
    public required string TenantId { get; init; }

    /// <summary>
    /// Optional partition key for queue partitioning.
    /// Jobs with the same partition key form one hash chain; empty = default partition.
    /// </summary>
    public string PartitionKey { get; init; } = string.Empty;

    /// <summary>
    /// Type of job to execute (e.g., "PolicyRun", "GraphBuild").
    /// </summary>
    public required string JobType { get; init; }

    /// <summary>
    /// Job priority (higher = more important).
    /// </summary>
    public int Priority { get; init; }

    /// <summary>
    /// Idempotency key (unique per tenant).
    /// Used to deduplicate job submissions.
    /// </summary>
    public required string IdempotencyKey { get; init; }

    /// <summary>
    /// Correlation ID for distributed tracing.
    /// </summary>
    public string? CorrelationId { get; init; }

    /// <summary>
    /// Maximum number of retry attempts (default 3).
    /// </summary>
    public int MaxAttempts { get; init; } = 3;

    /// <summary>
    /// Optional delay before the job becomes available for dequeue.
    /// </summary>
    public DateTimeOffset? NotBefore { get; init; }

    /// <summary>
    /// User or service that created the job.
    /// </summary>
    public string? CreatedBy { get; init; }

    /// <summary>
    /// Job-specific payload data (will be serialized to JSON).
    /// </summary>
    public ImmutableDictionary<string, object?>? Data { get; init; }
}

View File

@@ -23,4 +23,27 @@ internal interface INatsSchedulerQueuePayload<TMessage>
string? GetCorrelationId(TMessage message);
IReadOnlyDictionary<string, string>? GetAttributes(TMessage message);
// HLC fields for deterministic ordering (SPRINT_20260105_002_002)
// Default implementations return null for backward compatibility
/// <summary>
/// Gets the HLC timestamp string for deterministic ordering.
/// </summary>
string? GetTHlc(TMessage message) => null;
/// <summary>
/// Gets the chain link (hex-encoded SHA-256) proving sequence position.
/// </summary>
string? GetChainLink(TMessage message) => null;
/// <summary>
/// Gets the previous chain link (hex-encoded, null for first entry).
/// </summary>
string? GetPrevChainLink(TMessage message) => null;
/// <summary>
/// Gets the payload hash (hex-encoded SHA-256).
/// </summary>
string? GetPayloadHash(TMessage message) => null;
}

View File

@@ -613,6 +613,31 @@ internal abstract class NatsSchedulerQueueBase<TMessage> : ISchedulerQueue<TMess
headers.Add(SchedulerQueueFields.CorrelationId, correlationId);
}
// HLC fields for deterministic ordering (SPRINT_20260105_002_002)
var tHlc = _payload.GetTHlc(message);
if (!string.IsNullOrWhiteSpace(tHlc))
{
headers.Add(SchedulerQueueFields.THlc, tHlc);
}
var chainLink = _payload.GetChainLink(message);
if (!string.IsNullOrWhiteSpace(chainLink))
{
headers.Add(SchedulerQueueFields.ChainLink, chainLink);
}
var prevChainLink = _payload.GetPrevChainLink(message);
if (!string.IsNullOrWhiteSpace(prevChainLink))
{
headers.Add(SchedulerQueueFields.PrevChainLink, prevChainLink);
}
var payloadHash = _payload.GetPayloadHash(message);
if (!string.IsNullOrWhiteSpace(payloadHash))
{
headers.Add(SchedulerQueueFields.PayloadHash, payloadHash);
}
var attributes = _payload.GetAttributes(message);
if (attributes is not null)
{

View File

@@ -0,0 +1,92 @@
// -----------------------------------------------------------------------------
// HlcSchedulerOptions.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-020 - Feature flag: SchedulerOptions.EnableHlcOrdering
// -----------------------------------------------------------------------------
using System.ComponentModel.DataAnnotations;
namespace StellaOps.Scheduler.Queue.Options;
/// <summary>
/// Configuration options for HLC-based scheduler queue ordering.
/// Bound from the <see cref="SectionName"/> configuration section.
/// </summary>
public sealed class HlcSchedulerOptions
{
    /// <summary>
    /// Configuration section name.
    /// </summary>
    public const string SectionName = "Scheduler:HlcOrdering";

    /// <summary>
    /// Gets or sets whether HLC-based ordering is enabled.
    /// When true, the scheduler uses hybrid logical clock timestamps for
    /// deterministic, monotonic job ordering with cryptographic chain proofs.
    /// </summary>
    /// <remarks>
    /// Enabling HLC ordering:
    /// - Jobs are ordered by HLC timestamp (t_hlc) instead of created_at
    /// - Each job gets a chain link: Hash(prev_link || job_id || t_hlc || payload_hash)
    /// - Chain integrity can be verified for audit/compliance
    /// - Requires scheduler.scheduler_log and scheduler.chain_heads tables
    /// </remarks>
    public bool EnableHlcOrdering { get; set; } = false;

    /// <summary>
    /// Gets or sets the node ID for this scheduler instance.
    /// Used in HLC timestamps for tie-breaking and distributed ordering.
    /// </summary>
    /// <remarks>
    /// Should be unique per scheduler instance (e.g., hostname, pod name).
    /// If not specified, defaults to machine name.
    /// </remarks>
    [Required(AllowEmptyStrings = false)]
    public string NodeId { get; set; } = Environment.MachineName;

    /// <summary>
    /// Gets or sets whether to enable dual-write mode.
    /// When true, writes to both the legacy jobs table and the HLC scheduler_log.
    /// </summary>
    /// <remarks>
    /// Dual-write mode allows gradual migration:
    /// Phase 1: DualWrite=true, EnableHlcOrdering=false (write both, read legacy)
    /// Phase 2: DualWrite=true, EnableHlcOrdering=true (write both, read HLC)
    /// Phase 3: DualWrite=false, EnableHlcOrdering=true (write/read HLC only)
    /// </remarks>
    public bool EnableDualWrite { get; set; } = false;

    /// <summary>
    /// Gets or sets whether to verify chain integrity on dequeue.
    /// When true, verifies prev_link matches the expected value for each job.
    /// </summary>
    /// <remarks>
    /// Enabling verification adds per-dequeue overhead but catches tampering/corruption.
    /// Recommended for high-security/compliance environments.
    /// </remarks>
    public bool VerifyChainOnDequeue { get; set; } = false;

    /// <summary>
    /// Gets or sets whether to sign batch snapshots with DSSE.
    /// Requires the attestation signing service to be configured.
    /// </summary>
    public bool SignBatchSnapshots { get; set; } = false;

    /// <summary>
    /// Gets or sets the default partition key for jobs without an explicit partition.
    /// Empty string matches the scheduler_log schema's partition_key default.
    /// </summary>
    public string DefaultPartitionKey { get; set; } = "";

    /// <summary>
    /// Gets or sets the batch snapshot interval in seconds.
    /// Zero disables automatic batch snapshots.
    /// </summary>
    [Range(0, 86400)] // 0 to 24 hours
    public int BatchSnapshotIntervalSeconds { get; set; } = 0;

    /// <summary>
    /// Gets or sets the maximum clock skew tolerance in milliseconds.
    /// HLC will reject operations with physical time more than this ahead of local time.
    /// </summary>
    [Range(0, 60000)] // 0 to 60 seconds
    public int MaxClockSkewMs { get; set; } = 1000;
}

View File

@@ -23,4 +23,27 @@ internal interface IRedisSchedulerQueuePayload<TMessage>
string? GetCorrelationId(TMessage message);
IReadOnlyDictionary<string, string>? GetAttributes(TMessage message);
// HLC fields for deterministic ordering (SPRINT_20260105_002_002)
// Default implementations return null for backward compatibility
/// <summary>
/// Gets the HLC timestamp string for deterministic ordering.
/// </summary>
string? GetTHlc(TMessage message) => null;
/// <summary>
/// Gets the chain link (hex-encoded SHA-256) proving sequence position.
/// </summary>
string? GetChainLink(TMessage message) => null;
/// <summary>
/// Gets the previous chain link (hex-encoded, null for first entry).
/// </summary>
string? GetPrevChainLink(TMessage message) => null;
/// <summary>
/// Gets the payload hash (hex-encoded SHA-256).
/// </summary>
string? GetPayloadHash(TMessage message) => null;
}

View File

@@ -6,7 +6,6 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StellaOps.HybridLogicalClock;
using StackExchange.Redis;
namespace StellaOps.Scheduler.Queue.Redis;
@@ -21,7 +20,6 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
private readonly IRedisSchedulerQueuePayload<TMessage> _payload;
private readonly ILogger _logger;
private readonly TimeProvider _timeProvider;
private readonly IHybridLogicalClock? _hlc;
private readonly Func<ConfigurationOptions, Task<IConnectionMultiplexer>> _connectionFactory;
private readonly SemaphoreSlim _connectionLock = new(1, 1);
private readonly SemaphoreSlim _groupInitLock = new(1, 1);
@@ -38,7 +36,6 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
IRedisSchedulerQueuePayload<TMessage> payload,
ILogger logger,
TimeProvider timeProvider,
IHybridLogicalClock? hlc = null,
Func<ConfigurationOptions, Task<IConnectionMultiplexer>>? connectionFactory = null)
{
_queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
@@ -47,7 +44,6 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
_payload = payload ?? throw new ArgumentNullException(nameof(payload));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_hlc = hlc;
_connectionFactory = connectionFactory ?? (config => Task.FromResult<IConnectionMultiplexer>(ConnectionMultiplexer.Connect(config)));
if (string.IsNullOrWhiteSpace(_redisOptions.ConnectionString))
@@ -78,11 +74,7 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
var now = _timeProvider.GetUtcNow();
var attempt = 1;
// Generate HLC timestamp if clock is available
var hlcTimestamp = _hlc?.Tick();
var entries = BuildEntries(message, now, attempt, hlcTimestamp);
var entries = BuildEntries(message, now, attempt);
var messageId = await AddToStreamAsync(
database,
@@ -563,12 +555,12 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
private NameValueEntry[] BuildEntries(
TMessage message,
DateTimeOffset enqueuedAt,
int attempt,
HlcTimestamp? hlcTimestamp = null)
int attempt)
{
var attributes = _payload.GetAttributes(message);
var attributeCount = attributes?.Count ?? 0;
var entries = ArrayPool<NameValueEntry>.Shared.Rent(11 + attributeCount);
// Increased capacity for HLC fields (4 additional)
var entries = ArrayPool<NameValueEntry>.Shared.Rent(14 + attributeCount);
var index = 0;
entries[index++] = new NameValueEntry(SchedulerQueueFields.QueueKind, _payload.QueueName);
@@ -598,10 +590,29 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
entries[index++] = new NameValueEntry(SchedulerQueueFields.EnqueuedAt, enqueuedAt.ToUnixTimeMilliseconds());
entries[index++] = new NameValueEntry(SchedulerQueueFields.Payload, _payload.Serialize(message));
// Include HLC timestamp if available
if (hlcTimestamp.HasValue)
// HLC fields for deterministic ordering (SPRINT_20260105_002_002)
var tHlc = _payload.GetTHlc(message);
if (!string.IsNullOrWhiteSpace(tHlc))
{
entries[index++] = new NameValueEntry(SchedulerQueueFields.HlcTimestamp, hlcTimestamp.Value.ToSortableString());
entries[index++] = new NameValueEntry(SchedulerQueueFields.THlc, tHlc);
}
var chainLink = _payload.GetChainLink(message);
if (!string.IsNullOrWhiteSpace(chainLink))
{
entries[index++] = new NameValueEntry(SchedulerQueueFields.ChainLink, chainLink);
}
var prevChainLink = _payload.GetPrevChainLink(message);
if (!string.IsNullOrWhiteSpace(prevChainLink))
{
entries[index++] = new NameValueEntry(SchedulerQueueFields.PrevChainLink, prevChainLink);
}
var payloadHash = _payload.GetPayloadHash(message);
if (!string.IsNullOrWhiteSpace(payloadHash))
{
entries[index++] = new NameValueEntry(SchedulerQueueFields.PayloadHash, payloadHash);
}
if (attributeCount > 0 && attributes is not null)
@@ -638,7 +649,6 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
string? segmentId = null;
string? correlationId = null;
string? idempotencyKey = null;
string? hlcTimestampStr = null;
long? enqueuedAtUnix = null;
var attempt = attemptOverride ?? 1;
var attributes = new Dictionary<string, string>(StringComparer.Ordinal);
@@ -692,10 +702,6 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
: Math.Max(1, parsedAttempt);
}
}
else if (name.Equals(SchedulerQueueFields.HlcTimestamp, StringComparison.Ordinal))
{
hlcTimestampStr = NormalizeOptional(value.ToString());
}
else if (name.StartsWith(SchedulerQueueFields.AttributePrefix, StringComparison.Ordinal))
{
var key = name[SchedulerQueueFields.AttributePrefix.Length..];
@@ -712,14 +718,6 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
var enqueuedAt = DateTimeOffset.FromUnixTimeMilliseconds(enqueuedAtUnix.Value);
var leaseExpires = now.Add(leaseDuration);
// Parse HLC timestamp if present
HlcTimestamp? hlcTimestamp = null;
if (!string.IsNullOrEmpty(hlcTimestampStr) &&
HlcTimestamp.TryParse(hlcTimestampStr, out var parsedHlc))
{
hlcTimestamp = parsedHlc;
}
IReadOnlyDictionary<string, string> attributeView = attributes.Count == 0
? EmptyReadOnlyDictionary<string, string>.Instance
: new ReadOnlyDictionary<string, string>(attributes);
@@ -738,8 +736,7 @@ internal abstract class RedisSchedulerQueueBase<TMessage> : ISchedulerQueue<TMes
attempt,
enqueuedAt,
leaseExpires,
consumer,
hlcTimestamp);
consumer);
}
private async Task HandlePoisonEntryAsync(IDatabase database, RedisValue entryId)

View File

@@ -14,9 +14,25 @@ internal static class SchedulerQueueFields
public const string CorrelationId = "correlationId";
public const string AttributePrefix = "attr:";
// HLC-related fields for deterministic ordering (SPRINT_20260105_002_002)
/// <summary>
/// Hybrid Logical Clock timestamp for deterministic ordering.
/// Stored as sortable string format: {PhysicalTime:D13}-{NodeId}-{LogicalCounter:D6}
/// HLC timestamp string (e.g., "1704067200000-scheduler-east-1-000042").
/// This is the authoritative ordering key.
/// </summary>
public const string HlcTimestamp = "hlcTimestamp";
public const string THlc = "tHlc";
/// <summary>
/// Chain link (hex-encoded SHA-256) proving sequence position.
/// </summary>
public const string ChainLink = "chainLink";
/// <summary>
/// Previous chain link (hex-encoded, null for first entry).
/// </summary>
public const string PrevChainLink = "prevChainLink";
/// <summary>
/// SHA-256 hash of the canonical payload (hex-encoded).
/// </summary>
public const string PayloadHash = "payloadHash";
}

View File

@@ -0,0 +1,35 @@
// -----------------------------------------------------------------------------
// ServiceCollectionExtensions.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-009 - Implement HlcSchedulerEnqueueService
// -----------------------------------------------------------------------------
using Microsoft.Extensions.DependencyInjection;
using StellaOps.Scheduler.Queue.Services;
namespace StellaOps.Scheduler.Queue;
/// <summary>
/// Extension methods for registering scheduler queue services.
/// </summary>
public static class ServiceCollectionExtensions
{
    /// <summary>
    /// Adds the HLC-ordered scheduler queue services (enqueue, dequeue,
    /// batch snapshot, and chain verification) with scoped lifetimes.
    /// </summary>
    /// <param name="services">Service collection.</param>
    /// <returns>The service collection for chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
    /// <remarks>
    /// Prerequisites:
    /// - IHybridLogicalClock must be registered (from StellaOps.HybridLogicalClock)
    /// - ISchedulerLogRepository and IChainHeadRepository must be registered (from StellaOps.Scheduler.Persistence)
    /// </remarks>
    public static IServiceCollection AddHlcSchedulerQueue(this IServiceCollection services)
    {
        // Guard added for consistency with the argument validation used by the
        // service constructors registered below.
        ArgumentNullException.ThrowIfNull(services);

        services.AddScoped<IHlcSchedulerEnqueueService, HlcSchedulerEnqueueService>();
        services.AddScoped<IHlcSchedulerDequeueService, HlcSchedulerDequeueService>();
        services.AddScoped<IBatchSnapshotService, BatchSnapshotService>();
        services.AddScoped<ISchedulerChainVerifier, SchedulerChainVerifier>();
        return services;
    }
}

View File

@@ -0,0 +1,242 @@
// -----------------------------------------------------------------------------
// BatchSnapshotService.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-013, SQC-014 - Implement BatchSnapshotService with optional DSSE signing
// -----------------------------------------------------------------------------
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Determinism;
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Persistence.Postgres.Models;
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
using StellaOps.Scheduler.Queue.Models;
using StellaOps.Scheduler.Queue.Options;
using StellaOps.Scheduler.Queue.Signing;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Service for creating and managing batch snapshots of the scheduler log.
/// A snapshot pins an inclusive HLC range to the chain head observed at creation
/// time, giving auditors a fixed anchor for later chain re-verification.
/// </summary>
public sealed class BatchSnapshotService : IBatchSnapshotService
{
    private readonly ISchedulerLogRepository _logRepository;
    private readonly IBatchSnapshotRepository _snapshotRepository;
    private readonly IGuidProvider _guidProvider;     // injectable for deterministic IDs in tests
    private readonly TimeProvider _timeProvider;      // injectable clock for deterministic CreatedAt
    private readonly ISchedulerSnapshotSigner? _signer; // optional DSSE signer (SQC-014)
    private readonly HlcSchedulerOptions _options;
    private readonly ILogger<BatchSnapshotService> _logger;

    public BatchSnapshotService(
        ISchedulerLogRepository logRepository,
        IBatchSnapshotRepository snapshotRepository,
        IGuidProvider guidProvider,
        TimeProvider timeProvider,
        ILogger<BatchSnapshotService> logger,
        ISchedulerSnapshotSigner? signer = null,
        IOptions<HlcSchedulerOptions>? options = null)
    {
        _logRepository = logRepository ?? throw new ArgumentNullException(nameof(logRepository));
        _snapshotRepository = snapshotRepository ?? throw new ArgumentNullException(nameof(snapshotRepository));
        _guidProvider = guidProvider ?? throw new ArgumentNullException(nameof(guidProvider));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _signer = signer;
        _options = options?.Value ?? new HlcSchedulerOptions();
    }

    /// <inheritdoc />
    /// <remarks>
    /// Steps: load jobs in [startT, endT], take the last job's link as the chain head,
    /// persist a snapshot entity, and optionally DSSE-sign its digest. Signing failures
    /// are logged and swallowed so snapshot creation is best-effort with respect to signing.
    /// </remarks>
    public async Task<BatchSnapshotResult> CreateSnapshotAsync(
        string tenantId,
        HlcTimestamp startT,
        HlcTimestamp endT,
        CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        // Validate range; include the offending parameter name for diagnosability.
        if (startT.CompareTo(endT) > 0)
        {
            throw new ArgumentException("Start timestamp must be <= end timestamp", nameof(startT));
        }

        // 1. Get jobs in range (repository returns them in HLC order).
        var jobs = await _logRepository.GetByHlcRangeAsync(
            tenantId,
            startT.ToSortableString(),
            endT.ToSortableString(),
            ct);

        if (jobs.Count == 0)
        {
            throw new InvalidOperationException(
                $"No jobs found in HLC range [{startT.ToSortableString()}, {endT.ToSortableString()}] for tenant {tenantId}");
        }

        // 2. Get chain head (last link in range).
        // NOTE(review): chains are maintained per (tenant, partition) at enqueue time;
        // when the range spans multiple partitions this is the link of the HLC-latest
        // entry, not a single-chain head — confirm intended snapshot semantics.
        var headLink = jobs[^1].Link;

        // 3. Create snapshot entity (unsigned by default; signature filled in below).
        var batchId = _guidProvider.NewGuid();
        var createdAt = _timeProvider.GetUtcNow();
        var entity = new BatchSnapshotEntity
        {
            BatchId = batchId,
            TenantId = tenantId,
            RangeStartT = startT.ToSortableString(),
            RangeEndT = endT.ToSortableString(),
            HeadLink = headLink,
            JobCount = jobs.Count,
            CreatedAt = createdAt,
            SignedBy = null,
            Signature = null
        };

        // 4. Optional: Sign snapshot with DSSE (SQC-014).
        if (_options.SignBatchSnapshots && _signer is not null && _signer.IsAvailable)
        {
            try
            {
                var digest = ComputeSnapshotDigest(entity);
                var signResult = await _signer.SignAsync(digest, tenantId, ct);

                // Use 'with' to create a new entity carrying the signature (init-only properties).
                entity = entity with
                {
                    SignedBy = signResult.KeyId,
                    Signature = signResult.Signature
                };

                _logger.LogDebug(
                    "Signed batch snapshot {BatchId} with key {KeyId} using {Algorithm}",
                    batchId,
                    signResult.KeyId,
                    signResult.Algorithm);
            }
            catch (Exception ex)
            {
                // Deliberate best-effort: an unavailable/failed signer must not block snapshotting.
                _logger.LogWarning(
                    ex,
                    "Failed to sign batch snapshot {BatchId} for tenant {TenantId}; proceeding without signature",
                    batchId,
                    tenantId);
            }
        }

        // 5. Persist.
        await _snapshotRepository.InsertAsync(entity, ct);

        _logger.LogInformation(
            "Created batch snapshot {BatchId} for tenant {TenantId}: range [{Start}, {End}], {JobCount} jobs, head link {HeadLink}",
            batchId,
            tenantId,
            startT.ToSortableString(),
            endT.ToSortableString(),
            jobs.Count,
            Convert.ToHexString(headLink).ToLowerInvariant());

        return MapToResult(entity);
    }

    /// <inheritdoc />
    public async Task<BatchSnapshotResult?> GetByIdAsync(Guid batchId, CancellationToken ct = default)
    {
        var entity = await _snapshotRepository.GetByIdAsync(batchId, ct);
        return entity is null ? null : MapToResult(entity);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<BatchSnapshotResult>> GetRecentAsync(
        string tenantId,
        int limit = 10,
        CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        var entities = await _snapshotRepository.GetByTenantAsync(tenantId, limit, ct);
        return entities.Select(MapToResult).ToList();
    }

    /// <inheritdoc />
    public async Task<BatchSnapshotResult?> GetLatestAsync(
        string tenantId,
        CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        var entity = await _snapshotRepository.GetLatestAsync(tenantId, ct);
        return entity is null ? null : MapToResult(entity);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<BatchSnapshotResult>> FindContainingAsync(
        string tenantId,
        HlcTimestamp timestamp,
        CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        var entities = await _snapshotRepository.GetContainingHlcAsync(
            tenantId,
            timestamp.ToSortableString(),
            ct);
        return entities.Select(MapToResult).ToList();
    }

    /// <summary>
    /// Maps a persistence entity to the public result model (1:1 field copy;
    /// range strings are parsed back into <see cref="HlcTimestamp"/> values).
    /// </summary>
    private static BatchSnapshotResult MapToResult(BatchSnapshotEntity entity)
    {
        return new BatchSnapshotResult
        {
            BatchId = entity.BatchId,
            TenantId = entity.TenantId,
            RangeStart = HlcTimestamp.Parse(entity.RangeStartT),
            RangeEnd = HlcTimestamp.Parse(entity.RangeEndT),
            HeadLink = entity.HeadLink,
            JobCount = entity.JobCount,
            CreatedAt = entity.CreatedAt,
            SignedBy = entity.SignedBy,
            Signature = entity.Signature
        };
    }

    /// <summary>
    /// Computes deterministic SHA-256 digest of snapshot for signing.
    /// </summary>
    /// <remarks>
    /// Digest is computed over: batchId || tenantId || rangeStartT || rangeEndT || headLink || jobCount.
    /// This ensures the signature covers all critical snapshot metadata.
    /// CreatedAt is intentionally not part of the digest, so the signature does not
    /// cover the creation timestamp.
    /// </remarks>
    private static byte[] ComputeSnapshotDigest(BatchSnapshotEntity entity)
    {
        using var hasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);

        // BatchId as bytes (GUID little-endian layout; deterministic for a given GUID).
        hasher.AppendData(entity.BatchId.ToByteArray());

        // TenantId as UTF-8 bytes.
        hasher.AppendData(Encoding.UTF8.GetBytes(entity.TenantId));

        // Range timestamps as UTF-8 bytes.
        hasher.AppendData(Encoding.UTF8.GetBytes(entity.RangeStartT));
        hasher.AppendData(Encoding.UTF8.GetBytes(entity.RangeEndT));

        // Head link (chain proof).
        hasher.AppendData(entity.HeadLink);

        // Job count as 4-byte big-endian for platform-independent digests.
        var jobCountBytes = BitConverter.GetBytes(entity.JobCount);
        if (BitConverter.IsLittleEndian)
        {
            Array.Reverse(jobCountBytes);
        }
        hasher.AppendData(jobCountBytes);

        return hasher.GetHashAndReset();
    }
}

View File

@@ -0,0 +1,159 @@
// -----------------------------------------------------------------------------
// HlcSchedulerDequeueService.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-010 - Implement HlcSchedulerDequeueService
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Persistence.Postgres;
using StellaOps.Scheduler.Persistence.Postgres.Models;
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
using StellaOps.Scheduler.Queue.Models;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Read-side service exposing scheduler log entries in HLC order, by job ID,
/// or by chain link. Thin delegation layer over <see cref="ISchedulerLogRepository"/>.
/// </summary>
public sealed class HlcSchedulerDequeueService : IHlcSchedulerDequeueService
{
    private readonly ISchedulerLogRepository _logRepository;
    private readonly ILogger<HlcSchedulerDequeueService> _logger;

    public HlcSchedulerDequeueService(
        ISchedulerLogRepository logRepository,
        ILogger<HlcSchedulerDequeueService> logger)
    {
        _logRepository = logRepository ?? throw new ArgumentNullException(nameof(logRepository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<SchedulerDequeueResult>> DequeueAsync(
        string tenantId,
        string? partitionKey,
        int limit,
        CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        if (limit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(limit), "Limit must be positive");
        }

        var rows = await _logRepository.GetByHlcOrderAsync(tenantId, partitionKey, limit, ct);

        _logger.LogDebug(
            "Dequeued {Count} jobs for tenant {TenantId}, partition {PartitionKey}",
            rows.Count,
            tenantId,
            partitionKey ?? "(all)");

        return MapToResults(rows);
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<SchedulerDequeueResult>> DequeueByRangeAsync(
        string tenantId,
        HlcTimestamp? startT,
        HlcTimestamp? endT,
        CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);

        // Null bounds mean "unbounded" on that side; repository expects sortable strings.
        var lower = startT?.ToSortableString();
        var upper = endT?.ToSortableString();

        var rows = await _logRepository.GetByHlcRangeAsync(tenantId, lower, upper, ct);

        _logger.LogDebug(
            "Dequeued {Count} jobs for tenant {TenantId} in HLC range [{Start}, {End}]",
            rows.Count,
            tenantId,
            lower ?? "(none)",
            upper ?? "(none)");

        return MapToResults(rows);
    }

    /// <inheritdoc />
    public async Task<SchedulerDequeueResult?> GetByJobIdAsync(
        Guid jobId,
        CancellationToken ct = default)
    {
        var row = await _logRepository.GetByJobIdAsync(jobId, ct);
        return row is null ? null : MapToResult(row);
    }

    /// <inheritdoc />
    public async Task<SchedulerDequeueResult?> GetByLinkAsync(
        byte[] link,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(link);

        // Reject malformed lookups early: a chain link is a fixed-size hash.
        if (link.Length != SchedulerChainLinking.LinkSizeBytes)
        {
            throw new ArgumentException(
                $"Link must be {SchedulerChainLinking.LinkSizeBytes} bytes",
                nameof(link));
        }

        var row = await _logRepository.GetByLinkAsync(link, ct);
        return row is null ? null : MapToResult(row);
    }

    /// <inheritdoc />
    public async Task<int> CountByRangeAsync(
        string tenantId,
        HlcTimestamp? startT,
        HlcTimestamp? endT,
        CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        return await _logRepository.CountByHlcRangeAsync(
            tenantId,
            startT?.ToSortableString(),
            endT?.ToSortableString(),
            ct);
    }

    /// <summary>
    /// Converts a persisted log row into the public dequeue model (1:1 field copy).
    /// </summary>
    private static SchedulerDequeueResult MapToResult(SchedulerLogEntity row) =>
        new SchedulerDequeueResult
        {
            JobId = row.JobId,
            Timestamp = HlcTimestamp.Parse(row.THlc),
            THlcString = row.THlc,
            TenantId = row.TenantId,
            PartitionKey = row.PartitionKey,
            Link = row.Link,
            PrevLink = row.PrevLink,
            PayloadHash = row.PayloadHash,
            SeqBigint = row.SeqBigint,
            CreatedAt = row.CreatedAt
        };

    /// <summary>
    /// Converts a batch of log rows, preserving their order.
    /// </summary>
    private static IReadOnlyList<SchedulerDequeueResult> MapToResults(IReadOnlyList<SchedulerLogEntity> rows)
    {
        if (rows.Count == 0)
        {
            return Array.Empty<SchedulerDequeueResult>();
        }

        var mapped = new List<SchedulerDequeueResult>(rows.Count);
        foreach (var row in rows)
        {
            mapped.Add(MapToResult(row));
        }
        return mapped;
    }
}

View File

@@ -0,0 +1,308 @@
// -----------------------------------------------------------------------------
// HlcSchedulerEnqueueService.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-009 - Implement HlcSchedulerEnqueueService
// -----------------------------------------------------------------------------
using System.Security.Cryptography;
using System.Text;
using Microsoft.Extensions.Logging;
using StellaOps.Determinism;
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Persistence.Postgres;
using StellaOps.Scheduler.Persistence.Postgres.Models;
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
using StellaOps.Scheduler.Queue.Models;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Service for HLC-ordered job enqueueing with cryptographic chain linking.
/// Each job receives an HLC timestamp (the authoritative ordering key) and a
/// chain link derived from the previous link, the job ID, the timestamp, and
/// the hash of the canonical payload JSON.
/// </summary>
public sealed class HlcSchedulerEnqueueService : IHlcSchedulerEnqueueService
{
    /// <summary>
    /// Namespace UUID for deterministic job ID generation.
    /// Using a fixed namespace ensures consistent job IDs across runs.
    /// </summary>
    private static readonly Guid JobIdNamespace = new("a1b2c3d4-e5f6-7890-abcd-ef1234567890");

    private readonly IHybridLogicalClock _hlc;                    // monotonic HLC source
    private readonly ISchedulerLogRepository _logRepository;      // append-only scheduler log
    private readonly IChainHeadRepository _chainHeadRepository;   // per-(tenant, partition) chain heads
    private readonly ILogger<HlcSchedulerEnqueueService> _logger;

    public HlcSchedulerEnqueueService(
        IHybridLogicalClock hlc,
        ISchedulerLogRepository logRepository,
        IChainHeadRepository chainHeadRepository,
        ILogger<HlcSchedulerEnqueueService> logger)
    {
        _hlc = hlc ?? throw new ArgumentNullException(nameof(hlc));
        _logRepository = logRepository ?? throw new ArgumentNullException(nameof(logRepository));
        _chainHeadRepository = chainHeadRepository ?? throw new ArgumentNullException(nameof(chainHeadRepository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    public async Task<SchedulerEnqueueResult> EnqueueAsync(SchedulerJobPayload payload, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(payload);
        ValidatePayload(payload);

        // 1. Generate HLC timestamp
        var tHlc = _hlc.Tick();

        // 2. Compute deterministic job ID from payload (tenant + idempotency key)
        var jobId = ComputeDeterministicJobId(payload);

        // 3. Compute canonical JSON and payload hash
        var canonicalJson = SerializeToCanonicalJson(payload);
        var payloadHash = SchedulerChainLinking.ComputePayloadHash(canonicalJson);

        // 4. Get previous chain link for this partition (null for a fresh chain)
        var prevLink = await _chainHeadRepository.GetLastLinkAsync(
            payload.TenantId,
            payload.PartitionKey,
            ct);

        // 5. Compute new chain link
        var link = SchedulerChainLinking.ComputeLink(prevLink, jobId, tHlc, payloadHash);

        // 6. Create log entry
        var logEntry = new SchedulerLogEntity
        {
            TenantId = payload.TenantId,
            THlc = tHlc.ToSortableString(),
            PartitionKey = payload.PartitionKey,
            JobId = jobId,
            PayloadHash = payloadHash,
            PrevLink = prevLink,
            Link = link,
            CreatedAt = DateTimeOffset.UtcNow
        };

        // 7. Insert log entry atomically with chain head update
        try
        {
            await _logRepository.InsertWithChainUpdateAsync(logEntry, ct);

            _logger.LogDebug(
                "Enqueued job {JobId} with HLC {HlcTimestamp}, link {Link}",
                jobId,
                tHlc.ToSortableString(),
                SchedulerChainLinking.ToHexString(link));

            return new SchedulerEnqueueResult
            {
                Timestamp = tHlc,
                JobId = jobId,
                Link = link,
                PayloadHash = payloadHash,
                PrevLink = prevLink,
                IsDuplicate = false
            };
        }
        // NOTE(review): duplicate detection relies on the repository surfacing unique-constraint
        // violations as InvalidOperationException with "unique constraint" in the message —
        // fragile coupling to message text; confirm against the repository implementation.
        catch (InvalidOperationException ex) when (ex.Message.Contains("unique constraint", StringComparison.OrdinalIgnoreCase))
        {
            // Idempotent: job with same key already exists
            _logger.LogDebug(
                "Duplicate job submission for tenant {TenantId}, idempotency key {IdempotencyKey}",
                payload.TenantId,
                payload.IdempotencyKey);

            // Retrieve existing entry and return its details instead of failing
            var existing = await _logRepository.GetByJobIdAsync(jobId, ct);
            if (existing is null)
            {
                throw new InvalidOperationException(
                    $"Duplicate detected but existing entry not found for job {jobId}");
            }

            return new SchedulerEnqueueResult
            {
                Timestamp = HlcTimestamp.Parse(existing.THlc),
                JobId = existing.JobId,
                Link = existing.Link,
                PayloadHash = existing.PayloadHash,
                PrevLink = existing.PrevLink,
                IsDuplicate = true
            };
        }
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<SchedulerEnqueueResult>> EnqueueBatchAsync(
        IReadOnlyList<SchedulerJobPayload> payloads,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(payloads);
        if (payloads.Count == 0)
        {
            return Array.Empty<SchedulerEnqueueResult>();
        }

        // Validate all payloads first so a bad payload fails before any insert
        foreach (var payload in payloads)
        {
            ValidatePayload(payload);
        }

        // Group by partition to compute chains correctly (chains are per tenant+partition);
        // original input index is retained so results can be returned in input order
        var byPartition = payloads
            .Select((p, i) => (Payload: p, Index: i))
            .GroupBy(x => (x.Payload.TenantId, x.Payload.PartitionKey))
            .ToDictionary(g => g.Key, g => g.ToList());

        var results = new SchedulerEnqueueResult[payloads.Count];
        var entries = new List<SchedulerLogEntity>(payloads.Count);

        foreach (var ((tenantId, partitionKey), items) in byPartition)
        {
            // Get current chain head for this partition
            var prevLink = await _chainHeadRepository.GetLastLinkAsync(tenantId, partitionKey, ct);

            foreach (var (payload, index) in items)
            {
                // Generate HLC timestamp (monotonically increasing within batch)
                var tHlc = _hlc.Tick();

                // Compute deterministic job ID
                var jobId = ComputeDeterministicJobId(payload);

                // Compute payload hash
                var canonicalJson = SerializeToCanonicalJson(payload);
                var payloadHash = SchedulerChainLinking.ComputePayloadHash(canonicalJson);

                // Compute chain link
                var link = SchedulerChainLinking.ComputeLink(prevLink, jobId, tHlc, payloadHash);

                // Create log entry
                var entry = new SchedulerLogEntity
                {
                    TenantId = payload.TenantId,
                    THlc = tHlc.ToSortableString(),
                    PartitionKey = payload.PartitionKey,
                    JobId = jobId,
                    PayloadHash = payloadHash,
                    PrevLink = prevLink,
                    Link = link,
                    CreatedAt = DateTimeOffset.UtcNow
                };

                entries.Add(entry);
                results[index] = new SchedulerEnqueueResult
                {
                    Timestamp = tHlc,
                    JobId = jobId,
                    Link = link,
                    PayloadHash = payloadHash,
                    PrevLink = prevLink,
                    IsDuplicate = false
                };

                // Next entry's prev_link is this entry's link
                prevLink = link;
            }
        }

        // NOTE(review): entries are inserted one call at a time, so the batch is NOT a single
        // transaction unless InsertWithChainUpdateAsync participates in an ambient transaction.
        // The interface remarks promise all-or-nothing batch semantics — confirm the repository
        // provides that, or add a batch insert API. Also note this path has no duplicate
        // (unique-constraint) handling, unlike EnqueueAsync.
        foreach (var entry in entries)
        {
            await _logRepository.InsertWithChainUpdateAsync(entry, ct);
        }

        _logger.LogDebug("Enqueued batch of {Count} jobs", payloads.Count);
        return results;
    }

    /// <summary>
    /// Compute deterministic job ID from payload using SHA-256.
    /// The ID is derived from tenant + idempotency key to ensure uniqueness.
    /// </summary>
    private static Guid ComputeDeterministicJobId(SchedulerJobPayload payload)
    {
        // Use namespace-based GUID generation (similar to GUID v5, but with SHA-256
        // instead of RFC 4122's SHA-1)
        // Input: namespace UUID + tenant_id + idempotency_key
        var input = $"{payload.TenantId}:{payload.IdempotencyKey}";
        var inputBytes = Encoding.UTF8.GetBytes(input);
        var namespaceBytes = JobIdNamespace.ToByteArray();

        // Combine namespace + input
        var combined = new byte[namespaceBytes.Length + inputBytes.Length];
        Buffer.BlockCopy(namespaceBytes, 0, combined, 0, namespaceBytes.Length);
        Buffer.BlockCopy(inputBytes, 0, combined, namespaceBytes.Length, inputBytes.Length);

        // Hash and take first 16 bytes for GUID
        var hash = SHA256.HashData(combined);
        var guidBytes = new byte[16];
        Buffer.BlockCopy(hash, 0, guidBytes, 0, 16);

        // Set version (5) and variant (RFC 4122) bits for valid GUID format
        guidBytes[6] = (byte)((guidBytes[6] & 0x0F) | 0x50); // 0x50 puts 5 in the version nibble
        guidBytes[8] = (byte)((guidBytes[8] & 0x3F) | 0x80); // RFC 4122 variant

        return new Guid(guidBytes);
    }

    /// <summary>
    /// Serialize payload to canonical JSON for deterministic hashing.
    /// Keys are emitted in ordinal order so equal payloads always hash equally.
    /// </summary>
    private static string SerializeToCanonicalJson(SchedulerJobPayload payload)
    {
        // Create a serializable representation with stable ordering
        var canonical = new SortedDictionary<string, object?>(StringComparer.Ordinal)
        {
            ["tenantId"] = payload.TenantId,
            ["partitionKey"] = payload.PartitionKey,
            ["jobType"] = payload.JobType,
            ["priority"] = payload.Priority,
            ["idempotencyKey"] = payload.IdempotencyKey,
            ["correlationId"] = payload.CorrelationId,
            ["maxAttempts"] = payload.MaxAttempts,
            ["notBefore"] = payload.NotBefore?.ToString("O"), // round-trip ("O") format keeps the hash stable
            ["createdBy"] = payload.CreatedBy
        };

        // Add data if present, with sorted keys
        // (the OrderBy is redundant with SortedDictionary's own ordering, but harmless)
        if (payload.Data is not null && payload.Data.Count > 0)
        {
            var sortedData = new SortedDictionary<string, object?>(StringComparer.Ordinal);
            foreach (var kvp in payload.Data.OrderBy(x => x.Key, StringComparer.Ordinal))
            {
                sortedData[kvp.Key] = kvp.Value;
            }
            canonical["data"] = sortedData;
        }

        return CanonicalJsonSerializer.Serialize(canonical);
    }

    /// <summary>
    /// Validate payload before enqueueing.
    /// Throws <see cref="ArgumentException"/> when a required field is missing or invalid.
    /// </summary>
    private static void ValidatePayload(SchedulerJobPayload payload)
    {
        if (string.IsNullOrWhiteSpace(payload.TenantId))
        {
            throw new ArgumentException("TenantId is required", nameof(payload));
        }
        if (string.IsNullOrWhiteSpace(payload.JobType))
        {
            throw new ArgumentException("JobType is required", nameof(payload));
        }
        if (string.IsNullOrWhiteSpace(payload.IdempotencyKey))
        {
            throw new ArgumentException("IdempotencyKey is required", nameof(payload));
        }
        if (payload.MaxAttempts < 1)
        {
            throw new ArgumentException("MaxAttempts must be at least 1", nameof(payload));
        }
    }
}

View File

@@ -0,0 +1,60 @@
// -----------------------------------------------------------------------------
// IBatchSnapshotService.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-013 - Implement BatchSnapshotService
// -----------------------------------------------------------------------------
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Queue.Models;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Service for creating and managing batch snapshots of the scheduler log.
/// Snapshots provide audit anchors for verifying chain integrity.
/// </summary>
public interface IBatchSnapshotService
{
    /// <summary>
    /// Creates a batch snapshot for a given HLC range.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="startT">Start HLC timestamp (inclusive).</param>
    /// <param name="endT">End HLC timestamp (inclusive).</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>The created snapshot.</returns>
    /// <exception cref="ArgumentException">If <paramref name="startT"/> is after <paramref name="endT"/>.</exception>
    /// <exception cref="InvalidOperationException">If no jobs exist in the specified range.</exception>
    Task<BatchSnapshotResult> CreateSnapshotAsync(
        string tenantId,
        HlcTimestamp startT,
        HlcTimestamp endT,
        CancellationToken ct = default);

    /// <summary>
    /// Gets a batch snapshot by ID.
    /// </summary>
    /// <param name="batchId">Snapshot identifier.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>The snapshot if found; null otherwise.</returns>
    Task<BatchSnapshotResult?> GetByIdAsync(Guid batchId, CancellationToken ct = default);

    /// <summary>
    /// Gets recent batch snapshots for a tenant.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="limit">Maximum number of snapshots to return.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Recent snapshots for the tenant, up to <paramref name="limit"/>.</returns>
    Task<IReadOnlyList<BatchSnapshotResult>> GetRecentAsync(
        string tenantId,
        int limit = 10,
        CancellationToken ct = default);

    /// <summary>
    /// Gets the latest batch snapshot for a tenant.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>The latest snapshot, or null if the tenant has none.</returns>
    Task<BatchSnapshotResult?> GetLatestAsync(
        string tenantId,
        CancellationToken ct = default);

    /// <summary>
    /// Finds snapshots that contain a specific HLC timestamp.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="timestamp">HLC timestamp that must fall inside a snapshot's range.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>All snapshots whose range contains <paramref name="timestamp"/>.</returns>
    Task<IReadOnlyList<BatchSnapshotResult>> FindContainingAsync(
        string tenantId,
        HlcTimestamp timestamp,
        CancellationToken ct = default);
}

View File

@@ -0,0 +1,73 @@
// -----------------------------------------------------------------------------
// IHlcSchedulerDequeueService.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-010 - Implement HlcSchedulerDequeueService
// -----------------------------------------------------------------------------
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Queue.Models;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Service for HLC-ordered job dequeue with chain verification.
/// Read-only companion to <c>IHlcSchedulerEnqueueService</c>: entries are returned
/// in HLC order, which is the authoritative queue order.
/// </summary>
public interface IHlcSchedulerDequeueService
{
    /// <summary>
    /// Dequeue jobs in HLC order (ascending) for a tenant/partition.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="partitionKey">Optional partition key (null for all partitions).</param>
    /// <param name="limit">Maximum jobs to return. Must be positive.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Jobs ordered by HLC timestamp (ascending).</returns>
    Task<IReadOnlyList<SchedulerDequeueResult>> DequeueAsync(
        string tenantId,
        string? partitionKey,
        int limit,
        CancellationToken ct = default);

    /// <summary>
    /// Dequeue jobs within an HLC timestamp range.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="startT">Start HLC (inclusive, null for no lower bound).</param>
    /// <param name="endT">End HLC (inclusive, null for no upper bound).</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Jobs ordered by HLC timestamp within the range.</returns>
    Task<IReadOnlyList<SchedulerDequeueResult>> DequeueByRangeAsync(
        string tenantId,
        HlcTimestamp? startT,
        HlcTimestamp? endT,
        CancellationToken ct = default);

    /// <summary>
    /// Get a specific job by its ID.
    /// </summary>
    /// <param name="jobId">Job identifier.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>The job if found, null otherwise.</returns>
    Task<SchedulerDequeueResult?> GetByJobIdAsync(
        Guid jobId,
        CancellationToken ct = default);

    /// <summary>
    /// Get a job by its chain link.
    /// </summary>
    /// <param name="link">Chain link hash. Must be exactly the chain's link size in bytes.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>The job if found, null otherwise.</returns>
    Task<SchedulerDequeueResult?> GetByLinkAsync(
        byte[] link,
        CancellationToken ct = default);

    /// <summary>
    /// Count jobs within an HLC range.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="startT">Start HLC (inclusive, null for no lower bound).</param>
    /// <param name="endT">End HLC (inclusive, null for no upper bound).</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Number of jobs in the range.</returns>
    Task<int> CountByRangeAsync(
        string tenantId,
        HlcTimestamp? startT,
        HlcTimestamp? endT,
        CancellationToken ct = default);
}

View File

@@ -0,0 +1,44 @@
// -----------------------------------------------------------------------------
// IHlcSchedulerEnqueueService.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-009 - Implement HlcSchedulerEnqueueService
// -----------------------------------------------------------------------------
using StellaOps.Scheduler.Queue.Models;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Service for HLC-ordered job enqueueing with cryptographic chain linking.
/// Implements the advisory requirement: "derive order from deterministic, monotonic
/// time inside your system and prove the sequence with hashes."
/// </summary>
public interface IHlcSchedulerEnqueueService
{
    /// <summary>
    /// Enqueue a job with HLC timestamp and chain link.
    /// </summary>
    /// <param name="payload">Job payload to enqueue. Must carry a tenant ID, job type,
    /// idempotency key, and a positive MaxAttempts.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Enqueue result with HLC timestamp, job ID, and chain link.
    /// The job ID is deterministic, derived from tenant + idempotency key.</returns>
    /// <remarks>
    /// This operation is atomic: the log entry and chain head update occur in a single transaction.
    /// If the idempotency key already exists for the tenant, returns the existing job's details
    /// with <c>IsDuplicate</c> set.
    /// </remarks>
    Task<SchedulerEnqueueResult> EnqueueAsync(SchedulerJobPayload payload, CancellationToken ct = default);

    /// <summary>
    /// Enqueue multiple jobs atomically in a batch.
    /// All jobs receive HLC timestamps from the same clock tick sequence.
    /// </summary>
    /// <param name="payloads">Job payloads to enqueue. An empty list returns an empty result.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Enqueue results in the same order as inputs.</returns>
    /// <remarks>
    /// The batch is processed atomically. If any job fails to enqueue, the entire batch is rolled back.
    /// Chain links are computed sequentially within the batch, per (tenant, partition) chain.
    /// </remarks>
    Task<IReadOnlyList<SchedulerEnqueueResult>> EnqueueBatchAsync(
        IReadOnlyList<SchedulerJobPayload> payloads,
        CancellationToken ct = default);
}

View File

@@ -0,0 +1,40 @@
// -----------------------------------------------------------------------------
// ISchedulerChainVerifier.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-015 - Implement chain verification
// -----------------------------------------------------------------------------
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Queue.Models;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Service for verifying scheduler chain integrity.
/// Verification recomputes each entry's link from its predecessor, job ID, HLC
/// timestamp, and payload hash, and reports any discrepancies.
/// </summary>
public interface ISchedulerChainVerifier
{
    /// <summary>
    /// Verifies the chain integrity for a tenant within an optional HLC range.
    /// </summary>
    /// <param name="tenantId">Tenant identifier.</param>
    /// <param name="partitionKey">Optional partition key (null for all partitions).</param>
    /// <param name="startT">Start HLC (inclusive, null for no lower bound).</param>
    /// <param name="endT">End HLC (inclusive, null for no upper bound).</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Verification result with any issues found. An empty range verifies as valid.</returns>
    Task<ChainVerificationResult> VerifyAsync(
        string tenantId,
        string? partitionKey = null,
        HlcTimestamp? startT = null,
        HlcTimestamp? endT = null,
        CancellationToken ct = default);

    /// <summary>
    /// Verifies a single entry's link is correctly computed.
    /// </summary>
    /// <param name="jobId">Job ID to verify.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>True if the entry's link is valid; false if the entry is missing or invalid.</returns>
    Task<bool> VerifySingleAsync(Guid jobId, CancellationToken ct = default);
}

View File

@@ -0,0 +1,215 @@
// -----------------------------------------------------------------------------
// SchedulerChainVerifier.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-015 - Implement chain verification
// -----------------------------------------------------------------------------
using System.Security.Cryptography;
using Microsoft.Extensions.Logging;
using StellaOps.HybridLogicalClock;
using StellaOps.Scheduler.Persistence.Postgres;
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
using StellaOps.Scheduler.Queue.Models;
namespace StellaOps.Scheduler.Queue.Services;
/// <summary>
/// Service for verifying scheduler chain integrity.
/// </summary>
public sealed class SchedulerChainVerifier : ISchedulerChainVerifier
{
private readonly ISchedulerLogRepository _logRepository;
private readonly ILogger<SchedulerChainVerifier> _logger;
public SchedulerChainVerifier(
ISchedulerLogRepository logRepository,
ILogger<SchedulerChainVerifier> logger)
{
_logRepository = logRepository ?? throw new ArgumentNullException(nameof(logRepository));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc />
public async Task<ChainVerificationResult> VerifyAsync(
string tenantId,
string? partitionKey = null,
HlcTimestamp? startT = null,
HlcTimestamp? endT = null,
CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
// Get entries in HLC order
var entries = await _logRepository.GetByHlcRangeAsync(
tenantId,
startT?.ToSortableString(),
endT?.ToSortableString(),
ct);
if (entries.Count == 0)
{
return new ChainVerificationResult
{
IsValid = true,
EntriesChecked = 0,
Issues = Array.Empty<ChainVerificationIssue>()
};
}
// Filter by partition if specified
if (partitionKey is not null)
{
entries = entries.Where(e => e.PartitionKey == partitionKey).ToList();
}
var issues = new List<ChainVerificationIssue>();
byte[]? expectedPrevLink = null;
string? previousHlc = null;
foreach (var entry in entries)
{
// Verify payload hash length
if (entry.PayloadHash.Length != SchedulerChainLinking.LinkSizeBytes)
{
issues.Add(new ChainVerificationIssue
{
JobId = entry.JobId,
THlc = entry.THlc,
IssueType = ChainVerificationIssueType.InvalidPayloadHash,
Description = $"Payload hash length is {entry.PayloadHash.Length}, expected {SchedulerChainLinking.LinkSizeBytes}",
Expected = SchedulerChainLinking.LinkSizeBytes.ToString(),
Actual = entry.PayloadHash.Length.ToString()
});
continue;
}
// Verify link length
if (entry.Link.Length != SchedulerChainLinking.LinkSizeBytes)
{
issues.Add(new ChainVerificationIssue
{
JobId = entry.JobId,
THlc = entry.THlc,
IssueType = ChainVerificationIssueType.InvalidLinkLength,
Description = $"Link length is {entry.Link.Length}, expected {SchedulerChainLinking.LinkSizeBytes}",
Expected = SchedulerChainLinking.LinkSizeBytes.ToString(),
Actual = entry.Link.Length.ToString()
});
continue;
}
// Verify HLC ordering (if this is for a single partition)
if (previousHlc is not null && string.Compare(entry.THlc, previousHlc, StringComparison.Ordinal) < 0)
{
issues.Add(new ChainVerificationIssue
{
JobId = entry.JobId,
THlc = entry.THlc,
IssueType = ChainVerificationIssueType.HlcOrderViolation,
Description = $"HLC {entry.THlc} is before previous {previousHlc}",
Expected = $"> {previousHlc}",
Actual = entry.THlc
});
}
// Verify prev_link matches expected (for first entry, both should be null/zero)
if (!ByteArrayEquals(entry.PrevLink, expectedPrevLink))
{
issues.Add(new ChainVerificationIssue
{
JobId = entry.JobId,
THlc = entry.THlc,
IssueType = ChainVerificationIssueType.PrevLinkMismatch,
Description = "PrevLink doesn't match previous entry's link",
Expected = SchedulerChainLinking.ToHexString(expectedPrevLink),
Actual = SchedulerChainLinking.ToHexString(entry.PrevLink)
});
}
// Recompute link and verify
var tHlc = HlcTimestamp.Parse(entry.THlc);
var computed = SchedulerChainLinking.ComputeLink(
entry.PrevLink,
entry.JobId,
tHlc,
entry.PayloadHash);
if (!SchedulerChainLinking.VerifyLink(entry.Link, entry.PrevLink, entry.JobId, tHlc, entry.PayloadHash))
{
issues.Add(new ChainVerificationIssue
{
JobId = entry.JobId,
THlc = entry.THlc,
IssueType = ChainVerificationIssueType.LinkMismatch,
Description = "Stored link doesn't match computed link",
Expected = SchedulerChainLinking.ToHexString(computed),
Actual = SchedulerChainLinking.ToHexString(entry.Link)
});
}
// Update expected values for next iteration
expectedPrevLink = entry.Link;
previousHlc = entry.THlc;
}
var result = new ChainVerificationResult
{
IsValid = issues.Count == 0,
EntriesChecked = entries.Count,
Issues = issues,
FirstHlc = entries.Count > 0 ? entries[0].THlc : null,
LastHlc = entries.Count > 0 ? entries[^1].THlc : null,
HeadLink = entries.Count > 0 ? entries[^1].Link : null
};
_logger.LogInformation(
"Chain verification for tenant {TenantId}: {Status}, {EntriesChecked} entries, {IssueCount} issues",
tenantId,
result.IsValid ? "VALID" : "INVALID",
result.EntriesChecked,
issues.Count);
return result;
}
/// <inheritdoc />
public async Task<bool> VerifySingleAsync(Guid jobId, CancellationToken ct = default)
{
    // Look up the chain entry for this job; a missing entry cannot be verified.
    var entry = await _logRepository.GetByJobIdAsync(jobId, ct);
    if (entry is null)
    {
        return false;
    }

    // Both the payload hash and the stored link must be exactly one hash wide
    // (SchedulerChainLinking.LinkSizeBytes) before any recomputation is attempted.
    var requiredLength = SchedulerChainLinking.LinkSizeBytes;
    var lengthsValid =
        entry.PayloadHash.Length == requiredLength &&
        entry.Link.Length == requiredLength;
    if (!lengthsValid)
    {
        return false;
    }

    // Recompute the chain link from the stored fields and compare it with the
    // persisted link; any mismatch means the entry was tampered with or corrupted.
    var timestamp = HlcTimestamp.Parse(entry.THlc);
    return SchedulerChainLinking.VerifyLink(
        entry.Link,
        entry.PrevLink,
        entry.JobId,
        timestamp,
        entry.PayloadHash);
}
/// <summary>
/// Null-tolerant equality for hash/link byte arrays.
/// Two nulls are considered equal; a null never equals a non-null array.
/// Non-null arrays are compared with a fixed-time comparison so the check
/// does not leak where the first differing byte is.
/// </summary>
private static bool ByteArrayEquals(byte[]? a, byte[]? b)
{
    // Covers both-null (equal) and same-instance in one check.
    if (ReferenceEquals(a, b))
    {
        return true;
    }

    // Exactly one side is null at this point => not equal.
    if (a is null || b is null)
    {
        return false;
    }

    // FixedTimeEquals also returns false for mismatched lengths.
    return CryptographicOperations.FixedTimeEquals(a, b);
}
}

View File

@@ -0,0 +1,46 @@
// -----------------------------------------------------------------------------
// ISchedulerSnapshotSigner.cs
// Sprint: SPRINT_20260105_002_002_SCHEDULER_hlc_queue_chain
// Task: SQC-014 - DSSE signing integration for batch snapshots
// -----------------------------------------------------------------------------
namespace StellaOps.Scheduler.Queue.Signing;
/// <summary>
/// Interface for signing scheduler batch snapshots with DSSE.
/// </summary>
/// <remarks>
/// Implementations should use the attestation infrastructure (IAttestationSigningService)
/// to create DSSE-compliant signatures. This interface exists to decouple the scheduler
/// queue module from direct attestation dependencies.
/// </remarks>
public interface ISchedulerSnapshotSigner
{
    /// <summary>
    /// Signs a batch snapshot digest.
    /// </summary>
    /// <param name="digest">SHA-256 digest of the snapshot canonical form.</param>
    /// <param name="tenantId">Tenant identifier for key selection.</param>
    /// <param name="ct">Cancellation token.</param>
    /// <returns>Signed result containing key ID and signature.</returns>
    /// <remarks>
    /// NOTE(review): the contract does not specify what happens when
    /// <see cref="IsAvailable"/> is false — presumably implementations throw or
    /// fault the task; confirm against implementations and document here.
    /// </remarks>
    Task<SnapshotSignResult> SignAsync(
        byte[] digest,
        string tenantId,
        CancellationToken ct = default);

    /// <summary>
    /// Gets whether signing is available and configured.
    /// Callers should consult this before invoking <see cref="SignAsync"/>.
    /// </summary>
    bool IsAvailable { get; }
}
/// <summary>
/// Result of signing a batch snapshot. Immutable value type returned by
/// <c>ISchedulerSnapshotSigner.SignAsync</c>.
/// </summary>
/// <param name="KeyId">Identifier of the signing key used.</param>
/// <param name="Signature">DSSE signature bytes (raw, not base64-encoded — TODO confirm against producers).</param>
/// <param name="Algorithm">Signing algorithm (e.g., "ES256", "RS256").</param>
public sealed record SnapshotSignResult(
    string KeyId,
    byte[] Signature,
    string Algorithm);

View File

@@ -20,6 +20,6 @@
<ProjectReference Include="..\StellaOps.Scheduler.Models\StellaOps.Scheduler.Models.csproj" />
<ProjectReference Include="..\StellaOps.Scheduler.Persistence\StellaOps.Scheduler.Persistence.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.HybridLogicalClock\StellaOps.HybridLogicalClock.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Canonical.Json\StellaOps.Canonical.Json.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Determinism.Abstractions\StellaOps.Determinism.Abstractions.csproj" />
</ItemGroup>
</Project>