494 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			494 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using System.Collections.Immutable;
 | |
| using System.Globalization;
 | |
| using System.IO;
 | |
| using System.IO.Compression;
 | |
| using System.Security.Cryptography;
 | |
| using System.Linq;
 | |
| using System.Text.Json;
 | |
| using System.Text.Json.Serialization;
 | |
| using System.Xml;
 | |
| using System.Xml.Linq;
 | |
| using Microsoft.Extensions.Logging;
 | |
| using Microsoft.Extensions.Options;
 | |
| using MongoDB.Bson;
 | |
| using StellaOps.Feedser.Normalization.Cvss;
 | |
| using StellaOps.Feedser.Source.Common;
 | |
| using StellaOps.Feedser.Source.Common.Fetch;
 | |
| using StellaOps.Feedser.Source.Ru.Bdu.Configuration;
 | |
| using StellaOps.Feedser.Source.Ru.Bdu.Internal;
 | |
| using StellaOps.Feedser.Storage.Mongo;
 | |
| using StellaOps.Feedser.Storage.Mongo.Advisories;
 | |
| using StellaOps.Feedser.Storage.Mongo.Documents;
 | |
| using StellaOps.Feedser.Storage.Mongo.Dtos;
 | |
| using StellaOps.Plugin;
 | |
| 
 | |
| namespace StellaOps.Feedser.Source.Ru.Bdu;
 | |
| 
 | |
| public sealed class RuBduConnector : IFeedConnector
 | |
