using System.Text;
using StellaOps.TaskRunner.Client.Lifecycle;
using StellaOps.TaskRunner.Client.Models;
using StellaOps.TaskRunner.Client.Pagination;
using StellaOps.TaskRunner.Client.Streaming;
using StellaOps.TestKit;

namespace StellaOps.TaskRunner.Tests;

/// <summary>
/// Unit tests for the TaskRunner client helpers exercised below:
/// <c>StreamingLogReader</c>, <c>Paginator</c> and its async extensions,
/// <c>PackRunLifecycleHelper</c>, and the pack-run request/simulation models.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments in this file (<c>RunLogEntry</c>,
/// <c>Paginator&lt;int&gt;</c>, <c>PagedResponse&lt;int&gt;</c>,
/// <c>Dictionary&lt;string, object&gt;</c>) are inferred from how the values are used;
/// confirm <c>RunLogEntry</c> and the dictionary value type against
/// <c>StellaOps.TaskRunner.Client.Models</c>.
/// </remarks>
public sealed class TaskRunnerClientTests
{
    // Fixed timestamp so hand-built log entries do not depend on the wall clock.
    private static readonly DateTimeOffset FixedTimestamp = new(2025, 1, 1, 0, 0, 0, TimeSpan.Zero);

    /// <summary>Two NDJSON records on separate lines are parsed into two entries with the expected fields.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task StreamingLogReader_ParsesNdjsonLines()
    {
        var ct = CancellationToken.None;
        var ndjson = """
            {"timestamp":"2025-01-01T00:00:00Z","level":"info","stepId":"step-1","message":"Starting","traceId":"abc123"}
            {"timestamp":"2025-01-01T00:00:01Z","level":"error","stepId":"step-1","message":"Failed","traceId":"abc123"}
            """;

        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(ndjson));
        var entries = await StreamingLogReader.CollectAsync(stream, ct);

