using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StellaOps.Microservice.Validation;
using StellaOps.Router.Common.Frames;
using StellaOps.Router.Common.Models;
namespace StellaOps.Microservice;
/// <summary>
/// Dispatches incoming REQUEST frames to the appropriate endpoint handlers.
/// </summary>
/// <remarks>
/// For each frame the dispatcher matches the method and path against the endpoint registry,
/// resolves the handler from a fresh DI scope, invokes it (raw or typed) and converts the
/// handler's <see cref="RawResponse"/> into a <see cref="ResponseFrame"/>.
/// </remarks>
public sealed class RequestDispatcher
{
    /// <summary>Name of the handler method looked up via reflection on typed endpoints.</summary>
    private const string HandleMethodName = "HandleAsync";

    private readonly IEndpointRegistry _registry;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger _logger;
    private readonly JsonSerializerOptions _jsonOptions;
    private readonly ISchemaRegistry? _schemaRegistry;
    private readonly IRequestSchemaValidator? _schemaValidator;

    /// <summary>
    /// Initializes a new instance of the <see cref="RequestDispatcher"/> class.
    /// </summary>
    /// <param name="registry">The endpoint registry.</param>
    /// <param name="serviceProvider">The service provider for resolving handlers.</param>
    /// <param name="logger">
    /// The logger. NOTE(review): if this is expected to be supplied by the DI container it may
    /// need to be the category-typed logger for this class - confirm against the registration.
    /// </param>
    /// <param name="schemaRegistry">Optional schema registry for validation.</param>
    /// <param name="schemaValidator">Optional schema validator.</param>
    /// <param name="jsonOptions">
    /// Optional JSON serialization options. When omitted, camelCase naming with
    /// case-insensitive property matching is used.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="registry"/>, <paramref name="serviceProvider"/> or
    /// <paramref name="logger"/> is <see langword="null"/>.
    /// </exception>
    public RequestDispatcher(
        IEndpointRegistry registry,
        IServiceProvider serviceProvider,
        ILogger logger,
        ISchemaRegistry? schemaRegistry = null,
        IRequestSchemaValidator? schemaValidator = null,
        JsonSerializerOptions? jsonOptions = null)
    {
        // Fail at construction time instead of with a NullReferenceException on first dispatch.
        ArgumentNullException.ThrowIfNull(registry);
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(logger);

        _registry = registry;
        _serviceProvider = serviceProvider;
        _logger = logger;
        _schemaRegistry = schemaRegistry;
        _schemaValidator = schemaValidator;
        _jsonOptions = jsonOptions ?? new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
            PropertyNameCaseInsensitive = true
        };
    }

    /// <summary>
    /// Dispatches a REQUEST frame and returns a RESPONSE frame.
    /// </summary>
    /// <param name="request">The incoming REQUEST frame.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// The RESPONSE frame. A 404 frame is returned when no endpoint matches, and a 500 frame
    /// when dispatching fails with any exception other than cancellation.
    /// </returns>
    /// <exception cref="OperationCanceledException">The request was cancelled.</exception>
    public async Task<ResponseFrame> DispatchAsync(RequestFrame request, CancellationToken cancellationToken)
    {
        _logger.LogDebug(
            "Dispatching request {RequestId}: {Method} {Path}",
            request.RequestId,
            request.Method,
            request.Path);

        try
        {
            // Find matching endpoint
            if (!_registry.TryMatch(request.Method, request.Path, out var match) || match is null)
            {
                _logger.LogWarning(
                    "No endpoint found for {Method} {Path}",
                    request.Method,
                    request.Path);
                return CreateErrorResponse(request.RequestId, 404, "Not Found");
            }

            // Create request context. The dispatch token is handed to the context so that a
            // handler reading context.CancellationToken actually observes cancellation.
            var context = new RawRequestContext
            {
                Method = request.Method,
                Path = request.Path,
                PathParameters = match.PathParameters,
                Headers = CopyHeaders(request),
                Body = new MemoryStream(request.Payload.ToArray()),
                CancellationToken = cancellationToken,
                CorrelationId = request.CorrelationId
            };

            // Resolve and invoke the handler within a scope. The response frame is built while
            // the scope is still alive, so a response body backed by a scoped service is read
            // before that service is disposed.
            await using var scope = _serviceProvider.CreateAsyncScope();
            var response = await InvokeHandlerAsync(scope.ServiceProvider, match.Endpoint, context, cancellationToken);
            return await CreateResponseFrameAsync(request.RequestId, response, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Request {RequestId} was cancelled", request.RequestId);
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error dispatching request {RequestId}", request.RequestId);
            return CreateErrorResponse(request.RequestId, 500, "Internal Server Error");
        }
    }

    /// <summary>
    /// Copies the headers of a REQUEST frame into a new <see cref="HeaderCollection"/>.
    /// </summary>
    private static HeaderCollection CopyHeaders(RequestFrame request)
    {
        var headers = new HeaderCollection();
        foreach (var (key, value) in request.Headers)
        {
            headers.Add(key, value);
        }

        return headers;
    }

    /// <summary>
    /// Resolves the endpoint's handler from the scoped provider and invokes it, either directly
    /// (raw endpoints) or through reflection (typed endpoints).
    /// </summary>
    /// <returns>
    /// The handler's response, or an internal-error response when the endpoint has no handler
    /// type, the handler is not registered, or it implements no supported endpoint interface.
    /// </returns>
    private async Task<RawResponse> InvokeHandlerAsync(
        IServiceProvider scopedProvider,
        EndpointDescriptor endpoint,
        RawRequestContext context,
        CancellationToken cancellationToken)
    {
        // Ensure handler type is set
        var handlerType = endpoint.HandlerType;
        if (handlerType is null)
        {
            _logger.LogError("Endpoint {Method} {Path} has no handler type", endpoint.Method, endpoint.Path);
            return RawResponse.InternalError("No handler configured");
        }

        // Get handler instance from DI
        var handler = scopedProvider.GetService(handlerType);
        if (handler is null)
        {
            _logger.LogError(
                "Handler type {HandlerType} not registered in DI container",
                handlerType.Name);
            return RawResponse.InternalError("Handler not found");
        }

        // Raw endpoints are invoked directly with the request context.
        if (handler is IRawStellaEndpoint rawHandler)
        {
            return await rawHandler.HandleAsync(context, cancellationToken);
        }

        // Typed endpoints are generic, so they have to be discovered and invoked via reflection.
        var handlerInterfaces = handlerType.GetInterfaces();

        // Two-type-argument form, IStellaEndpoint<,>: (request type, response type).
        var typedWithRequest = FindClosedInterface(handlerInterfaces, typeof(IStellaEndpoint<,>));
        if (typedWithRequest is not null)
        {
            var typeArgs = typedWithRequest.GetGenericArguments();
            return await InvokeTypedHandlerWithRequestAsync(handler, typeArgs[0], typeArgs[1], context, cancellationToken);
        }

        // One-type-argument form, IStellaEndpoint<>: (response type) only, no request body.
        var typedNoRequest = FindClosedInterface(handlerInterfaces, typeof(IStellaEndpoint<>));
        if (typedNoRequest is not null)
        {
            var typeArgs = typedNoRequest.GetGenericArguments();
            return await InvokeTypedHandlerNoRequestAsync(handler, typeArgs[0], context, cancellationToken);
        }

        _logger.LogError(
            "Handler type {HandlerType} does not implement a supported endpoint interface",
            handlerType.Name);
        return RawResponse.InternalError("Invalid handler type");
    }

    /// <summary>
    /// Returns the first interface in <paramref name="interfaces"/> that is a constructed form
    /// of <paramref name="openGeneric"/>, or <see langword="null"/> when there is none.
    /// </summary>
    private static Type? FindClosedInterface(Type[] interfaces, Type openGeneric) =>
        interfaces.FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == openGeneric);

    /// <summary>
    /// Invokes a typed handler that takes a request body: validates the body against the
    /// registered schema (if any), deserializes it to <paramref name="requestType"/>, calls the
    /// handler and serializes its result as <paramref name="responseType"/>.
    /// </summary>
    /// <returns>
    /// The serialized handler result; a validation-problem response when schema validation
    /// fails; or a bad-request response when the body is empty, deserializes to
    /// <see langword="null"/>, or is not valid JSON.
    /// </returns>
    private async Task<RawResponse> InvokeTypedHandlerWithRequestAsync(
        object handler,
        Type requestType,
        Type responseType,
        RawRequestContext context,
        CancellationToken cancellationToken)
    {
        // Schema validation (before deserialization)
        var validationFailure = await ValidateRequestSchemaAsync(context, cancellationToken);
        if (validationFailure is not null)
        {
            return validationFailure;
        }

        // Deserialize request. Only this step maps JsonException to a 400: a JsonException
        // raised later by the handler or by response serialization is a server-side failure
        // and must not be reported to the client as "Invalid JSON".
        object? request = null;
        if (!IsEmpty(context.Body))
        {
            try
            {
                context.Body.Position = 0;
                request = await JsonSerializer.DeserializeAsync(context.Body, requestType, _jsonOptions, cancellationToken);
            }
            catch (JsonException ex)
            {
                return RawResponse.BadRequest($"Invalid JSON: {ex.Message}");
            }
        }

        if (request is null)
        {
            return RawResponse.BadRequest("Invalid request body");
        }

        return await InvokeAndSerializeAsync(
            handler,
            [requestType, typeof(CancellationToken)],
            [request, cancellationToken],
            responseType);
    }

    /// <summary>
    /// Validates the request body against the schema registered for the request's method and
    /// path. Does nothing when no registry/validator is configured or no schema is registered.
    /// </summary>
    /// <returns>
    /// <see langword="null"/> when validation passed or was skipped; otherwise the
    /// validation-problem response to send back.
    /// </returns>
    private async Task<RawResponse?> ValidateRequestSchemaAsync(
        RawRequestContext context,
        CancellationToken cancellationToken)
    {
        var registry = _schemaRegistry;
        var validator = _schemaValidator;
        if (registry is null || validator is null ||
            !registry.HasSchema(context.Method, context.Path, SchemaDirection.Request))
        {
            return null;
        }

        if (IsEmpty(context.Body))
        {
            return ValidationProblemDetails.Create(
                context.Method,
                context.Path,
                SchemaDirection.Request,
                [new SchemaValidationError("/", "#", "Request body is required", "required")],
                context.CorrelationId
            ).ToRawResponse();
        }

        context.Body.Position = 0;
        JsonDocument document;
        try
        {
            document = await JsonDocument.ParseAsync(context.Body, cancellationToken: cancellationToken);
        }
        catch (JsonException ex)
        {
            return ValidationProblemDetails.Create(
                context.Method,
                context.Path,
                SchemaDirection.Request,
                [new SchemaValidationError("/", "#", $"Invalid JSON: {ex.Message}", "json")],
                context.CorrelationId
            ).ToRawResponse();
        }

        // JsonDocument is IDisposable; dispose it once validation is done. The failure response
        // is fully materialized inside this block, before the document is released.
        using (document)
        {
            var schema = registry.GetRequestSchema(context.Method, context.Path);
            if (schema is not null && !validator.TryValidate(document, schema, out var errors))
            {
                _logger.LogInformation(
                    "Schema validation failed for {Method} {Path}: {ErrorCount} errors",
                    context.Method,
                    context.Path,
                    errors.Count);
                return ValidationProblemDetails.Create(
                    context.Method,
                    context.Path,
                    SchemaDirection.Request,
                    errors,
                    context.CorrelationId
                ).ToRawResponse();
            }
        }

        // Reset stream for deserialization
        context.Body.Position = 0;
        return null;
    }

    /// <summary>
    /// Invokes a typed handler that takes no request body and serializes its result as
    /// <paramref name="responseType"/>.
    /// </summary>
    /// <returns>
    /// The serialized handler result, or an internal-error response when the handler fails with
    /// anything other than cancellation.
    /// </returns>
    private async Task<RawResponse> InvokeTypedHandlerNoRequestAsync(
        object handler,
        Type responseType,
        RawRequestContext context,
        CancellationToken cancellationToken)
    {
        try
        {
            return await InvokeAndSerializeAsync(
                handler,
                [typeof(CancellationToken)],
                [cancellationToken],
                responseType);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Previously swallowed without a trace; log it so the failure is diagnosable.
            _logger.LogError(
                ex,
                "Handler for {Method} {Path} failed",
                context.Method,
                context.Path);

            // NOTE(review): this returns the raw exception message to the caller, unlike the
            // with-request path where failures surface as a generic 500 - confirm whether
            // exposing ex.Message here is intended.
            return RawResponse.InternalError(ex.Message);
        }
    }

    /// <summary>
    /// Looks up the handler's <c>HandleAsync</c> overload with the given parameter types,
    /// invokes it, awaits the returned task and serializes the task's result.
    /// </summary>
    /// <returns>
    /// The serialized result, or an internal-error response when the method is missing or the
    /// invocation returns <see langword="null"/>.
    /// </returns>
    private async Task<RawResponse> InvokeAndSerializeAsync(
        object handler,
        Type[] parameterTypes,
        object?[] arguments,
        Type responseType)
    {
        // Get HandleAsync method
        var handleMethod = handler.GetType().GetMethod(HandleMethodName, parameterTypes);
        if (handleMethod is null)
        {
            return RawResponse.InternalError("Handler method not found");
        }

        // Invoke handler
        var invocationResult = handleMethod.Invoke(handler, arguments);
        if (invocationResult is null)
        {
            return RawResponse.InternalError("Handler returned null");
        }

        var handlerTask = (Task)invocationResult;
        await handlerTask;

        // The static type is only Task here, so the typed result is read reflectively.
        var handlerResult = handlerTask.GetType().GetProperty("Result")?.GetValue(handlerTask);
        return SerializeResponse(handlerResult, responseType);
    }

    /// <summary>
    /// Returns <see langword="true"/> when the stream is <see cref="Stream.Null"/> or has
    /// zero length.
    /// </summary>
    private static bool IsEmpty(Stream body) => body == Stream.Null || body.Length == 0;

    /// <summary>
    /// Serializes a handler result to UTF-8 JSON and wraps it in a 200 response.
    /// </summary>
    private RawResponse SerializeResponse(object? response, Type responseType)
    {
        var json = JsonSerializer.SerializeToUtf8Bytes(response, responseType, _jsonOptions);

        var headers = new HeaderCollection();
        headers.Set("Content-Type", "application/json; charset=utf-8");

        return new RawResponse
        {
            StatusCode = 200,
            Headers = headers,
            Body = new MemoryStream(json)
        };
    }

    /// <summary>
    /// Builds a plain-text RESPONSE frame carrying the given status code and message.
    /// </summary>
    private static ResponseFrame CreateErrorResponse(string requestId, int statusCode, string message)
    {
        var headers = new HeaderCollection();
        headers.Set("Content-Type", "text/plain; charset=utf-8");

        return new ResponseFrame
        {
            RequestId = requestId,
            StatusCode = statusCode,
            Headers = headers.ToDictionary(h => h.Key, h => h.Value),
            Payload = System.Text.Encoding.UTF8.GetBytes(message)
        };
    }

    /// <summary>
    /// Converts a handler response into a RESPONSE frame by copying its status code, headers
    /// and body bytes.
    /// </summary>
    private static async Task<ResponseFrame> CreateResponseFrameAsync(
        string requestId,
        RawResponse response,
        CancellationToken cancellationToken)
    {
        // Read response body
        byte[] payload;
        var body = response.Body;
        if (body == Stream.Null)
        {
            payload = [];
        }
        else
        {
            // Rewind only when the stream supports it; setting Position on a non-seekable
            // stream throws NotSupportedException.
            if (body.CanSeek)
            {
                body.Position = 0;
            }

            using var buffer = new MemoryStream();
            await body.CopyToAsync(buffer, cancellationToken);
            payload = buffer.ToArray();
        }

        return new ResponseFrame
        {
            RequestId = requestId,
            StatusCode = response.StatusCode,
            Headers = response.Headers.ToDictionary(h => h.Key, h => h.Value),
            Payload = payload
        };
    }
}