sprints work

This commit is contained in:
StellaOps Bot
2025-12-25 12:19:12 +02:00
parent 223843f1d1
commit 2a06f780cf
224 changed files with 41796 additions and 1515 deletions

View File

@@ -0,0 +1,537 @@
// -----------------------------------------------------------------------------
// CrossModuleEvidenceLinkingTests.cs
// Sprint: SPRINT_8100_0012_0002 - Unified Evidence Model
// Task: EVID-8100-018 - Cross-module evidence linking integration tests
// Description: Integration tests verifying evidence linking across modules:
// - Same subject can have evidence from multiple modules
// - Evidence types from Scanner, Attestor, Policy, Excititor
// - Evidence chain/graph queries work correctly
// -----------------------------------------------------------------------------
using System.Text;
using System.Text.Json;
using FluentAssertions;
using StellaOps.Evidence.Core;
using StellaOps.Evidence.Storage.Postgres.Tests.Fixtures;
using Xunit;
using Xunit.Abstractions;
namespace StellaOps.Evidence.Storage.Postgres.Tests;
/// <summary>
/// Integration tests for cross-module evidence linking.
/// Verifies that the unified evidence model correctly links evidence
/// from different modules (Scanner, Attestor, Policy, Excititor) to the same subject.
/// </summary>
[Collection(EvidencePostgresTestCollection.Name)]
public sealed class CrossModuleEvidenceLinkingTests : IAsyncLifetime
{
    private readonly EvidencePostgresContainerFixture _fixture;
    private readonly ITestOutputHelper _output;
    // Fresh tenant per test-class instance so runs never see each other's rows.
    private readonly string _tenantId = Guid.NewGuid().ToString();
    private PostgresEvidenceStore _store = null!;

    public CrossModuleEvidenceLinkingTests(
        EvidencePostgresContainerFixture fixture,
        ITestOutputHelper output)
    {
        _fixture = fixture;
        _output = output;
    }

    // xUnit calls this before each test; the store is tenant-scoped.
    public Task InitializeAsync()
    {
        _store = _fixture.CreateStore(_tenantId);
        return Task.CompletedTask;
    }

    // Truncate shared-container tables after each test for isolation.
    public async Task DisposeAsync()
    {
        await _fixture.TruncateAllTablesAsync();
    }

    #region Multi-Module Evidence for Same Subject

    /// <summary>
    /// Five evidence records of different types, all for one subject digest,
    /// must all be returned by a subject query.
    /// </summary>
    [Fact]
    public async Task SameSubject_MultipleEvidenceTypes_AllLinked()
    {
        // Arrange - A container image subject with evidence from multiple modules
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}"; // Container image digest
        var scannerEvidence = CreateScannerEvidence(subjectNodeId);
        var reachabilityEvidence = CreateReachabilityEvidence(subjectNodeId);
        var policyEvidence = CreatePolicyEvidence(subjectNodeId);
        var vexEvidence = CreateVexEvidence(subjectNodeId);
        var provenanceEvidence = CreateProvenanceEvidence(subjectNodeId);
        // Act - Store all evidence
        await _store.StoreAsync(scannerEvidence);
        await _store.StoreAsync(reachabilityEvidence);
        await _store.StoreAsync(policyEvidence);
        await _store.StoreAsync(vexEvidence);
        await _store.StoreAsync(provenanceEvidence);
        // Assert - All evidence linked to same subject
        var allEvidence = await _store.GetBySubjectAsync(subjectNodeId);
        allEvidence.Should().HaveCount(5);
        allEvidence.Select(e => e.EvidenceType).Should().Contain(new[]
        {
            EvidenceType.Scan,
            EvidenceType.Reachability,
            EvidenceType.Policy,
            EvidenceType.Vex,
            EvidenceType.Provenance
        });
        _output.WriteLine($"Subject {subjectNodeId} has {allEvidence.Count} evidence records from different modules");
    }

    /// <summary>
    /// The optional type argument of GetBySubjectAsync filters to that evidence type only.
    /// </summary>
    [Fact]
    public async Task SameSubject_FilterByType_ReturnsCorrectEvidence()
    {
        // Arrange
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}";
        await _store.StoreAsync(CreateScannerEvidence(subjectNodeId));
        await _store.StoreAsync(CreateScannerEvidence(subjectNodeId)); // Another scan finding
        await _store.StoreAsync(CreateReachabilityEvidence(subjectNodeId));
        await _store.StoreAsync(CreatePolicyEvidence(subjectNodeId));
        // Act - Filter by Scan type
        var scanEvidence = await _store.GetBySubjectAsync(subjectNodeId, EvidenceType.Scan);
        var policyEvidence = await _store.GetBySubjectAsync(subjectNodeId, EvidenceType.Policy);
        // Assert
        scanEvidence.Should().HaveCount(2);
        policyEvidence.Should().HaveCount(1);
    }

    #endregion

    #region Evidence Chain Scenarios

    /// <summary>
    /// Scan → VEX → Policy chain. NOTE(review): the "link" between records is only
    /// carried inside the JSON payloads (referencedEvidence); the store itself is
    /// only verified to return all three records for the subject - confirm whether
    /// a first-class link/graph query is intended to exist.
    /// </summary>
    [Fact]
    public async Task EvidenceChain_ScanToVexToPolicy_LinkedCorrectly()
    {
        // Scenario: Vulnerability scan → VEX assessment → Policy decision
        // All evidence points to the same subject (vulnerability finding)
        var vulnerabilitySubject = $"sha256:{Guid.NewGuid():N}";
        // 1. Scanner finds vulnerability
        var scanEvidence = CreateScannerEvidence(vulnerabilitySubject);
        await _store.StoreAsync(scanEvidence);
        // 2. VEX assessment received
        var vexEvidence = CreateVexEvidence(vulnerabilitySubject, referencedEvidenceId: scanEvidence.EvidenceId);
        await _store.StoreAsync(vexEvidence);
        // 3. Policy engine makes decision
        var policyEvidence = CreatePolicyEvidence(vulnerabilitySubject, referencedEvidenceId: vexEvidence.EvidenceId);
        await _store.StoreAsync(policyEvidence);
        // Assert - Chain is queryable
        var allEvidence = await _store.GetBySubjectAsync(vulnerabilitySubject);
        allEvidence.Should().HaveCount(3);
        // Verify order by type represents the chain
        var scan = allEvidence.First(e => e.EvidenceType == EvidenceType.Scan);
        var vex = allEvidence.First(e => e.EvidenceType == EvidenceType.Vex);
        var policy = allEvidence.First(e => e.EvidenceType == EvidenceType.Policy);
        scan.Should().NotBeNull();
        vex.Should().NotBeNull();
        policy.Should().NotBeNull();
        _output.WriteLine($"Evidence chain: Scan({scan.EvidenceId}) → VEX({vex.EvidenceId}) → Policy({policy.EvidenceId})");
    }

    /// <summary>
    /// A policy decision may reference multiple upstream evidence records
    /// (reachability + EPSS) in its payload.
    /// </summary>
    [Fact]
    public async Task EvidenceChain_ReachabilityToEpssToPolicy_LinkedCorrectly()
    {
        // Scenario: Reachability analysis + EPSS score → Policy decision
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}";
        // 1. Reachability analysis
        var reachability = CreateReachabilityEvidence(subjectNodeId);
        await _store.StoreAsync(reachability);
        // 2. EPSS score
        var epss = CreateEpssEvidence(subjectNodeId);
        await _store.StoreAsync(epss);
        // 3. Policy decision based on both
        var policy = CreatePolicyEvidence(subjectNodeId, referencedEvidenceIds: new[]
        {
            reachability.EvidenceId,
            epss.EvidenceId
        });
        await _store.StoreAsync(policy);
        // Assert
        var allEvidence = await _store.GetBySubjectAsync(subjectNodeId);
        allEvidence.Should().HaveCount(3);
    }

    #endregion

    #region Multi-Tenant Evidence Isolation

    /// <summary>
    /// Evidence stored under two different tenants for the same subject digest
    /// must be invisible across tenant-scoped stores.
    /// </summary>
    [Fact]
    public async Task MultiTenant_SameSubject_IsolatedByTenant()
    {
        // Arrange - Two tenants with evidence for the same subject
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}";
        var tenantA = Guid.NewGuid().ToString();
        var tenantB = Guid.NewGuid().ToString();
        var storeA = _fixture.CreateStore(tenantA);
        var storeB = _fixture.CreateStore(tenantB);
        var evidenceA = CreateScannerEvidence(subjectNodeId);
        var evidenceB = CreateScannerEvidence(subjectNodeId);
        // Act - Store in different tenant stores
        await storeA.StoreAsync(evidenceA);
        await storeB.StoreAsync(evidenceB);
        // Assert - Each tenant only sees their own evidence
        var retrievedA = await storeA.GetBySubjectAsync(subjectNodeId);
        var retrievedB = await storeB.GetBySubjectAsync(subjectNodeId);
        retrievedA.Should().HaveCount(1);
        retrievedA[0].EvidenceId.Should().Be(evidenceA.EvidenceId);
        retrievedB.Should().HaveCount(1);
        retrievedB[0].EvidenceId.Should().Be(evidenceB.EvidenceId);
        _output.WriteLine($"Tenant A evidence: {evidenceA.EvidenceId}");
        _output.WriteLine($"Tenant B evidence: {evidenceB.EvidenceId}");
    }

    #endregion

    #region Evidence Graph Queries

    /// <summary>
    /// A subject carrying one record of each of eight distinct evidence types
    /// is returned completely, with all eight types distinct.
    /// </summary>
    [Fact]
    public async Task EvidenceGraph_AllTypesForArtifact_ReturnsComplete()
    {
        // Arrange - Simulate a complete evidence graph for a container artifact
        var artifactDigest = $"sha256:{Guid.NewGuid():N}";
        var evidenceRecords = new[]
        {
            CreateArtifactEvidence(artifactDigest), // SBOM entry
            CreateScannerEvidence(artifactDigest), // Vulnerability scan
            CreateReachabilityEvidence(artifactDigest), // Reachability analysis
            CreateEpssEvidence(artifactDigest), // EPSS score
            CreateVexEvidence(artifactDigest), // VEX statement
            CreatePolicyEvidence(artifactDigest), // Policy decision
            CreateProvenanceEvidence(artifactDigest), // Build provenance
            CreateExceptionEvidence(artifactDigest) // Exception applied
        };
        foreach (var record in evidenceRecords)
        {
            await _store.StoreAsync(record);
        }
        // Act - Query all evidence types
        var allEvidence = await _store.GetBySubjectAsync(artifactDigest);
        // Assert - Complete evidence graph
        allEvidence.Should().HaveCount(8);
        allEvidence.Select(e => e.EvidenceType).Distinct().Should().HaveCount(8);
        // Log evidence graph
        foreach (var evidence in allEvidence)
        {
            _output.WriteLine($" {evidence.EvidenceType}: {evidence.EvidenceId}");
        }
    }

    /// <summary>
    /// ExistsAsync answers per (subject, type): true only for stored types.
    /// </summary>
    [Fact]
    public async Task EvidenceGraph_ExistsCheck_ForAllTypes()
    {
        // Arrange
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}";
        await _store.StoreAsync(CreateScannerEvidence(subjectNodeId));
        await _store.StoreAsync(CreateReachabilityEvidence(subjectNodeId));
        // Note: No Policy evidence
        // Act & Assert
        (await _store.ExistsAsync(subjectNodeId, EvidenceType.Scan)).Should().BeTrue();
        (await _store.ExistsAsync(subjectNodeId, EvidenceType.Reachability)).Should().BeTrue();
        (await _store.ExistsAsync(subjectNodeId, EvidenceType.Policy)).Should().BeFalse();
        (await _store.ExistsAsync(subjectNodeId, EvidenceType.Vex)).Should().BeFalse();
    }

    #endregion

    #region Cross-Module Evidence Correlation

    /// <summary>
    /// A shared correlation ID passed into each helper must survive the
    /// store round-trip on every record's provenance.
    /// </summary>
    [Fact]
    public async Task Correlation_SameCorrelationId_FindsRelatedEvidence()
    {
        // Arrange - Evidence from different modules with same correlation ID
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}";
        var correlationId = Guid.NewGuid().ToString();
        var scanEvidence = CreateScannerEvidence(subjectNodeId, correlationId: correlationId);
        var reachEvidence = CreateReachabilityEvidence(subjectNodeId, correlationId: correlationId);
        var policyEvidence = CreatePolicyEvidence(subjectNodeId, correlationId: correlationId);
        await _store.StoreAsync(scanEvidence);
        await _store.StoreAsync(reachEvidence);
        await _store.StoreAsync(policyEvidence);
        // Act - Get all evidence for subject
        var allEvidence = await _store.GetBySubjectAsync(subjectNodeId);
        // Assert - All have same correlation ID
        allEvidence.Should().HaveCount(3);
        allEvidence.Should().OnlyContain(e => e.Provenance.CorrelationId == correlationId);
    }

    /// <summary>
    /// Records of the same evidence type from different generators (trivy,
    /// grype, snyk) coexist and preserve their GeneratorId.
    /// </summary>
    [Fact]
    public async Task Generators_MultiplePerSubject_AllPreserved()
    {
        // Arrange - Evidence from different generators
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}";
        var trivyEvidence = CreateScannerEvidence(subjectNodeId, generator: "stellaops/scanner/trivy");
        var grypeEvidence = CreateScannerEvidence(subjectNodeId, generator: "stellaops/scanner/grype");
        var snykEvidence = CreateScannerEvidence(subjectNodeId, generator: "vendor/snyk");
        await _store.StoreAsync(trivyEvidence);
        await _store.StoreAsync(grypeEvidence);
        await _store.StoreAsync(snykEvidence);
        // Act
        var scanEvidence = await _store.GetBySubjectAsync(subjectNodeId, EvidenceType.Scan);
        // Assert
        scanEvidence.Should().HaveCount(3);
        scanEvidence.Select(e => e.Provenance.GeneratorId).Should()
            .Contain(new[] { "stellaops/scanner/trivy", "stellaops/scanner/grype", "vendor/snyk" });
    }

    #endregion

    #region Evidence Count and Statistics

    /// <summary>
    /// CountBySubjectAsync reflects the number of stored records regardless of type.
    /// </summary>
    [Fact]
    public async Task CountBySubject_AfterMultiModuleInserts_ReturnsCorrectCount()
    {
        // Arrange
        var subjectNodeId = $"sha256:{Guid.NewGuid():N}";
        await _store.StoreAsync(CreateScannerEvidence(subjectNodeId));
        await _store.StoreAsync(CreateReachabilityEvidence(subjectNodeId));
        await _store.StoreAsync(CreatePolicyEvidence(subjectNodeId));
        // Act
        var count = await _store.CountBySubjectAsync(subjectNodeId);
        // Assert
        count.Should().Be(3);
    }

    /// <summary>
    /// GetByTypeAsync spans subjects: all Scan records are returned, the
    /// Reachability record is not.
    /// </summary>
    [Fact]
    public async Task GetByType_AcrossSubjects_ReturnsAll()
    {
        // Arrange - Multiple subjects with same evidence type
        var subject1 = $"sha256:{Guid.NewGuid():N}";
        var subject2 = $"sha256:{Guid.NewGuid():N}";
        var subject3 = $"sha256:{Guid.NewGuid():N}";
        await _store.StoreAsync(CreateScannerEvidence(subject1));
        await _store.StoreAsync(CreateScannerEvidence(subject2));
        await _store.StoreAsync(CreateScannerEvidence(subject3));
        await _store.StoreAsync(CreateReachabilityEvidence(subject1)); // Different type
        // Act
        var scanEvidence = await _store.GetByTypeAsync(EvidenceType.Scan);
        // Assert
        scanEvidence.Should().HaveCount(3);
        scanEvidence.Select(e => e.SubjectNodeId).Should()
            .Contain(new[] { subject1, subject2, subject3 });
    }

    #endregion

    #region Helpers

    /// <summary>
    /// Builds a Scan evidence record with a random CVE payload. The random
    /// payload/correlation ID make each call produce a distinct EvidenceId.
    /// </summary>
    private static EvidenceRecord CreateScannerEvidence(
        string subjectNodeId,
        string? correlationId = null,
        string generator = "stellaops/scanner/trivy")
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            cve = $"CVE-2024-{Random.Shared.Next(1000, 9999)}",
            severity = "HIGH",
            affectedPackage = "example-lib@1.0.0"
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = generator,
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow,
            CorrelationId = correlationId ?? Guid.NewGuid().ToString()
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Scan, payload, provenance, "1.0.0");
    }

    /// <summary>Builds a Reachability evidence record with a fixed sample payload.</summary>
    private static EvidenceRecord CreateReachabilityEvidence(
        string subjectNodeId,
        string? correlationId = null)
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            reachable = true,
            confidence = 0.95,
            paths = new[] { "main.go:42", "handler.go:128" }
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = "stellaops/scanner/reachability",
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow,
            CorrelationId = correlationId ?? Guid.NewGuid().ToString()
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Reachability, payload, provenance, "1.0.0");
    }

    /// <summary>
    /// Builds a Policy evidence record. Referenced evidence IDs (single or
    /// multiple) are embedded in the JSON payload only; the single-id
    /// parameter is ignored when the array parameter is supplied.
    /// </summary>
    private static EvidenceRecord CreatePolicyEvidence(
        string subjectNodeId,
        string? referencedEvidenceId = null,
        string[]? referencedEvidenceIds = null,
        string? correlationId = null)
    {
        var refs = referencedEvidenceIds ?? (referencedEvidenceId is not null ? new[] { referencedEvidenceId } : null);
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            ruleId = "vuln-severity-block",
            verdict = "BLOCK",
            referencedEvidence = refs
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = "stellaops/policy/opa",
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow,
            CorrelationId = correlationId ?? Guid.NewGuid().ToString()
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Policy, payload, provenance, "1.0.0");
    }

    /// <summary>Builds a VEX evidence record; the referenced scan ID lives in the payload.</summary>
    private static EvidenceRecord CreateVexEvidence(
        string subjectNodeId,
        string? referencedEvidenceId = null)
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            status = "not_affected",
            justification = "vulnerable_code_not_in_execute_path",
            referencedEvidence = referencedEvidenceId
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = "stellaops/excititor/vex",
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Vex, payload, provenance, "1.0.0");
    }

    /// <summary>Builds an EPSS-score evidence record with fixed sample values.</summary>
    private static EvidenceRecord CreateEpssEvidence(string subjectNodeId)
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            score = 0.0342,
            percentile = 0.89,
            modelDate = "2024-12-25"
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = "stellaops/scanner/epss",
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Epss, payload, provenance, "1.0.0");
    }

    /// <summary>Builds a build-provenance evidence record (Attestor module).</summary>
    private static EvidenceRecord CreateProvenanceEvidence(string subjectNodeId)
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            buildId = Guid.NewGuid().ToString(),
            builder = "github-actions",
            inputs = new[] { "go.mod", "main.go" }
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = "stellaops/attestor/provenance",
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Provenance, payload, provenance, "1.0.0");
    }

    /// <summary>Builds an SBOM artifact evidence record echoing the subject digest.</summary>
    private static EvidenceRecord CreateArtifactEvidence(string subjectNodeId)
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            purl = "pkg:golang/example.com/mylib@1.0.0",
            digest = subjectNodeId,
            sbomFormat = "SPDX-3.0.1"
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = "stellaops/scanner/sbom",
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Artifact, payload, provenance, "1.0.0");
    }

    /// <summary>Builds a policy-exception evidence record with a 90-day expiry.</summary>
    private static EvidenceRecord CreateExceptionEvidence(string subjectNodeId)
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new
        {
            exceptionId = Guid.NewGuid().ToString(),
            reason = "Risk accepted per security review",
            expiry = DateTimeOffset.UtcNow.AddDays(90)
        });
        var provenance = new EvidenceProvenance
        {
            GeneratorId = "stellaops/policy/exceptions",
            GeneratorVersion = "1.0.0",
            GeneratedAt = DateTimeOffset.UtcNow
        };
        return EvidenceRecord.Create(subjectNodeId, EvidenceType.Exception, payload, provenance, "1.0.0");
    }

    #endregion
}

View File

@@ -0,0 +1,185 @@
// -----------------------------------------------------------------------------
// EvidencePostgresContainerFixture.cs
// Sprint: SPRINT_8100_0012_0002 - Unified Evidence Model
// Task: EVID-8100-017 - PostgreSQL store integration tests
// Description: Collection fixture providing a shared PostgreSQL container for Evidence storage tests
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Infrastructure.Postgres.Options;
using StellaOps.Infrastructure.Postgres.Testing;
using Testcontainers.PostgreSql;
using Xunit.Sdk;
namespace StellaOps.Evidence.Storage.Postgres.Tests.Fixtures;
/// <summary>
/// Collection fixture that provides a shared PostgreSQL container for Evidence storage integration tests.
/// Uses Testcontainers to spin up a PostgreSQL instance with the evidence schema.
/// </summary>
public sealed class EvidencePostgresContainerFixture : IAsyncLifetime, IAsyncDisposable
{
    private PostgreSqlContainer? _container;
    private PostgresFixture? _fixture;
    private bool _disposed;

    /// <summary>
    /// Gets whether the container is running.
    /// FIX: previously this stayed <c>true</c> after disposal because the
    /// container field was only cleared on a failed startup; it now also
    /// reports <c>false</c> once the fixture has been disposed.
    /// </summary>
    public bool IsRunning => !_disposed && _container is not null;

    /// <summary>
    /// Gets the connection string for the PostgreSQL container.
    /// </summary>
    /// <exception cref="InvalidOperationException">Container not started (or already disposed).</exception>
    public string ConnectionString => _container?.GetConnectionString()
        ?? throw new InvalidOperationException("Container not started");

    /// <summary>
    /// Gets the PostgreSQL fixture for test schema management.
    /// </summary>
    /// <exception cref="InvalidOperationException">Fixture not initialized (or already disposed).</exception>
    public PostgresFixture Fixture => _fixture
        ?? throw new InvalidOperationException("Fixture not initialized");

    /// <summary>
    /// Creates PostgreSQL options configured for the test container.
    /// AutoMigrate is off because migrations are applied explicitly in <see cref="InitializeAsync"/>.
    /// </summary>
    public PostgresOptions CreateOptions()
    {
        return new PostgresOptions
        {
            ConnectionString = ConnectionString,
            SchemaName = EvidenceDataSource.DefaultSchemaName,
            CommandTimeoutSeconds = 30,
            AutoMigrate = false
        };
    }

