feat: Enhance MongoDB storage with event publishing and outbox support
- Added `MongoAdvisoryObservationEventPublisher` and `NatsAdvisoryObservationEventPublisher` for event publishing. - Registered `IAdvisoryObservationEventPublisher` to choose between NATS and MongoDB based on configuration. - Introduced `MongoAdvisoryObservationEventOutbox` for outbox pattern implementation. - Updated service collection to include new event publishers and outbox. - Added a new hosted service `AdvisoryObservationTransportWorker` for processing events. feat: Update project dependencies - Added `NATS.Client.Core` package to the project for NATS integration. test: Add unit tests for AdvisoryLinkset normalization - Created `AdvisoryLinksetNormalizationConfidenceTests` to validate confidence score calculations. fix: Adjust confidence assertion in `AdvisoryObservationAggregationTests` - Updated confidence assertion to allow a range instead of a fixed value. test: Implement tests for AdvisoryObservationEventFactory - Added `AdvisoryObservationEventFactoryTests` to ensure correct mapping and hashing of observation events. chore: Configure test project for Findings Ledger - Created `Directory.Build.props` for test project configuration. - Added `StellaOps.Findings.Ledger.Exports.Unit.csproj` for unit tests related to findings ledger exports. feat: Implement export contracts for findings ledger - Defined export request and response contracts in `ExportContracts.cs`. - Created various export item records for findings, VEX, advisories, and SBOMs. feat: Add export functionality to Findings Ledger Web Service - Implemented endpoints for exporting findings, VEX, advisories, and SBOMs. - Integrated `ExportQueryService` for handling export logic and pagination. test: Add tests for Node language analyzer phase 22 - Implemented `NodePhase22SampleLoaderTests` to validate loading of NDJSON fixtures. - Created sample NDJSON file for testing. chore: Set up isolated test environment for Node tests - Added `node-isolated.runsettings` for isolated test execution. 
- Created `node-tests-isolated.sh` script for running tests in isolation.
This commit is contained in:
@@ -0,0 +1,214 @@
|
||||
using System.Text.Json.Nodes;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
using NpgsqlTypes;
|
||||
using StellaOps.Findings.Ledger.Infrastructure.Exports;
|
||||
using StellaOps.Findings.Ledger.Infrastructure.Postgres;
|
||||
using StellaOps.Findings.Ledger.WebService.Contracts;
|
||||
|
||||
namespace StellaOps.Findings.Ledger.WebService.Services;
|
||||
|
||||
/// <summary>
/// Executes paginated export queries against the ledger Postgres store.
/// Findings are paged with a keyset cursor over
/// (sequence_no, policy_version, cycle_hash) encoded into an opaque page token.
/// </summary>
public sealed class ExportQueryService
{
    // Page-size bounds applied by ClampPageSize.
    private const int DefaultPageSize = 500;
    private const int MaxPageSize = 5000;

