Add channel test providers for Email, Slack, Teams, and Webhook
- Implemented EmailChannelTestProvider to generate email preview payloads. - Implemented SlackChannelTestProvider to create Slack message previews. - Implemented TeamsChannelTestProvider for generating Teams Adaptive Card previews. - Implemented WebhookChannelTestProvider to create webhook payloads. - Added INotifyChannelTestProvider interface for channel-specific preview generation. - Created ChannelTestPreviewContracts for request and response models. - Developed NotifyChannelTestService to handle test send requests and generate previews. - Added rate limit policies for test sends and delivery history. - Implemented unit tests for service registration and binding. - Updated project files to include necessary dependencies and configurations.
This commit is contained in:
@@ -32,7 +32,7 @@ services.AddCsafNormalizer();
|
||||
services.AddCycloneDxNormalizer();
|
||||
services.AddOpenVexNormalizer();
|
||||
services.AddSingleton<IVexSignatureVerifier, NoopVexSignatureVerifier>();
|
||||
services.AddSingleton<IVexIngestOrchestrator, VexIngestOrchestrator>();
|
||||
services.AddScoped<IVexIngestOrchestrator, VexIngestOrchestrator>();
|
||||
services.AddVexExportEngine();
|
||||
services.AddVexExportCacheServices();
|
||||
services.AddVexAttestation();
|
||||
@@ -140,6 +140,23 @@ app.MapGet("/excititor/statements/{vulnerabilityId}/{productKey}", async (
|
||||
return Results.Ok(claims);
|
||||
});
|
||||
|
||||
app.MapPost("/excititor/admin/backfill-statements", async (
|
||||
VexStatementBackfillRequest? request,
|
||||
VexStatementBackfillService backfillService,
|
||||
CancellationToken cancellationToken) =>
|
||||
{
|
||||
request ??= new VexStatementBackfillRequest();
|
||||
var result = await backfillService.RunAsync(request, cancellationToken).ConfigureAwait(false);
|
||||
var message = FormattableString.Invariant(
|
||||
$"Backfill completed: evaluated {result.DocumentsEvaluated}, backfilled {result.DocumentsBackfilled}, claims written {result.ClaimsWritten}, skipped {result.SkippedExisting}, failures {result.NormalizationFailures}.");
|
||||
|
||||
return Results.Ok(new
|
||||
{
|
||||
message,
|
||||
summary = result
|
||||
});
|
||||
});
|
||||
|
||||
IngestEndpoints.MapIngestEndpoints(app);
|
||||
ResolveEndpoint.MapResolveEndpoint(app);
|
||||
MirrorEndpoints.MapMirrorEndpoints(app);
|
||||
|
||||
@@ -3,6 +3,7 @@ using System.Diagnostics;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using MongoDB.Driver;
|
||||
using StellaOps.Excititor.Connectors.Abstractions;
|
||||
using StellaOps.Excititor.Core;
|
||||
using StellaOps.Excititor.Storage.Mongo;
|
||||
@@ -30,6 +31,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
private readonly IVexConnectorStateRepository _stateRepository;
|
||||
private readonly IVexNormalizerRouter _normalizerRouter;
|
||||
private readonly IVexSignatureVerifier _signatureVerifier;
|
||||
private readonly IVexMongoSessionProvider _sessionProvider;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<VexIngestOrchestrator> _logger;
|
||||
|
||||
@@ -42,6 +44,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
IVexConnectorStateRepository stateRepository,
|
||||
IVexNormalizerRouter normalizerRouter,
|
||||
IVexSignatureVerifier signatureVerifier,
|
||||
IVexMongoSessionProvider sessionProvider,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<VexIngestOrchestrator> logger)
|
||||
{
|
||||
@@ -52,6 +55,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
_stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
|
||||
_normalizerRouter = normalizerRouter ?? throw new ArgumentNullException(nameof(normalizerRouter));
|
||||
_signatureVerifier = signatureVerifier ?? throw new ArgumentNullException(nameof(signatureVerifier));
|
||||
_sessionProvider = sessionProvider ?? throw new ArgumentNullException(nameof(sessionProvider));
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
|
||||
@@ -73,6 +77,8 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
var startedAt = _timeProvider.GetUtcNow();
|
||||
var results = ImmutableArray.CreateBuilder<InitProviderResult>();
|
||||
|
||||
var session = await _sessionProvider.StartSessionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var (handles, missing) = ResolveConnectors(options.Providers);
|
||||
foreach (var providerId in missing)
|
||||
{
|
||||
@@ -85,7 +91,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
try
|
||||
{
|
||||
await ValidateConnectorAsync(handle, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureProviderRegistrationAsync(handle.Descriptor, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureProviderRegistrationAsync(handle.Descriptor, session, cancellationToken).ConfigureAwait(false);
|
||||
stopwatch.Stop();
|
||||
|
||||
results.Add(new InitProviderResult(
|
||||
@@ -133,6 +139,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
var startedAt = _timeProvider.GetUtcNow();
|
||||
var since = ResolveSince(options.Since, options.Window, startedAt);
|
||||
var results = ImmutableArray.CreateBuilder<ProviderRunResult>();
|
||||
var session = await _sessionProvider.StartSessionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var (handles, missing) = ResolveConnectors(options.Providers);
|
||||
foreach (var providerId in missing)
|
||||
@@ -142,7 +149,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
|
||||
foreach (var handle in handles)
|
||||
{
|
||||
var result = await ExecuteRunAsync(handle, since, options.Force, cancellationToken).ConfigureAwait(false);
|
||||
var result = await ExecuteRunAsync(handle, since, options.Force, session, cancellationToken).ConfigureAwait(false);
|
||||
results.Add(result);
|
||||
}
|
||||
|
||||
@@ -157,6 +164,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
var runId = Guid.NewGuid();
|
||||
var startedAt = _timeProvider.GetUtcNow();
|
||||
var results = ImmutableArray.CreateBuilder<ProviderRunResult>();
|
||||
var session = await _sessionProvider.StartSessionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var (handles, missing) = ResolveConnectors(options.Providers);
|
||||
foreach (var providerId in missing)
|
||||
@@ -166,8 +174,8 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
|
||||
foreach (var handle in handles)
|
||||
{
|
||||
var since = await ResolveResumeSinceAsync(handle.Descriptor.Id, options.Checkpoint, cancellationToken).ConfigureAwait(false);
|
||||
var result = await ExecuteRunAsync(handle, since, force: false, cancellationToken).ConfigureAwait(false);
|
||||
var since = await ResolveResumeSinceAsync(handle.Descriptor.Id, options.Checkpoint, session, cancellationToken).ConfigureAwait(false);
|
||||
var result = await ExecuteRunAsync(handle, since, force: false, session, cancellationToken).ConfigureAwait(false);
|
||||
results.Add(result);
|
||||
}
|
||||
|
||||
@@ -183,6 +191,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
var startedAt = _timeProvider.GetUtcNow();
|
||||
var threshold = options.MaxAge is null ? (DateTimeOffset?)null : startedAt - options.MaxAge.Value;
|
||||
var results = ImmutableArray.CreateBuilder<ReconcileProviderResult>();
|
||||
var session = await _sessionProvider.StartSessionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var (handles, missing) = ResolveConnectors(options.Providers);
|
||||
foreach (var providerId in missing)
|
||||
@@ -194,14 +203,14 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
{
|
||||
try
|
||||
{
|
||||
var state = await _stateRepository.GetAsync(handle.Descriptor.Id, cancellationToken).ConfigureAwait(false);
|
||||
var state = await _stateRepository.GetAsync(handle.Descriptor.Id, cancellationToken, session).ConfigureAwait(false);
|
||||
var lastUpdated = state?.LastUpdated;
|
||||
var stale = threshold.HasValue && (lastUpdated is null || lastUpdated < threshold.Value);
|
||||
|
||||
if (stale || state is null)
|
||||
{
|
||||
var since = stale ? threshold : lastUpdated;
|
||||
var result = await ExecuteRunAsync(handle, since, force: false, cancellationToken).ConfigureAwait(false);
|
||||
var result = await ExecuteRunAsync(handle, since, force: false, session, cancellationToken).ConfigureAwait(false);
|
||||
results.Add(new ReconcileProviderResult(
|
||||
handle.Descriptor.Id,
|
||||
result.Status,
|
||||
@@ -262,22 +271,23 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
await handle.Connector.ValidateAsync(VexConnectorSettings.Empty, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task EnsureProviderRegistrationAsync(VexConnectorDescriptor descriptor, CancellationToken cancellationToken)
|
||||
private async Task EnsureProviderRegistrationAsync(VexConnectorDescriptor descriptor, IClientSessionHandle session, CancellationToken cancellationToken)
|
||||
{
|
||||
var existing = await _providerStore.FindAsync(descriptor.Id, cancellationToken).ConfigureAwait(false);
|
||||
var existing = await _providerStore.FindAsync(descriptor.Id, cancellationToken, session).ConfigureAwait(false);
|
||||
if (existing is not null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var provider = new VexProvider(descriptor.Id, descriptor.DisplayName, descriptor.Kind);
|
||||
await _providerStore.SaveAsync(provider, cancellationToken).ConfigureAwait(false);
|
||||
await _providerStore.SaveAsync(provider, cancellationToken, session).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<ProviderRunResult> ExecuteRunAsync(
|
||||
ConnectorHandle handle,
|
||||
DateTimeOffset? since,
|
||||
bool force,
|
||||
IClientSessionHandle session,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var providerId = handle.Descriptor.Id;
|
||||
@@ -287,12 +297,12 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
try
|
||||
{
|
||||
await ValidateConnectorAsync(handle, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureProviderRegistrationAsync(handle.Descriptor, cancellationToken).ConfigureAwait(false);
|
||||
await EnsureProviderRegistrationAsync(handle.Descriptor, session, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (force)
|
||||
{
|
||||
var resetState = new VexConnectorState(providerId, null, ImmutableArray<string>.Empty);
|
||||
await _stateRepository.SaveAsync(resetState, cancellationToken).ConfigureAwait(false);
|
||||
await _stateRepository.SaveAsync(resetState, cancellationToken, session).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
var context = new VexConnectorContext(
|
||||
@@ -316,13 +326,13 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
if (!batch.Claims.IsDefaultOrEmpty && batch.Claims.Length > 0)
|
||||
{
|
||||
claims += batch.Claims.Length;
|
||||
await _claimStore.AppendAsync(batch.Claims, _timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false);
|
||||
await _claimStore.AppendAsync(batch.Claims, _timeProvider.GetUtcNow(), cancellationToken, session).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
stopwatch.Stop();
|
||||
var completedAt = _timeProvider.GetUtcNow();
|
||||
var state = await _stateRepository.GetAsync(providerId, cancellationToken).ConfigureAwait(false);
|
||||
var state = await _stateRepository.GetAsync(providerId, cancellationToken, session).ConfigureAwait(false);
|
||||
|
||||
var checkpoint = state?.DocumentDigests.IsDefaultOrEmpty == false
|
||||
? state.DocumentDigests[^1]
|
||||
@@ -392,7 +402,7 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<DateTimeOffset?> ResolveResumeSinceAsync(string providerId, string? checkpoint, CancellationToken cancellationToken)
|
||||
private async Task<DateTimeOffset?> ResolveResumeSinceAsync(string providerId, string? checkpoint, IClientSessionHandle session, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(checkpoint))
|
||||
{
|
||||
@@ -406,14 +416,14 @@ internal sealed class VexIngestOrchestrator : IVexIngestOrchestrator
|
||||
}
|
||||
|
||||
var digest = checkpoint.Trim();
|
||||
var document = await _rawStore.FindByDigestAsync(digest, cancellationToken).ConfigureAwait(false);
|
||||
var document = await _rawStore.FindByDigestAsync(digest, cancellationToken, session).ConfigureAwait(false);
|
||||
if (document is not null)
|
||||
{
|
||||
return document.RetrievedAt;
|
||||
}
|
||||
}
|
||||
|
||||
var state = await _stateRepository.GetAsync(providerId, cancellationToken).ConfigureAwait(false);
|
||||
var state = await _stateRepository.GetAsync(providerId, cancellationToken, session).ConfigureAwait(false);
|
||||
return state?.LastUpdated;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user