// ----------------------------------------------------------------------------- // CommandHandlers.Feeds.cs // Sprint: SPRINT_20251226_007_BE_determinism_gaps // Task: DET-GAP-04 // Description: Command handlers for feed snapshot operations. // ----------------------------------------------------------------------------- using System.Text.Json; using Microsoft.Extensions.DependencyInjection; using Spectre.Console; namespace StellaOps.Cli.Commands; internal static partial class CommandHandlers { private static readonly JsonSerializerOptions FeedsJsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase, WriteIndented = true }; internal static async Task HandleFeedsSnapshotCreateAsync( IServiceProvider services, string? label, string[]? sources, bool json, bool verbose, CancellationToken cancellationToken) { if (verbose) { AnsiConsole.MarkupLine("[blue]Creating feed snapshot...[/]"); if (!string.IsNullOrEmpty(label)) AnsiConsole.MarkupLine($" Label: [bold]{Markup.Escape(label)}[/]"); if (sources?.Length > 0) AnsiConsole.MarkupLine($" Sources: [bold]{string.Join(", ", sources)}[/]"); } try { var httpClientFactory = services.GetService(); if (httpClientFactory == null) { AnsiConsole.MarkupLine("[red]Error: HTTP client factory not available.[/]"); return 1; } var client = httpClientFactory.CreateClient("Concelier"); var request = new { label, sources }; var content = new StringContent( JsonSerializer.Serialize(request, FeedsJsonOptions), System.Text.Encoding.UTF8, "application/json"); using var response = await client.PostAsync("/api/v1/feeds/snapshot", content, cancellationToken); if (!response.IsSuccessStatusCode) { var error = await response.Content.ReadAsStringAsync(cancellationToken); AnsiConsole.MarkupLine($"[red]Error: {response.StatusCode}[/]"); if (verbose) { AnsiConsole.MarkupLine($"[grey]{Markup.Escape(error)}[/]"); } return 1; } var responseText = await response.Content.ReadAsStringAsync(cancellationToken); if (json) { AnsiConsole.WriteLine(responseText); } else { var result = JsonSerializer.Deserialize(responseText, FeedsJsonOptions); if (result != null) { AnsiConsole.MarkupLine("[green]✓[/] Snapshot created successfully"); AnsiConsole.MarkupLine($" Snapshot ID: [bold]{result.SnapshotId}[/]"); AnsiConsole.MarkupLine($" Digest: [cyan]{result.CompositeDigest}[/]"); AnsiConsole.MarkupLine($" Created: {result.CreatedAt:u}"); AnsiConsole.MarkupLine($" Sources: {result.Sources?.Length ?? 0}"); if (result.Sources?.Length > 0) { var table = new Table() .AddColumn("Source") .AddColumn("Digest") .AddColumn("Items"); foreach (var source in result.Sources) { table.AddRow( source.SourceId ?? "-", source.Digest?.Substring(0, Math.Min(16, source.Digest.Length)) + "..." ?? "-", source.ItemCount.ToString()); } AnsiConsole.Write(table); } } } return 0; } catch (Exception ex) { AnsiConsole.MarkupLine($"[red]Error: {Markup.Escape(ex.Message)}[/]"); if (verbose) { AnsiConsole.WriteException(ex); } return 1; } } internal static async Task HandleFeedsSnapshotListAsync( IServiceProvider services, int limit, bool json, bool verbose, CancellationToken cancellationToken) { if (verbose) { AnsiConsole.MarkupLine("[blue]Listing feed snapshots...[/]"); AnsiConsole.MarkupLine($" Limit: {limit}"); } try { var httpClientFactory = services.GetService(); if (httpClientFactory == null) { AnsiConsole.MarkupLine("[red]Error: HTTP client factory not available.[/]"); return 1; } var client = httpClientFactory.CreateClient("Concelier"); using var response = await client.GetAsync($"/api/v1/feeds/snapshot?limit={limit}", cancellationToken); if (!response.IsSuccessStatusCode) { var error = await response.Content.ReadAsStringAsync(cancellationToken); AnsiConsole.MarkupLine($"[red]Error: {response.StatusCode}[/]"); if (verbose) { AnsiConsole.MarkupLine($"[grey]{Markup.Escape(error)}[/]"); } return 1; } var responseText = await response.Content.ReadAsStringAsync(cancellationToken); if (json) { AnsiConsole.WriteLine(responseText); } else { var result = JsonSerializer.Deserialize(responseText, FeedsJsonOptions); if (result?.Snapshots != null) { var table = new Table() .Title("Feed Snapshots") .AddColumn("ID") .AddColumn("Digest") .AddColumn("Label") .AddColumn("Created") .AddColumn("Sources") .AddColumn("Items"); foreach (var snapshot in result.Snapshots) { table.AddRow( snapshot.SnapshotId ?? "-", snapshot.CompositeDigest?.Substring(0, Math.Min(16, snapshot.CompositeDigest.Length)) + "..." ?? "-", snapshot.Label ?? "-", snapshot.CreatedAt.ToString("u"), snapshot.SourceCount.ToString(), snapshot.TotalItemCount.ToString()); } AnsiConsole.Write(table); AnsiConsole.MarkupLine($"[grey]Total: {result.Snapshots.Length} snapshots[/]"); } } return 0; } catch (Exception ex) { AnsiConsole.MarkupLine($"[red]Error: {Markup.Escape(ex.Message)}[/]"); if (verbose) { AnsiConsole.WriteException(ex); } return 1; } } internal static async Task HandleFeedsSnapshotExportAsync( IServiceProvider services, string snapshotId, string output, string? compression, bool json, bool verbose, CancellationToken cancellationToken) { if (verbose) { AnsiConsole.MarkupLine("[blue]Exporting feed snapshot...[/]"); AnsiConsole.MarkupLine($" Snapshot: [bold]{Markup.Escape(snapshotId)}[/]"); AnsiConsole.MarkupLine($" Output: [bold]{Markup.Escape(output)}[/]"); AnsiConsole.MarkupLine($" Compression: {compression ?? "zstd"}"); } try { var httpClientFactory = services.GetService(); if (httpClientFactory == null) { AnsiConsole.MarkupLine("[red]Error: HTTP client factory not available.[/]"); return 1; } var client = httpClientFactory.CreateClient("Concelier"); var format = compression ?? "zstd"; var url = $"/api/v1/feeds/snapshot/{Uri.EscapeDataString(snapshotId)}/export?format={format}"; await AnsiConsole.Progress() .StartAsync(async ctx => { var task = ctx.AddTask("[green]Downloading snapshot bundle[/]"); using var response = await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken); if (!response.IsSuccessStatusCode) { var error = await response.Content.ReadAsStringAsync(cancellationToken); throw new CommandLineException($"Export failed: {response.StatusCode} - {error}"); } var totalBytes = response.Content.Headers.ContentLength ?? 0; task.MaxValue = totalBytes > 0 ? totalBytes : 100; await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken); await using var fileStream = File.Create(output); var buffer = new byte[81920]; long totalRead = 0; int bytesRead; while ((bytesRead = await stream.ReadAsync(buffer, cancellationToken)) > 0) { await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken); totalRead += bytesRead; task.Value = totalBytes > 0 ? totalRead : Math.Min(totalRead, 100); } task.Value = task.MaxValue; }); var fileInfo = new FileInfo(output); if (json) { var metadata = new { snapshotId, outputPath = output, sizeBytes = fileInfo.Length, compression = compression ?? "zstd" }; AnsiConsole.WriteLine(JsonSerializer.Serialize(metadata, FeedsJsonOptions)); } else { AnsiConsole.MarkupLine("[green]✓[/] Snapshot exported successfully"); AnsiConsole.MarkupLine($" Output: [bold]{output}[/]"); AnsiConsole.MarkupLine($" Size: {FormatBytes(fileInfo.Length)}"); } return 0; } catch (CommandLineException ex) { AnsiConsole.MarkupLine($"[red]Error: {Markup.Escape(ex.Message)}[/]"); return 1; } catch (Exception ex) { AnsiConsole.MarkupLine($"[red]Error: {Markup.Escape(ex.Message)}[/]"); if (verbose) { AnsiConsole.WriteException(ex); } return 1; } } internal static async Task HandleFeedsSnapshotImportAsync( IServiceProvider services, string input, bool validate, bool json, bool verbose, CancellationToken cancellationToken) { if (verbose) { AnsiConsole.MarkupLine("[blue]Importing feed snapshot...[/]"); AnsiConsole.MarkupLine($" Input: [bold]{Markup.Escape(input)}[/]"); AnsiConsole.MarkupLine($" Validate: {validate}"); } if (!File.Exists(input)) { AnsiConsole.MarkupLine($"[red]Error: File not found: {Markup.Escape(input)}[/]"); return 1; } try { var httpClientFactory = services.GetService(); if (httpClientFactory == null) { AnsiConsole.MarkupLine("[red]Error: HTTP client factory not available.[/]"); return 1; } var client = httpClientFactory.CreateClient("Concelier"); await using var fileStream = File.OpenRead(input); var content = new StreamContent(fileStream); content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/octet-stream"); var form = new MultipartFormDataContent { { content, "file", Path.GetFileName(input) } }; var url = $"/api/v1/feeds/snapshot/import?validate={validate.ToString().ToLowerInvariant()}"; using var response = await client.PostAsync(url, form, cancellationToken); if (!response.IsSuccessStatusCode) { var error = await response.Content.ReadAsStringAsync(cancellationToken); AnsiConsole.MarkupLine($"[red]Error: {response.StatusCode}[/]"); if (verbose) { AnsiConsole.MarkupLine($"[grey]{Markup.Escape(error)}[/]"); } return 1; } var responseText = await response.Content.ReadAsStringAsync(cancellationToken); if (json) { AnsiConsole.WriteLine(responseText); } else { var result = JsonSerializer.Deserialize(responseText, FeedsJsonOptions); if (result != null) { AnsiConsole.MarkupLine("[green]✓[/] Snapshot imported successfully"); AnsiConsole.MarkupLine($" Snapshot ID: [bold]{result.SnapshotId}[/]"); AnsiConsole.MarkupLine($" Digest: [cyan]{result.CompositeDigest}[/]"); AnsiConsole.MarkupLine($" Sources: {result.SourceCount}"); } } return 0; } catch (Exception ex) { AnsiConsole.MarkupLine($"[red]Error: {Markup.Escape(ex.Message)}[/]"); if (verbose) { AnsiConsole.WriteException(ex); } return 1; } } internal static async Task HandleFeedsSnapshotValidateAsync( IServiceProvider services, string snapshotId, bool json, bool verbose, CancellationToken cancellationToken) { if (verbose) { AnsiConsole.MarkupLine("[blue]Validating feed snapshot...[/]"); AnsiConsole.MarkupLine($" Snapshot: [bold]{Markup.Escape(snapshotId)}[/]"); } try { var httpClientFactory = services.GetService(); if (httpClientFactory == null) { AnsiConsole.MarkupLine("[red]Error: HTTP client factory not available.[/]"); return 1; } var client = httpClientFactory.CreateClient("Concelier"); var url = $"/api/v1/feeds/snapshot/{Uri.EscapeDataString(snapshotId)}/validate"; using var response = await client.GetAsync(url, cancellationToken); if (!response.IsSuccessStatusCode) { var error = await response.Content.ReadAsStringAsync(cancellationToken); AnsiConsole.MarkupLine($"[red]Error: {response.StatusCode}[/]"); if (verbose) { AnsiConsole.MarkupLine($"[grey]{Markup.Escape(error)}[/]"); } return 1; } var responseText = await response.Content.ReadAsStringAsync(cancellationToken); if (json) { AnsiConsole.WriteLine(responseText); } else { var result = JsonSerializer.Deserialize(responseText, FeedsJsonOptions); if (result != null) { if (result.IsValid) { AnsiConsole.MarkupLine("[green]✓[/] Snapshot is valid and can be replayed"); AnsiConsole.MarkupLine($" Snapshot Digest: [cyan]{result.SnapshotDigest}[/]"); AnsiConsole.MarkupLine($" Current Digest: [cyan]{result.CurrentDigest}[/]"); } else { AnsiConsole.MarkupLine("[red]✗[/] Snapshot has drifted from current state"); AnsiConsole.MarkupLine($" Snapshot Digest: [cyan]{result.SnapshotDigest}[/]"); AnsiConsole.MarkupLine($" Current Digest: [yellow]{result.CurrentDigest}[/]"); if (result.DriftedSources?.Length > 0) { AnsiConsole.MarkupLine("\n[yellow]Drifted Sources:[/]"); var table = new Table() .AddColumn("Source") .AddColumn("Snapshot Digest") .AddColumn("Current Digest") .AddColumn("+Added") .AddColumn("-Removed") .AddColumn("~Modified"); foreach (var drift in result.DriftedSources) { table.AddRow( drift.SourceId ?? "-", drift.SnapshotDigest?.Substring(0, 12) + "..." ?? "-", drift.CurrentDigest?.Substring(0, 12) + "..." ?? "-", $"[green]+{drift.AddedItems}[/]", $"[red]-{drift.RemovedItems}[/]", $"[yellow]~{drift.ModifiedItems}[/]"); } AnsiConsole.Write(table); } } } } return 0; } catch (Exception ex) { AnsiConsole.MarkupLine($"[red]Error: {Markup.Escape(ex.Message)}[/]"); if (verbose) { AnsiConsole.WriteException(ex); } return 1; } } // DTO types for JSON deserialization private sealed record CreateSnapshotResponse( string SnapshotId, string CompositeDigest, DateTimeOffset CreatedAt, SourceSnapshotSummary[]? Sources); private sealed record SourceSnapshotSummary( string SourceId, string Digest, int ItemCount); private sealed record ListSnapshotsResponse( SnapshotListItem[] Snapshots); private sealed record SnapshotListItem( string SnapshotId, string CompositeDigest, string? Label, DateTimeOffset CreatedAt, int SourceCount, int TotalItemCount); private sealed record ImportSnapshotResponse( string SnapshotId, string CompositeDigest, DateTimeOffset CreatedAt, int SourceCount); private sealed record ValidateSnapshotResponse( bool IsValid, string SnapshotDigest, string CurrentDigest, DriftedSourceInfo[]? DriftedSources); private sealed record DriftedSourceInfo( string SourceId, string SnapshotDigest, string CurrentDigest, int AddedItems, int RemovedItems, int ModifiedItems); }