Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
378bd7d
HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle …
tanishq-chugh Jun 4, 2026
607486a
Fix formatting
tanishq-chugh Jun 8, 2026
24b238e
Logging changes to address review comments
tanishq-chugh Jun 8, 2026
288ed83
Update timeout calculation in getSession to prevent overflow
tanishq-chugh Jun 8, 2026
b4d487c
Refactor ClaimsPathListener to use in-place methods instead of a clas…
tanishq-chugh Jun 8, 2026
abb83dd
Fix the FIFO test as per the review comment & remove leftover config …
tanishq-chugh Jun 8, 2026
901c255
Address the cases of HS2-ZK connection getting lost / connection gett…
tanishq-chugh Jun 8, 2026
5318f99
Change the log line added to debug
tanishq-chugh Jun 10, 2026
cd2e600
Fix formatting issues
tanishq-chugh Jun 10, 2026
2cfa130
Address SonarQube issues - 1
tanishq-chugh Jun 11, 2026
345ae06
Logic to kill orphan DAGs left behind by crashed HS2
tanishq-chugh Jun 11, 2026
4decc19
Address Sonarqube - 2
tanishq-chugh Jun 11, 2026
2bf881a
Update Mocks in TestTezTask as per the new code logic & Remove lefto…
tanishq-chugh Jun 12, 2026
9d19541
Fix flakiness of newly added UT testFIFOSessionClaimsFromDifferentReg…
tanishq-chugh Jun 12, 2026
a37ca4a
Address kill dag Race condition in TezExternalSessionState
tanishq-chugh Jun 15, 2026
ff84296
Address SonarQube - 2
tanishq-chugh Jun 17, 2026
da8314e
Address Co-pilot comments and fix the cases for default Sessions
tanishq-chugh Jun 18, 2026
cb98266
Address co-pilot comments - 2
tanishq-chugh Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,11 @@ public interface ExternalSessionsRegistry {
* Closes the external session registry.
*/
void close();

/**
 * Returns true if this registry instance currently holds a claim on the given AM.
 *
 * <p>This default implementation does no bookkeeping and always answers {@code true};
 * a registry that tracks claims is expected to override it.
 *
 * @param appId application ID of the AM being asked about
 * @return {@code true} when the AM is considered claimed by this registry instance
 */
default boolean isClaimed(String appId) {
return true; // Registries that are not ZK-based do not track claims, so the AM always counts as claimed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@
package org.apache.hadoop.hive.ql.exec.tez;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;

/**
Expand Down Expand Up @@ -133,6 +142,11 @@ public void close(boolean keepDagFilesDir) throws Exception {
// We never close external sessions that don't have errors.
try {
if (externalAppId != null) {
LOG.debug("Returning external session with appID: {}", externalAppId);
SessionState sessionState = SessionState.get();
if (sessionState != null) {
sessionState.setTezSession(null);
}
registry.returnSession(externalAppId);
}
} catch (Exception e) {
Expand Down Expand Up @@ -181,4 +195,73 @@ public boolean killQuery(String reason) throws HiveException {
killQuery.killQuery(queryId, reason, conf, false);
return true;
}

@Override
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
if (!registry.isClaimed(externalAppId)) {
throw new TezException("Cannot submit DAG as the Tez Session no-longer owns the AM: " + externalAppId);
}
try {
return getTezClient().submitDAG(dag);
} catch (TezException e) {
if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) {
throw e;
}
tryKillRunningDAGs(getTezClient());
Comment on lines +206 to +210
return getTezClient().submitDAG(dag);
}
}

/**
 * Asks the AM of this external session to kill every DAG it reports, waiting after each
 * kill request until that DAG reaches a terminal state.
 *
 * @param session Tez client used to obtain the RPC proxy to the AM
 * @throws TezException if the AM is no longer claimed by the registry, if no proxy to the AM
 *         could be obtained, or if listing, killing or waiting for a DAG fails
 */
private void tryKillRunningDAGs(TezClient session) throws TezException {
  boolean ownsAm = registry.isClaimed(externalAppId);
  if (!ownsAm) {
    throw new TezException("Cannot kill running DAG as the Tez Session no-longer owns the AM: " + externalAppId);
  }
  LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId);

  DAGClientAMProtocolBlockingPB amProxy = session.sendAMHeartbeat(null);
  if (amProxy == null) {
    throw new TezException("Error while trying to connect to AM for app ID " + externalAppId);
  }

  // NOTE(review): the per-DAG wait is read from a setting named ...WAIT_MAX_ATTEMPTS and
  // interpreted as a number of seconds - confirm this is the intended configuration knob.
  int waitSeconds = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS);
  long killTimeoutMs = TimeUnit.SECONDS.toMillis(waitSeconds);

  try {
    DAGClientAMProtocolRPC.GetAllDAGsRequestProto listRequest =
        DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build();
    DAGClientAMProtocolRPC.GetAllDAGsResponseProto listResponse = amProxy.getAllDAGs(null, listRequest);

    for (String dagId : listResponse.getDagIdList()) {
      LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId);
      DAGClientAMProtocolRPC.TryKillDAGRequestProto killRequest =
          DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
      amProxy.tryKillDAG(null, killRequest);
      // Each DAG gets the full timeout; the next one is not touched until this one is terminal.
      waitForDagTerminal(amProxy, dagId, killTimeoutMs);
    }
  } catch (Exception e) {
    // RPC failures and wait timeouts alike are reported as a single kill failure, cause preserved.
    throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e);
  }
}

