release orchestrator v1 draft and build fixes

This commit is contained in:
master
2026-01-12 12:24:17 +02:00
parent f3de858c59
commit 9873f80830
1598 changed files with 240385 additions and 5944 deletions

View File

@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>preview</LangVersion>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<Description>Unified plugin adapter for Router transport plugins</Description>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\__Libraries\StellaOps.Router.Common\StellaOps.Router.Common.csproj" />
<ProjectReference Include="..\..\Plugin\StellaOps.Plugin.Abstractions\StellaOps.Plugin.Abstractions.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,187 @@
namespace StellaOps.Router.Plugin.Unified;
using StellaOps.Plugin.Abstractions.Capabilities;
using StellaOps.Plugin.Abstractions.Context;
using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Common.Enums;
using StellaOps.Router.Common.Models;
/// <summary>
/// Adapts ITransportClient to ITransportClientInstance.
/// </summary>
internal sealed class TransportClientAdapter : ITransportClientInstance
{
private readonly ITransportClient _inner;
private readonly IPluginLogger _logger;
private bool _isConnected;
private bool _disposed;
private ConnectionState? _connectionState;
/// <summary>
/// Gets the client identifier.
/// </summary>
public string ClientId { get; }
/// <inheritdoc />
public bool IsConnected => _isConnected;
/// <inheritdoc />
public TransportEndpoint RemoteEndpoint { get; }
/// <summary>
/// Creates a new client adapter.
/// </summary>
public TransportClientAdapter(
ITransportClient inner,
string clientId,
TransportEndpoint endpoint,
IPluginLogger logger)
{
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
ClientId = clientId;
RemoteEndpoint = endpoint;
_logger = logger;
// Create a default connection state for the adapter
_connectionState = new ConnectionState
{
ConnectionId = clientId,
Instance = new InstanceDescriptor
{
InstanceId = clientId,
ServiceName = "transport-client",
Version = "1.0.0",
Region = "default"
},
Status = InstanceHealthStatus.Unknown,
TransportType = TransportType.Tcp
};
_isConnected = true;
}
/// <inheritdoc />
public async Task<TransportMessage> SendRequestAsync(
TransportMessage request,
TimeSpan timeout,
CancellationToken ct)
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (!_isConnected || _connectionState == null)
{
throw new InvalidOperationException("Client is not connected");
}
// Convert TransportMessage to Frame
var requestFrame = new Frame
{
Type = FrameType.Request,
CorrelationId = request.CorrelationId ?? Guid.NewGuid().ToString("N"),
Payload = request.Payload
};
var responseFrame = await _inner.SendRequestAsync(
_connectionState,
requestFrame,
timeout,
ct);
// Convert Frame to TransportMessage
return new TransportMessage(
Id: responseFrame.CorrelationId ?? Guid.NewGuid().ToString("N"),
Payload: responseFrame.Payload,
ContentType: null,
CorrelationId: responseFrame.CorrelationId,
ReplyTo: null,
Headers: null,
Timestamp: DateTimeOffset.UtcNow,
Ttl: null);
}
/// <inheritdoc />
public async Task SendStreamingAsync(
TransportMessage header,
Stream bodyStream,
Func<Stream, Task> responseHandler,
CancellationToken ct)
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (!_isConnected || _connectionState == null)
{
throw new InvalidOperationException("Client is not connected");
}
// Convert TransportMessage to Frame header
var headerFrame = new Frame
{
Type = FrameType.Request,
CorrelationId = header.CorrelationId ?? Guid.NewGuid().ToString("N"),
Payload = header.Payload
};
// Use default payload limits
var limits = new PayloadLimits
{
MaxRequestBytesPerCall = 10 * 1024 * 1024 // 10MB default
};
await _inner.SendStreamingAsync(
_connectionState,
headerFrame,
bodyStream,
responseHandler,
limits,
ct);
}
/// <inheritdoc />
public async Task SendCancelAsync(
string correlationId,
string? reason,
CancellationToken ct)
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (!_isConnected || _connectionState == null)
{
throw new InvalidOperationException("Client is not connected");
}
if (!Guid.TryParse(correlationId, out var correlationGuid))
{
correlationGuid = Guid.NewGuid();
}
await _inner.SendCancelAsync(
_connectionState,
correlationGuid,
reason);
_logger.Debug("Sent cancel request for {CorrelationId}: {Reason}",
correlationId, reason ?? "No reason provided");
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
_isConnected = false;
if (_inner is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync();
}
else if (_inner is IDisposable disposable)
{
disposable.Dispose();
}
_logger.Info("Transport client {ClientId} disposed", ClientId);
}
}

