Files
git.stella-ops.org/src/Scanner/StellaOps.Scanner.Worker/Processing/EpssIngestJob.cs
master 0dc71e760a feat: Add PathViewer and RiskDriftCard components with templates and styles
- Implemented PathViewerComponent for visualizing reachability call paths.
- Added RiskDriftCardComponent to display reachability drift results.
- Created corresponding HTML templates and SCSS styles for both components.
- Introduced test fixtures for reachability analysis in JSON format.
- Enhanced user interaction with collapsible and expandable features in PathViewer.
- Included risk trend visualization and summary metrics in RiskDriftCard.
2025-12-18 18:35:30 +02:00

363 lines
13 KiB
C#

// -----------------------------------------------------------------------------
// EpssIngestJob.cs
// Sprint: SPRINT_3410_0001_0001_epss_ingestion_storage
// Task: EPSS-3410-009
// Description: Background job that ingests EPSS data from online or bundle sources.
// -----------------------------------------------------------------------------
using System.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scanner.Storage.Epss;
using StellaOps.Scanner.Storage.Repositories;
namespace StellaOps.Scanner.Worker.Processing;
/// <summary>
/// Settings consumed by <see cref="EpssIngestJob"/>.
/// </summary>
public sealed class EpssIngestOptions
{
    /// <summary>
    /// Name of the configuration section for these options.
    /// </summary>
    public const string SectionName = "Epss:Ingest";

    /// <summary>
    /// Master switch for the job. When <c>false</c>, the job logs that it is disabled and exits
    /// without scheduling any ingestion. Defaults to <c>true</c>.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Six-field cron expression (seconds minutes hours day month dayOfWeek) describing when
    /// ingestion runs. The default, <c>"0 5 0 * * *"</c>, means 00:05 UTC every day.
    /// NOTE(review): the job computes its next run with its own simplified scheduler rather than
    /// a full cron library; verify that a non-default expression is actually supported before
    /// relying on it.
    /// </summary>
    public string Schedule { get; set; } = "0 5 0 * * *";

    /// <summary>
    /// Selects where EPSS data comes from. <c>"bundle"</c> (compared case-insensitively) selects
    /// the bundle source; any other value selects the online source. Defaults to <c>"online"</c>.
    /// </summary>
    public string SourceType { get; set; } = "online";

    /// <summary>
    /// Location of the bundle used for air-gapped ingestion when <see cref="SourceType"/> is
    /// <c>"bundle"</c>. No default.
    /// </summary>
    public string? BundlePath { get; set; }