    private readonly LedgerDataSource _dataSource;
    private readonly ILogger<ExportQueryService> _logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="dataSource">Tenant-scoped Postgres data source.</param>
    /// <param name="logger">Logger (currently reserved for diagnostics).</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public ExportQueryService(LedgerDataSource dataSource, ILogger<ExportQueryService> logger)
    {
        _dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Empty VEX export page: no items, no continuation token.</summary>
    public ExportPage<VexExportItem> GetVexEmpty() => new(Array.Empty<VexExportItem>(), null);

    /// <summary>Empty advisory export page: no items, no continuation token.</summary>
    public ExportPage<AdvisoryExportItem> GetAdvisoriesEmpty() => new(Array.Empty<AdvisoryExportItem>(), null);

    /// <summary>Empty SBOM export page: no items, no continuation token.</summary>
    public ExportPage<SbomExportItem> GetSbomsEmpty() => new(Array.Empty<SbomExportItem>(), null);

    /// <summary>
    /// Normalizes a caller-supplied page size: missing or non-positive values
    /// fall back to <see cref="DefaultPageSize"/>; everything else is capped
    /// at <see cref="MaxPageSize"/>.
    /// </summary>
    /// <param name="requested">Requested page size, possibly absent.</param>
    /// <returns>A page size in the range [1, MaxPageSize].</returns>
    public int ClampPageSize(int? requested)
    {
        if (!requested.HasValue || requested.Value <= 0)
        {
            return DefaultPageSize;
        }

        return Math.Min(requested.Value, MaxPageSize);
    }

    /// <summary>
    /// Computes the canonical hash of the request's filter set. The hash is
    /// embedded in page tokens so a continuation cannot be replayed against a
    /// different filter combination.
    /// </summary>
    /// <param name="request">Findings export request whose filters are hashed.</param>
    /// <returns>Stable hash string produced by <see cref="ExportPaging.ComputeFiltersHash"/>.</returns>
    public string ComputeFiltersHash(ExportFindingsRequest request)
    {
        // Key order and formats ("O" round-trip timestamps, invariant numerics)
        // must stay stable: any change invalidates all outstanding page tokens.
        var filters = new Dictionary<string, string?>
        {
            ["shape"] = request.Shape,
            ["since_sequence"] = request.SinceSequence?.ToString(),
            ["until_sequence"] = request.UntilSequence?.ToString(),
            ["since_observed_at"] = request.SinceObservedAt?.ToString("O"),
            ["until_observed_at"] = request.UntilObservedAt?.ToString("O"),
            ["status"] = request.Status,
            ["severity"] = request.Severity?.ToString()
        };

        return ExportPaging.ComputeFiltersHash(filters);
    }

    /// <summary>
    /// Streams one page of finding export rows for the request's tenant,
    /// applying optional sequence/time/status/severity filters and keyset
    /// pagination ordered by (sequence_no, policy_version, cycle_hash).
    /// </summary>
    /// <param name="request">Validated export request; its FiltersHash must match the current filters.</param>
    /// <param name="cancellationToken">Cancels the database work.</param>
    /// <returns>The page items plus a continuation token when more rows exist.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The supplied filters hash does not match the request's filters ("filters_hash_mismatch").</exception>
    public async Task<ExportPage<FindingExportItem>> GetFindingsAsync(ExportFindingsRequest request, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);

        // Reject continuations whose token was minted for a different filter set.
        if (!string.Equals(request.FiltersHash, ComputeFiltersHash(request), StringComparison.Ordinal))
        {
            throw new InvalidOperationException("filters_hash_mismatch");
        }

        // Column ordinals below (0..9) must track this SELECT list.
        const string baseSql = """
            SELECT le.sequence_no,
                   le.recorded_at,
                   fp.finding_id,
                   fp.policy_version,
                   fp.status,
                   fp.severity,
                   fp.labels,
                   fp.cycle_hash,
                   le.evidence_bundle_ref,
                   le.event_hash
            FROM findings_projection fp
            JOIN ledger_events le
              ON le.tenant_id = fp.tenant_id
             AND le.event_id = fp.current_event_id
            WHERE fp.tenant_id = @tenant_id
            """;

        var sqlBuilder = new System.Text.StringBuilder(baseSql);
        var parameters = new List<NpgsqlParameter>
        {
            new("tenant_id", request.TenantId)
            {
                NpgsqlDbType = NpgsqlDbType.Text
            }
        };

        if (request.SinceSequence.HasValue)
        {
            sqlBuilder.Append(" AND le.sequence_no >= @since_sequence");
            parameters.Add(new NpgsqlParameter<long>("since_sequence", request.SinceSequence.Value)
            {
                NpgsqlDbType = NpgsqlDbType.Bigint
            });
        }

        if (request.UntilSequence.HasValue)
        {
            sqlBuilder.Append(" AND le.sequence_no <= @until_sequence");
            parameters.Add(new NpgsqlParameter<long>("until_sequence", request.UntilSequence.Value)
            {
                NpgsqlDbType = NpgsqlDbType.Bigint
            });
        }

        if (request.SinceObservedAt.HasValue)
        {
            sqlBuilder.Append(" AND le.recorded_at >= @since_observed_at");
            parameters.Add(new NpgsqlParameter<DateTimeOffset>("since_observed_at", request.SinceObservedAt.Value)
            {
                NpgsqlDbType = NpgsqlDbType.TimestampTz
            });
        }

        if (request.UntilObservedAt.HasValue)
        {
            sqlBuilder.Append(" AND le.recorded_at <= @until_observed_at");
            parameters.Add(new NpgsqlParameter<DateTimeOffset>("until_observed_at", request.UntilObservedAt.Value)
            {
                NpgsqlDbType = NpgsqlDbType.TimestampTz
            });
        }

        if (!string.IsNullOrWhiteSpace(request.Status))
        {
            sqlBuilder.Append(" AND fp.status = @status");
            parameters.Add(new NpgsqlParameter<string>("status", request.Status)
            {
                NpgsqlDbType = NpgsqlDbType.Text
            });
        }

        if (request.Severity.HasValue)
        {
            sqlBuilder.Append(" AND fp.severity = @severity");
            parameters.Add(new NpgsqlParameter<decimal>("severity", request.Severity.Value)
            {
                NpgsqlDbType = NpgsqlDbType.Numeric
            });
        }

        if (request.PagingKey is not null)
        {
            // Keyset (seek) predicate: strictly AFTER the cursor row in the
            // composite order (sequence_no, policy_version, cycle_hash).
            sqlBuilder.Append(" AND (le.sequence_no > @cursor_seq OR (le.sequence_no = @cursor_seq AND fp.policy_version > @cursor_policy) OR (le.sequence_no = @cursor_seq AND fp.policy_version = @cursor_policy AND fp.cycle_hash > @cursor_cycle))");
            parameters.Add(new NpgsqlParameter<long>("cursor_seq", request.PagingKey.SequenceNumber)
            {
                NpgsqlDbType = NpgsqlDbType.Bigint
            });
            parameters.Add(new NpgsqlParameter<string>("cursor_policy", request.PagingKey.PolicyVersion)
            {
                NpgsqlDbType = NpgsqlDbType.Text
            });
            // NOTE(review): Char (bpchar) here vs Text elsewhere — presumably
            // fp.cycle_hash is a fixed-length char column; confirm against the
            // schema, since bpchar comparison ignores trailing padding.
            parameters.Add(new NpgsqlParameter<string>("cursor_cycle", request.PagingKey.CycleHash)
            {
                NpgsqlDbType = NpgsqlDbType.Char
            });
        }

        // ORDER BY must mirror the keyset predicate exactly; fetch one extra
        // row to detect whether a further page exists.
        sqlBuilder.Append(" ORDER BY le.sequence_no, fp.policy_version, fp.cycle_hash");
        sqlBuilder.Append(" LIMIT @take");
        parameters.Add(new NpgsqlParameter<int>("take", request.PageSize + 1)
        {
            NpgsqlDbType = NpgsqlDbType.Integer
        });

        await using var connection = await _dataSource.OpenConnectionAsync(request.TenantId, cancellationToken).ConfigureAwait(false);
        await using var command = new NpgsqlCommand(sqlBuilder.ToString(), connection)
        {
            CommandTimeout = _dataSource.CommandTimeoutSeconds
        };
        command.Parameters.AddRange(parameters.ToArray());

        await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        var items = new List<FindingExportItem>();
        while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // Guard labels (ordinal 6) like the other nullable columns:
            // GetFieldValue<string> throws on DBNull. Assumes the column holds
            // a JSON object when present — TODO confirm fp.labels nullability
            // against the schema.
            var labels = reader.IsDBNull(6)
                ? null
                : JsonNode.Parse(reader.GetFieldValue<string>(6))?.AsObject();

            items.Add(new FindingExportItem(
                EventSequence: reader.GetInt64(0),
                ObservedAt: reader.GetFieldValue<DateTimeOffset>(1),
                FindingId: reader.GetString(2),
                PolicyVersion: reader.GetString(3),
                Status: reader.GetString(4),
                Severity: reader.IsDBNull(5) ? null : reader.GetDecimal(5),
                CycleHash: reader.GetString(7),
                EvidenceBundleRef: reader.IsDBNull(8) ? null : reader.GetString(8),
                Provenance: new ExportProvenance(
                    PolicyVersion: reader.GetString(3),
                    CycleHash: reader.GetString(7),
                    LedgerEventHash: reader.IsDBNull(9) ? null : reader.GetString(9)),
                Labels: labels));
        }

        string? nextPageToken = null;
        if (items.Count > request.PageSize)
        {
            // Drop the look-ahead row, then anchor the cursor on the LAST row
            // actually returned. The keyset predicate above is strictly
            // greater-than, so anchoring on the look-ahead row (the first row
            // of the next page) would skip that row at every page boundary.
            items = items.Take(request.PageSize).ToList();
            var last = items[^1];
            nextPageToken = ExportPaging.CreatePageToken(
                new ExportPaging.ExportPageKey(last.EventSequence, last.PolicyVersion, last.CycleHash),
                request.FiltersHash);
        }

        return new ExportPage<FindingExportItem>(items, nextPageToken);
    }
}
|
||||
Reference in New Issue
Block a user