product advisories, stella router improvements, test strengthening

This commit is contained in:
StellaOps Bot
2025-12-24 14:20:26 +02:00
parent 5540ce9430
commit 2c2bbf1005
171 changed files with 58943 additions and 135 deletions

View File

@@ -0,0 +1,935 @@
// ---------------------------------------------------------------------
// <copyright file="WorkerIdempotencyTests.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
// <summary>
// Idempotency tests: same job processed twice → single execution result
// </summary>
// ---------------------------------------------------------------------
using System.Collections.Concurrent;
using FluentAssertions;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests.Idempotency;
/// <summary>
/// Idempotency tests for Scheduler Worker verifying that
/// same job processed twice results in single execution result.
/// </summary>
[Trait("Category", "Idempotency")]
[Trait("Sprint", "5100-0009-0008")]
public sealed class WorkerIdempotencyTests
{
    #region Basic Idempotency Tests

    /// <summary>
    /// Verifies same job ID processed twice results in single execution.
    /// </summary>
    [Fact]
    public async Task SameJobId_ProcessedTwice_SingleExecution()
    {
        // Arrange
        var executionCount = 0;
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(onExecute: _ => Interlocked.Increment(ref executionCount));
        var worker = new IdempotentWorker(jobStore, executor);
        var job = new IdempotentJob
        {
            Id = "idempotent-001",
            TenantId = "tenant-001",
            Type = "scan",
            IdempotencyKey = "unique-key-001",
            Status = IdempotentJobStatus.Pending
        };
        await jobStore.EnqueueAsync(job);

        // Act - Process twice
        await worker.ProcessAsync("idempotent-001", CancellationToken.None);
        await worker.ProcessAsync("idempotent-001", CancellationToken.None);

        // Assert
        executionCount.Should().Be(1, because: "job should only execute once");
    }

    /// <summary>
    /// Verifies completed job is not re-executed.
    /// </summary>
    [Fact]
    public async Task CompletedJob_NotReExecuted()
    {
        // Arrange
        var executionCount = 0;
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(onExecute: _ => executionCount++);
        var worker = new IdempotentWorker(jobStore, executor);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "completed-001",
            TenantId = "tenant-001",
            Type = "scan",
            Status = IdempotentJobStatus.Completed,
            CompletedAt = DateTime.UtcNow.AddMinutes(-5)
        });

        // Act
        await worker.ProcessAsync("completed-001", CancellationToken.None);

        // Assert
        executionCount.Should().Be(0);
    }

    /// <summary>
    /// Verifies idempotency key prevents duplicate job execution.
    /// </summary>
    [Fact]
    public async Task SameIdempotencyKey_PreventsDuplicateExecution()
    {
        // Arrange
        var executedJobs = new ConcurrentBag<string>();
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(onExecute: j => executedJobs.Add(j.Id));
        var worker = new IdempotentWorker(jobStore, executor);
        // Same idempotency key, different job IDs
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "job-A",
            TenantId = "tenant-001",
            Type = "scan",
            IdempotencyKey = "shared-key",
            Status = IdempotentJobStatus.Pending
        });
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "job-B",
            TenantId = "tenant-001",
            Type = "scan",
            IdempotencyKey = "shared-key",
            Status = IdempotentJobStatus.Pending
        });

        // Act
        await worker.ProcessAsync("job-A", CancellationToken.None);
        await worker.ProcessAsync("job-B", CancellationToken.None);

        // Assert - Only first job should execute
        executedJobs.Should().HaveCount(1);
        executedJobs.Should().Contain("job-A");
    }

    #endregion

    #region Concurrent Idempotency Tests

    /// <summary>
    /// Verifies concurrent processing of same job results in single execution.
    /// </summary>
    [Fact]
    public async Task ConcurrentProcessing_SameJob_SingleExecution()
    {
        // Arrange
        var executionCount = 0;
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(
            executionDelay: TimeSpan.FromMilliseconds(100),
            onExecute: _ => Interlocked.Increment(ref executionCount)
        );
        var worker = new IdempotentWorker(jobStore, executor);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "concurrent-001",
            TenantId = "tenant-001",
            Type = "scan",
            Status = IdempotentJobStatus.Pending
        });

        // Act - Process concurrently
        var tasks = Enumerable.Range(0, 10)
            .Select(_ => worker.ProcessAsync("concurrent-001", CancellationToken.None));
        await Task.WhenAll(tasks);

        // Assert
        executionCount.Should().Be(1);
    }

    /// <summary>
    /// Verifies distributed lock prevents duplicate execution.
    /// </summary>
    [Fact]
    public async Task DistributedLock_PreventsDuplicateExecution()
    {
        // Arrange
        var executionCount = 0;
        var lockProvider = new InMemoryLockProvider();
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(
            executionDelay: TimeSpan.FromMilliseconds(50),
            onExecute: _ => Interlocked.Increment(ref executionCount)
        );
        var worker = new IdempotentWorker(jobStore, executor, lockProvider);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "distributed-lock-001",
            TenantId = "tenant-001",
            Type = "scan",
            Status = IdempotentJobStatus.Pending
        });

        // Act - Simulate multiple workers
        var tasks = Enumerable.Range(0, 5)
            .Select(_ => worker.ProcessAsync("distributed-lock-001", CancellationToken.None));
        await Task.WhenAll(tasks);

        // Assert
        executionCount.Should().Be(1);
    }

    #endregion

    #region Idempotency Window Tests

    /// <summary>
    /// Verifies idempotency window allows re-execution after expiry.
    /// </summary>
    [Fact]
    public async Task IdempotencyWindow_AllowsReExecutionAfterExpiry()
    {
        // Arrange
        var executionCount = 0;
        var fakeClock = new FakeClock(DateTime.UtcNow);
        var jobStore = new IdempotentJobStore(fakeClock);
        var executor = new IdempotentJobExecutor(onExecute: _ => executionCount++);
        var worker = new IdempotentWorker(
            jobStore, executor,
            idempotencyWindow: TimeSpan.FromMinutes(5),
            clock: fakeClock
        );
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "window-001",
            TenantId = "tenant-001",
            Type = "scan",
            IdempotencyKey = "window-key",
            Status = IdempotentJobStatus.Pending
        });

        // Act - First execution
        await worker.ProcessAsync("window-001", CancellationToken.None);
        executionCount.Should().Be(1);

        // Reset job to pending and advance time past window
        var job = await jobStore.GetByIdAsync("window-001");
        job!.Status = IdempotentJobStatus.Pending;
        await jobStore.UpdateAsync(job);
        fakeClock.Advance(TimeSpan.FromMinutes(10));

        // Second execution after window expiry
        await worker.ProcessAsync("window-001", CancellationToken.None);

        // Assert
        executionCount.Should().Be(2, because: "re-execution allowed after window expiry");
    }

    /// <summary>
    /// Verifies idempotency check within window.
    /// </summary>
    [Fact]
    public async Task IdempotencyCheck_WithinWindow_PreventsExecution()
    {
        // Arrange
        var executionCount = 0;
        var fakeClock = new FakeClock(DateTime.UtcNow);
        var idempotencyStore = new IdempotencyKeyStore();
        var jobStore = new IdempotentJobStore(fakeClock);
        var executor = new IdempotentJobExecutor(onExecute: _ => executionCount++);
        var worker = new IdempotentWorker(
            jobStore, executor,
            idempotencyWindow: TimeSpan.FromMinutes(5),
            clock: fakeClock,
            idempotencyStore: idempotencyStore
        );
        // Record first execution
        idempotencyStore.Record("key-within-window", fakeClock.UtcNow);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "within-window-001",
            TenantId = "tenant-001",
            Type = "scan",
            IdempotencyKey = "key-within-window",
            Status = IdempotentJobStatus.Pending
        });
        // Advance time but stay within window
        fakeClock.Advance(TimeSpan.FromMinutes(2));

        // Act
        await worker.ProcessAsync("within-window-001", CancellationToken.None);

        // Assert
        executionCount.Should().Be(0, because: "execution prevented within idempotency window");
    }

    #endregion

    #region Result Caching Tests

    /// <summary>
    /// Verifies duplicate request returns cached result.
    /// </summary>
    [Fact]
    public async Task DuplicateRequest_ReturnsCachedResult()
    {
        // Arrange
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(result: """{"findings": 42}""");
        var worker = new IdempotentWorker(jobStore, executor);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "cached-001",
            TenantId = "tenant-001",
            Type = "scan",
            IdempotencyKey = "cache-key",
            Status = IdempotentJobStatus.Pending
        });

        // Act
        var result1 = await worker.ProcessAndGetResultAsync("cached-001", CancellationToken.None);
        var result2 = await worker.ProcessAndGetResultAsync("cached-001", CancellationToken.None);

        // Assert
        result1.Should().Be(result2);
        result1.Should().Contain("findings");
    }

    /// <summary>
    /// Verifies result cache is tenant-isolated.
    /// </summary>
    [Fact]
    public async Task ResultCache_IsTenantIsolated()
    {
        // Arrange
        var jobStore = new IdempotentJobStore();
        var executedTenants = new List<string>();
        var executor = new IdempotentJobExecutor(onExecute: j =>
        {
            lock (executedTenants) executedTenants.Add(j.TenantId);
        });
        var worker = new IdempotentWorker(jobStore, executor);
        // Same idempotency key, different tenants
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "tenant-a-job",
            TenantId = "tenant-A",
            Type = "scan",
            IdempotencyKey = "shared-key",
            Status = IdempotentJobStatus.Pending
        });
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "tenant-b-job",
            TenantId = "tenant-B",
            Type = "scan",
            IdempotencyKey = "shared-key",
            Status = IdempotentJobStatus.Pending
        });

        // Act
        await worker.ProcessAsync("tenant-a-job", CancellationToken.None);
        await worker.ProcessAsync("tenant-b-job", CancellationToken.None);

        // Assert - Both should execute because of tenant isolation
        executedTenants.Should().HaveCount(2);
        executedTenants.Should().Contain("tenant-A");
        executedTenants.Should().Contain("tenant-B");
    }

    #endregion

    #region State Transition Tests

    /// <summary>
    /// Verifies running job blocks duplicate processing.
    /// </summary>
    [Fact]
    public async Task RunningJob_BlocksDuplicateProcessing()
    {
        // Arrange
        var executionCount = 0;
        var executionStarted = new TaskCompletionSource<bool>();
        var continueExecution = new TaskCompletionSource<bool>();
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(
            onExecuteStart: () => executionStarted.SetResult(true),
            onExecuteWait: () => continueExecution.Task,
            onExecute: _ => executionCount++
        );
        var worker = new IdempotentWorker(jobStore, executor);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "running-block-001",
            TenantId = "tenant-001",
            Type = "scan",
            Status = IdempotentJobStatus.Pending
        });

        // Act - Start first execution
        var task1 = worker.ProcessAsync("running-block-001", CancellationToken.None);
        await executionStarted.Task;
        // Try second execution while first is running
        var task2 = worker.ProcessAsync("running-block-001", CancellationToken.None);
        // Complete first execution
        continueExecution.SetResult(true);
        await Task.WhenAll(task1, task2);

        // Assert
        executionCount.Should().Be(1);
    }

    /// <summary>
    /// Verifies failed job with no retries left is not re-processed.
    /// </summary>
    [Fact]
    public async Task FailedJobNoRetries_NotReProcessed()
    {
        // Arrange
        var executionCount = 0;
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(onExecute: _ => executionCount++);
        var worker = new IdempotentWorker(jobStore, executor);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "failed-no-retry-001",
            TenantId = "tenant-001",
            Type = "scan",
            Status = IdempotentJobStatus.Failed,
            RetryCount = 3,
            MaxRetries = 3
        });

        // Act
        await worker.ProcessAsync("failed-no-retry-001", CancellationToken.None);

        // Assert
        executionCount.Should().Be(0);
    }

    #endregion

    #region Payload Hash Tests

    /// <summary>
    /// Verifies same payload hash is detected as duplicate.
    /// </summary>
    [Fact]
    public async Task SamePayloadHash_DetectedAsDuplicate()
    {
        // Arrange
        var executionCount = 0;
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(onExecute: _ => executionCount++);
        var worker = new IdempotentWorker(jobStore, executor, usePayloadHashing: true);
        var payload = """{"target": "image:v1.0", "options": {"deep": true}}""";
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "payload-hash-001",
            TenantId = "tenant-001",
            Type = "scan",
            Payload = payload,
            Status = IdempotentJobStatus.Pending
        });
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "payload-hash-002",
            TenantId = "tenant-001",
            Type = "scan",
            Payload = payload, // Same payload
            Status = IdempotentJobStatus.Pending
        });

        // Act
        await worker.ProcessAsync("payload-hash-001", CancellationToken.None);
        await worker.ProcessAsync("payload-hash-002", CancellationToken.None);

        // Assert
        executionCount.Should().Be(1);
    }

    /// <summary>
    /// Verifies different payload is not detected as duplicate.
    /// </summary>
    [Fact]
    public async Task DifferentPayload_NotDuplicate()
    {
        // Arrange
        var executionCount = 0;
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(onExecute: _ => executionCount++);
        var worker = new IdempotentWorker(jobStore, executor, usePayloadHashing: true);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "diff-payload-001",
            TenantId = "tenant-001",
            Type = "scan",
            Payload = """{"target": "image:v1.0"}""",
            Status = IdempotentJobStatus.Pending
        });
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "diff-payload-002",
            TenantId = "tenant-001",
            Type = "scan",
            Payload = """{"target": "image:v2.0"}""", // Different payload
            Status = IdempotentJobStatus.Pending
        });

        // Act
        await worker.ProcessAsync("diff-payload-001", CancellationToken.None);
        await worker.ProcessAsync("diff-payload-002", CancellationToken.None);

        // Assert
        executionCount.Should().Be(2);
    }

    #endregion

    #region Exactly-Once Semantics Tests

    /// <summary>
    /// Verifies exactly-once delivery semantics.
    /// </summary>
    [Fact]
    public async Task ExactlyOnce_Delivery()
    {
        // Arrange
        var deliveryCount = new ConcurrentDictionary<string, int>();
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(
            onExecute: j => deliveryCount.AddOrUpdate(j.Id, 1, (_, c) => c + 1)
        );
        var worker = new IdempotentWorker(jobStore, executor);
        var jobIds = Enumerable.Range(0, 10).Select(i => $"exactly-once-{i}").ToList();
        foreach (var id in jobIds)
        {
            await jobStore.EnqueueAsync(new IdempotentJob
            {
                Id = id,
                TenantId = "tenant-001",
                Type = "scan",
                Status = IdempotentJobStatus.Pending
            });
        }

        // Act - Process each job multiple times concurrently
        var tasks = jobIds.SelectMany(id =>
            Enumerable.Range(0, 5).Select(_ => worker.ProcessAsync(id, CancellationToken.None))
        );
        await Task.WhenAll(tasks);

        // Assert - Each job delivered exactly once
        deliveryCount.Should().HaveCount(10);
        deliveryCount.Values.Should().AllBeEquivalentTo(1);
    }

    /// <summary>
    /// Verifies transactional outbox pattern for exactly-once.
    /// </summary>
    [Fact]
    public async Task TransactionalOutbox_EnsuresExactlyOnce()
    {
        // Arrange
        var processedMessages = new ConcurrentBag<string>();
        var outbox = new InMemoryOutbox();
        var jobStore = new IdempotentJobStore();
        var executor = new IdempotentJobExecutor(
            onExecute: j =>
            {
                outbox.Add(j.Id);
                processedMessages.Add(j.Id);
            }
        );
        var worker = new IdempotentWorker(jobStore, executor, outbox: outbox);
        await jobStore.EnqueueAsync(new IdempotentJob
        {
            Id = "outbox-001",
            TenantId = "tenant-001",
            Type = "scan",
            Status = IdempotentJobStatus.Pending
        });

        // Act - Simulate retry after apparent failure
        await worker.ProcessAsync("outbox-001", CancellationToken.None);
        await worker.ProcessAsync("outbox-001", CancellationToken.None); // Retry
        // Assert - Message in outbox exactly once
        outbox.Messages.Should().HaveCount(1);
        outbox.Messages.Should().Contain("outbox-001");
    }

    #endregion
}
#region Test Infrastructure
/// <summary>
/// Job status for idempotency testing.
/// </summary>
public enum IdempotentJobStatus
{
    /// <summary>Enqueued and eligible for acquisition by a worker.</summary>
    Pending,
    /// <summary>Currently held by a worker; blocks duplicate acquisition.</summary>
    Running,
    /// <summary>Finished successfully; must never be re-executed.</summary>
    Completed,
    /// <summary>Execution failed; may be retried while retries remain.</summary>
    Failed
}
/// <summary>
/// Job model for idempotency testing.
/// </summary>
public sealed class IdempotentJob
{
    /// <summary>Unique job identifier used as the store key.</summary>
    public required string Id { get; set; }
    /// <summary>Owning tenant; used for tenant isolation of results.</summary>
    public required string TenantId { get; set; }
    /// <summary>Job type label (e.g. "scan"); informational in these tests.</summary>
    public required string Type { get; set; }
    /// <summary>Raw payload; hashed for duplicate detection when enabled.</summary>
    public string Payload { get; set; } = "{}";
    /// <summary>Optional caller-supplied deduplication key; null falls back to tenant+id.</summary>
    public string? IdempotencyKey { get; set; }
    /// <summary>Current lifecycle state.</summary>
    public IdempotentJobStatus Status { get; set; } = IdempotentJobStatus.Pending;
    /// <summary>Retries consumed so far.</summary>
    public int RetryCount { get; set; } = 0;
    /// <summary>Retry budget; a Failed job with RetryCount >= MaxRetries is terminal.</summary>
    public int MaxRetries { get; set; } = 3;
    /// <summary>Execution result recorded on completion.</summary>
    public string? Result { get; set; }
    /// <summary>Creation timestamp (wall clock at construction).</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    /// <summary>Completion timestamp; null until completed.</summary>
    public DateTime? CompletedAt { get; set; }
}
/// <summary>
/// In-memory job store for idempotency testing. Jobs are keyed by
/// <see cref="IdempotentJob.Id"/>; the Pending-to-Running transition is
/// serialized through a private gate so exactly one caller can acquire a job.
/// </summary>
public sealed class IdempotentJobStore
{
    private readonly ConcurrentDictionary<string, IdempotentJob> _jobs = new();
    private readonly FakeClock? _clock;
    private readonly object _lock = new();

