Skip to content

Commit dc196aa

Browse files
committed
CXF-8629: AsyncHTTPConduit (hc5) should support chunked request / response. Add test cases with auto-redirect
1 parent 9164d67 commit dc196aa

File tree

3 files changed

+197
-10
lines changed

3 files changed

+197
-10
lines changed

rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,10 @@ protected synchronized HttpResponse getHttpResponse() throws IOException {
696696
}
697697

698698
protected void handleResponseAsync() throws IOException {
699-
isAsync = true;
699+
// The response hasn't been handled yet, should be handled asynchronously
700+
if (httpResponse == null) {
701+
isAsync = true;
702+
}
700703
}
701704

702705
protected void closeInputStream() throws IOException {

systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/FileStore.java

+52
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import jakarta.activation.DataHandler;
3131
import jakarta.ws.rs.Consumes;
32+
import jakarta.ws.rs.GET;
3233
import jakarta.ws.rs.POST;
3334
import jakarta.ws.rs.Path;
3435
import jakarta.ws.rs.QueryParam;
@@ -38,8 +39,10 @@
3839
import jakarta.ws.rs.core.Context;
3940
import jakarta.ws.rs.core.HttpHeaders;
4041
import jakarta.ws.rs.core.Response;
42+
import jakarta.ws.rs.core.Response.ResponseBuilder;
4143
import jakarta.ws.rs.core.Response.Status;
4244
import jakarta.ws.rs.core.StreamingOutput;
45+
import jakarta.ws.rs.core.UriBuilder;
4346
import jakarta.ws.rs.core.UriInfo;
4447
import org.apache.cxf.common.util.StringUtils;
4548
import org.apache.cxf.helpers.IOUtils;
@@ -136,4 +139,53 @@ public void write(OutputStream os) throws IOException, WebApplicationException {
136139
}
137140
}
138141
}
142+
143+
@GET
144+
@Consumes("multipart/form-data")
145+
public void getBook(@QueryParam("chunked") boolean chunked, @QueryParam("filename") String source,
146+
@Suspended final AsyncResponse response) {
147+
148+
if (StringUtils.isEmpty(source)) {
149+
response.resume(Response.status(Status.BAD_REQUEST).build());
150+
return;
151+
}
152+
153+
try {
154+
if (!store.containsKey(source)) {
155+
response.resume(Response.status(Status.NOT_FOUND).build());
156+
return;
157+
}
158+
159+
final byte[] content = store.get(source);
160+
if (response.isSuspended()) {
161+
final StreamingOutput stream = new StreamingOutput() {
162+
@Override
163+
public void write(OutputStream os) throws IOException, WebApplicationException {
164+
if (chunked) {
165+
// Make sure we have enough data for chunking to kick in
166+
for (int i = 0; i < 10; ++i) {
167+
os.write(content);
168+
}
169+
} else {
170+
os.write(content);
171+
}
172+
}
173+
};
174+
response.resume(Response.ok().entity(stream).build());
175+
}
176+
177+
} catch (final Exception ex) {
178+
response.resume(Response.serverError().build());
179+
}
180+
}
181+
182+
@GET
183+
@Path("/redirect")
184+
public Response addBook(@Context UriInfo uriInfo) {
185+
final UriBuilder builder = uriInfo.getBaseUriBuilder().path(getClass());
186+
uriInfo.getQueryParameters(true).forEach((p, v) -> builder.queryParam(p, v.get(0)));
187+
188+
final ResponseBuilder response = Response.status(303).header("Location", builder.build());
189+
return response.build();
190+
}
139191
}

systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientChunkingTest.java

+141-9
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,20 @@
2626
import java.util.Collection;
2727
import java.util.List;
2828
import java.util.Random;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ConcurrentMap;
31+
import java.util.concurrent.ExecutionException;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.logging.Logger;
2936

3037
import jakarta.ws.rs.client.Entity;
3138
import jakarta.ws.rs.core.MediaType;
3239
import jakarta.ws.rs.core.MultivaluedMap;
3340
import jakarta.ws.rs.core.Response;
3441
import org.apache.cxf.interceptor.LoggingInInterceptor;
42+
import org.apache.cxf.interceptor.LoggingMessage;
3543
import org.apache.cxf.interceptor.LoggingOutInterceptor;
3644
import org.apache.cxf.jaxrs.client.ClientConfiguration;
3745
import org.apache.cxf.jaxrs.client.WebClient;
@@ -40,6 +48,7 @@
4048
import org.apache.cxf.jaxrs.impl.MetadataMap;
4149
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
4250
import org.apache.cxf.jaxrs.provider.MultipartProvider;
51+
import org.apache.cxf.message.Message;
4352
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
4453
import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit;
4554

