From e929377a1237c48b02944855b6a95bf0c0bfd6bd Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Mon, 2 Mar 2026 17:33:53 +0300 Subject: [PATCH 01/16] feat: added spans --- .../ydb/core/grpc/GrpcRequestSettings.java | 2 + .../main/java/tech/ydb/core/tracing/Span.java | 118 +--- opentelemetry/pom.xml | 33 ++ .../opentelemetry/OpenTelemetryTracer.java | 94 ++++ .../java/tech/ydb/query/impl/SessionImpl.java | 217 ++++---- .../java/tech/ydb/query/impl/SessionPool.java | 62 ++- .../tech/ydb/query/impl/TableClientImpl.java | 112 +--- .../tech/ydb/query/impl/QueryTracingTest.java | 526 +----------------- .../ydb/table/impl/SimpleTableClient.java | 6 - 9 files changed, 334 insertions(+), 836 deletions(-) create mode 100644 opentelemetry/pom.xml create mode 100644 opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java index e7850d771..344697273 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java @@ -6,6 +6,8 @@ import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import javax.annotation.Nullable; + import io.grpc.Metadata; import tech.ydb.core.impl.call.GrpcFlows; diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index b482bc6cb..7ae35b322 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -1,127 +1,29 @@ package tech.ydb.core.tracing; -import java.util.concurrent.CompletableFuture; - -import javax.annotation.Nullable; - -import io.grpc.ExperimentalApi; - -import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.utils.FutureTools; /** * A span represents a timed operation. */ -@ExperimentalApi("YDB Tracer is experimental and API may change without notice") public interface Span { - Span NOOP = new Span() { - }; - - /** - * Returns W3C traceparent value for request propagation. - * - *

For {@link #NOOP} this returns an empty string. Check {@link #isValid()} to decide whether - * trace headers should be sent to server. - * - * @return traceparent value - */ - default String getId() { - return ""; - } - /** - * Indicates whether this span carries a real tracing context. - * - * @return true for real spans, false for noop span - */ - default boolean isValid() { - return false; - } + String getId(); - /** - * Sets a string attribute on the span. - * - * @param key attribute key - * @param value attribute value, may be null - */ - default void setAttribute(String key, @Nullable String value) { - } + /** Sets a string attribute on the span (ignored by Noop implementation). */ + void setAttribute(String key, String value); - /** - * Sets a long attribute on the span. - * - * @param key attribute key - * @param value attribute value - */ - default void setAttribute(String key, long value) { - } + /** Sets a long attribute on the span (ignored by Noop implementation). */ + void setAttribute(String key, long value); /** - * Sets span status (success or error) with human-readable message. - * - * @param status operation status used to map span attributes - * @param error operation exception used to map span attributes + * Sets span status to error with human-readable message. */ - default void setStatus(@Nullable Status status, @Nullable Throwable error) { - } + void setError(Status status); /** - * Makes this span current in the active execution context. - * - * @return closeable scope handle + * Sets span status to error from exception. */ - default Scope makeCurrent() { - return () -> { - }; - } + void setError(Throwable error); - /** - * Restores context captured when this span was created. - * - *

The returned scope must be closed, usually via try-with-resources. - * - * @return closeable scope handle for restored context - */ - default Scope restoreContext() { - return () -> { - }; - } - - /** - * Ends (finishes) this span. - */ - default void end() { - } - - /** - * Subscribes to a {@link Status} future: when it completes, sets status/error and ends the span. - * For non-valid spans returns the future as-is without subscribing. - * - * @param span the span to finalize - * @param future the future to observe - * @return the same future (for chaining) - */ - static CompletableFuture endOnStatus(Span span, CompletableFuture future) { - return span.isValid() ? future.whenComplete((status, th) -> { - span.setStatus(status, FutureTools.unwrapCompletionException(th)); - span.end(); - }) : future; - } - - /** - * Subscribes to a {@link Result} future: when it completes, sets status/error and ends the span. - * For non-valid spans returns the future as-is without subscribing. - * - * @param result value type - * @param span the span to finalize - * @param future the future to observe - * @return the same future (for chaining) - */ - static CompletableFuture> endOnResult(Span span, CompletableFuture> future) { - return span.isValid() ? future.whenComplete((result, th) -> { - span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th)); - span.end(); - }) : future; - } + void end(); } diff --git a/opentelemetry/pom.xml b/opentelemetry/pom.xml new file mode 100644 index 000000000..921db5032 --- /dev/null +++ b/opentelemetry/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + + tech.ydb + ydb-sdk-parent + 2.4.1-SNAPSHOT + + + ydb-sdk-opentelemetry + OpenTelemetry integration for YDB Java SDK + OpenTelemetry integration module for tracing and metrics in YDB SDK + + + 1.59.0 + + + + + tech.ydb + ydb-sdk-core + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java new file mode 100644 index 000000000..5e9b557f2 --- /dev/null +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -0,0 +1,94 @@ +package tech.ydb.opentelemetry; + +import java.util.Objects; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; + +import io.opentelemetry.api.trace.StatusCode; + +import tech.ydb.core.Status; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.Tracer; + +public final class OpenTelemetryTracer implements Tracer { + private static final String DEFAULT_SCOPE = "tech.ydb.sdk"; + + private final io.opentelemetry.api.trace.Tracer tracer; + + private OpenTelemetryTracer(io.opentelemetry.api.trace.Tracer tracer) { + this.tracer = Objects.requireNonNull(tracer, "tracer is null"); + } + + public static OpenTelemetryTracer createGlobal() { + return fromOpenTelemetry(GlobalOpenTelemetry.get()); + } + + public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry) { + return fromOpenTelemetry(openTelemetry, DEFAULT_SCOPE); + } + + public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry, String scopeName) { + Objects.requireNonNull(openTelemetry, "openTelemetry is null"); + Objects.requireNonNull(scopeName, "scopeName is null"); + return new OpenTelemetryTracer(openTelemetry.getTracer(scopeName)); + } + + @Override + public Span startSpan(String spanName, SpanKind spanKind) { + io.opentelemetry.api.trace.Span span = tracer.spanBuilder(spanName) + .setSpanKind(mapSpanKind(spanKind)) + .startSpan(); + return new OtelSpan(span); + } + + private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { + if (kind == SpanKind.CLIENT) { + return io.opentelemetry.api.trace.SpanKind.CLIENT; + } + return io.opentelemetry.api.trace.SpanKind.INTERNAL; + } + + private static final class OtelSpan implements Span { + private final io.opentelemetry.api.trace.Span span; + + private OtelSpan(io.opentelemetry.api.trace.Span span) { + this.span = span; + } + + @Override + public String getId() { + return "00-" + span.getSpanContext().getTraceId() + "-" + span.getSpanContext().getSpanId() + "01"; + } + + @Override + public void setAttribute(String key, String value) { + span.setAttribute(key, value); + } + + @Override + public void setAttribute(String key, long value) { + span.setAttribute(key, value); + } + + @Override + public void setError(Status status) { + span.setAttribute("db.response.status_code", status.getCode().toString()); + span.setAttribute("error.type", status.getCode().isTransportError() ? "transport_error" : "ydb_error"); + + span.setStatus(StatusCode.ERROR, status.toString()); + } + + @Override + public void setError(Throwable error) { + span.setAttribute("error.type", error.getClass().getName()); + span.setStatus(StatusCode.ERROR, error.getMessage()); + } + + @Override + public void end() { + span.end(); + } + } +} diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index f4c5a4065..05251101e 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -144,16 +144,11 @@ GrpcReadStream attach(AttachSessionSettings settings) { YdbQuery.AttachSessionRequest request = YdbQuery.AttachSessionRequest.newBuilder() .setSessionId(sessionId) .build(); - // Execute attachSession call outside current context to avoid cancellation and deadline propagation + // Execute attachSession call outside current context to avoid cancellation and deadline propogation Context ctx = Context.ROOT.fork(); Context previous = ctx.attach(); try { - AtomicBoolean pessimizationHook = new AtomicBoolean(false); - - GrpcRequestSettings grpcSettings = makeOptions(settings) - .withPessimizationHook(pessimizationHook::get) - .disableDeadline() - .build(); + GrpcRequestSettings grpcSettings = makeOptions(settings).disableDeadline().build(); GrpcReadStream origin = rpc.attachSession(request, grpcSettings); return new GrpcReadStream() { @Override @@ -166,19 +161,6 @@ public CompletableFuture start(GrpcReadStream.Observer observer) StatusCode code = StatusCode.fromProto(message.getStatus()); Status status = Status.of(code, Issue.fromPb(message.getIssuesList())); updateSessionState(status); - // The hint is sent by the server with a success status. - switch (message.getSessionHintCase()) { - case NODE_SHUTDOWN: - pessimizationHook.set(nodeID != 0); - updateSessionState(Status.of(StatusCode.BAD_SESSION)); - break; - case SESSION_SHUTDOWN: - updateSessionState(Status.of(StatusCode.BAD_SESSION)); - break; - default: - break; - } - observer.onNext(status); }); } @@ -197,17 +179,16 @@ private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) { return makeOptions(settings, null); } - Span startSpan(String spanName) { - return rpc.startSpan(spanName); - } - private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings, Span span) { String traceId = settings.getTraceId() == null ? UUID.randomUUID().toString() : settings.getTraceId(); - return GrpcRequestSettings.newBuilder() + GrpcRequestSettings.Builder builder = GrpcRequestSettings.newBuilder() .withDeadline(settings.getRequestTimeout()) .withPreferredNodeID((int) nodeID) - .withTraceId(traceId) - .withSpan(span); + .withTraceId(traceId); + if (span != null) { + builder.withSpan(span); + } + return builder; } private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { @@ -265,22 +246,6 @@ private static YdbFormats.ArrowFormatSettings mapApacheArrowFormat(ApacheArrowFo return YdbFormats.ArrowFormatSettings.newBuilder().setCompressionCodec(codecBuilder).build(); } - CompletableFuture commitById(String txId, CommitTransactionSettings settings, Span span) { - YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder() - .setSessionId(sessionId) - .setTxId(txId) - .build(); - return rpc.commitTransaction(request, makeOptions(settings, span).build()).thenApply(Result::getStatus); - } - - CompletableFuture rollbackById(String txId, RollbackTransactionSettings settings, Span span) { - YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder() - .setSessionId(sessionId) - .setTxId(txId) - .build(); - return rpc.rollbackTransaction(request, makeOptions(settings, span).build()).thenApply(Result::getStatus); - } - GrpcReadStream createGrpcStream( String query, YdbQuery.TransactionControl tx, @@ -330,7 +295,7 @@ GrpcReadStream createGrpcStream( @Override public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); - Span span = startSpan("ydb.ExecuteQuery"); + Span span = rpc.startSpan("ydb.ExecuteQuery"); return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { @Override void handleTxMeta(String txID) { @@ -371,76 +336,91 @@ static CompletableFuture> createSession( return rpc.createSession(request, grpcSettingsBuilder.build()).thenApply(result -> { pessimizationHook.set(result.getStatus().getCode() == StatusCode.OVERLOADED); + if (!result.isSuccess()) { + finishSpanByStatus(createSpan, result.getStatus()); + } return CREATE_SESSION.apply(result); }); } abstract class StreamImpl implements QueryStream { private final GrpcReadStream grpcStream; - private final Span span; - - StreamImpl(GrpcReadStream grpcStream, Span operationSpan) { + @Nullable + private final Span operationSpan; + + StreamImpl( + GrpcReadStream grpcStream, + @Nullable + Span operationSpan + ) { this.grpcStream = grpcStream; - this.span = operationSpan; + this.operationSpan = operationSpan; } abstract void handleTxMeta(String txId); void handleCompletion(Status status, Throwable th) { + if (operationSpan == null) { + return; + } + if (th != null) { + finishSpanByError(operationSpan, th); + return; + } + + finishSpanByStatus(operationSpan, status); } @Override public CompletableFuture> execute(PartsHandler handler) { final UpdatableOptional operationStatus = new UpdatableOptional<>(); final UpdatableOptional stats = new UpdatableOptional<>(); - return Span.endOnResult(span, grpcStream.start(msg -> { - if (isTraceEnabled) { - logger.trace("{} got stream message {}", - SessionImpl.this, TextFormat.shortDebugString(msg)); - } - Issue[] issues = Issue.fromPb(msg.getIssuesList()); - Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); + return grpcStream.start(msg -> { + if (isTraceEnabled) { + logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); + } + Issue[] issues = Issue.fromPb(msg.getIssuesList()); + Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); - updateSessionState(status); + updateSessionState(status); - if (!status.isSuccess()) { - handleTxMeta(null); - operationStatus.update(status); - return; - } + if (!status.isSuccess()) { + handleTxMeta(null); + operationStatus.update(status); + return; + } - if (msg.hasTxMeta()) { - handleTxMeta(msg.getTxMeta().getId()); - } - if (issues.length > 0) { - if (handler != null) { - handler.onIssues(issues); - } else { - logger.trace("{} lost issues message", SessionImpl.this); - } - } - if (msg.hasExecStats()) { - stats.update(new QueryStats(msg.getExecStats())); - } + if (msg.hasTxMeta()) { + handleTxMeta(msg.getTxMeta().getId()); + } + if (issues.length > 0) { + if (handler != null) { + handler.onIssues(issues); + } else { + logger.trace("{} lost issues message", SessionImpl.this); + } + } + if (msg.hasExecStats()) { + stats.update(new QueryStats(msg.getExecStats())); + } - if (msg.hasResultSet()) { - long index = msg.getResultSetIndex(); - if (handler != null) { - handler.onNextRawPart(index, msg.getResultSet()); - } else { - logger.trace("{} lost result set part with index {}", SessionImpl.this, index); - } - } - }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { - updateSessionState(streamStatus); - Status status = operationStatus.orElse(streamStatus); - if (status.isSuccess()) { - return Result.success(new QueryInfo(stats.get()), streamStatus); - } else { - return Result.fail(status); - } - }) - ); + if (msg.hasResultSet()) { + long index = msg.getResultSetIndex(); + if (handler != null) { + handler.onNextRawPart(index, msg.getResultSet()); + } else { + logger.trace("{} lost result set part with index {}", SessionImpl.this, index); + } + } + }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { + updateSessionState(streamStatus); + Status status = operationStatus.orElse(streamStatus); + if (status.isSuccess()) { + return Result.success(new QueryInfo(stats.get()), streamStatus); + } else { + return Result.fail(status); + } + }); } @Override @@ -477,7 +457,7 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E ? TxControl.txIdCtrl(currentId, commitAtEnd) : TxControl.txModeCtrl(txMode, commitAtEnd); - Span span = startSpan("ydb.ExecuteQuery"); + Span span = rpc.startSpan("ydb.ExecuteQuery"); return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { @Override void handleTxMeta(String txID) { @@ -489,6 +469,7 @@ void handleTxMeta(String txID) { @Override void handleCompletion(Status status, Throwable th) { + super.handleCompletion(status, th); if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Query on transaction failed with exception ", th)); @@ -517,13 +498,14 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { - Span span = startSpan("ydb.Commit"); + final Span commitSpan = rpc.startSpan("ydb.Commit"); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); - String transactionId = txId.get(); + final String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, issue)); - return Span.endOnResult(span, CompletableFuture.completedFuture(res)); + finishSpanByStatus(commitSpan, res.getStatus()); + return CompletableFuture.completedFuture(res); } YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder() @@ -531,7 +513,7 @@ public CompletableFuture> commit(CommitTransactionSettings set .setTxId(transactionId) .build(); - return Span.endOnResult(span, rpc.commitTransaction(request, makeOptions(settings, span).build())) + return rpc.commitTransaction(request, makeOptions(settings, commitSpan).build()) .thenApply(res -> { Status status = res.getStatus(); currentStatusFuture.complete(status); @@ -545,27 +527,31 @@ public CompletableFuture> commit(CommitTransactionSettings set if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Transaction commit failed with exception", th)); + finishSpanByError(commitSpan, th); + return; } + finishSpanByStatus(commitSpan, status.getStatus()); })); } @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { - Span span = startSpan("ydb.Rollback"); + final Span rollbackSpan = rpc.startSpan("ydb.Rollback"); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); - String transactionId = txId.get(); + final String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Status status = Status.of(StatusCode.SUCCESS, issue); - return Span.endOnStatus(span, CompletableFuture.completedFuture(status)); + finishSpanByStatus(rollbackSpan, status); + return CompletableFuture.completedFuture(status); } YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder() .setSessionId(sessionId) .setTxId(transactionId) .build(); - return Span.endOnResult(span, rpc.rollbackTransaction(request, makeOptions(settings, span).build())) + return rpc.rollbackTransaction(request, makeOptions(settings, rollbackSpan).build()) .thenApply(result -> { updateSessionState(result.getStatus()); if (!txId.compareAndSet(transactionId, null)) { @@ -578,7 +564,36 @@ public CompletableFuture rollback(RollbackTransactionSettings settings) currentStatusFuture.complete(Status .of(StatusCode.ABORTED) .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR))); + if (th != null) { + finishSpanByError(rollbackSpan, th); + return; + } + finishSpanByStatus(rollbackSpan, status); }); } } + + private static void finishSpanByStatus(Span span, Status status) { + if (span == null) { + return; + } + + if (status != null && !status.isSuccess()) { + span.setError(status); + } + + span.end(); + } + + private static void finishSpanByError(Span span, Throwable error) { + if (span == null) { + return; + } + + if (error != null) { + span.setError(error); + } + + span.end(); + } } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index ccbbf4889..e522f47a9 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -155,16 +155,22 @@ private boolean tryComplete(CompletableFuture> future, Pool private class PooledQuerySession extends SessionImpl { private final GrpcReadStream attachStream; + private final Span createSpan; private volatile Instant lastActive; private volatile boolean isStarted = false; private volatile boolean isBroken = false; private volatile boolean isStopped = false; - PooledQuerySession(QueryServiceRpc rpc, YdbQuery.CreateSessionResponse response) { + PooledQuerySession( + QueryServiceRpc rpc, + YdbQuery.CreateSessionResponse response, + Span createSpan + ) { super(rpc, response); this.lastActive = clock.instant(); this.attachStream = attach(ATTACH_SETTINGS); + this.createSpan = createSpan; stats.created.increment(); } @@ -173,12 +179,12 @@ public void updateSessionState(Status status) { this.lastActive = clock.instant(); boolean isStatusBroken = status.getCode() == StatusCode.BAD_SESSION || - status.getCode() == StatusCode.SESSION_BUSY || - status.getCode() == StatusCode.INTERNAL_ERROR || - status.getCode() == StatusCode.CLIENT_DEADLINE_EXCEEDED || - status.getCode() == StatusCode.CLIENT_DEADLINE_EXPIRED || - status.getCode() == StatusCode.CLIENT_CANCELLED || - status.getCode() == StatusCode.TRANSPORT_UNAVAILABLE; + status.getCode() == StatusCode.SESSION_BUSY || + status.getCode() == StatusCode.INTERNAL_ERROR || + status.getCode() == StatusCode.CLIENT_DEADLINE_EXCEEDED || + status.getCode() == StatusCode.CLIENT_DEADLINE_EXPIRED || + status.getCode() == StatusCode.CLIENT_CANCELLED || + status.getCode() == StatusCode.TRANSPORT_UNAVAILABLE; if (isStatusBroken) { logger.warn("QuerySession[{}] broken with status {}", getId(), status); } @@ -271,6 +277,7 @@ private class Handler implements WaitingQueue.Handler { } @Override + @SuppressWarnings("resource") public CompletableFuture create() { // Execute createSession call outside current context to avoid cancellation and deadline propogation Context ctx = Context.ROOT.fork(); @@ -278,15 +285,26 @@ public CompletableFuture create() { try { Span createSpan = rpc.startSpan("ydb.CreateSession"); stats.requested.increment(); - return Span.endOnResult(createSpan, SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan)) + return SessionImpl + .createSession(rpc, CREATE_SETTINGS, true, createSpan) .thenCompose(r -> { if (!r.isSuccess()) { + finishSpanByStatus(createSpan, r.getStatus()); stats.failed.increment(); throw new UnexpectedResultException("create session problem", r.getStatus()); } - PooledQuerySession session = new PooledQuerySession(rpc, r.getValue()); + PooledQuerySession session = new PooledQuerySession(rpc, r.getValue(), createSpan); return session.start(); - }).thenApply(Result::getValue); + }) + .whenComplete((result, th) -> { + if (th != null) { + finishSpanByError(createSpan, FutureTools.unwrapCompletionException(th)); + return; + } + + finishSpanByStatus(createSpan, result.getStatus()); + }) + .thenApply(Result::getValue); } finally { ctx.detach(previous); } @@ -456,4 +474,28 @@ public void run() { } } } + + private static void finishSpanByStatus(Span span, Status status) { + if (span == null) { + return; + } + + if (!status.isSuccess()) { + span.setError(status); + } + + span.end(); + } + + private static void finishSpanByError(Span span, Throwable error) { + if (span == null) { + return; + } + + if (error != null) { + span.setError(error); + } + + span.end(); + } } diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 8fde13bc8..9689ae9d3 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -15,8 +15,6 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.core.tracing.Span; -import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.ValueProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.table.YdbTable; @@ -25,10 +23,8 @@ import tech.ydb.query.result.QueryInfo; import tech.ydb.query.result.QueryResultPart; import tech.ydb.query.result.QueryStats; -import tech.ydb.query.settings.CommitTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; import tech.ydb.query.settings.QueryStatsMode; -import tech.ydb.query.settings.RollbackTransactionSettings; import tech.ydb.table.Session; import tech.ydb.table.SessionPoolStats; import tech.ydb.table.TableClient; @@ -37,11 +33,7 @@ import tech.ydb.table.query.Params; import tech.ydb.table.rpc.TableRpc; import tech.ydb.table.rpc.grpc.GrpcTableRpc; -import tech.ydb.table.settings.BeginTxSettings; -import tech.ydb.table.settings.CommitTxSettings; import tech.ydb.table.settings.ExecuteDataQuerySettings; -import tech.ydb.table.settings.RollbackTxSettings; -import tech.ydb.table.transaction.TableTransaction; /** * @@ -61,11 +53,6 @@ public ScheduledExecutorService getScheduler() { return proxy.getScheduler(); } - @Override - public Tracer getTracer() { - return proxy.getTracer(); - } - @Override public SessionPoolStats sessionPoolStats() { return proxy.getSessionPoolStats(); @@ -92,9 +79,6 @@ private YdbQuery.TransactionControl mapTxControl(YdbTable.TransactionControl tc) if (tc.getBeginTx().hasSnapshotReadOnly()) { return TxControl.txModeCtrl(TxMode.SNAPSHOT_RO, tc.getCommitTx()); } - if (tc.getBeginTx().hasSnapshotReadWrite()) { - return TxControl.txModeCtrl(TxMode.SNAPSHOT_RW, tc.getCommitTx()); - } if (tc.getBeginTx().hasStaleReadOnly()) { return TxControl.txModeCtrl(TxMode.STALE_RO, tc.getCommitTx()); } @@ -131,10 +115,8 @@ public CompletableFuture> executeDataQueryInternal( final AtomicReference txRef = new AtomicReference<>(""); final List issues = new ArrayList<>(); final List results = new ArrayList<>(); - Span span = querySession.startSpan("ydb.ExecuteQuery"); - QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span), - span) { + QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), null) { @Override void handleTxMeta(String txID) { txRef.set(txID); @@ -148,8 +130,7 @@ public void onIssues(Issue[] issueArr) { } @Override - public void onNextPart(QueryResultPart part) { - } // not used + public void onNextPart(QueryResultPart part) { } // not used @Override public void onNextRawPart(long index, ValueProtos.ResultSet rs) { @@ -193,95 +174,6 @@ protected void updateSessionState(Throwable th, StatusCode code, boolean shutdow } } - @Override - public TableTransaction createNewTransaction(TxMode txMode) { - return new TracedTableTransaction(super.createNewTransaction(txMode)); - } - - @Override - public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) { - return super.beginTransaction(txMode, settings) - .thenApply(result -> result.map(TracedTableTransaction::new)); - } - - @Override - protected CompletableFuture commitTransactionInternal(String txId, CommitTxSettings settings) { - Span span = querySession.startSpan("ydb.Commit"); - CommitTransactionSettings querySettings = CommitTransactionSettings.newBuilder() - .withTraceId(settings.getTraceId()) - .withRequestTimeout(settings.getTimeoutDuration()) - .build(); - return Span.endOnStatus(span, querySession.commitById(txId, querySettings, span)); - } - - @Override - protected CompletableFuture rollbackTransactionInternal(String txId, RollbackTxSettings settings) { - Span span = querySession.startSpan("ydb.Rollback"); - RollbackTransactionSettings querySettings = RollbackTransactionSettings.newBuilder() - .withTraceId(settings.getTraceId()) - .withRequestTimeout(settings.getTimeoutDuration()) - .build(); - return Span.endOnStatus(span, querySession.rollbackById(txId, querySettings, span)); - } - - private final class TracedTableTransaction implements TableTransaction { - private final TableTransaction delegate; - - private TracedTableTransaction(TableTransaction delegate) { - this.delegate = delegate; - } - - @Override - public Session getSession() { - return TableSession.this; - } - - @Override - public CompletableFuture> executeDataQuery( - String query, boolean commitAtEnd, Params params, ExecuteDataQuerySettings settings - ) { - return delegate.executeDataQuery(query, commitAtEnd, params, settings); - } - - @Override - public CompletableFuture commit(CommitTxSettings settings) { - String txId = delegate.getId(); - if (txId == null) { - return delegate.commit(settings); - } - return TableSession.this.commitTransaction(txId, settings); - } - - @Override - public CompletableFuture rollback(RollbackTxSettings settings) { - String txId = delegate.getId(); - if (txId == null) { - return delegate.rollback(settings); - } - return rollbackTransactionInternal(txId, settings); - } - - @Override - public String getId() { - return delegate.getId(); - } - - @Override - public TxMode getTxMode() { - return delegate.getTxMode(); - } - - @Override - public String getSessionId() { - return delegate.getSessionId(); - } - - @Override - public CompletableFuture getStatusFuture() { - return delegate.getStatusFuture(); - } - } - @Override public void close() { querySession.close(); diff --git a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java index 303e69229..2da768d4e 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java @@ -4,16 +4,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.annotation.Nullable; import org.junit.After; import org.junit.AfterClass; @@ -27,10 +17,7 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.core.tracing.Scope; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; import tech.ydb.core.tracing.Tracer; @@ -38,14 +25,6 @@ import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; -import tech.ydb.query.tools.SessionRetryContext; -import tech.ydb.proto.StatusCodesProtos; -import tech.ydb.table.Session; -import tech.ydb.table.TableClient; -import tech.ydb.table.query.DataQueryResult; -import tech.ydb.table.query.Params; -import tech.ydb.table.transaction.TableTransaction; -import tech.ydb.table.transaction.TxControl; import tech.ydb.test.junit4.YdbHelperRule; public class QueryTracingTest { @@ -54,17 +33,14 @@ public class QueryTracingTest { private static GrpcTransport transport; private static RecordingTracer tracer; - private static final GrpcTestInterceptor grpcInterceptor = new GrpcTestInterceptor(); private QueryClient queryClient; - private TableClient tableClient; @BeforeClass public static void initTransport() { tracer = new RecordingTracer(); transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) .withAuthProvider(new TokenAuthProvider(YDB.authToken())) - .addChannelInitializer(grpcInterceptor) .withTracer(tracer) .build(); } @@ -77,513 +53,70 @@ public static void closeTransport() { @Before public void initClient() { tracer.reset(); - grpcInterceptor.reset(); queryClient = QueryClient.newClient(transport).build(); - tableClient = null; } @After public void closeClient() { - if (tableClient != null) { - tableClient.close(); - } - if (queryClient != null) { - queryClient.close(); - } - } - - private static void awaitTracing() { - try { - tracer.awaitAllSpansClosed(Duration.ofSeconds(30)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - Assert.fail(e.toString()); - } + queryClient.close(); } @Test - public void createSessionSpanIsRecorded() { - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - Assert.assertNotNull(session.getId()); - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); - } - } - - @Test - public void executeQuerySpanIsRecorded() { + public void queryServiceSpansAreRecorded() { try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); - } - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery")); - } - - @Test - public void commitSpanIsRecordedInQueryTransaction() { - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { Result txResult = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); txResult.getStatus().expectSuccess(); - Result queryResult = txResult.getValue().createQuery("SELECT 1").execute().join(); - queryResult.getStatus().expectSuccess(); Result commitResult = txResult.getValue().commit().join(); commitResult.getStatus().expectSuccess(); - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit")); - } - @Test - public void rollbackSpanIsRecordedInQueryTransaction() { - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - Result txResult = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); - txResult.getStatus().expectSuccess(); - Result queryResult = txResult.getValue().createQuery("SELECT 1").execute().join(); - queryResult.getStatus().expectSuccess(); - Status rollbackStatus = txResult.getValue().rollback().join(); + Result txResult2 = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); + txResult2.getStatus().expectSuccess(); + Status rollbackStatus = txResult2.getValue().rollback().join(); rollbackStatus.expectSuccess(); } - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); - } - - @Test - public void createSessionAndExecuteQuerySpansAreRecordedInTableProxy() { - tableClient = QueryClient.newTableClient(transport).build(); - try (Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - Result result = session.executeDataQuery( - "SELECT 1", - TxControl.serializableRw().setCommitTx(true), - Params.empty() - ).join(); - result.getStatus().expectSuccess(); - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery")); - } - - @Test - public void executeQuerySpansAreRecordedInTableProxyTransaction() { - tableClient = QueryClient.newTableClient(transport).build(); - try (Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - TableTransaction tx1 = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - Result q1 = tx1.executeDataQuery("SELECT 1").join(); - q1.getStatus().expectSuccess(); - Status commitStatus = tx1.commit().join(); - commitStatus.expectSuccess(); - - TableTransaction tx2 = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - Result q2 = tx2.executeDataQuery("SELECT 1").join(); - q2.getStatus().expectSuccess(); - Status rollbackStatus = tx2.rollback().join(); - rollbackStatus.expectSuccess(); - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); - Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteQuery")); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit")); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); - } - - @Test - public void querySpanIsChildOfApplicationSpan() { - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - Span appParent = tracer.startSpan("app.parent", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); - } finally { - appParent.end(); - } - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "app.parent")); - } - - @Test - public void retrySpanIsParentOfRpcSpans() { - grpcInterceptor.failExecuteQuery(StatusCodesProtos.StatusIds.StatusCode.BAD_SESSION, 2); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> - session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus)).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.retry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - // First Try has no backoff_ms (no prior sleep); subsequent tries capture the prior sleep duration - Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms")); - } - - @Test - public void retryContextRetriesOnCreateSessionFailures() { - grpcInterceptor.failCreateSession(StatusCodesProtos.StatusIds.StatusCode.ABORTED, 2); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.createSession.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> - session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus)).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.createSession.retry")); - Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms")); - } - - @Test - public void retryContextRetriesOnCommitFailures() { - grpcInterceptor.failCommit(StatusCodesProtos.StatusIds.StatusCode.BAD_SESSION, 2); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.commit.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> { - QueryTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - return tx.createQuery("SELECT 1").execute() - .thenCompose(queryResult -> { - queryResult.getStatus().expectSuccess(); - return tx.commit().thenApply(Result::getStatus); - }); - } - ).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.commit.retry")); - Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.Try")); - Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms")); - } - - @Test - public void tableProxyRetrySpanIsParentOfRpcSpans() { - AtomicInteger attempt = new AtomicInteger(); - tableClient = QueryClient.newTableClient(transport).build(); - tech.ydb.table.SessionRetryContext retryContext = tech.ydb.table.SessionRetryContext.create(tableClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.table.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> { - if (attempt.getAndIncrement() == 0) { - return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); - } - TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); - queryResult.getStatus().expectSuccess(); - return tx.commit(); - }).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - Assert.assertEquals(2, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.table.retry")); - Assert.assertEquals(2, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.Try")); - } - - @Test - public void nonRetryableExceptionClosesRetrySpan() { - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - RuntimeException thrown; - try { - retryContext.supplyStatus(session -> { - throw new IllegalStateException("boom"); - }).join(); - throw new AssertionError("Exception expected"); - } catch (RuntimeException ex) { - thrown = ex; - } - - awaitTracing(); - Assert.assertNotNull(thrown.getCause()); - Assert.assertTrue(thrown.getCause() instanceof IllegalStateException); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.RunWithRetry", IllegalStateException.class.getName())); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.Try", IllegalStateException.class.getName())); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(0, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - } - - @Test - public void retryableUnexpectedResultExceptionRetriesAndSetsErrorType() { - AtomicInteger attempt = new AtomicInteger(); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Status status = retryContext.supplyStatus(session -> { - if (attempt.getAndIncrement() == 0) { - throw new UnexpectedResultException("retryable", Status.of(StatusCode.OVERLOADED)); - } - return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); - }).join(); - status.expectSuccess(); - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.RunWithRetry")); - Assert.assertEquals(2, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(2, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.Try", UnexpectedResultException.class.getName())); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - } - - @Test - public void contextCancelClosesRunWithRetrySpan() throws InterruptedException { - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - CountDownLatch firstPartReceived = new CountDownLatch(1); - CountDownLatch unblockReader = new CountDownLatch(1); - - CompletableFuture future = retryContext.supplyStatus(session -> - session.createQuery("SELECT 1; SELECT 2;", TxMode.NONE).execute(part -> { - firstPartReceived.countDown(); - try { - unblockReader.await(30, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - }).thenApply(Result::getStatus)); - - Assert.assertTrue(firstPartReceived.await(30, TimeUnit.SECONDS)); - future.cancel(false); - unblockReader.countDown(); - - awaitTracing(); - Assert.assertThrows(CancellationException.class, future::join); - Assert.assertTrue(future.isCancelled()); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.RunWithRetry", CancellationException.class.getName())); + Assert.assertTrue(tracer.hasClosedSpan("ydb.CreateSession")); + Assert.assertTrue(tracer.hasClosedSpan("ydb.ExecuteQuery")); + Assert.assertTrue(tracer.hasClosedSpan("ydb.Commit")); + Assert.assertTrue(tracer.hasClosedSpan("ydb.Rollback")); } private static final class RecordingTracer implements Tracer { - private final List spans = Collections.synchronizedList(new ArrayList<>()); - private final ThreadLocal currentSpan = new ThreadLocal<>(); - - private final Object spanLock = new Object(); - private int openSpans = 0; + private final List spans = Collections.synchronizedList(new ArrayList()); @Override public Span startSpan(String spanName, SpanKind spanKind) { - RecordingSpan span = new RecordingSpan(this, spanName, spanKind, currentSpan.get()); + RecordingSpan span = new RecordingSpan(spanName, spanKind); spans.add(span); return span; } - /** Called from {@link RecordingSpan} constructor: one span opened. */ - void recordSpanOpen() { - synchronized (spanLock) { - openSpans++; - } - } - - /** Called from {@link RecordingSpan#end()}: one span closed. */ - void recordSpanClose() { - synchronized (spanLock) { - openSpans--; - if (openSpans == 0) { - spanLock.notifyAll(); - } - } - } - void reset() { - synchronized (spanLock) { - spans.clear(); - openSpans = 0; - } + spans.clear(); } - /** Waits until every span that called {@link #recordSpanOpen} has ended ({@code openSpans == 0}). */ - void awaitAllSpansClosed(Duration timeout) throws InterruptedException { - long deadlineNanos = System.nanoTime() + timeout.toNanos(); - synchronized (spanLock) { - while (openSpans > 0) { - long remainingNanos = deadlineNanos - System.nanoTime(); - if (remainingNanos <= 0) { - Assert.fail("await timeout, openSpans=" + openSpans); - } - long waitMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); - if (waitMillis == 0) { - waitMillis = 1; - } - spanLock.wait(waitMillis); - } - } - } - - int countClosedSpan(String spanName) { - int count = 0; + boolean hasClosedSpan(String spanName) { synchronized (spans) { for (RecordingSpan span : spans) { - if (span.spanName().equals(spanName) && span.isClosed()) { - count++; + if (span.name.equals(spanName) && span.closed) { + return true; } } } - return count; - } - - int countClosedSpanWithParent(String spanName, String parentSpanName) { - int count = 0; - synchronized (spans) { - for (RecordingSpan span : spans) { - if (span.isClosed() - && span.spanName().equals(spanName) - && span.parentSpan() != null - && span.parentSpan().spanName().equals(parentSpanName)) { - count++; - } - } - } - return count; - } - - int countClosedSpanWithErrorType(String spanName, String errorType) { - int count = 0; - synchronized (spans) { - for (RecordingSpan span : spans) { - Throwable err = span.throwableError(); - if (span.isClosed() - && span.spanName().equals(spanName) - && err != null - && err.getClass().getName().equals(errorType)) { - count++; - } - } - } - return count; - } - - int countClosedSpanWithAttribute(String spanName, String key) { - int count = 0; - synchronized (spans) { - for (RecordingSpan span : spans) { - if (span.isClosed() - && span.spanName().equals(spanName) - && span.hasLongAttribute(key)) { - count++; - } - } - } - return count; - } - - Scope makeSpanCurrent(RecordingSpan span) { - RecordingSpan previous = currentSpan.get(); - currentSpan.set(span); - return () -> { - if (previous == null) { - currentSpan.remove(); - } else { - currentSpan.set(previous); - } - }; + return false; } } private static final class RecordingSpan implements Span { - private final RecordingTracer tracer; private final String name; private final SpanKind kind; - private final RecordingSpan parent; - private final ConcurrentMap longAttributes = new ConcurrentHashMap<>(); - private Throwable throwableError; private volatile boolean closed = false; - private final AtomicBoolean endedOnce = new AtomicBoolean(false); - RecordingSpan(RecordingTracer tracer, String name, SpanKind kind, RecordingSpan parent) { - this.tracer = tracer; + RecordingSpan(String name, SpanKind kind) { this.name = name; this.kind = kind; - this.parent = parent; - tracer.recordSpanOpen(); - } - - String spanName() { - return name; - } - - boolean isClosed() { - return closed; - } - - @Nullable - RecordingSpan parentSpan() { - return parent; - } - - @Nullable - Throwable throwableError() { - return throwableError; - } - - boolean hasLongAttribute(String key) { - return longAttributes.containsKey(key); } @Override @@ -592,37 +125,28 @@ public String getId() { } @Override - public boolean isValid() { - return true; + public void setAttribute(String key, String value) { + // not needed for this test } @Override - public Scope makeCurrent() { - return tracer.makeSpanCurrent(this); - } - - @Override - public Scope restoreContext() { - return parent != null ? parent.makeCurrent() : Span.NOOP.makeCurrent(); + public void setAttribute(String key, long value) { + // not needed for this test } @Override - public void setStatus(@Nullable Status status, @Nullable Throwable error) { - this.throwableError = error; + public void setError(Status status) { + // not needed for this test } @Override - public void setAttribute(String key, long value) { - longAttributes.put(key, value); + public void setError(Throwable error) { + // not needed for this test } @Override public void end() { - if (!endedOnce.compareAndSet(false, true)) { - return; - } this.closed = true; - tracer.recordSpanClose(); } } } diff --git a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java index 596466acb..199d3ae73 100644 --- a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java +++ b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java @@ -6,7 +6,6 @@ import tech.ydb.core.Result; import tech.ydb.core.StatusCode; -import tech.ydb.core.tracing.Tracer; import tech.ydb.table.Session; import tech.ydb.table.SessionSupplier; import tech.ydb.table.rpc.TableRpc; @@ -39,11 +38,6 @@ public ScheduledExecutorService getScheduler() { return tableRpc.getScheduler(); } - @Override - public Tracer getTracer() { - return tableRpc.getTracer(); - } - public static Builder newClient(TableRpc rpc) { return new Builder(rpc); } From caacba55dc4081473e44a75f0a51dd877f7909da Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 3 Mar 2026 15:50:44 +0300 Subject: [PATCH 02/16] fix linter --- .../src/main/java/tech/ydb/query/impl/TableClientImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 9689ae9d3..1a7a8fcb3 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -116,7 +116,8 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); - QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), null) { + QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), + null) { @Override void handleTxMeta(String txID) { txRef.set(txID); @@ -130,7 +131,8 @@ public void onIssues(Issue[] issueArr) { } @Override - public void onNextPart(QueryResultPart part) { } // not used + public void onNextPart(QueryResultPart part) { + } // not used @Override public void onNextRawPart(long index, ValueProtos.ResultSet rs) { From 5043844f911c2b7ba2b29aca107dab93478305ea Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 3 Mar 2026 16:03:25 +0300 Subject: [PATCH 03/16] fix linter --- .../main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java index 5e9b557f2..27665b3df 100644 --- a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -4,7 +4,6 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; - import io.opentelemetry.api.trace.StatusCode; import tech.ydb.core.Status; From 6a43d8ae7cd516544bee83f1df43dc6fbcd19b71 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 4 Mar 2026 12:48:53 +0300 Subject: [PATCH 04/16] fix issues --- .../main/java/tech/ydb/core/tracing/Span.java | 8 +- .../java/tech/ydb/query/impl/SessionImpl.java | 76 +++++-------------- .../java/tech/ydb/query/impl/SessionPool.java | 53 +++---------- 3 files changed, 39 insertions(+), 98 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index 7ae35b322..8b202c8c1 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -9,10 +9,14 @@ public interface Span { String getId(); - /** Sets a string attribute on the span (ignored by Noop implementation). */ + /** + * Sets a string attribute on the span (ignored by Noop implementation). + */ void setAttribute(String key, String value); - /** Sets a long attribute on the span (ignored by Noop implementation). */ + /** + * Sets a long attribute on the span (ignored by Noop implementation). + */ void setAttribute(String key, long value); /** diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 05251101e..d9b595a11 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -27,6 +27,7 @@ import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanFinalizer; import tech.ydb.core.utils.URITools; import tech.ydb.core.utils.UpdatableOptional; import tech.ydb.proto.ValueProtos; @@ -181,26 +182,19 @@ private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) { private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings, Span span) { String traceId = settings.getTraceId() == null ? UUID.randomUUID().toString() : settings.getTraceId(); - GrpcRequestSettings.Builder builder = GrpcRequestSettings.newBuilder() + return GrpcRequestSettings.newBuilder() .withDeadline(settings.getRequestTimeout()) .withPreferredNodeID((int) nodeID) - .withTraceId(traceId); - if (span != null) { - builder.withSpan(span); - } - return builder; + .withTraceId(traceId) + .withSpan(span); } private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { switch (mode) { - case EXECUTE: - return YdbQuery.ExecMode.EXEC_MODE_EXECUTE; - case EXPLAIN: - return YdbQuery.ExecMode.EXEC_MODE_EXPLAIN; - case PARSE: - return YdbQuery.ExecMode.EXEC_MODE_PARSE; - case VALIDATE: - return YdbQuery.ExecMode.EXEC_MODE_VALIDATE; + case EXECUTE: return YdbQuery.ExecMode.EXEC_MODE_EXECUTE; + case EXPLAIN: return YdbQuery.ExecMode.EXEC_MODE_EXPLAIN; + case PARSE: return YdbQuery.ExecMode.EXEC_MODE_PARSE; + case VALIDATE: return YdbQuery.ExecMode.EXEC_MODE_VALIDATE; case UNSPECIFIED: default: @@ -210,14 +204,10 @@ private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { switch (mode) { - case NONE: - return YdbQuery.StatsMode.STATS_MODE_NONE; - case BASIC: - return YdbQuery.StatsMode.STATS_MODE_BASIC; - case FULL: - return YdbQuery.StatsMode.STATS_MODE_FULL; - case PROFILE: - return YdbQuery.StatsMode.STATS_MODE_PROFILE; + case NONE: return YdbQuery.StatsMode.STATS_MODE_NONE; + case BASIC: return YdbQuery.StatsMode.STATS_MODE_BASIC; + case FULL: return YdbQuery.StatsMode.STATS_MODE_FULL; + case PROFILE: return YdbQuery.StatsMode.STATS_MODE_PROFILE; case UNSPECIFIED: default: @@ -337,7 +327,7 @@ static CompletableFuture> createSession( return rpc.createSession(request, grpcSettingsBuilder.build()).thenApply(result -> { pessimizationHook.set(result.getStatus().getCode() == StatusCode.OVERLOADED); if (!result.isSuccess()) { - finishSpanByStatus(createSpan, result.getStatus()); + SpanFinalizer.finishByStatus(createSpan, result.getStatus()); } return CREATE_SESSION.apply(result); }); @@ -364,11 +354,11 @@ void handleCompletion(Status status, Throwable th) { return; } if (th != null) { - finishSpanByError(operationSpan, th); + SpanFinalizer.finishByError(operationSpan, th); return; } - finishSpanByStatus(operationSpan, status); + SpanFinalizer.finishByStatus(operationSpan, status); } @Override @@ -504,7 +494,7 @@ public CompletableFuture> commit(CommitTransactionSettings set if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, issue)); - finishSpanByStatus(commitSpan, res.getStatus()); + SpanFinalizer.finishByStatus(commitSpan, res.getStatus()); return CompletableFuture.completedFuture(res); } @@ -527,10 +517,10 @@ public CompletableFuture> commit(CommitTransactionSettings set if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Transaction commit failed with exception", th)); - finishSpanByError(commitSpan, th); + SpanFinalizer.finishByError(commitSpan, th); return; } - finishSpanByStatus(commitSpan, status.getStatus()); + SpanFinalizer.finishByStatus(commitSpan, status.getStatus()); })); } @@ -543,7 +533,7 @@ public CompletableFuture rollback(RollbackTransactionSettings settings) if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Status status = Status.of(StatusCode.SUCCESS, issue); - finishSpanByStatus(rollbackSpan, status); + SpanFinalizer.finishByStatus(rollbackSpan, status); return CompletableFuture.completedFuture(status); } @@ -565,35 +555,11 @@ public CompletableFuture rollback(RollbackTransactionSettings settings) .of(StatusCode.ABORTED) .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR))); if (th != null) { - finishSpanByError(rollbackSpan, th); + SpanFinalizer.finishByError(rollbackSpan, th); return; } - finishSpanByStatus(rollbackSpan, status); + SpanFinalizer.finishByStatus(rollbackSpan, status); }); } } - - private static void finishSpanByStatus(Span span, Status status) { - if (span == null) { - return; - } - - if (status != null && !status.isSuccess()) { - span.setError(status); - } - - span.end(); - } - - private static void finishSpanByError(Span span, Throwable error) { - if (span == null) { - return; - } - - if (error != null) { - span.setError(error); - } - - span.end(); - } } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index e522f47a9..8351d993e 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -23,6 +23,7 @@ import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanFinalizer; import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.query.YdbQuery; import tech.ydb.query.QuerySession; @@ -155,22 +156,16 @@ private boolean tryComplete(CompletableFuture> future, Pool private class PooledQuerySession extends SessionImpl { private final GrpcReadStream attachStream; - private final Span createSpan; private volatile Instant lastActive; private volatile boolean isStarted = false; private volatile boolean isBroken = false; private volatile boolean isStopped = false; - PooledQuerySession( - QueryServiceRpc rpc, - YdbQuery.CreateSessionResponse response, - Span createSpan - ) { + PooledQuerySession(QueryServiceRpc rpc, YdbQuery.CreateSessionResponse response) { super(rpc, response); this.lastActive = clock.instant(); this.attachStream = attach(ATTACH_SETTINGS); - this.createSpan = createSpan; stats.created.increment(); } @@ -179,12 +174,12 @@ public void updateSessionState(Status status) { this.lastActive = clock.instant(); boolean isStatusBroken = status.getCode() == StatusCode.BAD_SESSION || - status.getCode() == StatusCode.SESSION_BUSY || - status.getCode() == StatusCode.INTERNAL_ERROR || - status.getCode() == StatusCode.CLIENT_DEADLINE_EXCEEDED || - status.getCode() == StatusCode.CLIENT_DEADLINE_EXPIRED || - status.getCode() == StatusCode.CLIENT_CANCELLED || - status.getCode() == StatusCode.TRANSPORT_UNAVAILABLE; + status.getCode() == StatusCode.SESSION_BUSY || + status.getCode() == StatusCode.INTERNAL_ERROR || + status.getCode() == StatusCode.CLIENT_DEADLINE_EXCEEDED || + status.getCode() == StatusCode.CLIENT_DEADLINE_EXPIRED || + status.getCode() == StatusCode.CLIENT_CANCELLED || + status.getCode() == StatusCode.TRANSPORT_UNAVAILABLE; if (isStatusBroken) { logger.warn("QuerySession[{}] broken with status {}", getId(), status); } @@ -277,7 +272,6 @@ private class Handler implements WaitingQueue.Handler { } @Override - @SuppressWarnings("resource") public CompletableFuture create() { // Execute createSession call outside current context to avoid cancellation and deadline propogation Context ctx = Context.ROOT.fork(); @@ -289,20 +283,20 @@ public CompletableFuture create() { .createSession(rpc, CREATE_SETTINGS, true, createSpan) .thenCompose(r -> { if (!r.isSuccess()) { - finishSpanByStatus(createSpan, r.getStatus()); + SpanFinalizer.finishByStatus(createSpan, r.getStatus()); stats.failed.increment(); throw new UnexpectedResultException("create session problem", r.getStatus()); } - PooledQuerySession session = new PooledQuerySession(rpc, r.getValue(), createSpan); + PooledQuerySession session = new PooledQuerySession(rpc, r.getValue()); return session.start(); }) .whenComplete((result, th) -> { if (th != null) { - finishSpanByError(createSpan, FutureTools.unwrapCompletionException(th)); + SpanFinalizer.finishByError(createSpan, FutureTools.unwrapCompletionException(th)); return; } - finishSpanByStatus(createSpan, result.getStatus()); + SpanFinalizer.finishByStatus(createSpan, result.getStatus()); }) .thenApply(Result::getValue); } finally { @@ -475,27 +469,4 @@ public void run() { } } - private static void finishSpanByStatus(Span span, Status status) { - if (span == null) { - return; - } - - if (!status.isSuccess()) { - span.setError(status); - } - - span.end(); - } - - private static void finishSpanByError(Span span, Throwable error) { - if (span == null) { - return; - } - - if (error != null) { - span.setError(error); - } - - span.end(); - } } From 9bde31d065db617f630530525e08ad339bd92263 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 4 Mar 2026 15:22:38 +0300 Subject: [PATCH 05/16] chore fixes --- .../java/tech/ydb/query/impl/SessionImpl.java | 119 +++++++++--------- .../java/tech/ydb/query/impl/SessionPool.java | 1 - 2 files changed, 60 insertions(+), 60 deletions(-) diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index d9b595a11..2850ac701 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -191,10 +191,14 @@ private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings, Sp private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { switch (mode) { - case EXECUTE: return YdbQuery.ExecMode.EXEC_MODE_EXECUTE; - case EXPLAIN: return YdbQuery.ExecMode.EXEC_MODE_EXPLAIN; - case PARSE: return YdbQuery.ExecMode.EXEC_MODE_PARSE; - case VALIDATE: return YdbQuery.ExecMode.EXEC_MODE_VALIDATE; + case EXECUTE: + return YdbQuery.ExecMode.EXEC_MODE_EXECUTE; + case EXPLAIN: + return YdbQuery.ExecMode.EXEC_MODE_EXPLAIN; + case PARSE: + return YdbQuery.ExecMode.EXEC_MODE_PARSE; + case VALIDATE: + return YdbQuery.ExecMode.EXEC_MODE_VALIDATE; case UNSPECIFIED: default: @@ -204,10 +208,14 @@ private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { switch (mode) { - case NONE: return YdbQuery.StatsMode.STATS_MODE_NONE; - case BASIC: return YdbQuery.StatsMode.STATS_MODE_BASIC; - case FULL: return YdbQuery.StatsMode.STATS_MODE_FULL; - case PROFILE: return YdbQuery.StatsMode.STATS_MODE_PROFILE; + case NONE: + return YdbQuery.StatsMode.STATS_MODE_NONE; + case BASIC: + return YdbQuery.StatsMode.STATS_MODE_BASIC; + case FULL: + return YdbQuery.StatsMode.STATS_MODE_FULL; + case PROFILE: + return YdbQuery.StatsMode.STATS_MODE_PROFILE; case UNSPECIFIED: default: @@ -350,15 +358,6 @@ abstract class StreamImpl implements QueryStream { abstract void handleTxMeta(String txId); void handleCompletion(Status status, Throwable th) { - if (operationSpan == null) { - return; - } - if (th != null) { - SpanFinalizer.finishByError(operationSpan, th); - return; - } - - SpanFinalizer.finishByStatus(operationSpan, status); } @Override @@ -366,51 +365,54 @@ public CompletableFuture> execute(PartsHandler handler) { final UpdatableOptional operationStatus = new UpdatableOptional<>(); final UpdatableOptional stats = new UpdatableOptional<>(); return grpcStream.start(msg -> { - if (isTraceEnabled) { - logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); - } - Issue[] issues = Issue.fromPb(msg.getIssuesList()); - Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); + if (isTraceEnabled) { + logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); + } + Issue[] issues = Issue.fromPb(msg.getIssuesList()); + Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); - updateSessionState(status); + updateSessionState(status); - if (!status.isSuccess()) { - handleTxMeta(null); - operationStatus.update(status); - return; - } + if (!status.isSuccess()) { + handleTxMeta(null); + operationStatus.update(status); + return; + } - if (msg.hasTxMeta()) { - handleTxMeta(msg.getTxMeta().getId()); - } - if (issues.length > 0) { - if (handler != null) { - handler.onIssues(issues); - } else { - logger.trace("{} lost issues message", SessionImpl.this); - } - } - if (msg.hasExecStats()) { - stats.update(new QueryStats(msg.getExecStats())); - } + if (msg.hasTxMeta()) { + handleTxMeta(msg.getTxMeta().getId()); + } + if (issues.length > 0) { + if (handler != null) { + handler.onIssues(issues); + } else { + logger.trace("{} lost issues message", SessionImpl.this); + } + } + if (msg.hasExecStats()) { + stats.update(new QueryStats(msg.getExecStats())); + } - if (msg.hasResultSet()) { - long index = msg.getResultSetIndex(); - if (handler != null) { - handler.onNextRawPart(index, msg.getResultSet()); - } else { - logger.trace("{} lost result set part with index {}", SessionImpl.this, index); - } - } - }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { - updateSessionState(streamStatus); - Status status = operationStatus.orElse(streamStatus); - if (status.isSuccess()) { - return Result.success(new QueryInfo(stats.get()), streamStatus); - } else { - return Result.fail(status); - } - }); + if (msg.hasResultSet()) { + long index = msg.getResultSetIndex(); + if (handler != null) { + handler.onNextRawPart(index, msg.getResultSet()); + } else { + logger.trace("{} lost result set part with index {}", SessionImpl.this, index); + } + } + }) + .whenComplete(this::handleCompletion) + .whenComplete(SpanFinalizer.whenComplete(operationSpan)) + .thenApply(streamStatus -> { + updateSessionState(streamStatus); + Status status = operationStatus.orElse(streamStatus); + if (status.isSuccess()) { + return Result.success(new QueryInfo(stats.get()), streamStatus); + } else { + return Result.fail(status); + } + }); } @Override @@ -459,7 +461,6 @@ void handleTxMeta(String txID) { @Override void handleCompletion(Status status, Throwable th) { - super.handleCompletion(status, th); if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Query on transaction failed with exception ", th)); diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index 8351d993e..b04712de1 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -468,5 +468,4 @@ public void run() { } } } - } From 5a9fdaf70234c059a477bf3d8830c7e47ebba57c Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 4 Mar 2026 15:43:17 +0300 Subject: [PATCH 06/16] chore fixes --- query/src/main/java/tech/ydb/query/impl/SessionImpl.java | 3 ++- query/src/main/java/tech/ydb/query/impl/SessionPool.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 2850ac701..92d0cd138 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -366,7 +366,8 @@ public CompletableFuture> execute(PartsHandler handler) { final UpdatableOptional stats = new UpdatableOptional<>(); return grpcStream.start(msg -> { if (isTraceEnabled) { - logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); + logger.trace("{} got stream message {}", SessionImpl.this, + TextFormat.shortDebugString(msg)); } Issue[] issues = Issue.fromPb(msg.getIssuesList()); Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index b04712de1..3f1ab66df 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -292,7 +292,7 @@ public CompletableFuture create() { }) .whenComplete((result, th) -> { if (th != null) { - SpanFinalizer.finishByError(createSpan, FutureTools.unwrapCompletionException(th)); + SpanFinalizer.finishByError(createSpan, th); return; } From 2af6403583dde6bbcd11d7f708c8837102f55f58 Mon Sep 17 00:00:00 2001 From: Michael Kim Date: Sat, 25 Apr 2026 04:11:45 +0300 Subject: [PATCH 07/16] + feat: metrics --- .../tech/ydb/core/grpc/GrpcTransport.java | 6 + .../ydb/core/grpc/GrpcTransportBuilder.java | 12 ++ .../tech/ydb/core/impl/YdbTransportImpl.java | 8 + .../java/tech/ydb/core/metrics/Meter.java | 9 + .../java/tech/ydb/core/metrics/NoopMeter.java | 30 +++ .../ydb/core/metrics/SessionPoolObserver.java | 7 + .../ydb/opentelemetry/OpenTelemetryMeter.java | 99 +++++++++ .../OpenTelemetryMetricsIntegrationTest.java | 192 ++++++++++++++++++ .../tech/ydb/query/impl/QueryServiceRpc.java | 7 + .../java/tech/ydb/query/impl/SessionImpl.java | 32 ++- .../java/tech/ydb/query/impl/SessionPool.java | 26 +++ 11 files changed, 423 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/tech/ydb/core/metrics/Meter.java create mode 100644 core/src/main/java/tech/ydb/core/metrics/NoopMeter.java create mode 100644 core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java create mode 100644 opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java create mode 100644 opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java index 96816601f..daccd0310 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java @@ -12,6 +12,8 @@ import io.grpc.MethodDescriptor; import tech.ydb.core.Result; +import tech.ydb.core.metrics.NoopMeter; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.URITools; @@ -46,6 +48,10 @@ default Tracer getTracer() { return NoopTracer.getInstance(); } + default Meter getMeter() { + return NoopMeter.INSTANCE; + } + @Override void close(); diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java index 534cf08be..9d13113b7 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java @@ -26,6 +26,8 @@ import tech.ydb.core.impl.auth.GrpcAuthRpc; import tech.ydb.core.impl.pool.ChannelFactoryLoader; import tech.ydb.core.impl.pool.ManagedChannelFactory; +import tech.ydb.core.metrics.NoopMeter; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.Version; @@ -87,6 +89,7 @@ public enum InitMode { private GrpcCompression compression = GrpcCompression.NO_COMPRESSION; private InitMode initMode = InitMode.SYNC; private Tracer tracer = NoopTracer.getInstance(); + private Meter meter = NoopMeter.getInstance(); /** * can cause leaks https://github.com/grpc/grpc-java/issues/9340 @@ -195,6 +198,10 @@ public Tracer getTracer() { return tracer; } + public Meter getMeter() { + return meter; + } + public ManagedChannelFactory getManagedChannelFactory() { if (channelFactoryBuilder == null) { channelFactoryBuilder = ChannelFactoryLoader.load(); @@ -423,6 +430,11 @@ public GrpcTransportBuilder withTracer(Tracer tracer) { return this; } + public GrpcTransportBuilder withMeter(Meter meter) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + return this; + } + /** * use {@link GrpcTransportBuilder#withGrpcRetry(boolean) } instead * @return this diff --git a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java index 52d4ba018..768634801 100644 --- a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java +++ b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java @@ -21,6 +21,7 @@ import tech.ydb.core.impl.pool.GrpcChannel; import tech.ydb.core.impl.pool.GrpcChannelPool; import tech.ydb.core.impl.pool.ManagedChannelFactory; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.Tracer; /** @@ -37,6 +38,7 @@ public class YdbTransportImpl extends BaseGrpcTransport { private final GrpcChannelPool channelPool; private final YdbDiscovery discovery; private final Tracer tracer; + private final Meter meter; public YdbTransportImpl(GrpcTransportBuilder builder) { super(builder); @@ -45,6 +47,7 @@ public YdbTransportImpl(GrpcTransportBuilder builder) { this.database = Strings.nullToEmpty(builder.getDatabase()); this.tracer = builder.getTracer(); + this.meter = builder.getMeter(); logger.info("Create YDB transport with endpoint {} and {}", serverEndpoint, balancingSettings); @@ -124,6 +127,11 @@ public Tracer getTracer() { return tracer; } + @Override + public Meter getMeter() { + return meter; + } + @Override public AuthCallOptions getAuthCallOptions() { return callOptions; diff --git a/core/src/main/java/tech/ydb/core/metrics/Meter.java b/core/src/main/java/tech/ydb/core/metrics/Meter.java new file mode 100644 index 000000000..c94913257 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/Meter.java @@ -0,0 +1,9 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public interface Meter { + void recordOperation(String name, long durationNanos, Status status); + void registerSessionPool(String poolName, SessionPoolObserver observer); + void recordSessionCreateTime(String poolName, long durationNanos); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java new file mode 100644 index 000000000..cd126efd5 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java @@ -0,0 +1,30 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public final class NoopMeter implements Meter { + public static final NoopMeter INSTANCE = new NoopMeter(); + + private NoopMeter() { + // No operations. + } + + public static NoopMeter getInstance() { + return INSTANCE; + } + + @Override + public void recordOperation(String name, long durationNanos, Status status) { + + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java new file mode 100644 index 000000000..4b872c6f5 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java @@ -0,0 +1,7 @@ +package tech.ydb.core.metrics; + +public interface SessionPoolObserver { + int getIdleCount(); + int getUsedCount(); + int getPendingCount(); +} diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java new file mode 100644 index 000000000..63867688b --- /dev/null +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java @@ -0,0 +1,99 @@ +package tech.ydb.opentelemetry; + +import java.util.Objects; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; + +import tech.ydb.core.Status; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.SessionPoolObserver; + +public final class OpenTelemetryMeter implements Meter { + private final io.opentelemetry.api.metrics.Meter meter; + + private final DoubleHistogram operationDuration; // db.client.operation.duration + private final LongCounter operationFailed; // ydb.client.operation.failed + private final DoubleHistogram sessionCreateTime; // ydb.query.session.create_time + + private final Attributes baseAttributes; // db.system.name, db.namespace, server.address, server.port + + private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter, + String database, String host, int port) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + this.operationDuration = meter.histogramBuilder("db.client.operation.duration") + .setUnit("s") + .build(); + this.operationFailed = meter.counterBuilder("ydb.client.operation.failed") + .setUnit("{command}") + .build(); + this.sessionCreateTime = meter.histogramBuilder("ydb.query.session.create_time") + .setUnit("s") + .build(); + + this.baseAttributes = Attributes.of( + AttributeKey.stringKey("db.system.name"), "ydb", + AttributeKey.stringKey("db.namespace"), database, + AttributeKey.stringKey("server.address"), host, + AttributeKey.longKey("server.port"), (long) port + ); + } + + public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry, + String database, String host, int port) { + io.opentelemetry.api.metrics.Meter meter = openTelemetry + .getMeter("tech.ydb.sdk"); + + return new OpenTelemetryMeter(meter, database, host, port); + } + + public static OpenTelemetryMeter createGlobal(String database, String host, int port) { + return fromOpenTelemetry(GlobalOpenTelemetry.get(), database, host, port); + } + + @Override + public void recordOperation(String name, long durationNanos, Status status) { + Attributes attrs = baseAttributes.toBuilder() + .put("ydb.operation.name", name) + .build(); + + operationDuration.record(durationNanos / 1_000_000_000.0, attrs); + + if (status != null && !status.isSuccess()) { + Attributes errAttrs = attrs.toBuilder() + .put("db.response.status_code", status.getCode().toString()) + .build(); + operationFailed.add(1, errAttrs); + } + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + meter.upDownCounterBuilder("ydb.query.session.count") + .setUnit("{connection}") + .buildWithCallback(measurement -> { + Attributes idle = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName, + AttributeKey.stringKey("ydb.query.session.state"), "idle" + ); + Attributes used = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName, + AttributeKey.stringKey("ydb.query.session.state"), "used" + ); + measurement.record(observer.getIdleCount(), idle); + measurement.record(observer.getUsedCount(), used); + }); + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + Attributes attrs = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName + ); + sessionCreateTime.record(durationNanos / 1_000_000_000.0, attrs); + } +} diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java new file mode 100644 index 000000000..dc4d5c736 --- /dev/null +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java @@ -0,0 +1,192 @@ +package tech.ydb.opentelemetry; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.auth.TokenAuthProvider; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryTransaction; +import tech.ydb.test.junit4.YdbHelperRule; + +public class OpenTelemetryMetricsIntegrationTest { + @ClassRule + public static final YdbHelperRule YDB = new YdbHelperRule(); + + private static final AttributeKey DB_SYSTEM_NAME = AttributeKey.stringKey("db.system.name"); + private static final AttributeKey DB_NAMESPACE = AttributeKey.stringKey("db.namespace"); + private static final AttributeKey SERVER_ADDRESS = AttributeKey.stringKey("server.address"); + private static final AttributeKey SERVER_PORT = AttributeKey.longKey("server.port"); + private static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("ydb.operation.name"); + + private static InMemoryMetricReader metricReader; + private static SdkMeterProvider meterProvider; + private static GrpcTransport transport; + + private QueryClient queryClient; + + @BeforeClass + public static void initTransport() { + metricReader = InMemoryMetricReader.create(); + meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + + String host = extractHost(YDB.endpoint()); + int port = extractPort(YDB.endpoint()); + + transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) + .withAuthProvider(new TokenAuthProvider(YDB.authToken())) + .withMeter(OpenTelemetryMeter.fromOpenTelemetry(openTelemetry, YDB.database(), host, port)) + .build(); + } + + @AfterClass + public static void closeTransport() throws IOException { + transport.close(); + meterProvider.close(); + metricReader.close(); + } + + @Before + public void initClient() { + queryClient = QueryClient.newClient(transport).build(); + } + + @After + public void closeClient() { + queryClient.close(); + } + + @Test + public void executeQueryRecordsOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData metric = findMetric("db.client.operation.duration"); + Assert.assertNotNull("db.client.operation.duration metric not found", metric); + + HistogramPointData point = findHistogramPoint(metric, "ydb.ExecuteQuery"); + Assert.assertNotNull("No histogram point for ydb.ExecuteQuery", point); + Assert.assertTrue("Duration must be > 0", point.getSum() > 0); + Assert.assertEquals("ydb", point.getAttributes().get(DB_SYSTEM_NAME)); + Assert.assertEquals(YDB.database(), point.getAttributes().get(DB_NAMESPACE)); + Assert.assertNotNull(point.getAttributes().get(SERVER_ADDRESS)); + Assert.assertNotNull(point.getAttributes().get(SERVER_PORT)); + } + + @Test + public void commitAndRollbackRecordOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + QueryTransaction txCommit = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txCommit.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txCommit.commit().join().getStatus().expectSuccess(); + + QueryTransaction txRollback = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txRollback.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txRollback.rollback().join().expectSuccess(); + } + + MetricData metric = findMetric("db.client.operation.duration"); + Assert.assertNotNull(metric); + + Assert.assertNotNull("No histogram point for ydb.Commit", + findHistogramPoint(metric, "ydb.Commit")); + Assert.assertNotNull("No histogram point for ydb.Rollback", + findHistogramPoint(metric, "ydb.Rollback")); + } + + @Test + public void failedOperationRecordsFailedCounter() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT * FROM __nonexistent_table__", TxMode.NONE) + .execute().join(); + } + + MetricData metric = findMetric("ydb.client.operation.failed"); + Assert.assertNotNull("ydb.client.operation.failed metric not found", metric); + + Collection points = metric.getLongSumData().getPoints(); + Assert.assertFalse("Failed counter must have at least one point", points.isEmpty()); + long total = points.stream().mapToLong(LongPointData::getValue).sum(); + Assert.assertTrue("Failed counter must be > 0", total > 0); + } + + @Test + public void sessionPoolMetricsAreReported() { + // создаём сессию чтобы пул оживился + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData metric = findMetric("ydb.query.session.count"); + Assert.assertNotNull("ydb.query.session.count metric not found", metric); + } + + // --- helpers --- + + private MetricData findMetric(String name) { + Collection metrics = metricReader.collectAllMetrics(); + for (MetricData m : metrics) { + if (name.equals(m.getName())) { + return m; + } + } + return null; + } + + private HistogramPointData findHistogramPoint(MetricData metric, String operationName) { + for (HistogramPointData point : metric.getHistogramData().getPoints()) { + String op = point.getAttributes().get(OPERATION_NAME); + if (operationName.equals(op)) { + return point; + } + } + return null; + } + + private static String extractHost(String endpoint) { + // endpoint вида grpc://host:port или host:port + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + return colon >= 0 ? stripped.substring(0, colon) : stripped; + } + + private static int extractPort(String endpoint) { + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + if (colon >= 0) { + try { + return Integer.parseInt(stripped.substring(colon + 1)); + } catch (NumberFormatException ignored) { + } + } + return 2135; + } +} diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index 66c2e0a6c..6038c447a 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -6,6 +6,7 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; @@ -51,16 +52,22 @@ class QueryServiceRpc { private final GrpcTransport transport; private final Tracer trace; + private final Meter meter; QueryServiceRpc(GrpcTransport transport) { this.transport = transport; this.trace = transport.getTracer(); + this.meter = transport.getMeter(); } Span startSpan(String spanName) { return trace.startSpan(spanName, SpanKind.CLIENT); } + Meter getMeter() { + return meter; + } + public CompletableFuture> createSession( YdbQuery.CreateSessionRequest request, GrpcRequestSettings settings) { return transport diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 92d0cd138..1c9b5dbc1 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -294,7 +294,9 @@ GrpcReadStream createGrpcStream( public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); Span span = rpc.startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { if (txID != null && !txID.isEmpty()) { @@ -345,14 +347,18 @@ abstract class StreamImpl implements QueryStream { private final GrpcReadStream grpcStream; @Nullable private final Span operationSpan; + private final long startNanos; + private final String operationName; StreamImpl( GrpcReadStream grpcStream, @Nullable - Span operationSpan + Span operationSpan, long startNanos, String operationName ) { this.grpcStream = grpcStream; this.operationSpan = operationSpan; + this.startNanos = startNanos; + this.operationName = operationName; } abstract void handleTxMeta(String txId); @@ -408,6 +414,8 @@ public CompletableFuture> execute(PartsHandler handler) { .thenApply(streamStatus -> { updateSessionState(streamStatus); Status status = operationStatus.orElse(streamStatus); + long elapsed = System.nanoTime() - startNanos; + rpc.getMeter().recordOperation(operationName, elapsed, status); if (status.isSuccess()) { return Result.success(new QueryInfo(stats.get()), streamStatus); } else { @@ -451,7 +459,9 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E : TxControl.txModeCtrl(txMode, commitAtEnd); Span span = rpc.startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { String newId = txID == null || txID.isEmpty() ? null : txID; @@ -474,7 +484,7 @@ void handleCompletion(Status status, Throwable th) { currentStatusFuture.complete(Status .of(StatusCode.ABORTED) .withIssues(Issue.of("Query on transaction failed with status " - + status, Issue.Severity.ERROR))); + + status, Issue.Severity.ERROR))); } } @@ -491,6 +501,8 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { final Span commitSpan = rpc.startSpan("ydb.Commit"); + final long startNanos = System.nanoTime(); + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); final String transactionId = txId.get(); if (transactionId == null) { @@ -523,12 +535,18 @@ public CompletableFuture> commit(CommitTransactionSettings set return; } SpanFinalizer.finishByStatus(commitSpan, status.getStatus()); - })); + })).whenComplete((status, th) -> { // добавить + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.Commit", elapsed, finalStatus); + }); } @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { final Span rollbackSpan = rpc.startSpan("ydb.Rollback"); + final long startNanos = System.nanoTime(); + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); final String transactionId = txId.get(); @@ -561,6 +579,10 @@ public CompletableFuture rollback(RollbackTransactionSettings settings) return; } SpanFinalizer.finishByStatus(rollbackSpan, status); + }).whenComplete((status, th) -> { + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.Rollback", elapsed, finalStatus); }); } } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index 3f1ab66df..d03cbbc87 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -22,6 +22,7 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.metrics.SessionPoolObserver; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanFinalizer; import tech.ydb.core.utils.FutureTools; @@ -79,6 +80,23 @@ class SessionPool implements AutoCloseable { minSize, maxSize, cleaner.periodMillis); + + rpc.getMeter().registerSessionPool("default", new SessionPoolObserver() { + @Override + public int getIdleCount() { + return queue.getIdleCount(); + } + + @Override + public int getUsedCount() { + return queue.getUsedCount(); + } + + @Override + public int getPendingCount() { + return queue.getPendingCount(); + } + }); } public void updateMaxSize(int maxSize) { @@ -278,6 +296,8 @@ public CompletableFuture create() { Context previous = ctx.attach(); try { Span createSpan = rpc.startSpan("ydb.CreateSession"); + long startNanos = System.nanoTime(); + stats.requested.increment(); return SessionImpl .createSession(rpc, CREATE_SETTINGS, true, createSpan) @@ -298,6 +318,12 @@ public CompletableFuture create() { SpanFinalizer.finishByStatus(createSpan, result.getStatus()); }) + .whenComplete((status, th) -> { + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.CreateSession", elapsed, finalStatus); + rpc.getMeter().recordSessionCreateTime("default", elapsed); + }) .thenApply(Result::getValue); } finally { ctx.detach(previous); From 549f74411c2cfb8fd0dcf2d0a116a2cff629d880 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Mon, 2 Mar 2026 17:33:53 +0300 Subject: [PATCH 08/16] feat: added spans # Conflicts: # opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java # query/src/main/java/tech/ydb/query/impl/SessionImpl.java # query/src/main/java/tech/ydb/query/impl/SessionPool.java --- query/src/main/java/tech/ydb/query/impl/TableClientImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 1a7a8fcb3..dd7b6d0e6 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -116,8 +116,7 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); - QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), - null) { + QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), null) { @Override void handleTxMeta(String txID) { txRef.set(txID); From 74b3abdfbb2d7db11eebbad865b9a7efcd6698b0 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 3 Mar 2026 15:50:44 +0300 Subject: [PATCH 09/16] fix linter --- query/src/main/java/tech/ydb/query/impl/TableClientImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index dd7b6d0e6..1a7a8fcb3 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -116,7 +116,8 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); - QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), null) { + QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), + null) { @Override void handleTxMeta(String txID) { txRef.set(txID); From b7716c42dbe0199548a7b4ca8e1832d63f2aae90 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 4 Mar 2026 15:04:10 +0300 Subject: [PATCH 10/16] feat: support TableClientImpl with spans --- .../tech/ydb/core/tracing/SpanFinalizer.java | 35 ++++++ .../java/tech/ydb/query/impl/SessionImpl.java | 20 ++++ .../tech/ydb/query/impl/TableClientImpl.java | 111 +++++++++++++++++- .../tech/ydb/query/impl/QueryTracingTest.java | 101 ++++++++++++++-- 4 files changed, 253 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java diff --git a/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java new file mode 100644 index 000000000..ee0e2ea97 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java @@ -0,0 +1,35 @@ +package tech.ydb.core.tracing; + +import tech.ydb.core.Status; + +/** + * Shared helpers to finish spans for status/exception outcomes. + */ +public final class SpanFinalizer { + private SpanFinalizer() { + } + + public static void finishByStatus(Span span, Status status) { + if (span == null) { + return; + } + + if (!status.isSuccess()) { + span.setError(status); + } + + span.end(); + } + + public static void finishByError(Span span, Throwable error) { + if (span == null) { + return; + } + + if (error != null) { + span.setError(error); + } + + span.end(); + } +} diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 1c9b5dbc1..141dfc925 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -180,6 +180,10 @@ private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) { return makeOptions(settings, null); } + Span startSpan(String spanName) { + return rpc.startSpan(spanName); + } + private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings, Span span) { String traceId = settings.getTraceId() == null ? UUID.randomUUID().toString() : settings.getTraceId(); return GrpcRequestSettings.newBuilder() @@ -244,6 +248,22 @@ private static YdbFormats.ArrowFormatSettings mapApacheArrowFormat(ApacheArrowFo return YdbFormats.ArrowFormatSettings.newBuilder().setCompressionCodec(codecBuilder).build(); } + CompletableFuture commitById(String txId, CommitTransactionSettings settings, Span span) { + YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder() + .setSessionId(sessionId) + .setTxId(txId) + .build(); + return rpc.commitTransaction(request, makeOptions(settings, span).build()).thenApply(Result::getStatus); + } + + CompletableFuture rollbackById(String txId, RollbackTransactionSettings settings, Span span) { + YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder() + .setSessionId(sessionId) + .setTxId(txId) + .build(); + return rpc.rollbackTransaction(request, makeOptions(settings, span).build()).thenApply(Result::getStatus); + } + GrpcReadStream createGrpcStream( String query, YdbQuery.TransactionControl tx, diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 1a7a8fcb3..9fbac4667 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -15,6 +15,8 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanFinalizer; import tech.ydb.proto.ValueProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.table.YdbTable; @@ -23,8 +25,10 @@ import tech.ydb.query.result.QueryInfo; import tech.ydb.query.result.QueryResultPart; import tech.ydb.query.result.QueryStats; +import tech.ydb.query.settings.CommitTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; import tech.ydb.query.settings.QueryStatsMode; +import tech.ydb.query.settings.RollbackTransactionSettings; import tech.ydb.table.Session; import tech.ydb.table.SessionPoolStats; import tech.ydb.table.TableClient; @@ -33,7 +37,11 @@ import tech.ydb.table.query.Params; import tech.ydb.table.rpc.TableRpc; import tech.ydb.table.rpc.grpc.GrpcTableRpc; +import tech.ydb.table.settings.BeginTxSettings; +import tech.ydb.table.settings.CommitTxSettings; import tech.ydb.table.settings.ExecuteDataQuerySettings; +import tech.ydb.table.settings.RollbackTxSettings; +import tech.ydb.table.transaction.TableTransaction; /** * @@ -115,9 +123,10 @@ public CompletableFuture> executeDataQueryInternal( final AtomicReference txRef = new AtomicReference<>(""); final List issues = new ArrayList<>(); final List results = new ArrayList<>(); + Span span = querySession.startSpan("ydb.ExecuteQuery"); - QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, null), - null) { + QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span), + span) { @Override void handleTxMeta(String txID) { txRef.set(txID); @@ -176,6 +185,104 @@ protected void updateSessionState(Throwable th, StatusCode code, boolean shutdow } } + @Override + public TableTransaction createNewTransaction(TxMode txMode) { + return new TracedTableTransaction(super.createNewTransaction(txMode)); + } + + @Override + public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) { + return super.beginTransaction(txMode, settings).thenApply(result -> result.map(TracedTableTransaction::new)); + } + + @Override + public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) { + Span span = querySession.startSpan("ydb.Commit"); + CommitTransactionSettings querySettings = CommitTransactionSettings.newBuilder() + .withTraceId(settings.getTraceId()) + .withRequestTimeout(settings.getTimeoutDuration()) + .build(); + return finishBySpan(querySession.commitById(txId, querySettings, span), span); + } + + @Override + public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) { + Span span = querySession.startSpan("ydb.Rollback"); + RollbackTransactionSettings querySettings = RollbackTransactionSettings.newBuilder() + .withTraceId(settings.getTraceId()) + .withRequestTimeout(settings.getTimeoutDuration()) + .build(); + return finishBySpan(querySession.rollbackById(txId, querySettings, span), span); + } + + private CompletableFuture finishBySpan(CompletableFuture future, Span span) { + return future.whenComplete((status, th) -> { + if (th != null) { + SpanFinalizer.finishByError(span, th); + return; + } + SpanFinalizer.finishByStatus(span, status); + }); + } + + private final class TracedTableTransaction implements TableTransaction { + private final TableTransaction delegate; + + private TracedTableTransaction(TableTransaction delegate) { + this.delegate = delegate; + } + + @Override + public Session getSession() { + return TableSession.this; + } + + @Override + public CompletableFuture> executeDataQuery( + String query, boolean commitAtEnd, Params params, ExecuteDataQuerySettings settings + ) { + return delegate.executeDataQuery(query, commitAtEnd, params, settings); + } + + @Override + public CompletableFuture commit(CommitTxSettings settings) { + String txId = delegate.getId(); + if (txId == null) { + return delegate.commit(settings); + } + return TableSession.this.commitTransaction(txId, settings); + } + + @Override + public CompletableFuture rollback(RollbackTxSettings settings) { + String txId = delegate.getId(); + if (txId == null) { + return delegate.rollback(settings); + } + return TableSession.this.rollbackTransaction(txId, settings); + } + + @Override + public String getId() { + return delegate.getId(); + } + + @Override + public TxMode getTxMode() { + return delegate.getTxMode(); + } + + @Override + public String getSessionId() { + return delegate.getSessionId(); + } + + @Override + public CompletableFuture getStatusFuture() { + return delegate.getStatusFuture(); + } + } + @Override public void close() { querySession.close(); diff --git a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java index 2da768d4e..34f5f1893 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java @@ -25,6 +25,12 @@ import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; +import tech.ydb.table.Session; +import tech.ydb.table.TableClient; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.table.transaction.TxControl; import tech.ydb.test.junit4.YdbHelperRule; public class QueryTracingTest { @@ -35,6 +41,7 @@ public class QueryTracingTest { private static RecordingTracer tracer; private QueryClient queryClient; + private TableClient tableClient; @BeforeClass public static void initTransport() { @@ -54,33 +61,102 @@ public static void closeTransport() { public void initClient() { tracer.reset(); queryClient = QueryClient.newClient(transport).build(); + tableClient = null; } @After public void closeClient() { - queryClient.close(); + if (tableClient != null) { + tableClient.close(); + } + if (queryClient != null) { + queryClient.close(); + } + } + + @Test + public void createSessionSpanIsRecorded() { + try (QuerySession ignored = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + // no-op + } + + Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); } @Test - public void queryServiceSpansAreRecorded() { + public void executeQuerySpanIsRecorded() { try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery")); + } + + @Test + public void commitSpanIsRecordedInQueryTransaction() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { Result txResult = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); txResult.getStatus().expectSuccess(); + Result queryResult = txResult.getValue().createQuery("SELECT 1").execute().join(); + queryResult.getStatus().expectSuccess(); Result commitResult = txResult.getValue().commit().join(); commitResult.getStatus().expectSuccess(); + } + + Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit")); + } - Result txResult2 = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); - txResult2.getStatus().expectSuccess(); - Status rollbackStatus = txResult2.getValue().rollback().join(); + @Test + public void rollbackSpanIsRecordedInQueryTransaction() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + Result txResult = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); + txResult.getStatus().expectSuccess(); + Result queryResult = txResult.getValue().createQuery("SELECT 1").execute().join(); + queryResult.getStatus().expectSuccess(); + Status rollbackStatus = txResult.getValue().rollback().join(); + rollbackStatus.expectSuccess(); + } + + Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); + } + + @Test + public void createSessionAndExecuteQuerySpansAreRecordedInTableProxy() { + tableClient = QueryClient.newTableClient(transport).build(); + try (Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + Result result = session.executeDataQuery( + "SELECT 1", + TxControl.serializableRw().setCommitTx(true), + Params.empty() + ).join(); + result.getStatus().expectSuccess(); + } + + Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); + Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery")); + } + + @Test + public void executeQuerySpansAreRecordedInTableProxyTransaction() { + tableClient = QueryClient.newTableClient(transport).build(); + try (Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + TableTransaction tx1 = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result q1 = tx1.executeDataQuery("SELECT 1").join(); + q1.getStatus().expectSuccess(); + Status commitStatus = tx1.commit().join(); + commitStatus.expectSuccess(); + + TableTransaction tx2 = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result q2 = tx2.executeDataQuery("SELECT 1").join(); + q2.getStatus().expectSuccess(); + Status rollbackStatus = tx2.rollback().join(); rollbackStatus.expectSuccess(); } - Assert.assertTrue(tracer.hasClosedSpan("ydb.CreateSession")); - Assert.assertTrue(tracer.hasClosedSpan("ydb.ExecuteQuery")); - Assert.assertTrue(tracer.hasClosedSpan("ydb.Commit")); - Assert.assertTrue(tracer.hasClosedSpan("ydb.Rollback")); + Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); + Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteQuery")); + Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit")); + Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); } private static final class RecordingTracer implements Tracer { @@ -97,15 +173,16 @@ void reset() { spans.clear(); } - boolean hasClosedSpan(String spanName) { + int countClosedSpan(String spanName) { + int count = 0; synchronized (spans) { for (RecordingSpan span : spans) { if (span.name.equals(spanName) && span.closed) { - return true; + count++; } } } - return false; + return count; } } From f9d364dc388c2d985c1040fbd278abef2ba8b9d5 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 4 Mar 2026 15:08:58 +0300 Subject: [PATCH 11/16] feat: linter --- query/src/main/java/tech/ydb/query/impl/TableClientImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 9fbac4667..ab1345371 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -192,7 +192,8 @@ public TableTransaction createNewTransaction(TxMode txMode) { @Override public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) { - return super.beginTransaction(txMode, settings).thenApply(result -> result.map(TracedTableTransaction::new)); + return super.beginTransaction(txMode, settings) + .thenApply(result -> result.map(TracedTableTransaction::new)); } @Override From 1d2a44c9cdaf7a0cfbad517f1e24018f72a3097f Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 4 Mar 2026 15:22:38 +0300 Subject: [PATCH 12/16] chore fixes --- .../tech/ydb/core/tracing/SpanFinalizer.java | 18 +++++++++++++++++- .../tech/ydb/query/impl/TableClientImpl.java | 14 ++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java index ee0e2ea97..a605bbe65 100644 --- a/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java +++ b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java @@ -1,5 +1,7 @@ package tech.ydb.core.tracing; +import java.util.function.BiConsumer; + import tech.ydb.core.Status; /** @@ -14,7 +16,7 @@ public static void finishByStatus(Span span, Status status) { return; } - if (!status.isSuccess()) { + if (status != null && !status.isSuccess()) { span.setError(status); } @@ -32,4 +34,18 @@ public static void finishByError(Span span, Throwable error) { span.end(); } + + public static BiConsumer whenComplete(Span span) { + return (status, error) -> { + if (span == null) { + return; + } + + if (error != null) { + finishByError(span, error); + return; + } + finishByStatus(span, status); + }; + } } diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index ab1345371..7ba406337 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -203,7 +203,7 @@ public CompletableFuture commitTransaction(String txId, CommitTxSettings .withTraceId(settings.getTraceId()) .withRequestTimeout(settings.getTimeoutDuration()) .build(); - return finishBySpan(querySession.commitById(txId, querySettings, span), span); + return querySession.commitById(txId, querySettings, span).whenComplete(SpanFinalizer.whenComplete(span)); } @Override @@ -213,17 +213,7 @@ public CompletableFuture rollbackTransaction(String txId, RollbackTxSett .withTraceId(settings.getTraceId()) .withRequestTimeout(settings.getTimeoutDuration()) .build(); - return finishBySpan(querySession.rollbackById(txId, querySettings, span), span); - } - - private CompletableFuture finishBySpan(CompletableFuture future, Span span) { - return future.whenComplete((status, th) -> { - if (th != null) { - SpanFinalizer.finishByError(span, th); - return; - } - SpanFinalizer.finishByStatus(span, status); - }); + return querySession.rollbackById(txId, querySettings, span).whenComplete(SpanFinalizer.whenComplete(span)); } private final class TracedTableTransaction implements TableTransaction { From 087a8c7da872165cfe82c483731010533cbc1997 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 4 Mar 2026 15:43:17 +0300 Subject: [PATCH 13/16] chore fixes --- core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java index a605bbe65..b98ba895c 100644 --- a/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java +++ b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java @@ -3,6 +3,7 @@ import java.util.function.BiConsumer; import tech.ydb.core.Status; +import tech.ydb.core.utils.FutureTools; /** * Shared helpers to finish spans for status/exception outcomes. @@ -29,7 +30,7 @@ public static void finishByError(Span span, Throwable error) { } if (error != null) { - span.setError(error); + span.setError(FutureTools.unwrapCompletionException(error)); } span.end(); From 6ca9af18a87fbe1d626bf0a95513c5d4dfe60ca8 Mon Sep 17 00:00:00 2001 From: Michael Kim Date: Sat, 25 Apr 2026 23:44:33 +0300 Subject: [PATCH 14/16] fix merge conflict --- core/src/main/java/tech/ydb/core/tracing/Span.java | 14 +++++++++++++- .../main/java/tech/ydb/query/impl/SessionPool.java | 10 +++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index 8b202c8c1..07d5dd8b0 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -1,5 +1,7 @@ package tech.ydb.core.tracing; +import javax.annotation.Nullable; + import tech.ydb.core.Status; /** @@ -11,21 +13,31 @@ public interface Span { /** * Sets a string attribute on the span (ignored by Noop implementation). + * + * @param key attribute key + * @param value attribute value, may be null */ - void setAttribute(String key, String value); + void setAttribute(String key, @Nullable String value); /** * Sets a long attribute on the span (ignored by Noop implementation). + * + * @param key attribute key + * @param value attribute value */ void setAttribute(String key, long value); /** * Sets span status to error with human-readable message. + * + * @param status operation status used to map error attributes */ void setError(Status status); /** * Sets span status to error from exception. + * + * @param error exception used to map error attributes */ void setError(Throwable error); diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index d03cbbc87..78ad1fd76 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -312,7 +312,15 @@ public CompletableFuture create() { }) .whenComplete((result, th) -> { if (th != null) { - SpanFinalizer.finishByError(createSpan, th); + Throwable error = FutureTools.unwrapCompletionException(th); + if (error instanceof UnexpectedResultException) { + SpanFinalizer.finishByStatus( + createSpan, + ((UnexpectedResultException) error).getStatus() + ); + } else { + SpanFinalizer.finishByError(createSpan, error); + } return; } From 26e3173d501ec0769f79ff67e8c4268c811fb01d Mon Sep 17 00:00:00 2001 From: Michael Kim Date: Sat, 25 Apr 2026 04:11:45 +0300 Subject: [PATCH 15/16] + feat: metrics --- core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java | 4 ++++ query/src/main/java/tech/ydb/query/impl/TableClientImpl.java | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java index daccd0310..c4decca36 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java @@ -52,6 +52,10 @@ default Meter getMeter() { return NoopMeter.INSTANCE; } + default Meter getMeter() { + return NoopMeter.INSTANCE; + } + @Override void close(); diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 7ba406337..f9f1dee21 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -124,9 +124,10 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); Span span = querySession.startSpan("ydb.ExecuteQuery"); + final long startNanos = System.nanoTime(); QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span), - span) { + span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { txRef.set(txID); From 0ac006e9dd41961a85a0df7eb41171e1a35f3285 Mon Sep 17 00:00:00 2001 From: Michael Kim Date: Sat, 25 Apr 2026 23:50:26 +0300 Subject: [PATCH 16/16] fix --- opentelemetry/pom.xml | 33 +++++++++++++++++++ .../OpenTelemetryMetricsIntegrationTest.java | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/opentelemetry/pom.xml b/opentelemetry/pom.xml index 921db5032..df320be1b 100644 --- a/opentelemetry/pom.xml +++ b/opentelemetry/pom.xml @@ -29,5 +29,38 @@ opentelemetry-api ${opentelemetry.version} + + org.testng + testng + 7.12.0 + test + + + io.opentelemetry + opentelemetry-sdk + 1.59.0 + test + + + junit + junit + test + + + tech.ydb + ydb-sdk-query + test + + + tech.ydb.test + ydb-junit4-support + test + + + io.opentelemetry + opentelemetry-sdk-testing + 1.59.0 + test + diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java index dc4d5c736..d8501dc59 100644 --- a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java @@ -13,7 +13,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import org.junit.After; -import org.junit.AfterClass; +import org.testng.annotations.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass;