diff --git a/CHANGELOG.md b/CHANGELOG.md index c259d11..819fea0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,10 @@ - Document supported Java types for Tarantool data mapping in `tuple_pojo_mapping` docs (RU/EN), including Tarantool extension types (`decimal`, `uuid`, `datetime`, `interval`, `tuple`) and related mapping notes. - Document Jackson MsgPack deserialization: integers, `bin`/`str` vs `byte[]`/`String`, floating-point vs `decimal`; reference `jackson-dataformat-msgpack` for defaults and type coercion. +### Pooling + +- Fix race conditions, ABBA deadlock between `PoolEntry` and `ConnectionImpl` monitors, NPE on inline connect failure, and connection leak after a KILL/reconnect cycle in `PoolEntry` and `IProtoClientPoolImpl`. + ### Dependencies - Updated dependencies: diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java index 953c1bb..dfee534 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java @@ -524,11 +524,15 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti @Override public void forEach(Consumer action) { - for (List group : entries.values()) { - for (PoolEntry entry : group) { - action.accept(entry.getClient()); + List clients = new ArrayList<>(); + synchronized (connectionPoolLock) { + for (List group : entries.values()) { + for (PoolEntry entry : group) { + clients.add(entry.getClient()); + } } } + clients.forEach(action); } /** diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java index a746b9c..bfe401c 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java @@ -8,6 +8,7 @@ import java.util.ArrayDeque; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -146,16 +147,16 @@ final class PoolEntry { private CompletableFuture connectFuture; /** Last heartbeat state/event. */ - private HeartbeatEvent lastHeartbeatEvent; + private volatile HeartbeatEvent lastHeartbeatEvent; /** Heartbeat timer/task. */ - private Timeout heartbeatTask; + private volatile Timeout heartbeatTask; /** Reconnection task. */ private Timeout reconnectTask; /** Flag signaling if heartbeat started or not. */ - private boolean isHeartbeatStarted; + private volatile boolean isHeartbeatStarted; /** * Flag signaling if connection is available or not. @@ -163,7 +164,13 @@ final class PoolEntry { *

