Skip to content

Commit

Permalink
8327991
Browse files Browse the repository at this point in the history
  • Loading branch information
dfuch committed Mar 13, 2024
1 parent 0776fff commit c147ef6
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 25 deletions.
48 changes: 36 additions & 12 deletions src/java.net.http/share/classes/java/net/http/HttpClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -29,6 +29,8 @@
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.nio.channels.Selector;
import java.net.Authenticator;
import java.net.CookieHandler;
Expand Down Expand Up @@ -132,30 +134,52 @@
* reclaimed early by {@linkplain #close() closing} the client.
*
* @implNote
* <p id="closing">
* The JDK built-in implementation of the {@code HttpClient} overrides
* <p id="streaming">
* The HttpClient {@link BodyHandlers} and {@link BodySubscribers}
* API provide some {@linkplain BodySubscribers##streaming-body streaming
* or publishing {@code BodyHandler} and {@code BodySubscriber}
* implementations} which allow to stream body data back to the caller.
* In order for the resources associated with these streams to be
* reclaimed, a caller must eventually {@linkplain HttpResponse#body()
* obtain these streaming response body} and close, cancel, or
* read the returned streams to exhaustion. Likewise, a custom
* {@link BodySubscriber} implementation should either {@linkplain
* Subscription#request(long) request} all data until {@link
* BodySubscriber#onComplete() onComplete} or {@link
* BodySubscriber#onError(Throwable) onError} is signalled, or eventually
* {@linkplain Subscription#cancel() cancel} its subscription.
*
* <p id="closing">
* The JDK built-in implementation of the {@code HttpClient} overrides
* {@link #close()}, {@link #shutdown()}, {@link #shutdownNow()},
* {@link #awaitTermination(Duration)}, and {@link #isTerminated()} to
* provide a best effort implementation. Failing to close, cancel, or
* read returned streams to exhaustion, such as streams provided when using
* {@link BodyHandlers#ofInputStream()}, {@link BodyHandlers#ofLines()}, or
* {@link BodyHandlers#ofPublisher()}, may prevent requests submitted
* before an {@linkplain #shutdown() orderly shutdown}
* to run to completion. Likewise, failing to
* {@linkplain Subscription#request(long) request data} or {@linkplain
* Subscription#cancel() cancel subscriptions} from a custom {@linkplain
* java.net.http.HttpResponse.BodySubscriber BodySubscriber} may stop
* read {@link ##streaming streaming bodies} to exhaustion may stop
* delivery of data and {@linkplain #awaitTermination(Duration) stall an
* orderly shutdown}.
*
* <p id="gc">
* If not {@linkplain ##closing explicitly closed}, the JDK
* built-in implementation of the {@code HttpClient} releases
* its resources when an {@code HttpClient} instance is no longer
* strongly reachable, and all operations started on that instance have
* eventually completed. This relies both on the garbage collector
* to notice that the instance is no longer reachable, and on all
* requests started on the client to eventually complete. Failure
* to properly close {@linkplain ##streaming streaming bodies} may
* prevent the associated requests from running to completion, and
* prevent the resources allocated by the associated client from
* being reclaimed by the garbage collector.
*
*
* <p>
* If an explicit {@linkplain HttpClient.Builder#executor(Executor)
* executor} has not been set for an {@code HttpClient}, and a security manager
* has been installed, then the default executor will execute asynchronous and
* dependent tasks in a context that is granted no permissions. Custom
* {@linkplain HttpRequest.BodyPublisher request body publishers}, {@linkplain
* HttpResponse.BodyHandler response body handlers}, {@linkplain
* HttpResponse.BodySubscriber response body subscribers}, and {@linkplain
* BodySubscriber response body subscribers}, and {@linkplain
* WebSocket.Listener WebSocket Listeners}, if executing operations that require
* privileges, should do so within an appropriate {@linkplain
* AccessController#doPrivileged(PrivilegedAction) privileged context}.
Expand Down
69 changes: 56 additions & 13 deletions src/java.net.http/share/classes/java/net/http/HttpResponse.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -296,6 +296,13 @@ public interface BodyHandler<T> {
* HttpResponse<Void> response = client
* .send(request, BodyHandlers.discarding()); }
*
* @apiNote
* Some {@linkplain HttpResponse#body() body implementations} created by
* {@linkplain BodySubscribers##streaming-body body subscribers} may need to be
* properly closed, read, or cancelled for the associated resources to
* be reclaimed and for the associated request to {@linkplain HttpClient##closing
* run to completion}.
*
* @since 11
*/
public static class BodyHandlers {
Expand Down Expand Up @@ -633,8 +640,14 @@ public static BodyHandler<Path> ofFileDownload(Path directory,
*
* @apiNote See {@link BodySubscribers#ofInputStream()} for more
* information.
* <p>
* To ensure that all resources associated with the
* corresponding exchange are properly released the caller must
* eventually obtain and close the {@linkplain BodySubscribers#ofInputStream()
* returned stream}.
*
* @return a {@linkplain HttpClient##streaming streaming} response body handler
*
* @return a response body handler
*/
public static BodyHandler<InputStream> ofInputStream() {
return (responseInfo) -> BodySubscribers.ofInputStream();
Expand All @@ -651,7 +664,14 @@ public static BodyHandler<InputStream> ofInputStream() {
* <p> When the {@code HttpResponse} object is returned, the body may
* not have been completely received.
*
* @return a response body handler
* @apiNote
* To ensure that all resources associated with the
* corresponding exchange are properly released the caller must
* eventually obtain and close the {@linkplain BodySubscribers#ofLines(Charset)
* returned stream}.
*
* @return a {@linkplain HttpClient##streaming streaming} response body handler
*
*/
public static BodyHandler<Stream<String>> ofLines() {
return (responseInfo) ->
Expand Down Expand Up @@ -726,10 +746,17 @@ public static BodyHandler<String> ofString() {
* response bytes can be obtained as they are received. The publisher
* can and must be subscribed to only once.
*
* @apiNote See {@link BodySubscribers#ofPublisher()} for more
* @apiNote
* See {@link BodySubscribers#ofPublisher()} for more
* information.
* <p>
* To ensure that all resources associated with the
* corresponding exchange are properly released the caller must
* subscribe to the publisher and conform to the rules outlined in
* {@linkplain BodySubscribers#ofPublisher()}
*
* @return a {@linkplain HttpClient##streaming publishing} response body handler
*
* @return a response body handler
*/
public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
return (responseInfo) -> BodySubscribers.ofPublisher();
Expand Down Expand Up @@ -840,7 +867,7 @@ public void applyPushPromise(
* already be completed at this point.
*
* @param <T> the push promise response body type
* @param pushPromiseHandler t he body handler to use for push promises
* @param pushPromiseHandler the body handler to use for push promises
* @param pushPromisesMap a map to accumulate push promises into
* @return a push promise handler
*/
Expand Down Expand Up @@ -937,6 +964,20 @@ public interface BodySubscriber<T>
* .send(request, responseInfo ->
* BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), String::getBytes)); }
*
* @apiNote
* <p id="streaming-body">
* Some {@linkplain HttpResponse#body() body implementations} created by
* {@linkplain BodySubscriber#getBody() body subscribers} may allow response bytes
* to be streamed to the caller. These implementations are typically
* {@link AutoCloseable} and may need to be explicitly closed in order for
* the resources associated with the request and the client to be {@linkplain
* HttpClient##closing eventually reclaimed}.
* Some other implementations are {@linkplain Publisher publishers} which need to be
* {@link BodySubscribers#ofPublisher() subscribed} in order for their associated
* resources to be released and for the associated request to {@linkplain
* HttpClient##closing run to completion}.
*
*
* @since 11
*/
public static class BodySubscribers {
Expand Down Expand Up @@ -1168,7 +1209,7 @@ public static BodySubscriber<Path> ofFile(Path file) {
* amount of data is delivered in a timely fashion.
*
* @param consumer a Consumer of byte arrays
* @return a BodySubscriber
* @return a body subscriber
*/
public static BodySubscriber<Void>
ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer) {
Expand Down Expand Up @@ -1199,8 +1240,8 @@ public static BodySubscriber<Path> ofFile(Path file) {
* while blocking on read. In that case, the request will also be
* cancelled and the {@code InputStream} will be closed.
*
* @return a body subscriber that streams the response body as an
* {@link InputStream}.
* @return a {@linkplain HttpClient##streaming streaming body subscriber}
* that streams the response body as an {@link InputStream}.
*/
public static BodySubscriber<InputStream> ofInputStream() {
return new ResponseSubscribers.HttpResponseInputStream();
Expand All @@ -1225,8 +1266,8 @@ public static BodySubscriber<InputStream> ofInputStream() {
* from being reused for subsequent operations.
*
* @param charset the character set to use when converting bytes to characters
* @return a body subscriber that streams the response body as a
* {@link Stream Stream}{@code <String>}.
* @return a {@linkplain HttpClient##streaming streaming body subscriber} that streams
* the response body as a {@link Stream Stream}{@code <String>}.
*
* @see BufferedReader#lines()
*/
Expand Down Expand Up @@ -1268,8 +1309,10 @@ public static BodySubscriber<Stream<String>> ofLines(Charset charset) {
* HTTP connection to be closed and prevent it from being reused for
* subsequent operations.
*
* @return A {@code BodySubscriber} which publishes the response body
* @return A {@linkplain HttpClient##streaming publishing body subscriber}
* which publishes the response body
* through a {@code Publisher<List<ByteBuffer>>}.
*
*/
public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
return ResponseSubscribers.createPublisher();
Expand All @@ -1282,7 +1325,7 @@ public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
*
* @param <U> the type of the response body
* @param value the value to return from HttpResponse.body(), may be {@code null}
* @return a {@code BodySubscriber}
* @return a body subscriber
*/
public static <U> BodySubscriber<U> replacing(U value) {
return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(value));
Expand Down

0 comments on commit c147ef6

Please sign in to comment.