Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
06f9c2e
Append commit instead of individual transactions to commitlog
kim Jan 7, 2026
a0de7d9
Restore some commentary
kim Jan 27, 2026
e73abcc
Clear commit before returning error
kim Jan 28, 2026
d5b29cc
More commentary
kim Jan 28, 2026
de918d5
Panic if >u16::MAX transactions
kim Jan 28, 2026
f722edb
Docs
kim Jan 28, 2026
550aa4b
`set_epoch` doesn't need to flush
kim Jan 28, 2026
876f07b
Return `Committed` from all `commit` methods
kim Jan 28, 2026
0cf8def
Docs
kim Jan 28, 2026
a50b422
Use assert
kim Jan 28, 2026
fd27477
Restore the commit corruption after ENOSPC test
kim Jan 28, 2026
f37fad3
Add TODO
kim Jan 28, 2026
4a16bf5
Fix fallocate tests
kim Jan 28, 2026
0ad5bdf
Revert durability trait changes
kim Jan 29, 2026
6450e54
Merge remote-tracking branch 'origin/master' into kim/commitlog/appen…
kim Jan 29, 2026
c31efd0
Make default sync interval much smaller
kim Jan 29, 2026
a582f67
Add option to flush after each tx (previous behaviour)
kim Jan 30, 2026
f26a404
Merge remote-tracking branch 'origin/master' into HEAD
kim Jan 30, 2026
4ee7faf
Merge master
kim Feb 10, 2026
88664fd
Small touchups
kim Feb 10, 2026
f52c13a
Expose config of write buffer size
kim Feb 10, 2026
5d0f67c
Merge master
kim Feb 10, 2026
8ac99af
Merge master
kim Feb 23, 2026
33abccd
Bump write buffer size to 8KiB
kim Feb 24, 2026
c65f481
Merge master
kim Feb 25, 2026
d6e2ba5
durability: Flush batches (#4478)
kim Feb 27, 2026
a5cc304
Merge master into kim/commitlog/append-commit-reopen
kim Feb 28, 2026
a88f68c
Address review comment https://github.com/clockworklabs/SpacetimeDB/p…
kim Mar 2, 2026
2769cb6
Merge branch 'master' into kim/commitlog/append-commit-reopen
kim Mar 3, 2026
9e0d576
Merge master into kim/commitlog/append-commit-reopen
kim Mar 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 154 additions & 118 deletions crates/commitlog/src/commitlog.rs

Large diffs are not rendered by default.

105 changes: 47 additions & 58 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
io,
num::{NonZeroU16, NonZeroU64},
num::NonZeroU64,
ops::RangeBounds,
sync::{Arc, RwLock},
};
Expand All @@ -17,11 +17,12 @@ pub mod segment;
mod varchar;
mod varint;

