From 84c879552250a18fe2f2975daf2e9e3dd84002e7 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 24 Apr 2026 11:10:21 +0200 Subject: [PATCH 1/4] snapshot: Ensure all snapshot files are durable When creating or compressing a snapshot, `fsync` all files and directories, so as to ensure that the snapshot is durable on the local disk. This obviously amounts to a large number of `fsync` calls, which may negatively impact performance of taking a snapshot -- since we hold a transaction lock while taking a snapshot, this is not to be taken lightly. --- crates/fs-utils/src/dir_trie.rs | 16 +++++++++++++++- crates/snapshot/src/lib.rs | 14 +++++++++++++- crates/snapshot/src/remote.rs | 12 ++++++++---- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/crates/fs-utils/src/dir_trie.rs b/crates/fs-utils/src/dir_trie.rs index 14e6aa7bdeb..d23bda0609e 100644 --- a/crates/fs-utils/src/dir_trie.rs +++ b/crates/fs-utils/src/dir_trie.rs @@ -123,7 +123,15 @@ impl DirTrie { if src_file.is_file() { let dst_file = self.file_path(file_id); Self::create_parent(&dst_file)?; - std::fs::hard_link(src_file, dst_file)?; + std::fs::hard_link(&src_file, &dst_file)?; + // fsync the file, so its nlink count is durable. + // Note that we could also fsync `src_file`. + File::open(&dst_file)?.sync_all()?; + // fsync directory, so the entry for `dst_file` is durable. + // `parent()` is known to succeed, because `self.file_path` creates + // a path with a parent. + File::open(dst_file.parent().unwrap())?.sync_all()?; + Ok(true) } else { Ok(false) @@ -178,6 +186,12 @@ impl DirTrie { let contents = contents(); file.write_all(contents.as_ref())?; file.flush()?; + // fsync the file. + file.into_inner().expect("buffered writer just flushed").sync_all()?; + // fsync the directory. `parent()` is known to succeed, because + // `self.file_path` creates a path with a parent. 
+ File::open(self.file_path(file_id).parent().unwrap())?.sync_all()?; + counter.objects_written += 1; Ok(()) } diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 6606d22f9e4..2bbbbe9ca9b 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -43,7 +43,7 @@ use spacetimedb_table::{ page_pool::PagePool, table::Table, }; -use std::fs; +use std::fs::{self, File}; use std::ops::RangeBounds; use std::time::{Duration, Instant}; use std::{ @@ -708,6 +708,9 @@ impl SnapshotRepository { snapshot.write_all_blobs(&object_repo, blobs, prev_snapshot.as_ref(), &mut counter)?; snapshot.write_all_tables(&object_repo, tables, prev_snapshot.as_ref(), &mut counter)?; + // Ensure all the object directories are durable. + File::open(object_repo.root())?.sync_all()?; + self.write_snapshot_file(&snapshot_dir, snapshot)?; log::info!( @@ -744,6 +747,12 @@ impl SnapshotRepository { snapshot_file.write_all(hash.as_bytes())?; snapshot_file.write_all(&snapshot_bsatn)?; snapshot_file.flush()?; + // fsync file + enclosing directory. + snapshot_file + .into_inner() + .expect("buffered writer just flushed") + .sync_all()?; + File::open(&snapshot_dir.0)?.sync_all()?; } Ok(()) @@ -1102,6 +1111,7 @@ impl SnapshotRepository { if old_file.is_compressed() { std::fs::hard_link(old_path, src.with_extension("_tmp"))?; std::fs::rename(src.with_extension("_tmp"), src)?; + File::open(src.parent().unwrap())?.sync_all()?; if let Some(stats) = stats { stats.hardlinked += 1; } @@ -1134,6 +1144,7 @@ impl SnapshotRepository { log::error!("Failed to compress object file {path:?}: {err}"); })?; } + File::open(dir.root())?.sync_all()?; // Compress the snapshot file last, // which marks the whole snapshot as compressed. 
@@ -1142,6 +1153,7 @@ impl SnapshotRepository { compress(&old, &snapshot_file.0, None, None).inspect_err(|err| { log::error!("Failed to compress snapshot file {snapshot_file:?}: {err}"); })?; + File::open(&snapshot_dir.0)?.sync_all()?; log::info!( "Compressed snapshot {snapshot_dir:?} of replica {}: {compress_type:?}", diff --git a/crates/snapshot/src/remote.rs b/crates/snapshot/src/remote.rs index 81884983b66..2c35e1ada79 100644 --- a/crates/snapshot/src/remote.rs +++ b/crates/snapshot/src/remote.rs @@ -727,10 +727,13 @@ where Some(file_path) => { let dir = file_path.parent().expect("file not in a directory").to_owned(); fs::create_dir_all(&dir).await?; - let (tmp_file, tmp_out) = spawn_blocking(move || { - let tmp = NamedTempFile::new_in(dir)?; - let out = tmp.reopen()?; - Ok::<_, io::Error>((tmp, out)) + let (tmp_file, tmp_out) = spawn_blocking({ + let dir = dir.clone(); + move || { + let tmp = NamedTempFile::new_in(dir)?; + let out = tmp.reopen()?; + Ok::<_, io::Error>((tmp, out)) + } }) .await .unwrap()?; @@ -743,6 +746,7 @@ where .await .unwrap() .map_err(|e| e.error)?; + fs::File::open(dir).await?.sync_all().await?; } None => { From ef36d66fc13bbea9a30c1b2bfdea0d910ea4d72c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 28 Apr 2026 09:32:26 +0200 Subject: [PATCH 2/4] Defer snapshot fsync until after the datastore lock is released. 
--- crates/core/src/db/snapshot.rs | 36 +++--- crates/core/src/worker_metrics/mod.rs | 6 + .../src/locking_tx_datastore/datastore.rs | 14 ++- crates/fs-utils/src/dir_trie.rs | 14 --- crates/snapshot/src/lib.rs | 107 ++++++++++++++++-- 5 files changed, 131 insertions(+), 46 deletions(-) diff --git a/crates/core/src/db/snapshot.rs b/crates/core/src/db/snapshot.rs index d7e68a62700..c47e1d33d2d 100644 --- a/crates/core/src/db/snapshot.rs +++ b/crates/core/src/db/snapshot.rs @@ -144,6 +144,7 @@ impl SnapshotWorker { struct SnapshotMetrics { snapshot_timing_total: Histogram, snapshot_timing_inner: Histogram, + snapshot_timing_fsync: Histogram, } impl SnapshotMetrics { @@ -151,6 +152,7 @@ impl SnapshotMetrics { Self { snapshot_timing_total: WORKER_METRICS.snapshot_creation_time_total.with_label_values(&db), snapshot_timing_inner: WORKER_METRICS.snapshot_creation_time_inner.with_label_values(&db), + snapshot_timing_fsync: WORKER_METRICS.snapshot_creation_time_fsync.with_label_values(&db), } } } @@ -221,27 +223,29 @@ impl SnapshotWorkerActor { let database_identity = self.snapshot_repo.database_identity(); - let maybe_offset = asyncify(move || { + let maybe_snapshot = asyncify(move || { let _timer = inner_timer.start_timer(); Locking::take_snapshot_internal(&state, &snapshot_repo) }) .await .with_context(|| format!("error capturing snapshot of database {}", database_identity))?; - maybe_offset - .map(|(offset, _path)| offset) - .inspect(|snapshot_offset| { - let elapsed = Duration::from_secs_f64(timer.stop_and_record()); - info!( - "Captured snapshot of database {} at TX offset {} in {:?}", - database_identity, snapshot_offset, elapsed, - ); - }) - .with_context(|| { - format!( - "refusing to take snapshot of database {} at TX offset -1", - database_identity - ) - }) + let (snapshot_offset, unflushed_snapshot) = maybe_snapshot.with_context(|| { + format!( + "refusing to take snapshot of database {} at TX offset -1", + database_identity + ) + })?; + self.metrics + 
.snapshot_timing_fsync + .observe_closure_duration(|| unflushed_snapshot.sync_all())?; + + let elapsed = Duration::from_secs_f64(timer.stop_and_record()); + info!( + "Captured snapshot of database {} at TX offset {} in {:?}", + database_identity, snapshot_offset, elapsed, + ); + + Ok(snapshot_offset) } async fn maybe_compress_snapshots(&mut self, latest_snapshot: TxOffset) { diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 262b330f998..23d873e1cb3 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -434,6 +434,12 @@ metrics_group!( #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] pub snapshot_creation_time_inner: HistogramVec, + #[name = spacetime_snapshot_creation_time_fsync_sec] + #[help = "The time (in seconds) it took to fsync a database snapshot, excluding scheduling overhead"] + #[labels(db: Identity)] + #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] + pub snapshot_creation_time_fsync: HistogramVec, + #[name = spacetime_snapshot_compression_time_total_sec] #[help = "The time (in seconds) it took to do a compression pass on the snapshot repository, including scheduling overhead"] #[labels(db: Identity)] diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index d750c70c364..f4dadbc90c7 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -44,7 +44,7 @@ use spacetimedb_schema::{ reducer_name::ReducerName, schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema}, }; -use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository}; +use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository, UnflushedSnapshot}; use spacetimedb_table::{ indexes::RowPointer, page_pool::PagePool, @@ -278,8 +278,10 @@ impl Locking { /// Returns an error if [`SnapshotRepository::create_snapshot`] 
returns an /// error. pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result> { - let maybe_offset_and_path = Self::take_snapshot_internal(&self.committed_state, repo)?; - Ok(maybe_offset_and_path.map(|(_, path)| path)) + Self::take_snapshot_internal(&self.committed_state, repo)? + .map(|(_offset, snap)| snap.sync_all()) + .transpose() + .map_err(Into::into) } pub fn assert_system_tables_match(&self) -> Result<()> { @@ -290,7 +292,7 @@ impl Locking { pub fn take_snapshot_internal( committed_state: &RwLock, repo: &SnapshotRepository, - ) -> Result> { + ) -> Result> { let mut committed_state = committed_state.write(); let Some(tx_offset) = committed_state.next_tx_offset.checked_sub(1) else { return Ok(None); @@ -303,9 +305,9 @@ impl Locking { ); let (tables, blob_store) = committed_state.persistent_tables_and_blob_store(); - let snapshot_dir = repo.create_snapshot(tables, blob_store, tx_offset)?; + let unflushed_snapshot = repo.create_snapshot(tables, blob_store, tx_offset)?; - Ok(Some((tx_offset, snapshot_dir))) + Ok(Some((tx_offset, unflushed_snapshot))) } /// Returns a list over all the currently connected clients, diff --git a/crates/fs-utils/src/dir_trie.rs b/crates/fs-utils/src/dir_trie.rs index d23bda0609e..72f50562607 100644 --- a/crates/fs-utils/src/dir_trie.rs +++ b/crates/fs-utils/src/dir_trie.rs @@ -124,14 +124,6 @@ impl DirTrie { let dst_file = self.file_path(file_id); Self::create_parent(&dst_file)?; std::fs::hard_link(&src_file, &dst_file)?; - // fsync the file, so its nlink count is durable. - // Note that we could also fsync `src_file`. - File::open(&dst_file)?.sync_all()?; - // fsync directory, so the entry for `dst_file` is durable. - // `parent()` is known to succeed, because `self.file_path` creates - // a path with a parent. 
- File::open(dst_file.parent().unwrap())?.sync_all()?; - Ok(true) } else { Ok(false) @@ -186,12 +178,6 @@ impl DirTrie { let contents = contents(); file.write_all(contents.as_ref())?; file.flush()?; - // fsync the file. - file.into_inner().expect("buffered writer just flushed").sync_all()?; - // fsync the directory. `parent()` is known to succeed, because - // `self.file_path` creates a path with a parent. - File::open(self.file_path(file_id).parent().unwrap())?.sync_all()?; - counter.objects_written += 1; Ok(()) } diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 2bbbbe9ca9b..3076a24b543 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -23,6 +23,7 @@ #![allow(clippy::result_large_err)] +use log::warn; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_durability::TxOffset; use spacetimedb_fs_utils::compression::{ @@ -44,7 +45,9 @@ use spacetimedb_table::{ table::Table, }; use std::fs::{self, File}; +use std::io; use std::ops::RangeBounds; +use std::path::Path; use std::time::{Duration, Instant}; use std::{ collections::BTreeMap, @@ -173,6 +176,89 @@ struct TableEntry { pages: Vec, } +/// A non-durable snapshot created via [SnapshotRepository::create_snapshot]. +/// +/// When [SnapshotRepository::create_snapshot] returns, all objects will have +/// been written to the underlying object repository, but not `fsync`'ed. +/// +/// Because this means that the snapshot may be incomplete, the [Snapshot] file +/// will _not_ have been written, and the snapshot remains locked (via a [Lockfile]). +/// +/// To turn an [UnflushedSnapshot] into a durable snapshot, call +/// [UnflushedSnapshot::sync_all]. 
This will: +/// +/// - sync all objects the snapshot references +/// - sync the object repository root +/// - write and sync the snapshot file +/// - drop the lock file +/// +/// This ensures that the snapshot file is present only if all objects are +/// present and durable, and that the snapshot is considered invalid otherwise. +/// +/// If [UnflushedSnapshot] is dropped without calling `sync_all`, the [Drop] +/// impl will attempt to call `sync_all` and log any errors. +/// +/// This two-stage snapshot creation exists in order to not introduce additional +/// latency while the datastore is locked for snapshotting. +#[must_use = "snapshots are not durable until `sync_all` is called"] +pub struct UnflushedSnapshot { + inner: Option, +} + +impl UnflushedSnapshot { + /// Sync all objects in the snapshot and write out the snapshot file. + /// + /// Returns the [SnapshotDirPath] on success. + pub fn sync_all(mut self) -> Result { + self.inner.take().unwrap().sync_all() + } +} + +impl Drop for UnflushedSnapshot { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() + && let Err(e) = inner.sync_all() + { + warn!("failed to sync unflushed snapshot dropped without syncing: {e}"); + } + } +} + +struct UnflushedSnapshotInner { + snapshot: Snapshot, + snapshot_dir: SnapshotDirPath, + snapshot_repo: SnapshotRepository, + object_repo: DirTrie, + lockfile: Lockfile, +} + +impl UnflushedSnapshotInner { + fn sync_all(self) -> Result { + fn fsync(path: &Path) -> io::Result<()> { + File::open(path)?.sync_all() + } + + // Sync all objects and their parent directories. + // The paths yielded by the [Snapshot::files] iterator are constructed + // by [DirTrie::file_path], which creates a path with a parent. + // `parent()` is thus known to succeed. 
+ for (_, path) in self.snapshot.files(&self.object_repo) { + fsync(&path)?; + fsync(path.parent().unwrap())?; + } + // Sync the root directory of the object repo + fsync(self.object_repo.root())?; + // Write out the snapshot file (syncs internally). + self.snapshot_repo + .write_snapshot_file(&self.snapshot_dir, self.snapshot)?; + + // We can now drop the lockfile. + drop(self.lockfile); + + Ok(self.snapshot_dir) + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct Snapshot { /// A magic number: must be equal to [`MAGIC`]. @@ -658,7 +744,7 @@ impl SnapshotRepository { tables: impl Iterator, blobs: &'db dyn BlobStore, tx_offset: TxOffset, - ) -> Result { + ) -> Result { // Invalidate equal to or newer than `tx_offset`. // // This is because snapshots don't currently track the epoch in which @@ -692,7 +778,7 @@ impl SnapshotRepository { // Before performing any observable operations, // acquire a lockfile on the snapshot you want to create. // Because we could be compressing the snapshot. - let _lock = Lockfile::for_file(&snapshot_dir)?; + let lockfile = Lockfile::for_file(&snapshot_dir)?; // Create the snapshot directory. snapshot_dir.create()?; @@ -708,11 +794,6 @@ impl SnapshotRepository { snapshot.write_all_blobs(&object_repo, blobs, prev_snapshot.as_ref(), &mut counter)?; snapshot.write_all_tables(&object_repo, tables, prev_snapshot.as_ref(), &mut counter)?; - // Ensure all the object directories are durable. - File::open(object_repo.root())?.sync_all()?; - - self.write_snapshot_file(&snapshot_dir, snapshot)?; - log::info!( "[{}] SNAPSHOT {:0>20}: Hardlinked {} objects and wrote {} objects", self.database_identity, @@ -721,9 +802,15 @@ impl SnapshotRepository { counter.objects_written, ); - // Success! return the directory of the newly-created snapshot. - // The lockfile will be dropped here. 
- Ok(snapshot_dir) + Ok(UnflushedSnapshot { + inner: Some(UnflushedSnapshotInner { + snapshot, + snapshot_dir, + snapshot_repo: self.clone(), + object_repo, + lockfile, + }), + }) } /// Write the on-disk snapshot file containing the BSATN-encoded `snapshot` From e0783d73e0e83690d2cf2cb4f64504ab392db1e5 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 28 Apr 2026 10:42:01 +0200 Subject: [PATCH 3/4] Remove irrelevant change in DirTrie + docs --- crates/fs-utils/src/dir_trie.rs | 2 +- crates/snapshot/src/lib.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/fs-utils/src/dir_trie.rs b/crates/fs-utils/src/dir_trie.rs index 72f50562607..14e6aa7bdeb 100644 --- a/crates/fs-utils/src/dir_trie.rs +++ b/crates/fs-utils/src/dir_trie.rs @@ -123,7 +123,7 @@ impl DirTrie { if src_file.is_file() { let dst_file = self.file_path(file_id); Self::create_parent(&dst_file)?; - std::fs::hard_link(&src_file, &dst_file)?; + std::fs::hard_link(src_file, dst_file)?; Ok(true) } else { Ok(false) diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 3076a24b543..65fd3666ed5 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -736,9 +736,13 @@ impl SnapshotRepository { /// where `tables` is the committed state of all the tables in the database, /// and `blobs` is the committed state's blob store. /// - /// Returns the path of the newly-created snapshot directory. + /// The returned [UnflushedSnapshot] is **not** durable -- call + /// [UnflushedSnapshot::sync_all] to finalize it (see the struct docs for + /// more details). /// - /// **NOTE**: The current snapshot is uncompressed to avoid the potential slowdown. + /// Also note that the snapshot remains locked for as long as [UnflushedSnapshot] + /// is alive. It will not appear in [Self::all_snapshots] nor can it be + /// modified by methods in [SnapshotRepository]. 
pub fn create_snapshot<'db>( &self, tables: impl Iterator, From ae96989196745ef0c859e5ebac91880077a78e46 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 28 Apr 2026 10:51:18 +0200 Subject: [PATCH 4/4] Add fsync error context --- crates/snapshot/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 65fd3666ed5..66c25ed824a 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -235,7 +235,9 @@ struct UnflushedSnapshotInner { impl UnflushedSnapshotInner { fn sync_all(self) -> Result { fn fsync(path: &Path) -> io::Result<()> { - File::open(path)?.sync_all() + File::open(path) + .and_then(|fd| fd.sync_all()) + .map_err(|e| io::Error::new(e.kind(), format!("failed to fsync {}: {}", path.display(), e))) } // Sync all objects and their parent directories.