/**
 * Polls the AM for the status of a DAG until it reports a terminal state or the timeout elapses.
 *
 * <p>Elapsed time is measured with the monotonic {@link System#nanoTime()} clock and is read
 * once per iteration, so the value used to decide whether to keep polling is the same value
 * used to bound the next status request. The timeout sent with a request is therefore never
 * derived from a non-positive remaining time.
 *
 * @param proxy RPC proxy to the AM that owns the DAG
 * @param dagId identifier of the DAG to wait for
 * @param timeoutMs maximum total time to wait, in milliseconds
 * @throws TezException if the DAG has not reached a terminal state within {@code timeoutMs}
 * @throws ServiceException if a status request to the AM fails
 */
private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs)
    throws TezException, ServiceException {
  long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
  long startNanos = System.nanoTime();
  long remainingMs = timeoutMs;
  while (remainingMs > 0) {
    // remainingMs is strictly positive here. NOTE(review): a negative request timeout appears to
    // mean "block until the DAG finishes" on the AM side - confirm against the Tez documentation.
    DAGClientAMProtocolRPC.GetDAGStatusResponseProto response = proxy.getDAGStatus(null,
        DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
            .setDagId(dagId)
            .setTimeout(Math.min(pollIntervalMs, remainingMs))
            .build());
    if (response.hasDagStatus() && response.getDagStatus().hasState()
        && isTerminalDagState(response.getDagStatus().getState())) {
      LOG.info("External session: dagId {} on app ID {} reached terminal state {}", dagId, externalAppId,
          response.getDagStatus().getState());
      return;
    }
    // Single clock read per iteration: it drives both the loop condition and the next request bound.
    remainingMs = timeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
  }
  throw new TezException("Timed out after " + timeoutMs + " ms waiting for orphan DAG " + dagId
      + " on app ID " + externalAppId + " to reach terminal state after kill");
}

/**
 * Tells whether the given DAG state is one of the states this class treats as final.
 *
 * @param state DAG state reported by the AM
 * @return {@code true} for succeeded, killed, failed and error states; {@code false} otherwise
 */
private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) {
return switch (state) {
// All four outcomes mean the DAG is finished, regardless of how it ended.
case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true;
// Any other state is treated as not yet finished.
default -> false;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

/**
Expand Down Expand Up @@ -86,6 +88,7 @@ public String toString() {

HiveConf getConf();
TezClient getTezClient();
/**
 * Submits the given DAG for execution through this session.
 *
 * @param dag the DAG to submit
 * @return the client for the submitted DAG
 * @throws TezException if the submission is rejected or fails
 * @throws IOException if an I/O error occurs during submission
 */
DAGClient submitDAG(DAG dag) throws TezException, IOException;
boolean isOpen();
boolean isOpening();
boolean getDoAsEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,20 @@ void returnSession(TezSession tezSessionState) {
+ " belongs to the pool. Put it back in");
defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState);
}

if (useExternalSessions && !tezSessionState.isDefault()) {
if (tezSessionState.getTezClient() != null
&& tezSessionState.getTezClient().getAppMasterApplicationId() != null) {
try {
tezSessionState.close(false);
} catch (Exception ex) {
LOG.warn("Failed to return external Tez session {}", tezSessionState.getSessionId(), ex);
}
} else {
LOG.warn("Not returning session '{}' as tez client or app id is null", tezSessionState.getSessionId());
}
}

// non default session nothing changes. The user can continue to use the existing
// session in the SessionState
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -337,6 +339,11 @@ public TezClient getTezClient() {
return baseSession.getTezClient();
}

/**
 * Forwards the DAG submission unchanged to the wrapped base session.
 */
@Override
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
return baseSession.submitDAG(dag);
}

@Override
public boolean isOpening() {
return baseSession.isOpening();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
Expand Down Expand Up @@ -820,6 +822,11 @@ public TezClient getTezClient() {
return session;
}

/**
 * Submits the DAG directly through this session's Tez client, with no additional checks.
 */
@Override
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
return getTezClient().submitDAG(dag);
}

@Override
public LocalResource getAppJarLr() {
return appJarLr;
Expand Down
7 changes: 3 additions & 4 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,8 @@ void ensureSessionHasResources(
TezSession session, String[] nonConfResources) throws Exception {
TezClient client = session.getTezClient();
// TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ?
if (client == null) {
// Note: the only sane case where this can happen is the non-pool one. We should get rid
// of it, in non-pool case perf doesn't matter so we might as well open at get time
if (client == null || !session.isOpen()) {
// Note: We should get rid of it, in non-pool case perf doesn't matter so we might as well open at get time
// and then call update like we do in the else.
// Can happen if the user sets the tez flag after the session was established.
LOG.info("Tez session hasn't been created yet. Opening session");
Expand Down Expand Up @@ -692,7 +691,7 @@ DAGClient submit(DAG dag, Ref<TezSession> sessionStateRef) throws Exception {

private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException {
runtimeContext.init(sessionState);
return sessionState.getTezClient().submitDAG(dag);
return sessionState.submitDAG(dag);
}

private void sessionDestroyOrReturnToPool(Ref<TezSession> sessionStateRef,
Expand Down
Loading
Loading