Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/include/postgres_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class PostgresConnection {

bool IsOpen();
void Close();
bool PingServer();
void Reset();
bool PingServer(const std::string &health_check_query);
void Reset(const std::string &health_check_query);

shared_ptr<OwnedPostgresConnection> GetConnection() {
return connection;
Expand Down
13 changes: 10 additions & 3 deletions src/include/storage/postgres_connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#pragma once

#include <mutex>

#include "duckdb/common/common.hpp"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/optional_ptr.hpp"
Expand All @@ -24,15 +26,19 @@ using PostgresPoolConnection = dbconnector::pool::PooledConnection<PostgresConne

class PostgresConnectionPool : public dbconnector::pool::ConnectionPool<PostgresConnection> {
public:
PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections = DefaultPoolSize());
PostgresConnectionPool(PostgresCatalog &postgres_catalog);

public:
bool TryGetConnection(PostgresPoolConnection &connection);
PostgresPoolConnection GetConnection();
//! Always returns a connection - even if the connection slots are exhausted
PostgresPoolConnection ForceGetConnection();

static idx_t DefaultPoolSize() noexcept;
std::string GetHealthCheckQuery();
void SetHealthCheckQuery(const std::string &query);

static idx_t DefaultPoolSize();
static std::string DefaultHealthCheckQuery();

protected:
std::unique_ptr<PostgresConnection> CreateNewConnection() override;
Expand All @@ -42,7 +48,8 @@ class PostgresConnectionPool : public dbconnector::pool::ConnectionPool<Postgres
private:
PostgresCatalog &postgres_catalog;

static dbconnector::pool::ConnectionPoolConfig CreateConfig(idx_t max_connections);
std::mutex config_mutex;
std::string health_check_query;
};

} // namespace duckdb
11 changes: 7 additions & 4 deletions src/postgres_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,23 @@ void PostgresConnection::Close() {
connection = nullptr;
}

