Rename Feedser to Concelier

This commit is contained in:
master
2025-10-18 20:04:15 +03:00
parent dd66f58b00
commit 89ede53cc3
1208 changed files with 4370 additions and 4370 deletions

View File

@@ -0,0 +1,32 @@
# AGENTS
## Role
Job orchestration and lifecycle. Registers job definitions, schedules execution, triggers runs, reports status for connectors and exporters.
## Scope
- Contracts: IJob (execute with CancellationToken), JobRunStatus, JobTriggerOutcome/Result.
- Registration: JobSchedulerBuilder.AddJob<T>(kind, cronExpression?, timeout?, leaseDuration?); options recorded in JobSchedulerOptions.
- Plugin host integration discovers IJob providers via registered IDependencyInjectionRoutine implementations.
- Coordination: start/stop, single-flight via storage locks/leases, run bookkeeping (status, timings, errors).
- Triggering: manual/cron/API; parameterized runs; idempotent rejection if already running.
- Surfacing: enumerate definitions, last run, recent runs, active runs to WebService endpoints.
## Participants
- WebService exposes REST endpoints for definitions, runs, active, and trigger.
- Storage.Mongo persists job definitions metadata, run documents, and leases (locks collection).
- Source connectors and Exporters implement IJob and are registered into the scheduler via DI and Plugin routines.
- Models/Merge/Export are invoked indirectly through jobs.
- Plugin host runtime loads dependency injection routines that register job definitions.
## Interfaces & contracts
- Kind naming: family:source:verb (e.g., nvd:fetch, redhat:map, export:trivy-db).
- Timeout and lease duration enforce cancellation and duplicate-prevention.
- TimeProvider used for deterministic timing in tests.
## In/Out of scope
In: job lifecycle, registration, trigger semantics, run metadata.
Out: business logic of connectors/exporters, HTTP handlers (owned by WebService).
## Observability & security expectations
- Metrics: job.run.started/succeeded/failed, job.durationMs, job.concurrent.rejected, job.alreadyRunning.
- Logs: kind, trigger, params hash, lease holder, outcome; redact params containing secrets.
- Honor CancellationToken early and often.
## Tests
- Author and review coverage in `../StellaOps.Concelier.Core.Tests`.
- Shared fixtures (e.g., `MongoIntegrationFixture`, `ConnectorTestHarness`) live in `../StellaOps.Concelier.Testing`.
- Keep fixtures deterministic; match new cases to real-world advisories or regression scenarios.

View File

@@ -0,0 +1,19 @@
using System.Collections.Immutable;
using StellaOps.Concelier.Models;
namespace StellaOps.Concelier.Core;
/// <summary>
/// Result emitted by <see cref="CanonicalMerger"/> describing the merged advisory and analytics about key decisions.
/// </summary>
public sealed record CanonicalMergeResult(Advisory Advisory, ImmutableArray<FieldDecision> Decisions);
/// <summary>
/// Describes how a particular canonical field was chosen during conflict resolution.
/// </summary>
public sealed record FieldDecision(
string Field,
string? SelectedSource,
string DecisionReason,
DateTimeOffset? SelectedModified,
ImmutableArray<string> ConsideredSources);

View File

