using System.IO.Compression;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging.Abstractions;
using StellaOps.Cryptography;
using StellaOps.Determinism;
using StellaOps.Signals.Models;
using StellaOps.Signals.Persistence;
using StellaOps.Signals.Services;
using StellaOps.Signals.Storage;
using StellaOps.Signals.Storage.Models;
using Xunit;
using StellaOps.TestKit;

namespace StellaOps.Signals.Tests;

/// <summary>
/// Unit tests for <c>RuntimeFactsIngestionService.IngestBatchAsync</c>: NDJSON parsing, gzip-compressed
/// payloads, grouping of events by subject, linking the CAS URI onto the stored fact document,
/// skipping malformed lines, and ingestion without an artifact store.
/// All collaborators are replaced by the in-memory fakes declared at the bottom of this class.
/// </summary>
public class RuntimeFactsBatchIngestionTests
{
    private const string TestTenantId = "test-tenant";
    private const string TestCallgraphId = "test-callgraph-123";
    private const string NdjsonContentType = "application/x-ndjson";

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task IngestBatchAsync_ParsesNdjsonAndStoresArtifact()
    {
        // Arrange
        var repository = new InMemoryReachabilityFactRepository();
        var artifactStore = new InMemoryRuntimeFactsArtifactStore();
        var cryptoHash = DefaultCryptoHash.CreateForTests();
        var service = CreateService(repository, artifactStore, cryptoHash);

        var events = new[]
        {
            new { symbolId = "func_a", hitCount = 5, callgraphId = TestCallgraphId, subject = new { scanId = "scan-1" } },
            new { symbolId = "func_b", hitCount = 3, callgraphId = TestCallgraphId, subject = new { scanId = "scan-1" } },
            new { symbolId = "func_c", hitCount = 1, callgraphId = TestCallgraphId, subject = new { scanId = "scan-1" } }
        };
        using var stream = CreateUtf8Stream(ToNdjson(events));

        // Act
        var result = await service.IngestBatchAsync(TestTenantId, stream, NdjsonContentType, CancellationToken.None);

        // Assert
        Assert.NotNull(result);
        Assert.StartsWith("cas://reachability/runtime-facts/", result.CasUri);
        Assert.StartsWith("blake3:", result.BatchHash);
        Assert.Equal(1, result.ProcessedCount);
        Assert.Equal(3, result.TotalEvents);
        Assert.Equal(9, result.TotalHitCount);
        Assert.Contains("scan-1", result.SubjectKeys);
        Assert.NotEmpty(artifactStore.StoredArtifacts);
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task IngestBatchAsync_HandlesGzipCompressedContent()
    {
        // Arrange
        var repository = new InMemoryReachabilityFactRepository();
        var artifactStore = new InMemoryRuntimeFactsArtifactStore();
        var cryptoHash = DefaultCryptoHash.CreateForTests();
        var service = CreateService(repository, artifactStore, cryptoHash);

        var events = new[]
        {
            new { symbolId = "func_gzip", hitCount = 10, callgraphId = TestCallgraphId, subject = new { scanId = "scan-gzip" } }
        };
        var ndjson = ToNdjson(events);

        using var compressedStream = new MemoryStream();
        // leaveOpen keeps the MemoryStream usable after the gzip writer is disposed (which flushes the footer).
        await using (var gzipStream = new GZipStream(compressedStream, CompressionMode.Compress, leaveOpen: true))
        {
            await gzipStream.WriteAsync(Encoding.UTF8.GetBytes(ndjson));
        }
        compressedStream.Position = 0;

        // Act
        var result = await service.IngestBatchAsync(TestTenantId, compressedStream, "application/gzip", CancellationToken.None);

        // Assert
        Assert.Equal(1, result.ProcessedCount);
        Assert.Equal(1, result.TotalEvents);
        Assert.Equal(10, result.TotalHitCount);
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task IngestBatchAsync_GroupsEventsBySubject()
    {
        // Arrange
        var repository = new InMemoryReachabilityFactRepository();
        var artifactStore = new InMemoryRuntimeFactsArtifactStore();
        var cryptoHash = DefaultCryptoHash.CreateForTests();
        var service = CreateService(repository, artifactStore, cryptoHash);

        var events = new[]
        {
            new { symbolId = "func_a", hitCount = 1, callgraphId = "cg-1", subject = new { scanId = "scan-1" } },
            new { symbolId = "func_b", hitCount = 2, callgraphId = "cg-1", subject = new { scanId = "scan-1" } },
            new { symbolId = "func_c", hitCount = 3, callgraphId = "cg-2", subject = new { scanId = "scan-2" } },
            new { symbolId = "func_d", hitCount = 4, callgraphId = "cg-2", subject = new { scanId = "scan-2" } }
        };
        using var stream = CreateUtf8Stream(ToNdjson(events));

        // Act
        var result = await service.IngestBatchAsync(TestTenantId, stream, NdjsonContentType, CancellationToken.None);

        // Assert
        Assert.Equal(2, result.ProcessedCount);
        Assert.Equal(4, result.TotalEvents);
        Assert.Equal(10, result.TotalHitCount);
        Assert.Contains("scan-1", result.SubjectKeys);
        Assert.Contains("scan-2", result.SubjectKeys);
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task IngestBatchAsync_LinksCasUriToFactDocument()
    {
        // Arrange
        var repository = new InMemoryReachabilityFactRepository();
        var artifactStore = new InMemoryRuntimeFactsArtifactStore();
        var cryptoHash = DefaultCryptoHash.CreateForTests();
        var service = CreateService(repository, artifactStore, cryptoHash);

        var events = new[]
        {
            new { symbolId = "func_link", hitCount = 1, callgraphId = TestCallgraphId, subject = new { scanId = "scan-link" } }
        };
        using var stream = CreateUtf8Stream(ToNdjson(events));

        // Act
        var result = await service.IngestBatchAsync(TestTenantId, stream, NdjsonContentType, CancellationToken.None);

        // Assert
        var fact = await repository.GetBySubjectAsync("scan-link", CancellationToken.None);
        Assert.NotNull(fact);
        Assert.Equal(result.CasUri, fact.RuntimeFactsBatchUri);
        Assert.Equal(result.BatchHash, fact.RuntimeFactsBatchHash);
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task IngestBatchAsync_SkipsInvalidLines()
    {
        // Arrange
        var repository = new InMemoryReachabilityFactRepository();
        var artifactStore = new InMemoryRuntimeFactsArtifactStore();
        var cryptoHash = DefaultCryptoHash.CreateForTests();
        var service = CreateService(repository, artifactStore, cryptoHash);

        // Line 2 is not JSON and line 3 has an empty symbolId; the assertions expect both to be dropped.
        var ndjson = """
            {"symbolId": "func_valid", "hitCount": 1, "callgraphId": "cg-1", "subject": {"scanId": "scan-skip"}}
            invalid json line
            {"symbolId": "", "hitCount": 1, "callgraphId": "cg-1", "subject": {"scanId": "scan-skip"}}
            {"symbolId": "func_valid2", "hitCount": 2, "callgraphId": "cg-1", "subject": {"scanId": "scan-skip"}}
            """;
        using var stream = CreateUtf8Stream(ndjson);

        // Act
        var result = await service.IngestBatchAsync(TestTenantId, stream, NdjsonContentType, CancellationToken.None);

        // Assert
        Assert.Equal(1, result.ProcessedCount);
        Assert.Equal(2, result.TotalEvents); // Only valid lines
        Assert.Equal(3, result.TotalHitCount);
    }

    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task IngestBatchAsync_WorksWithoutArtifactStore()
    {
        // Arrange
        var repository = new InMemoryReachabilityFactRepository();
        var service = CreateService(repository, artifactStore: null, cryptoHash: null);

        var events = new[]
        {
            new { symbolId = "func_no_cas", hitCount = 5, callgraphId = TestCallgraphId, subject = new { scanId = "scan-no-cas" } }
        };
        using var stream = CreateUtf8Stream(ToNdjson(events));

        // Act
        var result = await service.IngestBatchAsync(TestTenantId, stream, NdjsonContentType, CancellationToken.None);

        // Assert
        Assert.NotNull(result);
        Assert.StartsWith("cas://reachability/runtime-facts/", result.CasUri);
        Assert.Equal(1, result.ProcessedCount);
    }

    /// <summary>
    /// Serializes each event as one JSON object per line, joined with <c>\n</c> (newline-delimited JSON).
    /// </summary>
    private static string ToNdjson<T>(IEnumerable<T> events)
        => string.Join("\n", events.Select(e => JsonSerializer.Serialize(e)));

    /// <summary>Wraps <paramref name="content"/> in a readable stream of its UTF-8 bytes.</summary>
    private static MemoryStream CreateUtf8Stream(string content)
        => new(Encoding.UTF8.GetBytes(content));

    /// <summary>
    /// Builds the service under test with the system clock and GUID provider, and with fakes for the cache,
    /// the events publisher, the scoring service and the provenance normalizer.
    /// </summary>
    /// <param name="repository">Fact repository the service writes to.</param>
    /// <param name="artifactStore">Artifact store; may be <c>null</c> (see the no-artifact-store test).</param>
    /// <param name="cryptoHash">Hash implementation; may be <c>null</c> alongside a null store.</param>
    private static RuntimeFactsIngestionService CreateService(
        IReachabilityFactRepository repository,
        IRuntimeFactsArtifactStore? artifactStore,
        ICryptoHash? cryptoHash)
    {
        var cache = new InMemoryReachabilityCache();
        var eventsPublisher = new NullEventsPublisher();
        var scoringService = new StubReachabilityScoringService();
        var provenanceNormalizer = new StubProvenanceNormalizer();

        return new RuntimeFactsIngestionService(
            repository,
            TimeProvider.System,
            SystemGuidProvider.Instance,
            cache,
            eventsPublisher,
            scoringService,
            provenanceNormalizer,
            NullLogger<RuntimeFactsIngestionService>.Instance,
            artifactStore,
            cryptoHash);
    }

    /// <summary>
    /// Dictionary-backed repository keyed (ordinally) by subject key.
    /// <c>GetExpiredAsync</c> always returns an empty list and <c>TrimRuntimeFactsAsync</c> is a no-op.
    /// </summary>
    private sealed class InMemoryReachabilityFactRepository : IReachabilityFactRepository
    {
        private readonly Dictionary<string, ReachabilityFactDocument> _facts = new(StringComparer.Ordinal);

        public Task<ReachabilityFactDocument> UpsertAsync(ReachabilityFactDocument document, CancellationToken cancellationToken)
        {
            _facts[document.SubjectKey] = document;
            return Task.FromResult(document);
        }

        public Task<ReachabilityFactDocument?> GetBySubjectAsync(string subjectKey, CancellationToken cancellationToken)
            => Task.FromResult<ReachabilityFactDocument?>(_facts.TryGetValue(subjectKey, out var doc) ? doc : null);

        public Task<IReadOnlyList<ReachabilityFactDocument>> GetExpiredAsync(DateTimeOffset cutoff, int limit, CancellationToken cancellationToken)
            => Task.FromResult<IReadOnlyList<ReachabilityFactDocument>>([]);

        public Task<bool> DeleteAsync(string subjectKey, CancellationToken cancellationToken)
            => Task.FromResult(_facts.Remove(subjectKey));

        public Task<int> GetRuntimeFactsCountAsync(string subjectKey, CancellationToken cancellationToken)
            => Task.FromResult(_facts.TryGetValue(subjectKey, out var doc) ? doc.RuntimeFacts?.Count ?? 0 : 0);

        public Task TrimRuntimeFactsAsync(string subjectKey, int maxCount, CancellationToken cancellationToken)
            => Task.CompletedTask;
    }

    /// <summary>
    /// Artifact store that drains the uploaded content to measure its length and records the resulting
    /// artifact in <see cref="StoredArtifacts"/> keyed by hash. <c>GetAsync</c> never returns content.
    /// </summary>
    private sealed class InMemoryRuntimeFactsArtifactStore : IRuntimeFactsArtifactStore
    {
        public Dictionary<string, StoredRuntimeFactsArtifact> StoredArtifacts { get; } = new(StringComparer.Ordinal);

        public async Task<StoredRuntimeFactsArtifact> SaveAsync(RuntimeFactsArtifactSaveRequest request, Stream content, CancellationToken cancellationToken)
        {
            using var buffer = new MemoryStream();
            await content.CopyToAsync(buffer, cancellationToken);

            var artifact = new StoredRuntimeFactsArtifact(
                Path: $"cas/reachability/runtime-facts/{request.Hash[..2]}/{request.Hash}/{request.FileName}",
                Length: buffer.Length,
                Hash: request.Hash,
                ContentType: request.ContentType,
                CasUri: $"cas://reachability/runtime-facts/{request.Hash}");

            StoredArtifacts[request.Hash] = artifact;
            return artifact;
        }

        public Task<Stream?> GetAsync(string hash, CancellationToken cancellationToken)
            => Task.FromResult<Stream?>(null);

        public Task<bool> ExistsAsync(string hash, CancellationToken cancellationToken)
            => Task.FromResult(StoredArtifacts.ContainsKey(hash));

        public Task<bool> DeleteAsync(string hash, CancellationToken cancellationToken)
            => Task.FromResult(StoredArtifacts.Remove(hash));
    }

    /// <summary>Cache that never hits: <c>GetAsync</c> returns <c>null</c> and writes/invalidations are no-ops.</summary>
    private sealed class InMemoryReachabilityCache : IReachabilityCache
    {
        public Task<ReachabilityFactDocument?> GetAsync(string subjectKey, CancellationToken cancellationToken)
            => Task.FromResult<ReachabilityFactDocument?>(null);

        public Task SetAsync(ReachabilityFactDocument document, CancellationToken cancellationToken)
            => Task.CompletedTask;

        public Task InvalidateAsync(string subjectKey, CancellationToken cancellationToken)
            => Task.CompletedTask;
    }

    /// <summary>Publisher that discards every event.</summary>
    private sealed class NullEventsPublisher : IEventsPublisher
    {
        public Task PublishFactUpdatedAsync(ReachabilityFactDocument fact, CancellationToken cancellationToken)
            => Task.CompletedTask;

        public Task PublishRuntimeUpdatedAsync(RuntimeUpdatedEvent runtimeEvent, CancellationToken cancellationToken)
            => Task.CompletedTask;
    }

    /// <summary>Scoring stub that returns a new document carrying only the request subject's key.</summary>
    private sealed class StubReachabilityScoringService : IReachabilityScoringService
    {
        public Task<ReachabilityFactDocument> RecomputeAsync(ReachabilityRecomputeRequest request, CancellationToken cancellationToken)
            => Task.FromResult(new ReachabilityFactDocument { SubjectKey = request.Subject.ToSubjectKey() });
    }

    /// <summary>
    /// Normalizer stub: returns default context facts, and a feed with the fixed id <c>test-feed</c>
    /// stamped with the supplied generation time.
    /// </summary>
    private sealed class StubProvenanceNormalizer : IRuntimeFactsProvenanceNormalizer
    {
        public ContextFacts CreateContextFacts(
            IEnumerable<RuntimeFactEvent> events,
            ReachabilitySubject subject,
            string callgraphId,
            Dictionary<string, string?>? metadata,
            DateTimeOffset timestamp) => new();

        public ProvenanceFeed NormalizeToFeed(
            IEnumerable<RuntimeFactEvent> events,
            ReachabilitySubject subject,
            string callgraphId,
            Dictionary<string, string?>? metadata,
            DateTimeOffset generatedAt) => new() { FeedId = "test-feed", GeneratedAt = generatedAt };
    }
}