Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions crates/core/src/db/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ impl SnapshotWorker {
/// Per-database Prometheus histograms timing snapshot creation.
struct SnapshotMetrics {
    /// Wall-clock time of a whole snapshot pass, including scheduling overhead.
    snapshot_timing_total: Histogram,
    /// Time spent in `Locking::take_snapshot_internal` while the datastore is
    /// locked, excluding scheduling overhead.
    snapshot_timing_inner: Histogram,
    /// Time spent fsync'ing the snapshot to disk (`UnflushedSnapshot::sync_all`),
    /// excluding scheduling overhead.
    snapshot_timing_fsync: Histogram,
}

impl SnapshotMetrics {
    /// Resolve the per-database histograms from the global `WORKER_METRICS`
    /// registry, labeling each with the database's `Identity`.
    fn new(db: Identity) -> Self {
        Self {
            snapshot_timing_total: WORKER_METRICS.snapshot_creation_time_total.with_label_values(&db),
            snapshot_timing_inner: WORKER_METRICS.snapshot_creation_time_inner.with_label_values(&db),
            snapshot_timing_fsync: WORKER_METRICS.snapshot_creation_time_fsync.with_label_values(&db),
        }
    }
}
Expand Down Expand Up @@ -221,27 +223,29 @@ impl SnapshotWorkerActor {

let database_identity = self.snapshot_repo.database_identity();

let maybe_offset = asyncify(move || {
let maybe_snapshot = asyncify(move || {
let _timer = inner_timer.start_timer();
Locking::take_snapshot_internal(&state, &snapshot_repo)
})
.await
.with_context(|| format!("error capturing snapshot of database {}", database_identity))?;
maybe_offset
.map(|(offset, _path)| offset)
.inspect(|snapshot_offset| {
let elapsed = Duration::from_secs_f64(timer.stop_and_record());
info!(
"Captured snapshot of database {} at TX offset {} in {:?}",
database_identity, snapshot_offset, elapsed,
);
})
.with_context(|| {
format!(
"refusing to take snapshot of database {} at TX offset -1",
database_identity
)
})
let (snapshot_offset, unflushed_snapshot) = maybe_snapshot.with_context(|| {
format!(
"refusing to take snapshot of database {} at TX offset -1",
database_identity
)
})?;
self.metrics
.snapshot_timing_fsync
.observe_closure_duration(|| unflushed_snapshot.sync_all())?;

let elapsed = Duration::from_secs_f64(timer.stop_and_record());
info!(
"Captured snapshot of database {} at TX offset {} in {:?}",
database_identity, snapshot_offset, elapsed,
);

Ok(snapshot_offset)
}

async fn maybe_compress_snapshots(&mut self, latest_snapshot: TxOffset) {
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,12 @@ metrics_group!(
#[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)]
pub snapshot_creation_time_inner: HistogramVec,

#[name = spacetime_snapshot_creation_time_fsync_sec]
#[help = "The time (in seconds) it took to fsync a database snapshot, excluding scheduling overhead"]
#[labels(db: Identity)]
#[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)]
pub snapshot_creation_time_fsync: HistogramVec,

#[name = spacetime_snapshot_compression_time_total_sec]
#[help = "The time (in seconds) it took to do a compression pass on the snapshot repository, including scheduling overhead"]
#[labels(db: Identity)]
Expand Down
14 changes: 8 additions & 6 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use spacetimedb_schema::{
reducer_name::ReducerName,
schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema},
};
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository};
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository, UnflushedSnapshot};
use spacetimedb_table::{
indexes::RowPointer,
page_pool::PagePool,
Expand Down Expand Up @@ -278,8 +278,10 @@ impl Locking {
/// Returns an error if [`SnapshotRepository::create_snapshot`] returns an
/// error.
pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result<Option<SnapshotDirPath>> {
    // Convenience wrapper: capture the snapshot under the datastore lock,
    // then immediately make it durable. Callers either get a fully-synced
    // snapshot directory, `None` (nothing to snapshot yet), or an error.
    match Self::take_snapshot_internal(&self.committed_state, repo)? {
        None => Ok(None),
        Some((_tx_offset, unflushed)) => {
            let snapshot_dir = unflushed.sync_all()?;
            Ok(Some(snapshot_dir))
        }
    }
}

pub fn assert_system_tables_match(&self) -> Result<()> {
Expand All @@ -290,7 +292,7 @@ impl Locking {
pub fn take_snapshot_internal(
committed_state: &RwLock<CommittedState>,
repo: &SnapshotRepository,
) -> Result<Option<(TxOffset, SnapshotDirPath)>> {
) -> Result<Option<(TxOffset, UnflushedSnapshot)>> {
let mut committed_state = committed_state.write();
let Some(tx_offset) = committed_state.next_tx_offset.checked_sub(1) else {
return Ok(None);
Expand All @@ -303,9 +305,9 @@ impl Locking {
);

let (tables, blob_store) = committed_state.persistent_tables_and_blob_store();
let snapshot_dir = repo.create_snapshot(tables, blob_store, tx_offset)?;
let unflushed_snapshot = repo.create_snapshot(tables, blob_store, tx_offset)?;

Ok(Some((tx_offset, snapshot_dir)))
Ok(Some((tx_offset, unflushed_snapshot)))
}

/// Returns a list over all the currently connected clients,
Expand Down
123 changes: 113 additions & 10 deletions crates/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#![allow(clippy::result_large_err)]

use log::warn;
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap};
use spacetimedb_durability::TxOffset;
use spacetimedb_fs_utils::compression::{
Expand All @@ -43,8 +44,10 @@ use spacetimedb_table::{
page_pool::PagePool,
table::Table,
};
use std::fs;
use std::fs::{self, File};
use std::io;
use std::ops::RangeBounds;
use std::path::Path;
use std::time::{Duration, Instant};
use std::{
collections::BTreeMap,
Expand Down Expand Up @@ -173,6 +176,89 @@ struct TableEntry {
pages: Vec<blake3::Hash>,
}

/// A non-durable snapshot created via [SnapshotRepository::create_snapshot].
///
/// When [SnapshotRepository::create_snapshot] returns, all objects will have
/// been written to the underlying object repository, but not `fsync`'ed.
///
/// Because this means that the snapshot may be incomplete, the [Snapshot] file
/// will _not_ have been written, and the snapshot remains locked (via a [Lockfile]).
///
/// To turn an [UnflushedSnapshot] into a durable snapshot, call
/// [UnflushedSnapshot::sync_all]. This will:
///
/// - sync all objects the snapshot references
/// - sync the object repository root
/// - write and sync the snapshot file
/// - drop the lock file
///
/// This ensures that the snapshot file is present only if all objects are
/// present and durable, and that the snapshot is considered invalid otherwise.
///
/// If [UnflushedSnapshot] is dropped without calling `sync_all`, the [Drop]
/// impl will attempt to call `sync_all` and log any errors.
///
/// This two-stage snapshot creation exists in order to not introduce additional
/// latency while the datastore is locked for snapshotting.
#[must_use = "snapshots are not durable until `sync_all` is called"]
pub struct UnflushedSnapshot {
    /// `Some` until consumed. Wrapping in `Option` lets both
    /// [`UnflushedSnapshot::sync_all`] and the [`Drop`] impl take ownership of
    /// the inner state, ensuring the sync runs at most once.
    inner: Option<UnflushedSnapshotInner>,
}

impl UnflushedSnapshot {
    /// Sync all objects in the snapshot and write out the snapshot file.
    ///
    /// Returns the [SnapshotDirPath] on success.
    ///
    /// # Panics
    ///
    /// Never in practice: `inner` is only emptied by this method or by the
    /// [`Drop`] impl, and this method consumes `self`, so `inner` is always
    /// `Some` when we get here.
    pub fn sync_all(mut self) -> Result<SnapshotDirPath, SnapshotError> {
        // `take` leaves `None` behind so the `Drop` impl won't sync a second
        // time. `expect` (rather than a bare `unwrap`) states the invariant.
        self.inner
            .take()
            .expect("UnflushedSnapshot::inner is always Some until consumed")
            .sync_all()
    }
}

impl Drop for UnflushedSnapshot {
fn drop(&mut self) {
if let Some(inner) = self.inner.take()
&& let Err(e) = inner.sync_all()
{
warn!("failed to sync unflushed snapshot dropped without syncing: {e}");
}
}
}

/// Everything needed to finalize a pending snapshot; see [UnflushedSnapshot].
struct UnflushedSnapshotInner {
    /// In-memory snapshot manifest, written to disk by `sync_all`.
    snapshot: Snapshot,
    /// Directory of the snapshot being created.
    snapshot_dir: SnapshotDirPath,
    /// Repository handle, used to write the snapshot file via
    /// `write_snapshot_file`.
    snapshot_repo: SnapshotRepository,
    /// Object repository holding the snapshot's written objects.
    object_repo: DirTrie,
    /// Lock on the snapshot directory, held until the snapshot is durable.
    lockfile: Lockfile,
}

impl UnflushedSnapshotInner {
    /// Make the snapshot durable and release its lock.
    ///
    /// The ordering here is crash-consistency-critical: every object (and its
    /// enclosing directory) is fsync'ed *before* the snapshot file is written,
    /// so the presence of a snapshot file on disk implies that all objects it
    /// references are durable.
    fn sync_all(self) -> Result<SnapshotDirPath, SnapshotError> {
        // fsync a file -- or, on Unix, a directory -- by opening it and
        // calling `sync_all`.
        fn fsync(path: &Path) -> io::Result<()> {
            File::open(path)?.sync_all()
        }

        // Sync all objects and their parent directories.
        // The paths yielded by the [Snapshot::files] iterator are constructed
        // by [DirTree::file_path], which creates a path with a parent.
        // `parent()` is thus known to succeed.
        for (_, path) in self.snapshot.files(&self.object_repo) {
            fsync(&path)?;
            fsync(path.parent().unwrap())?;
        }
        // Sync the root directory of the object repo
        fsync(self.object_repo.root())?;
        // Write out the snapshot file (syncs internally).
        self.snapshot_repo
            .write_snapshot_file(&self.snapshot_dir, self.snapshot)?;

        // We can now drop the lockfile.
        drop(self.lockfile);

        Ok(self.snapshot_dir)
    }
}

#[derive(Clone, Serialize, Deserialize)]
pub struct Snapshot {
/// A magic number: must be equal to [`MAGIC`].
Expand Down Expand Up @@ -650,15 +736,19 @@ impl SnapshotRepository {
/// where `tables` is the committed state of all the tables in the database,
/// and `blobs` is the committed state's blob store.
///
/// Returns the path of the newly-created snapshot directory.
/// The returned [UnflushedSnapshot] is **not** durable -- call
/// [UnflushedSnapshot::sync_all] to finalize it (see the struct docs for
/// more details).
///
/// **NOTE**: The current snapshot is uncompressed to avoid the potential slowdown.
/// Also note that the snapshot remains locked for as long as [UnflushedSnapshot]
/// is alive. It will not appear in [Self::all_snapshots] nor can it be
/// modified by methods in [SnapshotRepository].
pub fn create_snapshot<'db>(
&self,
tables: impl Iterator<Item = &'db mut Table>,
blobs: &'db dyn BlobStore,
tx_offset: TxOffset,
) -> Result<SnapshotDirPath, SnapshotError> {
) -> Result<UnflushedSnapshot, SnapshotError> {
// Invalidate equal to or newer than `tx_offset`.
//
// This is because snapshots don't currently track the epoch in which
Expand Down Expand Up @@ -692,7 +782,7 @@ impl SnapshotRepository {
// Before performing any observable operations,
// acquire a lockfile on the snapshot you want to create.
// Because we could be compressing the snapshot.
let _lock = Lockfile::for_file(&snapshot_dir)?;
let lockfile = Lockfile::for_file(&snapshot_dir)?;

// Create the snapshot directory.
snapshot_dir.create()?;
Expand All @@ -708,8 +798,6 @@ impl SnapshotRepository {
snapshot.write_all_blobs(&object_repo, blobs, prev_snapshot.as_ref(), &mut counter)?;
snapshot.write_all_tables(&object_repo, tables, prev_snapshot.as_ref(), &mut counter)?;

self.write_snapshot_file(&snapshot_dir, snapshot)?;

log::info!(
"[{}] SNAPSHOT {:0>20}: Hardlinked {} objects and wrote {} objects",
self.database_identity,
Expand All @@ -718,9 +806,15 @@ impl SnapshotRepository {
counter.objects_written,
);

// Success! return the directory of the newly-created snapshot.
// The lockfile will be dropped here.
Ok(snapshot_dir)
Ok(UnflushedSnapshot {
inner: Some(UnflushedSnapshotInner {
snapshot,
snapshot_dir,
snapshot_repo: self.clone(),
object_repo,
lockfile,
}),
})
}

/// Write the on-disk snapshot file containing the BSATN-encoded `snapshot`
Expand All @@ -744,6 +838,12 @@ impl SnapshotRepository {
snapshot_file.write_all(hash.as_bytes())?;
snapshot_file.write_all(&snapshot_bsatn)?;
snapshot_file.flush()?;
// fsync file + enclosing directory.
snapshot_file
.into_inner()
.expect("buffered writer just flushed")
.sync_all()?;
File::open(&snapshot_dir.0)?.sync_all()?;
}

Ok(())
Expand Down Expand Up @@ -1102,6 +1202,7 @@ impl SnapshotRepository {
if old_file.is_compressed() {
std::fs::hard_link(old_path, src.with_extension("_tmp"))?;
std::fs::rename(src.with_extension("_tmp"), src)?;
File::open(src.parent().unwrap())?.sync_all()?;
if let Some(stats) = stats {
stats.hardlinked += 1;
}
Expand Down Expand Up @@ -1134,6 +1235,7 @@ impl SnapshotRepository {
log::error!("Failed to compress object file {path:?}: {err}");
})?;
}
File::open(dir.root())?.sync_all()?;

// Compress the snapshot file last,
// which marks the whole snapshot as compressed.
Expand All @@ -1142,6 +1244,7 @@ impl SnapshotRepository {
compress(&old, &snapshot_file.0, None, None).inspect_err(|err| {
log::error!("Failed to compress snapshot file {snapshot_file:?}: {err}");
})?;
File::open(&snapshot_dir.0)?.sync_all()?;

log::info!(
"Compressed snapshot {snapshot_dir:?} of replica {}: {compress_type:?}",
Expand Down
12 changes: 8 additions & 4 deletions crates/snapshot/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,10 +727,13 @@ where
Some(file_path) => {
let dir = file_path.parent().expect("file not in a directory").to_owned();
fs::create_dir_all(&dir).await?;
let (tmp_file, tmp_out) = spawn_blocking(move || {
let tmp = NamedTempFile::new_in(dir)?;
let out = tmp.reopen()?;
Ok::<_, io::Error>((tmp, out))
let (tmp_file, tmp_out) = spawn_blocking({
let dir = dir.clone();
move || {
let tmp = NamedTempFile::new_in(dir)?;
let out = tmp.reopen()?;
Ok::<_, io::Error>((tmp, out))
}
})
.await
.unwrap()?;
Expand All @@ -743,6 +746,7 @@ where
.await
.unwrap()
.map_err(|e| e.error)?;
fs::File::open(dir).await?.sync_all().await?;
}

None => {
Expand Down
Loading