diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java
index e7850d771..344697273 100644
--- a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java
+++ b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java
@@ -6,6 +6,8 @@
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
+
import io.grpc.Metadata;
import tech.ydb.core.impl.call.GrpcFlows;
diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java
index 96816601f..c4decca36 100644
--- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java
+++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java
@@ -12,6 +12,8 @@
import io.grpc.MethodDescriptor;
import tech.ydb.core.Result;
+import tech.ydb.core.metrics.NoopMeter;
+import tech.ydb.core.metrics.Meter;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.core.utils.URITools;
@@ -46,6 +48,14 @@ default Tracer getTracer() {
return NoopTracer.getInstance();
}
+ default Meter getMeter() {
+ return NoopMeter.INSTANCE;
+ }
+
+ default Meter getMeter() {
+ return NoopMeter.INSTANCE;
+ }
+
@Override
void close();
diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java
index 534cf08be..9d13113b7 100644
--- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java
+++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java
@@ -26,6 +26,8 @@
import tech.ydb.core.impl.auth.GrpcAuthRpc;
import tech.ydb.core.impl.pool.ChannelFactoryLoader;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
+import tech.ydb.core.metrics.NoopMeter;
+import tech.ydb.core.metrics.Meter;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.core.utils.Version;
@@ -87,6 +89,7 @@ public enum InitMode {
private GrpcCompression compression = GrpcCompression.NO_COMPRESSION;
private InitMode initMode = InitMode.SYNC;
private Tracer tracer = NoopTracer.getInstance();
+ private Meter meter = NoopMeter.getInstance();
/**
* can cause leaks https://github.com/grpc/grpc-java/issues/9340
@@ -195,6 +198,10 @@ public Tracer getTracer() {
return tracer;
}
+ public Meter getMeter() {
+ return meter;
+ }
+
public ManagedChannelFactory getManagedChannelFactory() {
if (channelFactoryBuilder == null) {
channelFactoryBuilder = ChannelFactoryLoader.load();
@@ -423,6 +430,11 @@ public GrpcTransportBuilder withTracer(Tracer tracer) {
return this;
}
+ public GrpcTransportBuilder withMeter(Meter meter) {
+ this.meter = Objects.requireNonNull(meter, "meter is null");
+ return this;
+ }
+
/**
* use {@link GrpcTransportBuilder#withGrpcRetry(boolean) } instead
* @return this
diff --git a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java
index 52d4ba018..768634801 100644
--- a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java
+++ b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java
@@ -21,6 +21,7 @@
import tech.ydb.core.impl.pool.GrpcChannel;
import tech.ydb.core.impl.pool.GrpcChannelPool;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
+import tech.ydb.core.metrics.Meter;
import tech.ydb.core.tracing.Tracer;
/**
@@ -37,6 +38,7 @@ public class YdbTransportImpl extends BaseGrpcTransport {
private final GrpcChannelPool channelPool;
private final YdbDiscovery discovery;
private final Tracer tracer;
+ private final Meter meter;
public YdbTransportImpl(GrpcTransportBuilder builder) {
super(builder);
@@ -45,6 +47,7 @@ public YdbTransportImpl(GrpcTransportBuilder builder) {
this.database = Strings.nullToEmpty(builder.getDatabase());
this.tracer = builder.getTracer();
+ this.meter = builder.getMeter();
logger.info("Create YDB transport with endpoint {} and {}", serverEndpoint, balancingSettings);
@@ -124,6 +127,11 @@ public Tracer getTracer() {
return tracer;
}
+ @Override
+ public Meter getMeter() {
+ return meter;
+ }
+
@Override
public AuthCallOptions getAuthCallOptions() {
return callOptions;
diff --git a/core/src/main/java/tech/ydb/core/metrics/Meter.java b/core/src/main/java/tech/ydb/core/metrics/Meter.java
new file mode 100644
index 000000000..c94913257
--- /dev/null
+++ b/core/src/main/java/tech/ydb/core/metrics/Meter.java
@@ -0,0 +1,9 @@
+package tech.ydb.core.metrics;
+
+import tech.ydb.core.Status;
+
+public interface Meter {
+ void recordOperation(String name, long durationNanos, Status status);
+ void registerSessionPool(String poolName, SessionPoolObserver observer);
+ void recordSessionCreateTime(String poolName, long durationNanos);
+}
diff --git a/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java
new file mode 100644
index 000000000..cd126efd5
--- /dev/null
+++ b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java
@@ -0,0 +1,30 @@
+package tech.ydb.core.metrics;
+
+import tech.ydb.core.Status;
+
+public final class NoopMeter implements Meter {
+ public static final NoopMeter INSTANCE = new NoopMeter();
+
+ private NoopMeter() {
+ // No operations.
+ }
+
+ public static NoopMeter getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void recordOperation(String name, long durationNanos, Status status) {
+
+ }
+
+ @Override
+ public void registerSessionPool(String poolName, SessionPoolObserver observer) {
+
+ }
+
+ @Override
+ public void recordSessionCreateTime(String poolName, long durationNanos) {
+
+ }
+}
diff --git a/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java
new file mode 100644
index 000000000..4b872c6f5
--- /dev/null
+++ b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java
@@ -0,0 +1,7 @@
+package tech.ydb.core.metrics;
+
+public interface SessionPoolObserver {
+ int getIdleCount();
+ int getUsedCount();
+ int getPendingCount();
+}
diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java
index b482bc6cb..07d5dd8b0 100644
--- a/core/src/main/java/tech/ydb/core/tracing/Span.java
+++ b/core/src/main/java/tech/ydb/core/tracing/Span.java
@@ -1,127 +1,45 @@
package tech.ydb.core.tracing;
-import java.util.concurrent.CompletableFuture;
-
import javax.annotation.Nullable;
-import io.grpc.ExperimentalApi;
-
-import tech.ydb.core.Result;
import tech.ydb.core.Status;
-import tech.ydb.core.utils.FutureTools;
/**
* A span represents a timed operation.
*/
-@ExperimentalApi("YDB Tracer is experimental and API may change without notice")
public interface Span {
- Span NOOP = new Span() {
- };
- /**
- * Returns W3C traceparent value for request propagation.
- *
- *
For {@link #NOOP} this returns an empty string. Check {@link #isValid()} to decide whether
- * trace headers should be sent to server.
- *
- * @return traceparent value
- */
- default String getId() {
- return "";
- }
+ String getId();
/**
- * Indicates whether this span carries a real tracing context.
- *
- * @return true for real spans, false for noop span
- */
- default boolean isValid() {
- return false;
- }
-
- /**
- * Sets a string attribute on the span.
+ * Sets a string attribute on the span (ignored by Noop implementation).
*
* @param key attribute key
* @param value attribute value, may be null
*/
- default void setAttribute(String key, @Nullable String value) {
- }
+ void setAttribute(String key, @Nullable String value);
/**
- * Sets a long attribute on the span.
+ * Sets a long attribute on the span (ignored by Noop implementation).
*
* @param key attribute key
* @param value attribute value
*/
- default void setAttribute(String key, long value) {
- }
+ void setAttribute(String key, long value);
/**
- * Sets span status (success or error) with human-readable message.
+ * Sets span status to error with human-readable message.
*
- * @param status operation status used to map span attributes
- * @param error operation exception used to map span attributes
+ * @param status operation status used to map error attributes
*/
- default void setStatus(@Nullable Status status, @Nullable Throwable error) {
- }
+ void setError(Status status);
/**
- * Makes this span current in the active execution context.
+ * Sets span status to error from exception.
*
- * @return closeable scope handle
+ * @param error exception used to map error attributes
*/
- default Scope makeCurrent() {
- return () -> {
- };
- }
+ void setError(Throwable error);
- /**
- * Restores context captured when this span was created.
- *
- *
The returned scope must be closed, usually via try-with-resources.
- *
- * @return closeable scope handle for restored context
- */
- default Scope restoreContext() {
- return () -> {
- };
- }
-
- /**
- * Ends (finishes) this span.
- */
- default void end() {
- }
-
- /**
- * Subscribes to a {@link Status} future: when it completes, sets status/error and ends the span.
- * For non-valid spans returns the future as-is without subscribing.
- *
- * @param span the span to finalize
- * @param future the future to observe
- * @return the same future (for chaining)
- */
- static CompletableFuture endOnStatus(Span span, CompletableFuture future) {
- return span.isValid() ? future.whenComplete((status, th) -> {
- span.setStatus(status, FutureTools.unwrapCompletionException(th));
- span.end();
- }) : future;
- }
-
- /**
- * Subscribes to a {@link Result} future: when it completes, sets status/error and ends the span.
- * For non-valid spans returns the future as-is without subscribing.
- *
- * @param result value type
- * @param span the span to finalize
- * @param future the future to observe
- * @return the same future (for chaining)
- */
- static CompletableFuture> endOnResult(Span span, CompletableFuture> future) {
- return span.isValid() ? future.whenComplete((result, th) -> {
- span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th));
- span.end();
- }) : future;
- }
+ void end();
}
diff --git a/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java
new file mode 100644
index 000000000..b98ba895c
--- /dev/null
+++ b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java
@@ -0,0 +1,52 @@
+package tech.ydb.core.tracing;
+
+import java.util.function.BiConsumer;
+
+import tech.ydb.core.Status;
+import tech.ydb.core.utils.FutureTools;
+
+/**
+ * Shared helpers to finish spans for status/exception outcomes.
+ */
+public final class SpanFinalizer {
+ private SpanFinalizer() {
+ }
+
+ public static void finishByStatus(Span span, Status status) {
+ if (span == null) {
+ return;
+ }
+
+ if (status != null && !status.isSuccess()) {
+ span.setError(status);
+ }
+
+ span.end();
+ }
+
+ public static void finishByError(Span span, Throwable error) {
+ if (span == null) {
+ return;
+ }
+
+ if (error != null) {
+ span.setError(FutureTools.unwrapCompletionException(error));
+ }
+
+ span.end();
+ }
+
+ public static BiConsumer whenComplete(Span span) {
+ return (status, error) -> {
+ if (span == null) {
+ return;
+ }
+
+ if (error != null) {
+ finishByError(span, error);
+ return;
+ }
+ finishByStatus(span, status);
+ };
+ }
+}
diff --git a/opentelemetry/pom.xml b/opentelemetry/pom.xml
new file mode 100644
index 000000000..df320be1b
--- /dev/null
+++ b/opentelemetry/pom.xml
@@ -0,0 +1,66 @@
+
+
+ 4.0.0
+
+
+ tech.ydb
+ ydb-sdk-parent
+ 2.4.1-SNAPSHOT
+
+
+ ydb-sdk-opentelemetry
+ OpenTelemetry integration for YDB Java SDK
+ OpenTelemetry integration module for tracing and metrics in YDB SDK
+
+
+ 1.59.0
+
+
+
+
+ tech.ydb
+ ydb-sdk-core
+
+
+ io.opentelemetry
+ opentelemetry-api
+ ${opentelemetry.version}
+
+
+ org.testng
+ testng
+ 7.12.0
+ test
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+ 1.59.0
+ test
+
+
+ junit
+ junit
+ test
+
+
+ tech.ydb
+ ydb-sdk-query
+ test
+
+
+ tech.ydb.test
+ ydb-junit4-support
+ test
+
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ 1.59.0
+ test
+
+
+
diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java
new file mode 100644
index 000000000..63867688b
--- /dev/null
+++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java
@@ -0,0 +1,99 @@
+package tech.ydb.opentelemetry;
+
+import java.util.Objects;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+
+import tech.ydb.core.Status;
+import tech.ydb.core.metrics.Meter;
+import tech.ydb.core.metrics.SessionPoolObserver;
+
+public final class OpenTelemetryMeter implements Meter {
+ private final io.opentelemetry.api.metrics.Meter meter;
+
+ private final DoubleHistogram operationDuration; // db.client.operation.duration
+ private final LongCounter operationFailed; // ydb.client.operation.failed
+ private final DoubleHistogram sessionCreateTime; // ydb.query.session.create_time
+
+ private final Attributes baseAttributes; // db.system.name, db.namespace, server.address, server.port
+
+ private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter,
+ String database, String host, int port) {
+ this.meter = Objects.requireNonNull(meter, "meter is null");
+ this.operationDuration = meter.histogramBuilder("db.client.operation.duration")
+ .setUnit("s")
+ .build();
+ this.operationFailed = meter.counterBuilder("ydb.client.operation.failed")
+ .setUnit("{command}")
+ .build();
+ this.sessionCreateTime = meter.histogramBuilder("ydb.query.session.create_time")
+ .setUnit("s")
+ .build();
+
+ this.baseAttributes = Attributes.of(
+ AttributeKey.stringKey("db.system.name"), "ydb",
+ AttributeKey.stringKey("db.namespace"), database,
+ AttributeKey.stringKey("server.address"), host,
+ AttributeKey.longKey("server.port"), (long) port
+ );
+ }
+
+ public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry,
+ String database, String host, int port) {
+ io.opentelemetry.api.metrics.Meter meter = openTelemetry
+ .getMeter("tech.ydb.sdk");
+
+ return new OpenTelemetryMeter(meter, database, host, port);
+ }
+
+ public static OpenTelemetryMeter createGlobal(String database, String host, int port) {
+ return fromOpenTelemetry(GlobalOpenTelemetry.get(), database, host, port);
+ }
+
+ @Override
+ public void recordOperation(String name, long durationNanos, Status status) {
+ Attributes attrs = baseAttributes.toBuilder()
+ .put("ydb.operation.name", name)
+ .build();
+
+ operationDuration.record(durationNanos / 1_000_000_000.0, attrs);
+
+ if (status != null && !status.isSuccess()) {
+ Attributes errAttrs = attrs.toBuilder()
+ .put("db.response.status_code", status.getCode().toString())
+ .build();
+ operationFailed.add(1, errAttrs);
+ }
+ }
+
+ @Override
+ public void registerSessionPool(String poolName, SessionPoolObserver observer) {
+ meter.upDownCounterBuilder("ydb.query.session.count")
+ .setUnit("{connection}")
+ .buildWithCallback(measurement -> {
+ Attributes idle = Attributes.of(
+ AttributeKey.stringKey("ydb.query.session.pool.name"), poolName,
+ AttributeKey.stringKey("ydb.query.session.state"), "idle"
+ );
+ Attributes used = Attributes.of(
+ AttributeKey.stringKey("ydb.query.session.pool.name"), poolName,
+ AttributeKey.stringKey("ydb.query.session.state"), "used"
+ );
+ measurement.record(observer.getIdleCount(), idle);
+ measurement.record(observer.getUsedCount(), used);
+ });
+ }
+
+ @Override
+ public void recordSessionCreateTime(String poolName, long durationNanos) {
+ Attributes attrs = Attributes.of(
+ AttributeKey.stringKey("ydb.query.session.pool.name"), poolName
+ );
+ sessionCreateTime.record(durationNanos / 1_000_000_000.0, attrs);
+ }
+}
diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java
new file mode 100644
index 000000000..27665b3df
--- /dev/null
+++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java
@@ -0,0 +1,93 @@
+package tech.ydb.opentelemetry;
+
+import java.util.Objects;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.StatusCode;
+
+import tech.ydb.core.Status;
+import tech.ydb.core.tracing.Span;
+import tech.ydb.core.tracing.SpanKind;
+import tech.ydb.core.tracing.Tracer;
+
+public final class OpenTelemetryTracer implements Tracer {
+ private static final String DEFAULT_SCOPE = "tech.ydb.sdk";
+
+ private final io.opentelemetry.api.trace.Tracer tracer;
+
+ private OpenTelemetryTracer(io.opentelemetry.api.trace.Tracer tracer) {
+ this.tracer = Objects.requireNonNull(tracer, "tracer is null");
+ }
+
+ public static OpenTelemetryTracer createGlobal() {
+ return fromOpenTelemetry(GlobalOpenTelemetry.get());
+ }
+
+ public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry) {
+ return fromOpenTelemetry(openTelemetry, DEFAULT_SCOPE);
+ }
+
+ public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry, String scopeName) {
+ Objects.requireNonNull(openTelemetry, "openTelemetry is null");
+ Objects.requireNonNull(scopeName, "scopeName is null");
+ return new OpenTelemetryTracer(openTelemetry.getTracer(scopeName));
+ }
+
+ @Override
+ public Span startSpan(String spanName, SpanKind spanKind) {
+ io.opentelemetry.api.trace.Span span = tracer.spanBuilder(spanName)
+ .setSpanKind(mapSpanKind(spanKind))
+ .startSpan();
+ return new OtelSpan(span);
+ }
+
+ private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) {
+ if (kind == SpanKind.CLIENT) {
+ return io.opentelemetry.api.trace.SpanKind.CLIENT;
+ }
+ return io.opentelemetry.api.trace.SpanKind.INTERNAL;
+ }
+
+ private static final class OtelSpan implements Span {
+ private final io.opentelemetry.api.trace.Span span;
+
+ private OtelSpan(io.opentelemetry.api.trace.Span span) {
+ this.span = span;
+ }
+
+ @Override
+ public String getId() {
+ return "00-" + span.getSpanContext().getTraceId() + "-" + span.getSpanContext().getSpanId() + "01";
+ }
+
+ @Override
+ public void setAttribute(String key, String value) {
+ span.setAttribute(key, value);
+ }
+
+ @Override
+ public void setAttribute(String key, long value) {
+ span.setAttribute(key, value);
+ }
+
+ @Override
+ public void setError(Status status) {
+ span.setAttribute("db.response.status_code", status.getCode().toString());
+ span.setAttribute("error.type", status.getCode().isTransportError() ? "transport_error" : "ydb_error");
+
+ span.setStatus(StatusCode.ERROR, status.toString());
+ }
+
+ @Override
+ public void setError(Throwable error) {
+ span.setAttribute("error.type", error.getClass().getName());
+ span.setStatus(StatusCode.ERROR, error.getMessage());
+ }
+
+ @Override
+ public void end() {
+ span.end();
+ }
+ }
+}
diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java
new file mode 100644
index 000000000..d8501dc59
--- /dev/null
+++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java
@@ -0,0 +1,192 @@
+package tech.ydb.opentelemetry;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import org.junit.After;
+import org.testng.annotations.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import tech.ydb.auth.TokenAuthProvider;
+import tech.ydb.common.transaction.TxMode;
+import tech.ydb.core.grpc.GrpcTransport;
+import tech.ydb.query.QueryClient;
+import tech.ydb.query.QuerySession;
+import tech.ydb.query.QueryTransaction;
+import tech.ydb.test.junit4.YdbHelperRule;
+
+public class OpenTelemetryMetricsIntegrationTest {
+ @ClassRule
+ public static final YdbHelperRule YDB = new YdbHelperRule();
+
+ private static final AttributeKey DB_SYSTEM_NAME = AttributeKey.stringKey("db.system.name");
+ private static final AttributeKey DB_NAMESPACE = AttributeKey.stringKey("db.namespace");
+ private static final AttributeKey SERVER_ADDRESS = AttributeKey.stringKey("server.address");
+ private static final AttributeKey SERVER_PORT = AttributeKey.longKey("server.port");
+ private static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("ydb.operation.name");
+
+ private static InMemoryMetricReader metricReader;
+ private static SdkMeterProvider meterProvider;
+ private static GrpcTransport transport;
+
+ private QueryClient queryClient;
+
+ @BeforeClass
+ public static void initTransport() {
+ metricReader = InMemoryMetricReader.create();
+ meterProvider = SdkMeterProvider.builder()
+ .registerMetricReader(metricReader)
+ .build();
+
+ OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+ .setMeterProvider(meterProvider)
+ .build();
+
+ String host = extractHost(YDB.endpoint());
+ int port = extractPort(YDB.endpoint());
+
+ transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database())
+ .withAuthProvider(new TokenAuthProvider(YDB.authToken()))
+ .withMeter(OpenTelemetryMeter.fromOpenTelemetry(openTelemetry, YDB.database(), host, port))
+ .build();
+ }
+
+ @AfterClass
+ public static void closeTransport() throws IOException {
+ transport.close();
+ meterProvider.close();
+ metricReader.close();
+ }
+
+ @Before
+ public void initClient() {
+ queryClient = QueryClient.newClient(transport).build();
+ }
+
+ @After
+ public void closeClient() {
+ queryClient.close();
+ }
+
+ @Test
+ public void executeQueryRecordsOperationDuration() {
+ try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+ session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess();
+ }
+
+ MetricData metric = findMetric("db.client.operation.duration");
+ Assert.assertNotNull("db.client.operation.duration metric not found", metric);
+
+ HistogramPointData point = findHistogramPoint(metric, "ydb.ExecuteQuery");
+ Assert.assertNotNull("No histogram point for ydb.ExecuteQuery", point);
+ Assert.assertTrue("Duration must be > 0", point.getSum() > 0);
+ Assert.assertEquals("ydb", point.getAttributes().get(DB_SYSTEM_NAME));
+ Assert.assertEquals(YDB.database(), point.getAttributes().get(DB_NAMESPACE));
+ Assert.assertNotNull(point.getAttributes().get(SERVER_ADDRESS));
+ Assert.assertNotNull(point.getAttributes().get(SERVER_PORT));
+ }
+
+ @Test
+ public void commitAndRollbackRecordOperationDuration() {
+ try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+ QueryTransaction txCommit = session.beginTransaction(TxMode.SERIALIZABLE_RW)
+ .join().getValue();
+ txCommit.createQuery("SELECT 1").execute().join().getStatus().expectSuccess();
+ txCommit.commit().join().getStatus().expectSuccess();
+
+ QueryTransaction txRollback = session.beginTransaction(TxMode.SERIALIZABLE_RW)
+ .join().getValue();
+ txRollback.createQuery("SELECT 1").execute().join().getStatus().expectSuccess();
+ txRollback.rollback().join().expectSuccess();
+ }
+
+ MetricData metric = findMetric("db.client.operation.duration");
+ Assert.assertNotNull(metric);
+
+ Assert.assertNotNull("No histogram point for ydb.Commit",
+ findHistogramPoint(metric, "ydb.Commit"));
+ Assert.assertNotNull("No histogram point for ydb.Rollback",
+ findHistogramPoint(metric, "ydb.Rollback"));
+ }
+
+ @Test
+ public void failedOperationRecordsFailedCounter() {
+ try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+ session.createQuery("SELECT * FROM __nonexistent_table__", TxMode.NONE)
+ .execute().join();
+ }
+
+ MetricData metric = findMetric("ydb.client.operation.failed");
+ Assert.assertNotNull("ydb.client.operation.failed metric not found", metric);
+
+ Collection points = metric.getLongSumData().getPoints();
+ Assert.assertFalse("Failed counter must have at least one point", points.isEmpty());
+ long total = points.stream().mapToLong(LongPointData::getValue).sum();
+ Assert.assertTrue("Failed counter must be > 0", total > 0);
+ }
+
+ @Test
+ public void sessionPoolMetricsAreReported() {
+ // создаём сессию чтобы пул оживился
+ try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+ session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess();
+ }
+
+ MetricData metric = findMetric("ydb.query.session.count");
+ Assert.assertNotNull("ydb.query.session.count metric not found", metric);
+ }
+
+ // --- helpers ---
+
+ private MetricData findMetric(String name) {
+ Collection metrics = metricReader.collectAllMetrics();
+ for (MetricData m : metrics) {
+ if (name.equals(m.getName())) {
+ return m;
+ }
+ }
+ return null;
+ }
+
+ private HistogramPointData findHistogramPoint(MetricData metric, String operationName) {
+ for (HistogramPointData point : metric.getHistogramData().getPoints()) {
+ String op = point.getAttributes().get(OPERATION_NAME);
+ if (operationName.equals(op)) {
+ return point;
+ }
+ }
+ return null;
+ }
+
+ private static String extractHost(String endpoint) {
+ // endpoint вида grpc://host:port или host:port
+ String stripped = endpoint.replaceFirst("grpcs?://", "");
+ int colon = stripped.lastIndexOf(':');
+ return colon >= 0 ? stripped.substring(0, colon) : stripped;
+ }
+
+ private static int extractPort(String endpoint) {
+ String stripped = endpoint.replaceFirst("grpcs?://", "");
+ int colon = stripped.lastIndexOf(':');
+ if (colon >= 0) {
+ try {
+ return Integer.parseInt(stripped.substring(colon + 1));
+ } catch (NumberFormatException ignored) {
+ }
+ }
+ return 2135;
+ }
+}
diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java
index 66c2e0a6c..6038c447a 100644
--- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java
+++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java
@@ -6,6 +6,7 @@
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.grpc.GrpcTransport;
+import tech.ydb.core.metrics.Meter;
import tech.ydb.core.operation.StatusExtractor;
import tech.ydb.core.tracing.Span;
import tech.ydb.core.tracing.SpanKind;
@@ -51,16 +52,22 @@ class QueryServiceRpc {
private final GrpcTransport transport;
private final Tracer trace;
+ private final Meter meter;
QueryServiceRpc(GrpcTransport transport) {
this.transport = transport;
this.trace = transport.getTracer();
+ this.meter = transport.getMeter();
}
Span startSpan(String spanName) {
return trace.startSpan(spanName, SpanKind.CLIENT);
}
+ Meter getMeter() {
+ return meter;
+ }
+
public CompletableFuture> createSession(
YdbQuery.CreateSessionRequest request, GrpcRequestSettings settings) {
return transport
diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java
index f4c5a4065..141dfc925 100644
--- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java
+++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java
@@ -27,6 +27,7 @@
import tech.ydb.core.operation.StatusExtractor;
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.tracing.Span;
+import tech.ydb.core.tracing.SpanFinalizer;
import tech.ydb.core.utils.URITools;
import tech.ydb.core.utils.UpdatableOptional;
import tech.ydb.proto.ValueProtos;
@@ -144,16 +145,11 @@ GrpcReadStream attach(AttachSessionSettings settings) {
YdbQuery.AttachSessionRequest request = YdbQuery.AttachSessionRequest.newBuilder()
.setSessionId(sessionId)
.build();
- // Execute attachSession call outside current context to avoid cancellation and deadline propagation
+ // Execute attachSession call outside current context to avoid cancellation and deadline propogation
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
- AtomicBoolean pessimizationHook = new AtomicBoolean(false);
-
- GrpcRequestSettings grpcSettings = makeOptions(settings)
- .withPessimizationHook(pessimizationHook::get)
- .disableDeadline()
- .build();
+ GrpcRequestSettings grpcSettings = makeOptions(settings).disableDeadline().build();
GrpcReadStream origin = rpc.attachSession(request, grpcSettings);
return new GrpcReadStream() {
@Override
@@ -166,19 +162,6 @@ public CompletableFuture start(GrpcReadStream.Observer observer)
StatusCode code = StatusCode.fromProto(message.getStatus());
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
updateSessionState(status);
- // The hint is sent by the server with a success status.
- switch (message.getSessionHintCase()) {
- case NODE_SHUTDOWN:
- pessimizationHook.set(nodeID != 0);
- updateSessionState(Status.of(StatusCode.BAD_SESSION));
- break;
- case SESSION_SHUTDOWN:
- updateSessionState(Status.of(StatusCode.BAD_SESSION));
- break;
- default:
- break;
- }
-
observer.onNext(status);
});
}
@@ -330,8 +313,10 @@ GrpcReadStream createGrpcStream(
@Override
public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) {
YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true);
- Span span = startSpan("ydb.ExecuteQuery");
- return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) {
+ Span span = rpc.startSpan("ydb.ExecuteQuery");
+ long startNanos = System.nanoTime();
+
+ return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") {
@Override
void handleTxMeta(String txID) {
if (txID != null && !txID.isEmpty()) {
@@ -371,17 +356,29 @@ static CompletableFuture> createSession(
return rpc.createSession(request, grpcSettingsBuilder.build()).thenApply(result -> {
pessimizationHook.set(result.getStatus().getCode() == StatusCode.OVERLOADED);
+ if (!result.isSuccess()) {
+ SpanFinalizer.finishByStatus(createSpan, result.getStatus());
+ }
return CREATE_SESSION.apply(result);
});
}
abstract class StreamImpl implements QueryStream {
private final GrpcReadStream grpcStream;
- private final Span span;
-
- StreamImpl(GrpcReadStream grpcStream, Span operationSpan) {
+ @Nullable
+ private final Span operationSpan;
+ private final long startNanos;
+ private final String operationName;
+
+ StreamImpl(
+ GrpcReadStream grpcStream,
+ @Nullable
+ Span operationSpan, long startNanos, String operationName
+ ) {
this.grpcStream = grpcStream;
- this.span = operationSpan;
+ this.operationSpan = operationSpan;
+ this.startNanos = startNanos;
+ this.operationName = operationName;
}
abstract void handleTxMeta(String txId);
@@ -393,10 +390,10 @@ void handleCompletion(Status status, Throwable th) {
public CompletableFuture> execute(PartsHandler handler) {
final UpdatableOptional operationStatus = new UpdatableOptional<>();
final UpdatableOptional stats = new UpdatableOptional<>();
- return Span.endOnResult(span, grpcStream.start(msg -> {
+ return grpcStream.start(msg -> {
if (isTraceEnabled) {
- logger.trace("{} got stream message {}",
- SessionImpl.this, TextFormat.shortDebugString(msg));
+ logger.trace("{} got stream message {}", SessionImpl.this,
+ TextFormat.shortDebugString(msg));
}
Issue[] issues = Issue.fromPb(msg.getIssuesList());
Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues);
@@ -431,16 +428,20 @@ public CompletableFuture> execute(PartsHandler handler) {
logger.trace("{} lost result set part with index {}", SessionImpl.this, index);
}
}
- }).whenComplete(this::handleCompletion).thenApply(streamStatus -> {
+ })
+ .whenComplete(this::handleCompletion)
+ .whenComplete(SpanFinalizer.whenComplete(operationSpan))
+ .thenApply(streamStatus -> {
updateSessionState(streamStatus);
Status status = operationStatus.orElse(streamStatus);
+ long elapsed = System.nanoTime() - startNanos;
+ rpc.getMeter().recordOperation(operationName, elapsed, status);
if (status.isSuccess()) {
return Result.success(new QueryInfo(stats.get()), streamStatus);
} else {
return Result.fail(status);
}
- })
- );
+ });
}
@Override
@@ -477,8 +478,10 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E
? TxControl.txIdCtrl(currentId, commitAtEnd)
: TxControl.txModeCtrl(txMode, commitAtEnd);
- Span span = startSpan("ydb.ExecuteQuery");
- return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) {
+ Span span = rpc.startSpan("ydb.ExecuteQuery");
+ long startNanos = System.nanoTime();
+
+ return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") {
@Override
void handleTxMeta(String txID) {
String newId = txID == null || txID.isEmpty() ? null : txID;
@@ -501,7 +504,7 @@ void handleCompletion(Status status, Throwable th) {
currentStatusFuture.complete(Status
.of(StatusCode.ABORTED)
.withIssues(Issue.of("Query on transaction failed with status "
- + status, Issue.Severity.ERROR)));
+ + status, Issue.Severity.ERROR)));
}
}
@@ -517,13 +520,16 @@ public void cancel() {
@Override
public CompletableFuture> commit(CommitTransactionSettings settings) {
- Span span = startSpan("ydb.Commit");
+ final Span commitSpan = rpc.startSpan("ydb.Commit");
+ final long startNanos = System.nanoTime();
+
CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>());
- String transactionId = txId.get();
+ final String transactionId = txId.get();
if (transactionId == null) {
Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING);
Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, issue));
- return Span.endOnResult(span, CompletableFuture.completedFuture(res));
+ SpanFinalizer.finishByStatus(commitSpan, res.getStatus());
+ return CompletableFuture.completedFuture(res);
}
YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder()
@@ -531,7 +537,7 @@ public CompletableFuture> commit(CommitTransactionSettings set
.setTxId(transactionId)
.build();
- return Span.endOnResult(span, rpc.commitTransaction(request, makeOptions(settings, span).build()))
+ return rpc.commitTransaction(request, makeOptions(settings, commitSpan).build())
.thenApply(res -> {
Status status = res.getStatus();
currentStatusFuture.complete(status);
@@ -545,27 +551,37 @@ public CompletableFuture> commit(CommitTransactionSettings set
if (th != null) {
currentStatusFuture.completeExceptionally(
new RuntimeException("Transaction commit failed with exception", th));
+ SpanFinalizer.finishByError(commitSpan, th);
+ return;
}
- }));
+ SpanFinalizer.finishByStatus(commitSpan, status.getStatus());
+ })).whenComplete((status, th) -> { // добавить
+ long elapsed = System.nanoTime() - startNanos;
+ Status finalStatus = status != null ? status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR);
+ rpc.getMeter().recordOperation("ydb.Commit", elapsed, finalStatus);
+ });
}
@Override
public CompletableFuture rollback(RollbackTransactionSettings settings) {
- Span span = startSpan("ydb.Rollback");
+ final Span rollbackSpan = rpc.startSpan("ydb.Rollback");
+ final long startNanos = System.nanoTime();
+
CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>());
- String transactionId = txId.get();
+ final String transactionId = txId.get();
if (transactionId == null) {
Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING);
Status status = Status.of(StatusCode.SUCCESS, issue);
- return Span.endOnStatus(span, CompletableFuture.completedFuture(status));
+ SpanFinalizer.finishByStatus(rollbackSpan, status);
+ return CompletableFuture.completedFuture(status);
}
YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder()
.setSessionId(sessionId)
.setTxId(transactionId)
.build();
- return Span.endOnResult(span, rpc.rollbackTransaction(request, makeOptions(settings, span).build()))
+ return rpc.rollbackTransaction(request, makeOptions(settings, rollbackSpan).build())
.thenApply(result -> {
updateSessionState(result.getStatus());
if (!txId.compareAndSet(transactionId, null)) {
@@ -578,6 +594,15 @@ public CompletableFuture rollback(RollbackTransactionSettings settings)
currentStatusFuture.complete(Status
.of(StatusCode.ABORTED)
.withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR)));
+ if (th != null) {
+ SpanFinalizer.finishByError(rollbackSpan, th);
+ return;
+ }
+ SpanFinalizer.finishByStatus(rollbackSpan, status);
+ }).whenComplete((status, th) -> {
+ long elapsed = System.nanoTime() - startNanos;
+ Status finalStatus = status != null ? status : Status.of(StatusCode.CLIENT_INTERNAL_ERROR);
+ rpc.getMeter().recordOperation("ydb.Rollback", elapsed, finalStatus);
});
}
}
diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java
index ccbbf4889..78ad1fd76 100644
--- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java
+++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java
@@ -22,7 +22,9 @@
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcReadStream;
+import tech.ydb.core.metrics.SessionPoolObserver;
import tech.ydb.core.tracing.Span;
+import tech.ydb.core.tracing.SpanFinalizer;
import tech.ydb.core.utils.FutureTools;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.query.QuerySession;
@@ -78,6 +80,23 @@ class SessionPool implements AutoCloseable {
minSize,
maxSize,
cleaner.periodMillis);
+
+ rpc.getMeter().registerSessionPool("default", new SessionPoolObserver() {
+ @Override
+ public int getIdleCount() {
+ return queue.getIdleCount();
+ }
+
+ @Override
+ public int getUsedCount() {
+ return queue.getUsedCount();
+ }
+
+ @Override
+ public int getPendingCount() {
+ return queue.getPendingCount();
+ }
+ });
}
public void updateMaxSize(int maxSize) {
@@ -277,16 +296,43 @@ public CompletableFuture create() {
Context previous = ctx.attach();
try {
Span createSpan = rpc.startSpan("ydb.CreateSession");
+ long startNanos = System.nanoTime();
+
stats.requested.increment();
- return Span.endOnResult(createSpan, SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan))
+ return SessionImpl
+ .createSession(rpc, CREATE_SETTINGS, true, createSpan)
.thenCompose(r -> {
if (!r.isSuccess()) {
+ SpanFinalizer.finishByStatus(createSpan, r.getStatus());
stats.failed.increment();
throw new UnexpectedResultException("create session problem", r.getStatus());
}
PooledQuerySession session = new PooledQuerySession(rpc, r.getValue());
return session.start();
- }).thenApply(Result::getValue);
+ })
+ .whenComplete((result, th) -> {
+ if (th != null) {
+ Throwable error = FutureTools.unwrapCompletionException(th);
+ if (error instanceof UnexpectedResultException) {
+ SpanFinalizer.finishByStatus(
+ createSpan,
+ ((UnexpectedResultException) error).getStatus()
+ );
+ } else {
+ SpanFinalizer.finishByError(createSpan, error);
+ }
+ return;
+ }
+
+ SpanFinalizer.finishByStatus(createSpan, result.getStatus());
+ })
+ .whenComplete((status, th) -> {
+ long elapsed = System.nanoTime() - startNanos;
+ Status finalStatus = status != null ? status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR);
+ rpc.getMeter().recordOperation("ydb.CreateSession", elapsed, finalStatus);
+ rpc.getMeter().recordSessionCreateTime("default", elapsed);
+ })
+ .thenApply(Result::getValue);
} finally {
ctx.detach(previous);
}
diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java
index 8fde13bc8..f9f1dee21 100644
--- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java
+++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java
@@ -16,7 +16,7 @@
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.tracing.Span;
-import tech.ydb.core.tracing.Tracer;
+import tech.ydb.core.tracing.SpanFinalizer;
import tech.ydb.proto.ValueProtos;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.proto.table.YdbTable;
@@ -61,11 +61,6 @@ public ScheduledExecutorService getScheduler() {
return proxy.getScheduler();
}
- @Override
- public Tracer getTracer() {
- return proxy.getTracer();
- }
-
@Override
public SessionPoolStats sessionPoolStats() {
return proxy.getSessionPoolStats();
@@ -92,9 +87,6 @@ private YdbQuery.TransactionControl mapTxControl(YdbTable.TransactionControl tc)
if (tc.getBeginTx().hasSnapshotReadOnly()) {
return TxControl.txModeCtrl(TxMode.SNAPSHOT_RO, tc.getCommitTx());
}
- if (tc.getBeginTx().hasSnapshotReadWrite()) {
- return TxControl.txModeCtrl(TxMode.SNAPSHOT_RW, tc.getCommitTx());
- }
if (tc.getBeginTx().hasStaleReadOnly()) {
return TxControl.txModeCtrl(TxMode.STALE_RO, tc.getCommitTx());
}
@@ -132,9 +124,10 @@ public CompletableFuture> executeDataQueryInternal(
final List issues = new ArrayList<>();
final List results = new ArrayList<>();
Span span = querySession.startSpan("ydb.ExecuteQuery");
+ final long startNanos = System.nanoTime();
QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span),
- span) {
+ span, startNanos, "ydb.ExecuteQuery") {
@Override
void handleTxMeta(String txID) {
txRef.set(txID);
@@ -205,23 +198,23 @@ public CompletableFuture> beginTransaction(TxMode txMod
}
@Override
- protected CompletableFuture commitTransactionInternal(String txId, CommitTxSettings settings) {
+ public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) {
Span span = querySession.startSpan("ydb.Commit");
CommitTransactionSettings querySettings = CommitTransactionSettings.newBuilder()
.withTraceId(settings.getTraceId())
.withRequestTimeout(settings.getTimeoutDuration())
.build();
- return Span.endOnStatus(span, querySession.commitById(txId, querySettings, span));
+ return querySession.commitById(txId, querySettings, span).whenComplete(SpanFinalizer.whenComplete(span));
}
@Override
- protected CompletableFuture rollbackTransactionInternal(String txId, RollbackTxSettings settings) {
+ public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) {
Span span = querySession.startSpan("ydb.Rollback");
RollbackTransactionSettings querySettings = RollbackTransactionSettings.newBuilder()
.withTraceId(settings.getTraceId())
.withRequestTimeout(settings.getTimeoutDuration())
.build();
- return Span.endOnStatus(span, querySession.rollbackById(txId, querySettings, span));
+ return querySession.rollbackById(txId, querySettings, span).whenComplete(SpanFinalizer.whenComplete(span));
}
private final class TracedTableTransaction implements TableTransaction {
@@ -258,7 +251,7 @@ public CompletableFuture rollback(RollbackTxSettings settings) {
if (txId == null) {
return delegate.rollback(settings);
}
- return rollbackTransactionInternal(txId, settings);
+ return TableSession.this.rollbackTransaction(txId, settings);
}
@Override
diff --git a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java
index 303e69229..34f5f1893 100644
--- a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java
+++ b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java
@@ -4,16 +4,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.annotation.Nullable;
import org.junit.After;
import org.junit.AfterClass;
@@ -27,10 +17,7 @@
import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
-import tech.ydb.core.StatusCode;
-import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcTransport;
-import tech.ydb.core.tracing.Scope;
import tech.ydb.core.tracing.Span;
import tech.ydb.core.tracing.SpanKind;
import tech.ydb.core.tracing.Tracer;
@@ -38,8 +25,6 @@
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryTransaction;
import tech.ydb.query.result.QueryInfo;
-import tech.ydb.query.tools.SessionRetryContext;
-import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.query.DataQueryResult;
@@ -54,7 +39,6 @@ public class QueryTracingTest {
private static GrpcTransport transport;
private static RecordingTracer tracer;
- private static final GrpcTestInterceptor grpcInterceptor = new GrpcTestInterceptor();
private QueryClient queryClient;
private TableClient tableClient;
@@ -64,7 +48,6 @@ public static void initTransport() {
tracer = new RecordingTracer();
transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database())
.withAuthProvider(new TokenAuthProvider(YDB.authToken()))
- .addChannelInitializer(grpcInterceptor)
.withTracer(tracer)
.build();
}
@@ -77,7 +60,6 @@ public static void closeTransport() {
@Before
public void initClient() {
tracer.reset();
- grpcInterceptor.reset();
queryClient = QueryClient.newClient(transport).build();
tableClient = null;
}
@@ -92,22 +74,13 @@ public void closeClient() {
}
}
- private static void awaitTracing() {
- try {
- tracer.awaitAllSpansClosed(Duration.ofSeconds(30));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- Assert.fail(e.toString());
- }
- }
-
@Test
public void createSessionSpanIsRecorded() {
- try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
- Assert.assertNotNull(session.getId());
- awaitTracing();
- Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession"));
+ try (QuerySession ignored = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
+ // no-op
}
+
+ Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession"));
}
@Test
@@ -116,7 +89,6 @@ public void executeQuerySpanIsRecorded() {
session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess();
}
- awaitTracing();
Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery"));
}
@@ -131,7 +103,6 @@ public void commitSpanIsRecordedInQueryTransaction() {
commitResult.getStatus().expectSuccess();
}
- awaitTracing();
Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit"));
}
@@ -146,7 +117,6 @@ public void rollbackSpanIsRecordedInQueryTransaction() {
rollbackStatus.expectSuccess();
}
- awaitTracing();
Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback"));
}
@@ -162,7 +132,6 @@ public void createSessionAndExecuteQuerySpansAreRecordedInTableProxy() {
result.getStatus().expectSuccess();
}
- awaitTracing();
Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession"));
Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery"));
}
@@ -184,406 +153,47 @@ public void executeQuerySpansAreRecordedInTableProxyTransaction() {
rollbackStatus.expectSuccess();
}
- awaitTracing();
Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession"));
Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteQuery"));
Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit"));
Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback"));
}
- @Test
- public void querySpanIsChildOfApplicationSpan() {
- try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
- Span appParent = tracer.startSpan("app.parent", SpanKind.INTERNAL);
- try (Scope ignored = appParent.makeCurrent()) {
- session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess();
- } finally {
- appParent.end();
- }
- }
-
- awaitTracing();
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "app.parent"));
- }
-
- @Test
- public void retrySpanIsParentOfRpcSpans() {
- grpcInterceptor.failExecuteQuery(StatusCodesProtos.StatusIds.StatusCode.BAD_SESSION, 2);
- SessionRetryContext retryContext = SessionRetryContext.create(queryClient)
- .maxRetries(5)
- .backoffSlot(Duration.ofMillis(1))
- .fastBackoffSlot(Duration.ofMillis(1))
- .build();
-
- Span appParent = tracer.startSpan("app.parent.retry", SpanKind.INTERNAL);
- try (Scope ignored = appParent.makeCurrent()) {
- Status status = retryContext.supplyStatus(session ->
- session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus)).join();
- status.expectSuccess();
- } finally {
- appParent.end();
- }
-
- awaitTracing();
- Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try"));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.retry"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try"));
- // First Try has no backoff_ms (no prior sleep); subsequent tries capture the prior sleep duration
- Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms"));
- }
-
- @Test
- public void retryContextRetriesOnCreateSessionFailures() {
- grpcInterceptor.failCreateSession(StatusCodesProtos.StatusIds.StatusCode.ABORTED, 2);
- SessionRetryContext retryContext = SessionRetryContext.create(queryClient)
- .maxRetries(5)
- .backoffSlot(Duration.ofMillis(1))
- .fastBackoffSlot(Duration.ofMillis(1))
- .build();
-
- Span appParent = tracer.startSpan("app.parent.createSession.retry", SpanKind.INTERNAL);
- try (Scope ignored = appParent.makeCurrent()) {
- Status status = retryContext.supplyStatus(session ->
- session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus)).join();
- status.expectSuccess();
- } finally {
- appParent.end();
- }
-
- awaitTracing();
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.createSession.retry"));
- Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try"));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try"));
- Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms"));
- }
-
- @Test
- public void retryContextRetriesOnCommitFailures() {
- grpcInterceptor.failCommit(StatusCodesProtos.StatusIds.StatusCode.BAD_SESSION, 2);
- SessionRetryContext retryContext = SessionRetryContext.create(queryClient)
- .maxRetries(5)
- .backoffSlot(Duration.ofMillis(1))
- .fastBackoffSlot(Duration.ofMillis(1))
- .build();
-
- Span appParent = tracer.startSpan("app.parent.commit.retry", SpanKind.INTERNAL);
- try (Scope ignored = appParent.makeCurrent()) {
- Status status = retryContext.supplyStatus(session -> {
- QueryTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
- return tx.createQuery("SELECT 1").execute()
- .thenCompose(queryResult -> {
- queryResult.getStatus().expectSuccess();
- return tx.commit().thenApply(Result::getStatus);
- });
- }
- ).join();
- status.expectSuccess();
- } finally {
- appParent.end();
- }
-
- awaitTracing();
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.commit.retry"));
- Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try"));
- Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.Try"));
- Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms"));
- }
-
- @Test
- public void tableProxyRetrySpanIsParentOfRpcSpans() {
- AtomicInteger attempt = new AtomicInteger();
- tableClient = QueryClient.newTableClient(transport).build();
- tech.ydb.table.SessionRetryContext retryContext = tech.ydb.table.SessionRetryContext.create(tableClient)
- .maxRetries(5)
- .backoffSlot(Duration.ofMillis(1))
- .fastBackoffSlot(Duration.ofMillis(1))
- .build();
-
- Span appParent = tracer.startSpan("app.parent.table.retry", SpanKind.INTERNAL);
- try (Scope ignored = appParent.makeCurrent()) {
- Status status = retryContext.supplyStatus(session -> {
- if (attempt.getAndIncrement() == 0) {
- return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED));
- }
- TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
- Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join();
- queryResult.getStatus().expectSuccess();
- return tx.commit();
- }).join();
- status.expectSuccess();
- } finally {
- appParent.end();
- }
-
- awaitTracing();
- Assert.assertEquals(2, tracer.countClosedSpan("ydb.Try"));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.table.retry"));
- Assert.assertEquals(2, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry"));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try"));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try"));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.Try"));
- }
-
- @Test
- public void nonRetryableExceptionClosesRetrySpan() {
- SessionRetryContext retryContext = SessionRetryContext.create(queryClient)
- .maxRetries(5)
- .backoffSlot(Duration.ofMillis(1))
- .fastBackoffSlot(Duration.ofMillis(1))
- .build();
-
- RuntimeException thrown;
- try {
- retryContext.supplyStatus(session -> {
- throw new IllegalStateException("boom");
- }).join();
- throw new AssertionError("Exception expected");
- } catch (RuntimeException ex) {
- thrown = ex;
- }
-
- awaitTracing();
- Assert.assertNotNull(thrown.getCause());
- Assert.assertTrue(thrown.getCause() instanceof IllegalStateException);
- Assert.assertEquals(1,
- tracer.countClosedSpanWithErrorType("ydb.RunWithRetry", IllegalStateException.class.getName()));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry"));
- Assert.assertEquals(1, tracer.countClosedSpan("ydb.Try"));
- Assert.assertEquals(1,
- tracer.countClosedSpanWithErrorType("ydb.Try", IllegalStateException.class.getName()));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try"));
- Assert.assertEquals(0, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try"));
- }
-
- @Test
- public void retryableUnexpectedResultExceptionRetriesAndSetsErrorType() {
- AtomicInteger attempt = new AtomicInteger();
- SessionRetryContext retryContext = SessionRetryContext.create(queryClient)
- .maxRetries(5)
- .backoffSlot(Duration.ofMillis(1))
- .fastBackoffSlot(Duration.ofMillis(1))
- .build();
-
- Status status = retryContext.supplyStatus(session -> {
- if (attempt.getAndIncrement() == 0) {
- throw new UnexpectedResultException("retryable", Status.of(StatusCode.OVERLOADED));
- }
- return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus);
- }).join();
- status.expectSuccess();
-
- awaitTracing();
- Assert.assertEquals(1, tracer.countClosedSpan("ydb.RunWithRetry"));
- Assert.assertEquals(2, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry"));
- Assert.assertEquals(2, tracer.countClosedSpan("ydb.Try"));
- Assert.assertEquals(1,
- tracer.countClosedSpanWithErrorType("ydb.Try", UnexpectedResultException.class.getName()));
- Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try"));
- }
-
- @Test
- public void contextCancelClosesRunWithRetrySpan() throws InterruptedException {
- SessionRetryContext retryContext = SessionRetryContext.create(queryClient)
- .maxRetries(5)
- .backoffSlot(Duration.ofMillis(1))
- .fastBackoffSlot(Duration.ofMillis(1))
- .build();
-
- CountDownLatch firstPartReceived = new CountDownLatch(1);
- CountDownLatch unblockReader = new CountDownLatch(1);
-
- CompletableFuture future = retryContext.supplyStatus(session ->
- session.createQuery("SELECT 1; SELECT 2;", TxMode.NONE).execute(part -> {
- firstPartReceived.countDown();
- try {
- unblockReader.await(30, TimeUnit.SECONDS);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }).thenApply(Result::getStatus));
-
- Assert.assertTrue(firstPartReceived.await(30, TimeUnit.SECONDS));
- future.cancel(false);
- unblockReader.countDown();
-
- awaitTracing();
- Assert.assertThrows(CancellationException.class, future::join);
- Assert.assertTrue(future.isCancelled());
- Assert.assertEquals(1,
- tracer.countClosedSpanWithErrorType("ydb.RunWithRetry", CancellationException.class.getName()));
- }
-
private static final class RecordingTracer implements Tracer {
- private final List spans = Collections.synchronizedList(new ArrayList<>());
- private final ThreadLocal currentSpan = new ThreadLocal<>();
-
- private final Object spanLock = new Object();
- private int openSpans = 0;
+ private final List spans = Collections.synchronizedList(new ArrayList());
@Override
public Span startSpan(String spanName, SpanKind spanKind) {
- RecordingSpan span = new RecordingSpan(this, spanName, spanKind, currentSpan.get());
+ RecordingSpan span = new RecordingSpan(spanName, spanKind);
spans.add(span);
return span;
}
- /** Called from {@link RecordingSpan} constructor: one span opened. */
- void recordSpanOpen() {
- synchronized (spanLock) {
- openSpans++;
- }
- }
-
- /** Called from {@link RecordingSpan#end()}: one span closed. */
- void recordSpanClose() {
- synchronized (spanLock) {
- openSpans--;
- if (openSpans == 0) {
- spanLock.notifyAll();
- }
- }
- }
-
void reset() {
- synchronized (spanLock) {
- spans.clear();
- openSpans = 0;
- }
- }
-
- /** Waits until every span that called {@link #recordSpanOpen} has ended ({@code openSpans == 0}). */
- void awaitAllSpansClosed(Duration timeout) throws InterruptedException {
- long deadlineNanos = System.nanoTime() + timeout.toNanos();
- synchronized (spanLock) {
- while (openSpans > 0) {
- long remainingNanos = deadlineNanos - System.nanoTime();
- if (remainingNanos <= 0) {
- Assert.fail("await timeout, openSpans=" + openSpans);
- }
- long waitMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
- if (waitMillis == 0) {
- waitMillis = 1;
- }
- spanLock.wait(waitMillis);
- }
- }
+ spans.clear();
}
int countClosedSpan(String spanName) {
int count = 0;
synchronized (spans) {
for (RecordingSpan span : spans) {
- if (span.spanName().equals(spanName) && span.isClosed()) {
- count++;
- }
- }
- }
- return count;
- }
-
- int countClosedSpanWithParent(String spanName, String parentSpanName) {
- int count = 0;
- synchronized (spans) {
- for (RecordingSpan span : spans) {
- if (span.isClosed()
- && span.spanName().equals(spanName)
- && span.parentSpan() != null
- && span.parentSpan().spanName().equals(parentSpanName)) {
- count++;
- }
- }
- }
- return count;
- }
-
- int countClosedSpanWithErrorType(String spanName, String errorType) {
- int count = 0;
- synchronized (spans) {
- for (RecordingSpan span : spans) {
- Throwable err = span.throwableError();
- if (span.isClosed()
- && span.spanName().equals(spanName)
- && err != null
- && err.getClass().getName().equals(errorType)) {
+ if (span.name.equals(spanName) && span.closed) {
count++;
}
}
}
return count;
}
-
- int countClosedSpanWithAttribute(String spanName, String key) {
- int count = 0;
- synchronized (spans) {
- for (RecordingSpan span : spans) {
- if (span.isClosed()
- && span.spanName().equals(spanName)
- && span.hasLongAttribute(key)) {
- count++;
- }
- }
- }
- return count;
- }
-
- Scope makeSpanCurrent(RecordingSpan span) {
- RecordingSpan previous = currentSpan.get();
- currentSpan.set(span);
- return () -> {
- if (previous == null) {
- currentSpan.remove();
- } else {
- currentSpan.set(previous);
- }
- };
- }
}
private static final class RecordingSpan implements Span {
- private final RecordingTracer tracer;
private final String name;
private final SpanKind kind;
- private final RecordingSpan parent;
- private final ConcurrentMap longAttributes = new ConcurrentHashMap<>();
- private Throwable throwableError;
private volatile boolean closed = false;
- private final AtomicBoolean endedOnce = new AtomicBoolean(false);
- RecordingSpan(RecordingTracer tracer, String name, SpanKind kind, RecordingSpan parent) {
- this.tracer = tracer;
+ RecordingSpan(String name, SpanKind kind) {
this.name = name;
this.kind = kind;
- this.parent = parent;
- tracer.recordSpanOpen();
- }
-
- String spanName() {
- return name;
- }
-
- boolean isClosed() {
- return closed;
- }
-
- @Nullable
- RecordingSpan parentSpan() {
- return parent;
- }
-
- @Nullable
- Throwable throwableError() {
- return throwableError;
- }
-
- boolean hasLongAttribute(String key) {
- return longAttributes.containsKey(key);
}
@Override
@@ -592,37 +202,28 @@ public String getId() {
}
@Override
- public boolean isValid() {
- return true;
- }
-
- @Override
- public Scope makeCurrent() {
- return tracer.makeSpanCurrent(this);
+ public void setAttribute(String key, String value) {
+ // not needed for this test
}
@Override
- public Scope restoreContext() {
- return parent != null ? parent.makeCurrent() : Span.NOOP.makeCurrent();
+ public void setAttribute(String key, long value) {
+ // not needed for this test
}
@Override
- public void setStatus(@Nullable Status status, @Nullable Throwable error) {
- this.throwableError = error;
+ public void setError(Status status) {
+ // not needed for this test
}
@Override
- public void setAttribute(String key, long value) {
- longAttributes.put(key, value);
+ public void setError(Throwable error) {
+ // not needed for this test
}
@Override
public void end() {
- if (!endedOnce.compareAndSet(false, true)) {
- return;
- }
this.closed = true;
- tracer.recordSpanClose();
}
}
}
diff --git a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java
index 596466acb..199d3ae73 100644
--- a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java
+++ b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java
@@ -6,7 +6,6 @@
import tech.ydb.core.Result;
import tech.ydb.core.StatusCode;
-import tech.ydb.core.tracing.Tracer;
import tech.ydb.table.Session;
import tech.ydb.table.SessionSupplier;
import tech.ydb.table.rpc.TableRpc;
@@ -39,11 +38,6 @@ public ScheduledExecutorService getScheduler() {
return tableRpc.getScheduler();
}
- @Override
- public Tracer getTracer() {
- return tableRpc.getTracer();
- }
-
public static Builder newClient(TableRpc rpc) {
return new Builder(rpc);
}