Refactor code structure for improved readability and maintainability
This commit is contained in:
@@ -0,0 +1,153 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text.Json;
|
||||
using StellaOps.TaskRunner.Client.Models;
|
||||
|
||||
namespace StellaOps.TaskRunner.Client.Streaming;
|
||||
|
||||
/// <summary>
|
||||
/// Helper for reading NDJSON streaming logs.
|
||||
/// </summary>
|
||||
public static class StreamingLogReader
|
||||
{
|
||||
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
|
||||
|
||||
/// <summary>
|
||||
/// Reads log entries from an NDJSON stream.
|
||||
/// </summary>
|
||||
/// <param name="stream">The input stream containing NDJSON log entries.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>Async enumerable of log entries.</returns>
|
||||
public static async IAsyncEnumerable<RunLogEntry> ReadAsync(
|
||||
Stream stream,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(stream);
|
||||
|
||||
using var reader = new StreamReader(stream);
|
||||
|
||||
string? line;
|
||||
while ((line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false)) is not null)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
if (string.IsNullOrWhiteSpace(line))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
RunLogEntry? entry;
|
||||
try
|
||||
{
|
||||
entry = JsonSerializer.Deserialize<RunLogEntry>(line, JsonOptions);
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry is not null)
|
||||
{
|
||||
yield return entry;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Collects all log entries from a stream into a list.
|
||||
/// </summary>
|
||||
/// <param name="stream">The input stream containing NDJSON log entries.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>List of all log entries.</returns>
|
||||
public static async Task<IReadOnlyList<RunLogEntry>> CollectAsync(
|
||||
Stream stream,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var entries = new List<RunLogEntry>();
|
||||
|
||||
await foreach (var entry in ReadAsync(stream, cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
entries.Add(entry);
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Filters log entries by level.
|
||||
/// </summary>
|
||||
/// <param name="entries">Source log entries.</param>
|
||||
/// <param name="levels">Log levels to include (e.g., "error", "warning").</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>Filtered log entries.</returns>
|
||||
public static async IAsyncEnumerable<RunLogEntry> FilterByLevelAsync(
|
||||
IAsyncEnumerable<RunLogEntry> entries,
|
||||
IReadOnlySet<string> levels,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entries);
|
||||
ArgumentNullException.ThrowIfNull(levels);
|
||||
|
||||
await foreach (var entry in entries.WithCancellation(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
if (levels.Contains(entry.Level, StringComparer.OrdinalIgnoreCase))
|
||||
{
|
||||
yield return entry;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Filters log entries by step ID.
|
||||
/// </summary>
|
||||
/// <param name="entries">Source log entries.</param>
|
||||
/// <param name="stepId">Step ID to filter by.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>Filtered log entries.</returns>
|
||||
public static async IAsyncEnumerable<RunLogEntry> FilterByStepAsync(
|
||||
IAsyncEnumerable<RunLogEntry> entries,
|
||||
string stepId,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entries);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(stepId);
|
||||
|
||||
await foreach (var entry in entries.WithCancellation(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
if (string.Equals(entry.StepId, stepId, StringComparison.Ordinal))
|
||||
{
|
||||
yield return entry;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Groups log entries by step ID.
|
||||
/// </summary>
|
||||
/// <param name="entries">Source log entries.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>Dictionary of step ID to log entries.</returns>
|
||||
public static async Task<IReadOnlyDictionary<string, IReadOnlyList<RunLogEntry>>> GroupByStepAsync(
|
||||
IAsyncEnumerable<RunLogEntry> entries,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(entries);
|
||||
|
||||
var groups = new Dictionary<string, List<RunLogEntry>>(StringComparer.Ordinal);
|
||||
|
||||
await foreach (var entry in entries.WithCancellation(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
var key = entry.StepId ?? "(global)";
|
||||
if (!groups.TryGetValue(key, out var list))
|
||||
{
|
||||
list = [];
|
||||
groups[key] = list;
|
||||
}
|
||||
list.Add(entry);
|
||||
}
|
||||
|
||||
return groups.ToDictionary(
|
||||
kvp => kvp.Key,
|
||||
kvp => (IReadOnlyList<RunLogEntry>)kvp.Value,
|
||||
StringComparer.Ordinal);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user