    /// <summary>
    /// Creates an EvidenceDataSource for tests.
    /// </summary>
    public EvidenceDataSource CreateDataSource()
    {
        var options = Options.Create(CreateOptions());
        return new EvidenceDataSource(options, NullLogger<EvidenceDataSource>.Instance);
    }

    /// <summary>
    /// Creates a PostgresEvidenceStore scoped to the specified tenant.
    /// </summary>
    public PostgresEvidenceStore CreateStore(string tenantId)
    {
        var dataSource = CreateDataSource();
        return new PostgresEvidenceStore(
            dataSource,
            tenantId,
            NullLogger<PostgresEvidenceStore>.Instance);
    }

    /// <summary>
    /// Creates a PostgresEvidenceStoreFactory for tests.
    /// </summary>
    public PostgresEvidenceStoreFactory CreateStoreFactory()
    {
        var dataSource = CreateDataSource();
        return new PostgresEvidenceStoreFactory(dataSource, NullLoggerFactory.Instance);
    }

    /// <summary>
    /// Starts the container, creates the schema fixture, and applies the
    /// Evidence migrations. If anything fails (typically: no Docker), partially
    /// created resources are torn down and the whole collection is skipped via
    /// <see cref="SkipException"/> rather than failed.
    /// </summary>
    public async Task InitializeAsync()
    {
        try
        {
            _container = new PostgreSqlBuilder()
                .WithImage("postgres:16-alpine")
                .WithDatabase("stellaops_test")
                .WithUsername("test")
                .WithPassword("test")
                .Build();
            await _container.StartAsync();
            // Create fixture for schema management
            _fixture = PostgresFixtureFactory.CreateRandom(ConnectionString);
            await _fixture.InitializeAsync();
            // Run evidence schema migrations
            await _fixture.RunMigrationsFromAssemblyAsync<EvidenceDataSource>(
                "Evidence",
                resourcePrefix: null);
        }
        catch (Exception ex)
        {
            try
            {
                if (_fixture is not null)
                {
                    await _fixture.DisposeAsync();
                }
                if (_container is not null)
                {
                    await _container.DisposeAsync();
                }
            }
            catch
            {
                // Ignore cleanup failures during skip.
            }
            _container = null;
            _fixture = null;
            throw SkipException.ForSkip(
                $"Evidence PostgreSQL integration tests require Docker/Testcontainers. Skipping because the container failed to start: {ex.Message}");
        }
    }

    /// <inheritdoc />
    public async Task DisposeAsync()
    {
        await DisposeAsyncCore();
    }

    async ValueTask IAsyncDisposable.DisposeAsync()
    {
        await DisposeAsyncCore();
        GC.SuppressFinalize(this);
    }

    // Idempotent teardown shared by both dispose paths. Disposes the schema
    // fixture before the container it depends on, then clears the fields so
    // IsRunning/ConnectionString cannot report a dead container.
    private async Task DisposeAsyncCore()
    {
        if (_disposed) return;
        _disposed = true;
        if (_fixture is not null)
        {
            await _fixture.DisposeAsync();
        }
        if (_container is not null)
        {
            await _container.StopAsync();
            await _container.DisposeAsync();
        }
        // FIX: clear references so post-dispose state is observable and the
        // disposed container/fixture cannot be reached through the properties.
        _fixture = null;
        _container = null;
    }

    /// <summary>
    /// Truncates all tables for test isolation. No-op when the fixture was
    /// never initialized (e.g. tests were skipped).
    /// </summary>
    public async Task TruncateAllTablesAsync()
    {
        if (_fixture is null) return;
        await _fixture.TruncateAllTablesAsync();
    }
}
/// <summary>
/// Collection definition for Evidence PostgreSQL integration tests.
/// All tests in this collection share a single PostgreSQL container.
/// </summary>
[CollectionDefinition(Name)]
public sealed class EvidencePostgresTestCollection : ICollectionFixture<EvidencePostgresContainerFixture>
{
    /// <summary>Collection name referenced by test classes via [Collection(Name)].</summary>
    public const string Name = "Evidence PostgreSQL Integration Tests";
}

View File

@@ -0,0 +1,530 @@
// -----------------------------------------------------------------------------
// PostgresEvidenceStoreIntegrationTests.cs
// Sprint: SPRINT_8100_0012_0002 - Unified Evidence Model
// Task: EVID-8100-017 - PostgreSQL store CRUD integration tests
// Description: Integration tests verifying PostgresEvidenceStore CRUD operations
// -----------------------------------------------------------------------------
using System.Text;
using FluentAssertions;
using StellaOps.Evidence.Core;
using StellaOps.Evidence.Storage.Postgres.Tests.Fixtures;
using Xunit;
using Xunit.Abstractions;
namespace StellaOps.Evidence.Storage.Postgres.Tests;
/// <summary>
/// Integration tests for PostgresEvidenceStore CRUD operations.
/// Tests run against a real PostgreSQL container via Testcontainers.
/// </summary>
[Collection(EvidencePostgresTestCollection.Name)]
public sealed class PostgresEvidenceStoreIntegrationTests : IAsyncLifetime
{
private readonly EvidencePostgresContainerFixture _fixture;
private readonly ITestOutputHelper _output;
private readonly string _tenantId = Guid.NewGuid().ToString();
private PostgresEvidenceStore _store = null!;
public PostgresEvidenceStoreIntegrationTests(
EvidencePostgresContainerFixture fixture,
ITestOutputHelper output)
{
_fixture = fixture;
_output = output;
}
public Task InitializeAsync()
{
_store = _fixture.CreateStore(_tenantId);
return Task.CompletedTask;
}
public async Task DisposeAsync()
{
await _fixture.TruncateAllTablesAsync();
}
#region Store Tests
[Fact]
public async Task StoreAsync_NewEvidence_ReturnsEvidenceId()
{
// Arrange
var evidence = CreateTestEvidence();
// Act
var storedId = await _store.StoreAsync(evidence);
// Assert
storedId.Should().Be(evidence.EvidenceId);
_output.WriteLine($"Stored evidence: {storedId}");
}
[Fact]
public async Task StoreAsync_DuplicateEvidence_IsIdempotent()
{
// Arrange
var evidence = CreateTestEvidence();
// Act - Store twice
var firstId = await _store.StoreAsync(evidence);
var secondId = await _store.StoreAsync(evidence);
// Assert - Both return same ID, no error
firstId.Should().Be(evidence.EvidenceId);
secondId.Should().Be(evidence.EvidenceId);
// Verify only one record exists
var count = await _store.CountBySubjectAsync(evidence.SubjectNodeId);
count.Should().Be(1);
}
[Fact]
public async Task StoreBatchAsync_MultipleRecords_StoresAllSuccessfully()
{
// Arrange
var subjectId = $"sha256:{Guid.NewGuid():N}";
var records = Enumerable.Range(1, 5)
.Select(i => CreateTestEvidence(subjectId, (EvidenceType)(i % 5 + 1)))
.ToList();
// Act
var storedCount = await _store.StoreBatchAsync(records);
// Assert
storedCount.Should().Be(5);
var count = await _store.CountBySubjectAsync(subjectId);
count.Should().Be(5);
}
[Fact]
public async Task StoreBatchAsync_WithDuplicates_StoresOnlyUnique()
{
// Arrange
var evidence = CreateTestEvidence();
var records = new[] { evidence, evidence, evidence };
// Act
var storedCount = await _store.StoreBatchAsync(records);
// Assert - Only one should be stored
storedCount.Should().Be(1);
}
#endregion
#region GetById Tests
[Fact]
public async Task GetByIdAsync_ExistingEvidence_ReturnsEvidence()
{
// Arrange
var evidence = CreateTestEvidence();
await _store.StoreAsync(evidence);
// Act
var retrieved = await _store.GetByIdAsync(evidence.EvidenceId);
// Assert
retrieved.Should().NotBeNull();
retrieved!.EvidenceId.Should().Be(evidence.EvidenceId);
retrieved.SubjectNodeId.Should().Be(evidence.SubjectNodeId);
retrieved.EvidenceType.Should().Be(evidence.EvidenceType);
retrieved.PayloadSchemaVersion.Should().Be(evidence.PayloadSchemaVersion);
retrieved.Payload.ToArray().Should().BeEquivalentTo(evidence.Payload.ToArray());
retrieved.Provenance.GeneratorId.Should().Be(evidence.Provenance.GeneratorId);
}
[Fact]
public async Task GetByIdAsync_NonExistingEvidence_ReturnsNull()
{
// Arrange
var nonExistentId = $"sha256:{Guid.NewGuid():N}";
// Act
var retrieved = await _store.GetByIdAsync(nonExistentId);
// Assert
retrieved.Should().BeNull();
}
[Fact]
public async Task GetByIdAsync_WithSignatures_PreservesSignatures()
{
// Arrange
var evidence = CreateTestEvidenceWithSignatures();
await _store.StoreAsync(evidence);
// Act
var retrieved = await _store.GetByIdAsync(evidence.EvidenceId);
// Assert
retrieved.Should().NotBeNull();
retrieved!.Signatures.Should().HaveCount(2);
retrieved.Signatures[0].SignerId.Should().Be("signer-1");
retrieved.Signatures[1].SignerId.Should().Be("signer-2");
}
#endregion
#region GetBySubject Tests
[Fact]
public async Task GetBySubjectAsync_MultipleEvidence_ReturnsAll()
{
// Arrange
var subjectId = $"sha256:{Guid.NewGuid():N}";
var records = new[]
{
CreateTestEvidence(subjectId, EvidenceType.Scan),
CreateTestEvidence(subjectId, EvidenceType.Reachability),
CreateTestEvidence(subjectId, EvidenceType.Policy)
};
foreach (var record in records)
{
await _store.StoreAsync(record);
}
// Act
var retrieved = await _store.GetBySubjectAsync(subjectId);
// Assert
retrieved.Should().HaveCount(3);
retrieved.Select(e => e.EvidenceType).Should()
.Contain(new[] { EvidenceType.Scan, EvidenceType.Reachability, EvidenceType.Policy });
}
[Fact]
public async Task GetBySubjectAsync_WithTypeFilter_ReturnsFiltered()
{
// Arrange
var subjectId = $"sha256:{Guid.NewGuid():N}";
await _store.StoreAsync(CreateTestEvidence(subjectId, EvidenceType.Scan));
await _store.StoreAsync(CreateTestEvidence(subjectId, EvidenceType.Reachability));
await _store.StoreAsync(CreateTestEvidence(subjectId, EvidenceType.Policy));
// Act
var retrieved = await _store.GetBySubjectAsync(subjectId, EvidenceType.Scan);
// Assert
retrieved.Should().HaveCount(1);
retrieved[0].EvidenceType.Should().Be(EvidenceType.Scan);
}
[Fact]
public async Task GetBySubjectAsync_NoEvidence_ReturnsEmptyList()
{
// Arrange
var nonExistentSubject = $"sha256:{Guid.NewGuid():N}";
// Act
var retrieved = await _store.GetBySubjectAsync(nonExistentSubject);
// Assert
retrieved.Should().BeEmpty();
}
#endregion
#region GetByType Tests
[Fact]
public async Task GetByTypeAsync_MultipleEvidence_ReturnsMatchingType()
{
// Arrange
await _store.StoreAsync(CreateTestEvidence(evidenceType: EvidenceType.Scan));
await _store.StoreAsync(CreateTestEvidence(evidenceType: EvidenceType.Scan));
await _store.StoreAsync(CreateTestEvidence(evidenceType: EvidenceType.Reachability));
// Act
var retrieved = await _store.GetByTypeAsync(EvidenceType.Scan);
// Assert
retrieved.Should().HaveCount(2);
retrieved.Should().OnlyContain(e => e.EvidenceType == EvidenceType.Scan);
}
[Fact]
public async Task GetByTypeAsync_WithLimit_RespectsLimit()
{
// Arrange
for (int i = 0; i < 10; i++)
{
await _store.StoreAsync(CreateTestEvidence(evidenceType: EvidenceType.Vex));
}
// Act
var retrieved = await _store.GetByTypeAsync(EvidenceType.Vex, limit: 5);
// Assert
retrieved.Should().HaveCount(5);
}
#endregion
#region Exists Tests
[Fact]
public async Task ExistsAsync_ExistingEvidence_ReturnsTrue()
{
// Arrange
var evidence = CreateTestEvidence();
await _store.StoreAsync(evidence);
// Act
var exists = await _store.ExistsAsync(evidence.SubjectNodeId, evidence.EvidenceType);
// Assert
exists.Should().BeTrue();
}
[Fact]
public async Task ExistsAsync_NonExistingEvidence_ReturnsFalse()
{
// Arrange
var evidence = CreateTestEvidence();
await _store.StoreAsync(evidence);
// Act - Check for different type
var exists = await _store.ExistsAsync(evidence.SubjectNodeId, EvidenceType.License);
// Assert
exists.Should().BeFalse();
}
[Fact]
public async Task ExistsAsync_NonExistingSubject_ReturnsFalse()
{
// Arrange
var nonExistentSubject = $"sha256:{Guid.NewGuid():N}";
// Act
var exists = await _store.ExistsAsync(nonExistentSubject, EvidenceType.Scan);
// Assert
exists.Should().BeFalse();
}
#endregion
#region Delete Tests
[Fact]
public async Task DeleteAsync_ExistingEvidence_ReturnsTrue()
{
// Arrange
var evidence = CreateTestEvidence();
await _store.StoreAsync(evidence);
// Act
var deleted = await _store.DeleteAsync(evidence.EvidenceId);
// Assert
deleted.Should().BeTrue();
// Verify deletion
var retrieved = await _store.GetByIdAsync(evidence.EvidenceId);
retrieved.Should().BeNull();
}
[Fact]
public async Task DeleteAsync_NonExistingEvidence_ReturnsFalse()
{
// Arrange
var nonExistentId = $"sha256:{Guid.NewGuid():N}";
// Act
var deleted = await _store.DeleteAsync(nonExistentId);
// Assert
deleted.Should().BeFalse();
}
#endregion
#region Count Tests
[Fact]
public async Task CountBySubjectAsync_MultipleEvidence_ReturnsCorrectCount()
{
// Arrange
var subjectId = $"sha256:{Guid.NewGuid():N}";
await _store.StoreAsync(CreateTestEvidence(subjectId, EvidenceType.Scan));
await _store.StoreAsync(CreateTestEvidence(subjectId, EvidenceType.Reachability));
await _store.StoreAsync(CreateTestEvidence(subjectId, EvidenceType.Policy));
// Act
var count = await _store.CountBySubjectAsync(subjectId);
// Assert
count.Should().Be(3);
}
[Fact]
public async Task CountBySubjectAsync_NoEvidence_ReturnsZero()
{
// Arrange
var nonExistentSubject = $"sha256:{Guid.NewGuid():N}";
// Act
var count = await _store.CountBySubjectAsync(nonExistentSubject);
// Assert
count.Should().Be(0);
}
#endregion
#region Integrity Tests
[Fact]
public async Task RoundTrip_EvidenceRecord_PreservesIntegrity()
{
// Arrange
var evidence = CreateTestEvidence();
await _store.StoreAsync(evidence);
// Act
var retrieved = await _store.GetByIdAsync(evidence.EvidenceId) as EvidenceRecord;
// Assert
retrieved.Should().NotBeNull();
retrieved!.VerifyIntegrity().Should().BeTrue("evidence ID should match computed hash");
}
[Fact]
public async Task RoundTrip_BinaryPayload_PreservesData()
{
// Arrange
var binaryPayload = new byte[] { 0x00, 0x01, 0x02, 0xFF, 0xFE, 0xFD };
var provenance = EvidenceProvenance.CreateMinimal("test/binary", "1.0.0");
var evidence = EvidenceRecord.Create(
$"sha256:{Guid.NewGuid():N}",
EvidenceType.Artifact,
binaryPayload,
provenance,
"1.0.0");
await _store.StoreAsync(evidence);
// Act
var retrieved = await _store.GetByIdAsync(evidence.EvidenceId);
// Assert
retrieved.Should().NotBeNull();
retrieved!.Payload.ToArray().Should().BeEquivalentTo(binaryPayload);
}
[Fact]
public async Task RoundTrip_UnicodePayload_PreservesData()
{
// Arrange
var unicodeJson = "{\"message\": \"Hello 世界 🌍 مرحبا\"}";
var payload = Encoding.UTF8.GetBytes(unicodeJson);
var provenance = EvidenceProvenance.CreateMinimal("test/unicode", "1.0.0");
var evidence = EvidenceRecord.Create(
$"sha256:{Guid.NewGuid():N}",
EvidenceType.Custom,
payload,
provenance,
"1.0.0");
await _store.StoreAsync(evidence);
// Act
var retrieved = await _store.GetByIdAsync(evidence.EvidenceId);
// Assert
retrieved.Should().NotBeNull();
var retrievedJson = Encoding.UTF8.GetString(retrieved!.Payload.Span);
retrievedJson.Should().Be(unicodeJson);
}
#endregion
#region Factory Tests
[Fact]
public void Factory_CreateStore_ReturnsTenantScopedStore()
{
    // Arrange - two distinct tenants sharing a single factory.
    var factory = _fixture.CreateStoreFactory();
    var firstTenant = Guid.NewGuid().ToString();
    var secondTenant = Guid.NewGuid().ToString();

    // Act
    var firstStore = factory.Create(firstTenant);
    var secondStore = factory.Create(secondTenant);

    // Assert - each tenant receives its own store instance.
    firstStore.Should().NotBeNull();
    secondStore.Should().NotBeNull();
    firstStore.Should().NotBeSameAs(secondStore);
}
#endregion
#region Helpers
/// <summary>
/// Builds a minimal evidence record for tests, optionally pinned to a specific
/// subject node and evidence type. Subject and payload are randomized per call
/// so records never collide across tests.
/// </summary>
private static EvidenceRecord CreateTestEvidence(
    string? subjectNodeId = null,
    EvidenceType evidenceType = EvidenceType.Scan)
{
    var subjectId = subjectNodeId ?? $"sha256:{Guid.NewGuid():N}";
    var jsonPayload = Encoding.UTF8.GetBytes($"{{\"test\": \"{Guid.NewGuid()}\"}}");
    var testProvenance = new EvidenceProvenance
    {
        GeneratorId = "test/scanner",
        GeneratorVersion = "1.0.0",
        GeneratedAt = DateTimeOffset.UtcNow,
        CorrelationId = Guid.NewGuid().ToString(),
        Environment = "test"
    };

    return EvidenceRecord.Create(subjectId, evidenceType, jsonPayload, testProvenance, "1.0.0");
}
/// <summary>
/// Builds an evidence record carrying two signatures (an internal signer and a
/// CI signer) for signature round-trip tests.
/// </summary>
private static EvidenceRecord CreateTestEvidenceWithSignatures()
{
    // Local factory keeps the two signature literals compact and uniform.
    static EvidenceSignature MakeSignature(
        string signerId, string algorithm, byte[] rawSignature, SignerType signerType) => new()
    {
        SignerId = signerId,
        Algorithm = algorithm,
        SignatureBase64 = Convert.ToBase64String(rawSignature),
        SignedAt = DateTimeOffset.UtcNow,
        SignerType = signerType
    };

    var signatures = new List<EvidenceSignature>
    {
        MakeSignature("signer-1", "ES256", new byte[] { 1, 2, 3 }, SignerType.Internal),
        MakeSignature("signer-2", "RS256", new byte[] { 4, 5, 6 }, SignerType.CI)
    };

    return EvidenceRecord.Create(
        $"sha256:{Guid.NewGuid():N}",
        EvidenceType.Provenance,
        Encoding.UTF8.GetBytes("{\"signed\": true}"),
        EvidenceProvenance.CreateMinimal("test/signer", "1.0.0"),
        "1.0.0",
        signatures);
}
#endregion
}

View File

@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
  <!-- Test project for the Postgres-backed evidence store (Testcontainers-based). -->
  <PropertyGroup>
    <TargetFramework>net10.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <LangVersion>preview</LangVersion>
    <IsPackable>false</IsPackable>
    <IsTestProject>true</IsTestProject>
    <RootNamespace>StellaOps.Evidence.Storage.Postgres.Tests</RootNamespace>
  </PropertyGroup>
  <ItemGroup>
    <!-- NOTE(review): no xunit / Microsoft.NET.Test.Sdk references here although the
         sources use xunit — presumably supplied by a shared Directory.Build.props;
         confirm before building this project stand-alone. -->
    <PackageReference Include="FluentAssertions" Version="7.0.0" />
    <PackageReference Include="Testcontainers.PostgreSql" Version="4.1.0" />
  </ItemGroup>
  <ItemGroup>
    <ProjectReference Include="..\..\StellaOps.Evidence.Storage.Postgres\StellaOps.Evidence.Storage.Postgres.csproj" />
    <ProjectReference Include="..\..\StellaOps.Infrastructure.Postgres\StellaOps.Infrastructure.Postgres.csproj" />
  </ItemGroup>
</Project>

View File

