Files
git.stella-ops.org/src/StellaOps.Feedser.Storage.Mongo/MongoJobStore.cs
root df5984d07e
Some checks failed
Build Test Deploy / build-test (push) Has been cancelled
Build Test Deploy / authority-container (push) Has been cancelled
Build Test Deploy / docs (push) Has been cancelled
Build Test Deploy / deploy (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
up
2025-10-10 06:53:40 +00:00

195 lines
7.4 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using StellaOps.Feedser.Core.Jobs;
namespace StellaOps.Feedser.Storage.Mongo;
/// <summary>
/// <see cref="IJobStore"/> implementation that persists job runs as <see cref="JobRunDocument"/>
/// entries in a MongoDB collection.
/// </summary>
/// <remarks>
/// Run identifiers are stored as the string form of the <see cref="Guid"/> and statuses as the
/// <see cref="JobRunStatus"/> member name. Status transitions are issued as a single
/// find-and-update command whose filter includes the status the run is expected to be in, so a
/// transition that is no longer valid simply matches nothing.
/// </remarks>
public sealed class MongoJobStore : IJobStore
{
    private static readonly string PendingStatus = JobRunStatus.Pending.ToString();
    private static readonly string RunningStatus = JobRunStatus.Running.ToString();

    // Statuses of runs that have not completed yet. Declared after the two fields above because
    // static initializers run in textual order. Shared by TryCompleteAsync and GetActiveRunsAsync
    // so both agree on what "active" means; never mutated.
    private static readonly string[] ActiveStatuses = { PendingStatus, RunningStatus };

    private readonly IMongoCollection<JobRunDocument> _collection;
    private readonly ILogger<MongoJobStore> _logger;

    /// <summary>Creates a store on top of the given collection.</summary>
    /// <param name="collection">Collection holding the job run documents.</param>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public MongoJobStore(IMongoCollection<JobRunDocument> collection, ILogger<MongoJobStore> logger)
    {
        _collection = collection ?? throw new ArgumentNullException(nameof(collection));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Inserts a new job run built from <paramref name="request"/> under a freshly generated run id.</summary>
    /// <param name="request">Description of the run to create.</param>
    /// <param name="cancellationToken">Token forwarded to the insert.</param>
    /// <returns>Snapshot of the inserted document.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
    public async Task<JobRunSnapshot> CreateAsync(JobRunCreateRequest request, CancellationToken cancellationToken)
    {
        // Fail fast: without this a null request would only surface as a NullReferenceException
        // further down, possibly after the insert has already happened.
        ArgumentNullException.ThrowIfNull(request);

        var runId = Guid.NewGuid();
        var document = JobRunDocumentExtensions.FromRequest(request, runId);

        await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
        _logger.LogDebug("Created job run {RunId} for {Kind} with trigger {Trigger}", runId, request.Kind, request.Trigger);

        return document.ToSnapshot();
    }

    /// <summary>Moves a pending run to the running status and records its start time (as UTC).</summary>
    /// <param name="runId">Identifier of the run to start.</param>
    /// <param name="startedAt">Start timestamp; its UTC value is stored.</param>
    /// <param name="cancellationToken">Token forwarded to the update.</param>
    /// <returns>
    /// Snapshot of the updated run, or <c>null</c> when no document with that id is currently pending.
    /// </returns>
    public async Task<JobRunSnapshot?> TryStartAsync(Guid runId, DateTimeOffset startedAt, CancellationToken cancellationToken)
    {
        var filters = Builders<JobRunDocument>.Filter;

        // Only a run that is still pending may be started.
        var filter = filters.Eq(x => x.Id, runId.ToString())
            & filters.Eq(x => x.Status, PendingStatus);

        var update = Builders<JobRunDocument>.Update
            .Set(x => x.Status, RunningStatus)
            .Set(x => x.StartedAt, startedAt.UtcDateTime);

        var updated = await FindOneAndUpdateAfterAsync(filter, update, cancellationToken).ConfigureAwait(false);
        if (updated is null)
        {
            _logger.LogDebug("Failed to start job run {RunId}; status transition rejected", runId);
            return null;
        }

        return updated.ToSnapshot();
    }

    /// <summary>
    /// Records the outcome of a run that is still pending or running: status, completion time (as UTC) and error.
    /// </summary>
    /// <param name="runId">Identifier of the run to complete.</param>
    /// <param name="completion">Final status, completion timestamp and error to store.</param>
    /// <param name="cancellationToken">Token forwarded to the update.</param>
    /// <returns>
    /// Snapshot of the updated run, or <c>null</c> when no document with that id is pending or running.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="completion"/> is <c>null</c>.</exception>
    public async Task<JobRunSnapshot?> TryCompleteAsync(Guid runId, JobRunCompletion completion, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(completion);

        var filters = Builders<JobRunDocument>.Filter;

        // A run that already left the pending/running states is not overwritten.
        var filter = filters.Eq(x => x.Id, runId.ToString())
            & filters.In(x => x.Status, ActiveStatuses);

        var update = Builders<JobRunDocument>.Update
            .Set(x => x.Status, completion.Status.ToString())
            .Set(x => x.CompletedAt, completion.CompletedAt.UtcDateTime)
            .Set(x => x.Error, completion.Error);

        var updated = await FindOneAndUpdateAfterAsync(filter, update, cancellationToken).ConfigureAwait(false);
        if (updated is null)
        {
            _logger.LogWarning("Failed to mark job run {RunId} as {Status}", runId, completion.Status);
            return null;
        }

        return updated.ToSnapshot();
    }

    /// <summary>Looks up a single run by its identifier.</summary>
    /// <param name="runId">Identifier of the run.</param>
    /// <param name="cancellationToken">Token forwarded to the query.</param>
    /// <returns>Snapshot of the run, or <c>null</c> when no document has that id.</returns>
    public async Task<JobRunSnapshot?> FindAsync(Guid runId, CancellationToken cancellationToken)
    {
        var filter = Builders<JobRunDocument>.Filter.Eq(x => x.Id, runId.ToString());

        var document = await _collection.Find(filter)
            .FirstOrDefaultAsync(cancellationToken)
            .ConfigureAwait(false);

        return document?.ToSnapshot();
    }

    /// <summary>Returns the most recently created runs, newest first.</summary>
    /// <param name="kind">Job kind to restrict to; <c>null</c>, empty or whitespace means all kinds.</param>
    /// <param name="limit">Maximum number of runs to return; a non-positive value yields an empty list.</param>
    /// <param name="cancellationToken">Token forwarded to the query.</param>
    /// <returns>Up to <paramref name="limit"/> snapshots ordered by creation time, descending.</returns>
    public async Task<IReadOnlyList<JobRunSnapshot>> GetRecentRunsAsync(string? kind, int limit, CancellationToken cancellationToken)
    {
        if (limit <= 0)
        {
            return Array.Empty<JobRunSnapshot>();
        }

        var filter = string.IsNullOrWhiteSpace(kind)
            ? Builders<JobRunDocument>.Filter.Empty
            : Builders<JobRunDocument>.Filter.Eq(x => x.Kind, kind);

        var documents = await _collection.Find(filter)
            .SortByDescending(x => x.CreatedAt)
            .Limit(limit)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        return documents.Select(static doc => doc.ToSnapshot()).ToArray();
    }

    /// <summary>Returns every run whose status is pending or running, newest first.</summary>
    /// <param name="cancellationToken">Token forwarded to the query.</param>
    /// <returns>Snapshots of the active runs ordered by creation time, descending.</returns>
    public async Task<IReadOnlyList<JobRunSnapshot>> GetActiveRunsAsync(CancellationToken cancellationToken)
    {
        var filter = Builders<JobRunDocument>.Filter.In(x => x.Status, ActiveStatuses);

        var documents = await _collection.Find(filter)
            .SortByDescending(x => x.CreatedAt)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        return documents.Select(static doc => doc.ToSnapshot()).ToArray();
    }

    /// <summary>Returns the most recently created run of the given kind.</summary>
    /// <param name="kind">Job kind to look up; matched exactly as given.</param>
    /// <param name="cancellationToken">Token forwarded to the query.</param>
    /// <returns>Snapshot of the newest run of that kind, or <c>null</c> when there is none.</returns>
    public async Task<JobRunSnapshot?> GetLastRunAsync(string kind, CancellationToken cancellationToken)
    {
        var latest = await _collection.Find(x => x.Kind == kind)
            .SortByDescending(x => x.CreatedAt)
            .Limit(1)
            .FirstOrDefaultAsync(cancellationToken)
            .ConfigureAwait(false);

        return latest?.ToSnapshot();
    }

    /// <summary>Returns, for each requested kind that has at least one run, its most recently created run.</summary>
    /// <param name="kinds">
    /// Job kinds to look up. Null/blank entries are ignored; the rest are trimmed and de-duplicated (ordinal).
    /// </param>
    /// <param name="cancellationToken">Token forwarded to the aggregation.</param>
    /// <returns>
    /// Dictionary keyed (ordinal) by the kind value stored on the documents. Kinds without any run are absent.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="kinds"/> is <c>null</c>.</exception>
    public async Task<IReadOnlyDictionary<string, JobRunSnapshot>> GetLastRunsAsync(IEnumerable<string> kinds, CancellationToken cancellationToken)
    {
        if (kinds is null)
        {
            throw new ArgumentNullException(nameof(kinds));
        }

        var kindList = kinds
            .Where(static kind => !string.IsNullOrWhiteSpace(kind))
            .Select(static kind => kind.Trim())
            .Distinct(StringComparer.Ordinal)
            .ToArray();

        var results = new Dictionary<string, JobRunSnapshot>(StringComparer.Ordinal);
        if (kindList.Length == 0)
        {
            // Nothing to ask the server for.
            return results;
        }

        // NOTE(review): the pipeline addresses fields by their raw BSON names; this assumes
        // JobRunDocument maps Kind -> "kind" and CreatedAt -> "createdAt" - confirm against the mapping.
        var matchStage = new BsonDocument("$match", new BsonDocument("kind", new BsonDocument("$in", new BsonArray(kindList))));

        // Newest first, so that $first in the group stage picks the latest run of each kind.
        var sortStage = new BsonDocument("$sort", new BsonDocument("createdAt", -1));
        var groupStage = new BsonDocument("$group", new BsonDocument
        {
            { "_id", "$kind" },
            { "document", new BsonDocument("$first", "$$ROOT") }
        });

        var pipeline = new[] { matchStage, sortStage, groupStage };
        var groups = await _collection.Aggregate<BsonDocument>(pipeline)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        foreach (var group in groups)
        {
            // Skip rows that do not have the expected { _id: string, document: {...} } shape.
            if (!group.TryGetValue("_id", out var idValue) || idValue.BsonType != BsonType.String)
            {
                continue;
            }

            if (!group.TryGetValue("document", out var documentValue) || documentValue.BsonType != BsonType.Document)
            {
                continue;
            }

            var document = BsonSerializer.Deserialize<JobRunDocument>(documentValue.AsBsonDocument);
            results[idValue.AsString] = document.ToSnapshot();
        }

        return results;
    }

    /// <summary>
    /// Applies <paramref name="update"/> to the first document matching <paramref name="filter"/> and
    /// returns the document as it looks after the update, or <c>null</c> when nothing matched.
    /// </summary>
    private async Task<JobRunDocument?> FindOneAndUpdateAfterAsync(
        FilterDefinition<JobRunDocument> filter,
        UpdateDefinition<JobRunDocument> update,
        CancellationToken cancellationToken)
    {
        var options = new FindOneAndUpdateOptions<JobRunDocument>
        {
            ReturnDocument = ReturnDocument.After,
        };

        return await _collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken).ConfigureAwait(false);
    }
}