[flink] Fix COUNT(column) aggregate pushdown to reject nullable columns #3271

Open
beryllw wants to merge 1 commit into apache:main from beryllw:flink-count-bugfix

beryllw commented May 8, 2026

Purpose

Linked issue: close #3270

Fixes the aggregate pushdown logic to correctly handle COUNT(column) on nullable columns.

Brief change log

When the aggregate is CountAggFunction with a nullable column argument, the pushdown is rejected so that Flink handles the NULL-excluding count correctly.
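
To illustrate why a nullable argument matters, here is a minimal, self-contained sketch in plain Java. It uses no Flink or Fluss APIs; the class `CountSemantics` and its methods are hypothetical names chosen for this example. The sample data mirrors the shape of the log-table test rows in this PR, where the address is NULL for every even `i`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration; not part of the Fluss or Flink code base.
class CountSemantics {

    // COUNT(*): every row counts, regardless of NULLs.
    static long countStar(List<String> column) {
        return column.size();
    }

    // COUNT(column): SQL semantics skip rows where the column is NULL.
    static long countColumn(List<String> column) {
        return column.stream().filter(Objects::nonNull).count();
    }

    // Five rows; the address is NULL when i is even.
    static List<String> sampleAddresses() {
        List<String> addresses = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            addresses.add(i % 2 == 0 ? null : "address" + i);
        }
        return addresses;
    }

    public static void main(String[] args) {
        List<String> addresses = sampleAddresses();
        System.out.println("COUNT(*)       = " + countStar(addresses));
        System.out.println("COUNT(address) = " + countColumn(addresses));
    }
}
```

With this data the two counts differ (5 rows, 3 non-NULL addresses), so answering COUNT(address) with a plain row count would overcount.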

Tests

  • FlinkTableSourceBatchITCase#testCountPushDownForPkTable
  • FlinkTableSourceBatchITCase#testCountPushDownForLogTable

API and Format

Documentation

beryllw commented May 8, 2026

@luoyuxia cc

Copilot AI left a comment

Pull request overview

This PR fixes Flink aggregate pushdown in FlinkTableSource so that COUNT(column) is only pushed down to Fluss’ row-count API when the argument cannot be NULL (avoiding incorrect results for nullable columns, per #3270).

Changes:

  • Update aggregate pushdown logic to reject COUNT(expr) when the COUNT argument’s type is nullable.
  • Extend batch IT coverage to exercise COUNT(id) pushdown and to ensure COUNT(address) on a nullable column is not pushed down.
  • Modify log-table test data to include NULLs in the address column.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java | Tightens COUNT aggregate pushdown to avoid pushing down nullable-argument COUNT(expr) as a row-count. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java | Adds/extends IT assertions around COUNT pushdown behavior and adjusts log-table test data to include NULL addresses. |

Comment on lines +592 to 599
// prepare table data with NULL values in address column
try (Table table = conn.getTable(tablePath)) {
AppendWriter appendWriter = table.newAppend().createWriter();
for (int i = 1; i <= 5; i++) {
- Object[] values = new Object[] {i, "address" + i, "name" + i};
+ Object[] values = new Object[] {i, i % 2 == 0 ? null : "address" + i, "name" + i};
appendWriter.append(row(values));
// make sure every bucket has records
appendWriter.flush();
Comment on lines +491 to +506
// test COUNT(column) with NULL values - should NOT push down for nullable columns
// This will fail because log table doesn't support full scan in batch mode
assertThatThrownBy(
() ->
tEnv.explainSql(
String.format("SELECT COUNT(address) FROM %s", tableName)))
.hasMessageContaining(
"Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
assertThatThrownBy(
() ->
tEnv.explainSql(
String.format(
"SELECT COUNT(DISTINCT address) FROM %s",
tableName)))
.hasMessageContaining(
"Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
Comment on lines +817 to +826
// For COUNT(column), reject if column is nullable (cannot handle NULL filtering)
if (isCountAgg) {
List<org.apache.flink.table.expressions.Expression> args = aggExpr.getChildren();
if (!args.isEmpty() && args.get(0) instanceof ResolvedExpression) {
ResolvedExpression arg = (ResolvedExpression) args.get(0);
if (arg.getOutputDataType().getLogicalType().isNullable()) {
return false;
}
}
}
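
The decision made by the check above can be modeled without Flink's planner types. The following is only a sketch under stated assumptions: `CountPushdownRule`, `Arg`, and `canPushDownAsRowCount` are hypothetical names, the `nullable` flag stands in for `getOutputDataType().getLogicalType().isNullable()`, and the argument-less case is a simplification of how COUNT(*) might be represented. It does not reproduce the `instanceof ResolvedExpression` guard.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the pushdown decision; names are illustrative only.
class CountPushdownRule {

    // Stand-in for a resolved argument expression and its type nullability.
    static final class Arg {
        final String name;
        final boolean nullable;

        Arg(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    // Returns true when COUNT over these arguments equals the table's row count.
    static boolean canPushDownAsRowCount(List<Arg> args) {
        if (args.isEmpty()) {
            return true; // no argument: every row counts
        }
        // A nullable argument may hold NULLs, which COUNT must skip.
        return !args.get(0).nullable;
    }

    public static void main(String[] unused) {
        // "id" is assumed NOT NULL here; "address" is assumed nullable.
        Arg id = new Arg("id", false);
        Arg address = new Arg("address", true);
        System.out.println(canPushDownAsRowCount(Collections.<Arg>emptyList()));
        System.out.println(canPushDownAsRowCount(Collections.singletonList(id)));
        System.out.println(canPushDownAsRowCount(Collections.singletonList(address)));
    }
}
```

Under these assumptions only the nullable `address` case is rejected, which matches the intent described in the PR: the engine, not the row-count shortcut, must evaluate COUNT over a column that can contain NULLs.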

Development

Successfully merging this pull request may close these issues.

[flink] COUNT(column) aggregate pushdown produces incorrect results for nullable columns
