diff --git a/src/Cli/StellaOps.Cli/Telemetry/CliMetrics.cs b/src/Cli/StellaOps.Cli/Telemetry/CliMetrics.cs index a82af1d2c..d30df7f9c 100644 --- a/src/Cli/StellaOps.Cli/Telemetry/CliMetrics.cs +++ b/src/Cli/StellaOps.Cli/Telemetry/CliMetrics.cs @@ -106,40 +106,28 @@ internal static class CliMetrics new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome))); public static void RecordPolicyFindingsList(string outcome) - => PolicyFindingsListCounter.Add(1, new KeyValuePair[] - { - new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome) - }); + => PolicyFindingsListCounter.Add(1, WithSealedModeTag( + new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome))); public static void RecordPolicyFindingsGet(string outcome) - => PolicyFindingsGetCounter.Add(1, new KeyValuePair[] - { - new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome) - }); + => PolicyFindingsGetCounter.Add(1, WithSealedModeTag( + new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome))); public static void RecordPolicyFindingsExplain(string outcome) - => PolicyFindingsExplainCounter.Add(1, new KeyValuePair[] - { - new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome) - }); + => PolicyFindingsExplainCounter.Add(1, WithSealedModeTag( + new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome))); public static void RecordNodeLockValidate(string outcome) - => NodeLockValidateCounter.Add(1, new KeyValuePair[] - { - new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome) - }); + => NodeLockValidateCounter.Add(1, WithSealedModeTag( + new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome))); public static void RecordPythonLockValidate(string outcome) - => PythonLockValidateCounter.Add(1, new KeyValuePair[] - { - new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome) - }); + => PythonLockValidateCounter.Add(1, WithSealedModeTag( + new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome))); public static void RecordJavaLockValidate(string outcome) - => JavaLockValidateCounter.Add(1, new KeyValuePair[] - { - new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome) - }); + => JavaLockValidateCounter.Add(1, WithSealedModeTag( + new("outcome", string.IsNullOrWhiteSpace(outcome) ? "unknown" : outcome))); public static void RecordRubyInspect(string outcome) => RubyInspectCounter.Add(1, new KeyValuePair[] diff --git a/src/__Libraries/StellaOps.Microservice/RequestDispatcher.cs b/src/__Libraries/StellaOps.Microservice/RequestDispatcher.cs new file mode 100644 index 000000000..389993c31 --- /dev/null +++ b/src/__Libraries/StellaOps.Microservice/RequestDispatcher.cs @@ -0,0 +1,311 @@ +using System.Text.Json; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using StellaOps.Router.Common.Frames; + +namespace StellaOps.Microservice; + +/// +/// Dispatches incoming REQUEST frames to the appropriate endpoint handlers. +/// +public sealed class RequestDispatcher +{ + private readonly IEndpointRegistry _registry; + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + private readonly JsonSerializerOptions _jsonOptions; + + /// + /// Initializes a new instance of the class. + /// + /// The endpoint registry. + /// The service provider for resolving handlers. + /// The logger. + /// Optional JSON serialization options. + public RequestDispatcher( + IEndpointRegistry registry, + IServiceProvider serviceProvider, + ILogger logger, + JsonSerializerOptions? jsonOptions = null) + { + _registry = registry; + _serviceProvider = serviceProvider; + _logger = logger; + _jsonOptions = jsonOptions ?? new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + PropertyNameCaseInsensitive = true + }; + } + + /// + /// Dispatches a REQUEST frame and returns a RESPONSE frame. + /// + /// The incoming REQUEST frame. + /// Cancellation token. + /// The RESPONSE frame. + public async Task DispatchAsync(RequestFrame request, CancellationToken cancellationToken) + { + _logger.LogDebug( + "Dispatching request {RequestId}: {Method} {Path}", + request.RequestId, + request.Method, + request.Path); + + try + { + // Find matching endpoint + if (!_registry.TryMatch(request.Method, request.Path, out var match) || match is null) + { + _logger.LogWarning( + "No endpoint found for {Method} {Path}", + request.Method, + request.Path); + + return CreateErrorResponse(request.RequestId, 404, "Not Found"); + } + + // Create request context + var context = CreateRequestContext(request, match.PathParameters); + + // Resolve and invoke handler within a scope + RawResponse response; + await using (var scope = _serviceProvider.CreateAsyncScope()) + { + response = await InvokeHandlerAsync(scope.ServiceProvider, match.Endpoint, context, cancellationToken); + } + + // Convert to response frame + return await CreateResponseFrameAsync(request.RequestId, response, cancellationToken); + } + catch (OperationCanceledException) + { + _logger.LogInformation("Request {RequestId} was cancelled", request.RequestId); + throw; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error dispatching request {RequestId}", request.RequestId); + return CreateErrorResponse(request.RequestId, 500, "Internal Server Error"); + } + } + + private RawRequestContext CreateRequestContext(RequestFrame request, IReadOnlyDictionary pathParameters) + { + var headers = new HeaderCollection(); + foreach (var (key, value) in request.Headers) + { + headers.Add(key, value); + } + + return new RawRequestContext + { + Method = request.Method, + Path = request.Path, + PathParameters = pathParameters, + Headers = headers, + Body = new MemoryStream(request.Payload.ToArray()), + CancellationToken = CancellationToken.None, // Will be overridden by caller + CorrelationId = request.CorrelationId + }; + } + + private async Task InvokeHandlerAsync( + IServiceProvider scopedProvider, + EndpointDescriptor endpoint, + RawRequestContext context, + CancellationToken cancellationToken) + { + // Get handler instance from DI + var handler = scopedProvider.GetService(endpoint.HandlerType); + if (handler is null) + { + _logger.LogError( + "Handler type {HandlerType} not registered in DI container", + endpoint.HandlerType.Name); + return RawResponse.InternalError("Handler not found"); + } + + // Determine handler type and invoke + if (handler is IRawStellaEndpoint rawHandler) + { + return await rawHandler.HandleAsync(context, cancellationToken); + } + + // Check for typed handlers - need to use reflection to invoke generic adapter + var handlerInterfaces = endpoint.HandlerType.GetInterfaces(); + + // Check for IStellaEndpoint + var typedWithRequestInterface = handlerInterfaces + .FirstOrDefault(i => i.IsGenericType && + i.GetGenericTypeDefinition() == typeof(IStellaEndpoint<,>)); + + if (typedWithRequestInterface is not null) + { + var typeArgs = typedWithRequestInterface.GetGenericArguments(); + return await InvokeTypedHandlerWithRequestAsync(handler, typeArgs[0], typeArgs[1], context, cancellationToken); + } + + // Check for IStellaEndpoint + var typedNoRequestInterface = handlerInterfaces + .FirstOrDefault(i => i.IsGenericType && + i.GetGenericTypeDefinition() == typeof(IStellaEndpoint<>)); + + if (typedNoRequestInterface is not null) + { + var typeArgs = typedNoRequestInterface.GetGenericArguments(); + return await InvokeTypedHandlerNoRequestAsync(handler, typeArgs[0], context, cancellationToken); + } + + _logger.LogError( + "Handler type {HandlerType} does not implement a supported endpoint interface", + endpoint.HandlerType.Name); + return RawResponse.InternalError("Invalid handler type"); + } + + private async Task InvokeTypedHandlerWithRequestAsync( + object handler, + Type requestType, + Type responseType, + RawRequestContext context, + CancellationToken cancellationToken) + { + try + { + // Deserialize request + object? request; + if (context.Body == Stream.Null || context.Body.Length == 0) + { + request = null; + } + else + { + context.Body.Position = 0; + request = await JsonSerializer.DeserializeAsync(context.Body, requestType, _jsonOptions, cancellationToken); + } + + if (request is null) + { + return RawResponse.BadRequest("Invalid request body"); + } + + // Get HandleAsync method + var handleMethod = handler.GetType().GetMethod("HandleAsync", [requestType, typeof(CancellationToken)]); + if (handleMethod is null) + { + return RawResponse.InternalError("Handler method not found"); + } + + // Invoke handler + var resultTask = handleMethod.Invoke(handler, [request, cancellationToken]); + if (resultTask is null) + { + return RawResponse.InternalError("Handler returned null"); + } + + await (Task)resultTask; + + // Get result from Task + var resultProperty = resultTask.GetType().GetProperty("Result"); + var response = resultProperty?.GetValue(resultTask); + + return SerializeResponse(response, responseType); + } + catch (JsonException ex) + { + return RawResponse.BadRequest($"Invalid JSON: {ex.Message}"); + } + } + + private async Task InvokeTypedHandlerNoRequestAsync( + object handler, + Type responseType, + RawRequestContext context, + CancellationToken cancellationToken) + { + try + { + // Get HandleAsync method + var handleMethod = handler.GetType().GetMethod("HandleAsync", [typeof(CancellationToken)]); + if (handleMethod is null) + { + return RawResponse.InternalError("Handler method not found"); + } + + // Invoke handler + var resultTask = handleMethod.Invoke(handler, [cancellationToken]); + if (resultTask is null) + { + return RawResponse.InternalError("Handler returned null"); + } + + await (Task)resultTask; + + // Get result from Task + var resultProperty = resultTask.GetType().GetProperty("Result"); + var response = resultProperty?.GetValue(resultTask); + + return SerializeResponse(response, responseType); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + return RawResponse.InternalError(ex.Message); + } + } + + private RawResponse SerializeResponse(object? response, Type responseType) + { + var json = JsonSerializer.SerializeToUtf8Bytes(response, responseType, _jsonOptions); + var headers = new HeaderCollection(); + headers.Set("Content-Type", "application/json; charset=utf-8"); + + return new RawResponse + { + StatusCode = 200, + Headers = headers, + Body = new MemoryStream(json) + }; + } + + private static ResponseFrame CreateErrorResponse(string requestId, int statusCode, string message) + { + var headers = new HeaderCollection(); + headers.Set("Content-Type", "text/plain; charset=utf-8"); + + return new ResponseFrame + { + RequestId = requestId, + StatusCode = statusCode, + Headers = headers.ToDictionary(h => h.Key, h => h.Value), + Payload = System.Text.Encoding.UTF8.GetBytes(message) + }; + } + + private static async Task CreateResponseFrameAsync( + string requestId, + RawResponse response, + CancellationToken cancellationToken) + { + // Read response body + byte[] payload; + if (response.Body == Stream.Null) + { + payload = []; + } + else + { + using var ms = new MemoryStream(); + response.Body.Position = 0; + await response.Body.CopyToAsync(ms, cancellationToken); + payload = ms.ToArray(); + } + + return new ResponseFrame + { + RequestId = requestId, + StatusCode = response.StatusCode, + Headers = response.Headers.ToDictionary(h => h.Key, h => h.Value), + Payload = payload + }; + } +}