Rename Feedser to Concelier
This commit is contained in:
		
							
								
								
									
										294
									
								
								src/StellaOps.Concelier.Merge/Services/AdvisoryMergeService.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										294
									
								
								src/StellaOps.Concelier.Merge/Services/AdvisoryMergeService.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,294 @@ | ||||
| using System; | ||||
| using System.Collections.Generic; | ||||
| using System.Collections.Immutable; | ||||
| using System.Diagnostics.Metrics; | ||||
| using System.Linq; | ||||
| using System.Threading; | ||||
| using System.Threading.Tasks; | ||||
| using Microsoft.Extensions.Logging; | ||||
| using StellaOps.Concelier.Core; | ||||
| using StellaOps.Concelier.Models; | ||||
| using StellaOps.Concelier.Storage.Mongo.Advisories; | ||||
| using StellaOps.Concelier.Storage.Mongo.Aliases; | ||||
| using StellaOps.Concelier.Storage.Mongo.MergeEvents; | ||||
|  | ||||
| namespace StellaOps.Concelier.Merge.Services; | ||||
|  | ||||
| public sealed class AdvisoryMergeService | ||||
| { | ||||
|     private static readonly Meter MergeMeter = new("StellaOps.Concelier.Merge"); | ||||
|     private static readonly Counter<long> AliasCollisionCounter = MergeMeter.CreateCounter<long>( | ||||
|         "concelier.merge.identity_conflicts", | ||||
|         unit: "count", | ||||
|         description: "Number of alias collisions detected during merge."); | ||||
|  | ||||
|     private static readonly string[] PreferredAliasSchemes = | ||||
|     { | ||||
|         AliasSchemes.Cve, | ||||
|         AliasSchemes.Ghsa, | ||||
|         AliasSchemes.OsV, | ||||
|         AliasSchemes.Msrc, | ||||
|     }; | ||||
|  | ||||
|     private readonly AliasGraphResolver _aliasResolver; | ||||
|     private readonly IAdvisoryStore _advisoryStore; | ||||
|     private readonly AdvisoryPrecedenceMerger _precedenceMerger; | ||||
|     private readonly MergeEventWriter _mergeEventWriter; | ||||
|     private readonly CanonicalMerger _canonicalMerger; | ||||
|     private readonly ILogger<AdvisoryMergeService> _logger; | ||||
|  | ||||
|     public AdvisoryMergeService( | ||||
|         AliasGraphResolver aliasResolver, | ||||
|         IAdvisoryStore advisoryStore, | ||||
|         AdvisoryPrecedenceMerger precedenceMerger, | ||||
|         MergeEventWriter mergeEventWriter, | ||||
|         CanonicalMerger canonicalMerger, | ||||
|         ILogger<AdvisoryMergeService> logger) | ||||
|     { | ||||
|         _aliasResolver = aliasResolver ?? throw new ArgumentNullException(nameof(aliasResolver)); | ||||
|         _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore)); | ||||
|         _precedenceMerger = precedenceMerger ?? throw new ArgumentNullException(nameof(precedenceMerger)); | ||||
|         _mergeEventWriter = mergeEventWriter ?? throw new ArgumentNullException(nameof(mergeEventWriter)); | ||||
|         _canonicalMerger = canonicalMerger ?? throw new ArgumentNullException(nameof(canonicalMerger)); | ||||
|         _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||||
|     } | ||||
|  | ||||
|     public async Task<AdvisoryMergeResult> MergeAsync(string seedAdvisoryKey, CancellationToken cancellationToken) | ||||
|     { | ||||
|         ArgumentException.ThrowIfNullOrWhiteSpace(seedAdvisoryKey); | ||||
|  | ||||
|         var component = await _aliasResolver.BuildComponentAsync(seedAdvisoryKey, cancellationToken).ConfigureAwait(false); | ||||
|         var inputs = new List<Advisory>(); | ||||
|  | ||||
|         foreach (var advisoryKey in component.AdvisoryKeys) | ||||
|         { | ||||
|             cancellationToken.ThrowIfCancellationRequested(); | ||||
|             var advisory = await _advisoryStore.FindAsync(advisoryKey, cancellationToken).ConfigureAwait(false); | ||||
|             if (advisory is not null) | ||||
|             { | ||||
|                 inputs.Add(advisory); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (inputs.Count == 0) | ||||
|         { | ||||
|             _logger.LogWarning("Alias component seeded by {Seed} contains no persisted advisories", seedAdvisoryKey); | ||||
|             return AdvisoryMergeResult.Empty(seedAdvisoryKey, component); | ||||
|         } | ||||
|  | ||||
|         var canonicalKey = SelectCanonicalKey(component) ?? seedAdvisoryKey; | ||||
|         var canonicalMerge = ApplyCanonicalMergeIfNeeded(canonicalKey, inputs); | ||||
|         var before = await _advisoryStore.FindAsync(canonicalKey, cancellationToken).ConfigureAwait(false); | ||||
|         var normalizedInputs = NormalizeInputs(inputs, canonicalKey).ToList(); | ||||
|  | ||||
|         Advisory? merged; | ||||
|         try | ||||
|         { | ||||
|             merged = _precedenceMerger.Merge(normalizedInputs); | ||||
|         } | ||||
|         catch (Exception ex) | ||||
|         { | ||||
|             _logger.LogError(ex, "Failed to merge alias component seeded by {Seed}", seedAdvisoryKey); | ||||
|             throw; | ||||
|         } | ||||
|  | ||||
|         if (component.Collisions.Count > 0) | ||||
|         { | ||||
|             foreach (var collision in component.Collisions) | ||||
|             { | ||||
|                 var tags = new KeyValuePair<string, object?>[] | ||||
|                 { | ||||
|                     new("scheme", collision.Scheme ?? string.Empty), | ||||
|                     new("alias_value", collision.Value ?? string.Empty), | ||||
|                     new("advisory_count", collision.AdvisoryKeys.Count), | ||||
|                 }; | ||||
|  | ||||
|                 AliasCollisionCounter.Add(1, tags); | ||||
|  | ||||
|                 _logger.LogInformation( | ||||
|                     "Alias collision {Scheme}:{Value} involves advisories {Advisories}", | ||||
|                     collision.Scheme, | ||||
|                     collision.Value, | ||||
|                     string.Join(", ", collision.AdvisoryKeys)); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (merged is not null) | ||||
|         { | ||||
|             await _advisoryStore.UpsertAsync(merged, cancellationToken).ConfigureAwait(false); | ||||
|             await _mergeEventWriter.AppendAsync( | ||||
|                 canonicalKey, | ||||
|                 before, | ||||
|                 merged, | ||||
|                 Array.Empty<Guid>(), | ||||
|                 ConvertFieldDecisions(canonicalMerge?.Decisions), | ||||
|                 cancellationToken).ConfigureAwait(false); | ||||
|         } | ||||
|  | ||||
|         return new AdvisoryMergeResult(seedAdvisoryKey, canonicalKey, component, inputs, before, merged); | ||||
|     } | ||||
|  | ||||
|     private static IEnumerable<Advisory> NormalizeInputs(IEnumerable<Advisory> advisories, string canonicalKey) | ||||
|     { | ||||
|         foreach (var advisory in advisories) | ||||
|         { | ||||
|             yield return CloneWithKey(advisory, canonicalKey); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private static Advisory CloneWithKey(Advisory source, string advisoryKey) | ||||
|         => new( | ||||
|             advisoryKey, | ||||
|             source.Title, | ||||
|             source.Summary, | ||||
|             source.Language, | ||||
|             source.Published, | ||||
|             source.Modified, | ||||
|             source.Severity, | ||||
|             source.ExploitKnown, | ||||
|             source.Aliases, | ||||
|             source.Credits, | ||||
|             source.References, | ||||
|             source.AffectedPackages, | ||||
|             source.CvssMetrics, | ||||
|             source.Provenance, | ||||
|             source.Description, | ||||
|             source.Cwes, | ||||
|             source.CanonicalMetricId); | ||||
|  | ||||
|     private CanonicalMergeResult? ApplyCanonicalMergeIfNeeded(string canonicalKey, List<Advisory> inputs) | ||||
|     { | ||||
|         if (inputs.Count == 0) | ||||
|         { | ||||
|             return null; | ||||
|         } | ||||
|  | ||||
|         var ghsa = FindBySource(inputs, CanonicalSources.Ghsa); | ||||
|         var nvd = FindBySource(inputs, CanonicalSources.Nvd); | ||||
|         var osv = FindBySource(inputs, CanonicalSources.Osv); | ||||
|  | ||||
|         var participatingSources = 0; | ||||
|         if (ghsa is not null) | ||||
|         { | ||||
|             participatingSources++; | ||||
|         } | ||||
|  | ||||
|         if (nvd is not null) | ||||
|         { | ||||
|             participatingSources++; | ||||
|         } | ||||
|  | ||||
|         if (osv is not null) | ||||
|         { | ||||
|             participatingSources++; | ||||
|         } | ||||
|  | ||||
|         if (participatingSources < 2) | ||||
|         { | ||||
|             return null; | ||||
|         } | ||||
|  | ||||
|         var result = _canonicalMerger.Merge(canonicalKey, ghsa, nvd, osv); | ||||
|  | ||||
|         inputs.RemoveAll(advisory => MatchesCanonicalSource(advisory)); | ||||
|         inputs.Add(result.Advisory); | ||||
|  | ||||
|         return result; | ||||
|     } | ||||
|  | ||||
|     private static Advisory? FindBySource(IEnumerable<Advisory> advisories, string source) | ||||
|         => advisories.FirstOrDefault(advisory => advisory.Provenance.Any(provenance => | ||||
|             !string.Equals(provenance.Kind, "merge", StringComparison.OrdinalIgnoreCase) && | ||||
|             string.Equals(provenance.Source, source, StringComparison.OrdinalIgnoreCase))); | ||||
|  | ||||
|     private static bool MatchesCanonicalSource(Advisory advisory) | ||||
|     { | ||||
|         foreach (var provenance in advisory.Provenance) | ||||
|         { | ||||
|             if (string.Equals(provenance.Kind, "merge", StringComparison.OrdinalIgnoreCase)) | ||||
|             { | ||||
|                 continue; | ||||
|             } | ||||
|  | ||||
|             if (string.Equals(provenance.Source, CanonicalSources.Ghsa, StringComparison.OrdinalIgnoreCase) || | ||||
|                 string.Equals(provenance.Source, CanonicalSources.Nvd, StringComparison.OrdinalIgnoreCase) || | ||||
|                 string.Equals(provenance.Source, CanonicalSources.Osv, StringComparison.OrdinalIgnoreCase)) | ||||
|             { | ||||
|                 return true; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         return false; | ||||
|     } | ||||
|  | ||||
|     private static IReadOnlyList<MergeFieldDecision> ConvertFieldDecisions(ImmutableArray<FieldDecision>? decisions) | ||||
|     { | ||||
|         if (decisions is null || decisions.Value.IsDefaultOrEmpty) | ||||
|         { | ||||
|             return Array.Empty<MergeFieldDecision>(); | ||||
|         } | ||||
|  | ||||
|         var builder = ImmutableArray.CreateBuilder<MergeFieldDecision>(decisions.Value.Length); | ||||
|         foreach (var decision in decisions.Value) | ||||
|         { | ||||
|             builder.Add(new MergeFieldDecision( | ||||
|                 decision.Field, | ||||
|                 decision.SelectedSource, | ||||
|                 decision.DecisionReason, | ||||
|                 decision.SelectedModified, | ||||
|                 decision.ConsideredSources.ToArray())); | ||||
|         } | ||||
|  | ||||
|         return builder.ToImmutable(); | ||||
|     } | ||||
|  | ||||
|     private static class CanonicalSources | ||||
|     { | ||||
|         public const string Ghsa = "ghsa"; | ||||
|         public const string Nvd = "nvd"; | ||||
|         public const string Osv = "osv"; | ||||
|     } | ||||
|  | ||||
|     private static string? SelectCanonicalKey(AliasComponent component) | ||||
|     { | ||||
|         foreach (var scheme in PreferredAliasSchemes) | ||||
|         { | ||||
|             var alias = component.AliasMap.Values | ||||
|                 .SelectMany(static aliases => aliases) | ||||
|                 .FirstOrDefault(record => string.Equals(record.Scheme, scheme, StringComparison.OrdinalIgnoreCase)); | ||||
|             if (!string.IsNullOrWhiteSpace(alias?.Value)) | ||||
|             { | ||||
|                 return alias.Value; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (component.AliasMap.TryGetValue(component.SeedAdvisoryKey, out var seedAliases)) | ||||
|         { | ||||
|             var primary = seedAliases.FirstOrDefault(record => string.Equals(record.Scheme, AliasStoreConstants.PrimaryScheme, StringComparison.OrdinalIgnoreCase)); | ||||
|             if (!string.IsNullOrWhiteSpace(primary?.Value)) | ||||
|             { | ||||
|                 return primary.Value; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         var firstAlias = component.AliasMap.Values.SelectMany(static aliases => aliases).FirstOrDefault(); | ||||
|         if (!string.IsNullOrWhiteSpace(firstAlias?.Value)) | ||||
|         { | ||||
|             return firstAlias.Value; | ||||
|         } | ||||
|  | ||||
|         return component.SeedAdvisoryKey; | ||||
|     } | ||||
| } | ||||
|  | ||||
| public sealed record AdvisoryMergeResult( | ||||
|     string SeedAdvisoryKey, | ||||
|     string CanonicalAdvisoryKey, | ||||
|     AliasComponent Component, | ||||
|     IReadOnlyList<Advisory> Inputs, | ||||
|     Advisory? Previous, | ||||
|     Advisory? Merged) | ||||
| { | ||||
|     public static AdvisoryMergeResult Empty(string seed, AliasComponent component) | ||||
|         => new(seed, seed, component, Array.Empty<Advisory>(), null, null); | ||||
| } | ||||
		Reference in New Issue
	
	Block a user