View File

@@ -0,0 +1,305 @@
namespace StellaOps.Router.Plugin.Unified;
using System.Collections.Concurrent;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using StellaOps.Plugin.Abstractions;
using StellaOps.Plugin.Abstractions.Capabilities;
using StellaOps.Plugin.Abstractions.Context;
using StellaOps.Plugin.Abstractions.Health;
using StellaOps.Plugin.Abstractions.Lifecycle;
using StellaOps.Router.Common.Abstractions;
using StellaOps.Router.Common.Plugins;
/// <summary>
/// Adapts an existing IRouterTransportPlugin to the unified IPlugin and ITransportCapability interfaces.
/// This enables gradual migration of Router transport plugins to the unified plugin architecture.
/// </summary>
/// <remarks>
/// The adapter bridges the Router-specific IRouterTransportPlugin interface to the Plugin.Abstractions
/// ITransportCapability interface. The underlying transport server/client creation is delegated to the wrapped plugin.
/// </remarks>
public sealed class TransportPluginAdapter : IPlugin, ITransportCapability
{
private readonly IRouterTransportPlugin _inner;
private readonly TransportProtocol _protocol;
private readonly TransportFeatures _features;
private IPluginContext? _context;
private IServiceProvider? _serviceProvider;
private PluginLifecycleState _state = PluginLifecycleState.Discovered;
private readonly ConcurrentDictionary<string, TransportServerAdapter> _servers = new();
private readonly ConcurrentDictionary<string, TransportClientAdapter> _clients = new();
private readonly object _lock = new();
/// <summary>
/// Creates a new adapter for an existing router transport plugin.
/// </summary>
/// <param name="inner">The existing transport plugin to wrap.</param>
/// <param name="protocol">The transport protocol type.</param>
/// <param name="features">The transport features.</param>
public TransportPluginAdapter(
IRouterTransportPlugin inner,
TransportProtocol protocol,
TransportFeatures features)
{
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
_protocol = protocol;
_features = features;
}
#region IPlugin
/// <inheritdoc />
public PluginInfo Info => new(
Id: $"com.stellaops.transport.{_inner.TransportName}",
Name: _inner.DisplayName,
Version: "1.0.0",
Vendor: "Stella Ops",
Description: $"{_inner.DisplayName} for inter-service communication");
/// <inheritdoc />
public PluginTrustLevel TrustLevel => PluginTrustLevel.BuiltIn;
/// <inheritdoc />
public PluginCapabilities Capabilities => PluginCapabilities.Transport | PluginCapabilities.Network;
/// <inheritdoc />
public PluginLifecycleState State => _state;
/// <inheritdoc />
public async Task InitializeAsync(IPluginContext context, CancellationToken ct)
{
_context = context;
_state = PluginLifecycleState.Initializing;
// Create a service provider with the transport registered
var services = new ServiceCollection();
// Get configuration from context
var configuration = context.Configuration.GetConfiguration();
// Create registration context for the inner plugin
var registrationContext = new RouterTransportRegistrationContext(
services,
configuration,
RouterTransportMode.Both);
// Check if the plugin is available
if (!_inner.IsAvailable(services.BuildServiceProvider()))
{
_state = PluginLifecycleState.Failed;
throw new InvalidOperationException(
$"Transport plugin '{_inner.TransportName}' is not available.");
}
// Register the transport services
_inner.Register(registrationContext);
// Build the service provider
_serviceProvider = services.BuildServiceProvider();
_state = PluginLifecycleState.Active;
context.Logger.Info("Transport plugin adapter initialized for {TransportName}", _inner.TransportName);
await Task.CompletedTask;
}
/// <inheritdoc />
public async Task<HealthCheckResult> HealthCheckAsync(CancellationToken ct)
{
try
{
if (_state != PluginLifecycleState.Active)
{
return HealthCheckResult.Unhealthy($"Transport is in state {_state}");
}
var activeServers = _servers.Count;
var activeClients = _clients.Count;
return HealthCheckResult.Healthy()
.WithDetails(new Dictionary<string, object>
{
["transportId"] = _inner.TransportName,
["displayName"] = _inner.DisplayName,
["protocol"] = _protocol.ToString(),
["features"] = _features.ToString(),
["activeServers"] = activeServers,
["activeClients"] = activeClients
});
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy(ex);
}
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
foreach (var server in _servers.Values)
{
await server.DisposeAsync();
}
_servers.Clear();
foreach (var client in _clients.Values)
{
await client.DisposeAsync();
}
_clients.Clear();
if (_serviceProvider is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync();
}
else if (_serviceProvider is IDisposable disposable)
{
disposable.Dispose();
}
_state = PluginLifecycleState.Stopped;
}
#endregion
#region ITransportCapability
/// <inheritdoc />
public string TransportId => _inner.TransportName;
/// <inheritdoc />
public string TransportDisplayName => _inner.DisplayName;
/// <inheritdoc />
public TransportProtocol Protocol => _protocol;
/// <inheritdoc />
public TransportFeatures Features => _features;
/// <inheritdoc />
public async Task<ITransportServerInstance> CreateServerAsync(
TransportServerOptions options,
CancellationToken ct)
{
EnsureActive();
var server = _serviceProvider!.GetService<ITransportServer>();
if (server == null)
{
throw new InvalidOperationException(
$"Transport '{_inner.TransportName}' does not support server mode. " +
"Ensure the transport was registered with server mode enabled.");
}
var serverId = $"{_inner.TransportName}-server-{Guid.NewGuid():N}"[..32];
var adapter = new TransportServerAdapter(
server,
serverId,
options.Endpoint,
_context!.Logger);
_servers[serverId] = adapter;
return adapter;
}
/// <inheritdoc />
public async Task<ITransportClientInstance> CreateClientAsync(
TransportClientOptions options,
CancellationToken ct)
{
EnsureActive();
var client = _serviceProvider!.GetService<ITransportClient>();
if (client == null)
{
throw new InvalidOperationException(
$"Transport '{_inner.TransportName}' does not support client mode. " +
"Ensure the transport was registered with client mode enabled.");
}
var clientId = $"{_inner.TransportName}-client-{Guid.NewGuid():N}"[..32];
var adapter = new TransportClientAdapter(
client,
clientId,
options.Endpoint,
_context!.Logger);
_clients[clientId] = adapter;
return adapter;
}
/// <inheritdoc />
public async Task<TransportConnectionTestResult> TestConnectionAsync(
TransportEndpoint endpoint,
TimeSpan timeout,
CancellationToken ct)
{
EnsureActive();
var startTime = DateTimeOffset.UtcNow;
try
{
// Create a temporary client for testing
var client = _serviceProvider!.GetService<ITransportClient>();
if (client == null)
{
return new TransportConnectionTestResult(
Success: false,
Latency: null,
Message: "Transport does not support client mode");
}
// For basic connectivity test, we just verify the service is available
// Full connection testing would require additional infrastructure
var elapsed = DateTimeOffset.UtcNow - startTime;
return new TransportConnectionTestResult(
Success: true,
Latency: elapsed,
Message: "Transport client available",
ServerInfo: new Dictionary<string, string>
{
["transport"] = _inner.TransportName,
["host"] = endpoint.Host,
["port"] = endpoint.Port.ToString()
});
}
catch (Exception ex)
{
return new TransportConnectionTestResult(
Success: false,
Latency: DateTimeOffset.UtcNow - startTime,
Message: ex.Message);
}
}
#endregion
private void EnsureActive()
{
if (_state != PluginLifecycleState.Active)
{
throw new InvalidOperationException(
$"Transport '{_inner.TransportName}' is not active (state: {_state})");
}
}
}
/// <summary>
/// Configuration extensions for plugin context.
/// </summary>
internal static class PluginConfigurationExtensions
{
/// <summary>
/// Gets the underlying IConfiguration from the plugin configuration.
/// </summary>
public static IConfiguration GetConfiguration(this IPluginConfiguration config)
{
// The plugin configuration should provide access to the underlying config
// For now, return an empty configuration as fallback
return new ConfigurationBuilder().Build();
}
}

