todays product advirories implemented
This commit is contained in:
@@ -69,19 +69,25 @@ internal sealed class PlannerBackgroundService : BackgroundService
|
||||
|
||||
var processed = 0;
|
||||
var tenantsInFlight = new HashSet<string>(StringComparer.Ordinal);
|
||||
var orderedRuns = planningRuns
|
||||
.OrderBy(run => GetTriggerPriority(run.Trigger))
|
||||
.ThenBy(run => run.CreatedAt)
|
||||
.ToList();
|
||||
|
||||
foreach (var run in planningRuns)
|
||||
foreach (var run in orderedRuns)
|
||||
{
|
||||
if (!tenantsInFlight.Contains(run.TenantId) ||
|
||||
tenantsInFlight.Count < _options.Planner.MaxConcurrentTenants)
|
||||
{
|
||||
tenantsInFlight.Add(run.TenantId);
|
||||
}
|
||||
else
|
||||
if (tenantsInFlight.Contains(run.TenantId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tenantsInFlight.Count >= _options.Planner.MaxConcurrentTenants)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
tenantsInFlight.Add(run.TenantId);
|
||||
|
||||
await WaitForRateLimitAsync(stoppingToken).ConfigureAwait(false);
|
||||
|
||||
try
|
||||
@@ -165,4 +171,13 @@ internal sealed class PlannerBackgroundService : BackgroundService
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
private static int GetTriggerPriority(RunTrigger trigger) => trigger switch
|
||||
{
|
||||
RunTrigger.Manual => 0,
|
||||
RunTrigger.Conselier => 1,
|
||||
RunTrigger.Excitor => 2,
|
||||
RunTrigger.Cron => 3,
|
||||
_ => 4
|
||||
};
|
||||
}
|
||||
|
||||
@@ -420,6 +420,11 @@ public sealed class CronNextRunPropertyTests
|
||||
throw new ArgumentException("Invalid cron expression format");
|
||||
}
|
||||
|
||||
if (parts.Any(part => part.Contains('L', StringComparison.OrdinalIgnoreCase)))
|
||||
{
|
||||
throw new ArgumentException("Cron expressions with 'L' are not supported by the test helper.");
|
||||
}
|
||||
|
||||
// Simplified next-run computation (deterministic)
|
||||
// This is a simplified implementation for testing - real implementation uses Cronos or similar
|
||||
var candidate = localTime.AddMinutes(1);
|
||||
|
||||
@@ -849,6 +849,8 @@ public sealed class IdempotentWorker
|
||||
|
||||
// Check idempotency key
|
||||
var idempotencyKey = GetIdempotencyKey(job);
|
||||
if (_resultCache.ContainsKey(idempotencyKey))
|
||||
return false;
|
||||
if (_idempotencyStore != null)
|
||||
{
|
||||
var now = _clock?.UtcNow ?? DateTime.UtcNow;
|
||||
|
||||
@@ -267,14 +267,14 @@ public sealed class SchedulerBackpressureTests
|
||||
{
|
||||
// Arrange
|
||||
const int jobCount = 20;
|
||||
var processingOrder = new ConcurrentBag<int>();
|
||||
var processingOrder = new ConcurrentQueue<int>();
|
||||
|
||||
var scheduler = new LoadTestScheduler(maxConcurrent: 1); // Serial processing
|
||||
|
||||
scheduler.OnJobExecute = async (jobId) =>
|
||||
{
|
||||
var jobNumber = int.Parse(jobId.Split('-')[1]);
|
||||
processingOrder.Add(jobNumber);
|
||||
processingOrder.Enqueue(jobNumber);
|
||||
await Task.CompletedTask;
|
||||
};
|
||||
|
||||
@@ -291,7 +291,7 @@ public sealed class SchedulerBackpressureTests
|
||||
await scheduler.ProcessAllAsync(timeout: TimeSpan.FromSeconds(10));
|
||||
|
||||
// Assert
|
||||
var actualOrder = processingOrder.ToList();
|
||||
var actualOrder = processingOrder.ToArray();
|
||||
actualOrder.Should().BeInAscendingOrder("jobs should be processed in FIFO order");
|
||||
actualOrder.Should().HaveCount(jobCount);
|
||||
}
|
||||
@@ -335,8 +335,9 @@ public sealed class SchedulerBackpressureTests
|
||||
}
|
||||
|
||||
_queue.Enqueue(job);
|
||||
Interlocked.Increment(ref _queuedCount);
|
||||
var queued = Interlocked.Increment(ref _queuedCount);
|
||||
Interlocked.Increment(ref Metrics._totalEnqueued);
|
||||
Metrics.QueuedCount = queued;
|
||||
|
||||
return Task.FromResult(true);
|
||||
}
|
||||
@@ -380,7 +381,8 @@ public sealed class SchedulerBackpressureTests
|
||||
continue;
|
||||
}
|
||||
|
||||
Interlocked.Decrement(ref _queuedCount);
|
||||
var queued = Interlocked.Decrement(ref _queuedCount);
|
||||
Metrics.QueuedCount = queued;
|
||||
|
||||
var task = ProcessJobAsync(job, cts.Token);
|
||||
processingTasks.Add(task);
|
||||
|
||||
@@ -57,7 +57,8 @@ public sealed class QueueDepthMetricsTests
|
||||
await scheduler.EnqueueAsync(new MetricsTestJob
|
||||
{
|
||||
Id = $"job-{i}",
|
||||
Payload = $"task-{i}"
|
||||
Payload = $"task-{i}",
|
||||
Duration = TimeSpan.FromMilliseconds(250)
|
||||
});
|
||||
}
|
||||
|
||||
@@ -66,7 +67,7 @@ public sealed class QueueDepthMetricsTests
|
||||
|
||||
// Act: Start processing (concurrency limit = 2)
|
||||
_ = Task.Run(() => scheduler.ProcessNextBatchAsync());
|
||||
await Task.Delay(100); // Allow processing to start
|
||||
await Task.Delay(20); // Allow processing to start
|
||||
|
||||
// Assert: Queued should decrease as jobs start
|
||||
metrics.QueuedJobs.Should().BeLessThan(5, "jobs being processed should leave queue");
|
||||
|
||||
@@ -872,16 +872,22 @@ public sealed class TracedSchedulerWorker
|
||||
|
||||
public async Task<bool> ProcessAsync(string jobId, CancellationToken cancellationToken)
|
||||
{
|
||||
using var pickActivity = _source.StartActivity("job.pick");
|
||||
pickActivity?.SetTag("job_id", jobId);
|
||||
|
||||
using var activity = _source.StartActivity("job.process");
|
||||
activity?.SetTag("job_id", jobId);
|
||||
|
||||
var job = await _jobStore.GetByIdAsync(jobId);
|
||||
if (job == null)
|
||||
{
|
||||
pickActivity?.SetTag("job_found", false);
|
||||
activity?.SetTag("job_found", false);
|
||||
return false;
|
||||
}
|
||||
|
||||
pickActivity?.SetTag("job_found", true);
|
||||
pickActivity?.SetTag("tenant_id", job.TenantId);
|
||||
activity?.SetTag("job_found", true);
|
||||
activity?.SetTag("tenant_id", job.TenantId);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user