Files
git.stella-ops.org/src/Scanner/StellaOps.Scanner.Worker/Processing/EpssSignalJob.cs
master 0dc71e760a feat: Add PathViewer and RiskDriftCard components with templates and styles
- Implemented PathViewerComponent for visualizing reachability call paths.
- Added RiskDriftCardComponent to display reachability drift results.
- Created corresponding HTML templates and SCSS styles for both components.
- Introduced test fixtures for reachability analysis in JSON format.
- Enhanced user interaction with collapsible and expandable features in PathViewer.
- Included risk trend visualization and summary metrics in RiskDriftCard.
2025-12-18 18:35:30 +02:00

506 lines
17 KiB
C#

// -----------------------------------------------------------------------------
// EpssSignalJob.cs
// Sprint: SPRINT_3413_0001_0001_epss_live_enrichment
// Tasks: S5-S10 - Signal generation service
// Description: Background job that generates tenant-scoped EPSS signals.
// -----------------------------------------------------------------------------
using System.Diagnostics;
using System.Text.Json;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scanner.Core.Epss;
using StellaOps.Scanner.Storage.Epss;
using StellaOps.Scanner.Storage.Repositories;
namespace StellaOps.Scanner.Worker.Processing;
/// <summary>
/// Settings that control the EPSS signal generation background job.
/// </summary>
public sealed class EpssSignalOptions
{
    /// <summary>
    /// Name of the configuration section these options are identified by.
    /// </summary>
    public const string SectionName = "Epss:Signal";

    /// <summary>
    /// Gets or sets whether the signal job runs at all. Defaults to <c>true</c>.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Gets or sets how long the job waits after enrichment before it generates signals.
    /// Defaults to 30 seconds.
    /// </summary>
    public TimeSpan PostEnrichmentDelay { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Gets or sets how many changes are turned into signals per batch. Defaults to 500.
    /// </summary>
    public int BatchSize { get; set; } = 500;

    /// <summary>
    /// Gets or sets the number of days signals are retained before pruning. Defaults to 90.
    /// </summary>
    public int RetentionDays { get; set; } = 90;
}
/// <summary>
/// Well-known event type names carried by EPSS signals.
/// </summary>
public static class EpssSignalEventTypes
{
    /// <summary>
    /// The score rose significantly (delta at or above the threshold).
    /// </summary>
    public const string RiskSpike = "RISK_SPIKE";

    /// <summary>
    /// The priority band changed (for example MEDIUM to HIGH).
    /// </summary>
    public const string BandChange = "BAND_CHANGE";

    /// <summary>
    /// A CVE received a score for the first time.
    /// </summary>
    public const string NewHigh = "NEW_HIGH";

    /// <summary>
    /// A CVE fell from HIGH/CRITICAL down to LOW.
    /// </summary>
    public const string DroppedLow = "DROPPED_LOW";

