Skip to content

Support pushdown array/collect aggregation#5072

Draft
LantaoJin wants to merge 5 commits intoopensearch-project:mainfrom
LantaoJin:pr/issues/5070
Draft

Support pushdown array/collect aggregation#5072
LantaoJin wants to merge 5 commits intoopensearch-project:mainfrom
LantaoJin:pr/issues/5070

Conversation

@LantaoJin
Copy link
Member

@LantaoJin LantaoJin commented Jan 26, 2026

Description

This PR is a pushdown optimization for #5025, which introduce array/collect Calcite aggregation.

Related Issues

Resolves #5070

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Lantao Jin <ltjin@amazon.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 26, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

  • 🔍 Trigger a full review

Comment @coderabbitai help to get the list of available commands and usage tips.

@LantaoJin LantaoJin added the enhancement New feature or request label Jan 26, 2026
@LantaoJin
Copy link
Member Author

This PR includes duplicated logic of #5025, which is required to rebase after 5025 merged.

@github-actions
Copy link
Contributor

github-actions bot commented Mar 3, 2026

PR Reviewer Guide 🔍

(Review updated until commit d0714da)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Add updateIndexSettings API to OpenSearchClient

Relevant files:

  • opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java
  • opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java

Sub-PR theme: Pushdown support for COLLECT/ARRAY_AGG aggregation

Relevant files:

  • opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java
  • core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java
  • integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml
  • integ-test/src/test/resources/expectedOutput/calcite/explain_take.yaml
  • integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_simple_pattern_agg_push.yaml

Sub-PR theme: Documentation and doctest updates for mvcombine configuration

Relevant files:

  • doctest/markdown_parser.py
  • doctest/test_docs.py
  • docs/user/ppl/cmd/mvcombine.md

⚡ Recommended focus areas for review

Hardcoded Limit

The COLLECT/ARRAY_AGG pushdown always uses MAX_TOP_HITS_RESULT_WINDOW (10000) as the size for top_hits aggregation. This silently truncates results when a group has more than 10000 documents, with no warning to the user. Consider whether this limit should be configurable or at least documented as a known limitation.

case COLLECT, ARRAY_AGG -> {
  TopHitsAggregationBuilder topHitsBuilder =
      createTopHitsBuilder(
          aggCall,
          args,
          aggName,
          helper,
          MAX_TOP_HITS_RESULT_WINDOW,
          false,
          false,
          null,
          null);
  yield Pair.of(topHitsBuilder, new TopHitsParser(aggName, false, true));
}
Missing Error Handling

The updateIndexSettings method in OpenSearchNodeClient does not have a try-catch block, unlike the REST client implementation. Any exception (e.g., index not found, security exception) will propagate uncaught. Consider adding consistent error handling similar to OpenSearchRestClient.

@Override
public boolean updateIndexSettings(Map<String, Object> settings, String... indexExpression) {
  AcknowledgedResponse response =
      client
          .admin()
          .indices()
          .prepareUpdateSettings(indexExpression)
          .setSettings(
              settings.entrySet().stream()
                  .filter(e -> e.getKey().startsWith("index."))
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
          .get();
  return response.isAcknowledged();
}
Settings Leak

In testMvCombineUnsupportedInV2, the updateIndexSettings call to increase max_inner_result_window is inside the try block but before the query. If updateIndexSettings itself throws, the finally block will still attempt to reset, but if the exception is thrown before the result variable is assigned, the subsequent verifyQuery(result) call after the try-catch-finally will fail with a NullPointerException since result may be uninitialized.

  updateIndexSettings(
      TEST_INDEX_BANK, "{ \"index\": { \"max_inner_result_window\":" + 10000 + " } }");
  result =
      executeQuery(
          String.format(
              "source=%s | fields state, city, age | mvcombine age", TEST_INDEX_BANK));
} catch (ResponseException e) {
  result = new JSONObject(TestUtils.getResponseBody(e.getResponse()));
} finally {
  updateIndexSettings(
      TEST_INDEX_BANK, "{ \"index\": { \"max_inner_result_window\":" + 100 + " } }");
}
verifyQuery(result);
Blocking Call

The updateIndexSettings method uses .get() which is a blocking call on a future. This is consistent with other methods in the class, but it's worth verifying that this won't cause thread starvation in the node client context, especially since there's no timeout specified.

