For this step you’re teaching the system to handle streams instead of always buffering, and to enforce payload limits so the gateway can’t be DoS’d by large uploads. Still only using the InMemory transport.
Goal state:
- Gateway can stream HTTP request/response bodies to/from microservice without buffering everything.
- Gateway enforces per‑call and global/in‑flight payload limits.
- Microservice sees a Stream on RawRequestContext.Body and reads from it.
- All of this works over the existing InMemory “connection”.
I’ll break it into concrete tasks.
0. Preconditions
Make sure you already have:
- Minimal InMemory routing working:
  - HTTP → gateway → InMemory → microservice → InMemory → HTTP.
- Cancellation wired (step 7):
  - FrameType.Cancel.
  - ITransportClient.SendCancelAsync implemented for InMemory.
  - Microservice uses CancellationToken in RawRequestContext.
Then layer streaming & limits on top.
1. Confirm / finalize Common primitives for streaming & limits
Project: StellaOps.Router.Common
Tasks:
- Ensure FrameType has:
    public enum FrameType : byte
    {
        Hello = 1,
        Heartbeat = 2,
        EndpointsUpdate = 3,
        Request = 4,
        RequestStreamData = 5,
        Response = 6,
        ResponseStreamData = 7,
        Cancel = 8
    }
  You might not need RequestStreamData/ResponseStreamData in the InMemory implementation at first if you choose the bridging approach, but having them defined keeps the model coherent.
- Ensure EndpointDescriptor has:
    public bool SupportsStreaming { get; init; }
- Ensure the PayloadLimits type exists (in Common or Config, but referenced by both):
    public sealed class PayloadLimits
    {
        public long MaxRequestBytesPerCall { get; set; }        // per HTTP request
        public long MaxRequestBytesPerConnection { get; set; }  // per microservice connection
        public long MaxAggregateInflightBytes { get; set; }     // across all requests
    }
- ITransportClient already contains:
    Task SendStreamingAsync(
        ConnectionState connection,
        Frame requestHeader,
        Stream requestBody,
        Func<Stream, Task> readResponseBody,
        PayloadLimits limits,
        CancellationToken ct);
  If not, add it now (implementation will be InMemory-only for this step).
No logic in Common; just shapes.
2. Gateway: payload budget tracker
You need a small service in the gateway that tracks in‑flight bytes to enforce limits.
Project: StellaOps.Gateway.WebService
2.1 Define a budget interface
public interface IPayloadBudget
{
bool TryReserve(string connectionId, Guid requestId, long bytes);
void Release(string connectionId, Guid requestId, long bytes);
}
2.2 Implement a simple in-memory tracker
Implementation outline:
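- Track:
  - long _globalInflightBytes.
  - Dictionary<string,long> _perConnectionInflightBytes.
  - Dictionary<Guid,long> _perRequestInflightBytes.
Update all three under a single lock. TryReserve has to check and commit the three counters together, which is harder to get right with ConcurrentDictionary + Interlocked alone.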
Logic for TryReserve:
- Compute proposed:
  - newGlobal = _globalInflightBytes + bytes
  - newConn = perConnection[connectionId] + bytes
  - newReq = perRequest[requestId] + bytes
- If any exceed configured limits (PayloadLimits from config), return false.
- Else:
  - Commit updates and return true.
Release subtracts the bytes, never going below zero.
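A minimal sketch of the tracker outlined above, using one lock so the three counters are checked and committed together. Two things here are assumptions made for illustration: the constructor takes PayloadLimits directly, and the per-request counter is compared against MaxRequestBytesPerCall. PayloadLimits and IPayloadBudget are repeated only so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Shapes from sections 1 and 2.1, repeated so the sketch is self-contained.
public sealed class PayloadLimits
{
    public long MaxRequestBytesPerCall { get; set; }
    public long MaxRequestBytesPerConnection { get; set; }
    public long MaxAggregateInflightBytes { get; set; }
}

public interface IPayloadBudget
{
    bool TryReserve(string connectionId, Guid requestId, long bytes);
    void Release(string connectionId, Guid requestId, long bytes);
}

public sealed class PayloadBudget : IPayloadBudget
{
    private readonly object _gate = new object();
    private readonly PayloadLimits _limits; // assumption: injected directly
    private readonly Dictionary<string, long> _perConnectionInflightBytes = new Dictionary<string, long>();
    private readonly Dictionary<Guid, long> _perRequestInflightBytes = new Dictionary<Guid, long>();
    private long _globalInflightBytes;

    public PayloadBudget(PayloadLimits limits) => _limits = limits;

    public bool TryReserve(string connectionId, Guid requestId, long bytes)
    {
        lock (_gate)
        {
            // Missing keys read as zero.
            _perConnectionInflightBytes.TryGetValue(connectionId, out var conn);
            _perRequestInflightBytes.TryGetValue(requestId, out var req);

            var newGlobal = _globalInflightBytes + bytes;
            var newConn = conn + bytes;
            var newReq = req + bytes;

            if (newGlobal > _limits.MaxAggregateInflightBytes ||
                newConn > _limits.MaxRequestBytesPerConnection ||
                newReq > _limits.MaxRequestBytesPerCall)
            {
                return false; // nothing is committed on failure
            }

            _globalInflightBytes = newGlobal;
            _perConnectionInflightBytes[connectionId] = newConn;
            _perRequestInflightBytes[requestId] = newReq;
            return true;
        }
    }

    public void Release(string connectionId, Guid requestId, long bytes)
    {
        lock (_gate)
        {
            _globalInflightBytes = Math.Max(0L, _globalInflightBytes - bytes);
            Subtract(_perConnectionInflightBytes, connectionId, bytes);
            Subtract(_perRequestInflightBytes, requestId, bytes);
        }
    }

    // Clamp at zero and drop empty entries so the dictionaries do not grow forever.
    private static void Subtract<TKey>(Dictionary<TKey, long> map, TKey key, long bytes)
        where TKey : notnull
    {
        if (!map.TryGetValue(key, out var current)) return;
        var remaining = Math.Max(0L, current - bytes);
        if (remaining == 0) map.Remove(key);
        else map[key] = remaining;
    }
}
```

In the gateway the limits would more likely arrive through the same options object the middleware reads, so the constructor shape will probably differ from this sketch.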
Register in DI:
services.AddSingleton<IPayloadBudget, PayloadBudget>();
3. Gateway: choose buffered vs streaming path
Extend TransportDispatchMiddleware to branch on mode.
Project: StellaOps.Gateway.WebService
3.1 Decide mode
At the start of the middleware:
var decision = (RoutingDecision)context.Items[RouterHttpContextKeys.RoutingDecision]!;
var endpoint = decision.Endpoint;
var limits = _options.Value.PayloadLimits; // from RouterConfig
var supportsStreaming = endpoint.SupportsStreaming;
var hasKnownLength = context.Request.ContentLength.HasValue;
var contentLength = context.Request.ContentLength ?? -1;
// Simple rule for now:
var useStreaming =
supportsStreaming &&
(!hasKnownLength || contentLength > limits.MaxRequestBytesPerCall);
- If useStreaming == false:
  - Use buffered path with hard size checks.
- If useStreaming == true:
  - Use streaming path (ITransportClient.SendStreamingAsync).
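To make the outcomes of this rule easier to see, here is a small sketch that folds it together with the early 413 check into one pure function. DispatchMode and DispatchModeSelector are hypothetical names introduced for illustration only; the guide itself keeps this logic inline in the middleware.

```csharp
using System;

// Hypothetical names, used only to tabulate the rule.
public enum DispatchMode { Buffered, Streaming, RejectTooLarge }

public static class DispatchModeSelector
{
    public static DispatchMode Choose(bool supportsStreaming, long? contentLength, long maxRequestBytesPerCall)
    {
        var hasKnownLength = contentLength.HasValue;
        var overLimit = hasKnownLength && contentLength.GetValueOrDefault() > maxRequestBytesPerCall;

        // Streaming endpoints take the streaming path when the size is unknown or too large to buffer.
        if (supportsStreaming && (!hasKnownLength || overLimit)) return DispatchMode.Streaming;

        // A non-streaming endpoint with a declared oversize body can be refused before reading anything.
        if (overLimit) return DispatchMode.RejectTooLarge;

        // Everything else is buffered, still subject to the chunked size check while reading.
        return DispatchMode.Buffered;
    }
}
```

Note that a non-streaming endpoint with no Content-Length falls through to Buffered, which is why the buffered read still has to count bytes as it goes.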
4. Gateway: buffered path with limits
Still in TransportDispatchMiddleware
4.1 Early 413 check
When supportsStreaming == false:
- If Content-Length is known and exceeds the per-call limit, reject before reading anything:
    if (hasKnownLength && contentLength > limits.MaxRequestBytesPerCall)
    {
        context.Response.StatusCode = StatusCodes.Status413PayloadTooLarge;
        return;
    }
- When reading body into memory:
  - Read in chunks.
  - Track bytesReadThisCall.
  - If bytesReadThisCall > limits.MaxRequestBytesPerCall, abort and return 413.
You don’t have to call IPayloadBudget for buffered mode yet; you can, but the hard per-call limit already protects RAM for this step.
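The chunked read with a running byte count could look roughly like this. BufferedBodyReader is a hypothetical helper name and the 16 KiB chunk size is arbitrary; the only part taken from the text is the rule itself, namely stop as soon as bytesReadThisCall passes the limit.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for the buffered path.
public static class BufferedBodyReader
{
    // Reads the whole body into memory, giving up as soon as maxBytes is exceeded.
    // WithinLimit == false means the caller should answer 413 and skip dispatch.
    public static async Task<(bool WithinLimit, byte[] Body)> ReadAsync(
        Stream body, long maxBytes, CancellationToken ct)
    {
        var buffer = new byte[16 * 1024];
        using var collected = new MemoryStream();
        long bytesReadThisCall = 0;
        int read;

        while ((read = await body.ReadAsync(buffer.AsMemory(0, buffer.Length), ct)) > 0)
        {
            bytesReadThisCall += read;
            if (bytesReadThisCall > maxBytes)
            {
                // Abort without keeping the partial body.
                return (false, Array.Empty<byte>());
            }
            collected.Write(buffer, 0, read);
        }

        return (true, collected.ToArray());
    }
}
```

Because the check runs per chunk, memory use is bounded by the limit plus one chunk even when the client sends no Content-Length or sends a wrong one.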
Buffered path then proceeds as before:
- Build MinimalRequestPayload with full body.
- Send via SendRequestAsync.
- Map response.
5. Gateway: streaming path (InMemory)
This is the new part.
5.1 Use ITransportClient.SendStreamingAsync
In the useStreaming == true branch:
var correlationId = Guid.NewGuid();
var headerPayload = new MinimalRequestPayload
{
Method = context.Request.Method,
Path = context.Request.Path.ToString(),
Headers = ExtractHeaders(context.Request),
Body = Array.Empty<byte>(), // streaming body will follow
IsStreaming = true // add this flag to your payload DTO
};
var headerFrame = new Frame
{
Type = FrameType.Request,
CorrelationId = correlationId,
Payload = SerializeRequestPayload(headerPayload)
};
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
linkedCts.CancelAfter(decision.EffectiveTimeout);
// register cancel → SendCancelAsync (already done in step 7)
await _transportClient.SendStreamingAsync(
decision.Connection,
headerFrame,
context.Request.Body,
async responseBodyStream =>
{
// Copy microservice stream directly to HTTP response
await responseBodyStream.CopyToAsync(context.Response.Body, linkedCts.Token);
},
limits,
linkedCts.Token);
Key points:
- Streaming path does not buffer the whole body.
- Limits and cancellation are enforced inside SendStreamingAsync.
6. InMemory transport: streaming implementation
Project: gateway side InMemory ITransportClient implementation and InMemory router hub; microservice side connection.
For InMemory, you can model streaming via bridged streams: a producer/consumer pair in memory.
6.1 Add streaming call to InMemory client
In InMemoryTransportClient:
public async Task SendStreamingAsync(
ConnectionState connection,
Frame requestHeader,
Stream httpRequestBody,
Func<Stream, Task> readResponseBody,
PayloadLimits limits,
CancellationToken ct)
{
await _hub.StreamFromGatewayAsync(
connection.ConnectionId,
requestHeader,
httpRequestBody,
readResponseBody,
limits,
ct);
}
Expose StreamFromGatewayAsync on IInMemoryRouterHub:
Task StreamFromGatewayAsync(
string connectionId,
Frame requestHeader,
Stream requestBody,
Func<Stream, Task> readResponseBody,
PayloadLimits limits,
CancellationToken ct);
6.2 InMemory hub streaming strategy (bridging style)
Inside StreamFromGatewayAsync:
- Create a pair of connected streams for request body:
  - e.g., a custom ProducerConsumerStream built on a Channel<byte[]> or System.IO.Pipelines.
  - “Producer” side (writer) will be fed from HTTP.
  - “Consumer” side will be given to the microservice as RawRequestContext.Body.
- Create a pair of connected streams for response body:
  - “Consumer” side will be used in readResponseBody to write to HTTP.
  - “Producer” side will be given to the microservice handler to write response body.
- On the microservice side:
  - Build a RawRequestContext with Body = requestBodyConsumerStream and CancellationToken = ct.
  - Dispatch to the endpoint handler as usual.
  - Have the handler’s RawResponse.WriteBodyAsync pointed at responseBodyProducerStream.
- Parallel tasks:
  - Task 1: Copy HTTP → requestBodyProducerStream in chunks, enforcing PayloadLimits (see next section).
  - Task 2: Execute the handler, which reads from Body and writes to responseBodyProducerStream.
  - Task 3: Copy responseBodyConsumerStream → HTTP via readResponseBody.
- Propagate cancellation:
  - If ct is canceled (client disconnect/timeout/payload limit breach):
    - Stop HTTP→requestBody copy.
    - Signal stream completion / cancellation to handler.
  - Handler should see cancellation via CancellationToken.
Because this is InMemory, you don’t have to materialize explicit RequestStreamData frames; you only need the behavior. Real transports will implement the same semantics with actual frames.
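One way to build the connected stream pair is a bounded Channel<byte[]> with a write-only stream on one end and a read-only stream on the other. The sketch below is an assumption-laden illustration, not the project's ProducerConsumerStream: BridgedStream, CreatePair, and the default of 8 buffered chunks are all hypothetical, and error propagation to the reader is left out.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: a write-only and a read-only Stream sharing one bounded channel.
public static class BridgedStream
{
    public static (Stream Writer, Stream Reader) CreatePair(int maxBufferedChunks = 8)
    {
        // Bounded so a slow consumer applies backpressure to the producer.
        var channel = Channel.CreateBounded<byte[]>(maxBufferedChunks);
        return (new WriterSide(channel.Writer), new ReaderSide(channel.Reader));
    }

    // Shared forward-only, non-seekable plumbing.
    private abstract class OneWayStream : Stream
    {
        public override bool CanSeek => false;
        public override long Length => throw new NotSupportedException();
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Flush() { }
    }

    private sealed class WriterSide : OneWayStream
    {
        private readonly ChannelWriter<byte[]> _writer;
        public WriterSide(ChannelWriter<byte[]> writer) => _writer = writer;

        public override bool CanRead => false;
        public override bool CanWrite => true;
        public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();

        public override void Write(byte[] buffer, int offset, int count) =>
            WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count)).AsTask().GetAwaiter().GetResult();

        public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
        {
            if (buffer.IsEmpty) return;
            // Copy the chunk: callers typically reuse their buffer after the write returns.
            await _writer.WriteAsync(buffer.ToArray(), cancellationToken);
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing) _writer.TryComplete(); // the reader then sees end-of-stream
            base.Dispose(disposing);
        }
    }

    private sealed class ReaderSide : OneWayStream
    {
        private readonly ChannelReader<byte[]> _reader;
        private ReadOnlyMemory<byte> _current = ReadOnlyMemory<byte>.Empty;
        public ReaderSide(ChannelReader<byte[]> reader) => _reader = reader;

        public override bool CanRead => true;
        public override bool CanWrite => false;
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

        public override int Read(byte[] buffer, int offset, int count) =>
            ReadAsync(new Memory<byte>(buffer, offset, count)).AsTask().GetAwaiter().GetResult();

        public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
        {
            if (buffer.IsEmpty) return 0;
            while (_current.IsEmpty)
            {
                // False means the writer completed and nothing is left: end-of-stream.
                if (!await _reader.WaitToReadAsync(cancellationToken)) return 0;
                if (_reader.TryRead(out var chunk)) _current = chunk;
            }
            var count = Math.Min(buffer.Length, _current.Length);
            _current.Slice(0, count).CopyTo(buffer);
            _current = _current.Slice(count);
            return count;
        }
    }
}
```

With this shape, one pair would carry the request body (the HTTP copy loop writes to Writer, the handler receives Reader as RawRequestContext.Body) and a second pair would carry the response body in the opposite direction. Disposing the writer is what signals EOF, so the copy task must dispose it on every exit path.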
7. Enforce payload limits in streaming copy
Still in StreamFromGatewayAsync / InMemory side:
7.1 HTTP → microservice copy with budget
In Task 1:
var buffer = new byte[64 * 1024];
int read;
long totalBytesReserved = 0; // running total, so we release exactly what we reserved
var requestId = requestHeader.CorrelationId;
// connectionId is the StreamFromGatewayAsync parameter
try
{
    while ((read = await httpRequestBody.ReadAsync(buffer, 0, buffer.Length, ct)) > 0)
    {
        if (!_budget.TryReserve(connectionId, requestId, read))
        {
            // Limit exceeded: signal failure (or call SendCancelAsync).
            // Null-check first: awaiting the result of ?.Invoke throws when the callback is null.
            if (_cancelCallback is not null)
                await _cancelCallback(requestId, "PayloadLimitExceeded");
            break;
        }
        totalBytesReserved += read;
        await requestBodyProducerStream.WriteAsync(buffer.AsMemory(0, read), ct);
    }
    await requestBodyProducerStream.FlushAsync(ct);
}
finally
{
    // Release whatever was reserved, including on cancellation or failure
    _budget.Release(connectionId, requestId, totalBytesReserved);
    await requestBodyProducerStream.DisposeAsync();
}
If TryReserve fails:
- Stop reading further bytes.
- Trigger cancellation downstream:
  - Either call the existing SendCancelAsync path.
  - Or signal completion with error and let handler catch cancellation.
Gateway side should then translate this into 413 or 503 to the client.
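The text leaves open which failures become 413 and which become 503. One possible split, shown purely as an assumption, is to treat a per-call breach as the client's fault (413) and a per-connection or aggregate breach as temporary gateway pressure (503). PayloadLimitScope and PayloadLimitStatus are hypothetical names; IPayloadBudget as defined earlier only returns a bool, so it would need to report the scope for this mapping to be usable.

```csharp
using System;

// Hypothetical: which limit was hit. Not part of IPayloadBudget as defined in this step.
public enum PayloadLimitScope { PerCall, PerConnection, Aggregate }

public static class PayloadLimitStatus
{
    // Assumed mapping, not something the guide prescribes.
    public static int ToHttpStatus(PayloadLimitScope scope)
    {
        switch (scope)
        {
            case PayloadLimitScope.PerCall:
                return 413; // this request alone is too large
            default:
                return 503; // the gateway is over budget right now; a retry may succeed
        }
    }
}
```

Either status can only be sent if the response has not started yet, so the limit check needs to fail before the gateway begins copying the response body to the client.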
7.2 Response copy
Response path doesn’t need budget tracking (the danger is inbound to gateway); but if you want symmetry, you can also enforce a max outbound size.
For now, just stream microservice → HTTP through readResponseBody until EOF or cancellation.
8. Microservice side: streaming-aware RawRequestContext.Body
Your streaming bridging already gives the handler a Stream that reads what the gateway sends:
- No changes required in handler interfaces.
- You only need to ensure:
  - RawRequestContext.Body may be non-seekable.
  - Handlers know they must treat it as a forward-only stream.
Guidance for devs in Microservice.md:
- For binary uploads or large files, implement IRawStellaEndpoint and read incrementally:
    [StellaEndpoint("POST", "/billing/invoices/upload")]
    public sealed class InvoiceUploadEndpoint : IRawStellaEndpoint
    {
        public async Task<RawResponse> HandleAsync(RawRequestContext ctx)
        {
            var buffer = new byte[64 * 1024];
            int read;
            while ((read = await ctx.Body.ReadAsync(buffer.AsMemory(0, buffer.Length), ctx.CancellationToken)) > 0)
            {
                // Process chunk
            }
            return new RawResponse { StatusCode = 204 };
        }
    }
9. Tests
Scope: still InMemory, but now streaming & limits.
9.1 Streaming happy path
- Setup:
  - Endpoint with SupportsStreaming = true.
  - IRawStellaEndpoint that:
    - Counts total bytes read from ctx.Body.
    - Returns 200.
- Test:
  - Send an HTTP POST with a body larger than MaxRequestBytesPerCall, but with streaming enabled.
  - Assert:
    - Gateway does not buffer entire body in one array (you can assert via instrumentation or at least confirm no 413).
    - Handler sees the full number of bytes.
    - Response is 200.
9.2 Per-call limit breach
- Configure:
  - SupportsStreaming = false (or use streaming but set low MaxRequestBytesPerCall).
- Test:
  - Send a body larger than limit.
  - Assert:
    - Gateway responds 413.
    - Handler is not invoked at all.
9.3 Global/in-flight limit breach
- Configure:
  - MaxAggregateInflightBytes very low (e.g. 1 MB).
- Test:
  - Start multiple concurrent streaming requests that each try to send more than the allowed total.
  - Assert:
    - Some of them get a CANCEL / error (413 or 503).
    - IPayloadBudget denies reservations and releases resources correctly.
10. Done criteria for “Add streaming & payload limits (InMemory)”
You’re done with this step when:
- Gateway:
  - Chooses buffered vs streaming based on EndpointDescriptor.SupportsStreaming and size.
  - Enforces MaxRequestBytesPerCall for buffered requests (413 on violation).
  - Uses ITransportClient.SendStreamingAsync for streaming.
  - Has an IPayloadBudget preventing excessive in-flight payload accumulation.
- InMemory transport:
  - Implements SendStreamingAsync by bridging HTTP streams to microservice handlers and back.
  - Enforces payload limits while copying.
- Microservice:
  - Receives a functional Stream in RawRequestContext.Body.
  - Can implement IRawStellaEndpoint that reads incrementally for large payloads.
- Tests:
  - Demonstrate a streaming endpoint works for large payloads.
  - Demonstrate per-call and aggregate limits are respected and cause rejections/cancellations.
After this, you can reuse the same semantics when you implement real transports (TCP/TLS/RabbitMQ), with InMemory as your reference implementation.