using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Excititor.Connectors.Abstractions;
using StellaOps.Excititor.Connectors.MSRC.CSAF.Authentication;
using StellaOps.Excititor.Connectors.MSRC.CSAF.Configuration;
using StellaOps.Excititor.Core;
using StellaOps.Excititor.Storage.Mongo;

namespace StellaOps.Excititor.Connectors.MSRC.CSAF;

/// <summary>
/// VEX connector that pages through the MSRC vulnerability summary endpoint, downloads the
/// CSAF package referenced by each summary, records a quarantine reason for payloads that
/// fail structural validation, and tracks processed document digests in connector state.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments in this file were reconstructed from usage.
/// <c>ValueTask&lt;VexClaimBatch&gt;</c> (NormalizeAsync), <c>ImmutableDictionary&lt;string, string&gt;</c>
/// (VexConnectorState constructor) and <c>ILogger&lt;MsrcCsafConnector&gt;</c> cannot be verified
/// from this file alone — confirm against the base class and state record declarations.
/// </remarks>
public sealed class MsrcCsafConnector : VexConnectorBase
{
    private const string QuarantineMetadataKey = "excititor.quarantine.reason";
    private const string FormatMetadataKey = "msrc.csaf.format";
    private const string VulnerabilityMetadataKey = "msrc.vulnerabilityId";
    private const string AdvisoryIdMetadataKey = "msrc.advisoryId";
    private const string LastModifiedMetadataKey = "msrc.lastModified";
    private const string ReleaseDateMetadataKey = "msrc.releaseDate";
    private const string CvssSeverityMetadataKey = "msrc.severity";
    private const string CvrfUrlMetadataKey = "msrc.cvrfUrl";

    private static readonly VexConnectorDescriptor DescriptorInstance = new(
        id: "excititor:msrc",
        kind: VexProviderKind.Vendor,
        displayName: "Microsoft MSRC CSAF")
    {
        Description = "Authenticated connector for Microsoft Security Response Center CSAF advisories.",
        SupportedFormats = ImmutableArray.Create(VexDocumentFormat.Csaf),
        Tags = ImmutableArray.Create("microsoft", "csaf", "vendor"),
    };

    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IMsrcTokenProvider _tokenProvider;
    private readonly IVexConnectorStateRepository _stateRepository;
    private readonly IOptions<MsrcConnectorOptions> _options;
    private readonly ILogger<MsrcCsafConnector> _logger;

    private readonly JsonSerializerOptions _serializerOptions = new(JsonSerializerDefaults.Web)
    {
        PropertyNameCaseInsensitive = true,
        ReadCommentHandling = JsonCommentHandling.Skip,
    };

    // Cached after the first successful Validate() so later fetches skip re-validation.
    private MsrcConnectorOptions? _validatedOptions;