    /// <summary>Creates a store; an optional fake clock drives completion timestamps.</summary>
    public IdempotentJobStore(FakeClock? clock = null) => _clock = clock;

    /// <summary>Adds or replaces the job under its ID.</summary>
    public Task EnqueueAsync(IdempotentJob job)
    {
        _jobs[job.Id] = job;
        return Task.CompletedTask;
    }

    /// <summary>Looks up a job by ID; null when absent.</summary>
    public Task<IdempotentJob?> GetByIdAsync(string id)
        => Task.FromResult(_jobs.TryGetValue(id, out var job) ? job : null);

    /// <summary>Stores the given job state under its ID.</summary>
    public Task UpdateAsync(IdempotentJob job)
    {
        _jobs[job.Id] = job;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Atomically transitions a Pending job to Running. Returns false when the
    /// job is missing or not Pending, so concurrent callers cannot both win.
    /// </summary>
    public Task<bool> TryAcquireAsync(string id)
    {
        lock (_lock)
        {
            if (_jobs.TryGetValue(id, out var job) && job.Status == IdempotentJobStatus.Pending)
            {
                job.Status = IdempotentJobStatus.Running;
                return Task.FromResult(true);
            }
            return Task.FromResult(false);
        }
    }

    /// <summary>Marks the job Completed, recording its result and completion time.</summary>
    public Task CompleteAsync(string id, string result)
    {
        if (_jobs.TryGetValue(id, out var job))
        {
            job.Status = IdempotentJobStatus.Completed;
            job.Result = result;
            job.CompletedAt = _clock?.UtcNow ?? DateTime.UtcNow;
        }
        return Task.CompletedTask;
    }
}
/// <summary>
/// Configurable job executor for idempotency testing. Hooks let tests observe
/// execution start, park the execution mid-flight, and count executions; the
/// returned result string is fixed at construction.
/// </summary>
public sealed class IdempotentJobExecutor
{
    private readonly string _result;
    private readonly TimeSpan _executionDelay;
    private readonly Action<IdempotentJob>? _onExecute;
    private readonly Action? _onExecuteStart;
    private readonly Func<Task>? _onExecuteWait;

    /// <summary>
    /// Creates an executor. All hooks are optional; a default delay of zero
    /// means the execution completes without waiting.
    /// </summary>
    public IdempotentJobExecutor(
        string result = """{"status": "success"}""",
        TimeSpan executionDelay = default,
        Action<IdempotentJob>? onExecute = null,
        Action? onExecuteStart = null,
        Func<Task>? onExecuteWait = null)
    {
        _result = result;
        _executionDelay = executionDelay;
        _onExecute = onExecute;
        _onExecuteStart = onExecuteStart;
        _onExecuteWait = onExecuteWait;
    }

    /// <summary>
    /// Runs the hook sequence: start notification, optional external wait,
    /// optional delay, then the execution callback. Returns the fixed result.
    /// </summary>
    public async Task<string> ExecuteAsync(IdempotentJob job, CancellationToken cancellationToken)
    {
        _onExecuteStart?.Invoke();
        // Await the external gate when provided; otherwise continue immediately.
        await (_onExecuteWait?.Invoke() ?? Task.CompletedTask);
        if (_executionDelay > TimeSpan.Zero)
        {
            await Task.Delay(_executionDelay, cancellationToken);
        }
        _onExecute?.Invoke(job);
        return _result;
    }
}
/// <summary>
/// In-memory stand-in for a distributed lock provider. Each key maps to a
/// single-slot semaphore; acquisition is non-blocking (zero timeout) and the
/// returned handle releases the lock when disposed.
/// </summary>
public sealed class InMemoryLockProvider
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();

    /// <summary>
    /// Attempts to take the lock for <paramref name="key"/> without waiting.
    /// Returns a disposable release handle, or null when the lock is held.
    /// </summary>
    public async Task<IDisposable?> TryAcquireAsync(string key, CancellationToken cancellationToken)
    {
        var gate = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        var acquired = await gate.WaitAsync(0, cancellationToken);
        return acquired ? new Releaser(gate) : null;
    }

    /// <summary>Releases the semaphore exactly once, even under double-dispose.</summary>
    private sealed class Releaser : IDisposable
    {
        private SemaphoreSlim? _gate;

        public Releaser(SemaphoreSlim gate) => _gate = gate;

        public void Dispose()
        {
            // Exchange guarantees a single release regardless of how many
            // times Dispose is called.
            var gate = Interlocked.Exchange(ref _gate, null);
            gate?.Release();
        }
    }
}
/// <summary>
/// Idempotency key store: maps each key to the time it was last recorded so
/// callers can reject duplicates that fall inside a configured window.
/// </summary>
public sealed class IdempotencyKeyStore
{
    private readonly ConcurrentDictionary<string, DateTime> _keys = new();

    /// <summary>Records (or refreshes) the timestamp for a key.</summary>
    public void Record(string key, DateTime timestamp) => _keys[key] = timestamp;

