Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ def get_default_system_parameters(
# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
# apply.
UNINTERESTING_SYSTEM_PARAMETERS = [
"enable_compute_half_join2",
"enable_mz_join_core",
"linear_join_yielding",
"enable_lgalloc_eager_reclamation",
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,7 @@ def __init__(
# behavior, you should add it. Feature flags which turn on/off
# externally visible features should not be flipped.
self.uninteresting_flags: list[str] = [
"enable_compute_half_join2",
"enable_mz_join_core",
"enable_compute_correction_v2",
"linear_join_yielding",
Expand Down
11 changes: 11 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ use std::time::Duration;

use mz_dyncfg::{Config, ConfigSet};

/// Whether rendering should use `half_join2` rather than DD's `half_join` for delta joins.
///
/// `half_join2` avoids quadratic behavior in certain join patterns. This flag exists as an escape
/// hatch to revert to the old implementation if issues arise.
///
/// Defaults to `true`, which selects `half_join2`; setting it to `false` selects DD's original
/// `half_join` instead.
pub const ENABLE_HALF_JOIN2: Config<bool> = Config::new(
"enable_compute_half_join2",
true,
"Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.",
);

/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
"enable_mz_join_core",
Expand Down Expand Up @@ -355,6 +365,7 @@ pub const MV_SINK_ADVANCE_PERSIST_FRONTIERS: Config<bool> = Config::new(
/// Adds the full set of all compute `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&ENABLE_HALF_JOIN2)
.add(&ENABLE_MZ_JOIN_CORE)
.add(&ENABLE_CORRECTION_V2)
.add(&ENABLE_TEMPORAL_BUCKETING)
Expand Down
161 changes: 149 additions & 12 deletions src/compute/src/render/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@

use std::collections::{BTreeMap, BTreeSet};

use std::rc::Rc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, VecCollection};
use mz_compute_types::dyncfgs::ENABLE_HALF_JOIN2;
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
Expand Down Expand Up @@ -210,6 +214,7 @@ where
stream_thinning,
|t1, t2| t1.le(t2),
closure,
Rc::clone(&self.config_set),
)
} else {
build_halfjoin::<_, RowRowAgent<_, _>, _>(
Expand All @@ -219,6 +224,7 @@ where
stream_thinning,
|t1, t2| t1.lt(t2),
closure,
Rc::clone(&self.config_set),
)
}
}
Expand All @@ -231,6 +237,7 @@ where
stream_thinning,
|t1, t2| t1.le(t2),
closure,
Rc::clone(&self.config_set),
)
} else {
build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
Expand All @@ -240,6 +247,7 @@ where
stream_thinning,
|t1, t2| t1.lt(t2),
closure,
Rc::clone(&self.config_set),
)
}
}
Expand Down Expand Up @@ -323,6 +331,7 @@ fn build_halfjoin<G, Tr, CF>(
prev_thinning: Vec<usize>,
comparison: CF,
closure: JoinClosure,
config_set: Rc<ConfigSet>,
) -> (
VecCollection<G, (Row, G::Timestamp), Diff>,
VecCollection<G, DataflowError, Diff>,
Expand All @@ -334,6 +343,8 @@ where
for<'a> Tr::Val<'a>: ToDatumIter,
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
let use_half_join2 = ENABLE_HALF_JOIN2.get(&config_set);

let name = "DeltaJoinKeyPreparation";
type CB<C> = CapacityContainerBuilder<C>;
let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
Expand All @@ -357,10 +368,38 @@ where
Ok((key, row_value, time))
}
});
let mut datums = DatumVec::new();
let datums = DatumVec::new();

if use_half_join2 {
build_halfjoin2(updates, trace, comparison, closure, datums, errs)
} else {
build_halfjoin1(updates, trace, comparison, closure, datums, errs)
}
}

