up
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-11-27 23:44:42 +02:00
parent ef6e4b2067
commit 3b96b2e3ea
298 changed files with 47516 additions and 1168 deletions

View File

@@ -0,0 +1,44 @@
using System;
namespace StellaOps.Excititor.Worker.Options;
/// <summary>
/// Settings that control how the Excititor worker integrates with the orchestrator worker SDK
/// (heartbeats, artifact tracking and tenant defaults).
/// </summary>
public sealed class VexWorkerOrchestratorOptions
{
    /// <summary>
    /// Master switch for the orchestrator integration. Defaults to <c>true</c>.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Requested delay between two heartbeats while a job is running. Defaults to 30 seconds.
    /// </summary>
    public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Lower bound (safety floor) for the heartbeat interval. Defaults to 5 seconds.
    /// </summary>
    public TimeSpan MinHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Upper bound (safety cap) for the heartbeat interval. Defaults to 2 minutes.
    /// </summary>
    public TimeSpan MaxHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(120);

    /// <summary>
    /// When <c>true</c>, heartbeat/artifact events are logged verbosely. Defaults to <c>false</c>.
    /// </summary>
    public bool EnableVerboseLogging { get; set; } = false;

    /// <summary>
    /// Maximum number of artifact hashes to retain in state. Defaults to 1000.
    /// </summary>
    public int MaxArtifactHashes { get; set; } = 1000;

    /// <summary>
    /// Tenant used for worker jobs when none is specified. Defaults to <c>"default"</c>.
    /// </summary>
    public string DefaultTenant { get; set; } = "default";
}

View File

@@ -0,0 +1,152 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Excititor.Core.Orchestration;
using StellaOps.Excititor.Worker.Options;
namespace StellaOps.Excititor.Worker.Orchestration;
/// <summary>
/// Emits periodic heartbeats through <see cref="IVexWorkerOrchestratorClient"/> while a job is executing.
/// </summary>
/// <remarks>
/// The loop is best-effort: a heartbeat that fails to send is logged and the loop keeps going,
/// and any unexpected error that ends the loop is logged rather than propagated to the caller.
/// </remarks>
internal sealed class VexWorkerHeartbeatService
{
    /// <summary>
    /// Interval used when the configured values resolve to a period the timer cannot accept
    /// (zero, negative or sub-millisecond). Matches the default of
    /// <see cref="VexWorkerOrchestratorOptions.HeartbeatInterval"/>.
    /// </summary>
    private static readonly TimeSpan FallbackInterval = TimeSpan.FromSeconds(30);

    /// <summary>Smallest period accepted by <see cref="PeriodicTimer"/>.</summary>
    private static readonly TimeSpan MinimumTimerPeriod = TimeSpan.FromMilliseconds(1);

    private readonly IVexWorkerOrchestratorClient _orchestratorClient;
    private readonly IOptions<VexWorkerOrchestratorOptions> _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<VexWorkerHeartbeatService> _logger;

