// Source: git.stella-ops.org/src/VexLens/StellaOps.VexLens/Orchestration/IConsensusJobService.cs
// Commit: 75611a505f "save progress" (StellaOps Bot, 2026-01-04 19:08:47 +02:00)
// Size: 483 lines, 17 KiB, C#

using System.Text.Json;
using StellaOps.VexLens.Consensus;
using StellaOps.VexLens.Export;
using StellaOps.VexLens.Models;
using StellaOps.VexLens.Storage;
namespace StellaOps.VexLens.Orchestration;
/// <summary>
/// Builds consensus job requests for the orchestrator and executes them.
/// </summary>
public interface IConsensusJobService
{
    /// <summary>
    /// Creates a job request that computes consensus for a single
    /// vulnerability-product pair.
    /// </summary>
    /// <param name="vulnerabilityId">Identifier of the vulnerability.</param>
    /// <param name="productKey">Key of the product.</param>
    /// <param name="tenantId">Optional tenant the job is created for.</param>
    /// <param name="forceRecompute">Flag carried in the job payload requesting a forced recompute.</param>
    /// <returns>The job request to hand to the orchestrator.</returns>
    ConsensusJobRequest CreateComputeJob(
        string vulnerabilityId,
        string productKey,
        string? tenantId = null,
        bool forceRecompute = false);

    /// <summary>
    /// Creates a job request that computes consensus for a batch of
    /// vulnerability-product pairs.
    /// </summary>
    /// <param name="items">The vulnerability-product pairs to include.</param>
    /// <param name="tenantId">Optional tenant the job is created for.</param>
    /// <returns>The job request to hand to the orchestrator.</returns>
    ConsensusJobRequest CreateBatchComputeJob(
        IEnumerable<(string VulnerabilityId, string ProductKey)> items,
        string? tenantId = null);

    /// <summary>
    /// Creates a job request for an incremental update after VEX statement ingestion.
    /// </summary>
    /// <param name="statementIds">Identifiers of the ingested statements.</param>
    /// <param name="triggeredBy">Name of whatever triggered the update.</param>
    /// <returns>The job request to hand to the orchestrator.</returns>
    ConsensusJobRequest CreateIncrementalUpdateJob(
        IEnumerable<string> statementIds,
        string triggeredBy);

    /// <summary>
    /// Creates a job request for trust weight recalibration.
    /// </summary>
    /// <param name="scope">Scope of the recalibration.</param>
    /// <param name="affectedIssuers">Optional list of affected issuers.</param>
    /// <returns>The job request to hand to the orchestrator.</returns>
    ConsensusJobRequest CreateTrustRecalibrationJob(
        string scope,
        IEnumerable<string>? affectedIssuers = null);

    /// <summary>
    /// Creates a job request for a projection refresh.
    /// </summary>
    /// <param name="tenantId">Tenant whose projections are refreshed.</param>
    /// <param name="since">Optional lower time bound carried in the payload.</param>
    /// <param name="status">Optional status filter carried in the payload.</param>
    /// <returns>The job request to hand to the orchestrator.</returns>
    ConsensusJobRequest CreateProjectionRefreshJob(
        string tenantId,
        DateTimeOffset? since = null,
        VexStatus? status = null);

    /// <summary>
    /// Creates a job request for snapshot creation.
    /// </summary>
    /// <param name="request">The snapshot request to embed in the job payload.</param>
    /// <returns>The job request to hand to the orchestrator.</returns>
    ConsensusJobRequest CreateSnapshotJob(SnapshotRequest request);

