Files
git.stella-ops.org/docs/router/08-Step.md
2025-12-02 18:38:32 +02:00

14 KiB
Raw Blame History

For this step youre teaching the system to handle streams instead of always buffering, and to enforce payload limits so the gateway cant be DoSd by large uploads. Still only using the InMemory transport.

Goal state:

  • Gateway can stream HTTP request/response bodies to/from microservice without buffering everything.
  • Gateway enforces percall and global/inflight payload limits.
  • Microservice sees a Stream on RawRequestContext.Body and reads from it.
  • All of this works over the existing InMemory “connection”.

Ill break it into concrete tasks.


0. Preconditions

Make sure you already have:

  • Minimal InMemory routing working:

    • HTTP → gateway → InMemory → microservice → InMemory → HTTP.
  • Cancellation wired (step 7):

    • FrameType.Cancel.
    • ITransportClient.SendCancelAsync implemented for InMemory.
    • Microservice uses CancellationToken in RawRequestContext.

Then layer streaming & limits on top.


1. Confirm / finalize Common primitives for streaming & limits

Project: StellaOps.Router.Common

Tasks:

  1. Ensure FrameType has:

    public enum FrameType : byte
    {
        Hello = 1,
        Heartbeat = 2,
        EndpointsUpdate = 3,
        Request = 4,
        RequestStreamData = 5,
        Response = 6,
        ResponseStreamData = 7,
        Cancel = 8
    }
    

    You may not use RequestStreamData / ResponseStreamData in InMemory implementation initially if you choose the bridging approach, but having them defined keeps the model coherent.

  2. Ensure EndpointDescriptor has:

    public bool SupportsStreaming { get; init; }
    
  3. Ensure PayloadLimits type exists (in Common or Config, but referenced by both):

    public sealed class PayloadLimits
    {
        public long MaxRequestBytesPerCall { get; set; }          // per HTTP request
        public long MaxRequestBytesPerConnection { get; set; }    // per microservice connection
        public long MaxAggregateInflightBytes { get; set; }       // across all requests
    }
    
  4. ITransportClient already contains:

    Task SendStreamingAsync(
        ConnectionState connection,
        Frame requestHeader,
        Stream requestBody,
        Func<Stream, Task> readResponseBody,
        PayloadLimits limits,
        CancellationToken ct);
    

    If not, add it now (implementation will be InMemory-only for this step).

No logic in Common; just shapes.


2. Gateway: payload budget tracker

You need a small service in the gateway that tracks inflight bytes to enforce limits.

Project: StellaOps.Gateway.WebService

2.1 Define a budget interface

public interface IPayloadBudget
{
    bool TryReserve(string connectionId, Guid requestId, long bytes);
    void Release(string connectionId, Guid requestId, long bytes);
}

2.2 Implement a simple in-memory tracker

Implementation outline:

  • Track:

    • long _globalInflightBytes.
    • Dictionary<string,long> _perConnectionInflightBytes.
    • Dictionary<Guid,long> _perRequestInflightBytes.

All updated under a lock or ConcurrentDictionary + Interlocked.

Logic for TryReserve:

  • Compute proposed:

    • newGlobal = _globalInflightBytes + bytes
    • newConn = perConnection[connectionId] + bytes
    • newReq = perRequest[requestId] + bytes
  • If any exceed configured limits (PayloadLimits from config), return false.

  • Else:

    • Commit updates and return true.

Release subtracts the bytes, never going below zero.

Register in DI:

services.AddSingleton<IPayloadBudget, PayloadBudget>();

3. Gateway: choose buffered vs streaming path

Extend TransportDispatchMiddleware to branch on mode.

Project: StellaOps.Gateway.WebService

3.1 Decide mode

At the start of the middleware:

var decision = (RoutingDecision)context.Items[RouterHttpContextKeys.RoutingDecision]!;
var endpoint = decision.Endpoint;
var limits = _options.Value.PayloadLimits; // from RouterConfig

var supportsStreaming = endpoint.SupportsStreaming;
var hasKnownLength = context.Request.ContentLength.HasValue;
var contentLength = context.Request.ContentLength ?? -1;

// Simple rule for now:
var useStreaming =
    supportsStreaming &&
    (!hasKnownLength || contentLength > limits.MaxRequestBytesPerCall);
  • If useStreaming == false:

    • Use buffered path with hard size checks.
  • If useStreaming == true:

    • Use streaming path (ITransportClient.SendStreamingAsync).

4. Gateway: buffered path with limits

Still in TransportDispatchMiddleware

4.1 Early 413 check

