Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ static void LoadInternal(ExtensionLoader &loader) {
"Whether or not to use TEXT protocol to read data. This is slower, but provides better "
"compatibility with non-Postgres systems",
LogicalType::BOOLEAN, Value::BOOLEAN(false));
config.AddExtensionOption("pg_statement_timeout_millis",
"Postgres statement timeout in milliseconds to set on scan connections",
LogicalType::UINTEGER, Value());
config.AddExtensionOption("pg_idle_in_transaction_timeout_millis",
"Postgres idle in transaction timeout in milliseconds to set on scan connections",
LogicalType::UINTEGER, Value());

OptimizerExtension postgres_optimizer;
postgres_optimizer.optimize_function = PostgresOptimizer::Optimize;
Expand Down
9 changes: 9 additions & 0 deletions src/postgres_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,15 @@ static void PostgresScanConnect(ClientContext &context, PostgresConnection &conn
D_ASSERT(isolation_level != PostgresIsolationLevel::READ_COMMITTED);
conn.Query(context, StringUtil::Format("SET TRANSACTION SNAPSHOT '%s'", snapshot));
}
Value statement_timeout;
if (context.TryGetCurrentSetting("pg_statement_timeout_millis", statement_timeout) && !statement_timeout.IsNull()) {
conn.Execute(context, StringUtil::Format("SET statement_timeout=%u", UIntegerValue::Get(statement_timeout)));
}
Value idle_timeout;
if (context.TryGetCurrentSetting("pg_idle_in_transaction_timeout_millis", idle_timeout) && !idle_timeout.IsNull()) {
conn.Execute(context, StringUtil::Format("SET idle_in_transaction_session_timeout=%u",
UIntegerValue::Get(idle_timeout)));
}
}

static unique_ptr<GlobalTableFunctionState> PostgresInitGlobalState(ClientContext &context,
Expand Down
60 changes: 60 additions & 0 deletions test/sql/storage/attach_connection_init.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# name: test/sql/storage/attach_connection_init.test
# description: Test that pg_statement_timeout_millis and pg_idle_in_transaction_timeout_millis are applied to parallel scan connections
# group: [storage]

require postgres_scanner

require-env POSTGRES_TEST_DATABASE_AVAILABLE

statement ok
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES);

statement ok
CREATE OR REPLACE TABLE s.large_connection_init AS FROM range(1000000) t(i)

statement ok
CALL postgres_execute('s', 'ANALYZE large_connection_init')

statement ok
DETACH s

statement ok
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES, READ_ONLY)

statement ok
SET threads=10

statement ok
SET pg_pages_per_task=1

statement ok
SET pg_connection_limit=4

statement ok
SET pg_statement_timeout_millis=0

statement ok
SET pg_idle_in_transaction_timeout_millis=0

statement ok
CALL enable_logging('PostgresQueryLog')

query I
SELECT COUNT(*) FROM s.large_connection_init
----
1000000

# Verify that timeout settings were executed on parallel connections (not just the main one)
query I
SELECT COUNT(*) > 1
FROM duckdb_logs_parsed('PostgresQueryLog')
WHERE query = 'SET statement_timeout=0'
----
1

query I
SELECT COUNT(*) > 1
FROM duckdb_logs_parsed('PostgresQueryLog')
WHERE query = 'SET idle_in_transaction_session_timeout=0'
----
1
Loading