Skip to content

[bugfix] Propagate MSQ distinct early termination as partial results#18648

Open
xiangfu0 wants to merge 1 commit into
apache:masterfrom
xiangfu0:codex/msq-distinct-partial-flags
Open

[bugfix] Propagate MSQ distinct early termination as partial results#18648
xiangfu0 wants to merge 1 commit into
apache:masterfrom
xiangfu0:codex/msq-distinct-partial-flags

Conversation

@xiangfu0
Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 commented Jun 2, 2026

What changed

This PR propagates distinct early-termination metadata from single-stage leaf execution into multi-stage query responses.

  • Maps leaf-stage EARLY_TERMINATION_REASON metadata into MSQ leaf stats.
  • Adds one additive earlyTerminationReasons array to BrokerResponseNativeV2 instead of one response field per reason.
  • Marks the V2 response as partialResult=true when any early-termination reason is present.
  • Adds StatMap.Type.STRING_SET and stores early-termination reasons as an ordered set stat, avoiding delimiter parsing/joining entirely.
  • Skips EarlyTerminationReason.NONE and unknown metadata values during leaf stat propagation.
  • Extends the existing custom integration test with an MSQ distinct query using queryOptions=maxRowsInDistinct=1, verifying the V2 response is partial and includes DISTINCT_MAX_ROWS.

This PR intentionally does not add native MSQ distinct operator early-termination behavior.

Why

PR #17247 added distinct early-termination query options for single-stage execution. When a multi-stage query uses a leaf stage backed by the single-stage engine, the leaf metadata can contain an early-termination reason, but LeafOperator did not propagate that metadata into the V2 broker response. As a result, the MSQ response could omit both the partial-result marker and the reason.

User manual

For multi-stage queries whose leaf-stage single-stage execution early-terminates distinct processing, the V2 broker response now includes earlyTerminationReasons and sets partialResult to true.

Example response fields:

{
  "partialResult": true,
  "earlyTerminationReasons": ["DISTINCT_MAX_ROWS"]
}

Possible distinct reasons propagated from the single-stage leaf metadata include:

Reason Triggering query option
DISTINCT_MAX_ROWS maxRowsInDistinct
DISTINCT_MAX_ROWS_WITHOUT_CHANGE maxRowsWithoutChangeInDistinct
DISTINCT_MAX_EXECUTION_TIME maxExecutionTimeMsInDistinct

Sample SQL:

SET "useMultistageEngine" = true;
SET "maxRowsInDistinct" = 1000;
SELECT DISTINCT userId FROM events;

Sample API request using query options:

{
  "sql": "SELECT DISTINCT userId FROM events LIMIT 10000",
  "queryOptions": "maxRowsInDistinct=1"
}

Validation

git diff --check
GITHUB_ACTIONS=true ./mvnw -pl pinot-common,pinot-query-runtime -am -Dtest=StatMapTest,BrokerResponseNativeV2Test,LeafOperatorTest -Dsurefire.failIfNoSpecifiedTests=false test
./mvnw spotless:apply license:format checkstyle:check license:check -pl pinot-common,pinot-query-runtime
./mvnw -pl pinot-integration-tests -am -Dtest=DistinctQueriesTest -Dsurefire.failIfNoSpecifiedTests=false test

Results:

  • StatMapTest and BrokerResponseNativeV2Test: 39 common-module tests passed, 6 skipped.
  • LeafOperatorTest: 18 tests passed.
  • DistinctQueriesTest: 3 integration tests passed, including testMultiStageMaxRowsInDistinctEarlyTermination.
  • Spotless, checkstyle, and license checks passed for pinot-common and pinot-query-runtime.

@xiangfu0 xiangfu0 force-pushed the codex/msq-distinct-partial-flags branch from f306c8d to 4a3b9cf Compare June 2, 2026 02:50
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jun 2, 2026

Codecov Report

❌ Patch coverage is 93.75000% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.45%. Comparing base (3b9a26f) to head (248de7c).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
...common/response/broker/BrokerResponseNativeV2.java 88.88% 0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18648      +/-   ##
============================================
+ Coverage     64.40%   64.45%   +0.05%     
  Complexity     1291     1291              
============================================
  Files          3365     3365              
  Lines        208058   208096      +38     
  Branches      32480    32487       +7     
