// -----------------------------------------------------------------------------
// EpssIngestJob.cs
// Sprint: SPRINT_3410_0001_0001_epss_ingestion_storage
// Task: EPSS-3410-009
// Description: Background job that ingests EPSS data from online or bundle sources.
// -----------------------------------------------------------------------------

using System.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Scanner.Storage.Epss;
using StellaOps.Scanner.Storage.Repositories;

namespace StellaOps.Scanner.Worker.Processing;

/// <summary>
/// Options for the EPSS ingestion job.
/// </summary>
public sealed class EpssIngestOptions
{
    /// <summary>
    /// Configuration section name.
    /// </summary>
    public const string SectionName = "Epss:Ingest";

    /// <summary>
    /// Whether the job is enabled. When <c>false</c> the job returns immediately after starting.
    /// Default: <c>true</c>.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Cron-style schedule for EPSS ingestion, evaluated in UTC
    /// (fields: seconds minutes hours day-of-month month day-of-week).
    /// Default: <c>"0 5 0 * * *"</c> (00:05 UTC daily).
    /// </summary>
    public string Schedule { get; set; } = "0 5 0 * * *";

    /// <summary>
    /// Source type: <c>"online"</c> or <c>"bundle"</c> (compared case-insensitively).
    /// Default: <c>"online"</c>.
    /// </summary>
    public string SourceType { get; set; } = "online";

    /// <summary>
    /// Bundle path for air-gapped ingestion (when <see cref="SourceType"/> is <c>"bundle"</c>).
    /// </summary>
    public string? BundlePath { get; set; }

    /// <summary>
    /// Delay applied once at startup, before the job begins waiting for its first scheduled run.
    /// Default: 30 seconds.
    /// </summary>
    public TimeSpan InitialDelay { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Delay between consecutive failed attempts within a single scheduled run.
    /// Default: 5 minutes.
    /// </summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.FromMinutes(5);

    /// <summary>
    /// Maximum number of ingestion attempts per scheduled run, <em>including</em> the first attempt
    /// (a total attempt count, not the number of retries after the first failure). Default: 3.
    /// </summary>
    public int MaxRetries { get; set; } = 3;
}

/// <summary>
/// Background service that ingests EPSS data on a schedule.
/// Supports online (FIRST.org) and offline (bundle) sources.
/// </summary>
/// <remarks>
/// The schedule is evaluated in UTC. Daily cron expressions of the form <c>"sec min hour * * *"</c>
/// (or the five-field <c>"min hour * * *"</c>) are honoured; anything else falls back to 00:05 UTC.
/// All waits go through the injected <see cref="TimeProvider"/>, so the schedule can be driven by a fake clock.
/// </remarks>
public sealed class EpssIngestJob : BackgroundService
{
    /// <summary>Time of day (UTC) used when the configured schedule cannot be interpreted.</summary>
    private static readonly TimeSpan DefaultRunTimeOfDay = new(0, 5, 0);

    // One ActivitySource per process (the recommended pattern); a per-instance source was never disposed.
    private static readonly ActivitySource s_activitySource = new("StellaOps.Scanner.EpssIngest");

    private readonly IEpssRepository _repository;
    private readonly EpssOnlineSource _onlineSource;
    private readonly EpssBundleSource _bundleSource;
    private readonly EpssCsvStreamParser _parser;
    private readonly IOptions<EpssIngestOptions> _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<EpssIngestJob> _logger;

    /// <summary>
    /// Creates the job.
    /// </summary>
    /// <param name="repository">Repository that records import runs and stores snapshots.</param>
    /// <param name="onlineSource">Source used unless <see cref="EpssIngestOptions.SourceType"/> is "bundle".</param>
    /// <param name="bundleSource">Source used when <see cref="EpssIngestOptions.SourceType"/> is "bundle".</param>
    /// <param name="parser">Parser for the gzip-compressed EPSS CSV payload.</param>
    /// <param name="options">Job options.</param>
    /// <param name="timeProvider">Clock used for scheduling, delays and timestamps.</param>
    /// <param name="logger">Logger.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public EpssIngestJob(
        IEpssRepository repository,
        EpssOnlineSource onlineSource,
        EpssBundleSource bundleSource,
        EpssCsvStreamParser parser,
        IOptions<EpssIngestOptions> options,
        TimeProvider timeProvider,
        ILogger<EpssIngestJob> logger)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _onlineSource = onlineSource ?? throw new ArgumentNullException(nameof(onlineSource));
        _bundleSource = bundleSource ?? throw new ArgumentNullException(nameof(bundleSource));
        _parser = parser ?? throw new ArgumentNullException(nameof(parser));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Scheduler loop. It waits for the initial delay and then, until shutdown, sleeps until the
    /// next scheduled slot and runs ingestion (with retries) for the current UTC date.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("EPSS ingest job started");

