Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,29 @@ public void handleMessage(Message message) throws Fault {
message.put(LIVE_LOGGING_PROP, Boolean.FALSE);
}
createExchangeId(message);

// Check if a TeeInputStream is present (CXF-8096).
// Register a close callback to log when the stream is consumed,
// rather than eagerly consuming the stream here which would block
// on streaming responses.
TeeInputStream tee = message.getContent(TeeInputStream.class);
if (tee != null) {
CachedOutputStream cos = message.getContent(CachedOutputStream.class);
if (cos != null && cos.size() > 0) {
// Stream was already consumed (e.g. SOAP/JAX-WS where body is read
// before PRE_INVOKE). Log immediately.
logMessage(message);
} else {
// Stream not consumed yet — defer logging until stream is closed.
tee.setOnClose(() -> logMessage(message));
}
return;
}

logMessage(message);
}

private void logMessage(Message message) {
final LogEvent event = eventMapper.map(message, sensitiveProtocolHeaderNames);
if (shouldLogContent(event)) {
addContent(message, event);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.ext.logging;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.cxf.io.CachedOutputStream;

/**
* An InputStream wrapper that copies data to a
* CachedOutputStream as it is read, up to a limit.
* This avoids eagerly reading the entire stream,
* which is important for streaming responses
* (CXF-8096).
*/
class TeeInputStream extends FilterInputStream {
private final CachedOutputStream teeCache;
private final int teeLimit;
private int count;
private Runnable closeCallback;

TeeInputStream(final InputStream source,
final CachedOutputStream cos,
final int lim) {
super(source);
this.teeCache = cos;
this.teeLimit = lim;
}

void setOnClose(final Runnable callback) {
this.closeCallback = callback;
}

@Override
public int read() throws IOException {
int b = super.read();
if (b != -1 && count < teeLimit) {
teeCache.write(b);
count++;
}
return b;
}

@Override
public int read(final byte[] b,
final int off,
final int len) throws IOException {
int n = super.read(b, off, len);
if (n > 0 && count < teeLimit) {
int toWrite = Math.min(n, teeLimit - count);
teeCache.write(b, off, toWrite);
count += toWrite;
}
return n;
}

@Override
public void close() throws IOException {
try {
super.close();
} finally {
teeCache.flush();
if (closeCallback != null) {
closeCallback.run();
}
}
}

CachedOutputStream getCachedOutputStream() {
return teeCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.SequenceInputStream;

import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.interceptor.Fault;
Expand Down Expand Up @@ -86,20 +85,20 @@ private void handleInputStream(Message message, InputStream is) throws IOExcepti
InputStream bis = is instanceof DelegatingInputStream
? ((DelegatingInputStream)is).getInputStream() : is;

// only copy up to the limit since that's all we need to log
// we can stream the rest
IOUtils.copyAtLeast(bis, bos, limit == -1 ? Integer.MAX_VALUE : limit);
bos.flush();
bis = new SequenceInputStream(bos.getInputStream(), bis);
// Wrap the stream in a TeeInputStream that copies data to the cache
// as it is consumed, up to the configured limit. This avoids blocking
// on streaming responses (CXF-8096).
int teeLimit = limit == -1 ? Integer.MAX_VALUE : limit;
TeeInputStream tee = new TeeInputStream(bis, bos, teeLimit);

// restore the delegating input stream or the input stream
if (is instanceof DelegatingInputStream) {
((DelegatingInputStream)is).setInputStream(bis);
((DelegatingInputStream)is).setInputStream(tee);
} else {
message.setContent(InputStream.class, bis);
message.setContent(InputStream.class, tee);
}
message.setContent(CachedOutputStream.class, bos);

message.setContent(TeeInputStream.class, tee);
}

public void setLimit(int limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void shouldReplaceSensitiveDataInWithAdd() {
intercept.handleMessage(message);
}
inInterceptor.handleMessage(message);
consumeAndCloseInputStream(message);

// Verify
LogEvent event = logEventSender.getLogEvent();
Expand All @@ -154,6 +155,7 @@ public void shouldReplaceSensitiveDataInWithSet() {
intercept.handleMessage(message);
}
inInterceptor.handleMessage(message);
consumeAndCloseInputStream(message);

// Verify
LogEvent event = logEventSender.getLogEvent();
Expand Down Expand Up @@ -235,6 +237,18 @@ private Message prepareInMessage() {
return message;
}

private static void consumeAndCloseInputStream(Message message) {
try {
InputStream is = message.getContent(InputStream.class);
if (is != null) {
is.transferTo(OutputStream.nullOutputStream());
is.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private Message prepareOutMessage() {
Message message = new MessageImpl();
message.put(Message.CONTENT_TYPE, contentType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public void shouldReplaceSensitiveDataInWithAdd() {
intercept.handleMessage(message);
}
inInterceptor.handleMessage(message);
consumeAndCloseInputStream(message);

// Verify
LogEvent event = logEventSender.getLogEvent();
Expand All @@ -177,6 +178,7 @@ public void shouldReplaceSensitiveDataInWithSet() {
intercept.handleMessage(message);
}
inInterceptor.handleMessage(message);
consumeAndCloseInputStream(message);

// Verify
LogEvent event = logEventSender.getLogEvent();
Expand Down Expand Up @@ -256,6 +258,18 @@ private Message prepareInMessage() {
return message;
}

private static void consumeAndCloseInputStream(Message message) {
try {
InputStream is = message.getContent(InputStream.class);
if (is != null) {
is.transferTo(OutputStream.nullOutputStream());
is.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private Message prepareOutMessage() {
Message message = new MessageImpl();
message.put(Message.CONTENT_TYPE, contentType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,21 @@ public void transformInboundInterceptorInputStream() {
}
interceptor.handleMessage(message);

// Consume and close the stream to trigger deferred logging (CXF-8096)
try {
InputStream is = message.getContent(InputStream.class);
if (is != null) {
is.transferTo(OutputStream.nullOutputStream());
is.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}

// Verify
LogEvent event = logEventSender.getLogEvent();
assertNotNull(event);
assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload()); // only the first byte is read!
assertEquals(TRANSFORMED_LOGGING_CONTENT, event.getPayload());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
Expand All @@ -41,6 +43,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;


Expand Down Expand Up @@ -112,6 +115,13 @@ public void truncatedInboundInterceptorInputStream() throws IOException {

interceptor.handleMessage(message);

// Consume and close the stream to trigger deferred logging (CXF-8096)
InputStream is = message.getContent(InputStream.class);
if (is != null) {
is.transferTo(OutputStream.nullOutputStream());
is.close();
}

LogEvent event = logEventSender.getLogEvent();
assertNotNull(event);
assertEquals("T", event.getPayload()); // only the first byte is read!
Expand Down Expand Up @@ -143,7 +153,53 @@ public void truncatedInboundInterceptorReader() throws IOException {
assertEquals("T", event.getPayload()); // only the first byte is read!
assertTrue(event.isTruncated());
}


/**
* CXF-8096: Verify that LoggingFeature does not block when reading from a streaming
* input source. The TeeInputStream approach allows the application to read data
* incrementally while logging is deferred until the stream is closed.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void streamingInputShouldNotBlockRead() throws Exception {
// Use PipedInputStream to simulate a slow streaming response
PipedOutputStream producer = new PipedOutputStream();
PipedInputStream slowStream = new PipedInputStream(producer);

Message message = new MessageImpl();
message.setContent(InputStream.class, slowStream);
Exchange exchange = new ExchangeImpl();
message.setExchange(exchange);
LogEventSenderMock logEventSender = new LogEventSenderMock();
LoggingInInterceptor interceptor = new LoggingInInterceptor(logEventSender);

// Run WireTapIn
Collection<PhaseInterceptor<? extends Message>> interceptors = interceptor.getAdditionalInterceptors();
for (PhaseInterceptor intercept : interceptors) {
intercept.handleMessage(message);
}

// Run LoggingInInterceptor — should NOT block waiting for stream data
interceptor.handleMessage(message);

// At this point, logging should be deferred — no event yet
assertNull("Logging should be deferred for streaming input", logEventSender.getLogEvent());

// Write some data and close the stream
producer.write("Hello streaming!".getBytes(StandardCharsets.UTF_8));
producer.close();

// Now consume the stream from the application side
InputStream is = message.getContent(InputStream.class);
byte[] data = is.readAllBytes();
is.close();

assertEquals("Hello streaming!", new String(data, StandardCharsets.UTF_8));

// After stream close, deferred logging should have fired
LogEvent event = logEventSender.getLogEvent();
assertNotNull("Log event should be available after stream is closed", event);
assertEquals("Hello streaming!", event.getPayload());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void testThatProvidedSpanIsNotClosedWhenActive() throws MalformedURLExcep
}

// Await till flush happens, usually a second is enough
await().atMost(Duration.ofSeconds(5L)).until(()-> TestSpanHandler.getAllSpans().size() == 4);
await().atMost(Duration.ofSeconds(10L)).until(()-> TestSpanHandler.getAllSpans().size() == 4);

assertThat(TestSpanHandler.getAllSpans().size(), equalTo(4));
assertThat(TestSpanHandler.getAllSpans().get(3).name(), equalTo("test span"));
Expand All @@ -292,7 +292,7 @@ public void testThatProvidedSpanIsNotDetachedWhenActiveUsingAsyncClient() throws
}

// Await till flush happens, usually a second is enough
await().atMost(Duration.ofSeconds(5L)).until(()-> TestSpanHandler.getAllSpans().size() == 4);
await().atMost(Duration.ofSeconds(10L)).until(()-> TestSpanHandler.getAllSpans().size() == 4);

assertThat(TestSpanHandler.getAllSpans().size(), equalTo(4));
assertThat(TestSpanHandler.getAllSpans().get(3).name(), equalTo("test span"));
Expand Down Expand Up @@ -340,7 +340,7 @@ public void testThatNewSpanIsCreatedOnClientTimeout() {
try {
client.get();
} finally {
await().atMost(Duration.ofSeconds(5L)).until(()-> TestSpanHandler.getAllSpans().size() == 2);
await().atMost(Duration.ofSeconds(10L)).until(()-> TestSpanHandler.getAllSpans().size() == 2);
assertThat(TestSpanHandler.getAllSpans().size(), equalTo(2));
assertThat(TestSpanHandler.getAllSpans().get(0).name(), equalTo("GET " + client.getCurrentURI()));
assertThat(TestSpanHandler.getAllSpans().get(0).tags(), hasKey("error"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void testThatNewInnerSpanIsCreatedOneway() throws Exception {
service.orderBooks();

// Await till flush happens, usually every second
await().atMost(Duration.ofSeconds(5L)).until(() -> TestSpanHandler.getAllSpans().size() == 2);
await().atMost(Duration.ofSeconds(10L)).until(() -> TestSpanHandler.getAllSpans().size() == 2);

assertThat(TestSpanHandler.getAllSpans().get(0).name(), equalTo("POST /BookStore"));
assertThat(TestSpanHandler.getAllSpans().get(1).name(),
Expand Down
Loading
Loading