When connection comes to invalidated state or killed, pool entry is locked and connection * will not be returned to outer client. */ - private boolean isLocked; + private volatile boolean isLocked; + + /** + * Per-generation idempotency flag for {@link #shutdown()} close-event emit; reset in {@link + * #internalConnect()} when a new connection generation begins. + */ + private final AtomicBoolean isShutdown = new AtomicBoolean(false); /** Count of failed pings occurred in invalidated state. */ private int currentDeathPings; @@ -305,7 +312,7 @@ public IProtoClient getClient() { * *

Also increments count of unavailable clients. */ - public void lock() { + public synchronized void lock() { if (!isLocked) { unavailable.incrementAndGet(); isLocked = true; @@ -317,7 +324,7 @@ public void lock() { * *

Also decrements count of unavailable clients and cancels reconnect task. */ - public void unlock() { + public synchronized void unlock() { if (isLocked) { stopReconnectTask(); unavailable.decrementAndGet(); @@ -340,16 +347,28 @@ public void close() { shutdown(); } - /** Closes client and stops heartbeat task is started. */ + /** + * Closes the underlying client and stops the heartbeat task. + * + *

Performs field mutations under the entry monitor, then releases it before calling {@code + * client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close + * event. Holding the entry monitor across either of those calls would create an ABBA deadlock + * with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and + * then re-enters {@link #handleConnectError(Object, Throwable)} on the entry. + */ public void shutdown() { - connectFuture = null; - stopHeartbeat(); + synchronized (this) { + connectFuture = null; + stopHeartbeat(); + } try { client.close(); } catch (Exception e) { log.warn("Cannot close client in pool", e); } - emit(listener -> listener.onConnectionClosed(tag, index)); + if (isShutdown.compareAndSet(false, true)) { + emit(listener -> listener.onConnectionClosed(tag, index)); + } } /** @@ -357,10 +376,7 @@ public void shutdown() { * * @return {@link java.util.concurrent.CompletableFuture} with client */ - public synchronized CompletableFuture connect() { - if (connectFuture != null) { - return connectFuture; - } + public CompletableFuture connect() { return internalConnect(); } @@ -410,15 +426,23 @@ public void stopHeartbeat() { /** * Internal method used by reconnect task and public connect. * + *

See {@link #shutdown()} for the monitor-ordering reasoning; {@code client.connect()} runs + * outside the entry monitor for the same reason. + * * @return {@link java.util.concurrent.CompletableFuture} with client */ private CompletableFuture internalConnect() { + synchronized (this) { + if (connectFuture != null) { + return connectFuture; + } + } log.info("connect {}/{}", tag, index); LongTaskTimer.Sample timer = startTimer(connectTime); CompletableFuture future = client.connect(group.getAddress(), connectTimeout, gracefulShutdown); String user = group.getUser(); - connectFuture = + CompletableFuture cf = future .thenCompose( greeting -> { @@ -428,9 +452,16 @@ private CompletableFuture internalConnect() { } return client.ping(firstPingOpts); }) - .thenApply(r -> client) - .whenComplete(this::onConnectComplete); - return connectFuture; + .thenApply(r -> client); + synchronized (this) { + if (connectFuture != null) { + return connectFuture; + } + connectFuture = cf; + isShutdown.set(false); + } + cf.whenComplete(this::onConnectComplete); + return cf; } /** @@ -470,7 +501,9 @@ private void handleConnectError(Object r, Throwable exc) { return; } Throwable failure = exc.getCause() != null ? exc.getCause() : exc; - connectFuture = null; + synchronized (this) { + connectFuture = null; + } log.error("connect error {}/{}: {}", tag, index, failure.toString()); emit(listener -> listener.onConnectionFailed(tag, index, failure)); lock(); @@ -480,13 +513,19 @@ private void handleConnectError(Object r, Throwable exc) { /** Reconnect task scheduler. */ private void connectAfter() { - log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); - if (reconnectTask == null) { - reconnecting.incrementAndGet(); + synchronized (this) { + log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); + if (reconnectTask != null) { + // existing task is being replaced; the existing increment in `reconnecting` carries over + // to the new task, so no counter change is needed here. + reconnectTask.cancel(); + } else { + reconnecting.incrementAndGet(); + } + reconnectTask = + timerService.newTimeout( + timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); } - reconnectTask = - timerService.newTimeout( - timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter)); } @@ -658,7 +697,7 @@ private void incHeartbeatCounters(int fail) { } /** Stops reconnecting task if it is active. */ - private void stopReconnectTask() { + private synchronized void stopReconnectTask() { if (reconnectTask != null) { reconnecting.decrementAndGet(); reconnectTask.cancel(); diff --git a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java index a3a048b..0b69b8f 100644 --- a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java +++ b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ThreadLocalRandom; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import io.micrometer.core.instrument.Counter; @@ -93,10 +94,19 @@ protected void execLua(TarantoolContainer container, String command) { protected int getActiveConnectionsCount(TarantoolContainer tt) { try { - List result = - TarantoolContainerClientHelper.executeCommandDecoded( - tt, "return box.stat.net().CONNECTIONS.current"); - return (Integer) result.get(0) - 1; + // box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker; + // the loop's fiber.sleep lets it drain pending connections before we read. + String lua = + "local last = box.stat.net().CONNECTIONS.current;" + + " for i = 1, 50 do" + + " require('fiber').sleep(0.05);" + + " local cur = box.stat.net().CONNECTIONS.current;" + + " if cur == last then return cur - 1 end;" + + " last = cur;" + + " end;" + + " return last - 1"; + List result = TarantoolContainerClientHelper.executeCommandDecoded(tt, lua); + return (Integer) result.get(0); } catch (Exception e) { throw new RuntimeException(e); } @@ -106,6 +116,24 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer tt, int basel return getActiveConnectionsCount(tt) - baseline; } + /** + * Retries {@link #getActiveConnectionsCount} until it equals {@code expected} — see there for why + * a single read is unreliable. + * + * @param tt the Tarantool container under test + * @param expected the expected number of active connections + */ + protected void waitForActiveConnections(TarantoolContainer tt, int expected) { + try { + waitFor( + "Active connections count never reached " + expected, + Duration.ofSeconds(10), + () -> assertEquals(expected, getActiveConnectionsCount(tt))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + protected MeterRegistry createMetricsRegistry() { MeterRegistry metricsRegistry = new SimpleMeterRegistry(); LongTaskTimer.builder("request.timer") diff --git a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java index 1050061..6106a47 100644 --- a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java +++ b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java @@ -77,7 +77,7 @@ public void testReconnectAfterNodeFailure() throws Exception { assertTrue(pool.hasAvailableClients()); List clients = getConnects(pool, "node-a", count1); assertTrue(pingClients(clients)); - assertEquals(count1, getActiveConnectionsCount(tt)); + waitForActiveConnections(tt, count1); tt.stop(); Thread.sleep(1000); @@ -110,7 +110,7 @@ public void testReconnectAfterNodeFailure() throws Exception { }); assertTrue(pingClients(clients)); - assertEquals(count1, getActiveConnectionsCount(tt)); + waitForActiveConnections(tt, count1); assertEquals(count1, metricsRegistry.get("pool.size").gauge().value()); assertEquals(count1, metricsRegistry.get("pool.available").gauge().value());