use crate::segment::Committed;
pub use crate::{
commit::{Commit, StoredCommit},
commitlog::CommittedMeta,
payload::{Decoder, Encode},
repo::fs::SizeOnDisk,
repo::{fs::SizeOnDisk, TxOffset},
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
varchar::Varchar,
};
Expand Down Expand Up @@ -58,14 +59,6 @@ pub struct Options {
/// Default: 1GiB
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
pub max_segment_size: u64,
/// The maximum number of records in a commit.
///
/// If this number is exceeded, the commit is flushed to disk even without
/// explicitly calling [`Commitlog::flush`].
///
/// Default: 1
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
pub max_records_in_commit: NonZeroU16,
/// Whenever at least this many bytes have been written to the currently
/// active segment, an entry is added to its offset index.
///
Expand Down Expand Up @@ -97,6 +90,12 @@ pub struct Options {
/// Has no effect if the `fallocate` feature is not enabled.
#[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))]
pub preallocate_segments: bool,
/// Size in bytes of the memory buffer holding commit data before flushing
/// to storage.
///
/// Default: 8KiB
#[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))]
pub write_buffer_size: usize,
}

impl Default for Options {
Expand All @@ -107,18 +106,18 @@ impl Default for Options {

impl Options {
pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 8 * 1024;

pub const DEFAULT: Self = Self {
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
max_segment_size: Self::default_max_segment_size(),
max_records_in_commit: Self::default_max_records_in_commit(),
offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
preallocate_segments: Self::default_preallocate_segments(),
write_buffer_size: Self::default_write_buffer_size(),
};

pub const fn default_log_format_version() -> u8 {
Expand All @@ -129,10 +128,6 @@ impl Options {
Self::DEFAULT_MAX_SEGMENT_SIZE
}

pub const fn default_max_records_in_commit() -> NonZeroU16 {
Self::DEFAULT_MAX_RECORDS_IN_COMMIT
}

/// Returns the default value for the `offset_index_interval_bytes` option,
/// i.e. [`Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES`] (4096 bytes).
///
/// Being a `const fn`, it is used to initialize [`Self::DEFAULT`].
pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
}
Expand All @@ -145,6 +140,10 @@ impl Options {
Self::DEFAULT_PREALLOCATE_SEGMENTS
}

/// Returns the default value for [`Options::write_buffer_size`],
/// i.e. [`Self::DEFAULT_WRITE_BUFFER_SIZE`] (8KiB).
///
/// Referenced by name as the `serde` default of the field, and used to
/// initialize [`Self::DEFAULT`].
pub const fn default_write_buffer_size() -> usize {
Self::DEFAULT_WRITE_BUFFER_SIZE
}

/// Compute the length in bytes of an offset index based on the settings in
/// `self`.
pub fn offset_index_len(&self) -> u64 {
Expand Down Expand Up @@ -263,7 +262,7 @@ impl<T> Commitlog<T> {
pub fn flush(&self) -> io::Result<Option<u64>> {
let mut inner = self.inner.write().unwrap();
trace!("flush commitlog");
inner.commit()?;
inner.flush()?;

Ok(inner.max_committed_offset())
}
Expand All @@ -283,7 +282,7 @@ impl<T> Commitlog<T> {
pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
let mut inner = self.inner.write().unwrap();
trace!("flush and sync commitlog");
inner.commit()?;
inner.flush()?;
inner.sync();

Ok(inner.max_committed_offset())
Expand Down Expand Up @@ -384,57 +383,47 @@ impl<T> Commitlog<T> {
}

impl<T: Encode> Commitlog<T> {
/// Append the record `txdata` to the log.
/// Write `transactions` to the log.
///
/// If the internal buffer exceeds [`Options::max_records_in_commit`], the
/// argument is returned in an `Err`. The caller should [`Self::flush`] the
/// log and try again.
/// This will store all `transactions` as a single [Commit]
/// (note that `transactions` must not yield more than [u16::MAX] elements).
///
/// In case the log is appended to from multiple threads, this may result in
/// a busy loop trying to acquire a slot in the buffer. In such scenarios,
/// [`Self::append_maybe_flush`] is preferable.
pub fn append(&self, txdata: T) -> Result<(), T> {
let mut inner = self.inner.write().unwrap();
inner.append(txdata)
}

/// Append the record `txdata` to the log.
/// Data is buffered internally; call [Self::flush] to force flushing to
/// the underlying storage.
///
/// Returns `Ok(None)` if `transactions` was empty, otherwise `Ok(Some(_))`
/// holding a [Committed], which contains the offset range and checksum of
/// the commit.
///
/// The `txdata` payload is buffered in memory until either:
/// # Errors
///
/// - [`Self::flush`] is called explicitly, or
/// - [`Options::max_records_in_commit`] is exceeded
/// An `Err` value is returned in the following cases:
///
/// In the latter case, [`Self::append`] flushes implicitly, _before_
/// appending the `txdata` argument.
/// - if the transaction sequence is invalid, e.g. because the transaction
/// offsets are not contiguous.
///
/// I.e. the argument is not guaranteed to be flushed after the method
/// returns. If that is desired, [`Self::flush`] must be called explicitly.
/// In this case, **none** of the `transactions` will be written.
///
/// If writing `txdata` to the commitlog results in a new segment file being opened,
/// we will send a message down `on_new_segment`.
/// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`.
/// - if creating the new segment fails due to an I/O error.
///
/// # Errors
/// # Panics
///
/// The method panics if:
///
/// - `transactions` yields more than [u16::MAX] elements
///
/// - [Self::flush] or writing to the underlying buffered writer fails
///
/// If the log needs to be flushed, but an I/O error occurs, ownership of
/// `txdata` is returned back to the caller alongside the [`io::Error`].
/// This is likely caused by some storage issue. As we cannot tell with
/// certainty how much data (if any) has been written, the internal state
/// becomes invalid and thus a panic is raised.
///
/// The value can then be used to retry appending.
pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
/// - [Self::sync] panics (called when rotating segments)
pub fn commit<U: Into<Transaction<T>>>(
&self,
transactions: impl IntoIterator<Item = U>,
) -> io::Result<Option<Committed>> {
let mut inner = self.inner.write().unwrap();

if let Err(txdata) = inner.append(txdata) {
if let Err(source) = inner.commit() {
return Err(error::Append { txdata, source });
}

// `inner.commit.n` must be zero at this point
let res = inner.append(txdata);
debug_assert!(res.is_ok(), "failed to append while holding write lock");
}

Ok(())
inner.commit(transactions)
}

/// Obtain an iterator which traverses the log from the start, yielding
Expand Down
6 changes: 1 addition & 5 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,11 @@ pub fn create_segment_writer<R: Repo>(
records: Vec::new(),
epoch,
},
inner: io::BufWriter::new(storage),
inner: io::BufWriter::with_capacity(opts.write_buffer_size, storage),

min_tx_offset: offset,
bytes_written: Header::LEN as u64,

max_records_in_commit: opts.max_records_in_commit,

offset_index_head: create_offset_index_writer(repo, offset, opts),
})
}
Expand Down Expand Up @@ -294,8 +292,6 @@ pub fn resume_segment_writer<R: Repo>(
min_tx_offset: tx_range.start,
bytes_written: size_in_bytes,

max_records_in_commit: opts.max_records_in_commit,

offset_index_head: create_offset_index_writer(repo, offset, opts),
}))
}
Expand Down
Loading
Loading