@@ -0,0 +1,898 @@
using System.Collections.Immutable;
using System.Security.Cryptography;
using System.Text;
using StellaOps.Concelier.Models;
namespace StellaOps.Concelier.Core;
/// <summary>
/// Resolves conflicts between GHSA, NVD, and OSV advisories into a single canonical advisory following
/// <c>DEDUP_CONFLICTS_RESOLUTION_ALGO.md</c>.
/// </summary>
public sealed class CanonicalMerger
{
private const string GhsaSource = "ghsa";
private const string NvdSource = "nvd";
private const string OsvSource = "osv";
private static readonly ImmutableDictionary<string, string[]> FieldPrecedence = new Dictionary<string, string[]>(StringComparer.OrdinalIgnoreCase)
{
["title"] = new[] { GhsaSource, NvdSource, OsvSource },
["summary"] = new[] { GhsaSource, NvdSource, OsvSource },
["description"] = new[] { GhsaSource, NvdSource, OsvSource },
["language"] = new[] { GhsaSource, NvdSource, OsvSource },
["severity"] = new[] { NvdSource, GhsaSource, OsvSource },
["references"] = new[] { GhsaSource, NvdSource, OsvSource },
["credits"] = new[] { GhsaSource, OsvSource, NvdSource },
["affectedPackages"] = new[] { OsvSource, GhsaSource, NvdSource },
["cvssMetrics"] = new[] { NvdSource, GhsaSource, OsvSource },
["cwes"] = new[] { NvdSource, GhsaSource, OsvSource },
}.ToImmutableDictionary(StringComparer.OrdinalIgnoreCase);
private static readonly ImmutableHashSet<string> FreshnessSensitiveFields = ImmutableHashSet.Create(
StringComparer.OrdinalIgnoreCase,
"title",
"summary",
"description",
"references",
"credits",
"affectedPackages");
private static readonly ImmutableDictionary<string, int> SourceOrder = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase)
{
[GhsaSource] = 0,
[NvdSource] = 1,
[OsvSource] = 2,
}.ToImmutableDictionary(StringComparer.OrdinalIgnoreCase);
private readonly TimeProvider _timeProvider;
private readonly TimeSpan _freshnessThreshold;
public CanonicalMerger(TimeProvider? timeProvider = null, TimeSpan? freshnessThreshold = null)
{
_timeProvider = timeProvider ?? TimeProvider.System;
_freshnessThreshold = freshnessThreshold ?? TimeSpan.FromHours(48);
}
public CanonicalMergeResult Merge(string advisoryKey, Advisory? ghsa, Advisory? nvd, Advisory? osv)
{
ArgumentException.ThrowIfNullOrWhiteSpace(advisoryKey);
var candidates = BuildCandidates(ghsa, nvd, osv);
if (candidates.Count == 0)
{
throw new ArgumentException("At least one advisory must be provided.", nameof(advisoryKey));
}
var now = _timeProvider.GetUtcNow();
var decisions = new List<FieldDecision>();
var provenanceSet = new HashSet<AdvisoryProvenance>();
foreach (var candidate in candidates)
{
foreach (var existingProvenance in candidate.Advisory.Provenance)
{
provenanceSet.Add(existingProvenance);
}
}
var titleSelection = SelectStringField("title", candidates, advisory => advisory.Title, isFreshnessSensitive: true);
if (titleSelection.HasValue)
{
decisions.Add(titleSelection.Decision);
AddMergeProvenance(provenanceSet, titleSelection, now, ProvenanceFieldMasks.Advisory);
}
var summarySelection = SelectStringField("summary", candidates, advisory => advisory.Summary, isFreshnessSensitive: true);
if (summarySelection.HasValue)
{
decisions.Add(summarySelection.Decision);
AddMergeProvenance(provenanceSet, summarySelection, now, ProvenanceFieldMasks.Advisory);
}
var descriptionSelection = SelectStringField("description", candidates, advisory => advisory.Description, isFreshnessSensitive: true);
if (descriptionSelection.HasValue)
{
decisions.Add(descriptionSelection.Decision);
AddMergeProvenance(provenanceSet, descriptionSelection, now, ProvenanceFieldMasks.Advisory);
}
var languageSelection = SelectStringField("language", candidates, advisory => advisory.Language, isFreshnessSensitive: false);
if (languageSelection.HasValue)
{
decisions.Add(languageSelection.Decision);
AddMergeProvenance(provenanceSet, languageSelection, now, ProvenanceFieldMasks.Advisory);
}
var topLevelSeveritySelection = SelectStringField("severity", candidates, advisory => advisory.Severity, isFreshnessSensitive: false);
if (topLevelSeveritySelection.HasValue)
{
decisions.Add(topLevelSeveritySelection.Decision);
AddMergeProvenance(provenanceSet, topLevelSeveritySelection, now, ProvenanceFieldMasks.Advisory);
}
var aliases = MergeAliases(candidates);
var creditsResult = MergeCredits(candidates);
if (creditsResult.UnionDecision is not null)
{
decisions.Add(creditsResult.UnionDecision);
}
decisions.AddRange(creditsResult.Decisions);
var referencesResult = MergeReferences(candidates);
if (referencesResult.UnionDecision is not null)
{
decisions.Add(referencesResult.UnionDecision);
}
decisions.AddRange(referencesResult.Decisions);
var weaknessesResult = MergeWeaknesses(candidates, now);
decisions.AddRange(weaknessesResult.Decisions);
foreach (var weaknessProvenance in weaknessesResult.AdditionalProvenance)
{
provenanceSet.Add(weaknessProvenance);
}
var packagesResult = MergePackages(candidates, now);
decisions.AddRange(packagesResult.Decisions);
foreach (var packageProvenance in packagesResult.AdditionalProvenance)
{
provenanceSet.Add(packageProvenance);
}
var metricsResult = MergeCvssMetrics(candidates);
if (metricsResult.Decision is not null)
{
decisions.Add(metricsResult.Decision);
}
var exploitKnown = candidates.Any(candidate => candidate.Advisory.ExploitKnown);
var published = candidates
.Select(candidate => candidate.Advisory.Published)
.Where(static value => value.HasValue)
.Select(static value => value!.Value)
.DefaultIfEmpty()
.Min();
var modified = candidates
.Select(candidate => candidate.Advisory.Modified)
.Where(static value => value.HasValue)
.Select(static value => value!.Value)
.DefaultIfEmpty()
.Max();
var title = titleSelection.Value ?? ghsa?.Title ?? nvd?.Title ?? osv?.Title ?? advisoryKey;
var summary = summarySelection.Value ?? ghsa?.Summary ?? nvd?.Summary ?? osv?.Summary;
var description = descriptionSelection.Value ?? ghsa?.Description ?? nvd?.Description ?? osv?.Description;
var language = languageSelection.Value ?? ghsa?.Language ?? nvd?.Language ?? osv?.Language;
var severity = topLevelSeveritySelection.Value ?? metricsResult.CanonicalSeverity ?? ghsa?.Severity ?? nvd?.Severity ?? osv?.Severity;
var canonicalMetricId = metricsResult.CanonicalMetricId ?? ghsa?.CanonicalMetricId ?? nvd?.CanonicalMetricId ?? osv?.CanonicalMetricId;
if (string.IsNullOrWhiteSpace(title))
{
title = advisoryKey;
}
var provenance = provenanceSet
.OrderBy(static p => p.Source, StringComparer.Ordinal)
.ThenBy(static p => p.Kind, StringComparer.Ordinal)
.ThenBy(static p => p.RecordedAt)
.ToImmutableArray();
var advisory = new Advisory(
advisoryKey,
title,
summary,
language,
published == DateTimeOffset.MinValue ? null : published,
modified == DateTimeOffset.MinValue ? null : modified,
severity,
exploitKnown,
aliases,
creditsResult.Credits,
referencesResult.References,
packagesResult.Packages,
metricsResult.Metrics,
provenance,
description,
weaknessesResult.Weaknesses,
canonicalMetricId);
return new CanonicalMergeResult(
advisory,
decisions
.OrderBy(static d => d.Field, StringComparer.Ordinal)
.ThenBy(static d => d.SelectedSource, StringComparer.Ordinal)
.ToImmutableArray());
}
private ImmutableArray<string> MergeAliases(List<AdvisorySnapshot> candidates)
{
var set = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
foreach (var candidate in candidates)
{
foreach (var alias in candidate.Advisory.Aliases)
{
if (!string.IsNullOrWhiteSpace(alias))
{
set.Add(alias);
}
}
}
return set.Count == 0 ? ImmutableArray<string>.Empty : set.OrderBy(static value => value, StringComparer.Ordinal).ToImmutableArray();
}
private CreditsMergeResult MergeCredits(List<AdvisorySnapshot> candidates)
{
var precedence = GetPrecedence("credits");
var isFreshnessSensitive = FreshnessSensitiveFields.Contains("credits");
var map = new Dictionary<string, CreditSelection>(StringComparer.OrdinalIgnoreCase);
var considered = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var decisions = new List<FieldDecision>();
foreach (var candidate in candidates)
{
foreach (var credit in candidate.Advisory.Credits)
{
var key = $"{credit.DisplayName}|{credit.Role}";
considered.Add(candidate.Source);
if (!map.TryGetValue(key, out var existing))
{
map[key] = new CreditSelection(credit, candidate.Source, candidate.Modified);
continue;
}
var candidateRank = GetRank(candidate.Source, precedence);
var existingRank = GetRank(existing.Source, precedence);
var reason = EvaluateReplacementReason(candidateRank, existingRank, candidate.Modified, existing.Modified, isFreshnessSensitive);
if (reason is null)
{
continue;
}
var consideredSources = new HashSet<string>(StringComparer.OrdinalIgnoreCase)
{
existing.Source,
candidate.Source,
};
map[key] = new CreditSelection(credit, candidate.Source, candidate.Modified);
decisions.Add(new FieldDecision(
Field: $"credits[{key}]",
SelectedSource: candidate.Source,
DecisionReason: reason,
SelectedModified: candidate.Modified,
ConsideredSources: consideredSources.OrderBy(static value => value, StringComparer.OrdinalIgnoreCase).ToImmutableArray()));
}
}
var credits = map.Values.Select(static s => s.Credit).ToImmutableArray();
FieldDecision? decision = null;
if (considered.Count > 0)
{
decision = new FieldDecision(
Field: "credits",
SelectedSource: null,
DecisionReason: "union",
SelectedModified: null,
ConsideredSources: considered.OrderBy(static value => value, StringComparer.OrdinalIgnoreCase).ToImmutableArray());
}
return new CreditsMergeResult(credits, decision, decisions);
}
private ReferencesMergeResult MergeReferences(List<AdvisorySnapshot> candidates)
{
var precedence = GetPrecedence("references");
var isFreshnessSensitive = FreshnessSensitiveFields.Contains("references");
var map = new Dictionary<string, ReferenceSelection>(StringComparer.OrdinalIgnoreCase);
var considered = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var decisions = new List<FieldDecision>();
foreach (var candidate in candidates)
{
foreach (var reference in candidate.Advisory.References)
{
if (string.IsNullOrWhiteSpace(reference.Url))
{
continue;
}
var key = NormalizeReferenceKey(reference.Url);
considered.Add(candidate.Source);
if (!map.TryGetValue(key, out var existing))
{
map[key] = new ReferenceSelection(reference, candidate.Source, candidate.Modified);
continue;
}
var candidateRank = GetRank(candidate.Source, precedence);
var existingRank = GetRank(existing.Source, precedence);
var reason = EvaluateReplacementReason(candidateRank, existingRank, candidate.Modified, existing.Modified, isFreshnessSensitive);
if (reason is null)
{
continue;
}
var consideredSources = new HashSet<string>(StringComparer.OrdinalIgnoreCase)
{
existing.Source,
candidate.Source,
};
map[key] = new ReferenceSelection(reference, candidate.Source, candidate.Modified);
decisions.Add(new FieldDecision(
Field: $"references[{key}]",
SelectedSource: candidate.Source,
DecisionReason: reason,
SelectedModified: candidate.Modified,
ConsideredSources: consideredSources.OrderBy(static value => value, StringComparer.OrdinalIgnoreCase).ToImmutableArray()));
}
}
var references = map.Values.Select(static s => s.Reference).ToImmutableArray();
FieldDecision? decision = null;
if (considered.Count > 0)
{
decision = new FieldDecision(
Field: "references",
SelectedSource: null,
DecisionReason: "union",
SelectedModified: null,
ConsideredSources: considered.OrderBy(static value => value, StringComparer.OrdinalIgnoreCase).ToImmutableArray());
}
return new ReferencesMergeResult(references, decision, decisions);
}
private PackagesMergeResult MergePackages(List<AdvisorySnapshot> candidates, DateTimeOffset now)
{
var precedence = GetPrecedence("affectedPackages");
var isFreshnessSensitive = FreshnessSensitiveFields.Contains("affectedPackages");
var map = new Dictionary<string, PackageSelection>(StringComparer.OrdinalIgnoreCase);
var decisions = new List<FieldDecision>();
var additionalProvenance = new List<AdvisoryProvenance>();
foreach (var candidate in candidates)
{
foreach (var package in candidate.Advisory.AffectedPackages)
{
var key = CreatePackageKey(package);
var consideredSources = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { candidate.Source };
if (!map.TryGetValue(key, out var existing))
{
var enriched = AppendMergeProvenance(package, candidate.Source, "precedence", now);
additionalProvenance.Add(enriched.MergeProvenance);
map[key] = new PackageSelection(enriched.Package, candidate.Source, candidate.Modified);
decisions.Add(new FieldDecision(
Field: $"affectedPackages[{key}]",
SelectedSource: candidate.Source,
DecisionReason: "precedence",
SelectedModified: candidate.Modified,
ConsideredSources: consideredSources.ToImmutableArray()));
continue;
}
consideredSources.Add(existing.Source);
var candidateRank = GetRank(candidate.Source, precedence);
var existingRank = GetRank(existing.Source, precedence);
var reason = EvaluateReplacementReason(candidateRank, existingRank, candidate.Modified, existing.Modified, isFreshnessSensitive);
if (reason is null)
{
continue;
}
var enrichedPackage = AppendMergeProvenance(package, candidate.Source, reason, now);
additionalProvenance.Add(enrichedPackage.MergeProvenance);
map[key] = new PackageSelection(enrichedPackage.Package, candidate.Source, candidate.Modified);
decisions.Add(new FieldDecision(
Field: $"affectedPackages[{key}]",
SelectedSource: candidate.Source,
DecisionReason: reason,
SelectedModified: candidate.Modified,
ConsideredSources: consideredSources.ToImmutableArray()));
}
}
var packages = map.Values.Select(static s => s.Package).ToImmutableArray();
return new PackagesMergeResult(packages, decisions, additionalProvenance);
}
private WeaknessMergeResult MergeWeaknesses(List<AdvisorySnapshot> candidates, DateTimeOffset now)
{
var precedence = GetPrecedence("cwes");
var map = new Dictionary<string, WeaknessSelection>(StringComparer.OrdinalIgnoreCase);
var decisions = new List<FieldDecision>();
var additionalProvenance = new List<AdvisoryProvenance>();
foreach (var candidate in candidates)
{
var candidateWeaknesses = candidate.Advisory.Cwes.IsDefaultOrEmpty
? ImmutableArray<AdvisoryWeakness>.Empty
: candidate.Advisory.Cwes;
foreach (var weakness in candidateWeaknesses)
{
var key = $"{weakness.Taxonomy}|{weakness.Identifier}";
var consideredSources = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { candidate.Source };
if (!map.TryGetValue(key, out var existing))
{
var enriched = AppendWeaknessProvenance(weakness, candidate.Source, "precedence", now);
map[key] = new WeaknessSelection(enriched.Weakness, candidate.Source, candidate.Modified);
additionalProvenance.Add(enriched.MergeProvenance);
decisions.Add(new FieldDecision(
Field: $"cwes[{key}]",
SelectedSource: candidate.Source,
DecisionReason: "precedence",
SelectedModified: candidate.Modified,
ConsideredSources: consideredSources.ToImmutableArray()));
continue;
}
consideredSources.Add(existing.Source);
var candidateRank = GetRank(candidate.Source, precedence);
var existingRank = GetRank(existing.Source, precedence);
var decisionReason = string.Empty;
var shouldReplace = false;
if (candidateRank < existingRank)
{
shouldReplace = true;
decisionReason = "precedence";
}
else if (candidateRank == existingRank && candidate.Modified > existing.Modified)
{
shouldReplace = true;
decisionReason = "tie_breaker";
}
if (!shouldReplace)
{
continue;
}
var enrichedWeakness = AppendWeaknessProvenance(weakness, candidate.Source, decisionReason, now);
map[key] = new WeaknessSelection(enrichedWeakness.Weakness, candidate.Source, candidate.Modified);
additionalProvenance.Add(enrichedWeakness.MergeProvenance);
decisions.Add(new FieldDecision(
Field: $"cwes[{key}]",
SelectedSource: candidate.Source,
DecisionReason: decisionReason,
SelectedModified: candidate.Modified,
ConsideredSources: consideredSources.ToImmutableArray()));
}
}
var mergedWeaknesses = map.Values
.Select(static value => value.Weakness)
.OrderBy(static value => value.Taxonomy, StringComparer.Ordinal)
.ThenBy(static value => value.Identifier, StringComparer.Ordinal)
.ThenBy(static value => value.Name, StringComparer.Ordinal)
.ToImmutableArray();
return new WeaknessMergeResult(mergedWeaknesses, decisions, additionalProvenance);
}
private CvssMergeResult MergeCvssMetrics(List<AdvisorySnapshot> candidates)
{
var precedence = GetPrecedence("cvssMetrics");
var map = new Dictionary<string, MetricSelection>(StringComparer.OrdinalIgnoreCase);
var considered = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
foreach (var candidate in candidates)
{
foreach (var metric in candidate.Advisory.CvssMetrics)
{
var key = $"{metric.Version}|{metric.Vector}";
considered.Add(candidate.Source);
if (!map.TryGetValue(key, out var existing))
{
map[key] = new MetricSelection(metric, candidate.Source, candidate.Modified);
continue;
}
var candidateRank = GetRank(candidate.Source, precedence);
var existingRank = GetRank(existing.Source, precedence);
if (candidateRank < existingRank ||
(candidateRank == existingRank && candidate.Modified > existing.Modified))
{
map[key] = new MetricSelection(metric, candidate.Source, candidate.Modified);
}
}
}
var orderedMetrics = map
.Values
.OrderBy(selection => GetRank(selection.Source, precedence))
.ThenByDescending(selection => selection.Modified)
.Select(static selection => selection.Metric)
.ToImmutableArray();
FieldDecision? decision = null;
string? canonicalMetricId = null;
string? canonicalSelectedSource = null;
DateTimeOffset? canonicalSelectedModified = null;
var canonical = orderedMetrics.FirstOrDefault();
if (canonical is not null)
{
canonicalMetricId = $"{canonical.Version}|{canonical.Vector}";
if (map.TryGetValue(canonicalMetricId, out var selection))
{
canonicalSelectedSource = selection.Source;
canonicalSelectedModified = selection.Modified;
}
}
if (considered.Count > 0)
{
decision = new FieldDecision(
Field: "cvssMetrics",
SelectedSource: canonicalSelectedSource,
DecisionReason: "precedence",
SelectedModified: canonicalSelectedModified,
ConsideredSources: considered.OrderBy(static value => value, StringComparer.OrdinalIgnoreCase).ToImmutableArray());
}
var severity = canonical?.BaseSeverity;
return new CvssMergeResult(orderedMetrics, severity, canonicalMetricId, decision);
}
private static string CreatePackageKey(AffectedPackage package)
=> string.Join('|', package.Type ?? string.Empty, package.Identifier ?? string.Empty, package.Platform ?? string.Empty);
private static (AffectedPackage Package, AdvisoryProvenance MergeProvenance) AppendMergeProvenance(
AffectedPackage package,
string source,
string decisionReason,
DateTimeOffset recordedAt)
{
var provenance = new AdvisoryProvenance(
source,
kind: "merge",
value: CreatePackageKey(package),
recordedAt: recordedAt,
fieldMask: new[] { ProvenanceFieldMasks.AffectedPackages },
decisionReason: decisionReason);
var provenanceList = package.Provenance.ToBuilder();
provenanceList.Add(provenance);
var packageWithProvenance = new AffectedPackage(
package.Type,
package.Identifier,
package.Platform,
package.VersionRanges,
package.Statuses,
provenanceList,
package.NormalizedVersions);
return (packageWithProvenance, provenance);
}
private static string NormalizeReferenceKey(string url)
{
var trimmed = url?.Trim();
if (string.IsNullOrEmpty(trimmed))
{
return string.Empty;
}
if (!Uri.TryCreate(trimmed, UriKind.Absolute, out var uri))
{
return trimmed;
}
var builder = new StringBuilder();
var scheme = uri.Scheme.Equals("http", StringComparison.OrdinalIgnoreCase) ? "https" : uri.Scheme.ToLowerInvariant();
builder.Append(scheme).Append("://").Append(uri.Host.ToLowerInvariant());
if (!uri.IsDefaultPort)
{
builder.Append(':').Append(uri.Port);
}
var path = uri.AbsolutePath;
if (!string.IsNullOrEmpty(path) && path != "/")
{
if (!path.StartsWith('/'))
{
builder.Append('/');
}
builder.Append(path.TrimEnd('/'));
}
var query = uri.Query;
if (!string.IsNullOrEmpty(query))
{
var parameters = query.TrimStart('?')
.Split('&', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
Array.Sort(parameters, StringComparer.Ordinal);
builder.Append('?').Append(string.Join('&', parameters));
}
return builder.ToString();
}
private string? EvaluateReplacementReason(int candidateRank, int existingRank, DateTimeOffset candidateModified, DateTimeOffset existingModified, bool isFreshnessSensitive)
{
if (candidateRank < existingRank)
{
return "precedence";
}
if (isFreshnessSensitive && candidateRank > existingRank && candidateModified - existingModified >= _freshnessThreshold)
{
return "freshness_override";
}
if (candidateRank == existingRank && candidateModified > existingModified)
{
return "tie_breaker";
}
return null;
}
private static (AdvisoryWeakness Weakness, AdvisoryProvenance MergeProvenance) AppendWeaknessProvenance(
AdvisoryWeakness weakness,
string source,
string decisionReason,
DateTimeOffset recordedAt)
{
var provenance = new AdvisoryProvenance(
source,
kind: "merge",
value: $"{weakness.Taxonomy}:{weakness.Identifier}",
recordedAt: recordedAt,
fieldMask: new[] { ProvenanceFieldMasks.Weaknesses },
decisionReason: decisionReason);
var provenanceList = weakness.Provenance.IsDefaultOrEmpty
? ImmutableArray.Create(provenance)
: weakness.Provenance.Add(provenance);
var weaknessWithProvenance = new AdvisoryWeakness(
weakness.Taxonomy,
weakness.Identifier,
weakness.Name,
weakness.Uri,
provenanceList);
return (weaknessWithProvenance, provenance);
}
private FieldSelection<string> SelectStringField(
string field,
List<AdvisorySnapshot> candidates,
Func<Advisory, string?> selector,
bool isFreshnessSensitive)
{
var precedence = GetPrecedence(field);
var valueCandidates = new List<ValueCandidate>();
foreach (var candidate in candidates)
{
var value = Validation.TrimToNull(selector(candidate.Advisory));
if (!string.IsNullOrEmpty(value))
{
valueCandidates.Add(new ValueCandidate(candidate, value));
}
}
if (valueCandidates.Count == 0)
{
return FieldSelection<string>.Empty;
}
var consideredSources = valueCandidates
.Select(vc => vc.Candidate.Source)
.Distinct(StringComparer.OrdinalIgnoreCase)
.OrderBy(static source => source, StringComparer.OrdinalIgnoreCase)
.ToImmutableArray();
var best = valueCandidates
.OrderBy(vc => GetRank(vc.Candidate.Source, precedence))
.ThenByDescending(vc => vc.Candidate.Modified)
.First();
var decisionReason = "precedence";
if (isFreshnessSensitive)
{
var freshnessOverride = valueCandidates
.Where(vc => GetRank(vc.Candidate.Source, precedence) > GetRank(best.Candidate.Source, precedence))
.Where(vc => vc.Candidate.Modified - best.Candidate.Modified >= _freshnessThreshold)
.OrderByDescending(vc => vc.Candidate.Modified)
.ThenBy(vc => GetRank(vc.Candidate.Source, precedence))
.FirstOrDefault();
if (freshnessOverride is not null)
{
best = freshnessOverride;
decisionReason = "freshness_override";
}
}
var sameRankCandidates = valueCandidates
.Where(vc => GetRank(vc.Candidate.Source, precedence) == GetRank(best.Candidate.Source, precedence))
.ToList();
if (sameRankCandidates.Count > 1)
{
var tied = sameRankCandidates
.OrderBy(vc => vc.Value.Length)
.ThenBy(vc => vc.Value, StringComparer.Ordinal)
.ThenBy(vc => ComputeStableHash(vc.Value))
.First();
if (!ReferenceEquals(tied, best))
{
best = tied;
decisionReason = "tie_breaker";
}
}
var decision = new FieldDecision(
field,
best.Candidate.Source,
decisionReason,
best.Candidate.Modified,
consideredSources);
return new FieldSelection<string>(field, best.Value, best.Candidate, decisionReason, decision);
}
private static void AddMergeProvenance(
HashSet<AdvisoryProvenance> provenanceSet,
FieldSelection<string> selection,
DateTimeOffset recordedAt,
string fieldMask)
{
if (!selection.HasValue || selection.Winner is null)
{
return;
}
var provenance = new AdvisoryProvenance(
selection.Winner.Source,
kind: "merge",
value: selection.Field,
recordedAt: recordedAt,
fieldMask: new[] { fieldMask },
decisionReason: selection.DecisionReason);
provenanceSet.Add(provenance);
}
private static List<AdvisorySnapshot> BuildCandidates(Advisory? ghsa, Advisory? nvd, Advisory? osv)
{
var list = new List<AdvisorySnapshot>(capacity: 3);
if (ghsa is not null)
{
list.Add(CreateSnapshot(GhsaSource, ghsa));
}
if (nvd is not null)
{
list.Add(CreateSnapshot(NvdSource, nvd));
}
if (osv is not null)
{
list.Add(CreateSnapshot(OsvSource, osv));
}
return list;
}
private static AdvisorySnapshot CreateSnapshot(string source, Advisory advisory)
{
var modified = advisory.Modified
?? advisory.Published
?? DateTimeOffset.UnixEpoch;
return new AdvisorySnapshot(source, advisory, modified);
}
private static ImmutableDictionary<string, int> GetPrecedence(string field)
{
if (FieldPrecedence.TryGetValue(field, out var order))
{
return order
.Select((source, index) => (source, index))
.ToImmutableDictionary(item => item.source, item => item.index, StringComparer.OrdinalIgnoreCase);
}
return SourceOrder;
}
private static int GetRank(string source, ImmutableDictionary<string, int> precedence)
=> precedence.TryGetValue(source, out var rank) ? rank : int.MaxValue;
private static string ComputeStableHash(string value)
{
var bytes = Encoding.UTF8.GetBytes(value);
var hash = SHA256.HashData(bytes);
return Convert.ToHexString(hash);
}
private sealed class FieldSelection<T>
{
public FieldSelection(string field, T? value, AdvisorySnapshot? winner, string decisionReason, FieldDecision decision)
{
Field = field;
Value = value;
Winner = winner;
DecisionReason = decisionReason;
Decision = decision;
}
public string Field { get; }
public T? Value { get; }
public AdvisorySnapshot? Winner { get; }
public string DecisionReason { get; }
public FieldDecision Decision { get; }
public bool HasValue => Winner is not null;
public static FieldSelection<T> Empty { get; } = new FieldSelection<T>(
string.Empty,
default,
null,
string.Empty,
new FieldDecision(string.Empty, null, string.Empty, null, ImmutableArray<string>.Empty));
}
private sealed record AdvisorySnapshot(string Source, Advisory Advisory, DateTimeOffset Modified);
private sealed record ValueCandidate(AdvisorySnapshot Candidate, string Value);
private readonly record struct PackageSelection(AffectedPackage Package, string Source, DateTimeOffset Modified);
private readonly record struct ReferenceSelection(AdvisoryReference Reference, string Source, DateTimeOffset Modified);
private readonly record struct CreditSelection(AdvisoryCredit Credit, string Source, DateTimeOffset Modified);
private readonly record struct MetricSelection(CvssMetric Metric, string Source, DateTimeOffset Modified);
private readonly record struct WeaknessSelection(AdvisoryWeakness Weakness, string Source, DateTimeOffset Modified);
private readonly record struct CreditsMergeResult(ImmutableArray<AdvisoryCredit> Credits, FieldDecision? UnionDecision, IReadOnlyList<FieldDecision> Decisions);
private readonly record struct ReferencesMergeResult(ImmutableArray<AdvisoryReference> References, FieldDecision? UnionDecision, IReadOnlyList<FieldDecision> Decisions);
private readonly record struct PackagesMergeResult(
ImmutableArray<AffectedPackage> Packages,
IReadOnlyList<FieldDecision> Decisions,
IReadOnlyList<AdvisoryProvenance> AdditionalProvenance);
private readonly record struct WeaknessMergeResult(
ImmutableArray<AdvisoryWeakness> Weaknesses,
IReadOnlyList<FieldDecision> Decisions,
IReadOnlyList<AdvisoryProvenance> AdditionalProvenance);
private readonly record struct CvssMergeResult(
ImmutableArray<CvssMetric> Metrics,
string? CanonicalSeverity,
string? CanonicalMetricId,
FieldDecision? Decision);
}

