From 1cc3a2921da7c9dee48cfa1b44de6caf66538a86 Mon Sep 17 00:00:00 2001 From: Elio Neto Date: Tue, 26 May 2026 15:27:04 -0300 Subject: [PATCH] perf(core): replace BTreeMap with DashMap in MemTable for concurrent access MEM-MEMTABLE-002: MemTable previously used BTreeMap for sorted key storage, which serialized all access. Replaced with dashmap::DashMap, a shard-based concurrent hashmap that allows concurrent reads and writes to keys in different shards (each shard is guarded by its own read-write lock, so access is low-contention rather than lock-free). - DashMap provides O(1) average access vs BTreeMap's O(log n) - Iteration now requires sorting a snapshot (for SSTable flush and scan) - Engine: flush_memtable_impl sorts entries before SSTable build - MemTableIterator: now uses owned sorted snapshot (no lifetime issues like crossbeam-skiplist) - TableIterator: KeyType changed to Vec<u8> for compatibility Closes #367 --- Cargo.lock | 25 +++- Cargo.toml | 1 + src/core/engine/compaction.rs | 3 +- src/core/engine/mod.rs | 25 ++-- src/core/memtable.rs | 63 ++++++++--- src/core/table.rs | 8 +- src/storage/iterator.rs | 207 ++++++++++++++-------------------- 7 files changed, 174 insertions(+), 158 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3b4852..723e286 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,6 +497,7 @@ dependencies = [ "criterion", "crossterm", "csv", + "dashmap", "dotenvy", "fail", "fs2", @@ -1500,6 +1501,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "debugid" version = "0.8.0" @@ -2053,6 +2068,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -2957,9 +2978,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "once_cell_polyfill" diff --git a/Cargo.toml b/Cargo.toml index b4d659b..e3ba52d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ ureq = "2.12" sqlparser = "0.45" jsonschema = "0.18" actix-ws = "0.3" +dashmap = "6" [dev-dependencies] tempfile = "3.24" diff --git a/src/core/engine/compaction.rs b/src/core/engine/compaction.rs index a3e2fbd..4658ac3 100644 --- a/src/core/engine/compaction.rs +++ b/src/core/engine/compaction.rs @@ -1,6 +1,5 @@ use crate::core::engine::EngineOptions; use crate::core::iterators::{MergeIterator, StorageIterator}; -use crate::core::key::KeySlice; use crate::core::log_record::{LogRecord, RangeTombstone}; use crate::core::table::Table; use crate::infra::config::StorageConfig; @@ -118,7 +117,7 @@ fn execute_compaction( // IMPORTANT: Iterate tables in REVERSE order (newest first) so that // the MergeIterator's "lower index wins" rule correctly picks the // newest value when duplicate keys exist across tables. 
- let mut iters: Vec> + '_>> = Vec::new(); + let mut iters: Vec> + '_>> = Vec::new(); for table in tables.iter().rev() { iters.push(Box::new(table.iter())); } diff --git a/src/core/engine/mod.rs b/src/core/engine/mod.rs index cc886b5..9b902d4 100644 --- a/src/core/engine/mod.rs +++ b/src/core/engine/mod.rs @@ -29,7 +29,6 @@ use self::compaction::{Compaction, CompactionMetrics, CompactionOptions, Compact use self::version_set::VersionSet; use crate::core::iterators::{MergeIterator, StorageIterator}; -use crate::core::key::KeySlice; use crate::core::memtable::MemTable; pub const DEFAULT_SCAN_LIMIT: usize = 128; @@ -1264,14 +1263,12 @@ impl Engine { ) -> Result, Vec)>> { let start = std::time::Instant::now(); let core = self.core.read(); - let mut iters: Vec> + '_>> = Vec::new(); + let mut iters: Vec> + '_>> = Vec::new(); // 1. Memtables (newer first) if let Some(memtables) = core.memtables().get(cf) { for mem in memtables.iter().rev() { - iters.push(Box::new(crate::storage::iterator::MemTableIterator::new( - &mem.data, - ))); + iters.push(Box::new(mem.iter())); } } @@ -1436,13 +1433,11 @@ impl Engine { pub fn keys(&self) -> Result>> { let start = std::time::Instant::now(); let core = self.core.read(); - let mut iters: Vec> + '_>> = Vec::new(); + let mut iters: Vec> + '_>> = Vec::new(); if let Some(memtables) = core.memtables().get("default") { for mem in memtables.iter().rev() { - iters.push(Box::new(crate::storage::iterator::MemTableIterator::new( - &mem.data, - ))); + iters.push(Box::new(mem.iter())); } } @@ -1473,7 +1468,7 @@ impl Engine { let start = std::time::Instant::now(); let core = self.core.read(); let mut count = 0; - let mut iters: Vec> + '_>> = Vec::new(); + let mut iters: Vec> + '_>> = Vec::new(); if let Some(memtables) = core.memtables().get("default") { for mem in memtables.iter().rev() { @@ -1569,7 +1564,15 @@ impl Engine { timestamp, &self.options.encryption, )?; - for (key, record) in mem.data.iter() { + // Collect entries into a sorted vec 
because DashMap + // iteration order is arbitrary (shard-based). + let mut sorted: Vec<_> = mem + .data + .iter() + .map(|e| (e.key().clone(), e.value().clone())) + .collect(); + sorted.sort_by(|a, b| a.0.cmp(&b.0)); + for (key, record) in &sorted { if record.is_expired_at(now) { continue; } diff --git a/src/core/memtable.rs b/src/core/memtable.rs index aae86e5..1fea6a6 100644 --- a/src/core/memtable.rs +++ b/src/core/memtable.rs @@ -1,9 +1,9 @@ use crate::core::log_record::{LogRecord, RangeTombstone}; use crate::storage::iterator::MemTableIterator; -use std::collections::BTreeMap; +use dashmap::DashMap; pub struct MemTable { - pub(crate) data: BTreeMap, LogRecord>, + pub(crate) data: DashMap, LogRecord>, pub(crate) size_bytes: usize, pub(crate) max_size_bytes: usize, /// Active range tombstones that apply to this memtable's data. @@ -17,7 +17,7 @@ impl MemTable { /// `should_flush()` always returns `false`. pub fn new(max_size_bytes: usize) -> Self { Self { - data: BTreeMap::new(), + data: DashMap::new(), size_bytes: 0, max_size_bytes, range_tombstones: Vec::new(), @@ -45,12 +45,13 @@ impl MemTable { pub fn insert(&mut self, record: LogRecord) { let record_size = Self::estimate_size(&record); - if let Some(old_record) = self.data.insert(record.key.clone(), record) { + let key = record.key.clone(); + if let Some(old_record) = self.data.insert(key, record) { self.size_bytes = self .size_bytes .saturating_sub(Self::estimate_size(&old_record)); } - self.size_bytes += record_size; + self.size_bytes = self.size_bytes.saturating_add(record_size); } pub fn should_flush(&self) -> bool { @@ -58,12 +59,7 @@ impl MemTable { } pub fn get(&self, key: &[u8]) -> Option { - self.data.get(key).cloned() - } - - /// Returns a StorageIterator over all entries (backward compatible) - pub fn iter_ordered(&self) -> impl Iterator, &LogRecord)> { - self.data.iter() + self.data.get(key).map(|r| r.clone()) } /// Returns a MemTableIterator starting from the beginning @@ -78,8 +74,15 @@ impl 
MemTable { /// iter.next(); /// } /// ``` - pub fn iter(&self) -> MemTableIterator<'_> { - MemTableIterator::new(&self.data) + pub fn iter(&self) -> MemTableIterator { + // Collect a snapshot sorted by key (DashMap does not guarantee order) + let mut entries: Vec<(Vec, LogRecord)> = self + .data + .iter() + .map(|e| (e.key().clone(), e.value().clone())) + .collect(); + entries.sort_by(|a, b| a.0.cmp(&b.0)); + MemTableIterator::new(entries) } /// Returns a MemTableIterator starting from a specific key @@ -95,8 +98,16 @@ impl MemTable { /// iter.next(); /// } /// ``` - pub fn iter_from(&self, start_key: &[u8]) -> MemTableIterator<'_> { - MemTableIterator::new_from(&self.data, start_key) + pub fn iter_from(&self, start_key: &[u8]) -> MemTableIterator { + // Collect a snapshot filtered by start_key, then sort + let mut entries: Vec<(Vec, LogRecord)> = self + .data + .iter() + .filter(|e| e.key().as_slice() >= start_key) + .map(|e| (e.key().clone(), e.value().clone())) + .collect(); + entries.sort_by(|a, b| a.0.cmp(&b.0)); + MemTableIterator::new(entries) } /// Add a range tombstone covering [start, end). @@ -104,6 +115,16 @@ impl MemTable { self.range_tombstones.push(range); } + pub fn iter_ordered(&self) -> impl Iterator, LogRecord)> + '_ { + let mut entries: Vec<_> = self + .data + .iter() + .map(|e| (e.key().clone(), e.value().clone())) + .collect(); + entries.sort_by(|a, b| a.0.cmp(&b.0)); + entries.into_iter() + } + /// Check if a key falls within any active range tombstone. 
/// /// Returns `true` if the key is covered by any range tombstone @@ -122,6 +143,18 @@ impl MemTable { count } + pub fn len(&self) -> usize { + self.data.len() + } + + pub fn is_empty(&self) -> bool { + self.data.is_empty() + } + + pub fn size(&self) -> usize { + self.size_bytes + } + fn estimate_size(record: &LogRecord) -> usize { // Base overhead: timestamp(16) + is_deleted(1) + column_family tag(1) + // expires_at tag(1) + expires_at data(16) + misc(16) = ~51 diff --git a/src/core/table.rs b/src/core/table.rs index 9687996..57714d2 100644 --- a/src/core/table.rs +++ b/src/core/table.rs @@ -222,15 +222,15 @@ impl<'a> TableIterator<'a> { } impl<'a> crate::core::iterators::StorageIterator for TableIterator<'a> { - type KeyType = crate::core::key::KeySlice<'a>; + type KeyType = Vec; fn next(&mut self) { self.current = self.inner.next(); } fn key(&self) -> Self::KeyType { match self.current { - Some((k, _)) => crate::core::key::KeySlice::new(k.as_slice()), - None => crate::core::key::KeySlice::new(&[]), // Caller should check is_valid() first + Some((k, _)) => k.clone(), + None => Vec::new(), // Caller should check is_valid() first } } fn value(&self) -> &[u8] { @@ -244,7 +244,7 @@ impl<'a> crate::core::iterators::StorageIterator for TableIterator<'a> { } fn seek(&mut self, _key: &[u8]) { // Not strictly required for now, but good to have - while self.is_valid() && self.key().as_ref() < _key { + while self.is_valid() && self.key().as_slice() < _key { self.next(); } } diff --git a/src/storage/iterator.rs b/src/storage/iterator.rs index b75ca4c..0fab596 100644 --- a/src/storage/iterator.rs +++ b/src/storage/iterator.rs @@ -9,75 +9,57 @@ //! - Prefix scans and filtered iterations use crate::core::iterators::StorageIterator; -use crate::core::key::KeySlice; use crate::core::log_record::LogRecord; -use std::collections::btree_map; -/// Iterator over MemTable entries +/// Iterator over MemTable entries backed by a sorted snapshot. 
/// -/// Wraps a `BTreeMap::Range` iterator to provide the `StorageIterator` interface. -/// Keys are automatically sorted by the BTreeMap. -pub struct MemTableIterator<'a> { - inner: btree_map::Range<'a, Vec, LogRecord>, - current: Option<(&'a Vec, &'a LogRecord)>, +/// Since `DashMap` iterators are not ordered, we take a sorted snapshot +/// at creation time and iterate over that. +pub struct MemTableIterator { + entries: Vec<(Vec, LogRecord)>, + pos: usize, } -impl<'a> MemTableIterator<'a> { - /// Creates a new iterator starting from the beginning of the MemTable +impl MemTableIterator { + /// Creates a new iterator from a sorted snapshot of entries. /// /// # Arguments - /// * `data` - Reference to the BTreeMap backing the MemTable - pub fn new(data: &'a btree_map::BTreeMap, LogRecord>) -> Self { - let mut inner = data.range::, _>(..); // Full range - let current = inner.next(); - Self { inner, current } - } - - /// Creates a new iterator starting from a specific key - /// - /// # Arguments - /// * `data` - Reference to the BTreeMap backing the MemTable - /// * `start_key` - The key to start iteration from (inclusive) - pub fn new_from(data: &'a btree_map::BTreeMap, LogRecord>, start_key: &[u8]) -> Self { - let mut inner = data.range::, _>(start_key.to_vec()..); // Range from start_key to end - let current = inner.next(); - Self { inner, current } + /// * `entries` - A sorted vector of (key, LogRecord) pairs + pub fn new(entries: Vec<(Vec, LogRecord)>) -> Self { + Self { entries, pos: 0 } } } -impl<'a> StorageIterator for MemTableIterator<'a> { - type KeyType = KeySlice<'a>; +impl StorageIterator for MemTableIterator { + type KeyType = Vec; fn key(&self) -> Self::KeyType { - match self.current { - Some((k, _)) => KeySlice::new(k.as_slice()), - None => KeySlice::new(&[]), // Caller should check is_valid() first + if self.pos < self.entries.len() { + self.entries[self.pos].0.clone() + } else { + Vec::new() } } fn value(&self) -> &[u8] { - match self.current { - 
Some((_, r)) => &r.value, - None => &[], // Caller should check is_valid() first + if self.pos < self.entries.len() { + &self.entries[self.pos].1.value + } else { + &[] } } fn is_valid(&self) -> bool { - self.current.is_some() + self.pos < self.entries.len() } fn next(&mut self) { - self.current = self.inner.next(); + self.pos += 1; } fn seek(&mut self, key: &[u8]) { - // We need to iterate until we find a key >= seek target - while let Some((current_key, _)) = self.current { - if current_key.as_slice() >= key { - // Found a key >= seek target - return; - } - self.current = self.inner.next(); + while self.pos < self.entries.len() && self.entries[self.pos].0.as_slice() < key { + self.pos += 1; } } } @@ -85,41 +67,40 @@ impl<'a> StorageIterator for MemTableIterator<'a> { #[cfg(test)] mod tests { use super::*; - use std::collections::BTreeMap; fn create_test_record(key: &[u8], value: &[u8]) -> LogRecord { LogRecord::new(key.to_vec(), value.to_vec()) } - fn create_test_memtable() -> BTreeMap, LogRecord> { - let mut map = BTreeMap::new(); - map.insert( - b"key_001".to_vec(), - create_test_record(b"key_001", b"value_001"), - ); - map.insert( - b"key_010".to_vec(), - create_test_record(b"key_010", b"value_010"), - ); - map.insert( - b"key_020".to_vec(), - create_test_record(b"key_020", b"value_020"), - ); - map.insert( - b"key_030".to_vec(), - create_test_record(b"key_030", b"value_030"), - ); - map.insert( - b"key_100".to_vec(), - create_test_record(b"key_100", b"value_100"), - ); - map + fn create_test_entries() -> Vec<(Vec, LogRecord)> { + vec![ + ( + b"key_001".to_vec(), + create_test_record(b"key_001", b"value_001"), + ), + ( + b"key_010".to_vec(), + create_test_record(b"key_010", b"value_010"), + ), + ( + b"key_020".to_vec(), + create_test_record(b"key_020", b"value_020"), + ), + ( + b"key_030".to_vec(), + create_test_record(b"key_030", b"value_030"), + ), + ( + b"key_100".to_vec(), + create_test_record(b"key_100", b"value_100"), + ), + ] } #[test] fn 
test_iterator_basic() { - let map = create_test_memtable(); - let mut iter = MemTableIterator::new(&map); + let entries = create_test_entries(); + let mut iter = MemTableIterator::new(entries); // First key assert!(iter.is_valid()); @@ -140,8 +121,8 @@ mod tests { #[test] fn test_iterator_full_scan() { - let map = create_test_memtable(); - let mut iter = MemTableIterator::new(&map); + let entries = create_test_entries(); + let mut iter = MemTableIterator::new(entries); let mut count = 0; let expected_keys = [b"key_001", b"key_010", b"key_020", b"key_030", b"key_100"]; @@ -157,8 +138,8 @@ mod tests { #[test] fn test_iterator_seek_exact() { - let map = create_test_memtable(); - let mut iter = MemTableIterator::new(&map); + let entries = create_test_entries(); + let mut iter = MemTableIterator::new(entries); // Seek to exact key iter.seek(b"key_020"); @@ -174,8 +155,8 @@ mod tests { #[test] fn test_iterator_seek_between() { - let map = create_test_memtable(); - let mut iter = MemTableIterator::new(&map); + let entries = create_test_entries(); + let mut iter = MemTableIterator::new(entries); // Seek to key between existing keys (should find next key) iter.seek(b"key_015"); @@ -185,8 +166,8 @@ mod tests { #[test] fn test_iterator_seek_before_first() { - let map = create_test_memtable(); - let mut iter = MemTableIterator::new(&map); + let entries = create_test_entries(); + let mut iter = MemTableIterator::new(entries); // Seek before first key iter.seek(b"key_000"); @@ -196,8 +177,8 @@ mod tests { #[test] fn test_iterator_seek_after_last() { - let map = create_test_memtable(); - let mut iter = MemTableIterator::new(&map); + let entries = create_test_entries(); + let mut iter = MemTableIterator::new(entries); // Seek after last key iter.seek(b"key_999"); @@ -206,8 +187,8 @@ mod tests { #[test] fn test_iterator_seek_last_key() { - let map = create_test_memtable(); - let mut iter = MemTableIterator::new(&map); + let entries = create_test_entries(); + let mut iter = 
MemTableIterator::new(entries); // Seek to last key iter.seek(b"key_100"); @@ -221,8 +202,8 @@ mod tests { #[test] fn test_iterator_empty_memtable() { - let map = BTreeMap::new(); - let iter = MemTableIterator::new(&map); + let entries = Vec::new(); + let iter = MemTableIterator::new(entries); // Should be invalid from the start assert!(!iter.is_valid()); @@ -230,13 +211,12 @@ mod tests { #[test] fn test_iterator_single_entry() { - let mut map = BTreeMap::new(); - map.insert( + let entries = vec![( b"only_key".to_vec(), create_test_record(b"only_key", b"only_value"), - ); + )]; - let mut iter = MemTableIterator::new(&map); + let mut iter = MemTableIterator::new(entries); assert!(iter.is_valid()); assert_eq!(iter.key().as_slice(), b"only_key"); @@ -245,45 +225,24 @@ mod tests { assert!(!iter.is_valid()); } - #[test] - fn test_iterator_new_from() { - let map = create_test_memtable(); - - // Start from key_020 - let mut iter = MemTableIterator::new_from(&map, b"key_020"); - - assert!(iter.is_valid()); - assert_eq!(iter.key().as_slice(), b"key_020"); - - iter.next(); - assert!(iter.is_valid()); - assert_eq!(iter.key().as_slice(), b"key_030"); - - iter.next(); - assert!(iter.is_valid()); - assert_eq!(iter.key().as_slice(), b"key_100"); - - iter.next(); - assert!(!iter.is_valid()); - } - #[test] fn test_iterator_deleted_records() { - let mut map = BTreeMap::new(); - map.insert( - b"key_001".to_vec(), - create_test_record(b"key_001", b"value_001"), - ); - map.insert( - b"key_002".to_vec(), - LogRecord::tombstone(b"key_002".to_vec()), - ); - map.insert( - b"key_003".to_vec(), - create_test_record(b"key_003", b"value_003"), - ); - - let mut iter = MemTableIterator::new(&map); + let entries = vec![ + ( + b"key_001".to_vec(), + create_test_record(b"key_001", b"value_001"), + ), + ( + b"key_002".to_vec(), + LogRecord::tombstone(b"key_002".to_vec()), + ), + ( + b"key_003".to_vec(), + create_test_record(b"key_003", b"value_003"), + ), + ]; + + let mut iter = 
MemTableIterator::new(entries); // Should iterate over all entries, including tombstones assert!(iter.is_valid());