        var opts = _options.Value;
        if (!opts.Enabled)
        {
            _logger.LogInformation("EPSS ingest job is disabled");
            return;
        }

        if (!TryParseDailySchedule(opts.Schedule, out _))
        {
            _logger.LogWarning(
                "EPSS ingest schedule '{Schedule}' is not a supported daily cron expression; falling back to {FallbackTime} UTC",
                opts.Schedule,
                DefaultRunTimeOfDay);
        }

        try
        {
            // Initial delay to let the system stabilize. Task.Delay rejects negative values, so guard them.
            if (opts.InitialDelay > TimeSpan.Zero)
            {
                await Task.Delay(opts.InitialDelay, _timeProvider, stoppingToken).ConfigureAwait(false);
            }

            DateTimeOffset? lastRun = null;
            while (!stoppingToken.IsCancellationRequested)
            {
                var now = _timeProvider.GetUtcNow();
                var nextRun = ComputeNextRun(now, opts.Schedule);

                // Never fire the same slot twice. This happens if the clock did not advance, or stepped
                // backwards, while the previous run was executing.
                if (lastRun is { } previous && nextRun <= previous)
                {
                    nextRun = ComputeNextRun(previous.AddTicks(1), opts.Schedule);
                }

                var delay = nextRun - now;
                if (delay > TimeSpan.Zero)
                {
                    _logger.LogDebug("EPSS ingest job waiting until {NextRun}", nextRun);
                    await Task.Delay(delay, _timeProvider, stoppingToken).ConfigureAwait(false);
                }

                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }

                await RunIngestionWithRetryAsync(stoppingToken).ConfigureAwait(false);
                lastRun = nextRun;
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Graceful shutdown: fall through so the "stopped" message is still logged.
        }