bool PostgresConnection::PingServer() {
bool PostgresConnection::PingServer(const std::string &health_check_query) {
if (!IsOpen()) {
return false;
}
PGconn *conn = GetConn();
if (PQstatus(conn) != CONNECTION_OK) {
return false;
}
PGresult *res = PQexec(conn, "SELECT 1");
if (health_check_query.empty()) {
return true;
}
PGresult *res = PQexec(conn, health_check_query.c_str());
PostgresResult res_holder(res);
return PQresultStatus(res) == PGRES_TUPLES_OK;
}

void PostgresConnection::Reset() {
void PostgresConnection::Reset(const std::string &health_check_query) {
if (!IsOpen()) {
throw InternalException("Cannot reset a connection that is not open");
}
Expand All @@ -216,7 +219,7 @@ void PostgresConnection::Reset() {
}
}
PQreset(conn);
if (!PingServer()) {
if (!PingServer(health_check_query)) {
throw InternalException("Connection reset failure");
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ static void LoadInternal(ExtensionLoader &loader) {
config.AddExtensionOption("pg_idle_in_transaction_timeout_millis",
"Postgres idle in transaction timeout in milliseconds to set on scan connections",
LogicalType::UINTEGER, Value());
// connection pool options
config.AddExtensionOption("pg_pool_health_check_query", "The query to use to check that the connection is healthy",
LogicalType::VARCHAR, PostgresConnectionPool::DefaultHealthCheckQuery());

OptimizerExtension postgres_optimizer;
postgres_optimizer.optimize_function = PostgresOptimizer::Optimize;
Expand Down
5 changes: 0 additions & 5 deletions src/storage/postgres_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin
if (default_schema.empty()) {
default_schema = "public";
}
Value connection_limit;
auto &db_instance = db_p.GetDatabase();
if (db_instance.TryGetCurrentSetting("pg_connection_limit", connection_limit)) {
connection_pool->SetMaxConnections(UBigIntValue::Get(connection_limit));
}

auto connection = connection_pool->GetConnection();
this->version = connection.GetConnection().GetPostgresVersion(context);
Expand Down
59 changes: 48 additions & 11 deletions src/storage/postgres_connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,22 @@

namespace duckdb {

PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog, idx_t maximum_connections_p)
: dbconnector::pool::ConnectionPool<PostgresConnection>(CreateConfig(maximum_connections_p)),
postgres_catalog(postgres_catalog) {
static dbconnector::pool::ConnectionPoolConfig CreateConfig(PostgresCatalog &postgres_catalog);

static std::string GetHealthCheckQueryFromConfig(PostgresCatalog &postgres_catalog) {
Value val;
if (postgres_catalog.GetDatabase().TryGetCurrentSetting("pg_pool_health_check_query", val)) {
if (val.IsNull()) {
return std::string();
}
return StringValue::Get(val);
}
return PostgresConnectionPool::DefaultHealthCheckQuery();
}

PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog)
: dbconnector::pool::ConnectionPool<PostgresConnection>(CreateConfig(postgres_catalog)),
postgres_catalog(postgres_catalog), health_check_query(GetHealthCheckQueryFromConfig(postgres_catalog)) {
}

PostgresPoolConnection PostgresConnectionPool::ForceGetConnection() {
Expand Down Expand Up @@ -39,23 +52,47 @@ bool PostgresConnectionPool::CheckConnectionHealthy(PostgresConnection &conn) {
if (!conn.IsOpen()) {
return false;
}
return conn.PingServer();
std::string query = GetHealthCheckQuery();
return conn.PingServer(query);
}

void PostgresConnectionPool::ResetConnection(PostgresConnection &conn) {
conn.Reset();
std::string query = GetHealthCheckQuery();
conn.Reset(query);
}

dbconnector::pool::ConnectionPoolConfig PostgresConnectionPool::CreateConfig(idx_t max_connections) {
dbconnector::pool::ConnectionPoolConfig config;
config.max_connections = max_connections;
return config;
std::string PostgresConnectionPool::GetHealthCheckQuery() {
std::lock_guard<std::mutex> guard(config_mutex);
return std::string(health_check_query.data(), health_check_query.length());
}

idx_t PostgresConnectionPool::DefaultPoolSize() noexcept {
void PostgresConnectionPool::SetHealthCheckQuery(const std::string &query) {
std::lock_guard<std::mutex> guard(config_mutex);
this->health_check_query = std::string(query.data(), query.length());
}

idx_t PostgresConnectionPool::DefaultPoolSize() {
idx_t detected = static_cast<idx_t>(std::thread::hardware_concurrency());
idx_t default_num = static_cast<idx_t>(8);
return detected < default_num ? detected : default_num;
return detected > default_num ? detected : default_num;
}

std::string PostgresConnectionPool::DefaultHealthCheckQuery() {
return "SELECT 1";
}

static dbconnector::pool::ConnectionPoolConfig CreateConfig(PostgresCatalog &postgres_catalog) {
DatabaseInstance &db = postgres_catalog.GetDatabase();

Value connection_limit;
uint64_t max_connections = PostgresConnectionPool::DefaultPoolSize();
if (db.TryGetCurrentSetting("pg_connection_limit", connection_limit) && !connection_limit.IsNull()) {
max_connections = UBigIntValue::Get(connection_limit);
}

dbconnector::pool::ConnectionPoolConfig config;
config.max_connections = max_connections;
return config;
}

} // namespace duckdb
7 changes: 7 additions & 0 deletions test/sql/storage/attach_connection_pool.test
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ query I
SELECT COUNT(*) FROM connection_pool
----
1000000

# todo: pool introspection is required to check the effect
statement ok
SET pg_pool_health_check_query = 'SELECT FAIL'

statement ok
RESET pg_pool_health_check_query
2 changes: 1 addition & 1 deletion test/sql/storage/attach_multi_join.test
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ select * from test1 a left join test2 b on a.id = b.id left join test3 c on a.id
----

statement ok
SET pg_connection_limit=1
SET pg_connection_limit=2

query III
select * from test1 a left join test2 b on a.id = b.id left join test3 c on a.id = c.id;
Expand Down
Loading