Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Inject;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryProgressStats;
import org.apache.pinot.spi.trace.QueryFingerprint;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
Expand Down Expand Up @@ -575,6 +577,60 @@ public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspe
}
}

@GET
@Path("query/{id}/progress")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get progress for a running query as identified by the id",
notes = "Progress is derived from processed work units over total work units. Single-stage work units are "
+ "selected segments; multi-stage work units include selected segments and stage op-chains. Multi-stage "
+ "responses may include labeled details for component-level progress.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 400, message = "Bad Request"),
@ApiResponse(code = 404, message = "Query not found on the requested broker"),
@ApiResponse(code = 500, message = "Internal server error")
})
public QueryProgressStats getQueryProgress(
@ApiParam(value = "Query id", required = true) @PathParam("id") String id,
@ApiParam(value = "Determines if query id is internal or provided by the client") @QueryParam("client")
@DefaultValue("false") boolean isClient,
@ApiParam(value = "Timeout for servers to respond the progress request") @QueryParam("timeoutMs")
@DefaultValue("1000") int timeoutMs) {
try {
long reqId;
if (isClient) {
OptionalLong requestId = _requestHandler.getRequestIdByClientId(id);
if (!requestId.isPresent()) {
throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
.entity(String.format("Client query: %s not found on the broker", id))
.build());
}
reqId = requestId.getAsLong();
} else {
reqId = Long.parseLong(id);
}
QueryProgressStats progressStats =
_requestHandler.getQueryProgressStats(reqId, timeoutMs, _executor, _httpConnMgr);
if (progressStats != null) {
return progressStats;
}
} catch (NumberFormatException e) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.BAD_REQUEST_EXCEPTIONS, 1L);
throw new BadRequestException(String.format("Invalid internal query id: %s", id), e);
} catch (WebApplicationException e) {
throw e;
} catch (Exception e) {
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(String.format("Failed to get progress for query: %s on the broker due to error: %s", id,
e.getMessage()))
.build());
}
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", id))
.build());
}

@DELETE
@Path("query/{id}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryProgressStats;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
Expand Down Expand Up @@ -411,6 +412,15 @@ public Map<Long, String> getRunningQueries() {
return Collections.unmodifiableMap(_queriesById);
}

@Override
@Nullable
public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr)
throws Exception {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
return null;
}

@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
Expand Down Expand Up @@ -439,6 +449,7 @@ public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Execut