    /// <summary>Creates the connector; every dependency except <paramref name="timeProvider"/> is null-checked here.</summary>
    /// <exception cref="ArgumentNullException">A required dependency is <c>null</c>.</exception>
    public MsrcCsafConnector(
        IHttpClientFactory httpClientFactory,
        IMsrcTokenProvider tokenProvider,
        IVexConnectorStateRepository stateRepository,
        IOptions<MsrcConnectorOptions> options,
        ILogger<MsrcCsafConnector> logger,
        TimeProvider timeProvider)
        : base(DescriptorInstance, logger, timeProvider)
    {
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _tokenProvider = tokenProvider ?? throw new ArgumentNullException(nameof(tokenProvider));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Validates the registered <see cref="MsrcConnectorOptions"/>, caches them for later
    /// fetches and logs the effective configuration. <paramref name="settings"/> is not consulted.
    /// </summary>
    /// <exception cref="InvalidOperationException">No options instance is registered.</exception>
    public override ValueTask ValidateAsync(VexConnectorSettings settings, CancellationToken cancellationToken)
    {
        var options = _options.Value ?? throw new InvalidOperationException("MSRC connector options were not registered.");
        options.Validate();
        _validatedOptions = options;

        LogConnectorEvent(
            LogLevel.Information,
            "validate",
            "Validated MSRC CSAF connector options.",
            new Dictionary<string, object?>
            {
                ["baseUri"] = options.BaseUri.ToString(),
                ["locale"] = options.Locale,
                ["apiVersion"] = options.ApiVersion,
                ["pageSize"] = options.PageSize,
                ["maxAdvisories"] = options.MaxAdvisoriesPerFetch,
            });

        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Streams newly discovered CSAF documents for the computed time window.
    /// </summary>
    /// <remarks>
    /// Every downloaded document whose digest has not been seen is written to
    /// <c>context.RawSink</c>; documents carrying a quarantine reason are stored but not
    /// yielded. Connector state (digests and last-updated cursor) is saved only after the
    /// enumeration runs to completion and at least one document was stored.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is <c>null</c>.</exception>
    public override async IAsyncEnumerable<VexRawDocument> FetchAsync(
        VexConnectorContext context,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(context);

        var options = EnsureOptionsValidated();
        var state = await _stateRepository.GetAsync(Descriptor.Id, cancellationToken).ConfigureAwait(false);
        var (from, to) = CalculateWindow(context.Since, state, options);

        LogConnectorEvent(
            LogLevel.Information,
            "fetch.window",
            $"Fetching MSRC CSAF advisories updated between {from:O} and {to:O}.",
            new Dictionary<string, object?>
            {
                ["from"] = from,
                ["to"] = to,
                ["cursorOverlapSeconds"] = options.CursorOverlap.TotalSeconds,
            });

        var client = await CreateAuthenticatedClientAsync(options, cancellationToken).ConfigureAwait(false);

        // The set answers "already processed?"; the list preserves insertion order so the
        // oldest digests can be trimmed first when the tracking limit is exceeded.
        var knownDigests = state?.DocumentDigests ?? ImmutableArray<string>.Empty;
        var digestSet = new HashSet<string>(knownDigests, StringComparer.OrdinalIgnoreCase);
        var digestList = new List<string>(knownDigests);
        var latest = state?.LastUpdated ?? from;
        var fetched = 0;
        var stateChanged = false;

        await foreach (var summary in EnumerateSummariesAsync(client, options, from, to, cancellationToken).ConfigureAwait(false))
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (fetched >= options.MaxAdvisoriesPerFetch)
            {
                break;
            }

            if (string.IsNullOrWhiteSpace(summary.CvrfUrl))
            {
                LogConnectorEvent(LogLevel.Debug, "skip.no-cvrf", $"Skipping MSRC advisory {summary.Id} because no CSAF URL was provided.");
                continue;
            }

            var documentUri = ResolveCvrfUri(options.BaseUri, summary.CvrfUrl);

            VexRawDocument? rawDocument = null;
            try
            {
                rawDocument = await DownloadCsafAsync(client, summary, documentUri, options, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A single failed download must not abort the run: log, back off, move on.
                LogConnectorEvent(
                    LogLevel.Warning,
                    "fetch.error",
                    $"Failed to download MSRC CSAF package {documentUri}.",
                    new Dictionary<string, object?>
                    {
                        ["advisoryId"] = summary.Id,
                        ["vulnerabilityId"] = summary.VulnerabilityId ?? summary.Id,
                    },
                    ex);

                await Task.Delay(GetRetryDelay(options, 1), cancellationToken).ConfigureAwait(false);
                continue;
            }

            if (!digestSet.Add(rawDocument.Digest))
            {
                LogConnectorEvent(LogLevel.Debug, "skip.duplicate", $"Skipping MSRC CSAF package {documentUri} because it was already processed.");
                continue;
            }

            await context.RawSink.StoreAsync(rawDocument, cancellationToken).ConfigureAwait(false);
            digestList.Add(rawDocument.Digest);
            stateChanged = true;
            fetched++;

            latest = DetermineLatest(summary, latest) ?? latest;

            var quarantineReason = rawDocument.Metadata.TryGetValue(QuarantineMetadataKey, out var reason) ? reason : null;
            if (quarantineReason is not null)
            {
                // Quarantined payloads are stored above but never handed to the consumer.
                LogConnectorEvent(LogLevel.Warning, "quarantine", $"Quarantined MSRC CSAF package {documentUri} ({quarantineReason}).");
                continue;
            }

            yield return rawDocument;

            if (options.RequestDelay > TimeSpan.Zero)
            {
                await Task.Delay(options.RequestDelay, cancellationToken).ConfigureAwait(false);
            }
        }

        if (stateChanged)
        {
            if (digestList.Count > options.MaxTrackedDigests)
            {
                // Drop the oldest entries so the persisted list stays within the limit.
                var trimmed = digestList.Count - options.MaxTrackedDigests;
                digestList.RemoveRange(0, trimmed);
            }

            // NOTE(review): second collection argument type reconstructed — confirm against VexConnectorState.
            var baseState = state ?? new VexConnectorState(
                Descriptor.Id,
                null,
                ImmutableArray<string>.Empty,
                ImmutableDictionary<string, string>.Empty,
                null,
                0,
                null,
                null);

            var newState = baseState with
            {
                LastUpdated = latest == DateTimeOffset.MinValue ? state?.LastUpdated : latest,
                DocumentDigests = digestList.ToImmutableArray(),
            };

            await _stateRepository.SaveAsync(newState, cancellationToken).ConfigureAwait(false);
        }

        LogConnectorEvent(
            LogLevel.Information,
            "fetch.completed",
            $"MSRC CSAF fetch completed with {fetched} new documents.",
            new Dictionary<string, object?>
            {
                ["fetched"] = fetched,
                ["stateChanged"] = stateChanged,
                ["lastUpdated"] = latest,
            });
    }

    /// <summary>Not supported by this connector; always throws.</summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    // NOTE(review): return type argument reconstructed — confirm against VexConnectorBase.NormalizeAsync.
    public override ValueTask<VexClaimBatch> NormalizeAsync(VexRawDocument document, CancellationToken cancellationToken)
        => throw new NotSupportedException("MSRC CSAF connector relies on CSAF normalizers for document processing.");

    /// <summary>
    /// Downloads one CSAF package, validates its structure and wraps it in a raw document
    /// whose metadata carries advisory details, the detected format, an optional quarantine
    /// reason and selected HTTP response headers.
    /// </summary>
    private async Task<VexRawDocument> DownloadCsafAsync(
        HttpClient client,
        MsrcVulnerabilitySummary summary,
        Uri documentUri,
        MsrcConnectorOptions options,
        CancellationToken cancellationToken)
    {
        using var response = await SendWithRetryAsync(
            client,
            () => new HttpRequestMessage(HttpMethod.Get, documentUri),
            options,
            cancellationToken).ConfigureAwait(false);

        var payload = await response.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(false);
        var validation = ValidateCsafPayload(payload);

        var metadata = BuildMetadata(builder =>
        {
            builder.Add(AdvisoryIdMetadataKey, summary.Id);
            builder.Add(VulnerabilityMetadataKey, summary.VulnerabilityId ?? summary.Id);
            builder.Add(CvrfUrlMetadataKey, documentUri.ToString());
            builder.Add(FormatMetadataKey, validation.Format);

            if (!string.IsNullOrWhiteSpace(summary.Severity))
            {
                builder.Add(CvssSeverityMetadataKey, summary.Severity);
            }

            if (summary.LastModifiedDate is not null)
            {
                builder.Add(LastModifiedMetadataKey, summary.LastModifiedDate.Value.ToString("O"));
            }

            if (summary.ReleaseDate is not null)
            {
                builder.Add(ReleaseDateMetadataKey, summary.ReleaseDate.Value.ToString("O"));
            }

            if (!string.IsNullOrWhiteSpace(validation.QuarantineReason))
            {
                builder.Add(QuarantineMetadataKey, validation.QuarantineReason);
            }

            if (response.Headers.ETag is not null)
            {
                builder.Add("http.etag", response.Headers.ETag.Tag);
            }

            if (response.Content.Headers.LastModified is { } lastModified)
            {
                builder.Add("http.lastModified", lastModified.ToString("O"));
            }
        });

        return CreateRawDocument(VexDocumentFormat.Csaf, documentUri, payload, metadata);
    }

    /// <summary>
    /// Obtains an access token and returns a named client from the factory with the
    /// Authorization, Accept-Language, api-version and Accept default headers replaced.
    /// </summary>
    private async Task<HttpClient> CreateAuthenticatedClientAsync(MsrcConnectorOptions options, CancellationToken cancellationToken)
    {
        var token = await _tokenProvider.GetAccessTokenAsync(cancellationToken).ConfigureAwait(false);
        var client = _httpClientFactory.CreateClient(MsrcConnectorOptions.ApiClientName);

        // Remove-then-add so a header pre-configured on the named client is replaced, not duplicated.
        client.DefaultRequestHeaders.Remove("Authorization");
        client.DefaultRequestHeaders.Add("Authorization", $"{token.Type} {token.Value}");
        client.DefaultRequestHeaders.Remove("Accept-Language");
        client.DefaultRequestHeaders.Add("Accept-Language", options.Locale);
        client.DefaultRequestHeaders.Remove("api-version");
        client.DefaultRequestHeaders.Add("api-version", options.ApiVersion);
        client.DefaultRequestHeaders.Remove("Accept");
        client.DefaultRequestHeaders.Add("Accept", "application/json");

        return client;
    }

    /// <summary>
    /// Sends a request, retrying with exponential back-off on transient transport failures
    /// and on retryable status codes (429 and 5xx), up to <c>options.MaxRetryAttempts</c> attempts.
    /// </summary>
    /// <param name="requestFactory">Creates a fresh request per attempt; a sent request message cannot be reused.</param>
    /// <returns>A successful response; the caller owns and must dispose it.</returns>
    /// <exception cref="HttpRequestException">
    /// The response status is not successful and is either non-retryable or the attempts are exhausted.
    /// </exception>
    /// <exception cref="InvalidOperationException">No attempt was made (attempt limit below one).</exception>
    private async Task<HttpResponseMessage> SendWithRetryAsync(
        HttpClient client,
        Func<HttpRequestMessage> requestFactory,
        MsrcConnectorOptions options,
        CancellationToken cancellationToken)
    {
        Exception? lastError = null;

        for (var attempt = 1; attempt <= options.MaxRetryAttempts; attempt++)
        {
            var hasAttemptsLeft = attempt < options.MaxRetryAttempts;
            using var request = requestFactory();
            HttpResponseMessage? response = null;

            try
            {
                response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (IsTransient(ex) && hasAttemptsLeft && !cancellationToken.IsCancellationRequested)
            {
                // Only transport-level failures land here. Caller-requested cancellation is
                // excluded so it propagates immediately instead of being logged as a retry.
                lastError = ex;
                LogConnectorEvent(LogLevel.Warning, "retry", $"Retrying MSRC request (attempt {attempt}/{options.MaxRetryAttempts}).", exception: ex);
            }

            if (response is not null)
            {
                if (response.IsSuccessStatusCode)
                {
                    return response;
                }

                // The status is evaluated outside the try block on purpose: the
                // HttpRequestException raised by EnsureSuccessStatusCode would otherwise match
                // the transient filter above and cause non-retryable statuses (401, 404, ...)
                // to be retried.
                if (!ShouldRetry(response.StatusCode) || !hasAttemptsLeft)
                {
                    try
                    {
                        response.EnsureSuccessStatusCode();
                    }
                    finally
                    {
                        response.Dispose();
                    }
                }

                response.Dispose();
            }

            await Task.Delay(GetRetryDelay(options, attempt), cancellationToken).ConfigureAwait(false);
        }

        throw lastError ?? new InvalidOperationException("MSRC request retries exhausted.");
    }

    /// <summary>
    /// Computes the back-off delay for an attempt: base delay doubled per prior attempt, plus
    /// random jitter of up to 25% of the base delay, capped at five minutes.
    /// </summary>
    private TimeSpan GetRetryDelay(MsrcConnectorOptions options, int attempt)
    {
        var baseDelay = options.RetryBaseDelay.TotalMilliseconds;
        var multiplier = Math.Pow(2, Math.Max(0, attempt - 1));
        var jitter = Random.Shared.NextDouble() * baseDelay * 0.25;
        var delayMs = Math.Min(baseDelay * multiplier + jitter, TimeSpan.FromMinutes(5).TotalMilliseconds);
        return TimeSpan.FromMilliseconds(delayMs);
    }

    /// <summary>
    /// Pages through the vulnerability summary endpoint, yielding summaries that have a CSAF
    /// URL until the per-fetch limit is reached or no further page link is returned.
    /// </summary>
    private async IAsyncEnumerable<MsrcVulnerabilitySummary> EnumerateSummariesAsync(
        HttpClient client,
        MsrcConnectorOptions options,
        DateTimeOffset from,
        DateTimeOffset to,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var fetched = 0;
        var requestUri = BuildSummaryUri(options, from, to);

        while (requestUri is not null && fetched < options.MaxAdvisoriesPerFetch)
        {
            using var response = await SendWithRetryAsync(
                client,
                () => new HttpRequestMessage(HttpMethod.Get, requestUri),
                options,
                cancellationToken).ConfigureAwait(false);

            await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
            var payload = await JsonSerializer.DeserializeAsync<MsrcSummaryResponse>(stream, _serializerOptions, cancellationToken).ConfigureAwait(false)
                ?? new MsrcSummaryResponse();

            // An explicit JSON null for "value" (or for an array element) overrides the
            // property initializer, so both are guarded rather than dereferenced.
            foreach (var summary in payload.Value ?? Enumerable.Empty<MsrcVulnerabilitySummary>())
            {
                if (summary is null || string.IsNullOrWhiteSpace(summary.CvrfUrl))
                {
                    continue;
                }

                yield return summary;

                fetched++;
                if (fetched >= options.MaxAdvisoriesPerFetch)
                {
                    yield break;
                }
            }

            if (string.IsNullOrWhiteSpace(payload.NextLink))
            {
                break;
            }

            if (!Uri.TryCreate(payload.NextLink, UriKind.Absolute, out requestUri))
            {
                LogConnectorEvent(LogLevel.Warning, "pagination.invalid", $"MSRC pagination returned invalid next link '{payload.NextLink}'.");
                break;
            }
        }
    }

    /// <summary>
    /// Builds the first-page summary URI: the base URI (with "/vulnerabilities" appended when
    /// absent) plus page size, UTC window bounds, ordering, locale and API version.
    /// </summary>
    private static Uri BuildSummaryUri(MsrcConnectorOptions options, DateTimeOffset from, DateTimeOffset to)
    {
        var baseText = options.BaseUri.ToString().TrimEnd('/');
        var builder = new StringBuilder(baseText.Length + 128);
        builder.Append(baseText);

        if (!baseText.EndsWith("/vulnerabilities", StringComparison.OrdinalIgnoreCase))
        {
            builder.Append("/vulnerabilities");
        }

        builder.Append("?");
        builder.Append("$top=").Append(options.PageSize);
        builder.Append("&lastModifiedStartDateTime=").Append(Uri.EscapeDataString(from.ToUniversalTime().ToString("O")));
        builder.Append("&lastModifiedEndDateTime=").Append(Uri.EscapeDataString(to.ToUniversalTime().ToString("O")));
        builder.Append("&$orderby=lastModifiedDate");
        builder.Append("&locale=").Append(Uri.EscapeDataString(options.Locale));
        builder.Append("&api-version=").Append(Uri.EscapeDataString(options.ApiVersion));

        return new Uri(builder.ToString(), UriKind.Absolute);
    }

    /// <summary>
    /// Determines the fetch window. The start is the first available of: caller-supplied
    /// since, persisted cursor, configured initial value, 30 days ago — raised to the
    /// persisted cursor when that is later, moved back by the configured overlap, and
    /// clamped to at most 20 years in the past. The end is the current time.
    /// </summary>
    private (DateTimeOffset From, DateTimeOffset To) CalculateWindow(
        DateTimeOffset? contextSince,
        VexConnectorState? state,
        MsrcConnectorOptions options)
    {
        var now = UtcNow();
        var since = contextSince ?? state?.LastUpdated ?? options.InitialLastModified ?? now.AddDays(-30);

        if (state?.LastUpdated is { } persisted && persisted > since)
        {
            since = persisted;
        }

        if (options.CursorOverlap > TimeSpan.Zero)
        {
            since = since.Add(-options.CursorOverlap);
        }

        if (since < now.AddYears(-20))
        {
            since = now.AddYears(-20);
        }

        return (since, now);
    }

    /// <summary>Returns <c>true</c> for HTTP 429 and any status code of 500 or above.</summary>
    private static bool ShouldRetry(HttpStatusCode statusCode)
        => statusCode == HttpStatusCode.TooManyRequests || (int)statusCode >= 500;

    /// <summary>Returns <c>true</c> for exception types this connector treats as retryable transport failures.</summary>
    private static bool IsTransient(Exception exception)
        => exception is HttpRequestException or IOException or TaskCanceledException;

    /// <summary>Returns <paramref name="cvrfUrl"/> when it is absolute; otherwise resolves it against <paramref name="baseUri"/>.</summary>
    private static Uri ResolveCvrfUri(Uri baseUri, string cvrfUrl)
        => Uri.TryCreate(cvrfUrl, UriKind.Absolute, out var absolute)
            ? absolute
            : new Uri(baseUri, cvrfUrl);

    /// <summary>
    /// Detects whether the payload is a zip archive, a gzip stream or plain JSON and checks
    /// that the contained document parses as JSON.
    /// </summary>
    /// <returns>
    /// The detected format with no quarantine reason when parsing succeeds; otherwise a
    /// result whose quarantine reason describes the failure.
    /// </returns>
    private static CsafValidationResult ValidateCsafPayload(ReadOnlyMemory<byte> payload)
    {
        try
        {
            if (IsZip(payload.Span))
            {
                using var zipStream = new MemoryStream(payload.ToArray(), writable: false);
                using var archive = new ZipArchive(zipStream, ZipArchiveMode.Read, leaveOpen: true);

                // Prefer the first .json entry; fall back to whatever entry comes first.
                var entry = archive.Entries.FirstOrDefault(e => e.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
                    ?? archive.Entries.FirstOrDefault();
                if (entry is null)
                {
                    return new CsafValidationResult("zip", "Zip archive did not contain any entries.");
                }

                using var entryStream = entry.Open();
                using var reader = new StreamReader(entryStream, Encoding.UTF8);
                using var json = JsonDocument.Parse(reader.ReadToEnd());
                return CsafValidationResult.Valid("zip");
            }

            if (IsGzip(payload.Span))
            {
                using var input = new MemoryStream(payload.ToArray(), writable: false);
                using var gzip = new GZipStream(input, CompressionMode.Decompress);
                using var reader = new StreamReader(gzip, Encoding.UTF8);
                using var json = JsonDocument.Parse(reader.ReadToEnd());
                return CsafValidationResult.Valid("gzip");
            }

            using var jsonDocument = JsonDocument.Parse(payload);
            return CsafValidationResult.Valid("json");
        }
        catch (JsonException ex)
        {
            return new CsafValidationResult("json", $"JSON parse failed: {ex.Message}");
        }
        catch (InvalidDataException ex)
        {
            return new CsafValidationResult("invalid", ex.Message);
        }
        catch (EndOfStreamException ex)
        {
            return new CsafValidationResult("invalid", ex.Message);
        }
    }

    /// <summary>Returns <c>true</c> when the content is longer than three bytes and starts with 0x50 0x4B ("PK").</summary>
    private static bool IsZip(ReadOnlySpan<byte> content)
        => content.Length > 3 && content[0] == 0x50 && content[1] == 0x4B;

    /// <summary>Returns <c>true</c> when the content is longer than two bytes and starts with 0x1F 0x8B.</summary>
    private static bool IsGzip(ReadOnlySpan<byte> content)
        => content.Length > 2 && content[0] == 0x1F && content[1] == 0x8B;

    /// <summary>
    /// Returns the later of <paramref name="current"/> and the summary's last-modified date
    /// (falling back to its release date); returns <paramref name="current"/> when the
    /// summary carries neither date.
    /// </summary>
    private static DateTimeOffset? DetermineLatest(MsrcVulnerabilitySummary summary, DateTimeOffset? current)
    {
        var candidate = summary.LastModifiedDate ?? summary.ReleaseDate;
        if (candidate is null)
        {
            return current;
        }

        if (current is null || candidate > current)
        {
            return candidate;
        }

        return current;
    }

    /// <summary>Returns the cached validated options, validating and caching them on first use.</summary>
    /// <exception cref="InvalidOperationException">No options instance is registered.</exception>
    private MsrcConnectorOptions EnsureOptionsValidated()
    {
        if (_validatedOptions is not null)
        {
            return _validatedOptions;
        }

        var options = _options.Value ?? throw new InvalidOperationException("MSRC connector options were not registered.");
        options.Validate();
        _validatedOptions = options;
        return options;
    }

    /// <summary>Outcome of payload validation; a null <c>QuarantineReason</c> means the payload parsed cleanly.</summary>
    private sealed record CsafValidationResult(string Format, string? QuarantineReason)
    {
        public static CsafValidationResult Valid(string format) => new(format, null);
    }
}

/// <summary>One page of the vulnerability summary response.</summary>
internal sealed record MsrcSummaryResponse
{
    [JsonPropertyName("value")]
    public List<MsrcVulnerabilitySummary> Value { get; init; } = new();

    [JsonPropertyName("@odata.nextLink")]
    public string? NextLink { get; init; }
}

/// <summary>Summary entry for a single advisory as returned by the summary endpoint.</summary>
internal sealed record MsrcVulnerabilitySummary
{
    [JsonPropertyName("id")]
    public string Id { get; init; } = string.Empty;

    [JsonPropertyName("vulnerabilityId")]
    public string? VulnerabilityId { get; init; }

    [JsonPropertyName("severity")]
    public string? Severity { get; init; }

    [JsonPropertyName("releaseDate")]
    public DateTimeOffset? ReleaseDate { get; init; }

    [JsonPropertyName("lastModifiedDate")]
    public DateTimeOffset? LastModifiedDate { get; init; }

    [JsonPropertyName("cvrfUrl")]
    public string? CvrfUrl { get; init; }
}