Files
git.stella-ops.org/src/StellaOps.Concelier.Merge/Services/AdvisoryMergeService.cs
Vladimir Moushkov 2b6304c9c3
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled
feat: Implement advisory event replay API with conflict explainers
- Added `/concelier/advisories/{vulnerabilityKey}/replay` endpoint to return conflict summaries and explainers.
- Introduced `MergeConflictExplainerPayload` to structure conflict details including type, reason, and source rankings.
- Enhanced `MergeConflictSummary` to include structured explainer payloads and hashes for persisted conflicts.
- Updated `MirrorEndpointExtensions` to enforce rate limits and cache headers for mirror distribution endpoints.
- Refactored tests to cover new replay endpoint functionality and validate conflict explainers.
- Documented changes in TASKS.md, noting completion of mirror distribution endpoints and updated operational runbook.
2025-10-20 18:59:26 +03:00

457 lines
16 KiB
C#

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StellaOps.Concelier.Core;
using StellaOps.Concelier.Core.Events;
using StellaOps.Concelier.Models;
using StellaOps.Concelier.Storage.Mongo.Advisories;
using StellaOps.Concelier.Storage.Mongo.Aliases;
using StellaOps.Concelier.Storage.Mongo.MergeEvents;
using System.Text.Json;
namespace StellaOps.Concelier.Merge.Services;
public sealed class AdvisoryMergeService
{
private static readonly Meter MergeMeter = new("StellaOps.Concelier.Merge");
private static readonly Counter<long> AliasCollisionCounter = MergeMeter.CreateCounter<long>(
"concelier.merge.identity_conflicts",
unit: "count",
description: "Number of alias collisions detected during merge.");
private static readonly string[] PreferredAliasSchemes =
{
AliasSchemes.Cve,
AliasSchemes.Ghsa,
AliasSchemes.OsV,
AliasSchemes.Msrc,
};
private readonly AliasGraphResolver _aliasResolver;
private readonly IAdvisoryStore _advisoryStore;
private readonly AdvisoryPrecedenceMerger _precedenceMerger;
private readonly MergeEventWriter _mergeEventWriter;
private readonly IAdvisoryEventLog _eventLog;
private readonly TimeProvider _timeProvider;
private readonly CanonicalMerger _canonicalMerger;
private readonly ILogger<AdvisoryMergeService> _logger;
public AdvisoryMergeService(
AliasGraphResolver aliasResolver,
IAdvisoryStore advisoryStore,
AdvisoryPrecedenceMerger precedenceMerger,
MergeEventWriter mergeEventWriter,
CanonicalMerger canonicalMerger,
IAdvisoryEventLog eventLog,
TimeProvider timeProvider,
ILogger<AdvisoryMergeService> logger)
{
_aliasResolver = aliasResolver ?? throw new ArgumentNullException(nameof(aliasResolver));
_advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
_precedenceMerger = precedenceMerger ?? throw new ArgumentNullException(nameof(precedenceMerger));
_mergeEventWriter = mergeEventWriter ?? throw new ArgumentNullException(nameof(mergeEventWriter));
_canonicalMerger = canonicalMerger ?? throw new ArgumentNullException(nameof(canonicalMerger));
_eventLog = eventLog ?? throw new ArgumentNullException(nameof(eventLog));
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<AdvisoryMergeResult> MergeAsync(string seedAdvisoryKey, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrWhiteSpace(seedAdvisoryKey);
var component = await _aliasResolver.BuildComponentAsync(seedAdvisoryKey, cancellationToken).ConfigureAwait(false);
var inputs = new List<Advisory>();
foreach (var advisoryKey in component.AdvisoryKeys)
{
cancellationToken.ThrowIfCancellationRequested();
var advisory = await _advisoryStore.FindAsync(advisoryKey, cancellationToken).ConfigureAwait(false);
if (advisory is not null)
{
inputs.Add(advisory);
}
}
if (inputs.Count == 0)
{
_logger.LogWarning("Alias component seeded by {Seed} contains no persisted advisories", seedAdvisoryKey);
return AdvisoryMergeResult.Empty(seedAdvisoryKey, component);
}
var canonicalKey = SelectCanonicalKey(component) ?? seedAdvisoryKey;
var canonicalMerge = ApplyCanonicalMergeIfNeeded(canonicalKey, inputs);
var before = await _advisoryStore.FindAsync(canonicalKey, cancellationToken).ConfigureAwait(false);
var normalizedInputs = NormalizeInputs(inputs, canonicalKey).ToList();
PrecedenceMergeResult precedenceResult;
try
{
precedenceResult = _precedenceMerger.Merge(normalizedInputs);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to merge alias component seeded by {Seed}", seedAdvisoryKey);
throw;
}
var merged = precedenceResult.Advisory;
var conflictDetails = precedenceResult.Conflicts;
if (component.Collisions.Count > 0)
{
foreach (var collision in component.Collisions)
{
var tags = new KeyValuePair<string, object?>[]
{
new("scheme", collision.Scheme ?? string.Empty),
new("alias_value", collision.Value ?? string.Empty),
new("advisory_count", collision.AdvisoryKeys.Count),
};
AliasCollisionCounter.Add(1, tags);
_logger.LogInformation(
"Alias collision {Scheme}:{Value} involves advisories {Advisories}",
collision.Scheme,
collision.Value,
string.Join(", ", collision.AdvisoryKeys));
}
}
await _advisoryStore.UpsertAsync(merged, cancellationToken).ConfigureAwait(false);
await _mergeEventWriter.AppendAsync(
canonicalKey,
before,
merged,
Array.Empty<Guid>(),
ConvertFieldDecisions(canonicalMerge?.Decisions),
cancellationToken).ConfigureAwait(false);
var conflictSummaries = await AppendEventLogAsync(canonicalKey, normalizedInputs, merged, conflictDetails, cancellationToken).ConfigureAwait(false);
return new AdvisoryMergeResult(seedAdvisoryKey, canonicalKey, component, inputs, before, merged, conflictSummaries);
}
private async Task<IReadOnlyList<MergeConflictSummary>> AppendEventLogAsync(
string vulnerabilityKey,
IReadOnlyList<Advisory> inputs,
Advisory merged,
IReadOnlyList<MergeConflictDetail> conflicts,
CancellationToken cancellationToken)
{
var recordedAt = _timeProvider.GetUtcNow();
var statements = new List<AdvisoryStatementInput>(inputs.Count + 1);
var statementIds = new Dictionary<Advisory, Guid>(ReferenceEqualityComparer.Instance);
foreach (var advisory in inputs)
{
var statementId = Guid.NewGuid();
statementIds[advisory] = statementId;
statements.Add(new AdvisoryStatementInput(
vulnerabilityKey,
advisory,
DetermineAsOf(advisory, recordedAt),
InputDocumentIds: Array.Empty<Guid>(),
StatementId: statementId,
AdvisoryKey: advisory.AdvisoryKey));
}
var canonicalStatementId = Guid.NewGuid();
statementIds[merged] = canonicalStatementId;
statements.Add(new AdvisoryStatementInput(
vulnerabilityKey,
merged,
recordedAt,
InputDocumentIds: Array.Empty<Guid>(),
StatementId: canonicalStatementId,
AdvisoryKey: merged.AdvisoryKey));
var conflictMaterialization = BuildConflictInputs(conflicts, vulnerabilityKey, statementIds, canonicalStatementId, recordedAt);
var conflictInputs = conflictMaterialization.Inputs;
var conflictSummaries = conflictMaterialization.Summaries;
if (statements.Count == 0 && conflictInputs.Count == 0)
{
return conflictSummaries.Count == 0
? Array.Empty<MergeConflictSummary>()
: conflictSummaries.ToArray();
}
var request = new AdvisoryEventAppendRequest(statements, conflictInputs.Count > 0 ? conflictInputs : null);
try
{
await _eventLog.AppendAsync(request, cancellationToken).ConfigureAwait(false);
}
finally
{
foreach (var conflict in conflictInputs)
{
conflict.Details.Dispose();
}
}
return conflictSummaries.Count == 0
? Array.Empty<MergeConflictSummary>()
: conflictSummaries.ToArray();
}
private static DateTimeOffset DetermineAsOf(Advisory advisory, DateTimeOffset fallback)
{
return (advisory.Modified ?? advisory.Published ?? fallback).ToUniversalTime();
}
private static ConflictMaterialization BuildConflictInputs(
IReadOnlyList<MergeConflictDetail> conflicts,
string vulnerabilityKey,
IReadOnlyDictionary<Advisory, Guid> statementIds,
Guid canonicalStatementId,
DateTimeOffset recordedAt)
{
if (conflicts.Count == 0)
{
return new ConflictMaterialization(new List<AdvisoryConflictInput>(0), new List<MergeConflictSummary>(0));
}
var inputs = new List<AdvisoryConflictInput>(conflicts.Count);
var summaries = new List<MergeConflictSummary>(conflicts.Count);
foreach (var detail in conflicts)
{
if (!statementIds.TryGetValue(detail.Suppressed, out var suppressedId))
{
continue;
}
var related = new List<Guid> { canonicalStatementId, suppressedId };
if (statementIds.TryGetValue(detail.Primary, out var primaryId))
{
if (!related.Contains(primaryId))
{
related.Add(primaryId);
}
}
var payload = new ConflictDetailPayload(
detail.ConflictType,
detail.Reason,
detail.PrimarySources,
detail.PrimaryRank,
detail.SuppressedSources,
detail.SuppressedRank,
detail.PrimaryValue,
detail.SuppressedValue);
var explainer = new MergeConflictExplainerPayload(
payload.Type,
payload.Reason,
payload.PrimarySources,
payload.PrimaryRank,
payload.SuppressedSources,
payload.SuppressedRank,
payload.PrimaryValue,
payload.SuppressedValue);
var canonicalJson = explainer.ToCanonicalJson();
var document = JsonDocument.Parse(canonicalJson);
var asOf = (detail.Primary.Modified ?? detail.Suppressed.Modified ?? recordedAt).ToUniversalTime();
var conflictId = Guid.NewGuid();
var statementIdArray = ImmutableArray.CreateRange(related);
var conflictHash = explainer.ComputeHashHex(canonicalJson);
inputs.Add(new AdvisoryConflictInput(
vulnerabilityKey,
document,
asOf,
related,
ConflictId: conflictId));
summaries.Add(new MergeConflictSummary(
conflictId,
vulnerabilityKey,
statementIdArray,
conflictHash,
asOf,
recordedAt,
explainer));
}
return new ConflictMaterialization(inputs, summaries);
}
private static IEnumerable<Advisory> NormalizeInputs(IEnumerable<Advisory> advisories, string canonicalKey)
{
foreach (var advisory in advisories)
{
yield return CloneWithKey(advisory, canonicalKey);
}
}
private static Advisory CloneWithKey(Advisory source, string advisoryKey)
=> new(
advisoryKey,
source.Title,
source.Summary,
source.Language,
source.Published,
source.Modified,
source.Severity,
source.ExploitKnown,
source.Aliases,
source.Credits,
source.References,
source.AffectedPackages,
source.CvssMetrics,
source.Provenance,
source.Description,
source.Cwes,
source.CanonicalMetricId);
private CanonicalMergeResult? ApplyCanonicalMergeIfNeeded(string canonicalKey, List<Advisory> inputs)
{
if (inputs.Count == 0)
{
return null;
}
var ghsa = FindBySource(inputs, CanonicalSources.Ghsa);
var nvd = FindBySource(inputs, CanonicalSources.Nvd);
var osv = FindBySource(inputs, CanonicalSources.Osv);
var participatingSources = 0;
if (ghsa is not null)
{
participatingSources++;
}
if (nvd is not null)
{
participatingSources++;
}
if (osv is not null)
{
participatingSources++;
}
if (participatingSources < 2)
{
return null;
}
var result = _canonicalMerger.Merge(canonicalKey, ghsa, nvd, osv);
inputs.RemoveAll(advisory => MatchesCanonicalSource(advisory));
inputs.Add(result.Advisory);
return result;
}
private static Advisory? FindBySource(IEnumerable<Advisory> advisories, string source)
=> advisories.FirstOrDefault(advisory => advisory.Provenance.Any(provenance =>
!string.Equals(provenance.Kind, "merge", StringComparison.OrdinalIgnoreCase) &&
string.Equals(provenance.Source, source, StringComparison.OrdinalIgnoreCase)));
private static bool MatchesCanonicalSource(Advisory advisory)
{
foreach (var provenance in advisory.Provenance)
{
if (string.Equals(provenance.Kind, "merge", StringComparison.OrdinalIgnoreCase))
{
continue;
}
if (string.Equals(provenance.Source, CanonicalSources.Ghsa, StringComparison.OrdinalIgnoreCase) ||
string.Equals(provenance.Source, CanonicalSources.Nvd, StringComparison.OrdinalIgnoreCase) ||
string.Equals(provenance.Source, CanonicalSources.Osv, StringComparison.OrdinalIgnoreCase))
{
return true;
}
}
return false;
}
private static IReadOnlyList<MergeFieldDecision> ConvertFieldDecisions(ImmutableArray<FieldDecision>? decisions)
{
if (decisions is null || decisions.Value.IsDefaultOrEmpty)
{
return Array.Empty<MergeFieldDecision>();
}
var builder = ImmutableArray.CreateBuilder<MergeFieldDecision>(decisions.Value.Length);
foreach (var decision in decisions.Value)
{
builder.Add(new MergeFieldDecision(
decision.Field,
decision.SelectedSource,
decision.DecisionReason,
decision.SelectedModified,
decision.ConsideredSources.ToArray()));
}
return builder.ToImmutable();
}
private static class CanonicalSources
{
public const string Ghsa = "ghsa";
public const string Nvd = "nvd";
public const string Osv = "osv";
}
private sealed record ConflictMaterialization(
List<AdvisoryConflictInput> Inputs,
List<MergeConflictSummary> Summaries);
private static string? SelectCanonicalKey(AliasComponent component)
{
foreach (var scheme in PreferredAliasSchemes)
{
var alias = component.AliasMap.Values
.SelectMany(static aliases => aliases)
.FirstOrDefault(record => string.Equals(record.Scheme, scheme, StringComparison.OrdinalIgnoreCase));
if (!string.IsNullOrWhiteSpace(alias?.Value))
{
return alias.Value;
}
}
if (component.AliasMap.TryGetValue(component.SeedAdvisoryKey, out var seedAliases))
{
var primary = seedAliases.FirstOrDefault(record => string.Equals(record.Scheme, AliasStoreConstants.PrimaryScheme, StringComparison.OrdinalIgnoreCase));
if (!string.IsNullOrWhiteSpace(primary?.Value))
{
return primary.Value;
}
}
var firstAlias = component.AliasMap.Values.SelectMany(static aliases => aliases).FirstOrDefault();
if (!string.IsNullOrWhiteSpace(firstAlias?.Value))
{
return firstAlias.Value;
}
return component.SeedAdvisoryKey;
}
}
public sealed record AdvisoryMergeResult(
string SeedAdvisoryKey,
string CanonicalAdvisoryKey,
AliasComponent Component,
IReadOnlyList<Advisory> Inputs,
Advisory? Previous,
Advisory? Merged,
IReadOnlyList<MergeConflictSummary> Conflicts)
{
public static AdvisoryMergeResult Empty(string seed, AliasComponent component)
=> new(seed, seed, component, Array.Empty<Advisory>(), null, null, Array.Empty<MergeConflictSummary>());
}