diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/TargetServerType.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/TargetServerType.java new file mode 100644 index 0000000000..d934d8b0d2 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/TargetServerType.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.vertx.pgclient; + +/** + * Defines the target server type when connecting to a PostgreSQL cluster with multiple hosts. + * This is similar to the {@code targetServerType} parameter in the PostgreSQL JDBC driver + * and {@code target_session_attrs} in libpq. + */ +public enum TargetServerType { + + /** + * Connect to any server (default). + */ + ANY, + + /** + * Connect only to a primary (read-write) server. + */ + PRIMARY, + + /** + * Connect only to a secondary (read-only/hot standby) server. + */ + SECONDARY, + + /** + * Prefer connecting to a primary server, but fall back to a secondary if no primary is available. + */ + PREFER_PRIMARY, + + /** + * Prefer connecting to a secondary server, but fall back to a primary if no secondary is available. + */ + PREFER_SECONDARY +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolOptions.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolOptions.java index f493891c16..3896b94ba7 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolOptions.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgPoolOptions.java @@ -16,18 +16,30 @@ */ package io.vertx.pgclient.impl; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.TargetServerType; import io.vertx.sqlclient.PoolOptions; +import java.util.List; + public class PgPoolOptions extends PoolOptions { - public PgPoolOptions(PoolOptions other) { - super(other); - } + private boolean pipelined; + private TargetServerType targetServerType = TargetServerType.ANY; + private List servers; public PgPoolOptions() { } - private boolean pipelined; + public PgPoolOptions(PoolOptions other) { + super(other); + if (other instanceof PgPoolOptions) { + PgPoolOptions pgOther = (PgPoolOptions) other; + this.pipelined = pgOther.pipelined; + this.targetServerType = pgOther.targetServerType; + this.servers = pgOther.servers; + } + } public boolean isPipelined() { return pipelined; @@ -37,4 +49,22 @@ public PgPoolOptions setPipelined(boolean pipelined) { this.pipelined = pipelined; return this; } + + public TargetServerType getTargetServerType() { + return targetServerType; + } + + public PgPoolOptions setTargetServerType(TargetServerType targetServerType) { + this.targetServerType = targetServerType; + return this; + } + + public List getServers() { + return servers; + } + + public PgPoolOptions setServers(List servers) { + this.servers = servers; + return this; + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java index 0f332a1356..edb0b7157f 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java @@ -57,6 +57,7 @@ public class PgSocketConnection extends SocketConnectionBase { public int processId; public int secretKey; public PgDatabaseMetadata dbMetaData; + public ServerType serverType = ServerType.UNDEFINED; private PgConnectOptions connectOptions; public PgSocketConnection(NetSocketInternal socket, diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/ServerType.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/ServerType.java new file mode 100644 index 0000000000..05bded5292 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/ServerType.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.vertx.pgclient.impl; + +/** + * Internal enum representing the detected server type of a PostgreSQL instance. + */ +public enum ServerType { + + PRIMARY, + SECONDARY, + UNDEFINED +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/ServerTypeAwarePgConnectionFactory.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/ServerTypeAwarePgConnectionFactory.java new file mode 100644 index 0000000000..7005ed27bd --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/ServerTypeAwarePgConnectionFactory.java @@ -0,0 +1,267 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.vertx.pgclient.impl; + +import io.vertx.core.Completable; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.TargetServerType; +import io.vertx.sqlclient.PropertyKind; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.internal.QueryResultHandler; +import io.vertx.sqlclient.internal.RowDescriptorBase; +import io.vertx.sqlclient.spi.connection.Connection; +import io.vertx.sqlclient.spi.connection.ConnectionFactory; +import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collector; + +/** + * A {@link ConnectionFactory} wrapper that selects connections based on the PostgreSQL server type + * (primary, secondary). It wraps a {@link PgConnectionFactory} delegate and implements + * round-robin connection attempts with server type detection. + */ +public class ServerTypeAwarePgConnectionFactory implements ConnectionFactory { + + private final PgConnectionFactory delegate; + private final List servers; + private final TargetServerType targetServerType; + private final ServerType[] cachedTypes; + private final AtomicInteger roundRobinIdx; + + public ServerTypeAwarePgConnectionFactory(PgConnectionFactory delegate, + List servers, + TargetServerType targetServerType) { + this.delegate = delegate; + this.servers = servers; + this.targetServerType = targetServerType; + this.cachedTypes = new ServerType[servers.size()]; + Arrays.fill(cachedTypes, ServerType.UNDEFINED); + this.roundRobinIdx = new AtomicInteger(0); + } + + @Override + public Future connect(Context context, Future fut) { + // Ignore the pool's options supplier; we manage our own server list + return connectWithServerType(context); + } + + @Override + public Future connect(Context context, PgConnectOptions options) { + return connectWithServerType(context); + } + + private Future connectWithServerType(Context context) { + int startIdx = roundRobinIdx.getAndUpdate(i -> (i + 1) % servers.size()); + switch (targetServerType) { + case PRIMARY: + return connectingRound(context, startIdx, 0, ServerType.PRIMARY); + case SECONDARY: + return connectingRound(context, startIdx, 0, ServerType.SECONDARY); + case PREFER_PRIMARY: + // First pass: try to find a primary; second pass: accept any server + return connectingRound(context, startIdx, 0, ServerType.PRIMARY) + .recover(err -> connectToAny(context, startIdx, 0)); + case PREFER_SECONDARY: + // First pass: try to find a secondary; second pass: accept any server + return connectingRound(context, startIdx, 0, ServerType.SECONDARY) + .recover(err -> connectToAny(context, startIdx, 0)); + case ANY: + default: + return connectToAny(context, startIdx, 0); + } + } + + /** + * Attempts to connect to any reachable server, starting at {@code startIdx} and trying + * each server in round-robin order exactly once. Returns the first successful connection + * regardless of server type. Fails if all servers are unreachable. + */ + private Future connectToAny(Context context, int startIdx, int attempt) { + if (attempt >= servers.size()) { + return Future.failedFuture("Could not connect to any server"); + } + int idx = (startIdx + attempt) % servers.size(); + PgConnectOptions opts = servers.get(idx); + return delegate.connect(context, opts) + .recover(err -> connectToAny(context, startIdx, attempt + 1)); + } + + /** + * Recursively attempts connections to servers in round-robin order, trying each server + * in the list exactly once (at most {@code servers.size()} attempts total). + * + *

For each attempt, the method looks up the server at index + * {@code (startIdx + attempt) % servers.size()} and first checks the type cache: + *

    + *
  • Cached as different type than desired: skip this server entirely, + * no TCP connection is made.
  • + *
  • Cached as desired type, or not cached: connect and verify.
  • + *
+ * + *

After connecting and detecting the actual server type, the method decides: + *

    + *
  • Match: return the connection immediately.
  • + *
  • Mismatch: close this connection and continue to the next server.
  • + *
  • Connection failure: skip this server and continue to the next.
  • + *
+ * + *

When all servers have been tried without finding the desired type, fail with + * "Could not find a server of type ...". For PREFER_* modes, the caller recovers from + * this failure by falling back to {@link #connectToAny}. + * + *

Cache staleness after failover: The cache may become stale when a primary is + * demoted to replica or a replica is promoted to primary. This is safe because the cache + * is only used to skip TCP connections, never to trust the server type without verification. + * Specifically: + *

    + *
  • Cached as desired type, actually changed: always re-verified via + * {@link #detectServerType} after connecting; if the actual type changed, the cache + * is updated and the connection is treated as a mismatch.
  • + *
  • Cached as different type than desired, actually promoted: skipped with no + * TCP connection. In the worst case this causes one extra round of connection attempts + * before the cache self-corrects: the first call after failover connects to the + * now-stale "desired" server, discovers the mismatch, updates the cache, and tries + * the next server which is the newly promoted one.
  • + *
  • All servers cached as different type than desired: all servers are skipped + * by cache, the method retries from {@code startIdx} with cache disabled to force + * re-verification. This guarantees progress after a failover where all cached types + * become stale.
  • + *
+ * + * @param context the Vert.x context + * @param startIdx the starting index in the server list (from round-robin counter) + * @param attempt current attempt number (0-based), incremented on each recursive call + * @param desired the desired server type (PRIMARY or SECONDARY) + * @return a future resolved with a matching connection, or failed + */ + private Future connectingRound(Context context, int startIdx, int attempt, + ServerType desired) { + return connectingRound(context, startIdx, attempt, desired, true); + } + + private Future connectingRound(Context context, int startIdx, int attempt, + ServerType desired, boolean useCache) { + if (attempt >= servers.size()) { + if (useCache) { + // All servers may have been skipped by stale cache; retry ignoring cache + return connectingRound(context, startIdx, 0, desired, false); + } + return Future.failedFuture("Could not find a server of type " + desired); + } + int idx = (startIdx + attempt) % servers.size(); + PgConnectOptions opts = servers.get(idx); + if (useCache) { + ServerType cachedType = cachedTypes[idx]; + if (cachedType != ServerType.UNDEFINED && cachedType != desired) { + return connectingRound(context, startIdx, attempt + 1, desired, true); + } + } + return delegate.connect(context, opts).compose( + conn -> detectServerType(conn).compose( + detectedType -> { + cachedTypes[idx] = detectedType; + if (detectedType == desired) { + return Future.succeededFuture(conn); + } else { + return closeConnection(conn) + .transform(v -> connectingRound(context, startIdx, attempt + 1, desired, useCache)); + } + }, + err -> closeConnection(conn) + .transform(v -> connectingRound(context, startIdx, attempt + 1, desired, useCache))), + err -> connectingRound(context, startIdx, attempt + 1, desired, useCache) + ); + } + + /** + * Detects the server type of an established connection. + * For PG 14+, the {@code in_hot_standby} parameter is reported during startup. + * For older versions, we fall back to querying {@code SHOW transaction_read_only}. + */ + private Future detectServerType(Connection conn) { + PgSocketConnection pgConn = (PgSocketConnection) conn; + if (pgConn.serverType != ServerType.UNDEFINED) { + return Future.succeededFuture(pgConn.serverType); + } + // PG < 14: query transaction_read_only + return queryTransactionReadOnly(pgConn); + } + + /** + * Fallback for PG < 14 where {@code in_hot_standby} is not reported as a GUC_REPORT parameter + * during the startup handshake. PG 14+ sends it automatically, which is handled in + * {@link io.vertx.pgclient.impl.codec.InitPgCommandMessage#handleParameterStatus} and stored on + * {@link PgSocketConnection#serverType} before this method is ever reached. This method is only + * called when {@code serverType} is still {@link ServerType#UNDEFINED} after connect. + */ + private Future queryTransactionReadOnly(PgSocketConnection conn) { + Promise promise = Promise.promise(); + Collector collector = Collector.of( + () -> new String[1], + (acc, row) -> acc[0] = row.getString(0), + (a, b) -> a, + acc -> acc[0] + ); + QueryResultHandler resultHandler = new QueryResultHandler<>() { + @Override + public void addProperty(PropertyKind property, V v) { + } + @Override + public void handleResult(int updatedCount, int size, RowDescriptorBase desc, String result, Throwable failure) { + if (failure != null) { + promise.tryFail(failure); + } else { + ServerType type = "on".equalsIgnoreCase(result) ? ServerType.SECONDARY : ServerType.PRIMARY; + conn.serverType = type; + promise.tryComplete(type); + } + } + }; + SimpleQueryCommand cmd = new SimpleQueryCommand<>( + "SHOW transaction_read_only", + true, + false, + collector, + resultHandler + ); + conn.schedule(cmd, (res, err) -> { + if (err != null) { + promise.tryFail(err); + } + // On success, handleResult was already called before this completable fires + }); + return promise.future(); + } + + private Future closeConnection(Connection conn) { + Promise promise = Promise.promise(); + conn.close(null, promise); + return promise.future(); + } + + @Override + public void close(Completable promise) { + delegate.close(promise); + } + + public PgConnectionFactory getDelegate() { + return delegate; + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java index 4f1c11bce5..cc3591f1ab 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java @@ -23,6 +23,7 @@ import io.vertx.core.VertxException; import io.vertx.pgclient.impl.PgDatabaseMetadata; import io.vertx.pgclient.impl.PgSocketConnection; +import io.vertx.pgclient.impl.ServerType; import io.vertx.pgclient.impl.auth.scram.ScramAuthentication; import io.vertx.pgclient.impl.auth.scram.ScramSession; import io.vertx.sqlclient.spi.connection.Connection; @@ -95,6 +96,10 @@ public void handleParameterStatus(String key, String value) { if(key.equals("server_version")) { ((PgSocketConnection)cmd.connection()).dbMetaData = new PgDatabaseMetadata(value); } + if(key.equals("in_hot_standby")) { + ((PgSocketConnection)cmd.connection()).serverType = + "on".equalsIgnoreCase(value) ? ServerType.SECONDARY : ServerType.PRIMARY; + } } @Override diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java index a0343a53b4..464cdffc67 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java @@ -9,10 +9,12 @@ import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.TargetServerType; import io.vertx.pgclient.impl.PgConnectionFactory; import io.vertx.pgclient.impl.PgConnectionImpl; import io.vertx.pgclient.impl.PgConnectionUriParser; import io.vertx.pgclient.impl.PgPoolOptions; +import io.vertx.pgclient.impl.ServerTypeAwarePgConnectionFactory; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.PoolOptions; import io.vertx.sqlclient.SqlConnectOptions; @@ -38,9 +40,23 @@ public PgDriver() { @Override protected Pool newPool(VertxInternal vertx, Handler connectHandler, Supplier> databases, PoolOptions poolOptions, NetClientOptions transportOptions, CloseFuture closeFuture) { boolean pipelinedPool = poolOptions instanceof PgPoolOptions && ((PgPoolOptions) poolOptions).isPipelined(); - ConnectionFactory factory = createConnectionFactory(vertx, transportOptions); + PgConnectionFactory baseFactory = (PgConnectionFactory) createConnectionFactory(vertx, transportOptions); + ConnectionFactory factory = baseFactory; + Supplier> effectiveDatabases = databases; + + if (poolOptions instanceof PgPoolOptions) { + PgPoolOptions pgOpts = (PgPoolOptions) poolOptions; + if (pgOpts.getTargetServerType() != null + && pgOpts.getTargetServerType() != TargetServerType.ANY + && pgOpts.getServers() != null && !pgOpts.getServers().isEmpty()) { + factory = new ServerTypeAwarePgConnectionFactory(baseFactory, pgOpts.getServers(), pgOpts.getTargetServerType()); + // The wrapper ignores the supplier; provide a dummy that returns the first server + effectiveDatabases = () -> Future.succeededFuture(pgOpts.getServers().get(0)); + } + } + PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, poolOptions, null, null, - factory, databases, connectHandler, this::wrapConnection, closeFuture); + factory, effectiveDatabases, connectHandler, this::wrapConnection, closeFuture); pool.init(); closeFuture.add(factory); return pool; @@ -75,6 +91,12 @@ public int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int cur @Override public SqlConnectionInternal wrapConnection(ContextInternal context, ConnectionFactory factory, Connection connection) { - return new PgConnectionImpl((PgConnectionFactory) factory, context, connection); + PgConnectionFactory pgFactory; + if (factory instanceof ServerTypeAwarePgConnectionFactory) { + pgFactory = ((ServerTypeAwarePgConnectionFactory) factory).getDelegate(); + } else { + pgFactory = (PgConnectionFactory) factory; + } + return new PgConnectionImpl(pgFactory, context, connection); } } diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgTargetServerTypeTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgTargetServerTypeTest.java new file mode 100644 index 0000000000..f035fc3355 --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PgTargetServerTypeTest.java @@ -0,0 +1,296 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.vertx.tests.pgclient; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.pgclient.PgBuilder; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.TargetServerType; +import io.vertx.pgclient.impl.PgPoolOptions; +import io.vertx.tests.pgclient.junit.ContainerPgRule; +import io.vertx.tests.sqlclient.ProxyServer; +import io.vertx.sqlclient.Pool; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +@RunWith(VertxUnitRunner.class) +public class PgTargetServerTypeTest { + + @ClassRule + public static ContainerPgRule rule = new ContainerPgRule(); + + private Vertx vertx; + private PgConnectOptions options; + + @Before + public void setup() { + vertx = Vertx.vertx(); + options = rule.options(); + } + + @After + public void tearDown(TestContext ctx) { + vertx.close().onComplete(ctx.asyncAssertSuccess()); + } + + private Pool createPool(TargetServerType targetServerType) { + PgPoolOptions poolOptions = new PgPoolOptions(); + poolOptions.setMaxSize(1); + poolOptions.setTargetServerType(targetServerType); + poolOptions.setServers(Collections.singletonList(options)); + return PgBuilder.pool(b -> b + .with(poolOptions) + .connectingTo(options) + .using(vertx)); + } + + @Test + public void testTargetAny(TestContext ctx) { + Async async = ctx.async(); + Pool pool = createPool(TargetServerType.ANY); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testTargetPrimary(TestContext ctx) { + // A single PG container is always a primary + Async async = ctx.async(); + Pool pool = createPool(TargetServerType.PRIMARY); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testTargetSecondaryFailsOnPrimary(TestContext ctx) { + // A single PG container is a primary, so requesting SECONDARY should fail + Async async = ctx.async(); + Pool pool = createPool(TargetServerType.SECONDARY); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertTrue(err.getMessage().contains("Could not find a server of type SECONDARY"), + "Expected error about SECONDARY, got: " + err.getMessage()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testTargetPreferSecondaryFallsToPrimary(TestContext ctx) { + // PREFER_SECONDARY against a primary: the strict pass should connect (probing for + // secondary), find PRIMARY, close, then fall back via connectToAny. This means at + // least 2 TCP connections through the proxy: one probed and closed, one kept. + Async async = ctx.async(); + AtomicInteger connectionCount = new AtomicInteger(); + ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost()); + proxy.proxyHandler(conn -> { + connectionCount.incrementAndGet(); + conn.connect(); + }); + proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v -> { + PgConnectOptions proxyOpts = new PgConnectOptions(options) + .setHost("localhost") + .setPort(8080); + PgPoolOptions poolOptions = new PgPoolOptions(); + poolOptions.setMaxSize(1); + poolOptions.setTargetServerType(TargetServerType.PREFER_SECONDARY); + poolOptions.setServers(Collections.singletonList(proxyOpts)); + Pool pool = PgBuilder.pool(b -> b + .with(poolOptions) + .connectingTo(proxyOpts) + .using(vertx)); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + // At least 2: one from the strict pass (probed, found PRIMARY, closed) + // and one from the connectToAny fallback (kept) + ctx.assertTrue(connectionCount.get() >= 2, + "Expected at least 2 connections (probe + fallback), got: " + connectionCount.get()); + pool.close().onComplete(ctx.asyncAssertSuccess(v2 -> async.complete())); + })); + })); + } + + @Test + public void testTargetPreferPrimary(TestContext ctx) { + // PREFER_PRIMARY should connect to primary directly + Async async = ctx.async(); + Pool pool = createPool(TargetServerType.PREFER_PRIMARY); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testMultipleServersTargetPrimary(TestContext ctx) { + // Use the same server twice to simulate multi-host. + // connectingTo uses a refused address to prove the pool uses the servers list, + // not the connectingTo address (ServerTypeAwarePgConnectionFactory ignores it). + Async async = ctx.async(); + PgConnectOptions refused = new PgConnectOptions(options).setPort(1); + PgPoolOptions poolOptions = new PgPoolOptions(); + poolOptions.setMaxSize(1); + poolOptions.setTargetServerType(TargetServerType.PRIMARY); + poolOptions.setServers(Arrays.asList(options, options)); + Pool pool = PgBuilder.pool(b -> b + .with(poolOptions) + .connectingTo(refused) + .using(vertx)); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testMultipleServersWithRefusedHost(TestContext ctx) { + // First server refuses connection (wrong port), second is the real primary. + // connectingTo uses a refused address to prove the pool uses the servers list. + Async async = ctx.async(); + PgConnectOptions refused = new PgConnectOptions(options).setPort(1); + PgPoolOptions poolOptions = new PgPoolOptions(); + poolOptions.setMaxSize(1); + poolOptions.setTargetServerType(TargetServerType.PRIMARY); + poolOptions.setServers(Arrays.asList(refused, options)); + Pool pool = PgBuilder.pool(b -> b + .with(poolOptions) + .connectingTo(refused) + .using(vertx)); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testPreferSecondaryFallsBackThroughRefusedHosts(TestContext ctx) { + // PREFER_SECONDARY with [refused, primary, refused]: + // Strict pass: refused (skip), primary (wrong type, closed), refused (skip) → fails + // Fallback: refused (skip), primary → succeeds + Async async = ctx.async(); + PgConnectOptions refused = new PgConnectOptions(options).setPort(1); + PgPoolOptions poolOptions = new PgPoolOptions(); + poolOptions.setMaxSize(1); + poolOptions.setTargetServerType(TargetServerType.PREFER_SECONDARY); + poolOptions.setServers(Arrays.asList(refused, options, refused)); + Pool pool = PgBuilder.pool(b -> b + .with(poolOptions) + .connectingTo(refused) + .using(vertx)); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testStaleCacheDoesNotPreventProgress(TestContext ctx) { + // Two entries for the same primary. After the first query, both are cached as PRIMARY. + // On the second query, the strict pass (desired=SECONDARY) skips both via cache, + // then retries with cache disabled, connects, finds PRIMARY, fails, + // and falls back to connectToAny. + Async async = ctx.async(); + PgPoolOptions poolOptions = new PgPoolOptions(); + poolOptions.setMaxSize(1); + poolOptions.setTargetServerType(TargetServerType.PREFER_SECONDARY); + poolOptions.setServers(Arrays.asList(options, options)); + PgConnectOptions refused = new PgConnectOptions(options).setPort(1); + Pool pool = PgBuilder.pool(b -> b + .with(poolOptions) + .connectingTo(refused) + .using(vertx)); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs1 -> { + // Both entries now cached as PRIMARY; second query exercises stale-cache retry + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs2 -> { + ctx.assertEquals(1, rs2.size()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + } + + @Test + public void testStrictModeWithAllCachedWrongTypeRetries(TestContext ctx) { + // Two entries for the same primary. Strict SECONDARY fails on first query (caches both + // as PRIMARY). Second query skips both via cache, retries with cache disabled, connects, + // finds PRIMARY, correctly fails with the same error. + Async async = ctx.async(); + PgPoolOptions poolOptions = new PgPoolOptions(); + poolOptions.setMaxSize(1); + poolOptions.setTargetServerType(TargetServerType.SECONDARY); + poolOptions.setServers(Arrays.asList(options, options)); + PgConnectOptions refused = new PgConnectOptions(options).setPort(1); + Pool pool = PgBuilder.pool(b -> b + .with(poolOptions) + .connectingTo(refused) + .using(vertx)); + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertFailure(err1 -> { + // Both cached as PRIMARY; second query exercises cache-disabled retry path + pool.query("SELECT 1") + .execute() + .onComplete(ctx.asyncAssertFailure(err2 -> { + ctx.assertTrue(err2.getMessage().contains("Could not find a server of type SECONDARY"), + "Expected error about SECONDARY, got: " + err2.getMessage()); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + } + + @Test + public void testDetectServerTypeViaShowQuery(TestContext ctx) { + // Verify that SHOW transaction_read_only returns 'off' for a primary + Async async = ctx.async(); + Pool pool = createPool(TargetServerType.PRIMARY); + pool.query("SHOW transaction_read_only") + .execute() + .onComplete(ctx.asyncAssertSuccess(rs -> { + ctx.assertEquals(1, rs.size()); + ctx.assertEquals("off", rs.iterator().next().getString(0)); + pool.close().onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } +} diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/ServerTypeAwareConnectionFactoryTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/ServerTypeAwareConnectionFactoryTest.java new file mode 100644 index 0000000000..9683ceb98f --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/ServerTypeAwareConnectionFactoryTest.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.vertx.tests.pgclient; + +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.TargetServerType; +import io.vertx.pgclient.impl.PgPoolOptions; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +public class ServerTypeAwareConnectionFactoryTest { + + @Test + public void testPgPoolOptionsCopyConstructor() { + PgPoolOptions original = new PgPoolOptions(); + original.setPipelined(true); + original.setTargetServerType(TargetServerType.PREFER_SECONDARY); + List servers = Arrays.asList( + new PgConnectOptions().setHost("host1").setPort(5432) + ); + original.setServers(servers); + PgPoolOptions copy = new PgPoolOptions(original); + assertTrue(copy.isPipelined()); + assertEquals(TargetServerType.PREFER_SECONDARY, copy.getTargetServerType()); + assertEquals(servers, copy.getServers()); + } + + @Test + public void testPgPoolOptionsCopyFromPlainPoolOptions() { + io.vertx.sqlclient.PoolOptions plain = new io.vertx.sqlclient.PoolOptions().setMaxSize(10); + PgPoolOptions copy = new PgPoolOptions(plain); + assertEquals(10, copy.getMaxSize()); + assertFalse(copy.isPipelined()); + assertEquals(TargetServerType.ANY, copy.getTargetServerType()); + assertNull(copy.getServers()); + } +}