using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Concelier.SbomIntegration.Models;
using StellaOps.Concelier.SbomIntegration.Parsing;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Platform.Analytics.Models;
using StellaOps.Platform.Analytics.Options;
using StellaOps.Platform.Analytics.Utilities;
using StellaOps.Scanner.Surface.FS;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace StellaOps.Platform.Analytics.Services;

/// <summary>
/// Background service that subscribes to the scanner event stream and, for every
/// "scanner report ready" event, loads the referenced SBOM from content-addressed
/// storage, parses it and upserts the artifact, its raw SBOM record and its
/// components into the <c>analytics</c> schema.
/// </summary>
/// <remarks>
/// NOTE(review): the reviewed text of this file had its generic type arguments
/// stripped. Most were unambiguous from usage; the following were inferred and
/// should be confirmed against the referenced assemblies:
/// <c>ILogger&lt;AnalyticsIngestionService&gt;</c>,
/// <c>IEventStream&lt;OrchestratorEventEnvelope&gt;</c> /
/// <c>IEventStreamFactory.Create&lt;OrchestratorEventEnvelope&gt;</c>,
/// <c>SurfaceManifestDocument</c> (type of <c>SurfacePointersPayload.Manifest</c>) and
/// <c>IReadOnlyList&lt;string&gt;</c> as the dependency-map value type.
/// </remarks>
public sealed class AnalyticsIngestionService : BackgroundService
{
    private readonly AnalyticsIngestionOptions _options;
    private readonly AnalyticsIngestionDataSource _dataSource;
    private readonly ICasContentReader _casReader;
    private readonly IParsedSbomParser _sbomParser;
    private readonly IVulnerabilityCorrelationService? _correlationService;
    private readonly ILogger<AnalyticsIngestionService> _logger;
    private readonly IEventStream<OrchestratorEventEnvelope>? _eventStream;

    private readonly JsonSerializerOptions _jsonOptions = new()
    {
        PropertyNameCaseInsensitive = true
    };

    /// <summary>
    /// Creates the ingestion service.
    /// </summary>
    /// <param name="options">Ingestion options; a default instance is used when null. The options are normalized in place.</param>
    /// <param name="dataSource">Source of database connections.</param>
    /// <param name="casReader">Reader used to fetch manifest and SBOM content by URI.</param>
    /// <param name="sbomParser">Parser that turns SBOM bytes into a <see cref="ParsedSbom"/>.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="eventStreamFactory">
    /// Optional factory for the scanner event stream. When it is null, or no scanner stream
    /// name is configured, no stream is created and <see cref="ExecuteAsync"/> exits immediately.
    /// </param>
    /// <param name="correlationService">Optional service invoked after each successful ingestion.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="dataSource"/>, <paramref name="casReader"/>, <paramref name="sbomParser"/>
    /// or <paramref name="logger"/> is null.
    /// </exception>
    public AnalyticsIngestionService(
        IOptions<AnalyticsIngestionOptions> options,
        AnalyticsIngestionDataSource dataSource,
        ICasContentReader casReader,
        IParsedSbomParser sbomParser,
        ILogger<AnalyticsIngestionService> logger,
        IEventStreamFactory? eventStreamFactory = null,
        IVulnerabilityCorrelationService? correlationService = null)
    {
        _options = options?.Value ?? new AnalyticsIngestionOptions();
        _options.Normalize();
        _dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
        _casReader = casReader ?? throw new ArgumentNullException(nameof(casReader));
        _sbomParser = sbomParser ?? throw new ArgumentNullException(nameof(sbomParser));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _correlationService = correlationService;

        if (eventStreamFactory is not null && !string.IsNullOrWhiteSpace(_options.Streams.ScannerStream))
        {
            _eventStream = eventStreamFactory.Create<OrchestratorEventEnvelope>(new EventStreamOptions
            {
                StreamName = _options.Streams.ScannerStream
            });
        }
    }

    /// <summary>
    /// Subscribes to the scanner event stream and handles events one at a time until
    /// <paramref name="stoppingToken"/> is cancelled. Returns immediately when ingestion
    /// is disabled or no event stream was created. Any failure other than cancellation
    /// is logged and rethrown.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Enabled)
        {
            _logger.LogInformation("Analytics ingestion disabled by configuration.");
            return;
        }

        if (_eventStream is null)
        {
            _logger.LogWarning("Analytics ingestion disabled: no event stream configured.");
            return;
        }

        var position = _options.Streams.StartFromBeginning
            ? StreamPosition.Beginning
            : StreamPosition.End;

