Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
- Document supported Java types for Tarantool data mapping in `tuple_pojo_mapping` docs (RU/EN), including Tarantool extension types (`decimal`, `uuid`, `datetime`, `interval`, `tuple`) and related mapping notes.
- Document Jackson MsgPack deserialization: integers, `bin`/`str` vs `byte[]`/`String`, floating-point vs `decimal`; reference `jackson-dataformat-msgpack` for defaults and type coercion.

### Pooling

- Fix race conditions, ABBA deadlock between `PoolEntry` and `ConnectionImpl` monitors, NPE on inline connect failure, and connection leak after a KILL/reconnect cycle in `PoolEntry` and `IProtoClientPoolImpl`.

### Dependencies
- Updated dependencies:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,15 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti

@Override
public void forEach(Consumer<IProtoClient> action) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
action.accept(entry.getClient());
List<IProtoClient> clients = new ArrayList<>();
synchronized (connectionPoolLock) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
clients.add(entry.getClient());
}
}
}
clients.forEach(action);
}

/**
Expand Down
91 changes: 65 additions & 26 deletions tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -146,24 +147,30 @@ final class PoolEntry {
private CompletableFuture<IProtoClient> connectFuture;

/** Last heartbeat state/event. */
private HeartbeatEvent lastHeartbeatEvent;
private volatile HeartbeatEvent lastHeartbeatEvent;

/** Heartbeat timer/task. */
private Timeout heartbeatTask;
private volatile Timeout heartbeatTask;

/** Reconnection task. */
private Timeout reconnectTask;

/** Flag signaling if heartbeat started or not. */
private boolean isHeartbeatStarted;
private volatile boolean isHeartbeatStarted;

/**
* Flag signaling if connection is available or not.
*
* <p>When connection comes to invalidated state or killed, pool entry is locked and connection
* will not be returned to outer client.
*/
private boolean isLocked;
private volatile boolean isLocked;

/**
* Per-generation idempotency flag for {@link #shutdown()} close-event emit; reset in {@link
* #internalConnect()} when a new connection generation begins.
*/
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

