diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java index 9754a0a4dffe..0c2cb4184a17 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.datatable; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import java.io.DataInput; @@ -26,8 +27,10 @@ import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import org.apache.pinot.spi.utils.JsonUtils; @@ -149,6 +152,24 @@ public StatMap merge(K key, String value) { return this; } + public Set getStringSet(K key) { + Preconditions.checkArgument(key.getType() == Type.STRING_SET, "Key %s is of type %s, not STRING_SET", key, + key.getType()); + Object o = _map.get(key); + return o == null ? Set.of() : (Set) o; + } + + public StatMap merge(K key, Set value) { + Set oldValue = getStringSet(key); + Set newValue = key.merge(oldValue, value); + if (newValue.isEmpty()) { + _map.remove(key); + } else { + _map.put(key, Collections.unmodifiableSet(new LinkedHashSet<>(newValue))); + } + return this; + } + /** * Returns the value associated with the key. *

@@ -164,6 +185,8 @@ public Object getAny(K key) { return getLong(key); case STRING: return getString(key); + case STRING_SET: + return getStringSet(key); default: throw new IllegalArgumentException("Unsupported type: " + key.getType()); } @@ -216,6 +239,9 @@ public StatMap merge(StatMap other) { case STRING: merge(key, (String) value); break; + case STRING_SET: + merge(key, (Set) value); + break; default: throw new IllegalArgumentException("Unsupported type: " + key.getType()); } @@ -261,6 +287,14 @@ public StatMap merge(DataInput input) case STRING: merge(key, input.readUTF()); break; + case STRING_SET: + int size = input.readInt(); + LinkedHashSet values = new LinkedHashSet<>(size); + for (int j = 0; j < size; j++) { + values.add(input.readUTF()); + } + merge(key, values); + break; default: throw new IllegalStateException("Unknown type " + key.getType()); } @@ -311,6 +345,18 @@ public ObjectNode asJson() { node.put(key.getStatName(), (String) value); } break; + case STRING_SET: + if (value == null) { + if (key.includeDefaultInJson()) { + node.putArray(key.getStatName()); + } + } else { + ArrayNode arrayNode = node.putArray(key.getStatName()); + for (String stringValue : (Set) value) { + arrayNode.add(stringValue); + } + } + break; default: throw new IllegalArgumentException("Unsupported type: " + key.getType()); } @@ -366,6 +412,18 @@ public void serialize(DataOutput output) } break; } + case STRING_SET: { + Set value = getStringSet(key); + if (!value.isEmpty()) { + writtenKeys++; + output.writeByte(ordinal); + output.writeInt(value.size()); + for (String stringValue : value) { + output.writeUTF(stringValue); + } + } + break; + } default: throw new IllegalStateException("Unknown type " + key.getType()); } @@ -398,6 +456,12 @@ private boolean checkContainsNoDefault() { throw new IllegalStateException("String value must be non-null but null is stored for key " + key); } break; + case STRING_SET: + if (value == null || ((Set) value).isEmpty()) { + throw new IllegalStateException("String set value must be non-empty but " + value + " is stored for key " + + key); + } + break; default: throw new IllegalArgumentException("Unsupported type: " + key.getType()); } @@ -493,6 +557,12 @@ default String merge(@Nullable String value1, @Nullable String value2) { return value2 != null ? value2 : value1; } + default Set merge(Set value1, Set value2) { + LinkedHashSet merged = new LinkedHashSet<>(value1); + merged.addAll(value2); + return merged; + } + /** * The type of the values associated to this key. */ @@ -540,6 +610,7 @@ public enum Type { BOOLEAN, INT, LONG, - STRING + STRING, + STRING_SET } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 69ae90b675a0..b14e8bd6fea5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -38,8 +38,8 @@ */ @JsonPropertyOrder({ "resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", - "numGroupsWarningLimitReached", "numGroups", "maxRowsInJoinReached", "maxRowsInJoin", - "maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats", + "numGroupsWarningLimitReached", "numGroups", "earlyTerminationReasons", "maxRowsInJoinReached", + "maxRowsInJoin", "maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", @@ -112,7 +112,8 @@ public int getNumRowsResultSet() { @JsonProperty(access = JsonProperty.Access.READ_ONLY) @Override public boolean isPartialResult() { - return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached(); + return getExceptionsSize() > 0 || isNumGroupsLimitReached() || !getEarlyTerminationReasons().isEmpty() + || isMaxRowsInJoinReached(); } @Override @@ -163,6 +164,11 @@ public void mergeNumGroupsWarningLimitReached(boolean numGroupsWarningLimitReach _brokerStats.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, numGroupsWarningLimitReached); } + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public List getEarlyTerminationReasons() { + return List.copyOf(_brokerStats.getStringSet(StatKey.EARLY_TERMINATION_REASONS)); + } + @Override public boolean isMaxRowsInJoinReached() { return _maxRowsInJoinReached; @@ -487,7 +493,8 @@ public long merge(long value1, long value2) { public long merge(long value1, long value2) { return Math.max(value1, value2); } - }; + }, + EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET); private final StatMap.Type _type; diff --git a/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java b/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java index b6b19a2b6669..b8ec1831f798 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java @@ -22,6 +22,9 @@ import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; import org.apache.pinot.common.response.broker.BrokerResponseNativeV2; import org.testng.Assert; import org.testng.SkipException; @@ -70,6 +73,15 @@ public void dynamicTypeCheckPutString(MyStats stat) { statMap.merge(stat, "foo"); } + @Test(dataProvider = "allTypeStats", expectedExceptions = IllegalArgumentException.class) + public void dynamicTypeCheckPutStringSet(MyStats stat) { + if (stat.getType() == StatMap.Type.STRING_SET) { + throw new SkipException("Skipping STRING_SET test"); + } + StatMap statMap = new StatMap<>(MyStats.class); + statMap.merge(stat, Set.of("foo")); + } + @Test(dataProvider = "allTypeStats") public void singleEncodeDecode(MyStats stat) throws IOException { @@ -87,6 +99,9 @@ public void singleEncodeDecode(MyStats stat) case STRING: statMap.merge(stat, "foo"); break; + case STRING_SET: + statMap.merge(stat, Set.of("foo")); + break; default: throw new IllegalStateException(); } @@ -111,6 +126,9 @@ public void encodeDecodeAll() case STRING: statMap.merge(stat, "foo"); break; + case STRING_SET: + statMap.merge(stat, Set.of("foo")); + break; default: throw new IllegalStateException(); } @@ -118,6 +136,18 @@ public void encodeDecodeAll() testSerializeDeserialize(statMap); } + @Test + public void stringSetPreservesOrderAndDeduplicates() { + StatMap statMap = new StatMap<>(MyStats.class) + .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")) + .merge(MyStats.STR_SET_KEY, stringSet("foo", "baz")); + + Assert.assertEquals(List.copyOf(statMap.getStringSet(MyStats.STR_SET_KEY)), List.of("foo", "bar", "baz")); + Assert.assertEquals(statMap.asJson().get("strSetKey").get(0).asText(), "foo"); + Assert.assertEquals(statMap.asJson().get("strSetKey").get(1).asText(), "bar"); + Assert.assertEquals(statMap.asJson().get("strSetKey").get(2).asText(), "baz"); + } + private & StatMap.Key> void testSerializeDeserialize(StatMap statMap) throws IOException { ByteArrayDataOutput output = ByteStreams.newDataOutput(); @@ -153,22 +183,26 @@ static StatMap[] complexStats() { .merge(MyStats.BOOL_KEY, true) .merge(MyStats.LONG_KEY, 1L) .merge(MyStats.INT_KEY, 1) - .merge(MyStats.STR_KEY, "foo"), + .merge(MyStats.STR_KEY, "foo") + .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")), new StatMap<>(MyStats.class) .merge(MyStats.BOOL_KEY, false) .merge(MyStats.LONG_KEY, 1L) .merge(MyStats.INT_KEY, 1) - .merge(MyStats.STR_KEY, "foo"), + .merge(MyStats.STR_KEY, "foo") + .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")), new StatMap<>(MyStats.class) .merge(MyStats.BOOL_KEY, true) .merge(MyStats.LONG_KEY, 0L) .merge(MyStats.INT_KEY, 1) - .merge(MyStats.STR_KEY, "foo"), + .merge(MyStats.STR_KEY, "foo") + .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")), new StatMap<>(MyStats.class) .merge(MyStats.BOOL_KEY, false) .merge(MyStats.LONG_KEY, 1L) .merge(MyStats.INT_KEY, 0) - .merge(MyStats.STR_KEY, "foo"), + .merge(MyStats.STR_KEY, "foo") + .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")), new StatMap<>(MyStats.class) .merge(MyStats.BOOL_KEY, false) .merge(MyStats.LONG_KEY, 1L) @@ -188,11 +222,16 @@ static MyStats[] allTypeStats() { return MyStats.values(); } + private static Set stringSet(String... values) { + return new LinkedHashSet<>(List.of(values)); + } + public enum MyStats implements StatMap.Key { BOOL_KEY(StatMap.Type.BOOLEAN), LONG_KEY(StatMap.Type.LONG), INT_KEY(StatMap.Type.INT), - STR_KEY(StatMap.Type.STRING); + STR_KEY(StatMap.Type.STRING), + STR_SET_KEY(StatMap.Type.STRING_SET); private final StatMap.Type _type; diff --git a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java new file mode 100644 index 000000000000..320db22557a0 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response.broker; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import org.apache.pinot.common.datatable.StatMap; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class BrokerResponseNativeV2Test { + @Test + public void testEarlyTerminationReasonsMarkPartialResponse() { + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + assertTrue(brokerResponse.getEarlyTerminationReasons().isEmpty()); + assertFalse(brokerResponse.isPartialResult()); + + brokerResponse.addBrokerStats(new StatMap<>(BrokerResponseNativeV2.StatKey.class) + .merge(BrokerResponseNativeV2.StatKey.EARLY_TERMINATION_REASONS, + stringSet("DISTINCT_MAX_ROWS", "DISTINCT_MAX_ROWS_WITHOUT_CHANGE")) + .merge(BrokerResponseNativeV2.StatKey.EARLY_TERMINATION_REASONS, + stringSet("DISTINCT_MAX_ROWS", "DISTINCT_MAX_EXECUTION_TIME"))); + + assertEquals(brokerResponse.getEarlyTerminationReasons(), + List.of("DISTINCT_MAX_ROWS", "DISTINCT_MAX_ROWS_WITHOUT_CHANGE", "DISTINCT_MAX_EXECUTION_TIME")); + assertTrue(brokerResponse.isPartialResult()); + } + + private static Set stringSet(String... values) { + return new LinkedHashSet<>(List.of(values)); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java index 808a966d117e..2d521a775e0c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java @@ -33,13 +33,15 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -/** - * Integration test for distinct early termination query options. Uses SSE (v1) only because these options - * are enforced in DistinctCombineOperator which is not used by the multi-stage query engine. - */ +/// Integration test for distinct early termination query options. +/// +/// SSE responses expose reason-specific boolean fields. MSQ responses expose the aggregated early termination reasons +/// and mark the response as partial. @Test(suiteName = "CustomClusterIntegrationTest") public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest { private static final String TABLE_NAME = "DistinctQueriesCustomTest"; @@ -138,6 +140,24 @@ public void testMaxRowsInDistinctEarlyTermination() "partialResult should be true. Response: " + response); } + /// Tests `maxRowsInDistinct` with MSQ: the leaf SSE execution sets early termination metadata and the multi-stage + /// response marks itself partial with the aggregated reason. + @Test + public void testMultiStageMaxRowsInDistinctEarlyTermination() + throws Exception { + setUseMultiStageQueryEngine(true); + String sql = String.format("SELECT DISTINCT %s FROM %s LIMIT 10000", STRING_COL, getTableName()); + JsonNode response = postQueryWithOptions(sql, "maxRowsInDistinct=1"); + + assertTrue(response.path("exceptions").isEmpty(), "expected no exceptions. Response: " + response); + assertTrue(response.path("partialResult").asBoolean(false), + "partialResult should be true. Response: " + response); + assertEquals(response.path("earlyTerminationReasons").path(0).asText(), "DISTINCT_MAX_ROWS", + "expected distinct early termination reason. Response: " + response); + assertFalse(response.has("maxRowsInDistinctReached"), + "MSQ response should expose earlyTerminationReasons instead of legacy V1 flags. Response: " + response); + } + /** * Tests maxRowsWithoutChangeInDistinct: when merging a segment adds no new distinct values, the * segment's numDocsScanned counts toward the no-change budget. With 2 identical segments, the diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 5a650941d825..cb4a8c3b4241 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -44,6 +45,7 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason; import org.apache.pinot.core.operator.blocks.results.ExplainV2ResultBlock; import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; @@ -414,8 +416,10 @@ private synchronized void mergeExecutionStats(Map executionStats case NUM_CONSUMING_SEGMENTS_MATCHED: _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_MATCHED, Integer.parseInt(entry.getValue())); break; - case SORTED: case EARLY_TERMINATION_REASON: + mergeEarlyTerminationReason(entry.getValue()); + break; + case SORTED: break; default: throw new IllegalArgumentException("Unhandled leaf execution stat: " + key); @@ -423,6 +427,20 @@ private synchronized void mergeExecutionStats(Map executionStats } } + private void mergeEarlyTerminationReason(@Nullable String earlyTerminationReason) { + if (earlyTerminationReason == null || earlyTerminationReason.isEmpty()) { + return; + } + try { + EarlyTerminationReason reason = EarlyTerminationReason.valueOf(earlyTerminationReason); + if (reason != EarlyTerminationReason.NONE) { + _statMap.merge(StatKey.EARLY_TERMINATION_REASONS, Set.of(reason.name())); + } + } catch (IllegalArgumentException e) { + LOGGER.debug("Skipping unknown early termination reason: {}", earlyTerminationReason); + } + } + private ExplainedNode asNode(ExplainInfo info) { int size = info.getInputs().size(); List inputs = new ArrayList<>(size); @@ -749,7 +767,8 @@ public String getStatName() { /** * Time spent in single-stage execution engine for this leaf stage. */ - SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null); + SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null), + EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET); // IMPORTANT: When adding new StatKeys, make sure to either create the same key in BrokerResponseNativeV2.StatKey or // call the constructor that accepts a String as last argument and set it to null. // Otherwise the constructor will fail with an IllegalArgumentException which will not be caught and will @@ -798,6 +817,9 @@ public void updateBrokerMetadata(StatMap oldMeta case STRING: oldMetadata.merge(_brokerKey, stats.getString(this)); break; + case STRING_SET: + oldMetadata.merge(_brokerKey, stats.getStringSet(this)); + break; default: throw new IllegalStateException("Unsupported type: " + _type); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java index e69ffac27cc5..685055773a28 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.runtime.operator; +import com.fasterxml.jackson.databind.JsonNode; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,12 +30,17 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.common.response.broker.BrokerResponseNativeV2; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.Table; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason; import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock; @@ -50,6 +56,7 @@ import org.apache.pinot.query.runtime.blocks.SuccessMseBlock; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.utils.JsonUtils; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.AfterClass; @@ -63,6 +70,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; @@ -408,6 +416,93 @@ void execute() { operator.close(); } + @Test + public void shouldPropagateDistinctEarlyTerminationReason() { + // Given: + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT intCol FROM tbl"); + DataSchema schema = new DataSchema(new String[]{"intCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock()); + metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(), + EarlyTerminationReason.DISTINCT_MAX_ROWS.name()); + QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), metadataBlock); + LeafOperator operator = + new LeafOperator(OperatorTestUtil.getTracingContext(), mockQueryRequests(1), schema, queryExecutor, + _executorService); + _operatorRef.set(operator); + + // When: + assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block"); + + // Then: + StatMap leafStats = operator.copyStatMaps(); + assertEquals(List.copyOf(leafStats.getStringSet(LeafOperator.StatKey.EARLY_TERMINATION_REASONS)), + List.of(EarlyTerminationReason.DISTINCT_MAX_ROWS.name())); + + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats); + assertEquals(brokerResponse.getEarlyTerminationReasons(), List.of(EarlyTerminationReason.DISTINCT_MAX_ROWS.name())); + assertTrue(brokerResponse.isPartialResult()); + JsonNode responseJson = JsonUtils.objectToJsonNode(brokerResponse); + assertEquals(responseJson.path("earlyTerminationReasons").path(0).asText(), + EarlyTerminationReason.DISTINCT_MAX_ROWS.name()); + assertFalse(responseJson.has("maxRowsInDistinctReached")); + assertFalse(responseJson.has("maxRowsWithoutChangeInDistinctReached")); + assertFalse(responseJson.has("maxExecutionTimeInDistinctReached")); + assertTrue(responseJson.path("partialResult").asBoolean(false)); + + operator.close(); + } + + @Test + public void shouldSkipNoneEarlyTerminationReason() { + assertSkippedEarlyTerminationReason(EarlyTerminationReason.NONE.name()); + } + + @Test + public void shouldSkipUnknownEarlyTerminationReason() { + assertSkippedEarlyTerminationReason("UNKNOWN_REASON"); + } + + @Test + public void shouldSkipEmptyEarlyTerminationReason() { + assertSkippedEarlyTerminationReason(""); + } + + @Test + public void shouldSkipNullEarlyTerminationReason() { + assertSkippedEarlyTerminationReason(null); + } + + private void assertSkippedEarlyTerminationReason(@Nullable String earlyTerminationReason) { + // Given: + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT intCol FROM tbl"); + DataSchema schema = new DataSchema(new String[]{"intCol"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock()); + metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(), + earlyTerminationReason); + QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), metadataBlock); + LeafOperator operator = + new LeafOperator(OperatorTestUtil.getTracingContext(), mockQueryRequests(1), schema, queryExecutor, + _executorService); + _operatorRef.set(operator); + + // When: + assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block"); + + // Then: + StatMap leafStats = operator.copyStatMaps(); + assertTrue(leafStats.getStringSet(LeafOperator.StatKey.EARLY_TERMINATION_REASONS).isEmpty()); + + BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats); + assertTrue(brokerResponse.getEarlyTerminationReasons().isEmpty()); + assertFalse(brokerResponse.isPartialResult()); + + operator.close(); + } + @Test public void shouldReturnAggregationResultBlock() { // Given: