up
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Risk Bundle CI / risk-bundle-build (push) Has been cancelled
Scanner Analyzers / Discover Analyzers (push) Has been cancelled
Scanner Analyzers / Validate Test Fixtures (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Risk Bundle CI / risk-bundle-offline-kit (push) Has been cancelled
Risk Bundle CI / publish-checksums (push) Has been cancelled
Scanner Analyzers / Build Analyzers (push) Has been cancelled
Scanner Analyzers / Test Language Analyzers (push) Has been cancelled
Scanner Analyzers / Verify Deterministic Output (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled

This commit is contained in:
StellaOps Bot
2025-12-07 23:38:50 +02:00
parent 68bc53a07b
commit 3d01bf9edc
49 changed files with 8269 additions and 1728 deletions

View File

@@ -0,0 +1,73 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.ExportCenter.Core.Configuration;
namespace StellaOps.ExportCenter.Infrastructure.Db;
/// <summary>
/// Manages Npgsql data source for Export Center with tenant isolation.
/// </summary>
public sealed class ExportCenterDataSource : IAsyncDisposable
{
private readonly NpgsqlDataSource _dataSource;
private readonly ILogger<ExportCenterDataSource> _logger;
public ExportCenterDataSource(
DatabaseOptions databaseOptions,
ILogger<ExportCenterDataSource> logger)
{
ArgumentNullException.ThrowIfNull(databaseOptions);
ArgumentException.ThrowIfNullOrWhiteSpace(databaseOptions.ConnectionString);
_logger = logger;
_dataSource = CreateDataSource(databaseOptions.ConnectionString);
}
public async ValueTask DisposeAsync()
{
await _dataSource.DisposeAsync();
}
public Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
=> OpenConnectionAsync(null, cancellationToken);
public async Task<NpgsqlConnection> OpenConnectionAsync(Guid? tenantId, CancellationToken cancellationToken)
{
var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
await ConfigureSessionAsync(connection, tenantId, cancellationToken);
return connection;
}
private static NpgsqlDataSource CreateDataSource(string connectionString)
{
var builder = new NpgsqlDataSourceBuilder(connectionString);
builder.EnableDynamicJson();
return builder.Build();
}
private async Task ConfigureSessionAsync(NpgsqlConnection connection, Guid? tenantId, CancellationToken cancellationToken)
{
try
{
await using var command = new NpgsqlCommand("SET TIME ZONE 'UTC';", connection);
await command.ExecuteNonQueryAsync(cancellationToken);
if (tenantId.HasValue)
{
await using var tenantCommand = new NpgsqlCommand("SELECT set_config('app.current_tenant', @tenant, false);", connection);
tenantCommand.Parameters.AddWithValue("tenant", tenantId.Value.ToString("D"));
await tenantCommand.ExecuteNonQueryAsync(cancellationToken);
}
}
catch (Exception ex)
{
if (_logger.IsEnabled(LogLevel.Error))
{
_logger.LogError(ex, "Failed to configure Export Center session state.");
}
await connection.DisposeAsync();
throw;
}
}
}

View File

@@ -0,0 +1,90 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.ExportCenter.Core.Configuration;
namespace StellaOps.ExportCenter.Infrastructure.Db;
/// <summary>
/// Extension methods for registering Export Center database services.
/// </summary>
public static class ExportCenterDbServiceExtensions
{
/// <summary>
/// Adds Export Center database services to the service collection.
/// </summary>
public static IServiceCollection AddExportCenterDatabase(
this IServiceCollection services,
Action<DatabaseOptions>? configureOptions = null)
{
if (configureOptions is not null)
{
services.Configure(configureOptions);
}
services.AddSingleton(sp =>
{
var options = sp.GetRequiredService<IOptions<ExportCenterOptions>>().Value.Database;
var logger = sp.GetRequiredService<ILogger<ExportCenterDataSource>>();
return new ExportCenterDataSource(options, logger);
});
services.AddSingleton<IExportCenterMigrationRunner, ExportCenterMigrationRunner>();
return services;
}
/// <summary>
/// Adds the startup migration hosted service.
/// </summary>
public static IServiceCollection AddExportCenterMigrations(this IServiceCollection services)
{
services.AddHostedService<ExportCenterMigrationHostedService>();
return services;
}
}
/// <summary>
/// Hosted service that runs database migrations at startup.
/// </summary>
internal sealed class ExportCenterMigrationHostedService(
IExportCenterMigrationRunner migrationRunner,
IOptions<ExportCenterOptions> options,
ILogger<ExportCenterMigrationHostedService> logger) : IHostedService
{
public async Task StartAsync(CancellationToken cancellationToken)
{
if (!options.Value.Database.ApplyMigrationsAtStartup)
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation("Export Center database migrations disabled by configuration.");
}
return;
}
try
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation("Applying Export Center database migrations...");
}
await migrationRunner.ApplyAsync(cancellationToken);
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation("Export Center database migrations completed successfully.");
}
}
catch (Exception ex)
{
logger.LogCritical(ex, "Failed to apply Export Center database migrations.");
throw;
}
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

