feat: Implement vulnerability token signing and verification utilities
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
- Added VulnTokenSigner for signing JWT tokens with specified algorithms and keys. - Introduced VulnTokenUtilities for resolving tenant and subject claims, and sanitizing context dictionaries. - Created VulnTokenVerificationUtilities for parsing tokens, verifying signatures, and deserializing payloads. - Developed VulnWorkflowAntiForgeryTokenIssuer for issuing anti-forgery tokens with configurable options. - Implemented VulnWorkflowAntiForgeryTokenVerifier for verifying anti-forgery tokens and validating payloads. - Added AuthorityVulnerabilityExplorerOptions to manage configuration for vulnerability explorer features. - Included tests for FilesystemPackRunDispatcher to ensure proper job handling under egress policy restrictions.
This commit is contained in:
@@ -1,411 +1,411 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Immutable;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.Scheduler.Queue;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Projections;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Repositories;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Services;
|
||||
using StellaOps.Scheduler.Worker.Options;
|
||||
using StellaOps.Scheduler.Worker.Observability;
|
||||
using StellaOps.Scheduler.Worker.Planning;
|
||||
|
||||
namespace StellaOps.Scheduler.Worker.Tests;
|
||||
|
||||
public sealed class PlannerBackgroundServiceTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_RespectsTenantFairnessCap()
|
||||
{
|
||||
var timeProvider = new TestTimeProvider(DateTimeOffset.Parse("2025-10-27T12:00:00Z"));
|
||||
|
||||
var runs = new[]
|
||||
{
|
||||
CreateRun("run-a1", "tenant-a", RunTrigger.Manual, timeProvider.GetUtcNow().AddMinutes(1), "schedule-a"),
|
||||
CreateRun("run-a2", "tenant-a", RunTrigger.Cron, timeProvider.GetUtcNow().AddMinutes(2), "schedule-a"),
|
||||
CreateRun("run-b1", "tenant-b", RunTrigger.Feedser, timeProvider.GetUtcNow().AddMinutes(3), "schedule-b"),
|
||||
CreateRun("run-c1", "tenant-c", RunTrigger.Cron, timeProvider.GetUtcNow().AddMinutes(4), "schedule-c"),
|
||||
};
|
||||
|
||||
var repository = new TestRunRepository(runs, Array.Empty<Run>());
|
||||
var options = CreateOptions(maxConcurrentTenants: 2);
|
||||
var scheduleRepository = new TestScheduleRepository(runs.Select(run => CreateSchedule(run.ScheduleId!, run.TenantId, timeProvider.GetUtcNow())));
|
||||
var snapshotRepository = new StubImpactSnapshotRepository();
|
||||
var runSummaryService = new StubRunSummaryService(timeProvider);
|
||||
var plannerQueue = new RecordingPlannerQueue();
|
||||
var targetingService = new StubImpactTargetingService(timeProvider);
|
||||
|
||||
using var metrics = new SchedulerWorkerMetrics();
|
||||
var executionService = new PlannerExecutionService(
|
||||
scheduleRepository,
|
||||
repository,
|
||||
snapshotRepository,
|
||||
runSummaryService,
|
||||
targetingService,
|
||||
plannerQueue,
|
||||
options,
|
||||
timeProvider,
|
||||
metrics,
|
||||
NullLogger<PlannerExecutionService>.Instance);
|
||||
|
||||
var service = new PlannerBackgroundService(
|
||||
repository,
|
||||
executionService,
|
||||
options,
|
||||
timeProvider,
|
||||
NullLogger<PlannerBackgroundService>.Instance);
|
||||
|
||||
await service.StartAsync(CancellationToken.None);
|
||||
try
|
||||
{
|
||||
await WaitForConditionAsync(() => repository.UpdateCount >= 2);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await service.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
var processedIds = repository.UpdatedRuns.Select(run => run.Id).ToArray();
|
||||
Assert.Equal(new[] { "run-a1", "run-b1" }, processedIds);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_PrioritizesManualAndEventTriggers()
|
||||
{
|
||||
var timeProvider = new TestTimeProvider(DateTimeOffset.Parse("2025-10-27T18:00:00Z"));
|
||||
|
||||
var runs = new[]
|
||||
{
|
||||
CreateRun("run-cron", "tenant-alpha", RunTrigger.Cron, timeProvider.GetUtcNow().AddMinutes(1), "schedule-cron"),
|
||||
CreateRun("run-feedser", "tenant-bravo", RunTrigger.Feedser, timeProvider.GetUtcNow().AddMinutes(2), "schedule-feedser"),
|
||||
CreateRun("run-manual", "tenant-charlie", RunTrigger.Manual, timeProvider.GetUtcNow().AddMinutes(3), "schedule-manual"),
|
||||
CreateRun("run-vexer", "tenant-delta", RunTrigger.Vexer, timeProvider.GetUtcNow().AddMinutes(4), "schedule-vexer"),
|
||||
};
|
||||
|
||||
var repository = new TestRunRepository(runs, Array.Empty<Run>());
|
||||
var options = CreateOptions(maxConcurrentTenants: 4);
|
||||
var scheduleRepository = new TestScheduleRepository(runs.Select(run => CreateSchedule(run.ScheduleId!, run.TenantId, timeProvider.GetUtcNow())));
|
||||
var snapshotRepository = new StubImpactSnapshotRepository();
|
||||
var runSummaryService = new StubRunSummaryService(timeProvider);
|
||||
var plannerQueue = new RecordingPlannerQueue();
|
||||
var targetingService = new StubImpactTargetingService(timeProvider);
|
||||
|
||||
using var metrics = new SchedulerWorkerMetrics();
|
||||
var executionService = new PlannerExecutionService(
|
||||
scheduleRepository,
|
||||
repository,
|
||||
snapshotRepository,
|
||||
runSummaryService,
|
||||
targetingService,
|
||||
plannerQueue,
|
||||
options,
|
||||
timeProvider,
|
||||
metrics,
|
||||
NullLogger<PlannerExecutionService>.Instance);
|
||||
|
||||
var service = new PlannerBackgroundService(
|
||||
repository,
|
||||
executionService,
|
||||
options,
|
||||
timeProvider,
|
||||
NullLogger<PlannerBackgroundService>.Instance);
|
||||
|
||||
await service.StartAsync(CancellationToken.None);
|
||||
try
|
||||
{
|
||||
await WaitForConditionAsync(() => repository.UpdateCount >= runs.Length);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await service.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
var processedIds = repository.UpdatedRuns.Select(run => run.Id).ToArray();
|
||||
Assert.Equal(new[] { "run-manual", "run-feedser", "run-vexer", "run-cron" }, processedIds);
|
||||
}
|
||||
|
||||
private static SchedulerWorkerOptions CreateOptions(int maxConcurrentTenants)
|
||||
{
|
||||
return new SchedulerWorkerOptions
|
||||
{
|
||||
Planner =
|
||||
{
|
||||
BatchSize = 20,
|
||||
PollInterval = TimeSpan.FromMilliseconds(1),
|
||||
IdleDelay = TimeSpan.FromMilliseconds(1),
|
||||
MaxConcurrentTenants = maxConcurrentTenants,
|
||||
MaxRunsPerMinute = int.MaxValue,
|
||||
QueueLeaseDuration = TimeSpan.FromMinutes(5)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static Run CreateRun(
|
||||
string id,
|
||||
string tenantId,
|
||||
RunTrigger trigger,
|
||||
DateTimeOffset createdAt,
|
||||
string scheduleId)
|
||||
=> new(
|
||||
id: id,
|
||||
tenantId: tenantId,
|
||||
trigger: trigger,
|
||||
state: RunState.Planning,
|
||||
stats: RunStats.Empty,
|
||||
createdAt: createdAt,
|
||||
reason: RunReason.Empty,
|
||||
scheduleId: scheduleId);
|
||||
|
||||
private static Schedule CreateSchedule(string scheduleId, string tenantId, DateTimeOffset now)
|
||||
=> new(
|
||||
id: scheduleId,
|
||||
tenantId: tenantId,
|
||||
name: $"Schedule-{scheduleId}",
|
||||
enabled: true,
|
||||
cronExpression: "0 2 * * *",
|
||||
timezone: "UTC",
|
||||
mode: ScheduleMode.AnalysisOnly,
|
||||
selection: new Selector(SelectorScope.AllImages, tenantId),
|
||||
onlyIf: ScheduleOnlyIf.Default,
|
||||
notify: ScheduleNotify.Default,
|
||||
limits: ScheduleLimits.Default,
|
||||
createdAt: now,
|
||||
createdBy: "system",
|
||||
updatedAt: now,
|
||||
updatedBy: "system",
|
||||
subscribers: ImmutableArray<string>.Empty);
|
||||
|
||||
private static async Task WaitForConditionAsync(Func<bool> predicate, TimeSpan? timeout = null)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + (timeout ?? TimeSpan.FromSeconds(1));
|
||||
while (!predicate())
|
||||
{
|
||||
if (DateTime.UtcNow > deadline)
|
||||
{
|
||||
throw new TimeoutException("Planner background service did not reach expected state within the allotted time.");
|
||||
}
|
||||
|
||||
await Task.Delay(10);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestRunRepository : IRunRepository
|
||||
{
|
||||
private readonly Queue<IReadOnlyList<Run>> _responses;
|
||||
private readonly ConcurrentQueue<Run> _updates = new();
|
||||
private int _updateCount;
|
||||
|
||||
public TestRunRepository(params IReadOnlyList<Run>[] responses)
|
||||
{
|
||||
if (responses is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(responses));
|
||||
}
|
||||
|
||||
_responses = new Queue<IReadOnlyList<Run>>(responses.Select(static runs => (IReadOnlyList<Run>)runs.ToArray()));
|
||||
}
|
||||
|
||||
public int UpdateCount => Volatile.Read(ref _updateCount);
|
||||
|
||||
public IReadOnlyList<Run> UpdatedRuns => _updates.ToArray();
|
||||
|
||||
public Task InsertAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public Task<bool> UpdateAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_updates.Enqueue(run);
|
||||
Interlocked.Increment(ref _updateCount);
|
||||
return Task.FromResult(true);
|
||||
}
|
||||
|
||||
public Task<Run?> GetAsync(string tenantId, string runId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<Run?>(null);
|
||||
|
||||
public Task<IReadOnlyList<Run>> ListAsync(string tenantId, RunQueryOptions? options = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<IReadOnlyList<Run>>(Array.Empty<Run>());
|
||||
|
||||
public Task<IReadOnlyList<Run>> ListByStateAsync(RunState state, int limit = 50, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (state != RunState.Planning)
|
||||
{
|
||||
return Task.FromResult<IReadOnlyList<Run>>(Array.Empty<Run>());
|
||||
}
|
||||
|
||||
var next = _responses.Count > 0 ? _responses.Dequeue() : Array.Empty<Run>();
|
||||
|
||||
if (next.Count > limit)
|
||||
{
|
||||
next = next.Take(limit).ToArray();
|
||||
}
|
||||
|
||||
return Task.FromResult(next);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestScheduleRepository : IScheduleRepository
|
||||
{
|
||||
public TestScheduleRepository(IEnumerable<Schedule> schedules)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(schedules);
|
||||
|
||||
_schedules = new Dictionary<(string TenantId, string ScheduleId), Schedule>();
|
||||
foreach (var schedule in schedules)
|
||||
{
|
||||
if (schedule is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
_schedules[(schedule.TenantId, schedule.Id)] = schedule;
|
||||
}
|
||||
}
|
||||
|
||||
private readonly Dictionary<(string TenantId, string ScheduleId), Schedule> _schedules;
|
||||
|
||||
public Task UpsertAsync(Schedule schedule, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_schedules[(schedule.TenantId, schedule.Id)] = schedule;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<Schedule?> GetAsync(string tenantId, string scheduleId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_schedules.TryGetValue((tenantId, scheduleId), out var schedule);
|
||||
return Task.FromResult(schedule);
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<Schedule>> ListAsync(string tenantId, ScheduleQueryOptions? options = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var results = _schedules.Values.Where(schedule => schedule.TenantId == tenantId).ToArray();
|
||||
return Task.FromResult<IReadOnlyList<Schedule>>(results);
|
||||
}
|
||||
|
||||
public Task<bool> SoftDeleteAsync(string tenantId, string scheduleId, string deletedBy, DateTimeOffset deletedAt, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var removed = _schedules.Remove((tenantId, scheduleId));
|
||||
return Task.FromResult(removed);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class StubImpactSnapshotRepository : IImpactSnapshotRepository
|
||||
{
|
||||
public ImpactSet? LastSnapshot { get; private set; }
|
||||
|
||||
public Task UpsertAsync(ImpactSet snapshot, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
LastSnapshot = snapshot;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<ImpactSet?> GetBySnapshotIdAsync(string snapshotId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<ImpactSet?>(null);
|
||||
|
||||
public Task<ImpactSet?> GetLatestBySelectorAsync(Selector selector, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<ImpactSet?>(null);
|
||||
}
|
||||
|
||||
private sealed class StubRunSummaryService : IRunSummaryService
|
||||
{
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public StubRunSummaryService(TimeProvider timeProvider)
|
||||
{
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
}
|
||||
|
||||
public Task<RunSummaryProjection> ProjectAsync(Run run, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var projection = new RunSummaryProjection(
|
||||
run.TenantId,
|
||||
run.ScheduleId ?? string.Empty,
|
||||
_timeProvider.GetUtcNow(),
|
||||
null,
|
||||
ImmutableArray<RunSummarySnapshot>.Empty,
|
||||
new RunSummaryCounters(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0));
|
||||
|
||||
return Task.FromResult(projection);
|
||||
}
|
||||
|
||||
public Task<RunSummaryProjection?> GetAsync(string tenantId, string scheduleId, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<RunSummaryProjection?>(null);
|
||||
|
||||
public Task<IReadOnlyList<RunSummaryProjection>> ListAsync(string tenantId, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<IReadOnlyList<RunSummaryProjection>>(Array.Empty<RunSummaryProjection>());
|
||||
}
|
||||
|
||||
private sealed class StubImpactTargetingService : IImpactTargetingService
|
||||
{
|
||||
private static readonly string DefaultDigest = "sha256:" + new string('a', 64);
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public StubImpactTargetingService(TimeProvider timeProvider)
|
||||
{
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
}
|
||||
|
||||
public ValueTask<ImpactSet> ResolveByPurlsAsync(IEnumerable<string> productKeys, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public ValueTask<ImpactSet> ResolveByVulnerabilitiesAsync(IEnumerable<string> vulnerabilityIds, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public ValueTask<ImpactSet> ResolveAllAsync(Selector selector, bool usageOnly, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var image = new ImpactImage(
|
||||
DefaultDigest,
|
||||
registry: "registry.test",
|
||||
repository: "repo/sample",
|
||||
namespaces: new[] { selector.TenantId ?? "unknown" },
|
||||
tags: new[] { "latest" },
|
||||
usedByEntrypoint: true);
|
||||
|
||||
var impactSet = new ImpactSet(
|
||||
selector,
|
||||
ImmutableArray.Create(image),
|
||||
usageOnly,
|
||||
_timeProvider.GetUtcNow(),
|
||||
total: 1,
|
||||
snapshotId: null,
|
||||
schemaVersion: SchedulerSchemaVersions.ImpactSet);
|
||||
|
||||
return ValueTask.FromResult(impactSet);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RecordingPlannerQueue : ISchedulerPlannerQueue
|
||||
{
|
||||
private readonly ConcurrentQueue<PlannerQueueMessage> _messages = new();
|
||||
|
||||
public IReadOnlyList<PlannerQueueMessage> Messages => _messages.ToArray();
|
||||
|
||||
public ValueTask<SchedulerQueueEnqueueResult> EnqueueAsync(PlannerQueueMessage message, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_messages.Enqueue(message);
|
||||
return ValueTask.FromResult(new SchedulerQueueEnqueueResult(message.Run.Id, Deduplicated: false));
|
||||
}
|
||||
|
||||
public ValueTask<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>> LeaseAsync(SchedulerQueueLeaseRequest request, CancellationToken cancellationToken = default)
|
||||
=> ValueTask.FromResult<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>>(Array.Empty<ISchedulerQueueLease<PlannerQueueMessage>>());
|
||||
|
||||
public ValueTask<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>> ClaimExpiredAsync(SchedulerQueueClaimOptions options, CancellationToken cancellationToken = default)
|
||||
=> ValueTask.FromResult<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>>(Array.Empty<ISchedulerQueueLease<PlannerQueueMessage>>());
|
||||
}
|
||||
|
||||
private sealed class TestTimeProvider : TimeProvider
|
||||
{
|
||||
private DateTimeOffset _now;
|
||||
|
||||
public TestTimeProvider(DateTimeOffset initial)
|
||||
{
|
||||
_now = initial;
|
||||
}
|
||||
|
||||
public override DateTimeOffset GetUtcNow() => _now;
|
||||
|
||||
public void Advance(TimeSpan delta) => _now = _now.Add(delta);
|
||||
}
|
||||
}
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Immutable;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.Scheduler.Queue;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Projections;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Repositories;
|
||||
using StellaOps.Scheduler.Storage.Mongo.Services;
|
||||
using StellaOps.Scheduler.Worker.Options;
|
||||
using StellaOps.Scheduler.Worker.Observability;
|
||||
using StellaOps.Scheduler.Worker.Planning;
|
||||
|
||||
namespace StellaOps.Scheduler.Worker.Tests;
|
||||
|
||||
public sealed class PlannerBackgroundServiceTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_RespectsTenantFairnessCap()
|
||||
{
|
||||
var timeProvider = new TestTimeProvider(DateTimeOffset.Parse("2025-10-27T12:00:00Z"));
|
||||
|
||||
var runs = new[]
|
||||
{
|
||||
CreateRun("run-a1", "tenant-a", RunTrigger.Manual, timeProvider.GetUtcNow().AddMinutes(1), "schedule-a"),
|
||||
CreateRun("run-a2", "tenant-a", RunTrigger.Cron, timeProvider.GetUtcNow().AddMinutes(2), "schedule-a"),
|
||||
CreateRun("run-b1", "tenant-b", RunTrigger.Conselier, timeProvider.GetUtcNow().AddMinutes(3), "schedule-b"),
|
||||
CreateRun("run-c1", "tenant-c", RunTrigger.Cron, timeProvider.GetUtcNow().AddMinutes(4), "schedule-c"),
|
||||
};
|
||||
|
||||
var repository = new TestRunRepository(runs, Array.Empty<Run>());
|
||||
var options = CreateOptions(maxConcurrentTenants: 2);
|
||||
var scheduleRepository = new TestScheduleRepository(runs.Select(run => CreateSchedule(run.ScheduleId!, run.TenantId, timeProvider.GetUtcNow())));
|
||||
var snapshotRepository = new StubImpactSnapshotRepository();
|
||||
var runSummaryService = new StubRunSummaryService(timeProvider);
|
||||
var plannerQueue = new RecordingPlannerQueue();
|
||||
var targetingService = new StubImpactTargetingService(timeProvider);
|
||||
|
||||
using var metrics = new SchedulerWorkerMetrics();
|
||||
var executionService = new PlannerExecutionService(
|
||||
scheduleRepository,
|
||||
repository,
|
||||
snapshotRepository,
|
||||
runSummaryService,
|
||||
targetingService,
|
||||
plannerQueue,
|
||||
options,
|
||||
timeProvider,
|
||||
metrics,
|
||||
NullLogger<PlannerExecutionService>.Instance);
|
||||
|
||||
var service = new PlannerBackgroundService(
|
||||
repository,
|
||||
executionService,
|
||||
options,
|
||||
timeProvider,
|
||||
NullLogger<PlannerBackgroundService>.Instance);
|
||||
|
||||
await service.StartAsync(CancellationToken.None);
|
||||
try
|
||||
{
|
||||
await WaitForConditionAsync(() => repository.UpdateCount >= 2);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await service.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
var processedIds = repository.UpdatedRuns.Select(run => run.Id).ToArray();
|
||||
Assert.Equal(new[] { "run-a1", "run-b1" }, processedIds);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_PrioritizesManualAndEventTriggers()
|
||||
{
|
||||
var timeProvider = new TestTimeProvider(DateTimeOffset.Parse("2025-10-27T18:00:00Z"));
|
||||
|
||||
var runs = new[]
|
||||
{
|
||||
CreateRun("run-cron", "tenant-alpha", RunTrigger.Cron, timeProvider.GetUtcNow().AddMinutes(1), "schedule-cron"),
|
||||
CreateRun("run-conselier", "tenant-bravo", RunTrigger.Conselier, timeProvider.GetUtcNow().AddMinutes(2), "schedule-conselier"),
|
||||
CreateRun("run-manual", "tenant-charlie", RunTrigger.Manual, timeProvider.GetUtcNow().AddMinutes(3), "schedule-manual"),
|
||||
CreateRun("run-excitor", "tenant-delta", RunTrigger.Excitor, timeProvider.GetUtcNow().AddMinutes(4), "schedule-excitor"),
|
||||
};
|
||||
|
||||
var repository = new TestRunRepository(runs, Array.Empty<Run>());
|
||||
var options = CreateOptions(maxConcurrentTenants: 4);
|
||||
var scheduleRepository = new TestScheduleRepository(runs.Select(run => CreateSchedule(run.ScheduleId!, run.TenantId, timeProvider.GetUtcNow())));
|
||||
var snapshotRepository = new StubImpactSnapshotRepository();
|
||||
var runSummaryService = new StubRunSummaryService(timeProvider);
|
||||
var plannerQueue = new RecordingPlannerQueue();
|
||||
var targetingService = new StubImpactTargetingService(timeProvider);
|
||||
|
||||
using var metrics = new SchedulerWorkerMetrics();
|
||||
var executionService = new PlannerExecutionService(
|
||||
scheduleRepository,
|
||||
repository,
|
||||
snapshotRepository,
|
||||
runSummaryService,
|
||||
targetingService,
|
||||
plannerQueue,
|
||||
options,
|
||||
timeProvider,
|
||||
metrics,
|
||||
NullLogger<PlannerExecutionService>.Instance);
|
||||
|
||||
var service = new PlannerBackgroundService(
|
||||
repository,
|
||||
executionService,
|
||||
options,
|
||||
timeProvider,
|
||||
NullLogger<PlannerBackgroundService>.Instance);
|
||||
|
||||
await service.StartAsync(CancellationToken.None);
|
||||
try
|
||||
{
|
||||
await WaitForConditionAsync(() => repository.UpdateCount >= runs.Length);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await service.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
var processedIds = repository.UpdatedRuns.Select(run => run.Id).ToArray();
|
||||
Assert.Equal(new[] { "run-manual", "run-conselier", "run-excitor", "run-cron" }, processedIds);
|
||||
}
|
||||
|
||||
private static SchedulerWorkerOptions CreateOptions(int maxConcurrentTenants)
|
||||
{
|
||||
return new SchedulerWorkerOptions
|
||||
{
|
||||
Planner =
|
||||
{
|
||||
BatchSize = 20,
|
||||
PollInterval = TimeSpan.FromMilliseconds(1),
|
||||
IdleDelay = TimeSpan.FromMilliseconds(1),
|
||||
MaxConcurrentTenants = maxConcurrentTenants,
|
||||
MaxRunsPerMinute = int.MaxValue,
|
||||
QueueLeaseDuration = TimeSpan.FromMinutes(5)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static Run CreateRun(
|
||||
string id,
|
||||
string tenantId,
|
||||
RunTrigger trigger,
|
||||
DateTimeOffset createdAt,
|
||||
string scheduleId)
|
||||
=> new(
|
||||
id: id,
|
||||
tenantId: tenantId,
|
||||
trigger: trigger,
|
||||
state: RunState.Planning,
|
||||
stats: RunStats.Empty,
|
||||
createdAt: createdAt,
|
||||
reason: RunReason.Empty,
|
||||
scheduleId: scheduleId);
|
||||
|
||||
private static Schedule CreateSchedule(string scheduleId, string tenantId, DateTimeOffset now)
|
||||
=> new(
|
||||
id: scheduleId,
|
||||
tenantId: tenantId,
|
||||
name: $"Schedule-{scheduleId}",
|
||||
enabled: true,
|
||||
cronExpression: "0 2 * * *",
|
||||
timezone: "UTC",
|
||||
mode: ScheduleMode.AnalysisOnly,
|
||||
selection: new Selector(SelectorScope.AllImages, tenantId),
|
||||
onlyIf: ScheduleOnlyIf.Default,
|
||||
notify: ScheduleNotify.Default,
|
||||
limits: ScheduleLimits.Default,
|
||||
createdAt: now,
|
||||
createdBy: "system",
|
||||
updatedAt: now,
|
||||
updatedBy: "system",
|
||||
subscribers: ImmutableArray<string>.Empty);
|
||||
|
||||
private static async Task WaitForConditionAsync(Func<bool> predicate, TimeSpan? timeout = null)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + (timeout ?? TimeSpan.FromSeconds(1));
|
||||
while (!predicate())
|
||||
{
|
||||
if (DateTime.UtcNow > deadline)
|
||||
{
|
||||
throw new TimeoutException("Planner background service did not reach expected state within the allotted time.");
|
||||
}
|
||||
|
||||
await Task.Delay(10);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestRunRepository : IRunRepository
|
||||
{
|
||||
private readonly Queue<IReadOnlyList<Run>> _responses;
|
||||
private readonly ConcurrentQueue<Run> _updates = new();
|
||||
private int _updateCount;
|
||||
|
||||
public TestRunRepository(params IReadOnlyList<Run>[] responses)
|
||||
{
|
||||
if (responses is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(responses));
|
||||
}
|
||||
|
||||
_responses = new Queue<IReadOnlyList<Run>>(responses.Select(static runs => (IReadOnlyList<Run>)runs.ToArray()));
|
||||
}
|
||||
|
||||
public int UpdateCount => Volatile.Read(ref _updateCount);
|
||||
|
||||
public IReadOnlyList<Run> UpdatedRuns => _updates.ToArray();
|
||||
|
||||
public Task InsertAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public Task<bool> UpdateAsync(Run run, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_updates.Enqueue(run);
|
||||
Interlocked.Increment(ref _updateCount);
|
||||
return Task.FromResult(true);
|
||||
}
|
||||
|
||||
public Task<Run?> GetAsync(string tenantId, string runId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<Run?>(null);
|
||||
|
||||
public Task<IReadOnlyList<Run>> ListAsync(string tenantId, RunQueryOptions? options = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<IReadOnlyList<Run>>(Array.Empty<Run>());
|
||||
|
||||
public Task<IReadOnlyList<Run>> ListByStateAsync(RunState state, int limit = 50, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (state != RunState.Planning)
|
||||
{
|
||||
return Task.FromResult<IReadOnlyList<Run>>(Array.Empty<Run>());
|
||||
}
|
||||
|
||||
var next = _responses.Count > 0 ? _responses.Dequeue() : Array.Empty<Run>();
|
||||
|
||||
if (next.Count > limit)
|
||||
{
|
||||
next = next.Take(limit).ToArray();
|
||||
}
|
||||
|
||||
return Task.FromResult(next);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class TestScheduleRepository : IScheduleRepository
|
||||
{
|
||||
public TestScheduleRepository(IEnumerable<Schedule> schedules)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(schedules);
|
||||
|
||||
_schedules = new Dictionary<(string TenantId, string ScheduleId), Schedule>();
|
||||
foreach (var schedule in schedules)
|
||||
{
|
||||
if (schedule is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
_schedules[(schedule.TenantId, schedule.Id)] = schedule;
|
||||
}
|
||||
}
|
||||
|
||||
private readonly Dictionary<(string TenantId, string ScheduleId), Schedule> _schedules;
|
||||
|
||||
public Task UpsertAsync(Schedule schedule, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_schedules[(schedule.TenantId, schedule.Id)] = schedule;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<Schedule?> GetAsync(string tenantId, string scheduleId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_schedules.TryGetValue((tenantId, scheduleId), out var schedule);
|
||||
return Task.FromResult(schedule);
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<Schedule>> ListAsync(string tenantId, ScheduleQueryOptions? options = null, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var results = _schedules.Values.Where(schedule => schedule.TenantId == tenantId).ToArray();
|
||||
return Task.FromResult<IReadOnlyList<Schedule>>(results);
|
||||
}
|
||||
|
||||
public Task<bool> SoftDeleteAsync(string tenantId, string scheduleId, string deletedBy, DateTimeOffset deletedAt, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var removed = _schedules.Remove((tenantId, scheduleId));
|
||||
return Task.FromResult(removed);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class StubImpactSnapshotRepository : IImpactSnapshotRepository
|
||||
{
|
||||
public ImpactSet? LastSnapshot { get; private set; }
|
||||
|
||||
public Task UpsertAsync(ImpactSet snapshot, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
LastSnapshot = snapshot;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<ImpactSet?> GetBySnapshotIdAsync(string snapshotId, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<ImpactSet?>(null);
|
||||
|
||||
public Task<ImpactSet?> GetLatestBySelectorAsync(Selector selector, IClientSessionHandle? session = null, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<ImpactSet?>(null);
|
||||
}
|
||||
|
||||
private sealed class StubRunSummaryService : IRunSummaryService
|
||||
{
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public StubRunSummaryService(TimeProvider timeProvider)
|
||||
{
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
}
|
||||
|
||||
public Task<RunSummaryProjection> ProjectAsync(Run run, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var projection = new RunSummaryProjection(
|
||||
run.TenantId,
|
||||
run.ScheduleId ?? string.Empty,
|
||||
_timeProvider.GetUtcNow(),
|
||||
null,
|
||||
ImmutableArray<RunSummarySnapshot>.Empty,
|
||||
new RunSummaryCounters(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0));
|
||||
|
||||
return Task.FromResult(projection);
|
||||
}
|
||||
|
||||
public Task<RunSummaryProjection?> GetAsync(string tenantId, string scheduleId, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<RunSummaryProjection?>(null);
|
||||
|
||||
public Task<IReadOnlyList<RunSummaryProjection>> ListAsync(string tenantId, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult<IReadOnlyList<RunSummaryProjection>>(Array.Empty<RunSummaryProjection>());
|
||||
}
|
||||
|
||||
private sealed class StubImpactTargetingService : IImpactTargetingService
|
||||
{
|
||||
private static readonly string DefaultDigest = "sha256:" + new string('a', 64);
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public StubImpactTargetingService(TimeProvider timeProvider)
|
||||
{
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
}
|
||||
|
||||
public ValueTask<ImpactSet> ResolveByPurlsAsync(IEnumerable<string> productKeys, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public ValueTask<ImpactSet> ResolveByVulnerabilitiesAsync(IEnumerable<string> vulnerabilityIds, bool usageOnly, Selector selector, CancellationToken cancellationToken = default)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public ValueTask<ImpactSet> ResolveAllAsync(Selector selector, bool usageOnly, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var image = new ImpactImage(
|
||||
DefaultDigest,
|
||||
registry: "registry.test",
|
||||
repository: "repo/sample",
|
||||
namespaces: new[] { selector.TenantId ?? "unknown" },
|
||||
tags: new[] { "latest" },
|
||||
usedByEntrypoint: true);
|
||||
|
||||
var impactSet = new ImpactSet(
|
||||
selector,
|
||||
ImmutableArray.Create(image),
|
||||
usageOnly,
|
||||
_timeProvider.GetUtcNow(),
|
||||
total: 1,
|
||||
snapshotId: null,
|
||||
schemaVersion: SchedulerSchemaVersions.ImpactSet);
|
||||
|
||||
return ValueTask.FromResult(impactSet);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RecordingPlannerQueue : ISchedulerPlannerQueue
|
||||
{
|
||||
private readonly ConcurrentQueue<PlannerQueueMessage> _messages = new();
|
||||
|
||||
public IReadOnlyList<PlannerQueueMessage> Messages => _messages.ToArray();
|
||||
|
||||
public ValueTask<SchedulerQueueEnqueueResult> EnqueueAsync(PlannerQueueMessage message, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_messages.Enqueue(message);
|
||||
return ValueTask.FromResult(new SchedulerQueueEnqueueResult(message.Run.Id, Deduplicated: false));
|
||||
}
|
||||
|
||||
public ValueTask<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>> LeaseAsync(SchedulerQueueLeaseRequest request, CancellationToken cancellationToken = default)
|
||||
=> ValueTask.FromResult<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>>(Array.Empty<ISchedulerQueueLease<PlannerQueueMessage>>());
|
||||
|
||||
public ValueTask<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>> ClaimExpiredAsync(SchedulerQueueClaimOptions options, CancellationToken cancellationToken = default)
|
||||
=> ValueTask.FromResult<IReadOnlyList<ISchedulerQueueLease<PlannerQueueMessage>>>(Array.Empty<ISchedulerQueueLease<PlannerQueueMessage>>());
|
||||
}
|
||||
|
||||
private sealed class TestTimeProvider : TimeProvider
|
||||
{
|
||||
private DateTimeOffset _now;
|
||||
|
||||
public TestTimeProvider(DateTimeOffset initial)
|
||||
{
|
||||
_now = initial;
|
||||
}
|
||||
|
||||
public override DateTimeOffset GetUtcNow() => _now;
|
||||
|
||||
public void Advance(TimeSpan delta) => _now = _now.Add(delta);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user