diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java index 96816601f..fba887dee 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java @@ -12,6 +12,8 @@ import io.grpc.MethodDescriptor; import tech.ydb.core.Result; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.NoopMeter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.URITools; @@ -46,6 +48,10 @@ default Tracer getTracer() { return NoopTracer.getInstance(); } + default Meter getMeter() { + return NoopMeter.getInstance(); + } + @Override void close(); @@ -86,7 +92,7 @@ static GrpcTransportBuilder forConnectionString(String connectionString) { scheme = uri.getScheme(); } catch (URISyntaxException | RuntimeException e) { throw new IllegalArgumentException("Failed to parse connection string '" + connectionString + - "'. Expected format: [://][:]/?database=", e); + "'. Expected format: [://][:]/?database=", e); } GrpcTransportBuilder builder = new GrpcTransportBuilder(endpoint, null, database); if (scheme.equals("grpcs")) { diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java index 534cf08be..a77cab841 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java @@ -26,6 +26,8 @@ import tech.ydb.core.impl.auth.GrpcAuthRpc; import tech.ydb.core.impl.pool.ChannelFactoryLoader; import tech.ydb.core.impl.pool.ManagedChannelFactory; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.NoopMeter; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.Version; @@ -87,6 +89,7 @@ public enum InitMode { private GrpcCompression compression = GrpcCompression.NO_COMPRESSION; private InitMode initMode = InitMode.SYNC; private Tracer tracer = NoopTracer.getInstance(); + private Meter meter = NoopMeter.getInstance(); /** * can cause leaks https://github.com/grpc/grpc-java/issues/9340 @@ -195,6 +198,10 @@ public Tracer getTracer() { return tracer; } + public Meter getMeter() { + return meter; + } + public ManagedChannelFactory getManagedChannelFactory() { if (channelFactoryBuilder == null) { channelFactoryBuilder = ChannelFactoryLoader.load(); @@ -423,6 +430,11 @@ public GrpcTransportBuilder withTracer(Tracer tracer) { return this; } + public GrpcTransportBuilder withMeter(Meter meter) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + return this; + } + /** * use {@link GrpcTransportBuilder#withGrpcRetry(boolean) } instead * @return this diff --git a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java index 52d4ba018..768634801 100644 --- a/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java +++ b/core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java @@ -21,6 +21,7 @@ import tech.ydb.core.impl.pool.GrpcChannel; import tech.ydb.core.impl.pool.GrpcChannelPool; import tech.ydb.core.impl.pool.ManagedChannelFactory; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.tracing.Tracer; /** @@ -37,6 +38,7 @@ public class YdbTransportImpl extends BaseGrpcTransport { private final GrpcChannelPool channelPool; private final YdbDiscovery discovery; private final Tracer tracer; + private final Meter meter; public YdbTransportImpl(GrpcTransportBuilder builder) { super(builder); @@ -45,6 +47,7 @@ public YdbTransportImpl(GrpcTransportBuilder builder) { this.database = Strings.nullToEmpty(builder.getDatabase()); this.tracer = builder.getTracer(); + this.meter = builder.getMeter(); logger.info("Create YDB transport with endpoint {} and {}", serverEndpoint, balancingSettings); @@ -124,6 +127,11 @@ public Tracer getTracer() { return tracer; } + @Override + public Meter getMeter() { + return meter; + } + @Override public AuthCallOptions getAuthCallOptions() { return callOptions; diff --git a/core/src/main/java/tech/ydb/core/metrics/Meter.java b/core/src/main/java/tech/ydb/core/metrics/Meter.java new file mode 100644 index 000000000..c94913257 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/Meter.java @@ -0,0 +1,9 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public interface Meter { + void recordOperation(String name, long durationNanos, Status status); + void registerSessionPool(String poolName, SessionPoolObserver observer); + void recordSessionCreateTime(String poolName, long durationNanos); +} diff --git a/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java new file mode 100644 index 000000000..cd126efd5 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/NoopMeter.java @@ -0,0 +1,30 @@ +package tech.ydb.core.metrics; + +import tech.ydb.core.Status; + +public final class NoopMeter implements Meter { + public static final NoopMeter INSTANCE = new NoopMeter(); + + private NoopMeter() { + // No operations. + } + + public static NoopMeter getInstance() { + return INSTANCE; + } + + @Override + public void recordOperation(String name, long durationNanos, Status status) { + + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + + } +} diff --git a/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java new file mode 100644 index 000000000..4b872c6f5 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/metrics/SessionPoolObserver.java @@ -0,0 +1,7 @@ +package tech.ydb.core.metrics; + +public interface SessionPoolObserver { + int getIdleCount(); + int getUsedCount(); + int getPendingCount(); +} diff --git a/opentelemetry/pom.xml b/opentelemetry/pom.xml new file mode 100644 index 000000000..e28adb887 --- /dev/null +++ b/opentelemetry/pom.xml @@ -0,0 +1,56 @@ + + 4.0.0 + + tech.ydb + ydb-sdk-parent + 2.4.1-SNAPSHOT + + opentelemetry + Archetype - opentelemetry + http://maven.apache.org + + + tech.ydb + ydb-sdk-core + + + io.opentelemetry + opentelemetry-api + 1.59.0 + compile + + + junit + junit + test + + + tech.ydb + ydb-sdk-common + test + + + tech.ydb + ydb-sdk-query + test + + + io.opentelemetry + opentelemetry-sdk + 1.59.0 + test + + + tech.ydb.test + ydb-junit4-support + test + + + io.opentelemetry + opentelemetry-sdk-testing + 1.59.0 + test + + + diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java new file mode 100644 index 000000000..63867688b --- /dev/null +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryMeter.java @@ -0,0 +1,99 @@ +package tech.ydb.opentelemetry; + +import java.util.Objects; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; + +import tech.ydb.core.Status; +import tech.ydb.core.metrics.Meter; +import tech.ydb.core.metrics.SessionPoolObserver; + +public final class OpenTelemetryMeter implements Meter { + private final io.opentelemetry.api.metrics.Meter meter; + + private final DoubleHistogram operationDuration; // db.client.operation.duration + private final LongCounter operationFailed; // ydb.client.operation.failed + private final DoubleHistogram sessionCreateTime; // ydb.query.session.create_time + + private final Attributes baseAttributes; // db.system.name, db.namespace, server.address, server.port + + private OpenTelemetryMeter(io.opentelemetry.api.metrics.Meter meter, + String database, String host, int port) { + this.meter = Objects.requireNonNull(meter, "meter is null"); + this.operationDuration = meter.histogramBuilder("db.client.operation.duration") + .setUnit("s") + .build(); + this.operationFailed = meter.counterBuilder("ydb.client.operation.failed") + .setUnit("{command}") + .build(); + this.sessionCreateTime = meter.histogramBuilder("ydb.query.session.create_time") + .setUnit("s") + .build(); + + this.baseAttributes = Attributes.of( + AttributeKey.stringKey("db.system.name"), "ydb", + AttributeKey.stringKey("db.namespace"), database, + AttributeKey.stringKey("server.address"), host, + AttributeKey.longKey("server.port"), (long) port + ); + } + + public static OpenTelemetryMeter fromOpenTelemetry(OpenTelemetry openTelemetry, + String database, String host, int port) { + io.opentelemetry.api.metrics.Meter meter = openTelemetry + .getMeter("tech.ydb.sdk"); + + return new OpenTelemetryMeter(meter, database, host, port); + } + + public static OpenTelemetryMeter createGlobal(String database, String host, int port) { + return fromOpenTelemetry(GlobalOpenTelemetry.get(), database, host, port); + } + + @Override + public void recordOperation(String name, long durationNanos, Status status) { + Attributes attrs = baseAttributes.toBuilder() + .put("ydb.operation.name", name) + .build(); + + operationDuration.record(durationNanos / 1_000_000_000.0, attrs); + + if (status != null && !status.isSuccess()) { + Attributes errAttrs = attrs.toBuilder() + .put("db.response.status_code", status.getCode().toString()) + .build(); + operationFailed.add(1, errAttrs); + } + } + + @Override + public void registerSessionPool(String poolName, SessionPoolObserver observer) { + meter.upDownCounterBuilder("ydb.query.session.count") + .setUnit("{connection}") + .buildWithCallback(measurement -> { + Attributes idle = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName, + AttributeKey.stringKey("ydb.query.session.state"), "idle" + ); + Attributes used = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName, + AttributeKey.stringKey("ydb.query.session.state"), "used" + ); + measurement.record(observer.getIdleCount(), idle); + measurement.record(observer.getUsedCount(), used); + }); + } + + @Override + public void recordSessionCreateTime(String poolName, long durationNanos) { + Attributes attrs = Attributes.of( + AttributeKey.stringKey("ydb.query.session.pool.name"), poolName + ); + sessionCreateTime.record(durationNanos / 1_000_000_000.0, attrs); + } +} diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java new file mode 100644 index 000000000..82b8ebd4a --- /dev/null +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -0,0 +1,79 @@ +package tech.ydb.opentelemetry; + +import java.util.Objects; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.StatusCode; + +import tech.ydb.core.Status; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.Tracer; + +public final class OpenTelemetryTracer implements Tracer { + private static final String DEFAULT_SCOPE = "tech.ydb.sdk"; + + private final io.opentelemetry.api.trace.Tracer tracer; + + private OpenTelemetryTracer(io.opentelemetry.api.trace.Tracer tracer) { + this.tracer = Objects.requireNonNull(tracer, "tracer is null"); + } + + public static OpenTelemetryTracer createGlobal() { + return fromOpenTelemetry(GlobalOpenTelemetry.get()); + } + + public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry) { + return fromOpenTelemetry(openTelemetry, DEFAULT_SCOPE); + } + + public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry, String scopeName) { + Objects.requireNonNull(openTelemetry, "openTelemetry is null"); + Objects.requireNonNull(scopeName, "scopeName is null"); + return new OpenTelemetryTracer(openTelemetry.getTracer(scopeName)); + } + + @Override + public Span startSpan(String spanName, SpanKind spanKind) { + io.opentelemetry.api.trace.Span span = tracer.spanBuilder(spanName) + .setSpanKind(mapSpanKind(spanKind)) + .startSpan(); + return new OtelSpan(span); + } + + private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { + if (kind == SpanKind.CLIENT) { + return io.opentelemetry.api.trace.SpanKind.CLIENT; + } + return io.opentelemetry.api.trace.SpanKind.INTERNAL; + } + + private static final class OtelSpan implements Span { + private final io.opentelemetry.api.trace.Span span; + + private OtelSpan(io.opentelemetry.api.trace.Span span) { + this.span = span; + } + + @Override + public String getId() { + return "00-" + span.getSpanContext().getTraceId() + "-" + span.getSpanContext().getSpanId() + "-01"; + } + + @Override + public void setAttribute(String key, String value) { + span.setAttribute(key, value); + } + + @Override + public void setAttribute(String key, long value) { + span.setAttribute(key, value); + } + + @Override + public void end() { + span.end(); + } + } +} diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java new file mode 100644 index 000000000..dc4d5c736 --- /dev/null +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryMetricsIntegrationTest.java @@ -0,0 +1,192 @@ +package tech.ydb.opentelemetry; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.auth.TokenAuthProvider; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryTransaction; +import tech.ydb.test.junit4.YdbHelperRule; + +public class OpenTelemetryMetricsIntegrationTest { + @ClassRule + public static final YdbHelperRule YDB = new YdbHelperRule(); + + private static final AttributeKey DB_SYSTEM_NAME = AttributeKey.stringKey("db.system.name"); + private static final AttributeKey DB_NAMESPACE = AttributeKey.stringKey("db.namespace"); + private static final AttributeKey SERVER_ADDRESS = AttributeKey.stringKey("server.address"); + private static final AttributeKey SERVER_PORT = AttributeKey.longKey("server.port"); + private static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("ydb.operation.name"); + + private static InMemoryMetricReader metricReader; + private static SdkMeterProvider meterProvider; + private static GrpcTransport transport; + + private QueryClient queryClient; + + @BeforeClass + public static void initTransport() { + metricReader = InMemoryMetricReader.create(); + meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + + String host = extractHost(YDB.endpoint()); + int port = extractPort(YDB.endpoint()); + + transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) + .withAuthProvider(new TokenAuthProvider(YDB.authToken())) + .withMeter(OpenTelemetryMeter.fromOpenTelemetry(openTelemetry, YDB.database(), host, port)) + .build(); + } + + @AfterClass + public static void closeTransport() throws IOException { + transport.close(); + meterProvider.close(); + metricReader.close(); + } + + @Before + public void initClient() { + queryClient = QueryClient.newClient(transport).build(); + } + + @After + public void closeClient() { + queryClient.close(); + } + + @Test + public void executeQueryRecordsOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData metric = findMetric("db.client.operation.duration"); + Assert.assertNotNull("db.client.operation.duration metric not found", metric); + + HistogramPointData point = findHistogramPoint(metric, "ydb.ExecuteQuery"); + Assert.assertNotNull("No histogram point for ydb.ExecuteQuery", point); + Assert.assertTrue("Duration must be > 0", point.getSum() > 0); + Assert.assertEquals("ydb", point.getAttributes().get(DB_SYSTEM_NAME)); + Assert.assertEquals(YDB.database(), point.getAttributes().get(DB_NAMESPACE)); + Assert.assertNotNull(point.getAttributes().get(SERVER_ADDRESS)); + Assert.assertNotNull(point.getAttributes().get(SERVER_PORT)); + } + + @Test + public void commitAndRollbackRecordOperationDuration() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + QueryTransaction txCommit = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txCommit.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txCommit.commit().join().getStatus().expectSuccess(); + + QueryTransaction txRollback = session.beginTransaction(TxMode.SERIALIZABLE_RW) + .join().getValue(); + txRollback.createQuery("SELECT 1").execute().join().getStatus().expectSuccess(); + txRollback.rollback().join().expectSuccess(); + } + + MetricData metric = findMetric("db.client.operation.duration"); + Assert.assertNotNull(metric); + + Assert.assertNotNull("No histogram point for ydb.Commit", + findHistogramPoint(metric, "ydb.Commit")); + Assert.assertNotNull("No histogram point for ydb.Rollback", + findHistogramPoint(metric, "ydb.Rollback")); + } + + @Test + public void failedOperationRecordsFailedCounter() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT * FROM __nonexistent_table__", TxMode.NONE) + .execute().join(); + } + + MetricData metric = findMetric("ydb.client.operation.failed"); + Assert.assertNotNull("ydb.client.operation.failed metric not found", metric); + + Collection points = metric.getLongSumData().getPoints(); + Assert.assertFalse("Failed counter must have at least one point", points.isEmpty()); + long total = points.stream().mapToLong(LongPointData::getValue).sum(); + Assert.assertTrue("Failed counter must be > 0", total > 0); + } + + @Test + public void sessionPoolMetricsAreReported() { + // создаём сессию чтобы пул оживился + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } + + MetricData metric = findMetric("ydb.query.session.count"); + Assert.assertNotNull("ydb.query.session.count metric not found", metric); + } + + // --- helpers --- + + private MetricData findMetric(String name) { + Collection metrics = metricReader.collectAllMetrics(); + for (MetricData m : metrics) { + if (name.equals(m.getName())) { + return m; + } + } + return null; + } + + private HistogramPointData findHistogramPoint(MetricData metric, String operationName) { + for (HistogramPointData point : metric.getHistogramData().getPoints()) { + String op = point.getAttributes().get(OPERATION_NAME); + if (operationName.equals(op)) { + return point; + } + } + return null; + } + + private static String extractHost(String endpoint) { + // endpoint вида grpc://host:port или host:port + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + return colon >= 0 ? stripped.substring(0, colon) : stripped; + } + + private static int extractPort(String endpoint) { + String stripped = endpoint.replaceFirst("grpcs?://", ""); + int colon = stripped.lastIndexOf(':'); + if (colon >= 0) { + try { + return Integer.parseInt(stripped.substring(colon + 1)); + } catch (NumberFormatException ignored) { + } + } + return 2135; + } +} diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java new file mode 100644 index 000000000..f53528b97 --- /dev/null +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java @@ -0,0 +1,206 @@ +package tech.ydb.opentelemetry; + +import java.time.Duration; +import java.util.List; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; + +import tech.ydb.auth.TokenAuthProvider; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.query.QueryClient; +import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.result.QueryInfo; +import tech.ydb.table.Session; +import tech.ydb.table.TableClient; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.test.junit4.YdbHelperRule; + +public class OpenTelemetryQueryTracingIntegrationTest { + @ClassRule + public static final YdbHelperRule YDB = new YdbHelperRule(); + + private static final AttributeKey DB_SYSTEM_NAME = AttributeKey.stringKey("db.system.name"); + private static final AttributeKey DB_NAMESPACE = AttributeKey.stringKey("db.namespace"); + private static final AttributeKey SERVER_ADDRESS = AttributeKey.stringKey("server.address"); + private static final AttributeKey SERVER_PORT = AttributeKey.longKey("server.port"); + + private static InMemorySpanExporter spanExporter; + private static SdkTracerProvider tracerProvider; + private static Tracer appTracer; + private static GrpcTransport transport; + + private QueryClient queryClient; + private TableClient tableClient; + + @BeforeClass + public static void initTransport() { + spanExporter = InMemorySpanExporter.create(); + tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + appTracer = openTelemetry.getTracer("test.app"); + transport = GrpcTransport.forEndpoint(YDB.endpoint(), YDB.database()) + .withAuthProvider(new TokenAuthProvider(YDB.authToken())) + .withTracer(OpenTelemetryTracer.fromOpenTelemetry(openTelemetry)) + .build(); + } + + @AfterClass + public static void closeTransport() { + transport.close(); + tracerProvider.close(); + spanExporter.close(); + } + + @Before + public void initClients() { + spanExporter.reset(); + queryClient = QueryClient.newClient(transport).build(); + tableClient = null; + } + + @After + public void closeClients() { + if (tableClient != null) { + tableClient.close(); + } + queryClient.close(); + } + + @Test + public void queryClientSpansHaveBaseAttributes() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + + Result txCommit = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); + txCommit.getStatus().expectSuccess(); + Result queryInCommitTx = txCommit.getValue().createQuery("SELECT 1").execute().join(); + queryInCommitTx.getStatus().expectSuccess(); + Result commitResult = txCommit.getValue().commit().join(); + commitResult.getStatus().expectSuccess(); + + Result txRollback = session.beginTransaction(TxMode.SERIALIZABLE_RW).join(); + txRollback.getStatus().expectSuccess(); + Result queryInRollbackTx = txRollback.getValue().createQuery("SELECT 1").execute().join(); + queryInRollbackTx.getStatus().expectSuccess(); + Status rollbackStatus = txRollback.getValue().rollback().join(); + rollbackStatus.expectSuccess(); + } + + assertSpanWithBaseAttributes("ydb.CreateSession"); + assertSpanWithBaseAttributes("ydb.ExecuteQuery"); + assertSpanWithBaseAttributes("ydb.Commit"); + assertSpanWithBaseAttributes("ydb.Rollback"); + Assert.assertEquals(1, countFinishedSpans("ydb.CreateSession")); + Assert.assertEquals(3, countFinishedSpans("ydb.ExecuteQuery")); + Assert.assertEquals(1, countFinishedSpans("ydb.Commit")); + Assert.assertEquals(1, countFinishedSpans("ydb.Rollback")); + } + + @Test + public void tableClientProxySpansHaveBaseAttributes() { + tableClient = QueryClient.newTableClient(transport).build(); + try (Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + TableTransaction txCommit = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryInCommitTx = txCommit.executeDataQuery("SELECT 1", Params.empty()).join(); + queryInCommitTx.getStatus().expectSuccess(); + Status commitStatus = txCommit.commit().join(); + commitStatus.expectSuccess(); + + TableTransaction txRollback = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryInRollbackTx = txRollback.executeDataQuery("SELECT 1", Params.empty()).join(); + queryInRollbackTx.getStatus().expectSuccess(); + Status rollbackStatus = txRollback.rollback().join(); + rollbackStatus.expectSuccess(); + } + + assertSpanWithBaseAttributes("ydb.CreateSession"); + assertSpanWithBaseAttributes("ydb.ExecuteQuery"); + assertSpanWithBaseAttributes("ydb.Commit"); + assertSpanWithBaseAttributes("ydb.Rollback"); + Assert.assertEquals(1, countFinishedSpans("ydb.CreateSession")); + Assert.assertEquals(2, countFinishedSpans("ydb.ExecuteQuery")); + Assert.assertEquals(1, countFinishedSpans("ydb.Commit")); + Assert.assertEquals(1, countFinishedSpans("ydb.Rollback")); + } + + @Test + public void sdkSpanIsChildOfApplicationSpan() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent").startSpan(); + try (Scope ignored = appSpan.makeCurrent()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } finally { + appSpan.end(); + } + + List spans = spanExporter.getFinishedSpanItems(); + SpanData sdkSpan = null; + for (SpanData span : spans) { + if ("ydb.ExecuteQuery".equals(span.getName())) { + sdkSpan = span; + break; + } + } + + Assert.assertNotNull("SDK span not found", sdkSpan); + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), sdkSpan.getTraceId()); + Assert.assertEquals(appSpan.getSpanContext().getSpanId(), sdkSpan.getParentSpanId()); + Assert.assertEquals(1, countFinishedSpans("app.parent")); + Assert.assertEquals(1, countFinishedSpans("ydb.ExecuteQuery")); + } + } + + private int countFinishedSpans(String spanName) { + int count = 0; + for (SpanData span : spanExporter.getFinishedSpanItems()) { + if (spanName.equals(span.getName())) { + count++; + } + } + return count; + } + + private void assertSpanWithBaseAttributes(String spanName) { + List spans = spanExporter.getFinishedSpanItems(); + for (SpanData span : spans) { + if (!spanName.equals(span.getName())) { + continue; + } + Assert.assertEquals(io.opentelemetry.api.trace.SpanKind.CLIENT, span.getKind()); + Assert.assertEquals("ydb", span.getAttributes().get(DB_SYSTEM_NAME)); + Assert.assertEquals(YDB.database(), span.getAttributes().get(DB_NAMESPACE)); + Assert.assertNotNull(span.getAttributes().get(SERVER_ADDRESS)); + Long port = span.getAttributes().get(SERVER_PORT); + Assert.assertNotNull(port); + Assert.assertTrue(port > 0L); + return; + } + Assert.fail("Span not found: " + spanName + ", finished spans: " + spans.size()); + } +} diff --git a/pom.xml b/pom.xml index 904fb7077..45cf77b46 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,7 @@ tests/junit4-support tests/junit5-support auth-providers/oauth2-provider + opentelemetry diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index 66c2e0a6c..2f6c79c12 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -6,6 +6,7 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.metrics.Meter; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; @@ -51,10 +52,16 @@ class QueryServiceRpc { private final GrpcTransport transport; private final Tracer trace; + private final Meter meter; QueryServiceRpc(GrpcTransport transport) { this.transport = transport; this.trace = transport.getTracer(); + this.meter = transport.getMeter(); + } + + Meter getMeter() { + return meter; } Span startSpan(String spanName) { diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index f4c5a4065..665615190 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -331,7 +331,9 @@ GrpcReadStream createGrpcStream( public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { if (txID != null && !txID.isEmpty()) { @@ -378,10 +380,18 @@ static CompletableFuture> createSession( abstract class StreamImpl implements QueryStream { private final GrpcReadStream grpcStream; private final Span span; - - StreamImpl(GrpcReadStream grpcStream, Span operationSpan) { + private final long startNanos; + private final String operationName; + + StreamImpl( + GrpcReadStream grpcStream, + @Nullable + Span operationSpan, long startNanos, String operationName + ) { this.grpcStream = grpcStream; this.span = operationSpan; + this.startNanos = startNanos; + this.operationName = operationName; } abstract void handleTxMeta(String txId); @@ -434,6 +444,10 @@ public CompletableFuture> execute(PartsHandler handler) { }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { updateSessionState(streamStatus); Status status = operationStatus.orElse(streamStatus); + + long elapsed = System.nanoTime() - startNanos; + rpc.getMeter().recordOperation(operationName, elapsed, status); + if (status.isSuccess()) { return Result.success(new QueryInfo(stats.get()), streamStatus); } else { @@ -478,7 +492,9 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E : TxControl.txModeCtrl(txMode, commitAtEnd); Span span = startSpan("ydb.ExecuteQuery"); - return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { + long startNanos = System.nanoTime(); + + return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { String newId = txID == null || txID.isEmpty() ? null : txID; @@ -501,7 +517,7 @@ void handleCompletion(Status status, Throwable th) { currentStatusFuture.complete(Status .of(StatusCode.ABORTED) .withIssues(Issue.of("Query on transaction failed with status " - + status, Issue.Severity.ERROR))); + + status, Issue.Severity.ERROR))); } } @@ -518,6 +534,8 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { Span span = startSpan("ydb.Commit"); + final long startNanos = System.nanoTime(); + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); if (transactionId == null) { @@ -541,7 +559,13 @@ public CompletableFuture> commit(CommitTransactionSettings set } // TODO: CommitTransactionResponse must contain exec_stats return res.map(resp -> new QueryInfo(null)); - }).whenComplete(((status, th) -> { + }) + .whenComplete((status, th) -> { + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.Commit", elapsed, finalStatus); + }) + .whenComplete(((status, th) -> { if (th != null) { currentStatusFuture.completeExceptionally( new RuntimeException("Transaction commit failed with exception", th)); @@ -552,6 +576,8 @@ public CompletableFuture> commit(CommitTransactionSettings set @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { Span span = startSpan("ydb.Rollback"); + final long startNanos = System.nanoTime(); + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); @@ -575,6 +601,10 @@ public CompletableFuture rollback(RollbackTransactionSettings settings) return result.getStatus(); }) .whenComplete((status, th) -> { + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.Rollback", elapsed, finalStatus); + currentStatusFuture.complete(Status .of(StatusCode.ABORTED) .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR))); diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index ccbbf4889..2385eb948 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -22,6 +22,7 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.metrics.SessionPoolObserver; import tech.ydb.core.tracing.Span; import tech.ydb.core.utils.FutureTools; import tech.ydb.proto.query.YdbQuery; @@ -78,6 +79,23 @@ class SessionPool implements AutoCloseable { minSize, maxSize, cleaner.periodMillis); + + rpc.getMeter().registerSessionPool("default", new SessionPoolObserver() { + @Override + public int getIdleCount() { + return queue.getIdleCount(); + } + + @Override + public int getUsedCount() { + return queue.getUsedCount(); + } + + @Override + public int getPendingCount() { + return queue.getPendingCount(); + } + }); } public void updateMaxSize(int maxSize) { @@ -277,8 +295,16 @@ public CompletableFuture create() { Context previous = ctx.attach(); try { Span createSpan = rpc.startSpan("ydb.CreateSession"); + long startNanos = System.nanoTime(); + stats.requested.increment(); return Span.endOnResult(createSpan, SessionImpl.createSession(rpc, CREATE_SETTINGS, true, createSpan)) + .whenComplete((status, th) -> { + long elapsed = System.nanoTime() - startNanos; + Status finalStatus = status != null ? status.getStatus() : Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + rpc.getMeter().recordOperation("ydb.CreateSession", elapsed, finalStatus); + rpc.getMeter().recordSessionCreateTime("default", elapsed); + }) .thenCompose(r -> { if (!r.isSuccess()) { stats.failed.increment(); @@ -400,17 +426,17 @@ public long getDeletedTotal() { @Override public String toString() { return "SessionPoolStats{minSize=" + getMinSize() - + ", maxSize=" + getMaxSize() - + ", idleCount=" + getIdleCount() - + ", acquiredCount=" + getAcquiredCount() - + ", pendingAcquireCount=" + getPendingAcquireCount() - + ", acquiredTotal=" + getAcquiredTotal() - + ", releasedTotal=" + getReleasedTotal() - + ", requestsTotal=" + getRequestedTotal() - + ", createdTotal=" + getCreatedTotal() - + ", failedTotal=" + getFailedTotal() - + ", deletedTotal=" + getDeletedTotal() - + "}"; + + ", maxSize=" + getMaxSize() + + ", idleCount=" + getIdleCount() + + ", acquiredCount=" + getAcquiredCount() + + ", pendingAcquireCount=" + getPendingAcquireCount() + + ", acquiredTotal=" + getAcquiredTotal() + + ", releasedTotal=" + getReleasedTotal() + + ", requestsTotal=" + getRequestedTotal() + + ", createdTotal=" + getCreatedTotal() + + ", failedTotal=" + getFailedTotal() + + ", deletedTotal=" + getDeletedTotal() + + "}"; } } diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 8fde13bc8..39b4c585e 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -132,9 +132,10 @@ public CompletableFuture> executeDataQueryInternal( final List issues = new ArrayList<>(); final List results = new ArrayList<>(); Span span = querySession.startSpan("ydb.ExecuteQuery"); + final long startNanos = System.nanoTime(); QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs, span), - span) { + span, startNanos, "ydb.ExecuteQuery") { @Override void handleTxMeta(String txID) { txRef.set(txID);