Initial commit (history squashed)
This commit is contained in:
500
src/StellaOps.Feedser.Source.Osv/OsvConnector.cs
Normal file
500
src/StellaOps.Feedser.Source.Osv/OsvConnector.cs
Normal file
@@ -0,0 +1,500 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.IO.Compression;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Bson.IO;
|
||||
using StellaOps.Feedser.Models;
|
||||
using StellaOps.Feedser.Models;
|
||||
using StellaOps.Feedser.Source.Common;
|
||||
using StellaOps.Feedser.Source.Common.Fetch;
|
||||
using StellaOps.Feedser.Source.Osv.Configuration;
|
||||
using StellaOps.Feedser.Source.Osv.Internal;
|
||||
using StellaOps.Feedser.Storage.Mongo;
|
||||
using StellaOps.Feedser.Storage.Mongo.Advisories;
|
||||
using StellaOps.Feedser.Storage.Mongo.Documents;
|
||||
using StellaOps.Feedser.Storage.Mongo.Dtos;
|
||||
using StellaOps.Plugin;
|
||||
|
||||
namespace StellaOps.Feedser.Source.Osv;
|
||||
|
||||
public sealed class OsvConnector : IFeedConnector
|
||||
{
|
||||
// JSON settings shared by every OSV (de)serialization in this connector: web defaults,
// null-valued properties omitted when writing, case-insensitive property names when reading.
private static readonly JsonSerializerOptions SerializerOptions =
    new JsonSerializerOptions(JsonSerializerDefaults.Web)
    {
        PropertyNameCaseInsensitive = true,
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    };

// Dependencies captured by the constructor.
private readonly IHttpClientFactory _httpClientFactory;   // creates the HTTP client used for archive downloads
private readonly RawDocumentStorage _rawDocumentStorage;  // upload/download of raw advisory bytes
private readonly IDocumentStore _documentStore;           // document records and their status
private readonly IDtoStore _dtoStore;                     // parsed DTO records
private readonly IAdvisoryStore _advisoryStore;           // mapped advisories
private readonly ISourceStateRepository _stateRepository; // cursor and failure state for this source
private readonly OsvOptions _options;                     // validated connector options
private readonly TimeProvider _timeProvider;              // clock; falls back to TimeProvider.System
private readonly ILogger<OsvConnector> _logger;
|
||||
|
||||
/// <summary>
/// Creates the connector, validating every dependency and the supplied options.
/// </summary>
/// <param name="timeProvider">Clock to use; <see cref="TimeProvider.System"/> is used when null.</param>
/// <exception cref="ArgumentNullException">
/// Any argument other than <paramref name="timeProvider"/> is null, or <paramref name="options"/> carries no value.
/// </exception>
public OsvConnector(
    IHttpClientFactory httpClientFactory,
    RawDocumentStorage rawDocumentStorage,
    IDocumentStore documentStore,
    IDtoStore dtoStore,
    IAdvisoryStore advisoryStore,
    ISourceStateRepository stateRepository,
    IOptions<OsvOptions> options,
    TimeProvider? timeProvider,
    ILogger<OsvConnector> logger)
{
    // Guards run in parameter order so the first null argument is the one reported.
    ArgumentNullException.ThrowIfNull(httpClientFactory);
    ArgumentNullException.ThrowIfNull(rawDocumentStorage);
    ArgumentNullException.ThrowIfNull(documentStore);
    ArgumentNullException.ThrowIfNull(dtoStore);
    ArgumentNullException.ThrowIfNull(advisoryStore);
    ArgumentNullException.ThrowIfNull(stateRepository);
    ArgumentNullException.ThrowIfNull(options);

    var resolvedOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
    resolvedOptions.Validate();

    // The logger is checked after option validation, as before.
    ArgumentNullException.ThrowIfNull(logger);

    _httpClientFactory = httpClientFactory;
    _rawDocumentStorage = rawDocumentStorage;
    _documentStore = documentStore;
    _dtoStore = dtoStore;
    _advisoryStore = advisoryStore;
    _stateRepository = stateRepository;
    _options = resolvedOptions;
    _timeProvider = timeProvider ?? TimeProvider.System;
    _logger = logger;
}
|
||||
|
||||
/// <summary>Name identifying this connector's source; taken from <see cref="OsvConnectorPlugin.SourceName"/>.</summary>
public string SourceName => OsvConnectorPlugin.SourceName;
|
||||
|
||||
/// <summary>
/// Fetches the archive of every configured ecosystem, stores new or changed advisories as raw
/// documents, and persists the updated cursor.
/// </summary>
/// <param name="services">Host service provider; only null-checked here.</param>
/// <param name="cancellationToken">Observed between ecosystems and passed to all I/O.</param>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
/// <remarks>
/// Any failure while fetching an ecosystem is logged, recorded through
/// <c>MarkFailureAsync</c> with a 10 minute back-off, and rethrown. The cursor is not persisted in that case.
/// </remarks>
public async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(services);

    var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
    var now = _timeProvider.GetUtcNow();
    var pendingDocuments = cursor.PendingDocuments.ToHashSet();
    var cursorState = cursor;

    // Budget shared by all ecosystems within this run.
    var remainingCapacity = _options.MaxAdvisoriesPerFetch;

    foreach (var ecosystem in _options.Ecosystems)
    {
        if (remainingCapacity <= 0)
        {
            break;
        }

        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            var result = await FetchEcosystemAsync(
                ecosystem,
                cursorState,
                pendingDocuments,
                now,
                remainingCapacity,
                cancellationToken).ConfigureAwait(false);

            cursorState = result.Cursor;
            remainingCapacity -= result.NewDocuments;
        }
        // Caller-initiated cancellation is not a source failure: let it propagate untouched instead
        // of logging an error, applying a back-off, and calling the repository with a cancelled token.
        // An OperationCanceledException raised while the token is NOT cancelled (e.g. an HTTP timeout)
        // is still treated as a failure.
        catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            _logger.LogError(ex, "OSV fetch failed for ecosystem {Ecosystem}", ecosystem);
            await _stateRepository.MarkFailureAsync(SourceName, now, TimeSpan.FromMinutes(10), ex.Message, cancellationToken).ConfigureAwait(false);
            throw;
        }
    }

    // Pending mappings are carried over unchanged from the cursor that was loaded.
    cursorState = cursorState
        .WithPendingDocuments(pendingDocuments)
        .WithPendingMappings(cursor.PendingMappings);

    await UpdateCursorAsync(cursorState, cancellationToken).ConfigureAwait(false);
}
|
||||
|
||||
/// <summary>
/// Turns every pending raw document into a stored DTO record and queues it for mapping.
/// </summary>
/// <param name="services">Host service provider; only null-checked here.</param>
/// <param name="cancellationToken">Checked before each document and passed to all I/O.</param>
/// <remarks>
/// Documents that are missing, have no GridFS content, fail to deserialize, or have no id are dropped
/// from the pending list (and marked <c>Failed</c> where a record exists). A download error is logged
/// and rethrown, which leaves the cursor unchanged for this run.
/// </remarks>
public async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(services);

    var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
    if (cursor.PendingDocuments.Count == 0)
    {
        return;
    }

    var stillPending = cursor.PendingDocuments.ToList();
    var mappingQueue = cursor.PendingMappings.ToList();

    foreach (var documentId in cursor.PendingDocuments)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);
        if (document is null)
        {
            // Nothing to parse and nothing to mark; just forget the id.
            stillPending.Remove(documentId);
            continue;
        }

        if (document.GridFsId is not { } gridFsId)
        {
            _logger.LogWarning("OSV document {DocumentId} missing GridFS content", document.Id);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            stillPending.Remove(documentId);
            continue;
        }

        byte[] rawBytes;
        try
        {
            rawBytes = await _rawDocumentStorage.DownloadAsync(gridFsId, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unable to download OSV raw document {DocumentId}", document.Id);
            throw;
        }

        OsvVulnerabilityDto? parsed;
        try
        {
            parsed = JsonSerializer.Deserialize<OsvVulnerabilityDto>(rawBytes, SerializerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to deserialize OSV document {DocumentId} ({Uri})", document.Id, document.Uri);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            stillPending.Remove(documentId);
            continue;
        }

        var hasUsableId = parsed is not null && !string.IsNullOrWhiteSpace(parsed.Id);
        if (!hasUsableId)
        {
            _logger.LogWarning("OSV document {DocumentId} produced empty payload", document.Id);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            stillPending.Remove(documentId);
            continue;
        }

        // Round-trip through the serializer so the stored payload only holds what the DTO models.
        var sanitizedJson = JsonSerializer.Serialize(parsed, SerializerOptions);
        var dtoRecord = new DtoRecord(
            Guid.NewGuid(),
            document.Id,
            SourceName,
            "osv.v1",
            MongoDB.Bson.BsonDocument.Parse(sanitizedJson),
            _timeProvider.GetUtcNow());

        await _dtoStore.UpsertAsync(dtoRecord, cancellationToken).ConfigureAwait(false);
        await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.PendingMap, cancellationToken).ConfigureAwait(false);

        stillPending.Remove(documentId);
        if (!mappingQueue.Contains(documentId))
        {
            mappingQueue.Add(documentId);
        }
    }

    var updatedCursor = cursor
        .WithPendingDocuments(stillPending)
        .WithPendingMappings(mappingQueue);

    await UpdateCursorAsync(updatedCursor, cancellationToken).ConfigureAwait(false);
}
|
||||
|
||||
/// <summary>
/// Maps every DTO queued for mapping into an advisory and stores it.
/// </summary>
/// <param name="services">Host service provider; only null-checked here.</param>
/// <param name="cancellationToken">Checked before each document and passed to all I/O.</param>
/// <remarks>
/// Entries whose DTO or document record no longer exists are dropped silently. Entries whose payload
/// cannot be deserialized, or has no id, are marked <c>Failed</c> and dropped.
/// </remarks>
public async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(services);

    var cursor = await GetCursorAsync(cancellationToken).ConfigureAwait(false);
    if (cursor.PendingMappings.Count == 0)
    {
        return;
    }

    var outstanding = cursor.PendingMappings.ToList();
    var jsonSettings = new JsonWriterSettings
    {
        OutputMode = JsonOutputMode.RelaxedExtendedJson,
    };

    foreach (var documentId in cursor.PendingMappings)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var dto = await _dtoStore.FindByDocumentIdAsync(documentId, cancellationToken).ConfigureAwait(false);
        var document = await _documentStore.FindAsync(documentId, cancellationToken).ConfigureAwait(false);

        if (dto is null || document is null)
        {
            outstanding.Remove(documentId);
            continue;
        }

        var payloadJson = dto.Payload.ToJson(jsonSettings);

        OsvVulnerabilityDto? osvDto;
        try
        {
            osvDto = JsonSerializer.Deserialize<OsvVulnerabilityDto>(payloadJson, SerializerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to deserialize OSV DTO for document {DocumentId}", document.Id);
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            outstanding.Remove(documentId);
            continue;
        }

        if (osvDto is null || string.IsNullOrWhiteSpace(osvDto.Id))
        {
            await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Failed, cancellationToken).ConfigureAwait(false);
            outstanding.Remove(documentId);
            continue;
        }

        // The ecosystem is read from the metadata written at fetch time; "unknown" when absent.
        var ecosystem = "unknown";
        if (document.Metadata is not null && document.Metadata.TryGetValue("osv.ecosystem", out var recordedEcosystem))
        {
            ecosystem = recordedEcosystem;
        }

        var advisory = OsvMapper.Map(osvDto, document, dto, ecosystem);
        await _advisoryStore.UpsertAsync(advisory, cancellationToken).ConfigureAwait(false);
        await _documentStore.UpdateStatusAsync(document.Id, DocumentStatuses.Mapped, cancellationToken).ConfigureAwait(false);

        outstanding.Remove(documentId);
    }

    await UpdateCursorAsync(cursor.WithPendingMappings(outstanding), cancellationToken).ConfigureAwait(false);
}
|
||||
|
||||
/// <summary>
/// Loads this source's persisted cursor, or <c>OsvCursor.Empty</c> when no state has been stored yet.
/// </summary>
private async Task<OsvCursor> GetCursorAsync(CancellationToken cancellationToken)
{
    var persisted = await _stateRepository.TryGetAsync(SourceName, cancellationToken).ConfigureAwait(false);
    if (persisted is null)
    {
        return OsvCursor.Empty;
    }

    return OsvCursor.FromBson(persisted.Cursor);
}
|
||||
|
||||
/// <summary>
/// Persists <paramref name="cursor"/> for this source, stamped with the current time from the injected clock.
/// </summary>
private async Task UpdateCursorAsync(OsvCursor cursor, CancellationToken cancellationToken)
{
    var serialized = cursor.ToBsonDocument();
    var updatedAt = _timeProvider.GetUtcNow();

    await _stateRepository
        .UpdateCursorAsync(SourceName, serialized, updatedAt, cancellationToken)
        .ConfigureAwait(false);
}
|
||||
|
||||
/// <summary>
/// Downloads one ecosystem's OSV archive and stores every advisory entry that is new or changed
/// relative to the cursor.
/// </summary>
/// <param name="ecosystem">Ecosystem name; used to build the archive URI and as the cursor key.</param>
/// <param name="cursor">Cursor to resume from; the returned cursor is derived from it.</param>
/// <param name="pendingDocuments">Receives the id of every document stored by this call.</param>
/// <param name="now">Reference time for the initial backfill window when no high-water mark exists.</param>
/// <param name="remainingCapacity">Maximum number of documents this call may store.</param>
/// <param name="cancellationToken">Checked per entry and passed to all I/O.</param>
/// <returns>The updated cursor and the number of documents stored.</returns>
/// <exception cref="HttpRequestException">The archive request returned a non-success status other than 304.</exception>
private async Task<(OsvCursor Cursor, int NewDocuments)> FetchEcosystemAsync(
    string ecosystem,
    OsvCursor cursor,
    HashSet<Guid> pendingDocuments,
    DateTimeOffset now,
    int remainingCapacity,
    CancellationToken cancellationToken)
{
    var client = _httpClientFactory.CreateClient(OsvOptions.HttpClientName);
    client.Timeout = _options.HttpTimeout;

    var archiveUri = BuildArchiveUri(ecosystem);
    using var request = new HttpRequestMessage(HttpMethod.Get, archiveUri);

    // Conditional GET: replay the validators recorded for this ecosystem's archive, if any.
    if (cursor.TryGetArchiveMetadata(ecosystem, out var archiveMetadata))
    {
        if (!string.IsNullOrWhiteSpace(archiveMetadata.ETag))
        {
            request.Headers.TryAddWithoutValidation("If-None-Match", archiveMetadata.ETag);
        }

        if (archiveMetadata.LastModified.HasValue)
        {
            request.Headers.IfModifiedSince = archiveMetadata.LastModified.Value;
        }
    }

    using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        // Archive unchanged since the last fetch: nothing to do, cursor stays as is.
        return (cursor, 0);
    }

    response.EnsureSuccessStatusCode();

    await using var archiveStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
    using var archive = new ZipArchive(archiveStream, ZipArchiveMode.Read, leaveOpen: false);

    var existingLastModified = cursor.GetLastModified(ecosystem);

    // Running high-water mark and the ids already handled at exactly that timestamp.
    var currentMaxModified = existingLastModified ?? DateTimeOffset.MinValue;
    var currentProcessedIds = cursor.ProcessedIdsByEcosystem.TryGetValue(ecosystem, out var processedIds)
        ? new HashSet<string>(processedIds, StringComparer.OrdinalIgnoreCase)
        : new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    var processedUpdated = false;
    var newDocuments = 0;

    // Lower bound of the resume window: the previous high-water mark minus the tolerance, or the
    // initial backfill window when this ecosystem has never been fetched.
    var minimumModified = existingLastModified.HasValue
        ? existingLastModified.Value - _options.ModifiedTolerance
        : now - _options.InitialBackfill;

    ProvenanceDiagnostics.ReportResumeWindow(SourceName, minimumModified, _logger);

    foreach (var entry in archive.Entries)
    {
        if (remainingCapacity <= 0)
        {
            break;
        }

        cancellationToken.ThrowIfCancellationRequested();

        if (!entry.FullName.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
        {
            continue;
        }

        await using var entryStream = entry.Open();
        using var memory = new MemoryStream();
        await entryStream.CopyToAsync(memory, cancellationToken).ConfigureAwait(false);
        var bytes = memory.ToArray();

        OsvVulnerabilityDto? dto;
        try
        {
            dto = JsonSerializer.Deserialize<OsvVulnerabilityDto>(bytes, SerializerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to parse OSV entry {Entry} for ecosystem {Ecosystem}", entry.FullName, ecosystem);
            continue;
        }

        if (dto is null || string.IsNullOrWhiteSpace(dto.Id))
        {
            continue;
        }

        var modified = (dto.Modified ?? dto.Published ?? DateTimeOffset.MinValue).ToUniversalTime();

        // Outside the resume window. When a previous high-water mark exists this bound already
        // equals existingLastModified - ModifiedTolerance, so no separate check is needed for it.
        if (modified < minimumModified)
        {
            continue;
        }

        // Skip entries that trail the running maximum by more than the tolerance.
        // The check is skipped while the running maximum is still DateTimeOffset.MinValue (first
        // run, nothing accepted yet): subtracting a positive TimeSpan from MinValue throws
        // ArgumentOutOfRangeException, and no entry can be older than MinValue anyway.
        if (currentMaxModified != DateTimeOffset.MinValue
            && modified < currentMaxModified - _options.ModifiedTolerance)
        {
            continue;
        }

        // Already handled at exactly the current high-water mark.
        if (modified == currentMaxModified && currentProcessedIds.Contains(dto.Id))
        {
            continue;
        }

        var documentUri = BuildDocumentUri(ecosystem, dto.Id);
        var sha256 = Convert.ToHexString(SHA256.HashData(bytes)).ToLowerInvariant();

        // Identical content already stored under this URI: nothing new to record.
        var existing = await _documentStore.FindBySourceAndUriAsync(SourceName, documentUri, cancellationToken).ConfigureAwait(false);
        if (existing is not null && string.Equals(existing.Sha256, sha256, StringComparison.OrdinalIgnoreCase))
        {
            continue;
        }

        var gridFsId = await _rawDocumentStorage.UploadAsync(SourceName, documentUri, bytes, "application/json", null, cancellationToken).ConfigureAwait(false);
        var metadata = new Dictionary<string, string>(StringComparer.Ordinal)
        {
            ["osv.ecosystem"] = ecosystem,
            ["osv.id"] = dto.Id,
            ["osv.modified"] = modified.ToString("O"),
        };

        // Reuse the existing record id so a changed advisory replaces its previous document.
        var recordId = existing?.Id ?? Guid.NewGuid();
        var record = new DocumentRecord(
            recordId,
            SourceName,
            documentUri,
            _timeProvider.GetUtcNow(),
            sha256,
            DocumentStatuses.PendingParse,
            "application/json",
            Headers: null,
            Metadata: metadata,
            Etag: null,
            LastModified: modified,
            GridFsId: gridFsId,
            ExpiresAt: null);

        var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
        pendingDocuments.Add(upserted.Id);
        newDocuments++;
        remainingCapacity--;

        if (modified > currentMaxModified)
        {
            // New high-water mark: the processed-id set restarts at this timestamp.
            currentMaxModified = modified;
            currentProcessedIds = new HashSet<string>(StringComparer.OrdinalIgnoreCase) { dto.Id };
            processedUpdated = true;
        }
        else if (modified == currentMaxModified)
        {
            currentProcessedIds.Add(dto.Id);
            processedUpdated = true;
        }

        if (_options.RequestDelay > TimeSpan.Zero)
        {
            try
            {
                await Task.Delay(_options.RequestDelay, cancellationToken).ConfigureAwait(false);
            }
            catch (TaskCanceledException)
            {
                // Cancelled while pacing: stop iterating but still return the progress made so far.
                break;
            }
        }
    }

    if (processedUpdated && currentMaxModified != DateTimeOffset.MinValue)
    {
        cursor = cursor.WithLastModified(ecosystem, currentMaxModified, currentProcessedIds);
    }
    else if (processedUpdated && existingLastModified.HasValue)
    {
        cursor = cursor.WithLastModified(ecosystem, existingLastModified.Value, currentProcessedIds);
    }

    // Remember the archive validators for the next conditional request.
    var etag = response.Headers.ETag?.Tag;
    var lastModifiedHeader = response.Content.Headers.LastModified;
    cursor = cursor.WithArchiveMetadata(ecosystem, etag, lastModifiedHeader);

    return (cursor, newDocuments);
}
|
||||
|
||||
/// <summary>
/// Builds the archive download URI for an ecosystem:
/// <c>{BaseUri path}/{ecosystem}/{ArchiveFileName}</c>, with surrounding slashes trimmed from the ecosystem.
/// </summary>
private Uri BuildArchiveUri(string ecosystem)
{
    var trimmed = ecosystem.Trim('/');
    var builder = new UriBuilder(_options.BaseUri);

    // Ensure exactly one separator between the base path and the ecosystem segment.
    var basePath = builder.Path.EndsWith('/') ? builder.Path : builder.Path + "/";

    builder.Path = basePath + $"{trimmed}/{_options.ArchiveFileName}";
    return builder.Uri;
}
|
||||
|
||||
/// <summary>
/// Builds the document URI recorded for an advisory: the OSV bucket URL followed by the ecosystem
/// and the advisory id (spaces replaced with '-') plus a <c>.json</c> suffix.
/// </summary>
private static string BuildDocumentUri(string ecosystem, string vulnerabilityId)
    => string.Concat(
        "https://osv-vulnerabilities.storage.googleapis.com/",
        ecosystem,
        "/",
        vulnerabilityId.Replace(' ', '-'),
        ".json");
|
||||
}
|
||||
Reference in New Issue
Block a user