Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ ureq = "2.12"
sqlparser = "0.45"
jsonschema = "0.18"
actix-ws = "0.3"
dashmap = "6"

[dev-dependencies]
tempfile = "3.24"
Expand Down
3 changes: 1 addition & 2 deletions src/core/engine/compaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::core::engine::EngineOptions;
use crate::core::iterators::{MergeIterator, StorageIterator};
use crate::core::key::KeySlice;
use crate::core::log_record::{LogRecord, RangeTombstone};
use crate::core::table::Table;
use crate::infra::config::StorageConfig;
Expand Down Expand Up @@ -118,7 +117,7 @@ fn execute_compaction(
// IMPORTANT: Iterate tables in REVERSE order (newest first) so that
// the MergeIterator's "lower index wins" rule correctly picks the
// newest value when duplicate keys exist across tables.
let mut iters: Vec<Box<dyn StorageIterator<KeyType = KeySlice<'_>> + '_>> = Vec::new();
let mut iters: Vec<Box<dyn StorageIterator<KeyType = Vec<u8>> + '_>> = Vec::new();
for table in tables.iter().rev() {
iters.push(Box::new(table.iter()));
}
Expand Down
25 changes: 14 additions & 11 deletions src/core/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use self::compaction::{Compaction, CompactionMetrics, CompactionOptions, Compact

use self::version_set::VersionSet;
use crate::core::iterators::{MergeIterator, StorageIterator};
use crate::core::key::KeySlice;
use crate::core::memtable::MemTable;

pub const DEFAULT_SCAN_LIMIT: usize = 128;
Expand Down Expand Up @@ -1264,14 +1263,12 @@ impl<C: Cache> Engine<C> {
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let start = std::time::Instant::now();
let core = self.core.read();
let mut iters: Vec<Box<dyn StorageIterator<KeyType = KeySlice<'_>> + '_>> = Vec::new();
let mut iters: Vec<Box<dyn StorageIterator<KeyType = Vec<u8>> + '_>> = Vec::new();

// 1. Memtables (newer first)
if let Some(memtables) = core.memtables().get(cf) {
for mem in memtables.iter().rev() {
iters.push(Box::new(crate::storage::iterator::MemTableIterator::new(
&mem.data,
)));
iters.push(Box::new(mem.iter()));
}
}

Expand Down Expand Up @@ -1436,13 +1433,11 @@ impl<C: Cache> Engine<C> {
pub fn keys(&self) -> Result<Vec<Vec<u8>>> {
let start = std::time::Instant::now();
let core = self.core.read();
let mut iters: Vec<Box<dyn StorageIterator<KeyType = KeySlice<'_>> + '_>> = Vec::new();
let mut iters: Vec<Box<dyn StorageIterator<KeyType = Vec<u8>> + '_>> = Vec::new();

if let Some(memtables) = core.memtables().get("default") {
for mem in memtables.iter().rev() {
iters.push(Box::new(crate::storage::iterator::MemTableIterator::new(
&mem.data,
)));
iters.push(Box::new(mem.iter()));
}
}

Expand Down Expand Up @@ -1473,7 +1468,7 @@ impl<C: Cache> Engine<C> {
let start = std::time::Instant::now();
let core = self.core.read();
let mut count = 0;
let mut iters: Vec<Box<dyn StorageIterator<KeyType = KeySlice<'_>> + '_>> = Vec::new();
let mut iters: Vec<Box<dyn StorageIterator<KeyType = Vec<u8>> + '_>> = Vec::new();

if let Some(memtables) = core.memtables().get("default") {
for mem in memtables.iter().rev() {
Expand Down Expand Up @@ -1569,7 +1564,15 @@ impl<C: Cache> Engine<C> {
timestamp,
&self.options.encryption,
)?;
for (key, record) in mem.data.iter() {
// Collect entries into a sorted vec because DashMap
// iteration order is arbitrary (shard-based).
let mut sorted: Vec<_> = mem
.data
.iter()
.map(|e| (e.key().clone(), e.value().clone()))
.collect();
sorted.sort_by(|a, b| a.0.cmp(&b.0));
for (key, record) in &sorted {
if record.is_expired_at(now) {
continue;
}
Expand Down
63 changes: 48 additions & 15 deletions src/core/memtable.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::core::log_record::{LogRecord, RangeTombstone};
use crate::storage::iterator::MemTableIterator;
use std::collections::BTreeMap;
use dashmap::DashMap;

pub struct MemTable {
pub(crate) data: BTreeMap<Vec<u8>, LogRecord>,
pub(crate) data: DashMap<Vec<u8>, LogRecord>,
pub(crate) size_bytes: usize,
pub(crate) max_size_bytes: usize,
/// Active range tombstones that apply to this memtable's data.
Expand All @@ -17,7 +17,7 @@ impl MemTable {
/// `should_flush()` always returns `false`.
pub fn new(max_size_bytes: usize) -> Self {
Self {
data: BTreeMap::new(),
data: DashMap::new(),
size_bytes: 0,
max_size_bytes,
range_tombstones: Vec::new(),
Expand Down Expand Up @@ -45,25 +45,21 @@ impl MemTable {

pub fn insert(&mut self, record: LogRecord) {
let record_size = Self::estimate_size(&record);
if let Some(old_record) = self.data.insert(record.key.clone(), record) {
let key = record.key.clone();
if let Some(old_record) = self.data.insert(key, record) {
self.size_bytes = self
.size_bytes
.saturating_sub(Self::estimate_size(&old_record));
}
self.size_bytes += record_size;
self.size_bytes = self.size_bytes.saturating_add(record_size);
}

pub fn should_flush(&self) -> bool {
self.size_bytes >= self.max_size_bytes
}

pub fn get(&self, key: &[u8]) -> Option<LogRecord> {
self.data.get(key).cloned()
}

/// Returns a StorageIterator over all entries (backward compatible)
pub fn iter_ordered(&self) -> impl Iterator<Item = (&Vec<u8>, &LogRecord)> {
self.data.iter()
self.data.get(key).map(|r| r.clone())
}

/// Returns a MemTableIterator starting from the beginning
Expand All @@ -78,8 +74,15 @@ impl MemTable {
/// iter.next();
/// }
/// ```
pub fn iter(&self) -> MemTableIterator<'_> {
MemTableIterator::new(&self.data)
pub fn iter(&self) -> MemTableIterator {
// Collect a snapshot sorted by key (DashMap does not guarantee order)
let mut entries: Vec<(Vec<u8>, LogRecord)> = self
.data
.iter()
.map(|e| (e.key().clone(), e.value().clone()))
.collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
MemTableIterator::new(entries)
}

/// Returns a MemTableIterator starting from a specific key
Expand All @@ -95,15 +98,33 @@ impl MemTable {
/// iter.next();
/// }
/// ```
pub fn iter_from(&self, start_key: &[u8]) -> MemTableIterator<'_> {
MemTableIterator::new_from(&self.data, start_key)
pub fn iter_from(&self, start_key: &[u8]) -> MemTableIterator {
// Collect a snapshot filtered by start_key, then sort
let mut entries: Vec<(Vec<u8>, LogRecord)> = self
.data
.iter()
.filter(|e| e.key().as_slice() >= start_key)
.map(|e| (e.key().clone(), e.value().clone()))
.collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
MemTableIterator::new(entries)
}

/// Add a range tombstone covering [start, end).
pub fn add_range_tombstone(&mut self, range: RangeTombstone) {
self.range_tombstones.push(range);
}

pub fn iter_ordered(&self) -> impl Iterator<Item = (Vec<u8>, LogRecord)> + '_ {
let mut entries: Vec<_> = self
.data
.iter()
.map(|e| (e.key().clone(), e.value().clone()))
.collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
entries.into_iter()
}

/// Check if a key falls within any active range tombstone.
///
/// Returns `true` if the key is covered by any range tombstone
Expand All @@ -122,6 +143,18 @@ impl MemTable {
count
}

pub fn len(&self) -> usize {
self.data.len()
}

pub fn is_empty(&self) -> bool {
self.data.is_empty()
}

pub fn size(&self) -> usize {
self.size_bytes
}

fn estimate_size(record: &LogRecord) -> usize {
// Base overhead: timestamp(16) + is_deleted(1) + column_family tag(1) +
// expires_at tag(1) + expires_at data(16) + misc(16) = ~51
Expand Down
8 changes: 4 additions & 4 deletions src/core/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,15 @@ impl<'a> TableIterator<'a> {
}

impl<'a> crate::core::iterators::StorageIterator for TableIterator<'a> {
type KeyType = crate::core::key::KeySlice<'a>;
type KeyType = Vec<u8>;

fn next(&mut self) {
self.current = self.inner.next();
}
fn key(&self) -> Self::KeyType {
match self.current {
Some((k, _)) => crate::core::key::KeySlice::new(k.as_slice()),
None => crate::core::key::KeySlice::new(&[]), // Caller should check is_valid() first
Some((k, _)) => k.clone(),
None => Vec::new(), // Caller should check is_valid() first
}
}
fn value(&self) -> &[u8] {
Expand All @@ -244,7 +244,7 @@ impl<'a> crate::core::iterators::StorageIterator for TableIterator<'a> {
}
fn seek(&mut self, _key: &[u8]) {
// Not strictly required for now, but good to have
while self.is_valid() && self.key().as_ref() < _key {
while self.is_valid() && self.key().as_slice() < _key {
self.next();
}
}
Expand Down
Loading
Loading