Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
}

WriteAck mapAck(WriteAck.Statistics statistics, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck ack) {
logger.debug("[{}] Received WriteAck with seqNo {} and status {}", debugId, ack.getSeqNo(),
logger.trace("[{}] Received WriteAck with seqNo {} and status {}", debugId, ack.getSeqNo(),
ack.getMessageWriteStatusCase());
if (ack.hasSkipped()) {
return new WriteAck(ack.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null, statistics);
Expand Down Expand Up @@ -122,7 +122,7 @@ public void onRetry(Status status) {

@Override
public void onClose(Status status) {
logger.debug("[{}] Session onClose with status {} called", debugId, status);
logger.info("[{}] Session closed with status {}", debugId, status);
Comment thread
alex268 marked this conversation as resolved.
listener.onClose(status);
if (errorsHandler != null && !status.isSuccess()) {
errorsHandler.accept(status, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ SentMessage nextMessageToSend() {
}

SentMessage sentMsg = new SentMessage(next, seqNo);
logger.debug("[{}] prepare sent message with seqNo {}", debugId, seqNo);
logger.trace("[{}] prepare sent message with seqNo {}", debugId, seqNo);
sent.offer(sentMsg);
return sentMsg;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package tech.ydb.topic.impl;
package tech.ydb.topic;



import tech.ydb.topic.YdbTopicsIntegrationTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -7,10 +11,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.spi.ExtendedLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -31,14 +31,17 @@
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.events.ReadEventHandler;
import tech.ydb.topic.read.events.StartPartitionSessionEvent;
import tech.ydb.topic.read.impl.ReadPartitionSession;
import tech.ydb.topic.read.impl.AsyncReaderImpl;
import tech.ydb.topic.read.impl.ReaderImpl;
import tech.ydb.topic.settings.CreateTopicSettings;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.TopicReadSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.HideLoggersRule;
import tech.ydb.topic.write.SyncWriter;
import tech.ydb.topic.utils.HideLoggers;

/**
*
Expand All @@ -53,6 +56,9 @@ public class TopicReadersIntegrationTest {
@Rule
public final Timeout timeout = new Timeout(10, TimeUnit.SECONDS);

@Rule
public final HideLoggersRule hideLogger = new HideLoggersRule();

private final static String TEST_TOPIC = "topic_readers_test";

private final static String TEST_CONSUMER1 = "consumer";
Expand Down Expand Up @@ -101,47 +107,38 @@ public void dropTopic() {
}

@Test
@HideLoggers({ ReaderImpl.class, AsyncReaderImpl.class })
public void singleThreadExecutorTest() throws Exception {
ExtendedLogger silenceLogger = LogManager.getContext(true).getLogger(ReadPartitionSession.class);
Level level = silenceLogger.getLevel();

ReaderSettings readerSettings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder()
.setPath(TEST_TOPIC)
.build())
.setConsumerName(TEST_CONSUMER1)
.build();

try {
// temporary disable logging
Configurator.setLevel(silenceLogger, Level.OFF);

Semaphore messageCount = new Semaphore(0);
CompletableFuture<Boolean> processing = new CompletableFuture<>();
Semaphore messageCount = new Semaphore(0);
CompletableFuture<Boolean> processing = new CompletableFuture<>();

ExecutorService executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "test-executor"));
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
.setExecutor(executor)
.setEventHandler((event) -> {
messageCount.release();
processing.join();
}).build()
);
ExecutorService executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "test-executor"));
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
.setExecutor(executor)
.setEventHandler((event) -> {
messageCount.release();
processing.join();
}).build()
);

reader.init().join();
reader.init().join();

// wait for message committing
messageCount.acquireUninterruptibly();
// wait for message committing
messageCount.acquireUninterruptibly();

// stop reader
CompletableFuture<Void> f = reader.shutdown();
processing.completeExceptionally(new RuntimeException("shutdown"));
f.get(5, TimeUnit.SECONDS);
// stop reader
CompletableFuture<Void> f = reader.shutdown();
processing.completeExceptionally(new RuntimeException("shutdown"));
f.get(5, TimeUnit.SECONDS);

executor.shutdownNow();
} finally {
Configurator.setLevel(silenceLogger, level);
}
executor.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tech.ydb.topic.impl;
package tech.ydb.topic;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -13,10 +13,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.spi.ExtendedLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -39,8 +35,10 @@
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.TopicReadSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.HideLoggersRule;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.SyncWriter;
import tech.ydb.topic.utils.HideLoggers;


/**
Expand All @@ -57,6 +55,9 @@ public class YdbTopicsCodecIntegrationTest {
@Rule
public final Timeout timeout = new Timeout(10, TimeUnit.SECONDS);

@Rule
public final HideLoggersRule hideLogger = new HideLoggersRule();

private final static String TEST_TOPIC1 = "integration_test_custom_codec_topic1";
private final static String TEST_TOPIC2 = "integration_test_custom_codec_topic2";
private final static String TEST_CONSUMER1 = "consumer_codec";
Expand Down Expand Up @@ -283,30 +284,21 @@ public void writeInOneTopicWithDifferentCodec() throws ExecutionException, Inter
*
*/
@Test
@HideLoggers({ MessageDecoder.class })
public void readShouldFailIfWithNotRegisteredCodec() throws ExecutionException, InterruptedException, TimeoutException {
ExtendedLogger silenceLogger = LogManager.getContext(true).getLogger(MessageDecoder.class);
Level level = silenceLogger.getLevel();

try {
// temporary disable logging
Configurator.setLevel(silenceLogger, Level.OFF);

client1 = createClient();
TopicClient client2 = createClient();
createTopic(client1, TEST_TOPIC1);
client1 = createClient();
TopicClient client2 = createClient();
createTopic(client1, TEST_TOPIC1);

Codec codec1 = new CustomCodec(1, 10113);
Codec codec1 = new CustomCodec(1, 10113);

client1.registerCodec(codec1);
writeData(10113, TEST_TOPIC1, client1);
client1.registerCodec(codec1);
writeData(10113, TEST_TOPIC1, client1);

readDataWithError(TEST_TOPIC1, client2);
readDataWithError(TEST_TOPIC1, client2);

client2.registerCodec(codec1);
readData(TEST_TOPIC1, client2);
} finally {
Configurator.setLevel(silenceLogger, level);
}
client2.registerCodec(codec1);
readData(TEST_TOPIC1, client2);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tech.ydb.topic.impl;
package tech.ydb.topic;

import java.time.Duration;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

import tech.ydb.topic.utils.HideLoggers;
import tech.ydb.topic.utils.HideLoggersRule;


/**
* @author Aleksandr Gorshenin
Expand All @@ -26,6 +30,9 @@ private static class IntHolder {
private volatile int value;
}

@Rule
public final HideLoggersRule hideLogger = new HideLoggersRule();

@Test
public void serialRunnableTest() throws InterruptedException {
IntHolder value = new IntHolder();
Expand Down Expand Up @@ -116,6 +123,7 @@ public void innerTasksTest() throws InterruptedException {
}

@Test
@HideLoggers({ SerialExecutor.class })
public void wrongExecuterTest() throws InterruptedException {
AtomicInteger value = new AtomicInteger();
ExecutorService pool = Executors.newCachedThreadPool();
Expand All @@ -135,6 +143,7 @@ public void wrongExecuterTest() throws InterruptedException {
}

@Test
@HideLoggers({ SerialExecutor.class })
public void wrongTaskTest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(4);
ExecutorService pool = Executors.newCachedThreadPool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.google.protobuf.Empty;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand All @@ -18,11 +19,16 @@
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.topic.utils.HideLoggers;
import tech.ydb.topic.utils.HideLoggersRule;

public class TopicRetryableStreamTest {
private static final Logger logger = LoggerFactory.getLogger(TopicRetryableStreamTest.class);
private static final Empty EMPTY = Empty.getDefaultInstance();

@Rule
public final HideLoggersRule hideLogger = new HideLoggersRule();

/**
* Pairs a mock GrpcReadWriteStream with a concrete TopicStream backed by it.
* Completing grpcFuture simulates the underlying gRPC stream finishing.
Expand Down Expand Up @@ -257,6 +263,7 @@ public void immediateRetryTest() {
}

@Test
@HideLoggers({TopicRetryableStreamTest.class})
public void closeOnWrongSchedulerTest() {
StreamHandle h = new StreamHandle();
long delayMs = 500L;
Expand Down
16 changes: 16 additions & 0 deletions topic/src/test/java/tech/ydb/topic/utils/HideLoggers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package tech.ydb.topic.utils;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
*
* @author Aleksandr Gorshenin
*/
@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface HideLoggers {
Class<?>[] value();
}
50 changes: 50 additions & 0 deletions topic/src/test/java/tech/ydb/topic/utils/HideLoggersRule.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package tech.ydb.topic.utils;


import java.util.HashMap;
import java.util.Map;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.spi.ExtendedLogger;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;


/**
*
* @author Aleksandr Gorshenin
*/
public class HideLoggersRule implements TestRule {

@Override
public Statement apply(Statement base, Description description) {
HideLoggers annotation = description.getAnnotation(HideLoggers.class);
if (annotation == null) {
return base;
}

Class<?>[] hiddenLoggers = annotation.value();
return new Statement() {
@Override
public void evaluate() throws Throwable {
Map<ExtendedLogger, Level> before = new HashMap<>();
for (Class<?> clazz: hiddenLoggers) {
ExtendedLogger logger = LogManager.getContext(true).getLogger(clazz);
before.put(logger, logger.getLevel());
// hide logger
Configurator.setLevel(logger, Level.OFF);
}

try {
base.evaluate();
} finally {
// recover all loggers
before.forEach((logger, level) -> Configurator.setLevel(logger, level));
}
}
};
}
}
Loading