using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.Signals.Models;
using StellaOps.Signals.Persistence;

namespace StellaOps.Signals.Services;

/// <summary>
/// Ingests runtime observation events for a subject: validates the request, merges the events
/// into the subject's stored <see cref="ReachabilityFactDocument"/>, persists it, refreshes the
/// cache, publishes a fact-updated event, and triggers a reachability recompute.
/// </summary>
public sealed class RuntimeFactsIngestionService : IRuntimeFactsIngestionService
{
    private readonly IReachabilityFactRepository factRepository;
    private readonly TimeProvider timeProvider;
    private readonly IReachabilityCache cache;
    private readonly IEventsPublisher eventsPublisher;
    private readonly IReachabilityScoringService scoringService;
    private readonly ILogger<RuntimeFactsIngestionService> logger;

    /// <exception cref="ArgumentNullException">Any dependency other than <paramref name="logger"/> is null.</exception>
    public RuntimeFactsIngestionService(
        IReachabilityFactRepository factRepository,
        TimeProvider timeProvider,
        IReachabilityCache cache,
        IEventsPublisher eventsPublisher,
        IReachabilityScoringService scoringService,
        ILogger<RuntimeFactsIngestionService> logger)
    {
        this.factRepository = factRepository ?? throw new ArgumentNullException(nameof(factRepository));
        this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        this.cache = cache ?? throw new ArgumentNullException(nameof(cache));
        this.eventsPublisher = eventsPublisher ?? throw new ArgumentNullException(nameof(eventsPublisher));
        this.scoringService = scoringService ?? throw new ArgumentNullException(nameof(scoringService));
        // Logging is optional; fall back to a no-op logger.
        this.logger = logger ?? NullLogger<RuntimeFactsIngestionService>.Instance;
    }

    /// <summary>
    /// Validates <paramref name="request"/>, merges its events into the subject's stored runtime
    /// facts and metadata, persists the result, and recomputes reachability.
    /// </summary>
    /// <param name="request">The ingestion request (subject, callgraph id, events, metadata).</param>
    /// <param name="cancellationToken">Flows to repository, cache, publisher and scoring calls.</param>
    /// <returns>A summary of the persisted fact document.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="RuntimeFactsValidationException">The request fails validation.</exception>
    public async Task<RuntimeFactsIngestResponse> IngestAsync(RuntimeFactsIngestRequest request, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);
        ValidateRequest(request);

        var subjectKey = request.Subject.ToSubjectKey();
        var existing = await factRepository.GetBySubjectAsync(subjectKey, cancellationToken).ConfigureAwait(false);

        var document = existing ?? new ReachabilityFactDocument
        {
            Subject = request.Subject,
            SubjectKey = subjectKey,
        };

        document.CallgraphId = request.CallgraphId;
        document.Subject = request.Subject;
        document.SubjectKey = subjectKey;
        document.ComputedAt = timeProvider.GetUtcNow();

        var aggregated = AggregateRuntimeFacts(request);
        document.RuntimeFacts = MergeRuntimeFacts(document.RuntimeFacts, aggregated);

        var metadata = MergeMetadata(document.Metadata, request.Metadata)
            ?? new Dictionary<string, string?>(StringComparer.Ordinal);
        // TryAdd: a previously recorded provenance source is kept; it is only set when absent.
        metadata.TryAdd("provenance.source", ResolveSource(request.Metadata));
        metadata["provenance.ingestedAt"] = document.ComputedAt.ToString("O");
        metadata["provenance.callgraphId"] = request.CallgraphId;
        document.Metadata = metadata;

        var persisted = await factRepository.UpsertAsync(document, cancellationToken).ConfigureAwait(false);
        await cache.SetAsync(persisted, cancellationToken).ConfigureAwait(false);
        await eventsPublisher.PublishFactUpdatedAsync(persisted, cancellationToken).ConfigureAwait(false);
        await RecomputeReachabilityAsync(persisted, aggregated, request, cancellationToken).ConfigureAwait(false);

        var factCount = persisted.RuntimeFacts?.Count ?? 0;
        logger.LogInformation(
            "Stored {RuntimeFactCount} runtime fact(s) for subject {SubjectKey} (callgraph={CallgraphId}).",
            factCount,
            subjectKey,
            request.CallgraphId);