@@ -50,16 +59,20 @@
5059

5160
import static org.hamcrest.CoreMatchers.equalTo;
5261
import static org.hamcrest.CoreMatchers.not;
62+
import static org.hamcrest.CoreMatchers.startsWith;
5363
import static org.junit.Assert.assertThat;
5464
import static org.junit.Assert.assertTrue;
5565

5666
@RunWith(value = org.junit.runners.Parameterized.class)
5767
public class JAXRSAsyncClientChunkingTest extends AbstractBusClientServerTestBase {
5868
private static final String PORT = allocatePort(FileStoreServer.class);
5969
private final Boolean chunked;
70+
private final Boolean autoRedirect;
71+
private final ConcurrentMap<String, AtomicInteger> ids = new ConcurrentHashMap<>();
6072

61-
public JAXRSAsyncClientChunkingTest(Boolean chunked) {
73+
public JAXRSAsyncClientChunkingTest(Boolean chunked, Boolean autoRedirect) {
6274
this.chunked = chunked;
75+
this.autoRedirect = autoRedirect;
6376
}
6477

6578
@BeforeClass
@@ -69,9 +82,14 @@ public static void startServers() throws Exception {
6982
createStaticBus();
7083
}
7184

72-
@Parameters(name = "{0}")
73-
public static Collection<Boolean> data() {
74-
return Arrays.asList(new Boolean[] {Boolean.FALSE, Boolean.TRUE});
85+
@Parameters(name = "chunked {0}, auto-redirect {1}")
86+
public static Collection<Boolean[]> data() {
87+
return Arrays.asList(new Boolean[][] {
88+
{Boolean.FALSE /* chunked */, Boolean.FALSE /* autoredirect */},
89+
{Boolean.FALSE /* chunked */, Boolean.TRUE /* autoredirect */},
90+
{Boolean.TRUE /* chunked */, Boolean.FALSE /* autoredirect */},
91+
{Boolean.TRUE /* chunked */, Boolean.TRUE /* autoredirect */},
92+
});
7593
}
7694

7795
@Test
@@ -82,24 +100,61 @@ public void testMultipartChunking() {
82100
final ClientConfiguration config = WebClient.getConfig(webClient);
83101
config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true);
84102
config.getHttpConduit().getClient().setAllowChunking(chunked);
103+
config.getHttpConduit().getClient().setAutoRedirect(autoRedirect);
85104
configureLogging(config);
86105

106+
final String filename = "keymanagers.jks";
87107
try {
88-
final String filename = "keymanagers.jks";
89108
final MultivaluedMap<String, String> headers = new MetadataMap<>();
90109
headers.add("Content-ID", filename);
91110
headers.add("Content-Type", "application/binary");
92-
headers.add("Content-Disposition", "attachment; filename=" + chunked + "_" + filename);
111+
headers.add("Content-Disposition", "attachment; filename=" + chunked + "_" + autoRedirect + "_" + filename);
93112
final Attachment att = new Attachment(getClass().getResourceAsStream("/" + filename), headers);
94113
final MultipartBody entity = new MultipartBody(att);
95-
try (Response response = webClient.header("Content-Type", "multipart/form-data").post(entity)) {
114+
try (Response response = webClient.header("Content-Type", MediaType.MULTIPART_FORM_DATA).post(entity)) {
96115
assertThat(response.getStatus(), equalTo(201));
97116
assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null));
98117
assertThat(response.getEntity(), not(equalTo(null)));
99118
}
100119
} finally {
101120
webClient.close();
102121
}
122+
123+
assertRedirect(chunked + "_" + autoRedirect + "_" + filename);
124+
}
125+
126+
@Test
127+
public void testMultipartChunkingAsync() throws InterruptedException, ExecutionException, TimeoutException {
128+
final String url = "http://localhost:" + PORT + "/file-store";
129+
final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider())).query("chunked", chunked);
130+
131+
final ClientConfiguration config = WebClient.getConfig(webClient);
132+
config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true);
133+
config.getHttpConduit().getClient().setAllowChunking(chunked);
134+
config.getHttpConduit().getClient().setAutoRedirect(autoRedirect);
135+
configureLogging(config);
136+
137+
final String filename = "keymanagers.jks";
138+
try {
139+
final MultivaluedMap<String, String> headers = new MetadataMap<>();
140+
headers.add("Content-ID", filename);
141+
headers.add("Content-Type", "application/binary");
142+
headers.add("Content-Disposition", "attachment; filename=" + chunked
143+
+ "_" + autoRedirect + "_async_" + filename);
144+
final Attachment att = new Attachment(getClass().getResourceAsStream("/" + filename), headers);
145+
final Entity<MultipartBody> entity = Entity.entity(new MultipartBody(att),
146+
MediaType.MULTIPART_FORM_DATA_TYPE);
147+
try (Response response = webClient.header("Content-Type", MediaType.MULTIPART_FORM_DATA).async()
148+
.post(entity).get(10, TimeUnit.SECONDS)) {
149+
assertThat(response.getStatus(), equalTo(201));
150+
assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null));
151+
assertThat(response.getEntity(), not(equalTo(null)));
152+
}
153+
} finally {
154+
webClient.close();
155+
}
156+
157+
assertRedirect(chunked + "_" + autoRedirect + "_" + filename);
103158
}
104159

