using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.IO;
using StellaOps.Feedser.Models;
using StellaOps.Feedser.Source.Common;
using StellaOps.Feedser.Source.Common.Fetch;
using StellaOps.Feedser.Source.Osv.Configuration;
using StellaOps.Feedser.Source.Osv.Internal;
using StellaOps.Feedser.Storage.Mongo;
using StellaOps.Feedser.Storage.Mongo.Advisories;
using StellaOps.Feedser.Storage.Mongo.Documents;
using StellaOps.Feedser.Storage.Mongo.Dtos;
using StellaOps.Plugin;

namespace StellaOps.Feedser.Source.Osv;

public sealed class OsvConnector : IFeedConnector
{
    private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
        PropertyNameCaseInsensitive = true,
    };

    private readonly IHttpClientFactory _httpClientFactory;
    private readonly RawDocumentStorage _rawDocumentStorage;
    private readonly IDocumentStore _documentStore;
    private readonly IDtoStore _dtoStore;
    private readonly IAdvisoryStore _advisoryStore;
    private readonly ISourceStateRepository _stateRepository;
    private readonly OsvOptions _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger _logger;

    public OsvConnector(
        IHttpClientFactory httpClientFactory,
        RawDocumentStorage rawDocumentStorage,
        IDocumentStore documentStore,
        IDtoStore dtoStore,
        IAdvisoryStore advisoryStore,
        ISourceStateRepository stateRepository,
        IOptions<OsvOptions> options,
        TimeProvider? timeProvider,
        ILogger<OsvConnector> logger)
    {
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _rawDocumentStorage = rawDocumentStorage ?? throw new ArgumentNullException(nameof(rawDocumentStorage));
        _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
        _dtoStore = dtoStore ?? throw new ArgumentNullException(nameof(dtoStore));
        _advisoryStore = advisoryStore ?? throw new ArgumentNullException(nameof(advisoryStore));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value ?? throw new ArgumentNullException(nameof(options));
        _options.Validate();
        _timeProvider = timeProvider ?? TimeProvider.System;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }
    public string SourceName => OsvConnectorPlugin.SourceName;

    public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        var now = _timeProvider.GetUtcNow();
        var pendingDocuments = cursor.PendingDocuments.ToHashSet();
        var cursorState = cursor;
        var remainingCapacity = _options.MaxAdvisoriesPerFetch;

        foreach (var ecosystem in _options.Ecosystems)
        {
            if (remainingCapacity <= 0)
            {
                break;
            }

            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                var result = await FetchEcosystemAsync(
                    ecosystem,
                    cursorState,
                    pendingDocuments,
                    now,
                    remainingCapacity,
                    cancellationToken).ConfigureAwait(false);

                cursorState = result.Cursor;
                remainingCapacity -= result.NewDocuments;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "OSV fetch failed for ecosystem {Ecosystem}", ecosystem);
                await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(10), ex.Message, cancellationToken).ConfigureAwait(false);
                throw;
            }
        }

        cursorState = cursorState
            .WithPendingDocuments(pendingDocuments)
            .WithPendingMappings(cursor.PendingMappings);

        await UpdateCursorAsync(cursorState, cancellationToken).ConfigureAwait(false);
    }

    public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingDocuments.Count == 0)
        {
            return;
        }

        var remainingDocuments = cursor.PendingDocuments.ToList();
        var pendingMappings = cursor.PendingMappings.ToList();

        foreach (var documentId in cursor.PendingDocuments)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (document is null)
            {
                remainingDocuments.Remove(documentId);
                continue;
            }

            if (!document.GridFsId.HasValue)
            {
                _logger.LogWarning("OSV document {DocumentId} missing GridFS content", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                remainingDocuments.Remove(documentId);
                continue;
            }

            byte[] bytes;
            try
            {
                bytes = await _rawDocumentStorage.DownloadAsync(document.GridFsId.Value, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Unable to download OSV raw document {DocumentId}", document.Id);
                throw;
            }
            OsvVulnerabilityDto? dto;
            try
            {
                dto = JsonSerializer.Deserialize<OsvVulnerabilityDto>(bytes, SerializerOptions);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to deserialize OSV document {DocumentId} ({Uri})", document.Id, document.Uri);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                remainingDocuments.Remove(documentId);
                continue;
            }

            if (dto is null || string.IsNullOrWhiteSpace(dto.Id))
            {
                _logger.LogWarning("OSV document {DocumentId} produced empty payload", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                remainingDocuments.Remove(documentId);
                continue;
            }

            // Re-serialize the DTO so the stored payload is normalized before conversion to BSON.
            var sanitized = JsonSerializer.Serialize(dto, SerializerOptions);
            var payload = MongoDB.Bson.BsonDocument.Parse(sanitized);
            var dtoRecord = new DtoRecord(
                Guid.NewGuid(),
                document.Id,
                SourceName,
                "osv.v1",
                payload,
                _timeProvider.GetUtcNow());

            await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);
            remainingDocuments.Remove(documentId);

            if (!pendingMappings.Contains(documentId))
            {
                pendingMappings.Add(documentId);
            }
        }

        var updatedCursor = cursor
            .WithPendingDocuments(remainingDocuments)
            .WithPendingMappings(pendingMappings);

        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(services);

        var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
        if (cursor.PendingMappings.Count == 0)
        {
            return;
        }

        var pendingMappings = cursor.PendingMappings.ToList();

        foreach (var documentId in cursor.PendingMappings)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var dto = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
            var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
            if (dto is null || document is null)
            {
                pendingMappings.Remove(documentId);
                continue;
            }

            var payloadJson = dto.Payload.ToJson(new JsonWriterSettings
            {
                OutputMode = JsonOutputMode.RelaxedExtendedJson,
            });

            OsvVulnerabilityDto? osvDto;
            try
            {
                osvDto = JsonSerializer.Deserialize<OsvVulnerabilityDto>(payloadJson, SerializerOptions);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to deserialize OSV DTO for document {DocumentId}", document.Id);
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                continue;
            }

            if (osvDto is null || string.IsNullOrWhiteSpace(osvDto.Id))
            {
                await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
                pendingMappings.Remove(documentId);
                continue;
            }

            var ecosystem = document.Metadata is not null && document.Metadata.TryGetValue("osv.ecosystem", out var ecosystemValue)
                ? ecosystemValue
                : "unknown";
            var advisory = OsvMapper.Map(osvDto, document, dto, ecosystem);

            await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);
            pendingMappings.Remove(documentId);
        }

        var updatedCursor = cursor.WithPendingMappings(pendingMappings);
        await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
    }

    private async Task<OsvCursor> GetCursorAsync(CancellationToken cancellationToken)
    {
        var state = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
        return state is null ? OsvCursor.Empty : OsvCursor.FromBson(state.Cursor);
    }

    private async Task UpdateCursorAsync(OsvCursor cursor, CancellationToken cancellationToken)
    {
        var document = cursor.ToBsonDocument();
        await _stateRepository.UpdateCursorAsync(SourceName, document, _timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false);
    }

    // Downloads the per-ecosystem OSV archive and stages new or changed advisories as raw documents.
    private async Task<(OsvCursor Cursor, int NewDocuments)> FetchEcosystemAsync(
        string ecosystem,
        OsvCursor cursor,
        HashSet<Guid> pendingDocuments,
        DateTimeOffset now,
        int remainingCapacity,
        CancellationToken cancellationToken)
    {
        var client = _httpClientFactory.CreateClient(OsvOptions.HttpClientName);
        client.Timeout = _options.HttpTimeout;

        var archiveUri = BuildArchiveUri(ecosystem);
        using var request = new HttpRequestMessage(HttpMethod.Get, archiveUri);
        if (cursor.TryGetArchiveMetadata(ecosystem, out var archiveMetadata))
        {
            if (!string.IsNullOrWhiteSpace(archiveMetadata.ETag))
            {
                request.Headers.TryAddWithoutValidation("If-None-Match", archiveMetadata.ETag);
            }

            if (archiveMetadata.LastModified.HasValue)
            {
                request.Headers.IfModifiedSince = archiveMetadata.LastModified.Value;
            }
        }

        using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
        if (response.StatusCode == HttpStatusCode.NotModified)
        {
            return (cursor, 0);
        }

        response.EnsureSuccessStatusCode();

        await using var archiveStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
        using var archive = new ZipArchive(archiveStream, ZipArchiveMode.Read, leaveOpen: false);

        var existingLastModified = cursor.GetLastModified(ecosystem);
        var processedIdsSet = cursor.ProcessedIdsByEcosystem.TryGetValue(ecosystem, out var processedIds)
            ? new HashSet<string>(processedIds, StringComparer.OrdinalIgnoreCase)
            : new HashSet<string>(StringComparer.OrdinalIgnoreCase);

        var currentMaxModified = existingLastModified ?? DateTimeOffset.MinValue;
        var currentProcessedIds = new HashSet<string>(processedIdsSet, StringComparer.OrdinalIgnoreCase);
        var processedUpdated = false;
        var newDocuments = 0;

        // Resume from the last recorded modification time (minus tolerance); otherwise backfill the configured initial window.
        var minimumModified = existingLastModified.HasValue
            ? existingLastModified.Value - _options.ModifiedTolerance
            : now - _options.InitialBackfill;

        ProvenanceDiagnostics.ReportResumeWindow(SourceName, minimumModified, _logger);

        foreach (var entry in archive.Entries)
        {
            if (remainingCapacity <= 0)
            {
                break;
            }

            cancellationToken.ThrowIfCancellationRequested();

            if (!entry.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            await using var entryStream = entry.Open();
            using var memory = new MemoryStream();
            await entryStream.CopyToAsync(memory, cancellationToken).ConfigureAwait(false);
            var bytes = memory.ToArray();
            OsvVulnerabilityDto? dto;
            try
            {
                dto = JsonSerializer.Deserialize<OsvVulnerabilityDto>(bytes, SerializerOptions);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to parse OSV entry {Entry} for ecosystem {Ecosystem}", entry.FullName, ecosystem);
                continue;
            }

            if (dto is null || string.IsNullOrWhiteSpace(dto.Id))
            {
                continue;
            }

            // Skip entries that fall outside the resume window or were already processed at the same timestamp.
            var modified = (dto.Modified ?? dto.Published ?? DateTimeOffset.MinValue).ToUniversalTime();
            if (modified < minimumModified)
            {
                continue;
            }

            if (existingLastModified.HasValue && modified < existingLastModified.Value - _options.ModifiedTolerance)
            {
                continue;
            }

            if (modified < currentMaxModified - _options.ModifiedTolerance)
            {
                continue;
            }

            if (modified == currentMaxModified && currentProcessedIds.Contains(dto.Id))
            {
                continue;
            }

            var documentUri = BuildDocumentUri(ecosystem, dto.Id);
            var sha256 = Convert.ToHexString(SHA256.HashData(bytes)).ToLowerInvariant();

            var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, documentUri, cancellationToken).ConfigureAwait(false);
            if (existing is not null && string.Equals(existing.Sha256, sha256, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            var gridFsId = await _rawDocumentStorage.UploadAsync(SourceName, documentUri, bytes, "application/json", null, cancellationToken).ConfigureAwait(false);

            var metadata = new Dictionary<string, string>(StringComparer.Ordinal)
            {
                ["osv.ecosystem"] = ecosystem,
                ["osv.id"] = dto.Id,
                ["osv.modified"] = modified.ToString("O"),
            };

            var recordId = existing?.Id ?? Guid.NewGuid();
            var record = new DocumentRecord(
                recordId,
                SourceName,
                documentUri,
                _timeProvider.GetUtcNow(),
                sha256,
                DocumentStatuses.PendingParse,
                "application/json",
                Headers: null,
                Metadata: metadata,
                Etag: null,
                LastModified: modified,
                GridFsId: gridFsId,
                ExpiresAt: null);

            var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
            pendingDocuments.Add(upserted.Id);
            newDocuments++;
            remainingCapacity--;

            if (modified > currentMaxModified)
            {
                currentMaxModified = modified;
                currentProcessedIds = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { dto.Id };
                processedUpdated = true;
            }
            else if (modified == currentMaxModified)
            {
                currentProcessedIds.Add(dto.Id);
                processedUpdated = true;
            }

            if (_options.RequestDelay > TimeSpan.Zero)
            {
                try
                {
                    await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
                }
                catch (TaskCanceledException)
                {
                    break;
                }
            }
        }

        if (processedUpdated && currentMaxModified != DateTimeOffset.MinValue)
        {
            cursor = cursor.WithLastModified(ecosystem, currentMaxModified, currentProcessedIds);
        }
        else if (processedUpdated && existingLastModified.HasValue)
        {
            cursor = cursor.WithLastModified(ecosystem, existingLastModified.Value, currentProcessedIds);
        }

        var etag = response.Headers.ETag?.Tag;
        var lastModifiedHeader = response.Content.Headers.LastModified;
        cursor = cursor.WithArchiveMetadata(ecosystem, etag, lastModifiedHeader);

        return (cursor, newDocuments);
    }

    private Uri BuildArchiveUri(string ecosystem)
    {
        var trimmed = ecosystem.Trim('/');
        var baseUri = _options.BaseUri;
        var builder = new UriBuilder(baseUri);
        var path = builder.Path;
        if (!path.EndsWith('/'))
        {
            path += "/";
        }

        path += $"{trimmed}/{_options.ArchiveFileName}";
        builder.Path = path;
        return builder.Uri;
    }

    private static string BuildDocumentUri(string ecosystem, string vulnerabilityId)
    {
        var safeId = vulnerabilityId.Replace(' ', '-');
        return $"https://osv-vulnerabilities.storage.googleapis.com/{ecosystem}/{safeId}.json";
    }
}