tests fixes and some product advisories tunes ups

This commit is contained in:
master
2026-01-30 07:57:43 +02:00
parent 644887997c
commit 55744f6a39
345 changed files with 26290 additions and 2267 deletions

View File

@@ -0,0 +1,95 @@
-- -----------------------------------------------------------------------------
-- Migration: 20260129_001_create_identity_watchlist
-- Sprint: SPRINT_0129_001_ATTESTOR_identity_watchlist_alerting
-- Task: WATCH-004
-- Description: Creates identity watchlist and alert deduplication tables.
-- -----------------------------------------------------------------------------
-- Watchlist entries table
CREATE TABLE IF NOT EXISTS attestor.identity_watchlist (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id TEXT NOT NULL,
scope TEXT NOT NULL DEFAULT 'Tenant',
display_name TEXT NOT NULL,
description TEXT,
-- Identity matching fields (at least one required)
issuer TEXT,
subject_alternative_name TEXT,
key_id TEXT,
match_mode TEXT NOT NULL DEFAULT 'Exact',
-- Alert configuration
severity TEXT NOT NULL DEFAULT 'Warning',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
channel_overrides JSONB,
suppress_duplicates_minutes INT NOT NULL DEFAULT 60,
-- Metadata
tags TEXT[],
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by TEXT NOT NULL,
updated_by TEXT NOT NULL,
-- Constraints
CONSTRAINT chk_at_least_one_identity CHECK (
issuer IS NOT NULL OR
subject_alternative_name IS NOT NULL OR
key_id IS NOT NULL
),
CONSTRAINT chk_scope_valid CHECK (scope IN ('Tenant', 'Global', 'System')),
CONSTRAINT chk_match_mode_valid CHECK (match_mode IN ('Exact', 'Prefix', 'Glob', 'Regex')),
CONSTRAINT chk_severity_valid CHECK (severity IN ('Info', 'Warning', 'Critical')),
CONSTRAINT chk_suppress_duplicates_positive CHECK (suppress_duplicates_minutes >= 1)
);
-- Performance indexes for active entry lookup
CREATE INDEX IF NOT EXISTS idx_watchlist_tenant_enabled
ON attestor.identity_watchlist(tenant_id)
WHERE enabled = TRUE;
CREATE INDEX IF NOT EXISTS idx_watchlist_scope_enabled
ON attestor.identity_watchlist(scope)
WHERE enabled = TRUE;
CREATE INDEX IF NOT EXISTS idx_watchlist_issuer
ON attestor.identity_watchlist(issuer)
WHERE enabled = TRUE AND issuer IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_watchlist_san
ON attestor.identity_watchlist(subject_alternative_name)
WHERE enabled = TRUE AND subject_alternative_name IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_watchlist_keyid
ON attestor.identity_watchlist(key_id)
WHERE enabled = TRUE AND key_id IS NOT NULL;
-- Alert deduplication table
CREATE TABLE IF NOT EXISTS attestor.identity_alert_dedup (
watchlist_id UUID NOT NULL,
identity_hash TEXT NOT NULL,
last_alert_at TIMESTAMPTZ NOT NULL,
alert_count INT NOT NULL DEFAULT 0,
PRIMARY KEY (watchlist_id, identity_hash)
);
-- Index for cleanup
CREATE INDEX IF NOT EXISTS idx_alert_dedup_last_alert
ON attestor.identity_alert_dedup(last_alert_at);
-- Comment documentation
COMMENT ON TABLE attestor.identity_watchlist IS
'Watchlist entries for monitoring signing identity appearances in transparency logs.';
COMMENT ON COLUMN attestor.identity_watchlist.scope IS
'Visibility scope: Tenant (owning tenant only), Global (all tenants), System (read-only).';
COMMENT ON COLUMN attestor.identity_watchlist.match_mode IS
'Pattern matching mode: Exact, Prefix, Glob, or Regex.';
COMMENT ON COLUMN attestor.identity_watchlist.suppress_duplicates_minutes IS
'Deduplication window in minutes. Alerts for same identity within window are suppressed.';
COMMENT ON TABLE attestor.identity_alert_dedup IS
'Tracks alert deduplication state to prevent alert storms.';

View File