When supportsStreaming == false:

  1. If Content-Length known and:

    if (hasKnownLength && contentLength > limits.MaxRequestBytesPerCall)
    {
        context.Response.StatusCode = StatusCodes.Status413PayloadTooLarge;
        return;
    }
    
  2. When reading body into memory:

    • Read in chunks.
    • Track bytesReadThisCall.
    • If bytesReadThisCall > limits.MaxRequestBytesPerCall, abort and return 413.

You dont have to call IPayloadBudget for buffered mode yet; you can, but the hard per-call limit already protects RAM for this step.

Buffered path then proceeds as before:

  • Build MinimalRequestPayload with full body.
  • Send via SendRequestAsync.
  • Map response.

5. Gateway: streaming path (InMemory)

This is the new part.

5.1 Use ITransportClient.SendStreamingAsync

In the useStreaming == true branch:

var correlationId = Guid.NewGuid();

var headerPayload = new MinimalRequestPayload
{
    Method = context.Request.Method,
    Path = context.Request.Path.ToString(),
    Headers = ExtractHeaders(context.Request),
    Body = Array.Empty<byte>(),    // streaming body will follow
    IsStreaming = true             // add this flag to your payload DTO
};

var headerFrame = new Frame
{
    Type = FrameType.Request,
    CorrelationId = correlationId,
    Payload = SerializeRequestPayload(headerPayload)
};

using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
linkedCts.CancelAfter(decision.EffectiveTimeout);

// register cancel → SendCancelAsync (already done in step 7)

await _transportClient.SendStreamingAsync(
    decision.Connection,
    headerFrame,
    context.Request.Body,
    async responseBodyStream =>
    {
        // Copy microservice stream directly to HTTP response
        await responseBodyStream.CopyToAsync(context.Response.Body, linkedCts.Token);
    },
    limits,
    linkedCts.Token);

Key points:

  • Streaming path does not buffer the whole body.
  • Limits and cancellation are enforced inside SendStreamingAsync.

6. InMemory transport: streaming implementation

Project: gateway side InMemory ITransportClient implementation and InMemory router hub; microservice side connection.

For InMemory, you can model streaming via bridged streams: a producer/consumer pair in memory.

6.1 Add streaming call to InMemory client

In InMemoryTransportClient:

public async Task SendStreamingAsync(
    ConnectionState connection,
    Frame requestHeader,
    Stream httpRequestBody,
    Func<Stream, Task> readResponseBody,
    PayloadLimits limits,
    CancellationToken ct)
{
    await _hub.StreamFromGatewayAsync(
        connection.ConnectionId,
        requestHeader,
        httpRequestBody,
        readResponseBody,
        limits,
        ct);
}

Expose StreamFromGatewayAsync on IInMemoryRouterHub:

Task StreamFromGatewayAsync(
    string connectionId,
    Frame requestHeader,
    Stream requestBody,
    Func<Stream, Task> readResponseBody,
    PayloadLimits limits,
    CancellationToken ct);

6.2 InMemory hub streaming strategy (bridging style)

Inside StreamFromGatewayAsync:

  1. Create a pair of connected streams for request body:

    • e.g., a custom ProducerConsumerStream built on a Channel<byte[]> or System.IO.Pipelines.
    • “Producer” side (writer) will be fed from HTTP.
    • “Consumer” side will be given to the microservice as RawRequestContext.Body.
  2. Create a pair of connected streams for response body:

    • “Consumer” side will be used in readResponseBody to write to HTTP.
    • “Producer” side will be given to the microservice handler to write response body.
  3. On the microservice side:

    • Build a RawRequestContext with Body = requestBodyConsumerStream and CancellationToken = ct.
    • Dispatch to the endpoint handler as usual.
    • Have the handlers RawResponse.WriteBodyAsync pointed at responseBodyProducerStream.
  4. Parallel tasks:

    • Task 1: Copy HTTP → requestBodyProducerStream in chunks, enforcing PayloadLimits (see next section).
    • Task 2: Execute the handler, which reads from Body and writes to responseBodyProducerStream.
    • Task 3: Copy responseBodyConsumerStream → HTTP via readResponseBody.
  5. Propagate cancellation:

    • If ct is canceled (client disconnect/timeout/payload limit breach):

      • Stop HTTP→requestBody copy.
      • Signal stream completion / cancellation to handler.
    • Handler should see cancellation via CancellationToken.

Because this is InMemory, you dont have to materialize explicit RequestStreamData frames; you only need the behavior. Real transports will implement the same semantics with actual frames.


7. Enforce payload limits in streaming copy

Still in StreamFromGatewayAsync / InMemory side:

