- Added `FilesystemPackRunProvenanceWriter` to write provenance manifests to the filesystem.
- Introduced `MongoPackRunArtifactReader` to read artifacts from MongoDB.
- Created `MongoPackRunProvenanceWriter` to store provenance manifests in MongoDB.
- Developed unit tests for filesystem and MongoDB provenance writers.
- Established `ITimelineEventStore` and `ITimelineIngestionService` interfaces for timeline event handling.
- Implemented `TimelineIngestionService` to validate and persist timeline events with hashing.
- Created PostgreSQL schema and migration scripts for timeline indexing.
- Added dependency injection support for timeline indexer services.
- Developed tests for timeline ingestion and schema validation.
165 lines
6.3 KiB
C#
165 lines
6.3 KiB
C#
using MongoDB.Bson;
|
|
using MongoDB.Bson.Serialization.Attributes;
|
|
using MongoDB.Driver;
|
|
using StellaOps.TaskRunner.Core.Configuration;
|
|
using StellaOps.TaskRunner.Core.Execution;
|
|
|
|
namespace StellaOps.TaskRunner.Infrastructure.Execution;
|
|
|
|
/// <summary>
/// MongoDB-backed implementation of <see cref="IPackRunApprovalStore"/>. Every approval of a pack run
/// is persisted as one <see cref="PackRunApprovalDocument"/> identified by its run id and approval id.
/// </summary>
public sealed class MongoPackRunApprovalStore : IPackRunApprovalStore
{
    private readonly IMongoCollection<PackRunApprovalDocument> collection;

    /// <summary>
    /// Binds the store to the collection named by <see cref="TaskRunnerMongoOptions.ApprovalsCollection"/>
    /// and creates the indexes described by <see cref="GetIndexModels"/>.
    /// </summary>
    /// <param name="database">Database that hosts the approvals collection.</param>
    /// <param name="options">Options supplying the approvals collection name.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="database"/> or <paramref name="options"/> is <c>null</c>.
    /// </exception>
    /// <remarks>Index creation is issued synchronously from this constructor.</remarks>
    public MongoPackRunApprovalStore(IMongoDatabase database, TaskRunnerMongoOptions options)
    {
        ArgumentNullException.ThrowIfNull(database);
        ArgumentNullException.ThrowIfNull(options);

        collection = database.GetCollection<PackRunApprovalDocument>(options.ApprovalsCollection);
        EnsureIndexes(collection);
    }

    /// <summary>
    /// Replaces the stored approvals of a run: removes every document for <paramref name="runId"/> and
    /// then inserts one document per entry in <paramref name="approvals"/>.
    /// </summary>
    /// <param name="runId">Identifier of the pack run.</param>
    /// <param name="approvals">Approvals to store; an empty list leaves the run with no approvals.</param>
    /// <param name="cancellationToken">Token forwarded to both database calls.</param>
    /// <exception cref="ArgumentException"><paramref name="runId"/> is empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="runId"/> or <paramref name="approvals"/> is <c>null</c>.
    /// </exception>
    /// <remarks>
    /// The delete and the insert are two separate commands; no transaction is opened here, so a
    /// failure after the delete leaves the run without stored approvals.
    /// </remarks>
    public async Task SaveAsync(string runId, IReadOnlyList<PackRunApprovalState> approvals, CancellationToken cancellationToken)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(runId);
        ArgumentNullException.ThrowIfNull(approvals);

        var filter = Builders<PackRunApprovalDocument>.Filter.Eq(document => document.RunId, runId);

        await collection.DeleteManyAsync(filter, cancellationToken).ConfigureAwait(false);

        // Nothing to insert: the delete above is the whole operation.
        if (approvals.Count == 0)
        {
            return;
        }

        var documents = approvals
            .Select(approval => PackRunApprovalDocument.FromDomain(runId, approval))
            .ToList();

