[FLINK-39558][table] LogicalUnnestRule: use Calcite Uncollect rowType instead of LogicalType round-trip#28053

Open

jnh5y wants to merge 4 commits into apache:master from jnh5y:flink-39558-uncollect-rowtype

Conversation

jnh5y (Contributor) commented Apr 27, 2026

What is the purpose of the change

Fixes an internal-error class in LogicalUnnestRule where the TableFunctionScan rowType diverges from what Calcite derives for the original Correlate(Uncollect) tree, causing RelOptUtil.verifyTypeEquivalence to fail for LEFT JOIN UNNEST shapes that don't fit the FLINK-33217 patch path.

Field naming change

Calcite's Uncollect derives field names from the source array, so plan output for UNNEST columns changes:

  • ARRAY<T> / MULTISET<T>: the unnested column is named after the source array column (e.g., tags0 instead of synthetic f0 / EXPR$0).
  • MAP<K,V>: key/value columns are named KEY and VALUE instead of f0 / f1.
  • WITH ORDINALITY: ordinality column is named ORDINALITY (unchanged).

Multiple unnests in the same query are auto-disambiguated by Calcite's outer Correlate (e.g. two MAP unnests produce KEY, VALUE, KEY0, VALUE0).
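The auto-disambiguation works by appending a numeric suffix on name collision, in the style of Calcite's SqlValidatorUtil.uniquify. A minimal standalone sketch of that behavior (not Calcite's actual implementation):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UniquifyDemo {
    // Append a numeric suffix when a proposed field name collides with an
    // earlier one, mimicking how two MAP unnests end up as KEY, VALUE, KEY0, VALUE0.
    static List<String> uniquify(List<String> names) {
        Set<String> seen = new HashSet<>();
        List<String> out = new ArrayList<>();
        for (String name : names) {
            String candidate = name;
            int suffix = 0;
            while (!seen.add(candidate)) {
                candidate = name + suffix++;
            }
            out.add(candidate);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two MAP unnests in the same query contribute KEY/VALUE twice.
        System.out.println(uniquify(List.of("KEY", "VALUE", "KEY", "VALUE")));
        // → [KEY, VALUE, KEY0, VALUE0]
    }
}
```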

The runtime INTERNAL_UNNEST_ROWS function is positional, so persisted CompiledPlan instances continue to restore correctly (verified by CorrelateRestoreTest).
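Why positional access makes the rename safe can be illustrated with a toy row type (SimpleRow is a hypothetical stand-in for illustration, not Flink's actual Row class):

```java
import java.util.List;

public class PositionalRestoreDemo {
    // Hypothetical row: values are addressed by index; field names are
    // metadata only. A stand-in for a positional runtime row, not Flink's API.
    record SimpleRow(List<String> fieldNames, List<Object> values) {
        Object getField(int pos) {
            return values.get(pos);
        }
    }

    public static void main(String[] args) {
        // Same data, different field names (pre- and post-change plans).
        SimpleRow oldPlan = new SimpleRow(List.of("f0", "f1"), List.of("k1", "v1"));
        SimpleRow newPlan = new SimpleRow(List.of("KEY", "VALUE"), List.of("k1", "v1"));

        // A positional consumer reads indexes 0 and 1 regardless of names,
        // which is why a persisted plan keeps restoring to the same values.
        System.out.println(oldPlan.getField(0) + " " + oldPlan.getField(1));
        System.out.println(newPlan.getField(0) + " " + newPlan.getField(1));
    }
}
```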

Does this pull request potentially affect one of the following parts?

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? n/a

Adds two tests against the existing nested_not_null.business_data column,
covering LEFT-JOIN UNNEST shapes that hit RelOptUtil.verifyTypeEquivalence
("Cannot add expression of different type to set"):

  - testNullMismatchLeftJoinNoAliasList: bare Uncollect under LEFT (no
    column-list alias inserts no Project).
  - testNullMismatchLeftJoinOnPredicate: ON-clause predicate adds a
    LogicalFilter between the LEFT correlate and the Uncollect.

Generated-by: claude-opus-4-7
flinkbot (Collaborator) commented Apr 27, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@jnh5y jnh5y marked this pull request as ready for review April 28, 2026 12:52
Comment on lines +286 to +297
```scala
@Test
def testNullMismatchLeftJoinNoAliasList(): Unit = {
  // Bare Uncollect under the LEFT correlate (no column-list alias inserts no Project).
  util.verifyRelPlan(
    "SELECT * FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE")
}

@Test
def testNullMismatchLeftJoinOnPredicate(): Unit = {
  // ON-clause predicate adds a LogicalFilter between the LEFT correlate and the Uncollect.
  util.verifyRelPlan(
    "SELECT * FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <> 'debug'")
}
```
Contributor commented:
Why do we need comments here? Isn't the SQL self-explanatory?
If you do something different from the other tests in this file, you need a justification.

jnh5y (Contributor, author) replied:
Removed comments in this file and also in the rule.

Also, added some ITCases.

Use uncollect.getRowType() directly instead of round-tripping through
Flink's LogicalType. The rewritten Correlate's derived rowType then
matches the original byte-for-byte, fixing the verifyTypeEquivalence
crashes for LEFT JOIN UNNEST shapes that the FLINK-33217 patch did not
cover (bare Uncollect, Filter(Uncollect)). The
getLogicalProjectWithAdjustedNullability helper introduced by
FLINK-33217 is removed along with its dependent imports.

Plan fixtures are updated to reflect Calcite's source-derived field
naming (e.g. tags0 instead of f0/EXPR$0; KEY/VALUE for MAPs).

UnnestITCase coverage is added for the two LEFT-JOIN UNNEST shapes
(batch + stream), including empty-array and predicate-filters-everything
cases.

Generated-by: claude-opus-4-7
jnh5y force-pushed the flink-39558-uncollect-rowtype branch from 05b336d to 772f2e7 on April 28, 2026 21:13
jnh5y added 2 commits April 28, 2026 20:46
Generated-by: claude-opus-4-7
Generated-by: claude-opus-4-7
Comment on lines 689 to 693:

```diff
 <Resource name="optimized exec plan">
 <![CDATA[
-Calc(select=[id, f0 AS k, f1 AS v, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)], correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))], select=[id,map_data,f0,f1,ORDINALITY], rowType=[RecordType(INTEGER id, (VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[id, KEY AS k, VALUE AS v, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)], correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))], select=[id,map_data,KEY,VALUE,ORDINALITY], rowType=[RecordType(INTEGER id, (VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647) KEY, VARCHAR(2147483647) VALUE, INTEGER ORDINALITY)], joinType=[INNER])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[id, map_data])
```
Contributor commented:
This is a change in the exec plan. Do we have a restore test?
If not, it would be better to add one with a plan generated before this change, checking that the query can still be processed afterwards.