        _logger.LogInformation("EPSS ingest job stopped");
    }

    /// <summary>
    /// Runs ingestion for a specific date. Used by tests and manual triggers.
    /// </summary>
    /// <param name="modelDate">EPSS model date to ingest.</param>
    /// <param name="cancellationToken">Cancels retrieval, parsing and storage.</param>
    /// <remarks>
    /// If an import run was started and a later step fails, the run is marked failed before the
    /// exception is rethrown. Failures other than caller-requested cancellation are logged as errors.
    /// </remarks>
    public async Task IngestAsync(DateOnly modelDate, CancellationToken cancellationToken = default)
    {
        using var activity = s_activitySource.StartActivity("epss.ingest", ActivityKind.Internal);
        // Invariant culture: a non-Gregorian current culture would otherwise render a different year.
        activity?.SetTag(
            "epss.model_date",
            modelDate.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture));

        var opts = _options.Value;
        var stopwatch = Stopwatch.StartNew();

        _logger.LogInformation("Starting EPSS ingestion for {ModelDate}", modelDate);

        try
        {
            // Anything other than "bundle" (case-insensitive, null-safe) selects the online source.
            IEpssSource source = string.Equals(opts.SourceType, "bundle", StringComparison.OrdinalIgnoreCase)
                ? _bundleSource
                : _onlineSource;

            // Retrieve the EPSS file
            var sourceFile = await source.GetAsync(modelDate, cancellationToken).ConfigureAwait(false);

            _logger.LogInformation(
                "Retrieved EPSS file from {SourceUri}, size={Size}",
                sourceFile.SourceUri,
                sourceFile.Content.Length);

            // Begin import run
            var importRun = await _repository.BeginImportAsync(
                modelDate,
                sourceFile.SourceUri,
                _timeProvider.GetUtcNow(),
                sourceFile.FileSha256,
                cancellationToken).ConfigureAwait(false);

            _logger.LogDebug("Created import run {ImportRunId}", importRun.ImportRunId);

            try
            {
                // Parse and write snapshot
                await using var stream = new MemoryStream(sourceFile.Content);
                var session = _parser.ParseGzip(stream);

                var writeResult = await _repository.WriteSnapshotAsync(
                    importRun.ImportRunId,
                    modelDate,
                    _timeProvider.GetUtcNow(),
                    session,
                    cancellationToken).ConfigureAwait(false);

                // Mark success
                await _repository.MarkImportSucceededAsync(
                    importRun.ImportRunId,
                    session.RowCount,
                    session.DecompressedSha256,
                    session.ModelVersionTag,
                    session.PublishedDate,
                    cancellationToken).ConfigureAwait(false);

                stopwatch.Stop();

                _logger.LogInformation(
                    "EPSS ingestion completed: modelDate={ModelDate}, rows={RowCount}, cves={CveCount}, duration={Duration}ms",
                    modelDate,
                    writeResult.RowCount,
                    writeResult.DistinctCveCount,
                    stopwatch.ElapsedMilliseconds);

                activity?.SetTag("epss.row_count", writeResult.RowCount);
                activity?.SetTag("epss.cve_count", writeResult.DistinctCveCount);
                activity?.SetTag("epss.duration_ms", stopwatch.ElapsedMilliseconds);
            }
            catch (Exception ex)
            {
                try
                {
                    // CancellationToken.None: if the failure was a cancellation, the caller's token is
                    // already cancelled and would stop the failure from being recorded.
                    await _repository.MarkImportFailedAsync(
                        importRun.ImportRunId,
                        ex.Message,
                        CancellationToken.None).ConfigureAwait(false);
                }
                catch (Exception markEx)
                {
                    // Do not let a secondary failure mask the original exception rethrown below.
                    _logger.LogWarning(
                        markEx,
                        "Failed to mark EPSS import run {ImportRunId} as failed",
                        importRun.ImportRunId);
                }

                throw;
            }
        }
        catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
        {
            _logger.LogError(ex, "EPSS ingestion failed for {ModelDate}", modelDate);
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }

    /// <summary>
    /// Ingests the current UTC date, making up to <see cref="EpssIngestOptions.MaxRetries"/> attempts
    /// (at least one) separated by <see cref="EpssIngestOptions.RetryDelay"/>. A final failure is logged
    /// and swallowed. Cancellation of <paramref name="cancellationToken"/> propagates to the caller.
    /// </summary>
    private async Task RunIngestionWithRetryAsync(CancellationToken cancellationToken)
    {
        var opts = _options.Value;
        var modelDate = DateOnly.FromDateTime(_timeProvider.GetUtcNow().UtcDateTime);

        // MaxRetries is a total attempt count; a non-positive value still gets one attempt.
        var maxAttempts = Math.Max(1, opts.MaxRetries);

        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await IngestAsync(modelDate, cancellationToken).ConfigureAwait(false);
                return;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown is not a failed attempt. Let the caller observe the cancellation.
                throw;
            }
            catch (Exception ex) when (attempt < maxAttempts)
            {
                _logger.LogWarning(
                    ex,
                    "EPSS ingestion attempt {Attempt}/{MaxRetries} failed, retrying in {RetryDelay}",
                    attempt,
                    maxAttempts,
                    opts.RetryDelay);

                if (opts.RetryDelay > TimeSpan.Zero)
                {
                    await Task.Delay(opts.RetryDelay, _timeProvider, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(
                    ex,
                    "EPSS ingestion failed after {MaxRetries} attempts",
                    maxAttempts);
            }
        }
    }

    /// <summary>
    /// Returns the next occurrence, at or after <paramref name="now"/>, of the daily time encoded in
    /// <paramref name="cronSchedule"/>. If the expression is not supported, 00:05 UTC is used.
    /// </summary>
    private static DateTimeOffset ComputeNextRun(DateTimeOffset now, string cronSchedule)
    {
        var timeOfDay = TryParseDailySchedule(cronSchedule, out var parsed) ? parsed : DefaultRunTimeOfDay;
        var todayRun = new DateTimeOffset(now.UtcDateTime.Date, TimeSpan.Zero) + timeOfDay;

        // Strictly-after comparison: being exactly on the scheduled instant still runs today.
        return now > todayRun ? todayRun.AddDays(1) : todayRun;
    }

    /// <summary>
    /// Parses a daily cron expression into a UTC time of day. Two forms are accepted: six fields
    /// (<c>"sec min hour * * *"</c>) or five fields (<c>"min hour * * *"</c>, seconds = 0). The
    /// day-of-month, month and day-of-week fields must be wildcards (<c>*</c> or <c>?</c>).
    /// </summary>
    private static bool TryParseDailySchedule(string? cronSchedule, out TimeSpan timeOfDay)
    {
        timeOfDay = default;
        if (string.IsNullOrWhiteSpace(cronSchedule))
        {
            return false;
        }

        var fields = cronSchedule.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
        string secondsField;
        switch (fields.Length)
        {
            case 6:
                secondsField = fields[0];
                break;
            case 5:
                secondsField = "0";
                break;
            default:
                return false;
        }

        // Index of the minutes field: shifted by one when a leading seconds field is present.
        var offset = fields.Length - 5;

        for (var i = offset + 2; i < fields.Length; i++)
        {
            if (fields[i] is not ("*" or "?"))
            {
                return false;
            }
        }

        if (!TryParseField(secondsField, 59, out var second)
            || !TryParseField(fields[offset], 59, out var minute)
            || !TryParseField(fields[offset + 1], 23, out var hour))
        {
            return false;
        }

        timeOfDay = new TimeSpan(hour, minute, second);
        return true;

        // Plain non-negative integer (no sign, no whitespace) no larger than max.
        static bool TryParseField(string field, int max, out int value) =>
            int.TryParse(
                field,
                System.Globalization.NumberStyles.None,
                System.Globalization.CultureInfo.InvariantCulture,
                out value)
            && value <= max;
    }
}