        _logger.LogInformation(
            "Analytics ingestion started; subscribing to {StreamName} from {Position}.",
            _eventStream.StreamName,
            position.Value);

        try
        {
            await foreach (var streamEvent in _eventStream.SubscribeAsync(position, stoppingToken))
            {
                await HandleEventAsync(streamEvent.Event, stoppingToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Analytics ingestion stopped.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Analytics ingestion failed.");
            throw;
        }
    }

    /// <summary>
    /// Filters an incoming event (kind, tenant, payload presence), deserializes its
    /// payload and hands it to <see cref="IngestSbomAsync"/>. Events that fail a filter
    /// or whose payload cannot be parsed are logged and skipped.
    /// </summary>
    private async Task HandleEventAsync(OrchestratorEventEnvelope envelope, CancellationToken cancellationToken)
    {
        if (!string.Equals(envelope.Kind, OrchestratorEventKinds.ScannerReportReady, StringComparison.OrdinalIgnoreCase))
        {
            return;
        }

        if (!IsTenantAllowed(envelope.Tenant))
        {
            _logger.LogDebug(
                "Skipping scanner event {EventId}; tenant {Tenant} not allowed.",
                envelope.EventId,
                envelope.Tenant);
            return;
        }

        if (envelope.Payload is null || envelope.Payload.Value.ValueKind == JsonValueKind.Undefined)
        {
            _logger.LogWarning("Scanner report event {EventId} missing payload.", envelope.EventId);
            return;
        }

        ReportReadyEventPayload? payload;
        try
        {
            payload = envelope.Payload.Value.Deserialize<ReportReadyEventPayload>(_jsonOptions);
        }
        catch (JsonException ex)
        {
            _logger.LogWarning(ex, "Failed to parse scanner report payload for event {EventId}.", envelope.EventId);
            return;
        }

        if (payload is null)
        {
            _logger.LogWarning("Scanner report payload empty for event {EventId}.", envelope.EventId);
            return;
        }

        await IngestSbomAsync(envelope, payload, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Resolves the SBOM referenced by a report, parses it and writes the artifact,
    /// raw SBOM record, components and artifact/component links in one transaction.
    /// When a correlation service is configured it is invoked after the commit.
    /// Missing manifest, SBOM artifact, SBOM content, artifact digest or database
    /// connection each log a warning and abort the ingestion of this report.
    /// </summary>
    private async Task IngestSbomAsync(
        OrchestratorEventEnvelope envelope,
        ReportReadyEventPayload payload,
        CancellationToken cancellationToken)
    {
        // The payload comes from JSON, so the report section may be absent.
        var surface = payload.Report?.Surface;
        var manifest = await ResolveManifestAsync(surface, cancellationToken).ConfigureAwait(false);
        if (manifest is null)
        {
            _logger.LogWarning("Scanner report {ReportId} missing surface manifest.", payload.ReportId);
            return;
        }

        var sbomArtifact = SelectSbomArtifact(manifest.Artifacts);
        if (sbomArtifact is null)
        {
            _logger.LogWarning("Scanner report {ReportId} contains no SBOM artifacts.", payload.ReportId);
            return;
        }

        var sbomContent = await ReadContentAsync(sbomArtifact.Uri, cancellationToken).ConfigureAwait(false);
        if (sbomContent is null)
        {
            _logger.LogWarning("Failed to read SBOM content for report {ReportId}.", payload.ReportId);
            return;
        }

        var sbomFormat = ResolveSbomFormat(sbomArtifact);
        ParsedSbom parsedSbom;
        await using (var sbomStream = new MemoryStream(sbomContent.Bytes, writable: false))
        {
            parsedSbom = await _sbomParser.ParseAsync(sbomStream, sbomFormat, cancellationToken)
                .ConfigureAwait(false);
        }

        // NormalizeDigest returns string.Empty (never null) for missing input, so a plain
        // `??` chain would never reach the fallback; FirstNonEmpty skips empty values.
        var artifactDigest = FirstNonEmpty(
            NormalizeDigest(payload.ImageDigest),
            NormalizeDigest(envelope.Scope?.Digest));
        if (artifactDigest is null)
        {
            _logger.LogWarning("Scanner report {ReportId} missing artifact digest.", payload.ReportId);
            return;
        }

        var artifactName = ResolveArtifactName(envelope);
        var artifactVersion = ResolveArtifactVersion(envelope);

        // Prefer the digest declared in the manifest; otherwise use the digest computed
        // over the bytes actually read from storage.
        var sbomDigest = FirstNonEmpty(NormalizeDigest(sbomArtifact.Digest), sbomContent.Digest) ?? string.Empty;
        var storageUri = sbomArtifact.Uri;
        var contentSize = sbomArtifact.SizeBytes > 0 ? sbomArtifact.SizeBytes : sbomContent.Length;
        var formatLabel = NormalizeSbomFormat(parsedSbom.Format, sbomFormat);

        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
        if (connection is null)
        {
            _logger.LogWarning("Analytics ingestion skipped: database is not configured.");
            return;
        }

        var componentSeeds = BuildComponentSeeds(parsedSbom);
        var componentCount = componentSeeds.Count;

        // Disposing an uncommitted transaction (e.g. when an exception escapes) discards the writes.
        await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);

        var artifactId = await UpsertArtifactAsync(
            connection,
            transaction,
            artifactDigest,
            artifactName,
            artifactVersion,
            sbomDigest,
            formatLabel,
            parsedSbom.SpecVersion,
            componentCount,
            cancellationToken).ConfigureAwait(false);

        if (!string.IsNullOrWhiteSpace(sbomDigest))
        {
            await UpsertRawSbomAsync(
                connection,
                transaction,
                artifactId,
                sbomDigest,
                contentSize,
                storageUri,
                formatLabel,
                parsedSbom.SpecVersion,
                cancellationToken).ConfigureAwait(false);
        }

        // Seeds are already unique per (purl, hash); the cache guards against a second
        // upsert of the same component should that ever stop being true.
        var componentIds = new Dictionary<ComponentKey, Guid>();
        foreach (var seed in componentSeeds)
        {
            var key = new ComponentKey(seed.Purl, seed.HashSha256);
            if (!componentIds.TryGetValue(key, out var componentId))
            {
                componentId = await UpsertComponentAsync(
                    connection,
                    transaction,
                    seed,
                    cancellationToken).ConfigureAwait(false);
                componentIds[key] = componentId;
            }

            var inserted = await InsertArtifactComponentAsync(
                connection,
                transaction,
                artifactId,
                componentId,
                seed,
                cancellationToken).ConfigureAwait(false);

            // Only a newly created artifact/component link bumps the component counters.
            if (inserted)
            {
                await IncrementComponentCountsAsync(
                    connection,
                    transaction,
                    componentId,
                    cancellationToken).ConfigureAwait(false);
            }
        }

        await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);

        if (_correlationService is not null)
        {
            var purls = componentSeeds
                .Select(seed => seed.Purl)
                .Where(purl => !string.IsNullOrWhiteSpace(purl))
                .Distinct(StringComparer.OrdinalIgnoreCase)
                .ToArray();

            await _correlationService.CorrelateForPurlsAsync(purls, cancellationToken)
                .ConfigureAwait(false);
            await _correlationService.UpdateArtifactCountsAsync(artifactId, cancellationToken)
                .ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Returns the inline manifest when it lists at least one artifact; otherwise loads
    /// and deserializes the manifest from <c>ManifestUri</c>.
    /// </summary>
    /// <returns>
    /// The manifest, or null when <paramref name="surface"/> is null, no manifest URI is
    /// available, the content cannot be read, or it is not valid JSON.
    /// </returns>
    private async Task<SurfaceManifestDocument?> ResolveManifestAsync(
        SurfacePointersPayload? surface,
        CancellationToken cancellationToken)
    {
        if (surface is null)
        {
            return null;
        }

        // Null-safe: the inline manifest (or its artifact list) may be missing after deserialization.
        if (surface.Manifest?.Artifacts is { Count: > 0 })
        {
            return surface.Manifest;
        }

        if (string.IsNullOrWhiteSpace(surface.ManifestUri))
        {
            return null;
        }

        var manifestContent = await ReadContentAsync(surface.ManifestUri, cancellationToken).ConfigureAwait(false);
        if (manifestContent is null)
        {
            return null;
        }

        try
        {
            return JsonSerializer.Deserialize<SurfaceManifestDocument>(manifestContent.Bytes, _jsonOptions);
        }
        catch (JsonException ex)
        {
            _logger.LogWarning(ex, "Failed to deserialize surface manifest from {ManifestUri}.", surface.ManifestUri);
            return null;
        }
    }

    /// <summary>
    /// Picks the SBOM artifact to ingest. Preference order: kind <c>sbom-inventory</c>,
    /// view <c>inventory</c>, kind <c>sbom-usage</c>, view <c>usage</c>, any kind containing
    /// <c>sbom</c>, then any media type containing <c>cyclonedx</c> or <c>spdx</c>.
    /// All comparisons ignore case.
    /// </summary>
    /// <returns>The first matching artifact, or null when the list is null/empty or nothing matches.</returns>
    internal static SurfaceManifestArtifact? SelectSbomArtifact(IReadOnlyList<SurfaceManifestArtifact> artifacts)
    {
        if (artifacts is null || artifacts.Count == 0)
        {
            return null;
        }

        SurfaceManifestArtifact? Find(Func<SurfaceManifestArtifact, bool> predicate)
            => artifacts.FirstOrDefault(predicate);

        // Kind/MediaType originate from JSON and may be null; `?.` keeps the probes from throwing.
        return Find(a => string.Equals(a.Kind, "sbom-inventory", StringComparison.OrdinalIgnoreCase))
            ?? Find(a => string.Equals(a.View, "inventory", StringComparison.OrdinalIgnoreCase))
            ?? Find(a => string.Equals(a.Kind, "sbom-usage", StringComparison.OrdinalIgnoreCase))
            ?? Find(a => string.Equals(a.View, "usage", StringComparison.OrdinalIgnoreCase))
            ?? Find(a => a.Kind?.Contains("sbom", StringComparison.OrdinalIgnoreCase) == true)
            ?? Find(a => a.MediaType?.Contains("cyclonedx", StringComparison.OrdinalIgnoreCase) == true)
            ?? Find(a => a.MediaType?.Contains("spdx", StringComparison.OrdinalIgnoreCase) == true);
    }

    /// <summary>
    /// Reads the full content behind <paramref name="uri"/> into memory and computes its digest.
    /// </summary>
    /// <returns>The content, or null when the URI is blank or the reader returns nothing.</returns>
    private async Task<ContentPayload?> ReadContentAsync(string uri, CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(uri))
        {
            return null;
        }

        var casContent = await _casReader.OpenReadAsync(uri, cancellationToken).ConfigureAwait(false);
        if (casContent is null)
        {
            return null;
        }

        await using var stream = casContent.Stream;
        using var buffer = new MemoryStream();
        await stream.CopyToAsync(buffer, cancellationToken).ConfigureAwait(false);

        var bytes = buffer.ToArray();
        var digest = Sha256Hasher.Compute(bytes);
        return new ContentPayload(bytes, casContent.Length ?? bytes.Length, digest);
    }

    /// <summary>
    /// Infers the SBOM format from the artifact's <c>Format</c>, falling back to its
    /// <c>MediaType</c>. Defaults to CycloneDX when neither mentions SPDX.
    /// </summary>
    internal static SbomFormat ResolveSbomFormat(SurfaceManifestArtifact artifact)
    {
        var format = artifact.Format ?? string.Empty;
        if (format.Contains("spdx", StringComparison.OrdinalIgnoreCase))
        {
            return SbomFormat.SPDX;
        }

        if (format.Contains("cdx", StringComparison.OrdinalIgnoreCase)
            || format.Contains("cyclonedx", StringComparison.OrdinalIgnoreCase))
        {
            return SbomFormat.CycloneDX;
        }

        var media = artifact.MediaType ?? string.Empty;
        return media.Contains("spdx", StringComparison.OrdinalIgnoreCase)
            ? SbomFormat.SPDX
            : SbomFormat.CycloneDX;
    }

    /// <summary>
    /// Maps the parser-reported format to the stored label (<c>spdx</c> or <c>cyclonedx</c>),
    /// using <paramref name="fallback"/> when the parsed value is neither.
    /// </summary>
    internal static string NormalizeSbomFormat(string parsedFormat, SbomFormat fallback)
    {
        if (string.Equals(parsedFormat, "spdx", StringComparison.OrdinalIgnoreCase))
        {
            return "spdx";
        }

        if (string.Equals(parsedFormat, "cyclonedx", StringComparison.OrdinalIgnoreCase))
        {
            return "cyclonedx";
        }

        return fallback == SbomFormat.SPDX ? "spdx" : "cyclonedx";
    }

    /// <summary>
    /// Normalizes a digest to lower-case <c>sha256:&lt;value&gt;</c> form, adding the prefix when absent.
    /// </summary>
    /// <returns>
    /// The normalized digest, or <see cref="string.Empty"/> (not null) when the input is
    /// null or whitespace. Callers needing a fallback must test for emptiness; <c>??</c>
    /// alone never triggers on this method's result.
    /// </returns>
    internal static string NormalizeDigest(string? digest)
    {
        if (string.IsNullOrWhiteSpace(digest))
        {
            return string.Empty;
        }

        var trimmed = digest.Trim();
        return trimmed.StartsWith("sha256:", StringComparison.OrdinalIgnoreCase)
            ? $"sha256:{trimmed[7..].ToLowerInvariant()}"
            : $"sha256:{trimmed.ToLowerInvariant()}";
    }

    /// <summary>
    /// Returns the artifact name from the event scope: the first non-blank of repo,
    /// image and component, or <c>"unknown"</c> when none is set.
    /// </summary>
    internal static string ResolveArtifactName(OrchestratorEventEnvelope envelope)
    {
        var scope = envelope.Scope;
        return FirstNonEmpty(scope?.Repo, scope?.Image, scope?.Component) ?? "unknown";
    }

    /// <summary>
    /// Extracts the tag from the scope's image reference.
    /// </summary>
    /// <returns>
    /// The tag, or null when the image is blank or carries no tag. A trailing
    /// <c>@digest</c> is ignored, and a colon that precedes the last <c>/</c> is treated
    /// as a registry port rather than a tag separator.
    /// </returns>
    internal static string? ResolveArtifactVersion(OrchestratorEventEnvelope envelope)
    {
        var image = envelope.Scope?.Image;
        if (string.IsNullOrWhiteSpace(image))
        {
            return null;
        }

        // "repo:tag@sha256:..." -> "repo:tag": the digest's own colon must not be read as a tag.
        var digestIndex = image.IndexOf('@');
        var reference = digestIndex >= 0 ? image[..digestIndex] : image;

        var tagIndex = reference.LastIndexOf(':');
        var lastSlash = reference.LastIndexOf('/');

        // "registry:5000/repo" has a colon but no tag: the colon belongs to host:port.
        if (tagIndex <= 0 || tagIndex < lastSlash || tagIndex >= reference.Length - 1)
        {
            return null;
        }

        return reference[(tagIndex + 1)..];
    }

    /// <summary>
    /// Converts parsed SBOM components into insertable seeds (normalized purl, hash,
    /// license expression, supplier, dependency path) and de-duplicates them by
    /// (purl, hash), keeping the shallowest occurrence and breaking ties by bom-ref.
    /// </summary>
    private List<ComponentSeed> BuildComponentSeeds(ParsedSbom sbom)
    {
        var dependencyMap = BuildDependencyMap(sbom);
        var paths = BuildDependencyPaths(sbom, dependencyMap);
        var seeds = new List<ComponentSeed>();

        foreach (var component in sbom.Components)
        {
            var purl = !string.IsNullOrWhiteSpace(component.Purl)
                ? PurlParser.Parse(component.Purl).Normalized
                : PurlParser.BuildGeneric(component.Name, component.Version);

            var hash = ResolveComponentHash(component, purl);
            var licenseExpression = LicenseExpressionRenderer.BuildExpression(component.Licenses);
            var supplier = component.Supplier?.Name
                ?? component.Publisher
                ?? sbom.Metadata.Supplier
                ?? sbom.Metadata.Manufacturer;

            paths.TryGetValue(component.BomRef, out var dependencyPath);

            // The path includes the root, so depth is path length minus one; unreachable components get 0.
            var depth = dependencyPath?.Length > 0 ? dependencyPath.Length - 1 : 0;

            // The direct parent is the second-to-last element of the path.
            var introducedVia = dependencyPath is { Length: > 1 } ? dependencyPath[^2] : null;

            seeds.Add(new ComponentSeed(
                component.BomRef,
                purl,
                hash,
                component.Name,
                component.Version,
                MapComponentType(component.Type),
                supplier,
                licenseExpression, // declared
                licenseExpression, // concluded: the same rendered expression is stored for both
                component.Description,
                component.Cpe,
                MapScope(component.Scope),
                dependencyPath,
                depth,
                introducedVia));
        }

        return seeds
            .GroupBy(seed => new ComponentKey(seed.Purl, seed.HashSha256))
            .Select(group => group
                .OrderBy(seed => seed.Depth)
                .ThenBy(seed => seed.BomRef, StringComparer.Ordinal)
                .First())
            .ToList();
    }

    /// <summary>
    /// Builds a map from each dependency source ref to its distinct, ordinally sorted,
    /// non-blank targets. Sources with a blank ref or no targets are omitted; a later
    /// entry for the same source replaces an earlier one.
    /// </summary>
    internal static Dictionary<string, IReadOnlyList<string>> BuildDependencyMap(ParsedSbom sbom)
    {
        var map = new Dictionary<string, IReadOnlyList<string>>(StringComparer.Ordinal);

        foreach (var dependency in sbom.Dependencies)
        {
            if (string.IsNullOrWhiteSpace(dependency.SourceRef))
            {
                continue;
            }

            var list = dependency.DependsOn
                .Where(value => !string.IsNullOrWhiteSpace(value))
                .Distinct(StringComparer.Ordinal)
                .OrderBy(value => value, StringComparer.Ordinal)
                .ToArray();

            if (list.Length > 0)
            {
                map[dependency.SourceRef] = list;
            }
        }

        return map;
    }

    /// <summary>
    /// Walks the dependency graph breadth-first from the SBOM's root component and
    /// records, for every reachable ref, the first path found from the root to it
    /// (root first, the ref itself last).
    /// </summary>
    /// <returns>Paths keyed by ref; empty when the SBOM declares no root component.</returns>
    internal static Dictionary<string, string[]> BuildDependencyPaths(
        ParsedSbom sbom,
        Dictionary<string, IReadOnlyList<string>> dependencyMap)
    {
        var paths = new Dictionary<string, string[]>(StringComparer.Ordinal);
        var root = sbom.Metadata.RootComponentRef;
        if (string.IsNullOrWhiteSpace(root))
        {
            return paths;
        }

        var queue = new Queue<string>();
        paths[root] = new[] { root };
        queue.Enqueue(root);

        while (queue.Count > 0)
        {
            var current = queue.Dequeue();
            if (!dependencyMap.TryGetValue(current, out var children))
            {
                continue;
            }

            var parentPath = paths[current];
            foreach (var child in children)
            {
                // Already visited: keep the earlier path (this also breaks cycles).
                if (paths.ContainsKey(child))
                {
                    continue;
                }

                var childPath = new string[parentPath.Length + 1];
                Array.Copy(parentPath, childPath, parentPath.Length);
                childPath[^1] = child;

                paths[child] = childPath;
                queue.Enqueue(child);
            }
        }

        return paths;
    }

    /// <summary>
    /// Returns the component's declared SHA-256 hash in normalized form, or a hash
    /// computed over <paramref name="purl"/> when the component declares none.
    /// </summary>
    internal static string ResolveComponentHash(ParsedComponent component, string purl)
    {
        var hash = component.Hashes
            .FirstOrDefault(h =>
                string.Equals(h.Algorithm, "sha-256", StringComparison.OrdinalIgnoreCase)
                || string.Equals(h.Algorithm, "sha256", StringComparison.OrdinalIgnoreCase))
            ?.Value;

        return !string.IsNullOrWhiteSpace(hash)
            ? NormalizeDigest(hash)
            : Sha256Hasher.Compute(purl);
    }

    /// <summary>
    /// Maps an SBOM component type to the stored label. Blank or unrecognized values map to <c>library</c>.
    /// </summary>
    internal static string MapComponentType(string? type)
    {
        if (string.IsNullOrWhiteSpace(type))
        {
            return "library";
        }

        var normalized = type.Trim().ToLowerInvariant();
        return normalized switch
        {
            "application" => "application",
            "container" => "container",
            "framework" => "framework",
            "operating-system" => "operating-system",
            "operating system" => "operating-system",
            "os" => "operating-system",
            "device" => "device",
            "firmware" => "firmware",
            "file" => "file",
            _ => "library"
        };
    }

    /// <summary>
    /// Maps a component scope to the stored label; any value without an explicit mapping becomes <c>required</c>.
    /// </summary>
    internal static string MapScope(ComponentScope scope)
    {
        return scope switch
        {
            ComponentScope.Optional => "optional",
            ComponentScope.Excluded => "excluded",
            ComponentScope.Unknown => "unknown",
            _ => "required"
        };
    }

    /// <summary>
    /// Inserts the artifact row keyed by digest, or updates the existing row's name,
    /// version (kept when the new one is null), SBOM fields and component count.
    /// </summary>
    /// <returns>The artifact id, or <see cref="Guid.Empty"/> when the statement returned no id.</returns>
    private async Task<Guid> UpsertArtifactAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction transaction,
        string digest,
        string name,
        string? version,
        string sbomDigest,
        string sbomFormat,
        string sbomSpecVersion,
        int componentCount,
        CancellationToken cancellationToken)
    {
        const string sql = """
            INSERT INTO analytics.artifacts (
                artifact_type, name, version, digest, sbom_digest, sbom_format,
                sbom_spec_version, component_count, vulnerability_count,
                critical_count, high_count, medium_count, low_count, updated_at
            )
            VALUES (
                @artifact_type, @name, @version, @digest, @sbom_digest, @sbom_format,
                @sbom_spec_version, @component_count, 0,
                0, 0, 0, 0, now()
            )
            ON CONFLICT (digest) DO UPDATE SET
                name = EXCLUDED.name,
                version = COALESCE(EXCLUDED.version, analytics.artifacts.version),
                sbom_digest = EXCLUDED.sbom_digest,
                sbom_format = EXCLUDED.sbom_format,
                sbom_spec_version = EXCLUDED.sbom_spec_version,
                component_count = EXCLUDED.component_count,
                updated_at = now()
            RETURNING artifact_id;
            """;

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        command.Parameters.AddWithValue("artifact_type", "container");
        command.Parameters.AddWithValue("name", name);
        command.Parameters.AddWithValue("version", (object?)version ?? DBNull.Value);
        command.Parameters.AddWithValue("digest", digest);
        command.Parameters.AddWithValue("sbom_digest", sbomDigest);
        command.Parameters.AddWithValue("sbom_format", sbomFormat);
        command.Parameters.AddWithValue("sbom_spec_version", sbomSpecVersion);
        command.Parameters.AddWithValue("component_count", componentCount);

        var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
        return result is Guid id ? id : Guid.Empty;
    }

    /// <summary>
    /// Records the raw SBOM's metadata (hash, size, storage URI, versions). Does nothing
    /// when a row with the same content hash already exists.
    /// </summary>
    private async Task UpsertRawSbomAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction transaction,
        Guid artifactId,
        string contentHash,
        long contentSize,
        string storageUri,
        string format,
        string specVersion,
        CancellationToken cancellationToken)
    {
        const string sql = """
            INSERT INTO analytics.raw_sboms (
                artifact_id, format, spec_version, content_hash, content_size,
                storage_uri, ingest_version, schema_version
            )
            VALUES (
                @artifact_id, @format, @spec_version, @content_hash, @content_size,
                @storage_uri, @ingest_version, @schema_version
            )
            ON CONFLICT (content_hash) DO NOTHING;
            """;

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        command.Parameters.AddWithValue("artifact_id", artifactId);
        command.Parameters.AddWithValue("format", format);
        command.Parameters.AddWithValue("spec_version", specVersion);
        command.Parameters.AddWithValue("content_hash", contentHash);
        command.Parameters.AddWithValue("content_size", contentSize);
        command.Parameters.AddWithValue("storage_uri", storageUri);
        command.Parameters.AddWithValue("ingest_version", _options.IngestVersion);
        command.Parameters.AddWithValue("schema_version", _options.SchemaVersion);

        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Inserts the component keyed by (purl, hash), or refreshes an existing row's
    /// timestamps and fills in descriptive fields from the seed; existing values are
    /// kept wherever the seed supplies null.
    /// </summary>
    /// <returns>The component id, or <see cref="Guid.Empty"/> when the statement returned no id.</returns>
    private async Task<Guid> UpsertComponentAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction transaction,
        ComponentSeed seed,
        CancellationToken cancellationToken)
    {
        const string sql = """
            INSERT INTO analytics.components (
                purl, purl_type, purl_namespace, purl_name, purl_version, hash_sha256,
                name, version, description, component_type, supplier, supplier_normalized,
                license_declared, license_concluded, license_category, cpe
            )
            SELECT
                @purl, parsed.purl_type, parsed.purl_namespace, parsed.purl_name, parsed.purl_version, @hash_sha256,
                @name, @version, @description, @component_type, @supplier, analytics.normalize_supplier(@supplier),
                @license_declared, @license_concluded, analytics.categorize_license(@license_concluded), @cpe
            FROM analytics.parse_purl(@purl) AS parsed
            ON CONFLICT (purl, hash_sha256) DO UPDATE SET
                last_seen_at = now(),
                updated_at = now(),
                supplier = COALESCE(EXCLUDED.supplier, analytics.components.supplier),
                supplier_normalized = COALESCE(EXCLUDED.supplier_normalized, analytics.components.supplier_normalized),
                license_declared = COALESCE(EXCLUDED.license_declared, analytics.components.license_declared),
                license_concluded = COALESCE(EXCLUDED.license_concluded, analytics.components.license_concluded),
                license_category = COALESCE(EXCLUDED.license_category, analytics.components.license_category),
                description = COALESCE(EXCLUDED.description, analytics.components.description),
                cpe = COALESCE(EXCLUDED.cpe, analytics.components.cpe),
                component_type = COALESCE(EXCLUDED.component_type, analytics.components.component_type),
                name = COALESCE(EXCLUDED.name, analytics.components.name),
                version = COALESCE(EXCLUDED.version, analytics.components.version)
            RETURNING component_id;
            """;

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        command.Parameters.AddWithValue("purl", seed.Purl);
        command.Parameters.AddWithValue("hash_sha256", seed.HashSha256);
        command.Parameters.AddWithValue("name", seed.Name);
        command.Parameters.AddWithValue("version", (object?)seed.Version ?? DBNull.Value);
        command.Parameters.AddWithValue("description", (object?)seed.Description ?? DBNull.Value);
        command.Parameters.AddWithValue("component_type", seed.ComponentType);
        command.Parameters.AddWithValue("supplier", (object?)seed.Supplier ?? DBNull.Value);
        command.Parameters.AddWithValue("license_declared", (object?)seed.LicenseDeclared ?? DBNull.Value);
        command.Parameters.AddWithValue("license_concluded", (object?)seed.LicenseConcluded ?? DBNull.Value);
        command.Parameters.AddWithValue("cpe", (object?)seed.Cpe ?? DBNull.Value);

        var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
        return result is Guid id ? id : Guid.Empty;
    }