public boolean updateIndexSettings(Map<String, Object> settings, String... indexExpression) {
  AcknowledgedResponse response =
      client
          .admin()
          .indices()
          .prepareUpdateSettings(indexExpression)
          .setSettings(
              settings.entrySet().stream()
                  .filter(e -> e.getKey().startsWith("index."))
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
          .get();
  return response.isAcknowledged();

@github-actions
Copy link
Contributor

github-actions bot commented Mar 3, 2026

PR Code Suggestions ✨

Latest suggestions up to d0714da
Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix wrong JUnit lifecycle annotation version

The @After annotation is from JUnit 4 (org.junit.After), but the test class uses
JUnit 5 annotations (@Test from org.junit.jupiter.api.Test). In JUnit 5, the
equivalent annotation is @AfterEach. Using @After in a JUnit 5 test class means the
teardown method will never be executed, leaving the index setting permanently
changed.

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMvCombineCommandIT.java [38-41]

-@After
+@AfterEach
 public void afterTest() throws IOException {
   updateIndexSettings(INDEX, "{ \"index\": { \"max_inner_result_window\":" + 100 + " } }");
 }
Suggestion importance[1-10]: 8

__

Why: The @After annotation (JUnit 4) is imported but the test class uses JUnit 5 (@Test from org.junit.jupiter.api.Test). This means the teardown method will never execute, leaving max_inner_result_window permanently set to 10000, which could affect other tests.

Medium
Handle exceptions from blocking future call

The .get() call on the future can throw checked and unchecked exceptions (e.g.,
InterruptedException, ExecutionException) that are not handled. If the call is
interrupted or fails, the exception will propagate as an unchecked exception without
a meaningful error message. Wrap the call in a try-catch block similar to how
getIndexMaxResultWindows handles it.

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java [142-153]

-AcknowledgedResponse response =
-    client
-        .admin()
-        .indices()
-        .prepareUpdateSettings(indexExpression)
-        .setSettings(
-            settings.entrySet().stream()
-                .filter(e -> e.getKey().startsWith("index."))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
-        .get();
-return response.isAcknowledged();
+try {
+  AcknowledgedResponse response =
+      client
+          .admin()
+          .indices()
+          .prepareUpdateSettings(indexExpression)
+          .setSettings(
+              settings.entrySet().stream()
+                  .filter(e -> e.getKey().startsWith("index."))
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
+          .get();
+  return response.isAcknowledged();
+} catch (Exception e) {
+  throw new IllegalStateException(
+      String.format("Failed to update index settings %s for %s", settings, Arrays.toString(indexExpression)), e);
+}
Suggestion importance[1-10]: 6

__

Why: The .get() call can throw InterruptedException or ExecutionException that are not handled, which could result in confusing error messages. The getIndexMaxResultWindows method in the same class wraps similar calls in try-catch, so this is an inconsistency worth fixing.

Low
General
Respect index max_inner_result_window setting

The COLLECT/ARRAY_AGG pushdown hardcodes MAX_TOP_HITS_RESULT_WINDOW (10000) as the
size, but the actual index setting max_inner_result_window may be lower (default is
100). If the index setting hasn't been updated, OpenSearch will reject the query.
Consider reading the actual max_inner_result_window setting from the index at query
time and using the minimum of the configured value and MAX_TOP_HITS_RESULT_WINDOW,
or at least validating/capping against the index setting before building the
aggregation.

opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java [608-621]

 case COLLECT, ARRAY_AGG -> {
+    // Use the configured max_inner_result_window if available, otherwise fall back to MAX_TOP_HITS_RESULT_WINDOW
+    int size = helper.getMaxInnerResultWindow() > 0
+        ? Math.min(helper.getMaxInnerResultWindow(), MAX_TOP_HITS_RESULT_WINDOW)
+        : MAX_TOP_HITS_RESULT_WINDOW;
     TopHitsAggregationBuilder topHitsBuilder =
         createTopHitsBuilder(
             aggCall,
             args,
             aggName,
             helper,
-            MAX_TOP_HITS_RESULT_WINDOW,
+            size,
             false,
             false,
             null,
             null);
     yield Pair.of(topHitsBuilder, new TopHitsParser(aggName, false, true));
 }
Suggestion importance[1-10]: 3

__

Why: While the concern about the hardcoded MAX_TOP_HITS_RESULT_WINDOW is valid, the PR's design intentionally requires users to update the index setting (as documented in the mvcombine.md). The improved_code references helper.getMaxInnerResultWindow() which may not exist in the actual codebase, making this suggestion potentially incorrect.

Low

Previous suggestions

Suggestions up to commit 7111912
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix lost exception cause in error handling

The String.format call has 3 arguments but only 2 format specifiers (%s), so the
exception e is passed as an extra argument and will be silently ignored. The
exception's stack trace will be lost. Pass e as the cause to IllegalStateException
instead.

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java [130-133]

 } catch (IOException e) {
   throw new IllegalStateException(
-      String.format("Failed to update index settings %s for %s", settings, indexExpression, e));
+      String.format("Failed to update index settings %s for %s", settings, indexExpression), e);
 }
Suggestion importance[1-10]: 9

__

Why: The String.format call has only 2 format specifiers but 3 arguments — the exception e is passed as an extra format argument instead of as the cause to IllegalStateException, causing the stack trace to be silently lost. This is a real bug that would make debugging failures very difficult.

High
General
Use newly defined method instead of dead code

The private method getTopHitsAggregationBuilder is defined but never called in the
visible diff — the COLLECT/ARRAY_AGG case uses createTopHitsBuilder instead. This
dead code may indicate that the intended refactoring was not completed, and the new
method's logic (e.g., fetchSource configuration) may not be applied for
COLLECT/ARRAY_AGG.

opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java [628-658]

-private static TopHitsAggregationBuilder getTopHitsAggregationBuilder(
-    AggregateCall aggCall,
-    List<Pair<RexNode, String>> args,
-    String aggName,
-    AggregateBuilderHelper helper,
-    Integer topHitsSize) {
-...
+case COLLECT, ARRAY_AGG -> {
+    TopHitsAggregationBuilder topHitsBuilder =
+        getTopHitsAggregationBuilder(
+            aggCall,
+            args,
+            aggName,
+            helper,
+            MAX_TOP_HITS_RESULT_WINDOW);
+    yield Pair.of(topHitsBuilder, new TopHitsParser(aggName, false, true));
+}
Suggestion importance[1-10]: 5

__

Why: The getTopHitsAggregationBuilder method is defined but never called — the COLLECT/ARRAY_AGG case still uses createTopHitsBuilder. This is a valid observation about potentially dead code, but the suggestion's improved_code only shows the call-site change without confirming the new method's logic is equivalent or superior to createTopHitsBuilder.

Low

@github-actions
Copy link
Contributor

github-actions bot commented Mar 5, 2026

Persistent review updated to latest commit d0714da

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Support pushdown collect/array_agg aggregation

1 participant