up
This commit is contained in:
@@ -0,0 +1,212 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using Mongo2Go;
|
||||
using MongoDB.Bson;
|
||||
using MongoDB.Driver;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Time.Testing;
|
||||
using StellaOps.Concelier.Connector.Common;
|
||||
using StellaOps.Concelier.Connector.Common.Fetch;
|
||||
using StellaOps.Concelier.Connector.Common.State;
|
||||
using StellaOps.Concelier.Storage.Mongo;
|
||||
using StellaOps.Concelier.Storage.Mongo.Documents;
|
||||
|
||||
namespace StellaOps.Concelier.Connector.Common.Tests;
|
||||
|
||||
public sealed class SourceStateSeedProcessorTests : IAsyncLifetime
|
||||
{
|
||||
private readonly MongoDbRunner _runner;
|
||||
private readonly MongoClient _client;
|
||||
private readonly IMongoDatabase _database;
|
||||
private readonly DocumentStore _documentStore;
|
||||
private readonly RawDocumentStorage _rawStorage;
|
||||
private readonly MongoSourceStateRepository _stateRepository;
|
||||
private readonly FakeTimeProvider _timeProvider;
|
||||
|
||||
public SourceStateSeedProcessorTests()
|
||||
{
|
||||
_runner = MongoDbRunner.Start(singleNodeReplSet: true);
|
||||
_client = new MongoClient(_runner.ConnectionString);
|
||||
_database = _client.GetDatabase($"source-state-seed-{Guid.NewGuid():N}");
|
||||
_documentStore = new DocumentStore(_database, NullLogger<DocumentStore>.Instance);
|
||||
_rawStorage = new RawDocumentStorage(_database);
|
||||
_stateRepository = new MongoSourceStateRepository(_database, NullLogger<MongoSourceStateRepository>.Instance);
|
||||
_timeProvider = new FakeTimeProvider(new DateTimeOffset(2025, 10, 28, 12, 0, 0, TimeSpan.Zero));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ProcessAsync_PersistsDocumentsAndUpdatesCursor()
|
||||
{
|
||||
var processor = CreateProcessor();
|
||||
var documentId = Guid.NewGuid();
|
||||
|
||||
var specification = new SourceStateSeedSpecification
|
||||
{
|
||||
Source = "vndr.test",
|
||||
Documents = new[]
|
||||
{
|
||||
new SourceStateSeedDocument
|
||||
{
|
||||
DocumentId = documentId,
|
||||
Uri = "https://example.test/advisories/ADV-1",
|
||||
Content = Encoding.UTF8.GetBytes("{\"id\":\"ADV-1\"}"),
|
||||
ContentType = "application/json",
|
||||
Headers = new Dictionary<string, string> { ["X-Test"] = "true" },
|
||||
Metadata = new Dictionary<string, string> { ["test.meta"] = "value" },
|
||||
FetchedAt = _timeProvider.GetUtcNow().AddMinutes(-5),
|
||||
AddToPendingDocuments = true,
|
||||
AddToPendingMappings = true,
|
||||
KnownIdentifiers = new[] { "ADV-1" },
|
||||
}
|
||||
},
|
||||
Cursor = new SourceStateSeedCursor
|
||||
{
|
||||
LastModifiedCursor = _timeProvider.GetUtcNow().AddDays(-1),
|
||||
LastFetchAt = _timeProvider.GetUtcNow().AddMinutes(-10),
|
||||
Additional = new Dictionary<string, string> { ["custom"] = "value" },
|
||||
},
|
||||
KnownAdvisories = new[] { "ADV-0" },
|
||||
};
|
||||
|
||||
var result = await processor.ProcessAsync(specification, CancellationToken.None);
|
||||
|
||||
Assert.Equal(1, result.DocumentsProcessed);
|
||||
Assert.Single(result.PendingDocumentIds);
|
||||
Assert.Contains(documentId, result.PendingDocumentIds);
|
||||
Assert.Single(result.PendingMappingIds);
|
||||
Assert.Contains(documentId, result.PendingMappingIds);
|
||||
Assert.Equal(2, result.KnownAdvisoriesAdded.Count);
|
||||
Assert.Contains("ADV-0", result.KnownAdvisoriesAdded);
|
||||
Assert.Contains("ADV-1", result.KnownAdvisoriesAdded);
|
||||
Assert.Equal(_timeProvider.GetUtcNow(), result.CompletedAt);
|
||||
|
||||
var storedDocument = await _documentStore.FindBySourceAndUriAsync(
|
||||
"vndr.test",
|
||||
"https://example.test/advisories/ADV-1",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.NotNull(storedDocument);
|
||||
Assert.Equal(documentId, storedDocument!.Id);
|
||||
Assert.Equal("application/json", storedDocument.ContentType);
|
||||
Assert.Equal(DocumentStatuses.PendingParse, storedDocument.Status);
|
||||
Assert.NotNull(storedDocument.GridFsId);
|
||||
Assert.NotNull(storedDocument.Headers);
|
||||
Assert.Equal("true", storedDocument.Headers!["X-Test"]);
|
||||
Assert.NotNull(storedDocument.Metadata);
|
||||
Assert.Equal("value", storedDocument.Metadata!["test.meta"]);
|
||||
|
||||
var filesCollection = _database.GetCollection<BsonDocument>("documents.files");
|
||||
var fileCount = await filesCollection.CountDocumentsAsync(FilterDefinition<BsonDocument>.Empty);
|
||||
Assert.Equal(1, fileCount);
|
||||
|
||||
var state = await _stateRepository.TryGetAsync("vndr.test", CancellationToken.None);
|
||||
Assert.NotNull(state);
|
||||
Assert.Equal(_timeProvider.GetUtcNow().UtcDateTime, state!.LastSuccess);
|
||||
|
||||
var cursor = state.Cursor;
|
||||
var pendingDocs = cursor["pendingDocuments"].AsBsonArray.Select(v => Guid.Parse(v.AsString)).ToList();
|
||||
Assert.Contains(documentId, pendingDocs);
|
||||
|
||||
var pendingMappings = cursor["pendingMappings"].AsBsonArray.Select(v => Guid.Parse(v.AsString)).ToList();
|
||||
Assert.Contains(documentId, pendingMappings);
|
||||
|
||||
var knownAdvisories = cursor["knownAdvisories"].AsBsonArray.Select(v => v.AsString).ToList();
|
||||
Assert.Contains("ADV-0", knownAdvisories);
|
||||
Assert.Contains("ADV-1", knownAdvisories);
|
||||
|
||||
Assert.Equal(_timeProvider.GetUtcNow().UtcDateTime, cursor["lastSeededAt"].ToUniversalTime());
|
||||
Assert.Equal("value", cursor["custom"].AsString);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ProcessAsync_ReplacesExistingDocumentAndCleansPreviousRawPayload()
|
||||
{
|
||||
var processor = CreateProcessor();
|
||||
var documentId = Guid.NewGuid();
|
||||
|
||||
var initialSpecification = new SourceStateSeedSpecification
|
||||
{
|
||||
Source = "vndr.test",
|
||||
Documents = new[]
|
||||
{
|
||||
new SourceStateSeedDocument
|
||||
{
|
||||
DocumentId = documentId,
|
||||
Uri = "https://example.test/advisories/ADV-2",
|
||||
Content = Encoding.UTF8.GetBytes("{\"id\":\"ADV-2\",\"rev\":1}"),
|
||||
ContentType = "application/json",
|
||||
AddToPendingDocuments = true,
|
||||
}
|
||||
},
|
||||
KnownAdvisories = new[] { "ADV-2" },
|
||||
};
|
||||
|
||||
await processor.ProcessAsync(initialSpecification, CancellationToken.None);
|
||||
|
||||
var existingRecord = await _documentStore.FindBySourceAndUriAsync(
|
||||
"vndr.test",
|
||||
"https://example.test/advisories/ADV-2",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.NotNull(existingRecord);
|
||||
var previousGridId = existingRecord!.GridFsId;
|
||||
Assert.NotNull(previousGridId);
|
||||
|
||||
var filesCollection = _database.GetCollection<BsonDocument>("documents.files");
|
||||
var initialFiles = await filesCollection.Find(FilterDefinition<BsonDocument>.Empty).ToListAsync();
|
||||
Assert.Single(initialFiles);
|
||||
|
||||
var updatedSpecification = new SourceStateSeedSpecification
|
||||
{
|
||||
Source = "vndr.test",
|
||||
Documents = new[]
|
||||
{
|
||||
new SourceStateSeedDocument
|
||||
{
|
||||
DocumentId = documentId,
|
||||
Uri = "https://example.test/advisories/ADV-2",
|
||||
Content = Encoding.UTF8.GetBytes("{\"id\":\"ADV-2\",\"rev\":2}"),
|
||||
ContentType = "application/json",
|
||||
AddToPendingDocuments = true,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var secondResult = await processor.ProcessAsync(updatedSpecification, CancellationToken.None);
|
||||
|
||||
Assert.Equal(1, secondResult.DocumentsProcessed);
|
||||
Assert.Empty(secondResult.PendingDocumentIds); // already present in cursor
|
||||
Assert.Empty(secondResult.PendingMappingIds);
|
||||
|
||||
var refreshedRecord = await _documentStore.FindBySourceAndUriAsync(
|
||||
"vndr.test",
|
||||
"https://example.test/advisories/ADV-2",
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.NotNull(refreshedRecord);
|
||||
Assert.Equal(documentId, refreshedRecord!.Id);
|
||||
Assert.NotNull(refreshedRecord.GridFsId);
|
||||
Assert.NotEqual(previousGridId, refreshedRecord.GridFsId);
|
||||
|
||||
var files = await filesCollection.Find(FilterDefinition<BsonDocument>.Empty).ToListAsync();
|
||||
Assert.Single(files);
|
||||
Assert.NotEqual(previousGridId, files[0]["_id"].AsObjectId);
|
||||
}
|
||||
|
||||
private SourceStateSeedProcessor CreateProcessor()
|
||||
=> new(
|
||||
_documentStore,
|
||||
_rawStorage,
|
||||
_stateRepository,
|
||||
_timeProvider,
|
||||
NullLogger<SourceStateSeedProcessor>.Instance);
|
||||
|
||||
public Task InitializeAsync() => Task.CompletedTask;
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _client.DropDatabaseAsync(_database.DatabaseNamespace.DatabaseName);
|
||||
_runner.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -4,8 +4,21 @@
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<UseConcelierTestInfra>false</UseConcelierTestInfra>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="../../__Libraries/StellaOps.Concelier.Connector.Common/StellaOps.Concelier.Connector.Common.csproj" />
|
||||
<PackageReference Include="coverlet.collector" Version="6.0.4" />
|
||||
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" Version="9.10.0" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
|
||||
<PackageReference Include="Mongo2Go" Version="4.1.0" />
|
||||
<PackageReference Include="xunit" Version="2.9.2" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
<ItemGroup>
|
||||
<Using Include="Xunit" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="../../__Libraries/StellaOps.Concelier.Connector.Common/StellaOps.Concelier.Connector.Common.csproj" />
|
||||
<ProjectReference Include="../../__Libraries/StellaOps.Concelier.Storage.Mongo/StellaOps.Concelier.Storage.Mongo.csproj" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
|
||||
@@ -34,21 +34,41 @@ public sealed class AdvisoryRawServiceTests
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IngestAsync_PropagatesRepositoryDuplicateResult()
|
||||
{
|
||||
var repository = new RecordingRepository();
|
||||
var service = CreateService(repository);
|
||||
|
||||
var existingDocument = CreateDocument();
|
||||
var expectedResult = new AdvisoryRawUpsertResult(false, CreateRecord(existingDocument));
|
||||
repository.NextResult = expectedResult;
|
||||
|
||||
var result = await service.IngestAsync(CreateDocument(), CancellationToken.None);
|
||||
|
||||
Assert.False(result.Inserted);
|
||||
Assert.Same(expectedResult.Record, result.Record);
|
||||
}
|
||||
|
||||
public async Task IngestAsync_PropagatesRepositoryDuplicateResult()
|
||||
{
|
||||
var repository = new RecordingRepository();
|
||||
var service = CreateService(repository);
|
||||
|
||||
var existingDocument = CreateDocument();
|
||||
var expectedResult = new AdvisoryRawUpsertResult(false, CreateRecord(existingDocument));
|
||||
repository.NextResult = expectedResult;
|
||||
|
||||
var result = await service.IngestAsync(CreateDocument(), CancellationToken.None);
|
||||
|
||||
Assert.False(result.Inserted);
|
||||
Assert.Same(expectedResult.Record, result.Record);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IngestAsync_PreservesAliasOrderAndDuplicates()
|
||||
{
|
||||
var repository = new RecordingRepository();
|
||||
var service = CreateService(repository);
|
||||
|
||||
var aliasSeries = ImmutableArray.Create("CVE-2025-0001", "CVE-2025-0001", "GHSA-xxxx", "cve-2025-0001");
|
||||
var document = CreateDocument() with
|
||||
{
|
||||
Identifiers = new RawIdentifiers(aliasSeries, "GHSA-xxxx"),
|
||||
};
|
||||
|
||||
repository.NextResult = new AdvisoryRawUpsertResult(true, CreateRecord(document));
|
||||
|
||||
await service.IngestAsync(document, CancellationToken.None);
|
||||
|
||||
Assert.NotNull(repository.CapturedDocument);
|
||||
Assert.Equal(aliasSeries, repository.CapturedDocument!.Identifiers.Aliases);
|
||||
}
|
||||
|
||||
private static AdvisoryRawService CreateService(RecordingRepository repository)
|
||||
{
|
||||
var writeGuard = new AdvisoryRawWriteGuard(new AocWriteGuard());
|
||||
|
||||
Reference in New Issue
Block a user