@@ -0,0 +1,667 @@
// -----------------------------------------------------------------------------
// AtLeastOnceDeliveryTests.cs
// Sprint: SPRINT_5100_0010_0003 - Router + Messaging Test Implementation
// Task: MESSAGING-5100-009 - At-least-once delivery with consumer idempotency
// Description: Integration tests verifying at-least-once delivery semantics:
// - Messages are never lost (guaranteed delivery)
// - Consumer idempotency correctly handles duplicate deliveries
// - Lease expiration triggers redelivery
// - Simulated failures result in message redelivery
// -----------------------------------------------------------------------------
using FluentAssertions;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Messaging.Transport.Valkey.Tests.Fixtures;
using Xunit;
using Xunit.Abstractions;
namespace StellaOps.Messaging.Transport.Valkey.Tests;
/// <summary>
/// Tests for at-least-once delivery semantics with consumer idempotency.
///
/// At-least-once delivery guarantees:
/// 1. Every message sent is delivered at least once
/// 2. Messages may be delivered multiple times (redelivery on failure)
/// 3. Consumer idempotency handles duplicate deliveries
/// 4. No message is ever lost, even under failure conditions
/// </summary>
[Collection(ValkeyIntegrationTestCollection.Name)]
public sealed class AtLeastOnceDeliveryTests : IAsyncLifetime
{
private readonly ValkeyContainerFixture _fixture;
private readonly ITestOutputHelper _output;
private ValkeyConnectionFactory? _connectionFactory;
private ValkeyIdempotencyStore? _idempotencyStore;
/// <summary>Captures the shared Valkey container fixture and the per-test output sink.</summary>
public AtLeastOnceDeliveryTests(ValkeyContainerFixture fixture, ITestOutputHelper output)
{
    _fixture = fixture;
    _output = output;
}
/// <summary>
/// Per-test setup: creates a fresh connection factory and an idempotency store
/// scoped to a unique consumer id so tests cannot observe each other's keys.
/// </summary>
public Task InitializeAsync()
{
    _connectionFactory = _fixture.CreateConnectionFactory();
    _idempotencyStore = new ValkeyIdempotencyStore(
        _connectionFactory,
        $"test-consumer-{Guid.NewGuid():N}",
        null);
    return Task.CompletedTask;
}
/// <summary>
/// Per-test teardown: disposes the connection factory. The container itself is
/// owned by the collection fixture and outlives individual tests.
/// </summary>
public async Task DisposeAsync()
{
    if (_connectionFactory is not null)
    {
        await _connectionFactory.DisposeAsync();
    }
}
#region At-Least-Once Delivery Guarantee Tests
[ValkeyIntegrationFact]
public async Task AtLeastOnce_MessageSent_IsDeliveredAtLeastOnce()
{
    // Arrange - a producer enqueues a single message.
    var queue = CreateQueue<TestMessage>();
    var messageId = Guid.NewGuid();

    // Act - send the message, then lease it as a consumer would.
    var enqueueResult = await queue.EnqueueAsync(new TestMessage
    {
        Id = messageId,
        Content = "At-least-once test message"
    });
    enqueueResult.Success.Should().BeTrue("message should be accepted by the queue");

    var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });

    // Assert - exactly that message arrives, content intact.
    leases.Should().HaveCount(1, "message must be delivered at least once");
    var lease = leases[0];
    lease.Message.Id.Should().Be(messageId);
    lease.Message.Content.Should().Be("At-least-once test message");

    await lease.AcknowledgeAsync();
    _output.WriteLine($"Message {messageId} delivered successfully");
}
[ValkeyIntegrationFact]
public async Task AtLeastOnce_UnacknowledgedLease_MessageRedelivered()
{
    // Arrange - Create queue with short lease duration (200 ms) so the test can
    // observe lease expiry quickly.
    var queueOptions = _fixture.CreateQueueOptions();
    queueOptions.DefaultLeaseDuration = TimeSpan.FromMilliseconds(200);
    var queue = CreateQueue<TestMessage>(queueOptions);
    var messageId = Guid.NewGuid();
    await queue.EnqueueAsync(new TestMessage { Id = messageId, Content = "Redelivery test" });

    // Act - Lease message but don't acknowledge (simulating consumer crash)
    var firstLease = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
    firstLease.Should().HaveCount(1);
    firstLease[0].Message.Id.Should().Be(messageId);

    // Don't acknowledge - simulate crash
    _output.WriteLine("Simulating consumer crash (not acknowledging message)");

    // Wait for lease to expire (500 ms comfortably exceeds the 200 ms lease).
    await Task.Delay(500);

    // Act - Claim expired message (automatic redelivery)
    var redelivered = await queue.ClaimExpiredAsync(new ClaimRequest
    {
        BatchSize = 10,
        MinIdleTime = TimeSpan.FromMilliseconds(200),
        MinDeliveryAttempts = 1
    });

    // Assert - Message is redelivered with an incremented attempt counter.
    redelivered.Should().HaveCount(1, "message must be redelivered after lease expiration");
    redelivered[0].Message.Id.Should().Be(messageId);
    redelivered[0].Attempt.Should().BeGreaterThan(1, "this should be a redelivery");
    await redelivered[0].AcknowledgeAsync();
    _output.WriteLine($"Message {messageId} successfully redelivered on attempt {redelivered[0].Attempt}");
}
[ValkeyIntegrationFact]
public async Task AtLeastOnce_MultipleMessages_AllDelivered()
{
    // Arrange
    var queue = CreateQueue<TestMessage>();
    const int messageCount = 100;
    var sentIds = new HashSet<Guid>();

    // Act - Send multiple messages
    for (int i = 0; i < messageCount; i++)
    {
        var id = Guid.NewGuid();
        sentIds.Add(id);
        await queue.EnqueueAsync(new TestMessage { Id = id, Content = $"Message-{i}" });
    }

    // Act - Receive all messages
    var receivedIds = new HashSet<Guid>();
    int remaining = messageCount;
    while (remaining > 0)
    {
        var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 20 });
        // Guard against spinning forever: without this, an empty batch leaves
        // `remaining` unchanged and the loop never terminates. The bulk
        // end-to-end test already breaks on an empty batch; this keeps both
        // loops consistent, and the final assertion still catches any loss.
        if (leases.Count == 0) break;
        foreach (var lease in leases)
        {
            receivedIds.Add(lease.Message.Id);
            await lease.AcknowledgeAsync();
        }
        remaining -= leases.Count;
    }

    // Assert - All messages delivered
    receivedIds.Should().BeEquivalentTo(sentIds, "all sent messages must be delivered");
    _output.WriteLine($"All {messageCount} messages delivered successfully");
}
[ValkeyIntegrationFact]
public async Task AtLeastOnce_RetryAfterNack_MessageRedelivered()
{
    // Arrange - zero initial backoff so the retried message is available immediately.
    var queueOptions = _fixture.CreateQueueOptions();
    queueOptions.RetryInitialBackoff = TimeSpan.Zero; // Immediate retry for test speed
    var queue = CreateQueue<TestMessage>(queueOptions);
    var messageId = Guid.NewGuid();
    await queue.EnqueueAsync(new TestMessage { Id = messageId, Content = "Retry test" });

    // Act - First delivery, simulate processing failure with retry
    var firstLease = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
    firstLease.Should().HaveCount(1);
    firstLease[0].Attempt.Should().Be(1);

    // Nack for retry
    await firstLease[0].ReleaseAsync(ReleaseDisposition.Retry);
    _output.WriteLine("Message nacked for retry");

    // Brief delay for retry processing
    await Task.Delay(100);

    // Act - Second delivery after retry
    var secondLease = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });

    // Assert - Message is redelivered with the attempt counter bumped to 2.
    secondLease.Should().HaveCount(1, "message must be redelivered after nack");
    secondLease[0].Message.Id.Should().Be(messageId);
    secondLease[0].Attempt.Should().Be(2, "this should be attempt 2");
    await secondLease[0].AcknowledgeAsync();
    _output.WriteLine($"Message {messageId} successfully processed on attempt 2");
}
#endregion
#region Consumer Idempotency Tests
[ValkeyIntegrationFact]
public async Task ConsumerIdempotency_DuplicateProcessing_DetectedAndSkipped()
{
    // Arrange - Create a consumer with idempotency tracking
    var queue = CreateQueue<TestMessage>();
    var processedMessageIds = new HashSet<Guid>();
    var processingCount = new Dictionary<Guid, int>();
    var messageId = Guid.NewGuid();
    await queue.EnqueueAsync(new TestMessage { Id = messageId, Content = "Idempotency test" });

    // Act - Simulate receiving the message multiple times. Only the first
    // TryClaimAsync on the same key should report IsFirstClaim.
    for (int delivery = 1; delivery <= 3; delivery++)
    {
        // Simulate message delivery (could be redelivery)
        var idempotencyKey = $"consumer-process:{messageId}";
        var claimResult = await _idempotencyStore!.TryClaimAsync(
            idempotencyKey,
            messageId.ToString(),
            TimeSpan.FromMinutes(5));
        if (claimResult.IsFirstClaim)
        {
            // First time processing this message
            processedMessageIds.Add(messageId);
            processingCount[messageId] = 1;
            _output.WriteLine($"Delivery {delivery}: First processing of message {messageId}");
        }
        else
        {
            // Duplicate - skip processing
            processingCount[messageId] = processingCount.GetValueOrDefault(messageId) + 1;
            _output.WriteLine($"Delivery {delivery}: Duplicate detected, skipping message {messageId}");
        }
    }

    // Assert - Message processed exactly once despite multiple deliveries
    processedMessageIds.Should().HaveCount(1);
    processingCount[messageId].Should().BeGreaterThan(1, "we simulated multiple deliveries");

    // Cleanup - drain and acknowledge the queued message so it doesn't linger.
    var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
    if (leases.Count > 0)
    {
        await leases[0].AcknowledgeAsync();
    }
}
[ValkeyIntegrationFact]
public async Task ConsumerIdempotency_ConcurrentDuplicates_OnlyOneProcessed()
{
    // Arrange
    var messageId = Guid.NewGuid();
    var processedCount = 0;
    var duplicateCount = 0;
    // Counters are written under this lock because the claim tasks run concurrently.
    var lockObject = new object();

    // Simulate 10 concurrent consumers trying to process the same message.
    // Select is lazy; the tasks are materialized and awaited by Task.WhenAll below.
    var tasks = Enumerable.Range(1, 10).Select(async consumerId =>
    {
        var idempotencyKey = $"concurrent-test:{messageId}";
        var claimResult = await _idempotencyStore!.TryClaimAsync(
            idempotencyKey,
            $"consumer-{consumerId}",
            TimeSpan.FromMinutes(5));
        lock (lockObject)
        {
            if (claimResult.IsFirstClaim)
            {
                processedCount++;
                _output.WriteLine($"Consumer {consumerId}: Processing message (first claim)");
            }
            else
            {
                duplicateCount++;
                _output.WriteLine($"Consumer {consumerId}: Duplicate detected, existing value: {claimResult.ExistingValue}");
            }
        }
    });

    // Act
    await Task.WhenAll(tasks);

    // Assert - Exactly one consumer processed the message
    processedCount.Should().Be(1, "only one consumer should process the message");
    duplicateCount.Should().Be(9, "9 consumers should detect duplicate");
    _output.WriteLine($"Processed: {processedCount}, Duplicates: {duplicateCount}");
}
[ValkeyIntegrationFact]
public async Task ConsumerIdempotency_IdempotencyWindowExpires_ReprocessingAllowed()
{
    // Arrange - a deliberately tiny idempotency window (200 ms).
    var messageId = Guid.NewGuid();
    var shortWindow = TimeSpan.FromMilliseconds(200);
    var idempotencyKey = $"window-test:{messageId}";

    // Act - First claim
    var firstClaim = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "first-processor",
        shortWindow);
    firstClaim.IsFirstClaim.Should().BeTrue();
    _output.WriteLine("First claim successful");

    // Duplicate should be detected while the window is still open.
    var duplicateClaim = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "duplicate-processor",
        shortWindow);
    duplicateClaim.IsDuplicate.Should().BeTrue();
    _output.WriteLine("Duplicate detected as expected");

    // Wait for window to expire (500 ms comfortably exceeds 200 ms).
    await Task.Delay(500);

    // Act - After expiration, claim should succeed again
    var afterExpiration = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "new-processor",
        shortWindow);

    // Assert - Reprocessing allowed after window expiration
    afterExpiration.IsFirstClaim.Should().BeTrue(
        "after idempotency window expires, message can be reprocessed");
    _output.WriteLine("After window expiration, new claim succeeded");
}
[ValkeyIntegrationFact]
public async Task ConsumerIdempotency_DifferentMessages_IndependentProcessing()
{
    // Arrange - three unrelated messages, each claiming its own idempotency key.
    var messageIds = Enumerable.Range(1, 3).Select(_ => Guid.NewGuid()).ToList();
    var processedIds = new List<Guid>();

    // Act - a first-time delivery of each message should win its own claim.
    foreach (var messageId in messageIds)
    {
        var claimResult = await _idempotencyStore!.TryClaimAsync(
            $"different-msg-test:{messageId}",
            messageId.ToString(),
            TimeSpan.FromMinutes(5));

        if (claimResult.IsFirstClaim)
        {
            processedIds.Add(messageId);
        }
    }

    // Assert - no cross-talk between keys: every message was processed.
    processedIds.Should().BeEquivalentTo(messageIds);
    _output.WriteLine($"All {messageIds.Count} different messages processed independently");
}
#endregion
#region End-to-End At-Least-Once with Idempotency Tests
[ValkeyIntegrationFact]
public async Task EndToEnd_AtLeastOnceWithIdempotency_NoDuplicateProcessing()
{
    // Arrange - short lease (200 ms) so the simulated crash triggers fast redelivery.
    var queueOptions = _fixture.CreateQueueOptions();
    queueOptions.DefaultLeaseDuration = TimeSpan.FromMilliseconds(200);
    var queue = CreateQueue<TestMessage>(queueOptions);
    var messageId = Guid.NewGuid();
    var processedIds = new HashSet<Guid>();
    var deliveryCount = 0;
    await queue.EnqueueAsync(new TestMessage { Id = messageId, Content = "E2E test" });

    // Act - Consumer with idempotency-aware processing
    // Simulate: first delivery - lease but crash, second delivery - process successfully

    // First delivery (crash simulation - don't ack)
    var firstLease = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
    firstLease.Should().HaveCount(1);
    deliveryCount++;

    // Attempt to claim for processing (first claim wins)
    var firstClaim = await _idempotencyStore!.TryClaimAsync(
        $"e2e-test:{firstLease[0].Message.Id}",
        firstLease[0].MessageId,
        TimeSpan.FromMinutes(5));
    if (firstClaim.IsFirstClaim)
    {
        processedIds.Add(firstLease[0].Message.Id);
    }

    // Simulate crash - don't acknowledge
    _output.WriteLine("First delivery: Processing started but consumer crashed");

    // Wait for lease expiration (500 ms > 200 ms lease)
    await Task.Delay(500);

    // Claim expired message (redelivery)
    var redelivered = await queue.ClaimExpiredAsync(new ClaimRequest
    {
        BatchSize = 1,
        MinIdleTime = TimeSpan.FromMilliseconds(200),
        MinDeliveryAttempts = 1
    });
    if (redelivered.Count > 0)
    {
        deliveryCount++;
        // Attempt to claim again (should be duplicate, because the idempotency
        // window is still open from the first claim)
        var secondClaim = await _idempotencyStore!.TryClaimAsync(
            $"e2e-test:{redelivered[0].Message.Id}",
            redelivered[0].MessageId,
            TimeSpan.FromMinutes(5));
        if (secondClaim.IsFirstClaim)
        {
            processedIds.Add(redelivered[0].Message.Id);
        }
        else
        {
            _output.WriteLine($"Second delivery: Duplicate detected, skipping processing");
        }
        // This time, acknowledge
        await redelivered[0].AcknowledgeAsync();
        _output.WriteLine("Second delivery: Message acknowledged");
    }

    // Assert - delivered more than once, but the idempotency claim guaranteed
    // exactly one logical processing.
    processedIds.Should().HaveCount(1, "message should be processed exactly once");
    deliveryCount.Should().BeGreaterThan(1, "message should be delivered at least twice (crash + redelivery)");
    _output.WriteLine($"Total deliveries: {deliveryCount}, Unique processing: {processedIds.Count}");
}
[ValkeyIntegrationFact]
public async Task EndToEnd_BulkMessages_AtLeastOnceWithIdempotency()
{
    // Arrange
    var queue = CreateQueue<TestMessage>();
    const int messageCount = 50;
    var processedIds = new ConcurrentHashSet<Guid>();
    var deliveryAttempts = new Dictionary<Guid, int>();

    // Send messages
    var sentIds = new List<Guid>();
    for (int i = 0; i < messageCount; i++)
    {
        var id = Guid.NewGuid();
        sentIds.Add(id);
        await queue.EnqueueAsync(new TestMessage { Id = id, Content = $"Bulk-{i}" });
    }

    // Act - Process all messages with idempotency; bail out if the queue drains.
    int remaining = messageCount;
    while (remaining > 0)
    {
        var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
        if (leases.Count == 0) break;
        foreach (var lease in leases)
        {
            var msgId = lease.Message.Id;
            deliveryAttempts[msgId] = deliveryAttempts.GetValueOrDefault(msgId) + 1;
            // Check idempotency before processing
            var claim = await _idempotencyStore!.TryClaimAsync(
                $"bulk-test:{msgId}",
                lease.MessageId,
                TimeSpan.FromMinutes(5));
            if (claim.IsFirstClaim)
            {
                processedIds.Add(msgId);
            }
            await lease.AcknowledgeAsync();
        }
        remaining -= leases.Count;
    }

    // Assert - All messages processed exactly once
    processedIds.Count.Should().Be(messageCount, "all messages should be processed");
    sentIds.Should().BeEquivalentTo(processedIds.ToList(), "all sent messages should be processed");
    _output.WriteLine($"Processed {processedIds.Count}/{messageCount} messages with idempotency");
}
#endregion
#region Edge Cases
[ValkeyIntegrationFact]
public async Task EdgeCase_IdempotencyStore_ExtendWindow()
{
    // Arrange
    var messageId = Guid.NewGuid();
    var idempotencyKey = $"extend-test:{messageId}";
    var shortWindow = TimeSpan.FromSeconds(1);

    // Act - Claim with short window
    var claim = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "original-value",
        shortWindow);
    claim.IsFirstClaim.Should().BeTrue();

    // Extend the window well beyond the original 1 s
    var extended = await _idempotencyStore!.ExtendAsync(
        idempotencyKey,
        TimeSpan.FromMinutes(5));

    // Assert - Window extended
    extended.Should().BeTrue();

    // Wait past the ORIGINAL 1 s window (1.5 s); the key must still exist
    // because the extension replaced the expiry.
    await Task.Delay(1500);
    var afterOriginalExpiry = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "new-value",
        shortWindow);
    afterOriginalExpiry.IsDuplicate.Should().BeTrue(
        "window was extended, so duplicate should still be detected");
    _output.WriteLine("Window extension verified - duplicate detected after original expiry");
}
[ValkeyIntegrationFact]
public async Task EdgeCase_IdempotencyStore_Release()
{
    // Arrange
    var messageId = Guid.NewGuid();
    var idempotencyKey = $"release-test:{messageId}";

    // Claim the key
    var firstClaim = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "first-value",
        TimeSpan.FromMinutes(5));
    firstClaim.IsFirstClaim.Should().BeTrue();

    // Duplicate should be detected while the claim is held
    var duplicate = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "duplicate-value",
        TimeSpan.FromMinutes(5));
    duplicate.IsDuplicate.Should().BeTrue();

    // Act - Release the key (explicitly drop the claim before the window expires)
    var released = await _idempotencyStore!.ReleaseAsync(idempotencyKey);
    released.Should().BeTrue();

    // Assert - After release, key can be claimed again
    var afterRelease = await _idempotencyStore!.TryClaimAsync(
        idempotencyKey,
        "new-value",
        TimeSpan.FromMinutes(5));
    afterRelease.IsFirstClaim.Should().BeTrue(
        "after release, key should be claimable again");
    _output.WriteLine("Release verified - key claimable after release");
}
[ValkeyIntegrationFact]
public async Task EdgeCase_IdempotencyStore_Exists()
{
    // Arrange - a key derived from a fresh message id, never claimed before.
    var idempotencyKey = $"exists-test:{Guid.NewGuid()}";

    // Act / Assert - key is absent before the claim…
    (await _idempotencyStore!.ExistsAsync(idempotencyKey)).Should().BeFalse();

    // …and present after claiming it.
    await _idempotencyStore!.TryClaimAsync(idempotencyKey, "value", TimeSpan.FromMinutes(5));
    (await _idempotencyStore!.ExistsAsync(idempotencyKey)).Should().BeTrue();

    _output.WriteLine("Exists check verified");
}
[ValkeyIntegrationFact]
public async Task EdgeCase_IdempotencyStore_Get()
{
    // Arrange - a key derived from a fresh message id, never claimed before.
    var idempotencyKey = $"get-test:{Guid.NewGuid()}";
    const string storedValue = "stored-processor-id";

    // Act / Assert - an unclaimed key resolves to null…
    (await _idempotencyStore!.GetAsync(idempotencyKey)).Should().BeNull();

    // …and after claiming, the stored value is returned verbatim.
    await _idempotencyStore!.TryClaimAsync(idempotencyKey, storedValue, TimeSpan.FromMinutes(5));
    var valueAfter = await _idempotencyStore!.GetAsync(idempotencyKey);
    valueAfter.Should().Be(storedValue);

    _output.WriteLine($"Get verified - stored value: {valueAfter}");
}
#endregion
#region Helpers
/// <summary>
/// Builds a Valkey-backed queue for <typeparamref name="TMessage"/> using the
/// fixture's transport settings. Queue options default to the fixture's unless
/// a test needs to override lease/retry behavior.
/// </summary>
private ValkeyMessageQueue<TMessage> CreateQueue<TMessage>(
    MessageQueueOptions? queueOptions = null)
    where TMessage : class
{
    var effectiveQueueOptions = queueOptions ?? _fixture.CreateQueueOptions();
    return new ValkeyMessageQueue<TMessage>(
        _connectionFactory!,
        effectiveQueueOptions,
        _fixture.CreateOptions(),
        _fixture.GetLogger<ValkeyMessageQueue<TMessage>>());
}
#endregion
#region Test Types
/// <summary>Simple payload type used by the at-least-once delivery tests.</summary>
public sealed class TestMessage
{
    // Unique identity used to correlate sent and received messages.
    public Guid Id { get; set; }
    // Arbitrary human-readable payload; nullable to model optional content.
    public string? Content { get; set; }
}
/// <summary>
/// Thread-safe hash set for concurrent test scenarios.
/// Every operation takes a private gate lock; contention is negligible at test scale.
/// </summary>
private sealed class ConcurrentHashSet<T> where T : notnull
{
    private readonly object _gate = new();
    private readonly HashSet<T> _items = new();

    /// <summary>Adds an item; returns false when it was already present.</summary>
    public bool Add(T item)
    {
        lock (_gate)
        {
            return _items.Add(item);
        }
    }

    /// <summary>Gets the number of distinct items currently held.</summary>
    public int Count
    {
        get
        {
            lock (_gate)
            {
                return _items.Count;
            }
        }
    }

    /// <summary>Returns a snapshot copy of the current contents.</summary>
    public List<T> ToList()
    {
        lock (_gate)
        {
            return _items.ToList();
        }
    }
}
#endregion
}

