diff --git a/parent/pom.xml b/parent/pom.xml
index 7f93d18fa83..e2bd900af2f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -1379,6 +1379,12 @@
${cxf.hamcrest.version}
test
+
+ org.hamcrest
+ hamcrest-all
+ ${cxf.hamcrest.version}
+ test
+
org.awaitility
awaitility
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 209bb809538..632b6695b1f 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -543,6 +543,11 @@
${cxf.ehcache3.version}
test
+
+ org.hamcrest
+ hamcrest-all
+ test
+
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java
index d4d0cecf87a..bfb26df8584 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java
@@ -19,9 +19,12 @@
package org.apache.cxf.systest.jaxrs;
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.io.PrintWriter;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@@ -37,6 +40,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -1410,6 +1414,30 @@ public void writeTo(Writer writer) throws IOException {
}).build();
}
+ @GET
+ @Path("/books/streamingoutput")
+ @Produces("text/xml")
+ public Response getBookStreamingOutput(@QueryParam("times") @DefaultValue("1") int times,
+ @QueryParam("delay") @DefaultValue("0") long delay) {
+ final StreamingOutput streamWritingError = outputStream -> {
+ try (PrintWriter writer = new PrintWriter(outputStream, true);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(getBufferedBook()))) {
+ final String readLine = reader.readLine();
+ for (int i = 0; i < times; i++) {
+ writer.println(readLine);
+ if (i < times - 1) { // skip the delay in case of the last message
+ TimeUnit.MILLISECONDS.sleep(delay);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InternalServerErrorException();
+ }
+ };
+
+ return Response.ok(streamWritingError).build();
+ }
+
@GET
@Path("/books/fail-late")
@Produces("application/bar")
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java
index c45f9f7633c..4454a26243d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java
@@ -19,26 +19,36 @@
package org.apache.cxf.systest.jaxrs.logging;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
import java.util.Collections;
-import java.util.List;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
+import org.apache.cxf.ext.logging.LoggingFeature;
+import org.apache.cxf.feature.Feature;
+import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
import org.apache.cxf.systest.jaxrs.Book;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.hamcrest.Matchers;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-
public class LoggingTest extends AbstractBusClientServerTestBase {
@BeforeClass
public static void startServers() {
@@ -58,11 +68,81 @@ public void testEchoBookElement() {
assertEquals("CXF", book.getName());
}
- protected WebClient createWebClient(final String url, final String mediaType) {
- final List> providers = Collections.singletonList(new JacksonJsonProvider());
+ @Test
+ public void testStreamingOutputBlockingRead() throws IOException, JAXBException {
+ final int expectedEntry = 2;
+ final long expectedDelay = 1000L;
+ final WebClient webClient = createWebClient("/bookstore/books/streamingoutput", MediaType.TEXT_XML,
+ new LoggingFeature());
+ final long requestSent = System.currentTimeMillis();
+ final Response response = webClient
+ .query("times", expectedEntry)
+ .query("delay", expectedDelay)
+ .get();
+ assertThat(System.currentTimeMillis() - requestSent, Matchers.lessThan(100L));
+ assertEquals(200, response.getStatus());
+ assertEquals(-1, response.getLength());
+
+ try (InputStreamReader inputStreamReader = new InputStreamReader(response.readEntity(InputStream.class));
+ BufferedReader reader = new BufferedReader(inputStreamReader)) {
+ int readEntry = 0;
+ long previousStartTime = -1L;
+ String readLine;
+ while ((readLine = reader.readLine()) != null) {
+ assertDelayBetweenMessages(previousStartTime, expectedDelay);
+ final Book book = readBook(readLine);
+ assertEquals(123L, book.getId());
+ assertEquals("CXF in Action", book.getName());
+
+ readEntry++;
+ previousStartTime = System.currentTimeMillis();
+ }
+
+ assertEquals(expectedEntry, readEntry);
+ }
+
+ }
+
+ private void assertDelayBetweenMessages(long previous, long expectedDelay) {
+ if (previous < 0L) {
+ // no need to assert the delay between messages, because it is the first one
+ return;
+ }
+ final long endTime = System.currentTimeMillis();
+ final long delay = endTime - previous;
+ assertThat(delay, Matchers.greaterThan(expectedDelay - 150L));
+ assertThat(delay, Matchers.lessThan(expectedDelay + 150L));
+ }
+
+ protected WebClient createWebClient(final String url, final String mediaType) {
return WebClient
- .create("http://localhost:" + LoggingServer.PORT + url, providers)
+ .create("http://localhost:" + LoggingServer.PORT)
+ .path(url)
.accept(mediaType);
}
+
+ private static Book readBook(String input) throws JAXBException {
+ final Class resultClass = Book.class;
+ final Unmarshaller unmarshaller = JAXBContext.newInstance(resultClass)
+ .createUnmarshaller();
+ final StreamSource streamSource = new StreamSource(new StringReader(input));
+
+ return unmarshaller.unmarshal(streamSource, resultClass).getValue();
+ }
+
+ protected WebClient createWebClient(final String url, final String mediaType, Feature feature) {
+ JAXRSClientFactoryBean bean = createBean("http://localhost:" + LoggingServer.PORT, feature);
+ return bean.createWebClient()
+ .path(url)
+ .accept(mediaType);
+ }
+
+ private static JAXRSClientFactoryBean createBean(String address,
+ Feature feature) {
+ JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean();
+ bean.setAddress(address);
+ bean.setFeatures(Collections.singletonList(feature));
+ return bean;
+ }
}