up
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-12-01 21:16:22 +02:00
parent c11d87d252
commit 909d9b6220
208 changed files with 860954 additions and 832 deletions

View File

@@ -7,38 +7,63 @@ namespace StellaOps.Excititor.Worker.Options;
/// </summary>
public sealed class VexWorkerOrchestratorOptions
{
/// <summary>
/// Whether orchestrator integration is enabled.
/// </summary>
/// <summary>Whether orchestrator integration is enabled.</summary>
public bool Enabled { get; set; } = true;
/// <summary>
/// Interval between heartbeat emissions during job execution.
/// </summary>
/// <summary>Base address of the Orchestrator WebService (e.g. "https://orch.local/").</summary>
public Uri? BaseAddress { get; set; }
/// <summary>Logical job type registered with Orchestrator.</summary>
public string JobType { get; set; } = "exc-vex-ingest";
/// <summary>Unique worker identifier presented to Orchestrator.</summary>
public string WorkerId { get; set; } = "excititor-worker";
/// <summary>Optional task runner identifier (e.g. host name or pod).</summary>
public string? TaskRunnerId { get; set; }
/// <summary>Tenant header name; defaults to Orchestrator default.</summary>
public string TenantHeader { get; set; } = "X-Tenant-Id";
/// <summary>Tenant value to present when claiming jobs.</summary>
public string DefaultTenant { get; set; } = "default";
/// <summary>API key header name for worker auth.</summary>
public string ApiKeyHeader { get; set; } = "X-Worker-Token";
/// <summary>Optional API key value.</summary>
public string? ApiKey { get; set; }
/// <summary>Optional bearer token value.</summary>
public string? BearerToken { get; set; }
/// <summary>Interval between heartbeat emissions during job execution.</summary>
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Minimum heartbeat interval (safety floor).
/// </summary>
/// <summary>Minimum heartbeat interval (safety floor).</summary>
public TimeSpan MinHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Maximum heartbeat interval (safety cap).
/// </summary>
/// <summary>Maximum heartbeat interval (safety cap).</summary>
public TimeSpan MaxHeartbeatInterval { get; set; } = TimeSpan.FromMinutes(2);
/// <summary>
/// Enable verbose logging for heartbeat/artifact events.
/// </summary>
/// <summary>Lease duration requested when claiming jobs.</summary>
public TimeSpan DefaultLeaseDuration { get; set; } = TimeSpan.FromMinutes(5);
/// <summary>Lease extension requested on each heartbeat.</summary>
public TimeSpan HeartbeatExtend { get; set; } = TimeSpan.FromMinutes(1);
/// <summary>HTTP request timeout when talking to Orchestrator.</summary>
public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(15);
/// <summary>Enable verbose logging for heartbeat/artifact events.</summary>
public bool EnableVerboseLogging { get; set; }
/// <summary>
/// Maximum number of artifact hashes to retain in state.
/// </summary>
/// <summary>Maximum number of artifact hashes to retain in state.</summary>
public int MaxArtifactHashes { get; set; } = 1000;
/// <summary>
/// Default tenant for worker jobs when not specified.
/// </summary>
public string DefaultTenant { get; set; } = "default";
/// <summary>Emit progress events for artifacts while running.</summary>
public bool EmitProgressForArtifacts { get; set; } = true;
/// <summary>Fallback to local state only when orchestrator is unreachable.</summary>
public bool AllowLocalFallback { get; set; } = true;
}

View File

