Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions topic/src/main/java/tech/ydb/topic/TopicRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.proto.topic.YdbTopic.AlterTopicRequest;
import tech.ydb.proto.topic.YdbTopic.CommitOffsetRequest;
import tech.ydb.proto.topic.YdbTopic.CreateTopicRequest;
import tech.ydb.proto.topic.YdbTopic.DescribeConsumerRequest;
import tech.ydb.proto.topic.YdbTopic.DescribeConsumerResult;
import tech.ydb.proto.topic.YdbTopic.DescribeTopicRequest;
import tech.ydb.proto.topic.YdbTopic.DescribeTopicResult;
import tech.ydb.proto.topic.YdbTopic.DropTopicRequest;
import tech.ydb.proto.topic.YdbTopic.StreamReadMessage;
import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage;
import tech.ydb.proto.topic.YdbTopic.UpdateOffsetsInTransactionRequest;


/**
Comment thread
alex268 marked this conversation as resolved.
Expand All @@ -21,41 +31,41 @@ public interface TopicRpc {
* @param settings rpc call settings
* @return completable future with status of operation
*/
CompletableFuture<Status> createTopic(YdbTopic.CreateTopicRequest request, GrpcRequestSettings settings);
CompletableFuture<Status> createTopic(CreateTopicRequest request, GrpcRequestSettings settings);

/**
* Alter topic.
* @param request request proto
* @param settings rpc call settings
* @return completable future with status of operation
*/
CompletableFuture<Status> alterTopic(YdbTopic.AlterTopicRequest request, GrpcRequestSettings settings);
CompletableFuture<Status> alterTopic(AlterTopicRequest request, GrpcRequestSettings settings);

/**
* Drop topic.
* @param request request proto
* @param settings rpc call settings
* @return completable future with status of operation
*/
CompletableFuture<Status> dropTopic(YdbTopic.DropTopicRequest request, GrpcRequestSettings settings);
CompletableFuture<Status> dropTopic(DropTopicRequest request, GrpcRequestSettings settings);

/**
* Describe topic.
* @param request request proto
* @param settings rpc call settings
* @return completable future with result of operation
*/
CompletableFuture<Result<YdbTopic.DescribeTopicResult>> describeTopic(YdbTopic.DescribeTopicRequest request,
GrpcRequestSettings settings);
CompletableFuture<Result<DescribeTopicResult>> describeTopic(DescribeTopicRequest request,
GrpcRequestSettings settings);

/**
* Describe consumer.
* @param request request proto
* @param settings rpc call settings
* @return completable future with result of operation
*/
CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
YdbTopic.DescribeConsumerRequest request, GrpcRequestSettings settings
CompletableFuture<Result<DescribeConsumerResult>> describeConsumer(
DescribeConsumerRequest request, GrpcRequestSettings settings
);

/**
Expand All @@ -64,24 +74,25 @@ CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
* @param settings rpc call settings
* @return completable future with result of operation
*/
CompletableFuture<Status> commitOffset(YdbTopic.CommitOffsetRequest request, GrpcRequestSettings settings);
CompletableFuture<Status> commitOffset(CommitOffsetRequest request, GrpcRequestSettings settings);

/**
* Updates offsets in transaction.
* @param request request proto
* @param settings rpc call settings
* @return completable future with result of operation
*/
CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
CompletableFuture<Status> updateOffsetsInTransaction(UpdateOffsetsInTransactionRequest request,
GrpcRequestSettings settings);

GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient> writeSession(
String traceId
);
GrpcReadWriteStream<StreamWriteMessage.FromServer, StreamWriteMessage.FromClient> writeSession(String traceId);

GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient> readSession(
String traceId
);
default GrpcReadWriteStream<StreamWriteMessage.FromServer, StreamWriteMessage.FromClient> writeSession(
GrpcRequestSettings settings) {
return writeSession(settings.getTraceId());
}
Comment thread
alex268 marked this conversation as resolved.

GrpcReadWriteStream<StreamReadMessage.FromServer, StreamReadMessage.FromClient> readSession(String traceId);