/** Count of failed pings occurred in invalidated state. */
private int currentDeathPings;
Expand Down Expand Up @@ -305,7 +312,7 @@ public IProtoClient getClient() {
*
* <p>Also increments count of unavailable clients.
*/
public void lock() {
public synchronized void lock() {
if (!isLocked) {
unavailable.incrementAndGet();
isLocked = true;
Expand All @@ -317,7 +324,7 @@ public void lock() {
*
* <p>Also decrements count of unavailable clients and cancels reconnect task.
*/
public void unlock() {
public synchronized void unlock() {
if (isLocked) {
stopReconnectTask();
unavailable.decrementAndGet();
Expand All @@ -340,27 +347,36 @@ public void close() {
shutdown();
}

/** Closes client and stops heartbeat task is started. */
/**
* Closes the underlying client and stops the heartbeat task.
*
* <p>Performs field mutations under the entry monitor, then releases it before calling {@code
* client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close
* event. Holding the entry monitor across either of those calls would create an ABBA deadlock
* with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and
* then re-enters {@link #handleConnectError(Object, Throwable)} on the entry.
*/
public void shutdown() {
connectFuture = null;
stopHeartbeat();
synchronized (this) {
connectFuture = null;
stopHeartbeat();
}
try {
client.close();
} catch (Exception e) {
log.warn("Cannot close client in pool", e);
}
emit(listener -> listener.onConnectionClosed(tag, index));
if (isShutdown.compareAndSet(false, true)) {
emit(listener -> listener.onConnectionClosed(tag, index));
}
}

/**
* Starts the client connection process and returns a future.
*
* @return {@link java.util.concurrent.CompletableFuture} with client
*/
public synchronized CompletableFuture<IProtoClient> connect() {
if (connectFuture != null) {
return connectFuture;
}
public CompletableFuture<IProtoClient> connect() {
return internalConnect();
}

Expand Down Expand Up @@ -410,15 +426,23 @@ public void stopHeartbeat() {
/**
* Internal method used by reconnect task and public connect.
*
* <p>See {@link #shutdown()} for the monitor-ordering reasoning; {@code client.connect()} runs
* outside the entry monitor for the same reason.
*
* @return {@link java.util.concurrent.CompletableFuture} with client
*/
private CompletableFuture<IProtoClient> internalConnect() {
synchronized (this) {
if (connectFuture != null) {
return connectFuture;
}
}
log.info("connect {}/{}", tag, index);
LongTaskTimer.Sample timer = startTimer(connectTime);
CompletableFuture<?> future =
client.connect(group.getAddress(), connectTimeout, gracefulShutdown);
String user = group.getUser();
connectFuture =
CompletableFuture<IProtoClient> cf =
future
.thenCompose(
greeting -> {
Expand All @@ -428,9 +452,16 @@ private CompletableFuture<IProtoClient> internalConnect() {
}
return client.ping(firstPingOpts);
})
.thenApply(r -> client)
.whenComplete(this::onConnectComplete);
return connectFuture;
.thenApply(r -> client);
synchronized (this) {
if (connectFuture != null) {
return connectFuture;
}
connectFuture = cf;
isShutdown.set(false);
}
cf.whenComplete(this::onConnectComplete);
return cf;
}

/**
Expand Down Expand Up @@ -470,7 +501,9 @@ private void handleConnectError(Object r, Throwable exc) {
return;
}
Throwable failure = exc.getCause() != null ? exc.getCause() : exc;
connectFuture = null;
synchronized (this) {
connectFuture = null;
}
log.error("connect error {}/{}: {}", tag, index, failure.toString());
emit(listener -> listener.onConnectionFailed(tag, index, failure));
lock();
Expand All @@ -480,13 +513,19 @@ private void handleConnectError(Object r, Throwable exc) {

/** Reconnect task scheduler. */
private void connectAfter() {
log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter);
if (reconnectTask == null) {
reconnecting.incrementAndGet();
synchronized (this) {
log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter);
if (reconnectTask != null) {
// existing task is being replaced; the existing increment in `reconnecting` carries over
// to the new task, so no counter change is needed here.
reconnectTask.cancel();
} else {
reconnecting.incrementAndGet();
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter));
}

Expand Down Expand Up @@ -658,7 +697,7 @@ private void incHeartbeatCounters(int fail) {
}

/** Stops reconnecting task if it is active. */
private void stopReconnectTask() {
private synchronized void stopReconnectTask() {
if (reconnectTask != null) {
reconnecting.decrementAndGet();
reconnectTask.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadLocalRandom;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -93,10 +94,19 @@ protected void execLua(TarantoolContainer<?> container, String command) {

protected int getActiveConnectionsCount(TarantoolContainer<?> tt) {
try {
List<? extends Object> result =
TarantoolContainerClientHelper.executeCommandDecoded(
tt, "return box.stat.net().CONNECTIONS.current");
return (Integer) result.get(0) - 1;
// box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker;
// the loop's fiber.sleep lets it drain pending connections before we read.
String lua =
"local last = box.stat.net().CONNECTIONS.current;"
+ " for i = 1, 50 do"
+ " require('fiber').sleep(0.05);"
+ " local cur = box.stat.net().CONNECTIONS.current;"
+ " if cur == last then return cur - 1 end;"
+ " last = cur;"
+ " end;"
+ " return last - 1";
List<? extends Object> result = TarantoolContainerClientHelper.executeCommandDecoded(tt, lua);
return (Integer) result.get(0);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -106,6 +116,24 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer<?> tt, int basel
return getActiveConnectionsCount(tt) - baseline;
}

/**
 * Waits for the active-connection count reported by {@link #getActiveConnectionsCount} to match
 * {@code expected}, rather than trusting one read (see that method for why a single read is
 * unreliable).
 *
 * <p>The check is handed to {@code waitFor} with a 10-second limit. Any {@link Exception} raised
 * while waiting is rethrown wrapped in a {@link RuntimeException}.
 *
 * @param tt the Tarantool container under test
 * @param expected the expected number of active connections
 */
protected void waitForActiveConnections(TarantoolContainer<?> tt, int expected) {
  final String failureMessage = "Active connections count never reached " + expected;
  final Duration limit = Duration.ofSeconds(10);
  try {
    waitFor(failureMessage, limit, () -> assertEquals(expected, getActiveConnectionsCount(tt)));
  } catch (Exception cause) {
    // Callers are not forced to declare checked exceptions; surface failures unchecked.
    throw new RuntimeException(cause);
  }
}

protected MeterRegistry createMetricsRegistry() {
MeterRegistry metricsRegistry = new SimpleMeterRegistry();
LongTaskTimer.builder("request.timer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testReconnectAfterNodeFailure() throws Exception {
assertTrue(pool.hasAvailableClients());
List<IProtoClient> clients = getConnects(pool, "node-a", count1);
assertTrue(pingClients(clients));
assertEquals(count1, getActiveConnectionsCount(tt));
waitForActiveConnections(tt, count1);

tt.stop();
Thread.sleep(1000);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testReconnectAfterNodeFailure() throws Exception {
});

assertTrue(pingClients(clients));
assertEquals(count1, getActiveConnectionsCount(tt));
waitForActiveConnections(tt, count1);

assertEquals(count1, metricsRegistry.get("pool.size").gauge().value());
assertEquals(count1, metricsRegistry.get("pool.available").gauge().value());
Expand Down