diff --git a/parent/pom.xml b/parent/pom.xml index 7f93d18fa83..e2bd900af2f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -1379,6 +1379,12 @@ ${cxf.hamcrest.version} test + + org.hamcrest + hamcrest-all + ${cxf.hamcrest.version} + test + org.awaitility awaitility diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml index 209bb809538..632b6695b1f 100644 --- a/systests/jaxrs/pom.xml +++ b/systests/jaxrs/pom.xml @@ -543,6 +543,11 @@ ${cxf.ehcache3.version} test + + org.hamcrest + hamcrest-all + test + diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java index d4d0cecf87a..bfb26df8584 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookStore.java @@ -19,9 +19,12 @@ package org.apache.cxf.systest.jaxrs; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.PrintWriter; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -37,6 +40,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -1410,6 +1414,30 @@ public void writeTo(Writer writer) throws IOException { }).build(); } + @GET + @Path("/books/streamingoutput") + @Produces("text/xml") + public Response getBookStreamingOutput(@QueryParam("times") @DefaultValue("1") int times, + @QueryParam("delay") @DefaultValue("0") long delay) { + final StreamingOutput streamWritingError = outputStream -> { + try (PrintWriter writer = new PrintWriter(outputStream, true); + BufferedReader reader = new BufferedReader(new InputStreamReader(getBufferedBook()))) { + final String readLine = reader.readLine(); + for (int i = 0; i < times; i++) { + writer.println(readLine); + if (i < times - 1) { // skip the delay in case of the last message + TimeUnit.MILLISECONDS.sleep(delay); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InternalServerErrorException(); + } + }; + + return Response.ok(streamWritingError).build(); + } + @GET @Path("/books/fail-late") @Produces("application/bar") diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java index c45f9f7633c..4454a26243d 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/logging/LoggingTest.java @@ -19,26 +19,36 @@ package org.apache.cxf.systest.jaxrs.logging; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.StringReader; import java.util.Collections; -import java.util.List; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; - +import org.apache.cxf.ext.logging.LoggingFeature; +import org.apache.cxf.feature.Feature; +import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean; import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.systest.jaxrs.Book; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.hamcrest.Matchers; import org.junit.BeforeClass; import org.junit.Test; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - public class LoggingTest extends AbstractBusClientServerTestBase { @BeforeClass public static void startServers() { @@ -58,11 +68,81 @@ public void testEchoBookElement() { assertEquals("CXF", book.getName()); } - protected WebClient createWebClient(final String url, final String mediaType) { - final List providers = Collections.singletonList(new JacksonJsonProvider()); + @Test + public void testStreamingOutputBlockingRead() throws IOException, JAXBException { + final int expectedEntry = 2; + final long expectedDelay = 1000L; + final WebClient webClient = createWebClient("/bookstore/books/streamingoutput", MediaType.TEXT_XML, + new LoggingFeature()); + final long requestSent = System.currentTimeMillis(); + final Response response = webClient + .query("times", expectedEntry) + .query("delay", expectedDelay) + .get(); + assertThat(System.currentTimeMillis() - requestSent, Matchers.lessThan(100L)); + assertEquals(200, response.getStatus()); + assertEquals(-1, response.getLength()); + + try (InputStreamReader inputStreamReader = new InputStreamReader(response.readEntity(InputStream.class)); + BufferedReader reader = new BufferedReader(inputStreamReader)) { + int readEntry = 0; + long previousStartTime = -1L; + String readLine; + while ((readLine = reader.readLine()) != null) { + assertDelayBetweenMessages(previousStartTime, expectedDelay); + final Book book = readBook(readLine); + assertEquals(123L, book.getId()); + assertEquals("CXF in Action", book.getName()); + + readEntry++; + previousStartTime = System.currentTimeMillis(); + } + + assertEquals(expectedEntry, readEntry); + } + + } + + private void assertDelayBetweenMessages(long previous, long expectedDelay) { + if (previous < 0L) { + // no need to assert the delay between messages, because it is the first one + return; + } + final long endTime = System.currentTimeMillis(); + final long delay = endTime - previous; + assertThat(delay, Matchers.greaterThan(expectedDelay - 150L)); + assertThat(delay, Matchers.lessThan(expectedDelay + 150L)); + } + + protected WebClient createWebClient(final String url, final String mediaType) { return WebClient - .create("http://localhost:" + LoggingServer.PORT + url, providers) + .create("http://localhost:" + LoggingServer.PORT) + .path(url) .accept(mediaType); } + + private static Book readBook(String input) throws JAXBException { + final Class resultClass = Book.class; + final Unmarshaller unmarshaller = JAXBContext.newInstance(resultClass) + .createUnmarshaller(); + final StreamSource streamSource = new StreamSource(new StringReader(input)); + + return unmarshaller.unmarshal(streamSource, resultClass).getValue(); + } + + protected WebClient createWebClient(final String url, final String mediaType, Feature feature) { + JAXRSClientFactoryBean bean = createBean("http://localhost:" + LoggingServer.PORT, feature); + return bean.createWebClient() + .path(url) + .accept(mediaType); + } + + private static JAXRSClientFactoryBean createBean(String address, + Feature feature) { + JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean(); + bean.setAddress(address); + bean.setFeatures(Collections.singletonList(feature)); + return bean; + } }