    /// <summary>
    /// How long the job waits after start-up before entering its scheduling loop.
    /// Defaults to 30 seconds.
    /// </summary>
    public TimeSpan InitialDelay { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Pause between a failed ingestion attempt and the next one. Defaults to 5 minutes.
    /// </summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromMinutes(5);

    /// <summary>
    /// Upper bound of the attempt counter in the job's retry loop, i.e. the number of ingestion
    /// attempts made per scheduled run. Defaults to 3.
    /// </summary>
    public int MaxRetries { get; set; } = 3;
}
/// <summary>
/// Background service that ingests EPSS data on a schedule.
/// Supports online (FIRST.org) and offline (bundle) sources.
/// </summary>
public sealed class EpssIngestJob : BackgroundService
{
// Tracing source used by IngestAsync. ActivitySource is IDisposable; it is shared across all job
// instances (static) instead of being created per instance and never disposed. The field name is
// kept so that the other members of this class continue to compile unchanged.
private static readonly ActivitySource _activitySource = new("StellaOps.Scanner.EpssIngest");

private readonly IEpssRepository _repository;

// Optional: when null, raw payloads are simply not stored.
private readonly IEpssRawRepository? _rawRepository;
private readonly EpssOnlineSource _onlineSource;
private readonly EpssBundleSource _bundleSource;
private readonly EpssCsvStreamParser _parser;
private readonly IOptions<EpssIngestOptions> _options;

// Source of "now" for scheduling, model-date selection and import timestamps.
private readonly TimeProvider _timeProvider;
private readonly ILogger<EpssIngestJob> _logger;

/// <summary>
/// Creates the ingestion job.
/// </summary>
/// <param name="repository">Repository used to record import runs and write snapshots.</param>
/// <param name="onlineSource">Source used when the configured source type is not "bundle".</param>
/// <param name="bundleSource">Source used when the configured source type is "bundle".</param>
/// <param name="parser">Parser for the gzip-compressed EPSS file.</param>
/// <param name="options">Job options.</param>
/// <param name="timeProvider">Clock abstraction.</param>
/// <param name="logger">Logger.</param>
/// <param name="rawRepository">Optional raw-payload store for replay capability; may be <c>null</c>.</param>
/// <exception cref="ArgumentNullException">Any required dependency is <c>null</c>.</exception>
public EpssIngestJob(
    IEpssRepository repository,
    EpssOnlineSource onlineSource,
    EpssBundleSource bundleSource,
    EpssCsvStreamParser parser,
    IOptions<EpssIngestOptions> options,
    TimeProvider timeProvider,
    ILogger<EpssIngestJob> logger,
    IEpssRawRepository? rawRepository = null)
{
    _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    _rawRepository = rawRepository; // Optional - raw storage for replay capability
    _onlineSource = onlineSource ?? throw new ArgumentNullException(nameof(onlineSource));
    _bundleSource = bundleSource ?? throw new ArgumentNullException(nameof(bundleSource));
    _parser = parser ?? throw new ArgumentNullException(nameof(parser));
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Main loop of the background service. Returns immediately when the job is disabled; otherwise
/// waits for the configured initial delay and then, until shutdown is requested, sleeps until the
/// instant returned by <c>ComputeNextRun</c> and runs one ingestion (with retries).
/// </summary>
/// <param name="stoppingToken">Signalled when the service should stop.</param>
/// <returns>A task that completes when the job is disabled or has stopped.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("EPSS ingest job started");
    var opts = _options.Value;
    if (!opts.Enabled)
    {
        _logger.LogInformation("EPSS ingest job is disabled");
        return;
    }

    try
    {
        // Initial delay to let the system stabilize. Delays go through the injected TimeProvider
        // so that they follow the same clock as the schedule computation below.
        // A zero or negative value means "no delay" rather than an argument error.
        if (opts.InitialDelay > TimeSpan.Zero)
        {
            await Task.Delay(opts.InitialDelay, _timeProvider, stoppingToken).ConfigureAwait(false);
        }

        while (!stoppingToken.IsCancellationRequested)
        {
            var now = _timeProvider.GetUtcNow();
            var nextRun = ComputeNextRun(now, opts.Schedule);
            var delay = nextRun - now;
            if (delay > TimeSpan.Zero)
            {
                _logger.LogDebug("EPSS ingest job waiting until {NextRun}", nextRun);
                await Task.Delay(delay, _timeProvider, stoppingToken).ConfigureAwait(false);
            }

            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            await RunIngestionWithRetryAsync(stoppingToken).ConfigureAwait(false);
        }
    }
    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
    {
        // Shutdown was requested while waiting or ingesting: this is the normal way out of the
        // loop, not a failure, so fall through to the "stopped" log line.
    }

    _logger.LogInformation("EPSS ingest job stopped");
}
/// <summary>
/// Runs ingestion for a specific date. Used by tests and manual triggers.
/// </summary>
/// <remarks>
/// Steps: select the source from the configured source type, fetch the file, read it into memory
/// and hash it, open an import run, parse the gzip content and write the snapshot, optionally store
/// the raw payload, then mark the import run as succeeded. If anything fails after the import run
/// was opened, the run is marked as failed (best effort) before the exception is rethrown.
/// </remarks>
/// <param name="modelDate">EPSS model date to ingest.</param>
/// <param name="cancellationToken">Token used to cancel the ingestion.</param>
/// <returns>A task that completes when the ingestion has finished successfully.</returns>
/// <exception cref="Exception">
/// Any exception raised during ingestion is logged, recorded on the tracing activity and rethrown.
/// </exception>
public async Task IngestAsync(DateOnly modelDate, CancellationToken cancellationToken = default)
{
    using var activity = _activitySource.StartActivity("epss.ingest", ActivityKind.Internal);

    // Invariant culture keeps the tag value stable regardless of the process culture/calendar.
    activity?.SetTag(
        "epss.model_date",
        modelDate.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture));

