Add unit and integration tests for VexCandidateEmitter and SmartDiff repositories

- Implemented comprehensive unit tests for VexCandidateEmitter to validate candidate emission logic based on various scenarios including absent and present APIs, confidence thresholds, and rate limiting.
- Added integration tests for SmartDiff PostgreSQL repositories, covering snapshot storage and retrieval, candidate storage, and material risk change handling.
- Ensured tests validate correct behavior for storing, retrieving, and querying snapshots and candidates, including edge cases and expected outcomes.
This commit is contained in:
master
2025-12-16 18:44:25 +02:00
parent 2170a58734
commit 3a2100aa78
126 changed files with 15776 additions and 542 deletions

View File

@@ -0,0 +1,69 @@
-- -----------------------------------------------------------------------------
-- Migration: 20251216_001_create_rekor_submission_queue.sql
-- Sprint: SPRINT_3000_0001_0002_rekor_retry_queue_metrics
-- Task: T1
-- Description: Create the Rekor submission queue table for durable retry
-- -----------------------------------------------------------------------------
-- Create attestor schema if not exists
CREATE SCHEMA IF NOT EXISTS attestor;
-- Create the queue table.
-- NOTE: updated_at is NOT maintained by a trigger; the application layer is
-- responsible for setting it on every UPDATE (see PostgresRekorSubmissionQueue).
CREATE TABLE IF NOT EXISTS attestor.rekor_submission_queue (
id UUID PRIMARY KEY,
tenant_id TEXT NOT NULL,
bundle_sha256 TEXT NOT NULL,
dsse_payload BYTEA NOT NULL,
backend TEXT NOT NULL DEFAULT 'primary',
-- Status lifecycle: pending -> submitting -> submitted | retrying -> dead_letter
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'submitting', 'retrying', 'submitted', 'dead_letter')),
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 5,
-- next_retry_at defaults to NOW() so freshly enqueued rows are immediately eligible
next_retry_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Populated on success
rekor_uuid TEXT,
rekor_index BIGINT,
-- Populated on failure
last_error TEXT
);
-- Comments
COMMENT ON TABLE attestor.rekor_submission_queue IS
'Durable retry queue for Rekor transparency log submissions';
COMMENT ON COLUMN attestor.rekor_submission_queue.status IS
'Submission lifecycle: pending -> submitting -> (submitted | retrying -> dead_letter)';
COMMENT ON COLUMN attestor.rekor_submission_queue.backend IS
'Target Rekor backend (primary or mirror)';
COMMENT ON COLUMN attestor.rekor_submission_queue.dsse_payload IS
'Serialized DSSE envelope to submit';
-- Index for dequeue operations (status + next_retry_at for SKIP LOCKED queries).
-- Partial: only rows that can actually be dequeued are indexed.
CREATE INDEX IF NOT EXISTS idx_rekor_queue_dequeue
ON attestor.rekor_submission_queue (status, next_retry_at)
WHERE status IN ('pending', 'retrying');
-- Index for tenant-scoped queries.
-- NOTE(review): this is a prefix of idx_rekor_queue_bundle (tenant_id, bundle_sha256)
-- and therefore likely redundant; consider dropping it in a follow-up migration.
CREATE INDEX IF NOT EXISTS idx_rekor_queue_tenant
ON attestor.rekor_submission_queue (tenant_id);
-- Index for bundle lookup (deduplication check)
CREATE INDEX IF NOT EXISTS idx_rekor_queue_bundle
ON attestor.rekor_submission_queue (tenant_id, bundle_sha256);
-- Index for dead letter management
CREATE INDEX IF NOT EXISTS idx_rekor_queue_dead_letter
ON attestor.rekor_submission_queue (status, updated_at)
WHERE status = 'dead_letter';
-- Index for cleanup of completed submissions (see PurgeSubmittedAsync)
CREATE INDEX IF NOT EXISTS idx_rekor_queue_completed
ON attestor.rekor_submission_queue (status, updated_at)
WHERE status = 'submitted';

View File

@@ -0,0 +1,524 @@
// -----------------------------------------------------------------------------
// PostgresRekorSubmissionQueue.cs
// Sprint: SPRINT_3000_0001_0002_rekor_retry_queue_metrics
// Task: T3
// Description: PostgreSQL implementation of the Rekor submission queue
// -----------------------------------------------------------------------------
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Attestor.Core.Observability;
using StellaOps.Attestor.Core.Options;
using StellaOps.Attestor.Core.Queue;
namespace StellaOps.Attestor.Infrastructure.Queue;
/// <summary>
/// PostgreSQL implementation of the Rekor submission queue.
/// Persists queue items in <c>attestor.rekor_submission_queue</c>; dequeue uses
/// <c>FOR UPDATE SKIP LOCKED</c> so multiple workers can poll the same table
/// concurrently without handing out the same item twice.
/// </summary>
public sealed class PostgresRekorSubmissionQueue : IRekorSubmissionQueue
{
    private readonly NpgsqlDataSource _dataSource;
    private readonly RekorQueueOptions _options;
    private readonly AttestorMetrics _metrics;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<PostgresRekorSubmissionQueue> _logger;

