diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java index 436bbf3d16d..6094ef7ceca 100644 --- a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java +++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/LoggingInInterceptor.java @@ -90,6 +90,29 @@ public void handleMessage(Message message) throws Fault { message.put(LIVE_LOGGING_PROP, Boolean.FALSE); } createExchangeId(message); + + // Check if a TeeInputStream is present (CXF-8096). + // Register a close callback to log when the stream is consumed, + // rather than eagerly consuming the stream here which would block + // on streaming responses. + TeeInputStream tee = message.getContent(TeeInputStream.class); + if (tee != null) { + CachedOutputStream cos = message.getContent(CachedOutputStream.class); + if (cos != null && cos.size() > 0) { + // Stream was already consumed (e.g. SOAP/JAX-WS where body is read + // before PRE_INVOKE). Log immediately. + logMessage(message); + } else { + // Stream not consumed yet — defer logging until stream is closed. + tee.setOnClose(() -> logMessage(message)); + } + return; + } + + logMessage(message); + } + + private void logMessage(Message message) { final LogEvent event = eventMapper.map(message, sensitiveProtocolHeaderNames); if (shouldLogContent(event)) { addContent(message, event); diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java new file mode 100644 index 00000000000..3d72a371478 --- /dev/null +++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/TeeInputStream.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.ext.logging; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cxf.io.CachedOutputStream; + +/** + * An InputStream wrapper that copies data to a + * CachedOutputStream as it is read, up to a limit. + * This avoids eagerly reading the entire stream, + * which is important for streaming responses + * (CXF-8096). + */ +class TeeInputStream extends FilterInputStream { + private final CachedOutputStream teeCache; + private final int teeLimit; + private int count; + private Runnable closeCallback; + + TeeInputStream(final InputStream source, + final CachedOutputStream cos, + final int lim) { + super(source); + this.teeCache = cos; + this.teeLimit = lim; + } + + void setOnClose(final Runnable callback) { + this.closeCallback = callback; + } + + @Override + public int read() throws IOException { + int b = super.read(); + if (b != -1 && count < teeLimit) { + teeCache.write(b); + count++; + } + return b; + } + + @Override + public int read(final byte[] b, + final int off, + final int len) throws IOException { + int n = super.read(b, off, len); + if (n > 0 && count < teeLimit) { + int toWrite = Math.min(n, teeLimit - count); + teeCache.write(b, off, toWrite); + count += toWrite; + } + return n; + } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + teeCache.flush(); + if (closeCallback != null) { + closeCallback.run(); + } + } + } + + CachedOutputStream getCachedOutputStream() { + return teeCache; + } +} diff --git a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java index 9b08c7d736f..a796fe67ad1 100644 --- a/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java +++ b/rt/features/logging/src/main/java/org/apache/cxf/ext/logging/WireTapIn.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.Reader; -import java.io.SequenceInputStream; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.interceptor.Fault; @@ -86,20 +85,20 @@ private void handleInputStream(Message message, InputStream is) throws IOExcepti InputStream bis = is instanceof DelegatingInputStream ? ((DelegatingInputStream)is).getInputStream() : is; - // only copy up to the limit since that's all we need to log - // we can stream the rest - IOUtils.copyAtLeast(bis, bos, limit == -1 ? Integer.MAX_VALUE : limit); - bos.flush(); - bis = new SequenceInputStream(bos.getInputStream(), bis); + // Wrap the stream in a TeeInputStream that copies data to the cache + // as it is consumed, up to the configured limit. This avoids blocking + // on streaming responses (CXF-8096). + int teeLimit = limit == -1 ? Integer.MAX_VALUE : limit; + TeeInputStream tee = new TeeInputStream(bis, bos, teeLimit); // restore the delegating input stream or the input stream if (is instanceof DelegatingInputStream) { - ((DelegatingInputStream)is).setInputStream(bis); + ((DelegatingInputStream)is).setInputStream(tee); } else { - message.setContent(InputStream.class, bis); + message.setContent(InputStream.class, tee); } message.setContent(CachedOutputStream.class, bos); - + message.setContent(TeeInputStream.class, tee); } public void setLimit(int limit) { diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java index 925cc9b5152..52c219af344 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/AttributeMaskSensitiveHelperTest.java @@ -133,6 +133,7 @@ public void shouldReplaceSensitiveDataInWithAdd() { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -154,6 +155,7 @@ public void shouldReplaceSensitiveDataInWithSet() { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -235,6 +237,18 @@ private Message prepareInMessage() { return message; } + private static void consumeAndCloseInputStream(Message message) { + try { + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private Message prepareOutMessage() { Message message = new MessageImpl(); message.put(Message.CONTENT_TYPE, contentType); diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java index 761409b3a6a..17d700d8758 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/MaskSensitiveHelperTest.java @@ -156,6 +156,7 @@ public void shouldReplaceSensitiveDataInWithAdd() { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -177,6 +178,7 @@ public void shouldReplaceSensitiveDataInWithSet() { intercept.handleMessage(message); } inInterceptor.handleMessage(message); + consumeAndCloseInputStream(message); // Verify LogEvent event = logEventSender.getLogEvent(); @@ -256,6 +258,18 @@ private Message prepareInMessage() { return message; } + private static void consumeAndCloseInputStream(Message message) { + try { + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private Message prepareOutMessage() { Message message = new MessageImpl(); message.put(Message.CONTENT_TYPE, contentType); diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java index 48d53b0274f..4da27d0ade5 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TransformTest.java @@ -136,10 +136,21 @@ public void transformInboundInterceptorInputStream() { } interceptor.handleMessage(message); + // Consume and close the stream to trigger deferred logging (CXF-8096) + try { + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + // Verify LogEvent event = logEventSender.getLogEvent(); assertNotNull(event); - assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload()); // only the first byte is read! + assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload()); } @Test diff --git a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java index a7567fc15fb..e6597ee5ade 100644 --- a/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java +++ b/rt/features/logging/src/test/java/org/apache/cxf/ext/logging/TruncatedTest.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; @@ -41,6 +43,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -112,6 +115,13 @@ public void truncatedInboundInterceptorInputStream() throws IOException { interceptor.handleMessage(message); + // Consume and close the stream to trigger deferred logging (CXF-8096) + InputStream is = message.getContent(InputStream.class); + if (is != null) { + is.transferTo(OutputStream.nullOutputStream()); + is.close(); + } + LogEvent event = logEventSender.getLogEvent(); assertNotNull(event); assertEquals("T", event.getPayload()); // only the first byte is read! @@ -143,7 +153,53 @@ public void truncatedInboundInterceptorReader() throws IOException { assertEquals("T", event.getPayload()); // only the first byte is read! assertTrue(event.isTruncated()); } - + /** + * CXF-8096: Verify that LoggingFeature does not block when reading from a streaming + * input source. The TeeInputStream approach allows the application to read data + * incrementally while logging is deferred until the stream is closed. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void streamingInputShouldNotBlockRead() throws Exception { + // Use PipedInputStream to simulate a slow streaming response + PipedOutputStream producer = new PipedOutputStream(); + PipedInputStream slowStream = new PipedInputStream(producer); + + Message message = new MessageImpl(); + message.setContent(InputStream.class, slowStream); + Exchange exchange = new ExchangeImpl(); + message.setExchange(exchange); + LogEventSenderMock logEventSender = new LogEventSenderMock(); + LoggingInInterceptor interceptor = new LoggingInInterceptor(logEventSender); + + // Run WireTapIn + Collection> interceptors = interceptor.getAdditionalInterceptors(); + for (PhaseInterceptor intercept : interceptors) { + intercept.handleMessage(message); + } + + // Run LoggingInInterceptor — should NOT block waiting for stream data + interceptor.handleMessage(message); + + // At this point, logging should be deferred — no event yet + assertNull("Logging should be deferred for streaming input", logEventSender.getLogEvent()); + + // Write some data and close the stream + producer.write("Hello streaming!".getBytes(StandardCharsets.UTF_8)); + producer.close(); + + // Now consume the stream from the application side + InputStream is = message.getContent(InputStream.class); + byte[] data = is.readAllBytes(); + is.close(); + + assertEquals("Hello streaming!", new String(data, StandardCharsets.UTF_8)); + + // After stream close, deferred logging should have fired + LogEvent event = logEventSender.getLogEvent(); + assertNotNull("Log event should be available after stream is closed", event); + assertEquals("Hello streaming!", event.getPayload()); + } } diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxrs/AbstractBraveTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxrs/AbstractBraveTracingTest.java index a7db3b275a7..741bd1aebe4 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxrs/AbstractBraveTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxrs/AbstractBraveTracingTest.java @@ -265,7 +265,7 @@ public void testThatProvidedSpanIsNotClosedWhenActive() throws MalformedURLExcep } // Await till flush happens, usually a second is enough - await().atMost(Duration.ofSeconds(5L)).until(()-> TestSpanHandler.getAllSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(()-> TestSpanHandler.getAllSpans().size() == 4); assertThat(TestSpanHandler.getAllSpans().size(), equalTo(4)); assertThat(TestSpanHandler.getAllSpans().get(3).name(), equalTo("test span")); @@ -292,7 +292,7 @@ public void testThatProvidedSpanIsNotDetachedWhenActiveUsingAsyncClient() throws } // Await till flush happens, usually a second is enough - await().atMost(Duration.ofSeconds(5L)).until(()-> TestSpanHandler.getAllSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(()-> TestSpanHandler.getAllSpans().size() == 4); assertThat(TestSpanHandler.getAllSpans().size(), equalTo(4)); assertThat(TestSpanHandler.getAllSpans().get(3).name(), equalTo("test span")); @@ -340,7 +340,7 @@ public void testThatNewSpanIsCreatedOnClientTimeout() { try { client.get(); } finally { - await().atMost(Duration.ofSeconds(5L)).until(()-> TestSpanHandler.getAllSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(()-> TestSpanHandler.getAllSpans().size() == 2); assertThat(TestSpanHandler.getAllSpans().size(), equalTo(2)); assertThat(TestSpanHandler.getAllSpans().get(0).name(), equalTo("GET " + client.getCurrentURI())); assertThat(TestSpanHandler.getAllSpans().get(0).tags(), hasKey("error")); diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxws/AbstractBraveTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxws/AbstractBraveTracingTest.java index 64f6ebbaba1..54a13558cf8 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxws/AbstractBraveTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/brave/jaxws/AbstractBraveTracingTest.java @@ -204,7 +204,7 @@ public void testThatNewInnerSpanIsCreatedOneway() throws Exception { service.orderBooks(); // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(() -> TestSpanHandler.getAllSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(() -> TestSpanHandler.getAllSpans().size() == 2); assertThat(TestSpanHandler.getAllSpans().get(0).name(), equalTo("POST /BookStore")); assertThat(TestSpanHandler.getAllSpans().get(1).name(), diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentelemetry/OpenTelemetryTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentelemetry/OpenTelemetryTracingTest.java index 6b218a49df3..919fd777fe2 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentelemetry/OpenTelemetryTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentelemetry/OpenTelemetryTracingTest.java @@ -257,7 +257,7 @@ public void testThatNewInnerSpanIsCreatedUsingAsyncInvocation() throws Interrupt final Response r = withTrace(createWebClient("/bookstore/books/async")).get(); assertEquals(Status.OK.getStatusCode(), r.getStatus()); - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 2); final List spans = getSpansSorted(); assertThat(spans.size(), equalTo(2)); @@ -287,7 +287,7 @@ public void testThatNewSpanIsCreatedUsingAsyncInvocation() throws InterruptedExc final Response r = createWebClient("/bookstore/books/async").get(); assertEquals(Status.OK.getStatusCode(), r.getStatus()); - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 2); final List spans = getSpansSorted(); assertThat(spans.size(), equalTo(2)); @@ -303,7 +303,7 @@ public void testThatNewSpanIsCreatedWhenNotProvidedUsingAsyncClient() throws Exc final Response r = client.async().get().get(1L, TimeUnit.SECONDS); assertEquals(Status.OK.getStatusCode(), r.getStatus()); - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 3); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 3); assertThat(otelRule.getSpans().size(), equalTo(3)); assertThat(otelRule.getSpans().get(0).getName(), equalTo("Get Books")); @@ -373,7 +373,7 @@ public void testThatProvidedSpanIsNotClosedWhenActive() throws MalformedURLExcep } // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 4); assertThat(otelRule.getSpans().size(), equalTo(4)); assertThat(otelRule.getSpans().get(3).getName(), equalTo("test span")); @@ -392,7 +392,7 @@ public void testThatProvidedSpanIsNotDetachedWhenActiveUsingAsyncClient() throws assertThat(Span.current().getSpanContext().getSpanId(), equalTo(span.getSpanContext().getSpanId())); - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 3); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 3); assertThat(otelRule.getSpans().size(), equalTo(3)); assertThat(otelRule.getSpans().get(0).getName(), equalTo("Get Books")); @@ -406,7 +406,7 @@ public void testThatProvidedSpanIsNotDetachedWhenActiveUsingAsyncClient() throws } // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 4); assertThat(otelRule.getSpans().size(), equalTo(4)); assertThat(otelRule.getSpans().get(3).getName(), equalTo("test span")); @@ -443,7 +443,7 @@ public void testThatNewSpanIsCreatedOnClientTimeout() { try { client.get(); } finally { - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 2); assertThat(otelRule.getSpans().toString(), otelRule.getSpans().size(), equalTo(2)); assertThat(otelRule.getSpans().get(0).getName(), equalTo("GET " + client.getCurrentURI())); assertThat(otelRule.getSpans().get(0).getStatus().getStatusCode(), equalTo(StatusCode.ERROR)); diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentracing/OpenTracingTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentracing/OpenTracingTracingTest.java index 6566e213582..c8e3eea60f0 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentracing/OpenTracingTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/opentracing/OpenTracingTracingTest.java @@ -199,7 +199,7 @@ public void testThatNewInnerSpanIsCreatedUsingAsyncInvocation() throws Interrupt final Response r = withTrace(createWebClient("/bookstore/books/async"), spanId).get(); assertEquals(Status.OK.getStatusCode(), r.getStatus()); - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 2); final List spans = getSpansSorted(); assertThat(spans.size(), equalTo(2)); @@ -226,7 +226,7 @@ public void testThatNewSpanIsCreatedUsingAsyncInvocation() throws InterruptedExc final Response r = createWebClient("/bookstore/books/async").get(); assertEquals(Status.OK.getStatusCode(), r.getStatus()); - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 2); final List spans = getSpansSorted(); assertThat(spans.size(), equalTo(2)); @@ -241,7 +241,7 @@ public void testThatNewSpanIsCreatedWhenNotProvidedUsingAsyncClient() throws Exc final Response r = client.async().get().get(1L, TimeUnit.SECONDS); assertEquals(Status.OK.getStatusCode(), r.getStatus()); - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 3); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 3); assertThat(REPORTER.getSpans().size(), equalTo(3)); assertThat(REPORTER.getSpans().get(0).getOperationName(), equalTo("Get Books")); @@ -325,7 +325,7 @@ public void testThatProvidedSpanIsNotClosedWhenActive() throws MalformedURLExcep } // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 4); assertThat(REPORTER.getSpans().size(), equalTo(4)); assertThat(REPORTER.getSpans().get(3).getOperationName(), equalTo("test span")); @@ -342,7 +342,7 @@ public void testThatProvidedSpanIsNotDetachedWhenActiveUsingAsyncClient() throws assertEquals(Status.OK.getStatusCode(), r.getStatus()); assertThat(tracer.activeSpan().context(), equalTo(span.context())); - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 3); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 3); assertThat(REPORTER.getSpans().size(), equalTo(3)); assertThat(REPORTER.getSpans().get(0).getOperationName(), equalTo("Get Books")); @@ -356,7 +356,7 @@ public void testThatProvidedSpanIsNotDetachedWhenActiveUsingAsyncClient() throws } // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 4); assertThat(REPORTER.getSpans().size(), equalTo(4)); assertThat(REPORTER.getSpans().get(3).getOperationName(), equalTo("test span")); @@ -391,7 +391,7 @@ public void testThatNewSpanIsCreatedOnClientTimeout() { try { client.get(); } finally { - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 2); assertThat(REPORTER.getSpans().toString(), REPORTER.getSpans().size(), equalTo(2)); assertThat(REPORTER.getSpans().get(0).getOperationName(), equalTo("GET " + client.getCurrentURI())); assertThat(REPORTER.getSpans().get(0).getTags(), hasItem(Tags.ERROR.getKey(), Boolean.TRUE)); diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentelemetry/OpenTelemetryTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentelemetry/OpenTelemetryTracingTest.java index 295ae79fd79..b41c42b8996 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentelemetry/OpenTelemetryTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentelemetry/OpenTelemetryTracingTest.java @@ -247,7 +247,7 @@ public void testThatProvidedSpanIsNotClosedWhenActive() throws Exception { } // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 4); assertThat(otelRule.getSpans().size(), equalTo(4)); assertThat(otelRule.getSpans().get(3).getName(), equalTo("test span")); @@ -331,7 +331,7 @@ public void testThatNewInnerSpanIsCreatedOneway() throws Exception { service.orderBooks(); // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(() -> otelRule.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(() -> otelRule.getSpans().size() == 2); assertThat(otelRule.getSpans().get(0).getName(), equalTo("POST /BookStore")); assertThat(otelRule.getSpans().get(1).getName(), diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentracing/OpenTracingTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentracing/OpenTracingTracingTest.java index bdbdceb6f06..ab0103acc30 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentracing/OpenTracingTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxws/tracing/opentracing/OpenTracingTracingTest.java @@ -178,7 +178,7 @@ public void testThatProvidedSpanIsNotClosedWhenActive() throws Exception { } // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(()-> REPORTER.getSpans().size() == 4); + await().atMost(Duration.ofSeconds(10L)).until(()-> REPORTER.getSpans().size() == 4); assertThat(REPORTER.getSpans().size(), equalTo(4)); assertThat(REPORTER.getSpans().get(3).getOperationName(), equalTo("test span")); @@ -234,7 +234,7 @@ public void testThatNewInnerSpanIsCreatedOneway() throws Exception { service.orderBooks(); // Await till flush happens, usually every second - await().atMost(Duration.ofSeconds(5L)).until(() -> REPORTER.getSpans().size() == 2); + await().atMost(Duration.ofSeconds(10L)).until(() -> REPORTER.getSpans().size() == 2); assertThat(REPORTER.getSpans().get(0).getOperationName(), equalTo("POST /BookStore")); assertThat(REPORTER.getSpans().get(1).getOperationName(),