    /// <summary>
    /// Creates the heartbeat service.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public VexWorkerHeartbeatService(
        IVexWorkerOrchestratorClient orchestratorClient,
        IOptions<VexWorkerOrchestratorOptions> options,
        TimeProvider timeProvider,
        ILogger<VexWorkerHeartbeatService> logger)
    {
        _orchestratorClient = orchestratorClient ?? throw new ArgumentNullException(nameof(orchestratorClient));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Runs the heartbeat loop for the given job context until <paramref name="cancellationToken"/> is cancelled.
    /// Call this in a background task while the job is running.
    /// </summary>
    /// <param name="context">Job the heartbeats belong to.</param>
    /// <param name="statusProvider">Returns the status to report; invoked once per heartbeat.</param>
    /// <param name="progressProvider">Returns the progress to report; a <c>null</c> delegate reports no progress.</param>
    /// <param name="lastArtifactHashProvider">Returns the most recent artifact hash; a <c>null</c> delegate reports none.</param>
    /// <param name="lastArtifactKindProvider">Returns the most recent artifact kind; a <c>null</c> delegate reports none.</param>
    /// <param name="cancellationToken">Cancelling this token stops the loop.</param>
    /// <returns>A task that completes when the loop has stopped. Cancellation and loop errors are logged, not thrown.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> or <paramref name="statusProvider"/> is <c>null</c>.</exception>
    public async Task RunAsync(
        VexWorkerJobContext context,
        Func<VexWorkerHeartbeatStatus> statusProvider,
        Func<int?> progressProvider,
        Func<string?> lastArtifactHashProvider,
        Func<string?> lastArtifactKindProvider,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(statusProvider);

        if (!_options.Value.Enabled)
        {
            _logger.LogDebug("Orchestrator heartbeat service disabled; skipping heartbeat loop.");
            return;
        }

        var interval = ComputeInterval();
        _logger.LogDebug(
            "Starting heartbeat loop for job {RunId} with interval {Interval}",
            context.RunId,
            interval);

        // Hand control back to the caller before doing any work so the job itself can start immediately.
        await Task.Yield();

        try
        {
            // Drive the timer from the injected TimeProvider so the loop follows a substituted clock.
            using var timer = new PeriodicTimer(interval, _timeProvider);

            // Send initial heartbeat
            await EmitAsync().ConfigureAwait(false);

            while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false))
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                await EmitAsync().ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            _logger.LogDebug("Heartbeat loop cancelled for job {RunId}", context.RunId);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(
                ex,
                "Heartbeat loop error for job {RunId}: {Message}",
                context.RunId,
                ex.Message);
        }

        // Samples the providers at send time so every heartbeat carries the current values.
        Task EmitAsync() => SendHeartbeatAsync(
            context,
            statusProvider(),
            progressProvider?.Invoke(),
            lastArtifactHashProvider?.Invoke(),
            lastArtifactKindProvider?.Invoke(),
            cancellationToken);
    }

    /// <summary>
    /// Builds a heartbeat from the supplied values and sends it through the orchestrator client.
    /// Send failures are logged as warnings and swallowed; cancellation requested through
    /// <paramref name="cancellationToken"/> is allowed to propagate so the loop can stop quietly.
    /// </summary>
    private async Task SendHeartbeatAsync(
        VexWorkerJobContext context,
        VexWorkerHeartbeatStatus status,
        int? progress,
        string? lastArtifactHash,
        string? lastArtifactKind,
        CancellationToken cancellationToken)
    {
        try
        {
            var heartbeat = new VexWorkerHeartbeat(
                status,
                progress,
                QueueDepth: null,
                lastArtifactHash,
                lastArtifactKind,
                ErrorCode: null,
                RetryAfterSeconds: null);

            await _orchestratorClient.SendHeartbeatAsync(context, heartbeat, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            // A cooperative shutdown is not a send failure; only genuine errors are reported here.
            _logger.LogWarning(
                ex,
                "Failed to send heartbeat for job {RunId}: {Message}",
                context.RunId,
                ex.Message);
        }
    }

    /// <summary>
    /// Resolves the effective heartbeat period: the configured interval raised to the configured
    /// minimum or lowered to the configured maximum, with a fallback when the result is not a
    /// period <see cref="PeriodicTimer"/> can accept.
    /// </summary>
    private TimeSpan ComputeInterval()
    {
        var opts = _options.Value;
        var interval = opts.HeartbeatInterval;

        if (interval < opts.MinHeartbeatInterval)
        {
            interval = opts.MinHeartbeatInterval;
        }
        else if (interval > opts.MaxHeartbeatInterval)
        {
            interval = opts.MaxHeartbeatInterval;
        }

        if (interval < MinimumTimerPeriod)
        {
            // Misconfiguration (e.g. all three values set to zero) would otherwise make the timer
            // constructor throw and silently disable heartbeats for the whole run.
            _logger.LogWarning(
                "Configured heartbeat interval {Interval} is not usable; falling back to {FallbackInterval}.",
                interval,
                FallbackInterval);
            interval = FallbackInterval;
        }

        return interval;
    }
}

View File

