Add Policy DSL Validator, Schema Exporter, and Simulation Smoke tools

- Implemented PolicyDslValidator with command-line options for strict mode and JSON output.
- Created PolicySchemaExporter to generate JSON schemas for policy-related models.
- Developed PolicySimulationSmoke tool to validate policy simulations against expected outcomes.
- Added project files and necessary dependencies for each tool.
- Ensured proper error handling and usage instructions across tools.
This commit is contained in:
master
2025-10-27 08:00:11 +02:00
parent 2b7b88ca77
commit 799f787de2
712 changed files with 49449 additions and 6124 deletions

View File

@@ -0,0 +1,88 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Documents;
[BsonIgnoreExtraElements]
internal sealed class RunSummaryDocument
{
public string Id { get; set; } = string.Empty;
public string TenantId { get; set; } = string.Empty;
public string ScheduleId { get; set; } = string.Empty;
public DateTime UpdatedAt { get; set; } = DateTime.SpecifyKind(DateTime.UnixEpoch, DateTimeKind.Utc);
public RunSummaryEntryDocument? LastRun { get; set; }
= null;
public List<RunSummaryEntryDocument> Recent { get; set; } = new();
public RunSummaryCountersDocument Counters { get; set; } = new();
}
internal sealed class RunSummaryEntryDocument
{
public string RunId { get; set; } = string.Empty;
[BsonRepresentation(BsonType.String)]
public RunTrigger Trigger { get; set; } = RunTrigger.Cron;
[BsonRepresentation(BsonType.String)]
public RunState State { get; set; } = RunState.Planning;
public DateTime CreatedAt { get; set; } = DateTime.SpecifyKind(DateTime.UnixEpoch, DateTimeKind.Utc);
public DateTime? StartedAt { get; set; }
= null;
public DateTime? FinishedAt { get; set; }
= null;
public RunStats Stats { get; set; } = RunStats.Empty;
[BsonIgnoreIfNull]
public string? Error { get; set; }
= null;
}
internal sealed class RunSummaryCountersDocument
{
public int Total { get; set; }
= 0;
public int Planning { get; set; }
= 0;
public int Queued { get; set; }
= 0;
public int Running { get; set; }
= 0;
public int Completed { get; set; }
= 0;
public int Error { get; set; }
= 0;
public int Cancelled { get; set; }
= 0;
public int TotalDeltas { get; set; }
= 0;
public int TotalNewCriticals { get; set; }
= 0;
public int TotalNewHigh { get; set; }
= 0;
public int TotalNewMedium { get; set; }
= 0;
public int TotalNewLow { get; set; }
= 0;
}

View File

