From 1ad129d415fa423a446a2e9166686ecbaf52a9dc Mon Sep 17 00:00:00 2001 From: Maria Okorochkova Date: Fri, 24 Apr 2026 11:57:13 +0000 Subject: [PATCH 01/13] Add OpenTelemetry lib and metric-interfaces (#37957) --- .github/last_commit.txt | 2 +- CHANGELOG.md | 2 - include/ydb-cpp-sdk/client/driver/driver.h | 8 + include/ydb-cpp-sdk/client/metrics/metrics.h | 57 +++ include/ydb-cpp-sdk/client/trace/trace.h | 39 ++ include/ydb-cpp-sdk/open_telemetry/metrics.h | 19 + include/ydb-cpp-sdk/open_telemetry/trace.h | 19 + plugins/metrics/otel/metrics.cpp | 171 +++++++++ plugins/trace/otel/trace.cpp | 97 +++++ src/client/driver/driver.cpp | 16 + src/client/impl/internal/common/log_lazy.h | 10 + .../impl/internal/db_driver_state/state.cpp | 2 +- .../grpc_connections/grpc_connections.cpp | 10 + .../grpc_connections/grpc_connections.h | 13 + .../impl/internal/grpc_connections/params.h | 10 + .../impl/internal/internal_client/client.h | 5 + src/client/impl/observability/metrics.cpp | 74 ++++ src/client/impl/observability/metrics.h | 31 ++ src/client/impl/observability/observation.cpp | 35 ++ src/client/impl/observability/observation.h | 31 ++ src/client/impl/observability/span.cpp | 154 ++++++++ src/client/impl/observability/span.h | 35 ++ src/client/impl/stats/stats.h | 142 ++++++- src/client/metrics/metrics.cpp | 1 + src/client/query/client.cpp | 79 +++- src/client/table/impl/table_client.cpp | 54 ++- src/client/table/impl/table_client.h | 21 +- src/client/trace/trace.cpp | 1 + tests/common/fake_metric_registry.h | 135 +++++++ tests/integration/metrics/main.cpp | 313 +++++++++++++++ .../unit/client/observability/metrics_ut.cpp | 355 ++++++++++++++++++ 31 files changed, 1916 insertions(+), 25 deletions(-) create mode 100644 include/ydb-cpp-sdk/client/metrics/metrics.h create mode 100644 include/ydb-cpp-sdk/client/trace/trace.h create mode 100644 include/ydb-cpp-sdk/open_telemetry/metrics.h create mode 100644 include/ydb-cpp-sdk/open_telemetry/trace.h create mode 100644 plugins/metrics/otel/metrics.cpp create mode 100644 plugins/trace/otel/trace.cpp create mode 100644 src/client/impl/internal/common/log_lazy.h create mode 100644 src/client/impl/observability/metrics.cpp create mode 100644 src/client/impl/observability/metrics.h create mode 100644 src/client/impl/observability/observation.cpp create mode 100644 src/client/impl/observability/observation.h create mode 100644 src/client/impl/observability/span.cpp create mode 100644 src/client/impl/observability/span.h create mode 100644 src/client/metrics/metrics.cpp create mode 100644 src/client/trace/trace.cpp create mode 100644 tests/common/fake_metric_registry.h create mode 100644 tests/integration/metrics/main.cpp create mode 100644 tests/unit/client/observability/metrics_ut.cpp diff --git a/.github/last_commit.txt b/.github/last_commit.txt index de305a4bb62..6e5c83dbadc 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -bc2335edd99f161c2f65b36772768babcd9e687c +af5b7b9d6af94c4dd153ffd112d9119bc6ad568f diff --git a/CHANGELOG.md b/CHANGELOG.md index 02797cbad38..91bf0c16430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,3 @@ -## v3.17.0 - * Added support of describe for scheme objects with type 'secret' via new TSecretClient * Added support for METRICS_LEVEL for the CreateTable/AlterTable requests. diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index b6864dcf809..0f1ac3dfca4 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -3,6 +3,8 @@ #include "fwd.h" #include +#include +#include #include #include #include @@ -166,6 +168,12 @@ class TDriverConfig { //! If not set, default executor will be used. TDriverConfig& SetExecutor(std::shared_ptr executor); + //! Set external metrics registry implementation. + TDriverConfig& SetMetricRegistry(std::shared_ptr registry); + + //! Set external trace provider implementation. + TDriverConfig& SetTraceProvider(std::shared_ptr provider); + private: class TImpl; std::shared_ptr Impl_; diff --git a/include/ydb-cpp-sdk/client/metrics/metrics.h b/include/ydb-cpp-sdk/client/metrics/metrics.h new file mode 100644 index 00000000000..5faa930ed50 --- /dev/null +++ b/include/ydb-cpp-sdk/client/metrics/metrics.h @@ -0,0 +1,57 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace NYdb::inline V3::NMetrics { + +using TLabels = std::map; + +class ICounter { +public: + virtual ~ICounter() = default; + virtual void Inc() = 0; +}; + +class IGauge { +public: + virtual ~IGauge() = default; + virtual void Add(double delta) = 0; + virtual void Set(double value) = 0; +}; + +class IHistogram { +public: + virtual ~IHistogram() = default; + virtual void Record(double value) = 0; +}; + +class IMetricRegistry { +public: + virtual ~IMetricRegistry() = default; + + virtual std::shared_ptr Counter( + const std::string& name, + const TLabels& labels = {}, + const std::string& description = {}, + const std::string& unit = {} + ) = 0; + virtual std::shared_ptr Gauge( + const std::string& name, + const TLabels& labels = {}, + const std::string& description = {}, + const std::string& unit = {} + ) = 0; + virtual std::shared_ptr Histogram( + const std::string& name, + const std::vector& buckets, + const TLabels& labels = {}, + const std::string& description = {}, + const std::string& unit = {} + ) = 0; +}; + +} // namespace NYdb::NMetrics diff --git a/include/ydb-cpp-sdk/client/trace/trace.h b/include/ydb-cpp-sdk/client/trace/trace.h new file mode 100644 index 00000000000..117f4220b39 --- /dev/null +++ b/include/ydb-cpp-sdk/client/trace/trace.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include + +namespace NYdb::inline V3::NTrace { + +enum class ESpanKind { + INTERNAL, + SERVER, + CLIENT, + PRODUCER, + CONSUMER +}; + +class ISpan { +public: + virtual ~ISpan() = default; + virtual void End() = 0; + virtual void SetAttribute(const std::string& key, const std::string& value) = 0; + virtual void SetAttribute(const std::string& key, int64_t value) = 0; + virtual void AddEvent(const std::string& name, const std::map& attributes = {}) = 0; +}; + +class ITracer { +public: + virtual ~ITracer() = default; + virtual std::shared_ptr StartSpan(const std::string& name, ESpanKind kind = ESpanKind::INTERNAL) = 0; +}; + +class ITraceProvider { +public: + virtual ~ITraceProvider() = default; + virtual std::shared_ptr GetTracer(const std::string& name) = 0; +}; + +} // namespace NYdb::NTrace diff --git a/include/ydb-cpp-sdk/open_telemetry/metrics.h b/include/ydb-cpp-sdk/open_telemetry/metrics.h new file mode 100644 index 00000000000..e02936cffc8 --- /dev/null +++ b/include/ydb-cpp-sdk/open_telemetry/metrics.h @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace metrics { +class MeterProvider; +} +OPENTELEMETRY_END_NAMESPACE + +namespace NYdb::inline V3::NMetrics { + +std::shared_ptr CreateOtelMetricRegistry( + opentelemetry::nostd::shared_ptr meterProvider); + +} // namespace NYdb::NMetrics diff --git a/include/ydb-cpp-sdk/open_telemetry/trace.h b/include/ydb-cpp-sdk/open_telemetry/trace.h new file mode 100644 index 00000000000..0c891ff9ecc --- /dev/null +++ b/include/ydb-cpp-sdk/open_telemetry/trace.h @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace trace { +class TracerProvider; +} +OPENTELEMETRY_END_NAMESPACE + +namespace NYdb::inline V3::NTrace { + +std::shared_ptr CreateOtelTraceProvider( + opentelemetry::nostd::shared_ptr tracerProvider); + +} // namespace NYdb::NTrace diff --git a/plugins/metrics/otel/metrics.cpp b/plugins/metrics/otel/metrics.cpp new file mode 100644 index 00000000000..22bfa284107 --- /dev/null +++ b/plugins/metrics/otel/metrics.cpp @@ -0,0 +1,171 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace NYdb::inline V3::NMetrics { + +namespace { + +using namespace opentelemetry; + +common::KeyValueIterableView MakeAttributes(const TLabels& labels) { + return common::KeyValueIterableView(labels); +} + +class TOtelCounter : public ICounter { +public: + TOtelCounter(nostd::shared_ptr> counter, const TLabels& labels) + : Counter_(std::move(counter)) + , Labels_(labels) + {} + + void Inc() override { + Counter_->Add(1, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + } + +private: + nostd::shared_ptr> Counter_; + TLabels Labels_; +}; + +class TOtelUpDownCounterGauge : public IGauge { +public: + TOtelUpDownCounterGauge(nostd::shared_ptr> counter, const TLabels& labels) + : Counter_(std::move(counter)) + , Labels_(labels) + {} + + void Add(double delta) override { + Counter_->Add(delta, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + Value_ += delta; + } + + void Set(double value) override { + Counter_->Add(value - Value_, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + Value_ = value; + } + +private: + nostd::shared_ptr> Counter_; + TLabels Labels_; + double Value_ = 0; +}; + +class TOtelHistogram : public IHistogram { +public: + TOtelHistogram(nostd::shared_ptr> histogram, const TLabels& labels) + : Histogram_(std::move(histogram)) + , Labels_(labels) + {} + + void Record(double value) override { + Histogram_->Record(value, MakeAttributes(Labels_), context::RuntimeContext::GetCurrent()); + } + +private: + nostd::shared_ptr> Histogram_; + TLabels Labels_; +}; + +class TOtelMetricRegistry : public IMetricRegistry { +public: + TOtelMetricRegistry(nostd::shared_ptr meterProvider) + : MeterProvider_(std::move(meterProvider)) + , Meter_(MeterProvider_->GetMeter("ydb-cpp-sdk", GetSdkSemver())) + {} + + std::shared_ptr Counter(const std::string& name + , const TLabels& labels + , const std::string& description + , const std::string& unit + ) override { + auto counter = Meter_->CreateUInt64Counter(name, description, unit); + return std::make_shared(std::move(counter), labels); + } + + std::shared_ptr Gauge(const std::string& name + , const TLabels& labels + , const std::string& description + , const std::string& unit + ) override { + auto counter = Meter_->CreateDoubleUpDownCounter(name, description, unit); + return std::make_shared(std::move(counter), labels); + } + + std::shared_ptr Histogram(const std::string& name + , const std::vector& buckets + , const TLabels& labels + , const std::string& description + , const std::string& unit + ) override { + ConfigureHistogramBuckets(name, unit, buckets); + auto histogram = Meter_->CreateDoubleHistogram(name, description, unit); + return std::make_shared(std::move(histogram), labels); + } + +private: + void ConfigureHistogramBuckets(const std::string& name, const std::string& unit, const std::vector& buckets) { + if (buckets.empty()) { + return; + } + + auto* sdkProvider = dynamic_cast(MeterProvider_.get()); + if (!sdkProvider) { + return; + } + + { + std::lock_guard lock(HistogramViewsLock_); + if (!HistogramViews_.insert(name).second) { + return; + } + } + + auto selector = std::make_unique( + sdk::metrics::InstrumentType::kHistogram, + name, + unit + ); + auto meterSelector = std::make_unique( + "ydb-cpp-sdk", + GetSdkSemver(), + std::string{} + ); + + auto histogramConfig = std::make_shared(); + histogramConfig->boundaries_ = buckets; + + auto view = std::make_unique( + std::string{}, + std::string{}, + sdk::metrics::AggregationType::kHistogram, + histogramConfig + ); + + sdkProvider->AddView(std::move(selector), std::move(meterSelector), std::move(view)); + } + + nostd::shared_ptr MeterProvider_; + nostd::shared_ptr Meter_; + std::mutex HistogramViewsLock_; + std::unordered_set HistogramViews_; +}; + +} // namespace + +std::shared_ptr CreateOtelMetricRegistry( + opentelemetry::nostd::shared_ptr meterProvider) +{ + return std::make_shared(std::move(meterProvider)); +} + +} // namespace NYdb::NMetrics diff --git a/plugins/trace/otel/trace.cpp b/plugins/trace/otel/trace.cpp new file mode 100644 index 00000000000..41b1df64793 --- /dev/null +++ b/plugins/trace/otel/trace.cpp @@ -0,0 +1,97 @@ +#include + +#include +#include +#include + +namespace NYdb::inline V3::NTrace { + +namespace { + +using namespace opentelemetry; + +trace::SpanKind MapSpanKind(ESpanKind kind) { + switch (kind) { + case ESpanKind::INTERNAL: return trace::SpanKind::kInternal; + case ESpanKind::SERVER: return trace::SpanKind::kServer; + case ESpanKind::CLIENT: return trace::SpanKind::kClient; + case ESpanKind::PRODUCER: return trace::SpanKind::kProducer; + case ESpanKind::CONSUMER: return trace::SpanKind::kConsumer; + } + return trace::SpanKind::kInternal; +} + +class TOtelSpan : public ISpan { +public: + TOtelSpan(nostd::shared_ptr span) + : Span_(std::move(span)) + {} + + void End() override { + Span_->End(); + } + + void SetAttribute(const std::string& key, const std::string& value) override { + Span_->SetAttribute(key, value); + } + + void SetAttribute(const std::string& key, int64_t value) override { + Span_->SetAttribute(key, value); + } + + void AddEvent(const std::string& name, const std::map& attributes) override { + if (attributes.empty()) { + Span_->AddEvent(name); + } else { + std::vector> attrs; + attrs.reserve(attributes.size()); + for (const auto& [k, v] : attributes) { + attrs.emplace_back(nostd::string_view(k), common::AttributeValue(nostd::string_view(v))); + } + Span_->AddEvent(name, attrs); + } + } + +private: + nostd::shared_ptr Span_; +}; + +class TOtelTracer : public ITracer { +public: + TOtelTracer(nostd::shared_ptr tracer) + : Tracer_(std::move(tracer)) + {} + + std::shared_ptr StartSpan(const std::string& name, ESpanKind kind) override { + trace::StartSpanOptions options; + options.kind = MapSpanKind(kind); + return std::make_shared(Tracer_->StartSpan(name, options)); + } + +private: + nostd::shared_ptr Tracer_; +}; + +class TOtelTraceProvider : public ITraceProvider { +public: + TOtelTraceProvider(nostd::shared_ptr tracerProvider) + : TracerProvider_(std::move(tracerProvider)) + {} + + std::shared_ptr GetTracer(const std::string& name) override { + return std::make_shared(TracerProvider_->GetTracer(name)); + } + +private: + nostd::shared_ptr TracerProvider_; +}; + +} // namespace + +std::shared_ptr CreateOtelTraceProvider( + opentelemetry::nostd::shared_ptr tracerProvider) +{ + return std::make_shared(std::move(tracerProvider)); +} + +} // namespace NYdb::NTrace diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index 84f6188e819..97022547244 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -53,6 +53,8 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t GetMaxMessageSize() const override { return MaxMessageSize; } const TLog& GetLog() const override { return Log; } std::shared_ptr GetExecutor() const override { return Executor; } + std::shared_ptr GetExternalMetricRegistry() const override { return MetricRegistry; } + std::shared_ptr GetTraceProvider() const override { return TraceProvider; } std::string Endpoint; size_t NetworkThreadsNum = 2; @@ -84,6 +86,8 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t MaxMessageSize = 0; TLog Log; // Null by default. std::shared_ptr Executor; + std::shared_ptr MetricRegistry; + std::shared_ptr TraceProvider; }; TDriverConfig::TDriverConfig(const std::string& connectionString) @@ -243,6 +247,16 @@ TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr executor) { return *this; } +TDriverConfig& TDriverConfig::SetMetricRegistry(std::shared_ptr registry) { + Impl_->MetricRegistry = std::move(registry); + return *this; +} + +TDriverConfig& TDriverConfig::SetTraceProvider(std::shared_ptr provider) { + Impl_->TraceProvider = std::move(provider); + return *this; +} + //////////////////////////////////////////////////////////////////////////////// std::shared_ptr CreateInternalInterface(const TDriver connection) { @@ -296,6 +310,8 @@ TDriverConfig TDriver::GetConfig() const { config.SetMaxOutboundMessageSize(Impl_->MaxOutboundMessageSize_); config.SetMaxMessageSize(Impl_->MaxMessageSize_); config.Impl_->Log = Impl_->Log; + config.SetMetricRegistry(Impl_->GetExternalMetricRegistry()); + config.SetTraceProvider(Impl_->GetTraceProvider()); return config; } diff --git a/src/client/impl/internal/common/log_lazy.h b/src/client/impl/internal/common/log_lazy.h new file mode 100644 index 00000000000..0635ef2cc89 --- /dev/null +++ b/src/client/impl/internal/common/log_lazy.h @@ -0,0 +1,10 @@ +#pragma once + +#ifdef LOG_LAZY +#error log macro redefinition +#endif + +#define LOG_LAZY(log, priority, message) \ + if (log.IsOpen() && log.FiltrationLevel() >= priority) { \ + log.Write(priority, message); \ + } diff --git a/src/client/impl/internal/db_driver_state/state.cpp b/src/client/impl/internal/db_driver_state/state.cpp index 9a836fc527a..e41a47869fc 100644 --- a/src/client/impl/internal/db_driver_state/state.cpp +++ b/src/client/impl/internal/db_driver_state/state.cpp @@ -44,7 +44,7 @@ TDbDriverState::TDbDriverState( auto self = shared_from_this(); return client->GetEndpoints(self); }, client) - , StatCollector(database, client->GetMetricRegistry()) + , StatCollector(database, client->GetMetricRegistry(), client->GetExternalMetricRegistry()) , Log(Client->GetLog()) , DiscoveryCompletedPromise(NThreading::NewPromise()) { diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.cpp b/src/client/impl/internal/grpc_connections/grpc_connections.cpp index cd74acc19af..134691ed234 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/internal/grpc_connections/grpc_connections.cpp @@ -169,6 +169,8 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL , ChannelPool_(TcpKeepAliveSettings_, params->GetSocketIdleTimeout(), TcpNoDelay_) #endif + , MetricRegistry_(params->GetExternalMetricRegistry()) + , TraceProvider_(params->GetTraceProvider()) , NetworkThreadsNum_(params->GetNetworkThreadsNum()) , UsePerChannelTcpConnection_(params->GetUsePerChannelTcpConnection()) , GRpcClientLow_(NetworkThreadsNum_) @@ -437,6 +439,14 @@ void TGRpcConnectionsImpl::RegisterExtensionApi(IExtensionApi* api) { ExtensionApis_.emplace_back(api); } +std::shared_ptr TGRpcConnectionsImpl::GetExternalMetricRegistry() const { + return MetricRegistry_; +} + +std::shared_ptr TGRpcConnectionsImpl::GetTraceProvider() const { + return TraceProvider_; +} + void TGRpcConnectionsImpl::SetDiscoveryMutator(IDiscoveryMutatorApi::TMutatorCb&& cb) { std::lock_guard lock(ExtensionsLock_); DiscoveryMutatorCb = std::move(cb); diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.h b/src/client/impl/internal/grpc_connections/grpc_connections.h index 6e73fcba559..9f0859d5076 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/internal/grpc_connections/grpc_connections.h @@ -18,6 +18,14 @@ namespace NYdb::inline V3 { +namespace NMetrics { + class IMetricRegistry; +} // namespace NMetrics + +namespace NTrace { + class ITraceProvider; +} // namespace NTrace + constexpr TDeadline::Duration GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY = std::chrono::seconds(10); constexpr TDeadline::Duration INITIAL_DEFERRED_CALL_DELAY = std::chrono::milliseconds(10); // The delay before first deferred service call constexpr TDeadline::Duration GET_ENDPOINTS_TIMEOUT = std::chrono::seconds(10); // Time wait for ListEndpoints request, after this time we pass error to client @@ -581,6 +589,9 @@ class TGRpcConnectionsImpl ::NMonitoring::TMetricRegistry* GetMetricRegistry() override; void RegisterExtension(IExtension* extension); void RegisterExtensionApi(IExtensionApi* api); + std::shared_ptr GetExternalMetricRegistry() const override; + std::shared_ptr GetTraceProvider() const; + void SetDiscoveryMutator(IDiscoveryMutatorApi::TMutatorCb&& cb); const TLog& GetLog() const override; @@ -716,6 +727,8 @@ class TGRpcConnectionsImpl std::vector> Extensions_; std::vector> ExtensionApis_; + std::shared_ptr MetricRegistry_; + std::shared_ptr TraceProvider_; IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb; diff --git a/src/client/impl/internal/grpc_connections/params.h b/src/client/impl/internal/grpc_connections/params.h index 1112bfdf533..11e41ffad6a 100644 --- a/src/client/impl/internal/grpc_connections/params.h +++ b/src/client/impl/internal/grpc_connections/params.h @@ -11,6 +11,14 @@ namespace NYdb::inline V3 { +namespace NMetrics { + class IMetricRegistry; +} // namespace NMetrics + +namespace NTrace { + class ITraceProvider; +} // namespace NTrace + class IConnectionsParams { public: virtual ~IConnectionsParams() = default; @@ -38,6 +46,8 @@ class IConnectionsParams { virtual uint64_t GetMaxOutboundMessageSize() const = 0; virtual uint64_t GetMaxMessageSize() const = 0; virtual std::shared_ptr GetExecutor() const = 0; + virtual std::shared_ptr GetExternalMetricRegistry() const = 0; + virtual std::shared_ptr GetTraceProvider() const = 0; }; } // namespace NYdb diff --git a/src/client/impl/internal/internal_client/client.h b/src/client/impl/internal/internal_client/client.h index 3e52f984480..406a8b7103c 100644 --- a/src/client/impl/internal/internal_client/client.h +++ b/src/client/impl/internal/internal_client/client.h @@ -14,6 +14,10 @@ namespace NMonitoring { class TMetricRegistry; } +namespace NYdb::inline V3::NMetrics { + class IMetricRegistry; +} + namespace NYdb::inline V3 { class TDbDriverState; @@ -29,6 +33,7 @@ class IInternalClient { virtual TBalancingPolicy::TImpl GetBalancingSettings() const = 0; virtual bool StartStatCollecting(::NMonitoring::IMetricRegistry* sensorsRegistry) = 0; virtual ::NMonitoring::TMetricRegistry* GetMetricRegistry() = 0; + virtual std::shared_ptr GetExternalMetricRegistry() const = 0; virtual const TLog& GetLog() const = 0; }; diff --git a/src/client/impl/observability/metrics.cpp b/src/client/impl/observability/metrics.cpp new file mode 100644 index 00000000000..01a96f2cc58 --- /dev/null +++ b/src/client/impl/observability/metrics.cpp @@ -0,0 +1,74 @@ +#include "metrics.h" + +#include + +#include + +namespace NYdb::inline V3::NObservability { + +namespace { + +void SafeLogRequestMetricsError(TLog& log, const char* message, std::exception_ptr exception) noexcept { + try { + if (!exception) { + LOG_LAZY(log, TLOG_ERR, std::string("TRequestMetrics: ") + message + ": (no active exception)"); + return; + } + try { + std::rethrow_exception(exception); + } catch (const std::exception& e) { + LOG_LAZY(log, TLOG_ERR, std::string("TRequestMetrics: ") + message + ": " + e.what()); + return; + } catch (...) { + } + LOG_LAZY(log, TLOG_ERR, std::string("TRequestMetrics: ") + message + ": (unknown)"); + } catch (...) { + } +} + +} // namespace + +TRequestMetrics::TRequestMetrics(NSdkStats::TStatCollector::TClientOperationStatCollector* operationCollector + , const std::string& requestName + , const TLog& log +) : Collector_(operationCollector) + , RequestName_(requestName) + , Log_(log) +{ + if (!Collector_) { + return; + } + try { + Collector_->IncRequestCount(requestName); + StartTime_ = std::chrono::steady_clock::now(); + } catch (...) { + SafeLogRequestMetricsError(Log_, "failed to initialize metrics", std::current_exception()); + Collector_ = nullptr; + } +} + +TRequestMetrics::~TRequestMetrics() noexcept { + End(EStatus::CLIENT_INTERNAL_ERROR); +} + +void TRequestMetrics::End(EStatus status) noexcept { + if (Ended_) { + return; + } + Ended_ = true; + + if (!Collector_) { + return; + } + + try { + auto elapsed = std::chrono::steady_clock::now() - StartTime_; + double durationSec = std::chrono::duration(elapsed).count(); + Collector_->RecordLatency(RequestName_, durationSec, status); + Collector_->IncErrorCount(RequestName_, status); + } catch (...) { + SafeLogRequestMetricsError(Log_, "failed to record metrics", std::current_exception()); + } +} + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/metrics.h b/src/client/impl/observability/metrics.h new file mode 100644 index 00000000000..07c91a08f30 --- /dev/null +++ b/src/client/impl/observability/metrics.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +#include + +#include +#include + +namespace NYdb::inline V3::NObservability { + +class TRequestMetrics { +public: + TRequestMetrics(NSdkStats::TStatCollector::TClientOperationStatCollector* operationCollector + , const std::string& requestName + , const TLog& log + ); + ~TRequestMetrics() noexcept; + + void End(EStatus status) noexcept; + +private: + NSdkStats::TStatCollector::TClientOperationStatCollector* Collector_ = nullptr; + std::string RequestName_; + std::chrono::steady_clock::time_point StartTime_{}; + bool Ended_ = false; + TLog Log_; +}; + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/observation.cpp b/src/client/impl/observability/observation.cpp new file mode 100644 index 00000000000..7483087bcfd --- /dev/null +++ b/src/client/impl/observability/observation.cpp @@ -0,0 +1,35 @@ +#include "observation.h" + +namespace NYdb::inline V3::NObservability { + +TRequestObservation::TRequestObservation(const std::string& ydbClientType + , NSdkStats::TStatCollector::TClientOperationStatCollector* operationCollector + , std::shared_ptr tracer + , const std::string& operationName + , const std::string& discoveryEndpoint + , const std::string& database + , const TLog& log +) : Span_(std::make_shared(std::move(tracer), operationName, discoveryEndpoint, database, log, ydbClientType)) + , Metrics_(std::make_shared(operationCollector, operationName, log)) +{} + +void TRequestObservation::SetPeerEndpoint(const std::string& endpoint) noexcept { + if (Span_) { + Span_->SetPeerEndpoint(endpoint); + } +} + +void TRequestObservation::End(EStatus status) noexcept { + if (Span_) { + Span_->End(status); + } + if (Metrics_) { + Metrics_->End(status); + } +} + +void TRequestObservation::EndWithClientInternalError() noexcept { + End(EStatus::CLIENT_INTERNAL_ERROR); +} + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/observation.h b/src/client/impl/observability/observation.h new file mode 100644 index 00000000000..544b0c4baff --- /dev/null +++ b/src/client/impl/observability/observation.h @@ -0,0 +1,31 @@ +#pragma once + +#include "metrics.h" +#include "span.h" + +#include +#include + +namespace NYdb::inline V3::NObservability { + +class TRequestObservation { +public: + TRequestObservation(const std::string& ydbClientType + , NSdkStats::TStatCollector::TClientOperationStatCollector* operationCollector + , std::shared_ptr tracer + , const std::string& operationName + , const std::string& discoveryEndpoint + , const std::string& database + , const TLog& log + ); + + void SetPeerEndpoint(const std::string& endpoint) noexcept; + void End(EStatus status) noexcept; + void EndWithClientInternalError() noexcept; + +private: + std::shared_ptr Span_; + std::shared_ptr Metrics_; +}; + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/span.cpp b/src/client/impl/observability/span.cpp new file mode 100644 index 00000000000..46805563fee --- /dev/null +++ b/src/client/impl/observability/span.cpp @@ -0,0 +1,154 @@ +#include "span.h" + +#include + +#include + +#include + +namespace NYdb::inline V3::NObservability { + +namespace { + +constexpr int DefaultGrpcPort = 2135; + +std::string YdbClientApiAttributeValue(const std::string& clientType) noexcept { + return clientType.empty() ? std::string("Unspecified") : clientType; +} + +void ParseEndpoint(const std::string& endpoint, std::string& host, int& port) { + port = DefaultGrpcPort; + + if (endpoint.empty()) { + host = endpoint; + return; + } + + if (endpoint.front() == '[') { + auto bracketEnd = endpoint.find(']'); + if (bracketEnd != std::string::npos) { + host = endpoint.substr(1, bracketEnd - 1); + if (bracketEnd + 2 < endpoint.size() && endpoint[bracketEnd + 1] == ':') { + try { + port = std::stoi(endpoint.substr(bracketEnd + 2)); + } catch (...) {} + } + return; + } + } + + auto pos = endpoint.rfind(':'); + if (pos != std::string::npos) { + host = endpoint.substr(0, pos); + try { + port = std::stoi(endpoint.substr(pos + 1)); + } catch (...) {} + } else { + host = endpoint; + } +} + +void SafeLogRequestSpanError(TLog& log, const char* message, std::exception_ptr exception) noexcept { + try { + if (!exception) { + LOG_LAZY(log, TLOG_ERR, std::string("TRequestSpan: ") + message + ": (no active exception)"); + return; + } + try { + std::rethrow_exception(exception); + } catch (const std::exception& e) { + LOG_LAZY(log, TLOG_ERR, std::string("TRequestSpan: ") + message + ": " + e.what()); + return; + } catch (...) { + } + LOG_LAZY(log, TLOG_ERR, std::string("TRequestSpan: ") + message + ": (unknown)"); + } catch (...) { + } +} + +} // namespace + +TRequestSpan::TRequestSpan(std::shared_ptr tracer + , const std::string& requestName + , const std::string& endpoint + , const std::string& database + , const TLog& log + , const std::string& ydbClientType +) : Log_(log) { + if (!tracer) { + return; + } + + std::string host; + int port; + ParseEndpoint(endpoint, host, port); + + try { + Span_ = tracer->StartSpan(requestName, NTrace::ESpanKind::CLIENT); + if (!Span_) { + return; + } + Span_->SetAttribute("db.system.name", "ydb"); + Span_->SetAttribute("db.namespace", database); + Span_->SetAttribute("db.operation.name", requestName); + Span_->SetAttribute("ydb.client.api", YdbClientApiAttributeValue(ydbClientType)); + Span_->SetAttribute("server.address", host); + Span_->SetAttribute("server.port", static_cast(port)); + } catch (...) { + SafeLogRequestSpanError(Log_, "failed to initialize span", std::current_exception()); + Span_.reset(); + } +} + +TRequestSpan::~TRequestSpan() noexcept { + if (Span_) { + try { + Span_->End(); + } catch (...) { + SafeLogRequestSpanError(Log_, "failed to end span", std::current_exception()); + } + } +} + +void TRequestSpan::SetPeerEndpoint(const std::string& endpoint) noexcept { + if (!Span_ || endpoint.empty()) { + return; + } + try { + std::string host; + int port; + ParseEndpoint(endpoint, host, port); + Span_->SetAttribute("network.peer.address", host); + Span_->SetAttribute("network.peer.port", static_cast(port)); + } catch (...) { + SafeLogRequestSpanError(Log_, "failed to set peer endpoint", std::current_exception()); + } +} + +void TRequestSpan::AddEvent(const std::string& name, const std::map& attributes) noexcept { + if (!Span_) { + return; + } + try { + Span_->AddEvent(name, attributes); + } catch (...) { + SafeLogRequestSpanError(Log_, "failed to add event", std::current_exception()); + } +} + +void TRequestSpan::End(EStatus status) noexcept { + if (Span_) { + try { + Span_->SetAttribute("db.response.status_code", ToString(status)); + if (status != EStatus::SUCCESS) { + Span_->SetAttribute("error.type", ToString(status)); + } + Span_->End(); + } catch (...) { + SafeLogRequestSpanError(Log_, "failed to finalize span", std::current_exception()); + } + Span_.reset(); + } +} + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/span.h b/src/client/impl/observability/span.h new file mode 100644 index 00000000000..dbbde456e63 --- /dev/null +++ b/src/client/impl/observability/span.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +namespace NYdb::inline V3::NObservability { + +class TRequestSpan { +public: + TRequestSpan(std::shared_ptr tracer + , const std::string& requestName + , const std::string& endpoint + , const std::string& database + , const TLog& log + , const std::string& ydbClientType = {} + ); + ~TRequestSpan() noexcept; + + void SetPeerEndpoint(const std::string& endpoint) noexcept; + void AddEvent(const std::string& name, const std::map& attributes = {}) noexcept; + + void End(EStatus status) noexcept; + +private: + TLog Log_; + std::shared_ptr Span_; +}; + +} // namespace NYdb::NObservability diff --git a/src/client/impl/stats/stats.h b/src/client/impl/stats/stats.h index d545764c887..b2e61c0ace8 100644 --- a/src/client/impl/stats/stats.h +++ b/src/client/impl/stats/stats.h @@ -1,17 +1,24 @@ #pragma once #include +#include #include #include #include #include +#include #include +#include namespace NYdb::inline V3 { namespace NSdkStats { +inline std::string YdbClientApiAttributeValue(const std::string& clientType) { + return clientType.empty() ? std::string("Unspecified") : clientType; +} + // works only for case normal (foo_bar) underscore inline std::string UnderscoreToUpperCamel(const std::string& in) { @@ -226,6 +233,121 @@ struct TStatCollector { std::string ClientType_; }; + struct TClientOperationStatCollector { + TClientOperationStatCollector() + : MetricRegistry_() + {} + + TClientOperationStatCollector(::NMonitoring::TMetricRegistry* registry, + const std::string& database, + const std::string& clientType, + std::shared_ptr externalRegistry = {}) + : MetricRegistry_(registry) + , ExternalRegistry_(std::move(externalRegistry)) + , Database_(database) + , ClientType_(clientType) + {} + + void IncRequestCount(const std::string& operationName) { + if (auto registry = MetricRegistry_.Get()) { + registry->Rate({ + {"database", Database_}, + {"ydb_client", ClientType_}, + {"operation", operationName}, + {"sensor", "Request/Operations"} + })->Inc(); + } + if (ExternalRegistry_) { + const std::string clientApi = YdbClientApiAttributeValue(ClientType_); + NMetrics::TLabels labels = { + {"db.system.name", "ydb"}, + {"db.namespace", Database_}, + {"db.operation.name", operationName}, + {"ydb.client.api", clientApi}, + }; + ExternalRegistry_->Counter( + "db.client.operation.requests", + labels, + "Number of database client operations started.", + "{operation}" + )->Inc(); + ExternalRegistry_->Counter( + "db.client.operation.errors", + labels, + "Number of database client operations that failed.", + "{error}" + ); + } + } + + void IncErrorCount(const std::string& operationName, EStatus status) { + if (status == EStatus::SUCCESS) { + return; + } + if (auto registry = MetricRegistry_.Get()) { + registry->Rate({ + {"database", Database_}, + {"ydb_client", ClientType_}, + {"operation", operationName}, + {"status", TStringBuilder() << status}, + {"sensor", "Request/OperationErrors"} + })->Inc(); + } + if (ExternalRegistry_) { + const std::string clientApi = YdbClientApiAttributeValue(ClientType_); + NMetrics::TLabels labels = { + {"db.system.name", "ydb"}, + {"db.namespace", Database_}, + {"db.operation.name", operationName}, + {"ydb.client.api", clientApi}, + }; + ExternalRegistry_->Counter( + "db.client.operation.errors", + labels, + "Number of database client operations that failed.", + "{error}" + )->Inc(); + } + } + + void RecordLatency(const std::string& operationName, double durationSeconds, EStatus status) { + if (auto registry = MetricRegistry_.Get()) { + registry->HistogramRate({ + {"database", Database_}, + {"ydb_client", ClientType_}, + {"operation", operationName}, + {"sensor", "Request/OperationLatencyMs"} + }, ::NMonitoring::ExponentialHistogram(20, 2, 1))->Record( + static_cast(durationSeconds * 1000.0)); + } + if (ExternalRegistry_) { + NMetrics::TLabels labels = { + {"db.system.name", "ydb"}, + {"db.namespace", Database_}, + {"db.operation.name", operationName}, + {"ydb.client.api", YdbClientApiAttributeValue(ClientType_)}, + {"db.response.status_code", TStringBuilder() << status}, + }; + if (status != EStatus::SUCCESS) { + labels["error.type"] = TStringBuilder() << status; + } + ExternalRegistry_->Histogram( + "db.client.operation.duration", + {0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10}, + labels, + "Duration of database client operations.", + "s" + )->Record(durationSeconds); + } + } + + private: + TAtomicPointer<::NMonitoring::TMetricRegistry> MetricRegistry_; + std::shared_ptr ExternalRegistry_; + std::string Database_; + std::string ClientType_; + }; + struct TClientStatCollector { TClientStatCollector(::NMonitoring::TRate* cacheMiss = nullptr @@ -233,13 +355,15 @@ struct TStatCollector { , ::NMonitoring::THistogram* paramsSize = nullptr , ::NMonitoring::TRate* sessionRemoved = nullptr , ::NMonitoring::TRate* requestMigrated = nullptr - , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector()) + , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector() + , TClientOperationStatCollector operationStatCollector = TClientOperationStatCollector()) : CacheMiss(cacheMiss) , QuerySize(querySize) , ParamsSize(paramsSize) , SessionRemovedDueBalancing(sessionRemoved) , RequestMigrated(requestMigrated) , RetryOperationStatCollector(retryOperationStatCollector) + , OperationStatCollector(operationStatCollector) { } ::NMonitoring::TRate* CacheMiss; @@ -248,11 +372,15 @@ struct TStatCollector { ::NMonitoring::TRate* SessionRemovedDueBalancing; ::NMonitoring::TRate* RequestMigrated; TClientRetryOperationStatCollector RetryOperationStatCollector; + TClientOperationStatCollector OperationStatCollector; }; - TStatCollector(const std::string& database, TMetricRegistry* sensorsRegistry) - : Database_(database) + TStatCollector(const std::string& database + , TMetricRegistry* sensorsRegistry + , std::shared_ptr externalMetricRegistry = {} + ) : Database_(database) , DatabaseLabel_({"database", database}) + , ExternalMetricRegistry_(std::move(externalMetricRegistry)) { if (sensorsRegistry) { SetMetricRegistry(sensorsRegistry); @@ -376,10 +504,13 @@ struct TStatCollector { {"sensor", "Request/ParamsSize"} }, ::NMonitoring::ExponentialHistogram(10, 2, 32)); return TClientStatCollector(cacheMiss, querySize, paramsSize, sessionRemovedDueBalancing, requestMigrated, - TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType)); + TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType), + TClientOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType, ExternalMetricRegistry_)); } - return TClientStatCollector(); + return TClientStatCollector(nullptr, nullptr, nullptr, nullptr, nullptr, + TClientRetryOperationStatCollector(nullptr, Database_, clientType), + TClientOperationStatCollector(nullptr, Database_, clientType, ExternalMetricRegistry_)); } bool IsCollecting() { @@ -397,6 +528,7 @@ struct TStatCollector { private: const std::string Database_; const ::NMonitoring::TLabel DatabaseLabel_; + std::shared_ptr ExternalMetricRegistry_; TAtomicPointer MetricRegistryPtr_; TAtomicCounter<::NMonitoring::TRate> DiscoveryDuePessimization_; TAtomicCounter<::NMonitoring::TRate> DiscoveryDueExpiration_; diff --git a/src/client/metrics/metrics.cpp b/src/client/metrics/metrics.cpp new file mode 100644 index 00000000000..341917291bb --- /dev/null +++ b/src/client/metrics/metrics.cpp @@ -0,0 +1 @@ +#include diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index ccf90f1175c..a6db0b273db 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -14,8 +14,10 @@ #include #include +#include #include #include +#include #include @@ -23,6 +25,7 @@ namespace NYdb::inline V3::NQuery { +using TQueryObservation = NObservability::TRequestObservation; using TRetryContextResultAsync = NRetry::Async::TRetryContext; using TRetryContextAsync = NRetry::Async::TRetryContext; @@ -67,6 +70,10 @@ class TQueryClient::TImpl: public TClientImplCommon, public { SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector("Query")); SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Query")); + + if (auto traceProvider = Connections_->GetTraceProvider()) { + Tracer_ = traceProvider->GetTracer("ydb-cpp-sdk-query"); + } } ~TImpl() { @@ -77,6 +84,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public QuerySizeHistogram_.Set(collector.QuerySize); ParamsSizeHistogram_.Set(collector.ParamsSize); RetryOperationStatCollector_ = collector.RetryOperationStatCollector; + OperationStatCollector_ = collector.OperationStatCollector; } TAsyncExecuteQueryIterator StreamExecuteQuery(const std::string& query, const TTxControl& txControl, @@ -94,8 +102,22 @@ class TQueryClient::TImpl: public TClientImplCommon, public { CollectQuerySize(query); CollectParamsSize(params ? ¶ms->GetProtoMap() : nullptr); + + auto obs = MakeObservation("ExecuteQuery"); + return TExecQueryImpl::ExecuteQuery( - Connections_, DbDriverState_, query, txControl, params, settings, session); + Connections_, DbDriverState_, query, txControl, params, settings, session) + .Apply([obs](TAsyncExecuteQueryResult future) { + try { + auto result = future.GetValue(); + obs->SetPeerEndpoint(result.GetEndpoint()); + obs->End(result.GetStatus()); + return result; + } catch (...) { + obs->EndWithClientInternalError(); + throw; + } + }); } NThreading::TFuture ExecuteScript(const std::string& script, const std::optional& params, const TExecuteScriptSettings& settings) { @@ -162,20 +184,27 @@ class TQueryClient::TImpl: public TClientImplCommon, public auto promise = NThreading::NewPromise(); - auto responseCb = [promise, session] + auto obs = MakeObservation("Rollback"); + + auto responseCb = [promise, session, obs] (Ydb::Query::RollbackTransactionResponse* response, TPlainStatus status) mutable { try { + obs->SetPeerEndpoint(status.Endpoint); if (response) { NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); TStatus rollbackTxStatus(TPlainStatus{static_cast(response->status()), std::move(opIssues), status.Endpoint, std::move(status.Metadata)}); + obs->End(rollbackTxStatus.GetStatus()); + promise.SetValue(std::move(rollbackTxStatus)); } else { + obs->End(status.Status); promise.SetValue(TStatus(std::move(status))); } } catch (...) { + obs->EndWithClientInternalError(); promise.SetException(std::current_exception()); } }; @@ -203,21 +232,28 @@ class TQueryClient::TImpl: public TClientImplCommon, public auto promise = NThreading::NewPromise(); - auto responseCb = [promise, session] + auto obs = MakeObservation("Commit"); + + auto responseCb = [promise, session, obs] (Ydb::Query::CommitTransactionResponse* response, TPlainStatus status) mutable { try { + obs->SetPeerEndpoint(status.Endpoint); if (response) { NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); TStatus commitTxStatus(TPlainStatus{static_cast(response->status()), std::move(opIssues), status.Endpoint, std::move(status.Metadata)}); + obs->End(commitTxStatus.GetStatus()); + TCommitTransactionResult commitTxResult(std::move(commitTxStatus)); promise.SetValue(std::move(commitTxResult)); } else { + obs->End(status.Status); promise.SetValue(TCommitTransactionResult(TStatus(std::move(status)))); } } catch (...) { + obs->EndWithClientInternalError(); promise.SetException(std::current_exception()); } }; @@ -425,10 +461,12 @@ class TQueryClient::TImpl: public TClientImplCommon, public TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) { class TQueryClientGetSessionCtx : public NSessionPool::IGetSessionCtx { public: - TQueryClientGetSessionCtx(std::shared_ptr client, const TCreateSessionSettings& settings) + TQueryClientGetSessionCtx(std::shared_ptr client, const TCreateSessionSettings& settings, + std::shared_ptr observation) : Promise(NThreading::NewPromise()) , Client(client) , RpcSettings(TRpcRequestSettings::Make(settings)) + , Observation(std::move(observation)) {} TAsyncCreateSessionResult GetFuture() { @@ -437,6 +475,9 @@ class TQueryClient::TImpl: public TClientImplCommon, public void ReplyError(TStatus status) override { TSession session; + if (Observation) { + Observation->End(status.GetStatus()); + } ScheduleReply(TCreateSessionResult(std::move(status), std::move(session))); } @@ -449,14 +490,22 @@ class TQueryClient::TImpl: public TClientImplCommon, public ) ); + if (Observation) { + Observation->End(EStatus::SUCCESS); + } ScheduleReply(std::move(val)); } void ReplyNewSession() override { Client->CreateAttachedSession(RpcSettings).Subscribe( - [promise{std::move(Promise)}](TAsyncCreateSessionResult future) mutable + [promise{std::move(Promise)}, obs = Observation](TAsyncCreateSessionResult future) mutable { - promise.SetValue(future.ExtractValue()); + auto val = future.ExtractValue(); + if (obs) { + obs->SetPeerEndpoint(val.GetEndpoint()); + obs->End(val.GetStatus()); + } + promise.SetValue(std::move(val)); }); } @@ -481,9 +530,11 @@ class TQueryClient::TImpl: public TClientImplCommon, public NThreading::TPromise Promise; std::shared_ptr Client; const TRpcRequestSettings RpcSettings; + std::shared_ptr Observation; }; - auto ctx = std::make_unique(shared_from_this(), settings); + auto obs = MakeObservation("GetSession"); + auto ctx = std::make_unique(shared_from_this(), settings, obs); auto future = ctx->GetFuture(); SessionPool_.GetSession(std::move(ctx)); @@ -552,6 +603,20 @@ class TQueryClient::TImpl: public TClientImplCommon, public } private: + std::shared_ptr MakeObservation(const std::string& operationName) { + return std::make_shared( + "Query", + &OperationStatCollector_, + Tracer_, + operationName, + DbDriverState_->DiscoveryEndpoint, + DbDriverState_->Database, + DbDriverState_->Log + ); + } + + std::shared_ptr Tracer_; + NSdkStats::TStatCollector::TClientOperationStatCollector OperationStatCollector_; NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector_; NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram_; NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram_; diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 6529f03c187..47626433eaa 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -22,11 +22,18 @@ TTableClient::TImpl::TImpl(std::shared_ptr&& connections, , Settings_(settings) , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_) { + auto clientCollector = DbDriverState_->StatCollector.GetClientStatCollector("Table"); + OperationStatCollector_ = clientCollector.OperationStatCollector; + + if (auto traceProvider = Connections_->GetTraceProvider()) { + Tracer_ = traceProvider->GetTracer("ydb-cpp-sdk-table"); + } + if (!DbDriverState_->StatCollector.IsCollecting()) { return; } - SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector("Table")); + SetStatCollector(clientCollector); SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Table")); } @@ -378,8 +385,9 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio auto createSessionPromise = NewPromise(); auto self = shared_from_this(); + auto obs = MakeObservation("GetSession"); - auto createSessionExtractor = [createSessionPromise, self, standalone] + auto createSessionExtractor = [createSessionPromise, self, standalone, obs] (google::protobuf::Any* any, TPlainStatus status) mutable { Ydb::Table::CreateSessionResult result; if (any) { @@ -396,6 +404,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio session.SessionImpl_->MarkBroken(); } TCreateSessionResult val(TStatus(std::move(status)), std::move(session)); + obs->End(val.GetStatus()); createSessionPromise.SetValue(std::move(val)); }; @@ -759,11 +768,19 @@ TAsyncStatus TTableClient::TImpl::ExecuteSchemeQuery(const TSession& session, co request.set_session_id(TStringType{session.GetId()}); request.set_yql_text(TStringType{query}); - return RunSimple( + auto obs = MakeObservation("ExecuteSchemeQuery"); + + auto future = RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncExecuteSchemeQuery, rpcSettings ); + + return future.Apply([obs](NThreading::TFuture f) mutable { + auto status = f.ExtractValue(); + obs->End(status.GetStatus()); + return status; + }); } TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSession& session, const TTxSettings& txSettings, @@ -776,9 +793,11 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio request.set_session_id(TStringType{session.GetId()}); SetTxSettings(txSettings, request.mutable_tx_settings()); + auto obs = MakeObservation("BeginTransaction"); + auto promise = NewPromise(); - auto extractor = [promise, session] + auto extractor = [promise, session, obs] (google::protobuf::Any* any, TPlainStatus status) mutable { std::string txId; if (any) { @@ -789,6 +808,7 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio TBeginTransactionResult beginTxResult(TStatus(std::move(status)), TTransaction(session, txId)); + obs->End(beginTxResult.GetStatus()); promise.SetValue(std::move(beginTxResult)); }; @@ -815,9 +835,11 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess request.set_tx_id(TStringType{txId}); request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); + auto obs = MakeObservation("CommitTransaction"); + auto promise = NewPromise(); - auto extractor = [promise] + auto extractor = [promise, obs] (google::protobuf::Any* any, TPlainStatus status) mutable { std::optional queryStats; if (any) { @@ -830,6 +852,7 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess } TCommitTransactionResult commitTxResult(TStatus(std::move(status)), queryStats); + obs->End(commitTxResult.GetStatus()); promise.SetValue(std::move(commitTxResult)); }; @@ -855,11 +878,19 @@ TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, c request.set_session_id(TStringType{session.GetId()}); request.set_tx_id(TStringType{txId}); - return RunSimple( + auto obs = MakeObservation("RollbackTransaction"); + + auto future = RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction, rpcSettings ); + + return future.Apply([obs](TAsyncStatus fut) { + auto status = fut.GetValue(); + obs->End(status.GetStatus()); + return status; + }); } TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSession& session, const std::string& query, @@ -1100,6 +1131,7 @@ void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TCli ParamsSizeHistogram.Set(collector.ParamsSize); RetryOperationStatCollector = collector.RetryOperationStatCollector; SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing); + OperationStatCollector_ = collector.OperationStatCollector; } TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) { @@ -1128,10 +1160,13 @@ TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, *mutable_rows->mutable_type() = rows.GetType().GetProto(); } + auto obs = MakeObservation("BulkUpsert"); + auto promise = NewPromise(); - auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable { + auto extractor = [promise, obs](google::protobuf::Any* any, TPlainStatus status) mutable { Y_UNUSED(any); TBulkUpsertResult val(TStatus(std::move(status))); + obs->End(val.GetStatus()); promise.SetValue(std::move(val)); }; @@ -1174,12 +1209,15 @@ TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, } request.set_data(TStringType{data}); + auto obs = MakeObservation("BulkUpsert"); + auto promise = NewPromise(); - auto extractor = [promise] + auto extractor = [promise, obs] (google::protobuf::Any* any, TPlainStatus status) mutable { Y_UNUSED(any); TBulkUpsertResult val(TStatus(std::move(status))); + obs->End(val.GetStatus()); promise.SetValue(std::move(val)); }; diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index 4c0f607d748..27c35c843ad 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #undef INCLUDE_YDB_INTERNAL_H @@ -240,6 +241,8 @@ class TTableClient::TImpl: public TClientImplCommon, public auto promise = NewPromise(); bool keepInCache = settings.KeepInQueryCache_ && settings.KeepInQueryCache_.value(); + auto obs = MakeObservation("ExecuteDataQuery"); + // We don't want to delay call of TSession dtor, so we can't capture it by copy // otherwise we break session pool and other clients logic. // Same problem with TDataQuery and TTransaction @@ -249,7 +252,7 @@ class TTableClient::TImpl: public TClientImplCommon, public // - capture pointer // - call free just before SetValue call auto sessionPtr = new TSession(session); - auto extractor = [promise, sessionPtr, query, fromCache, keepInCache] + auto extractor = [promise, sessionPtr, query, fromCache, keepInCache, obs] (google::protobuf::Any* any, TPlainStatus status) mutable { std::vector res; std::optional tx; @@ -288,6 +291,8 @@ class TTableClient::TImpl: public TClientImplCommon, public TDataQueryResult dataQueryResult(TStatus(std::move(status)), std::move(res), tx, dataQuery, fromCache, queryStats); + obs->End(dataQueryResult.GetStatus()); + delete sessionPtr; tx.reset(); dataQuery.reset(); @@ -330,9 +335,23 @@ class TTableClient::TImpl: public TClientImplCommon, public NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing; private: + std::shared_ptr Tracer_; + NSdkStats::TStatCollector::TClientOperationStatCollector OperationStatCollector_; NSessionPool::TSessionPool SessionPool_; TRequestMigrator RequestMigrator_; static const TKeepAliveSettings KeepAliveSettings; + + std::shared_ptr MakeObservation(const std::string& operationName) { + return std::make_shared( + "Table", + &OperationStatCollector_, + Tracer_, + operationName, + DbDriverState_->DiscoveryEndpoint, + DbDriverState_->Database, + DbDriverState_->Log + ); + } }; } diff --git a/src/client/trace/trace.cpp b/src/client/trace/trace.cpp new file mode 100644 index 00000000000..6bf5bc664f0 --- /dev/null +++ b/src/client/trace/trace.cpp @@ -0,0 +1 @@ +#include diff --git a/tests/common/fake_metric_registry.h b/tests/common/fake_metric_registry.h new file mode 100644 index 00000000000..032234f080f --- /dev/null +++ b/tests/common/fake_metric_registry.h @@ -0,0 +1,135 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace NYdb::NTests { + +class TFakeCounter : public NMetrics::ICounter { +public: + void Inc() override { + Count_.fetch_add(1, std::memory_order_relaxed); + } + + int64_t Get() const { + return Count_.load(std::memory_order_relaxed); + } + +private: + std::atomic Count_{0}; +}; + +class TFakeHistogram : public NMetrics::IHistogram { +public: + void Record(double value) override { + std::lock_guard lock(Mutex_); + Values_.push_back(value); + } + + std::vector GetValues() const { + std::lock_guard lock(Mutex_); + return Values_; + } + + size_t Count() const { + std::lock_guard lock(Mutex_); + return Values_.size(); + } + +private: + mutable std::mutex Mutex_; + std::vector Values_; +}; + +class TFakeGauge : public NMetrics::IGauge { +public: + void Add(double delta) override { Value_ += delta; } + void Set(double value) override { Value_ = value; } + double Get() const { return Value_; } + +private: + double Value_ = 0.0; +}; + +struct TMetricKey { + std::string Name; + NMetrics::TLabels Labels; + + bool operator==(const TMetricKey& other) const = default; + bool operator<(const TMetricKey& other) const { + if (Name != other.Name) return Name < other.Name; + return Labels < other.Labels; + } +}; + +class TFakeMetricRegistry : public NMetrics::IMetricRegistry { +public: + std::shared_ptr Counter(const std::string& name + , const NMetrics::TLabels& labels + , const std::string& /*description*/ + , const std::string& /*unit*/ + ) override { + std::lock_guard lock(Mutex_); + auto key = TMetricKey{name, labels}; + auto it = Counters_.find(key); + if (it != Counters_.end()) { + return it->second; + } + auto counter = std::make_shared(); + Counters_[key] = counter; + return counter; + } + + std::shared_ptr Gauge(const std::string& name + , const NMetrics::TLabels& labels + , const std::string& /*description*/ + , const std::string& /*unit*/ + ) override { + std::lock_guard lock(Mutex_); + auto key = TMetricKey{name, labels}; + auto gauge = std::make_shared(); + Gauges_[key] = gauge; + return gauge; + } + + std::shared_ptr Histogram(const std::string& name + , const std::vector& /*buckets*/ + , const NMetrics::TLabels& labels + , const std::string& /*description*/ + , const std::string& /*unit*/ + ) override { + std::lock_guard lock(Mutex_); + auto key = TMetricKey{name, labels}; + auto it = Histograms_.find(key); + if (it != Histograms_.end()) { + return it->second; + } + auto histogram = std::make_shared(); + Histograms_[key] = histogram; + return histogram; + } + + std::shared_ptr GetCounter(const std::string& name, const NMetrics::TLabels& labels = {}) const { + std::lock_guard lock(Mutex_); + auto it = Counters_.find(TMetricKey{name, labels}); + return it != Counters_.end() ? it->second : nullptr; + } + + std::shared_ptr GetHistogram(const std::string& name, const NMetrics::TLabels& labels = {}) const { + std::lock_guard lock(Mutex_); + auto it = Histograms_.find(TMetricKey{name, labels}); + return it != Histograms_.end() ? it->second : nullptr; + } + +private: + mutable std::mutex Mutex_; + std::map> Counters_; + std::map> Gauges_; + std::map> Histograms_; +}; + +} // namespace NYdb::NTests diff --git a/tests/integration/metrics/main.cpp b/tests/integration/metrics/main.cpp new file mode 100644 index 00000000000..4668d5349ab --- /dev/null +++ b/tests/integration/metrics/main.cpp @@ -0,0 +1,313 @@ +#include +#include +#include +#include + +#include + +#include + +using namespace NYdb; +using namespace NYdb::NQuery; +using namespace NYdb::NTests; + +namespace { + +std::string GetEnvOrEmpty(const char* name) { + const char* value = std::getenv(name); + return value ? std::string(value) : std::string(); +} + +struct TRunArgs { + TDriver Driver; + std::shared_ptr Registry; + std::string Database; +}; + +TRunArgs MakeRunArgs() { + std::string endpoint = GetEnvOrEmpty("YDB_ENDPOINT"); + std::string database = GetEnvOrEmpty("YDB_DATABASE"); + + auto registry = std::make_shared(); + + auto driverConfig = TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + .SetAuthToken(GetEnvOrEmpty("YDB_TOKEN")) + .SetMetricRegistry(registry); + + TDriver driver(driverConfig); + return {std::move(driver), registry, std::move(database)}; +} + +std::shared_ptr GetCounter( + const std::shared_ptr& registry, + const std::string& dbNamespace, + const std::string& name, + const std::string& operation) +{ + return registry->GetCounter(name, { + {"db.system.name", "ydb"}, + {"db.namespace", dbNamespace}, + {"db.operation.name", operation}, + {"ydb.client.api", "Query"}, + }); +} + +std::shared_ptr GetDuration( + const std::shared_ptr& registry, + const std::string& dbNamespace, + const std::string& operation, + EStatus status) +{ + NMetrics::TLabels labels = { + {"db.system.name", "ydb"}, + {"db.namespace", dbNamespace}, + {"db.operation.name", operation}, + {"ydb.client.api", "Query"}, + {"db.response.status_code", ToString(status)}, + }; + if (status != EStatus::SUCCESS) { + labels["error.type"] = ToString(status); + } + return registry->GetHistogram("db.client.operation.duration", labels); +} + +void SkipQueryMetricsIntegrationIfNoEnv() { + if (GetEnvOrEmpty("YDB_ENDPOINT").empty() || GetEnvOrEmpty("YDB_DATABASE").empty()) { + GTEST_SKIP() << "Set YDB_ENDPOINT and YDB_DATABASE to run QueryMetricsIntegration tests"; + } +} + +} // namespace + +TEST(QueryMetricsIntegration, ExecuteQuerySuccessRecordsMetrics) { + SkipQueryMetricsIntegrationIfNoEnv(); + auto [driver, registry, database] = MakeRunArgs(); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto result = session.GetSession().ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + + auto requests = GetCounter(registry, database, "db.client.operation.requests", "ExecuteQuery"); + ASSERT_NE(requests, nullptr) << "ExecuteQuery request counter not created"; + EXPECT_GE(requests->Get(), 1); + + auto errors = GetCounter(registry, database, "db.client.operation.errors", "ExecuteQuery"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 0); + + auto duration = GetDuration(registry, database, "ExecuteQuery", EStatus::SUCCESS); + ASSERT_NE(duration, nullptr) << "ExecuteQuery duration histogram not created"; + EXPECT_GE(duration->Count(), 1u); + for (double v : duration->GetValues()) { + EXPECT_GE(v, 0.0); + } + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, ExecuteQueryErrorRecordsErrorMetric) { + SkipQueryMetricsIntegrationIfNoEnv(); + auto [driver, registry, database] = MakeRunArgs(); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto result = session.GetSession().ExecuteQuery( + "INVALID SQL QUERY !!!", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + EXPECT_NE(result.GetStatus(), EStatus::SUCCESS); + + auto requests = GetCounter(registry, database, "db.client.operation.requests", "ExecuteQuery"); + ASSERT_NE(requests, nullptr); + EXPECT_GE(requests->Get(), 1); + + auto errors = GetCounter(registry, database, "db.client.operation.errors", "ExecuteQuery"); + ASSERT_NE(errors, nullptr); + EXPECT_GE(errors->Get(), 1); + + auto duration = GetDuration(registry, database, "ExecuteQuery", result.GetStatus()); + ASSERT_NE(duration, nullptr); + EXPECT_GE(duration->Count(), 1u); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, CreateSessionRecordsMetrics) { + SkipQueryMetricsIntegrationIfNoEnv(); + auto [driver, registry, database] = MakeRunArgs(); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto requests = GetCounter(registry, database, "db.client.operation.requests", "GetSession"); + ASSERT_NE(requests, nullptr) << "CreateSession request counter not created"; + EXPECT_GE(requests->Get(), 1); + + auto duration = GetDuration(registry, database, "GetSession", EStatus::SUCCESS); + ASSERT_NE(duration, nullptr) << "CreateSession duration histogram not created"; + EXPECT_GE(duration->Count(), 1u); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, CommitTransactionRecordsMetrics) { + SkipQueryMetricsIntegrationIfNoEnv(); + auto [driver, registry, database] = MakeRunArgs(); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW()).ExtractValueSync(); + ASSERT_TRUE(beginResult.IsSuccess()) << beginResult.GetIssues().ToString(); + auto tx = beginResult.GetTransaction(); + + auto execResult = tx.GetSession().ExecuteQuery( + "SELECT 1;", + TTxControl::Tx(tx) + ).ExtractValueSync(); + ASSERT_EQ(execResult.GetStatus(), EStatus::SUCCESS) << execResult.GetIssues().ToString(); + + if (execResult.GetTransaction()) { + auto commitResult = execResult.GetTransaction()->Commit().ExtractValueSync(); + ASSERT_TRUE(commitResult.IsSuccess()) << commitResult.GetIssues().ToString(); + + auto commitRequests = GetCounter(registry, database, "db.client.operation.requests", "Commit"); + ASSERT_NE(commitRequests, nullptr) << "Commit request counter not created"; + EXPECT_GE(commitRequests->Get(), 1); + + auto commitDuration = GetDuration(registry, database, "Commit", EStatus::SUCCESS); + ASSERT_NE(commitDuration, nullptr); + EXPECT_GE(commitDuration->Count(), 1u); + } + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, RollbackTransactionRecordsMetrics) { + SkipQueryMetricsIntegrationIfNoEnv(); + auto [driver, registry, database] = MakeRunArgs(); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW()).ExtractValueSync(); + ASSERT_TRUE(beginResult.IsSuccess()) << beginResult.GetIssues().ToString(); + auto tx = beginResult.GetTransaction(); + + auto rollbackResult = tx.Rollback().ExtractValueSync(); + ASSERT_TRUE(rollbackResult.IsSuccess()) << rollbackResult.GetIssues().ToString(); + + auto rollbackRequests = GetCounter(registry, database, "db.client.operation.requests", "Rollback"); + ASSERT_NE(rollbackRequests, nullptr) << "Rollback request counter not created"; + EXPECT_GE(rollbackRequests->Get(), 1); + + auto rollbackErrors = GetCounter(registry, database, "db.client.operation.errors", "Rollback"); + ASSERT_NE(rollbackErrors, nullptr); + EXPECT_EQ(rollbackErrors->Get(), 0); + + auto rollbackDuration = GetDuration(registry, database, "Rollback", EStatus::SUCCESS); + ASSERT_NE(rollbackDuration, nullptr); + EXPECT_GE(rollbackDuration->Count(), 1u); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, MultipleQueriesAccumulateMetrics) { + SkipQueryMetricsIntegrationIfNoEnv(); + auto [driver, registry, database] = MakeRunArgs(); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + const int numQueries = 5; + for (int i = 0; i < numQueries; ++i) { + auto result = session.ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + } + + auto requests = GetCounter(registry, database, "db.client.operation.requests", "ExecuteQuery"); + ASSERT_NE(requests, nullptr); + EXPECT_EQ(requests->Get(), numQueries); + + auto errors = GetCounter(registry, database, "db.client.operation.errors", "ExecuteQuery"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 0); + + auto duration = GetDuration(registry, database, "ExecuteQuery", EStatus::SUCCESS); + ASSERT_NE(duration, nullptr); + EXPECT_EQ(duration->Count(), static_cast(numQueries)); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, NoRegistryDoesNotBreakOperations) { + SkipQueryMetricsIntegrationIfNoEnv(); + std::string endpoint = GetEnvOrEmpty("YDB_ENDPOINT"); + std::string database = GetEnvOrEmpty("YDB_DATABASE"); + + auto driverConfig = TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + .SetAuthToken(GetEnvOrEmpty("YDB_TOKEN")); + + TDriver driver(driverConfig); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto session = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); + + auto result = session.GetSession().ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + EXPECT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + + driver.Stop(true); +} + +TEST(QueryMetricsIntegration, DurationValuesAreRealistic) { + SkipQueryMetricsIntegrationIfNoEnv(); + auto [driver, registry, database] = MakeRunArgs(); + TQueryClient client(driver, TClientSettings().Database(database)); + + auto sessionResult = client.GetSession().ExtractValueSync(); + ASSERT_TRUE(sessionResult.IsSuccess()) << sessionResult.GetIssues().ToString(); + auto session = sessionResult.GetSession(); + + auto result = session.ExecuteQuery( + "SELECT 1;", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); + + auto duration = GetDuration(registry, database, "ExecuteQuery", EStatus::SUCCESS); + ASSERT_NE(duration, nullptr); + ASSERT_GE(duration->Count(), 1u); + + for (double v : duration->GetValues()) { + EXPECT_GE(v, 0.0) << "Duration must be non-negative"; + EXPECT_LT(v, 30.0) << "Duration > 30s is unrealistic for SELECT 1"; + } + + driver.Stop(true); +} diff --git a/tests/unit/client/observability/metrics_ut.cpp b/tests/unit/client/observability/metrics_ut.cpp new file mode 100644 index 00000000000..be073c3dcad --- /dev/null +++ b/tests/unit/client/observability/metrics_ut.cpp @@ -0,0 +1,355 @@ +#include +#include +#include +#include + +#include + +using namespace NYdb; +using namespace NYdb::NObservability; +using namespace NYdb::NMetrics; +using namespace NYdb::NTests; +using namespace NYdb::NSdkStats; + +namespace { + constexpr const char kTestDbNamespace[] = "/Root/testdb"; +} // namespace + +class RequestMetricsTest : public ::testing::Test { +protected: + void SetUp() override { + Registry = std::make_shared(); + OpCollector = TStatCollector::TClientOperationStatCollector( + nullptr, kTestDbNamespace, "", Registry); + } + + std::shared_ptr RequestCounter(const std::string& op) { + return Registry->GetCounter("db.client.operation.requests", { + {"db.system.name", "ydb"}, + {"db.namespace", kTestDbNamespace}, + {"db.operation.name", op}, + {"ydb.client.api", "Unspecified"}, + }); + } + + std::shared_ptr ErrorCounter(const std::string& op) { + return Registry->GetCounter("db.client.operation.errors", { + {"db.system.name", "ydb"}, + {"db.namespace", kTestDbNamespace}, + {"db.operation.name", op}, + {"ydb.client.api", "Unspecified"}, + }); + } + + std::shared_ptr DurationHistogram(const std::string& op, EStatus status) { + TLabels labels = { + {"db.system.name", "ydb"}, + {"db.namespace", kTestDbNamespace}, + {"db.operation.name", op}, + {"ydb.client.api", "Unspecified"}, + {"db.response.status_code", ToString(status)}, + }; + if (status != EStatus::SUCCESS) { + labels["error.type"] = ToString(status); + } + return Registry->GetHistogram("db.client.operation.duration", labels); + } + + TStatCollector::TClientOperationStatCollector OpCollector; + std::shared_ptr Registry; +}; + +TEST_F(RequestMetricsTest, RequestCounterIncrementedOnConstruction) { + TRequestMetrics metrics(&OpCollector, "DoSomething", TLog()); + + auto counter = RequestCounter("DoSomething"); + ASSERT_NE(counter, nullptr); + EXPECT_EQ(counter->Get(), 1); +} + +TEST_F(RequestMetricsTest, SuccessDoesNotIncrementErrorCounter) { + { + TRequestMetrics metrics(&OpCollector, "DoSomething", TLog()); + metrics.End(EStatus::SUCCESS); + } + + auto errors = ErrorCounter("DoSomething"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 0); +} + +TEST_F(RequestMetricsTest, FailureIncrementsErrorCounter) { + { + TRequestMetrics metrics(&OpCollector, "DoSomething", TLog()); + metrics.End(EStatus::UNAVAILABLE); + } + + auto errors = ErrorCounter("DoSomething"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 1); +} + +TEST_F(RequestMetricsTest, DurationRecordedOnEnd) { + { + TRequestMetrics metrics(&OpCollector, "DoSomething", TLog()); + metrics.End(EStatus::SUCCESS); + } + + auto hist = DurationHistogram("DoSomething", EStatus::SUCCESS); + ASSERT_NE(hist, nullptr); + EXPECT_EQ(hist->Count(), 1u); + EXPECT_GE(hist->GetValues()[0], 0.0); +} + +TEST_F(RequestMetricsTest, DurationIsInSeconds) { + { + TRequestMetrics metrics(&OpCollector, "DoSomething", TLog()); + metrics.End(EStatus::SUCCESS); + } + + auto hist = DurationHistogram("DoSomething", EStatus::SUCCESS); + ASSERT_NE(hist, nullptr); + EXPECT_LT(hist->GetValues()[0], 1.0); +} + +TEST_F(RequestMetricsTest, DoubleEndIsIdempotent) { + TRequestMetrics metrics(&OpCollector, "DoSomething", TLog()); + metrics.End(EStatus::SUCCESS); + metrics.End(EStatus::INTERNAL_ERROR); + + auto errors = ErrorCounter("DoSomething"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 0); + + auto hist = DurationHistogram("DoSomething", EStatus::SUCCESS); + ASSERT_NE(hist, nullptr); + EXPECT_EQ(hist->Count(), 1u); +} + +TEST_F(RequestMetricsTest, DestructorCallsEndWithClientInternalError) { + { + TRequestMetrics metrics(&OpCollector, "DoSomething", TLog()); + } + + auto requests = RequestCounter("DoSomething"); + ASSERT_NE(requests, nullptr); + EXPECT_EQ(requests->Get(), 1); + + auto errors = ErrorCounter("DoSomething"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), 1); + + auto hist = DurationHistogram("DoSomething", EStatus::CLIENT_INTERNAL_ERROR); + ASSERT_NE(hist, nullptr); + EXPECT_EQ(hist->Count(), 1u); +} + +TEST_F(RequestMetricsTest, NullRegistryDoesNotCrash) { + EXPECT_NO_THROW({ + TStatCollector::TClientOperationStatCollector nullCollector; + TRequestMetrics metrics(&nullCollector, "DoSomething", TLog()); + metrics.End(EStatus::SUCCESS); + }); +} + +TEST_F(RequestMetricsTest, DifferentOperationsHaveSeparateMetrics) { + { + TRequestMetrics m1(&OpCollector, "OpA", TLog()); + m1.End(EStatus::SUCCESS); + } + { + TRequestMetrics m2(&OpCollector, "OpB", TLog()); + m2.End(EStatus::OVERLOADED); + } + + EXPECT_EQ(RequestCounter("OpA")->Get(), 1); + EXPECT_EQ(RequestCounter("OpB")->Get(), 1); + EXPECT_EQ(ErrorCounter("OpA")->Get(), 0); + EXPECT_EQ(ErrorCounter("OpB")->Get(), 1); + EXPECT_EQ(DurationHistogram("OpA", EStatus::SUCCESS)->Count(), 1u); + EXPECT_EQ(DurationHistogram("OpB", EStatus::OVERLOADED)->Count(), 1u); +} + +TEST_F(RequestMetricsTest, MultipleRequestsAccumulate) { + for (int i = 0; i < 5; ++i) { + TRequestMetrics metrics(&OpCollector, "Op", TLog()); + metrics.End(i % 2 == 0 ? EStatus::SUCCESS : EStatus::TIMEOUT); + } + + EXPECT_EQ(RequestCounter("Op")->Get(), 5); + EXPECT_EQ(ErrorCounter("Op")->Get(), 2); + EXPECT_EQ(DurationHistogram("Op", EStatus::SUCCESS)->Count(), 3u); + EXPECT_EQ(DurationHistogram("Op", EStatus::TIMEOUT)->Count(), 2u); +} + +TEST_F(RequestMetricsTest, AllErrorStatusesIncrementErrorCounter) { + std::vector errorStatuses = { + EStatus::BAD_REQUEST, + EStatus::UNAUTHORIZED, + EStatus::INTERNAL_ERROR, + EStatus::UNAVAILABLE, + EStatus::OVERLOADED, + EStatus::TIMEOUT, + EStatus::NOT_FOUND, + EStatus::CLIENT_INTERNAL_ERROR, + }; + + for (auto status : errorStatuses) { + TRequestMetrics metrics(&OpCollector, "Op", TLog()); + metrics.End(status); + } + + auto errors = ErrorCounter("Op"); + ASSERT_NE(errors, nullptr); + EXPECT_EQ(errors->Get(), static_cast(errorStatuses.size())); +} + +TEST(RequestMetricsDbNamespaceTest, DifferentNamespacesAreSeparateMetricSeries) { + auto registry = std::make_shared(); + TStatCollector::TClientOperationStatCollector collectorA(nullptr, "/db/alpha", "", registry); + TStatCollector::TClientOperationStatCollector collectorB(nullptr, "/db/beta", "", registry); + + { + TRequestMetrics m(&collectorA, "GetSession", TLog()); + m.End(EStatus::SUCCESS); + } + { + TRequestMetrics m(&collectorB, "GetSession", TLog()); + m.End(EStatus::SUCCESS); + } + + auto labelsAlpha = [](const char* op) { + return NMetrics::TLabels{ + {"db.system.name", "ydb"}, + {"db.namespace", "/db/alpha"}, + {"db.operation.name", op}, + {"ydb.client.api", "Unspecified"}, + }; + }; + auto labelsBeta = [](const char* op) { + return NMetrics::TLabels{ + {"db.system.name", "ydb"}, + {"db.namespace", "/db/beta"}, + {"db.operation.name", op}, + {"ydb.client.api", "Unspecified"}, + }; + }; + + auto reqAlpha = registry->GetCounter("db.client.operation.requests", labelsAlpha("GetSession")); + auto reqBeta = registry->GetCounter("db.client.operation.requests", labelsBeta("GetSession")); + ASSERT_NE(reqAlpha, nullptr); + ASSERT_NE(reqBeta, nullptr); + EXPECT_EQ(reqAlpha->Get(), 1); + EXPECT_EQ(reqBeta->Get(), 1); + + auto durAlpha = registry->GetHistogram( + "db.client.operation.duration", + [&] { + auto l = labelsAlpha("GetSession"); + l["db.response.status_code"] = ToString(EStatus::SUCCESS); + return l; + }()); + auto durBeta = registry->GetHistogram( + "db.client.operation.duration", + [&] { + auto l = labelsBeta("GetSession"); + l["db.response.status_code"] = ToString(EStatus::SUCCESS); + return l; + }()); + ASSERT_NE(durAlpha, nullptr); + ASSERT_NE(durBeta, nullptr); + EXPECT_EQ(durAlpha->Count(), 1u); + EXPECT_EQ(durBeta->Count(), 1u); +} + +TEST(RequestMetricsClientAliasesTest, QueryOperationsUseOtelStandardMetrics) { + auto registry = std::make_shared(); + TStatCollector::TClientOperationStatCollector collector(nullptr, "", "Query", registry); + + NObservability::TRequestMetrics metrics(&collector, "ExecuteQuery", TLog()); + metrics.End(EStatus::SUCCESS); + + EXPECT_NE( + registry->GetCounter( + "db.client.operation.requests", + { + {"db.system.name", "ydb"}, + {"db.namespace", ""}, + {"db.operation.name", "ExecuteQuery"}, + {"ydb.client.api", "Query"}, + } + ), + nullptr + ); + EXPECT_NE( + registry->GetCounter( + "db.client.operation.errors", + { + {"db.system.name", "ydb"}, + {"db.namespace", ""}, + {"db.operation.name", "ExecuteQuery"}, + {"ydb.client.api", "Query"}, + } + ), + nullptr + ); + EXPECT_NE( + registry->GetHistogram( + "db.client.operation.duration", + { + {"db.system.name", "ydb"}, + {"db.namespace", ""}, + {"db.operation.name", "ExecuteQuery"}, + {"ydb.client.api", "Query"}, + {"db.response.status_code", ToString(EStatus::SUCCESS)}, + } + ), + nullptr + ); +} + +TEST(RequestMetricsClientAliasesTest, TableOperationsUseOtelStandardMetrics) { + auto registry = std::make_shared(); + TStatCollector::TClientOperationStatCollector collector(nullptr, "", "Table", registry); + + NObservability::TRequestMetrics metrics(&collector, "ExecuteDataQuery", TLog()); + metrics.End(EStatus::SUCCESS); + + EXPECT_NE( + registry->GetCounter( + "db.client.operation.requests", + { + {"db.system.name", "ydb"}, + {"db.namespace", ""}, + {"db.operation.name", "ExecuteDataQuery"}, + {"ydb.client.api", "Table"}, + } + ), + nullptr + ); + EXPECT_NE( + registry->GetCounter( + "db.client.operation.errors", + { + {"db.system.name", "ydb"}, + {"db.namespace", ""}, + {"db.operation.name", "ExecuteDataQuery"}, + {"ydb.client.api", "Table"}, + } + ), + nullptr + ); + EXPECT_NE( + registry->GetHistogram( + "db.client.operation.duration", + { + {"db.system.name", "ydb"}, + {"db.namespace", ""}, + {"db.operation.name", "ExecuteDataQuery"}, + {"ydb.client.api", "Table"}, + {"db.response.status_code", ToString(EStatus::SUCCESS)}, + } + ), + nullptr + ); +} From 0e03f142a78a4249b540d714d391a01c59fc89a3 Mon Sep 17 00:00:00 2001 From: kseleznyov Date: Fri, 24 Apr 2026 11:57:20 +0000 Subject: [PATCH 02/13] Specify `traceId` in change (CDC) records (#34489) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/table/table.h | 4 ++++ src/api/protos/ydb_table.proto | 4 ++++ src/client/table/table.cpp | 13 +++++++++++++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 6e5c83dbadc..eb19806c5c3 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -af5b7b9d6af94c4dd153ffd112d9119bc6ad568f +072de56c9e541bc60bbb2a5f060213d91544411c diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 697d8d708fe..6527affe256 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -618,6 +618,8 @@ class TChangefeedDescription { TChangefeedDescription& WithInitialScan(); // Enable UserSIDs TChangefeedDescription& WithUserSIDs(); + // Enable TraceIds + TChangefeedDescription& WithTraceIds(); // Attributes TChangefeedDescription& AddAttribute(const std::string& key, const std::string& value); TChangefeedDescription& SetAttributes(const std::unordered_map& attrs); @@ -634,6 +636,7 @@ class TChangefeedDescription { const std::optional& GetResolvedTimestamps() const; bool GetInitialScan() const; bool GetUserSIDs() const; + bool GetTraceIds() const; const std::unordered_map& GetAttributes() const; const std::string& GetAwsRegion() const; const std::optional& GetInitialScanProgress() const; @@ -664,6 +667,7 @@ class TChangefeedDescription { std::optional RetentionPeriod_; bool InitialScan_ = false; bool UserSIDs_ = false; + bool TraceIds_ = false; std::unordered_map Attributes_; std::string AwsRegion_; std::optional InitialScanProgress_; diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index d9dd173266a..87656a497da 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -472,6 +472,8 @@ message Changefeed { bool schema_changes = 11; // Emit user security identifier (SID) or not. bool user_sids = 12; + // Emit user trace identifier (trace_id) or not. + bool trace_ids = 13; } message ChangefeedDescription { @@ -517,6 +519,8 @@ message ChangefeedDescription { bool schema_changes = 10; // State of emitting of user security identifier (SID) bool user_sids = 11; + // State of emitting of user trace identifier (trace_id) or not. + bool trace_ids = 12; } message StoragePool { diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index b0e2c62387b..5f2c28cbdf7 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -3275,6 +3275,11 @@ TChangefeedDescription& TChangefeedDescription::WithUserSIDs() { return *this; } +TChangefeedDescription& TChangefeedDescription::WithTraceIds() { + TraceIds_ = true; + return *this; +} + TChangefeedDescription& TChangefeedDescription::AddAttribute(const std::string& key, const std::string& value) { Attributes_[key] = value; return *this; @@ -3331,6 +3336,10 @@ bool TChangefeedDescription::GetUserSIDs() const { return UserSIDs_; } +bool TChangefeedDescription::GetTraceIds() const { + return TraceIds_; +} + const std::unordered_map& TChangefeedDescription::GetAttributes() const { return Attributes_; } @@ -3393,6 +3402,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { if (proto.user_sids()) { ret.WithUserSIDs(); } + if (proto.trace_ids()) { + ret.WithTraceIds(); + } if (proto.has_resolved_timestamps_interval()) { ret.WithResolvedTimestamps(TDuration::MilliSeconds( ::google::protobuf::util::TimeUtil::DurationToMilliseconds(proto.resolved_timestamps_interval()))); @@ -3439,6 +3451,7 @@ void TChangefeedDescription::SerializeCommonFields(TProto& proto) const { proto.set_schema_changes(SchemaChanges_); proto.set_aws_region(TStringType{AwsRegion_}); proto.set_user_sids(UserSIDs_); + proto.set_trace_ids(TraceIds_); switch (Mode_) { case EChangefeedMode::KeysOnly: From 963b1757967ec130b30627efb656a3183e72118c Mon Sep 17 00:00:00 2001 From: stanislav_shchetinin Date: Fri, 24 Apr 2026 11:57:28 +0000 Subject: [PATCH 03/13] Review fixes for src path in NFS backups (#38012) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/import/import.h | 4 ++-- src/client/import/import.cpp | 12 ++++++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index eb19806c5c3..cc234dd0c99 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -072de56c9e541bc60bbb2a5f060213d91544411c +8bd6bc748cce9590e29154658decdef381686b80 diff --git a/include/ydb-cpp-sdk/client/import/import.h b/include/ydb-cpp-sdk/client/import/import.h index e3ab71d8d38..9949059950a 100644 --- a/include/ydb-cpp-sdk/client/import/import.h +++ b/include/ydb-cpp-sdk/client/import/import.h @@ -162,10 +162,10 @@ struct TImportFromFsSettings : public TOperationRequestSettingsAdd(); if (!item.Src.empty()) { protoItem.set_source_path(item.Src); } - if (!item.SrcPath.empty()) { - protoItem.set_source_path_db(item.SrcPath); + if (!item.SrcPathDb.empty()) { + protoItem.set_source_path_db(item.SrcPathDb); } protoItem.set_destination_path(item.Dst); } @@ -368,6 +368,10 @@ TAsyncImportFromFsResponse TImportClient::ImportFromFs(const TImportFromFsSettin settingsProto.set_destination_path(TStringType{settings.DestinationPath_.value()}); } + if (settings.SymmetricKey_) { + settingsProto.mutable_encryption_settings()->mutable_symmetric_key()->set_key(*settings.SymmetricKey_); + } + settingsProto.set_index_population_mode(TProtoAccessor::GetProto(settings.IndexPopulationMode_)); for (const std::string& excludeRegexp : settings.ExcludeRegexp_) { From c4a8cb547f6f651aa7eea62303e8895b6112d2f1 Mon Sep 17 00:00:00 2001 From: Alek5andr-Kotov Date: Fri, 24 Apr 2026 11:57:35 +0000 Subject: [PATCH 04/13] Fixed the ydb CLI build for Windows (#38185) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/topic/write_session.h | 2 ++ src/client/topic/impl/write_session.cpp | 11 ++++++++++- src/client/topic/ut/topic_tx_skip_conflict_ut.cpp | 14 +++++++------- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index cc234dd0c99..f6a9fab3a78 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -8bd6bc748cce9590e29154658decdef381686b80 +97d034e280be7e2edad976492a4732824c68f9d7 diff --git a/include/ydb-cpp-sdk/client/topic/write_session.h b/include/ydb-cpp-sdk/client/topic/write_session.h index d5b6b16d70f..d25e9e80e7b 100644 --- a/include/ydb-cpp-sdk/client/topic/write_session.h +++ b/include/ydb-cpp-sdk/client/topic/write_session.h @@ -181,6 +181,8 @@ struct TWriteSessionSettings : public TRequestSettings { //! Enables validation of SeqNo. If enabled, then writer will check writing with seqNo and without it and throws exception. FLUENT_SETTING_DEFAULT(bool, ValidateSeqNo, true); + + TWriteSessionSettings& SetTrackProducerIdInTx(bool value); }; template diff --git a/src/client/topic/impl/write_session.cpp b/src/client/topic/impl/write_session.cpp index bb540a94bf3..6e962cb7481 100644 --- a/src/client/topic/impl/write_session.cpp +++ b/src/client/topic/impl/write_session.cpp @@ -4,6 +4,8 @@ #include #include +#include + #include #include @@ -15,6 +17,13 @@ namespace NYdb::inline V3::NTopic { +TWriteSessionSettings& TWriteSessionSettings::SetTrackProducerIdInTx(bool value) +{ + const auto& key = NPersQueue::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX; + return AppendSessionMeta({key.data(), key.size()}, + value ? "true" : "false"); +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TWriteSession @@ -164,4 +173,4 @@ bool TSimpleBlockingWriteSession::Close(TDuration closeTimeout) { return Writer->Close(std::move(closeTimeout)); } -} // namespace NYdb::inline V3::NTopic \ No newline at end of file +} // namespace NYdb::inline V3::NTopic diff --git a/src/client/topic/ut/topic_tx_skip_conflict_ut.cpp b/src/client/topic/ut/topic_tx_skip_conflict_ut.cpp index ac0f44123cd..d8ff0af0cf6 100644 --- a/src/client/topic/ut/topic_tx_skip_conflict_ut.cpp +++ b/src/client/topic/ut/topic_tx_skip_conflict_ut.cpp @@ -1,6 +1,6 @@ #include -#include +#include #include @@ -208,10 +208,9 @@ class TFixtureTopicTxMatrix : public TFixtureTopicTxMatrixBase { } void AugmentWriteSessionSettings(NTopic::TWriteSessionSettings& options) override { - if constexpr (TrackProducerIdInTxMeta == ETrackProducerIdInTxMeta::True) { - options.AppendSessionMeta(std::string{NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX}, "true"); - } else if constexpr (TrackProducerIdInTxMeta == ETrackProducerIdInTxMeta::False) { - options.AppendSessionMeta(std::string{NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX}, "false"); + if ((TrackProducerIdInTxMeta == ETrackProducerIdInTxMeta::True) || + (TrackProducerIdInTxMeta == ETrackProducerIdInTxMeta::False)) { + options.SetTrackProducerIdInTx(TrackProducerIdInTxMeta == ETrackProducerIdInTxMeta::True); } } }; @@ -284,7 +283,8 @@ Y_UNIT_TEST_F(InvalidWriteSessionAttributeTrackProducerIdInTx_RejectsInit, TFixt options.ProducerId(TEST_MESSAGE_GROUP_ID); options.MessageGroupId(TEST_MESSAGE_GROUP_ID); options.Codec(ECodec::RAW); - options.AppendSessionMeta(std::string{NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX}, "not-a-bool"); + const auto& key = ::NPersQueue::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX; + options.AppendSessionMeta({key.data(), key.size()}, "not-a-bool"); auto ws = client.CreateWriteSession(options); @@ -306,7 +306,7 @@ Y_UNIT_TEST_F(InvalidWriteSessionAttributeTrackProducerIdInTx_RejectsInit, TFixt UNIT_ASSERT_C(closed.has_value(), "session must close after invalid WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX"); UNIT_ASSERT_VALUES_EQUAL(closed->GetStatus(), EStatus::BAD_REQUEST); const TString issues = closed->GetIssues().ToOneLineString(); - UNIT_ASSERT_STRING_CONTAINS(issues, NKikimr::NPQ::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX); + UNIT_ASSERT_STRING_CONTAINS(issues, key); UNIT_ASSERT_STRING_CONTAINS(issues, "not-a-bool"); ws->Close(TDuration::Seconds(5)); From 76b0388095a750b0596d06d01bfc9998564bb165 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Fri, 24 Apr 2026 11:57:42 +0000 Subject: [PATCH 05/13] Add build info to sdk and cli (#37914) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/driver/driver.h | 8 ++ src/client/driver/driver.cpp | 84 +++++++++++++++++++ .../grpc_connections/grpc_connections.cpp | 13 ++- .../grpc_connections/grpc_connections.h | 2 + .../impl/internal/grpc_connections/params.h | 1 + .../unit/client/build_info/build_info_ut.cpp | 76 +++++++++++++++++ 7 files changed, 184 insertions(+), 2 deletions(-) create mode 100644 tests/unit/client/build_info/build_info_ut.cpp diff --git a/.github/last_commit.txt b/.github/last_commit.txt index f6a9fab3a78..094a0c2361e 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -97d034e280be7e2edad976492a4732824c68f9d7 +fb46c0f1e5867055e85d3f5f3f8e90d3855934d5 diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index 0f1ac3dfca4..6b5c1860026 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -161,6 +161,14 @@ class TDriverConfig { //! default: 0 TDriverConfig& SetMaxMessageSize(uint64_t maxMessageSize); + //! Append a segment to the SDK build info header (x-ydb-sdk-build-info). + //! Do not call this method unless you know exactly what you are doing. + //! Segments are joined with ';'. Each segment must match: /.. + //! name chars: lowercase latin letters, digits, '-' + //! X, Y, Z chars: lowercase latin letters, digits + //! Throws on invalid format or if total extra length exceeds 512 bytes. + TDriverConfig& AppendBuildInfo(std::string_view segment); + //! Log backend. TDriverConfig& SetLog(std::unique_ptr&& log); diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index 97022547244..dc0ded514f2 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -1,4 +1,5 @@ #include +#include #define INCLUDE_YDB_INTERNAL_H #include @@ -53,6 +54,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t GetMaxMessageSize() const override { return MaxMessageSize; } const TLog& GetLog() const override { return Log; } std::shared_ptr GetExecutor() const override { return Executor; } + std::string GetBuildInfoExtra() const override { return BuildInfoExtra; } std::shared_ptr GetExternalMetricRegistry() const override { return MetricRegistry; } std::shared_ptr GetTraceProvider() const override { return TraceProvider; } @@ -86,6 +88,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t MaxMessageSize = 0; TLog Log; // Null by default. std::shared_ptr Executor; + std::string BuildInfoExtra; std::shared_ptr MetricRegistry; std::shared_ptr TraceProvider; }; @@ -247,6 +250,87 @@ TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr executor) { return *this; } +namespace { + +constexpr size_t MaxBuildInfoExtraLength = 512; + +bool IsNameChar(char c) { + return (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '-'; +} + +bool IsVersionPartChar(char c) { + return (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9'); +} + +bool IsNonEmptyAlnum(std::string_view s) { + if (s.empty()) { + return false; + } + for (char c : s) { + if (!IsVersionPartChar(c)) { + return false; + } + } + return true; +} + +// Expected format: /.. +// name: [a-z0-9-]+, X/Y/Z: [a-z0-9]+ +bool IsValidBuildInfoSegment(std::string_view segment) { + auto slash = segment.find('/'); + if (slash == std::string_view::npos || slash == 0) { + return false; + } + + auto name = segment.substr(0, slash); + for (char c : name) { + if (!IsNameChar(c)) { + return false; + } + } + + auto version = segment.substr(slash + 1); + auto dot1 = version.find('.'); + if (dot1 == std::string_view::npos) { + return false; + } + auto dot2 = version.find('.', dot1 + 1); + if (dot2 == std::string_view::npos) { + return false; + } + if (version.find('.', dot2 + 1) != std::string_view::npos) { + return false; + } + + return IsNonEmptyAlnum(version.substr(0, dot1)) + && IsNonEmptyAlnum(version.substr(dot1 + 1, dot2 - dot1 - 1)) + && IsNonEmptyAlnum(version.substr(dot2 + 1)); +} + +} // anonymous namespace + +TDriverConfig& TDriverConfig::AppendBuildInfo(std::string_view segment) { + if (segment.empty()) { + return *this; + } + if (!IsValidBuildInfoSegment(segment)) { + throw TContractViolation(TStringBuilder() << "Invalid build info segment '" << segment + << "'. Expected format: /.." + " (name: [a-z0-9-]+, X/Y/Z: [a-z0-9]+)"); + } + auto& extra = Impl_->BuildInfoExtra; + size_t newLength = extra.size() + (extra.empty() ? 0 : 1) + segment.size(); + if (newLength > MaxBuildInfoExtraLength) { + throw TContractViolation(TStringBuilder() << "Build info extra exceeds maximum length of " + << MaxBuildInfoExtraLength << " bytes"); + } + if (!extra.empty()) { + extra += ';'; + } + extra += segment; + return *this; +} + TDriverConfig& TDriverConfig::SetMetricRegistry(std::shared_ptr registry) { Impl_->MetricRegistry = std::move(registry); return *this; diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.cpp b/src/client/impl/internal/grpc_connections/grpc_connections.cpp index 134691ed234..61a3b047101 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/internal/grpc_connections/grpc_connections.cpp @@ -38,6 +38,16 @@ std::string CreateSDKBuildInfo() { return std::string("ydb-cpp-sdk/") + GetSdkSemver(); } +std::string BuildFullBuildInfo(const IConnectionsParams& params) { + auto result = CreateSDKBuildInfo(); + auto extra = params.GetBuildInfoExtra(); + if (!extra.empty()) { + result += ';'; + result += extra; + } + return result; +} + template class TScheduledObject : public TThrRefBase { using TSelf = TScheduledObject; @@ -171,6 +181,7 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p #endif , MetricRegistry_(params->GetExternalMetricRegistry()) , TraceProvider_(params->GetTraceProvider()) + , BuildInfo_(BuildFullBuildInfo(*params)) , NetworkThreadsNum_(params->GetNetworkThreadsNum()) , UsePerChannelTcpConnection_(params->GetUsePerChannelTcpConnection()) , GRpcClientLow_(NetworkThreadsNum_) @@ -491,7 +502,7 @@ TCallMeta TGRpcConnectionsImpl::MakeCallMeta(const TRpcRequestSettings& requestS static const std::string clientPid = GetClientPIDHeaderValue(); - meta.Aux.push_back({YDB_SDK_BUILD_INFO_HEADER, CreateSDKBuildInfo()}); + meta.Aux.push_back({YDB_SDK_BUILD_INFO_HEADER, BuildInfo_}); meta.Aux.push_back({YDB_CLIENT_PID, clientPid}); meta.Aux.insert(meta.Aux.end(), requestSettings.Header.begin(), requestSettings.Header.end()); diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.h b/src/client/impl/internal/grpc_connections/grpc_connections.h index 9f0859d5076..5912a707308 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/internal/grpc_connections/grpc_connections.h @@ -732,6 +732,8 @@ class TGRpcConnectionsImpl IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb; + const std::string BuildInfo_; + const std::size_t NetworkThreadsNum_; bool UsePerChannelTcpConnection_; // Must be the last member (first called destructor) diff --git a/src/client/impl/internal/grpc_connections/params.h b/src/client/impl/internal/grpc_connections/params.h index 11e41ffad6a..8ac2160f2bf 100644 --- a/src/client/impl/internal/grpc_connections/params.h +++ b/src/client/impl/internal/grpc_connections/params.h @@ -46,6 +46,7 @@ class IConnectionsParams { virtual uint64_t GetMaxOutboundMessageSize() const = 0; virtual uint64_t GetMaxMessageSize() const = 0; virtual std::shared_ptr GetExecutor() const = 0; + virtual std::string GetBuildInfoExtra() const = 0; virtual std::shared_ptr GetExternalMetricRegistry() const = 0; virtual std::shared_ptr GetTraceProvider() const = 0; }; diff --git a/tests/unit/client/build_info/build_info_ut.cpp b/tests/unit/client/build_info/build_info_ut.cpp new file mode 100644 index 00000000000..56941b44923 --- /dev/null +++ b/tests/unit/client/build_info/build_info_ut.cpp @@ -0,0 +1,76 @@ +#include +#include +#include + +using namespace NYdb; + +TEST(AppendBuildInfo, ValidSegments) { + TDriverConfig config; + EXPECT_NO_THROW(config.AppendBuildInfo("ydb-cli/2.31.0")); + EXPECT_NO_THROW(config.AppendBuildInfo("ydb-cli-import-file-csv/2.31.0")); + EXPECT_NO_THROW(config.AppendBuildInfo("ya-ydb/2026.02.09")); +} + +TEST(AppendBuildInfo, EmptySegmentIsNoOp) { + TDriverConfig config; + EXPECT_NO_THROW(config.AppendBuildInfo("")); +} + +TEST(AppendBuildInfo, RejectsNoSlash) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("ydb-cli"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsNoDots) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("ydb-cli/dev"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsOneDot) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("ydb-cli/2.0"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsThreeDots) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("ydb-cli/1.2.3.4"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsDashInVersion) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("ydb-cli/2.31.0-rc1"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsUppercase) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("YDB/1.0.0"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsSemicolon) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("a;b/1.0.0"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsEmptyName) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("/1.0.0"), TContractViolation); +} + +TEST(AppendBuildInfo, RejectsEmptyVersionPart) { + TDriverConfig config; + EXPECT_THROW(config.AppendBuildInfo("ydb-cli/.1.0"), TContractViolation); + EXPECT_THROW(config.AppendBuildInfo("ydb-cli/1..0"), TContractViolation); + EXPECT_THROW(config.AppendBuildInfo("ydb-cli/1.0."), TContractViolation); +} + +TEST(AppendBuildInfo, LengthLimit) { + TDriverConfig config; + std::string longName(510, 'a'); + EXPECT_THROW(config.AppendBuildInfo(longName + "/1.0.0"), TContractViolation); +} + +TEST(AppendBuildInfo, MultipleSegmentsAccumulate) { + TDriverConfig config; + EXPECT_NO_THROW(config.AppendBuildInfo("ydb-cli/2.31.0")); + EXPECT_NO_THROW(config.AppendBuildInfo("ydb-cli-sql/2.31.0")); +} From e1e386e39666e8f053280a57708b64b14e365857 Mon Sep 17 00:00:00 2001 From: Bulat Date: Fri, 24 Apr 2026 11:57:49 +0000 Subject: [PATCH 06/13] feat sdk: supported gRPC compression option on client side (#38231) --- .github/last_commit.txt | 2 +- CHANGELOG.md | 4 ++++ include/ydb-cpp-sdk/client/driver/driver.h | 7 +++++++ include/ydb-cpp-sdk/client/types/ydb.h | 6 ++++++ src/client/driver/driver.cpp | 8 ++++++++ .../grpc_connections/grpc_connections.cpp | 15 +++++++++++++++ .../internal/grpc_connections/grpc_connections.h | 5 +++++ .../impl/internal/grpc_connections/params.h | 2 ++ 8 files changed, 48 insertions(+), 1 deletion(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 094a0c2361e..7dcc454d690 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -fb46c0f1e5867055e85d3f5f3f8e90d3855934d5 +7a52758c6d7d5e61ef23b6fef7e0b67f1d039554 diff --git a/CHANGELOG.md b/CHANGELOG.md index 91bf0c16430..31efdd3b5ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +* Supported gRPC compression option on client side + +## v3.17.0 + * Added support of describe for scheme objects with type 'secret' via new TSecretClient * Added support for METRICS_LEVEL for the CreateTable/AlterTable requests. diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index 6b5c1860026..ae492602bd6 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -14,6 +14,8 @@ #include +#include + //////////////////////////////////////////////////////////////////////////////// namespace NYdb::inline V3 { @@ -139,6 +141,11 @@ class TDriverConfig { //! default: "round_robin" TDriverConfig& SetGRpcLoadBalancingPolicy(const std::string& policy); + //! Set grpc compression algorithm + //! algorithm - EGrpcCompressionAlgorithm enum value, see grpc documentation for available algorithms + //! default: EGrpcCompressionAlgorithm::None + TDriverConfig& SetGRpcCompressionAlgorithm(EGrpcCompressionAlgorithm algorithm); + //! Set inactive socket timeout. //! Used to close connections, that were inactive for given time. //! Closes unused connections every 1/10 of timeout, so deletion time is approximate. diff --git a/include/ydb-cpp-sdk/client/types/ydb.h b/include/ydb-cpp-sdk/client/types/ydb.h index c62652c609d..509a6585313 100644 --- a/include/ydb-cpp-sdk/client/types/ydb.h +++ b/include/ydb-cpp-sdk/client/types/ydb.h @@ -46,6 +46,12 @@ enum EPileState { DISCONNECTED = 6 /* "disconnected" */ }; +enum class EGrpcCompressionAlgorithm { + None, + Deflate, + Gzip +}; + class TBalancingPolicy { friend class TDriverConfig; friend class TDriver; diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index dc0ded514f2..4dfdb88477f 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -47,6 +47,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { TDuration GetGRpcKeepAliveTimeout() const override { return GRpcKeepAliveTimeout; } bool GetGRpcKeepAlivePermitWithoutCalls() const override { return GRpcKeepAlivePermitWithoutCalls; } std::string GetGRpcLoadBalancingPolicy() const override { return GRpcLoadBalancingPolicy; } + EGrpcCompressionAlgorithm GetGRpcCompressionAlgorithm() const override { return GRpcCompressionAlgorithm; } TDuration GetSocketIdleTimeout() const override { return SocketIdleTimeout; } uint64_t GetMemoryQuota() const override { return MemoryQuota; } uint64_t GetMaxInboundMessageSize() const override { return MaxInboundMessageSize; } @@ -81,6 +82,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { TDuration GRpcKeepAliveTimeout = TDuration::Seconds(10); bool GRpcKeepAlivePermitWithoutCalls = true; std::string GRpcLoadBalancingPolicy = "round_robin"; + EGrpcCompressionAlgorithm GRpcCompressionAlgorithm = EGrpcCompressionAlgorithm::None; TDuration SocketIdleTimeout = TDuration::Minutes(6); uint64_t MemoryQuota = 0; uint64_t MaxInboundMessageSize = 0; @@ -220,6 +222,11 @@ TDriverConfig& TDriverConfig::SetGRpcLoadBalancingPolicy(const std::string& poli return *this; } +TDriverConfig& TDriverConfig::SetGRpcCompressionAlgorithm(EGrpcCompressionAlgorithm algorithm) { + Impl_->GRpcCompressionAlgorithm = algorithm; + return *this; +} + TDriverConfig& TDriverConfig::SetSocketIdleTimeout(TDuration timeout) { Impl_->SocketIdleTimeout = timeout; return *this; @@ -389,6 +396,7 @@ TDriverConfig TDriver::GetConfig() const { config.SetGRpcKeepAliveTimeout(std::chrono::duration_cast(Impl_->GRpcKeepAliveTimeout_)); config.SetGRpcKeepAlivePermitWithoutCalls(Impl_->GRpcKeepAlivePermitWithoutCalls_); config.SetGRpcLoadBalancingPolicy(Impl_->GRpcLoadBalancingPolicy_); + config.SetGRpcCompressionAlgorithm(Impl_->GRpcCompressionAlgorithm_); config.SetSocketIdleTimeout(std::chrono::duration_cast(Impl_->SocketIdleTimeout_)); config.SetMaxInboundMessageSize(Impl_->MaxInboundMessageSize_); config.SetMaxOutboundMessageSize(Impl_->MaxOutboundMessageSize_); diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.cpp b/src/client/impl/internal/grpc_connections/grpc_connections.cpp index 61a3b047101..47b1e65051a 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/internal/grpc_connections/grpc_connections.cpp @@ -168,6 +168,7 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p , GRpcKeepAliveTimeout_(TDeadline::SafeDurationCast(params->GetGRpcKeepAliveTimeout())) , GRpcKeepAlivePermitWithoutCalls_(params->GetGRpcKeepAlivePermitWithoutCalls()) , GRpcLoadBalancingPolicy_(params->GetGRpcLoadBalancingPolicy()) + , GRpcCompressionAlgorithm_(params->GetGRpcCompressionAlgorithm()) , MemoryQuota_(params->GetMemoryQuota()) , MaxInboundMessageSize_(params->GetMaxInboundMessageSize()) , MaxOutboundMessageSize_(params->GetMaxOutboundMessageSize()) @@ -349,6 +350,20 @@ void TGRpcConnectionsImpl::SetGrpcKeepAlive(NYdbGrpc::TGRpcClientConfig& config, config.IntChannelParams[GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS] = permitWithoutCalls ? 1 : 0; } +void TGRpcConnectionsImpl::SetGrpcCompressionAlgorithm(NYdbGrpc::TGRpcClientConfig& config, EGrpcCompressionAlgorithm algorithm) { + switch (algorithm) { + case EGrpcCompressionAlgorithm::None: + config.CompressionAlgorithm = GRPC_COMPRESS_NONE; + break; + case EGrpcCompressionAlgorithm::Deflate: + config.CompressionAlgorithm = GRPC_COMPRESS_DEFLATE; + break; + case EGrpcCompressionAlgorithm::Gzip: + config.CompressionAlgorithm = GRPC_COMPRESS_GZIP; + break; + } +} + TAsyncListEndpointsResult TGRpcConnectionsImpl::GetEndpoints(TDbDriverStatePtr dbState) { Ydb::Discovery::ListEndpointsRequest request; request.set_database(TStringType{dbState->Database}); diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.h b/src/client/impl/internal/grpc_connections/grpc_connections.h index 5912a707308..d8ba56c90fd 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/internal/grpc_connections/grpc_connections.h @@ -86,6 +86,8 @@ class TGRpcConnectionsImpl static void SetGrpcKeepAlive(NYdbGrpc::TGRpcClientConfig& config, const TDeadline::Duration& timeout, bool permitWithoutCalls); + static void SetGrpcCompressionAlgorithm(NYdbGrpc::TGRpcClientConfig& config, EGrpcCompressionAlgorithm algorithm); + template std::pair>, TEndpointKey> GetServiceConnection( TDbDriverStatePtr dbState, const TEndpointKey& preferredEndpoint, @@ -110,6 +112,8 @@ class TGRpcConnectionsImpl clientConfig.LoadBalancingPolicy = GRpcLoadBalancingPolicy_; + SetGrpcCompressionAlgorithm(clientConfig, GRpcCompressionAlgorithm_); + if (dbState->DiscoveryMode != EDiscoveryMode::Off) { if (std::is_same() || dbState->Database.empty() @@ -710,6 +714,7 @@ class TGRpcConnectionsImpl const TDeadline::Duration GRpcKeepAliveTimeout_; const bool GRpcKeepAlivePermitWithoutCalls_; const std::string GRpcLoadBalancingPolicy_; + const EGrpcCompressionAlgorithm GRpcCompressionAlgorithm_; const std::uint64_t MemoryQuota_; const std::uint64_t MaxInboundMessageSize_; const std::uint64_t MaxOutboundMessageSize_; diff --git a/src/client/impl/internal/grpc_connections/params.h b/src/client/impl/internal/grpc_connections/params.h index 8ac2160f2bf..7d1758932fe 100644 --- a/src/client/impl/internal/grpc_connections/params.h +++ b/src/client/impl/internal/grpc_connections/params.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace NYdb::inline V3 { @@ -39,6 +40,7 @@ class IConnectionsParams { virtual TDuration GetGRpcKeepAliveTimeout() const = 0; virtual bool GetGRpcKeepAlivePermitWithoutCalls() const = 0; virtual std::string GetGRpcLoadBalancingPolicy() const = 0; + virtual EGrpcCompressionAlgorithm GetGRpcCompressionAlgorithm() const = 0; virtual TDuration GetSocketIdleTimeout() const = 0; virtual const TLog& GetLog() const = 0; virtual uint64_t GetMemoryQuota() const = 0; From c179469ab99fed1d9cd69792222f9cec1aed05d2 Mon Sep 17 00:00:00 2001 From: Maria Okorochkova Date: Fri, 24 Apr 2026 11:57:56 +0000 Subject: [PATCH 07/13] Update CHANGELOG.md (#38234) --- .github/last_commit.txt | 2 +- CHANGELOG.md | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 7dcc454d690..b28484d195d 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -7a52758c6d7d5e61ef23b6fef7e0b67f1d039554 +71cc36117f9ef6db448483a628c0bc225079cb8e diff --git a/CHANGELOG.md b/CHANGELOG.md index 31efdd3b5ff..05a0176f352 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added interface for export of metrics and spans, supported plugin for OpenTelemetry + * Supported gRPC compression option on client side ## v3.17.0 From 89e600a34cc9e412dabc65ff4e25524ff3bde2b8 Mon Sep 17 00:00:00 2001 From: Sergey Murylev Date: Fri, 24 Apr 2026 11:58:03 +0000 Subject: [PATCH 08/13] Added warning about non matching vector size (issue #18667) (#38172) --- .github/last_commit.txt | 2 +- src/client/common_client/impl/client.h | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index b28484d195d..c5d5621d723 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -71cc36117f9ef6db448483a628c0bc225079cb8e +51dec2e7eed28e7fd893db6359aab355eb498208 diff --git a/src/client/common_client/impl/client.h b/src/client/common_client/impl/client.h index 0e3c1e451b8..41c9a541af8 100644 --- a/src/client/common_client/impl/client.h +++ b/src/client/common_client/impl/client.h @@ -106,10 +106,12 @@ class TClientImplCommon auto extractor = [promise] (Ydb::Operations::Operation* operation, TPlainStatus status) mutable { - TStatus st(std::move(status)); if (!operation) { - promise.SetValue(TOp(std::move(st))); + promise.SetValue(TOp(TStatus(std::move(status)))); } else { + NYdb::NIssue::TIssues opIssues; + NYdb::NIssue::IssuesFromMessage(operation->issues(), opIssues); + TStatus st(static_cast(operation->status()), std::move(opIssues)); promise.SetValue(TOp(std::move(st), std::move(*operation))); } }; From 1bcb12095fdf7231273990415975c0f7c67865bd Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 24 Apr 2026 11:58:10 +0000 Subject: [PATCH 09/13] Make index build parallelism configurable in ALTER TABLE ADD INDEX (#24182) (#36274) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/table/table.h | 2 ++ src/api/protos/ydb_table.proto | 2 ++ src/client/table/table.cpp | 9 +++++++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index c5d5621d723..d769cd9b98e 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -51dec2e7eed28e7fd893db6359aab355eb498208 +fd4d02d6b3ebb3565fec58f5a585b44e09e6899e diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 6527affe256..f8ca5b0ac5d 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -442,6 +442,7 @@ class TIndexDescription { const std::vector& GetDataColumns() const; const std::variant& GetIndexSettings() const; uint64_t GetSizeBytes() const; + void SetParallel(uint32_t parallel); static TIndexDescription CreateGlobalIndex( const std::string& name, @@ -521,6 +522,7 @@ class TIndexDescription { std::vector GlobalIndexSettings_; std::variant SpecializedIndexSettings_; uint64_t SizeBytes_ = 0; + uint32_t Parallel_ = 0; }; struct TRenameIndex { diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 87656a497da..51d29a5f790 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -358,6 +358,8 @@ message TableIndex { } // list of columns content to be copied in to index table repeated string data_columns = 5; + // maximum level of parallelism for index build operation + optional uint32 parallel = 13; } // Represent table index with index state diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 5f2c28cbdf7..bc0a885f237 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -2511,6 +2511,10 @@ uint64_t TIndexDescription::GetSizeBytes() const { return SizeBytes_; } +void TIndexDescription::SetParallel(uint32_t parallel) { + Parallel_ = parallel; +} + TIndexDescription TIndexDescription::CreateGlobalIndex( const std::string& name, const std::vector& indexColumns, @@ -3055,6 +3059,9 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { if constexpr (std::is_same_v) { result.SizeBytes_ = proto.size_bytes(); } + if constexpr (std::is_same_v) { + result.Parallel_ = proto.parallel(); + } return result; } @@ -3067,6 +3074,8 @@ void TIndexDescription::SerializeTo(Ydb::Table::TableIndex& proto) const { *proto.mutable_data_columns() = {DataColumns_.begin(), DataColumns_.end()}; + proto.set_parallel(Parallel_); + switch (IndexType_) { case EIndexType::GlobalSync: { auto& settings = *proto.mutable_global_index()->mutable_settings(); From 136cb5e46296869130082b0c99db6818ebf78104 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 24 Apr 2026 11:58:11 +0000 Subject: [PATCH 10/13] Update import generation: 39 --- .github/import_generation.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/import_generation.txt b/.github/import_generation.txt index a2720097dcc..425151f3a41 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -39 +40 From 9e5b283f71dc207f636b2a146b5282f83964839c Mon Sep 17 00:00:00 2001 From: Maria Okorochkova Date: Fri, 24 Apr 2026 16:31:19 +0300 Subject: [PATCH 11/13] add cmakelists (#601) Co-authored-by: maladetska --- CMakeLists.txt | 3 +++ cmake/external_libs.cmake | 5 +++++ plugins/CMakeLists.txt | 2 ++ plugins/metrics/CMakeLists.txt | 3 +++ plugins/metrics/otel/CMakeLists.txt | 18 ++++++++++++++++++ plugins/trace/CMakeLists.txt | 3 +++ plugins/trace/otel/CMakeLists.txt | 16 ++++++++++++++++ src/client/CMakeLists.txt | 2 ++ src/client/impl/CMakeLists.txt | 1 + src/client/impl/observability/CMakeLists.txt | 15 +++++++++++++++ src/client/impl/stats/CMakeLists.txt | 1 + src/client/metrics/CMakeLists.txt | 7 +++++++ src/client/query/CMakeLists.txt | 1 + src/client/query/impl/CMakeLists.txt | 4 ++++ src/client/table/impl/CMakeLists.txt | 4 +++- src/client/trace/CMakeLists.txt | 7 +++++++ tests/integration/CMakeLists.txt | 1 + tests/unit/client/CMakeLists.txt | 15 +++++++++++++++ 18 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 plugins/CMakeLists.txt create mode 100644 plugins/metrics/CMakeLists.txt create mode 100644 plugins/metrics/otel/CMakeLists.txt create mode 100644 plugins/trace/CMakeLists.txt create mode 100644 plugins/trace/otel/CMakeLists.txt create mode 100644 src/client/impl/observability/CMakeLists.txt create mode 100644 src/client/metrics/CMakeLists.txt create mode 100644 src/client/trace/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index 86eaef64720..6df450c510c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,8 @@ project(YDB-CPP-SDK VERSION ${YDB_SDK_VERSION} LANGUAGES C CXX ASM) option(YDB_SDK_INSTALL "Install YDB C++ SDK" Off) option(YDB_SDK_TESTS "Build YDB C++ SDK tests" Off) option(YDB_SDK_EXAMPLES "Build YDB C++ SDK examples" On) +option(YDB_SDK_ENABLE_OTEL_METRICS "Build OpenTelemetry metrics plugin" Off) +option(YDB_SDK_ENABLE_OTEL_TRACE "Build OpenTelemetry trace plugin" Off) set(YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET "" CACHE STRING "Name of cmake target preparing google common proto library") option(YDB_SDK_USE_RAPID_JSON "Search for rapid json library in system" ON) @@ -58,6 +60,7 @@ add_subdirectory(library/cpp) add_subdirectory(include/ydb-cpp-sdk/client) add_subdirectory(src) add_subdirectory(util) +add_subdirectory(plugins) #_ydb_sdk_validate_public_headers() diff --git a/cmake/external_libs.cmake b/cmake/external_libs.cmake index dc46fdb1d5e..4560fd662b3 100644 --- a/cmake/external_libs.cmake +++ b/cmake/external_libs.cmake @@ -14,6 +14,11 @@ find_package(Brotli 1.1.0 REQUIRED) find_package(jwt-cpp REQUIRED) find_package(double-conversion REQUIRED) +# OpenTelemetry +if (YDB_SDK_ENABLE_OTEL_METRICS OR YDB_SDK_ENABLE_OTEL_TRACE) + find_package(opentelemetry-cpp REQUIRED) +endif() + # RapidJSON if (YDB_SDK_USE_RAPID_JSON) find_package(RapidJSON REQUIRED) diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt new file mode 100644 index 00000000000..33bacdb9871 --- /dev/null +++ b/plugins/CMakeLists.txt @@ -0,0 +1,2 @@ +add_subdirectory(metrics) +add_subdirectory(trace) \ No newline at end of file diff --git a/plugins/metrics/CMakeLists.txt b/plugins/metrics/CMakeLists.txt new file mode 100644 index 00000000000..2bcde514095 --- /dev/null +++ b/plugins/metrics/CMakeLists.txt @@ -0,0 +1,3 @@ +if (YDB_SDK_ENABLE_OTEL_METRICS) + add_subdirectory(otel EXCLUDE_FROM_ALL) +endif() \ No newline at end of file diff --git a/plugins/metrics/otel/CMakeLists.txt b/plugins/metrics/otel/CMakeLists.txt new file mode 100644 index 00000000000..fe6bcf4ea04 --- /dev/null +++ b/plugins/metrics/otel/CMakeLists.txt @@ -0,0 +1,18 @@ +_ydb_sdk_add_library(open_telemetry_metrics) + +target_sources(open_telemetry_metrics PRIVATE + src/metrics.cpp +) +target_include_directories(open_telemetry_metrics PUBLIC + $ + $ +) +target_link_libraries(open_telemetry_metrics PUBLIC + client-metrics + client-resources + opentelemetry-cpp::api + opentelemetry-cpp::metrics +) +_ydb_sdk_make_client_component(OpenTelemetryMetrics open_telemetry_metrics) + +_ydb_sdk_install_headers(${CMAKE_INSTALL_INCLUDEDIR} DIRECTORY include/) \ No newline at end of file diff --git a/plugins/trace/CMakeLists.txt b/plugins/trace/CMakeLists.txt new file mode 100644 index 00000000000..c1977028a0d --- /dev/null +++ b/plugins/trace/CMakeLists.txt @@ -0,0 +1,3 @@ +if (YDB_SDK_ENABLE_OTEL_TRACE) + add_subdirectory(otel EXCLUDE_FROM_ALL) +endif() \ No newline at end of file diff --git a/plugins/trace/otel/CMakeLists.txt b/plugins/trace/otel/CMakeLists.txt new file mode 100644 index 00000000000..c1cec305ec3 --- /dev/null +++ b/plugins/trace/otel/CMakeLists.txt @@ -0,0 +1,16 @@ +_ydb_sdk_add_library(open_telemetry_trace) +target_sources(open_telemetry_trace PRIVATE + src/trace.cpp +) +target_include_directories(open_telemetry_trace PUBLIC + $ + $ +) +target_link_libraries(open_telemetry_trace PUBLIC + client-trace + opentelemetry-cpp::api + opentelemetry-cpp::trace +) +_ydb_sdk_make_client_component(OpenTelemetryTrace open_telemetry_trace) + +_ydb_sdk_install_headers(${CMAKE_INSTALL_INCLUDEDIR} DIRECTORY include/) \ No newline at end of file diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 0ef5f3fcb42..cc888987a05 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -12,6 +12,7 @@ add_subdirectory(iam) add_subdirectory(iam_private) add_subdirectory(impl) add_subdirectory(import) +add_subdirectory(metrics) add_subdirectory(monitoring) add_subdirectory(operation) add_subdirectory(params) @@ -26,5 +27,6 @@ add_subdirectory(secret) add_subdirectory(ss_tasks) add_subdirectory(table) add_subdirectory(topic) +add_subdirectory(trace) add_subdirectory(types) add_subdirectory(value) diff --git a/src/client/impl/CMakeLists.txt b/src/client/impl/CMakeLists.txt index 9e04f134b37..8dfc3fa865b 100644 --- a/src/client/impl/CMakeLists.txt +++ b/src/client/impl/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(endpoints) add_subdirectory(executor) add_subdirectory(internal) +add_subdirectory(observability) add_subdirectory(session) add_subdirectory(stats) diff --git a/src/client/impl/observability/CMakeLists.txt b/src/client/impl/observability/CMakeLists.txt new file mode 100644 index 00000000000..c92be9dab69 --- /dev/null +++ b/src/client/impl/observability/CMakeLists.txt @@ -0,0 +1,15 @@ +_ydb_sdk_add_library(impl-observability) + +target_link_libraries(impl-observability PUBLIC + yutil + client-metrics + client-impl-ydb_stats +) + +target_sources(impl-observability PRIVATE + metrics.cpp + observation.cpp + span.cpp +) + +_ydb_sdk_install_targets(TARGETS impl-observability) \ No newline at end of file diff --git a/src/client/impl/stats/CMakeLists.txt b/src/client/impl/stats/CMakeLists.txt index 498104196cd..15866af4bc6 100644 --- a/src/client/impl/stats/CMakeLists.txt +++ b/src/client/impl/stats/CMakeLists.txt @@ -4,6 +4,7 @@ target_link_libraries(client-impl-ydb_stats PUBLIC yutil grpc-client monlib-metrics + client-metrics ) target_sources(client-impl-ydb_stats PRIVATE diff --git a/src/client/metrics/CMakeLists.txt b/src/client/metrics/CMakeLists.txt new file mode 100644 index 00000000000..5f735343191 --- /dev/null +++ b/src/client/metrics/CMakeLists.txt @@ -0,0 +1,7 @@ +_ydb_sdk_add_library(client-metrics) + +target_sources(client-metrics PRIVATE + metrics.cpp +) + +_ydb_sdk_make_client_component(Metrics client-metrics) \ No newline at end of file diff --git a/src/client/query/CMakeLists.txt b/src/client/query/CMakeLists.txt index 6677d402d4d..bc159ea87ab 100644 --- a/src/client/query/CMakeLists.txt +++ b/src/client/query/CMakeLists.txt @@ -11,6 +11,7 @@ target_link_libraries(client-ydb_query PUBLIC client-ydb_driver client-ydb_query-impl client-ydb_result + client-metrics client-types-operation api-protos api-grpc diff --git a/src/client/query/impl/CMakeLists.txt b/src/client/query/impl/CMakeLists.txt index 76b112b2254..d33258b7afd 100644 --- a/src/client/query/impl/CMakeLists.txt +++ b/src/client/query/impl/CMakeLists.txt @@ -9,6 +9,10 @@ target_link_libraries(client-ydb_query-impl PUBLIC client-ydb_result ) +target_link_libraries(client-ydb_query-impl PUBLIC + impl-observability +) + target_sources(client-ydb_query-impl PRIVATE exec_query.cpp client_session.cpp diff --git a/src/client/table/impl/CMakeLists.txt b/src/client/table/impl/CMakeLists.txt index 8f53d386fc6..63acf17dad9 100644 --- a/src/client/table/impl/CMakeLists.txt +++ b/src/client/table/impl/CMakeLists.txt @@ -9,7 +9,9 @@ target_link_libraries(client-ydb_table-impl library-operation_id client-impl-ydb_endpoints impl-session - client-ydb_table-query_stats + client-ydb_table-query_statsё + client-metrics + impl-observability PRIVATE OpenSSL::SSL ) diff --git a/src/client/trace/CMakeLists.txt b/src/client/trace/CMakeLists.txt new file mode 100644 index 00000000000..55261600333 --- /dev/null +++ b/src/client/trace/CMakeLists.txt @@ -0,0 +1,7 @@ +_ydb_sdk_add_library(client-trace) + +target_sources(client-trace PRIVATE + trace.cpp +) + +_ydb_sdk_make_client_component(Trace client-trace) \ No newline at end of file diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index d5a1d709245..8aa28839a63 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -1,6 +1,7 @@ add_subdirectory(auth) add_subdirectory(basic_example) add_subdirectory(bulk_upsert) +add_subdirectory(metrics) add_subdirectory(server_restart) add_subdirectory(sessions) add_subdirectory(sessions_pool) diff --git a/tests/unit/client/CMakeLists.txt b/tests/unit/client/CMakeLists.txt index 2c6356aa6ba..be4e49bfc9f 100644 --- a/tests/unit/client/CMakeLists.txt +++ b/tests/unit/client/CMakeLists.txt @@ -118,3 +118,18 @@ add_ydb_test(NAME client-value_ut GTEST LABELS unit ) + +add_ydb_test(NAME client-ydb_metrics_ut GTEST + INCLUDE_DIRS + ${YDB_SDK_SOURCE_DIR} + SOURCES + observability/metrics_ut.cpp + LINK_LIBRARIES + yutil + impl-observability + client-ydb_query-impl + client-ydb_table-impl + client-metrics + LABELS + unit +) \ No newline at end of file From d083095ec371725d0eb40cc00a63594a9a5af72d Mon Sep 17 00:00:00 2001 From: Maria Okorochkova Date: Fri, 24 Apr 2026 17:02:30 +0300 Subject: [PATCH 12/13] add cmakelists (#603) * add cmakelists * add cmakelists --------- Co-authored-by: maladetska --- tests/integration/metrics/CMakeLists.txt | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 tests/integration/metrics/CMakeLists.txt diff --git a/tests/integration/metrics/CMakeLists.txt b/tests/integration/metrics/CMakeLists.txt new file mode 100644 index 00000000000..4d83aae56bc --- /dev/null +++ b/tests/integration/metrics/CMakeLists.txt @@ -0,0 +1,12 @@ +add_ydb_test(NAME metrics_it GTEST + INCLUDE_DIRS + ${YDB_SDK_SOURCE_DIR} + SOURCES + main.cpp + LINK_LIBRARIES + yutil + YDB-CPP-SDK::Query + client-metrics + LABELS + integration +) \ No newline at end of file From 64fceb2448666176216847e64cb9fed012e442fe Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Mon, 27 Apr 2026 13:27:55 +0000 Subject: [PATCH 13/13] Fixed CMakeLists --- src/client/table/impl/CMakeLists.txt | 2 +- src/client/topic/impl/write_session.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client/table/impl/CMakeLists.txt b/src/client/table/impl/CMakeLists.txt index 63acf17dad9..8ecfe4ead87 100644 --- a/src/client/table/impl/CMakeLists.txt +++ b/src/client/table/impl/CMakeLists.txt @@ -9,7 +9,7 @@ target_link_libraries(client-ydb_table-impl library-operation_id client-impl-ydb_endpoints impl-session - client-ydb_table-query_statsё + client-ydb_table-query_stats client-metrics impl-observability PRIVATE diff --git a/src/client/topic/impl/write_session.cpp b/src/client/topic/impl/write_session.cpp index 6e962cb7481..bd103aee855 100644 --- a/src/client/topic/impl/write_session.cpp +++ b/src/client/topic/impl/write_session.cpp @@ -4,8 +4,6 @@ #include #include -#include - #include #include @@ -17,9 +15,11 @@ namespace NYdb::inline V3::NTopic { +constexpr std::string_view WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX = "track_producer_id_in_tx"; + TWriteSessionSettings& TWriteSessionSettings::SetTrackProducerIdInTx(bool value) { - const auto& key = NPersQueue::WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX; + const auto& key = WRITE_SESSION_ATTRIBUTE_TRACK_PRODUCER_ID_IN_TX; return AppendSessionMeta({key.data(), key.size()}, value ? "true" : "false"); }