View File

@@ -0,0 +1,203 @@
// -----------------------------------------------------------------------------
// ValkeyContainerFixture.cs
// Sprint: SPRINT_5100_0010_0003 - Router + Messaging Test Implementation
// Task: MESSAGING-5100-004 - Valkey transport compliance tests
// Description: Collection fixture providing a shared Valkey container for integration tests
// -----------------------------------------------------------------------------
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Router.Testing.Fixtures;
using Testcontainers.Redis;
using Xunit.Sdk;
namespace StellaOps.Messaging.Transport.Valkey.Tests.Fixtures;
/// <summary>
/// Collection fixture that provides a shared Valkey container for integration tests.
/// Uses Redis container (Valkey is Redis-compatible).
/// Implements IAsyncLifetime to start/stop the container with the test collection.
/// </summary>
public sealed class ValkeyContainerFixture : RouterCollectionFixture, IAsyncDisposable
{
private RedisContainer? _container;
private bool _disposed;
/// <summary>
/// Gets the Valkey container hostname ("localhost" before the container is started).
/// </summary>
public string HostName => _container?.Hostname ?? "localhost";
/// <summary>
/// Gets the host port mapped to the container's port 6379
/// (falls back to 6379 before the container is started).
/// </summary>
public int Port => _container?.GetMappedPublicPort(6379) ?? 6379;
/// <summary>
/// Gets the "host:port" connection string for the Valkey container.
/// </summary>
public string ConnectionString => $"{HostName}:{Port}";
/// <summary>
/// Gets a no-op logger for tests.
/// </summary>
public ILogger<T> GetLogger<T>() => NullLogger<T>.Instance;
/// <summary>
/// Gets whether the container instance has been created.
/// NOTE(review): this only checks that the container object exists; it does not
/// verify the container process is actually still running — confirm if callers
/// depend on liveness.
/// </summary>
public bool IsRunning => _container is not null;
/// <summary>
/// Creates Valkey transport options configured for the test container.
/// </summary>
/// <param name="database">Optional logical database index, letting tests isolate keyspaces.</param>
public ValkeyTransportOptions CreateOptions(int? database = null)
{
    return new ValkeyTransportOptions
    {
        ConnectionString = ConnectionString,
        Database = database,
        InitializationTimeout = TimeSpan.FromSeconds(30),
        ConnectRetry = 3,
        // Keep retrying instead of failing fast - the container may still be warming up.
        AbortOnConnectFail = false,
        IdempotencyKeyPrefix = "test:idem:"
    };
}
/// <summary>
/// Builds a <see cref="ValkeyConnectionFactory"/> wired to the test container.
/// </summary>
/// <param name="database">Optional logical database index, forwarded to <see cref="CreateOptions"/>.</param>
public ValkeyConnectionFactory CreateConnectionFactory(int? database = null)
    => new(
        Options.Create(CreateOptions(database)),
        GetLogger<ValkeyConnectionFactory>());
/// <summary>
/// Creates message queue options for testing. Queue name defaults to a random
/// per-call value so tests do not share streams.
/// </summary>
/// <param name="queueName">Optional explicit queue/stream name.</param>
/// <param name="consumerGroup">Optional consumer group name (defaults to "test-group").</param>
/// <param name="consumerName">Optional consumer name (defaults to one derived from the process id).</param>
public StellaOps.Messaging.MessageQueueOptions CreateQueueOptions(
    string? queueName = null,
    string? consumerGroup = null,
    string? consumerName = null)
{
    return new StellaOps.Messaging.MessageQueueOptions
    {
        QueueName = queueName ?? $"test:queue:{Guid.NewGuid():N}",
        ConsumerGroup = consumerGroup ?? "test-group",
        ConsumerName = consumerName ?? $"consumer-{Environment.ProcessId}",
        DefaultLeaseDuration = TimeSpan.FromSeconds(30),
        MaxDeliveryAttempts = 3,
        IdempotencyWindow = TimeSpan.FromMinutes(5),
        ApproximateMaxLength = 10000,
        RetryInitialBackoff = TimeSpan.FromMilliseconds(100),
        RetryMaxBackoff = TimeSpan.FromSeconds(10),
        RetryBackoffMultiplier = 2.0
    };
}
/// <summary>
/// Creates a ValkeyMessageQueue for testing. All collaborators default to
/// fixture-provided instances so a bare call yields a working queue.
/// </summary>
/// <param name="connectionFactory">Optional factory; a new one is created when omitted.</param>
/// <param name="queueOptions">Optional queue options; fixture defaults when omitted.</param>
/// <param name="timeProvider">Optional clock, allowing tests to control time.</param>
public ValkeyMessageQueue<TMessage> CreateMessageQueue<TMessage>(
    ValkeyConnectionFactory? connectionFactory = null,
    StellaOps.Messaging.MessageQueueOptions? queueOptions = null,
    TimeProvider? timeProvider = null)
    where TMessage : class
{
    connectionFactory ??= CreateConnectionFactory();
    queueOptions ??= CreateQueueOptions();
    var transportOptions = CreateOptions();
    return new ValkeyMessageQueue<TMessage>(
        connectionFactory,
        queueOptions,
        transportOptions,
        GetLogger<ValkeyMessageQueue<TMessage>>(),
        timeProvider);
}
/// <summary>
/// Stops the container and starts it again (e.g. for reconnect tests).
/// </summary>
/// <exception cref="InvalidOperationException">The container was never started.</exception>
public async Task RestartAsync()
{
    var container = _container
        ?? throw new InvalidOperationException("Valkey container is not running.");
    await container.StopAsync();
    await container.StartAsync();
}
/// <inheritdoc />
/// <remarks>
/// Starts the Valkey container. When Docker is unavailable, the startup failure
/// is converted into an xunit skip instead of a hard test failure.
/// </remarks>
public override async Task InitializeAsync()
{
    try
    {
        _container = new RedisBuilder()
            .WithImage("valkey/valkey:8-alpine")
            .WithPortBinding(6379, true)
            .Build();
        await _container.StartAsync();
    }
    catch (Exception ex)
    {
        // Best-effort cleanup of the half-constructed container before skipping.
        try
        {
            if (_container is not null)
            {
                await _container.DisposeAsync();
            }
        }
        catch
        {
            // Ignore cleanup failures during skip.
        }
        _container = null;
        throw SkipException.ForSkip(
            $"Valkey integration tests require Docker/Testcontainers. Skipping because the container failed to start: {ex.Message}");
    }
}
/// <inheritdoc />
public override async Task DisposeAsync()
{
    await DisposeAsyncCore();
}

// Explicit IAsyncDisposable path shares the same core to keep teardown idempotent.
async ValueTask IAsyncDisposable.DisposeAsync()
{
    await DisposeAsyncCore();
    GC.SuppressFinalize(this);
}

// Stops and disposes the container exactly once; subsequent calls are no-ops.
private async Task DisposeAsyncCore()
{
    if (_disposed) return;
    _disposed = true;
    if (_container is not null)
    {
        await _container.StopAsync();
        await _container.DisposeAsync();
    }
}
}
/// <summary>
/// Collection definition for Valkey integration tests.
/// All tests in this collection share a single Valkey container, so the
/// container is started once per run rather than once per test class.
/// </summary>
[CollectionDefinition(Name)]
public sealed class ValkeyIntegrationTestCollection : ICollectionFixture<ValkeyContainerFixture>
{
    // Referenced by [Collection(Name)] on the test classes.
    public const string Name = "Valkey Integration Tests";
}

View File

@@ -0,0 +1,46 @@
// -----------------------------------------------------------------------------
// ValkeyIntegrationFactAttribute.cs
// Sprint: SPRINT_5100_0010_0003 - Router + Messaging Test Implementation
// Task: MESSAGING-5100-004 - Valkey transport compliance tests
// Description: Attribute that skips Valkey integration tests when Docker is not available
// -----------------------------------------------------------------------------
using Xunit;
namespace StellaOps.Messaging.Transport.Valkey.Tests.Fixtures;
/// <summary>
/// Fact attribute for Valkey integration tests.
/// Marks the test as skipped unless STELLAOPS_TEST_VALKEY is "1" or "true"
/// (compared case-insensitively).
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public sealed class ValkeyIntegrationFactAttribute : FactAttribute
{
    public ValkeyIntegrationFactAttribute()
    {
        var optIn = Environment.GetEnvironmentVariable("STELLAOPS_TEST_VALKEY");
        var isEnabled =
            string.Equals(optIn, "1", StringComparison.OrdinalIgnoreCase) ||
            string.Equals(optIn, "true", StringComparison.OrdinalIgnoreCase);
        if (!isEnabled)
        {
            Skip = "Valkey integration tests are opt-in. Set STELLAOPS_TEST_VALKEY=1 (requires Docker/Testcontainers).";
        }
    }
}
/// <summary>
/// Theory attribute for Valkey integration tests.
/// Marks the test as skipped unless STELLAOPS_TEST_VALKEY is "1" or "true"
/// (compared case-insensitively).
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public sealed class ValkeyIntegrationTheoryAttribute : TheoryAttribute
{
    public ValkeyIntegrationTheoryAttribute()
    {
        var optIn = Environment.GetEnvironmentVariable("STELLAOPS_TEST_VALKEY");
        var isEnabled =
            string.Equals(optIn, "1", StringComparison.OrdinalIgnoreCase) ||
            string.Equals(optIn, "true", StringComparison.OrdinalIgnoreCase);
        if (!isEnabled)
        {
            Skip = "Valkey integration tests are opt-in. Set STELLAOPS_TEST_VALKEY=1 (requires Docker/Testcontainers).";
        }
    }
}

View File

@@ -0,0 +1,41 @@
<Project Sdk="Microsoft.NET.Sdk">
  <!-- Test project for the Valkey messaging transport (Testcontainers-based). -->
  <PropertyGroup>
    <TargetFramework>net10.0</TargetFramework>
    <LangVersion>preview</LangVersion>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <TreatWarningsAsErrors>false</TreatWarningsAsErrors>
    <!-- Suppress CA2255 from OpenSSL auto-init shim included via Directory.Build.props -->
    <NoWarn>$(NoWarn);CA2255</NoWarn>
    <IsPackable>false</IsPackable>
    <RootNamespace>StellaOps.Messaging.Transport.Valkey.Tests</RootNamespace>
    <!-- Disable Concelier test infrastructure since not needed for Messaging tests -->
    <UseConcelierTestInfra>false</UseConcelierTestInfra>
  </PropertyGroup>
  <ItemGroup>
    <!-- Global usings so test files can omit the two most common directives. -->
    <Using Include="Xunit" />
    <Using Include="FluentAssertions" />
  </ItemGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.0" />
    <PackageReference Include="xunit" Version="2.9.2" />
    <PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
      <PrivateAssets>all</PrivateAssets>
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
    <!-- NOTE(review): FluentAssertions 6.12.0 here while sibling test projects pin
         7.0.0 - presumably intentional per-project pinning; confirm before aligning. -->
    <PackageReference Include="FluentAssertions" Version="6.12.0" />
    <PackageReference Include="Moq" Version="4.20.70" />
    <PackageReference Include="Testcontainers.Redis" Version="3.9.0" />
    <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
  </ItemGroup>
  <ItemGroup>
    <ProjectReference Include="..\..\StellaOps.Messaging.Transport.Valkey\StellaOps.Messaging.Transport.Valkey.csproj" />
    <ProjectReference Include="..\StellaOps.Router.Testing\StellaOps.Router.Testing.csproj" />
  </ItemGroup>
</Project>

View File

