up
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Airgap Sealed CI Smoke / sealed-smoke (push) Has been cancelled
Console CI / console-ci (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Airgap Sealed CI Smoke / sealed-smoke (push) Has been cancelled
Console CI / console-ci (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
This commit is contained in:
@@ -93,7 +93,7 @@ public sealed class TriggerRepository : RepositoryBase<SchedulerDataSource>, ITr
|
||||
metadata, created_at, updated_at, created_by
|
||||
FROM scheduler.triggers
|
||||
WHERE enabled = TRUE AND next_fire_at <= NOW()
|
||||
ORDER BY next_fire_at
|
||||
ORDER BY next_fire_at, tenant_id, id
|
||||
LIMIT @limit
|
||||
""";
|
||||
|
||||
|
||||
@@ -126,4 +126,56 @@ public sealed class DistributedLockRepositoryTests : IAsyncLifetime
|
||||
// Assert
|
||||
locks.Should().HaveCount(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TryAcquire_IsExclusiveAcrossConcurrentCallers()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"concurrent-lock-{Guid.NewGuid()}";
|
||||
var duration = TimeSpan.FromSeconds(5);
|
||||
|
||||
// Act
|
||||
var attempts = Enumerable.Range(0, 8)
|
||||
.Select(i => Task.Run(() => _repository.TryAcquireAsync(_tenantId, lockKey, $"worker-{i}", duration)))
|
||||
.ToArray();
|
||||
|
||||
var results = await Task.WhenAll(attempts);
|
||||
|
||||
// Assert
|
||||
var successIndexes = results
|
||||
.Select((acquired, index) => (acquired, index))
|
||||
.Where(tuple => tuple.acquired)
|
||||
.Select(tuple => tuple.index)
|
||||
.ToList();
|
||||
|
||||
successIndexes.Should().HaveCount(1);
|
||||
var winningHolder = $"worker-{successIndexes.Single()}";
|
||||
|
||||
var persisted = await _repository.GetAsync(lockKey);
|
||||
persisted.Should().NotBeNull();
|
||||
persisted!.HolderId.Should().Be(winningHolder);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TryAcquire_AllowsReacquireAfterExpiration()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"expiring-lock-{Guid.NewGuid()}";
|
||||
var shortDuration = TimeSpan.FromMilliseconds(500);
|
||||
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-initial", shortDuration);
|
||||
|
||||
// Wait for expiration with a small safety buffer.
|
||||
await Task.Delay(1100);
|
||||
|
||||
// Act
|
||||
var reacquired = await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-retry", TimeSpan.FromSeconds(5));
|
||||
|
||||
// Assert
|
||||
reacquired.Should().BeTrue();
|
||||
|
||||
var persisted = await _repository.GetAsync(lockKey);
|
||||
persisted.Should().NotBeNull();
|
||||
persisted!.HolderId.Should().Be("worker-retry");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,14 +185,55 @@ public sealed class TriggerRepositoryTests : IAsyncLifetime
|
||||
fetched.Should().BeNull();
|
||||
}
|
||||
|
||||
private TriggerEntity CreateTrigger(string name, string cron) => new()
|
||||
[Fact]
|
||||
public async Task GetDueTriggers_IsDeterministicForEqualNextFire()
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
TenantId = _tenantId,
|
||||
// Arrange
|
||||
var baseTime = DateTimeOffset.UtcNow.AddMinutes(-5);
|
||||
var dueAt = new DateTimeOffset(
|
||||
baseTime.Ticks - (baseTime.Ticks % TimeSpan.TicksPerMillisecond),
|
||||
baseTime.Offset);
|
||||
|
||||
const string tenantA = "tenant-a";
|
||||
const string tenantB = "tenant-b";
|
||||
|
||||
var triggerA = CreateTrigger("deterministic-a", "* * * * *", Guid.Parse("11111111-1111-1111-1111-111111111111"), dueAt, tenantA);
|
||||
var triggerB = CreateTrigger("deterministic-b", "* * * * *", Guid.Parse("22222222-2222-2222-2222-222222222222"), dueAt, tenantA);
|
||||
var triggerC = CreateTrigger("deterministic-c", "* * * * *", Guid.Parse("33333333-3333-3333-3333-333333333333"), dueAt, tenantB);
|
||||
|
||||
await _repository.CreateAsync(triggerB);
|
||||
await _repository.CreateAsync(triggerC);
|
||||
await _repository.CreateAsync(triggerA); // Insert out of order on purpose
|
||||
|
||||
var expectedOrder = new[]
|
||||
{
|
||||
triggerA.Id,
|
||||
triggerB.Id,
|
||||
triggerC.Id
|
||||
};
|
||||
|
||||
// Act
|
||||
var first = await _repository.GetDueTriggersAsync(limit: 10);
|
||||
var second = await _repository.GetDueTriggersAsync(limit: 10);
|
||||
|
||||
// Assert
|
||||
first.Select(t => t.Id).Should().Equal(expectedOrder);
|
||||
second.Select(t => t.Id).Should().Equal(expectedOrder);
|
||||
}
|
||||
|
||||
private TriggerEntity CreateTrigger(
|
||||
string name,
|
||||
string cron,
|
||||
Guid? id = null,
|
||||
DateTimeOffset? nextFireAt = null,
|
||||
string? tenantId = null) => new()
|
||||
{
|
||||
Id = id ?? Guid.NewGuid(),
|
||||
TenantId = tenantId ?? _tenantId,
|
||||
Name = name,
|
||||
JobType = "test-job",
|
||||
CronExpression = cron,
|
||||
Enabled = true,
|
||||
NextFireAt = DateTimeOffset.UtcNow.AddHours(1)
|
||||
NextFireAt = nextFireAt ?? DateTimeOffset.UtcNow.AddHours(1)
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user