| {
 | |
|     private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
 | |
|     {
 | |
|         DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
 | |
|         PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
 | |
|         WriteIndented = false,
 | |
|     };
 | |
| 
 | |
|     private readonly SourceFetchService _fetchService;
 | |
|     private readonly RawDocumentStorage _rawDocumentStorage;
 | |
|     private readonly IDocumentStore _documentStore;
 | |
|     private readonly IDtoStore _dtoStore;
 | |
|     private readonly IAdvisoryStore _advisoryStore;
 | |
|     private readonly ISourceStateRepository _stateRepository;
 | |
|     private readonly RuBduOptions _options;
 | |
|     private readonly TimeProvider _timeProvider;
 | |
|     private readonly ILogger<RuBduConnector> _logger;
 | |
| 
 | |
|     private readonly string _cacheDirectory;
 | |
|     private readonly string _archiveCachePath;
 | |
| 
 | |
|     public RuBduConnector(
 | |
|         SourceFetchService fetchService,
 | |
|         RawDocumentStorage rawDocumentStorage,
 | |
|         IDocumentStore documentStore,
 | |
|         IDtoStore dtoStore,
 | |
|         IAdvisoryStore advisoryStore,
 | |
|         ISourceStateRepository stateRepository,
 | |
|         IOptions<RuBduOptions> options,
 | |
|         TimeProvider? timeProvider,
 | |
|         ILogger<RuBduConnector> logger)
 | |
|     {
 | |
|         _fetchService = fetchService ?? throw new ArgumentNullException(nameof(fetchService));
 | |
|         _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
 | |
|         _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
 | |
|         _dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
 | |
|         _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
 | |
|         _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
 | |
|         _options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
 | |
|         _options.Validate();
 | |
|         _timeProvider = timeProvider ?? TimeProvider.System;
 | |
|         _logger = logger ?? throw new ArgumentNullException(nameof(logger));
 | |
|         _cacheDirectory = ResolveCacheDirectory(_options.CacheDirectory);
 | |
|         _archiveCachePath = Path.Combine(_cacheDirectory, "vulxml.zip");
 | |
|         EnsureCacheDirectory();
 | |
|     }
 | |
| 
 | |
|     public string SourceName => RuBduConnectorPlugin.SourceName;
 | |
| 
 | |
|     public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
 | |
|     {
 | |
|         ArgumentNullException.ThrowIfNull(services);
 | |
| 
 | |
|         var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
 | |
|         var pendingDocuments = cursor.PendingDocuments.ToHashSet();
 | |
|         var pendingMappings = cursor.PendingMappings.ToHashSet();
 | |
|         var now = _timeProvider.GetUtcNow();
 | |
| 
 | |
|         SourceFetchContentResult archiveResult = default;
 | |
|         byte[]? archiveContent = null;
 | |
|         var usedCache = false;
 | |
| 
 | |
|         try
 | |
|         {
 | |
|             var request = new SourceFetchRequest(RuBduOptions.HttpClientName, SourceName, _options.DataArchiveUri)
 | |
|             {
 | |
|                 AcceptHeaders = new[]
 | |
|                 {
 | |
|                     "application/zip",
 | |
|                     "application/octet-stream",
 | |
|                     "application/x-zip-compressed",
 | |
|                 },
 | |
|                 TimeoutOverride = _options.RequestTimeout,
 | |
|             };
 | |
| 
 | |
|             archiveResult = await _fetchService.FetchContentAsync(request, cancellationToken).ConfigureAwait(false);
 | |
| 
 | |
|             if (archiveResult.IsNotModified)
 | |
|             {
 | |
|                 _logger.LogDebug("RU-BDU archive not modified.");
 | |
|                 await UpdateCursorAsync(cursor.WithLastSuccessfulFetch(now), cancellationToken).ConfigureAwait(false);
 | |
|                 return;
 | |
|             }
 | |
| 
 | |
|             if (archiveResult.IsSuccess && archiveResult.Content is not null)
 | |
|             {
 | |
|                 archiveContent = archiveResult.Content;
 | |
|                 TryWriteCachedArchive(archiveContent);
 | |
|             }
 | |
|         }
 | |
|         catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
 | |
|         {
 | |
|             if (TryReadCachedArchive(out var cachedFallback))
 | |
|             {
 | |
|                 _logger.LogWarning(ex, "RU-BDU archive fetch failed; using cached artefact {CachePath}", _archiveCachePath);
 | |
|                 archiveContent = cachedFallback;
 | |
|                 usedCache = true;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 _logger.LogError(ex, "RU-BDU archive fetch failed for {ArchiveUri}", _options.DataArchiveUri);
 | |
|                 await _stateRepository.MarkFailureAsync(SourceName, now, _options.FailureBackoff, ex.Message, cancellationToken).ConfigureAwait(false);
 | |
|                 throw;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         if (archiveContent is null)
 | |
|         {
 | |
|             if (TryReadCachedArchive(out var cachedFallback))
 | |
|             {
 | |
|                 _logger.LogWarning("RU-BDU archive unavailable (status={Status}); using cached artefact {CachePath}", archiveResult.StatusCode, _archiveCachePath);
 | |
|                 archiveContent = cachedFallback;
 | |
|                 usedCache = true;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 _logger.LogWarning("RU-BDU archive fetch returned no content (status={Status})", archiveResult.StatusCode);
 | |
|                 await UpdateCursorAsync(cursor.WithLastSuccessfulFetch(now), cancellationToken).ConfigureAwait(false);
 | |
|                 return;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         var archiveLastModified = archiveResult.LastModified;
 | |
|         int added;
 | |
|         try
 | |
|         {
 | |
|             added = await ProcessArchiveAsync(archiveContent, now, pendingDocuments, pendingMappings, archiveLastModified, cancellationToken).ConfigureAwait(false);
 | |
|         }
 | |
|         catch (Exception ex)
 | |
|         {
 | |
|             if (!usedCache)
 | |
|             {
 | |
|                 _logger.LogError(ex, "RU-BDU archive processing failed");
 | |
|                 await _stateRepository.MarkFailureAsync(SourceName, now, _options.FailureBackoff, ex.Message, cancellationToken).ConfigureAwait(false);
 | |
|             }
 | |
|             throw;
 | |
|         }
 | |
| 
 | |
|         var updatedCursor = cursor
 | |
|             .WithPendingDocuments(pendingDocuments)
 | |
|             .WithPendingMappings(pendingMappings)
 | |
|             .WithLastSuccessfulFetch(now);
 | |
| 
 | |
|         await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
 | |
|     }
 | |
| 
 | |
|     public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
 | |
|     {
 | |
|         ArgumentNullException.ThrowIfNull(services);
 | |
| 
 | |
|         var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
 | |
|         if (cursor.PendingDocuments.Count == 0)
 | |
|         {
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var pendingDocuments = cursor.PendingDocuments.ToList();
 | |
|         var pendingMappings = cursor.PendingMappings.ToList();
 | |
| 
 | |
|         foreach (var documentId in cursor.PendingDocuments)
 | |
|         {
 | |
|             cancellationToken.ThrowIfCancellationRequested();
 | |
| 
 | |
|             var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
 | |
|             if (document is null)
 | |
|             {
 | |
|                 pendingDocuments.Remove(documentId);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             if (!document.GridFsId.HasValue)
 | |
|             {
 | |
|                 _logger.LogWarning("RU-BDU document {DocumentId} missing GridFS payload", documentId);
 | |
|                 await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
 | |
|                 pendingDocuments.Remove(documentId);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             byte[] payload;
 | |
|             try
 | |
|             {
 | |
|                 payload = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 _logger.LogError(ex, "RU-BDU unable to download raw document {DocumentId}", documentId);
 | |
|                 throw;
 | |
|             }
 | |
| 
 | |
|             RuBduVulnerabilityDto? dto;
 | |
|             try
 | |
|             {
 | |
|                 dto = JsonSerializer.Deserialize<RuBduVulnerabilityDto>(payload, SerializerOptions);
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 _logger.LogWarning(ex, "RU-BDU failed to deserialize document {DocumentId}", documentId);
 | |
|                 await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
 | |
|                 pendingDocuments.Remove(documentId);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             if (dto is null)
 | |
|             {
 | |
|                 _logger.LogWarning("RU-BDU document {DocumentId} produced null DTO", documentId);
 | |
|                 await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
 | |
|                 pendingDocuments.Remove(documentId);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             var bson = MongoDB.Bson.BsonDocument.Parse(JsonSerializer.Serialize(dto, SerializerOptions));
 | |
|             var dtoRecord = new DtoRecord(Guid.NewGuid(), document.Id, SourceName, "ru-bdu.v1", bson, _timeProvider.GetUtcNow());
 | |
|             await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
 | |
|             await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
 | |
| 
 | |
|             pendingDocuments.Remove(documentId);
 | |
|             if (!pendingMappings.Contains(documentId))
 | |
|             {
 | |
|                 pendingMappings.Add(documentId);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         var updatedCursor = cursor
 | |
|             .WithPendingDocuments(pendingDocuments)
 | |
|             .WithPendingMappings(pendingMappings);
 | |
| 
 | |
|         await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
 | |
|     }
 | |
| 
 | |
|     public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
 | |
|     {
 | |
|         ArgumentNullException.ThrowIfNull(services);
 | |
| 
 | |
|         var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
 | |
|         if (cursor.PendingMappings.Count == 0)
 | |
|         {
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var pendingMappings = cursor.PendingMappings.ToList();
 | |
| 
 | |
|         foreach (var documentId in cursor.PendingMappings)
 | |
|         {
 | |
|             cancellationToken.ThrowIfCancellationRequested();
 | |
| 
 | |
|             var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
 | |
|             if (document is null)
 | |
|             {
 | |
|                 pendingMappings.Remove(documentId);
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             var dtoRecord = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
 | |
|             if (dtoRecord is null)
 | |
|             {
 | |
|                 _logger.LogWarning("RU-BDU document {DocumentId} missing DTO payload", documentId);
 | |
|                 await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             RuBduVulnerabilityDto dto;
 | |
|             try
 | |
|             {
 | |
|                 dto = JsonSerializer.Deserialize<RuBduVulnerabilityDto>(dtoRecord.Payload.ToString(), SerializerOptions) ?? throw new InvalidOperationException("DTO deserialized to null");
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 _logger.LogError(ex, "RU-BDU failed to deserialize DTO for document {DocumentId}", documentId);
 | |
|                 await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             try
 | |
|             {
 | |
|                 var advisory = RuBduMapper.Map(dto, document, dtoRecord.ValidatedAt);
 | |
|                 await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
 | |
|                 await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 _logger.LogError(ex, "RU-BDU mapping failed for document {DocumentId}", documentId);
 | |
|                 await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
 | |
|                 pendingMappings.Remove(documentId);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         var updatedCursor = cursor.WithPendingMappings(pendingMappings);
 | |
|         await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
 | |
|     }
 | |
| 
 | |
|     private async Task<int> ProcessArchiveAsync(
 | |
|         byte[] archiveContent,
 | |
|         DateTimeOffset now,
 | |
|         HashSet<Guid> pendingDocuments,
 | |
|         HashSet<Guid> pendingMappings,
 | |
|         DateTimeOffset? archiveLastModified,
 | |
|         CancellationToken cancellationToken)
 | |
|     {
 | |
|         var added = 0;
 | |
|         using var archiveStream = new MemoryStream(archiveContent, writable: false);
 | |
|         using var archive = new ZipArchive(archiveStream, ZipArchiveMode.Read, leaveOpen: false);
 | |
|         var entry = archive.GetEntry("export/export.xml") ?? archive.Entries.FirstOrDefault();
 | |
|         if (entry is null)
 | |
|         {
 | |
|             _logger.LogWarning("RU-BDU archive does not contain export/export.xml; skipping.");
 | |
|             return added;
 | |
|         }
 | |
| 
 | |
|         await using var entryStream = entry.Open();
 | |
|         using var reader = XmlReader.Create(entryStream, new XmlReaderSettings
 | |
|         {
 | |
|             IgnoreComments = true,
 | |
|             IgnoreWhitespace = true,
 | |
|             DtdProcessing = DtdProcessing.Ignore,
 | |
|             CloseInput = false,
 | |
|         });
 | |
| 
 | |
|         while (reader.Read())
 | |
|         {
 | |
|             cancellationToken.ThrowIfCancellationRequested();
 | |
|             if (reader.NodeType != XmlNodeType.Element || !reader.Name.Equals("vul", StringComparison.OrdinalIgnoreCase))
 | |
|             {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             if (RuBduXmlParser.TryParse(XNode.ReadFrom(reader) as XElement ?? new XElement("vul")) is not { } dto)
 | |
|             {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             var payload = JsonSerializer.SerializeToUtf8Bytes(dto, SerializerOptions);
 | |
|             var sha = Convert.ToHexString(SHA256.HashData(payload)).ToLowerInvariant();
 | |
|             var documentUri = BuildDocumentUri(dto.Identifier);
 | |
| 
 | |
|             var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, documentUri, cancellationToken).ConfigureAwait(false);
 | |
|             if (existing is not null && string.Equals(existing.Sha256, sha, StringComparison.OrdinalIgnoreCase))
 | |
|             {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             var gridFsId = await _rawDocumentStorage.UploadAsync(SourceName, documentUri, payload, "application/json", null, cancellationToken).ConfigureAwait(false);
 | |
| 
 | |
|             var metadata = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
 | |
|             {
 | |
|                 ["ru-bdu.identifier"] = dto.Identifier,
 | |
|             };
 | |
| 
 | |
|             if (!string.IsNullOrWhiteSpace(dto.Name))
 | |
|             {
 | |
|                 metadata["ru-bdu.name"] = dto.Name!;
 | |
|             }
 | |
| 
 | |
|             var recordId = existing?.Id ?? Guid.NewGuid();
 | |
|             var record = new DocumentRecord(
 | |
|                 recordId,
 | |
|                 SourceName,
 | |
|                 documentUri,
 | |
|                 now,
 | |
|                 sha,
 | |
|                 DocumentStatuses.PendingParse,
 | |
|                 "application/json",
 | |
|                 Headers: null,
 | |
|                 Metadata: metadata,
 | |
|                 Etag: null,
 | |
|                 LastModified: archiveLastModified ?? dto.IdentifyDate,
 | |
|                 GridFsId: gridFsId,
 | |
|                 ExpiresAt: null);
 | |
| 
 | |
|             var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
 | |
|             pendingDocuments.Add(upserted.Id);
 | |
|             pendingMappings.Remove(upserted.Id);
 | |
|             added++;
 | |
| 
 | |
|             if (added >= _options.MaxVulnerabilitiesPerFetch)
 | |
|             {
 | |
|                 break;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         return added;
 | |
|     }
 | |
| 
 | |
|     private string ResolveCacheDirectory(string? configuredPath)
 | |
|     {
 | |
|         if (!string.IsNullOrWhiteSpace(configuredPath))
 | |
|         {
 | |
|             return Path.GetFullPath(Path.IsPathRooted(configuredPath)
 | |
|                 ? configuredPath
 | |
|                 : Path.Combine(AppContext.BaseDirectory, configuredPath));
 | |
|         }
 | |
| 
 | |
|         return Path.Combine(AppContext.BaseDirectory, "cache", RuBduConnectorPlugin.SourceName);
 | |
|     }
 | |
| 
 | |
|     private void EnsureCacheDirectory()
 | |
|     {
 | |
|         try
 | |
|         {
 | |
|             Directory.CreateDirectory(_cacheDirectory);
 | |
|         }
 | |
|         catch (Exception ex)
 | |
|         {
 | |
|             _logger.LogWarning(ex, "RU-BDU unable to ensure cache directory {CachePath}", _cacheDirectory);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private void TryWriteCachedArchive(byte[] content)
 | |
|     {
 | |
|         try
 | |
|         {
 | |
|             Directory.CreateDirectory(Path.GetDirectoryName(_archiveCachePath)!);
 | |
|             File.WriteAllBytes(_archiveCachePath, content);
 | |
|         }
 | |
|         catch (Exception ex)
 | |
|         {
 | |
|             _logger.LogDebug(ex, "RU-BDU failed to write cache archive {CachePath}", _archiveCachePath);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private bool TryReadCachedArchive(out byte[] content)
 | |
|     {
 | |
|         try
 | |
|         {
 | |
|             if (File.Exists(_archiveCachePath))
 | |
|             {
 | |
|                 content = File.ReadAllBytes(_archiveCachePath);
 | |
|                 return true;
 | |
|             }
 | |
|         }
 | |
|         catch (Exception ex)
 | |
|         {
 | |
|             _logger.LogDebug(ex, "RU-BDU failed to read cache archive {CachePath}", _archiveCachePath);
 | |
|         }
 | |
| 
 | |
|         content = Array.Empty<byte>();
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     private static string BuildDocumentUri(string identifier)
 | |
|     {
 | |
|         var slug = identifier.Contains(':', StringComparison.Ordinal)
 | |
|             ? identifier[(identifier.IndexOf(':') + 1)..]
 | |
|             : identifier;
 | |
|         return $"https://bdu.fstec.ru/vul/{slug}";
 | |
|     }
 | |
| 
 | |
|     private async Task<RuBduCursor> GetCursorAsync(CancellationToken cancellationToken)
 | |
|     {
 | |
|         var state = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
 | |
|         return state is null ? RuBduCursor.Empty : RuBduCursor.FromBson(state.Cursor);
 | |
|     }
 | |
| 
 | |
|     private Task UpdateCursorAsync(RuBduCursor cursor, CancellationToken cancellationToken)
 | |
|     {
 | |
|         var document = cursor.ToBsonDocument();
 | |
|         var completedAt = cursor.LastSuccessfulFetch ?? _timeProvider.GetUtcNow();
 | |
|         return _stateRepository.UpdateCursorAsync(SourceName, document, completedAt, cancellationToken);
 | |
|     }
 | |
| }
 |