// ───────────────────────────────────────────────────────────────────────────
// StellaOps Attestor — Distributed Verification Provider (Resilient, Multi-Node)
// SPDX-License-Identifier: AGPL-3.0-or-later
// ───────────────────────────────────────────────────────────────────────────
using System.Collections.Concurrent;
using System.Net.Http.Json;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly.Timeout;
using StellaOps.Attestor.Verify.Configuration;
using StellaOps.Attestor.Verify.Models;
namespace StellaOps.Attestor.Verify.Providers;
/// <summary>
/// Provides distributed verification by distributing work across multiple verification nodes.
/// Implements circuit breaker, retry policies, and consistent hashing for deterministic routing.
/// </summary>
/// <remarks>
/// Requests are routed through a <see cref="ConsistentHashRing"/> keyed on the request content;
/// nodes are tried in ring order until one succeeds. Per-node failure tracking is kept in memory
/// and is synchronized so the provider can be shared between concurrent callers.
/// </remarks>
public class DistributedVerificationProvider : IVerificationProvider
{
    private readonly ILogger<DistributedVerificationProvider> _logger;
    private readonly DistributedVerificationOptions _options;
    private readonly HttpClient _httpClient;
    private readonly ConcurrentDictionary<string, CircuitBreakerState> _circuitStates = new();
    private readonly ConsistentHashRing _hashRing;
    private readonly ResiliencePipeline _resiliencePipeline;

