diff --git a/build.gradle b/build.gradle index 261dfabf0412..d12cd44d24eb 100644 --- a/build.gradle +++ b/build.gradle @@ -384,6 +384,7 @@ project(':iceberg-core') { } implementation libs.aircompressor + implementation libs.lz4Java implementation libs.httpcomponents.httpclient5 implementation platform(libs.jackson.bom) implementation libs.jackson.core diff --git a/core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java b/core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java index 7a2ee61612a9..84c2fdaa7f73 100644 --- a/core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java +++ b/core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java @@ -21,13 +21,18 @@ import io.airlift.compress.Compressor; import io.airlift.compress.zstd.ZstdCompressor; import io.airlift.compress.zstd.ZstdDecompressor; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.Map; import java.util.function.Function; import java.util.stream.Stream; import javax.annotation.Nullable; +import net.jpountz.lz4.LZ4FrameInputStream; +import net.jpountz.lz4.LZ4FrameOutputStream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.ByteBuffers; @@ -108,9 +113,7 @@ static ByteBuffer compress(PuffinCompressionCodec codec, ByteBuffer input) { case NONE: return input.duplicate(); case LZ4: - // TODO requires LZ4 frame compressor, e.g. - // https://github.com/airlift/aircompressor/pull/142 - break; + return compressLz4(input); case ZSTD: return compress(new ZstdCompressor(), input); } @@ -130,9 +133,7 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) { return input.duplicate(); case LZ4: - // TODO requires LZ4 frame decompressor, e.g. - // https://github.com/airlift/aircompressor/pull/142 - break; + return decompressLz4(input); case ZSTD: return decompressZstd(input); @@ -141,6 +142,33 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) { throw new UnsupportedOperationException("Unsupported codec: " + codec); } + private static ByteBuffer compressLz4(ByteBuffer input) { + byte[] inputBytes = ByteBuffers.toByteArray(input); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (LZ4FrameOutputStream lz4Out = + new LZ4FrameOutputStream( + baos, + LZ4FrameOutputStream.BLOCKSIZE.SIZE_4MB, + inputBytes.length, + LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE, + LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE)) { + lz4Out.write(inputBytes); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return ByteBuffer.wrap(baos.toByteArray()); + } + + private static ByteBuffer decompressLz4(ByteBuffer input) { + byte[] inputBytes = ByteBuffers.toByteArray(input); + try (LZ4FrameInputStream lz4In = + new LZ4FrameInputStream(new ByteArrayInputStream(inputBytes))) { + return ByteBuffer.wrap(lz4In.readAllBytes()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private static ByteBuffer decompressZstd(ByteBuffer input) { byte[] inputBytes; int inputOffset; diff --git a/core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java b/core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java index 896c8fe5d10c..b40e7900f678 100644 --- a/core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java +++ b/core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java @@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.junit.jupiter.api.Test; @@ -83,4 +84,30 @@ private byte[] bytes(int... unsignedBytes) { } return bytes; } + + @Test + void testLz4CompressDecompressRoundTrip() { + byte[] original = + "some test data for LZ4 compression round-trip".getBytes(StandardCharsets.UTF_8); + ByteBuffer input = ByteBuffer.wrap(original); + + ByteBuffer compressed = PuffinFormat.compress(PuffinCompressionCodec.LZ4, input); + assertThat(compressed.remaining()).isGreaterThan(0); + + ByteBuffer decompressed = PuffinFormat.decompress(PuffinCompressionCodec.LZ4, compressed); + byte[] result = new byte[decompressed.remaining()]; + decompressed.get(result); + assertThat(result).isEqualTo(original); + } + + @Test + void testLz4CompressDecompressEmpty() { + ByteBuffer input = ByteBuffer.wrap(new byte[0]); + + ByteBuffer compressed = PuffinFormat.compress(PuffinCompressionCodec.LZ4, input); + assertThat(compressed.remaining()).isGreaterThan(0); + + ByteBuffer decompressed = PuffinFormat.decompress(PuffinCompressionCodec.LZ4, compressed); + assertThat(decompressed.remaining()).isEqualTo(0); + } } diff --git a/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java b/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java index 337fff817ad9..d22a907afe4c 100644 --- a/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java +++ b/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java @@ -19,6 +19,7 @@ package org.apache.iceberg.puffin; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.puffin.PuffinCompressionCodec.LZ4; import static org.apache.iceberg.puffin.PuffinCompressionCodec.NONE; import static org.apache.iceberg.puffin.PuffinCompressionCodec.ZSTD; import static org.apache.iceberg.puffin.PuffinFormatTestUtil.EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE; @@ -46,19 +47,22 @@ public class TestPuffinWriter { @TempDir private Path temp; @Test - public void testEmptyFooterCompressed() { + public void testEmptyFooterCompressed() throws Exception { InMemoryOutputFile outputFile = new InMemoryOutputFile(); - PuffinWriter writer = Puffin.write(outputFile).compressFooter().build(); - assertThatThrownBy(writer::footerSize) - .isInstanceOf(IllegalStateException.class) - .hasMessage("Footer not written yet"); - assertThatThrownBy(writer::finish) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage("Unsupported codec: LZ4"); - assertThatThrownBy(writer::close) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage("Unsupported codec: LZ4"); + try (PuffinWriter writer = Puffin.write(outputFile).compressFooter().build()) { + assertThatThrownBy(writer::footerSize) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Footer not written yet"); + writer.finish(); + assertThat(writer.footerSize()).isGreaterThan(0); + assertThat(writer.writtenBlobsMetadata()).isEmpty(); + } + + // Verify the compressed puffin file can be read back + try (PuffinReader reader = Puffin.read(outputFile.toInputFile()).build()) { + assertThat(reader.fileMetadata().properties()).isEmpty(); + } } @Test @@ -98,6 +102,48 @@ public void testWriteMetricDataCompressedZstd() throws Exception { testWriteMetric(ZSTD, "v1/sample-metric-data-compressed-zstd.bin"); } + @Test + public void testWriteAndReadMetricDataCompressedLz4() throws Exception { + InMemoryOutputFile outputFile = new InMemoryOutputFile(); + try (PuffinWriter writer = Puffin.write(outputFile).createdBy("Test 1234").build()) { + writer.add( + new Blob( + "some-blob", + ImmutableList.of(1), + 2, + 1, + ByteBuffer.wrap("abcdefghi".getBytes(UTF_8)), + LZ4, + ImmutableMap.of())); + + byte[] bytes = + "xxx some blob \u0000 binary data 🤯 that is not very very very very very very long, is it? xxx" + .getBytes(UTF_8); + writer.add( + new Blob( + "some-other-blob", + ImmutableList.of(2), + 2, + 1, + ByteBuffer.wrap(bytes, 4, bytes.length - 8), + LZ4, + ImmutableMap.of())); + } + + // Read back and verify decompression produces correct data + try (PuffinReader reader = Puffin.read(outputFile.toInputFile()).build()) { + assertThat(reader.fileMetadata().blobs()).hasSize(2); + + BlobMetadata firstMeta = reader.fileMetadata().blobs().get(0); + assertThat(firstMeta.type()).isEqualTo("some-blob"); + assertThat(firstMeta.compressionCodec()).isEqualTo("lz4"); + + BlobMetadata secondMeta = reader.fileMetadata().blobs().get(1); + assertThat(secondMeta.type()).isEqualTo("some-other-blob"); + assertThat(secondMeta.compressionCodec()).isEqualTo("lz4"); + } + } + @ParameterizedTest @CsvSource({"true, 158", "false, 122"}) public void testFileSizeCalculation(boolean isEncrypted, long expectedSize) throws Exception { diff --git a/flink/v1.20/flink-runtime/LICENSE b/flink/v1.20/flink-runtime/LICENSE index 36a03cb4fcf9..dfd2fb783ac6 100644 --- a/flink/v1.20/flink-runtime/LICENSE +++ b/flink/v1.20/flink-runtime/LICENSE @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Apache Parquet. Copyright: 2014-2020 The Apache Software Foundation. diff --git a/flink/v1.20/flink-runtime/runtime-deps.txt b/flink/v1.20/flink-runtime/runtime-deps.txt index 7c7aed1e4357..fe6b924c9ae7 100644 --- a/flink/v1.20/flink-runtime/runtime-deps.txt +++ b/flink/v1.20/flink-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.21.2 com.fasterxml.jackson.core:jackson-databind:2.21.2 diff --git a/flink/v2.0/flink-runtime/LICENSE b/flink/v2.0/flink-runtime/LICENSE index 36a03cb4fcf9..dfd2fb783ac6 100644 --- a/flink/v2.0/flink-runtime/LICENSE +++ b/flink/v2.0/flink-runtime/LICENSE @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Apache Parquet. Copyright: 2014-2020 The Apache Software Foundation. diff --git a/flink/v2.0/flink-runtime/runtime-deps.txt b/flink/v2.0/flink-runtime/runtime-deps.txt index c70e3fbba92c..e95876560a9f 100644 --- a/flink/v2.0/flink-runtime/runtime-deps.txt +++ b/flink/v2.0/flink-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.21.2 com.fasterxml.jackson.core:jackson-databind:2.21.2 diff --git a/flink/v2.1/flink-runtime/LICENSE b/flink/v2.1/flink-runtime/LICENSE index 36a03cb4fcf9..dfd2fb783ac6 100644 --- a/flink/v2.1/flink-runtime/LICENSE +++ b/flink/v2.1/flink-runtime/LICENSE @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Apache Parquet. Copyright: 2014-2020 The Apache Software Foundation. diff --git a/flink/v2.1/flink-runtime/runtime-deps.txt b/flink/v2.1/flink-runtime/runtime-deps.txt index 3dfc56f15ea9..8c884b2933fc 100644 --- a/flink/v2.1/flink-runtime/runtime-deps.txt +++ b/flink/v2.1/flink-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.21.2 com.fasterxml.jackson.core:jackson-databind:2.21.2 diff --git a/kafka-connect/kafka-connect-runtime/hive/LICENSE b/kafka-connect/kafka-connect-runtime/hive/LICENSE index 0d777ba54bba..4f31d60c24ae 100644 --- a/kafka-connect/kafka-connect-runtime/hive/LICENSE +++ b/kafka-connect/kafka-connect-runtime/hive/LICENSE @@ -252,6 +252,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Woodstox. Project URL: https://github.com/FasterXML/woodstox diff --git a/kafka-connect/kafka-connect-runtime/main/LICENSE b/kafka-connect/kafka-connect-runtime/main/LICENSE index 2f93e2793f77..eb0c76b2b51e 100644 --- a/kafka-connect/kafka-connect-runtime/main/LICENSE +++ b/kafka-connect/kafka-connect-runtime/main/LICENSE @@ -238,6 +238,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Woodstox. Project URL: https://github.com/FasterXML/woodstox diff --git a/kafka-connect/kafka-connect-runtime/runtime-deps.txt b/kafka-connect/kafka-connect-runtime/runtime-deps.txt index 98b7ced14217..37976bc061b7 100644 --- a/kafka-connect/kafka-connect-runtime/runtime-deps.txt +++ b/kafka-connect/kafka-connect-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.azure:azure-core-http-netty:1.16.3 com.azure:azure-core:1.57.1 com.azure:azure-identity:1.18.2 diff --git a/spark/v3.4/spark-runtime/LICENSE b/spark/v3.4/spark-runtime/LICENSE index a67296eb412c..28b086ffd54a 100644 --- a/spark/v3.4/spark-runtime/LICENSE +++ b/spark/v3.4/spark-runtime/LICENSE @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Apache Parquet. Copyright: 2014-2024 The Apache Software Foundation diff --git a/spark/v3.4/spark-runtime/runtime-deps.txt b/spark/v3.4/spark-runtime/runtime-deps.txt index fa0b58c856c0..33f351946d38 100644 --- a/spark/v3.4/spark-runtime/runtime-deps.txt +++ b/spark/v3.4/spark-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.14.2 com.fasterxml.jackson.core:jackson-databind:2.14.2 diff --git a/spark/v3.5/spark-runtime/LICENSE b/spark/v3.5/spark-runtime/LICENSE index a67296eb412c..28b086ffd54a 100644 --- a/spark/v3.5/spark-runtime/LICENSE +++ b/spark/v3.5/spark-runtime/LICENSE @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Apache Parquet. Copyright: 2014-2024 The Apache Software Foundation diff --git a/spark/v3.5/spark-runtime/runtime-deps.txt b/spark/v3.5/spark-runtime/runtime-deps.txt index 9a087517cbb0..ce6906fe3428 100644 --- a/spark/v3.5/spark-runtime/runtime-deps.txt +++ b/spark/v3.5/spark-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.15.2 com.fasterxml.jackson.core:jackson-databind:2.15.2 diff --git a/spark/v4.0/spark-runtime/LICENSE b/spark/v4.0/spark-runtime/LICENSE index a67296eb412c..28b086ffd54a 100644 --- a/spark/v4.0/spark-runtime/LICENSE +++ b/spark/v4.0/spark-runtime/LICENSE @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Apache Parquet. Copyright: 2014-2024 The Apache Software Foundation diff --git a/spark/v4.0/spark-runtime/runtime-deps.txt b/spark/v4.0/spark-runtime/runtime-deps.txt index 9a087517cbb0..ce6906fe3428 100644 --- a/spark/v4.0/spark-runtime/runtime-deps.txt +++ b/spark/v4.0/spark-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.15.2 com.fasterxml.jackson.core:jackson-databind:2.15.2 diff --git a/spark/v4.1/spark-runtime/LICENSE b/spark/v4.1/spark-runtime/LICENSE index a67296eb412c..28b086ffd54a 100644 --- a/spark/v4.1/spark-runtime/LICENSE +++ b/spark/v4.1/spark-runtime/LICENSE @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2 -------------------------------------------------------------------------------- +This product bundles lz4-java. + +Copyright: 2020 Adrien Grand and the lz4-java contributors +Project URL: https://github.com/lz4/lz4-java +License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product bundles Apache Parquet. Copyright: 2014-2024 The Apache Software Foundation diff --git a/spark/v4.1/spark-runtime/runtime-deps.txt b/spark/v4.1/spark-runtime/runtime-deps.txt index 9a087517cbb0..ce6906fe3428 100644 --- a/spark/v4.1/spark-runtime/runtime-deps.txt +++ b/spark/v4.1/spark-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +at.yawk.lz4:lz4-java:1.11.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.15.2 com.fasterxml.jackson.core:jackson-databind:2.15.2