From c8bc3d0626d5885de54831400bb3df6967030b96 Mon Sep 17 00:00:00 2001 From: Anupam Yadav Date: Thu, 23 Apr 2026 16:56:38 +0000 Subject: [PATCH] Flink: Fix JdbcLockFactory to allow ClientPoolImpl connection retry (#15759) Remove inner try-catch blocks in JdbcLock.unlock() and instanceId() that wrapped SQLException as UncheckedSQLException (RuntimeException). This prevented ClientPoolImpl from catching the exception for retry, since it only catches the declared exception type (SQLException). Added tests demonstrating that: - SQLException propagates to ClientPoolImpl enabling retry on transient failures - Wrapping as UncheckedSQLException (old behavior) bypasses retry --- .../maintenance/api/JdbcLockFactory.java | 7 -- .../maintenance/api/TestJdbcLockFactory.java | 64 +++++++++++++++++++ .../maintenance/api/JdbcLockFactory.java | 7 -- .../maintenance/api/TestJdbcLockFactory.java | 64 +++++++++++++++++++ .../maintenance/api/JdbcLockFactory.java | 7 -- .../maintenance/api/TestJdbcLockFactory.java | 64 +++++++++++++++++++ 6 files changed, 192 insertions(+), 21 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java index f68605accc57..30e95b1edba0 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java @@ -260,10 +260,6 @@ public void unlock() { this, instanceId, count); - } catch (SQLException e) { - // SQL exception happened when deleting lock information - throw new UncheckedSQLException( - e, "Failed to delete %s lock with instanceId %s", this, instanceId); } return null; @@ -298,9 +294,6 @@ private String instanceId() { return null; } } - } catch (SQLException e) { - // SQL exception happened when getting lock information - throw new UncheckedSQLException(e, "Failed to get lock information for %s", type); } }); } catch (InterruptedException e) { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java index 3cb18ffbb77e..c5ca92827684 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java @@ -19,11 +19,18 @@ package org.apache.iceberg.flink.maintenance.api; import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.sql.SQLTransientConnectionException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.jdbc.UncheckedSQLException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; class TestJdbcLockFactory extends TestLockFactoryBase { @Override @@ -38,4 +45,61 @@ TriggerLockFactory lockFactory(String tableName) { tableName, properties); } + + @Test + void testSQLExceptionEnablesRetryInClientPool() throws Exception { + // Regression test for #15759: verify that removing the inner try-catch allows + // ClientPoolImpl to retry on transient connection failures. + // + // Before the fix: inner catch converted SQLException -> UncheckedSQLException + // (RuntimeException) inside the lambda. ClientPoolImpl only catches the declared + // exception type (SQLException), so RuntimeException bypasses retry entirely. + // After the fix: SQLException propagates naturally, ClientPoolImpl catches it, + // and retries on transient connection exceptions. + Map props = Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + + try (JdbcClientPool pool = new JdbcClientPool(1, uri, props)) { + AtomicInteger attempts = new AtomicInteger(0); + + String result = + pool.run( + conn -> { + if (attempts.incrementAndGet() == 1) { + throw new SQLTransientConnectionException("transient failure"); + } + return "success"; + }); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isGreaterThan(1); + } + } + + @Test + void testUncheckedSQLExceptionBypassesRetry() throws Exception { + // Companion test: demonstrates that wrapping SQLException as UncheckedSQLException + // (the OLD behavior before the fix) prevents ClientPoolImpl from retrying. + Map props = Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + + try (JdbcClientPool pool = new JdbcClientPool(1, uri, props)) { + assertThatThrownBy( + () -> + pool.run( + conn -> { + try { + throw new SQLTransientConnectionException("transient failure"); + } catch (java.sql.SQLException e) { + throw new UncheckedSQLException(e, "wrapped"); + } + })) + .isInstanceOf(UncheckedSQLException.class) + .hasMessageContaining("wrapped"); + } + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java index f68605accc57..30e95b1edba0 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java @@ -260,10 +260,6 @@ public void unlock() { this, instanceId, count); - } catch (SQLException e) { - // SQL exception happened when deleting lock information - throw new UncheckedSQLException( - e, "Failed to delete %s lock with instanceId %s", this, instanceId); } return null; @@ -298,9 +294,6 @@ private String instanceId() { return null; } } - } catch (SQLException e) { - // SQL exception happened when getting lock information - throw new UncheckedSQLException(e, "Failed to get lock information for %s", type); } }); } catch (InterruptedException e) { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java index 3cb18ffbb77e..c5ca92827684 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java @@ -19,11 +19,18 @@ package org.apache.iceberg.flink.maintenance.api; import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.sql.SQLTransientConnectionException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.jdbc.UncheckedSQLException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; class TestJdbcLockFactory extends TestLockFactoryBase { @Override @@ -38,4 +45,61 @@ TriggerLockFactory lockFactory(String tableName) { tableName, properties); } + + @Test + void testSQLExceptionEnablesRetryInClientPool() throws Exception { + // Regression test for #15759: verify that removing the inner try-catch allows + // ClientPoolImpl to retry on transient connection failures. + // + // Before the fix: inner catch converted SQLException -> UncheckedSQLException + // (RuntimeException) inside the lambda. ClientPoolImpl only catches the declared + // exception type (SQLException), so RuntimeException bypasses retry entirely. + // After the fix: SQLException propagates naturally, ClientPoolImpl catches it, + // and retries on transient connection exceptions. + Map props = Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + + try (JdbcClientPool pool = new JdbcClientPool(1, uri, props)) { + AtomicInteger attempts = new AtomicInteger(0); + + String result = + pool.run( + conn -> { + if (attempts.incrementAndGet() == 1) { + throw new SQLTransientConnectionException("transient failure"); + } + return "success"; + }); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isGreaterThan(1); + } + } + + @Test + void testUncheckedSQLExceptionBypassesRetry() throws Exception { + // Companion test: demonstrates that wrapping SQLException as UncheckedSQLException + // (the OLD behavior before the fix) prevents ClientPoolImpl from retrying. + Map props = Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + + try (JdbcClientPool pool = new JdbcClientPool(1, uri, props)) { + assertThatThrownBy( + () -> + pool.run( + conn -> { + try { + throw new SQLTransientConnectionException("transient failure"); + } catch (java.sql.SQLException e) { + throw new UncheckedSQLException(e, "wrapped"); + } + })) + .isInstanceOf(UncheckedSQLException.class) + .hasMessageContaining("wrapped"); + } + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java index f68605accc57..30e95b1edba0 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java @@ -260,10 +260,6 @@ public void unlock() { this, instanceId, count); - } catch (SQLException e) { - // SQL exception happened when deleting lock information - throw new UncheckedSQLException( - e, "Failed to delete %s lock with instanceId %s", this, instanceId); } return null; @@ -298,9 +294,6 @@ private String instanceId() { return null; } } - } catch (SQLException e) { - // SQL exception happened when getting lock information - throw new UncheckedSQLException(e, "Failed to get lock information for %s", type); } }); } catch (InterruptedException e) { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java index 3cb18ffbb77e..c5ca92827684 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java @@ -19,11 +19,18 @@ package org.apache.iceberg.flink.maintenance.api; import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.sql.SQLTransientConnectionException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.jdbc.UncheckedSQLException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; class TestJdbcLockFactory extends TestLockFactoryBase { @Override @@ -38,4 +45,61 @@ TriggerLockFactory lockFactory(String tableName) { tableName, properties); } + + @Test + void testSQLExceptionEnablesRetryInClientPool() throws Exception { + // Regression test for #15759: verify that removing the inner try-catch allows + // ClientPoolImpl to retry on transient connection failures. + // + // Before the fix: inner catch converted SQLException -> UncheckedSQLException + // (RuntimeException) inside the lambda. ClientPoolImpl only catches the declared + // exception type (SQLException), so RuntimeException bypasses retry entirely. + // After the fix: SQLException propagates naturally, ClientPoolImpl catches it, + // and retries on transient connection exceptions. + Map props = Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + + try (JdbcClientPool pool = new JdbcClientPool(1, uri, props)) { + AtomicInteger attempts = new AtomicInteger(0); + + String result = + pool.run( + conn -> { + if (attempts.incrementAndGet() == 1) { + throw new SQLTransientConnectionException("transient failure"); + } + return "success"; + }); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isGreaterThan(1); + } + } + + @Test + void testUncheckedSQLExceptionBypassesRetry() throws Exception { + // Companion test: demonstrates that wrapping SQLException as UncheckedSQLException + // (the OLD behavior before the fix) prevents ClientPoolImpl from retrying. + Map props = Maps.newHashMap(); + props.put("username", "user"); + props.put("password", "password"); + String uri = "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""); + + try (JdbcClientPool pool = new JdbcClientPool(1, uri, props)) { + assertThatThrownBy( + () -> + pool.run( + conn -> { + try { + throw new SQLTransientConnectionException("transient failure"); + } catch (java.sql.SQLException e) { + throw new UncheckedSQLException(e, "wrapped"); + } + })) + .isInstanceOf(UncheckedSQLException.class) + .hasMessageContaining("wrapped"); + } + } }