@@ -0,0 +1,723 @@
// -----------------------------------------------------------------------------
// ValkeyTransportComplianceTests.cs
// Sprint: SPRINT_5100_0010_0003 - Router + Messaging Test Implementation
// Task: MESSAGING-5100-004 - Valkey transport compliance tests
// Description: Transport compliance tests for Valkey transport covering roundtrip,
// pub/sub semantics, consumer groups, ack/nack, and backpressure.
// -----------------------------------------------------------------------------
using System.Text.Json;
using FluentAssertions;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Messaging.Transport.Valkey.Tests.Fixtures;
using Xunit;
using Xunit.Abstractions;
namespace StellaOps.Messaging.Transport.Valkey.Tests;
/// <summary>
/// Transport compliance tests for Valkey transport.
/// Validates:
/// - Message roundtrip (enqueue → lease → message preserved)
/// - Consumer group semantics (exclusive delivery, multiple consumers)
/// - Ack/Nack behavior (acknowledge, release, dead-letter)
/// - Idempotency (duplicate detection)
/// - Backpressure (batch limits, pending counts)
/// - Lease management (renewal, expiration, claiming)
/// </summary>
[Collection(ValkeyIntegrationTestCollection.Name)]
public sealed class ValkeyTransportComplianceTests : IAsyncLifetime
{
private readonly ValkeyContainerFixture _fixture;
private readonly ITestOutputHelper _output;
private ValkeyConnectionFactory? _connectionFactory;
public ValkeyTransportComplianceTests(ValkeyContainerFixture fixture, ITestOutputHelper output)
{
_fixture = fixture;
_output = output;
}
public Task InitializeAsync()
{
_connectionFactory = _fixture.CreateConnectionFactory();
return Task.CompletedTask;
}
public async Task DisposeAsync()
{
if (_connectionFactory is not null)
{
await _connectionFactory.DisposeAsync();
}
}
#region Message Roundtrip Tests
[ValkeyIntegrationFact]
public async Task Roundtrip_SimpleMessage_AllFieldsPreserved()
{
    // Arrange - a message exercising every field of TestMessage.
    var queue = CreateQueue<TestMessage>();
    var original = new TestMessage
    {
        Id = Guid.NewGuid(),
        Content = "Hello Valkey!",
        Timestamp = DateTimeOffset.UtcNow,
        Tags = new[] { "tag1", "tag2" }
    };

    // Act - enqueue, then lease the single message back.
    var enqueueResult = await queue.EnqueueAsync(original);
    enqueueResult.Success.Should().BeTrue();
    enqueueResult.MessageId.Should().NotBeNullOrEmpty();
    var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });

    // Assert - every field survives the serialize/enqueue/lease roundtrip.
    leases.Should().HaveCount(1);
    var lease = leases[0];
    lease.Message.Id.Should().Be(original.Id);
    lease.Message.Content.Should().Be(original.Content);
    // Fix: the test name promises *all* fields are preserved, but Timestamp was
    // never asserted. System.Text.Json round-trips DateTimeOffset losslessly
    // (ISO 8601 with full tick precision), so exact equality is expected —
    // if the transport uses a custom serializer with lower precision, relax
    // this to BeCloseTo.
    lease.Message.Timestamp.Should().Be(original.Timestamp);
    lease.Message.Tags.Should().BeEquivalentTo(original.Tags);
    lease.Attempt.Should().Be(1);
    await lease.AcknowledgeAsync();
    _output.WriteLine("Roundtrip test passed");
}
[ValkeyIntegrationFact]
public async Task Roundtrip_ComplexMessage_PreservedAfterSerialization()
{
// Arrange
var queue = CreateQueue<ComplexMessage>();
var original = new ComplexMessage
{
Id = Guid.NewGuid(),
Metadata = new Dictionary<string, object>
{
["key1"] = "value1",
["key2"] = 42,
["key3"] = true
},
NestedData = new NestedObject
{
Name = "nested",
Value = 123.45m
}
};
// Act
await queue.EnqueueAsync(original);
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
// Assert
var lease = leases[0];
lease.Message.Id.Should().Be(original.Id);
lease.Message.NestedData.Name.Should().Be(original.NestedData.Name);
lease.Message.NestedData.Value.Should().Be(original.NestedData.Value);
await lease.AcknowledgeAsync();
_output.WriteLine("Complex message roundtrip test passed");
}
[ValkeyIntegrationFact]
public async Task Roundtrip_BinaryData_PreservesAllBytes()
{
    // Arrange - a payload covering every possible byte value exactly once.
    var queue = CreateQueue<BinaryMessage>();
    var binaryPayload = Enumerable.Range(0, 256).Select(i => (byte)i).ToArray();
    var original = new BinaryMessage
    {
        Id = Guid.NewGuid(),
        Data = binaryPayload
    };

    // Act
    await queue.EnqueueAsync(original);
    var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });

    // Assert
    // Fix: BeEquivalentTo on collections is order-insensitive by default, so a
    // byte-scrambled payload would previously have passed. Equal asserts the
    // exact byte sequence, which is what "preserves all bytes" requires.
    leases[0].Message.Data.Should().Equal(binaryPayload);
    await leases[0].AcknowledgeAsync();
    _output.WriteLine("Binary data roundtrip test passed");
}
[ValkeyIntegrationTheory]
[InlineData(1)]
[InlineData(10)]
[InlineData(100)]
[InlineData(1000)]
public async Task Roundtrip_MultipleMessages_OrderPreserved(int messageCount)
{
    // Arrange - distinctly numbered messages so any ordering violation is visible.
    var queue = CreateQueue<TestMessage>();
    var messages = Enumerable.Range(1, messageCount)
        .Select(i => new TestMessage
        {
            Id = Guid.NewGuid(),
            Content = $"Message-{i:D5}",
            Timestamp = DateTimeOffset.UtcNow.AddMilliseconds(i)
        })
        .ToList();

    // Act - Enqueue all
    foreach (var msg in messages)
    {
        await queue.EnqueueAsync(msg);
    }

    // Lease in batches of up to 50, acknowledging as we go.
    var receivedContents = new List<string>();
    int remaining = messageCount;
    int consecutiveEmptyBatches = 0;
    while (remaining > 0)
    {
        var batchSize = Math.Min(remaining, 50);
        var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = batchSize });
        if (leases.Count == 0)
        {
            // Fix: an empty batch previously left `remaining` unchanged, spinning
            // this loop forever and hanging the test run. Fail fast with a clear
            // message if the queue repeatedly returns nothing while messages are
            // still expected.
            consecutiveEmptyBatches++;
            consecutiveEmptyBatches.Should().BeLessThan(
                20, "the queue stopped delivering while {0} message(s) were still expected", remaining);
            await Task.Delay(50);
            continue;
        }
        consecutiveEmptyBatches = 0;
        foreach (var lease in leases)
        {
            receivedContents.Add(lease.Message.Content!);
            await lease.AcknowledgeAsync();
        }
        remaining -= leases.Count;
    }

    // Assert - FIFO order preserved. Equal compares element-by-element in order,
    // replacing the more roundabout BeEquivalentTo(...WithStrictOrdering()).
    receivedContents.Should().Equal(messages.Select(m => m.Content!));
    _output.WriteLine($"Order preserved for {messageCount} messages");
}
#endregion
#region Consumer Group Semantics Tests
[ValkeyIntegrationFact]
public async Task ConsumerGroup_MultipleConsumers_ExclusiveDelivery()
{
    // Arrange - Two consumers in same group
    // NOTE(review): both queues share the SAME MessageQueueOptions instance, and
    // CreateQueue mutates queueOptions.ConsumerName in place ("consumer-1" then
    // "consumer-2"). If ValkeyMessageQueue reads ConsumerName lazily rather than
    // snapshotting it at construction, both consumers would end up named
    // "consumer-2" — confirm, and consider passing separate options instances.
    var queueOptions = _fixture.CreateQueueOptions();
    var queue1 = CreateQueue<TestMessage>(queueOptions: queueOptions, consumerName: "consumer-1");
    var queue2 = CreateQueue<TestMessage>(queueOptions: queueOptions, consumerName: "consumer-2");
    var messages = Enumerable.Range(1, 20)
        .Select(i => new TestMessage { Id = Guid.NewGuid(), Content = $"Msg-{i}" })
        .ToList();
    // All 20 messages go through queue1; both queues point at the same stream.
    foreach (var msg in messages)
    {
        await queue1.EnqueueAsync(msg);
    }
    // Act - Both consumers lease
    var leases1 = await queue1.LeaseAsync(new LeaseRequest { BatchSize = 10 });
    var leases2 = await queue2.LeaseAsync(new LeaseRequest { BatchSize = 10 });
    // Assert - Messages should be distributed (no duplicates): within one consumer
    // group, each message must be delivered to exactly one consumer.
    var allIds = leases1.Concat(leases2).Select(l => l.Message.Id).ToList();
    allIds.Should().OnlyHaveUniqueItems("each message should be delivered to only one consumer");
    allIds.Should().HaveCount(20, "all messages should be delivered");
    // Cleanup
    foreach (var lease in leases1.Concat(leases2))
    {
        await lease.AcknowledgeAsync();
    }
    _output.WriteLine("Exclusive delivery test passed");
}
[ValkeyIntegrationFact]
public async Task ConsumerGroup_DifferentGroups_EachReceivesAllMessages()
{
// Arrange - Two different consumer groups
var queueName = $"test:queue:{Guid.NewGuid():N}";
var options1 = _fixture.CreateQueueOptions(queueName: queueName, consumerGroup: "group-1");
var options2 = _fixture.CreateQueueOptions(queueName: queueName, consumerGroup: "group-2");
var queue1 = CreateQueue<TestMessage>(queueOptions: options1);
var queue2 = CreateQueue<TestMessage>(queueOptions: options2);
var message = new TestMessage { Id = Guid.NewGuid(), Content = "Shared message" };
// Act - Enqueue to one queue (same stream)
await queue1.EnqueueAsync(message);
// Both groups should receive the message
var leases1 = await queue1.LeaseAsync(new LeaseRequest { BatchSize = 1 });
var leases2 = await queue2.LeaseAsync(new LeaseRequest { BatchSize = 1 });
// Assert
leases1.Should().HaveCount(1);
leases2.Should().HaveCount(1);
leases1[0].Message.Id.Should().Be(message.Id);
leases2[0].Message.Id.Should().Be(message.Id);
await leases1[0].AcknowledgeAsync();
await leases2[0].AcknowledgeAsync();
_output.WriteLine("Different groups test passed");
}
#endregion
#region Ack/Nack/Release Semantics Tests
[ValkeyIntegrationFact]
public async Task Acknowledge_RemovesMessageFromQueue()
{
// Arrange
var queue = CreateQueue<TestMessage>();
await queue.EnqueueAsync(new TestMessage { Id = Guid.NewGuid(), Content = "Ack test" });
// Act
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
await leases[0].AcknowledgeAsync();
// Assert - No more messages
var pending = await queue.GetPendingCountAsync();
pending.Should().Be(0);
var moreLeases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
moreLeases.Should().BeEmpty();
_output.WriteLine("Acknowledge removes message test passed");
}
[ValkeyIntegrationFact]
public async Task Release_Retry_MessageBecomesAvailableAgain()
{
// Arrange
var queueOptions = _fixture.CreateQueueOptions();
queueOptions.RetryInitialBackoff = TimeSpan.Zero; // No backoff for test speed
var queue = CreateQueue<TestMessage>(queueOptions: queueOptions);
var message = new TestMessage { Id = Guid.NewGuid(), Content = "Retry test" };
await queue.EnqueueAsync(message);
// Act - Lease and release for retry
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
leases.Should().HaveCount(1);
leases[0].Attempt.Should().Be(1);
await leases[0].ReleaseAsync(ReleaseDisposition.Retry);
// Wait briefly for re-enqueue
await Task.Delay(100);
// Lease again
var retryLeases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
// Assert
retryLeases.Should().HaveCount(1);
retryLeases[0].Message.Id.Should().Be(message.Id);
retryLeases[0].Attempt.Should().Be(2);
await retryLeases[0].AcknowledgeAsync();
_output.WriteLine("Release retry test passed");
}
[ValkeyIntegrationFact]
public async Task DeadLetter_MovesMessageToDeadLetterQueue()
{
// Arrange
var mainQueueName = $"test:main:{Guid.NewGuid():N}";
var dlqName = $"test:dlq:{Guid.NewGuid():N}";
var mainOptions = _fixture.CreateQueueOptions(queueName: mainQueueName);
mainOptions.DeadLetterQueue = dlqName;
var dlqOptions = _fixture.CreateQueueOptions(queueName: dlqName);
var mainQueue = CreateQueue<TestMessage>(queueOptions: mainOptions);
var dlqQueue = CreateQueue<TestMessage>(queueOptions: dlqOptions);
var message = new TestMessage { Id = Guid.NewGuid(), Content = "DLQ test" };
await mainQueue.EnqueueAsync(message);
// Act
var leases = await mainQueue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
await leases[0].DeadLetterAsync("test-reason");
// Assert - Message should be in DLQ
var dlqLeases = await dlqQueue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
dlqLeases.Should().HaveCount(1);
dlqLeases[0].Message.Id.Should().Be(message.Id);
// Main queue should be empty
var mainPending = await mainQueue.GetPendingCountAsync();
mainPending.Should().Be(0);
await dlqLeases[0].AcknowledgeAsync();
_output.WriteLine("Dead letter test passed");
}
[ValkeyIntegrationFact]
public async Task MaxDeliveryAttempts_ExceededCausesDeadLetter()
{
// Arrange
var mainQueueName = $"test:main:{Guid.NewGuid():N}";
var dlqName = $"test:dlq:{Guid.NewGuid():N}";
var mainOptions = _fixture.CreateQueueOptions(queueName: mainQueueName);
mainOptions.MaxDeliveryAttempts = 3;
mainOptions.DeadLetterQueue = dlqName;
mainOptions.RetryInitialBackoff = TimeSpan.Zero;
var dlqOptions = _fixture.CreateQueueOptions(queueName: dlqName);
var mainQueue = CreateQueue<TestMessage>(queueOptions: mainOptions);
var dlqQueue = CreateQueue<TestMessage>(queueOptions: dlqOptions);
var message = new TestMessage { Id = Guid.NewGuid(), Content = "Max attempts test" };
await mainQueue.EnqueueAsync(message);
// Act - Retry until max attempts exceeded
for (int i = 0; i < 3; i++)
{
var leases = await mainQueue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
if (leases.Count == 0) break;
await leases[0].ReleaseAsync(ReleaseDisposition.Retry);
await Task.Delay(50);
}
// Wait for final retry to dead-letter
await Task.Delay(200);
// Assert - Message should be in DLQ
var dlqLeases = await dlqQueue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
dlqLeases.Should().HaveCount(1);
dlqLeases[0].Message.Id.Should().Be(message.Id);
await dlqLeases[0].AcknowledgeAsync();
_output.WriteLine("Max delivery attempts test passed");
}
#endregion
#region Idempotency Tests
[ValkeyIntegrationFact]
public async Task Idempotency_DuplicateKey_ReturnsExistingMessage()
{
// Arrange
var queue = CreateQueue<TestMessage>();
var idempotencyKey = Guid.NewGuid().ToString();
var message = new TestMessage { Id = Guid.NewGuid(), Content = "Idempotent message" };
// Act - Enqueue twice with same key
var result1 = await queue.EnqueueAsync(message, EnqueueOptions.WithIdempotencyKey(idempotencyKey));
var result2 = await queue.EnqueueAsync(message, EnqueueOptions.WithIdempotencyKey(idempotencyKey));
// Assert
result1.Success.Should().BeTrue();
result1.WasDuplicate.Should().BeFalse();
result2.Success.Should().BeTrue();
result2.WasDuplicate.Should().BeTrue();
result2.MessageId.Should().Be(result1.MessageId);
// Only one message should be in queue
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
leases.Should().HaveCount(1);
await leases[0].AcknowledgeAsync();
_output.WriteLine("Idempotency test passed");
}
[ValkeyIntegrationFact]
public async Task Idempotency_DifferentKeys_BothMessagesEnqueued()
{
// Arrange
var queue = CreateQueue<TestMessage>();
var message1 = new TestMessage { Id = Guid.NewGuid(), Content = "Message 1" };
var message2 = new TestMessage { Id = Guid.NewGuid(), Content = "Message 2" };
// Act
await queue.EnqueueAsync(message1, EnqueueOptions.WithIdempotencyKey("key-1"));
await queue.EnqueueAsync(message2, EnqueueOptions.WithIdempotencyKey("key-2"));
// Assert
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
leases.Should().HaveCount(2);
foreach (var lease in leases)
{
await lease.AcknowledgeAsync();
}
_output.WriteLine("Different idempotency keys test passed");
}
#endregion
#region Backpressure Tests
[ValkeyIntegrationFact]
public async Task Backpressure_BatchSize_LimitsMessageCount()
{
    // Arrange - enqueue far more messages than a single batch should return.
    var queue = CreateQueue<TestMessage>();
    for (int i = 0; i < 100; i++)
    {
        await queue.EnqueueAsync(new TestMessage { Id = Guid.NewGuid(), Content = $"Msg-{i}" });
    }
    // Act - Request only 10
    var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
    // Assert - the batch size caps how many messages one lease call delivers.
    leases.Should().HaveCount(10);
    // Cleanup
    foreach (var lease in leases)
    {
        await lease.AcknowledgeAsync();
    }
    // Remaining messages: "pending" counts delivered-but-unacknowledged messages
    // only. The 90 never-leased messages are merely queued (not pending), and the
    // 10 that were leased have just been acknowledged — hence zero.
    var pending = await queue.GetPendingCountAsync();
    pending.Should().Be(0);
    _output.WriteLine("Batch size backpressure test passed");
}
[ValkeyIntegrationFact]
public async Task Backpressure_PendingCount_ReflectsUnacknowledged()
{
// Arrange
var queue = CreateQueue<TestMessage>();
for (int i = 0; i < 50; i++)
{
await queue.EnqueueAsync(new TestMessage { Id = Guid.NewGuid(), Content = $"Msg-{i}" });
}
// Act - Lease 30, ack 10
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 30 });
leases.Should().HaveCount(30);
for (int i = 0; i < 10; i++)
{
await leases[i].AcknowledgeAsync();
}
// Assert - 20 still pending
var pending = await queue.GetPendingCountAsync();
pending.Should().Be(20);
// Cleanup
for (int i = 10; i < 30; i++)
{
await leases[i].AcknowledgeAsync();
}
_output.WriteLine("Pending count test passed");
}
[ValkeyIntegrationFact]
public async Task Backpressure_EmptyQueue_ReturnsEmpty()
{
// Arrange
var queue = CreateQueue<TestMessage>();
// Act
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 10 });
// Assert
leases.Should().BeEmpty();
_output.WriteLine("Empty queue test passed");
}
#endregion
#region Lease Management Tests
[ValkeyIntegrationFact]
public async Task LeaseRenewal_ExtendsLeaseTime()
{
// Arrange
var queue = CreateQueue<TestMessage>();
await queue.EnqueueAsync(new TestMessage { Id = Guid.NewGuid(), Content = "Renewal test" });
// Act
var leases = await queue.LeaseAsync(new LeaseRequest
{
BatchSize = 1,
LeaseDuration = TimeSpan.FromSeconds(30)
});
var originalExpiry = leases[0].LeaseExpiresAt;
await leases[0].RenewAsync(TimeSpan.FromMinutes(5));
// Assert - Lease should be extended
leases[0].LeaseExpiresAt.Should().BeAfter(originalExpiry);
await leases[0].AcknowledgeAsync();
_output.WriteLine("Lease renewal test passed");
}
[ValkeyIntegrationFact]
public async Task ClaimExpired_RecoversStaleMessages()
{
// Arrange
var queueOptions = _fixture.CreateQueueOptions();
queueOptions.DefaultLeaseDuration = TimeSpan.FromMilliseconds(100);
var queue = CreateQueue<TestMessage>(queueOptions: queueOptions);
await queue.EnqueueAsync(new TestMessage { Id = Guid.NewGuid(), Content = "Stale test" });
// Lease and let expire
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
leases.Should().HaveCount(1);
// Wait for lease to expire
await Task.Delay(500);
// Act - Claim expired
var claimed = await queue.ClaimExpiredAsync(new ClaimRequest
{
BatchSize = 10,
MinIdleTime = TimeSpan.FromMilliseconds(100),
MinDeliveryAttempts = 1
});
// Assert
claimed.Should().HaveCount(1);
claimed[0].Message.Content.Should().Be("Stale test");
claimed[0].Attempt.Should().BeGreaterThan(1);
await claimed[0].AcknowledgeAsync();
_output.WriteLine("Claim expired test passed");
}
#endregion
#region Metadata/Headers Tests
[ValkeyIntegrationFact]
public async Task Metadata_CorrelationId_PreservedInLease()
{
// Arrange
var queue = CreateQueue<TestMessage>();
var correlationId = Guid.NewGuid().ToString();
var message = new TestMessage { Id = Guid.NewGuid(), Content = "Correlation test" };
// Act
await queue.EnqueueAsync(message, EnqueueOptions.WithCorrelation(correlationId));
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
// Assert
leases[0].CorrelationId.Should().Be(correlationId);
await leases[0].AcknowledgeAsync();
_output.WriteLine("Correlation ID test passed");
}
[ValkeyIntegrationFact]
public async Task Metadata_TenantId_PreservedInLease()
{
// Arrange
var queue = CreateQueue<TestMessage>();
var tenantId = "tenant-123";
var message = new TestMessage { Id = Guid.NewGuid(), Content = "Tenant test" };
// Act
await queue.EnqueueAsync(message, new EnqueueOptions { TenantId = tenantId });
var leases = await queue.LeaseAsync(new LeaseRequest { BatchSize = 1 });
// Assert
leases[0].TenantId.Should().Be(tenantId);
await leases[0].AcknowledgeAsync();
_output.WriteLine("Tenant ID test passed");
}
#endregion
#region Connection Resilience Tests
[ValkeyIntegrationFact]
public async Task ConnectionResilience_Ping_Succeeds()
{
// Arrange & Act
var act = async () => await _connectionFactory!.PingAsync();
// Assert
await act.Should().NotThrowAsync();
_output.WriteLine("Ping test passed");
}
[ValkeyIntegrationFact]
public async Task ConnectionResilience_QueueProviderName_IsValkey()
{
// Arrange
var queue = CreateQueue<TestMessage>();
// Assert
queue.ProviderName.Should().Be("valkey");
queue.QueueName.Should().NotBeNullOrEmpty();
_output.WriteLine("Provider name test passed");
await Task.CompletedTask;
}
#endregion
#region Helpers
/// <summary>
/// Builds a <see cref="ValkeyMessageQueue{TMessage}"/> wired to the shared test
/// container, using fresh fixture defaults unless explicit options are supplied.
/// </summary>
/// <param name="queueOptions">Optional queue options; fixture defaults are created when null.</param>
/// <param name="consumerName">Optional consumer-name override applied to the options.</param>
private ValkeyMessageQueue<TMessage> CreateQueue<TMessage>(
    MessageQueueOptions? queueOptions = null,
    string? consumerName = null)
    where TMessage : class
{
    queueOptions ??= _fixture.CreateQueueOptions();
    if (consumerName is not null)
    {
        // NOTE(review): this mutates the caller-supplied options instance in place.
        // Tests that pass one options object into several CreateQueue calls (e.g.
        // the exclusive-delivery test) therefore share that mutation — confirm that
        // ValkeyMessageQueue snapshots ConsumerName at construction time.
        queueOptions.ConsumerName = consumerName;
    }
    var transportOptions = _fixture.CreateOptions();
    return new ValkeyMessageQueue<TMessage>(
        _connectionFactory!,
        queueOptions,
        transportOptions,
        _fixture.GetLogger<ValkeyMessageQueue<TMessage>>());
}
#endregion
#region Test Message Types
public sealed class TestMessage
{
public Guid Id { get; set; }
public string? Content { get; set; }
public DateTimeOffset Timestamp { get; set; }
public string[]? Tags { get; set; }
}
public sealed class ComplexMessage
{
public Guid Id { get; set; }
public Dictionary<string, object>? Metadata { get; set; }
public NestedObject? NestedData { get; set; }
}
public sealed class NestedObject
{
public string? Name { get; set; }
public decimal Value { get; set; }
}
public sealed class BinaryMessage
{
public Guid Id { get; set; }
public byte[]? Data { get; set; }
}
#endregion
}

View File

@@ -0,0 +1,319 @@
using FluentAssertions;
using Xunit;
namespace StellaOps.Provcache.Tests;
/// <summary>
/// Determinism tests for DecisionDigestBuilder.
/// Verifies that same inputs always produce the same DecisionDigest.
/// </summary>
public class DecisionDigestBuilderDeterminismTests
{
private readonly ProvcacheOptions _options = new()
{
DigestVersion = "v1",
DefaultTtl = TimeSpan.FromHours(24)
};
private readonly FakeTimeProvider _timeProvider;
public DecisionDigestBuilderDeterminismTests()
{
_timeProvider = new FakeTimeProvider(new DateTimeOffset(2024, 12, 24, 12, 0, 0, TimeSpan.Zero));
}
[Fact]
public void Build_SameInputs_ProducesSameDigest()
{
    // Arrange: one fixed set of CVE dispositions plus three evidence chunk hashes.
    var cveDispositions = new Dictionary<string, string>
    {
        ["CVE-2024-001"] = "fixed",
        ["CVE-2024-002"] = "affected",
        ["CVE-2024-003"] = "not_affected"
    };
    var chunkHashes = new List<string>
    {
        "sha256:a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2",
        "sha256:b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3",
        "sha256:c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4"
    };

    // Act: build twice from independently constructed builders fed identical inputs.
    var first = CreateBuilder()
        .WithDispositions(cveDispositions)
        .WithEvidenceChunks(chunkHashes)
        .Build();
    var second = CreateBuilder()
        .WithDispositions(cveDispositions)
        .WithEvidenceChunks(chunkHashes)
        .Build();

    // Assert: determinism — identical inputs must yield identical digest components.
    second.VerdictHash.Should().Be(first.VerdictHash);
    second.ProofRoot.Should().Be(first.ProofRoot);
    second.TrustScore.Should().Be(first.TrustScore);
}
[Fact]
public void Build_DispositionsInDifferentOrder_ProducesSameVerdictHash()
{
// Arrange - Same dispositions, different insertion order
var dispositions1 = new Dictionary<string, string>
{
["CVE-2024-001"] = "fixed",
["CVE-2024-002"] = "affected",
["CVE-2024-003"] = "not_affected"
};
var dispositions2 = new Dictionary<string, string>
{
["CVE-2024-003"] = "not_affected",
["CVE-2024-001"] = "fixed",
["CVE-2024-002"] = "affected"
};
// Act
var digest1 = CreateBuilder().WithDispositions(dispositions1).Build();
var digest2 = CreateBuilder().WithDispositions(dispositions2).Build();
// Assert - Should be same because dispositions are sorted by key
digest1.VerdictHash.Should().Be(digest2.VerdictHash);
}
[Fact]
public void Build_DifferentDispositions_ProducesDifferentVerdictHash()
{
// Arrange
var dispositions1 = new Dictionary<string, string> { ["CVE-2024-001"] = "fixed" };
var dispositions2 = new Dictionary<string, string> { ["CVE-2024-001"] = "affected" };
// Act
var digest1 = CreateBuilder().WithDispositions(dispositions1).Build();
var digest2 = CreateBuilder().WithDispositions(dispositions2).Build();
// Assert
digest1.VerdictHash.Should().NotBe(digest2.VerdictHash);
}
[Fact]
public void Build_SameEvidenceChunks_ProducesSameMerkleRoot()
{
// Arrange - valid SHA256 hex hashes (64 characters each)
var chunks = new List<string>
{
"sha256:a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1",
"sha256:b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2",
"sha256:c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3",
"sha256:d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4d4"
};
// Act
var digest1 = CreateBuilder().WithEvidenceChunks(chunks).Build();
var digest2 = CreateBuilder().WithEvidenceChunks(chunks).Build();
// Assert
digest1.ProofRoot.Should().Be(digest2.ProofRoot);
}
[Fact]
public void Build_DifferentEvidenceChunkOrder_ProducesDifferentMerkleRoot()
{
// Arrange - Merkle tree is order-sensitive (valid SHA256 hex hashes)
var chunks1 = new List<string>
{
"sha256:aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111",
"sha256:bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222"
};
var chunks2 = new List<string>
{
"sha256:bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222",
"sha256:aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111"
};
// Act
var digest1 = CreateBuilder().WithEvidenceChunks(chunks1).Build();
var digest2 = CreateBuilder().WithEvidenceChunks(chunks2).Build();
// Assert - Merkle tree preserves order, so roots should differ
digest1.ProofRoot.Should().NotBe(digest2.ProofRoot);
}
[Fact]
public void WithTrustScore_ComponentWeights_ProducesConsistentScore()
{
// Arrange - Using weighted formula: 25% reach + 20% sbom + 20% vex + 15% policy + 20% signer
// 100 * 0.25 + 100 * 0.20 + 100 * 0.20 + 100 * 0.15 + 100 * 0.20 = 100
// Act
var digest = CreateBuilder()
.WithTrustScore(
reachabilityScore: 100,
sbomCompletenessScore: 100,
vexCoverageScore: 100,
policyFreshnessScore: 100,
signerTrustScore: 100)
.Build();
// Assert
digest.TrustScore.Should().Be(100);
}
[Fact]
public void WithTrustScore_MixedScores_CalculatesCorrectWeight()
{
// Arrange - 80 * 0.25 + 60 * 0.20 + 70 * 0.20 + 50 * 0.15 + 90 * 0.20
// = 20 + 12 + 14 + 7.5 + 18 = 71.5 → 72
// Act
var digest = CreateBuilder()
.WithTrustScore(
reachabilityScore: 80,
sbomCompletenessScore: 60,
vexCoverageScore: 70,
policyFreshnessScore: 50,
signerTrustScore: 90)
.Build();
// Assert
digest.TrustScore.Should().Be(72);
}
[Fact]
public void WithDefaultTimestamps_UsesFrozenTime()
{
// Arrange
var frozenTime = new DateTimeOffset(2024, 12, 24, 12, 0, 0, TimeSpan.Zero);
var timeProvider = new FakeTimeProvider(frozenTime);
var builder = new DecisionDigestBuilder(_options, timeProvider);
// Act
var digest = builder
.WithVeriKey("sha256:verikey")
.WithVerdictHash("sha256:verdict")
.WithProofRoot("sha256:proof")
.WithReplaySeed(["feed1"], ["rule1"])
.WithTrustScore(85)
.WithDefaultTimestamps()
.Build();
// Assert
digest.CreatedAt.Should().Be(frozenTime);
digest.ExpiresAt.Should().Be(frozenTime.Add(_options.DefaultTtl));
}
[Fact]
public void Build_MultipleTimes_ReturnsConsistentDigest()
{
// Arrange
var dispositions = new Dictionary<string, string> { ["CVE-1"] = "fixed" };
var builder = CreateBuilder().WithDispositions(dispositions);
// Act - Build multiple times
var digests = Enumerable.Range(0, 100)
.Select(_ => builder.Build())
.Select(d => (d.VerdictHash, d.ProofRoot))
.Distinct()
.ToList();
// Assert - All should be identical
digests.Should().HaveCount(1);
}
[Fact]
public void Build_EmptyDispositions_ProducesConsistentHash()
{
// Arrange
var empty1 = new Dictionary<string, string>();
var empty2 = new Dictionary<string, string>();
// Act
var digest1 = CreateBuilder().WithDispositions(empty1).Build();
var digest2 = CreateBuilder().WithDispositions(empty2).Build();
// Assert
digest1.VerdictHash.Should().Be(digest2.VerdictHash);
digest1.VerdictHash.Should().StartWith("sha256:");
}
[Fact]
public void Build_EmptyEvidenceChunks_ProducesConsistentHash()
{
// Arrange
var empty1 = new List<string>();
var empty2 = Array.Empty<string>();
// Act
var digest1 = CreateBuilder().WithEvidenceChunks(empty1).Build();
var digest2 = CreateBuilder().WithEvidenceChunks(empty2).Build();
// Assert
digest1.ProofRoot.Should().Be(digest2.ProofRoot);
digest1.ProofRoot.Should().StartWith("sha256:");
}
[Fact]
public void Build_ReplaySeedPreservedCorrectly()
{
// Arrange
var feedIds = new[] { "cve-2024", "ghsa-2024" };
var ruleIds = new[] { "policy-v1", "exceptions" };
var frozenEpoch = new DateTimeOffset(2024, 12, 24, 0, 0, 0, TimeSpan.Zero);
// Act
var digest = CreateBuilder()
.WithReplaySeed(feedIds, ruleIds, frozenEpoch)
.Build();
// Assert
digest.ReplaySeed.FeedIds.Should().BeEquivalentTo(feedIds);
digest.ReplaySeed.RuleIds.Should().BeEquivalentTo(ruleIds);
digest.ReplaySeed.FrozenEpoch.Should().Be(frozenEpoch);
}
[Fact]
public void Build_MissingComponent_ThrowsInvalidOperationException()
{
// Arrange
var builder = new DecisionDigestBuilder(_options, _timeProvider)
.WithVeriKey("sha256:abc");
// Missing other required components
// Act
var act = () => builder.Build();
// Assert
act.Should().Throw<InvalidOperationException>()
.WithMessage("*missing required components*");
}
private DecisionDigestBuilder CreateBuilder()
{
return new DecisionDigestBuilder(_options, _timeProvider)
.WithVeriKey("sha256:testverikey")
.WithVerdictHash("sha256:defaultverdict")
.WithProofRoot("sha256:defaultproof")
.WithReplaySeed(["feed1"], ["rule1"])
.WithTrustScore(85)
.WithTimestamps(
_timeProvider.GetUtcNow(),
_timeProvider.GetUtcNow().AddHours(24));
}
/// <summary>
/// Minimal <see cref="TimeProvider"/> whose clock is pinned to a single instant,
/// making timestamp-dependent assertions fully deterministic.
/// </summary>
private sealed class FakeTimeProvider(DateTimeOffset frozenTime) : TimeProvider
{
    // Every query for "now" returns the same captured instant.
    public override DateTimeOffset GetUtcNow() => frozenTime;
}
}