    /// <summary>
    /// Links a component to an artifact, storing its bom-ref, scope and dependency path.
    /// </summary>
    /// <returns>True when a new link row was inserted; false when the link already existed.</returns>
    private async Task<bool> InsertArtifactComponentAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction transaction,
        Guid artifactId,
        Guid componentId,
        ComponentSeed seed,
        CancellationToken cancellationToken)
    {
        const string sql = """
            INSERT INTO analytics.artifact_components (
                artifact_id, component_id, bom_ref, scope, dependency_path, depth, introduced_via
            )
            VALUES (
                @artifact_id, @component_id, @bom_ref, @scope, @dependency_path, @depth, @introduced_via
            )
            ON CONFLICT (artifact_id, component_id) DO NOTHING;
            """;

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        command.Parameters.AddWithValue("artifact_id", artifactId);
        command.Parameters.AddWithValue("component_id", componentId);
        command.Parameters.AddWithValue("bom_ref", seed.BomRef);
        command.Parameters.AddWithValue("scope", (object?)seed.Scope ?? DBNull.Value);
        command.Parameters.AddWithValue("depth", seed.Depth);
        command.Parameters.AddWithValue("introduced_via", (object?)seed.IntroducedVia ?? DBNull.Value);

        // The array type is declared explicitly so a DBNull value is still sent as text[].
        var pathParameter = new NpgsqlParameter("dependency_path", NpgsqlDbType.Array | NpgsqlDbType.Text)
        {
            Value = (object?)seed.DependencyPath ?? DBNull.Value
        };
        command.Parameters.Add(pathParameter);

        var rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
        return rows > 0;
    }

    /// <summary>
    /// Increments the component's artifact and SBOM counters by one and refreshes its timestamps.
    /// </summary>
    private static async Task IncrementComponentCountsAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction transaction,
        Guid componentId,
        CancellationToken cancellationToken)
    {
        const string sql = """
            UPDATE analytics.components
            SET artifact_count = artifact_count + 1,
                sbom_count = sbom_count + 1,
                last_seen_at = now(),
                updated_at = now()
            WHERE component_id = @component_id;
            """;

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        command.Parameters.AddWithValue("component_id", componentId);
        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Checks the tenant against the configured allow-list.
    /// </summary>
    private bool IsTenantAllowed(string tenant)
    {
        return TenantNormalizer.IsAllowed(tenant, _options.AllowedTenants);
    }

    /// <summary>
    /// Returns the first candidate that is neither null nor whitespace, or null when there is none.
    /// </summary>
    private static string? FirstNonEmpty(params string?[] candidates)
    {
        foreach (var candidate in candidates)
        {
            if (!string.IsNullOrWhiteSpace(candidate))
            {
                return candidate;
            }
        }

        return null;
    }

    /// <summary>Content read from storage: raw bytes, reported (or actual) length, and computed digest.</summary>
    private sealed record ContentPayload(byte[] Bytes, long Length, string Digest);

    /// <summary>Flattened component data ready to be written to the analytics tables.</summary>
    private sealed record ComponentSeed(
        string BomRef,
        string Purl,
        string HashSha256,
        string Name,
        string? Version,
        string ComponentType,
        string? Supplier,
        string? LicenseDeclared,
        string? LicenseConcluded,
        string? Description,
        string? Cpe,
        string? Scope,
        string[]? DependencyPath,
        int Depth,
        string? IntroducedVia);

    /// <summary>Identity of a component: normalized purl plus SHA-256 hash.</summary>
    private sealed record ComponentKey(string Purl, string HashSha256);
}