Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/bindings-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ mod sym {
symbol!(index);
symbol!(init);
symbol!(name);
symbol!(on_abort);
symbol!(primary_key);
symbol!(private);
symbol!(public);
Expand Down
13 changes: 13 additions & 0 deletions crates/bindings-macro/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use syn::{ItemFn, LitStr};
pub(crate) struct ProcedureArgs {
/// For consistency with reducers: allow specifying a different export name than the Rust function name.
name: Option<LitStr>,
/// Optional procedure to invoke if this procedure aborts.
on_abort: Option<LitStr>,
}

impl ProcedureArgs {
Expand All @@ -21,6 +23,10 @@ impl ProcedureArgs {
check_duplicate(&args.name, &meta)?;
args.name = Some(meta.value()?.parse()?);
}
sym::on_abort => {
check_duplicate(&args.on_abort, &meta)?;
args.on_abort = Some(meta.value()?.parse()?);
}
});
Ok(())
})
Expand All @@ -34,6 +40,10 @@ pub(crate) fn procedure_impl(args: ProcedureArgs, original_function: &ItemFn) ->
let vis = &original_function.vis;

let procedure_name = args.name.unwrap_or_else(|| ident_to_litstr(func_name));
let on_abort = match args.on_abort {
Some(lit) => quote!(Some(#lit)),
None => quote!(None),
};

assert_only_lifetime_generics(original_function, "procedures")?;

Expand Down Expand Up @@ -115,6 +125,9 @@ pub(crate) fn procedure_impl(args: ProcedureArgs, original_function: &ItemFn) ->
/// The name of this function
const NAME: &'static str = #procedure_name;

/// The name of the on-abort handler, if any.
const ON_ABORT: Option<&'static str> = #on_abort;

/// The parameter names of this function
const ARG_NAMES: &'static [Option<&'static str>] = &[#(#opt_arg_names),*];

Expand Down
7 changes: 6 additions & 1 deletion crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ pub trait FnInfo {
/// The name of the function.
const NAME: &'static str;

/// The name of the on-abort handler, if any.
const ON_ABORT: Option<&'static str> = None;

/// The lifecycle of the function, if there is one.
const LIFECYCLE: Option<LifecycleReducer> = None;

Expand Down Expand Up @@ -799,7 +802,9 @@ where
register_describer(|module| {
let params = A::schema::<I>(&mut module.inner);
let ret_ty = <Ret as SpacetimeType>::make_type(&mut module.inner);
module.inner.add_procedure(I::NAME, params, ret_ty);
module
.inner
.add_procedure(I::NAME, params, ret_ty, I::ON_ABORT.map(Into::into));
module.procedures.push(I::INVOKE);
})
}
Expand Down
21 changes: 18 additions & 3 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use crate::host::module_host::{
ClientConnectedError, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, RefInstance,
ViewCallResult, ViewCommand, ViewCommandResult, ViewOutcome,
};
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams, Scheduler};
use crate::host::{
ArgsTuple, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult, ReducerId,
ReducerOutcome, Scheduler, UpdateDatabaseResult,
ArgsTuple, FunctionArgs, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult,
ReducerId, ReducerOutcome, UpdateDatabaseResult,
};
use crate::identity::Identity;
use crate::messages::control_db::HostType;
Expand Down Expand Up @@ -493,6 +493,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
pub struct InstanceCommon {
info: Arc<ModuleInfo>,
energy_monitor: Arc<dyn EnergyMonitor>,
scheduler: Scheduler,
allocated_memory: usize,
metric_wasm_memory_bytes: IntGauge,
vm_metrics: AllVmMetrics,
Expand All @@ -507,6 +508,7 @@ impl InstanceCommon {
info: module.info(),
vm_metrics,
energy_monitor: module.energy_monitor(),
scheduler: module.scheduler().clone(),
// Will be updated on the first reducer call.
allocated_memory: 0,
metric_wasm_memory_bytes: WORKER_METRICS
Expand All @@ -519,6 +521,10 @@ impl InstanceCommon {
self.info.clone()
}

pub(crate) fn scheduler(&self) -> &Scheduler {
&self.scheduler
}

#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn update_database<I: WasmInstance>(
&mut self,
Expand Down Expand Up @@ -727,6 +733,15 @@ impl InstanceCommon {

let trapped = call_result.is_err();

if trapped {
if let Some(handler) = procedure_def.on_abort.as_ref() {
self.scheduler().volatile_nonatomic_schedule_immediate(
handler.to_string(),
FunctionArgs::Bsatn(args.get_bsatn().clone()),
);
}
}

let result = match call_result {
Err(err) => {
inst.log_traceback("procedure", &procedure_def.name, &err);
Expand Down
5 changes: 5 additions & 0 deletions crates/lib/src/db/raw_def/v10.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ pub struct RawProcedureDefV10 {
/// it should be registered in the typespace and indirected through an [`AlgebraicType::Ref`].
pub return_type: AlgebraicType,

/// The name of the procedure to invoke if this procedure aborts.
pub on_abort: Option<RawIdentifier>,

/// Whether this procedure is callable from clients or is internal-only.
pub visibility: FunctionVisibility,
}
Expand Down Expand Up @@ -927,11 +930,13 @@ impl RawModuleDefV10Builder {
source_name: impl Into<RawIdentifier>,
params: ProductType,
return_type: AlgebraicType,
on_abort: Option<RawIdentifier>,
) {
self.procedures_mut().push(RawProcedureDefV10 {
source_name: source_name.into(),
params,
return_type,
on_abort,
visibility: FunctionVisibility::ClientCallable,
})
}
Expand Down
9 changes: 7 additions & 2 deletions crates/lib/src/db/raw_def/v9.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,9 @@ pub struct RawProcedureDefV9 {
/// This `ProductType` need not be registered in the typespace.
pub params: ProductType,

/// The name of the procedure to invoke if this procedure aborts.
pub on_abort: Option<RawIdentifier>,

/// The type of the return value.
///
/// If this is a user-defined product or sum type,
Expand Down Expand Up @@ -784,15 +787,17 @@ impl RawModuleDefV9Builder {
pub fn add_procedure(
&mut self,
name: impl Into<RawIdentifier>,
params: spacetimedb_sats::ProductType,
return_type: spacetimedb_sats::AlgebraicType,
params: ProductType,
return_type: AlgebraicType,
on_abort: Option<RawIdentifier>,
) {
self.module
.misc_exports
.push(RawMiscModuleExportV9::Procedure(RawProcedureDefV9 {
name: name.into(),
params,
return_type,
on_abort,
}))
}

Expand Down
5 changes: 5 additions & 0 deletions crates/schema/src/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,9 @@ pub struct ProcedureDef {
/// and indirected through an [`AlgebraicTypeUse::Ref`].
pub return_type_for_generate: AlgebraicTypeUse,

/// The name of the procedure to invoke if this procedure aborts.
pub on_abort: Option<Identifier>,

/// The visibility of this procedure.
pub visibility: FunctionVisibility,
}
Expand All @@ -1655,6 +1658,7 @@ impl From<ProcedureDef> for RawProcedureDefV9 {
name: val.name.into(),
params: val.params,
return_type: val.return_type,
on_abort: val.on_abort.map(Into::into),
}
}
}
Expand All @@ -1665,6 +1669,7 @@ impl From<ProcedureDef> for RawProcedureDefV10 {
source_name: val.name.into(),
params: val.params,
return_type: val.return_type,
on_abort: val.on_abort.map(Into::into),
visibility: val.visibility.into(),
}
}
Expand Down
23 changes: 16 additions & 7 deletions crates/schema/src/def/validate/v10.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use spacetimedb_lib::de::DeserializeSeed as _;
use spacetimedb_sats::{Typespace, WithTypespace};

use crate::def::validate::v9::{
check_function_names_are_unique, check_scheduled_functions_exist, generate_schedule_name, identifier,
CoreValidator, TableValidator, ViewValidator,
check_function_names_are_unique, check_procedure_on_abort_handlers, check_scheduled_functions_exist,
generate_schedule_name, identifier, CoreValidator, TableValidator, ViewValidator,
};
use crate::def::*;
use crate::error::ValidationError;
Expand Down Expand Up @@ -164,6 +164,7 @@ pub fn validate(def: RawModuleDefV10) -> Result<ModuleDef> {
// Attach schedules to their respective tables
attach_schedules_to_tables(&mut tables, schedules)?;

check_procedure_on_abort_handlers(&procedures)?;
check_scheduled_functions_exist(&mut tables, &reducers, &procedures)?;
change_scheduled_functions_and_lifetimes_visibility(&tables, &mut reducers, &mut procedures)?;

Expand Down Expand Up @@ -519,6 +520,7 @@ impl<'a> ModuleValidatorV10<'a> {
source_name,
params,
return_type,
on_abort,
visibility,
} = procedure_def;

Expand All @@ -538,9 +540,10 @@ impl<'a> ModuleValidatorV10<'a> {
);

let name_result = identifier(source_name);
let on_abort = on_abort.map(identifier).transpose();

let (name_result, params_for_generate, return_type_for_generate) =
(name_result, params_for_generate, return_type_for_generate).combine_errors()?;
let (name_result, params_for_generate, return_type_for_generate, on_abort) =
(name_result, params_for_generate, return_type_for_generate, on_abort).combine_errors()?;

Ok(ProcedureDef {
name: name_result,
Expand All @@ -551,6 +554,7 @@ impl<'a> ModuleValidatorV10<'a> {
},
return_type,
return_type_for_generate,
on_abort,
visibility: visibility.into(),
})
}
Expand Down Expand Up @@ -1491,8 +1495,13 @@ mod tests {
fn duplicate_procedure_names() {
let mut builder = RawModuleDefV10Builder::new();

builder.add_procedure("foo", [("i", AlgebraicType::I32)].into(), AlgebraicType::unit());
builder.add_procedure("foo", [("name", AlgebraicType::String)].into(), AlgebraicType::unit());
builder.add_procedure("foo", [("i", AlgebraicType::I32)].into(), AlgebraicType::unit(), None);
builder.add_procedure(
"foo",
[("name", AlgebraicType::String)].into(),
AlgebraicType::unit(),
None,
);

let result: Result<ModuleDef> = builder.finish().try_into();

Expand All @@ -1506,7 +1515,7 @@ mod tests {
let mut builder = RawModuleDefV10Builder::new();

builder.add_reducer("foo", [("i", AlgebraicType::I32)].into());
builder.add_procedure("foo", [("i", AlgebraicType::I32)].into(), AlgebraicType::unit());
builder.add_procedure("foo", [("i", AlgebraicType::I32)].into(), AlgebraicType::unit(), None);

let result: Result<ModuleDef> = builder.finish().try_into();

Expand Down
55 changes: 52 additions & 3 deletions crates/schema/src/def/validate/v9.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub fn validate(def: RawModuleDefV9) -> Result<ModuleDef> {
check_non_procedure_misc_exports(misc_exports, &validator, &mut tables),
)
.combine_errors()?;
check_procedure_on_abort_handlers(&procedures)?;
check_scheduled_functions_exist(&mut tables, &reducers, &procedures)?;
Ok((tables, types, reducers, procedures, views))
});
Expand Down Expand Up @@ -376,6 +377,7 @@ impl ModuleValidatorV9<'_> {
name,
params,
return_type,
on_abort,
} = procedure_def;

let params_for_generate =
Expand All @@ -396,9 +398,11 @@ impl ModuleValidatorV9<'_> {
// Procedures share the "function namespace" with reducers.
// Uniqueness is validated in a later pass, in `check_function_names_are_unique`.
let name = identifier(name);
let on_abort = on_abort.map(identifier).transpose();

let (name, params_for_generate, return_type_for_generate) =
(name, params_for_generate, return_type_for_generate).combine_errors()?;
let on_abort = on_abort?;

Ok(ProcedureDef {
name,
Expand All @@ -409,6 +413,7 @@ impl ModuleValidatorV9<'_> {
},
return_type,
return_type_for_generate,
on_abort,
visibility: FunctionVisibility::ClientCallable,
})
}
Expand Down Expand Up @@ -1277,6 +1282,35 @@ pub(crate) fn identifier(name: RawIdentifier) -> Result<Identifier> {
Identifier::new(name).map_err(|error| ValidationError::IdentifierError { error }.into())
}

/// Check that every procedure's on-abort handler exists and has matching params.
pub(crate) fn check_procedure_on_abort_handlers(procedures: &IndexMap<Identifier, ProcedureDef>) -> Result<()> {
procedures
.values()
.filter_map(|procedure| procedure.on_abort.as_ref().map(|handler| (procedure, handler)))
.map(|(procedure, handler)| {
let Some(handler_def) = procedures.get(handler) else {
return Err(ValidationError::MissingProcedureOnAbortHandler {
procedure: procedure.name.clone(),
handler: handler.clone(),
}
.into());
};

if handler_def.params == procedure.params {
Ok(())
} else {
Err(ValidationError::ProcedureOnAbortParamsMismatch {
procedure: procedure.name.clone(),
handler: handler.clone(),
expected: procedure.params.clone().into(),
actual: handler_def.params.clone().into(),
}
.into())
}
})
.collect_all_errors()
}

/// Check that every [`ScheduleDef`]'s `function_name` refers to a real reducer or procedure
/// and that the function's arguments are appropriate for the table,
/// then record the scheduled function's [`FunctionKind`] in the [`ScheduleDef`].
Expand Down Expand Up @@ -2206,8 +2240,18 @@ mod tests {
fn duplicate_procedure_names() {
let mut builder = RawModuleDefV9Builder::new();

builder.add_procedure("foo", [("i", AlgebraicType::I32)].into(), AlgebraicType::unit());
builder.add_procedure("foo", [("name", AlgebraicType::String)].into(), AlgebraicType::unit());
builder.add_procedure(
"foo",
[("i", AlgebraicType::I32)].into(),
AlgebraicType::unit(),
/* on_abort */ None,
);
builder.add_procedure(
"foo",
[("name", AlgebraicType::String)].into(),
AlgebraicType::unit(),
/* on_abort */ None,
);

let result: Result<ModuleDef> = builder.finish().try_into();

Expand All @@ -2221,7 +2265,12 @@ mod tests {
let mut builder = RawModuleDefV9Builder::new();

builder.add_reducer("foo", [("i", AlgebraicType::I32)].into(), None);
builder.add_procedure("foo", [("i", AlgebraicType::I32)].into(), AlgebraicType::unit());
builder.add_procedure(
"foo",
[("i", AlgebraicType::I32)].into(),
AlgebraicType::unit(),
/* on_abort */ None,
);

let result: Result<ModuleDef> = builder.finish().try_into();

Expand Down
Loading