View File

@@ -0,0 +1,139 @@
using Microsoft.Extensions.Logging;
using Npgsql;
namespace StellaOps.ExportCenter.Infrastructure.Db;
/// <summary>
/// Interface for running Export Center database migrations.
/// </summary>
public interface IExportCenterMigrationRunner
{
Task ApplyAsync(CancellationToken cancellationToken);
}
/// <summary>
/// Applies SQL migrations for Export Center with checksum validation.
/// </summary>
internal sealed class ExportCenterMigrationRunner(
ExportCenterDataSource dataSource,
ILogger<ExportCenterMigrationRunner> logger) : IExportCenterMigrationRunner
{
private const string VersionTableSql = """
CREATE TABLE IF NOT EXISTS export_center.export_schema_version
(
version integer PRIMARY KEY,
script_name text NOT NULL,
script_checksum text NOT NULL,
applied_at_utc timestamptz NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
);
""";
public async Task ApplyAsync(CancellationToken cancellationToken)
{
var scripts = MigrationLoader.LoadAll();
if (scripts.Count == 0)
{
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("No migrations discovered for Export Center.");
}
return;
}
await using var connection = await dataSource.OpenConnectionAsync(cancellationToken);
await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
// Ensure schema exists first
await EnsureSchemaAsync(connection, transaction, cancellationToken);
await EnsureVersionTableAsync(connection, transaction, cancellationToken);
var appliedScripts = await LoadAppliedScriptsAsync(connection, transaction, cancellationToken);
foreach (var script in scripts)
{
if (appliedScripts.TryGetValue(script.Version, out var existingChecksum))
{
if (!string.Equals(existingChecksum, script.Sha256, StringComparison.Ordinal))
{
throw new InvalidOperationException(
$"Checksum mismatch for migration {script.Name}. Expected {existingChecksum}, computed {script.Sha256}.");
}
continue;
}
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation("Applying Export Center migration {Version}: {Name}", script.Version, script.Name);
}
await ExecuteScriptAsync(connection, transaction, script.Sql, cancellationToken);
await RecordAppliedScriptAsync(connection, transaction, script, cancellationToken);
}
await transaction.CommitAsync(cancellationToken);
}
private static async Task EnsureSchemaAsync(NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken cancellationToken)
{
const string schemaSql = """
CREATE SCHEMA IF NOT EXISTS export_center;
CREATE SCHEMA IF NOT EXISTS export_center_app;
""";
await using var command = new NpgsqlCommand(schemaSql, connection, transaction);
await command.ExecuteNonQueryAsync(cancellationToken);
}
private static async Task EnsureVersionTableAsync(NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken cancellationToken)
{
await using var command = new NpgsqlCommand(VersionTableSql, connection, transaction);
await command.ExecuteNonQueryAsync(cancellationToken);
}
private static async Task<Dictionary<int, string>> LoadAppliedScriptsAsync(NpgsqlConnection connection, NpgsqlTransaction transaction, CancellationToken cancellationToken)
{
const string sql = """
SELECT version, script_checksum
FROM export_center.export_schema_version
ORDER BY version;
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
var dictionary = new Dictionary<int, string>();
while (await reader.ReadAsync(cancellationToken))
{
var version = reader.GetInt32(0);
var checksum = reader.GetString(1);
dictionary[version] = checksum;
}
return dictionary;
}
private static async Task ExecuteScriptAsync(NpgsqlConnection connection, NpgsqlTransaction transaction, string sql, CancellationToken cancellationToken)
{
await using var command = new NpgsqlCommand(sql, connection, transaction)
{
CommandTimeout = 0
};
await command.ExecuteNonQueryAsync(cancellationToken);
}
private static async Task RecordAppliedScriptAsync(NpgsqlConnection connection, NpgsqlTransaction transaction, MigrationScript script, CancellationToken cancellationToken)
{
const string insertSql = """
INSERT INTO export_center.export_schema_version(version, script_name, script_checksum)
VALUES (@version, @name, @checksum);
""";
await using var command = new NpgsqlCommand(insertSql, connection, transaction);
command.Parameters.AddWithValue("version", script.Version);
command.Parameters.AddWithValue("name", script.Name);
command.Parameters.AddWithValue("checksum", script.Sha256);
await command.ExecuteNonQueryAsync(cancellationToken);
}
}

View File

@@ -0,0 +1,42 @@
using System.Reflection;
namespace StellaOps.ExportCenter.Infrastructure.Db;
/// <summary>
/// Loads SQL migration scripts from embedded resources.
/// </summary>
internal static class MigrationLoader
{
private static readonly Assembly Assembly = typeof(MigrationLoader).Assembly;
public static IReadOnlyList<MigrationScript> LoadAll()
{
var scripts = new List<MigrationScript>();
foreach (var resourceName in Assembly.GetManifestResourceNames())
{
if (!resourceName.Contains(".Db.Migrations.", StringComparison.OrdinalIgnoreCase))
{
continue;
}
using var stream = Assembly.GetManifestResourceStream(resourceName);
if (stream is null)
{
continue;
}
using var reader = new StreamReader(stream);
var sql = reader.ReadToEnd();
if (MigrationScript.TryCreate(resourceName, sql, out var script))
{
scripts.Add(script);
}
}
return scripts
.OrderBy(script => script.Version)
.ToArray();
}
}

View File

@@ -0,0 +1,59 @@
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using System.Text;
using System.Text.RegularExpressions;
namespace StellaOps.ExportCenter.Infrastructure.Db;
/// <summary>
/// Represents a SQL migration script with version tracking.
/// </summary>
internal sealed partial class MigrationScript
{
private static readonly Regex VersionRegex = GetVersionRegex();
private MigrationScript(int version, string name, string sql)
{
Version = version;
Name = name;
Sql = sql;
Sha256 = ComputeSha256(sql);
}
public int Version { get; }
public string Name { get; }
public string Sql { get; }
public string Sha256 { get; }
public static bool TryCreate(string resourceName, string sql, [NotNullWhen(true)] out MigrationScript? script)
{
var fileName = resourceName.Split('.').Last();
var match = VersionRegex.Match(fileName);
if (!match.Success || !int.TryParse(match.Groups["version"].Value, out var version))
{
script = null;
return false;
}
script = new MigrationScript(version, fileName, sql);
return true;
}
private static string ComputeSha256(string sql)
{
var normalized = NormalizeLineEndings(sql);
var bytes = Encoding.UTF8.GetBytes(normalized);
var hash = SHA256.HashData(bytes);
return Convert.ToHexString(hash).ToLowerInvariant();
}
private static string NormalizeLineEndings(string value)
=> value.Replace("\r\n", "\n", StringComparison.Ordinal);
[GeneratedRegex(@"^(?<version>\d{3,})[_-]", RegexOptions.Compiled)]
private static partial Regex GetVersionRegex();
}

View File

@@ -0,0 +1,180 @@
-- 001_initial_schema.sql
-- Establishes core schema, RLS policies, and tables for Export Center.
CREATE SCHEMA IF NOT EXISTS export_center;
CREATE SCHEMA IF NOT EXISTS export_center_app;
-- Tenant isolation function
CREATE OR REPLACE FUNCTION export_center_app.require_current_tenant()
RETURNS uuid
LANGUAGE plpgsql
AS $$
DECLARE
tenant_text text;
BEGIN
tenant_text := current_setting('app.current_tenant', true);
IF tenant_text IS NULL OR length(tenant_text) = 0 THEN
RAISE EXCEPTION 'app.current_tenant is not set for the current session';
END IF;
RETURN tenant_text::uuid;
END;
$$;
-- Export Profiles: defines scope and configuration for exports
CREATE TABLE IF NOT EXISTS export_center.export_profiles
(
profile_id uuid PRIMARY KEY,
tenant_id uuid NOT NULL,
name text NOT NULL CHECK (length(name) BETWEEN 1 AND 256),
description text,
kind smallint NOT NULL CHECK (kind BETWEEN 1 AND 4),
status smallint NOT NULL CHECK (status BETWEEN 1 AND 4),
scope_json jsonb,
format_json jsonb,
signing_json jsonb,
schedule text,
created_at timestamptz NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
updated_at timestamptz NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
archived_at timestamptz
);
CREATE INDEX IF NOT EXISTS ix_export_profiles_tenant_status
ON export_center.export_profiles (tenant_id, status);
CREATE UNIQUE INDEX IF NOT EXISTS uq_export_profiles_tenant_name
ON export_center.export_profiles (tenant_id, name) WHERE archived_at IS NULL;
ALTER TABLE export_center.export_profiles
ENABLE ROW LEVEL SECURITY;
CREATE POLICY IF NOT EXISTS export_profiles_isolation
ON export_center.export_profiles
USING (tenant_id = export_center_app.require_current_tenant())
WITH CHECK (tenant_id = export_center_app.require_current_tenant());
-- Export Runs: tracks individual export executions
CREATE TABLE IF NOT EXISTS export_center.export_runs
(
run_id uuid PRIMARY KEY,
profile_id uuid NOT NULL,
tenant_id uuid NOT NULL,
status smallint NOT NULL CHECK (status BETWEEN 1 AND 6),
trigger smallint NOT NULL CHECK (trigger BETWEEN 1 AND 4),
correlation_id text,
initiated_by text,
total_items integer NOT NULL DEFAULT 0 CHECK (total_items >= 0),
processed_items integer NOT NULL DEFAULT 0 CHECK (processed_items >= 0),
failed_items integer NOT NULL DEFAULT 0 CHECK (failed_items >= 0),
total_size_bytes bigint NOT NULL DEFAULT 0 CHECK (total_size_bytes >= 0),
error_json jsonb,
created_at timestamptz NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
started_at timestamptz,
completed_at timestamptz,
expires_at timestamptz,
CONSTRAINT fk_runs_profile FOREIGN KEY (profile_id) REFERENCES export_center.export_profiles (profile_id)
);
CREATE INDEX IF NOT EXISTS ix_export_runs_tenant_status
ON export_center.export_runs (tenant_id, status);
CREATE INDEX IF NOT EXISTS ix_export_runs_profile_created
ON export_center.export_runs (profile_id, created_at DESC);
CREATE INDEX IF NOT EXISTS ix_export_runs_correlation
ON export_center.export_runs (correlation_id) WHERE correlation_id IS NOT NULL;
ALTER TABLE export_center.export_runs
ENABLE ROW LEVEL SECURITY;
CREATE POLICY IF NOT EXISTS export_runs_isolation
ON export_center.export_runs
USING (tenant_id = export_center_app.require_current_tenant())
WITH CHECK (tenant_id = export_center_app.require_current_tenant());
-- Export Inputs: tracks items included in each export run
CREATE TABLE IF NOT EXISTS export_center.export_inputs
(
input_id uuid PRIMARY KEY,
run_id uuid NOT NULL,
tenant_id uuid NOT NULL,
kind smallint NOT NULL CHECK (kind BETWEEN 1 AND 8),
status smallint NOT NULL CHECK (status BETWEEN 1 AND 5),
source_ref text NOT NULL CHECK (length(source_ref) BETWEEN 1 AND 512),
name text,
content_hash text CHECK (content_hash IS NULL OR content_hash ~ '^[0-9a-f]{64}$'),
size_bytes bigint NOT NULL DEFAULT 0 CHECK (size_bytes >= 0),
metadata_json jsonb,
error_json jsonb,
created_at timestamptz NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
processed_at timestamptz,
CONSTRAINT fk_inputs_run FOREIGN KEY (run_id) REFERENCES export_center.export_runs (run_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS ix_export_inputs_run_status
ON export_center.export_inputs (run_id, status);
CREATE INDEX IF NOT EXISTS ix_export_inputs_tenant_kind
ON export_center.export_inputs (tenant_id, kind);
CREATE INDEX IF NOT EXISTS ix_export_inputs_source_ref
ON export_center.export_inputs (tenant_id, source_ref);
ALTER TABLE export_center.export_inputs
ENABLE ROW LEVEL SECURITY;
CREATE POLICY IF NOT EXISTS export_inputs_isolation
ON export_center.export_inputs
USING (tenant_id = export_center_app.require_current_tenant())
WITH CHECK (tenant_id = export_center_app.require_current_tenant());
-- Export Distributions: tracks artifact distribution to targets
CREATE TABLE IF NOT EXISTS export_center.export_distributions
(
distribution_id uuid PRIMARY KEY,
run_id uuid NOT NULL,
tenant_id uuid NOT NULL,
kind smallint NOT NULL CHECK (kind BETWEEN 1 AND 5),
status smallint NOT NULL CHECK (status BETWEEN 1 AND 6),
target text NOT NULL CHECK (length(target) BETWEEN 1 AND 1024),
artifact_path text NOT NULL CHECK (length(artifact_path) BETWEEN 1 AND 1024),
artifact_hash text CHECK (artifact_hash IS NULL OR artifact_hash ~ '^[0-9a-f]{64}$'),
size_bytes bigint NOT NULL DEFAULT 0 CHECK (size_bytes >= 0),
content_type text,
metadata_json jsonb,
error_json jsonb,
attempt_count integer NOT NULL DEFAULT 0 CHECK (attempt_count >= 0),
created_at timestamptz NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
distributed_at timestamptz,
verified_at timestamptz,
CONSTRAINT fk_distributions_run FOREIGN KEY (run_id) REFERENCES export_center.export_runs (run_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS ix_export_distributions_run_status
ON export_center.export_distributions (run_id, status);
CREATE INDEX IF NOT EXISTS ix_export_distributions_tenant_kind
ON export_center.export_distributions (tenant_id, kind);
ALTER TABLE export_center.export_distributions
ENABLE ROW LEVEL SECURITY;
CREATE POLICY IF NOT EXISTS export_distributions_isolation
ON export_center.export_distributions
USING (tenant_id = export_center_app.require_current_tenant())
WITH CHECK (tenant_id = export_center_app.require_current_tenant());
-- Trigger function to update timestamps
CREATE OR REPLACE FUNCTION export_center_app.update_updated_at()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
NEW.updated_at := NOW() AT TIME ZONE 'UTC';
RETURN NEW;
END;
$$;
CREATE TRIGGER trg_export_profiles_updated_at
BEFORE UPDATE ON export_center.export_profiles
FOR EACH ROW
EXECUTE FUNCTION export_center_app.update_updated_at();

View File

@@ -15,7 +15,14 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
<PackageReference Include="Npgsql" Version="8.0.3" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Db\Migrations\*.sql" />
</ItemGroup>
</Project>