diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index ee9c9012f..36b10924f 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -230,19 +230,23 @@ public synchronized void stop(boolean shutdownContext) { LOG.warn("Exception while waiting for end session reply.", e); Utils.propagate(e); } finally { + IOException ex = new IOException("RSCClient instance stopped."); if (driverRpc.isSuccess()) { try { driverRpc.get().close(); } catch (Exception e) { LOG.warn("Error stopping RPC.", e); } + } else if (!driverRpc.isDone()){ + driverRpc.setFailure(ex); + LOG.warn("Set driverRpc as failure in stopping RSCClient."); } // Report failure for all pending jobs, so that clients can react. for (Map.Entry> e : jobs.entrySet()) { LOG.info("Failing pending job {} due to shutdown.", e.getKey()); try { - e.getValue().setFailure(new IOException("RSCClient instance stopped.")); + e.getValue().setFailure(ex); } catch (Exception e2) { LOG.info("Job " + e.getKey() + " already failed.", e2); } diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 8346b4b5b..d1d16366f 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -356,6 +356,8 @@ object LivyConf { val SESSION_ALLOW_CUSTOM_CLASSPATH = Entry("livy.server.session.allow-custom-classpath", false) + val REQUEST_TIMEOUT = Entry("livy.server.request.timeout", "3s") + val SPARK_MASTER = "spark.master" val SPARK_DEPLOY_MODE = "spark.submit.deployMode" val SPARK_JARS = "spark.jars" diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 4250794dc..2471815a2 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -566,14 +566,16 @@ class InteractiveSession( } def statements: IndexedSeq[Statement] = { - ensureActive() - val r = client.get.getReplJobResults().get() + ensureRunning() + val r = client.get.getReplJobResults().get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) r.statements.toIndexedSeq } def getStatement(stmtId: Int): Option[Statement] = { - ensureActive() - val r = client.get.getReplJobResults(stmtId, 1).get() + ensureRunning() + val r = client.get.getReplJobResults(stmtId, 1).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) if (r.statements.length < 1) { None } else { @@ -625,28 +627,31 @@ class InteractiveSession( } def addFile(uri: URI): Unit = { - ensureActive() + ensureRunning() recordActivity() - client.get.addFile(resolveURI(uri, livyConf)).get() + client.get.addFile(resolveURI(uri, livyConf)).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) } def addJar(uri: URI): Unit = { - ensureActive() + ensureRunning() recordActivity() - client.get.addJar(resolveURI(uri, livyConf)).get() + client.get.addJar(resolveURI(uri, livyConf)).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) } def jobStatus(id: Long): Any = { - ensureActive() + ensureRunning() val clientJobId = operations(id) recordActivity() // TODO: don't block indefinitely? - val status = client.get.getBypassJobStatus(clientJobId).get() + val status = client.get.getBypassJobStatus(clientJobId).get( + livyConf.getTimeAsMs(LivyConf.REQUEST_TIMEOUT), TimeUnit.MILLISECONDS) new JobStatus(id, status.state, status.result, status.error) } def cancelJob(id: Long): Unit = { - ensureActive() + ensureRunning() recordActivity() operations.remove(id).foreach { client.get.cancel } } @@ -689,7 +694,7 @@ class InteractiveSession( } private def performOperation(job: Array[Byte], jobType: String, sync: Boolean): Long = { - ensureActive() + ensureRunning() recordActivity() val future = client.get.bypass(ByteBuffer.wrap(job), jobType, sync) val opId = operationCounter.incrementAndGet()