ScheduledExecutorService getScheduler();
}
65 changes: 41 additions & 24 deletions topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.operation.OperationBinder;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.proto.topic.YdbTopic.AlterTopicRequest;
import tech.ydb.proto.topic.YdbTopic.AlterTopicResponse;
import tech.ydb.proto.topic.YdbTopic.CommitOffsetRequest;
import tech.ydb.proto.topic.YdbTopic.CreateTopicRequest;
import tech.ydb.proto.topic.YdbTopic.DescribeConsumerRequest;
import tech.ydb.proto.topic.YdbTopic.DescribeConsumerResponse;
import tech.ydb.proto.topic.YdbTopic.DescribeConsumerResult;
import tech.ydb.proto.topic.YdbTopic.DescribeTopicRequest;
import tech.ydb.proto.topic.YdbTopic.DescribeTopicResponse;
import tech.ydb.proto.topic.YdbTopic.DescribeTopicResult;
import tech.ydb.proto.topic.YdbTopic.DropTopicRequest;
import tech.ydb.proto.topic.YdbTopic.DropTopicResponse;
import tech.ydb.proto.topic.YdbTopic.StreamReadMessage;
import tech.ydb.proto.topic.YdbTopic.StreamWriteMessage;
import tech.ydb.proto.topic.YdbTopic.UpdateOffsetsInTransactionRequest;
import tech.ydb.proto.topic.YdbTopic.UpdateOffsetsInTransactionResponse;
import tech.ydb.proto.topic.v1.TopicServiceGrpc;
import tech.ydb.topic.TopicRpc;

Expand All @@ -33,77 +49,78 @@ public static GrpcTopicRpc useTransport(@WillNotClose GrpcTransport transport) {
}

@Override
public CompletableFuture<Status> createTopic(YdbTopic.CreateTopicRequest request, GrpcRequestSettings settings) {
public CompletableFuture<Status> createTopic(CreateTopicRequest request, GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getCreateTopicMethod(), settings, request)
.thenApply(OperationBinder.bindSync(YdbTopic.CreateTopicResponse::getOperation));
}

@Override
public CompletableFuture<Status> alterTopic(YdbTopic.AlterTopicRequest request, GrpcRequestSettings settings) {
public CompletableFuture<Status> alterTopic(AlterTopicRequest request, GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getAlterTopicMethod(), settings, request)
.thenApply(OperationBinder.bindSync(YdbTopic.AlterTopicResponse::getOperation));
.thenApply(OperationBinder.bindSync(AlterTopicResponse::getOperation));
}

@Override
public CompletableFuture<Result<YdbTopic.DescribeTopicResult>> describeTopic(YdbTopic.DescribeTopicRequest request,
public CompletableFuture<Result<DescribeTopicResult>> describeTopic(DescribeTopicRequest request,
GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getDescribeTopicMethod(), settings, request)
.thenApply(OperationBinder.bindSync(
YdbTopic.DescribeTopicResponse::getOperation, YdbTopic.DescribeTopicResult.class)
);
.thenApply(OperationBinder.bindSync(DescribeTopicResponse::getOperation, DescribeTopicResult.class));
}

@Override
public CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
YdbTopic.DescribeConsumerRequest request, GrpcRequestSettings settings
) {
public CompletableFuture<Result<DescribeConsumerResult>> describeConsumer(DescribeConsumerRequest request,
GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getDescribeConsumerMethod(), settings, request)
.thenApply(OperationBinder.bindSync(
YdbTopic.DescribeConsumerResponse::getOperation, YdbTopic.DescribeConsumerResult.class)
);
.thenApply(OperationBinder.bindSync(DescribeConsumerResponse::getOperation,
DescribeConsumerResult.class));
}

@Override
public CompletableFuture<Status> dropTopic(YdbTopic.DropTopicRequest request, GrpcRequestSettings settings) {
public CompletableFuture<Status> dropTopic(DropTopicRequest request, GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getDropTopicMethod(), settings, request)
.thenApply(OperationBinder.bindSync(YdbTopic.DropTopicResponse::getOperation));
.thenApply(OperationBinder.bindSync(DropTopicResponse::getOperation));
}

@Override
public CompletableFuture<Status> commitOffset(YdbTopic.CommitOffsetRequest request, GrpcRequestSettings settings) {
public CompletableFuture<Status> commitOffset(CommitOffsetRequest request, GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getCommitOffsetMethod(), settings, request)
.thenApply(OperationBinder.bindSync(YdbTopic.CommitOffsetResponse::getOperation));
}

@Override
public CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
public CompletableFuture<Status> updateOffsetsInTransaction(UpdateOffsetsInTransactionRequest request,
GrpcRequestSettings settings) {
return transport
.unaryCall(TopicServiceGrpc.getUpdateOffsetsInTransactionMethod(), settings, request)
.thenApply(OperationBinder.bindSync(YdbTopic.UpdateOffsetsInTransactionResponse::getOperation));
.thenApply(OperationBinder.bindSync(UpdateOffsetsInTransactionResponse::getOperation));
}

@Override
public GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient>
writeSession(String streamId) {
public GrpcReadWriteStream<StreamWriteMessage.FromServer, StreamWriteMessage.FromClient> writeSession(String id) {
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
.withTraceId(streamId)
.withTraceId(id)
.disableDeadline()
.build();
return writeSession(settings);
}

@Override
public GrpcReadWriteStream<StreamWriteMessage.FromServer, StreamWriteMessage.FromClient> writeSession(
GrpcRequestSettings settings) {
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamWriteMethod(), settings);
}