@@ -17,15 +17,17 @@ internal sealed class EnsureSchedulerCollectionsMigration : ISchedulerMongoMigra
{
ArgumentNullException.ThrowIfNull(context);
var requiredCollections = new[]
{
context.Options.SchedulesCollection,
context.Options.RunsCollection,
context.Options.ImpactSnapshotsCollection,
context.Options.AuditCollection,
context.Options.LocksCollection,
context.Options.MigrationsCollection
};
var requiredCollections = new[]
{
context.Options.SchedulesCollection,
context.Options.RunsCollection,
context.Options.ImpactSnapshotsCollection,
context.Options.AuditCollection,
context.Options.RunSummariesCollection,
context.Options.GraphJobsCollection,
context.Options.LocksCollection,
context.Options.MigrationsCollection
};
var cursor = await context.Database
.ListCollectionNamesAsync(cancellationToken: cancellationToken)

View File

@@ -13,12 +13,14 @@ internal sealed class EnsureSchedulerIndexesMigration : ISchedulerMongoMigration
{
ArgumentNullException.ThrowIfNull(context);
await EnsureSchedulesIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureRunsIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureImpactSnapshotsIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureAuditIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureLocksIndexesAsync(context, cancellationToken).ConfigureAwait(false);
}
await EnsureSchedulesIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureRunsIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureImpactSnapshotsIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureAuditIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureRunSummariesIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureGraphJobsIndexesAsync(context, cancellationToken).ConfigureAwait(false);
await EnsureLocksIndexesAsync(context, cancellationToken).ConfigureAwait(false);
}
private static async Task EnsureSchedulesIndexesAsync(SchedulerMongoContext context, CancellationToken cancellationToken)
{
@@ -120,11 +122,11 @@ internal sealed class EnsureSchedulerIndexesMigration : ISchedulerMongoMigration
.ConfigureAwait(false);
}
private static async Task EnsureAuditIndexesAsync(SchedulerMongoContext context, CancellationToken cancellationToken)
{
var collection = context.Database.GetCollection<BsonDocument>(context.Options.AuditCollection);
var tenantOccurred = new CreateIndexModel<BsonDocument>(
private static async Task EnsureAuditIndexesAsync(SchedulerMongoContext context, CancellationToken cancellationToken)
{
var collection = context.Database.GetCollection<BsonDocument>(context.Options.AuditCollection);
var tenantOccurred = new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys
.Ascending("tenantId")
.Descending("occurredAt"),
@@ -142,9 +144,72 @@ internal sealed class EnsureSchedulerIndexesMigration : ISchedulerMongoMigration
PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("correlationId", true)
});
await collection.Indexes.CreateManyAsync(new[] { tenantOccurred, correlationIndex }, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
await collection.Indexes.CreateManyAsync(new[] { tenantOccurred, correlationIndex }, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
private static async Task EnsureRunSummariesIndexesAsync(SchedulerMongoContext context, CancellationToken cancellationToken)
{
var collection = context.Database.GetCollection<BsonDocument>(context.Options.RunSummariesCollection);
var tenantSchedule = new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys
.Ascending("tenantId")
.Ascending("scheduleId"),
new CreateIndexOptions<BsonDocument>
{
Name = "tenant_schedule",
Unique = true
});
var tenantUpdated = new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys
.Ascending("tenantId")
.Descending("updatedAt"),
new CreateIndexOptions<BsonDocument>
{
Name = "tenant_updatedAt_desc"
});
await collection.Indexes.CreateManyAsync(new[] { tenantSchedule, tenantUpdated }, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
private static async Task EnsureGraphJobsIndexesAsync(SchedulerMongoContext context, CancellationToken cancellationToken)
{
var collection = context.Database.GetCollection<BsonDocument>(context.Options.GraphJobsCollection);
var tenantKindCreated = new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys
.Ascending("tenantId")
.Ascending("kind")
.Descending("createdAt"),
new CreateIndexOptions<BsonDocument>
{
Name = "tenant_kind_createdAt_desc"
});
var tenantStatus = new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys
.Ascending("tenantId")
.Ascending("status"),
new CreateIndexOptions<BsonDocument>
{
Name = "tenant_status"
});
var snapshotIndex = new CreateIndexModel<BsonDocument>(
Builders<BsonDocument>.IndexKeys
.Ascending("graphSnapshotId"),
new CreateIndexOptions<BsonDocument>
{
Name = "graphSnapshot_lookup",
PartialFilterExpression = Builders<BsonDocument>.Filter.Exists("graphSnapshotId", true)
});
await collection.Indexes.CreateManyAsync(new[] { tenantKindCreated, tenantStatus, snapshotIndex }, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
private static async Task EnsureLocksIndexesAsync(SchedulerMongoContext context, CancellationToken cancellationToken)
{

View File

@@ -13,11 +13,15 @@ public sealed class SchedulerMongoOptions
public string SchedulesCollection { get; set; } = "schedules";
public string RunsCollection { get; set; } = "runs";
public string ImpactSnapshotsCollection { get; set; } = "impact_snapshots";
public string AuditCollection { get; set; } = "audit";
public string RunsCollection { get; set; } = "runs";
public string ImpactSnapshotsCollection { get; set; } = "impact_snapshots";
public string AuditCollection { get; set; } = "audit";
public string RunSummariesCollection { get; set; } = "run_summaries";
public string GraphJobsCollection { get; set; } = "graph_jobs";
public string LocksCollection { get; set; } = "locks";

View File

@@ -0,0 +1,36 @@
using System.Collections.Immutable;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Projections;
public sealed record RunSummaryProjection(
string TenantId,
string ScheduleId,
DateTimeOffset UpdatedAt,
RunSummarySnapshot? LastRun,
ImmutableArray<RunSummarySnapshot> Recent,
RunSummaryCounters Counters);
public sealed record RunSummarySnapshot(
string RunId,
RunTrigger Trigger,
RunState State,
DateTimeOffset CreatedAt,
DateTimeOffset? StartedAt,
DateTimeOffset? FinishedAt,
RunStats Stats,
string? Error);
public sealed record RunSummaryCounters(
int Total,
int Planning,
int Queued,
int Running,
int Completed,
int Error,
int Cancelled,
int TotalDeltas,
int TotalNewCriticals,
int TotalNewHigh,
int TotalNewMedium,
int TotalNewLow);

View File

@@ -22,4 +22,18 @@ All timestamps are persisted as UTC (`+00:00`). Empty selector filters remain em
4. Follow `docs/11_DATA_SCHEMAS.md` for index requirements; update that doc if storage diverges.
5. Register `AddSchedulerMongoStorage` in the host and call `ISchedulerMongoInitializer.EnsureMigrationsAsync` during bootstrap so collections/indexes are created before workers/web APIs start.
With these artefacts in place the dependency on SCHED-MODELS-16-101/102 is cleared—storage work can move to DOING.
With these artefacts in place the dependency on SCHED-MODELS-16-101/102 is cleared—storage work can move to DOING.
## Repositories & Sessions (Sprint 16)
- `AddSchedulerMongoStorage` now registers the scheduler repositories:
- `IScheduleRepository` — CRUD helpers with tenant scoping and soft-delete markers (`deletedAt`, `deletedBy`).
- `IRunRepository` — create/update/list helpers sorted by `createdAt`, honouring the TTL index on `finishedAt`.
- `IImpactSnapshotRepository` — stores canonical `ImpactSet` snapshots with deterministic selector digests.
- `IAuditRepository` — append/list audit log entries with optional category/schedule/run filters.
- `IRunSummaryRepository` — persists the `run_summaries` materialised view keyed by tenant/schedule.
- `IRunSummaryService` — projects run updates into the materialised view (latest run + rolling counters for the last 20 updates).
- `ISchedulerAuditService` — convenience layer that stamps IDs/timestamps and writes `AuditRecord` instances with consistent metadata.
- `ISchedulerMongoSessionFactory` provides causal-consistent Mongo sessions (default `CausalConsistency = true`) that repositories accept for read-after-write flows.
- All repositories persist canonical JSON via `CanonicalJsonSerializer`; helper mappers strip internal fields before materialising DTOs.
- Soft-deleted schedules keep historical documents but are excluded from queries unless `ScheduleQueryOptions.IncludeDeleted` is set; deletions also disable the schedule and bump `updatedAt/updatedBy`.

View File

@@ -0,0 +1,32 @@
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
/// <summary>
/// Filters applied when querying scheduler audit records.
/// </summary>
public sealed class AuditQueryOptions
{
/// <summary>
/// Optional audit category filter (e.g., "scheduler").
/// </summary>
public string? Category { get; init; }
/// <summary>
/// Optional schedule identifier filter.
/// </summary>
public string? ScheduleId { get; init; }
/// <summary>
/// Optional run identifier filter.
/// </summary>
public string? RunId { get; init; }
/// <summary>
/// Lower bound for audit occurrence timestamp.
/// </summary>
public DateTimeOffset? Since { get; init; }
/// <summary>
/// Maximum number of records to return.
/// </summary>
public int? Limit { get; init; }
}

View File

@@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Internal;
using StellaOps.Scheduler.Storage.Mongo.Serialization;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
internal sealed class AuditRepository : IAuditRepository
{
private static readonly FilterDefinitionBuilder<BsonDocument> Filter = Builders<BsonDocument>.Filter;
private static readonly SortDefinitionBuilder<BsonDocument> Sort = Builders<BsonDocument>.Sort;
private readonly IMongoCollection<BsonDocument> _collection;
public AuditRepository(SchedulerMongoContext context)
{
if (context is null)
{
throw new ArgumentNullException(nameof(context));
}
_collection = context.Database.GetCollection<BsonDocument>(context.Options.AuditCollection);
}
public async Task InsertAsync(
AuditRecord record,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(record);
var document = AuditRecordDocumentMapper.ToBsonDocument(record);
if (session is null)
{
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
}
else
{
await _collection.InsertOneAsync(session, document, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
public async Task<IReadOnlyList<AuditRecord>> ListAsync(
string tenantId,
AuditQueryOptions? options = null,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
options ??= new AuditQueryOptions();
var filters = new List<FilterDefinition<BsonDocument>>
{
Filter.Eq("tenantId", tenantId)
};
if (!string.IsNullOrWhiteSpace(options.Category))
{
filters.Add(Filter.Eq("category", options.Category));
}
if (!string.IsNullOrWhiteSpace(options.ScheduleId))
{
filters.Add(Filter.Eq("scheduleId", options.ScheduleId));
}
if (!string.IsNullOrWhiteSpace(options.RunId))
{
filters.Add(Filter.Eq("runId", options.RunId));
}
if (options.Since is { } since)
{
filters.Add(Filter.Gte("occurredAt", since.ToUniversalTime().ToString("O")));
}
var combined = Filter.And(filters);
var find = session is null
? _collection.Find(combined)
: _collection.Find(session, combined);
var limit = options.Limit is { } specified && specified > 0 ? specified : 100;
var documents = await find
.Sort(Sort.Descending("occurredAt"))
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return documents.Select(AuditRecordDocumentMapper.FromBsonDocument).ToArray();
}
}

View File

@@ -0,0 +1,126 @@
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Internal;
using StellaOps.Scheduler.Storage.Mongo.Serialization;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
internal sealed class GraphJobRepository : IGraphJobRepository
{
private static readonly FilterDefinitionBuilder<BsonDocument> Filter = Builders<BsonDocument>.Filter;
private readonly IMongoCollection<BsonDocument> _collection;
public GraphJobRepository(SchedulerMongoContext context)
{
ArgumentNullException.ThrowIfNull(context);
_collection = context.Database.GetCollection<BsonDocument>(context.Options.GraphJobsCollection);
}
public async Task InsertAsync(GraphBuildJob job, CancellationToken cancellationToken = default)
{
var document = GraphJobDocumentMapper.ToBsonDocument(job);
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
}
public async Task InsertAsync(GraphOverlayJob job, CancellationToken cancellationToken = default)
{
var document = GraphJobDocumentMapper.ToBsonDocument(job);
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
}
public async Task<GraphBuildJob?> GetBuildJobAsync(string tenantId, string jobId, CancellationToken cancellationToken = default)
{
var filter = BuildIdFilter(tenantId, jobId, "build");
var document = await _collection.Find(filter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document is null ? null : GraphJobDocumentMapper.ToGraphBuildJob(document);
}
public async Task<GraphOverlayJob?> GetOverlayJobAsync(string tenantId, string jobId, CancellationToken cancellationToken = default)
{
var filter = BuildIdFilter(tenantId, jobId, "overlay");
var document = await _collection.Find(filter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document is null ? null : GraphJobDocumentMapper.ToGraphOverlayJob(document);
}
public async Task<GraphBuildJob> ReplaceAsync(GraphBuildJob job, CancellationToken cancellationToken = default)
{
var document = GraphJobDocumentMapper.ToBsonDocument(job);
var filter = BuildIdFilter(job.TenantId, job.Id, "build");
await _collection.ReplaceOneAsync(filter, document, new ReplaceOptions { IsUpsert = false }, cancellationToken).ConfigureAwait(false);
return job;
}
public async Task<GraphOverlayJob> ReplaceAsync(GraphOverlayJob job, CancellationToken cancellationToken = default)
{
var document = GraphJobDocumentMapper.ToBsonDocument(job);
var filter = BuildIdFilter(job.TenantId, job.Id, "overlay");
await _collection.ReplaceOneAsync(filter, document, new ReplaceOptions { IsUpsert = false }, cancellationToken).ConfigureAwait(false);
return job;
}
public async Task<IReadOnlyList<GraphBuildJob>> ListBuildJobsAsync(string tenantId, GraphJobStatus? status, int limit, CancellationToken cancellationToken = default)
{
var filters = new List<FilterDefinition<BsonDocument>>
{
Filter.Eq("tenantId", tenantId),
Filter.Eq("kind", "build")
};
if (status is { } s)
{
filters.Add(Filter.Eq("status", s.ToString().ToLowerInvariant()));
}
var filter = Filter.And(filters);
var cursor = await _collection.Find(filter)
.Sort(Builders<BsonDocument>.Sort.Descending("createdAt"))
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return cursor.Select(GraphJobDocumentMapper.ToGraphBuildJob).ToArray();
}
public async Task<IReadOnlyList<GraphOverlayJob>> ListOverlayJobsAsync(string tenantId, GraphJobStatus? status, int limit, CancellationToken cancellationToken = default)
{
var filters = new List<FilterDefinition<BsonDocument>>
{
Filter.Eq("tenantId", tenantId),
Filter.Eq("kind", "overlay")
};
if (status is { } s)
{
filters.Add(Filter.Eq("status", s.ToString().ToLowerInvariant()));
}
var filter = Filter.And(filters);
var cursor = await _collection.Find(filter)
.Sort(Builders<BsonDocument>.Sort.Descending("createdAt"))
.Limit(limit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return cursor.Select(GraphJobDocumentMapper.ToGraphOverlayJob).ToArray();
}
public async Task<IReadOnlyCollection<GraphOverlayJob>> ListOverlayJobsAsync(string tenantId, CancellationToken cancellationToken = default)
{
var filter = Filter.And(
Filter.Eq("tenantId", tenantId),
Filter.Eq("kind", "overlay"));
var cursor = await _collection.Find(filter)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return cursor.Select(GraphJobDocumentMapper.ToGraphOverlayJob).ToArray();
}
private static FilterDefinition<BsonDocument> BuildIdFilter(string tenantId, string jobId, string kind)
=> Filter.And(
Filter.Eq("_id", jobId),
Filter.Eq("tenantId", tenantId),
Filter.Eq("kind", kind));
}

View File

@@ -0,0 +1,18 @@
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
public interface IAuditRepository
{
Task InsertAsync(
AuditRecord record,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<IReadOnlyList<AuditRecord>> ListAsync(
string tenantId,
AuditQueryOptions? options = null,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,24 @@
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
public interface IGraphJobRepository
{
Task InsertAsync(GraphBuildJob job, CancellationToken cancellationToken = default);
Task InsertAsync(GraphOverlayJob job, CancellationToken cancellationToken = default);
Task<GraphBuildJob?> GetBuildJobAsync(string tenantId, string jobId, CancellationToken cancellationToken = default);
Task<GraphOverlayJob?> GetOverlayJobAsync(string tenantId, string jobId, CancellationToken cancellationToken = default);
Task<GraphBuildJob> ReplaceAsync(GraphBuildJob job, CancellationToken cancellationToken = default);
Task<GraphOverlayJob> ReplaceAsync(GraphOverlayJob job, CancellationToken cancellationToken = default);
Task<IReadOnlyList<GraphBuildJob>> ListBuildJobsAsync(string tenantId, GraphJobStatus? status, int limit, CancellationToken cancellationToken = default);
Task<IReadOnlyList<GraphOverlayJob>> ListOverlayJobsAsync(string tenantId, GraphJobStatus? status, int limit, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<GraphOverlayJob>> ListOverlayJobsAsync(string tenantId, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,22 @@
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
public interface IImpactSnapshotRepository
{
Task UpsertAsync(
ImpactSet snapshot,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<ImpactSet?> GetBySnapshotIdAsync(
string snapshotId,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<ImpactSet?> GetLatestBySelectorAsync(
Selector selector,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,29 @@
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
public interface IRunRepository
{
Task InsertAsync(
Run run,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<bool> UpdateAsync(
Run run,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<Run?> GetAsync(
string tenantId,
string runId,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<IReadOnlyList<Run>> ListAsync(
string tenantId,
RunQueryOptions? options = null,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,19 @@
using StellaOps.Scheduler.Storage.Mongo.Documents;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
internal interface IRunSummaryRepository
{
Task<RunSummaryDocument?> GetAsync(
string tenantId,
string scheduleId,
CancellationToken cancellationToken = default);
Task<IReadOnlyList<RunSummaryDocument>> ListAsync(
string tenantId,
CancellationToken cancellationToken = default);
Task UpsertAsync(
RunSummaryDocument document,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,32 @@
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
public interface IScheduleRepository
{
Task UpsertAsync(
Schedule schedule,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<Schedule?> GetAsync(
string tenantId,
string scheduleId,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<IReadOnlyList<Schedule>> ListAsync(
string tenantId,
ScheduleQueryOptions? options = null,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
Task<bool> SoftDeleteAsync(
string tenantId,
string scheduleId,
string deletedBy,
DateTimeOffset deletedAt,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,94 @@
using System;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Internal;
using StellaOps.Scheduler.Storage.Mongo.Serialization;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
internal sealed class ImpactSnapshotRepository : IImpactSnapshotRepository
{
private static readonly FilterDefinitionBuilder<BsonDocument> Filter = Builders<BsonDocument>.Filter;
private static readonly SortDefinitionBuilder<BsonDocument> Sort = Builders<BsonDocument>.Sort;
private readonly IMongoCollection<BsonDocument> _collection;
public ImpactSnapshotRepository(SchedulerMongoContext context)
{
if (context is null)
{
throw new ArgumentNullException(nameof(context));
}
_collection = context.Database.GetCollection<BsonDocument>(context.Options.ImpactSnapshotsCollection);
}
public async Task UpsertAsync(
ImpactSet snapshot,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(snapshot);
var document = ImpactSetDocumentMapper.ToBsonDocument(snapshot);
var filter = Filter.Eq("_id", document["_id"]);
var options = new ReplaceOptions { IsUpsert = true };
if (session is null)
{
await _collection.ReplaceOneAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
}
else
{
await _collection.ReplaceOneAsync(session, filter, document, options, cancellationToken).ConfigureAwait(false);
}
}
public async Task<ImpactSet?> GetBySnapshotIdAsync(
string snapshotId,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(snapshotId))
{
throw new ArgumentException("Snapshot id must be provided.", nameof(snapshotId));
}
var filter = Filter.Eq("_id", snapshotId);
var query = session is null
? _collection.Find(filter)
: _collection.Find(session, filter);
var document = await query.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document is null ? null : ImpactSetDocumentMapper.FromBsonDocument(document);
}
public async Task<ImpactSet?> GetLatestBySelectorAsync(
Selector selector,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(selector);
if (string.IsNullOrWhiteSpace(selector.TenantId))
{
throw new ArgumentException("Selector tenantId is required to resolve impact snapshots.", nameof(selector));
}
var digest = ImpactSetDocumentMapper.ComputeSelectorDigest(selector);
var filters = Filter.And(
Filter.Eq("selectorDigest", digest),
Filter.Eq("selector.tenantId", selector.TenantId));
var find = session is null
? _collection.Find(filters)
: _collection.Find(session, filters);
var document = await find
.Sort(Sort.Descending("generatedAt"))
.FirstOrDefaultAsync(cancellationToken)
.ConfigureAwait(false);
return document is null ? null : ImpactSetDocumentMapper.FromBsonDocument(document);
}
}

View File

@@ -0,0 +1,35 @@
using System.Collections.Immutable;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
/// <summary>
/// Filters applied when listing scheduler runs.
/// </summary>
public sealed class RunQueryOptions
{
/// <summary>
/// Optional schedule identifier to scope the list.
/// </summary>
public string? ScheduleId { get; init; }
/// <summary>
/// Optional set of run states to include. When empty all states are returned.
/// </summary>
public ImmutableArray<RunState> States { get; init; } = ImmutableArray<RunState>.Empty;
/// <summary>
/// Optional lower bound for creation timestamp (UTC).
/// </summary>
public DateTimeOffset? CreatedAfter { get; init; }
/// <summary>
/// Maximum number of runs to return (default 50 when unspecified).
/// </summary>
public int? Limit { get; init; }
/// <summary>
/// Sort order flag. Defaults to descending by createdAt.
/// </summary>
public bool SortAscending { get; init; }
}

View File

@@ -0,0 +1,153 @@
using System;
using System.Collections.Generic;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Internal;
using StellaOps.Scheduler.Storage.Mongo.Serialization;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
internal sealed class RunRepository : IRunRepository
{
private const int DefaultListLimit = 50;
private static readonly FilterDefinitionBuilder<BsonDocument> Filter = Builders<BsonDocument>.Filter;
private static readonly SortDefinitionBuilder<BsonDocument> Sort = Builders<BsonDocument>.Sort;
private readonly IMongoCollection<BsonDocument> _collection;
public RunRepository(SchedulerMongoContext context)
{
if (context is null)
{
throw new ArgumentNullException(nameof(context));
}
_collection = context.Database.GetCollection<BsonDocument>(context.Options.RunsCollection);
}
public async Task InsertAsync(
Run run,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(run);
var document = RunDocumentMapper.ToBsonDocument(run);
if (session is null)
{
await _collection.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
}
else
{
await _collection.InsertOneAsync(session, document, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
public async Task<bool> UpdateAsync(
Run run,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(run);
var document = RunDocumentMapper.ToBsonDocument(run);
var filter = Filter.And(
Filter.Eq("_id", run.Id),
Filter.Eq("tenantId", run.TenantId));
var options = new ReplaceOptions { IsUpsert = false };
ReplaceOneResult result;
if (session is null)
{
result = await _collection.ReplaceOneAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
}
else
{
result = await _collection.ReplaceOneAsync(session, filter, document, options, cancellationToken).ConfigureAwait(false);
}
return result.MatchedCount > 0;
}
public async Task<Run?> GetAsync(
string tenantId,
string runId,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
if (string.IsNullOrWhiteSpace(runId))
{
throw new ArgumentException("Run id must be provided.", nameof(runId));
}
var filter = Filter.And(
Filter.Eq("_id", runId),
Filter.Eq("tenantId", tenantId));
var query = session is null
? _collection.Find(filter)
: _collection.Find(session, filter);
var document = await query.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document is null ? null : RunDocumentMapper.FromBsonDocument(document);
}
public async Task<IReadOnlyList<Run>> ListAsync(
string tenantId,
RunQueryOptions? options = null,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
options ??= new RunQueryOptions();
var filters = new List<FilterDefinition<BsonDocument>>
{
Filter.Eq("tenantId", tenantId)
};
if (!string.IsNullOrWhiteSpace(options.ScheduleId))
{
filters.Add(Filter.Eq("scheduleId", options.ScheduleId));
}
if (options.States.Length > 0)
{
filters.Add(Filter.In("state", options.States.Select(state => state.ToString().ToLowerInvariant())));
}
if (options.CreatedAfter is { } createdAfter)
{
filters.Add(Filter.Gt("createdAt", createdAfter.ToUniversalTime().UtcDateTime));
}
var combined = Filter.And(filters);
var find = session is null
? _collection.Find(combined)
: _collection.Find(session, combined);
var limit = options.Limit is { } specified && specified > 0 ? specified : DefaultListLimit;
find = find.Limit(limit);
var sortDefinition = options.SortAscending
? Sort.Ascending("createdAt")
: Sort.Descending("createdAt");
find = find.Sort(sortDefinition);
var documents = await find.ToListAsync(cancellationToken).ConfigureAwait(false);
return documents.Select(RunDocumentMapper.FromBsonDocument).ToArray();
}
}

View File

@@ -0,0 +1,79 @@
using MongoDB.Driver;
using StellaOps.Scheduler.Storage.Mongo.Documents;
using StellaOps.Scheduler.Storage.Mongo.Internal;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
internal sealed class RunSummaryRepository : IRunSummaryRepository
{
private readonly IMongoCollection<RunSummaryDocument> _collection;
public RunSummaryRepository(SchedulerMongoContext context)
{
ArgumentNullException.ThrowIfNull(context);
_collection = context.Database.GetCollection<RunSummaryDocument>(context.Options.RunSummariesCollection);
}
public async Task<RunSummaryDocument?> GetAsync(
string tenantId,
string scheduleId,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
if (string.IsNullOrWhiteSpace(scheduleId))
{
throw new ArgumentException("Schedule id must be provided.", nameof(scheduleId));
}
var filter = Builders<RunSummaryDocument>.Filter.Eq(document => document.Id, CreateDocumentId(tenantId, scheduleId));
var document = await _collection.Find(filter).FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document;
}
public async Task<IReadOnlyList<RunSummaryDocument>> ListAsync(
string tenantId,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
var filter = Builders<RunSummaryDocument>.Filter.Eq(document => document.TenantId, tenantId);
var sort = Builders<RunSummaryDocument>.Sort.Descending(document => document.UpdatedAt);
var documents = await _collection.Find(filter).Sort(sort).ToListAsync(cancellationToken).ConfigureAwait(false);
return documents;
}
public Task UpsertAsync(
RunSummaryDocument document,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(document);
if (string.IsNullOrWhiteSpace(document.TenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(document.TenantId));
}
if (string.IsNullOrWhiteSpace(document.ScheduleId))
{
throw new ArgumentException("Schedule id must be provided.", nameof(document.ScheduleId));
}
document.Id = CreateDocumentId(document.TenantId, document.ScheduleId);
var filter = Builders<RunSummaryDocument>.Filter.Eq(x => x.Id, document.Id);
return _collection.ReplaceOneAsync(filter, document, new ReplaceOptions { IsUpsert = true }, cancellationToken);
}
private static string CreateDocumentId(string tenantId, string scheduleId)
=> string.Create(tenantId.Length + scheduleId.Length + 1, (tenantId, scheduleId), static (span, value) =>
{
value.tenantId.AsSpan().CopyTo(span);
span[value.tenantId.Length] = ':';
value.scheduleId.AsSpan().CopyTo(span[(value.tenantId.Length + 1)..]);
});
}

View File

@@ -0,0 +1,22 @@
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
/// <summary>
/// Filters applied when listing scheduler schedules.
/// </summary>
public sealed class ScheduleQueryOptions
{
/// <summary>
/// When true, returns disabled schedules; otherwise disabled entries are excluded.
/// </summary>
public bool IncludeDisabled { get; init; }
/// <summary>
/// When true, includes soft-deleted schedules; by default deleted entries are excluded.
/// </summary>
public bool IncludeDeleted { get; init; }
/// <summary>
/// Optional maximum number of schedules to return.
/// </summary>
public int? Limit { get; init; }
}

View File

@@ -0,0 +1,180 @@
using System;
using System.Collections.Generic;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Internal;
using StellaOps.Scheduler.Storage.Mongo.Serialization;
namespace StellaOps.Scheduler.Storage.Mongo.Repositories;
internal sealed class ScheduleRepository : IScheduleRepository
{
private static readonly FilterDefinitionBuilder<BsonDocument> Filter = Builders<BsonDocument>.Filter;
private static readonly UpdateDefinitionBuilder<BsonDocument> Update = Builders<BsonDocument>.Update;
private readonly IMongoCollection<BsonDocument> _collection;
public ScheduleRepository(SchedulerMongoContext context)
{
if (context is null)
{
throw new ArgumentNullException(nameof(context));
}
_collection = context.Database.GetCollection<BsonDocument>(context.Options.SchedulesCollection);
}
public async Task UpsertAsync(
Schedule schedule,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(schedule);
var document = ScheduleDocumentMapper.ToBsonDocument(schedule);
document.Remove("deletedAt");
document.Remove("deletedBy");
var filter = Filter.And(
Filter.Eq("_id", schedule.Id),
Filter.Eq("tenantId", schedule.TenantId));
var options = new ReplaceOptions { IsUpsert = true };
if (session is null)
{
await _collection.ReplaceOneAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
}
else
{
await _collection.ReplaceOneAsync(session, filter, document, options, cancellationToken).ConfigureAwait(false);
}
}
public async Task<Schedule?> GetAsync(
string tenantId,
string scheduleId,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
if (string.IsNullOrWhiteSpace(scheduleId))
{
throw new ArgumentException("Schedule id must be provided.", nameof(scheduleId));
}
var filter = Filter.And(
Filter.Eq("_id", scheduleId),
Filter.Eq("tenantId", tenantId),
Filter.Or(
Filter.Exists("deletedAt", false),
Filter.Eq("deletedAt", BsonNull.Value)));
var query = session is null
? _collection.Find(filter)
: _collection.Find(session, filter);
var document = await query.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
return document is null ? null : ScheduleDocumentMapper.FromBsonDocument(document);
}
public async Task<IReadOnlyList<Schedule>> ListAsync(
string tenantId,
ScheduleQueryOptions? options = null,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
options ??= new ScheduleQueryOptions();
var filters = new List<FilterDefinition<BsonDocument>>
{
Filter.Eq("tenantId", tenantId)
};
if (!options.IncludeDeleted)
{
filters.Add(Filter.Or(
Filter.Exists("deletedAt", false),
Filter.Eq("deletedAt", BsonNull.Value)));
}
if (!options.IncludeDisabled)
{
filters.Add(Filter.Eq("enabled", true));
}
var combined = Filter.And(filters);
var find = session is null
? _collection.Find(combined)
: _collection.Find(session, combined);
if (options.Limit is { } limit && limit > 0)
{
find = find.Limit(limit);
}
var documents = await find.Sort(Builders<BsonDocument>.Sort.Ascending("name"))
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return documents.Select(ScheduleDocumentMapper.FromBsonDocument).ToArray();
}
public async Task<bool> SoftDeleteAsync(
string tenantId,
string scheduleId,
string deletedBy,
DateTimeOffset deletedAt,
IClientSessionHandle? session = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentException("Tenant id must be provided.", nameof(tenantId));
}
if (string.IsNullOrWhiteSpace(scheduleId))
{
throw new ArgumentException("Schedule id must be provided.", nameof(scheduleId));
}
if (string.IsNullOrWhiteSpace(deletedBy))
{
throw new ArgumentException("Deleted by must be provided.", nameof(deletedBy));
}
var filter = Filter.And(
Filter.Eq("_id", scheduleId),
Filter.Eq("tenantId", tenantId));
var utc = deletedAt.ToUniversalTime();
var update = Update
.Set("deletedAt", utc.UtcDateTime)
.Set("deletedBy", deletedBy)
.Set("enabled", false)
.Set("updatedAt", utc.UtcDateTime)
.Set("updatedBy", deletedBy);
UpdateResult result;
if (session is null)
{
result = await _collection.UpdateOneAsync(filter, update, cancellationToken: cancellationToken).ConfigureAwait(false);
}
else
{
result = await _collection.UpdateOneAsync(session, filter, update, cancellationToken: cancellationToken).ConfigureAwait(false);
}
return result.ModifiedCount > 0;
}
}

View File

@@ -0,0 +1,23 @@
using MongoDB.Bson;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Serialization;
internal static class AuditRecordDocumentMapper
{
public static BsonDocument ToBsonDocument(AuditRecord record)
{
ArgumentNullException.ThrowIfNull(record);
var json = CanonicalJsonSerializer.Serialize(record);
var document = BsonDocument.Parse(json);
document["_id"] = record.Id;
return document;
}
public static AuditRecord FromBsonDocument(BsonDocument document)
{
ArgumentNullException.ThrowIfNull(document);
var node = document.ToCanonicalJsonNode();
return CanonicalJsonSerializer.Deserialize<AuditRecord>(node.ToCanonicalJson());
}
}

View File

@@ -0,0 +1,144 @@
using System.Globalization;
using System.Text.Json;
using System.Text.Json.Nodes;
using MongoDB.Bson;
using MongoDB.Bson.IO;
namespace StellaOps.Scheduler.Storage.Mongo.Serialization;
internal static class BsonDocumentJsonExtensions
{
public static JsonNode ToCanonicalJsonNode(this BsonDocument document, params string[] fieldsToRemove)
{
ArgumentNullException.ThrowIfNull(document);
var clone = document.DeepClone().AsBsonDocument;
clone.Remove("_id");
if (fieldsToRemove is { Length: > 0 })
{
foreach (var field in fieldsToRemove)
{
clone.Remove(field);
}
}
var json = clone.ToJson(new JsonWriterSettings
{
OutputMode = JsonOutputMode.RelaxedExtendedJson,
Indent = false,
});
var node = JsonNode.Parse(json) ?? throw new InvalidOperationException("Unable to parse BSON document JSON.");
return NormalizeExtendedJson(node);
}
private static JsonNode NormalizeExtendedJson(JsonNode node)
{
if (node is JsonObject obj)
{
if (TryConvertExtendedDate(obj, out var replacement))
{
return replacement;
}
foreach (var property in obj.ToList())
{
if (property.Value is null)
{
continue;
}
var normalized = NormalizeExtendedJson(property.Value);
if (!ReferenceEquals(normalized, property.Value))
{
obj[property.Key] = normalized;
}
}
return obj;
}
if (node is JsonArray array)
{
for (var i = 0; i < array.Count; i++)
{
if (array[i] is null)
{
continue;
}
var normalized = NormalizeExtendedJson(array[i]!);
if (!ReferenceEquals(normalized, array[i]))
{
array[i] = normalized;
}
}
return array;
}
return node;
}
private static bool TryConvertExtendedDate(JsonObject obj, out JsonNode replacement)
{
replacement = obj;
if (obj.Count != 1 || !obj.TryGetPropertyValue("$date", out var value) || value is null)
{
return false;
}
if (value is JsonValue directValue)
{
if (directValue.TryGetValue(out string? dateString) && TryParseIso(dateString, out var iso))
{
replacement = JsonValue.Create(iso);
return true;
}
if (directValue.TryGetValue(out long epochMilliseconds))
{
replacement = JsonValue.Create(DateTimeOffset.FromUnixTimeMilliseconds(epochMilliseconds).ToString("O"));
return true;
}
}
else if (value is JsonObject nested &&
nested.TryGetPropertyValue("$numberLong", out var numberNode) &&
numberNode is JsonValue numberValue &&
numberValue.TryGetValue(out string? numberString) &&
long.TryParse(numberString, NumberStyles.Integer, CultureInfo.InvariantCulture, out var ms))
{
replacement = JsonValue.Create(DateTimeOffset.FromUnixTimeMilliseconds(ms).ToString("O"));
return true;
}
return false;
}
private static bool TryParseIso(string? value, out string iso)
{
iso = string.Empty;
if (string.IsNullOrWhiteSpace(value))
{
return false;
}
if (DateTimeOffset.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out var parsed))
{
iso = parsed.ToUniversalTime().ToString("O");
return true;
}
return false;
}
public static string ToCanonicalJson(this JsonNode node)
{
ArgumentNullException.ThrowIfNull(node);
return node.ToJsonString(new JsonSerializerOptions
{
WriteIndented = false
});
}
}

View File

@@ -0,0 +1,125 @@
using MongoDB.Bson;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Serialization;
internal static class GraphJobDocumentMapper
{
private const string PayloadField = "payload";
public static BsonDocument ToBsonDocument(GraphBuildJob job)
{
ArgumentNullException.ThrowIfNull(job);
var payloadJson = CanonicalJsonSerializer.Serialize(job);
var payloadDocument = BsonDocument.Parse(payloadJson);
var document = new BsonDocument
{
["_id"] = job.Id,
["tenantId"] = job.TenantId,
["kind"] = "build",
["status"] = job.Status.ToString().ToLowerInvariant(),
["createdAt"] = job.CreatedAt.UtcDateTime,
["attempts"] = job.Attempts,
[PayloadField] = payloadDocument
};
if (!string.IsNullOrWhiteSpace(job.GraphSnapshotId))
{
document["graphSnapshotId"] = job.GraphSnapshotId;
}
if (!string.IsNullOrWhiteSpace(job.CorrelationId))
{
document["correlationId"] = job.CorrelationId;
}
if (job.StartedAt is { } startedAt)
{
document["startedAt"] = startedAt.UtcDateTime;
}
if (job.CompletedAt is { } completedAt)
{
document["completedAt"] = completedAt.UtcDateTime;
}
if (!string.IsNullOrWhiteSpace(job.Error))
{
document["error"] = job.Error;
}
return document;
}
public static BsonDocument ToBsonDocument(GraphOverlayJob job)
{
ArgumentNullException.ThrowIfNull(job);
var payloadJson = CanonicalJsonSerializer.Serialize(job);
var payloadDocument = BsonDocument.Parse(payloadJson);
var document = new BsonDocument
{
["_id"] = job.Id,
["tenantId"] = job.TenantId,
["kind"] = "overlay",
["status"] = job.Status.ToString().ToLowerInvariant(),
["createdAt"] = job.CreatedAt.UtcDateTime,
["attempts"] = job.Attempts,
[PayloadField] = payloadDocument
};
document["graphSnapshotId"] = job.GraphSnapshotId;
document["overlayKind"] = job.OverlayKind.ToString().ToLowerInvariant();
document["overlayKey"] = job.OverlayKey;
if (!string.IsNullOrWhiteSpace(job.BuildJobId))
{
document["buildJobId"] = job.BuildJobId;
}
if (!string.IsNullOrWhiteSpace(job.CorrelationId))
{
document["correlationId"] = job.CorrelationId;
}
if (job.StartedAt is { } startedAt)
{
document["startedAt"] = startedAt.UtcDateTime;
}
if (job.CompletedAt is { } completedAt)
{
document["completedAt"] = completedAt.UtcDateTime;
}
if (!string.IsNullOrWhiteSpace(job.Error))
{
document["error"] = job.Error;
}
return document;
}
public static GraphBuildJob ToGraphBuildJob(BsonDocument document)
{
ArgumentNullException.ThrowIfNull(document);
var payloadDocument = document[PayloadField].AsBsonDocument;
var json = payloadDocument.ToJson();
var job = CanonicalJsonSerializer.Deserialize<GraphBuildJob>(json);
return job;
}
public static GraphOverlayJob ToGraphOverlayJob(BsonDocument document)
{
ArgumentNullException.ThrowIfNull(document);
var payloadDocument = document[PayloadField].AsBsonDocument;
var json = payloadDocument.ToJson();
var job = CanonicalJsonSerializer.Deserialize<GraphOverlayJob>(json);
return job;
}
}

View File

@@ -0,0 +1,57 @@
using System;
using System.Security.Cryptography;
using System.Text;
using MongoDB.Bson;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Serialization;
internal static class ImpactSetDocumentMapper
{
private const string SelectorHashPrefix = "selector::";
public static BsonDocument ToBsonDocument(ImpactSet impactSet)
{
ArgumentNullException.ThrowIfNull(impactSet);
var json = CanonicalJsonSerializer.Serialize(impactSet);
var document = BsonDocument.Parse(json);
document["_id"] = ComputeDocumentId(impactSet);
document["selectorDigest"] = ComputeSelectorDigest(impactSet);
return document;
}
public static ImpactSet FromBsonDocument(BsonDocument document)
{
ArgumentNullException.ThrowIfNull(document);
var node = document.ToCanonicalJsonNode();
return CanonicalJsonSerializer.Deserialize<ImpactSet>(node.ToCanonicalJson());
}
private static string ComputeDocumentId(ImpactSet impactSet)
{
if (!string.IsNullOrWhiteSpace(impactSet.SnapshotId))
{
return impactSet.SnapshotId!;
}
var selectorJson = CanonicalJsonSerializer.Serialize(impactSet.Selector);
using var sha256 = SHA256.Create();
var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(selectorJson));
return SelectorHashPrefix + Convert.ToHexString(hash).ToLowerInvariant();
}
private static string ComputeSelectorDigest(ImpactSet impactSet)
{
return ComputeSelectorDigest(impactSet.Selector);
}
public static string ComputeSelectorDigest(Selector selector)
{
ArgumentNullException.ThrowIfNull(selector);
var selectorJson = CanonicalJsonSerializer.Serialize(selector);
using var sha256 = SHA256.Create();
var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(selectorJson));
return Convert.ToHexString(hash).ToLowerInvariant();
}
}

View File

@@ -0,0 +1,23 @@
using MongoDB.Bson;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Serialization;
internal static class RunDocumentMapper
{
public static BsonDocument ToBsonDocument(Run run)
{
ArgumentNullException.ThrowIfNull(run);
var json = CanonicalJsonSerializer.Serialize(run);
var document = BsonDocument.Parse(json);
document["_id"] = run.Id;
return document;
}
public static Run FromBsonDocument(BsonDocument document)
{
ArgumentNullException.ThrowIfNull(document);
var node = document.ToCanonicalJsonNode();
return CanonicalJsonSerializer.Deserialize<Run>(node.ToCanonicalJson());
}
}

View File

@@ -0,0 +1,25 @@
using MongoDB.Bson;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Serialization;
internal static class ScheduleDocumentMapper
{
private static readonly string[] IgnoredFields = { "deletedAt", "deletedBy" };
public static BsonDocument ToBsonDocument(Schedule schedule)
{
ArgumentNullException.ThrowIfNull(schedule);
var json = CanonicalJsonSerializer.Serialize(schedule);
var document = BsonDocument.Parse(json);
document["_id"] = schedule.Id;
return document;
}
public static Schedule FromBsonDocument(BsonDocument document)
{
ArgumentNullException.ThrowIfNull(document);
var node = document.ToCanonicalJsonNode(IgnoredFields);
return CanonicalJsonSerializer.Deserialize<Schedule>(node.ToCanonicalJson());
}
}

View File

@@ -1,8 +1,12 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using StellaOps.Scheduler.Storage.Mongo.Internal;
using StellaOps.Scheduler.Storage.Mongo.Migrations;
using StellaOps.Scheduler.Storage.Mongo.Options;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using StellaOps.Scheduler.Storage.Mongo.Internal;
using StellaOps.Scheduler.Storage.Mongo.Migrations;
using StellaOps.Scheduler.Storage.Mongo.Options;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
using StellaOps.Scheduler.Storage.Mongo.Services;
using StellaOps.Scheduler.Storage.Mongo.Sessions;
namespace StellaOps.Scheduler.Storage.Mongo;
@@ -16,11 +20,21 @@ public static class ServiceCollectionExtensions
services.Configure<SchedulerMongoOptions>(configuration);
services.AddSingleton<SchedulerMongoContext>();
services.AddSingleton<SchedulerMongoMigrationRunner>();
services.AddSingleton<ISchedulerMongoMigration, EnsureSchedulerCollectionsMigration>();
services.AddSingleton<ISchedulerMongoMigration, EnsureSchedulerIndexesMigration>();
services.AddSingleton<ISchedulerMongoInitializer, SchedulerMongoInitializer>();
services.AddHostedService<SchedulerMongoInitializerHostedService>();
return services;
}
}
services.AddSingleton<ISchedulerMongoMigration, EnsureSchedulerCollectionsMigration>();
services.AddSingleton<ISchedulerMongoMigration, EnsureSchedulerIndexesMigration>();
services.AddSingleton<ISchedulerMongoInitializer, SchedulerMongoInitializer>();
services.AddHostedService<SchedulerMongoInitializerHostedService>();
services.TryAddSingleton(TimeProvider.System);
services.AddSingleton<ISchedulerMongoSessionFactory, SchedulerMongoSessionFactory>();
services.AddSingleton<IScheduleRepository, ScheduleRepository>();
services.AddSingleton<IRunRepository, RunRepository>();
services.AddSingleton<IImpactSnapshotRepository, ImpactSnapshotRepository>();
services.AddSingleton<IRunSummaryRepository, RunSummaryRepository>();
services.AddSingleton<IAuditRepository, AuditRepository>();
services.AddSingleton<IGraphJobRepository, GraphJobRepository>();
services.AddSingleton<IRunSummaryService, RunSummaryService>();
services.AddSingleton<ISchedulerAuditService, SchedulerAuditService>();
return services;
}
}

View File

@@ -0,0 +1,20 @@
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Projections;
namespace StellaOps.Scheduler.Storage.Mongo.Services;
public interface IRunSummaryService
{
Task<RunSummaryProjection> ProjectAsync(
Run run,
CancellationToken cancellationToken = default);
Task<RunSummaryProjection?> GetAsync(
string tenantId,
string scheduleId,
CancellationToken cancellationToken = default);
Task<IReadOnlyList<RunSummaryProjection>> ListAsync(
string tenantId,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,10 @@
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Services;
public interface ISchedulerAuditService
{
Task<AuditRecord> WriteAsync(
SchedulerAuditEvent auditEvent,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,204 @@
using System.Collections.Immutable;
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Documents;
using StellaOps.Scheduler.Storage.Mongo.Projections;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
namespace StellaOps.Scheduler.Storage.Mongo.Services;
internal sealed class RunSummaryService : IRunSummaryService
{
private const int RecentLimit = 20;
private readonly IRunSummaryRepository _repository;
private readonly TimeProvider _timeProvider;
private readonly ILogger<RunSummaryService> _logger;
public RunSummaryService(
IRunSummaryRepository repository,
TimeProvider? timeProvider,
ILogger<RunSummaryService> logger)
{
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<RunSummaryProjection> ProjectAsync(
Run run,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(run);
if (string.IsNullOrWhiteSpace(run.ScheduleId))
{
throw new ArgumentException("Run must contain a scheduleId to project summary data.", nameof(run));
}
var document = await _repository
.GetAsync(run.TenantId, run.ScheduleId!, cancellationToken)
.ConfigureAwait(false)
?? new RunSummaryDocument
{
TenantId = run.TenantId,
ScheduleId = run.ScheduleId!,
};
UpdateDocument(document, run);
document.UpdatedAt = _timeProvider.GetUtcNow().UtcDateTime;
await _repository.UpsertAsync(document, cancellationToken).ConfigureAwait(false);
_logger.LogDebug(
"Projected run summary for tenant {TenantId} schedule {ScheduleId} using run {RunId}.",
run.TenantId,
run.ScheduleId,
run.Id);
return ToProjection(document);
}
public async Task<RunSummaryProjection?> GetAsync(
string tenantId,
string scheduleId,
CancellationToken cancellationToken = default)
{
var document = await _repository
.GetAsync(tenantId, scheduleId, cancellationToken)
.ConfigureAwait(false);
return document is null ? null : ToProjection(document);
}
public async Task<IReadOnlyList<RunSummaryProjection>> ListAsync(
string tenantId,
CancellationToken cancellationToken = default)
{
var documents = await _repository.ListAsync(tenantId, cancellationToken).ConfigureAwait(false);
return documents.Select(ToProjection).ToArray();
}
private static void UpdateDocument(RunSummaryDocument document, Run run)
{
var entry = document.Recent.FirstOrDefault(item => string.Equals(item.RunId, run.Id, StringComparison.Ordinal));
if (entry is null)
{
entry = new RunSummaryEntryDocument
{
RunId = run.Id,
};
document.Recent.Add(entry);
}
entry.Trigger = run.Trigger;
entry.State = run.State;
entry.CreatedAt = run.CreatedAt.UtcDateTime;
entry.StartedAt = run.StartedAt?.UtcDateTime;
entry.FinishedAt = run.FinishedAt?.UtcDateTime;
entry.Error = run.Error;
entry.Stats = run.Stats;
document.Recent = document.Recent
.OrderByDescending(item => item.CreatedAt)
.ThenByDescending(item => item.RunId, StringComparer.Ordinal)
.Take(RecentLimit)
.ToList();
document.LastRun = document.Recent.FirstOrDefault();
document.Counters = ComputeCounters(document.Recent);
}
private static RunSummaryCountersDocument ComputeCounters(IEnumerable<RunSummaryEntryDocument> entries)
{
var counters = new RunSummaryCountersDocument();
foreach (var entry in entries)
{
counters.Total++;
switch (entry.State)
{
case RunState.Planning:
counters.Planning++;
break;
case RunState.Queued:
counters.Queued++;
break;
case RunState.Running:
counters.Running++;
break;
case RunState.Completed:
counters.Completed++;
break;
case RunState.Error:
counters.Error++;
break;
case RunState.Cancelled:
counters.Cancelled++;
break;
default:
break;
}
counters.TotalDeltas += entry.Stats.Deltas;
counters.TotalNewCriticals += entry.Stats.NewCriticals;
counters.TotalNewHigh += entry.Stats.NewHigh;
counters.TotalNewMedium += entry.Stats.NewMedium;
counters.TotalNewLow += entry.Stats.NewLow;
}
return counters;
}
private static RunSummaryProjection ToProjection(RunSummaryDocument document)
{
var updatedAt = new DateTimeOffset(DateTime.SpecifyKind(document.UpdatedAt, DateTimeKind.Utc));
var lastRun = document.LastRun is null
? null
: ToSnapshot(document.LastRun);
var recent = document.Recent
.Select(ToSnapshot)
.ToImmutableArray();
var counters = new RunSummaryCounters(
document.Counters.Total,
document.Counters.Planning,
document.Counters.Queued,
document.Counters.Running,
document.Counters.Completed,
document.Counters.Error,
document.Counters.Cancelled,
document.Counters.TotalDeltas,
document.Counters.TotalNewCriticals,
document.Counters.TotalNewHigh,
document.Counters.TotalNewMedium,
document.Counters.TotalNewLow);
return new RunSummaryProjection(
document.TenantId,
document.ScheduleId,
updatedAt,
lastRun,
recent,
counters);
}
private static RunSummarySnapshot ToSnapshot(RunSummaryEntryDocument entry)
{
var createdAt = new DateTimeOffset(DateTime.SpecifyKind(entry.CreatedAt, DateTimeKind.Utc));
DateTimeOffset? startedAt = entry.StartedAt is null
? null
: new DateTimeOffset(DateTime.SpecifyKind(entry.StartedAt.Value, DateTimeKind.Utc));
DateTimeOffset? finishedAt = entry.FinishedAt is null
? null
: new DateTimeOffset(DateTime.SpecifyKind(entry.FinishedAt.Value, DateTimeKind.Utc));
return new RunSummarySnapshot(
entry.RunId,
entry.Trigger,
entry.State,
createdAt,
startedAt,
finishedAt,
entry.Stats,
entry.Error);
}
}

View File

@@ -0,0 +1,18 @@
using System.Collections.Generic;
using StellaOps.Scheduler.Models;
namespace StellaOps.Scheduler.Storage.Mongo.Services;
public sealed record SchedulerAuditEvent(
string TenantId,
string Category,
string Action,
AuditActor Actor,
string? EntityId = null,
string? ScheduleId = null,
string? RunId = null,
string? CorrelationId = null,
IReadOnlyDictionary<string, string>? Metadata = null,
string? Message = null,
DateTimeOffset? OccurredAt = null,
string? AuditId = null);

View File

@@ -0,0 +1,62 @@
using Microsoft.Extensions.Logging;
using StellaOps.Scheduler.Models;
using StellaOps.Scheduler.Storage.Mongo.Repositories;
namespace StellaOps.Scheduler.Storage.Mongo.Services;
internal sealed class SchedulerAuditService : ISchedulerAuditService
{
private readonly IAuditRepository _repository;
private readonly TimeProvider _timeProvider;
private readonly ILogger<SchedulerAuditService> _logger;
public SchedulerAuditService(
IAuditRepository repository,
TimeProvider? timeProvider,
ILogger<SchedulerAuditService> logger)
{
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<AuditRecord> WriteAsync(
SchedulerAuditEvent auditEvent,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(auditEvent);
var occurredAt = auditEvent.OccurredAt ?? _timeProvider.GetUtcNow();
var identifier = string.IsNullOrWhiteSpace(auditEvent.AuditId)
? $"audit_{Guid.NewGuid():N}"
: auditEvent.AuditId!.Trim();
var metadata = auditEvent.Metadata is null
? Enumerable.Empty<KeyValuePair<string, string>>()
: auditEvent.Metadata.Where(pair => !string.IsNullOrWhiteSpace(pair.Key) && !string.IsNullOrWhiteSpace(pair.Value));
var record = new AuditRecord(
identifier,
auditEvent.TenantId,
auditEvent.Category,
auditEvent.Action,
occurredAt,
auditEvent.Actor,
auditEvent.EntityId,
auditEvent.ScheduleId,
auditEvent.RunId,
auditEvent.CorrelationId,
metadata,
auditEvent.Message);
await _repository.InsertAsync(record, session: null, cancellationToken).ConfigureAwait(false);
_logger.LogDebug(
"Scheduler audit record persisted with id {AuditId} for tenant {TenantId} action {Action}.",
record.Id,
record.TenantId,
record.Action);
return record;
}
}

View File

@@ -0,0 +1,18 @@
using MongoDB.Driver;
namespace StellaOps.Scheduler.Storage.Mongo.Sessions;
/// <summary>
/// Provides helper methods for creating MongoDB sessions used by scheduler storage.
/// </summary>
public interface ISchedulerMongoSessionFactory
{
/// <summary>
/// Starts a new client session applying the provided options.
/// </summary>
/// <param name="options">Session options (optional).</param>
/// <param name="cancellationToken">Cancellation token.</param>
Task<IClientSessionHandle> StartSessionAsync(
SchedulerMongoSessionOptions? options = null,
CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,32 @@
using MongoDB.Driver;
using StellaOps.Scheduler.Storage.Mongo.Internal;
namespace StellaOps.Scheduler.Storage.Mongo.Sessions;
internal sealed class SchedulerMongoSessionFactory : ISchedulerMongoSessionFactory
{
private readonly SchedulerMongoContext _context;
public SchedulerMongoSessionFactory(SchedulerMongoContext context)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
}
public Task<IClientSessionHandle> StartSessionAsync(
SchedulerMongoSessionOptions? options = null,
CancellationToken cancellationToken = default)
{
var clientOptions = new ClientSessionOptions
{
CausalConsistency = options?.CausalConsistency ?? true,
};
if (options?.ReadPreference is not null)
{
clientOptions.DefaultTransactionOptions = new TransactionOptions(
readPreference: options.ReadPreference);
}
return _context.Client.StartSessionAsync(clientOptions, cancellationToken);
}
}

View File

@@ -0,0 +1,19 @@
using MongoDB.Driver;
namespace StellaOps.Scheduler.Storage.Mongo.Sessions;
/// <summary>
/// Options controlling MongoDB client sessions for scheduler storage operations.
/// </summary>
public sealed class SchedulerMongoSessionOptions
{
/// <summary>
/// When true (default), the session enables causal consistency for read operations following writes.
/// </summary>
public bool CausalConsistency { get; init; } = true;
/// <summary>
/// Optional read preference override applied to the session when specified.
/// </summary>
public ReadPreference? ReadPreference { get; init; }
}

View File

@@ -1,19 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
<PackageReference Include="MongoDB.Bson" Version="3.5.0" />
<PackageReference Include="MongoDB.Driver" Version="3.5.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../StellaOps.Scheduler.Models/StellaOps.Scheduler.Models.csproj" />
</ItemGroup>
</Project>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.0-rc.2.25502.107" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.0-rc.2.25502.107" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0-rc.2.25502.107" />
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0-rc.2.25502.107" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.0-rc.2.25502.107" />
<PackageReference Include="MongoDB.Bson" Version="3.5.0" />
<PackageReference Include="MongoDB.Driver" Version="3.5.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../StellaOps.Scheduler.Models/StellaOps.Scheduler.Models.csproj" />
</ItemGroup>
</Project>

View File

@@ -5,5 +5,5 @@
| ID | Status | Owner(s) | Depends on | Description | Exit Criteria |
|----|--------|----------|------------|-------------|---------------|
| SCHED-STORAGE-16-201 | DONE (2025-10-19) | Scheduler Storage Guild | SCHED-MODELS-16-101 | Create Mongo collections (schedules, runs, impact_cursors, locks, audit) with indexes/migrations per architecture. | Migration scripts and indexes implemented; integration tests cover CRUD paths. |
| SCHED-STORAGE-16-202 | TODO | Scheduler Storage Guild | SCHED-STORAGE-16-201 | Implement repositories/services with tenant scoping, soft delete, TTL for completed runs, and causal consistency options. | Unit tests pass; TTL/soft delete validated; documentation updated. |
| SCHED-STORAGE-16-203 | TODO | Scheduler Storage Guild | SCHED-STORAGE-16-201 | Audit/logging pipeline + run stats materialized views for UI. | Audit entries persisted; stats queries efficient; docs capture usage. |
| SCHED-STORAGE-16-202 | DONE (2025-10-26) | Scheduler Storage Guild | SCHED-STORAGE-16-201 | Implement repositories/services with tenant scoping, soft delete, TTL for completed runs, and causal consistency options. | Unit tests pass; TTL/soft delete validated; documentation updated. |
| SCHED-STORAGE-16-203 | DONE (2025-10-26) | Scheduler Storage Guild | SCHED-STORAGE-16-201 | Audit/logging pipeline + run stats materialized views for UI. | Audit entries persisted; stats queries efficient; docs capture usage. |