diff --git a/src/main/java/com/rabbitmq/stream/compression/CompressionUtils.java b/src/main/java/com/rabbitmq/stream/compression/CompressionUtils.java index 46090e5f55..779e17954d 100644 --- a/src/main/java/com/rabbitmq/stream/compression/CompressionUtils.java +++ b/src/main/java/com/rabbitmq/stream/compression/CompressionUtils.java @@ -15,7 +15,7 @@ package com.rabbitmq.stream.compression; import com.github.luben.zstd.Zstd; -import com.github.luben.zstd.ZstdInputStream; +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStream; import java.io.IOException; import java.io.InputStream; @@ -101,7 +101,7 @@ public OutputStream compress(OutputStream outputStream) { @Override public InputStream decompress(InputStream inputStream) { try { - return new ZstdInputStream(inputStream); + return new ZstdInputStreamNoFinalizer(inputStream); } catch (IOException e) { throw new CompressionException("Error while creating Zstd compression input stream", e); } diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java index c6a1141893..786384eb53 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java +++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java @@ -562,16 +562,23 @@ static int handleDeliver( if (comp.code() != Compression.NONE.code()) { CompressionCodec compressionCodec = client.compressionCodecFactory.get(comp); ByteBuf outBb = client.channel.alloc().heapBuffer(uncompressedDataSize); - ByteBuf slice = message.slice(message.readerIndex(), dataSize); - InputStream inputStream = compressionCodec.decompress(new ByteBufInputStream(slice)); byte[] inBuffer = new byte[Math.min(uncompressedDataSize, 1024)]; int n; + ByteBuf slice = message.slice(message.readerIndex(), dataSize); + InputStream inputStream = compressionCodec.decompress(new ByteBufInputStream(slice)); try { while (-1 != (n = inputStream.read(inBuffer))) { outBb.writeBytes(inBuffer, 0, n); } } catch (IOException e) { throw new StreamException("Error while uncompressing sub-entry", e); + } finally { + try { + inputStream.close(); + } catch (IOException e) { + throw new StreamException( + "Error while closing sub-entry compressed input stream", e); + } } message.readerIndex(message.readerIndex() + dataSize); bbToReadFrom = outBb;