    // Per-command timeout so a wedged connection cannot stall a worker indefinitely.
    private const int DefaultCommandTimeoutSeconds = 30;

    public PostgresRekorSubmissionQueue(
        NpgsqlDataSource dataSource,
        IOptions<RekorQueueOptions> options,
        AttestorMetrics metrics,
        TimeProvider timeProvider,
        ILogger<PostgresRekorSubmissionQueue> logger)
    {
        _dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Inserts a new queue item in the <c>pending</c> state with
    /// <c>next_retry_at = now</c>, making it immediately eligible for dequeue.
    /// No deduplication is performed here; callers are expected to check
    /// <see cref="GetByBundleShaAsync"/> first if duplicates matter.
    /// </summary>
    /// <inheritdoc />
    public async Task<Guid> EnqueueAsync(
        string tenantId,
        string bundleSha256,
        byte[] dssePayload,
        string backend,
        CancellationToken cancellationToken = default)
    {
        var now = _timeProvider.GetUtcNow();
        var id = Guid.NewGuid();
        const string sql = """
            INSERT INTO attestor.rekor_submission_queue (
                id, tenant_id, bundle_sha256, dsse_payload, backend,
                status, attempt_count, max_attempts, next_retry_at,
                created_at, updated_at
            )
            VALUES (
                @id, @tenant_id, @bundle_sha256, @dsse_payload, @backend,
                @status, 0, @max_attempts, @next_retry_at,
                @created_at, @updated_at
            )
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@id", id);
        command.Parameters.AddWithValue("@tenant_id", tenantId);
        command.Parameters.AddWithValue("@bundle_sha256", bundleSha256);
        command.Parameters.AddWithValue("@dsse_payload", dssePayload);
        command.Parameters.AddWithValue("@backend", backend);
        command.Parameters.AddWithValue("@status", RekorSubmissionStatus.Pending.ToString().ToLowerInvariant());
        command.Parameters.AddWithValue("@max_attempts", _options.MaxAttempts);
        command.Parameters.AddWithValue("@next_retry_at", now);
        command.Parameters.AddWithValue("@created_at", now);
        command.Parameters.AddWithValue("@updated_at", now);
        await command.ExecuteNonQueryAsync(cancellationToken);
        _metrics.RekorSubmissionStatusTotal.Add(1,
            new("status", "pending"),
            new("backend", backend));
        _logger.LogDebug(
            "Enqueued Rekor submission {Id} for bundle {BundleSha256} to {Backend}",
            id, bundleSha256, backend);
        return id;
    }

    /// <summary>
    /// Atomically claims up to <paramref name="batchSize"/> due items
    /// (status pending/retrying and <c>next_retry_at &lt;= now</c>), flips them
    /// to <c>submitting</c>, and returns them. SKIP LOCKED makes the claim safe
    /// under concurrent workers. Records queue wait time per returned item.
    /// </summary>
    /// <inheritdoc />
    public async Task<IReadOnlyList<RekorQueueItem>> DequeueAsync(
        int batchSize,
        CancellationToken cancellationToken = default)
    {
        var now = _timeProvider.GetUtcNow();
        // Use FOR UPDATE SKIP LOCKED for concurrent-safe dequeue.
        // The RETURNING list must include every column ReadQueueItem reads —
        // including rekor_uuid/rekor_index — otherwise GetOrdinal throws
        // IndexOutOfRangeException for every dequeued row (bug fix).
        const string sql = """
            UPDATE attestor.rekor_submission_queue
            SET status = 'submitting', updated_at = @now
            WHERE id IN (
                SELECT id FROM attestor.rekor_submission_queue
                WHERE status IN ('pending', 'retrying')
                  AND next_retry_at <= @now
                ORDER BY next_retry_at ASC
                LIMIT @batch_size
                FOR UPDATE SKIP LOCKED
            )
            RETURNING id, tenant_id, bundle_sha256, dsse_payload, backend,
                      status, attempt_count, max_attempts, next_retry_at,
                      created_at, updated_at, last_error, rekor_uuid, rekor_index
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@now", now);
        command.Parameters.AddWithValue("@batch_size", batchSize);
        var results = new List<RekorQueueItem>();
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            // Time spent waiting in the queue = dequeue time minus enqueue time.
            var queuedAt = reader.GetDateTime(reader.GetOrdinal("created_at"));
            var waitTime = (now - queuedAt).TotalSeconds;
            _metrics.RekorQueueWaitTime.Record(waitTime);
            results.Add(ReadQueueItem(reader));
        }
        return results;
    }

