Flink: Avoid per-row field getter allocation in RowDataWrapper #16789

Open
wombatu-kun wants to merge 1 commit into apache:main from wombatu-kun:flink-rowdatawrapper-getter-alloc

RowDataWrapper adapts a Flink RowData to an Iceberg StructLike and sits on the hot path of every partitioned write (PartitionKey.partition), equality-delete key extraction, and range-shuffle sort-key computation. Its constructor pre-builds a getter for most types, but buildGetter returns null for the default case, which covers the most common primitive types: INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, and DATE. For those fields, get(pos, javaClass) falls back to constructing a fresh Flink field getter on every access:

Object value = FlinkRowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData);

FlinkRowData.createFieldGetter allocates two lambdas per call (Flink's RowData.createFieldGetter plus the null-checking wrapper around it), so every row that is partitioned, keyed, or shuffled on a primitive column allocates two short-lived objects for each such field.

This pre-builds the fallback getter once in the constructor, the same way the wrapper already pre-builds getters for the handled types, so get no longer allocates. Behavior is unchanged: the raw Flink value is already the correct Iceberg representation for every default-case type, which is exactly what the previous fallback returned.
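The difference between the two approaches can be sketched in plain Java without any Flink dependency. This is an illustrative sketch, not the code in this PR: `Object[]` stands in for `RowData`, and `FieldGetter`, `createFieldGetter`, `PerAccessWrapper`, and `PrebuiltWrapper` are hypothetical names that only mimic the shape described above. The counter records factory calls; in the sketch, as in the description, each call builds a getter plus a null-checking wrapper around it.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class GetterCacheSketch {
  /** Hypothetical stand-in for a Flink field getter; not the real Flink interface. */
  interface FieldGetter {
    Object getFieldOrNull(Object[] row);
  }

  /** Counts how many times the factory below has been called. */
  static final AtomicInteger CREATED = new AtomicInteger();

  /** Hypothetical factory: builds a raw getter and a null-checking wrapper around it. */
  static FieldGetter createFieldGetter(int pos) {
    CREATED.incrementAndGet();
    FieldGetter raw = row -> row[pos];
    return row -> row[pos] == null ? null : raw.getFieldOrNull(row);
  }

  /** "Before" shape: a fresh getter is built on every field access. */
  static final class PerAccessWrapper {
    private Object[] row;

    PerAccessWrapper wrap(Object[] newRow) {
      this.row = newRow;
      return this;
    }

    Object get(int pos) {
      return createFieldGetter(pos).getFieldOrNull(row);
    }
  }

  /** "After" shape: getters are built once in the constructor and reused for every row. */
  static final class PrebuiltWrapper {
    private final FieldGetter[] getters;
    private Object[] row;

    PrebuiltWrapper(int fieldCount) {
      this.getters = new FieldGetter[fieldCount];
      for (int pos = 0; pos < fieldCount; pos++) {
        getters[pos] = createFieldGetter(pos);
      }
    }

    PrebuiltWrapper wrap(Object[] newRow) {
      this.row = newRow;
      return this;
    }

    Object get(int pos) {
      return getters[pos].getFieldOrNull(row);
    }
  }

  /** Runs the given work and returns how many factory calls it triggered. */
  static int gettersCreated(Runnable work) {
    int before = CREATED.get();
    work.run();
    return CREATED.get() - before;
  }

  public static void main(String[] args) {
    Object[][] rows = {{1, 10L, true}, {2, null, false}};
    PerAccessWrapper perAccess = new PerAccessWrapper();
    PrebuiltWrapper prebuilt = new PrebuiltWrapper(3);

    int perAccessCount = gettersCreated(() -> {
      for (Object[] r : rows) {
        perAccess.wrap(r);
        for (int pos = 0; pos < 3; pos++) {
          perAccess.get(pos);
        }
      }
    });
    int prebuiltCount = gettersCreated(() -> {
      for (Object[] r : rows) {
        prebuilt.wrap(r);
        for (int pos = 0; pos < 3; pos++) {
          prebuilt.get(pos);
        }
      }
    });

    System.out.println("factory calls while reading, per-access wrapper: " + perAccessCount);
    System.out.println("factory calls while reading, prebuilt wrapper: " + prebuiltCount);
  }
}
```

In this sketch the per-access wrapper calls the factory once for every field read, while the prebuilt wrapper calls it only in its constructor, so its read path creates no getters. Both wrappers return the same values for the same row, which mirrors the claim that behavior is unchanged.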

The change is identical across the supported Flink versions, so it is applied to v1.20, v2.0, and v2.1 in this PR.

Benchmark

JMH microbenchmark (JDK 17, -prof gc) that wraps a row and reads its fields. The schema has seven columns; readPrimitiveKey reads only the five primitive columns, representative of a typical partition or equality key.

| Benchmark | Metric | Before | After | Delta |
|---|---|---|---|---|
| readPrimitiveKey | time | 64.04 ns/op | 45.31 ns/op | -29.3% |
| readPrimitiveKey | alloc | 275.0 B/op | 75.0 B/op | -72.7% |
| readAllFields | time | 84.25 ns/op | 81.37 ns/op | -3.4% |
| readAllFields | alloc | 275.0 B/op | 75.0 B/op | -72.7% |

The remaining 75 B/op is return-value boxing that this change does not touch.

Existing TestRowDataWrapper coverage passes unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added the flink label Jun 12, 2026