312 lines
11 KiB
C#
312 lines
11 KiB
C#
using System.Text.Json;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using StellaOps.Router.Common.Frames;
|
|
|
|
namespace StellaOps.Microservice;
|
|
|
|
/// <summary>
|
|
/// Dispatches incoming REQUEST frames to the appropriate endpoint handlers.
|
|
/// </summary>
|
|
public sealed class RequestDispatcher
|
|
{
|
|
private readonly IEndpointRegistry _registry;
|
|
private readonly IServiceProvider _serviceProvider;
|
|
private readonly ILogger<RequestDispatcher> _logger;
|
|
private readonly JsonSerializerOptions _jsonOptions;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="RequestDispatcher"/> class.
|
|
/// </summary>
|
|
/// <param name="registry">The endpoint registry.</param>
|
|
/// <param name="serviceProvider">The service provider for resolving handlers.</param>
|
|
/// <param name="logger">The logger.</param>
|
|
/// <param name="jsonOptions">Optional JSON serialization options.</param>
|
|
public RequestDispatcher(
|
|
IEndpointRegistry registry,
|
|
IServiceProvider serviceProvider,
|
|
ILogger<RequestDispatcher> logger,
|
|
JsonSerializerOptions? jsonOptions = null)
|
|
{
|
|
_registry = registry;
|
|
_serviceProvider = serviceProvider;
|
|
_logger = logger;
|
|
_jsonOptions = jsonOptions ?? new JsonSerializerOptions
|
|
{
|
|
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
|
|
PropertyNameCaseInsensitive = true
|
|
};
|
|
}
|
|
|
|
/// <summary>
|
|
/// Dispatches a REQUEST frame and returns a RESPONSE frame.
|
|
/// </summary>
|
|
/// <param name="request">The incoming REQUEST frame.</param>
|
|
/// <param name="cancellationToken">Cancellation token.</param>
|
|
/// <returns>The RESPONSE frame.</returns>
|
|
public async Task<ResponseFrame> DispatchAsync(RequestFrame request, CancellationToken cancellationToken)
|
|
{
|
|
_logger.LogDebug(
|
|
"Dispatching request {RequestId}: {Method} {Path}",
|
|
request.RequestId,
|
|
request.Method,
|
|
request.Path);
|
|
|
|
try
|
|
{
|
|
// Find matching endpoint
|
|
if (!_registry.TryMatch(request.Method, request.Path, out var match) || match is null)
|
|
{
|
|
_logger.LogWarning(
|
|
"No endpoint found for {Method} {Path}",
|
|
request.Method,
|
|
request.Path);
|
|
|
|
return CreateErrorResponse(request.RequestId, 404, "Not Found");
|
|
}
|
|
|
|
// Create request context
|
|
var context = CreateRequestContext(request, match.PathParameters);
|
|
|
|
// Resolve and invoke handler within a scope
|
|
RawResponse response;
|
|
await using (var scope = _serviceProvider.CreateAsyncScope())
|
|
{
|
|
response = await InvokeHandlerAsync(scope.ServiceProvider, match.Endpoint, context, cancellationToken);
|
|
}
|
|
|
|
// Convert to response frame
|
|
return await CreateResponseFrameAsync(request.RequestId, response, cancellationToken);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.LogInformation("Request {RequestId} was cancelled", request.RequestId);
|
|
throw;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error dispatching request {RequestId}", request.RequestId);
|
|
return CreateErrorResponse(request.RequestId, 500, "Internal Server Error");
|
|
}
|
|
}
|
|
|
|
private RawRequestContext CreateRequestContext(RequestFrame request, IReadOnlyDictionary<string, string> pathParameters)
|
|
{
|
|
var headers = new HeaderCollection();
|
|
foreach (var (key, value) in request.Headers)
|
|
{
|
|
headers.Add(key, value);
|
|
}
|
|
|
|
return new RawRequestContext
|
|
{
|
|
Method = request.Method,
|
|
Path = request.Path,
|
|
PathParameters = pathParameters,
|
|
Headers = headers,
|
|
Body = new MemoryStream(request.Payload.ToArray()),
|
|
CancellationToken = CancellationToken.None, // Will be overridden by caller
|
|
CorrelationId = request.CorrelationId
|
|
};
|
|
}
|
|
|
|
private async Task<RawResponse> InvokeHandlerAsync(
|
|
IServiceProvider scopedProvider,
|
|
EndpointDescriptor endpoint,
|
|
RawRequestContext context,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
// Get handler instance from DI
|
|
var handler = scopedProvider.GetService(endpoint.HandlerType);
|
|
if (handler is null)
|
|
{
|
|
_logger.LogError(
|
|
"Handler type {HandlerType} not registered in DI container",
|
|
endpoint.HandlerType.Name);
|
|
return RawResponse.InternalError("Handler not found");
|
|
}
|
|
|
|
// Determine handler type and invoke
|
|
if (handler is IRawStellaEndpoint rawHandler)
|
|
{
|
|
return await rawHandler.HandleAsync(context, cancellationToken);
|
|
}
|
|
|
|
// Check for typed handlers - need to use reflection to invoke generic adapter
|
|
var handlerInterfaces = endpoint.HandlerType.GetInterfaces();
|
|
|
|
// Check for IStellaEndpoint<TRequest, TResponse>
|
|
var typedWithRequestInterface = handlerInterfaces
|
|
.FirstOrDefault(i => i.IsGenericType &&
|
|
i.GetGenericTypeDefinition() == typeof(IStellaEndpoint<,>));
|
|
|
|
if (typedWithRequestInterface is not null)
|
|
{
|
|
var typeArgs = typedWithRequestInterface.GetGenericArguments();
|
|
return await InvokeTypedHandlerWithRequestAsync(handler, typeArgs[0], typeArgs[1], context, cancellationToken);
|
|
}
|
|
|
|
// Check for IStellaEndpoint<TResponse>
|
|
var typedNoRequestInterface = handlerInterfaces
|
|
.FirstOrDefault(i => i.IsGenericType &&
|
|
i.GetGenericTypeDefinition() == typeof(IStellaEndpoint<>));
|
|
|
|
if (typedNoRequestInterface is not null)
|
|
{
|
|
var typeArgs = typedNoRequestInterface.GetGenericArguments();
|
|
return await InvokeTypedHandlerNoRequestAsync(handler, typeArgs[0], context, cancellationToken);
|
|
}
|
|
|
|
_logger.LogError(
|
|
"Handler type {HandlerType} does not implement a supported endpoint interface",
|
|
endpoint.HandlerType.Name);
|
|
return RawResponse.InternalError("Invalid handler type");
|
|
}
|
|
|
|
private async Task<RawResponse> InvokeTypedHandlerWithRequestAsync(
|
|
object handler,
|
|
Type requestType,
|
|
Type responseType,
|
|
RawRequestContext context,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
try
|
|
{
|
|
// Deserialize request
|
|
object? request;
|
|
if (context.Body == Stream.Null || context.Body.Length == 0)
|
|
{
|
|
request = null;
|
|
}
|
|
else
|
|
{
|
|
context.Body.Position = 0;
|
|
request = await JsonSerializer.DeserializeAsync(context.Body, requestType, _jsonOptions, cancellationToken);
|
|
}
|
|
|
|
if (request is null)
|
|
{
|
|
return RawResponse.BadRequest("Invalid request body");
|
|
}
|
|
|
|
// Get HandleAsync method
|
|
var handleMethod = handler.GetType().GetMethod("HandleAsync", [requestType, typeof(CancellationToken)]);
|
|
if (handleMethod is null)
|
|
{
|
|
return RawResponse.InternalError("Handler method not found");
|
|
}
|
|
|
|
// Invoke handler
|
|
var resultTask = handleMethod.Invoke(handler, [request, cancellationToken]);
|
|
if (resultTask is null)
|
|
{
|
|
return RawResponse.InternalError("Handler returned null");
|
|
}
|
|
|
|
await (Task)resultTask;
|
|
|
|
// Get result from Task<TResponse>
|
|
var resultProperty = resultTask.GetType().GetProperty("Result");
|
|
var response = resultProperty?.GetValue(resultTask);
|
|
|
|
return SerializeResponse(response, responseType);
|
|
}
|
|
catch (JsonException ex)
|
|
{
|
|
return RawResponse.BadRequest($"Invalid JSON: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
private async Task<RawResponse> InvokeTypedHandlerNoRequestAsync(
|
|
object handler,
|
|
Type responseType,
|
|
RawRequestContext context,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
try
|
|
{
|
|
// Get HandleAsync method
|
|
var handleMethod = handler.GetType().GetMethod("HandleAsync", [typeof(CancellationToken)]);
|
|
if (handleMethod is null)
|
|
{
|
|
return RawResponse.InternalError("Handler method not found");
|
|
}
|
|
|
|
// Invoke handler
|
|
var resultTask = handleMethod.Invoke(handler, [cancellationToken]);
|
|
if (resultTask is null)
|
|
{
|
|
return RawResponse.InternalError("Handler returned null");
|
|
}
|
|
|
|
await (Task)resultTask;
|
|
|
|
// Get result from Task<TResponse>
|
|
var resultProperty = resultTask.GetType().GetProperty("Result");
|
|
var response = resultProperty?.GetValue(resultTask);
|
|
|
|
return SerializeResponse(response, responseType);
|
|
}
|
|
catch (Exception ex) when (ex is not OperationCanceledException)
|
|
{
|
|
return RawResponse.InternalError(ex.Message);
|
|
}
|
|
}
|
|
|
|
private RawResponse SerializeResponse(object? response, Type responseType)
|
|
{
|
|
var json = JsonSerializer.SerializeToUtf8Bytes(response, responseType, _jsonOptions);
|
|
var headers = new HeaderCollection();
|
|
headers.Set("Content-Type", "application/json; charset=utf-8");
|
|
|
|
return new RawResponse
|
|
{
|
|
StatusCode = 200,
|
|
Headers = headers,
|
|
Body = new MemoryStream(json)
|
|
};
|
|
}
|
|
|
|
private static ResponseFrame CreateErrorResponse(string requestId, int statusCode, string message)
|
|
{
|
|
var headers = new HeaderCollection();
|
|
headers.Set("Content-Type", "text/plain; charset=utf-8");
|
|
|
|
return new ResponseFrame
|
|
{
|
|
RequestId = requestId,
|
|
StatusCode = statusCode,
|
|
Headers = headers.ToDictionary(h => h.Key, h => h.Value),
|
|
Payload = System.Text.Encoding.UTF8.GetBytes(message)
|
|
};
|
|
}
|
|
|
|
private static async Task<ResponseFrame> CreateResponseFrameAsync(
|
|
string requestId,
|
|
RawResponse response,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
// Read response body
|
|
byte[] payload;
|
|
if (response.Body == Stream.Null)
|
|
{
|
|
payload = [];
|
|
}
|
|
else
|
|
{
|
|
using var ms = new MemoryStream();
|
|
response.Body.Position = 0;
|
|
await response.Body.CopyToAsync(ms, cancellationToken);
|
|
payload = ms.ToArray();
|
|
}
|
|
|
|
return new ResponseFrame
|
|
{
|
|
RequestId = requestId,
|
|
StatusCode = response.StatusCode,
|
|
Headers = response.Headers.ToDictionary(h => h.Key, h => h.Value),
|
|
Payload = payload
|
|
};
|
|
}
|
|
}
|