@@ -0,0 +1,328 @@
using System;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Excititor.Core;
using StellaOps.Excititor.Core.Orchestration;
using StellaOps.Excititor.Storage.Mongo;
using StellaOps.Excititor.Worker.Options;
namespace StellaOps.Excititor.Worker.Orchestration;
/// <summary>
/// Default implementation of <see cref="IVexWorkerOrchestratorClient"/>.
/// Records heartbeats, artifacts, checkpoints and job outcomes in the connector state repository
/// and in the log. Command polling and acknowledgement are no-ops in this implementation.
/// </summary>
/// <remarks>
/// NOTE(review): every state mutation here is a read-modify-write against the repository with no
/// synchronization. A heartbeat that overlaps <see cref="RecordArtifactAsync"/> for the same
/// connector could overwrite the other's change - confirm whether the repository or callers guard against this.
/// </remarks>
internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
{
    private const int MaxFailureReasonLength = 512;

    private readonly IVexConnectorStateRepository _stateRepository;
    private readonly TimeProvider _timeProvider;
    private readonly IOptions<VexWorkerOrchestratorOptions> _options;
    private readonly ILogger<VexWorkerOrchestratorClient> _logger;

    /// <summary>
    /// Creates the client.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public VexWorkerOrchestratorClient(
        IVexConnectorStateRepository stateRepository,
        TimeProvider timeProvider,
        IOptions<VexWorkerOrchestratorOptions> options,
        ILogger<VexWorkerOrchestratorClient> logger)
    {
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Creates a job context with a fresh run id and the current time as start time, and logs the start.
    /// Nothing is persisted by this call.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="connectorId"/> is null, empty or whitespace.</exception>
    public ValueTask<VexWorkerJobContext> StartJobAsync(
        string tenant,
        string connectorId,
        string? checkpoint,
        CancellationToken cancellationToken = default)
    {
        // The connector id keys all persisted state; reject blanks the same way LoadCheckpointAsync does.
        ArgumentException.ThrowIfNullOrWhiteSpace(connectorId);

        var runId = Guid.NewGuid();
        var startedAt = _timeProvider.GetUtcNow();
        var context = new VexWorkerJobContext(tenant, connectorId, runId, checkpoint, startedAt);

        _logger.LogInformation(
            "Orchestrator job started: tenant={Tenant} connector={ConnectorId} runId={RunId} checkpoint={Checkpoint}",
            tenant,
            connectorId,
            runId,
            checkpoint ?? "(none)");

        return ValueTask.FromResult(context);
    }

    /// <summary>
    /// Stores the heartbeat time and status in the connector state and, when verbose logging is
    /// enabled, logs the heartbeat details.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> or <paramref name="heartbeat"/> is <c>null</c>.</exception>
    public async ValueTask SendHeartbeatAsync(
        VexWorkerJobContext context,
        VexWorkerHeartbeat heartbeat,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(heartbeat);

        // The sequence is advanced for every heartbeat, whether or not it is logged below.
        var sequence = context.NextSequence();
        var timestamp = _timeProvider.GetUtcNow();

        var state = await LoadStateOrDefaultAsync(context.ConnectorId, cancellationToken).ConfigureAwait(false);
        var updated = state with
        {
            LastHeartbeatAt = timestamp,
            LastHeartbeatStatus = heartbeat.Status.ToString()
        };
        await _stateRepository.SaveAsync(updated, cancellationToken).ConfigureAwait(false);

        if (_options.Value.EnableVerboseLogging)
        {
            _logger.LogDebug(
                "Orchestrator heartbeat: runId={RunId} seq={Sequence} status={Status} progress={Progress} artifact={ArtifactHash}",
                context.RunId,
                sequence,
                heartbeat.Status,
                heartbeat.Progress,
                heartbeat.LastArtifactHash);
        }
    }

    /// <summary>
    /// Adds the artifact hash to the connector's tracked digests (oldest entries are dropped once
    /// <see cref="VexWorkerOrchestratorOptions.MaxArtifactHashes"/> is reached) and records it as the last artifact.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> or <paramref name="artifact"/> is <c>null</c>.</exception>
    public async ValueTask RecordArtifactAsync(
        VexWorkerJobContext context,
        VexWorkerArtifact artifact,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(artifact);

        var state = await LoadStateOrDefaultAsync(context.ConnectorId, cancellationToken).ConfigureAwait(false);
        var digests = state.DocumentDigests.IsDefault
            ? ImmutableArray<string>.Empty
            : state.DocumentDigests;

        if (!digests.Contains(artifact.Hash))
        {
            // Honour the configured cap; values below 1 are floored so the newest hash is always kept.
            var cap = Math.Max(1, _options.Value.MaxArtifactHashes);

            // Make room for the new entry, also trimming lists that are already over the cap.
            var overflow = digests.Length - cap + 1;
            if (overflow > 0)
            {
                digests = digests.RemoveRange(0, overflow);
            }

            digests = digests.Add(artifact.Hash);
        }

        var updated = state with
        {
            DocumentDigests = digests,
            LastArtifactHash = artifact.Hash,
            LastArtifactKind = artifact.Kind
        };
        await _stateRepository.SaveAsync(updated, cancellationToken).ConfigureAwait(false);

        _logger.LogDebug(
            "Orchestrator artifact recorded: runId={RunId} hash={Hash} kind={Kind} provider={Provider}",
            context.RunId,
            artifact.Hash,
            artifact.Kind,
            artifact.ProviderId);
    }

    /// <summary>
    /// Marks the connector state as succeeded at <c>result.CompletedAt</c>, clears failure
    /// tracking, and logs the completion with its duration.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> or <paramref name="result"/> is <c>null</c>.</exception>
    public async ValueTask CompleteJobAsync(
        VexWorkerJobContext context,
        VexWorkerJobResult result,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(result);

        var state = await LoadStateOrDefaultAsync(context.ConnectorId, cancellationToken).ConfigureAwait(false);
        var updated = state with
        {
            LastUpdated = result.CompletedAt,
            LastSuccessAt = result.CompletedAt,
            LastHeartbeatAt = result.CompletedAt,
            LastHeartbeatStatus = VexWorkerHeartbeatStatus.Succeeded.ToString(),
            LastArtifactHash = result.LastArtifactHash,
            LastCheckpoint = result.LastCheckpoint,
            FailureCount = 0,
            NextEligibleRun = null,
            LastFailureReason = null
        };
        await _stateRepository.SaveAsync(updated, cancellationToken).ConfigureAwait(false);

        var duration = result.CompletedAt - context.StartedAt;
        _logger.LogInformation(
            "Orchestrator job completed: runId={RunId} connector={ConnectorId} documents={Documents} claims={Claims} duration={Duration}",
            context.RunId,
            context.ConnectorId,
            result.DocumentsProcessed,
            result.ClaimsGenerated,
            duration);
    }

    /// <summary>
    /// Marks the connector state as failed: increments the failure count, stores a truncated
    /// failure reason and, when <paramref name="retryAfterSeconds"/> is supplied, the next eligible run time.
    /// </summary>
    /// <param name="context">Job that failed.</param>
    /// <param name="errorCode">Short error code; stored as the leading part of the failure reason.</param>
    /// <param name="errorMessage">Optional detail appended to the failure reason.</param>
    /// <param name="retryAfterSeconds">Delay before the next run; negative values are treated as zero, <c>null</c> clears the next eligible run.</param>
    /// <param name="cancellationToken">Token for the repository calls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is <c>null</c>.</exception>
    public async ValueTask FailJobAsync(
        VexWorkerJobContext context,
        string errorCode,
        string? errorMessage,
        int? retryAfterSeconds,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);

        var now = _timeProvider.GetUtcNow();
        var state = await LoadStateOrDefaultAsync(context.ConnectorId, cancellationToken).ConfigureAwait(false);

        // A negative delay would schedule the next run in the past; clamp it to "immediately".
        DateTimeOffset? nextEligible = retryAfterSeconds is { } seconds
            ? now.AddSeconds(Math.Max(0, seconds))
            : null;

        // Avoid a dangling "code: " when no message accompanies the error code.
        var reason = string.IsNullOrWhiteSpace(errorMessage)
            ? errorCode
            : $"{errorCode}: {errorMessage}";

        var updated = state with
        {
            LastHeartbeatAt = now,
            LastHeartbeatStatus = VexWorkerHeartbeatStatus.Failed.ToString(),
            FailureCount = state.FailureCount + 1,
            NextEligibleRun = nextEligible,
            LastFailureReason = Truncate(reason, MaxFailureReasonLength)
        };
        await _stateRepository.SaveAsync(updated, cancellationToken).ConfigureAwait(false);

        _logger.LogWarning(
            "Orchestrator job failed: runId={RunId} connector={ConnectorId} error={ErrorCode} retryAfter={RetryAfter}s",
            context.RunId,
            context.ConnectorId,
            errorCode,
            retryAfterSeconds);
    }

    /// <summary>
    /// Logs the classified error and forwards to the string-based overload. The retry delay is
    /// forwarded only when the error is marked retryable.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> or <paramref name="error"/> is <c>null</c>.</exception>
    public ValueTask FailJobAsync(
        VexWorkerJobContext context,
        VexWorkerError error,
        CancellationToken cancellationToken = default)
    {
        // Validate before the log call below dereferences the context.
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(error);

        _logger.LogDebug(
            "Orchestrator job failed with classified error: runId={RunId} code={Code} category={Category} retryable={Retryable}",
            context.RunId,
            error.Code,
            error.Category,
            error.Retryable);

        return FailJobAsync(
            context,
            error.Code,
            error.Message,
            error.Retryable ? error.RetryAfterSeconds : null,
            cancellationToken);
    }

    /// <summary>
    /// Always returns <c>null</c>: this implementation has no external command source.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is <c>null</c>.</exception>
    public ValueTask<VexWorkerCommand?> GetPendingCommandAsync(
        VexWorkerJobContext context,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);

        // In this local implementation, commands are not externally sourced, so there is never
        // a pending command. A full orchestrator integration would poll a command queue here.
        return ValueTask.FromResult<VexWorkerCommand?>(null);
    }

    /// <summary>
    /// Logs the acknowledgement; nothing else happens in this implementation.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is <c>null</c>.</exception>
    public ValueTask AcknowledgeCommandAsync(
        VexWorkerJobContext context,
        long commandSequence,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);

        _logger.LogDebug(
            "Orchestrator command acknowledged: runId={RunId} sequence={Sequence}",
            context.RunId,
            commandSequence);

        // In local mode, acknowledgment is a no-op
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Replaces the connector's stored cursor, processed digests and resume tokens with the
    /// values from <paramref name="checkpoint"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> or <paramref name="checkpoint"/> is <c>null</c>.</exception>
    public async ValueTask SaveCheckpointAsync(
        VexWorkerJobContext context,
        VexWorkerCheckpoint checkpoint,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(checkpoint);

        var now = _timeProvider.GetUtcNow();
        var state = await LoadStateOrDefaultAsync(context.ConnectorId, cancellationToken).ConfigureAwait(false);

        // Normalize once so that both the stored value and the log call below are safe for a
        // default (uninitialized) array, whose Length cannot be read.
        var digests = checkpoint.ProcessedDigests.IsDefault
            ? ImmutableArray<string>.Empty
            : checkpoint.ProcessedDigests;
        var resumeTokens = checkpoint.ResumeTokens is { IsEmpty: false } tokens
            ? tokens
            : ImmutableDictionary<string, string>.Empty;

        var updated = state with
        {
            LastCheckpoint = checkpoint.Cursor,
            LastUpdated = checkpoint.LastProcessedAt ?? now,
            DocumentDigests = digests,
            ResumeTokens = resumeTokens
        };
        await _stateRepository.SaveAsync(updated, cancellationToken).ConfigureAwait(false);

        _logger.LogDebug(
            "Orchestrator checkpoint saved: runId={RunId} connector={ConnectorId} cursor={Cursor} digests={DigestCount}",
            context.RunId,
            context.ConnectorId,
            checkpoint.Cursor ?? "(none)",
            digests.Length);
    }

    /// <summary>
    /// Rebuilds a checkpoint from the stored connector state.
    /// </summary>
    /// <returns>The checkpoint, or <c>null</c> when no state exists for <paramref name="connectorId"/>.</returns>
    /// <exception cref="ArgumentException"><paramref name="connectorId"/> is null, empty or whitespace.</exception>
    public async ValueTask<VexWorkerCheckpoint?> LoadCheckpointAsync(
        string connectorId,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(connectorId);

        var state = await _stateRepository.GetAsync(connectorId, cancellationToken).ConfigureAwait(false);
        if (state is null)
        {
            return null;
        }

        return new VexWorkerCheckpoint(
            connectorId,
            state.LastCheckpoint,
            state.LastUpdated,
            state.DocumentDigests.IsDefault ? ImmutableArray<string>.Empty : state.DocumentDigests,
            state.ResumeTokens is { IsEmpty: false } tokens ? tokens : ImmutableDictionary<string, string>.Empty);
    }

    /// <summary>
    /// Loads the stored state for a connector, or creates an empty one when none exists yet.
    /// </summary>
    private async ValueTask<VexConnectorState> LoadStateOrDefaultAsync(
        string connectorId,
        CancellationToken cancellationToken)
    {
        return await _stateRepository.GetAsync(connectorId, cancellationToken).ConfigureAwait(false)
            ?? new VexConnectorState(connectorId, null, ImmutableArray<string>.Empty);
    }

    /// <summary>
    /// Shortens <paramref name="value"/> to at most <paramref name="maxLength"/> UTF-16 code units
    /// without cutting a surrogate pair in half. Null or empty input yields an empty string.
    /// </summary>
    private static string Truncate(string? value, int maxLength)
    {
        if (string.IsNullOrEmpty(value) || maxLength <= 0)
        {
            return string.Empty;
        }

        if (value.Length <= maxLength)
        {
            return value;
        }

        // Cutting right after a high surrogate would leave an unpaired code unit behind.
        var end = char.IsHighSurrogate(value[maxLength - 1]) ? maxLength - 1 : maxLength;
        return value[..end];
    }
}

