using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StellaOps.Signals.Models;
using StellaOps.Signals.Persistence;

namespace StellaOps.Signals.Services;

/// <summary>
/// Ingests runtime fact events for a subject: validates the request, aggregates its events
/// per (symbolId, codeId, loaderBase), merges the result into the subject's stored
/// reachability fact document (creating one when none exists), and upserts the document.
/// </summary>
public sealed class RuntimeFactsIngestionService : IRuntimeFactsIngestionService
{
    private readonly IReachabilityFactRepository factRepository;
    private readonly TimeProvider timeProvider;
    private readonly ILogger<RuntimeFactsIngestionService> logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="factRepository">Repository used to load and upsert fact documents.</param>
    /// <param name="timeProvider">Clock used to stamp <c>ComputedAt</c>.</param>
    /// <param name="logger">Logger for ingestion results.</param>
    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
    public RuntimeFactsIngestionService(
        IReachabilityFactRepository factRepository,
        TimeProvider timeProvider,
        ILogger<RuntimeFactsIngestionService> logger)
    {
        this.factRepository = factRepository ?? throw new ArgumentNullException(nameof(factRepository));
        this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Validates <paramref name="request"/>, merges its events into the fact document stored
    /// for the request's subject, persists the document, and reports what was stored.
    /// </summary>
    /// <param name="request">The ingestion request.</param>
    /// <param name="cancellationToken">Token passed to the repository calls.</param>
    /// <returns>A summary built from the document returned by the repository upsert.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
    /// <exception cref="RuntimeFactsValidationException">The request fails validation.</exception>
    public async Task<RuntimeFactsIngestResponse> IngestAsync(RuntimeFactsIngestRequest request, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);
        ValidateRequest(request);

        var subjectKey = request.Subject.ToSubjectKey();
        var existing = await factRepository.GetBySubjectAsync(subjectKey, cancellationToken).ConfigureAwait(false);

        var document = existing ?? new ReachabilityFactDocument
        {
            Subject = request.Subject,
            SubjectKey = subjectKey,
        };

        // The latest request always wins for identity fields and the computation timestamp.
        document.CallgraphId = request.CallgraphId;
        document.Subject = request.Subject;
        document.SubjectKey = subjectKey;
        document.ComputedAt = timeProvider.GetUtcNow();

        var aggregated = AggregateRuntimeFacts(request);
        document.RuntimeFacts = MergeRuntimeFacts(document.RuntimeFacts, aggregated);
        document.Metadata = MergeMetadata(document.Metadata, request.Metadata);

        var persisted = await factRepository.UpsertAsync(document, cancellationToken).ConfigureAwait(false);

        logger.LogInformation(
            "Stored {RuntimeFactCount} runtime fact(s) for subject {SubjectKey} (callgraph={CallgraphId}).",
            persisted.RuntimeFacts?.Count ?? 0,
            subjectKey,
            request.CallgraphId);

        return new RuntimeFactsIngestResponse
        {
            FactId = persisted.Id,
            SubjectKey = subjectKey,
            CallgraphId = request.CallgraphId,
            RuntimeFactCount = persisted.RuntimeFacts?.Count ?? 0,
            // NOTE(review): if HitCount is a 32-bit int, Enumerable.Sum throws OverflowException once
            // the total exceeds int.MaxValue - after the upsert has already succeeded. Confirm the
            // HitCount/TotalHitCount types and widen or saturate here if that can happen.
            TotalHitCount = persisted.RuntimeFacts?.Sum(f => f.HitCount) ?? 0,
            StoredAt = persisted.ComputedAt,
        };
    }

    /// <summary>
    /// Checks that the request carries a subject with a non-blank subject key, a callgraph id,
    /// and at least one event, and that every event is non-null with a non-blank symbol id.
    /// </summary>
    /// <exception cref="RuntimeFactsValidationException">Any of the checks fails.</exception>
    private static void ValidateRequest(RuntimeFactsIngestRequest request)
    {
        if (request.Subject is null)
        {
            throw new RuntimeFactsValidationException("Subject is required.");
        }

        var subjectKey = request.Subject.ToSubjectKey();
        if (string.IsNullOrWhiteSpace(subjectKey))
        {
            throw new RuntimeFactsValidationException("Subject must include either scanId, imageDigest, or component/version.");
        }

        if (string.IsNullOrWhiteSpace(request.CallgraphId))
        {
            throw new RuntimeFactsValidationException("CallgraphId is required.");
        }

        if (request.Events is null || request.Events.Count == 0)
        {
            throw new RuntimeFactsValidationException("At least one runtime event is required.");
        }

        // A null list element is rejected as a validation failure instead of surfacing
        // as a NullReferenceException from the SymbolId access.
        if (request.Events.Any(e => e is null || string.IsNullOrWhiteSpace(e.SymbolId)))
        {
            throw new RuntimeFactsValidationException("Runtime events must include symbolId.");
        }
    }

