save dev progress
This commit is contained in:
@@ -13,6 +13,8 @@ using StellaOps.Concelier.Models;
|
||||
using StellaOps.Concelier.Storage.Advisories;
|
||||
using StellaOps.Concelier.Storage.Aliases;
|
||||
using StellaOps.Concelier.Storage.MergeEvents;
|
||||
using StellaOps.Messaging.Abstractions;
|
||||
using StellaOps.Provcache.Events;
|
||||
using System.Text.Json;
|
||||
using StellaOps.Provenance;
|
||||
|
||||
@@ -43,6 +45,7 @@ public sealed class AdvisoryMergeService
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly CanonicalMerger _canonicalMerger;
|
||||
private readonly IMergeHashCalculator? _mergeHashCalculator;
|
||||
private readonly IEventStream<FeedEpochAdvancedEvent>? _feedEpochEventStream;
|
||||
private readonly ILogger<AdvisoryMergeService> _logger;
|
||||
|
||||
public AdvisoryMergeService(
|
||||
@@ -54,7 +57,8 @@ public sealed class AdvisoryMergeService
|
||||
IAdvisoryEventLog eventLog,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<AdvisoryMergeService> logger,
|
||||
IMergeHashCalculator? mergeHashCalculator = null)
|
||||
IMergeHashCalculator? mergeHashCalculator = null,
|
||||
IEventStream<FeedEpochAdvancedEvent>? feedEpochEventStream = null)
|
||||
{
|
||||
_aliasResolver = aliasResolver ?? throw new ArgumentNullException(nameof(aliasResolver));
|
||||
_advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
|
||||
@@ -65,6 +69,7 @@ public sealed class AdvisoryMergeService
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_mergeHashCalculator = mergeHashCalculator; // Optional during migration
|
||||
_feedEpochEventStream = feedEpochEventStream; // Optional for feed epoch invalidation
|
||||
}
|
||||
|
||||
public async Task<AdvisoryMergeResult> MergeAsync(string seedAdvisoryKey, CancellationToken cancellationToken)
|
||||
@@ -141,9 +146,93 @@ public sealed class AdvisoryMergeService
|
||||
|
||||
var conflictSummaries = await AppendEventLogAsync(canonicalKey, normalizedInputs, merged, conflictDetails, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// Publish FeedEpochAdvancedEvent if merge produced changes
|
||||
await PublishFeedEpochAdvancedAsync(before, merged, inputs, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return new AdvisoryMergeResult(seedAdvisoryKey, canonicalKey, component, inputs, before, merged, conflictSummaries);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Publishes a FeedEpochAdvancedEvent when merge produces a new or modified canonical advisory.
|
||||
/// This triggers Provcache invalidation for cached decisions based on older feed data.
|
||||
/// </summary>
|
||||
private async Task PublishFeedEpochAdvancedAsync(
|
||||
Advisory? before,
|
||||
Advisory merged,
|
||||
IReadOnlyList<Advisory> inputs,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (_feedEpochEventStream is null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Determine if this is a new or modified canonical
|
||||
var isNew = before is null;
|
||||
var isModified = before is not null && before.MergeHash != merged.MergeHash;
|
||||
|
||||
if (!isNew && !isModified)
|
||||
{
|
||||
return; // No change, no need to publish
|
||||
}
|
||||
|
||||
// Extract primary source from inputs for feedId
|
||||
var feedId = ExtractPrimaryFeedId(inputs) ?? "canonical";
|
||||
|
||||
// Compute epochs based on modification timestamps
|
||||
var previousEpoch = before?.Modified?.ToString("O") ?? "initial";
|
||||
var newEpoch = merged.Modified?.ToString("O") ?? _timeProvider.GetUtcNow().ToString("O");
|
||||
var effectiveAt = _timeProvider.GetUtcNow();
|
||||
|
||||
var @event = FeedEpochAdvancedEvent.Create(
|
||||
feedId: feedId,
|
||||
previousEpoch: previousEpoch,
|
||||
newEpoch: newEpoch,
|
||||
effectiveAt: effectiveAt,
|
||||
advisoriesAdded: isNew ? 1 : 0,
|
||||
advisoriesModified: isModified ? 1 : 0);
|
||||
|
||||
try
|
||||
{
|
||||
await _feedEpochEventStream.PublishAsync(@event, options: null, cancellationToken).ConfigureAwait(false);
|
||||
_logger.LogDebug(
|
||||
"Published FeedEpochAdvancedEvent for feed {FeedId}: {PreviousEpoch} -> {NewEpoch}",
|
||||
feedId, previousEpoch, newEpoch);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Log but don't fail the merge operation for event publishing failures
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"Failed to publish FeedEpochAdvancedEvent for feed {FeedId}",
|
||||
feedId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extracts the primary feed identifier from merged advisory inputs.
|
||||
/// </summary>
|
||||
private static string? ExtractPrimaryFeedId(IReadOnlyList<Advisory> inputs)
|
||||
{
|
||||
foreach (var advisory in inputs)
|
||||
{
|
||||
foreach (var provenance in advisory.Provenance)
|
||||
{
|
||||
if (string.Equals(provenance.Kind, "merge", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(provenance.Source))
|
||||
{
|
||||
return provenance.Source.ToLowerInvariant();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private async Task<IReadOnlyList<MergeConflictSummary>> AppendEventLogAsync(
|
||||
string vulnerabilityKey,
|
||||
IReadOnlyList<Advisory> inputs,
|
||||
|
||||
@@ -3,6 +3,7 @@ namespace StellaOps.Concelier.Merge.Services;
|
||||
using System.Security.Cryptography;
|
||||
using System.Linq;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Concelier.Merge.Backport;
|
||||
using StellaOps.Concelier.Models;
|
||||
using StellaOps.Concelier.Storage.MergeEvents;
|
||||
|
||||
@@ -35,6 +36,28 @@ public sealed class MergeEventWriter
|
||||
IReadOnlyList<Guid> inputDocumentIds,
|
||||
IReadOnlyList<MergeFieldDecision>? fieldDecisions,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return await AppendAsync(
|
||||
advisoryKey,
|
||||
before,
|
||||
after,
|
||||
inputDocumentIds,
|
||||
fieldDecisions,
|
||||
backportEvidence: null,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Appends a merge event with optional backport evidence for audit.
|
||||
/// </summary>
|
||||
public async Task<MergeEventRecord> AppendAsync(
|
||||
string advisoryKey,
|
||||
Advisory? before,
|
||||
Advisory after,
|
||||
IReadOnlyList<Guid> inputDocumentIds,
|
||||
IReadOnlyList<MergeFieldDecision>? fieldDecisions,
|
||||
IReadOnlyList<BackportEvidence>? backportEvidence,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(advisoryKey);
|
||||
ArgumentNullException.ThrowIfNull(after);
|
||||
@@ -44,6 +67,9 @@ public sealed class MergeEventWriter
|
||||
var timestamp = _timeProvider.GetUtcNow();
|
||||
var documentIds = inputDocumentIds?.ToArray() ?? Array.Empty<Guid>();
|
||||
|
||||
// Convert backport evidence to audit decisions
|
||||
var evidenceDecisions = ConvertToAuditDecisions(backportEvidence);
|
||||
|
||||
var record = new MergeEventRecord(
|
||||
Guid.NewGuid(),
|
||||
advisoryKey,
|
||||
@@ -51,7 +77,8 @@ public sealed class MergeEventWriter
|
||||
afterHash,
|
||||
timestamp,
|
||||
documentIds,
|
||||
fieldDecisions ?? Array.Empty<MergeFieldDecision>());
|
||||
fieldDecisions ?? Array.Empty<MergeFieldDecision>(),
|
||||
evidenceDecisions);
|
||||
|
||||
if (!CryptographicOperations.FixedTimeEquals(beforeHash, afterHash))
|
||||
{
|
||||
@@ -66,7 +93,34 @@ public sealed class MergeEventWriter
|
||||
_logger.LogInformation("Merge event for {AdvisoryKey} recorded without hash change", advisoryKey);
|
||||
}
|
||||
|
||||
if (evidenceDecisions is { Count: > 0 })
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"Merge event for {AdvisoryKey} includes {Count} backport evidence decision(s)",
|
||||
advisoryKey,
|
||||
evidenceDecisions.Count);
|
||||
}
|
||||
|
||||
await _mergeEventStore.AppendAsync(record, cancellationToken).ConfigureAwait(false);
|
||||
return record;
|
||||
}
|
||||
|
||||
private static IReadOnlyList<BackportEvidenceDecision>? ConvertToAuditDecisions(
|
||||
IReadOnlyList<BackportEvidence>? evidence)
|
||||
{
|
||||
if (evidence is null || evidence.Count == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return evidence.Select(e => new BackportEvidenceDecision(
|
||||
e.CveId,
|
||||
e.DistroRelease,
|
||||
e.Tier.ToString(),
|
||||
e.Confidence,
|
||||
e.PatchId,
|
||||
e.PatchOrigin.ToString(),
|
||||
e.ProofId,
|
||||
e.EvidenceDate)).ToArray();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user