    /// <summary>
    /// Marks an item as successfully submitted, recording the Rekor UUID/index
    /// and clearing any previous error. A missing id is logged as a warning and
    /// ignored (no metric emitted), rather than reporting a phantom success.
    /// </summary>
    /// <inheritdoc />
    public async Task MarkSubmittedAsync(
        Guid id,
        string rekorUuid,
        long? rekorIndex,
        CancellationToken cancellationToken = default)
    {
        var now = _timeProvider.GetUtcNow();
        const string sql = """
            UPDATE attestor.rekor_submission_queue
            SET status = 'submitted',
                rekor_uuid = @rekor_uuid,
                rekor_index = @rekor_index,
                updated_at = @updated_at,
                last_error = NULL
            WHERE id = @id
            RETURNING backend
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@id", id);
        command.Parameters.AddWithValue("@rekor_uuid", rekorUuid);
        command.Parameters.AddWithValue("@rekor_index", (object?)rekorIndex ?? DBNull.Value);
        command.Parameters.AddWithValue("@updated_at", now);
        // ExecuteScalar yields the RETURNING value, or null when no row matched.
        var backend = await command.ExecuteScalarAsync(cancellationToken) as string;
        if (backend is null)
        {
            _logger.LogWarning("Attempted to mark non-existent queue item {Id} as submitted", id);
            return;
        }
        _metrics.RekorSubmissionStatusTotal.Add(1,
            new("status", "submitted"),
            new("backend", backend));
        _logger.LogInformation(
            "Marked Rekor submission {Id} as submitted with UUID {RekorUuid}",
            id, rekorUuid);
    }

    /// <summary>
    /// Records a failed attempt. Increments the attempt counter and either
    /// schedules a retry with exponential backoff or, once
    /// <c>attempt_count &gt;= max_attempts</c>, moves the item to the dead letter
    /// state. Runs inside a transaction with a row lock so concurrent failure
    /// reports for the same item serialize.
    /// </summary>
    /// <inheritdoc />
    public async Task MarkFailedAsync(
        Guid id,
        string errorMessage,
        CancellationToken cancellationToken = default)
    {
        var now = _timeProvider.GetUtcNow();
        // Fetch current state (locked) to decide between retry and dead letter.
        const string fetchSql = """
            SELECT attempt_count, max_attempts, backend
            FROM attestor.rekor_submission_queue
            WHERE id = @id
            FOR UPDATE
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
        int attemptCount;
        int maxAttempts;
        string backend;
        await using (var fetchCommand = new NpgsqlCommand(fetchSql, connection, transaction))
        {
            fetchCommand.Parameters.AddWithValue("@id", id);
            await using var reader = await fetchCommand.ExecuteReaderAsync(cancellationToken);
            if (!await reader.ReadAsync(cancellationToken))
            {
                // Early return: the transaction made no changes and is rolled
                // back when it is disposed by the await using above.
                _logger.LogWarning("Attempted to mark non-existent queue item {Id} as failed", id);
                return;
            }
            attemptCount = reader.GetInt32(0);
            maxAttempts = reader.GetInt32(1);
            backend = reader.GetString(2);
        }
        attemptCount++;
        var isDeadLetter = attemptCount >= maxAttempts;
        if (isDeadLetter)
        {
            const string deadLetterSql = """
                UPDATE attestor.rekor_submission_queue
                SET status = 'dead_letter',
                    attempt_count = @attempt_count,
                    last_error = @last_error,
                    updated_at = @updated_at
                WHERE id = @id
                """;
            await using var command = new NpgsqlCommand(deadLetterSql, connection, transaction);
            command.Parameters.AddWithValue("@id", id);
            command.Parameters.AddWithValue("@attempt_count", attemptCount);
            command.Parameters.AddWithValue("@last_error", errorMessage);
            command.Parameters.AddWithValue("@updated_at", now);
            await command.ExecuteNonQueryAsync(cancellationToken);
            _metrics.RekorSubmissionStatusTotal.Add(1,
                new("status", "dead_letter"),
                new("backend", backend));
            _metrics.RekorDeadLetterTotal.Add(1, new("backend", backend));
            _logger.LogError(
                "Moved Rekor submission {Id} to dead letter after {Attempts} attempts: {Error}",
                id, attemptCount, errorMessage);
        }
        else
        {
            var nextRetryAt = CalculateNextRetryTime(now, attemptCount);
            const string retrySql = """
                UPDATE attestor.rekor_submission_queue
                SET status = 'retrying',
                    attempt_count = @attempt_count,
                    next_retry_at = @next_retry_at,
                    last_error = @last_error,
                    updated_at = @updated_at
                WHERE id = @id
                """;
            await using var command = new NpgsqlCommand(retrySql, connection, transaction);
            command.Parameters.AddWithValue("@id", id);
            command.Parameters.AddWithValue("@attempt_count", attemptCount);
            command.Parameters.AddWithValue("@next_retry_at", nextRetryAt);
            command.Parameters.AddWithValue("@last_error", errorMessage);
            command.Parameters.AddWithValue("@updated_at", now);
            await command.ExecuteNonQueryAsync(cancellationToken);
            _metrics.RekorSubmissionStatusTotal.Add(1,
                new("status", "retrying"),
                new("backend", backend));
            _metrics.RekorRetryAttemptsTotal.Add(1,
                new("backend", backend),
                new("attempt", attemptCount.ToString()));
            _logger.LogWarning(
                "Marked Rekor submission {Id} for retry (attempt {Attempt}/{Max}): {Error}",
                id, attemptCount, maxAttempts, errorMessage);
        }
        await transaction.CommitAsync(cancellationToken);
    }

