Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import tech.ydb.core.StatusCode;

public abstract class TopicRetryableStream<R extends Message, W extends Message> {
protected final String debugId;
private final Logger logger;
private final String debugId;
private final RetryConfig retryConfig;
private final ScheduledExecutorService scheduler;

Expand All @@ -32,11 +32,6 @@ public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, S
this.scheduler = scheduler;
}

Comment thread
alex268 marked this conversation as resolved.
@Override
public String toString() {
return "Session[" + debugId + "]";
}

protected abstract TopicStream<R, W> createNewStream(String debugId);
protected abstract W getInitRequest();

Expand All @@ -54,8 +49,7 @@ public void start() {
TopicStream<R, W> stream = createNewStream(streamID);

if (!realStream.compareAndSet(null, stream)) {
logger.warn("{} double start of stream, skipping", this);
stream.close();
logger.warn("[{}] double start of stream, skipping", debugId);
return;
}

Expand All @@ -78,18 +72,21 @@ protected void resetRetries() {
public void send(W msg) {
TopicStream<R, W> stream = realStream.get();
if (stream == null) {
logger.warn("{} send message before stream is ready", this);
logger.warn("[{}] send message before stream is ready", debugId);
return;
}
stream.send(msg);
}

public void close() {
public boolean close() {
isClosed = true;
TopicStream<R, W> stream = realStream.getAndSet(null);
if (stream != null) {
stream.close();
if (stream == null) {
return false;
}

stream.close();
return true;
}

private void onStreamStop(Status status, RetryPolicy policy) {
Expand All @@ -99,34 +96,34 @@ private void onStreamStop(Status status, RetryPolicy policy) {
}

if (policy == null) {
logger.warn("{} stopped by non-retryable status {}", this, status);
logger.warn("[{}] stopped by non-retryable status {}", debugId, status);
onClose(status);
return;
}

long nextRetryMs = state.nextRetryMs(policy);

if (nextRetryMs < 0) {
logger.warn("{} stopped after retry policy evaluation for status {}", this, status);
logger.warn("[{}] stopped after retry policy evaluation for status {}", debugId, status);
onClose(status);
return;
}

if (nextRetryMs == 0) { // retry immediately
logger.warn("{} retry #{}. Retry immediately...", this, state.retryNumber());
logger.warn("[{}] retry #{}. Retry immediately...", debugId, state.retryNumber());
onRetry(status);
start();
return;
}

// retry scheduling
logger.warn("{} retry #{}. Scheduling reconnect in {}ms...", debugId, state.retryNumber(), nextRetryMs);
logger.warn("[{}] retry #{}. Scheduling reconnect in {}ms...", debugId, state.retryNumber(), nextRetryMs);
onRetry(status);

try {
scheduler.schedule(this::start, nextRetryMs, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
logger.error("{} cannot schedule reconnect, stopping", debugId, ex);
logger.error("[{}] cannot schedule reconnect, stopping", debugId, ex);
onClose(status);
}
}
Expand Down
43 changes: 43 additions & 0 deletions topic/src/main/java/tech/ydb/topic/settings/TopicRetryConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package tech.ydb.topic.settings;

import tech.ydb.common.retry.ExponentialBackoffRetry;
import tech.ydb.common.retry.RetryConfig;
import tech.ydb.common.retry.RetryPolicy;
import tech.ydb.core.Status;

/**
* Predefined {@link RetryConfig} instances for topic writers and readers.
* <p>
* Pass one of these constants (or a custom {@link RetryConfig}) to
* {@link WriterSettings.Builder#setRetryConfig} to control how the writer
* behaves when its underlying stream is interrupted.
*
* @author Aleksandr Gorshenin
*/
public class TopicRetryConfig {
// Max backoff will be random delay from 32.768s to 65.536s
private static final RetryPolicy DEFAULT_BACKOFF = new ExponentialBackoffRetry(32, 10);

/**
* Retry any stream disconnection indefinitely with exponential backoff.
* <p>
* Every status code, including {@link Status#SUCCESS}, is treated as retryable.
* The delay between reconnection attempts grows exponentially and is capped at a
* random value between 32 and 65 seconds.
* <p>
* This is the default retry configuration for topic writers and readers.
*/
public static final RetryConfig FOREVER = status -> DEFAULT_BACKOFF;
Comment thread
alex268 marked this conversation as resolved.

/**
* Disable retries entirely.
* <p>
* Any stream disconnection is reported immediately as a terminal error through
* the errors handler configured via
* {@link WriterSettings.Builder#setErrorsHandler}.
* Use this when you need full control over reconnection logic in application code.
*/
public static final RetryConfig NEVER = status -> null;

private TopicRetryConfig() { }
}
32 changes: 32 additions & 0 deletions topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.function.BiConsumer;

import tech.ydb.common.retry.RetryConfig;
import tech.ydb.core.Status;
import tech.ydb.topic.description.Codec;

Expand All @@ -20,6 +21,7 @@ public class WriterSettings {
private final int codec;
private final long maxSendBufferMemorySize;
private final int maxSendBufferMessagesCount;
private final RetryConfig retryConfig;
private final BiConsumer<Status, Throwable> errorsHandler;

private WriterSettings(Builder builder) {
Expand All @@ -31,6 +33,7 @@ private WriterSettings(Builder builder) {
this.codec = builder.codec;
this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize;
this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount;
this.retryConfig = builder.retryConfig;
this.errorsHandler = builder.errorsHandler;
}

Expand Down Expand Up @@ -58,6 +61,10 @@ public BiConsumer<Status, Throwable> getErrorsHandler() {
return errorsHandler;
}

/**
 * Returns the retry configuration of the writer's stream.
 *
 * @return the {@link RetryConfig} taken from the builder when these settings were created
 */
public RetryConfig getRetryConfig() {
    return this.retryConfig;
}

public Long getPartitionId() {
return partitionId;
}
Expand Down Expand Up @@ -86,6 +93,7 @@ public static class Builder {
private int codec = Codec.GZIP;
private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT;
private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT;
private RetryConfig retryConfig = TopicRetryConfig.FOREVER;
private BiConsumer<Status, Throwable> errorsHandler = null;

/**
Expand Down Expand Up @@ -183,6 +191,30 @@ public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
return this;
}

/**
 * Configures how the writer's underlying stream connection is retried
 * after it has been interrupted.
 * <p>
 * When this method is never called the builder uses {@link TopicRetryConfig#FOREVER},
 * which retries any disconnection indefinitely with exponential backoff
 * (up to ~65 seconds between attempts).
 * <p>
 * Pass {@link TopicRetryConfig#NEVER} to turn retries off and have errors surfaced
 * immediately via the errors handler set by {@link #setErrorsHandler}.
 *
 * @param config retry configuration, must not be {@code null}
 * @return this builder
 * @throws NullPointerException if {@code config} is {@code null}
 * @see TopicRetryConfig#FOREVER
 * @see TopicRetryConfig#NEVER
 */
public Builder setRetryConfig(RetryConfig config) {
    if (config != null) {
        this.retryConfig = config;
        return this;
    }
    // a null config would silently break reconnect decisions later, so fail fast here
    throw new NullPointerException("RetryConfig must not be null");
}
Comment thread
alex268 marked this conversation as resolved.

public WriterSettings build() {
return new WriterSettings(this);
}
Expand Down
93 changes: 75 additions & 18 deletions topic/src/main/java/tech/ydb/topic/write/impl/BufferManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.Status;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.write.QueueOverflowException;

Expand All @@ -18,16 +19,18 @@ public class BufferManager {
// use logger from WriterImpl
private static final Logger logger = LoggerFactory.getLogger(WriterImpl.class);

private final String id;
private final String debugId;
private final long bufferMaxSize;
private final int maxCount;
private final int blockBitsCount;

private final Semaphore blocksAvailable;
private final Semaphore countAvailable;

private volatile Status closed = null;

public BufferManager(String id, WriterSettings settings) {
this.id = id;
this.debugId = id;

this.maxCount = settings.getMaxSendBufferMessagesCount();
this.bufferMaxSize = settings.getMaxSendBufferMemorySize();
Expand All @@ -42,67 +45,121 @@ public long getMaxSize() {
return bufferMaxSize;
}

/**
 * Marks the buffer as closed with the given status and hands out enough
 * permits on both semaphores to unblock callers waiting for free space.
 *
 * @param status status stored in the {@code closed} field; the acquire methods
 *               check that field and fail once it is non-null
 */
public void close(Status status) {
    // publish the closed state first, so that awakened waiters observe it
    closed = status;

    // release all waiters: give back a full buffer worth of blocks and message slots
    final int totalBlocks = calculateBlocksCount(bufferMaxSize, blockBitsCount);
    blocksAvailable.release(totalBlocks);
    countAvailable.release(maxCount);
}
Comment thread
alex268 marked this conversation as resolved.

public void acquire(long messageSize) throws InterruptedException, QueueOverflowException {
if (closed != null) {
throw new IllegalStateException("Writer was closed with status " + closed);
}

countAvailable.acquire();

if (closed != null) {
countAvailable.release();
throw new IllegalStateException("Writer was closed with status " + closed);
}

int messageBlocks = calculateBlocksCount(messageSize, blockBitsCount);

try {
int messageBlocks = calculateBlocksCount(messageSize, blockBitsCount);
blocksAvailable.acquire(messageBlocks);
} catch (InterruptedException ex) {
countAvailable.release();
throw ex;
}

if (closed != null) {
blocksAvailable.release(messageBlocks);
countAvailable.release();
throw new IllegalStateException("Writer was closed with status " + closed);
}
}

public void tryAcquire(long messageSize) throws QueueOverflowException {
if (closed != null) {
throw new IllegalStateException("Writer was closed with status " + closed);
}

if (!countAvailable.tryAcquire()) {
String errorMessage = "[" + id + "] Rejecting a message due to reaching message queue in-flight limit of "
String errorMsg = "[" + debugId + "] Rejecting a message due to reaching message queue in-flight limit of "
+ maxCount;
logger.warn(errorMessage);
throw new QueueOverflowException(errorMessage);
logger.warn(errorMsg);
throw new QueueOverflowException(errorMsg);
}

if (closed != null) {
countAvailable.release();
throw new IllegalStateException("Writer was closed with status " + closed);
}

int messageBlocks = calculateBlocksCount(messageSize, blockBitsCount);
if (!blocksAvailable.tryAcquire(messageBlocks)) {
countAvailable.release();
int count = maxCount - countAvailable.availablePermits();
long size = ((long) blocksAvailable.availablePermits()) << blockBitsCount;
String errorMessage = "[" + id + "] Rejecting a message of " + messageSize +
String errorMsg = "[" + debugId + "] Rejecting a message of " + messageSize +
" bytes: not enough space in message queue. Buffer currently has " + count +
" messages with " + size + " / " + bufferMaxSize + " bytes available";
logger.warn(errorMessage);
throw new QueueOverflowException(errorMessage);
logger.warn(errorMsg);
throw new QueueOverflowException(errorMsg);
}

if (closed != null) {
blocksAvailable.release(messageBlocks);
countAvailable.release();
throw new IllegalStateException("Writer was closed with status " + closed);
}
}

public void tryAcquire(long messageSize, long timeout, TimeUnit unit) throws InterruptedException,
QueueOverflowException, TimeoutException {
if (closed != null) {
throw new IllegalStateException("Writer was closed with status " + closed);
}

long expireAt = System.nanoTime() + unit.toNanos(timeout);
if (!countAvailable.tryAcquire(timeout, unit)) {
String errorMessage = "[" + id + "] Rejecting a message due to reaching message queue in-flight limit of "
String errorMsg = "[" + debugId + "] Rejecting a message due to reaching message queue in-flight limit of "
+ maxCount;
logger.warn(errorMessage);
throw new TimeoutException(errorMessage);
logger.warn(errorMsg);
throw new TimeoutException(errorMsg);
}

if (closed != null) {
countAvailable.release();
throw new IllegalStateException("Writer was closed with status " + closed);
}

int messageBlocks = calculateBlocksCount(messageSize, blockBitsCount);

try {
// negative timeout is allowed for tryAcquire
long timeout2 = unit.convert(expireAt - System.nanoTime(), TimeUnit.NANOSECONDS);
int messageBlocks = calculateBlocksCount(messageSize, blockBitsCount);
if (!blocksAvailable.tryAcquire(messageBlocks, timeout2, unit)) {
long timeout2 = expireAt - System.nanoTime();
if (!blocksAvailable.tryAcquire(messageBlocks, timeout2, TimeUnit.NANOSECONDS)) {
countAvailable.release();
int count = maxCount - countAvailable.availablePermits();
long size = ((long) blocksAvailable.availablePermits()) << blockBitsCount;
String errorMessage = "[" + id + "] Rejecting a message of " + messageSize +
String errorMsg = "[" + debugId + "] Rejecting a message of " + messageSize +
" bytes: not enough space in message queue. Buffer currently has " + count +
" messages with " + size + " / " + bufferMaxSize + " bytes available";
logger.warn(errorMessage);
throw new TimeoutException(errorMessage);
logger.warn(errorMsg);
throw new TimeoutException(errorMsg);
}
} catch (InterruptedException ex) {
countAvailable.release();
throw ex;
}

if (closed != null) {
blocksAvailable.release(messageBlocks);
countAvailable.release();
throw new IllegalStateException("Writer was closed with status " + closed);
}
}

public void releaseMessage(long messageSize) {
Expand Down
Loading