    /// <summary>True when the key has ever been recorded.</summary>
    public bool HasKey(string key) => _keys.ContainsKey(key);

    /// <summary>
    /// True when the key was recorded strictly less than <paramref name="window"/>
    /// before <paramref name="now"/>; false for unknown keys.
    /// </summary>
    public bool IsWithinWindow(string key, DateTime now, TimeSpan window)
        => _keys.TryGetValue(key, out var recordedAt) && now - recordedAt < window;
}
/// <summary>
/// Fake clock for testing: a manually advanced UTC timestamp so tests can
/// simulate the passage of time deterministically.
/// </summary>
public sealed class FakeClock
{
    /// <summary>Creates the clock frozen at <paramref name="initial"/>.</summary>
    public FakeClock(DateTime initial)
    {
        UtcNow = initial;
    }

    /// <summary>The current simulated UTC time.</summary>
    public DateTime UtcNow { get; private set; }

    /// <summary>Moves the clock forward by <paramref name="duration"/>.</summary>
    public void Advance(TimeSpan duration)
    {
        UtcNow += duration;
    }
}
/// <summary>
/// In-memory outbox for exactly-once testing: message IDs are stored in a
/// set-like dictionary so a duplicate Add is rejected.
/// </summary>
public sealed class InMemoryOutbox
{
    private readonly ConcurrentDictionary<string, bool> _messages = new();

    /// <summary>Snapshot of the message IDs currently held (new list per access).</summary>
    public IReadOnlyCollection<string> Messages
    {
        get { return _messages.Keys.ToList(); }
    }

    /// <summary>Adds a message ID; returns false when it was already present.</summary>
    public bool Add(string messageId)
    {
        return _messages.TryAdd(messageId, true);
    }

    /// <summary>True when the message ID is in the outbox.</summary>
    public bool Contains(string messageId)
    {
        return _messages.ContainsKey(messageId);
    }
}
/// <summary>
/// Idempotent worker for testing. Guards against duplicate execution via:
/// terminal-status checks, an optional external <see cref="IdempotencyKeyStore"/>,
/// an internal tenant-scoped executed-key map (honoring the idempotency window),
/// an optional outbox, optional payload hashing, an optional distributed lock,
/// and the store's atomic Pending-to-Running transition.
/// </summary>
public sealed class IdempotentWorker
{
    private readonly IdempotentJobStore _jobStore;
    private readonly IdempotentJobExecutor _executor;
    private readonly InMemoryLockProvider? _lockProvider;
    private readonly TimeSpan _idempotencyWindow;
    private readonly FakeClock? _clock;
    private readonly IdempotencyKeyStore? _idempotencyStore;
    private readonly bool _usePayloadHashing;
    private readonly InMemoryOutbox? _outbox;
    // Results keyed by the tenant-scoped idempotency key, so identical keys
    // from different tenants never share a cached result.
    private readonly ConcurrentDictionary<string, string> _resultCache = new();
    private readonly ConcurrentDictionary<string, bool> _payloadHashes = new();
    // Completion times of tenant-scoped idempotency keys. This is what makes
    // duplicate keys get rejected within the window even when no external
    // IdempotencyKeyStore is supplied (previously a second job carrying the
    // same key would execute again).
    private readonly ConcurrentDictionary<string, DateTime> _executedKeys = new();

    /// <summary>
    /// Creates the worker. A default (zero) idempotency window is normalized
    /// to five minutes; all collaborators except the store and executor are optional.
    /// </summary>
    public IdempotentWorker(
        IdempotentJobStore jobStore,
        IdempotentJobExecutor executor,
        InMemoryLockProvider? lockProvider = null,
        TimeSpan idempotencyWindow = default,
        FakeClock? clock = null,
        IdempotencyKeyStore? idempotencyStore = null,
        bool usePayloadHashing = false,
        InMemoryOutbox? outbox = null)
    {
        _jobStore = jobStore;
        _executor = executor;
        _lockProvider = lockProvider;
        _idempotencyWindow = idempotencyWindow == default ? TimeSpan.FromMinutes(5) : idempotencyWindow;
        _clock = clock;
        _idempotencyStore = idempotencyStore;
        _usePayloadHashing = usePayloadHashing;
        _outbox = outbox;
    }

    /// <summary>
    /// Processes a job by ID. Returns true only when this call actually
    /// executed the job; all duplicate/terminal paths return false.
    /// </summary>
    public async Task<bool> ProcessAsync(string jobId, CancellationToken cancellationToken)
    {
        var job = await _jobStore.GetByIdAsync(jobId);
        if (job == null) return false;

        // Terminal states: completed jobs never rerun; failed jobs only
        // rerun while retries remain.
        if (job.Status == IdempotentJobStatus.Completed) return false;
        if (job.Status == IdempotentJobStatus.Failed && job.RetryCount >= job.MaxRetries)
            return false;

        var idempotencyKey = GetIdempotencyKey(job);
        var scopedKey = GetScopedIdempotencyKey(job);
        var now = _clock?.UtcNow ?? DateTime.UtcNow;

        // External idempotency store uses the raw key, as recorded by callers.
        if (_idempotencyStore != null
            && _idempotencyStore.IsWithinWindow(idempotencyKey, now, _idempotencyWindow))
            return false;

        // Internal duplicate-key guard: a different job carrying the same
        // idempotency key is rejected while inside the window. The key is
        // tenant-scoped so tenants remain isolated from each other.
        if (_executedKeys.TryGetValue(scopedKey, out var executedAt)
            && now - executedAt < _idempotencyWindow)
            return false;

        // Outbox check: a message already emitted means the job ran.
        if (_outbox != null && _outbox.Contains(jobId))
            return false;

        // Payload-hash dedup: first writer wins via TryAdd.
        if (_usePayloadHashing)
        {
            var payloadHash = ComputePayloadHash(job);
            if (!_payloadHashes.TryAdd(payloadHash, true))
                return false;
        }

        // Distributed lock (when configured) serializes workers on the job ID.
        IDisposable? lockHandle = null;
        if (_lockProvider != null)
        {
            lockHandle = await _lockProvider.TryAcquireAsync(jobId, cancellationToken);
            if (lockHandle == null) return false;
        }

        try
        {
            // Atomic Pending -> Running transition; losers of a race stop here.
            if (!await _jobStore.TryAcquireAsync(jobId))
                return false;

            var result = await _executor.ExecuteAsync(job, cancellationToken);

            await _jobStore.CompleteAsync(jobId, result);
            _resultCache[scopedKey] = result;

            // Record completion for both the internal guard and the external store.
            var completedAt = _clock?.UtcNow ?? DateTime.UtcNow;
            _executedKeys[scopedKey] = completedAt;
            _idempotencyStore?.Record(idempotencyKey, completedAt);
            return true;
        }
        finally
        {
            lockHandle?.Dispose();
        }
    }

    /// <summary>
    /// Processes a job and returns its result, serving repeated calls from the
    /// tenant-scoped result cache.
    /// </summary>
    public async Task<string?> ProcessAndGetResultAsync(string jobId, CancellationToken cancellationToken)
    {
        var job = await _jobStore.GetByIdAsync(jobId);
        if (job == null) return null;

        var scopedKey = GetScopedIdempotencyKey(job);
        if (_resultCache.TryGetValue(scopedKey, out var cachedResult))
            return cachedResult;

        await ProcessAsync(jobId, cancellationToken);
        _resultCache.TryGetValue(scopedKey, out var result);
        return result ?? job.Result;
    }

    /// <summary>Caller-supplied key, or a tenant+id fallback when absent.</summary>
    private string GetIdempotencyKey(IdempotentJob job)
    {
        return job.IdempotencyKey ?? $"{job.TenantId}:{job.Id}";
    }

    /// <summary>
    /// Tenant-scoped variant used for internal caches, so the same caller key
    /// used by two tenants never collides.
    /// </summary>
    private string GetScopedIdempotencyKey(IdempotentJob job)
    {
        return $"{job.TenantId}:{GetIdempotencyKey(job)}";
    }

    /// <summary>SHA-256 over tenant, type, and payload, hex-encoded.</summary>
    private static string ComputePayloadHash(IdempotentJob job)
    {
        var combined = $"{job.TenantId}:{job.Type}:{job.Payload}";
        var hash = System.Security.Cryptography.SHA256.HashData(System.Text.Encoding.UTF8.GetBytes(combined));
        return Convert.ToHexString(hash);
    }
}
#endregion

View File

