Restore Doctor search after AdvisoryAI cold-start race
This commit is contained in:
@@ -10,6 +10,7 @@ namespace StellaOps.AdvisoryAI.KnowledgeSearch;
|
||||
internal sealed class PostgresKnowledgeSearchStore : IKnowledgeSearchStore, IKnowledgeSearchCorpusAvailabilityStore, IAsyncDisposable
|
||||
{
|
||||
private static readonly JsonDocument EmptyJsonDocument = JsonDocument.Parse("{}");
|
||||
private const string SchemaLockKey = "advisoryai_knowledge_schema";
|
||||
|
||||
private readonly KnowledgeSearchOptions _options;
|
||||
private readonly ILogger<PostgresKnowledgeSearchStore> _logger;
|
||||
@@ -38,6 +39,8 @@ internal sealed class PostgresKnowledgeSearchStore : IKnowledgeSearchStore, IKno
|
||||
await using var connection = await GetDataSource().OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await AcquireSchemaLockAsync(connection, transaction, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
const string createSchemaSql = "CREATE SCHEMA IF NOT EXISTS advisoryai;";
|
||||
await ExecuteNonQueryAsync(connection, transaction, createSchemaSql, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
@@ -1123,6 +1126,19 @@ internal sealed class PostgresKnowledgeSearchStore : IKnowledgeSearchStore, IKno
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static async Task AcquireSchemaLockAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await using var command = connection.CreateCommand();
|
||||
command.Transaction = transaction;
|
||||
command.CommandText = "SELECT pg_advisory_xact_lock(hashtext($1));";
|
||||
command.CommandTimeout = ToCommandTimeoutSeconds(TimeSpan.FromSeconds(30));
|
||||
command.Parameters.AddWithValue(SchemaLockKey);
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static int ToCommandTimeoutSeconds(TimeSpan timeout)
|
||||
{
|
||||
if (timeout <= TimeSpan.Zero)
|
||||
|
||||
@@ -372,6 +372,60 @@ public sealed class UnifiedSearchLiveAdapterIntegrationTests
|
||||
(await CountDomainChunksAsync(connection, "policy")).Should().Be(4);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PostgresKnowledgeSearchStore_EnsureSchemaAsync_IsSafeUnderConcurrentStartupCalls()
|
||||
{
|
||||
await using var fixture = await StartPostgresOrSkipAsync();
|
||||
var options = Options.Create(new KnowledgeSearchOptions
|
||||
{
|
||||
Enabled = true,
|
||||
ConnectionString = fixture.ConnectionString,
|
||||
FtsLanguageConfig = "simple"
|
||||
});
|
||||
|
||||
await using var resetConnection = new NpgsqlConnection(fixture.ConnectionString);
|
||||
await resetConnection.OpenAsync();
|
||||
await ExecuteSqlAsync(resetConnection, "DROP SCHEMA IF EXISTS advisoryai CASCADE;");
|
||||
|
||||
var stores = Enumerable.Range(0, 6)
|
||||
.Select(_ => new PostgresKnowledgeSearchStore(options, NullLogger<PostgresKnowledgeSearchStore>.Instance))
|
||||
.ToArray();
|
||||
var gate = new ManualResetEventSlim(false);
|
||||
|
||||
try
|
||||
{
|
||||
var tasks = stores
|
||||
.Select(store => Task.Run(async () =>
|
||||
{
|
||||
gate.Wait();
|
||||
await store.EnsureSchemaAsync(CancellationToken.None);
|
||||
}))
|
||||
.ToArray();
|
||||
|
||||
gate.Set();
|
||||
await Task.WhenAll(tasks);
|
||||
|
||||
await using var verifyConnection = new NpgsqlConnection(fixture.ConnectionString);
|
||||
await verifyConnection.OpenAsync();
|
||||
|
||||
(await ScalarAsync<string?>(
|
||||
verifyConnection,
|
||||
"SELECT to_regclass('advisoryai.kb_chunk')::text;")).Should().Be("advisoryai.kb_chunk");
|
||||
(await ScalarAsync<int>(
|
||||
verifyConnection,
|
||||
"SELECT COUNT(*) FROM advisoryai.__migration_history;")).Should().BeGreaterThan(0);
|
||||
}
|
||||
finally
|
||||
{
|
||||
foreach (var store in stores)
|
||||
{
|
||||
await store.DisposeAsync();
|
||||
}
|
||||
|
||||
gate.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnifiedSearchIndexer_RebuildAllAsync_PopulatesEnglishFtsColumns_AndRecallsUnifiedDomains()
|
||||
{
|
||||
@@ -981,6 +1035,19 @@ public sealed class UnifiedSearchLiveAdapterIntegrationTests
|
||||
return Convert.ToInt32(scalar, System.Globalization.CultureInfo.InvariantCulture);
|
||||
}
|
||||
|
||||
private static async Task<T> ScalarAsync<T>(NpgsqlConnection connection, string sql)
|
||||
{
|
||||
await using var command = connection.CreateCommand();
|
||||
command.CommandText = sql;
|
||||
var scalar = await command.ExecuteScalarAsync();
|
||||
if (scalar is null || scalar is DBNull)
|
||||
{
|
||||
return default!;
|
||||
}
|
||||
|
||||
return (T)Convert.ChangeType(scalar, typeof(T), CultureInfo.InvariantCulture);
|
||||
}
|
||||
|
||||
private static async Task<int> CountEnglishTsvRowsAsync(NpgsqlConnection connection, string domain)
|
||||
{
|
||||
await using var command = connection.CreateCommand();
|
||||
|
||||
Reference in New Issue
Block a user