Flink: Reuse nested projections in RowDataProjection#16790
Open
wombatu-kun wants to merge 1 commit into
Open
Conversation
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
RowDataProjectionprojects a FlinkRowDatato a subset schema. When a projected field is itself a struct, the field getter rebuilt a whole nestedRowDataProjectionon every row:Each call allocates the nested projection's field-id-to-position map, its field-getter array, and the recursively built child getters, so every row with a projected nested struct pays for the full projection setup.
This builds the nested projection once when the parent projection is constructed and reuses it per row, the same way core's
StructProjectionalready caches its nested projections. The reuse is consistent with the existing contract: the top-level projection is already reused across rows viawrap, and projected rows are consumed before the next row is wrapped.When this runs
There are two production callers of
RowDataProjection, and the nested-struct branch is taken only when a nested struct is present in the projected or equality-key schema:RowDataFileScanTaskReader): reading a table that has equality deletes (the typical Flink upsert/CDC output) while projecting a nested struct column, where the projection does not already include every column the deletes require. The reader then projects each row to drop the extra delete-applying columns, and the projected struct rebuilds its nested projection per row.BaseDeltaTaskWriter): when the equality / identifier fields include a nested sub-field. The delete-key schema is built withTypeUtil.select, which carries the enclosing struct, so the per-rowkeyProjection.wrap(row)projects that struct.Flat-schema tables, and reads without deletes, never take this branch (they use native format projection or primitive getters), so they are unaffected. The change moves one allocation from per-row to once-per-projection and is identical across the supported Flink versions, so it is applied to v1.20, v2.0, and v2.1 in this PR.
Benchmark
JMH microbenchmark (JDK 17,
-prof gc). Source schemaid: long, location: struct<lat: double, lon: double, name: string>, ts: long; projected schemalocation: struct<lat: double, lon: double>(thenamesubfield is dropped so the struct is projected rather than passed through). Each invocation wraps a pre-built row, reads the projectedlocationstruct, and reads its twodoublesubfields.Existing
TestRowDataProjectioncoverage passes unchanged.