diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java index e7850d771..344697273 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java @@ -6,6 +6,8 @@ import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import javax.annotation.Nullable; + import io.grpc.Metadata; import tech.ydb.core.impl.call.GrpcFlows; diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java index 96816601f..c4decca36 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java @@ -12,6 +12,8 @@ import io.grpc.MethodDescriptor; import tech.ydb.core.Result; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.NoopMeter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.URITools; @@ -46,6 +48,15 @@ default Tracer getTracer() { return NoopTracer.getInstance(); } + /** + * Returns the meter used to record client metrics; the default is a no-op meter. + * + * @return meter instance + */ + default Meter getMeter() { + return NoopMeter.getInstance(); + } + @Override void close(); diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java index 534cf08be..9d13113b7 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java @@ -26,6 +26,8 @@ import tech.ydb.core.impl.auth.GrpcAuthRpc; import tech.ydb.core.impl.pool.ChannelFactoryLoader; import tech.ydb.core.impl.pool.ManagedChannelFactory; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.NoopMeter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.Version; @@ -87,6 
+89,7 @@ public enum InitMode { private GrpcCompression compression = GrpcCompression.NO_COMPRESSION; private InitMode initMode = InitMode.SYNC; private Tracer tracer = NoopTracer.getInstance(); + private Meter meter = NoopMeter.getInstance(); /** * can cause leaks https://github.com/grpc/grpc-java/issues/9340 @@ -195,6 +198,10 @@ public Tracer getTracer() { return tracer; } + public Meter getMeter() { + return meter; + } + public ManagedChannelFactory getManagedChannelFactory() { if (channelFactoryBuilder == null) { channelFactoryBuilder = ChannelFactoryLoader.load(); @@ -423,6 +430,11 @@ public GrpcTransportBuilder withTracer(Tracer tracer) { return this; } + public GrpcTransportBuilder withMeter(Meter meter) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + return this; + } + /** * use {@link GrpcTransportBuilder#withGrpcRetry(boolean) } instead * @return this diff --git a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java index 52d4ba018..768634801 100644 --- a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java +++ b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java @@ -21,6 +21,7 @@ import tech.ydb.core.impl.pool.GrpcChannel; import tech.ydb.core.impl.pool.GrpcChannelPool; import tech.ydb.core.impl.pool.ManagedChannelFactory; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.Tracer; /** @@ -37,6 +38,7 @@ public class YdbTransportImpl extends BaseGrpcTransport { private final GrpcChannelPool channelPool; private final YdbDiscovery discovery; private final Tracer tracer; + private final Meter meter; public YdbTransportImpl(GrpcTransportBuilder builder) { super(builder); @@ -45,6 +47,7 @@ public YdbTransportImpl(GrpcTransportBuilder builder) { this.database = Strings.nullToEmpty(builder.getDatabase()); this.tracer = builder.getTracer(); + this.meter = builder.getMeter(); logger.info("Create YDB transport with endpoint {} and {}", 
serverEndpoint, balancingSettings); @@ -124,6 +127,11 @@ public Tracer getTracer() { return tracer; } + @Override + public Meter getMeter() { + return meter; + } + @Override public AuthCallOptions getAuthCallOptions() { return callOptions; diff --git a/core/src/main/java/tech/ydb/core/metrics/Meter.java b/core/src/main/java/tech/ydb/core/metrics/Meter.java new file mode 100644 index 000000000..c94913257 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/Meter.java @@ -0,0 +1,9 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public interface Meter { + void recordOperation(String name, long durationNanos, Status status); + void registerSessionPool(String poolName, SessionPoolObserver observer); + void recordSessionCreateTime(String poolName, long durationNanos); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java new file mode 100644 index 000000000..cd126efd5 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java @@ -0,0 +1,30 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public final class NoopMeter implements Meter { + public static final NoopMeter INSTANCE = new NoopMeter(); + + private NoopMeter() { + // No operations. 
+ } + + public static NoopMeter getInstance() { + return INSTANCE; + } + + @Override + public void recordOperation(String name, long durationNanos, Status status) { + + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java new file mode 100644 index 000000000..4b872c6f5 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java @@ -0,0 +1,7 @@ +package tech.ydb.core.metrics; + +public interface SessionPoolObserver { + int getIdleCount(); + int getUsedCount(); + int getPendingCount(); +} diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index b482bc6cb..07d5dd8b0 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -1,127 +1,45 @@ package tech.ydb.core.tracing; -import java.util.concurrent.CompletableFuture; - import javax.annotation.Nullable; -import io.grpc.ExperimentalApi; - -import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.utils.FutureTools; /** * A span represents a timed operation. */ -@ExperimentalApi("YDB Tracer is experimental and API may change without notice") public interface Span { - Span NOOP = new Span() { - }; - /** - * Returns W3C traceparent value for request propagation. - * - *

For {@link #NOOP} this returns an empty string. Check {@link #isValid()} to decide whether - * trace headers should be sent to server. - * - * @return traceparent value - */ - default String getId() { - return ""; - } + String getId(); /** - * Indicates whether this span carries a real tracing context. - * - * @return true for real spans, false for noop span - */ - default boolean isValid() { - return false; - } - - /** - * Sets a string attribute on the span. + * Sets a string attribute on the span (ignored by Noop implementation). * * @param key attribute key * @param value attribute value, may be null */ - default void setAttribute(String key, @Nullable String value) { - } + void setAttribute(String key, @Nullable String value); /** - * Sets a long attribute on the span. + * Sets a long attribute on the span (ignored by Noop implementation). * * @param key attribute key * @param value attribute value */ - default void setAttribute(String key, long value) { - } + void setAttribute(String key, long value); /** - * Sets span status (success or error) with human-readable message. + * Sets span status to error with human-readable message. * - * @param status operation status used to map span attributes - * @param error operation exception used to map span attributes + * @param status operation status used to map error attributes */ - default void setStatus(@Nullable Status status, @Nullable Throwable error) { - } + void setError(Status status); /** - * Makes this span current in the active execution context. + * Sets span status to error from exception. * - * @return closeable scope handle + * @param error exception used to map error attributes */ - default Scope makeCurrent() { - return () -> { - }; - } + void setError(Throwable error); - /** - * Restores context captured when this span was created. - * - *

The returned scope must be closed, usually via try-with-resources. - * - * @return closeable scope handle for restored context - */ - default Scope restoreContext() { - return () -> { - }; - } - - /** - * Ends (finishes) this span. - */ - default void end() { - } - - /** - * Subscribes to a {@link Status} future: when it completes, sets status/error and ends the span. - * For non-valid spans returns the future as-is without subscribing. - * - * @param span the span to finalize - * @param future the future to observe - * @return the same future (for chaining) - */ - static CompletableFuture endOnStatus(Span span, CompletableFuture future) { - return span.isValid() ? future.whenComplete((status, th) -> { - span.setStatus(status, FutureTools.unwrapCompletionException(th)); - span.end(); - }) : future; - } - - /** - * Subscribes to a {@link Result} future: when it completes, sets status/error and ends the span. - * For non-valid spans returns the future as-is without subscribing. - * - * @param result value type - * @param span the span to finalize - * @param future the future to observe - * @return the same future (for chaining) - */ - static CompletableFuture> endOnResult(Span span, CompletableFuture> future) { - return span.isValid() ? future.whenComplete((result, th) -> { - span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th)); - span.end(); - }) : future; - } + void end(); } diff --git a/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java new file mode 100644 index 000000000..b98ba895c --- /dev/null +++ b/core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java @@ -0,0 +1,52 @@ +package tech.ydb.core.tracing; + +import java.util.function.BiConsumer; + +import tech.ydb.core.Status; +import tech.ydb.core.utils.FutureTools; + +/** + * Shared helpers to finish spans for status/exception outcomes. 
+ */ +public final class SpanFinalizer { + private SpanFinalizer() { + } + + public static void finishByStatus(Span span, Status status) { + if (span == null) { + return; + } + + if (status != null && !status.isSuccess()) { + span.setError(status); + } + + span.end(); + } + + public static void finishByError(Span span, Throwable error) { + if (span == null) { + return; + } + + if (error != null) { + span.setError(FutureTools.unwrapCompletionException(error)); + } + + span.end(); + } + + public static BiConsumer whenComplete(Span span) { + return (status, error) -> { + if (span == null) { + return; + } + + if (error != null) { + finishByError(span, error); + return; + } + finishByStatus(span, status); + }; + } +} diff --git a/opentelemetry/pom.xml b/opentelemetry/pom.xml new file mode 100644 index 000000000..df320be1b --- /dev/null +++ b/opentelemetry/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + + tech.ydb + ydb-sdk-parent + 2.4.1-SNAPSHOT + + + ydb-sdk-opentelemetry + OpenTelemetry integration for YDB Java SDK + OpenTelemetry integration module for tracing and metrics in YDB SDK + + + 1.59.0 + + + + + tech.ydb + ydb-sdk-core + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + org.testng + testng + 7.12.0 + test + + + io.opentelemetry + opentelemetry-sdk + 1.59.0 + test + + + junit + junit + test + + + tech.ydb + ydb-sdk-query + test + + + tech.ydb.test + ydb-junit4-support + test + + + io.opentelemetry + opentelemetry-sdk-testing + 1.59.0 + test + + + diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java new file mode 100644 index 000000000..63867688b --- /dev/null +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java @@ -0,0 +1,99 @@ +package tech.ydb.opentelemetry; + +import java.util.Objects; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import 
io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; + +import tech.ydb.core.Status; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.SessionPoolObserver; + +public final class OpenTelemetryMeter implements Meter { + private final io.opentelemetry.api.metrics.Meter meter; + + private final DoubleHistogram operationDuration; // db.client.operation.duration + private final LongCounter operationFailed; // ydb.client.operation.failed + private final DoubleHistogram sessionCreateTime; // ydb.query.session.create_time + + private final Attributes baseAttributes; // db.system.name, db.namespace, server.address, server.port + + private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter, + String database, String host, int port) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + this.operationDuration = meter.histogramBuilder("db.client.operation.duration") + .setUnit("s") + .build(); + this.operationFailed = meter.counterBuilder("ydb.client.operation.failed") + .setUnit("{command}") + .build(); + this.sessionCreateTime = meter.histogramBuilder("ydb.query.session.create_time") + .setUnit("s") + .build(); + + this.baseAttributes = Attributes.of( + AttributeKey.stringKey("db.system.name"), "ydb", + AttributeKey.stringKey("db.namespace"), database, + AttributeKey.stringKey("server.address"), host, + AttributeKey.longKey("server.port"), (long) port + ); + } + + public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry, + String database, String host, int port) { + io.opentelemetry.api.metrics.Meter meter = openTelemetry + .getMeter("tech.ydb.sdk"); + + return new OpenTelemetryMeter(meter, database, host, port); + } + + public static OpenTelemetryMeter createGlobal(String database, String host, int port) { + return fromOpenTelemetry(GlobalOpenTelemetry.get(), database, host, 
port); + } + + @Override + public void recordOperation(String name, long durationNanos, Status status) { + Attributes attrs = baseAttributes.toBuilder() + .put("ydb.operation.name", name) + .build(); + + operationDuration.record(durationNanos / 1_000_000_000.0, attrs); + + if (status != null && !status.isSuccess()) { + Attributes errAttrs = attrs.toBuilder() + .put("db.response.status_code", status.getCode().toString()) + .build(); + operationFailed.add(1, errAttrs); + } + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + meter.upDownCounterBuilder("ydb.query.session.count") + .setUnit("{connection}") + .buildWithCallback(measurement -> { + Attributes idle = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName, + AttributeKey.stringKey("ydb.query.session.state"), "idle" + ); + Attributes used = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName, + AttributeKey.stringKey("ydb.query.session.state"), "used" + ); + measurement.record(observer.getIdleCount(), idle); + measurement.record(observer.getUsedCount(), used); + }); + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + Attributes attrs = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName + ); + sessionCreateTime.record(durationNanos / 1_000_000_000.0, attrs); + } +} diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java new file mode 100644 index 000000000..27665b3df --- /dev/null +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -0,0 +1,93 @@ +package tech.ydb.opentelemetry; + +import java.util.Objects; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.StatusCode; + +import tech.ydb.core.Status; +import 
tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.Tracer; + +public final class OpenTelemetryTracer implements Tracer { + private static final String DEFAULT_SCOPE = "tech.ydb.sdk"; + + private final io.opentelemetry.api.trace.Tracer tracer; + + private OpenTelemetryTracer(io.opentelemetry.api.trace.Tracer tracer) { + this.tracer = Objects.requireNonNull(tracer, "tracer is null"); + } + + public static OpenTelemetryTracer createGlobal() { + return fromOpenTelemetry(GlobalOpenTelemetry.get()); + } + + public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry) { + return fromOpenTelemetry(openTelemetry, DEFAULT_SCOPE); + } + + public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry, String scopeName) { + Objects.requireNonNull(openTelemetry, "openTelemetry is null"); + Objects.requireNonNull(scopeName, "scopeName is null"); + return new OpenTelemetryTracer(openTelemetry.getTracer(scopeName)); + } + + @Override + public Span startSpan(String spanName, SpanKind spanKind) { + io.opentelemetry.api.trace.Span span = tracer.spanBuilder(spanName) + .setSpanKind(mapSpanKind(spanKind)) + .startSpan(); + return new OtelSpan(span); + } + + private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { + if (kind == SpanKind.CLIENT) { + return io.opentelemetry.api.trace.SpanKind.CLIENT; + } + return io.opentelemetry.api.trace.SpanKind.INTERNAL; + } + + private static final class OtelSpan implements Span { + private final io.opentelemetry.api.trace.Span span; + + private OtelSpan(io.opentelemetry.api.trace.Span span) { + this.span = span; + } + + @Override + public String getId() { + return "00-" + span.getSpanContext().getTraceId() + "-" + span.getSpanContext().getSpanId() + "-01"; + } + + @Override + public void setAttribute(String key, String value) { + span.setAttribute(key, value); + } + + @Override + public void setAttribute(String key, long value) { + 
span.setAttribute(key, value); + } + + @Override + public void setError(Status status) { + span.setAttribute("db.response.status_code", status.getCode().toString()); + span.setAttribute("error.type", status.getCode().isTransportError() ? "transport_error" : "ydb_error"); + + span.setStatus(StatusCode.ERROR, status.toString()); + } + + @Override + public void setError(Throwable error) { + span.setAttribute("error.type", error.getClass().getName()); + span.setStatus(StatusCode.ERROR, error.getMessage()); + } + + @Override + public void end() { + span.end(); + } + } +} diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java new file mode 100644 index 000000000..d8501dc59 --- /dev/null +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java @@ -0,0 +1,192 @@ +package tech.ydb.opentelemetry; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.auth.TokenAuthProvider; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryTransaction; +import 
tech.ydb.test.junit4.YdbHelperRule; + +public class OpenTelemetryMetricsIntegrationTest { + @ClassRule + public static final YdbHelperRule YDB = new YdbHelperRule(); + + private static final AttributeKey DB_SYSTEM_NAME = AttributeKey.stringKey("db.system.name"); + private static final AttributeKey DB_NAMESPACE = AttributeKey.stringKey("db.namespace"); + private static final AttributeKey SERVER_ADDRESS = AttributeKey.stringKey("server.address"); + private static final AttributeKey SERVER_PORT = AttributeKey.longKey("server.port"); + private static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("ydb.operation.name"); + + private static InMemoryMetricReader metricReader; + private static SdkMeterProvider meterProvider; + private static GrpcTransport transport; + + private QueryClient queryClient; + + @BeforeClass + public static void initTransport() { + metricReader = InMemoryMetricReader.create(); + meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + + String host = extractHost(YDB.endpoint()); + int port = extractPort(YDB.endpoint()); + + transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) + .withAuthProvider(new TokenAuthProvider(YDB.authToken())) + .withMeter(OpenTelemetryMeter.fromOpenTelemetry(openTelemetry, YDB.database(), host, port)) + .build(); + } + + @AfterClass + public static void closeTransport() throws IOException { + transport.close(); + meterProvider.close(); + metricReader.close(); + } + + @Before + public void initClient() { + queryClient = QueryClient.newClient(transport).build(); + } + + @After + public void closeClient() { + queryClient.close(); + } + + @Test + public void executeQueryRecordsOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", 
TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData metric = findMetric("db.client.operation.duration"); + Assert.assertNotNull("db.client.operation.duration metric not found", metric); + + HistogramPointData point = findHistogramPoint(metric, "ydb.ExecuteQuery"); + Assert.assertNotNull("No histogram point for ydb.ExecuteQuery", point); + Assert.assertTrue("Duration must be > 0", point.getSum() > 0); + Assert.assertEquals("ydb", point.getAttributes().get(DB_SYSTEM_NAME)); + Assert.assertEquals(YDB.database(), point.getAttributes().get(DB_NAMESPACE)); + Assert.assertNotNull(point.getAttributes().get(SERVER_ADDRESS)); + Assert.assertNotNull(point.getAttributes().get(SERVER_PORT)); + } + + @Test + public void commitAndRollbackRecordOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + QueryTransaction txCommit = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txCommit.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txCommit.commit().join().getStatus().expectSuccess(); + + QueryTransaction txRollback = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txRollback.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txRollback.rollback().join().expectSuccess(); + } + + MetricData metric = findMetric("db.client.operation.duration"); + Assert.assertNotNull(metric); + + Assert.assertNotNull("No histogram point for ydb.Commit", + findHistogramPoint(metric, "ydb.Commit")); + Assert.assertNotNull("No histogram point for ydb.Rollback", + findHistogramPoint(metric, "ydb.Rollback")); + } + + @Test + public void failedOperationRecordsFailedCounter() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT * FROM __nonexistent_table__", TxMode.NONE) + .execute().join(); + } + + MetricData metric = 
findMetric("ydb.client.operation.failed"); + Assert.assertNotNull("ydb.client.operation.failed metric not found", metric); + + Collection points = metric.getLongSumData().getPoints(); + Assert.assertFalse("Failed counter must have at least one point", points.isEmpty()); + long total = points.stream().mapToLong(LongPointData::getValue).sum(); + Assert.assertTrue("Failed counter must be > 0", total > 0); + } + + @Test + public void sessionPoolMetricsAreReported() { + // create a session so that the pool becomes active + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData metric = findMetric("ydb.query.session.count"); + Assert.assertNotNull("ydb.query.session.count metric not found", metric); + } + + // --- helpers --- + + private MetricData findMetric(String name) { + Collection metrics = metricReader.collectAllMetrics(); + for (MetricData m : metrics) { + if (name.equals(m.getName())) { + return m; + } + } + return null; + } + + private HistogramPointData findHistogramPoint(MetricData metric, String operationName) { + for (HistogramPointData point : metric.getHistogramData().getPoints()) { + String op = point.getAttributes().get(OPERATION_NAME); + if (operationName.equals(op)) { + return point; + } + } + return null; + } + + private static String extractHost(String endpoint) { + // endpoint looks like grpc://host:port or host:port + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + return colon >= 0 ? 
stripped.substring(0, colon) : stripped; + } + + private static int extractPort(String endpoint) { + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + if (colon >= 0) { + try { + return Integer.parseInt(stripped.substring(colon + 1)); + } catch (NumberFormatException ignored) { // malformed port: fall through to the 2135 fallback below + } + } + return 2135; + } +} diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index 66c2e0a6c..6038c447a 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -6,6 +6,7 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; @@ -51,16 +52,22 @@ class QueryServiceRpc { private final GrpcTransport transport; private final Tracer trace; + private final Meter meter; QueryServiceRpc(GrpcTransport transport) { this.transport = transport; this.trace = transport.getTracer(); + this.meter = transport.getMeter(); } Span startSpan(String spanName) { return trace.startSpan(spanName, SpanKind.CLIENT); } + Meter getMeter() { + return meter; + } + public CompletableFuture> createSession( YdbQuery.CreateSessionRequest request, GrpcRequestSettings settings) { return transport diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index f4c5a4065..141dfc925 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -27,6 +27,7 @@ import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.tracing.Span; +import 
tech.ydb.core.tracing.SpanFinalizer; import tech.ydb.core.utils.URITools; import tech.ydb.core.utils.UpdatableOptional; import tech.ydb.proto.ValueProtos; @@ -144,16 +145,11 @@ GrpcReadStream attach(AttachSessionSettings settings) { YdbQuery.AttachSessionRequest request = YdbQuery.AttachSessionRequest.newBuilder() .setSessionId(sessionId) .build(); - // Execute attachSession call outside current context to avoid cancellation and deadline propagation + // Execute attachSession call outside current context to avoid cancellation and deadline propagation Context ctx = Context.ROOT.fork(); Context previous = ctx.attach(); try { - AtomicBoolean pessimizationHook = new AtomicBoolean(false); - - GrpcRequestSettings grpcSettings = makeOptions(settings) - .withPessimizationHook(pessimizationHook::get) - .disableDeadline() - .build(); + GrpcRequestSettings grpcSettings = makeOptions(settings).disableDeadline().build(); GrpcReadStream origin = rpc.attachSession(request, grpcSettings); return new GrpcReadStream() { @Override @@ -166,19 +162,6 @@ public CompletableFuture start(GrpcReadStream.Observer observer) StatusCode code = StatusCode.fromProto(message.getStatus()); Status status = Status.of(code, Issue.fromPb(message.getIssuesList())); updateSessionState(status); - // The hint is sent by the server with a success status. 
- switch (message.getSessionHintCase()) { - case NODE_SHUTDOWN: - pessimizationHook.set(nodeID != 0); - updateSessionState(Status.of(StatusCode.BAD_SESSION)); - break; - case SESSION_SHUTDOWN: - updateSessionState(Status.of(StatusCode.BAD_SESSION)); - break; - default: - break; - } - observer.onNext(status); }); } @@ -330,8 +313,10 @@ GrpcReadStream createGrpcStream( @Override public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); - Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + Span span = rpc.startSpan("ydb.ExecuteQuery"); + long startNanos = System.nanoTime(); + + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { if (txID != null && !txID.isEmpty()) { @@ -371,17 +356,29 @@ static CompletableFuture> createSession( return rpc.createSession(request, grpcSettingsBuilder.build()).thenApply(result -> { pessimizationHook.set(result.getStatus().getCode() == StatusCode.OVERLOADED); + if (!result.isSuccess()) { + SpanFinalizer.finishByStatus(createSpan, result.getStatus()); + } return CREATE_SESSION.apply(result); }); } abstract class StreamImpl implements QueryStream { private final GrpcReadStream grpcStream; - private final Span span; - - StreamImpl(GrpcReadStream grpcStream, Span operationSpan) { + @Nullable + private final Span operationSpan; + private final long startNanos; + private final String operationName; + + StreamImpl( + GrpcReadStream grpcStream, + @Nullable + Span operationSpan, long startNanos, String operationName + ) { this.grpcStream = grpcStream; - this.span = operationSpan; + this.operationSpan = operationSpan; + this.startNanos = startNanos; + this.operationName = operationName; } abstract void handleTxMeta(String txId); @@ -393,10 +390,10 @@ void 
handleCompletion(Status status, Throwable th) { public CompletableFuture> execute(PartsHandler handler) { final UpdatableOptional operationStatus = new UpdatableOptional<>(); final UpdatableOptional stats = new UpdatableOptional<>(); - return Span.endOnResult(span, grpcStream.start(msg -> { + return grpcStream.start(msg -> { if (isTraceEnabled) { - logger.trace("{} got stream message {}", - SessionImpl.this, TextFormat.shortDebugString(msg)); + logger.trace("{} got stream message {}", SessionImpl.this, + TextFormat.shortDebugString(msg)); } Issue[] issues = Issue.fromPb(msg.getIssuesList()); Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); @@ -431,16 +428,20 @@ public CompletableFuture> execute(PartsHandler handler) { logger.trace("{} lost result set part with index {}", SessionImpl.this, index); } } - }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { + }) + .whenComplete(this::handleCompletion) + .whenComplete(SpanFinalizer.whenComplete(operationSpan)) + .thenApply(streamStatus -> { updateSessionState(streamStatus); Status status = operationStatus.orElse(streamStatus); + long elapsed = System.nanoTime() - startNanos; + rpc.getMeter().recordOperation(operationName, elapsed, status); if (status.isSuccess()) { return Result.success(new QueryInfo(stats.get()), streamStatus); } else { return Result.fail(status); } - }) - ); + }); } @Override @@ -477,8 +478,10 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E ? 
TxControl.txIdCtrl(currentId, commitAtEnd) : TxControl.txModeCtrl(txMode, commitAtEnd); - Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + Span span = rpc.startSpan("ydb.ExecuteQuery"); + long startNanos = System.nanoTime(); + + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { String newId = txID == null || txID.isEmpty() ? null : txID; @@ -501,7 +504,7 @@ void handleCompletion(Status status, Throwable th) { currentStatusFuture.complete(Status .of(StatusCode.ABORTED) .withIssues(Issue.of("Query on transaction failed with status " - + status, Issue.Severity.ERROR))); + + status, Issue.Severity.ERROR))); } } @@ -517,13 +520,16 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { - Span span = startSpan("ydb.Commit"); + final Span commitSpan = rpc.startSpan("ydb.Commit"); + final long startNanos = System.nanoTime(); + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); - String transactionId = txId.get(); + final String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, issue)); - return Span.endOnResult(span, CompletableFuture.completedFuture(res)); + SpanFinalizer.finishByStatus(commitSpan, res.getStatus()); + return CompletableFuture.completedFuture(res); } YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder() @@ -531,7 +537,7 @@ public CompletableFuture> commit(CommitTransactionSettings set .setTxId(transactionId) .build(); - return Span.endOnResult(span, rpc.commitTransaction(request, makeOptions(settings, span).build())) + return rpc.commitTransaction(request, makeOptions(settings, 
commitSpan).build()) .thenApply(res -> { Status status = res.getStatus(); currentStatusFuture.complete(status); @@ -545,27 +551,37 @@ public CompletableFuture> commit(CommitTransactionSettings set if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Transaction commit failed with exception", th)); + SpanFinalizer.finishByError(commitSpan, th); + return; } - })); + SpanFinalizer.finishByStatus(commitSpan, status.getStatus()); + })).whenComplete((status, th) -> { // добавить + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.Commit", elapsed, finalStatus); + }); } @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { - Span span = startSpan("ydb.Rollback"); + final Span rollbackSpan = rpc.startSpan("ydb.Rollback"); + final long startNanos = System.nanoTime(); + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); - String transactionId = txId.get(); + final String transactionId = txId.get(); if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Status status = Status.of(StatusCode.SUCCESS, issue); - return Span.endOnStatus(span, CompletableFuture.completedFuture(status)); + SpanFinalizer.finishByStatus(rollbackSpan, status); + return CompletableFuture.completedFuture(status); } YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder() .setSessionId(sessionId) .setTxId(transactionId) .build(); - return Span.endOnResult(span, rpc.rollbackTransaction(request, makeOptions(settings, span).build())) + return rpc.rollbackTransaction(request, makeOptions(settings, rollbackSpan).build()) .thenApply(result -> { updateSessionState(result.getStatus()); if (!txId.compareAndSet(transactionId, null)) { @@ -578,6 +594,15 @@ public CompletableFuture 
rollback(RollbackTransactionSettings settings) currentStatusFuture.complete(Status .of(StatusCode.ABORTED) .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR))); + if (th != null) { + SpanFinalizer.finishByError(rollbackSpan, th); + return; + } + SpanFinalizer.finishByStatus(rollbackSpan, status); + }).whenComplete((status, th) -> { + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.Rollback", elapsed, finalStatus); }); } } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index ccbbf4889..78ad1fd76 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -22,7 +22,9 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.metrics.SessionPoolObserver; import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanFinalizer; import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.query.YdbQuery; import tech.ydb.query.QuerySession; @@ -78,6 +80,23 @@ class SessionPool implements AutoCloseable { minSize, maxSize, cleaner.periodMillis); + + rpc.getMeter().registerSessionPool("default", new SessionPoolObserver() { + @Override + public int getIdleCount() { + return queue.getIdleCount(); + } + + @Override + public int getUsedCount() { + return queue.getUsedCount(); + } + + @Override + public int getPendingCount() { + return queue.getPendingCount(); + } + }); } public void updateMaxSize(int maxSize) { @@ -277,16 +296,43 @@ public CompletableFuture create() { Context previous = ctx.attach(); try { Span createSpan = rpc.startSpan("ydb.CreateSession"); + long startNanos = System.nanoTime(); + stats.requested.increment(); - return Span.endOnResult(createSpan, 
SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan)) + return SessionImpl + .createSession(rpc, CREATE_SETTINGS, true, createSpan) .thenCompose(r -> { if (!r.isSuccess()) { + SpanFinalizer.finishByStatus(createSpan, r.getStatus()); stats.failed.increment(); throw new UnexpectedResultException("create session problem", r.getStatus()); } PooledQuerySession session = new PooledQuerySession(rpc, r.getValue()); return session.start(); - }).thenApply(Result::getValue); + }) + .whenComplete((result, th) -> { + if (th != null) { + Throwable error = FutureTools.unwrapCompletionException(th); + if (error instanceof UnexpectedResultException) { + SpanFinalizer.finishByStatus( + createSpan, + ((UnexpectedResultException) error).getStatus() + ); + } else { + SpanFinalizer.finishByError(createSpan, error); + } + return; + } + + SpanFinalizer.finishByStatus(createSpan, result.getStatus()); + }) + .whenComplete((status, th) -> { + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? 
status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.CreateSession", elapsed, finalStatus); + rpc.getMeter().recordSessionCreateTime("default", elapsed); + }) + .thenApply(Result::getValue); } finally { ctx.detach(previous); } diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 8fde13bc8..f9f1dee21 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -16,7 +16,7 @@ import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.tracing.Span; -import tech.ydb.core.tracing.Tracer; +import tech.ydb.core.tracing.SpanFinalizer; import tech.ydb.proto.ValueProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.table.YdbTable; @@ -61,11 +61,6 @@ public ScheduledExecutorService getScheduler() { return proxy.getScheduler(); } - @Override - public Tracer getTracer() { - return proxy.getTracer(); - } - @Override public SessionPoolStats sessionPoolStats() { return proxy.getSessionPoolStats(); @@ -92,9 +87,6 @@ private YdbQuery.TransactionControl mapTxControl(YdbTable.TransactionControl tc) if (tc.getBeginTx().hasSnapshotReadOnly()) { return TxControl.txModeCtrl(TxMode.SNAPSHOT_RO, tc.getCommitTx()); } - if (tc.getBeginTx().hasSnapshotReadWrite()) { - return TxControl.txModeCtrl(TxMode.SNAPSHOT_RW, tc.getCommitTx()); - } if (tc.getBeginTx().hasStaleReadOnly()) { return TxControl.txModeCtrl(TxMode.STALE_RO, tc.getCommitTx()); } @@ -132,9 +124,10 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); Span span = querySession.startSpan("ydb.ExecuteQuery"); + final long startNanos = System.nanoTime(); QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span), - span) { + 
span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { txRef.set(txID); @@ -205,23 +198,23 @@ public CompletableFuture> beginTransaction(TxMode txMod } @Override - protected CompletableFuture commitTransactionInternal(String txId, CommitTxSettings settings) { + public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) { Span span = querySession.startSpan("ydb.Commit"); CommitTransactionSettings querySettings = CommitTransactionSettings.newBuilder() .withTraceId(settings.getTraceId()) .withRequestTimeout(settings.getTimeoutDuration()) .build(); - return Span.endOnStatus(span, querySession.commitById(txId, querySettings, span)); + return querySession.commitById(txId, querySettings, span).whenComplete(SpanFinalizer.whenComplete(span)); } @Override - protected CompletableFuture rollbackTransactionInternal(String txId, RollbackTxSettings settings) { + public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) { Span span = querySession.startSpan("ydb.Rollback"); RollbackTransactionSettings querySettings = RollbackTransactionSettings.newBuilder() .withTraceId(settings.getTraceId()) .withRequestTimeout(settings.getTimeoutDuration()) .build(); - return Span.endOnStatus(span, querySession.rollbackById(txId, querySettings, span)); + return querySession.rollbackById(txId, querySettings, span).whenComplete(SpanFinalizer.whenComplete(span)); } private final class TracedTableTransaction implements TableTransaction { @@ -258,7 +251,7 @@ public CompletableFuture rollback(RollbackTxSettings settings) { if (txId == null) { return delegate.rollback(settings); } - return rollbackTransactionInternal(txId, settings); + return TableSession.this.rollbackTransaction(txId, settings); } @Override diff --git a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java index 303e69229..34f5f1893 100644 --- 
a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java @@ -4,16 +4,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.annotation.Nullable; import org.junit.After; import org.junit.AfterClass; @@ -27,10 +17,7 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; -import tech.ydb.core.tracing.Scope; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; import tech.ydb.core.tracing.Tracer; @@ -38,8 +25,6 @@ import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; -import tech.ydb.query.tools.SessionRetryContext; -import tech.ydb.proto.StatusCodesProtos; import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.query.DataQueryResult; @@ -54,7 +39,6 @@ public class QueryTracingTest { private static GrpcTransport transport; private static RecordingTracer tracer; - private static final GrpcTestInterceptor grpcInterceptor = new GrpcTestInterceptor(); private QueryClient queryClient; private TableClient tableClient; @@ -64,7 +48,6 @@ public static void initTransport() { tracer = new RecordingTracer(); transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) .withAuthProvider(new TokenAuthProvider(YDB.authToken())) - .addChannelInitializer(grpcInterceptor) .withTracer(tracer) .build(); } 
@@ -77,7 +60,6 @@ public static void closeTransport() { @Before public void initClient() { tracer.reset(); - grpcInterceptor.reset(); queryClient = QueryClient.newClient(transport).build(); tableClient = null; } @@ -92,22 +74,13 @@ public void closeClient() { } } - private static void awaitTracing() { - try { - tracer.awaitAllSpansClosed(Duration.ofSeconds(30)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - Assert.fail(e.toString()); - } - } - @Test public void createSessionSpanIsRecorded() { - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - Assert.assertNotNull(session.getId()); - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); + try (QuerySession ignored = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + // no-op } + + Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); } @Test @@ -116,7 +89,6 @@ public void executeQuerySpanIsRecorded() { session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); } - awaitTracing(); Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery")); } @@ -131,7 +103,6 @@ public void commitSpanIsRecordedInQueryTransaction() { commitResult.getStatus().expectSuccess(); } - awaitTracing(); Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit")); } @@ -146,7 +117,6 @@ public void rollbackSpanIsRecordedInQueryTransaction() { rollbackStatus.expectSuccess(); } - awaitTracing(); Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); } @@ -162,7 +132,6 @@ public void createSessionAndExecuteQuerySpansAreRecordedInTableProxy() { result.getStatus().expectSuccess(); } - awaitTracing(); Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteQuery")); } @@ -184,406 +153,47 @@ public void executeQuerySpansAreRecordedInTableProxyTransaction() { 
rollbackStatus.expectSuccess(); } - awaitTracing(); Assert.assertEquals(1, tracer.countClosedSpan("ydb.CreateSession")); Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteQuery")); Assert.assertEquals(1, tracer.countClosedSpan("ydb.Commit")); Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); } - @Test - public void querySpanIsChildOfApplicationSpan() { - try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { - Span appParent = tracer.startSpan("app.parent", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); - } finally { - appParent.end(); - } - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "app.parent")); - } - - @Test - public void retrySpanIsParentOfRpcSpans() { - grpcInterceptor.failExecuteQuery(StatusCodesProtos.StatusIds.StatusCode.BAD_SESSION, 2); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> - session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus)).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.retry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - // First Try has no 
backoff_ms (no prior sleep); subsequent tries capture the prior sleep duration - Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms")); - } - - @Test - public void retryContextRetriesOnCreateSessionFailures() { - grpcInterceptor.failCreateSession(StatusCodesProtos.StatusIds.StatusCode.ABORTED, 2); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.createSession.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> - session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus)).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.createSession.retry")); - Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms")); - } - - @Test - public void retryContextRetriesOnCommitFailures() { - grpcInterceptor.failCommit(StatusCodesProtos.StatusIds.StatusCode.BAD_SESSION, 2); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.commit.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> { - QueryTransaction tx = 
session.createNewTransaction(TxMode.SERIALIZABLE_RW); - return tx.createQuery("SELECT 1").execute() - .thenCompose(queryResult -> { - queryResult.getStatus().expectSuccess(); - return tx.commit().thenApply(Result::getStatus); - }); - } - ).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.commit.retry")); - Assert.assertEquals(3, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - Assert.assertEquals(3, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.Try")); - Assert.assertEquals(2, tracer.countClosedSpanWithAttribute("ydb.Try", "ydb.retry.backoff_ms")); - } - - @Test - public void tableProxyRetrySpanIsParentOfRpcSpans() { - AtomicInteger attempt = new AtomicInteger(); - tableClient = QueryClient.newTableClient(transport).build(); - tech.ydb.table.SessionRetryContext retryContext = tech.ydb.table.SessionRetryContext.create(tableClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Span appParent = tracer.startSpan("app.parent.table.retry", SpanKind.INTERNAL); - try (Scope ignored = appParent.makeCurrent()) { - Status status = retryContext.supplyStatus(session -> { - if (attempt.getAndIncrement() == 0) { - return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); - } - TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); - Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); - queryResult.getStatus().expectSuccess(); - return tx.commit(); - }).join(); - status.expectSuccess(); - } finally { - appParent.end(); - } - - awaitTracing(); - 
Assert.assertEquals(2, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.RunWithRetry", "app.parent.table.retry")); - Assert.assertEquals(2, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.Try")); - } - - @Test - public void nonRetryableExceptionClosesRetrySpan() { - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - RuntimeException thrown; - try { - retryContext.supplyStatus(session -> { - throw new IllegalStateException("boom"); - }).join(); - throw new AssertionError("Exception expected"); - } catch (RuntimeException ex) { - thrown = ex; - } - - awaitTracing(); - Assert.assertNotNull(thrown.getCause()); - Assert.assertTrue(thrown.getCause() instanceof IllegalStateException); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.RunWithRetry", IllegalStateException.class.getName())); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.Try", IllegalStateException.class.getName())); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.Try")); - Assert.assertEquals(0, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - } - - @Test - public void retryableUnexpectedResultExceptionRetriesAndSetsErrorType() { - AtomicInteger attempt = new AtomicInteger(); - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - 
.fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - Status status = retryContext.supplyStatus(session -> { - if (attempt.getAndIncrement() == 0) { - throw new UnexpectedResultException("retryable", Status.of(StatusCode.OVERLOADED)); - } - return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); - }).join(); - status.expectSuccess(); - - awaitTracing(); - Assert.assertEquals(1, tracer.countClosedSpan("ydb.RunWithRetry")); - Assert.assertEquals(2, tracer.countClosedSpanWithParent("ydb.Try", "ydb.RunWithRetry")); - Assert.assertEquals(2, tracer.countClosedSpan("ydb.Try")); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.Try", UnexpectedResultException.class.getName())); - Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.Try")); - } - - @Test - public void contextCancelClosesRunWithRetrySpan() throws InterruptedException { - SessionRetryContext retryContext = SessionRetryContext.create(queryClient) - .maxRetries(5) - .backoffSlot(Duration.ofMillis(1)) - .fastBackoffSlot(Duration.ofMillis(1)) - .build(); - - CountDownLatch firstPartReceived = new CountDownLatch(1); - CountDownLatch unblockReader = new CountDownLatch(1); - - CompletableFuture future = retryContext.supplyStatus(session -> - session.createQuery("SELECT 1; SELECT 2;", TxMode.NONE).execute(part -> { - firstPartReceived.countDown(); - try { - unblockReader.await(30, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - }).thenApply(Result::getStatus)); - - Assert.assertTrue(firstPartReceived.await(30, TimeUnit.SECONDS)); - future.cancel(false); - unblockReader.countDown(); - - awaitTracing(); - Assert.assertThrows(CancellationException.class, future::join); - Assert.assertTrue(future.isCancelled()); - Assert.assertEquals(1, - tracer.countClosedSpanWithErrorType("ydb.RunWithRetry", CancellationException.class.getName())); - } - private static final class RecordingTracer 
implements Tracer { - private final List spans = Collections.synchronizedList(new ArrayList<>()); - private final ThreadLocal currentSpan = new ThreadLocal<>(); - - private final Object spanLock = new Object(); - private int openSpans = 0; + private final List spans = Collections.synchronizedList(new ArrayList()); @Override public Span startSpan(String spanName, SpanKind spanKind) { - RecordingSpan span = new RecordingSpan(this, spanName, spanKind, currentSpan.get()); + RecordingSpan span = new RecordingSpan(spanName, spanKind); spans.add(span); return span; } - /** Called from {@link RecordingSpan} constructor: one span opened. */ - void recordSpanOpen() { - synchronized (spanLock) { - openSpans++; - } - } - - /** Called from {@link RecordingSpan#end()}: one span closed. */ - void recordSpanClose() { - synchronized (spanLock) { - openSpans--; - if (openSpans == 0) { - spanLock.notifyAll(); - } - } - } - void reset() { - synchronized (spanLock) { - spans.clear(); - openSpans = 0; - } - } - - /** Waits until every span that called {@link #recordSpanOpen} has ended ({@code openSpans == 0}). 
*/ - void awaitAllSpansClosed(Duration timeout) throws InterruptedException { - long deadlineNanos = System.nanoTime() + timeout.toNanos(); - synchronized (spanLock) { - while (openSpans > 0) { - long remainingNanos = deadlineNanos - System.nanoTime(); - if (remainingNanos <= 0) { - Assert.fail("await timeout, openSpans=" + openSpans); - } - long waitMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); - if (waitMillis == 0) { - waitMillis = 1; - } - spanLock.wait(waitMillis); - } - } + spans.clear(); } int countClosedSpan(String spanName) { int count = 0; synchronized (spans) { for (RecordingSpan span : spans) { - if (span.spanName().equals(spanName) && span.isClosed()) { - count++; - } - } - } - return count; - } - - int countClosedSpanWithParent(String spanName, String parentSpanName) { - int count = 0; - synchronized (spans) { - for (RecordingSpan span : spans) { - if (span.isClosed() - && span.spanName().equals(spanName) - && span.parentSpan() != null - && span.parentSpan().spanName().equals(parentSpanName)) { - count++; - } - } - } - return count; - } - - int countClosedSpanWithErrorType(String spanName, String errorType) { - int count = 0; - synchronized (spans) { - for (RecordingSpan span : spans) { - Throwable err = span.throwableError(); - if (span.isClosed() - && span.spanName().equals(spanName) - && err != null - && err.getClass().getName().equals(errorType)) { + if (span.name.equals(spanName) && span.closed) { count++; } } } return count; } - - int countClosedSpanWithAttribute(String spanName, String key) { - int count = 0; - synchronized (spans) { - for (RecordingSpan span : spans) { - if (span.isClosed() - && span.spanName().equals(spanName) - && span.hasLongAttribute(key)) { - count++; - } - } - } - return count; - } - - Scope makeSpanCurrent(RecordingSpan span) { - RecordingSpan previous = currentSpan.get(); - currentSpan.set(span); - return () -> { - if (previous == null) { - currentSpan.remove(); - } else { - currentSpan.set(previous); - } - 
}; - } } private static final class RecordingSpan implements Span { - private final RecordingTracer tracer; private final String name; private final SpanKind kind; - private final RecordingSpan parent; - private final ConcurrentMap longAttributes = new ConcurrentHashMap<>(); - private Throwable throwableError; private volatile boolean closed = false; - private final AtomicBoolean endedOnce = new AtomicBoolean(false); - RecordingSpan(RecordingTracer tracer, String name, SpanKind kind, RecordingSpan parent) { - this.tracer = tracer; + RecordingSpan(String name, SpanKind kind) { this.name = name; this.kind = kind; - this.parent = parent; - tracer.recordSpanOpen(); - } - - String spanName() { - return name; - } - - boolean isClosed() { - return closed; - } - - @Nullable - RecordingSpan parentSpan() { - return parent; - } - - @Nullable - Throwable throwableError() { - return throwableError; - } - - boolean hasLongAttribute(String key) { - return longAttributes.containsKey(key); } @Override @@ -592,37 +202,28 @@ public String getId() { } @Override - public boolean isValid() { - return true; - } - - @Override - public Scope makeCurrent() { - return tracer.makeSpanCurrent(this); + public void setAttribute(String key, String value) { + // not needed for this test } @Override - public Scope restoreContext() { - return parent != null ? 
parent.makeCurrent() : Span.NOOP.makeCurrent(); + public void setAttribute(String key, long value) { + // not needed for this test } @Override - public void setStatus(@Nullable Status status, @Nullable Throwable error) { - this.throwableError = error; + public void setError(Status status) { + // not needed for this test } @Override - public void setAttribute(String key, long value) { - longAttributes.put(key, value); + public void setError(Throwable error) { + // not needed for this test } @Override public void end() { - if (!endedOnce.compareAndSet(false, true)) { - return; - } this.closed = true; - tracer.recordSpanClose(); } } } diff --git a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java index 596466acb..199d3ae73 100644 --- a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java +++ b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java @@ -6,7 +6,6 @@ import tech.ydb.core.Result; import tech.ydb.core.StatusCode; -import tech.ydb.core.tracing.Tracer; import tech.ydb.table.Session; import tech.ydb.table.SessionSupplier; import tech.ydb.table.rpc.TableRpc; @@ -39,11 +38,6 @@ public ScheduledExecutorService getScheduler() { return tableRpc.getScheduler(); } - @Override - public Tracer getTracer() { - return tableRpc.getTracer(); - } - public static Builder newClient(TableRpc rpc) { return new Builder(rpc); }