up
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Determinism;
|
||||
|
||||
/// <summary>
|
||||
/// Deterministic metadata for a surface manifest: per-payload hashes and a Merkle-like root.
|
||||
/// </summary>
|
||||
public sealed record DeterminismEvidence(
|
||||
IReadOnlyDictionary<string, string> PayloadHashes,
|
||||
string MerkleRootSha256);
|
||||
@@ -0,0 +1,79 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Determinism;
|
||||
|
||||
/// <summary>
|
||||
/// Represents a determinism score report produced by the worker replay harness.
|
||||
/// This mirrors the determinism.json shape used in release bundles.
|
||||
/// </summary>
|
||||
public sealed record DeterminismReport(
|
||||
string Version,
|
||||
string Release,
|
||||
string Platform,
|
||||
string? PolicySha,
|
||||
string? FeedsSha,
|
||||
string? ScannerSha,
|
||||
double OverallScore,
|
||||
double ThresholdOverall,
|
||||
double ThresholdImage,
|
||||
IReadOnlyList<DeterminismImageReport> Images)
|
||||
{
|
||||
public static DeterminismReport FromHarness(Harness.DeterminismReport harnessReport,
|
||||
string release,
|
||||
string platform,
|
||||
string? policySha = null,
|
||||
string? feedsSha = null,
|
||||
string? scannerSha = null,
|
||||
string version = "1")
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(harnessReport);
|
||||
|
||||
return new DeterminismReport(
|
||||
Version: version,
|
||||
Release: release,
|
||||
Platform: platform,
|
||||
PolicySha: policySha,
|
||||
FeedsSha: feedsSha,
|
||||
ScannerSha: scannerSha,
|
||||
OverallScore: harnessReport.OverallScore,
|
||||
ThresholdOverall: harnessReport.OverallThreshold,
|
||||
ThresholdImage: harnessReport.ImageThreshold,
|
||||
Images: harnessReport.Images.Select(DeterminismImageReport.FromHarness).ToList());
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record DeterminismImageReport(
|
||||
string Image,
|
||||
int Runs,
|
||||
int Identical,
|
||||
double Score,
|
||||
IReadOnlyDictionary<string, string> ArtifactHashes,
|
||||
IReadOnlyList<DeterminismRunReport> RunsDetail)
|
||||
{
|
||||
public static DeterminismImageReport FromHarness(Harness.DeterminismImageReport report)
|
||||
{
|
||||
return new DeterminismImageReport(
|
||||
Image: report.ImageDigest,
|
||||
Runs: report.Runs,
|
||||
Identical: report.Identical,
|
||||
Score: report.Score,
|
||||
ArtifactHashes: report.BaselineHashes,
|
||||
RunsDetail: report.RunReports.Select(DeterminismRunReport.FromHarness).ToList());
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record DeterminismRunReport(
|
||||
int RunIndex,
|
||||
IReadOnlyDictionary<string, string> ArtifactHashes,
|
||||
IReadOnlyList<string> NonDeterministic)
|
||||
{
|
||||
public static DeterminismRunReport FromHarness(Harness.DeterminismRunReport report)
|
||||
{
|
||||
return new DeterminismRunReport(
|
||||
RunIndex: report.RunIndex,
|
||||
ArtifactHashes: report.ArtifactHashes,
|
||||
NonDeterministic: report.NonDeterministicArtifacts);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Processing.Replay;
|
||||
|
||||
public sealed record ReplayBundleContext(ReplaySealedBundleMetadata Metadata, string BundlePath)
|
||||
{
|
||||
public ReplayBundleContext : this(Metadata ?? throw new ArgumentNullException(nameof(Metadata)),
|
||||
string.IsNullOrWhiteSpace(BundlePath) ? throw new ArgumentException("BundlePath required", nameof(BundlePath)) : BundlePath)
|
||||
{
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Replay.Core;
|
||||
using StellaOps.Scanner.Storage;
|
||||
using StellaOps.Scanner.Storage.ObjectStore;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Processing.Replay;
|
||||
|
||||
/// <summary>
|
||||
/// Fetches a sealed replay bundle from the configured object store, verifies its SHA-256 hash,
|
||||
/// and returns a local file path for downstream analyzers.
|
||||
/// </summary>
|
||||
internal sealed class ReplayBundleFetcher
|
||||
{
|
||||
private readonly IArtifactObjectStore _objectStore;
|
||||
private readonly ScannerStorageOptions _storageOptions;
|
||||
private readonly ILogger<ReplayBundleFetcher> _logger;
|
||||
|
||||
public ReplayBundleFetcher(IArtifactObjectStore objectStore, ScannerStorageOptions storageOptions, ILogger<ReplayBundleFetcher> logger)
|
||||
{
|
||||
_objectStore = objectStore ?? throw new ArgumentNullException(nameof(objectStore));
|
||||
_storageOptions = storageOptions ?? throw new ArgumentNullException(nameof(storageOptions));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async Task<string?> FetchAsync(ReplaySealedBundleMetadata metadata, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(metadata);
|
||||
|
||||
if (string.IsNullOrWhiteSpace(metadata.BundleUri))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var (bucket, key) = ResolveDescriptor(metadata.BundleUri);
|
||||
var descriptor = new ArtifactObjectDescriptor(bucket, key, Immutable: true);
|
||||
|
||||
await using var stream = await _objectStore.GetAsync(descriptor, cancellationToken).ConfigureAwait(false);
|
||||
if (stream is null)
|
||||
{
|
||||
throw new InvalidOperationException($"Replay bundle not found: {metadata.BundleUri}");
|
||||
}
|
||||
|
||||
var tempPath = Path.Combine(Path.GetTempPath(), "stellaops", "replay", metadata.ManifestHash + ".tar.zst");
|
||||
Directory.CreateDirectory(Path.GetDirectoryName(tempPath)!);
|
||||
|
||||
await using (var file = File.Create(tempPath))
|
||||
{
|
||||
await stream.CopyToAsync(file, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
// Verify hash
|
||||
await using (var file = File.OpenRead(tempPath))
|
||||
{
|
||||
var actualHex = DeterministicHash.Sha256Hex(file);
|
||||
var expected = NormalizeHash(metadata.ManifestHash);
|
||||
if (!string.Equals(actualHex, expected, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
File.Delete(tempPath);
|
||||
throw new InvalidOperationException($"Replay bundle hash mismatch. Expected {expected} got {actualHex}");
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation("Fetched sealed replay bundle {Uri} (hash {Hash}) to {Path}", metadata.BundleUri, metadata.ManifestHash, tempPath);
|
||||
return tempPath;
|
||||
}
|
||||
|
||||
private (string Bucket, string Key) ResolveDescriptor(string uri)
|
||||
{
|
||||
// Expect cas://bucket/key
|
||||
if (!uri.StartsWith("cas://", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// fallback to configured bucket + direct key
|
||||
return (_storageOptions.ObjectStore.BucketName, uri.Trim('/'));
|
||||
}
|
||||
|
||||
var trimmed = uri.Substring("cas://".Length);
|
||||
var slash = trimmed.IndexOf('/') ;
|
||||
if (slash < 0)
|
||||
{
|
||||
return (_storageOptions.ObjectStore.BucketName, trimmed);
|
||||
}
|
||||
|
||||
var bucket = trimmed[..slash];
|
||||
var key = trimmed[(slash + 1)..];
|
||||
return (bucket, key);
|
||||
}
|
||||
|
||||
private static string NormalizeHash(string hash)
|
||||
{
|
||||
var value = hash.Trim().ToLowerInvariant();
|
||||
return value.StartsWith("sha256:", StringComparison.Ordinal) ? value[7..] : value;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Processing.Replay;
|
||||
|
||||
/// <summary>
|
||||
/// Represents a fetched replay bundle mounted on the local filesystem.
|
||||
/// </summary>
|
||||
public sealed class ReplayBundleMount : IDisposable
|
||||
{
|
||||
public ReplayBundleMount(string bundlePath)
|
||||
{
|
||||
BundlePath = bundlePath ?? throw new ArgumentNullException(nameof(bundlePath));
|
||||
}
|
||||
|
||||
public string BundlePath { get; }
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (File.Exists(BundlePath))
|
||||
{
|
||||
File.Delete(BundlePath);
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
// best-effort cleanup
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
namespace StellaOps.Scanner.Worker.Processing.Replay;
|
||||
|
||||
/// <summary>
|
||||
/// Captures sealed replay bundle metadata supplied via the job lease.
|
||||
/// Used to keep analyzer execution hermetic and to emit Merkle metadata downstream.
|
||||
/// </summary>
|
||||
public sealed record ReplaySealedBundleMetadata(
|
||||
string ManifestHash,
|
||||
string BundleUri,
|
||||
string? PolicySnapshotId,
|
||||
string? FeedSnapshotId);
|
||||
@@ -0,0 +1,65 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Scanner.Core.Contracts;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Processing.Replay;
|
||||
|
||||
/// <summary>
|
||||
/// Reads sealed replay bundle metadata from the job lease and stores it in the analysis context.
|
||||
/// This does not fetch the bundle contents (handled by upstream) but ensures downstream stages
|
||||
/// know they must stay hermetic and use the provided bundle identifiers.
|
||||
/// </summary>
|
||||
public sealed class ReplaySealedBundleStageExecutor : IScanStageExecutor
|
||||
{
|
||||
public const string BundleUriKey = "replay.bundle.uri";
|
||||
public const string BundleHashKey = "replay.bundle.sha256";
|
||||
private const string PolicyPinKey = "determinism.policy";
|
||||
private const string FeedPinKey = "determinism.feed";
|
||||
|
||||
private readonly ILogger<ReplaySealedBundleStageExecutor> _logger;
|
||||
|
||||
public ReplaySealedBundleStageExecutor(ILogger<ReplaySealedBundleStageExecutor> logger)
|
||||
{
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public string StageName => ScanStageNames.IngestReplay;
|
||||
|
||||
public ValueTask ExecuteAsync(ScanJobContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
|
||||
var metadata = context.Lease.Metadata;
|
||||
if (!metadata.TryGetValue(BundleUriKey, out var bundleUri) || string.IsNullOrWhiteSpace(bundleUri))
|
||||
{
|
||||
_logger.LogDebug("Replay bundle URI not provided; skipping sealed bundle ingestion.");
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
if (!metadata.TryGetValue(BundleHashKey, out var bundleHash) || string.IsNullOrWhiteSpace(bundleHash))
|
||||
{
|
||||
_logger.LogWarning("Replay bundle URI provided without hash; skipping sealed bundle ingestion to avoid unverifiable input.");
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
var policyPin = metadata.TryGetValue(PolicyPinKey, out var policy) && !string.IsNullOrWhiteSpace(policy)
|
||||
? policy
|
||||
: null;
|
||||
var feedPin = metadata.TryGetValue(FeedPinKey, out var feed) && !string.IsNullOrWhiteSpace(feed)
|
||||
? feed
|
||||
: null;
|
||||
|
||||
var sealedMetadata = new ReplaySealedBundleMetadata(
|
||||
ManifestHash: bundleHash.Trim(),
|
||||
BundleUri: bundleUri.Trim(),
|
||||
PolicySnapshotId: policyPin,
|
||||
FeedSnapshotId: feedPin);
|
||||
|
||||
context.Analysis.Set(ScanAnalysisKeys.ReplaySealedBundleMetadata, sealedMetadata);
|
||||
_logger.LogInformation("Replay sealed bundle pinned: uri={BundleUri} hash={BundleHash} policy={PolicyPin} feed={FeedPin}", bundleUri, bundleHash, policyPin, feedPin);
|
||||
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -21,11 +21,13 @@ public sealed class ScanJobContext
|
||||
|
||||
public DateTimeOffset StartUtc { get; }
|
||||
|
||||
public CancellationToken CancellationToken { get; }
|
||||
|
||||
public string JobId => Lease.JobId;
|
||||
|
||||
public CancellationToken CancellationToken { get; }
|
||||
|
||||
public string JobId => Lease.JobId;
|
||||
|
||||
public string ScanId => Lease.ScanId;
|
||||
|
||||
public string? ReplayBundlePath { get; set; }
|
||||
|
||||
public ScanAnalysisStore Analysis { get; }
|
||||
}
|
||||
|
||||
@@ -7,21 +7,24 @@ using StellaOps.Scanner.Reachability;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Processing;
|
||||
|
||||
public sealed class ScanJobProcessor
|
||||
{
|
||||
public sealed class ScanJobProcessor
|
||||
{
|
||||
private readonly IReadOnlyDictionary<string, IScanStageExecutor> _executors;
|
||||
private readonly ScanProgressReporter _progressReporter;
|
||||
private readonly ILogger<ScanJobProcessor> _logger;
|
||||
private readonly IReachabilityUnionPublisherService _reachabilityPublisher;
|
||||
private readonly Replay.ReplayBundleFetcher _replayBundleFetcher;
|
||||
|
||||
public ScanJobProcessor(
|
||||
IEnumerable<IScanStageExecutor> executors,
|
||||
ScanProgressReporter progressReporter,
|
||||
IReachabilityUnionPublisherService reachabilityPublisher,
|
||||
Replay.ReplayBundleFetcher replayBundleFetcher,
|
||||
ILogger<ScanJobProcessor> logger)
|
||||
{
|
||||
_progressReporter = progressReporter ?? throw new ArgumentNullException(nameof(progressReporter));
|
||||
_reachabilityPublisher = reachabilityPublisher ?? throw new ArgumentNullException(nameof(reachabilityPublisher));
|
||||
_replayBundleFetcher = replayBundleFetcher ?? throw new ArgumentNullException(nameof(replayBundleFetcher));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
|
||||
var map = new Dictionary<string, IScanStageExecutor>(StringComparer.OrdinalIgnoreCase);
|
||||
@@ -52,18 +55,17 @@ public sealed class ScanJobProcessor
|
||||
public async ValueTask ExecuteAsync(ScanJobContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
// Placeholder: reachability publisher will be fed once lifter outputs are routed here.
|
||||
_ = _reachabilityPublisher;
|
||||
await EnsureReplayBundleFetchedAsync(context, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
foreach (var stage in ScanStageNames.Ordered)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
if (!_executors.TryGetValue(stage, out var executor))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
await _progressReporter.ExecuteStageAsync(
|
||||
context,
|
||||
stage,
|
||||
@@ -71,4 +73,19 @@ public sealed class ScanJobProcessor
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnsureReplayBundleFetchedAsync(ScanJobContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
if (context.Analysis.TryGet<Replay.ReplaySealedBundleMetadata>(ScanAnalysisKeys.ReplaySealedBundleMetadata, out var sealedMetadata) && sealedMetadata is not null)
|
||||
{
|
||||
// Already fetched in this context
|
||||
if (!string.IsNullOrWhiteSpace(context.ReplayBundlePath) && File.Exists(context.ReplayBundlePath))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var path = await _replayBundleFetcher.FetchAsync(sealedMetadata, cancellationToken).ConfigureAwait(false);
|
||||
context.ReplayBundlePath = path;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@ using System.Collections.Generic;
|
||||
|
||||
namespace StellaOps.Scanner.Worker.Processing;
|
||||
|
||||
public static class ScanStageNames
|
||||
{
|
||||
public const string ResolveImage = "resolve-image";
|
||||
public static class ScanStageNames
|
||||
{
|
||||
public const string IngestReplay = "ingest-replay";
|
||||
public const string ResolveImage = "resolve-image";
|
||||
public const string PullLayers = "pull-layers";
|
||||
public const string BuildFilesystem = "build-filesystem";
|
||||
public const string ExecuteAnalyzers = "execute-analyzers";
|
||||
@@ -14,6 +15,7 @@ public static class ScanStageNames
|
||||
|
||||
public static readonly IReadOnlyList<string> Ordered = new[]
|
||||
{
|
||||
IngestReplay,
|
||||
ResolveImage,
|
||||
PullLayers,
|
||||
BuildFilesystem,
|
||||
|
||||
@@ -36,7 +36,12 @@ internal sealed record SurfaceManifestRequest(
|
||||
IReadOnlyList<SurfaceManifestPayload> Payloads,
|
||||
string Component,
|
||||
string? Version,
|
||||
string? WorkerInstance);
|
||||
string? WorkerInstance,
|
||||
string? DeterminismMerkleRoot = null,
|
||||
string? ReplayBundleUri = null,
|
||||
string? ReplayBundleHash = null,
|
||||
string? ReplayPolicyPin = null,
|
||||
string? ReplayFeedPin = null);
|
||||
|
||||
internal interface ISurfaceManifestPublisher
|
||||
{
|
||||
@@ -112,7 +117,17 @@ internal sealed class SurfaceManifestPublisher : ISurfaceManifestPublisher
|
||||
WorkerInstance = request.WorkerInstance,
|
||||
Attempt = request.Attempt
|
||||
},
|
||||
Artifacts = artifacts.ToImmutableArray()
|
||||
Artifacts = artifacts.ToImmutableArray(),
|
||||
DeterminismMerkleRoot = request.DeterminismMerkleRoot,
|
||||
ReplayBundle = string.IsNullOrWhiteSpace(request.ReplayBundleUri)
|
||||
? null
|
||||
: new ReplayBundleReference
|
||||
{
|
||||
Uri = request.ReplayBundleUri!,
|
||||
Sha256 = request.ReplayBundleHash ?? string.Empty,
|
||||
PolicySnapshotId = request.ReplayPolicyPin,
|
||||
FeedSnapshotId = request.ReplayFeedPin
|
||||
}
|
||||
};
|
||||
|
||||
var manifestBytes = JsonSerializer.SerializeToUtf8Bytes(manifestDocument, SerializerOptions);
|
||||
@@ -177,7 +192,8 @@ internal sealed class SurfaceManifestPublisher : ISurfaceManifestPublisher
|
||||
ManifestDigest: manifestDigest,
|
||||
ManifestUri: manifestUri,
|
||||
ArtifactId: artifactId,
|
||||
Document: manifestDocument);
|
||||
Document: manifestDocument,
|
||||
DeterminismMerkleRoot: request.DeterminismMerkleRoot);
|
||||
}
|
||||
|
||||
private async Task<SurfaceManifestArtifact> StorePayloadAsync(SurfaceManifestPayload payload, string tenant, CancellationToken cancellationToken)
|
||||
|
||||
Reference in New Issue
Block a user