diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java index f68605accc57..30e95b1edba0 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java @@ -260,10 +260,6 @@ public void unlock() { this, instanceId, count); - } catch (SQLException e) { - // SQL exception happened when deleting lock information - throw new UncheckedSQLException( - e, "Failed to delete %s lock with instanceId %s", this, instanceId); } return null; @@ -298,9 +294,6 @@ private String instanceId() { return null; } } - } catch (SQLException e) { - // SQL exception happened when getting lock information - throw new UncheckedSQLException(e, "Failed to get lock information for %s", type); } }); } catch (InterruptedException e) { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java index 3cb18ffbb77e..0df2466f4f28 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java @@ -19,11 +19,14 @@ package org.apache.iceberg.flink.maintenance.api; import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Map; import java.util.UUID; import org.apache.iceberg.jdbc.JdbcCatalog; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; class TestJdbcLockFactory extends TestLockFactoryBase { @Override @@ -38,4 +41,63 @@ TriggerLockFactory lockFactory(String tableName) { tableName, properties); } + + @Test + void testSQLExceptionEnablesRetryInClientPool() throws Exception { + // Regression test for #15759: verify that removing the inner try-catch allows + // ClientPoolImpl to retry on transient connection failures. + // + // Before the fix: inner catch converted SQLException -> UncheckedSQLException + // (RuntimeException) inside the lambda. ClientPoolImpl only catches the declared + // exception type (SQLException), so RuntimeException bypasses retry entirely. + // After the fix: SQLException propagates naturally, ClientPoolImpl catches it, + // and retries on transient connection exceptions. + // + // We test this directly using JdbcClientPool: a lambda that throws + // SQLTransientConnectionException should trigger retry. Without the fix, + // wrapping it as UncheckedSQLException would prevent retry. + java.util.Map props = org.apache.iceberg.relocated.com.google.common.collect.Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + java.util.UUID.randomUUID().toString().replace("-", ""); + + try (org.apache.iceberg.jdbc.JdbcClientPool pool = new org.apache.iceberg.jdbc.JdbcClientPool(1, uri, props)) { + java.util.concurrent.atomic.AtomicInteger attempts = new java.util.concurrent.atomic.AtomicInteger(0); + + // This lambda simulates the FIXED code path: SQLException propagates naturally. + // ClientPoolImpl should catch it, check isConnectionException(), and retry. + // On second attempt, succeed. + String result = pool.run(conn -> { + if (attempts.incrementAndGet() == 1) { + throw new java.sql.SQLTransientConnectionException("transient failure"); + } + return "success"; + }); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isGreaterThan(1); + } + } + + @Test + void testUncheckedSQLExceptionBypassesRetry() throws Exception { + // Companion test: demonstrates that wrapping SQLException as UncheckedSQLException + // (the OLD behavior before the fix) prevents ClientPoolImpl from retrying. + java.util.Map props = org.apache.iceberg.relocated.com.google.common.collect.Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + java.util.UUID.randomUUID().toString().replace("-", ""); + + try (org.apache.iceberg.jdbc.JdbcClientPool pool = new org.apache.iceberg.jdbc.JdbcClientPool(1, uri, props)) { + // This lambda simulates the OLD code path: catch SQLException, wrap as RuntimeException. + // ClientPoolImpl does NOT catch RuntimeException, so no retry happens. + assertThatThrownBy(() -> pool.run(conn -> { + try { + throw new java.sql.SQLTransientConnectionException("transient failure"); + } catch (java.sql.SQLException e) { + throw new org.apache.iceberg.jdbc.UncheckedSQLException(e, "wrapped"); + } + })).isInstanceOf(org.apache.iceberg.jdbc.UncheckedSQLException.class); + } + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java index f68605accc57..30e95b1edba0 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java @@ -260,10 +260,6 @@ public void unlock() { this, instanceId, count); - } catch (SQLException e) { - // SQL exception happened when deleting lock information - throw new UncheckedSQLException( - e, "Failed to delete %s lock with instanceId %s", this, instanceId); } return null; @@ -298,9 +294,6 @@ private String instanceId() { return null; } } - } catch (SQLException e) { - // SQL exception happened when getting lock information - throw new UncheckedSQLException(e, "Failed to get lock information for %s", type); } }); } catch (InterruptedException e) { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java index 3cb18ffbb77e..0df2466f4f28 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java @@ -19,11 +19,14 @@ package org.apache.iceberg.flink.maintenance.api; import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Map; import java.util.UUID; import org.apache.iceberg.jdbc.JdbcCatalog; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; class TestJdbcLockFactory extends TestLockFactoryBase { @Override @@ -38,4 +41,63 @@ TriggerLockFactory lockFactory(String tableName) { tableName, properties); } + + @Test + void testSQLExceptionEnablesRetryInClientPool() throws Exception { + // Regression test for #15759: verify that removing the inner try-catch allows + // ClientPoolImpl to retry on transient connection failures. + // + // Before the fix: inner catch converted SQLException -> UncheckedSQLException + // (RuntimeException) inside the lambda. ClientPoolImpl only catches the declared + // exception type (SQLException), so RuntimeException bypasses retry entirely. + // After the fix: SQLException propagates naturally, ClientPoolImpl catches it, + // and retries on transient connection exceptions. + // + // We test this directly using JdbcClientPool: a lambda that throws + // SQLTransientConnectionException should trigger retry. Without the fix, + // wrapping it as UncheckedSQLException would prevent retry. + java.util.Map props = org.apache.iceberg.relocated.com.google.common.collect.Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + java.util.UUID.randomUUID().toString().replace("-", ""); + + try (org.apache.iceberg.jdbc.JdbcClientPool pool = new org.apache.iceberg.jdbc.JdbcClientPool(1, uri, props)) { + java.util.concurrent.atomic.AtomicInteger attempts = new java.util.concurrent.atomic.AtomicInteger(0); + + // This lambda simulates the FIXED code path: SQLException propagates naturally. + // ClientPoolImpl should catch it, check isConnectionException(), and retry. + // On second attempt, succeed. + String result = pool.run(conn -> { + if (attempts.incrementAndGet() == 1) { + throw new java.sql.SQLTransientConnectionException("transient failure"); + } + return "success"; + }); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isGreaterThan(1); + } + } + + @Test + void testUncheckedSQLExceptionBypassesRetry() throws Exception { + // Companion test: demonstrates that wrapping SQLException as UncheckedSQLException + // (the OLD behavior before the fix) prevents ClientPoolImpl from retrying. + java.util.Map props = org.apache.iceberg.relocated.com.google.common.collect.Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + java.util.UUID.randomUUID().toString().replace("-", ""); + + try (org.apache.iceberg.jdbc.JdbcClientPool pool = new org.apache.iceberg.jdbc.JdbcClientPool(1, uri, props)) { + // This lambda simulates the OLD code path: catch SQLException, wrap as RuntimeException. + // ClientPoolImpl does NOT catch RuntimeException, so no retry happens. + assertThatThrownBy(() -> pool.run(conn -> { + try { + throw new java.sql.SQLTransientConnectionException("transient failure"); + } catch (java.sql.SQLException e) { + throw new org.apache.iceberg.jdbc.UncheckedSQLException(e, "wrapped"); + } + })).isInstanceOf(org.apache.iceberg.jdbc.UncheckedSQLException.class); + } + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java index f68605accc57..30e95b1edba0 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java @@ -260,10 +260,6 @@ public void unlock() { this, instanceId, count); - } catch (SQLException e) { - // SQL exception happened when deleting lock information - throw new UncheckedSQLException( - e, "Failed to delete %s lock with instanceId %s", this, instanceId); } return null; @@ -298,9 +294,6 @@ private String instanceId() { return null; } } - } catch (SQLException e) { - // SQL exception happened when getting lock information - throw new UncheckedSQLException(e, "Failed to get lock information for %s", type); } }); } catch (InterruptedException e) { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java index 3cb18ffbb77e..0df2466f4f28 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java @@ -19,11 +19,14 @@ package org.apache.iceberg.flink.maintenance.api; import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Map; import java.util.UUID; import org.apache.iceberg.jdbc.JdbcCatalog; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; class TestJdbcLockFactory extends TestLockFactoryBase { @Override @@ -38,4 +41,63 @@ TriggerLockFactory lockFactory(String tableName) { tableName, properties); } + + @Test + void testSQLExceptionEnablesRetryInClientPool() throws Exception { + // Regression test for #15759: verify that removing the inner try-catch allows + // ClientPoolImpl to retry on transient connection failures. + // + // Before the fix: inner catch converted SQLException -> UncheckedSQLException + // (RuntimeException) inside the lambda. ClientPoolImpl only catches the declared + // exception type (SQLException), so RuntimeException bypasses retry entirely. + // After the fix: SQLException propagates naturally, ClientPoolImpl catches it, + // and retries on transient connection exceptions. + // + // We test this directly using JdbcClientPool: a lambda that throws + // SQLTransientConnectionException should trigger retry. Without the fix, + // wrapping it as UncheckedSQLException would prevent retry. + java.util.Map props = org.apache.iceberg.relocated.com.google.common.collect.Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + java.util.UUID.randomUUID().toString().replace("-", ""); + + try (org.apache.iceberg.jdbc.JdbcClientPool pool = new org.apache.iceberg.jdbc.JdbcClientPool(1, uri, props)) { + java.util.concurrent.atomic.AtomicInteger attempts = new java.util.concurrent.atomic.AtomicInteger(0); + + // This lambda simulates the FIXED code path: SQLException propagates naturally. + // ClientPoolImpl should catch it, check isConnectionException(), and retry. + // On second attempt, succeed. + String result = pool.run(conn -> { + if (attempts.incrementAndGet() == 1) { + throw new java.sql.SQLTransientConnectionException("transient failure"); + } + return "success"; + }); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isGreaterThan(1); + } + } + + @Test + void testUncheckedSQLExceptionBypassesRetry() throws Exception { + // Companion test: demonstrates that wrapping SQLException as UncheckedSQLException + // (the OLD behavior before the fix) prevents ClientPoolImpl from retrying. + java.util.Map props = org.apache.iceberg.relocated.com.google.common.collect.Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + java.util.UUID.randomUUID().toString().replace("-", ""); + + try (org.apache.iceberg.jdbc.JdbcClientPool pool = new org.apache.iceberg.jdbc.JdbcClientPool(1, uri, props)) { + // This lambda simulates the OLD code path: catch SQLException, wrap as RuntimeException. + // ClientPoolImpl does NOT catch RuntimeException, so no retry happens. + assertThatThrownBy(() -> pool.run(conn -> { + try { + throw new java.sql.SQLTransientConnectionException("transient failure"); + } catch (java.sql.SQLException e) { + throw new org.apache.iceberg.jdbc.UncheckedSQLException(e, "wrapped"); + } + })).isInstanceOf(org.apache.iceberg.jdbc.UncheckedSQLException.class); + } + } }