This commit is contained in:
master
2026-02-04 19:59:20 +02:00
parent 557feefdc3
commit 5548cf83bf
1479 changed files with 53557 additions and 40339 deletions

View File

@@ -491,6 +491,9 @@ builder.Services.AddConcelierPostgresStorage(pgOptions =>
pgOptions.MigrationsPath = postgresOptions.MigrationsPath;
});
// Register in-memory lease store (single-instance dev mode).
builder.Services.AddSingleton<StellaOps.Concelier.Core.Jobs.ILeaseStore, StellaOps.Concelier.Core.Jobs.InMemoryLeaseStore>();
builder.Services.AddOptions<AdvisoryObservationEventPublisherOptions>()
.Bind(builder.Configuration.GetSection("advisoryObservationEvents"))
.PostConfigure(options =>

View File

@@ -0,0 +1,65 @@
namespace StellaOps.Concelier.Core.Jobs;
/// <summary>
/// In-memory lease store for single-instance deployments.
/// For multi-instance deployments, replace with a Postgres-backed implementation.
/// </summary>
public sealed class InMemoryLeaseStore : ILeaseStore
{
private readonly object _lock = new();
private readonly Dictionary<string, JobLease> _leases = new();
public Task<JobLease?> TryAcquireAsync(
string key,
string holder,
TimeSpan leaseDuration,
DateTimeOffset now,
CancellationToken cancellationToken)
{
lock (_lock)
{
if (_leases.TryGetValue(key, out var existing) && existing.TtlAt > now && existing.Holder != holder)
{
return Task.FromResult<JobLease?>(null);
}
var lease = new JobLease(key, holder, now, now, leaseDuration, now.Add(leaseDuration));
_leases[key] = lease;
return Task.FromResult<JobLease?>(lease);
}
}
public Task<JobLease?> HeartbeatAsync(
string key,
string holder,
TimeSpan leaseDuration,
DateTimeOffset now,
CancellationToken cancellationToken)
{
lock (_lock)
{
if (_leases.TryGetValue(key, out var existing) && existing.Holder == holder)
{
var lease = new JobLease(key, holder, existing.AcquiredAt, now, leaseDuration, now.Add(leaseDuration));
_leases[key] = lease;
return Task.FromResult<JobLease?>(lease);
}
return Task.FromResult<JobLease?>(null);
}
}
public Task<bool> ReleaseAsync(string key, string holder, CancellationToken cancellationToken)
{
lock (_lock)
{
if (_leases.TryGetValue(key, out var existing) && existing.Holder == holder)
{
_leases.Remove(key);
return Task.FromResult(true);
}
}
return Task.FromResult(false);
}
}