Skip to content

Commit

Permalink
Provide a way to enable/disable streaming for a request or response (#…
Browse files Browse the repository at this point in the history
…3956)

Motivation:

`HttpService` regards every request and response as streaming. The
streaming request and response could be a super set of non-streaming ones.
However, if we let users specify whether to use streaming or
automatically detect for annotated services and gRPC services, we can
get a chance to add enhanced code paths for non-streaming `HttpService`.

Modifications:

- Add `ExchangeType` that represents whether to stream a request or
  response
- Add `HttpService.exchangeType(RequestHeaders,Route)` to determine an
  `ExchangeType` with the given parameters.
  - gRPC and annotated services will dynamically decide an `ExchangeType`
    in runtime.
- Move `FixedStreamMessage` and its subclasses to `internal` package.
- Add `AggregatingStreamMessage` that can buffer objects and can publish
  after closing the stream.
  - `AggregatingDecodedHttpRequest` uses `AggregatingStreamMessage` to
    accumulate `HttpObject`s before receiving end of stream.
  - Only closed `AggregatingDecodedHttpRequest`s can be fired to
    `HttpServerHandler`
- Refactor a way to hold `ServerConfig` to see reconfigured
  `ServerConfig` without `configUpdateListener`
- Add `EarlyResponseRoutingContext` that indicate a early response that
  can be served at `HttpServerHander`.
- Fix `Http{1,2}RequestDecoder` to properly handle
  `AggregatingDecodedHttpRequest`

Result:

- You can now set an `ExchangeType` to `HttpService` for a non-streaming
  request or response.
  • Loading branch information