@@ -0,0 +1,950 @@
// ---------------------------------------------------------------------
// <copyright file="WorkerOTelCorrelationTests.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
// <summary>
// OTel correlation tests: verify trace spans across job lifecycle
// (enqueue → pick → execute → complete)
// </summary>
// ---------------------------------------------------------------------
using System.Collections.Concurrent;
using System.Diagnostics;
using FluentAssertions;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests.Observability;
/// <summary>
/// OTel correlation tests for Scheduler Worker verifying trace spans
/// across the complete job lifecycle: enqueue → pick → execute → complete.
/// </summary>
[Trait("Category", "Observability")]
[Trait("Sprint", "5100-0009-0008")]
public sealed class WorkerOTelCorrelationTests : IDisposable
{
private readonly ActivityListener _listener;
private readonly ConcurrentBag<Activity> _capturedActivities;
private readonly ActivitySource _source;
// Installs an ActivityListener that records every stopped activity from any
// "StellaOps*" ActivitySource into _capturedActivities for assertion.
public WorkerOTelCorrelationTests()
{
    _capturedActivities = new ConcurrentBag<Activity>();
    _source = new ActivitySource("StellaOps.Scheduler.Worker.Tests");
    _listener = new ActivityListener
    {
        // Listen to every StellaOps source, not just the test source, so
        // spans emitted by the traced store/executor/worker are captured too.
        ShouldListenTo = source => source.Name.StartsWith("StellaOps", StringComparison.OrdinalIgnoreCase),
        // Sample everything so tags and parent links are always populated.
        Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
        ActivityStopped = activity => _capturedActivities.Add(activity)
    };
    ActivitySource.AddActivityListener(_listener);
}
// Tears down the listener and source after each test (xUnit creates a new
// instance per test, so Dispose runs per test).
public void Dispose()
{
    _listener.Dispose();
    _source.Dispose();
}
#region Trace Continuity Tests
/// <summary>
/// Verifies trace ID is preserved across job lifecycle stages.
/// </summary>
[Fact]
public async Task TraceId_PreservedAcrossJobLifecycle()
{
    // Arrange
    // NOTE(review): ClearCapturedActivities and the Traced* helpers are
    // defined later in this file (outside this excerpt).
    ClearCapturedActivities();
    var jobStore = new TracedJobStore(_source);
    var executor = new TracedJobExecutor(_source);
    var worker = new TracedSchedulerWorker(jobStore, executor, _source);
    // Create a parent activity to establish trace context
    using var parentActivity = _source.StartActivity("job.lifecycle.test");
    parentActivity.Should().NotBeNull();
    var expectedTraceId = parentActivity!.TraceId;
    var job = new TracedJob
    {
        Id = "trace-lifecycle-001",
        TenantId = "tenant-001",
        Type = "scan",
        TraceId = expectedTraceId.ToString()
    };

    // Act
    await jobStore.EnqueueAsync(job);
    await worker.ProcessAsync(job.Id, CancellationToken.None);

    // Assert - All activities should share the same trace ID
    // Only spans tagged with this job's id are checked, so unrelated spans
    // from other listeners cannot fail the assertion.
    var jobActivities = _capturedActivities
        .Where(a => a.Tags.Any(t => t.Key == "job_id" && t.Value == job.Id))
        .ToList();
    jobActivities.Should().NotBeEmpty();
    foreach (var activity in jobActivities)
    {
        activity.TraceId.Should().Be(expectedTraceId,
            because: $"activity '{activity.OperationName}' should preserve trace context");
    }
}
/// <summary>
/// Verifies parent-child relationships form correct span hierarchy.
/// </summary>
[Fact]
public async Task SpanHierarchy_FormsCorrectParentChildChain()
{
    // Arrange
    ClearCapturedActivities();
    var jobStore = new TracedJobStore(_source);
    var executor = new TracedJobExecutor(_source);
    var worker = new TracedSchedulerWorker(jobStore, executor, _source);
    var job = new TracedJob
    {
        Id = "hierarchy-001",
        TenantId = "tenant-001",
        Type = "scan"
    };

    // Act
    using var rootActivity = _source.StartActivity("job.lifecycle.root");
    await jobStore.EnqueueAsync(job);
    await worker.ProcessAsync(job.Id, CancellationToken.None);
    rootActivity?.Stop();

    // Assert - Build parent-child relationships
    var activities = _capturedActivities.ToList();
    // Activity.Id can be null; keying on `a.Id ?? ""` would throw
    // ArgumentException from ToDictionary as soon as two such activities are
    // captured. Index only activities that actually have an ID.
    var activityMap = activities
        .Where(a => !string.IsNullOrEmpty(a.Id))
        .ToDictionary(a => a.Id!, a => a);

    // Verify each child has valid parent (except root)
    foreach (var activity in activities.Where(a => !string.IsNullOrEmpty(a.ParentId)))
    {
        // Parent should exist in our captured activities or be the root
        (activityMap.ContainsKey(activity.ParentId!) || activity.ParentId == rootActivity?.Id)
            .Should().BeTrue(
                because: $"activity '{activity.OperationName}' should have valid parent");
    }
}
#endregion
#region Job Lifecycle Span Tests
/// <summary>
/// Verifies enqueue operation creates correctly named span.
/// </summary>
[Fact]
public async Task EnqueueOperation_CreatesNamedSpan()
{
// Arrange
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
var job = new TracedJob
{
Id = "enqueue-span-001",
TenantId = "tenant-001",
Type = "scan"
};
// Act
await jobStore.EnqueueAsync(job);
// Assert
var enqueueSpan = _capturedActivities
.FirstOrDefault(a => a.OperationName.Contains("enqueue", StringComparison.OrdinalIgnoreCase));
enqueueSpan.Should().NotBeNull();
enqueueSpan!.Tags.Should().Contain(t => t.Key == "job_id" && t.Value == job.Id);
}
/// <summary>
/// Verifies pick operation creates correctly named span.
/// </summary>
[Fact]
public async Task PickOperation_CreatesNamedSpan()
{
    // Arrange
    ClearCapturedActivities();
    var jobStore = new TracedJobStore(_source);
    var executor = new TracedJobExecutor(_source);
    var worker = new TracedSchedulerWorker(jobStore, executor, _source);
    var job = new TracedJob
    {
        Id = "pick-span-001",
        TenantId = "tenant-001",
        Type = "scan"
    };
    await jobStore.EnqueueAsync(job);

    // Act
    // Clear again so the enqueue span from Arrange is excluded from assertions.
    ClearCapturedActivities();
    await worker.ProcessAsync(job.Id, CancellationToken.None);

    // Assert
    // Accepts any of the common names for the pick stage rather than pinning one.
    var pickSpan = _capturedActivities
        .FirstOrDefault(a => a.OperationName.Contains("pick", StringComparison.OrdinalIgnoreCase)
            || a.OperationName.Contains("dequeue", StringComparison.OrdinalIgnoreCase)
            || a.OperationName.Contains("acquire", StringComparison.OrdinalIgnoreCase));
    pickSpan.Should().NotBeNull();
}
/// <summary>
/// Verifies execute operation creates correctly named span.
/// </summary>
[Fact]
public async Task ExecuteOperation_CreatesNamedSpan()
{
    // Arrange
    ClearCapturedActivities();
    var jobStore = new TracedJobStore(_source);
    var executor = new TracedJobExecutor(_source);
    var worker = new TracedSchedulerWorker(jobStore, executor, _source);
    var job = new TracedJob
    {
        Id = "execute-span-001",
        TenantId = "tenant-001",
        Type = "scan"
    };
    await jobStore.EnqueueAsync(job);

    // Act
    // Clear again so only spans emitted by ProcessAsync are asserted on.
    ClearCapturedActivities();
    await worker.ProcessAsync(job.Id, CancellationToken.None);

    // Assert
    var executeSpan = _capturedActivities
        .FirstOrDefault(a => a.OperationName.Contains("execute", StringComparison.OrdinalIgnoreCase));
    executeSpan.Should().NotBeNull();
    executeSpan!.Tags.Should().Contain(t => t.Key == "job_id" && t.Value == job.Id);
}
/// <summary>
/// Verifies complete operation creates correctly named span.
/// </summary>
[Fact]
public async Task CompleteOperation_CreatesNamedSpan()
{
    // Arrange
    ClearCapturedActivities();
    var jobStore = new TracedJobStore(_source);
    var executor = new TracedJobExecutor(_source);
    var worker = new TracedSchedulerWorker(jobStore, executor, _source);
    var job = new TracedJob
    {
        Id = "complete-span-001",
        TenantId = "tenant-001",
        Type = "scan"
    };
    await jobStore.EnqueueAsync(job);

    // Act
    // Clear again so only spans emitted by ProcessAsync are asserted on.
    ClearCapturedActivities();
    await worker.ProcessAsync(job.Id, CancellationToken.None);

    // Assert
    var completeSpan = _capturedActivities
        .FirstOrDefault(a => a.OperationName.Contains("complete", StringComparison.OrdinalIgnoreCase));
    completeSpan.Should().NotBeNull();
}
#endregion
#region Span Attribute Tests
/// <summary>
/// Verifies all job spans include job_id attribute.
/// </summary>
[Fact]
public async Task AllJobSpans_IncludeJobIdAttribute()
{
    // Arrange
    ClearCapturedActivities();
    var jobStore = new TracedJobStore(_source);
    var executor = new TracedJobExecutor(_source);
    var worker = new TracedSchedulerWorker(jobStore, executor, _source);
    var job = new TracedJob
    {
        Id = "attr-job-id-001",
        TenantId = "tenant-001",
        Type = "scan"
    };

    // Act
    await jobStore.EnqueueAsync(job);
    await worker.ProcessAsync(job.Id, CancellationToken.None);

    // Assert
    var jobSpans = _capturedActivities
        .Where(a => a.OperationName.Contains("job", StringComparison.OrdinalIgnoreCase)
            || a.Tags.Any(t => t.Key == "job_id"))
        .ToList();
    // Without this guard the foreach below passes vacuously when no spans
    // were captured at all (same pattern as the schedule_id test).
    jobSpans.Should().NotBeEmpty(because: "processing a job should emit at least one job span");
    foreach (var span in jobSpans)
    {
        span.Tags.Should().Contain(t => t.Key == "job_id",
            because: $"span '{span.OperationName}' should include job_id");
    }
}
/// <summary>
/// Verifies all job spans include tenant_id attribute.
/// </summary>
[Fact]
public async Task AllJobSpans_IncludeTenantIdAttribute()
{
// Arrange
const string expectedTenantId = "tenant-attr-test";
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
var executor = new TracedJobExecutor(_source);
var worker = new TracedSchedulerWorker(jobStore, executor, _source);
var job = new TracedJob
{
Id = "attr-tenant-001",
TenantId = expectedTenantId,
Type = "scan"
};
// Act
await jobStore.EnqueueAsync(job);
await worker.ProcessAsync(job.Id, CancellationToken.None);
// Assert
// Only spans that carry tenant_id are inspected; every such span must
// carry this job's tenant, never another tenant's id.
var jobSpans = _capturedActivities
.Where(a => a.Tags.Any(t => t.Key == "tenant_id"))
.ToList();
foreach (var span in jobSpans)
{
var tenantTag = span.Tags.FirstOrDefault(t => t.Key == "tenant_id");
tenantTag.Value.Should().Be(expectedTenantId);
}
}
/// <summary>
/// Verifies schedule_id attribute is included when applicable.
/// </summary>
[Fact]
public async Task JobFromSchedule_IncludesScheduleIdAttribute()
{
// Arrange
const string scheduleId = "schedule-001";
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
var executor = new TracedJobExecutor(_source);
var worker = new TracedSchedulerWorker(jobStore, executor, _source);
var job = new TracedJob
{
Id = "attr-schedule-001",
TenantId = "tenant-001",
Type = "scan",
ScheduleId = scheduleId
};
// Act
await jobStore.EnqueueAsync(job);
await worker.ProcessAsync(job.Id, CancellationToken.None);
// Assert
var jobSpans = _capturedActivities
.Where(a => a.Tags.Any(t => t.Key == "schedule_id"))
.ToList();
// NotBeEmpty guards against the foreach below passing vacuously.
jobSpans.Should().NotBeEmpty();
foreach (var span in jobSpans)
{
var scheduleTag = span.Tags.FirstOrDefault(t => t.Key == "schedule_id");
scheduleTag.Value.Should().Be(scheduleId);
}
}
#endregion
#region Error Span Tests
/// <summary>
/// Verifies failed job creates error span with exception details.
/// </summary>
[Fact]
public async Task FailedJob_CreatesErrorSpan()
{
// Arrange
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
var executor = new TracedJobExecutor(_source, shouldFail: true, errorMessage: "Test execution failure");
var worker = new TracedSchedulerWorker(jobStore, executor, _source);
var job = new TracedJob
{
Id = "error-span-001",
TenantId = "tenant-001",
Type = "scan"
};
await jobStore.EnqueueAsync(job);
// Act
ClearCapturedActivities();
try
{
await worker.ProcessAsync(job.Id, CancellationToken.None);
}
catch
{
// Expected
}
// Assert
// A span counts as an error span when any of the three signals is present:
// native Error status, the otel.status_code tag, or an "exception" event.
var errorSpans = _capturedActivities
.Where(a => a.Status == ActivityStatusCode.Error
|| a.Tags.Any(t => t.Key == "otel.status_code" && t.Value == "ERROR")
|| a.Events.Any(e => e.Name == "exception"))
.ToList();
errorSpans.Should().NotBeEmpty();
}
/// <summary>
/// Verifies exception events include required attributes.
/// </summary>
[Fact]
public async Task ExceptionEvent_IncludesRequiredAttributes()
{
// Arrange
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
var executor = new TracedJobExecutor(_source, shouldFail: true, errorMessage: "Detailed error message");
var worker = new TracedSchedulerWorker(jobStore, executor, _source);
var job = new TracedJob
{
Id = "exception-attrs-001",
TenantId = "tenant-001",
Type = "scan"
};
await jobStore.EnqueueAsync(job);
// Act
ClearCapturedActivities();
try
{
await worker.ProcessAsync(job.Id, CancellationToken.None);
}
catch
{
// Expected
}
// Assert
// NOTE(review): this passes vacuously when no span records an exception
// event — consider asserting spansWithExceptions is non-empty.
var spansWithExceptions = _capturedActivities
.Where(a => a.Events.Any(e => e.Name == "exception"))
.ToList();
foreach (var span in spansWithExceptions)
{
var exceptionEvent = span.Events.First(e => e.Name == "exception");
// Per OTel semantic conventions
exceptionEvent.Tags.Should().Contain(t => t.Key == "exception.type");
exceptionEvent.Tags.Should().Contain(t => t.Key == "exception.message");
}
}
#endregion
#region Baggage Propagation Tests
/// <summary>
/// Verifies baggage is propagated across job lifecycle.
/// </summary>
[Fact]
public async Task Baggage_PropagatedAcrossLifecycle()
{
// Arrange
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
var executor = new TracedJobExecutor(_source);
var worker = new TracedSchedulerWorker(jobStore, executor, _source);
// Parent activity stays current for the whole method, so spans started
// below become its children and inherit its baggage.
using var parentActivity = _source.StartActivity("baggage.test");
parentActivity?.SetBaggage("request_id", "req-12345");
parentActivity?.SetBaggage("user_id", "user-67890");
var job = new TracedJob
{
Id = "baggage-001",
TenantId = "tenant-001",
Type = "scan"
};
// Act
await jobStore.EnqueueAsync(job);
await worker.ProcessAsync(job.Id, CancellationToken.None);
// Assert
// Children share the parent's TraceId but are not the parent itself.
var childActivities = _capturedActivities
.Where(a => a.TraceId == parentActivity?.TraceId && a.Id != parentActivity?.Id)
.ToList();
foreach (var activity in childActivities)
{
// Baggage should be propagated
// NOTE(review): the value is only asserted when request_id baggage is
// present, so this loop passes vacuously if propagation never happens.
var requestIdBaggage = activity.Baggage.FirstOrDefault(b => b.Key == "request_id");
var userIdBaggage = activity.Baggage.FirstOrDefault(b => b.Key == "user_id");
if (!string.IsNullOrEmpty(requestIdBaggage.Value))
{
requestIdBaggage.Value.Should().Be("req-12345");
}
}
}
#endregion
#region Timing Tests
/// <summary>
/// Verifies span durations are recorded correctly.
/// </summary>
[Fact]
public async Task SpanDurations_RecordedCorrectly()
{
// Arrange
ClearCapturedActivities();
// Executor sleeps ~100ms so the execution span has a measurable duration.
var jobStore = new TracedJobStore(_source);
var executor = new TracedJobExecutor(_source, executionDelay: TimeSpan.FromMilliseconds(100));
var worker = new TracedSchedulerWorker(jobStore, executor, _source);
var job = new TracedJob
{
Id = "duration-001",
TenantId = "tenant-001",
Type = "scan"
};
await jobStore.EnqueueAsync(job);
// Act
ClearCapturedActivities();
await worker.ProcessAsync(job.Id, CancellationToken.None);
// Assert
var executeSpan = _capturedActivities
.FirstOrDefault(a => a.OperationName.Contains("execute", StringComparison.OrdinalIgnoreCase));
executeSpan.Should().NotBeNull();
// 90ms lower bound (not the full 100ms) absorbs timer-resolution slack.
executeSpan!.Duration.Should().BeGreaterOrEqualTo(TimeSpan.FromMilliseconds(90),
because: "execution span should reflect actual execution time");
}
/// <summary>
/// Verifies span timestamps are in correct order.
/// </summary>
[Fact]
public async Task SpanTimestamps_InCorrectOrder()
{
// Arrange
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
var executor = new TracedJobExecutor(_source);
var worker = new TracedSchedulerWorker(jobStore, executor, _source);
var job = new TracedJob
{
Id = "timestamp-order-001",
TenantId = "tenant-001",
Type = "scan"
};
// Act
await jobStore.EnqueueAsync(job);
await worker.ProcessAsync(job.Id, CancellationToken.None);
// Assert
// Sorting by start time and then re-checking monotonicity verifies that
// no two spans for this job carry inverted timestamps.
var orderedSpans = _capturedActivities
.Where(a => a.Tags.Any(t => t.Key == "job_id" && t.Value == job.Id))
.OrderBy(a => a.StartTimeUtc)
.ToList();
for (int i = 1; i < orderedSpans.Count; i++)
{
orderedSpans[i].StartTimeUtc.Should().BeOnOrAfter(orderedSpans[i - 1].StartTimeUtc);
}
}
#endregion
#region Link Tests
/// <summary>
/// Verifies retry spans are linked to original job span.
/// </summary>
[Fact]
public async Task RetrySpan_LinkedToOriginalSpan()
{
// Arrange
ClearCapturedActivities();
var jobStore = new TracedJobStore(_source);
// First attempt fails, so the worker performs at least one retry.
var executor = new TracedJobExecutor(_source, failureCount: 1);
var worker = new TracedSchedulerWorker(jobStore, executor, _source, maxRetries: 2);
var job = new TracedJob
{
Id = "retry-link-001",
TenantId = "tenant-001",
Type = "scan"
};
// Act
await jobStore.EnqueueAsync(job);
ClearCapturedActivities();
await worker.ProcessWithRetriesAsync(job.Id, CancellationToken.None);
// Assert
var retrySpans = _capturedActivities
.Where(a => a.OperationName.Contains("retry", StringComparison.OrdinalIgnoreCase)
|| a.Tags.Any(t => t.Key == "retry_count"))
.ToList();
// Retry spans should have links to original attempt
// NOTE(review): the assertion below is tautological — it only runs when
// Links.Any() is already true, so this test never fails on missing links.
// Tighten once span links are actually emitted by the worker.
foreach (var span in retrySpans)
{
if (span.Links.Any())
{
span.Links.Should().NotBeEmpty(
because: "retry span should link to original execution");
}
}
}
#endregion
#region Helper Methods
// Drains the captured-activity bag so subsequent assertions only see
// spans emitted after this call.
private void ClearCapturedActivities()
{
while (_capturedActivities.TryTake(out _)) { }
}
#endregion
}
#region Test Infrastructure
/// <summary>
/// Job model for traced testing.
/// </summary>
public sealed class TracedJob
{
public required string Id { get; set; }
public required string TenantId { get; set; }
public required string Type { get; set; }
// Set when the job originates from a schedule; propagated as a span tag.
public string? ScheduleId { get; set; }
// Trace context captured at enqueue time for later correlation.
public string? TraceId { get; set; }
public string? SpanId { get; set; }
public TracedJobStatus Status { get; set; } = TracedJobStatus.Pending;
// Incremented by the store on each recorded failure.
public int RetryCount { get; set; } = 0;
// Serialized execution result; set on completion.
public string? Result { get; set; }
}
/// <summary>
/// Job status for traced testing.
/// </summary>
public enum TracedJobStatus
{
// Waiting to be picked up (also the state after a reset between retries).
Pending,
// Currently executing.
Running,
// Finished successfully.
Completed,
// Most recent attempt failed.
Failed
}
/// <summary>
/// Job store with tracing support.
/// Each mutating operation emits a span ("job.enqueue" / "job.dequeue" /
/// "job.complete" / "job.fail") tagged with job and tenant identifiers.
/// </summary>
public sealed class TracedJobStore
{
// In-memory job table keyed by job id.
private readonly ConcurrentDictionary<string, TracedJob> _jobs = new();
private readonly ActivitySource _source;
public TracedJobStore(ActivitySource source)
{
_source = source;
}
public Task EnqueueAsync(TracedJob job)
{
using var activity = _source.StartActivity("job.enqueue");
activity?.SetTag("job_id", job.Id);
activity?.SetTag("tenant_id", job.TenantId);
activity?.SetTag("job_type", job.Type);
if (job.ScheduleId != null)
{
activity?.SetTag("schedule_id", job.ScheduleId);
}
// Store trace context on job for later propagation
if (activity != null)
{
// TraceId is set only once (??=) so a re-enqueue keeps the original
// producer trace; SpanId always reflects the latest enqueue span.
job.TraceId ??= activity.TraceId.ToString();
job.SpanId = activity.SpanId.ToString();
}
_jobs[job.Id] = job;
return Task.CompletedTask;
}
public Task<TracedJob?> GetByIdAsync(string id)
{
_jobs.TryGetValue(id, out var job);
return Task.FromResult(job);
}
public Task<TracedJob?> DequeueAsync()
{
using var activity = _source.StartActivity("job.dequeue");
// First pending job in dictionary order; adequate for single-threaded tests.
var job = _jobs.Values.FirstOrDefault(j => j.Status == TracedJobStatus.Pending);
if (job != null)
{
activity?.SetTag("job_id", job.Id);
activity?.SetTag("tenant_id", job.TenantId);
job.Status = TracedJobStatus.Running;
}
return Task.FromResult(job);
}
public Task CompleteAsync(string id, string result)
{
using var activity = _source.StartActivity("job.complete");
activity?.SetTag("job_id", id);
if (_jobs.TryGetValue(id, out var job))
{
activity?.SetTag("tenant_id", job.TenantId);
if (job.ScheduleId != null)
{
activity?.SetTag("schedule_id", job.ScheduleId);
}
job.Status = TracedJobStatus.Completed;
job.Result = result;
}
return Task.CompletedTask;
}
public Task FailAsync(string id, string error)
{
using var activity = _source.StartActivity("job.fail");
activity?.SetTag("job_id", id);
// The span is marked as error even when the job id is unknown.
activity?.SetStatus(ActivityStatusCode.Error, error);
if (_jobs.TryGetValue(id, out var job))
{
activity?.SetTag("tenant_id", job.TenantId);
job.Status = TracedJobStatus.Failed;
job.RetryCount++;
}
return Task.CompletedTask;
}
public Task ResetAsync(string id)
{
// Returns the job to Pending so it can be retried; no span is emitted here.
if (_jobs.TryGetValue(id, out var job))
{
job.Status = TracedJobStatus.Pending;
}
return Task.CompletedTask;
}
}
/// <summary>
/// Job executor with tracing support.
/// Emits a "job.execute" span per attempt, tagged with job/tenant/type
/// (and schedule_id when present); failures are recorded per OTel
/// conventions as an Error status plus a single "exception" event.
/// </summary>
public sealed class TracedJobExecutor
{
    private readonly ActivitySource _source;
    private readonly bool _shouldFail;          // fail every attempt when true
    private readonly string _errorMessage;      // message used for simulated failures
    private readonly int _failureCount;         // number of leading attempts that fail before success
    private readonly TimeSpan _executionDelay;  // simulated work duration
    private int _attempts = 0;                  // attempts made so far (test harness is single-threaded)

