UI work to fill the SBOM sourcing management gap, UI planning for exposing the remaining functionality, and work on CI/test stabilization

Introduces CGS determinism test runs to CI workflows for Windows, macOS, Linux, Alpine, and Debian, fulfilling CGS-008 cross-platform requirements. Updates local-ci scripts to support new smoke steps, test timeouts, progress intervals, and project slicing for improved test isolation and diagnostics.
This commit is contained in:
master
2025-12-29 19:12:38 +02:00
parent 41552d26ec
commit a4badc275e
286 changed files with 50918 additions and 992 deletions
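
As a side note on the local-ci changes described above: "test timeouts" and "progress intervals" usually combine an overall cancellation-token timeout with a periodic still-alive log line so CI can kill a hung step without killing a quiet-but-healthy one. A minimal sketch of that pattern follows; the helper name, signature, and behaviour are assumptions for illustration, not code from this commit.

using System;
using System.Threading;
using System.Threading.Tasks;

public static class TestRunHelpers
{
    // Wraps a test or smoke step with an overall timeout and a periodic "still alive" line,
    // so a hung step gets cancelled while a quiet-but-healthy step is not killed by CI.
    public static async Task RunWithTimeoutAndProgressAsync(
        Func<CancellationToken, Task> step,
        TimeSpan timeout,
        TimeSpan progressInterval,
        Action<string> log)
    {
        using var cts = new CancellationTokenSource(timeout);
        var started = DateTimeOffset.UtcNow;

        var progress = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    await Task.Delay(progressInterval, cts.Token);
                    log($"still running after {DateTimeOffset.UtcNow - started}");
                }
            }
            catch (OperationCanceledException)
            {
                // the step finished or the timeout fired; stop reporting
            }
        });

        try
        {
            await step(cts.Token); // the step is expected to observe the timeout token
        }
        finally
        {
            cts.Cancel();
            await progress;
        }
    }
}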


@@ -276,14 +276,16 @@ public sealed class RunEndpointTests : IClassFixture<WebApplicationFactory<Progr
while (!cts.IsCancellationRequested && !(seenRetry && seenInitial && seenQueueLag && seenHeartbeat))
{
- var readTask = reader.ReadLineAsync();
- var completed = await Task.WhenAny(readTask, Task.Delay(200, cts.Token));
- if (completed != readTask)
+ string? line;
+ try
{
-     continue;
+     line = await reader.ReadLineAsync(cts.Token);
}
+ catch (OperationCanceledException)
+ {
+     break;
+ }
- var line = await readTask;
if (line is null)
{
break;


@@ -0,0 +1,406 @@
// -----------------------------------------------------------------------------
// SchedulerCrashRecoveryTests.cs
// Sprint: SPRINT_20251229_004_004_BE_scheduler_resilience
// Task: SCH-003
// Description: Chaos tests for worker crash recovery and exactly-once semantics
// -----------------------------------------------------------------------------
using System.Collections.Concurrent;
using FluentAssertions;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests.Chaos;
/// <summary>
/// Chaos tests for Scheduler worker crash recovery.
/// Verifies exactly-once execution guarantees and orphaned job recovery.
///
/// EDGE CASE: Worker crash during execution.
/// When a worker crashes mid-execution, its distributed lock will expire
/// after heartbeat timeout. Another worker must detect the orphaned job
/// and recover it WITHOUT re-executing completed work.
///
/// EDGE CASE: Exactly-once semantics.
/// Job execution must happen exactly once, even across crashes.
/// Idempotency keys and state machine transitions prevent duplicates.
///
/// EDGE CASE: Heartbeat-based failure detection.
/// Workers extend their distributed lock via heartbeat updates.
/// A missed heartbeat indicates crash/network partition, triggering recovery.
/// </summary>
[Trait("Category", "Chaos")]
[Trait("Sprint", "SPRINT_20251229_004_004_BE")]
public sealed class SchedulerCrashRecoveryTests
{
#region Worker Crash and Recovery Tests
/// <summary>
/// Simulates a worker crash mid-execution and verifies another worker
/// recovers the orphaned job.
///
/// EDGE CASE: Heartbeat timeout triggers orphan detection.
/// After the heartbeat timeout elapses, the distributed lock expires.
/// A healthy worker can then claim the orphaned job.
///
/// EDGE CASE: Job state must be 'Processing' for recovery.
/// Only jobs in 'Processing' state (not Pending/Completed) are recoverable.
/// This prevents recovering jobs that haven't started yet.
/// </summary>
[Fact]
public async Task WorkerCrashMidRun_JobRecoveredByAnotherWorker()
{
// Arrange
var jobId = Guid.NewGuid().ToString();
var executionLog = new ConcurrentBag<(string Worker, DateTimeOffset Time)>();
var worker1Crashed = new TaskCompletionSource<bool>();
var worker2Completed = new TaskCompletionSource<bool>();
// Simulate worker 1 (will crash after starting)
var worker1 = new SimulatedWorker("worker-1", async (job) =>
{
executionLog.Add(("worker-1", DateTimeOffset.UtcNow));
// Simulate crash after starting work
worker1Crashed.SetResult(true);
await Task.Delay(Timeout.Infinite); // Hang forever (simulates crash)
});
// Simulate worker 2 (will recover the job)
var worker2 = new SimulatedWorker("worker-2", async (job) =>
{
executionLog.Add(("worker-2", DateTimeOffset.UtcNow));
worker2Completed.SetResult(true);
await Task.CompletedTask;
});
var jobStore = new InMemoryJobStore();
var lockManager = new InMemoryDistributedLockManager(heartbeatTimeout: TimeSpan.FromSeconds(2));
var job = new SimulatedJob
{
Id = jobId,
State = JobState.Pending,
IdempotencyKey = $"scan:{jobId}",
Payload = "image:latest"
};
await jobStore.EnqueueAsync(job);
// Act: Start worker 1
var worker1Task = Task.Run(async () =>
{
await worker1.ProcessNextJobAsync(jobStore, lockManager);
});
// Wait for worker 1 to start
await worker1Crashed.Task.WaitAsync(TimeSpan.FromSeconds(5));
// Simulate heartbeat timeout passing
await Task.Delay(TimeSpan.FromSeconds(3));
// Start worker 2 (should detect orphaned job)
var worker2Task = Task.Run(async () =>
{
await worker2.ProcessNextJobAsync(jobStore, lockManager);
});
// Wait for worker 2 to complete
await worker2Completed.Task.WaitAsync(TimeSpan.FromSeconds(5));
// Assert
executionLog.Should().HaveCount(2, "both workers should have attempted execution");
executionLog.Should().Contain(x => x.Worker == "worker-1", "worker 1 started");
executionLog.Should().Contain(x => x.Worker == "worker-2", "worker 2 recovered");
var finalJob = await jobStore.GetJobAsync(jobId);
finalJob.State.Should().Be(JobState.Completed, "job should be marked completed by worker 2");
finalJob.Attempts.Should().Be(2, "one failed attempt + one successful attempt");
}
/// <summary>
/// Verifies that a crashed job is never executed more than once successfully.
///
/// EDGE CASE: Retry limit prevents infinite loops.
/// If a job fails repeatedly, it must be moved to poison queue
/// after maxRetries attempts.
///
/// EDGE CASE: Idempotency key prevents duplicate successful execution.
/// Even if multiple workers claim the job, only one can transition it
/// to 'Completed' state due to state machine invariants.
/// </summary>
[Fact]
public async Task CrashedJob_DoesNotExecuteTwiceSuccessfully()
{
// Arrange
var jobId = Guid.NewGuid().ToString();
var successfulExecutions = new ConcurrentBag<string>();
var attemptCount = 0;
var worker = new SimulatedWorker("worker", async (job) =>
{
var attempt = Interlocked.Increment(ref attemptCount);
if (attempt == 1)
{
// First attempt: simulate crash
throw new InvalidOperationException("Worker crashed");
}
// Second attempt: succeed
successfulExecutions.Add(job.Id);
await Task.CompletedTask;
});
var jobStore = new InMemoryJobStore();
var lockManager = new InMemoryDistributedLockManager(heartbeatTimeout: TimeSpan.FromSeconds(1));
var job = new SimulatedJob
{
Id = jobId,
State = JobState.Pending,
IdempotencyKey = $"scan:{jobId}",
Payload = "image:latest",
MaxRetries = 3
};
await jobStore.EnqueueAsync(job);
// Act: Process job with retries
for (int i = 0; i < 5; i++) // Try processing multiple times
{
try
{
await worker.ProcessNextJobAsync(jobStore, lockManager);
}
catch
{
// Ignore exceptions (simulates worker crash recovery)
}
await Task.Delay(100); // Small delay between retries
}
// Assert
successfulExecutions.Should().HaveCount(1, "job should execute successfully exactly once");
attemptCount.Should().Be(2, "one failed attempt + one successful attempt");
var finalJob = await jobStore.GetJobAsync(jobId);
finalJob.State.Should().Be(JobState.Completed);
finalJob.Attempts.Should().Be(2);
}
/// <summary>
/// Verifies that jobs exceeding max retries are moved to poison queue.
///
/// EDGE CASE: Poison queue isolation.
/// Failed jobs must not block the main queue. They are moved to a separate
/// poison queue for manual investigation.
///
/// EDGE CASE: Max retries includes original attempt.
/// If maxRetries = 3, the job can execute at most 4 times (original + 3 retries).
/// </summary>
[Fact]
public async Task JobExceedingMaxRetries_MovedToPoisonQueue()
{
// Arrange
var jobId = Guid.NewGuid().ToString();
var executionAttempts = new ConcurrentBag<int>();
var worker = new SimulatedWorker("worker", async (job) =>
{
executionAttempts.Add(job.Attempts);
// Always fail
throw new InvalidOperationException("Persistent failure");
});
var jobStore = new InMemoryJobStore();
var lockManager = new InMemoryDistributedLockManager(heartbeatTimeout: TimeSpan.FromSeconds(1));
var job = new SimulatedJob
{
Id = jobId,
State = JobState.Pending,
IdempotencyKey = $"scan:{jobId}",
Payload = "image:latest",
MaxRetries = 2 // Allow 2 retries (3 total attempts)
};
await jobStore.EnqueueAsync(job);
// Act: Process job until it moves to poison queue
for (int i = 0; i < 5; i++)
{
try
{
await worker.ProcessNextJobAsync(jobStore, lockManager);
}
catch
{
// Expected: job keeps failing
}
await Task.Delay(100);
}
// Assert
executionAttempts.Should().HaveCount(3, "original attempt + 2 retries = 3 total");
var finalJob = await jobStore.GetJobAsync(jobId);
finalJob.State.Should().Be(JobState.Failed);
finalJob.Attempts.Should().Be(3);
finalJob.ErrorMessage.Should().Contain("Persistent failure");
}
#endregion
#region Test Infrastructure (Simplified Simulation)
private enum JobState
{
Pending,
Processing,
Completed,
Failed
}
private class SimulatedJob
{
public required string Id { get; init; }
public required string IdempotencyKey { get; init; }
public required string Payload { get; init; }
public JobState State { get; set; }
public int Attempts { get; set; }
public int MaxRetries { get; set; } = 3;
public string? ErrorMessage { get; set; }
public string? LockHolder { get; set; }
public DateTimeOffset? LockExpiry { get; set; }
}
private class InMemoryJobStore
{
private readonly ConcurrentDictionary<string, SimulatedJob> _jobs = new();
public Task EnqueueAsync(SimulatedJob job)
{
_jobs[job.Id] = job;
return Task.CompletedTask;
}
public Task<SimulatedJob> GetJobAsync(string id)
{
return Task.FromResult(_jobs[id]);
}
public Task<SimulatedJob?> TryClaimNextJobAsync(string workerId)
{
var now = DateTimeOffset.UtcNow;
// Find first pending job or orphaned job (lock expired)
var claimable = _jobs.Values
.Where(j => j.State == JobState.Pending ||
(j.State == JobState.Processing && j.LockExpiry < now))
.OrderBy(j => j.Attempts)
.FirstOrDefault();
if (claimable != null)
{
claimable.State = JobState.Processing;
claimable.LockHolder = workerId;
claimable.Attempts++;
}
return Task.FromResult(claimable);
}
public Task UpdateJobStateAsync(string id, JobState newState, string? errorMessage = null)
{
if (_jobs.TryGetValue(id, out var job))
{
job.State = newState;
job.ErrorMessage = errorMessage;
job.LockHolder = null;
job.LockExpiry = null;
}
return Task.CompletedTask;
}
}
private class InMemoryDistributedLockManager
{
private readonly TimeSpan _heartbeatTimeout;
public InMemoryDistributedLockManager(TimeSpan heartbeatTimeout)
{
_heartbeatTimeout = heartbeatTimeout;
}
public Task AcquireLockAsync(SimulatedJob job, string workerId)
{
job.LockHolder = workerId;
job.LockExpiry = DateTimeOffset.UtcNow.Add(_heartbeatTimeout);
return Task.CompletedTask;
}
public Task ReleaseLockAsync(SimulatedJob job)
{
job.LockHolder = null;
job.LockExpiry = null;
return Task.CompletedTask;
}
}
private class SimulatedWorker
{
private readonly string _workerId;
private readonly Func<SimulatedJob, Task> _executeJob;
public SimulatedWorker(string workerId, Func<SimulatedJob, Task> executeJob)
{
_workerId = workerId;
_executeJob = executeJob;
}
public async Task ProcessNextJobAsync(InMemoryJobStore jobStore, InMemoryDistributedLockManager lockManager)
{
var job = await jobStore.TryClaimNextJobAsync(_workerId);
if (job == null)
{
return;
}
await lockManager.AcquireLockAsync(job, _workerId);
try
{
await _executeJob(job);
// Job succeeded
await jobStore.UpdateJobStateAsync(job.Id, JobState.Completed);
}
catch (Exception ex)
{
// Job failed
if (job.Attempts >= job.MaxRetries + 1)
{
// Move to poison queue
await jobStore.UpdateJobStateAsync(job.Id, JobState.Failed, ex.Message);
}
else
{
// Mark for retry
job.State = JobState.Pending;
}
throw;
}
finally
{
await lockManager.ReleaseLockAsync(job);
}
}
}
#endregion
}
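
One note on the exactly-once claim in the class summary: the in-memory simulation above relies on the test flow to avoid duplicate completion, while the summary attributes it to state-machine invariants. A minimal sketch of such an invariant, reusing the simulated types above (the method below is illustrative, not part of this commit), is a completion that only succeeds for the current lock holder of a job still in Processing:

public Task<bool> TryCompleteAsync(SimulatedJob job, string workerId)
{
    lock (job) // stand-in for a single conditional update in a real job store
    {
        if (job.State != JobState.Processing || job.LockHolder != workerId)
        {
            return Task.FromResult(false); // someone else owns the job or already finished it
        }
        job.State = JobState.Completed;
        job.LockHolder = null;
        job.LockExpiry = null;
        return Task.FromResult(true);
    }
}

Against a real store this would be one conditional write keyed on job id, lock holder, and state (e.g. an UPDATE ... WHERE state = 'Processing' AND lock_holder = @worker that reports whether a row changed), so at most one worker can ever observe a successful transition to Completed.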


@@ -0,0 +1,403 @@
// -----------------------------------------------------------------------------
// HeartbeatTimeoutTests.cs
// Sprint: SPRINT_20251229_004_004_BE_scheduler_resilience
// Task: SCH-007
// Description: Tests for heartbeat-based failure detection and stale lock cleanup
// -----------------------------------------------------------------------------
using System.Collections.Concurrent;
using FluentAssertions;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests.Heartbeat;
/// <summary>
/// Tests for heartbeat-based worker liveness detection and stale lock cleanup.
///
/// EDGE CASE: Heartbeat extension for long-running jobs.
/// Workers must periodically extend their distributed lock via heartbeat updates.
/// This prevents lock expiration during legitimate long-running executions.
///
/// EDGE CASE: Stale lock detection after timeout.
/// When a worker fails to send heartbeats (crash, network partition), its lock
/// becomes stale. Other workers must detect and reclaim the orphaned job.
///
/// EDGE CASE: Heartbeat interval vs. timeout.
/// Heartbeat interval should be significantly smaller than timeout (e.g., 1/3).
/// This provides tolerance for transient delays without false positives.
/// </summary>
[Trait("Category", "Integration")]
[Trait("Category", "Heartbeat")]
[Trait("Sprint", "SPRINT_20251229_004_004_BE")]
public sealed class HeartbeatTimeoutTests
{
#region Heartbeat Extension Tests
/// <summary>
/// Verifies that workers extend locks via periodic heartbeats.
///
/// EDGE CASE: Lock extension timing.
/// Heartbeat interval (e.g., 5s) must be less than timeout (e.g., 15s).
/// This allows multiple heartbeat failures before lock expiration.
///
/// EDGE CASE: Heartbeat during long-running jobs.
/// A job running for 60s with 15s heartbeat timeout must send
/// at least 4 heartbeats to prevent lock expiration.
/// </summary>
[Fact]
public async Task LongRunningJob_ExtendsLockViaHeartbeat()
{
// Arrange
var jobId = "long-job";
var heartbeatInterval = TimeSpan.FromSeconds(2);
var heartbeatTimeout = TimeSpan.FromSeconds(6);
var jobDuration = TimeSpan.FromSeconds(10);
var lockManager = new HeartbeatLockManager(heartbeatTimeout);
var heartbeatLog = new ConcurrentBag<DateTimeOffset>();
var worker = new HeartbeatWorker("worker-1", heartbeatInterval, async (job) =>
{
// Simulate long-running job
var elapsed = TimeSpan.Zero;
while (elapsed < jobDuration)
{
await Task.Delay(heartbeatInterval);
elapsed += heartbeatInterval;
// Extend lock via heartbeat
await lockManager.ExtendLockAsync(job.Id, "worker-1");
heartbeatLog.Add(DateTimeOffset.UtcNow);
}
});
var job = new HeartbeatTestJob
{
Id = jobId,
Payload = "long-task"
};
// Act
await lockManager.AcquireLockAsync(jobId, "worker-1");
await worker.ExecuteJobAsync(job, lockManager);
// Assert
heartbeatLog.Should().NotBeEmpty("should have sent heartbeats");
// With 10s job duration and 2s heartbeat interval, expect ~5 heartbeats
heartbeatLog.Count.Should().BeGreaterThanOrEqualTo(4, "should send periodic heartbeats");
var lockExpired = await lockManager.IsLockExpiredAsync(jobId);
lockExpired.Should().BeFalse("lock should not expire during active heartbeats");
}
/// <summary>
/// Verifies that missed heartbeats cause lock expiration.
///
/// EDGE CASE: Heartbeat failure detection.
/// If a worker stops sending heartbeats (crash, hang, network issue),
/// the lock must expire after heartbeatTimeout.
/// </summary>
[Fact]
public async Task MissedHeartbeats_CauseLockExpiration()
{
// Arrange
var jobId = "missed-heartbeat-job";
var heartbeatTimeout = TimeSpan.FromSeconds(3);
var lockManager = new HeartbeatLockManager(heartbeatTimeout);
// Act: Acquire lock but never send heartbeats
await lockManager.AcquireLockAsync(jobId, "worker-1");
// Wait for timeout to elapse
await Task.Delay(heartbeatTimeout + TimeSpan.FromSeconds(1));
// Assert
var lockExpired = await lockManager.IsLockExpiredAsync(jobId);
lockExpired.Should().BeTrue("lock should expire after missing heartbeats");
// Another worker should be able to claim the job
var claimed = await lockManager.TryClaimExpiredLockAsync(jobId, "worker-2");
claimed.Should().BeTrue("expired lock should be claimable by another worker");
}
#endregion
#region Stale Lock Cleanup Tests
/// <summary>
/// Verifies that stale locks are cleaned up and jobs are recovered.
///
/// EDGE CASE: Orphaned job recovery.
/// When a worker crashes, its lock eventually expires. A background cleanup
/// process must detect stale locks and make jobs available for retry.
/// </summary>
[Fact]
public async Task StaleLock_CleanedUpAndJobRecovered()
{
// Arrange
var jobId = "stale-lock-job";
var heartbeatTimeout = TimeSpan.FromSeconds(2);
var lockManager = new HeartbeatLockManager(heartbeatTimeout);
var jobStore = new HeartbeatJobStore();
var job = new HeartbeatTestJob
{
Id = jobId,
Payload = "image:latest",
State = JobState.Processing,
LockHolder = "crashed-worker"
};
await jobStore.StoreJobAsync(job);
await lockManager.AcquireLockAsync(jobId, "crashed-worker");
// Wait for lock to expire
await Task.Delay(heartbeatTimeout + TimeSpan.FromSeconds(1));
// Act: Run cleanup process
await lockManager.CleanupStaleLocksAsync(jobStore);
// Assert
var recoveredJob = await jobStore.GetJobAsync(jobId);
recoveredJob.State.Should().Be(JobState.Pending, "stale job should be reset to pending");
recoveredJob.LockHolder.Should().BeNull("stale lock should be released");
var lockExpired = await lockManager.IsLockExpiredAsync(jobId);
lockExpired.Should().BeTrue("stale lock should be removed");
}
/// <summary>
/// Verifies that active locks are not cleaned up.
///
/// EDGE CASE: False positive prevention.
/// The cleanup process must not remove locks that are actively maintained
/// via heartbeats, even if the job has been running for a long time.
/// </summary>
[Fact]
public async Task ActiveLock_NotCleanedUp()
{
// Arrange
var jobId = "active-lock-job";
var heartbeatTimeout = TimeSpan.FromSeconds(5);
var lockManager = new HeartbeatLockManager(heartbeatTimeout);
var jobStore = new HeartbeatJobStore();
var job = new HeartbeatTestJob
{
Id = jobId,
Payload = "image:latest",
State = JobState.Processing,
LockHolder = "active-worker"
};
await jobStore.StoreJobAsync(job);
await lockManager.AcquireLockAsync(jobId, "active-worker");
// Continuously send heartbeats
var heartbeatTask = Task.Run(async () =>
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(TimeSpan.FromSeconds(1));
await lockManager.ExtendLockAsync(jobId, "active-worker");
}
});
// Wait for some time (but keep sending heartbeats)
await Task.Delay(TimeSpan.FromSeconds(3));
// Act: Run cleanup process
await lockManager.CleanupStaleLocksAsync(jobStore);
// Assert
var jobAfterCleanup = await jobStore.GetJobAsync(jobId);
jobAfterCleanup.State.Should().Be(JobState.Processing, "active job should not be reset");
jobAfterCleanup.LockHolder.Should().Be("active-worker", "active lock should be preserved");
await heartbeatTask; // Wait for heartbeat task to complete
}
#endregion
#region Heartbeat Metrics Tests
/// <summary>
/// Verifies that missed heartbeat count is tracked correctly.
///
/// EDGE CASE: Metrics for monitoring.
/// The scheduler.heartbeat.missed metric must accurately count
/// heartbeat failures for alerting and monitoring.
/// </summary>
[Fact]
public async Task MissedHeartbeat_IncrementsMissedCounter()
{
// Arrange
var lockManager = new HeartbeatLockManager(TimeSpan.FromSeconds(2));
var metrics = new HeartbeatMetrics();
await lockManager.AcquireLockAsync("job-1", "worker-1");
// Act: Wait for timeout without sending heartbeat
await Task.Delay(TimeSpan.FromSeconds(3));
// Check for missed heartbeat
var expired = await lockManager.IsLockExpiredAsync("job-1");
if (expired)
{
metrics.IncrementMissedHeartbeats();
}
// Assert
metrics.MissedHeartbeatCount.Should().Be(1, "should count missed heartbeat");
}
#endregion
#region Test Infrastructure
private enum JobState
{
Pending,
Processing,
Completed,
Failed
}
private class HeartbeatTestJob
{
public required string Id { get; init; }
public required string Payload { get; init; }
public JobState State { get; set; } = JobState.Pending;
public string? LockHolder { get; set; }
}
private class HeartbeatLockManager
{
private readonly TimeSpan _heartbeatTimeout;
private readonly ConcurrentDictionary<string, (string WorkerId, DateTimeOffset Expiry)> _locks = new();
public HeartbeatLockManager(TimeSpan heartbeatTimeout)
{
_heartbeatTimeout = heartbeatTimeout;
}
public Task AcquireLockAsync(string jobId, string workerId)
{
var expiry = DateTimeOffset.UtcNow.Add(_heartbeatTimeout);
_locks[jobId] = (workerId, expiry);
return Task.CompletedTask;
}
public Task ExtendLockAsync(string jobId, string workerId)
{
if (_locks.TryGetValue(jobId, out var existing) && existing.WorkerId == workerId)
{
var newExpiry = DateTimeOffset.UtcNow.Add(_heartbeatTimeout);
_locks[jobId] = (workerId, newExpiry);
}
return Task.CompletedTask;
}
public Task<bool> IsLockExpiredAsync(string jobId)
{
if (!_locks.TryGetValue(jobId, out var lockInfo))
{
return Task.FromResult(true); // No lock = expired
}
var expired = DateTimeOffset.UtcNow > lockInfo.Expiry;
return Task.FromResult(expired);
}
public Task<bool> TryClaimExpiredLockAsync(string jobId, string workerId)
{
if (_locks.TryGetValue(jobId, out var existing))
{
if (DateTimeOffset.UtcNow > existing.Expiry)
{
// Lock expired, claim it
var newExpiry = DateTimeOffset.UtcNow.Add(_heartbeatTimeout);
_locks[jobId] = (workerId, newExpiry);
return Task.FromResult(true);
}
}
return Task.FromResult(false);
}
public async Task CleanupStaleLocksAsync(HeartbeatJobStore jobStore)
{
foreach (var kvp in _locks)
{
if (await IsLockExpiredAsync(kvp.Key))
{
// Release stale lock and reset job to pending
var job = await jobStore.GetJobAsync(kvp.Key);
if (job != null)
{
job.State = JobState.Pending;
job.LockHolder = null;
await jobStore.StoreJobAsync(job);
}
_locks.TryRemove(kvp.Key, out _);
}
}
}
}
private class HeartbeatJobStore
{
private readonly ConcurrentDictionary<string, HeartbeatTestJob> _jobs = new();
public Task StoreJobAsync(HeartbeatTestJob job)
{
_jobs[job.Id] = job;
return Task.CompletedTask;
}
public Task<HeartbeatTestJob?> GetJobAsync(string jobId)
{
_jobs.TryGetValue(jobId, out var job);
return Task.FromResult(job);
}
}
private class HeartbeatWorker
{
private readonly string _workerId;
private readonly TimeSpan _heartbeatInterval;
private readonly Func<HeartbeatTestJob, Task> _executeJob;
public HeartbeatWorker(string workerId, TimeSpan heartbeatInterval, Func<HeartbeatTestJob, Task> executeJob)
{
_workerId = workerId;
_heartbeatInterval = heartbeatInterval;
_executeJob = executeJob;
}
public async Task ExecuteJobAsync(HeartbeatTestJob job, HeartbeatLockManager lockManager)
{
await _executeJob(job);
}
}
private class HeartbeatMetrics
{
private int _missedHeartbeatCount;
public int MissedHeartbeatCount => _missedHeartbeatCount;
public void IncrementMissedHeartbeats()
{
Interlocked.Increment(ref _missedHeartbeatCount);
}
}
#endregion
}
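
The simulated tests above extend the lock from inside the job body; in a production worker the extension usually runs on a background loop at roughly one third of the heartbeat timeout, as the class summary suggests. A minimal sketch under that assumption (the delegate shape and names are illustrative, not code from this commit):

using System;
using System.Threading;
using System.Threading.Tasks;

public static class HeartbeatLoop
{
    public static async Task RunWithHeartbeatAsync(
        Func<CancellationToken, Task> jobBody,
        Func<Task> extendLock,
        TimeSpan heartbeatTimeout)
    {
        using var done = new CancellationTokenSource();
        var interval = TimeSpan.FromTicks(heartbeatTimeout.Ticks / 3); // interval well below the timeout

        var heartbeat = Task.Run(async () =>
        {
            using var timer = new PeriodicTimer(interval);
            try
            {
                while (await timer.WaitForNextTickAsync(done.Token))
                {
                    await extendLock(); // keeps the distributed lock alive while the job runs
                }
            }
            catch (OperationCanceledException)
            {
                // the job body completed; stop extending the lock
            }
        });

        try
        {
            await jobBody(done.Token);
        }
        finally
        {
            done.Cancel(); // a hard process crash never reaches this line, so the lock simply expires
            await heartbeat;
        }
    }
}

Passing extendLock: () => lockManager.ExtendLockAsync(jobId, workerId) reproduces what LongRunningJob_ExtendsLockViaHeartbeat drives by hand, and a crashed process never reaches the finally block, so the lock stops being extended and another worker can reclaim the job after the timeout.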


@@ -0,0 +1,439 @@
// -----------------------------------------------------------------------------
// SchedulerBackpressureTests.cs
// Sprint: SPRINT_20251229_004_004_BE_scheduler_resilience
// Task: SCH-004
// Description: Load tests for scheduler backpressure and concurrency limits
// -----------------------------------------------------------------------------
using System.Collections.Concurrent;
using System.Diagnostics;
using FluentAssertions;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests.Load;
/// <summary>
/// Load tests for Scheduler backpressure and queue depth management.
/// Verifies that concurrency limits are enforced and queue rejections work correctly.
///
/// EDGE CASE: Concurrency limit enforcement.
/// The scheduler must never exceed maxConcurrentJobs in-flight executions.
/// This prevents resource exhaustion and maintains system stability.
///
/// EDGE CASE: Queue depth limits.
/// When the queue reaches maxQueueDepth, new jobs must be rejected.
/// This provides backpressure to upstream systems.
///
/// EDGE CASE: Fair scheduling under load.
/// Jobs should be processed in FIFO order (subject to priority).
/// High load must not cause starvation of earlier-enqueued jobs.
/// </summary>
[Trait("Category", "Performance")]
[Trait("Category", "Load")]
[Trait("Sprint", "SPRINT_20251229_004_004_BE")]
public sealed class SchedulerBackpressureTests
{
#region Concurrency Limit Tests
/// <summary>
/// Verifies that the scheduler enforces maxConcurrentJobs limit.
///
/// EDGE CASE: Concurrent job counting.
/// The scheduler tracks in-flight jobs atomically. Increments happen
/// on job start, decrements on job completion/failure.
///
/// EDGE CASE: Burst load handling.
/// When 1000 jobs are enqueued simultaneously, the scheduler must
/// limit concurrent execution to maxConcurrent, queuing the rest.
/// </summary>
[Fact]
public async Task HighLoad_EnforcesConcurrencyLimit()
{
// Arrange
const int totalJobs = 1000;
const int maxConcurrent = 10;
var concurrentCount = 0;
var maxObservedConcurrency = 0;
var processedJobs = 0;
var concurrencyLock = new object();
var scheduler = new LoadTestScheduler(maxConcurrent);
// Simulate job execution with tracking
scheduler.OnJobExecute = async (jobId) =>
{
int current;
lock (concurrencyLock)
{
current = ++concurrentCount;
maxObservedConcurrency = Math.Max(maxObservedConcurrency, current);
}
// Simulate work
await Task.Delay(10);
lock (concurrencyLock)
{
concurrentCount--;
processedJobs++;
}
};
// Act: Enqueue 1000 jobs in burst
var enqueueTasks = Enumerable.Range(0, totalJobs)
.Select(i => scheduler.EnqueueAsync(new LoadTestJob
{
Id = $"load-{i}",
Payload = $"image:{i}"
}));
await Task.WhenAll(enqueueTasks);
// Process all jobs
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromMinutes(2));
// Assert
processedJobs.Should().Be(totalJobs, "all jobs should complete");
maxObservedConcurrency.Should().BeLessThanOrEqualTo(maxConcurrent,
"concurrency limit must be respected at all times");
scheduler.Metrics.PeakConcurrency.Should().BeLessThanOrEqualTo(maxConcurrent);
scheduler.Metrics.TotalEnqueued.Should().Be(totalJobs);
scheduler.Metrics.TotalCompleted.Should().Be(totalJobs);
}
/// <summary>
/// Verifies job processing throughput under sustained load.
///
/// EDGE CASE: Throughput degradation under contention.
/// As concurrency increases, per-job overhead increases due to
/// lock contention and context switching. Throughput should remain
/// predictable and not degrade exponentially.
/// </summary>
[Fact]
public async Task SustainedLoad_MaintainsThroughput()
{
// Arrange
const int totalJobs = 500;
const int maxConcurrent = 20;
var processedJobs = 0;
var scheduler = new LoadTestScheduler(maxConcurrent);
scheduler.OnJobExecute = async (jobId) =>
{
await Task.Delay(5); // Simulate fast job execution
Interlocked.Increment(ref processedJobs);
};
// Enqueue jobs
for (int i = 0; i < totalJobs; i++)
{
await scheduler.EnqueueAsync(new LoadTestJob
{
Id = $"sustained-{i}",
Payload = $"image:{i}"
});
}
// Act: Measure processing time
var stopwatch = Stopwatch.StartNew();
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromMinutes(1));
stopwatch.Stop();
// Assert
processedJobs.Should().Be(totalJobs);
// With maxConcurrent=20 and 5ms per job, theoretical minimum is:
// 500 jobs / 20 concurrency = 25 batches × 5ms = 125ms
// Allow 10x overhead for scheduling, locking, etc.
stopwatch.ElapsedMilliseconds.Should().BeLessThan(1500,
"throughput should remain efficient under load");
var jobsPerSecond = totalJobs / stopwatch.Elapsed.TotalSeconds;
jobsPerSecond.Should().BeGreaterThan(100, "should process at least 100 jobs/sec");
}
#endregion
#region Queue Depth and Backpressure Tests
/// <summary>
/// Verifies that the scheduler rejects new jobs when queue is full.
///
/// EDGE CASE: Queue capacity enforcement.
/// The queue has a fixed capacity (maxQueueDepth). When full, new
/// enqueue attempts must fail immediately without blocking.
///
/// EDGE CASE: Backpressure signaling.
/// Rejected enqueue attempts return false, allowing callers to implement
/// exponential backoff or circuit breaking.
/// </summary>
[Fact]
public async Task QueueFull_RejectsNewJobs()
{
// Arrange
const int queueCapacity = 100;
var scheduler = new LoadTestScheduler(
maxConcurrent: 1,
maxQueueDepth: queueCapacity);
// Pause job processing to fill queue
scheduler.PauseProcessing();
// Act: Fill the queue to capacity
for (int i = 0; i < queueCapacity; i++)
{
var enqueued = await scheduler.TryEnqueueAsync(new LoadTestJob
{
Id = $"fill-{i}",
Payload = $"image:{i}"
});
enqueued.Should().BeTrue($"job {i} should be accepted (queue not full yet)");
}
// Try to enqueue one more (should fail)
var overflow = await scheduler.TryEnqueueAsync(new LoadTestJob
{
Id = "overflow",
Payload = "image:overflow"
});
// Assert
overflow.Should().BeFalse("queue at capacity should reject new jobs");
scheduler.Metrics.TotalEnqueued.Should().Be(queueCapacity);
scheduler.Metrics.TotalRejected.Should().Be(1);
}
/// <summary>
/// Verifies that queue depth decreases as jobs complete.
///
/// EDGE CASE: Queue depth metric accuracy.
/// The scheduler.jobs.queued metric must accurately reflect the number
/// of jobs waiting for execution (not including in-flight jobs).
/// </summary>
[Fact]
public async Task QueueDepth_DecreasesAsJobsComplete()
{
// Arrange
const int totalJobs = 50;
var scheduler = new LoadTestScheduler(maxConcurrent: 5);
var depthSamples = new ConcurrentBag<int>();
scheduler.OnJobExecute = async (jobId) =>
{
depthSamples.Add(scheduler.Metrics.QueuedCount);
await Task.Delay(10);
};
// Act: Enqueue jobs and sample queue depth during processing
for (int i = 0; i < totalJobs; i++)
{
await scheduler.EnqueueAsync(new LoadTestJob
{
Id = $"depth-{i}",
Payload = $"image:{i}"
});
}
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromSeconds(30));
// Assert
depthSamples.Should().NotBeEmpty("should have sampled queue depth");
// Queue depth should trend downward
var sortedSamples = depthSamples.OrderDescending().ToList();
sortedSamples.First().Should().BeGreaterThan(sortedSamples.Last(),
"queue depth should decrease over time");
scheduler.Metrics.QueuedCount.Should().Be(0, "all jobs should be processed");
}
#endregion
#region Fairness and Priority Tests
/// <summary>
/// Verifies FIFO ordering under normal load.
///
/// EDGE CASE: Job processing order.
/// Without priority, jobs should be processed in the order they were enqueued.
/// This ensures fairness and prevents starvation.
/// </summary>
[Fact]
public async Task NormalLoad_ProcessesJobsInFIFOOrder()
{
// Arrange
const int jobCount = 20;
var processingOrder = new ConcurrentQueue<int>(); // ConcurrentQueue preserves enqueue order; ConcurrentBag does not, which would break the FIFO assertion
var scheduler = new LoadTestScheduler(maxConcurrent: 1); // Serial processing
scheduler.OnJobExecute = async (jobId) =>
{
var jobNumber = int.Parse(jobId.Split('-')[1]);
processingOrder.Enqueue(jobNumber);
await Task.CompletedTask;
};
// Act: Enqueue jobs in order
for (int i = 0; i < jobCount; i++)
{
await scheduler.EnqueueAsync(new LoadTestJob
{
Id = $"fifo-{i}",
Payload = $"image:{i}"
});
}
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromSeconds(10));
// Assert
var actualOrder = processingOrder.ToList();
actualOrder.Should().BeInAscendingOrder("jobs should be processed in FIFO order");
actualOrder.Should().HaveCount(jobCount);
}
#endregion
#region Test Infrastructure
private class LoadTestJob
{
public required string Id { get; init; }
public required string Payload { get; init; }
public int Priority { get; init; } = 0;
}
private class LoadTestScheduler
{
private readonly ConcurrentQueue<LoadTestJob> _queue = new();
private readonly SemaphoreSlim _concurrencyLimit;
private readonly int _maxQueueDepth;
private int _queuedCount;
private int _inflightCount;
private bool _isPaused;
public Func<string, Task> OnJobExecute { get; set; } = _ => Task.CompletedTask;
public LoadTestMetrics Metrics { get; } = new();
public LoadTestScheduler(int maxConcurrent, int maxQueueDepth = int.MaxValue)
{
_concurrencyLimit = new SemaphoreSlim(maxConcurrent, maxConcurrent);
_maxQueueDepth = maxQueueDepth;
}
public Task<bool> TryEnqueueAsync(LoadTestJob job)
{
if (_queuedCount >= _maxQueueDepth)
{
Interlocked.Increment(ref Metrics._totalRejected);
return Task.FromResult(false);
}
_queue.Enqueue(job);
Metrics.QueuedCount = Interlocked.Increment(ref _queuedCount); // mirror the internal counter into the exposed queue-depth metric
Interlocked.Increment(ref Metrics._totalEnqueued);
return Task.FromResult(true);
}
public async Task EnqueueAsync(LoadTestJob job)
{
var success = await TryEnqueueAsync(job);
if (!success)
{
throw new InvalidOperationException("Queue is full");
}
}
public void PauseProcessing()
{
_isPaused = true;
}
public void ResumeProcessing()
{
_isPaused = false;
}
public async Task ProcessAllAsync(TimeSpan timeout)
{
_isPaused = false;
var cts = new CancellationTokenSource(timeout);
var processingTasks = new List<Task>();
while (!cts.Token.IsCancellationRequested)
{
if (_queue.IsEmpty && _inflightCount == 0)
{
break; // All jobs completed
}
if (_isPaused || !_queue.TryDequeue(out var job))
{
await Task.Delay(10, cts.Token);
continue;
}
Metrics.QueuedCount = Interlocked.Decrement(ref _queuedCount); // queued metric excludes the job we just picked up
var task = ProcessJobAsync(job, cts.Token);
processingTasks.Add(task);
}
await Task.WhenAll(processingTasks);
}
private async Task ProcessJobAsync(LoadTestJob job, CancellationToken ct)
{
await _concurrencyLimit.WaitAsync(ct);
try
{
var currentInflight = Interlocked.Increment(ref _inflightCount);
Metrics.UpdatePeakConcurrency(currentInflight);
await OnJobExecute(job.Id);
Interlocked.Increment(ref Metrics._totalCompleted);
}
finally
{
Interlocked.Decrement(ref _inflightCount);
_concurrencyLimit.Release();
}
}
}
private class LoadTestMetrics
{
internal int _totalEnqueued;
internal int _totalCompleted;
internal int _totalRejected;
private int _peakConcurrency;
public int TotalEnqueued => _totalEnqueued;
public int TotalCompleted => _totalCompleted;
public int TotalRejected => _totalRejected;
public int PeakConcurrency => _peakConcurrency;
public int QueuedCount { get; set; }
public void UpdatePeakConcurrency(int current)
{
int peak;
do
{
peak = _peakConcurrency;
if (current <= peak) return;
}
while (Interlocked.CompareExchange(ref _peakConcurrency, current, peak) != peak);
}
}
#endregion
}
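
For comparison with the hand-rolled bounded queue in LoadTestScheduler, the same reject-when-full backpressure can be expressed with System.Threading.Channels. The sketch below is illustrative only; the type and member names are assumptions, not code from this commit.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

public sealed class BoundedJobQueue<TJob>
{
    private readonly Channel<TJob> _channel;
    private int _rejected;

    public BoundedJobQueue(int maxQueueDepth)
    {
        _channel = Channel.CreateBounded<TJob>(new BoundedChannelOptions(maxQueueDepth)
        {
            FullMode = BoundedChannelFullMode.Wait // with Wait, TryWrite returns false when full instead of dropping items
        });
    }

    public int RejectedCount => _rejected;

    // Non-blocking enqueue: a false return is the backpressure signal to the caller.
    public bool TryEnqueue(TJob job)
    {
        if (_channel.Writer.TryWrite(job))
        {
            return true;
        }

        Interlocked.Increment(ref _rejected); // count the rejection before reporting failure
        return false;
    }

    // Consumers drain the queue and apply their own concurrency limit (e.g. a SemaphoreSlim).
    public IAsyncEnumerable<TJob> DequeueAllAsync(CancellationToken ct) =>
        _channel.Reader.ReadAllAsync(ct);
}

With FullMode set to Wait, TryWrite returns false once the channel is at capacity, which maps directly onto the TryEnqueueAsync rejection path exercised by QueueFull_RejectsNewJobs.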


@@ -0,0 +1,481 @@
// -----------------------------------------------------------------------------
// QueueDepthMetricsTests.cs
// Sprint: SPRINT_20251229_004_004_BE_scheduler_resilience
// Task: SCH-008
// Description: Tests for queue depth and backpressure metrics verification
// -----------------------------------------------------------------------------
using System.Collections.Concurrent;
using FluentAssertions;
using Xunit;
namespace StellaOps.Scheduler.Worker.Tests.Metrics;
/// <summary>
/// Tests for scheduler metrics: queue depth, in-flight jobs, and backpressure signals.
///
/// EDGE CASE: Metric accuracy under concurrent operations.
/// Metrics must be updated atomically using Interlocked operations.
/// Race conditions in metric updates can lead to incorrect monitoring data.
///
/// EDGE CASE: Metric staleness vs. performance.
/// Metrics should be updated immediately on state changes, but without
/// introducing lock contention that would slow down job processing.
///
/// EDGE CASE: Backpressure signal timing.
/// The scheduler.backpressure.rejections metric must increment BEFORE
/// returning failure to the caller, ensuring accurate monitoring.
/// </summary>
[Trait("Category", "Metrics")]
[Trait("Category", "Observability")]
[Trait("Sprint", "SPRINT_20251229_004_004_BE")]
public sealed class QueueDepthMetricsTests
{
#region Queue Depth Metrics Tests
/// <summary>
/// Verifies that scheduler.jobs.queued metric reflects actual queue depth.
///
/// EDGE CASE: Queued vs. in-flight distinction.
/// Queued jobs are waiting for execution. In-flight jobs are currently running.
/// These must be tracked separately for accurate capacity planning.
///
/// EDGE CASE: Atomic metric updates.
/// Queue depth increments (on enqueue) and decrements (on pickup) must
/// be atomic to prevent race conditions from corrupting the metric.
/// </summary>
[Fact]
public async Task QueuedMetric_ReflectsActualQueueDepth()
{
// Arrange
var scheduler = new MetricsTestScheduler(maxConcurrent: 2);
var metrics = scheduler.Metrics;
// Act: Enqueue 5 jobs
for (int i = 0; i < 5; i++)
{
await scheduler.EnqueueAsync(new MetricsTestJob
{
Id = $"job-{i}",
Payload = $"task-{i}"
});
}
// Assert: Queued count should be 5
metrics.QueuedJobs.Should().Be(5, "all enqueued jobs should be counted");
// Act: Start processing (concurrency limit = 2)
_ = Task.Run(() => scheduler.ProcessNextBatchAsync());
await Task.Delay(100); // Allow processing to start
// Assert: Queued should decrease as jobs start
metrics.QueuedJobs.Should().BeLessThan(5, "jobs being processed should leave queue");
metrics.InflightJobs.Should().BeGreaterThan(0, "picked-up jobs should be in-flight");
// Wait for all jobs to complete
await scheduler.WaitForCompletionAsync(timeout: TimeSpan.FromSeconds(5));
// Assert: All queues should be empty
metrics.QueuedJobs.Should().Be(0, "queue should be empty after processing");
metrics.InflightJobs.Should().Be(0, "no jobs should be in-flight after completion");
}
/// <summary>
/// Verifies that scheduler.jobs.inflight metric respects concurrency limit.
///
/// EDGE CASE: Peak concurrency tracking.
/// The metric must track both current and peak in-flight count.
/// Peak is useful for capacity planning and SLA verification.
/// </summary>
[Fact]
public async Task InflightMetric_RespectsConcurrencyLimit()
{
// Arrange
const int maxConcurrent = 5;
var scheduler = new MetricsTestScheduler(maxConcurrent);
var metrics = scheduler.Metrics;
var inflightSamples = new ConcurrentBag<int>();
scheduler.OnJobStart = (jobId) =>
{
inflightSamples.Add(metrics.InflightJobs);
};
// Act: Enqueue 20 jobs
for (int i = 0; i < 20; i++)
{
await scheduler.EnqueueAsync(new MetricsTestJob
{
Id = $"job-{i}",
Payload = $"task-{i}",
Duration = TimeSpan.FromMilliseconds(50)
});
}
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromSeconds(10));
// Assert
inflightSamples.Should().NotBeEmpty("should have sampled in-flight counts");
inflightSamples.Max().Should().BeLessThanOrEqualTo(maxConcurrent,
"in-flight count should never exceed concurrency limit");
metrics.PeakInflightJobs.Should().BeLessThanOrEqualTo(maxConcurrent,
"peak in-flight should respect concurrency limit");
}
#endregion
#region Backpressure Metrics Tests
/// <summary>
/// Verifies that scheduler.backpressure.rejections increments when queue is full.
///
/// EDGE CASE: Rejection count accuracy.
/// Each rejected enqueue attempt must increment the rejection counter exactly once.
/// This metric is critical for upstream circuit breakers and rate limiting.
/// </summary>
[Fact]
public async Task BackpressureRejections_IncrementsOnQueueFull()
{
// Arrange
const int queueCapacity = 10;
var scheduler = new MetricsTestScheduler(maxConcurrent: 1, maxQueueDepth: queueCapacity);
var metrics = scheduler.Metrics;
// Pause processing to fill queue
scheduler.PauseProcessing();
// Act: Fill queue to capacity
for (int i = 0; i < queueCapacity; i++)
{
await scheduler.EnqueueAsync(new MetricsTestJob
{
Id = $"fill-{i}",
Payload = $"task-{i}"
});
}
metrics.RejectedJobs.Should().Be(0, "no rejections yet");
// Try to enqueue 5 more (should all be rejected)
for (int i = 0; i < 5; i++)
{
var enqueued = await scheduler.TryEnqueueAsync(new MetricsTestJob
{
Id = $"overflow-{i}",
Payload = $"task-{i}"
});
enqueued.Should().BeFalse("queue is full");
}
// Assert
metrics.RejectedJobs.Should().Be(5, "should count all 5 rejected enqueue attempts");
}
/// <summary>
/// Verifies that backpressure metrics reset correctly after queue drains.
///
/// EDGE CASE: Metric reset semantics.
/// Rejection counters are cumulative (monotonically increasing).
/// They should NOT reset when queue drains, as they track lifetime rejections.
/// </summary>
[Fact]
public async Task BackpressureMetrics_DoNotResetAfterDrain()
{
// Arrange
const int queueCapacity = 5;
var scheduler = new MetricsTestScheduler(maxConcurrent: 1, maxQueueDepth: queueCapacity);
var metrics = scheduler.Metrics;
scheduler.PauseProcessing();
// Fill queue
for (int i = 0; i < queueCapacity; i++)
{
await scheduler.EnqueueAsync(new MetricsTestJob { Id = $"job-{i}", Payload = $"task-{i}" });
}
// Reject 3 jobs
for (int i = 0; i < 3; i++)
{
await scheduler.TryEnqueueAsync(new MetricsTestJob { Id = $"reject-{i}", Payload = $"task-{i}" });
}
var rejectionsBeforeDrain = metrics.RejectedJobs;
rejectionsBeforeDrain.Should().Be(3);
// Act: Drain queue
scheduler.ResumeProcessing();
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromSeconds(5));
// Assert
metrics.RejectedJobs.Should().Be(rejectionsBeforeDrain,
"rejection counter should not reset after drain (cumulative metric)");
}
#endregion
#region Throughput Metrics Tests
/// <summary>
/// Verifies that scheduler tracks completed job count correctly.
///
/// EDGE CASE: Completed vs. failed job distinction.
/// Completed jobs succeeded. Failed jobs exhausted retries or had fatal errors.
/// These must be tracked separately for SLA monitoring.
/// </summary>
[Fact]
public async Task CompletedMetric_TracksSuccessfulJobs()
{
// Arrange
var scheduler = new MetricsTestScheduler(maxConcurrent: 5);
var metrics = scheduler.Metrics;
// Act: Enqueue and process 10 jobs
for (int i = 0; i < 10; i++)
{
await scheduler.EnqueueAsync(new MetricsTestJob
{
Id = $"job-{i}",
Payload = $"task-{i}",
Duration = TimeSpan.FromMilliseconds(10)
});
}
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromSeconds(5));
// Assert
metrics.CompletedJobs.Should().Be(10, "all jobs should complete successfully");
metrics.FailedJobs.Should().Be(0, "no jobs should fail");
metrics.TotalEnqueued.Should().Be(10);
}
/// <summary>
/// Verifies that failed jobs are counted separately.
///
/// EDGE CASE: Transient vs. permanent failure.
/// Transient failures trigger retry. Permanent failures go to poison queue.
/// Only permanent failures (after max retries) should increment failed counter.
/// </summary>
[Fact]
public async Task FailedMetric_TracksJobsExceedingRetries()
{
// Arrange
var scheduler = new MetricsTestScheduler(maxConcurrent: 2);
var metrics = scheduler.Metrics;
scheduler.OnJobExecute = (jobId) =>
{
throw new InvalidOperationException("Simulated failure");
};
// Act: Enqueue 5 jobs that will all fail
for (int i = 0; i < 5; i++)
{
await scheduler.EnqueueAsync(new MetricsTestJob
{
Id = $"failing-job-{i}",
Payload = $"task-{i}",
MaxRetries = 2
});
}
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromSeconds(10), expectFailures: true);
// Assert
metrics.FailedJobs.Should().Be(5, "all jobs should fail after max retries");
metrics.CompletedJobs.Should().Be(0, "no jobs should complete");
}
#endregion
#region Test Infrastructure
private class MetricsTestJob
{
public required string Id { get; init; }
public required string Payload { get; init; }
public TimeSpan Duration { get; init; } = TimeSpan.FromMilliseconds(10);
public int MaxRetries { get; init; } = 3;
public int Attempts { get; set; }
}
private class MetricsTestScheduler
{
private readonly ConcurrentQueue<MetricsTestJob> _queue = new();
private readonly SemaphoreSlim _concurrencyLimit;
private readonly int _maxQueueDepth;
private bool _isPaused;
public Action<string> OnJobStart { get; set; } = _ => { };
public Func<string, Task> OnJobExecute { get; set; } = _ => Task.CompletedTask;
public SchedulerMetrics Metrics { get; } = new();
public MetricsTestScheduler(int maxConcurrent, int maxQueueDepth = int.MaxValue)
{
_concurrencyLimit = new SemaphoreSlim(maxConcurrent, maxConcurrent);
_maxQueueDepth = maxQueueDepth;
}
public Task<bool> TryEnqueueAsync(MetricsTestJob job)
{
if (Metrics.QueuedJobs >= _maxQueueDepth)
{
Metrics.IncrementRejected();
return Task.FromResult(false);
}
_queue.Enqueue(job);
Metrics.IncrementQueued();
Metrics.IncrementTotalEnqueued();
return Task.FromResult(true);
}
public async Task EnqueueAsync(MetricsTestJob job)
{
var success = await TryEnqueueAsync(job);
if (!success)
{
throw new InvalidOperationException("Queue is full");
}
}
public void PauseProcessing() => _isPaused = true;
public void ResumeProcessing() => _isPaused = false;
public async Task ProcessNextBatchAsync()
{
while (_queue.TryDequeue(out var job))
{
if (_isPaused)
{
_queue.Enqueue(job); // Put it back
await Task.Delay(100);
continue;
}
Metrics.DecrementQueued();
_ = ProcessJobAsync(job);
}
}
public async Task ProcessAllAsync(TimeSpan timeout, bool expectFailures = false)
{
var cts = new CancellationTokenSource(timeout);
var tasks = new List<Task>();
while (!cts.Token.IsCancellationRequested)
{
if (_queue.IsEmpty && Metrics.InflightJobs == 0)
{
break;
}
if (_isPaused || !_queue.TryDequeue(out var job))
{
await Task.Delay(10, cts.Token);
continue;
}
Metrics.DecrementQueued();
var task = ProcessJobAsync(job, expectFailures);
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
public Task WaitForCompletionAsync(TimeSpan timeout)
{
return ProcessAllAsync(timeout);
}
private async Task ProcessJobAsync(MetricsTestJob job, bool expectFailures = false)
{
await _concurrencyLimit.WaitAsync();
try
{
Metrics.IncrementInflight();
OnJobStart(job.Id);
await OnJobExecute(job.Id);
await Task.Delay(job.Duration);
Metrics.IncrementCompleted();
}
catch when (expectFailures)
{
job.Attempts++;
if (job.Attempts > job.MaxRetries)
{
Metrics.IncrementFailed();
}
else
{
// Re-enqueue for retry
_queue.Enqueue(job);
Metrics.IncrementQueued();
}
}
finally
{
Metrics.DecrementInflight();
_concurrencyLimit.Release();
}
}
}
private class SchedulerMetrics
{
private int _queuedJobs;
private int _inflightJobs;
private int _peakInflightJobs;
private int _completedJobs;
private int _failedJobs;
private int _rejectedJobs;
private int _totalEnqueued;
public int QueuedJobs => _queuedJobs;
public int InflightJobs => _inflightJobs;
public int PeakInflightJobs => _peakInflightJobs;
public int CompletedJobs => _completedJobs;
public int FailedJobs => _failedJobs;
public int RejectedJobs => _rejectedJobs;
public int TotalEnqueued => _totalEnqueued;
public void IncrementQueued() => Interlocked.Increment(ref _queuedJobs);
public void DecrementQueued() => Interlocked.Decrement(ref _queuedJobs);
public void IncrementInflight()
{
var current = Interlocked.Increment(ref _inflightJobs);
UpdatePeak(current);
}
public void DecrementInflight() => Interlocked.Decrement(ref _inflightJobs);
public void IncrementCompleted() => Interlocked.Increment(ref _completedJobs);
public void IncrementFailed() => Interlocked.Increment(ref _failedJobs);
public void IncrementRejected() => Interlocked.Increment(ref _rejectedJobs);
public void IncrementTotalEnqueued() => Interlocked.Increment(ref _totalEnqueued);
private void UpdatePeak(int current)
{
int peak;
do
{
peak = _peakInflightJobs;
if (current <= peak) return;
}
while (Interlocked.CompareExchange(ref _peakInflightJobs, current, peak) != peak);
}
}
#endregion
}
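
The SchedulerMetrics class above keeps plain counters for test assertions; the metric names quoted in the comments (scheduler.jobs.queued, scheduler.jobs.inflight, scheduler.backpressure.rejections, scheduler.heartbeat.missed) map naturally onto System.Diagnostics.Metrics instruments. A minimal sketch of that wiring, with the meter name and method shapes assumed rather than taken from this commit:

using System.Diagnostics.Metrics;
using System.Threading;

public sealed class SchedulerInstrumentation
{
    private static readonly Meter Meter = new("StellaOps.Scheduler");

    private readonly Counter<long> _rejections =
        Meter.CreateCounter<long>("scheduler.backpressure.rejections");
    private readonly Counter<long> _missedHeartbeats =
        Meter.CreateCounter<long>("scheduler.heartbeat.missed");
    private readonly ObservableGauge<int> _queuedGauge;
    private readonly ObservableGauge<int> _inflightGauge;
    private int _queued;
    private int _inflight;

    public SchedulerInstrumentation()
    {
        // Gauges are sampled on demand by the listener, so the hot path only touches Interlocked counters.
        _queuedGauge = Meter.CreateObservableGauge("scheduler.jobs.queued", () => _queued);
        _inflightGauge = Meter.CreateObservableGauge("scheduler.jobs.inflight", () => _inflight);
    }

    public void JobEnqueued() => Interlocked.Increment(ref _queued);

    public void JobPickedUp()
    {
        Interlocked.Decrement(ref _queued);   // queued tracks waiting jobs only
        Interlocked.Increment(ref _inflight); // in-flight tracks currently executing jobs
    }

    public void JobFinished() => Interlocked.Decrement(ref _inflight);

    public void EnqueueRejected() => _rejections.Add(1); // cumulative, never reset

    public void HeartbeatMissed() => _missedHeartbeats.Add(1);
}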