diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query01.ans b/automation/sqlrepo/features/parallel/correctness/expected/query01.ans new file mode 100644 index 00000000..7672dea1 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query01.ans @@ -0,0 +1,13 @@ +-- @description query01 for PXF parallel scan correctness - count with parallel + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT count(*) FROM pxf_parallel_enabled; + count +------- + 10000 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query02.ans b/automation/sqlrepo/features/parallel/correctness/expected/query02.ans new file mode 100644 index 00000000..4a77aafb --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query02.ans @@ -0,0 +1,13 @@ +-- @description query02 for PXF parallel scan correctness - sum aggregation + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT sum(id) FROM pxf_parallel_enabled; + sum +---------- + 50005000 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query03.ans b/automation/sqlrepo/features/parallel/correctness/expected/query03.ans new file mode 100644 index 00000000..918acaa1 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query03.ans @@ -0,0 +1,21 @@ +-- @description query03 for PXF parallel scan correctness - cross-check parallel vs non-parallel count + +SET optimizer = off; +SET +SET enable_parallel = false; +SET +SELECT count(*) AS non_parallel_count FROM pxf_parallel_disabled; + non_parallel_count +-------------------- + 10000 +(1 row) + +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT count(*) AS parallel_count FROM pxf_parallel_enabled; + parallel_count +---------------- + 10000 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query04.ans b/automation/sqlrepo/features/parallel/correctness/expected/query04.ans new file mode 100644 index 00000000..c3caefbe --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query04.ans @@ -0,0 +1,22 @@ +-- @description query04 for PXF parallel scan correctness - ORDER BY with LIMIT + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT id, val FROM pxf_parallel_enabled ORDER BY id LIMIT 10; + id | val +----+------- + 1 | row_1 + 2 | row_2 + 3 | row_3 + 4 | row_4 + 5 | row_5 + 6 | row_6 + 7 | row_7 + 8 | row_8 + 9 | row_9 + 10 | row_10 +(10 rows) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query05.ans b/automation/sqlrepo/features/parallel/correctness/expected/query05.ans new file mode 100644 index 00000000..7df9da3f --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query05.ans @@ -0,0 +1,13 @@ +-- @description query05 for PXF parallel scan correctness - MIN/MAX/AVG aggregates + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT min(id), max(id), avg(id) FROM pxf_parallel_enabled; + min | max | avg +-----+-------+----------------------- + 1 | 10000 | 5000.5000000000000000 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query06.ans b/automation/sqlrepo/features/parallel/correctness/expected/query06.ans new file mode 100644 index 00000000..e37f743a --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query06.ans @@ -0,0 +1,13 @@ +-- @description query06 for PXF parallel scan correctness - WHERE pushdown with parallel + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT count(*) FROM pxf_parallel_enabled WHERE id > 5000; + count +------- + 5000 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query07.ans b/automation/sqlrepo/features/parallel/correctness/expected/query07.ans new file mode 100644 index 00000000..078b7721 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query07.ans @@ -0,0 +1,17 @@ +-- @description query07 for PXF parallel scan correctness - column projection with WHERE + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT val FROM pxf_parallel_enabled WHERE id <= 5 ORDER BY val; + val +------- + row_1 + row_2 + row_3 + row_4 + row_5 +(5 rows) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query08.ans b/automation/sqlrepo/features/parallel/correctness/expected/query08.ans new file mode 100644 index 00000000..3d27ff32 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query08.ans @@ -0,0 +1,13 @@ +-- @description query08 for PXF parallel scan correctness - empty result edge case + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT count(*) FROM pxf_parallel_enabled WHERE id < 0; + count +------- + 0 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query09.ans b/automation/sqlrepo/features/parallel/correctness/expected/query09.ans new file mode 100644 index 00000000..250b4b75 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query09.ans @@ -0,0 +1,13 @@ +-- @description query09 for PXF parallel scan correctness - COUNT DISTINCT no duplicates + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT count(DISTINCT id) FROM pxf_parallel_enabled; + count +------- + 10000 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query10.ans b/automation/sqlrepo/features/parallel/correctness/expected/query10.ans new file mode 100644 index 00000000..d993ffb2 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/expected/query10.ans @@ -0,0 +1,13 @@ +-- @description query10 for PXF parallel scan correctness - workers=0 fallback on parallel table + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 0; +SET +SELECT count(*) FROM pxf_parallel_enabled; + count +------- + 10000 +(1 row) diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query01.sql b/automation/sqlrepo/features/parallel/correctness/sql/query01.sql new file mode 100644 index 00000000..5149f3f8 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query01.sql @@ -0,0 +1,6 @@ +-- @description query01 for PXF parallel scan correctness - count with parallel + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT count(*) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query02.sql b/automation/sqlrepo/features/parallel/correctness/sql/query02.sql new file mode 100644 index 00000000..d9ec4dca --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query02.sql @@ -0,0 +1,6 @@ +-- @description query02 for PXF parallel scan correctness - sum aggregation + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT sum(id) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query03.sql b/automation/sqlrepo/features/parallel/correctness/sql/query03.sql new file mode 100644 index 00000000..536dccd8 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query03.sql @@ -0,0 +1,9 @@ +-- @description query03 for PXF parallel scan correctness - cross-check parallel vs non-parallel count + +SET optimizer = off; +SET enable_parallel = false; +SELECT count(*) AS non_parallel_count FROM pxf_parallel_disabled; + +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT count(*) AS parallel_count FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query04.sql b/automation/sqlrepo/features/parallel/correctness/sql/query04.sql new file mode 100644 index 00000000..251f9a4a --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query04.sql @@ -0,0 +1,6 @@ +-- @description query04 for PXF parallel scan correctness - ORDER BY with LIMIT + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT id, val FROM pxf_parallel_enabled ORDER BY id LIMIT 10; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query05.sql b/automation/sqlrepo/features/parallel/correctness/sql/query05.sql new file mode 100644 index 00000000..7379f88c --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query05.sql @@ -0,0 +1,6 @@ +-- @description query05 for PXF parallel scan correctness - MIN/MAX/AVG aggregates + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT min(id), max(id), avg(id) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query06.sql b/automation/sqlrepo/features/parallel/correctness/sql/query06.sql new file mode 100644 index 00000000..2be9bd6a --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query06.sql @@ -0,0 +1,6 @@ +-- @description query06 for PXF parallel scan correctness - WHERE pushdown with parallel + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT count(*) FROM pxf_parallel_enabled WHERE id > 5000; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query07.sql b/automation/sqlrepo/features/parallel/correctness/sql/query07.sql new file mode 100644 index 00000000..b7d7ebff --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query07.sql @@ -0,0 +1,6 @@ +-- @description query07 for PXF parallel scan correctness - column projection with WHERE + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT val FROM pxf_parallel_enabled WHERE id <= 5 ORDER BY val; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query08.sql b/automation/sqlrepo/features/parallel/correctness/sql/query08.sql new file mode 100644 index 00000000..136a6a33 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query08.sql @@ -0,0 +1,6 @@ +-- @description query08 for PXF parallel scan correctness - empty result edge case + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT count(*) FROM pxf_parallel_enabled WHERE id < 0; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query09.sql b/automation/sqlrepo/features/parallel/correctness/sql/query09.sql new file mode 100644 index 00000000..9743ae30 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query09.sql @@ -0,0 +1,6 @@ +-- @description query09 for PXF parallel scan correctness - COUNT DISTINCT no duplicates + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT count(DISTINCT id) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query10.sql b/automation/sqlrepo/features/parallel/correctness/sql/query10.sql new file mode 100644 index 00000000..17a89a38 --- /dev/null +++ b/automation/sqlrepo/features/parallel/correctness/sql/query10.sql @@ -0,0 +1,6 @@ +-- @description query10 for PXF parallel scan correctness - workers=0 fallback on parallel table + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 0; +SELECT count(*) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/explain/expected/query01.ans b/automation/sqlrepo/features/parallel/explain/expected/query01.ans new file mode 100644 index 00000000..6fb6a328 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/expected/query01.ans @@ -0,0 +1,32 @@ +-- @description query01 for PXF parallel scan EXPLAIN - Gather node present when parallel enabled +-- start_matchsubs +-- +-- m/Workers Planned: \d+/ +-- s/Workers Planned: \d+/Workers Planned: N/ +-- +-- m/cost=\d+\.\d+\.\.\d+\.\d+/ +-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/ +-- +-- m/rows=\d+/ +-- s/rows=\d+/rows=NNN/ +-- +-- m/width=\d+/ +-- s/width=\d+/width=NN/ +-- +-- end_matchsubs +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +EXPLAIN SELECT count(*) FROM pxf_parallel_enabled; + QUERY PLAN +---------------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN) + -> Finalize Aggregate (cost=XXX rows=NNN width=NN) + -> Gather (cost=XXX rows=NNN width=NN) + Workers Planned: N + -> Partial Aggregate (cost=XXX rows=NNN width=NN) + -> Parallel Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN) +(6 rows) diff --git a/automation/sqlrepo/features/parallel/explain/expected/query02.ans b/automation/sqlrepo/features/parallel/explain/expected/query02.ans new file mode 100644 index 00000000..fd7207d5 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/expected/query02.ans @@ -0,0 +1,24 @@ +-- @description query02 for PXF parallel scan EXPLAIN - no Gather node when parallel disabled +-- start_matchsubs +-- +-- m/cost=\d+\.\d+\.\.\d+\.\d+/ +-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/ +-- +-- m/rows=\d+/ +-- s/rows=\d+/rows=NNN/ +-- +-- m/width=\d+/ +-- s/width=\d+/width=NN/ +-- +-- end_matchsubs +SET optimizer = off; +SET +SET enable_parallel = false; +SET +EXPLAIN SELECT count(*) FROM pxf_parallel_disabled; + QUERY PLAN +---------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN) + -> Aggregate (cost=XXX rows=NNN width=NN) + -> Foreign Scan on pxf_parallel_disabled (cost=XXX rows=NNN width=NN) +(3 rows) diff --git a/automation/sqlrepo/features/parallel/explain/expected/query03.ans b/automation/sqlrepo/features/parallel/explain/expected/query03.ans new file mode 100644 index 00000000..cbc76f18 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/expected/query03.ans @@ -0,0 +1,84 @@ +-- @description query03 for PXF parallel scan EXPLAIN ANALYZE - Workers Launched present +-- start_matchsubs +-- +-- m/Workers Planned: \d+/ +-- s/Workers Planned: \d+/Workers Planned: N/ +-- +-- m/Workers Launched: \d+/ +-- s/Workers Launched: \d+/Workers Launched: N/ +-- +-- m/cost=\d+\.\d+\.\.\d+\.\d+/ +-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/ +-- +-- m/rows=\d+/ +-- s/rows=\d+/rows=NNN/ +-- +-- m/width=\d+/ +-- s/width=\d+/width=NN/ +-- +-- m/actual time=\d+\.\d+\.\.\d+\.\d+/ +-- s/actual time=\d+\.\d+\.\.\d+\.\d+/actual time=XXX/ +-- +-- m/loops=\d+/ +-- s/loops=\d+/loops=N/ +-- +-- m/Execution Time: \d+\.\d+ ms/ +-- s/Execution Time: \d+\.\d+ ms/Execution Time: XXX ms/ +-- +-- m/Planning Time: \d+\.\d+ ms/ +-- s/Planning Time: \d+\.\d+ ms/Planning Time: XXX ms/ +-- +-- m/Memory Usage: \d+kB/ +-- s/Memory Usage: \d+kB/Memory Usage: NNkB/ +-- +-- m/Memory: \d+kB/ +-- s/Memory: \d+kB/Memory: NNkB/ +-- +-- m/Buckets: \d+/ +-- s/Buckets: \d+/Buckets: NNN/ +-- +-- m/Batches: \d+/ +-- s/Batches: \d+/Batches: NNN/ +-- +-- m/Peak Memory Usage: \d+/ +-- s/Peak Memory Usage: \d+/Peak Memory Usage: NNN/ +-- +-- m/Avg Peak Memory \(per process\): \d+/ +-- s/Avg Peak Memory \(per process\): \d+/Avg Peak Memory (per process): NNN/ +-- +-- m/slice \d+; segments: \d+/ +-- s/slice \d+; segments: \d+/slice N; segments: N/ +-- +-- m/Optimizer: .*/ +-- s/Optimizer: .*/Optimizer: OPT/ +-- +-- end_matchsubs +-- start_matchignore +-- +-- m/^\s+Slice statistics:/ +-- m/^\s+\(slice\d+\)/ +-- m/^\s+Statement statistics:/ +-- m/^\s+Settings:/ +-- m/^\s+Query Identifier:/ +-- +-- end_matchignore +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +EXPLAIN ANALYZE SELECT count(*) FROM pxf_parallel_enabled; + QUERY PLAN +---------------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice N; segments: N) (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N) + -> Finalize Aggregate (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N) + -> Gather (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N) + Workers Planned: N + Workers Launched: N + -> Partial Aggregate (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N) + -> Parallel Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N) + Optimizer: OPT + Planning Time: XXX ms + Execution Time: XXX ms +(10 rows) diff --git a/automation/sqlrepo/features/parallel/explain/expected/query04.ans b/automation/sqlrepo/features/parallel/explain/expected/query04.ans new file mode 100644 index 00000000..14bcc381 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/expected/query04.ans @@ -0,0 +1,26 @@ +-- @description query04 for PXF parallel scan EXPLAIN - no Gather with workers=0 on parallel table +-- start_matchsubs +-- +-- m/cost=\d+\.\d+\.\.\d+\.\d+/ +-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/ +-- +-- m/rows=\d+/ +-- s/rows=\d+/rows=NNN/ +-- +-- m/width=\d+/ +-- s/width=\d+/width=NN/ +-- +-- end_matchsubs +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 0; +SET +EXPLAIN SELECT count(*) FROM pxf_parallel_enabled; + QUERY PLAN +---------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN) + -> Aggregate (cost=XXX rows=NNN width=NN) + -> Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN) +(3 rows) diff --git a/automation/sqlrepo/features/parallel/explain/sql/query01.sql b/automation/sqlrepo/features/parallel/explain/sql/query01.sql new file mode 100644 index 00000000..17eecc09 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/sql/query01.sql @@ -0,0 +1,6 @@ +-- @description query01 for PXF parallel scan EXPLAIN - Gather node present when parallel enabled + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +EXPLAIN SELECT count(*) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/explain/sql/query02.sql b/automation/sqlrepo/features/parallel/explain/sql/query02.sql new file mode 100644 index 00000000..be769b18 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/sql/query02.sql @@ -0,0 +1,5 @@ +-- @description query02 for PXF parallel scan EXPLAIN - no Gather node when parallel disabled + +SET optimizer = off; +SET enable_parallel = false; +EXPLAIN SELECT count(*) FROM pxf_parallel_disabled; diff --git a/automation/sqlrepo/features/parallel/explain/sql/query03.sql b/automation/sqlrepo/features/parallel/explain/sql/query03.sql new file mode 100644 index 00000000..a69394d3 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/sql/query03.sql @@ -0,0 +1,6 @@ +-- @description query03 for PXF parallel scan EXPLAIN ANALYZE - Workers Launched present + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +EXPLAIN ANALYZE SELECT count(*) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/explain/sql/query04.sql b/automation/sqlrepo/features/parallel/explain/sql/query04.sql new file mode 100644 index 00000000..1b783608 --- /dev/null +++ b/automation/sqlrepo/features/parallel/explain/sql/query04.sql @@ -0,0 +1,6 @@ +-- @description query04 for PXF parallel scan EXPLAIN - no Gather with workers=0 on parallel table + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 0; +EXPLAIN SELECT count(*) FROM pxf_parallel_enabled; diff --git a/automation/sqlrepo/features/parallel/rescan/expected/query01.ans b/automation/sqlrepo/features/parallel/rescan/expected/query01.ans new file mode 100644 index 00000000..8d00c268 --- /dev/null +++ b/automation/sqlrepo/features/parallel/rescan/expected/query01.ans @@ -0,0 +1,18 @@ +-- @description query01 for PXF parallel scan rescan - correlated subquery triggers rescan + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT t.id, (SELECT count(*) FROM pxf_parallel_enabled WHERE id <= t.id) AS running_count +FROM pxf_parallel_enabled t +WHERE t.id IN (1, 5000, 10000) +ORDER BY t.id; + id | running_count +-------+--------------- + 1 | 1 + 5000 | 5000 + 10000 | 10000 +(3 rows) diff --git a/automation/sqlrepo/features/parallel/rescan/sql/query01.sql b/automation/sqlrepo/features/parallel/rescan/sql/query01.sql new file mode 100644 index 00000000..19553ad2 --- /dev/null +++ b/automation/sqlrepo/features/parallel/rescan/sql/query01.sql @@ -0,0 +1,9 @@ +-- @description query01 for PXF parallel scan rescan - correlated subquery triggers rescan + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT t.id, (SELECT count(*) FROM pxf_parallel_enabled WHERE id <= t.id) AS running_count +FROM pxf_parallel_enabled t +WHERE t.id IN (1, 5000, 10000) +ORDER BY t.id; diff --git a/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans b/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans new file mode 100644 index 00000000..757c97f4 --- /dev/null +++ b/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans @@ -0,0 +1,13 @@ +-- @description query01 for PXF parallel scan single fragment - count with excess workers + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT count(*) FROM pxf_parallel_single_frag; + count +------- + 100 +(1 row) diff --git a/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans b/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans new file mode 100644 index 00000000..2590defc --- /dev/null +++ b/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans @@ -0,0 +1,13 @@ +-- @description query02 for PXF parallel scan single fragment - sum aggregation + +SET optimizer = off; +SET +SET enable_parallel = true; +SET +SET max_parallel_workers_per_gather = 4; +SET +SELECT sum(id) FROM pxf_parallel_single_frag; + sum +------ + 5050 +(1 row) diff --git a/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql b/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql new file mode 100644 index 00000000..7e7758ce --- /dev/null +++ b/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql @@ -0,0 +1,6 @@ +-- @description query01 for PXF parallel scan single fragment - count with excess workers + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT count(*) FROM pxf_parallel_single_frag; diff --git a/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql b/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql new file mode 100644 index 00000000..06243e54 --- /dev/null +++ b/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql @@ -0,0 +1,6 @@ +-- @description query02 for PXF parallel scan single fragment - sum aggregation + +SET optimizer = off; +SET enable_parallel = true; +SET max_parallel_workers_per_gather = 4; +SELECT sum(id) FROM pxf_parallel_single_frag; diff --git a/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java b/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java new file mode 100644 index 00000000..11815b6f --- /dev/null +++ b/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java @@ -0,0 +1,106 @@ +package org.apache.cloudberry.pxf.automation.features.parallel; + +import annotations.WorksWithFDW; +import org.apache.cloudberry.pxf.automation.features.BaseFeature; +import org.apache.cloudberry.pxf.automation.structures.tables.basic.Table; +import org.apache.cloudberry.pxf.automation.structures.tables.utils.TableFactory; +import org.apache.cloudberry.pxf.automation.utils.system.ProtocolEnum; +import org.apache.cloudberry.pxf.automation.utils.system.ProtocolUtils; +import org.testng.annotations.Test; + +/** + * Tests for PG-style Gather parallel scan on PXF FDW foreign tables. + * + * Verifies: + * - Correctness: count, sum, ordering match between parallel and non-parallel scans + * - EXPLAIN: Gather node present when parallel enabled, absent when disabled + */ +@WorksWithFDW +public class ParallelScanTest extends BaseFeature { + + private static final String[] FIELDS = {"id integer", "val text"}; + private static final int FILES = 10; + private static final int ROWS_PER_FILE = 1000; + private static final int SINGLE_FRAG_ROWS = 100; + + private String hdfsDir; + private String singleFragDir; + + @Override + protected void beforeClass() throws Exception { + super.beforeClass(); + + hdfsDir = hdfs.getWorkingDirectory() + "/parallel_data"; + hdfs.createDirectory(hdfsDir); + + // Write 10 CSV files to HDFS (10 x 1000 rows = 10,000 total) + for (int fileIdx = 0; fileIdx < FILES; fileIdx++) { + Table dataTable = new Table("part_" + String.format("%02d", fileIdx), null); + int startId = fileIdx * ROWS_PER_FILE + 1; + int endId = (fileIdx + 1) * ROWS_PER_FILE; + for (int id = startId; id <= endId; id++) { + dataTable.addRow(new String[]{String.valueOf(id), "row_" + id}); + } + hdfs.writeTableToFile(hdfsDir + "/part_" + String.format("%02d", fileIdx) + ".csv", + dataTable, ","); + } + + ProtocolEnum protocol = ProtocolUtils.getProtocol(); + String tablePath = protocol.getExternalTablePath(hdfs.getBasePath(), hdfsDir); + + // Create foreign table with enable_parallel=true + exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_enabled", + FIELDS, tablePath, ","); + exTable.setHost(pxfHost); + exTable.setPort(pxfPort); + exTable.setUserParameters(new String[]{"enable_parallel=true"}); + gpdb.createTableAndVerify(exTable); + + // Create foreign table without enable_parallel (defaults to false) + exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_disabled", + FIELDS, tablePath, ","); + exTable.setHost(pxfHost); + exTable.setPort(pxfPort); + gpdb.createTableAndVerify(exTable); + + // Write single CSV file to HDFS (1 file, 100 rows) for single-fragment tests + singleFragDir = hdfs.getWorkingDirectory() + "/parallel_single_frag"; + hdfs.createDirectory(singleFragDir); + + Table singleTable = new Table("single_frag", null); + for (int id = 1; id <= SINGLE_FRAG_ROWS; id++) { + singleTable.addRow(new String[]{String.valueOf(id), "row_" + id}); + } + hdfs.writeTableToFile(singleFragDir + "/data.csv", singleTable, ","); + + String singleFragPath = protocol.getExternalTablePath(hdfs.getBasePath(), singleFragDir); + + // Create foreign table for single-fragment with enable_parallel=true + exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_single_frag", + FIELDS, singleFragPath, ","); + exTable.setHost(pxfHost); + exTable.setPort(pxfPort); + exTable.setUserParameters(new String[]{"enable_parallel=true"}); + gpdb.createTableAndVerify(exTable); + } + + @Test(groups = {"features", "gpdb", "fdw"}) + public void testParallelCorrectness() throws Exception { + runSqlTest("features/parallel/correctness"); + } + + @Test(groups = {"features", "gpdb", "fdw"}) + public void testParallelExplain() throws Exception { + runSqlTest("features/parallel/explain"); + } + + @Test(groups = {"features", "gpdb", "fdw"}) + public void testParallelSingleFragment() throws Exception { + runSqlTest("features/parallel/single_fragment"); + } + + @Test(groups = {"features", "gpdb", "fdw"}) + public void testParallelRescan() throws Exception { + runSqlTest("features/parallel/rescan"); + } +} diff --git a/fdw/pxf_bridge.c b/fdw/pxf_bridge.c index 567665ac..d194f3aa 100644 --- a/fdw/pxf_bridge.c +++ b/fdw/pxf_bridge.c @@ -23,6 +23,7 @@ #include "cdb/cdbtm.h" #include "cdb/cdbvars.h" +#include "utils/builtins.h" /* helper function declarations */ static void BuildUriForRead(PxfFdwScanState *pxfsstate); @@ -182,3 +183,66 @@ FillBuffer(PxfFdwScanState *pxfsstate, char *start, int minlen, int maxlen) return ptr - start; } + +/* + * ============================================================================ + * Cloudberry Gang-Parallel Support (Virtual Segment ID) + * + * In Cloudberry, parallel execution uses "gang expansion" where + * multiple processes share the same physical segment ID. PostgreSQL's DSM + * callbacks (InitializeDSMForeignScan, InitializeWorkerForeignScan) are + * NOT invoked in this model. + * + * Instead of fragment-by-fragment coordination, we use "virtual segment IDs": + * each gang worker sends a unique virtual segment ID to PXF, so PXF's + * existing round-robin fragment distribution splits the data among workers + * automatically — no PXF server changes needed. + * + * Example: 3 physical segments × 4 workers = 12 virtual segments. + * Worker i on physical segment S sends virtual_seg_id = S + i * seg_count, + * with virtual_seg_count = seg_count * workers. + * ============================================================================ + */ + +/* + * PxfBridgeImportStartVirtual + * Start import with virtual segment ID for Cloudberry gang-parallel mode. + * + * Same as PxfBridgeImportStart, but after building the standard HTTP headers, + * overrides X-GP-SEGMENT-ID and X-GP-SEGMENT-COUNT with the virtual values. + * This makes PXF's round-robin assign a unique subset of fragments to each + * gang worker, eliminating data duplication. + */ +void +PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate, + int virtualSegId, int virtualSegCount) +{ + char seg_id_str[16]; + char seg_count_str[16]; + + pxfsstate->churl_headers = churl_headers_init(); + + BuildUriForRead(pxfsstate); + BuildHttpHeaders(pxfsstate->churl_headers, + pxfsstate->options, + pxfsstate->relation, + pxfsstate->filter_str, + pxfsstate->retrieved_attrs, + pxfsstate->projectionInfo); + + /* Override physical segment ID/count with virtual values */ + pg_ltoa(virtualSegId, seg_id_str); + pg_ltoa(virtualSegCount, seg_count_str); + churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-ID", seg_id_str); + churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-COUNT", seg_count_str); + + elog(DEBUG3, "pxf_fdw: PxfBridgeImportStartVirtual physical_seg=%d " + "virtual_seg_id=%d virtual_seg_count=%d", + PXF_SEGMENT_ID, virtualSegId, virtualSegCount); + + pxfsstate->churl_handle = churl_init_download(pxfsstate->uri.data, + pxfsstate->churl_headers); + + /* read some bytes to make sure the connection is established */ + churl_read_check_connectivity(pxfsstate->churl_handle); +} diff --git a/fdw/pxf_bridge.h b/fdw/pxf_bridge.h index e5e53610..87590a62 100644 --- a/fdw/pxf_bridge.h +++ b/fdw/pxf_bridge.h @@ -49,6 +49,18 @@ typedef struct PxfFdwScanState PxfOptions *options; CopyFromState cstate; ProjectionInfo *projectionInfo; + + /* + * Cloudberry gang-parallel state (virtual segment ID based). + * Used when Cloudberry's gang expansion creates multiple processes per + * segment but PG's DSM callbacks are not invoked. Each gang worker gets + * a unique virtual segment ID so PXF's round-robin distributes fragments + * evenly without data duplication. + */ + bool gang_parallel; /* true when using virtual segment IDs */ + int worker_index; /* this worker's index within the segment gang */ + int virtual_seg_id; /* virtual segment ID sent to PXF */ + int virtual_seg_count; /* virtual segment count sent to PXF */ } PxfFdwScanState; /* @@ -80,4 +92,8 @@ int PxfBridgeRead(void *outbuf, int minlen, int maxlen, void *extra); /* Writes data from the given buffer of a given size to the PXF server */ int PxfBridgeWrite(PxfFdwModifyState *context, char *databuf, int datalen); +/* Start import with virtual segment ID for gang-parallel mode */ +void PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate, + int virtualSegId, int virtualSegCount); + #endif /* _PXFBRIDGE_H */ diff --git a/fdw/pxf_fdw.c b/fdw/pxf_fdw.c index 645a1abc..b754472b 100644 --- a/fdw/pxf_fdw.c +++ b/fdw/pxf_fdw.c @@ -24,6 +24,7 @@ #include "foreign/fdwapi.h" #include "foreign/foreign.h" #include "nodes/pg_list.h" +#include "optimizer/cost.h" #include "optimizer/optimizer.h" #include "optimizer/paths.h" #include "optimizer/pathnode.h" @@ -32,6 +33,8 @@ #include "parser/parsetree.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/guc.h" +#include "access/parallel.h" PG_MODULE_MAGIC; @@ -93,6 +96,13 @@ static void EndCopyModify(CopyToState cstate); static void PxfBeginScanErrorCallback(void *arg); static void PxfCopyFromErrorCallback(void *arg); +/* Parallel scan support */ +static bool pxfIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); + +/* Cloudberry gang-parallel support */ +static void InitGangParallelState(PxfFdwScanState *pxfsstate); +static void InitCopyStateVirtual(PxfFdwScanState *pxfsstate); + /* * Foreign-data wrapper handler functions: * returns a struct with pointers to the @@ -148,6 +158,9 @@ pxf_fdw_handler(PG_FUNCTION_ARGS) fdw_routine->EndForeignModify = pxfEndForeignModify; fdw_routine->IsForeignRelUpdatable = pxfIsForeignRelUpdatable; + /* Parallel scan support: tell the planner this FDW can run in parallel */ + fdw_routine->IsForeignScanParallelSafe = pxfIsForeignScanParallelSafe; + PG_RETURN_POINTER(fdw_routine); } @@ -248,29 +261,47 @@ pxfGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid) { - ForeignPath *path = NULL; - int total_cost = DEFAULT_PXF_FDW_STARTUP_COST; PxfFdwRelationInfo *fpinfo = (PxfFdwRelationInfo *) baserel->fdw_private; - + PxfOptions *options = PxfGetOptions(foreigntableid); + Cost startup = DEFAULT_PXF_FDW_STARTUP_COST; + Cost run_cost = 1000; /* data transfer cost estimate */ + Cost total = startup + run_cost; elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths starts on segment: %d", PXF_SEGMENT_ID); - path = create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ - baserel->rows, - DEFAULT_PXF_FDW_STARTUP_COST, - total_cost, - NIL, /* no pathkeys */ - NULL, /* no outer rel either */ - NULL, /* no extra plan */ - fpinfo->retrieved_attrs); - + /* Path 1: non-parallel (always, as fallback) */ + add_path(baserel, + (Path *) create_foreignscan_path(root, baserel, NULL, + baserel->rows, startup, total, + NIL, NULL, NULL, + fpinfo->retrieved_attrs), + root); + /* Path 2: parallel partial (only if enabled and planner allows) */ + if (options->enable_parallel && baserel->consider_parallel) + { + int workers = max_parallel_workers_per_gather; - /* - * Create a ForeignPath node and add it as only possible path. - */ - add_path(baserel, (Path *) path, root); + if (workers > 0) + { + ForeignPath *pp; + + pp = create_foreignscan_path(root, baserel, NULL, + baserel->rows / workers, /* per-worker rows */ + startup, + startup + run_cost / workers, /* per-worker cost */ + NIL, NULL, NULL, + fpinfo->retrieved_attrs); + pp->path.parallel_safe = true; + pp->path.parallel_aware = true; + pp->path.parallel_workers = workers; + pp->path.locus.parallel_workers = workers; + + add_partial_path(baserel, (Path *) pp); + + elog(DEBUG3, "pxf_fdw: parallel partial path added, workers=%d", workers); + } + } elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths ends on segment: %d", PXF_SEGMENT_ID); } @@ -382,7 +413,7 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags) * Save state in node->fdw_state. We must save enough information to call * BeginCopyFrom() again. */ - pxfsstate = (PxfFdwScanState *) palloc(sizeof(PxfFdwScanState)); + pxfsstate = (PxfFdwScanState *) palloc0(sizeof(PxfFdwScanState)); initStringInfo(&pxfsstate->uri); pxfsstate->filter_str = filter_str; @@ -392,18 +423,32 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags) pxfsstate->retrieved_attrs = retrieved_attrs; pxfsstate->projectionInfo = node->ss.ps.ps_ProjInfo; - /* Set up callback to identify error foreign relation. */ - ErrorContextCallback errcallback; - errcallback.callback = PxfBeginScanErrorCallback; - errcallback.arg = (void *) pxfsstate; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; + /* Initialize gang-parallel fields */ + pxfsstate->gang_parallel = false; + pxfsstate->worker_index = -1; + pxfsstate->virtual_seg_id = -1; + pxfsstate->virtual_seg_count = -1; - InitCopyState(pxfsstate); node->fdw_state = (void *) pxfsstate; - /* Restore the previous error callback */ - error_context_stack = errcallback.previous; + /* + * In parallel mode, defer connection setup to IterateForeignScan where + * fragments are assigned. In non-parallel mode, initialize immediately. + */ + if (!options->enable_parallel) + { + /* Set up callback to identify error foreign relation. */ + ErrorContextCallback errcallback; + errcallback.callback = PxfBeginScanErrorCallback; + errcallback.arg = (void *) pxfsstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + InitCopyState(pxfsstate); + + /* Restore the previous error callback */ + error_context_stack = errcallback.previous; + } elog(DEBUG5, "pxf_fdw: pxfBeginForeignScan ends on segment: %d", PXF_SEGMENT_ID); } @@ -426,54 +471,84 @@ pxfIterateForeignScan(ForeignScanState *node) ErrorContextCallback errcallback; bool found; - /* Set up callback to identify error line number. */ - errcallback.callback = PxfCopyFromErrorCallback; - errcallback.arg = (void *) pxfsstate; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* - * The protocol for loading a virtual tuple into a slot is first - * ExecClearTuple, then fill the values/isnull arrays, then - * ExecStoreVirtualTuple. If we don't find another row in the file, we - * just skip the last step, leaving the slot empty as required. - * - * We can pass ExprContext = NULL because we read all columns from the - * file, so no need to evaluate default expressions. - * - * We can also pass tupleOid = NULL because we don't allow oids for - * foreign tables. - */ ExecClearTuple(slot); - found = NextCopyFrom(pxfsstate->cstate, - NULL, - slot->tts_values, - slot->tts_isnull); - - if (found) { - if (pxfsstate->cstate->cdbsreh) + /* + * Non-parallel or Cloudberry gang-parallel mode. + * + * Both use a single PXF connection — the difference is only in + * the HTTP headers: + * - Non-parallel: physical segment ID / count + * - Gang-parallel: virtual segment ID / count (splits work among + * gang workers so each reads a unique subset of fragments) + */ + if (pxfsstate->cstate == NULL) { + MemoryContext oldcxt; + ErrorContextCallback begincb; + /* - * If NextCopyFrom failed, the processed row count will have - * already been updated, but we need to update it in a successful - * case. - * - * GPDB_91_MERGE_FIXME: this is almost certainly not the right - * place for this, but row counts are currently scattered all over - * the place. Consolidate. + * Detect Cloudberry gang-parallel case: enable_parallel is set + * and CBDB's gang expansion has created parallel workers on this + * segment (TotalParallelWorkerNumberOfSlice > 0), but PG DSM + * callbacks were not invoked. */ - pxfsstate->cstate->cdbsreh->processed++; + if (!pxfsstate->gang_parallel && + pxfsstate->options->enable_parallel && + TotalParallelWorkerNumberOfSlice > 0) + { + elog(DEBUG1, "pxf_fdw: segment %d activating gang-parallel mode " + "(virtual segment IDs)", + PXF_SEGMENT_ID); + + oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt); + InitGangParallelState(pxfsstate); + MemoryContextSwitchTo(oldcxt); + } + + begincb.callback = PxfBeginScanErrorCallback; + begincb.arg = (void *) pxfsstate; + begincb.previous = error_context_stack; + error_context_stack = &begincb; + + oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt); + + if (pxfsstate->gang_parallel) + InitCopyStateVirtual(pxfsstate); + else + InitCopyState(pxfsstate); + + MemoryContextSwitchTo(oldcxt); + error_context_stack = begincb.previous; } - ExecStoreVirtualTuple(slot); - } + /* Reset error-context fields before each read */ + pxfsstate->cstate->cur_attname = NULL; + pxfsstate->cstate->cur_attval = NULL; - /* Remove error callback. */ - error_context_stack = errcallback.previous; + errcallback.callback = PxfCopyFromErrorCallback; + errcallback.arg = (void *) pxfsstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; - return slot; + found = NextCopyFrom(pxfsstate->cstate, + NULL, + slot->tts_values, + slot->tts_isnull); + + if (found) + { + if (pxfsstate->cstate->cdbsreh) + pxfsstate->cstate->cdbsreh->processed++; + + ExecStoreVirtualTuple(slot); + } + + error_context_stack = errcallback.previous; + + return slot; + } } /* @@ -487,8 +562,20 @@ pxfReScanForeignScan(ForeignScanState *node) PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state; - EndCopyFrom(pxfsstate->cstate); - InitCopyState(pxfsstate); + if (pxfsstate->gang_parallel) + { + /* Gang-parallel: single connection with virtual segment ID. */ + if (pxfsstate->cstate != NULL) + EndCopyFrom(pxfsstate->cstate); + InitCopyStateVirtual(pxfsstate); + } + else + { + /* Non-parallel: original code path */ + if (pxfsstate->cstate != NULL) + EndCopyFrom(pxfsstate->cstate); + InitCopyState(pxfsstate); + } elog(DEBUG5, "pxf_fdw: pxfReScanForeignScan ends on segment: %d", PXF_SEGMENT_ID); } @@ -502,19 +589,35 @@ pxfEndForeignScan(ForeignScanState *node) { elog(DEBUG5, "pxf_fdw: pxfEndForeignScan starts on segment: %d", PXF_SEGMENT_ID); - ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan; PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state; - /* Release resources */ - if (foreignScan->fdw_private) - { - elog(DEBUG5, "Freeing fdw_private"); - pfree(foreignScan->fdw_private); - } - /* if pxfsstate is NULL, we are in EXPLAIN; nothing to do */ if (pxfsstate) - EndCopyFrom(pxfsstate->cstate); + { + if (pxfsstate->churl_handle != NULL) + { + churl_cleanup(pxfsstate->churl_handle, false); + pxfsstate->churl_handle = NULL; + } + + if (pxfsstate->churl_headers != NULL) + { + churl_headers_cleanup(pxfsstate->churl_headers); + pxfsstate->churl_headers = NULL; + } + + if (pxfsstate->cstate != NULL) + { + EndCopyFrom(pxfsstate->cstate); + pxfsstate->cstate = NULL; + } + + /* Free the URI buffer */ + pfree(pxfsstate->uri.data); + + pfree(pxfsstate); + node->fdw_state = NULL; + } elog(DEBUG5, "pxf_fdw: pxfEndForeignScan ends on segment: %d", PXF_SEGMENT_ID); } @@ -777,6 +880,108 @@ InitCopyState(PxfFdwScanState *pxfsstate) pxfsstate->cstate = cstate; } +/* + * InitCopyStateVirtual + * Gang-parallel equivalent of InitCopyState(). + * + * Identical to InitCopyState except it calls PxfBridgeImportStartVirtual() + * with the pre-computed virtual_seg_id and virtual_seg_count so that PXF + * distributes fragments to this gang worker using the virtual segment scheme. + */ +static void +InitCopyStateVirtual(PxfFdwScanState *pxfsstate) +{ + CopyFromState cstate; + + PxfBridgeImportStartVirtual(pxfsstate, + pxfsstate->virtual_seg_id, + pxfsstate->virtual_seg_count); + + cstate = BeginCopyFrom( + NULL, + pxfsstate->relation, + NULL, + NULL, + false, /* is_program */ + &PxfBridgeRead, /* data_source_cb */ + pxfsstate, /* data_source_cb_extra */ + NIL, /* attnamelist */ + pxfsstate->options->copy_options /* copy options */ + ); + + if (pxfsstate->options->reject_limit == -1) + { + cstate->cdbsreh = NULL; + cstate->errMode = ALL_OR_NOTHING; + } + else + { + cstate->errMode = SREH_IGNORE; + + if (pxfsstate->options->log_errors) + cstate->errMode = SREH_LOG; + + cstate->cdbsreh = makeCdbSreh(pxfsstate->options->reject_limit, + pxfsstate->options->is_reject_limit_rows, + pxfsstate->options->resource, + (char *) cstate->cur_relname, + pxfsstate->options->log_errors ? LOG_ERRORS_ENABLE : LOG_ERRORS_DISABLE); + + cstate->cdbsreh->relid = RelationGetRelid(pxfsstate->relation); + } + + cstate->fe_msgbuf = makeStringInfo(); + + cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, + "PxfFdwMemCxt", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + pxfsstate->cstate = cstate; +} + +/* + * InitGangParallelState + * Initialize gang-parallel state for Cloudberry's gang expansion model. + * + * Called on first IterateForeignScan when we detect that enable_parallel=true + * and CBDB's gang expansion has created parallel workers on this segment + * (TotalParallelWorkerNumberOfSlice > 0), but PG's DSM callbacks were not + * invoked. + * + * Uses Cloudberry kernel variables ParallelWorkerNumberOfSlice and + * TotalParallelWorkerNumberOfSlice to identify each gang worker, then + * computes virtual segment IDs: + * virtual_seg_id = physical_seg_id + worker_id * physical_seg_count + * virtual_seg_count = physical_seg_count * num_workers + * + * PXF's existing round-robin (fragment % seg_count == seg_id) then + * automatically distributes a unique subset of fragments to each gang worker, + * eliminating data duplication without any PXF server changes. + */ +static void +InitGangParallelState(PxfFdwScanState *pxfsstate) +{ + int seg_id = PXF_SEGMENT_ID; + int seg_count = PXF_SEGMENT_COUNT; + int num_workers = TotalParallelWorkerNumberOfSlice; + int worker_id = ParallelWorkerNumberOfSlice; + + if (num_workers <= 0 || worker_id < 0) + return; /* not in gang-parallel mode */ + + pxfsstate->gang_parallel = true; + pxfsstate->worker_index = worker_id; + pxfsstate->virtual_seg_id = seg_id + worker_id * seg_count; + pxfsstate->virtual_seg_count = seg_count * num_workers; + + elog(DEBUG1, "pxf_fdw: segment %d gang-parallel: worker_id=%d/%d " + "virtual_seg_id=%d virtual_seg_count=%d", + seg_id, worker_id, num_workers, + pxfsstate->virtual_seg_id, pxfsstate->virtual_seg_count); +} + /* * Initiates a copy state for pxfBeginForeignModify() */ @@ -915,22 +1120,43 @@ void PxfCopyFromErrorCallback(void *arg) { PxfFdwScanState *pxfsstate = (PxfFdwScanState *) arg; - CopyFromState cstate = pxfsstate->cstate; + CopyFromState cstate; char curlineno_str[32]; + if (pxfsstate == NULL) + return; + + /* + * Derive relname and resource from pxfsstate fields that are set once + * in BeginForeignScan and maintained by the executor — these are always + * valid. We deliberately avoid cstate->cur_relname because cstate + * internals can be unreliable when PXF returns error data that corrupts + * the COPY parser state. + */ + const char *relname = pxfsstate->relation + ? RelationGetRelationName(pxfsstate->relation) + : "(unknown)"; + const char *resource = (pxfsstate->options && pxfsstate->options->resource) + ? pxfsstate->options->resource : "(unknown)"; + + cstate = pxfsstate->cstate; + if (cstate == NULL) + { + errcontext("Foreign table %s, resource %s", relname, resource); + return; + } + snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT, cstate->cur_lineno); if (cstate->opts.binary) { - /* can't usefully display the data */ - if (cstate->cur_attname) - errcontext("Foreign table %s, record %s of %s, column %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, - cstate->cur_attname); - else - errcontext("Foreign table %s, record %s of %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource); + /* + * PXF does not support binary format. If we land here, cstate + * internals may be unreliable — report only safe information. + */ + errcontext("Foreign table %s, record %s of %s", + relname, curlineno_str, resource); } else { @@ -941,7 +1167,7 @@ PxfCopyFromErrorCallback(void *arg) attval = limit_printout_length(cstate->cur_attval); errcontext("Foreign table %s, record %s of %s, column %s: \"%s\"", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, + relname, curlineno_str, resource, cstate->cur_attname, attval); pfree(attval); } @@ -949,7 +1175,7 @@ PxfCopyFromErrorCallback(void *arg) { /* error is relevant to a particular column, value is NULL */ errcontext("Foreign table %s, record %s of %s, column %s: null input", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, + relname, curlineno_str, resource, cstate->cur_attname); } else @@ -969,7 +1195,7 @@ PxfCopyFromErrorCallback(void *arg) /* token was found, get the actual message and set it as the main error message */ errmsg("%s", token_index + PXF_ERROR_TOKEN_SIZE); errcontext("Foreign table %s, record %s of %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource); + relname, curlineno_str, resource); } /* * Error is relevant to a particular line. @@ -988,7 +1214,7 @@ PxfCopyFromErrorCallback(void *arg) lineval = limit_printout_length(cstate->line_buf.data); //truncateEolStr(line_buf, cstate->eol_type); <-- this is done in GP6, but not in GP7 ? errcontext("Foreign table %s, record %s of %s: \"%s\"", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, lineval); + relname, curlineno_str, resource, lineval); pfree(lineval); } else @@ -1002,8 +1228,40 @@ PxfCopyFromErrorCallback(void *arg) * and just report the line number. */ errcontext("Foreign table %s, record %s of %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource); + relname, curlineno_str, resource); } } } } + +/* + * ============================================================================ + * Parallel Foreign Scan Support + * ============================================================================ + */ + +/* + * pxfIsForeignScanParallelSafe + * Determine whether a foreign scan is parallel safe. + * + * Returns true if enable_parallel option is set for the foreign table, + * allowing the planner to consider parallel execution. + */ +static bool +pxfIsForeignScanParallelSafe(PlannerInfo *root, + RelOptInfo *rel, + RangeTblEntry *rte) +{ + PxfOptions *options = PxfGetOptions(rte->relid); + + elog(DEBUG3, "pxf_fdw: pxfIsForeignScanParallelSafe called, enable_parallel=%d", + options->enable_parallel); + + /* + * Only return true if parallel execution is explicitly enabled. + * This ensures backward compatibility - existing queries continue + * to work without parallel execution unless explicitly enabled. + */ + return options->enable_parallel; +} + diff --git a/fdw/pxf_option.c b/fdw/pxf_option.c index 13ccfe92..0d347031 100644 --- a/fdw/pxf_option.c +++ b/fdw/pxf_option.c @@ -39,6 +39,7 @@ #define FDW_OPTION_REJECT_LIMIT "reject_limit" #define FDW_OPTION_REJECT_LIMIT_TYPE "reject_limit_type" #define FDW_OPTION_RESOURCE "resource" +#define FDW_OPTION_ENABLE_PARALLEL "enable_parallel" #define FDW_COPY_OPTION_FORMAT "format" #define FDW_COPY_OPTION_HEADER "header" @@ -72,6 +73,10 @@ static const struct PxfFdwOption valid_options[] = { {FDW_OPTION_REJECT_LIMIT_TYPE, ForeignTableRelationId}, {FDW_OPTION_LOG_ERRORS, ForeignTableRelationId}, + /* Parallel execution */ + {FDW_OPTION_ENABLE_PARALLEL, ForeignTableRelationId}, + {FDW_OPTION_ENABLE_PARALLEL, ForeignServerRelationId}, + /* Sentinel */ {NULL, InvalidOid} }; @@ -454,6 +459,8 @@ PxfGetOptions(Oid foreigntableid) opt->log_errors = defGetBoolean(def); else if (strcmp(def->defname, FDW_OPTION_DISABLE_PPD) == 0) opt->disable_ppd = defGetBoolean(def); + else if (strcmp(def->defname, FDW_OPTION_ENABLE_PARALLEL) == 0) + opt->enable_parallel = defGetBoolean(def); else if (strcmp(def->defname, FDW_OPTION_FORMAT) == 0) { opt->format = defGetString(def); @@ -565,20 +572,37 @@ static void ValidateOption(char *option, Oid catalog) { const struct PxfFdwOption *entry; + bool found = false; for (entry = valid_options; entry->optname; entry++) { - /* option can only be defined at its catalog level */ - if (strcmp(entry->optname, option) == 0 && catalog != entry->optcontext) + if (strcmp(entry->optname, option) == 0) { - Relation rel = RelationIdGetRelation(entry->optcontext); + /* option is recognized; check if it's allowed at this catalog level */ + if (catalog == entry->optcontext) + return; /* valid — exact match */ + found = true; /* name matches but at a different level */ + } + } - ereport(ERROR, - (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), - errmsg( - "the %s option can only be defined at the %s level", - option, - RelationGetRelationName(rel)))); + if (found) + { + /* + * The option exists but is not valid at this catalog level. + * Report the first matching level for the error message. + */ + for (entry = valid_options; entry->optname; entry++) + { + if (strcmp(entry->optname, option) == 0) + { + Relation rel = RelationIdGetRelation(entry->optcontext); + + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("the %s option can only be defined at the %s level", + option, + RelationGetRelationName(rel)))); + } } } } diff --git a/fdw/pxf_option.h b/fdw/pxf_option.h index 15d27781..317139fc 100644 --- a/fdw/pxf_option.h +++ b/fdw/pxf_option.h @@ -55,6 +55,9 @@ typedef struct PxfOptions /* Encoding options */ char *data_encoding; /* The encoding of the data on the external system */ const char *database_encoding; /* The database encoding */ + + /* Parallel execution options */ + bool enable_parallel; /* whether to enable parallel foreign scan */ } PxfOptions; /* Functions prototypes for pxf_option.c file */