Finish the Search/AdvisoryAI and DAL conversion to EF; prepare for microservices consolidation.
This commit is contained in:
@@ -27,34 +27,100 @@ internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
|
||||
}
|
||||
|
||||
/// <summary>
/// Runs the unified search indexing pass and discards the resulting summary.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the indexing pass.</param>
public async Task IndexAllAsync(CancellationToken cancellationToken)
{
    // Callers of this overload only care about completion, not the counters.
    _ = await IndexAllWithSummaryAsync(cancellationToken).ConfigureAwait(false);
}
|
||||
|
||||
/// <summary>
/// Incrementally refreshes the unified search index, one domain at a time.
/// </summary>
/// <remarks>
/// Adapters are grouped by domain (case-insensitively) and processed in domain order.
/// For each domain the chunks of all adapters are collected, de-duplicated, upserted,
/// and chunks that were not produced in this cycle are removed. A failing adapter is
/// logged and skipped; a domain in which every adapter failed is left untouched.
/// </remarks>
/// <param name="cancellationToken">Token observed between domains and passed to adapters and database calls.</param>
/// <returns>
/// A summary built from the number of refreshed domains, the number of de-duplicated
/// chunks seen, and the elapsed milliseconds; all zero when indexing is disabled or
/// no connection string is configured.
/// </returns>
internal async Task<UnifiedSearchIndexSummary> IndexAllWithSummaryAsync(CancellationToken cancellationToken)
{
    if (!_options.Enabled || string.IsNullOrWhiteSpace(_options.ConnectionString))
    {
        _logger.LogDebug("Unified search indexing skipped because configuration is incomplete.");
        return new UnifiedSearchIndexSummary(0, 0, 0);
    }

    var stopwatch = Stopwatch.StartNew();
    var domains = 0;
    var chunks = 0;
    var changed = 0;
    var removed = 0;

    foreach (var domainGroup in _adapters
        .GroupBy(static adapter => adapter.Domain, StringComparer.OrdinalIgnoreCase)
        .OrderBy(static group => group.Key, StringComparer.OrdinalIgnoreCase))
    {
        cancellationToken.ThrowIfCancellationRequested();

        var domainStopwatch = Stopwatch.StartNew();
        var domain = domainGroup.Key;
        var domainChunks = new List<UnifiedChunk>();
        var hadSuccessfulAdapter = false;
        var hadFailedAdapter = false;

        foreach (var adapter in domainGroup)
        {
            try
            {
                _logger.LogInformation("Unified search indexing adapter '{Adapter}' for domain '{Domain}'.",
                    adapter.GetType().Name,
                    domain);
                var adapterChunks = await adapter.ProduceChunksAsync(cancellationToken).ConfigureAwait(false);
                domainChunks.AddRange(adapterChunks);
                hadSuccessfulAdapter = true;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The caller asked us to stop: do not treat this as an adapter failure
                // and do not keep working with a partial chunk set.
                throw;
            }
            catch (Exception ex)
            {
                hadFailedAdapter = true;
                _logger.LogWarning(ex,
                    "Failed to index adapter '{Adapter}' for domain '{Domain}'; continuing with other adapters in this domain.",
                    adapter.GetType().Name,
                    domain);
            }
        }

        if (!hadSuccessfulAdapter)
        {
            _logger.LogWarning(
                "Unified search skipped domain '{Domain}' because all adapters failed in this refresh cycle.",
                domain);
            continue;
        }

        var deduplicated = DeduplicateChunks(domainChunks);
        var changedForDomain = 0;
        if (deduplicated.Count > 0)
        {
            changedForDomain = await UpsertChunksAsync(deduplicated, cancellationToken).ConfigureAwait(false);
        }

        var removedForDomain = 0;
        if (hadFailedAdapter)
        {
            // The chunk list is incomplete when any adapter failed; pruning against it
            // would delete the failed adapter's previously indexed chunks.
            _logger.LogWarning(
                "Unified search skipped removal of missing chunks for domain '{Domain}' because at least one adapter failed in this refresh cycle.",
                domain);
        }
        else
        {
            removedForDomain = await DeleteMissingChunksByDomainAsync(
                    domain,
                    deduplicated.Select(static chunk => chunk.ChunkId).ToArray(),
                    cancellationToken)
                .ConfigureAwait(false);
        }

        domainStopwatch.Stop();
        domains++;
        chunks += deduplicated.Count;
        changed += changedForDomain;
        removed += removedForDomain;
        _logger.LogInformation(
            "Unified search refresh domain '{Domain}' completed: seen_chunks={SeenChunkCount}, changed_chunks={ChangedChunkCount}, removed={RemovedCount}, duration_ms={DurationMs}",
            domain,
            deduplicated.Count,
            changedForDomain,
            removedForDomain,
            (long)domainStopwatch.Elapsed.TotalMilliseconds);
    }

    stopwatch.Stop();
    _logger.LogInformation(
        "Unified search incremental indexing completed: domains={DomainCount}, seen_chunks={SeenChunkCount}, changed_chunks={ChangedChunkCount}, removed={RemovedCount}, duration_ms={DurationMs}",
        domains,
        chunks,
        changed,
        removed,
        (long)stopwatch.Elapsed.TotalMilliseconds);

    return new UnifiedSearchIndexSummary(domains, chunks, (long)stopwatch.Elapsed.TotalMilliseconds);
}
|
||||
|
||||
public async Task<UnifiedSearchIndexSummary> RebuildAllAsync(CancellationToken cancellationToken)
|
||||
@@ -69,24 +135,58 @@ internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
|
||||
var domains = 0;
|
||||
var chunks = 0;
|
||||
|
||||
foreach (var adapter in _adapters)
|
||||
foreach (var domainGroup in _adapters
|
||||
.GroupBy(static adapter => adapter.Domain, StringComparer.OrdinalIgnoreCase)
|
||||
.OrderBy(static group => group.Key, StringComparer.OrdinalIgnoreCase))
|
||||
{
|
||||
try
|
||||
{
|
||||
await DeleteChunksByDomainAsync(adapter.Domain, cancellationToken).ConfigureAwait(false);
|
||||
var domainChunks = await adapter.ProduceChunksAsync(cancellationToken).ConfigureAwait(false);
|
||||
if (domainChunks.Count > 0)
|
||||
{
|
||||
await UpsertChunksAsync(domainChunks, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
domains++;
|
||||
chunks += domainChunks.Count;
|
||||
}
|
||||
catch (Exception ex)
|
||||
var domain = domainGroup.Key;
|
||||
var domainStopwatch = Stopwatch.StartNew();
|
||||
var domainChunks = new List<UnifiedChunk>();
|
||||
var hadSuccessfulAdapter = false;
|
||||
|
||||
foreach (var adapter in domainGroup)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to rebuild domain '{Domain}'; continuing with remaining domains.", adapter.Domain);
|
||||
try
|
||||
{
|
||||
var adapterChunks = await adapter.ProduceChunksAsync(cancellationToken).ConfigureAwait(false);
|
||||
domainChunks.AddRange(adapterChunks);
|
||||
hadSuccessfulAdapter = true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Failed to rebuild adapter '{Adapter}' for domain '{Domain}'; continuing with other adapters in this domain.",
|
||||
adapter.GetType().Name,
|
||||
domain);
|
||||
}
|
||||
}
|
||||
|
||||
if (!hadSuccessfulAdapter)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Unified search rebuild skipped domain '{Domain}' because all adapters failed.",
|
||||
domain);
|
||||
continue;
|
||||
}
|
||||
|
||||
await DeleteChunksByDomainAsync(domain, cancellationToken).ConfigureAwait(false);
|
||||
var deduplicated = DeduplicateChunks(domainChunks);
|
||||
if (deduplicated.Count > 0)
|
||||
{
|
||||
await UpsertChunksAsync(deduplicated, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
domainStopwatch.Stop();
|
||||
domains++;
|
||||
chunks += deduplicated.Count;
|
||||
|
||||
_logger.LogInformation(
|
||||
"Unified search rebuild domain '{Domain}' completed: chunks={ChunkCount}, duration_ms={DurationMs}",
|
||||
domain,
|
||||
deduplicated.Count,
|
||||
(long)domainStopwatch.Elapsed.TotalMilliseconds);
|
||||
}
|
||||
|
||||
stopwatch.Stop();
|
||||
@@ -108,7 +208,42 @@ internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task UpsertChunksAsync(IReadOnlyList<UnifiedChunk> chunks, CancellationToken cancellationToken)
|
||||
/// <summary>
/// Deletes rows of <c>advisoryai.kb_chunk</c> for <paramref name="domain"/> whose
/// <c>chunk_id</c> is not listed in <paramref name="currentChunkIds"/>.
/// </summary>
/// <param name="domain">Value matched against the <c>domain</c> column.</param>
/// <param name="currentChunkIds">
/// Chunk ids to keep. When empty, every row of the domain is deleted.
/// </param>
/// <param name="cancellationToken">Token passed to the database command.</param>
/// <returns>
/// The value reported by <c>ExecuteNonQueryAsync</c> for the delete statement, or 0
/// when indexing is disabled or no connection string is configured.
/// </returns>
private async Task<int> DeleteMissingChunksByDomainAsync(
    string domain,
    IReadOnlyCollection<string> currentChunkIds,
    CancellationToken cancellationToken)
{
    var indexingConfigured = _options.Enabled && !string.IsNullOrWhiteSpace(_options.ConnectionString);
    if (!indexingConfigured)
    {
        return 0;
    }

    await using var dataSource = new NpgsqlDataSourceBuilder(_options.ConnectionString).Build();
    await using var command = dataSource.CreateCommand();
    command.CommandTimeout = 90;
    command.Parameters.AddWithValue("domain", domain);

    if (currentChunkIds.Count > 0)
    {
        // Keep the listed chunks and drop everything else in the domain.
        command.CommandText = """
            DELETE FROM advisoryai.kb_chunk
            WHERE domain = @domain
            AND NOT (chunk_id = ANY(@chunk_ids));
            """;
        var idsToKeep = currentChunkIds.ToArray();
        command.Parameters.AddWithValue(
            "chunk_ids",
            NpgsqlDbType.Array | NpgsqlDbType.Text,
            idsToKeep);
    }
    else
    {
        // Nothing to keep: clear the whole domain.
        command.CommandText = "DELETE FROM advisoryai.kb_chunk WHERE domain = @domain;";
    }

    var deletedRows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    return deletedRows;
}
|
||||
|
||||
private async Task<int> UpsertChunksAsync(IReadOnlyList<UnifiedChunk> chunks, CancellationToken cancellationToken)
|
||||
{
|
||||
await using var dataSource = new NpgsqlDataSourceBuilder(_options.ConnectionString).Build();
|
||||
await using var connection = await dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
@@ -140,7 +275,12 @@ internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
|
||||
NOW()
|
||||
)
|
||||
ON CONFLICT (chunk_id) DO UPDATE SET
|
||||
doc_id = EXCLUDED.doc_id,
|
||||
kind = EXCLUDED.kind,
|
||||
anchor = EXCLUDED.anchor,
|
||||
section_path = EXCLUDED.section_path,
|
||||
span_start = EXCLUDED.span_start,
|
||||
span_end = EXCLUDED.span_end,
|
||||
title = EXCLUDED.title,
|
||||
body = EXCLUDED.body,
|
||||
body_tsv = EXCLUDED.body_tsv,
|
||||
@@ -150,13 +290,29 @@ internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
|
||||
entity_key = EXCLUDED.entity_key,
|
||||
entity_type = EXCLUDED.entity_type,
|
||||
freshness = EXCLUDED.freshness,
|
||||
indexed_at = NOW();
|
||||
indexed_at = NOW()
|
||||
WHERE advisoryai.kb_chunk.doc_id IS DISTINCT FROM EXCLUDED.doc_id
|
||||
OR advisoryai.kb_chunk.kind IS DISTINCT FROM EXCLUDED.kind
|
||||
OR advisoryai.kb_chunk.anchor IS DISTINCT FROM EXCLUDED.anchor
|
||||
OR advisoryai.kb_chunk.section_path IS DISTINCT FROM EXCLUDED.section_path
|
||||
OR advisoryai.kb_chunk.span_start IS DISTINCT FROM EXCLUDED.span_start
|
||||
OR advisoryai.kb_chunk.span_end IS DISTINCT FROM EXCLUDED.span_end
|
||||
OR advisoryai.kb_chunk.title IS DISTINCT FROM EXCLUDED.title
|
||||
OR advisoryai.kb_chunk.body IS DISTINCT FROM EXCLUDED.body
|
||||
OR advisoryai.kb_chunk.body_tsv IS DISTINCT FROM EXCLUDED.body_tsv
|
||||
OR advisoryai.kb_chunk.embedding IS DISTINCT FROM EXCLUDED.embedding
|
||||
OR advisoryai.kb_chunk.metadata IS DISTINCT FROM EXCLUDED.metadata
|
||||
OR advisoryai.kb_chunk.domain IS DISTINCT FROM EXCLUDED.domain
|
||||
OR advisoryai.kb_chunk.entity_key IS DISTINCT FROM EXCLUDED.entity_key
|
||||
OR advisoryai.kb_chunk.entity_type IS DISTINCT FROM EXCLUDED.entity_type
|
||||
OR advisoryai.kb_chunk.freshness IS DISTINCT FROM EXCLUDED.freshness;
|
||||
""";
|
||||
|
||||
await using var command = connection.CreateCommand();
|
||||
command.CommandText = sql;
|
||||
command.CommandTimeout = 120;
|
||||
|
||||
var affectedRows = 0;
|
||||
foreach (var chunk in chunks)
|
||||
{
|
||||
command.Parameters.Clear();
|
||||
@@ -180,8 +336,10 @@ internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
|
||||
command.Parameters.AddWithValue("freshness",
|
||||
chunk.Freshness.HasValue ? (object)chunk.Freshness.Value : DBNull.Value);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
affectedRows += await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return affectedRows;
|
||||
}
|
||||
|
||||
private static async Task EnsureDocumentExistsAsync(
|
||||
@@ -211,6 +369,22 @@ internal sealed class UnifiedSearchIndexer : IUnifiedSearchIndexer
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
/// Collapses <paramref name="chunks"/> to one entry per chunk id.
/// </summary>
/// <remarks>
/// Chunks whose id is null, empty, or whitespace are dropped. When several chunks
/// share an id, the one that appears last in the input wins. The result is ordered
/// by chunk id using ordinal comparison.
/// </remarks>
/// <param name="chunks">Chunks to de-duplicate.</param>
/// <returns>The surviving chunks, sorted by chunk id.</returns>
private static IReadOnlyList<UnifiedChunk> DeduplicateChunks(IEnumerable<UnifiedChunk> chunks)
{
    var latestById = new SortedDictionary<string, UnifiedChunk>(StringComparer.Ordinal);

    foreach (var candidate in chunks.Where(static c => !string.IsNullOrWhiteSpace(c.ChunkId)))
    {
        // Indexer assignment overwrites, so later duplicates replace earlier ones.
        latestById[candidate.ChunkId] = candidate;
    }

    var ordered = new UnifiedChunk[latestById.Count];
    latestById.Values.CopyTo(ordered, 0);
    return ordered;
}
|
||||
}
|
||||
|
||||
public sealed record UnifiedSearchIndexSummary(
|
||||
|
||||
Reference in New Issue
Block a user