@Override
public GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient>
readSession(String streamId) {
public GrpcReadWriteStream<StreamReadMessage.FromServer, StreamReadMessage.FromClient> readSession(String id) {
GrpcRequestSettings settings = GrpcRequestSettings.newBuilder()
.withTraceId(streamId)
.withTraceId(id)
.disableDeadline()
.build();
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamReadMethod(), settings);
Expand Down
72 changes: 4 additions & 68 deletions topic/src/main/java/tech/ydb/topic/impl/TopicStream.java
Original file line number Diff line number Diff line change
@@ -1,79 +1,15 @@
package tech.ydb.topic.impl;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import com.google.protobuf.Message;
import org.slf4j.Logger;

import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadWriteStream;

public abstract class TopicStream<R extends Message, W extends Message> {
private final Logger logger;
private final String debugId;
private final GrpcReadWriteStream<R, W> stream;
private final CompletableFuture<Status> streamStatus = new CompletableFuture<>();
private volatile String token;
public interface TopicStream<R extends Message, W extends Message> {
CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler);
void send(W request);

public TopicStream(Logger logger, String debugId, GrpcReadWriteStream<R, W> stream) {
this.logger = logger;
this.debugId = debugId;
this.stream = stream;
this.token = stream.authToken();
}

protected abstract W updateTokenMessage(String token);
protected abstract Status parseMessageStatus(R message);

public CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler) {
this.logger.debug("[{}] is about to start", debugId);
this.stream.start((R msg) -> {
Status messageStatus = parseMessageStatus(msg);
if (messageStatus.isSuccess()) {
messageHandler.accept(msg);
} else {
logger.warn("[{}] stopped by getting status {}", debugId, messageStatus);
if (streamStatus.complete(messageStatus)) {
stream.close();
}
}
}).whenComplete((st, th) -> {
Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th);
logger.debug("[{}] finished with status {}", debugId, status);
streamStatus.complete(status);
});

if (!streamStatus.isDone()) {
stream.sendNext(initReq);
}

return streamStatus;
}

public void close() {
logger.debug("[{}] closed by app", debugId);
if (!streamStatus.isDone()) {
stream.close();
}
}

public void send(W req) {
if (streamStatus.isDone()) {
logger.warn("[{}] is already closed. Next message with type {} was NOT sent", debugId,
req.getDescriptorForType().getName());
return;
}

String currentToken = stream.authToken();
if (!Objects.equals(token, currentToken)) {
token = currentToken;
logger.info("{} sends new token", this);
stream.sendNext(updateTokenMessage(token));
}

stream.sendNext(req);
}
void close();
}
82 changes: 82 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/TopicStreamBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package tech.ydb.topic.impl;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import com.google.protobuf.Message;
import org.slf4j.Logger;

import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadWriteStream;

public abstract class TopicStreamBase<R extends Message, W extends Message> implements TopicStream<R, W> {
private final Logger logger;
private final String debugId;
private final GrpcReadWriteStream<R, W> stream;
private final CompletableFuture<Status> streamStatus = new CompletableFuture<>();
private volatile String token;

public TopicStreamBase(Logger logger, String debugId, GrpcReadWriteStream<R, W> stream) {
this.logger = logger;
this.debugId = debugId;
this.stream = stream;
this.token = stream.authToken();
}

protected abstract W updateTokenMessage(String token);
protected abstract Status parseMessageStatus(R message);

@Override
public CompletableFuture<Status> start(W initReq, Consumer<R> messageHandler) {
this.logger.debug("[{}] is about to start", debugId);
this.stream.start((R msg) -> {
Status messageStatus = parseMessageStatus(msg);
if (messageStatus.isSuccess()) {
messageHandler.accept(msg);
} else {
logger.warn("[{}] stopped by getting status {}", debugId, messageStatus);
if (streamStatus.complete(messageStatus)) {
stream.close();
}
}
}).whenComplete((st, th) -> {
Status status = st != null ? st : Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th);
logger.debug("[{}] finished with status {}", debugId, status);
streamStatus.complete(status);
});

if (!streamStatus.isDone()) {
stream.sendNext(initReq);
}

return streamStatus;
}

@Override
public void close() {
logger.debug("[{}] closed by app", debugId);
if (!streamStatus.isDone()) {
stream.close();
}
}

@Override
public void send(W req) {
if (streamStatus.isDone()) {
logger.warn("[{}] is already closed. Next message with type {} was NOT sent", debugId,
req.getDescriptorForType().getName());
return;
}

String currentToken = stream.authToken();
if (!Objects.equals(token, currentToken)) {
token = currentToken;
logger.info("{} sends new token", this);
stream.sendNext(updateTokenMessage(token));
}

stream.sendNext(req);
}
}
Loading