        return new RuntimeFactsIngestResponse
        {
            FactId = persisted.Id,
            SubjectKey = subjectKey,
            CallgraphId = request.CallgraphId,
            RuntimeFactCount = factCount,
            TotalHitCount = SumHitCounts(persisted.RuntimeFacts),
            StoredAt = persisted.ComputedAt,
        };
    }

    /// <summary>Rejects requests lacking a resolvable subject, a callgraph id, or usable events.</summary>
    /// <exception cref="RuntimeFactsValidationException">The first failed rule.</exception>
    private static void ValidateRequest(RuntimeFactsIngestRequest request)
    {
        if (request.Subject is null)
        {
            throw new RuntimeFactsValidationException("Subject is required.");
        }

        if (string.IsNullOrWhiteSpace(request.Subject.ToSubjectKey()))
        {
            throw new RuntimeFactsValidationException("Subject must include either scanId, imageDigest, or component/version.");
        }

        if (string.IsNullOrWhiteSpace(request.CallgraphId))
        {
            throw new RuntimeFactsValidationException("CallgraphId is required.");
        }

        if (request.Events is null || request.Events.Count == 0)
        {
            throw new RuntimeFactsValidationException("At least one runtime event is required.");
        }

        // A null event cannot carry a symbolId either; reject it here instead of failing later with an NRE.
        if (request.Events.Any(e => e is null || string.IsNullOrWhiteSpace(e.SymbolId)))
        {
            throw new RuntimeFactsValidationException("Runtime events must include symbolId.");
        }
    }

    /// <summary>Returns the trimmed "source" request metadata value, or "runtime" when absent or blank.</summary>
    private static string ResolveSource(Dictionary<string, string?>? requestMetadata)
    {
        if (requestMetadata is not null && requestMetadata.TryGetValue("source", out var source))
        {
            return Normalize(source) ?? "runtime";
        }

        return "runtime";
    }

    /// <summary>
    /// Collapses the request's events into one fact per <see cref="RuntimeFactKey"/>. Hit counts are
    /// summed (each event counts at least once). Optional fields take the first non-blank value seen.
    /// Metadata keys are overwritten by later events.
    /// </summary>
    private static List<RuntimeFactDocument> AggregateRuntimeFacts(RuntimeFactsIngestRequest request)
    {
        var map = new Dictionary<RuntimeFactKey, RuntimeFactDocument>(RuntimeFactKeyComparer.Instance);
        if (request.Events is not { } events)
        {
            return new List<RuntimeFactDocument>();
        }

        foreach (var evt in events)
        {
            if (evt is null || string.IsNullOrWhiteSpace(evt.SymbolId))
            {
                continue;
            }

            var key = CreateKey(evt.SymbolId, evt.CodeId, evt.LoaderBase, evt.Purl, evt.SymbolDigest, evt.BuildId);
            if (map.TryGetValue(key, out var document))
            {
                document.ProcessId ??= evt.ProcessId;
                document.ProcessName ??= Normalize(evt.ProcessName);
                document.SocketAddress ??= Normalize(evt.SocketAddress);
                document.ContainerId ??= Normalize(evt.ContainerId);
                document.EvidenceUri ??= Normalize(evt.EvidenceUri);
                document.ObservedAt ??= evt.ObservedAt;
                document.Metadata = OverlayMetadata(document.Metadata, evt.Metadata);
            }
            else
            {
                document = new RuntimeFactDocument
                {
                    SymbolId = key.SymbolId,
                    CodeId = key.CodeId,
                    LoaderBase = key.LoaderBase,
                    Purl = key.Purl,
                    SymbolDigest = key.SymbolDigest,
                    BuildId = key.BuildId,
                    ProcessId = evt.ProcessId,
                    ProcessName = Normalize(evt.ProcessName),
                    SocketAddress = Normalize(evt.SocketAddress),
                    ContainerId = Normalize(evt.ContainerId),
                    EvidenceUri = Normalize(evt.EvidenceUri),
                    ObservedAt = evt.ObservedAt,
                    Metadata = evt.Metadata is null
                        ? null
                        : new Dictionary<string, string?>(evt.Metadata, StringComparer.Ordinal),
                };
                map[key] = document;
            }

            // Each event counts as at least one hit; saturate rather than wrap on overflow.
            document.HitCount = SaturatingAdd(document.HitCount, Math.Max(evt.HitCount, 1));
        }

        return map.Values.ToList();
    }

    /// <summary>
    /// Returns a new dictionary containing <paramref name="existing"/> overlaid with
    /// <paramref name="incoming"/> (incoming wins), or null when both inputs are null.
    /// </summary>
    private static Dictionary<string, string?>? MergeMetadata(
        Dictionary<string, string?>? existing,
        Dictionary<string, string?>? incoming)
    {
        if (existing is null && incoming is null)
        {
            return null;
        }

        var merged = existing is null
            ? new Dictionary<string, string?>(StringComparer.Ordinal)
            : new Dictionary<string, string?>(existing, StringComparer.Ordinal);

        if (incoming != null)
        {
            foreach (var (metaKey, metaValue) in incoming)
            {
                merged[metaKey] = metaValue;
            }
        }

        return merged;
    }

    /// <summary>
    /// Copies the entries of <paramref name="source"/> into <paramref name="target"/> (source wins),
    /// creating <paramref name="target"/> if needed. Returns <paramref name="target"/> unchanged when
    /// <paramref name="source"/> is null or empty.
    /// </summary>
    private static Dictionary<string, string?>? OverlayMetadata(
        Dictionary<string, string?>? target,
        Dictionary<string, string?>? source)
    {
        if (source is not { Count: > 0 })
        {
            return target;
        }

        target ??= new Dictionary<string, string?>(StringComparer.Ordinal);
        foreach (var (metaKey, metaValue) in source)
        {
            target[metaKey] = metaValue;
        }

        return target;
    }

    /// <summary>
    /// Merges <paramref name="incoming"/> facts into copies of <paramref name="existing"/> facts.
    /// Facts are keyed by their normalized <see cref="RuntimeFactKey"/>. The input documents are not mutated.
    /// The result is sorted ordinally by every key component so its order is deterministic.
    /// </summary>
    private static List<RuntimeFactDocument> MergeRuntimeFacts(
        List<RuntimeFactDocument>? existing,
        List<RuntimeFactDocument> incoming)
    {
        var map = new Dictionary<RuntimeFactKey, RuntimeFactDocument>(RuntimeFactKeyComparer.Instance);

        if (existing is { Count: > 0 })
        {
            foreach (var fact in existing)
            {
                if (fact is not null)
                {
                    AccumulateFact(map, fact);
                }
            }
        }

        foreach (var fact in incoming)
        {
            if (fact is not null)
            {
                AccumulateFact(map, fact);
            }
        }

        return map.Values
            .OrderBy(doc => doc.SymbolId, StringComparer.Ordinal)
            .ThenBy(doc => doc.CodeId, StringComparer.Ordinal)
            .ThenBy(doc => doc.LoaderBase, StringComparer.Ordinal)
            .ThenBy(doc => doc.Purl, StringComparer.Ordinal)
            .ThenBy(doc => doc.SymbolDigest, StringComparer.Ordinal)
            .ThenBy(doc => doc.BuildId, StringComparer.Ordinal)
            .ToList();
    }

    /// <summary>
    /// Adds a copy of <paramref name="fact"/> to <paramref name="map"/>, or folds it into the entry
    /// already present for the same key. Folding sums hit counts (saturating, minimum 1), fills null
    /// optional fields, and overlays metadata.
    /// </summary>
    private static void AccumulateFact(Dictionary<RuntimeFactKey, RuntimeFactDocument> map, RuntimeFactDocument fact)
    {
        var key = CreateKey(fact.SymbolId, fact.CodeId, fact.LoaderBase, fact.Purl, fact.SymbolDigest, fact.BuildId);
        if (!map.TryGetValue(key, out var current))
        {
            map[key] = CloneFact(fact, key);
            return;
        }

        current.HitCount = SaturatingAdd(current.HitCount, fact.HitCount);
        current.ProcessId ??= fact.ProcessId;
        current.ProcessName ??= fact.ProcessName;
        current.SocketAddress ??= fact.SocketAddress;
        current.ContainerId ??= fact.ContainerId;
        current.EvidenceUri ??= fact.EvidenceUri;
        current.ObservedAt ??= fact.ObservedAt;
        current.Metadata = OverlayMetadata(current.Metadata, fact.Metadata);
    }

    /// <summary>Copies <paramref name="fact"/>, taking its identity fields from the normalized <paramref name="key"/>.</summary>
    private static RuntimeFactDocument CloneFact(RuntimeFactDocument fact, RuntimeFactKey key) => new()
    {
        SymbolId = key.SymbolId,
        CodeId = key.CodeId,
        LoaderBase = key.LoaderBase,
        Purl = key.Purl,
        SymbolDigest = key.SymbolDigest,
        BuildId = key.BuildId,
        ProcessId = fact.ProcessId,
        ProcessName = fact.ProcessName,
        SocketAddress = fact.SocketAddress,
        ContainerId = fact.ContainerId,
        EvidenceUri = fact.EvidenceUri,
        HitCount = fact.HitCount,
        ObservedAt = fact.ObservedAt,
        Metadata = fact.Metadata is null ? null : new Dictionary<string, string?>(fact.Metadata, StringComparer.Ordinal),
    };

    /// <summary>
    /// Builds the grouping key. SymbolId is trimmed. Blank optional components become null, so
    /// whitespace-only and missing values group together.
    /// </summary>
    private static RuntimeFactKey CreateKey(
        string? symbolId,
        string? codeId,
        string? loaderBase,
        string? purl,
        string? symbolDigest,
        string? buildId)
        => new(
            symbolId?.Trim() ?? string.Empty,
            Normalize(codeId),
            Normalize(loaderBase),
            Normalize(purl),
            Normalize(symbolDigest),
            Normalize(buildId));

    /// <summary>Adds two hit counts in 64-bit space and clamps the result to [1, int.MaxValue].</summary>
    private static int SaturatingAdd(int current, int increment)
        => (int)Math.Clamp((long)current + increment, 1L, (long)int.MaxValue);

    /// <summary>
    /// Sums hit counts without the <see cref="OverflowException"/> that checked <c>Enumerable.Sum</c>
    /// raises past <c>int.MaxValue</c>. Saturates at <c>int.MaxValue</c>; returns 0 for null.
    /// </summary>
    private static int SumHitCounts(List<RuntimeFactDocument>? facts)
    {
        if (facts is null)
        {
            return 0;
        }

        long total = 0;
        foreach (var fact in facts)
        {
            if (fact is not null)
            {
                total += fact.HitCount;
            }
        }

        return (int)Math.Min(total, int.MaxValue);
    }

    /// <summary>
    /// Requests a reachability recompute. Targets are the existing state targets plus the symbols
    /// observed at runtime. Skipped when there are no targets.
    /// Failures are logged and rethrown; cancellation is rethrown without an error log.
    /// </summary>
    private async Task RecomputeReachabilityAsync(
        ReachabilityFactDocument persisted,
        List<RuntimeFactDocument> aggregatedRuntimeFacts,
        RuntimeFactsIngestRequest request,
        CancellationToken cancellationToken)
    {
        var targets = new HashSet<string>(StringComparer.Ordinal);
        if (persisted.States is { Count: > 0 } states)
        {
            foreach (var state in states)
            {
                var target = Normalize(state?.Target);
                if (target is not null)
                {
                    targets.Add(target);
                }
            }
        }

        var runtimeHits = aggregatedRuntimeFacts
            .Select(f => Normalize(f.SymbolId))
            .OfType<string>()
            .Distinct(StringComparer.Ordinal)
            .ToList();
        targets.UnionWith(runtimeHits);

        if (targets.Count == 0)
        {
            return;
        }

        var recomputeRequest = new ReachabilityRecomputeRequest
        {
            CallgraphId = request.CallgraphId,
            Subject = request.Subject,
            EntryPoints = persisted.EntryPoints ?? new List<string>(),
            // HashSet enumeration order is unspecified; sort for a deterministic request.
            Targets = targets.OrderBy(t => t, StringComparer.Ordinal).ToList(),
            RuntimeHits = runtimeHits,
            Metadata = MergeMetadata(persisted.Metadata, request.Metadata),
        };

        try
        {
            await scoringService.RecomputeAsync(recomputeRequest, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            logger.LogError(ex, "Failed to recompute reachability after runtime ingestion for subject {SubjectKey}.", persisted.SubjectKey);
            throw;
        }
    }

    /// <summary>Returns null for null/empty/whitespace input, otherwise the trimmed value.</summary>
    private static string? Normalize(string? value) =>
        string.IsNullOrWhiteSpace(value) ? null : value.Trim();

    /// <summary>Identity of a runtime fact: symbol plus optional code/loader/package/digest/build ids.</summary>
    private readonly record struct RuntimeFactKey(
        string SymbolId,
        string? CodeId,
        string? LoaderBase,
        string? Purl,
        string? SymbolDigest,
        string? BuildId);

    /// <summary>Ordinal, component-wise equality for <see cref="RuntimeFactKey"/>.</summary>
    private sealed class RuntimeFactKeyComparer : IEqualityComparer<RuntimeFactKey>
    {
        public static RuntimeFactKeyComparer Instance { get; } = new();

        public bool Equals(RuntimeFactKey x, RuntimeFactKey y) =>
            string.Equals(x.SymbolId, y.SymbolId, StringComparison.Ordinal)
            && string.Equals(x.CodeId, y.CodeId, StringComparison.Ordinal)
            && string.Equals(x.LoaderBase, y.LoaderBase, StringComparison.Ordinal)
            && string.Equals(x.Purl, y.Purl, StringComparison.Ordinal)
            && string.Equals(x.SymbolDigest, y.SymbolDigest, StringComparison.Ordinal)
            && string.Equals(x.BuildId, y.BuildId, StringComparison.Ordinal);

        public int GetHashCode(RuntimeFactKey obj)
        {
            // HashCode.Add treats null as 0 but still mixes it in positionally. Adding every component
            // means keys that differ only in which optional field is set do not collide.
            var hash = new HashCode();
            hash.Add(obj.SymbolId, StringComparer.Ordinal);
            hash.Add(obj.CodeId, StringComparer.Ordinal);
            hash.Add(obj.LoaderBase, StringComparer.Ordinal);
            hash.Add(obj.Purl, StringComparer.Ordinal);
            hash.Add(obj.SymbolDigest, StringComparer.Ordinal);
            hash.Add(obj.BuildId, StringComparer.Ordinal);
            return hash.ToHashCode();
        }
    }
}