feat: Implement Runtime Facts ingestion service and NDJSON reader
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
- Added RuntimeFactsNdjsonReader for reading NDJSON formatted runtime facts. - Introduced IRuntimeFactsIngestionService interface and its implementation. - Enhanced Program.cs to register new services and endpoints for runtime facts. - Updated CallgraphIngestionService to include CAS URI in stored artifacts. - Created RuntimeFactsValidationException for validation errors during ingestion. - Added tests for RuntimeFactsIngestionService and RuntimeFactsNdjsonReader. - Implemented SignalsSealedModeMonitor for compliance checks in sealed mode. - Updated project dependencies for testing utilities.
This commit is contained in:
@@ -0,0 +1,255 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Persistence;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
public sealed class RuntimeFactsIngestionService : IRuntimeFactsIngestionService
|
||||
{
|
||||
private readonly IReachabilityFactRepository factRepository;
|
||||
private readonly TimeProvider timeProvider;
|
||||
private readonly ILogger<RuntimeFactsIngestionService> logger;
|
||||
|
||||
public RuntimeFactsIngestionService(
|
||||
IReachabilityFactRepository factRepository,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<RuntimeFactsIngestionService> logger)
|
||||
{
|
||||
this.factRepository = factRepository ?? throw new ArgumentNullException(nameof(factRepository));
|
||||
this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async Task<RuntimeFactsIngestResponse> IngestAsync(RuntimeFactsIngestRequest request, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
ValidateRequest(request);
|
||||
|
||||
var subjectKey = request.Subject.ToSubjectKey();
|
||||
|
||||
var existing = await factRepository.GetBySubjectAsync(subjectKey, cancellationToken).ConfigureAwait(false);
|
||||
var document = existing ?? new ReachabilityFactDocument
|
||||
{
|
||||
Subject = request.Subject,
|
||||
SubjectKey = subjectKey,
|
||||
};
|
||||
|
||||
document.CallgraphId = request.CallgraphId;
|
||||
document.Subject = request.Subject;
|
||||
document.SubjectKey = subjectKey;
|
||||
document.ComputedAt = timeProvider.GetUtcNow();
|
||||
var aggregated = AggregateRuntimeFacts(request.Events);
|
||||
document.RuntimeFacts = MergeRuntimeFacts(document.RuntimeFacts, aggregated);
|
||||
document.Metadata = MergeMetadata(document.Metadata, request.Metadata);
|
||||
|
||||
var persisted = await factRepository.UpsertAsync(document, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
logger.LogInformation(
|
||||
"Stored {RuntimeFactCount} runtime fact(s) for subject {SubjectKey} (callgraph={CallgraphId}).",
|
||||
persisted.RuntimeFacts?.Count ?? 0,
|
||||
subjectKey,
|
||||
request.CallgraphId);
|
||||
|
||||
return new RuntimeFactsIngestResponse
|
||||
{
|
||||
FactId = persisted.Id,
|
||||
SubjectKey = subjectKey,
|
||||
CallgraphId = request.CallgraphId,
|
||||
RuntimeFactCount = persisted.RuntimeFacts?.Count ?? 0,
|
||||
TotalHitCount = persisted.RuntimeFacts?.Sum(f => f.HitCount) ?? 0,
|
||||
StoredAt = persisted.ComputedAt,
|
||||
};
|
||||
}
|
||||
|
||||
private static void ValidateRequest(RuntimeFactsIngestRequest request)
|
||||
{
|
||||
if (request.Subject is null)
|
||||
{
|
||||
throw new RuntimeFactsValidationException("Subject is required.");
|
||||
}
|
||||
|
||||
var subjectKey = request.Subject.ToSubjectKey();
|
||||
if (string.IsNullOrWhiteSpace(subjectKey))
|
||||
{
|
||||
throw new RuntimeFactsValidationException("Subject must include either scanId, imageDigest, or component/version.");
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(request.CallgraphId))
|
||||
{
|
||||
throw new RuntimeFactsValidationException("CallgraphId is required.");
|
||||
}
|
||||
|
||||
if (request.Events is null || request.Events.Count == 0)
|
||||
{
|
||||
throw new RuntimeFactsValidationException("At least one runtime event is required.");
|
||||
}
|
||||
|
||||
if (request.Events.Any(e => string.IsNullOrWhiteSpace(e.SymbolId)))
|
||||
{
|
||||
throw new RuntimeFactsValidationException("Runtime events must include symbolId.");
|
||||
}
|
||||
}
|
||||
|
||||
private static List<RuntimeFactDocument> AggregateRuntimeFacts(IEnumerable<RuntimeFactEvent> events)
|
||||
{
|
||||
var map = new Dictionary<RuntimeFactKey, RuntimeFactDocument>(RuntimeFactKeyComparer.Instance);
|
||||
|
||||
foreach (var evt in events)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(evt.SymbolId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var key = new RuntimeFactKey(evt.SymbolId.Trim(), evt.CodeId?.Trim(), evt.LoaderBase?.Trim());
|
||||
if (!map.TryGetValue(key, out var document))
|
||||
{
|
||||
document = new RuntimeFactDocument
|
||||
{
|
||||
SymbolId = key.SymbolId,
|
||||
CodeId = key.CodeId,
|
||||
LoaderBase = key.LoaderBase,
|
||||
Metadata = evt.Metadata != null
|
||||
? new Dictionary<string, string?>(evt.Metadata, StringComparer.Ordinal)
|
||||
: null
|
||||
};
|
||||
|
||||
map[key] = document;
|
||||
}
|
||||
else if (evt.Metadata != null && evt.Metadata.Count > 0)
|
||||
{
|
||||
document.Metadata ??= new Dictionary<string, string?>(StringComparer.Ordinal);
|
||||
foreach (var kvp in evt.Metadata)
|
||||
{
|
||||
document.Metadata[kvp.Key] = kvp.Value;
|
||||
}
|
||||
}
|
||||
|
||||
document.HitCount = Math.Clamp(document.HitCount + Math.Max(evt.HitCount, 1), 1, int.MaxValue);
|
||||
}
|
||||
|
||||
return map.Values.ToList();
|
||||
}
|
||||
|
||||
private static Dictionary<string, string?>? MergeMetadata(
|
||||
Dictionary<string, string?>? existing,
|
||||
Dictionary<string, string?>? incoming)
|
||||
{
|
||||
if (existing is null && incoming is null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var merged = existing is null
|
||||
? new Dictionary<string, string?>(StringComparer.Ordinal)
|
||||
: new Dictionary<string, string?>(existing, StringComparer.Ordinal);
|
||||
|
||||
if (incoming != null)
|
||||
{
|
||||
foreach (var (key, value) in incoming)
|
||||
{
|
||||
merged[key] = value;
|
||||
}
|
||||
}
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
private static List<RuntimeFactDocument> MergeRuntimeFacts(
|
||||
List<RuntimeFactDocument>? existing,
|
||||
List<RuntimeFactDocument> incoming)
|
||||
{
|
||||
var map = new Dictionary<RuntimeFactKey, RuntimeFactDocument>(RuntimeFactKeyComparer.Instance);
|
||||
|
||||
if (existing is { Count: > 0 })
|
||||
{
|
||||
foreach (var fact in existing)
|
||||
{
|
||||
var key = new RuntimeFactKey(fact.SymbolId, fact.CodeId, fact.LoaderBase);
|
||||
map[key] = new RuntimeFactDocument
|
||||
{
|
||||
SymbolId = fact.SymbolId,
|
||||
CodeId = fact.CodeId,
|
||||
LoaderBase = fact.LoaderBase,
|
||||
HitCount = fact.HitCount,
|
||||
Metadata = fact.Metadata is null
|
||||
? null
|
||||
: new Dictionary<string, string?>(fact.Metadata, StringComparer.Ordinal)
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (incoming.Count > 0)
|
||||
{
|
||||
foreach (var fact in incoming)
|
||||
{
|
||||
var key = new RuntimeFactKey(fact.SymbolId, fact.CodeId, fact.LoaderBase);
|
||||
if (!map.TryGetValue(key, out var existingFact))
|
||||
{
|
||||
map[key] = new RuntimeFactDocument
|
||||
{
|
||||
SymbolId = fact.SymbolId,
|
||||
CodeId = fact.CodeId,
|
||||
LoaderBase = fact.LoaderBase,
|
||||
HitCount = fact.HitCount,
|
||||
Metadata = fact.Metadata is null
|
||||
? null
|
||||
: new Dictionary<string, string?>(fact.Metadata, StringComparer.Ordinal)
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
existingFact.HitCount = Math.Clamp(existingFact.HitCount + fact.HitCount, 1, int.MaxValue);
|
||||
if (fact.Metadata != null && fact.Metadata.Count > 0)
|
||||
{
|
||||
existingFact.Metadata ??= new Dictionary<string, string?>(StringComparer.Ordinal);
|
||||
foreach (var (key, value) in fact.Metadata)
|
||||
{
|
||||
existingFact.Metadata[key] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return map.Values
|
||||
.OrderBy(doc => doc.SymbolId, StringComparer.Ordinal)
|
||||
.ThenBy(doc => doc.CodeId, StringComparer.Ordinal)
|
||||
.ThenBy(doc => doc.LoaderBase, StringComparer.Ordinal)
|
||||
.ToList();
|
||||
}
|
||||
|
||||
private readonly record struct RuntimeFactKey(string SymbolId, string? CodeId, string? LoaderBase);
|
||||
|
||||
private sealed class RuntimeFactKeyComparer : IEqualityComparer<RuntimeFactKey>
|
||||
{
|
||||
public static RuntimeFactKeyComparer Instance { get; } = new();
|
||||
|
||||
public bool Equals(RuntimeFactKey x, RuntimeFactKey y) =>
|
||||
string.Equals(x.SymbolId, y.SymbolId, StringComparison.Ordinal) &&
|
||||
string.Equals(x.CodeId, y.CodeId, StringComparison.Ordinal) &&
|
||||
string.Equals(x.LoaderBase, y.LoaderBase, StringComparison.Ordinal);
|
||||
|
||||
public int GetHashCode(RuntimeFactKey obj)
|
||||
{
|
||||
var hash = new HashCode();
|
||||
hash.Add(obj.SymbolId, StringComparer.Ordinal);
|
||||
if (obj.CodeId is not null)
|
||||
{
|
||||
hash.Add(obj.CodeId, StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
if (obj.LoaderBase is not null)
|
||||
{
|
||||
hash.Add(obj.LoaderBase, StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
return hash.ToHashCode();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user