    /// <summary>Fetches a single queue item by id, or null when absent.</summary>
    /// <inheritdoc />
    public async Task<RekorQueueItem?> GetByIdAsync(
        Guid id,
        CancellationToken cancellationToken = default)
    {
        const string sql = """
            SELECT id, tenant_id, bundle_sha256, dsse_payload, backend,
                   status, attempt_count, max_attempts, next_retry_at,
                   created_at, updated_at, last_error, rekor_uuid, rekor_index
            FROM attestor.rekor_submission_queue
            WHERE id = @id
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@id", id);
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        if (!await reader.ReadAsync(cancellationToken))
        {
            return null;
        }
        return ReadQueueItem(reader);
    }

    /// <summary>
    /// Lists all queue items for a given tenant/bundle pair, newest first.
    /// Used for deduplication checks before enqueueing.
    /// </summary>
    /// <inheritdoc />
    public async Task<IReadOnlyList<RekorQueueItem>> GetByBundleShaAsync(
        string tenantId,
        string bundleSha256,
        CancellationToken cancellationToken = default)
    {
        const string sql = """
            SELECT id, tenant_id, bundle_sha256, dsse_payload, backend,
                   status, attempt_count, max_attempts, next_retry_at,
                   created_at, updated_at, last_error, rekor_uuid, rekor_index
            FROM attestor.rekor_submission_queue
            WHERE tenant_id = @tenant_id AND bundle_sha256 = @bundle_sha256
            ORDER BY created_at DESC
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@tenant_id", tenantId);
        command.Parameters.AddWithValue("@bundle_sha256", bundleSha256);
        var results = new List<RekorQueueItem>();
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            results.Add(ReadQueueItem(reader));
        }
        return results;
    }

    /// <summary>
    /// Counts items still in flight (pending, retrying, or submitting);
    /// submitted and dead-letter items are excluded.
    /// </summary>
    /// <inheritdoc />
    public async Task<int> GetQueueDepthAsync(CancellationToken cancellationToken = default)
    {
        const string sql = """
            SELECT COUNT(*)
            FROM attestor.rekor_submission_queue
            WHERE status IN ('pending', 'retrying', 'submitting')
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        var result = await command.ExecuteScalarAsync(cancellationToken);
        // COUNT(*) comes back as bigint; Convert handles the narrowing.
        return Convert.ToInt32(result);
    }

    /// <summary>Lists up to <paramref name="limit"/> dead-letter items, most recently failed first.</summary>
    /// <inheritdoc />
    public async Task<IReadOnlyList<RekorQueueItem>> GetDeadLetterItemsAsync(
        int limit,
        CancellationToken cancellationToken = default)
    {
        const string sql = """
            SELECT id, tenant_id, bundle_sha256, dsse_payload, backend,
                   status, attempt_count, max_attempts, next_retry_at,
                   created_at, updated_at, last_error, rekor_uuid, rekor_index
            FROM attestor.rekor_submission_queue
            WHERE status = 'dead_letter'
            ORDER BY updated_at DESC
            LIMIT @limit
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@limit", limit);
        var results = new List<RekorQueueItem>();
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            results.Add(ReadQueueItem(reader));
        }
        return results;
    }

    /// <summary>
    /// Resets a dead-letter item back to <c>pending</c> with a fresh attempt
    /// budget. Returns false when the item does not exist or is not currently
    /// in the dead-letter state (the status guard in the WHERE clause).
    /// </summary>
    /// <inheritdoc />
    public async Task<bool> RequeueDeadLetterAsync(
        Guid id,
        CancellationToken cancellationToken = default)
    {
        var now = _timeProvider.GetUtcNow();
        const string sql = """
            UPDATE attestor.rekor_submission_queue
            SET status = 'pending',
                attempt_count = 0,
                next_retry_at = @now,
                last_error = NULL,
                updated_at = @now
            WHERE id = @id AND status = 'dead_letter'
            RETURNING backend
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@id", id);
        command.Parameters.AddWithValue("@now", now);
        var backend = await command.ExecuteScalarAsync(cancellationToken) as string;
        if (backend is not null)
        {
            _metrics.RekorSubmissionStatusTotal.Add(1,
                new("status", "pending"),
                new("backend", backend));
            _logger.LogInformation("Requeued dead letter item {Id} for retry", id);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Deletes submitted items whose last update is older than
    /// <paramref name="olderThan"/>. Returns the number of rows removed.
    /// </summary>
    /// <inheritdoc />
    public async Task<int> PurgeSubmittedAsync(
        TimeSpan olderThan,
        CancellationToken cancellationToken = default)
    {
        var cutoff = _timeProvider.GetUtcNow().Add(-olderThan);
        const string sql = """
            DELETE FROM attestor.rekor_submission_queue
            WHERE status = 'submitted' AND updated_at < @cutoff
            """;
        await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
        await using var command = new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = DefaultCommandTimeoutSeconds
        };
        command.Parameters.AddWithValue("@cutoff", cutoff);
        var deleted = await command.ExecuteNonQueryAsync(cancellationToken);
        if (deleted > 0)
        {
            _logger.LogInformation("Purged {Count} submitted queue items older than {Cutoff}", deleted, cutoff);
        }
        return deleted;
    }

    /// <summary>
    /// Exponential backoff: baseDelay * 2^(attempt-1), capped at MaxRetryDelaySeconds.
    /// <paramref name="attemptCount"/> is the just-completed attempt number (>= 1).
    /// </summary>
    private DateTimeOffset CalculateNextRetryTime(DateTimeOffset now, int attemptCount)
    {
        var delay = TimeSpan.FromSeconds(
            Math.Min(
                _options.BaseRetryDelaySeconds * Math.Pow(2, attemptCount - 1),
                _options.MaxRetryDelaySeconds));
        return now.Add(delay);
    }

    /// <summary>
    /// Materializes a <see cref="RekorQueueItem"/> from the current reader row.
    /// Requires all 14 columns (including rekor_uuid/rekor_index) to be present
    /// in the result set — every SELECT/RETURNING in this class must match.
    /// </summary>
    private static RekorQueueItem ReadQueueItem(NpgsqlDataReader reader)
    {
        return new RekorQueueItem
        {
            Id = reader.GetGuid(reader.GetOrdinal("id")),
            TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
            BundleSha256 = reader.GetString(reader.GetOrdinal("bundle_sha256")),
            DssePayload = reader.GetFieldValue<byte[]>(reader.GetOrdinal("dsse_payload")),
            Backend = reader.GetString(reader.GetOrdinal("backend")),
            Status = Enum.Parse<RekorSubmissionStatus>(reader.GetString(reader.GetOrdinal("status")), ignoreCase: true),
            AttemptCount = reader.GetInt32(reader.GetOrdinal("attempt_count")),
            MaxAttempts = reader.GetInt32(reader.GetOrdinal("max_attempts")),
            NextRetryAt = reader.GetDateTime(reader.GetOrdinal("next_retry_at")),
            CreatedAt = reader.GetDateTime(reader.GetOrdinal("created_at")),
            UpdatedAt = reader.GetDateTime(reader.GetOrdinal("updated_at")),
            LastError = reader.IsDBNull(reader.GetOrdinal("last_error"))
                ? null
                : reader.GetString(reader.GetOrdinal("last_error")),
            RekorUuid = reader.IsDBNull(reader.GetOrdinal("rekor_uuid"))
                ? null
                : reader.GetString(reader.GetOrdinal("rekor_uuid")),
            RekorIndex = reader.IsDBNull(reader.GetOrdinal("rekor_index"))
                ? null
                : reader.GetInt64(reader.GetOrdinal("rekor_index"))
        };
    }
}

View File

@@ -29,6 +29,7 @@ internal sealed class AttestorSubmissionService : IAttestorSubmissionService
private readonly IAttestorArchiveStore _archiveStore;
private readonly IAttestorAuditSink _auditSink;
private readonly IAttestorVerificationCache _verificationCache;
private readonly ITimeSkewValidator _timeSkewValidator;
private readonly ILogger<AttestorSubmissionService> _logger;
private readonly TimeProvider _timeProvider;
private readonly AttestorOptions _options;
@@ -43,6 +44,7 @@ internal sealed class AttestorSubmissionService : IAttestorSubmissionService
IAttestorArchiveStore archiveStore,
IAttestorAuditSink auditSink,
IAttestorVerificationCache verificationCache,
ITimeSkewValidator timeSkewValidator,
IOptions<AttestorOptions> options,
ILogger<AttestorSubmissionService> logger,
TimeProvider timeProvider,
@@ -56,6 +58,7 @@ internal sealed class AttestorSubmissionService : IAttestorSubmissionService
_archiveStore = archiveStore;
_auditSink = auditSink;
_verificationCache = verificationCache;
_timeSkewValidator = timeSkewValidator ?? throw new ArgumentNullException(nameof(timeSkewValidator));
_logger = logger;
_timeProvider = timeProvider;
_options = options.Value;
@@ -139,6 +142,20 @@ internal sealed class AttestorSubmissionService : IAttestorSubmissionService
throw new InvalidOperationException("No Rekor submission outcome was produced.");
}
// Validate time skew between Rekor integrated time and local time (SPRINT_3000_0001_0003 T5)
var timeSkewResult = ValidateSubmissionTimeSkew(canonicalOutcome);
if (!timeSkewResult.IsValid && _options.TimeSkew.FailOnReject)
{
_logger.LogError(
"Submission rejected due to time skew: BundleSha={BundleSha}, IntegratedTime={IntegratedTime:O}, LocalTime={LocalTime:O}, SkewSeconds={SkewSeconds:F1}, Status={Status}",
request.Meta.BundleSha256,
timeSkewResult.IntegratedTime,
timeSkewResult.LocalTime,
timeSkewResult.SkewSeconds,
timeSkewResult.Status);
throw new TimeSkewValidationException(timeSkewResult);
}
var entry = CreateEntry(request, context, canonicalOutcome, mirrorOutcome);
await _repository.SaveAsync(entry, cancellationToken).ConfigureAwait(false);
await InvalidateVerificationCacheAsync(cacheSubject, cancellationToken).ConfigureAwait(false);
@@ -490,6 +507,23 @@ internal sealed class AttestorSubmissionService : IAttestorSubmissionService
}
}
/// <summary>
/// Validates time skew between Rekor integrated time and local time.
/// Per SPRINT_3000_0001_0003 T5.
/// </summary>
private TimeSkewValidationResult ValidateSubmissionTimeSkew(SubmissionOutcome outcome)
{
if (outcome.Submission is null)
{
return TimeSkewValidationResult.Skipped("No submission response available");
}
var integratedTime = outcome.Submission.IntegratedTimeUtc;
var localTime = _timeProvider.GetUtcNow();
return _timeSkewValidator.Validate(integratedTime, localTime);
}
private async Task ArchiveAsync(
AttestorEntry entry,
byte[] canonicalBundle,

View File

@@ -25,6 +25,7 @@ internal sealed class AttestorVerificationService : IAttestorVerificationService
private readonly IRekorClient _rekorClient;
private readonly ITransparencyWitnessClient _witnessClient;
private readonly IAttestorVerificationEngine _engine;
private readonly ITimeSkewValidator _timeSkewValidator;
private readonly ILogger<AttestorVerificationService> _logger;
private readonly AttestorOptions _options;
private readonly AttestorMetrics _metrics;
@@ -37,6 +38,7 @@ internal sealed class AttestorVerificationService : IAttestorVerificationService
IRekorClient rekorClient,
ITransparencyWitnessClient witnessClient,
IAttestorVerificationEngine engine,
ITimeSkewValidator timeSkewValidator,
IOptions<AttestorOptions> options,
ILogger<AttestorVerificationService> logger,
AttestorMetrics metrics,
@@ -48,6 +50,7 @@ internal sealed class AttestorVerificationService : IAttestorVerificationService
_rekorClient = rekorClient ?? throw new ArgumentNullException(nameof(rekorClient));
_witnessClient = witnessClient ?? throw new ArgumentNullException(nameof(witnessClient));
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
_timeSkewValidator = timeSkewValidator ?? throw new ArgumentNullException(nameof(timeSkewValidator));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_activitySource = activitySource ?? throw new ArgumentNullException(nameof(activitySource));
@@ -72,13 +75,38 @@ internal sealed class AttestorVerificationService : IAttestorVerificationService
using var activity = _activitySource.StartVerification(subjectTag, issuerTag, policyId);
var evaluationTime = _timeProvider.GetUtcNow();
// Validate time skew between entry's integrated time and evaluation time (SPRINT_3000_0001_0003 T6)
var timeSkewResult = ValidateVerificationTimeSkew(entry, evaluationTime);
var additionalIssues = new List<string>();
if (!timeSkewResult.IsValid)
{
var issue = $"time_skew_rejected: {timeSkewResult.Message}";
_logger.LogWarning(
"Verification time skew issue for entry {Uuid}: IntegratedTime={IntegratedTime:O}, EvaluationTime={EvaluationTime:O}, SkewSeconds={SkewSeconds:F1}, Status={Status}",
entry.RekorUuid,
timeSkewResult.IntegratedTime,
evaluationTime,
timeSkewResult.SkewSeconds,
timeSkewResult.Status);
if (_options.TimeSkew.FailOnReject)
{
additionalIssues.Add(issue);
}
}
var report = await _engine.EvaluateAsync(entry, request.Bundle, evaluationTime, cancellationToken).ConfigureAwait(false);
var result = report.Succeeded ? "ok" : "failed";
// Merge any time skew issues with the report
var allIssues = report.Issues.Concat(additionalIssues).ToArray();
var succeeded = report.Succeeded && additionalIssues.Count == 0;
var result = succeeded ? "ok" : "failed";
activity?.SetTag(AttestorTelemetryTags.Result, result);
if (!report.Succeeded)
if (!succeeded)
{
activity?.SetStatus(ActivityStatusCode.Error, string.Join(",", report.Issues));
activity?.SetStatus(ActivityStatusCode.Error, string.Join(",", allIssues));
}
_metrics.VerifyTotal.Add(
@@ -98,17 +126,27 @@ internal sealed class AttestorVerificationService : IAttestorVerificationService
return new AttestorVerificationResult
{
Ok = report.Succeeded,
Ok = succeeded,
Uuid = entry.RekorUuid,
Index = entry.Index,
LogUrl = entry.Log.Url,
Status = entry.Status,
Issues = report.Issues,
Issues = allIssues,
CheckedAt = evaluationTime,
Report = report
Report = report with { Succeeded = succeeded, Issues = allIssues }
};
}
/// <summary>
/// Validates time skew between entry's integrated time and evaluation time.
/// Per SPRINT_3000_0001_0003 T6.
/// </summary>
private TimeSkewValidationResult ValidateVerificationTimeSkew(AttestorEntry entry, DateTimeOffset evaluationTime)
{
var integratedTime = entry.Log.IntegratedTimeUtc;
return _timeSkewValidator.Validate(integratedTime, evaluationTime);
}
public Task<AttestorEntry?> GetEntryAsync(string rekorUuid, bool refreshProof, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(rekorUuid))

View File

@@ -0,0 +1,226 @@
// -----------------------------------------------------------------------------
// RekorRetryWorker.cs
// Sprint: SPRINT_3000_0001_0002_rekor_retry_queue_metrics
// Task: T7
// Description: Background service for processing the Rekor retry queue
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Attestor.Core.Observability;
using StellaOps.Attestor.Core.Options;
using StellaOps.Attestor.Core.Queue;
using StellaOps.Attestor.Core.Rekor;
using StellaOps.Attestor.Core.Submission;
namespace StellaOps.Attestor.Infrastructure.Workers;
/// <summary>
/// Background service for processing the Rekor submission retry queue.
/// Polls the queue at a fixed interval, claims a batch of due items, submits
/// each to the configured Rekor backend, and reports outcomes back to the queue.
/// </summary>
public sealed class RekorRetryWorker : BackgroundService
{
    private readonly IRekorSubmissionQueue _queue;
    private readonly IRekorClient _rekorClient;
    private readonly RekorQueueOptions _options;
    private readonly AttestorOptions _attestorOptions;
    private readonly AttestorMetrics _metrics;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<RekorRetryWorker> _logger;

    public RekorRetryWorker(
        IRekorSubmissionQueue queue,
        IRekorClient rekorClient,
        IOptions<RekorQueueOptions> queueOptions,
        IOptions<AttestorOptions> attestorOptions,
        AttestorMetrics metrics,
        TimeProvider timeProvider,
        ILogger<RekorRetryWorker> logger)
    {
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        _rekorClient = rekorClient ?? throw new ArgumentNullException(nameof(rekorClient));
        _options = queueOptions?.Value ?? throw new ArgumentNullException(nameof(queueOptions));
        _attestorOptions = attestorOptions?.Value ?? throw new ArgumentNullException(nameof(attestorOptions));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        // Register queue depth callback for metrics.
        // NOTE(review): nothing unregisters this callback on disposal — if the
        // worker is ever recreated this may leak or double-report; confirm
        // RegisterQueueDepthCallback semantics.
        _metrics.RegisterQueueDepthCallback(GetCurrentQueueDepth);
    }

    // Last depth observed by ProcessBatchAsync; served to the metrics gauge
    // so the gauge never has to hit the database itself.
    private int _lastKnownQueueDepth;
    private int GetCurrentQueueDepth() => _lastKnownQueueDepth;

    /// <summary>
    /// Main poll loop: process one batch, then sleep PollIntervalMs; repeats
    /// until the host stops. Batch errors are logged and counted but never
    /// terminate the loop. Exits immediately when the queue is disabled.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Enabled)
        {
            _logger.LogInformation("Rekor retry queue is disabled");
            return;
        }
        _logger.LogInformation(
            "Rekor retry worker started with batch size {BatchSize}, poll interval {PollIntervalMs}ms",
            _options.BatchSize, _options.PollIntervalMs);
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessBatchAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Rekor retry worker error during batch processing");
                _metrics.ErrorTotal.Add(1, new("type", "rekor_retry_worker"));
            }
            try
            {
                await Task.Delay(_options.PollIntervalMs, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
        }
        _logger.LogInformation("Rekor retry worker stopped");
    }

    /// <summary>
    /// Refreshes the queue-depth gauge, dequeues up to BatchSize due items,
    /// processes each sequentially, and periodically purges old dead letters.
    /// </summary>
    private async Task ProcessBatchAsync(CancellationToken stoppingToken)
    {
        // Update queue depth gauge.
        // NOTE(review): this treats the depth result as a structured type with
        // TotalWaiting/Pending/Submitting/Retrying/DeadLetter, but
        // PostgresRekorSubmissionQueue in this same changeset implements
        // GetQueueDepthAsync returning a plain int — one of the two sides does
        // not match IRekorSubmissionQueue; reconcile against the interface.
        var depth = await _queue.GetQueueDepthAsync(stoppingToken);
        _lastKnownQueueDepth = depth.TotalWaiting;
        if (depth.TotalWaiting == 0)
        {
            return;
        }
        _logger.LogDebug(
            "Queue depth: pending={Pending}, submitting={Submitting}, retrying={Retrying}, dead_letter={DeadLetter}",
            depth.Pending, depth.Submitting, depth.Retrying, depth.DeadLetter);
        // Process batch
        var items = await _queue.DequeueAsync(_options.BatchSize, stoppingToken);
        if (items.Count == 0)
        {
            return;
        }
        _logger.LogDebug("Processing {Count} items from Rekor queue", items.Count);
        foreach (var item in items)
        {
            if (stoppingToken.IsCancellationRequested)
                break;
            await ProcessItemAsync(item, stoppingToken);
        }
        // Purge old dead letter items periodically.
        // NOTE(review): PurgeDeadLetterAsync does not exist on the Postgres
        // queue implementation in this changeset (which exposes
        // PurgeSubmittedAsync(TimeSpan) instead) — verify against the interface.
        if (_options.DeadLetterRetentionDays > 0 && depth.DeadLetter > 0)
        {
            await _queue.PurgeDeadLetterAsync(_options.DeadLetterRetentionDays, stoppingToken);
        }
    }

    /// <summary>
    /// Submits a single claimed item to Rekor. On success the item is marked
    /// submitted; on failure it is routed to retry or dead letter based on the
    /// attempt budget carried on the item itself.
    /// </summary>
    private async Task ProcessItemAsync(RekorQueueItem item, CancellationToken ct)
    {
        var attemptNumber = item.AttemptCount + 1;
        _logger.LogDebug(
            "Processing Rekor queue item {Id}, attempt {Attempt}/{MaxAttempts}, backend={Backend}",
            item.Id, attemptNumber, item.MaxAttempts, item.Backend);
        // NOTE(review): PostgresRekorSubmissionQueue.MarkFailedAsync also
        // increments RekorRetryAttemptsTotal on each retry — recording it here
        // as well may double-count attempts; confirm which side should own it.
        _metrics.RekorRetryAttemptsTotal.Add(1,
            new("backend", item.Backend),
            new("attempt", attemptNumber.ToString()));
        try
        {
            var backend = ResolveBackend(item.Backend);
            var request = BuildSubmissionRequest(item);
            var response = await _rekorClient.SubmitAsync(request, backend, ct);
            await _queue.MarkSubmittedAsync(
                item.Id,
                response.Uuid ?? string.Empty,
                response.Index,
                ct);
            _logger.LogInformation(
                "Rekor queue item {Id} successfully submitted: UUID={RekorUuid}, Index={LogIndex}",
                item.Id, response.Uuid, response.Index);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Rekor queue item {Id} submission failed on attempt {Attempt}: {Message}",
                item.Id, attemptNumber, ex.Message);
            // NOTE(review): MarkDeadLetterAsync/MarkRetryAsync do not exist on
            // the Postgres queue implementation in this changeset, which instead
            // offers a single MarkFailedAsync(id, error) that decides retry vs
            // dead letter internally — reconcile against the interface.
            if (attemptNumber >= item.MaxAttempts)
            {
                await _queue.MarkDeadLetterAsync(item.Id, ex.Message, ct);
                _logger.LogError(
                    "Rekor queue item {Id} exceeded max attempts ({MaxAttempts}), moved to dead letter",
                    item.Id, item.MaxAttempts);
            }
            else
            {
                await _queue.MarkRetryAsync(item.Id, ex.Message, ct);
            }
        }
    }

    /// <summary>
    /// Maps a stored backend name ("primary"/"mirror") to a configured backend
    /// endpoint; throws if the name is unknown or its URL is unconfigured.
    /// NOTE(review): this constructs the RekorBackend record declared in this
    /// file's Workers namespace, which may shadow a same-named type from
    /// StellaOps.Attestor.Core.Rekor expected by IRekorClient.SubmitAsync —
    /// verify the intended type.
    /// </summary>
    private RekorBackend ResolveBackend(string backend)
    {
        return backend.ToLowerInvariant() switch
        {
            "primary" => new RekorBackend(
                _attestorOptions.Rekor.Primary.Url ?? throw new InvalidOperationException("Primary Rekor URL not configured"),
                "primary"),
            "mirror" => new RekorBackend(
                _attestorOptions.Rekor.Mirror.Url ?? throw new InvalidOperationException("Mirror Rekor URL not configured"),
                "mirror"),
            _ => throw new InvalidOperationException($"Unknown Rekor backend: {backend}")
        };
    }

    /// <summary>
    /// Rebuilds a submission request from the persisted queue item.
    /// NOTE(review): this uses the AttestorSubmissionRequest declared in this
    /// file, whose shape (flat BundleSha256) differs from the request type used
    /// by AttestorSubmissionService (request.Meta.BundleSha256) — confirm the
    /// duplicate type name is intentional.
    /// </summary>
    private static AttestorSubmissionRequest BuildSubmissionRequest(RekorQueueItem item)
    {
        return new AttestorSubmissionRequest
        {
            TenantId = item.TenantId,
            BundleSha256 = item.BundleSha256,
            DssePayload = item.DssePayload
        };
    }
}
/// <summary>
/// Simple Rekor backend configuration: the endpoint URL and a display name
/// ("primary" or "mirror").
/// NOTE(review): declared in the Workers namespace; if StellaOps.Attestor.Core.Rekor
/// (imported above) also declares a RekorBackend, this local type shadows it at
/// use sites in this file — confirm which type IRekorClient expects.
/// </summary>
public sealed record RekorBackend(string Url, string Name);
/// <summary>
/// Submission request payload reconstructed from a persisted queue item for
/// the retry worker. All members are init-only and default to empty values.
/// </summary>
public sealed class AttestorSubmissionRequest
{
    /// <summary>Tenant that owns the queued submission.</summary>
    public string TenantId { get; init; } = "";

    /// <summary>SHA-256 of the DSSE bundle being submitted.</summary>
    public string BundleSha256 { get; init; } = "";

    /// <summary>Serialized DSSE envelope bytes to send to Rekor.</summary>
    public byte[] DssePayload { get; init; } = Array.Empty<byte>();
}