diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java new file mode 100644 index 0000000000..c21fa11f62 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java @@ -0,0 +1,30 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver; + +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.retry.RetryDecision; + +/** + * In Astra, CQL router will be used and retry on the next node will fail. So, the decision will be + * to retry on the same node. + */ +public class AstraCqlRetryPolicy extends BaseCqlRetryPolicy { + + public AstraCqlRetryPolicy(DriverContext context, String profileName) { + super(context, profileName); + } + + @Override + protected RetryDecision retryDecisionForUnavailable() { + return RetryDecision.RETRY_SAME; + } + + @Override + protected RetryDecision retryDecisionForRequestAborted() { + return RetryDecision.RETRY_SAME; + } + + @Override + protected RetryDecision retryDecisionForErrorResponse() { + return RetryDecision.RETRY_SAME; + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java new file mode 100644 index 0000000000..ad7197eea0 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java @@ -0,0 +1,267 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.retry.RetryDecision; +import com.datastax.oss.driver.api.core.retry.RetryPolicy; +import 
com.datastax.oss.driver.api.core.retry.RetryVerdict; +import com.datastax.oss.driver.api.core.servererrors.*; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Custom retry policy tailored for DataAPI, providing distinct retry logic for specific scenarios + * compared to {@link DefaultRetryPolicy}. Logs are recorded at the INFO level instead of TRACE + * level, aligning with DataAPI logging standards. + */ +public class BaseCqlRetryPolicy implements RetryPolicy { + private static final Logger LOG = LoggerFactory.getLogger(BaseCqlRetryPolicy.class); + private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); + + public BaseCqlRetryPolicy(DriverContext context, String profileName) {} + + /** + * {@inheritDoc} + * + *
This implementation triggers a maximum of {@code MAX_RETRIES} retry (to the same node), and + * only if enough replicas had responded to the read request but data was not retrieved amongst + * those. That usually means that enough replicas are alive to satisfy the consistency, but the + * coordinator picked a dead one for data retrieval, not having detected that replica as dead yet. + * The reasoning is that by the time we get the timeout, the dead replica will likely have been + * detected as dead and the retry has a high chance of success. Otherwise, the exception is + * rethrown. + */ + @Override + public RetryVerdict onReadTimeoutVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int blockFor, + int received, + boolean dataPresent, + int retryCount) { + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES && received >= blockFor && !dataPresent) + ? RetryDecision.RETRY_SAME + : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on read timeout on same host (consistency: {}, required responses: {}, received responses: {}, data retrieved: {}, retries: {}, retry decision: {})", + cl, + blockFor, + received, + dataPresent, + retryCount, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *
This implementation triggers a maximum of {@code MAX_RETRIES} retries (to the same node), + * and only for {@code WriteType.CAS} and {@code WriteType.SIMPLE} write. The reasoning is that + * collections use lightweight transactions, the write type is either CAS or SIMPLE and tables use + * SIMPLE for writes. + */ + @Override + public RetryVerdict onWriteTimeoutVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + @NonNull WriteType writeType, + int blockFor, + int received, + int retryCount) { + + final RetryDecision retryDecision = + (retryCount < MAX_RETRIES && (writeType == WriteType.CAS || writeType == WriteType.SIMPLE)) + ? RetryDecision.RETRY_SAME + : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on write timeout on same host (consistency: {}, write type: {}, required acknowledgments: {}, received acknowledgments: {}, retries: {}, retry decision: {})", + cl, + writeType, + blockFor, + received, + retryCount, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *
This implementation triggers a maximum of {@code MAX_RETRIES} retry, to the next node in the + * query plan. The rationale is that the first coordinator might have been network-isolated from + * all other nodes (thinking they're down), but still able to communicate with the client; in that + * case, retrying on the same host has almost no chance of success, but moving to the next host + * might solve the issue. + * + *
Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ + @Override + public RetryVerdict onUnavailableVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int required, + int alive, + int retryCount) { + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES) ? retryDecisionForUnavailable() : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on unavailable exception on next host (consistency: {}, required replica: {}, alive replica: {}, retries: {}, retry decision: {})", + cl, + required, + alive, + retryCount, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *
This implementation retries on the next node if the connection was closed, and rethrows + * (assuming a driver bug) in all other cases. + * + *
Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ + @Override + public RetryVerdict onRequestAbortedVerdict( + @NonNull Request request, @NonNull Throwable error, int retryCount) { + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES + && (error instanceof ClosedConnectionException + || error instanceof HeartbeatException)) + ? retryDecisionForRequestAborted() + : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on aborted request on next host (retries: {}, error: {}, retry decision: {})", + retryCount, + error, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *
This implementation rethrows read and write failures, and retries other errors on the next + * node. + * + *
Note: In Astra, CQL router will be used and retry on the next node will fail. So, the
+ * decision will be to retry on the same node.
+ */
+  @Override
+  public RetryVerdict onErrorResponseVerdict(
+      @NonNull Request request, @NonNull CoordinatorException error, int retryCount) {
+
+    // Issue1830: CASWriteUnknownException and TruncateException are intentionally handled by the
+    // default case, i.e. they are retried while retryCount < MAX_RETRIES.
+    var retryDecision =
+        switch (error) {
+          case ReadFailureException e -> RetryDecision.RETHROW; // never retried
+          case WriteFailureException e -> RetryDecision.RETHROW; // never retried
+          default ->
+              (retryCount < MAX_RETRIES) ? retryDecisionForErrorResponse() : RetryDecision.RETHROW;
+        };
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info( // decision-neutral wording: outcome may be RETHROW or a subclass-chosen retry
+          "Node error response (retries: {}, error: {}, retry decision: {})",
+          retryCount,
+          error,
+          retryDecision);
+    }
+
+    return () -> retryDecision; // RetryVerdict that yields the decision computed above
+  }
+
+ @Override
+ public void close() {} // no-op: nothing is done when the policy is closed
+
+ protected RetryDecision retryDecisionForUnavailable() { // hook used by onUnavailableVerdict while retries remain
+ return RetryDecision.RETRY_NEXT; // base default; AstraCqlRetryPolicy overrides this with RETRY_SAME
+ }
+
+ protected RetryDecision retryDecisionForRequestAborted() { // hook used by onRequestAbortedVerdict for closed-connection/heartbeat errors
+ return RetryDecision.RETRY_NEXT; // base default; AstraCqlRetryPolicy overrides this with RETRY_SAME
+ }
+
+ protected RetryDecision retryDecisionForErrorResponse() { // hook used by onErrorResponseVerdict's default branch while retries remain
+ return RetryDecision.RETRY_NEXT; // base default; AstraCqlRetryPolicy overrides this with RETRY_SAME
+ }
+
+ @Override
+ @Deprecated
+ public RetryDecision onReadTimeout( // legacy variant; this class implements onReadTimeoutVerdict instead
+ @NonNull Request request,
+ @NonNull ConsistencyLevel cl,
+ int blockFor,
+ int received,
+ boolean dataPresent,
+ int retryCount) {
+ throw new UnsupportedOperationException("onReadTimeout"); // always throws; no decision is ever returned
+ }
+
+ @Override
+ @Deprecated
+ public RetryDecision onWriteTimeout( // legacy variant; this class implements onWriteTimeoutVerdict instead
+ @NonNull Request request,
+ @NonNull ConsistencyLevel cl,
+ @NonNull WriteType writeType,
+ int blockFor,
+ int received,
+ int retryCount) {
+ throw new UnsupportedOperationException("onWriteTimeout"); // always throws; no decision is ever returned
+ }
+
+ @Override
+ @Deprecated
+ public RetryDecision onUnavailable( // legacy variant; this class implements onUnavailableVerdict instead
+ @NonNull Request request,
+ @NonNull ConsistencyLevel cl,
+ int required,
+ int alive,
+ int retryCount) {
+ throw new UnsupportedOperationException("onUnavailable"); // always throws; no decision is ever returned
+ }
+
+ @Override
+ @Deprecated
+ public RetryDecision onRequestAborted( // legacy variant; this class implements onRequestAbortedVerdict instead
+ @NonNull Request request, @NonNull Throwable error, int retryCount) {
+ throw new UnsupportedOperationException("onRequestAborted"); // always throws; no decision is ever returned
+ }
+
+ @Override
+ @Deprecated
+ public RetryDecision onErrorResponse( // legacy variant; this class implements onErrorResponseVerdict instead
+ @NonNull Request request, @NonNull CoordinatorException error, int retryCount) {
+ throw new UnsupportedOperationException("onErrorResponse"); // always throws; no decision is ever returned
+ }
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java
deleted file mode 100644
index 9fdd745e8e..0000000000
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package io.stargate.sgv2.jsonapi.service.cqldriver;
-
-import static com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy.RETRYING_ON_UNAVAILABLE;
-
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.api.core.context.DriverContext;
-import com.datastax.oss.driver.api.core.retry.RetryDecision;
-import com.datastax.oss.driver.api.core.retry.RetryPolicy;
-import com.datastax.oss.driver.api.core.retry.RetryVerdict;
-import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
-import com.datastax.oss.driver.api.core.servererrors.WriteType;
-import com.datastax.oss.driver.api.core.session.Request;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This retry policy while executing cql statements is useful when the requests are delegated to a
- * proxy layer which is responsible for retrying the requests. This policy will only retry once if
- * the intermediate layer is unavailable for some reason. In all other cases, this will rethrow the
- * exception to avoid retrying the requests at the driver level.
- */
-public class CqlProxyRetryPolicy implements RetryPolicy {
- private static final Logger LOG = LoggerFactory.getLogger(CqlProxyRetryPolicy.class);
- private final String logPrefix;
-
- private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3);
-
- public CqlProxyRetryPolicy(DriverContext context, String profileName) {
- this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName;
- }
-
- @Override
- public RetryDecision onReadTimeout(
- @NonNull Request request,
- @NonNull ConsistencyLevel cl,
- int blockFor,
- int received,
- boolean dataPresent,
- int retryCount) {
- return RetryDecision.RETHROW;
- }
-
- @Override
- public RetryVerdict onReadTimeoutVerdict(
- @NonNull Request request,
- @NonNull ConsistencyLevel cl,
- int blockFor,
- int received,
- boolean dataPresent,
- int retryCount) {
- RetryDecision retryDecision =
- onReadTimeout(request, cl, blockFor, received, dataPresent, retryCount);
- return () -> retryDecision;
- }
-
- @Override
- public RetryDecision onWriteTimeout(
- @NonNull Request request,
- @NonNull ConsistencyLevel cl,
- @NonNull WriteType writeType,
- int blockFor,
- int received,
- int retryCount) {
- var retryDecision = RetryDecision.RETHROW;
- if (retryCount < MAX_RETRIES && writeType == WriteType.CAS) {
- retryDecision = RetryDecision.RETRY_SAME;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Write timeout for request writeType : {}, retryCount: {}, consistency level: {} -> retry decision is: {}",
- writeType,
- retryCount,
- cl,
- retryDecision);
- }
- return retryDecision;
- }
-
- @Override
- public RetryVerdict onWriteTimeoutVerdict(
- @NonNull Request request,
- @NonNull ConsistencyLevel cl,
- @NonNull WriteType writeType,
- int blockFor,
- int received,
- int retryCount) {
- RetryDecision retryDecision =
- onWriteTimeout(request, cl, writeType, blockFor, received, retryCount);
- return () -> retryDecision;
- }
-
- @Override
- public RetryDecision onUnavailable(
- @NonNull Request request,
- @NonNull ConsistencyLevel cl,
- int required,
- int alive,
- int retryCount) {
- RetryDecision decision = (retryCount == 0) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW;
-
- if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
- LOG.trace(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount);
- }
-
- return decision;
- }
-
- @Override
- public RetryVerdict onUnavailableVerdict(
- @NonNull Request request,
- @NonNull ConsistencyLevel cl,
- int required,
- int alive,
- int retryCount) {
- RetryDecision retryDecision = onUnavailable(request, cl, required, alive, retryCount);
- return () -> retryDecision;
- }
-
- @Override
- public RetryDecision onRequestAborted(
- @NonNull Request request, @NonNull Throwable error, int retryCount) {
- return RetryDecision.RETHROW;
- }
-
- @Override
- public RetryVerdict onRequestAbortedVerdict(
- @NonNull Request request, @NonNull Throwable error, int retryCount) {
- RetryDecision retryDecision = onRequestAborted(request, error, retryCount);
- return () -> retryDecision;
- }
-
- @Override
- public RetryDecision onErrorResponse(
- @NonNull Request request, @NonNull CoordinatorException error, int retryCount) {
- return RetryDecision.RETHROW;
- }
-
- @Override
- public RetryVerdict onErrorResponseVerdict(
- @NonNull Request request, @NonNull CoordinatorException error, int retryCount) {
- RetryDecision retryDecision = onErrorResponse(request, error, retryCount);
- return () -> retryDecision;
- }
-
- @Override
- public void close() {
- // nothing to do
- }
-}
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index 6826cc80a0..589dd0d0b1 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -3,7 +3,7 @@ datastax-java-driver {
version = V4
}
advanced.retry-policy {
- class = io.stargate.sgv2.jsonapi.service.cqldriver.CqlProxyRetryPolicy
+ class = io.stargate.sgv2.jsonapi.service.cqldriver.BaseCqlRetryPolicy
}
advanced.session-leak.threshold = 0
advanced.connection {
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java
new file mode 100644
index 0000000000..e73cd2fc70
--- /dev/null
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java
@@ -0,0 +1,61 @@
+package io.stargate.sgv2.jsonapi.service.cqldriver.retry;
+
+import static com.datastax.oss.driver.api.core.DefaultConsistencyLevel.QUORUM;
+import static com.datastax.oss.driver.api.core.retry.RetryDecision.*;
+
+import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
+import com.datastax.oss.driver.api.core.connection.HeartbeatException;
+import com.datastax.oss.driver.api.core.servererrors.CASWriteUnknownException;
+import com.datastax.oss.driver.api.core.servererrors.ReadFailureException;
+import com.datastax.oss.driver.api.core.servererrors.TruncateException;
+import com.datastax.oss.driver.api.core.servererrors.WriteFailureException;
+import io.stargate.sgv2.jsonapi.service.cqldriver.AstraCqlRetryPolicy;
+import org.junit.Test;
+
+public class AstraCqlRetryPolicyTest extends RetryPolicyTestBase { // exercises AstraCqlRetryPolicy: retryable cases expect RETRY_SAME
+
+ public AstraCqlRetryPolicyTest() {
+ super(new AstraCqlRetryPolicy(null, null)); // both constructor arguments are passed as null
+ }
+
+ @Test // NOTE(review): org.junit.Test is JUnit 4 - confirm the build actually runs JUnit 4 tests
+ public void shouldProcessUnavailable() {
+ assertOnUnavailable(QUORUM, 2, 1, 0).hasDecision(RETRY_SAME);
+ assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETRY_SAME);
+ assertOnUnavailable(QUORUM, 2, 1, 2).hasDecision(RETRY_SAME);
+ assertOnUnavailable(QUORUM, 2, 1, 3).hasDecision(RETHROW); // retryCount 3 reaches the default MAX_RETRIES (3)
+ }
+
+ @Test
+ public void shouldProcessAbortedRequest() {
+ assertOnRequestAborted(ClosedConnectionException.class, 0).hasDecision(RETRY_SAME);
+ assertOnRequestAborted(ClosedConnectionException.class, 1).hasDecision(RETRY_SAME);
+ assertOnRequestAborted(ClosedConnectionException.class, 2).hasDecision(RETRY_SAME);
+ assertOnRequestAborted(ClosedConnectionException.class, 3).hasDecision(RETHROW); // retries exhausted
+
+ assertOnRequestAborted(HeartbeatException.class, 0).hasDecision(RETRY_SAME);
+ assertOnRequestAborted(HeartbeatException.class, 1).hasDecision(RETRY_SAME);
+ assertOnRequestAborted(HeartbeatException.class, 2).hasDecision(RETRY_SAME);
+ assertOnRequestAborted(HeartbeatException.class, 3).hasDecision(RETHROW); // retries exhausted
+
+ assertOnRequestAborted(Throwable.class, 0).hasDecision(RETHROW); // any other abort cause is rethrown immediately
+ }
+
+ @Test
+ public void shouldProcessErrorResponse() {
+ // ReadFailureException and WriteFailureException are rethrown even on the first attempt
+ assertOnErrorResponse(ReadFailureException.class, 0).hasDecision(RETHROW);
+ assertOnErrorResponse(WriteFailureException.class, 0).hasDecision(RETHROW);
+
+ // Issue1830 - CASWriteUnknownException and TruncateException are retried for retryCount 0..2
+ assertOnErrorResponse(CASWriteUnknownException.class, 0).hasDecision(RETRY_SAME);
+ assertOnErrorResponse(CASWriteUnknownException.class, 1).hasDecision(RETRY_SAME);
+ assertOnErrorResponse(CASWriteUnknownException.class, 2).hasDecision(RETRY_SAME);
+ assertOnErrorResponse(CASWriteUnknownException.class, 3).hasDecision(RETHROW); // retries exhausted
+
+ assertOnErrorResponse(TruncateException.class, 0).hasDecision(RETRY_SAME);
+ assertOnErrorResponse(TruncateException.class, 1).hasDecision(RETRY_SAME);
+ assertOnErrorResponse(TruncateException.class, 2).hasDecision(RETRY_SAME);
+ assertOnErrorResponse(TruncateException.class, 3).hasDecision(RETHROW); // retries exhausted
+ }
+}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java
new file mode 100644
index 0000000000..c7a3ff1d66
--- /dev/null
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java
@@ -0,0 +1,87 @@
+package io.stargate.sgv2.jsonapi.service.cqldriver.retry;
+
+import static com.datastax.oss.driver.api.core.DefaultConsistencyLevel.QUORUM;
+import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETHROW;
+import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETRY_NEXT;
+import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETRY_SAME;
+import static com.datastax.oss.driver.api.core.servererrors.DefaultWriteType.CAS;
+import static com.datastax.oss.driver.api.core.servererrors.DefaultWriteType.SIMPLE;
+
+import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
+import com.datastax.oss.driver.api.core.connection.HeartbeatException;
+import com.datastax.oss.driver.api.core.servererrors.*;
+import io.stargate.sgv2.jsonapi.service.cqldriver.BaseCqlRetryPolicy;
+import org.junit.Test;
+
+public class BaseCqlRetryPolicyTest extends RetryPolicyTestBase { // exercises BaseCqlRetryPolicy's verdict methods
+
+ public BaseCqlRetryPolicyTest() {
+ super(new BaseCqlRetryPolicy(null, null)); // both constructor arguments are passed as null
+ }
+
+ @Test // NOTE(review): org.junit.Test is JUnit 4 - confirm the build actually runs JUnit 4 tests
+ public void shouldProcessReadTimeouts() {
+ assertOnReadTimeout(QUORUM, 2, 2, false, 0).hasDecision(RETRY_SAME);
+ assertOnReadTimeout(QUORUM, 2, 2, false, 1).hasDecision(RETRY_SAME);
+ assertOnReadTimeout(QUORUM, 2, 2, false, 2).hasDecision(RETRY_SAME);
+ assertOnReadTimeout(QUORUM, 2, 2, false, 3).hasDecision(RETHROW); // retryCount 3 reaches the default MAX_RETRIES (3)
+
+ assertOnReadTimeout(QUORUM, 2, 2, true, 0).hasDecision(RETHROW); // data was present
+ assertOnReadTimeout(QUORUM, 2, 1, true, 0).hasDecision(RETHROW); // data present and too few responses
+ assertOnReadTimeout(QUORUM, 2, 1, false, 0).hasDecision(RETHROW); // received < blockFor
+ }
+
+ @Test
+ public void shouldProcessWriteTimeouts() { // NOTE(review): no case covers a write type other than CAS/SIMPLE (policy would RETHROW)
+ assertOnWriteTimeout(QUORUM, CAS, 2, 0, 0).hasDecision(RETRY_SAME);
+ assertOnWriteTimeout(QUORUM, CAS, 2, 0, 1).hasDecision(RETRY_SAME);
+ assertOnWriteTimeout(QUORUM, CAS, 2, 0, 2).hasDecision(RETRY_SAME);
+ assertOnWriteTimeout(QUORUM, CAS, 2, 0, 3).hasDecision(RETHROW); // retries exhausted
+
+ assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 0).hasDecision(RETRY_SAME);
+ assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 1).hasDecision(RETRY_SAME);
+ assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 2).hasDecision(RETRY_SAME);
+ assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 3).hasDecision(RETHROW); // retries exhausted
+ }
+
+ @Test
+ public void shouldProcessUnavailable() {
+ assertOnUnavailable(QUORUM, 2, 1, 0).hasDecision(RETRY_NEXT);
+ assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETRY_NEXT);
+ assertOnUnavailable(QUORUM, 2, 1, 2).hasDecision(RETRY_NEXT);
+ assertOnUnavailable(QUORUM, 2, 1, 3).hasDecision(RETHROW); // retries exhausted
+ }
+
+ @Test
+ public void shouldProcessAbortedRequest() {
+ assertOnRequestAborted(ClosedConnectionException.class, 0).hasDecision(RETRY_NEXT);
+ assertOnRequestAborted(ClosedConnectionException.class, 1).hasDecision(RETRY_NEXT);
+ assertOnRequestAborted(ClosedConnectionException.class, 2).hasDecision(RETRY_NEXT);
+ assertOnRequestAborted(ClosedConnectionException.class, 3).hasDecision(RETHROW); // retries exhausted
+
+ assertOnRequestAborted(HeartbeatException.class, 0).hasDecision(RETRY_NEXT);
+ assertOnRequestAborted(HeartbeatException.class, 1).hasDecision(RETRY_NEXT);
+ assertOnRequestAborted(HeartbeatException.class, 2).hasDecision(RETRY_NEXT);
+ assertOnRequestAborted(HeartbeatException.class, 3).hasDecision(RETHROW); // retries exhausted
+
+ assertOnRequestAborted(Throwable.class, 0).hasDecision(RETHROW); // any other abort cause is rethrown immediately
+ }
+
+ @Test
+ public void shouldProcessErrorResponse() {
+ // ReadFailureException and WriteFailureException are rethrown even on the first attempt
+ assertOnErrorResponse(ReadFailureException.class, 0).hasDecision(RETHROW);
+ assertOnErrorResponse(WriteFailureException.class, 0).hasDecision(RETHROW);
+
+ // Issue1830 - CASWriteUnknownException and TruncateException are retried for retryCount 0..2
+ assertOnErrorResponse(CASWriteUnknownException.class, 0).hasDecision(RETRY_NEXT);
+ assertOnErrorResponse(CASWriteUnknownException.class, 1).hasDecision(RETRY_NEXT);
+ assertOnErrorResponse(CASWriteUnknownException.class, 2).hasDecision(RETRY_NEXT);
+ assertOnErrorResponse(CASWriteUnknownException.class, 3).hasDecision(RETHROW); // retries exhausted
+
+ assertOnErrorResponse(TruncateException.class, 0).hasDecision(RETRY_NEXT);
+ assertOnErrorResponse(TruncateException.class, 1).hasDecision(RETRY_NEXT);
+ assertOnErrorResponse(TruncateException.class, 2).hasDecision(RETRY_NEXT);
+ assertOnErrorResponse(TruncateException.class, 3).hasDecision(RETHROW); // retries exhausted
+ }
+}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java
new file mode 100644
index 0000000000..0fdda57840
--- /dev/null
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java
@@ -0,0 +1,68 @@
+package io.stargate.sgv2.jsonapi.service.cqldriver.retry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.retry.RetryDecision;
+import com.datastax.oss.driver.api.core.retry.RetryPolicy;
+import com.datastax.oss.driver.api.core.retry.RetryVerdict;
+import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
+import com.datastax.oss.driver.api.core.servererrors.WriteType;
+import com.datastax.oss.driver.api.core.session.Request;
+import org.assertj.core.api.AbstractAssert;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public abstract class RetryPolicyTestBase {
+ private final RetryPolicy policy;
+
+ @Mock private Request request;
+
+ protected RetryPolicyTestBase(RetryPolicy policy) {
+ this.policy = policy;
+ }
+
+ protected RetryVerdictAssert assertOnReadTimeout(
+ ConsistencyLevel cl, int blockFor, int received, boolean dataPresent, int retryCount) {
+ return new RetryVerdictAssert(
+ policy.onReadTimeoutVerdict(request, cl, blockFor, received, dataPresent, retryCount));
+ }
+
+ protected RetryVerdictAssert assertOnWriteTimeout(
+ ConsistencyLevel cl, WriteType writeType, int blockFor, int received, int retryCount) {
+ return new RetryVerdictAssert(
+ policy.onWriteTimeoutVerdict(request, cl, writeType, blockFor, received, retryCount));
+ }
+
+ protected RetryVerdictAssert assertOnUnavailable(
+ ConsistencyLevel cl, int required, int alive, int retryCount) {
+ return new RetryVerdictAssert(
+ policy.onUnavailableVerdict(request, cl, required, alive, retryCount));
+ }
+
+  protected RetryVerdictAssert assertOnRequestAborted( // verdict for a mocked abort cause
+      Class<? extends Throwable> errorClass, int retryCount) {
+    return new RetryVerdictAssert(
+        policy.onRequestAbortedVerdict(request, mock(errorClass), retryCount)); // error is a mock
+  }
+
+  protected RetryVerdictAssert assertOnErrorResponse( // verdict for a mocked coordinator error
+      Class<? extends CoordinatorException> errorClass, int retryCount) {
+    return new RetryVerdictAssert(
+        policy.onErrorResponseVerdict(request, mock(errorClass), retryCount)); // error is a mock
+  }
+
+ public static class RetryVerdictAssert extends AbstractAssert