    /// <summary>
    /// Collapses the request's events into one fact per (symbolId, codeId, loaderBase) key.
    /// Key parts are trimmed. The first event seen for a key supplies the process, socket,
    /// container and evidence fields; later events for the same key only add hits and
    /// overlay metadata entries. Each event contributes at least one hit.
    /// </summary>
    /// <param name="request">A request that has already passed <see cref="ValidateRequest"/>.</param>
    /// <returns>The aggregated facts, in first-seen key order is not guaranteed by this method.</returns>
    private static List<RuntimeFactDocument> AggregateRuntimeFacts(RuntimeFactsIngestRequest request)
    {
        var map = new Dictionary<RuntimeFactKey, RuntimeFactDocument>(RuntimeFactKeyComparer.Instance);

        foreach (var evt in request.Events)
        {
            if (string.IsNullOrWhiteSpace(evt.SymbolId))
            {
                continue;
            }

            var key = new RuntimeFactKey(evt.SymbolId.Trim(), evt.CodeId?.Trim(), evt.LoaderBase?.Trim());

            if (!map.TryGetValue(key, out var document))
            {
                document = new RuntimeFactDocument
                {
                    SymbolId = key.SymbolId,
                    CodeId = key.CodeId,
                    LoaderBase = key.LoaderBase,
                    ProcessId = evt.ProcessId,
                    ProcessName = Normalize(evt.ProcessName),
                    SocketAddress = Normalize(evt.SocketAddress),
                    ContainerId = Normalize(evt.ContainerId),
                    EvidenceUri = Normalize(evt.EvidenceUri),
                    Metadata = CopyMetadata(evt.Metadata),
                };
                map[key] = document;
            }
            else if (evt.Metadata is not null && evt.Metadata.Count > 0)
            {
                document.Metadata ??= new(StringComparer.Ordinal);
                foreach (var kvp in evt.Metadata)
                {
                    document.Metadata[kvp.Key] = kvp.Value;
                }
            }

            // Non-positive event hit counts still count as a single observation.
            document.HitCount = AddHitsSaturating(document.HitCount, Math.Max(evt.HitCount, 1));
        }

        return map.Values.ToList();
    }

    /// <summary>
    /// Returns a new ordinal-keyed dictionary holding <paramref name="existing"/> overlaid with
    /// <paramref name="incoming"/> (incoming values win), or <see langword="null"/> when both
    /// inputs are <see langword="null"/>. Neither input is modified. The helper is generic so
    /// it makes no assumption about the metadata value type.
    /// </summary>
    private static Dictionary<string, TValue>? MergeMetadata<TValue>(
        Dictionary<string, TValue>? existing,
        Dictionary<string, TValue>? incoming)
    {
        if (existing is null && incoming is null)
        {
            return null;
        }

        var merged = existing is null
            ? new Dictionary<string, TValue>(StringComparer.Ordinal)
            : new Dictionary<string, TValue>(existing, StringComparer.Ordinal);

        if (incoming is not null)
        {
            foreach (var (metaKey, metaValue) in incoming)
            {
                merged[metaKey] = metaValue;
            }
        }

        return merged;
    }

