diff --git a/Cargo.lock b/Cargo.lock index c0ec375182996..206b62d80d197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2835,9 +2835,9 @@ dependencies = [ [[package]] name = "differential-dataflow" -version = "0.21.1" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e48d74e683cb4e65abbe236833918e32927c8992d0b33f2bf8104a1c8f2c569" +checksum = "00884df58b0d5552740fcdffc12c69a715a657d410583aab31c4ada0deadc8bd" dependencies = [ "columnar", "columnation", @@ -2850,9 +2850,9 @@ dependencies = [ [[package]] name = "differential-dogs3" -version = "0.21.1" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fd00e0147ae1495aa0c723681b66aa4a37d5b1220358fca5524e96b9c91d988" +checksum = "4c3f649d5a6891b7b43845cfaae7028fe09c88c80624253a58a28ab22aa5c073" dependencies = [ "differential-dataflow", "serde", @@ -9957,7 +9957,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.10.5", "log", "multimap", "petgraph", @@ -9978,7 +9978,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.114", diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 4bb1649ed16a3..600baab9c82d0 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -468,6 +468,7 @@ def get_default_system_parameters( # all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above # apply. UNINTERESTING_SYSTEM_PARAMETERS = [ + "enable_compute_half_join2", "enable_mz_join_core", "linear_join_yielding", "enable_lgalloc_eager_reclamation", diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 4b5cd4b05749e..adce5312ccfb9 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1580,6 +1580,7 @@ def __init__( # behavior, you should add it. Feature flags which turn on/off # externally visible features should not be flipped. self.uninteresting_flags: list[str] = [ + "enable_compute_half_join2", "enable_mz_join_core", "enable_compute_correction_v2", "linear_join_yielding", diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index 32069205245de..9bc77e4f619a4 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -13,6 +13,16 @@ use std::time::Duration; use mz_dyncfg::{Config, ConfigSet}; +/// Whether rendering should use `half_join2` rather than DD's `half_join` for delta joins. +/// +/// `half_join2` avoids quadratic behavior in certain join patterns. This flag exists as an escape +/// hatch to revert to the old implementation if issues arise. +pub const ENABLE_HALF_JOIN2: Config = Config::new( + "enable_compute_half_join2", + true, + "Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.", +); + /// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`. pub const ENABLE_MZ_JOIN_CORE: Config = Config::new( "enable_mz_join_core", @@ -355,6 +365,7 @@ pub const MV_SINK_ADVANCE_PERSIST_FRONTIERS: Config = Config::new( /// Adds the full set of all compute `Config`s. pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs + .add(&ENABLE_HALF_JOIN2) .add(&ENABLE_MZ_JOIN_CORE) .add(&ENABLE_CORRECTION_V2) .add(&ENABLE_TEMPORAL_BUCKETING) diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index dba7d0a212353..1bbf032bd3313 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -15,12 +15,16 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::rc::Rc; + use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; use differential_dataflow::{AsCollection, VecCollection}; +use mz_compute_types::dyncfgs::ENABLE_HALF_JOIN2; use mz_compute_types::plan::join::JoinClosure; use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan}; +use mz_dyncfg::ConfigSet; use mz_expr::MirScalarExpr; use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow}; @@ -210,6 +214,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + Rc::clone(&self.config_set), ) } else { build_halfjoin::<_, RowRowAgent<_, _>, _>( @@ -219,6 +224,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + Rc::clone(&self.config_set), ) } } @@ -231,6 +237,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + Rc::clone(&self.config_set), ) } else { build_halfjoin::<_, RowRowEnter<_, _, _>, _>( @@ -240,6 +247,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + Rc::clone(&self.config_set), ) } } @@ -323,6 +331,7 @@ fn build_halfjoin( prev_thinning: Vec, comparison: CF, closure: JoinClosure, + config_set: Rc, ) -> ( VecCollection, VecCollection, @@ -334,6 +343,8 @@ where for<'a> Tr::Val<'a>: ToDatumIter, CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, { + let use_half_join2 = ENABLE_HALF_JOIN2.get(&config_set); + let name = "DeltaJoinKeyPreparation"; type CB = CapacityContainerBuilder; let (updates, errs) = updates.map_fallible::, CB<_>, _, _, _>(name, { @@ -357,10 +368,38 @@ where Ok((key, row_value, time)) } }); - let mut datums = DatumVec::new(); + let datums = DatumVec::new(); + + if use_half_join2 { + build_halfjoin2(updates, trace, comparison, closure, datums, errs) + } else { + build_halfjoin1(updates, trace, comparison, closure, datums, errs) + } +} + +/// `half_join2` implementation (less-quadratic, new default). +fn build_halfjoin2( + updates: VecCollection, + trace: Arranged, + comparison: CF, + closure: JoinClosure, + mut datums: DatumVec, + errs: VecCollection, +) -> ( + VecCollection, + VecCollection, +) +where + G: Scope, + G::Timestamp: RenderTimestamp, + Tr: TraceReader + Clone + 'static, + for<'a> Tr::Val<'a>: ToDatumIter, + CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, +{ + type CB = CapacityContainerBuilder; if closure.could_error() { - let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe( + let (oks, errs2) = differential_dogs3::operators::half_join2::half_join_internal_unsafe( updates, trace, |time, antichain| { @@ -371,13 +410,7 @@ where // in that we seem to yield too much and do too little work when we do. |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. - move |session: &mut Session<'_, '_, G::Timestamp, CB>, _>, - key, - stream_row, - lookup_row, - initial, - diff1, - output| { + move |session: &mut CB>, key, stream_row, lookup_row, initial, diff1, output| { let mut row_builder = SharedRow::get(); let temp_storage = RowArena::new(); @@ -392,7 +425,8 @@ where let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone); let diff = diff1.clone() * diff2.clone(); let data = ((row, time.clone()), initial.clone(), diff); - session.give(data); + use timely::container::PushInto; + session.push_into(data); } }, ) @@ -409,7 +443,7 @@ where errs.concat(errs2.as_collection()), ) } else { - let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe( + let oks = differential_dogs3::operators::half_join2::half_join_internal_unsafe( updates, trace, |time, antichain| { @@ -420,9 +454,112 @@ where // in that we seem to yield too much and do too little work when we do. |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. + move |session: &mut CB>, key, stream_row, lookup_row, initial, diff1, output| { + if output.is_empty() { + return; + } + + let mut row_builder = SharedRow::get(); + let temp_storage = RowArena::new(); + + let mut datums_local = datums.borrow(); + datums_local.extend(key.iter()); + datums_local.extend(stream_row.iter()); + datums_local.extend(lookup_row.to_datum_iter()); + + if let Some(row) = closure + .apply(&mut datums_local, &temp_storage, &mut row_builder) + .expect("Closure claimed to never error") + { + for (time, diff2) in output.drain(..) { + let diff = diff1.clone() * diff2.clone(); + use timely::container::PushInto; + session.push_into(((row.clone(), time.clone()), initial.clone(), diff)); + } + } + }, + ); + + (oks.as_collection(), errs) + } +} + +/// Original `half_join` implementation (fallback). +fn build_halfjoin1( + updates: VecCollection, + trace: Arranged, + comparison: CF, + closure: JoinClosure, + mut datums: DatumVec, + errs: VecCollection, +) -> ( + VecCollection, + VecCollection, +) +where + G: Scope, + G::Timestamp: RenderTimestamp, + Tr: TraceReader + Clone + 'static, + for<'a> Tr::Val<'a>: ToDatumIter, + CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, +{ + type CB = CapacityContainerBuilder; + + if closure.could_error() { + let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe( + updates, + trace, + |time, antichain| { + antichain.insert(time.step_back()); + }, + comparison, + |_timer, count| count > 1_000_000, + move |session: &mut Session<'_, '_, G::Timestamp, CB>, _>, + key, + stream_row: &Row, + lookup_row, + initial, + diff1, + output| { + let mut row_builder = SharedRow::get(); + let temp_storage = RowArena::new(); + + let mut datums_local = datums.borrow(); + datums_local.extend(key.iter()); + datums_local.extend(stream_row.iter()); + datums_local.extend(lookup_row.to_datum_iter()); + + let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder); + + for (time, diff2) in output.drain(..) { + let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone); + let diff = diff1.clone() * diff2.clone(); + let data = ((row, time.clone()), initial.clone(), diff); + session.give(data); + } + }, + ) + .ok_err(|(data_time, init_time, diff)| match data_time { + (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)), + (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)), + }); + + ( + oks.as_collection().flat_map(|x| x), + errs.concat(errs2.as_collection()), + ) + } else { + let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe( + updates, + trace, + |time, antichain| { + antichain.insert(time.step_back()); + }, + comparison, + |_timer, count| count > 1_000_000, move |session: &mut Session<'_, '_, G::Timestamp, CB>, _>, key, - stream_row, + stream_row: &Row, lookup_row, initial, diff1,