Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public ObjectNode asJson() {
if (key.includeDefaultInJson()) {
node.put(key.getStatName(), false);
}
} else {
} else if (key.includeInJson()) {
node.put(key.getStatName(), (boolean) value);
}
break;
Expand All @@ -289,7 +289,7 @@ public ObjectNode asJson() {
if (key.includeDefaultInJson()) {
node.put(key.getStatName(), 0);
}
} else {
} else if (key.includeInJson()) {
node.put(key.getStatName(), (int) value);
}
break;
Expand All @@ -298,7 +298,7 @@ public ObjectNode asJson() {
if (key.includeDefaultInJson()) {
node.put(key.getStatName(), 0L);
}
} else {
} else if (key.includeInJson()) {
node.put(key.getStatName(), (long) value);
}
break;
Expand All @@ -307,7 +307,7 @@ public ObjectNode asJson() {
if (key.includeDefaultInJson()) {
node.put(key.getStatName(), "");
}
} else {
} else if (key.includeInJson()) {
node.put(key.getStatName(), (String) value);
}
break;
Expand Down Expand Up @@ -502,6 +502,10 @@ default boolean includeDefaultInJson() {
return false;
}

default boolean includeInJson() {
return true;
}

static int minPositive(int value1, int value2) {
if (value1 == 0 && value2 >= 0) {
return value2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
* Per-server adaptive routing stats exported as metrics (MSE / multi-stage engine).
*/
ADAPTIVE_SERVER_MSE_NUM_IN_FLIGHT_REQUESTS("adaptiveServerMseNumInFlightRequests", false),
ADAPTIVE_SERVER_MSE_LATENCY_EMA("adaptiveServerMseLatencyEma", false),
ADAPTIVE_SERVER_MSE_HYBRID_SCORE("adaptiveServerMseHybridScore", false),

/**
* The queue size of ServerRoutingStatsManager main executor service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.BrokerGauge;
Expand Down Expand Up @@ -452,19 +451,17 @@ private void exportStatsAsMetrics() {
BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS,
BrokerGauge.ADAPTIVE_SERVER_LATENCY_EMA,
BrokerGauge.ADAPTIVE_SERVER_HYBRID_SCORE);
// TODO: Export MSE latency stats once we support it
exportStatsForMap(_mseServerQueryStatsMap, "server.",
BrokerGauge.ADAPTIVE_SERVER_MSE_NUM_IN_FLIGHT_REQUESTS,
null,
null);
BrokerGauge.ADAPTIVE_SERVER_MSE_LATENCY_EMA,
BrokerGauge.ADAPTIVE_SERVER_MSE_HYBRID_SCORE);
} catch (Exception e) {
LOGGER.error("Exception caught while exporting routing stats as metrics.", e);
}
}

private void exportStatsForMap(ConcurrentHashMap<String, ServerRoutingStatsEntry> statsMap, String tagPrefix,
BrokerGauge numInFlightGauge, @Nullable BrokerGauge latencyEmaGauge,
@Nullable BrokerGauge hybridScoreGauge) {
BrokerGauge numInFlightGauge, BrokerGauge latencyEmaGauge, BrokerGauge hybridScoreGauge) {
for (Map.Entry<String, ServerRoutingStatsEntry> entry : statsMap.entrySet()) {
String serverInstanceId = entry.getKey();
ServerRoutingStatsEntry stats = entry.getValue();
Expand All @@ -484,12 +481,8 @@ private void exportStatsForMap(ConcurrentHashMap<String, ServerRoutingStatsEntry

String tag = tagPrefix + serverInstanceId;
_brokerMetrics.setValueOfGlobalGauge(numInFlightGauge, tag, numInFlightRequests);
if (latencyEmaGauge != null) {
_brokerMetrics.setValueOfGlobalGauge(latencyEmaGauge, tag, (long) latencyEma);
}
if (hybridScoreGauge != null) {
_brokerMetrics.setValueOfGlobalGauge(hybridScoreGauge, tag, (long) hybridScore);
}
_brokerMetrics.setValueOfGlobalGauge(latencyEmaGauge, tag, (long) latencyEma);
_brokerMetrics.setValueOfGlobalGauge(hybridScoreGauge, tag, (long) hybridScore);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,19 @@ public Map<Integer, DispatchablePlanFragment> constructDispatchablePlanFragmentM
if (dispatchablePlanMetadata.getTimeBoundaryInfo() != null) {
dispatchablePlanFragment.setTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo());
}

PlanFragment planFrag = dispatchablePlanFragment.getPlanFragment();
PlanNode root = planFrag != null ? planFrag.getFragmentRoot() : null;
FragmentType fragmentType = FragmentType.classify(root,
!dispatchablePlanMetadata.getScannedTables().isEmpty(), _dispatchablePlanMetadataMap);
if (fragmentType != null) {
dispatchablePlanFragment.setFragmentType(fragmentType);
}
}
return dispatchablePlanFragmentMap;
}


private Map<Integer, DispatchablePlanFragment> createDispatchablePlanFragmentMap(PlanFragment planFragmentRoot) {
HashMap<Integer, DispatchablePlanFragment> result =
Maps.newHashMapWithExpectedSize(_dispatchablePlanMetadataMap.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class DispatchablePlanFragment {
// used for passing custom properties to build StageMetadata on the server.
private final Map<String, String> _customProperties;

// Broker-only classification — not shipped to servers.
private FragmentType _fragmentType;

public DispatchablePlanFragment(PlanFragment planFragment) {
this(planFragment, new ArrayList<>(), new HashMap<>(), new HashMap<>());
}
Expand Down Expand Up @@ -132,4 +135,12 @@ public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer
public Set<QueryServerInstance> getServerInstances() {
return _serverInstanceToWorkerIdMap.keySet();
}

public FragmentType getFragmentType() {
return _fragmentType;
}

public void setFragmentType(FragmentType fragmentType) {
_fragmentType = fragmentType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.planner.physical;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;


public enum FragmentType {
// Scans a fact table; non-SINGLETON sender
LEAF,
// Dim-table leaf via SINGLETON exchange, with a leaf upstream sender
SINGLETON_LEAF,
// Non-scanning stage (join, agg, sort); timings reflect upstream cascade delays
INTERMEDIATE;

public static FragmentType classify(PlanNode root, boolean hasScannedTable,
Map<Integer, DispatchablePlanMetadata> metadataMap) {
if (!(root instanceof MailboxSendNode)) {
return null;
}
if (!hasScannedTable) {
return INTERMEDIATE;
}
List<Integer> singletonSenderStageIds = getSingletonReceiveSenderStageIds(root);
if (singletonSenderStageIds.isEmpty()) {
return LEAF;
}
boolean allSendersAreLeaves = singletonSenderStageIds.stream().allMatch(senderStageId -> {
DispatchablePlanMetadata senderMeta = metadataMap.get(senderStageId);
return senderMeta != null && !senderMeta.getScannedTables().isEmpty();
});
return allSendersAreLeaves ? SINGLETON_LEAF : INTERMEDIATE;
}

static List<Integer> getSingletonReceiveSenderStageIds(PlanNode node) {
List<Integer> result = new ArrayList<>();
collectSingletonReceiveSenderStageIds(node, result);
return result;
}

private static void collectSingletonReceiveSenderStageIds(PlanNode node, List<Integer> result) {
if (node instanceof MailboxReceiveNode) {
MailboxReceiveNode receiveNode = (MailboxReceiveNode) node;
if (receiveNode.getDistributionType() == RelDistribution.Type.SINGLETON) {
result.add(receiveNode.getSenderStageId());
}
}
for (PlanNode input : node.getInputs()) {
collectSingletonReceiveSenderStageIds(input, result);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.planner.physical;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.plannode.FilterNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;


public class FragmentTypeTest {

private static final DataSchema SCHEMA = new DataSchema(
new String[]{"col1"}, new ColumnDataType[]{ColumnDataType.INT});

@Test
public void testNonMailboxSendRootReturnsNull() {
PlanNode filterNode = new FilterNode(0, SCHEMA, null, List.of(), null);
assertNull(FragmentType.classify(filterNode, true, Map.of()));
}

@Test
public void testNullRootReturnsNull() {
assertNull(FragmentType.classify(null, true, Map.of()));
}

@Test
public void testNoScannedTableReturnsIntermediate() {
MailboxSendNode sendNode = new MailboxSendNode(0, SCHEMA, List.of(),
1, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");
assertEquals(FragmentType.classify(sendNode, false, Map.of()), FragmentType.INTERMEDIATE);
}

@Test
public void testLeafWithNoSingletonReceive() {
// A send node with no receive children (pure leaf scan)
MailboxSendNode sendNode = new MailboxSendNode(1, SCHEMA, List.of(),
0, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");
assertEquals(FragmentType.classify(sendNode, true, Map.of()), FragmentType.LEAF);
}

@Test
public void testLeafWithNonSingletonReceive() {
// A receive node with HASH distribution (not SINGLETON) should not affect LEAF classification
MailboxReceiveNode receiveNode = new MailboxReceiveNode(1, SCHEMA, 2,
PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), null, false, false, null);
MailboxSendNode sendNode = new MailboxSendNode(1, SCHEMA, List.of(receiveNode),
0, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");
assertEquals(FragmentType.classify(sendNode, true, Map.of()), FragmentType.LEAF);
}

@Test
public void testSingletonLeafWhenSenderIsLeaf() {
// Stage 1 has a SINGLETON receive from stage 2, and stage 2 has scanned tables
MailboxReceiveNode receiveNode = new MailboxReceiveNode(1, SCHEMA, 2,
PinotRelExchangeType.STREAMING, RelDistribution.Type.SINGLETON,
null, null, false, false, null);
MailboxSendNode sendNode = new MailboxSendNode(1, SCHEMA, List.of(receiveNode),
0, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");

Map<Integer, DispatchablePlanMetadata> metadataMap = new HashMap<>();
DispatchablePlanMetadata senderMeta = new DispatchablePlanMetadata();
senderMeta.addScannedTable("dimTable");
metadataMap.put(2, senderMeta);

assertEquals(FragmentType.classify(sendNode, true, metadataMap), FragmentType.SINGLETON_LEAF);
}

@Test
public void testIntermediateWhenSingletonSenderHasNoScannedTables() {
// Stage 1 has a SINGLETON receive from stage 2, but stage 2 has no scanned tables
MailboxReceiveNode receiveNode = new MailboxReceiveNode(1, SCHEMA, 2,
PinotRelExchangeType.STREAMING, RelDistribution.Type.SINGLETON,
null, null, false, false, null);
MailboxSendNode sendNode = new MailboxSendNode(1, SCHEMA, List.of(receiveNode),
0, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");

Map<Integer, DispatchablePlanMetadata> metadataMap = new HashMap<>();
DispatchablePlanMetadata senderMeta = new DispatchablePlanMetadata();
metadataMap.put(2, senderMeta);

assertEquals(FragmentType.classify(sendNode, true, metadataMap), FragmentType.INTERMEDIATE);
}

@Test
public void testIntermediateWhenSingletonSenderMetadataMissing() {
// Stage 1 has a SINGLETON receive from stage 2, but stage 2 metadata is null
MailboxReceiveNode receiveNode = new MailboxReceiveNode(1, SCHEMA, 2,
PinotRelExchangeType.STREAMING, RelDistribution.Type.SINGLETON,
null, null, false, false, null);
MailboxSendNode sendNode = new MailboxSendNode(1, SCHEMA, List.of(receiveNode),
0, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");

assertEquals(FragmentType.classify(sendNode, true, Map.of()), FragmentType.INTERMEDIATE);
}

@Test
public void testMultipleSingletonReceivesAllLeaves() {
// Two SINGLETON receives, both senders have scanned tables
MailboxReceiveNode receive1 = new MailboxReceiveNode(1, SCHEMA, 2,
PinotRelExchangeType.STREAMING, RelDistribution.Type.SINGLETON,
null, null, false, false, null);
MailboxReceiveNode receive2 = new MailboxReceiveNode(1, SCHEMA, 3,
PinotRelExchangeType.STREAMING, RelDistribution.Type.SINGLETON,
null, null, false, false, null);
MailboxSendNode sendNode = new MailboxSendNode(1, SCHEMA, List.of(receive1, receive2),
0, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");

Map<Integer, DispatchablePlanMetadata> metadataMap = new HashMap<>();
DispatchablePlanMetadata meta2 = new DispatchablePlanMetadata();
meta2.addScannedTable("dim1");
metadataMap.put(2, meta2);
DispatchablePlanMetadata meta3 = new DispatchablePlanMetadata();
meta3.addScannedTable("dim2");
metadataMap.put(3, meta3);

assertEquals(FragmentType.classify(sendNode, true, metadataMap), FragmentType.SINGLETON_LEAF);
}

@Test
public void testMultipleSingletonReceivesOnlyOneIsLeaf() {
// Two SINGLETON receives; one sender has scanned tables, the other does not
MailboxReceiveNode receive1 = new MailboxReceiveNode(1, SCHEMA, 2,
PinotRelExchangeType.STREAMING, RelDistribution.Type.SINGLETON,
null, null, false, false, null);
MailboxReceiveNode receive2 = new MailboxReceiveNode(1, SCHEMA, 3,
PinotRelExchangeType.STREAMING, RelDistribution.Type.SINGLETON,
null, null, false, false, null);
MailboxSendNode sendNode = new MailboxSendNode(1, SCHEMA, List.of(receive1, receive2),
0, PinotRelExchangeType.STREAMING, RelDistribution.Type.HASH_DISTRIBUTED,
List.of(0), false, null, false, "murmur");

Map<Integer, DispatchablePlanMetadata> metadataMap = new HashMap<>();
DispatchablePlanMetadata meta2 = new DispatchablePlanMetadata();
meta2.addScannedTable("dim1");
metadataMap.put(2, meta2);
DispatchablePlanMetadata meta3 = new DispatchablePlanMetadata();
metadataMap.put(3, meta3);

assertEquals(FragmentType.classify(sendNode, true, metadataMap), FragmentType.INTERMEDIATE);
}
}
Loading
Loading