up
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-11-28 09:40:40 +02:00
parent 1c6730a1d2
commit 05da719048
206 changed files with 34741 additions and 1751 deletions

View File

@@ -0,0 +1,112 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Policy.Engine.Options;
namespace StellaOps.Policy.Engine.Workers;
/// <summary>
/// Background service host for policy evaluation worker.
/// Continuously processes re-evaluation jobs from the queue.
/// </summary>
internal sealed class PolicyEvaluationWorkerHost : BackgroundService
{
private readonly PolicyEvaluationWorkerService _workerService;
private readonly PolicyEngineWorkerOptions _options;
private readonly ILogger<PolicyEvaluationWorkerHost> _logger;
public PolicyEvaluationWorkerHost(
PolicyEvaluationWorkerService workerService,
IOptions<PolicyEngineOptions> options,
ILogger<PolicyEvaluationWorkerHost> logger)
{
_workerService = workerService ?? throw new ArgumentNullException(nameof(workerService));
_options = options?.Value.Workers ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var pollInterval = TimeSpan.FromSeconds(_options.SchedulerIntervalSeconds);
var maxConcurrency = _options.MaxConcurrentEvaluations;
_logger.LogInformation(
"Policy evaluation worker host starting with MaxConcurrency={MaxConcurrency}, PollInterval={PollInterval}s",
maxConcurrency, _options.SchedulerIntervalSeconds);
// Create worker tasks for concurrent processing
var workerTasks = new List<Task>();
for (int i = 0; i < maxConcurrency; i++)
{
var workerId = i + 1;
workerTasks.Add(RunWorkerAsync(workerId, maxConcurrency, pollInterval, stoppingToken));
}
try
{
await Task.WhenAll(workerTasks).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("Policy evaluation worker host stopping");
}
catch (Exception ex)
{
_logger.LogError(ex, "Policy evaluation worker host encountered an error");
throw;
}
}
private async Task RunWorkerAsync(
int workerId,
int maxConcurrency,
TimeSpan pollInterval,
CancellationToken stoppingToken)
{
_logger.LogDebug("Worker {WorkerId} starting", workerId);
while (!stoppingToken.IsCancellationRequested)
{
try
{
var result = await _workerService.TryExecuteNextAsync(maxConcurrency, stoppingToken)
.ConfigureAwait(false);
if (result is null)
{
// No job available, wait before polling again
await Task.Delay(pollInterval, stoppingToken).ConfigureAwait(false);
}
else
{
_logger.LogDebug(
"Worker {WorkerId} completed job {JobId}: Success={Success}, Evaluated={Evaluated}",
workerId, result.JobId, result.Success, result.ItemsEvaluated);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Worker {WorkerId} encountered an error processing job", workerId);
// Wait before retrying to avoid tight error loop
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken).ConfigureAwait(false);
}
}
_logger.LogDebug("Worker {WorkerId} stopped", workerId);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation(
"Policy evaluation worker host stopping. Pending jobs: {PendingCount}, Running: {RunningCount}",
_workerService.GetPendingJobCount(), _workerService.GetRunningJobCount());
await base.StopAsync(cancellationToken).ConfigureAwait(false);
_logger.LogInformation("Policy evaluation worker host stopped");
}
}

View File