7.1 HTTP → microservice copy with budget

In Task 1:

var buffer = new byte[64 * 1024];
int read;
var requestId = requestHeader.CorrelationId;
var connectionId = connectionIdFromArgs;

while ((read = await httpRequestBody.ReadAsync(buffer, 0, buffer.Length, ct)) > 0)
{
    if (!_budget.TryReserve(connectionId, requestId, read))
    {
        // Limit exceeded: signal failure
        await _cancelCallback?.Invoke(requestId, "PayloadLimitExceeded"); // or call SendCancelAsync
        break;
    }

    await requestBodyProducerStream.WriteAsync(buffer.AsMemory(0, read), ct);
}

// After loop, ensure we release whatever was reserved
_budget.Release(connectionId, requestId, totalBytesReserved);
await requestBodyProducerStream.FlushAsync(ct);
await requestBodyProducerStream.DisposeAsync();

If TryReserve fails:

  • Stop reading further bytes.

  • Trigger cancellation downstream:

    • Either call the existing SendCancelAsync path.
    • Or signal completion with error and let handler catch cancellation.

Gateway side should then translate this into 413 or 503 to the client.

7.2 Response copy

Response path doesnt need budget tracking (the danger is inbound to gateway); but if you want symmetry, you can also enforce a max outbound size.

For now, just stream microservice → HTTP through readResponseBody until EOF or cancellation.


8. Microservice side: streaming-aware RawRequestContext.Body

Your streaming bridging already gives the handler a Stream that reads what the gateway sends:

  • No changes required in handler interfaces.

  • You only need to ensure:

    • RawRequestContext.Body may be non-seekable.
    • Handlers know they must treat it as a forward-only stream.

Guidance for devs in Microservice.md:

  • For binary uploads or large files, implement IRawStellaEndpoint and read incrementally:

    [StellaEndpoint("POST", "/billing/invoices/upload")]
    public sealed class InvoiceUploadEndpoint : IRawStellaEndpoint
    {
        public async Task<RawResponse> HandleAsync(RawRequestContext ctx)
        {
            var buffer = new byte[64 * 1024];
            int read;
            while ((read = await ctx.Body.ReadAsync(buffer.AsMemory(0, buffer.Length), ctx.CancellationToken)) > 0)
            {
                // Process chunk
            }
    
            return new RawResponse { StatusCode = 204 };
        }
    }
    

9. Tests

Scope: still InMemory, but now streaming & limits.

9.1 Streaming happy path

  • Setup:

    • Endpoint with SupportsStreaming = true.

    • IRawStellaEndpoint that:

      • Counts total bytes read from ctx.Body.
      • Returns 200.
  • Test:

    • Send an HTTP POST with a body larger than MaxRequestBytesPerCall, but with streaming enabled.

    • Assert:

      • Gateway does not buffer entire body in one array (you can assert via instrumentation or at least confirm no 413).
      • Handler sees the full number of bytes.
      • Response is 200.

9.2 Per-call limit breach

  • Configure:

    • SupportsStreaming = false (or use streaming but set low MaxRequestBytesPerCall).
  • Test:

    • Send a body larger than limit.

    • Assert:

      • Gateway responds 413.
      • Handler is not invoked at all.

9.3 Global/in-flight limit breach

  • Configure:

    • MaxAggregateInflightBytes very low (e.g. 1 MB).
  • Test:

    • Start multiple concurrent streaming requests that each try to send more than the allowed total.

    • Assert:

      • Some of them get a CANCEL / error (413 or 503).
      • IPayloadBudget denies reservations and releases resources correctly.

10. Done criteria for “Add streaming & payload limits (InMemory)”

Youre done with this step when:

  • Gateway:

    • Chooses buffered vs streaming based on EndpointDescriptor.SupportsStreaming and size.
    • Enforces MaxRequestBytesPerCall for buffered requests (413 on violation).
    • Uses ITransportClient.SendStreamingAsync for streaming.
    • Has an IPayloadBudget preventing excessive in-flight payload accumulation.
  • InMemory transport:

    • Implements SendStreamingAsync by bridging HTTP streams to microservice handlers and back.
    • Enforces payload limits while copying.
  • Microservice:

    • Receives a functional Stream in RawRequestContext.Body.
    • Can implement IRawStellaEndpoint that reads incrementally for large payloads.
  • Tests:

    • Demonstrate a streaming endpoint works for large payloads.
    • Demonstrate per-call and aggregate limits are respected and cause rejections/cancellations.

After this, you can reuse the same semantics when you implement real transports (TCP/TLS/RabbitMQ), with InMemory as your reference implementation.