        await collection.InsertManyAsync(documents, cancellationToken: cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Loads every approval stored for <paramref name="runId"/>, sorted by approval id.
    /// </summary>
    /// <param name="runId">Identifier of the pack run.</param>
    /// <param name="cancellationToken">Token forwarded to the query.</param>
    /// <returns>The approvals converted to domain state; empty when none are stored.</returns>
    /// <exception cref="ArgumentException"><paramref name="runId"/> is empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="runId"/> is <c>null</c>.</exception>
    public async Task<IReadOnlyList<PackRunApprovalState>> GetAsync(string runId, CancellationToken cancellationToken)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(runId);

        var filter = Builders<PackRunApprovalDocument>.Filter.Eq(document => document.RunId, runId);

        var documents = await collection
            .Find(filter)
            .SortBy(document => document.ApprovalId)
            .ToListAsync(cancellationToken)
            .ConfigureAwait(false);

        return documents
            .Select(document => document.ToDomain())
            .ToList();
    }

    /// <summary>
    /// Overwrites the stored document of a single approval, keeping its existing document id.
    /// </summary>
    /// <param name="runId">Identifier of the pack run.</param>
    /// <param name="approval">New state; matched to the stored document by its approval id.</param>
    /// <param name="cancellationToken">Token forwarded to both database calls.</param>
    /// <exception cref="ArgumentException"><paramref name="runId"/> is empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="runId"/> or <paramref name="approval"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// No document exists for the run/approval pair, either at lookup time or when the replace is applied.
    /// </exception>
    public async Task UpdateAsync(string runId, PackRunApprovalState approval, CancellationToken cancellationToken)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(runId);
        ArgumentNullException.ThrowIfNull(approval);

        var filter = Builders<PackRunApprovalDocument>.Filter.And(
            Builders<PackRunApprovalDocument>.Filter.Eq(document => document.RunId, runId),
            Builders<PackRunApprovalDocument>.Filter.Eq(document => document.ApprovalId, approval.ApprovalId));

        // The existing document is read first so the replacement can reuse its id.
        var existingDocument = await collection
            .Find(filter)
            .FirstOrDefaultAsync(cancellationToken)
            .ConfigureAwait(false);

        if (existingDocument is null)
        {
            throw ApprovalNotFound(runId, approval.ApprovalId);
        }

        var document = PackRunApprovalDocument.FromDomain(runId, approval, existingDocument.Id);
        var result = await collection
            .ReplaceOneAsync(filter, document, cancellationToken: cancellationToken)
            .ConfigureAwait(false);

