Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ impl RelationalDB {
Ok(self.inner.alter_table_access_mut_tx(tx, name, access)?)
}

pub(crate) fn alter_table_event_flag(&self, tx: &mut MutTx, name: &str, is_event: bool) -> Result<(), DBError> {
Ok(self.inner.alter_table_event_flag_mut_tx(tx, name, is_event)?)
}

pub(crate) fn alter_table_primary_key(
&self,
tx: &mut MutTx,
Expand Down
107 changes: 107 additions & 0 deletions crates/core/src/db/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,27 @@ fn auto_migrate_database(
let table_def = plan.new.stored_in_table_def(&table_name.clone().into()).unwrap();
stdb.alter_table_access(tx, table_name, table_def.table_access.into())?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangeEventFlag(table_name) => {
let table_def: &TableDef = plan.new.expect_lookup(table_name);
let table_id = stdb
.table_id_from_name_mut(tx, table_name)?
.expect("ChangeEventFlag references a table that should exist");

// Pre-validate: flipping is only safe when the table has no committed rows.
if stdb.table_row_count_mut(tx, table_id).unwrap_or(0) > 0 {
anyhow::bail!(
"Cannot change `event` flag on table `{table_name}`: table contains data. \
Clear the table's rows (e.g. via a reducer) before toggling the `event` annotation."
);
}

Comment thread
Centril marked this conversation as resolved.
log!(
logger,
"Changing `event` flag on table `{table_name}` to `{}`",
table_def.is_event
);
stdb.alter_table_event_flag(tx, table_name, table_def.is_event)?;
}
spacetimedb_schema::auto_migrate::AutoMigrateStep::ChangePrimaryKey(table_name) => {
let table_def = plan.new.stored_in_table_def(&table_name.clone().into()).unwrap();
log!(logger, "Changing primary key for table `{table_name}`");
Expand Down Expand Up @@ -339,6 +360,10 @@ mod test {
db::relational_db::tests_utils::{begin_mut_tx, insert, TestDB},
host::module_host::create_table_from_def,
};
use pretty_assertions::assert_matches;
use spacetimedb_datastore::locking_tx_datastore::test_helpers::{
assert_is_event_state, check_table_event_flag_altered,
};
use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange;
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, TableAccess};
use spacetimedb_sats::{product, AlgebraicType, AlgebraicType::U64};
Expand Down Expand Up @@ -580,4 +605,86 @@ mod test {
);
Ok(())
}

/// Build a minimal v10 module with a single user table `events` whose
/// `is_event` flag matches `is_event`.
fn single_event_table_module_v10(is_event: bool) -> ModuleDef {
use spacetimedb_lib::db::raw_def::v10::RawModuleDefV10Builder;

let mut builder = RawModuleDefV10Builder::new();
builder
.build_table_with_new_type("events", [("id", U64)], true)
.with_event(is_event)
.finish();
builder
.finish()
.try_into()
.expect("should be a valid v10 module definition")
}

/// Create a non-event `events` table from the schema of `single_event_table_module_v10(false)`
/// in a fresh tx, commit it, and return the `TableId`. Leaves the table empty.
fn setup_events_table(stdb: &TestDB, module: &ModuleDef) -> anyhow::Result<TableId> {
let mut tx = begin_mut_tx(stdb);
for def in module.tables() {
create_table_from_def(stdb, &mut tx, module, def)?;
}
let table_id = stdb
.table_id_from_name_mut(&tx, "events")?
.expect("table should exist");
stdb.commit_tx(tx)?;
Ok(table_id)
}

#[test]
fn change_event_flag_empty_table_succeeds() -> anyhow::Result<()> {
let auth_ctx = AuthCtx::for_testing();
let stdb = TestDB::durable()?;

let old = single_event_table_module_v10(false);
let new = single_event_table_module_v10(true);
let table_id = setup_events_table(&stdb, &old)?;

let mut tx = begin_mut_tx(&stdb);
assert_is_event_state(&tx, table_id, false);

let plan = ponder_migrate(&old, &new)?;
let res = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)?;

assert!(
matches!(res, UpdateResult::RequiresClientDisconnect),
"flipping the `event` flag should disconnect clients"
);
assert_is_event_state(&tx, table_id, true);
check_table_event_flag_altered(&tx, table_id, true);
Ok(())
}