    var opts = _options.Value;
    var stopwatch = Stopwatch.StartNew();
    _logger.LogInformation("Starting EPSS ingestion for {ModelDate}", modelDate);
    try
    {
        // Get source based on configuration. The static string.Equals tolerates a null
        // SourceType (which then selects the online source, like any other non-"bundle" value).
        IEpssSource source = string.Equals(opts.SourceType, "bundle", StringComparison.OrdinalIgnoreCase)
            ? _bundleSource
            : _onlineSource;

        // Retrieve the EPSS file
        await using var sourceFile = await source.GetAsync(modelDate, cancellationToken).ConfigureAwait(false);

        // Read file content and compute hash
        var fileContent = await File.ReadAllBytesAsync(sourceFile.LocalPath, cancellationToken).ConfigureAwait(false);
        var fileSha256 = ComputeSha256(fileContent);
        _logger.LogInformation(
            "Retrieved EPSS file from {SourceUri}, size={Size}",
            sourceFile.SourceUri,
            fileContent.Length);

        // Begin import run
        var importRun = await _repository.BeginImportAsync(
            modelDate,
            sourceFile.SourceUri,
            _timeProvider.GetUtcNow(),
            fileSha256,
            cancellationToken).ConfigureAwait(false);
        _logger.LogDebug("Created import run {ImportRunId}", importRun.ImportRunId);

        try
        {
            // Parse and write snapshot
            await using var stream = new MemoryStream(fileContent);
            var session = _parser.ParseGzip(stream);
            var writeResult = await _repository.WriteSnapshotAsync(
                importRun.ImportRunId,
                modelDate,
                _timeProvider.GetUtcNow(),
                session,
                cancellationToken).ConfigureAwait(false);

            // Store raw payload for replay capability (Sprint: SPRINT_3413_0001_0001, Task: R2)
            if (_rawRepository is not null)
            {
                await StoreRawPayloadAsync(
                    importRun.ImportRunId,
                    sourceFile.SourceUri,
                    modelDate,
                    session,
                    fileContent.Length,
                    cancellationToken).ConfigureAwait(false);
            }

            // Mark success
            await _repository.MarkImportSucceededAsync(
                importRun.ImportRunId,
                session.RowCount,
                session.DecompressedSha256,
                session.ModelVersionTag,
                session.PublishedDate,
                cancellationToken).ConfigureAwait(false);

            stopwatch.Stop();
            _logger.LogInformation(
                "EPSS ingestion completed: modelDate={ModelDate}, rows={RowCount}, cves={CveCount}, duration={Duration}ms",
                modelDate,
                writeResult.RowCount,
                writeResult.DistinctCveCount,
                stopwatch.ElapsedMilliseconds);
            activity?.SetTag("epss.row_count", writeResult.RowCount);
            activity?.SetTag("epss.cve_count", writeResult.DistinctCveCount);
            activity?.SetTag("epss.duration_ms", stopwatch.ElapsedMilliseconds);
        }
        catch (Exception ex)
        {
            try
            {
                // Deliberately not the caller's token: when the failure *is* a cancellation that
                // token is already cancelled, and the run would never be recorded as failed.
                await _repository.MarkImportFailedAsync(
                    importRun.ImportRunId,
                    ex.Message,
                    CancellationToken.None).ConfigureAwait(false);
            }
            catch (Exception markFailedEx)
            {
                // Never let the bookkeeping failure replace the original ingestion error.
                _logger.LogWarning(
                    markFailedEx,
                    "Could not mark EPSS import run {ImportRunId} as failed",
                    importRun.ImportRunId);
            }

            throw;
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "EPSS ingestion failed for {ModelDate}", modelDate);
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        throw;
    }
}
/// <summary>
/// Ingests the EPSS data for the current UTC date, retrying on failure.
/// </summary>
/// <remarks>
/// The configured <c>MaxRetries</c> value is used as the total number of attempts and is treated
/// as at least 1, so a zero or negative setting still results in one attempt. A failure of the
/// last attempt is logged and swallowed so that the scheduling loop keeps running. Cancellation
/// of <paramref name="cancellationToken"/> is not retried; it propagates to the caller.
/// </remarks>
/// <param name="cancellationToken">Token used to cancel the ingestion and the retry delays.</param>
/// <returns>A task that completes when ingestion succeeded or all attempts were used.</returns>
/// <exception cref="OperationCanceledException">The token was cancelled.</exception>
private async Task RunIngestionWithRetryAsync(CancellationToken cancellationToken)
{
    var opts = _options.Value;
    var maxAttempts = Math.Max(1, opts.MaxRetries);

    // The model date is fixed once so that every retry targets the same day.
    var modelDate = DateOnly.FromDateTime(_timeProvider.GetUtcNow().UtcDateTime);

    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            await IngestAsync(modelDate, cancellationToken).ConfigureAwait(false);
            return;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown, not an ingestion failure: do not retry and do not report it as an error.
            throw;
        }
        catch (Exception ex) when (attempt < maxAttempts)
        {
            _logger.LogWarning(
                ex,
                "EPSS ingestion attempt {Attempt}/{MaxRetries} failed, retrying in {RetryDelay}",
                attempt,
                maxAttempts,
                opts.RetryDelay);

            // Use the injected clock for the wait; a zero or negative delay means "retry now".
            if (opts.RetryDelay > TimeSpan.Zero)
            {
                await Task.Delay(opts.RetryDelay, _timeProvider, cancellationToken).ConfigureAwait(false);
            }
            else
            {
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex,
                "EPSS ingestion failed after {MaxRetries} attempts",
                maxAttempts);
        }
    }
}
/// <summary>
/// Computes the next run instant (UTC) for a daily schedule.
/// </summary>
/// <remarks>
/// Only the simple daily form of a six-field cron expression is understood:
/// <c>"second minute hour * * *"</c> with plain numeric second/minute/hour fields. Any other
/// expression (including <c>null</c>, empty, ranges, steps or lists) falls back to 00:05:00 UTC,
/// which is also what the default expression <c>"0 5 0 * * *"</c> denotes.
/// If <paramref name="now"/> is exactly the scheduled instant, that instant is returned.
/// </remarks>
/// <param name="now">Current time; converted to UTC before use.</param>
/// <param name="cronSchedule">Cron expression (seconds minutes hours day month dayOfWeek).</param>
/// <returns>Today's scheduled instant if it has not passed yet, otherwise tomorrow's.</returns>
private static DateTimeOffset ComputeNextRun(DateTimeOffset now, string cronSchedule)
{
    var timeOfDay = TryParseDailyTimeOfDay(cronSchedule) ?? new TimeSpan(0, 5, 0);

    var nowUtc = now.UtcDateTime;
    var scheduledTime = nowUtc.Date + timeOfDay;
    if (nowUtc > scheduledTime)
    {
        scheduledTime = scheduledTime.AddDays(1);
    }

    return new DateTimeOffset(scheduledTime, TimeSpan.Zero);
}

