save progress
This commit is contained in:
@@ -1,21 +1,17 @@
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
// StellaOps Attestor — Distributed Verification Provider (Resilient, Multi-Node)
|
||||
// -----------------------------------------------------------------------------
|
||||
// StellaOps Attestor - Distributed Verification Provider (Resilient, Multi-Node)
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
#if STELLAOPS_EXPERIMENTAL_DISTRIBUTED_VERIFY
|
||||
|
||||
using System.Buffers.Binary;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net.Http.Json;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Polly;
|
||||
using Polly.CircuitBreaker;
|
||||
using Polly.Retry;
|
||||
using Polly.Timeout;
|
||||
using StellaOps.Attestor.Verify.Configuration;
|
||||
using StellaOps.Attestor.Verify.Models;
|
||||
|
||||
@@ -32,7 +28,6 @@ public class DistributedVerificationProvider : IVerificationProvider
|
||||
private readonly HttpClient _httpClient;
|
||||
private readonly ConcurrentDictionary<string, CircuitBreakerState> _circuitStates = new();
|
||||
private readonly ConsistentHashRing _hashRing;
|
||||
private readonly ResiliencePipeline<VerificationResult> _resiliencePipeline;
|
||||
|
||||
public DistributedVerificationProvider(
|
||||
ILogger<DistributedVerificationProvider> logger,
|
||||
@@ -49,7 +44,6 @@ public class DistributedVerificationProvider : IVerificationProvider
|
||||
}
|
||||
|
||||
_hashRing = new ConsistentHashRing(_options.Nodes, _options.VirtualNodeMultiplier);
|
||||
_resiliencePipeline = BuildResiliencePipeline();
|
||||
|
||||
_logger.LogInformation("Initialized distributed verification provider with {NodeCount} nodes", _options.Nodes.Count);
|
||||
}
|
||||
@@ -83,9 +77,7 @@ public class DistributedVerificationProvider : IVerificationProvider
|
||||
|
||||
try
|
||||
{
|
||||
var result = await _resiliencePipeline.ExecuteAsync(
|
||||
async ct => await ExecuteVerificationAsync(node, request, ct),
|
||||
cancellationToken);
|
||||
var result = await ExecuteWithRetriesAsync(node, request, cancellationToken);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Verification request {RequestId} completed on node {NodeId} with result {Status}",
|
||||
@@ -196,37 +188,36 @@ public class DistributedVerificationProvider : IVerificationProvider
|
||||
return result ?? throw new InvalidOperationException("Received null response from verification node");
|
||||
}
|
||||
|
||||
private ResiliencePipeline<VerificationResult> BuildResiliencePipeline()
|
||||
private async Task<VerificationResult> ExecuteWithRetriesAsync(
|
||||
VerificationNode node,
|
||||
VerificationRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return new ResiliencePipelineBuilder<VerificationResult>()
|
||||
.AddTimeout(new TimeoutStrategyOptions
|
||||
Exception? lastError = null;
|
||||
|
||||
for (var attempt = 0; attempt <= _options.MaxRetries; attempt++)
|
||||
{
|
||||
using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
attemptCts.CancelAfter(_options.RequestTimeout);
|
||||
|
||||
try
|
||||
{
|
||||
Timeout = _options.RequestTimeout,
|
||||
OnTimeout = args =>
|
||||
{
|
||||
_logger.LogWarning("Request timed out after {Timeout}", args.Timeout);
|
||||
return default;
|
||||
},
|
||||
})
|
||||
.AddRetry(new RetryStrategyOptions<VerificationResult>
|
||||
return await ExecuteVerificationAsync(node, request, attemptCts.Token);
|
||||
}
|
||||
catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
|
||||
{
|
||||
MaxRetryAttempts = _options.MaxRetries,
|
||||
Delay = _options.RetryDelay,
|
||||
BackoffType = DelayBackoffType.Exponential,
|
||||
ShouldHandle = new PredicateBuilder<VerificationResult>()
|
||||
.Handle<HttpRequestException>()
|
||||
.Handle<TaskCanceledException>(),
|
||||
OnRetry = args =>
|
||||
lastError = ex;
|
||||
if (attempt >= _options.MaxRetries)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
args.Outcome.Exception,
|
||||
"Retry attempt {AttemptNumber} after delay {Delay}",
|
||||
args.AttemptNumber,
|
||||
args.RetryDelay);
|
||||
return default;
|
||||
},
|
||||
})
|
||||
.Build();
|
||||
break;
|
||||
}
|
||||
|
||||
_logger.LogWarning(ex, "Retry attempt {AttemptNumber} after delay {Delay}", attempt + 1, _options.RetryDelay);
|
||||
await Task.Delay(_options.RetryDelay, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError ?? new InvalidOperationException("Verification retry failed.");
|
||||
}
|
||||
|
||||
private static string ComputeRoutingKey(VerificationRequest request)
|
||||
@@ -342,7 +333,7 @@ internal sealed class ConsistentHashRing
|
||||
private static int ComputeHash(string key)
|
||||
{
|
||||
var hashBytes = SHA256.HashData(Encoding.UTF8.GetBytes(key));
|
||||
return BitConverter.ToInt32(hashBytes, 0);
|
||||
return BinaryPrimitives.ReadInt32BigEndian(hashBytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user