    /// <summary>
    /// Executes a consensus job and returns its result.
    /// </summary>
    /// <param name="request">The job to execute.</param>
    /// <param name="cancellationToken">Token used to cancel the execution.</param>
    /// <returns>A task producing the outcome of the job.</returns>
    Task<ConsensusJobResult> ExecuteJobAsync(
        ConsensusJobRequest request,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Gets the registration information describing the consensus job types.
    /// </summary>
    /// <returns>The job type registration.</returns>
    ConsensusJobTypeRegistration GetRegistration();
}
/// <summary>
/// A consensus job request to be sent to the orchestrator.
/// </summary>
/// <param name="JobType">Job type identifier.</param>
/// <param name="TenantId">Tenant ID for the job.</param>
/// <param name="Priority">Job priority (higher = more urgent).</param>
/// <param name="IdempotencyKey">Idempotency key for deduplication.</param>
/// <param name="Payload">JSON payload for the job.</param>
/// <param name="CorrelationId">Correlation ID for tracing.</param>
/// <param name="MaxAttempts">Maximum retry attempts.</param>
public sealed record ConsensusJobRequest(
    string JobType,
    string? TenantId,
    int Priority,
    string IdempotencyKey,
    string Payload,
    string? CorrelationId = null,
    int MaxAttempts = 3);
/// <summary>
/// Result of a consensus job execution.
/// </summary>
/// <param name="Success">Whether the job succeeded.</param>
/// <param name="JobType">Job type that was executed.</param>
/// <param name="ItemsProcessed">Number of items processed.</param>
/// <param name="ItemsFailed">Number of items that failed.</param>
/// <param name="Duration">Execution duration.</param>
/// <param name="ResultPayload">Result payload (job-type specific).</param>
/// <param name="ErrorMessage">Error message if failed.</param>
public sealed record ConsensusJobResult(
    bool Success,
    string JobType,
    int ItemsProcessed,
    int ItemsFailed,
    TimeSpan Duration,
    string? ResultPayload,
    string? ErrorMessage);
/// <summary>
/// Registration information for consensus job types.
/// </summary>
/// <param name="SupportedJobTypes">All supported job types.</param>
/// <param name="Metadata">Job type metadata, keyed by job type.</param>
/// <param name="SchemaVersion">Version of the job type schema.</param>
public sealed record ConsensusJobTypeRegistration(
    IReadOnlyList<string> SupportedJobTypes,
    IReadOnlyDictionary<string, JobTypeMetadata> Metadata,
    string SchemaVersion);
/// <summary>
/// Metadata about a job type.
/// </summary>
/// <param name="JobType">Job type identifier.</param>
/// <param name="Description">Human-readable description.</param>
/// <param name="DefaultPriority">Default priority.</param>
/// <param name="SupportsBatching">Whether batching is supported.</param>
/// <param name="DefaultTimeout">Typical execution timeout.</param>
/// <param name="PayloadSchema">JSON schema for the payload.</param>
public sealed record JobTypeMetadata(
    string JobType,
    string Description,
    int DefaultPriority,
    bool SupportsBatching,
    TimeSpan DefaultTimeout,
    string? PayloadSchema);
/// <summary>
/// Default implementation of consensus job service.
/// </summary>
public sealed class ConsensusJobService : IConsensusJobService
{
// Consensus engine dependency; assigned in the constructor but not read by
// any method visible in this file (compute jobs are still placeholders).
private readonly IVexConsensusEngine _consensusEngine;
// Projection store dependency; assigned in the constructor but not read by
// any method visible in this file.
private readonly IConsensusProjectionStore _projectionStore;
// Export service used by ExecuteSnapshotJobAsync to create snapshots.
private readonly IConsensusExportService _exportService;
// Clock used for job durations and for the time bucket in snapshot idempotency keys.
private readonly TimeProvider _timeProvider;
// Schema version reported by GetRegistration.
private const string SchemaVersion = "1.0.0";
// Shared serializer settings for job and result payloads: camelCase property
// names, compact (non-indented) output.
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};
/// <summary>
/// Initializes the service with its collaborators.
/// </summary>
/// <param name="consensusEngine">Consensus engine dependency.</param>
/// <param name="projectionStore">Projection store dependency.</param>
/// <param name="exportService">Export service used to create snapshots.</param>
/// <param name="timeProvider">Clock to use; <see cref="TimeProvider.System"/> when omitted.</param>
public ConsensusJobService(
    IVexConsensusEngine consensusEngine,
    IConsensusProjectionStore projectionStore,
    IConsensusExportService exportService,
    TimeProvider? timeProvider = null)
{
    (_consensusEngine, _projectionStore, _exportService) =
        (consensusEngine, projectionStore, exportService);

    // Fall back to the system clock when no provider is injected.
    _timeProvider = timeProvider is null ? TimeProvider.System : timeProvider;
}
/// <summary>
/// Builds a <see cref="ConsensusJobTypes.Compute"/> request for one
/// vulnerability-product pair.
/// </summary>
/// <param name="vulnerabilityId">Identifier of the vulnerability.</param>
/// <param name="productKey">Key of the product.</param>
/// <param name="tenantId">Optional tenant; the idempotency key uses "default" when null.</param>
/// <param name="forceRecompute">Flag written into the payload.</param>
/// <returns>The request, with a JSON payload holding all four arguments.</returns>
public ConsensusJobRequest CreateComputeJob(
    string vulnerabilityId,
    string productKey,
    string? tenantId = null,
    bool forceRecompute = false)
{
    var jobType = ConsensusJobTypes.Compute;
    var priority = ConsensusJobTypes.GetDefaultPriority(jobType);

    // NOTE(review): forceRecompute is not part of the key, so a forced and a
    // non-forced request for the same pair share one idempotency key.
    var tenantSegment = tenantId ?? "default";
    var idempotencyKey = $"compute:{vulnerabilityId}:{productKey}:{tenantSegment}";

    var body = JsonSerializer.Serialize(
        new { vulnerabilityId, productKey, tenantId, forceRecompute },
        JsonOptions);

    return new ConsensusJobRequest(jobType, tenantId, priority, idempotencyKey, body);
}
/// <summary>
/// Builds a <see cref="ConsensusJobTypes.BatchCompute"/> request covering
/// several vulnerability-product pairs.
/// </summary>
/// <param name="items">Pairs to include; enumerated once.</param>
/// <param name="tenantId">Optional tenant; the idempotency key uses "default" when null.</param>
/// <returns>The request, whose idempotency key embeds a hash of the items.</returns>
public ConsensusJobRequest CreateBatchComputeJob(
    IEnumerable<(string VulnerabilityId, string ProductKey)> items,
    string? tenantId = null)
{
    var entries = items
        .Select(pair => new { vulnerabilityId = pair.VulnerabilityId, productKey = pair.ProductKey })
        .ToList();

    // The idempotency hash covers the items in the order supplied.
    // NOTE(review): the same set of items in a different order yields a different key.
    var fingerprintSource = string.Join(
        "|",
        entries.Select(entry => entry.vulnerabilityId + ":" + entry.productKey));
    var itemsHash = ComputeHash(fingerprintSource);

    var jobType = ConsensusJobTypes.BatchCompute;
    var priority = ConsensusJobTypes.GetDefaultPriority(jobType);
    var idempotencyKey = $"batch:{itemsHash}:{tenantId ?? "default"}";
    var body = JsonSerializer.Serialize(new { items = entries, tenantId }, JsonOptions);

    return new ConsensusJobRequest(jobType, tenantId, priority, idempotencyKey, body);
}
/// <summary>
/// Builds a <see cref="ConsensusJobTypes.IncrementalUpdate"/> request for a
/// set of statement identifiers. The request carries no tenant.
/// </summary>
/// <param name="statementIds">Statement identifiers; enumerated once.</param>
/// <param name="triggeredBy">Trigger name, written to the payload and the idempotency key.</param>
/// <returns>The request, whose idempotency key embeds a hash of the identifiers.</returns>
public ConsensusJobRequest CreateIncrementalUpdateJob(
    IEnumerable<string> statementIds,
    string triggeredBy)
{
    List<string> ids = statementIds.ToList();

    // Hash the identifiers in the order supplied, separated by '|'.
    var idsHash = ComputeHash(string.Join("|", ids));

    var jobType = ConsensusJobTypes.IncrementalUpdate;
    var priority = ConsensusJobTypes.GetDefaultPriority(jobType);
    var idempotencyKey = $"incremental:{idsHash}:{triggeredBy}";
    var body = JsonSerializer.Serialize(new { statementIds = ids, triggeredBy }, JsonOptions);

    return new ConsensusJobRequest(
        JobType: jobType,
        TenantId: null,
        Priority: priority,
        IdempotencyKey: idempotencyKey,
        Payload: body);
}
/// <summary>
/// Builds a <see cref="ConsensusJobTypes.TrustRecalibration"/> request.
/// The request carries no tenant.
/// </summary>
/// <param name="scope">Recalibration scope, written to the payload and the idempotency key.</param>
/// <param name="affectedIssuers">
/// Optional issuers to recalibrate. When null, the payload carries null and
/// the idempotency key uses the literal "all".
/// </param>
/// <returns>The request, whose idempotency key embeds a hash of the issuers.</returns>
public ConsensusJobRequest CreateTrustRecalibrationJob(
    string scope,
    IEnumerable<string>? affectedIssuers = null)
{
    // Materialize exactly once: the payload and the idempotency hash must be
    // derived from the same sequence, and a lazy or one-shot enumerable must
    // not be walked twice.
    List<string>? issuers = affectedIssuers?.ToList();

    var payload = new
    {
        scope,
        affectedIssuers = issuers
    };

    var issuersHash = issuers is not null
        ? ComputeHash(string.Join("|", issuers))
        : "all";

    return new ConsensusJobRequest(
        JobType: ConsensusJobTypes.TrustRecalibration,
        TenantId: null,
        Priority: ConsensusJobTypes.GetDefaultPriority(ConsensusJobTypes.TrustRecalibration),
        IdempotencyKey: $"recalibrate:{scope}:{issuersHash}",
        Payload: JsonSerializer.Serialize(payload, JsonOptions));
}
/// <summary>
/// Builds a <see cref="ConsensusJobTypes.ProjectionRefresh"/> request for a tenant.
/// </summary>
/// <param name="tenantId">Tenant whose projections are refreshed.</param>
/// <param name="since">Optional lower time bound; the idempotency key uses "all" when null.</param>
/// <param name="status">Optional status filter; the idempotency key uses "all" when null.</param>
/// <returns>The request, with the filters in both the payload and the idempotency key.</returns>
public ConsensusJobRequest CreateProjectionRefreshJob(
    string tenantId,
    DateTimeOffset? since = null,
    VexStatus? status = null)
{
    // The status travels as its string name (or null) in the payload.
    var statusName = status?.ToString();

    var jobType = ConsensusJobTypes.ProjectionRefresh;
    var priority = ConsensusJobTypes.GetDefaultPriority(jobType);

    // Round-trip ("O") formatting keeps the timestamp segment unambiguous.
    var sinceSegment = since.HasValue ? since.Value.ToString("O") : "all";
    var statusSegment = statusName ?? "all";
    var idempotencyKey = $"refresh:{tenantId}:{sinceSegment}:{statusSegment}";

    var body = JsonSerializer.Serialize(
        new { tenantId, since, status = statusName },
        JsonOptions);

    return new ConsensusJobRequest(jobType, tenantId, priority, idempotencyKey, body);
}
/// <summary>
/// Builds a <see cref="ConsensusJobTypes.SnapshotCreate"/> request that embeds
/// the given snapshot request in its payload.
/// </summary>
/// <param name="request">The snapshot request to embed.</param>
/// <returns>
/// The request. Its idempotency key combines a hash of the request's tenant,
/// minimum confidence and status with the current UTC time truncated to the
/// minute, so equal requests created within the same minute share a key.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
public ConsensusJobRequest CreateSnapshotJob(SnapshotRequest request)
{
    ArgumentNullException.ThrowIfNull(request);

    var payload = new
    {
        snapshotRequest = request
    };

    // Idempotency keys must be identical on every host. Plain interpolation
    // formats with the current culture, which can change the decimal separator
    // of MinimumConfidence and the calendar year of the time bucket, so both
    // parts are formatted with the invariant culture.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    var requestHash = ComputeHash(
        FormattableString.Invariant($"{request.TenantId}:{request.MinimumConfidence}:{request.Status}"));
    var minuteBucket = _timeProvider.GetUtcNow().ToString("yyyyMMddHHmm", invariant);