============================================
+ Hits         133992   134132     +140     
+ Misses        63295    63197      -98     
+ Partials      10771    10767       -4     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.45% <93.75%> (+0.05%) ⬆️
temurin 64.45% <93.75%> (+0.05%) ⬆️
unittests 64.45% <93.75%> (+0.05%) ⬆️
unittests1 56.88% <93.75%> (+0.07%) ⬆️
unittests2 37.13% <3.12%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@xiangfu0 xiangfu0 added bug Something is not working as expected multi-stage Related to the multi-stage query engine labels Jun 2, 2026
@xiangfu0 xiangfu0 marked this pull request as ready for review June 2, 2026 03:50
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Propagates distinct early-termination metadata from single-stage leaf execution into multi-stage query responses, ensuring MSQ clients can detect partial DISTINCT results via the V2 broker response.

Changes:

  • Read EARLY_TERMINATION_REASON from leaf SSE metadata and aggregate it into MSQ leaf stats.
  • Expose aggregated reasons in BrokerResponseNativeV2 as earlyTerminationReasons and treat presence as partialResult=true.
  • Add a focused unit test covering the leaf bridge behavior and the serialized V2 response shape (including absence of legacy per-reason flags).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File Description
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java Merge SSE early-termination reason metadata into a new aggregated leaf stat key.
pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java Add earlyTerminationReasons field (derived from broker stats), merge helper, and include it in partialResult.
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java Add unit coverage validating propagation + JSON response shape for early termination reasons.

@xiangfu0 xiangfu0 force-pushed the codex/msq-distinct-partial-flags branch from 4a3b9cf to ca80bf9 Compare June 2, 2026 04:13
@xiangfu0 xiangfu0 changed the title Propagate MSQ distinct early termination flags Propagate MSQ distinct early termination reasons Jun 2, 2026
@xiangfu0 xiangfu0 force-pushed the codex/msq-distinct-partial-flags branch 3 times, most recently from bdd349d to 248de7c Compare June 2, 2026 06:42
@xiangfu0 xiangfu0 changed the title Propagate MSQ distinct early termination reasons Propagate MSQ distinct early termination as partial results Jun 2, 2026
@xiangfu0 xiangfu0 changed the title Propagate MSQ distinct early termination as partial results [bugfix]Propagate MSQ distinct early termination as partial results Jun 2, 2026
@gortiz
Copy link
Copy Markdown
Contributor

gortiz commented Jun 2, 2026

I think it is important to say that we have a completely different concept of EarlyTermination in MSE. This is fired when a downstream operator notifies its inputs (aka children or upstream) that no more data will be needed (ie when there is an exception or a limit is reached).

This doesn't mean this PR is wrong, but we have a terminology issue we need to clarify.

TBH I don't like to name this as early termination because that concept doesn't imply an actual error. As said, early termination could be produced by a correct business logic as a limit.

Why don't we just return this early incorrect termination as an error and serialize the partial data in case of error?

@xiangfu0 xiangfu0 force-pushed the codex/msq-distinct-partial-flags branch from 248de7c to 98086fc Compare June 2, 2026 19:09
@xiangfu0 xiangfu0 changed the title [bugfix]Propagate MSQ distinct early termination as partial results [bugfix] Propagate MSQ distinct early termination as partial results Jun 2, 2026
@xiangfu0
Copy link
Copy Markdown
Contributor Author

xiangfu0 commented Jun 2, 2026

Updated the implementation to use StatMap.Type.STRING_SET for early termination reasons, so there is no comma delimiter, string parser, or join path anymore.\n\nOn the terminology point: I kept this PR scoped to propagating the existing single-stage distinct early-termination metadata into MSQ and marking the V2 response as partial. I did not convert these distinct limits into processing errors because that would change the existing SSE semantics; today these options intentionally return partial data with metadata rather than fail the query.

@xiangfu0
Copy link
Copy Markdown
Contributor Author

xiangfu0 commented Jun 2, 2026

I think it is important to say that we have a completely different concept of EarlyTermination in MSE. This is fired when a downstream operator notifies its inputs (aka children or upstream) that no more data will be needed (ie when there is an exception or a limit is reached).

This doesn't mean this PR is wrong, but we have a terminology issue we need to clarify.

TBH I don't like to name this as early termination because that concept doesn't imply an actual error. As said, early termination could be produced by a correct business logic as a limit.

Why don't we just return this early incorrect termination as an error and serialize the partial data in case of error?

yes, this query option is similar to the maxRowsInJoin in JoinOperator, which we allow child operator return early with partial results.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something is not working as expected multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants