From 06f9c2e1d1448c3bb9e6ea43b1b070219a26f823 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 7 Jan 2026 12:05:35 +0100 Subject: [PATCH 01/21] Append commit instead of individual transactions to commitlog This moves the following responsibilities to the datastore: - maintenance of the transaction offset - deciding how many transactions are in a commit --- crates/commitlog/src/commitlog.rs | 198 +++++++------ crates/commitlog/src/lib.rs | 74 +---- crates/commitlog/src/repo/mod.rs | 4 - crates/commitlog/src/segment.rs | 262 ++++++------------ crates/commitlog/src/tests/helpers.rs | 25 +- crates/commitlog/src/tests/partial.rs | 174 +----------- crates/commitlog/tests/random_payload/mod.rs | 23 +- crates/commitlog/tests/streaming/mod.rs | 7 +- crates/core/src/db/durability.rs | 22 +- crates/core/src/db/relational_db.rs | 13 +- .../subscription/module_subscription_actor.rs | 11 +- crates/durability/src/imp/local.rs | 73 +---- crates/durability/src/imp/mod.rs | 4 +- crates/durability/src/lib.rs | 14 +- 14 files changed, 254 insertions(+), 650 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03c590e4950..03d2bb0ef38 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -118,7 +118,7 @@ impl Generic { /// [`io::ErrorKind::InvalidInput`] is returned. /// /// Also see [`Self::commit`]. - pub fn set_epoch(&mut self, epoch: u64) -> io::Result> { + pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> { use std::cmp::Ordering::*; match epoch.cmp(&self.head.epoch()) { @@ -126,58 +126,15 @@ impl Generic { io::ErrorKind::InvalidInput, "new epoch is smaller than current epoch", )), - Equal => Ok(None), + Equal => Ok(()), Greater => { - let res = self.commit()?; + self.flush()?; self.head.set_epoch(epoch); - Ok(res) + Ok(()) } } } - /// Write the currently buffered data to storage and rotate segments as - /// necessary. - /// - /// Note that this does not imply that the data is durable, in particular - /// when a filesystem storage backend is used. Call [`Self::sync`] to flush - /// any OS buffers to stable storage. - /// - /// # Errors - /// - /// If an error occurs writing the data, the current [`Commit`] buffer is - /// retained, but a new segment is created. Retrying in case of an `Err` - /// return value thus will write the current data to that new segment. - /// - /// If this fails, however, the next attempt to create a new segment will - /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind - /// this means that something is seriously wrong underlying storage, and the - /// caller should stop writing to the log. - pub fn commit(&mut self) -> io::Result> { - self.panicked = true; - let writer = &mut self.head; - let sz = writer.commit.encoded_len(); - // If the segment is empty, but the commit exceeds the max size, - // we got a huge commit which needs to be written even if that - // results in a huge segment. - let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size; - let writer = if should_rotate { - self.sync(); - self.start_new_segment()? - } else { - writer - }; - - let ret = writer.commit().or_else(|e| { - warn!("Commit failed: {e}"); - // Nb.: Don't risk a panic by calling `self.sync()`. - // We already gave up on the last commit, and will retry it next time. - self.start_new_segment()?; - Err(e) - }); - self.panicked = false; - ret - } - /// Force the currently active segment to be flushed to storage. 
/// /// Using a filesystem backend, this means to call `fsync(2)`. @@ -195,6 +152,16 @@ impl Generic { self.panicked = false; } + pub fn flush(&mut self) -> io::Result<()> { + self.head.flush() + } + + fn flush_and_sync(&mut self) -> io::Result<()> { + self.flush()?; + self.sync(); + Ok(()) + } + /// The last transaction offset written to disk, or `None` if nothing has /// been written yet. /// @@ -303,8 +270,17 @@ impl Generic { } impl Generic { - pub fn append(&mut self, record: T) -> Result<(), T> { - self.head.append(record) + pub fn commit>>(&mut self, transactions: impl IntoIterator) -> io::Result<()> { + self.panicked = true; + let writer = &mut self.head; + writer.commit(transactions)?; + if writer.len() >= self.opts.max_segment_size { + self.flush_and_sync()?; + self.start_new_segment()?; + } + self.panicked = false; + + Ok(()) } pub fn transactions_from<'a, D>( @@ -348,8 +324,8 @@ impl Generic { impl Drop for Generic { fn drop(&mut self) { if !self.panicked { - if let Err(e) = self.head.commit() { - warn!("failed to commit on drop: {e}"); + if let Err(e) = self.flush_and_sync() { + warn!("failed to flush on drop: {e:#}"); } } } @@ -920,7 +896,7 @@ fn range_is_empty(range: &impl RangeBounds) -> bool { #[cfg(test)] mod tests { - use std::{cell::Cell, iter::repeat, num::NonZeroU16}; + use std::{cell::Cell, iter::repeat}; use pretty_assertions::assert_matches; @@ -933,30 +909,31 @@ mod tests { #[test] fn rotate_segments_simple() { let mut log = mem_log::<[u8; 32]>(128); - for _ in 0..3 { - log.append([0; 32]).unwrap(); - log.commit().unwrap(); + for i in 0..4 { + log.commit([(i, [0; 32])]).unwrap(); } + log.flush_and_sync().unwrap(); let offsets = log.repo.existing_offsets().unwrap(); assert_eq!(&offsets[..offsets.len() - 1], &log.tail); - assert_eq!(offsets[offsets.len() - 1], 2); + // TODO: We overshoot the max segment size. + assert_eq!(&offsets, &[0, 3]); } #[test] fn huge_commit() { let mut log = mem_log::<[u8; 32]>(32); - log.append([0; 32]).unwrap(); - log.append([1; 32]).unwrap(); - log.commit().unwrap(); - assert!(log.head.len() > log.opts.max_segment_size); + log.commit([(0, [0; 32]), (1, [1; 32])]).unwrap(); + log.flush_and_sync().unwrap(); + // First segment got rotated out. + assert_eq!(&log.tail, &[0]); - log.append([2; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(2, [2; 32])]).unwrap(); + log.flush_and_sync().unwrap(); - assert_eq!(&log.tail, &[0]); - assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]); + // Second segment got rotated out and segment 3 is created. + assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2, 3]); } #[test] @@ -1052,14 +1029,31 @@ mod tests { fn traverse_commits_ignores_duplicates() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - let commit1 = log.head.commit.clone(); - log.commit().unwrap(); - log.head.commit = commit1.clone(); - log.commit().unwrap(); - log.append([43; 32]).unwrap(); - let commit2 = log.head.commit.clone(); - log.commit().unwrap(); + let tx1 = [42u8; 32]; + let tx2 = [43u8; 32]; + + log.commit([(0, tx1)]).unwrap(); + let commit1 = Commit { + min_tx_offset: 0, + n: 1, + records: tx1.to_vec(), + ..log.head.commit.clone() + }; + + // Reset the commit offset, so we can write the same commit twice. + log.head.commit.min_tx_offset = 0; + log.commit([(0, tx1)]).unwrap(); + + // Write another one. 
+ log.commit([(1, tx2)]).unwrap(); + let commit2 = Commit { + min_tx_offset: 1, + n: 1, + records: tx2.to_vec(), + ..log.head.commit.clone() + }; + + log.flush_and_sync().unwrap(); assert_eq!( [commit1, commit2].as_slice(), @@ -1074,15 +1068,14 @@ mod tests { fn traverse_commits_errors_when_forked() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - log.commit().unwrap(); - log.head.commit = Commit { - min_tx_offset: 0, - n: 1, - records: [43; 32].to_vec(), - epoch: 0, - }; - log.commit().unwrap(); + log.commit([(0, [42; 32])]).unwrap(); + // Reset the commit offset, + // and write a different commit at the same offset. + // This is considered a fork. + log.head.commit.min_tx_offset = 0; + log.commit([(0, [43; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1095,11 +1088,11 @@ mod tests { fn traverse_commits_errors_when_offset_not_contiguous() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(0, [42; 32])]).unwrap(); log.head.commit.min_tx_offset = 18; - log.append([42; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(18, [42; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1111,7 +1104,7 @@ mod tests { prev_error: None }) ), - "expected fork error: {res:?}" + "expected out-of-order error: {res:?}" ) } @@ -1221,7 +1214,7 @@ mod tests { #[test] fn reopen() { let mut log = mem_log::<[u8; 32]>(1024); - let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle()); + let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() @@ -1231,12 +1224,11 @@ mod tests { log.repo.clone(), Options { max_segment_size: 1024, - max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) .unwrap(); - total_txs += fill_log(&mut log, 100, (1..=10).cycle()); + let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, @@ -1245,24 +1237,22 @@ mod tests { } #[test] - fn set_same_epoch_does_nothing() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); - assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); - let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); - assert_eq!(committed, None); - } - - #[test] - fn set_new_epoch_commits() { + fn set_new_epoch() { let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); - log.append(<_>::default()).unwrap(); - let committed = log - .set_epoch(42) - .unwrap() - .expect("should have committed the pending transaction"); + log.commit([(0, [12; 32])]).unwrap(); + log.set_epoch(42).unwrap(); assert_eq!(log.epoch(), 42); - assert_eq!(committed.tx_range.start, 0); + log.commit([(1, [13; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); + + let epochs = log + .commits_from(0) + .map(Result::unwrap) + .map(|commit| commit.epoch) + .collect::>(); + assert_eq!(&[Commit::DEFAULT_EPOCH, 42], epochs.as_slice()); } #[test] diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 1e7a8c0047e..725d501671f 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -1,6 +1,6 @@ use std::{ io, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::RangeBounds, sync::{Arc, RwLock}, }; @@ -20,7 +20,7 @@ mod varint; pub use crate::{ commit::{Commit, 
StoredCommit}, payload::{Decoder, Encode}, - repo::fs::SizeOnDisk, + repo::{fs::SizeOnDisk, TxOffset}, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, varchar::Varchar, }; @@ -57,14 +57,6 @@ pub struct Options { /// Default: 1GiB #[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))] pub max_segment_size: u64, - /// The maximum number of records in a commit. - /// - /// If this number is exceeded, the commit is flushed to disk even without - /// explicitly calling [`Commitlog::flush`]. - /// - /// Default: 1 - #[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))] - pub max_records_in_commit: NonZeroU16, /// Whenever at least this many bytes have been written to the currently /// active segment, an entry is added to its offset index. /// @@ -106,7 +98,6 @@ impl Default for Options { impl Options { pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024; - pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; @@ -114,7 +105,6 @@ impl Options { pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, max_segment_size: Self::default_max_segment_size(), - max_records_in_commit: Self::default_max_records_in_commit(), offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), preallocate_segments: Self::default_preallocate_segments(), @@ -128,10 +118,6 @@ impl Options { Self::DEFAULT_MAX_SEGMENT_SIZE } - pub const fn default_max_records_in_commit() -> NonZeroU16 { - Self::DEFAULT_MAX_RECORDS_IN_COMMIT - } - pub const fn default_offset_index_interval_bytes() -> NonZeroU64 { Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES } @@ -262,7 +248,7 @@ impl Commitlog { pub fn flush(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush commitlog"); - inner.commit()?; + inner.flush()?; Ok(inner.max_committed_offset()) } @@ -282,7 +268,7 @@ impl Commitlog { pub fn flush_and_sync(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush and sync commitlog"); - inner.commit()?; + inner.flush()?; inner.sync(); Ok(inner.max_committed_offset()) @@ -383,57 +369,9 @@ impl Commitlog { } impl Commitlog { - /// Append the record `txdata` to the log. - /// - /// If the internal buffer exceeds [`Options::max_records_in_commit`], the - /// argument is returned in an `Err`. The caller should [`Self::flush`] the - /// log and try again. - /// - /// In case the log is appended to from multiple threads, this may result in - /// a busy loop trying to acquire a slot in the buffer. In such scenarios, - /// [`Self::append_maybe_flush`] is preferable. - pub fn append(&self, txdata: T) -> Result<(), T> { - let mut inner = self.inner.write().unwrap(); - inner.append(txdata) - } - - /// Append the record `txdata` to the log. - /// - /// The `txdata` payload is buffered in memory until either: - /// - /// - [`Self::flush`] is called explicitly, or - /// - [`Options::max_records_in_commit`] is exceeded - /// - /// In the latter case, [`Self::append`] flushes implicitly, _before_ - /// appending the `txdata` argument. - /// - /// I.e. the argument is not guaranteed to be flushed after the method - /// returns. 
If that is desired, [`Self::flush`] must be called explicitly. - /// - /// If writing `txdata` to the commitlog results in a new segment file being opened, - /// we will send a message down `on_new_segment`. - /// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`. - /// - /// # Errors - /// - /// If the log needs to be flushed, but an I/O error occurs, ownership of - /// `txdata` is returned back to the caller alongside the [`io::Error`]. - /// - /// The value can then be used to retry appending. - pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append> { + pub fn commit>>(&self, transactions: impl IntoIterator) -> io::Result<()> { let mut inner = self.inner.write().unwrap(); - - if let Err(txdata) = inner.append(txdata) { - if let Err(source) = inner.commit() { - return Err(error::Append { txdata, source }); - } - - // `inner.commit.n` must be zero at this point - let res = inner.append(txdata); - debug_assert!(res.is_ok(), "failed to append while holding write lock"); - } - - Ok(()) + inner.commit(transactions) } /// Obtain an iterator which traverses the log from the start, yielding diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 5be633bf5c7..2ab5579c8b2 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -219,8 +219,6 @@ pub fn create_segment_writer( min_tx_offset: offset, bytes_written: Header::LEN as u64, - max_records_in_commit: opts.max_records_in_commit, - offset_index_head: create_offset_index_writer(repo, offset, opts), }) } @@ -293,8 +291,6 @@ pub fn resume_segment_writer( min_tx_offset: tx_range.start, bytes_written: size_in_bytes, - max_records_in_commit: opts.max_records_in_commit, - offset_index_head: create_offset_index_writer(repo, offset, opts), })) } diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7e8c054467b..e2baa07d0b9 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -1,7 +1,7 @@ use std::{ fs::File, io::{self, BufWriter, ErrorKind, SeekFrom, Write as _}, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::Range, }; @@ -100,70 +100,52 @@ pub struct Writer { pub(crate) min_tx_offset: u64, pub(crate) bytes_written: u64, - pub(crate) max_records_in_commit: NonZeroU16, - pub(crate) offset_index_head: Option, } impl Writer { - /// Append the record (aka transaction) `T` to the segment. - /// - /// If the number of currently buffered records would exceed `max_records_in_commit` - /// after the method returns, the argument is returned in an `Err` and not - /// appended to this writer's buffer. - /// - /// Otherwise, the `record` is encoded and and stored in the buffer. - /// - /// An `Err` result indicates that [`Self::commit`] should be called in - /// order to flush the buffered records to persistent storage. 
- pub fn append(&mut self, record: T) -> Result<(), T> { - if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() { - Err(record) - } else { - self.commit.n += 1; - record.encode_record(&mut self.commit.records); - Ok(()) - } - } + pub fn commit>, U: Encode>( + &mut self, + transactions: impl IntoIterator, + ) -> io::Result<()> { + let mut txs = transactions.into_iter().peekable(); + while txs.peek().is_some() { + for tx in txs.by_ref().take(u16::MAX as usize) { + let tx = tx.into(); + let expected_offset = self.commit.min_tx_offset + self.commit.n as u64; + if tx.offset != expected_offset { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset), + )); + } + self.commit.n += 1; + tx.txdata.encode_record(&mut self.commit.records); + } - /// Write the current [`Commit`] to the underlying [`io::Write`]. - /// - /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero). - /// In this case, `None` is returned. - /// - /// Otherwise `Some` [`Committed`] is returned, providing some metadata about - /// the commit. - pub fn commit(&mut self) -> io::Result> { - if self.commit.n == 0 { - return Ok(None); - } - let checksum = self.commit.write(&mut self.inner)?; - self.inner.flush()?; - - let commit_len = self.commit.encoded_len() as u64; - self.offset_index_head.as_mut().map(|index| { - debug!( - "append_after commit min_tx_offset={} bytes_written={} commit_len={}", - self.commit.min_tx_offset, self.bytes_written, commit_len - ); - index - .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) - .map_err(|e| { - debug!("failed to append to offset index: {e:?}"); - }) - }); + let _checksum = self + .commit + .write(&mut self.inner) + .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); + let commit_len = self.commit.encoded_len() as u64; - let tx_range_start = self.commit.min_tx_offset; + if let Some(index) = self.offset_index_head.as_mut() { + let _ = index + .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) + .inspect_err(|e| debug!("failed to append to offset index: {e}")); + } - self.bytes_written += commit_len; - self.commit.min_tx_offset += self.commit.n as u64; - self.commit.n = 0; - self.commit.records.clear(); + self.bytes_written += commit_len; + self.commit.min_tx_offset += self.commit.n as u64; + self.commit.n = 0; + self.commit.records.clear(); + } - Ok(Some(Committed { - tx_range: tx_range_start..self.commit.min_tx_offset, - checksum, - })) + Ok(()) + } + + pub fn flush(&mut self) -> io::Result<()> { + self.inner.flush() } /// Get the current epoch. 
@@ -537,6 +519,12 @@ pub struct Transaction { pub txdata: T, } +impl From<(u64, T)> for Transaction { + fn from((offset, txdata): (u64, T)) -> Self { + Self { offset, txdata } + } +} + pub struct Commits { pub header: Header, reader: R, @@ -744,16 +732,14 @@ impl Metadata { #[cfg(test)] mod tests { - use std::num::NonZeroU16; - - use super::*; - use crate::{payload::ArrayDecoder, repo, Options}; use itertools::Itertools; use pretty_assertions::assert_matches; - use proptest::prelude::*; use spacetimedb_paths::server::CommitLogDir; use tempfile::tempdir; + use super::*; + use crate::{payload::ArrayDecoder, repo, Options}; + #[test] fn header_roundtrip() { let hdr = Header { @@ -772,20 +758,9 @@ mod tests { fn write_read_roundtrip() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(4).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); - writer.append([0; 32]).unwrap(); - writer.append([1; 32]).unwrap(); - writer.append([2; 32]).unwrap(); - writer.commit().unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); + writer.commit([(0, [0; 32]), (1, [1; 32]), (2, [2; 32])]).unwrap(); + writer.flush().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let header = reader.header; @@ -810,27 +785,15 @@ mod tests { fn metadata() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); // Commit 0..2 - writer.append([0; 32]).unwrap(); - writer.append([0; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(0, [0; 32]), (1, [0; 32])]).unwrap(); // Commit 2..3 - writer.append([1; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(2, [1; 32])]).unwrap(); // Commit 3..5 - writer.append([2; 32]).unwrap(); - writer.append([2; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(3, [2; 32]), (4, [2; 32])]).unwrap(); + + writer.flush().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let metadata = reader.metadata().unwrap(); @@ -851,37 +814,32 @@ mod tests { #[test] fn commits() { let repo = repo::Memory::unlimited(); - let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; + let commits = vec![ + vec![(0, [1; 32]), (1, [2; 32])], + vec![(2, [3; 32])], + vec![(3, [4; 32]), (4, [5; 32])], + ]; - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); for commit in &commits { - for tx in commit { - writer.append(*tx).unwrap(); - } - writer.commit().unwrap(); + writer.commit(commit.clone()).unwrap(); } + writer.flush().unwrap(); + let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let mut commits1 = Vec::with_capacity(commits.len()); let mut min_tx_offset = 0; for txs in commits { + let n = txs.len(); commits1.push(Commit { min_tx_offset, - n: txs.len() as u16, - records: txs.concat(), + n: n as u16, + records: 
itertools::concat(txs.into_iter().map(|(_, payload)| payload.to_vec())), epoch: 0, }); - min_tx_offset += txs.len() as u64; + min_tx_offset += n as u64; } let commits2 = reader .commits() @@ -894,73 +852,25 @@ mod tests { #[test] fn transactions() { let repo = repo::Memory::unlimited(); - let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; + let commits = vec![ + vec![(0, [1; 32]), (1, [2; 32])], + vec![(2, [3; 32])], + vec![(3, [4; 32]), (4, [5; 32])], + ]; - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); for commit in &commits { - for tx in commit { - writer.append(*tx).unwrap(); - } - writer.commit().unwrap(); + writer.commit(commit.clone()).unwrap(); } + writer.flush().unwrap(); + let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let txs = reader .transactions(&ArrayDecoder) .collect::, _>>() .unwrap(); - assert_eq!( - txs, - commits - .into_iter() - .flatten() - .enumerate() - .map(|(offset, txdata)| Transaction { - offset: offset as u64, - txdata - }) - .collect::>() - ); - } - - proptest! { - #[test] - fn max_records_in_commit(max_records_in_commit in any::()) { - let mut writer = Writer { - commit: Commit::default(), - inner: BufWriter::new(Vec::new()), - - min_tx_offset: 0, - bytes_written: 0, - - max_records_in_commit, - - offset_index_head: None, - }; - - for i in 0..max_records_in_commit.get() { - assert!( - writer.append([0; 16]).is_ok(), - "less than {} records written: {}", - max_records_in_commit.get(), - i - ); - } - assert!( - writer.append([0; 16]).is_err(), - "more than {} records written", - max_records_in_commit.get() - ); - } + assert_eq!(txs, commits.into_iter().flatten().map(Into::into).collect::>()); } #[test] @@ -972,20 +882,14 @@ mod tests { min_tx_offset: 0, bytes_written: 0, - max_records_in_commit: NonZeroU16::MAX, offset_index_head: None, }; assert_eq!(0, writer.next_tx_offset()); - writer.append([0; 16]).unwrap(); - assert_eq!(0, writer.next_tx_offset()); - writer.commit().unwrap(); - assert_eq!(1, writer.next_tx_offset()); - writer.commit().unwrap(); + writer.commit([(0, [0; 16])]).unwrap(); assert_eq!(1, writer.next_tx_offset()); - writer.append([1; 16]).unwrap(); - writer.append([1; 16]).unwrap(); - writer.commit().unwrap(); + writer.commit([(1, [1; 16])]).unwrap(); + writer.commit([(2, [1; 16])]).unwrap(); assert_eq!(3, writer.next_tx_offset()); } diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs index c645b2cdc65..b7740b92e5b 100644 --- a/crates/commitlog/src/tests/helpers.rs +++ b/crates/commitlog/src/tests/helpers.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, num::NonZeroU16}; +use std::fmt::Debug; use env_logger::Env; @@ -13,7 +13,6 @@ pub fn mem_log(max_segment_size: u64) -> commitlog::Generic(ShortMem::new(800)); - let total_commits = 100; - let total_txs = fill_log_enospc(&mut log, total_commits, (1..=10).cycle()); - - assert_eq!( - total_txs, - log.transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - assert_eq!(total_commits, log.commits_from(0).map(Result::unwrap).count()); -} - -// Note: Write errors cause the in-flight commit to be written to a fresh -// segment. So as long as we write through the public API, partial writes -// never surface (i.e. 
the log is contiguous). -#[test] -fn reopen() { - enable_logging(); - - let repo = ShortMem::new(800); - let num_commits = 10; - - let mut total_txs = 0; - for i in 0..2 { - let mut log = open_log::<[u8; 32]>(repo.clone()); - total_txs += fill_log_enospc(&mut log, num_commits, (1..=10).cycle()); - - debug!("fill {} done", i + 1); - } - - assert_eq!( - total_txs, - open_log::<[u8; 32]>(repo.clone()) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - - // Let's see if we hit a funny case in any of the segments. - for offset in repo.existing_offsets().unwrap().into_iter().rev() { - let meta = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, offset) - .unwrap() - .metadata() - .unwrap(); - debug!("dropping segment: segment::{meta:?}"); - repo.remove_segment(offset).unwrap(); - assert_eq!( - meta.tx_range.start, - open_log::<[u8; 32]>(repo.clone()) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() as u64 - ); - } -} - -#[test] -fn overwrite_reopen() { - enable_logging(); - - let repo = ShortMem::new(800); - let num_commits = 10; - let txs_per_commit = 5; - - let mut log = open_log::<[u8; 32]>(repo.clone()); - let mut total_txs = fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); - - let last_segment_offset = repo.existing_offsets().unwrap().last().copied().unwrap(); - let last_commit: Commit = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, last_segment_offset) - .unwrap() - .commits() - .map(Result::unwrap) - .last() - .unwrap() - .into(); - debug!("last commit: {last_commit:?}"); - - { - let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap(); - let pos = last_segment.len() - last_commit.encoded_len() + 1; - last_segment.modify_byte_at(pos, |_| 255); - } - - let mut log = open_log::<[u8; 32]>(repo.clone()); - for (i, commit) in log.commits_from(0).enumerate() { - if i < num_commits - 1 { - commit.expect("all but last commit should be good"); - } else { - let last_good_offset = txs_per_commit * (num_commits - 1); - assert!( - matches!( - commit, - Err(error::Traversal::Checksum { offset, .. }) if offset == last_good_offset as u64, - ), - "expected checksum error with offset={last_good_offset}: {commit:?}" - ); - } + for i in 0..100 { + log.commit([(i, [b'z'; 32])]).unwrap(); } - - // Write some more data. - total_txs += fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); - // Log should be contiguous, but missing one corrupted commit. - assert_eq!( - total_txs - txs_per_commit, - log.transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - // Check that this is true if we reopen the log. 
- assert_eq!( - total_txs - txs_per_commit, - open_log::<[u8; 32]>(repo) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); } /// Edge case surfaced in production: @@ -154,7 +41,6 @@ fn first_commit_in_last_segment_corrupt() { let repo = repo::Memory::unlimited(); let options = Options { max_segment_size: 512, - max_records_in_commit: NonZeroU16::new(1).unwrap(), ..<_>::default() }; { @@ -180,7 +66,6 @@ fn open_log(repo: ShortMem) -> commitlog::Generic { repo, Options { max_segment_size: 1024, - max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) @@ -197,16 +82,6 @@ struct ShortSegment { max_len: u64, } -impl ShortSegment { - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { - self.inner.modify_byte_at(pos, f); - } -} - impl SegmentLen for ShortSegment { fn segment_len(&mut self) -> io::Result { self.inner.segment_len() @@ -314,38 +189,3 @@ impl Repo for ShortMem { self.inner.existing_offsets() } } - -/// Like [`crate::tests::helpers::fill_log`], but expect that ENOSPC happens at -/// least once. -fn fill_log_enospc( - log: &mut commitlog::Generic, - num_commits: usize, - txs_per_commit: impl Iterator, -) -> usize -where - T: Debug + Default + Encode, -{ - let mut seen_enospc = false; - - let mut total_txs = 0; - for (_, n) in (0..num_commits).zip(txs_per_commit) { - for _ in 0..n { - log.append(T::default()).unwrap(); - total_txs += 1; - } - let res = log.commit(); - if let Err(Some(os)) = res.as_ref().map_err(|e| e.raw_os_error()) { - if os == ENOSPC { - debug!("fill: ignoring ENOSPC"); - seen_enospc = true; - log.commit().unwrap(); - continue; - } - } - res.unwrap(); - } - - assert!(seen_enospc, "expected to see ENOSPC"); - - total_txs -} diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index 85ab653480d..49e47ed42d1 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroU16; - use log::info; use spacetimedb_commitlog::tests::helpers::enable_logging; use spacetimedb_commitlog::{payload, Commitlog, Options}; @@ -18,7 +16,6 @@ fn smoke() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -27,18 +24,18 @@ fn smoke() { let n_txs = 500; let payload = gen_payload(); - for _ in 0..n_txs { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..n_txs { + clog.commit([(i, payload)]).unwrap(); } let committed_offset = clog.flush_and_sync().unwrap(); - assert_eq!(n_txs - 1, committed_offset.unwrap() as usize); + assert_eq!(n_txs - 1, committed_offset.unwrap()); assert_eq!( - n_txs, + n_txs as usize, clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count() ); // We set max_records_in_commit to 1, so n_commits == n_txs - assert_eq!(n_txs, clog.commits().map(Result::unwrap).count()); + assert_eq!(n_txs as usize, clog.commits().map(Result::unwrap).count()); } #[test] @@ -48,7 +45,6 @@ fn resets() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 512, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -56,8 +52,8 @@ fn resets() { .unwrap(); let payload = gen_payload(); - for _ in 0..50 { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..50 { + clog.commit([(i, payload)]).unwrap(); } clog.flush_and_sync().unwrap(); @@ -88,7 +84,6 @@ 
fn compression() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -98,8 +93,8 @@ fn compression() { // try to generate commitlogs that will be amenable to compression - // random data doesn't compress well, so try and have there be repetition let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::>(); - for payload in &payloads { - clog.append_maybe_flush(*payload).unwrap(); + for (i, payload) in payloads.iter().enumerate() { + clog.commit([(i as u64, *payload)]).unwrap(); } clog.flush_and_sync().unwrap(); diff --git a/crates/commitlog/tests/streaming/mod.rs b/crates/commitlog/tests/streaming/mod.rs index 0474ae75b30..3c2077bacaf 100644 --- a/crates/commitlog/tests/streaming/mod.rs +++ b/crates/commitlog/tests/streaming/mod.rs @@ -1,6 +1,6 @@ use std::{ io, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::RangeBounds, path::{Path, PathBuf}, }; @@ -183,7 +183,6 @@ async fn assert_equal_dirs(src: &Path, dst: &Path) { fn default_options() -> Options { Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, // Write an index entry for every commit. offset_index_interval_bytes: NonZeroU64::new(256).unwrap(), offset_index_require_segment_fsync: false, @@ -195,8 +194,8 @@ async fn fill_log(path: PathBuf) { spawn_blocking(move || { let clog = Commitlog::open(CommitLogDir::from_path_unchecked(path), default_options(), None).unwrap(); let payload = random_payload::gen_payload(); - for _ in 0..100 { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..100 { + clog.commit([(i, payload)]).unwrap(); } clog.flush_and_sync().unwrap(); }) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 0ea88bdb4b9..1476b56edbf 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -8,7 +8,7 @@ use spacetimedb_commitlog::payload::{ }; use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; -use spacetimedb_durability::{DurableOffset, TxOffset}; +use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; use spacetimedb_lib::Identity; use spacetimedb_primitives::TableId; use tokio::{ @@ -199,14 +199,14 @@ impl DurabilityWorkerActor { } pub fn do_durability(durability: &Durability, reducer_context: Option, tx_data: &TxData) { - if tx_data.tx_offset().is_none() { + let Some(tx_offset) = tx_data.tx_offset() else { let name = reducer_context.as_ref().map(|rcx| &*rcx.name); debug_assert!( !tx_data.has_rows_or_connect_disconnect(name), "tx_data has no rows but has connect/disconnect: `{name:?}`" ); return; - } + }; let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; @@ -252,9 +252,14 @@ impl DurabilityWorkerActor { }), }; - // TODO: Should measure queuing time + actual write // This does not block, as per trait docs. 
- durability.append_tx(txdata); + durability.commit( + [Transaction { + offset: tx_offset, + txdata, + }] + .into(), + ); } } @@ -292,9 +297,12 @@ mod tests { impl spacetimedb_durability::Durability for CountingDurability { type TxData = Txdata; - fn append_tx(&self, _tx: Self::TxData) { + fn commit(&self, txs: Box<[Transaction]>) { + let Some(max_offset) = txs.iter().map(|x| x.offset).max() else { + return; + }; self.appended.send_modify(|offset| { - *offset = offset.map(|x| x + 1).or(Some(0)); + offset.replace(max_offset); }); } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index b409a8121cd..60f8e6bea97 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -3,7 +3,7 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, RestoreSnapshotError}; use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; -use crate::util::{asyncify, spawn_rayon}; +use crate::util::asyncify; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use enum_map::EnumMap; @@ -1716,7 +1716,6 @@ pub async fn local_durability( snapshot_worker: Option<&SnapshotWorker>, ) -> Result<(LocalDurability, DiskSizeFn), DBError> { let rt = tokio::runtime::Handle::current(); - // TODO: Should this better be spawn_blocking? let on_new_segment = snapshot_worker.map(|snapshot_worker| { let snapshot_worker = snapshot_worker.clone(); Arc::new(move || { @@ -1725,17 +1724,11 @@ pub async fn local_durability( snapshot_worker.request_snapshot_ignore_closed(); }) as Arc }); - let local = spawn_rayon(move || { + let local = asyncify(move || { durability::Local::open( replica_dir.clone(), rt, - durability::local::Options { - commitlog: commitlog::Options { - max_records_in_commit: 1.try_into().unwrap(), - ..Default::default() - }, - ..Default::default() - }, + <_>::default(), // Give the durability a handle to request a new snapshot run, // which it will send down whenever we rotate commitlog segments. 
on_new_segment, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 38df7cea554..5d2c494cba2 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1466,7 +1466,7 @@ mod tests { use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; - use spacetimedb_durability::{Durability, EmptyHistory, TxOffset}; + use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset}; use spacetimedb_execution::dml::MutDatastore; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; @@ -1551,13 +1551,10 @@ mod tests { impl Durability for ManualDurability { type TxData = Txdata; - fn append_tx(&self, tx: Self::TxData) { + fn commit(&self, txs: Box<[Transaction]>) { let mut commitlog = self.commitlog.write().unwrap(); - if let Err(tx) = commitlog.append(tx) { - commitlog.commit().expect("error flushing commitlog"); - commitlog.append(tx).expect("should be able to append after flush"); - } - commitlog.commit().expect("error flushing commitlog"); + commitlog.commit(txs).expect("commit failed"); + commitlog.flush().expect("error flushing commitlog"); } fn durable_tx_offset(&self) -> spacetimedb_durability::DurableOffset { diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index e3a8afa34ac..f0c6daf474f 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,7 +1,5 @@ use std::{ io, - num::NonZeroU16, - panic, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, @@ -80,7 +78,7 @@ pub struct Local { /// [`PersisterTask`]. /// /// Note that this is unbounded! - queue: mpsc::UnboundedSender>, + queue: mpsc::UnboundedSender>]>>, /// How many transactions are sitting in the `queue`. /// /// This is mainly for observability purposes, and can thus be updated with @@ -132,7 +130,6 @@ impl Local { queue_depth: queue_depth.clone(), sync_interval: opts.sync_interval, - max_records_in_commit: opts.commitlog.max_records_in_commit, lock, } @@ -191,7 +188,6 @@ struct Actor { queue_depth: Arc, sync_interval: Duration, - max_records_in_commit: NonZeroU16, #[allow(unused)] lock: Lock, @@ -201,7 +197,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut txdata_rx: mpsc::UnboundedReceiver>, + mut commits_rx: mpsc::UnboundedReceiver>]>>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -224,7 +220,7 @@ impl Actor { biased; Some(reply) = shutdown_rx.recv() => { - txdata_rx.close(); + commits_rx.close(); let _ = reply.send(self.lock.notified()); }, @@ -235,21 +231,16 @@ impl Actor { } }, - data = txdata_rx.recv() => { - let Some(txdata) = data else { + commit = commits_rx.recv() => { + let Some(commit) = commit else { break; }; self.queue_depth.fetch_sub(1, Relaxed); - // If we are writing one commit per tx, trying to buffer is - // fairly pointless. Immediately flush instead. - // - // Otherwise, try `Commitlog::append` as a fast-path - // that doesn't require `spawn_blocking`. 
- if self.max_records_in_commit.get() == 1 { - self.flush_append(txdata, true).await; - } else if let Err(retry) = self.clog.append(txdata) { - self.flush_append(retry, false).await - } + let clog = self.clog.clone(); + spawn_blocking(move || clog.commit(commit)) + .await + .expect("commitlog write panicked") + .expect("commitlog write failed"); }, } } @@ -261,30 +252,6 @@ impl Actor { info!("exiting durability actor"); } - #[instrument(skip_all)] - async fn flush_append(&self, txdata: Txdata, flush_after: bool) { - let clog = self.clog.clone(); - let span = Span::current(); - spawn_blocking(move || { - let _span = span.enter(); - let mut retry = Some(txdata); - while let Some(txdata) = retry.take() { - if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) { - flush_error("append-maybe-flush", &source); - retry = Some(txdata); - } - } - - if flush_after { - clog.flush() - .map(drop) - .unwrap_or_else(|e| flush_error("flush-after", &e)); - } - }) - .await - .expect("commitlog append blocking task panicked") - } - #[instrument(skip_all)] async fn flush_and_sync(&self) -> io::Result> { // Skip if nothing changed. @@ -302,7 +269,7 @@ impl Actor { }) .await .expect("commitlog flush-and-sync blocking task panicked") - .inspect_err(|e| flush_error("flush-and-sync", e)) + .inspect_err(|e| warn!("error flushing commitlog: {e:#}")) .inspect(|maybe_offset| { if let Some(new_offset) = maybe_offset { trace!("synced to offset {new_offset}"); @@ -342,25 +309,11 @@ impl Drop for Lock { } } -/// Handle an error flushing the commitlog. -/// -/// Panics if the error indicates that the log may be permanently unwritable. -#[inline] -#[track_caller] -fn flush_error(task: &str, e: &io::Error) { - warn!("error flushing commitlog ({task}): {e:?}"); - if matches!(e.kind(), io::ErrorKind::AlreadyExists | io::ErrorKind::StorageFull) { - panic!("{e}"); - } -} - impl Durability for Local { type TxData = Txdata; - fn append_tx(&self, tx: Self::TxData) { - if self.queue.send(tx).is_err() { - panic!("durability actor crashed"); - } + fn commit(&self, txs: Box<[Transaction]>) { + self.queue.send(txs).expect("durability actor crashed"); self.queue_depth.fetch_add(1, Relaxed); } diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 6f83106d0b3..562b9b56eab 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -15,7 +15,7 @@ mod testing { use futures::FutureExt as _; use tokio::sync::watch; - use crate::{Close, Durability, DurableOffset, TxOffset}; + use crate::{Close, Durability, DurableOffset, Transaction, TxOffset}; /// A [`Durability`] impl that sends all transactions into the void. /// @@ -41,7 +41,7 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn append_tx(&self, _: Self::TxData) { + fn commit(&self, _: Box<[Transaction]>) { if self.closed.load(Ordering::Relaxed) { panic!("`close` was called on this `NoDurability` instance"); } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 2038d90eeb9..0d40bcd76ad 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -104,19 +104,9 @@ pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; - /// Submit the transaction payload to be made durable. - /// - /// This method must never block, and accept new transactions even if they - /// cannot be made durable immediately. - /// - /// A permanent failure of the durable storage may be signalled by panicking. 
- fn append_tx(&self, tx: Self::TxData); + fn commit(&self, txs: Box<[Transaction]>); - /// The [`TxOffset`] considered durable. - /// - /// A `None` return value indicates that the durable offset is not known, - /// either because nothing has been persisted yet, or because the status - /// cannot be retrieved. + /// Obtain a handle to the [DurableOffset]. fn durable_tx_offset(&self) -> DurableOffset; /// Asynchronously request the durability to shut down, without dropping it. From a0de7d99840acfa562a689822bafe73b56c3ce21 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 27 Jan 2026 18:06:13 +0100 Subject: [PATCH 02/21] Restore some commentary --- crates/durability/src/lib.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 0d40bcd76ad..43c236a6388 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -95,15 +95,26 @@ pub type Close = BoxFuture<'static, Option>; /// /// NOTE: This is a preliminary definition, still under consideration. /// -/// A durability implementation accepts a payload representing a single database -/// transaction via [`Durability::append_tx`] in a non-blocking fashion. The -/// payload _should_ become durable eventually. [`TxOffset`]s reported by -/// [`Durability::durable_tx_offset`] shall be considered durable to the -/// extent the implementation can guarantee. +/// A durability implementation accepts one or more [Transaction]s to be made +/// durable via [Durability::commit] in a non-blocking fashion. +/// +/// A batch of transactions is eventually made durable atomically. +/// Note that this means that a torn write can render the whole batch +/// inaccessible, so small batches are usually preferable. +/// +/// Once a transaction becomes durable, the [DurableOffset] is updated. +/// What durable means depends on the implementation, informally it can be +/// thought of as "written to disk". pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; + /// Submit a batch of [Transaction]s to be made durable. + /// + /// This method must never block, and accept new transactions even if they + /// cannot be made durable immediately. + /// + /// Errors may be signalled by panicking. fn commit(&self, txs: Box<[Transaction]>); /// Obtain a handle to the [DurableOffset]. 
From e73abcc5226d4562a52e7e7e777fc1e2e6a106c2 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 10:26:20 +0100 Subject: [PATCH 03/21] Clear commit before returning error --- crates/commitlog/src/segment.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index e2baa07d0b9..d0acb3eb8ce 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -114,6 +114,9 @@ impl Writer { let tx = tx.into(); let expected_offset = self.commit.min_tx_offset + self.commit.n as u64; if tx.offset != expected_offset { + self.commit.n = 0; + self.commit.records.clear(); + return Err(io::Error::new( io::ErrorKind::InvalidInput, format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset), From d5b29ccdd957a7b00c849b4dcd2140c10d67a15a Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 10:27:49 +0100 Subject: [PATCH 04/21] More commentary --- crates/commitlog/src/commitlog.rs | 41 +++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03d2bb0ef38..ce3d72f2899 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -270,6 +270,47 @@ impl Generic { } impl Generic { + /// Write `transactions` to the log. + /// + /// This will store all transactions as a single [Commit] if possible + /// (if the iterator contains more than `u16::MAX` elements, additional + /// commits are created). + /// + /// Data is buffered by the underlying segment [Writer], so not all data + /// submitted here may have been written to disk when this method returns. + /// Call [Self::flush] to force flushing to the OS. + /// + /// If, after writing the transactions, the writer's total written bytes + /// exceed [Options::max_segment_size], the current segment is flushed, + /// `fsync`ed and closed, and a new segment is created. + /// + /// # Errors + /// + /// An `Err` value is returned in the following cases: + /// + /// - if the transaction sequence is invalid, e.g. because the transaction + /// offsets are not contiguous. + /// + /// In this case, the current commit will **not** be written. + /// If the input does not fit in a single commit, _some_ commits may have + /// been written when the invalid input is encountered. + /// + /// - if the current segment needs to be rotated, and an I/O error occurs + /// flushing it to storage. + /// + /// - if creating the new segment fails due to an I/O error. + /// + /// # Panics + /// + /// The method panics if: + /// + /// - writing to the underlying [Writer] fails + /// + /// This is likely caused by some storage issue. As we cannot tell with + /// certainty how much data (if any) has been written, the internal state + /// becomes invalid and thus a panic is raised. 
+ /// + /// - if [Self::sync] panics (called when rotating segments) pub fn commit>>(&mut self, transactions: impl IntoIterator) -> io::Result<()> { self.panicked = true; let writer = &mut self.head; From de918d58d702bdf1efdf798eb72bcf65e7224c8d Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 11:19:54 +0100 Subject: [PATCH 05/21] Panic if >u16::MAX transactions Allowing to restore `Committed` return --- crates/commitlog/src/commitlog.rs | 24 ++++++---- crates/commitlog/src/segment.rs | 77 ++++++++++++++++++------------- 2 files changed, 59 insertions(+), 42 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index ce3d72f2899..9b0e8104049 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -272,12 +272,10 @@ impl Generic { impl Generic { /// Write `transactions` to the log. /// - /// This will store all transactions as a single [Commit] if possible - /// (if the iterator contains more than `u16::MAX` elements, additional - /// commits are created). + /// This will store all transactions as a single [Commit] + /// (note that `transactions` must not yield more than [u16::MAX] elements). /// - /// Data is buffered by the underlying segment [Writer], so not all data - /// submitted here may have been written to disk when this method returns. + /// Data is buffered by the underlying segment [Writer]. /// Call [Self::flush] to force flushing to the OS. /// /// If, after writing the transactions, the writer's total written bytes @@ -291,26 +289,34 @@ impl Generic { /// - if the transaction sequence is invalid, e.g. because the transaction /// offsets are not contiguous. /// - /// In this case, the current commit will **not** be written. - /// If the input does not fit in a single commit, _some_ commits may have - /// been written when the invalid input is encountered. + /// In this case, **none** of the `transactions` will be written. /// /// - if the current segment needs to be rotated, and an I/O error occurs /// flushing it to storage. /// + /// In this case, unwritten data remains buffered, and the current segment + /// remains open. Calling [Self::flush] afterwards may (or may not) + /// succeed, and calling [Self::commit] again with new data could grow + /// the segment further beyond [Options::max_segment_size] if successful. + /// + /// It is advisable to close and reopen the commitlog handle before + /// attempting further writes. + /// /// - if creating the new segment fails due to an I/O error. /// /// # Panics /// /// The method panics if: /// + /// - `transactions` exceeds [u16::MAX] elements + /// /// - writing to the underlying [Writer] fails /// /// This is likely caused by some storage issue. As we cannot tell with /// certainty how much data (if any) has been written, the internal state /// becomes invalid and thus a panic is raised. 
/// - /// - if [Self::sync] panics (called when rotating segments) + /// - [Self::sync] panics (called when rotating segments) pub fn commit>>(&mut self, transactions: impl IntoIterator) -> io::Result<()> { self.panicked = true; let writer = &mut self.head; diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index d0acb3eb8ce..c11f1b305f2 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -107,44 +107,55 @@ impl Writer { pub fn commit>, U: Encode>( &mut self, transactions: impl IntoIterator, - ) -> io::Result<()> { - let mut txs = transactions.into_iter().peekable(); - while txs.peek().is_some() { - for tx in txs.by_ref().take(u16::MAX as usize) { - let tx = tx.into(); - let expected_offset = self.commit.min_tx_offset + self.commit.n as u64; - if tx.offset != expected_offset { - self.commit.n = 0; - self.commit.records.clear(); - - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset), - )); - } - self.commit.n += 1; - tx.txdata.encode_record(&mut self.commit.records); - } + ) -> io::Result> { + for tx in transactions { + let tx = tx.into(); + let expected_offset = self.commit.min_tx_offset + self.commit.n as u64; + if tx.offset != expected_offset { + self.commit.n = 0; + self.commit.records.clear(); - let _checksum = self - .commit - .write(&mut self.inner) - .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); - let commit_len = self.commit.encoded_len() as u64; - - if let Some(index) = self.offset_index_head.as_mut() { - let _ = index - .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) - .inspect_err(|e| debug!("failed to append to offset index: {e}")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset), + )); } + // Increment `n` using checked add for error context. 
+ self.commit.n = self + .commit + .n + .checked_add(1) + .expect("maximum number of transactions in single commit exceeded"); + tx.txdata.encode_record(&mut self.commit.records); + } - self.bytes_written += commit_len; - self.commit.min_tx_offset += self.commit.n as u64; - self.commit.n = 0; - self.commit.records.clear(); + if self.commit.n == 0 { + return Ok(None); } - Ok(()) + let checksum = self + .commit + .write(&mut self.inner) + .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); + let commit_len = self.commit.encoded_len() as u64; + + if let Some(index) = self.offset_index_head.as_mut() { + let _ = index + .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) + .inspect_err(|e| debug!("failed to append to offset index: {e}")); + } + + let tx_range_start = self.commit.min_tx_offset; + + self.bytes_written += commit_len; + self.commit.min_tx_offset += self.commit.n as u64; + self.commit.n = 0; + self.commit.records.clear(); + + Ok(Some(Committed { + tx_range: tx_range_start..self.commit.min_tx_offset, + checksum, + })) } pub fn flush(&mut self) -> io::Result<()> { From f722edb90e9a8aa703750d0f8fbc3486bea9b146 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 11:39:28 +0100 Subject: [PATCH 06/21] Docs --- crates/commitlog/src/commitlog.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 9b0e8104049..9d7358d626e 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -139,6 +139,11 @@ impl Generic { /// /// Using a filesystem backend, this means to call `fsync(2)`. /// + /// **Note** that this does not flush the buffered data from calls to + /// [Self::commit], it only instructs the underlying storage to flush its + /// buffers. Call [Self::flush] prior to this method to ensure data from + /// all previous [Self::commit] calls is flushed to the underlying storage. + /// /// # Panics /// /// As an `fsync` failure leaves a file in a more of less undefined state, @@ -152,10 +157,16 @@ impl Generic { self.panicked = false; } + /// Flush the buffered data from previous calls to [Self::commit] to the + /// underlying storage. + /// + /// Call [Self::sync] to instruct the underlying storage to flush its + /// buffers as well. pub fn flush(&mut self) -> io::Result<()> { self.head.flush() } + /// Calls [Self::flush] and then [Self::sync]. fn flush_and_sync(&mut self) -> io::Result<()> { self.flush()?; self.sync(); @@ -272,16 +283,22 @@ impl Generic { impl Generic { /// Write `transactions` to the log. /// - /// This will store all transactions as a single [Commit] + /// This will store all `transactions` as a single [Commit] /// (note that `transactions` must not yield more than [u16::MAX] elements). /// /// Data is buffered by the underlying segment [Writer]. - /// Call [Self::flush] to force flushing to the OS. + /// Call [Self::flush] to force flushing to the underlying storage. /// /// If, after writing the transactions, the writer's total written bytes /// exceed [Options::max_segment_size], the current segment is flushed, /// `fsync`ed and closed, and a new segment is created. /// + /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed], + /// which contains the offset range and checksum of the commit. + /// + /// Note that supplying empty `transactions` may cause the current segment + /// to be rotated. 
+ /// /// # Errors /// /// An `Err` value is returned in the following cases: From 550aa4bf86c42fd9b374d11bdd75087df4a334fb Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 11:41:52 +0100 Subject: [PATCH 07/21] `set_epoch` doesn't need to flush --- crates/commitlog/src/commitlog.rs | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 9d7358d626e..ef33d2d5491 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -107,32 +107,21 @@ impl Generic { /// Update the current epoch. /// - /// Calls [`Self::commit`] to flush all data of the previous epoch, and - /// returns the result. - /// /// Does nothing if the given `epoch` is equal to the current epoch. /// /// # Errors /// /// If `epoch` is smaller than the current epoch, an error of kind /// [`io::ErrorKind::InvalidInput`] is returned. - /// - /// Also see [`Self::commit`]. pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> { - use std::cmp::Ordering::*; - - match epoch.cmp(&self.head.epoch()) { - Less => Err(io::Error::new( + if epoch < self.head.epoch() { + return Err(io::Error::new( io::ErrorKind::InvalidInput, "new epoch is smaller than current epoch", - )), - Equal => Ok(()), - Greater => { - self.flush()?; - self.head.set_epoch(epoch); - Ok(()) - } + )); } + self.head.set_epoch(epoch); + Ok(()) } /// Force the currently active segment to be flushed to storage. From 876f07b65285d867616bd7e55c46c9958155819c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 11:42:43 +0100 Subject: [PATCH 08/21] Return `Committed` from all `commit` methods --- crates/commitlog/src/commitlog.rs | 9 ++++++--- crates/commitlog/src/lib.rs | 5 +++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index ef33d2d5491..03105ad5bd1 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -323,17 +323,20 @@ impl Generic { /// becomes invalid and thus a panic is raised. 
/// /// - [Self::sync] panics (called when rotating segments) - pub fn commit>>(&mut self, transactions: impl IntoIterator) -> io::Result<()> { + pub fn commit>>( + &mut self, + transactions: impl IntoIterator, + ) -> io::Result> { self.panicked = true; let writer = &mut self.head; - writer.commit(transactions)?; + let committed = writer.commit(transactions)?; if writer.len() >= self.opts.max_segment_size { self.flush_and_sync()?; self.start_new_segment()?; } self.panicked = false; - Ok(()) + Ok(committed) } pub fn transactions_from<'a, D>( diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 725d501671f..5a1c543a9ee 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -17,6 +17,7 @@ pub mod segment; mod varchar; mod varint; +use crate::segment::Committed; pub use crate::{ commit::{Commit, StoredCommit}, payload::{Decoder, Encode}, @@ -370,6 +371,10 @@ impl Commitlog { impl Commitlog { pub fn commit>>(&self, transactions: impl IntoIterator) -> io::Result<()> { + pub fn commit>>( + &self, + transactions: impl IntoIterator, + ) -> io::Result> { let mut inner = self.inner.write().unwrap(); inner.commit(transactions) } From 0cf8defc692dd11f18511718f84f482c642cbb33 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 11:43:29 +0100 Subject: [PATCH 09/21] Docs --- crates/commitlog/src/lib.rs | 47 ++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 5a1c543a9ee..41cec481446 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -370,7 +370,52 @@ impl Commitlog { } impl Commitlog { - pub fn commit>>(&self, transactions: impl IntoIterator) -> io::Result<()> { + /// Write `transactions` to the log. + /// + /// This will store all `transactions` as a single [Commit] + /// (note that `transactions` must not yield more than [u16::MAX] elements). + /// + /// Data is buffered internally, call [Self::flush] to force flushing to + /// the underlying storage. + /// + /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed], + /// which contains the offset range and checksum of the commit. + /// + /// # Errors + /// + /// An `Err` value is returned in the following cases: + /// + /// - if the transaction sequence is invalid, e.g. because the transaction + /// offsets are not contiguous. + /// + /// In this case, **none** of the `transactions` will be written. + /// + /// - if the current segment needs to be rotated, and an I/O error occurs + /// flushing it to storage. + /// + /// In this case, unwritten data remains buffered, and the current segment + /// remains open. Calling [Self::flush] afterwards may (or may not) + /// succeed, and calling [Self::commit] again with new data could grow + /// the segment further beyond [Options::max_segment_size] if successful. + /// + /// It is advisable to close and reopen the commitlog handle before + /// attempting further writes. + /// + /// - if creating the new segment fails due to an I/O error. + /// + /// # Panics + /// + /// The method panics if: + /// + /// - `transactions` exceeds [u16::MAX] elements + /// + /// - writing to the underlying buffered writer fails + /// + /// This is likely caused by some storage issue. As we cannot tell with + /// certainty how much data (if any) has been written, the internal state + /// becomes invalid and thus a panic is raised. 
+ /// + /// - [Self::sync] panics (called when rotating segments) pub fn commit>>( &self, transactions: impl IntoIterator, From a50b42299f6f7d58f1b6fe63441e691e0aaef60d Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 12:10:10 +0100 Subject: [PATCH 10/21] Use assert --- crates/commitlog/src/segment.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index c11f1b305f2..8de47e5a05a 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -120,12 +120,11 @@ impl Writer { format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset), )); } - // Increment `n` using checked add for error context. - self.commit.n = self - .commit - .n - .checked_add(1) - .expect("maximum number of transactions in single commit exceeded"); + assert!( + self.commit.n < u16::MAX, + "maximum number of transactions in a single commit exceeded" + ); + self.commit.n += 1; tx.txdata.encode_record(&mut self.commit.records); } @@ -136,6 +135,9 @@ impl Writer { let checksum = self .commit .write(&mut self.inner) + // Panic here as we don't know how much of the commit has been + // written (if anything). Further commits would leave corrupted data + // in the log. .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); let commit_len = self.commit.encoded_len() as u64; From fd274774a4410f7bbc67044e41ec3c36a9eb0c6f Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 16:12:15 +0100 Subject: [PATCH 11/21] Restore the commit corruption after ENOSPC test --- crates/commitlog/src/commitlog.rs | 16 +--- crates/commitlog/src/lib.rs | 13 +-- crates/commitlog/src/tests/partial.rs | 125 ++++++++++++++++++++++++-- 3 files changed, 123 insertions(+), 31 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03105ad5bd1..3d6df97d7c2 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -297,17 +297,6 @@ impl Generic { /// /// In this case, **none** of the `transactions` will be written. /// - /// - if the current segment needs to be rotated, and an I/O error occurs - /// flushing it to storage. - /// - /// In this case, unwritten data remains buffered, and the current segment - /// remains open. Calling [Self::flush] afterwards may (or may not) - /// succeed, and calling [Self::commit] again with new data could grow - /// the segment further beyond [Options::max_segment_size] if successful. - /// - /// It is advisable to close and reopen the commitlog handle before - /// attempting further writes. - /// /// - if creating the new segment fails due to an I/O error. /// /// # Panics @@ -316,7 +305,7 @@ impl Generic { /// /// - `transactions` exceeds [u16::MAX] elements /// - /// - writing to the underlying [Writer] fails + /// - [Self::flush] or writing to the underlying [Writer] fails /// /// This is likely caused by some storage issue. 
As we cannot tell with /// certainty how much data (if any) has been written, the internal state @@ -331,7 +320,8 @@ impl Generic { let writer = &mut self.head; let committed = writer.commit(transactions)?; if writer.len() >= self.opts.max_segment_size { - self.flush_and_sync()?; + self.flush().expect("failed to flush segment"); + self.sync(); self.start_new_segment()?; } self.panicked = false; diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 41cec481446..a830928ce5a 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -390,17 +390,6 @@ impl Commitlog { /// /// In this case, **none** of the `transactions` will be written. /// - /// - if the current segment needs to be rotated, and an I/O error occurs - /// flushing it to storage. - /// - /// In this case, unwritten data remains buffered, and the current segment - /// remains open. Calling [Self::flush] afterwards may (or may not) - /// succeed, and calling [Self::commit] again with new data could grow - /// the segment further beyond [Options::max_segment_size] if successful. - /// - /// It is advisable to close and reopen the commitlog handle before - /// attempting further writes. - /// /// - if creating the new segment fails due to an I/O error. /// /// # Panics @@ -409,7 +398,7 @@ impl Commitlog { /// /// - `transactions` exceeds [u16::MAX] elements /// - /// - writing to the underlying buffered writer fails + /// - [Self::flush] or writing to the underlying buffered writer fails /// /// This is likely caused by some storage issue. As we cannot tell with /// certainty how much data (if any) has been written, the internal state diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index ebda9e29532..11c22fb08cd 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -3,28 +3,131 @@ use std::{ fmt::{self, Debug}, io::{self, Seek as _, SeekFrom}, iter, + ops::Range, }; -use log::debug; +use log::{debug, info}; use pretty_assertions::assert_matches; use crate::{ - commitlog, + commitlog, payload, repo::{self, Repo, SegmentLen}, segment::{self, FileLike}, tests::helpers::{enable_logging, fill_log_with}, - Options, + Commit, Options, TxOffset, DEFAULT_LOG_FORMAT_VERSION, }; #[test] -#[should_panic] -fn panics_on_enospc() { +#[should_panic(expected = "failed to flush segment")] +fn panics_on_partial_write() { enable_logging(); let mut log = open_log::<[u8; 32]>(ShortMem::new(800)); - for i in 0..100 { + for i in 0..20 { + info!("commit {i}"); + log.commit([(i, [b'z'; 32])]).expect("unexpected `Err` result"); + } +} + +fn fill_log(mut log: commitlog::Generic, range: Range) { + debug!("writing range {range:?}"); + + let end = range.end; + for i in range { + info!("commit {i}"); log.commit([(i, [b'z'; 32])]).unwrap(); } + log.flush().unwrap(); + + // Try to write one more, which should fail. + log.commit([(end, [b'x'; 32])]).unwrap(); + assert_matches!( + log.flush(), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); +} +/// Tests that, when a partial write occurs, we can read all flushed commits +/// up until the faulty one. 
+#[test] +fn read_log_up_to_partial_write() { + enable_logging(); + + const MAX_SEGMENT_SIZE: usize = 800; + const TXDATA_SIZE: usize = 32; + const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE; + const TOTAL_TXS: usize = MAX_SEGMENT_SIZE / COMMIT_SIZE; + + let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64); + fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..(TOTAL_TXS as u64)); + + let txs = commitlog::transactions_from( + repo, + DEFAULT_LOG_FORMAT_VERSION, + 0, + &payload::ArrayDecoder::, + ) + .unwrap() + .map(Result::unwrap) + .count(); + + assert_eq!(txs, TOTAL_TXS,); +} + +/// Tests: +/// +/// - fill log until a partial write occurs +/// - corrupt the last successfully written commit +/// - fill log until a partial write occurs +/// +/// The log should detect the corrupt commit, create a fresh segment, and write +/// the second batch until ENOSPC. Traversal should work. +#[test] +fn reopen_with_corrupt_last_commit() { + enable_logging(); + + const MAX_SEGMENT_SIZE: usize = 800; + const TXDATA_SIZE: usize = 32; + const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE; + const TXS_PER_SEGMENT: u64 = (MAX_SEGMENT_SIZE / COMMIT_SIZE) as u64; + const TOTAL_TXS: u64 = (TXS_PER_SEGMENT * 2) - 1; + + let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64); + + // Fill with as many txs as possible until ENOSPC. + fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..TXS_PER_SEGMENT); + + // Invalidate the checksum of the last commit. + let last_segment_offset = repo.existing_offsets().unwrap().last().copied().unwrap(); + let last_commit: Commit = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, last_segment_offset) + .unwrap() + .commits() + .map(Result::unwrap) + .last() + .unwrap() + .into(); + { + let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap(); + let pos = last_segment.len() - last_commit.encoded_len() + 1; + last_segment.modify_byte_at(pos, |_| 255); + } + + // Write a second batch, starting with the offset of the corrupt commit. + fill_log( + open_log::<[u8; TXDATA_SIZE]>(repo.clone()), + last_commit.min_tx_offset..TOTAL_TXS, + ); + + let txs = commitlog::transactions_from( + repo, + DEFAULT_LOG_FORMAT_VERSION, + 0, + &payload::ArrayDecoder::, + ) + .unwrap() + .map(Result::unwrap) + .count(); + + assert_eq!(txs as u64, TOTAL_TXS); } /// Edge case surfaced in production: @@ -82,6 +185,16 @@ struct ShortSegment { max_len: u64, } +impl ShortSegment { + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { + self.inner.modify_byte_at(pos, f); + } +} + impl SegmentLen for ShortSegment { fn segment_len(&mut self) -> io::Result { self.inner.segment_len() From f37fad3f77b0a260b608f17439f68c8eadc6bdd8 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 17:17:48 +0100 Subject: [PATCH 12/21] Add TODO --- crates/durability/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 43c236a6388..d4ca740e36a 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -115,6 +115,9 @@ pub trait Durability: Send + Sync { /// cannot be made durable immediately. /// /// Errors may be signalled by panicking. + // + // TODO(perf): Can we avoid allocating a new `Box<[_]>` for every commit, + // or at least reuse boxes? fn commit(&self, txs: Box<[Transaction]>); /// Obtain a handle to the [DurableOffset]. 
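For context on the `TODO(perf)` just added: up to this point in the series every durable write goes through `Durability::commit`, which takes a boxed slice, so even a single transaction costs a fresh `Box<[Transaction<_>]>` allocation. A rough sketch of the datastore-side call as it stands here (illustrative only; the shape follows the call site in crates/core/src/db/durability.rs, `Txdata` is the core crate's payload type, and the function name and imports are assumptions):

    // Illustrative only. `Durability` and `Transaction` are the trait and struct
    // from crates/durability; `Txdata` stands in for the core crate's payload type.
    use spacetimedb_durability::{Durability, Transaction};

    fn persist<D: Durability<TxData = Txdata>>(durability: &D, tx_offset: u64, txdata: Txdata) {
        // A one-element batch still goes through `[_; 1].into()`, which allocates
        // a new `Box<[Transaction<Txdata>]>` for every committed transaction; that
        // per-commit allocation is what the TODO above points at.
        durability.commit(
            [Transaction {
                offset: tx_offset,
                txdata,
            }]
            .into(),
        );
    }

Patch 14 below removes the allocation by reverting to a single-transaction `append_tx(Transaction)` method.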
From 4a16bf5f8447fb0103f4503307a07758af13663b Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 17:22:02 +0100 Subject: [PATCH 13/21] Fix fallocate tests --- crates/durability/tests/io/fallocate.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index 5dc67dd13d3..4a484c42823 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -98,8 +98,8 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { // Mark initial segment as seen. new_segment_rx.borrow_and_update(); // Write past available space. - for _ in 0..256 { - durability.append_tx(txdata.clone()); + for offset in 0..256 { + durability.commit([(offset, txdata.clone()).into()].into()); } // Ensure new segment is created. new_segment_rx.changed().await?; @@ -107,7 +107,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { sleep(Duration::from_millis(5)).await; // Durability actor should have crashed, so this should panic. info!("trying append on crashed durability"); - durability.append_tx(txdata.clone()); + durability.commit([(256, txdata.clone()).into()].into()); } Ok(()) @@ -168,7 +168,6 @@ async fn local_durability( spacetimedb_durability::local::Options { commitlog: spacetimedb_commitlog::Options { max_segment_size, - max_records_in_commit: 1.try_into().unwrap(), preallocate_segments: true, ..<_>::default() }, From 0ad5bdf35d3d709278cde23b35a5c7b129bc3f69 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 29 Jan 2026 09:46:19 +0100 Subject: [PATCH 14/21] Revert durability trait changes We don't really need batched transactions at the moment, so avoid the boxed array allocation. Durability::append_tx takes a `Transaction`, though, requiring the offset to be supplied by the datastore. --- crates/core/src/db/durability.rs | 18 ++++++------------ .../subscription/module_subscription_actor.rs | 4 ++-- crates/durability/src/imp/local.rs | 16 ++++++++-------- crates/durability/src/imp/mod.rs | 2 +- crates/durability/src/lib.rs | 7 ++----- 5 files changed, 19 insertions(+), 28 deletions(-) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 1476b56edbf..06129f33f6f 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -253,13 +253,10 @@ impl DurabilityWorkerActor { }; // This does not block, as per trait docs. 
- durability.commit( - [Transaction { - offset: tx_offset, - txdata, - }] - .into(), - ); + durability.append_tx(Transaction { + offset: tx_offset, + txdata, + }); } } @@ -297,12 +294,9 @@ mod tests { impl spacetimedb_durability::Durability for CountingDurability { type TxData = Txdata; - fn commit(&self, txs: Box<[Transaction]>) { - let Some(max_offset) = txs.iter().map(|x| x.offset).max() else { - return; - }; + fn append_tx(&self, tx: Transaction) { self.appended.send_modify(|offset| { - offset.replace(max_offset); + offset.replace(tx.offset); }); } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 5d2c494cba2..7de8b2f3930 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1551,9 +1551,9 @@ mod tests { impl Durability for ManualDurability { type TxData = Txdata; - fn commit(&self, txs: Box<[Transaction]>) { + fn append_tx(&self, tx: Transaction) { let mut commitlog = self.commitlog.write().unwrap(); - commitlog.commit(txs).expect("commit failed"); + commitlog.commit([tx]).expect("commit failed"); commitlog.flush().expect("error flushing commitlog"); } diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index f0c6daf474f..04093c7e730 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -78,7 +78,7 @@ pub struct Local { /// [`PersisterTask`]. /// /// Note that this is unbounded! - queue: mpsc::UnboundedSender>]>>, + queue: mpsc::UnboundedSender>>, /// How many transactions are sitting in the `queue`. /// /// This is mainly for observability purposes, and can thus be updated with @@ -197,7 +197,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut commits_rx: mpsc::UnboundedReceiver>]>>, + mut transactions_rx: mpsc::UnboundedReceiver>>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -220,7 +220,7 @@ impl Actor { biased; Some(reply) = shutdown_rx.recv() => { - commits_rx.close(); + transactions_rx.close(); let _ = reply.send(self.lock.notified()); }, @@ -231,13 +231,13 @@ impl Actor { } }, - commit = commits_rx.recv() => { - let Some(commit) = commit else { + tx = transactions_rx.recv() => { + let Some(tx) = tx else { break; }; self.queue_depth.fetch_sub(1, Relaxed); let clog = self.clog.clone(); - spawn_blocking(move || clog.commit(commit)) + spawn_blocking(move || clog.commit([tx])) .await .expect("commitlog write panicked") .expect("commitlog write failed"); @@ -312,8 +312,8 @@ impl Drop for Lock { impl Durability for Local { type TxData = Txdata; - fn commit(&self, txs: Box<[Transaction]>) { - self.queue.send(txs).expect("durability actor crashed"); + fn append_tx(&self, tx: Transaction) { + self.queue.send(tx).expect("durability actor crashed"); self.queue_depth.fetch_add(1, Relaxed); } diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 562b9b56eab..1636e05cc51 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -41,7 +41,7 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn commit(&self, _: Box<[Transaction]>) { + fn append_tx(&self, _: Transaction) { if self.closed.load(Ordering::Relaxed) { panic!("`close` was called on this `NoDurability` instance"); } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index d4ca740e36a..76479c88943 100644 --- 
a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -109,16 +109,13 @@ pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; - /// Submit a batch of [Transaction]s to be made durable. + /// Submit a [Transaction] to be made durable. /// /// This method must never block, and accept new transactions even if they /// cannot be made durable immediately. /// /// Errors may be signalled by panicking. - // - // TODO(perf): Can we avoid allocating a new `Box<[_]>` for every commit, - // or at least reuse boxes? - fn commit(&self, txs: Box<[Transaction]>); + fn append_tx(&self, tx: Transaction); /// Obtain a handle to the [DurableOffset]. fn durable_tx_offset(&self) -> DurableOffset; From c31efd0c2517d41ec2aa4faab95877ce8a5fb27a Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 29 Jan 2026 19:50:27 +0100 Subject: [PATCH 15/21] Make default sync interval much smaller --- crates/durability/src/imp/local.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 04093c7e730..3dbc274846a 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -32,7 +32,7 @@ pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; pub struct Options { /// Periodically flush and sync the log this often. /// - /// Default: 500ms + /// Default: 50ms pub sync_interval: Duration, /// [`Commitlog`] configuration. pub commitlog: spacetimedb_commitlog::Options, @@ -41,7 +41,7 @@ pub struct Options { impl Default for Options { fn default() -> Self { Self { - sync_interval: Duration::from_millis(500), + sync_interval: Duration::from_millis(50), commitlog: Default::default(), } } From a582f67eb1d1c2235ebc835ab96b0c21801af077 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 30 Jan 2026 10:40:21 +0100 Subject: [PATCH 16/21] Add option to flush after each tx (previous behaviour) --- crates/durability/src/imp/local.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 3dbc274846a..c9397ca0fe3 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -34,6 +34,10 @@ pub struct Options { /// /// Default: 50ms pub sync_interval: Duration, + /// If `true`, flush (but not sync) each transaction. + /// + /// Default: false + pub flush_each_tx: bool, /// [`Commitlog`] configuration. 
pub commitlog: spacetimedb_commitlog::Options, } @@ -42,6 +46,7 @@ impl Default for Options { fn default() -> Self { Self { sync_interval: Duration::from_millis(50), + flush_each_tx: false, commitlog: Default::default(), } } @@ -130,6 +135,7 @@ impl Local { queue_depth: queue_depth.clone(), sync_interval: opts.sync_interval, + flush_each_tx: opts.flush_each_tx, lock, } @@ -188,6 +194,7 @@ struct Actor { queue_depth: Arc, sync_interval: Duration, + flush_each_tx: bool, #[allow(unused)] lock: Lock, @@ -237,10 +244,18 @@ impl Actor { }; self.queue_depth.fetch_sub(1, Relaxed); let clog = self.clog.clone(); - spawn_blocking(move || clog.commit([tx])) - .await - .expect("commitlog write panicked") - .expect("commitlog write failed"); + let flush = self.flush_each_tx; + spawn_blocking(move || -> io::Result<()> { + clog.commit([tx])?; + if flush { + clog.flush()?; + } + + Ok(()) + }) + .await + .expect("commitlog write panicked") + .expect("commitlog write failed"); }, } } From 88664fd5d4fa913c004424d2a4851c1e451d77b1 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 10 Feb 2026 09:50:52 +0100 Subject: [PATCH 17/21] Small touchups --- crates/commitlog/src/commitlog.rs | 6 +++--- crates/durability/src/lib.rs | 18 ++++++++++++------ crates/durability/tests/io/fallocate.rs | 4 ++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 3d6df97d7c2..7ac339c5ae5 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -8,7 +8,7 @@ use std::{ }; use itertools::Itertools; -use log::{debug, info, trace, warn}; +use log::{debug, error, info, trace, warn}; use crate::{ commit::StoredCommit, @@ -320,7 +320,7 @@ impl Generic { let writer = &mut self.head; let committed = writer.commit(transactions)?; if writer.len() >= self.opts.max_segment_size { - self.flush().expect("failed to flush segment"); + self.flush().expect("failed to flush segment upon rotation"); self.sync(); self.start_new_segment()?; } @@ -371,7 +371,7 @@ impl Drop for Generic { fn drop(&mut self) { if !self.panicked { if let Err(e) = self.flush_and_sync() { - warn!("failed to flush on drop: {e:#}"); + error!("failed to flush on drop: {e:#}"); } } } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 76479c88943..7722d86914a 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -95,12 +95,8 @@ pub type Close = BoxFuture<'static, Option>; /// /// NOTE: This is a preliminary definition, still under consideration. /// -/// A durability implementation accepts one or more [Transaction]s to be made -/// durable via [Durability::commit] in a non-blocking fashion. -/// -/// A batch of transactions is eventually made durable atomically. -/// Note that this means that a torn write can render the whole batch -/// inaccessible, so small batches are usually preferable. +/// A durability implementation accepts a [Transaction] to be made durable via +/// the [Durability::append_tx] method in a non-blocking fashion. /// /// Once a transaction becomes durable, the [DurableOffset] is updated. /// What durable means depends on the implementation, informally it can be @@ -115,6 +111,16 @@ pub trait Durability: Send + Sync { /// cannot be made durable immediately. /// /// Errors may be signalled by panicking. + // + // TODO: Support batches of txs, i.e. commits. 
+ // + // The commitlog supports this, but allocation overhead in the durability + // API is too high given we don't make any use of it. + // + // We don't make any use of it because a commit is an atomic unit of storage + // (i.e. a torn write will corrupt all transactions contained in it), and it + // is very unclear when it is both correct and beneficial to bundle more + // than a single transaction into a commit. fn append_tx(&self, tx: Transaction); /// Obtain a handle to the [DurableOffset]. diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index 4a484c42823..be08d3f6a4d 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -99,7 +99,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { new_segment_rx.borrow_and_update(); // Write past available space. for offset in 0..256 { - durability.commit([(offset, txdata.clone()).into()].into()); + durability.append_tx((offset, txdata.clone()).into()); } // Ensure new segment is created. new_segment_rx.changed().await?; @@ -107,7 +107,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { sleep(Duration::from_millis(5)).await; // Durability actor should have crashed, so this should panic. info!("trying append on crashed durability"); - durability.commit([(256, txdata.clone()).into()].into()); + durability.append_tx((256, txdata.clone()).into()); } Ok(()) From f52c13aba06f2d8d1ad16557df90affd35008dd4 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 10 Feb 2026 12:09:26 +0100 Subject: [PATCH 18/21] Expose config of write buffer size --- crates/commitlog/src/lib.rs | 12 ++++++++++++ crates/commitlog/src/repo/mod.rs | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index a830928ce5a..efdbb145e41 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -89,6 +89,12 @@ pub struct Options { /// Has no effect if the `fallocate` feature is not enabled. #[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))] pub preallocate_segments: bool, + /// Size in bytes of the memory buffer holding commit data before flushing + /// to storage. + /// + /// Default: 4KiB + #[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))] + pub write_buffer_size: usize, } impl Default for Options { @@ -102,6 +108,7 @@ impl Options { pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; + pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 4 * 1024; pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, @@ -109,6 +116,7 @@ impl Options { offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), preallocate_segments: Self::default_preallocate_segments(), + write_buffer_size: Self::default_write_buffer_size(), }; pub const fn default_log_format_version() -> u8 { @@ -131,6 +139,10 @@ impl Options { Self::DEFAULT_PREALLOCATE_SEGMENTS } + pub const fn default_write_buffer_size() -> usize { + Self::DEFAULT_WRITE_BUFFER_SIZE + } + /// Compute the length in bytes of an offset index based on the settings in /// `self`. 
pub fn offset_index_len(&self) -> u64 { diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 2ab5579c8b2..1805e788703 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -214,7 +214,7 @@ pub fn create_segment_writer( records: Vec::new(), epoch, }, - inner: io::BufWriter::new(storage), + inner: io::BufWriter::with_capacity(opts.write_buffer_size, storage), min_tx_offset: offset, bytes_written: Header::LEN as u64, From 33abccd56cf27a4c8c36611559bdb4886fd87cc4 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 24 Feb 2026 08:16:20 +0100 Subject: [PATCH 19/21] Bump write buffer size to 8KiB --- crates/commitlog/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index bb6f389d672..d421c091513 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -93,7 +93,7 @@ pub struct Options { /// Size in bytes of the memory buffer holding commit data before flushing /// to storage. /// - /// Default: 4KiB + /// Default: 8KiB #[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))] pub write_buffer_size: usize, } @@ -109,7 +109,7 @@ impl Options { pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; - pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 4 * 1024; + pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 8 * 1024; pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, From d6e2ba51d6407d7897595e124f32e9ac9debcbee Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 27 Feb 2026 20:24:57 +0100 Subject: [PATCH 20/21] durability: Flush batches (#4478) Instead of periodic flush + sync, the simplified commitlog buffering makes it easy to just pop a batch of transaction from the queue, commit them, and then flush + sync after each batch. This may lead to an overall higher number of fsyncs on the system, but also adapts to the tx throughput of the individual database (up to the batch size). After taking some measurements, we may want to make the batch size configurable at runtime. Depends-on: #4404 --- crates/durability/src/imp/local.rs | 72 +++++++++++++++--------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index c9397ca0fe3..dc2889c41d2 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,11 +1,11 @@ use std::{ io, + num::NonZeroUsize, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, Arc, }, - time::Duration, }; use futures::{FutureExt as _, TryFutureExt as _}; @@ -19,7 +19,6 @@ use thiserror::Error; use tokio::{ sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify}, task::{spawn_blocking, AbortHandle}, - time::{interval, MissedTickBehavior}, }; use tracing::{instrument, Span}; @@ -30,23 +29,29 @@ pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; /// [`Local`] configuration. #[derive(Clone, Copy, Debug)] pub struct Options { - /// Periodically flush and sync the log this often. + /// The number of elements to reserve for batching transactions. /// - /// Default: 50ms - pub sync_interval: Duration, - /// If `true`, flush (but not sync) each transaction. 
+ /// This puts an upper bound on the buffer capacity, while not preventing + /// reallocations when the number of queued transactions exceeds it. /// - /// Default: false - pub flush_each_tx: bool, + /// In other words, the durability actor will attempt to receive all + /// transactions that are currently in the queue, but shrink the buffer to + /// `batch_capacity` if it had to make additional space during a burst. + /// + /// Default: 4096 + pub batch_capacity: NonZeroUsize, /// [`Commitlog`] configuration. pub commitlog: spacetimedb_commitlog::Options, } +impl Options { + pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap(); +} + impl Default for Options { fn default() -> Self { Self { - sync_interval: Duration::from_millis(50), - flush_each_tx: false, + batch_capacity: Self::DEFAULT_BATCH_CAPACITY, commitlog: Default::default(), } } @@ -134,8 +139,7 @@ impl Local { durable_offset: durable_tx, queue_depth: queue_depth.clone(), - sync_interval: opts.sync_interval, - flush_each_tx: opts.flush_each_tx, + batch_capacity: opts.batch_capacity, lock, } @@ -193,8 +197,7 @@ struct Actor { durable_offset: watch::Sender>, queue_depth: Arc, - sync_interval: Duration, - flush_each_tx: bool, + batch_capacity: NonZeroUsize, #[allow(unused)] lock: Lock, @@ -209,8 +212,7 @@ impl Actor { ) { info!("starting durability actor"); - let mut sync_interval = interval(self.sync_interval); - sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + let mut tx_buf = Vec::with_capacity(self.batch_capacity.get()); // `flush_and_sync` when the loop exits without panicking, // or `flush_and_sync` inside the loop failed. let mut sync_on_exit = true; @@ -220,10 +222,6 @@ impl Actor { // Biased towards the shutdown channel, // so that we stop accepting new data promptly after // `Durability::close` was called. - // - // Note that periodic `flush_and_sync` needs to be polled before - // the txdata channel, so that we don't delay `fsync(2)` under - // high transaction throughput. biased; Some(reply) = shutdown_rx.recv() => { @@ -231,31 +229,31 @@ impl Actor { let _ = reply.send(self.lock.notified()); }, - _ = sync_interval.tick() => { - if self.flush_and_sync().await.is_err() { - sync_on_exit = false; + // Pop as many elements from the channel as possible, + // potentially requiring the `tx_buf` to allocate additional + // capacity. + // We'll reclaim capacity in excess of `self.batch_size` below. + n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => { + if n == 0 { break; } - }, - - tx = transactions_rx.recv() => { - let Some(tx) = tx else { - break; - }; - self.queue_depth.fetch_sub(1, Relaxed); + self.queue_depth.fetch_sub(n as u64, Relaxed); let clog = self.clog.clone(); - let flush = self.flush_each_tx; - spawn_blocking(move || -> io::Result<()> { - clog.commit([tx])?; - if flush { - clog.flush()?; + tx_buf = spawn_blocking(move || -> io::Result>>> { + for tx in tx_buf.drain(..) { + clog.commit([tx])?; } - - Ok(()) + Ok(tx_buf) }) .await .expect("commitlog write panicked") .expect("commitlog write failed"); + if self.flush_and_sync().await.is_err() { + sync_on_exit = false; + break; + } + // Reclaim burst capacity. 
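+                    // `Vec::shrink_to` keeps capacity at or above the requested bound
+                    // and is a no-op when the buffer never grew past it, so this only
+                    // costs a reallocation after a genuine burst.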
+ tx_buf.shrink_to(self.batch_capacity.get()); }, } } From a88f68ce06d21c0aafff15fb522e19d32ef3d221 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 2 Mar 2026 08:28:10 +0100 Subject: [PATCH 21/21] Address review comment https://github.com/clockworklabs/SpacetimeDB/pull/4478#discussion_r2865182289 --- crates/durability/src/imp/local.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index dc2889c41d2..5d6dac7b0ba 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -253,7 +253,9 @@ impl Actor { break; } // Reclaim burst capacity. - tx_buf.shrink_to(self.batch_capacity.get()); + if n < self.batch_capacity.get() { + tx_buf.shrink_to(self.batch_capacity.get()); + } }, } }
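Taken as a whole, the series leaves the commitlog write path split into three explicit steps: `commit` buffers a single commit, `flush` pushes the buffered bytes to the underlying storage, and `sync` asks that storage to persist them. A rough usage sketch follows; it is illustrative only, assuming the public `Commitlog` handle is parameterised over the payload type as in the test helpers, and the function name, the `[u8; 32]` payload, and the access to `Committed::tx_range` are assumptions based on the patches above:

    use spacetimedb_commitlog::Commitlog;

    fn persist_one(log: &Commitlog<[u8; 32]>, tx_offset: u64) -> std::io::Result<()> {
        // `commit` only appends to the in-memory commit buffer. Offsets must be
        // contiguous with what was written before; an empty batch returns Ok(None).
        if let Some(committed) = log.commit([(tx_offset, [0u8; 32])])? {
            // Half-open range of transaction offsets covered by this commit.
            debug_assert_eq!(committed.tx_range, tx_offset..tx_offset + 1);
        }
        // Hand the buffered bytes to the backing storage...
        log.flush()?;
        // ...and have that storage persist them (fsync(2) on filesystem backends;
        // panics on failure, as documented above).
        log.sync();
        Ok(())
    }

The durability actor in crates/durability/src/imp/local.rs follows the same commit/flush/sync order, but drains a whole batch from its queue before each flush-and-sync.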