semi implemented and features implemented save checkpoint

This commit is contained in:
master
2026-02-08 18:00:49 +02:00
parent 04360dff63
commit 1bf6bbf395
20895 changed files with 716795 additions and 64 deletions

View File

@@ -4,8 +4,13 @@
// Description: Stage executor for pushing verdicts as OCI referrer artifacts.
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using StellaOps.Scanner.Evidence;
using StellaOps.Scanner.Storage.Oci;
using StellaOps.Scanner.Worker.Options;
using System.Collections.Concurrent;
using System.Globalization;
namespace StellaOps.Scanner.Worker.Processing;
@@ -15,14 +20,20 @@ namespace StellaOps.Scanner.Worker.Processing;
/// </summary>
public sealed class VerdictPushStageExecutor : IScanStageExecutor
{
private readonly VerdictOciPublisher _publisher;
private static readonly TimeSpan SubmissionReceiptTtl = TimeSpan.FromMinutes(15);
private static readonly ConcurrentDictionary<string, SubmissionReceipt> SubmissionReceipts = new(StringComparer.Ordinal);
private readonly IVerdictOciPublisher _publisher;
private readonly IOptionsMonitor<ScannerWorkerOptions> _options;
private readonly ILogger<VerdictPushStageExecutor> _logger;
public VerdictPushStageExecutor(
VerdictOciPublisher publisher,
IVerdictOciPublisher publisher,
IOptionsMonitor<ScannerWorkerOptions> options,
ILogger<VerdictPushStageExecutor> logger)
{
_publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@@ -52,6 +63,17 @@ public sealed class VerdictPushStageExecutor : IScanStageExecutor
return;
}
var idempotencyKey = AttestationIdempotencyKey.FromDsseEnvelope(envelope);
if (TryGetCachedReceipt(idempotencyKey, context.TimeProvider.GetUtcNow(), out var cachedReceipt))
{
_logger.LogInformation(
"Skipping duplicate verdict push for job {JobId}; using cached receipt {ManifestDigest}.",
context.JobId,
cachedReceipt.ManifestDigest);
StorePushResult(context, idempotencyKey, cachedReceipt.ManifestDigest, cachedReceipt.ManifestReference);
return;
}
var request = new VerdictOciPublishRequest
{
Reference = options.RegistryReference,
@@ -63,31 +85,64 @@ public sealed class VerdictPushStageExecutor : IScanStageExecutor
Decision = options.Decision,
GraphRevisionId = options.GraphRevisionId,
ProofBundleDigest = options.ProofBundleDigest,
VerdictTimestamp = context.TimeProvider.GetUtcNow()
VerdictTimestamp = options.VerdictTimestamp,
IdempotencyKey = idempotencyKey
};
try
{
var result = await _publisher.PushAsync(request, cancellationToken).ConfigureAwait(false);
if (result.Success)
var retryCount = Math.Max(0, _options.CurrentValue.VerdictPush.MaxRetries);
var maxAttempts = retryCount + 1;
for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
_logger.LogInformation(
"Pushed verdict for job {JobId} to {Reference} with digest {ManifestDigest}.",
context.JobId,
request.Reference,
result.ManifestDigest);
var result = await _publisher.PushAsync(request, cancellationToken).ConfigureAwait(false);
if (result.Success)
{
_logger.LogInformation(
result.AlreadyExists
? "Verdict already exists for job {JobId} at {Reference} ({ManifestDigest})."
: "Pushed verdict for job {JobId} to {Reference} with digest {ManifestDigest}.",
context.JobId,
request.Reference,
result.ManifestDigest);
// Store the push result in the analysis store for downstream consumers
context.Analysis.Set(VerdictPushAnalysisKeys.VerdictManifestDigest, result.ManifestDigest ?? string.Empty);
context.Analysis.Set(VerdictPushAnalysisKeys.VerdictManifestReference, result.ManifestReference ?? string.Empty);
}
else
{
_logger.LogError(
"Failed to push verdict for job {JobId}: {Error}",
var manifestDigest = result.ManifestDigest ?? string.Empty;
var manifestReference = result.ManifestReference ?? string.Empty;
RememberReceipt(idempotencyKey, manifestDigest, manifestReference, context.TimeProvider.GetUtcNow());
StorePushResult(context, idempotencyKey, manifestDigest, manifestReference);
return;
}
if (IsAlreadySubmittedError(result.Error))
{
_logger.LogInformation(
"Verdict push returned already-submitted signal for job {JobId} (attempt {Attempt}/{MaxAttempts}): {Error}",
context.JobId,
attempt,
maxAttempts,
result.Error);
StorePushResult(context, idempotencyKey, result.ManifestDigest ?? string.Empty, result.ManifestReference ?? string.Empty);
return;
}
if (!ShouldRetry(result.Error, attempt, maxAttempts))
{
_logger.LogError(
"Failed to push verdict for job {JobId}: {Error}",
context.JobId,
result.Error);
return;
}
var delay = ComputeRetryDelay(attempt);
_logger.LogWarning(
"Transient verdict push failure for job {JobId} (attempt {Attempt}/{MaxAttempts}): {Error}. Retrying in {Delay}.",
context.JobId,
result.Error);
attempt,
maxAttempts,
result.Error,
delay);
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
@@ -134,6 +189,13 @@ public sealed class VerdictPushStageExecutor : IScanStageExecutor
decision = "unknown";
}
DateTimeOffset? verdictTimestamp = null;
if (metadata.TryGetValue(VerdictPushMetadataKeys.Timestamp, out var timestampRaw) &&
DateTimeOffset.TryParse(timestampRaw, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out var parsedTimestamp))
{
verdictTimestamp = parsedTimestamp;
}
return new VerdictPushOptions
{
RegistryReference = registryRef!,
@@ -143,7 +205,8 @@ public sealed class VerdictPushStageExecutor : IScanStageExecutor
PolicyDigest = metadata.GetValueOrDefault(VerdictPushMetadataKeys.PolicyDigest) ?? "sha256:unknown",
Decision = decision,
GraphRevisionId = metadata.GetValueOrDefault(VerdictPushMetadataKeys.GraphRevisionId),
ProofBundleDigest = metadata.GetValueOrDefault(VerdictPushMetadataKeys.ProofBundleDigest)
ProofBundleDigest = metadata.GetValueOrDefault(VerdictPushMetadataKeys.ProofBundleDigest),
VerdictTimestamp = verdictTimestamp
};
}
@@ -186,6 +249,83 @@ public sealed class VerdictPushStageExecutor : IScanStageExecutor
return null;
}
private static bool TryGetCachedReceipt(string idempotencyKey, DateTimeOffset now, out SubmissionReceipt receipt)
{
if (SubmissionReceipts.TryGetValue(idempotencyKey, out receipt))
{
if (receipt.ExpiresAtUtc > now)
{
return true;
}
SubmissionReceipts.TryRemove(idempotencyKey, out _);
}
receipt = default;
return false;
}
private static void RememberReceipt(
string idempotencyKey,
string manifestDigest,
string manifestReference,
DateTimeOffset now)
{
SubmissionReceipts[idempotencyKey] = new SubmissionReceipt(
manifestDigest,
manifestReference,
now.Add(SubmissionReceiptTtl));
}
private static void StorePushResult(
ScanJobContext context,
string idempotencyKey,
string manifestDigest,
string manifestReference)
{
context.Analysis.Set(VerdictPushAnalysisKeys.VerdictIdempotencyKey, idempotencyKey);
context.Analysis.Set(VerdictPushAnalysisKeys.VerdictManifestDigest, manifestDigest);
context.Analysis.Set(VerdictPushAnalysisKeys.VerdictManifestReference, manifestReference);
}
private static bool IsAlreadySubmittedError(string? error)
{
if (string.IsNullOrWhiteSpace(error))
{
return false;
}
return error.Contains("409", StringComparison.OrdinalIgnoreCase) ||
error.Contains("conflict", StringComparison.OrdinalIgnoreCase) ||
error.Contains("already", StringComparison.OrdinalIgnoreCase);
}
private static bool IsTransientError(string? error)
{
if (string.IsNullOrWhiteSpace(error))
{
return false;
}
return error.Contains("429", StringComparison.OrdinalIgnoreCase) ||
error.Contains("500", StringComparison.OrdinalIgnoreCase) ||
error.Contains("502", StringComparison.OrdinalIgnoreCase) ||
error.Contains("503", StringComparison.OrdinalIgnoreCase) ||
error.Contains("504", StringComparison.OrdinalIgnoreCase) ||
error.Contains("timeout", StringComparison.OrdinalIgnoreCase) ||
error.Contains("temporar", StringComparison.OrdinalIgnoreCase) ||
error.Contains("unavailable", StringComparison.OrdinalIgnoreCase);
}
private static bool ShouldRetry(string? error, int attempt, int maxAttempts)
=> attempt < maxAttempts && IsTransientError(error);
private static TimeSpan ComputeRetryDelay(int attempt)
{
var milliseconds = Math.Min(2000, 200 * (1 << Math.Clamp(attempt - 1, 0, 4)));
return TimeSpan.FromMilliseconds(milliseconds);
}
private sealed class VerdictPushOptions
{
public required string RegistryReference { get; init; }
@@ -196,7 +336,13 @@ public sealed class VerdictPushStageExecutor : IScanStageExecutor
public required string Decision { get; init; }
public string? GraphRevisionId { get; init; }
public string? ProofBundleDigest { get; init; }
public DateTimeOffset? VerdictTimestamp { get; init; }
}
private readonly record struct SubmissionReceipt(
string ManifestDigest,
string ManifestReference,
DateTimeOffset ExpiresAtUtc);
}
/// <summary>
@@ -212,6 +358,7 @@ public static class VerdictPushMetadataKeys
public const string Decision = "verdict.decision";
public const string GraphRevisionId = "verdict.graph.revision.id";
public const string ProofBundleDigest = "verdict.proof.bundle.digest";
public const string Timestamp = "verdict.timestamp";
}
/// <summary>
@@ -221,6 +368,7 @@ public static class VerdictPushAnalysisKeys
{
public const string VerdictDsseEnvelope = "verdict.dsse.envelope";
public const string VerdictDsseEnvelopeMemory = "verdict.dsse.envelope.memory";
public const string VerdictIdempotencyKey = "verdict.push.idempotency.key";
public const string VerdictManifestDigest = "verdict.push.manifest.digest";
public const string VerdictManifestReference = "verdict.push.manifest.reference";
}

View File

@@ -1,5 +1,6 @@
using GateDetectors = StellaOps.Scanner.Reachability.Gates.Detectors;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
@@ -317,7 +318,7 @@ if (workerOptions.VerdictPush.Enabled)
{
client.Timeout = workerOptions.VerdictPush.Timeout;
});
builder.Services.AddSingleton<StellaOps.Scanner.Storage.Oci.VerdictOciPublisher>();
builder.Services.AddSingleton<StellaOps.Scanner.Storage.Oci.IVerdictOciPublisher, StellaOps.Scanner.Storage.Oci.VerdictOciPublisher>();
builder.Services.AddSingleton<IScanStageExecutor, VerdictPushStageExecutor>();
}

View File

@@ -9,3 +9,4 @@ Source of truth: `docs/implplan/SPRINT_20260113_001_001_SCANNER_elf_section_hash
| ELF-SECTION-DI-0001 | DONE | Register section hash extractor options and services. |
| AUDIT-HOTLIST-SCANNER-WORKER-0001 | DONE | Apply audit hotlist findings for Scanner.Worker. |
| REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. |
| SPRINT-20260208-060-IDEMP-001 | DONE | Implement idempotent verdict attestation submission (idempotency key + dedupe + retry classification + tests). |