diff --git a/.github/last_commit.txt b/.github/last_commit.txt index d769cd9b98..62b939ab8b 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -fd4d02d6b3ebb3565fec58f5a585b44e09e6899e +fd4d02d6b3ebb3565fec58f5a585b44e09e6899e \ No newline at end of file diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 82d0980a29..1e2c1a0f8a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -10,3 +10,7 @@ add_subdirectory(topic_writer/producer/basic_write) add_subdirectory(ttl) add_subdirectory(vector_index) add_subdirectory(vector_index_builtin) + +if (YDB_SDK_ENABLE_OTEL_TRACE AND YDB_SDK_ENABLE_OTEL_METRICS) + add_subdirectory(otel_tracing) +endif() diff --git a/examples/otel_tracing/CMakeLists.txt b/examples/otel_tracing/CMakeLists.txt new file mode 100644 index 0000000000..b826c66688 --- /dev/null +++ b/examples/otel_tracing/CMakeLists.txt @@ -0,0 +1,41 @@ +add_executable(otel_tracing_example) + +target_link_libraries(otel_tracing_example PUBLIC + yutil + getopt + YDB-CPP-SDK::Query + YDB-CPP-SDK::Table + YDB-CPP-SDK::Params + YDB-CPP-SDK::Driver + YDB-CPP-SDK::OpenTelemetryTrace + YDB-CPP-SDK::OpenTelemetryMetrics + opentelemetry-cpp::otlp_http_exporter + opentelemetry-cpp::otlp_http_metric_exporter +) + +target_sources(otel_tracing_example PRIVATE + main.cpp +) + +vcs_info(otel_tracing_example) + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64") + target_link_libraries(otel_tracing_example PUBLIC + cpuid_check + ) +endif() + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux") + target_link_options(otel_tracing_example PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -lpthread + ) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin") + target_link_options(otel_tracing_example PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -framework + CoreFoundation + ) +endif() diff --git a/examples/otel_tracing/README.md b/examples/otel_tracing/README.md new file mode 100644 index 0000000000..58f60fe0c8 
--- /dev/null +++ b/examples/otel_tracing/README.md @@ -0,0 +1,194 @@ +# YDB C++ SDK — OpenTelemetry Demo + +Демонстрация трассировки и метрик операций QueryService и TableService +с визуализацией в **Grafana**, **Jaeger** и **Prometheus**. + +## Архитектура + +``` +┌──────────────┐ OTLP/HTTP ┌──────────────────┐ +│ C++ demo │ ──────────────────> │ OTel Collector │ +│ application │ │ :4328 (HTTP) │ +└──────────────┘ └────────┬──────────┘ + │ │ + traces │ │ metrics + ▼ ▼ + ┌──────────┐ ┌────────────┐ + │ Jaeger │ │ Prometheus │ + │ :16686 │ │ :9090 │ + └─────┬─────┘ └──────┬──────┘ + │ │ + └───────┬───────┘ + ▼ + ┌──────────┐ + │ Grafana │ + │ :3000 │ + └──────────┘ +``` + +## Быстрый старт + +### 1. Запустить инфраструктуру + +```bash +cd examples/otel_tracing +docker compose up -d +``` + +Дождитесь готовности YDB: + +```bash +docker compose logs ydb -f +# Ждите строку "Database started successfully" +``` + +### 2. Собрать SDK с OTel и тестами + +Из корня репозитория: + +```bash +mkdir -p build && cd build + +cmake .. \ + -DYDB_SDK_TESTS=ON \ + -DYDB_SDK_ENABLE_OTEL_TRACE=ON \ + -DYDB_SDK_ENABLE_OTEL_METRICS=ON + +cmake --build . --target otel_tracing_example -j$(nproc) +``` + +### 3. 
Запустить демо + +```bash +./examples/otel_tracing/otel_tracing_example \ + --endpoint localhost:2136 \ + --database /local \ + --otlp http://localhost:4328 \ + --iterations 20 \ + --retry-workers 6 \ + --retry-ops 30 +``` + +#### Доступные флаги + +| Флаг | По умолчанию | Описание | +|--------------------|---------------------------|--------------------------------------------------------------------------| +| `--endpoint`, `-e` | `localhost:2136` | gRPC-эндпоинт YDB | +| `--database`, `-d` | `/local` | Имя базы | +| `--otlp` | `http://localhost:4328` | OTLP/HTTP endpoint коллектора | +| `--iterations`,`-n`| `20` | Итераций в Query- и Table-нагрузке | +| `--retry-workers` | `6` | Параллельных воркеров в retry-нагрузке (`0` чтобы пропустить) | +| `--retry-ops` | `30` | Операций на каждого retry-воркера | + +#### Демонстрация реальных ретраев + +Третий встроенный сценарий — `RunRetryWorkload` — намеренно провоцирует +**SERIALIZABLE-конфликты**: N параллельных воркеров делают +`SELECT → sleep → UPSERT → COMMIT` на одной и той же «горячей» строке +(`id = 9999`) внутри `RetryQuerySync`. YDB возвращает `ABORTED` +проигравшим транзакциям, и SDK прозрачно ретраит их. + +В трейсах появятся: + +``` +ydb.RunWithRetry (INTERNAL, ydb.retry.count=N) +├── ydb.Try (INTERNAL) # первая попытка: ydb.retry.attempt и ydb.retry.backoff_ms отсутствуют +│ ├── ydb.CreateSession +│ ├── ydb.ExecuteQuery +│ └── ydb.Commit db.response.status_code=ABORTED, error.type=ydb_error, exception event +├── ydb.Try ydb.retry.attempt=1 (INTERNAL, ydb.retry.backoff_ms=...) +│ └── ... db.response.status_code=ABORTED, error.type=ydb_error +└── ydb.Try ydb.retry.attempt=N (INTERNAL, ydb.retry.backoff_ms=...) + └── ... 
db.response.status_code=SUCCESS +``` + +Для усиления конфликтов поднимите воркеров и операций: + +```bash +./examples/otel_tracing/otel_tracing_example \ + --retry-workers 12 --retry-ops 80 +``` + +В конце программа печатает счётчик наблюдённых абортов — каждый из них +соответствует одному автоматическому ретраю SDK. + +> **Важно:** для статуса `ABORTED` SDK использует политику +> `RetryImmediately` (см. `src/client/impl/internal/retry/retry.h`), +> поэтому атрибут `ydb.retry.backoff_ms` будет равен `0` — +> это by design. Чтобы увидеть `backoff_ms > 0`, нужны статусы +> `UNAVAILABLE` (FastBackoff, slot 5 ms) или `OVERLOADED` / +> `CLIENT_RESOURCE_EXHAUSTED` (SlowBackoff, slot 1 s). Самый простой способ +> их получить — кратковременно перезапустить YDB во время работы примера: +> +> ```bash +> ./examples/otel_tracing/otel_tracing_example --retry-workers 8 --retry-ops 100 & +> sleep 5 +> docker compose -f examples/otel_tracing/docker-compose.yml restart ydb +> wait +> ``` + +### 4. Открыть дашборды + +| Сервис | URL | Описание | +|-----------|------------------------------|---------------------------------| +| Grafana | http://localhost:3000 | Дашборд "YDB QueryService" | +| Jaeger | http://localhost:16686 | Поиск трейсов по сервису | +| Prometheus| http://localhost:9090 | Метрики `db_client_operation_*` | + +**Grafana**: логин `admin` / пароль `admin`. + +### 5. Что смотреть + +#### В Grafana (дашборд "YDB QueryService"): +- **Request Rate by Operation** — RPS по операциям (ExecuteQuery, ExecuteDataQuery, CreateSession, Commit, Rollback) +- **Error Rate by Operation** — частота ошибок +- **Duration p50/p95/p99** — распределение длительности операций +- **Error Ratio** — процент ошибок +- **Recent Traces** — таблица трейсов из Jaeger + +#### В Jaeger UI: +- Выберите сервис `ydb-cpp-sdk-demo`. 
+- RPC-спаны (`SpanKind = CLIENT`): + `ydb.CreateSession`, `ydb.ExecuteQuery`, `ydb.ExecuteDataQuery`, + `ydb.BeginTransaction`, `ydb.Commit`, `ydb.Rollback`, + `ydb.ExecuteSchemeQuery`, `ydb.BulkUpsert`. +- Retry-спаны (`SpanKind = INTERNAL`): + - `ydb.RunWithRetry` — обёртка над всей retryable-логикой. + При фактических повторах содержит атрибут `ydb.retry.count` (общее число + выполненных повторов, `>= 1`). + - `ydb.Try` — по одному на каждую попытку. На retry-попытках содержит + атрибуты `ydb.retry.attempt` (`1..N`) и `ydb.retry.backoff_ms` + (длительность sleep перед этой попыткой). На первой (не retry) попытке + эти атрибуты не выставляются. +- Общие атрибуты на всех YDB-спанах: + - `db.system.name = ydb` + - `db.namespace` (имя базы) + - `server.address`, `server.port` (эндпоинт балансера) + - `network.peer.address`, `network.peer.port` (фактический узел кластера) +- На ошибках добавляются: + - `db.response.status_code` — строковый статус YDB (например, `ABORTED`) + - `error.type` — категория источника ошибки: `ydb_error` (ошибка, + возвращённая YDB) или `transport_error` (ошибка транспортного уровня) + - событие `exception` с `exception.type` и `exception.message` + +#### В Prometheus: +- `db_client_operation_duration_seconds_bucket` — гистограмма длительности + (OTel Semantic Conventions). Лейблы: `db.system.name`, `db.namespace`, + `db.operation.name` (с префиксом `ydb.`), `ydb.client.api` + (`Query` / `Table`). В самом Prometheus точки в именах лейблов заменяются на `_` (например, `db_operation_name` — именно так они используются в запросах дашборда). Для ошибок добавляются `db.response.status_code` + (точный YDB-статус, например `ABORTED`) и `error.type` — + низкокардинальная категория источника ошибки: `ydb_error` (статусы YDB-сервера) + или `transport_error` (клиентские/транспортные статусы). +- `db_client_operation_requests_total` — счётчик начатых операций + (включая каждую попытку ретрая). +- `db_client_operation_errors_total` — счётчик неуспешных попыток. 
+ Полезно сравнивать с `requests_total`: для retry-нагрузки на той же + «горячей» строке коэффициент ошибок будет очень высоким — это и есть + индикатор работы ретраев. + +### 6. Остановить + +```bash +cd examples/otel_tracing +docker compose down -v +``` diff --git a/examples/otel_tracing/docker-compose.yml b/examples/otel_tracing/docker-compose.yml new file mode 100644 index 0000000000..8af4a6194c --- /dev/null +++ b/examples/otel_tracing/docker-compose.yml @@ -0,0 +1,71 @@ +services: + ydb: + image: cr.yandex/yc/yandex-docker-local-ydb:latest + platform: linux/amd64 + hostname: localhost + ports: + - "2136:2136" + - "8765:8765" + environment: + - GRPC_TLS_PORT=2135 + - GRPC_PORT=2136 + - MON_PORT=8765 + - YDB_DEFAULT_LOG_LEVEL=NOTICE + - YDB_USE_IN_MEMORY_PDISKS=true + volumes: + - ydb-data:/ydb_data + healthcheck: + test: /bin/sh -c "/ydb -e grpc://localhost:2136 -d /local scheme ls" + interval: 5s + timeout: 5s + retries: 20 + + jaeger: + image: jaegertracing/all-in-one:1.76.0 + ports: + - "16686:16686" + - "4317:4317" + - "4318:4318" + environment: + - COLLECTOR_OTLP_ENABLED=true + + prometheus: + image: prom/prometheus:v2.53.0 + ports: + - "9090:9090" + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + depends_on: + - otel-collector + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.110.0 + ports: + - "4327:4317" + - "4328:4318" + - "8889:8889" + volumes: + - ./otel-collector/config.yml:/etc/otelcol-contrib/config.yaml:ro + depends_on: + - jaeger + + grafana: + image: grafana/grafana:11.1.0 + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning:ro + - ./grafana/dashboards:/var/lib/grafana/dashboards:ro + - grafana-data:/var/lib/grafana + depends_on: + - jaeger + - prometheus + +volumes: + ydb-data: + grafana-data: diff --git 
a/examples/otel_tracing/grafana/dashboards/ydb-query-service.json b/examples/otel_tracing/grafana/dashboards/ydb-query-service.json new file mode 100644 index 0000000000..12a38d99f1 --- /dev/null +++ b/examples/otel_tracing/grafana/dashboards/ydb-query-service.json @@ -0,0 +1,129 @@ +{ + "annotations": { "list": [] }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "links": [], + "panels": [ + { + "title": "Request Rate by Operation", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }, + "datasource": { "type": "prometheus", "uid": "${prometheus_ds}" }, + "fieldConfig": { + "defaults": { + "unit": "ops", + "custom": { "drawStyle": "line", "fillOpacity": 10 } + } + }, + "targets": [ + { + "expr": "rate(db_client_operation_requests_total[1m])", + "legendFormat": "{{db_operation_name}}" + } + ] + }, + { + "title": "Error Rate by Operation", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }, + "datasource": { "type": "prometheus", "uid": "${prometheus_ds}" }, + "fieldConfig": { + "defaults": { + "unit": "ops", + "custom": { "drawStyle": "line", "fillOpacity": 10 }, + "color": { "mode": "palette-classic" } + } + }, + "targets": [ + { + "expr": "rate(db_client_operation_errors_total[1m])", + "legendFormat": "{{db_operation_name}}" + } + ] + }, + { + "title": "Duration p50 / p95 / p99 (s)", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "datasource": { "type": "prometheus", "uid": "${prometheus_ds}" }, + "fieldConfig": { + "defaults": { + "unit": "s", + "custom": { "drawStyle": "line", "fillOpacity": 5 } + } + }, + "targets": [ + { + "expr": "histogram_quantile(0.50, rate(db_client_operation_duration_seconds_bucket[1m]))", + "legendFormat": "p50 {{db_operation_name}}" + }, + { + "expr": "histogram_quantile(0.95, rate(db_client_operation_duration_seconds_bucket[1m]))", + "legendFormat": "p95 {{db_operation_name}}" + }, + { + "expr": "histogram_quantile(0.99, 
rate(db_client_operation_duration_seconds_bucket[1m]))", + "legendFormat": "p99 {{db_operation_name}}" + } + ] + }, + { + "title": "Error Ratio (%)", + "type": "stat", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, + "datasource": { "type": "prometheus", "uid": "${prometheus_ds}" }, + "fieldConfig": { + "defaults": { + "unit": "percentunit", + "thresholds": { + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 0.01 }, + { "color": "red", "value": 0.05 } + ] + } + } + }, + "targets": [ + { + "expr": "sum(rate(db_client_operation_errors_total[5m])) by (db_operation_name) / sum(rate(db_client_operation_requests_total[5m])) by (db_operation_name)", + "legendFormat": "{{db_operation_name}}" + } + ] + }, + { + "title": "Recent Traces", + "type": "table", + "gridPos": { "h": 10, "w": 24, "x": 0, "y": 16 }, + "datasource": { "type": "jaeger", "uid": "${jaeger_ds}" }, + "targets": [ + { + "query": "ydb-cpp-sdk-demo", + "queryType": "search", + "service": "ydb-cpp-sdk-demo" + } + ] + } + ], + "schemaVersion": 39, + "templating": { + "list": [ + { + "name": "prometheus_ds", + "type": "datasource", + "query": "prometheus", + "current": { "text": "Prometheus", "value": "Prometheus" } + }, + { + "name": "jaeger_ds", + "type": "datasource", + "query": "jaeger", + "current": { "text": "Jaeger", "value": "Jaeger" } + } + ] + }, + "time": { "from": "now-30m", "to": "now" }, + "title": "YDB QueryService", + "uid": "ydb-query-service" +} diff --git a/examples/otel_tracing/grafana/provisioning/dashboards/dashboards.yml b/examples/otel_tracing/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 0000000000..8336756c13 --- /dev/null +++ b/examples/otel_tracing/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,12 @@ +apiVersion: 1 + +providers: + - name: "YDB" + orgId: 1 + folder: "YDB" + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards + foldersFromFilesStructure: false 
diff --git a/examples/otel_tracing/grafana/provisioning/datasources/datasources.yml b/examples/otel_tracing/grafana/provisioning/datasources/datasources.yml new file mode 100644 index 0000000000..428e06210a --- /dev/null +++ b/examples/otel_tracing/grafana/provisioning/datasources/datasources.yml @@ -0,0 +1,16 @@ +apiVersion: 1 + +datasources: + - name: Jaeger + type: jaeger + access: proxy + url: http://jaeger:16686 + isDefault: false + editable: true + + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: true diff --git a/examples/otel_tracing/main.cpp b/examples/otel_tracing/main.cpp new file mode 100644 index 0000000000..acaf4d5160 --- /dev/null +++ b/examples/otel_tracing/main.cpp @@ -0,0 +1,446 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include + +#include +#include +#include +#include + +namespace nostd = opentelemetry::nostd; +namespace sdktrace = opentelemetry::sdk::trace; +namespace sdkmetrics = opentelemetry::sdk::metrics; +namespace otlp = opentelemetry::exporter::otlp; +namespace resource = opentelemetry::sdk::resource; + +using namespace NYdb; +using namespace NYdb::NStatusHelpers; + +struct TConfig { + std::string Endpoint = "localhost:2136"; + std::string Database = "/local"; + std::string OtlpEndpoint = "http://localhost:4328"; + int Iterations = 20; + int RetryWorkers = 6; + int RetryOps = 30; +}; + +nostd::shared_ptr InitTracing(const TConfig& cfg) { + otlp::OtlpHttpExporterOptions opts; + opts.url = cfg.OtlpEndpoint + "/v1/traces"; + + auto exporter = otlp::OtlpHttpExporterFactory::Create(opts); + auto processor = sdktrace::SimpleSpanProcessorFactory::Create(std::move(exporter)); + + auto res = resource::Resource::Create({ + {"service.name", "ydb-cpp-sdk-demo"}, + {"service.version", "1.0.0"}, + 
}); + + std::shared_ptr provider = + std::make_shared(std::move(processor), res); + return nostd::shared_ptr(provider); +} + +nostd::shared_ptr InitMetrics(const TConfig& cfg) { + otlp::OtlpHttpMetricExporterOptions opts; + opts.url = cfg.OtlpEndpoint + "/v1/metrics"; + + auto exporter = otlp::OtlpHttpMetricExporterFactory::Create(opts); + + sdkmetrics::PeriodicExportingMetricReaderOptions readerOpts; + readerOpts.export_interval_millis = std::chrono::milliseconds(5000); + readerOpts.export_timeout_millis = std::chrono::milliseconds(3000); + + auto reader = sdkmetrics::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), readerOpts); + + auto res = resource::Resource::Create({ + {"service.name", "ydb-cpp-sdk-demo"}, + {"service.version", "1.0.0"}, + }); + + auto rawProvider = std::make_shared( + std::unique_ptr(new sdkmetrics::ViewRegistry()), res); + rawProvider->AddMetricReader(std::move(reader)); + + std::shared_ptr provider = rawProvider; + return nostd::shared_ptr(provider); +} + +nostd::shared_ptr GetAppTracer() { + return opentelemetry::trace::Provider::GetTracerProvider()->GetTracer("ydb-demo-app", "1.0.0"); +} + +void RunQueryWorkload(NQuery::TQueryClient& client, int iterations) { + std::cout << "\n=== Query Service workload ===" << std::endl; + + auto tracer = GetAppTracer(); + + { + auto ddlSpan = tracer->StartSpan("QueryService.DDL"); + auto scope = opentelemetry::trace::Scope(ddlSpan); + + ThrowOnError(client.RetryQuerySync([](NQuery::TSession session) { + return session.ExecuteQuery(R"( + CREATE TABLE IF NOT EXISTS otel_demo ( + id Uint64, + value Utf8, + PRIMARY KEY (id) + ) + )", NQuery::TTxControl::NoTx()).GetValueSync(); + })); + + ddlSpan->SetStatus(opentelemetry::trace::StatusCode::kOk); + } + + for (int i = 0; i < iterations; ++i) { + auto iterSpan = tracer->StartSpan("QueryService.Iteration"); + auto scope = opentelemetry::trace::Scope(iterSpan); + iterSpan->SetAttribute("iteration", static_cast(i + 1)); + + std::cout << " 
[Query] Iteration " << (i + 1) << "/" << iterations << std::endl; + + ThrowOnError(client.RetryQuerySync([i](NQuery::TSession session) { + auto params = TParamsBuilder() + .AddParam("$id").Uint64(i).Build() + .AddParam("$val").Utf8("query_" + std::to_string(i)).Build() + .Build(); + + return session.ExecuteQuery(R"( + DECLARE $id AS Uint64; + DECLARE $val AS Utf8; + UPSERT INTO otel_demo (id, value) VALUES ($id, $val) + )", NQuery::TTxControl::BeginTx(NQuery::TTxSettings::SerializableRW()).CommitTx(), + params).GetValueSync(); + })); + + ThrowOnError(client.RetryQuerySync([i](NQuery::TSession session) { + auto params = TParamsBuilder() + .AddParam("$id").Uint64(i).Build() + .Build(); + + return session.ExecuteQuery(R"( + DECLARE $id AS Uint64; + SELECT id, value FROM otel_demo WHERE id = $id + )", NQuery::TTxControl::BeginTx(NQuery::TTxSettings::SerializableRW()).CommitTx(), + params).GetValueSync(); + })); + + // Every fifth iteration runs an explicit BeginTransaction / ExecuteQuery / Commit sequence. + if (i % 5 == 4) { + ThrowOnError(client.RetryQuerySync([](NQuery::TQueryClient client) -> TStatus { + // A failed GetSession() is returned to the retrier instead of being used as if it held a valid session. + auto sessionResult = client.GetSession().GetValueSync(); + if (!sessionResult.IsSuccess()) { + return sessionResult; + } + auto session = sessionResult.GetSession(); + auto beginResult = session.BeginTransaction(NQuery::TTxSettings::SerializableRW()).GetValueSync(); + if (!beginResult.IsSuccess()) { + return beginResult; + } + auto tx = beginResult.GetTransaction(); + + auto result = session.ExecuteQuery(R"( + SELECT COUNT(*) AS cnt FROM otel_demo + )", NQuery::TTxControl::Tx(tx)).GetValueSync(); + + if (!result.IsSuccess()) { + return result; + } + + return tx.Commit().GetValueSync(); + })); + } + + iterSpan->SetStatus(opentelemetry::trace::StatusCode::kOk); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } +} + +void RunTableWorkload(NTable::TTableClient& client, int iterations) { + std::cout << "\n=== Table Service workload ===" << std::endl; + + auto tracer = GetAppTracer(); + + for (int i = 0; i < iterations; ++i) { + int id = 1000 + i; + + auto iterSpan = tracer->StartSpan("TableService.Iteration"); + auto scope = 
opentelemetry::trace::Scope(iterSpan); + iterSpan->SetAttribute("iteration", static_cast(i + 1)); + + std::cout << " [Table] Iteration " << (i + 1) << "/" << iterations << std::endl; + + ThrowOnError(client.RetryOperationSync([id](NTable::TSession session) { + auto params = session.GetParamsBuilder() + .AddParam("$id").Uint64(id).Build() + .AddParam("$val").Utf8("table_" + std::to_string(id)).Build() + .Build(); + + return session.ExecuteDataQuery(R"( + DECLARE $id AS Uint64; + DECLARE $val AS Utf8; + UPSERT INTO otel_demo (id, value) VALUES ($id, $val) + )", NTable::TTxControl::BeginTx(NTable::TTxSettings::SerializableRW()).CommitTx(), + std::move(params)).GetValueSync(); + })); + + ThrowOnError(client.RetryOperationSync([id](NTable::TSession session) { + auto params = session.GetParamsBuilder() + .AddParam("$id").Uint64(id).Build() + .Build(); + + return session.ExecuteDataQuery(R"( + DECLARE $id AS Uint64; + SELECT id, value FROM otel_demo WHERE id = $id + )", NTable::TTxControl::BeginTx(NTable::TTxSettings::SerializableRW()).CommitTx(), + std::move(params)).GetValueSync(); + })); + + ThrowOnError(client.RetryOperationSync([](NTable::TSession session) -> TStatus { + auto beginResult = session.BeginTransaction(NTable::TTxSettings::SerializableRW()).GetValueSync(); + if (!beginResult.IsSuccess()) { + return beginResult; + } + auto tx = beginResult.GetTransaction(); + + auto result = session.ExecuteDataQuery(R"( + SELECT COUNT(*) AS cnt FROM otel_demo + )", NTable::TTxControl::Tx(tx)).GetValueSync(); + + if (!result.IsSuccess()) { + return result; + } + + return tx.Commit().GetValueSync(); + })); + + if (i % 5 == 4) { + auto rollbackResult = client.RetryOperationSync([](NTable::TSession session) -> TStatus { + auto beginResult = session.BeginTransaction(NTable::TTxSettings::SerializableRW()).GetValueSync(); + if (!beginResult.IsSuccess()) { + return beginResult; + } + auto tx = beginResult.GetTransaction(); + return tx.Rollback().GetValueSync(); + }); + if 
(!rollbackResult.IsSuccess()) { + std::cerr << " Rollback status: " << static_cast(rollbackResult.GetStatus()) << std::endl; + } + } + + iterSpan->SetStatus(opentelemetry::trace::StatusCode::kOk); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } +} + +void RunRetryWorkload(NQuery::TQueryClient& client, int workers, int opsPerWorker) { + std::cout << "\n=== Retry workload (SERIALIZABLE conflicts) ===" + << " workers=" << workers << " ops=" << opsPerWorker << std::endl; + + auto tracer = GetAppTracer(); + + { + auto seedSpan = tracer->StartSpan("RetryWorkload.Seed"); + auto scope = opentelemetry::trace::Scope(seedSpan); + + ThrowOnError(client.RetryQuerySync([](NQuery::TSession session) { + return session.ExecuteQuery(R"( + UPSERT INTO otel_demo (id, value) VALUES (9999u, "seed") + )", NQuery::TTxControl::BeginTx(NQuery::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + })); + } + + std::atomic conflicts{0}; + std::atomic successes{0}; + std::vector threads; + threads.reserve(workers); + + for (int w = 0; w < workers; ++w) { + threads.emplace_back([&, w]() { + auto workerTracer = GetAppTracer(); + for (int i = 0; i < opsPerWorker; ++i) { + auto iterSpan = workerTracer->StartSpan("RetryWorkload.Op"); + auto scope = opentelemetry::trace::Scope(iterSpan); + iterSpan->SetAttribute("worker", static_cast(w)); + iterSpan->SetAttribute("op", static_cast(i)); + + auto status = client.RetryQuerySync( + [w, i, &conflicts](NQuery::TQueryClient client) -> TStatus { + auto sessionRes = client.GetSession().GetValueSync(); + if (!sessionRes.IsSuccess()) { + return sessionRes; + } + auto session = sessionRes.GetSession(); + + auto beginRes = session.BeginTransaction( + NQuery::TTxSettings::SerializableRW()).GetValueSync(); + if (!beginRes.IsSuccess()) { + return beginRes; + } + auto tx = beginRes.GetTransaction(); + + auto readRes = session.ExecuteQuery(R"( + SELECT value FROM otel_demo WHERE id = 9999u + )", NQuery::TTxControl::Tx(tx)).GetValueSync(); + 
if (!readRes.IsSuccess()) { + if (readRes.GetStatus() == EStatus::ABORTED) { + conflicts.fetch_add(1); + } + return readRes; + } + + std::this_thread::sleep_for( + std::chrono::milliseconds(5 + (w * 7 + i * 3) % 20)); + + auto params = TParamsBuilder() + .AddParam("$v").Utf8("w" + std::to_string(w) + + "_i" + std::to_string(i)).Build() + .Build(); + + auto writeRes = session.ExecuteQuery(R"( + DECLARE $v AS Utf8; + UPSERT INTO otel_demo (id, value) VALUES (9999u, $v) + )", NQuery::TTxControl::Tx(tx), params).GetValueSync(); + if (!writeRes.IsSuccess()) { + if (writeRes.GetStatus() == EStatus::ABORTED) { + conflicts.fetch_add(1); + } + return writeRes; + } + + auto commitRes = tx.Commit().GetValueSync(); + if (!commitRes.IsSuccess() + && commitRes.GetStatus() == EStatus::ABORTED) { + conflicts.fetch_add(1); + } + return commitRes; + }); + + if (status.IsSuccess()) { + successes.fetch_add(1); + iterSpan->SetStatus(opentelemetry::trace::StatusCode::kOk); + } else { + iterSpan->SetStatus(opentelemetry::trace::StatusCode::kError, + std::string(ToString(status.GetStatus()))); + std::cerr << " [retry-wl] worker=" << w << " op=" << i + << " final_status=" << static_cast(status.GetStatus()) + << std::endl; + } + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + std::cout << " Retry workload done." 
+ << " successes=" << successes.load() + << " observed_aborts=" << conflicts.load() + << " (each abort triggers one SDK retry attempt)" << std::endl; +} + +int main(int argc, char** argv) { + TConfig cfg; + + NLastGetopt::TOpts opts; + opts.AddLongOption('e', "endpoint", "YDB endpoint") + .DefaultValue(cfg.Endpoint).StoreResult(&cfg.Endpoint); + opts.AddLongOption('d', "database", "YDB database") + .DefaultValue(cfg.Database).StoreResult(&cfg.Database); + opts.AddLongOption("otlp", "OTLP HTTP endpoint") + .DefaultValue(cfg.OtlpEndpoint).StoreResult(&cfg.OtlpEndpoint); + opts.AddLongOption('n', "iterations", "Number of iterations") + .DefaultValue(std::to_string(cfg.Iterations)).StoreResult(&cfg.Iterations); + opts.AddLongOption("retry-workers", "Concurrent workers for retry workload (0 to skip)") + .DefaultValue(std::to_string(cfg.RetryWorkers)).StoreResult(&cfg.RetryWorkers); + opts.AddLongOption("retry-ops", "Operations per retry worker") + .DefaultValue(std::to_string(cfg.RetryOps)).StoreResult(&cfg.RetryOps); + + NLastGetopt::TOptsParseResult parsedOpts(&opts, argc, argv); + + if (cfg.Endpoint.rfind("grpc://", 0) == 0) { + cfg.Endpoint.erase(0, 7); + } else if (cfg.Endpoint.rfind("grpcs://", 0) == 0) { + cfg.Endpoint.erase(0, 8); + } + + std::cout << "Initializing OpenTelemetry..." 
<< std::endl; + std::cout << " OTLP endpoint: " << cfg.OtlpEndpoint << std::endl; + + auto tracerProvider = InitTracing(cfg); + auto meterProvider = InitMetrics(cfg); + + auto ydbTraceProvider = NTrace::CreateOtelTraceProvider(tracerProvider); + auto ydbMetricRegistry = NMetrics::CreateOtelMetricRegistry(meterProvider); + + std::cout << "Connecting to YDB at " << cfg.Endpoint << cfg.Database << std::endl; + + auto driverConfig = TDriverConfig() + .SetEndpoint(cfg.Endpoint) + .SetDatabase(cfg.Database) + .SetDiscoveryMode(EDiscoveryMode::Off) + .SetTraceProvider(ydbTraceProvider) + .SetMetricRegistry(ydbMetricRegistry); + + TDriver driver(driverConfig); + NQuery::TQueryClient queryClient(driver); + NTable::TTableClient tableClient(driver); + + try { + RunQueryWorkload(queryClient, cfg.Iterations); + RunTableWorkload(tableClient, cfg.Iterations); + + if (cfg.RetryWorkers > 0 && cfg.RetryOps > 0) { + RunRetryWorkload(queryClient, cfg.RetryWorkers, cfg.RetryOps); + } + + std::cout << "\n=== Cleanup ===" << std::endl; + ThrowOnError(queryClient.RetryQuerySync([](NQuery::TSession session) { + return session.ExecuteQuery( + "DROP TABLE otel_demo", NQuery::TTxControl::NoTx()).GetValueSync(); + })); + } catch (const std::exception& e) { + std::cerr << "Error: " << e.what() << std::endl; + } + + std::cout << "Flushing telemetry..." << std::endl; + + driver.Stop(true); + + if (auto* sdkTracerProvider = dynamic_cast(tracerProvider.get())) { + sdkTracerProvider->ForceFlush(); + } + if (auto* sdkMeterProvider = dynamic_cast(meterProvider.get())) { + sdkMeterProvider->ForceFlush(); + } + + std::this_thread::sleep_for(std::chrono::seconds(3)); + + std::cout << "Done. 
Open Grafana at http://localhost:3000" << std::endl; + std::cout << " Jaeger UI at http://localhost:16686" << std::endl; + std::cout << " Prometheus at http://localhost:9090" << std::endl; + + return 0; +} diff --git a/examples/otel_tracing/otel-collector/config.yml b/examples/otel_tracing/otel-collector/config.yml new file mode 100644 index 0000000000..9589c9cd4e --- /dev/null +++ b/examples/otel_tracing/otel-collector/config.yml @@ -0,0 +1,32 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +exporters: + otlp/jaeger: + endpoint: jaeger:4317 + tls: + insecure: true + + prometheus: + endpoint: 0.0.0.0:8889 + +processors: + batch: + timeout: 1s + send_batch_size: 1024 + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp/jaeger] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus] diff --git a/examples/otel_tracing/prometheus/prometheus.yml b/examples/otel_tracing/prometheus/prometheus.yml new file mode 100644 index 0000000000..faeda702af --- /dev/null +++ b/examples/otel_tracing/prometheus/prometheus.yml @@ -0,0 +1,8 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: "otel-collector" + static_configs: + - targets: ["otel-collector:8889"] diff --git a/include/ydb-cpp-sdk/client/trace/trace.h b/include/ydb-cpp-sdk/client/trace/trace.h index 117f4220b3..54ace20e8f 100644 --- a/include/ydb-cpp-sdk/client/trace/trace.h +++ b/include/ydb-cpp-sdk/client/trace/trace.h @@ -15,6 +15,17 @@ enum class ESpanKind { CONSUMER }; +enum class ESpanStatus { + Unset, + Ok, + Error +}; + +class IScope { +public: + virtual ~IScope() = default; +}; + class ISpan { public: virtual ~ISpan() = default; @@ -22,12 +33,20 @@ class ISpan { virtual void SetAttribute(const std::string& key, const std::string& value) = 0; virtual void SetAttribute(const std::string& key, int64_t value) = 0; virtual void AddEvent(const std::string& 
name, const std::map& attributes = {}) = 0; + virtual std::unique_ptr Activate() = 0; + + virtual void SetStatus(ESpanStatus /*status*/, const std::string& /*description*/ = {}) {} }; class ITracer { public: virtual ~ITracer() = default; - virtual std::shared_ptr StartSpan(const std::string& name, ESpanKind kind = ESpanKind::INTERNAL) = 0; + + virtual std::shared_ptr StartSpan( + const std::string& name + , ESpanKind kind = ESpanKind::INTERNAL + , ISpan* parent = nullptr + ) = 0; }; class ITraceProvider { diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 33bacdb987..0d23280045 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -1,2 +1,2 @@ add_subdirectory(metrics) -add_subdirectory(trace) \ No newline at end of file +add_subdirectory(trace) diff --git a/plugins/metrics/CMakeLists.txt b/plugins/metrics/CMakeLists.txt index 2bcde51409..6d50a5111e 100644 --- a/plugins/metrics/CMakeLists.txt +++ b/plugins/metrics/CMakeLists.txt @@ -1,3 +1,3 @@ if (YDB_SDK_ENABLE_OTEL_METRICS) add_subdirectory(otel EXCLUDE_FROM_ALL) -endif() \ No newline at end of file +endif() diff --git a/plugins/metrics/otel/CMakeLists.txt b/plugins/metrics/otel/CMakeLists.txt index fe6bcf4ea0..4fcb9f3018 100644 --- a/plugins/metrics/otel/CMakeLists.txt +++ b/plugins/metrics/otel/CMakeLists.txt @@ -15,4 +15,4 @@ target_link_libraries(open_telemetry_metrics PUBLIC ) _ydb_sdk_make_client_component(OpenTelemetryMetrics open_telemetry_metrics) -_ydb_sdk_install_headers(${CMAKE_INSTALL_INCLUDEDIR} DIRECTORY include/) \ No newline at end of file +_ydb_sdk_install_headers(${CMAKE_INSTALL_INCLUDEDIR} DIRECTORY include/) diff --git a/plugins/metrics/otel/include/ydb-cpp-sdk/open_telemetry/metrics.h b/plugins/metrics/otel/include/ydb-cpp-sdk/open_telemetry/metrics.h new file mode 100644 index 0000000000..e02936cffc --- /dev/null +++ b/plugins/metrics/otel/include/ydb-cpp-sdk/open_telemetry/metrics.h @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include +#include + 
// Adapts an OpenTelemetry tracer provider to the SDK's trace-provider interface.
// The handle is taken by value and moved into the returned adapter, which uses
// it to serve GetTracer(name) requests.
std::shared_ptr CreateOtelTraceProvider(
    opentelemetry::nostd::shared_ptr tracerProvider);
file mode 100644 index 0000000000..16d81a8d72 --- /dev/null +++ b/plugins/trace/otel/src/trace.cpp @@ -0,0 +1,138 @@ +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace NYdb::inline V3::NTrace { + +namespace { + +namespace otel_trace = opentelemetry::trace; +namespace otel_nostd = opentelemetry::nostd; +namespace otel_common = opentelemetry::common; + +otel_trace::SpanKind MapSpanKind(ESpanKind kind) { + switch (kind) { + case ESpanKind::INTERNAL: return otel_trace::SpanKind::kInternal; + case ESpanKind::SERVER: return otel_trace::SpanKind::kServer; + case ESpanKind::CLIENT: return otel_trace::SpanKind::kClient; + case ESpanKind::PRODUCER: return otel_trace::SpanKind::kProducer; + case ESpanKind::CONSUMER: return otel_trace::SpanKind::kConsumer; + } + return otel_trace::SpanKind::kInternal; +} + +otel_trace::StatusCode MapSpanStatus(ESpanStatus status) { + switch (status) { + case ESpanStatus::Unset: return otel_trace::StatusCode::kUnset; + case ESpanStatus::Ok: return otel_trace::StatusCode::kOk; + case ESpanStatus::Error: return otel_trace::StatusCode::kError; + } + return otel_trace::StatusCode::kUnset; +} + +class TOtelScope : public IScope { +public: + TOtelScope(otel_nostd::shared_ptr span) + : Scope_(std::move(span)) + {} + +private: + otel_trace::Scope Scope_; +}; + +class TOtelSpan : public ISpan { +public: + TOtelSpan(otel_nostd::shared_ptr span) + : Span_(std::move(span)) + {} + + const otel_nostd::shared_ptr& RawSpan() const noexcept { + return Span_; + } + + void End() override { + Span_->End(); + } + + void SetAttribute(const std::string& key, const std::string& value) override { + Span_->SetAttribute(key, value); + } + + void SetAttribute(const std::string& key, int64_t value) override { + Span_->SetAttribute(key, value); + } + + void AddEvent(const std::string& name, const std::map& attributes) override { + if (attributes.empty()) { + Span_->AddEvent(name); + } else { + std::vector> attrs; + 
// ITracer implementation that delegates to an OpenTelemetry tracer handle.
class TOtelTracer : public ITracer {
public:
    TOtelTracer(otel_nostd::shared_ptr tracer)
        : Tracer_(std::move(tracer))
    {}

    // Starts an OpenTelemetry span with the mapped kind and returns it wrapped
    // in the SDK span interface.
    // An explicit parent is applied only when the dynamic_cast below succeeds;
    // a null `parent`, or one of a different ISpan implementation, leaves
    // options.parent at its default.
    std::shared_ptr StartSpan(const std::string& name, ESpanKind kind, ISpan* parent) override {
        otel_trace::StartSpanOptions options;
        options.kind = MapSpanKind(kind);
        if (auto* otelParent = dynamic_cast(parent)) {
            // Parent context = current runtime context with the parent's raw span set on it.
            auto context = opentelemetry::context::RuntimeContext::GetCurrent();
            options.parent = otel_trace::SetSpan(context, otelParent->RawSpan());
        }
        return std::make_shared(Tracer_->StartSpan(name, options));
    }

private:
    otel_nostd::shared_ptr Tracer_;
};
opentelemetry; - -trace::SpanKind MapSpanKind(ESpanKind kind) { - switch (kind) { - case ESpanKind::INTERNAL: return trace::SpanKind::kInternal; - case ESpanKind::SERVER: return trace::SpanKind::kServer; - case ESpanKind::CLIENT: return trace::SpanKind::kClient; - case ESpanKind::PRODUCER: return trace::SpanKind::kProducer; - case ESpanKind::CONSUMER: return trace::SpanKind::kConsumer; - } - return trace::SpanKind::kInternal; -} - -class TOtelSpan : public ISpan { -public: - TOtelSpan(nostd::shared_ptr span) - : Span_(std::move(span)) - {} - - void End() override { - Span_->End(); - } - - void SetAttribute(const std::string& key, const std::string& value) override { - Span_->SetAttribute(key, value); - } - - void SetAttribute(const std::string& key, int64_t value) override { - Span_->SetAttribute(key, value); - } - - void AddEvent(const std::string& name, const std::map& attributes) override { - if (attributes.empty()) { - Span_->AddEvent(name); - } else { - std::vector> attrs; - attrs.reserve(attributes.size()); - for (const auto& [k, v] : attributes) { - attrs.emplace_back(nostd::string_view(k), common::AttributeValue(nostd::string_view(v))); - } - Span_->AddEvent(name, attrs); - } - } - -private: - nostd::shared_ptr Span_; -}; - -class TOtelTracer : public ITracer { -public: - TOtelTracer(nostd::shared_ptr tracer) - : Tracer_(std::move(tracer)) - {} - - std::shared_ptr StartSpan(const std::string& name, ESpanKind kind) override { - trace::StartSpanOptions options; - options.kind = MapSpanKind(kind); - return std::make_shared(Tracer_->StartSpan(name, options)); - } - -private: - nostd::shared_ptr Tracer_; -}; - -class TOtelTraceProvider : public ITraceProvider { -public: - TOtelTraceProvider(nostd::shared_ptr tracerProvider) - : TracerProvider_(std::move(tracerProvider)) - {} - - std::shared_ptr GetTracer(const std::string& name) override { - return std::make_shared(TracerProvider_->GetTracer(name)); - } - -private: - nostd::shared_ptr TracerProvider_; -}; - -} 
// namespace - -std::shared_ptr CreateOtelTraceProvider( - opentelemetry::nostd::shared_ptr tracerProvider) -{ - return std::make_shared(std::move(tracerProvider)); -} - -} // namespace NYdb::NTrace diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3dff709405..b251a04138 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,3 +1,3 @@ add_subdirectory(api) add_subdirectory(client) -add_subdirectory(library) \ No newline at end of file +add_subdirectory(library) diff --git a/src/client/impl/internal/common/client_pid.cpp b/src/client/impl/internal/common/client_pid.cpp index 4bc2136a99..76347f3f74 100644 --- a/src/client/impl/internal/common/client_pid.cpp +++ b/src/client/impl/internal/common/client_pid.cpp @@ -5,6 +5,9 @@ #include +#include +#include + #ifdef _win_ // copied from util/system/getpid.cpp // to avoid extra util dep. diff --git a/src/client/impl/internal/retry/CMakeLists.txt b/src/client/impl/internal/retry/CMakeLists.txt index bde0f9e0ae..d8d5853000 100644 --- a/src/client/impl/internal/retry/CMakeLists.txt +++ b/src/client/impl/internal/retry/CMakeLists.txt @@ -3,6 +3,7 @@ _ydb_sdk_add_library(impl-internal-retry) target_link_libraries(impl-internal-retry PUBLIC yutil impl-internal-grpc_connections + impl-observability ) target_sources(impl-internal-retry PRIVATE diff --git a/src/client/impl/internal/retry/retry.cpp b/src/client/impl/internal/retry/retry.cpp index 73880d0e5c..526175189b 100644 --- a/src/client/impl/internal/retry/retry.cpp +++ b/src/client/impl/internal/retry/retry.cpp @@ -28,14 +28,18 @@ TBackoffDuration CalcBackoffTime(const TBackoffSettings& settings, std::uint32_t } -void Backoff(const NRetry::TBackoffSettings& settings, std::uint32_t retryNumber) { - std::this_thread::sleep_for(CalcBackoffTime(settings, retryNumber)); +std::chrono::microseconds Backoff(const NRetry::TBackoffSettings& settings, std::uint32_t retryNumber) { + const auto duration = CalcBackoffTime(settings, retryNumber); + 
    // Starts the retried operation and returns a future for its final result.
    //
    // A root retry span is created first. The first attempt is launched
    // synchronously through DoRetry(); the root span is closed later, in the
    // continuation attached to the promise's future, with either the final
    // status or CLIENT_INTERNAL_ERROR if the future carries an exception.
    TAsyncStatusType Execute() {
        ParentSpan_ = Client_.Impl_->CreateRetryRootSpan();

        // parentScope publishes the root retry span as the active thread-local
        // context only for the synchronous prefix below (until DoRetry returns).
        // The attempt span is parented to ParentSpan_ explicitly via
        // CreateRetryAttemptSpan(..., ParentSpan_) in StartAttemptSpan(), so the
        // root <- attempt link does NOT depend on this scope and survives the
        // asynchronous boundary in the .Apply() callback below.
        auto parentScope = ParentSpan_ ? ParentSpan_->Activate() : nullptr;

        this->RetryStartTime_ = TInstant::Now();
        // `self` is captured by the continuation, so the context stays referenced
        // until the continuation has run.
        TPtr self(this);
        DoRetry(self);

        return this->Promise_.GetFuture().Apply(
            [self](const auto& f) mutable {
                try {
                    // GetValue() rethrows if the promise was completed with an exception.
                    auto value = f.GetValue();
                    if (self->ParentSpan_) {
                        self->ParentSpan_->SetRetryCount(self->RetryNumber_);
                        self->ParentSpan_->End(value.GetStatus());
                    }
                    return value;
                } catch (...) {
                    if (self->ParentSpan_) {
                        self->ParentSpan_->SetRetryCount(self->RetryNumber_);
                        // Rethrow locally only to classify the exception for the span event.
                        try {
                            std::rethrow_exception(std::current_exception());
                        } catch (const std::exception& e) {
                            self->ParentSpan_->RecordException(typeid(e).name(), e.what());
                        } catch (...) {
                            self->ParentSpan_->RecordException("unknown", "unknown exception");
                        }
                        self->ParentSpan_->End(EStatus::CLIENT_INTERNAL_ERROR);
                    }
                    // The original exception still propagates to the caller's future.
                    throw;
                }
            }
        );
    }
self->Settings_.FastBackoffSettings_ : self->Settings_.SlowBackoffSettings_; - AsyncBackoff(self->Client_.Impl_, backoffSettings, self->RetryNumber_, + const auto backoff = AsyncBackoff(self->Client_.Impl_, backoffSettings, self->RetryNumber_, [self]() {DoRetry(self);}); + self->LastBackoffMs_ = std::chrono::duration_cast(backoff).count(); } static void HandleExceptionAsync(TPtr self, std::exception_ptr e) { + self->EndAttemptSpan(EStatus::CLIENT_INTERNAL_ERROR); self->Promise_.SetException(e); } static void HandleStatusAsync(TPtr self, const TStatusType& status) { + self->EndAttemptSpan(status.GetStatus()); auto nextStep = self->GetNextStep(status); if (nextStep != NextStep::Finish) { self->RetryNumber_++; @@ -79,6 +130,23 @@ class TRetryContext : public TThrRefBase, public TRetryContextBase { } ); } + +private: + void StartAttemptSpan() { + AttemptSpan_ = Client_.Impl_->CreateRetryAttemptSpan( + this->RetryNumber_, LastBackoffMs_, ParentSpan_); + } + + void EndAttemptSpan(EStatus status) { + if (AttemptSpan_) { + AttemptSpan_->End(status); + AttemptSpan_.reset(); + } + } + + std::shared_ptr ParentSpan_; + std::shared_ptr AttemptSpan_; + std::int64_t LastBackoffMs_ = 0; }; template > diff --git a/src/client/impl/internal/retry/retry_sync.h b/src/client/impl/internal/retry/retry_sync.h index beefcb2771..4452368d54 100644 --- a/src/client/impl/internal/retry/retry_sync.h +++ b/src/client/impl/internal/retry/retry_sync.h @@ -1,9 +1,15 @@ #pragma once #include +#include #include #include +#include + +#include +#include +#include namespace NYdb::inline V3::NRetry::Sync { @@ -14,46 +20,96 @@ class TRetryContext : public TRetryContextBase { public: TStatusType Execute() { + ParentSpan_ = Client_.Impl_->CreateRetryRootSpan(); + + auto parentScope = ParentSpan_ ? 
ParentSpan_->Activate() : nullptr; + auto& parentSpan = ParentSpan_; + + try { + auto status = ExecuteImpl(); + if (parentSpan) { + parentSpan->SetRetryCount(this->RetryNumber_); + parentSpan->End(status.GetStatus()); + } + return status; + } catch (...) { + if (parentSpan) { + parentSpan->SetRetryCount(this->RetryNumber_); + try { + std::rethrow_exception(std::current_exception()); + } catch (const std::exception& e) { + parentSpan->RecordException(typeid(e).name(), e.what()); + } catch (...) { + parentSpan->RecordException("unknown", "unknown exception"); + } + parentSpan->End(EStatus::CLIENT_INTERNAL_ERROR); + } + throw; + } + } + +protected: + TRetryContext(TClient& client, const TRetryOperationSettings& settings) + : TRetryContextBase(settings) + , Client_(client) + {} + + virtual TStatusType Retry() = 0; + + virtual TStatusType RunOperation() = 0; + + std::chrono::microseconds DoBackoff(bool fast) { + const auto &settings = fast ? this->Settings_.FastBackoffSettings_ + : this->Settings_.SlowBackoffSettings_; + return Backoff(settings, this->RetryNumber_); + } + +private: + TStatusType ExecuteImpl() { this->RetryStartTime_ = TInstant::Now(); - TStatusType status = Retry(); // first attempt + std::int64_t lastBackoffMs = 0; + + TStatusType status = RunAttempt(lastBackoffMs); for (this->RetryNumber_ = 0; this->RetryNumber_ <= this->Settings_.MaxRetries_;) { auto nextStep = this->GetNextStep(status); + std::chrono::microseconds backoff{}; switch (nextStep) { case NextStep::RetryImmediately: break; case NextStep::RetryFastBackoff: - DoBackoff(true); + backoff = DoBackoff(true); break; case NextStep::RetrySlowBackoff: - DoBackoff(false); + backoff = DoBackoff(false); break; case NextStep::Finish: return status; } - // make next retry this->RetryNumber_++; this->LogRetry(status); this->Client_.Impl_->CollectRetryStatSync(status.GetStatus()); - status = Retry(); + lastBackoffMs = std::chrono::duration_cast(backoff).count(); + status = RunAttempt(lastBackoffMs); } 
return status; } -protected: - TRetryContext(TClient& client, const TRetryOperationSettings& settings) - : TRetryContextBase(settings) - , Client_(client) - {} + TStatusType RunAttempt(std::int64_t backoffMs) { + auto attemptSpan = Client_.Impl_->CreateRetryAttemptSpan(this->RetryNumber_, backoffMs, ParentSpan_); + std::unique_ptr scope; + if (attemptSpan) { + scope = attemptSpan->Activate(); + } - virtual TStatusType Retry() = 0; + TStatusType status = Retry(); - virtual TStatusType RunOperation() = 0; - - void DoBackoff(bool fast) { - const auto &settings = fast ? this->Settings_.FastBackoffSettings_ - : this->Settings_.SlowBackoffSettings_; - Backoff(settings, this->RetryNumber_); + if (attemptSpan) { + attemptSpan->End(status.GetStatus()); + } + return status; } + + std::shared_ptr ParentSpan_; }; template> diff --git a/src/client/impl/observability/CMakeLists.txt b/src/client/impl/observability/CMakeLists.txt index c92be9dab6..0270152697 100644 --- a/src/client/impl/observability/CMakeLists.txt +++ b/src/client/impl/observability/CMakeLists.txt @@ -1,15 +1,21 @@ +add_subdirectory(error_category) + _ydb_sdk_add_library(impl-observability) target_link_libraries(impl-observability PUBLIC yutil client-metrics + client-trace client-impl-ydb_stats + impl-observability-error_category + impl-internal-db_driver_state ) target_sources(impl-observability PRIVATE metrics.cpp observation.cpp + operation_name.cpp span.cpp ) -_ydb_sdk_install_targets(TARGETS impl-observability) \ No newline at end of file +_ydb_sdk_install_targets(TARGETS impl-observability) diff --git a/src/client/impl/observability/error_category/CMakeLists.txt b/src/client/impl/observability/error_category/CMakeLists.txt new file mode 100644 index 0000000000..1904d9f68a --- /dev/null +++ b/src/client/impl/observability/error_category/CMakeLists.txt @@ -0,0 +1,11 @@ +_ydb_sdk_add_library(impl-observability-error_category) + +target_link_libraries(impl-observability-error_category PUBLIC + yutil +) + 
// Classifies a status for the "error.type" attribute.
// Statuses whose numeric value is at or above TRANSPORT_STATUSES_FIRST yield
// "transport_error"; every other status yields "ydb_error".
// There is no special case for EStatus::SUCCESS here; the callers visible in
// this change only consult the category for non-success statuses.
std::string_view CategorizeErrorType(EStatus status) noexcept {
    return static_cast(status) >= TRANSPORT_STATUSES_FIRST
        ? std::string_view("transport_error")
        : std::string_view("ydb_error");
}
+std::string_view CategorizeErrorType(EStatus status) noexcept; + +} // namespace NYdb::NObservability diff --git a/src/client/impl/observability/metrics.cpp b/src/client/impl/observability/metrics.cpp index 01a96f2cc5..8443c27f17 100644 --- a/src/client/impl/observability/metrics.cpp +++ b/src/client/impl/observability/metrics.cpp @@ -1,5 +1,7 @@ #include "metrics.h" +#include "operation_name.h" + #include #include @@ -32,14 +34,14 @@ TRequestMetrics::TRequestMetrics(NSdkStats::TStatCollector::TClientOperationStat , const std::string& requestName , const TLog& log ) : Collector_(operationCollector) - , RequestName_(requestName) + , RequestName_(NormalizeOperationName(requestName)) , Log_(log) { if (!Collector_) { return; } try { - Collector_->IncRequestCount(requestName); + Collector_->IncRequestCount(RequestName_); StartTime_ = std::chrono::steady_clock::now(); } catch (...) { SafeLogRequestMetricsError(Log_, "failed to initialize metrics", std::current_exception()); diff --git a/src/client/impl/observability/observation.cpp b/src/client/impl/observability/observation.cpp index 7483087bcf..2e5d7fe0c6 100644 --- a/src/client/impl/observability/observation.cpp +++ b/src/client/impl/observability/observation.cpp @@ -1,26 +1,32 @@ #include "observation.h" +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + namespace NYdb::inline V3::NObservability { TRequestObservation::TRequestObservation(const std::string& ydbClientType , NSdkStats::TStatCollector::TClientOperationStatCollector* operationCollector , std::shared_ptr tracer , const std::string& operationName - , const std::string& discoveryEndpoint - , const std::string& database - , const TLog& log -) : Span_(std::make_shared(std::move(tracer), operationName, discoveryEndpoint, database, log, ydbClientType)) - , Metrics_(std::make_shared(operationCollector, operationName, log)) + , const std::shared_ptr& dbDriverState +) : Span_( + TRequestSpan::Create(ydbClientType + , std::move(tracer) + , 
// Guarantees that an operation name carries the "ydb." prefix.
// A name that already starts with "ydb." is returned unchanged; any other
// name (including the empty string) is returned with "ydb." prepended.
std::string NormalizeOperationName(const std::string& requestName) {
    static constexpr std::string_view kPrefix = "ydb.";

    std::string normalized;
    normalized.reserve(kPrefix.size() + requestName.size());
    // compare() clips the window to the string length, so short names simply mismatch.
    if (requestName.compare(0, kPrefix.size(), kPrefix) != 0) {
        normalized.assign(kPrefix);
    }
    normalized += requestName;
    return normalized;
}
// Returns `requestName` with the "ydb." prefix guaranteed: names that already
// begin with "ydb." come back unchanged, all others get the prefix prepended.
std::string NormalizeOperationName(const std::string& requestName);
// Adds an "exception" event to `span`.
// The event always carries "exception.type" and "exception.message";
// "exception.stacktrace" is attached only when `stacktrace` is non-empty.
void EmitExceptionEvent(NTrace::ISpan& span,
                        const std::string& type,
                        const std::string& message,
                        const std::string& stacktrace)
{
    std::map attrs{
        {"exception.type", type},
        {"exception.message", message},
    };
    if (!stacktrace.empty()) {
        attrs.emplace("exception.stacktrace", stacktrace);
    }
    span.AddEvent("exception", attrs);
}
// Builds the span for a single retry attempt.
// The span is named kRetryAttemptSpanName, has INTERNAL kind, takes its
// discovery endpoint, database and log from `dbDriverState`, and is parented
// to `parent` when one is given.
// `dbDriverState` is dereferenced unconditionally and therefore must be non-null.
std::shared_ptr TRequestSpan::CreateForRetryAttempt(const std::string& ydbClientType
    , std::shared_ptr tracer
    , const std::shared_ptr& dbDriverState
    , std::uint32_t attempt
    , std::int64_t backoffMs
    , const std::shared_ptr& parent
) {
    auto span = Create(
        ydbClientType,
        std::move(tracer),
        kRetryAttemptSpanName,
        dbDriverState->DiscoveryEndpoint,
        dbDriverState->Database,
        dbDriverState->Log,
        NTrace::ESpanKind::INTERNAL,
        parent
    );
    if (span) {
        // SetRetryAttributes() returns early for attempt 0, so the first
        // attempt carries no retry attributes.
        span->SetRetryAttributes(attempt, backoffMs);
    }
    return span;
}
// Tags the span with "ydb.retry.attempt" and "ydb.retry.backoff_ms".
// Does nothing when there is no underlying span or when `attempt` is 0.
// Never throws: any failure while setting attributes is passed to the logger.
void TRequestSpan::SetRetryAttributes(std::uint32_t attempt, std::int64_t backoffMs) noexcept {
    if (!Span_ || attempt == 0) {
        return;
    }
    try {
        Span_->SetAttribute("ydb.retry.attempt", static_cast(attempt));
        Span_->SetAttribute("ydb.retry.backoff_ms", backoffMs);
    } catch (...) {
        // Hand the failure to the span error logger instead of propagating it.
        SafeLogRequestSpanError(Log_, "failed to set retry attributes", std::current_exception());
    }
}
TRequestSpan(const std::string& ydbClientType + , std::shared_ptr tracer + , const std::string& requestName + , const std::string& discoveryEndpoint + , const std::string& database + , const TLog& log + , NTrace::ESpanKind kind + , NTrace::ISpan* parent + ); + + void SetRetryAttributes(std::uint32_t attempt, std::int64_t backoffMs) noexcept; + TLog Log_; std::shared_ptr Span_; }; diff --git a/src/client/impl/stats/CMakeLists.txt b/src/client/impl/stats/CMakeLists.txt index 15866af4bc..0fac886036 100644 --- a/src/client/impl/stats/CMakeLists.txt +++ b/src/client/impl/stats/CMakeLists.txt @@ -5,6 +5,7 @@ target_link_libraries(client-impl-ydb_stats PUBLIC grpc-client monlib-metrics client-metrics + impl-observability-error_category ) target_sources(client-impl-ydb_stats PRIVATE diff --git a/src/client/impl/stats/stats.h b/src/client/impl/stats/stats.h index b2e61c0ace..570f9e4675 100644 --- a/src/client/impl/stats/stats.h +++ b/src/client/impl/stats/stats.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -326,10 +327,10 @@ struct TStatCollector { {"db.namespace", Database_}, {"db.operation.name", operationName}, {"ydb.client.api", YdbClientApiAttributeValue(ClientType_)}, - {"db.response.status_code", TStringBuilder() << status}, }; if (status != EStatus::SUCCESS) { - labels["error.type"] = TStringBuilder() << status; + labels["db.response.status_code"] = TStringBuilder() << status; + labels["error.type"] = std::string(NObservability::CategorizeErrorType(status)); } ExternalRegistry_->Histogram( "db.client.operation.duration", diff --git a/src/client/metrics/CMakeLists.txt b/src/client/metrics/CMakeLists.txt index 5f73534319..e681a846b2 100644 --- a/src/client/metrics/CMakeLists.txt +++ b/src/client/metrics/CMakeLists.txt @@ -4,4 +4,4 @@ target_sources(client-metrics PRIVATE metrics.cpp ) -_ydb_sdk_make_client_component(Metrics client-metrics) \ No newline at end of file +_ydb_sdk_make_client_component(Metrics client-metrics) diff --git 
a/src/client/query/CMakeLists.txt b/src/client/query/CMakeLists.txt index bc159ea87a..3cc7401200 100644 --- a/src/client/query/CMakeLists.txt +++ b/src/client/query/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(client-ydb_query PUBLIC client-ydb_query-impl client-ydb_result client-metrics + client-trace client-types-operation api-protos api-grpc diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index a6db0b273d..448e50b147 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -104,14 +104,15 @@ class TQueryClient::TImpl: public TClientImplCommon, public CollectParamsSize(params ? ¶ms->GetProtoMap() : nullptr); auto obs = MakeObservation("ExecuteQuery"); + std::string sessionEndpoint = session.has_value() ? session->SessionImpl_->GetEndpoint() : std::string{}; return TExecQueryImpl::ExecuteQuery( Connections_, DbDriverState_, query, txControl, params, settings, session) - .Apply([obs](TAsyncExecuteQueryResult future) { + .Apply([obs, sessionEndpoint = std::move(sessionEndpoint)](TAsyncExecuteQueryResult future) { try { auto result = future.GetValue(); - obs->SetPeerEndpoint(result.GetEndpoint()); - obs->End(result.GetStatus()); + const auto& resultEndpoint = result.GetEndpoint(); + obs->End(result.GetStatus(), !resultEndpoint.empty() ? resultEndpoint : sessionEndpoint); return result; } catch (...) 
{ obs->EndWithClientInternalError(); @@ -189,18 +190,17 @@ class TQueryClient::TImpl: public TClientImplCommon, public auto responseCb = [promise, session, obs] (Ydb::Query::RollbackTransactionResponse* response, TPlainStatus status) mutable { try { - obs->SetPeerEndpoint(status.Endpoint); if (response) { NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); TStatus rollbackTxStatus(TPlainStatus{static_cast(response->status()), std::move(opIssues), status.Endpoint, std::move(status.Metadata)}); - obs->End(rollbackTxStatus.GetStatus()); + obs->End(rollbackTxStatus.GetStatus(), rollbackTxStatus.GetEndpoint()); promise.SetValue(std::move(rollbackTxStatus)); } else { - obs->End(status.Status); + obs->End(status.Status, status.Endpoint); promise.SetValue(TStatus(std::move(status))); } } catch (...) { @@ -237,19 +237,18 @@ class TQueryClient::TImpl: public TClientImplCommon, public auto responseCb = [promise, session, obs] (Ydb::Query::CommitTransactionResponse* response, TPlainStatus status) mutable { try { - obs->SetPeerEndpoint(status.Endpoint); if (response) { NYdb::NIssue::TIssues opIssues; NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); TStatus commitTxStatus(TPlainStatus{static_cast(response->status()), std::move(opIssues), status.Endpoint, std::move(status.Metadata)}); - obs->End(commitTxStatus.GetStatus()); + obs->End(commitTxStatus.GetStatus(), commitTxStatus.GetEndpoint()); TCommitTransactionResult commitTxResult(std::move(commitTxStatus)); promise.SetValue(std::move(commitTxResult)); } else { - obs->End(status.Status); + obs->End(status.Status, status.Endpoint); promise.SetValue(TCommitTransactionResult(TStatus(std::move(status)))); } } catch (...) 
{ @@ -491,7 +490,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public ); if (Observation) { - Observation->End(EStatus::SUCCESS); + Observation->End(EStatus::SUCCESS, session->GetEndpoint()); } ScheduleReply(std::move(val)); } @@ -502,8 +501,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public { auto val = future.ExtractValue(); if (obs) { - obs->SetPeerEndpoint(val.GetEndpoint()); - obs->End(val.GetStatus()); + obs->End(val.GetStatus(), val.GetEndpoint()); } promise.SetValue(std::move(val)); }); @@ -533,7 +531,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public std::shared_ptr Observation; }; - auto obs = MakeObservation("GetSession"); + auto obs = MakeObservation("CreateSession"); auto ctx = std::make_unique(shared_from_this(), settings, obs); auto future = ctx->GetFuture(); SessionPool_.GetSession(std::move(ctx)); @@ -602,6 +600,29 @@ class TQueryClient::TImpl: public TClientImplCommon, public } } + std::shared_ptr CreateRetryRootSpan() { + return NObservability::TRequestSpan::CreateForClientRetry( + "Query", + Tracer_, + DbDriverState_ + ); + } + + std::shared_ptr CreateRetryAttemptSpan( + std::uint32_t attempt, + std::int64_t backoffMs, + const std::shared_ptr& parent = nullptr) + { + return NObservability::TRequestSpan::CreateForRetryAttempt( + "Query", + Tracer_, + DbDriverState_, + attempt, + backoffMs, + parent + ); + } + private: std::shared_ptr MakeObservation(const std::string& operationName) { return std::make_shared( @@ -609,9 +630,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public &OperationStatCollector_, Tracer_, operationName, - DbDriverState_->DiscoveryEndpoint, - DbDriverState_->Database, - DbDriverState_->Log + DbDriverState_ ); } @@ -692,28 +711,26 @@ int64_t TQueryClient::GetCurrentPoolSize() const { TAsyncExecuteQueryResult TQueryClient::RetryQuery(TQueryResultFunc&& queryFunc, TRetryOperationSettings settings) { - TRetryContextResultAsync::TPtr ctx(new 
NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings)); - return ctx->Execute(); + return TRetryContextResultAsync::TPtr( + new NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings))->Execute(); } TAsyncStatus TQueryClient::RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings) { - TRetryContextAsync::TPtr ctx(new NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings)); - return ctx->Execute(); + return TRetryContextAsync::TPtr( + new NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings))->Execute(); } TAsyncStatus TQueryClient::RetryQuery(TQueryWithoutSessionFunc&& queryFunc, TRetryOperationSettings settings) { - TRetryContextAsync::TPtr ctx(new NRetry::Async::TRetryWithoutSession(*this, std::move(queryFunc), settings)); - return ctx->Execute(); + return TRetryContextAsync::TPtr( + new NRetry::Async::TRetryWithoutSession(*this, std::move(queryFunc), settings))->Execute(); } TStatus TQueryClient::RetryQuerySync(const TQuerySyncFunc& queryFunc, TRetryOperationSettings settings) { - NRetry::Sync::TRetryWithSession ctx(*this, queryFunc, settings); - return ctx.Execute(); + return NRetry::Sync::TRetryWithSession(*this, queryFunc, settings).Execute(); } TStatus TQueryClient::RetryQuerySync(const TQueryWithoutSessionSyncFunc& queryFunc, TRetryOperationSettings settings) { - NRetry::Sync::TRetryWithoutSession ctx(*this, queryFunc, settings); - return ctx.Execute(); + return NRetry::Sync::TRetryWithoutSession(*this, queryFunc, settings).Execute(); } TAsyncExecuteQueryResult TQueryClient::RetryQuery(const std::string& query, const TTxControl& txControl, @@ -723,8 +740,8 @@ TAsyncExecuteQueryResult TQueryClient::RetryQuery(const std::string& query, cons auto queryFunc = [&query, &txControl](TSession session, TDuration duration) -> TAsyncExecuteQueryResult { return session.ExecuteQuery(query, txControl, TExecuteQuerySettings().ClientTimeout(duration)); }; - TRetryContextResultAsync::TPtr 
ctx(new NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings)); - return ctx->Execute(); + return TRetryContextResultAsync::TPtr( + new NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings))->Execute(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 47626433ea..4ddac0c072 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -37,6 +37,29 @@ TTableClient::TImpl::TImpl(std::shared_ptr&& connections, SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Table")); } +std::shared_ptr TTableClient::TImpl::CreateRetryRootSpan() { + return NObservability::TRequestSpan::CreateForClientRetry( + "Table", + Tracer_, + DbDriverState_ + ); +} + +std::shared_ptr TTableClient::TImpl::CreateRetryAttemptSpan( + std::uint32_t attempt + , std::int64_t backoffMs + , const std::shared_ptr& parent +) { + return NObservability::TRequestSpan::CreateForRetryAttempt( + "Table", + Tracer_, + DbDriverState_, + attempt, + backoffMs, + parent + ); +} + TTableClient::TImpl::~TImpl() { if (Connections_->GetDrainOnDtors()) { Drain().Wait(DRAIN_TIMEOUT); @@ -385,7 +408,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio auto createSessionPromise = NewPromise(); auto self = shared_from_this(); - auto obs = MakeObservation("GetSession"); + auto obs = MakeObservation("CreateSession"); auto createSessionExtractor = [createSessionPromise, self, standalone, obs] (google::protobuf::Any* any, TPlainStatus status) mutable { @@ -403,8 +426,8 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio // We do not use SessionStatusInterception for CreateSession request session.SessionImpl_->MarkBroken(); } + obs->End(status.Status, status.Endpoint); TCreateSessionResult val(TStatus(std::move(status)), 
std::move(session)); - obs->End(val.GetStatus()); createSessionPromise.SetValue(std::move(val)); }; @@ -778,7 +801,7 @@ TAsyncStatus TTableClient::TImpl::ExecuteSchemeQuery(const TSession& session, co return future.Apply([obs](NThreading::TFuture f) mutable { auto status = f.ExtractValue(); - obs->End(status.GetStatus()); + obs->End(status.GetStatus(), status.GetEndpoint()); return status; }); } @@ -806,9 +829,9 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio txId = result.tx_meta().id(); } + obs->End(status.Status, status.Endpoint); TBeginTransactionResult beginTxResult(TStatus(std::move(status)), TTransaction(session, txId)); - obs->End(beginTxResult.GetStatus()); promise.SetValue(std::move(beginTxResult)); }; @@ -835,7 +858,7 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess request.set_tx_id(TStringType{txId}); request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); - auto obs = MakeObservation("CommitTransaction"); + auto obs = MakeObservation("Commit"); auto promise = NewPromise(); @@ -851,8 +874,8 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess } } + obs->End(status.Status, status.Endpoint); TCommitTransactionResult commitTxResult(TStatus(std::move(status)), queryStats); - obs->End(commitTxResult.GetStatus()); promise.SetValue(std::move(commitTxResult)); }; @@ -878,7 +901,7 @@ TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, c request.set_session_id(TStringType{session.GetId()}); request.set_tx_id(TStringType{txId}); - auto obs = MakeObservation("RollbackTransaction"); + auto obs = MakeObservation("Rollback"); auto future = RunSimple( std::move(request), @@ -888,7 +911,7 @@ TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, c return future.Apply([obs](TAsyncStatus fut) { auto status = fut.GetValue(); - obs->End(status.GetStatus()); + obs->End(status.GetStatus(), 
status.GetEndpoint()); return status; }); } @@ -1165,8 +1188,8 @@ TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, auto promise = NewPromise(); auto extractor = [promise, obs](google::protobuf::Any* any, TPlainStatus status) mutable { Y_UNUSED(any); + obs->End(status.Status, status.Endpoint); TBulkUpsertResult val(TStatus(std::move(status))); - obs->End(val.GetStatus()); promise.SetValue(std::move(val)); }; @@ -1216,8 +1239,8 @@ TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, auto extractor = [promise, obs] (google::protobuf::Any* any, TPlainStatus status) mutable { Y_UNUSED(any); + obs->End(status.Status, status.Endpoint); TBulkUpsertResult val(TStatus(std::move(status))); - obs->End(val.GetStatus()); promise.SetValue(std::move(val)); }; diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index 27c35c843a..8bddced9b7 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -157,6 +157,12 @@ class TTableClient::TImpl: public TClientImplCommon, public void CollectRetryStatAsync(EStatus status); void CollectRetryStatSync(EStatus status); + std::shared_ptr CreateRetryRootSpan(); + std::shared_ptr CreateRetryAttemptSpan(std::uint32_t attempt + , std::int64_t backoffMs + , const std::shared_ptr& parent = nullptr + ); + public: TClientSettings Settings_; @@ -288,11 +294,10 @@ class TTableClient::TImpl: public TClientImplCommon, public sessionPtr->SessionImpl_->AddQueryToCache(*dataQuery); } + obs->End(status.Status, status.Endpoint); TDataQueryResult dataQueryResult(TStatus(std::move(status)), std::move(res), tx, dataQuery, fromCache, queryStats); - obs->End(dataQueryResult.GetStatus()); - delete sessionPtr; tx.reset(); dataQuery.reset(); @@ -347,9 +352,7 @@ class TTableClient::TImpl: public TClientImplCommon, public &OperationStatCollector_, Tracer_, operationName, - DbDriverState_->DiscoveryEndpoint, - DbDriverState_->Database, - 
DbDriverState_->Log + DbDriverState_ ); } }; diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index bc0a885f23..4f6e411bf7 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -1590,23 +1590,21 @@ TTypeBuilder TTableClient::GetTypeBuilder() { //////////////////////////////////////////////////////////////////////////////// TAsyncStatus TTableClient::RetryOperation(TOperationFunc&& operation, const TRetryOperationSettings& settings) { - TRetryContextAsync::TPtr ctx(new NRetry::Async::TRetryWithSession(*this, std::move(operation), settings)); - return ctx->Execute(); + return TRetryContextAsync::TPtr( + new NRetry::Async::TRetryWithSession(*this, std::move(operation), settings))->Execute(); } TAsyncStatus TTableClient::RetryOperation(TOperationWithoutSessionFunc&& operation, const TRetryOperationSettings& settings) { - TRetryContextAsync::TPtr ctx(new NRetry::Async::TRetryWithoutSession(*this, std::move(operation), settings)); - return ctx->Execute(); + return TRetryContextAsync::TPtr( + new NRetry::Async::TRetryWithoutSession(*this, std::move(operation), settings))->Execute(); } TStatus TTableClient::RetryOperationSync(const TOperationWithoutSessionSyncFunc& operation, const TRetryOperationSettings& settings) { - NRetry::Sync::TRetryWithoutSession ctx(*this, operation, settings); - return ctx.Execute(); + return NRetry::Sync::TRetryWithoutSession(*this, operation, settings).Execute(); } TStatus TTableClient::RetryOperationSync(const TOperationSyncFunc& operation, const TRetryOperationSettings& settings) { - NRetry::Sync::TRetryWithSession ctx(*this, operation, settings); - return ctx.Execute(); + return NRetry::Sync::TRetryWithSession(*this, operation, settings).Execute(); } NThreading::TFuture TTableClient::Stop() { @@ -2511,10 +2509,6 @@ uint64_t TIndexDescription::GetSizeBytes() const { return SizeBytes_; } -void TIndexDescription::SetParallel(uint32_t parallel) { - Parallel_ = parallel; -} - TIndexDescription 
TIndexDescription::CreateGlobalIndex( const std::string& name, const std::vector& indexColumns, @@ -3059,9 +3053,6 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { if constexpr (std::is_same_v) { result.SizeBytes_ = proto.size_bytes(); } - if constexpr (std::is_same_v) { - result.Parallel_ = proto.parallel(); - } return result; } @@ -3074,8 +3065,6 @@ void TIndexDescription::SerializeTo(Ydb::Table::TableIndex& proto) const { *proto.mutable_data_columns() = {DataColumns_.begin(), DataColumns_.end()}; - proto.set_parallel(Parallel_); - switch (IndexType_) { case EIndexType::GlobalSync: { auto& settings = *proto.mutable_global_index()->mutable_settings(); diff --git a/src/client/trace/CMakeLists.txt b/src/client/trace/CMakeLists.txt index 5526160033..86a8f8d420 100644 --- a/src/client/trace/CMakeLists.txt +++ b/src/client/trace/CMakeLists.txt @@ -4,4 +4,4 @@ target_sources(client-trace PRIVATE trace.cpp ) -_ydb_sdk_make_client_component(Trace client-trace) \ No newline at end of file +_ydb_sdk_make_client_component(Trace client-trace) diff --git a/tests/common/fake_trace_provider.h b/tests/common/fake_trace_provider.h new file mode 100644 index 0000000000..55ddb01bf1 --- /dev/null +++ b/tests/common/fake_trace_provider.h @@ -0,0 +1,163 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace NYdb::NTests { + +struct TFakeEvent { + std::string Name; + std::map Attributes; +}; + +class TFakeScope : public NTrace::IScope { +}; + +class TFakeSpan : public NTrace::ISpan { +public: + void End() override { + std::lock_guard lock(Mutex_); + Ended_ = true; + } + + void SetAttribute(const std::string& key, const std::string& value) override { + std::lock_guard lock(Mutex_); + StringAttributes_[key] = value; + } + + void SetAttribute(const std::string& key, int64_t value) override { + std::lock_guard lock(Mutex_); + IntAttributes_[key] = value; + } + + void AddEvent(const std::string& name, const std::map& attributes) override { 
+ std::lock_guard lock(Mutex_); + Events_.push_back({name, attributes}); + } + + std::unique_ptr Activate() override { + std::lock_guard lock(Mutex_); + Activated_ = true; + return std::make_unique(); + } + + bool IsEnded() const { + std::lock_guard lock(Mutex_); + return Ended_; + } + + bool IsActivated() const { + std::lock_guard lock(Mutex_); + return Activated_; + } + + std::string GetStringAttribute(const std::string& key) const { + std::lock_guard lock(Mutex_); + auto it = StringAttributes_.find(key); + return it != StringAttributes_.end() ? it->second : ""; + } + + bool HasStringAttribute(const std::string& key) const { + std::lock_guard lock(Mutex_); + return StringAttributes_.contains(key); + } + + int64_t GetIntAttribute(const std::string& key) const { + std::lock_guard lock(Mutex_); + auto it = IntAttributes_.find(key); + return it != IntAttributes_.end() ? it->second : 0; + } + + bool HasIntAttribute(const std::string& key) const { + std::lock_guard lock(Mutex_); + return IntAttributes_.contains(key); + } + + std::vector GetEvents() const { + std::lock_guard lock(Mutex_); + return Events_; + } + +private: + mutable std::mutex Mutex_; + bool Ended_ = false; + bool Activated_ = false; + std::map StringAttributes_; + std::map IntAttributes_; + std::vector Events_; +}; + +class TFakeTracer : public NTrace::ITracer { +public: + std::shared_ptr StartSpan( + const std::string& name, + NTrace::ESpanKind kind, + NTrace::ISpan* parent + ) override { + auto span = std::make_shared(); + std::lock_guard lock(Mutex_); + Spans_.push_back({name, kind, span, parent}); + return span; + } + + struct TSpanRecord { + std::string Name; + NTrace::ESpanKind Kind; + std::shared_ptr Span; + NTrace::ISpan* Parent = nullptr; + }; + + std::vector GetSpans() const { + std::lock_guard lock(Mutex_); + return Spans_; + } + + std::shared_ptr GetLastSpan() const { + std::lock_guard lock(Mutex_); + return Spans_.empty() ? 
nullptr : Spans_.back().Span; + } + + TSpanRecord GetLastSpanRecord() const { + std::lock_guard lock(Mutex_); + return Spans_.back(); + } + + size_t SpanCount() const { + std::lock_guard lock(Mutex_); + return Spans_.size(); + } + +private: + mutable std::mutex Mutex_; + std::vector Spans_; +}; + +class TFakeTraceProvider : public NTrace::ITraceProvider { +public: + std::shared_ptr GetTracer(const std::string& name) override { + std::lock_guard lock(Mutex_); + auto it = Tracers_.find(name); + if (it != Tracers_.end()) { + return it->second; + } + auto tracer = std::make_shared(); + Tracers_[name] = tracer; + return tracer; + } + + std::shared_ptr GetFakeTracer(const std::string& name) const { + std::lock_guard lock(Mutex_); + auto it = Tracers_.find(name); + return it != Tracers_.end() ? it->second : nullptr; + } + +private: + mutable std::mutex Mutex_; + std::map> Tracers_; +}; + +} // namespace NYdb::NTests diff --git a/tests/integration/metrics/CMakeLists.txt b/tests/integration/metrics/CMakeLists.txt index 4d83aae56b..f63f5be046 100644 --- a/tests/integration/metrics/CMakeLists.txt +++ b/tests/integration/metrics/CMakeLists.txt @@ -7,6 +7,7 @@ add_ydb_test(NAME metrics_it GTEST yutil YDB-CPP-SDK::Query client-metrics + impl-observability-error_category LABELS integration -) \ No newline at end of file +) diff --git a/tests/integration/metrics/main.cpp b/tests/integration/metrics/main.cpp index 4668d5349a..1db9fe28e1 100644 --- a/tests/integration/metrics/main.cpp +++ b/tests/integration/metrics/main.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -65,10 +66,10 @@ std::shared_ptr GetDuration( {"db.namespace", dbNamespace}, {"db.operation.name", operation}, {"ydb.client.api", "Query"}, - {"db.response.status_code", ToString(status)}, }; if (status != EStatus::SUCCESS) { - labels["error.type"] = ToString(status); + labels["db.response.status_code"] = ToString(status); + labels["error.type"] = 
std::string(NObservability::CategorizeErrorType(status)); } return registry->GetHistogram("db.client.operation.duration", labels); } @@ -95,15 +96,15 @@ TEST(QueryMetricsIntegration, ExecuteQuerySuccessRecordsMetrics) { ).ExtractValueSync(); ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); - auto requests = GetCounter(registry, database, "db.client.operation.requests", "ExecuteQuery"); + auto requests = GetCounter(registry, database, "db.client.operation.requests", "ydb.ExecuteQuery"); ASSERT_NE(requests, nullptr) << "ExecuteQuery request counter not created"; EXPECT_GE(requests->Get(), 1); - auto errors = GetCounter(registry, database, "db.client.operation.errors", "ExecuteQuery"); + auto errors = GetCounter(registry, database, "db.client.operation.errors", "ydb.ExecuteQuery"); ASSERT_NE(errors, nullptr); EXPECT_EQ(errors->Get(), 0); - auto duration = GetDuration(registry, database, "ExecuteQuery", EStatus::SUCCESS); + auto duration = GetDuration(registry, database, "ydb.ExecuteQuery", EStatus::SUCCESS); ASSERT_NE(duration, nullptr) << "ExecuteQuery duration histogram not created"; EXPECT_GE(duration->Count(), 1u); for (double v : duration->GetValues()) { @@ -127,15 +128,15 @@ TEST(QueryMetricsIntegration, ExecuteQueryErrorRecordsErrorMetric) { ).ExtractValueSync(); EXPECT_NE(result.GetStatus(), EStatus::SUCCESS); - auto requests = GetCounter(registry, database, "db.client.operation.requests", "ExecuteQuery"); + auto requests = GetCounter(registry, database, "db.client.operation.requests", "ydb.ExecuteQuery"); ASSERT_NE(requests, nullptr); EXPECT_GE(requests->Get(), 1); - auto errors = GetCounter(registry, database, "db.client.operation.errors", "ExecuteQuery"); + auto errors = GetCounter(registry, database, "db.client.operation.errors", "ydb.ExecuteQuery"); ASSERT_NE(errors, nullptr); EXPECT_GE(errors->Get(), 1); - auto duration = GetDuration(registry, database, "ExecuteQuery", result.GetStatus()); + auto duration = 
GetDuration(registry, database, "ydb.ExecuteQuery", result.GetStatus()); ASSERT_NE(duration, nullptr); EXPECT_GE(duration->Count(), 1u); @@ -150,11 +151,11 @@ TEST(QueryMetricsIntegration, CreateSessionRecordsMetrics) { auto session = client.GetSession().ExtractValueSync(); ASSERT_TRUE(session.IsSuccess()) << session.GetIssues().ToString(); - auto requests = GetCounter(registry, database, "db.client.operation.requests", "GetSession"); + auto requests = GetCounter(registry, database, "db.client.operation.requests", "ydb.CreateSession"); ASSERT_NE(requests, nullptr) << "CreateSession request counter not created"; EXPECT_GE(requests->Get(), 1); - auto duration = GetDuration(registry, database, "GetSession", EStatus::SUCCESS); + auto duration = GetDuration(registry, database, "ydb.CreateSession", EStatus::SUCCESS); ASSERT_NE(duration, nullptr) << "CreateSession duration histogram not created"; EXPECT_GE(duration->Count(), 1u); @@ -184,11 +185,11 @@ TEST(QueryMetricsIntegration, CommitTransactionRecordsMetrics) { auto commitResult = execResult.GetTransaction()->Commit().ExtractValueSync(); ASSERT_TRUE(commitResult.IsSuccess()) << commitResult.GetIssues().ToString(); - auto commitRequests = GetCounter(registry, database, "db.client.operation.requests", "Commit"); + auto commitRequests = GetCounter(registry, database, "db.client.operation.requests", "ydb.Commit"); ASSERT_NE(commitRequests, nullptr) << "Commit request counter not created"; EXPECT_GE(commitRequests->Get(), 1); - auto commitDuration = GetDuration(registry, database, "Commit", EStatus::SUCCESS); + auto commitDuration = GetDuration(registry, database, "ydb.Commit", EStatus::SUCCESS); ASSERT_NE(commitDuration, nullptr); EXPECT_GE(commitDuration->Count(), 1u); } @@ -212,15 +213,15 @@ TEST(QueryMetricsIntegration, RollbackTransactionRecordsMetrics) { auto rollbackResult = tx.Rollback().ExtractValueSync(); ASSERT_TRUE(rollbackResult.IsSuccess()) << rollbackResult.GetIssues().ToString(); - auto rollbackRequests = 
GetCounter(registry, database, "db.client.operation.requests", "Rollback"); + auto rollbackRequests = GetCounter(registry, database, "db.client.operation.requests", "ydb.Rollback"); ASSERT_NE(rollbackRequests, nullptr) << "Rollback request counter not created"; EXPECT_GE(rollbackRequests->Get(), 1); - auto rollbackErrors = GetCounter(registry, database, "db.client.operation.errors", "Rollback"); + auto rollbackErrors = GetCounter(registry, database, "db.client.operation.errors", "ydb.Rollback"); ASSERT_NE(rollbackErrors, nullptr); EXPECT_EQ(rollbackErrors->Get(), 0); - auto rollbackDuration = GetDuration(registry, database, "Rollback", EStatus::SUCCESS); + auto rollbackDuration = GetDuration(registry, database, "ydb.Rollback", EStatus::SUCCESS); ASSERT_NE(rollbackDuration, nullptr); EXPECT_GE(rollbackDuration->Count(), 1u); @@ -245,15 +246,15 @@ TEST(QueryMetricsIntegration, MultipleQueriesAccumulateMetrics) { ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); } - auto requests = GetCounter(registry, database, "db.client.operation.requests", "ExecuteQuery"); + auto requests = GetCounter(registry, database, "db.client.operation.requests", "ydb.ExecuteQuery"); ASSERT_NE(requests, nullptr); EXPECT_EQ(requests->Get(), numQueries); - auto errors = GetCounter(registry, database, "db.client.operation.errors", "ExecuteQuery"); + auto errors = GetCounter(registry, database, "db.client.operation.errors", "ydb.ExecuteQuery"); ASSERT_NE(errors, nullptr); EXPECT_EQ(errors->Get(), 0); - auto duration = GetDuration(registry, database, "ExecuteQuery", EStatus::SUCCESS); + auto duration = GetDuration(registry, database, "ydb.ExecuteQuery", EStatus::SUCCESS); ASSERT_NE(duration, nullptr); EXPECT_EQ(duration->Count(), static_cast(numQueries)); @@ -300,7 +301,7 @@ TEST(QueryMetricsIntegration, DurationValuesAreRealistic) { ).ExtractValueSync(); ASSERT_EQ(result.GetStatus(), EStatus::SUCCESS) << result.GetIssues().ToString(); - auto duration = 
GetDuration(registry, database, "ExecuteQuery", EStatus::SUCCESS); + auto duration = GetDuration(registry, database, "ydb.ExecuteQuery", EStatus::SUCCESS); ASSERT_NE(duration, nullptr); ASSERT_GE(duration->Count(), 1u); diff --git a/tests/unit/client/CMakeLists.txt b/tests/unit/client/CMakeLists.txt index be4e49bfc9..d901992fef 100644 --- a/tests/unit/client/CMakeLists.txt +++ b/tests/unit/client/CMakeLists.txt @@ -132,4 +132,20 @@ add_ydb_test(NAME client-ydb_metrics_ut GTEST client-metrics LABELS unit +) + +add_ydb_test(NAME client-ydb_spans_ut GTEST + INCLUDE_DIRS + ${YDB_SDK_SOURCE_DIR} + SOURCES + observability/spans_ut.cpp + LINK_LIBRARIES + yutil + impl-observability + client-ydb_query-impl + client-ydb_table-impl + client-metrics + client-trace + LABELS + unit ) \ No newline at end of file diff --git a/tests/unit/client/observability/metrics_ut.cpp b/tests/unit/client/observability/metrics_ut.cpp index be073c3dca..d342b0b4fa 100644 --- a/tests/unit/client/observability/metrics_ut.cpp +++ b/tests/unit/client/observability/metrics_ut.cpp @@ -13,6 +13,10 @@ using namespace NYdb::NSdkStats; namespace { constexpr const char kTestDbNamespace[] = "/Root/testdb"; + + std::string YdbOp(const std::string& op) { + return op.rfind("ydb.", 0) == 0 ? op : "ydb." 
+ op; + } } // namespace class RequestMetricsTest : public ::testing::Test { @@ -27,7 +31,7 @@ class RequestMetricsTest : public ::testing::Test { return Registry->GetCounter("db.client.operation.requests", { {"db.system.name", "ydb"}, {"db.namespace", kTestDbNamespace}, - {"db.operation.name", op}, + {"db.operation.name", YdbOp(op)}, {"ydb.client.api", "Unspecified"}, }); } @@ -36,7 +40,7 @@ class RequestMetricsTest : public ::testing::Test { return Registry->GetCounter("db.client.operation.errors", { {"db.system.name", "ydb"}, {"db.namespace", kTestDbNamespace}, - {"db.operation.name", op}, + {"db.operation.name", YdbOp(op)}, {"ydb.client.api", "Unspecified"}, }); } @@ -45,12 +49,12 @@ class RequestMetricsTest : public ::testing::Test { TLabels labels = { {"db.system.name", "ydb"}, {"db.namespace", kTestDbNamespace}, - {"db.operation.name", op}, + {"db.operation.name", YdbOp(op)}, {"ydb.client.api", "Unspecified"}, - {"db.response.status_code", ToString(status)}, }; if (status != EStatus::SUCCESS) { - labels["error.type"] = ToString(status); + labels["db.response.status_code"] = ToString(status); + labels["error.type"] = std::string(NObservability::CategorizeErrorType(status)); } return Registry->GetHistogram("db.client.operation.duration", labels); } @@ -222,7 +226,7 @@ TEST(RequestMetricsDbNamespaceTest, DifferentNamespacesAreSeparateMetricSeries) return NMetrics::TLabels{ {"db.system.name", "ydb"}, {"db.namespace", "/db/alpha"}, - {"db.operation.name", op}, + {"db.operation.name", YdbOp(op)}, {"ydb.client.api", "Unspecified"}, }; }; @@ -230,13 +234,13 @@ TEST(RequestMetricsDbNamespaceTest, DifferentNamespacesAreSeparateMetricSeries) return NMetrics::TLabels{ {"db.system.name", "ydb"}, {"db.namespace", "/db/beta"}, - {"db.operation.name", op}, + {"db.operation.name", YdbOp(op)}, {"ydb.client.api", "Unspecified"}, }; }; - auto reqAlpha = registry->GetCounter("db.client.operation.requests", labelsAlpha("GetSession")); - auto reqBeta = 
registry->GetCounter("db.client.operation.requests", labelsBeta("GetSession")); + auto reqAlpha = registry->GetCounter("db.client.operation.requests", labelsAlpha("ydb.GetSession")); + auto reqBeta = registry->GetCounter("db.client.operation.requests", labelsBeta("ydb.GetSession")); ASSERT_NE(reqAlpha, nullptr); ASSERT_NE(reqBeta, nullptr); EXPECT_EQ(reqAlpha->Get(), 1); @@ -244,18 +248,10 @@ TEST(RequestMetricsDbNamespaceTest, DifferentNamespacesAreSeparateMetricSeries) auto durAlpha = registry->GetHistogram( "db.client.operation.duration", - [&] { - auto l = labelsAlpha("GetSession"); - l["db.response.status_code"] = ToString(EStatus::SUCCESS); - return l; - }()); + labelsAlpha("ydb.GetSession")); auto durBeta = registry->GetHistogram( "db.client.operation.duration", - [&] { - auto l = labelsBeta("GetSession"); - l["db.response.status_code"] = ToString(EStatus::SUCCESS); - return l; - }()); + labelsBeta("ydb.GetSession")); ASSERT_NE(durAlpha, nullptr); ASSERT_NE(durBeta, nullptr); EXPECT_EQ(durAlpha->Count(), 1u); @@ -275,7 +271,7 @@ TEST(RequestMetricsClientAliasesTest, QueryOperationsUseOtelStandardMetrics) { { {"db.system.name", "ydb"}, {"db.namespace", ""}, - {"db.operation.name", "ExecuteQuery"}, + {"db.operation.name", "ydb.ExecuteQuery"}, {"ydb.client.api", "Query"}, } ), @@ -287,7 +283,7 @@ TEST(RequestMetricsClientAliasesTest, QueryOperationsUseOtelStandardMetrics) { { {"db.system.name", "ydb"}, {"db.namespace", ""}, - {"db.operation.name", "ExecuteQuery"}, + {"db.operation.name", "ydb.ExecuteQuery"}, {"ydb.client.api", "Query"}, } ), @@ -299,9 +295,8 @@ TEST(RequestMetricsClientAliasesTest, QueryOperationsUseOtelStandardMetrics) { { {"db.system.name", "ydb"}, {"db.namespace", ""}, - {"db.operation.name", "ExecuteQuery"}, + {"db.operation.name", "ydb.ExecuteQuery"}, {"ydb.client.api", "Query"}, - {"db.response.status_code", ToString(EStatus::SUCCESS)}, } ), nullptr @@ -321,7 +316,7 @@ TEST(RequestMetricsClientAliasesTest, 
TableOperationsUseOtelStandardMetrics) { { {"db.system.name", "ydb"}, {"db.namespace", ""}, - {"db.operation.name", "ExecuteDataQuery"}, + {"db.operation.name", "ydb.ExecuteDataQuery"}, {"ydb.client.api", "Table"}, } ), @@ -333,7 +328,7 @@ TEST(RequestMetricsClientAliasesTest, TableOperationsUseOtelStandardMetrics) { { {"db.system.name", "ydb"}, {"db.namespace", ""}, - {"db.operation.name", "ExecuteDataQuery"}, + {"db.operation.name", "ydb.ExecuteDataQuery"}, {"ydb.client.api", "Table"}, } ), @@ -345,9 +340,8 @@ TEST(RequestMetricsClientAliasesTest, TableOperationsUseOtelStandardMetrics) { { {"db.system.name", "ydb"}, {"db.namespace", ""}, - {"db.operation.name", "ExecuteDataQuery"}, + {"db.operation.name", "ydb.ExecuteDataQuery"}, {"ydb.client.api", "Table"}, - {"db.response.status_code", ToString(EStatus::SUCCESS)}, } ), nullptr diff --git a/tests/unit/client/observability/spans_ut.cpp b/tests/unit/client/observability/spans_ut.cpp new file mode 100644 index 0000000000..bf0ec03a00 --- /dev/null +++ b/tests/unit/client/observability/spans_ut.cpp @@ -0,0 +1,498 @@ +#include +#include + +#include + +#include + +using namespace NYdb; +using namespace NYdb::NTests; + +namespace { + +constexpr const char kTestDbNamespace[] = "/Root/testdb"; + +struct TSpanTestParams { + std::string Name; + std::string ClientType; + + std::string ExecuteOp; + std::string ExecuteOpName; + + std::string CreateSessionOp; + std::string CreateSessionOpName; + + std::string CommitOp; + std::string CommitOpName; + + std::string RollbackOp; + std::string RollbackOpName; + + std::string RetryOp; + std::string RetryOpName; +}; + +} // namespace + +class SpanTest : public ::testing::TestWithParam { +protected: + void SetUp() override { + Tracer = std::make_shared(); + } + + std::shared_ptr MakeRequestSpan( + const std::string& operationName, + const std::string& endpoint, + NTrace::ESpanKind kind = NTrace::ESpanKind::CLIENT + ) { + return NYdb::NObservability::TRequestSpan::Create( + 
GetParam().ClientType, + Tracer, + operationName, + endpoint, + kTestDbNamespace, + TLog{}, + kind + ); + } + + std::shared_ptr Tracer; +}; + +TEST_P(SpanTest, SpanNameFormat) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + span->End(EStatus::SUCCESS); + + ASSERT_EQ(Tracer->SpanCount(), 1u); + EXPECT_EQ(Tracer->GetLastSpanRecord().Name, p.ExecuteOpName); +} + +TEST_P(SpanTest, SpanKindIsClient) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.CreateSessionOp, "localhost:2135"); + span->End(EStatus::SUCCESS); + + ASSERT_EQ(Tracer->SpanCount(), 1u); + EXPECT_EQ(Tracer->GetLastSpanRecord().Kind, NTrace::ESpanKind::CLIENT); +} + +TEST_P(SpanTest, DbSystemAttribute) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("db.system.name"), "ydb"); +} + +TEST_P(SpanTest, DbNamespaceAndClientApi) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("db.namespace"), kTestDbNamespace); + EXPECT_EQ(fakeSpan->GetStringAttribute("ydb.client.api"), p.ClientType); + EXPECT_EQ(fakeSpan->GetStringAttribute("db.operation.name"), p.ExecuteOpName); +} + +TEST_P(SpanTest, ServerAddressAndPort) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.CommitOp, "ydb.server:2135"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("server.address"), "ydb.server"); + EXPECT_EQ(fakeSpan->GetIntAttribute("server.port"), 2135); +} + +TEST_P(SpanTest, ServerAddressCustomPort) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.RollbackOp, 
"myhost:9090"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("server.address"), "myhost"); + EXPECT_EQ(fakeSpan->GetIntAttribute("server.port"), 9090); +} + +TEST_P(SpanTest, ServerAddressNoPortDefaultsTo2135) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "myhost"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("server.address"), "myhost"); + EXPECT_EQ(fakeSpan->GetIntAttribute("server.port"), 2135); +} + +TEST_P(SpanTest, IPv6EndpointParsing) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "[::1]:2136"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("server.address"), "::1"); + EXPECT_EQ(fakeSpan->GetIntAttribute("server.port"), 2136); +} + +TEST_P(SpanTest, IPv6EndpointNoPort) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "[fe80::1]"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("server.address"), "fe80::1"); + EXPECT_EQ(fakeSpan->GetIntAttribute("server.port"), 2135); +} + +TEST_P(SpanTest, PeerEndpointAttributes) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "discovery.ydb:2135"); + span->SetPeerEndpoint("10.0.0.1:2136"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("network.peer.address"), "10.0.0.1"); + EXPECT_EQ(fakeSpan->GetIntAttribute("network.peer.port"), 2136); +} + +TEST_P(SpanTest, SuccessStatusDoesNotSetErrorAttrs) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.CommitOp, "localhost:2135"); + span->End(EStatus::SUCCESS); + 
+ auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_FALSE(fakeSpan->HasStringAttribute("db.response.status_code")); + EXPECT_FALSE(fakeSpan->HasStringAttribute("error.type")); +} + +TEST_P(SpanTest, ErrorStatusSetsErrorType) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.RollbackOp, "localhost:2135"); + span->End(EStatus::UNAVAILABLE); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ(fakeSpan->GetStringAttribute("db.response.status_code"), "UNAVAILABLE"); + EXPECT_TRUE(fakeSpan->HasStringAttribute("error.type")); + EXPECT_FALSE(fakeSpan->GetStringAttribute("error.type").empty()); +} + +TEST_P(SpanTest, SpanIsEndedAfterEnd) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + + EXPECT_FALSE(fakeSpan->IsEnded()); + span->End(EStatus::SUCCESS); + EXPECT_TRUE(fakeSpan->IsEnded()); +} + +TEST_P(SpanTest, NullTracerDoesNotCrash) { + const auto& p = GetParam(); + EXPECT_NO_THROW({ + auto span = NYdb::NObservability::TRequestSpan::Create( + p.ClientType, + nullptr, + p.ExecuteOp, + "localhost:2135", + kTestDbNamespace, + TLog{} + ); + span->SetPeerEndpoint("10.0.0.1:2136"); + span->AddEvent("retry", {{"attempt", "1"}}); + span->End(EStatus::SUCCESS); + }); +} + +TEST_P(SpanTest, DestructorEndsSpan) { + const auto& p = GetParam(); + auto fakeSpan = [&]() -> std::shared_ptr { + auto span = MakeRequestSpan(p.CreateSessionOp, "localhost:2135"); + return Tracer->GetLastSpan(); + }(); + + ASSERT_NE(fakeSpan, nullptr); + EXPECT_TRUE(fakeSpan->IsEnded()); +} + +TEST_P(SpanTest, ExplicitEndThenDestructorDoesNotDoubleEnd) { + const auto& p = GetParam(); + auto fakeSpan = [&]() -> std::shared_ptr { + auto span = MakeRequestSpan(p.CommitOp, "localhost:2135"); + span->End(EStatus::SUCCESS); + return Tracer->GetLastSpan(); + }(); + + ASSERT_NE(fakeSpan, nullptr); + 
EXPECT_TRUE(fakeSpan->IsEnded()); +} + +TEST_P(SpanTest, AddEventForwarded) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + span->AddEvent("retry", {{"ydb.attempt", "2"}, {"error.type", "UNAVAILABLE"}}); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + auto events = fakeSpan->GetEvents(); + ASSERT_EQ(events.size(), 1u); + EXPECT_EQ(events[0].Name, "retry"); + EXPECT_EQ(events[0].Attributes.at("ydb.attempt"), "2"); + EXPECT_EQ(events[0].Attributes.at("error.type"), "UNAVAILABLE"); +} + +TEST_P(SpanTest, EmptyPeerEndpointIgnored) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.CreateSessionOp, "localhost:2135"); + span->SetPeerEndpoint(""); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_FALSE(fakeSpan->HasStringAttribute("network.peer.address")); + EXPECT_FALSE(fakeSpan->HasIntAttribute("network.peer.port")); +} + +TEST_P(SpanTest, OperationNamesAreNormalized) { + const auto& p = GetParam(); + const std::vector> ops = { + {p.CreateSessionOp, p.CreateSessionOpName}, + {p.ExecuteOp, p.ExecuteOpName}, + {p.CommitOp, p.CommitOpName}, + {p.RollbackOp, p.RollbackOpName}, + }; + + for (const auto& [op, _] : ops) { + auto span = MakeRequestSpan(op, "localhost:2135"); + span->End(EStatus::SUCCESS); + } + + auto spans = Tracer->GetSpans(); + ASSERT_EQ(spans.size(), ops.size()); + for (size_t i = 0; i < ops.size(); ++i) { + EXPECT_EQ(spans[i].Name, ops[i].second); + EXPECT_EQ(spans[i].Kind, NTrace::ESpanKind::CLIENT); + } +} + +TEST_P(SpanTest, MultipleErrorStatuses) { + const auto& p = GetParam(); + const std::vector> errorStatuses = { + {EStatus::BAD_REQUEST, "ydb_error"}, + {EStatus::UNAUTHORIZED, "ydb_error"}, + {EStatus::INTERNAL_ERROR, "ydb_error"}, + {EStatus::UNAVAILABLE, "ydb_error"}, + {EStatus::OVERLOADED, "ydb_error"}, + {EStatus::TIMEOUT, "ydb_error"}, + {EStatus::NOT_FOUND, 
"ydb_error"}, + {EStatus::CLIENT_INTERNAL_ERROR, "transport_error"}, + }; + + for (const auto& [status, expectedErrorType] : errorStatuses) { + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + span->End(status); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_EQ( + fakeSpan->GetStringAttribute("db.response.status_code"), + ToString(status) + ); + EXPECT_EQ( + fakeSpan->GetStringAttribute("error.type"), + expectedErrorType + ) << "wrong error.type category for status " << static_cast(status); + } +} + +TEST_P(SpanTest, EmptyEndpointDoesNotCrash) { + const auto& p = GetParam(); + EXPECT_NO_THROW({ + auto span = MakeRequestSpan(p.ExecuteOp, ""); + span->End(EStatus::SUCCESS); + }); +} + +TEST_P(SpanTest, ActivateReturnsScope) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.RetryOp, "localhost:2135"); + auto scope = span->Activate(); + EXPECT_NE(scope, nullptr); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + EXPECT_TRUE(fakeSpan->IsActivated()); + + span->End(EStatus::SUCCESS); +} + +TEST_P(SpanTest, ActivateNullTracerReturnsNull) { + const auto& p = GetParam(); + auto span = NYdb::NObservability::TRequestSpan::Create( + p.ClientType, + nullptr, + p.RetryOp, + "localhost:2135", + kTestDbNamespace, + TLog{} + ); + auto scope = span->Activate(); + EXPECT_EQ(scope, nullptr); +} + +TEST_P(SpanTest, InternalSpanKindIsPropagated) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.RetryOp, "localhost:2135", NTrace::ESpanKind::INTERNAL); + span->End(EStatus::SUCCESS); + + ASSERT_EQ(Tracer->SpanCount(), 1u); + EXPECT_EQ(Tracer->GetLastSpanRecord().Name, p.RetryOpName); + EXPECT_EQ(Tracer->GetLastSpanRecord().Kind, NTrace::ESpanKind::INTERNAL); +} + +TEST_P(SpanTest, ErrorStatusAddsExceptionEvent) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + span->End(EStatus::UNAVAILABLE); + + auto fakeSpan = Tracer->GetLastSpan(); + 
ASSERT_NE(fakeSpan, nullptr); + auto events = fakeSpan->GetEvents(); + ASSERT_FALSE(events.empty()); + + bool found = false; + for (const auto& event : events) { + if (event.Name == "exception") { + found = true; + EXPECT_EQ(event.Attributes.at("exception.type"), "UNAVAILABLE"); + EXPECT_EQ(event.Attributes.at("exception.message"), "UNAVAILABLE"); + } + } + EXPECT_TRUE(found) << "expected an 'exception' event on a failed span"; +} + +TEST_P(SpanTest, SuccessStatusNoExceptionEvent) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.CommitOp, "localhost:2135"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + for (const auto& event : fakeSpan->GetEvents()) { + EXPECT_NE(event.Name, "exception"); + } +} + +TEST_P(SpanTest, RecordExceptionEmitsEvent) { + const auto& p = GetParam(); + auto span = MakeRequestSpan(p.ExecuteOp, "localhost:2135"); + span->RecordException("TimeoutException", "operation timed out"); + span->End(EStatus::SUCCESS); + + auto fakeSpan = Tracer->GetLastSpan(); + ASSERT_NE(fakeSpan, nullptr); + auto events = fakeSpan->GetEvents(); + ASSERT_FALSE(events.empty()); + bool found = false; + for (const auto& event : events) { + if (event.Name == "exception" + && event.Attributes.count("exception.type") + && event.Attributes.at("exception.type") == "TimeoutException") + { + found = true; + EXPECT_EQ(event.Attributes.at("exception.message"), "operation timed out"); + } + } + EXPECT_TRUE(found); +} + +TEST_P(SpanTest, ExplicitParentIsPropagatedToTracer) { + const auto& p = GetParam(); + + auto parent = NYdb::NObservability::TRequestSpan::Create( + p.ClientType, Tracer, p.RetryOp, "localhost:2135", kTestDbNamespace, TLog{}, NTrace::ESpanKind::INTERNAL); + ASSERT_NE(parent, nullptr); + auto parentRecord = Tracer->GetLastSpanRecord(); + auto* parentRaw = parentRecord.Span.get(); + + auto child = NYdb::NObservability::TRequestSpan::Create( + p.ClientType, Tracer, p.ExecuteOp, 
"localhost:2135", kTestDbNamespace, + TLog{}, NTrace::ESpanKind::CLIENT, parent); + ASSERT_NE(child, nullptr); + auto childRecord = Tracer->GetLastSpanRecord(); + + EXPECT_EQ(childRecord.Parent, parentRaw) + << "child span must receive parent pointer through ITracer::StartSpan"; +} + +TEST_P(SpanTest, RetryAttemptParentedToRoot) { + const auto& p = GetParam(); + + auto parent = NYdb::NObservability::TRequestSpan::Create( + p.ClientType, Tracer, p.RetryOp, "localhost:2135", kTestDbNamespace, TLog{}, NTrace::ESpanKind::INTERNAL); + ASSERT_NE(parent, nullptr); + auto parentRecord = Tracer->GetLastSpanRecord(); + auto* parentRaw = parentRecord.Span.get(); + + // Imitating what retry contexts do: create attempt span with explicit parent. + auto attempt = NYdb::NObservability::TRequestSpan::Create( + p.ClientType, Tracer, "ydb.Try", "localhost:2135", kTestDbNamespace, + TLog{}, NTrace::ESpanKind::INTERNAL, parent); + ASSERT_NE(attempt, nullptr); + auto attemptRecord = Tracer->GetLastSpanRecord(); + + EXPECT_EQ(attemptRecord.Parent, parentRaw) + << "retry attempt span must be parented to retry root span explicitly"; +} + +INSTANTIATE_TEST_SUITE_P( + Clients, + SpanTest, + ::testing::Values( + TSpanTestParams{ + /*Name=*/ "Query", + /*ClientType=*/ "Query", + /*ExecuteOp=*/ "ExecuteQuery", + /*ExecuteOpName=*/ "ydb.ExecuteQuery", + /*CreateSessionOp=*/ "CreateSession", + /*CreateSessionOpName=*/"ydb.CreateSession", + /*CommitOp=*/ "Commit", + /*CommitOpName=*/ "ydb.Commit", + /*RollbackOp=*/ "Rollback", + /*RollbackOpName=*/ "ydb.Rollback", + /*RetryOp=*/ "ydb.RunWithRetry", + /*RetryOpName=*/ "ydb.RunWithRetry" + }, + TSpanTestParams{ + /*Name=*/ "Table", + /*ClientType=*/ "Table", + /*ExecuteOp=*/ "ydb.ExecuteDataQuery", + /*ExecuteOpName=*/ "ydb.ExecuteDataQuery", + /*CreateSessionOp=*/ "ydb.CreateSession", + /*CreateSessionOpName=*/"ydb.CreateSession", + /*CommitOp=*/ "ydb.Commit", + /*CommitOpName=*/ "ydb.Commit", + /*RollbackOp=*/ "ydb.Rollback", + 
/*RollbackOpName=*/ "ydb.Rollback", + /*RetryOp=*/ "ydb.RunWithRetry", + /*RetryOpName=*/ "ydb.RunWithRetry" + } + ), + [](const ::testing::TestParamInfo& info) { + return info.param.Name; + } +);