feat: Implement MongoDB orchestrator storage with registry, commands, and heartbeats
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
- Added NullAdvisoryObservationEventTransport for handling advisory observation events. - Created IOrchestratorRegistryStore interface for orchestrator registry operations. - Implemented MongoOrchestratorRegistryStore for MongoDB interactions with orchestrator data. - Defined OrchestratorCommandDocument and OrchestratorCommandRecord for command handling. - Added OrchestratorHeartbeatDocument and OrchestratorHeartbeatRecord for heartbeat tracking. - Created OrchestratorRegistryDocument and OrchestratorRegistryRecord for registry management. - Developed tests for orchestrator collections migration and MongoOrchestratorRegistryStore functionality. - Introduced AirgapImportRequest and AirgapImportValidator for air-gapped VEX bundle imports. - Added incident mode rules sample JSON for notifier configuration.
This commit is contained in:
@@ -39,7 +39,8 @@ public sealed record AdvisoryLinksetProvenance(
|
||||
public sealed record AdvisoryLinksetConflict(
|
||||
string Field,
|
||||
string Reason,
|
||||
IReadOnlyList<string>? Values);
|
||||
IReadOnlyList<string>? Values,
|
||||
IReadOnlyList<string>? SourceIds = null);
|
||||
|
||||
internal static class BsonDocumentHelper
|
||||
{
|
||||
|
||||
@@ -32,10 +32,23 @@ internal static class AdvisoryLinksetNormalization
|
||||
ArgumentNullException.ThrowIfNull(linkset);
|
||||
|
||||
var normalized = Build(linkset.PackageUrls);
|
||||
var conflicts = ExtractConflicts(linkset);
|
||||
var confidence = ComputeConfidence(linkset, providedConfidence, conflicts);
|
||||
|
||||
return (normalized, confidence, conflicts);
|
||||
var inputs = new[]
|
||||
{
|
||||
new LinksetCorrelation.Input(
|
||||
Vendor: null,
|
||||
FetchedAt: null,
|
||||
Aliases: linkset.Aliases,
|
||||
Purls: linkset.PackageUrls,
|
||||
Cpes: linkset.Cpes,
|
||||
References: linkset.References.Select(r => r.Url).ToArray())
|
||||
};
|
||||
|
||||
var noteConflicts = ExtractConflicts(linkset);
|
||||
var (confidenceScore, conflicts) = LinksetCorrelation.Compute(inputs, noteConflicts);
|
||||
var coerced = providedConfidence.HasValue ? CoerceConfidence(providedConfidence) : confidenceScore;
|
||||
|
||||
return (normalized, coerced, conflicts);
|
||||
}
|
||||
|
||||
private static AdvisoryLinksetNormalized? Build(IEnumerable<string> purlValues)
|
||||
@@ -190,37 +203,4 @@ internal static class AdvisoryLinksetNormalization
|
||||
|
||||
return conflicts;
|
||||
}
|
||||
|
||||
private static double? ComputeConfidence(RawLinkset linkset, double? providedConfidence, IReadOnlyList<AdvisoryLinksetConflict> conflicts)
|
||||
{
|
||||
if (providedConfidence.HasValue)
|
||||
{
|
||||
return CoerceConfidence(providedConfidence);
|
||||
}
|
||||
|
||||
double aliasScore = linkset.Aliases.IsDefaultOrEmpty ? 0d : 1d;
|
||||
double purlOverlapScore = linkset.PackageUrls.IsDefaultOrEmpty
|
||||
? 0d
|
||||
: (linkset.PackageUrls.Length > 1 ? 1d : 0.6d);
|
||||
double cpeOverlapScore = linkset.Cpes.IsDefaultOrEmpty
|
||||
? 0d
|
||||
: (linkset.Cpes.Length > 1 ? 1d : 0.5d);
|
||||
double severityAgreement = conflicts.Any(c => c.Reason == "severity-mismatch") ? 0.2d : 0.5d;
|
||||
double referenceOverlap = linkset.References.IsDefaultOrEmpty ? 0d : 0.5d;
|
||||
double freshnessScore = 0.5d; // until fetchedAt spread is available
|
||||
|
||||
var confidence = (0.40 * aliasScore) +
|
||||
(0.25 * purlOverlapScore) +
|
||||
(0.15 * cpeOverlapScore) +
|
||||
(0.10 * severityAgreement) +
|
||||
(0.05 * referenceOverlap) +
|
||||
(0.05 * freshnessScore);
|
||||
|
||||
if (conflicts.Count > 0 && confidence > 0.7d)
|
||||
{
|
||||
confidence -= 0.1d; // penalize non-empty conflict sets
|
||||
}
|
||||
|
||||
return Math.Clamp(confidence, 0d, 1d);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,346 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.Immutable;
|
||||
using System.Linq;
|
||||
using StellaOps.Concelier.Models;
|
||||
|
||||
namespace StellaOps.Concelier.Core.Linksets;
|
||||
|
||||
internal static class LinksetCorrelation
|
||||
{
|
||||
internal readonly record struct Input(
|
||||
string? Vendor,
|
||||
DateTimeOffset? FetchedAt,
|
||||
IReadOnlyCollection<string> Aliases,
|
||||
IReadOnlyCollection<string> Purls,
|
||||
IReadOnlyCollection<string> Cpes,
|
||||
IReadOnlyCollection<string> References);
|
||||
|
||||
internal static (double Confidence, IReadOnlyList<AdvisoryLinksetConflict> Conflicts) Compute(
|
||||
IReadOnlyCollection<Input> inputs,
|
||||
IReadOnlyList<AdvisoryLinksetConflict>? additionalConflicts = null)
|
||||
{
|
||||
if (inputs.Count == 0)
|
||||
{
|
||||
return (1.0, Array.Empty<AdvisoryLinksetConflict>());
|
||||
}
|
||||
|
||||
var conflicts = new List<AdvisoryLinksetConflict>();
|
||||
|
||||
var aliasScore = CalculateAliasScore(inputs, conflicts);
|
||||
var (purlScore, rangeConflicts) = CalculatePurlScore(inputs);
|
||||
conflicts.AddRange(rangeConflicts);
|
||||
|
||||
var cpeScore = CalculateCpeScore(inputs);
|
||||
var (referenceScore, referenceConflicts) = CalculateReferenceScore(inputs);
|
||||
conflicts.AddRange(referenceConflicts);
|
||||
|
||||
var severityAgreement = 0.5d; // no severity data available in linkset inputs
|
||||
var freshnessScore = CalculateFreshnessScore(inputs);
|
||||
|
||||
var baseConfidence = Clamp01(
|
||||
(0.40d * aliasScore) +
|
||||
(0.25d * purlScore) +
|
||||
(0.15d * cpeScore) +
|
||||
(0.10d * severityAgreement) +
|
||||
(0.05d * referenceScore) +
|
||||
(0.05d * freshnessScore));
|
||||
|
||||
if (conflicts.Count > 0 && baseConfidence > 0.7d)
|
||||
{
|
||||
baseConfidence -= 0.1d;
|
||||
}
|
||||
|
||||
if (baseConfidence < 0.1d && conflicts.Count > 0)
|
||||
{
|
||||
baseConfidence = 0.1d; // keep deterministic low signal, not zero
|
||||
}
|
||||
|
||||
if (additionalConflicts is { Count: > 0 })
|
||||
{
|
||||
conflicts.AddRange(additionalConflicts);
|
||||
}
|
||||
|
||||
return (Clamp01(baseConfidence), DeduplicateAndSort(conflicts, inputs));
|
||||
}
|
||||
|
||||
private static double CalculateAliasScore(IReadOnlyCollection<Input> inputs, List<AdvisoryLinksetConflict> conflicts)
|
||||
{
|
||||
if (inputs.Count == 1)
|
||||
{
|
||||
return inputs.First().Aliases.Count > 0 ? 1d : 0d;
|
||||
}
|
||||
|
||||
var intersection = inputs
|
||||
.Select(i => i.Aliases.Select(a => a.ToLowerInvariant()).ToHashSet(StringComparer.Ordinal))
|
||||
.Aggregate((acc, next) =>
|
||||
{
|
||||
acc.IntersectWith(next);
|
||||
return acc;
|
||||
});
|
||||
|
||||
if (intersection.Count > 0)
|
||||
{
|
||||
return 1d;
|
||||
}
|
||||
|
||||
var anyAliases = inputs.Any(i => i.Aliases.Count > 0);
|
||||
if (anyAliases)
|
||||
{
|
||||
var values = inputs
|
||||
.Select(i => $"{i.Vendor ?? "source"}:{i.Aliases.FirstOrDefault() ?? "<none>"}")
|
||||
.ToArray();
|
||||
conflicts.Add(new AdvisoryLinksetConflict("aliases", "alias-inconsistency", values));
|
||||
}
|
||||
|
||||
var vendors = inputs.Select(i => i.Vendor ?? string.Empty).ToHashSet(StringComparer.OrdinalIgnoreCase);
|
||||
return vendors.Count == 1 ? 0.5d : 0d;
|
||||
}
|
||||
|
||||
private static (double Score, IReadOnlyList<AdvisoryLinksetConflict> Conflicts) CalculatePurlScore(
|
||||
IReadOnlyCollection<Input> inputs)
|
||||
{
|
||||
var conflicts = new List<AdvisoryLinksetConflict>();
|
||||
if (inputs.All(i => i.Purls.Count == 0))
|
||||
{
|
||||
return (0d, conflicts);
|
||||
}
|
||||
|
||||
List<HashSet<string>> packageKeysPerInput = inputs
|
||||
.Select(i => i.Purls
|
||||
.Select(ExtractPackageKey)
|
||||
.Where(k => !string.IsNullOrEmpty(k))
|
||||
.ToHashSet(StringComparer.Ordinal))
|
||||
.ToList();
|
||||
|
||||
var sharedPackages = packageKeysPerInput
|
||||
.Skip(1)
|
||||
.Aggregate(
|
||||
new HashSet<string>(packageKeysPerInput.First()!, StringComparer.Ordinal),
|
||||
(acc, next) =>
|
||||
{
|
||||
acc.IntersectWith(next!);
|
||||
return acc;
|
||||
});
|
||||
|
||||
if (sharedPackages.Count > 0)
|
||||
{
|
||||
var hasExactPurlOverlap = HasExactPurlOverlap(inputs);
|
||||
if (!hasExactPurlOverlap)
|
||||
{
|
||||
var divergent = CollectRangeConflicts(inputs, sharedPackages);
|
||||
conflicts.AddRange(divergent);
|
||||
}
|
||||
|
||||
return (hasExactPurlOverlap ? 1d : 0.6d, conflicts);
|
||||
}
|
||||
|
||||
return (0d, conflicts);
|
||||
}
|
||||
|
||||
private static IEnumerable<AdvisoryLinksetConflict> CollectRangeConflicts(
|
||||
IReadOnlyCollection<Input> inputs,
|
||||
HashSet<string> sharedPackages)
|
||||
{
|
||||
var conflicts = new List<AdvisoryLinksetConflict>();
|
||||
|
||||
foreach (var package in sharedPackages)
|
||||
{
|
||||
var values = inputs
|
||||
.SelectMany(i => i.Purls
|
||||
.Where(p => ExtractPackageKey(p) == package)
|
||||
.Select(p => $"{i.Vendor ?? "source"}:{p}"))
|
||||
.ToArray();
|
||||
|
||||
var sourceIds = inputs
|
||||
.Select(i => i.Vendor ?? "source")
|
||||
.ToArray();
|
||||
|
||||
if (values.Length > 1)
|
||||
{
|
||||
conflicts.Add(new AdvisoryLinksetConflict(
|
||||
"affected.versions",
|
||||
"affected-range-divergence",
|
||||
values,
|
||||
sourceIds));
|
||||
}
|
||||
}
|
||||
|
||||
return conflicts;
|
||||
}
|
||||
|
||||
private static bool HasExactPurlOverlap(IReadOnlyCollection<Input> inputs)
|
||||
{
|
||||
var first = inputs.First().Purls.ToHashSet(StringComparer.Ordinal);
|
||||
return inputs.Skip(1).Any(input => input.Purls.Any(first.Contains));
|
||||
}
|
||||
|
||||
private static string ExtractPackageKey(string purl)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(purl))
|
||||
{
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
var atIndex = purl.LastIndexOf('@');
|
||||
return atIndex > 0 ? purl[..atIndex] : purl;
|
||||
}
|
||||
|
||||
private static double CalculateCpeScore(IReadOnlyCollection<Input> inputs)
|
||||
{
|
||||
if (inputs.All(i => i.Cpes.Count == 0))
|
||||
{
|
||||
return 0d;
|
||||
}
|
||||
|
||||
var cpeSets = inputs.Select(i => i.Cpes.ToHashSet(StringComparer.OrdinalIgnoreCase)).ToList();
|
||||
var exactOverlap = cpeSets.Skip(1).Any(set => set.Overlaps(cpeSets.First()));
|
||||
if (exactOverlap)
|
||||
{
|
||||
return 1d;
|
||||
}
|
||||
|
||||
var vendorProductSets = inputs
|
||||
.Select(i => i.Cpes.Select(ParseVendorProduct).Where(vp => vp.vendor is not null).ToHashSet())
|
||||
.ToList();
|
||||
|
||||
var sharedVendorProduct = vendorProductSets.Skip(1).Any(set => set.Overlaps(vendorProductSets.First()));
|
||||
return sharedVendorProduct ? 0.5d : 0d;
|
||||
}
|
||||
|
||||
private static (string? vendor, string? product) ParseVendorProduct(string cpe)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(cpe))
|
||||
{
|
||||
return (null, null);
|
||||
}
|
||||
|
||||
var parts = cpe.Split(':');
|
||||
if (parts.Length >= 6 && parts[0].StartsWith("cpe", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// cpe:2.3:a:vendor:product:version...
|
||||
return (parts[3], parts[4]);
|
||||
}
|
||||
|
||||
if (parts.Length >= 5 && parts[0] == "cpe" && parts[1] == "/")
|
||||
{
|
||||
return (parts[2], parts[3]);
|
||||
}
|
||||
|
||||
return (null, null);
|
||||
}
|
||||
|
||||
private static (double Score, IReadOnlyList<AdvisoryLinksetConflict> Conflicts) CalculateReferenceScore(
|
||||
IReadOnlyCollection<Input> inputs)
|
||||
{
|
||||
var conflicts = new List<AdvisoryLinksetConflict>();
|
||||
if (inputs.All(i => i.References.Count == 0))
|
||||
{
|
||||
return (0d, conflicts);
|
||||
}
|
||||
|
||||
double maxOverlap = 0d;
|
||||
var inputList = inputs.ToList();
|
||||
for (var i = 0; i < inputList.Count; i++)
|
||||
{
|
||||
for (var j = i + 1; j < inputList.Count; j++)
|
||||
{
|
||||
var first = inputList[i].References.Select(r => r.ToLowerInvariant()).ToHashSet();
|
||||
var second = inputList[j].References.Select(r => r.ToLowerInvariant()).ToHashSet();
|
||||
|
||||
var intersection = first.Intersect(second).Count();
|
||||
var denom = Math.Max(first.Count, second.Count);
|
||||
var overlap = denom == 0 ? 0d : (double)intersection / denom;
|
||||
if (overlap > maxOverlap)
|
||||
{
|
||||
maxOverlap = overlap;
|
||||
}
|
||||
|
||||
if (overlap == 0d && !string.Equals(inputList[i].Vendor, inputList[j].Vendor, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
var values = new[]
|
||||
{
|
||||
$"{inputList[i].Vendor ?? "source"}:{first.FirstOrDefault() ?? "<none>"}",
|
||||
$"{inputList[j].Vendor ?? "source"}:{second.FirstOrDefault() ?? "<none>"}"
|
||||
};
|
||||
|
||||
conflicts.Add(new AdvisoryLinksetConflict(
|
||||
"references",
|
||||
"reference-clash",
|
||||
values,
|
||||
new[]
|
||||
{
|
||||
inputList[i].Vendor ?? "source",
|
||||
inputList[j].Vendor ?? "source"
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (maxOverlap, conflicts);
|
||||
}
|
||||
|
||||
private static double CalculateFreshnessScore(IReadOnlyCollection<Input> inputs)
|
||||
{
|
||||
var fetched = inputs
|
||||
.Select(i => i.FetchedAt)
|
||||
.Where(d => d.HasValue)
|
||||
.Select(d => d!.Value)
|
||||
.ToList();
|
||||
|
||||
if (fetched.Count <= 1)
|
||||
{
|
||||
return 0.5d; // neutral when unknown
|
||||
}
|
||||
|
||||
var min = fetched.Min();
|
||||
var max = fetched.Max();
|
||||
var spread = max - min;
|
||||
|
||||
if (spread <= TimeSpan.FromHours(48))
|
||||
{
|
||||
return 1d;
|
||||
}
|
||||
|
||||
if (spread >= TimeSpan.FromDays(14))
|
||||
{
|
||||
return 0d;
|
||||
}
|
||||
|
||||
var remaining = TimeSpan.FromDays(14) - spread;
|
||||
return Clamp01(remaining.TotalSeconds / TimeSpan.FromDays(14).TotalSeconds);
|
||||
}
|
||||
|
||||
private static IReadOnlyList<AdvisoryLinksetConflict> DeduplicateAndSort(
|
||||
IEnumerable<AdvisoryLinksetConflict> conflicts,
|
||||
IReadOnlyCollection<Input> inputs)
|
||||
{
|
||||
var set = new HashSet<string>(StringComparer.Ordinal);
|
||||
var list = new List<AdvisoryLinksetConflict>();
|
||||
|
||||
foreach (var conflict in conflicts)
|
||||
{
|
||||
var key = $"{conflict.Field}|{conflict.Reason}|{string.Join('|', conflict.Values ?? Array.Empty<string>())}";
|
||||
if (set.Add(key))
|
||||
{
|
||||
if (conflict.SourceIds is null || conflict.SourceIds.Count == 0)
|
||||
{
|
||||
var allSources = inputs.Select(i => i.Vendor ?? "source").Distinct(StringComparer.OrdinalIgnoreCase).ToArray();
|
||||
list.Add(conflict with { SourceIds = allSources });
|
||||
}
|
||||
else
|
||||
{
|
||||
list.Add(conflict);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return list
|
||||
.OrderBy(c => c.Field, StringComparer.Ordinal)
|
||||
.ThenBy(c => c.Reason, StringComparer.Ordinal)
|
||||
.ThenBy(c => string.Join('|', c.Values ?? Array.Empty<string>()), StringComparer.Ordinal)
|
||||
.ToList();
|
||||
}
|
||||
|
||||
private static double Clamp01(double value) => Math.Clamp(value, 0d, 1d);
|
||||
}
|
||||
@@ -216,28 +216,27 @@ public sealed class AdvisoryObservationQueryService : IAdvisoryObservationQueryS
|
||||
var referenceSet = new HashSet<AdvisoryObservationReference>();
|
||||
var scopeSet = new HashSet<string>(StringComparer.Ordinal);
|
||||
var relationshipSet = new HashSet<RawRelationship>();
|
||||
var conflictSet = new HashSet<string>(StringComparer.Ordinal);
|
||||
var conflicts = new List<AdvisoryLinksetConflict>();
|
||||
var confidence = 1.0;
|
||||
|
||||
var correlationInputs = new List<LinksetCorrelation.Input>(observations.Length);
|
||||
|
||||
foreach (var observation in observations)
|
||||
{
|
||||
foreach (var alias in observation.Linkset.Aliases)
|
||||
{
|
||||
aliasSet.Add(alias);
|
||||
}
|
||||
|
||||
foreach (var purl in observation.Linkset.Purls)
|
||||
{
|
||||
purlSet.Add(purl);
|
||||
}
|
||||
|
||||
foreach (var cpe in observation.Linkset.Cpes)
|
||||
{
|
||||
cpeSet.Add(cpe);
|
||||
}
|
||||
|
||||
foreach (var reference in observation.Linkset.References)
|
||||
aliasSet.Add(alias);
|
||||
}
|
||||
|
||||
foreach (var purl in observation.Linkset.Purls)
|
||||
{
|
||||
purlSet.Add(purl);
|
||||
}
|
||||
|
||||
foreach (var cpe in observation.Linkset.Cpes)
|
||||
{
|
||||
cpeSet.Add(cpe);
|
||||
}
|
||||
|
||||
foreach (var reference in observation.Linkset.References)
|
||||
{
|
||||
referenceSet.Add(reference);
|
||||
}
|
||||
@@ -252,19 +251,17 @@ public sealed class AdvisoryObservationQueryService : IAdvisoryObservationQueryS
|
||||
relationshipSet.Add(relationship);
|
||||
}
|
||||
|
||||
var linksetProjection = AdvisoryLinksetNormalization.FromRawLinksetWithConfidence(observation.RawLinkset);
|
||||
confidence = Math.Min(confidence, linksetProjection.confidence ?? 1.0);
|
||||
|
||||
foreach (var conflict in linksetProjection.conflicts)
|
||||
{
|
||||
var key = $"{conflict.Field}|{conflict.Reason}|{string.Join('|', conflict.Values ?? Array.Empty<string>())}";
|
||||
if (conflictSet.Add(key))
|
||||
{
|
||||
conflicts.Add(conflict);
|
||||
}
|
||||
}
|
||||
correlationInputs.Add(new LinksetCorrelation.Input(
|
||||
observation.Source.Vendor,
|
||||
observation.Upstream.FetchedAt,
|
||||
observation.Linkset.Aliases,
|
||||
observation.Linkset.Purls,
|
||||
observation.Linkset.Cpes,
|
||||
observation.Linkset.References.Select(r => r.Url).ToArray()));
|
||||
}
|
||||
|
||||
var (confidence, conflicts) = LinksetCorrelation.Compute(correlationInputs);
|
||||
|
||||
return new AdvisoryObservationLinksetAggregate(
|
||||
aliasSet.OrderBy(static alias => alias, StringComparer.Ordinal).ToImmutableArray(),
|
||||
purlSet.OrderBy(static purl => purl, StringComparer.Ordinal).ToImmutableArray(),
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Concelier.Core.Observations;
|
||||
|
||||
/// <summary>
|
||||
/// Transports advisory.observation.updated@1 events from the outbox to external subscribers (e.g., NATS).
|
||||
/// </summary>
|
||||
public interface IAdvisoryObservationEventTransport
|
||||
{
|
||||
Task SendAsync(AdvisoryObservationUpdatedEvent @event, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -108,5 +108,10 @@ public sealed class AdvisoryLinksetConflictDocument
|
||||
[BsonElement("values")]
|
||||
[BsonIgnoreIfNull]
|
||||
public List<string>? Values { get; set; }
|
||||
= null;
|
||||
= new();
|
||||
|
||||
[BsonElement("sourceIds")]
|
||||
[BsonIgnoreIfNull]
|
||||
public List<string>? SourceIds { get; set; }
|
||||
= new();
|
||||
}
|
||||
|
||||
@@ -111,7 +111,8 @@ internal sealed class ConcelierMongoLinksetStore : IMongoAdvisoryLinksetStore
|
||||
{
|
||||
Field = conflict.Field,
|
||||
Reason = conflict.Reason,
|
||||
Values = conflict.Values is null ? null : new List<string>(conflict.Values)
|
||||
Values = conflict.Values is null ? null : new List<string>(conflict.Values),
|
||||
SourceIds = conflict.SourceIds is null ? null : new List<string>(conflict.SourceIds)
|
||||
}).ToList(),
|
||||
Provenance = linkset.Provenance is null ? null : new AdvisoryLinksetProvenanceDocument
|
||||
{
|
||||
@@ -153,7 +154,8 @@ internal sealed class ConcelierMongoLinksetStore : IMongoAdvisoryLinksetStore
|
||||
: doc.Conflicts.Select(conflict => new CoreLinksets.AdvisoryLinksetConflict(
|
||||
conflict.Field,
|
||||
conflict.Reason,
|
||||
conflict.Values)).ToList(),
|
||||
conflict.Values,
|
||||
conflict.SourceIds)).ToList(),
|
||||
DateTime.SpecifyKind(doc.CreatedAt, DateTimeKind.Utc),
|
||||
doc.BuiltByJobId);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Migrations;
|
||||
|
||||
internal sealed class EnsureOrchestratorCollectionsMigration : IMongoMigration
|
||||
{
|
||||
public string Id => "20251122_orchestrator_registry_commands";
|
||||
|
||||
public string Description => "Ensure orchestrator registry, commands, and heartbeats collections exist with indexes";
|
||||
|
||||
public async Task ApplyAsync(IMongoDatabase database, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(database);
|
||||
|
||||
await EnsureRegistryAsync(database, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureCommandsAsync(database, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureHeartbeatsAsync(database, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static async Task EnsureRegistryAsync(IMongoDatabase database, CancellationToken ct)
|
||||
{
|
||||
var name = MongoStorageDefaults.Collections.OrchestratorRegistry;
|
||||
await EnsureCollectionAsync(database, name, ct).ConfigureAwait(false);
|
||||
|
||||
var collection = database.GetCollection<BsonDocument>(name);
|
||||
var indexes = new List<CreateIndexModel<BsonDocument>>
|
||||
{
|
||||
new(new BsonDocument
|
||||
{
|
||||
{"tenant", 1},
|
||||
{"connectorId", 1},
|
||||
}, new CreateIndexOptions { Name = "orch_registry_tenant_connector", Unique = true }),
|
||||
new(new BsonDocument
|
||||
{
|
||||
{"source", 1},
|
||||
}, new CreateIndexOptions { Name = "orch_registry_source" }),
|
||||
};
|
||||
|
||||
await collection.Indexes.CreateManyAsync(indexes, cancellationToken: ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static async Task EnsureCommandsAsync(IMongoDatabase database, CancellationToken ct)
|
||||
{
|
||||
var name = MongoStorageDefaults.Collections.OrchestratorCommands;
|
||||
await EnsureCollectionAsync(database, name, ct).ConfigureAwait(false);
|
||||
|
||||
var collection = database.GetCollection<BsonDocument>(name);
|
||||
var indexes = new List<CreateIndexModel<BsonDocument>>
|
||||
{
|
||||
new(new BsonDocument
|
||||
{
|
||||
{"tenant", 1},
|
||||
{"connectorId", 1},
|
||||
{"runId", 1},
|
||||
{"sequence", 1},
|
||||
}, new CreateIndexOptions { Name = "orch_cmd_tenant_connector_run_seq" }),
|
||||
new(new BsonDocument { {"expiresAt", 1} }, new CreateIndexOptions
|
||||
{
|
||||
Name = "orch_cmd_expiresAt_ttl",
|
||||
ExpireAfter = TimeSpan.FromSeconds(0),
|
||||
})
|
||||
};
|
||||
|
||||
await collection.Indexes.CreateManyAsync(indexes, cancellationToken: ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static async Task EnsureHeartbeatsAsync(IMongoDatabase database, CancellationToken ct)
|
||||
{
|
||||
var name = MongoStorageDefaults.Collections.OrchestratorHeartbeats;
|
||||
await EnsureCollectionAsync(database, name, ct).ConfigureAwait(false);
|
||||
|
||||
var collection = database.GetCollection<BsonDocument>(name);
|
||||
var indexes = new List<CreateIndexModel<BsonDocument>>
|
||||
{
|
||||
new(new BsonDocument
|
||||
{
|
||||
{"tenant", 1},
|
||||
{"connectorId", 1},
|
||||
{"runId", 1},
|
||||
{"sequence", 1},
|
||||
}, new CreateIndexOptions { Name = "orch_hb_tenant_connector_run_seq" }),
|
||||
new(new BsonDocument { {"timestamp", -1} }, new CreateIndexOptions { Name = "orch_hb_timestamp_desc" })
|
||||
};
|
||||
|
||||
await collection.Indexes.CreateManyAsync(indexes, cancellationToken: ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static async Task EnsureCollectionAsync(IMongoDatabase database, string collectionName, CancellationToken ct)
|
||||
{
|
||||
var filter = new BsonDocument("name", collectionName);
|
||||
using var cursor = await database.ListCollectionsAsync(new ListCollectionsOptions { Filter = filter }, ct).ConfigureAwait(false);
|
||||
var exists = await cursor.AnyAsync(ct).ConfigureAwait(false);
|
||||
if (!exists)
|
||||
{
|
||||
await database.CreateCollectionAsync(collectionName, cancellationToken: ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -30,5 +30,8 @@ public static class MongoStorageDefaults
|
||||
public const string AdvisoryObservations = "advisory_observations";
|
||||
public const string AdvisoryLinksets = "advisory_linksets";
|
||||
public const string AdvisoryObservationEvents = "advisory_observation_events";
|
||||
public const string OrchestratorRegistry = "orchestrator_registry";
|
||||
public const string OrchestratorCommands = "orchestrator_commands";
|
||||
public const string OrchestratorHeartbeats = "orchestrator_heartbeats";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,18 +11,18 @@ namespace StellaOps.Concelier.Storage.Mongo.Observations;
|
||||
internal sealed class AdvisoryObservationTransportWorker : BackgroundService
|
||||
{
|
||||
private readonly IAdvisoryObservationEventOutbox _outbox;
|
||||
private readonly IAdvisoryObservationEventPublisher _publisher;
|
||||
private readonly IAdvisoryObservationEventTransport _transport;
|
||||
private readonly ILogger<AdvisoryObservationTransportWorker> _logger;
|
||||
private readonly AdvisoryObservationEventPublisherOptions _options;
|
||||
|
||||
public AdvisoryObservationTransportWorker(
|
||||
IAdvisoryObservationEventOutbox outbox,
|
||||
IAdvisoryObservationEventPublisher publisher,
|
||||
IAdvisoryObservationEventTransport transport,
|
||||
IOptions<AdvisoryObservationEventPublisherOptions> options,
|
||||
ILogger<AdvisoryObservationTransportWorker> logger)
|
||||
{
|
||||
_outbox = outbox ?? throw new ArgumentNullException(nameof(outbox));
|
||||
_publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
|
||||
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_options = options.Value;
|
||||
}
|
||||
@@ -48,7 +48,7 @@ internal sealed class AdvisoryObservationTransportWorker : BackgroundService
|
||||
|
||||
foreach (var evt in batch)
|
||||
{
|
||||
await _publisher.PublishAsync(evt, stoppingToken).ConfigureAwait(false);
|
||||
await _transport.SendAsync(evt, stoppingToken).ConfigureAwait(false);
|
||||
await _outbox.MarkPublishedAsync(evt.EventId, DateTimeOffset.UtcNow, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ using StellaOps.Concelier.Core.Observations;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Observations;
|
||||
|
||||
internal sealed class NatsAdvisoryObservationEventPublisher : IAdvisoryObservationEventPublisher
|
||||
internal sealed class NatsAdvisoryObservationEventPublisher : IAdvisoryObservationEventTransport
|
||||
{
|
||||
private readonly ILogger<NatsAdvisoryObservationEventPublisher> _logger;
|
||||
private readonly AdvisoryObservationEventPublisherOptions _options;
|
||||
@@ -26,7 +26,7 @@ internal sealed class NatsAdvisoryObservationEventPublisher : IAdvisoryObservati
|
||||
_options = options.Value;
|
||||
}
|
||||
|
||||
public async Task PublishAsync(AdvisoryObservationUpdatedEvent @event, CancellationToken cancellationToken)
|
||||
public async Task SendAsync(AdvisoryObservationUpdatedEvent @event, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_options.Enabled)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Concelier.Core.Observations;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Observations;
|
||||
|
||||
internal sealed class NullAdvisoryObservationEventTransport : IAdvisoryObservationEventTransport
|
||||
{
|
||||
public static readonly NullAdvisoryObservationEventTransport Instance = new();
|
||||
|
||||
private NullAdvisoryObservationEventTransport() { }
|
||||
|
||||
public Task SendAsync(AdvisoryObservationUpdatedEvent @event, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
public interface IOrchestratorRegistryStore
|
||||
{
|
||||
Task UpsertAsync(OrchestratorRegistryRecord record, CancellationToken cancellationToken);
|
||||
|
||||
Task<OrchestratorRegistryRecord?> GetAsync(string tenant, string connectorId, CancellationToken cancellationToken);
|
||||
|
||||
Task EnqueueCommandAsync(OrchestratorCommandRecord command, CancellationToken cancellationToken);
|
||||
|
||||
Task<IReadOnlyList<OrchestratorCommandRecord>> GetPendingCommandsAsync(
|
||||
string tenant,
|
||||
string connectorId,
|
||||
Guid runId,
|
||||
long? afterSequence,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
Task AppendHeartbeatAsync(OrchestratorHeartbeatRecord heartbeat, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using MongoDB.Driver;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
public sealed class MongoOrchestratorRegistryStore : IOrchestratorRegistryStore
|
||||
{
|
||||
private readonly IMongoCollection<OrchestratorRegistryDocument> _registry;
|
||||
private readonly IMongoCollection<OrchestratorCommandDocument> _commands;
|
||||
private readonly IMongoCollection<OrchestratorHeartbeatDocument> _heartbeats;
|
||||
|
||||
public MongoOrchestratorRegistryStore(
|
||||
IMongoCollection<OrchestratorRegistryDocument> registry,
|
||||
IMongoCollection<OrchestratorCommandDocument> commands,
|
||||
IMongoCollection<OrchestratorHeartbeatDocument> heartbeats)
|
||||
{
|
||||
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
|
||||
_commands = commands ?? throw new ArgumentNullException(nameof(commands));
|
||||
_heartbeats = heartbeats ?? throw new ArgumentNullException(nameof(heartbeats));
|
||||
}
|
||||
|
||||
public async Task UpsertAsync(OrchestratorRegistryRecord record, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(record);
|
||||
|
||||
var document = OrchestratorRegistryDocumentExtensions.FromRecord(record);
|
||||
|
||||
var filter = Builders<OrchestratorRegistryDocument>.Filter.And(
|
||||
Builders<OrchestratorRegistryDocument>.Filter.Eq(x => x.Tenant, record.Tenant),
|
||||
Builders<OrchestratorRegistryDocument>.Filter.Eq(x => x.ConnectorId, record.ConnectorId));
|
||||
|
||||
var options = new ReplaceOptions { IsUpsert = true };
|
||||
await _registry.ReplaceOneAsync(filter, document, options, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<OrchestratorRegistryRecord?> GetAsync(string tenant, string connectorId, CancellationToken cancellationToken)
|
||||
{
|
||||
var filter = Builders<OrchestratorRegistryDocument>.Filter.And(
|
||||
Builders<OrchestratorRegistryDocument>.Filter.Eq(x => x.Tenant, tenant),
|
||||
Builders<OrchestratorRegistryDocument>.Filter.Eq(x => x.ConnectorId, connectorId));
|
||||
|
||||
var document = await _registry
|
||||
.Find(filter)
|
||||
.FirstOrDefaultAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return document?.ToRecord();
|
||||
}
|
||||
|
||||
public async Task EnqueueCommandAsync(OrchestratorCommandRecord command, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(command);
|
||||
|
||||
var document = OrchestratorCommandDocumentExtensions.FromRecord(command);
|
||||
await _commands.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<OrchestratorCommandRecord>> GetPendingCommandsAsync(
|
||||
string tenant,
|
||||
string connectorId,
|
||||
Guid runId,
|
||||
long? afterSequence,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var filter = Builders<OrchestratorCommandDocument>.Filter.And(
|
||||
Builders<OrchestratorCommandDocument>.Filter.Eq(x => x.Tenant, tenant),
|
||||
Builders<OrchestratorCommandDocument>.Filter.Eq(x => x.ConnectorId, connectorId),
|
||||
Builders<OrchestratorCommandDocument>.Filter.Eq(x => x.RunId, runId));
|
||||
|
||||
if (afterSequence.HasValue)
|
||||
{
|
||||
filter &= Builders<OrchestratorCommandDocument>.Filter.Gt(x => x.Sequence, afterSequence.Value);
|
||||
}
|
||||
|
||||
var results = await _commands
|
||||
.Find(filter)
|
||||
.Sort(Builders<OrchestratorCommandDocument>.Sort.Ascending(x => x.Sequence))
|
||||
.ToListAsync(cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return results
|
||||
.Select(static c => c.ToRecord())
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
public async Task AppendHeartbeatAsync(OrchestratorHeartbeatRecord heartbeat, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(heartbeat);
|
||||
|
||||
var document = OrchestratorHeartbeatDocumentExtensions.FromRecord(heartbeat);
|
||||
await _heartbeats.InsertOneAsync(document, cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorCommandDocument
|
||||
{
|
||||
[BsonId]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("tenant")]
|
||||
public string Tenant { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("connectorId")]
|
||||
public string ConnectorId { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("runId")]
|
||||
public Guid RunId { get; set; }
|
||||
= Guid.Empty;
|
||||
|
||||
[BsonElement("sequence")]
|
||||
public long Sequence { get; set; }
|
||||
= 0;
|
||||
|
||||
[BsonElement("command")]
|
||||
public OrchestratorCommandKind Command { get; set; }
|
||||
= OrchestratorCommandKind.Pause;
|
||||
|
||||
[BsonElement("throttle")]
|
||||
public OrchestratorThrottleOverrideDocument? Throttle { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("backfill")]
|
||||
public OrchestratorBackfillRangeDocument? Backfill { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("createdAt")]
|
||||
public DateTime CreatedAt { get; set; }
|
||||
= DateTime.SpecifyKind(DateTime.UnixEpoch, DateTimeKind.Utc);
|
||||
|
||||
[BsonElement("expiresAt")]
|
||||
public DateTime? ExpiresAt { get; set; }
|
||||
= null;
|
||||
}
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorThrottleOverrideDocument
|
||||
{
|
||||
[BsonElement("rpm")]
|
||||
public int? Rpm { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("burst")]
|
||||
public int? Burst { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("cooldownSeconds")]
|
||||
public int? CooldownSeconds { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("expiresAt")]
|
||||
public DateTime? ExpiresAt { get; set; }
|
||||
= null;
|
||||
}
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorBackfillRangeDocument
|
||||
{
|
||||
[BsonElement("fromCursor")]
|
||||
public string? FromCursor { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("toCursor")]
|
||||
public string? ToCursor { get; set; }
|
||||
= null;
|
||||
}
|
||||
|
||||
internal static class OrchestratorCommandDocumentExtensions
|
||||
{
|
||||
public static OrchestratorCommandDocument FromRecord(OrchestratorCommandRecord record)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(record);
|
||||
|
||||
return new OrchestratorCommandDocument
|
||||
{
|
||||
Id = BuildId(record.Tenant, record.ConnectorId, record.RunId, record.Sequence),
|
||||
Tenant = record.Tenant,
|
||||
ConnectorId = record.ConnectorId,
|
||||
RunId = record.RunId,
|
||||
Sequence = record.Sequence,
|
||||
Command = record.Command,
|
||||
Throttle = record.Throttle is null
|
||||
? null
|
||||
: new OrchestratorThrottleOverrideDocument
|
||||
{
|
||||
Rpm = record.Throttle.Rpm,
|
||||
Burst = record.Throttle.Burst,
|
||||
CooldownSeconds = record.Throttle.CooldownSeconds,
|
||||
ExpiresAt = record.Throttle.ExpiresAt?.UtcDateTime,
|
||||
},
|
||||
Backfill = record.Backfill is null
|
||||
? null
|
||||
: new OrchestratorBackfillRangeDocument
|
||||
{
|
||||
FromCursor = record.Backfill.FromCursor,
|
||||
ToCursor = record.Backfill.ToCursor,
|
||||
},
|
||||
CreatedAt = record.CreatedAt.UtcDateTime,
|
||||
ExpiresAt = record.ExpiresAt?.UtcDateTime,
|
||||
};
|
||||
}
|
||||
|
||||
public static OrchestratorCommandRecord ToRecord(this OrchestratorCommandDocument document)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(document);
|
||||
|
||||
return new OrchestratorCommandRecord(
|
||||
document.Tenant,
|
||||
document.ConnectorId,
|
||||
document.RunId,
|
||||
document.Sequence,
|
||||
document.Command,
|
||||
document.Throttle is null
|
||||
? null
|
||||
: new OrchestratorThrottleOverride(
|
||||
document.Throttle.Rpm,
|
||||
document.Throttle.Burst,
|
||||
document.Throttle.CooldownSeconds,
|
||||
document.Throttle.ExpiresAt is null ? null : DateTime.SpecifyKind(document.Throttle.ExpiresAt.Value, DateTimeKind.Utc)),
|
||||
document.Backfill is null
|
||||
? null
|
||||
: new OrchestratorBackfillRange(document.Backfill.FromCursor, document.Backfill.ToCursor),
|
||||
DateTime.SpecifyKind(document.CreatedAt, DateTimeKind.Utc),
|
||||
document.ExpiresAt is null ? null : DateTime.SpecifyKind(document.ExpiresAt.Value, DateTimeKind.Utc));
|
||||
}
|
||||
|
||||
private static string BuildId(string tenant, string connectorId, Guid runId, long sequence)
|
||||
=> string.Create(CultureInfo.InvariantCulture, $"{tenant}:{connectorId}:{runId}:{sequence}");
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
public sealed record OrchestratorCommandRecord(
|
||||
string Tenant,
|
||||
string ConnectorId,
|
||||
Guid RunId,
|
||||
long Sequence,
|
||||
OrchestratorCommandKind Command,
|
||||
OrchestratorThrottleOverride? Throttle,
|
||||
OrchestratorBackfillRange? Backfill,
|
||||
DateTimeOffset CreatedAt,
|
||||
DateTimeOffset? ExpiresAt);
|
||||
|
||||
public enum OrchestratorCommandKind
|
||||
{
|
||||
Pause,
|
||||
Resume,
|
||||
Throttle,
|
||||
Backfill,
|
||||
}
|
||||
|
||||
public sealed record OrchestratorThrottleOverride(int? Rpm, int? Burst, int? CooldownSeconds, DateTimeOffset? ExpiresAt);
|
||||
|
||||
public sealed record OrchestratorBackfillRange(string? FromCursor, string? ToCursor);
|
||||
@@ -0,0 +1,105 @@
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorHeartbeatDocument
|
||||
{
|
||||
[BsonId]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("tenant")]
|
||||
public string Tenant { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("connectorId")]
|
||||
public string ConnectorId { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("runId")]
|
||||
public Guid RunId { get; set; }
|
||||
= Guid.Empty;
|
||||
|
||||
[BsonElement("sequence")]
|
||||
public long Sequence { get; set; }
|
||||
= 0;
|
||||
|
||||
[BsonElement("status")]
|
||||
public OrchestratorHeartbeatStatus Status { get; set; }
|
||||
= OrchestratorHeartbeatStatus.Starting;
|
||||
|
||||
[BsonElement("progress")]
|
||||
public int? Progress { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("queueDepth")]
|
||||
public int? QueueDepth { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("lastArtifactHash")]
|
||||
public string? LastArtifactHash { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("lastArtifactKind")]
|
||||
public string? LastArtifactKind { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("errorCode")]
|
||||
public string? ErrorCode { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("retryAfterSeconds")]
|
||||
public int? RetryAfterSeconds { get; set; }
|
||||
= null;
|
||||
|
||||
[BsonElement("timestamp")]
|
||||
public DateTime Timestamp { get; set; }
|
||||
= DateTime.SpecifyKind(DateTime.UnixEpoch, DateTimeKind.Utc);
|
||||
}
|
||||
|
||||
internal static class OrchestratorHeartbeatDocumentExtensions
|
||||
{
|
||||
public static OrchestratorHeartbeatDocument FromRecord(OrchestratorHeartbeatRecord record)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(record);
|
||||
|
||||
return new OrchestratorHeartbeatDocument
|
||||
{
|
||||
Id = BuildId(record.Tenant, record.ConnectorId, record.RunId, record.Sequence),
|
||||
Tenant = record.Tenant,
|
||||
ConnectorId = record.ConnectorId,
|
||||
RunId = record.RunId,
|
||||
Sequence = record.Sequence,
|
||||
Status = record.Status,
|
||||
Progress = record.Progress,
|
||||
QueueDepth = record.QueueDepth,
|
||||
LastArtifactHash = record.LastArtifactHash,
|
||||
LastArtifactKind = record.LastArtifactKind,
|
||||
ErrorCode = record.ErrorCode,
|
||||
RetryAfterSeconds = record.RetryAfterSeconds,
|
||||
Timestamp = record.TimestampUtc.UtcDateTime,
|
||||
};
|
||||
}
|
||||
|
||||
public static OrchestratorHeartbeatRecord ToRecord(this OrchestratorHeartbeatDocument document)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(document);
|
||||
|
||||
return new OrchestratorHeartbeatRecord(
|
||||
document.Tenant,
|
||||
document.ConnectorId,
|
||||
document.RunId,
|
||||
document.Sequence,
|
||||
document.Status,
|
||||
document.Progress,
|
||||
document.QueueDepth,
|
||||
document.LastArtifactHash,
|
||||
document.LastArtifactKind,
|
||||
document.ErrorCode,
|
||||
document.RetryAfterSeconds,
|
||||
DateTime.SpecifyKind(document.Timestamp, DateTimeKind.Utc));
|
||||
}
|
||||
|
||||
private static string BuildId(string tenant, string connectorId, Guid runId, long sequence)
|
||||
=> string.Create(CultureInfo.InvariantCulture, $"{tenant}:{connectorId}:{runId}:{sequence}");
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
public sealed record OrchestratorHeartbeatRecord(
|
||||
string Tenant,
|
||||
string ConnectorId,
|
||||
Guid RunId,
|
||||
long Sequence,
|
||||
OrchestratorHeartbeatStatus Status,
|
||||
int? Progress,
|
||||
int? QueueDepth,
|
||||
string? LastArtifactHash,
|
||||
string? LastArtifactKind,
|
||||
string? ErrorCode,
|
||||
int? RetryAfterSeconds,
|
||||
DateTimeOffset TimestampUtc);
|
||||
|
||||
public enum OrchestratorHeartbeatStatus
|
||||
{
|
||||
Starting,
|
||||
Running,
|
||||
Paused,
|
||||
Throttled,
|
||||
Backfill,
|
||||
Failed,
|
||||
Succeeded,
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using MongoDB.Bson.Serialization.Attributes;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorRegistryDocument
|
||||
{
|
||||
[BsonId]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("tenant")]
|
||||
public string Tenant { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("connectorId")]
|
||||
public string ConnectorId { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("source")]
|
||||
public string Source { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("capabilities")]
|
||||
public IReadOnlyCollection<string> Capabilities { get; set; } = Array.Empty<string>();
|
||||
|
||||
[BsonElement("authRef")]
|
||||
public string AuthRef { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("schedule")]
|
||||
public OrchestratorScheduleDocument Schedule { get; set; } = new();
|
||||
|
||||
[BsonElement("ratePolicy")]
|
||||
public OrchestratorRatePolicyDocument RatePolicy { get; set; } = new();
|
||||
|
||||
[BsonElement("artifactKinds")]
|
||||
public IReadOnlyCollection<string> ArtifactKinds { get; set; } = Array.Empty<string>();
|
||||
|
||||
[BsonElement("lockKey")]
|
||||
public string LockKey { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("egressGuard")]
|
||||
public OrchestratorEgressGuardDocument EgressGuard { get; set; } = new();
|
||||
|
||||
[BsonElement("createdAt")]
|
||||
public DateTime CreatedAt { get; set; }
|
||||
= DateTime.SpecifyKind(DateTime.UnixEpoch, DateTimeKind.Utc);
|
||||
|
||||
[BsonElement("updatedAt")]
|
||||
public DateTime UpdatedAt { get; set; }
|
||||
= DateTime.SpecifyKind(DateTime.UnixEpoch, DateTimeKind.Utc);
|
||||
}
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorScheduleDocument
|
||||
{
|
||||
[BsonElement("cron")]
|
||||
public string Cron { get; set; } = string.Empty;
|
||||
|
||||
[BsonElement("timeZone")]
|
||||
public string TimeZone { get; set; } = "UTC";
|
||||
|
||||
[BsonElement("maxParallelRuns")]
|
||||
public int MaxParallelRuns { get; set; }
|
||||
= 1;
|
||||
|
||||
[BsonElement("maxLagMinutes")]
|
||||
public int MaxLagMinutes { get; set; }
|
||||
= 0;
|
||||
}
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorRatePolicyDocument
|
||||
{
|
||||
[BsonElement("rpm")]
|
||||
public int Rpm { get; set; }
|
||||
= 0;
|
||||
|
||||
[BsonElement("burst")]
|
||||
public int Burst { get; set; }
|
||||
= 0;
|
||||
|
||||
[BsonElement("cooldownSeconds")]
|
||||
public int CooldownSeconds { get; set; }
|
||||
= 0;
|
||||
}
|
||||
|
||||
[BsonIgnoreExtraElements]
|
||||
public sealed class OrchestratorEgressGuardDocument
|
||||
{
|
||||
[BsonElement("allowlist")]
|
||||
public IReadOnlyCollection<string> Allowlist { get; set; } = Array.Empty<string>();
|
||||
|
||||
[BsonElement("airgapMode")]
|
||||
public bool AirgapMode { get; set; }
|
||||
= true;
|
||||
}
|
||||
|
||||
internal static class OrchestratorRegistryDocumentExtensions
|
||||
{
|
||||
public static OrchestratorRegistryDocument FromRecord(OrchestratorRegistryRecord record)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(record);
|
||||
|
||||
return new OrchestratorRegistryDocument
|
||||
{
|
||||
Id = BuildId(record.Tenant, record.ConnectorId),
|
||||
Tenant = record.Tenant,
|
||||
ConnectorId = record.ConnectorId,
|
||||
Source = record.Source,
|
||||
Capabilities = record.Capabilities,
|
||||
AuthRef = record.AuthRef,
|
||||
Schedule = new OrchestratorScheduleDocument
|
||||
{
|
||||
Cron = record.Schedule.Cron,
|
||||
TimeZone = record.Schedule.TimeZone,
|
||||
MaxParallelRuns = record.Schedule.MaxParallelRuns,
|
||||
MaxLagMinutes = record.Schedule.MaxLagMinutes,
|
||||
},
|
||||
RatePolicy = new OrchestratorRatePolicyDocument
|
||||
{
|
||||
Rpm = record.RatePolicy.Rpm,
|
||||
Burst = record.RatePolicy.Burst,
|
||||
CooldownSeconds = record.RatePolicy.CooldownSeconds,
|
||||
},
|
||||
ArtifactKinds = record.ArtifactKinds,
|
||||
LockKey = record.LockKey,
|
||||
EgressGuard = new OrchestratorEgressGuardDocument
|
||||
{
|
||||
Allowlist = record.EgressGuard.Allowlist,
|
||||
AirgapMode = record.EgressGuard.AirgapMode,
|
||||
},
|
||||
CreatedAt = record.CreatedAt.UtcDateTime,
|
||||
UpdatedAt = record.UpdatedAt.UtcDateTime,
|
||||
};
|
||||
}
|
||||
|
||||
public static OrchestratorRegistryRecord ToRecord(this OrchestratorRegistryDocument document)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(document);
|
||||
|
||||
return new OrchestratorRegistryRecord(
|
||||
document.Tenant,
|
||||
document.ConnectorId,
|
||||
document.Source,
|
||||
document.Capabilities,
|
||||
document.AuthRef,
|
||||
new OrchestratorSchedule(
|
||||
document.Schedule.Cron,
|
||||
document.Schedule.TimeZone,
|
||||
document.Schedule.MaxParallelRuns,
|
||||
document.Schedule.MaxLagMinutes),
|
||||
new OrchestratorRatePolicy(
|
||||
document.RatePolicy.Rpm,
|
||||
document.RatePolicy.Burst,
|
||||
document.RatePolicy.CooldownSeconds),
|
||||
document.ArtifactKinds,
|
||||
document.LockKey,
|
||||
new OrchestratorEgressGuard(document.EgressGuard.Allowlist, document.EgressGuard.AirgapMode),
|
||||
DateTime.SpecifyKind(document.CreatedAt, DateTimeKind.Utc),
|
||||
DateTime.SpecifyKind(document.UpdatedAt, DateTimeKind.Utc));
|
||||
}
|
||||
|
||||
private static string BuildId(string tenant, string connectorId)
|
||||
=> string.Create(CultureInfo.InvariantCulture, $"{tenant}:{connectorId}");
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
public sealed record OrchestratorRegistryRecord(
|
||||
string Tenant,
|
||||
string ConnectorId,
|
||||
string Source,
|
||||
IReadOnlyCollection<string> Capabilities,
|
||||
string AuthRef,
|
||||
OrchestratorSchedule Schedule,
|
||||
OrchestratorRatePolicy RatePolicy,
|
||||
IReadOnlyCollection<string> ArtifactKinds,
|
||||
string LockKey,
|
||||
OrchestratorEgressGuard EgressGuard,
|
||||
DateTimeOffset CreatedAt,
|
||||
DateTimeOffset UpdatedAt);
|
||||
|
||||
public sealed record OrchestratorSchedule(
|
||||
string Cron,
|
||||
string TimeZone,
|
||||
int MaxParallelRuns,
|
||||
int MaxLagMinutes);
|
||||
|
||||
public sealed record OrchestratorRatePolicy(
|
||||
int Rpm,
|
||||
int Burst,
|
||||
int CooldownSeconds);
|
||||
|
||||
public sealed record OrchestratorEgressGuard(
|
||||
IReadOnlyCollection<string> Allowlist,
|
||||
bool AirgapMode);
|
||||
@@ -23,6 +23,7 @@ using StellaOps.Concelier.Storage.Mongo.Migrations;
|
||||
using StellaOps.Concelier.Storage.Mongo.Observations;
|
||||
using StellaOps.Concelier.Core.Observations;
|
||||
using StellaOps.Concelier.Storage.Mongo.Linksets;
|
||||
using StellaOps.Concelier.Storage.Mongo.Orchestrator;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo;
|
||||
|
||||
@@ -81,17 +82,17 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IAdvisoryObservationLookup, AdvisoryObservationLookup>();
|
||||
services.AddSingleton<IAdvisoryEventRepository, MongoAdvisoryEventRepository>();
|
||||
services.AddSingleton<IAdvisoryEventLog, AdvisoryEventLog>();
|
||||
services.AddSingleton<MongoAdvisoryObservationEventPublisher>();
|
||||
services.AddSingleton<IAdvisoryObservationEventPublisher, MongoAdvisoryObservationEventPublisher>();
|
||||
services.AddSingleton<NatsAdvisoryObservationEventPublisher>();
|
||||
services.AddSingleton<IAdvisoryObservationEventPublisher>(sp =>
|
||||
services.AddSingleton<IAdvisoryObservationEventTransport>(sp =>
|
||||
{
|
||||
var options = sp.GetRequiredService<IOptions<AdvisoryObservationEventPublisherOptions>>().Value;
|
||||
if (string.Equals(options.Transport, "nats", StringComparison.OrdinalIgnoreCase))
|
||||
if (options.Enabled && string.Equals(options.Transport, "nats", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return sp.GetRequiredService<NatsAdvisoryObservationEventPublisher>();
|
||||
}
|
||||
|
||||
return sp.GetRequiredService<MongoAdvisoryObservationEventPublisher>();
|
||||
return NullAdvisoryObservationEventTransport.Instance;
|
||||
});
|
||||
services.AddSingleton<IAdvisoryObservationEventOutbox, MongoAdvisoryObservationEventOutbox>();
|
||||
services.AddSingleton<IAdvisoryRawRepository, MongoAdvisoryRawRepository>();
|
||||
@@ -129,6 +130,24 @@ public static class ServiceCollectionExtensions
|
||||
return database.GetCollection<AdvisoryObservationEventDocument>(MongoStorageDefaults.Collections.AdvisoryObservationEvents);
|
||||
});
|
||||
|
||||
services.AddSingleton<IMongoCollection<OrchestratorRegistryDocument>>(static sp =>
|
||||
{
|
||||
var database = sp.GetRequiredService<IMongoDatabase>();
|
||||
return database.GetCollection<OrchestratorRegistryDocument>(MongoStorageDefaults.Collections.OrchestratorRegistry);
|
||||
});
|
||||
|
||||
services.AddSingleton<IMongoCollection<OrchestratorCommandDocument>>(static sp =>
|
||||
{
|
||||
var database = sp.GetRequiredService<IMongoDatabase>();
|
||||
return database.GetCollection<OrchestratorCommandDocument>(MongoStorageDefaults.Collections.OrchestratorCommands);
|
||||
});
|
||||
|
||||
services.AddSingleton<IMongoCollection<OrchestratorHeartbeatDocument>>(static sp =>
|
||||
{
|
||||
var database = sp.GetRequiredService<IMongoDatabase>();
|
||||
return database.GetCollection<OrchestratorHeartbeatDocument>(MongoStorageDefaults.Collections.OrchestratorHeartbeats);
|
||||
});
|
||||
|
||||
services.AddSingleton<IMongoCollection<AdvisoryLinksetDocument>>(static sp =>
|
||||
{
|
||||
var database = sp.GetRequiredService<IMongoDatabase>();
|
||||
@@ -136,6 +155,7 @@ public static class ServiceCollectionExtensions
|
||||
});
|
||||
|
||||
services.AddHostedService<RawDocumentRetentionService>();
|
||||
services.AddHostedService<AdvisoryObservationTransportWorker>();
|
||||
|
||||
services.AddSingleton<MongoMigrationRunner>();
|
||||
services.AddSingleton<IMongoMigration, EnsureDocumentExpiryIndexesMigration>();
|
||||
@@ -149,9 +169,12 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IMongoMigration, EnsureAdvisoryEventCollectionsMigration>();
|
||||
services.AddSingleton<IMongoMigration, EnsureAdvisoryObservationEventCollectionMigration>();
|
||||
services.AddSingleton<IMongoMigration, SemVerStyleBackfillMigration>();
|
||||
services.AddSingleton<IMongoMigration, EnsureOrchestratorCollectionsMigration>();
|
||||
|
||||
services.AddSingleton<IOrchestratorRegistryStore, MongoOrchestratorRegistryStore>();
|
||||
|
||||
services.AddSingleton<IHostedService, AdvisoryObservationTransportWorker>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user