    /// <summary>
    /// The EPSS model version changed; used for the summary event.
    /// </summary>
    public const string ModelUpdated = "MODEL_UPDATED";
}
/// <summary>
/// Background service that generates tenant-scoped EPSS signals.
/// Only generates signals for CVEs that are observed in tenant's inventory.
/// </summary>
public sealed class EpssSignalJob : BackgroundService
{
// Collaborators supplied through the constructor (each is null-checked there).
// NOTE(review): _epssRepository is not read by any method visible in this file;
// presumably it is reserved for the GetEpssChangesAsync TODO - confirm.
private readonly IEpssRepository _epssRepository;
// Persists generated signals (CreateBulkAsync / CreateAsync) and prunes old ones (PruneAsync).
private readonly IEpssSignalRepository _signalRepository;
// Supplies the active tenant list and filters CVE ids down to those a tenant has observed.
private readonly IObservedCveRepository _observedCveRepository;
// Publishes generated signal batches after they have been stored.
private readonly IEpssSignalPublisher _signalPublisher;
// Source of the latest EPSS model date.
private readonly IEpssProvider _epssProvider;
private readonly IOptions<EpssSignalOptions> _options;
private readonly TimeProvider _timeProvider;
private readonly ILogger<EpssSignalJob> _logger;
// Activity source used to trace each signal generation run ("epss.signal.generate").
private readonly ActivitySource _activitySource = new("StellaOps.Scanner.EpssSignal");
// Trigger for signal generation: starts with a count of 0, so ExecuteAsync blocks on
// WaitAsync until TriggerSignalGeneration releases it. Every release permits one run.
private readonly SemaphoreSlim _signalTrigger = new(0);
// Model version seen by the previous generation run; null until the first run.
// Compared (ordinal) against the current version to detect model version changes.
// NOTE(review): read and written without synchronization by GenerateSignalsAsync, which is
// public - assumes runs are never concurrent; confirm against callers.
private string? _lastModelVersion;
/// <summary>
/// Creates the signal job from its collaborators.
/// </summary>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
public EpssSignalJob(
    IEpssRepository epssRepository,
    IEpssSignalRepository signalRepository,
    IObservedCveRepository observedCveRepository,
    IEpssSignalPublisher signalPublisher,
    IEpssProvider epssProvider,
    IOptions<EpssSignalOptions> options,
    TimeProvider timeProvider,
    ILogger<EpssSignalJob> logger)
{
    // Validate in declaration order so the first null argument is the one reported.
    ArgumentNullException.ThrowIfNull(epssRepository);
    ArgumentNullException.ThrowIfNull(signalRepository);
    ArgumentNullException.ThrowIfNull(observedCveRepository);
    ArgumentNullException.ThrowIfNull(signalPublisher);
    ArgumentNullException.ThrowIfNull(epssProvider);
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(timeProvider);
    ArgumentNullException.ThrowIfNull(logger);

    _epssRepository = epssRepository;
    _signalRepository = signalRepository;
    _observedCveRepository = observedCveRepository;
    _signalPublisher = signalPublisher;
    _epssProvider = epssProvider;
    _options = options;
    _timeProvider = timeProvider;
    _logger = logger;
}
/// <summary>
/// Main loop of the background service: waits for a trigger, waits the configured
/// post-enrichment delay, generates signals and then prunes old signals.
/// </summary>
/// <remarks>
/// Returns immediately when the job is disabled. A failure in one cycle is logged and the
/// loop then waits for the next trigger; pruning is skipped for the failed cycle.
/// </remarks>
/// <param name="stoppingToken">Signals that the host is shutting down.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("EPSS signal job started");
    var opts = _options.Value;
    if (!opts.Enabled)
    {
        _logger.LogInformation("EPSS signal job is disabled");
        return;
    }

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            // Wait for signal trigger or cancellation
            await _signalTrigger.WaitAsync(stoppingToken);

            // Add delay after enrichment to ensure data consistency. The delay runs on the
            // injected TimeProvider so that a fake clock can drive it in tests.
            await Task.Delay(opts.PostEnrichmentDelay, _timeProvider, stoppingToken);

            await GenerateSignalsAsync(stoppingToken);

            // Periodic pruning of old signals
            await _signalRepository.PruneAsync(opts.RetentionDays, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host shutdown: leave the loop without reporting an error.
            break;
        }
        catch (Exception ex)
        {
            // Keep the service alive; the next trigger starts a fresh cycle.
            _logger.LogError(ex, "EPSS signal job encountered an error");
        }
    }

