tests fixes and sprints work

This commit is contained in:
master
2026-01-22 19:08:46 +02:00
parent c32fff8f86
commit 726d70dc7f
881 changed files with 134434 additions and 6228 deletions

View File

@@ -0,0 +1,64 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using StellaOps.Platform.Analytics.Options;
namespace StellaOps.Platform.Analytics.Services;
public sealed class AnalyticsIngestionDataSource : IAsyncDisposable
{
private readonly ILogger<AnalyticsIngestionDataSource> _logger;
private readonly string? _connectionString;
private NpgsqlDataSource? _dataSource;
public AnalyticsIngestionDataSource(
IOptions<AnalyticsIngestionOptions> options,
ILogger<AnalyticsIngestionDataSource> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_connectionString = options?.Value.PostgresConnectionString;
}
public bool IsConfigured => !string.IsNullOrWhiteSpace(_connectionString);
public async Task<NpgsqlConnection?> OpenConnectionAsync(CancellationToken cancellationToken)
{
if (!IsConfigured)
{
return null;
}
_dataSource ??= new NpgsqlDataSourceBuilder(_connectionString!)
{
Name = "StellaOps.Platform.Analytics.Ingestion"
}.Build();
var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
await ConfigureSessionAsync(connection, cancellationToken).ConfigureAwait(false);
return connection;
}
public async ValueTask DisposeAsync()
{
if (_dataSource is null)
{
return;
}
await _dataSource.DisposeAsync().ConfigureAwait(false);
}
private async Task ConfigureSessionAsync(NpgsqlConnection connection, CancellationToken cancellationToken)
{
await using var tzCommand = new NpgsqlCommand("SET TIME ZONE 'UTC';", connection);
await tzCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
await using var schemaCommand = new NpgsqlCommand("SET search_path TO analytics, public;", connection);
await schemaCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Configured analytics ingestion session for PostgreSQL connection.");
}
}

View File

