
Commit 1c6730a1d2 (parent 3b96b2e3ea)
Author: StellaOps Bot
Date: 2025-11-28 00:45:16 +02:00
95 changed files with 14504 additions and 463 deletions


@@ -0,0 +1,495 @@
using System.Collections.Immutable;
using System.Diagnostics;
namespace StellaOps.Policy.Engine.IncrementalOrchestrator;
/// <summary>
/// Background service that continuously processes policy change events
/// and schedules incremental re-evaluations.
/// </summary>
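/// <example>
/// Illustrative lifecycle sketch; the <c>orchestrator</c> instance and the shutdown flow are
/// assumed to be supplied by the host and are not part of this class:
/// <code>
/// var service = new IncrementalOrchestratorBackgroundService(orchestrator);
/// await service.StartAsync(CancellationToken.None);
/// // ... later, on shutdown ...
/// await service.StopAsync(CancellationToken.None);
/// service.Dispose();
/// </code>
/// </example>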
public sealed class IncrementalOrchestratorBackgroundService : IDisposable
{
private readonly IncrementalPolicyOrchestrator _orchestrator;
private readonly IncrementalOrchestratorOptions _options;
private readonly TimeProvider _timeProvider;
private readonly IncrementalOrchestratorMetrics _metrics;
private CancellationTokenSource? _cts;
private Task? _executingTask;
private bool _disposed;
public IncrementalOrchestratorBackgroundService(
IncrementalPolicyOrchestrator orchestrator,
IncrementalOrchestratorOptions? options = null,
TimeProvider? timeProvider = null,
IncrementalOrchestratorMetrics? metrics = null)
{
_orchestrator = orchestrator ?? throw new ArgumentNullException(nameof(orchestrator));
_options = options ?? IncrementalOrchestratorOptions.Default;
_timeProvider = timeProvider ?? TimeProvider.System;
_metrics = metrics ?? new IncrementalOrchestratorMetrics();
}
/// <summary>
/// Starts the background processing.
/// </summary>
public Task StartAsync(CancellationToken cancellationToken)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_executingTask = ExecuteAsync(_cts.Token);
return Task.CompletedTask;
}
/// <summary>
/// Stops the background processing.
/// </summary>
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_cts is null || _executingTask is null)
{
return;
}
await _cts.CancelAsync().ConfigureAwait(false);
try
{
await _executingTask.WaitAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Expected during shutdown
}
}
private async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var pollTimer = new PeriodicTimer(_options.PollInterval);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await pollTimer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false);
var stopwatch = Stopwatch.StartNew();
var result = await _orchestrator.ProcessAsync(stoppingToken).ConfigureAwait(false);
stopwatch.Stop();
// Record metrics
_metrics.RecordProcessingCycle(result, stopwatch.Elapsed);
if (result.HasWork)
{
_metrics.RecordEventsProcessed(
result.TotalEventsRead,
result.EventsSkippedOld,
result.EventsSkippedDuplicate);
_metrics.RecordBatches(
result.BatchesProcessed,
result.BatchesFailed);
foreach (var jobId in result.JobsCreated)
{
_metrics.RecordJobCreated(jobId);
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_metrics.RecordError(ex);
// Wait before retrying after error
try
{
await Task.Delay(_options.RetryBackoff, stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
}
}
}
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_cts?.Cancel();
_cts?.Dispose();
}
}
/// <summary>
/// Metrics collector for the incremental orchestrator.
/// </summary>
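/// <example>
/// Minimal read-side sketch; <c>metrics</c> is assumed to be the instance passed to the
/// background service and <c>logger</c> an assumed host-provided ILogger:
/// <code>
/// var snapshot = metrics.GetSnapshot();
/// logger.LogInformation("cycles={Cycles} avgMs={AvgMs} batchSuccess={Rate}",
///     snapshot.TotalCycles, snapshot.AverageProcessingTimeMs, snapshot.BatchSuccessRate);
/// </code>
/// </example>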
public class IncrementalOrchestratorMetrics
{
private long _totalCycles;
private long _totalEventsRead;
private long _totalEventsSkippedOld;
private long _totalEventsSkippedDuplicate;
private long _totalBatchesProcessed;
private long _totalBatchesFailed;
private long _totalJobsCreated;
private long _totalErrors;
private TimeSpan _totalProcessingTime;
private readonly object _lock = new();
/// <summary>
/// Records a processing cycle.
/// </summary>
public virtual void RecordProcessingCycle(OrchestratorProcessResult result, TimeSpan duration)
{
lock (_lock)
{
_totalCycles++;
_totalProcessingTime += duration;
}
}
/// <summary>
/// Records events processed.
/// </summary>
public virtual void RecordEventsProcessed(int total, int skippedOld, int skippedDuplicate)
{
lock (_lock)
{
_totalEventsRead += total;
_totalEventsSkippedOld += skippedOld;
_totalEventsSkippedDuplicate += skippedDuplicate;
}
}
/// <summary>
/// Records batches processed.
/// </summary>
public virtual void RecordBatches(int processed, int failed)
{
lock (_lock)
{
_totalBatchesProcessed += processed;
_totalBatchesFailed += failed;
}
}
/// <summary>
/// Records a job created.
/// </summary>
public virtual void RecordJobCreated(string jobId)
{
Interlocked.Increment(ref _totalJobsCreated);
}
/// <summary>
/// Records an error.
/// </summary>
public virtual void RecordError(Exception ex)
{
Interlocked.Increment(ref _totalErrors);
}
/// <summary>
/// Gets current metrics snapshot.
/// </summary>
public IncrementalOrchestratorMetricsSnapshot GetSnapshot()
{
lock (_lock)
{
return new IncrementalOrchestratorMetricsSnapshot
{
TotalCycles = _totalCycles,
TotalEventsRead = _totalEventsRead,
TotalEventsSkippedOld = _totalEventsSkippedOld,
TotalEventsSkippedDuplicate = _totalEventsSkippedDuplicate,
TotalBatchesProcessed = _totalBatchesProcessed,
TotalBatchesFailed = _totalBatchesFailed,
TotalJobsCreated = _totalJobsCreated,
TotalErrors = _totalErrors,
TotalProcessingTime = _totalProcessingTime
};
}
}
}
/// <summary>
/// Snapshot of orchestrator metrics.
/// </summary>
public sealed record IncrementalOrchestratorMetricsSnapshot
{
public long TotalCycles { get; init; }
public long TotalEventsRead { get; init; }
public long TotalEventsSkippedOld { get; init; }
public long TotalEventsSkippedDuplicate { get; init; }
public long TotalBatchesProcessed { get; init; }
public long TotalBatchesFailed { get; init; }
public long TotalJobsCreated { get; init; }
public long TotalErrors { get; init; }
public TimeSpan TotalProcessingTime { get; init; }
public double AverageProcessingTimeMs =>
TotalCycles > 0 ? TotalProcessingTime.TotalMilliseconds / TotalCycles : 0;
public double BatchSuccessRate =>
TotalBatchesProcessed + TotalBatchesFailed > 0
? (double)TotalBatchesProcessed / (TotalBatchesProcessed + TotalBatchesFailed)
: 1.0;
}
/// <summary>
/// Builder for creating a configured IncrementalOrchestratorBackgroundService.
/// </summary>
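/// <example>
/// Wiring sketch using the in-memory test doubles defined in this namespace; production hosts
/// would substitute their own event source and submitter:
/// <code>
/// var service = new IncrementalOrchestratorBuilder()
///     .WithEventSource(new InMemoryPolicyChangeEventSource())
///     .WithSubmitter(new DefaultPolicyReEvaluationSubmitter())
///     .WithOptions(IncrementalOrchestratorOptions.Default with { PollInterval = TimeSpan.FromSeconds(1) })
///     .Build();
/// await service.StartAsync(CancellationToken.None);
/// </code>
/// </example>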
public sealed class IncrementalOrchestratorBuilder
{
private IPolicyChangeEventSource? _eventSource;
private IPolicyReEvaluationSubmitter? _submitter;
private IPolicyChangeIdempotencyStore? _idempotencyStore;
private IncrementalOrchestratorOptions _options = IncrementalOrchestratorOptions.Default;
private TimeProvider _timeProvider = TimeProvider.System;
private IncrementalOrchestratorMetrics? _metrics;
public IncrementalOrchestratorBuilder WithEventSource(IPolicyChangeEventSource source)
{
_eventSource = source;
return this;
}
public IncrementalOrchestratorBuilder WithSubmitter(IPolicyReEvaluationSubmitter submitter)
{
_submitter = submitter;
return this;
}
public IncrementalOrchestratorBuilder WithIdempotencyStore(IPolicyChangeIdempotencyStore store)
{
_idempotencyStore = store;
return this;
}
public IncrementalOrchestratorBuilder WithOptions(IncrementalOrchestratorOptions options)
{
_options = options;
return this;
}
public IncrementalOrchestratorBuilder WithOptions(Func<IncrementalOrchestratorOptions, IncrementalOrchestratorOptions> configure)
{
// Option properties are init-only, so callers return a transformed copy, e.g. via a `with` expression.
_options = configure(_options);
return this;
}
public IncrementalOrchestratorBuilder WithTimeProvider(TimeProvider timeProvider)
{
_timeProvider = timeProvider;
return this;
}
public IncrementalOrchestratorBuilder WithMetrics(IncrementalOrchestratorMetrics metrics)
{
_metrics = metrics;
return this;
}
public IncrementalOrchestratorBackgroundService Build()
{
if (_eventSource is null)
{
throw new InvalidOperationException("Event source is required");
}
if (_submitter is null)
{
throw new InvalidOperationException("Submitter is required");
}
_idempotencyStore ??= new InMemoryPolicyChangeIdempotencyStore();
_metrics ??= new IncrementalOrchestratorMetrics();
var orchestrator = new IncrementalPolicyOrchestrator(
_eventSource,
_submitter,
_idempotencyStore,
_options,
_timeProvider);
return new IncrementalOrchestratorBackgroundService(
orchestrator,
_options,
_timeProvider,
_metrics);
}
}
/// <summary>
/// Default implementation that creates policy run jobs from change batches.
/// </summary>
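/// <example>
/// Sketch of hooking the submitter to a job scheduler; <c>scheduler.EnqueueAsync</c> is a
/// hypothetical integration point, not an API defined here:
/// <code>
/// var submitter = new DefaultPolicyReEvaluationSubmitter
/// {
///     OnSubmitJob = async (request, ct) =>
///     {
///         // Hypothetical scheduler call; must return the created job ID.
///         return await scheduler.EnqueueAsync(request, ct);
///     }
/// };
/// </code>
/// </example>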
public sealed class DefaultPolicyReEvaluationSubmitter : IPolicyReEvaluationSubmitter
{
private readonly TimeProvider _timeProvider;
public DefaultPolicyReEvaluationSubmitter(TimeProvider? timeProvider = null)
{
_timeProvider = timeProvider ?? TimeProvider.System;
}
/// <summary>
/// Delegate for creating policy run jobs.
/// </summary>
public Func<PolicyRunJobRequest, CancellationToken, Task<string>>? OnSubmitJob { get; set; }
public async Task<PolicyReEvaluationResult> SubmitAsync(
PolicyChangeBatch batch,
CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
var jobIds = new List<string>();
try
{
// Build metadata for incremental targeting
var metadata = ImmutableSortedDictionary.CreateBuilder<string, string>(StringComparer.Ordinal);
if (!batch.VulnerabilityIds.IsDefaultOrEmpty)
{
metadata["delta.vulnerabilities"] = string.Join(";", batch.VulnerabilityIds);
}
if (!batch.AffectedPurls.IsDefaultOrEmpty)
{
metadata["delta.purls"] = string.Join(";", batch.AffectedPurls.Take(100)); // Limit size
}
if (!batch.AffectedProductKeys.IsDefaultOrEmpty)
{
metadata["delta.productkeys"] = string.Join(";", batch.AffectedProductKeys.Take(100));
}
if (!batch.AffectedSbomIds.IsDefaultOrEmpty)
{
metadata["delta.sboms"] = string.Join(";", batch.AffectedSbomIds.Take(100));
}
metadata["orchestrator.batchId"] = batch.BatchId;
metadata["orchestrator.eventCount"] = batch.Events.Length.ToString();
metadata["orchestrator.priority"] = batch.Priority.ToString().ToLowerInvariant();
var request = new PolicyRunJobRequest
{
TenantId = batch.TenantId,
Mode = PolicyRunJobMode.Incremental,
Priority = MapPriority(batch.Priority),
Metadata = metadata.ToImmutable(),
QueuedAt = _timeProvider.GetUtcNow(),
CorrelationId = batch.BatchId
};
if (OnSubmitJob is not null)
{
var jobId = await OnSubmitJob(request, cancellationToken).ConfigureAwait(false);
jobIds.Add(jobId);
}
else
{
// Simulate job creation for testing
jobIds.Add($"prj-{batch.BatchId[4..]}");
}
stopwatch.Stop();
return new PolicyReEvaluationResult
{
Succeeded = true,
JobIds = jobIds.ToImmutableArray(),
EstimatedFindingsCount = EstimateFindings(batch),
ProcessingTimeMs = stopwatch.ElapsedMilliseconds
};
}
catch (Exception ex)
{
stopwatch.Stop();
return new PolicyReEvaluationResult
{
Succeeded = false,
JobIds = ImmutableArray<string>.Empty,
Error = ex.Message,
ProcessingTimeMs = stopwatch.ElapsedMilliseconds
};
}
}
private static PolicyRunJobPriority MapPriority(PolicyChangePriority priority)
{
return priority switch
{
PolicyChangePriority.Emergency => PolicyRunJobPriority.Emergency,
PolicyChangePriority.High => PolicyRunJobPriority.High,
_ => PolicyRunJobPriority.Normal
};
}
private static int EstimateFindings(PolicyChangeBatch batch)
{
// Rough estimate based on batch contents
var vulnCount = batch.VulnerabilityIds.Length;
var purlCount = batch.AffectedPurls.Length;
var sbomCount = batch.AffectedSbomIds.Length;
// Assume average of 5 findings per vulnerability per SBOM
if (vulnCount > 0 && sbomCount > 0)
{
return vulnCount * sbomCount * 5;
}
// Assume average of 10 findings per PURL
if (purlCount > 0)
{
return purlCount * 10;
}
return batch.Events.Length * 5;
}
}
/// <summary>
/// Request to create a policy run job.
/// </summary>
public sealed record PolicyRunJobRequest
{
public required string TenantId { get; init; }
public required PolicyRunJobMode Mode { get; init; }
public required PolicyRunJobPriority Priority { get; init; }
public ImmutableSortedDictionary<string, string>? Metadata { get; init; }
public DateTimeOffset QueuedAt { get; init; }
public string? CorrelationId { get; init; }
}
/// <summary>
/// Policy run job mode.
/// </summary>
public enum PolicyRunJobMode
{
Full,
Incremental
}
/// <summary>
/// Policy run job priority.
/// </summary>
public enum PolicyRunJobPriority
{
Normal,
High,
Emergency
}


@@ -0,0 +1,536 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Text;
namespace StellaOps.Policy.Engine.IncrementalOrchestrator;
/// <summary>
/// Configuration options for the incremental policy orchestrator.
/// </summary>
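/// <example>
/// Options are init-only, so overrides use a <c>with</c> expression on the defaults
/// (the values below are illustrative):
/// <code>
/// var options = IncrementalOrchestratorOptions.Default with
/// {
///     PollInterval = TimeSpan.FromSeconds(2),
///     MaxBatchSize = 50,
///     MaxEventAge = TimeSpan.FromHours(6)
/// };
/// </code>
/// </example>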
public sealed record IncrementalOrchestratorOptions
{
/// <summary>
/// How often to poll for new change events.
/// </summary>
public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5);
/// <summary>
/// How long to wait before batching events together.
/// </summary>
public TimeSpan BatchWindow { get; init; } = TimeSpan.FromSeconds(10);
/// <summary>
/// Maximum events per batch.
/// </summary>
public int MaxBatchSize { get; init; } = 100;
/// <summary>
/// Maximum retry attempts for failed processing.
/// </summary>
public int MaxRetryAttempts { get; init; } = 3;
/// <summary>
/// Delay between retry attempts.
/// </summary>
public TimeSpan RetryBackoff { get; init; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Whether to enable deduplication within batch window.
/// </summary>
public bool EnableDeduplication { get; init; } = true;
/// <summary>
/// Maximum age of events to process (older events are skipped).
/// </summary>
public TimeSpan MaxEventAge { get; init; } = TimeSpan.FromHours(24);
/// <summary>
/// Default options.
/// </summary>
public static IncrementalOrchestratorOptions Default { get; } = new();
}
/// <summary>
/// Interface for reading change events from a source.
/// </summary>
public interface IPolicyChangeEventSource
{
/// <summary>
/// Reads pending change events.
/// </summary>
IAsyncEnumerable<PolicyChangeEvent> ReadAsync(CancellationToken cancellationToken);
/// <summary>
/// Acknowledges that an event has been processed.
/// </summary>
Task AcknowledgeAsync(string eventId, CancellationToken cancellationToken);
/// <summary>
/// Marks an event as failed for retry.
/// </summary>
Task MarkFailedAsync(string eventId, string error, CancellationToken cancellationToken);
}
/// <summary>
/// Interface for submitting policy re-evaluation jobs.
/// </summary>
public interface IPolicyReEvaluationSubmitter
{
/// <summary>
/// Submits a batch for re-evaluation.
/// </summary>
Task<PolicyReEvaluationResult> SubmitAsync(
PolicyChangeBatch batch,
CancellationToken cancellationToken);
}
/// <summary>
/// Interface for idempotency tracking.
/// </summary>
public interface IPolicyChangeIdempotencyStore
{
/// <summary>
/// Checks if an event has already been processed.
/// </summary>
Task<bool> HasSeenAsync(string eventId, CancellationToken cancellationToken);
/// <summary>
/// Marks an event as processed.
/// </summary>
Task MarkSeenAsync(string eventId, DateTimeOffset processedAt, CancellationToken cancellationToken);
}
/// <summary>
/// Result of a policy re-evaluation submission.
/// </summary>
public sealed record PolicyReEvaluationResult
{
/// <summary>
/// Whether the submission succeeded.
/// </summary>
public required bool Succeeded { get; init; }
/// <summary>
/// Job ID(s) created for the re-evaluation.
/// </summary>
public required ImmutableArray<string> JobIds { get; init; }
/// <summary>
/// Estimated number of findings that will be re-evaluated.
/// </summary>
public int EstimatedFindingsCount { get; init; }
/// <summary>
/// Error message if failed.
/// </summary>
public string? Error { get; init; }
/// <summary>
/// Processing duration.
/// </summary>
public long ProcessingTimeMs { get; init; }
}
/// <summary>
/// Orchestrates incremental policy re-evaluations in response to
/// advisory, VEX, and SBOM change streams.
/// </summary>
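/// <example>
/// End-to-end sketch with the in-memory test doubles from this namespace; the tenant, SBOM ID,
/// and requester are placeholders:
/// <code>
/// var source = new InMemoryPolicyChangeEventSource();
/// source.Enqueue(PolicyChangeEventFactory.CreateManualTrigger(
///     tenantId: "tenant-a",
///     policyIds: null,
///     sbomIds: new[] { "sbom-1" },
///     productKeys: null,
///     requestedBy: "ops",
///     createdAt: DateTimeOffset.UtcNow));
/// var orchestrator = new IncrementalPolicyOrchestrator(
///     source,
///     new DefaultPolicyReEvaluationSubmitter(),
///     new InMemoryPolicyChangeIdempotencyStore());
/// var result = await orchestrator.ProcessAsync(CancellationToken.None);
/// // result.BatchesProcessed == 1; result.JobsCreated holds the synthetic "prj-..." job ID.
/// </code>
/// </example>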
public sealed class IncrementalPolicyOrchestrator
{
private readonly IPolicyChangeEventSource _eventSource;
private readonly IPolicyReEvaluationSubmitter _submitter;
private readonly IPolicyChangeIdempotencyStore _idempotencyStore;
private readonly IncrementalOrchestratorOptions _options;
private readonly TimeProvider _timeProvider;
public IncrementalPolicyOrchestrator(
IPolicyChangeEventSource eventSource,
IPolicyReEvaluationSubmitter submitter,
IPolicyChangeIdempotencyStore idempotencyStore,
IncrementalOrchestratorOptions? options = null,
TimeProvider? timeProvider = null)
{
_eventSource = eventSource ?? throw new ArgumentNullException(nameof(eventSource));
_submitter = submitter ?? throw new ArgumentNullException(nameof(submitter));
_idempotencyStore = idempotencyStore ?? throw new ArgumentNullException(nameof(idempotencyStore));
_options = options ?? IncrementalOrchestratorOptions.Default;
_timeProvider = timeProvider ?? TimeProvider.System;
}
/// <summary>
/// Processes a single batch of pending events.
/// </summary>
public async Task<OrchestratorProcessResult> ProcessAsync(CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
var now = _timeProvider.GetUtcNow();
var cutoffTime = now - _options.MaxEventAge;
var eventsByTenant = new Dictionary<string, List<PolicyChangeEvent>>(StringComparer.OrdinalIgnoreCase);
var skippedOld = 0;
var skippedDuplicate = 0;
var totalRead = 0;
// Read and group events by tenant
await foreach (var evt in _eventSource.ReadAsync(cancellationToken))
{
totalRead++;
// Skip events older than max age
if (evt.OccurredAt < cutoffTime)
{
skippedOld++;
await _eventSource.AcknowledgeAsync(evt.EventId, cancellationToken).ConfigureAwait(false);
continue;
}
// Check idempotency
if (_options.EnableDeduplication &&
await _idempotencyStore.HasSeenAsync(evt.EventId, cancellationToken).ConfigureAwait(false))
{
skippedDuplicate++;
await _eventSource.AcknowledgeAsync(evt.EventId, cancellationToken).ConfigureAwait(false);
continue;
}
if (!eventsByTenant.TryGetValue(evt.TenantId, out var tenantEvents))
{
tenantEvents = new List<PolicyChangeEvent>();
eventsByTenant[evt.TenantId] = tenantEvents;
}
tenantEvents.Add(evt);
// Limit total events per processing cycle
if (totalRead >= _options.MaxBatchSize * 10)
{
break;
}
}
var batchesProcessed = 0;
var batchesFailed = 0;
var jobsCreated = new List<string>();
// Process each tenant's events
foreach (var (tenantId, events) in eventsByTenant.OrderBy(kvp => kvp.Key, StringComparer.Ordinal))
{
var batches = CreateBatches(tenantId, events, now);
foreach (var batch in batches)
{
var attempts = 0;
var success = false;
while (attempts < _options.MaxRetryAttempts && !success)
{
try
{
cancellationToken.ThrowIfCancellationRequested();
var result = await _submitter.SubmitAsync(batch, cancellationToken).ConfigureAwait(false);
if (result.Succeeded)
{
success = true;
batchesProcessed++;
jobsCreated.AddRange(result.JobIds);
// Mark all events in batch as seen
foreach (var evt in batch.Events)
{
await _idempotencyStore.MarkSeenAsync(evt.EventId, now, cancellationToken)
.ConfigureAwait(false);
await _eventSource.AcknowledgeAsync(evt.EventId, cancellationToken)
.ConfigureAwait(false);
}
}
else
{
attempts++;
if (attempts >= _options.MaxRetryAttempts)
{
// Retries exhausted on an unsuccessful result: count the failure and report it to the source.
batchesFailed++;
foreach (var evt in batch.Events)
{
await _eventSource.MarkFailedAsync(evt.EventId, result.Error ?? "re-evaluation submission failed", cancellationToken)
.ConfigureAwait(false);
}
}
else
{
await Task.Delay(_options.RetryBackoff, cancellationToken).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
attempts++;
if (attempts >= _options.MaxRetryAttempts)
{
batchesFailed++;
foreach (var evt in batch.Events)
{
await _eventSource.MarkFailedAsync(evt.EventId, ex.Message, cancellationToken)
.ConfigureAwait(false);
}
}
else
{
await Task.Delay(_options.RetryBackoff, cancellationToken).ConfigureAwait(false);
}
}
}
}
}
stopwatch.Stop();
return new OrchestratorProcessResult
{
TotalEventsRead = totalRead,
EventsSkippedOld = skippedOld,
EventsSkippedDuplicate = skippedDuplicate,
BatchesProcessed = batchesProcessed,
BatchesFailed = batchesFailed,
JobsCreated = jobsCreated.ToImmutableArray(),
ProcessingTimeMs = stopwatch.ElapsedMilliseconds
};
}
/// <summary>
/// Creates deterministically ordered batches from events.
/// </summary>
private IReadOnlyList<PolicyChangeBatch> CreateBatches(
string tenantId,
IReadOnlyList<PolicyChangeEvent> events,
DateTimeOffset now)
{
// Sort by priority (highest first), then by occurred time
var ordered = events
.OrderByDescending(e => (int)e.Priority)
.ThenBy(e => e.OccurredAt)
.ThenBy(e => e.EventId, StringComparer.Ordinal)
.ToList();
var batches = new List<PolicyChangeBatch>();
var currentBatch = new List<PolicyChangeEvent>();
var currentPriority = PolicyChangePriority.Normal;
foreach (var evt in ordered)
{
// Start new batch if priority changes or batch is full
if (currentBatch.Count > 0 &&
(evt.Priority != currentPriority || currentBatch.Count >= _options.MaxBatchSize))
{
batches.Add(CreateBatchFromEvents(tenantId, currentBatch, currentPriority, now));
currentBatch = new List<PolicyChangeEvent>();
}
currentBatch.Add(evt);
currentPriority = evt.Priority;
}
// Add final batch
if (currentBatch.Count > 0)
{
batches.Add(CreateBatchFromEvents(tenantId, currentBatch, currentPriority, now));
}
return batches;
}
private static PolicyChangeBatch CreateBatchFromEvents(
string tenantId,
IReadOnlyList<PolicyChangeEvent> events,
PolicyChangePriority priority,
DateTimeOffset createdAt)
{
var batchId = CreateBatchId(tenantId, events, createdAt);
// Aggregate all affected items
var allPurls = events
.SelectMany(e => e.AffectedPurls)
.Where(p => !string.IsNullOrWhiteSpace(p))
.Distinct(StringComparer.Ordinal)
.OrderBy(p => p, StringComparer.Ordinal)
.ToImmutableArray();
var allProductKeys = events
.SelectMany(e => e.AffectedProductKeys)
.Where(k => !string.IsNullOrWhiteSpace(k))
.Distinct(StringComparer.OrdinalIgnoreCase)
.OrderBy(k => k, StringComparer.OrdinalIgnoreCase)
.ToImmutableArray();
var allSbomIds = events
.SelectMany(e => e.AffectedSbomIds)
.Where(id => !string.IsNullOrWhiteSpace(id))
.Distinct(StringComparer.Ordinal)
.OrderBy(id => id, StringComparer.Ordinal)
.ToImmutableArray();
var allVulnIds = events
.Select(e => e.VulnerabilityId)
.Where(v => !string.IsNullOrWhiteSpace(v))
.Cast<string>()
.Distinct(StringComparer.OrdinalIgnoreCase)
.OrderBy(v => v, StringComparer.OrdinalIgnoreCase)
.ToImmutableArray();
return new PolicyChangeBatch
{
BatchId = batchId,
TenantId = tenantId,
Events = events.ToImmutableArray(),
Priority = priority,
CreatedAt = createdAt,
AffectedPurls = allPurls,
AffectedProductKeys = allProductKeys,
AffectedSbomIds = allSbomIds,
VulnerabilityIds = allVulnIds
};
}
private static string CreateBatchId(
string tenantId,
IReadOnlyList<PolicyChangeEvent> events,
DateTimeOffset createdAt)
{
var builder = new StringBuilder();
builder.Append(tenantId).Append('|');
builder.Append(createdAt.ToString("O")).Append('|');
foreach (var evt in events.OrderBy(e => e.EventId, StringComparer.Ordinal))
{
builder.Append(evt.EventId).Append('|');
}
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(builder.ToString()));
return $"pcb-{Convert.ToHexStringLower(bytes)[..16]}";
}
}
/// <summary>
/// Result of an orchestrator processing cycle.
/// </summary>
public sealed record OrchestratorProcessResult
{
/// <summary>
/// Total events read from source.
/// </summary>
public required int TotalEventsRead { get; init; }
/// <summary>
/// Events skipped due to age.
/// </summary>
public required int EventsSkippedOld { get; init; }
/// <summary>
/// Events skipped due to deduplication.
/// </summary>
public required int EventsSkippedDuplicate { get; init; }
/// <summary>
/// Batches successfully processed.
/// </summary>
public required int BatchesProcessed { get; init; }
/// <summary>
/// Batches that failed after retries.
/// </summary>
public required int BatchesFailed { get; init; }
/// <summary>
/// Job IDs created during processing.
/// </summary>
public required ImmutableArray<string> JobsCreated { get; init; }
/// <summary>
/// Total processing time in milliseconds.
/// </summary>
public required long ProcessingTimeMs { get; init; }
/// <summary>
/// Whether any work was done.
/// </summary>
public bool HasWork => TotalEventsRead > 0;
/// <summary>
/// Whether all batches succeeded.
/// </summary>
public bool AllSucceeded => BatchesFailed == 0;
}
/// <summary>
/// In-memory implementation of policy change event source for testing.
/// </summary>
public sealed class InMemoryPolicyChangeEventSource : IPolicyChangeEventSource
{
private readonly ConcurrentQueue<PolicyChangeEvent> _pending = new();
private readonly ConcurrentDictionary<string, PolicyChangeEvent> _inFlight = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, PolicyChangeEvent> _failed = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, PolicyChangeEvent> _acknowledged = new(StringComparer.Ordinal);
public void Enqueue(PolicyChangeEvent evt)
{
_pending.Enqueue(evt);
}
public void EnqueueRange(IEnumerable<PolicyChangeEvent> events)
{
foreach (var evt in events)
{
_pending.Enqueue(evt);
}
}
public async IAsyncEnumerable<PolicyChangeEvent> ReadAsync(
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
while (_pending.TryDequeue(out var evt))
{
cancellationToken.ThrowIfCancellationRequested();
// Track the event so AcknowledgeAsync/MarkFailedAsync can resolve it by ID later.
_inFlight[evt.EventId] = evt;
yield return evt;
}
await Task.CompletedTask;
}
public Task AcknowledgeAsync(string eventId, CancellationToken cancellationToken)
{
// Remove from failed if retrying, then record the event as acknowledged.
_failed.TryRemove(eventId, out _);
if (_inFlight.TryRemove(eventId, out var evt))
{
_acknowledged[eventId] = evt;
}
return Task.CompletedTask;
}
public Task MarkFailedAsync(string eventId, string error, CancellationToken cancellationToken)
{
// Keep the event in-flight so a later acknowledge still works; the error string is not persisted.
if (_inFlight.TryGetValue(eventId, out var evt))
{
_failed[eventId] = evt;
}
return Task.CompletedTask;
}
public int PendingCount => _pending.Count;
public IReadOnlyCollection<PolicyChangeEvent> GetAcknowledged() =>
_acknowledged.Values.ToList();
}
/// <summary>
/// In-memory implementation of idempotency store for testing.
/// </summary>
public sealed class InMemoryPolicyChangeIdempotencyStore : IPolicyChangeIdempotencyStore
{
private readonly ConcurrentDictionary<string, DateTimeOffset> _seen = new(StringComparer.Ordinal);
public Task<bool> HasSeenAsync(string eventId, CancellationToken cancellationToken)
{
return Task.FromResult(_seen.ContainsKey(eventId));
}
public Task MarkSeenAsync(string eventId, DateTimeOffset processedAt, CancellationToken cancellationToken)
{
_seen[eventId] = processedAt;
return Task.CompletedTask;
}
public int SeenCount => _seen.Count;
public void Clear() => _seen.Clear();
}


@@ -0,0 +1,535 @@
using System.Collections.Immutable;
using System.Security.Cryptography;
using System.Text;
namespace StellaOps.Policy.Engine.IncrementalOrchestrator;
/// <summary>
/// Types of policy-relevant changes that trigger re-evaluation.
/// </summary>
public enum PolicyChangeType
{
/// <summary>Advisory was created or updated.</summary>
AdvisoryUpdated,
/// <summary>Advisory was retracted/withdrawn.</summary>
AdvisoryRetracted,
/// <summary>VEX statement was added or modified.</summary>
VexStatementUpdated,
/// <summary>VEX conflict detected.</summary>
VexConflictDetected,
/// <summary>SBOM was ingested or updated.</summary>
SbomUpdated,
/// <summary>SBOM component changed.</summary>
SbomComponentChanged,
/// <summary>Policy version was published.</summary>
PolicyVersionPublished,
/// <summary>Manual re-evaluation triggered.</summary>
ManualTrigger
}
/// <summary>
/// Priority levels for change processing.
/// </summary>
public enum PolicyChangePriority
{
/// <summary>Normal priority - standard processing.</summary>
Normal = 0,
/// <summary>High priority - process sooner.</summary>
High = 1,
/// <summary>Emergency - immediate processing (e.g., KEV addition).</summary>
Emergency = 2
}
/// <summary>
/// Represents a change event that may trigger policy re-evaluation.
/// </summary>
public sealed record PolicyChangeEvent
{
/// <summary>
/// Unique event identifier (deterministic based on content).
/// </summary>
public required string EventId { get; init; }
/// <summary>
/// Type of change.
/// </summary>
public required PolicyChangeType ChangeType { get; init; }
/// <summary>
/// Tenant context for the change.
/// </summary>
public required string TenantId { get; init; }
/// <summary>
/// Timestamp when the change occurred (from source system).
/// </summary>
public required DateTimeOffset OccurredAt { get; init; }
/// <summary>
/// Timestamp when the event was created.
/// </summary>
public required DateTimeOffset CreatedAt { get; init; }
/// <summary>
/// Processing priority.
/// </summary>
public required PolicyChangePriority Priority { get; init; }
/// <summary>
/// Source system that produced the change.
/// </summary>
public required string Source { get; init; }
/// <summary>
/// Correlation ID for tracing.
/// </summary>
public string? CorrelationId { get; init; }
/// <summary>
/// Advisory ID (for advisory/VEX changes).
/// </summary>
public string? AdvisoryId { get; init; }
/// <summary>
/// Vulnerability ID (CVE, GHSA, etc.).
/// </summary>
public string? VulnerabilityId { get; init; }
/// <summary>
/// Affected PURLs (package URLs).
/// </summary>
public ImmutableArray<string> AffectedPurls { get; init; } = ImmutableArray<string>.Empty;
/// <summary>
/// Affected product keys (for SBOM targeting).
/// </summary>
public ImmutableArray<string> AffectedProductKeys { get; init; } = ImmutableArray<string>.Empty;
/// <summary>
/// Affected SBOM IDs (for direct targeting).
/// </summary>
public ImmutableArray<string> AffectedSbomIds { get; init; } = ImmutableArray<string>.Empty;
/// <summary>
/// Policy IDs to re-evaluate (empty = all applicable).
/// </summary>
public ImmutableArray<string> PolicyIds { get; init; } = ImmutableArray<string>.Empty;
/// <summary>
/// Additional metadata for the change.
/// </summary>
public ImmutableDictionary<string, string> Metadata { get; init; } =
ImmutableDictionary<string, string>.Empty;
/// <summary>
/// Content hash for deduplication.
/// </summary>
public required string ContentHash { get; init; }
/// <summary>
/// Computes a deterministic content hash for deduplication.
/// </summary>
public static string ComputeContentHash(
PolicyChangeType changeType,
string tenantId,
string? advisoryId,
string? vulnerabilityId,
IEnumerable<string>? affectedPurls,
IEnumerable<string>? affectedProductKeys,
IEnumerable<string>? affectedSbomIds)
{
var builder = new StringBuilder();
builder.Append(changeType.ToString()).Append('|');
builder.Append(tenantId.ToLowerInvariant()).Append('|');
builder.Append(advisoryId ?? string.Empty).Append('|');
builder.Append(vulnerabilityId ?? string.Empty).Append('|');
// Deterministic ordering
var purls = (affectedPurls ?? Enumerable.Empty<string>())
.Where(p => !string.IsNullOrWhiteSpace(p))
.Select(p => p.Trim())
.OrderBy(p => p, StringComparer.Ordinal);
var productKeys = (affectedProductKeys ?? Enumerable.Empty<string>())
.Where(k => !string.IsNullOrWhiteSpace(k))
.Select(k => k.Trim())
.OrderBy(k => k, StringComparer.Ordinal);
var sbomIds = (affectedSbomIds ?? Enumerable.Empty<string>())
.Where(s => !string.IsNullOrWhiteSpace(s))
.Select(s => s.Trim())
.OrderBy(s => s, StringComparer.Ordinal);
foreach (var purl in purls)
{
builder.Append("purl:").Append(purl).Append('|');
}
foreach (var key in productKeys)
{
builder.Append("pk:").Append(key).Append('|');
}
foreach (var id in sbomIds)
{
builder.Append("sbom:").Append(id).Append('|');
}
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(builder.ToString()));
return Convert.ToHexStringLower(bytes);
}
/// <summary>
/// Creates a deterministic event ID.
/// </summary>
public static string CreateEventId(
string tenantId,
PolicyChangeType changeType,
string source,
DateTimeOffset occurredAt,
string contentHash)
{
var seed = $"{tenantId}|{changeType}|{source}|{occurredAt:O}|{contentHash}";
var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(seed));
return $"pce-{Convert.ToHexStringLower(bytes)[..16]}";
}
}
/// <summary>
/// Factory for creating normalized policy change events.
/// </summary>
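/// <example>
/// Sketch of creating an advisory-update event; the identifiers and source name are placeholders:
/// <code>
/// var evt = PolicyChangeEventFactory.CreateAdvisoryUpdated(
///     tenantId: "Tenant-A",                              // normalized to "tenant-a"
///     advisoryId: "ADV-2025-0001",
///     vulnerabilityId: "CVE-2025-0001",
///     affectedPurls: new[] { "pkg:npm/example@1.0.0" },
///     source: "advisory-ingest",
///     occurredAt: DateTimeOffset.UtcNow.AddMinutes(-5),
///     createdAt: DateTimeOffset.UtcNow,
///     priority: PolicyChangePriority.High);
/// // evt.EventId and evt.ContentHash are deterministic for identical inputs.
/// </code>
/// </example>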
public static class PolicyChangeEventFactory
{
/// <summary>
/// Creates an advisory update event.
/// </summary>
public static PolicyChangeEvent CreateAdvisoryUpdated(
string tenantId,
string advisoryId,
string? vulnerabilityId,
IEnumerable<string> affectedPurls,
string source,
DateTimeOffset occurredAt,
DateTimeOffset createdAt,
PolicyChangePriority priority = PolicyChangePriority.Normal,
string? correlationId = null,
ImmutableDictionary<string, string>? metadata = null)
{
var normalizedTenant = NormalizeTenant(tenantId);
var normalizedAdvisoryId = Normalize(advisoryId, nameof(advisoryId));
var normalizedVulnId = vulnerabilityId?.Trim();
var normalizedPurls = NormalizePurls(affectedPurls);
var contentHash = PolicyChangeEvent.ComputeContentHash(
PolicyChangeType.AdvisoryUpdated,
normalizedTenant,
normalizedAdvisoryId,
normalizedVulnId,
normalizedPurls,
null,
null);
var eventId = PolicyChangeEvent.CreateEventId(
normalizedTenant,
PolicyChangeType.AdvisoryUpdated,
source,
occurredAt,
contentHash);
return new PolicyChangeEvent
{
EventId = eventId,
ChangeType = PolicyChangeType.AdvisoryUpdated,
TenantId = normalizedTenant,
OccurredAt = occurredAt,
CreatedAt = createdAt,
Priority = priority,
Source = source,
CorrelationId = correlationId,
AdvisoryId = normalizedAdvisoryId,
VulnerabilityId = normalizedVulnId,
AffectedPurls = normalizedPurls,
ContentHash = contentHash,
Metadata = metadata ?? ImmutableDictionary<string, string>.Empty
};
}
/// <summary>
/// Creates a VEX statement update event.
/// </summary>
public static PolicyChangeEvent CreateVexUpdated(
string tenantId,
string vulnerabilityId,
IEnumerable<string> affectedProductKeys,
string source,
DateTimeOffset occurredAt,
DateTimeOffset createdAt,
PolicyChangePriority priority = PolicyChangePriority.Normal,
string? correlationId = null,
ImmutableDictionary<string, string>? metadata = null)
{
var normalizedTenant = NormalizeTenant(tenantId);
var normalizedVulnId = Normalize(vulnerabilityId, nameof(vulnerabilityId));
var normalizedKeys = NormalizeProductKeys(affectedProductKeys);
var contentHash = PolicyChangeEvent.ComputeContentHash(
PolicyChangeType.VexStatementUpdated,
normalizedTenant,
null,
normalizedVulnId,
null,
normalizedKeys,
null);
var eventId = PolicyChangeEvent.CreateEventId(
normalizedTenant,
PolicyChangeType.VexStatementUpdated,
source,
occurredAt,
contentHash);
return new PolicyChangeEvent
{
EventId = eventId,
ChangeType = PolicyChangeType.VexStatementUpdated,
TenantId = normalizedTenant,
OccurredAt = occurredAt,
CreatedAt = createdAt,
Priority = priority,
Source = source,
CorrelationId = correlationId,
VulnerabilityId = normalizedVulnId,
AffectedProductKeys = normalizedKeys,
ContentHash = contentHash,
Metadata = metadata ?? ImmutableDictionary<string, string>.Empty
};
}
/// <summary>
/// Creates an SBOM update event.
/// </summary>
public static PolicyChangeEvent CreateSbomUpdated(
string tenantId,
string sbomId,
string productKey,
IEnumerable<string> componentPurls,
string source,
DateTimeOffset occurredAt,
DateTimeOffset createdAt,
PolicyChangePriority priority = PolicyChangePriority.Normal,
string? correlationId = null,
ImmutableDictionary<string, string>? metadata = null)
{
var normalizedTenant = NormalizeTenant(tenantId);
var normalizedSbomId = Normalize(sbomId, nameof(sbomId));
var normalizedProductKey = Normalize(productKey, nameof(productKey));
var normalizedPurls = NormalizePurls(componentPurls);
var contentHash = PolicyChangeEvent.ComputeContentHash(
PolicyChangeType.SbomUpdated,
normalizedTenant,
null,
null,
normalizedPurls,
ImmutableArray.Create(normalizedProductKey),
ImmutableArray.Create(normalizedSbomId));
var eventId = PolicyChangeEvent.CreateEventId(
normalizedTenant,
PolicyChangeType.SbomUpdated,
source,
occurredAt,
contentHash);
return new PolicyChangeEvent
{
EventId = eventId,
ChangeType = PolicyChangeType.SbomUpdated,
TenantId = normalizedTenant,
OccurredAt = occurredAt,
CreatedAt = createdAt,
Priority = priority,
Source = source,
CorrelationId = correlationId,
AffectedPurls = normalizedPurls,
AffectedProductKeys = ImmutableArray.Create(normalizedProductKey),
AffectedSbomIds = ImmutableArray.Create(normalizedSbomId),
ContentHash = contentHash,
Metadata = metadata ?? ImmutableDictionary<string, string>.Empty
};
}
/// <summary>
/// Creates a manual trigger event.
/// </summary>
public static PolicyChangeEvent CreateManualTrigger(
string tenantId,
IEnumerable<string>? policyIds,
IEnumerable<string>? sbomIds,
IEnumerable<string>? productKeys,
string requestedBy,
DateTimeOffset createdAt,
PolicyChangePriority priority = PolicyChangePriority.Normal,
string? correlationId = null,
ImmutableDictionary<string, string>? metadata = null)
{
var normalizedTenant = NormalizeTenant(tenantId);
var normalizedPolicyIds = NormalizePolicyIds(policyIds);
var normalizedSbomIds = NormalizeSbomIds(sbomIds);
var normalizedProductKeys = NormalizeProductKeys(productKeys);
var contentHash = PolicyChangeEvent.ComputeContentHash(
PolicyChangeType.ManualTrigger,
normalizedTenant,
null,
null,
null,
normalizedProductKeys,
normalizedSbomIds);
var eventId = PolicyChangeEvent.CreateEventId(
normalizedTenant,
PolicyChangeType.ManualTrigger,
"manual",
createdAt,
contentHash);
return new PolicyChangeEvent
{
EventId = eventId,
ChangeType = PolicyChangeType.ManualTrigger,
TenantId = normalizedTenant,
OccurredAt = createdAt,
CreatedAt = createdAt,
Priority = priority,
Source = "manual",
CorrelationId = correlationId,
PolicyIds = normalizedPolicyIds,
AffectedProductKeys = normalizedProductKeys,
AffectedSbomIds = normalizedSbomIds,
ContentHash = contentHash,
Metadata = (metadata ?? ImmutableDictionary<string, string>.Empty)
.SetItem("requestedBy", requestedBy)
};
}
private static string NormalizeTenant(string tenantId)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant ID cannot be null or whitespace", nameof(tenantId));
}
return tenantId.Trim().ToLowerInvariant();
}
private static string Normalize(string value, string name)
{
if (string.IsNullOrWhiteSpace(value))
{
throw new ArgumentException($"{name} cannot be null or whitespace", name);
}
return value.Trim();
}
private static ImmutableArray<string> NormalizePurls(IEnumerable<string>? purls)
{
return (purls ?? Enumerable.Empty<string>())
.Where(p => !string.IsNullOrWhiteSpace(p))
.Select(p => p.Trim())
.Distinct(StringComparer.Ordinal)
.OrderBy(p => p, StringComparer.Ordinal)
.ToImmutableArray();
}
private static ImmutableArray<string> NormalizeProductKeys(IEnumerable<string>? keys)
{
return (keys ?? Enumerable.Empty<string>())
.Where(k => !string.IsNullOrWhiteSpace(k))
.Select(k => k.Trim())
.Distinct(StringComparer.OrdinalIgnoreCase)
.OrderBy(k => k, StringComparer.OrdinalIgnoreCase)
.ToImmutableArray();
}
private static ImmutableArray<string> NormalizeSbomIds(IEnumerable<string>? ids)
{
return (ids ?? Enumerable.Empty<string>())
.Where(id => !string.IsNullOrWhiteSpace(id))
.Select(id => id.Trim())
.Distinct(StringComparer.Ordinal)
.OrderBy(id => id, StringComparer.Ordinal)
.ToImmutableArray();
}
private static ImmutableArray<string> NormalizePolicyIds(IEnumerable<string>? ids)
{
return (ids ?? Enumerable.Empty<string>())
.Where(id => !string.IsNullOrWhiteSpace(id))
.Select(id => id.Trim())
.Distinct(StringComparer.OrdinalIgnoreCase)
.OrderBy(id => id, StringComparer.OrdinalIgnoreCase)
.ToImmutableArray();
}
}
/// <summary>
/// A batch of change events to be processed together.
/// </summary>
public sealed record PolicyChangeBatch
{
/// <summary>
/// Unique batch identifier.
/// </summary>
public required string BatchId { get; init; }
/// <summary>
/// Tenant context.
/// </summary>
public required string TenantId { get; init; }
/// <summary>
/// Events in this batch (deterministically ordered).
/// </summary>
public required ImmutableArray<PolicyChangeEvent> Events { get; init; }
/// <summary>
/// Highest priority in the batch.
/// </summary>
public required PolicyChangePriority Priority { get; init; }
/// <summary>
/// When the batch was created.
/// </summary>
public required DateTimeOffset CreatedAt { get; init; }
/// <summary>
/// Combined affected PURLs from all events.
/// </summary>
public required ImmutableArray<string> AffectedPurls { get; init; }
/// <summary>
/// Combined affected product keys from all events.
/// </summary>
public required ImmutableArray<string> AffectedProductKeys { get; init; }
/// <summary>
/// Combined affected SBOM IDs from all events.
/// </summary>
public required ImmutableArray<string> AffectedSbomIds { get; init; }
/// <summary>
/// Combined vulnerability IDs from all events.
/// </summary>
public required ImmutableArray<string> VulnerabilityIds { get; init; }
}