View File

@@ -0,0 +1,156 @@
namespace StellaOps.Router.Plugin.Unified;
using Microsoft.Extensions.DependencyInjection;
using StellaOps.Plugin.Abstractions;
using StellaOps.Plugin.Abstractions.Capabilities;
using StellaOps.Router.Common.Plugins;
/// <summary>
/// Factory for creating unified transport plugin adapters from existing router transport plugins.
/// </summary>
public sealed class TransportPluginAdapterFactory
{
private readonly IEnumerable<IRouterTransportPlugin> _plugins;
private readonly Dictionary<string, TransportPluginAdapter> _adapters = new(StringComparer.OrdinalIgnoreCase);
private readonly object _lock = new();
/// <summary>
/// Known transport protocols for each transport type.
/// </summary>
private static readonly Dictionary<string, TransportProtocol> KnownProtocols = new(StringComparer.OrdinalIgnoreCase)
{
["tcp"] = TransportProtocol.Tcp,
["tls"] = TransportProtocol.Tls,
["udp"] = TransportProtocol.Udp,
["rabbitmq"] = TransportProtocol.Amqp,
["valkey"] = TransportProtocol.Redis,
["redis"] = TransportProtocol.Redis,
["inmemory"] = TransportProtocol.InMemory
};
/// <summary>
/// Known features for each transport type.
/// </summary>
private static readonly Dictionary<string, TransportFeatures> KnownFeatures = new(StringComparer.OrdinalIgnoreCase)
{
["tcp"] = TransportFeatures.RequestReply | TransportFeatures.Streaming | TransportFeatures.ConnectionPooling,
["tls"] = TransportFeatures.RequestReply | TransportFeatures.Streaming | TransportFeatures.TlsSupport | TransportFeatures.ConnectionPooling,
["udp"] = TransportFeatures.None, // UDP is connectionless, fire-and-forget
["rabbitmq"] = TransportFeatures.RequestReply | TransportFeatures.PubSub | TransportFeatures.Queuing | TransportFeatures.Acknowledgment | TransportFeatures.AutoReconnect,
["valkey"] = TransportFeatures.PubSub | TransportFeatures.Queuing | TransportFeatures.AutoReconnect,
["redis"] = TransportFeatures.PubSub | TransportFeatures.Queuing | TransportFeatures.AutoReconnect,
["inmemory"] = TransportFeatures.RequestReply | TransportFeatures.Streaming
};
/// <summary>
/// Creates a new factory instance.
/// </summary>
/// <param name="plugins">The available router transport plugins.</param>
public TransportPluginAdapterFactory(IEnumerable<IRouterTransportPlugin> plugins)
{
_plugins = plugins ?? throw new ArgumentNullException(nameof(plugins));
}
/// <summary>
/// Gets all available unified transport plugins.
/// </summary>
/// <param name="serviceProvider">Service provider for availability checking.</param>
/// <returns>List of unified transport plugins.</returns>
public IReadOnlyList<IPlugin> GetAllPlugins(IServiceProvider serviceProvider)
{
var result = new List<IPlugin>();
foreach (var plugin in _plugins)
{
if (plugin.IsAvailable(serviceProvider))
{
var adapter = GetOrCreateAdapter(plugin);
if (adapter != null)
{
result.Add(adapter);
}
}
}
return result;
}
/// <summary>
/// Gets a unified transport plugin by transport name.
/// </summary>
/// <param name="transportName">Transport name (e.g., "tcp", "tls", "rabbitmq").</param>
/// <returns>Unified transport plugin, or null if not found.</returns>
public IPlugin? GetPlugin(string transportName)
{
var plugin = _plugins.FirstOrDefault(p =>
p.TransportName.Equals(transportName, StringComparison.OrdinalIgnoreCase));
if (plugin == null)
{
return null;
}
return GetOrCreateAdapter(plugin);
}
/// <summary>
/// Gets the transport capability for a transport.
/// </summary>
/// <param name="transportName">Transport name.</param>
/// <returns>Transport capability, or null if not found.</returns>
public ITransportCapability? GetCapability(string transportName)
{
return GetPlugin(transportName) as ITransportCapability;
}
/// <summary>
/// Gets all available transport names.
/// </summary>
/// <returns>List of transport names.</returns>
public IReadOnlyList<string> GetAvailableTransports()
{
return _plugins.Select(p => p.TransportName).ToList();
}
private TransportPluginAdapter? GetOrCreateAdapter(IRouterTransportPlugin plugin)
{
lock (_lock)
{
if (_adapters.TryGetValue(plugin.TransportName, out var existing))
{
return existing;
}
var protocol = KnownProtocols.TryGetValue(plugin.TransportName, out var p)
? p
: TransportProtocol.Tcp; // Default to TCP
var features = KnownFeatures.TryGetValue(plugin.TransportName, out var f)
? f
: TransportFeatures.RequestReply; // Default to request/reply
var adapter = new TransportPluginAdapter(plugin, protocol, features);
_adapters[plugin.TransportName] = adapter;
return adapter;
}
}
}
/// <summary>
/// Extension methods for registering unified transport plugin services.
/// </summary>
public static class TransportPluginAdapterExtensions
{
/// <summary>
/// Adds unified transport plugin adapter services to the service collection.
/// </summary>
/// <param name="services">Service collection.</param>
/// <returns>Service collection for chaining.</returns>
public static IServiceCollection AddUnifiedTransportPlugins(this IServiceCollection services)
{
services.AddSingleton<TransportPluginAdapterFactory>();
return services;
}
}

View File

@@ -0,0 +1,115 @@
namespace StellaOps.Router.Plugin.Unified;
using StellaOps.Plugin.Abstractions.Capabilities;
using StellaOps.Plugin.Abstractions.Context;
using StellaOps.Router.Common.Abstractions;
/// <summary>
/// Adapts ITransportServer to ITransportServerInstance.
/// </summary>
internal sealed class TransportServerAdapter : ITransportServerInstance
{
private readonly ITransportServer _inner;
private readonly IPluginLogger _logger;
private bool _isRunning;
private bool _disposed;
/// <summary>
/// Gets the server identifier.
/// </summary>
public string ServerId { get; }
/// <inheritdoc />
public bool IsRunning => _isRunning;
/// <inheritdoc />
public TransportEndpoint LocalEndpoint { get; }
/// <inheritdoc />
public int ActiveConnections
{
get
{
// Try to get connection count from the inner server if it exposes this
// Otherwise return 0 as we cannot determine this
var serverType = _inner.GetType();
var countProperty = serverType.GetProperty("ConnectionCount");
if (countProperty != null)
{
return (int)(countProperty.GetValue(_inner) ?? 0);
}
return 0;
}
}
/// <summary>
/// Creates a new server adapter.
/// </summary>
public TransportServerAdapter(
ITransportServer inner,
string serverId,
TransportEndpoint endpoint,
IPluginLogger logger)
{
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
ServerId = serverId;
LocalEndpoint = endpoint;
_logger = logger;
}
/// <inheritdoc />
public async Task StartAsync(CancellationToken ct)
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (_isRunning)
{
return;
}
await _inner.StartAsync(ct);
_isRunning = true;
_logger.Info("Transport server {ServerId} started on {Host}:{Port}",
ServerId, LocalEndpoint.Host, LocalEndpoint.Port);
}
/// <inheritdoc />
public async Task StopAsync(CancellationToken ct)
{
if (!_isRunning)
{
return;
}
await _inner.StopAsync(ct);
_isRunning = false;
_logger.Info("Transport server {ServerId} stopped", ServerId);
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
if (_isRunning)
{
await StopAsync(CancellationToken.None);
}
if (_inner is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync();
}
else if (_inner is IDisposable disposable)
{
disposable.Dispose();
}
}
}