275 lines
10 KiB
C#
275 lines
10 KiB
C#
using FluentAssertions;
|
||
using Microsoft.Extensions.Logging.Abstractions;
|
||
using Microsoft.Extensions.Options;
|
||
using StellaOps.Scheduler.Persistence.Postgres.Models;
|
||
using StellaOps.Scheduler.Persistence.Postgres.Repositories;
|
||
using Xunit;
|
||
|
||
using StellaOps.TestKit;
|
||
namespace StellaOps.Scheduler.Persistence.Postgres.Tests;
|
||
|
||
/// <summary>
|
||
/// Fixed TimeProvider for deterministic tests.
|
||
/// Returns a fixed UTC time regardless of wall-clock.
|
||
/// </summary>
|
||
internal sealed class FixedTimeProvider : TimeProvider
|
||
{
|
||
private readonly DateTimeOffset _fixedTime;
|
||
public FixedTimeProvider(DateTimeOffset fixedTime) => _fixedTime = fixedTime;
|
||
public override DateTimeOffset GetUtcNow() => _fixedTime;
|
||
}
|
||
|
||
/// <summary>
|
||
/// Integration tests verifying that Scheduler repositories honour the injected
|
||
/// <see cref="TimeProvider"/> instead of relying on SQL <c>NOW()</c>.
|
||
///
|
||
/// Each repository constructor accepts an optional <c>TimeProvider? timeProvider = null</c>
|
||
/// parameter. When a <see cref="FixedTimeProvider"/> set to a distinctive past date is
|
||
/// injected, every timestamp column that the repository writes via <c>@now</c> must
|
||
/// reflect that fixed date, not the database server clock.
|
||
/// </summary>
|
||
[Collection(SchedulerPostgresCollection.Name)]
|
||
public sealed class TimeProviderIntegrationTests : IAsyncLifetime
|
||
{
|
||
private static readonly DateTimeOffset FixedTime =
|
||
new(2020, 6, 15, 12, 0, 0, TimeSpan.Zero);
|
||
|
||
private readonly SchedulerPostgresFixture _fixture;
|
||
private readonly SchedulerDataSource _dataSource;
|
||
|
||
public TimeProviderIntegrationTests(SchedulerPostgresFixture fixture)
|
||
{
|
||
_fixture = fixture;
|
||
|
||
var options = fixture.Fixture.CreateOptions();
|
||
options.SchemaName = fixture.SchemaName;
|
||
_dataSource = new SchedulerDataSource(Options.Create(options), NullLogger<SchedulerDataSource>.Instance);
|
||
}
|
||
|
||
public ValueTask InitializeAsync() => new(_fixture.TruncateAllTablesAsync());
|
||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||
|
||
// -----------------------------------------------------------------------
|
||
// DistributedLockRepository
|
||
// -----------------------------------------------------------------------
|
||
|
||
[Trait("Category", TestCategories.Unit)]
|
||
[Fact]
|
||
public async Task TryAcquire_UsesTimeProvider_ForExpiresAt()
|
||
{
|
||
// Arrange
|
||
var fixedTimeProvider = new FixedTimeProvider(FixedTime);
|
||
var repository = new DistributedLockRepository(
|
||
_dataSource,
|
||
NullLogger<DistributedLockRepository>.Instance,
|
||
fixedTimeProvider);
|
||
|
||
var lockKey = $"tp-lock-{Guid.NewGuid()}";
|
||
var tenantId = Guid.NewGuid().ToString();
|
||
var duration = TimeSpan.FromMinutes(5);
|
||
|
||
// Act
|
||
var acquired = await repository.TryAcquireAsync(tenantId, lockKey, "holder-1", duration);
|
||
|
||
// Assert – acquisition must succeed
|
||
acquired.Should().BeTrue();
|
||
|
||
// Read the lock back (same repository / same fixed TimeProvider so
|
||
// the expires_at > @now check uses the same fixed time).
|
||
var lockInfo = await repository.GetAsync(lockKey);
|
||
|
||
lockInfo.Should().NotBeNull("the lock should be readable with the same TimeProvider");
|
||
|
||
// expires_at must equal FixedTime + duration (the INSERT sets expires_at = @now + @duration)
|
||
lockInfo!.ExpiresAt.Should().BeCloseTo(FixedTime + duration, TimeSpan.FromSeconds(1));
|
||
}
|
||
|
||
[Trait("Category", TestCategories.Unit)]
|
||
[Fact]
|
||
public async Task TryAcquire_OnConflict_UsesTimeProvider_ForAcquiredAtAndExpiresAt()
|
||
{
|
||
// Arrange — first acquire with a fixed past time to create the lock with a known expires_at
|
||
var earlyTime = new DateTimeOffset(2020, 6, 15, 10, 0, 0, TimeSpan.Zero);
|
||
var earlyProvider = new FixedTimeProvider(earlyTime);
|
||
var earlyRepo = new DistributedLockRepository(
|
||
_dataSource,
|
||
NullLogger<DistributedLockRepository>.Instance,
|
||
earlyProvider);
|
||
|
||
var lockKey = $"tp-conflict-{Guid.NewGuid()}";
|
||
var tenantId = Guid.NewGuid().ToString();
|
||
var shortDuration = TimeSpan.FromMilliseconds(200);
|
||
|
||
await earlyRepo.TryAcquireAsync(tenantId, lockKey, "holder-old", shortDuration);
|
||
|
||
// The lock's expires_at is earlyTime + 200ms = 2020-06-15T10:00:00.200Z
|
||
// Now re-acquire with a later fixed time that exceeds the expires_at
|
||
// This triggers the ON CONFLICT DO UPDATE where expires_at < @now
|
||
var laterTime = new DateTimeOffset(2020, 6, 15, 12, 0, 0, TimeSpan.Zero);
|
||
var laterProvider = new FixedTimeProvider(laterTime);
|
||
var laterRepo = new DistributedLockRepository(
|
||
_dataSource,
|
||
NullLogger<DistributedLockRepository>.Instance,
|
||
laterProvider);
|
||
|
||
var duration = TimeSpan.FromMinutes(10);
|
||
|
||
// Act
|
||
var reacquired = await laterRepo.TryAcquireAsync(tenantId, lockKey, "holder-new", duration);
|
||
|
||
// Assert
|
||
reacquired.Should().BeTrue("expired lock should be reacquirable");
|
||
|
||
var lockInfo = await laterRepo.GetAsync(lockKey);
|
||
lockInfo.Should().NotBeNull();
|
||
|
||
// ON CONFLICT path sets acquired_at = @now
|
||
lockInfo!.AcquiredAt.Should().BeCloseTo(laterTime, TimeSpan.FromSeconds(1));
|
||
|
||
// ON CONFLICT path sets expires_at = @now + @duration
|
||
lockInfo.ExpiresAt.Should().BeCloseTo(laterTime + duration, TimeSpan.FromSeconds(1));
|
||
|
||
lockInfo.HolderId.Should().Be("holder-new");
|
||
}
|
||
|
||
// -----------------------------------------------------------------------
|
||
// WorkerRepository
|
||
// -----------------------------------------------------------------------
|
||
|
||
[Trait("Category", TestCategories.Unit)]
|
||
[Fact]
|
||
public async Task Heartbeat_UsesTimeProvider_ForLastHeartbeatAt()
|
||
{
|
||
// Arrange — insert a worker first (using system time so the row exists)
|
||
var systemRepo = new WorkerRepository(
|
||
_dataSource,
|
||
NullLogger<WorkerRepository>.Instance);
|
||
|
||
var worker = new WorkerEntity
|
||
{
|
||
Id = $"tp-worker-hb-{Guid.NewGuid()}",
|
||
Hostname = "test-host",
|
||
Status = WorkerStatus.Active,
|
||
JobTypes = ["scan"],
|
||
MaxConcurrentJobs = 4
|
||
};
|
||
await systemRepo.UpsertAsync(worker);
|
||
|
||
// Now heartbeat with a fixed TimeProvider
|
||
var fixedTimeProvider = new FixedTimeProvider(FixedTime);
|
||
var fixedRepo = new WorkerRepository(
|
||
_dataSource,
|
||
NullLogger<WorkerRepository>.Instance,
|
||
fixedTimeProvider);
|
||
|
||
// Act
|
||
var updated = await fixedRepo.HeartbeatAsync(worker.Id, 2);
|
||
|
||
// Assert
|
||
updated.Should().BeTrue();
|
||
|
||
// Read back and verify the timestamp
|
||
var fetched = await fixedRepo.GetByIdAsync(worker.Id);
|
||
fetched.Should().NotBeNull();
|
||
fetched!.LastHeartbeatAt.Should().BeCloseTo(FixedTime, TimeSpan.FromSeconds(1));
|
||
fetched.CurrentJobs.Should().Be(2);
|
||
}
|
||
|
||
[Trait("Category", TestCategories.Unit)]
|
||
[Fact]
|
||
public async Task Upsert_OnConflict_UsesTimeProvider_ForLastHeartbeatAt()
|
||
{
|
||
// Arrange — insert a worker first so the second upsert triggers ON CONFLICT
|
||
var systemRepo = new WorkerRepository(
|
||
_dataSource,
|
||
NullLogger<WorkerRepository>.Instance);
|
||
|
||
var workerId = $"tp-worker-upsert-{Guid.NewGuid()}";
|
||
var worker = new WorkerEntity
|
||
{
|
||
Id = workerId,
|
||
Hostname = "test-host",
|
||
Status = WorkerStatus.Active,
|
||
JobTypes = ["scan"],
|
||
MaxConcurrentJobs = 4
|
||
};
|
||
await systemRepo.UpsertAsync(worker);
|
||
|
||
// Second upsert with fixed TimeProvider triggers ON CONFLICT DO UPDATE
|
||
// which sets last_heartbeat_at = @now
|
||
var fixedTimeProvider = new FixedTimeProvider(FixedTime);
|
||
var fixedRepo = new WorkerRepository(
|
||
_dataSource,
|
||
NullLogger<WorkerRepository>.Instance,
|
||
fixedTimeProvider);
|
||
|
||
var updatedWorker = new WorkerEntity
|
||
{
|
||
Id = workerId,
|
||
Hostname = "updated-host",
|
||
Status = WorkerStatus.Active,
|
||
JobTypes = ["scan", "sbom"],
|
||
MaxConcurrentJobs = 8
|
||
};
|
||
|
||
// Act
|
||
var returned = await fixedRepo.UpsertAsync(updatedWorker);
|
||
|
||
// Assert — the returned entity should have last_heartbeat_at equal to our fixed time
|
||
returned.Should().NotBeNull();
|
||
returned.LastHeartbeatAt.Should().BeCloseTo(FixedTime, TimeSpan.FromSeconds(1));
|
||
returned.Hostname.Should().Be("updated-host");
|
||
returned.MaxConcurrentJobs.Should().Be(8);
|
||
|
||
// Also verify via a fresh read
|
||
var fetched = await fixedRepo.GetByIdAsync(workerId);
|
||
fetched.Should().NotBeNull();
|
||
fetched!.LastHeartbeatAt.Should().BeCloseTo(FixedTime, TimeSpan.FromSeconds(1));
|
||
}
|
||
|
||
[Trait("Category", TestCategories.Unit)]
|
||
[Fact]
|
||
public async Task GetStaleWorkers_UsesTimeProvider_ForStaleComparison()
|
||
{
|
||
// Arrange — create a worker with a heartbeat at the fixed (past) time
|
||
var fixedTimeProvider = new FixedTimeProvider(FixedTime);
|
||
var fixedRepo = new WorkerRepository(
|
||
_dataSource,
|
||
NullLogger<WorkerRepository>.Instance,
|
||
fixedTimeProvider);
|
||
|
||
// Insert the worker first with system time so it exists
|
||
var systemRepo = new WorkerRepository(
|
||
_dataSource,
|
||
NullLogger<WorkerRepository>.Instance);
|
||
|
||
var worker = new WorkerEntity
|
||
{
|
||
Id = $"tp-stale-{Guid.NewGuid()}",
|
||
Hostname = "stale-host",
|
||
Status = WorkerStatus.Active,
|
||
JobTypes = ["scan"],
|
||
MaxConcurrentJobs = 2
|
||
};
|
||
await systemRepo.UpsertAsync(worker);
|
||
|
||
// Set heartbeat to fixed time (2020-06-15 12:00:00 UTC)
|
||
await fixedRepo.HeartbeatAsync(worker.Id, 0);
|
||
|
||
// Query with a "recent" fixed time — the worker's heartbeat at 2020 should be stale
|
||
// relative to 2020-06-15 12:30:00 with a stale duration of 10 minutes
|
||
var laterTime = new DateTimeOffset(2020, 6, 15, 12, 30, 0, TimeSpan.Zero);
|
||
var laterProvider = new FixedTimeProvider(laterTime);
|
||
var laterRepo = new WorkerRepository(
|
||
_dataSource,
|
||
NullLogger<WorkerRepository>.Instance,
|
||
laterProvider);
|
||
|
||
// Act — stale duration of 10 min means heartbeats before 12:20 are stale
|
||
var staleWorkers = await laterRepo.GetStaleWorkersAsync(TimeSpan.FromMinutes(10));
|
||
|
||
// Assert
|
||
staleWorkers.Should().ContainSingle(w => w.Id == worker.Id);
|
||
}
|
||
}
|