@@ -1,5 +1,11 @@
using System;
using System.Collections.Immutable;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -13,7 +19,7 @@ namespace StellaOps.Excititor.Worker.Orchestration;
/// <summary>
/// Default implementation of <see cref="IVexWorkerOrchestratorClient"/>.
/// Stores heartbeats and artifacts locally and emits them to the orchestrator registry when configured.
/// Stores heartbeats and artifacts locally and, when configured, mirrors them to the Orchestrator worker API.
/// </summary>
internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
{
@@ -21,37 +27,94 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
private readonly TimeProvider _timeProvider;
private readonly IOptions<VexWorkerOrchestratorOptions> _options;
private readonly ILogger<VexWorkerOrchestratorClient> _logger;
private readonly HttpClient? _httpClient;
private readonly JsonSerializerOptions _serializerOptions;
private VexWorkerCommand? _pendingCommand;
private long _commandSequence;
public VexWorkerOrchestratorClient(
IVexConnectorStateRepository stateRepository,
TimeProvider timeProvider,
IOptions<VexWorkerOrchestratorOptions> options,
ILogger<VexWorkerOrchestratorClient> logger)
ILogger<VexWorkerOrchestratorClient> logger,
HttpClient? httpClient = null)
{
_stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_httpClient = httpClient;
_serializerOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
};
}
public ValueTask<VexWorkerJobContext> StartJobAsync(
public async ValueTask<VexWorkerJobContext> StartJobAsync(
string tenant,
string connectorId,
string? checkpoint,
CancellationToken cancellationToken = default)
{
var runId = Guid.NewGuid();
var startedAt = _timeProvider.GetUtcNow();
var context = new VexWorkerJobContext(tenant, connectorId, runId, checkpoint, startedAt);
var fallbackContext = new VexWorkerJobContext(tenant, connectorId, Guid.NewGuid(), checkpoint, startedAt);
if (!CanUseRemote())
{
return fallbackContext;
}
var claimRequest = new ClaimRequest(
WorkerId: _options.Value.WorkerId,
TaskRunnerId: _options.Value.TaskRunnerId ?? Environment.MachineName,
JobType: ResolveJobType(connectorId),
LeaseSeconds: ResolveLeaseSeconds(),
IdempotencyKey: $"exc-{connectorId}-{startedAt.ToUnixTimeSeconds()}");
var response = await PostAsync("api/v1/orchestrator/worker/claim", tenant, claimRequest, cancellationToken).ConfigureAwait(false);
if (response is null)
{
return fallbackContext;
}
if (response.StatusCode == HttpStatusCode.NoContent)
{
_logger.LogInformation("Orchestrator had no jobs for {ConnectorId}; continuing with local execution.", connectorId);
return fallbackContext;
}
if (!response.IsSuccessStatusCode)
{
await HandleErrorResponseAsync("claim", response, connectorId, cancellationToken).ConfigureAwait(false);
return fallbackContext;
}
var claim = await DeserializeAsync<ClaimResponse>(response, cancellationToken).ConfigureAwait(false);
if (claim is null)
{
return fallbackContext;
}
_logger.LogInformation(
"Orchestrator job started: tenant={Tenant} connector={ConnectorId} runId={RunId} checkpoint={Checkpoint}",
"Orchestrator job claimed: tenant={Tenant} connector={ConnectorId} jobId={JobId} leaseUntil={LeaseUntil:O}",
tenant,
connectorId,
runId,
checkpoint ?? "(none)");
claim.JobId,
claim.LeaseUntil);
return ValueTask.FromResult(context);
return new VexWorkerJobContext(
tenant,
connectorId,
claim.JobId,
checkpoint,
startedAt,
orchestratorJobId: claim.JobId,
orchestratorLeaseId: claim.LeaseId,
leaseExpiresAt: claim.LeaseUntil,
jobType: claim.JobType,
correlationId: claim.CorrelationId,
orchestratorRunId: claim.RunId);
}
public async ValueTask SendHeartbeatAsync(
@@ -87,6 +150,8 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
heartbeat.Progress,
heartbeat.LastArtifactHash);
}
await SendRemoteHeartbeatAsync(context, heartbeat, cancellationToken).ConfigureAwait(false);
}
public async ValueTask RecordArtifactAsync(
@@ -106,7 +171,7 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
: state.DocumentDigests;
// Add artifact hash if not already tracked (cap to avoid unbounded growth)
const int maxDigests = 1000;
var maxDigests = Math.Max(1, _options.Value.MaxArtifactHashes);
if (!digests.Contains(artifact.Hash))
{
digests = digests.Length >= maxDigests
@@ -129,6 +194,8 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
artifact.Hash,
artifact.Kind,
artifact.ProviderId);
await SendRemoteProgressForArtifactAsync(context, artifact, cancellationToken).ConfigureAwait(false);
}
public async ValueTask CompleteJobAsync(
@@ -165,6 +232,8 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
result.DocumentsProcessed,
result.ClaimsGenerated,
duration);
await SendRemoteCompletionAsync(context, result, cancellationToken).ConfigureAwait(false);
}
public async ValueTask FailJobAsync(
@@ -202,6 +271,13 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
context.ConnectorId,
errorCode,
retryAfterSeconds);
await SendRemoteCompletionAsync(
context,
new VexWorkerJobResult(0, 0, state.LastCheckpoint, state.LastArtifactHash, now),
cancellationToken,
success: false,
failureReason: Truncate($"{errorCode}: {errorMessage}", 256)).ConfigureAwait(false);
}
public ValueTask FailJobAsync(
@@ -232,16 +308,13 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
{
ArgumentNullException.ThrowIfNull(context);
// In this local implementation, commands are not externally sourced.
// Return Continue to indicate normal processing should continue.
// A full orchestrator integration would poll a command queue here.
if (!_options.Value.Enabled)
{
return ValueTask.FromResult<VexWorkerCommand?>(null);
}
// No pending commands in local mode
return ValueTask.FromResult<VexWorkerCommand?>(null);
var command = Interlocked.Exchange(ref _pendingCommand, null);
return ValueTask.FromResult(command);
}
public ValueTask AcknowledgeCommandAsync(
@@ -256,7 +329,6 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
context.RunId,
commandSequence);
// In local mode, acknowledgment is a no-op
return ValueTask.CompletedTask;
}
@@ -314,6 +386,12 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
state.ResumeTokens.IsEmpty ? ImmutableDictionary<string, string>.Empty : state.ResumeTokens);
}
/// <summary>
/// Reports whether calls to the remote Orchestrator may be attempted.
/// </summary>
/// <returns>
/// <c>true</c> only when integration is enabled, an <see cref="HttpClient"/> was supplied
/// to the constructor, and a base address is configured; otherwise <c>false</c>.
/// </returns>
private bool CanUseRemote()
{
    var settings = _options.Value;

    if (!settings.Enabled || _httpClient is null)
    {
        return false;
    }

    return settings.BaseAddress is not null;
}
private static string Truncate(string? value, int maxLength)
{
if (string.IsNullOrEmpty(value))
@@ -325,4 +403,276 @@ internal sealed class VexWorkerOrchestratorClient : IVexWorkerOrchestratorClient
? value
: value[..maxLength];
}
/// <summary>
/// Converts the configured <c>DefaultLeaseDuration</c> to whole seconds for a claim request.
/// </summary>
/// <returns>The rounded duration in seconds, limited to the range 30..3600.</returns>
private int ResolveLeaseSeconds()
{
    const double MinLeaseSeconds = 30;
    const double MaxLeaseSeconds = 3600;

    // Clamp while still in floating point: casting an out-of-range double to int
    // (e.g. an extremely large configured TimeSpan) has an unspecified result,
    // which could otherwise turn "very long lease" into the minimum lease.
    var seconds = Math.Round(_options.Value.DefaultLeaseDuration.TotalSeconds);
    return (int)Math.Clamp(seconds, MinLeaseSeconds, MaxLeaseSeconds);
}
/// <summary>
/// Converts the configured <c>HeartbeatExtend</c> to whole seconds, limited to the
/// configured <c>MinHeartbeatInterval</c>..<c>MaxHeartbeatInterval</c> window.
/// </summary>
/// <returns>The lease extension, in seconds, to request on a heartbeat.</returns>
private int ResolveHeartbeatExtendSeconds()
{
    var opts = _options.Value;

    var floor = Math.Round(opts.MinHeartbeatInterval.TotalSeconds);

    // Math.Clamp throws ArgumentException when min > max. A misconfigured window must not
    // break the heartbeat path, so the floor wins when the two bounds are inverted.
    var cap = Math.Max(floor, Math.Round(opts.MaxHeartbeatInterval.TotalSeconds));

    var requested = Math.Round(opts.HeartbeatExtend.TotalSeconds);
    var bounded = Math.Clamp(requested, floor, cap);

    // Keep the final cast inside the int range; an out-of-range double-to-int cast is unspecified.
    return (int)Math.Clamp(bounded, int.MinValue, int.MaxValue);
}
/// <summary>
/// Picks the job type sent in a claim request.
/// </summary>
/// <param name="connectorId">Connector identifier used to build the fallback job type.</param>
/// <returns>
/// The configured <c>JobType</c>, or <c>exc-vex-{connectorId}</c> when the configured value
/// is null, empty, or whitespace.
/// </returns>
private string ResolveJobType(string connectorId)
{
    var configured = _options.Value.JobType;

    if (string.IsNullOrWhiteSpace(configured))
    {
        return $"exc-vex-{connectorId}";
    }

    return configured;
}
/// <summary>
/// Posts a heartbeat for the claimed job to the Orchestrator worker API and, on success,
/// updates the lease expiry held by <paramref name="context"/>.
/// </summary>
/// <param name="context">Job context; must carry an Orchestrator job id and lease id, otherwise nothing is sent.</param>
/// <param name="heartbeat">The local heartbeat being mirrored. Not read by this method.</param>
/// <param name="cancellationToken">Token used to cancel the HTTP call and body read.</param>
private async ValueTask SendRemoteHeartbeatAsync(
    VexWorkerJobContext context,
    VexWorkerHeartbeat heartbeat,
    CancellationToken cancellationToken)
{
    if (!CanUseRemote() || context.OrchestratorJobId is null || context.OrchestratorLeaseId is null)
    {
        return;
    }

    var request = new HeartbeatRequest(
        context.OrchestratorLeaseId.Value,
        ResolveHeartbeatExtendSeconds(),
        IdempotencyKey: $"hb-{context.RunId}-{context.Sequence}");

    // Dispose the response once handled; a using declaration tolerates the null
    // that PostAsync returns when it falls back to local-only operation.
    using var response = await PostAsync(
        $"api/v1/orchestrator/worker/jobs/{context.OrchestratorJobId}/heartbeat",
        context.Tenant,
        request,
        cancellationToken).ConfigureAwait(false);

    if (response is null)
    {
        return;
    }

    if (!response.IsSuccessStatusCode)
    {
        await HandleErrorResponseAsync("heartbeat", response, context.ConnectorId, cancellationToken).ConfigureAwait(false);
        return;
    }

    var hb = await DeserializeAsync<HeartbeatResponse>(response, cancellationToken).ConfigureAwait(false);
    if (hb is not null)
    {
        context.UpdateLease(hb.LeaseUntil);
    }
}
/// <summary>
/// Posts a progress event describing a recorded artifact to the Orchestrator worker API.
/// </summary>
/// <remarks>
/// Nothing is sent when remote calls are unavailable, when <c>EmitProgressForArtifacts</c>
/// is disabled, or when <paramref name="context"/> carries no Orchestrator job/lease id.
/// </remarks>
/// <param name="context">Job context supplying tenant, job id and lease id.</param>
/// <param name="artifact">Artifact whose hash, kind, provider, document id and creation time are sent as metadata.</param>
/// <param name="cancellationToken">Token used to cancel the HTTP call.</param>
private async ValueTask SendRemoteProgressForArtifactAsync(
    VexWorkerJobContext context,
    VexWorkerArtifact artifact,
    CancellationToken cancellationToken)
{
    if (!CanUseRemote() || !_options.Value.EmitProgressForArtifacts || context.OrchestratorJobId is null || context.OrchestratorLeaseId is null)
    {
        return;
    }

    // Metadata travels as a JSON string embedded in the progress payload.
    var metadata = Serialize(new
    {
        artifact.Hash,
        artifact.Kind,
        artifact.ProviderId,
        artifact.DocumentId,
        artifact.CreatedAt
    });

    var request = new ProgressRequest(
        context.OrchestratorLeaseId.Value,
        ProgressPercent: null,
        Message: $"artifact:{artifact.Kind}",
        Metadata: metadata,
        IdempotencyKey: $"artifact-{artifact.Hash}");

    // Dispose the response once handled; a null response (local fallback) is tolerated by using.
    using var response = await PostAsync(
        $"api/v1/orchestrator/worker/jobs/{context.OrchestratorJobId}/progress",
        context.Tenant,
        request,
        cancellationToken).ConfigureAwait(false);

    if (response is not null && !response.IsSuccessStatusCode)
    {
        await HandleErrorResponseAsync("progress", response, context.ConnectorId, cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Reports job completion (success or failure) to the Orchestrator worker API.
/// </summary>
/// <param name="context">Job context; must carry an Orchestrator job id and lease id, otherwise nothing is sent.</param>
/// <param name="result">Job result; only <c>LastArtifactHash</c> is forwarded, as the result digest.</param>
/// <param name="cancellationToken">Token used to cancel the HTTP call.</param>
/// <param name="success">Whether the job succeeded.</param>
/// <param name="failureReason">Reason forwarded when <paramref name="success"/> is <c>false</c>; ignored otherwise.</param>
private async ValueTask SendRemoteCompletionAsync(
    VexWorkerJobContext context,
    VexWorkerJobResult result,
    CancellationToken cancellationToken,
    bool success = true,
    string? failureReason = null)
{
    if (!CanUseRemote() || context.OrchestratorJobId is null || context.OrchestratorLeaseId is null)
    {
        return;
    }

    var request = new CompleteRequest(
        context.OrchestratorLeaseId.Value,
        success,
        success ? null : failureReason,
        Artifacts: Array.Empty<ArtifactInput>(),
        ResultDigest: result.LastArtifactHash,
        IdempotencyKey: $"complete-{context.RunId}-{context.Sequence}");

    // Dispose the response once handled; a null response (local fallback) is tolerated by using.
    using var response = await PostAsync(
        $"api/v1/orchestrator/worker/jobs/{context.OrchestratorJobId}/complete",
        context.Tenant,
        request,
        cancellationToken).ConfigureAwait(false);

    if (response is not null && !response.IsSuccessStatusCode)
    {
        await HandleErrorResponseAsync("complete", response, context.ConnectorId, cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Serializes <paramref name="payload"/> as JSON and POSTs it to the Orchestrator,
/// attaching the tenant header and any configured API key / bearer token.
/// </summary>
/// <typeparam name="TPayload">Type of the request body.</typeparam>
/// <param name="path">Request path passed to <see cref="HttpRequestMessage"/> as given.</param>
/// <param name="tenant">Value sent in the tenant header.</param>
/// <param name="payload">Request body to serialize.</param>
/// <param name="cancellationToken">Token used to cancel the HTTP call.</param>
/// <returns>
/// The HTTP response (the caller owns and should dispose it), or <c>null</c> when remote calls
/// are unavailable or the send failed and <c>AllowLocalFallback</c> is enabled.
/// </returns>
private async Task<HttpResponseMessage?> PostAsync<TPayload>(
    string path,
    string tenant,
    TPayload payload,
    CancellationToken cancellationToken)
{
    if (!CanUseRemote())
    {
        return null;
    }

    var opts = _options.Value;

    // The request and its content are not needed after SendAsync completes.
    using var request = new HttpRequestMessage(HttpMethod.Post, path)
    {
        Content = new StringContent(JsonSerializer.Serialize(payload, _serializerOptions), Encoding.UTF8, "application/json")
    };
    request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

    // Blank header names in configuration fall back to the built-in defaults.
    request.Headers.TryAddWithoutValidation(string.IsNullOrWhiteSpace(opts.TenantHeader) ? "X-Tenant-Id" : opts.TenantHeader, tenant);

    if (!string.IsNullOrWhiteSpace(opts.ApiKey))
    {
        request.Headers.TryAddWithoutValidation(string.IsNullOrWhiteSpace(opts.ApiKeyHeader) ? "X-Worker-Token" : opts.ApiKeyHeader, opts.ApiKey);
    }

    if (!string.IsNullOrWhiteSpace(opts.BearerToken))
    {
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", opts.BearerToken);
    }

    try
    {
        return await _httpClient!.SendAsync(request, cancellationToken).ConfigureAwait(false);
    }
    // Caller-requested cancellation is not an outage: let it propagate instead of logging a
    // warning and queuing a Retry command. HttpClient timeouts (which also surface as
    // cancellation exceptions, but without the caller's token being cancelled) still fall back.
    catch (Exception ex) when (opts.AllowLocalFallback && !cancellationToken.IsCancellationRequested)
    {
        _logger.LogWarning(ex, "Failed to contact Orchestrator ({Path}); continuing locally.", path);
        StorePendingCommand(VexWorkerCommandKind.Retry, reason: "orchestrator_unreachable", retryAfterSeconds: 60);
        return null;
    }
}
/// <summary>
/// Reads the body of <paramref name="response"/> and deserializes it as JSON using the
/// client's serializer options.
/// </summary>
/// <typeparam name="T">Type to deserialize into.</typeparam>
/// <param name="response">Response whose content is read.</param>
/// <param name="cancellationToken">Token used to cancel the read and the deserialization.</param>
/// <returns>The deserialized value, which may be <c>null</c>.</returns>
private async ValueTask<T?> DeserializeAsync<T>(HttpResponseMessage response, CancellationToken cancellationToken)
{
    var body = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
    var parsed = await JsonSerializer.DeserializeAsync<T>(body, _serializerOptions, cancellationToken).ConfigureAwait(false);
    return parsed;
}
/// <summary>
/// Translates a non-success Orchestrator response into a pending worker command and logs a warning.
/// </summary>
/// <remarks>
/// Mapping: 429 becomes Throttle (default 60s); 409 becomes Retry (default 30s);
/// 502/503/504 become Pause (default 120s); anything else becomes Retry (default 30s).
/// A message or retry-after value parsed from the error body overrides the defaults.
/// </remarks>
/// <param name="stage">Short label of the failed call, used in the log message.</param>
/// <param name="response">The non-success response.</param>
/// <param name="connectorId">Connector identifier, used in the log message.</param>
/// <param name="cancellationToken">Token used to cancel reading the error body.</param>
private async Task HandleErrorResponseAsync(
    string stage,
    HttpResponseMessage response,
    string connectorId,
    CancellationToken cancellationToken)
{
    ErrorResponse? error = null;
    try
    {
        error = await DeserializeAsync<ErrorResponse>(response, cancellationToken).ConfigureAwait(false);
    }
    // Parsing the error body is best effort, so fall back to status-code handling on failure.
    // Caller-requested cancellation is not a parse problem and is allowed to propagate.
    catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
    {
        _logger.LogDebug(ex, "Could not parse Orchestrator error body for {Stage}.", stage);
    }

    var (kind, defaultReason, defaultRetrySeconds) = response.StatusCode switch
    {
        HttpStatusCode.TooManyRequests => (VexWorkerCommandKind.Throttle, "rate_limited", 60),
        HttpStatusCode.Conflict => (VexWorkerCommandKind.Retry, "lease_conflict", 30),
        HttpStatusCode.ServiceUnavailable or HttpStatusCode.BadGateway or HttpStatusCode.GatewayTimeout
            => (VexWorkerCommandKind.Pause, "orchestrator_unavailable", 120),
        _ => (VexWorkerCommandKind.Retry, "orchestrator_error", 30),
    };

    StorePendingCommand(
        kind,
        reason: error?.Message ?? defaultReason,
        retryAfterSeconds: error?.RetryAfterSeconds ?? defaultRetrySeconds);

    _logger.LogWarning(
        "Orchestrator {Stage} call failed for connector {ConnectorId}: {Status} {Error}",
        stage,
        connectorId,
        response.StatusCode,
        error?.Message ?? response.ReasonPhrase);
}
/// <summary>
/// Builds a new worker command and stores it as the single pending command,
/// replacing any command stored earlier.
/// </summary>
/// <param name="kind">Kind of command to store.</param>
/// <param name="reason">Optional reason attached to the command.</param>
/// <param name="retryAfterSeconds">
/// Optional delay in seconds; when present it sets the command's expiry relative to now and,
/// for Throttle commands, is also passed to the throttle parameters.
/// </param>
private void StorePendingCommand(VexWorkerCommandKind kind, string? reason = null, int? retryAfterSeconds = null)
{
    var issuedAt = _timeProvider.GetUtcNow();
    var sequence = Interlocked.Increment(ref _commandSequence);

    DateTimeOffset? expiresAt = null;
    if (retryAfterSeconds is int delaySeconds)
    {
        expiresAt = issuedAt.AddSeconds(delaySeconds);
    }

    // Throttle parameters are only attached to Throttle commands that carry a delay.
    VexWorkerThrottleParams? throttle = null;
    if (kind == VexWorkerCommandKind.Throttle && retryAfterSeconds.HasValue)
    {
        throttle = new VexWorkerThrottleParams(null, null, retryAfterSeconds);
    }

    _pendingCommand = new VexWorkerCommand(
        kind,
        sequence,
        issuedAt,
        expiresAt,
        Throttle: throttle,
        Reason: reason);
}
/// <summary>
/// Serializes <paramref name="value"/> to a JSON string using the client's serializer options.
/// </summary>
/// <param name="value">Object to serialize.</param>
/// <returns>The JSON text.</returns>
private string Serialize(object value)
{
    return JsonSerializer.Serialize(value, _serializerOptions);
}
// Wire DTOs for the Orchestrator worker API. They are (de)serialized with the client's
// serializer options (camelCase property names, null values omitted when writing).

/// <summary>Body posted to the worker claim endpoint to request a job lease.</summary>
private sealed record ClaimRequest(string WorkerId, string? TaskRunnerId, string? JobType, int? LeaseSeconds, string? IdempotencyKey);
/// <summary>
/// Body returned by a successful claim. This client reads the job id, lease id, lease expiry,
/// job type, correlation id and run id; the remaining fields are deserialized but not used here.
/// </summary>
private sealed record ClaimResponse(
Guid JobId,
Guid LeaseId,
string JobType,
string Payload,
string PayloadDigest,
int Attempt,
int MaxAttempts,
DateTimeOffset LeaseUntil,
string IdempotencyKey,
string? CorrelationId,
Guid? RunId,
string? ProjectId);
/// <summary>Body posted to the job heartbeat endpoint.</summary>
private sealed record HeartbeatRequest(Guid LeaseId, int? ExtendSeconds, string? IdempotencyKey);
/// <summary>Body returned by a successful heartbeat; <c>LeaseUntil</c> is used to update the local lease expiry.</summary>
private sealed record HeartbeatResponse(Guid JobId, Guid LeaseId, DateTimeOffset LeaseUntil, bool Acknowledged);
/// <summary>Body posted to the job progress endpoint; <c>Metadata</c> carries a JSON string.</summary>
private sealed record ProgressRequest(Guid LeaseId, double? ProgressPercent, string? Message, string? Metadata, string? IdempotencyKey);
/// <summary>Body posted to the job complete endpoint for both success and failure.</summary>
private sealed record CompleteRequest(Guid LeaseId, bool Success, string? Reason, IReadOnlyList<ArtifactInput>? Artifacts, string? ResultDigest, string? IdempotencyKey);
/// <summary>Artifact entry of a completion request. This client currently always sends an empty artifact list.</summary>
private sealed record ArtifactInput(string ArtifactType, string Uri, string Digest, string? MimeType, long? SizeBytes, string? Metadata);
/// <summary>Error body parsed from non-success responses; <c>Message</c> and <c>RetryAfterSeconds</c> drive the pending command.</summary>
private sealed record ErrorResponse(string Error, string Message, Guid? JobId, int? RetryAfterSeconds);
}

View File

@@ -109,7 +109,16 @@ services.AddSingleton<PluginCatalog>(provider =>
services.AddOptions<VexWorkerOrchestratorOptions>()
.Bind(configuration.GetSection("Excititor:Worker:Orchestrator"))
.ValidateOnStart();
services.AddSingleton<IVexWorkerOrchestratorClient, VexWorkerOrchestratorClient>();
services.AddHttpClient<IVexWorkerOrchestratorClient, VexWorkerOrchestratorClient>((provider, client) =>
{
var opts = provider.GetRequiredService<IOptions<VexWorkerOrchestratorOptions>>().Value;
if (opts.BaseAddress is not null)
{
client.BaseAddress = opts.BaseAddress;
}
client.Timeout = opts.RequestTimeout;
});
services.AddSingleton<VexWorkerHeartbeatService>();
services.AddSingleton<IVexProviderRunner, DefaultVexProviderRunner>();