Search and AI stabilization work; localization stabilized.
This commit is contained in:
@@ -0,0 +1,219 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Npgsql;
|
||||
using NpgsqlTypes;
|
||||
using StellaOps.AdvisoryAI.KnowledgeSearch;
|
||||
using System.Text.Json;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
|
||||
namespace StellaOps.AdvisoryAI.UnifiedSearch;
|
||||
|
||||
/// <summary>
/// Writes the chunks produced by every registered <see cref="ISearchIngestionAdapter"/> into the
/// <c>advisoryai.kb_chunk</c> table, creating the parent <c>advisoryai.kb_doc</c> rows on demand.
/// </summary>
/// <remarks>
/// Every public operation is a no-op when the options are disabled or no connection string is set.
/// A failure inside one adapter is logged as a warning and the remaining adapters still run;
/// cancellation requested through the caller's token is propagated instead of being swallowed.
/// </remarks>
internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
{
    private readonly KnowledgeSearchOptions _options;
    private readonly IEnumerable<ISearchIngestionAdapter> _adapters;
    private readonly ILogger<UnifiedSearchIndexer> _logger;

    /// <summary>Creates the indexer.</summary>
    /// <param name="options">Search options; a null <c>Value</c> falls back to default options.</param>
    /// <param name="adapters">Adapters that produce the chunks to index, one per domain.</param>
    /// <param name="logger">Logger used for progress and failure reporting.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public UnifiedSearchIndexer(
        IOptions<KnowledgeSearchOptions> options,
        IEnumerable<ISearchIngestionAdapter> adapters,
        ILogger<UnifiedSearchIndexer> logger)
    {
        ArgumentNullException.ThrowIfNull(options);
        _options = options.Value ?? new KnowledgeSearchOptions();
        _adapters = adapters ?? throw new ArgumentNullException(nameof(adapters));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>True when indexing is enabled and a connection string is available.</summary>
    private bool IsConfigured =>
        _options.Enabled && !string.IsNullOrWhiteSpace(_options.ConnectionString);

    /// <summary>
    /// Asks each adapter for its chunks and upserts them. Existing rows are updated in place;
    /// nothing is deleted.
    /// </summary>
    /// <param name="cancellationToken">Token used to abort the run.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while an adapter was being processed.
    /// </exception>
    public async Task IndexAllAsync(CancellationToken cancellationToken)
    {
        if (!IsConfigured)
        {
            _logger.LogDebug("Unified search indexing skipped because configuration is incomplete.");
            return;
        }

        foreach (var adapter in _adapters)
        {
            try
            {
                _logger.LogInformation("Unified search indexing domain '{Domain}'.", adapter.Domain);
                var chunks = await adapter.ProduceChunksAsync(cancellationToken).ConfigureAwait(false);

                if (chunks.Count == 0)
                {
                    _logger.LogDebug("No chunks produced by adapter for domain '{Domain}'.", adapter.Domain);
                    continue;
                }

                await UpsertChunksAsync(chunks, cancellationToken).ConfigureAwait(false);
                _logger.LogInformation("Indexed {Count} chunks for domain '{Domain}'.", chunks.Count, adapter.Domain);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The caller asked us to stop: do not report this as a domain failure and do not
                // start the remaining adapters.
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to index domain '{Domain}'; continuing with other adapters.", adapter.Domain);
            }
        }
    }

    /// <summary>
    /// Replaces the indexed content of every adapter's domain with freshly produced chunks.
    /// </summary>
    /// <param name="cancellationToken">Token used to abort the run.</param>
    /// <returns>
    /// The number of domains rebuilt without error, the number of chunks they produced, and the
    /// elapsed time in milliseconds. All zeros when indexing is not configured.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while an adapter was being processed.
    /// </exception>
    public async Task<UnifiedSearchIndexSummary> RebuildAllAsync(CancellationToken cancellationToken)
    {
        if (!IsConfigured)
        {
            _logger.LogDebug("Unified search rebuild skipped because configuration is incomplete.");
            return new UnifiedSearchIndexSummary(0, 0, 0);
        }

        var stopwatch = Stopwatch.StartNew();
        var rebuiltDomains = 0;
        var totalChunks = 0;

        foreach (var adapter in _adapters)
        {
            try
            {
                // Produce the replacement content BEFORE deleting anything, so an adapter that
                // fails to produce chunks does not leave its domain with an empty index.
                var domainChunks = await adapter.ProduceChunksAsync(cancellationToken).ConfigureAwait(false);

                await DeleteChunksByDomainAsync(adapter.Domain, cancellationToken).ConfigureAwait(false);
                if (domainChunks.Count > 0)
                {
                    await UpsertChunksAsync(domainChunks, cancellationToken).ConfigureAwait(false);
                }

                rebuiltDomains++;
                totalChunks += domainChunks.Count;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Propagate caller-requested cancellation instead of treating it as a domain failure.
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to rebuild domain '{Domain}'; continuing with remaining domains.", adapter.Domain);
            }
        }

        stopwatch.Stop();
        return new UnifiedSearchIndexSummary(rebuiltDomains, totalChunks, stopwatch.ElapsedMilliseconds);
    }

    /// <summary>Deletes every row of <c>advisoryai.kb_chunk</c> whose domain equals <paramref name="domain"/>.</summary>
    /// <param name="domain">Domain value to match.</param>
    /// <param name="cancellationToken">Token used to abort the command.</param>
    public async Task DeleteChunksByDomainAsync(string domain, CancellationToken cancellationToken)
    {
        if (!IsConfigured)
        {
            return;
        }

        const string sql = "DELETE FROM advisoryai.kb_chunk WHERE domain = @domain;";

        await using var dataSource = new NpgsqlDataSourceBuilder(_options.ConnectionString).Build();
        await using var command = dataSource.CreateCommand(sql);
        command.CommandTimeout = 60;
        command.Parameters.AddWithValue("domain", domain);
        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Inserts each chunk, or updates the existing row with the same <c>chunk_id</c>, after making
    /// sure a <c>kb_doc</c> row exists for every distinct document id referenced by the batch.
    /// </summary>
    private async Task UpsertChunksAsync(IReadOnlyList<UnifiedChunk> chunks, CancellationToken cancellationToken)
    {
        if (chunks.Count == 0)
        {
            // Nothing to write; avoid opening a connection for an empty batch.
            return;
        }

        const string sql = """
            INSERT INTO advisoryai.kb_chunk
            (
            chunk_id, doc_id, kind, anchor, section_path,
            span_start, span_end, title, body, body_tsv,
            embedding, metadata, domain, entity_key, entity_type, freshness,
            indexed_at
            )
            VALUES
            (
            @chunk_id, @doc_id, @kind, @anchor, @section_path,
            @span_start, @span_end, @title, @body,
            setweight(to_tsvector('simple', coalesce(@title, '')), 'A') ||
            setweight(to_tsvector('simple', coalesce(@section_path, '')), 'B') ||
            setweight(to_tsvector('simple', coalesce(@body, '')), 'D'),
            @embedding, @metadata::jsonb, @domain, @entity_key, @entity_type, @freshness,
            NOW()
            )
            ON CONFLICT (chunk_id) DO UPDATE SET
            kind = EXCLUDED.kind,
            title = EXCLUDED.title,
            body = EXCLUDED.body,
            body_tsv = EXCLUDED.body_tsv,
            embedding = EXCLUDED.embedding,
            metadata = EXCLUDED.metadata,
            domain = EXCLUDED.domain,
            entity_key = EXCLUDED.entity_key,
            entity_type = EXCLUDED.entity_type,
            freshness = EXCLUDED.freshness,
            indexed_at = NOW();
            """;

        await using var dataSource = new NpgsqlDataSourceBuilder(_options.ConnectionString).Build();
        await using var connection = await dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

        // Ensure a parent document exists for each unique DocId. A single ordered pass keeps the
        // original semantics (the first chunk carrying a given DocId describes the document)
        // without rescanning the whole list once per document.
        var seenDocIds = new HashSet<string>(StringComparer.Ordinal);
        foreach (var chunk in chunks)
        {
            if (seenDocIds.Add(chunk.DocId))
            {
                await EnsureDocumentExistsAsync(connection, chunk.DocId, chunk, cancellationToken).ConfigureAwait(false);
            }
        }

        await using var command = connection.CreateCommand();
        command.CommandText = sql;
        command.CommandTimeout = 120;

        foreach (var chunk in chunks)
        {
            BindChunkParameters(command, chunk);
            await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>Replaces the parameters of <paramref name="command"/> with the values of <paramref name="chunk"/>.</summary>
    private static void BindChunkParameters(NpgsqlCommand command, UnifiedChunk chunk)
    {
        var parameters = command.Parameters;
        parameters.Clear();

        parameters.AddWithValue("chunk_id", chunk.ChunkId);
        parameters.AddWithValue("doc_id", chunk.DocId);
        parameters.AddWithValue("kind", chunk.Kind);
        parameters.AddWithValue("anchor", (object?)chunk.Anchor ?? DBNull.Value);
        parameters.AddWithValue("section_path", (object?)chunk.SectionPath ?? DBNull.Value);
        parameters.AddWithValue("span_start", chunk.SpanStart);
        parameters.AddWithValue("span_end", chunk.SpanEnd);
        parameters.AddWithValue("title", chunk.Title);
        parameters.AddWithValue("body", chunk.Body);

        // A chunk without an embedding is stored as an empty real[] rather than NULL.
        parameters.AddWithValue(
            "embedding",
            NpgsqlDbType.Array | NpgsqlDbType.Real,
            chunk.Embedding is null ? Array.Empty<float>() : chunk.Embedding);

        parameters.AddWithValue("metadata", NpgsqlDbType.Jsonb, chunk.Metadata.RootElement.GetRawText());
        parameters.AddWithValue("domain", chunk.Domain);
        parameters.AddWithValue("entity_key", (object?)chunk.EntityKey ?? DBNull.Value);
        parameters.AddWithValue("entity_type", (object?)chunk.EntityType ?? DBNull.Value);
        parameters.AddWithValue(
            "freshness",
            chunk.Freshness.HasValue ? (object)chunk.Freshness.Value : DBNull.Value);
    }

    /// <summary>
    /// Inserts a <c>kb_doc</c> row for <paramref name="docId"/> using values taken from
    /// <paramref name="chunk"/>; an already existing row is left untouched.
    /// </summary>
    private static async Task EnsureDocumentExistsAsync(
        NpgsqlConnection connection,
        string docId,
        UnifiedChunk chunk,
        CancellationToken cancellationToken)
    {
        const string sql = """
            INSERT INTO advisoryai.kb_doc
            (doc_id, doc_type, product, version, source_ref, path, title, content_hash, metadata, indexed_at)
            VALUES (@doc_id, @doc_type, @product, @version, @source_ref, @path, @title, @content_hash, '{}'::jsonb, NOW())
            ON CONFLICT (doc_id) DO NOTHING;
            """;

        await using var command = connection.CreateCommand();
        command.CommandText = sql;
        command.CommandTimeout = 30;

        var parameters = command.Parameters;
        parameters.AddWithValue("doc_id", docId);
        // The chunk's domain is used for both the document type and the source reference.
        parameters.AddWithValue("doc_type", chunk.Domain);
        parameters.AddWithValue("product", "stella-ops");
        parameters.AddWithValue("version", "local");
        parameters.AddWithValue("source_ref", chunk.Domain);
        parameters.AddWithValue("path", chunk.Kind);
        parameters.AddWithValue("title", chunk.Title);
        parameters.AddWithValue("content_hash", KnowledgeSearchText.StableId(chunk.Body));

        await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }
}
|
||||
|
||||
/// <summary>Totals reported for an index rebuild.</summary>
/// <param name="DomainCount">Number of domains counted in the run.</param>
/// <param name="ChunkCount">Number of chunks counted in the run.</param>
/// <param name="DurationMs">Duration of the run, in milliseconds.</param>
public sealed record UnifiedSearchIndexSummary(int DomainCount, int ChunkCount, long DurationMs);
|
||||
Reference in New Issue
Block a user