Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
- Implemented comprehensive unit tests for RabbitMqTransportServer, covering constructor, disposal, connection management, event handlers, and exception handling. - Added configuration tests for RabbitMqTransportServer to validate SSL, durable queues, auto-recovery, and custom virtual host options. - Created unit tests for UdpFrameProtocol, including frame parsing and serialization, header size validation, and round-trip data preservation. - Developed tests for UdpTransportClient, focusing on connection handling, event subscriptions, and exception scenarios. - Established tests for UdpTransportServer, ensuring proper start/stop behavior, connection state management, and event handling. - Included tests for UdpTransportOptions to verify default values and modification capabilities. - Enhanced service registration tests for Udp transport services in the dependency injection container.
105 lines
3.9 KiB
C#
105 lines
3.9 KiB
C#
using System;
|
|
using System.IO;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using StellaOps.Cryptography;
|
|
using StellaOps.Replay.Core;
|
|
using StellaOps.Scanner.Storage;
|
|
using StellaOps.Scanner.Storage.ObjectStore;
|
|
|
|
namespace StellaOps.Scanner.Worker.Processing.Replay;
|
|
|
|
/// <summary>
|
|
/// Fetches a sealed replay bundle from the configured object store, verifies its SHA-256 hash,
|
|
/// and returns a local file path for downstream analyzers.
|
|
/// </summary>
|
|
internal sealed class ReplayBundleFetcher
|
|
{
|
|
private readonly IArtifactObjectStore _objectStore;
|
|
private readonly ICryptoHash _cryptoHash;
|
|
private readonly ScannerStorageOptions _storageOptions;
|
|
private readonly ILogger<ReplayBundleFetcher> _logger;
|
|
|
|
public ReplayBundleFetcher(
|
|
IArtifactObjectStore objectStore,
|
|
ICryptoHash cryptoHash,
|
|
ScannerStorageOptions storageOptions,
|
|
ILogger<ReplayBundleFetcher> logger)
|
|
{
|
|
_objectStore = objectStore ?? throw new ArgumentNullException(nameof(objectStore));
|
|
_cryptoHash = cryptoHash ?? throw new ArgumentNullException(nameof(cryptoHash));
|
|
_storageOptions = storageOptions ?? throw new ArgumentNullException(nameof(storageOptions));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
public async Task<string?> FetchAsync(ReplaySealedBundleMetadata metadata, CancellationToken cancellationToken)
|
|
{
|
|
ArgumentNullException.ThrowIfNull(metadata);
|
|
|
|
if (string.IsNullOrWhiteSpace(metadata.BundleUri))
|
|
{
|
|
return null;
|
|
}
|
|
|
|
var (bucket, key) = ResolveDescriptor(metadata.BundleUri);
|
|
var descriptor = new ArtifactObjectDescriptor(bucket, key, Immutable: true);
|
|
|
|
await using var stream = await _objectStore.GetAsync(descriptor, cancellationToken).ConfigureAwait(false);
|
|
if (stream is null)
|
|
{
|
|
throw new InvalidOperationException($"Replay bundle not found: {metadata.BundleUri}");
|
|
}
|
|
|
|
var tempPath = Path.Combine(Path.GetTempPath(), "stellaops", "replay", metadata.ManifestHash + ".tar.zst");
|
|
Directory.CreateDirectory(Path.GetDirectoryName(tempPath)!);
|
|
|
|
await using (var file = File.Create(tempPath))
|
|
{
|
|
await stream.CopyToAsync(file, cancellationToken).ConfigureAwait(false);
|
|
}
|
|
|
|
// Verify hash
|
|
await using (var file = File.OpenRead(tempPath))
|
|
{
|
|
var actualHex = await DeterministicHash.Sha256HexAsync(_cryptoHash, file, cancellationToken).ConfigureAwait(false);
|
|
var expected = NormalizeHash(metadata.ManifestHash);
|
|
if (!string.Equals(actualHex, expected, StringComparison.OrdinalIgnoreCase))
|
|
{
|
|
File.Delete(tempPath);
|
|
throw new InvalidOperationException($"Replay bundle hash mismatch. Expected {expected} got {actualHex}");
|
|
}
|
|
}
|
|
|
|
_logger.LogInformation("Fetched sealed replay bundle {Uri} (hash {Hash}) to {Path}", metadata.BundleUri, metadata.ManifestHash, tempPath);
|
|
return tempPath;
|
|
}
|
|
|
|
private (string Bucket, string Key) ResolveDescriptor(string uri)
|
|
{
|
|
// Expect cas://bucket/key
|
|
if (!uri.StartsWith("cas://", StringComparison.OrdinalIgnoreCase))
|
|
{
|
|
// fallback to configured bucket + direct key
|
|
return (_storageOptions.ObjectStore.BucketName, uri.Trim('/'));
|
|
}
|
|
|
|
var trimmed = uri.Substring("cas://".Length);
|
|
var slash = trimmed.IndexOf('/') ;
|
|
if (slash < 0)
|
|
{
|
|
return (_storageOptions.ObjectStore.BucketName, trimmed);
|
|
}
|
|
|
|
var bucket = trimmed[..slash];
|
|
var key = trimmed[(slash + 1)..];
|
|
return (bucket, key);
|
|
}
|
|
|
|
private static string NormalizeHash(string hash)
|
|
{
|
|
var value = hash.Trim().ToLowerInvariant();
|
|
return value.StartsWith("sha256:", StringComparison.Ordinal) ? value[7..] : value;
|
|
}
|
|
}
|