    /// <summary>
    /// Creates the executor.
    /// </summary>
    /// <param name="source">Activity source used to start execution spans.</param>
    /// <param name="shouldFail">When true, every attempt throws.</param>
    /// <param name="errorMessage">Message carried by the simulated failure.</param>
    /// <param name="failureCount">Number of leading attempts that throw before succeeding.</param>
    /// <param name="executionDelay">Optional artificial execution latency.</param>
    public TracedJobExecutor(
        ActivitySource source,
        bool shouldFail = false,
        string errorMessage = "Execution failed",
        int failureCount = 0,
        TimeSpan executionDelay = default)
    {
        _source = source;
        _shouldFail = shouldFail;
        _errorMessage = errorMessage;
        _failureCount = failureCount;
        _executionDelay = executionDelay;
    }

    /// <summary>
    /// Executes the job inside a "job.execute" span and returns the result JSON.
    /// Throws <see cref="InvalidOperationException"/> when configured to fail;
    /// cancellation propagates unrecorded via <paramref name="cancellationToken"/>.
    /// </summary>
    public async Task<string> ExecuteAsync(TracedJob job, CancellationToken cancellationToken)
    {
        using var activity = _source.StartActivity("job.execute");
        activity?.SetTag("job_id", job.Id);
        activity?.SetTag("tenant_id", job.TenantId);
        activity?.SetTag("job_type", job.Type);
        if (job.ScheduleId != null)
        {
            activity?.SetTag("schedule_id", job.ScheduleId);
        }
        _attempts++;
        try
        {
            if (_executionDelay > TimeSpan.Zero)
            {
                await Task.Delay(_executionDelay, cancellationToken);
            }
            if (_shouldFail || _attempts <= _failureCount)
            {
                // Let the catch block below record the error exactly once.
                // Previously the status/event were recorded both here and in
                // the catch, producing duplicate "exception" events per span.
                throw new InvalidOperationException(_errorMessage);
            }
            return """{"status": "success"}""";
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Record the failure per OTel semantic conventions; cancellation
            // is deliberately left unrecorded and propagates unchanged.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
            {
                { "exception.type", ex.GetType().FullName },
                { "exception.message", ex.Message }
            }));
            throw;
        }
    }
}
/// <summary>
/// Scheduler worker with tracing support.
/// Wraps each job in "job.process" (or "job.process.with_retries") spans
/// tagged with job/tenant identifiers for trace correlation.
/// </summary>
public sealed class TracedSchedulerWorker
{
    private readonly TracedJobStore _jobStore;
    private readonly TracedJobExecutor _executor;
    private readonly ActivitySource _source;
    private readonly int _maxRetries; // extra attempts allowed after the first

