save work
This commit is contained in:
@@ -0,0 +1,11 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Diagnostics;
|
||||
|
||||
public static class EpssWorkerInstrumentation
|
||||
{
|
||||
public const string MeterName = "StellaOps.Scanner.Epss";
|
||||
|
||||
public static Meter Meter { get; } = new(MeterName, version: "1.0.0");
|
||||
}
|
||||
|
||||
@@ -6,10 +6,12 @@
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
using System.Diagnostics;
|
||||
using System.Diagnostics.Metrics;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Scanner.Core.Epss;
|
||||
using StellaOps.Scanner.Worker.Diagnostics;
|
||||
using StellaOps.Scanner.Storage.Epss;
|
||||
using StellaOps.Scanner.Storage.Repositories;
|
||||
|
||||
@@ -73,11 +75,6 @@ public sealed class EpssEnrichmentOptions
|
||||
EpssChangeFlags.CrossedHigh |
|
||||
EpssChangeFlags.BigJumpUp |
|
||||
EpssChangeFlags.BigJumpDown;
|
||||
|
||||
/// <summary>
|
||||
/// Suppress signals on model version change. Default: true.
|
||||
/// </summary>
|
||||
public bool SuppressSignalsOnModelChange { get; set; } = true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -86,9 +83,27 @@ public sealed class EpssEnrichmentOptions
|
||||
/// </summary>
|
||||
public sealed class EpssEnrichmentJob : BackgroundService
|
||||
{
|
||||
private static readonly Counter<long> RunsTotal = EpssWorkerInstrumentation.Meter.CreateCounter<long>(
|
||||
"epss_enrichment_runs_total",
|
||||
description: "Number of EPSS enrichment job runs.");
|
||||
|
||||
private static readonly Histogram<double> DurationMs = EpssWorkerInstrumentation.Meter.CreateHistogram<double>(
|
||||
"epss_enrichment_duration_ms",
|
||||
unit: "ms",
|
||||
description: "EPSS enrichment job duration in milliseconds.");
|
||||
|
||||
private static readonly Counter<long> InstancesUpdatedTotal = EpssWorkerInstrumentation.Meter.CreateCounter<long>(
|
||||
"epss_enrichment_updated_total",
|
||||
description: "Number of vulnerability instances updated during EPSS enrichment (best-effort, depends on configured sink).");
|
||||
|
||||
private static readonly Counter<long> BandChangesTotal = EpssWorkerInstrumentation.Meter.CreateCounter<long>(
|
||||
"epss_enrichment_band_changes_total",
|
||||
description: "Number of EPSS priority band changes detected during enrichment.");
|
||||
|
||||
private readonly IEpssRepository _epssRepository;
|
||||
private readonly IEpssProvider _epssProvider;
|
||||
private readonly IEpssSignalPublisher _signalPublisher;
|
||||
private readonly EpssSignalJob? _signalJob;
|
||||
private readonly IOptions<EpssEnrichmentOptions> _options;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<EpssEnrichmentJob> _logger;
|
||||
@@ -103,11 +118,13 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
IEpssSignalPublisher signalPublisher,
|
||||
IOptions<EpssEnrichmentOptions> options,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<EpssEnrichmentJob> logger)
|
||||
ILogger<EpssEnrichmentJob> logger,
|
||||
EpssSignalJob? signalJob = null)
|
||||
{
|
||||
_epssRepository = epssRepository ?? throw new ArgumentNullException(nameof(epssRepository));
|
||||
_epssProvider = epssProvider ?? throw new ArgumentNullException(nameof(epssProvider));
|
||||
_signalPublisher = signalPublisher ?? throw new ArgumentNullException(nameof(signalPublisher));
|
||||
_signalJob = signalJob;
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
@@ -167,6 +184,9 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
using var activity = _activitySource.StartActivity("epss.enrich", ActivityKind.Internal);
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
var opts = _options.Value;
|
||||
DateOnly? modelDateForLog = null;
|
||||
var shouldTriggerSignals = false;
|
||||
var enrichmentSucceeded = false;
|
||||
|
||||
_logger.LogInformation("Starting EPSS enrichment");
|
||||
|
||||
@@ -177,9 +197,12 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
if (!modelDate.HasValue)
|
||||
{
|
||||
_logger.LogWarning("No EPSS data available for enrichment");
|
||||
RunsTotal.Add(1, new TagList { { "result", "skipped" } });
|
||||
return;
|
||||
}
|
||||
|
||||
modelDateForLog = modelDate.Value;
|
||||
shouldTriggerSignals = true;
|
||||
activity?.SetTag("epss.model_date", modelDate.Value.ToString("yyyy-MM-dd"));
|
||||
_logger.LogDebug("Using EPSS model date: {ModelDate}", modelDate.Value);
|
||||
|
||||
@@ -189,6 +212,8 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
if (changedCves.Count == 0)
|
||||
{
|
||||
_logger.LogDebug("No CVE changes to process");
|
||||
RunsTotal.Add(1, new TagList { { "result", "noop" } });
|
||||
enrichmentSucceeded = true;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -221,13 +246,28 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
activity?.SetTag("epss.updated_count", totalUpdated);
|
||||
activity?.SetTag("epss.band_change_count", totalBandChanges);
|
||||
activity?.SetTag("epss.duration_ms", stopwatch.ElapsedMilliseconds);
|
||||
|
||||
InstancesUpdatedTotal.Add(totalUpdated);
|
||||
BandChangesTotal.Add(totalBandChanges);
|
||||
DurationMs.Record(stopwatch.Elapsed.TotalMilliseconds);
|
||||
RunsTotal.Add(1, new TagList { { "result", "success" } });
|
||||
enrichmentSucceeded = true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "EPSS enrichment failed");
|
||||
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
|
||||
RunsTotal.Add(1, new TagList { { "result", "failure" } });
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (shouldTriggerSignals && enrichmentSucceeded && _signalJob is not null)
|
||||
{
|
||||
_signalJob.TriggerSignalGeneration();
|
||||
_logger.LogDebug("Triggered EPSS signal generation for model date {ModelDate}", modelDateForLog);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IReadOnlyList<EpssChangeRecord>> GetChangedCvesAsync(
|
||||
@@ -238,7 +278,10 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
// Query epss_changes table for CVEs with matching flags for the model date (Task #4)
|
||||
_logger.LogDebug("Querying EPSS changes for model date {ModelDate} with flags {Flags}", modelDate, flags);
|
||||
|
||||
var changes = await _epssRepository.GetChangesAsync(modelDate, flags, cancellationToken: cancellationToken);
|
||||
var changes = await _epssRepository.GetChangesAsync(
|
||||
modelDate,
|
||||
flags: flags == EpssChangeFlags.None ? null : flags,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
_logger.LogDebug("Found {Count} EPSS changes matching flags {Flags}", changes.Count, flags);
|
||||
|
||||
@@ -311,7 +354,7 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
return EpssPriorityBand.Low;
|
||||
}
|
||||
|
||||
private Task EmitPriorityChangedEventAsync(
|
||||
private async Task EmitPriorityChangedEventAsync(
|
||||
string cveId,
|
||||
EpssPriorityBand previousBand,
|
||||
EpssPriorityBand newBand,
|
||||
@@ -335,7 +378,7 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
newBand.ToString(),
|
||||
evidence.Score,
|
||||
evidence.ModelDate,
|
||||
cancellationToken);
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (!result.Success)
|
||||
{
|
||||
@@ -346,39 +389,3 @@ public sealed class EpssEnrichmentJob : BackgroundService
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Record representing an EPSS change that needs processing.
|
||||
/// </summary>
|
||||
public sealed record EpssChangeRecord
|
||||
{
|
||||
/// <summary>
|
||||
/// CVE identifier.
|
||||
/// </summary>
|
||||
public required string CveId { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Change flags indicating what changed.
|
||||
/// </summary>
|
||||
public EpssChangeFlags Flags { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Previous EPSS score (if available).
|
||||
/// </summary>
|
||||
public double? PreviousScore { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// New EPSS score.
|
||||
/// </summary>
|
||||
public double NewScore { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Previous priority band (if available).
|
||||
/// </summary>
|
||||
public EpssPriorityBand PreviousBand { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Model date for this change.
|
||||
/// </summary>
|
||||
public DateOnly ModelDate { get; init; }
|
||||
}
|
||||
|
||||
@@ -88,29 +88,28 @@ public sealed class EpssEnrichmentStageExecutor : IScanStageExecutor
|
||||
var cveIds = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
// Extract from OS package analyzer results
|
||||
if (context.Analysis.TryGet<Dictionary<string, object>>(ScanAnalysisKeys.OsPackageAnalyzers, out var osResults) && osResults is not null)
|
||||
if (context.Analysis.TryGet<object>(ScanAnalysisKeys.OsPackageAnalyzers, out var osResults) &&
|
||||
osResults is System.Collections.IDictionary osDictionary)
|
||||
{
|
||||
foreach (var analyzerResult in osResults.Values)
|
||||
foreach (var analyzerResult in osDictionary.Values)
|
||||
{
|
||||
ExtractCvesFromAnalyzerResult(analyzerResult, cveIds);
|
||||
if (analyzerResult is not null)
|
||||
{
|
||||
ExtractCvesFromAnalyzerResult(analyzerResult, cveIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract from language analyzer results
|
||||
if (context.Analysis.TryGet<Dictionary<string, object>>(ScanAnalysisKeys.LanguagePackageAnalyzers, out var langResults) && langResults is not null)
|
||||
if (context.Analysis.TryGet<object>(ScanAnalysisKeys.LanguageAnalyzerResults, out var langResults) &&
|
||||
langResults is System.Collections.IDictionary langDictionary)
|
||||
{
|
||||
foreach (var analyzerResult in langResults.Values)
|
||||
foreach (var analyzerResult in langDictionary.Values)
|
||||
{
|
||||
ExtractCvesFromAnalyzerResult(analyzerResult, cveIds);
|
||||
}
|
||||
}
|
||||
|
||||
// Extract from consolidated findings if available
|
||||
if (context.Analysis.TryGet<IEnumerable<object>>(ScanAnalysisKeys.ConsolidatedFindings, out var findings) && findings is not null)
|
||||
{
|
||||
foreach (var finding in findings)
|
||||
{
|
||||
ExtractCvesFromFinding(finding, cveIds);
|
||||
if (analyzerResult is not null)
|
||||
{
|
||||
ExtractCvesFromAnalyzerResult(analyzerResult, cveIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,24 +181,3 @@ public sealed class EpssEnrichmentStageExecutor : IScanStageExecutor
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Well-known keys for EPSS-related analysis data.
|
||||
/// </summary>
|
||||
public static partial class ScanAnalysisKeys
|
||||
{
|
||||
/// <summary>
|
||||
/// Dictionary of CVE ID to EpssEvidence for enriched findings.
|
||||
/// </summary>
|
||||
public const string EpssEvidence = "epss.evidence";
|
||||
|
||||
/// <summary>
|
||||
/// The EPSS model date used for enrichment.
|
||||
/// </summary>
|
||||
public const string EpssModelDate = "epss.model_date";
|
||||
|
||||
/// <summary>
|
||||
/// List of CVE IDs that were not found in EPSS data.
|
||||
/// </summary>
|
||||
public const string EpssNotFoundCves = "epss.not_found";
|
||||
}
|
||||
|
||||
@@ -6,6 +6,9 @@
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
@@ -68,6 +71,7 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
{
|
||||
private readonly IEpssRepository _repository;
|
||||
private readonly IEpssRawRepository? _rawRepository;
|
||||
private readonly EpssEnrichmentJob? _enrichmentJob;
|
||||
private readonly EpssOnlineSource _onlineSource;
|
||||
private readonly EpssBundleSource _bundleSource;
|
||||
private readonly EpssCsvStreamParser _parser;
|
||||
@@ -84,10 +88,12 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
IOptions<EpssIngestOptions> options,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<EpssIngestJob> logger,
|
||||
IEpssRawRepository? rawRepository = null)
|
||||
IEpssRawRepository? rawRepository = null,
|
||||
EpssEnrichmentJob? enrichmentJob = null)
|
||||
{
|
||||
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
|
||||
_rawRepository = rawRepository; // Optional - raw storage for replay capability
|
||||
_enrichmentJob = enrichmentJob; // Optional - live enrichment trigger
|
||||
_onlineSource = onlineSource ?? throw new ArgumentNullException(nameof(onlineSource));
|
||||
_bundleSource = bundleSource ?? throw new ArgumentNullException(nameof(bundleSource));
|
||||
_parser = parser ?? throw new ArgumentNullException(nameof(parser));
|
||||
@@ -174,29 +180,43 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
fileSha256,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
_logger.LogDebug("Created import run {ImportRunId}", importRun.ImportRunId);
|
||||
_logger.LogDebug("Created import run {ImportRunId}", importRun.ImportRunId);
|
||||
|
||||
try
|
||||
{
|
||||
// Parse and write snapshot
|
||||
await using var stream = new MemoryStream(fileContent);
|
||||
var session = _parser.ParseGzip(stream);
|
||||
await using var session = _parser.ParseGzip(stream);
|
||||
|
||||
System.Buffers.ArrayBufferWriter<byte>? rawPayloadBuffer = null;
|
||||
Utf8JsonWriter? rawPayloadWriter = null;
|
||||
|
||||
var rows = (IAsyncEnumerable<EpssScoreRow>)session;
|
||||
if (_rawRepository is not null)
|
||||
{
|
||||
rawPayloadBuffer = new System.Buffers.ArrayBufferWriter<byte>();
|
||||
rawPayloadWriter = new Utf8JsonWriter(rawPayloadBuffer, new JsonWriterOptions { Indented = false });
|
||||
rows = TeeRowsWithRawCaptureAsync(session, rawPayloadWriter, cancellationToken);
|
||||
}
|
||||
|
||||
var writeResult = await _repository.WriteSnapshotAsync(
|
||||
importRun.ImportRunId,
|
||||
modelDate,
|
||||
_timeProvider.GetUtcNow(),
|
||||
session,
|
||||
rows,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Store raw payload for replay capability (Sprint: SPRINT_3413_0001_0001, Task: R2)
|
||||
if (_rawRepository is not null)
|
||||
if (_rawRepository is not null && rawPayloadBuffer is not null)
|
||||
{
|
||||
rawPayloadWriter?.Dispose();
|
||||
|
||||
await StoreRawPayloadAsync(
|
||||
importRun.ImportRunId,
|
||||
sourceFile.SourceUri,
|
||||
modelDate,
|
||||
session,
|
||||
rawPayloadBuffer.WrittenMemory,
|
||||
fileContent.Length,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
@@ -222,6 +242,15 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
activity?.SetTag("epss.row_count", writeResult.RowCount);
|
||||
activity?.SetTag("epss.cve_count", writeResult.DistinctCveCount);
|
||||
activity?.SetTag("epss.duration_ms", stopwatch.ElapsedMilliseconds);
|
||||
|
||||
if (_enrichmentJob is not null)
|
||||
{
|
||||
_enrichmentJob.TriggerEnrichment();
|
||||
_logger.LogDebug(
|
||||
"Triggered EPSS enrichment for {ModelDate} after import run {ImportRunId}",
|
||||
modelDate,
|
||||
importRun.ImportRunId);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -303,7 +332,8 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
Guid importRunId,
|
||||
string sourceUri,
|
||||
DateOnly modelDate,
|
||||
EpssParsedSession session,
|
||||
EpssCsvStreamParser.EpssCsvParseSession session,
|
||||
ReadOnlyMemory<byte> payloadBytes,
|
||||
long compressedSize,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
@@ -314,18 +344,8 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
|
||||
try
|
||||
{
|
||||
// Convert parsed rows to JSON array for raw storage
|
||||
var payload = System.Text.Json.JsonSerializer.Serialize(
|
||||
session.Rows.Select(r => new
|
||||
{
|
||||
cve = r.CveId,
|
||||
epss = r.Score,
|
||||
percentile = r.Percentile
|
||||
}),
|
||||
new System.Text.Json.JsonSerializerOptions { WriteIndented = false });
|
||||
|
||||
var payloadBytes = System.Text.Encoding.UTF8.GetBytes(payload);
|
||||
var payloadSha256 = System.Security.Cryptography.SHA256.HashData(payloadBytes);
|
||||
var payloadSha256 = System.Security.Cryptography.SHA256.HashData(payloadBytes.Span);
|
||||
var payload = Encoding.UTF8.GetString(payloadBytes.Span);
|
||||
|
||||
var raw = new EpssRaw
|
||||
{
|
||||
@@ -333,12 +353,11 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
AsOfDate = modelDate,
|
||||
Payload = payload,
|
||||
PayloadSha256 = payloadSha256,
|
||||
HeaderComment = session.HeaderComment,
|
||||
ModelVersion = session.ModelVersionTag,
|
||||
PublishedDate = session.PublishedDate,
|
||||
RowCount = session.RowCount,
|
||||
CompressedSize = compressedSize,
|
||||
DecompressedSize = payloadBytes.LongLength,
|
||||
DecompressedSize = payloadBytes.Length,
|
||||
ImportRunId = importRunId
|
||||
};
|
||||
|
||||
@@ -359,4 +378,28 @@ public sealed class EpssIngestJob : BackgroundService
|
||||
modelDate);
|
||||
}
|
||||
}
|
||||
|
||||
private static async IAsyncEnumerable<EpssScoreRow> TeeRowsWithRawCaptureAsync(
|
||||
IAsyncEnumerable<EpssScoreRow> rows,
|
||||
Utf8JsonWriter writer,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
writer.WriteStartArray();
|
||||
|
||||
await foreach (var row in rows.WithCancellation(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
writer.WriteStartObject();
|
||||
writer.WriteString("cve", row.CveId);
|
||||
writer.WriteNumber("epss", row.Score);
|
||||
writer.WriteNumber("percentile", row.Percentile);
|
||||
writer.WriteEndObject();
|
||||
|
||||
yield return row;
|
||||
}
|
||||
|
||||
writer.WriteEndArray();
|
||||
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
using System.Diagnostics;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
@@ -13,6 +14,7 @@ using Microsoft.Extensions.Options;
|
||||
using StellaOps.Scanner.Core.Epss;
|
||||
using StellaOps.Scanner.Storage.Epss;
|
||||
using StellaOps.Scanner.Storage.Repositories;
|
||||
using StellaOps.Scanner.Worker.Diagnostics;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Processing;
|
||||
|
||||
@@ -45,6 +47,11 @@ public sealed class EpssSignalOptions
|
||||
/// Signal retention days. Default: 90.
|
||||
/// </summary>
|
||||
public int RetentionDays { get; set; } = 90;
|
||||
|
||||
/// <summary>
|
||||
/// Suppress individual signals on model version change days. Default: true.
|
||||
/// </summary>
|
||||
public bool SuppressSignalsOnModelChange { get; set; } = true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -84,6 +91,19 @@ public static class EpssSignalEventTypes
|
||||
/// </summary>
|
||||
public sealed class EpssSignalJob : BackgroundService
|
||||
{
|
||||
private static readonly Counter<long> RunsTotal = EpssWorkerInstrumentation.Meter.CreateCounter<long>(
|
||||
"epss_signal_runs_total",
|
||||
description: "Number of EPSS signal generation job runs.");
|
||||
|
||||
private static readonly Histogram<double> DurationMs = EpssWorkerInstrumentation.Meter.CreateHistogram<double>(
|
||||
"epss_signal_duration_ms",
|
||||
unit: "ms",
|
||||
description: "EPSS signal generation job duration in milliseconds.");
|
||||
|
||||
private static readonly Counter<long> SignalsEmittedTotal = EpssWorkerInstrumentation.Meter.CreateCounter<long>(
|
||||
"epss_signals_emitted_total",
|
||||
description: "Number of EPSS signals emitted, labeled by event type and tenant.");
|
||||
|
||||
private readonly IEpssRepository _epssRepository;
|
||||
private readonly IEpssSignalRepository _signalRepository;
|
||||
private readonly IObservedCveRepository _observedCveRepository;
|
||||
@@ -177,6 +197,7 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
using var activity = _activitySource.StartActivity("epss.signal.generate", ActivityKind.Internal);
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
var opts = _options.Value;
|
||||
var suppressSignalsOnModelChange = opts.SuppressSignalsOnModelChange;
|
||||
|
||||
_logger.LogInformation("Starting EPSS signal generation");
|
||||
|
||||
@@ -187,21 +208,24 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
if (!modelDate.HasValue)
|
||||
{
|
||||
_logger.LogWarning("No EPSS data available for signal generation");
|
||||
RunsTotal.Add(1, new TagList { { "result", "skipped" } });
|
||||
return;
|
||||
}
|
||||
|
||||
activity?.SetTag("epss.model_date", modelDate.Value.ToString("yyyy-MM-dd"));
|
||||
|
||||
// Check for model version change (S7)
|
||||
var previousModelVersion = _lastModelVersion;
|
||||
var currentModelVersion = await GetCurrentModelVersionAsync(modelDate.Value, cancellationToken);
|
||||
var isModelChange = _lastModelVersion is not null &&
|
||||
!string.Equals(_lastModelVersion, currentModelVersion, StringComparison.Ordinal);
|
||||
var isModelChange = previousModelVersion is not null &&
|
||||
currentModelVersion is not null &&
|
||||
!string.Equals(previousModelVersion, currentModelVersion, StringComparison.Ordinal);
|
||||
|
||||
if (isModelChange)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"EPSS model version changed: {OldVersion} -> {NewVersion}",
|
||||
_lastModelVersion,
|
||||
previousModelVersion,
|
||||
currentModelVersion);
|
||||
}
|
||||
|
||||
@@ -212,6 +236,7 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
if (changes.Count == 0)
|
||||
{
|
||||
_logger.LogDebug("No EPSS changes to process for signals");
|
||||
RunsTotal.Add(1, new TagList { { "result", "noop" } });
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -251,7 +276,7 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
continue;
|
||||
}
|
||||
|
||||
filteredCount += changes.Length - tenantChanges.Length;
|
||||
filteredCount += changes.Count - tenantChanges.Length;
|
||||
|
||||
foreach (var batch in tenantChanges.Chunk(opts.BatchSize))
|
||||
{
|
||||
@@ -275,20 +300,36 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
published,
|
||||
signals.Count,
|
||||
tenantId);
|
||||
|
||||
foreach (var signal in signals)
|
||||
{
|
||||
SignalsEmittedTotal.Add(1, new TagList
|
||||
{
|
||||
{ "event_type", signal.EventType },
|
||||
{ "tenant_id", signal.TenantId.ToString("D") }
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If model changed, emit summary signal per tenant (S8)
|
||||
if (isModelChange)
|
||||
if (isModelChange && previousModelVersion is not null && currentModelVersion is not null)
|
||||
{
|
||||
await EmitModelUpdatedSignalAsync(
|
||||
tenantId,
|
||||
modelDate.Value,
|
||||
_lastModelVersion!,
|
||||
currentModelVersion!,
|
||||
previousModelVersion,
|
||||
currentModelVersion,
|
||||
suppressedSignals: suppressSignalsOnModelChange,
|
||||
tenantChanges.Length,
|
||||
cancellationToken);
|
||||
totalSignals++;
|
||||
|
||||
SignalsEmittedTotal.Add(1, new TagList
|
||||
{
|
||||
{ "event_type", EpssSignalEventTypes.ModelUpdated },
|
||||
{ "tenant_id", tenantId.ToString("D") }
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -306,11 +347,15 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
activity?.SetTag("epss.filtered_count", filteredCount);
|
||||
activity?.SetTag("epss.tenant_count", activeTenants.Count);
|
||||
activity?.SetTag("epss.duration_ms", stopwatch.ElapsedMilliseconds);
|
||||
|
||||
DurationMs.Record(stopwatch.Elapsed.TotalMilliseconds);
|
||||
RunsTotal.Add(1, new TagList { { "result", "success" } });
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "EPSS signal generation failed");
|
||||
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
|
||||
RunsTotal.Add(1, new TagList { { "result", "failure" } });
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@@ -322,18 +367,20 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
string? modelVersion,
|
||||
bool isModelChange)
|
||||
{
|
||||
var suppressSignalsOnModelChange = _options.Value.SuppressSignalsOnModelChange;
|
||||
var signals = new List<EpssSignal>();
|
||||
|
||||
foreach (var change in changes)
|
||||
{
|
||||
// Skip generating individual signals on model change day if suppression is enabled
|
||||
// (would check tenant config in production)
|
||||
if (isModelChange && ShouldSuppressOnModelChange(change))
|
||||
if (isModelChange && suppressSignalsOnModelChange && ShouldSuppressOnModelChange(change))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var eventType = DetermineEventType(change);
|
||||
var newBand = ComputeNewBand(change.NewPercentile);
|
||||
var eventType = DetermineEventType(change, newBand);
|
||||
if (string.IsNullOrEmpty(eventType))
|
||||
{
|
||||
continue;
|
||||
@@ -344,16 +391,16 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
change.CveId,
|
||||
eventType,
|
||||
change.PreviousBand.ToString(),
|
||||
ComputeNewBand(change).ToString());
|
||||
newBand.ToString());
|
||||
|
||||
var explainHash = EpssExplainHashCalculator.ComputeExplainHash(
|
||||
modelDate,
|
||||
change.CveId,
|
||||
eventType,
|
||||
change.PreviousBand.ToString(),
|
||||
ComputeNewBand(change).ToString(),
|
||||
newBand.ToString(),
|
||||
change.NewScore,
|
||||
0, // Percentile would come from EPSS data
|
||||
change.NewPercentile,
|
||||
modelVersion);
|
||||
|
||||
var payload = JsonSerializer.Serialize(new
|
||||
@@ -362,20 +409,23 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
oldScore = change.PreviousScore,
|
||||
newScore = change.NewScore,
|
||||
oldBand = change.PreviousBand.ToString(),
|
||||
newBand = ComputeNewBand(change).ToString(),
|
||||
newBand = newBand.ToString(),
|
||||
flags = change.Flags.ToString(),
|
||||
modelVersion
|
||||
});
|
||||
|
||||
double? delta = change.PreviousScore is null ? null : change.NewScore - change.PreviousScore.Value;
|
||||
|
||||
signals.Add(new EpssSignal
|
||||
{
|
||||
TenantId = tenantId,
|
||||
ModelDate = modelDate,
|
||||
CveId = change.CveId,
|
||||
EventType = eventType,
|
||||
RiskBand = ComputeNewBand(change).ToString(),
|
||||
RiskBand = newBand.ToString(),
|
||||
EpssScore = change.NewScore,
|
||||
EpssDelta = change.NewScore - (change.PreviousScore ?? 0),
|
||||
EpssDelta = delta,
|
||||
Percentile = change.NewPercentile,
|
||||
IsModelChange = isModelChange,
|
||||
ModelVersion = modelVersion,
|
||||
DedupeKey = dedupeKey,
|
||||
@@ -387,45 +437,44 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
return signals;
|
||||
}
|
||||
|
||||
private static string? DetermineEventType(EpssChangeRecord change)
|
||||
private static string? DetermineEventType(EpssChangeRecord change, EpssPriorityBand newBand)
|
||||
{
|
||||
if (change.Flags.HasFlag(EpssChangeFlags.NewScored))
|
||||
{
|
||||
return EpssSignalEventTypes.NewHigh;
|
||||
}
|
||||
|
||||
if (change.Flags.HasFlag(EpssChangeFlags.CrossedHigh))
|
||||
{
|
||||
return EpssSignalEventTypes.BandChange;
|
||||
}
|
||||
|
||||
if (change.Flags.HasFlag(EpssChangeFlags.BigJumpUp))
|
||||
{
|
||||
return EpssSignalEventTypes.RiskSpike;
|
||||
}
|
||||
|
||||
if (change.Flags.HasFlag(EpssChangeFlags.BigJumpDown))
|
||||
if (change.Flags.HasFlag(EpssChangeFlags.BigJumpDown) || change.Flags.HasFlag(EpssChangeFlags.CrossedLow))
|
||||
{
|
||||
return EpssSignalEventTypes.DroppedLow;
|
||||
}
|
||||
|
||||
if (change.PreviousBand != newBand || change.Flags.HasFlag(EpssChangeFlags.CrossedHigh))
|
||||
{
|
||||
return EpssSignalEventTypes.BandChange;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static EpssPriorityBand ComputeNewBand(EpssChangeRecord change)
|
||||
private static EpssPriorityBand ComputeNewBand(double percentile)
|
||||
{
|
||||
// Simplified band calculation - would use EpssPriorityCalculator in production
|
||||
if (change.NewScore >= 0.5)
|
||||
if (percentile >= 0.995)
|
||||
{
|
||||
return EpssPriorityBand.Critical;
|
||||
}
|
||||
|
||||
if (change.NewScore >= 0.2)
|
||||
if (percentile >= 0.99)
|
||||
{
|
||||
return EpssPriorityBand.High;
|
||||
}
|
||||
|
||||
if (change.NewScore >= 0.05)
|
||||
if (percentile >= 0.90)
|
||||
{
|
||||
return EpssPriorityBand.Medium;
|
||||
}
|
||||
@@ -443,18 +492,17 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
|
||||
private async Task<string?> GetCurrentModelVersionAsync(DateOnly modelDate, CancellationToken cancellationToken)
|
||||
{
|
||||
// Would query from epss_import_run or epss_raw table
|
||||
// For now, return a placeholder based on date
|
||||
return $"v{modelDate:yyyy.MM.dd}";
|
||||
var run = await _epssRepository.GetImportRunAsync(modelDate, cancellationToken).ConfigureAwait(false);
|
||||
return string.IsNullOrWhiteSpace(run?.ModelVersionTag)
|
||||
? $"v{modelDate:yyyy.MM.dd}"
|
||||
: run.ModelVersionTag;
|
||||
}
|
||||
|
||||
private async Task<IReadOnlyList<EpssChangeRecord>> GetEpssChangesAsync(
|
||||
DateOnly modelDate,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
// TODO: Implement repository method to get changes from epss_changes table
|
||||
// For now, return empty list
|
||||
return Array.Empty<EpssChangeRecord>();
|
||||
return await _epssRepository.GetChangesAsync(modelDate, flags: null, limit: 200000, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task EmitModelUpdatedSignalAsync(
|
||||
@@ -462,6 +510,7 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
DateOnly modelDate,
|
||||
string oldVersion,
|
||||
string newVersion,
|
||||
bool suppressedSignals,
|
||||
int affectedCveCount,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
@@ -470,7 +519,7 @@ public sealed class EpssSignalJob : BackgroundService
|
||||
oldVersion,
|
||||
newVersion,
|
||||
affectedCveCount,
|
||||
suppressedSignals = true
|
||||
suppressedSignals
|
||||
});
|
||||
|
||||
var signal = new EpssSignal
|
||||
|
||||
@@ -119,6 +119,19 @@ if (!string.IsNullOrWhiteSpace(connectionString))
|
||||
.BindConfiguration(EpssIngestOptions.SectionName)
|
||||
.ValidateOnStart();
|
||||
builder.Services.AddHostedService<EpssIngestJob>();
|
||||
|
||||
// EPSS live enrichment + signals (Sprint: SPRINT_3413_0001_0001)
|
||||
builder.Services.AddOptions<EpssEnrichmentOptions>()
|
||||
.BindConfiguration(EpssEnrichmentOptions.SectionName)
|
||||
.ValidateOnStart();
|
||||
builder.Services.AddSingleton<EpssEnrichmentJob>();
|
||||
builder.Services.AddHostedService(sp => sp.GetRequiredService<EpssEnrichmentJob>());
|
||||
|
||||
builder.Services.AddOptions<EpssSignalOptions>()
|
||||
.BindConfiguration(EpssSignalOptions.SectionName)
|
||||
.ValidateOnStart();
|
||||
builder.Services.AddSingleton<EpssSignalJob>();
|
||||
builder.Services.AddHostedService(sp => sp.GetRequiredService<EpssSignalJob>());
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user