
Flink: Reuse nested projections in RowDataProjection #16790

Open
wombatu-kun wants to merge 1 commit into apache:main from wombatu-kun:flink-rowdataprojection-nested-reuse


@wombatu-kun


RowDataProjection projects a Flink RowData to a subset schema. When a projected field is itself a struct, the field getter previously rebuilt an entire nested RowDataProjection for every row:

RowData nestedRow = row.getRow(position, nestedRowType.getFieldCount());
return RowDataProjection.create(
        nestedRowType, rowField.type().asStructType(), projectField.type().asStructType())
    .wrap(nestedRow);

Each call allocates the nested projection's field-id-to-position map, its field-getter array, and the recursively built child getters, so every row with a projected nested struct pays for the full projection setup.

This PR builds the nested projection once, when the parent projection is constructed, and reuses it for every row, the same way core's StructProjection already caches its nested projections. The reuse is consistent with the existing contract: the top-level projection is already reused across rows via wrap, and each projected row is consumed before the next row is wrapped.
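The difference between rebuilding and reusing the nested projection can be sketched with a toy model. This is a simplified stand-in, not the Iceberg code: `ToyProjection`, `NestedReuseSketch`, the `BUILDS` counter, and the `Object[]` row layout are all hypothetical names introduced for illustration, and the construction count only stands in for the real setup cost (id-to-position map, getter array, child getters).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for a row projection over Object[] rows (hypothetical, not RowDataProjection). */
final class ToyProjection {
  // Counts constructions; a proxy for the per-projection setup cost.
  static final AtomicInteger BUILDS = new AtomicInteger();

  private final int[] positions;         // projected position -> source position
  private final ToyProjection[] nested;  // non-null entry where the projected field is a struct
  private Object[] row;                  // source row currently wrapped

  ToyProjection(int[] positions, ToyProjection[] nested) {
    this.positions = positions;
    this.nested = nested;
    BUILDS.incrementAndGet();
  }

  ToyProjection wrap(Object[] newRow) {
    this.row = newRow;
    return this;
  }

  Object get(int pos) {
    Object value = row[positions[pos]];
    ToyProjection child = nested[pos];
    if (child == null || value == null) {
      return value;
    }
    // Reuse the child built at construction time; only re-point it at the new nested row.
    return child.wrap((Object[]) value);
  }
}

final class NestedReuseSketch {
  // Source row: [id, location[lat, lon, name], ts]; projected: [location[lat, lon]].
  static Object[][] rows(int n) {
    Object[][] rows = new Object[n][];
    for (int i = 0; i < n; i++) {
      rows[i] = new Object[] {(long) i, new Object[] {i * 0.5, -i * 0.25, "p" + i}, 0L};
    }
    return rows;
  }

  // "Before" shape: a fresh nested projection is constructed for every row.
  static double sumPerRowRebuild(Object[][] rows) {
    double sum = 0;
    for (Object[] row : rows) {
      ToyProjection loc =
          new ToyProjection(new int[] {0, 1}, new ToyProjection[2]).wrap((Object[]) row[1]);
      double lat = (Double) loc.get(0);
      double lon = (Double) loc.get(1);
      sum += lat + lon;
    }
    return sum;
  }

  // "After" shape: the nested projection is built once with its parent and reused per row.
  static double sumWithReuse(Object[][] rows) {
    ToyProjection nested = new ToyProjection(new int[] {0, 1}, new ToyProjection[2]);
    ToyProjection top = new ToyProjection(new int[] {1}, new ToyProjection[] {nested});
    double sum = 0;
    for (Object[] row : rows) {
      ToyProjection loc = (ToyProjection) top.wrap(row).get(0);
      double lat = (Double) loc.get(0);
      double lon = (Double) loc.get(1);
      sum += lat + lon;
    }
    return sum;
  }

  public static void main(String[] args) {
    Object[][] rows = rows(1000);
    ToyProjection.BUILDS.set(0);
    sumPerRowRebuild(rows);
    System.out.println("per-row rebuild: " + ToyProjection.BUILDS.get() + " builds");
    ToyProjection.BUILDS.set(0);
    sumWithReuse(rows);
    System.out.println("reuse: " + ToyProjection.BUILDS.get() + " builds");
  }
}
```

In this toy model the reused child is the same object for every row, which is why the "consumed before the next row is wrapped" contract matters: a caller that held on to a projected struct across rows would see it re-pointed at the newer row.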

When this runs

There are two production callers of RowDataProjection, and the nested-struct branch is taken only when a nested struct is present in the projected or equality-key schema:

  • Source reads (RowDataFileScanTaskReader): reading a table that has equality deletes (the typical Flink upsert/CDC output) while projecting a nested struct column, where the projection does not already include every column the deletes require. The reader then projects each row to drop the extra delete-applying columns, and the projected struct rebuilds its nested projection per row.
  • Upsert sinks (BaseDeltaTaskWriter): when the equality / identifier fields include a nested sub-field. The delete-key schema is built with TypeUtil.select, which carries the enclosing struct, so the per-row keyProjection.wrap(row) projects that struct.

Flat-schema tables, and reads without deletes, never take this branch (they use native format projection or primitive getters), so they are unaffected. The change moves the nested-projection setup from per-row to once-per-projection and is identical across the supported Flink versions, so it is applied to v1.20, v2.0, and v2.1 in this PR.

Benchmark

JMH microbenchmark (JDK 17, -prof gc). Source schema id: long, location: struct<lat: double, lon: double, name: string>, ts: long; projected schema location: struct<lat: double, lon: double> (the name subfield is dropped so the struct is projected rather than passed through). Each invocation wraps a pre-built row, reads the projected location struct, and reads its two double subfields.

| Benchmark | Metric | Before | After | Delta |
| --- | --- | --- | --- | --- |
| projectNestedStruct | time | 121.95 ns/op | 23.45 ns/op | -80.8% |
| projectNestedStruct | alloc | 448.0 B/op | 48.0 B/op | -89.3% |
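
For reference, the Delta column is the relative change (After - Before) / Before, expressed as a percentage. A minimal check of that arithmetic, using the reported figures (the class and method names here are hypothetical):

```java
import java.util.Locale;

final class DeltaSketch {
  // Relative change from before to after, formatted as a percentage with one decimal place.
  static String delta(double before, double after) {
    return String.format(Locale.ROOT, "%.1f%%", (after - before) / before * 100.0);
  }

  public static void main(String[] args) {
    System.out.println("time:  " + delta(121.95, 23.45)); // ns/op
    System.out.println("alloc: " + delta(448.0, 48.0));   // B/op
  }
}
```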

Existing TestRowDataProjection coverage passes unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
github-actions bot added the flink label on Jun 12, 2026