    /// <summary>
    /// Creates the provider, builds the hash ring over the configured nodes and the resilience pipeline.
    /// </summary>
    /// <param name="logger">Logger for routing, retry and circuit-breaker diagnostics.</param>
    /// <param name="options">Distributed verification configuration; must contain at least one node.</param>
    /// <param name="httpClient">HTTP client used to call the verification nodes.</param>
    /// <exception cref="ArgumentNullException">Any argument (or the options value) is null.</exception>
    /// <exception cref="ArgumentException">
    /// No nodes are configured, or <see cref="DistributedVerificationOptions.VirtualNodeMultiplier"/> is not positive.
    /// </exception>
    public DistributedVerificationProvider(
        ILogger<DistributedVerificationProvider> logger,
        IOptions<DistributedVerificationOptions> options,
        HttpClient httpClient)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));

        if (_options.Nodes == null || _options.Nodes.Count == 0)
        {
            throw new ArgumentException("At least one verification node must be configured", nameof(options));
        }

        // A non-positive multiplier would build an empty ring and make every request fail
        // with "all nodes failed"; reject the configuration up front instead.
        if (_options.VirtualNodeMultiplier <= 0)
        {
            throw new ArgumentException("VirtualNodeMultiplier must be greater than zero", nameof(options));
        }

        _hashRing = new ConsistentHashRing(_options.Nodes, _options.VirtualNodeMultiplier);
        _resiliencePipeline = BuildResiliencePipeline();

        _logger.LogInformation("Initialized distributed verification provider with {NodeCount} nodes", _options.Nodes.Count);
    }

    /// <summary>
    /// Verifies the request on the first node (in consistent-hash order) that responds successfully.
    /// </summary>
    /// <param name="request">The verification request to send.</param>
    /// <param name="cancellationToken">Token used to cancel the whole operation.</param>
    /// <returns>
    /// The node's verification result, or a result with <see cref="VerificationStatus.Error"/>
    /// when no node could be used.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task<VerificationResult> VerifyAsync(
        VerificationRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);

        // Compute deterministic hash for routing
        var routingKey = ComputeRoutingKey(request);
        var orderedNodes = _hashRing.GetOrderedNodes(routingKey);

        _logger.LogDebug(
            "Routing verification request {RequestId} with key {RoutingKey} through {NodeCount} nodes",
            request.RequestId,
            routingKey,
            orderedNodes.Count);

        // Try nodes in order until one succeeds
        List<Exception> exceptions = [];
        foreach (var node in orderedNodes)
        {
            if (!IsNodeHealthy(node))
            {
                _logger.LogDebug("Skipping unhealthy node {NodeId}", node.Id);
                continue;
            }

            try
            {
                var result = await _resiliencePipeline.ExecuteAsync(
                    async ct => await ExecuteVerificationAsync(node, request, ct),
                    cancellationToken);

                // The threshold counts *consecutive* failures, so a success clears the tally.
                MarkNodeHealthy(node);

                _logger.LogInformation(
                    "Verification request {RequestId} completed on node {NodeId} with result {Status}",
                    request.RequestId,
                    node.Id,
                    result.Status);
                return result;
            }
            catch (Exception ex) when (IsNodeFailure(ex, cancellationToken))
            {
                _logger.LogWarning(ex, "Node {NodeId} failed for request {RequestId}", node.Id, request.RequestId);
                exceptions.Add(ex);
                MarkNodeUnhealthy(node);
            }
        }

        // All nodes failed
        _logger.LogError(
            "All {NodeCount} nodes failed for verification request {RequestId}",
            orderedNodes.Count,
            request.RequestId);

        return new VerificationResult
        {
            RequestId = request.RequestId,
            Status = VerificationStatus.Error,
            ErrorMessage = $"All verification nodes failed. {exceptions.Count} errors occurred.",
            Timestamp = DateTimeOffset.UtcNow,
        };
    }

    /// <summary>
    /// Probes the <c>health</c> endpoint of every configured node in parallel (5 second limit per probe).
    /// </summary>
    /// <param name="cancellationToken">Token linked into every probe.</param>
    /// <returns>
    /// Per-node status keyed by node id; the provider is reported healthy when at least
    /// <see cref="DistributedVerificationOptions.MinHealthyNodes"/> nodes answered with a success status.
    /// </returns>
    public async Task<HealthCheckResult> CheckHealthAsync(CancellationToken cancellationToken = default)
    {
        var results = new ConcurrentDictionary<string, bool>();

        var probes = _options.Nodes.Select(async node =>
        {
            try
            {
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                cts.CancelAfter(TimeSpan.FromSeconds(5));

                // NOTE(review): Uri relative resolution drops the last path segment of Endpoint
                // unless it ends with '/'; confirm configured endpoints are bare hosts or slash-terminated.
                using var response = await _httpClient.GetAsync(
                    new Uri(node.Endpoint, "health"),
                    cts.Token);
                results[node.Id] = response.IsSuccessStatusCode;
            }
            catch (Exception ex)
            {
                // Best-effort probe: any failure (network error, timeout, cancellation) means "unhealthy".
                _logger.LogDebug(ex, "Health probe failed for node {NodeId}", node.Id);
                results[node.Id] = false;
            }
        });

        await Task.WhenAll(probes);

        var healthyCount = results.Count(r => r.Value);
        var totalCount = results.Count;

        return new HealthCheckResult
        {
            IsHealthy = healthyCount >= _options.MinHealthyNodes,
            HealthyNodeCount = healthyCount,
            TotalNodeCount = totalCount,
            NodeStatuses = results.ToDictionary(r => r.Key, r => r.Value),
            Timestamp = DateTimeOffset.UtcNow,
        };
    }

    /// <summary>
    /// Gets the current distribution statistics for monitoring.
    /// </summary>
    /// <returns>Node counts and the textual circuit state of every node that has recorded a failure.</returns>
    public DistributionStats GetDistributionStats()
    {
        // Evaluate health first: IsNodeHealthy lazily resets circuits whose cooldown has elapsed,
        // so the state strings below reflect the same view as the counts.
        var totalNodes = _options.Nodes.Count;
        var healthyCount = _options.Nodes.Count(IsNodeHealthy);

        return new DistributionStats
        {
            TotalNodes = totalNodes,
            HealthyNodes = healthyCount,
            UnhealthyNodes = totalNodes - healthyCount,
            VirtualNodesPerNode = _options.VirtualNodeMultiplier,
            CircuitBreakerStates = _circuitStates.ToDictionary(
                kvp => kvp.Key,
                kvp => kvp.Value.Describe(_options.CircuitBreakerThreshold)),
        };
    }

    /// <summary>
    /// Posts the request as JSON to the node's <c>api/v1/verify</c> endpoint and reads the result.
    /// </summary>
    /// <exception cref="HttpRequestException">The node returned a non-success status code.</exception>
    /// <exception cref="InvalidOperationException">The node returned a JSON <c>null</c> body.</exception>
    private async Task<VerificationResult> ExecuteVerificationAsync(
        VerificationNode node,
        VerificationRequest request,
        CancellationToken cancellationToken)
    {
        var endpoint = new Uri(node.Endpoint, "api/v1/verify");

        _logger.LogDebug(
            "Sending verification request {RequestId} to node {NodeId} at {Endpoint}",
            request.RequestId,
            node.Id,
            endpoint);

        using var response = await _httpClient.PostAsJsonAsync(endpoint, request, cancellationToken);
        response.EnsureSuccessStatusCode();

        var result = await response.Content.ReadFromJsonAsync<VerificationResult>(cancellationToken);
        return result ?? throw new InvalidOperationException("Received null response from verification node");
    }

    /// <summary>
    /// Builds the per-node pipeline: an outer timeout wrapping an inner exponential-backoff retry.
    /// </summary>
    private ResiliencePipeline BuildResiliencePipeline()
    {
        return new ResiliencePipelineBuilder()
            .AddTimeout(new TimeoutStrategyOptions
            {
                // Added first, so it is the outermost strategy: the budget covers all retry attempts.
                Timeout = _options.RequestTimeout,
                OnTimeout = args =>
                {
                    _logger.LogWarning("Request timed out after {Timeout}", args.Timeout);
                    return default;
                },
            })
            .AddRetry(new RetryStrategyOptions
            {
                MaxRetryAttempts = _options.MaxRetries,
                Delay = _options.RetryDelay,
                BackoffType = DelayBackoffType.Exponential,
                // NOTE(review): the handled exception types mirror the failover filter in VerifyAsync
                // (transport errors and HttpClient-level timeouts) - confirm against the intended policy.
                ShouldHandle = new PredicateBuilder()
                    .Handle<HttpRequestException>()
                    .Handle<TaskCanceledException>(),
                OnRetry = args =>
                {
                    _logger.LogWarning(
                        args.Outcome.Exception,
                        "Retry attempt {AttemptNumber} after delay {Delay}",
                        args.AttemptNumber,
                        args.RetryDelay);
                    return default;
                },
            })
            .Build();
    }

    /// <summary>
    /// Derives the routing key (uppercase hex SHA-256) from digest algorithm, digest and artifact URI.
    /// </summary>
    private static string ComputeRoutingKey(VerificationRequest request)
    {
        // Create a deterministic routing key based on the content to verify
        // This ensures the same content always routes to the same primary node
        var keyMaterial = $"{request.DigestAlgorithm}:{request.Digest}:{request.ArtifactUri}";
        var hashBytes = SHA256.HashData(Encoding.UTF8.GetBytes(keyMaterial));
        return Convert.ToHexString(hashBytes);
    }

    /// <summary>
    /// Decides whether an exception should count against the node and trigger failover.
    /// </summary>
    private static bool IsNodeFailure(Exception exception, CancellationToken callerToken) => exception switch
    {
        // Cancellation requested by the caller is not the node's fault: let it propagate
        // instead of poisoning every node's circuit on the way out.
        OperationCanceledException when callerToken.IsCancellationRequested => false,

        // TimeoutRejectedException is what the Polly timeout strategy throws when the budget is exceeded.
        HttpRequestException or TaskCanceledException or BrokenCircuitException or TimeoutRejectedException => true,

        _ => false,
    };

    /// <summary>
    /// Returns true when the node has no recorded failures, is below the failure threshold,
    /// or its cooldown has elapsed (in which case its failure tally is reset).
    /// </summary>
    private bool IsNodeHealthy(VerificationNode node)
    {
        if (!_circuitStates.TryGetValue(node.Id, out var state))
        {
            return true; // No circuit breaker state means healthy
        }

        return state.IsHealthy(
            _options.CircuitBreakerThreshold,
            _options.CircuitBreakerCooldown,
            DateTimeOffset.UtcNow);
    }

    /// <summary>
    /// Clears the failure tally of a node after a successful call.
    /// </summary>
    private void MarkNodeHealthy(VerificationNode node)
    {
        if (_circuitStates.TryGetValue(node.Id, out var state))
        {
            state.Reset();
        }
    }

    /// <summary>
    /// Records a failure for the node and logs when the threshold is reached.
    /// </summary>
    private void MarkNodeUnhealthy(VerificationNode node)
    {
        var state = _circuitStates.GetOrAdd(node.Id, _ => new CircuitBreakerState());
        var failureCount = state.RecordFailure(DateTimeOffset.UtcNow);

        if (failureCount >= _options.CircuitBreakerThreshold)
        {
            _logger.LogWarning(
                "Node {NodeId} circuit breaker opened after {FailureCount} failures",
                node.Id,
                failureCount);
        }
    }

    /// <summary>
    /// Failure tally for a single node. All access goes through a private lock because
    /// the provider may be used by several requests at once.
    /// </summary>
    private sealed class CircuitBreakerState
    {
        private readonly object _gate = new();
        private int _failureCount;
        private DateTimeOffset? _lastFailure;

        /// <summary>Increments the failure tally and returns the new count.</summary>
        public int RecordFailure(DateTimeOffset now)
        {
            lock (_gate)
            {
                _failureCount++;
                _lastFailure = now;
                return _failureCount;
            }
        }

        /// <summary>Clears the failure tally.</summary>
        public void Reset()
        {
            lock (_gate)
            {
                _failureCount = 0;
                _lastFailure = null;
            }
        }

        /// <summary>
        /// Returns true when the tally is below <paramref name="threshold"/>, or when more than
        /// <paramref name="cooldown"/> has passed since the last failure (which also resets the tally).
        /// </summary>
        public bool IsHealthy(int threshold, TimeSpan cooldown, DateTimeOffset now)
        {
            lock (_gate)
            {
                // Allow recovery after cooldown period
                if (_lastFailure is { } lastFailure && now - lastFailure > cooldown)
                {
                    _failureCount = 0;
                    _lastFailure = null;
                    return true;
                }

                return _failureCount < threshold;
            }
        }

        /// <summary>
        /// Textual state for monitoring: "Open" at or above the threshold, "HalfOpen" with some
        /// failures recorded, otherwise "Closed".
        /// </summary>
        public string Describe(int threshold)
        {
            lock (_gate)
            {
                return _failureCount >= threshold ? "Open" : _failureCount > 0 ? "HalfOpen" : "Closed";
            }
        }
    }
}
/// <summary>
/// Implements consistent hashing for deterministic node selection.
/// </summary>
/// <remarks>
/// Every physical node is placed on the ring <c>virtualNodeMultiplier</c> times under the keys
/// <c>"{Id}:{i}"</c>. The ring is immutable after construction.
/// </remarks>
internal sealed class ConsistentHashRing
{
    // Parallel arrays, sorted by hash: _sortedNodes[i] owns ring position _sortedHashes[i].
    private readonly int[] _sortedHashes;
    private readonly VerificationNode[] _sortedNodes;