    /// <summary>
    /// Merges <paramref name="incoming"/> facts into copies of the <paramref name="existing"/>
    /// facts, keyed by (symbolId, codeId, loaderBase). For a key present on both sides the hit
    /// counts are added (saturating), null descriptive fields are filled from the incoming
    /// fact, and incoming metadata entries overwrite existing ones. The input lists and their
    /// documents are not modified.
    /// </summary>
    /// <returns>The merged facts ordered ordinally by symbol id, then code id, then loader base.</returns>
    private static List<RuntimeFactDocument> MergeRuntimeFacts(
        List<RuntimeFactDocument>? existing,
        List<RuntimeFactDocument> incoming)
    {
        var map = new Dictionary<RuntimeFactKey, RuntimeFactDocument>(RuntimeFactKeyComparer.Instance);

        if (existing is { Count: > 0 })
        {
            foreach (var fact in existing)
            {
                map[KeyOf(fact)] = CloneFact(fact);
            }
        }

        foreach (var fact in incoming)
        {
            var key = KeyOf(fact);
            if (!map.TryGetValue(key, out var existingFact))
            {
                map[key] = CloneFact(fact);
                continue;
            }

            existingFact.HitCount = AddHitsSaturating(existingFact.HitCount, fact.HitCount);

            // Existing descriptive values are kept; incoming values only fill gaps.
            existingFact.ProcessId ??= fact.ProcessId;
            existingFact.ProcessName ??= fact.ProcessName;
            existingFact.SocketAddress ??= fact.SocketAddress;
            existingFact.ContainerId ??= fact.ContainerId;
            existingFact.EvidenceUri ??= fact.EvidenceUri;

            if (fact.Metadata is not null && fact.Metadata.Count > 0)
            {
                existingFact.Metadata ??= new(StringComparer.Ordinal);
                foreach (var (metaKey, metaValue) in fact.Metadata)
                {
                    existingFact.Metadata[metaKey] = metaValue;
                }
            }
        }

        return map.Values
            .OrderBy(doc => doc.SymbolId, StringComparer.Ordinal)
            .ThenBy(doc => doc.CodeId, StringComparer.Ordinal)
            .ThenBy(doc => doc.LoaderBase, StringComparer.Ordinal)
            .ToList();
    }

    /// <summary>Builds the aggregation key for a fact from its identifying fields as stored.</summary>
    private static RuntimeFactKey KeyOf(RuntimeFactDocument fact)
        => new(fact.SymbolId, fact.CodeId, fact.LoaderBase);

    /// <summary>
    /// Copies the fields this service manages into a new document, with its own metadata
    /// dictionary, so merging never mutates the source document.
    /// </summary>
    private static RuntimeFactDocument CloneFact(RuntimeFactDocument fact) => new()
    {
        SymbolId = fact.SymbolId,
        CodeId = fact.CodeId,
        LoaderBase = fact.LoaderBase,
        ProcessId = fact.ProcessId,
        ProcessName = fact.ProcessName,
        SocketAddress = fact.SocketAddress,
        ContainerId = fact.ContainerId,
        EvidenceUri = fact.EvidenceUri,
        HitCount = fact.HitCount,
        Metadata = CopyMetadata(fact.Metadata),
    };

    /// <summary>
    /// Returns an ordinal-keyed copy of <paramref name="source"/>, or <see langword="null"/>
    /// when <paramref name="source"/> is <see langword="null"/>.
    /// </summary>
    private static Dictionary<string, TValue>? CopyMetadata<TValue>(IEnumerable<KeyValuePair<string, TValue>>? source)
        => source is null ? null : new Dictionary<string, TValue>(source, StringComparer.Ordinal);

    /// <summary>
    /// Adds two hit counts and clamps the result to [1, <see cref="int.MaxValue"/>].
    /// The addition is done in 64-bit arithmetic: a 32-bit sum would wrap to a negative value
    /// before clamping and a near-maximum counter would collapse to 1 instead of saturating.
    /// </summary>
    private static int AddHitsSaturating(long current, long increment)
        => (int)Math.Clamp(current + increment, 1L, int.MaxValue);

    /// <summary>Trims <paramref name="value"/>; null, empty or whitespace-only input becomes <see langword="null"/>.</summary>
    private static string? Normalize(string? value) => string.IsNullOrWhiteSpace(value) ? null : value.Trim();

    /// <summary>Identity of a runtime fact within a subject.</summary>
    private readonly record struct RuntimeFactKey(string SymbolId, string? CodeId, string? LoaderBase);

    /// <summary>Ordinal (case-sensitive) equality over all three parts of a <see cref="RuntimeFactKey"/>.</summary>
    private sealed class RuntimeFactKeyComparer : IEqualityComparer<RuntimeFactKey>
    {
        public static RuntimeFactKeyComparer Instance { get; } = new();

        public bool Equals(RuntimeFactKey x, RuntimeFactKey y) =>
            string.Equals(x.SymbolId, y.SymbolId, StringComparison.Ordinal)
            && string.Equals(x.CodeId, y.CodeId, StringComparison.Ordinal)
            && string.Equals(x.LoaderBase, y.LoaderBase, StringComparison.Ordinal);

        // Every field occupies its own slot (null hashes as 0), so keys that differ only in
        // which optional field is set do not collide by construction.
        public int GetHashCode(RuntimeFactKey obj) =>
            HashCode.Combine(OrdinalHash(obj.SymbolId), OrdinalHash(obj.CodeId), OrdinalHash(obj.LoaderBase));

        private static int OrdinalHash(string? value) =>
            value is null ? 0 : StringComparer.Ordinal.GetHashCode(value);
    }
}