View File

@@ -1,25 +1,27 @@
using System.IO;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.IO;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Plugin;
using StellaOps.Excititor.Connectors.RedHat.CSAF.DependencyInjection;
using StellaOps.Excititor.Core;
using StellaOps.Excititor.Core.Aoc;
using StellaOps.Excititor.Core.Orchestration;
using StellaOps.Excititor.Formats.CSAF;
using StellaOps.Excititor.Formats.CycloneDX;
using StellaOps.Excititor.Formats.OpenVEX;
using StellaOps.Excititor.Storage.Mongo;
using StellaOps.Excititor.Worker.Auth;
using StellaOps.Excititor.Worker.Options;
using StellaOps.Excititor.Worker.Orchestration;
using StellaOps.Excititor.Worker.Scheduling;
using StellaOps.Excititor.Worker.Signature;
using StellaOps.Excititor.Attestation.Extensions;
using StellaOps.Excititor.Attestation.Verification;
using StellaOps.IssuerDirectory.Client;
var builder = Host.CreateApplicationBuilder(args);
var services = builder.Services;
var configuration = builder.Configuration;
@@ -40,11 +42,11 @@ services.PostConfigure<VexWorkerOptions>(options =>
}
});
services.AddRedHatCsafConnector();
services.AddOptions<VexMongoStorageOptions>()
.Bind(configuration.GetSection("Excititor:Storage:Mongo"))
.ValidateOnStart();
services.AddOptions<VexMongoStorageOptions>()
.Bind(configuration.GetSection("Excititor:Storage:Mongo"))
.ValidateOnStart();
services.AddExcititorMongoStorage();
services.AddCsafNormalizer();
services.AddCycloneDxNormalizer();
@@ -71,38 +73,45 @@ services.PostConfigure<VexAttestationVerificationOptions>(options =>
}
});
services.AddExcititorAocGuards();
services.AddSingleton<IValidateOptions<VexWorkerOptions>, VexWorkerOptionsValidator>();
services.AddSingleton(TimeProvider.System);
services.PostConfigure<VexWorkerOptions>(options =>
{
if (!options.Providers.Any(provider => string.Equals(provider.ProviderId, "excititor:redhat", StringComparison.OrdinalIgnoreCase)))
{
options.Providers.Add(new VexWorkerProviderOptions
{
ProviderId = "excititor:redhat",
});
}
});
services.AddSingleton<IValidateOptions<VexWorkerOptions>, VexWorkerOptionsValidator>();
services.AddSingleton(TimeProvider.System);
services.PostConfigure<VexWorkerOptions>(options =>
{
if (!options.Providers.Any(provider => string.Equals(provider.ProviderId, "excititor:redhat", StringComparison.OrdinalIgnoreCase)))
{
options.Providers.Add(new VexWorkerProviderOptions
{
ProviderId = "excititor:redhat",
});
}
});
services.AddSingleton<PluginCatalog>(provider =>
{
var pluginOptions = provider.GetRequiredService<IOptions<VexWorkerPluginOptions>>().Value;
var catalog = new PluginCatalog();
var directory = pluginOptions.ResolveDirectory();
if (Directory.Exists(directory))
{
catalog.AddFromDirectory(directory, pluginOptions.ResolveSearchPattern());
}
else
{
var logger = provider.GetRequiredService<ILogger<Program>>();
logger.LogWarning("Excititor worker plugin directory '{Directory}' does not exist; proceeding without external connectors.", directory);
}
return catalog;
var directory = pluginOptions.ResolveDirectory();
if (Directory.Exists(directory))
{
catalog.AddFromDirectory(directory, pluginOptions.ResolveSearchPattern());
}
else
{
var logger = provider.GetRequiredService<ILogger<Program>>();
logger.LogWarning("Excititor worker plugin directory '{Directory}' does not exist; proceeding without external connectors.", directory);
}
return catalog;
});
// Orchestrator worker SDK integration
services.AddOptions<VexWorkerOrchestratorOptions>()
.Bind(configuration.GetSection("Excititor:Worker:Orchestrator"))
.ValidateOnStart();
services.AddSingleton<IVexWorkerOrchestratorClient, VexWorkerOrchestratorClient>();
services.AddSingleton<VexWorkerHeartbeatService>();
services.AddSingleton<IVexProviderRunner, DefaultVexProviderRunner>();
services.AddHostedService<VexWorkerHostedService>();
if (!workerConfigSnapshot.DisableConsensus)
@@ -115,5 +124,5 @@ services.AddSingleton<ITenantAuthorityClientFactory, TenantAuthorityClientFactor
var host = builder.Build();
await host.RunAsync();
public partial class Program;
public partial class Program;