@@ -0,0 +1,877 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Concelier.SbomIntegration.Models;
using StellaOps.Concelier.SbomIntegration.Parsing;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Platform.Analytics.Models;
using StellaOps.Platform.Analytics.Options;
using StellaOps.Platform.Analytics.Utilities;
using StellaOps.Scanner.Surface.FS;
namespace StellaOps.Platform.Analytics.Services;
public sealed class AnalyticsIngestionService : BackgroundService
{
private readonly AnalyticsIngestionOptions _options;
private readonly AnalyticsIngestionDataSource _dataSource;
private readonly ICasContentReader _casReader;
private readonly IParsedSbomParser _sbomParser;
private readonly IVulnerabilityCorrelationService? _correlationService;
private readonly ILogger<AnalyticsIngestionService> _logger;
private readonly IEventStream<OrchestratorEventEnvelope>? _eventStream;
private readonly JsonSerializerOptions _jsonOptions = new()
{
PropertyNameCaseInsensitive = true
};
public AnalyticsIngestionService(
IOptions<AnalyticsIngestionOptions> options,
AnalyticsIngestionDataSource dataSource,
ICasContentReader casReader,
IParsedSbomParser sbomParser,
ILogger<AnalyticsIngestionService> logger,
IEventStreamFactory? eventStreamFactory = null,
IVulnerabilityCorrelationService? correlationService = null)
{
_options = options?.Value ?? new AnalyticsIngestionOptions();
_options.Normalize();
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_casReader = casReader ?? throw new ArgumentNullException(nameof(casReader));
_sbomParser = sbomParser ?? throw new ArgumentNullException(nameof(sbomParser));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_correlationService = correlationService;
if (eventStreamFactory is not null && !string.IsNullOrWhiteSpace(_options.Streams.ScannerStream))
{
_eventStream = eventStreamFactory.Create<OrchestratorEventEnvelope>(new EventStreamOptions
{
StreamName = _options.Streams.ScannerStream
});
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Enabled)
{
_logger.LogInformation("Analytics ingestion disabled by configuration.");
return;
}
if (_eventStream is null)
{
_logger.LogWarning("Analytics ingestion disabled: no event stream configured.");
return;
}
var position = _options.Streams.StartFromBeginning
? StreamPosition.Beginning
: StreamPosition.End;
_logger.LogInformation(
"Analytics ingestion started; subscribing to {StreamName} from {Position}.",
_eventStream.StreamName,
position.Value);
try
{
await foreach (var streamEvent in _eventStream.SubscribeAsync(position, stoppingToken))
{
await HandleEventAsync(streamEvent.Event, stoppingToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("Analytics ingestion stopped.");
}
catch (Exception ex)
{
_logger.LogError(ex, "Analytics ingestion failed.");
throw;
}
}
private async Task HandleEventAsync(OrchestratorEventEnvelope envelope, CancellationToken cancellationToken)
{
if (!string.Equals(envelope.Kind, OrchestratorEventKinds.ScannerReportReady, StringComparison.OrdinalIgnoreCase))
{
return;
}
if (!IsTenantAllowed(envelope.Tenant))
{
_logger.LogDebug("Skipping scanner event {EventId}; tenant {Tenant} not allowed.", envelope.EventId, envelope.Tenant);
return;
}
if (envelope.Payload is null || envelope.Payload.Value.ValueKind == JsonValueKind.Undefined)
{
_logger.LogWarning("Scanner report event {EventId} missing payload.", envelope.EventId);
return;
}
ReportReadyEventPayload? payload;
try
{
payload = envelope.Payload.Value.Deserialize<ReportReadyEventPayload>(_jsonOptions);
}
catch (JsonException ex)
{
_logger.LogWarning(ex, "Failed to parse scanner report payload for event {EventId}.", envelope.EventId);
return;
}
if (payload is null)
{
_logger.LogWarning("Scanner report payload empty for event {EventId}.", envelope.EventId);
return;
}
await IngestSbomAsync(envelope, payload, cancellationToken).ConfigureAwait(false);
}
private async Task IngestSbomAsync(
OrchestratorEventEnvelope envelope,
ReportReadyEventPayload payload,
CancellationToken cancellationToken)
{
var surface = payload.Report.Surface;
var manifest = await ResolveManifestAsync(surface, cancellationToken).ConfigureAwait(false);
if (manifest is null)
{
_logger.LogWarning("Scanner report {ReportId} missing surface manifest.", payload.ReportId);
return;
}
var sbomArtifact = SelectSbomArtifact(manifest.Artifacts);
if (sbomArtifact is null)
{
_logger.LogWarning("Scanner report {ReportId} contains no SBOM artifacts.", payload.ReportId);
return;
}
var sbomContent = await ReadContentAsync(sbomArtifact.Uri, cancellationToken).ConfigureAwait(false);
if (sbomContent is null)
{
_logger.LogWarning("Failed to read SBOM content for report {ReportId}.", payload.ReportId);
return;
}
var sbomFormat = ResolveSbomFormat(sbomArtifact);
ParsedSbom parsedSbom;
await using (var sbomStream = new MemoryStream(sbomContent.Bytes, writable: false))
{
parsedSbom = await _sbomParser.ParseAsync(sbomStream, sbomFormat, cancellationToken)
.ConfigureAwait(false);
}
var artifactDigest = NormalizeDigest(payload.ImageDigest)
?? NormalizeDigest(envelope.Scope?.Digest);
if (string.IsNullOrWhiteSpace(artifactDigest))
{
_logger.LogWarning("Scanner report {ReportId} missing artifact digest.", payload.ReportId);
return;
}
var artifactName = ResolveArtifactName(envelope);
var artifactVersion = ResolveArtifactVersion(envelope);
var sbomDigest = NormalizeDigest(sbomArtifact.Digest) ?? sbomContent.Digest;
var storageUri = sbomArtifact.Uri;
var contentSize = sbomArtifact.SizeBytes > 0 ? sbomArtifact.SizeBytes : sbomContent.Length;
var formatLabel = NormalizeSbomFormat(parsedSbom.Format, sbomFormat);
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
if (connection is null)
{
_logger.LogWarning("Analytics ingestion skipped: database is not configured.");
return;
}
var componentSeeds = BuildComponentSeeds(parsedSbom);
var componentCount = componentSeeds.Count;
await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
var artifactId = await UpsertArtifactAsync(
connection,
transaction,
artifactDigest,
artifactName,
artifactVersion,
sbomDigest,
formatLabel,
parsedSbom.SpecVersion,
componentCount,
cancellationToken).ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(sbomDigest))
{
await UpsertRawSbomAsync(
connection,
transaction,
artifactId,
sbomDigest,
contentSize,
storageUri,
formatLabel,
parsedSbom.SpecVersion,
cancellationToken).ConfigureAwait(false);
}
var componentIds = new Dictionary<ComponentKey, Guid>();
foreach (var seed in componentSeeds)
{
var key = new ComponentKey(seed.Purl, seed.HashSha256);
if (!componentIds.TryGetValue(key, out var componentId))
{
componentId = await UpsertComponentAsync(
connection,
transaction,
seed,
cancellationToken).ConfigureAwait(false);
componentIds[key] = componentId;
}
var inserted = await InsertArtifactComponentAsync(
connection,
transaction,
artifactId,
componentId,
seed,
cancellationToken).ConfigureAwait(false);
if (inserted)
{
await IncrementComponentCountsAsync(
connection,
transaction,
componentId,
cancellationToken).ConfigureAwait(false);
}
}
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
if (_correlationService is not null)
{
var purls = componentSeeds
.Select(seed => seed.Purl)
.Where(purl => !string.IsNullOrWhiteSpace(purl))
.Distinct(StringComparer.OrdinalIgnoreCase)
.ToArray();
await _correlationService.CorrelateForPurlsAsync(purls, cancellationToken)
.ConfigureAwait(false);
await _correlationService.UpdateArtifactCountsAsync(artifactId, cancellationToken)
.ConfigureAwait(false);
}
}
private async Task<SurfaceManifestDocument?> ResolveManifestAsync(
SurfacePointersPayload? surface,
CancellationToken cancellationToken)
{
if (surface is null)
{
return null;
}
if (surface.Manifest.Artifacts.Count > 0)
{
return surface.Manifest;
}
if (string.IsNullOrWhiteSpace(surface.ManifestUri))
{
return null;
}
var manifestContent = await ReadContentAsync(surface.ManifestUri, cancellationToken).ConfigureAwait(false);
if (manifestContent is null)
{
return null;
}
try
{
return JsonSerializer.Deserialize<SurfaceManifestDocument>(
manifestContent.Bytes,
_jsonOptions);
}
catch (JsonException ex)
{
_logger.LogWarning(ex, "Failed to deserialize surface manifest from {ManifestUri}.", surface.ManifestUri);
return null;
}
}
internal static SurfaceManifestArtifact? SelectSbomArtifact(IReadOnlyList<SurfaceManifestArtifact> artifacts)
{
if (artifacts.Count == 0)
{
return null;
}
SurfaceManifestArtifact? Find(Func<SurfaceManifestArtifact, bool> predicate)
=> artifacts.FirstOrDefault(predicate);
return Find(a => string.Equals(a.Kind, "sbom-inventory", StringComparison.OrdinalIgnoreCase))
?? Find(a => string.Equals(a.View, "inventory", StringComparison.OrdinalIgnoreCase))
?? Find(a => string.Equals(a.Kind, "sbom-usage", StringComparison.OrdinalIgnoreCase))
?? Find(a => string.Equals(a.View, "usage", StringComparison.OrdinalIgnoreCase))
?? Find(a => a.Kind.Contains("sbom", StringComparison.OrdinalIgnoreCase))
?? Find(a => a.MediaType.Contains("cyclonedx", StringComparison.OrdinalIgnoreCase))
?? Find(a => a.MediaType.Contains("spdx", StringComparison.OrdinalIgnoreCase));
}
private async Task<ContentPayload?> ReadContentAsync(string uri, CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(uri))
{
return null;
}
var casContent = await _casReader.OpenReadAsync(uri, cancellationToken).ConfigureAwait(false);
if (casContent is null)
{
return null;
}
await using var stream = casContent.Stream;
using var buffer = new MemoryStream();
await stream.CopyToAsync(buffer, cancellationToken).ConfigureAwait(false);
var bytes = buffer.ToArray();
var digest = Sha256Hasher.Compute(bytes);
return new ContentPayload(bytes, casContent.Length ?? bytes.Length, digest);
}
internal static SbomFormat ResolveSbomFormat(SurfaceManifestArtifact artifact)
{
var format = artifact.Format?.ToLowerInvariant() ?? string.Empty;
if (format.Contains("spdx", StringComparison.OrdinalIgnoreCase))
{
return SbomFormat.SPDX;
}
if (format.Contains("cdx", StringComparison.OrdinalIgnoreCase) ||
format.Contains("cyclonedx", StringComparison.OrdinalIgnoreCase))
{
return SbomFormat.CycloneDX;
}
var media = artifact.MediaType?.ToLowerInvariant() ?? string.Empty;
return media.Contains("spdx", StringComparison.OrdinalIgnoreCase)
? SbomFormat.SPDX
: SbomFormat.CycloneDX;
}
internal static string NormalizeSbomFormat(string parsedFormat, SbomFormat fallback)
{
if (parsedFormat.Equals("spdx", StringComparison.OrdinalIgnoreCase))
{
return "spdx";
}
if (parsedFormat.Equals("cyclonedx", StringComparison.OrdinalIgnoreCase))
{
return "cyclonedx";
}
return fallback == SbomFormat.SPDX ? "spdx" : "cyclonedx";
}
internal static string NormalizeDigest(string? digest)
{
if (string.IsNullOrWhiteSpace(digest))
{
return string.Empty;
}
var trimmed = digest.Trim();
return trimmed.StartsWith("sha256:", StringComparison.OrdinalIgnoreCase)
? $"sha256:{trimmed[7..].ToLowerInvariant()}"
: $"sha256:{trimmed.ToLowerInvariant()}";
}
internal static string ResolveArtifactName(OrchestratorEventEnvelope envelope)
{
if (!string.IsNullOrWhiteSpace(envelope.Scope?.Repo))
{
return envelope.Scope!.Repo!;
}
return envelope.Scope?.Image ?? envelope.Scope?.Component ?? "unknown";
}
internal static string? ResolveArtifactVersion(OrchestratorEventEnvelope envelope)
{
if (string.IsNullOrWhiteSpace(envelope.Scope?.Image))
{
return null;
}
var image = envelope.Scope.Image;
var tagIndex = image.LastIndexOf(':');
if (tagIndex > 0 && tagIndex < image.Length - 1)
{
return image[(tagIndex + 1)..];
}
return null;
}
private List<ComponentSeed> BuildComponentSeeds(ParsedSbom sbom)
{
var dependencyMap = BuildDependencyMap(sbom);
var paths = BuildDependencyPaths(sbom, dependencyMap);
var seeds = new List<ComponentSeed>();
foreach (var component in sbom.Components)
{
var purl = !string.IsNullOrWhiteSpace(component.Purl)
? PurlParser.Parse(component.Purl).Normalized
: PurlParser.BuildGeneric(component.Name, component.Version);
var hash = ResolveComponentHash(component, purl);
var licenseExpression = LicenseExpressionRenderer.BuildExpression(component.Licenses);
var supplier = component.Supplier?.Name ?? component.Publisher ?? sbom.Metadata.Supplier ?? sbom.Metadata.Manufacturer;
paths.TryGetValue(component.BomRef, out var dependencyPath);
var depth = dependencyPath?.Length > 0 ? dependencyPath.Length - 1 : 0;
var introducedVia = dependencyPath is { Length: > 1 } ? dependencyPath[^2] : null;
seeds.Add(new ComponentSeed(
component.BomRef,
purl,
hash,
component.Name,
component.Version,
MapComponentType(component.Type),
supplier,
licenseExpression,
licenseExpression,
component.Description,
component.Cpe,
MapScope(component.Scope),
dependencyPath,
depth,
introducedVia));
}
return seeds
.GroupBy(seed => new ComponentKey(seed.Purl, seed.HashSha256))
.Select(group => group
.OrderBy(seed => seed.Depth)
.ThenBy(seed => seed.BomRef, StringComparer.Ordinal)
.First())
.ToList();
}
internal static Dictionary<string, IReadOnlyList<string>> BuildDependencyMap(ParsedSbom sbom)
{
var map = new Dictionary<string, IReadOnlyList<string>>(StringComparer.Ordinal);
foreach (var dependency in sbom.Dependencies)
{
if (string.IsNullOrWhiteSpace(dependency.SourceRef))
{
continue;
}
var list = dependency.DependsOn
.Where(value => !string.IsNullOrWhiteSpace(value))
.Distinct(StringComparer.Ordinal)
.OrderBy(value => value, StringComparer.Ordinal)
.ToArray();
if (list.Length > 0)
{
map[dependency.SourceRef] = list;
}
}
return map;
}
internal static Dictionary<string, string[]> BuildDependencyPaths(
ParsedSbom sbom,
Dictionary<string, IReadOnlyList<string>> dependencyMap)
{
var paths = new Dictionary<string, string[]>(StringComparer.Ordinal);
var root = sbom.Metadata.RootComponentRef;
if (string.IsNullOrWhiteSpace(root))
{
return paths;
}
var queue = new Queue<string>();
paths[root] = new[] { root };
queue.Enqueue(root);
while (queue.Count > 0)
{
var current = queue.Dequeue();
if (!dependencyMap.TryGetValue(current, out var children))
{
continue;
}
foreach (var child in children)
{
if (paths.ContainsKey(child))
{
continue;
}
var parentPath = paths[current];
var childPath = new string[parentPath.Length + 1];
Array.Copy(parentPath, childPath, parentPath.Length);
childPath[^1] = child;
paths[child] = childPath;
queue.Enqueue(child);
}
}
return paths;
}
internal static string ResolveComponentHash(ParsedComponent component, string purl)
{
var hash = component.Hashes
.FirstOrDefault(h => h.Algorithm.Equals("sha-256", StringComparison.OrdinalIgnoreCase)
|| h.Algorithm.Equals("sha256", StringComparison.OrdinalIgnoreCase))
?.Value;
return !string.IsNullOrWhiteSpace(hash) ? NormalizeDigest(hash) : Sha256Hasher.Compute(purl);
}
internal static string MapComponentType(string? type)
{
if (string.IsNullOrWhiteSpace(type))
{
return "library";
}
var normalized = type.Trim().ToLowerInvariant();
return normalized switch
{
"application" => "application",
"container" => "container",
"framework" => "framework",
"operating-system" => "operating-system",
"operating system" => "operating-system",
"os" => "operating-system",
"device" => "device",
"firmware" => "firmware",
"file" => "file",
_ => "library"
};
}
internal static string MapScope(ComponentScope scope)
{
return scope switch
{
ComponentScope.Optional => "optional",
ComponentScope.Excluded => "excluded",
ComponentScope.Unknown => "unknown",
_ => "required"
};
}
private async Task<Guid> UpsertArtifactAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
string digest,
string name,
string? version,
string sbomDigest,
string sbomFormat,
string sbomSpecVersion,
int componentCount,
CancellationToken cancellationToken)
{
const string sql = """
INSERT INTO analytics.artifacts (
artifact_type,
name,
version,
digest,
sbom_digest,
sbom_format,
sbom_spec_version,
component_count,
vulnerability_count,
critical_count,
high_count,
medium_count,
low_count,
updated_at
)
VALUES (
@artifact_type,
@name,
@version,
@digest,
@sbom_digest,
@sbom_format,
@sbom_spec_version,
@component_count,
0,
0,
0,
0,
0,
now()
)
ON CONFLICT (digest) DO UPDATE SET
name = EXCLUDED.name,
version = COALESCE(EXCLUDED.version, analytics.artifacts.version),
sbom_digest = EXCLUDED.sbom_digest,
sbom_format = EXCLUDED.sbom_format,
sbom_spec_version = EXCLUDED.sbom_spec_version,
component_count = EXCLUDED.component_count,
updated_at = now()
RETURNING artifact_id;
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("artifact_type", "container");
command.Parameters.AddWithValue("name", name);
command.Parameters.AddWithValue("version", (object?)version ?? DBNull.Value);
command.Parameters.AddWithValue("digest", digest);
command.Parameters.AddWithValue("sbom_digest", sbomDigest);
command.Parameters.AddWithValue("sbom_format", sbomFormat);
command.Parameters.AddWithValue("sbom_spec_version", sbomSpecVersion);
command.Parameters.AddWithValue("component_count", componentCount);
var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return result is Guid id ? id : Guid.Empty;
}
private async Task UpsertRawSbomAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
Guid artifactId,
string contentHash,
long contentSize,
string storageUri,
string format,
string specVersion,
CancellationToken cancellationToken)
{
const string sql = """
INSERT INTO analytics.raw_sboms (
artifact_id,
format,
spec_version,
content_hash,
content_size,
storage_uri,
ingest_version,
schema_version
)
VALUES (
@artifact_id,
@format,
@spec_version,
@content_hash,
@content_size,
@storage_uri,
@ingest_version,
@schema_version
)
ON CONFLICT (content_hash) DO NOTHING;
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("artifact_id", artifactId);
command.Parameters.AddWithValue("format", format);
command.Parameters.AddWithValue("spec_version", specVersion);
command.Parameters.AddWithValue("content_hash", contentHash);
command.Parameters.AddWithValue("content_size", contentSize);
command.Parameters.AddWithValue("storage_uri", storageUri);
command.Parameters.AddWithValue("ingest_version", _options.IngestVersion);
command.Parameters.AddWithValue("schema_version", _options.SchemaVersion);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private async Task<Guid> UpsertComponentAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
ComponentSeed seed,
CancellationToken cancellationToken)
{
const string sql = """
INSERT INTO analytics.components (
purl,
purl_type,
purl_namespace,
purl_name,
purl_version,
hash_sha256,
name,
version,
description,
component_type,
supplier,
supplier_normalized,
license_declared,
license_concluded,
license_category,
cpe
)
SELECT
@purl,
parsed.purl_type,
parsed.purl_namespace,
parsed.purl_name,
parsed.purl_version,
@hash_sha256,
@name,
@version,
@description,
@component_type,
@supplier,
analytics.normalize_supplier(@supplier),
@license_declared,
@license_concluded,
analytics.categorize_license(@license_concluded),
@cpe
FROM analytics.parse_purl(@purl) AS parsed
ON CONFLICT (purl, hash_sha256) DO UPDATE SET
last_seen_at = now(),
updated_at = now(),
supplier = COALESCE(EXCLUDED.supplier, analytics.components.supplier),
supplier_normalized = COALESCE(EXCLUDED.supplier_normalized, analytics.components.supplier_normalized),
license_declared = COALESCE(EXCLUDED.license_declared, analytics.components.license_declared),
license_concluded = COALESCE(EXCLUDED.license_concluded, analytics.components.license_concluded),
license_category = COALESCE(EXCLUDED.license_category, analytics.components.license_category),
description = COALESCE(EXCLUDED.description, analytics.components.description),
cpe = COALESCE(EXCLUDED.cpe, analytics.components.cpe),
component_type = COALESCE(EXCLUDED.component_type, analytics.components.component_type),
name = COALESCE(EXCLUDED.name, analytics.components.name),
version = COALESCE(EXCLUDED.version, analytics.components.version)
RETURNING component_id;
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("purl", seed.Purl);
command.Parameters.AddWithValue("hash_sha256", seed.HashSha256);
command.Parameters.AddWithValue("name", seed.Name);
command.Parameters.AddWithValue("version", (object?)seed.Version ?? DBNull.Value);
command.Parameters.AddWithValue("description", (object?)seed.Description ?? DBNull.Value);
command.Parameters.AddWithValue("component_type", seed.ComponentType);
command.Parameters.AddWithValue("supplier", (object?)seed.Supplier ?? DBNull.Value);
command.Parameters.AddWithValue("license_declared", (object?)seed.LicenseDeclared ?? DBNull.Value);
command.Parameters.AddWithValue("license_concluded", (object?)seed.LicenseConcluded ?? DBNull.Value);
command.Parameters.AddWithValue("cpe", (object?)seed.Cpe ?? DBNull.Value);
var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
return result is Guid id ? id : Guid.Empty;
}
private async Task<bool> InsertArtifactComponentAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
Guid artifactId,
Guid componentId,
ComponentSeed seed,
CancellationToken cancellationToken)
{
const string sql = """
INSERT INTO analytics.artifact_components (
artifact_id,
component_id,
bom_ref,
scope,
dependency_path,
depth,
introduced_via
)
VALUES (
@artifact_id,
@component_id,
@bom_ref,
@scope,
@dependency_path,
@depth,
@introduced_via
)
ON CONFLICT (artifact_id, component_id) DO NOTHING;
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("artifact_id", artifactId);
command.Parameters.AddWithValue("component_id", componentId);
command.Parameters.AddWithValue("bom_ref", seed.BomRef);
command.Parameters.AddWithValue("scope", (object?)seed.Scope ?? DBNull.Value);
command.Parameters.AddWithValue("depth", seed.Depth);
command.Parameters.AddWithValue("introduced_via", (object?)seed.IntroducedVia ?? DBNull.Value);
var pathParameter = new NpgsqlParameter("dependency_path", NpgsqlDbType.Array | NpgsqlDbType.Text)
{
Value = (object?)seed.DependencyPath ?? DBNull.Value
};
command.Parameters.Add(pathParameter);
var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
private static async Task IncrementComponentCountsAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
Guid componentId,
CancellationToken cancellationToken)
{
const string sql = """
UPDATE analytics.components
SET
artifact_count = artifact_count + 1,
sbom_count = sbom_count + 1,
last_seen_at = now(),
updated_at = now()
WHERE component_id = @component_id;
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("component_id", componentId);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private bool IsTenantAllowed(string tenant)
{
return TenantNormalizer.IsAllowed(tenant, _options.AllowedTenants);
}
private sealed record ContentPayload(byte[] Bytes, long Length, string Digest);
private sealed record ComponentSeed(
string BomRef,
string Purl,
string HashSha256,
string Name,
string? Version,
string ComponentType,
string? Supplier,
string? LicenseDeclared,
string? LicenseConcluded,
string? Description,
string? Cpe,
string? Scope,
string[]? DependencyPath,
int Depth,
string? IntroducedVia);
private sealed record ComponentKey(string Purl, string HashSha256);
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,126 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Platform.Analytics.Options;
namespace StellaOps.Platform.Analytics.Services;
public interface ICasContentReader
{
Task<CasContent?> OpenReadAsync(string casUri, CancellationToken cancellationToken);
}
public sealed record CasContent(Stream Stream, long? Length);
public sealed class FileCasContentReader : ICasContentReader
{
private readonly AnalyticsCasOptions _options;
private readonly ILogger<FileCasContentReader> _logger;
public FileCasContentReader(
IOptions<AnalyticsIngestionOptions> options,
ILogger<FileCasContentReader> logger)
{
_options = options?.Value.Cas ?? new AnalyticsCasOptions();
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Task<CasContent?> OpenReadAsync(string casUri, CancellationToken cancellationToken)
{
if (!TryParseCasUri(casUri, out var reference))
{
_logger.LogWarning("Unsupported CAS URI '{CasUri}'.", casUri);
return Task.FromResult<CasContent?>(null);
}
if (string.IsNullOrWhiteSpace(_options.RootPath))
{
_logger.LogWarning("CAS root path not configured; skipping {CasUri}.", casUri);
return Task.FromResult<CasContent?>(null);
}
var root = Path.GetFullPath(_options.RootPath);
foreach (var candidate in ExpandKeyCandidates(reference.Key))
{
var keyPath = candidate.Replace('/', Path.DirectorySeparatorChar);
var resolved = Path.GetFullPath(Path.Combine(root, reference.Bucket, keyPath));
if (!resolved.StartsWith(root, StringComparison.OrdinalIgnoreCase))
{
_logger.LogWarning("CAS URI '{CasUri}' resolved outside root '{Root}'.", casUri, root);
return Task.FromResult<CasContent?>(null);
}
if (!File.Exists(resolved))
{
continue;
}
var stream = new FileStream(resolved, FileMode.Open, FileAccess.Read, FileShare.Read);
var length = new FileInfo(resolved).Length;
return Task.FromResult<CasContent?>(new CasContent(stream, length));
}
_logger.LogWarning("CAS object not found at '{Key}' for '{CasUri}'.", reference.Key, casUri);
return Task.FromResult<CasContent?>(null);
}
private bool TryParseCasUri(string casUri, out CasReference reference)
{
reference = default!;
if (string.IsNullOrWhiteSpace(casUri))
{
return false;
}
if (!Uri.TryCreate(casUri, UriKind.Absolute, out var uri) ||
!string.Equals(uri.Scheme, "cas", StringComparison.OrdinalIgnoreCase))
{
return false;
}
var bucket = uri.Host;
var key = uri.AbsolutePath.TrimStart('/');
if (string.IsNullOrWhiteSpace(bucket))
{
if (string.IsNullOrWhiteSpace(_options.DefaultBucket))
{
return false;
}
bucket = _options.DefaultBucket!;
}
if (string.IsNullOrWhiteSpace(key))
{
return false;
}
reference = new CasReference(casUri, bucket, key);
return true;
}
private static IEnumerable<string> ExpandKeyCandidates(string key)
{
yield return key;
var colonIndex = key.IndexOf(':');
if (colonIndex <= 0 || colonIndex >= key.Length - 1)
{
yield break;
}
var prefix = key[..colonIndex];
var suffix = key[(colonIndex + 1)..];
yield return $"{prefix}/{suffix}";
yield return suffix;
}
}
public sealed record CasReference(string Uri, string Bucket, string Key);

View File

@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace StellaOps.Platform.Analytics.Services;
public interface IVulnerabilityCorrelationService
{
Task CorrelateForPurlsAsync(IReadOnlyCollection<string> purls, CancellationToken cancellationToken);
Task UpdateArtifactCountsAsync(Guid artifactId, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,603 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Platform.Analytics.Models;
using StellaOps.Platform.Analytics.Options;
using StellaOps.Platform.Analytics.Utilities;
namespace StellaOps.Platform.Analytics.Services;
public sealed class VulnerabilityCorrelationService : BackgroundService, IVulnerabilityCorrelationService
{
private readonly AnalyticsIngestionOptions _options;
private readonly AnalyticsIngestionDataSource _dataSource;
private readonly ILogger<VulnerabilityCorrelationService> _logger;
private readonly IEventStream<AdvisoryObservationUpdatedEvent>? _observationStream;
private readonly IEventStream<AdvisoryLinksetUpdatedEvent>? _linksetStream;
private readonly JsonSerializerOptions _jsonOptions = new()
{
PropertyNameCaseInsensitive = true
};
public VulnerabilityCorrelationService(
IOptions<AnalyticsIngestionOptions> options,
AnalyticsIngestionDataSource dataSource,
ILogger<VulnerabilityCorrelationService> logger,
IEventStreamFactory? eventStreamFactory = null)
{
_options = options?.Value ?? new AnalyticsIngestionOptions();
_options.Normalize();
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
if (eventStreamFactory is not null)
{
if (!string.IsNullOrWhiteSpace(_options.Streams.ConcelierObservationStream))
{
_observationStream = eventStreamFactory.Create<AdvisoryObservationUpdatedEvent>(
new EventStreamOptions
{
StreamName = _options.Streams.ConcelierObservationStream
});
}
if (!string.IsNullOrWhiteSpace(_options.Streams.ConcelierLinksetStream))
{
_linksetStream = eventStreamFactory.Create<AdvisoryLinksetUpdatedEvent>(
new EventStreamOptions
{
StreamName = _options.Streams.ConcelierLinksetStream
});
}
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Enabled)
{
_logger.LogInformation("Vulnerability correlation disabled by configuration.");
return;
}
if (_observationStream is null && _linksetStream is null)
{
_logger.LogWarning("Vulnerability correlation disabled: no event streams configured.");
return;
}
var tasks = new List<Task>(2);
if (_observationStream is not null)
{
tasks.Add(ConsumeObservationStreamAsync(stoppingToken));
}
if (_linksetStream is not null)
{
tasks.Add(ConsumeLinksetStreamAsync(stoppingToken));
}
try
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("Vulnerability correlation stopped.");
}
catch (Exception ex)
{
_logger.LogError(ex, "Vulnerability correlation failed.");
throw;
}
}
public async Task CorrelateForPurlsAsync(
IReadOnlyCollection<string> purls,
CancellationToken cancellationToken)
{
var normalized = NormalizePurls(purls);
if (normalized.Count == 0)
{
return;
}
await using var connection = await _dataSource
.OpenConnectionAsync(cancellationToken)
.ConfigureAwait(false);
if (connection is null)
{
_logger.LogWarning("Vulnerability correlation skipped: Postgres not configured.");
return;
}
var components = await LoadComponentsAsync(connection, normalized, cancellationToken)
.ConfigureAwait(false);
if (components.Count == 0)
{
return;
}
var matches = await LoadVulnerabilityMatchesAsync(connection, normalized, cancellationToken)
.ConfigureAwait(false);
if (matches.Count == 0)
{
return;
}
await using var transaction = await connection.BeginTransactionAsync(cancellationToken)
.ConfigureAwait(false);
foreach (var component in components)
{
if (!matches.TryGetValue(component.Purl, out var vulnMatches))
{
continue;
}
foreach (var match in vulnMatches)
{
if (!VulnerabilityCorrelationRules.TryParseNormalizedVersions(
match.NormalizedVersionsJson,
_jsonOptions,
out var rules,
out var error))
{
_logger.LogWarning(error, "Failed to parse normalized versions payload.");
}
var affects = rules.Count == 0
|| VersionRuleEvaluator.Matches(component.Version, rules);
var fixedVersion = VulnerabilityCorrelationRules.ExtractFixedVersion(rules);
var fixAvailable = !string.IsNullOrWhiteSpace(fixedVersion);
var affectedVersions = match.NormalizedVersionsJson;
await UpsertComponentVulnAsync(
connection,
transaction,
component.ComponentId,
match,
affects,
affectedVersions,
fixedVersion,
fixAvailable,
cancellationToken).ConfigureAwait(false);
}
}
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
}
public async Task UpdateArtifactCountsAsync(Guid artifactId, CancellationToken cancellationToken)
{
await using var connection = await _dataSource
.OpenConnectionAsync(cancellationToken)
.ConfigureAwait(false);
if (connection is null)
{
_logger.LogWarning("Artifact count update skipped: Postgres not configured.");
return;
}
const string sql = """
WITH counts AS (
SELECT
COUNT(DISTINCT cv.vuln_id) FILTER (WHERE cv.affects = TRUE) AS total,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'critical' THEN cv.vuln_id END) AS critical,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'high' THEN cv.vuln_id END) AS high,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'medium' THEN cv.vuln_id END) AS medium,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'low' THEN cv.vuln_id END) AS low
FROM analytics.artifact_components ac
JOIN analytics.component_vulns cv ON cv.component_id = ac.component_id
WHERE ac.artifact_id = @artifact_id
)
UPDATE analytics.artifacts
SET
vulnerability_count = COALESCE((SELECT total FROM counts), 0),
critical_count = COALESCE((SELECT critical FROM counts), 0),
high_count = COALESCE((SELECT high FROM counts), 0),
medium_count = COALESCE((SELECT medium FROM counts), 0),
low_count = COALESCE((SELECT low FROM counts), 0),
updated_at = now()
WHERE artifact_id = @artifact_id;
""";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("artifact_id", artifactId);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private async Task ConsumeObservationStreamAsync(CancellationToken stoppingToken)
{
if (_observationStream is null)
{
return;
}
var position = _options.Streams.StartFromBeginning
? StreamPosition.Beginning
: StreamPosition.End;
_logger.LogInformation(
"Subscribed to {StreamName} for advisory observation updates from {Position}.",
_observationStream.StreamName,
position.Value);
await foreach (var streamEvent in _observationStream.SubscribeAsync(position, stoppingToken))
{
var payload = streamEvent.Event;
if (!IsTenantAllowed(payload.TenantId))
{
continue;
}
var purls = payload.LinksetSummary.Purls?.ToArray() ?? Array.Empty<string>();
if (purls.Length == 0)
{
continue;
}
await CorrelateForPurlsAsync(purls, stoppingToken).ConfigureAwait(false);
await UpdateArtifactCountsForPurlsAsync(purls, stoppingToken).ConfigureAwait(false);
}
}
private async Task ConsumeLinksetStreamAsync(CancellationToken stoppingToken)
{
if (_linksetStream is null)
{
return;
}
var position = _options.Streams.StartFromBeginning
? StreamPosition.Beginning
: StreamPosition.End;
_logger.LogInformation(
"Subscribed to {StreamName} for advisory linkset updates from {Position}.",
_linksetStream.StreamName,
position.Value);
await foreach (var streamEvent in _linksetStream.SubscribeAsync(position, stoppingToken))
{
var payload = streamEvent.Event;
if (!IsTenantAllowed(payload.TenantId))
{
continue;
}
var purls = await ResolvePurlsForAdvisoryAsync(payload.AdvisoryId, stoppingToken)
.ConfigureAwait(false);
if (purls.Count == 0)
{
continue;
}
await CorrelateForPurlsAsync(purls, stoppingToken).ConfigureAwait(false);
await UpdateArtifactCountsForPurlsAsync(purls, stoppingToken).ConfigureAwait(false);
}
}
private bool IsTenantAllowed(string tenant)
=> TenantNormalizer.IsAllowed(tenant, _options.AllowedTenants);
private IReadOnlyList<string> NormalizePurls(IReadOnlyCollection<string> purls)
{
if (purls.Count == 0)
{
return Array.Empty<string>();
}
return purls
.Where(value => !string.IsNullOrWhiteSpace(value))
.Select(value => PurlParser.Parse(value).Normalized)
.Where(value => !string.IsNullOrWhiteSpace(value))
.Distinct(StringComparer.OrdinalIgnoreCase)
.ToArray();
}
private async Task<List<ComponentSnapshot>> LoadComponentsAsync(
NpgsqlConnection connection,
IReadOnlyList<string> purls,
CancellationToken cancellationToken)
{
const string sql = """
SELECT component_id, purl, COALESCE(NULLIF(purl_version, ''), NULLIF(version, ''))
FROM analytics.components
WHERE purl = ANY(@purls);
""";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("purls", purls);
var components = new List<ComponentSnapshot>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var componentId = reader.GetGuid(0);
var purl = reader.GetString(1);
var version = reader.IsDBNull(2) ? null : reader.GetString(2);
components.Add(new ComponentSnapshot(componentId, purl, version));
}
return components;
}
private async Task<Dictionary<string, List<VulnerabilityMatch>>> LoadVulnerabilityMatchesAsync(
NpgsqlConnection connection,
IReadOnlyList<string> purls,
CancellationToken cancellationToken)
{
const string sql = """
SELECT DISTINCT ON (aff.package_purl, adv.primary_vuln_id)
aff.package_purl,
adv.primary_vuln_id,
COALESCE(src.source_type, src.key, 'unknown') AS source,
adv.severity,
adv.published_at,
cvss.base_score,
cvss.vector,
canon.epss_score,
(kev.cve_id IS NOT NULL) AS kev_listed,
aff.normalized_versions::text AS normalized_versions
FROM vuln.advisory_affected aff
JOIN vuln.advisories adv ON adv.id = aff.advisory_id
LEFT JOIN vuln.sources src ON src.id = adv.source_id
LEFT JOIN LATERAL (
SELECT base_score, vector
FROM vuln.advisory_cvss
WHERE advisory_id = adv.id
ORDER BY is_primary DESC, base_score DESC, version DESC
LIMIT 1
) cvss ON TRUE
LEFT JOIN vuln.kev_flags kev ON kev.cve_id = adv.primary_vuln_id
LEFT JOIN vuln.advisory_canonical canon ON canon.cve = adv.primary_vuln_id
WHERE aff.package_purl = ANY(@purls)
AND adv.state = 'active'
ORDER BY aff.package_purl, adv.primary_vuln_id, COALESCE(src.priority, 100) ASC,
COALESCE(adv.updated_at, adv.created_at) DESC;
""";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("purls", purls);
var matches = new Dictionary<string, List<VulnerabilityMatch>>(StringComparer.OrdinalIgnoreCase);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var purl = reader.GetString(0);
var vulnId = reader.GetString(1);
var source = reader.IsDBNull(2) ? "unknown" : reader.GetString(2);
var severity = reader.IsDBNull(3) ? null : reader.GetString(3);
var publishedAt = reader.IsDBNull(4) ? (DateTimeOffset?)null : reader.GetFieldValue<DateTimeOffset>(4);
var cvssScore = reader.IsDBNull(5) ? (decimal?)null : reader.GetDecimal(5);
var cvssVector = reader.IsDBNull(6) ? null : reader.GetString(6);
var epssScore = reader.IsDBNull(7) ? (decimal?)null : reader.GetDecimal(7);
var kevListed = !reader.IsDBNull(8) && reader.GetBoolean(8);
var normalizedVersionsJson = reader.IsDBNull(9) ? null : reader.GetString(9);
var match = new VulnerabilityMatch(
purl,
vulnId,
VulnerabilityCorrelationRules.NormalizeSource(source),
VulnerabilityCorrelationRules.NormalizeSeverity(severity),
cvssScore,
cvssVector,
epssScore,
kevListed,
normalizedVersionsJson,
publishedAt);
if (!matches.TryGetValue(purl, out var list))
{
list = new List<VulnerabilityMatch>();
matches[purl] = list;
}
list.Add(match);
}
return matches;
}
private async Task<IReadOnlyList<string>> ResolvePurlsForAdvisoryAsync(
string advisoryId,
CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(advisoryId))
{
return Array.Empty<string>();
}
await using var connection = await _dataSource
.OpenConnectionAsync(cancellationToken)
.ConfigureAwait(false);
if (connection is null)
{
return Array.Empty<string>();
}
const string sql = """
SELECT DISTINCT aff.package_purl
FROM vuln.advisory_affected aff
JOIN vuln.advisories adv ON adv.id = aff.advisory_id
WHERE aff.package_purl IS NOT NULL
AND (adv.primary_vuln_id = @advisory_id OR adv.advisory_key = @advisory_id);
""";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("advisory_id", advisoryId);
var purls = new List<string>();
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var purl = reader.GetString(0);
if (!string.IsNullOrWhiteSpace(purl))
{
purls.Add(purl);
}
}
return purls;
}
private async Task UpdateArtifactCountsForPurlsAsync(
IReadOnlyCollection<string> purls,
CancellationToken cancellationToken)
{
var normalized = NormalizePurls(purls);
if (normalized.Count == 0)
{
return;
}
await using var connection = await _dataSource
.OpenConnectionAsync(cancellationToken)
.ConfigureAwait(false);
if (connection is null)
{
return;
}
const string sql = """
WITH target_artifacts AS (
SELECT DISTINCT ac.artifact_id
FROM analytics.artifact_components ac
JOIN analytics.components c ON c.component_id = ac.component_id
WHERE c.purl = ANY(@purls)
),
counts AS (
SELECT
ac.artifact_id,
COUNT(DISTINCT cv.vuln_id) FILTER (WHERE cv.affects = TRUE) AS total,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'critical' THEN cv.vuln_id END) AS critical,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'high' THEN cv.vuln_id END) AS high,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'medium' THEN cv.vuln_id END) AS medium,
COUNT(DISTINCT CASE WHEN cv.affects = TRUE AND cv.severity = 'low' THEN cv.vuln_id END) AS low
FROM analytics.artifact_components ac
JOIN analytics.component_vulns cv ON cv.component_id = ac.component_id
WHERE ac.artifact_id IN (SELECT artifact_id FROM target_artifacts)
GROUP BY ac.artifact_id
)
UPDATE analytics.artifacts a
SET
vulnerability_count = COALESCE(c.total, 0),
critical_count = COALESCE(c.critical, 0),
high_count = COALESCE(c.high, 0),
medium_count = COALESCE(c.medium, 0),
low_count = COALESCE(c.low, 0),
updated_at = now()
FROM target_artifacts t
LEFT JOIN counts c ON c.artifact_id = t.artifact_id
WHERE a.artifact_id = t.artifact_id;
""";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("purls", normalized);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task UpsertComponentVulnAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
Guid componentId,
VulnerabilityMatch match,
bool affects,
string? affectedVersions,
string? fixedVersion,
bool fixAvailable,
CancellationToken cancellationToken)
{
const string sql = """
INSERT INTO analytics.component_vulns (
component_id,
vuln_id,
source,
severity,
cvss_score,
cvss_vector,
epss_score,
kev_listed,
affects,
affected_versions,
fixed_version,
fix_available,
introduced_via,
published_at
)
VALUES (
@component_id,
@vuln_id,
@source,
@severity,
@cvss_score,
@cvss_vector,
@epss_score,
@kev_listed,
@affects,
@affected_versions,
@fixed_version,
@fix_available,
@introduced_via,
@published_at
)
ON CONFLICT (component_id, vuln_id) DO UPDATE SET
source = EXCLUDED.source,
severity = EXCLUDED.severity,
cvss_score = EXCLUDED.cvss_score,
cvss_vector = EXCLUDED.cvss_vector,
epss_score = EXCLUDED.epss_score,
kev_listed = EXCLUDED.kev_listed,
affects = EXCLUDED.affects,
affected_versions = EXCLUDED.affected_versions,
fixed_version = COALESCE(EXCLUDED.fixed_version, analytics.component_vulns.fixed_version),
fix_available = EXCLUDED.fix_available,
introduced_via = COALESCE(EXCLUDED.introduced_via, analytics.component_vulns.introduced_via),
published_at = COALESCE(EXCLUDED.published_at, analytics.component_vulns.published_at),
updated_at = now();
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("component_id", componentId);
command.Parameters.AddWithValue("vuln_id", match.VulnId);
command.Parameters.AddWithValue("source", match.Source);
command.Parameters.AddWithValue("severity", match.Severity);
command.Parameters.AddWithValue("cvss_score", (object?)match.CvssScore ?? DBNull.Value);
command.Parameters.AddWithValue("cvss_vector", (object?)match.CvssVector ?? DBNull.Value);
command.Parameters.AddWithValue("epss_score", (object?)match.EpssScore ?? DBNull.Value);
command.Parameters.AddWithValue("kev_listed", match.KevListed);
command.Parameters.AddWithValue("affects", affects);
command.Parameters.AddWithValue("affected_versions", (object?)affectedVersions ?? DBNull.Value);
command.Parameters.AddWithValue("fixed_version", (object?)fixedVersion ?? DBNull.Value);
command.Parameters.AddWithValue("fix_available", fixAvailable);
command.Parameters.AddWithValue("introduced_via", DBNull.Value);
command.Parameters.AddWithValue("published_at", (object?)match.PublishedAt ?? DBNull.Value);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private sealed record ComponentSnapshot(Guid ComponentId, string Purl, string? Version);
private sealed record VulnerabilityMatch(
string Purl,
string VulnId,
string Source,
string Severity,
decimal? CvssScore,
string? CvssVector,
decimal? EpssScore,
bool KevListed,
string? NormalizedVersionsJson,
DateTimeOffset? PublishedAt);
}