From 13902c35526caadaa0e0456f8b9dd2555aeb5d38 Mon Sep 17 00:00:00 2001 From: Hazel Date: Tue, 21 Jan 2025 18:02:38 -0800 Subject: [PATCH 01/11] create new class --- .../service/cqldriver/CqlRetryPolicy.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java new file mode 100644 index 0000000000..4665d9aea4 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java @@ -0,0 +1,43 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.retry.RetryDecision; +import com.datastax.oss.driver.api.core.retry.RetryVerdict; +import com.datastax.oss.driver.api.core.servererrors.WriteType; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CqlRetryPolicy extends DefaultRetryPolicy { + private final String logPrefix; + private static final Logger LOG = LoggerFactory.getLogger(CqlRetryPolicy.class); + private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); + + public CqlRetryPolicy(DriverContext context, String profileName) { + super(context, profileName); + this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; + } + + @Override + public RetryVerdict onWriteTimeoutVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + @NonNull WriteType writeType, + int blockFor, + int received, + int retryCount) { + final RetryDecision retryDecision; + if (retryCount < MAX_RETRIES && (writeType == WriteType.CAS || writeType == WriteType.SIMPLE)) { + retryDecision = RetryDecision.RETRY_SAME; + } else { + retryDecision = RetryDecision.RETHROW; + } + if (LOG.isInfoEnabled()) { + LOG.info(RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, writeType, blockFor, received, retryCount); + } + return () -> retryDecision; + } +} From 6cf4079cc4c0a775b7c719d2f260cdc15b0da0b6 Mon Sep 17 00:00:00 2001 From: Hazel Date: Wed, 22 Jan 2025 10:39:49 -0800 Subject: [PATCH 02/11] add log --- .../service/cqldriver/CqlRetryPolicy.java | 108 +++++++++++++++++- 1 file changed, 106 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java index 4665d9aea4..1372856ed5 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java @@ -4,23 +4,76 @@ import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.retry.RetryDecision; import com.datastax.oss.driver.api.core.retry.RetryVerdict; +import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; import com.datastax.oss.driver.api.core.servererrors.WriteType; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; +import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CqlRetryPolicy extends DefaultRetryPolicy { - private final String logPrefix; private static final Logger LOG = LoggerFactory.getLogger(CqlRetryPolicy.class); private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); + @VisibleForTesting + public static final String RETRYING_ON_READ_TIMEOUT = + "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, " + + "received responses: {}, data retrieved: {}, retries: {}, retry decision: {})"; + + @VisibleForTesting + public static final String RETRYING_ON_WRITE_TIMEOUT = + "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, " + + "required acknowledgments: {}, received acknowledgments: {}, retries: {}, retry decision: {})"; + + @VisibleForTesting + public static final String RETRYING_ON_UNAVAILABLE = + "[{}] Retrying on unavailable exception on next host (consistency: {}, " + + "required replica: {}, alive replica: {}, retries: {}, retry decision: {})"; + + @VisibleForTesting + public static final String RETRYING_ON_ABORTED = + "[{}] Retrying on aborted request on next host (retries: {}, error: {}, retry decision: {})"; + + @VisibleForTesting + public static final String RETRYING_ON_ERROR = + "[{}] Retrying on node error on next host (retries: {}, error: {}, retry decision: {})"; + + private final String logPrefix; + public CqlRetryPolicy(DriverContext context, String profileName) { super(context, profileName); this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; } + @Override + public RetryVerdict onReadTimeoutVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int blockFor, + int received, + boolean dataPresent, + int retryCount) { + var retryVerdict = + super.onReadTimeoutVerdict(request, cl, blockFor, received, dataPresent, retryCount); + var retryDecision = retryVerdict.getRetryDecision(); + + if (LOG.isInfoEnabled()) { + LOG.info( + RETRYING_ON_READ_TIMEOUT, + logPrefix, + cl, + blockFor, + received, + false, + retryCount, + retryDecision); + } + + return retryVerdict; + } + @Override public RetryVerdict onWriteTimeoutVerdict( @NonNull Request request, @@ -36,8 +89,59 @@ public RetryVerdict onWriteTimeoutVerdict( retryDecision = RetryDecision.RETHROW; } if (LOG.isInfoEnabled()) { - LOG.info(RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, writeType, blockFor, received, retryCount); + LOG.info( + RETRYING_ON_WRITE_TIMEOUT, + logPrefix, + cl, + writeType, + blockFor, + received, + retryCount, + retryDecision); } return () -> retryDecision; } + + @Override + public RetryVerdict onUnavailableVerdict( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int required, + int alive, + int retryCount) { + var retryVerdict = super.onUnavailableVerdict(request, cl, required, alive, retryCount); + var retryDecision = retryVerdict.getRetryDecision(); + + if (LOG.isInfoEnabled()) { + LOG.info(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount, retryDecision); + } + + return retryVerdict; + } + + @Override + public RetryVerdict onRequestAbortedVerdict( + @NonNull Request request, @NonNull Throwable error, int retryCount) { + var retryVerdict = super.onRequestAbortedVerdict(request, error, retryCount); + var retryDecision = retryVerdict.getRetryDecision(); + + if (LOG.isInfoEnabled()) { + LOG.info(RETRYING_ON_ABORTED, logPrefix, retryCount, error, retryDecision); + } + + return retryVerdict; + } + + @Override + public RetryVerdict onErrorResponseVerdict( + @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { + var retryVerdict = super.onErrorResponseVerdict(request, error, retryCount); + var retryDecision = retryVerdict.getRetryDecision(); + + if (LOG.isInfoEnabled()) { + LOG.info(RETRYING_ON_ERROR, logPrefix, retryCount, error, retryDecision); + } + + return retryVerdict; + } } From 6fa612a9aec534250af812f94789627d7237e33b Mon Sep 17 00:00:00 2001 From: Hazel Date: Wed, 22 Jan 2025 11:57:33 -0800 Subject: [PATCH 03/11] add new retry logic --- .../service/cqldriver/CqlRetryPolicy.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java index 1372856ed5..222d6a323a 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java @@ -4,7 +4,9 @@ import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.retry.RetryDecision; import com.datastax.oss.driver.api.core.retry.RetryVerdict; +import com.datastax.oss.driver.api.core.servererrors.CASWriteUnknownException; import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; +import com.datastax.oss.driver.api.core.servererrors.TruncateException; import com.datastax.oss.driver.api.core.servererrors.WriteType; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; @@ -135,13 +137,25 @@ public RetryVerdict onRequestAbortedVerdict( @Override public RetryVerdict onErrorResponseVerdict( @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { - var retryVerdict = super.onErrorResponseVerdict(request, error, retryCount); - var retryDecision = retryVerdict.getRetryDecision(); + + var retryDecision = + switch (error) { + case CASWriteUnknownException e -> + (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; + + case TruncateException e -> + (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; + + default -> { + var retryVerdict = super.onErrorResponseVerdict(request, error, retryCount); + yield retryVerdict.getRetryDecision(); + } + }; if (LOG.isInfoEnabled()) { LOG.info(RETRYING_ON_ERROR, logPrefix, retryCount, error, retryDecision); } - return retryVerdict; + return () -> retryDecision; } } From e85717a8c1d10561758abacd1bfc9c32242d1a78 Mon Sep 17 00:00:00 2001 From: Hazel Date: Wed, 22 Jan 2025 15:40:43 -0800 Subject: [PATCH 04/11] add todo --- .../sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java index 222d6a323a..add519b1bb 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java @@ -85,6 +85,7 @@ public RetryVerdict onWriteTimeoutVerdict( int received, int retryCount) { final RetryDecision retryDecision; + // TODO(Hazel): only retry two write type or all? if (retryCount < MAX_RETRIES && (writeType == WriteType.CAS || writeType == WriteType.SIMPLE)) { retryDecision = RetryDecision.RETRY_SAME; } else { @@ -111,6 +112,8 @@ public RetryVerdict onUnavailableVerdict( int required, int alive, int retryCount) { + // TODO(Hazel): no error passed in this method, cannot tailor the retry decision for + // UnavailableException, override the default retry 1 to retry 3? var retryVerdict = super.onUnavailableVerdict(request, cl, required, alive, retryCount); var retryDecision = retryVerdict.getRetryDecision(); @@ -140,16 +143,14 @@ public RetryVerdict onErrorResponseVerdict( var retryDecision = switch (error) { + // TODO(Hazel): what decision? RETRY_SAME or RETRY_NEXT? case CASWriteUnknownException e -> (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; case TruncateException e -> (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; - default -> { - var retryVerdict = super.onErrorResponseVerdict(request, error, retryCount); - yield retryVerdict.getRetryDecision(); - } + default -> super.onErrorResponseVerdict(request, error, retryCount).getRetryDecision(); }; if (LOG.isInfoEnabled()) { From caac7e41eca2eabce7a5343a30c0feee03e22cda Mon Sep 17 00:00:00 2001 From: Hazel Date: Wed, 22 Jan 2025 16:04:16 -0800 Subject: [PATCH 05/11] add java doc --- .../service/cqldriver/CqlRetryPolicy.java | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java index add519b1bb..4b59cef39c 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java @@ -15,6 +15,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Custom retry policy tailored for DataAPI, providing distinct retry logic for specific scenarios + * compared to {@link DefaultRetryPolicy}. + * + *

Key differences from the default implementation: + * + *

    + *
  • Overrides {@code onWriteTimeoutVerdict} and {@code onErrorResponseVerdict} to customize + * retry behavior for write timeouts and error responses. + *
  • Other methods retain the default logic but include additional log messages. + *
  • Logs provide enhanced details, including errors and retry decisions, for improved debugging + * and monitoring. + *
  • Logs are recorded at the INFO level instead of TRACE level, aligning with DataAPI logging + * standards. + *
+ */ public class CqlRetryPolicy extends DefaultRetryPolicy { private static final Logger LOG = LoggerFactory.getLogger(CqlRetryPolicy.class); private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); @@ -143,12 +159,13 @@ public RetryVerdict onErrorResponseVerdict( var retryDecision = switch (error) { - // TODO(Hazel): what decision? RETRY_SAME or RETRY_NEXT? + // TODO(Hazel): what decision? RETRY_SAME or RETRY_NEXT? default doesn't care the + // retryCount case CASWriteUnknownException e -> - (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; + (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; case TruncateException e -> - (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; + (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; default -> super.onErrorResponseVerdict(request, error, retryCount).getRetryDecision(); }; From e06dbdb5c89b26645c3ce040099cb9592c8b86bf Mon Sep 17 00:00:00 2001 From: Hazel Date: Wed, 22 Jan 2025 16:05:17 -0800 Subject: [PATCH 06/11] add unit test --- .../cqldriver/retry/CqlRetryPolicyTest.java | 82 +++++++++++++++++++ .../cqldriver/retry/RetryPolicyTestBase.java | 68 +++++++++++++++ 2 files changed, 150 insertions(+) create mode 100644 src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java create mode 100644 src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java new file mode 100644 index 0000000000..0c3a61a751 --- /dev/null +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java @@ -0,0 +1,82 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver.retry; + +import static com.datastax.oss.driver.api.core.DefaultConsistencyLevel.QUORUM; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETHROW; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETRY_NEXT; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.RETRY_SAME; +import static com.datastax.oss.driver.api.core.servererrors.DefaultWriteType.CAS; +import static com.datastax.oss.driver.api.core.servererrors.DefaultWriteType.SIMPLE; + +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; +import com.datastax.oss.driver.api.core.servererrors.*; +import io.stargate.sgv2.jsonapi.service.cqldriver.CqlRetryPolicy; +import org.junit.Test; + +public class CqlRetryPolicyTest extends RetryPolicyTestBase { + + public CqlRetryPolicyTest() { + super(new CqlRetryPolicy(null, null)); + } + + @Test + public void shouldProcessReadTimeouts() { + assertOnReadTimeout(QUORUM, 2, 2, false, 0).hasDecision(RETRY_SAME); + assertOnReadTimeout(QUORUM, 2, 2, false, 1).hasDecision(RETHROW); + assertOnReadTimeout(QUORUM, 2, 2, true, 0).hasDecision(RETHROW); + assertOnReadTimeout(QUORUM, 2, 1, true, 0).hasDecision(RETHROW); + assertOnReadTimeout(QUORUM, 2, 1, false, 0).hasDecision(RETHROW); + } + + @Test + public void shouldProcessWriteTimeouts() { + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 0).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 1).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 2).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, CAS, 2, 0, 3).hasDecision(RETHROW); + + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 0).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 1).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 2).hasDecision(RETRY_SAME); + assertOnWriteTimeout(QUORUM, SIMPLE, 2, 0, 3).hasDecision(RETHROW); + } + + @Test + public void shouldProcessUnavailable() { + assertOnUnavailable(QUORUM, 2, 1, 0).hasDecision(RETRY_NEXT); + assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETHROW); + } + + @Test + public void shouldProcessAbortedRequest() { + assertOnRequestAborted(ClosedConnectionException.class, 0).hasDecision(RETRY_NEXT); + assertOnRequestAborted(ClosedConnectionException.class, 1).hasDecision(RETRY_NEXT); + assertOnRequestAborted(HeartbeatException.class, 0).hasDecision(RETRY_NEXT); + assertOnRequestAborted(HeartbeatException.class, 1).hasDecision(RETRY_NEXT); + assertOnRequestAborted(Throwable.class, 0).hasDecision(RETHROW); + } + + @Test + public void shouldProcessErrorResponse() { + // Inherited from DefaultRetryPolicy - rethrow on ReadFailureException and WriteFailureException + assertOnErrorResponse(ReadFailureException.class, 0).hasDecision(RETHROW); + assertOnErrorResponse(WriteFailureException.class, 0).hasDecision(RETHROW); + + // Override from CqlRetryPolicy - retry 3 times on CASWriteUnknownException and + // TruncateException + assertOnErrorResponse(CASWriteUnknownException.class, 0).hasDecision(RETRY_NEXT); + assertOnErrorResponse(CASWriteUnknownException.class, 1).hasDecision(RETRY_NEXT); + assertOnErrorResponse(CASWriteUnknownException.class, 2).hasDecision(RETRY_NEXT); + assertOnErrorResponse(CASWriteUnknownException.class, 3).hasDecision(RETHROW); + assertOnErrorResponse(TruncateException.class, 0).hasDecision(RETRY_NEXT); + assertOnErrorResponse(TruncateException.class, 1).hasDecision(RETRY_NEXT); + assertOnErrorResponse(TruncateException.class, 2).hasDecision(RETRY_NEXT); + assertOnErrorResponse(TruncateException.class, 3).hasDecision(RETHROW); + + // Inherited from DefaultRetryPolicy - retry next on other exceptions + assertOnErrorResponse(OverloadedException.class, 0).hasDecision(RETRY_NEXT); + assertOnErrorResponse(OverloadedException.class, 1).hasDecision(RETRY_NEXT); + assertOnErrorResponse(ServerError.class, 0).hasDecision(RETRY_NEXT); + assertOnErrorResponse(ServerError.class, 1).hasDecision(RETRY_NEXT); + } +} diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java new file mode 100644 index 0000000000..0fdda57840 --- /dev/null +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/RetryPolicyTestBase.java @@ -0,0 +1,68 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver.retry; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.retry.RetryDecision; +import com.datastax.oss.driver.api.core.retry.RetryPolicy; +import com.datastax.oss.driver.api.core.retry.RetryVerdict; +import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; +import com.datastax.oss.driver.api.core.servererrors.WriteType; +import com.datastax.oss.driver.api.core.session.Request; +import org.assertj.core.api.AbstractAssert; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public abstract class RetryPolicyTestBase { + private final RetryPolicy policy; + + @Mock private Request request; + + protected RetryPolicyTestBase(RetryPolicy policy) { + this.policy = policy; + } + + protected RetryVerdictAssert assertOnReadTimeout( + ConsistencyLevel cl, int blockFor, int received, boolean dataPresent, int retryCount) { + return new RetryVerdictAssert( + policy.onReadTimeoutVerdict(request, cl, blockFor, received, dataPresent, retryCount)); + } + + protected RetryVerdictAssert assertOnWriteTimeout( + ConsistencyLevel cl, WriteType writeType, int blockFor, int received, int retryCount) { + return new RetryVerdictAssert( + policy.onWriteTimeoutVerdict(request, cl, writeType, blockFor, received, retryCount)); + } + + protected RetryVerdictAssert assertOnUnavailable( + ConsistencyLevel cl, int required, int alive, int retryCount) { + return new RetryVerdictAssert( + policy.onUnavailableVerdict(request, cl, required, alive, retryCount)); + } + + protected RetryVerdictAssert assertOnRequestAborted( + Class errorClass, int retryCount) { + return new RetryVerdictAssert( + policy.onRequestAbortedVerdict(request, mock(errorClass), retryCount)); + } + + protected RetryVerdictAssert assertOnErrorResponse( + Class errorClass, int retryCount) { + return new RetryVerdictAssert( + policy.onErrorResponseVerdict(request, mock(errorClass), retryCount)); + } + + public static class RetryVerdictAssert extends AbstractAssert { + RetryVerdictAssert(RetryVerdict actual) { + super(actual, RetryVerdictAssert.class); + } + + public RetryVerdictAssert hasDecision(RetryDecision decision) { + assertThat(actual.getRetryDecision()).isEqualTo(decision); + return this; + } + } +} From 992d36292b9f43b6c1e739c254e598ba67dc2829 Mon Sep 17 00:00:00 2001 From: Hazel Date: Tue, 28 Jan 2025 09:14:05 -0800 Subject: [PATCH 07/11] refactor CqlRetryPolicy class --- .../service/cqldriver/CqlRetryPolicy.java | 184 +++++++++++------- .../cqldriver/retry/CqlRetryPolicyTest.java | 2 +- 2 files changed, 112 insertions(+), 74 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java index 4b59cef39c..63b4489c6e 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java @@ -1,16 +1,14 @@ package io.stargate.sgv2.jsonapi.service.cqldriver; import com.datastax.oss.driver.api.core.ConsistencyLevel; -import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; import com.datastax.oss.driver.api.core.retry.RetryDecision; +import com.datastax.oss.driver.api.core.retry.RetryPolicy; import com.datastax.oss.driver.api.core.retry.RetryVerdict; -import com.datastax.oss.driver.api.core.servererrors.CASWriteUnknownException; -import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; -import com.datastax.oss.driver.api.core.servererrors.TruncateException; -import com.datastax.oss.driver.api.core.servererrors.WriteType; +import com.datastax.oss.driver.api.core.servererrors.*; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; -import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,40 +29,10 @@ * standards. * */ -public class CqlRetryPolicy extends DefaultRetryPolicy { +public class CqlRetryPolicy implements RetryPolicy { private static final Logger LOG = LoggerFactory.getLogger(CqlRetryPolicy.class); private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); - @VisibleForTesting - public static final String RETRYING_ON_READ_TIMEOUT = - "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, " - + "received responses: {}, data retrieved: {}, retries: {}, retry decision: {})"; - - @VisibleForTesting - public static final String RETRYING_ON_WRITE_TIMEOUT = - "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, " - + "required acknowledgments: {}, received acknowledgments: {}, retries: {}, retry decision: {})"; - - @VisibleForTesting - public static final String RETRYING_ON_UNAVAILABLE = - "[{}] Retrying on unavailable exception on next host (consistency: {}, " - + "required replica: {}, alive replica: {}, retries: {}, retry decision: {})"; - - @VisibleForTesting - public static final String RETRYING_ON_ABORTED = - "[{}] Retrying on aborted request on next host (retries: {}, error: {}, retry decision: {})"; - - @VisibleForTesting - public static final String RETRYING_ON_ERROR = - "[{}] Retrying on node error on next host (retries: {}, error: {}, retry decision: {})"; - - private final String logPrefix; - - public CqlRetryPolicy(DriverContext context, String profileName) { - super(context, profileName); - this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; - } - @Override public RetryVerdict onReadTimeoutVerdict( @NonNull Request request, @@ -73,23 +41,24 @@ public RetryVerdict onReadTimeoutVerdict( int received, boolean dataPresent, int retryCount) { - var retryVerdict = - super.onReadTimeoutVerdict(request, cl, blockFor, received, dataPresent, retryCount); - var retryDecision = retryVerdict.getRetryDecision(); + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES && received >= blockFor && !dataPresent) + ? RetryDecision.RETRY_SAME + : RetryDecision.RETHROW; if (LOG.isInfoEnabled()) { LOG.info( - RETRYING_ON_READ_TIMEOUT, - logPrefix, + "Retrying on read timeout on same host (consistency: {}, required responses: {}, received responses: {}, data retrieved: {}, retries: {}, retry decision: {})", cl, blockFor, received, - false, + dataPresent, retryCount, retryDecision); } - return retryVerdict; + return () -> retryDecision; } @Override @@ -100,17 +69,17 @@ public RetryVerdict onWriteTimeoutVerdict( int blockFor, int received, int retryCount) { - final RetryDecision retryDecision; - // TODO(Hazel): only retry two write type or all? - if (retryCount < MAX_RETRIES && (writeType == WriteType.CAS || writeType == WriteType.SIMPLE)) { - retryDecision = RetryDecision.RETRY_SAME; - } else { - retryDecision = RetryDecision.RETHROW; - } + + // Collections use lightweight transactions, the write type is either CAS or SIMPLE. Tables use + // SIMPLE for writes. + final RetryDecision retryDecision = + (retryCount < MAX_RETRIES && (writeType == WriteType.CAS || writeType == WriteType.SIMPLE)) + ? RetryDecision.RETRY_SAME + : RetryDecision.RETHROW; + if (LOG.isInfoEnabled()) { LOG.info( - RETRYING_ON_WRITE_TIMEOUT, - logPrefix, + "Retrying on write timeout on same host (consistency: {}, write type: {}, required acknowledgments: {}, received acknowledgments: {}, retries: {}, retry decision: {})", cl, writeType, blockFor, @@ -118,6 +87,7 @@ public RetryVerdict onWriteTimeoutVerdict( retryCount, retryDecision); } + return () -> retryDecision; } @@ -128,29 +98,41 @@ public RetryVerdict onUnavailableVerdict( int required, int alive, int retryCount) { - // TODO(Hazel): no error passed in this method, cannot tailor the retry decision for - // UnavailableException, override the default retry 1 to retry 3? - var retryVerdict = super.onUnavailableVerdict(request, cl, required, alive, retryCount); - var retryDecision = retryVerdict.getRetryDecision(); + + RetryDecision retryDecision = + (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; if (LOG.isInfoEnabled()) { - LOG.info(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount, retryDecision); + LOG.info( + "Retrying on unavailable exception on next host (consistency: {}, required replica: {}, alive replica: {}, retries: {}, retry decision: {})", + cl, + required, + alive, + retryCount, + retryDecision); } - return retryVerdict; + return () -> retryDecision; } @Override public RetryVerdict onRequestAbortedVerdict( @NonNull Request request, @NonNull Throwable error, int retryCount) { - var retryVerdict = super.onRequestAbortedVerdict(request, error, retryCount); - var retryDecision = retryVerdict.getRetryDecision(); + + RetryDecision retryDecision = + (error instanceof ClosedConnectionException || error instanceof HeartbeatException) + ? RetryDecision.RETRY_NEXT + : RetryDecision.RETHROW; if (LOG.isInfoEnabled()) { - LOG.info(RETRYING_ON_ABORTED, logPrefix, retryCount, error, retryDecision); + LOG.info( + "Retrying on aborted request on next host (retries: {}, error: {}, retry decision: {})", + retryCount, + error, + retryDecision); } - return retryVerdict; + return () -> retryDecision; } @Override @@ -159,21 +141,77 @@ public RetryVerdict onErrorResponseVerdict( var retryDecision = switch (error) { - // TODO(Hazel): what decision? RETRY_SAME or RETRY_NEXT? default doesn't care the - // retryCount - case CASWriteUnknownException e -> - (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; - - case TruncateException e -> - (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; - - default -> super.onErrorResponseVerdict(request, error, retryCount).getRetryDecision(); + case CASWriteUnknownException e -> handleErrorResponseRetry(retryCount); + case TruncateException e -> handleErrorResponseRetry(retryCount); + case ReadFailureException e -> handleErrorResponseRetry(retryCount); + case WriteFailureException e -> handleErrorResponseRetry(retryCount); + default -> RetryDecision.RETHROW; }; if (LOG.isInfoEnabled()) { - LOG.info(RETRYING_ON_ERROR, logPrefix, retryCount, error, retryDecision); + LOG.info( + "Retrying on node error on next host (retries: {}, error: {}, retry decision: {})", + retryCount, + error, + retryDecision); } return () -> retryDecision; } + + @Override + @Deprecated + public RetryDecision onReadTimeout( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int blockFor, + int received, + boolean dataPresent, + int retryCount) { + throw new UnsupportedOperationException("onReadTimeout"); + } + + @Override + @Deprecated + public RetryDecision onWriteTimeout( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + @NonNull WriteType writeType, + int blockFor, + int received, + int retryCount) { + throw new UnsupportedOperationException("onWriteTimeout"); + } + + @Override + @Deprecated + public RetryDecision onUnavailable( + @NonNull Request request, + @NonNull ConsistencyLevel cl, + int required, + int alive, + int retryCount) { + throw new UnsupportedOperationException("onUnavailable"); + } + + @Override + @Deprecated + public RetryDecision onRequestAborted( + @NonNull Request request, @NonNull Throwable error, int retryCount) { + throw new UnsupportedOperationException("onRequestAborted"); + } + + @Override + @Deprecated + public RetryDecision onErrorResponse( + @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { + throw new UnsupportedOperationException("onErrorResponse"); + } + + @Override + public void close() {} + + private RetryDecision handleErrorResponseRetry(int retryCount) { + return (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; + } } diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java index 0c3a61a751..ef34294308 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java @@ -16,7 +16,7 @@ public class CqlRetryPolicyTest extends RetryPolicyTestBase { public CqlRetryPolicyTest() { - super(new CqlRetryPolicy(null, null)); + super(new CqlRetryPolicy()); } @Test From 8c162329629053bdc280fba2ac7cc18ee4c8c082 Mon Sep 17 00:00:00 2001 From: Hazel Date: Tue, 28 Jan 2025 10:57:36 -0800 Subject: [PATCH 08/11] refactor BaseCqlRetryPolicy and add AstraCqlRetryPolicy --- .../cqldriver/AstraCqlRetryPolicy.java | 25 ++++ ...tryPolicy.java => BaseCqlRetryPolicy.java} | 111 +++++++++++++----- 2 files changed, 104 insertions(+), 32 deletions(-) create mode 100644 src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java rename src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/{CqlRetryPolicy.java => BaseCqlRetryPolicy.java} (60%) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java new file mode 100644 index 0000000000..fbd1807c9b --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java @@ -0,0 +1,25 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver; + +import com.datastax.oss.driver.api.core.retry.RetryDecision; + +/** + * In Astra, CQL router will be used and retry on the next node will fail. So, the decision will be + * to retry on the same node. + */ +public class AstraCqlRetryPolicy extends BaseCqlRetryPolicy { + + @Override + protected RetryDecision retryDecisionForUnavailable() { + return RetryDecision.RETRY_SAME; + } + + @Override + protected RetryDecision retryDecisionForRequestAborted() { + return RetryDecision.RETRY_SAME; + } + + @Override + protected RetryDecision retryDecisionForErrorResponse() { + return RetryDecision.RETRY_SAME; + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java similarity index 60% rename from src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java rename to src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java index 63b4489c6e..ba0fa1aaa6 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java @@ -15,24 +15,24 @@ /** * Custom retry policy tailored for DataAPI, providing distinct retry logic for specific scenarios - * compared to {@link DefaultRetryPolicy}. - * - *

Key differences from the default implementation: - * - *

    - *
  • Overrides {@code onWriteTimeoutVerdict} and {@code onErrorResponseVerdict} to customize - * retry behavior for write timeouts and error responses. - *
  • Other methods retain the default logic but include additional log messages. - *
  • Logs provide enhanced details, including errors and retry decisions, for improved debugging - * and monitoring. - *
  • Logs are recorded at the INFO level instead of TRACE level, aligning with DataAPI logging - * standards. - *
+ * compared to {@link DefaultRetryPolicy}. Logs are recorded at the INFO level instead of TRACE + * level, aligning with DataAPI logging standards. */ -public class CqlRetryPolicy implements RetryPolicy { - private static final Logger LOG = LoggerFactory.getLogger(CqlRetryPolicy.class); +public class BaseCqlRetryPolicy implements RetryPolicy { + private static final Logger LOG = LoggerFactory.getLogger(BaseCqlRetryPolicy.class); private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); + /** + * {@inheritDoc} + * + *

This implementation triggers a maximum of {@code MAX_RETRIES} retry (to the same node), and + * only if enough replicas had responded to the read request but data was not retrieved amongst + * those. That usually means that enough replicas are alive to satisfy the consistency, but the + * coordinator picked a dead one for data retrieval, not having detected that replica as dead yet. + * The reasoning is that by the time we get the timeout, the dead replica will likely have been + * detected as dead and the retry has a high chance of success. Otherwise, the exception is + * rethrown. + */ @Override public RetryVerdict onReadTimeoutVerdict( @NonNull Request request, @@ -61,6 +61,14 @@ public RetryVerdict onReadTimeoutVerdict( return () -> retryDecision; } + /** + * {@inheritDoc} + * + *

This implementation triggers a maximum of {@code MAX_RETRIES} retries (to the same node), + * and only for {@code WriteType.CAS} and {@code WriteType.SIMPLE} write. The reasoning is that + * collections use lightweight transactions, the write type is either CAS or SIMPLE and tables use + * SIMPLE for writes. + */ @Override public RetryVerdict onWriteTimeoutVerdict( @NonNull Request request, @@ -70,8 +78,6 @@ public RetryVerdict onWriteTimeoutVerdict( int received, int retryCount) { - // Collections use lightweight transactions, the write type is either CAS or SIMPLE. Tables use - // SIMPLE for writes. final RetryDecision retryDecision = (retryCount < MAX_RETRIES && (writeType == WriteType.CAS || writeType == WriteType.SIMPLE)) ? RetryDecision.RETRY_SAME @@ -91,6 +97,18 @@ public RetryVerdict onWriteTimeoutVerdict( return () -> retryDecision; } + /** + * {@inheritDoc} + * + *

This implementation triggers a maximum of {@code MAX_RETRIES} retry, to the next node in the + * query plan. The rationale is that the first coordinator might have been network-isolated from + * all other nodes (thinking they're down), but still able to communicate with the client; in that + * case, retrying on the same host has almost no chance of success, but moving to the next host + * might solve the issue. + * + *

Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ @Override public RetryVerdict onUnavailableVerdict( @NonNull Request request, @@ -100,7 +118,7 @@ public RetryVerdict onUnavailableVerdict( int retryCount) { RetryDecision retryDecision = - (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; + (retryCount < MAX_RETRIES) ? retryDecisionForUnavailable() : RetryDecision.RETHROW; if (LOG.isInfoEnabled()) { LOG.info( @@ -115,13 +133,24 @@ public RetryVerdict onUnavailableVerdict( return () -> retryDecision; } + /** + * {@inheritDoc} + * + *

This implementation retries on the next node if the connection was closed, and rethrows + * (assuming a driver bug) in all other cases. + * + *

Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ @Override public RetryVerdict onRequestAbortedVerdict( @NonNull Request request, @NonNull Throwable error, int retryCount) { RetryDecision retryDecision = - (error instanceof ClosedConnectionException || error instanceof HeartbeatException) - ? RetryDecision.RETRY_NEXT + (retryCount < MAX_RETRIES + && (error instanceof ClosedConnectionException + || error instanceof HeartbeatException)) + ? retryDecisionForRequestAborted() : RetryDecision.RETHROW; if (LOG.isInfoEnabled()) { @@ -135,17 +164,27 @@ public RetryVerdict onRequestAbortedVerdict( return () -> retryDecision; } + /** + * {@inheritDoc} + * + *

This implementation rethrows read and write failures, and retries other errors on the next + * node. + * + *

Note: In Astra, CQL router will be used and retry on the next node will fail. So, the + * decision will be to retry on the same node. + */ @Override public RetryVerdict onErrorResponseVerdict( @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { + // Issue1830: CASWriteUnknownException and TruncateException have been included in the default + // case. var retryDecision = switch (error) { - case CASWriteUnknownException e -> handleErrorResponseRetry(retryCount); - case TruncateException e -> handleErrorResponseRetry(retryCount); - case ReadFailureException e -> handleErrorResponseRetry(retryCount); - case WriteFailureException e -> handleErrorResponseRetry(retryCount); - default -> RetryDecision.RETHROW; + case ReadFailureException e -> RetryDecision.RETHROW; + case WriteFailureException e -> RetryDecision.RETHROW; + default -> + (retryCount < MAX_RETRIES) ? retryDecisionForErrorResponse() : RetryDecision.RETHROW; }; if (LOG.isInfoEnabled()) { @@ -159,6 +198,21 @@ public RetryVerdict onErrorResponseVerdict( return () -> retryDecision; } + @Override + public void close() {} + + protected RetryDecision retryDecisionForUnavailable() { + return RetryDecision.RETRY_NEXT; + } + + protected RetryDecision retryDecisionForRequestAborted() { + return RetryDecision.RETRY_NEXT; + } + + protected RetryDecision retryDecisionForErrorResponse() { + return RetryDecision.RETRY_NEXT; + } + @Override @Deprecated public RetryDecision onReadTimeout( @@ -207,11 +261,4 @@ public RetryDecision onErrorResponse( @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { throw new UnsupportedOperationException("onErrorResponse"); } - - @Override - public void close() {} - - private RetryDecision handleErrorResponseRetry(int retryCount) { - return (retryCount < MAX_RETRIES) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW; - } } From ee48ab145b9560c517ae32868a677d8d21ff28cc Mon Sep 17 00:00:00 2001 From: Hazel Date: Tue, 28 Jan 2025 11:16:05 -0800 Subject: [PATCH 09/11] fix unit tests --- .../retry/AstraCqlRetryPolicyTest.java | 61 +++++++++++++++++++ ...yTest.java => BaseCqlRetryPolicyTest.java} | 35 ++++++----- 2 files changed, 81 insertions(+), 15 deletions(-) create mode 100644 src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java rename src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/{CqlRetryPolicyTest.java => BaseCqlRetryPolicyTest.java} (75%) diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java new file mode 100644 index 0000000000..77bc408834 --- /dev/null +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java @@ -0,0 +1,61 @@ +package io.stargate.sgv2.jsonapi.service.cqldriver.retry; + +import static com.datastax.oss.driver.api.core.DefaultConsistencyLevel.QUORUM; +import static com.datastax.oss.driver.api.core.retry.RetryDecision.*; + +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; +import com.datastax.oss.driver.api.core.servererrors.CASWriteUnknownException; +import com.datastax.oss.driver.api.core.servererrors.ReadFailureException; +import com.datastax.oss.driver.api.core.servererrors.TruncateException; +import com.datastax.oss.driver.api.core.servererrors.WriteFailureException; +import io.stargate.sgv2.jsonapi.service.cqldriver.AstraCqlRetryPolicy; +import org.junit.Test; + +public class AstraCqlRetryPolicyTest extends RetryPolicyTestBase { + + public AstraCqlRetryPolicyTest() { + super(new AstraCqlRetryPolicy()); + } + + @Test + public void shouldProcessUnavailable() { + assertOnUnavailable(QUORUM, 2, 1, 0).hasDecision(RETRY_SAME); + assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETRY_SAME); + assertOnUnavailable(QUORUM, 2, 1, 2).hasDecision(RETRY_SAME); + assertOnUnavailable(QUORUM, 2, 1, 3).hasDecision(RETHROW); + } + + @Test + public void shouldProcessAbortedRequest() { + assertOnRequestAborted(ClosedConnectionException.class, 0).hasDecision(RETRY_SAME); + assertOnRequestAborted(ClosedConnectionException.class, 1).hasDecision(RETRY_SAME); + assertOnRequestAborted(ClosedConnectionException.class, 2).hasDecision(RETRY_SAME); + assertOnRequestAborted(ClosedConnectionException.class, 3).hasDecision(RETHROW); + + assertOnRequestAborted(HeartbeatException.class, 0).hasDecision(RETRY_SAME); + assertOnRequestAborted(HeartbeatException.class, 1).hasDecision(RETRY_SAME); + assertOnRequestAborted(HeartbeatException.class, 2).hasDecision(RETRY_SAME); + assertOnRequestAborted(HeartbeatException.class, 3).hasDecision(RETHROW); + + assertOnRequestAborted(Throwable.class, 0).hasDecision(RETHROW); + } + + @Test + public void shouldProcessErrorResponse() { + // rethrow on ReadFailureException and WriteFailureException + assertOnErrorResponse(ReadFailureException.class, 0).hasDecision(RETHROW); + assertOnErrorResponse(WriteFailureException.class, 0).hasDecision(RETHROW); + + // Issue1830 - retry 3 times on CASWriteUnknownException and TruncateException + assertOnErrorResponse(CASWriteUnknownException.class, 0).hasDecision(RETRY_SAME); + assertOnErrorResponse(CASWriteUnknownException.class, 1).hasDecision(RETRY_SAME); + assertOnErrorResponse(CASWriteUnknownException.class, 2).hasDecision(RETRY_SAME); + assertOnErrorResponse(CASWriteUnknownException.class, 3).hasDecision(RETHROW); + + assertOnErrorResponse(TruncateException.class, 0).hasDecision(RETRY_SAME); + assertOnErrorResponse(TruncateException.class, 1).hasDecision(RETRY_SAME); + assertOnErrorResponse(TruncateException.class, 2).hasDecision(RETRY_SAME); + assertOnErrorResponse(TruncateException.class, 3).hasDecision(RETHROW); + } +} diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java similarity index 75% rename from src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java rename to src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java index ef34294308..ca0c95740e 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/CqlRetryPolicyTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java @@ -10,19 +10,22 @@ import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; import com.datastax.oss.driver.api.core.connection.HeartbeatException; import com.datastax.oss.driver.api.core.servererrors.*; -import io.stargate.sgv2.jsonapi.service.cqldriver.CqlRetryPolicy; +import io.stargate.sgv2.jsonapi.service.cqldriver.BaseCqlRetryPolicy; import org.junit.Test; -public class CqlRetryPolicyTest extends RetryPolicyTestBase { +public class BaseCqlRetryPolicyTest extends RetryPolicyTestBase { - public CqlRetryPolicyTest() { - super(new CqlRetryPolicy()); + public BaseCqlRetryPolicyTest() { + super(new BaseCqlRetryPolicy()); } @Test public void shouldProcessReadTimeouts() { assertOnReadTimeout(QUORUM, 2, 2, false, 0).hasDecision(RETRY_SAME); - assertOnReadTimeout(QUORUM, 2, 2, false, 1).hasDecision(RETHROW); + assertOnReadTimeout(QUORUM, 2, 2, false, 1).hasDecision(RETRY_SAME); + assertOnReadTimeout(QUORUM, 2, 2, false, 2).hasDecision(RETRY_SAME); + assertOnReadTimeout(QUORUM, 2, 2, false, 3).hasDecision(RETHROW); + assertOnReadTimeout(QUORUM, 2, 2, true, 0).hasDecision(RETHROW); assertOnReadTimeout(QUORUM, 2, 1, true, 0).hasDecision(RETHROW); assertOnReadTimeout(QUORUM, 2, 1, false, 0).hasDecision(RETHROW); @@ -44,39 +47,41 @@ public void shouldProcessWriteTimeouts() { @Test public void shouldProcessUnavailable() { assertOnUnavailable(QUORUM, 2, 1, 0).hasDecision(RETRY_NEXT); - assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETHROW); + assertOnUnavailable(QUORUM, 2, 1, 1).hasDecision(RETRY_NEXT); + assertOnUnavailable(QUORUM, 2, 1, 2).hasDecision(RETRY_NEXT); + assertOnUnavailable(QUORUM, 2, 1, 3).hasDecision(RETHROW); } @Test public void shouldProcessAbortedRequest() { assertOnRequestAborted(ClosedConnectionException.class, 0).hasDecision(RETRY_NEXT); assertOnRequestAborted(ClosedConnectionException.class, 1).hasDecision(RETRY_NEXT); + assertOnRequestAborted(ClosedConnectionException.class, 2).hasDecision(RETRY_NEXT); + assertOnRequestAborted(ClosedConnectionException.class, 3).hasDecision(RETHROW); + assertOnRequestAborted(HeartbeatException.class, 0).hasDecision(RETRY_NEXT); assertOnRequestAborted(HeartbeatException.class, 1).hasDecision(RETRY_NEXT); + assertOnRequestAborted(HeartbeatException.class, 2).hasDecision(RETRY_NEXT); + assertOnRequestAborted(HeartbeatException.class, 3).hasDecision(RETHROW); + assertOnRequestAborted(Throwable.class, 0).hasDecision(RETHROW); } @Test public void shouldProcessErrorResponse() { - // Inherited from DefaultRetryPolicy - rethrow on ReadFailureException and WriteFailureException + // rethrow on ReadFailureException and WriteFailureException assertOnErrorResponse(ReadFailureException.class, 0).hasDecision(RETHROW); assertOnErrorResponse(WriteFailureException.class, 0).hasDecision(RETHROW); - // Override from CqlRetryPolicy - retry 3 times on CASWriteUnknownException and - // TruncateException + // Issue1830 - retry 3 times on CASWriteUnknownException and TruncateException assertOnErrorResponse(CASWriteUnknownException.class, 0).hasDecision(RETRY_NEXT); assertOnErrorResponse(CASWriteUnknownException.class, 1).hasDecision(RETRY_NEXT); assertOnErrorResponse(CASWriteUnknownException.class, 2).hasDecision(RETRY_NEXT); assertOnErrorResponse(CASWriteUnknownException.class, 3).hasDecision(RETHROW); + assertOnErrorResponse(TruncateException.class, 0).hasDecision(RETRY_NEXT); assertOnErrorResponse(TruncateException.class, 1).hasDecision(RETRY_NEXT); assertOnErrorResponse(TruncateException.class, 2).hasDecision(RETRY_NEXT); assertOnErrorResponse(TruncateException.class, 3).hasDecision(RETHROW); - - // Inherited from DefaultRetryPolicy - retry next on other exceptions - assertOnErrorResponse(OverloadedException.class, 0).hasDecision(RETRY_NEXT); - assertOnErrorResponse(OverloadedException.class, 1).hasDecision(RETRY_NEXT); - assertOnErrorResponse(ServerError.class, 0).hasDecision(RETRY_NEXT); - assertOnErrorResponse(ServerError.class, 1).hasDecision(RETRY_NEXT); } } From 85a9193ac1ef4fe8036029388b5caf0dd534a937 Mon Sep 17 00:00:00 2001 From: Hazel Date: Tue, 28 Jan 2025 11:18:17 -0800 Subject: [PATCH 10/11] change config and remove current retry policy --- .../cqldriver/CqlProxyRetryPolicy.java | 150 ------------------ src/main/resources/application.conf | 2 +- 2 files changed, 1 insertion(+), 151 deletions(-) delete mode 100644 src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java deleted file mode 100644 index 9fdd745e8e..0000000000 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CqlProxyRetryPolicy.java +++ /dev/null @@ -1,150 +0,0 @@ -package io.stargate.sgv2.jsonapi.service.cqldriver; - -import static com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy.RETRYING_ON_UNAVAILABLE; - -import com.datastax.oss.driver.api.core.ConsistencyLevel; -import com.datastax.oss.driver.api.core.context.DriverContext; -import com.datastax.oss.driver.api.core.retry.RetryDecision; -import com.datastax.oss.driver.api.core.retry.RetryPolicy; -import com.datastax.oss.driver.api.core.retry.RetryVerdict; -import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; -import com.datastax.oss.driver.api.core.servererrors.WriteType; -import com.datastax.oss.driver.api.core.session.Request; -import edu.umd.cs.findbugs.annotations.NonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This retry policy while executing cql statements is useful when the requests are delegated to a - * proxy layer which is responsible for retrying the requests. This policy will only retry once if - * the intermediate layer is unavailable for some reason. In all other cases, this will rethrow the - * exception to avoid retrying the requests at the driver level. - */ -public class CqlProxyRetryPolicy implements RetryPolicy { - private static final Logger LOG = LoggerFactory.getLogger(CqlProxyRetryPolicy.class); - private final String logPrefix; - - private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); - - public CqlProxyRetryPolicy(DriverContext context, String profileName) { - this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; - } - - @Override - public RetryDecision onReadTimeout( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int blockFor, - int received, - boolean dataPresent, - int retryCount) { - return RetryDecision.RETHROW; - } - - @Override - public RetryVerdict onReadTimeoutVerdict( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int blockFor, - int received, - boolean dataPresent, - int retryCount) { - RetryDecision retryDecision = - onReadTimeout(request, cl, blockFor, received, dataPresent, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onWriteTimeout( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - @NonNull WriteType writeType, - int blockFor, - int received, - int retryCount) { - var retryDecision = RetryDecision.RETHROW; - if (retryCount < MAX_RETRIES && writeType == WriteType.CAS) { - retryDecision = RetryDecision.RETRY_SAME; - } - if (LOG.isInfoEnabled()) { - LOG.info( - "Write timeout for request writeType : {}, retryCount: {}, consistency level: {} -> retry decision is: {}", - writeType, - retryCount, - cl, - retryDecision); - } - return retryDecision; - } - - @Override - public RetryVerdict onWriteTimeoutVerdict( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - @NonNull WriteType writeType, - int blockFor, - int received, - int retryCount) { - RetryDecision retryDecision = - onWriteTimeout(request, cl, writeType, blockFor, received, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onUnavailable( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int required, - int alive, - int retryCount) { - RetryDecision decision = (retryCount == 0) ? RetryDecision.RETRY_SAME : RetryDecision.RETHROW; - - if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) { - LOG.trace(RETRYING_ON_UNAVAILABLE, logPrefix, cl, required, alive, retryCount); - } - - return decision; - } - - @Override - public RetryVerdict onUnavailableVerdict( - @NonNull Request request, - @NonNull ConsistencyLevel cl, - int required, - int alive, - int retryCount) { - RetryDecision retryDecision = onUnavailable(request, cl, required, alive, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onRequestAborted( - @NonNull Request request, @NonNull Throwable error, int retryCount) { - return RetryDecision.RETHROW; - } - - @Override - public RetryVerdict onRequestAbortedVerdict( - @NonNull Request request, @NonNull Throwable error, int retryCount) { - RetryDecision retryDecision = onRequestAborted(request, error, retryCount); - return () -> retryDecision; - } - - @Override - public RetryDecision onErrorResponse( - @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { - return RetryDecision.RETHROW; - } - - @Override - public RetryVerdict onErrorResponseVerdict( - @NonNull Request request, @NonNull CoordinatorException error, int retryCount) { - RetryDecision retryDecision = onErrorResponse(request, error, retryCount); - return () -> retryDecision; - } - - @Override - public void close() { - // nothing to do - } -} diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 6826cc80a0..589dd0d0b1 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -3,7 +3,7 @@ datastax-java-driver { version = V4 } advanced.retry-policy { - class = io.stargate.sgv2.jsonapi.service.cqldriver.CqlProxyRetryPolicy + class = io.stargate.sgv2.jsonapi.service.cqldriver.BaseCqlRetryPolicy } advanced.session-leak.threshold = 0 advanced.connection { From b096de09fc2194c9291ddf57fa996d26b9869639 Mon Sep 17 00:00:00 2001 From: Hazel Date: Tue, 28 Jan 2025 12:57:17 -0800 Subject: [PATCH 11/11] fix tests --- .../sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java | 5 +++++ .../sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java | 3 +++ .../service/cqldriver/retry/AstraCqlRetryPolicyTest.java | 2 +- .../service/cqldriver/retry/BaseCqlRetryPolicyTest.java | 2 +- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java index fbd1807c9b..c21fa11f62 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/AstraCqlRetryPolicy.java @@ -1,5 +1,6 @@ package io.stargate.sgv2.jsonapi.service.cqldriver; +import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.retry.RetryDecision; /** @@ -8,6 +9,10 @@ */ public class AstraCqlRetryPolicy extends BaseCqlRetryPolicy { + public AstraCqlRetryPolicy(DriverContext context, String profileName) { + super(context, profileName); + } + @Override protected RetryDecision retryDecisionForUnavailable() { return RetryDecision.RETRY_SAME; diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java index ba0fa1aaa6..ad7197eea0 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/BaseCqlRetryPolicy.java @@ -3,6 +3,7 @@ import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; import com.datastax.oss.driver.api.core.connection.HeartbeatException; +import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.retry.RetryDecision; import com.datastax.oss.driver.api.core.retry.RetryPolicy; import com.datastax.oss.driver.api.core.retry.RetryVerdict; @@ -22,6 +23,8 @@ public class BaseCqlRetryPolicy implements RetryPolicy { private static final Logger LOG = LoggerFactory.getLogger(BaseCqlRetryPolicy.class); private static final int MAX_RETRIES = Integer.getInteger("stargate.cql_proxy.max_retries", 3); + public BaseCqlRetryPolicy(DriverContext context, String profileName) {} + /** * {@inheritDoc} * diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java index 77bc408834..e73cd2fc70 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/AstraCqlRetryPolicyTest.java @@ -15,7 +15,7 @@ public class AstraCqlRetryPolicyTest extends RetryPolicyTestBase { public AstraCqlRetryPolicyTest() { - super(new AstraCqlRetryPolicy()); + super(new AstraCqlRetryPolicy(null, null)); } @Test diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java index ca0c95740e..c7a3ff1d66 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/retry/BaseCqlRetryPolicyTest.java @@ -16,7 +16,7 @@ public class BaseCqlRetryPolicyTest extends RetryPolicyTestBase { public BaseCqlRetryPolicyTest() { - super(new BaseCqlRetryPolicy()); + super(new BaseCqlRetryPolicy(null, null)); } @Test