    // Number of distinct node ids actually present on the ring; lets lookups stop early.
    private readonly int _distinctNodeCount;

    /// <summary>
    /// Builds the ring from the given nodes.
    /// </summary>
    /// <param name="nodes">Physical nodes to place on the ring.</param>
    /// <param name="virtualNodeMultiplier">Number of ring positions per physical node.</param>
    /// <exception cref="ArgumentNullException"><paramref name="nodes"/> is null.</exception>
    public ConsistentHashRing(IReadOnlyList<VerificationNode> nodes, int virtualNodeMultiplier)
    {
        ArgumentNullException.ThrowIfNull(nodes);

        var ring = new SortedDictionary<int, VerificationNode>();
        foreach (var node in nodes)
        {
            for (var i = 0; i < virtualNodeMultiplier; i++)
            {
                var virtualKey = $"{node.Id}:{i}";
                var hash = ComputeHash(virtualKey);

                // On a 32-bit hash collision the later node takes the position.
                ring[hash] = node;
            }
        }

        _sortedHashes = [.. ring.Keys];
        _sortedNodes = [.. ring.Values];

        var distinctIds = new HashSet<string>();
        foreach (var node in _sortedNodes)
        {
            distinctIds.Add(node.Id);
        }

        _distinctNodeCount = distinctIds.Count;
    }