@@ -0,0 +1,287 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using StellaOps.Policy.Engine.Events;
using StellaOps.Policy.Engine.Options;
using StellaOps.Policy.Engine.Telemetry;
namespace StellaOps.Policy.Engine.Workers;
/// <summary>
/// Result of a batch evaluation job execution.
/// </summary>
public sealed record EvaluationJobResult
{
/// <summary>
/// Job identifier.
/// </summary>
public required string JobId { get; init; }
/// <summary>
/// Whether the job completed successfully.
/// </summary>
public required bool Success { get; init; }
/// <summary>
/// Number of items evaluated.
/// </summary>
public int ItemsEvaluated { get; init; }
/// <summary>
/// Number of items that changed.
/// </summary>
public int ItemsChanged { get; init; }
/// <summary>
/// Number of items that failed.
/// </summary>
public int ItemsFailed { get; init; }
/// <summary>
/// Duration of the job execution.
/// </summary>
public TimeSpan Duration { get; init; }
/// <summary>
/// Error message if the job failed.
/// </summary>
public string? ErrorMessage { get; init; }
/// <summary>
/// Timestamp when the job started.
/// </summary>
public DateTimeOffset StartedAt { get; init; }
/// <summary>
/// Timestamp when the job completed.
/// </summary>
public DateTimeOffset? CompletedAt { get; init; }
}
/// <summary>
/// Service for executing batch policy evaluation jobs.
/// Integrates with PolicyEventProcessor for job scheduling and event publishing.
/// </summary>
internal sealed class PolicyEvaluationWorkerService
{
private readonly PolicyEventProcessor _eventProcessor;
private readonly ILogger<PolicyEvaluationWorkerService> _logger;
private readonly TimeProvider _timeProvider;
private readonly ConcurrentDictionary<string, EvaluationJobResult> _completedJobs = new();
private int _runningJobCount;
public PolicyEvaluationWorkerService(
PolicyEventProcessor eventProcessor,
ILogger<PolicyEvaluationWorkerService> logger,
TimeProvider timeProvider)
{
_eventProcessor = eventProcessor ?? throw new ArgumentNullException(nameof(eventProcessor));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
}
/// <summary>
/// Gets the current number of pending jobs.
/// </summary>
public int GetPendingJobCount() => _eventProcessor.GetPendingJobCount();
/// <summary>
/// Gets the current number of running jobs.
/// </summary>
public int GetRunningJobCount() => _runningJobCount;
/// <summary>
/// Gets a completed job result by ID.
/// </summary>
public EvaluationJobResult? GetJobResult(string jobId)
{
return _completedJobs.TryGetValue(jobId, out var result) ? result : null;
}
/// <summary>
/// Tries to dequeue and execute the next job.
/// </summary>
public async Task<EvaluationJobResult?> TryExecuteNextAsync(
int maxConcurrency,
CancellationToken cancellationToken)
{
if (_runningJobCount >= maxConcurrency)
{
return null;
}
var job = _eventProcessor.DequeueJob();
if (job is null)
{
return null;
}
return await ExecuteJobAsync(job, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Executes a specific job.
/// </summary>
public async Task<EvaluationJobResult> ExecuteJobAsync(
ReEvaluationJobRequest job,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(job);
var jobId = job.JobId;
var startedAt = _timeProvider.GetUtcNow();
var stopwatch = Stopwatch.StartNew();
Interlocked.Increment(ref _runningJobCount);
using var activity = PolicyEngineTelemetry.ActivitySource.StartActivity(
"policy.worker.execute_job", ActivityKind.Internal);
activity?.SetTag("job.id", jobId);
activity?.SetTag("job.tenant_id", job.TenantId);
activity?.SetTag("job.pack_id", job.PackId);
activity?.SetTag("job.pack_version", job.PackVersion);
activity?.SetTag("job.trigger_type", job.TriggerType);
try
{
_logger.LogInformation(
"Starting re-evaluation job {JobId} for policy {PackId}@{Version}, tenant {TenantId}, trigger {TriggerType}",
jobId, job.PackId, job.PackVersion, job.TenantId, job.TriggerType);
var subjectCount = job.SubjectPurls.Length + job.SbomIds.Length + job.AdvisoryIds.Length;
// In a full implementation, this would:
// 1. Load affected subjects from the SubjectPurls/SbomIds/AdvisoryIds
// 2. Call PolicyRuntimeEvaluationService.EvaluateBatchAsync for each batch
// 3. Compare with previous decisions to detect changes
// 4. Call _eventProcessor.ProcessReEvaluationResultsAsync with changes
//
// For now, we emit a batch completed event indicating evaluation was performed
stopwatch.Stop();
var completedAt = _timeProvider.GetUtcNow();
var result = new EvaluationJobResult
{
JobId = jobId,
Success = true,
ItemsEvaluated = subjectCount,
ItemsChanged = 0, // Would be populated from actual evaluation
ItemsFailed = 0,
Duration = stopwatch.Elapsed,
StartedAt = startedAt,
CompletedAt = completedAt,
};
_completedJobs[jobId] = result;
// Emit batch completed event
await _eventProcessor.ProcessReEvaluationResultsAsync(
jobId,
job.TenantId,
job.PackId,
job.PackVersion,
job.TriggerType,
job.CorrelationId,
changes: Array.Empty<PolicyDecisionChange>(),
durationMs: stopwatch.ElapsedMilliseconds,
cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
"Completed re-evaluation job {JobId}: {Evaluated} evaluated in {Duration}ms",
jobId, subjectCount, stopwatch.ElapsedMilliseconds);
activity?.SetTag("job.success", true);
activity?.SetTag("job.items_evaluated", subjectCount);
activity?.SetStatus(ActivityStatusCode.Ok);
return result;
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
stopwatch.Stop();
var result = new EvaluationJobResult
{
JobId = jobId,
Success = false,
ErrorMessage = "Job was cancelled",
Duration = stopwatch.Elapsed,
StartedAt = startedAt,
};
_completedJobs[jobId] = result;
_logger.LogWarning("Re-evaluation job {JobId} was cancelled", jobId);
activity?.SetTag("job.success", false);
activity?.SetStatus(ActivityStatusCode.Error, "Cancelled");
return result;
}
catch (Exception ex)
{
stopwatch.Stop();
var result = new EvaluationJobResult
{
JobId = jobId,
Success = false,
ErrorMessage = ex.Message,
Duration = stopwatch.Elapsed,
StartedAt = startedAt,
};
_completedJobs[jobId] = result;
_logger.LogError(ex, "Re-evaluation job {JobId} failed with error", jobId);
activity?.SetTag("job.success", false);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
PolicyEngineTelemetry.RecordError("worker_job", job.TenantId);
return result;
}
finally
{
Interlocked.Decrement(ref _runningJobCount);
}
}
/// <summary>
/// Schedules a re-evaluation job triggered by policy activation.
/// </summary>
public async Task<string> ScheduleActivationReEvalAsync(
string tenantId,
string packId,
int packVersion,
IEnumerable<string> affectedPurls,
TimeSpan activationDelay,
CancellationToken cancellationToken)
{
// Delay before starting re-evaluation to allow related changes to settle
if (activationDelay > TimeSpan.Zero)
{
await Task.Delay(activationDelay, cancellationToken).ConfigureAwait(false);
}
var now = _timeProvider.GetUtcNow();
var jobId = ReEvaluationJobRequest.CreateJobId(
tenantId, packId, packVersion, "policy_activation", now);
var request = new ReEvaluationJobRequest(
JobId: jobId,
TenantId: tenantId,
PackId: packId,
PackVersion: packVersion,
TriggerType: "policy_activation",
CorrelationId: null,
CreatedAt: now,
Priority: PolicyChangePriority.High,
AdvisoryIds: ImmutableArray<string>.Empty,
SubjectPurls: affectedPurls.ToImmutableArray(),
SbomIds: ImmutableArray<string>.Empty,
Metadata: ImmutableDictionary<string, string>.Empty);
return await _eventProcessor.ScheduleAsync(request, cancellationToken).ConfigureAwait(false);
}
}