        Assert.Equal(2, entries.Count);
        Assert.Equal("info", entries[0].Level);
        Assert.Equal("error", entries[1].Level);
        Assert.Equal("step-1", entries[0].StepId);
        Assert.Equal("Starting", entries[0].Message);
    }

    /// <summary>A blank line between two records must not produce an entry.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task StreamingLogReader_SkipsEmptyLines()
    {
        var ct = CancellationToken.None;

        // The empty line between the two records is the input under test.
        var ndjson = """
            {"timestamp":"2025-01-01T00:00:00Z","level":"info","stepId":"step-1","message":"Test","traceId":"abc123"}

            {"timestamp":"2025-01-01T00:00:01Z","level":"info","stepId":"step-2","message":"Test2","traceId":"abc123"}
            """;

        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(ndjson));
        var entries = await StreamingLogReader.CollectAsync(stream, ct);

        Assert.Equal(2, entries.Count);
    }

    /// <summary>A non-JSON line between two valid records is dropped and the valid records survive in order.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task StreamingLogReader_SkipsMalformedLines()
    {
        var ct = CancellationToken.None;
        var ndjson = """
            {"timestamp":"2025-01-01T00:00:00Z","level":"info","stepId":"step-1","message":"Valid","traceId":"abc123"}
            not valid json
            {"timestamp":"2025-01-01T00:00:01Z","level":"info","stepId":"step-2","message":"AlsoValid","traceId":"abc123"}
            """;

        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(ndjson));
        var entries = await StreamingLogReader.CollectAsync(stream, ct);

        Assert.Equal(2, entries.Count);
        Assert.Equal("Valid", entries[0].Message);
        Assert.Equal("AlsoValid", entries[1].Message);
    }

    /// <summary>Only entries whose level is in the requested set are yielded.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task StreamingLogReader_FilterByLevel_FiltersCorrectly()
    {
        var ct = CancellationToken.None;
        var entries = new List<RunLogEntry>
        {
            new(FixedTimestamp, "info", "step-1", "Info message", "trace1"),
            new(FixedTimestamp, "error", "step-1", "Error message", "trace1"),
            new(FixedTimestamp, "warning", "step-1", "Warning message", "trace1"),
        };
        var levels = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { "error", "warning" };

        var filtered = new List<RunLogEntry>();
        await foreach (var entry in StreamingLogReader.FilterByLevelAsync(entries.ToAsyncEnumerable(), levels, ct))
        {
            filtered.Add(entry);
        }

        Assert.Equal(2, filtered.Count);
        Assert.DoesNotContain(filtered, e => e.Level == "info");
    }

    /// <summary>Entries are bucketed by step id; an entry with a null step id is expected under the "(global)" key.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task StreamingLogReader_GroupByStep_GroupsCorrectly()
    {
        var ct = CancellationToken.None;
        var entries = new List<RunLogEntry>
        {
            new(FixedTimestamp, "info", "step-1", "Message 1", "trace1"),
            new(FixedTimestamp, "info", "step-2", "Message 2", "trace1"),
            new(FixedTimestamp, "info", "step-1", "Message 3", "trace1"),
            new(FixedTimestamp, "info", null, "Global message", "trace1"),
        };

        var groups = await StreamingLogReader.GroupByStepAsync(entries.ToAsyncEnumerable(), ct);

        Assert.Equal(3, groups.Count);
        Assert.Equal(2, groups["step-1"].Count);
        Assert.Single(groups["step-2"]);
        Assert.Single(groups["(global)"]);
    }

    /// <summary>Collecting 25 items with a page size of 10 issues three fetches and preserves item order.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task Paginator_IteratesAllPages()
    {
        var ct = CancellationToken.None;
        var allItems = Enumerable.Range(1, 25).ToList();
        var pageSize = 10;
        var fetchCalls = 0;

        var paginator = new Paginator<int>(
            async (offset, limit, token) =>
            {
                fetchCalls++;
                var items = allItems.Skip(offset).Take(limit).ToList();
                var hasMore = offset + items.Count < allItems.Count;
                return new PagedResponse<int>(items, allItems.Count, hasMore);
            },
            pageSize);

        var collected = await paginator.CollectAsync(ct);

        Assert.Equal(25, collected.Count);
        Assert.Equal(3, fetchCalls); // 10, 10, 5 items
        Assert.Equal(allItems, collected);
    }

    /// <summary>Requesting page 2 with a page size of 10 returns ten items starting at item 11.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task Paginator_GetPage_ReturnsCorrectPage()
    {
        var ct = CancellationToken.None;
        var allItems = Enumerable.Range(1, 25).ToList();
        var pageSize = 10;

        var paginator = new Paginator<int>(
            async (offset, limit, token) =>
            {
                var items = allItems.Skip(offset).Take(limit).ToList();
                var hasMore = offset + items.Count < allItems.Count;
                return new PagedResponse<int>(items, allItems.Count, hasMore);
            },
            pageSize);

        var page2 = await paginator.GetPageAsync(2, ct);

        Assert.Equal(10, page2.Items.Count);
        Assert.Equal(11, page2.Items[0]); // Items 11-20
    }

    /// <summary><c>TakeAsync(5)</c> yields exactly the first five items of the stream.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task PaginatorExtensions_TakeAsync_TakesCorrectNumber()
    {
        var ct = CancellationToken.None;
        var items = Enumerable.Range(1, 100).ToAsyncEnumerable();

        var taken = new List<int>();
        await foreach (var item in items.TakeAsync(5, ct))
        {
            taken.Add(item);
        }

        Assert.Equal(5, taken.Count);
        Assert.Equal(new[] { 1, 2, 3, 4, 5 }, taken);
    }

    /// <summary><c>SkipAsync(5)</c> drops the first five items and yields the remainder in order.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public async Task PaginatorExtensions_SkipAsync_SkipsCorrectNumber()
    {
        var ct = CancellationToken.None;
        var items = Enumerable.Range(1, 10).ToAsyncEnumerable();

        var skipped = new List<int>();
        await foreach (var item in items.SkipAsync(5, ct))
        {
            skipped.Add(item);
        }

        Assert.Equal(5, skipped.Count);
        Assert.Equal(new[] { 6, 7, 8, 9, 10 }, skipped);
    }

    /// <summary>The terminal-status set contains the four end states and excludes "running" and "pending".</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void PackRunLifecycleHelper_TerminalStatuses_IncludesExpectedStatuses()
    {
        Assert.Contains("completed", PackRunLifecycleHelper.TerminalStatuses);
        Assert.Contains("failed", PackRunLifecycleHelper.TerminalStatuses);
        Assert.Contains("cancelled", PackRunLifecycleHelper.TerminalStatuses);
        Assert.Contains("rejected", PackRunLifecycleHelper.TerminalStatuses);
        Assert.DoesNotContain("running", PackRunLifecycleHelper.TerminalStatuses);
        Assert.DoesNotContain("pending", PackRunLifecycleHelper.TerminalStatuses);
    }

    /// <summary>Constructor arguments of <c>CreatePackRunRequest</c> are exposed through its properties.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void PackRunModels_CreatePackRunRequest_SerializesCorrectly()
    {
        var request = new CreatePackRunRequest(
            "my-pack",
            "1.0.0",
            new Dictionary<string, object> { ["key"] = "value" },
            "tenant-1",
            "corr-123");

        Assert.Equal("my-pack", request.PackId);
        Assert.Equal("1.0.0", request.PackVersion);
        Assert.NotNull(request.Inputs);
        Assert.Equal("value", request.Inputs["key"]);
    }

    /// <summary>A simulated loop step exposes its id, kind and the loop metadata it was built with.</summary>
    [Trait("Category", TestCategories.Unit)]
    [Fact]
    public void PackRunModels_SimulatedStep_HasCorrectProperties()
    {
        var loopInfo = new LoopInfo("{{ inputs.items }}", "item", 100);
        var step = new SimulatedStep(
            "step-1",
            "loop",
            "WillIterate",
            loopInfo,
            null,
            null);

        Assert.Equal("step-1", step.StepId);
        Assert.Equal("loop", step.Kind);
        Assert.NotNull(step.LoopInfo);
        Assert.Equal("{{ inputs.items }}", step.LoopInfo.ItemsExpression);
    }
}

/// <summary>
/// Test-only helper for feeding in-memory sequences to APIs that consume <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
internal static class AsyncEnumerableExtensions
{
    /// <summary>
    /// Exposes <paramref name="source"/> as an async stream that yields its items in order.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">The synchronous sequence to enumerate.</param>
    /// <returns>An async sequence producing the same items as <paramref name="source"/>.</returns>
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<T> source)
    {
        foreach (var item in source)
        {
            yield return item;
        }

        // Already-completed await: gives the async iterator an await expression
        // (otherwise the compiler warns that it runs synchronously) without adding a real delay.
        await Task.CompletedTask;
    }
}