Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import io.grpc.Metadata;

import tech.ydb.core.impl.call.GrpcFlows;
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.grpc.MethodDescriptor;

import tech.ydb.core.Result;
import tech.ydb.core.metrics.NoopMeter;
import tech.ydb.core.metrics.Meter;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.core.utils.URITools;
Expand Down Expand Up @@ -46,6 +48,14 @@ default Tracer getTracer() {
return NoopTracer.getInstance();
}

default Meter getMeter() {
return NoopMeter.INSTANCE;
}

default Meter getMeter() {
return NoopMeter.INSTANCE;
}

@Override
void close();

Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import tech.ydb.core.impl.auth.GrpcAuthRpc;
import tech.ydb.core.impl.pool.ChannelFactoryLoader;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
import tech.ydb.core.metrics.NoopMeter;
import tech.ydb.core.metrics.Meter;
import tech.ydb.core.tracing.NoopTracer;
import tech.ydb.core.tracing.Tracer;
import tech.ydb.core.utils.Version;
Expand Down Expand Up @@ -87,6 +89,7 @@ public enum InitMode {
private GrpcCompression compression = GrpcCompression.NO_COMPRESSION;
private InitMode initMode = InitMode.SYNC;
private Tracer tracer = NoopTracer.getInstance();
private Meter meter = NoopMeter.getInstance();

/**
* can cause leaks https://github.com/grpc/grpc-java/issues/9340
Expand Down Expand Up @@ -195,6 +198,10 @@ public Tracer getTracer() {
return tracer;
}

public Meter getMeter() {
return meter;
}

public ManagedChannelFactory getManagedChannelFactory() {
if (channelFactoryBuilder == null) {
channelFactoryBuilder = ChannelFactoryLoader.load();
Expand Down Expand Up @@ -423,6 +430,11 @@ public GrpcTransportBuilder withTracer(Tracer tracer) {
return this;
}

public GrpcTransportBuilder withMeter(Meter meter) {
this.meter = Objects.requireNonNull(meter, "meter is null");
return this;
}

/**
* use {@link GrpcTransportBuilder#withGrpcRetry(boolean) } instead
* @return this
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.ydb.core.impl.pool.GrpcChannel;
import tech.ydb.core.impl.pool.GrpcChannelPool;
import tech.ydb.core.impl.pool.ManagedChannelFactory;
import tech.ydb.core.metrics.Meter;
import tech.ydb.core.tracing.Tracer;

/**
Expand All @@ -37,6 +38,7 @@ public class YdbTransportImpl extends BaseGrpcTransport {
private final GrpcChannelPool channelPool;
private final YdbDiscovery discovery;
private final Tracer tracer;
private final Meter meter;

public YdbTransportImpl(GrpcTransportBuilder builder) {
super(builder);
Expand All @@ -45,6 +47,7 @@ public YdbTransportImpl(GrpcTransportBuilder builder) {

this.database = Strings.nullToEmpty(builder.getDatabase());
this.tracer = builder.getTracer();
this.meter = builder.getMeter();

logger.info("Create YDB transport with endpoint {} and {}", serverEndpoint, balancingSettings);

Expand Down Expand Up @@ -124,6 +127,11 @@ public Tracer getTracer() {
return tracer;
}

@Override
public Meter getMeter() {
return meter;
}

@Override
public AuthCallOptions getAuthCallOptions() {
return callOptions;
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/tech/ydb/core/metrics/Meter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package tech.ydb.core.metrics;

import tech.ydb.core.Status;

public interface Meter {
void recordOperation(String name, long durationNanos, Status status);
void registerSessionPool(String poolName, SessionPoolObserver observer);
void recordSessionCreateTime(String poolName, long durationNanos);
}
30 changes: 30 additions & 0 deletions core/src/main/java/tech/ydb/core/metrics/NoopMeter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package tech.ydb.core.metrics;

import tech.ydb.core.Status;

public final class NoopMeter implements Meter {
public static final NoopMeter INSTANCE = new NoopMeter();

private NoopMeter() {
// No operations.
}

public static NoopMeter getInstance() {
return INSTANCE;
}

@Override
public void recordOperation(String name, long durationNanos, Status status) {

}

@Override
public void registerSessionPool(String poolName, SessionPoolObserver observer) {

}

@Override
public void recordSessionCreateTime(String poolName, long durationNanos) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tech.ydb.core.metrics;

public interface SessionPoolObserver {
int getIdleCount();
int getUsedCount();
int getPendingCount();
}
106 changes: 12 additions & 94 deletions core/src/main/java/tech/ydb/core/tracing/Span.java
Original file line number Diff line number Diff line change
@@ -1,127 +1,45 @@
package tech.ydb.core.tracing;

import java.util.concurrent.CompletableFuture;

import javax.annotation.Nullable;

import io.grpc.ExperimentalApi;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.utils.FutureTools;

/**
* A span represents a timed operation.
*/
@ExperimentalApi("YDB Tracer is experimental and API may change without notice")
public interface Span {
Span NOOP = new Span() {
};

/**
* Returns W3C traceparent value for request propagation.
*
* <p>For {@link #NOOP} this returns an empty string. Check {@link #isValid()} to decide whether
* trace headers should be sent to server.
*
* @return traceparent value
*/
default String getId() {
return "";
}
String getId();

/**
* Indicates whether this span carries a real tracing context.
*
* @return true for real spans, false for noop span
*/
default boolean isValid() {
return false;
}

/**
* Sets a string attribute on the span.
* Sets a string attribute on the span (ignored by Noop implementation).
*
* @param key attribute key
* @param value attribute value, may be null
*/
default void setAttribute(String key, @Nullable String value) {
}
void setAttribute(String key, @Nullable String value);

/**
* Sets a long attribute on the span.
* Sets a long attribute on the span (ignored by Noop implementation).
*
* @param key attribute key
* @param value attribute value
*/
default void setAttribute(String key, long value) {
}
void setAttribute(String key, long value);

/**
* Sets span status (success or error) with human-readable message.
* Sets span status to error with human-readable message.
*
* @param status operation status used to map span attributes
* @param error operation exception used to map span attributes
* @param status operation status used to map error attributes
*/
default void setStatus(@Nullable Status status, @Nullable Throwable error) {
}
void setError(Status status);

/**
* Makes this span current in the active execution context.
* Sets span status to error from exception.
*
* @return closeable scope handle
* @param error exception used to map error attributes
*/
default Scope makeCurrent() {
return () -> {
};
}
void setError(Throwable error);

/**
* Restores context captured when this span was created.
*
* <p>The returned scope must be closed, usually via try-with-resources.
*
* @return closeable scope handle for restored context
*/
default Scope restoreContext() {
return () -> {
};
}

/**
* Ends (finishes) this span.
*/
default void end() {
}

/**
* Subscribes to a {@link Status} future: when it completes, sets status/error and ends the span.
* For non-valid spans returns the future as-is without subscribing.
*
* @param span the span to finalize
* @param future the future to observe
* @return the same future (for chaining)
*/
static CompletableFuture<Status> endOnStatus(Span span, CompletableFuture<Status> future) {
return span.isValid() ? future.whenComplete((status, th) -> {
span.setStatus(status, FutureTools.unwrapCompletionException(th));
span.end();
}) : future;
}

/**
* Subscribes to a {@link Result} future: when it completes, sets status/error and ends the span.
* For non-valid spans returns the future as-is without subscribing.
*
* @param <T> result value type
* @param span the span to finalize
* @param future the future to observe
* @return the same future (for chaining)
*/
static <T> CompletableFuture<Result<T>> endOnResult(Span span, CompletableFuture<Result<T>> future) {
return span.isValid() ? future.whenComplete((result, th) -> {
span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th));
span.end();
}) : future;
}
void end();
}
52 changes: 52 additions & 0 deletions core/src/main/java/tech/ydb/core/tracing/SpanFinalizer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package tech.ydb.core.tracing;

import java.util.function.BiConsumer;

import tech.ydb.core.Status;
import tech.ydb.core.utils.FutureTools;

/**
* Shared helpers to finish spans for status/exception outcomes.
*/
public final class SpanFinalizer {
private SpanFinalizer() {
}

public static void finishByStatus(Span span, Status status) {
if (span == null) {
return;
}

if (status != null && !status.isSuccess()) {
span.setError(status);
}

span.end();
}

public static void finishByError(Span span, Throwable error) {
if (span == null) {
return;
}

if (error != null) {
span.setError(FutureTools.unwrapCompletionException(error));
}

span.end();
}

public static BiConsumer<Status, Throwable> whenComplete(Span span) {
return (status, error) -> {
if (span == null) {
return;
}

if (error != null) {
finishByError(span, error);
return;
}
finishByStatus(span, status);
};
}
}
Loading
Loading