/// `half_join2` implementation (less-quadratic, new default).
fn build_halfjoin2<G, Tr, CF>(
updates: VecCollection<G, (Row, Row, G::Timestamp), Diff>,
trace: Arranged<G, Tr>,
comparison: CF,
closure: JoinClosure,
mut datums: DatumVec,
errs: VecCollection<G, DataflowError, Diff>,
) -> (
VecCollection<G, (Row, G::Timestamp), Diff>,
VecCollection<G, DataflowError, Diff>,
)
where
G: Scope,
G::Timestamp: RenderTimestamp,
Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
for<'a> Tr::Val<'a>: ToDatumIter,
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
type CB<C> = CapacityContainerBuilder<C>;

if closure.could_error() {
let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
let (oks, errs2) = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
updates,
trace,
|time, antichain| {
Expand All @@ -371,13 +410,7 @@ where
// in that we seem to yield too much and do too little work when we do.
|_timer, count| count > 1_000_000,
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
key,
stream_row,
lookup_row,
initial,
diff1,
output| {
move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
let mut row_builder = SharedRow::get();
let temp_storage = RowArena::new();

Expand All @@ -392,7 +425,8 @@ where
let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
let diff = diff1.clone() * diff2.clone();
let data = ((row, time.clone()), initial.clone(), diff);
session.give(data);
use timely::container::PushInto;
session.push_into(data);
}
},
)
Expand All @@ -409,7 +443,7 @@ where
errs.concat(errs2.as_collection()),
)
} else {
let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
let oks = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
updates,
trace,
|time, antichain| {
Expand All @@ -420,9 +454,112 @@ where
// in that we seem to yield too much and do too little work when we do.
|_timer, count| count > 1_000_000,
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
if output.is_empty() {
return;
}

let mut row_builder = SharedRow::get();
let temp_storage = RowArena::new();

let mut datums_local = datums.borrow();
datums_local.extend(key.iter());
datums_local.extend(stream_row.iter());
datums_local.extend(lookup_row.to_datum_iter());

if let Some(row) = closure
.apply(&mut datums_local, &temp_storage, &mut row_builder)
.expect("Closure claimed to never error")
{
for (time, diff2) in output.drain(..) {
let diff = diff1.clone() * diff2.clone();
use timely::container::PushInto;
session.push_into(((row.clone(), time.clone()), initial.clone(), diff));
}
}
},
);

(oks.as_collection(), errs)
}
}

/// Original `half_join` implementation (fallback).
fn build_halfjoin1<G, Tr, CF>(
updates: VecCollection<G, (Row, Row, G::Timestamp), Diff>,
trace: Arranged<G, Tr>,
comparison: CF,
closure: JoinClosure,
mut datums: DatumVec,
errs: VecCollection<G, DataflowError, Diff>,
) -> (
VecCollection<G, (Row, G::Timestamp), Diff>,
VecCollection<G, DataflowError, Diff>,
)
where
G: Scope,
G::Timestamp: RenderTimestamp,
Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
for<'a> Tr::Val<'a>: ToDatumIter,
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
type CB<C> = CapacityContainerBuilder<C>;

if closure.could_error() {
let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
updates,
trace,
|time, antichain| {
antichain.insert(time.step_back());
},
comparison,
|_timer, count| count > 1_000_000,
move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
key,
stream_row: &Row,
lookup_row,
initial,
diff1,
output| {
let mut row_builder = SharedRow::get();
let temp_storage = RowArena::new();

let mut datums_local = datums.borrow();
datums_local.extend(key.iter());
datums_local.extend(stream_row.iter());
datums_local.extend(lookup_row.to_datum_iter());

let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

for (time, diff2) in output.drain(..) {
let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
let diff = diff1.clone() * diff2.clone();
let data = ((row, time.clone()), initial.clone(), diff);
session.give(data);
}
},
)
.ok_err(|(data_time, init_time, diff)| match data_time {
(Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
(Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
});

(
oks.as_collection().flat_map(|x| x),
errs.concat(errs2.as_collection()),
)
} else {
let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
updates,
trace,
|time, antichain| {
antichain.insert(time.step_back());
},
comparison,
|_timer, count| count > 1_000_000,
move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
key,
stream_row,
stream_row: &Row,
lookup_row,
initial,
diff1,
Expand Down
Loading