ikhoon authored Mar 22, 2022
1 parent a099ca3 commit 871d872
Show file tree
Hide file tree
Showing 90 changed files with 3,602 additions and 1,196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class RoutersBenchmark {
@Benchmark
public Routed<ServiceConfig> exactMatch() {
final RoutingContext ctx = DefaultRoutingContext.of(HOST, "localhost", METHOD1_HEADERS.path(),
null, METHOD1_HEADERS, false);
null, METHOD1_HEADERS, RoutingStatus.OK);
final Routed<ServiceConfig> routed = ROUTER.find(ctx);
if (routed.value() != SERVICES.get(0)) {
throw new IllegalStateException("Routing error");
Expand All @@ -82,7 +82,7 @@ public Routed<ServiceConfig> exactMatch() {
public Routed<ServiceConfig> exactMatch_wrapped() {
final RoutingContext ctx = new RoutingContextWrapper(
DefaultRoutingContext.of(HOST, "localhost", METHOD1_HEADERS.path(),
null, METHOD1_HEADERS, false));
null, METHOD1_HEADERS, RoutingStatus.OK));
final Routed<ServiceConfig> routed = ROUTER.find(ctx);
if (routed.value() != SERVICES.get(0)) {
throw new IllegalStateException("Routing error");
Expand Down
68 changes: 68 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/ExchangeType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2021 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import com.linecorp.armeria.common.annotation.UnstableApi;

/**
* Represents whether to stream an {@link HttpRequest} or {@link HttpResponse}.
*/
@UnstableApi
public enum ExchangeType {
/**
* No streaming. A {@link HttpRequest} and a {@link HttpResponse} will be buffered
* when they are sent or received.
*/
UNARY(false, false),
/**
* A streaming {@link HttpRequest} with a non-streaming {@link HttpResponse}.
* The {@link HttpResponse} will be buffered when it is sent or received.
*/
REQUEST_STREAMING(true, false),
/**
* A non-streaming {@link HttpRequest} with a streaming {@link HttpResponse}.
* The {@link HttpRequest} will be buffered when it is sent or received.
*/
RESPONSE_STREAMING(false, true),
/**
* Bidirectional streaming.
* Neither a {@link HttpRequest} nor a {@link HttpResponse} is buffered.
*/
BIDI_STREAMING(true, true);

private final boolean requestStreaming;
private final boolean responseStreaming;

ExchangeType(boolean requestStreaming, boolean responseStreaming) {
this.requestStreaming = requestStreaming;
this.responseStreaming = responseStreaming;
}

/**
* Returns whether to support request streaming.
*/
public boolean isRequestStreaming() {
return requestStreaming;
}

/**
* Returns whether to support response streaming.
*/
public boolean isResponseStreaming() {
return responseStreaming;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.linecorp.armeria.common;

import com.linecorp.armeria.common.stream.EmptyFixedStreamMessage;
import com.linecorp.armeria.common.stream.OneElementFixedStreamMessage;
import com.linecorp.armeria.common.stream.RegularFixedStreamMessage;
import com.linecorp.armeria.common.stream.TwoElementFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.EmptyFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.RegularFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.TwoElementFixedStreamMessage;

/**
* An {@link HttpRequest} optimized for when all the {@link HttpObject}s that will be published are known at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.linecorp.armeria.common.stream.OneElementFixedStreamMessage;
import com.linecorp.armeria.common.stream.RegularFixedStreamMessage;
import com.linecorp.armeria.common.stream.ThreeElementFixedStreamMessage;
import com.linecorp.armeria.common.stream.TwoElementFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.RegularFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.ThreeElementFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.TwoElementFixedStreamMessage;

/**
* An {@link HttpResponse} optimized for when all the {@link HttpObject}s that will be published are known at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ public long contentLength() {
cache.put(HttpHeaderNames.CONTENT_LENGTH, parsed);
return parsed;
} else {
cache.put(HttpHeaderNames.CONTENT_LENGTH, -1);
return -1;
cache.put(HttpHeaderNames.CONTENT_LENGTH, -1L);
return -1L;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.linecorp.armeria.common.util.CompositeException;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.internal.common.stream.AbortingSubscriber;
import com.linecorp.armeria.internal.common.stream.NeverInvokedSubscriber;
import com.linecorp.armeria.internal.common.stream.NoopSubscription;

import io.netty.util.concurrent.EventExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.linecorp.armeria.common.util.CompositeException;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.internal.common.stream.AbortingSubscriber;
import com.linecorp.armeria.internal.common.stream.NeverInvokedSubscriber;
import com.linecorp.armeria.internal.common.stream.NoopSubscription;
import com.linecorp.armeria.internal.common.stream.StreamMessageUtil;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.common.stream;

import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsWithPooledObjects;
import static java.util.Objects.requireNonNull;

import java.util.List;
Expand Down Expand Up @@ -149,6 +150,7 @@ public CompletableFuture<List<U>> collect(EventExecutor executor, SubscriptionOp

final ImmutableList.Builder<U> builder = ImmutableList.builderWithExpectedSize(objs.size());
Throwable cause0 = null;
final boolean withPooledObjects = containsWithPooledObjects(options);
for (Object obj : objs) {
if (cause0 != null) {
// An error was raised. The remaing objects should be released.
Expand All @@ -157,8 +159,9 @@ public CompletableFuture<List<U>> collect(EventExecutor executor, SubscriptionOp
}

try {
final U result = function.apply(obj);
U result = function.apply(obj);
if (result != null) {
result = StreamMessageUtil.touchOrCopyAndClose(result, withPooledObjects);
builder.add(result);
} else {
StreamMessageUtil.closeOrAbort(obj);
Expand Down Expand Up @@ -202,7 +205,8 @@ public void subscribe(Subscriber<? super U> subscriber, EventExecutor executor,
requireNonNull(executor, "executor");
requireNonNull(options, "options");

source.subscribe(new FuseableSubscriber<>(subscriber, function, errorFunction), executor, options);
source.subscribe(new FuseableSubscriber<>(subscriber, function, errorFunction,
containsWithPooledObjects(options)), executor, options);
}

@Override
Expand All @@ -223,17 +227,19 @@ private static final class FuseableSubscriber<U> implements Subscriber<Object>,
private final MapperFunction<Object, U> function;
@Nullable
private final Function<Throwable, Throwable> errorFunction;
private final boolean withPooledObjects;

@Nullable
private volatile Subscription upstream;
private volatile boolean canceled;

FuseableSubscriber(Subscriber<? super U> downstream, @Nullable MapperFunction<Object, U> function,
@Nullable Function<Throwable, Throwable> errorFunction) {
@Nullable Function<Throwable, Throwable> errorFunction, boolean withPooledObjects) {
requireNonNull(downstream, "downstream");
this.downstream = downstream;
this.function = function;
this.errorFunction = errorFunction;
this.withPooledObjects = withPooledObjects;
}

@Override
Expand Down Expand Up @@ -261,6 +267,7 @@ public void onNext(Object item) {
result = (U) item;
}
if (result != null) {
result = StreamMessageUtil.touchOrCopyAndClose(result, withPooledObjects);
downstream.onNext(result);
} else {
StreamMessageUtil.closeOrAbort(item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.internal.common.stream.AbortedStreamMessage;
import com.linecorp.armeria.internal.common.stream.DecodedStreamMessage;
import com.linecorp.armeria.internal.common.stream.EmptyFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.RecoverableStreamMessage;
import com.linecorp.armeria.internal.common.stream.RegularFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.ThreeElementFixedStreamMessage;
import com.linecorp.armeria.internal.common.stream.TwoElementFixedStreamMessage;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ public boolean equals(AsciiString a, AsciiString b) {
}

/**
* <a href="https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.3">rfc7540, 8.1.2.3</a> states the path must not
* be empty, and instead should be {@code /}.
* <a href="https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.3">rfc7540, 8.1.2.3</a>
* states the path must not be empty, and instead should be {@code /}.
*/
private static final String EMPTY_REQUEST_PATH = "/";

Expand Down Expand Up @@ -424,13 +424,13 @@ public static boolean isContentAlwaysEmptyWithValidation(HttpStatus status, Http
}

/**
* Returns {@code true} if the specified {@code request} is a CORS preflight request.
* Returns {@code true} if the specified {@code headers} is a CORS preflight request.
*/
public static boolean isCorsPreflightRequest(com.linecorp.armeria.common.HttpRequest request) {
requireNonNull(request, "request");
return request.method() == HttpMethod.OPTIONS &&
request.headers().contains(HttpHeaderNames.ORIGIN) &&
request.headers().contains(HttpHeaderNames.ACCESS_CONTROL_REQUEST_METHOD);
public static boolean isCorsPreflightRequest(RequestHeaders headers) {
requireNonNull(headers, "headers");
return headers.method() == HttpMethod.OPTIONS &&
headers.contains(HttpHeaderNames.ORIGIN) &&
headers.contains(HttpHeaderNames.ACCESS_CONTROL_REQUEST_METHOD);
}

/**
Expand Down Expand Up @@ -787,7 +787,7 @@ public static Http2Headers toNettyHttp2ServerTrailers(HttpHeaders inputHeaders)
builder.remove(disallowed.getKey());
}
for (AsciiString disallowed : PSEUDO_HEADERS) {
builder.remove(disallowed);
builder.remove(disallowed);
}
for (Entry<AsciiString, AsciiString> disallowed : HTTP_TRAILER_DISALLOWED_LIST) {
builder.remove(disallowed.getKey());
Expand Down Expand Up @@ -1012,6 +1012,7 @@ public static ResponseHeaders setOrRemoveContentLength(ResponseHeaders headers,
if (!headers.contains(HttpHeaderNames.CONTENT_LENGTH) || !content.isEmpty()) {
return headers.toBuilder()
.contentLength(content.length())
.removeAndThen(HttpHeaderNames.TRANSFER_ENCODING)
.build();
}

Expand Down
Loading

0 comments on commit 871d872

Please sign in to comment.