    /// <summary>
    /// Gets nodes ordered by proximity to the routing key for failover.
    /// </summary>
    /// <param name="routingKey">Key whose hash selects the starting ring position.</param>
    /// <returns>
    /// Each distinct node exactly once, in the order first encountered when walking the ring
    /// clockwise from the first position whose hash is greater than or equal to the key's hash.
    /// Empty when the ring is empty.
    /// </returns>
    public List<VerificationNode> GetOrderedNodes(string routingKey)
    {
        var keyHash = ComputeHash(routingKey);

        // Binary search for the first node >= hash
        var startIndex = Array.BinarySearch(_sortedHashes, keyHash);
        if (startIndex < 0)
        {
            // Complement is the insertion point; it may equal Length, which the modulo below wraps to 0.
            startIndex = ~startIndex;
        }

        // Collect unique nodes starting from the found position; stop as soon as every
        // physical node has been seen rather than walking all virtual positions.
        var orderedNodes = new List<VerificationNode>(_distinctNodeCount);
        var seen = new HashSet<string>(_distinctNodeCount);
        for (var offset = 0; offset < _sortedHashes.Length && orderedNodes.Count < _distinctNodeCount; offset++)
        {
            var node = _sortedNodes[(startIndex + offset) % _sortedHashes.Length];
            if (seen.Add(node.Id))
            {
                orderedNodes.Add(node);
            }
        }

        return orderedNodes;
    }