View File

@@ -9,8 +9,10 @@ using MongoDB.Driver;
using StellaOps.Plugin;
using StellaOps.Excititor.Connectors.Abstractions;
using StellaOps.Excititor.Core;
using StellaOps.Excititor.Core.Orchestration;
using StellaOps.Excititor.Storage.Mongo;
using StellaOps.Excititor.Worker.Options;
using StellaOps.Excititor.Worker.Orchestration;
using StellaOps.Excititor.Worker.Signature;
namespace StellaOps.Excititor.Worker.Scheduling;
@@ -19,19 +21,27 @@ internal sealed class DefaultVexProviderRunner : IVexProviderRunner
{
private readonly IServiceProvider _serviceProvider;
private readonly PluginCatalog _pluginCatalog;
private readonly IVexWorkerOrchestratorClient _orchestratorClient;
private readonly VexWorkerHeartbeatService _heartbeatService;
private readonly ILogger<DefaultVexProviderRunner> _logger;
private readonly TimeProvider _timeProvider;
private readonly VexWorkerRetryOptions _retryOptions;
private readonly VexWorkerOrchestratorOptions _orchestratorOptions;
public DefaultVexProviderRunner(
IServiceProvider serviceProvider,
PluginCatalog pluginCatalog,
IVexWorkerOrchestratorClient orchestratorClient,
VexWorkerHeartbeatService heartbeatService,
ILogger<DefaultVexProviderRunner> logger,
TimeProvider timeProvider,
IOptions<VexWorkerOptions> workerOptions)
IOptions<VexWorkerOptions> workerOptions,
IOptions<VexWorkerOrchestratorOptions> orchestratorOptions)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_pluginCatalog = pluginCatalog ?? throw new ArgumentNullException(nameof(pluginCatalog));
_orchestratorClient = orchestratorClient ?? throw new ArgumentNullException(nameof(orchestratorClient));
_heartbeatService = heartbeatService ?? throw new ArgumentNullException(nameof(heartbeatService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
if (workerOptions is null)
@@ -40,6 +50,7 @@ internal sealed class DefaultVexProviderRunner : IVexProviderRunner
}
_retryOptions = workerOptions.Value?.Retry ?? throw new InvalidOperationException("VexWorkerOptions.Retry must be configured.");
_orchestratorOptions = orchestratorOptions?.Value ?? new VexWorkerOrchestratorOptions();
}
public async ValueTask RunAsync(VexWorkerSchedule schedule, CancellationToken cancellationToken)
@@ -118,7 +129,7 @@ internal sealed class DefaultVexProviderRunner : IVexProviderRunner
var verifyingSink = new VerifyingVexRawDocumentSink(rawStore, signatureVerifier);
var context = new VexConnectorContext(
var connectorContext = new VexConnectorContext(
Since: stateBeforeRun?.LastUpdated,
Settings: effectiveSettings,
RawSink: verifyingSink,
@@ -127,33 +138,128 @@ internal sealed class DefaultVexProviderRunner : IVexProviderRunner
Services: scopeProvider,
ResumeTokens: stateBeforeRun?.ResumeTokens ?? ImmutableDictionary<string, string>.Empty);
// Start orchestrator job for heartbeat/progress tracking
var jobContext = await _orchestratorClient.StartJobAsync(
_orchestratorOptions.DefaultTenant,
connector.Id,
stateBeforeRun?.LastCheckpoint,
cancellationToken).ConfigureAwait(false);
var documentCount = 0;
string? lastArtifactHash = null;
string? lastArtifactKind = null;
var currentStatus = VexWorkerHeartbeatStatus.Running;
// Start heartbeat loop in background
using var heartbeatCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var heartbeatTask = _heartbeatService.RunAsync(
jobContext,
() => currentStatus,
() => null, // Progress not tracked at document level
() => lastArtifactHash,
() => lastArtifactKind,
heartbeatCts.Token);
try
{
await foreach (var document in connector.FetchAsync(context, cancellationToken).ConfigureAwait(false))
await foreach (var document in connector.FetchAsync(connectorContext, cancellationToken).ConfigureAwait(false))
{
documentCount++;
lastArtifactHash = document.Digest;
lastArtifactKind = "vex-raw-document";
// Record artifact for determinism tracking
if (_orchestratorOptions.Enabled)
{
var artifact = new VexWorkerArtifact(
document.Digest,
"vex-raw-document",
connector.Id,
document.Digest,
_timeProvider.GetUtcNow());
await _orchestratorClient.RecordArtifactAsync(jobContext, artifact, cancellationToken).ConfigureAwait(false);
}
}
// Stop heartbeat loop
currentStatus = VexWorkerHeartbeatStatus.Succeeded;
await heartbeatCts.CancelAsync().ConfigureAwait(false);
await SafeWaitForTaskAsync(heartbeatTask).ConfigureAwait(false);
_logger.LogInformation(
"Connector {ConnectorId} persisted {DocumentCount} raw document(s) this run.",
connector.Id,
documentCount);
await UpdateSuccessStateAsync(stateRepository, descriptor.Id, _timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false);
// Complete orchestrator job
var completedAt = _timeProvider.GetUtcNow();
var result = new VexWorkerJobResult(
documentCount,
ClaimsGenerated: 0, // Claims generated in separate normalization pass
lastArtifactHash,
lastArtifactHash,
completedAt);
await _orchestratorClient.CompleteJobAsync(jobContext, result, cancellationToken).ConfigureAwait(false);
await UpdateSuccessStateAsync(stateRepository, descriptor.Id, completedAt, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
currentStatus = VexWorkerHeartbeatStatus.Failed;
await heartbeatCts.CancelAsync().ConfigureAwait(false);
await SafeWaitForTaskAsync(heartbeatTask).ConfigureAwait(false);
var error = VexWorkerError.Cancelled("Operation cancelled by host");
await _orchestratorClient.FailJobAsync(jobContext, error, CancellationToken.None).ConfigureAwait(false);
throw;
}
catch (Exception ex)
{
await UpdateFailureStateAsync(stateRepository, descriptor.Id, _timeProvider.GetUtcNow(), ex, cancellationToken).ConfigureAwait(false);
currentStatus = VexWorkerHeartbeatStatus.Failed;
await heartbeatCts.CancelAsync().ConfigureAwait(false);
await SafeWaitForTaskAsync(heartbeatTask).ConfigureAwait(false);
// Classify the error for appropriate retry handling
var classifiedError = VexWorkerError.FromException(ex, stage: "fetch");
// Apply backoff delay for retryable errors
var retryDelay = classifiedError.Retryable
? (int)CalculateDelayWithJitter(1).TotalSeconds
: (int?)null;
var errorWithRetry = classifiedError.Retryable && retryDelay.HasValue
? new VexWorkerError(
classifiedError.Code,
classifiedError.Category,
classifiedError.Message,
classifiedError.Retryable,
retryDelay,
classifiedError.Stage,
classifiedError.Details)
: classifiedError;
await _orchestratorClient.FailJobAsync(jobContext, errorWithRetry, CancellationToken.None).ConfigureAwait(false);
await UpdateFailureStateAsync(stateRepository, descriptor.Id, _timeProvider.GetUtcNow(), ex, classifiedError.Retryable, cancellationToken).ConfigureAwait(false);
throw;
}
}
/// <summary>
/// Awaits <paramref name="task"/> and ignores an <see cref="OperationCanceledException"/>
/// raised by it. Any other exception is propagated unchanged.
/// </summary>
private static async Task SafeWaitForTaskAsync(Task task)
{
    try
    {
        await task.ConfigureAwait(false);
    }
    catch (Exception ex) when (ex is OperationCanceledException)
    {
        // Cancellation is the expected way for the awaited task to end; nothing to do.
    }
}
private async Task UpdateSuccessStateAsync(
IVexConnectorStateRepository stateRepository,
string connectorId,
@@ -179,33 +285,45 @@ internal sealed class DefaultVexProviderRunner : IVexProviderRunner
string connectorId,
DateTimeOffset failureTime,
Exception exception,
bool retryable,
CancellationToken cancellationToken)
{
var current = await stateRepository.GetAsync(connectorId, cancellationToken).ConfigureAwait(false)
?? new VexConnectorState(connectorId, null, ImmutableArray<string>.Empty);
var failureCount = current.FailureCount + 1;
var delay = CalculateDelayWithJitter(failureCount);
var nextEligible = failureTime + delay;
DateTimeOffset? nextEligible;
if (failureCount >= _retryOptions.FailureThreshold)
if (retryable)
{
var quarantineUntil = failureTime + _retryOptions.QuarantineDuration;
if (quarantineUntil > nextEligible)
// Apply exponential backoff for retryable errors
var delay = CalculateDelayWithJitter(failureCount);
nextEligible = failureTime + delay;
if (failureCount >= _retryOptions.FailureThreshold)
{
nextEligible = quarantineUntil;
var quarantineUntil = failureTime + _retryOptions.QuarantineDuration;
if (quarantineUntil > nextEligible)
{
nextEligible = quarantineUntil;
}
}
var retryCap = failureTime + _retryOptions.RetryCap;
if (nextEligible > retryCap)
{
nextEligible = retryCap;
}
if (nextEligible < failureTime)
{
nextEligible = failureTime;
}
}
var retryCap = failureTime + _retryOptions.RetryCap;
if (nextEligible > retryCap)
else
{
nextEligible = retryCap;
}
if (nextEligible < failureTime)
{
nextEligible = failureTime;
// Non-retryable errors: apply quarantine immediately
nextEligible = failureTime + _retryOptions.QuarantineDuration;
}
var updated = current with
@@ -219,9 +337,10 @@ internal sealed class DefaultVexProviderRunner : IVexProviderRunner
_logger.LogWarning(
exception,
"Connector {ConnectorId} failed (attempt {Attempt}). Next eligible run at {NextEligible:O}.",
"Connector {ConnectorId} failed (attempt {Attempt}, retryable={Retryable}). Next eligible run at {NextEligible:O}.",
connectorId,
failureCount,
retryable,
nextEligible);
}