Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ project(':iceberg-core') {
}

implementation libs.aircompressor
implementation libs.lz4Java
implementation libs.httpcomponents.httpclient5
implementation platform(libs.jackson.bom)
implementation libs.jackson.core
Expand Down
40 changes: 34 additions & 6 deletions core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import io.airlift.compress.Compressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import net.jpountz.lz4.LZ4FrameInputStream;
import net.jpountz.lz4.LZ4FrameOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ByteBuffers;
Expand Down Expand Up @@ -108,9 +113,7 @@ static ByteBuffer compress(PuffinCompressionCodec codec, ByteBuffer input) {
case NONE:
return input.duplicate();
case LZ4:
// TODO requires LZ4 frame compressor, e.g.
// https://github.com/airlift/aircompressor/pull/142
break;
return compressLz4(input);
case ZSTD:
return compress(new ZstdCompressor(), input);
}
Expand All @@ -130,9 +133,7 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
return input.duplicate();

case LZ4:
// TODO requires LZ4 frame decompressor, e.g.
// https://github.com/airlift/aircompressor/pull/142
break;
return decompressLz4(input);

case ZSTD:
return decompressZstd(input);
Expand All @@ -141,6 +142,33 @@ static ByteBuffer decompress(PuffinCompressionCodec codec, ByteBuffer input) {
throw new UnsupportedOperationException("Unsupported codec: " + codec);
}

/**
 * Encodes the readable content of {@code input} as an LZ4 frame.
 *
 * <p>The frame is written with the block-independence and content-size flags set, and the
 * declared content size is the number of bytes obtained from {@code input}.
 *
 * <p>NOTE(review): the block size is fixed at {@code SIZE_4MB} regardless of input length;
 * presumably the stream sizes its internal buffers from it, so small inputs may pay for large
 * buffers. Confirm against the lz4-java implementation.
 *
 * @param input buffer holding the uncompressed bytes
 * @return a new buffer wrapping the complete LZ4 frame
 * @throws UncheckedIOException if the LZ4 stream reports an {@link IOException}
 */
private static ByteBuffer compressLz4(ByteBuffer input) {
  byte[] uncompressed = ByteBuffers.toByteArray(input);
  long contentSize = uncompressed.length;
  ByteArrayOutputStream frameBytes = new ByteArrayOutputStream();

  // The frame trailer is only emitted on close, so the stream must be closed before
  // the accumulated bytes are taken from the sink.
  try (OutputStream frameStream =
      new LZ4FrameOutputStream(
          frameBytes,
          LZ4FrameOutputStream.BLOCKSIZE.SIZE_4MB,
          contentSize,
          LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE,
          LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE)) {
    frameStream.write(uncompressed);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }

  byte[] compressed = frameBytes.toByteArray();
  return ByteBuffer.wrap(compressed);
}

/**
 * Decodes LZ4-frame data held in {@code input} back into its uncompressed bytes.
 *
 * @param input buffer holding LZ4-frame encoded bytes
 * @return a new buffer wrapping everything the LZ4 frame stream yields
 * @throws UncheckedIOException if the LZ4 stream reports an {@link IOException}
 */
