35 changes: 33 additions & 2 deletions docs/user/interfaces/endpoint.rst
@@ -273,7 +273,7 @@ Fetch Size (PPL) [Experimental]
Description
-----------

The ``fetch_size`` parameter limits the number of rows returned in a PPL query response. The value of ``fetch_size`` should be greater than ``0``. In absence of ``fetch_size`` or a value of ``0``, the result size is governed by the ``plugins.query.size_limit`` cluster setting.
The ``fetch_size`` parameter sets a **default** limit on the number of rows returned in a PPL query response. The value of ``fetch_size`` should be greater than ``0``. In the absence of ``fetch_size``, or with a value of ``0``, the result size is governed by the ``plugins.query.size_limit`` cluster setting.

``fetch_size`` can be specified either as a URL parameter or in the JSON request body. If both are provided, the JSON body value takes precedence.

@@ -282,7 +282,7 @@ If ``fetch_size`` is larger than ``plugins.query.size_limit``, the result is cap
Note
----

Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch_size`` does not return a cursor and does not support fetching additional pages. The response is always complete and final.
Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch_size`` does not return a cursor and does not support fetching additional pages. The response is always complete and final. Because there is no pagination, ``fetch_size`` acts as a default response size for queries that do not specify their own row limit. If the query includes an explicit ``head`` command, the ``head`` limit takes precedence — otherwise ``fetch_size`` would silently discard rows with no way to retrieve them.

+--------------------+-------------------------------------+------------------------------------+
| Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` |
@@ -294,6 +294,37 @@ Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch
| Can fetch more? | Yes (with cursor) | No (single response) |
+--------------------+-------------------------------------+------------------------------------+

Interaction with the ``head`` command
--------------------------------------

When a PPL query contains an explicit ``head`` command in the main pipeline, the ``head`` command takes precedence over ``fetch_size``. Because PPL's ``fetch_size`` does not support pagination, capping the result below the user's explicit ``head`` limit would silently discard rows with no way to retrieve them. To avoid this, ``fetch_size`` is ignored when the main pipeline contains a ``head`` command, and the query returns the number of rows specified by ``head``.

If the query does **not** contain a ``head`` command in the main pipeline, ``fetch_size`` limits the result as usual. Note that ``head`` commands inside subqueries (e.g., within a ``join``) do not affect ``fetch_size`` behavior for the outer query, since they operate at a different scope.
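The precedence rules above can be condensed into a small helper. This is a minimal sketch, not the plugin's actual implementation; the method name `resolveLimit` and its parameter encoding (``-1`` for "no head", ``0`` for "no fetch_size") are illustrative assumptions. Note that the cluster-wide ``plugins.query.size_limit`` always applies as an outer cap, matching the ``LogicalSystemLimit`` node in the explain plans.

```java
// Minimal sketch of how the effective row limit is chosen under the rules
// described above. Names and parameter conventions are illustrative only.
public class LimitResolution {

    // headLimit: limit from an explicit `head` in the main pipeline, or -1 if absent
    // fetchSize: the request's fetch_size, or 0 if absent
    // sizeLimit: the plugins.query.size_limit cluster setting
    static int resolveLimit(int headLimit, int fetchSize, int sizeLimit) {
        if (headLimit >= 0) {
            // Explicit head takes precedence; fetch_size is ignored.
            // The system-wide size_limit still caps the result.
            return Math.min(headLimit, sizeLimit);
        }
        if (fetchSize > 0) {
            // No head: fetch_size applies, capped by size_limit.
            return Math.min(fetchSize, sizeLimit);
        }
        // Neither given: size_limit alone governs the result size.
        return sizeLimit;
    }

    public static void main(String[] args) {
        assert resolveLimit(100, 5, 10000) == 100;    // head 100 wins over fetch_size=5
        assert resolveLimit(-1, 5, 10000) == 5;       // fetch_size applies
        assert resolveLimit(-1, 20000, 10000) == 10000; // fetch_size capped by size_limit
        assert resolveLimit(-1, 0, 10000) == 10000;   // default: size_limit governs
        System.out.println("ok");
    }
}
```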

+-----------------------------------------------+---------------------------------------------------+
| Query | Behavior with ``fetch_size=5`` |
+===============================================+===================================================+
| ``source=t | fields age`` | ``fetch_size`` applies — returns 5 rows |
+-----------------------------------------------+---------------------------------------------------+
| ``source=t | head 100 | fields age`` | ``head`` takes precedence — returns 100 rows |
+-----------------------------------------------+---------------------------------------------------+
| ``source=t1 | join (source=t2 | head 100)`` | ``head`` is in subquery — ``fetch_size`` applies |
+-----------------------------------------------+---------------------------------------------------+

Review comment (Collaborator):

    fetch_size is optional — if you don't want it, don't send it. The server should honor what the client explicitly requests. The fix belongs in the client that's injecting fetch_size by default.

    From a REST API perspective, fetch_size is an explicit, optional parameter the caller chooses to send. If the caller sends fetch_size=5, they're requesting at most 5 rows — that's the API contract. Adding server-side logic to silently ignore that parameter based on query content breaks the principle of least surprise at the API layer.

Reply (Contributor, Author):

    I think this is a very valid point. If we fix it server-side, we're working around a client behavior by making the API semantics inconsistent. If we fix it client-side, the API contract stays clean.

Examples::

# head 100 takes precedence — returns 100 rows, fetch_size=5 is ignored
>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{
"fetch_size" : 5,
"query" : "source = accounts | head 100 | fields firstname"
}'

# No head command — fetch_size=5 limits the result to 5 rows
>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{
"fetch_size" : 5,
"query" : "source = accounts | fields firstname"
}'

Example 1: JSON body
--------------------

@@ -2579,8 +2579,8 @@ public void testExplainFetchSizePushDown() throws IOException {
@Test
public void testExplainFetchSizeWithSmallerHead() throws IOException {
// fetch_size=10 with user's | head 3
// Two LogicalSort nodes: inner fetch=[3] from user head, outer fetch=[10] from fetch_size
// Effective limit = min(3, 10) = 3
// Explicit head takes precedence: only one LogicalSort(fetch=[3]) from user head
// fetch_size does not inject an additional Head when user already has one
String expected = loadExpectedPlan("explain_fetch_size_with_head_push.yaml");
assertYamlEqualsIgnoreId(
expected,
@@ -2591,8 +2591,8 @@ public void testExplainFetchSizeWithSmallerHead() throws IOException {
@Test
public void testExplainFetchSizeSmallerThanHead() throws IOException {
// fetch_size=5 with user's | head 100
// Two LogicalSort nodes: inner fetch=[100] from user head, outer fetch=[5] from fetch_size
// Effective limit = min(100, 5) = 5
// Explicit head takes precedence: only one LogicalSort(fetch=[100]) from user head
// fetch_size does not inject an additional Head when user already has one
String expected = loadExpectedPlan("explain_fetch_size_smaller_than_head_push.yaml");
assertYamlEqualsIgnoreId(
expected,
@@ -5,9 +5,11 @@

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_HOBBIES;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT;
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
@@ -17,9 +19,13 @@
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.io.IOException;
import java.util.Locale;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.legacy.TestsConstants;
import org.opensearch.sql.ppl.PPLIntegTestCase;
@@ -1075,4 +1081,32 @@ public void testComplexSortPushDownForSMJWithMaxOptionAndFieldList() throws IOEx
rows("Jake", "USA", "California", 4, 2023, 70, "a"),
rows("Hello", "USA", "New York", 4, 2023, 30, "e"));
}

@Test
public void testFetchSizeAppliesWhenHeadOnlyInJoinSubquery() throws IOException {
// head inside a join subquery should NOT suppress fetch_size on the outer query.
// The subquery's head operates at a different scope.
// Self-join STATE_COUNTRY on name, right side limited to head 3.
// fetch_size=2 should still cap the outer result to 2 rows.
String query =
String.format(
Locale.ROOT,
"source=%s | inner join left=a, right=b ON a.name = b.name"
+ " [source=%s | sort name | head 3] | sort a.name | fields a.name, a.age",
TEST_INDEX_STATE_COUNTRY,
TEST_INDEX_STATE_COUNTRY);
Request request = new Request("POST", QUERY_API_ENDPOINT);
request.setJsonEntity(
String.format(Locale.ROOT, "{\"query\": \"%s\", \"fetch_size\": 2}", query));
RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
restOptionsBuilder.addHeader("Content-Type", "application/json");
request.setOptions(restOptionsBuilder);
Response response = client().performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
JSONObject result = new JSONObject(getResponseBody(response, true));
verifySchema(result, schema("name", "string"), schema("age", "int"));
// Right side after sort+head 3: David, Hello, Jake. Self-join matches all 3.
// Outer sort by name + fetch_size=2 caps to first 2: David, Hello
verifyDataRows(result, rows("David", 40), rows("Hello", 30));
}
}
@@ -172,9 +172,9 @@ public void testFetchSizeWithStats() throws IOException {
}

@Test
public void testFetchSizeWithHead() throws IOException {
// Both head command and fetch_size - the smaller limit should win
// head 3 limits to 3, fetch_size 10 would allow 10, so we get 3
public void testHeadOverridesFetchSizeWhenSmaller() throws IOException {
// Explicit head takes precedence over fetch_size
// head 3 returns 3 rows regardless of fetch_size=10
JSONObject result =
executeQueryWithFetchSize(
String.format("source=%s | head 3 | fields firstname", TEST_INDEX_ACCOUNT), 10);
@@ -183,16 +183,59 @@ public void testFetchSizeWithHead() throws IOException {
}

@Test
public void testFetchSizeSmallerThanHead() throws IOException {
// fetch_size smaller than head - fetch_size should further limit
// head 100 would return 100, but fetch_size 5 limits to 5
public void testHeadOverridesFetchSizeWhenLarger() throws IOException {
// Explicit head takes precedence over fetch_size
// head 100 should return 100 rows even though fetch_size=5
JSONObject result =
executeQueryWithFetchSize(
String.format("source=%s | head 100 | fields firstname", TEST_INDEX_ACCOUNT), 5);
JSONArray dataRows = result.getJSONArray("datarows");
assertEquals(100, dataRows.length());
}

@Test
public void testHeadOverridesFetchSizeWithOffset() throws IOException {
// Explicit head with offset takes precedence over fetch_size
// head 3 from 2 should skip 2 rows and return 3 rows, ignoring fetch_size=100
JSONObject result =
executeQueryWithFetchSize(
String.format("source=%s | head 3 from 2 | fields firstname", TEST_INDEX_ACCOUNT), 100);
JSONArray dataRows = result.getJSONArray("datarows");
assertEquals(3, dataRows.length());
}

@Test
public void testHeadOverridesFetchSizeWithFilter() throws IOException {
// Explicit head after filter takes precedence over fetch_size
// Even with fetch_size=2, head 5 should return 5 matching rows
JSONObject result =
executeQueryWithFetchSize(
String.format(
"source=%s | where age > 30 | head 5 | fields firstname, age", TEST_INDEX_ACCOUNT),
2);
JSONArray dataRows = result.getJSONArray("datarows");
assertEquals(5, dataRows.length());
}

@Test
public void testHeadEqualToFetchSize() throws IOException {
// When head and fetch_size are the same value, head takes precedence (no double-Head)
JSONObject result =
executeQueryWithFetchSize(
String.format("source=%s | head 7 | fields firstname", TEST_INDEX_ACCOUNT), 7);
JSONArray dataRows = result.getJSONArray("datarows");
assertEquals(7, dataRows.length());
}

@Test
public void testHeadLargerThanDatasetWithFetchSize() throws IOException {
// head 1000 on a 7-row index with fetch_size=3: head takes precedence, returns all 7 rows
JSONObject result =
executeQueryWithFetchSize(String.format("source=%s | head 1000", TEST_INDEX_BANK), 3);
JSONArray dataRows = result.getJSONArray("datarows");
assertEquals(7, dataRows.length());
}

@Test
public void testFetchSizeAsUrlParameter() throws IOException {
// fetch_size specified as URL parameter instead of JSON body
@@ -1,9 +1,8 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[5])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->100, LIMIT->5, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->100, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":100,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=100, pageSize=null, startFrom=0)])
@@ -1,9 +1,8 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[10])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->3, LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":3,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=3, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->3, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":3,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=3, pageSize=null, startFrom=0)])
@@ -1,13 +1,11 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[5])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[100])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..16=[{inputs}], age=[$t8])
EnumerableLimit(fetch=[5])
EnumerableLimit(fetch=[100])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
EnumerableLimit(fetch=[100])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
@@ -1,13 +1,11 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(fetch=[10])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
LogicalProject(age=[$8])
LogicalSort(fetch=[3])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableLimit(fetch=[10])
EnumerableCalc(expr#0..16=[{inputs}], age=[$t8])
EnumerableLimit(fetch=[3])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
EnumerableCalc(expr#0..16=[{inputs}], age=[$t8])
EnumerableLimit(fetch=[3])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
Expand Up @@ -8,9 +8,11 @@
import static org.opensearch.sql.executor.QueryType.PPL;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
@@ -32,7 +34,7 @@ public class AstStatementBuilder extends OpenSearchPPLParserBaseVisitor<Statemen
@Override
public Statement visitPplStatement(OpenSearchPPLParser.PplStatementContext ctx) {
UnresolvedPlan rawPlan = astBuilder.visit(ctx);
if (context.getFetchSize() > 0) {
if (context.getFetchSize() > 0 && !containsHead(rawPlan)) {
rawPlan = new Head(context.getFetchSize(), 0).attach(rawPlan);
}
UnresolvedPlan plan = addSelectAll(rawPlan);
@@ -69,6 +71,27 @@ public static class StatementBuilderContext {
private final String explainMode;
}

/**
* Checks if the main pipeline contains a {@link Head} node by walking the first-child chain. Only
* the main pipeline is checked — subqueries in joins or nested structures are not traversed. When
* the user's query already includes an explicit {@code head} command, we should not inject an
* additional Head for fetch_size so that the user's explicit limit takes precedence.
*/
private boolean containsHead(UnresolvedPlan plan) {
UnresolvedPlan current = plan;
while (current != null) {
if (current instanceof Head) {
return true;
}
List<? extends Node> children = current.getChild();
if (children.isEmpty() || !(children.get(0) instanceof UnresolvedPlan)) {
break;
}
current = (UnresolvedPlan) children.get(0);
}
return false;
}
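For illustration, the first-child-chain walk can be reproduced with mock AST classes (a sketch with illustrative class names, not the actual org.opensearch.sql.ast types). The key point: a head inside a join subquery hangs off the Join node's second child, so the walk down the first-child chain never reaches it.

```java
import java.util.List;

// Mock AST nodes mirroring the first-child-chain walk described above.
// These class names are illustrative only, not the plugin's real types.
class Node {
    private final List<Node> children;
    Node(Node... children) { this.children = List.of(children); }
    List<Node> getChild() { return children; }
}
class Relation extends Node { Relation() { super(); } }
class Head extends Node { Head(Node child) { super(child); } }
class Fields extends Node { Fields(Node child) { super(child); } }
class Join extends Node {
    // Convention in this sketch: first child = main pipeline, second = subquery.
    Join(Node main, Node subquery) { super(main, subquery); }
}

public class ContainsHeadDemo {
    // Walk only the first-child chain; subquery branches are never visited.
    static boolean containsHead(Node plan) {
        for (Node cur = plan; cur != null; ) {
            if (cur instanceof Head) return true;
            List<Node> children = cur.getChild();
            if (children.isEmpty()) break;
            cur = children.get(0);
        }
        return false;
    }

    public static void main(String[] args) {
        // source=t | head 3 | fields age : head is in the main pipeline
        Node withHead = new Fields(new Head(new Relation()));
        // source=t1 | join (source=t2 | head 3) : head only in the subquery
        Node headInSubquery = new Fields(new Join(new Relation(), new Head(new Relation())));
        assert containsHead(withHead);
        assert !containsHead(headInSubquery);
        System.out.println("ok");
    }
}
```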

private UnresolvedPlan addSelectAll(UnresolvedPlan plan) {
if ((plan instanceof Project) && !((Project) plan).isExcluded()) {
return plan;