@@ -0,0 +1,414 @@
// -----------------------------------------------------------------------------
// PostgresWatchlistRepository.cs
// Sprint: SPRINT_0129_001_ATTESTOR_identity_watchlist_alerting
// Task: WATCH-004
// Description: PostgreSQL implementation of watchlist repository.
// -----------------------------------------------------------------------------
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Attestor.Watchlist.Models;
using StellaOps.Attestor.Watchlist.Storage;
namespace StellaOps.Attestor.Infrastructure.Watchlist;
/// <summary>
/// PostgreSQL implementation of the watchlist repository with caching.
/// </summary>
public sealed class PostgresWatchlistRepository : IWatchlistRepository
{
private readonly NpgsqlDataSource _dataSource;
private readonly ILogger<PostgresWatchlistRepository> _logger;
private readonly ConcurrentDictionary<string, CachedEntries> _cache = new();
private readonly TimeSpan _cacheTimeout = TimeSpan.FromSeconds(5);
public PostgresWatchlistRepository(
NpgsqlDataSource dataSource,
ILogger<PostgresWatchlistRepository> logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc />
public async Task<WatchedIdentity?> GetAsync(Guid id, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT id, tenant_id, scope, display_name, description,
issuer, subject_alternative_name, key_id, match_mode,
severity, enabled, channel_overrides, suppress_duplicates_minutes,
tags, created_at, updated_at, created_by, updated_by
FROM attestor.identity_watchlist
WHERE id = @id
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("id", id);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
if (await reader.ReadAsync(cancellationToken))
{
return MapToEntry(reader);
}
return null;
}
/// <inheritdoc />
public async Task<IReadOnlyList<WatchedIdentity>> ListAsync(
string tenantId,
bool includeGlobal = true,
CancellationToken cancellationToken = default)
{
var sql = includeGlobal
? """
SELECT id, tenant_id, scope, display_name, description,
issuer, subject_alternative_name, key_id, match_mode,
severity, enabled, channel_overrides, suppress_duplicates_minutes,
tags, created_at, updated_at, created_by, updated_by
FROM attestor.identity_watchlist
WHERE tenant_id = @tenantId OR scope IN ('Global', 'System')
ORDER BY display_name
"""
: """
SELECT id, tenant_id, scope, display_name, description,
issuer, subject_alternative_name, key_id, match_mode,
severity, enabled, channel_overrides, suppress_duplicates_minutes,
tags, created_at, updated_at, created_by, updated_by
FROM attestor.identity_watchlist
WHERE tenant_id = @tenantId
ORDER BY display_name
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("tenantId", tenantId);
var entries = new List<WatchedIdentity>();
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
entries.Add(MapToEntry(reader));
}
return entries;
}
/// <inheritdoc />
public async Task<IReadOnlyList<WatchedIdentity>> GetActiveForMatchingAsync(
string tenantId,
CancellationToken cancellationToken = default)
{
// Check cache first
if (_cache.TryGetValue(tenantId, out var cached) &&
cached.ExpiresAt > DateTimeOffset.UtcNow)
{
return cached.Entries;
}
const string sql = """
SELECT id, tenant_id, scope, display_name, description,
issuer, subject_alternative_name, key_id, match_mode,
severity, enabled, channel_overrides, suppress_duplicates_minutes,
tags, created_at, updated_at, created_by, updated_by
FROM attestor.identity_watchlist
WHERE enabled = TRUE
AND (tenant_id = @tenantId OR scope IN ('Global', 'System'))
ORDER BY id
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("tenantId", tenantId);
var entries = new List<WatchedIdentity>();
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
entries.Add(MapToEntry(reader));
}
// Update cache
_cache[tenantId] = new CachedEntries(entries, DateTimeOffset.UtcNow.Add(_cacheTimeout));
return entries;
}
/// <inheritdoc />
public async Task<WatchedIdentity> UpsertAsync(
WatchedIdentity entry,
CancellationToken cancellationToken = default)
{
const string sql = """
INSERT INTO attestor.identity_watchlist (
id, tenant_id, scope, display_name, description,
issuer, subject_alternative_name, key_id, match_mode,
severity, enabled, channel_overrides, suppress_duplicates_minutes,
tags, created_at, updated_at, created_by, updated_by
) VALUES (
@id, @tenantId, @scope, @displayName, @description,
@issuer, @san, @keyId, @matchMode,
@severity, @enabled, @channelOverrides, @suppressMinutes,
@tags, @createdAt, @updatedAt, @createdBy, @updatedBy
)
ON CONFLICT (id) DO UPDATE SET
display_name = EXCLUDED.display_name,
description = EXCLUDED.description,
issuer = EXCLUDED.issuer,
subject_alternative_name = EXCLUDED.subject_alternative_name,
key_id = EXCLUDED.key_id,
match_mode = EXCLUDED.match_mode,
severity = EXCLUDED.severity,
enabled = EXCLUDED.enabled,
channel_overrides = EXCLUDED.channel_overrides,
suppress_duplicates_minutes = EXCLUDED.suppress_duplicates_minutes,
tags = EXCLUDED.tags,
updated_at = EXCLUDED.updated_at,
updated_by = EXCLUDED.updated_by
RETURNING id, tenant_id, scope, display_name, description,
issuer, subject_alternative_name, key_id, match_mode,
severity, enabled, channel_overrides, suppress_duplicates_minutes,
tags, created_at, updated_at, created_by, updated_by
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("id", entry.Id);
cmd.Parameters.AddWithValue("tenantId", entry.TenantId);
cmd.Parameters.AddWithValue("scope", entry.Scope.ToString());
cmd.Parameters.AddWithValue("displayName", entry.DisplayName);
cmd.Parameters.AddWithValue("description", (object?)entry.Description ?? DBNull.Value);
cmd.Parameters.AddWithValue("issuer", (object?)entry.Issuer ?? DBNull.Value);
cmd.Parameters.AddWithValue("san", (object?)entry.SubjectAlternativeName ?? DBNull.Value);
cmd.Parameters.AddWithValue("keyId", (object?)entry.KeyId ?? DBNull.Value);
cmd.Parameters.AddWithValue("matchMode", entry.MatchMode.ToString());
cmd.Parameters.AddWithValue("severity", entry.Severity.ToString());
cmd.Parameters.AddWithValue("enabled", entry.Enabled);
cmd.Parameters.AddWithValue("channelOverrides",
NpgsqlDbType.Jsonb,
entry.ChannelOverrides is { Count: > 0 }
? System.Text.Json.JsonSerializer.Serialize(entry.ChannelOverrides)
: DBNull.Value);
cmd.Parameters.AddWithValue("suppressMinutes", entry.SuppressDuplicatesMinutes);
cmd.Parameters.AddWithValue("tags",
NpgsqlDbType.Array | NpgsqlDbType.Text,
entry.Tags is { Count: > 0 } ? entry.Tags.ToArray() : Array.Empty<string>());
cmd.Parameters.AddWithValue("createdAt", entry.CreatedAt);
cmd.Parameters.AddWithValue("updatedAt", entry.UpdatedAt);
cmd.Parameters.AddWithValue("createdBy", entry.CreatedBy);
cmd.Parameters.AddWithValue("updatedBy", entry.UpdatedBy);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
if (await reader.ReadAsync(cancellationToken))
{
// Invalidate cache
InvalidateCache(entry.TenantId);
return MapToEntry(reader);
}
throw new InvalidOperationException("Upsert did not return the expected row");
}
/// <inheritdoc />
public async Task<bool> DeleteAsync(
Guid id,
string tenantId,
CancellationToken cancellationToken = default)
{
const string sql = """
DELETE FROM attestor.identity_watchlist
WHERE id = @id AND (tenant_id = @tenantId OR @tenantId = 'system-admin')
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("id", id);
cmd.Parameters.AddWithValue("tenantId", tenantId);
var deleted = await cmd.ExecuteNonQueryAsync(cancellationToken);
if (deleted > 0)
{
InvalidateCache(tenantId);
}
return deleted > 0;
}
/// <inheritdoc />
public async Task<int> GetCountAsync(string tenantId, CancellationToken cancellationToken = default)
{
const string sql = """
SELECT COUNT(*)
FROM attestor.identity_watchlist
WHERE tenant_id = @tenantId OR scope IN ('Global', 'System')
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("tenantId", tenantId);
var result = await cmd.ExecuteScalarAsync(cancellationToken);
return Convert.ToInt32(result);
}
private void InvalidateCache(string tenantId)
{
_cache.TryRemove(tenantId, out _);
}
private static WatchedIdentity MapToEntry(NpgsqlDataReader reader)
{
var channelOverridesJson = reader.IsDBNull(reader.GetOrdinal("channel_overrides"))
? null
: reader.GetString(reader.GetOrdinal("channel_overrides"));
IReadOnlyList<string>? channelOverrides = null;
if (!string.IsNullOrEmpty(channelOverridesJson))
{
channelOverrides = System.Text.Json.JsonSerializer.Deserialize<List<string>>(channelOverridesJson);
}
var tagsOrdinal = reader.GetOrdinal("tags");
IReadOnlyList<string>? tags = reader.IsDBNull(tagsOrdinal)
? null
: (string[])reader.GetValue(tagsOrdinal);
return new WatchedIdentity
{
Id = reader.GetGuid(reader.GetOrdinal("id")),
TenantId = reader.GetString(reader.GetOrdinal("tenant_id")),
Scope = Enum.Parse<WatchlistScope>(reader.GetString(reader.GetOrdinal("scope"))),
DisplayName = reader.GetString(reader.GetOrdinal("display_name")),
Description = reader.IsDBNull(reader.GetOrdinal("description"))
? null
: reader.GetString(reader.GetOrdinal("description")),
Issuer = reader.IsDBNull(reader.GetOrdinal("issuer"))
? null
: reader.GetString(reader.GetOrdinal("issuer")),
SubjectAlternativeName = reader.IsDBNull(reader.GetOrdinal("subject_alternative_name"))
? null
: reader.GetString(reader.GetOrdinal("subject_alternative_name")),
KeyId = reader.IsDBNull(reader.GetOrdinal("key_id"))
? null
: reader.GetString(reader.GetOrdinal("key_id")),
MatchMode = Enum.Parse<WatchlistMatchMode>(reader.GetString(reader.GetOrdinal("match_mode"))),
Severity = Enum.Parse<IdentityAlertSeverity>(reader.GetString(reader.GetOrdinal("severity"))),
Enabled = reader.GetBoolean(reader.GetOrdinal("enabled")),
ChannelOverrides = channelOverrides,
SuppressDuplicatesMinutes = reader.GetInt32(reader.GetOrdinal("suppress_duplicates_minutes")),
Tags = tags,
CreatedAt = reader.GetDateTime(reader.GetOrdinal("created_at")),
UpdatedAt = reader.GetDateTime(reader.GetOrdinal("updated_at")),
CreatedBy = reader.GetString(reader.GetOrdinal("created_by")),
UpdatedBy = reader.GetString(reader.GetOrdinal("updated_by"))
};
}
private sealed record CachedEntries(IReadOnlyList<WatchedIdentity> Entries, DateTimeOffset ExpiresAt);
}
/// <summary>
/// PostgreSQL implementation of the alert dedup repository.
/// </summary>
public sealed class PostgresAlertDedupRepository : IAlertDedupRepository
{
private readonly NpgsqlDataSource _dataSource;
public PostgresAlertDedupRepository(NpgsqlDataSource dataSource)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
}
/// <inheritdoc />
public async Task<AlertDedupStatus> CheckAndUpdateAsync(
Guid watchlistId,
string identityHash,
int dedupWindowMinutes,
CancellationToken cancellationToken = default)
{
var windowStart = DateTimeOffset.UtcNow.AddMinutes(-dedupWindowMinutes);
var windowEnd = DateTimeOffset.UtcNow.AddMinutes(dedupWindowMinutes);
const string sql = """
INSERT INTO attestor.identity_alert_dedup (watchlist_id, identity_hash, last_alert_at, alert_count)
VALUES (@watchlistId, @identityHash, @now, 1)
ON CONFLICT (watchlist_id, identity_hash) DO UPDATE SET
alert_count = CASE
WHEN attestor.identity_alert_dedup.last_alert_at < @windowStart
THEN 1
ELSE attestor.identity_alert_dedup.alert_count + 1
END,
last_alert_at = CASE
WHEN attestor.identity_alert_dedup.last_alert_at < @windowStart
THEN @now
ELSE attestor.identity_alert_dedup.last_alert_at
END
RETURNING
CASE WHEN last_alert_at < @now THEN FALSE ELSE TRUE END as should_suppress,
alert_count,
last_alert_at + INTERVAL '1 minute' * @dedupMinutes as window_expires
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("watchlistId", watchlistId);
cmd.Parameters.AddWithValue("identityHash", identityHash);
cmd.Parameters.AddWithValue("now", DateTimeOffset.UtcNow);
cmd.Parameters.AddWithValue("windowStart", windowStart);
cmd.Parameters.AddWithValue("dedupMinutes", dedupWindowMinutes);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
if (await reader.ReadAsync(cancellationToken))
{
var shouldSuppress = reader.GetBoolean(0);
var alertCount = reader.GetInt32(1);
var windowExpires = reader.GetDateTime(2);
return shouldSuppress
? AlertDedupStatus.Suppress(alertCount, windowExpires)
: AlertDedupStatus.Send(alertCount - 1);
}
return AlertDedupStatus.Send();
}
/// <inheritdoc />
public async Task<int> GetSuppressedCountAsync(
Guid watchlistId,
string identityHash,
CancellationToken cancellationToken = default)
{
const string sql = """
SELECT alert_count FROM attestor.identity_alert_dedup
WHERE watchlist_id = @watchlistId AND identity_hash = @identityHash
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
cmd.Parameters.AddWithValue("watchlistId", watchlistId);
cmd.Parameters.AddWithValue("identityHash", identityHash);
var result = await cmd.ExecuteScalarAsync(cancellationToken);
return result is null ? 0 : Convert.ToInt32(result);
}
/// <inheritdoc />
public async Task<int> CleanupExpiredAsync(CancellationToken cancellationToken = default)
{
// Clean up records older than 7 days
const string sql = """
DELETE FROM attestor.identity_alert_dedup
WHERE last_alert_at < NOW() - INTERVAL '7 days'
""";
await using var conn = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var cmd = new NpgsqlCommand(sql, conn);
return await cmd.ExecuteNonQueryAsync(cancellationToken);
}
}