up
Some checks failed
LNM Migration CI / build-runner (push) Has been cancelled
Ledger OpenAPI CI / deprecation-check (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Airgap Sealed CI Smoke / sealed-smoke (push) Has been cancelled
Ledger Packs CI / build-pack (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Ledger OpenAPI CI / validate-oas (push) Has been cancelled
Ledger OpenAPI CI / check-wellknown (push) Has been cancelled
Ledger Packs CI / verify-pack (push) Has been cancelled
LNM Migration CI / validate-metrics (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Some checks failed
LNM Migration CI / build-runner (push) Has been cancelled
Ledger OpenAPI CI / deprecation-check (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Airgap Sealed CI Smoke / sealed-smoke (push) Has been cancelled
Ledger Packs CI / build-pack (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Ledger OpenAPI CI / validate-oas (push) Has been cancelled
Ledger OpenAPI CI / check-wellknown (push) Has been cancelled
Ledger Packs CI / verify-pack (push) Has been cancelled
LNM Migration CI / validate-metrics (push) Has been cancelled
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
This commit is contained in:
@@ -0,0 +1,442 @@
|
||||
using System.Buffers;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Encodings.Web;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.ExportCenter.Core.Planner;
|
||||
|
||||
namespace StellaOps.ExportCenter.Core.Adapters;
|
||||
|
||||
/// <summary>
|
||||
/// Combined Runtime adapter (runtime:combined) - exports scanner.entrytrace and zastava.runtime
|
||||
/// into a single NDJSON stream for offline kit attestation.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Output format: combined.runtime.ndjson with records:
|
||||
/// - combined.header: Metadata header with export info
|
||||
/// - entrytrace.*: Scanner entry trace records (from scanner.entrytrace.ndjson)
|
||||
/// - runtime.event: Zastava runtime events (from zastava.runtime.ndjson)
|
||||
/// - combined.footer: Summary with counts and hashes
|
||||
///
|
||||
/// Records are deterministically ordered for reproducible output.
|
||||
/// </remarks>
|
||||
public sealed class CombinedRuntimeAdapter : IExportAdapter
|
||||
{
|
||||
public const string Id = "runtime:combined";
|
||||
|
||||
private static readonly JsonWriterOptions WriterOptions = new()
|
||||
{
|
||||
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
|
||||
Indented = false,
|
||||
SkipValidation = false
|
||||
};
|
||||
|
||||
private readonly ILogger<CombinedRuntimeAdapter> _logger;
|
||||
private readonly ExportCompressor _compressor;
|
||||
|
||||
public string AdapterId => Id;
|
||||
public string DisplayName => "Combined Runtime Stream";
|
||||
public IReadOnlyList<ExportFormat> SupportedFormats { get; } = [ExportFormat.Ndjson];
|
||||
public bool SupportsStreaming => true;
|
||||
|
||||
public CombinedRuntimeAdapter(ILogger<CombinedRuntimeAdapter> logger)
|
||||
{
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_compressor = new ExportCompressor();
|
||||
}
|
||||
|
||||
public async Task<ExportAdapterResult> ProcessAsync(
|
||||
ExportAdapterContext context,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
var itemResults = new List<AdapterItemResult>();
|
||||
|
||||
try
|
||||
{
|
||||
Directory.CreateDirectory(context.Config.OutputDirectory);
|
||||
|
||||
var result = await ProcessCombinedNdjsonAsync(context, cancellationToken);
|
||||
|
||||
stopwatch.Stop();
|
||||
|
||||
if (!result.Success)
|
||||
{
|
||||
return ExportAdapterResult.Failed(result.ErrorMessage ?? "Combined export failed");
|
||||
}
|
||||
|
||||
var counts = new ExportManifestCounts
|
||||
{
|
||||
TotalItems = result.EntryTraceCount + result.RuntimeEventCount,
|
||||
ProcessedItems = result.EntryTraceCount + result.RuntimeEventCount,
|
||||
SuccessfulItems = result.EntryTraceCount + result.RuntimeEventCount,
|
||||
FailedItems = 0,
|
||||
SkippedItems = 0,
|
||||
ArtifactCount = 1,
|
||||
TotalSizeBytes = result.Artifact!.SizeBytes,
|
||||
CompressedSizeBytes = result.Artifact.IsCompressed ? result.Artifact.SizeBytes : null,
|
||||
ByKind = new Dictionary<string, int>
|
||||
{
|
||||
["entrytrace"] = result.EntryTraceCount,
|
||||
["runtime_event"] = result.RuntimeEventCount
|
||||
},
|
||||
ByStatus = new Dictionary<string, int>
|
||||
{
|
||||
["success"] = result.EntryTraceCount + result.RuntimeEventCount
|
||||
}
|
||||
};
|
||||
|
||||
_logger.LogInformation(
|
||||
"Combined runtime export completed: {EntryTraceCount} entrytrace + {RuntimeEventCount} runtime events = {TotalBytes} bytes in {ElapsedMs}ms",
|
||||
result.EntryTraceCount, result.RuntimeEventCount, result.Artifact.SizeBytes, stopwatch.ElapsedMilliseconds);
|
||||
|
||||
return new ExportAdapterResult
|
||||
{
|
||||
Success = true,
|
||||
ItemResults = result.ItemResults,
|
||||
Artifacts = [result.Artifact],
|
||||
ManifestCounts = counts,
|
||||
ProcessingTime = stopwatch.Elapsed,
|
||||
CompletedAt = context.TimeProvider.GetUtcNow()
|
||||
};
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return ExportAdapterResult.Failed("Export cancelled");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Combined runtime export failed");
|
||||
return ExportAdapterResult.Failed($"Export failed: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<AdapterItemResult> ProcessStreamAsync(
|
||||
ExportAdapterContext context,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
Directory.CreateDirectory(context.Config.OutputDirectory);
|
||||
|
||||
foreach (var item in context.Items)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var content = await context.DataFetcher.FetchAsync(item, cancellationToken);
|
||||
|
||||
yield return new AdapterItemResult
|
||||
{
|
||||
ItemId = item.ItemId,
|
||||
Success = content.Success,
|
||||
ContentHash = content.OriginalHash,
|
||||
ProcessedAt = context.TimeProvider.GetUtcNow()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<string>> ValidateConfigAsync(
|
||||
ExportAdapterConfig config,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var errors = new List<string>();
|
||||
|
||||
if (string.IsNullOrWhiteSpace(config.OutputDirectory))
|
||||
{
|
||||
errors.Add("Output directory is required");
|
||||
}
|
||||
|
||||
if (config.FormatOptions.Format != ExportFormat.Ndjson)
|
||||
{
|
||||
errors.Add("Combined runtime adapter only supports NDJSON format");
|
||||
}
|
||||
|
||||
return Task.FromResult<IReadOnlyList<string>>(errors);
|
||||
}
|
||||
|
||||
private async Task<CombinedExportResult> ProcessCombinedNdjsonAsync(
|
||||
ExportAdapterContext context,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var lines = new List<string>();
|
||||
var itemResults = new List<AdapterItemResult>();
|
||||
var now = context.TimeProvider.GetUtcNow();
|
||||
|
||||
// Categorize items
|
||||
var entryTraceItems = context.Items
|
||||
.Where(i => i.Kind.StartsWith("entrytrace", StringComparison.OrdinalIgnoreCase))
|
||||
.OrderBy(i => i.Name)
|
||||
.ThenBy(i => i.ItemId)
|
||||
.ToList();
|
||||
|
||||
var runtimeItems = context.Items
|
||||
.Where(i => i.Kind.StartsWith("runtime", StringComparison.OrdinalIgnoreCase) ||
|
||||
i.Kind.Equals("zastava_event", StringComparison.OrdinalIgnoreCase))
|
||||
.OrderBy(i => i.Name)
|
||||
.ThenBy(i => i.ItemId)
|
||||
.ToList();
|
||||
|
||||
// Write header
|
||||
lines.Add(BuildHeaderLine(context, entryTraceItems.Count, runtimeItems.Count, now));
|
||||
|
||||
// Process entry trace items
|
||||
var entryTraceRecordCount = 0;
|
||||
foreach (var item in entryTraceItems)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var content = await context.DataFetcher.FetchAsync(item, cancellationToken);
|
||||
if (!content.Success)
|
||||
{
|
||||
itemResults.Add(AdapterItemResult.Failed(item.ItemId, content.ErrorMessage ?? "Failed to fetch"));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Entry trace items may be NDJSON themselves, pass through each line
|
||||
var entryLines = ParseNdjsonLines(content.JsonContent);
|
||||
foreach (var line in entryLines)
|
||||
{
|
||||
lines.Add(line);
|
||||
entryTraceRecordCount++;
|
||||
}
|
||||
|
||||
itemResults.Add(new AdapterItemResult
|
||||
{
|
||||
ItemId = item.ItemId,
|
||||
Success = true,
|
||||
ContentHash = content.OriginalHash,
|
||||
ProcessedAt = now
|
||||
});
|
||||
}
|
||||
|
||||
// Process runtime event items
|
||||
var runtimeEventCount = 0;
|
||||
foreach (var item in runtimeItems)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var content = await context.DataFetcher.FetchAsync(item, cancellationToken);
|
||||
if (!content.Success)
|
||||
{
|
||||
itemResults.Add(AdapterItemResult.Failed(item.ItemId, content.ErrorMessage ?? "Failed to fetch"));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Runtime items may be NDJSON or single JSON
|
||||
var eventLines = ParseNdjsonLines(content.JsonContent);
|
||||
foreach (var line in eventLines)
|
||||
{
|
||||
// Wrap runtime events with type marker if not already present
|
||||
var wrappedLine = EnsureRuntimeEventType(line);
|
||||
lines.Add(wrappedLine);
|
||||
runtimeEventCount++;
|
||||
}
|
||||
|
||||
itemResults.Add(new AdapterItemResult
|
||||
{
|
||||
ItemId = item.ItemId,
|
||||
Success = true,
|
||||
ContentHash = content.OriginalHash,
|
||||
ProcessedAt = now
|
||||
});
|
||||
}
|
||||
|
||||
// Write footer
|
||||
lines.Add(BuildFooterLine(entryTraceRecordCount, runtimeEventCount, now));
|
||||
|
||||
if (lines.Count <= 2) // Only header and footer
|
||||
{
|
||||
return CombinedExportResult.Failed("No items to export");
|
||||
}
|
||||
|
||||
// Write combined NDJSON
|
||||
var ndjsonContent = string.Join("\n", lines) + "\n";
|
||||
var outputBytes = Encoding.UTF8.GetBytes(ndjsonContent);
|
||||
var originalSize = outputBytes.Length;
|
||||
var compression = context.Config.FormatOptions.Compression;
|
||||
|
||||
if (compression != CompressionFormat.None)
|
||||
{
|
||||
var compressed = _compressor.CompressBytes(outputBytes, compression);
|
||||
if (!compressed.Success)
|
||||
{
|
||||
return CombinedExportResult.Failed(compressed.ErrorMessage ?? "Compression failed");
|
||||
}
|
||||
outputBytes = compressed.CompressedData!;
|
||||
}
|
||||
|
||||
var fileName = $"combined.runtime.ndjson{ExportCompressor.GetFileExtension(compression)}";
|
||||
var outputPath = Path.Combine(context.Config.OutputDirectory, fileName);
|
||||
|
||||
await File.WriteAllBytesAsync(outputPath, outputBytes, cancellationToken);
|
||||
|
||||
var hash = ComputeSha256(outputBytes);
|
||||
if (context.Config.IncludeChecksums)
|
||||
{
|
||||
var checksumPath = outputPath + ".sha256";
|
||||
await File.WriteAllTextAsync(checksumPath, $"{hash} {fileName}\n", cancellationToken);
|
||||
}
|
||||
|
||||
return new CombinedExportResult
|
||||
{
|
||||
Success = true,
|
||||
ItemResults = itemResults,
|
||||
EntryTraceCount = entryTraceRecordCount,
|
||||
RuntimeEventCount = runtimeEventCount,
|
||||
Artifact = new ExportOutputArtifact
|
||||
{
|
||||
Path = outputPath,
|
||||
SizeBytes = outputBytes.Length,
|
||||
Sha256 = hash,
|
||||
ContentType = "application/x-ndjson",
|
||||
ItemCount = lines.Count,
|
||||
IsCompressed = compression != CompressionFormat.None,
|
||||
Compression = compression,
|
||||
OriginalSizeBytes = originalSize
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static string BuildHeaderLine(
|
||||
ExportAdapterContext context,
|
||||
int entryTraceItemCount,
|
||||
int runtimeItemCount,
|
||||
DateTimeOffset timestamp)
|
||||
{
|
||||
var buffer = new ArrayBufferWriter<byte>(256);
|
||||
using (var writer = new Utf8JsonWriter(buffer, WriterOptions))
|
||||
{
|
||||
writer.WriteStartObject();
|
||||
writer.WriteString("type", "combined.header");
|
||||
writer.WriteString("version", "1.0.0");
|
||||
writer.WriteString("schema", "stellaops.combined.runtime@v1");
|
||||
writer.WriteString("generated_at", timestamp.UtcDateTime.ToString("O"));
|
||||
writer.WriteString("tenant_id", context.TenantId.ToString("D"));
|
||||
if (!string.IsNullOrEmpty(context.CorrelationId))
|
||||
{
|
||||
writer.WriteString("correlation_id", context.CorrelationId);
|
||||
}
|
||||
writer.WritePropertyName("source_counts");
|
||||
writer.WriteStartObject();
|
||||
writer.WriteNumber("entrytrace_items", entryTraceItemCount);
|
||||
writer.WriteNumber("runtime_items", runtimeItemCount);
|
||||
writer.WriteEndObject();
|
||||
writer.WriteEndObject();
|
||||
writer.Flush();
|
||||
}
|
||||
|
||||
return Encoding.UTF8.GetString(buffer.WrittenSpan);
|
||||
}
|
||||
|
||||
private static string BuildFooterLine(int entryTraceCount, int runtimeEventCount, DateTimeOffset timestamp)
|
||||
{
|
||||
var buffer = new ArrayBufferWriter<byte>(256);
|
||||
using (var writer = new Utf8JsonWriter(buffer, WriterOptions))
|
||||
{
|
||||
writer.WriteStartObject();
|
||||
writer.WriteString("type", "combined.footer");
|
||||
writer.WritePropertyName("record_counts");
|
||||
writer.WriteStartObject();
|
||||
writer.WriteNumber("entrytrace_records", entryTraceCount);
|
||||
writer.WriteNumber("runtime_events", runtimeEventCount);
|
||||
writer.WriteNumber("total", entryTraceCount + runtimeEventCount);
|
||||
writer.WriteEndObject();
|
||||
writer.WriteString("completed_at", timestamp.UtcDateTime.ToString("O"));
|
||||
writer.WriteEndObject();
|
||||
writer.Flush();
|
||||
}
|
||||
|
||||
return Encoding.UTF8.GetString(buffer.WrittenSpan);
|
||||
}
|
||||
|
||||
private static IReadOnlyList<string> ParseNdjsonLines(string? content)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(content))
|
||||
{
|
||||
return [];
|
||||
}
|
||||
|
||||
var lines = new List<string>();
|
||||
using var reader = new StringReader(content);
|
||||
|
||||
string? line;
|
||||
while ((line = reader.ReadLine()) is not null)
|
||||
{
|
||||
var trimmed = line.Trim();
|
||||
if (!string.IsNullOrEmpty(trimmed))
|
||||
{
|
||||
lines.Add(trimmed);
|
||||
}
|
||||
}
|
||||
|
||||
return lines;
|
||||
}
|
||||
|
||||
private static string EnsureRuntimeEventType(string jsonLine)
|
||||
{
|
||||
// If the line already has a "type" field starting with "runtime." or "entrytrace.", pass through
|
||||
if (jsonLine.Contains("\"type\":\"runtime.") ||
|
||||
jsonLine.Contains("\"type\":\"entrytrace.") ||
|
||||
jsonLine.Contains("\"type\": \"runtime.") ||
|
||||
jsonLine.Contains("\"type\": \"entrytrace."))
|
||||
{
|
||||
return jsonLine;
|
||||
}
|
||||
|
||||
// Wrap as runtime.event if no type present
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(jsonLine);
|
||||
var root = doc.RootElement;
|
||||
|
||||
if (root.TryGetProperty("type", out var typeElement))
|
||||
{
|
||||
// Has type but not runtime/entrytrace prefix, pass through
|
||||
return jsonLine;
|
||||
}
|
||||
|
||||
// Add type field for runtime events
|
||||
var buffer = new ArrayBufferWriter<byte>(jsonLine.Length + 32);
|
||||
using (var writer = new Utf8JsonWriter(buffer, WriterOptions))
|
||||
{
|
||||
writer.WriteStartObject();
|
||||
writer.WriteString("type", "runtime.event");
|
||||
|
||||
foreach (var property in root.EnumerateObject())
|
||||
{
|
||||
property.WriteTo(writer);
|
||||
}
|
||||
|
||||
writer.WriteEndObject();
|
||||
writer.Flush();
|
||||
}
|
||||
|
||||
return Encoding.UTF8.GetString(buffer.WrittenSpan);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// If parsing fails, return original
|
||||
return jsonLine;
|
||||
}
|
||||
}
|
||||
|
||||
private static string ComputeSha256(byte[] data)
|
||||
{
|
||||
var hashBytes = SHA256.HashData(data);
|
||||
return Convert.ToHexString(hashBytes).ToLowerInvariant();
|
||||
}
|
||||
|
||||
private sealed record CombinedExportResult
|
||||
{
|
||||
public required bool Success { get; init; }
|
||||
public IReadOnlyList<AdapterItemResult> ItemResults { get; init; } = [];
|
||||
public int EntryTraceCount { get; init; }
|
||||
public int RuntimeEventCount { get; init; }
|
||||
public ExportOutputArtifact? Artifact { get; init; }
|
||||
public string? ErrorMessage { get; init; }
|
||||
|
||||
public static CombinedExportResult Failed(string errorMessage)
|
||||
=> new() { Success = false, ErrorMessage = errorMessage };
|
||||
}
|
||||
}
|
||||
@@ -94,6 +94,7 @@ public static class ExportAdapterServiceExtensions
|
||||
// Register individual adapters
|
||||
services.AddSingleton<IExportAdapter, JsonRawAdapter>();
|
||||
services.AddSingleton<IExportAdapter, JsonPolicyAdapter>();
|
||||
services.AddSingleton<IExportAdapter, CombinedRuntimeAdapter>();
|
||||
services.AddSingleton<IExportAdapter>(sp =>
|
||||
new MirrorAdapter(
|
||||
sp.GetRequiredService<ILogger<MirrorAdapter>>(),
|
||||
|
||||
Reference in New Issue
Block a user