From 5d045cbb8d19ee55e1cc8b524318f2152ca56079 Mon Sep 17 00:00:00 2001 From: Dmitry Kasimovskiy Date: Fri, 19 Jun 2026 10:55:48 +0300 Subject: [PATCH 1/4] fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry PoolEntry state transitions had several races between netty IO threads, the HashedWheelTimer heartbeat worker, and user-facing reconnect calls that surfaced as ABBA deadlocks, NPEs on inline connect failures, and leaked connections after a KILL/reconnect cycle in CI. PoolEntry: - Synchronize state mutations; mark lastHeartbeatEvent, heartbeatTask, isHeartbeatStarted, and isLocked volatile and add an AtomicBoolean isShutdown flag for cross-thread visibility. connectFuture and reconnectTask stay plain fields whose mutations in this patch happen under the entry monitor. - Narrow the entry-monitor critical sections to field mutations; call client.close() and emit() outside the monitor to break the ABBA deadlock between ConnectionImpl and PoolEntry monitors that hung DistributingRoundRobinBalancerTest. - Return the local connect future from internalConnect() so an inline connect failure cannot leave the caller observing a null connectFuture after handleConnectError() nulls it for reconnect. - Keep client.close() in shutdown() on every invocation (closeChannel is idempotent if already closed) but guard only the onConnectionClosed emit, so a KILL-then-reconnect cycle cannot leak the new connection when its auth/ping subsequently fails. - Serialize connectAfter() reconnect-task scheduling to avoid double scheduling; add a double-check of connectFuture in internalConnect() to return the in-flight future instead of starting a new connect. IProtoClientPoolImpl: - Synchronize forEach() on connectionPoolLock to avoid CME under concurrent setGroups(). Tests (BasePoolTest / ConnectionPoolReconnectsTest): - Wait for box.stat.net().CONNECTIONS.current to stabilise in getActiveConnectionsCount: the IProto worker updates it asynchronously, so a single read often lags by 5-15 connections when 20+ are opened in a burst. 
- Collapse the wait-for-stable Lua script to a single line so tarantool emits one YAML document (SnakeYAML rejects multi-document streams). - Wait for the active connection count to reach the expected value in ConnectionPoolReconnectsTest post-reconnect assertions via a new waitForActiveConnections() helper. Verified locally on 3.5.0 and 2.11.8: ConnectionPoolReconnectsTest, ConnectionPoolTest, ConnectionPoolHeartbeatTest, DistributingRoundRobinBalancerTest, and unit tests all pass consistently where they were previously flaky. --- .../tarantool/pool/IProtoClientPoolImpl.java | 8 +- .../java/io/tarantool/pool/PoolEntry.java | 93 ++++++++++++++----- .../pool/integration/BasePoolTest.java | 42 ++++++++- .../ConnectionPoolReconnectsTest.java | 4 +- 4 files changed, 115 insertions(+), 32 deletions(-) diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java index 953c1bb..2bc7523 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java @@ -524,9 +524,11 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti @Override public void forEach(Consumer action) { - for (List group : entries.values()) { - for (PoolEntry entry : group) { - action.accept(entry.getClient()); + synchronized (connectionPoolLock) { + for (List group : entries.values()) { + for (PoolEntry entry : group) { + action.accept(entry.getClient()); + } } } } diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java index a746b9c..def6ec2 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java @@ -8,6 +8,7 @@ import java.util.ArrayDeque; import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -146,16 +147,16 @@ final class PoolEntry { private CompletableFuture connectFuture; /** Last heartbeat state/event. */ - private HeartbeatEvent lastHeartbeatEvent; + private volatile HeartbeatEvent lastHeartbeatEvent; /** Heartbeat timer/task. */ - private Timeout heartbeatTask; + private volatile Timeout heartbeatTask; /** Reconnection task. */ private Timeout reconnectTask; /** Flag signaling if heartbeat started or not. */ - private boolean isHeartbeatStarted; + private volatile boolean isHeartbeatStarted; /** * Flag signaling if connection is available or not. @@ -163,7 +164,16 @@ final class PoolEntry { *

<p>When connection comes to invalidated state or killed, pool entry is locked and connection * will not be returned to outer client. */ - private boolean isLocked; + private volatile boolean isLocked; + + /** + * Idempotency flag for {@link #shutdown()}. + * + *

<p>Guarantees that the close listener is invoked only once even if both {@link #close()} and + * {@link #shutdown()} are called, or shutdown is invoked multiple times due to overlapping + * connection error events. + */ + private final AtomicBoolean isShutdown = new AtomicBoolean(false); /** Count of failed pings occurred in invalidated state. */ private int currentDeathPings; @@ -305,7 +315,7 @@ public IProtoClient getClient() { * *

<p>Also increments count of unavailable clients. */ - public void lock() { + public synchronized void lock() { if (!isLocked) { unavailable.incrementAndGet(); isLocked = true; @@ -317,7 +327,7 @@ public void lock() { * *

<p>Also decrements count of unavailable clients and cancels reconnect task. */ - public void unlock() { + public synchronized void unlock() { if (isLocked) { stopReconnectTask(); unavailable.decrementAndGet(); @@ -340,16 +350,28 @@ public void close() { shutdown(); } - /** Closes client and stops heartbeat task is started. */ + /** + * Closes the underlying client and stops the heartbeat task. + * + *

<p>Performs field mutations under the entry monitor, then releases it before calling {@code + * client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close + * event. Holding the entry monitor across either of those calls would create an ABBA deadlock + * with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and + * then re-enters {@link #handleConnectError(Object, Throwable)} on the entry. + */ public void shutdown() { - connectFuture = null; - stopHeartbeat(); + synchronized (this) { + connectFuture = null; + stopHeartbeat(); + } try { client.close(); } catch (Exception e) { log.warn("Cannot close client in pool", e); } - emit(listener -> listener.onConnectionClosed(tag, index)); + if (isShutdown.compareAndSet(false, true)) { + emit(listener -> listener.onConnectionClosed(tag, index)); + } } /** @@ -410,15 +432,27 @@ public void stopHeartbeat() { /** * Internal method used by reconnect task and public connect. * + *

<p>Returns the local chain {@code cf} rather than the {@link #connectFuture} field. If the + * underlying {@code client.connect()} fails inline (e.g. connection refused), the {@code + * whenComplete} callback fires reentrantly on the calling thread, which causes {@link + * #handleConnectError(Object, Throwable)} to clear the field for reconnect purposes. The local + * variable still references the (failed) future, so callers always receive a non-null result and + * observe the failure via {@code ExecutionException} from {@code .get()}, while the field being + * null guarantees that the next reconnect attempt actually reconnects instead of returning a + * stale failed future. + * * @return {@link java.util.concurrent.CompletableFuture} with client */ - private CompletableFuture internalConnect() { + private synchronized CompletableFuture internalConnect() { + if (connectFuture != null) { + return connectFuture; + } log.info("connect {}/{}", tag, index); LongTaskTimer.Sample timer = startTimer(connectTime); CompletableFuture future = client.connect(group.getAddress(), connectTimeout, gracefulShutdown); String user = group.getUser(); - connectFuture = + CompletableFuture cf = future .thenCompose( greeting -> { @@ -428,9 +462,10 @@ private CompletableFuture internalConnect() { } return client.ping(firstPingOpts); }) - .thenApply(r -> client) - .whenComplete(this::onConnectComplete); - return connectFuture; + .thenApply(r -> client); + connectFuture = cf; + cf.whenComplete(this::onConnectComplete); + return cf; } /** @@ -462,6 +497,10 @@ private void onConnectComplete(Object r, Throwable exc) { /** * Handler for connection close. * + *

<p>The {@code connectFuture} reset is performed under this monitor; emit, {@link #lock()}, + * {@link #shutdown()} and {@link #connectAfter()} run outside it (see {@link #shutdown()} for + * why). + * * @param r connection instance * @param exc exception which led to connection close */ @@ -470,7 +509,9 @@ private void handleConnectError(Object r, Throwable exc) { return; } Throwable failure = exc.getCause() != null ? exc.getCause() : exc; - connectFuture = null; + synchronized (this) { + connectFuture = null; + } log.error("connect error {}/{}: {}", tag, index, failure.toString()); emit(listener -> listener.onConnectionFailed(tag, index, failure)); lock(); @@ -480,13 +521,19 @@ private void handleConnectError(Object r, Throwable exc) { /** Reconnect task scheduler. */ private void connectAfter() { - log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); - if (reconnectTask == null) { - reconnecting.incrementAndGet(); + synchronized (this) { + log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); + if (reconnectTask != null) { + // existing task is being replaced; the existing increment in `reconnecting` carries over + // to the new task, so no counter change is needed here. + reconnectTask.cancel(); + } else { + reconnecting.incrementAndGet(); + } + reconnectTask = + timerService.newTimeout( + timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); } - reconnectTask = - timerService.newTimeout( - timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter)); } @@ -658,7 +705,7 @@ private void incHeartbeatCounters(int fail) { } /** Stops reconnecting task if it is active. 
*/ - private void stopReconnectTask() { + private synchronized void stopReconnectTask() { if (reconnectTask != null) { reconnecting.decrementAndGet(); reconnectTask.cancel(); diff --git a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java index a3a048b..4323d53 100644 --- a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java +++ b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ThreadLocalRandom; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import io.micrometer.core.instrument.Counter; @@ -93,10 +94,21 @@ protected void execLua(TarantoolContainer container, String command) { protected int getActiveConnectionsCount(TarantoolContainer tt) { try { - List result = - TarantoolContainerClientHelper.executeCommandDecoded( - tt, "return box.stat.net().CONNECTIONS.current"); - return (Integer) result.get(0) - 1; + // box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto + // worker; when many connections are opened in a burst it lags behind by + // 100-500 ms. Wait for it to stabilise (fiber.sleep yields the worker so it + // can accept the pending connections) before reading the value. 
+ String lua = + "local last = box.stat.net().CONNECTIONS.current;" + + " for i = 1, 50 do" + + " require('fiber').sleep(0.05);" + + " local cur = box.stat.net().CONNECTIONS.current;" + + " if cur == last then return cur - 1 end;" + + " last = cur;" + + " end;" + + " return last - 1"; + List result = TarantoolContainerClientHelper.executeCommandDecoded(tt, lua); + return (Integer) result.get(0); } catch (Exception e) { throw new RuntimeException(e); } @@ -106,6 +118,28 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer tt, int basel return getActiveConnectionsCount(tt) - baseline; } + /** + * Asserts that the active connection count on the given Tarantool container reaches {@code + * expected}, retrying until it does. The IProto worker updates {@code + * box.stat.net().CONNECTIONS.current} asynchronously, and closing administrative connections + * (e.g. the {@code net.box} connection used by {@code executeCommandDecoded}) is asynchronous on + * the server, so a single read can briefly observe a stale or transitional value. Retrying the + * assert lets the worker converge on the final value. 
+ * + * @param tt the Tarantool container under test + * @param expected the expected number of active connections + */ + protected void waitForActiveConnections(TarantoolContainer tt, int expected) { + try { + waitFor( + "Active connections count never reached " + expected, + Duration.ofSeconds(10), + () -> assertEquals(expected, getActiveConnectionsCount(tt))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + protected MeterRegistry createMetricsRegistry() { MeterRegistry metricsRegistry = new SimpleMeterRegistry(); LongTaskTimer.builder("request.timer") diff --git a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java index 1050061..6106a47 100644 --- a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java +++ b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java @@ -77,7 +77,7 @@ public void testReconnectAfterNodeFailure() throws Exception { assertTrue(pool.hasAvailableClients()); List clients = getConnects(pool, "node-a", count1); assertTrue(pingClients(clients)); - assertEquals(count1, getActiveConnectionsCount(tt)); + waitForActiveConnections(tt, count1); tt.stop(); Thread.sleep(1000); @@ -110,7 +110,7 @@ public void testReconnectAfterNodeFailure() throws Exception { }); assertTrue(pingClients(clients)); - assertEquals(count1, getActiveConnectionsCount(tt)); + waitForActiveConnections(tt, count1); assertEquals(count1, metricsRegistry.get("pool.size").gauge().value()); assertEquals(count1, metricsRegistry.get("pool.available").gauge().value()); From 6ba4d21f5d0047d02549cf7b90d0af708cf9ef78 Mon Sep 17 00:00:00 2001 From: Dmitry Kasimovskiy Date: Fri, 19 Jun 2026 11:15:01 +0300 Subject: [PATCH 2/4] docs(changelog): add Pooling bug-fix entry for race/deadlock/leak fixes Co-Authored-By: Claude Opus 4.7 --- CHANGELOG.md | 
5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c259d11..bac13c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,11 @@ - Document supported Java types for Tarantool data mapping in `tuple_pojo_mapping` docs (RU/EN), including Tarantool extension types (`decimal`, `uuid`, `datetime`, `interval`, `tuple`) and related mapping notes. - Document Jackson MsgPack deserialization: integers, `bin`/`str` vs `byte[]`/`String`, floating-point vs `decimal`; reference `jackson-dataformat-msgpack` for defaults and type coercion. +### Pooling + +- Fix race conditions, ABBA deadlock between `PoolEntry` and `ConnectionImpl` monitors, NPE on inline connect failure, and connection leak after a KILL/reconnect cycle in `PoolEntry` and `IProtoClientPoolImpl`. +- Synchronize `IProtoClientPoolImpl.forEach()` on the pool lock to avoid `ConcurrentModificationException` under concurrent `setGroups()`. + ### Dependencies - Updated dependencies: From 3d7d7a1ff3b78c73722a389f7f55de069175ddd0 Mon Sep 17 00:00:00 2001 From: Dmitry Kasimovskiy Date: Fri, 19 Jun 2026 17:28:01 +0300 Subject: [PATCH 3/4] fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry Fix a relocated ABBA deadlock: `PoolEntry.connect()`/`internalConnect()` no longer hold the entry monitor across `client.connect()`, which would deadlock with the Netty close-callback path on the shared `ConnectionImpl` monitor during a close/reconnect overlap. Fix `onConnectionClosed` accounting: the `PoolEntry` shutdown idempotency flag is now reset per connection generation, so a KILL/reconnect cycle emits one close event per generation instead of suppressing all closes after the first. `IProtoClientPoolImpl.forEach()` snapshots clients under the pool lock and invokes the action outside it, avoiding `ConcurrentModificationException` under concurrent `setGroups()` without holding the lock across a user callback. 
--- CHANGELOG.md | 1 - .../tarantool/pool/IProtoClientPoolImpl.java | 4 +- .../java/io/tarantool/pool/PoolEntry.java | 48 +++++++++++-------- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bac13c8..819fea0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,6 @@ ### Pooling - Fix race conditions, ABBA deadlock between `PoolEntry` and `ConnectionImpl` monitors, NPE on inline connect failure, and connection leak after a KILL/reconnect cycle in `PoolEntry` and `IProtoClientPoolImpl`. -- Synchronize `IProtoClientPoolImpl.forEach()` on the pool lock to avoid `ConcurrentModificationException` under concurrent `setGroups()`. ### Dependencies - Updated dependencies: diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java index 2bc7523..dfee534 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java @@ -524,13 +524,15 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti @Override public void forEach(Consumer action) { + List clients = new ArrayList<>(); synchronized (connectionPoolLock) { for (List group : entries.values()) { for (PoolEntry entry : group) { - action.accept(entry.getClient()); + clients.add(entry.getClient()); } } } + clients.forEach(action); } /** diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java index def6ec2..e8505f0 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java @@ -167,11 +167,13 @@ final class PoolEntry { private volatile boolean isLocked; /** - * Idempotency flag for {@link #shutdown()}. 
+ * Idempotency flag for {@link #shutdown()}, scoped to a single connection generation. * - *

<p>Guarantees that the close listener is invoked only once even if both {@link #close()} and - * {@link #shutdown()} are called, or shutdown is invoked multiple times due to overlapping - * connection error events. + *

<p>Guarantees that the close listener is invoked only once per generation even if both {@link + * #close()} and {@link #shutdown()} are called, or shutdown is invoked multiple times due to + * overlapping connection error events. It is reset in {@link #internalConnect()} when a new + * connection generation begins, so a KILL/reconnect cycle still emits one {@code + * onConnectionClosed} per generation. */ private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -379,10 +381,7 @@ public void shutdown() { * * @return {@link java.util.concurrent.CompletableFuture} with client */ - public synchronized CompletableFuture connect() { - if (connectFuture != null) { - return connectFuture; - } + public CompletableFuture connect() { return internalConnect(); } @@ -432,20 +431,21 @@ public void stopHeartbeat() { /** * Internal method used by reconnect task and public connect. * - *

<p>Returns the local chain {@code cf} rather than the {@link #connectFuture} field. If the - * underlying {@code client.connect()} fails inline (e.g. connection refused), the {@code - * whenComplete} callback fires reentrantly on the calling thread, which causes {@link - * #handleConnectError(Object, Throwable)} to clear the field for reconnect purposes. The local - * variable still references the (failed) future, so callers always receive a non-null result and - * observe the failure via {@code ExecutionException} from {@code .get()}, while the field being - * null guarantees that the next reconnect attempt actually reconnects instead of returning a - * stale failed future. + *

<p>The {@code client.connect()} chain is built without holding the entry monitor: holding it + * across {@code client.connect()} (which takes the {@code ConnectionImpl} monitor) would create + * an ABBA deadlock with the Netty close-callback path, which takes the {@code ConnectionImpl} + * monitor first and then re-enters {@link #handleConnectError(Object, Throwable)} on the entry. + * Concurrent callers are de-duplicated via the {@link #connectFuture} field; if two threads race + * past the first check, the second {@code client.connect()} is a no-op because {@code + * ConnectionImpl.connect()} only opens a channel on a {@code CLOSED -> CONNECTING} transition. * * @return {@link java.util.concurrent.CompletableFuture} with client */ - private synchronized CompletableFuture internalConnect() { - if (connectFuture != null) { - return connectFuture; + private CompletableFuture internalConnect() { + synchronized (this) { + if (connectFuture != null) { + return connectFuture; + } } log.info("connect {}/{}", tag, index); LongTaskTimer.Sample timer = startTimer(connectTime); @@ -463,7 +463,15 @@ private synchronized CompletableFuture internalConnect() { return client.ping(firstPingOpts); }) .thenApply(r -> client); - connectFuture = cf; + synchronized (this) { + if (connectFuture != null) { + // ponytail: lost the race; ConnectionImpl's CLOSED->CONNECTING CAS already de-duped the + // channel, so our `cf` wraps the same promise and is safely discarded. 
+ return connectFuture; + } + connectFuture = cf; + isShutdown.set(false); + } cf.whenComplete(this::onConnectComplete); return cf; } From f8ca9232d10263aa56563de45969d3cc7ba9d63f Mon Sep 17 00:00:00 2001 From: Dmitry Kasimovskiy Date: Fri, 19 Jun 2026 18:24:57 +0300 Subject: [PATCH 4/4] fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry Fix a relocated ABBA deadlock: `PoolEntry.connect()`/`internalConnect()` no longer hold the entry monitor across `client.connect()`, which would deadlock with the Netty close-callback path on the shared `ConnectionImpl` monitor during a close/reconnect overlap. Fix `onConnectionClosed` accounting: the `PoolEntry` shutdown idempotency flag is now reset per connection generation, so a KILL/reconnect cycle emits one close event per generation instead of suppressing all closes after the first. `IProtoClientPoolImpl.forEach()` snapshots clients under the pool lock and invokes the action outside it, avoiding `ConcurrentModificationException` under concurrent `setGroups()` without holding the lock across a user callback. --- .../java/io/tarantool/pool/PoolEntry.java | 24 ++++--------------- .../pool/integration/BasePoolTest.java | 14 ++++------- 2 files changed, 8 insertions(+), 30 deletions(-) diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java index e8505f0..bfe401c 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java @@ -167,13 +167,8 @@ final class PoolEntry { private volatile boolean isLocked; /** - * Idempotency flag for {@link #shutdown()}, scoped to a single connection generation. - * - *

<p>Guarantees that the close listener is invoked only once per generation even if both {@link - * #close()} and {@link #shutdown()} are called, or shutdown is invoked multiple times due to - * overlapping connection error events. It is reset in {@link #internalConnect()} when a new - * connection generation begins, so a KILL/reconnect cycle still emits one {@code - * onConnectionClosed} per generation. + * Per-generation idempotency flag for {@link #shutdown()} close-event emit; reset in {@link + * #internalConnect()} when a new connection generation begins. */ private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -431,13 +426,8 @@ public void stopHeartbeat() { /** * Internal method used by reconnect task and public connect. * - *

<p>The {@code client.connect()} chain is built without holding the entry monitor: holding it - * across {@code client.connect()} (which takes the {@code ConnectionImpl} monitor) would create - * an ABBA deadlock with the Netty close-callback path, which takes the {@code ConnectionImpl} - * monitor first and then re-enters {@link #handleConnectError(Object, Throwable)} on the entry. - * Concurrent callers are de-duplicated via the {@link #connectFuture} field; if two threads race - * past the first check, the second {@code client.connect()} is a no-op because {@code - * ConnectionImpl.connect()} only opens a channel on a {@code CLOSED -> CONNECTING} transition. + *

<p>See {@link #shutdown()} for the monitor-ordering reasoning; {@code client.connect()} runs + * outside the entry monitor for the same reason. * * @return {@link java.util.concurrent.CompletableFuture} with client */ @@ -465,8 +455,6 @@ private CompletableFuture internalConnect() { .thenApply(r -> client); synchronized (this) { if (connectFuture != null) { - // ponytail: lost the race; ConnectionImpl's CLOSED->CONNECTING CAS already de-duped the - // channel, so our `cf` wraps the same promise and is safely discarded. return connectFuture; } connectFuture = cf; @@ -505,10 +493,6 @@ private void onConnectComplete(Object r, Throwable exc) { /** * Handler for connection close. * - *

<p>The {@code connectFuture} reset is performed under this monitor; emit, {@link #lock()}, - * {@link #shutdown()} and {@link #connectAfter()} run outside it (see {@link #shutdown()} for - * why). - * * @param r connection instance * @param exc exception which led to connection close */ diff --git a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java index 4323d53..0b69b8f 100644 --- a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java +++ b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java @@ -94,10 +94,8 @@ protected void execLua(TarantoolContainer container, String command) { protected int getActiveConnectionsCount(TarantoolContainer tt) { try { - // box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto - // worker; when many connections are opened in a burst it lags behind by - // 100-500 ms. Wait for it to stabilise (fiber.sleep yields the worker so it - // can accept the pending connections) before reading the value. + // box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker; + // the loop's fiber.sleep lets it drain pending connections before we read. String lua = "local last = box.stat.net().CONNECTIONS.current;" + " for i = 1, 50 do" @@ -119,12 +117,8 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer tt, int basel } /** - * Asserts that the active connection count on the given Tarantool container reaches {@code - * expected}, retrying until it does. The IProto worker updates {@code - * box.stat.net().CONNECTIONS.current} asynchronously, and closing administrative connections - * (e.g. the {@code net.box} connection used by {@code executeCommandDecoded}) is asynchronous on - * the server, so a single read can briefly observe a stale or transitional value. Retrying the - * assert lets the worker converge on the final value. 
+ * Retries {@link #getActiveConnectionsCount} until it equals {@code expected} — see there for why + * a single read is unreliable. * * @param tt the Tarantool container under test * @param expected the expected number of active connections