Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,6 @@ public void unlock() {
this,
instanceId,
count);
} catch (SQLException e) {
// SQL exception happened when deleting lock information
throw new UncheckedSQLException(
e, "Failed to delete %s lock with instanceId %s", this, instanceId);
}

return null;
Expand Down Expand Up @@ -298,9 +294,6 @@ private String instanceId() {
return null;
}
}
} catch (SQLException e) {
// SQL exception happened when getting lock information
throw new UncheckedSQLException(e, "Failed to get lock information for %s", type);
}
});
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.iceberg.flink.maintenance.api;

import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;

class TestJdbcLockFactory extends TestLockFactoryBase {
@Override
Expand All @@ -38,4 +41,30 @@ TriggerLockFactory lockFactory(String tableName) {
tableName,
properties);
}

@Test
void testUnlockAfterConnectionClosedThrowsException() throws Exception {
// Verify that when the pool is closed, exceptions propagate without being
// swallowed inside the lambda. This validates that the inner catch blocks
// were removed, allowing ClientPoolImpl's retry mechanism to work.
TriggerLockFactory.Lock lock = lockFactory.createLock();
assertThat(lock.tryLock()).isTrue();

// Close the underlying connection pool to force an exception
lockFactory.close();

// Should throw - the important thing is that no UncheckedSQLException is thrown
// from inside the lambda (which would bypass ClientPoolImpl retry logic)
assertThatThrownBy(lock::unlock).isInstanceOf(IllegalStateException.class);
}

@Test
void testIsHeldAfterConnectionClosedThrowsException() throws Exception {
TriggerLockFactory.Lock lock = lockFactory.createLock();
assertThat(lock.tryLock()).isTrue();

lockFactory.close();

assertThatThrownBy(lock::isHeld).isInstanceOf(IllegalStateException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,6 @@ public void unlock() {
this,
instanceId,
count);
} catch (SQLException e) {
// SQL exception happened when deleting lock information
throw new UncheckedSQLException(
e, "Failed to delete %s lock with instanceId %s", this, instanceId);
}

return null;
Expand Down Expand Up @@ -298,9 +294,6 @@ private String instanceId() {
return null;
}
}
} catch (SQLException e) {
// SQL exception happened when getting lock information
throw new UncheckedSQLException(e, "Failed to get lock information for %s", type);
}
});
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.iceberg.flink.maintenance.api;

import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;

class TestJdbcLockFactory extends TestLockFactoryBase {
@Override
Expand All @@ -38,4 +41,30 @@ TriggerLockFactory lockFactory(String tableName) {
tableName,
properties);
}

@Test
void testUnlockAfterConnectionClosedThrowsException() throws Exception {
// Verify that when the pool is closed, exceptions propagate without being
// swallowed inside the lambda. This validates that the inner catch blocks
// were removed, allowing ClientPoolImpl's retry mechanism to work.
TriggerLockFactory.Lock lock = lockFactory.createLock();
assertThat(lock.tryLock()).isTrue();

// Close the underlying connection pool to force an exception
lockFactory.close();

// Should throw - the important thing is that no UncheckedSQLException is thrown
// from inside the lambda (which would bypass ClientPoolImpl retry logic)
assertThatThrownBy(lock::unlock).isInstanceOf(IllegalStateException.class);
}

@Test
void testIsHeldAfterConnectionClosedThrowsException() throws Exception {
TriggerLockFactory.Lock lock = lockFactory.createLock();
assertThat(lock.tryLock()).isTrue();

lockFactory.close();

assertThatThrownBy(lock::isHeld).isInstanceOf(IllegalStateException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,6 @@ public void unlock() {
this,
instanceId,
count);
} catch (SQLException e) {
// SQL exception happened when deleting lock information
throw new UncheckedSQLException(
e, "Failed to delete %s lock with instanceId %s", this, instanceId);
}

return null;
Expand Down Expand Up @@ -298,9 +294,6 @@ private String instanceId() {
return null;
}
}
} catch (SQLException e) {
// SQL exception happened when getting lock information
throw new UncheckedSQLException(e, "Failed to get lock information for %s", type);
}
});
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.iceberg.flink.maintenance.api;

import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;

class TestJdbcLockFactory extends TestLockFactoryBase {
@Override
Expand All @@ -38,4 +41,30 @@ TriggerLockFactory lockFactory(String tableName) {
tableName,
properties);
}

@Test
void testUnlockAfterConnectionClosedThrowsException() throws Exception {
// Verify that when the pool is closed, exceptions propagate without being
// swallowed inside the lambda. This validates that the inner catch blocks
// were removed, allowing ClientPoolImpl's retry mechanism to work.
TriggerLockFactory.Lock lock = lockFactory.createLock();
assertThat(lock.tryLock()).isTrue();

// Close the underlying connection pool to force an exception
lockFactory.close();

// Should throw - the important thing is that no UncheckedSQLException is thrown
// from inside the lambda (which would bypass ClientPoolImpl retry logic)
assertThatThrownBy(lock::unlock).isInstanceOf(IllegalStateException.class);
}

@Test
void testIsHeldAfterConnectionClosedThrowsException() throws Exception {
TriggerLockFactory.Lock lock = lockFactory.createLock();
assertThat(lock.tryLock()).isTrue();

lockFactory.close();

assertThatThrownBy(lock::isHeld).isInstanceOf(IllegalStateException.class);
}
}