    return new ConsensusJobRequest(
        JobType: ConsensusJobTypes.SnapshotCreate,
        TenantId: request.TenantId,
        Priority: ConsensusJobTypes.GetDefaultPriority(ConsensusJobTypes.SnapshotCreate),
        IdempotencyKey: $"snapshot:{requestHash}:{minuteBucket}",
        Payload: JsonSerializer.Serialize(payload, JsonOptions));
}
/// <summary>
/// Dispatches a job to the handler for its job type and returns the outcome.
/// </summary>
/// <remarks>
/// Only <see cref="ConsensusJobTypes.Compute"/>,
/// <see cref="ConsensusJobTypes.BatchCompute"/> and
/// <see cref="ConsensusJobTypes.SnapshotCreate"/> are dispatched; any other
/// job type produces a failed result. Exceptions thrown by a handler are
/// converted into a failed result carrying the exception message, except for
/// cancellation requested through <paramref name="cancellationToken"/>.
/// </remarks>
/// <param name="request">The job to execute.</param>
/// <param name="cancellationToken">Token used to cancel the execution.</param>
/// <returns>The job result; <c>Success</c> is false when the job failed or is unsupported.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
/// <exception cref="OperationCanceledException">
/// The caller cancelled <paramref name="cancellationToken"/> while the job was running.
/// </exception>
public async Task<ConsensusJobResult> ExecuteJobAsync(
    ConsensusJobRequest request,
    CancellationToken cancellationToken = default)
{
    // Without this guard a null request fails inside the catch block below
    // (request.JobType) and surfaces as a NullReferenceException.
    ArgumentNullException.ThrowIfNull(request);

    var startTime = _timeProvider.GetUtcNow();
    try
    {
        return request.JobType switch
        {
            ConsensusJobTypes.Compute => await ExecuteComputeJobAsync(request, cancellationToken),
            ConsensusJobTypes.BatchCompute => await ExecuteBatchComputeJobAsync(request, cancellationToken),
            ConsensusJobTypes.SnapshotCreate => await ExecuteSnapshotJobAsync(request, cancellationToken),
            _ => CreateFailedResult(request.JobType, startTime, $"Unsupported job type: {request.JobType}")
        };
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Caller-requested cancellation is not a job failure: let it propagate
        // instead of reporting a failed item. Cancellations that did not come
        // from the caller's token (e.g. internal timeouts) still fall through
        // to the general handler below.
        throw;
    }
    catch (Exception ex)
    {
        return CreateFailedResult(request.JobType, startTime, ex.Message);
    }
}
/// <summary>
/// Builds the registration describing every job type listed in
/// <see cref="ConsensusJobTypes.All"/>.
/// </summary>
/// <returns>
/// The supported job types, one metadata entry per job type, and the schema version.
/// </returns>
public ConsensusJobTypeRegistration GetRegistration()
{
    var byJobType = new Dictionary<string, JobTypeMetadata>();

    foreach (var jobType in ConsensusJobTypes.All)
    {
        // Indexer assignment: a repeated job type overwrites its earlier entry.
        byJobType[jobType] = Describe(jobType);
    }

    return new ConsensusJobTypeRegistration(ConsensusJobTypes.All, byJobType, SchemaVersion);

    // Assembles the metadata entry for a single job type.
    static JobTypeMetadata Describe(string jobType) => new(
        jobType,
        GetJobTypeDescription(jobType),
        ConsensusJobTypes.GetDefaultPriority(jobType),
        ConsensusJobTypes.SupportsBatching(jobType),
        GetDefaultTimeout(jobType),
        PayloadSchema: null); // Schema can be added later
}
/// <summary>
/// Handles a single compute job. Currently a placeholder: it parses the
/// payload and reports success without calling the consensus engine.
/// </summary>
/// <param name="request">The job whose payload is parsed.</param>
/// <param name="cancellationToken">Not observed by the current placeholder.</param>
/// <returns>A successful result echoing the vulnerability and product from the payload.</returns>
/// <exception cref="InvalidOperationException">The payload deserializes to null.</exception>
private async Task<ConsensusJobResult> ExecuteComputeJobAsync(
    ConsensusJobRequest request,
    CancellationToken cancellationToken)
{
    var startedAt = _timeProvider.GetUtcNow();

    var parsed = JsonSerializer.Deserialize<ComputePayload>(request.Payload, JsonOptions);
    if (parsed is null)
    {
        throw new InvalidOperationException("Invalid compute payload");
    }

    // For now, return success - actual implementation would call consensus engine
    // with VEX statements for the vulnerability-product pair
    await Task.CompletedTask;

    var elapsed = _timeProvider.GetUtcNow() - startedAt;
    var summary = new
    {
        vulnerabilityId = parsed.VulnerabilityId,
        productKey = parsed.ProductKey,
        status = "computed"
    };

    return new ConsensusJobResult(
        Success: true,
        JobType: request.JobType,
        ItemsProcessed: 1,
        ItemsFailed: 0,
        Duration: elapsed,
        ResultPayload: JsonSerializer.Serialize(summary, JsonOptions),
        ErrorMessage: null);
}
/// <summary>
/// Handles a batch compute job. Currently a placeholder: it parses the
/// payload and reports every item as processed without computing anything.
/// </summary>
/// <param name="request">The job whose payload is parsed.</param>
/// <param name="cancellationToken">Not observed by the current placeholder.</param>
/// <returns>A successful result whose processed count equals the number of payload items.</returns>
/// <exception cref="InvalidOperationException">The payload deserializes to null.</exception>
private async Task<ConsensusJobResult> ExecuteBatchComputeJobAsync(
    ConsensusJobRequest request,
    CancellationToken cancellationToken)
{
    var startedAt = _timeProvider.GetUtcNow();

    var parsed = JsonSerializer.Deserialize<BatchComputePayload>(request.Payload, JsonOptions);
    if (parsed is null)
    {
        throw new InvalidOperationException("Invalid batch compute payload");
    }

    // A payload without an items array counts as an empty batch.
    var processed = parsed.Items is { } batch ? batch.Count : 0;

    await Task.CompletedTask;

    var elapsed = _timeProvider.GetUtcNow() - startedAt;

    return new ConsensusJobResult(
        Success: true,
        JobType: request.JobType,
        ItemsProcessed: processed,
        ItemsFailed: 0,
        Duration: elapsed,
        ResultPayload: JsonSerializer.Serialize(new { processedCount = processed }, JsonOptions),
        ErrorMessage: null);
}
/// <summary>
/// Handles a snapshot job by asking the export service for a full export of
/// the request's tenant.
/// </summary>
/// <param name="request">The job; only its tenant and job type are read.</param>
/// <param name="cancellationToken">Token passed to the export service.</param>
/// <returns>
/// A successful result reporting the snapshot id, projection count and content hash.
/// </returns>
private async Task<ConsensusJobResult> ExecuteSnapshotJobAsync(
    ConsensusJobRequest request,
    CancellationToken cancellationToken)
{
    var startedAt = _timeProvider.GetUtcNow();

    // NOTE(review): request.Payload is not read here, so the snapshot request
    // serialized by CreateSnapshotJob does not influence the export; a full
    // export for the tenant is always produced. Confirm this is intended.
    var exportRequest = ConsensusExportExtensions.FullExportRequest(request.TenantId);
    var snapshot = await _exportService.CreateSnapshotAsync(exportRequest, cancellationToken);

    var projectionCount = snapshot.Projections.Count;
    var elapsed = _timeProvider.GetUtcNow() - startedAt;

    var summary = new
    {
        snapshotId = snapshot.SnapshotId,
        projectionCount,
        contentHash = snapshot.Metadata.ContentHash
    };

    return new ConsensusJobResult(
        Success: true,
        JobType: request.JobType,
        ItemsProcessed: projectionCount,
        ItemsFailed: 0,
        Duration: elapsed,
        ResultPayload: JsonSerializer.Serialize(summary, JsonOptions),
        ErrorMessage: null);
}
/// <summary>
/// Builds a failed result: zero items processed, one item failed, no payload.
/// </summary>
/// <param name="jobType">Job type to report.</param>
/// <param name="startTime">When the job started; the duration is measured from here to now.</param>
/// <param name="error">Error message to report.</param>
/// <returns>The failed result.</returns>
private ConsensusJobResult CreateFailedResult(string jobType, DateTimeOffset startTime, string error) =>
    new(
        Success: false,
        JobType: jobType,
        ItemsProcessed: 0,
        ItemsFailed: 1,
        Duration: _timeProvider.GetUtcNow() - startTime,
        ResultPayload: null,
        ErrorMessage: error);
