diff --git a/differential-dataflow/examples/columnar/columnar_support.rs b/differential-dataflow/examples/columnar/columnar_support.rs index ced700bec..149a93b60 100644 --- a/differential-dataflow/examples/columnar/columnar_support.rs +++ b/differential-dataflow/examples/columnar/columnar_support.rs @@ -70,14 +70,17 @@ pub use updates::Updates; pub struct RecordedUpdates { pub updates: Updates, pub records: usize, + /// Whether `updates` is known to be sorted and consolidated + /// (no duplicate (key, val, time) triples, no zero diffs). + pub consolidated: bool, } impl Default for RecordedUpdates { - fn default() -> Self { Self { updates: Default::default(), records: 0 } } + fn default() -> Self { Self { updates: Default::default(), records: 0, consolidated: true } } } impl Clone for RecordedUpdates { - fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records } } + fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records, consolidated: self.consolidated } } } impl timely::Accountable for RecordedUpdates { @@ -133,7 +136,9 @@ mod container_impls { let t2 = T2::to_inner(t1_owned.clone()); new_times.push(&t2); } + // TODO: Assumes Enter (to_inner) is order-preserving on times. RecordedUpdates { + consolidated: self.consolidated, updates: Updates { keys: self.updates.keys, vals: self.updates.vals, @@ -168,6 +173,7 @@ mod container_impls { RecordedUpdates { updates: flat.consolidate(), records: self.records, + consolidated: true, } } } @@ -185,7 +191,9 @@ mod container_impls { output.push((k, v, &new_time, d)); } } - RecordedUpdates { updates: output, records: self.records } + // TODO: Time advancement may not be order preserving, but .. it could be. + // TODO: Before this is consolidated the above would need to be `form`ed. + RecordedUpdates { updates: output, records: self.records, consolidated: false } } } } @@ -222,7 +230,7 @@ mod column_builder { let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let updates = Updates::form(refs.into_iter()); - self.pending.push_back(RecordedUpdates { updates, records }); + self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } } @@ -260,7 +268,7 @@ mod column_builder { let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let updates = Updates::form(refs.into_iter()); - self.pending.push_back(RecordedUpdates { updates, records }); + self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } self.empty = self.pending.pop_front(); @@ -327,7 +335,7 @@ mod distributor { let mut first_records = total_records.saturating_sub(non_empty.saturating_sub(1)); for (pusher, output) in pushers.iter_mut().zip(outputs) { if !output.keys.values.is_empty() { - let recorded = RecordedUpdates { updates: output, records: first_records }; + let recorded = RecordedUpdates { updates: output, records: first_records, consolidated: container.consolidated }; first_records = 1; let mut recorded = recorded; Message::push_at(&mut recorded, time.clone(), pusher); @@ -433,8 +441,8 @@ pub mod arrangement { } use crate::{Updates, RecordedUpdates}; - use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, InternalMerger}; - type ValBatcher2 = MergeBatcher, TrieChunker, InternalMerger>>; + use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; + type ValBatcher2 = MergeBatcher, TrieChunker, trie_merger::TrieMerger>; /// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. /// The `records` accounting is discarded here — it has served its purpose for exchange. @@ -456,7 +464,9 @@ pub mod arrangement { impl<'a, U: crate::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates> for TrieChunker { fn push_into(&mut self, container: &'a mut RecordedUpdates) { - self.ready.push_back(std::mem::take(&mut container.updates)); + let mut updates = std::mem::take(&mut container.updates); + if !container.consolidated { updates = updates.consolidate(); } + if updates.len() > 0 { self.ready.push_back(updates); } } } @@ -478,8 +488,7 @@ pub mod arrangement { pub mod batcher { - use std::ops::Range; - use columnar::{Borrow, Columnar, Container, Index, Len, Push}; + use columnar::{Borrow, Columnar, Index, Len, Push}; use differential_dataflow::difference::{Semigroup, IsZero}; use timely::progress::frontier::{Antichain, AntichainRef}; use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge; @@ -488,237 +497,652 @@ pub mod arrangement { use crate::Updates; impl timely::container::SizableContainer for Updates { - fn at_capacity(&self) -> bool { - use columnar::Len; - self.diffs.values.len() >= 64 * 1024 - } + fn at_capacity(&self) -> bool { self.diffs.values.len() >= 64 * 1024 } fn ensure_capacity(&mut self, _stash: &mut Option) { } } + /// Required by `reduce_abelian`'s bound `Builder::Input: InternalMerge`. + /// Not called at runtime — our batcher uses `TrieMerger` instead. + /// TODO: Relax the bound in DD's reduce to remove this requirement. impl InternalMerge for Updates { - type TimeOwned = U::Time; + fn len(&self) -> usize { unimplemented!() } + fn clear(&mut self) { + use columnar::Clear; + self.keys.clear(); + self.vals.clear(); + self.times.clear(); + self.diffs.clear(); + } + fn merge_from(&mut self, _others: &mut [Self], _positions: &mut [usize]) { unimplemented!() } + fn extract(&mut self, + _upper: AntichainRef, + _frontier: &mut Antichain, + _keep: &mut Self, + _ship: &mut Self, + ) { unimplemented!() } + } + } - fn len(&self) -> usize { self.diffs.values.len() } - fn clear(&mut self) { *self = Self::default(); } + pub mod trie_merger { - #[inline(never)] - fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) { - match others.len() { - 0 => {}, - 1 => { - // Bulk copy: take remaining keys from position onward. - let other = &mut others[0]; - let pos = &mut positions[0]; - if self.keys.values.len() == 0 && *pos == 0 { - std::mem::swap(self, other); - return; + use columnar::{Columnar, Len}; + use timely::PartialOrder; + use timely::progress::frontier::{Antichain, AntichainRef}; + use differential_dataflow::trace::implementations::merge_batcher::Merger; + + use crate::ColumnarUpdate as Update; + use crate::Updates; + + pub struct TrieMerger { + _marker: std::marker::PhantomData, + } + + impl Default for TrieMerger { + fn default() -> Self { Self { _marker: std::marker::PhantomData } } + } + + /// A merging iterator over two sorted iterators. + struct Merging { + iter1: std::iter::Peekable, + iter2: std::iter::Peekable, + } + + impl Iterator for Merging + where + K: Copy + Ord, + V: Copy + Ord, + T: Copy + Ord, + I1: Iterator, + I2: Iterator, + { + type Item = (K, V, T, D); + #[inline] + fn next(&mut self) -> Option { + match (self.iter1.peek(), self.iter2.peek()) { + (Some(a), Some(b)) => { + if (a.0, a.1, a.2) <= (b.0, b.1, b.2) { + self.iter1.next() + } else { + self.iter2.next() } - let other_len = other.keys.values.len(); - self.extend_from_keys(other, *pos .. other_len); - *pos = other_len; - }, - 2 => { - let mut this_sum = U::Diff::default(); - let mut that_sum = U::Diff::default(); - - let (left, right) = others.split_at_mut(1); - let this = &left[0]; - let that = &right[0]; - let this_keys = this.keys.values.borrow(); - let that_keys = that.keys.values.borrow(); - let mut this_key_range = positions[0] .. this_keys.len(); - let mut that_key_range = positions[1] .. that_keys.len(); - - while !this_key_range.is_empty() && !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let this_key = this_keys.get(this_key_range.start); - let that_key = that_keys.get(that_key_range.start); - match this_key.cmp(&that_key) { + } + (Some(_), None) => self.iter1.next(), + (None, Some(_)) => self.iter2.next(), + (None, None) => None, + } + } + } + + /// Build sorted `Updates` chunks from a sorted iterator of refs, + /// using `Updates::form` (which consolidates internally) on batches. + fn form_chunks<'a, U: Update>( + sorted: impl Iterator>>, + output: &mut Vec>, + ) { + let mut sorted = sorted.peekable(); + while sorted.peek().is_some() { + let chunk = Updates::::form((&mut sorted).take(64 * 1024)); + if chunk.len() > 0 { + output.push(chunk); + } + } + } + + impl Merger for TrieMerger + where + U::Time: Ord + PartialOrder + Clone + 'static, + { + type Chunk = Updates; + type Time = U::Time; + + fn merge( + &mut self, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + _stash: &mut Vec>, + ) { + Self::merge_batches(list1, list2, output, _stash); + } + + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + ship: &mut Vec, + kept: &mut Vec, + _stash: &mut Vec, + ) { + // Flatten the sorted, consolidated chain into refs. + let all = merged.iter().flat_map(|chunk| chunk.iter()); + + // Partition into two sorted streams by time. + let mut time_owned = U::Time::default(); + let mut keep_vec = Vec::new(); + let mut ship_vec = Vec::new(); + for (k, v, t, d) in all { + Columnar::copy_from(&mut time_owned, t); + if upper.less_equal(&time_owned) { + frontier.insert_ref(&time_owned); + keep_vec.push((k, v, t, d)); + } else { + ship_vec.push((k, v, t, d)); + } + } + + // Build chunks via form (which consolidates internally). + form_chunks::(keep_vec.into_iter(), kept); + form_chunks::(ship_vec.into_iter(), ship); + } + + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { + use timely::Accountable; + (chunk.record_count() as usize, 0, 0, 0) + } + } + + impl TrieMerger + where + U::Time: Ord + PartialOrder + Clone + 'static, + { + /// Iterator-based merge: flatten, merge, consolidate, form. + /// Correct but slow — used as fallback. + #[allow(dead_code)] + fn merge_iterator( + list1: &[Updates], + list2: &[Updates], + output: &mut Vec>, + ) { + let iter1 = list1.iter().flat_map(|chunk| chunk.iter()); + let iter2 = list2.iter().flat_map(|chunk| chunk.iter()); + + let merged = Merging { + iter1: iter1.peekable(), + iter2: iter2.peekable(), + }; + + form_chunks::(merged, output); + } + + /// A merge implementation that operates batch-at-a-time. + #[inline(never)] + fn merge_batches( + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + stash: &mut Vec>, + ) { + + // The design for efficient "batch" merginging of chains of links is: + // 0. We choose a target link size, K, and will keep the average link size at least K and the max size at 2k. + // K should be large enough to amortize some set-up, but not so large that one or two extra break the bank. + // 1. We will repeatedly consider pairs of links, and fully merge one with a prefix of the other. + // The last elements of each link will tell us which of the two suffixes must be held back. + // 2. We then have a chain of as many links as we started with, with potential defects to correct: + // a. A link may contain some number of zeros: we can remove them if we are eager, based on size. + // b. A link may contain more than 2K updates; we can split it. + // c. Two adjacent links may contain fewer than 2K updates; we can meld (careful append) them. + // 3. After a pass of the above, we should have restored the invariant. + // We can try and me smarter and fuse some of the above work rather than explicitly stage results. + // + // The challenging moment is the merge that can start with a suffix of one link, involving a prefix of one link. + // These could be the same link, different links, and generally there is the potential for complexity here. + + let mut builder = ChainBuilder::default(); + + let mut queue1: std::collections::VecDeque<_> = list1.into(); + let mut queue2: std::collections::VecDeque<_> = list2.into(); + + // The first unconsumed update in each block, via (k_idx, v_idx, t_idx), or None if exhausted. + // These are (0,0,0) for a new block, and should become None once there are no remaining updates. + let mut cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); + let mut cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); + + // For each pair of batches + while cursor1.is_some() && cursor2.is_some() { + Self::merge_batch(&mut cursor1, &mut cursor2, &mut builder, stash); + if cursor1.is_none() { cursor1 = queue1.pop_front().map(|b| ((0,0,0), b)); } + if cursor2.is_none() { cursor2 = queue2.pop_front().map(|b| ((0,0,0), b)); } + } + + // TODO: create batch for the non-empty cursor. + if let Some(((k,v,t),batch)) = cursor1 { + let mut out_batch = stash.pop().unwrap_or_default(); + let empty: Updates = Default::default(); + write_from_surveys( + &batch, + &empty, + &[Report::This(0, 1)], + &[Report::This(k, batch.keys.values.len())], + &[Report::This(v, batch.vals.values.len())], + &[Report::This(t, batch.times.values.len())], + &mut out_batch, + ); + builder.push(out_batch); + } + if let Some(((k,v,t),batch)) = cursor2 { + let mut out_batch = stash.pop().unwrap_or_default(); + let empty: Updates = Default::default(); + write_from_surveys( + &empty, + &batch, + &[Report::That(0, 1)], + &[Report::That(k, batch.keys.values.len())], + &[Report::That(v, batch.vals.values.len())], + &[Report::That(t, batch.times.values.len())], + &mut out_batch, + ); + builder.push(out_batch); + } + + builder.extend(queue1); + builder.extend(queue2); + *output = builder.done(); + // TODO: Tidy output to satisfy structural invariants. + } + + /// Merge two batches, one completely and another through the corresponding prefix. + /// + /// Each invocation determines the maximum amount of both batches we can merge, determined + /// by comparing the elements at the tails of each batch, and locating the lesser in other. + /// We will merge the whole of the batch containing the lesser, and the prefix up through + /// the lesser element in the other batch, setting the cursor to the first element strictly + /// greater than that lesser element. + /// + /// The algorithm uses a list of `Report` findings to map the interleavings of the layers. + /// Each indicates either a range exclusive to one of the inputs, or a one element common + /// to the layers from both inputs, which must be further explored. This map would normally + /// allow the full merge to happen, but we need to carefully start at each cursor, and end + /// just before the first element greater than the lesser bound. + /// + /// The consumed prefix and disjoint suffix should be single report entries, and it seems + /// fine to first produce all reports and then reflect on the cursors, rather than use the + /// cursors as part of the mapping. + #[inline(never)] + fn merge_batch( + batch1: &mut Option<((usize, usize, usize), Updates)>, + batch2: &mut Option<((usize, usize, usize), Updates)>, + builder: &mut ChainBuilder, + stash: &mut Vec>, + ) { + let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap(); + let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap(); + + use columnar::Borrow; + let keys0 = updates0.keys.borrow(); + let keys1 = updates1.keys.borrow(); + let vals0 = updates0.vals.borrow(); + let vals1 = updates1.vals.borrow(); + let times0 = updates0.times.borrow(); + let times1 = updates1.times.borrow(); + + // Survey the interleaving of the two inputs. + let mut key_survey = survey::>(keys0, keys1, &[Report::Both(0,0)]); + let mut val_survey = survey::>(vals0, vals1, &key_survey); + let mut time_survey = survey::>(times0, times1, &val_survey); + + // We now know enough to start writing into an output batch. + // We should update the input surveys to reflect the subset + // of data that we want. + // + // At most one cursor should be non-zero (assert!). + // A non-zero cursor must correspond to the first entry of the surveys, + // as there is at least one consumed update that precedes the other batch. + // We need to nudge that report forward to align with the cursor, potentially + // squeezing the report to nothing (to the upper bound). + + // We start by updating the surveys to reflect the cursors. + // If either cursor is set, then its batch has an element strictly less than the other batch. + // We therefore expect to find a prefix of This/That at the start of the survey. + if (k0_idx, v0_idx, t0_idx) != (0,0,0) { + let mut done = false; while !done { if let Report::This(l,u) = &mut key_survey[0] { if *u <= k0_idx { key_survey.remove(0); } else { *l = k0_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::This(l,u) = &mut val_survey[0] { if *u <= v0_idx { val_survey.remove(0); } else { *l = v0_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::This(l,u) = &mut time_survey[0] { if *u <= t0_idx { time_survey.remove(0); } else { *l = t0_idx; done = true; } } else { done = true; } } + } + + if (k1_idx, v1_idx, t1_idx) != (0,0,0) { + let mut done = false; while !done { if let Report::That(l,u) = &mut key_survey[0] { if *u <= k1_idx { key_survey.remove(0); } else { *l = k1_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::That(l,u) = &mut val_survey[0] { if *u <= v1_idx { val_survey.remove(0); } else { *l = v1_idx; done = true; } } else { done = true; } } + let mut done = false; while !done { if let Report::That(l,u) = &mut time_survey[0] { if *u <= t1_idx { time_survey.remove(0); } else { *l = t1_idx; done = true; } } else { done = true; } } + } + + // We want to trim the tails of the surveys to only cover ranges present in both inputs. + // We can determine which was "longer" by looking at the last entry of the bottom layer, + // which tells us which input (or both) contained the last element. + // + // From the bottom layer up, we'll identify the index of the last item, and then determine + // the index of the list it belongs to. We use that index in the next layer, to locate the + // index of the list it belongs to, on upward. + let next_cursor = match time_survey.last().unwrap() { + Report::This(_,_) => { + // Collect the last value indexes known to strictly exceed an entry in the other batch. + let mut t = times0.values.len(); + while let Some(Report::This(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } + let mut v = vals0.values.len(); + while let Some(Report::This(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } + let mut k = keys0.values.len(); + while let Some(Report::This(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } + // Now we may need to correct by nudging down. + if v == times0.len() || times0.bounds.bounds(v).0 > t { v -= 1; } + if k == vals0.len() || vals0.bounds.bounds(k).0 > v { k -= 1; } + Some(Ok((k,v,t))) + } + Report::Both(_,_) => { None } + Report::That(_,_) => { + // Collect the last value indexes known to strictly exceed an entry in the other batch. + let mut t = times1.values.len(); + while let Some(Report::That(l,_)) = time_survey.last() { t = *l; time_survey.pop(); } + let mut v = vals1.values.len(); + while let Some(Report::That(l,_)) = val_survey.last() { v = *l; val_survey.pop(); } + let mut k = keys1.values.len(); + while let Some(Report::That(l,_)) = key_survey.last() { k = *l; key_survey.pop(); } + // Now we may need to correct by nudging down. + if v == times1.len() || times1.bounds.bounds(v).0 > t { v -= 1; } + if k == vals1.len() || vals1.bounds.bounds(k).0 > v { k -= 1; } + Some(Err((k,v,t))) + } + }; + + // Having updated the surveys, we now copy over the ranges they identify. + let mut out_batch = stash.pop().unwrap_or_default(); + // TODO: We should be able to size `out_batch` pretty accurately from the survey. + write_from_surveys(&updates0, &updates1, &[Report::Both(0,0)], &key_survey, &val_survey, &time_survey, &mut out_batch); + builder.push(out_batch); + + match next_cursor { + Some(Ok(kvt)) => { *batch1 = Some((kvt, updates0)); } + Some(Err(kvt)) => {*batch2 = Some((kvt, updates1)); } + None => { } + } + } + + } + + /// Write merged output from four levels of survey reports. + /// + /// Each layer is written independently: `write_layer` handles keys, vals, + /// and times; `write_diffs` handles diff consolidation. + #[inline(never)] + fn write_from_surveys( + updates0: &Updates, + updates1: &Updates, + root_survey: &[Report], + key_survey: &[Report], + val_survey: &[Report], + time_survey: &[Report], + output: &mut Updates, + ) { + use columnar::Borrow; + + write_layer(updates0.keys.borrow(), updates1.keys.borrow(), root_survey, key_survey, &mut output.keys); + write_layer(updates0.vals.borrow(), updates1.vals.borrow(), key_survey, val_survey, &mut output.vals); + write_layer(updates0.times.borrow(), updates1.times.borrow(), val_survey, time_survey, &mut output.times); + write_diffs::(updates0.diffs.borrow(), updates1.diffs.borrow(), time_survey, &mut output.diffs); + } + + /// From two sequences of interleaved lists, map out the interleaving of their values. + /// + /// The sequence of input reports identify constraints on the sorted order of lists in the two inputs, + /// callout out ranges of each that are exclusively order, and elements that have equal prefixes and + /// therefore "overlap" and should be further investigated through the values of the lists. + /// + /// The output should have the same form but for the next layer: subject to the ordering of `reports`, + /// a similar report for the values of the two lists, appropriate for the next layer. + #[inline(never)] + pub fn survey<'a, C: columnar::Container: Ord>>( + lists0: as columnar::Borrow>::Borrowed<'a>, + lists1: as columnar::Borrow>::Borrowed<'a>, + reports: &[Report], + ) -> Vec { + use columnar::Index; + let mut output = Vec::with_capacity(reports.len()); // may grow larger, but at least this large. + for report in reports.iter() { + match report { + Report::This(lower0, upper0) => { + let (new_lower, _) = lists0.bounds.bounds(*lower0); + let (_, new_upper) = lists0.bounds.bounds(*upper0-1); + output.push(Report::This(new_lower, new_upper)); + } + Report::Both(index0, index1) => { + + // Fetch the bounds from the layers. + let (mut lower0, upper0) = lists0.bounds.bounds(*index0); + let (mut lower1, upper1) = lists1.bounds.bounds(*index1); + + // Scour the intersecting range for matches. + while lower0 < upper0 && lower1 < upper1 { + let val0 = lists0.values.get(lower0); + let val1 = lists1.values.get(lower1); + match val0.cmp(&val1) { std::cmp::Ordering::Less => { - let lower = this_key_range.start; - gallop(this_keys, &mut this_key_range, |x| x < that_key); - self.extend_from_keys(this, lower .. this_key_range.start); + let start = lower0; + lower0 += 1; + gallop(lists0.values, &mut lower0, upper0, |x| x < val1); + output.push(Report::This(start, lower0)); }, std::cmp::Ordering::Equal => { - let values_len = self.vals.values.len(); - let mut this_val_range = this.vals_bounds(this_key_range.start .. this_key_range.start+1); - let mut that_val_range = that.vals_bounds(that_key_range.start .. that_key_range.start+1); - while !this_val_range.is_empty() && !that_val_range.is_empty() { - let this_val = this.vals.values.borrow().get(this_val_range.start); - let that_val = that.vals.values.borrow().get(that_val_range.start); - match this_val.cmp(&that_val) { - std::cmp::Ordering::Less => { - let lower = this_val_range.start; - gallop(this.vals.values.borrow(), &mut this_val_range, |x| x < that_val); - self.extend_from_vals(this, lower .. this_val_range.start); - }, - std::cmp::Ordering::Equal => { - let updates_len = self.times.values.len(); - let mut this_time_range = this.times_bounds(this_val_range.start .. this_val_range.start+1); - let mut that_time_range = that.times_bounds(that_val_range.start .. that_val_range.start+1); - while !this_time_range.is_empty() && !that_time_range.is_empty() { - let this_time = this.times.values.borrow().get(this_time_range.start); - let this_diff = this.diffs.values.borrow().get(this_time_range.start); - let that_time = that.times.values.borrow().get(that_time_range.start); - let that_diff = that.diffs.values.borrow().get(that_time_range.start); - match this_time.cmp(&that_time) { - std::cmp::Ordering::Less => { - let lower = this_time_range.start; - gallop(this.times.values.borrow(), &mut this_time_range, |x| x < that_time); - self.times.values.extend_from_self(this.times.values.borrow(), lower .. this_time_range.start); - self.diffs.extend_from_self(this.diffs.borrow(), lower .. this_time_range.start); - }, - std::cmp::Ordering::Equal => { - this_sum.copy_from(this_diff); - that_sum.copy_from(that_diff); - this_sum.plus_equals(&that_sum); - if !this_sum.is_zero() { - self.times.values.push(this_time); - self.diffs.values.push(&this_sum); - self.diffs.bounds.push(self.diffs.values.len() as u64); - } - this_time_range.start += 1; - that_time_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_time_range.start; - gallop(that.times.values.borrow(), &mut that_time_range, |x| x < this_time); - self.times.values.extend_from_self(that.times.values.borrow(), lower .. that_time_range.start); - self.diffs.extend_from_self(that.diffs.borrow(), lower .. that_time_range.start); - }, - } - } - // Remaining from this side - if !this_time_range.is_empty() { - self.times.values.extend_from_self(this.times.values.borrow(), this_time_range.clone()); - self.diffs.extend_from_self(this.diffs.borrow(), this_time_range.clone()); - } - // Remaining from that side - if !that_time_range.is_empty() { - self.times.values.extend_from_self(that.times.values.borrow(), that_time_range.clone()); - self.diffs.extend_from_self(that.diffs.borrow(), that_time_range.clone()); - } - if self.times.values.len() > updates_len { - self.times.bounds.push(self.times.values.len() as u64); - self.vals.values.push(this_val); - } - this_val_range.start += 1; - that_val_range.start += 1; - }, - std::cmp::Ordering::Greater => { - let lower = that_val_range.start; - gallop(that.vals.values.borrow(), &mut that_val_range, |x| x < this_val); - self.extend_from_vals(that, lower .. that_val_range.start); - }, - } - } - self.extend_from_vals(this, this_val_range); - self.extend_from_vals(that, that_val_range); - if self.vals.values.len() > values_len { - self.vals.bounds.push(self.vals.values.len() as u64); - self.keys.values.push(this_key); - } - this_key_range.start += 1; - that_key_range.start += 1; + output.push(Report::Both(lower0, lower1)); + lower0 += 1; + lower1 += 1; }, std::cmp::Ordering::Greater => { - let lower = that_key_range.start; - gallop(that_keys, &mut that_key_range, |x| x < this_key); - self.extend_from_keys(that, lower .. that_key_range.start); + let start = lower1; + lower1 += 1; + gallop(lists1.values, &mut lower1, upper1, |x| x < val0); + output.push(Report::That(start, lower1)); }, } } - // Copy remaining from whichever side has data, up to capacity. - while !this_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = this_key_range.start; - this_key_range.start = this_key_range.end; // take all remaining - self.extend_from_keys(this, lower .. this_key_range.start); - } - while !that_key_range.is_empty() && !timely::container::SizableContainer::at_capacity(self) { - let lower = that_key_range.start; - that_key_range.start = that_key_range.end; - self.extend_from_keys(that, lower .. that_key_range.start); - } - positions[0] = this_key_range.start; - positions[1] = that_key_range.start; - }, - n => unimplemented!("{n}-way merge not supported"), + if lower0 < upper0 { output.push(Report::This(lower0, upper0)); } + if lower1 < upper1 { output.push(Report::That(lower1, upper1)); } + + } + Report::That(lower1, upper1) => { + let (new_lower, _) = lists1.bounds.bounds(*lower1); + let (_, new_upper) = lists1.bounds.bounds(*upper1-1); + output.push(Report::That(new_lower, new_upper)); + } } } - fn extract( - &mut self, - upper: AntichainRef, - frontier: &mut Antichain, - keep: &mut Self, - ship: &mut Self, - ) { - let mut time = U::Time::default(); - for key_idx in 0 .. self.keys.values.len() { - let key = self.keys.values.borrow().get(key_idx); - let keep_vals_len = keep.vals.values.len(); - let ship_vals_len = ship.vals.values.len(); - for val_idx in self.vals_bounds(key_idx..key_idx+1) { - let val = self.vals.values.borrow().get(val_idx); - let keep_times_len = keep.times.values.len(); - let ship_times_len = ship.times.values.len(); - for time_idx in self.times_bounds(val_idx..val_idx+1) { - let t = self.times.values.borrow().get(time_idx); - let diff = self.diffs.values.borrow().get(time_idx); - time.copy_from(t); - if upper.less_equal(&time) { - frontier.insert_ref(&time); - keep.times.values.push(t); - keep.diffs.values.push(diff); - keep.diffs.bounds.push(keep.diffs.values.len() as u64); - } - else { - ship.times.values.push(t); - ship.diffs.values.push(diff); - ship.diffs.bounds.push(ship.diffs.values.len() as u64); + output + } + + /// Write one layer of merged output from a list survey and item survey. + /// + /// The list survey describes which lists to produce (from the layer above). + /// The item survey describes how the items within those lists interleave. + /// Both surveys are consumed completely; a mismatch is a bug. + /// + /// Pruning (from cursor adjustments) can affect the first and last list + /// survey entries: the item survey's ranges may not match the natural + /// bounds of those lists. Middle entries are guaranteed unpruned and can + /// be bulk-copied. + #[inline(never)] + pub fn write_layer<'a, C: columnar::Container: Ord>>( + lists0: as columnar::Borrow>::Borrowed<'a>, + lists1: as columnar::Borrow>::Borrowed<'a>, + list_survey: &[Report], + item_survey: &[Report], + output: &mut crate::updates::Lists, + ) { + use columnar::{Container, Index, Len, Push}; + + let mut item_idx = 0; + + for (pos, list_report) in list_survey.iter().enumerate() { + let is_first = pos == 0; + let is_last = pos == list_survey.len() - 1; + let may_be_pruned = is_first || is_last; + + match list_report { + Report::This(lo, hi) => { + let Report::This(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected This in item survey for This list") }; + item_idx += 1; + if may_be_pruned { + // Item range may not match natural bounds; copy items in bulk + // but compute per-list bounds from natural bounds clamped to + // the item range. + let base = output.values.len(); + output.values.extend_from_self(lists0.values, item_lo..item_hi); + for i in *lo..*hi { + let (_, nat_hi) = lists0.bounds.bounds(i); + output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64); } + } else { + output.extend_from_self(lists0, *lo..*hi); } - if keep.times.values.len() > keep_times_len { - keep.times.bounds.push(keep.times.values.len() as u64); - keep.vals.values.push(val); - } - if ship.times.values.len() > ship_times_len { - ship.times.bounds.push(ship.times.values.len() as u64); - ship.vals.values.push(val); + } + Report::That(lo, hi) => { + let Report::That(item_lo, item_hi) = item_survey[item_idx] else { unreachable!("Expected That in item survey for That list") }; + item_idx += 1; + if may_be_pruned { + let base = output.values.len(); + output.values.extend_from_self(lists1.values, item_lo..item_hi); + for i in *lo..*hi { + let (_, nat_hi) = lists1.bounds.bounds(i); + output.bounds.push((base + nat_hi.min(item_hi) - item_lo) as u64); + } + } else { + output.extend_from_self(lists1, *lo..*hi); } } - if keep.vals.values.len() > keep_vals_len { - keep.vals.bounds.push(keep.vals.values.len() as u64); - keep.keys.values.push(key); + Report::Both(i0, i1) => { + // Merge: consume item survey entries until both sides are covered. + let (mut c0, end0) = lists0.bounds.bounds(*i0); + let (mut c1, end1) = lists1.bounds.bounds(*i1); + while (c0 < end0 || c1 < end1) && item_idx < item_survey.len() { + match item_survey[item_idx] { + Report::This(lo, hi) => { + if lo >= end0 { break; } + output.values.extend_from_self(lists0.values, lo..hi); + c0 = hi; + } + Report::That(lo, hi) => { + if lo >= end1 { break; } + output.values.extend_from_self(lists1.values, lo..hi); + c1 = hi; + } + Report::Both(v0, v1) => { + if v0 >= end0 && v1 >= end1 { break; } + output.values.push(lists0.values.get(v0)); + c0 = v0 + 1; + c1 = v1 + 1; + } + } + item_idx += 1; + } + output.bounds.push(output.values.len() as u64); } - if ship.vals.values.len() > ship_vals_len { - ship.vals.bounds.push(ship.vals.values.len() as u64); - ship.keys.values.push(key); + } + } + } + + /// Write the diff layer from a time survey and two diff inputs. + /// + /// The time survey is the item-level survey for the time layer, which + /// doubles as the list survey for diffs (one diff list per time entry). + /// + /// - `This(lo, hi)`: bulk-copy diff lists from input 0. + /// - `That(lo, hi)`: bulk-copy diff lists from input 1. + /// - `Both(t0, t1)`: consolidate the two singleton diffs. Push `[sum]` + /// if non-zero, or an empty list `[]` if they cancel. + #[inline(never)] + pub fn write_diffs( + diffs0: > as columnar::Borrow>::Borrowed<'_>, + diffs1: > as columnar::Borrow>::Borrowed<'_>, + time_survey: &[Report], + output: &mut crate::updates::Lists>, + ) { + use columnar::{Columnar, Container, Index, Len, Push}; + use differential_dataflow::difference::{Semigroup, IsZero}; + + for report in time_survey.iter() { + match report { + Report::This(lo, hi) => { output.extend_from_self(diffs0, *lo..*hi); } + Report::That(lo, hi) => { output.extend_from_self(diffs1, *lo..*hi); } + Report::Both(t0, t1) => { + // Read singleton diffs via list bounds, consolidate. + let (d0_lo, d0_hi) = diffs0.bounds.bounds(*t0); + let (d1_lo, d1_hi) = diffs1.bounds.bounds(*t1); + assert_eq!(d0_hi - d0_lo, 1, "Expected singleton diff list at t0={t0}"); + assert_eq!(d1_hi - d1_lo, 1, "Expected singleton diff list at t1={t1}"); + let mut diff: U::Diff = Columnar::into_owned(diffs0.values.get(d0_lo)); + diff.plus_equals(&Columnar::into_owned(diffs1.values.get(d1_lo))); + if !diff.is_zero() { output.values.push(&diff); } + output.bounds.push(output.values.len() as u64); } } } } + /// Increments `index` until just after the last element of `input` to satisfy `cmp`. + /// + /// The method assumes that `cmp` is monotonic, never becoming true once it is false. + /// If an `upper` is supplied, it acts as a constraint on the interval of `input` explored. #[inline(always)] - pub(crate) fn gallop(input: TC, range: &mut Range, mut cmp: impl FnMut(::Ref) -> bool) { + pub(crate) fn gallop(input: C, lower: &mut usize, upper: usize, mut cmp: impl FnMut(::Ref) -> bool) { // if empty input, or already >= element, return - if !Range::::is_empty(range) && cmp(input.get(range.start)) { + if *lower < upper && cmp(input.get(*lower)) { let mut step = 1; - while range.start + step < range.end && cmp(input.get(range.start + step)) { - range.start += step; + while *lower + step < upper && cmp(input.get(*lower + step)) { + *lower += step; step <<= 1; } step >>= 1; while step > 0 { - if range.start + step < range.end && cmp(input.get(range.start + step)) { - range.start += step; + if *lower + step < upper && cmp(input.get(*lower + step)) { + *lower += step; } step >>= 1; } - range.start += 1; + *lower += 1; + } + } + + /// A report we would expect to see in a sequence about two layers. + /// + /// A sequence of these reports reveal an ordered traversal of the keys + /// of two layers, with ranges exclusive to one, ranges exclusive to the + /// other, and individual elements (not ranges) common to both. + #[derive(Copy, Clone, Columnar, Debug)] + pub enum Report { + /// Range of indices in this input. + This(usize, usize), + /// Range of indices in that input. + That(usize, usize), + /// Matching indices in both inputs. + Both(usize, usize), + } + + pub struct ChainBuilder { + updates: Vec>, + } + + impl Default for ChainBuilder { fn default() -> Self { Self { updates: Default::default() } } } + + impl ChainBuilder { + fn push(&mut self, mut link: Updates) { + link = link.filter_zero(); + if link.len() > 0 { + if let Some(last) = self.updates.last_mut() { + if last.len() + link.len() < 2 * 64 * 1024 { + let mut build = crate::updates::UpdatesBuilder::new_from(std::mem::take(last)); + build.meld(&link); + *last = build.done(); + } + else { self.updates.push(link); } + + } + else { self.updates.push(link); } + } } + fn extend(&mut self, iter: impl IntoIterator>) { for link in iter { self.push(link); }} + fn done(self) -> Vec> { self.updates } } } @@ -748,7 +1172,7 @@ pub mod arrangement { } pub struct ValMirror { - current: Updates, + chunks: Vec>, } impl differential_dataflow::trace::Builder for ValMirror { type Time = U::Time; @@ -756,64 +1180,60 @@ pub mod arrangement { type Output = OrdValBatch>; fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { - Self { current: Updates::default() } + Self { chunks: Vec::new() } } fn push(&mut self, chunk: &mut Self::Input) { - use columnar::Len; - let len = chunk.keys.values.len(); - if len > 0 { - self.current.extend_from_keys(chunk, 0..len); + if chunk.len() > 0 { + self.chunks.push(std::mem::take(chunk)); } } fn done(self, description: Description) -> Self::Output { - let mut chain = if self.current.len() > 0 { - vec![self.current] - } else { - vec![] - }; + let mut chain = self.chunks; Self::seal(&mut chain, description) } fn seal(chain: &mut Vec, description: Description) -> Self::Output { - if chain.len() == 0 { + use columnar::Len; + + // Meld sorted, consolidated chain entries in order. + // Pre-allocate to avoid reallocations during meld. + use columnar::{Borrow, Container}; + let mut updates = Updates::::default(); + updates.keys.reserve_for(chain.iter().map(|c| c.keys.borrow())); + updates.vals.reserve_for(chain.iter().map(|c| c.vals.borrow())); + updates.times.reserve_for(chain.iter().map(|c| c.times.borrow())); + updates.diffs.reserve_for(chain.iter().map(|c| c.diffs.borrow())); + let mut builder = crate::updates::UpdatesBuilder::new_from(updates); + for chunk in chain.iter() { + builder.meld(chunk); + } + let merged = builder.done(); + chain.clear(); + + let updates = Len::len(&merged.diffs.values); + if updates == 0 { let storage = OrdValStorage { keys: Default::default(), vals: Default::default(), upds: Default::default(), }; OrdValBatch { storage, description, updates: 0 } - } - else if chain.len() == 1 { - use columnar::Len; - let storage = chain.pop().unwrap(); - let updates = storage.diffs.values.len(); - let val_offs = strides_to_offset_list(&storage.vals.bounds, storage.keys.values.len()); - let time_offs = strides_to_offset_list(&storage.times.bounds, storage.vals.values.len()); + } else { + let val_offs = strides_to_offset_list(&merged.vals.bounds, Len::len(&merged.keys.values)); + let time_offs = strides_to_offset_list(&merged.times.bounds, Len::len(&merged.vals.values)); let storage = OrdValStorage { - keys: Coltainer { container: storage.keys.values }, + keys: Coltainer { container: merged.keys.values }, vals: Vals { offs: val_offs, - vals: Coltainer { container: storage.vals.values }, + vals: Coltainer { container: merged.vals.values }, }, upds: Upds { offs: time_offs, - times: Coltainer { container: storage.times.values }, - diffs: Coltainer { container: storage.diffs.values }, + times: Coltainer { container: merged.times.values }, + diffs: Coltainer { container: merged.diffs.values }, }, }; OrdValBatch { storage, description, updates } } - else { - use columnar::Len; - let mut merged = chain.remove(0); - for other in chain.drain(..) { - let len = other.keys.values.len(); - if len > 0 { - merged.extend_from_keys(&other, 0..len); - } - } - chain.push(merged); - Self::seal(chain, description) - } } } @@ -888,6 +1308,57 @@ pub mod updates { lower..upper } + /// A streaming consolidation iterator for sorted `(key, val, time, diff)` data. + /// + /// Accumulates diffs for equal `(key, val, time)` triples, yielding at most + /// one output per distinct triple, with a non-zero accumulated diff. + /// Input must be sorted by `(key, val, time)`. + pub struct Consolidating { + iter: std::iter::Peekable, + diff: D, + } + + impl Consolidating + where + K: Copy + Eq, + V: Copy + Eq, + T: Copy + Eq, + D: Semigroup + IsZero + Default, + I: Iterator, + { + pub fn new(iter: I) -> Self { + Self { iter: iter.peekable(), diff: D::default() } + } + } + + impl Iterator for Consolidating + where + K: Copy + Eq, + V: Copy + Eq, + T: Copy + Eq, + D: Semigroup + IsZero + Default + Clone, + I: Iterator, + { + type Item = (K, V, T, D); + fn next(&mut self) -> Option { + loop { + let (k, v, t, d) = self.iter.next()?; + self.diff = d; + while let Some(&(k2, v2, t2, _)) = self.iter.peek() { + if k2 == k && v2 == v && t2 == t { + let (_, _, _, d2) = self.iter.next().unwrap(); + self.diff.plus_equals(&d2); + } else { + break; + } + } + if !self.diff.is_zero() { + return Some((k, v, t, self.diff.clone())); + } + } + } + } + impl Updates { pub fn vals_bounds(&self, key_range: std::ops::Range) -> std::ops::Range { @@ -933,201 +1404,75 @@ pub mod updates { self.diffs.extend_from_self(other.diffs.borrow(), time_range); } - /// Forms a consolidated `Updates` from sorted `(key, val, time, diff)` refs. - /// - /// Tracks a `prev` reference to the previous element. On each new element, - /// compares against `prev` to detect key/val/time changes. Only pushes - /// accumulated diffs when they are nonzero, and only emits times/vals/keys - /// that have at least one nonzero diff beneath them. - pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { - - let mut output = Self::default(); - let mut diff_stash = U::Diff::default(); - let mut diff_temp = U::Diff::default(); - - if let Some(first) = sorted.next() { - - let mut prev = first; - Columnar::copy_from(&mut diff_stash, prev.3); - - for curr in sorted { - let key_differs = ContainerOf::::reborrow_ref(curr.0) != ContainerOf::::reborrow_ref(prev.0); - let val_differs = key_differs || ContainerOf::::reborrow_ref(curr.1) != ContainerOf::::reborrow_ref(prev.1); - let time_differs = val_differs || ContainerOf::::reborrow_ref(curr.2) != ContainerOf::::reborrow_ref(prev.2); - - if time_differs { - // Flush the accumulated diff for prev's (key, val, time). - if !diff_stash.is_zero() { - // We have a real update to emit. Push time (and val/key - // if this is the first time under them). - let times_len = output.times.values.len(); - let vals_len = output.vals.values.len(); - - if val_differs { - // Seal the previous val's time list, if any times were emitted. - if times_len > 0 { - output.times.bounds.push(times_len as u64); - } - if key_differs { - // Seal the previous key's val list, if any vals were emitted. - if vals_len > 0 { - output.vals.bounds.push(vals_len as u64); - } - output.keys.values.push(prev.0); - } - output.vals.values.push(prev.1); - } - output.times.values.push(prev.2); - output.diffs.values.push(&diff_stash); - output.diffs.bounds.push(output.diffs.values.len() as u64); - } - Columnar::copy_from(&mut diff_stash, curr.3); - } else { - // Same (key, val, time): accumulate diff. - Columnar::copy_from(&mut diff_temp, curr.3); - diff_stash.plus_equals(&diff_temp); - } - prev = curr; - } - - // Flush the final accumulated diff. - if !diff_stash.is_zero() { - let keys_len = output.keys.values.len(); - let vals_len = output.vals.values.len(); - let times_len = output.times.values.len(); - let need_key = keys_len == 0 || ContainerOf::::reborrow_ref(prev.0) != output.keys.values.borrow().get(keys_len - 1); - let need_val = need_key || vals_len == 0 || ContainerOf::::reborrow_ref(prev.1) != output.vals.values.borrow().get(vals_len - 1); - - if need_val { - if times_len > 0 { - output.times.bounds.push(times_len as u64); - } - if need_key { - if vals_len > 0 { - output.vals.bounds.push(vals_len as u64); - } - output.keys.values.push(prev.0); - } - output.vals.values.push(prev.1); - } - output.times.values.push(prev.2); - output.diffs.values.push(&diff_stash); - output.diffs.bounds.push(output.diffs.values.len() as u64); - } - - // Seal the final groups at each level. - if !output.times.values.is_empty() { - output.times.bounds.push(output.times.values.len() as u64); - } - if !output.vals.values.is_empty() { - output.vals.bounds.push(output.vals.values.len() as u64); - } - if !output.keys.values.is_empty() { - output.keys.bounds.push(output.keys.values.len() as u64); - } - } - - output + /// Forms a consolidated `Updates` trie from unsorted `(key, val, time, diff)` refs. + pub fn form_unsorted<'a>(unsorted: impl Iterator>>) -> Self { + let mut data = unsorted.collect::>(); + data.sort(); + Self::form(data.into_iter()) } - /// Consolidates into canonical trie form: - /// single outer key list, all lists sorted and deduplicated, - /// diff lists are singletons (or absent if cancelled). - pub fn consolidate(self) -> Self { - - let Self { keys, vals, times, diffs } = self; - - let keys_b = keys.borrow(); - let vals_b = vals.borrow(); - let times_b = times.borrow(); - let diffs_b = diffs.borrow(); + /// Forms a consolidated `Updates` trie from sorted `(key, val, time, diff)` refs. + pub fn form<'a>(sorted: impl Iterator>>) -> Self { - // Flatten to index tuples: [key_abs, val_abs, time_abs, diff_abs]. - let mut tuples: Vec<[usize; 4]> = Vec::new(); - for outer in 0..Len::len(&keys_b) { - for k in child_range(keys_b.bounds, outer) { - for v in child_range(vals_b.bounds, k) { - for t in child_range(times_b.bounds, v) { - for d in child_range(diffs_b.bounds, t) { - tuples.push([k, v, t, d]); - } - } - } - } - } - - // Sort by (key, val, time). Diff is payload. - tuples.sort_by(|a, b| { - keys_b.values.get(a[0]).cmp(&keys_b.values.get(b[0])) - .then_with(|| vals_b.values.get(a[1]).cmp(&vals_b.values.get(b[1]))) - .then_with(|| times_b.values.get(a[2]).cmp(×_b.values.get(b[2]))) - }); + // Step 1: Streaming consolidation — accumulate diffs, drop zeros. + let consolidated = Consolidating::new( + sorted.map(|(k, v, t, d)| (k, v, t, ::into_owned(d))) + ); - // Build consolidated output, bottom-up cancellation. + // Step 2: Build the trie from consolidated, sorted, non-zero data. let mut output = Self::default(); - let mut diff_stash = U::Diff::default(); - let mut diff_temp = U::Diff::default(); - - let mut idx = 0; - while idx < tuples.len() { - let key_ref = keys_b.values.get(tuples[idx][0]); - let key_start_vals = output.vals.values.len(); - - // All entries with this key. - while idx < tuples.len() && keys_b.values.get(tuples[idx][0]) == key_ref { - let val_ref = vals_b.values.get(tuples[idx][1]); - let val_start_times = output.times.values.len(); - - // All entries with this (key, val). - while idx < tuples.len() - && keys_b.values.get(tuples[idx][0]) == key_ref - && vals_b.values.get(tuples[idx][1]) == val_ref - { - let time_ref = times_b.values.get(tuples[idx][2]); - - // Sum all diffs for this (key, val, time). - Columnar::copy_from(&mut diff_stash, diffs_b.values.get(tuples[idx][3])); - idx += 1; - while idx < tuples.len() - && keys_b.values.get(tuples[idx][0]) == key_ref - && vals_b.values.get(tuples[idx][1]) == val_ref - && times_b.values.get(tuples[idx][2]) == time_ref - { - Columnar::copy_from(&mut diff_temp, diffs_b.values.get(tuples[idx][3])); - diff_stash.plus_equals(&diff_temp); - idx += 1; - } - - // Emit time + singleton diff if nonzero. - if !diff_stash.is_zero() { - output.times.values.push(time_ref); - output.diffs.values.push(&diff_stash); - output.diffs.bounds.push(output.diffs.values.len() as u64); - } + let mut updates = consolidated; + if let Some((key, val, time, diff)) = updates.next() { + let mut prev = (key, val, time); + output.keys.values.push(key); + output.vals.values.push(val); + output.times.values.push(time); + output.diffs.values.push(&diff); + output.diffs.bounds.push(output.diffs.values.len() as u64); + + // As we proceed, seal up known complete runs. + for (key, val, time, diff) in updates { + + // If keys differ, record key and seal vals and times. + if key != prev.0 { + output.vals.bounds.push(output.vals.values.len() as u64); + output.times.bounds.push(output.times.values.len() as u64); + output.keys.values.push(key); + output.vals.values.push(val); } - - // Seal time list for this val; emit val if any times survived. - if output.times.values.len() > val_start_times { + // If vals differ, record val and seal times. + else if val != prev.1 { output.times.bounds.push(output.times.values.len() as u64); - output.vals.values.push(val_ref); + output.vals.values.push(val); + } + else { + // We better not find a duplicate time. + assert!(time != prev.2); } - } - // Seal val list for this key; emit key if any vals survived. - if output.vals.values.len() > key_start_vals { - output.vals.bounds.push(output.vals.values.len() as u64); - output.keys.values.push(key_ref); + // Always record (time, diff). + output.times.values.push(time); + output.diffs.values.push(&diff); + output.diffs.bounds.push(output.diffs.values.len() as u64); + + prev = (key, val, time); } - } - // Seal the single outer key list. - if !output.keys.values.is_empty() { + // Seal up open lists. output.keys.bounds.push(output.keys.values.len() as u64); + output.vals.bounds.push(output.vals.values.len() as u64); + output.times.bounds.push(output.times.values.len() as u64); } output } + /// Consolidates into canonical trie form: + /// single outer key list, all lists sorted and deduplicated, + /// diff lists are singletons (or absent if cancelled). + pub fn consolidate(self) -> Self { Self::form_unsorted(self.iter()) } + pub fn filter_zero(self) -> Self { Self::form(self.iter()) } + /// The number of leaf-level diff entries (total updates). pub fn len(&self) -> usize { self.diffs.values.len() } } @@ -1203,6 +1548,179 @@ pub mod updates { fn into_bytes(&self, _writer: &mut W) { unimplemented!() } } + /// An incremental trie builder that accepts sorted, consolidated `Updates` chunks + /// and melds them into a single `Updates` trie. + /// + /// The internal `Updates` has open (unsealed) bounds at the keys, vals, and times + /// levels — the last group at each level has its values pushed but no corresponding + /// bounds entry. `diffs.bounds` is always 1:1 with `times.values`. + /// + /// `meld` accepts a consolidated `Updates` whose first `(key, val, time)` is + /// strictly greater than the builder's last `(key, val, time)`. The key and val + /// may equal the builder's current open key/val, as long as the time is greater. + /// + /// `done` seals all open bounds and returns the completed `Updates`. + pub struct UpdatesBuilder { + /// Non-empty, consolidated updates. + updates: Updates, + } + + impl UpdatesBuilder { + /// Construct a new builder from consolidated, sealed updates. + /// + /// Unseals the last group at keys, vals, and times levels so that + /// subsequent `meld` calls can extend the open groups. + /// If the updates are not consolidated none of this works. + pub fn new_from(mut updates: Updates) -> Self { + use columnar::Len; + if Len::len(&updates.keys.values) > 0 { + updates.keys.bounds.pop(); + updates.vals.bounds.pop(); + updates.times.bounds.pop(); + } + Self { updates } + } + + /// Meld a sorted, consolidated `Updates` chunk into this builder. + /// + /// The chunk's first `(key, val, time)` must be strictly greater than + /// the builder's last `(key, val, time)`. Keys and vals may overlap + /// (continue the current group), but times must be strictly increasing + /// within the same `(key, val)`. + pub fn meld(&mut self, chunk: &Updates) { + use columnar::{Borrow, Index, Len}; + + if chunk.len() == 0 { return; } + + // Empty builder: clone the chunk and unseal it. + if Len::len(&self.updates.keys.values) == 0 { + self.updates = chunk.clone(); + self.updates.keys.bounds.pop(); + self.updates.vals.bounds.pop(); + self.updates.times.bounds.pop(); + return; + } + + // Pre-compute boundary comparisons before mutating. + let keys_match = { + let skb = self.updates.keys.values.borrow(); + let ckb = chunk.keys.values.borrow(); + skb.get(Len::len(&skb) - 1) == ckb.get(0) + }; + let vals_match = keys_match && { + let svb = self.updates.vals.values.borrow(); + let cvb = chunk.vals.values.borrow(); + svb.get(Len::len(&svb) - 1) == cvb.get(0) + }; + + let chunk_num_keys = Len::len(&chunk.keys.values); + let chunk_num_vals = Len::len(&chunk.vals.values); + let chunk_num_times = Len::len(&chunk.times.values); + + // Child ranges for the first element at each level of the chunk. + let first_key_vals = child_range(chunk.vals.borrow().bounds, 0); + let first_val_times = child_range(chunk.times.borrow().bounds, 0); + + // There is a first position where coordinates disagree. + // Strictly beyond that position: seal bounds, extend lists, re-open the last bound. + // At that position: meld the first list, extend subsequent lists, re-open. + let mut differ = false; + + // --- Keys --- + if keys_match { + // Skip the duplicate first key; add remaining keys. + if chunk_num_keys > 1 { + self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 1..chunk_num_keys); + } + } else { + // All keys are new. + self.updates.keys.values.extend_from_self(chunk.keys.values.borrow(), 0..chunk_num_keys); + differ = true; + } + + // --- Vals --- + if differ { + // Keys differed: seal open val group, extend all val lists, unseal last. + self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64); + self.updates.vals.extend_from_self(chunk.vals.borrow(), 0..chunk_num_keys); + self.updates.vals.bounds.pop(); + } else { + // Keys matched: meld vals for the shared key. + if vals_match { + // Skip the duplicate first val; add remaining vals from the first key's list. + if first_key_vals.len() > 1 { + self.updates.vals.values.extend_from_self( + chunk.vals.values.borrow(), + (first_key_vals.start + 1)..first_key_vals.end, + ); + } + } else { + // First val differs: add all vals from the first key's list. + self.updates.vals.values.extend_from_self( + chunk.vals.values.borrow(), + first_key_vals.clone(), + ); + differ = true; + } + // Seal the matched key's val group, extend remaining keys' val lists, unseal. + if chunk_num_keys > 1 { + self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64); + self.updates.vals.extend_from_self(chunk.vals.borrow(), 1..chunk_num_keys); + self.updates.vals.bounds.pop(); + } + } + + // --- Times --- + if differ { + // Seal open time group, extend all time lists, unseal last. + self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64); + self.updates.times.extend_from_self(chunk.times.borrow(), 0..chunk_num_vals); + self.updates.times.bounds.pop(); + } else { + // Keys and vals matched. Times must be strictly greater (precondition), + // so we always set differ = true here. + debug_assert!({ + let stb = self.updates.times.values.borrow(); + let ctb = chunk.times.values.borrow(); + stb.get(Len::len(&stb) - 1) != ctb.get(0) + }, "meld: duplicate time within same (key, val)"); + // Add times from the first val's time list into the open group. + self.updates.times.values.extend_from_self( + chunk.times.values.borrow(), + first_val_times.clone(), + ); + differ = true; + // Seal the matched val's time group, extend remaining vals' time lists, unseal. + if chunk_num_vals > 1 { + self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64); + self.updates.times.extend_from_self(chunk.times.borrow(), 1..chunk_num_vals); + self.updates.times.bounds.pop(); + } + } + + // --- Diffs --- + // Diffs are always sealed (1:1 with times). By the precondition that + // times are strictly increasing for the same (key, val), differ is + // always true by this point — just extend all diff lists. + debug_assert!(differ); + self.updates.diffs.extend_from_self(chunk.diffs.borrow(), 0..chunk_num_times); + } + + /// Seal all open bounds and return the completed `Updates`. + pub fn done(mut self) -> Updates { + use columnar::Len; + if Len::len(&self.updates.keys.values) > 0 { + // Seal the open time group. + self.updates.times.bounds.push(Len::len(&self.updates.times.values) as u64); + // Seal the open val group. + self.updates.vals.bounds.push(Len::len(&self.updates.vals.values) as u64); + // Seal the outer key group. + self.updates.keys.bounds.push(Len::len(&self.updates.keys.values) as u64); + } + self.updates + } + } + #[cfg(test)] mod tests { use super::*; @@ -1347,11 +1865,13 @@ where .inner .unary::, _, _, _>(Pipeline, "JoinFunction", move |_, _| { move |input, output| { + let mut t1o = U::Time::default(); + let mut d1o = U::Diff::default(); input.for_each(|time, data| { let mut session = output.session_with_builder(&time); for (k1, v1, t1, d1) in data.updates.iter() { - let t1o: U::Time = Columnar::into_owned(t1); - let d1o: U::Diff = Columnar::into_owned(d1); + Columnar::copy_from(&mut t1o, t1); + Columnar::copy_from(&mut d1o, d1); for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { let t3 = t2.join(&t1o); let d3 = d2.multiply(&d1o); @@ -1413,6 +1933,7 @@ where builder.build(move |_capability| { let mut col_builder = ValColBuilder::<(K, V, DynTime, R)>::default(); + let mut time = DynTime::default(); move |_frontier| { let mut output = output.activate(); op_input.for_each(|cap, data| { @@ -1429,7 +1950,7 @@ where // that accepts pre-sorted, potentially-collapsing timestamps // could avoid the re-sort inside the builder. for (k, v, t, d) in data.updates.iter() { - let mut time: DynTime = Columnar::into_owned(t); + Columnar::copy_from(&mut time, t); let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); inner_vec.truncate(level - 1); time.inner = PointStamp::new(inner_vec); diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index fe3622d74..77834ab42 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -23,6 +23,8 @@ use crate::trace::implementations::containers::BatchContainer; use crate::trace::implementations::merge_batcher::container::InternalMerge; use crate::trace::TraceReader; +// TODO: Remove the InternalMerge constraint on Bu::Input. It only needs Clear. + /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types.