diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java new file mode 100644 index 0000000000..c21fa11f62 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java @@ -0,0 +1,30 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver; + +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.retry.RetryDecision; + +/** + * In Astra, CQL router will be used and retry on the next node will fail. So, the decision will be + * to retry on the same node. + */ +public class AstraCqlRetryPolicy extends BaseCqlRetryPolicy { + + public AstraCqlRetryPolicy(DriverContext context, String profileName) { + super(context, profileName); + } + + @Override + protected RetryDecision retryDecisionForUnavailable() { + return RetryDecision.RETRY_SAME; + } + + @Override + protected RetryDecision retryDecisionForRequestAborted() { + return RetryDecision.RETRY_SAME; + } + + @Override + protected RetryDecision retryDecisionForErrorResponse() { + return RetryDecision.RETRY_SAME; + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java new file mode 100644 index 0000000000..ad7197eea0 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java @@ -0,0 +1,267 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.retry.RetryDecision; +import com.datastax.oss.driver.api.core.retry.RetryPolicy; +import com.datastax.oss.driver.api.core.retry.RetryVerdict; +import com.datastax.oss.driver.api.core.servererrors.*; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Custom retry policy tailored for DataAPI, providing distinct retry logic for specific scenarios + * compared to {@link DefaultRetryPolicy}. Logs are recorded at the INFO level instead of TRACE + * level, aligning with DataAPI logging standards. + */ +public class BaseCqlRetryPolicy implements RetryPolicy { + private static final Logger LOG = LoggerFactory.getLogger(BaseCqlRetryPolicy.class); + private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); + + public BaseCqlRetryPolicy(DriverContext context, String profileName) {} + + /** + * {@inheritDoc} + * + *

This implementation triggers a maximum of {@code MAX_RETRIES} retry (to the same node), and + * only if enough replicas had responded to the read request but data was not retrieved amongst + * those. That usually means that enough replicas are alive to satisfy the consistency, but the + * coordinator picked a dead one for data retrieval, not having detected that replica as dead yet. + * The reasoning is that by the time we get the timeout, the dead replica will likely have been + * detected as dead and the retry has a high chance of success. Otherwise, the exception is + * rethrown. + */ + @Override + public RetryVerdict onReadTimeoutVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int blockFor, + int received, + boolean dataPresent, + int retryCount) { + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES && received >= blockFor && !dataPresent) + ? RetryDecision.RETRY_SAME + : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on read timeout on same host (consistency: {}, required responses: {}, received responses: {}, data retrieved: {}, retries: {}, retry decision: {})", + cl, + blockFor, + received, + dataPresent, + retryCount, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *

This implementation triggers a maximum of {@code MAX_RETRIES} retries (to the same node), + * and only for {@code WriteType.CAS} and {@code WriteType.SIMPLE} write. The reasoning is that + * collections use lightweight transactions, the write type is either CAS or SIMPLE and tables use + * SIMPLE for writes. + */ + @Override + public RetryVerdict onWriteTimeoutVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + @NonNull WriteType writeType, + int blockFor, + int received, + int retryCount) { + + final RetryDecision retryDecision = + (retryCount < MAX_RETRIES && (writeType == WriteType.CAS || writeType == WriteType.SIMPLE)) + ? RetryDecision.RETRY_SAME + : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on write timeout on same host (consistency: {}, write type: {}, required acknowledgments: {}, received acknowledgments: {}, retries: {}, retry decision: {})", + cl, + writeType, + blockFor, + received, + retryCount, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *

This implementation triggers a maximum of {@code MAX_RETRIES} retry, to the next node in the + * query plan. The rationale is that the first coordinator might have been network-isolated from + * all other nodes (thinking they're down), but still able to communicate with the client; in that + * case, retrying on the same host has almost no chance of success, but moving to the next host + * might solve the issue. + * + *

Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ + @Override + public RetryVerdict onUnavailableVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int required, + int alive, + int retryCount) { + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES) ? retryDecisionForUnavailable() : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on unavailable exception on next host (consistency: {}, required replica: {}, alive replica: {}, retries: {}, retry decision: {})", + cl, + required, + alive, + retryCount, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *

This implementation retries on the next node if the connection was closed, and rethrows + * (assuming a driver bug) in all other cases. + * + *

Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ + @Override + public RetryVerdict onRequestAbortedVerdict( + @NonNull Request request, @NonNull Throwable error, int retryCount) { + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES + && (error instanceof ClosedConnectionException + || error instanceof HeartbeatException)) + ? retryDecisionForRequestAborted() + : RetryDecision.RETHROW; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on aborted request on next host (retries: {}, error: {}, retry decision: {})", + retryCount, + error, + retryDecision); + } + + return () -> retryDecision; + } + + /** + * {@inheritDoc} + * + *

This implementation rethrows read and write failures, and retries other errors on the next + * node. + * + *

Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ + @Override + public RetryVerdict onErrorResponseVerdict( + @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { + + // Issue1830: CASWriteUnknownException and TruncateException have been included in the default + // case. + var retryDecision = + switch (error) { + case ReadFailureException e -> RetryDecision.RETHROW; + case WriteFailureException e -> RetryDecision.RETHROW; + default -> + (retryCount < MAX_RETRIES) ? retryDecisionForErrorResponse() : RetryDecision.RETHROW; + }; + + if (LOG.isInfoEnabled()) { + LOG.info( + "Retrying on node error on next host (retries: {}, error: {}, retry decision: {})", + retryCount, + error, + retryDecision); + } + + return () -> retryDecision; + } + + @Override + public void close() {} + + protected RetryDecision retryDecisionForUnavailable() { + return RetryDecision.RETRY_NEXT; + } + + protected RetryDecision retryDecisionForRequestAborted() { + return RetryDecision.RETRY_NEXT; + } + + protected RetryDecision retryDecisionForErrorResponse() { + return RetryDecision.RETRY_NEXT; + } + + @Override + @Deprecated + public RetryDecision onReadTimeout( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int blockFor, + int received, + boolean dataPresent, + int retryCount) { + throw new UnsupportedOperationException("onReadTimeout"); + } + + @Override + @Deprecated + public RetryDecision onWriteTimeout( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + @NonNull WriteType writeType, + int blockFor, + int received, + int retryCount) { + throw new UnsupportedOperationException("onWriteTimeout"); + } + + @Override + @Deprecated + public RetryDecision onUnavailable( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int required, + int alive, + int retryCount) { + throw new UnsupportedOperationException("onUnavailable"); + } + + @Override + @Deprecated + public RetryDecision onRequestAborted( + @NonNull Request request, @NonNull Throwable error, int retryCount) { + throw new UnsupportedOperationException("onRequestAborted"); + } + + @Override + @Deprecated + public RetryDecision onErrorResponse( + @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { + throw new UnsupportedOperationException("onErrorResponse"); + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java deleted file mode 100644 index 9fdd745e8e..0000000000 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java +++ /dev/null @@ -1,150 +0,0 @@ -package io.stargate.sgv2.jsonapi.service.cqldriver; - -import static com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy.RETRYING_ON_UNAVAILABLE; - -import com.datastax.oss.driver.api.core.ConsistencyLevel; -import com.datastax.oss.driver.api.core.context.DriverContext; -import com.datastax.oss.driver.api.core.retry.RetryDecision; -import com.datastax.oss.driver.api.core.retry.RetryPolicy; -import com.datastax.oss.driver.api.core.retry.RetryVerdict; -import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; -import com.datastax.oss.driver.api.core.servererrors.WriteType; -import com.datastax.oss.driver.api.core.session.Request; -import edu.umd.cs.findbugs.annotations.NonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This retry policy while executing cql statements is useful when the requests are delegated to a - * proxy layer which is responsible for retrying the requests. This policy will only retry once if - * the intermediate layer is unavailable for some reason. In all other cases, this will rethrow the - * exception to avoid retrying the requests at the driver level. - */ -public class CqlProxyRetryPolicy implements RetryPolicy { - private static final Logger LOG = LoggerFactory.getLogger(CqlProxyRetryPolicy.class); - private final String logPrefix; - - private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); - - public CqlProxyRetryPolicy(DriverContext context, String profileName) { - this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; - } - - @Override - public RetryDecision onReadTimeout( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int blockFor, - int received, - boolean dataPresent, - int retryCount) { - return RetryDecision.RETHROW; - } - - @Override - public RetryVerdict onReadTimeoutVerdict( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int blockFor, - int received, - boolean dataPresent, - int retryCount) { - RetryDecision retryDecision = - onReadTimeout(request, cl, blockFor, received, dataPresent, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onWriteTimeout( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - @NonNull WriteType writeType, - int blockFor, - int received, - int retryCount) { - var retryDecision = RetryDecision.RETHROW; - if (retryCount < MAX_RETRIES && writeType == WriteType.CAS) { - retryDecision = RetryDecision.RETRY_SAME; - } - if (LOG.isInfoEnabled()) { - LOG.info( - "Write timeout for request writeType : {}, retryCount: {}, consistency level: {} -> retry decision is: {}", - writeType, - retryCount, - cl, - retryDecision); - } - return retryDecision; - } - - @Override - public RetryVerdict onWriteTimeoutVerdict( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - @NonNull WriteType writeType, - int blockFor, - int received, - int retryCount) { - RetryDecision retryDecision = - onWriteTimeout(request, cl, writeType, blockFor, received, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onUnavailable( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int required, - int alive, - int retryCount) { - RetryDecision decision = (retryCount == 0) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; - - if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) { - LOG.trace(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount); - } - - return decision; - } - - @Override - public RetryVerdict onUnavailableVerdict( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int required, - int alive, - int retryCount) { - RetryDecision retryDecision = onUnavailable(request, cl, required, alive, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onRequestAborted( - @NonNull Request request, @NonNull Throwable error, int retryCount) { - return RetryDecision.RETHROW; - } - - @Override - public RetryVerdict onRequestAbortedVerdict( - @NonNull Request request, @NonNull Throwable error, int retryCount) { - RetryDecision retryDecision = onRequestAborted(request, error, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onErrorResponse( - @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { - return RetryDecision.RETHROW; - } - - @Override - public RetryVerdict onErrorResponseVerdict( - @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { - RetryDecision retryDecision = onErrorResponse(request, error, retryCount); - return () -> retryDecision; - } - - @Override - public void close() { - // nothing to do - } -} diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 6826cc80a0..589dd0d0b1 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -3,7 +3,7 @@ datastax-java-driver { version = V4 } advanced.retry-policy { - class = io.stargate.sgv2.jsonapi.service.cqldriver.CqlProxyRetryPolicy + class = io.stargate.sgv2.jsonapi.service.cqldriver.BaseCqlRetryPolicy } advanced.session-leak.threshold = 0 advanced.connection { diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java new file mode 100644 index 0000000000..e73cd2fc70 --- /dev/null +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java @@ -0,0 +1,61 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver.retry; + +import static com.datastax.oss.driver.api.core.DefaultConsistencyLevel.QUORUM; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.*; + +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; +import com.datastax.oss.driver.api.core.servererrors.CASWriteUnknownException; +import com.datastax.oss.driver.api.core.servererrors.ReadFailureException; +import com.datastax.oss.driver.api.core.servererrors.TruncateException; +import com.datastax.oss.driver.api.core.servererrors.WriteFailureException; +import io.stargate.sgv2.jsonapi.service.cqldriver.AstraCqlRetryPolicy; +import org.junit.Test; + +public class AstraCqlRetryPolicyTest extends RetryPolicyTestBase { + + public AstraCqlRetryPolicyTest() { + super(new AstraCqlRetryPolicy(null, null)); + } + + @Test + public void shouldProcessUnavailable() { + assertOnUnavailable(QUORUM, 2, 1, 0).hasDecision(RETRY_SAME); + assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETRY_SAME); + assertOnUnavailable(QUORUM, 2, 1, 2).hasDecision(RETRY_SAME); + assertOnUnavailable(QUORUM, 2, 1, 3).hasDecision(RETHROW); + } + + @Test + public void shouldProcessAbortedRequest() { + assertOnRequestAborted(ClosedConnectionException.class, 0).hasDecision(RETRY_SAME); + assertOnRequestAborted(ClosedConnectionException.class, 1).hasDecision(RETRY_SAME); + assertOnRequestAborted(ClosedConnectionException.class, 2).hasDecision(RETRY_SAME); + assertOnRequestAborted(ClosedConnectionException.class, 3).hasDecision(RETHROW); + + assertOnRequestAborted(HeartbeatException.class, 0).hasDecision(RETRY_SAME); + assertOnRequestAborted(HeartbeatException.class, 1).hasDecision(RETRY_SAME); + assertOnRequestAborted(HeartbeatException.class, 2).hasDecision(RETRY_SAME); + assertOnRequestAborted(HeartbeatException.class, 3).hasDecision(RETHROW); + + assertOnRequestAborted(Throwable.class, 0).hasDecision(RETHROW); + } + + @Test + public void shouldProcessErrorResponse() { + // rethrow on ReadFailureException and WriteFailureException + assertOnErrorResponse(ReadFailureException.class, 0).hasDecision(RETHROW); + assertOnErrorResponse(WriteFailureException.class, 0).hasDecision(RETHROW); + + // Issue1830 - retry 3 times on CASWriteUnknownException and TruncateException + assertOnErrorResponse(CASWriteUnknownException.class, 0).hasDecision(RETRY_SAME); + assertOnErrorResponse(CASWriteUnknownException.class, 1).hasDecision(RETRY_SAME); + assertOnErrorResponse(CASWriteUnknownException.class, 2).hasDecision(RETRY_SAME); + assertOnErrorResponse(CASWriteUnknownException.class, 3).hasDecision(RETHROW); + + assertOnErrorResponse(TruncateException.class, 0).hasDecision(RETRY_SAME); + assertOnErrorResponse(TruncateException.class, 1).hasDecision(RETRY_SAME); + assertOnErrorResponse(TruncateException.class, 2).hasDecision(RETRY_SAME); + assertOnErrorResponse(TruncateException.class, 3).hasDecision(RETHROW); + } +} diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java new file mode 100644 index 0000000000..c7a3ff1d66 --- /dev/null +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java @@ -0,0 +1,87 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver.retry; + +import static com.datastax.oss.driver.api.core.DefaultConsistencyLevel.QUORUM; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETHROW; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETRY_NEXT; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETRY_SAME; +import static com.datastax.oss.driver.api.core.servererrors.DefaultWriteType.CAS; +import static com.datastax.oss.driver.api.core.servererrors.DefaultWriteType.SIMPLE; + +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; +import com.datastax.oss.driver.api.core.servererrors.*; +import io.stargate.sgv2.jsonapi.service.cqldriver.BaseCqlRetryPolicy; +import org.junit.Test; + +public class BaseCqlRetryPolicyTest extends RetryPolicyTestBase { + + public BaseCqlRetryPolicyTest() { + super(new BaseCqlRetryPolicy(null, null)); + } + + @Test + public void shouldProcessReadTimeouts() { + assertOnReadTimeout(QUORUM, 2, 2, false, 0).hasDecision(RETRY_SAME); + assertOnReadTimeout(QUORUM, 2, 2, false, 1).hasDecision(RETRY_SAME); + assertOnReadTimeout(QUORUM, 2, 2, false, 2).hasDecision(RETRY_SAME); + assertOnReadTimeout(QUORUM, 2, 2, false, 3).hasDecision(RETHROW); + + assertOnReadTimeout(QUORUM, 2, 2, true, 0).hasDecision(RETHROW); + assertOnReadTimeout(QUORUM, 2, 1, true, 0).hasDecision(RETHROW); + assertOnReadTimeout(QUORUM, 2, 1, false, 0).hasDecision(RETHROW); + } + + @Test + public void shouldProcessWriteTimeouts() { + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 0).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 1).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 2).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 3).hasDecision(RETHROW); + + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 0).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 1).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 2).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 3).hasDecision(RETHROW); + } + + @Test + public void shouldProcessUnavailable() { + assertOnUnavailable(QUORUM, 2, 1, 0).hasDecision(RETRY_NEXT); + assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETRY_NEXT); + assertOnUnavailable(QUORUM, 2, 1, 2).hasDecision(RETRY_NEXT); + assertOnUnavailable(QUORUM, 2, 1, 3).hasDecision(RETHROW); + } + + @Test + public void shouldProcessAbortedRequest() { + assertOnRequestAborted(ClosedConnectionException.class, 0).hasDecision(RETRY_NEXT); + assertOnRequestAborted(ClosedConnectionException.class, 1).hasDecision(RETRY_NEXT); + assertOnRequestAborted(ClosedConnectionException.class, 2).hasDecision(RETRY_NEXT); + assertOnRequestAborted(ClosedConnectionException.class, 3).hasDecision(RETHROW); + + assertOnRequestAborted(HeartbeatException.class, 0).hasDecision(RETRY_NEXT); + assertOnRequestAborted(HeartbeatException.class, 1).hasDecision(RETRY_NEXT); + assertOnRequestAborted(HeartbeatException.class, 2).hasDecision(RETRY_NEXT); + assertOnRequestAborted(HeartbeatException.class, 3).hasDecision(RETHROW); + + assertOnRequestAborted(Throwable.class, 0).hasDecision(RETHROW); + } + + @Test + public void shouldProcessErrorResponse() { + // rethrow on ReadFailureException and WriteFailureException + assertOnErrorResponse(ReadFailureException.class, 0).hasDecision(RETHROW); + assertOnErrorResponse(WriteFailureException.class, 0).hasDecision(RETHROW); + + // Issue1830 - retry 3 times on CASWriteUnknownException and TruncateException + assertOnErrorResponse(CASWriteUnknownException.class, 0).hasDecision(RETRY_NEXT); + assertOnErrorResponse(CASWriteUnknownException.class, 1).hasDecision(RETRY_NEXT); + assertOnErrorResponse(CASWriteUnknownException.class, 2).hasDecision(RETRY_NEXT); + assertOnErrorResponse(CASWriteUnknownException.class, 3).hasDecision(RETHROW); + + assertOnErrorResponse(TruncateException.class, 0).hasDecision(RETRY_NEXT); + assertOnErrorResponse(TruncateException.class, 1).hasDecision(RETRY_NEXT); + assertOnErrorResponse(TruncateException.class, 2).hasDecision(RETRY_NEXT); + assertOnErrorResponse(TruncateException.class, 3).hasDecision(RETHROW); + } +} diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java new file mode 100644 index 0000000000..0fdda57840 --- /dev/null +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java @@ -0,0 +1,68 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver.retry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.retry.RetryDecision; +import com.datastax.oss.driver.api.core.retry.RetryPolicy; +import com.datastax.oss.driver.api.core.retry.RetryVerdict; +import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; +import com.datastax.oss.driver.api.core.servererrors.WriteType; +import com.datastax.oss.driver.api.core.session.Request; +import org.assertj.core.api.AbstractAssert; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public abstract class RetryPolicyTestBase { + private final RetryPolicy policy; + + @Mock private Request request; + + protected RetryPolicyTestBase(RetryPolicy policy) { + this.policy = policy; + } + + protected RetryVerdictAssert assertOnReadTimeout( + ConsistencyLevel cl, int blockFor, int received, boolean dataPresent, int retryCount) { + return new RetryVerdictAssert( + policy.onReadTimeoutVerdict(request, cl, blockFor, received, dataPresent, retryCount)); + } + + protected RetryVerdictAssert assertOnWriteTimeout( + ConsistencyLevel cl, WriteType writeType, int blockFor, int received, int retryCount) { + return new RetryVerdictAssert( + policy.onWriteTimeoutVerdict(request, cl, writeType, blockFor, received, retryCount)); + } + + protected RetryVerdictAssert assertOnUnavailable( + ConsistencyLevel cl, int required, int alive, int retryCount) { + return new RetryVerdictAssert( + policy.onUnavailableVerdict(request, cl, required, alive, retryCount)); + } + + protected RetryVerdictAssert assertOnRequestAborted( + Class errorClass, int retryCount) { + return new RetryVerdictAssert( + policy.onRequestAbortedVerdict(request, mock(errorClass), retryCount)); + } + + protected RetryVerdictAssert assertOnErrorResponse( + Class errorClass, int retryCount) { + return new RetryVerdictAssert( + policy.onErrorResponseVerdict(request, mock(errorClass), retryCount)); + } + + public static class RetryVerdictAssert extends AbstractAssert { + RetryVerdictAssert(RetryVerdict actual) { + super(actual, RetryVerdictAssert.class); + } + + public RetryVerdictAssert hasDecision(RetryDecision decision) { + assertThat(actual.getRetryDecision()).isEqualTo(decision); + return this; + } + } +}