Expand advisory source catalog to 75 sources and add mirror management backend
Source catalog: add 28 sources across 6 new categories (Exploit, Container, Hardware, ICS, PackageManager, additional CERTs) plus RU/CIS promotion and threat intel frameworks. Register 25 new HTTP clients. Source management API: 9 endpoints under /api/v1/sources for catalog browsing, connectivity checks, and enable/disable controls. Mirror domain API: 12 endpoints under /api/v1/mirror for domain CRUD, export management, on-demand bundle generation, and connectivity testing. Filter model: multi-value sourceVendor (comma-separated OR), sourceCategory and sourceTag shorthand resolution via ResolveFilters(). Backward-compatible with existing single-value filters. Deterministic query signatures. Mirror export scheduler: BackgroundService with configurable refresh interval, per-domain staleness detection, error isolation, and air-gap disable toggle. VEX ingestion backoff: exponential backoff for failed sources (1hr → 24hr cap) with jitter. New DB migration for tracking columns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -49,6 +49,15 @@ public interface IVexSourceRepository
|
||||
string? errorMessage = null,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
/// <summary>
|
||||
/// Updates failure tracking fields for exponential backoff.
|
||||
/// </summary>
|
||||
Task UpdateFailureTrackingAsync(
|
||||
string sourceId,
|
||||
int consecutiveFailures,
|
||||
DateTimeOffset? nextEligiblePollAt,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
/// <summary>
|
||||
/// Deletes a source by its ID.
|
||||
/// </summary>
|
||||
|
||||
@@ -68,9 +68,28 @@ public sealed class VexIngestionScheduler : BackgroundService
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation("Found {Count} sources due for polling", dueSources.Count);
|
||||
var utcNow = DateTimeOffset.UtcNow;
|
||||
var eligibleSources = dueSources
|
||||
.Where(s => s.NextEligiblePollAt == null || s.NextEligiblePollAt <= utcNow)
|
||||
.ToList();
|
||||
|
||||
var tasks = dueSources.Select(source => PollSourceWithThrottlingAsync(source, cancellationToken));
|
||||
var skippedCount = dueSources.Count - eligibleSources.Count;
|
||||
if (skippedCount > 0)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Skipped {SkippedCount} sources due to backoff. {EligibleCount} sources eligible for polling",
|
||||
skippedCount, eligibleSources.Count);
|
||||
}
|
||||
|
||||
if (eligibleSources.Count == 0)
|
||||
{
|
||||
_logger.LogDebug("No eligible sources after backoff filtering");
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation("Found {Count} sources due for polling", eligibleSources.Count);
|
||||
|
||||
var tasks = eligibleSources.Select(source => PollSourceWithThrottlingAsync(source, cancellationToken));
|
||||
await Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
@@ -100,6 +119,13 @@ public sealed class VexIngestionScheduler : BackgroundService
|
||||
DateTimeOffset.UtcNow,
|
||||
null,
|
||||
cancellationToken);
|
||||
|
||||
// Reset backoff on success
|
||||
if (source.ConsecutiveFailures > 0)
|
||||
{
|
||||
await _sourceRepository.UpdateFailureTrackingAsync(
|
||||
source.SourceId, 0, null, cancellationToken);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -113,6 +139,23 @@ public sealed class VexIngestionScheduler : BackgroundService
|
||||
DateTimeOffset.UtcNow,
|
||||
result.ErrorMessage,
|
||||
cancellationToken);
|
||||
|
||||
var failures = source.ConsecutiveFailures + 1;
|
||||
var backoff = _options.SourceBackoff;
|
||||
var rawBackoff = backoff.InitialBackoffSeconds * Math.Pow(backoff.BackoffMultiplier, failures - 1);
|
||||
var capped = Math.Min(rawBackoff, backoff.MaxBackoffSeconds);
|
||||
var jittered = capped + capped * backoff.JitterFactor * (Random.Shared.NextDouble() * 2 - 1);
|
||||
var nextEligible = DateTimeOffset.UtcNow.AddSeconds(jittered);
|
||||
|
||||
if (failures >= backoff.MaxConsecutiveFailures)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Source {SourceId} has failed {Failures} consecutive times. Next retry at {NextEligible}",
|
||||
source.SourceId, failures, nextEligible);
|
||||
}
|
||||
|
||||
await _sourceRepository.UpdateFailureTrackingAsync(
|
||||
source.SourceId, failures, nextEligible, cancellationToken);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -124,6 +167,23 @@ public sealed class VexIngestionScheduler : BackgroundService
|
||||
DateTimeOffset.UtcNow,
|
||||
ex.Message,
|
||||
cancellationToken);
|
||||
|
||||
var failures = source.ConsecutiveFailures + 1;
|
||||
var backoff = _options.SourceBackoff;
|
||||
var rawBackoff = backoff.InitialBackoffSeconds * Math.Pow(backoff.BackoffMultiplier, failures - 1);
|
||||
var capped = Math.Min(rawBackoff, backoff.MaxBackoffSeconds);
|
||||
var jittered = capped + capped * backoff.JitterFactor * (Random.Shared.NextDouble() * 2 - 1);
|
||||
var nextEligible = DateTimeOffset.UtcNow.AddSeconds(jittered);
|
||||
|
||||
if (failures >= backoff.MaxConsecutiveFailures)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Source {SourceId} has failed {Failures} consecutive times. Next retry at {NextEligible}",
|
||||
source.SourceId, failures, nextEligible);
|
||||
}
|
||||
|
||||
await _sourceRepository.UpdateFailureTrackingAsync(
|
||||
source.SourceId, failures, nextEligible, cancellationToken);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
||||
@@ -68,6 +68,12 @@ public sealed record VexSource
|
||||
/// When the source configuration was last updated.
|
||||
/// </summary>
|
||||
public DateTimeOffset? UpdatedAt { get; init; }
|
||||
|
||||
/// <summary>Number of consecutive polling failures.</summary>
|
||||
public int ConsecutiveFailures { get; init; }
|
||||
|
||||
/// <summary>Next eligible poll time after backoff. Null means eligible immediately.</summary>
|
||||
public DateTimeOffset? NextEligiblePollAt { get; init; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -71,6 +71,32 @@ public sealed class VexHubOptions
|
||||
/// Configuration for distribution/export behavior.
|
||||
/// </summary>
|
||||
public DistributionOptions Distribution { get; set; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Configuration for source failure backoff behavior.
|
||||
/// </summary>
|
||||
public SourceBackoffOptions SourceBackoff { get; set; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Configuration for source failure backoff behavior.
|
||||
/// </summary>
|
||||
public sealed class SourceBackoffOptions
|
||||
{
|
||||
/// <summary>Initial backoff in seconds after first failure.</summary>
|
||||
public int InitialBackoffSeconds { get; set; } = 3600;
|
||||
|
||||
/// <summary>Maximum backoff in seconds.</summary>
|
||||
public int MaxBackoffSeconds { get; set; } = 86400;
|
||||
|
||||
/// <summary>Multiplier for each consecutive failure.</summary>
|
||||
public double BackoffMultiplier { get; set; } = 2.0;
|
||||
|
||||
/// <summary>Jitter factor to avoid thundering herd.</summary>
|
||||
public double JitterFactor { get; set; } = 0.1;
|
||||
|
||||
/// <summary>Max failures before logging a warning (source stays backed off, not disabled).</summary>
|
||||
public int MaxConsecutiveFailures { get; set; } = 10;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
-- Migration: 002_add_source_backoff_columns
|
||||
-- Sprint: Advisory & VEX Source Management
|
||||
-- Adds failure tracking columns for exponential backoff
|
||||
|
||||
ALTER TABLE vexhub.vex_sources
|
||||
ADD COLUMN IF NOT EXISTS consecutive_failures INT NOT NULL DEFAULT 0,
|
||||
ADD COLUMN IF NOT EXISTS next_eligible_poll_at TIMESTAMPTZ NULL;
|
||||
Reference in New Issue
Block a user