/// <summary>
/// Maps a job type to its human-readable description.
/// </summary>
/// <param name="jobType">The job type identifier.</param>
/// <returns>The description, or a generic text for an unrecognized job type.</returns>
private static string GetJobTypeDescription(string jobType)
{
    switch (jobType)
    {
        case ConsensusJobTypes.Compute:
            return "Compute consensus for a single vulnerability-product pair";
        case ConsensusJobTypes.BatchCompute:
            return "Batch compute consensus for multiple items";
        case ConsensusJobTypes.IncrementalUpdate:
            return "Update consensus after VEX statement changes";
        case ConsensusJobTypes.TrustRecalibration:
            return "Recalibrate consensus after trust weight changes";
        case ConsensusJobTypes.ProjectionRefresh:
            return "Refresh all projections for a tenant";
        case ConsensusJobTypes.SnapshotCreate:
            return "Create a consensus snapshot for export";
        case ConsensusJobTypes.SnapshotVerify:
            return "Verify a snapshot against current projections";
        default:
            return "Unknown consensus job type";
    }
}
/// <summary>
/// Maps a job type to its default execution timeout.
/// </summary>
/// <param name="jobType">The job type identifier.</param>
/// <returns>The timeout; five minutes for an unrecognized job type.</returns>
private static TimeSpan GetDefaultTimeout(string jobType)
{
    switch (jobType)
    {
        case ConsensusJobTypes.Compute:
            return TimeSpan.FromSeconds(30);
        case ConsensusJobTypes.IncrementalUpdate:
            return TimeSpan.FromMinutes(2);
        case ConsensusJobTypes.TrustRecalibration:
            return TimeSpan.FromMinutes(10);
        case ConsensusJobTypes.ProjectionRefresh:
            return TimeSpan.FromMinutes(15);
        // These job types share the five-minute fallback value.
        case ConsensusJobTypes.BatchCompute:
        case ConsensusJobTypes.SnapshotCreate:
        case ConsensusJobTypes.SnapshotVerify:
        default:
            return TimeSpan.FromMinutes(5);
    }
}
/// <summary>
/// Computes a short fingerprint of a string: the first 16 lowercase
/// hexadecimal characters of the SHA-256 digest of its UTF-8 bytes.
/// </summary>
/// <param name="input">The text to fingerprint.</param>
/// <returns>A 16-character lowercase hex string.</returns>
private static string ComputeHash(string input)
{
    byte[] utf8 = System.Text.Encoding.UTF8.GetBytes(input);
    byte[] digest = System.Security.Cryptography.SHA256.HashData(utf8);

    // Keep only the leading 16 hex characters (8 bytes) of the digest.
    string hexPrefix = Convert.ToHexString(digest)[..16];
    return hexPrefix.ToLowerInvariant();
}
// Payload DTOs for deserialization

/// <summary>
/// Shape of the JSON payload read by ExecuteComputeJobAsync.
/// </summary>
/// <param name="VulnerabilityId">Identifier of the vulnerability.</param>
/// <param name="ProductKey">Key of the product.</param>
/// <param name="TenantId">Optional tenant.</param>
/// <param name="ForceRecompute">Force-recompute flag.</param>
private sealed record ComputePayload(
    string VulnerabilityId,
    string ProductKey,
    string? TenantId,
    bool ForceRecompute);

/// <summary>
/// Shape of the JSON payload read by ExecuteBatchComputeJobAsync.
/// </summary>
/// <param name="Items">Batch items; may be null.</param>
/// <param name="TenantId">Optional tenant.</param>
private sealed record BatchComputePayload(
    List<BatchComputeItem>? Items,
    string? TenantId);

/// <summary>
/// One vulnerability-product pair inside a batch payload.
/// </summary>
/// <param name="VulnerabilityId">Identifier of the vulnerability.</param>
/// <param name="ProductKey">Key of the product.</param>
private sealed record BatchComputeItem(
    string VulnerabilityId,
    string ProductKey);
}