#[test]
fn change_event_flag_nonempty_table_fails() -> anyhow::Result<()> {
let auth_ctx = AuthCtx::for_testing();
let stdb = TestDB::durable()?;

let old = single_event_table_module_v10(false);
let new = single_event_table_module_v10(true);
let table_id = setup_events_table(&stdb, &old)?;

// Insert a row in a separate tx so the pre-flip table state is committed.
let mut tx = begin_mut_tx(&stdb);
insert(&stdb, &mut tx, table_id, &product![42u64])?;
stdb.commit_tx(tx)?;

let mut tx = begin_mut_tx(&stdb);
let plan = ponder_migrate(&old, &new)?;
let err = update_database(&stdb, &mut tx, auth_ctx, plan, &TestLogger)
.err()
.expect("flipping `is_event` on a non-empty table should fail");
assert!(
err.to_string().contains("contains data"),
"error should mention that the table contains data, got: {err}"
);
assert_is_event_state(&tx, table_id, false);
assert_eq!(tx.pending_schema_changes(), []);
Ok(())
}
}
6 changes: 6 additions & 0 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ impl CommittedState {
let table = self.tables.get_mut(&table_id)?;
table.with_mut_schema(|s| s.table_access = access);
}
// A table's `is_event` flag was changed. Change back to the old one.
TableAlterEventFlag(table_id, old_is_event) => {
let table = self.tables.get_mut(&table_id)?;
Comment thread
Centril marked this conversation as resolved.
assert_eq!(table.num_rows(), 0);
table.with_mut_schema(|s| s.is_event = old_is_event);
}
// A table's primary key was changed. Change back to the old one.
TableAlterPrimaryKey(table_id, old_pk) => {
let table = self.tables.get_mut(&table_id)?;
Expand Down
147 changes: 145 additions & 2 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ impl Locking {
tx.alter_table_access(table_id, access)
}

pub fn alter_table_event_flag_mut_tx(&self, tx: &mut MutTxId, name: &str, is_event: bool) -> Result<()> {
let table_id = self
.table_id_from_name_mut_tx(tx, name)?
.ok_or_else(|| TableError::NotFound(name.into()))?;

tx.alter_table_event_flag(table_id, is_event)
}

pub fn alter_table_primary_key_mut_tx(
&self,
tx: &mut MutTxId,
Expand Down Expand Up @@ -968,11 +976,15 @@ fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> {
pub(crate) mod tests {
use super::*;
use crate::error::IndexError;
use crate::locking_tx_datastore::test_helpers::{
assert_is_event_state, check_table_event_flag_altered, st_event_table_has_row,
};
use crate::locking_tx_datastore::tx_state::PendingSchemaChange;
use crate::system_tables::{
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
StConstraintRow, StEventTableFields, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields,
StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields,
StConstraintRow, StEventTableFields, StEventTableRow, StIndexAlgorithm, StIndexFields, StIndexRow,
StRowLevelSecurityFields, StScheduledFields, StSequenceFields, StSequenceRow, StTableRow, StVarFields,
StViewArgFields, StViewFields,
ST_CLIENT_ID, ST_CLIENT_NAME, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_NAME, ST_COLUMN_ID, ST_COLUMN_NAME,
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME,
ST_EVENT_TABLE_ID, ST_EVENT_TABLE_NAME, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_NAME, ST_INDEX_ID,
Expand Down Expand Up @@ -3141,6 +3153,137 @@ pub(crate) mod tests {
Ok(())
}

#[test]
fn test_alter_table_event_flag_non_event_to_event() -> ResultTest<()> {
Comment thread
Centril marked this conversation as resolved.
// Create a non-event table.
let (datastore, tx, table_id) = setup_table()?;
commit(&datastore, tx)?;

// Flip `is_event` from `false` to `true`.
let mut tx = begin_mut_tx(&datastore);
assert_is_event_state(&tx, table_id, false);
assert!(
!st_event_table_has_row(&datastore, &tx, table_id),
"fresh non-event table must not have a row in `st_event_table`"
);

tx.alter_table_event_flag(table_id, true)?;
check_table_event_flag_altered(&tx, table_id, true);
assert_is_event_state(&tx, table_id, true);
assert!(
st_event_table_has_row(&datastore, &tx, table_id),
"after flipping to event, `st_event_table` should have the row"
);

let tx_data = commit(&datastore, tx)?;
// Flipping to event inserts one row into `st_event_table`
// and does not touch the user table's row data.
let expected_row = ProductValue::from(StEventTableRow { table_id });
assert_eq!(
tx_data.inserts_for_table(ST_EVENT_TABLE_ID),
Some(&[expected_row][..]),
);
assert_eq!(tx_data.inserts_for_table(table_id), None);
assert_eq!(tx_data.deletes_for_table(table_id), None);

// After commit, the schema should reflect the flipped flag
// and `st_event_table` should contain the row.
let tx = begin_mut_tx(&datastore);
assert_is_event_state(&tx, table_id, true);
assert!(
st_event_table_has_row(&datastore, &tx, table_id),
"after commit, `st_event_table` should have the row"
);
Ok(())
}

#[test]
fn test_alter_table_event_flag_event_to_non_event() -> ResultTest<()> {
// Create an event table.
let (datastore, tx, table_id) = setup_event_table()?;
commit(&datastore, tx)?;

// Sanity check: `st_event_table` should have the row.
let mut tx = begin_mut_tx(&datastore);
assert_is_event_state(&tx, table_id, true);
assert!(
st_event_table_has_row(&datastore, &tx, table_id),
"event table should have a row in `st_event_table`"
);

// Flip `is_event` from `true` to `false`.
tx.alter_table_event_flag(table_id, false)?;
check_table_event_flag_altered(&tx, table_id, false);
assert_is_event_state(&tx, table_id, false);
assert!(
!st_event_table_has_row(&datastore, &tx, table_id),
"after flipping to non-event, `st_event_table` should not have the row"
);

let tx_data = commit(&datastore, tx)?;
// Flipping away from event deletes one row from `st_event_table`
// and does not touch the user table's row data.
let expected_row = ProductValue::from(StEventTableRow { table_id });
assert_eq!(
tx_data.deletes_for_table(ST_EVENT_TABLE_ID),
Some(&[expected_row][..]),
);
assert_eq!(tx_data.inserts_for_table(table_id), None);
assert_eq!(tx_data.deletes_for_table(table_id), None);

// After commit, the schema should reflect the flipped flag
// and `st_event_table` should NOT contain the row.
let tx = begin_mut_tx(&datastore);
assert_is_event_state(&tx, table_id, false);
assert!(
!st_event_table_has_row(&datastore, &tx, table_id),
"after commit, `st_event_table` should not have the row"
);
Ok(())
}

#[test]
fn test_alter_table_event_flag_rollback_reverts_live_state_and_st_event_table() -> ResultTest<()> {
// Create a non-event table.
let (datastore, tx, table_id) = setup_table()?;
commit(&datastore, tx)?;

// Start a new tx, flip, check pending change, then rollback.
let mut tx = begin_mut_tx(&datastore);
assert!(!st_event_table_has_row(&datastore, &tx, table_id));

tx.alter_table_event_flag(table_id, true)?;
check_table_event_flag_altered(&tx, table_id, true);
// The in-tx view must reflect the flip.
assert_is_event_state(&tx, table_id, true);
assert!(
st_event_table_has_row(&datastore, &tx, table_id),
"after flipping within the tx, `st_event_table` should have the row"
);
let _ = datastore.rollback_mut_tx(tx);

// After rollback, the schema and `st_event_table` should be back to pre-state.
let tx = begin_mut_tx(&datastore);
assert_eq!(tx.pending_schema_changes(), []);
assert_is_event_state(&tx, table_id, false);
assert!(
!st_event_table_has_row(&datastore, &tx, table_id),
"rollback should revert the `st_event_table` row"
);
Ok(())
}

#[test]
fn test_alter_table_event_flag_idempotent_no_pending_change() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
commit(&datastore, tx)?;

let mut tx = begin_mut_tx(&datastore);
tx.alter_table_event_flag(table_id, false)?;
assert_eq!(tx.pending_schema_changes(), []);
Ok(())
}

#[test]
fn test_alter_table_row_type_rejects_some_bad_changes() -> ResultTest<()> {
let datastore = get_datastore()?;
Expand Down
2 changes: 2 additions & 0 deletions crates/datastore/src/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub use tx::{NumDistinctValues, TxId};
mod tx_state;
#[cfg(any(test, feature = "test"))]
pub use tx_state::PendingSchemaChange;
#[cfg(any(test, feature = "test"))]
pub mod test_helpers;

use parking_lot::{
lock_api::{ArcMutexGuard, ArcRwLockReadGuard, ArcRwLockWriteGuard},
Expand Down
49 changes: 46 additions & 3 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
error::{IndexError, SequenceError, TableError},
system_tables::{
with_sys_table_buf, StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields,
StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields,
StColumnRow, StConstraintFields, StConstraintRow, StEventTableFields, StEventTableRow, StFields as _, StIndexAccessorFields,
StIndexAccessorRow, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow,
StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow,
StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID,
Expand Down Expand Up @@ -691,8 +691,7 @@ impl MutTxId {

// Insert into st_event_table if this is an event table.
if is_event {
let row = StEventTableRow { table_id };
self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?;
self.insert_st_event_table_row(table_id)?;
}

// Create the indexes for the table.
Expand Down Expand Up @@ -1082,6 +1081,50 @@ impl MutTxId {
Ok(())
}

/// Change the `is_event` flag of the table identified by `table_id`.
///
/// Updates both the in-memory schema and the `st_event_table` system table.
/// This is a breaking change for subscribed clients (the committed state
/// semantics of the table flip), so callers must arrange a `DisconnectAllUsers`.
pub(crate) fn alter_table_event_flag(&mut self, table_id: TableId, is_event: bool) -> Result<()> {
// Write to the table in the tx state (and clone into commit state).
let ((tx_table, ..), (commit_table, ..)) = self.get_or_create_insert_table_mut(table_id)?;
let old_is_event = tx_table.get_schema().is_event;
if old_is_event == is_event {
// Idempotent no-op; do not record a pending change or it would confuse rollback.
return Ok(());
}
tx_table.with_mut_schema_and_clone(commit_table, |s| s.is_event = is_event);

// Update `st_event_table`.
if is_event {
self.insert_st_event_table_row(table_id)?;
} else {
self.delete_st_event_table_row(table_id)?;
}

// Remember the pending change so we can undo if necessary.
self.push_schema_change(PendingSchemaChange::TableAlterEventFlag(table_id, old_is_event));

Ok(())
}

/// Inserts a row into `st_event_table` marking `table_id` as an event table.
fn insert_st_event_table_row(&mut self, table_id: TableId) -> Result<()> {
let row = StEventTableRow { table_id };
self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?;
Ok(())
}

/// Drops the row in `st_event_table` for this `table_id`.
fn delete_st_event_table_row(&mut self, table_id: TableId) -> Result<()> {
self.delete_col_eq(
ST_EVENT_TABLE_ID,
StEventTableFields::TableId.col_id(),
&table_id.into(),
)
}

/// Change the primary key of the table identified by `table_id`.
///
/// Updates both the in-memory schema and the `st_table` system table.
Expand Down
Loading