diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java index 9395b0e4810e..f9898fa321ba 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java @@ -108,16 +108,18 @@ private static RowData.FieldGetter createFieldGetter( switch (projectField.type().typeId()) { case STRUCT: RowType nestedRowType = (RowType) rowType.getTypeAt(position); + // Build the nested projection once instead of recreating it on every row. + RowDataProjection nestedProjection = + RowDataProjection.create( + nestedRowType, rowField.type().asStructType(), projectField.type().asStructType()); + int nestedFieldCount = nestedRowType.getFieldCount(); return row -> { // null nested struct value if (row.isNullAt(position)) { return null; } - RowData nestedRow = row.getRow(position, nestedRowType.getFieldCount()); - return RowDataProjection.create( - nestedRowType, rowField.type().asStructType(), projectField.type().asStructType()) - .wrap(nestedRow); + return nestedProjection.wrap(row.getRow(position, nestedFieldCount)); }; case MAP: diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java index 9395b0e4810e..f9898fa321ba 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java @@ -108,16 +108,18 @@ private static RowData.FieldGetter createFieldGetter( switch (projectField.type().typeId()) { case STRUCT: RowType nestedRowType = (RowType) rowType.getTypeAt(position); + // Build the nested projection once instead of recreating it on every row. + RowDataProjection nestedProjection = + RowDataProjection.create( + nestedRowType, rowField.type().asStructType(), projectField.type().asStructType()); + int nestedFieldCount = nestedRowType.getFieldCount(); return row -> { // null nested struct value if (row.isNullAt(position)) { return null; } - RowData nestedRow = row.getRow(position, nestedRowType.getFieldCount()); - return RowDataProjection.create( - nestedRowType, rowField.type().asStructType(), projectField.type().asStructType()) - .wrap(nestedRow); + return nestedProjection.wrap(row.getRow(position, nestedFieldCount)); }; case MAP: diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java index 4144b04fe4eb..4c3853347c6a 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java @@ -109,16 +109,18 @@ private static RowData.FieldGetter createFieldGetter( switch (projectField.type().typeId()) { case STRUCT: RowType nestedRowType = (RowType) rowType.getTypeAt(position); + // Build the nested projection once instead of recreating it on every row. + RowDataProjection nestedProjection = + RowDataProjection.create( + nestedRowType, rowField.type().asStructType(), projectField.type().asStructType()); + int nestedFieldCount = nestedRowType.getFieldCount(); return row -> { // null nested struct value if (row.isNullAt(position)) { return null; } - RowData nestedRow = row.getRow(position, nestedRowType.getFieldCount()); - return RowDataProjection.create( - nestedRowType, rowField.type().asStructType(), projectField.type().asStructType()) - .wrap(nestedRow); + return nestedProjection.wrap(row.getRow(position, nestedFieldCount)); }; case MAP: