Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

/**
Expand Down Expand Up @@ -135,6 +136,7 @@ public static WriterVersion fromString(String name) {
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
private final ColumnProperty<Boolean> sizeStatistics;
private final ColumnProperty<CompressionCodecName> compressionCodec;

private ParquetProperties(Builder builder) {
this.pageSizeThreshold = builder.pageSize;
Expand Down Expand Up @@ -167,6 +169,7 @@ private ParquetProperties(Builder builder) {
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
this.sizeStatistics = builder.sizeStatistics.build();
this.compressionCodec = builder.compressionCodec.build();
}

public static Builder builder() {
Expand Down Expand Up @@ -348,6 +351,14 @@ public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
return numBloomFilterCandidates.getValue(column);
}

/**
 * Resolves the compression codec for a specific column, honoring any per-column
 * override and otherwise returning the configured default codec.
 *
 * @param column descriptor of the column being written
 * @return the codec to use when compressing pages of {@code column}
 */
public CompressionCodecName getCompressionCodec(ColumnDescriptor column) {
  final CompressionCodecName codec = compressionCodec.getValue(column);
  return codec;
}

/**
 * Returns the codec applied to columns that have no per-column override
 * configured via the builder.
 *
 * @return the default compression codec for this writer configuration
 */
public CompressionCodecName getDefaultCompressionCodec() {
  final CompressionCodecName fallback = compressionCodec.getDefaultValue();
  return fallback;
}

/**
 * Returns the application-provided key/value metadata to be written to the file footer.
 *
 * @return an unmodifiable view of the extra metadata entries
 */
public Map<String, String> getExtraMetaData() {
  // Expose a read-only view: handing out the internal map would let callers
  // mutate this otherwise-immutable properties object after construction.
  return java.util.Collections.unmodifiableMap(extraMetaData);
}
Expand Down Expand Up @@ -419,6 +430,7 @@ public static class Builder {
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
private final ColumnProperty.Builder<Boolean> sizeStatistics;
private final ColumnProperty.Builder<CompressionCodecName> compressionCodec;

private Builder() {
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
Expand All @@ -436,6 +448,8 @@ private Builder() {
ColumnProperty.<Integer>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER);
statistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_STATISTICS_ENABLED);
sizeStatistics = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_SIZE_STATISTICS_ENABLED);
compressionCodec =
ColumnProperty.<CompressionCodecName>builder().withDefaultValue(CompressionCodecName.UNCOMPRESSED);
}

private Builder(ParquetProperties toCopy) {
Expand All @@ -460,6 +474,7 @@ private Builder(ParquetProperties toCopy) {
this.extraMetaData = toCopy.extraMetaData;
this.statistics = ColumnProperty.builder(toCopy.statistics);
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
this.compressionCodec = ColumnProperty.builder(toCopy.compressionCodec);
}

/**
Expand Down Expand Up @@ -756,6 +771,32 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
return this;
}

/**
* Set the compression codec for the columns not specified by
* {@link #withCompressionCodec(String, CompressionCodecName)}.
*
* @param codecName the compression codec to use by default
* @return this builder for method chaining.
*/
public Builder withCompressionCodec(CompressionCodecName codecName) {
Comment thread
emkornfield marked this conversation as resolved.
this.compressionCodec.withDefaultValue(
Objects.requireNonNull(codecName, "compressionCodecName cannot be null"));
return this;
}

/**
 * Set the compression codec to use for one specific column, overriding the
 * default configured via {@link #withCompressionCodec(CompressionCodecName)}.
 *
 * @param columnPath the path of the column (dot-string)
 * @param codecName the compression codec to use for the column; must not be null
 * @return this builder for method chaining.
 * @throws NullPointerException if {@code codecName} is null
 */
public Builder withCompressionCodec(String columnPath, CompressionCodecName codecName) {
  CompressionCodecName checked = Objects.requireNonNull(codecName, "compressionCodecName cannot be null");
  this.compressionCodec.withValue(columnPath, checked);
  return this;
}

public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column;

import static org.junit.Assert.assertEquals;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.junit.Test;

public class ParquetPropertiesCompressionTest {

private static ColumnDescriptor col(String name) {
return new ColumnDescriptor(new String[] {name}, PrimitiveTypeName.BINARY, 0, 0);
}

@Test
public void testDefaultCompressionCodecIsUncompressed() {
ParquetProperties props = ParquetProperties.builder().build();
assertEquals(CompressionCodecName.UNCOMPRESSED, props.getCompressionCodec(col("any_column")));
assertEquals(CompressionCodecName.UNCOMPRESSED, props.getDefaultCompressionCodec());
}

@Test
public void testSetDefaultCompressionCodec() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
assertEquals(CompressionCodecName.SNAPPY, props.getCompressionCodec(col("any_column")));
assertEquals(CompressionCodecName.SNAPPY, props.getDefaultCompressionCodec());
}

@Test
public void testPerColumnCompressionCodec() {
ParquetProperties props = ParquetProperties.builder()
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withCompressionCodec("col_a", CompressionCodecName.GZIP)
.withCompressionCodec("col_b", CompressionCodecName.UNCOMPRESSED)
.build();

// Per-column overrides
assertEquals(CompressionCodecName.GZIP, props.getCompressionCodec(col("col_a")));
assertEquals(CompressionCodecName.UNCOMPRESSED, props.getCompressionCodec(col("col_b")));
// Default for non-overridden columns
assertEquals(CompressionCodecName.SNAPPY, props.getCompressionCodec(col("col_c")));
assertEquals(CompressionCodecName.SNAPPY, props.getDefaultCompressionCodec());
}

@Test
public void testCopyPreservesCompressionCodec() {
ParquetProperties original = ParquetProperties.builder()
.withCompressionCodec(CompressionCodecName.GZIP)
.withCompressionCodec("col_a", CompressionCodecName.SNAPPY)
.build();

ParquetProperties copy = ParquetProperties.copy(original).build();

assertEquals(CompressionCodecName.GZIP, copy.getCompressionCodec(col("other")));
assertEquals(CompressionCodecName.SNAPPY, copy.getCompressionCodec(col("col_a")));
}

@Test
public void testCopyCanOverrideDefault() {
ParquetProperties original = ParquetProperties.builder()
.withCompressionCodec(CompressionCodecName.GZIP)
.withCompressionCodec("col_a", CompressionCodecName.SNAPPY)
.build();

ParquetProperties modified = ParquetProperties.copy(original)
.withCompressionCodec(CompressionCodecName.ZSTD)
.build();

// Default overridden
assertEquals(CompressionCodecName.ZSTD, modified.getCompressionCodec(col("other")));
// Per-column override preserved
assertEquals(CompressionCodecName.SNAPPY, modified.getCompressionCodec(col("col_a")));
}

@Test(expected = NullPointerException.class)
public void testNullDefaultCompressionCodecThrows() {
ParquetProperties.builder().withCompressionCodec((CompressionCodecName) null);
}

@Test(expected = NullPointerException.class)
public void testNullPerColumnCompressionCodecThrows() {
ParquetProperties.builder().withCompressionCodec("col_a", null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.zip.CRC32;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.ByteBufferReleaser;
Expand All @@ -44,6 +45,7 @@
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
Expand Down Expand Up @@ -576,22 +578,7 @@ public ColumnChunkPageWriteStore(
ByteBufferAllocator allocator,
int columnIndexTruncateLength,
boolean pageWriteChecksumEnabled) {
this.schema = schema;
for (ColumnDescriptor path : schema.getColumns()) {
writers.put(
path,
new ColumnChunkPageWriter(
path,
compressor,
allocator,
columnIndexTruncateLength,
pageWriteChecksumEnabled,
null,
null,
null,
-1,
-1));
}
this(compressor, schema, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, null, 0);
}

@Deprecated
Expand Down Expand Up @@ -621,14 +608,32 @@ public ColumnChunkPageWriteStore(
boolean pageWriteChecksumEnabled,
InternalFileEncryptor fileEncryptor,
int rowGroupOrdinal) {
this(
path -> compressor,
schema,
allocator,
columnIndexTruncateLength,
pageWriteChecksumEnabled,
fileEncryptor,
rowGroupOrdinal);
}

private ColumnChunkPageWriteStore(
Function<ColumnDescriptor, BytesInputCompressor> compressorProvider,
MessageType schema,
ByteBufferAllocator allocator,
int columnIndexTruncateLength,
boolean pageWriteChecksumEnabled,
InternalFileEncryptor fileEncryptor,
int rowGroupOrdinal) {
this.schema = schema;
if (null == fileEncryptor) {
for (ColumnDescriptor path : schema.getColumns()) {
writers.put(
path,
new ColumnChunkPageWriter(
path,
compressor,
compressorProvider.apply(path),
allocator,
columnIndexTruncateLength,
pageWriteChecksumEnabled,
Expand Down Expand Up @@ -660,7 +665,7 @@ public ColumnChunkPageWriteStore(
path,
new ColumnChunkPageWriter(
path,
compressor,
compressorProvider.apply(path),
allocator,
columnIndexTruncateLength,
pageWriteChecksumEnabled,
Expand All @@ -672,6 +677,88 @@ public ColumnChunkPageWriteStore(
}
}

/** Returns a fresh {@link Builder} for assembling a {@code ColumnChunkPageWriteStore}. */
public static Builder builder() {
return new Builder();
}

/**
 * Builder for {@link ColumnChunkPageWriteStore}. Prefer this over the constructors when new
 * parameters are needed so that callers do not have to be updated every time a parameter is
 * added.
 *
 * <p>A compressor (or codec factory), a schema, and an allocator must be supplied before
 * calling {@link #build()}; the remaining parameters have sensible defaults.
 */
public static class Builder {
  // Resolves the compressor to use for each column; must be configured before build().
  private Function<ColumnDescriptor, BytesInputCompressor> compressorProvider;
  private MessageType schema;
  private ByteBufferAllocator allocator;
  private int columnIndexTruncateLength = ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
  private boolean pageWriteChecksumEnabled = ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
  // null means the row group is written unencrypted.
  private InternalFileEncryptor fileEncryptor = null;
  private int rowGroupOrdinal = 0;

  private Builder() {}

  /**
   * Use a single compressor for every column.
   */
  public Builder withCompressor(BytesInputCompressor compressor) {
    this.compressorProvider = path -> compressor;
    return this;
  }

  /**
   * Resolve the compressor per column from the given codec factory and properties, allowing
   * per-column compression codecs.
   */
  public Builder withCodecFactory(CompressionCodecFactory codecFactory, ParquetProperties props) {
    this.compressorProvider = path -> codecFactory.getCompressor(props.getCompressionCodec(path));
    return this;
  }

  /** Sets the file schema whose columns get page writers. Required. */
  public Builder withSchema(MessageType schema) {
    this.schema = schema;
    return this;
  }

  /** Sets the buffer allocator used by the page writers. Required. */
  public Builder withAllocator(ByteBufferAllocator allocator) {
    this.allocator = allocator;
    return this;
  }

  /** Sets the column index truncate length; defaults to the ParquetProperties default. */
  public Builder withColumnIndexTruncateLength(int columnIndexTruncateLength) {
    this.columnIndexTruncateLength = columnIndexTruncateLength;
    return this;
  }

  /** Enables or disables page-level CRC checksums; defaults to the ParquetProperties default. */
  public Builder withPageWriteChecksumEnabled(boolean pageWriteChecksumEnabled) {
    this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
    return this;
  }

  /** Sets the file encryptor; may be left null for unencrypted output. */
  public Builder withFileEncryptor(InternalFileEncryptor fileEncryptor) {
    this.fileEncryptor = fileEncryptor;
    return this;
  }

  /** Sets the ordinal of the row group being written; defaults to 0. */
  public Builder withRowGroupOrdinal(int rowGroupOrdinal) {
    this.rowGroupOrdinal = rowGroupOrdinal;
    return this;
  }

  /**
   * Builds the store.
   *
   * @return a new {@link ColumnChunkPageWriteStore}
   * @throws IllegalStateException if a required parameter was not set
   */
  public ColumnChunkPageWriteStore build() {
    if (compressorProvider == null) {
      throw new IllegalStateException("A compressor or codec factory must be set");
    }
    // Fail fast with a clear message instead of an NPE inside the constructor
    // (which dereferences the schema) or later during page writing.
    if (schema == null) {
      throw new IllegalStateException("A schema must be set");
    }
    if (allocator == null) {
      throw new IllegalStateException("An allocator must be set");
    }
    return new ColumnChunkPageWriteStore(
        compressorProvider,
        schema,
        allocator,
        columnIndexTruncateLength,
        pageWriteChecksumEnabled,
        fileEncryptor,
        rowGroupOrdinal);
  }
}

@Override
public PageWriter getPageWriter(ColumnDescriptor path) {
return writers.get(path);
Expand Down
Loading