Implement MongoDB-based storage for Pack Run approval, artifact, log, and state management
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
- Added MongoPackRunApprovalStore for managing approval states with MongoDB. - Introduced MongoPackRunArtifactUploader for uploading and storing artifacts. - Created MongoPackRunLogStore to handle logging of pack run events. - Developed MongoPackRunStateStore for persisting and retrieving pack run states. - Implemented unit tests for MongoDB stores to ensure correct functionality. - Added MongoTaskRunnerTestContext for setting up MongoDB test environment. - Enhanced PackRunStateFactory to correctly initialize state with gate reasons.
This commit is contained in:
@@ -0,0 +1,162 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using StellaOps.TaskRunner.Core.Execution;
|
||||
|
||||
namespace StellaOps.TaskRunner.Infrastructure.Execution;
|
||||
|
||||
/// <summary>
|
||||
/// Persists pack run logs as newline-delimited JSON for deterministic replay and offline mirroring.
|
||||
/// </summary>
|
||||
public sealed class FilePackRunLogStore : IPackRunLogStore
|
||||
{
|
||||
private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
|
||||
{
|
||||
WriteIndented = false
|
||||
};
|
||||
|
||||
private readonly string rootPath;
|
||||
private readonly ConcurrentDictionary<string, SemaphoreSlim> fileLocks = new(StringComparer.Ordinal);
|
||||
|
||||
public FilePackRunLogStore(string rootPath)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(rootPath);
|
||||
|
||||
this.rootPath = Path.GetFullPath(rootPath);
|
||||
Directory.CreateDirectory(this.rootPath);
|
||||
}
|
||||
|
||||
public async Task AppendAsync(string runId, PackRunLogEntry entry, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
ArgumentNullException.ThrowIfNull(entry);
|
||||
|
||||
var path = GetPath(runId);
|
||||
Directory.CreateDirectory(Path.GetDirectoryName(path)!);
|
||||
|
||||
var gate = fileLocks.GetOrAdd(path, _ => new SemaphoreSlim(1, 1));
|
||||
await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var document = PackRunLogEntryDocument.FromDomain(entry);
|
||||
var json = JsonSerializer.Serialize(document, SerializerOptions);
|
||||
await File.AppendAllTextAsync(path, json + Environment.NewLine, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
gate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<PackRunLogEntry> ReadAsync(
|
||||
string runId,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
|
||||
var path = GetPath(runId);
|
||||
if (!File.Exists(path))
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
await using var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
|
||||
using var reader = new StreamReader(stream, Encoding.UTF8);
|
||||
|
||||
while (true)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
var line = await reader.ReadLineAsync().ConfigureAwait(false);
|
||||
if (line is null)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(line))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
PackRunLogEntryDocument? document = null;
|
||||
try
|
||||
{
|
||||
document = JsonSerializer.Deserialize<PackRunLogEntryDocument>(line, SerializerOptions);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Skip malformed entries to avoid stopping the stream; diagnostics are captured via worker logs.
|
||||
}
|
||||
|
||||
if (document is null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
yield return document.ToDomain();
|
||||
}
|
||||
}
|
||||
|
||||
public Task<bool> ExistsAsync(string runId, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
var path = GetPath(runId);
|
||||
return Task.FromResult(File.Exists(path));
|
||||
}
|
||||
|
||||
private string GetPath(string runId)
|
||||
{
|
||||
var safe = Sanitize(runId);
|
||||
return Path.Combine(rootPath, $"{safe}.ndjson");
|
||||
}
|
||||
|
||||
private static string Sanitize(string value)
|
||||
{
|
||||
var result = value.Trim();
|
||||
foreach (var invalid in Path.GetInvalidFileNameChars())
|
||||
{
|
||||
result = result.Replace(invalid, '_');
|
||||
}
|
||||
|
||||
return string.IsNullOrWhiteSpace(result) ? "run" : result;
|
||||
}
|
||||
|
||||
private sealed record PackRunLogEntryDocument(
|
||||
DateTimeOffset Timestamp,
|
||||
string Level,
|
||||
string EventType,
|
||||
string Message,
|
||||
string? StepId,
|
||||
Dictionary<string, string>? Metadata)
|
||||
{
|
||||
public static PackRunLogEntryDocument FromDomain(PackRunLogEntry entry)
|
||||
{
|
||||
var metadata = entry.Metadata is null
|
||||
? null
|
||||
: new Dictionary<string, string>(entry.Metadata, StringComparer.Ordinal);
|
||||
|
||||
return new PackRunLogEntryDocument(
|
||||
entry.Timestamp,
|
||||
entry.Level,
|
||||
entry.EventType,
|
||||
entry.Message,
|
||||
entry.StepId,
|
||||
metadata);
|
||||
}
|
||||
|
||||
public PackRunLogEntry ToDomain()
|
||||
{
|
||||
IReadOnlyDictionary<string, string>? metadata = Metadata is null
|
||||
? null
|
||||
: new Dictionary<string, string>(Metadata, StringComparer.Ordinal);
|
||||
|
||||
return new PackRunLogEntry(
|
||||
Timestamp,
|
||||
Level,
|
||||
EventType,
|
||||
Message,
|
||||
StepId,
|
||||
metadata);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,164 @@
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.TaskRunner.Core.Configuration;
|
||||
using StellaOps.TaskRunner.Core.Execution;
|
||||
|
||||
namespace StellaOps.TaskRunner.Infrastructure.Execution;
|
||||
|
||||
public sealed class MongoPackRunApprovalStore : IPackRunApprovalStore
|
||||
{
|
||||
private readonly IMongoCollection<PackRunApprovalDocument> collection;
|
||||
|
||||
public MongoPackRunApprovalStore(IMongoDatabase database, TaskRunnerMongoOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(database);
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
|
||||
collection = database.GetCollection<PackRunApprovalDocument>(options.ApprovalsCollection);
|
||||
EnsureIndexes(collection);
|
||||
}
|
||||
|
||||
public async Task SaveAsync(string runId, IReadOnlyList<PackRunApprovalState> approvals, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
ArgumentNullException.ThrowIfNull(approvals);
|
||||
|
||||
var filter = Builders<PackRunApprovalDocument>.Filter.Eq(document => document.RunId, runId);
|
||||
|
||||
await collection.DeleteManyAsync(filter, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (approvals.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var documents = approvals
|
||||
.Select(approval => PackRunApprovalDocument.FromDomain(runId, approval))
|
||||
.ToList();
|
||||
|
||||
await collection.InsertManyAsync(documents, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<PackRunApprovalState>> GetAsync(string runId, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
|
||||
var filter = Builders<PackRunApprovalDocument>.Filter.Eq(document => document.RunId, runId);
|
||||
|
||||
var documents = await collection
|
||||
.Find(filter)
|
||||
.SortBy(document => document.ApprovalId)
|
||||
.ToListAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return documents
|
||||
.Select(document => document.ToDomain())
|
||||
.ToList();
|
||||
}
|
||||
|
||||
public async Task UpdateAsync(string runId, PackRunApprovalState approval, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
ArgumentNullException.ThrowIfNull(approval);
|
||||
|
||||
var filter = Builders<PackRunApprovalDocument>.Filter.And(
|
||||
Builders<PackRunApprovalDocument>.Filter.Eq(document => document.RunId, runId),
|
||||
Builders<PackRunApprovalDocument>.Filter.Eq(document => document.ApprovalId, approval.ApprovalId));
|
||||
|
||||
var existingDocument = await collection
|
||||
.Find(filter)
|
||||
.FirstOrDefaultAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (existingDocument is null)
|
||||
{
|
||||
throw new InvalidOperationException($"Approval '{approval.ApprovalId}' not found for run '{runId}'.");
|
||||
}
|
||||
|
||||
var document = PackRunApprovalDocument.FromDomain(runId, approval, existingDocument.Id);
|
||||
await collection
|
||||
.ReplaceOneAsync(filter, document, cancellationToken: cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static void EnsureIndexes(IMongoCollection<PackRunApprovalDocument> target)
|
||||
{
|
||||
var models = new[]
|
||||
{
|
||||
new CreateIndexModel<PackRunApprovalDocument>(
|
||||
Builders<PackRunApprovalDocument>.IndexKeys
|
||||
.Ascending(document => document.RunId)
|
||||
.Ascending(document => document.ApprovalId),
|
||||
new CreateIndexOptions { Unique = true }),
|
||||
new CreateIndexModel<PackRunApprovalDocument>(
|
||||
Builders<PackRunApprovalDocument>.IndexKeys
|
||||
.Ascending(document => document.RunId)
|
||||
.Ascending(document => document.Status))
|
||||
};
|
||||
|
||||
target.Indexes.CreateMany(models);
|
||||
}
|
||||
|
||||
private sealed class PackRunApprovalDocument
|
||||
{
|
||||
[BsonId]
|
||||
public ObjectId Id { get; init; }
|
||||
|
||||
public string RunId { get; init; } = default!;
|
||||
|
||||
public string ApprovalId { get; init; } = default!;
|
||||
|
||||
public IReadOnlyList<string> RequiredGrants { get; init; } = Array.Empty<string>();
|
||||
|
||||
public IReadOnlyList<string> StepIds { get; init; } = Array.Empty<string>();
|
||||
|
||||
public IReadOnlyList<string> Messages { get; init; } = Array.Empty<string>();
|
||||
|
||||
public string? ReasonTemplate { get; init; }
|
||||
|
||||
public DateTime RequestedAt { get; init; }
|
||||
|
||||
public string Status { get; init; } = default!;
|
||||
|
||||
public string? ActorId { get; init; }
|
||||
|
||||
public DateTime? CompletedAt { get; init; }
|
||||
|
||||
public string? Summary { get; init; }
|
||||
|
||||
public static PackRunApprovalDocument FromDomain(string runId, PackRunApprovalState approval, ObjectId? id = null)
|
||||
=> new()
|
||||
{
|
||||
Id = id ?? ObjectId.GenerateNewId(),
|
||||
RunId = runId,
|
||||
ApprovalId = approval.ApprovalId,
|
||||
RequiredGrants = approval.RequiredGrants ?? Array.Empty<string>(),
|
||||
StepIds = approval.StepIds ?? Array.Empty<string>(),
|
||||
Messages = approval.Messages ?? Array.Empty<string>(),
|
||||
ReasonTemplate = approval.ReasonTemplate,
|
||||
RequestedAt = approval.RequestedAt.UtcDateTime,
|
||||
Status = approval.Status.ToString(),
|
||||
ActorId = approval.ActorId,
|
||||
CompletedAt = approval.CompletedAt?.UtcDateTime,
|
||||
Summary = approval.Summary
|
||||
};
|
||||
|
||||
public PackRunApprovalState ToDomain()
|
||||
{
|
||||
var status = Enum.Parse<PackRunApprovalStatus>(Status, ignoreCase: true);
|
||||
|
||||
return new PackRunApprovalState(
|
||||
ApprovalId,
|
||||
RequiredGrants?.ToList() ?? new List<string>(),
|
||||
StepIds?.ToList() ?? new List<string>(),
|
||||
Messages?.ToList() ?? new List<string>(),
|
||||
ReasonTemplate,
|
||||
new DateTimeOffset(RequestedAt, TimeSpan.Zero),
|
||||
status,
|
||||
ActorId,
|
||||
CompletedAt is null ? null : new DateTimeOffset(CompletedAt.Value, TimeSpan.Zero),
|
||||
Summary);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,193 @@
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Nodes;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.TaskRunner.Core.Configuration;
|
||||
using StellaOps.TaskRunner.Core.Execution;
|
||||
using StellaOps.TaskRunner.Core.Planning;
|
||||
|
||||
namespace StellaOps.TaskRunner.Infrastructure.Execution;
|
||||
|
||||
public sealed class MongoPackRunArtifactUploader : IPackRunArtifactUploader
|
||||
{
|
||||
private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web);
|
||||
|
||||
private readonly IMongoCollection<PackRunArtifactDocument> collection;
|
||||
private readonly TimeProvider timeProvider;
|
||||
private readonly ILogger<MongoPackRunArtifactUploader> logger;
|
||||
|
||||
public MongoPackRunArtifactUploader(
|
||||
IMongoDatabase database,
|
||||
TaskRunnerMongoOptions options,
|
||||
TimeProvider? timeProvider,
|
||||
ILogger<MongoPackRunArtifactUploader> logger)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(database);
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
|
||||
collection = database.GetCollection<PackRunArtifactDocument>(options.ArtifactsCollection);
|
||||
this.timeProvider = timeProvider ?? TimeProvider.System;
|
||||
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
EnsureIndexes(collection);
|
||||
}
|
||||
|
||||
public async Task UploadAsync(
|
||||
PackRunExecutionContext context,
|
||||
PackRunState state,
|
||||
IReadOnlyList<TaskPackPlanOutput> outputs,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
ArgumentNullException.ThrowIfNull(state);
|
||||
ArgumentNullException.ThrowIfNull(outputs);
|
||||
|
||||
var filter = Builders<PackRunArtifactDocument>.Filter.Eq(document => document.RunId, context.RunId);
|
||||
await collection.DeleteManyAsync(filter, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (outputs.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var timestamp = timeProvider.GetUtcNow();
|
||||
var documents = new List<PackRunArtifactDocument>(outputs.Count);
|
||||
|
||||
foreach (var output in outputs)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
documents.Add(ProcessOutput(context, output, timestamp));
|
||||
}
|
||||
|
||||
await collection.InsertManyAsync(documents, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private PackRunArtifactDocument ProcessOutput(
|
||||
PackRunExecutionContext context,
|
||||
TaskPackPlanOutput output,
|
||||
DateTimeOffset capturedAt)
|
||||
{
|
||||
var sourcePath = ResolveString(output.Path);
|
||||
var expressionNode = ResolveExpression(output.Expression);
|
||||
string status = "skipped";
|
||||
string? notes = null;
|
||||
string? storedPath = null;
|
||||
|
||||
if (IsFileOutput(output))
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(sourcePath))
|
||||
{
|
||||
status = "unresolved";
|
||||
notes = "Output path requires runtime value.";
|
||||
}
|
||||
else if (!File.Exists(sourcePath))
|
||||
{
|
||||
status = "missing";
|
||||
notes = $"Source file '{sourcePath}' not found.";
|
||||
logger.LogWarning(
|
||||
"Pack run {RunId} output {Output} referenced missing file {Path}.",
|
||||
context.RunId,
|
||||
output.Name,
|
||||
sourcePath);
|
||||
}
|
||||
else
|
||||
{
|
||||
status = "referenced";
|
||||
storedPath = sourcePath;
|
||||
}
|
||||
}
|
||||
|
||||
BsonDocument? expressionDocument = null;
|
||||
if (expressionNode is not null)
|
||||
{
|
||||
var json = expressionNode.ToJsonString(SerializerOptions);
|
||||
expressionDocument = BsonDocument.Parse(json);
|
||||
status = status is "referenced" ? status : "materialized";
|
||||
}
|
||||
|
||||
return new PackRunArtifactDocument
|
||||
{
|
||||
Id = ObjectId.GenerateNewId(),
|
||||
RunId = context.RunId,
|
||||
Name = output.Name,
|
||||
Type = output.Type,
|
||||
SourcePath = sourcePath,
|
||||
StoredPath = storedPath,
|
||||
Status = status,
|
||||
Notes = notes,
|
||||
CapturedAt = capturedAt.UtcDateTime,
|
||||
Expression = expressionDocument
|
||||
};
|
||||
}
|
||||
|
||||
private static bool IsFileOutput(TaskPackPlanOutput output)
|
||||
=> string.Equals(output.Type, "file", StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
private static string? ResolveString(TaskPackPlanParameterValue? parameter)
|
||||
{
|
||||
if (parameter is null || parameter.RequiresRuntimeValue || parameter.Value is null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if (parameter.Value is JsonValue jsonValue && jsonValue.TryGetValue<string>(out var value))
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static JsonNode? ResolveExpression(TaskPackPlanParameterValue? parameter)
|
||||
{
|
||||
if (parameter is null || parameter.RequiresRuntimeValue)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return parameter.Value;
|
||||
}
|
||||
|
||||
private static void EnsureIndexes(IMongoCollection<PackRunArtifactDocument> target)
|
||||
{
|
||||
var models = new[]
|
||||
{
|
||||
new CreateIndexModel<PackRunArtifactDocument>(
|
||||
Builders<PackRunArtifactDocument>.IndexKeys
|
||||
.Ascending(document => document.RunId)
|
||||
.Ascending(document => document.Name),
|
||||
new CreateIndexOptions { Unique = true }),
|
||||
new CreateIndexModel<PackRunArtifactDocument>(
|
||||
Builders<PackRunArtifactDocument>.IndexKeys
|
||||
.Ascending(document => document.RunId)
|
||||
.Ascending(document => document.Status))
|
||||
};
|
||||
|
||||
target.Indexes.CreateMany(models);
|
||||
}
|
||||
|
||||
public sealed class PackRunArtifactDocument
|
||||
{
|
||||
[BsonId]
|
||||
public ObjectId Id { get; init; }
|
||||
|
||||
public string RunId { get; init; } = default!;
|
||||
|
||||
public string Name { get; init; } = default!;
|
||||
|
||||
public string Type { get; init; } = default!;
|
||||
|
||||
public string? SourcePath { get; init; }
|
||||
|
||||
public string? StoredPath { get; init; }
|
||||
|
||||
public string Status { get; init; } = default!;
|
||||
|
||||
public string? Notes { get; init; }
|
||||
|
||||
public DateTime CapturedAt { get; init; }
|
||||
|
||||
public BsonDocument? Expression { get; init; }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,162 @@
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.TaskRunner.Core.Configuration;
|
||||
using StellaOps.TaskRunner.Core.Execution;
|
||||
|
||||
namespace StellaOps.TaskRunner.Infrastructure.Execution;
|
||||
|
||||
public sealed class MongoPackRunLogStore : IPackRunLogStore
|
||||
{
|
||||
private readonly IMongoCollection<PackRunLogDocument> collection;
|
||||
|
||||
public MongoPackRunLogStore(IMongoDatabase database, TaskRunnerMongoOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(database);
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
|
||||
collection = database.GetCollection<PackRunLogDocument>(options.LogsCollection);
|
||||
EnsureIndexes(collection);
|
||||
}
|
||||
|
||||
public async Task AppendAsync(string runId, PackRunLogEntry entry, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
ArgumentNullException.ThrowIfNull(entry);
|
||||
|
||||
var filter = Builders<PackRunLogDocument>.Filter.Eq(document => document.RunId, runId);
|
||||
|
||||
for (var attempt = 0; attempt < 5; attempt++)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var last = await collection
|
||||
.Find(filter)
|
||||
.SortByDescending(document => document.Sequence)
|
||||
.FirstOrDefaultAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var nextSequence = last is null ? 1 : last.Sequence + 1;
|
||||
|
||||
var document = PackRunLogDocument.FromDomain(runId, nextSequence, entry);
|
||||
|
||||
try
|
||||
{
|
||||
await collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
catch (MongoWriteException ex) when (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
throw new InvalidOperationException($"Failed to append log entry for run '{runId}' after multiple attempts.");
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<PackRunLogEntry> ReadAsync(
|
||||
string runId,
|
||||
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
|
||||
var filter = Builders<PackRunLogDocument>.Filter.Eq(document => document.RunId, runId);
|
||||
|
||||
using var cursor = await collection
|
||||
.Find(filter)
|
||||
.SortBy(document => document.Sequence)
|
||||
.ToCursorAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
while (await cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
foreach (var document in cursor.Current)
|
||||
{
|
||||
yield return document.ToDomain();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> ExistsAsync(string runId, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
|
||||
var filter = Builders<PackRunLogDocument>.Filter.Eq(document => document.RunId, runId);
|
||||
return await collection
|
||||
.Find(filter)
|
||||
.Limit(1)
|
||||
.AnyAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static void EnsureIndexes(IMongoCollection<PackRunLogDocument> target)
|
||||
{
|
||||
var models = new[]
|
||||
{
|
||||
new CreateIndexModel<PackRunLogDocument>(
|
||||
Builders<PackRunLogDocument>.IndexKeys
|
||||
.Ascending(document => document.RunId)
|
||||
.Ascending(document => document.Sequence),
|
||||
new CreateIndexOptions { Unique = true }),
|
||||
new CreateIndexModel<PackRunLogDocument>(
|
||||
Builders<PackRunLogDocument>.IndexKeys
|
||||
.Ascending(document => document.RunId)
|
||||
.Ascending(document => document.Timestamp))
|
||||
};
|
||||
|
||||
target.Indexes.CreateMany(models);
|
||||
}
|
||||
|
||||
public sealed class PackRunLogDocument
|
||||
{
|
||||
[BsonId]
|
||||
public ObjectId Id { get; init; }
|
||||
|
||||
public string RunId { get; init; } = default!;
|
||||
|
||||
public long Sequence { get; init; }
|
||||
|
||||
public DateTime Timestamp { get; init; }
|
||||
|
||||
public string Level { get; init; } = default!;
|
||||
|
||||
public string EventType { get; init; } = default!;
|
||||
|
||||
public string Message { get; init; } = default!;
|
||||
|
||||
public string? StepId { get; init; }
|
||||
|
||||
public Dictionary<string, string>? Metadata { get; init; }
|
||||
|
||||
public static PackRunLogDocument FromDomain(string runId, long sequence, PackRunLogEntry entry)
|
||||
=> new()
|
||||
{
|
||||
Id = ObjectId.GenerateNewId(),
|
||||
RunId = runId,
|
||||
Sequence = sequence,
|
||||
Timestamp = entry.Timestamp.UtcDateTime,
|
||||
Level = entry.Level,
|
||||
EventType = entry.EventType,
|
||||
Message = entry.Message,
|
||||
StepId = entry.StepId,
|
||||
Metadata = entry.Metadata is null
|
||||
? null
|
||||
: new Dictionary<string, string>(entry.Metadata, StringComparer.Ordinal)
|
||||
};
|
||||
|
||||
public PackRunLogEntry ToDomain()
|
||||
{
|
||||
IReadOnlyDictionary<string, string>? metadata = Metadata is null
|
||||
? null
|
||||
: new Dictionary<string, string>(Metadata, StringComparer.Ordinal);
|
||||
|
||||
return new PackRunLogEntry(
|
||||
new DateTimeOffset(Timestamp, TimeSpan.Zero),
|
||||
Level,
|
||||
EventType,
|
||||
Message,
|
||||
StepId,
|
||||
metadata);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,209 @@
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Text.Json;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.TaskRunner.Core.Configuration;
|
||||
using StellaOps.TaskRunner.Core.Execution;
|
||||
using StellaOps.TaskRunner.Core.Planning;
|
||||
|
||||
namespace StellaOps.TaskRunner.Infrastructure.Execution;
|
||||
|
||||
public sealed class MongoPackRunStateStore : IPackRunStateStore
|
||||
{
|
||||
private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web);
|
||||
|
||||
private readonly IMongoCollection<PackRunStateDocument> collection;
|
||||
|
||||
public MongoPackRunStateStore(IMongoDatabase database, TaskRunnerMongoOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(database);
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
|
||||
collection = database.GetCollection<PackRunStateDocument>(options.RunsCollection);
|
||||
EnsureIndexes(collection);
|
||||
}
|
||||
|
||||
public async Task<PackRunState?> GetAsync(string runId, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(runId);
|
||||
|
||||
var filter = Builders<PackRunStateDocument>.Filter.Eq(document => document.RunId, runId);
|
||||
var document = await collection
|
||||
.Find(filter)
|
||||
.FirstOrDefaultAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return document?.ToDomain();
|
||||
}
|
||||
|
||||
public async Task SaveAsync(PackRunState state, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(state);
|
||||
|
||||
var document = PackRunStateDocument.FromDomain(state);
|
||||
var filter = Builders<PackRunStateDocument>.Filter.Eq(existing => existing.RunId, state.RunId);
|
||||
|
||||
await collection
|
||||
.ReplaceOneAsync(filter, document, new ReplaceOptions { IsUpsert = true }, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<PackRunState>> ListAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var documents = await collection
|
||||
.Find(FilterDefinition<PackRunStateDocument>.Empty)
|
||||
.SortByDescending(document => document.UpdatedAt)
|
||||
.ToListAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return documents
|
||||
.Select(document => document.ToDomain())
|
||||
.ToList();
|
||||
}
|
||||
|
||||
private static void EnsureIndexes(IMongoCollection<PackRunStateDocument> target)
|
||||
{
|
||||
var models = new[]
|
||||
{
|
||||
new CreateIndexModel<PackRunStateDocument>(
|
||||
Builders<PackRunStateDocument>.IndexKeys.Descending(document => document.UpdatedAt)),
|
||||
new CreateIndexModel<PackRunStateDocument>(
|
||||
Builders<PackRunStateDocument>.IndexKeys.Ascending(document => document.PlanHash))
|
||||
};
|
||||
|
||||
target.Indexes.CreateMany(models);
|
||||
}
|
||||
|
||||
private sealed class PackRunStateDocument
|
||||
{
|
||||
[BsonId]
|
||||
public string RunId { get; init; } = default!;
|
||||
|
||||
public string PlanHash { get; init; } = default!;
|
||||
|
||||
public BsonDocument Plan { get; init; } = default!;
|
||||
|
||||
public BsonDocument FailurePolicy { get; init; } = default!;
|
||||
|
||||
public DateTime RequestedAt { get; init; }
|
||||
|
||||
public DateTime CreatedAt { get; init; }
|
||||
|
||||
public DateTime UpdatedAt { get; init; }
|
||||
|
||||
public List<PackRunStepDocument> Steps { get; init; } = new();
|
||||
|
||||
public static PackRunStateDocument FromDomain(PackRunState state)
|
||||
{
|
||||
var planDocument = BsonDocument.Parse(JsonSerializer.Serialize(state.Plan, SerializerOptions));
|
||||
var failurePolicyDocument = BsonDocument.Parse(JsonSerializer.Serialize(state.FailurePolicy, SerializerOptions));
|
||||
|
||||
var steps = state.Steps.Values
|
||||
.OrderBy(step => step.StepId, StringComparer.Ordinal)
|
||||
.Select(PackRunStepDocument.FromDomain)
|
||||
.ToList();
|
||||
|
||||
return new PackRunStateDocument
|
||||
{
|
||||
RunId = state.RunId,
|
||||
PlanHash = state.PlanHash,
|
||||
Plan = planDocument,
|
||||
FailurePolicy = failurePolicyDocument,
|
||||
RequestedAt = state.RequestedAt.UtcDateTime,
|
||||
CreatedAt = state.CreatedAt.UtcDateTime,
|
||||
UpdatedAt = state.UpdatedAt.UtcDateTime,
|
||||
Steps = steps
|
||||
};
|
||||
}
|
||||
|
||||
public PackRunState ToDomain()
|
||||
{
|
||||
var planJson = Plan.ToJson();
|
||||
var plan = JsonSerializer.Deserialize<TaskPackPlan>(planJson, SerializerOptions)
|
||||
?? throw new InvalidOperationException("Failed to deserialize stored TaskPackPlan.");
|
||||
|
||||
var failurePolicyJson = FailurePolicy.ToJson();
|
||||
var failurePolicy = JsonSerializer.Deserialize<TaskPackPlanFailurePolicy>(failurePolicyJson, SerializerOptions)
|
||||
?? throw new InvalidOperationException("Failed to deserialize stored TaskPackPlanFailurePolicy.");
|
||||
|
||||
var stepRecords = Steps
|
||||
.Select(step => step.ToDomain())
|
||||
.ToDictionary(record => record.StepId, record => record, StringComparer.Ordinal);
|
||||
|
||||
return new PackRunState(
|
||||
RunId,
|
||||
PlanHash,
|
||||
plan,
|
||||
failurePolicy,
|
||||
new DateTimeOffset(RequestedAt, TimeSpan.Zero),
|
||||
new DateTimeOffset(CreatedAt, TimeSpan.Zero),
|
||||
new DateTimeOffset(UpdatedAt, TimeSpan.Zero),
|
||||
new ReadOnlyDictionary<string, PackRunStepStateRecord>(stepRecords));
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class PackRunStepDocument
|
||||
{
|
||||
public string StepId { get; init; } = default!;
|
||||
|
||||
public string Kind { get; init; } = default!;
|
||||
|
||||
public bool Enabled { get; init; }
|
||||
|
||||
public bool ContinueOnError { get; init; }
|
||||
|
||||
public int? MaxParallel { get; init; }
|
||||
|
||||
public string? ApprovalId { get; init; }
|
||||
|
||||
public string? GateMessage { get; init; }
|
||||
|
||||
public string Status { get; init; } = default!;
|
||||
|
||||
public int Attempts { get; init; }
|
||||
|
||||
public DateTime? LastTransitionAt { get; init; }
|
||||
|
||||
public DateTime? NextAttemptAt { get; init; }
|
||||
|
||||
public string? StatusReason { get; init; }
|
||||
|
||||
public static PackRunStepDocument FromDomain(PackRunStepStateRecord record)
|
||||
=> new()
|
||||
{
|
||||
StepId = record.StepId,
|
||||
Kind = record.Kind.ToString(),
|
||||
Enabled = record.Enabled,
|
||||
ContinueOnError = record.ContinueOnError,
|
||||
MaxParallel = record.MaxParallel,
|
||||
ApprovalId = record.ApprovalId,
|
||||
GateMessage = record.GateMessage,
|
||||
Status = record.Status.ToString(),
|
||||
Attempts = record.Attempts,
|
||||
LastTransitionAt = record.LastTransitionAt?.UtcDateTime,
|
||||
NextAttemptAt = record.NextAttemptAt?.UtcDateTime,
|
||||
StatusReason = record.StatusReason
|
||||
};
|
||||
|
||||
public PackRunStepStateRecord ToDomain()
|
||||
{
|
||||
var kind = Enum.Parse<PackRunStepKind>(Kind, ignoreCase: true);
|
||||
var status = Enum.Parse<PackRunStepExecutionStatus>(Status, ignoreCase: true);
|
||||
|
||||
return new PackRunStepStateRecord(
|
||||
StepId,
|
||||
kind,
|
||||
Enabled,
|
||||
ContinueOnError,
|
||||
MaxParallel,
|
||||
ApprovalId,
|
||||
GateMessage,
|
||||
status,
|
||||
Attempts,
|
||||
LastTransitionAt is null ? null : new DateTimeOffset(LastTransitionAt.Value, TimeSpan.Zero),
|
||||
NextAttemptAt is null ? null : new DateTimeOffset(NextAttemptAt.Value, TimeSpan.Zero),
|
||||
StatusReason);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.0-rc.2.25502.107" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0-rc.2.25502.107" />
|
||||
<PackageReference Include="MongoDB.Driver" Version="3.5.0" />
|
||||
<ProjectReference Include="..\StellaOps.TaskRunner.Core\StellaOps.TaskRunner.Core.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
|
||||
Reference in New Issue
Block a user