// Extracts the time of day from a "s m h * * *" cron expression; returns null for anything else.
private static TimeSpan? TryParseDailyTimeOfDay(string? cronSchedule)
{
    if (string.IsNullOrWhiteSpace(cronSchedule))
    {
        return null;
    }

    // Passing a null separator splits on any whitespace.
    var fields = cronSchedule.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
    if (fields.Length != 6 || fields[3] != "*" || fields[4] != "*" || fields[5] != "*")
    {
        return null;
    }

    if (!TryParseCronNumber(fields[0], 59, out var second)
        || !TryParseCronNumber(fields[1], 59, out var minute)
        || !TryParseCronNumber(fields[2], 23, out var hour))
    {
        return null;
    }

    return new TimeSpan(hour, minute, second);
}

// Parses a digits-only cron field and checks it against its inclusive upper bound.
private static bool TryParseCronNumber(string field, int max, out int value)
{
    return int.TryParse(
               field,
               System.Globalization.NumberStyles.None,
               System.Globalization.CultureInfo.InvariantCulture,
               out value)
           && value <= max;
}
/// <summary>
/// Returns the SHA-256 digest of <paramref name="content"/> as a lowercase hexadecimal string.
/// </summary>
/// <param name="content">Bytes to hash.</param>
/// <returns>64 lowercase hex characters.</returns>
private static string ComputeSha256(byte[] content)
{
    byte[] digest = System.Security.Cryptography.SHA256.HashData(content);
    var hex = new System.Text.StringBuilder(digest.Length * 2);
    foreach (byte b in digest)
    {
        // "x2" = two lowercase hex digits per byte.
        hex.Append(b.ToString("x2", System.Globalization.CultureInfo.InvariantCulture));
    }

    return hex.ToString();
}
/// <summary>
/// Stores raw EPSS payload for deterministic replay capability.
/// Sprint: SPRINT_3413_0001_0001, Task: R2
/// </summary>
/// <remarks>
/// Best effort: a storage failure is logged as a warning and does not fail the ingestion.
/// Cancellation of <paramref name="cancellationToken"/> is not treated as a storage failure and
/// propagates to the caller. Does nothing when no raw repository was supplied.
/// </remarks>
/// <param name="importRunId">Identifier of the import run the payload belongs to.</param>
/// <param name="sourceUri">URI the EPSS file was retrieved from.</param>
/// <param name="modelDate">EPSS model date of the payload.</param>
/// <param name="session">Parsed session providing the rows and header metadata.</param>
/// <param name="compressedSize">Size in bytes of the file as retrieved from the source.</param>
/// <param name="cancellationToken">Token used to cancel the store operation.</param>
/// <returns>A task that completes when the payload was stored or the failure was logged.</returns>
/// <exception cref="OperationCanceledException">The token was cancelled.</exception>
private async Task StoreRawPayloadAsync(
    Guid importRunId,
    string sourceUri,
    DateOnly modelDate,
    EpssParsedSession session,
    long compressedSize,
    CancellationToken cancellationToken)
{
    if (_rawRepository is null)
    {
        return;
    }

    try
    {
        // Convert parsed rows to JSON array for raw storage.
        // NOTE(review): IngestAsync hands the same session to WriteSnapshotAsync before calling
        // this method; confirm that session.Rows can be enumerated a second time.
        var payload = System.Text.Json.JsonSerializer.Serialize(
            session.Rows.Select(r => new
            {
                cve = r.CveId,
                epss = r.Score,
                percentile = r.Percentile
            }),
            new System.Text.Json.JsonSerializerOptions { WriteIndented = false });
        var payloadBytes = System.Text.Encoding.UTF8.GetBytes(payload);
        var payloadSha256 = System.Security.Cryptography.SHA256.HashData(payloadBytes);

        var raw = new EpssRaw
        {
            SourceUri = sourceUri,
            AsOfDate = modelDate,
            Payload = payload,
            PayloadSha256 = payloadSha256,
            HeaderComment = session.HeaderComment,
            ModelVersion = session.ModelVersionTag,
            PublishedDate = session.PublishedDate,
            RowCount = session.RowCount,
            CompressedSize = compressedSize,
            // NOTE(review): this is the UTF-8 size of the JSON payload built above, not the
            // size of the decompressed source file - confirm that is the intended meaning.
            DecompressedSize = payloadBytes.LongLength,
            ImportRunId = importRunId
        };

        await _rawRepository.CreateAsync(raw, cancellationToken).ConfigureAwait(false);
        _logger.LogDebug(
            "Stored raw EPSS payload: modelDate={ModelDate}, rows={RowCount}, size={Size}",
            modelDate,
            session.RowCount,
            payloadBytes.Length);
    }
    catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
    {
        // Log but don't fail ingestion if raw storage fails. A requested cancellation is excluded
        // by the filter above so that shutdown is not misreported as a storage failure.
        _logger.LogWarning(
            ex,
            "Failed to store raw EPSS payload for {ModelDate}; ingestion will continue",
            modelDate);
    }
}
}