Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions odbc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
add_library(ydb-odbc SHARED
src/utils/attr.cpp
src/utils/escape.cpp
src/utils/cursor.cpp
src/utils/types.cpp
src/utils/util.cpp
src/utils/convert.cpp
src/utils/error_manager.cpp
src/odbc_driver.cpp
src/connection_attr.cpp
src/connection.cpp
src/statement_attr.cpp
src/statement.cpp
src/environment.cpp
)
Expand All @@ -23,7 +27,6 @@ target_link_libraries(ydb-odbc
YDB-CPP-SDK::Table
YDB-CPP-SDK::Scheme
YDB-CPP-SDK::Driver
ODBC::ODBC
)

set_target_properties(ydb-odbc PROPERTIES
Expand All @@ -43,7 +46,7 @@ add_subdirectory(tests)

include(GNUInstallDirs)

install(FILES
install(FILES
odbcinst.ini
DESTINATION ${CMAKE_INSTALL_SYSCONFDIR}/odbcinst.d
RENAME ydb-odbc.ini
Expand Down
178 changes: 159 additions & 19 deletions odbc/src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
#include "statement.h"
#include "utils/error_manager.h"

#include <cstring>
#include <string>
#include <map>
#include <string>
#include <unordered_map>

#include <sql.h>
#include <sqlext.h>
Expand All @@ -14,6 +14,60 @@
namespace NYdb {
namespace NOdbc {

namespace {

struct TDriverKey {
std::string Endpoint;
std::string Database;

bool operator==(const TDriverKey& other) const noexcept {
return Endpoint == other.Endpoint && Database == other.Database;
}
};

struct TDriverKeyHash {
size_t operator()(const TDriverKey& key) const noexcept {
return std::hash<std::string>{}(key.Endpoint) ^ (std::hash<std::string>{}(key.Database) << 1U);
}
};

struct TDriverPool {
std::unordered_map<TDriverKey, std::weak_ptr<NYdb::TDriver>, TDriverKeyHash> DriversByKey;
size_t InsertionsSinceCleanup = 0;
};

void CleanupExpiredDrivers(TDriverPool& pool) {
for (auto mapIt = pool.DriversByKey.begin(); mapIt != pool.DriversByKey.end();) {
if (mapIt->second.expired()) {
mapIt = pool.DriversByKey.erase(mapIt);
} else {
++mapIt;
}
}
}

std::shared_ptr<NYdb::TDriver> AcquireSharedDriver(const std::string& endpoint, const std::string& database) {
static TDriverPool pool;
TDriverKey key{endpoint, database};
auto it = pool.DriversByKey.find(key);
if (it != pool.DriversByKey.end()) {
if (std::shared_ptr<NYdb::TDriver> existing = it->second.lock()) {
return existing;
}
}
auto driver = std::make_shared<NYdb::TDriver>(
NYdb::TDriverConfig().SetEndpoint(endpoint).SetDatabase(database));
pool.DriversByKey[std::move(key)] = driver;
++pool.InsertionsSinceCleanup;
if (pool.InsertionsSinceCleanup >= 32) {
CleanupExpiredDrivers(pool);
pool.InsertionsSinceCleanup = 0;
}
return driver;
}
Comment on lines +49 to +67
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AcquireSharedDriver() uses a mutable static driver pool (DriversByKey, InsertionsSinceCleanup) without any synchronization. If multiple connections are created concurrently (common for ODBC clients), this introduces a data race and undefined behavior. Protect the pool with a mutex (or make it thread-local / use a concurrent map) to ensure thread safety.

Copilot uses AI. Check for mistakes.

} // namespace

SQLRETURN TConnection::DriverConnect(const std::string& connectionString) {
std::map<std::string, std::string> params;
size_t pos = 0;
Expand All @@ -39,13 +93,9 @@ SQLRETURN TConnection::DriverConnect(const std::string& connectionString) {
throw TOdbcException("08001", 0, "Missing Endpoint or Database in connection string");
}

YdbDriver_ = std::make_unique<NYdb::TDriver>(NYdb::TDriverConfig()
.SetEndpoint(Endpoint_)
.SetDatabase(Database_));

YdbClient_ = std::make_unique<NYdb::NQuery::TQueryClient>(*YdbDriver_);
YdbSchemeClient_ = std::make_unique<NYdb::NScheme::TSchemeClient>(*YdbDriver_);
YdbTableClient_ = std::make_unique<NYdb::NTable::TTableClient>(*YdbDriver_);
TConnectionAttributes::NormalizeCatalogPath(Database_);
RecreateYdbClients();
Attributes_.SetCurrentCatalog(Database_);

return SQL_SUCCESS;
}
Expand All @@ -67,23 +117,32 @@ SQLRETURN TConnection::Connect(const std::string& serverName,
throw TOdbcException("08001", 0, "Missing Endpoint or Database in DSN");
}

YdbDriver_ = std::make_unique<NYdb::TDriver>(NYdb::TDriverConfig()
.SetEndpoint(Endpoint_)
.SetDatabase(Database_));

YdbClient_ = std::make_unique<NYdb::NQuery::TQueryClient>(*YdbDriver_);
YdbSchemeClient_ = std::make_unique<NYdb::NScheme::TSchemeClient>(*YdbDriver_);
YdbTableClient_ = std::make_unique<NYdb::NTable::TTableClient>(*YdbDriver_);
TConnectionAttributes::NormalizeCatalogPath(Database_);
RecreateYdbClients();
Attributes_.SetCurrentCatalog(Database_);

return SQL_SUCCESS;
}

SQLRETURN TConnection::Disconnect() {
QuerySession_.reset();
Tx_.reset();
YdbSchemeClient_.reset();
YdbTableClient_.reset();
YdbClient_.reset();
YdbDriver_.reset();
return SQL_SUCCESS;
}

NQuery::TSession& TConnection::GetOrCreateQuerySession() {
if (!QuerySession_) {
auto sessionResult = YdbClient_->GetSession().ExtractValueSync();
NStatusHelpers::ThrowOnError(sessionResult);
QuerySession_.emplace(std::move(sessionResult.GetSession()));
}
return *QuerySession_;
}

std::unique_ptr<TStatement> TConnection::CreateStatement() {
return std::make_unique<TStatement>(this);
}
Expand All @@ -94,8 +153,8 @@ void TConnection::RemoveStatement(TStatement* stmt) {
}

SQLRETURN TConnection::SetAutocommit(bool value) {
Autocommit_ = value;
if (Autocommit_ && Tx_) {
Attributes_.SetAutocommit(value);
if (Attributes_.GetAutocommit() && Tx_) {
auto status = Tx_->Commit().ExtractValueSync();
NStatusHelpers::ThrowOnError(status);
Tx_.reset();
Expand All @@ -104,7 +163,33 @@ SQLRETURN TConnection::SetAutocommit(bool value) {
}

bool TConnection::GetAutocommit() const {
return Autocommit_;
return Attributes_.GetAutocommit();
}

SQLRETURN TConnection::SetConnectAttr(SQLINTEGER attr, SQLPOINTER value, SQLINTEGER stringLength) {
if (attr == SQL_ATTR_CURRENT_CATALOG) {
std::optional<std::string> rebindDatabase;
SQLRETURN rc = Attributes_.ApplyCatalogChange(value, stringLength, Database_, rebindDatabase, *this);
if (rc != SQL_SUCCESS) {
return rc;
}
if (rebindDatabase) {
RebindToDatabase(*rebindDatabase);
}
return SQL_SUCCESS;
}
return Attributes_.SetConnectAttr(attr, value, stringLength, [this](bool autocommit) {
return SetAutocommit(autocommit);
}, *this);
}

SQLRETURN TConnection::GetConnectAttr(SQLINTEGER attr, SQLPOINTER value, SQLINTEGER bufferLength,
SQLINTEGER* stringLengthPtr) {
return Attributes_.GetConnectAttr(attr, value, bufferLength, stringLengthPtr, *this);
}

NQuery::TTxSettings TConnection::MakeTxSettings() const {
return Attributes_.MakeTxSettings();
}

const std::optional<NQuery::TTransaction>& TConnection::GetTx() {
Expand All @@ -115,6 +200,14 @@ void TConnection::SetTx(const NQuery::TTransaction& tx) {
Tx_ = tx;
}

void TConnection::ResetTx() {
Tx_.reset();
}

void TConnection::ResetQuerySession() {
QuerySession_.reset();
}

SQLRETURN TConnection::CommitTx() {
auto status = Tx_->Commit().ExtractValueSync();
NStatusHelpers::ThrowOnError(status);
Expand All @@ -129,5 +222,52 @@ SQLRETURN TConnection::RollbackTx() {
return SQL_SUCCESS;
}

void TConnection::SetEnvironment(TEnvironment* env){
if (ParentEnv_){
throw std::logic_error("Connection already bound to environment");
}
ParentEnv_ = env;
}

TEnvironment* TConnection::GetEnvironment(){
return ParentEnv_;
}

void TConnection::RecreateYdbClients() {
QuerySession_.reset();
Tx_.reset();
YdbSchemeClient_.reset();
YdbTableClient_.reset();
YdbClient_.reset();
YdbDriver_ = AcquireSharedDriver(Endpoint_, Database_);
YdbClient_ = std::make_unique<NYdb::NQuery::TQueryClient>(*YdbDriver_);
YdbSchemeClient_ = std::make_unique<NYdb::NScheme::TSchemeClient>(*YdbDriver_);
YdbTableClient_ = std::make_unique<NYdb::NTable::TTableClient>(*YdbDriver_);
}

void TConnection::RebindToDatabase(const std::string& newDatabase) {
std::string db = newDatabase;
TConnectionAttributes::NormalizeCatalogPath(db);
Database_ = std::move(db);
Attributes_.SetCurrentCatalog(Database_);
RecreateYdbClients();
}


std::string TConnection::WrapQueryForCurrentCatalog(const std::string& sql) const {
std::optional<std::string> rel = Attributes_.ResolveCatalogRoute(Database_).TablePathPrefix;
if (!rel) {
return sql;
}
std::string escapedPrefix;
escapedPrefix.reserve(rel->size() + 8);
for (const char ch : *rel) {
if (ch == '\\' || ch == '"') {
escapedPrefix.push_back('\\');
}
escapedPrefix.push_back(ch);
}
return "PRAGMA TablePathPrefix = \"" + escapedPrefix + "\";\n" + sql;
}
} // namespace NOdbc
} // namespace NYdb
24 changes: 21 additions & 3 deletions odbc/src/connection.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "environment.h"
#include "connection_attr.h"
#include "utils/error_manager.h"

#include <ydb-cpp-sdk/client/driver/driver.h>
Expand All @@ -12,8 +13,9 @@
#include <sqlext.h>

#include <memory>
#include <vector>
#include <optional>
#include <string>
#include <vector>

namespace NYdb {
namespace NOdbc {
Expand All @@ -22,19 +24,23 @@ class TStatement;

class TConnection : public TErrorManager {
private:
std::unique_ptr<TDriver> YdbDriver_;
std::shared_ptr<TDriver> YdbDriver_;
std::unique_ptr<NQuery::TQueryClient> YdbClient_;
std::unique_ptr<NTable::TTableClient> YdbTableClient_;
std::unique_ptr<NScheme::TSchemeClient> YdbSchemeClient_;
std::optional<NQuery::TTransaction> Tx_;
std::optional<NQuery::TSession> QuerySession_;

std::vector<std::unique_ptr<TStatement>> Statements_;
std::string Endpoint_;
std::string Database_;
std::string AuthToken_;
TEnvironment* ParentEnv_;
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ParentEnv_ is never initialized, so SetEnvironment()/GetEnvironment() may read an indeterminate pointer value (e.g., if (ParentEnv_) can spuriously throw). Initialize it to nullptr (either in-class or via a constructor) to avoid UB.

Suggested change
TEnvironment* ParentEnv_;
TEnvironment* ParentEnv_ = nullptr;

Copilot uses AI. Check for mistakes.

bool Autocommit_ = true;
TConnectionAttributes Attributes_;

void RecreateYdbClients();
void RebindToDatabase(const std::string& newDatabase);
public:
SQLRETURN Connect(const std::string& serverName,
const std::string& userName,
Expand All @@ -47,17 +53,29 @@ class TConnection : public TErrorManager {
void RemoveStatement(TStatement* stmt);

NYdb::NQuery::TQueryClient* GetClient() { return YdbClient_.get(); }
NQuery::TSession& GetOrCreateQuerySession();
NYdb::NTable::TTableClient* GetTableClient() { return YdbTableClient_.get(); }
NScheme::TSchemeClient* GetSchemeClient() { return YdbSchemeClient_.get(); }

SQLRETURN SetAutocommit(bool value);
bool GetAutocommit() const;

SQLRETURN SetConnectAttr(SQLINTEGER attr, SQLPOINTER value, SQLINTEGER stringLength);
SQLRETURN GetConnectAttr(SQLINTEGER attr, SQLPOINTER value, SQLINTEGER bufferLength, SQLINTEGER* stringLengthPtr);
NQuery::TTxSettings MakeTxSettings() const;

std::string WrapQueryForCurrentCatalog(const std::string& sql) const;

const std::optional<NQuery::TTransaction>& GetTx();
void SetTx(const NQuery::TTransaction& tx);
void ResetTx();
void ResetQuerySession();

SQLRETURN CommitTx();
SQLRETURN RollbackTx();

void SetEnvironment(TEnvironment* env);
TEnvironment* GetEnvironment();
};

} // namespace NOdbc
Expand Down
Loading