    public TracedSchedulerWorker(
        TracedJobStore jobStore,
        TracedJobExecutor executor,
        ActivitySource source,
        int maxRetries = 0)
    {
        _jobStore = jobStore;
        _executor = executor;
        _source = source;
        _maxRetries = maxRetries;
    }

    /// <summary>
    /// Processes a single job attempt inside a "job.process" span.
    /// Returns false when the job id is unknown; executor failures propagate.
    /// </summary>
    public async Task<bool> ProcessAsync(string jobId, CancellationToken cancellationToken)
    {
        using var activity = _source.StartActivity("job.process");
        activity?.SetTag("job_id", jobId);
        var job = await _jobStore.GetByIdAsync(jobId);
        if (job == null)
        {
            activity?.SetTag("job_found", false);
            return false;
        }
        activity?.SetTag("job_found", true);
        activity?.SetTag("tenant_id", job.TenantId);
        if (job.ScheduleId != null)
        {
            activity?.SetTag("schedule_id", job.ScheduleId);
        }
        // Surface the enqueue-time trace id (when well-formed) so backends
        // can correlate this span with the producer's trace. The parsed id
        // itself is unused, so discard the out value.
        if (!string.IsNullOrEmpty(job.TraceId) && ActivityTraceId.TryCreateFromString(job.TraceId, out _))
        {
            activity?.SetTag("original_trace_id", job.TraceId);
        }
        job.Status = TracedJobStatus.Running;
        var result = await _executor.ExecuteAsync(job, cancellationToken);
        await _jobStore.CompleteAsync(jobId, result);
        return true;
    }

    /// <summary>
    /// Processes a job with up to <c>maxRetries</c> extra attempts, each in its
    /// own "job.attempt.N" child span. Returns true on eventual success and
    /// false when all attempts fail or the job id is unknown.
    /// </summary>
    public async Task<bool> ProcessWithRetriesAsync(string jobId, CancellationToken cancellationToken)
    {
        using var rootActivity = _source.StartActivity("job.process.with_retries");
        rootActivity?.SetTag("job_id", jobId);
        rootActivity?.SetTag("max_retries", _maxRetries);
        var job = await _jobStore.GetByIdAsync(jobId);
        if (job == null) return false;
        rootActivity?.SetTag("tenant_id", job.TenantId);
        int attempt = 0;
        while (attempt <= _maxRetries)
        {
            using var attemptActivity = _source.StartActivity($"job.attempt.{attempt}");
            attemptActivity?.SetTag("job_id", jobId);
            attemptActivity?.SetTag("attempt", attempt);
            attemptActivity?.SetTag("retry_count", job.RetryCount);
            if (attempt > 0)
            {
                attemptActivity?.AddTag("is_retry", true);
            }
            try
            {
                job.Status = TracedJobStatus.Running;
                var result = await _executor.ExecuteAsync(job, cancellationToken);
                await _jobStore.CompleteAsync(jobId, result);
                return true;
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Mark the failure, then reset to Pending so the next
                // attempt can pick the job up again.
                await _jobStore.FailAsync(jobId, ex.Message);
                await _jobStore.ResetAsync(jobId);
                attempt++;
            }
        }
        return false;
    }
}
#endregion

View File

