[flink] Fix COUNT(column) aggregate pushdown to reject nullable columns#3271
Open
beryllw wants to merge 1 commit intoapache:mainfrom
Open
[flink] Fix COUNT(column) aggregate pushdown to reject nullable columns#3271beryllw wants to merge 1 commit intoapache:mainfrom
beryllw wants to merge 1 commit intoapache:mainfrom
Conversation
Contributor
Author
|
@luoyuxia cc |
Contributor
There was a problem hiding this comment.
Pull request overview
This PR fixes Flink aggregate pushdown in FlinkTableSource so that COUNT(column) is only pushed down to Fluss’ row-count API when the argument cannot be NULL (avoiding incorrect results for nullable columns, per #3270).
Changes:
- Update aggregate pushdown logic to reject
COUNT(expr)when the COUNT argument’s type is nullable. - Extend batch IT coverage to exercise
COUNT(id)pushdown and to ensureCOUNT(address)on a nullable column is not pushed down. - Modify log-table test data to include NULLs in the
addresscolumn.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java |
Tightens COUNT aggregate pushdown to avoid pushing down nullable-argument COUNT(expr) as a row-count. |
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java |
Adds/extends IT assertions around COUNT pushdown behavior and adjusts log-table test data to include NULL addresses. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+592
to
599
| // prepare table data with NULL values in address column | ||
| try (Table table = conn.getTable(tablePath)) { | ||
| AppendWriter appendWriter = table.newAppend().createWriter(); | ||
| for (int i = 1; i <= 5; i++) { | ||
| Object[] values = new Object[] {i, "address" + i, "name" + i}; | ||
| Object[] values = new Object[] {i, i % 2 == 0 ? null : "address" + i, "name" + i}; | ||
| appendWriter.append(row(values)); | ||
| // make sure every bucket has records | ||
| appendWriter.flush(); |
Comment on lines
+491
to
+506
| // test COUNT(column) with NULL values - should NOT push down for nullable columns | ||
| // This will fail because log table doesn't support full scan in batch mode | ||
| assertThatThrownBy( | ||
| () -> | ||
| tEnv.explainSql( | ||
| String.format("SELECT COUNT(address) FROM %s", tableName))) | ||
| .hasMessageContaining( | ||
| "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); | ||
| assertThatThrownBy( | ||
| () -> | ||
| tEnv.explainSql( | ||
| String.format( | ||
| "SELECT COUNT(DISTINCT address) FROM %s", | ||
| tableName))) | ||
| .hasMessageContaining( | ||
| "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); |
Comment on lines
+817
to
+826
| // For COUNT(column), reject if column is nullable (cannot handle NULL filtering) | ||
| if (isCountAgg) { | ||
| List<org.apache.flink.table.expressions.Expression> args = aggExpr.getChildren(); | ||
| if (!args.isEmpty() && args.get(0) instanceof ResolvedExpression) { | ||
| ResolvedExpression arg = (ResolvedExpression) args.get(0); | ||
| if (arg.getOutputDataType().getLogicalType().isNullable()) { | ||
| return false; | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
Linked issue: close #3270
Fixes the aggregate pushdown logic to correctly handle COUNT(column) on nullable columns.
Brief change log
When the aggregate is CountAggFunction with a nullable column argument, the pushdown is rejected so that Flink handles the NULL-excluding count correctly.
Tests
API and Format
Documentation