    _logger.LogInformation("EPSS signal job stopped");
}
/// <summary>
/// Requests one signal generation run. Intended to be called once EPSS enrichment has completed.
/// </summary>
/// <remarks>
/// Releases the trigger semaphore that <see cref="ExecuteAsync"/> waits on, then logs at debug level.
/// </remarks>
public void TriggerSignalGeneration()
{
    _signalTrigger.Release();

    _logger.LogDebug("EPSS signal generation triggered");
}
/// <summary>
/// Generates signals for all tenants based on EPSS changes.
/// </summary>
/// <remarks>
/// Steps: resolve the latest model date, detect a model version change against the previous
/// run, load the changes for that date, and then for each active tenant keep only the changes
/// whose CVE the tenant has observed, store and publish the resulting signals in batches and,
/// on a model version change, emit one MODEL_UPDATED summary signal. When no active tenant
/// exists, a single pass is made for <see cref="Guid.Empty"/>.
/// </remarks>
/// <param name="cancellationToken">Token used to cancel the run.</param>
/// <returns>A task that completes when the run has finished.</returns>
public async Task GenerateSignalsAsync(CancellationToken cancellationToken = default)
{
    using var activity = _activitySource.StartActivity("epss.signal.generate", ActivityKind.Internal);
    var stopwatch = Stopwatch.StartNew();
    var opts = _options.Value;
    _logger.LogInformation("Starting EPSS signal generation");
    try
    {
        // Get current model date
        var modelDate = await _epssProvider.GetLatestModelDateAsync(cancellationToken);
        if (!modelDate.HasValue)
        {
            _logger.LogWarning("No EPSS data available for signal generation");
            return;
        }

        // Invariant culture keeps the tag value independent of the host's calendar settings.
        activity?.SetTag(
            "epss.model_date",
            modelDate.Value.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture));

        // Check for model version change (S7). The previous version is captured before the
        // field is overwritten so the summary signal can report the real old version.
        var previousModelVersion = _lastModelVersion;
        var currentModelVersion = await GetCurrentModelVersionAsync(modelDate.Value, cancellationToken);
        var isModelChange = previousModelVersion is not null &&
            !string.Equals(previousModelVersion, currentModelVersion, StringComparison.Ordinal);
        if (isModelChange)
        {
            _logger.LogInformation(
                "EPSS model version changed: {OldVersion} -> {NewVersion}",
                previousModelVersion,
                currentModelVersion);
        }

        _lastModelVersion = currentModelVersion;

        // Get changes from epss_changes table
        var changes = await GetEpssChangesAsync(modelDate.Value, cancellationToken);
        if (changes.Count == 0)
        {
            _logger.LogDebug("No EPSS changes to process for signals");
            return;
        }

        _logger.LogInformation("Processing {Count} EPSS changes for signal generation", changes.Count);
        activity?.SetTag("epss.change_count", changes.Count);
        var totalSignals = 0;
        var filteredCount = 0;

        // Get all active tenants (S6)
        var activeTenants = await _observedCveRepository.GetActiveTenantsAsync(cancellationToken);
        if (activeTenants.Count == 0)
        {
            _logger.LogDebug("No active tenants found; using default tenant");
            activeTenants = new[] { Guid.Empty };
        }

        // The set of changed CVE ids does not depend on the tenant, so build it once.
        var changeCveIds = changes.Select(c => c.CveId).Distinct().ToList();

        // For each tenant, filter changes to only observed CVEs
        foreach (var tenantId in activeTenants)
        {
            // Filter to only observed CVEs for this tenant (S6)
            var observedCves = await _observedCveRepository.FilterObservedAsync(
                tenantId,
                changeCveIds,
                cancellationToken);
            var tenantChanges = changes
                .Where(c => observedCves.Contains(c.CveId))
                .ToArray();

            // Count filtered changes before the early exit so that tenants with nothing
            // observed still contribute to the metric.
            filteredCount += changes.Count - tenantChanges.Length;
            if (tenantChanges.Length == 0)
            {
                continue;
            }

            foreach (var batch in tenantChanges.Chunk(opts.BatchSize))
            {
                var signals = GenerateSignalsForBatch(
                    batch,
                    tenantId,
                    modelDate.Value,
                    currentModelVersion,
                    isModelChange);
                if (signals.Count > 0)
                {
                    // Store signals in database
                    var created = await _signalRepository.CreateBulkAsync(signals, cancellationToken);
                    totalSignals += created;

                    // Publish signals to notification system (S9)
                    var published = await _signalPublisher.PublishBatchAsync(signals, cancellationToken);
                    _logger.LogDebug(
                        "Published {Published}/{Total} EPSS signals for tenant {TenantId}",
                        published,
                        signals.Count,
                        tenantId);
                }
            }

            // If model changed, emit summary signal per tenant (S8)
            if (isModelChange)
            {
                await EmitModelUpdatedSignalAsync(
                    tenantId,
                    modelDate.Value,
                    previousModelVersion!,
                    currentModelVersion!,
                    tenantChanges.Length,
                    cancellationToken);
                totalSignals++;
            }
        }

        stopwatch.Stop();
        _logger.LogInformation(
            "EPSS signal generation completed: signals={SignalCount}, changes={ChangeCount}, filtered={FilteredCount}, tenants={TenantCount}, duration={Duration}ms",
            totalSignals,
            changes.Count,
            filteredCount,
            activeTenants.Count,
            stopwatch.ElapsedMilliseconds);
        activity?.SetTag("epss.signal_count", totalSignals);
        activity?.SetTag("epss.filtered_count", filteredCount);
        activity?.SetTag("epss.tenant_count", activeTenants.Count);
        activity?.SetTag("epss.duration_ms", stopwatch.ElapsedMilliseconds);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Requested cancellation is not a failure: propagate it without an error log.
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "EPSS signal generation failed");
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        throw;
    }
}
/// <summary>
/// Converts one batch of change records into signals for a single tenant.
/// </summary>
/// <remarks>
/// A change yields no signal when it is suppressed because of a model version change or when
/// no event type can be derived from its flags. The percentile passed to the explain hash
/// is always 0 here.
/// </remarks>
/// <param name="changes">Change records of the batch.</param>
/// <param name="tenantId">Tenant the signals are created for.</param>
/// <param name="modelDate">Model date the changes belong to.</param>
/// <param name="modelVersion">Model version recorded on every signal.</param>
/// <param name="isModelChange">Whether this run detected a model version change.</param>
/// <returns>The generated signals, in the order of the kept changes.</returns>
private IReadOnlyList<EpssSignal> GenerateSignalsForBatch(
    EpssChangeRecord[] changes,
    Guid tenantId,
    DateOnly modelDate,
    string? modelVersion,
    bool isModelChange)
{
    var result = new List<EpssSignal>();

    foreach (var record in changes)
    {
        // Skip generating individual signals on model change day if suppression is enabled
        // (would check tenant config in production)
        var suppressed = isModelChange && ShouldSuppressOnModelChange(record);
        if (suppressed)
        {
            continue;
        }

        var kind = DetermineEventType(record);
        if (string.IsNullOrEmpty(kind))
        {
            continue;
        }

        // Band names are needed by the dedupe key, the explain hash, the payload and the signal.
        var previousBandName = record.PreviousBand.ToString();
        var newBandName = ComputeNewBand(record).ToString();

        var dedupeKey = EpssExplainHashCalculator.ComputeDedupeKey(
            modelDate,
            record.CveId,
            kind,
            previousBandName,
            newBandName);

        var explainHash = EpssExplainHashCalculator.ComputeExplainHash(
            modelDate,
            record.CveId,
            kind,
            previousBandName,
            newBandName,
            record.NewScore,
            0, // Percentile would come from EPSS data
            modelVersion);

        var payload = JsonSerializer.Serialize(new
        {
            cveId = record.CveId,
            oldScore = record.PreviousScore,
            newScore = record.NewScore,
            oldBand = previousBandName,
            newBand = newBandName,
            flags = record.Flags.ToString(),
            modelVersion
        });

        var signal = new EpssSignal
        {
            TenantId = tenantId,
            ModelDate = modelDate,
            CveId = record.CveId,
            EventType = kind,
            RiskBand = newBandName,
            EpssScore = record.NewScore,
            // A missing previous score counts as 0 for the delta.
            EpssDelta = record.NewScore - (record.PreviousScore ?? 0),
            IsModelChange = isModelChange,
            ModelVersion = modelVersion,
            DedupeKey = dedupeKey,
            ExplainHash = explainHash,
            Payload = payload
        };
        result.Add(signal);
    }

    return result;
}
/// <summary>
/// Maps the flags of a change record to a signal event type.
/// </summary>
/// <remarks>
/// Flags are checked in a fixed order and the first match wins: NewScored, CrossedHigh,
/// BigJumpUp, DroppedLow.
/// </remarks>
/// <param name="change">The change record whose flags are inspected.</param>
/// <returns>The event type name, or <c>null</c> when none of the checked flags is set.</returns>
private static string? DetermineEventType(EpssChangeRecord change)
{
    var flags = change.Flags;

    return flags.HasFlag(EpssChangeFlags.NewScored) ? EpssSignalEventTypes.NewHigh
        : flags.HasFlag(EpssChangeFlags.CrossedHigh) ? EpssSignalEventTypes.BandChange
        : flags.HasFlag(EpssChangeFlags.BigJumpUp) ? EpssSignalEventTypes.RiskSpike
        : flags.HasFlag(EpssChangeFlags.DroppedLow) ? EpssSignalEventTypes.DroppedLow
        : null;
}
/// <summary>
/// Derives the priority band from the new score of a change record.
/// </summary>
/// <remarks>
/// Simplified band calculation - would use EpssPriorityCalculator in production.
/// Thresholds are inclusive: 0.5 and above is Critical, 0.2 and above is High,
/// 0.05 and above is Medium, anything else is Low.
/// </remarks>
/// <param name="change">The change record providing the new score.</param>
/// <returns>The band matching the new score.</returns>
private static EpssPriorityBand ComputeNewBand(EpssChangeRecord change)
{
    var score = change.NewScore;

    return score >= 0.5 ? EpssPriorityBand.Critical
        : score >= 0.2 ? EpssPriorityBand.High
        : score >= 0.05 ? EpssPriorityBand.Medium
        : EpssPriorityBand.Low;
}
/// <summary>
/// Decides whether the individual signal of a change is dropped when the model version changed.
/// </summary>
/// <param name="change">The change record whose flags are inspected.</param>
/// <returns>
/// <c>true</c> when the change carries the BigJumpUp, BigJumpDown or CrossedHigh flag.
/// </returns>
private static bool ShouldSuppressOnModelChange(EpssChangeRecord change)
{
    // Suppress RISK_SPIKE and BAND_CHANGE on model change days to avoid alert storms
    var flags = change.Flags;

    if (flags.HasFlag(EpssChangeFlags.BigJumpUp))
    {
        return true;
    }

    if (flags.HasFlag(EpssChangeFlags.BigJumpDown))
    {
        return true;
    }

    return flags.HasFlag(EpssChangeFlags.CrossedHigh);
}
/// <summary>
/// Returns the model version for the given model date.
/// </summary>
/// <remarks>
/// Placeholder: the version is derived from the date as <c>vyyyy.MM.dd</c>. It would be
/// queried from the epss_import_run or epss_raw table instead. The date is formatted with
/// the invariant culture so the value does not change with the host's calendar settings.
/// </remarks>
/// <param name="modelDate">Model date the version is derived from.</param>
/// <param name="cancellationToken">Currently unused; kept for the future query.</param>
/// <returns>A completed task holding the version string.</returns>
private Task<string?> GetCurrentModelVersionAsync(DateOnly modelDate, CancellationToken cancellationToken)
{
    // No awaitable work yet, so return a completed task instead of an async state machine.
    var datePart = modelDate.ToString("yyyy.MM.dd", System.Globalization.CultureInfo.InvariantCulture);
    return Task.FromResult<string?>("v" + datePart);
}
/// <summary>
/// Loads the EPSS change records for the given model date.
/// </summary>
/// <remarks>
/// Stub: always yields an empty list, so signal generation currently stops at its
/// "no changes" exit.
/// </remarks>
/// <param name="modelDate">Model date to load changes for (unused by the stub).</param>
/// <param name="cancellationToken">Token for the future query (unused by the stub).</param>
/// <returns>A completed task holding an empty list.</returns>
private Task<IReadOnlyList<EpssChangeRecord>> GetEpssChangesAsync(
    DateOnly modelDate,
    CancellationToken cancellationToken)
{
    // TODO: Implement repository method to get changes from epss_changes table
    // For now, return empty list. There is nothing to await, so a completed task is returned
    // rather than declaring the method async.
    return Task.FromResult<IReadOnlyList<EpssChangeRecord>>(Array.Empty<EpssChangeRecord>());
}
/// <summary>
/// Stores one MODEL_UPDATED summary signal for a tenant.
/// </summary>
/// <remarks>
/// The signal uses the fixed CVE id <c>MODEL_UPDATE</c> and is written through the signal
/// repository only.
/// NOTE(review): unlike batch signals it is not handed to the signal publisher - confirm
/// that this is intended.
/// </remarks>
/// <param name="tenantId">Tenant the summary is created for.</param>
/// <param name="modelDate">Model date of the run.</param>
/// <param name="oldVersion">Model version seen by the previous run.</param>
/// <param name="newVersion">Model version of the current run.</param>
/// <param name="affectedCveCount">Number of changes reported in the payload and the log.</param>
/// <param name="cancellationToken">Token used to cancel the repository call.</param>
private async Task EmitModelUpdatedSignalAsync(
    Guid tenantId,
    DateOnly modelDate,
    string oldVersion,
    string newVersion,
    int affectedCveCount,
    CancellationToken cancellationToken)
{
    var payload = JsonSerializer.Serialize(new
    {
        oldVersion,
        newVersion,
        affectedCveCount,
        suppressedSignals = true
    });

    // The dedupe key must be identical on every host, so the date is formatted with the
    // invariant culture instead of the current one (whose calendar may differ).
    var modelDateText = modelDate.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture);

    var signal = new EpssSignal
    {
        TenantId = tenantId,
        ModelDate = modelDate,
        CveId = "MODEL_UPDATE",
        EventType = EpssSignalEventTypes.ModelUpdated,
        IsModelChange = true,
        ModelVersion = newVersion,
        DedupeKey = $"{modelDateText}:MODEL_UPDATE:{oldVersion}->{newVersion}",
        // The band slots of the hash carry the old and new version; score and percentile are 0.
        ExplainHash = EpssExplainHashCalculator.ComputeExplainHash(
            modelDate,
            "MODEL_UPDATE",
            EpssSignalEventTypes.ModelUpdated,
            oldVersion,
            newVersion,
            0,
            0,
            newVersion),
        Payload = payload
    };

    await _signalRepository.CreateAsync(signal, cancellationToken);

    _logger.LogInformation(
        "Emitted MODEL_UPDATED signal: {OldVersion} -> {NewVersion}, affected {Count} CVEs",
        oldVersion,
        newVersion,
        affectedCveCount);
}
}