        // The document can disappear between the read and the replace (SaveAsync deletes all
        // documents of a run). Report that instead of silently dropping the update.
        // MatchedCount is only read for acknowledged writes.
        if (result.IsAcknowledged && result.MatchedCount == 0)
        {
            throw ApprovalNotFound(runId, approval.ApprovalId);
        }
    }

    /// <summary>
    /// Describes the indexes of the approvals collection: a unique index on (RunId, ApprovalId) and a
    /// non-unique index on (RunId, Status).
    /// </summary>
    /// <returns>The index models, produced lazily.</returns>
    public static IEnumerable<CreateIndexModel<PackRunApprovalDocument>> GetIndexModels()
    {
        yield return new CreateIndexModel<PackRunApprovalDocument>(
            Builders<PackRunApprovalDocument>.IndexKeys
                .Ascending(document => document.RunId)
                .Ascending(document => document.ApprovalId),
            new CreateIndexOptions { Unique = true, Name = "pack_run_approvals_run_approval" });

        yield return new CreateIndexModel<PackRunApprovalDocument>(
            Builders<PackRunApprovalDocument>.IndexKeys
                .Ascending(document => document.RunId)
                .Ascending(document => document.Status),
            new CreateIndexOptions { Name = "pack_run_approvals_run_status" });
    }

    /// <summary>Creates the indexes from <see cref="GetIndexModels"/> on <paramref name="target"/>.</summary>
    private static void EnsureIndexes(IMongoCollection<PackRunApprovalDocument> target)
        => target.Indexes.CreateMany(GetIndexModels());

    /// <summary>Builds the exception thrown when a run/approval pair has no stored document.</summary>
    private static InvalidOperationException ApprovalNotFound(string runId, string approvalId)
        => new($"Approval '{approvalId}' not found for run '{runId}'.");

    /// <summary>
    /// Persistence shape of one approval. Converted from and to <see cref="PackRunApprovalState"/> by
    /// <see cref="FromDomain"/> and <see cref="ToDomain"/>.
    /// </summary>
    public sealed class PackRunApprovalDocument
    {
        /// <summary>Document id; generated by <see cref="FromDomain"/> unless one is supplied.</summary>
        [BsonId]
        public ObjectId Id { get; init; }

        /// <summary>Identifier of the pack run the approval belongs to.</summary>
        public string RunId { get; init; } = default!;

        /// <summary>Identifier of the approval within the run.</summary>
        public string ApprovalId { get; init; } = default!;

        /// <summary>Copy of the approval's required grants; empty when the approval has none.</summary>
        public IReadOnlyList<string> RequiredGrants { get; init; } = Array.Empty<string>();

        /// <summary>Copy of the approval's step ids; empty when the approval has none.</summary>
        public IReadOnlyList<string> StepIds { get; init; } = Array.Empty<string>();

        /// <summary>Copy of the approval's messages; empty when the approval has none.</summary>
        public IReadOnlyList<string> Messages { get; init; } = Array.Empty<string>();

        /// <summary>Copy of the approval's reason template, if any.</summary>
        public string? ReasonTemplate { get; init; }

        /// <summary>Request time as a UTC <see cref="DateTime"/>.</summary>
        public DateTime RequestedAt { get; init; }

        /// <summary>Name of the <see cref="PackRunApprovalStatus"/> value.</summary>
        public string Status { get; init; } = default!;

        /// <summary>Copy of the approval's actor id, if any.</summary>
        public string? ActorId { get; init; }

        /// <summary>Completion time as a UTC <see cref="DateTime"/>, if any.</summary>
        public DateTime? CompletedAt { get; init; }

        /// <summary>Copy of the approval's summary, if any.</summary>
        public string? Summary { get; init; }

        /// <summary>
        /// Creates the document for <paramref name="approval"/> within run <paramref name="runId"/>.
        /// </summary>
        /// <param name="runId">Identifier of the pack run.</param>
        /// <param name="approval">Domain state to persist.</param>
        /// <param name="id">Existing document id to keep; a new id is generated when omitted.</param>
        /// <returns>The document carrying the approval's values.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="approval"/> is <c>null</c>.</exception>
        public static PackRunApprovalDocument FromDomain(string runId, PackRunApprovalState approval, ObjectId? id = null)
        {
            ArgumentNullException.ThrowIfNull(approval);

            return new()
            {
                Id = id ?? ObjectId.GenerateNewId(),
                RunId = runId,
                ApprovalId = approval.ApprovalId,
                RequiredGrants = approval.RequiredGrants ?? Array.Empty<string>(),
                StepIds = approval.StepIds ?? Array.Empty<string>(),
                Messages = approval.Messages ?? Array.Empty<string>(),
                ReasonTemplate = approval.ReasonTemplate,
                // Timestamps are normalized to UTC before they are stored.
                RequestedAt = approval.RequestedAt.UtcDateTime,
                Status = approval.Status.ToString(),
                ActorId = approval.ActorId,
                CompletedAt = approval.CompletedAt?.UtcDateTime,
                Summary = approval.Summary
            };
        }

        /// <summary>
        /// Converts the document back to domain state, parsing <see cref="Status"/> case-insensitively
        /// and re-attaching a zero offset to the stored timestamps.
        /// </summary>
        /// <returns>The reconstructed <see cref="PackRunApprovalState"/>.</returns>
        public PackRunApprovalState ToDomain()
        {
            var status = Enum.Parse<PackRunApprovalStatus>(Status, ignoreCase: true);

            return new PackRunApprovalState(
                ApprovalId,
                // The list properties are null-checked here although they are declared non-nullable.
                RequiredGrants?.ToList() ?? new List<string>(),
                StepIds?.ToList() ?? new List<string>(),
                Messages?.ToList() ?? new List<string>(),
                ReasonTemplate,
                new DateTimeOffset(RequestedAt, TimeSpan.Zero),
                status,
                ActorId,
                CompletedAt is null ? null : new DateTimeOffset(CompletedAt.Value, TimeSpan.Zero),
                Summary);
        }
    }
}
|