@@ -0,0 +1,863 @@
// ---------------------------------------------------------------------
// <copyright file="WorkerRetryTests.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
// <summary>
// Retry tests: transient failure uses exponential backoff; permanent failure routes to poison queue
// </summary>
// ---------------------------------------------------------------------
using System.Collections.Concurrent;
using FluentAssertions;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests.Retry;
/// <summary>
/// Retry tests for Scheduler Worker covering transient failure backoff
/// and permanent failure routing to poison queue.
/// </summary>
[Trait("Category", "Retry")]
[Trait("Sprint", "5100-0009-0008")]
public sealed class WorkerRetryTests
{
#region Transient Failure Tests
/// <summary>
/// Verifies transient failure triggers retry.
/// </summary>
[Fact]
public async Task TransientFailure_TriggersRetry()
{
// Arrange
var jobStore = new RetryJobStore();
var executionAttempts = 0;
// Executor fails the first two attempts and succeeds on the third.
var executor = new RetryableJobExecutor(
failureCount: 2,
onExecute: _ => executionAttempts++
);
// Small delays keep the retry loop fast in tests.
var retryPolicy = new RetryPolicy(maxRetries: 3, baseDelayMs: 10, maxDelayMs: 100);
var worker = new RetrySchedulerWorker(jobStore, executor, retryPolicy);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "transient-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
// Act - Process until completion or max retries
await worker.ProcessWithRetriesAsync("transient-test", CancellationToken.None);
// Assert
executionAttempts.Should().Be(3, because: "2 failures + 1 success");
var job = await jobStore.GetByIdAsync("transient-test");
job!.Status.Should().Be(RetryJobStatus.Completed);
}
/// <summary>
/// Verifies retry count is tracked correctly.
/// </summary>
[Fact]
public async Task TransientFailure_IncrementsRetryCount()
{
// Arrange
// Exactly one failing attempt, so exactly one retry should be recorded.
var jobStore = new RetryJobStore();
var executor = new RetryableJobExecutor(failureCount: 1);
var retryPolicy = new RetryPolicy(maxRetries: 3, baseDelayMs: 10, maxDelayMs: 100);
var worker = new RetrySchedulerWorker(jobStore, executor, retryPolicy);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "retry-count-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending,
RetryCount = 0
});
// Act
await worker.ProcessWithRetriesAsync("retry-count-test", CancellationToken.None);
// Assert
var job = await jobStore.GetByIdAsync("retry-count-test");
job!.RetryCount.Should().Be(1);
}
#endregion
#region Exponential Backoff Tests
/// <summary>
/// Verifies exponential backoff delay increases with each retry.
/// </summary>
[Fact]
public async Task ExponentialBackoff_DelayIncreasesWithRetries()
{
// Arrange
// Captures every delay the worker applies, via the onDelay hook.
var delays = new List<TimeSpan>();
var fakeClock = new FakeClock();
var jobStore = new RetryJobStore();
// Four failures => four backoff delays before the fifth attempt succeeds.
var executor = new RetryableJobExecutor(failureCount: 4);
var retryPolicy = new RetryPolicy(
maxRetries: 5,
baseDelayMs: 100,
maxDelayMs: 10000,
multiplier: 2.0,
jitterFactor: 0.0 // No jitter for deterministic test
);
var worker = new RetrySchedulerWorker(
jobStore, executor, retryPolicy, fakeClock,
onDelay: d => delays.Add(d)
);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "backoff-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
// Act
await worker.ProcessWithRetriesAsync("backoff-test", CancellationToken.None);
// Assert - Delays should follow exponential pattern
delays.Should().HaveCount(4);
delays[0].TotalMilliseconds.Should().BeApproximately(100, 1); // Base delay
delays[1].TotalMilliseconds.Should().BeApproximately(200, 1); // 100 * 2
delays[2].TotalMilliseconds.Should().BeApproximately(400, 1); // 200 * 2
delays[3].TotalMilliseconds.Should().BeApproximately(800, 1); // 400 * 2
}
/// <summary>
/// Verifies backoff respects maximum delay cap.
/// </summary>
[Fact]
public async Task ExponentialBackoff_RespectsMaxDelay()
{
// Arrange
var delays = new List<TimeSpan>();
var fakeClock = new FakeClock();
var jobStore = new RetryJobStore();
// Many failures so the exponential curve would exceed the cap without it.
var executor = new RetryableJobExecutor(failureCount: 10);
var retryPolicy = new RetryPolicy(
maxRetries: 10,
baseDelayMs: 100,
maxDelayMs: 500, // Cap at 500ms
multiplier: 2.0,
jitterFactor: 0.0
);
var worker = new RetrySchedulerWorker(
jobStore, executor, retryPolicy, fakeClock,
onDelay: d => delays.Add(d)
);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "max-delay-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
// Act
await worker.ProcessWithRetriesAsync("max-delay-test", CancellationToken.None);
// Assert - All delays should be at or below max
delays.Should().AllSatisfy(d => d.TotalMilliseconds.Should().BeLessOrEqualTo(500));
}
/// <summary>
/// Verifies jitter is applied to backoff delays.
/// </summary>
[Fact]
public void ExponentialBackoff_WithJitter_ProducesVariation()
{
// Arrange
var retryPolicy = new RetryPolicy(
maxRetries: 5,
baseDelayMs: 1000,
maxDelayMs: 10000,
multiplier: 2.0,
jitterFactor: 0.2 // ±20% jitter
);
// Act - Calculate delays with different seeds
var delays1 = Enumerable.Range(0, 5)
.Select(i => retryPolicy.CalculateDelay(i, seed: 12345))
.ToList();
var delays2 = Enumerable.Range(0, 5)
.Select(i => retryPolicy.CalculateDelay(i, seed: 67890))
.ToList();
// Assert - Same retry count but different seeds should produce different delays
// (with 20% jitter, delays should vary)
// Sequence-level inequality is enough: at least one element must differ.
delays1.Should().NotEqual(delays2);
}
/// <summary>
/// Verifies deterministic backoff with same seed.
/// </summary>
[Fact]
public void ExponentialBackoff_SameSeed_ProducesSameDelay()
{
// Arrange
var retryPolicy = new RetryPolicy(
maxRetries: 5,
baseDelayMs: 1000,
maxDelayMs: 10000,
multiplier: 2.0,
jitterFactor: 0.2
);
// Act
// Same (retryCount, seed) pair must be reproducible despite jitter.
var delay1 = retryPolicy.CalculateDelay(retryCount: 3, seed: 42);
var delay2 = retryPolicy.CalculateDelay(retryCount: 3, seed: 42);
// Assert
delay1.Should().Be(delay2);
}
#endregion
#region Permanent Failure / Poison Queue Tests
/// <summary>
/// Verifies job is routed to poison queue after max retries.
/// </summary>
[Fact]
public async Task PermanentFailure_RoutesToPoisonQueue()
{
// Arrange
var jobStore = new RetryJobStore();
var poisonQueue = new PoisonQueue();
// Executor never succeeds, so retries must eventually be exhausted.
var executor = new RetryableJobExecutor(alwaysFail: true, errorMessage: "Permanent failure");
var retryPolicy = new RetryPolicy(maxRetries: 3, baseDelayMs: 10, maxDelayMs: 100);
var worker = new RetrySchedulerWorker(jobStore, executor, retryPolicy, poisonQueue: poisonQueue);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "poison-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
// Act
await worker.ProcessWithRetriesAsync("poison-test", CancellationToken.None);
// Assert
// Both the job status and the dead-letter entry must reflect poisoning.
var job = await jobStore.GetByIdAsync("poison-test");
job!.Status.Should().Be(RetryJobStatus.PoisonQueue);
poisonQueue.Jobs.Should().ContainSingle();
poisonQueue.Jobs.First().JobId.Should().Be("poison-test");
poisonQueue.Jobs.First().Error.Should().Contain("Permanent failure");
}
/// <summary>
/// Verifies poison queue entry includes all failure details.
/// </summary>
[Fact]
public async Task PoisonQueueEntry_IncludesFailureDetails()
{
// Arrange
var jobStore = new RetryJobStore();
var poisonQueue = new PoisonQueue();
var executor = new RetryableJobExecutor(
alwaysFail: true,
errorMessage: "Database connection timeout"
);
var retryPolicy = new RetryPolicy(maxRetries: 2, baseDelayMs: 10, maxDelayMs: 100);
var worker = new RetrySchedulerWorker(jobStore, executor, retryPolicy, poisonQueue: poisonQueue);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "poison-details-test",
TenantId = "tenant-001",
Type = "scan",
Payload = """{"target": "test-image"}""",
Status = RetryJobStatus.Pending
});
// Act
await worker.ProcessWithRetriesAsync("poison-details-test", CancellationToken.None);
// Assert
// The entry must capture identity, payload, retry count, and error so a
// dead-lettered job can be diagnosed and replayed without the original.
var entry = poisonQueue.Jobs.First();
entry.JobId.Should().Be("poison-details-test");
entry.TenantId.Should().Be("tenant-001");
entry.JobType.Should().Be("scan");
entry.Payload.Should().Contain("test-image");
entry.RetryCount.Should().Be(2);
entry.Error.Should().Contain("Database connection timeout");
// Wide 5s tolerance keeps the wall-clock comparison CI-safe.
entry.FailedAt.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(5));
}
/// <summary>
/// Verifies non-retryable exceptions go directly to poison queue.
/// </summary>
[Fact]
public async Task NonRetryableException_ImmediatelyPoisoned()
{
// Arrange
var jobStore = new RetryJobStore();
var poisonQueue = new PoisonQueue();
// isRetryable: false marks the failure as permanent regardless of the
// generous maxRetries budget below.
var executor = new RetryableJobExecutor(
alwaysFail: true,
errorMessage: "Invalid payload format",
isRetryable: false
);
var retryPolicy = new RetryPolicy(maxRetries: 5, baseDelayMs: 10, maxDelayMs: 100);
var worker = new RetrySchedulerWorker(jobStore, executor, retryPolicy, poisonQueue: poisonQueue);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "non-retryable-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
// Act
await worker.ProcessWithRetriesAsync("non-retryable-test", CancellationToken.None);
// Assert - Should be poisoned immediately without retries
var job = await jobStore.GetByIdAsync("non-retryable-test");
job!.RetryCount.Should().Be(0, because: "non-retryable errors skip retries");
job.Status.Should().Be(RetryJobStatus.PoisonQueue);
poisonQueue.Jobs.Should().ContainSingle();
}
#endregion
#region Retry Classification Tests
/// <summary>
/// Verifies transient errors are correctly classified as retryable.
/// </summary>
// Transient infrastructure errors retry; caller/config errors do not.
[Theory]
[InlineData("timeout", true)]
[InlineData("connection refused", true)]
[InlineData("service unavailable", true)]
[InlineData("too many requests", true)]
[InlineData("invalid argument", false)]
[InlineData("authentication failed", false)]
[InlineData("not found", false)]
[InlineData("permission denied", false)]
public void ErrorClassification_DeterminesRetryability(string errorMessage, bool expectedRetryable)
{
// Arrange
var classifier = new ErrorClassifier();
// Act
var isRetryable = classifier.IsRetryable(errorMessage);
// Assert
isRetryable.Should().Be(expectedRetryable);
}
/// <summary>
/// Verifies HTTP status codes are classified correctly.
/// </summary>
// 408/429 and 5xx gateway/server errors retry; other 4xx are permanent.
[Theory]
[InlineData(408, true)] // Request Timeout
[InlineData(429, true)] // Too Many Requests
[InlineData(500, true)] // Internal Server Error
[InlineData(502, true)] // Bad Gateway
[InlineData(503, true)] // Service Unavailable
[InlineData(504, true)] // Gateway Timeout
[InlineData(400, false)] // Bad Request
[InlineData(401, false)] // Unauthorized
[InlineData(403, false)] // Forbidden
[InlineData(404, false)] // Not Found
[InlineData(409, false)] // Conflict
[InlineData(422, false)] // Unprocessable Entity
public void HttpStatusCode_DeterminesRetryability(int statusCode, bool expectedRetryable)
{
// Arrange
var classifier = new ErrorClassifier();
// Act
var isRetryable = classifier.IsRetryable(statusCode);
// Assert
isRetryable.Should().Be(expectedRetryable);
}
#endregion
#region Circuit Breaker Integration Tests
/// <summary>
/// Verifies circuit breaker opens after consecutive failures.
/// </summary>
[Fact]
public async Task CircuitBreaker_OpensAfterConsecutiveFailures()
{
// Arrange
var jobStore = new RetryJobStore();
var executor = new RetryableJobExecutor(alwaysFail: true);
// Breaker should trip after 3 recorded failures.
var circuitBreaker = new SimpleCircuitBreaker(failureThreshold: 3);
var retryPolicy = new RetryPolicy(maxRetries: 1, baseDelayMs: 10, maxDelayMs: 100);
var worker = new RetrySchedulerWorker(
jobStore, executor, retryPolicy,
circuitBreaker: circuitBreaker
);
// Enqueue multiple jobs
for (int i = 0; i < 5; i++)
{
await jobStore.EnqueueAsync(new RetryJob
{
Id = $"circuit-{i}",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
}
// Act - Process jobs until circuit opens
for (int i = 0; i < 5; i++)
{
await worker.ProcessWithRetriesAsync($"circuit-{i}", CancellationToken.None);
}
// Assert
circuitBreaker.IsOpen.Should().BeTrue();
circuitBreaker.FailureCount.Should().BeGreaterOrEqualTo(3);
}
/// <summary>
/// Verifies circuit breaker rejects requests when open.
/// </summary>
[Fact]
public async Task CircuitBreaker_RejectsWhenOpen()
{
// Arrange
// Threshold of 1 means a single recorded failure opens the breaker.
var circuitBreaker = new SimpleCircuitBreaker(failureThreshold: 1);
circuitBreaker.RecordFailure(); // Force open
var jobStore = new RetryJobStore();
var executor = new RetryableJobExecutor();
var retryPolicy = new RetryPolicy(maxRetries: 3, baseDelayMs: 10, maxDelayMs: 100);
var worker = new RetrySchedulerWorker(
jobStore, executor, retryPolicy,
circuitBreaker: circuitBreaker
);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "rejected-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
// Act
var processed = await worker.ProcessWithRetriesAsync("rejected-test", CancellationToken.None);
// Assert - Should be rejected by circuit breaker
processed.Should().BeFalse();
var job = await jobStore.GetByIdAsync("rejected-test");
job!.Status.Should().Be(RetryJobStatus.Pending, because: "job not processed due to open circuit");
}
#endregion
#region Scheduled Retry Tests
/// <summary>
/// Verifies job scheduled for retry has correct next run time.
/// </summary>
[Fact]
public async Task ScheduledRetry_HasCorrectNextRunTime()
{
// Arrange
// Fake clock pins "now" so NextRetryAt can be asserted deterministically;
// jitterFactor 0 makes the first delay exactly the 1000ms base delay.
var fakeClock = new FakeClock(DateTime.UtcNow);
var jobStore = new RetryJobStore();
var executor = new RetryableJobExecutor(failureCount: 1);
var retryPolicy = new RetryPolicy(maxRetries: 3, baseDelayMs: 1000, maxDelayMs: 10000, jitterFactor: 0);
var worker = new RetrySchedulerWorker(jobStore, executor, retryPolicy, fakeClock);
await jobStore.EnqueueAsync(new RetryJob
{
Id = "scheduled-retry-test",
TenantId = "tenant-001",
Type = "scan",
Status = RetryJobStatus.Pending
});
// Act
var beforeProcess = fakeClock.UtcNow;
await worker.ProcessWithRetriesAsync("scheduled-retry-test", CancellationToken.None);
// Assert - First retry delay should be 1000ms (base delay)
var job = await jobStore.GetByIdAsync("scheduled-retry-test");
job!.NextRetryAt.Should().BeCloseTo(beforeProcess.AddMilliseconds(1000), TimeSpan.FromMilliseconds(100));
}
#endregion
}
#region Test Infrastructure
/// <summary>
/// Job status for retry testing.
/// </summary>
public enum RetryJobStatus
{
// Waiting to be picked up (also the state after a reset between retries).
Pending,
// Currently executing.
Running,
// Finished successfully.
Completed,
// Most recent attempt failed; may still be retried.
Failed,
// Retries exhausted (or failure was non-retryable); job was dead-lettered.
PoisonQueue
}
/// <summary>
/// Job model for retry testing.
/// </summary>
public sealed class RetryJob
{
public required string Id { get; set; }
public required string TenantId { get; set; }
public required string Type { get; set; }
// JSON payload; defaults to an empty object.
public string Payload { get; set; } = "{}";
public RetryJobStatus Status { get; set; } = RetryJobStatus.Pending;
// Number of failed attempts recorded so far.
public int RetryCount { get; set; } = 0;
// Earliest time the next retry should run; null until a retry is scheduled.
public DateTime? NextRetryAt { get; set; }
// Message from the most recent failure, if any.
public string? Error { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
/// <summary>
/// In-memory, thread-safe job store backing the retry tests.
/// </summary>
public sealed class RetryJobStore
{
    // Jobs keyed by their id.
    private readonly ConcurrentDictionary<string, RetryJob> _byId = new();

    /// <summary>Adds the job, replacing any existing job with the same id.</summary>
    public Task EnqueueAsync(RetryJob job)
    {
        _byId[job.Id] = job;
        return Task.CompletedTask;
    }

    /// <summary>Returns the job with the given id, or null when unknown.</summary>
    public Task<RetryJob?> GetByIdAsync(string id)
        => Task.FromResult(_byId.TryGetValue(id, out var job) ? job : null);

    /// <summary>Upserts the job (same semantics as enqueue for this in-memory store).</summary>
    public Task UpdateAsync(RetryJob job)
    {
        _byId[job.Id] = job;
        return Task.CompletedTask;
    }
}
/// <summary>
/// Retryable job executor for testing.
/// Fails a configurable number of leading attempts (or always), throwing
/// <see cref="JobExecutionException"/> with the configured retryability flag.
/// </summary>
public sealed class RetryableJobExecutor
{
private readonly int _failureCount;
private readonly bool _alwaysFail;
private readonly string _errorMessage;
private readonly bool _isRetryable;
// Optional observer invoked before every attempt (used to count executions).
private readonly Action<RetryJob>? _onExecute;
// Attempts made so far across all jobs handled by this instance.
private int _attempts = 0;
public RetryableJobExecutor(
int failureCount = 0,
bool alwaysFail = false,
string errorMessage = "Execution failed",
bool isRetryable = true,
Action<RetryJob>? onExecute = null)
{
_failureCount = failureCount;
_alwaysFail = alwaysFail;
_errorMessage = errorMessage;
_isRetryable = isRetryable;
_onExecute = onExecute;
}
public Task<string> ExecuteAsync(RetryJob job, CancellationToken cancellationToken)
{
// Observer fires even for attempts that are about to fail.
_onExecute?.Invoke(job);
_attempts++;
if (_alwaysFail || _attempts <= _failureCount)
{
throw new JobExecutionException(_errorMessage, _isRetryable);
}
return Task.FromResult("""{"status": "success"}""");
}
}
/// <summary>
/// Exception thrown when job execution fails, carrying a hint that tells
/// the retry machinery whether the failure is transient.
/// </summary>
public sealed class JobExecutionException : Exception
{
    /// <summary>True when the failure is transient and the job may be retried.</summary>
    public bool IsRetryable { get; }

    /// <summary>Creates the exception; failures are retryable unless stated otherwise.</summary>
    public JobExecutionException(string message, bool isRetryable = true)
        : base(message) => IsRetryable = isRetryable;
}
/// <summary>
/// Retry policy with exponential backoff and optional deterministic jitter.
/// Delay for attempt n is baseDelayMs * multiplier^n, jittered by
/// ±jitterFactor, and always clamped to [0, MaxDelayMs].
/// </summary>
public sealed class RetryPolicy
{
    public int MaxRetries { get; }
    public int BaseDelayMs { get; }
    public int MaxDelayMs { get; }
    public double Multiplier { get; }
    public double JitterFactor { get; }

    /// <summary>
    /// Creates the policy.
    /// </summary>
    /// <param name="maxRetries">Maximum number of retry attempts.</param>
    /// <param name="baseDelayMs">Delay before the first retry, in milliseconds.</param>
    /// <param name="maxDelayMs">Upper bound for any computed delay.</param>
    /// <param name="multiplier">Exponential growth factor per retry.</param>
    /// <param name="jitterFactor">Relative jitter amplitude (0 disables jitter).</param>
    public RetryPolicy(
        int maxRetries,
        int baseDelayMs,
        int maxDelayMs,
        double multiplier = 2.0,
        double jitterFactor = 0.1)
    {
        MaxRetries = maxRetries;
        BaseDelayMs = baseDelayMs;
        MaxDelayMs = maxDelayMs;
        Multiplier = multiplier;
        JitterFactor = jitterFactor;
    }

    /// <summary>
    /// Computes the backoff delay for the given retry count. Jitter is only
    /// applied when a seed is supplied, so the same (retryCount, seed) pair
    /// always yields the same delay.
    /// </summary>
    public TimeSpan CalculateDelay(int retryCount, int? seed = null)
    {
        // Exponential growth, capped at MaxDelayMs before jitter.
        var baseDelay = BaseDelayMs * Math.Pow(Multiplier, retryCount);
        var delay = Math.Min(baseDelay, MaxDelayMs);
        if (JitterFactor > 0 && seed.HasValue)
        {
            // Seeded RNG keeps jitter deterministic per (seed, retryCount).
            var rng = new Random(seed.Value + retryCount);
            var jitter = delay * JitterFactor * (rng.NextDouble() * 2 - 1);
            delay += jitter;
        }
        // Clamp to [0, MaxDelayMs]: previously a positive jitter applied
        // after the cap could push the delay past the configured maximum.
        return TimeSpan.FromMilliseconds(Math.Clamp(delay, 0, MaxDelayMs));
    }
}
/// <summary>
/// Manually-advanced clock so retry delays can be tested without real waits.
/// </summary>
public sealed class FakeClock
{
    /// <summary>Current simulated UTC instant.</summary>
    public DateTime UtcNow { get; private set; }

    /// <summary>Starts at <paramref name="initial"/>, or wall-clock UTC when omitted.</summary>
    public FakeClock(DateTime? initial = null) => UtcNow = initial ?? DateTime.UtcNow;

    /// <summary>Moves the clock forward by the given duration.</summary>
    public void Advance(TimeSpan duration) => UtcNow += duration;
}
/// <summary>
/// Dead-letter queue collecting jobs that exhausted their retries.
/// </summary>
public sealed class PoisonQueue
{
    private readonly ConcurrentBag<PoisonQueueEntry> _entries = new();

    /// <summary>Snapshot of every entry routed here so far (order unspecified).</summary>
    public IReadOnlyCollection<PoisonQueueEntry> Jobs => _entries.ToArray();

    /// <summary>Records a dead-lettered job.</summary>
    public Task EnqueueAsync(PoisonQueueEntry entry)
    {
        _entries.Add(entry);
        return Task.CompletedTask;
    }
}
/// <summary>
/// Immutable snapshot of a job that failed fatally or exhausted its retries
/// and was routed to the dead-letter (poison) queue.
/// </summary>
public sealed record PoisonQueueEntry
{
    /// <summary>Identifier of the failed job.</summary>
    public required string JobId { get; init; }

    /// <summary>Tenant that owns the job.</summary>
    public required string TenantId { get; init; }

    /// <summary>Job type (e.g. "scan") copied from the failed job.</summary>
    public required string JobType { get; init; }

    /// <summary>Raw job payload, preserved for later inspection or replay.</summary>
    public required string Payload { get; init; }

    /// <summary>Retry attempts consumed before the job was dead-lettered.</summary>
    public required int RetryCount { get; init; }

    /// <summary>Last error message observed for the job.</summary>
    public required string Error { get; init; }

    /// <summary>Capture time; defaults to wall-clock UTC at construction.</summary>
    // NOTE(review): not driven by FakeClock — confirm no test asserts on this value.
    public DateTime FailedAt { get; init; } = DateTime.UtcNow;
}
/// <summary>
/// Classifies failures as transient (retryable) or permanent, either by
/// scanning an error message for known substrings or by HTTP status code.
/// </summary>
public sealed class ErrorClassifier
{
    // Substrings (matched case-insensitively anywhere in the message) that
    // indicate a transient failure. A flat array, not a HashSet: matching is
    // a linear substring scan via Contains, so the previous set's
    // OrdinalIgnoreCase comparer was never exercised and only suggested
    // set-membership semantics that do not exist.
    private static readonly string[] RetryablePatterns =
    {
        "timeout",
        "connection refused",
        "service unavailable",
        "too many requests",
        "temporarily unavailable",
        "network error"
    };

    // HTTP status codes considered transient; this IS a membership lookup,
    // so a HashSet is appropriate here.
    private static readonly HashSet<int> RetryableStatusCodes = new()
    {
        408, // Request Timeout
        429, // Too Many Requests
        500, // Internal Server Error
        502, // Bad Gateway
        503, // Service Unavailable
        504  // Gateway Timeout
    };

    /// <summary>True when the message contains any retryable pattern (case-insensitive).</summary>
    public bool IsRetryable(string errorMessage)
    {
        return RetryablePatterns.Any(p => errorMessage.Contains(p, StringComparison.OrdinalIgnoreCase));
    }

    /// <summary>True when the status code indicates a transient HTTP failure.</summary>
    public bool IsRetryable(int statusCode)
    {
        return RetryableStatusCodes.Contains(statusCode);
    }
}
/// <summary>
/// Minimal count-based circuit breaker: opens once consecutive failures
/// reach the threshold; a single success closes it again.
/// </summary>
public sealed class SimpleCircuitBreaker
{
    private readonly int _failureThreshold;

    public SimpleCircuitBreaker(int failureThreshold) => _failureThreshold = failureThreshold;

    /// <summary>Consecutive failures recorded since the last success.</summary>
    public int FailureCount { get; private set; }

    /// <summary>Open (rejecting requests) once the threshold is reached.</summary>
    public bool IsOpen => FailureCount >= _failureThreshold;

    /// <summary>Counts one failure toward opening the breaker.</summary>
    public void RecordFailure()
    {
        FailureCount++;
    }

    /// <summary>Resets the failure count, closing the breaker.</summary>
    public void RecordSuccess()
    {
        FailureCount = 0;
    }

    /// <summary>Requests are allowed only while the breaker is closed.</summary>
    public bool AllowRequest()
    {
        return !IsOpen;
    }
}
/// <summary>
/// Scheduler worker with retry support: executes a job, retrying transient
/// failures per the <see cref="RetryPolicy"/> with (optionally fake-clocked)
/// backoff, and routes exhausted or non-retryable jobs to the poison queue.
/// An open circuit breaker short-circuits processing entirely.
/// </summary>
public sealed class RetrySchedulerWorker
{
    private readonly RetryJobStore _jobStore;
    private readonly RetryableJobExecutor _executor;
    private readonly RetryPolicy _retryPolicy;
    private readonly FakeClock? _fakeClock;
    // Non-nullable: the constructor substitutes a default queue when none is
    // supplied, so no null-forgiving "!" is needed at the call site.
    private readonly PoisonQueue _poisonQueue;
    private readonly SimpleCircuitBreaker? _circuitBreaker;
    private readonly Action<TimeSpan>? _onDelay;

    /// <summary>
    /// Creates the worker. Optional collaborators (clock, poison queue,
    /// breaker, delay hook) may be omitted; a private poison queue is created
    /// when none is supplied.
    /// </summary>
    public RetrySchedulerWorker(
        RetryJobStore jobStore,
        RetryableJobExecutor executor,
        RetryPolicy retryPolicy,
        FakeClock? fakeClock = null,
        PoisonQueue? poisonQueue = null,
        SimpleCircuitBreaker? circuitBreaker = null,
        Action<TimeSpan>? onDelay = null)
    {
        _jobStore = jobStore;
        _executor = executor;
        _retryPolicy = retryPolicy;
        _fakeClock = fakeClock;
        _poisonQueue = poisonQueue ?? new PoisonQueue();
        _circuitBreaker = circuitBreaker;
        _onDelay = onDelay;
    }

    /// <summary>
    /// Processes the job with up to MaxRetries retry attempts.
    /// Returns false when the breaker is open or the job is unknown; returns
    /// true once the job reaches a terminal state (completed or poisoned).
    /// </summary>
    public async Task<bool> ProcessWithRetriesAsync(string jobId, CancellationToken cancellationToken)
    {
        // An open breaker rejects work before the job is even loaded.
        if (_circuitBreaker != null && !_circuitBreaker.AllowRequest())
        {
            return false;
        }

        var job = await _jobStore.GetByIdAsync(jobId);
        if (job is null) return false;

        job.Status = RetryJobStatus.Running;
        await _jobStore.UpdateAsync(job);

        while (job.RetryCount <= _retryPolicy.MaxRetries)
        {
            try
            {
                await _executor.ExecuteAsync(job, cancellationToken);
                job.Status = RetryJobStatus.Completed;
                await _jobStore.UpdateAsync(job);
                _circuitBreaker?.RecordSuccess();
                return true;
            }
            catch (JobExecutionException ex)
            {
                job.Error = ex.Message;
                _circuitBreaker?.RecordFailure();

                // Non-retryable errors and exhausted budgets both dead-letter
                // the job; this is still a "handled" outcome, hence true.
                if (!ex.IsRetryable || job.RetryCount >= _retryPolicy.MaxRetries)
                {
                    await RouteToPoisonQueueAsync(job);
                    return true;
                }

                var delay = _retryPolicy.CalculateDelay(job.RetryCount);
                _onDelay?.Invoke(delay);
                var now = _fakeClock?.UtcNow ?? DateTime.UtcNow;
                job.NextRetryAt = now.Add(delay);
                job.RetryCount++;
                await _jobStore.UpdateAsync(job);

                // Simulate the backoff wait (in a real impl this would be
                // scheduled); only the fake clock observes the delay here.
                if (_fakeClock != null)
                {
                    _fakeClock.Advance(delay);
                }
            }
        }

        return true;
    }

    /// <summary>Marks the job poisoned and records it in the dead-letter queue.</summary>
    private async Task RouteToPoisonQueueAsync(RetryJob job)
    {
        job.Status = RetryJobStatus.PoisonQueue;
        await _jobStore.UpdateAsync(job);
        await _poisonQueue.EnqueueAsync(new PoisonQueueEntry
        {
            JobId = job.Id,
            TenantId = job.TenantId,
            JobType = job.Type,
            Payload = job.Payload,
            RetryCount = job.RetryCount,
            Error = job.Error ?? "Unknown error"
        });
    }
}
#endregion