private static ByteBuffer decompressLz4(ByteBuffer input) {
  byte[] compressed = ByteBuffers.toByteArray(input);
  ByteArrayInputStream source = new ByteArrayInputStream(compressed);

  try (LZ4FrameInputStream frameStream = new LZ4FrameInputStream(source)) {
    byte[] uncompressed = frameStream.readAllBytes();
    return ByteBuffer.wrap(uncompressed);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}

private static ByteBuffer decompressZstd(ByteBuffer input) {
byte[] inputBytes;
int inputOffset;
Expand Down
27 changes: 27 additions & 0 deletions core/src/test/java/org/apache/iceberg/puffin/TestPuffinFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -83,4 +84,30 @@ private byte[] bytes(int... unsignedBytes) {
}
return bytes;
}

@Test
void testLz4CompressDecompressRoundTrip() {
  // Compressing and then decompressing with LZ4 must reproduce the input bytes exactly.
  byte[] original =
      "some test data for LZ4 compression round-trip".getBytes(StandardCharsets.UTF_8);

  ByteBuffer compressed =
      PuffinFormat.compress(PuffinCompressionCodec.LZ4, ByteBuffer.wrap(original));
  assertThat(compressed.remaining()).isGreaterThan(0);

  ByteBuffer restored = PuffinFormat.decompress(PuffinCompressionCodec.LZ4, compressed);
  int restoredLength = restored.remaining();
  byte[] result = new byte[restoredLength];
  restored.get(result);

  assertThat(result).isEqualTo(original);
}

@Test
void testLz4CompressDecompressEmpty() {
  // Zero-length input must still produce a non-empty compressed buffer,
  // and decompressing that buffer must yield zero bytes again.
  byte[] nothing = new byte[0];

  ByteBuffer compressed =
      PuffinFormat.compress(PuffinCompressionCodec.LZ4, ByteBuffer.wrap(nothing));
  assertThat(compressed.remaining()).isGreaterThan(0);

  ByteBuffer restored = PuffinFormat.decompress(PuffinCompressionCodec.LZ4, compressed);
  assertThat(restored.remaining()).isEqualTo(0);
}
}
68 changes: 57 additions & 11 deletions core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.puffin;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.LZ4;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.NONE;
import static org.apache.iceberg.puffin.PuffinCompressionCodec.ZSTD;
import static org.apache.iceberg.puffin.PuffinFormatTestUtil.EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE;
Expand Down Expand Up @@ -46,19 +47,22 @@ public class TestPuffinWriter {
@TempDir private Path temp;

@Test
public void testEmptyFooterCompressed() {
public void testEmptyFooterCompressed() throws Exception {
InMemoryOutputFile outputFile = new InMemoryOutputFile();

PuffinWriter writer = Puffin.write(outputFile).compressFooter().build();
assertThatThrownBy(writer::footerSize)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Footer not written yet");
assertThatThrownBy(writer::finish)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Unsupported codec: LZ4");
assertThatThrownBy(writer::close)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Unsupported codec: LZ4");
try (PuffinWriter writer = Puffin.write(outputFile).compressFooter().build()) {
assertThatThrownBy(writer::footerSize)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Footer not written yet");
writer.finish();
assertThat(writer.footerSize()).isGreaterThan(0);
assertThat(writer.writtenBlobsMetadata()).isEmpty();
}

// Verify the compressed puffin file can be read back
try (PuffinReader reader = Puffin.read(outputFile.toInputFile()).build()) {
assertThat(reader.fileMetadata().properties()).isEmpty();
}
}

@Test
Expand Down Expand Up @@ -98,6 +102,48 @@ public void testWriteMetricDataCompressedZstd() throws Exception {
testWriteMetric(ZSTD, "v1/sample-metric-data-compressed-zstd.bin");
}

@Test
public void testWriteAndReadMetricDataCompressedLz4() throws Exception {
  // Writes two LZ4-compressed blobs, then reads the file back and checks both the blob
  // metadata and the decompressed payloads. The payload arrays are declared outside the
  // writer scope so the read-back assertions can compare against them.
  byte[] firstPayload = "abcdefghi".getBytes(UTF_8);
  byte[] bytes =
      "xxx some blob \u0000 binary data 🤯 that is not very very very very very very long, is it? xxx"
          .getBytes(UTF_8);

  InMemoryOutputFile outputFile = new InMemoryOutputFile();
  try (PuffinWriter writer = Puffin.write(outputFile).createdBy("Test 1234").build()) {
    writer.add(
        new Blob(
            "some-blob",
            ImmutableList.of(1),
            2,
            1,
            ByteBuffer.wrap(firstPayload),
            LZ4,
            ImmutableMap.of()));

    // Only a slice of the array is written, to exercise a buffer with a non-zero offset.
    writer.add(
        new Blob(
            "some-other-blob",
            ImmutableList.of(2),
            2,
            1,
            ByteBuffer.wrap(bytes, 4, bytes.length - 8),
            LZ4,
            ImmutableMap.of()));
  }

  try (PuffinReader reader = Puffin.read(outputFile.toInputFile()).build()) {
    // Metadata must list both blobs with the LZ4 codec recorded.
    assertThat(reader.fileMetadata().blobs()).hasSize(2);

    BlobMetadata firstMeta = reader.fileMetadata().blobs().get(0);
    assertThat(firstMeta.type()).isEqualTo("some-blob");
    assertThat(firstMeta.compressionCodec()).isEqualTo("lz4");

    BlobMetadata secondMeta = reader.fileMetadata().blobs().get(1);
    assertThat(secondMeta.type()).isEqualTo("some-other-blob");
    assertThat(secondMeta.compressionCodec()).isEqualTo("lz4");

    // Read the blob payloads back so the LZ4 decompression path is actually exercised;
    // checking metadata alone would pass even if decompression returned wrong bytes.
    // Fresh expected buffers are wrapped here so the comparison does not depend on the
    // position of the buffers that were handed to the writer.
    // NOTE(review): assumes readAll yields blobs in file (offset) order — confirm.
    assertThat(reader.readAll(reader.fileMetadata().blobs()))
        .extracting(blob -> blob.second())
        .containsExactly(
            ByteBuffer.wrap(firstPayload), ByteBuffer.wrap(bytes, 4, bytes.length - 8));
  }
}

@ParameterizedTest
@CsvSource({"true, 158", "false, 122"})
public void testFileSizeCalculation(boolean isEncrypted, long expectedSize) throws Exception {
Expand Down
8 changes: 8 additions & 0 deletions flink/v1.20/flink-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Apache Parquet.

Copyright: 2014-2020 The Apache Software Foundation.
Expand Down
1 change: 1 addition & 0 deletions flink/v1.20/flink-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.21.2
com.fasterxml.jackson.core:jackson-databind:2.21.2
Expand Down
8 changes: 8 additions & 0 deletions flink/v2.0/flink-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Apache Parquet.

Copyright: 2014-2020 The Apache Software Foundation.
Expand Down
1 change: 1 addition & 0 deletions flink/v2.0/flink-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.21.2
com.fasterxml.jackson.core:jackson-databind:2.21.2
Expand Down
8 changes: 8 additions & 0 deletions flink/v2.1/flink-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Apache Parquet.

Copyright: 2014-2020 The Apache Software Foundation.
Expand Down
1 change: 1 addition & 0 deletions flink/v2.1/flink-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.21.2
com.fasterxml.jackson.core:jackson-databind:2.21.2
Expand Down
8 changes: 8 additions & 0 deletions kafka-connect/kafka-connect-runtime/hive/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Woodstox.

Project URL: https://github.com/FasterXML/woodstox
Expand Down
8 changes: 8 additions & 0 deletions kafka-connect/kafka-connect-runtime/main/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Woodstox.

Project URL: https://github.com/FasterXML/woodstox
Expand Down
1 change: 1 addition & 0 deletions kafka-connect/kafka-connect-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.azure:azure-core-http-netty:1.16.3
com.azure:azure-core:1.57.1
com.azure:azure-identity:1.18.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v3.4/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Apache Parquet.

Copyright: 2014-2024 The Apache Software Foundation
Expand Down
1 change: 1 addition & 0 deletions spark/v3.4/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.14.2
com.fasterxml.jackson.core:jackson-databind:2.14.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v3.5/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Apache Parquet.

Copyright: 2014-2024 The Apache Software Foundation
Expand Down
1 change: 1 addition & 0 deletions spark/v3.5/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.15.2
com.fasterxml.jackson.core:jackson-databind:2.15.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v4.0/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Apache Parquet.

Copyright: 2014-2024 The Apache Software Foundation
Expand Down
1 change: 1 addition & 0 deletions spark/v4.0/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.15.2
com.fasterxml.jackson.core:jackson-databind:2.15.2
Expand Down
8 changes: 8 additions & 0 deletions spark/v4.1/spark-runtime/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2

--------------------------------------------------------------------------------

This product bundles lz4-java.

Copyright: 2020 Adrien Grand and the lz4-java contributors
Project URL: https://github.com/lz4/lz4-java
License: Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product bundles Apache Parquet.

Copyright: 2014-2024 The Apache Software Foundation
Expand Down
1 change: 1 addition & 0 deletions spark/v4.1/spark-runtime/runtime-deps.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
at.yawk.lz4:lz4-java:1.11.0
com.fasterxml.jackson.core:jackson-annotations:2.21
com.fasterxml.jackson.core:jackson-core:2.15.2
com.fasterxml.jackson.core:jackson-databind:2.15.2
Expand Down
Loading