105160
@Test
@@ -110,6 +165,7 @@ public void testStreamChunking() throws IOException {
110165
final ClientConfiguration config = WebClient.getConfig(webClient);
111166
config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true);
112167
config.getHttpConduit().getClient().setAllowChunking(chunked);
168+
config.getHttpConduit().getClient().setAutoRedirect(autoRedirect);
113169
configureLogging(config);
114170

115171
final byte[] bytes = new byte [32 * 1024];
@@ -126,13 +182,89 @@ public void testStreamChunking() throws IOException {
126182
} finally {
127183
webClient.close();
128184
}
185+
186+
assertNoDuplicateLogging();
129187
}
130-
188+
189+
@Test
190+
public void testStreamChunkingAsync() throws IOException, InterruptedException,
191+
ExecutionException, TimeoutException {
192+
final String url = "http://localhost:" + PORT + "/file-store/stream";
193+
final WebClient webClient = WebClient.create(url).query("chunked", chunked);
194+
195+
final ClientConfiguration config = WebClient.getConfig(webClient);
196+
config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true);
197+
config.getHttpConduit().getClient().setAllowChunking(chunked);
198+
config.getHttpConduit().getClient().setAutoRedirect(autoRedirect);
199+
configureLogging(config);
200+
201+
final byte[] bytes = new byte [32 * 1024];
202+
final Random random = new Random();
203+
random.nextBytes(bytes);
204+
205+
try (InputStream in = new ByteArrayInputStream(bytes)) {
206+
final Entity<InputStream> entity = Entity.entity(in, MediaType.APPLICATION_OCTET_STREAM);
207+
try (Response response = webClient.async().post(entity).get(10, TimeUnit.SECONDS)) {
208+
assertThat(response.getStatus(), equalTo(200));
209+
assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null));
210+
assertThat(response.getEntity(), not(equalTo(null)));
211+
}
212+
} finally {
213+
webClient.close();
214+
}
215+
216+
assertNoDuplicateLogging();
217+
}
218+
219+
private void assertRedirect(String filename) {
220+
final String url = "http://localhost:" + PORT + "/file-store/redirect";
221+
222+
final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider()))
223+
.query("chunked", chunked)
224+
.query("filename", filename);
225+
226+
final ClientConfiguration config = WebClient.getConfig(webClient);
227+
config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true);
228+
config.getHttpConduit().getClient().setAllowChunking(chunked);
229+
config.getHttpConduit().getClient().setAutoRedirect(autoRedirect);
230+
configureLogging(config);
231+
232+
try {
233+
try (Response response = webClient.get()) {
234+
if (autoRedirect) {
235+
assertThat(response.getStatus(), equalTo(200));
236+
assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null));
237+
assertThat(response.getEntity(), not(equalTo(null)));
238+
} else {
239+
assertThat(response.getStatus(), equalTo(303));
240+
assertThat(response.getHeaderString("Location"),
241+
startsWith("http://localhost:" + PORT + "/file-store"));
242+
}
243+
}
244+
} finally {
245+
webClient.close();
246+
}
247+
248+
assertNoDuplicateLogging();
249+
}
250+
251+
private void assertNoDuplicateLogging() {
252+
ids.forEach((id, counter) -> assertThat("Duplicate client logging for message " + id,
253+
counter.get(), equalTo(1)));
254+
}
255+
131256
private void configureLogging(final ClientConfiguration config) {
132257
final LoggingOutInterceptor out = new LoggingOutInterceptor();
133258
out.setShowMultipartContent(false);
134259

135-
final LoggingInInterceptor in = new LoggingInInterceptor();
260+
final LoggingInInterceptor in = new LoggingInInterceptor() {
261+
@Override
262+
protected void logging(Logger logger, Message message) {
263+
super.logging(logger, message);
264+
final String id = (String) message.get(LoggingMessage.ID_KEY);
265+
ids.computeIfAbsent(id, key -> new AtomicInteger()).incrementAndGet();
266+
}
267+
};
136268
in.setShowBinaryContent(false);
137269

138270
config.getInInterceptors().add(in);

0 commit comments

Comments
 (0)