@Override
public OptionalLong getRequestIdByClientId(String clientQueryId) {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
return _clientQueryIds.entrySet().stream()
.filter(e -> clientQueryId.equals(e.getValue()))
.mapToLong(Map.Entry::getKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryProgressStats;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.QueryFingerprint;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
Expand Down Expand Up @@ -338,6 +340,65 @@ protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, H
return true;
}

@Override
@Nullable
public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr)
throws Exception {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
QueryServers queryServers = _serversById.get(queryId);
if (queryServers == null) {
return null;
}

// TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
// details.
String globalQueryId = getGlobalQueryId(queryId);
List<String> serverUrls = new ArrayList<>(queryServers._servers.size());
for (ServerInstance serverInstance : queryServers._servers) {
serverUrls.add(String.format("%s/query/%s/progress", serverInstance.getAdminEndpoint(), globalQueryId));
}
if (serverUrls.isEmpty()) {
return null;
}

CompletionService<MultiHttpRequestResponse> completionService =
new MultiHttpRequest(executor, connMgr).executeGet(serverUrls, null, timeoutMs);
List<QueryProgressStats> serverProgressStats = new ArrayList<>(serverUrls.size());
List<String> errMsgs = new ArrayList<>(serverUrls.size());
for (int i = 0; i < serverUrls.size(); i++) {
MultiHttpRequestResponse httpRequestResponse = null;
URI uri = null;
try {
httpRequestResponse = completionService.take().get();
uri = httpRequestResponse.getURI();
int status = httpRequestResponse.getResponse().getCode();
if (status == 200) {
String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
serverProgressStats.add(JsonUtils.stringToObject(responseString, QueryProgressStats.class));
} else if (status != 404) {
Comment thread
xiangfu0 marked this conversation as resolved.
String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
throw new Exception(
String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
}
} catch (Exception e) {
LOGGER.debug("Failed to get progress for query id: {} from uri: {}", queryId, uri, e);
errMsgs.add(e.getMessage());
} finally {
Comment thread
xiangfu0 marked this conversation as resolved.
if (httpRequestResponse != null) {
httpRequestResponse.close();
}
}
}
if (!serverProgressStats.isEmpty()) {
return QueryProgressStats.aggregate(serverProgressStats);
}
if (!errMsgs.isEmpty()) {
throw new Exception("Unexpected responses from servers: " + StringUtils.join(errMsgs, ","));
}
return null;
}

@Override
protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryProgressStats;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -82,6 +83,13 @@ default BrokerResponse handleExplainTimeSeriesRequest(String lang, String rawQue

Map<Long, String> getRunningQueries();

/// Returns segment-based progress for a running query, or `null` if the query is not running or progress is not
/// available for the query engine.
@Nullable
QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr)
throws Exception;

/**
* Cancel a query as identified by the queryId. This method is non-blocking so the query may still run for a while
* after calling this method. This cancel method can be called multiple times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.query.QueryProgressStats;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
Expand Down Expand Up @@ -159,6 +160,22 @@ public Map<Long, String> getRunningQueries() {
return queries;
}

@Override
@Nullable
public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr)
throws Exception {
// Both engines share the same request ID generator, so the query will have unique IDs across the two engines.
if (_multiStageBrokerRequestHandler != null) {
QueryProgressStats mseProgressStats =
_multiStageBrokerRequestHandler.getQueryProgressStats(queryId, timeoutMs, executor, connMgr);
if (mseProgressStats != null) {
return mseProgressStats;
}
}
return _singleStageBrokerRequestHandler.getQueryProgressStats(queryId, timeoutMs, executor, connMgr);
}

@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -98,6 +99,7 @@
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryProgressStats;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.QueryFingerprint;
import org.apache.pinot.spi.trace.RequestContext;
Expand Down Expand Up @@ -136,6 +138,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final WorkerManager _multiClusterWorkerManager;
private final MailboxService _mailboxService;
private final QueryDispatcher _queryDispatcher;
private final Map<Long, QueryExecutionContext> _executionContextsById;
@Nullable
private final ServerRoutingStatsManager _serverRoutingStatsManager;
private final boolean _explainAskingServerDefault;
Expand Down Expand Up @@ -205,6 +208,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
_queryDispatcher =
new QueryDispatcher(_mailboxService, failureDetector, tlsConfig, isQueryCancellationEnabled(), cancelTimeout,
dispatchKeepAliveTimeMs, dispatchKeepAliveTimeoutMs, dispatchKeepAliveWithoutCalls);
_executionContextsById = new ConcurrentHashMap<>();
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port,
_brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
Expand Down Expand Up @@ -271,6 +275,37 @@ public void shutDown() {
_queryDispatcher.shutdown();
}

@Override
@Nullable
public QueryProgressStats getQueryProgressStats(long queryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr) {
QueryExecutionContext executionContext = _executionContextsById.get(queryId);
if (executionContext == null) {
return null;
}
List<QueryProgressStats> progressStatsList = new ArrayList<>(2);
List<QueryProgressStats> details = new ArrayList<>(4);
QueryProgressStats brokerProgressStats = executionContext.getProgressStats().withLabel("Broker");
progressStatsList.add(brokerProgressStats);
details.add(brokerProgressStats);
QueryProgressStats serverProgressStats = _queryDispatcher.getQueryProgressStats(queryId, timeoutMs);
if (serverProgressStats != null) {
progressStatsList.add(serverProgressStats);
if (serverProgressStats.getDetails().isEmpty()) {
details.add(serverProgressStats.withLabel("Servers"));
} else {
details.addAll(serverProgressStats.getDetails());
}
}
return QueryProgressStats.aggregate(progressStatsList).withLabel("Query").withDetails(details);
}

@Override
protected void onQueryFinish(long requestId) {
super.onQueryFinish(requestId);
_executionContextsById.remove(requestId);
}

@Override
protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
Expand Down Expand Up @@ -690,6 +725,11 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI

try {
String clientRequestId = extractClientRequestId(query.getSqlNodeAndOptions());
QueryExecutionContext executionContext = QueryThreadContext.get().getExecutionContext();
executionContext.addTotalWorkUnits(1);
if (isQueryCancellationEnabled()) {
_executionContextsById.put(requestId, executionContext);
}
onQueryStart(requestId, clientRequestId, query.getTextQuery());
long executionStartTimeNs = System.nanoTime();

Expand All @@ -700,6 +740,7 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
if (allLeafStagesEmpty) {
try {
queryResults = QueryDispatcher.runReducer(dispatchableSubPlan, query.getOptions(), _mailboxService);
executionContext.incrementProcessedWorkUnits();
} catch (QueryException e) {
// Re-throw typed errors (auth, validation, etc.) so they propagate with their
// original error codes, matching the dispatch branch behavior.
Expand All @@ -724,6 +765,7 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan,
timer.getRemainingTimeMs(), query.getOptions(), _serverRoutingStatsManager);
executionContext.incrementProcessedWorkUnits();
} catch (QueryException e) {
throw e;
} catch (Throwable t) {
Expand Down
Loading
Loading