feat(graph): introduce graph.inspect.v1 contract and schema for SBOM relationships
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Console CI / console-ci (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled

- Added graph.inspect.v1 documentation outlining payload structure and determinism rules.
- Created JSON schema for graph.inspect.v1 to enforce payload validation.
- Defined mapping rules for graph relationships, advisories, and VEX statements.

feat(notifications): establish remediation blueprint for gaps NR1-NR10

- Documented requirements, evidence, and tests for Notifier runtime.
- Specified deliverables and next steps for addressing identified gaps.

docs(notifications): organize operations and schemas documentation

- Created README files for operations, schemas, and security notes to clarify deliverables and policies.

feat(advisory): implement PostgreSQL caching for Link-Not-Merge linksets

- Created database schema for advisory linkset cache.
- Developed repository for managing advisory linkset cache operations.
- Added tests to ensure correct functionality of the AdvisoryLinksetCacheRepository.
This commit is contained in:
StellaOps Bot
2025-12-04 09:36:59 +02:00
parent 4dc7cf834a
commit 600f3a7a3c
37 changed files with 1326 additions and 272 deletions

View File

@@ -0,0 +1,21 @@
-- Link-Not-Merge linkset cache (PostgreSQL)
-- Stores deterministic cache entries for advisory linksets per tenant/source/advisory.
CREATE TABLE IF NOT EXISTS vuln.lnm_linkset_cache (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
source TEXT NOT NULL,
advisory_id TEXT NOT NULL,
observations TEXT[] NOT NULL DEFAULT '{}',
normalized JSONB,
conflicts JSONB,
provenance JSONB,
confidence DOUBLE PRECISION,
built_by_job_id TEXT,
created_at TIMESTAMPTZ NOT NULL,
CONSTRAINT uq_lnm_linkset_cache UNIQUE (tenant_id, advisory_id, source)
);
CREATE INDEX IF NOT EXISTS idx_lnm_linkset_cache_order
ON vuln.lnm_linkset_cache (tenant_id, created_at DESC, advisory_id, source);

View File

@@ -0,0 +1,21 @@
using System;
namespace StellaOps.Concelier.Storage.Postgres.Models;
/// <summary>
/// Represents a cached Link-Not-Merge linkset snapshot stored in PostgreSQL.
/// </summary>
public sealed class AdvisoryLinksetCacheEntity
{
public Guid Id { get; init; }
public string TenantId { get; init; } = default!;
public string Source { get; init; } = default!;
public string AdvisoryId { get; init; } = default!;
public string[] Observations { get; init; } = Array.Empty<string>();
public string? NormalizedJson { get; init; }
public string? ConflictsJson { get; init; }
public string? ProvenanceJson { get; init; }
public double? Confidence { get; init; }
public DateTimeOffset CreatedAt { get; init; }
public string? BuiltByJobId { get; init; }
}

View File

@@ -0,0 +1,264 @@
using System.Collections.Immutable;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Concelier.Core.Linksets;
using StellaOps.Concelier.Storage.Postgres.Models;
using StellaOps.Infrastructure.Postgres.Repositories;
namespace StellaOps.Concelier.Storage.Postgres.Repositories;
/// <summary>
/// PostgreSQL implementation of the Link-Not-Merge linkset cache.
/// </summary>
public sealed class AdvisoryLinksetCacheRepository
: RepositoryBase<ConcelierDataSource>,
IAdvisoryLinksetStore
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
WriteIndented = false
};
public AdvisoryLinksetCacheRepository(
ConcelierDataSource dataSource,
ILogger<AdvisoryLinksetCacheRepository> logger)
: base(dataSource, logger)
{
}
public async Task UpsertAsync(
AdvisoryLinkset linkset,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(linkset);
var tenant = NormalizeTenant(linkset.TenantId);
var entity = ToEntity(linkset with { TenantId = tenant });
const string sql = """
INSERT INTO vuln.lnm_linkset_cache
(id, tenant_id, source, advisory_id, observations,
normalized, conflicts, provenance, confidence, built_by_job_id, created_at)
VALUES
(@id, @tenant_id, @source, @advisory_id, @observations,
@normalized::jsonb, @conflicts::jsonb, @provenance::jsonb,
@confidence, @built_by_job_id, @created_at)
ON CONFLICT (tenant_id, advisory_id, source) DO UPDATE SET
observations = EXCLUDED.observations,
normalized = EXCLUDED.normalized,
conflicts = EXCLUDED.conflicts,
provenance = EXCLUDED.provenance,
confidence = EXCLUDED.confidence,
built_by_job_id = EXCLUDED.built_by_job_id,
created_at = EXCLUDED.created_at
""";
await using var connection = await DataSource
.OpenConnectionAsync(tenant, "lnm-cache-upsert", cancellationToken)
.ConfigureAwait(false);
await using var command = CreateCommand(sql, connection);
AddParameter(command, "id", entity.Id);
AddParameter(command, "tenant_id", entity.TenantId);
AddParameter(command, "source", entity.Source);
AddParameter(command, "advisory_id", entity.AdvisoryId);
AddTextArrayParameter(command, "observations", entity.Observations);
AddJsonbParameter(command, "normalized", entity.NormalizedJson);
AddJsonbParameter(command, "conflicts", entity.ConflictsJson);
AddJsonbParameter(command, "provenance", entity.ProvenanceJson);
AddParameter(command, "confidence", entity.Confidence);
AddParameter(command, "built_by_job_id", entity.BuiltByJobId);
AddParameter(command, "created_at", entity.CreatedAt.UtcDateTime);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<IReadOnlyList<AdvisoryLinkset>> FindByTenantAsync(
string tenantId,
IEnumerable<string>? advisoryIds,
IEnumerable<string>? sources,
AdvisoryLinksetCursor? cursor,
int limit,
CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(tenantId))
{
throw new ArgumentNullException(nameof(tenantId));
}
if (limit <= 0)
{
throw new ArgumentOutOfRangeException(nameof(limit));
}
var normalizedTenant = NormalizeTenant(tenantId);
var advisoryIdArray = advisoryIds?.Select(a => a.Trim()).Where(a => !string.IsNullOrWhiteSpace(a)).ToArray();
var sourceArray = sources?.Select(s => s.Trim()).Where(s => !string.IsNullOrWhiteSpace(s)).ToArray();
const string sql = """
SELECT id, tenant_id, source, advisory_id, observations,
normalized::text, conflicts::text, provenance::text,
confidence, built_by_job_id, created_at
FROM vuln.lnm_linkset_cache
WHERE tenant_id = @tenant_id
AND (@advisory_ids IS NULL OR advisory_id = ANY(@advisory_ids))
AND (@sources IS NULL OR source = ANY(@sources))
AND (
@cursor_created_at IS NULL
OR created_at < @cursor_created_at
OR (created_at = @cursor_created_at AND advisory_id > @cursor_advisory_id)
)
ORDER BY created_at DESC, advisory_id ASC
LIMIT @limit
""";
return await QueryAsync(
normalizedTenant,
sql,
cmd =>
{
AddParameter(cmd, "tenant_id", normalizedTenant);
AddTextArrayParameter(cmd, "advisory_ids", advisoryIdArray);
AddTextArrayParameter(cmd, "sources", sourceArray);
if (cursor is null)
{
AddParameter(cmd, "cursor_created_at", DBNull.Value);
AddParameter(cmd, "cursor_advisory_id", DBNull.Value);
}
else
{
AddParameter(cmd, "cursor_created_at", cursor.CreatedAt.UtcDateTime);
AddParameter(cmd, "cursor_advisory_id", cursor.AdvisoryId);
}
AddParameter(cmd, "limit", limit);
},
MapLinkset,
cancellationToken).ConfigureAwait(false);
}
private static string NormalizeTenant(string tenantId) =>
tenantId.Trim().ToLowerInvariant();
private static AdvisoryLinksetCacheEntity ToEntity(AdvisoryLinkset linkset)
{
var normalizedJson = linkset.Normalized is null
? null
: JsonSerializer.Serialize(new
{
linkset.Normalized.Purls,
linkset.Normalized.Cpes,
linkset.Normalized.Versions,
linkset.Normalized.Ranges,
linkset.Normalized.Severities
}, JsonOptions);
var conflictsJson = linkset.Conflicts is null
? null
: JsonSerializer.Serialize(
linkset.Conflicts.Select(c => new
{
c.Field,
c.Reason,
c.Values,
c.SourceIds
}),
JsonOptions);
var provenanceJson = linkset.Provenance is null
? null
: JsonSerializer.Serialize(new
{
linkset.Provenance.ObservationHashes,
linkset.Provenance.ToolVersion,
linkset.Provenance.PolicyHash
}, JsonOptions);
return new AdvisoryLinksetCacheEntity
{
Id = Guid.NewGuid(),
TenantId = linkset.TenantId,
Source = linkset.Source,
AdvisoryId = linkset.AdvisoryId,
Observations = linkset.ObservationIds.ToArray(),
NormalizedJson = normalizedJson,
ConflictsJson = conflictsJson,
ProvenanceJson = provenanceJson,
Confidence = linkset.Confidence,
CreatedAt = linkset.CreatedAt,
BuiltByJobId = linkset.BuiltByJobId
};
}
private static AdvisoryLinkset MapLinkset(NpgsqlDataReader reader)
{
var normalized = Deserialize<NormalizedPayload>(reader, 5);
var conflicts = Deserialize<List<ConflictPayload>>(reader, 6);
var provenance = Deserialize<ProvenancePayload>(reader, 7);
return new AdvisoryLinkset(
TenantId: reader.GetString(1),
Source: reader.GetString(2),
AdvisoryId: reader.GetString(3),
ObservationIds: reader.GetFieldValue<string[]>(4).ToImmutableArray(),
Normalized: normalized is null
? null
: new AdvisoryLinksetNormalized(
normalized.Purls,
normalized.Cpes,
normalized.Versions,
normalized.Ranges,
normalized.Severities),
Provenance: provenance is null
? null
: new AdvisoryLinksetProvenance(
provenance.ObservationHashes,
provenance.ToolVersion,
provenance.PolicyHash),
Confidence: GetNullableDouble(reader, 8),
Conflicts: conflicts?.Select(c => new AdvisoryLinksetConflict(
c.Field ?? string.Empty,
c.Reason ?? string.Empty,
c.Values,
c.SourceIds)).ToList(),
CreatedAt: reader.GetFieldValue<DateTimeOffset>(10),
BuiltByJobId: GetNullableString(reader, 9));
}
private static double? GetNullableDouble(NpgsqlDataReader reader, int ordinal) =>
reader.IsDBNull(ordinal) ? null : reader.GetDouble(ordinal);
private static TPayload? Deserialize<TPayload>(NpgsqlDataReader reader, int ordinal)
{
if (reader.IsDBNull(ordinal))
{
return default;
}
var json = reader.GetString(ordinal);
return JsonSerializer.Deserialize<TPayload>(json, JsonOptions);
}
private sealed record NormalizedPayload(
IReadOnlyList<string>? Purls,
IReadOnlyList<string>? Cpes,
IReadOnlyList<string>? Versions,
IReadOnlyList<Dictionary<string, object?>>? Ranges,
IReadOnlyList<Dictionary<string, object?>>? Severities);
private sealed record ConflictPayload(
string? Field,
string? Reason,
IReadOnlyList<string>? Values,
IReadOnlyList<string>? SourceIds);
private sealed record ProvenancePayload(
IReadOnlyList<string>? ObservationHashes,
string? ToolVersion,
string? PolicyHash);
}

View File

@@ -3,6 +3,7 @@ using Microsoft.Extensions.DependencyInjection;
using StellaOps.Concelier.Storage.Postgres.Repositories;
using StellaOps.Infrastructure.Postgres;
using StellaOps.Infrastructure.Postgres.Options;
using StellaOps.Concelier.Core.Linksets;
namespace StellaOps.Concelier.Storage.Postgres;
@@ -40,6 +41,8 @@ public static class ServiceCollectionExtensions
services.AddScoped<IFeedSnapshotRepository, FeedSnapshotRepository>();
services.AddScoped<IAdvisorySnapshotRepository, AdvisorySnapshotRepository>();
services.AddScoped<IMergeEventRepository, MergeEventRepository>();
services.AddScoped<IAdvisoryLinksetStore, AdvisoryLinksetCacheRepository>();
services.AddScoped<IAdvisoryLinksetLookup>(sp => sp.GetRequiredService<IAdvisoryLinksetStore>());
return services;
}
@@ -71,6 +74,8 @@ public static class ServiceCollectionExtensions
services.AddScoped<IFeedSnapshotRepository, FeedSnapshotRepository>();
services.AddScoped<IAdvisorySnapshotRepository, AdvisorySnapshotRepository>();
services.AddScoped<IMergeEventRepository, MergeEventRepository>();
services.AddScoped<IAdvisoryLinksetStore, AdvisoryLinksetCacheRepository>();
services.AddScoped<IAdvisoryLinksetLookup>(sp => sp.GetRequiredService<IAdvisoryLinksetStore>());
return services;
}

View File

@@ -17,6 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Infrastructure.Postgres\StellaOps.Infrastructure.Postgres.csproj" />
<ProjectReference Include="..\StellaOps.Concelier.Storage.Mongo\StellaOps.Concelier.Storage.Mongo.csproj" />
<ProjectReference Include="..\StellaOps.Concelier.Core\StellaOps.Concelier.Core.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,148 @@
using System.Collections.Immutable;
using FluentAssertions;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Concelier.Core.Linksets;
using StellaOps.Concelier.Storage.Postgres;
using StellaOps.Concelier.Storage.Postgres.Repositories;
using StellaOps.Infrastructure.Postgres.Options;
using Xunit;
namespace StellaOps.Concelier.Storage.Postgres.Tests.Linksets;
[Collection(ConcelierPostgresCollection.Name)]
public sealed class AdvisoryLinksetCacheRepositoryTests : IAsyncLifetime
{
private readonly ConcelierPostgresFixture _fixture;
private readonly AdvisoryLinksetCacheRepository _repository;
public AdvisoryLinksetCacheRepositoryTests(ConcelierPostgresFixture fixture)
{
_fixture = fixture;
PostgresOptions options = fixture.Fixture.CreateOptions();
options.SchemaName = fixture.SchemaName;
var dataSource = new ConcelierDataSource(Options.Create(options), NullLogger<ConcelierDataSource>.Instance);
_repository = new AdvisoryLinksetCacheRepository(dataSource, NullLogger<AdvisoryLinksetCacheRepository>.Instance);
}
public Task InitializeAsync() => _fixture.TruncateAllTablesAsync();
public Task DisposeAsync() => Task.CompletedTask;
[Fact]
public async Task Upsert_NormalizesTenantAndReplaces()
{
var createdAt = DateTimeOffset.Parse("2025-11-20T12:00:00Z");
var initial = BuildLinkset("Tenant-A", "ghsa", "GHSA-1", new[] { "obs-1" }, createdAt, confidence: 0.5);
var replacement = BuildLinkset("tenant-a", "ghsa", "GHSA-1", new[] { "obs-2" }, createdAt.AddMinutes(5), confidence: 0.9);
await _repository.UpsertAsync(initial, CancellationToken.None);
await _repository.UpsertAsync(replacement, CancellationToken.None);
var results = await _repository.FindByTenantAsync("TENANT-A", null, null, cursor: null, limit: 10, CancellationToken.None);
results.Should().ContainSingle();
results[0].TenantId.Should().Be("tenant-a");
results[0].ObservationIds.Should().ContainSingle("obs-2");
results[0].Confidence.Should().Be(0.9);
results[0].CreatedAt.Should().Be(replacement.CreatedAt);
}
[Fact]
public async Task FindByTenantAsync_OrdersAndPages()
{
var now = DateTimeOffset.UtcNow;
var linksets = new[]
{
BuildLinkset("tenant", "src", "ADV-002", new[] { "obs-1" }, now, confidence: null),
BuildLinkset("tenant", "src", "ADV-001", new[] { "obs-2" }, now, confidence: 0.7),
BuildLinkset("tenant", "src", "ADV-003", new[] { "obs-3" }, now.AddMinutes(-10), confidence: 0.2)
};
foreach (var linkset in linksets)
{
await _repository.UpsertAsync(linkset, CancellationToken.None);
}
var firstPage = await _repository.FindByTenantAsync("tenant", null, null, cursor: null, limit: 10, CancellationToken.None);
firstPage.Select(ls => ls.AdvisoryId).Should().ContainInOrder("ADV-001", "ADV-002", "ADV-003");
var cursor = new AdvisoryLinksetCursor(firstPage[1].CreatedAt, firstPage[1].AdvisoryId);
var secondPage = await _repository.FindByTenantAsync("tenant", null, null, cursor, limit: 10, CancellationToken.None);
secondPage.Should().ContainSingle();
secondPage[0].AdvisoryId.Should().Be("ADV-003");
}
[Fact]
public async Task RoundTrip_PersistsNormalizedConflictsAndProvenance()
{
var normalized = new AdvisoryLinksetNormalized(
Purls: new[] { "pkg:npm/foo@1.0.0" },
Cpes: new[] { "cpe:2.3:a:foo:bar:1.0:*:*:*:*:*:*:*" },
Versions: new[] { "1.0.0" },
Ranges: new[]
{
new Dictionary<string, object?> { ["type"] = "semver", ["introduced"] = "1.0.0", ["fixed"] = "2.0.0" }
},
Severities: new[]
{
new Dictionary<string, object?> { ["system"] = "cvssv3", ["score"] = 7.5 }
});
var conflicts = new List<AdvisoryLinksetConflict>
{
new("severity", "disagree", new[] { "7.5", "9.8" }, new[] { "nvd", "vendor" })
};
var provenance = new AdvisoryLinksetProvenance(
ObservationHashes: new[] { "h1", "h2" },
ToolVersion: "lnm-1.0",
PolicyHash: "sha256:abc");
var linkset = new AdvisoryLinkset(
TenantId: "tenant-x",
Source: "ghsa",
AdvisoryId: "GHSA-9999",
ObservationIds: ImmutableArray.Create("obs-100"),
Normalized: normalized,
Provenance: provenance,
Confidence: 0.66,
Conflicts: conflicts,
CreatedAt: DateTimeOffset.Parse("2025-11-20T00:00:00Z"),
BuiltByJobId: "job-42");
await _repository.UpsertAsync(linkset, CancellationToken.None);
var results = await _repository.FindByTenantAsync("tenant-x", new[] { "GHSA-9999" }, null, cursor: null, limit: 1, CancellationToken.None);
results.Should().ContainSingle();
var cached = results[0];
cached.Normalized.Should().NotBeNull();
cached.Normalized!.Purls.Should().ContainSingle("pkg:npm/foo@1.0.0");
cached.Normalized.Ranges!.Single()["type"].Should().Be("semver");
cached.Conflicts.Should().ContainSingle(c => c.Field == "severity" && c.Values!.Contains("9.8"));
cached.Provenance!.ObservationHashes.Should().BeEquivalentTo(new[] { "h1", "h2" });
cached.BuiltByJobId.Should().Be("job-42");
cached.Confidence.Should().Be(0.66);
}
private static AdvisoryLinkset BuildLinkset(
string tenant,
string source,
string advisoryId,
IEnumerable<string> observations,
DateTimeOffset createdAt,
double? confidence)
=> new(
TenantId: tenant,
Source: source,
AdvisoryId: advisoryId,
ObservationIds: observations.ToImmutableArray(),
Normalized: null,
Provenance: null,
Confidence: confidence,
Conflicts: null,
CreatedAt: createdAt,
BuiltByJobId: "job-1");
}