Introduce PostgresDeploymentCompatibilityStore with migration 011, in-memory fallback, deployment endpoints, and Postgres fixture for integration tests. Update Scheduler repository with connection policy adoption. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
163 lines
7.1 KiB
C#
163 lines
7.1 KiB
C#
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Npgsql;
|
|
using StellaOps.Infrastructure.Postgres.Migrations;
|
|
using StellaOps.JobEngine.Core.Backfill;
|
|
using StellaOps.JobEngine.Core.DeadLetter;
|
|
using StellaOps.JobEngine.Core.Observability;
|
|
using StellaOps.JobEngine.Core.Repositories;
|
|
using StellaOps.JobEngine.Core.Services;
|
|
using StellaOps.JobEngine.Infrastructure.Caching;
|
|
using StellaOps.JobEngine.Infrastructure.Ledger;
|
|
using StellaOps.JobEngine.Infrastructure.Observability;
|
|
using StellaOps.JobEngine.Infrastructure.Options;
|
|
using StellaOps.JobEngine.Infrastructure.Postgres;
|
|
using StellaOps.JobEngine.Infrastructure.Repositories;
|
|
using StellaOps.JobEngine.Infrastructure.Services;
|
|
using System;
|
|
using System.Linq;
|
|
|
|
namespace StellaOps.JobEngine.Infrastructure;
|
|
|
|
/// <summary>
|
|
/// Extension methods for registering JobEngine infrastructure services.
|
|
/// </summary>
|
|
public static class ServiceCollectionExtensions
|
|
{
|
|
/// <summary>
|
|
/// Adds JobEngine infrastructure services to the service collection.
|
|
/// </summary>
|
|
/// <param name="services">The service collection.</param>
|
|
/// <param name="configuration">The configuration.</param>
|
|
/// <returns>The service collection for chaining.</returns>
|
|
public static IServiceCollection AddJobEngineInfrastructure(
|
|
this IServiceCollection services,
|
|
IConfiguration configuration)
|
|
{
|
|
// Register configuration options
|
|
services.AddOptions<JobEngineServiceOptions>()
|
|
.Bind(configuration.GetSection(JobEngineServiceOptions.SectionName))
|
|
.PostConfigure(options =>
|
|
{
|
|
var explicitConnection = configuration[$"{JobEngineServiceOptions.SectionName}:Database:ConnectionString"];
|
|
if (!string.IsNullOrWhiteSpace(explicitConnection))
|
|
{
|
|
options.Database.ConnectionString = explicitConnection;
|
|
return;
|
|
}
|
|
|
|
var orchestratorConnection = configuration["Orchestrator:Database:ConnectionString"];
|
|
if (ShouldReplaceConnectionString(options.Database.ConnectionString)
|
|
&& !string.IsNullOrWhiteSpace(orchestratorConnection))
|
|
{
|
|
options.Database.ConnectionString = orchestratorConnection;
|
|
}
|
|
|
|
var fallbackConnection =
|
|
configuration.GetConnectionString("Default")
|
|
?? configuration["ConnectionStrings:Default"];
|
|
|
|
if (string.IsNullOrWhiteSpace(fallbackConnection))
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (ShouldReplaceConnectionString(options.Database.ConnectionString))
|
|
{
|
|
options.Database.ConnectionString = fallbackConnection;
|
|
}
|
|
});
|
|
|
|
services.AddStartupMigrations<JobEngineServiceOptions>(
|
|
schemaName: "orchestrator",
|
|
moduleName: "JobEngine",
|
|
migrationsAssembly: typeof(JobEngineDataSource).Assembly,
|
|
connectionStringSelector: options => options.Database.ConnectionString);
|
|
|
|
// Register data source
|
|
services.AddSingleton<JobEngineDataSource>();
|
|
|
|
// Register repositories
|
|
services.AddScoped<Infrastructure.Repositories.IJobRepository, PostgresJobRepository>();
|
|
services.AddScoped<IArtifactRepository, PostgresArtifactRepository>();
|
|
services.AddScoped<ISourceRepository, PostgresSourceRepository>();
|
|
services.AddScoped<IRunRepository, PostgresRunRepository>();
|
|
services.AddScoped<Infrastructure.Repositories.IQuotaRepository, PostgresQuotaRepository>();
|
|
services.AddScoped<IThrottleRepository, PostgresThrottleRepository>();
|
|
services.AddScoped<IWatermarkRepository, PostgresWatermarkRepository>();
|
|
services.AddScoped<Infrastructure.Repositories.IBackfillRepository, PostgresBackfillRepository>();
|
|
services.AddScoped<IPackRunRepository, PostgresPackRunRepository>();
|
|
services.AddScoped<IPackRunLogRepository, PostgresPackRunLogRepository>();
|
|
services.AddScoped<IPackRegistryRepository, PostgresPackRegistryRepository>();
|
|
services.AddScoped<IFirstSignalSnapshotRepository, PostgresFirstSignalSnapshotRepository>();
|
|
services.AddScoped<IDeadLetterRepository, PostgresDeadLetterRepository>();
|
|
|
|
// Register audit and ledger repositories
|
|
services.AddScoped<IAuditRepository, PostgresAuditRepository>();
|
|
services.AddScoped<ILedgerRepository, PostgresLedgerRepository>();
|
|
services.AddScoped<ILedgerExportRepository, PostgresLedgerExportRepository>();
|
|
services.AddScoped<IManifestRepository, PostgresManifestRepository>();
|
|
|
|
// Register ledger exporter service
|
|
services.AddScoped<ILedgerExporter, LedgerExporter>();
|
|
|
|
// Register duplicate suppression factory
|
|
services.AddSingleton<IDuplicateSuppressorFactory, PostgresDuplicateSuppressorFactory>();
|
|
|
|
// Register golden signals metrics (per ORCH-OBS-51-001)
|
|
services.AddSingleton<JobEngineGoldenSignals>();
|
|
|
|
// Register incident mode hooks (per ORCH-OBS-55-001)
|
|
var incidentModeOptions = configuration
|
|
.GetSection(IncidentModeHooksOptions.SectionName)
|
|
.Get<IncidentModeHooksOptions>() ?? new IncidentModeHooksOptions();
|
|
services.AddSingleton(incidentModeOptions);
|
|
services.AddSingleton<IIncidentModeHooks, IncidentModeHooks>();
|
|
|
|
// First signal (TTFS) services
|
|
services.Configure<FirstSignalOptions>(configuration.GetSection(FirstSignalOptions.SectionName));
|
|
services.AddHttpClient<IFailureSignatureLookupClient, SchedulerFailureSignatureLookupClient>();
|
|
services.AddSingleton<IFirstSignalCache, FirstSignalCache>();
|
|
services.AddScoped<StellaOps.JobEngine.Core.Services.IFirstSignalService, FirstSignalService>();
|
|
|
|
// Circuit breaker and quota governance services (per SPRINT_20260208_042)
|
|
services.AddScoped<ICircuitBreakerService, CircuitBreakerService>();
|
|
services.AddScoped<IQuotaGovernanceService, QuotaGovernanceService>();
|
|
|
|
return services;
|
|
}
|
|
|
|
private static bool ShouldReplaceConnectionString(string? configuredConnectionString)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(configuredConnectionString))
|
|
{
|
|
return true;
|
|
}
|
|
|
|
try
|
|
{
|
|
var builder = new NpgsqlConnectionStringBuilder(configuredConnectionString);
|
|
var host = builder.Host?.Trim();
|
|
if (string.IsNullOrWhiteSpace(host))
|
|
{
|
|
return true;
|
|
}
|
|
|
|
return host.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
|
|
.All(IsLoopbackHost);
|
|
}
|
|
catch
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
private static bool IsLoopbackHost(string host)
|
|
{
|
|
return host.Equals("localhost", StringComparison.OrdinalIgnoreCase)
|
|
|| host.Equals("127.0.0.1", StringComparison.OrdinalIgnoreCase)
|
|
|| host.Equals("::1", StringComparison.OrdinalIgnoreCase);
|
|
}
|
|
}
|