View File

@@ -0,0 +1,478 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (C) 2025 StellaOps Contributors
using FluentAssertions;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Moq;
using StellaOps.Provcache.Api;
using System.Net;
using System.Net.Http.Json;
using System.Text.Json;
using Xunit;
namespace StellaOps.Provcache.Tests;
/// <summary>
/// API endpoint integration tests for Provcache endpoints.
/// </summary>
public sealed class ProvcacheApiTests : IAsyncDisposable
{
private readonly Mock<IProvcacheService> _mockService;
private readonly IHost _host;
private readonly HttpClient _client;
public ProvcacheApiTests()
{
_mockService = new Mock<IProvcacheService>();
_host = Host.CreateDefaultBuilder()
.ConfigureWebHost(webBuilder =>
{
webBuilder.UseTestServer()
.ConfigureServices(services =>
{
services.AddSingleton(_mockService.Object);
services.AddRouting();
services.AddLogging(b => b.SetMinimumLevel(LogLevel.Warning));
})
.Configure(app =>
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapProvcacheEndpoints();
});
});
})
.Build();
_host.Start();
_client = _host.GetTestClient();
}
public async ValueTask DisposeAsync()
{
_client.Dispose();
await _host.StopAsync();
_host.Dispose();
}
#region GET /v1/provcache/{veriKey}
[Fact]
public async Task GetByVeriKey_CacheHit_Returns200WithEntry()
{
// Arrange
const string veriKey = "sha256:abc123abc123abc123abc123abc123abc123abc123abc123abc123abc123abc1";
var entry = CreateTestEntry(veriKey);
var result = ProvcacheServiceResult.Hit(entry, "valkey", 1.5);
_mockService.Setup(s => s.GetAsync(veriKey, false, It.IsAny<CancellationToken>()))
.ReturnsAsync(result);
// Act
var response = await _client.GetAsync($"/v1/provcache/{Uri.EscapeDataString(veriKey)}");
// Assert
response.StatusCode.Should().Be(HttpStatusCode.OK);
var content = await response.Content.ReadFromJsonAsync<ProvcacheGetResponse>();
content.Should().NotBeNull();
content!.VeriKey.Should().Be(veriKey);
content.Status.Should().Be("hit");
content.Source.Should().Be("valkey");
}
[Fact]
public async Task GetByVeriKey_CacheMiss_Returns204()
{
// Arrange
const string veriKey = "sha256:miss123miss123miss123miss123miss123miss123miss123miss123miss123m";
var result = ProvcacheServiceResult.Miss(2.0);
_mockService.Setup(s => s.GetAsync(veriKey, false, It.IsAny<CancellationToken>()))
.ReturnsAsync(result);
// Act
var response = await _client.GetAsync($"/v1/provcache/{Uri.EscapeDataString(veriKey)}");
// Assert
response.StatusCode.Should().Be(HttpStatusCode.NoContent);
}
[Fact]
public async Task GetByVeriKey_Expired_Returns410Gone()
{
// Arrange
const string veriKey = "sha256:exp123exp123exp123exp123exp123exp123exp123exp123exp123exp123exp1";
var entry = CreateTestEntry(veriKey, expired: true);
var result = ProvcacheServiceResult.Expired(entry, 5.0);
_mockService.Setup(s => s.GetAsync(veriKey, false, It.IsAny<CancellationToken>()))
.ReturnsAsync(result);
// Act
var response = await _client.GetAsync($"/v1/provcache/{Uri.EscapeDataString(veriKey)}");
// Assert
response.StatusCode.Should().Be(HttpStatusCode.Gone);
}
[Fact]
public async Task GetByVeriKey_WithBypassCache_PassesFlagToService()
{
// Arrange
const string veriKey = "sha256:bypass123bypass123bypass123bypass123bypass123bypass123bypass1234";
var result = ProvcacheServiceResult.Miss(0.5);
_mockService.Setup(s => s.GetAsync(veriKey, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(result);
// Act
await _client.GetAsync($"/v1/provcache/{Uri.EscapeDataString(veriKey)}?bypassCache=true");
// Assert
_mockService.Verify(s => s.GetAsync(veriKey, true, It.IsAny<CancellationToken>()), Times.Once);
}
#endregion
#region POST /v1/provcache
[Fact]
public async Task CreateOrUpdate_ValidRequest_Returns201Created()
{
// Arrange
const string veriKey = "sha256:new123new123new123new123new123new123new123new123new123new123new1";
var entry = CreateTestEntry(veriKey);
_mockService.Setup(s => s.SetAsync(It.IsAny<ProvcacheEntry>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(true);
var request = new ProvcacheCreateRequest
{
Entry = entry
};
// Act
var response = await _client.PostAsJsonAsync("/v1/provcache", request);
// Assert
response.StatusCode.Should().Be(HttpStatusCode.Created);
var content = await response.Content.ReadFromJsonAsync<ProvcacheCreateResponse>();
content.Should().NotBeNull();
content!.VeriKey.Should().Be(veriKey);
content.Success.Should().BeTrue();
}
[Fact]
public async Task CreateOrUpdate_NullEntry_Returns400BadRequest()
{
// Arrange
var request = new ProvcacheCreateRequest { Entry = null };
// Act
var response = await _client.PostAsJsonAsync("/v1/provcache", request);
// Assert
response.StatusCode.Should().Be(HttpStatusCode.BadRequest);
}
#endregion
#region POST /v1/provcache/invalidate
[Fact]
public async Task Invalidate_SingleVeriKey_Returns200WithAffectedCount()
{
    // Arrange: service confirms the single key was invalidated.
    const string veriKey = "sha256:inv123inv123inv123inv123inv123inv123inv123inv123inv123inv123inv1";
    _mockService.Setup(s => s.InvalidateAsync(veriKey, It.IsAny<string?>(), It.IsAny<CancellationToken>()))
        .ReturnsAsync(true);
    var invalidateRequest = new ProvcacheInvalidateRequest
    {
        Type = null, // null means single VeriKey invalidation
        Value = veriKey,
        Reason = "Test invalidation"
    };

    // Act
    var httpResponse = await _client.PostAsJsonAsync("/v1/provcache/invalidate", invalidateRequest);

    // Assert: exactly one entry affected, categorized as a "verikey" invalidation.
    httpResponse.StatusCode.Should().Be(HttpStatusCode.OK);
    var body = await httpResponse.Content.ReadFromJsonAsync<ProvcacheInvalidateResponse>();
    body.Should().NotBeNull();
    body!.EntriesAffected.Should().Be(1);
    body.Type.Should().Be("verikey");
}
[Fact]
public async Task Invalidate_ByPolicyHash_Returns200WithBulkResult()
{
    // Arrange: a bulk invalidation keyed by policy hash affecting five entries.
    const string policyHash = "sha256:policyhash123policyhash123policyhash123policyhash123policyhash";
    var bulkRequest = InvalidationRequest.ByPolicyHash(policyHash, "Policy updated");
    var bulkResult = new InvalidationResult
    {
        EntriesAffected = 5,
        Request = bulkRequest,
        Timestamp = DateTimeOffset.UtcNow
    };
    _mockService.Setup(s => s.InvalidateByAsync(
            It.Is<InvalidationRequest>(r => r.Type == InvalidationType.PolicyHash),
            It.IsAny<CancellationToken>()))
        .ReturnsAsync(bulkResult);

    var apiRequest = new ProvcacheInvalidateRequest
    {
        Type = InvalidationType.PolicyHash,
        Value = policyHash,
        Reason = "Policy updated"
    };

    // Act
    var httpResponse = await _client.PostAsJsonAsync("/v1/provcache/invalidate", apiRequest);

    // Assert: the bulk count from the service is surfaced in the response body.
    httpResponse.StatusCode.Should().Be(HttpStatusCode.OK);
    var body = await httpResponse.Content.ReadFromJsonAsync<ProvcacheInvalidateResponse>();
    body.Should().NotBeNull();
    body!.EntriesAffected.Should().Be(5);
}
[Fact]
public async Task Invalidate_ByPattern_Returns200WithPatternResult()
{
    // Arrange: a wildcard pattern invalidation affecting ten entries.
    const string pattern = "sha256:test*";
    var bulkRequest = InvalidationRequest.ByPattern(pattern, "Cleanup");
    var bulkResult = new InvalidationResult
    {
        EntriesAffected = 10,
        Request = bulkRequest,
        Timestamp = DateTimeOffset.UtcNow
    };
    _mockService.Setup(s => s.InvalidateByAsync(
            It.Is<InvalidationRequest>(r => r.Type == InvalidationType.Pattern),
            It.IsAny<CancellationToken>()))
        .ReturnsAsync(bulkResult);

    var apiRequest = new ProvcacheInvalidateRequest
    {
        Type = InvalidationType.Pattern,
        Value = pattern,
        Reason = "Cleanup"
    };

    // Act
    var httpResponse = await _client.PostAsJsonAsync("/v1/provcache/invalidate", apiRequest);

    // Assert: the pattern-match count is surfaced in the response body.
    httpResponse.StatusCode.Should().Be(HttpStatusCode.OK);
    var body = await httpResponse.Content.ReadFromJsonAsync<ProvcacheInvalidateResponse>();
    body.Should().NotBeNull();
    body!.EntriesAffected.Should().Be(10);
}
#endregion
#region GET /v1/provcache/metrics
[Fact]
public async Task GetMetrics_Returns200WithMetrics()
{
    // Arrange: service exposes a snapshot with an 80% hit rate (800 / 1000).
    var serviceMetrics = new ProvcacheMetrics
    {
        TotalRequests = 1000,
        TotalHits = 800,
        TotalMisses = 200,
        TotalInvalidations = 50,
        CurrentEntryCount = 500,
        AvgLatencyMs = 2.5,
        P99LatencyMs = 10.0,
        ValkeyCacheHealthy = true,
        PostgresRepositoryHealthy = true,
        CollectedAt = DateTimeOffset.UtcNow
    };
    _mockService.Setup(s => s.GetMetricsAsync(It.IsAny<CancellationToken>()))
        .ReturnsAsync(serviceMetrics);

    // Act
    var httpResponse = await _client.GetAsync("/v1/provcache/metrics");

    // Assert: counters pass through and the derived hit rate is computed.
    httpResponse.StatusCode.Should().Be(HttpStatusCode.OK);
    var body = await httpResponse.Content.ReadFromJsonAsync<ProvcacheMetricsResponse>();
    body.Should().NotBeNull();
    body!.TotalRequests.Should().Be(1000);
    body.TotalHits.Should().Be(800);
    body.HitRate.Should().BeApproximately(0.8, 0.01);
}
#endregion
#region Contract Verification Tests
/// <summary>
/// Verifies the GET response JSON contract: the lookup payload must expose
/// veriKey, status, source, elapsedMs and entry fields.
/// </summary>
[Fact]
public async Task GetByVeriKey_ResponseContract_HasRequiredFields()
{
    // Arrange: a cache hit so the response carries a populated entry.
    const string veriKey = "sha256:contract123contract123contract123contract123contract123contract";
    var entry = CreateTestEntry(veriKey);
    var result = ProvcacheServiceResult.Hit(entry, "valkey", 1.0);
    _mockService.Setup(s => s.GetAsync(veriKey, false, It.IsAny<CancellationToken>()))
        .ReturnsAsync(result);

    // Act
    var response = await _client.GetAsync($"/v1/provcache/{Uri.EscapeDataString(veriKey)}");
    var json = await response.Content.ReadAsStringAsync();

    // JsonDocument rents pooled buffers and is IDisposable — dispose deterministically.
    using var doc = JsonDocument.Parse(json);
    var root = doc.RootElement;

    // Assert - Verify contract structure
    root.TryGetProperty("veriKey", out _).Should().BeTrue("Response must have 'veriKey' field");
    root.TryGetProperty("status", out _).Should().BeTrue("Response must have 'status' field");
    root.TryGetProperty("source", out _).Should().BeTrue("Response must have 'source' field");
    root.TryGetProperty("elapsedMs", out _).Should().BeTrue("Response must have 'elapsedMs' field");
    root.TryGetProperty("entry", out _).Should().BeTrue("Response must have 'entry' field");
}
/// <summary>
/// Verifies the POST /v1/provcache response JSON contract: the create payload
/// must expose veriKey, success and expiresAt fields.
/// </summary>
[Fact]
public async Task CreateOrUpdate_ResponseContract_HasRequiredFields()
{
    // Arrange
    const string veriKey = "sha256:contractcreate123contractcreate123contractcreate123contractcre";
    var entry = CreateTestEntry(veriKey);
    _mockService.Setup(s => s.SetAsync(It.IsAny<ProvcacheEntry>(), It.IsAny<CancellationToken>()))
        .ReturnsAsync(true);
    var request = new ProvcacheCreateRequest { Entry = entry };

    // Act
    var response = await _client.PostAsJsonAsync("/v1/provcache", request);
    var json = await response.Content.ReadAsStringAsync();

    // JsonDocument rents pooled buffers and is IDisposable — dispose deterministically.
    using var doc = JsonDocument.Parse(json);
    var root = doc.RootElement;

    // Assert - Verify contract structure
    root.TryGetProperty("veriKey", out _).Should().BeTrue("Response must have 'veriKey' field");
    root.TryGetProperty("success", out _).Should().BeTrue("Response must have 'success' field");
    root.TryGetProperty("expiresAt", out _).Should().BeTrue("Response must have 'expiresAt' field");
}
/// <summary>
/// Verifies the invalidate response JSON contract: the payload must expose
/// entriesAffected, type and value fields.
/// </summary>
[Fact]
public async Task InvalidateResponse_Contract_HasRequiredFields()
{
    // Arrange
    const string veriKey = "sha256:contractinv123contractinv123contractinv123contractinv123contra";
    _mockService.Setup(s => s.InvalidateAsync(veriKey, It.IsAny<string?>(), It.IsAny<CancellationToken>()))
        .ReturnsAsync(true);
    var request = new ProvcacheInvalidateRequest
    {
        Type = null,
        Value = veriKey
    };

    // Act
    var response = await _client.PostAsJsonAsync("/v1/provcache/invalidate", request);
    var json = await response.Content.ReadAsStringAsync();

    // JsonDocument rents pooled buffers and is IDisposable — dispose deterministically.
    using var doc = JsonDocument.Parse(json);
    var root = doc.RootElement;

    // Assert - Verify contract structure
    root.TryGetProperty("entriesAffected", out _).Should().BeTrue("Response must have 'entriesAffected' field");
    root.TryGetProperty("type", out _).Should().BeTrue("Response must have 'type' field");
    root.TryGetProperty("value", out _).Should().BeTrue("Response must have 'value' field");
}
/// <summary>
/// Verifies the metrics response JSON contract: the payload must expose
/// totalRequests, totalHits, totalMisses, hitRate and currentEntryCount fields.
/// </summary>
[Fact]
public async Task MetricsResponse_Contract_HasRequiredFields()
{
    // Arrange
    var metrics = new ProvcacheMetrics
    {
        TotalRequests = 100,
        TotalHits = 80,
        TotalMisses = 20,
        TotalInvalidations = 5,
        CurrentEntryCount = 50,
        AvgLatencyMs = 1.0,
        P99LatencyMs = 5.0,
        ValkeyCacheHealthy = true,
        PostgresRepositoryHealthy = true,
        CollectedAt = DateTimeOffset.UtcNow
    };
    _mockService.Setup(s => s.GetMetricsAsync(It.IsAny<CancellationToken>()))
        .ReturnsAsync(metrics);

    // Act
    var response = await _client.GetAsync("/v1/provcache/metrics");
    var json = await response.Content.ReadAsStringAsync();

    // JsonDocument rents pooled buffers and is IDisposable — dispose deterministically.
    using var doc = JsonDocument.Parse(json);
    var root = doc.RootElement;

    // Assert - Verify contract structure
    root.TryGetProperty("totalRequests", out _).Should().BeTrue("Response must have 'totalRequests' field");
    root.TryGetProperty("totalHits", out _).Should().BeTrue("Response must have 'totalHits' field");
    root.TryGetProperty("totalMisses", out _).Should().BeTrue("Response must have 'totalMisses' field");
    root.TryGetProperty("hitRate", out _).Should().BeTrue("Response must have 'hitRate' field");
    root.TryGetProperty("currentEntryCount", out _).Should().BeTrue("Response must have 'currentEntryCount' field");
}
#endregion
#region Test Helpers
/// <summary>
/// Builds a minimal valid cache entry for the given key. When <paramref name="expired"/>
/// is set, the expiry is placed one minute in the past.
/// </summary>
private static ProvcacheEntry CreateTestEntry(string veriKey, bool expired = false)
{
    var now = DateTimeOffset.UtcNow;
    var expiry = expired ? now.AddMinutes(-1) : now.AddHours(1);
    return new ProvcacheEntry
    {
        VeriKey = veriKey,
        Decision = CreateTestDecisionDigest(veriKey),
        PolicyHash = "sha256:policy123policy123policy123policy123policy123policy123policy1234",
        SignerSetHash = "sha256:signer123signer123signer123signer123signer123signer123signer12",
        FeedEpoch = "2025-W01",
        CreatedAt = now.AddMinutes(-5),
        ExpiresAt = expiry,
        HitCount = 0
    };
}
/// <summary>Builds a one-hour-valid decision digest bound to the given VeriKey.</summary>
private static DecisionDigest CreateTestDecisionDigest(string veriKey)
{
    return new DecisionDigest
    {
        DigestVersion = "v1",
        VeriKey = veriKey,
        VerdictHash = "sha256:verdict123verdict123verdict123verdict123verdict123verdict12345",
        ProofRoot = "sha256:proof123proof123proof123proof123proof123proof123proof1234567",
        TrustScore = 85,
        ReplaySeed = new ReplaySeed
        {
            FeedIds = ["cve-nvd", "ghsa-2024"],
            RuleIds = ["base-policy"]
        },
        CreatedAt = DateTimeOffset.UtcNow,
        ExpiresAt = DateTimeOffset.UtcNow.AddHours(1)
    };
}
#endregion
}

View File

@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="utf-8"?>
<!-- Test project for StellaOps.Provcache: xUnit tests with Moq mocks and
     FluentAssertions, plus ASP.NET Core TestHost for endpoint tests. -->
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net10.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
    <LangVersion>preview</LangVersion>
    <IsPackable>false</IsPackable>
    <RootNamespace>StellaOps.Provcache.Tests</RootNamespace>
  </PropertyGroup>
  <!-- Test framework, mocking, assertion and coverage packages. -->
  <ItemGroup>
    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.0" />
    <PackageReference Include="xunit" Version="2.9.3" />
    <PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
      <PrivateAssets>all</PrivateAssets>
    </PackageReference>
    <PackageReference Include="FluentAssertions" Version="8.2.0" />
    <PackageReference Include="Moq" Version="4.20.72" />
    <PackageReference Include="coverlet.collector" Version="6.0.2">
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
      <PrivateAssets>all</PrivateAssets>
    </PackageReference>
    <PackageReference Include="Microsoft.AspNetCore.TestHost" Version="10.0.0" />
  </ItemGroup>
  <!-- Shared framework reference needed by the in-memory TestHost. -->
  <ItemGroup>
    <FrameworkReference Include="Microsoft.AspNetCore.App" />
  </ItemGroup>
  <!-- Systems under test. -->
  <ItemGroup>
    <ProjectReference Include="../../StellaOps.Provcache/StellaOps.Provcache.csproj" />
    <ProjectReference Include="../../StellaOps.Provcache.Api/StellaOps.Provcache.Api.csproj" />
  </ItemGroup>
</Project>

View File

@@ -0,0 +1,459 @@
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;
namespace StellaOps.Provcache.Tests;
/// <summary>
/// Integration tests for the write-behind queue.
/// Tests batching, retry configuration plumbing, and metrics counters.
/// </summary>
public class WriteBehindQueueTests
{
    // Small batch size and short flush interval keep the batching test fast.
    private readonly ProvcacheOptions _options = new()
    {
        EnableWriteBehind = true,
        WriteBehindMaxBatchSize = 5,
        WriteBehindFlushInterval = TimeSpan.FromMilliseconds(100),
        WriteBehindQueueCapacity = 100,
        WriteBehindMaxRetries = 3
    };

    /// <summary>Creates a queue wired to the given repository mock with the test options.</summary>
    private WriteBehindQueue CreateQueue(Mock<IProvcacheRepository> repository) =>
        new WriteBehindQueue(
            repository.Object,
            Options.Create(_options),
            NullLogger<WriteBehindQueue>.Instance);

    /// <summary>Builds a minimal valid entry whose hashes embed <paramref name="id"/>.</summary>
    private static ProvcacheEntry CreateTestEntry(string id) => new()
    {
        VeriKey = $"sha256:verikey_{id}",
        Decision = new DecisionDigest
        {
            DigestVersion = "v1",
            VeriKey = $"sha256:verikey_{id}",
            VerdictHash = $"sha256:verdict_{id}",
            ProofRoot = $"sha256:proof_{id}",
            TrustScore = 85,
            ReplaySeed = new ReplaySeed
            {
                FeedIds = ["feed1"],
                RuleIds = ["rule1"]
            },
            CreatedAt = DateTimeOffset.UtcNow,
            ExpiresAt = DateTimeOffset.UtcNow.AddHours(24)
        },
        PolicyHash = "sha256:policy",
        SignerSetHash = "sha256:signers",
        FeedEpoch = "2024-12-24T12:00:00Z",
        CreatedAt = DateTimeOffset.UtcNow,
        ExpiresAt = DateTimeOffset.UtcNow.AddHours(24),
        HitCount = 0
    };

    [Fact]
    public async Task EnqueueAsync_SingleEntry_UpdatesMetrics()
    {
        // Arrange
        var repository = new Mock<IProvcacheRepository>();
        var queue = CreateQueue(repository);
        var entry = CreateTestEntry("1");

        // Act
        await queue.EnqueueAsync(entry);

        // Assert: one item counted and still sitting in the (unstarted) queue.
        var metrics = queue.GetMetrics();
        metrics.TotalEnqueued.Should().Be(1);
        metrics.CurrentQueueDepth.Should().Be(1);
    }

    [Fact]
    public async Task EnqueueAsync_MultipleEntries_TracksQueueDepth()
    {
        // Arrange
        var repository = new Mock<IProvcacheRepository>();
        var queue = CreateQueue(repository);

        // Act
        for (int i = 0; i < 10; i++)
        {
            await queue.EnqueueAsync(CreateTestEntry(i.ToString()));
        }

        // Assert: depth tracks every enqueued entry while the queue is not running.
        var metrics = queue.GetMetrics();
        metrics.TotalEnqueued.Should().Be(10);
        metrics.CurrentQueueDepth.Should().Be(10);
    }

    [Fact]
    public void GetMetrics_InitialState_AllZeros()
    {
        // Arrange
        var repository = new Mock<IProvcacheRepository>();
        var queue = CreateQueue(repository);

        // Act
        var metrics = queue.GetMetrics();

        // Assert: a fresh queue reports zero for every counter.
        metrics.TotalEnqueued.Should().Be(0);
        metrics.TotalPersisted.Should().Be(0);
        metrics.TotalFailed.Should().Be(0);
        metrics.TotalRetries.Should().Be(0);
        metrics.TotalBatches.Should().Be(0);
        metrics.CurrentQueueDepth.Should().Be(0);
    }

    [Fact]
    public async Task ProcessBatch_SuccessfulPersist_UpdatesPersistMetrics()
    {
        // Arrange: repository accepts every batch write.
        var repository = new Mock<IProvcacheRepository>();
        repository.Setup(r => r.UpsertManyAsync(It.IsAny<IEnumerable<ProvcacheEntry>>(), It.IsAny<CancellationToken>()))
            .Returns(Task.CompletedTask);
        var queue = CreateQueue(repository);

        // Enqueue entries before starting so the first flush sees a full batch.
        for (int i = 0; i < 5; i++)
        {
            await queue.EnqueueAsync(CreateTestEntry(i.ToString()));
        }

        // Act - Start the queue and let it process.
        // BUGFIX: StartAsync was previously assigned to an unused local and never
        // awaited, leaving the task unobserved; await it so startup faults surface.
        using var cts = new CancellationTokenSource();
        await queue.StartAsync(cts.Token);

        // Wait for at least one flush interval (100 ms) to elapse.
        await Task.Delay(500);

        // Stop
        await queue.StopAsync(CancellationToken.None);

        // Assert
        var metrics = queue.GetMetrics();
        metrics.TotalPersisted.Should().BeGreaterThanOrEqualTo(5);
        metrics.TotalBatches.Should().BeGreaterThanOrEqualTo(1);
    }

    [Fact]
    public void WriteBehindMetrics_Timestamp_IsRecent()
    {
        // Arrange
        var repository = new Mock<IProvcacheRepository>();
        var queue = CreateQueue(repository);

        // Act
        var metrics = queue.GetMetrics();

        // Assert: the snapshot timestamp is taken at collection time.
        metrics.Timestamp.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromSeconds(5));
    }
}
/// <summary>
/// Integration tests for the Provcache service with the two-layer storage
/// (Valkey cache + Postgres repository), using mocks for both layers.
/// </summary>
public class ProvcacheServiceStorageIntegrationTests
{
    private readonly ProvcacheOptions _options = new()
    {
        DefaultTtl = TimeSpan.FromHours(24),
        MaxTtl = TimeSpan.FromDays(7),
        TimeWindowBucket = TimeSpan.FromHours(1),
        EnableWriteBehind = false, // Disable write-behind for sync tests
        AllowCacheBypass = true
    };

    /// <summary>Creates a service wired to the given store/repository mocks.</summary>
    private ProvcacheService CreateService(Mock<IProvcacheStore> store, Mock<IProvcacheRepository> repository) =>
        new ProvcacheService(
            store.Object,
            repository.Object,
            Options.Create(_options),
            NullLogger<ProvcacheService>.Instance);

    /// <summary>Builds a minimal valid entry for the given VeriKey, valid for 24h.</summary>
    private static ProvcacheEntry CreateTestEntry(string veriKey) => new()
    {
        VeriKey = veriKey,
        Decision = new DecisionDigest
        {
            DigestVersion = "v1",
            VeriKey = veriKey,
            VerdictHash = "sha256:verdict123",
            ProofRoot = "sha256:proof456",
            TrustScore = 90,
            ReplaySeed = new ReplaySeed
            {
                FeedIds = ["nvd:2024"],
                RuleIds = ["rule:cve-critical"]
            },
            CreatedAt = DateTimeOffset.UtcNow,
            ExpiresAt = DateTimeOffset.UtcNow.AddHours(24)
        },
        PolicyHash = "sha256:policy789",
        SignerSetHash = "sha256:signers000",
        FeedEpoch = "2024-12-24T00:00:00Z",
        CreatedAt = DateTimeOffset.UtcNow,
        ExpiresAt = DateTimeOffset.UtcNow.AddHours(24),
        HitCount = 0
    };

    [Fact]
    public async Task SetAsync_ThenGetAsync_ReturnsEntry()
    {
        // Arrange: cache layer reports a hit after the write.
        var veriKey = "sha256:test_verikey_1";
        var entry = CreateTestEntry(veriKey);
        var store = new Mock<IProvcacheStore>();
        store.Setup(s => s.SetAsync(It.IsAny<ProvcacheEntry>(), It.IsAny<CancellationToken>()))
            .Returns(ValueTask.CompletedTask);
        store.Setup(s => s.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(new ProvcacheLookupResult { IsHit = true, Entry = entry, Source = "valkey" });
        var repository = new Mock<IProvcacheRepository>();
        repository.Setup(r => r.UpsertAsync(It.IsAny<ProvcacheEntry>(), It.IsAny<CancellationToken>()))
            .Returns(Task.CompletedTask);
        var service = CreateService(store, repository);

        // Act
        await service.SetAsync(entry);
        var result = await service.GetAsync(veriKey);

        // Assert: the hit is served from the cache layer.
        result.WasCached.Should().BeTrue();
        result.Entry.Should().NotBeNull();
        result.Entry!.VeriKey.Should().Be(veriKey);
        result.Source.Should().Be("valkey");
    }

    [Fact]
    public async Task GetAsync_CacheMissWithDbHit_BackfillsCache()
    {
        // Arrange: cache misses but the repository has the entry.
        var veriKey = "sha256:test_verikey_2";
        var entry = CreateTestEntry(veriKey);
        var store = new Mock<IProvcacheStore>();
        store.Setup(s => s.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(new ProvcacheLookupResult { IsHit = false });
        store.Setup(s => s.SetAsync(It.IsAny<ProvcacheEntry>(), It.IsAny<CancellationToken>()))
            .Returns(ValueTask.CompletedTask);
        var repository = new Mock<IProvcacheRepository>();
        repository.Setup(r => r.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(entry);
        var service = CreateService(store, repository);

        // Act
        var result = await service.GetAsync(veriKey);

        // Assert: served from postgres and written back into the cache once.
        result.WasCached.Should().BeTrue();
        result.Entry.Should().NotBeNull();
        result.Source.Should().Be("postgres");
        store.Verify(s => s.SetAsync(It.Is<ProvcacheEntry>(e => e.VeriKey == veriKey), It.IsAny<CancellationToken>()), Times.Once);
    }

    [Fact]
    public async Task GetAsync_FullMiss_ReturnsMissResult()
    {
        // Arrange: neither cache nor repository has the key.
        var veriKey = "sha256:test_verikey_3";
        var store = new Mock<IProvcacheStore>();
        store.Setup(s => s.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(new ProvcacheLookupResult { IsHit = false });
        var repository = new Mock<IProvcacheRepository>();
        repository.Setup(r => r.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync((ProvcacheEntry?)null);
        var service = CreateService(store, repository);

        // Act
        var result = await service.GetAsync(veriKey);

        // Assert
        result.WasCached.Should().BeFalse();
        result.Status.Should().Be(ProvcacheResultStatus.CacheMiss);
        result.Entry.Should().BeNull();
    }

    [Fact]
    public async Task GetOrComputeAsync_CacheHit_DoesNotCallFactory()
    {
        // Arrange: cache hit means the factory must never run.
        var veriKey = "sha256:test_verikey_4";
        var entry = CreateTestEntry(veriKey);
        var factoryCalled = false;
        var store = new Mock<IProvcacheStore>();
        store.Setup(s => s.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(new ProvcacheLookupResult { IsHit = true, Entry = entry, Source = "valkey" });
        var repository = new Mock<IProvcacheRepository>();
        var service = CreateService(store, repository);

        // Act
        // FIX: use a synchronous factory returning Task.FromResult instead of an
        // 'async' lambda with no await (compiler warning CS1998).
        var result = await service.GetOrComputeAsync(veriKey, _ =>
        {
            factoryCalled = true;
            return Task.FromResult(entry);
        });

        // Assert
        factoryCalled.Should().BeFalse();
        result.VeriKey.Should().Be(veriKey);
    }

    [Fact]
    public async Task GetOrComputeAsync_CacheMiss_CallsFactoryAndStores()
    {
        // Arrange: full miss forces the factory to run and the result to be stored.
        var veriKey = "sha256:test_verikey_5";
        var entry = CreateTestEntry(veriKey);
        var factoryCalled = false;
        var store = new Mock<IProvcacheStore>();
        store.Setup(s => s.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(new ProvcacheLookupResult { IsHit = false });
        store.Setup(s => s.SetAsync(It.IsAny<ProvcacheEntry>(), It.IsAny<CancellationToken>()))
            .Returns(ValueTask.CompletedTask);
        var repository = new Mock<IProvcacheRepository>();
        repository.Setup(r => r.GetAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync((ProvcacheEntry?)null);
        repository.Setup(r => r.UpsertAsync(It.IsAny<ProvcacheEntry>(), It.IsAny<CancellationToken>()))
            .Returns(Task.CompletedTask);
        var service = CreateService(store, repository);

        // Act
        // FIX: synchronous factory instead of an 'async' lambda with no await (CS1998).
        var result = await service.GetOrComputeAsync(veriKey, _ =>
        {
            factoryCalled = true;
            return Task.FromResult(entry);
        });

        // Assert
        factoryCalled.Should().BeTrue();
        result.VeriKey.Should().Be(veriKey);
        store.Verify(s => s.SetAsync(It.Is<ProvcacheEntry>(e => e.VeriKey == veriKey), It.IsAny<CancellationToken>()), Times.Once);
    }

    [Fact]
    public async Task InvalidateAsync_RemovesFromBothStoreLayers()
    {
        // Arrange: both layers acknowledge the removal.
        var veriKey = "sha256:test_verikey_6";
        var store = new Mock<IProvcacheStore>();
        store.Setup(s => s.InvalidateAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(true);
        var repository = new Mock<IProvcacheRepository>();
        repository.Setup(r => r.DeleteAsync(veriKey, It.IsAny<CancellationToken>()))
            .ReturnsAsync(true);
        var service = CreateService(store, repository);

        // Act
        var result = await service.InvalidateAsync(veriKey, "test invalidation");

        // Assert
        result.Should().BeTrue();
        store.Verify(s => s.InvalidateAsync(veriKey, It.IsAny<CancellationToken>()), Times.Once);
        repository.Verify(r => r.DeleteAsync(veriKey, It.IsAny<CancellationToken>()), Times.Once);
    }

    [Fact]
    public async Task GetAsync_BypassCache_ReturnsBypassedResult()
    {
        // Arrange: no setups needed — with bypassCache the store must not be touched.
        var veriKey = "sha256:test_verikey_7";
        var store = new Mock<IProvcacheStore>();
        var repository = new Mock<IProvcacheRepository>();
        var service = CreateService(store, repository);

        // Act
        var result = await service.GetAsync(veriKey, bypassCache: true);

        // Assert
        result.Status.Should().Be(ProvcacheResultStatus.Bypassed);
        store.Verify(s => s.GetAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Never);
    }

    [Fact]
    public async Task GetMetricsAsync_ReturnsCurrentMetrics()
    {
        // Arrange: everything misses; statistics come back empty.
        var store = new Mock<IProvcacheStore>();
        store.Setup(s => s.GetAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()))
            .ReturnsAsync(new ProvcacheLookupResult { IsHit = false });
        var repository = new Mock<IProvcacheRepository>();
        repository.Setup(r => r.GetAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()))
            .ReturnsAsync((ProvcacheEntry?)null);
        repository.Setup(r => r.GetStatisticsAsync(It.IsAny<CancellationToken>()))
            .ReturnsAsync(new ProvcacheStatistics { TotalEntries = 0, TotalHits = 0 });
        var service = CreateService(store, repository);

        // Generate some traffic
        await service.GetAsync("sha256:miss1");
        await service.GetAsync("sha256:miss2");

        // Act
        var metrics = await service.GetMetricsAsync();

        // Assert: both lookups above were counted.
        metrics.TotalRequests.Should().BeGreaterThanOrEqualTo(2);
    }
}

View File

@@ -0,0 +1,244 @@
using FluentAssertions;
using Xunit;
namespace StellaOps.Provcache.Tests;
/// <summary>
/// Determinism tests for VeriKeyBuilder.
/// Verifies that identical inputs always produce the identical VeriKey, and that
/// changing any single component produces a different one.
/// </summary>
public class VeriKeyBuilderDeterminismTests
{
    private readonly ProvcacheOptions _options = new()
    {
        TimeWindowBucket = TimeSpan.FromHours(1)
    };

    [Fact]
    public void Build_SameInputs_ProducesSameVeriKey()
    {
        // Arrange: one fixed set of component hashes.
        var source = "sha256:abc123def456";
        var sbom = "sha256:sbom111222333";
        var vex = new[] { "sha256:vex111", "sha256:vex222", "sha256:vex333" };
        var policy = "sha256:policy999";
        var signers = new[] { "sha256:cert111", "sha256:cert222" };
        var window = "2024-12-24T12:00:00Z";

        // Act: build twice from two independent builders.
        var first = new VeriKeyBuilder(_options)
            .WithSourceHash(source)
            .WithSbomHash(sbom)
            .WithVexStatementHashes(vex)
            .WithMergePolicyHash(policy)
            .WithCertificateHashes(signers)
            .WithTimeWindow(window)
            .Build();
        var second = new VeriKeyBuilder(_options)
            .WithSourceHash(source)
            .WithSbomHash(sbom)
            .WithVexStatementHashes(vex)
            .WithMergePolicyHash(policy)
            .WithCertificateHashes(signers)
            .WithTimeWindow(window)
            .Build();

        // Assert
        first.Should().Be(second);
        first.Should().StartWith("sha256:");
    }

    [Fact]
    public void Build_DifferentInputOrder_VexHashes_ProducesSameVeriKey()
    {
        // Arrange - the same VEX hashes, presented in two different orders.
        var shuffled = new[] { "sha256:vex333", "sha256:vex111", "sha256:vex222" };
        var ordered = new[] { "sha256:vex111", "sha256:vex222", "sha256:vex333" };

        // Act
        var keyA = CreateBuilder().WithVexStatementHashes(shuffled).Build();
        var keyB = CreateBuilder().WithVexStatementHashes(ordered).Build();

        // Assert - input order must not matter because hashes are sorted.
        keyA.Should().Be(keyB);
    }

    [Fact]
    public void Build_DifferentInputOrder_CertificateHashes_ProducesSameVeriKey()
    {
        // Arrange - the same certificate hashes in two different orders.
        var reversed = new[] { "sha256:cert222", "sha256:cert111" };
        var ordered = new[] { "sha256:cert111", "sha256:cert222" };

        // Act
        var keyA = CreateBuilder().WithCertificateHashes(reversed).Build();
        var keyB = CreateBuilder().WithCertificateHashes(ordered).Build();

        // Assert - input order must not matter because hashes are sorted.
        keyA.Should().Be(keyB);
    }

    [Fact]
    public void Build_DifferentSourceHash_ProducesDifferentVeriKey()
    {
        var keyA = CreateBuilder().WithSourceHash("sha256:source111").Build();
        var keyB = CreateBuilder().WithSourceHash("sha256:source222").Build();

        keyA.Should().NotBe(keyB);
    }

    [Fact]
    public void Build_DifferentSbomHash_ProducesDifferentVeriKey()
    {
        var keyA = CreateBuilder().WithSbomHash("sha256:sbom111").Build();
        var keyB = CreateBuilder().WithSbomHash("sha256:sbom222").Build();

        keyA.Should().NotBe(keyB);
    }

    [Fact]
    public void Build_DifferentTimeWindow_ProducesDifferentVeriKey()
    {
        var keyA = CreateBuilder().WithTimeWindow("2024-12-24T12:00:00Z").Build();
        var keyB = CreateBuilder().WithTimeWindow("2024-12-24T13:00:00Z").Build();

        keyA.Should().NotBe(keyB);
    }

    [Fact]
    public void Build_MultipleTimes_ReturnsConsistentResult()
    {
        // Act - 100 fresh builders with identical inputs.
        var distinctKeys = Enumerable.Range(0, 100)
            .Select(_ => CreateBuilder().Build())
            .Distinct()
            .ToList();

        // Assert - every build collapses to a single identical key.
        distinctKeys.Should().HaveCount(1);
    }

    [Fact]
    public void Build_AcrossMultipleBuilders_ProducesSameResult()
    {
        // Act - 10 independent builder instances.
        var distinctKeys = Enumerable.Range(0, 10)
            .Select(_ => CreateBuilder().Build())
            .Distinct()
            .ToList();

        // Assert - all builders agree on the key.
        distinctKeys.Should().HaveCount(1);
    }

    [Fact]
    public void Build_WithHashPrefixNormalization_ProducesSameVeriKey()
    {
        // Arrange/Act - identical source hash except for the case of its prefix.
        var upperPrefixKey = new VeriKeyBuilder(_options)
            .WithSourceHash("SHA256:ABC123")
            .WithSbomHash("sha256:def456")
            .WithVexStatementHashes(["sha256:vex1"])
            .WithMergePolicyHash("sha256:policy")
            .WithCertificateHashes(["sha256:cert"])
            .WithTimeWindow("2024-12-24T12:00:00Z")
            .Build();
        var lowerPrefixKey = new VeriKeyBuilder(_options)
            .WithSourceHash("sha256:abc123")
            .WithSbomHash("sha256:def456")
            .WithVexStatementHashes(["sha256:vex1"])
            .WithMergePolicyHash("sha256:policy")
            .WithCertificateHashes(["sha256:cert"])
            .WithTimeWindow("2024-12-24T12:00:00Z")
            .Build();

        // Assert - prefix casing is normalized away.
        upperPrefixKey.Should().Be(lowerPrefixKey);
    }

    [Fact]
    public void WithTimeWindow_Timestamp_BucketsDeterministically()
    {
        // Arrange - two timestamps inside the 12:00 hour bucket, one in the 13:00 bucket.
        var insideBucket = new DateTimeOffset(2024, 12, 24, 12, 30, 0, TimeSpan.Zero);
        var alsoInsideBucket = new DateTimeOffset(2024, 12, 24, 12, 45, 0, TimeSpan.Zero);
        var nextBucket = new DateTimeOffset(2024, 12, 24, 13, 15, 0, TimeSpan.Zero);

        // Act
        var keyA = CreateBuilder().WithTimeWindow(insideBucket).Build();
        var keyB = CreateBuilder().WithTimeWindow(alsoInsideBucket).Build();
        var keyC = CreateBuilder().WithTimeWindow(nextBucket).Build();

        // Assert - same bucket => same key; next bucket => different key.
        keyA.Should().Be(keyB);
        keyA.Should().NotBe(keyC);
    }

    [Fact]
    public void BuildWithComponents_ReturnsSameVeriKeyAsIndividualComponents()
    {
        // Act - compare plain Build against the component-expanded variant.
        var plainKey = CreateBuilder().Build();
        var expanded = CreateBuilder().BuildWithComponents();

        // Assert
        expanded.VeriKey.Should().Be(plainKey);
        expanded.SourceHash.Should().StartWith("sha256:");
        expanded.SbomHash.Should().StartWith("sha256:");
    }

    [Fact]
    public void Build_EmptyVexSet_ProducesConsistentHash()
    {
        // Arrange - two different empty collection representations.
        var emptyArray = Array.Empty<string>();
        var emptyList = new List<string>();

        // Act
        var keyA = CreateBuilder().WithVexStatementHashes(emptyArray).Build();
        var keyB = CreateBuilder().WithVexStatementHashes(emptyList).Build();

        // Assert - the concrete collection type must not influence the key.
        keyA.Should().Be(keyB);
    }

    [Fact]
    public void Build_MissingComponent_ThrowsInvalidOperationException()
    {
        // Arrange - only the source hash is supplied; the rest are missing.
        var incomplete = new VeriKeyBuilder(_options)
            .WithSourceHash("sha256:abc123");

        // Act
        var act = () => incomplete.Build();

        // Assert
        act.Should().Throw<InvalidOperationException>()
            .WithMessage("*missing required components*");
    }

    /// <summary>Builder preloaded with a full, fixed set of default components.</summary>
    private VeriKeyBuilder CreateBuilder()
    {
        return new VeriKeyBuilder(_options)
            .WithSourceHash("sha256:defaultsource")
            .WithSbomHash("sha256:defaultsbom")
            .WithVexStatementHashes(["sha256:vex1", "sha256:vex2"])
            .WithMergePolicyHash("sha256:defaultpolicy")
            .WithCertificateHashes(["sha256:cert1"])
            .WithTimeWindow("2024-12-24T12:00:00Z");
    }
}

View File

@@ -17,3 +17,21 @@ public sealed class RabbitMqIntegrationFactAttribute : FactAttribute
}
}
/// <summary>
/// Theory attribute for RabbitMQ integration tests.
/// Skips tests when STELLAOPS_TEST_RABBITMQ environment variable is not set.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public sealed class RabbitMqIntegrationTheoryAttribute : TheoryAttribute
{
    public RabbitMqIntegrationTheoryAttribute()
    {
        // Opt-in switch: either "1" or "true" (any casing) enables the tests.
        var flag = Environment.GetEnvironmentVariable("STELLAOPS_TEST_RABBITMQ");
        var optedIn = string.Equals(flag, "1", StringComparison.OrdinalIgnoreCase)
            || string.Equals(flag, "true", StringComparison.OrdinalIgnoreCase);
        if (!optedIn)
        {
            Skip = "RabbitMQ integration tests are opt-in. Set STELLAOPS_TEST_RABBITMQ=1 (requires Docker/Testcontainers).";
        }
    }
}

View File

@@ -0,0 +1,634 @@
// -----------------------------------------------------------------------------
// RabbitMqTransportComplianceTests.cs
// Sprint: SPRINT_5100_0010_0003 - Router + Messaging Test Implementation
// Task: MESSAGING-5100-005 - RabbitMQ transport compliance tests
// Description: Transport compliance tests for RabbitMQ transport covering roundtrip,
// ack/nack semantics, and frame protocol behavior.
// -----------------------------------------------------------------------------
using System.Text;
using FluentAssertions;
using Microsoft.Extensions.Options;
using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Frames;
using StellaOps.Router.Common.Models;
using StellaOps.Router.Transport.RabbitMq.Tests.Fixtures;
using Xunit;
using Xunit.Abstractions;
namespace StellaOps.Router.Transport.RabbitMq.Tests;
/// <summary>
/// Transport compliance tests for RabbitMQ transport.
/// Validates:
/// - Protocol roundtrip (frame encoding → publish → consume → decode)
/// - Frame type discrimination
/// - Message ordering and delivery guarantees
/// - Connection resilience and recovery
/// </summary>
[Collection(RabbitMqIntegrationTestCollection.Name)]
public sealed class RabbitMqTransportComplianceTests : IAsyncLifetime
{
private readonly RabbitMqContainerFixture _fixture;
private readonly ITestOutputHelper _output;
private RabbitMqTransportServer? _server;
private RabbitMqTransportClient? _client;
/// <summary>Stores the shared RabbitMQ container fixture and the xUnit output writer.</summary>
public RabbitMqTransportComplianceTests(RabbitMqContainerFixture fixture, ITestOutputHelper output)
{
    _fixture = fixture;
    _output = output;
}
/// <summary>No shared async setup; each test builds its own server and client.</summary>
public Task InitializeAsync()
{
    return Task.CompletedTask;
}
/// <summary>Disposes whatever the test created: client first, then server.</summary>
public async Task DisposeAsync()
{
    if (_client is not null)
    {
        await _client.DisposeAsync();
    }
    if (_server is not null)
    {
        await _server.DisposeAsync();
    }
}
#region Protocol Roundtrip Tests
/// <summary>
/// Full roundtrip: client connects, its HELLO frame travels over RabbitMQ,
/// and the server surfaces it via OnFrame with type and correlation intact.
/// </summary>
[RabbitMqIntegrationFact]
public async Task ProtocolRoundtrip_HelloFrame_ReceivedByServer()
{
    // Arrange
    const string nodeId = "gw-hello-roundtrip";
    _server = CreateServer(nodeId);
    _client = CreateClient("svc-hello-roundtrip", nodeId);
    Frame? receivedFrame = null;
    // FIX: RunContinuationsAsynchronously keeps the awaiting test continuation
    // off the transport's consumer callback thread (no inline continuations).
    var frameReceived = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    _server.OnFrame += (connectionId, frame) =>
    {
        if (frame.Type == FrameType.Hello)
        {
            receivedFrame = frame;
            frameReceived.TrySetResult(true);
        }
    };
    await _server.StartAsync(CancellationToken.None);

    // Act
    await ConnectClientAsync(_client, "svc-hello-roundtrip");

    // Assert: the HELLO frame arrives within the timeout with its metadata intact.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    await frameReceived.Task.WaitAsync(cts.Token);
    receivedFrame.Should().NotBeNull();
    receivedFrame!.Type.Should().Be(FrameType.Hello);
    receivedFrame.CorrelationId.Should().NotBeNullOrEmpty();
    _output.WriteLine("Hello frame roundtrip test passed");
}
[RabbitMqIntegrationFact]
    /// <summary>
    /// Verifies that a heartbeat sent by a connected client arrives at the server
    /// and is decoded with <see cref="FrameType.Heartbeat"/>.
    /// </summary>
    public async Task ProtocolRoundtrip_HeartbeatFrame_ReceivedByServer()
    {
        // Arrange
        const string nodeId = "gw-heartbeat-roundtrip";
        _server = CreateServer(nodeId);
        _client = CreateClient("svc-heartbeat-roundtrip", nodeId);
        // NOTE(review): consider TaskCreationOptions.RunContinuationsAsynchronously
        // so the test continuation does not run inline on the consumer thread.
        var heartbeatReceived = new TaskCompletionSource<bool>();
        _server.OnFrame += (connectionId, frame) =>
        {
            if (frame.Type == FrameType.Heartbeat)
            {
                heartbeatReceived.TrySetResult(true);
            }
        };
        await _server.StartAsync(CancellationToken.None);
        await ConnectClientAsync(_client, "svc-heartbeat-roundtrip");
        await Task.Delay(500); // Fixed grace period for the HELLO handshake; no readiness signal is exposed.
        // Act - send a single healthy heartbeat from the connected client.
        var heartbeat = new HeartbeatPayload
        {
            InstanceId = "svc-heartbeat-roundtrip",
            Status = InstanceHealthStatus.Healthy,
            InFlightRequestCount = 0,
            ErrorRate = 0,
            TimestampUtc = DateTime.UtcNow
        };
        await _client.SendHeartbeatAsync(heartbeat, CancellationToken.None);
        // Assert - bounded wait so a dropped heartbeat fails rather than hangs.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await heartbeatReceived.Task.WaitAsync(cts.Token);
        _output.WriteLine("Heartbeat frame roundtrip test passed");
    }
#endregion
#region Frame Protocol Tests
[RabbitMqIntegrationFact]
public void FrameProtocol_ParseFrame_AllFrameTypes()
{
// Test that all frame types parse correctly
var frameTypes = new[] { "Request", "Response", "Hello", "Heartbeat", "Cancel" };
foreach (var typeName in frameTypes)
{
var properties = new TestBasicProperties { Type = typeName, CorrelationId = "test" };
var body = new byte[] { 1, 2, 3 };
var frame = RabbitMqFrameProtocol.ParseFrame(body, properties);
var expectedType = Enum.Parse<FrameType>(typeName);
frame.Type.Should().Be(expectedType, $"Frame type {typeName} should parse correctly");
}
_output.WriteLine("All frame types parse correctly");
}
[RabbitMqIntegrationFact]
    /// <summary>
    /// Verifies that <c>RabbitMqFrameProtocol.CreateProperties</c> writes each
    /// frame's type into the AMQP <c>Type</c> property as its enum name string.
    /// </summary>
    public void FrameProtocol_CreateProperties_PreservesFrameType()
    {
        // Exercise every frame type the router protocol defines.
        var frameTypes = new[] { FrameType.Request, FrameType.Response, FrameType.Hello, FrameType.Heartbeat, FrameType.Cancel };
        foreach (var frameType in frameTypes)
        {
            var frame = new Frame
            {
                Type = frameType,
                CorrelationId = Guid.NewGuid().ToString("N"),
                Payload = Array.Empty<byte>()
            };
            // Second argument is the reply-to queue; null because no reply is expected here.
            var properties = RabbitMqFrameProtocol.CreateProperties(frame, null);
            properties.Type.Should().Be(frameType.ToString(), $"Frame type {frameType} should be preserved in properties");
        }
        _output.WriteLine("Frame types preserved in properties");
    }
[RabbitMqIntegrationTheory]
[InlineData(0)]
[InlineData(1)]
[InlineData(100)]
[InlineData(1000)]
public void FrameProtocol_BinaryPayload_Preserved(int payloadSize)
{
// Test that binary payloads are preserved during parsing
var payload = new byte[payloadSize];
if (payloadSize > 0)
{
new Random(payloadSize).NextBytes(payload);
}
var properties = new TestBasicProperties { Type = "Request", CorrelationId = "test" };
var frame = RabbitMqFrameProtocol.ParseFrame(payload, properties);
frame.Payload.ToArray().Should().BeEquivalentTo(payload);
_output.WriteLine($"Binary payload size {payloadSize} preserved");
}
[RabbitMqIntegrationFact]
public void FrameProtocol_CorrelationId_Preserved()
{
// Test that correlation ID is preserved
var correlationId = Guid.NewGuid().ToString("N");
var properties = new TestBasicProperties { Type = "Request", CorrelationId = correlationId };
var frame = RabbitMqFrameProtocol.ParseFrame(Array.Empty<byte>(), properties);
frame.CorrelationId.Should().Be(correlationId);
_output.WriteLine("Correlation ID preserved");
}
#endregion
#region Connection Semantics Tests
[RabbitMqIntegrationFact]
public async Task ConnectionSemantics_ServerStart_Succeeds()
{
// Arrange
_server = CreateServer("gw-start");
// Act
var act = async () => await _server.StartAsync(CancellationToken.None);
// Assert
await act.Should().NotThrowAsync();
_output.WriteLine("Server start test passed");
}
[RabbitMqIntegrationFact]
public async Task ConnectionSemantics_ServerStop_Succeeds()
{
// Arrange
_server = CreateServer("gw-stop");
await _server.StartAsync(CancellationToken.None);
// Act
var act = async () => await _server.StopAsync(CancellationToken.None);
// Assert
await act.Should().NotThrowAsync();
_output.WriteLine("Server stop test passed");
}
[RabbitMqIntegrationFact]
public async Task ConnectionSemantics_ClientConnect_Succeeds()
{
// Arrange
_client = CreateClient("svc-connect");
// Act
var act = async () => await ConnectClientAsync(_client, "svc-connect");
// Assert
await act.Should().NotThrowAsync();
_output.WriteLine("Client connect test passed");
}
[RabbitMqIntegrationFact]
public async Task ConnectionSemantics_ClientDisconnect_Succeeds()
{
// Arrange
_client = CreateClient("svc-disconnect");
await ConnectClientAsync(_client, "svc-disconnect");
// Act
var act = async () => await _client.DisconnectAsync();
// Assert
await act.Should().NotThrowAsync();
_output.WriteLine("Client disconnect test passed");
}
[RabbitMqIntegrationFact]
public async Task ConnectionSemantics_MultipleClients_CanConnectSimultaneously()
{
// Arrange
var client1 = CreateClient("svc-multi-1");
var client2 = CreateClient("svc-multi-2");
try
{
// Act
await Task.WhenAll(
ConnectClientAsync(client1, "svc-multi-1"),
ConnectClientAsync(client2, "svc-multi-2"));
// Assert - both connections succeeded
_output.WriteLine("Multiple clients connected simultaneously");
}
finally
{
await client1.DisposeAsync();
await client2.DisposeAsync();
}
}
#endregion
#region Message Delivery Tests
[RabbitMqIntegrationFact]
    /// <summary>
    /// Verifies that the HELLO sent during client connect is delivered to the
    /// server's frame handler together with a non-empty connection id.
    /// </summary>
    public async Task MessageDelivery_HelloFromClient_ServerReceives()
    {
        // Arrange
        const string nodeId = "gw-delivery";
        _server = CreateServer(nodeId);
        _client = CreateClient("svc-delivery", nodeId);
        string? receivedConnectionId = null;
        // NOTE(review): consider TaskCreationOptions.RunContinuationsAsynchronously here.
        var helloReceived = new TaskCompletionSource<bool>();
        _server.OnFrame += (connectionId, frame) =>
        {
            if (frame.Type == FrameType.Hello)
            {
                receivedConnectionId = connectionId;
                helloReceived.TrySetResult(true);
            }
        };
        await _server.StartAsync(CancellationToken.None);
        // Act - connecting publishes the HELLO frame.
        await ConnectClientAsync(_client, "svc-delivery");
        // Assert - bounded wait so a lost frame fails instead of hanging the run.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await helloReceived.Task.WaitAsync(cts.Token);
        receivedConnectionId.Should().NotBeNullOrEmpty();
        _output.WriteLine($"Server received HELLO from connection: {receivedConnectionId}");
    }
[RabbitMqIntegrationFact]
    /// <summary>
    /// Sends several heartbeats from one client and verifies the server receives
    /// every one of them (delivery completeness, not ordering).
    /// </summary>
    public async Task MessageDelivery_MultipleHeartbeats_AllReceived()
    {
        // Arrange
        const string nodeId = "gw-multi-heartbeat";
        const int heartbeatCount = 5;
        _server = CreateServer(nodeId);
        _client = CreateClient("svc-multi-heartbeat", nodeId);
        var receivedCount = 0;
        var allReceived = new TaskCompletionSource<bool>();
        _server.OnFrame += (connectionId, frame) =>
        {
            if (frame.Type == FrameType.Heartbeat)
            {
                // Interlocked: OnFrame may fire on a broker consumer thread.
                var count = Interlocked.Increment(ref receivedCount);
                if (count == heartbeatCount)
                {
                    allReceived.TrySetResult(true);
                }
            }
        };
        await _server.StartAsync(CancellationToken.None);
        await ConnectClientAsync(_client, "svc-multi-heartbeat");
        await Task.Delay(500); // Fixed grace period for the HELLO handshake; no readiness signal is exposed.
        // Act - send heartbeats sequentially; InFlightRequestCount doubles as a marker.
        for (int i = 0; i < heartbeatCount; i++)
        {
            var heartbeat = new HeartbeatPayload
            {
                InstanceId = "svc-multi-heartbeat",
                Status = InstanceHealthStatus.Healthy,
                InFlightRequestCount = i,
                ErrorRate = 0,
                TimestampUtc = DateTime.UtcNow
            };
            await _client.SendHeartbeatAsync(heartbeat, CancellationToken.None);
        }
        // Assert - bounded wait, then confirm the exact count (no extras, no drops).
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
        await allReceived.Task.WaitAsync(cts.Token);
        receivedCount.Should().Be(heartbeatCount);
        _output.WriteLine($"All {heartbeatCount} heartbeats received");
    }
#endregion
#region Connection Resilience Tests
[RabbitMqIntegrationFact]
    /// <summary>
    /// Restarts the RabbitMQ broker mid-session and verifies that the client can
    /// eventually deliver a heartbeat again, i.e. the transport recovers its
    /// connection rather than failing permanently.
    /// </summary>
    public async Task ConnectionResilience_BrokerRestart_ClientRecovers()
    {
        // Arrange
        const string nodeId = "gw-resilience";
        _server = CreateServer(nodeId);
        _client = CreateClient("svc-resilience", nodeId);
        var postRestartReceived = new TaskCompletionSource<bool>();
        _server.OnFrame += (_, frame) =>
        {
            if (frame.Type == FrameType.Heartbeat)
            {
                postRestartReceived.TrySetResult(true);
            }
        };
        await _server.StartAsync(CancellationToken.None);
        await ConnectClientAsync(_client, "svc-resilience");
        // Fixed grace period for the connection to settle before the restart.
        await Task.Delay(1000);
        // Act - restart the broker container, severing all live connections.
        await _fixture.RestartAsync();
        // Retry sending until the client's automatic recovery succeeds;
        // swallowExceptions=true because sends fail while the broker is down.
        await EventuallyAsync(
            async () =>
            {
                var heartbeat = new HeartbeatPayload
                {
                    InstanceId = "svc-resilience",
                    Status = InstanceHealthStatus.Healthy,
                    InFlightRequestCount = 0,
                    ErrorRate = 0,
                    TimestampUtc = DateTime.UtcNow
                };
                await _client.SendHeartbeatAsync(heartbeat, CancellationToken.None);
                return true;
            },
            timeout: TimeSpan.FromSeconds(30),
            swallowExceptions: true);
        // Assert - the server (also recovered) must observe a post-restart heartbeat.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
        await postRestartReceived.Task.WaitAsync(cts.Token);
        _output.WriteLine("Connection resilience test passed");
    }
#endregion
#region Queue Configuration Tests
[RabbitMqIntegrationFact]
public async Task QueueConfig_AutoDeleteQueues_CleanedUpOnDisconnect()
{
// Arrange
var options = _fixture.CreateOptions(instanceId: "svc-autodelete");
options.AutoDeleteQueues = true;
_client = new RabbitMqTransportClient(
Options.Create(options),
_fixture.GetLogger<RabbitMqTransportClient>());
await ConnectClientAsync(_client, "svc-autodelete");
// Act
await _client.DisconnectAsync();
await _client.DisposeAsync();
_client = null;
// Assert - queue should be auto-deleted (no way to verify without management API)
// Success is indicated by no exceptions
_output.WriteLine("Auto-delete queue test passed");
}
[RabbitMqIntegrationFact]
public async Task QueueConfig_PrefetchCount_AppliedOnConnect()
{
// Arrange
var options = _fixture.CreateOptions(instanceId: "svc-prefetch");
options.PrefetchCount = 50;
_client = new RabbitMqTransportClient(
Options.Create(options),
_fixture.GetLogger<RabbitMqTransportClient>());
// Act & Assert - success indicates prefetch was set
var act = async () => await ConnectClientAsync(_client, "svc-prefetch");
await act.Should().NotThrowAsync();
_output.WriteLine("Prefetch count test passed");
}
#endregion
#region Determinism Tests
[RabbitMqIntegrationFact]
public void Determinism_SameFrame_SameProperties()
{
// Test that same input produces same output
for (int run = 0; run < 10; run++)
{
var frame = new Frame
{
Type = FrameType.Request,
CorrelationId = "deterministic-test",
Payload = Encoding.UTF8.GetBytes("consistent-data")
};
var props1 = RabbitMqFrameProtocol.CreateProperties(frame, "reply-queue");
var props2 = RabbitMqFrameProtocol.CreateProperties(frame, "reply-queue");
props1.Type.Should().Be(props2.Type);
props1.CorrelationId.Should().Be(props2.CorrelationId);
props1.ReplyTo.Should().Be(props2.ReplyTo);
}
_output.WriteLine("Determinism test passed");
}
#endregion
#region Helpers
private RabbitMqTransportServer CreateServer(string? nodeId = null)
{
var options = _fixture.CreateOptions(nodeId: nodeId ?? $"gw-{Guid.NewGuid():N}"[..12]);
return new RabbitMqTransportServer(
Options.Create(options),
_fixture.GetLogger<RabbitMqTransportServer>());
}
private RabbitMqTransportClient CreateClient(string? instanceId = null, string? nodeId = null)
{
var options = _fixture.CreateOptions(
instanceId: instanceId ?? $"svc-{Guid.NewGuid():N}"[..12],
nodeId: nodeId);
return new RabbitMqTransportClient(
Options.Create(options),
_fixture.GetLogger<RabbitMqTransportClient>());
}
private static async Task ConnectClientAsync(RabbitMqTransportClient client, string instanceId)
{
var instance = new InstanceDescriptor
{
InstanceId = instanceId,
ServiceName = "test-service",
Version = "1.0.0",
Region = "us-east-1"
};
await client.ConnectAsync(instance, [], CancellationToken.None);
}
private static async Task EventuallyAsync(
Func<Task<bool>> predicate,
TimeSpan timeout,
bool swallowExceptions,
TimeSpan? pollInterval = null)
{
pollInterval ??= TimeSpan.FromMilliseconds(500);
var deadline = DateTime.UtcNow + timeout;
while (DateTime.UtcNow < deadline)
{
try
{
if (await predicate())
{
return;
}
}
catch when (swallowExceptions)
{
// Retry
}
await Task.Delay(pollInterval.Value);
}
(await predicate()).Should().BeTrue("condition should become true within {0}", timeout);
}
#endregion
#region Test Helpers
    /// <summary>
    /// Minimal in-memory stub of <see cref="RabbitMQ.Client.IReadOnlyBasicProperties"/>
    /// used to drive RabbitMqFrameProtocol.ParseFrame without a live broker message.
    /// Only the properties a test initializes carry values; the rest stay default.
    /// </summary>
    private sealed class TestBasicProperties : RabbitMQ.Client.IReadOnlyBasicProperties
    {
        public string? AppId { get; init; }
        public string? ClusterId { get; init; }
        public string? ContentEncoding { get; init; }
        public string? ContentType { get; init; }
        public string? CorrelationId { get; init; }
        public RabbitMQ.Client.DeliveryModes DeliveryMode { get; init; }
        public string? Expiration { get; init; }
        public IDictionary<string, object?>? Headers { get; init; }
        public string? MessageId { get; init; }
        public bool Persistent { get; init; }
        public byte Priority { get; init; }
        public string? ReplyTo { get; init; }
        public RabbitMQ.Client.PublicationAddress? ReplyToAddress { get; init; }
        public RabbitMQ.Client.AmqpTimestamp Timestamp { get; init; }
        public string? Type { get; init; }
        public string? UserId { get; init; }
        // Reference-typed members report "present" only when set by the test.
        public bool IsAppIdPresent() => AppId != null;
        public bool IsClusterIdPresent() => ClusterId != null;
        public bool IsContentEncodingPresent() => ContentEncoding != null;
        public bool IsContentTypePresent() => ContentType != null;
        public bool IsCorrelationIdPresent() => CorrelationId != null;
        // Value-typed members (DeliveryMode/Priority/Timestamp) are always "present".
        public bool IsDeliveryModePresent() => true;
        public bool IsExpirationPresent() => Expiration != null;
        public bool IsHeadersPresent() => Headers != null;
        public bool IsMessageIdPresent() => MessageId != null;
        public bool IsPriorityPresent() => true;
        public bool IsReplyToPresent() => ReplyTo != null;
        public bool IsTimestampPresent() => true;
        public bool IsTypePresent() => Type != null;
        public bool IsUserIdPresent() => UserId != null;
    }
#endregion
}