View File

@@ -0,0 +1,6 @@
namespace StellaOps.Concelier.Core.Jobs;
public interface IJob
{
Task ExecuteAsync(JobExecutionContext context, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,18 @@
namespace StellaOps.Concelier.Core.Jobs;
public interface IJobCoordinator
{
Task<JobTriggerResult> TriggerAsync(string kind, IReadOnlyDictionary<string, object?>? parameters, string trigger, CancellationToken cancellationToken);
Task<IReadOnlyList<JobDefinition>> GetDefinitionsAsync(CancellationToken cancellationToken);
Task<IReadOnlyList<JobRunSnapshot>> GetRecentRunsAsync(string? kind, int limit, CancellationToken cancellationToken);
Task<IReadOnlyList<JobRunSnapshot>> GetActiveRunsAsync(CancellationToken cancellationToken);
Task<JobRunSnapshot?> GetRunAsync(Guid runId, CancellationToken cancellationToken);
Task<JobRunSnapshot?> GetLastRunAsync(string kind, CancellationToken cancellationToken);
Task<IReadOnlyDictionary<string, JobRunSnapshot>> GetLastRunsAsync(IEnumerable<string> kinds, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,20 @@
namespace StellaOps.Concelier.Core.Jobs;
public interface IJobStore
{
Task<JobRunSnapshot> CreateAsync(JobRunCreateRequest request, CancellationToken cancellationToken);
Task<JobRunSnapshot?> TryStartAsync(Guid runId, DateTimeOffset startedAt, CancellationToken cancellationToken);
Task<JobRunSnapshot?> TryCompleteAsync(Guid runId, JobRunCompletion completion, CancellationToken cancellationToken);
Task<JobRunSnapshot?> FindAsync(Guid runId, CancellationToken cancellationToken);
Task<IReadOnlyList<JobRunSnapshot>> GetRecentRunsAsync(string? kind, int limit, CancellationToken cancellationToken);
Task<IReadOnlyList<JobRunSnapshot>> GetActiveRunsAsync(CancellationToken cancellationToken);
Task<JobRunSnapshot?> GetLastRunAsync(string kind, CancellationToken cancellationToken);
Task<IReadOnlyDictionary<string, JobRunSnapshot>> GetLastRunsAsync(IEnumerable<string> kinds, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,10 @@
namespace StellaOps.Concelier.Core.Jobs;
public interface ILeaseStore
{
Task<JobLease?> TryAcquireAsync(string key, string holder, TimeSpan leaseDuration, DateTimeOffset now, CancellationToken cancellationToken);
Task<JobLease?> HeartbeatAsync(string key, string holder, TimeSpan leaseDuration, DateTimeOffset now, CancellationToken cancellationToken);
Task<bool> ReleaseAsync(string key, string holder, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,635 @@
using System.Collections;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Globalization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace StellaOps.Concelier.Core.Jobs;
public sealed class JobCoordinator : IJobCoordinator
{
private readonly JobSchedulerOptions _options;
private readonly IJobStore _jobStore;
private readonly ILeaseStore _leaseStore;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<JobCoordinator> _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly TimeProvider _timeProvider;
private readonly JobDiagnostics _diagnostics;
private readonly string _holderId;
public JobCoordinator(
IOptions<JobSchedulerOptions> optionsAccessor,
IJobStore jobStore,
ILeaseStore leaseStore,
IServiceScopeFactory scopeFactory,
ILogger<JobCoordinator> logger,
ILoggerFactory loggerFactory,
TimeProvider timeProvider,
JobDiagnostics diagnostics)
{
_options = (optionsAccessor ?? throw new ArgumentNullException(nameof(optionsAccessor))).Value;
_jobStore = jobStore ?? throw new ArgumentNullException(nameof(jobStore));
_leaseStore = leaseStore ?? throw new ArgumentNullException(nameof(leaseStore));
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
_holderId = BuildHolderId();
}
public async Task<JobTriggerResult> TriggerAsync(string kind, IReadOnlyDictionary<string, object?>? parameters, string trigger, CancellationToken cancellationToken)
{
using var triggerActivity = _diagnostics.StartTriggerActivity(kind, trigger);
if (!_options.Definitions.TryGetValue(kind, out var definition))
{
var result = JobTriggerResult.NotFound($"Job kind '{kind}' is not registered.");
triggerActivity?.SetStatus(ActivityStatusCode.Error, result.ErrorMessage);
triggerActivity?.SetTag("job.trigger.outcome", result.Outcome.ToString());
_diagnostics.RecordTriggerRejected(kind, trigger, "not_found");
return result;
}
triggerActivity?.SetTag("job.enabled", definition.Enabled);
triggerActivity?.SetTag("job.timeout_seconds", definition.Timeout.TotalSeconds);
triggerActivity?.SetTag("job.lease_seconds", definition.LeaseDuration.TotalSeconds);
if (!definition.Enabled)
{
var result = JobTriggerResult.Disabled($"Job kind '{kind}' is disabled.");
triggerActivity?.SetStatus(ActivityStatusCode.Ok, "disabled");
triggerActivity?.SetTag("job.trigger.outcome", result.Outcome.ToString());
_diagnostics.RecordTriggerRejected(kind, trigger, "disabled");
return result;
}
parameters ??= new Dictionary<string, object?>();
var parameterSnapshot = parameters.Count == 0
? new Dictionary<string, object?>(StringComparer.Ordinal)
: new Dictionary<string, object?>(parameters, StringComparer.Ordinal);
if (!TryNormalizeParameters(parameterSnapshot, out var normalizedParameters, out var parameterError))
{
var message = string.IsNullOrWhiteSpace(parameterError)
? "Job trigger parameters contain unsupported values."
: parameterError;
triggerActivity?.SetStatus(ActivityStatusCode.Error, message);
triggerActivity?.SetTag("job.trigger.outcome", JobTriggerOutcome.InvalidParameters.ToString());
_diagnostics.RecordTriggerRejected(kind, trigger, "invalid_parameters");
return JobTriggerResult.InvalidParameters(message);
}
parameterSnapshot = normalizedParameters;
string? parametersHash;
try
{
parametersHash = JobParametersHasher.Compute(parameterSnapshot);
}
catch (Exception ex)
{
var message = $"Job trigger parameters cannot be serialized: {ex.Message}";
triggerActivity?.SetStatus(ActivityStatusCode.Error, message);
triggerActivity?.SetTag("job.trigger.outcome", JobTriggerOutcome.InvalidParameters.ToString());
_diagnostics.RecordTriggerRejected(kind, trigger, "invalid_parameters");
_logger.LogWarning(ex, "Failed to serialize parameters for job {Kind}", kind);
return JobTriggerResult.InvalidParameters(message);
}
triggerActivity?.SetTag("job.parameters_count", parameterSnapshot.Count);
var now = _timeProvider.GetUtcNow();
var leaseDuration = definition.LeaseDuration <= TimeSpan.Zero ? _options.DefaultLeaseDuration : definition.LeaseDuration;
JobLease? lease = null;
try
{
lease = await _leaseStore.TryAcquireAsync(definition.LeaseKey, _holderId, leaseDuration, now, cancellationToken).ConfigureAwait(false);
if (lease is null)
{
var result = JobTriggerResult.AlreadyRunning($"Job '{kind}' is already running.");
triggerActivity?.SetStatus(ActivityStatusCode.Ok, "already_running");
triggerActivity?.SetTag("job.trigger.outcome", result.Outcome.ToString());
_diagnostics.RecordTriggerRejected(kind, trigger, "already_running");
return result;
}
var createdAt = _timeProvider.GetUtcNow();
var request = new JobRunCreateRequest(
definition.Kind,
trigger,
parameterSnapshot,
parametersHash,
definition.Timeout,
leaseDuration,
createdAt);
triggerActivity?.SetTag("job.parameters_hash", request.ParametersHash);
var run = await _jobStore.CreateAsync(request, cancellationToken).ConfigureAwait(false);
var startedAt = _timeProvider.GetUtcNow();
var started = await _jobStore.TryStartAsync(run.RunId, startedAt, cancellationToken).ConfigureAwait(false) ?? run;
triggerActivity?.SetTag("job.run_id", started.RunId);
triggerActivity?.SetTag("job.created_at", createdAt.UtcDateTime);
triggerActivity?.SetTag("job.started_at", started.StartedAt?.UtcDateTime ?? startedAt.UtcDateTime);
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
if (definition.Timeout > TimeSpan.Zero)
{
linkedTokenSource.CancelAfter(definition.Timeout);
}
var capturedLease = lease ?? throw new InvalidOperationException("Lease acquisition returned null.");
try
{
_ = Task.Run(() => ExecuteJobAsync(definition, capturedLease, started, parameterSnapshot, trigger, linkedTokenSource), CancellationToken.None)
.ContinueWith(t =>
{
if (t.Exception is not null)
{
_logger.LogError(t.Exception, "Unhandled job execution failure for {Kind}", definition.Kind);
}
},
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);
lease = null; // released by background job execution
}
catch (Exception ex)
{
lease = capturedLease; // ensure outer finally releases if scheduling fails
triggerActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
triggerActivity?.SetTag("job.trigger.outcome", "exception");
_diagnostics.RecordTriggerRejected(kind, trigger, "queue_failure");
throw;
}
var accepted = JobTriggerResult.Accepted(started);
_diagnostics.RecordTriggerAccepted(kind, trigger);
triggerActivity?.SetStatus(ActivityStatusCode.Ok);
triggerActivity?.SetTag("job.trigger.outcome", accepted.Outcome.ToString());
return accepted;
}
catch (Exception ex)
{
triggerActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
triggerActivity?.SetTag("job.trigger.outcome", "exception");
_diagnostics.RecordTriggerRejected(kind, trigger, "exception");
throw;
}
finally
{
// Release handled by background execution path. If we failed before scheduling, release here.
if (lease is not null)
{
var releaseError = await TryReleaseLeaseAsync(lease, definition.Kind).ConfigureAwait(false);
if (releaseError is not null)
{
_logger.LogError(releaseError, "Failed to release lease {LeaseKey} for job {Kind}", lease.Key, definition.Kind);
}
}
}
}
public Task<IReadOnlyList<JobDefinition>> GetDefinitionsAsync(CancellationToken cancellationToken)
{
IReadOnlyList<JobDefinition> results = _options.Definitions.Values.OrderBy(x => x.Kind, StringComparer.Ordinal).ToArray();
return Task.FromResult(results);
}
public Task<IReadOnlyList<JobRunSnapshot>> GetRecentRunsAsync(string? kind, int limit, CancellationToken cancellationToken)
=> _jobStore.GetRecentRunsAsync(kind, limit, cancellationToken);
public Task<IReadOnlyList<JobRunSnapshot>> GetActiveRunsAsync(CancellationToken cancellationToken)
=> _jobStore.GetActiveRunsAsync(cancellationToken);
public Task<JobRunSnapshot?> GetRunAsync(Guid runId, CancellationToken cancellationToken)
=> _jobStore.FindAsync(runId, cancellationToken);
public Task<JobRunSnapshot?> GetLastRunAsync(string kind, CancellationToken cancellationToken)
=> _jobStore.GetLastRunAsync(kind, cancellationToken);
public Task<IReadOnlyDictionary<string, JobRunSnapshot>> GetLastRunsAsync(IEnumerable<string> kinds, CancellationToken cancellationToken)
=> _jobStore.GetLastRunsAsync(kinds, cancellationToken);
private static bool TryNormalizeParameters(
IReadOnlyDictionary<string, object?> source,
out Dictionary<string, object?> normalized,
out string? error)
{
if (source.Count == 0)
{
normalized = new Dictionary<string, object?>(StringComparer.Ordinal);
error = null;
return true;
}
normalized = new Dictionary<string, object?>(source.Count, StringComparer.Ordinal);
foreach (var kvp in source)
{
if (string.IsNullOrWhiteSpace(kvp.Key))
{
error = "Parameter keys must be non-empty strings.";
normalized = default!;
return false;
}
try
{
normalized[kvp.Key] = NormalizeParameterValue(kvp.Value);
}
catch (Exception ex)
{
error = $"Parameter '{kvp.Key}' cannot be serialized: {ex.Message}";
normalized = default!;
return false;
}
}
error = null;
return true;
}
private static object? NormalizeParameterValue(object? value)
{
if (value is null)
{
return null;
}
switch (value)
{
case string or bool or double or decimal:
return value;
case byte or sbyte or short or ushort or int or long:
return Convert.ToInt64(value, CultureInfo.InvariantCulture);
case uint ui:
return Convert.ToInt64(ui);
case ulong ul when ul <= long.MaxValue:
return (long)ul;
case ulong ul:
return ul.ToString(CultureInfo.InvariantCulture);
case float f:
return (double)f;
case DateTime dt:
return dt.Kind == DateTimeKind.Utc ? dt : dt.ToUniversalTime();
case DateTimeOffset dto:
return dto.ToUniversalTime();
case TimeSpan ts:
return ts.ToString("c", CultureInfo.InvariantCulture);
case Guid guid:
return guid.ToString("D");
case Enum enumValue:
return enumValue.ToString();
case byte[] bytes:
return Convert.ToBase64String(bytes);
case JsonDocument document:
return NormalizeJsonElement(document.RootElement);
case JsonElement element:
return NormalizeJsonElement(element);
case IDictionary dictionary:
{
var nested = new SortedDictionary<string, object?>(StringComparer.Ordinal);
foreach (DictionaryEntry entry in dictionary)
{
if (entry.Key is not string key || string.IsNullOrWhiteSpace(key))
{
throw new InvalidOperationException("Nested dictionary keys must be non-empty strings.");
}
nested[key] = NormalizeParameterValue(entry.Value);
}
return nested;
}
case IEnumerable enumerable when value is not string:
{
var list = new List<object?>();
foreach (var item in enumerable)
{
list.Add(NormalizeParameterValue(item));
}
return list;
}
default:
throw new InvalidOperationException($"Unsupported parameter value of type '{value.GetType().FullName}'.");
}
}
private static object? NormalizeJsonElement(JsonElement element)
{
return element.ValueKind switch
{
JsonValueKind.Null => null,
JsonValueKind.String => element.GetString(),
JsonValueKind.True => true,
JsonValueKind.False => false,
JsonValueKind.Number => element.TryGetInt64(out var l)
? l
: element.TryGetDecimal(out var dec)
? dec
: element.GetDouble(),
JsonValueKind.Object => NormalizeJsonObject(element),
JsonValueKind.Array => NormalizeJsonArray(element),
_ => throw new InvalidOperationException($"Unsupported JSON value '{element.ValueKind}'."),
};
}
private static SortedDictionary<string, object?> NormalizeJsonObject(JsonElement element)
{
var result = new SortedDictionary<string, object?>(StringComparer.Ordinal);
foreach (var property in element.EnumerateObject())
{
result[property.Name] = NormalizeJsonElement(property.Value);
}
return result;
}
private static List<object?> NormalizeJsonArray(JsonElement element)
{
var items = new List<object?>();
foreach (var item in element.EnumerateArray())
{
items.Add(NormalizeJsonElement(item));
}
return items;
}
private async Task<JobRunSnapshot?> CompleteRunAsync(Guid runId, JobRunStatus status, string? error, CancellationToken cancellationToken)
{
var completedAt = _timeProvider.GetUtcNow();
var completion = new JobRunCompletion(status, completedAt, error);
return await _jobStore.TryCompleteAsync(runId, completion, cancellationToken).ConfigureAwait(false);
}
private TimeSpan? ResolveDuration(JobRunSnapshot original, JobRunSnapshot? completed)
{
if (completed?.Duration is { } duration)
{
return duration;
}
var startedAt = completed?.StartedAt ?? original.StartedAt ?? original.CreatedAt;
var completedAt = completed?.CompletedAt ?? _timeProvider.GetUtcNow();
var elapsed = completedAt - startedAt;
return elapsed >= TimeSpan.Zero ? elapsed : null;
}
private static async Task<Exception?> ObserveLeaseTaskAsync(Task heartbeatTask)
{
try
{
await heartbeatTask.ConfigureAwait(false);
return null;
}
catch (OperationCanceledException)
{
return null;
}
catch (Exception ex)
{
return ex;
}
}
private async Task<Exception?> TryReleaseLeaseAsync(JobLease lease, string kind)
{
try
{
await _leaseStore.ReleaseAsync(lease.Key, _holderId, CancellationToken.None).ConfigureAwait(false);
return null;
}
catch (Exception ex)
{
return new LeaseMaintenanceException($"Failed to release lease for job '{kind}'.", ex);
}
}
private static Exception? CombineLeaseExceptions(Exception? first, Exception? second)
{
if (first is null)
{
return second;
}
if (second is null)
{
return first;
}
return new AggregateException(first, second);
}
private async Task ExecuteJobAsync(
JobDefinition definition,
JobLease lease,
JobRunSnapshot run,
IReadOnlyDictionary<string, object?> parameters,
string trigger,
CancellationTokenSource linkedTokenSource)
{
using (linkedTokenSource)
{
var cancellationToken = linkedTokenSource.Token;
using var heartbeatCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var heartbeatTask = MaintainLeaseAsync(definition, lease, heartbeatCts.Token);
using var activity = _diagnostics.StartExecutionActivity(run.Kind, trigger, run.RunId);
activity?.SetTag("job.timeout_seconds", definition.Timeout.TotalSeconds);
activity?.SetTag("job.lease_seconds", definition.LeaseDuration.TotalSeconds);
activity?.SetTag("job.parameters_count", parameters.Count);
activity?.SetTag("job.created_at", run.CreatedAt.UtcDateTime);
activity?.SetTag("job.started_at", (run.StartedAt ?? run.CreatedAt).UtcDateTime);
activity?.SetTag("job.parameters_hash", run.ParametersHash);
_diagnostics.RecordRunStarted(run.Kind);
JobRunStatus finalStatus = JobRunStatus.Succeeded;
string? error = null;
Exception? executionException = null;
JobRunSnapshot? completedSnapshot = null;
Exception? leaseException = null;
try
{
using var scope = _scopeFactory.CreateScope();
var job = (IJob)scope.ServiceProvider.GetRequiredService(definition.JobType);
var jobLogger = _loggerFactory.CreateLogger(definition.JobType);
var context = new JobExecutionContext(
run.RunId,
run.Kind,
trigger,
parameters,
scope.ServiceProvider,
_timeProvider,
jobLogger);
await job.ExecuteAsync(context, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException oce)
{
finalStatus = JobRunStatus.Cancelled;
error = oce.Message;
executionException = oce;
}
catch (Exception ex)
{
finalStatus = JobRunStatus.Failed;
error = ex.ToString();
executionException = ex;
}
finally
{
heartbeatCts.Cancel();
leaseException = await ObserveLeaseTaskAsync(heartbeatTask).ConfigureAwait(false);
var releaseException = await TryReleaseLeaseAsync(lease, definition.Kind).ConfigureAwait(false);
leaseException = CombineLeaseExceptions(leaseException, releaseException);
if (leaseException is not null)
{
var leaseMessage = $"Lease maintenance failed: {leaseException.GetType().Name}: {leaseException.Message}";
if (finalStatus != JobRunStatus.Failed)
{
finalStatus = JobRunStatus.Failed;
error = leaseMessage;
executionException = leaseException;
}
else
{
error = string.IsNullOrWhiteSpace(error)
? leaseMessage
: $"{error}{Environment.NewLine}{leaseMessage}";
executionException = executionException is null
? leaseException
: new AggregateException(executionException, leaseException);
}
}
}
completedSnapshot = await CompleteRunAsync(run.RunId, finalStatus, error, CancellationToken.None).ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(error))
{
activity?.SetTag("job.error", error);
}
activity?.SetTag("job.status", finalStatus.ToString());
var completedDuration = ResolveDuration(run, completedSnapshot);
if (completedDuration.HasValue)
{
activity?.SetTag("job.duration_seconds", completedDuration.Value.TotalSeconds);
}
switch (finalStatus)
{
case JobRunStatus.Succeeded:
activity?.SetStatus(ActivityStatusCode.Ok);
_logger.LogInformation("Job {Kind} run {RunId} succeeded", run.Kind, run.RunId);
break;
case JobRunStatus.Cancelled:
activity?.SetStatus(ActivityStatusCode.Ok, "cancelled");
_logger.LogWarning(executionException, "Job {Kind} run {RunId} cancelled", run.Kind, run.RunId);
break;
case JobRunStatus.Failed:
activity?.SetStatus(ActivityStatusCode.Error, executionException?.Message ?? error);
_logger.LogError(executionException, "Job {Kind} run {RunId} failed", run.Kind, run.RunId);
break;
}
_diagnostics.RecordRunCompleted(run.Kind, finalStatus, completedDuration, error);
}
}
private async Task MaintainLeaseAsync(JobDefinition definition, JobLease lease, CancellationToken cancellationToken)
{
var leaseDuration = lease.LeaseDuration <= TimeSpan.Zero ? _options.DefaultLeaseDuration : lease.LeaseDuration;
var delay = TimeSpan.FromMilliseconds(Math.Max(1000, leaseDuration.TotalMilliseconds / 2));
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
break;
}
var now = _timeProvider.GetUtcNow();
try
{
await _leaseStore.HeartbeatAsync(definition.LeaseKey, _holderId, leaseDuration, now, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
throw new LeaseMaintenanceException($"Failed to heartbeat lease for job '{definition.Kind}'.", ex);
}
}
}
private static string BuildHolderId()
{
var machine = Environment.MachineName;
var processId = Environment.ProcessId;
return $"{machine}:{processId}";
}
}
internal sealed class LeaseMaintenanceException : Exception
{
public LeaseMaintenanceException(string message, Exception innerException)
: base(message, innerException)
{
}
}
internal static class JobParametersHasher
{
internal static readonly JsonSerializerOptions SerializerOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false,
};
public static string? Compute(IReadOnlyDictionary<string, object?> parameters)
{
if (parameters is null || parameters.Count == 0)
{
return null;
}
var canonicalJson = JsonSerializer.Serialize(Sort(parameters), SerializerOptions);
var bytes = Encoding.UTF8.GetBytes(canonicalJson);
var hash = SHA256.HashData(bytes);
return Convert.ToHexString(hash).ToLowerInvariant();
}
private static SortedDictionary<string, object?> Sort(IReadOnlyDictionary<string, object?> parameters)
{
var sorted = new SortedDictionary<string, object?>(StringComparer.Ordinal);
foreach (var kvp in parameters)
{
sorted[kvp.Key] = kvp.Value;
}
return sorted;
}
}

View File

@@ -0,0 +1,12 @@
namespace StellaOps.Concelier.Core.Jobs;
public sealed record JobDefinition(
string Kind,
Type JobType,
TimeSpan Timeout,
TimeSpan LeaseDuration,
string? CronExpression,
bool Enabled)
{
public string LeaseKey => $"job:{Kind}";
}

View File

@@ -0,0 +1,171 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
namespace StellaOps.Concelier.Core.Jobs;
public sealed class JobDiagnostics : IDisposable
{
public const string ActivitySourceName = "StellaOps.Concelier.Jobs";
public const string MeterName = "StellaOps.Concelier.Jobs";
public const string TriggerActivityName = "concelier.job.trigger";
public const string ExecuteActivityName = "concelier.job.execute";
public const string SchedulerActivityName = "concelier.scheduler.evaluate";
private readonly Counter<long> _triggersAccepted;
private readonly Counter<long> _triggersRejected;
private readonly Counter<long> _runsCompleted;
private readonly UpDownCounter<long> _runsActive;
private readonly Histogram<double> _runDurationSeconds;
private readonly Histogram<double> _schedulerSkewMilliseconds;
public JobDiagnostics()
{
ActivitySource = new ActivitySource(ActivitySourceName);
Meter = new Meter(MeterName);
_triggersAccepted = Meter.CreateCounter<long>(
name: "concelier.jobs.triggers.accepted",
unit: "count",
description: "Number of job trigger requests accepted for execution.");
_triggersRejected = Meter.CreateCounter<long>(
name: "concelier.jobs.triggers.rejected",
unit: "count",
description: "Number of job trigger requests rejected or ignored by the coordinator.");
_runsCompleted = Meter.CreateCounter<long>(
name: "concelier.jobs.runs.completed",
unit: "count",
description: "Number of job executions that have finished grouped by outcome.");
_runsActive = Meter.CreateUpDownCounter<long>(
name: "concelier.jobs.runs.active",
unit: "count",
description: "Current number of running job executions.");
_runDurationSeconds = Meter.CreateHistogram<double>(
name: "concelier.jobs.runs.duration",
unit: "s",
description: "Distribution of job execution durations in seconds.");
_schedulerSkewMilliseconds = Meter.CreateHistogram<double>(
name: "concelier.scheduler.skew",
unit: "ms",
description: "Difference between the intended and actual scheduler fire time in milliseconds.");
}
public ActivitySource ActivitySource { get; }
public Meter Meter { get; }
public Activity? StartTriggerActivity(string kind, string trigger)
{
var activity = ActivitySource.StartActivity(TriggerActivityName, ActivityKind.Internal);
if (activity is not null)
{
activity.SetTag("job.kind", kind);
activity.SetTag("job.trigger", trigger);
}
return activity;
}
public Activity? StartSchedulerActivity(string kind, DateTimeOffset scheduledFor, DateTimeOffset invokedAt)
{
var activity = ActivitySource.StartActivity(SchedulerActivityName, ActivityKind.Internal);
if (activity is not null)
{
activity.SetTag("job.kind", kind);
activity.SetTag("job.scheduled_for", scheduledFor.UtcDateTime);
activity.SetTag("job.invoked_at", invokedAt.UtcDateTime);
activity.SetTag("job.scheduler_delay_ms", (invokedAt - scheduledFor).TotalMilliseconds);
}
return activity;
}
public Activity? StartExecutionActivity(string kind, string trigger, Guid runId)
{
var activity = ActivitySource.StartActivity(ExecuteActivityName, ActivityKind.Internal);
if (activity is not null)
{
activity.SetTag("job.kind", kind);
activity.SetTag("job.trigger", trigger);
activity.SetTag("job.run_id", runId);
}
return activity;
}
public void RecordTriggerAccepted(string kind, string trigger)
{
var tags = new TagList
{
{ "job.kind", kind },
{ "job.trigger", trigger },
};
_triggersAccepted.Add(1, tags);
}
public void RecordTriggerRejected(string kind, string trigger, string reason)
{
var tags = new TagList
{
{ "job.kind", kind },
{ "job.trigger", trigger },
{ "job.reason", reason },
};
_triggersRejected.Add(1, tags);
}
public void RecordRunStarted(string kind)
{
var tags = new TagList { { "job.kind", kind } };
_runsActive.Add(1, tags);
}
public void RecordRunCompleted(string kind, JobRunStatus status, TimeSpan? duration, string? error)
{
var outcome = status.ToString();
var completionTags = new TagList
{
{ "job.kind", kind },
{ "job.status", outcome },
};
if (!string.IsNullOrWhiteSpace(error))
{
completionTags.Add("job.error", error);
}
_runsCompleted.Add(1, completionTags);
var activeTags = new TagList { { "job.kind", kind } };
_runsActive.Add(-1, activeTags);
if (duration.HasValue)
{
var seconds = Math.Max(duration.Value.TotalSeconds, 0d);
var durationTags = new TagList
{
{ "job.kind", kind },
{ "job.status", outcome },
};
_runDurationSeconds.Record(seconds, durationTags);
}
}
public void RecordSchedulerSkew(string kind, DateTimeOffset scheduledFor, DateTimeOffset invokedAt)
{
var skew = (invokedAt - scheduledFor).TotalMilliseconds;
var tags = new TagList { { "job.kind", kind } };
_schedulerSkewMilliseconds.Record(skew, tags);
}
public void Dispose()
{
ActivitySource.Dispose();
Meter.Dispose();
}
}

View File

@@ -0,0 +1,42 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace StellaOps.Concelier.Core.Jobs;
public sealed class JobExecutionContext
{
public JobExecutionContext(
Guid runId,
string kind,
string trigger,
IReadOnlyDictionary<string, object?> parameters,
IServiceProvider services,
TimeProvider timeProvider,
ILogger logger)
{
RunId = runId;
Kind = kind;
Trigger = trigger;
Parameters = parameters ?? throw new ArgumentNullException(nameof(parameters));
Services = services ?? throw new ArgumentNullException(nameof(services));
TimeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Guid RunId { get; }
public string Kind { get; }
public string Trigger { get; }
public IReadOnlyDictionary<string, object?> Parameters { get; }
public IServiceProvider Services { get; }
public TimeProvider TimeProvider { get; }
public ILogger Logger { get; }
public T GetRequiredService<T>() where T : notnull
=> Services.GetRequiredService<T>();
}

View File

@@ -0,0 +1,9 @@
namespace StellaOps.Concelier.Core.Jobs;
public sealed record JobLease(
string Key,
string Holder,
DateTimeOffset AcquiredAt,
DateTimeOffset HeartbeatAt,
TimeSpan LeaseDuration,
DateTimeOffset TtlAt);

View File

@@ -0,0 +1,128 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StellaOps.DependencyInjection;
using StellaOps.Plugin.Hosting;
namespace StellaOps.Concelier.Core.Jobs;
public static class JobPluginRegistrationExtensions
{
public static IServiceCollection RegisterJobPluginRoutines(
this IServiceCollection services,
IConfiguration configuration,
PluginHostOptions options,
ILogger? logger = null)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configuration);
ArgumentNullException.ThrowIfNull(options);
var loadResult = PluginHost.LoadPlugins(options, logger);
if (!services.Any(sd => sd.ServiceType == typeof(PluginHostResult)))
{
services.AddSingleton(loadResult);
}
var currentServices = services;
var seenRoutineTypes = new HashSet<string>(StringComparer.Ordinal);
foreach (var plugin in loadResult.Plugins)
{
foreach (var routineType in GetRoutineTypes(plugin.Assembly))
{
if (!typeof(IDependencyInjectionRoutine).IsAssignableFrom(routineType))
{
continue;
}
if (routineType.IsInterface || routineType.IsAbstract)
{
continue;
}
var routineKey = routineType.FullName ?? routineType.Name;
if (!seenRoutineTypes.Add(routineKey))
{
continue;
}
IDependencyInjectionRoutine? routineInstance;
try
{
routineInstance = Activator.CreateInstance(routineType) as IDependencyInjectionRoutine;
}
catch (Exception ex)
{
logger?.LogWarning(
ex,
"Failed to create dependency injection routine {Routine} from plugin {Plugin}.",
routineType.FullName ?? routineType.Name,
plugin.Assembly.FullName ?? plugin.AssemblyPath);
continue;
}
if (routineInstance is null)
{
continue;
}
try
{
var updated = routineInstance.Register(currentServices, configuration);
if (updated is not null && !ReferenceEquals(updated, currentServices))
{
currentServices = updated;
}
}
catch (Exception ex)
{
logger?.LogError(
ex,
"Dependency injection routine {Routine} from plugin {Plugin} threw during registration.",
routineType.FullName ?? routineType.Name,
plugin.Assembly.FullName ?? plugin.AssemblyPath);
}
}
}
if (loadResult.MissingOrderedPlugins.Count > 0)
{
logger?.LogWarning(
"Missing ordered plugin(s): {Missing}",
string.Join(", ", loadResult.MissingOrderedPlugins));
}
return currentServices;
}
private static IEnumerable<Type> GetRoutineTypes(Assembly assembly)
{
if (assembly is null)
{
yield break;
}
Type[] types;
try
{
types = assembly.GetTypes();
}
catch (ReflectionTypeLoadException ex)
{
types = ex.Types.Where(static t => t is not null)!
.Select(static t => t!)
.ToArray();
}
foreach (var type in types)
{
yield return type;
}
}
}

View File

@@ -0,0 +1,6 @@
namespace StellaOps.Concelier.Core.Jobs;
public sealed record JobRunCompletion(
JobRunStatus Status,
DateTimeOffset CompletedAt,
string? Error);

View File

@@ -0,0 +1,10 @@
namespace StellaOps.Concelier.Core.Jobs;
public sealed record JobRunCreateRequest(
string Kind,
string Trigger,
IReadOnlyDictionary<string, object?> Parameters,
string? ParametersHash,
TimeSpan? Timeout,
TimeSpan? LeaseDuration,
DateTimeOffset CreatedAt);

View File

@@ -0,0 +1,21 @@
namespace StellaOps.Concelier.Core.Jobs;
/// <summary>
/// Immutable projection of a job run as stored in persistence.
/// </summary>
public sealed record JobRunSnapshot(
Guid RunId,
string Kind,
JobRunStatus Status,
DateTimeOffset CreatedAt,
DateTimeOffset? StartedAt,
DateTimeOffset? CompletedAt,
string Trigger,
string? ParametersHash,
string? Error,
TimeSpan? Timeout,
TimeSpan? LeaseDuration,
IReadOnlyDictionary<string, object?> Parameters)
{
public TimeSpan? Duration => StartedAt is null || CompletedAt is null ? null : CompletedAt - StartedAt;
}

View File

@@ -0,0 +1,10 @@
namespace StellaOps.Concelier.Core.Jobs;
public enum JobRunStatus
{
Pending,
Running,
Succeeded,
Failed,
Cancelled,
}

View File

@@ -0,0 +1,47 @@
using System;
using Microsoft.Extensions.DependencyInjection;
namespace StellaOps.Concelier.Core.Jobs;
public sealed class JobSchedulerBuilder
{
private readonly IServiceCollection _services;
public JobSchedulerBuilder(IServiceCollection services)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
}
public JobSchedulerBuilder AddJob<TJob>(
string kind,
string? cronExpression = null,
TimeSpan? timeout = null,
TimeSpan? leaseDuration = null,
bool enabled = true)
where TJob : class, IJob
{
ArgumentException.ThrowIfNullOrEmpty(kind);
_services.AddTransient<TJob>();
_services.Configure<JobSchedulerOptions>(options =>
{
if (options.Definitions.ContainsKey(kind))
{
throw new InvalidOperationException($"Job '{kind}' is already registered.");
}
var resolvedTimeout = timeout ?? options.DefaultTimeout;
var resolvedLease = leaseDuration ?? options.DefaultLeaseDuration;
options.Definitions.Add(kind, new JobDefinition(
kind,
typeof(TJob),
resolvedTimeout,
resolvedLease,
cronExpression,
enabled));
});
return this;
}
}

View File

@@ -0,0 +1,165 @@
using Cronos;
using System.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace StellaOps.Concelier.Core.Jobs;
/// <summary>
/// Background service that evaluates cron expressions for registered jobs and triggers them.
/// </summary>
public sealed class JobSchedulerHostedService : BackgroundService
{
private readonly IJobCoordinator _coordinator;
private readonly JobSchedulerOptions _options;
private readonly ILogger<JobSchedulerHostedService> _logger;
private readonly TimeProvider _timeProvider;
private readonly JobDiagnostics _diagnostics;
private readonly Dictionary<string, CronExpression> _cronExpressions = new(StringComparer.Ordinal);
private readonly Dictionary<string, DateTimeOffset> _nextOccurrences = new(StringComparer.Ordinal);
public JobSchedulerHostedService(
IJobCoordinator coordinator,
IOptions<JobSchedulerOptions> optionsAccessor,
ILogger<JobSchedulerHostedService> logger,
TimeProvider timeProvider,
JobDiagnostics diagnostics)
{
_coordinator = coordinator ?? throw new ArgumentNullException(nameof(coordinator));
_options = (optionsAccessor ?? throw new ArgumentNullException(nameof(optionsAccessor))).Value;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
foreach (var definition in _options.Definitions.Values)
{
if (string.IsNullOrWhiteSpace(definition.CronExpression))
{
continue;
}
try
{
var cron = CronExpression.Parse(definition.CronExpression!, CronFormat.Standard);
_cronExpressions[definition.Kind] = cron;
}
catch (CronFormatException ex)
{
_logger.LogError(ex, "Invalid cron expression '{Cron}' for job {Kind}", definition.CronExpression, definition.Kind);
}
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (_cronExpressions.Count == 0)
{
_logger.LogInformation("No cron-based jobs registered; scheduler idle.");
await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
return;
}
while (!stoppingToken.IsCancellationRequested)
{
var now = _timeProvider.GetUtcNow();
var nextWake = now.AddMinutes(5); // default sleep when nothing scheduled
foreach (var (kind, cron) in _cronExpressions)
{
if (!_options.Definitions.TryGetValue(kind, out var definition) || !definition.Enabled)
{
continue;
}
var next = GetNextOccurrence(kind, cron, now);
if (next <= now.AddMilliseconds(500))
{
_ = TriggerJobAsync(kind, next, stoppingToken);
_nextOccurrences[kind] = GetNextOccurrence(kind, cron, now.AddSeconds(1));
next = _nextOccurrences[kind];
}
if (next < nextWake)
{
nextWake = next;
}
}
var delay = nextWake - now;
if (delay < TimeSpan.FromSeconds(1))
{
delay = TimeSpan.FromSeconds(1);
}
try
{
await Task.Delay(delay, stoppingToken).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
break;
}
}
}
private DateTimeOffset GetNextOccurrence(string kind, CronExpression cron, DateTimeOffset reference)
{
if (_nextOccurrences.TryGetValue(kind, out var cached) && cached > reference)
{
return cached;
}
var next = cron.GetNextOccurrence(reference.UtcDateTime, TimeZoneInfo.Utc);
if (next is null)
{
// No future occurrence; schedule far in future to avoid tight loop.
next = reference.UtcDateTime.AddYears(100);
}
var nextUtc = DateTime.SpecifyKind(next.Value, DateTimeKind.Utc);
var offset = new DateTimeOffset(nextUtc);
_nextOccurrences[kind] = offset;
return offset;
}
private async Task TriggerJobAsync(string kind, DateTimeOffset scheduledFor, CancellationToken stoppingToken)
{
var invokedAt = _timeProvider.GetUtcNow();
_diagnostics.RecordSchedulerSkew(kind, scheduledFor, invokedAt);
using var activity = _diagnostics.StartSchedulerActivity(kind, scheduledFor, invokedAt);
try
{
var result = await _coordinator.TriggerAsync(kind, parameters: null, trigger: "scheduler", stoppingToken).ConfigureAwait(false);
activity?.SetTag("job.trigger.outcome", result.Outcome.ToString());
if (result.Run is not null)
{
activity?.SetTag("job.run_id", result.Run.RunId);
}
if (!string.IsNullOrWhiteSpace(result.ErrorMessage))
{
activity?.SetTag("job.trigger.error", result.ErrorMessage);
}
if (result.Outcome == JobTriggerOutcome.Accepted)
{
activity?.SetStatus(ActivityStatusCode.Ok);
}
else
{
activity?.SetStatus(ActivityStatusCode.Ok, result.Outcome.ToString());
}
if (result.Outcome != JobTriggerOutcome.Accepted)
{
_logger.LogDebug("Scheduler trigger for {Kind} resulted in {Outcome}", kind, result.Outcome);
}
}
catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
_logger.LogError(ex, "Cron trigger for job {Kind} failed", kind);
}
}
}

View File

@@ -0,0 +1,12 @@
namespace StellaOps.Concelier.Core.Jobs;
public sealed class JobSchedulerOptions
{
public static JobSchedulerOptions Empty { get; } = new();
public IDictionary<string, JobDefinition> Definitions { get; } = new Dictionary<string, JobDefinition>(StringComparer.Ordinal);
public TimeSpan DefaultTimeout { get; set; } = TimeSpan.FromMinutes(15);
public TimeSpan DefaultLeaseDuration { get; set; } = TimeSpan.FromMinutes(5);
}

View File

@@ -0,0 +1,40 @@
namespace StellaOps.Concelier.Core.Jobs;
public enum JobTriggerOutcome
{
Accepted,
NotFound,
Disabled,
AlreadyRunning,
LeaseRejected,
InvalidParameters,
Failed,
Cancelled,
}
public sealed record JobTriggerResult(JobTriggerOutcome Outcome, JobRunSnapshot? Run, string? ErrorMessage)
{
public static JobTriggerResult Accepted(JobRunSnapshot run)
=> new(JobTriggerOutcome.Accepted, run, null);
public static JobTriggerResult NotFound(string message)
=> new(JobTriggerOutcome.NotFound, null, message);
public static JobTriggerResult Disabled(string message)
=> new(JobTriggerOutcome.Disabled, null, message);
public static JobTriggerResult AlreadyRunning(string message)
=> new(JobTriggerOutcome.AlreadyRunning, null, message);
public static JobTriggerResult LeaseRejected(string message)
=> new(JobTriggerOutcome.LeaseRejected, null, message);
public static JobTriggerResult InvalidParameters(string message)
=> new(JobTriggerOutcome.InvalidParameters, null, message);
public static JobTriggerResult Failed(JobRunSnapshot run, string error)
=> new(JobTriggerOutcome.Failed, run, error);
public static JobTriggerResult Cancelled(JobRunSnapshot run, string error)
=> new(JobTriggerOutcome.Cancelled, run, error);
}

View File

@@ -0,0 +1,27 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
namespace StellaOps.Concelier.Core.Jobs;
public static class JobServiceCollectionExtensions
{
public static JobSchedulerBuilder AddJobScheduler(this IServiceCollection services, Action<JobSchedulerOptions>? configure = null)
{
ArgumentNullException.ThrowIfNull(services);
var optionsBuilder = services.AddOptions<JobSchedulerOptions>();
if (configure is not null)
{
optionsBuilder.Configure(configure);
}
services.AddSingleton(sp => sp.GetRequiredService<IOptions<JobSchedulerOptions>>().Value);
services.AddSingleton<JobDiagnostics>();
services.TryAddSingleton(TimeProvider.System);
services.AddSingleton<IJobCoordinator, JobCoordinator>();
services.AddHostedService<JobSchedulerHostedService>();
return new JobSchedulerBuilder(services);
}
}

View File

@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<LangVersion>preview</LangVersion>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Cronos" Version="0.10.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\StellaOps.Concelier.Models\StellaOps.Concelier.Models.csproj" />
<ProjectReference Include="..\StellaOps.Plugin\StellaOps.Plugin.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,20 @@
# TASKS
| Task | Owner(s) | Depends on | Notes |
|---|---|---|---|
|JobCoordinator implementation (create/get/mark status)|BE-Core|Storage.Mongo|DONE `JobCoordinator` drives Mongo-backed runs.|
|Cron scheduling loop with TimeProvider|BE-Core|Core|DONE `JobSchedulerHostedService` evaluates cron expressions.|
|Single-flight/lease semantics|BE-Core|Storage.Mongo|DONE lease acquisition backed by `MongoLeaseStore`.|
|Trigger API contract (Result mapping)|BE-Core|WebService|DONE `JobTriggerResult` outcomes map to HTTP statuses.|
|Run telemetry enrichment|BE-Core|Observability|DONE `JobDiagnostics` ties activities & counters into coordinator/scheduler paths.|
|Deterministic params hashing|BE-Core|Core|DONE `JobParametersHasher` creates SHA256 hash.|
|Golden tests for timeout/cancel|QA|Core|DONE JobCoordinatorTests cover cancellation timeout path.|
|JobSchedulerBuilder options registry coverage|BE-Core|Core|DONE added scheduler tests confirming cron/timeout/lease metadata persists via JobSchedulerOptions.|
|Plugin discovery + DI glue with PluginHost|BE-Core|Plugin libs|DONE JobPluginRegistrationExtensions now loads PluginHost routines and wires connector/exporter registrations.|
|Harden lease release error handling in JobCoordinator|BE-Core|Storage.Mongo|DONE lease release failures now logged, wrapped, and drive run failure status; fire-and-forget execution guarded. Verified with `dotnet test --no-build --filter JobCoordinator`.|
|Validate job trigger parameters for serialization|BE-Core|WebService|DONE trigger parameters normalized/serialized with defensive checks returning InvalidParameters on failure. Full-suite `dotnet test --no-build` currently red from live connector fixture drift (Oracle/JVN/RedHat).|
|FEEDCORE-ENGINE-03-001 Canonical merger implementation|BE-Core|Merge|DONE `CanonicalMerger` applies GHSA/NVD/OSV conflict rules with deterministic provenance and comprehensive unit coverage. **Coordination:** Connector leads must align mapper outputs with the canonical field expectations before 2025-10-18 so Merge can activate the path globally.|
|FEEDCORE-ENGINE-03-002 Field precedence and tie-breaker map|BE-Core|Merge|DONE field precedence and freshness overrides enforced via `FieldPrecedence` map with tie-breakers and analytics capture. **Reminder:** Storage/Merge owners review precedence overrides when onboarding new feeds to ensure `decisionReason` tagging stays consistent.|
|Canonical merger parity for description/CWE/canonical metric|BE-Core|Models|DONE (2025-10-15) merger now populates description/CWEs/canonical metric id with provenance and regression tests cover the new decisions.|
|Reference normalization & freshness instrumentation cleanup|BE-Core, QA|Models|DONE (2025-10-15) reference keys normalized, freshness overrides applied to union fields, and new tests assert decision logging.|
|FEEDCORE-ENGINE-07-001 Advisory event log & asOf queries|Team Core Engine & Storage Analytics|FEEDSTORAGE-DATA-07-001|TODO Introduce immutable advisory statement events, expose `asOf` query surface for merge/export pipelines, and document determinism guarantees for replay.|
|FEEDCORE-ENGINE-07-002 Noise prior computation service|Team Core Engine & Data Science|FEEDCORE-ENGINE-07-001|TODO Build rule-based learner capturing false-positive priors per package/env, persist summaries, and expose APIs for Excititor/scan suppressors with reproducible statistics.|