diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 2957a4837893..f3c785058a46 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -490,7 +490,8 @@ public void write(int repetitionLevel, byte[] bytes) { } private static class ArrayDataWriter extends ParquetValueWriters.RepeatedWriter { - private final LogicalType elementType; + private final ArrayData.ElementGetter elementGetter; + private final ElementIterator elementIterator; private ArrayDataWriter( int definitionLevel, @@ -498,25 +499,27 @@ private ArrayDataWriter( ParquetValueWriter writer, LogicalType elementType) { super(definitionLevel, repetitionLevel, writer); - this.elementType = elementType; + this.elementGetter = ArrayData.createElementGetter(elementType); + this.elementIterator = new ElementIterator(); } @Override protected Iterator elements(ArrayData list) { - return new ElementIterator<>(list); + // The parent writer fully consumes the iterator inside a single write() call, so a single + // reusable instance avoids allocating an iterator per row. + elementIterator.reset(list); + return elementIterator; } - private class ElementIterator implements Iterator { - private final int size; - private final ArrayData list; - private final ArrayData.ElementGetter getter; + private class ElementIterator implements Iterator { + private ArrayData list; + private int size; private int index; - private ElementIterator(ArrayData list) { - this.list = list; - size = list.size(); - getter = ArrayData.createElementGetter(elementType); - index = 0; + private void reset(ArrayData newList) { + this.list = newList; + this.size = newList.size(); + this.index = 0; } @Override @@ -531,7 +534,7 @@ public E next() { throw new NoSuchElementException(); } - E element = (E) getter.getElementOrNull(list, index); + E element = (E) elementGetter.getElementOrNull(list, index); index += 1; return element; @@ -541,8 +544,9 @@ public E next() { private static class MapDataWriter extends ParquetValueWriters.RepeatedKeyValueWriter { - private final LogicalType keyType; - private final LogicalType valueType; + private final ArrayData.ElementGetter keyGetter; + private final ArrayData.ElementGetter valueGetter; + private final EntryIterator entryIterator; private MapDataWriter( int definitionLevel, @@ -552,32 +556,32 @@ private MapDataWriter( LogicalType keyType, LogicalType valueType) { super(definitionLevel, repetitionLevel, keyWriter, valueWriter); - this.keyType = keyType; - this.valueType = valueType; + this.keyGetter = ArrayData.createElementGetter(keyType); + this.valueGetter = ArrayData.createElementGetter(valueType); + this.entryIterator = new EntryIterator(); } @Override protected Iterator> pairs(MapData map) { - return new EntryIterator<>(map); - } - - private class EntryIterator implements Iterator> { - private final int size; - private final ArrayData keys; - private final ArrayData values; - private final ParquetValueReaders.ReusableEntry entry; - private final ArrayData.ElementGetter keyGetter; - private final ArrayData.ElementGetter valueGetter; + // The parent writer fully consumes the iterator inside a single write() call, so a single + // reusable instance (and its reusable entry) avoids allocating per row. + entryIterator.reset(map); + return entryIterator; + } + + private class EntryIterator implements Iterator> { + private final ParquetValueReaders.ReusableEntry entry = + new ParquetValueReaders.ReusableEntry<>(); + private ArrayData keys; + private ArrayData values; + private int size; private int index; - private EntryIterator(MapData map) { - size = map.size(); - keys = map.keyArray(); - values = map.valueArray(); - entry = new ParquetValueReaders.ReusableEntry<>(); - keyGetter = ArrayData.createElementGetter(keyType); - valueGetter = ArrayData.createElementGetter(valueType); - index = 0; + private void reset(MapData map) { + this.keys = map.keyArray(); + this.values = map.valueArray(); + this.size = map.size(); + this.index = 0; } @Override diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 2957a4837893..f3c785058a46 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -490,7 +490,8 @@ public void write(int repetitionLevel, byte[] bytes) { } private static class ArrayDataWriter extends ParquetValueWriters.RepeatedWriter { - private final LogicalType elementType; + private final ArrayData.ElementGetter elementGetter; + private final ElementIterator elementIterator; private ArrayDataWriter( int definitionLevel, @@ -498,25 +499,27 @@ private ArrayDataWriter( ParquetValueWriter writer, LogicalType elementType) { super(definitionLevel, repetitionLevel, writer); - this.elementType = elementType; + this.elementGetter = ArrayData.createElementGetter(elementType); + this.elementIterator = new ElementIterator(); } @Override protected Iterator elements(ArrayData list) { - return new ElementIterator<>(list); + // The parent writer fully consumes the iterator inside a single write() call, so a single + // reusable instance avoids allocating an iterator per row. + elementIterator.reset(list); + return elementIterator; } - private class ElementIterator implements Iterator { - private final int size; - private final ArrayData list; - private final ArrayData.ElementGetter getter; + private class ElementIterator implements Iterator { + private ArrayData list; + private int size; private int index; - private ElementIterator(ArrayData list) { - this.list = list; - size = list.size(); - getter = ArrayData.createElementGetter(elementType); - index = 0; + private void reset(ArrayData newList) { + this.list = newList; + this.size = newList.size(); + this.index = 0; } @Override @@ -531,7 +534,7 @@ public E next() { throw new NoSuchElementException(); } - E element = (E) getter.getElementOrNull(list, index); + E element = (E) elementGetter.getElementOrNull(list, index); index += 1; return element; @@ -541,8 +544,9 @@ public E next() { private static class MapDataWriter extends ParquetValueWriters.RepeatedKeyValueWriter { - private final LogicalType keyType; - private final LogicalType valueType; + private final ArrayData.ElementGetter keyGetter; + private final ArrayData.ElementGetter valueGetter; + private final EntryIterator entryIterator; private MapDataWriter( int definitionLevel, @@ -552,32 +556,32 @@ private MapDataWriter( LogicalType keyType, LogicalType valueType) { super(definitionLevel, repetitionLevel, keyWriter, valueWriter); - this.keyType = keyType; - this.valueType = valueType; + this.keyGetter = ArrayData.createElementGetter(keyType); + this.valueGetter = ArrayData.createElementGetter(valueType); + this.entryIterator = new EntryIterator(); } @Override protected Iterator> pairs(MapData map) { - return new EntryIterator<>(map); - } - - private class EntryIterator implements Iterator> { - private final int size; - private final ArrayData keys; - private final ArrayData values; - private final ParquetValueReaders.ReusableEntry entry; - private final ArrayData.ElementGetter keyGetter; - private final ArrayData.ElementGetter valueGetter; + // The parent writer fully consumes the iterator inside a single write() call, so a single + // reusable instance (and its reusable entry) avoids allocating per row. + entryIterator.reset(map); + return entryIterator; + } + + private class EntryIterator implements Iterator> { + private final ParquetValueReaders.ReusableEntry entry = + new ParquetValueReaders.ReusableEntry<>(); + private ArrayData keys; + private ArrayData values; + private int size; private int index; - private EntryIterator(MapData map) { - size = map.size(); - keys = map.keyArray(); - values = map.valueArray(); - entry = new ParquetValueReaders.ReusableEntry<>(); - keyGetter = ArrayData.createElementGetter(keyType); - valueGetter = ArrayData.createElementGetter(valueType); - index = 0; + private void reset(MapData map) { + this.keys = map.keyArray(); + this.values = map.valueArray(); + this.size = map.size(); + this.index = 0; } @Override diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index d9a7e0dab970..48807544cff3 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -512,7 +512,8 @@ public void write(int repetitionLevel, byte[] bytes) { } private static class ArrayDataWriter extends ParquetValueWriters.RepeatedWriter { - private final LogicalType elementType; + private final ArrayData.ElementGetter elementGetter; + private final ElementIterator elementIterator; private ArrayDataWriter( int definitionLevel, @@ -520,25 +521,27 @@ private ArrayDataWriter( ParquetValueWriter writer, LogicalType elementType) { super(definitionLevel, repetitionLevel, writer); - this.elementType = elementType; + this.elementGetter = ArrayData.createElementGetter(elementType); + this.elementIterator = new ElementIterator(); } @Override protected Iterator elements(ArrayData list) { - return new ElementIterator<>(list); + // The parent writer fully consumes the iterator inside a single write() call, so a single + // reusable instance avoids allocating an iterator per row. + elementIterator.reset(list); + return elementIterator; } - private class ElementIterator implements Iterator { - private final int size; - private final ArrayData list; - private final ArrayData.ElementGetter getter; + private class ElementIterator implements Iterator { + private ArrayData list; + private int size; private int index; - private ElementIterator(ArrayData list) { - this.list = list; - size = list.size(); - getter = ArrayData.createElementGetter(elementType); - index = 0; + private void reset(ArrayData newList) { + this.list = newList; + this.size = newList.size(); + this.index = 0; } @Override @@ -553,7 +556,7 @@ public E next() { throw new NoSuchElementException(); } - E element = (E) getter.getElementOrNull(list, index); + E element = (E) elementGetter.getElementOrNull(list, index); index += 1; return element; @@ -563,8 +566,9 @@ public E next() { private static class MapDataWriter extends ParquetValueWriters.RepeatedKeyValueWriter { - private final LogicalType keyType; - private final LogicalType valueType; + private final ArrayData.ElementGetter keyGetter; + private final ArrayData.ElementGetter valueGetter; + private final EntryIterator entryIterator; private MapDataWriter( int definitionLevel, @@ -574,32 +578,32 @@ private MapDataWriter( LogicalType keyType, LogicalType valueType) { super(definitionLevel, repetitionLevel, keyWriter, valueWriter); - this.keyType = keyType; - this.valueType = valueType; + this.keyGetter = ArrayData.createElementGetter(keyType); + this.valueGetter = ArrayData.createElementGetter(valueType); + this.entryIterator = new EntryIterator(); } @Override protected Iterator> pairs(MapData map) { - return new EntryIterator<>(map); - } - - private class EntryIterator implements Iterator> { - private final int size; - private final ArrayData keys; - private final ArrayData values; - private final ParquetValueReaders.ReusableEntry entry; - private final ArrayData.ElementGetter keyGetter; - private final ArrayData.ElementGetter valueGetter; + // The parent writer fully consumes the iterator inside a single write() call, so a single + // reusable instance (and its reusable entry) avoids allocating per row. + entryIterator.reset(map); + return entryIterator; + } + + private class EntryIterator implements Iterator> { + private final ParquetValueReaders.ReusableEntry entry = + new ParquetValueReaders.ReusableEntry<>(); + private ArrayData keys; + private ArrayData values; + private int size; private int index; - private EntryIterator(MapData map) { - size = map.size(); - keys = map.keyArray(); - values = map.valueArray(); - entry = new ParquetValueReaders.ReusableEntry<>(); - keyGetter = ArrayData.createElementGetter(keyType); - valueGetter = ArrayData.createElementGetter(valueType); - index = 0; + private void reset(MapData map) { + this.keys = map.keyArray(); + this.values = map.valueArray(); + this.size = map.size(); + this.index = 0; } @Override