using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using NpgsqlTypes;
using StellaOps.AdvisoryAI.KnowledgeSearch;
using System.Text.Json;
using System.Diagnostics;
using System.Globalization;
using System.Linq;

namespace StellaOps.AdvisoryAI.UnifiedSearch;

/// <summary>
/// Writes the chunks produced by the registered unified-search adapters into
/// <c>advisoryai.kb_chunk</c> (and their parent rows into <c>advisoryai.kb_doc</c>).
/// Adapters are processed grouped by their <c>Domain</c> (case-insensitive), domains in
/// ordinal-ignore-case order.
/// </summary>
internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
{
    // Upsert statement used when kb_chunk has the pgvector column "embedding_vec".
    // The ON CONFLICT ... WHERE ... IS DISTINCT FROM guard skips rows whose stored values are identical,
    // so ExecuteNonQuery reports 0 for unchanged chunks.
    private const string UpsertChunkWithVectorSql = """
        INSERT INTO advisoryai.kb_chunk
        (
            chunk_id, doc_id, kind, anchor, section_path,
            span_start, span_end, title, body,
            body_tsv, body_tsv_en, body_tsv_de, body_tsv_fr, body_tsv_es, body_tsv_ru,
            embedding, embedding_vec, metadata, domain, entity_key, entity_type, freshness, indexed_at
        )
        VALUES
        (
            @chunk_id, @doc_id, @kind, @anchor, @section_path,
            @span_start, @span_end, @title, @body,
            setweight(to_tsvector('simple', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('simple', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('simple', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('english', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('english', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('english', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('german', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('german', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('german', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('french', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('french', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('french', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('spanish', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('spanish', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('spanish', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('russian', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('russian', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('russian', coalesce(@body, '')), 'D'),
            @embedding,
            CAST(@embedding_vector AS vector),
            @metadata::jsonb,
            @domain,
            @entity_key,
            @entity_type,
            @freshness,
            NOW()
        )
        ON CONFLICT (chunk_id) DO UPDATE SET
            doc_id = EXCLUDED.doc_id,
            kind = EXCLUDED.kind,
            anchor = EXCLUDED.anchor,
            section_path = EXCLUDED.section_path,
            span_start = EXCLUDED.span_start,
            span_end = EXCLUDED.span_end,
            title = EXCLUDED.title,
            body = EXCLUDED.body,
            body_tsv = EXCLUDED.body_tsv,
            body_tsv_en = EXCLUDED.body_tsv_en,
            body_tsv_de = EXCLUDED.body_tsv_de,
            body_tsv_fr = EXCLUDED.body_tsv_fr,
            body_tsv_es = EXCLUDED.body_tsv_es,
            body_tsv_ru = EXCLUDED.body_tsv_ru,
            embedding = EXCLUDED.embedding,
            embedding_vec = EXCLUDED.embedding_vec,
            metadata = EXCLUDED.metadata,
            domain = EXCLUDED.domain,
            entity_key = EXCLUDED.entity_key,
            entity_type = EXCLUDED.entity_type,
            freshness = EXCLUDED.freshness,
            indexed_at = NOW()
        WHERE advisoryai.kb_chunk.doc_id IS DISTINCT FROM EXCLUDED.doc_id
           OR advisoryai.kb_chunk.kind IS DISTINCT FROM EXCLUDED.kind
           OR advisoryai.kb_chunk.anchor IS DISTINCT FROM EXCLUDED.anchor
           OR advisoryai.kb_chunk.section_path IS DISTINCT FROM EXCLUDED.section_path
           OR advisoryai.kb_chunk.span_start IS DISTINCT FROM EXCLUDED.span_start
           OR advisoryai.kb_chunk.span_end IS DISTINCT FROM EXCLUDED.span_end
           OR advisoryai.kb_chunk.title IS DISTINCT FROM EXCLUDED.title
           OR advisoryai.kb_chunk.body IS DISTINCT FROM EXCLUDED.body
           OR advisoryai.kb_chunk.body_tsv IS DISTINCT FROM EXCLUDED.body_tsv
           OR advisoryai.kb_chunk.body_tsv_en IS DISTINCT FROM EXCLUDED.body_tsv_en
           OR advisoryai.kb_chunk.body_tsv_de IS DISTINCT FROM EXCLUDED.body_tsv_de
           OR advisoryai.kb_chunk.body_tsv_fr IS DISTINCT FROM EXCLUDED.body_tsv_fr
           OR advisoryai.kb_chunk.body_tsv_es IS DISTINCT FROM EXCLUDED.body_tsv_es
           OR advisoryai.kb_chunk.body_tsv_ru IS DISTINCT FROM EXCLUDED.body_tsv_ru
           OR advisoryai.kb_chunk.embedding IS DISTINCT FROM EXCLUDED.embedding
           OR advisoryai.kb_chunk.embedding_vec IS DISTINCT FROM EXCLUDED.embedding_vec
           OR advisoryai.kb_chunk.metadata IS DISTINCT FROM EXCLUDED.metadata
           OR advisoryai.kb_chunk.domain IS DISTINCT FROM EXCLUDED.domain
           OR advisoryai.kb_chunk.entity_key IS DISTINCT FROM EXCLUDED.entity_key
           OR advisoryai.kb_chunk.entity_type IS DISTINCT FROM EXCLUDED.entity_type
           OR advisoryai.kb_chunk.freshness IS DISTINCT FROM EXCLUDED.freshness;
        """;

    // Same upsert as UpsertChunkWithVectorSql, for schemas without the "embedding_vec" column.
    private const string UpsertChunkSql = """
        INSERT INTO advisoryai.kb_chunk
        (
            chunk_id, doc_id, kind, anchor, section_path,
            span_start, span_end, title, body,
            body_tsv, body_tsv_en, body_tsv_de, body_tsv_fr, body_tsv_es, body_tsv_ru,
            embedding, metadata, domain, entity_key, entity_type, freshness, indexed_at
        )
        VALUES
        (
            @chunk_id, @doc_id, @kind, @anchor, @section_path,
            @span_start, @span_end, @title, @body,
            setweight(to_tsvector('simple', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('simple', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('simple', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('english', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('english', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('english', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('german', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('german', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('german', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('french', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('french', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('french', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('spanish', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('spanish', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('spanish', coalesce(@body, '')), 'D'),
            setweight(to_tsvector('russian', coalesce(@title, '')), 'A') ||
                setweight(to_tsvector('russian', coalesce(@section_path, '')), 'B') ||
                setweight(to_tsvector('russian', coalesce(@body, '')), 'D'),
            @embedding,
            @metadata::jsonb,
            @domain,
            @entity_key,
            @entity_type,
            @freshness,
            NOW()
        )
        ON CONFLICT (chunk_id) DO UPDATE SET
            doc_id = EXCLUDED.doc_id,
            kind = EXCLUDED.kind,
            anchor = EXCLUDED.anchor,
            section_path = EXCLUDED.section_path,
            span_start = EXCLUDED.span_start,
            span_end = EXCLUDED.span_end,
            title = EXCLUDED.title,
            body = EXCLUDED.body,
            body_tsv = EXCLUDED.body_tsv,
            body_tsv_en = EXCLUDED.body_tsv_en,
            body_tsv_de = EXCLUDED.body_tsv_de,
            body_tsv_fr = EXCLUDED.body_tsv_fr,
            body_tsv_es = EXCLUDED.body_tsv_es,
            body_tsv_ru = EXCLUDED.body_tsv_ru,
            embedding = EXCLUDED.embedding,
            metadata = EXCLUDED.metadata,
            domain = EXCLUDED.domain,
            entity_key = EXCLUDED.entity_key,
            entity_type = EXCLUDED.entity_type,
            freshness = EXCLUDED.freshness,
            indexed_at = NOW()
        WHERE advisoryai.kb_chunk.doc_id IS DISTINCT FROM EXCLUDED.doc_id
           OR advisoryai.kb_chunk.kind IS DISTINCT FROM EXCLUDED.kind
           OR advisoryai.kb_chunk.anchor IS DISTINCT FROM EXCLUDED.anchor
           OR advisoryai.kb_chunk.section_path IS DISTINCT FROM EXCLUDED.section_path
           OR advisoryai.kb_chunk.span_start IS DISTINCT FROM EXCLUDED.span_start
           OR advisoryai.kb_chunk.span_end IS DISTINCT FROM EXCLUDED.span_end
           OR advisoryai.kb_chunk.title IS DISTINCT FROM EXCLUDED.title
           OR advisoryai.kb_chunk.body IS DISTINCT FROM EXCLUDED.body
           OR advisoryai.kb_chunk.body_tsv IS DISTINCT FROM EXCLUDED.body_tsv
           OR advisoryai.kb_chunk.body_tsv_en IS DISTINCT FROM EXCLUDED.body_tsv_en
           OR advisoryai.kb_chunk.body_tsv_de IS DISTINCT FROM EXCLUDED.body_tsv_de
           OR advisoryai.kb_chunk.body_tsv_fr IS DISTINCT FROM EXCLUDED.body_tsv_fr
           OR advisoryai.kb_chunk.body_tsv_es IS DISTINCT FROM EXCLUDED.body_tsv_es
           OR advisoryai.kb_chunk.body_tsv_ru IS DISTINCT FROM EXCLUDED.body_tsv_ru
           OR advisoryai.kb_chunk.embedding IS DISTINCT FROM EXCLUDED.embedding
           OR advisoryai.kb_chunk.metadata IS DISTINCT FROM EXCLUDED.metadata
           OR advisoryai.kb_chunk.domain IS DISTINCT FROM EXCLUDED.domain
           OR advisoryai.kb_chunk.entity_key IS DISTINCT FROM EXCLUDED.entity_key
           OR advisoryai.kb_chunk.entity_type IS DISTINCT FROM EXCLUDED.entity_type
           OR advisoryai.kb_chunk.freshness IS DISTINCT FROM EXCLUDED.freshness;
        """;

    private readonly KnowledgeSearchOptions _options;
    private readonly IKnowledgeSearchStore _store;
    private readonly IEnumerable _adapters;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the indexer. A null <c>options.Value</c> falls back to a default
    /// <see cref="KnowledgeSearchOptions"/> instance.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public UnifiedSearchIndexer(
        IOptions options,
        IKnowledgeSearchStore store,
        IEnumerable adapters,
        ILogger logger)
    {
        ArgumentNullException.ThrowIfNull(options);
        _options = options.Value ?? new KnowledgeSearchOptions();
        _store = store ?? throw new ArgumentNullException(nameof(store));
        _adapters = adapters ?? throw new ArgumentNullException(nameof(adapters));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Runs an incremental refresh of all domains and discards the summary.</summary>
    public async Task IndexAllAsync(CancellationToken cancellationToken)
    {
        await IndexAllWithSummaryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Incrementally refreshes every adapter domain. Chunks produced in this cycle are upserted.
    /// Chunks of the domain that were not produced are pruned, but only when every adapter of the
    /// domain succeeded. If an adapter failed, its previously indexed chunks are kept rather than
    /// deleted. Domains whose adapters all failed are skipped entirely.
    /// </summary>
    /// <returns>
    /// The number of processed domains, seen (deduplicated) chunks and total duration; an all-zero
    /// summary when indexing is disabled or no connection string is configured.
    /// </returns>
    internal async Task IndexAllWithSummaryAsync(CancellationToken cancellationToken)
    {
        if (!_options.Enabled || string.IsNullOrWhiteSpace(_options.ConnectionString))
        {
            _logger.LogDebug("Unified search indexing skipped because configuration is incomplete.");
            return new UnifiedSearchIndexSummary(0, 0, 0);
        }

        await _store.EnsureSchemaAsync(cancellationToken).ConfigureAwait(false);

        // One data source (one connection pool) for the whole run instead of one per statement.
        await using var dataSource = CreateDataSource();

        var stopwatch = Stopwatch.StartNew();
        var domains = 0;
        var chunks = 0;
        var changed = 0;
        var removed = 0;

        foreach (var domainGroup in _adapters
            .GroupBy(static adapter => adapter.Domain, StringComparer.OrdinalIgnoreCase)
            .OrderBy(static group => group.Key, StringComparer.OrdinalIgnoreCase))
        {
            cancellationToken.ThrowIfCancellationRequested();

            var domainStopwatch = Stopwatch.StartNew();
            var domain = domainGroup.Key;
            var domainChunks = new List();
            var succeededAdapters = 0;
            var failedAdapters = 0;

            foreach (var adapter in domainGroup)
            {
                try
                {
                    _logger.LogInformation("Unified search indexing adapter '{Adapter}' for domain '{Domain}'.", adapter.GetType().Name, domain);
                    var adapterChunks = await adapter.ProduceChunksAsync(cancellationToken).ConfigureAwait(false);
                    domainChunks.AddRange(adapterChunks);
                    succeededAdapters++;
                }
                catch (Exception ex) when (!IsCancellationOf(ex, cancellationToken))
                {
                    // Cancellation of the run itself propagates; every other adapter error is tolerated.
                    failedAdapters++;
                    _logger.LogWarning(ex, "Failed to index adapter '{Adapter}' for domain '{Domain}'; continuing with other adapters in this domain.", adapter.GetType().Name, domain);
                }
            }

            if (succeededAdapters == 0)
            {
                _logger.LogWarning(
                    "Unified search skipped domain '{Domain}' because all adapters failed in this refresh cycle.",
                    domain);
                continue;
            }

            var deduplicated = DeduplicateChunks(domainChunks);

            await using var connection = await dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

            var changedForDomain = 0;
            if (deduplicated.Count > 0)
            {
                changedForDomain = await UpsertChunksAsync(connection, deduplicated, cancellationToken).ConfigureAwait(false);
            }

            var removedForDomain = 0;
            if (failedAdapters == 0)
            {
                removedForDomain = await DeleteMissingChunksByDomainAsync(
                        connection,
                        domain,
                        deduplicated.Select(static chunk => chunk.ChunkId).ToArray(),
                        cancellationToken)
                    .ConfigureAwait(false);
            }
            else
            {
                // The current chunk set is incomplete; pruning against it would delete the failed
                // adapter(s)' previously indexed chunks.
                _logger.LogWarning(
                    "Unified search kept existing chunks of domain '{Domain}' unpruned because {FailedAdapterCount} adapter(s) failed in this refresh cycle.",
                    domain,
                    failedAdapters);
            }

            domainStopwatch.Stop();
            domains++;
            chunks += deduplicated.Count;
            changed += changedForDomain;
            removed += removedForDomain;

            _logger.LogInformation(
                "Unified search refresh domain '{Domain}' completed: seen_chunks={SeenChunkCount}, changed_chunks={ChangedChunkCount}, removed={RemovedCount}, duration_ms={DurationMs}",
                domain,
                deduplicated.Count,
                changedForDomain,
                removedForDomain,
                (long)domainStopwatch.Elapsed.TotalMilliseconds);
        }

        stopwatch.Stop();
        _logger.LogInformation(
            "Unified search incremental indexing completed: domains={DomainCount}, seen_chunks={SeenChunkCount}, changed_chunks={ChangedChunkCount}, removed={RemovedCount}, duration_ms={DurationMs}",
            domains,
            chunks,
            changed,
            removed,
            (long)stopwatch.Elapsed.TotalMilliseconds);

        return new UnifiedSearchIndexSummary(domains, chunks, (long)stopwatch.Elapsed.TotalMilliseconds);
    }

    /// <summary>
    /// Rebuilds every adapter domain. When all adapters of a domain succeed, its existing chunks are
    /// deleted and the freshly produced chunks inserted. Both steps run in one transaction, so a
    /// failure mid-way leaves the previous content in place. When some adapters fail, the domain is
    /// not cleared and only the produced chunks are upserted. When all adapters fail, the domain is
    /// skipped.
    /// </summary>
    /// <returns>
    /// The number of rebuilt domains, written (deduplicated) chunks and total duration; an all-zero
    /// summary when indexing is disabled or no connection string is configured.
    /// </returns>
    public async Task RebuildAllAsync(CancellationToken cancellationToken)
    {
        if (!_options.Enabled || string.IsNullOrWhiteSpace(_options.ConnectionString))
        {
            _logger.LogDebug("Unified search rebuild skipped because configuration is incomplete.");
            return new UnifiedSearchIndexSummary(0, 0, 0);
        }

        await _store.EnsureSchemaAsync(cancellationToken).ConfigureAwait(false);

        await using var dataSource = CreateDataSource();

        var stopwatch = Stopwatch.StartNew();
        var domains = 0;
        var chunks = 0;

        foreach (var domainGroup in _adapters
            .GroupBy(static adapter => adapter.Domain, StringComparer.OrdinalIgnoreCase)
            .OrderBy(static group => group.Key, StringComparer.OrdinalIgnoreCase))
        {
            cancellationToken.ThrowIfCancellationRequested();

            var domain = domainGroup.Key;
            var domainStopwatch = Stopwatch.StartNew();
            var domainChunks = new List();
            var succeededAdapters = 0;
            var failedAdapters = 0;

            foreach (var adapter in domainGroup)
            {
                try
                {
                    var adapterChunks = await adapter.ProduceChunksAsync(cancellationToken).ConfigureAwait(false);
                    domainChunks.AddRange(adapterChunks);
                    succeededAdapters++;
                }
                catch (Exception ex) when (!IsCancellationOf(ex, cancellationToken))
                {
                    failedAdapters++;
                    _logger.LogWarning(ex, "Failed to rebuild adapter '{Adapter}' for domain '{Domain}'; continuing with other adapters in this domain.", adapter.GetType().Name, domain);
                }
            }

            if (succeededAdapters == 0)
            {
                _logger.LogWarning(
                    "Unified search rebuild skipped domain '{Domain}' because all adapters failed.",
                    domain);
                continue;
            }

            var deduplicated = DeduplicateChunks(domainChunks);

            await using var connection = await dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

            // Commands created on this connection take part in the open transaction (Npgsql does not
            // require NpgsqlCommand.Transaction to be set). Disposing without commit rolls back.
            await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);

            if (failedAdapters == 0)
            {
                await DeleteDomainChunksAsync(connection, domain, cancellationToken).ConfigureAwait(false);
            }
            else
            {
                _logger.LogWarning(
                    "Unified search rebuild did not clear domain '{Domain}' because {FailedAdapterCount} adapter(s) failed; only produced chunks were upserted.",
                    domain,
                    failedAdapters);
            }

            if (deduplicated.Count > 0)
            {
                await UpsertChunksAsync(connection, deduplicated, cancellationToken).ConfigureAwait(false);
            }

            await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);

            domainStopwatch.Stop();
            domains++;
            chunks += deduplicated.Count;

            _logger.LogInformation(
                "Unified search rebuild domain '{Domain}' completed: chunks={ChunkCount}, duration_ms={DurationMs}",
                domain,
                deduplicated.Count,
                (long)domainStopwatch.Elapsed.TotalMilliseconds);
        }

        stopwatch.Stop();
        return new UnifiedSearchIndexSummary(domains, chunks, (long)stopwatch.Elapsed.TotalMilliseconds);
    }

    /// <summary>
    /// Deletes every <c>advisoryai.kb_chunk</c> row whose <c>domain</c> equals <paramref name="domain"/>.
    /// Does nothing when indexing is disabled or no connection string is configured.
    /// </summary>
    public async Task DeleteChunksByDomainAsync(string domain, CancellationToken cancellationToken)
    {
        if (!_options.Enabled || string.IsNullOrWhiteSpace(_options.ConnectionString))
        {
            return;
        }

        await using var dataSource = CreateDataSource();
        await using var connection = await dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
        await DeleteDomainChunksAsync(connection, domain, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Builds a data source (connection pool) for the configured connection string.</summary>
    private NpgsqlDataSource CreateDataSource()
        => new NpgsqlDataSourceBuilder(_options.ConnectionString).Build();

    /// <summary>
    /// True when <paramref name="exception"/> is the cancellation requested through
    /// <paramref name="cancellationToken"/>. Such exceptions must propagate instead of being treated
    /// as an adapter failure.
    /// </summary>
    private static bool IsCancellationOf(Exception exception, CancellationToken cancellationToken)
        => exception is OperationCanceledException && cancellationToken.IsCancellationRequested;

    /// <summary>Deletes all chunks of <paramref name="domain"/> using <paramref name="connection"/>.</summary>
    private static async Task DeleteDomainChunksAsync(
        NpgsqlConnection connection,
        string domain,
        CancellationToken cancellationToken)
    {
        const string sql = "DELETE FROM advisoryai.kb_chunk WHERE domain = @domain;";

        await using var command = connection.CreateCommand();
        command.CommandText = sql;
        command.CommandTimeout = 60;
        command.Parameters.AddWithValue("domain", domain);
        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Deletes chunks of <paramref name="domain"/> whose <c>chunk_id</c> is not in
    /// <paramref name="currentChunkIds"/>. An empty set deletes the whole domain.
    /// </summary>
    /// <returns>The number of deleted rows.</returns>
    private static async Task DeleteMissingChunksByDomainAsync(
        NpgsqlConnection connection,
        string domain,
        IReadOnlyCollection currentChunkIds,
        CancellationToken cancellationToken)
    {
        await using var command = connection.CreateCommand();
        command.CommandTimeout = 90;
        command.Parameters.AddWithValue("domain", domain);

        if (currentChunkIds.Count == 0)
        {
            command.CommandText = "DELETE FROM advisoryai.kb_chunk WHERE domain = @domain;";
        }
        else
        {
            command.CommandText = """
                DELETE FROM advisoryai.kb_chunk
                WHERE domain = @domain
                  AND NOT (chunk_id = ANY(@chunk_ids));
                """;
            command.Parameters.AddWithValue(
                "chunk_ids",
                NpgsqlDbType.Array | NpgsqlDbType.Text,
                currentChunkIds.ToArray());
        }

        return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Upserts <paramref name="chunks"/> one row at a time on <paramref name="connection"/>. Before
    /// that, it makes sure a <c>kb_doc</c> row exists for every distinct <c>DocId</c>.
    /// </summary>
    /// <returns>
    /// The number of rows inserted or updated. Rows whose stored values are identical are skipped by
    /// the upsert's <c>IS DISTINCT FROM</c> guard and are not counted.
    /// </returns>
    private static async Task UpsertChunksAsync(
        NpgsqlConnection connection,
        IReadOnlyList chunks,
        CancellationToken cancellationToken)
    {
        var hasEmbeddingVectorColumn = await HasEmbeddingVectorColumnAsync(connection, cancellationToken).ConfigureAwait(false);

        // Ensure parent documents exist for each unique DocId. One grouping pass replaces the former
        // First()-per-document scan (O(chunks x documents)). Each group's first element is still the
        // first chunk of that document in input order.
        foreach (var document in chunks.GroupBy(static chunk => chunk.DocId, StringComparer.Ordinal))
        {
            await EnsureDocumentExistsAsync(connection, document.Key, document.First(), cancellationToken).ConfigureAwait(false);
        }

        await using var command = connection.CreateCommand();
        command.CommandText = hasEmbeddingVectorColumn ? UpsertChunkWithVectorSql : UpsertChunkSql;
        command.CommandTimeout = 120;

        var affectedRows = 0;
        foreach (var chunk in chunks)
        {
            command.Parameters.Clear();
            command.Parameters.AddWithValue("chunk_id", chunk.ChunkId);
            command.Parameters.AddWithValue("doc_id", chunk.DocId);
            command.Parameters.AddWithValue("kind", chunk.Kind);
            command.Parameters.AddWithValue("anchor", (object?)chunk.Anchor ?? DBNull.Value);
            command.Parameters.AddWithValue("section_path", (object?)chunk.SectionPath ?? DBNull.Value);
            command.Parameters.AddWithValue("span_start", chunk.SpanStart);
            command.Parameters.AddWithValue("span_end", chunk.SpanEnd);
            command.Parameters.AddWithValue("title", chunk.Title);
            command.Parameters.AddWithValue("body", chunk.Body);
            command.Parameters.AddWithValue(
                "embedding",
                NpgsqlDbType.Array | NpgsqlDbType.Real,
                chunk.Embedding is null ? Array.Empty() : chunk.Embedding);

            if (hasEmbeddingVectorColumn)
            {
                // pgvector rejects an empty vector literal ('[]'), so an absent or empty embedding is
                // stored as NULL instead of failing the whole upsert.
                var vectorLiteral = chunk.Embedding is { Length: > 0 } embedding
                    ? (object)BuildVectorLiteral(embedding)
                    : DBNull.Value;
                command.Parameters.AddWithValue("embedding_vector", vectorLiteral);
            }

            command.Parameters.AddWithValue("metadata", NpgsqlDbType.Jsonb, chunk.Metadata.RootElement.GetRawText());
            command.Parameters.AddWithValue("domain", chunk.Domain);
            command.Parameters.AddWithValue("entity_key", (object?)chunk.EntityKey ?? DBNull.Value);
            command.Parameters.AddWithValue("entity_type", (object?)chunk.EntityType ?? DBNull.Value);
            command.Parameters.AddWithValue("freshness", chunk.Freshness.HasValue ? (object)chunk.Freshness.Value : DBNull.Value);

            affectedRows += await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
        }

        return affectedRows;
    }

    /// <summary>
    /// Returns true when <c>advisoryai.kb_chunk</c> has an <c>embedding_vec</c> column, according to
    /// <c>information_schema.columns</c>.
    /// </summary>
    private static async Task HasEmbeddingVectorColumnAsync(
        NpgsqlConnection connection,
        CancellationToken cancellationToken)
    {
        const string sql = """
            SELECT EXISTS (
                SELECT 1
                FROM information_schema.columns
                WHERE table_schema = 'advisoryai'
                  AND table_name = 'kb_chunk'
                  AND column_name = 'embedding_vec'
            );
            """;

        await using var command = connection.CreateCommand();
        command.CommandText = sql;
        command.CommandTimeout = 30;

        var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
        return result is bool value && value;
    }

    /// <summary>
    /// Formats <paramref name="values"/> as a pgvector text literal, for example <c>[0.5,1,-2]</c>.
    /// Each value uses the "G9" round-trip format and the invariant culture.
    /// </summary>
    private static string BuildVectorLiteral(float[] values)
    {
        return "[" + string.Join(",", values.Select(static value => value.ToString("G9", CultureInfo.InvariantCulture))) + "]";
    }

    /// <summary>
    /// Inserts the <c>kb_doc</c> row for <paramref name="docId"/>, or refreshes its title, content hash
    /// and <c>indexed_at</c> if it already exists. The values come from <paramref name="chunk"/>.
    /// </summary>
    private static async Task EnsureDocumentExistsAsync(
        NpgsqlConnection connection,
        string docId,
        UnifiedChunk chunk,
        CancellationToken cancellationToken)
    {
        var sourceRef = ResolveSourceRef(chunk);
        var sourcePath = ResolveSourcePath(chunk);

        const string sql = """
            INSERT INTO advisoryai.kb_doc
                (doc_id, doc_type, product, version, source_ref, path, title, content_hash, metadata, indexed_at)
            VALUES
                (@doc_id, @doc_type, @product, @version, @source_ref, @path, @title, @content_hash, '{}'::jsonb, NOW())
            ON CONFLICT (doc_id) DO UPDATE SET
                title = EXCLUDED.title,
                content_hash = EXCLUDED.content_hash,
                indexed_at = NOW();
            """;

        await using var command = connection.CreateCommand();
        command.CommandText = sql;
        command.CommandTimeout = 30;
        command.Parameters.AddWithValue("doc_id", docId);
        command.Parameters.AddWithValue("doc_type", chunk.Domain);
        command.Parameters.AddWithValue("product", "stella-ops");
        command.Parameters.AddWithValue("version", "local");
        command.Parameters.AddWithValue("source_ref", sourceRef);
        command.Parameters.AddWithValue("path", sourcePath);
        command.Parameters.AddWithValue("title", chunk.Title);
        command.Parameters.AddWithValue("content_hash", KnowledgeSearchText.StableId(chunk.Body));

        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Returns the trimmed <c>EntityKey</c> when it is non-blank, otherwise the <c>DocId</c>.</summary>
    private static string ResolveSourceRef(UnifiedChunk chunk)
    {
        if (!string.IsNullOrWhiteSpace(chunk.EntityKey))
        {
            return chunk.EntityKey.Trim();
        }

        return chunk.DocId;
    }

    /// <summary>Returns the <c>DocId</c> when it is non-blank, otherwise <c>"{Domain}/{Kind}"</c>.</summary>
    private static string ResolveSourcePath(UnifiedChunk chunk)
    {
        if (!string.IsNullOrWhiteSpace(chunk.DocId))
        {
            return chunk.DocId;
        }

        return $"{chunk.Domain}/{chunk.Kind}";
    }

    /// <summary>
    /// Drops chunks with a blank <c>ChunkId</c> and keeps the last chunk for each duplicate id.
    /// The result is ordered by <c>ChunkId</c> using ordinal comparison.
    /// </summary>
    private static IReadOnlyList DeduplicateChunks(IEnumerable chunks)
    {
        var byChunkId = new SortedDictionary(StringComparer.Ordinal);
        foreach (var chunk in chunks)
        {
            if (string.IsNullOrWhiteSpace(chunk.ChunkId))
            {
                continue;
            }

            byChunkId[chunk.ChunkId] = chunk;
        }

        return byChunkId.Values.ToArray();
    }
}

/// <summary>Outcome of an indexing or rebuild run.</summary>
/// <param name="DomainCount">Number of domains that were written (domains whose adapters all failed are not counted).</param>
/// <param name="ChunkCount">Number of deduplicated chunks seen across those domains.</param>
/// <param name="DurationMs">Wall-clock duration of the run in milliseconds.</param>
public sealed record UnifiedSearchIndexSummary(
    int DomainCount,
    int ChunkCount,
    long DurationMs);