Add channel test providers for Email, Slack, Teams, and Webhook
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				Docs CI / lint-and-preview (push) Has been cancelled
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	Docs CI / lint-and-preview (push) Has been cancelled
				
			- Implemented EmailChannelTestProvider to generate email preview payloads. - Implemented SlackChannelTestProvider to create Slack message previews. - Implemented TeamsChannelTestProvider for generating Teams Adaptive Card previews. - Implemented WebhookChannelTestProvider to create webhook payloads. - Added INotifyChannelTestProvider interface for channel-specific preview generation. - Created ChannelTestPreviewContracts for request and response models. - Developed NotifyChannelTestService to handle test send requests and generate previews. - Added rate limit policies for test sends and delivery history. - Implemented unit tests for service registration and binding. - Updated project files to include necessary dependencies and configurations.
This commit is contained in:
		| @@ -7,21 +7,23 @@ using System.Threading; | ||||
| using System.Threading.Tasks; | ||||
| using Microsoft.Extensions.Logging; | ||||
| using StellaOps.Concelier.Core; | ||||
| using StellaOps.Concelier.Core.Events; | ||||
| using StellaOps.Concelier.Models; | ||||
| using StellaOps.Concelier.Storage.Mongo.Advisories; | ||||
| using StellaOps.Concelier.Storage.Mongo.Aliases; | ||||
| using StellaOps.Concelier.Storage.Mongo.MergeEvents; | ||||
|  | ||||
| namespace StellaOps.Concelier.Merge.Services; | ||||
|  | ||||
| public sealed class AdvisoryMergeService | ||||
| { | ||||
|     private static readonly Meter MergeMeter = new("StellaOps.Concelier.Merge"); | ||||
|     private static readonly Counter<long> AliasCollisionCounter = MergeMeter.CreateCounter<long>( | ||||
|         "concelier.merge.identity_conflicts", | ||||
|         unit: "count", | ||||
|         description: "Number of alias collisions detected during merge."); | ||||
|  | ||||
| using System.Text.Json; | ||||
|  | ||||
| namespace StellaOps.Concelier.Merge.Services; | ||||
|  | ||||
| public sealed class AdvisoryMergeService | ||||
| { | ||||
|     private static readonly Meter MergeMeter = new("StellaOps.Concelier.Merge"); | ||||
|     private static readonly Counter<long> AliasCollisionCounter = MergeMeter.CreateCounter<long>( | ||||
|         "concelier.merge.identity_conflicts", | ||||
|         unit: "count", | ||||
|         description: "Number of alias collisions detected during merge."); | ||||
|  | ||||
|     private static readonly string[] PreferredAliasSchemes = | ||||
|     { | ||||
|         AliasSchemes.Cve, | ||||
| @@ -34,6 +36,8 @@ public sealed class AdvisoryMergeService | ||||
|     private readonly IAdvisoryStore _advisoryStore; | ||||
|     private readonly AdvisoryPrecedenceMerger _precedenceMerger; | ||||
|     private readonly MergeEventWriter _mergeEventWriter; | ||||
|     private readonly IAdvisoryEventLog _eventLog; | ||||
|     private readonly TimeProvider _timeProvider; | ||||
|     private readonly CanonicalMerger _canonicalMerger; | ||||
|     private readonly ILogger<AdvisoryMergeService> _logger; | ||||
|  | ||||
| @@ -43,6 +47,8 @@ public sealed class AdvisoryMergeService | ||||
|         AdvisoryPrecedenceMerger precedenceMerger, | ||||
|         MergeEventWriter mergeEventWriter, | ||||
|         CanonicalMerger canonicalMerger, | ||||
|         IAdvisoryEventLog eventLog, | ||||
|         TimeProvider timeProvider, | ||||
|         ILogger<AdvisoryMergeService> logger) | ||||
|     { | ||||
|         _aliasResolver = aliasResolver ?? throw new ArgumentNullException(nameof(aliasResolver)); | ||||
| @@ -50,92 +56,222 @@ public sealed class AdvisoryMergeService | ||||
|         _precedenceMerger = precedenceMerger ?? throw new ArgumentNullException(nameof(precedenceMerger)); | ||||
|         _mergeEventWriter = mergeEventWriter ?? throw new ArgumentNullException(nameof(mergeEventWriter)); | ||||
|         _canonicalMerger = canonicalMerger ?? throw new ArgumentNullException(nameof(canonicalMerger)); | ||||
|         _eventLog = eventLog ?? throw new ArgumentNullException(nameof(eventLog)); | ||||
|         _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider)); | ||||
|         _logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||||
|     } | ||||
|  | ||||
|     public async Task<AdvisoryMergeResult> MergeAsync(string seedAdvisoryKey, CancellationToken cancellationToken) | ||||
|     { | ||||
|         ArgumentException.ThrowIfNullOrWhiteSpace(seedAdvisoryKey); | ||||
|  | ||||
|         var component = await _aliasResolver.BuildComponentAsync(seedAdvisoryKey, cancellationToken).ConfigureAwait(false); | ||||
|         var inputs = new List<Advisory>(); | ||||
|  | ||||
|         foreach (var advisoryKey in component.AdvisoryKeys) | ||||
|         { | ||||
|             cancellationToken.ThrowIfCancellationRequested(); | ||||
|             var advisory = await _advisoryStore.FindAsync(advisoryKey, cancellationToken).ConfigureAwait(false); | ||||
|             if (advisory is not null) | ||||
|             { | ||||
|                 inputs.Add(advisory); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (inputs.Count == 0) | ||||
|         { | ||||
|             _logger.LogWarning("Alias component seeded by {Seed} contains no persisted advisories", seedAdvisoryKey); | ||||
|             return AdvisoryMergeResult.Empty(seedAdvisoryKey, component); | ||||
|         } | ||||
|  | ||||
|  | ||||
|     public async Task<AdvisoryMergeResult> MergeAsync(string seedAdvisoryKey, CancellationToken cancellationToken) | ||||
|     { | ||||
|         ArgumentException.ThrowIfNullOrWhiteSpace(seedAdvisoryKey); | ||||
|  | ||||
|         var component = await _aliasResolver.BuildComponentAsync(seedAdvisoryKey, cancellationToken).ConfigureAwait(false); | ||||
|         var inputs = new List<Advisory>(); | ||||
|  | ||||
|         foreach (var advisoryKey in component.AdvisoryKeys) | ||||
|         { | ||||
|             cancellationToken.ThrowIfCancellationRequested(); | ||||
|             var advisory = await _advisoryStore.FindAsync(advisoryKey, cancellationToken).ConfigureAwait(false); | ||||
|             if (advisory is not null) | ||||
|             { | ||||
|                 inputs.Add(advisory); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (inputs.Count == 0) | ||||
|         { | ||||
|             _logger.LogWarning("Alias component seeded by {Seed} contains no persisted advisories", seedAdvisoryKey); | ||||
|             return AdvisoryMergeResult.Empty(seedAdvisoryKey, component); | ||||
|         } | ||||
|  | ||||
|         var canonicalKey = SelectCanonicalKey(component) ?? seedAdvisoryKey; | ||||
|         var canonicalMerge = ApplyCanonicalMergeIfNeeded(canonicalKey, inputs); | ||||
|         var before = await _advisoryStore.FindAsync(canonicalKey, cancellationToken).ConfigureAwait(false); | ||||
|         var normalizedInputs = NormalizeInputs(inputs, canonicalKey).ToList(); | ||||
|  | ||||
|         Advisory? merged; | ||||
|         PrecedenceMergeResult precedenceResult; | ||||
|         try | ||||
|         { | ||||
|             merged = _precedenceMerger.Merge(normalizedInputs); | ||||
|         } | ||||
|         catch (Exception ex) | ||||
|         { | ||||
|             _logger.LogError(ex, "Failed to merge alias component seeded by {Seed}", seedAdvisoryKey); | ||||
|             throw; | ||||
|         } | ||||
|  | ||||
|         if (component.Collisions.Count > 0) | ||||
|         { | ||||
|             foreach (var collision in component.Collisions) | ||||
|             { | ||||
|                 var tags = new KeyValuePair<string, object?>[] | ||||
|                 { | ||||
|                     new("scheme", collision.Scheme ?? string.Empty), | ||||
|                     new("alias_value", collision.Value ?? string.Empty), | ||||
|                     new("advisory_count", collision.AdvisoryKeys.Count), | ||||
|                 }; | ||||
|  | ||||
|                 AliasCollisionCounter.Add(1, tags); | ||||
|  | ||||
|                 _logger.LogInformation( | ||||
|                     "Alias collision {Scheme}:{Value} involves advisories {Advisories}", | ||||
|                     collision.Scheme, | ||||
|                     collision.Value, | ||||
|                     string.Join(", ", collision.AdvisoryKeys)); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (merged is not null) | ||||
|         { | ||||
|             await _advisoryStore.UpsertAsync(merged, cancellationToken).ConfigureAwait(false); | ||||
|             await _mergeEventWriter.AppendAsync( | ||||
|                 canonicalKey, | ||||
|                 before, | ||||
|                 merged, | ||||
|                 Array.Empty<Guid>(), | ||||
|                 ConvertFieldDecisions(canonicalMerge?.Decisions), | ||||
|                 cancellationToken).ConfigureAwait(false); | ||||
|             precedenceResult = _precedenceMerger.Merge(normalizedInputs); | ||||
|         } | ||||
|         catch (Exception ex) | ||||
|         { | ||||
|             _logger.LogError(ex, "Failed to merge alias component seeded by {Seed}", seedAdvisoryKey); | ||||
|             throw; | ||||
|         } | ||||
|  | ||||
|         var merged = precedenceResult.Advisory; | ||||
|         var conflictDetails = precedenceResult.Conflicts; | ||||
|  | ||||
|         if (component.Collisions.Count > 0) | ||||
|         { | ||||
|             foreach (var collision in component.Collisions) | ||||
|             { | ||||
|                 var tags = new KeyValuePair<string, object?>[] | ||||
|                 { | ||||
|                     new("scheme", collision.Scheme ?? string.Empty), | ||||
|                     new("alias_value", collision.Value ?? string.Empty), | ||||
|                     new("advisory_count", collision.AdvisoryKeys.Count), | ||||
|                 }; | ||||
|  | ||||
|                 AliasCollisionCounter.Add(1, tags); | ||||
|  | ||||
|                 _logger.LogInformation( | ||||
|                     "Alias collision {Scheme}:{Value} involves advisories {Advisories}", | ||||
|                     collision.Scheme, | ||||
|                     collision.Value, | ||||
|                     string.Join(", ", collision.AdvisoryKeys)); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         await _advisoryStore.UpsertAsync(merged, cancellationToken).ConfigureAwait(false); | ||||
|         await _mergeEventWriter.AppendAsync( | ||||
|             canonicalKey, | ||||
|             before, | ||||
|             merged, | ||||
|             Array.Empty<Guid>(), | ||||
|             ConvertFieldDecisions(canonicalMerge?.Decisions), | ||||
|             cancellationToken).ConfigureAwait(false); | ||||
|  | ||||
|         await AppendEventLogAsync(canonicalKey, normalizedInputs, merged, conflictDetails, cancellationToken).ConfigureAwait(false); | ||||
|  | ||||
|         return new AdvisoryMergeResult(seedAdvisoryKey, canonicalKey, component, inputs, before, merged); | ||||
|     } | ||||
|  | ||||
|     private static IEnumerable<Advisory> NormalizeInputs(IEnumerable<Advisory> advisories, string canonicalKey) | ||||
|     { | ||||
|         foreach (var advisory in advisories) | ||||
|         { | ||||
|             yield return CloneWithKey(advisory, canonicalKey); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     } | ||||
|  | ||||
|     private async Task AppendEventLogAsync( | ||||
|         string vulnerabilityKey, | ||||
|         IReadOnlyList<Advisory> inputs, | ||||
|         Advisory merged, | ||||
|         IReadOnlyList<MergeConflictDetail> conflicts, | ||||
|         CancellationToken cancellationToken) | ||||
|     { | ||||
|         var recordedAt = _timeProvider.GetUtcNow(); | ||||
|         var statements = new List<AdvisoryStatementInput>(inputs.Count + 1); | ||||
|         var statementIds = new Dictionary<Advisory, Guid>(ReferenceEqualityComparer.Instance); | ||||
|  | ||||
|         foreach (var advisory in inputs) | ||||
|         { | ||||
|             var statementId = Guid.NewGuid(); | ||||
|             statementIds[advisory] = statementId; | ||||
|             statements.Add(new AdvisoryStatementInput( | ||||
|                 vulnerabilityKey, | ||||
|                 advisory, | ||||
|                 DetermineAsOf(advisory, recordedAt), | ||||
|                 InputDocumentIds: Array.Empty<Guid>(), | ||||
|                 StatementId: statementId, | ||||
|                 AdvisoryKey: advisory.AdvisoryKey)); | ||||
|         } | ||||
|  | ||||
|         var canonicalStatementId = Guid.NewGuid(); | ||||
|         statementIds[merged] = canonicalStatementId; | ||||
|         statements.Add(new AdvisoryStatementInput( | ||||
|             vulnerabilityKey, | ||||
|             merged, | ||||
|             recordedAt, | ||||
|             InputDocumentIds: Array.Empty<Guid>(), | ||||
|             StatementId: canonicalStatementId, | ||||
|             AdvisoryKey: merged.AdvisoryKey)); | ||||
|  | ||||
|         var conflictInputs = BuildConflictInputs(conflicts, vulnerabilityKey, statementIds, canonicalStatementId, recordedAt); | ||||
|  | ||||
|         if (statements.Count == 0 && conflictInputs.Count == 0) | ||||
|         { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         var request = new AdvisoryEventAppendRequest(statements, conflictInputs.Count > 0 ? conflictInputs : null); | ||||
|  | ||||
|         try | ||||
|         { | ||||
|             await _eventLog.AppendAsync(request, cancellationToken).ConfigureAwait(false); | ||||
|         } | ||||
|         finally | ||||
|         { | ||||
|             foreach (var conflict in conflictInputs) | ||||
|             { | ||||
|                 conflict.Details.Dispose(); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private static DateTimeOffset DetermineAsOf(Advisory advisory, DateTimeOffset fallback) | ||||
|     { | ||||
|         return (advisory.Modified ?? advisory.Published ?? fallback).ToUniversalTime(); | ||||
|     } | ||||
|  | ||||
|     private static List<AdvisoryConflictInput> BuildConflictInputs( | ||||
|         IReadOnlyList<MergeConflictDetail> conflicts, | ||||
|         string vulnerabilityKey, | ||||
|         IReadOnlyDictionary<Advisory, Guid> statementIds, | ||||
|         Guid canonicalStatementId, | ||||
|         DateTimeOffset recordedAt) | ||||
|     { | ||||
|         if (conflicts.Count == 0) | ||||
|         { | ||||
|             return new List<AdvisoryConflictInput>(0); | ||||
|         } | ||||
|  | ||||
|         var inputs = new List<AdvisoryConflictInput>(conflicts.Count); | ||||
|  | ||||
|         foreach (var detail in conflicts) | ||||
|         { | ||||
|             if (!statementIds.TryGetValue(detail.Suppressed, out var suppressedId)) | ||||
|             { | ||||
|                 continue; | ||||
|             } | ||||
|  | ||||
|             var related = new List<Guid> { canonicalStatementId, suppressedId }; | ||||
|             if (statementIds.TryGetValue(detail.Primary, out var primaryId)) | ||||
|             { | ||||
|                 if (!related.Contains(primaryId)) | ||||
|                 { | ||||
|                     related.Add(primaryId); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             var payload = new ConflictDetailPayload( | ||||
|                 detail.ConflictType, | ||||
|                 detail.Reason, | ||||
|                 detail.PrimarySources, | ||||
|                 detail.PrimaryRank, | ||||
|                 detail.SuppressedSources, | ||||
|                 detail.SuppressedRank, | ||||
|                 detail.PrimaryValue, | ||||
|                 detail.SuppressedValue); | ||||
|  | ||||
|             var json = CanonicalJsonSerializer.Serialize(payload); | ||||
|             var document = JsonDocument.Parse(json); | ||||
|             var asOf = (detail.Primary.Modified ?? detail.Suppressed.Modified ?? recordedAt).ToUniversalTime(); | ||||
|  | ||||
|             inputs.Add(new AdvisoryConflictInput( | ||||
|                 vulnerabilityKey, | ||||
|                 document, | ||||
|                 asOf, | ||||
|                 related, | ||||
|                 ConflictId: null)); | ||||
|         } | ||||
|  | ||||
|         return inputs; | ||||
|     } | ||||
|  | ||||
|     private sealed record ConflictDetailPayload( | ||||
|         string Type, | ||||
|         string Reason, | ||||
|         IReadOnlyList<string> PrimarySources, | ||||
|         int PrimaryRank, | ||||
|         IReadOnlyList<string> SuppressedSources, | ||||
|         int SuppressedRank, | ||||
|         string? PrimaryValue, | ||||
|         string? SuppressedValue); | ||||
|  | ||||
|     private static IEnumerable<Advisory> NormalizeInputs(IEnumerable<Advisory> advisories, string canonicalKey) | ||||
|     { | ||||
|         foreach (var advisory in advisories) | ||||
|         { | ||||
|             yield return CloneWithKey(advisory, canonicalKey); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private static Advisory CloneWithKey(Advisory source, string advisoryKey) | ||||
|         => new( | ||||
|             advisoryKey, | ||||
| @@ -248,47 +384,47 @@ public sealed class AdvisoryMergeService | ||||
|         public const string Nvd = "nvd"; | ||||
|         public const string Osv = "osv"; | ||||
|     } | ||||
|  | ||||
|     private static string? SelectCanonicalKey(AliasComponent component) | ||||
|     { | ||||
|         foreach (var scheme in PreferredAliasSchemes) | ||||
|         { | ||||
|             var alias = component.AliasMap.Values | ||||
|                 .SelectMany(static aliases => aliases) | ||||
|                 .FirstOrDefault(record => string.Equals(record.Scheme, scheme, StringComparison.OrdinalIgnoreCase)); | ||||
|             if (!string.IsNullOrWhiteSpace(alias?.Value)) | ||||
|             { | ||||
|                 return alias.Value; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (component.AliasMap.TryGetValue(component.SeedAdvisoryKey, out var seedAliases)) | ||||
|         { | ||||
|             var primary = seedAliases.FirstOrDefault(record => string.Equals(record.Scheme, AliasStoreConstants.PrimaryScheme, StringComparison.OrdinalIgnoreCase)); | ||||
|             if (!string.IsNullOrWhiteSpace(primary?.Value)) | ||||
|             { | ||||
|                 return primary.Value; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         var firstAlias = component.AliasMap.Values.SelectMany(static aliases => aliases).FirstOrDefault(); | ||||
|         if (!string.IsNullOrWhiteSpace(firstAlias?.Value)) | ||||
|         { | ||||
|             return firstAlias.Value; | ||||
|         } | ||||
|  | ||||
|         return component.SeedAdvisoryKey; | ||||
|     } | ||||
| } | ||||
|  | ||||
| public sealed record AdvisoryMergeResult( | ||||
|     string SeedAdvisoryKey, | ||||
|     string CanonicalAdvisoryKey, | ||||
|     AliasComponent Component, | ||||
|     IReadOnlyList<Advisory> Inputs, | ||||
|     Advisory? Previous, | ||||
|     Advisory? Merged) | ||||
| { | ||||
|     public static AdvisoryMergeResult Empty(string seed, AliasComponent component) | ||||
|         => new(seed, seed, component, Array.Empty<Advisory>(), null, null); | ||||
| } | ||||
|  | ||||
|     private static string? SelectCanonicalKey(AliasComponent component) | ||||
|     { | ||||
|         foreach (var scheme in PreferredAliasSchemes) | ||||
|         { | ||||
|             var alias = component.AliasMap.Values | ||||
|                 .SelectMany(static aliases => aliases) | ||||
|                 .FirstOrDefault(record => string.Equals(record.Scheme, scheme, StringComparison.OrdinalIgnoreCase)); | ||||
|             if (!string.IsNullOrWhiteSpace(alias?.Value)) | ||||
|             { | ||||
|                 return alias.Value; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (component.AliasMap.TryGetValue(component.SeedAdvisoryKey, out var seedAliases)) | ||||
|         { | ||||
|             var primary = seedAliases.FirstOrDefault(record => string.Equals(record.Scheme, AliasStoreConstants.PrimaryScheme, StringComparison.OrdinalIgnoreCase)); | ||||
|             if (!string.IsNullOrWhiteSpace(primary?.Value)) | ||||
|             { | ||||
|                 return primary.Value; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         var firstAlias = component.AliasMap.Values.SelectMany(static aliases => aliases).FirstOrDefault(); | ||||
|         if (!string.IsNullOrWhiteSpace(firstAlias?.Value)) | ||||
|         { | ||||
|             return firstAlias.Value; | ||||
|         } | ||||
|  | ||||
|         return component.SeedAdvisoryKey; | ||||
|     } | ||||
| } | ||||
|  | ||||
| public sealed record AdvisoryMergeResult( | ||||
|     string SeedAdvisoryKey, | ||||
|     string CanonicalAdvisoryKey, | ||||
|     AliasComponent Component, | ||||
|     IReadOnlyList<Advisory> Inputs, | ||||
|     Advisory? Previous, | ||||
|     Advisory? Merged) | ||||
| { | ||||
|     public static AdvisoryMergeResult Empty(string seed, AliasComponent component) | ||||
|         => new(seed, seed, component, Array.Empty<Advisory>(), null, null); | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user