up
This commit is contained in:
@@ -28,6 +28,11 @@ public static class ServiceCollectionExtensions
|
||||
|
||||
// Register repositories
|
||||
services.AddScoped<IJobRepository, JobRepository>();
|
||||
services.AddScoped<ITriggerRepository, TriggerRepository>();
|
||||
services.AddScoped<IWorkerRepository, WorkerRepository>();
|
||||
services.AddScoped<IDistributedLockRepository, DistributedLockRepository>();
|
||||
services.AddScoped<IJobHistoryRepository, JobHistoryRepository>();
|
||||
services.AddScoped<IMetricsRepository, MetricsRepository>();
|
||||
|
||||
return services;
|
||||
}
|
||||
@@ -47,6 +52,11 @@ public static class ServiceCollectionExtensions
|
||||
|
||||
// Register repositories
|
||||
services.AddScoped<IJobRepository, JobRepository>();
|
||||
services.AddScoped<ITriggerRepository, TriggerRepository>();
|
||||
services.AddScoped<IWorkerRepository, WorkerRepository>();
|
||||
services.AddScoped<IDistributedLockRepository, DistributedLockRepository>();
|
||||
services.AddScoped<IJobHistoryRepository, JobHistoryRepository>();
|
||||
services.AddScoped<IMetricsRepository, MetricsRepository>();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,129 @@
|
||||
using FluentAssertions;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Repositories;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.Storage.Postgres.Tests;
|
||||
|
||||
[Collection(SchedulerPostgresCollection.Name)]
|
||||
public sealed class DistributedLockRepositoryTests : IAsyncLifetime
|
||||
{
|
||||
private readonly SchedulerPostgresFixture _fixture;
|
||||
private readonly DistributedLockRepository _repository;
|
||||
private readonly string _tenantId = Guid.NewGuid().ToString();
|
||||
|
||||
public DistributedLockRepositoryTests(SchedulerPostgresFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
|
||||
var options = fixture.Fixture.CreateOptions();
|
||||
options.SchemaName = fixture.SchemaName;
|
||||
var dataSource = new SchedulerDataSource(Options.Create(options), NullLogger<SchedulerDataSource>.Instance);
|
||||
_repository = new DistributedLockRepository(dataSource, NullLogger<DistributedLockRepository>.Instance);
|
||||
}
|
||||
|
||||
public Task InitializeAsync() => _fixture.TruncateAllTablesAsync();
|
||||
public Task DisposeAsync() => Task.CompletedTask;
|
||||
|
||||
[Fact]
|
||||
public async Task TryAcquire_SucceedsOnFirstAttempt()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"test-lock-{Guid.NewGuid()}";
|
||||
|
||||
// Act
|
||||
var acquired = await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-1", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Assert
|
||||
acquired.Should().BeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TryAcquire_FailsWhenAlreadyHeld()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"contended-lock-{Guid.NewGuid()}";
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-1", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Act
|
||||
var secondAcquire = await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-2", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Assert
|
||||
secondAcquire.Should().BeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Release_AllowsReacquisition()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"release-test-{Guid.NewGuid()}";
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-1", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Act
|
||||
await _repository.ReleaseAsync(lockKey, "worker-1");
|
||||
var reacquired = await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-2", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Assert
|
||||
reacquired.Should().BeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Extend_ExtendsLockDuration()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"extend-test-{Guid.NewGuid()}";
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-1", TimeSpan.FromMinutes(1));
|
||||
|
||||
// Act
|
||||
var extended = await _repository.ExtendAsync(lockKey, "worker-1", TimeSpan.FromMinutes(10));
|
||||
|
||||
// Assert
|
||||
extended.Should().BeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Extend_FailsForDifferentHolder()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"extend-fail-{Guid.NewGuid()}";
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-1", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Act
|
||||
var extended = await _repository.ExtendAsync(lockKey, "worker-2", TimeSpan.FromMinutes(10));
|
||||
|
||||
// Assert
|
||||
extended.Should().BeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Get_ReturnsLockInfo()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey = $"get-test-{Guid.NewGuid()}";
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey, "worker-1", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Act
|
||||
var lockInfo = await _repository.GetAsync(lockKey);
|
||||
|
||||
// Assert
|
||||
lockInfo.Should().NotBeNull();
|
||||
lockInfo!.HolderId.Should().Be("worker-1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ListByTenant_ReturnsTenantsLocks()
|
||||
{
|
||||
// Arrange
|
||||
var lockKey1 = $"tenant-lock-1-{Guid.NewGuid()}";
|
||||
var lockKey2 = $"tenant-lock-2-{Guid.NewGuid()}";
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey1, "worker-1", TimeSpan.FromMinutes(5));
|
||||
await _repository.TryAcquireAsync(_tenantId, lockKey2, "worker-1", TimeSpan.FromMinutes(5));
|
||||
|
||||
// Act
|
||||
var locks = await _repository.ListByTenantAsync(_tenantId);
|
||||
|
||||
// Assert
|
||||
locks.Should().HaveCount(2);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,198 @@
|
||||
using FluentAssertions;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Models;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Repositories;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.Storage.Postgres.Tests;
|
||||
|
||||
[Collection(SchedulerPostgresCollection.Name)]
|
||||
public sealed class TriggerRepositoryTests : IAsyncLifetime
|
||||
{
|
||||
private readonly SchedulerPostgresFixture _fixture;
|
||||
private readonly TriggerRepository _repository;
|
||||
private readonly string _tenantId = Guid.NewGuid().ToString();
|
||||
|
||||
public TriggerRepositoryTests(SchedulerPostgresFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
|
||||
var options = fixture.Fixture.CreateOptions();
|
||||
options.SchemaName = fixture.SchemaName;
|
||||
var dataSource = new SchedulerDataSource(Options.Create(options), NullLogger<SchedulerDataSource>.Instance);
|
||||
_repository = new TriggerRepository(dataSource, NullLogger<TriggerRepository>.Instance);
|
||||
}
|
||||
|
||||
public Task InitializeAsync() => _fixture.TruncateAllTablesAsync();
|
||||
public Task DisposeAsync() => Task.CompletedTask;
|
||||
|
||||
[Fact]
|
||||
public async Task CreateAndGet_RoundTripsTrigger()
|
||||
{
|
||||
// Arrange
|
||||
var trigger = new TriggerEntity
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
TenantId = _tenantId,
|
||||
Name = "daily-scan",
|
||||
Description = "Daily vulnerability scan",
|
||||
JobType = "scan",
|
||||
JobPayload = "{\"target\": \"registry.example.com\"}",
|
||||
CronExpression = "0 0 * * *",
|
||||
Timezone = "UTC",
|
||||
Enabled = true,
|
||||
NextFireAt = DateTimeOffset.UtcNow.AddDays(1)
|
||||
};
|
||||
|
||||
// Act
|
||||
await _repository.CreateAsync(trigger);
|
||||
var fetched = await _repository.GetByIdAsync(_tenantId, trigger.Id);
|
||||
|
||||
// Assert
|
||||
fetched.Should().NotBeNull();
|
||||
fetched!.Id.Should().Be(trigger.Id);
|
||||
fetched.Name.Should().Be("daily-scan");
|
||||
fetched.JobType.Should().Be("scan");
|
||||
fetched.CronExpression.Should().Be("0 0 * * *");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetByName_ReturnsCorrectTrigger()
|
||||
{
|
||||
// Arrange
|
||||
var trigger = CreateTrigger("weekly-report", "0 0 * * 0");
|
||||
await _repository.CreateAsync(trigger);
|
||||
|
||||
// Act
|
||||
var fetched = await _repository.GetByNameAsync(_tenantId, "weekly-report");
|
||||
|
||||
// Assert
|
||||
fetched.Should().NotBeNull();
|
||||
fetched!.Id.Should().Be(trigger.Id);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task List_ReturnsAllTriggersForTenant()
|
||||
{
|
||||
// Arrange
|
||||
var trigger1 = CreateTrigger("trigger1", "0 * * * *");
|
||||
var trigger2 = CreateTrigger("trigger2", "0 0 * * *");
|
||||
await _repository.CreateAsync(trigger1);
|
||||
await _repository.CreateAsync(trigger2);
|
||||
|
||||
// Act
|
||||
var triggers = await _repository.ListAsync(_tenantId);
|
||||
|
||||
// Assert
|
||||
triggers.Should().HaveCount(2);
|
||||
triggers.Select(t => t.Name).Should().Contain(["trigger1", "trigger2"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetDueTriggers_ReturnsTriggersReadyToFire()
|
||||
{
|
||||
// Arrange - One due trigger, one future trigger
|
||||
var dueTrigger = CreateTrigger("due", "* * * * *");
|
||||
dueTrigger = new TriggerEntity
|
||||
{
|
||||
Id = dueTrigger.Id,
|
||||
TenantId = dueTrigger.TenantId,
|
||||
Name = dueTrigger.Name,
|
||||
JobType = dueTrigger.JobType,
|
||||
CronExpression = dueTrigger.CronExpression,
|
||||
NextFireAt = DateTimeOffset.UtcNow.AddMinutes(-1), // Due
|
||||
Enabled = true
|
||||
};
|
||||
|
||||
var futureTrigger = CreateTrigger("future", "0 0 * * *");
|
||||
futureTrigger = new TriggerEntity
|
||||
{
|
||||
Id = futureTrigger.Id,
|
||||
TenantId = futureTrigger.TenantId,
|
||||
Name = futureTrigger.Name,
|
||||
JobType = futureTrigger.JobType,
|
||||
CronExpression = futureTrigger.CronExpression,
|
||||
NextFireAt = DateTimeOffset.UtcNow.AddDays(1), // Not due
|
||||
Enabled = true
|
||||
};
|
||||
|
||||
await _repository.CreateAsync(dueTrigger);
|
||||
await _repository.CreateAsync(futureTrigger);
|
||||
|
||||
// Act
|
||||
var dueTriggers = await _repository.GetDueTriggersAsync();
|
||||
|
||||
// Assert
|
||||
dueTriggers.Should().HaveCount(1);
|
||||
dueTriggers[0].Name.Should().Be("due");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RecordFire_UpdatesTriggerState()
|
||||
{
|
||||
// Arrange
|
||||
var trigger = CreateTrigger("fire-test", "* * * * *");
|
||||
await _repository.CreateAsync(trigger);
|
||||
var jobId = Guid.NewGuid();
|
||||
var nextFireAt = DateTimeOffset.UtcNow.AddMinutes(1);
|
||||
|
||||
// Act
|
||||
var result = await _repository.RecordFireAsync(_tenantId, trigger.Id, jobId, nextFireAt);
|
||||
var fetched = await _repository.GetByIdAsync(_tenantId, trigger.Id);
|
||||
|
||||
// Assert
|
||||
result.Should().BeTrue();
|
||||
fetched!.LastJobId.Should().Be(jobId);
|
||||
fetched.NextFireAt.Should().BeCloseTo(nextFireAt, TimeSpan.FromSeconds(1));
|
||||
fetched.FireCount.Should().Be(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SetEnabled_TogglesEnableState()
|
||||
{
|
||||
// Arrange
|
||||
var trigger = CreateTrigger("toggle-test", "* * * * *");
|
||||
await _repository.CreateAsync(trigger);
|
||||
|
||||
// Act - Disable
|
||||
await _repository.SetEnabledAsync(_tenantId, trigger.Id, false);
|
||||
var disabled = await _repository.GetByIdAsync(_tenantId, trigger.Id);
|
||||
|
||||
// Assert
|
||||
disabled!.Enabled.Should().BeFalse();
|
||||
|
||||
// Act - Re-enable
|
||||
await _repository.SetEnabledAsync(_tenantId, trigger.Id, true);
|
||||
var enabled = await _repository.GetByIdAsync(_tenantId, trigger.Id);
|
||||
|
||||
// Assert
|
||||
enabled!.Enabled.Should().BeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Delete_RemovesTrigger()
|
||||
{
|
||||
// Arrange
|
||||
var trigger = CreateTrigger("delete-test", "* * * * *");
|
||||
await _repository.CreateAsync(trigger);
|
||||
|
||||
// Act
|
||||
await _repository.DeleteAsync(_tenantId, trigger.Id);
|
||||
var fetched = await _repository.GetByIdAsync(_tenantId, trigger.Id);
|
||||
|
||||
// Assert
|
||||
fetched.Should().BeNull();
|
||||
}
|
||||
|
||||
private TriggerEntity CreateTrigger(string name, string cron) => new()
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
TenantId = _tenantId,
|
||||
Name = name,
|
||||
JobType = "test-job",
|
||||
CronExpression = cron,
|
||||
Enabled = true,
|
||||
NextFireAt = DateTimeOffset.UtcNow.AddHours(1)
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
using FluentAssertions;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Models;
|
||||
using StellaOps.Scheduler.Storage.Postgres.Repositories;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Scheduler.Storage.Postgres.Tests;
|
||||
|
||||
[Collection(SchedulerPostgresCollection.Name)]
|
||||
public sealed class WorkerRepositoryTests : IAsyncLifetime
|
||||
{
|
||||
private readonly SchedulerPostgresFixture _fixture;
|
||||
private readonly WorkerRepository _repository;
|
||||
|
||||
public WorkerRepositoryTests(SchedulerPostgresFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
|
||||
var options = fixture.Fixture.CreateOptions();
|
||||
options.SchemaName = fixture.SchemaName;
|
||||
var dataSource = new SchedulerDataSource(Options.Create(options), NullLogger<SchedulerDataSource>.Instance);
|
||||
_repository = new WorkerRepository(dataSource, NullLogger<WorkerRepository>.Instance);
|
||||
}
|
||||
|
||||
public Task InitializeAsync() => _fixture.TruncateAllTablesAsync();
|
||||
public Task DisposeAsync() => Task.CompletedTask;
|
||||
|
||||
[Fact]
|
||||
public async Task UpsertAndGet_RoundTripsWorker()
|
||||
{
|
||||
// Arrange
|
||||
var worker = new WorkerEntity
|
||||
{
|
||||
Id = $"worker-{Guid.NewGuid()}",
|
||||
Hostname = "node-01.cluster.local",
|
||||
Status = WorkerStatus.Active,
|
||||
JobTypes = ["scan", "sbom"],
|
||||
MaxConcurrentJobs = 4
|
||||
};
|
||||
|
||||
// Act
|
||||
await _repository.UpsertAsync(worker);
|
||||
var fetched = await _repository.GetByIdAsync(worker.Id);
|
||||
|
||||
// Assert
|
||||
fetched.Should().NotBeNull();
|
||||
fetched!.Id.Should().Be(worker.Id);
|
||||
fetched.Hostname.Should().Be("node-01.cluster.local");
|
||||
fetched.JobTypes.Should().BeEquivalentTo(["scan", "sbom"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Heartbeat_UpdatesLastHeartbeat()
|
||||
{
|
||||
// Arrange
|
||||
var worker = CreateWorker();
|
||||
await _repository.UpsertAsync(worker);
|
||||
|
||||
// Act
|
||||
await Task.Delay(100); // Ensure time difference
|
||||
await _repository.HeartbeatAsync(worker.Id, 2);
|
||||
var fetched = await _repository.GetByIdAsync(worker.Id);
|
||||
|
||||
// Assert
|
||||
fetched!.LastHeartbeatAt.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromSeconds(5));
|
||||
fetched.CurrentJobs.Should().Be(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ListByStatus_ReturnsWorkersWithStatus()
|
||||
{
|
||||
// Arrange
|
||||
var activeWorker = CreateWorker();
|
||||
var drainingWorker = new WorkerEntity
|
||||
{
|
||||
Id = $"draining-{Guid.NewGuid()}",
|
||||
Hostname = "node-02",
|
||||
Status = WorkerStatus.Draining,
|
||||
JobTypes = ["scan"],
|
||||
MaxConcurrentJobs = 4
|
||||
};
|
||||
await _repository.UpsertAsync(activeWorker);
|
||||
await _repository.UpsertAsync(drainingWorker);
|
||||
|
||||
// Act
|
||||
var activeWorkers = await _repository.ListByStatusAsync(WorkerStatus.Active);
|
||||
|
||||
// Assert
|
||||
activeWorkers.Should().HaveCount(1);
|
||||
activeWorkers[0].Id.Should().Be(activeWorker.Id);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SetStatus_ChangesWorkerStatus()
|
||||
{
|
||||
// Arrange
|
||||
var worker = CreateWorker();
|
||||
await _repository.UpsertAsync(worker);
|
||||
|
||||
// Act
|
||||
await _repository.SetStatusAsync(worker.Id, WorkerStatus.Draining);
|
||||
var fetched = await _repository.GetByIdAsync(worker.Id);
|
||||
|
||||
// Assert
|
||||
fetched!.Status.Should().Be(WorkerStatus.Draining);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Delete_RemovesWorker()
|
||||
{
|
||||
// Arrange
|
||||
var worker = CreateWorker();
|
||||
await _repository.UpsertAsync(worker);
|
||||
|
||||
// Act
|
||||
await _repository.DeleteAsync(worker.Id);
|
||||
var fetched = await _repository.GetByIdAsync(worker.Id);
|
||||
|
||||
// Assert
|
||||
fetched.Should().BeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task List_ReturnsAllWorkers()
|
||||
{
|
||||
// Arrange
|
||||
var worker1 = CreateWorker();
|
||||
var worker2 = new WorkerEntity
|
||||
{
|
||||
Id = $"worker2-{Guid.NewGuid()}",
|
||||
Hostname = "node-02",
|
||||
Status = WorkerStatus.Active,
|
||||
JobTypes = ["scan"],
|
||||
MaxConcurrentJobs = 2
|
||||
};
|
||||
await _repository.UpsertAsync(worker1);
|
||||
await _repository.UpsertAsync(worker2);
|
||||
|
||||
// Act
|
||||
var workers = await _repository.ListAsync();
|
||||
|
||||
// Assert
|
||||
workers.Should().HaveCount(2);
|
||||
}
|
||||
|
||||
private WorkerEntity CreateWorker() => new()
|
||||
{
|
||||
Id = $"worker-{Guid.NewGuid()}",
|
||||
Hostname = "test-host",
|
||||
Status = WorkerStatus.Active,
|
||||
JobTypes = ["scan"],
|
||||
MaxConcurrentJobs = 4
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user