From 802c59fabb9a058e0807cfc745441749e0ee34d4 Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Fri, 5 Jun 2026 17:11:39 +0900 Subject: [PATCH 1/5] [SPARK-57274][CONNECT] Support fetch/type accessors and getMoreResults for SparkConnectStatement Signed-off-by: Jiwon Park --- .../client/jdbc/SparkConnectConnection.scala | 29 +++++- .../client/jdbc/SparkConnectStatement.scala | 74 +++++++++++---- .../jdbc/SparkConnectStatementSuite.scala | 90 +++++++++++++++++++ 3 files changed, 173 insertions(+), 20 deletions(-) diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala index 21b9471bb6069..a4d1cb5e802cc 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala +++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala @@ -98,8 +98,11 @@ class SparkConnectConnection(val url: String, val info: Properties) extends Conn override def createStatement( resultSetType: Int, resultSetConcurrency: Int, - resultSetHoldability: Int): Statement = - throw new SQLFeatureNotSupportedException + resultSetHoldability: Int): Statement = { + // holdability is ignored + checkSupportedResultSet(resultSetType, resultSetConcurrency) + createStatement() + } override def prepareStatement( sql: String, @@ -128,8 +131,26 @@ class SparkConnectConnection(val url: String, val info: Properties) extends Conn throw new SQLFeatureNotSupportedException override def createStatement( - resultSetType: Int, resultSetConcurrency: Int): Statement = - throw new SQLFeatureNotSupportedException + resultSetType: Int, resultSetConcurrency: Int): Statement = { + checkSupportedResultSet(resultSetType, resultSetConcurrency) + createStatement() + } + + // SCROLL_INSENSITIVE is accepted but the returned statement is forward-only. 
+ // Mirrors the Hive JDBC driver policy used by the Spark Thrift Server. + private def checkSupportedResultSet( + resultSetType: Int, resultSetConcurrency: Int): Unit = { + if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { + throw new SQLFeatureNotSupportedException( + s"ResultSet concurrency $resultSetConcurrency is not supported; " + + "only CONCUR_READ_ONLY.") + } + if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE) { + throw new SQLFeatureNotSupportedException( + s"ResultSet type $resultSetType is not supported; " + + "use TYPE_FORWARD_ONLY or TYPE_SCROLL_INSENSITIVE.") + } + } override def prepareStatement( sql: String, diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala index d1947ae93a40c..5b78363f2d7a3 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala +++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala @@ -28,6 +28,12 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { private var maxRows: Int = 0 + private var fetchSize: Int = SparkConnectStatement.DEFAULT_FETCH_SIZE + + private var queryTimeout: Int = 0 + + private var resultsExhausted: Boolean = false + @volatile private var closed: Boolean = false override def isClosed: Boolean = closed @@ -94,6 +100,7 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { // reset before executing new query operationId = null resultSet = null + resultsExhausted = false var df = conn.spark.sql(sql) if (maxRows > 0) { @@ -140,11 +147,17 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { override def getQueryTimeout: Int = { checkOpen() - 0 + queryTimeout } - override def setQueryTimeout(seconds: Int): Unit = - throw new 
SQLFeatureNotSupportedException + // stored as a hint and echoed back; Spark Connect has no client-side timeout + override def setQueryTimeout(seconds: Int): Unit = { + checkOpen() + if (seconds < 0) { + throw new SQLException("Query timeout must be zero or a positive integer.") + } + queryTimeout = seconds + } override def cancel(): Unit = { checkOpen() @@ -164,35 +177,60 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { override def getUpdateCount: Int = { checkOpen() - if (resultSet != null) { + if (resultsExhausted || resultSet != null) { -1 } else { 0 // always return 0 because affected rows is not supported yet } } - override def getMoreResults: Boolean = - throw new SQLFeatureNotSupportedException + // a single result per execute(), so there is no next one: close the current + // ResultSet and mark exhausted, flipping getUpdateCount() to -1 so drain loops end + override def getMoreResults: Boolean = { + checkOpen() + if (resultSet != null) { + resultSet.close() + resultSet = null + } + resultsExhausted = true + false + } - override def setFetchDirection(direction: Int): Unit = - throw new SQLFeatureNotSupportedException + override def setFetchDirection(direction: Int): Unit = { + checkOpen() + if (direction != ResultSet.FETCH_FORWARD) { + throw new SQLException(s"Fetch direction $direction is not supported.") + } + } - override def getFetchDirection: Int = - throw new SQLFeatureNotSupportedException + override def getFetchDirection: Int = { + checkOpen() + ResultSet.FETCH_FORWARD + } - override def setFetchSize(rows: Int): Unit = - throw new SQLFeatureNotSupportedException + // stored as a hint; Spark Connect results are forward-only and server-paginated + override def setFetchSize(rows: Int): Unit = { + checkOpen() + if (rows < 0) { + throw new SQLException("Fetch size must be zero or a positive integer.") + } + fetchSize = if (rows == 0) SparkConnectStatement.DEFAULT_FETCH_SIZE else rows + } - override def getFetchSize: Int = 
- throw new SQLFeatureNotSupportedException + override def getFetchSize: Int = { + checkOpen() + fetchSize + } override def getResultSetConcurrency: Int = { checkOpen() ResultSet.CONCUR_READ_ONLY } - override def getResultSetType: Int = - throw new SQLFeatureNotSupportedException + override def getResultSetType: Int = { + checkOpen() + ResultSet.TYPE_FORWARD_ONLY + } override def addBatch(sql: String): Unit = throw new SQLFeatureNotSupportedException @@ -265,3 +303,7 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this) } + +object SparkConnectStatement { + private val DEFAULT_FETCH_SIZE = 1000 +} diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala index fa9df3f1247f7..929d3fb27fffc 100644 --- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala @@ -116,4 +116,94 @@ class SparkConnectStatementSuite extends ConnectFunSuite with RemoteSparkSession } } } + + test("fetch size, fetch direction, result set type and query timeout accessors") { + withStatement { stmt => + // fetch size: a stored hint that defaults, resets to default on 0, and validates + assert(stmt.getFetchSize === 1000) + stmt.setFetchSize(42) + assert(stmt.getFetchSize === 42) + stmt.setFetchSize(0) + assert(stmt.getFetchSize === 1000) + val se1 = intercept[SQLException] { + stmt.setFetchSize(-1) + } + assert(se1.getMessage === "Fetch size must be zero or a positive integer.") + + // fetch direction: only FETCH_FORWARD is supported + stmt.setFetchDirection(ResultSet.FETCH_FORWARD) + assert(stmt.getFetchDirection === 
ResultSet.FETCH_FORWARD) + intercept[SQLException] { + stmt.setFetchDirection(ResultSet.FETCH_REVERSE) + } + + // result set type is forward-only + assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) + + // query timeout: a stored hint with validation + assert(stmt.getQueryTimeout === 0) + stmt.setQueryTimeout(30) + assert(stmt.getQueryTimeout === 30) + val se2 = intercept[SQLException] { + stmt.setQueryTimeout(-1) + } + assert(se2.getMessage === "Query timeout must be zero or a positive integer.") + } + } + + test("getMoreResults terminates JDBC drain loops") { + // A typical JDBC result-draining loop. It fails if getMoreResults throws and spins + // forever if getUpdateCount never flips to -1; assert that it terminates. + def drain(stmt: Statement): Unit = { + while (stmt.getMoreResults || stmt.getUpdateCount != -1) {} + } + + withTable("t_drain") { + withStatement { stmt => + // result-bearing command + assert(stmt.execute("SELECT id FROM range(3)")) + assert(stmt.getUpdateCount === -1) + drain(stmt) + assert(stmt.getResultSet === null) + assert(stmt.getUpdateCount === -1) + + // result-less command + assert(!stmt.execute("CREATE TABLE t_drain (id INT) USING Parquet")) + assert(stmt.getUpdateCount === 0) + drain(stmt) + assert(stmt.getUpdateCount === -1) + } + } + } + + test("createStatement with result set type and concurrency") { + withConnection { conn => + Using.resource( + conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { stmt => + assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) + } + + // scroll-insensitive is accepted but downgraded to forward-only + Using.resource( + conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY)) { + stmt => assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) + } + + // the holdability overload applies the same policy + Using.resource(conn.createStatement( + ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY, +
ResultSet.CLOSE_CURSORS_AT_COMMIT)) { stmt => + assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) + } + + // updatable concurrency and scroll-sensitive type are rejected + intercept[SQLFeatureNotSupportedException] { + conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE) + } + intercept[SQLFeatureNotSupportedException] { + conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY) + } + } + } } From ed8558be18c7a7e9fb9b1cb32b43db525e792b1b Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Fri, 5 Jun 2026 19:03:41 +0900 Subject: [PATCH 2/5] [SPARK-57274][CONNECT] Address review: do not store dummy queryTimeout/fetchSize state getQueryTimeout and getFetchSize now always return 0 to reflect the real state; the setters validate and silently drop the value instead of echoing it back. Remove the now-unused DEFAULT_FETCH_SIZE constant. Signed-off-by: Jiwon Park --- .../client/jdbc/SparkConnectStatement.scala | 18 ++++-------------- .../jdbc/SparkConnectStatementSuite.scala | 12 +++++------- 2 files changed, 9 insertions(+), 21 deletions(-) diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala index 5b78363f2d7a3..d796c1fd6d03a 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala +++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala @@ -28,10 +28,6 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { private var maxRows: Int = 0 - private var fetchSize: Int = SparkConnectStatement.DEFAULT_FETCH_SIZE - - private var queryTimeout: Int = 0 - private var resultsExhausted: Boolean = false @volatile private var closed: Boolean = false @@ -147,16 +143,15 @@ class SparkConnectStatement(conn: 
SparkConnectConnection) extends Statement { override def getQueryTimeout: Int = { checkOpen() - queryTimeout + 0 } - // stored as a hint and echoed back; Spark Connect has no client-side timeout + // This driver does not apply a query timeout; validate and silently drop the value. override def setQueryTimeout(seconds: Int): Unit = { checkOpen() if (seconds < 0) { throw new SQLException("Query timeout must be zero or a positive integer.") } - queryTimeout = seconds } override def cancel(): Unit = { @@ -208,18 +203,17 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { ResultSet.FETCH_FORWARD } - // stored as a hint; Spark Connect results are forward-only and server-paginated + // This driver does not apply a fetch size hint; validate and silently drop the value. override def setFetchSize(rows: Int): Unit = { checkOpen() if (rows < 0) { throw new SQLException("Fetch size must be zero or a positive integer.") } - fetchSize = if (rows == 0) SparkConnectStatement.DEFAULT_FETCH_SIZE else rows } override def getFetchSize: Int = { checkOpen() - fetchSize + 0 } override def getResultSetConcurrency: Int = { @@ -303,7 +297,3 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this) } - -object SparkConnectStatement { - private val DEFAULT_FETCH_SIZE = 1000 -} diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala index 929d3fb27fffc..bf8fd77f174e9 100644 --- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala @@ -119,12 +119,10 @@ class SparkConnectStatementSuite extends ConnectFunSuite 
with RemoteSparkSession test("fetch size, fetch direction, result set type and query timeout accessors") { withStatement { stmt => - // fetch size: a stored hint that defaults, resets to default on 0, and validates - assert(stmt.getFetchSize === 1000) + // fetch size: validated then silently dropped, always reads back as 0 + assert(stmt.getFetchSize === 0) stmt.setFetchSize(42) - assert(stmt.getFetchSize === 42) - stmt.setFetchSize(0) - assert(stmt.getFetchSize === 1000) + assert(stmt.getFetchSize === 0) val se1 = intercept[SQLException] { stmt.setFetchSize(-1) } @@ -140,10 +138,10 @@ class SparkConnectStatementSuite extends ConnectFunSuite with RemoteSparkSession // result set type is forward-only assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) - // query timeout: a stored hint with validation + // query timeout: validated then silently dropped, always reads back as 0 assert(stmt.getQueryTimeout === 0) stmt.setQueryTimeout(30) - assert(stmt.getQueryTimeout === 30) + assert(stmt.getQueryTimeout === 0) val se2 = intercept[SQLException] { stmt.setQueryTimeout(-1) } From 6d860eb1a8d8b7352e2a73d5caa1d253e1d4a3a4 Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Fri, 5 Jun 2026 19:17:18 +0900 Subject: [PATCH 3/5] [SPARK-57274][CONNECT] Address review: stringify result set constants in error messages Use JdbcErrorUtils.stringify* so error messages show readable names instead of raw int codes. Add stringifyResultSetConcurrency and apply it, along with the existing stringifyFetchDirection/stringifyResultSetType, to the remaining call sites in SparkConnectStatement and SparkConnectConnection. 
Signed-off-by: Jiwon Park --- .../sql/connect/client/jdbc/SparkConnectConnection.scala | 6 +++--- .../sql/connect/client/jdbc/SparkConnectStatement.scala | 4 +++- .../sql/connect/client/jdbc/util/JdbcErrorUtils.scala | 7 +++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala index a4d1cb5e802cc..7bec7416229e3 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala +++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala @@ -142,12 +142,12 @@ class SparkConnectConnection(val url: String, val info: Properties) extends Conn resultSetType: Int, resultSetConcurrency: Int): Unit = { if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { throw new SQLFeatureNotSupportedException( - s"ResultSet concurrency $resultSetConcurrency is not supported; " + - "only CONCUR_READ_ONLY.") + s"ResultSet concurrency ${stringifyResultSetConcurrency(resultSetConcurrency)} " + + "is not supported; only CONCUR_READ_ONLY.") } if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE) { throw new SQLFeatureNotSupportedException( - s"ResultSet type $resultSetType is not supported; " + + s"ResultSet type ${stringifyResultSetType(resultSetType)} is not supported; " + "use TYPE_FORWARD_ONLY or TYPE_SCROLL_INSENSITIVE.") } } diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala index d796c1fd6d03a..245832087268e 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala +++ 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.connect.client.jdbc import java.sql.{Array => _, _} import org.apache.spark.sql.connect.client.SparkResult +import org.apache.spark.sql.connect.client.jdbc.util.JdbcErrorUtils class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { @@ -194,7 +195,8 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement { override def setFetchDirection(direction: Int): Unit = { checkOpen() if (direction != ResultSet.FETCH_FORWARD) { - throw new SQLException(s"Fetch direction $direction is not supported.") + throw new SQLException( + s"Fetch direction ${JdbcErrorUtils.stringifyFetchDirection(direction)} is not supported.") } } diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala index 6480c5d768f3f..a732dae0d6af5 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala +++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala @@ -46,6 +46,13 @@ private[jdbc] object JdbcErrorUtils { throw new IllegalArgumentException(s"Invalid ResultSet type: $typ") } + def stringifyResultSetConcurrency(concurrency: Int): String = concurrency match { + case ResultSet.CONCUR_READ_ONLY => "CONCUR_READ_ONLY" + case ResultSet.CONCUR_UPDATABLE => "CONCUR_UPDATABLE" + case _ => + throw new IllegalArgumentException(s"Invalid ResultSet concurrency: $concurrency") + } + def stringifyFetchDirection(direction: Int): String = direction match { case ResultSet.FETCH_FORWARD => "FETCH_FORWARD" case ResultSet.FETCH_REVERSE => "FETCH_REVERSE" From e18d28162d02bf279de0f1678a9a4f488dfa13eb Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: 
Fri, 5 Jun 2026 20:11:56 +0900 Subject: [PATCH 4/5] [SPARK-57274][CONNECT] Address review: do not support createStatement with holdability Revert the 3-arg createStatement(type, concurrency, holdability) to throw SQLFeatureNotSupportedException, matching the JDBC javadoc (which permits it) and the Hive JDBC driver. Holdability has no meaning without transaction support, so accepting it while ignoring the value is misleading. Signed-off-by: Jiwon Park --- .../sql/connect/client/jdbc/SparkConnectConnection.scala | 7 ++----- .../connect/client/jdbc/SparkConnectStatementSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala index 7bec7416229e3..da223f6afb2b0 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala +++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala @@ -98,11 +98,8 @@ class SparkConnectConnection(val url: String, val info: Properties) extends Conn override def createStatement( resultSetType: Int, resultSetConcurrency: Int, - resultSetHoldability: Int): Statement = { - // holdability is ignored - checkSupportedResultSet(resultSetType, resultSetConcurrency) - createStatement() - } + resultSetHoldability: Int): Statement = + throw new SQLFeatureNotSupportedException override def prepareStatement( sql: String, diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala index bf8fd77f174e9..bd4dea293f0e0 100644 --- 
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala @@ -187,12 +187,12 @@ class SparkConnectStatementSuite extends ConnectFunSuite with RemoteSparkSession stmt => assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) } - // the holdability overload applies the same policy - Using.resource(conn.createStatement( + // the holdability overload is not supported + intercept[SQLFeatureNotSupportedException] { + conn.createStatement( ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, - ResultSet.CLOSE_CURSORS_AT_COMMIT)) { stmt => - assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) + ResultSet.CLOSE_CURSORS_AT_COMMIT) } // updatable concurrency and scroll-sensitive type are rejected From 974081d99fe4276a185f43466ce114f1b6beea5e Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Fri, 5 Jun 2026 21:40:31 +0900 Subject: [PATCH 5/5] [SPARK-57274][CONNECT] Address review: only support TYPE_FORWARD_ONLY result sets Reject TYPE_SCROLL_INSENSITIVE instead of accepting it and silently downgrading to forward-only. Spark Connect has no server-side scrollable cursor, and the previous code claimed to mirror the Hive JDBC driver while behaving differently from it (Hive backs SCROLL_INSENSITIVE with distinct ResultSet logic). Scrollable result sets can be added in a follow-up with proper client-side support. 
Signed-off-by: Jiwon Park --- .../client/jdbc/SparkConnectConnection.scala | 14 +++++++------- .../client/jdbc/SparkConnectStatementSuite.scala | 14 +++++--------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala index da223f6afb2b0..fb6816302a296 100644 --- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala +++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala @@ -133,20 +133,20 @@ class SparkConnectConnection(val url: String, val info: Properties) extends Conn createStatement() } - // SCROLL_INSENSITIVE is accepted but the returned statement is forward-only. - // Mirrors the Hive JDBC driver policy used by the Spark Thrift Server. + // Spark Connect results are forward-only and server-paginated, so only + // TYPE_FORWARD_ONLY result sets are supported. 
private def checkSupportedResultSet( resultSetType: Int, resultSetConcurrency: Int): Unit = { + if (resultSetType != ResultSet.TYPE_FORWARD_ONLY) { + throw new SQLFeatureNotSupportedException( + s"ResultSet type ${stringifyResultSetType(resultSetType)} is not supported; " + + "only TYPE_FORWARD_ONLY.") + } if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { throw new SQLFeatureNotSupportedException( s"ResultSet concurrency ${stringifyResultSetConcurrency(resultSetConcurrency)} " + "is not supported; only CONCUR_READ_ONLY.") } - if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE) { - throw new SQLFeatureNotSupportedException( - s"ResultSet type ${stringifyResultSetType(resultSetType)} is not supported; " + - "use TYPE_FORWARD_ONLY or TYPE_SCROLL_INSENSITIVE.") - } } override def prepareStatement( diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala index bd4dea293f0e0..1e768a5888e3b 100644 --- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala @@ -181,12 +181,6 @@ class SparkConnectStatementSuite extends ConnectFunSuite with RemoteSparkSession assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) } - // scroll-insensitive is accepted but downgraded to forward-only - Using.resource( - conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY)) { - stmt => assert(stmt.getResultSetType === ResultSet.TYPE_FORWARD_ONLY) - } - // the holdability overload is not supported intercept[SQLFeatureNotSupportedException] { conn.createStatement( @@ -195,12 +189,14 @@ class SparkConnectStatementSuite extends ConnectFunSuite with RemoteSparkSession 
ResultSet.CLOSE_CURSORS_AT_COMMIT) } - // updatable concurrency and scroll-sensitive type are rejected + // only TYPE_FORWARD_ONLY and CONCUR_READ_ONLY are supported intercept[SQLFeatureNotSupportedException] { conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE) } - intercept[SQLFeatureNotSupportedException] { - conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY) + Seq(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.TYPE_SCROLL_SENSITIVE).foreach { typ => + intercept[SQLFeatureNotSupportedException] { + conn.createStatement(typ, ResultSet.CONCUR_READ_ONLY) + } } } }