Files
git.stella-ops.org/src/TimelineIndexer/StellaOps.TimelineIndexer/StellaOps.TimelineIndexer.Tests/TimelineIngestionWorkerTests.cs

124 lines
5.2 KiB
C#

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StellaOps.TestKit;
using StellaOps.TimelineIndexer.Core.Abstractions;
using StellaOps.TimelineIndexer.Core.Models;
using StellaOps.TimelineIndexer.Core.Models.Results;
using StellaOps.TimelineIndexer.Core.Services;
using StellaOps.TimelineIndexer.Worker;
namespace StellaOps.TimelineIndexer.Tests;
/// <summary>
/// Tests that run <see cref="TimelineIngestionWorker"/> as a hosted service against an
/// in-memory subscriber and a recording store, then inspect what reached the store.
/// </summary>
public sealed class TimelineIngestionWorkerTests
{
    /// <summary>
    /// Enqueues the same envelope twice and expects exactly one store insert, carrying
    /// the SHA-256 payload hash of the raw payload <c>{}</c>.
    /// </summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task Worker_Ingests_And_Dedupes()
    {
        var store = new RecordingStore();
        var evt = new TimelineEventEnvelope
        {
            EventId = "evt-1",
            TenantId = "tenant-a",
            EventType = "test",
            Source = "unit",
            OccurredAt = DateTimeOffset.UtcNow,
            RawPayloadJson = "{}"
        };

        // The same envelope is published twice; the second copy is the duplicate.
        await RunWorkerAsync(store, evt, evt);

        Assert.Equal(1, store.InsertCalls); // duplicate dropped
        Assert.Equal("sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", store.LastHash); // hash of "{}"
    }

    /// <summary>
    /// Verifies that the evidence fields set on the envelope (bundle id/digest,
    /// attestation subject/digest, manifest URI) arrive unchanged at the store.
    /// </summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task Worker_Passes_Evidence_Metadata()
    {
        var store = new RecordingStore();
        var evt = new TimelineEventEnvelope
        {
            EventId = "evt-evidence-worker",
            TenantId = "tenant-e",
            EventType = "export.bundle.sealed",
            Source = "exporter",
            OccurredAt = DateTimeOffset.UtcNow,
            RawPayloadJson = "{}",
            BundleId = Guid.Parse("9f34f8c6-7a7c-4d63-9c70-2ae6e8f8c6aa"),
            BundleDigest = "sha256:abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            AttestationSubject = "sha256:abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            AttestationDigest = "sha256:0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcd",
            ManifestUri = "bundles/9f34f8c6/manifest.dsse.json"
        };

        await RunWorkerAsync(store, evt);

        Assert.Equal(evt.BundleId, store.LastBundleId);
        Assert.Equal(evt.BundleDigest, store.LastBundleDigest);
        Assert.Equal(evt.AttestationSubject, store.LastAttestationSubject);
        Assert.Equal(evt.AttestationDigest, store.LastAttestationDigest);
        Assert.Equal(evt.ManifestUri, store.LastManifestUri);
    }

    /// <summary>
    /// Wires the worker, ingestion service, an in-memory subscriber and the supplied
    /// store into a service provider, starts the worker, publishes
    /// <paramref name="events"/> in order, completes the subscriber, waits briefly,
    /// and stops the worker.
    /// </summary>
    /// <param name="store">Store instance the worker should write to; inspected by the caller afterwards.</param>
    /// <param name="events">Envelopes to enqueue, in order, after the worker has started.</param>
    private static async Task RunWorkerAsync(RecordingStore store, params TimelineEventEnvelope[] events)
    {
        var subscriber = new InMemoryTimelineEventSubscriber();

        var services = new ServiceCollection();
        services.AddSingleton<ITimelineEventSubscriber>(subscriber);
        services.AddSingleton<ITimelineEventStore>(store);
        services.AddSingleton<ITimelineIngestionService, TimelineIngestionService>();
        services.AddSingleton<IHostedService, TimelineIngestionWorker>();
        services.AddLogging();

        using var provider = services.BuildServiceProvider();
        var hosted = provider.GetRequiredService<IHostedService>();

        // The timeout-based token source owns a timer, so dispose it when done.
        // Two seconds is the upper bound for the whole start/ingest/stop cycle.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

        await hosted.StartAsync(cts.Token);

        foreach (var envelope in events)
        {
            subscriber.Enqueue(envelope);
        }

        subscriber.Complete();

        // Give the background worker time to drain the queue before stopping it.
        await Task.Delay(200, cts.Token);
        await hosted.StopAsync(cts.Token);
    }

    /// <summary>
    /// Test double for <see cref="ITimelineEventStore"/> that counts insert calls and
    /// remembers the hash and evidence fields of the most recent envelope.
    /// </summary>
    private sealed class RecordingStore : ITimelineEventStore
    {
        // (tenant, event id) pairs already inserted; drives the boolean returned by InsertAsync.
        private readonly HashSet<(string tenant, string id)> _seen = new();

        /// <summary>Number of times <see cref="InsertAsync"/> has been invoked, duplicates included.</summary>
        public int InsertCalls { get; private set; }

        /// <summary><c>PayloadHash</c> of the last envelope passed to <see cref="InsertAsync"/>.</summary>
        public string? LastHash { get; private set; }

        /// <summary><c>BundleId</c> of the last envelope passed to <see cref="InsertAsync"/>.</summary>
        public Guid? LastBundleId { get; private set; }

        /// <summary><c>BundleDigest</c> of the last envelope passed to <see cref="InsertAsync"/>.</summary>
        public string? LastBundleDigest { get; private set; }

        /// <summary><c>AttestationSubject</c> of the last envelope passed to <see cref="InsertAsync"/>.</summary>
        public string? LastAttestationSubject { get; private set; }

        /// <summary><c>AttestationDigest</c> of the last envelope passed to <see cref="InsertAsync"/>.</summary>
        public string? LastAttestationDigest { get; private set; }

        /// <summary><c>ManifestUri</c> of the last envelope passed to <see cref="InsertAsync"/>.</summary>
        public string? LastManifestUri { get; private set; }

        /// <summary>
        /// Records the envelope's hash and evidence fields and bumps <see cref="InsertCalls"/>.
        /// </summary>
        /// <param name="envelope">Envelope being stored.</param>
        /// <param name="cancellationToken">Unused by this test double.</param>
        /// <returns>
        /// <c>true</c> when the (tenant, event id) pair has not been seen before by this
        /// instance; <c>false</c> otherwise.
        /// </returns>
        public Task<bool> InsertAsync(TimelineEventEnvelope envelope, CancellationToken cancellationToken = default)
        {
            InsertCalls++;
            LastHash = envelope.PayloadHash;
            LastBundleId = envelope.BundleId;
            LastBundleDigest = envelope.BundleDigest;
            LastAttestationSubject = envelope.AttestationSubject;
            LastAttestationDigest = envelope.AttestationDigest;
            LastManifestUri = envelope.ManifestUri;

            var isNew = _seen.Add((envelope.TenantId, envelope.EventId));
            return Task.FromResult(isNew);
        }
    }
}