    /// <summary>
    /// Maps a key to a ring position: the first four bytes of its SHA-256 digest read as a
    /// little-endian 32-bit integer, so the position is the same on every host architecture.
    /// </summary>
    private static int ComputeHash(string key)
    {
        var hashBytes = SHA256.HashData(Encoding.UTF8.GetBytes(key));
        return System.Buffers.Binary.BinaryPrimitives.ReadInt32LittleEndian(hashBytes);
    }
}
/// <summary>
/// Configuration options for distributed verification.
/// </summary>
public class DistributedVerificationOptions
{
    /// <summary>
    /// List of verification nodes.
    /// </summary>
    public List<VerificationNode> Nodes { get; set; } = [];

    /// <summary>
    /// Minimum number of healthy nodes required.
    /// </summary>
    public int MinHealthyNodes { get; set; } = 1;

    /// <summary>
    /// Number of virtual nodes per physical node for consistent hashing.
    /// </summary>
    public int VirtualNodeMultiplier { get; set; } = 100;

    /// <summary>
    /// Maximum retry attempts per node.
    /// </summary>
    public int MaxRetries { get; set; } = 3;

    /// <summary>
    /// Delay between retries.
    /// </summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromMilliseconds(500);

    /// <summary>
    /// Request timeout per node.
    /// </summary>
    public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Number of consecutive failures before circuit breaker opens.
    /// </summary>
    public int CircuitBreakerThreshold { get; set; } = 3;

    /// <summary>
    /// Time before a tripped circuit breaker allows retry.
    /// </summary>
    public TimeSpan CircuitBreakerCooldown { get; set; } = TimeSpan.FromMinutes(1);
}
/// <summary>
/// Describes one member of the distributed verification cluster.
/// </summary>
public class VerificationNode
{
    /// <summary>
    /// Identifier that distinguishes this node from the others. Must be set at construction.
    /// </summary>
    public required string Id { get; init; }

    /// <summary>
    /// Base URI of the node's API. Must be set at construction.
    /// </summary>
    public required Uri Endpoint { get; init; }

    /// <summary>
    /// Priority of the node; a lower value means a higher priority. Defaults to 100.
    /// </summary>
    public int Priority { get; init; } = 100;

    /// <summary>
    /// Optional region of the node, intended for locality-aware routing.
    /// </summary>
    public string? Region { get; init; }
}
/// <summary>
/// Health check result for the distributed provider.
/// </summary>
public class HealthCheckResult
{
    /// <summary>Overall verdict for the provider.</summary>
    public bool IsHealthy { get; init; }

    /// <summary>Number of nodes reported healthy.</summary>
    public int HealthyNodeCount { get; init; }

    /// <summary>Number of nodes included in the check.</summary>
    public int TotalNodeCount { get; init; }

    /// <summary>Health flag per node, keyed by node id. Empty by default.</summary>
    public Dictionary<string, bool> NodeStatuses { get; init; } = [];

    /// <summary>Time at which the result was produced.</summary>
    public DateTimeOffset Timestamp { get; init; }
}
/// <summary>
/// Distribution statistics for monitoring.
/// </summary>
public class DistributionStats
{
    /// <summary>Number of configured nodes.</summary>
    public int TotalNodes { get; init; }

    /// <summary>Number of nodes currently considered healthy.</summary>
    public int HealthyNodes { get; init; }

    /// <summary>Number of nodes currently considered unhealthy.</summary>
    public int UnhealthyNodes { get; init; }

    /// <summary>Number of virtual nodes placed on the hash ring per physical node.</summary>
    public int VirtualNodesPerNode { get; init; }

    /// <summary>Textual circuit-breaker state per node, keyed by node id. Empty by default.</summary>
    public Dictionary<string, string> CircuitBreakerStates { get; init; } = [];
}