From 87b06044ca7b4d31c6cd070ee9032066423052cd Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Tue, 9 Jun 2026 17:17:34 -0700 Subject: [PATCH] Stop tracking multiple plan types in the subscription cache --- crates/core/src/host/module_host.rs | 10 +- .../src/host/wasm_common/module_host_actor.rs | 9 +- crates/core/src/subscription/metrics.rs | 102 +--- crates/core/src/subscription/mod.rs | 201 +++---- .../subscription/module_subscription_actor.rs | 491 ++++++++++-------- .../module_subscription_manager.rs | 6 +- crates/core/src/subscription/query.rs | 28 +- crates/core/src/subscription/subscription.rs | 11 +- crates/subscription/src/lib.rs | 226 +++++++- 9 files changed, 608 insertions(+), 476 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 9ce704426f7..aa6344265c4 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -51,7 +51,7 @@ use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; pub use spacetimedb_durability::{DurabilityExited, DurableOffset}; -use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; +use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_execution::RelValue; use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::db::raw_def::v9::Lifecycle; @@ -3307,13 +3307,9 @@ impl ModuleHost { let table_name = table_name.into(); let delta_tx = DeltaTx::from(tx); let (rows, _, metrics) = if returns_view_table && num_private_cols > 0 { - let optimized = optimized - .into_iter() - .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) - .collect::>(); - execute_plan_for_view::(&optimized, &delta_tx, rlb_pool) + execute_plan_for_view::(optimized.iter(), num_cols, num_private_cols, &delta_tx, rlb_pool) } else { - execute_plan::(&optimized, &delta_tx, rlb_pool) + execute_plan::(optimized.iter(), &delta_tx, rlb_pool) } .context("One-off queries are not allowed to modify the database")?; diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index ec6ea820995..97d7a3e7fc5 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -39,7 +39,6 @@ use spacetimedb_datastore::error::{DatastoreError, ViewError}; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program}; -use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::buffer::DecodeError; use spacetimedb_lib::db::raw_def::v9::{Lifecycle, ViewResultHeader}; use spacetimedb_lib::de::DeserializeSeed; @@ -188,11 +187,10 @@ pub(crate) fn run_query_for_view( // Validate shape and disallow views-on-views. for plan in &plans { - let phys = plan.optimized_physical_plan(); - let Some(source_schema) = phys.return_table() else { + let Some(source_schema) = plan.return_table() else { bail!("query does not return plain table rows"); }; - if phys.reads_from_view(true) || phys.reads_from_view(false) { + if plan.reads_from_view(true) || plan.reads_from_view(false) { bail!("view definition cannot read from other views"); } if source_schema.row_type != *expected_row_type { @@ -215,8 +213,7 @@ pub(crate) fn run_query_for_view( tx.record_table_scan(&op, table_id); } - let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone()); - pipelined.execute(&*tx, &mut metrics, &mut |row| { + plan.base_plan().execute(&*tx, &mut metrics, &mut |row| { rows.push(row.to_product_value()); Ok(()) })?; diff --git a/crates/core/src/subscription/metrics.rs b/crates/core/src/subscription/metrics.rs index b367a57056b..b8d528b3a4f 100644 --- a/crates/core/src/subscription/metrics.rs +++ b/crates/core/src/subscription/metrics.rs @@ -1,21 +1,5 @@ -use spacetimedb_physical_plan::plan::PhysicalPlan; -use spacetimedb_schema::{schema::TableSchema, table_name::TableName}; -use std::sync::Arc; - -/// Scan strategy types for subscription queries -#[derive(Debug, Clone, Copy)] -enum ScanStrategy { - /// Full table scan - no indexes used - Sequential, - /// Uses index but requires post-filtering on non-indexed columns - IndexedWithFilter, - /// Fully indexed - no post-filtering needed - FullyIndexed, - /// Mixed strategy (combination of index and table scans) - Mixed, - /// Unknown/other strategy - Unknown, -} +use spacetimedb_schema::table_name::TableName; +use spacetimedb_subscription::SubscriptionPlanMetrics; /// Metrics data for a single subscription query execution #[derive(Debug)] @@ -27,93 +11,17 @@ pub struct QueryMetrics { pub execution_time_micros: u64, } -impl std::fmt::Display for ScanStrategy { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Sequential => write!(f, "sequential"), - Self::IndexedWithFilter => write!(f, "indexed_with_filter"), - Self::FullyIndexed => write!(f, "fully_indexed"), - Self::Mixed => write!(f, "mixed"), - Self::Unknown => write!(f, "unknown"), - } - } -} - -/// Recursively extracts column names from filter expressions -fn extract_columns( - expr: &spacetimedb_physical_plan::plan::PhysicalExpr, - schema: Option<&Arc>, - columns: &mut Vec, -) { - use spacetimedb_physical_plan::plan::PhysicalExpr; - - match expr { - PhysicalExpr::Field(tuple_field) => { - let col_name = schema - .and_then(|s| s.columns.get(tuple_field.field_pos)) - .map(|col| col.col_name.to_string()) - .unwrap_or_else(|| format!("col_{}", tuple_field.field_pos)); - columns.push(col_name); - } - PhysicalExpr::BinOp(_, lhs, rhs) => { - extract_columns(lhs, schema, columns); - extract_columns(rhs, schema, columns); - } - PhysicalExpr::LogOp(_, exprs) => { - for expr in exprs { - extract_columns(expr, schema, columns); - } - } - PhysicalExpr::Value(_) => {} - } -} - /// Analyzes subscription scan strategy and creates QueryMetrics pub fn get_query_metrics( table_name: TableName, - plan: &PhysicalPlan, + plan_metrics: &SubscriptionPlanMetrics, rows_scanned: u64, execution_time_micros: u64, ) -> QueryMetrics { - let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..))); - let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..))); - let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..))); - - let strategy = if has_table_scan && has_index_scan { - ScanStrategy::Mixed - } else if has_table_scan { - ScanStrategy::Sequential - } else if has_index_scan && has_post_filter { - ScanStrategy::IndexedWithFilter - } else if has_index_scan { - ScanStrategy::FullyIndexed - } else { - ScanStrategy::Unknown - }; - - // Extract the schema from the plan - let mut schema: Option> = None; - plan.visit(&mut |p| match p { - PhysicalPlan::TableScan(scan, _) => { - schema = Some(scan.schema.clone()); - } - PhysicalPlan::IxScan(scan, _) => { - schema = Some(scan.schema.clone()); - } - _ => {} - }); - - let mut columns = Vec::new(); - plan.visit(&mut |p| { - if let PhysicalPlan::Filter(_, expr) = p { - extract_columns(expr, schema.as_ref(), &mut columns); - } - }); - QueryMetrics { - scan_type: strategy.to_string(), + scan_type: plan_metrics.scan_type().to_owned(), table_name, - unindexed_columns: columns.join(","), + unindexed_columns: plan_metrics.unindexed_columns().to_owned(), rows_scanned, execution_time_micros, } diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 40d6d712504..70565a82b1e 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -10,11 +10,9 @@ use spacetimedb_client_api_messages::websocket::v1::{self as ws_v1}; use spacetimedb_datastore::{ db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder, }; -use spacetimedb_execution::pipelined::ViewProject; -use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; -use spacetimedb_lib::identity::AuthCtx; +use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore, Row}; use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; -use spacetimedb_primitives::TableId; +use spacetimedb_primitives::{ColList, TableId}; use spacetimedb_sats::bsatn::ToBsatn; use spacetimedb_sats::Serialize; use spacetimedb_schema::table_name::TableName; @@ -98,33 +96,38 @@ impl MetricsRecorder for ExecutionCounters { } } -/// Execute a subscription query over a view. -/// -/// Specifically this utility is for queries that return rows from a view. -/// Unlike user tables, views have internal columns that should not be returned to clients. -/// The [`ViewProject`] operator implicitly drops these columns as part of its execution. -/// -/// NOTE: This method was largely copied from [`execute_plan`]. -/// TODO: Merge with [`execute_plan`]. -pub fn execute_plan_for_view( - plan_fragments: &[ViewProject], +/// Execute subscription query fragments over a view. +pub fn execute_plan_for_view<'p, F>( + plan_fragments: impl IntoIterator, + num_cols: usize, + num_private_cols: usize, tx: &(impl Datastore + DeltaStore), rlb_pool: &impl RowListBuilderSource, -) -> Result<(F::List, u64, ExecutionMetrics)> { +) -> Result<(F::List, u64, ExecutionMetrics)> +where + F: BuildableWebsocketFormat, +{ build_list_with_executor(rlb_pool, |metrics, add| { + let col_list = ColList::from_iter(num_private_cols..num_cols); for fragment in plan_fragments { - fragment.execute(tx, metrics, add)?; + fragment.execute(tx, metrics, &mut |row| match row { + Row::Ptr(ptr) => add(ptr.project_product(&col_list)?), + Row::Ref(val) => add(val.project_product(&col_list)?), + })?; } Ok(()) }) } /// Execute a subscription query -pub fn execute_plan( - plan_fragments: &[PipelinedProject], +pub fn execute_plan<'p, F>( + plan_fragments: impl IntoIterator, tx: &(impl Datastore + DeltaStore), rlb_pool: &impl RowListBuilderSource, -) -> Result<(F::List, u64, ExecutionMetrics)> { +) -> Result<(F::List, u64, ExecutionMetrics)> +where + F: BuildableWebsocketFormat, +{ build_list_with_executor(rlb_pool, |metrics, add| { for fragment in plan_fragments { fragment.execute(tx, metrics, add)?; @@ -163,16 +166,45 @@ pub enum TableUpdateType { Unsubscribe, } -/// Execute a subscription query over a view and collect the results in a [TableUpdate]. -/// -/// Specifically this utility is for queries that return rows from a view. -/// Unlike user tables, views have internal columns that should not be returned to clients. -/// The [`ViewProject`] operator implicitly drops these columns as part of its execution. -/// -/// NOTE: This method was largely copied from [`collect_table_update`]. -/// TODO: Merge with [`collect_table_update`]. -pub fn collect_table_update_for_view( - plan_fragments: &[ViewProject], +fn table_update_from_rows( + rows: F::List, + num_rows: u64, + metrics: ExecutionMetrics, + table_id: TableId, + table_name: TableName, + update_type: TableUpdateType, +) -> (ws_v1::TableUpdate, ExecutionMetrics) { + let empty = F::List::default(); + let qu = match update_type { + TableUpdateType::Subscribe => ws_v1::QueryUpdate { + deletes: empty, + inserts: rows, + }, + TableUpdateType::Unsubscribe => ws_v1::QueryUpdate { + deletes: rows, + inserts: empty, + }, + }; + // We will compress the outer server message, + // after we release the tx lock. + // There's no need to compress the inner table update too. + let update = F::into_query_update(qu, ws_v1::Compression::None); + ( + ws_v1::TableUpdate::new( + table_id, + table_name.into(), + ws_v1::SingleQueryUpdate { update, num_rows }, + ), + metrics, + ) +} + +/// Execute subscription query fragments over a view and collect the results in a [TableUpdate]. +#[allow(clippy::too_many_arguments)] +pub fn collect_table_update_for_view<'p, Tx, F>( + plan_fragments: impl IntoIterator, + num_cols: usize, + num_private_cols: usize, table_id: TableId, table_name: TableName, tx: &Tx, @@ -183,72 +215,30 @@ where Tx: Datastore + DeltaStore, F: BuildableWebsocketFormat, { - execute_plan_for_view::(plan_fragments, tx, rlb_pool).map(|(rows, num_rows, metrics)| { - let empty = F::List::default(); - let qu = match update_type { - TableUpdateType::Subscribe => ws_v1::QueryUpdate { - deletes: empty, - inserts: rows, - }, - TableUpdateType::Unsubscribe => ws_v1::QueryUpdate { - deletes: rows, - inserts: empty, - }, - }; - // We will compress the outer server message, - // after we release the tx lock. - // There's no need to compress the inner table update too. - let update = F::into_query_update(qu, ws_v1::Compression::None); - ( - ws_v1::TableUpdate::new( - table_id, - table_name.clone().into(), - ws_v1::SingleQueryUpdate { update, num_rows }, - ), - metrics, - ) - }) + execute_plan_for_view::(plan_fragments, num_cols, num_private_cols, tx, rlb_pool).map( + |(rows, num_rows, metrics)| table_update_from_rows(rows, num_rows, metrics, table_id, table_name, update_type), + ) } /// Execute a subscription query and collect the results in a [TableUpdate] -pub fn collect_table_update( - plan_fragments: &[PipelinedProject], +pub fn collect_table_update<'p, F>( + plan_fragments: impl IntoIterator, table_id: TableId, table_name: TableName, tx: &(impl Datastore + DeltaStore), update_type: TableUpdateType, rlb_pool: &impl RowListBuilderSource, -) -> Result<(ws_v1::TableUpdate, ExecutionMetrics)> { +) -> Result<(ws_v1::TableUpdate, ExecutionMetrics)> +where + F: BuildableWebsocketFormat, +{ execute_plan::(plan_fragments, tx, rlb_pool).map(|(rows, num_rows, metrics)| { - let empty = F::List::default(); - let qu = match update_type { - TableUpdateType::Subscribe => ws_v1::QueryUpdate { - deletes: empty, - inserts: rows, - }, - TableUpdateType::Unsubscribe => ws_v1::QueryUpdate { - deletes: rows, - inserts: empty, - }, - }; - // We will compress the outer server message, - // after we release the tx lock. - // There's no need to compress the inner table update too. - let update = F::into_query_update(qu, ws_v1::Compression::None); - ( - ws_v1::TableUpdate::new( - table_id, - table_name.clone().into(), - ws_v1::SingleQueryUpdate { update, num_rows }, - ), - metrics, - ) + table_update_from_rows(rows, num_rows, metrics, table_id, table_name, update_type) }) } /// Execute a collection of subscription queries in parallel pub fn execute_plans( - auth: &AuthCtx, plans: &[Arc], tx: &(impl Datastore + DeltaStore + Sync), update_type: TableUpdateType, @@ -263,43 +253,24 @@ pub fn execute_plans( plan.table_ids().all(|table_id| tx.row_count(table_id) > 0) }) .map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name())) - .map(|(sql, plan, table_id, table_name)| (sql, plan.optimized_physical_plan().clone(), table_id, table_name)) - .map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name)) .map(|(sql, plan, table_id, table_name)| { - plan.and_then(|plan| { + { let start_time = std::time::Instant::now(); - let result = if plan.returns_view_table() { - match plan.return_table() { - Some(schema) => { - let pipelined_plan = PipelinedProject::from(plan.clone()); - let view_plan = - ViewProject::new(pipelined_plan, schema.num_cols(), schema.num_private_cols()); - collect_table_update_for_view( - &[view_plan], - table_id, - table_name.clone(), - tx, - update_type, - rlb_pool, - )? - } - _ => { - let pipelined_plan = PipelinedProject::from(plan.clone()); - collect_table_update( - &[pipelined_plan], - table_id, - table_name.clone(), - tx, - update_type, - rlb_pool, - )? - } - } + let result = if plan.is_view() { + collect_table_update_for_view( + std::iter::once(plan.base_plan()), + plan.num_cols(), + plan.num_private_cols(), + table_id, + table_name.clone(), + tx, + update_type, + rlb_pool, + )? } else { - let pipelined_plan = PipelinedProject::from(plan.clone()); collect_table_update( - &[pipelined_plan], + std::iter::once(plan.base_plan()), table_id, table_name.clone(), tx, @@ -313,13 +284,13 @@ pub fn execute_plans( let (ref _table_update, ref metrics) = result; let query_metrics = metrics::get_query_metrics( table_name.clone(), - &plan, + plan.scan_metrics(), metrics.rows_scanned as u64, elapsed.as_micros() as u64, ); Ok((result.0, result.1, Some(query_metrics))) - }) + } .map_err(|err| DBError::WithSql { sql: sql.into(), error: Box::new(DBError::Other(err)), diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index aede13262df..dca5810e13d 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -4,9 +4,9 @@ use super::module_subscription_manager::{ from_tx_offset, spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager, TransactionOffset, }; -use super::query::compile_query_with_hashes; +use super::query::{compile_query_with_hashes, CompiledQuery}; use super::tx::DeltaTx; -use super::{collect_table_update, TableUpdateType}; +use super::TableUpdateType; use crate::client::messages::{ ProcedureResultMessage, SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage, @@ -19,7 +19,7 @@ use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, RefInst use crate::host::{self, ModuleHost}; use crate::subscription::query::is_subscribe_to_all_tables; use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool}; -use crate::subscription::{collect_table_update_for_view, execute_plans}; +use crate::subscription::{collect_table_update, collect_table_update_for_view, execute_plans}; use crate::util::prometheus_handle::IntGaugeExt; use crate::worker_metrics::WORKER_METRICS; use core::panic; @@ -28,19 +28,19 @@ use prometheus::{Histogram, HistogramTimer, IntCounter, IntGauge}; use scopeguard::ScopeGuard; use spacetimedb_client_api_messages::websocket::v1 as ws_v1; use spacetimedb_client_api_messages::websocket::v2 as ws_v2; -use spacetimedb_data_structures::map::{HashCollectionExt as _, HashSet}; +use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap, HashSet}; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; use spacetimedb_datastore::traits::{IsolationLevel, TxData}; use spacetimedb_durability::TxOffset; -use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; use spacetimedb_lib::{bsatn, identity::AuthCtx}; +use spacetimedb_physical_plan::plan::ProjectPlan; use spacetimedb_primitives::ArgId; use spacetimedb_schema::def::RawModuleDefVersion; use spacetimedb_table::static_assert_size; @@ -210,6 +210,42 @@ type SubscriptionUpdate = type FullSubscriptionUpdate = ws_v1::FormatSwitch, ws_v1::DatabaseUpdate>; +struct CompiledQueryBatch { + queries: Vec>, + physical_plans: HashMap>, + auth: AuthCtx, + mut_tx: MutTxId, + compile_timer: HistogramTimer, +} + +#[derive(Clone, Copy)] +enum FailedSubscription { + V1(ws_v1::QueryId), + V2(ws_v2::QuerySetId), +} + +fn add_compiled_query( + compiled: CompiledQuery, + cached_queries: &SubscriptionManager, + plans: &mut Vec>, + compiled_queries: &mut HashMap>, + physical_plans: &mut HashMap>, + new_queries: &mut u64, +) { + let hash = compiled.plan.hash(); + if let Some(unit) = cached_queries.query(&hash) { + plans.push(unit); + } else if let Some(unit) = compiled_queries.get(&hash) { + plans.push(unit.clone()); + } else { + let plan = Arc::new(compiled.plan); + physical_plans.insert(hash, compiled.physical_plans); + compiled_queries.insert(hash, plan.clone()); + plans.push(plan); + *new_queries += 1; + } +} + fn query_rows_from_update( update: ws_v1::DatabaseUpdate, use_deletes: bool, @@ -418,40 +454,16 @@ impl ModuleSubscriptions { sender: Arc, query: Arc, tx: &TxId, - auth: &AuthCtx, update_type: TableUpdateType, ) -> Result<(SubscriptionUpdate, ExecutionMetrics), DBError> { - check_row_limit( - &[&query], - &self.relational_db, - tx, - |plan, tx| { - plan.plans_fragments() - .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan())) - .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned)) - }, - auth, - )?; - let table_id = query.subscribed_table_id(); let table_name = query.subscribed_table_name().clone(); - - let plans = query - .plans_fragments() - .map(|fragment| fragment.optimized_physical_plan()) - .cloned() - .map(|plan| plan.optimize(auth)) - .collect::, _>>()?; - - let view_info = plans - .first() - .and_then(|plan| plan.return_table()) - .and_then(|schema| schema.view_info); - - let num_cols = plans + let plan_fragments = query.plans_fragments().collect::>(); + let is_view = plan_fragments.first().is_some_and(|plan| plan.is_view()); + let num_cols = plan_fragments.first().map(|plan| plan.num_cols()).unwrap_or_default(); + let num_private_cols = plan_fragments .first() - .and_then(|plan| plan.return_table()) - .map(|schema| schema.num_cols()) + .map(|plan| plan.num_private_cols()) .unwrap_or_default(); let tx = DeltaTx::from(tx); @@ -459,63 +471,47 @@ impl ModuleSubscriptions { // TODO: See the comment on `collect_table_update_for_view`. // The following view and non-view branches should be merged together, // since the only difference between them is the row type that is returned. - Ok(match (sender.config.protocol, view_info) { - (Protocol::Binary, Some(view_info)) => { - let plans = plans - .into_iter() - .map(PipelinedProject::from) - .map(|plan| ViewProject::new(plan, num_cols, view_info.num_private_cols())) - .collect::>(); - collect_table_update_for_view( - &plans, - table_id, - table_name.clone(), - &tx, - update_type, - &self.bsatn_rlb_pool, - ) - .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Bsatn(table_update), metrics)) - } - (Protocol::Binary, None) => { - let plans = plans.into_iter().map(PipelinedProject::from).collect::>(); - collect_table_update( - &plans, - table_id, - table_name.clone(), - &tx, - update_type, - &self.bsatn_rlb_pool, - ) - .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Bsatn(table_update), metrics)) - } - (Protocol::Text, Some(view_info)) => { - let plans = plans - .into_iter() - .map(PipelinedProject::from) - .map(|plan| ViewProject::new(plan, num_cols, view_info.num_private_cols())) - .collect::>(); - collect_table_update_for_view( - &plans, - table_id, - table_name, - &tx, - update_type, - &JsonRowListBuilderFakePool, - ) - .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Json(table_update), metrics)) - } - (Protocol::Text, None) => { - let plans = plans.into_iter().map(PipelinedProject::from).collect::>(); - collect_table_update( - &plans, - table_id, - table_name, - &tx, - update_type, - &JsonRowListBuilderFakePool, - ) - .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Json(table_update), metrics)) - } + Ok(match (sender.config.protocol, is_view) { + (Protocol::Binary, true) => collect_table_update_for_view( + plan_fragments.iter().map(|plan| plan.base_plan()), + num_cols, + num_private_cols, + table_id, + table_name.clone(), + &tx, + update_type, + &self.bsatn_rlb_pool, + ) + .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Bsatn(table_update), metrics)), + (Protocol::Binary, false) => collect_table_update( + plan_fragments.iter().map(|plan| plan.base_plan()), + table_id, + table_name.clone(), + &tx, + update_type, + &self.bsatn_rlb_pool, + ) + .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Bsatn(table_update), metrics)), + (Protocol::Text, true) => collect_table_update_for_view( + plan_fragments.iter().map(|plan| plan.base_plan()), + num_cols, + num_private_cols, + table_id, + table_name, + &tx, + update_type, + &JsonRowListBuilderFakePool, + ) + .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Json(table_update), metrics)), + (Protocol::Text, false) => collect_table_update( + plan_fragments.iter().map(|plan| plan.base_plan()), + table_id, + table_name, + &tx, + update_type, + &JsonRowListBuilderFakePool, + ) + .map(|(table_update, metrics)| (ws_v1::FormatSwitch::Json(table_update), metrics)), }?) } @@ -524,32 +520,18 @@ impl ModuleSubscriptions { sender: Arc, queries: &[Arc], tx: &TxId, - auth: &AuthCtx, update_type: TableUpdateType, ) -> Result<(FullSubscriptionUpdate, ExecutionMetrics), DBError> { - check_row_limit( - queries, - &self.relational_db, - tx, - |plan, tx| { - plan.plans_fragments() - .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan())) - .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned)) - }, - auth, - )?; - let database_identity = self.relational_db.database_identity(); let tx = DeltaTx::from(tx); let (update, metrics, query_metrics) = match sender.config.protocol { Protocol::Binary => { - let (update, metrics, query_metrics) = - execute_plans(auth, queries, &tx, update_type, &self.bsatn_rlb_pool)?; + let (update, metrics, query_metrics) = execute_plans(queries, &tx, update_type, &self.bsatn_rlb_pool)?; (ws_v1::FormatSwitch::Bsatn(update), metrics, query_metrics) } Protocol::Text => { let (update, metrics, query_metrics) = - execute_plans(auth, queries, &tx, update_type, &JsonRowListBuilderFakePool)?; + execute_plans(queries, &tx, update_type, &JsonRowListBuilderFakePool)?; (ws_v1::FormatSwitch::Json(update), metrics, query_metrics) } }; @@ -559,6 +541,69 @@ impl ModuleSubscriptions { Ok((update, metrics)) } + fn check_new_query_row_limit( + &self, + queries: &[Arc], + physical_plans: &HashMap>, + tx: &TxId, + auth: &AuthCtx, + ) -> Result<(), DBError> { + let mut seen = HashSet::new(); + let physical_plans = queries + .iter() + .filter_map(|query| { + if seen.insert(query.hash()) { + physical_plans.get(&query.hash()) + } else { + None + } + }) + .collect::>(); + + if physical_plans.is_empty() { + return Ok(()); + } + + check_row_limit( + &physical_plans, + &self.relational_db, + tx, + |plans, tx| { + plans + .iter() + .map(|plan| estimate_rows_scanned(tx, plan.physical_plan())) + .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned)) + }, + auth, + ) + } + + fn remove_failed_subscription( + &self, + subscription_metrics: &SubscriptionMetrics, + sender_id: ClientActorId, + subscription: FailedSubscription, + ) -> Result<(), DBError> { + let mut subscriptions = { + let _wait_guard = subscription_metrics.lock_waiters.inc_scope(); + let _wait_timer = subscription_metrics.lock_wait_time.start_timer(); + self.subscriptions.write() + }; + { + let _compile_timer = subscription_metrics.compilation_time.start_timer(); + match subscription { + FailedSubscription::V1(query_id) => { + subscriptions.remove_subscription((sender_id.identity, sender_id.connection_id), query_id)?; + } + FailedSubscription::V2(query_set_id) => { + subscriptions + .remove_subscription_v2((sender_id.identity, sender_id.connection_id), query_set_id)?; + } + } + } + Ok(()) + } + /// Add a subscription for a single query. /// /// - If `host` is `Some`, the request is forwarded to the module host. The host @@ -638,18 +683,19 @@ impl ModuleSubscriptions { let existing_query = { let guard = self.subscriptions.read(); - guard.query(&hash) + guard.query(&hash).or_else(|| guard.query(&hash_with_param)) }; + let mut physical_plans = HashMap::default(); let query = return_on_err_with_sql_bool!( - existing_query.map(Ok).unwrap_or_else(|| compile_query_with_hashes( - &auth, - &*mut_tx, - &sql, - hash, - hash_with_param - ) - .map(Arc::new)), + existing_query.map(Ok).unwrap_or_else(|| { + compile_query_with_hashes(&auth, &*mut_tx, &sql, hash, hash_with_param).map(|compiled| { + let hash = compiled.plan.hash(); + let query = Arc::new(compiled.plan); + physical_plans.insert(hash, compiled.physical_plans); + query + }) + }), sql, send_err_msg ); @@ -670,8 +716,14 @@ impl ModuleSubscriptions { let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &query, auth.caller())?; + return_on_err_with_sql_bool!( + self.check_new_query_row_limit(std::slice::from_ref(&query), &physical_plans, &tx, &auth), + query.sql(), + send_err_msg + ); + let (table_rows, metrics) = return_on_err_with_sql_bool!( - self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Subscribe), + self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, TableUpdateType::Subscribe), query.sql(), send_err_msg ); @@ -756,7 +808,7 @@ impl ModuleSubscriptions { let (mut tx, tx_offset) = self.unsubscribe_views(query, auth.caller())?; let (table_rows, metrics) = return_on_err_with_sql!( - self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe), + self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, TableUpdateType::Unsubscribe), query.sql(), send_err_msg ); @@ -836,17 +888,15 @@ impl ModuleSubscriptions { let mut_tx = ScopeGuard::::into_inner(mut_tx); let (mut tx, tx_offset) = self.unsubscribe_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller())?; - let (update, metrics) = return_on_err!( - self.evaluate_queries( - sender.clone(), - &removed_queries, - &tx, - &auth, - TableUpdateType::Unsubscribe, - ), - send_err_msg, - None - ); + let eval_result = self.evaluate_queries(sender.clone(), &removed_queries, &tx, TableUpdateType::Unsubscribe); + let eval_result = match removed_queries.as_slice() { + [query] => eval_result.map_err(|error| DBError::WithSql { + error: Box::new(error), + sql: query.sql().into(), + }), + _ => eval_result, + }; + let (update, metrics) = return_on_err!(eval_result, send_err_msg, None); tx.metrics.merge(metrics); // How many queries did we evaluate? @@ -955,13 +1005,7 @@ impl ModuleSubscriptions { let (rows, metrics) = if request.flags == ws_v2::UnsubscribeFlags::SendDroppedRows { let (update, metrics) = return_on_err!( - self.evaluate_queries( - sender.clone(), - &removed_queries, - &tx, - &auth, - TableUpdateType::Unsubscribe, - ), + self.evaluate_queries(sender.clone(), &removed_queries, &tx, TableUpdateType::Unsubscribe,), send_err_msg, (None, false) ); @@ -1010,7 +1054,6 @@ impl ModuleSubscriptions { /// /// Instead we generate two hashes and outside of the tx lock. /// If either one is currently tracked, we can avoid recompilation. - #[allow(clippy::type_complexity)] fn compile_queries( &self, sender: Identity, @@ -1018,7 +1061,7 @@ impl ModuleSubscriptions { queries: &[Box], num_queries: usize, metrics: &SubscriptionMetrics, - ) -> Result<(Vec>, AuthCtx, MutTxId, HistogramTimer), DBError> { + ) -> Result { let mut subscribe_to_all_tables = false; let mut plans = Vec::with_capacity(num_queries); let mut query_hashes = Vec::with_capacity(num_queries); @@ -1046,49 +1089,72 @@ impl ModuleSubscriptions { self.subscriptions.read() }; + let mut new_queries = 0; + let mut compiled_queries: HashMap> = HashMap::default(); + let mut physical_plans: HashMap> = HashMap::default(); + if subscribe_to_all_tables { - plans.extend( - super::subscription::get_all( - |relational_db, tx| relational_db.get_all_tables_mut(tx).map(|schemas| schemas.into_iter()), - &self.relational_db, - &*mut_tx, - &auth, - )? - .into_iter() - .map(Arc::new), - ); + for compiled in super::subscription::get_all( + |relational_db, tx| relational_db.get_all_tables_mut(tx).map(|schemas| schemas.into_iter()), + &self.relational_db, + &*mut_tx, + &auth, + )? { + add_compiled_query( + compiled, + &guard, + &mut plans, + &mut compiled_queries, + &mut physical_plans, + &mut new_queries, + ); + } } - let mut new_queries = 0; - for (sql, hash, hash_with_param) in query_hashes { match guard.query(&hash) { Some(unit) => { plans.push(unit); } - _ => match guard.query(&hash_with_param) { - Some(unit) => { - plans.push(unit); - } - _ => { - plans.push(Arc::new( - compile_query_with_hashes(&auth, &*mut_tx, sql, hash, hash_with_param).map_err(|err| { - DBError::WithSql { + _ => { + match guard + .query(&hash_with_param) + .or_else(|| compiled_queries.get(&hash).cloned()) + .or_else(|| compiled_queries.get(&hash_with_param).cloned()) + { + Some(unit) => { + plans.push(unit); + } + _ => { + let compiled = compile_query_with_hashes(&auth, &*mut_tx, sql, hash, hash_with_param) + .map_err(|err| DBError::WithSql { error: Box::new(DBError::Other(err.into())), sql: sql.into(), - } - })?, - )); - new_queries += 1; + })?; + add_compiled_query( + compiled, + &guard, + &mut plans, + &mut compiled_queries, + &mut physical_plans, + &mut new_queries, + ); + } } - }, + } } } // How many queries in this subscription are not cached? metrics.num_new_queries_subscribed.inc_by(new_queries); - Ok((plans, auth, ScopeGuard::::into_inner(mut_tx), compile_timer)) + Ok(CompiledQueryBatch { + queries: plans, + physical_plans, + auth, + mut_tx: ScopeGuard::::into_inner(mut_tx), + compile_timer, + }) } /// Send a message to a client connection. @@ -1275,7 +1341,13 @@ impl ModuleSubscriptions { let num_queries = request.query_strings.len(); subscription_metrics.num_queries_subscribed.inc_by(num_queries as _); - let (queries, auth, mut_tx, _compile_timer) = return_on_err!( + let CompiledQueryBatch { + queries, + physical_plans, + auth, + mut_tx, + compile_timer: _compile_timer, + } = return_on_err!( self.compile_queries( sender.id.identity, auth, @@ -1308,8 +1380,19 @@ impl ModuleSubscriptions { let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; - let (update, metrics) = - self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)?; + let failed_subscription = FailedSubscription::V2(request.query_set_id); + if let Err(err) = self.check_new_query_row_limit(&queries, &physical_plans, &tx, &auth) { + self.remove_failed_subscription(subscription_metrics, sender.id, failed_subscription)?; + send_err_msg(err.to_string().into()); + return Ok((None, trapped)); + } + + let Ok((update, metrics)) = self.evaluate_queries(sender.clone(), &queries, &tx, TableUpdateType::Subscribe) + else { + self.remove_failed_subscription(subscription_metrics, sender.id, failed_subscription)?; + send_err_msg("Internal error evaluating queries".into()); + return Ok((None, trapped)); + }; tx.metrics.merge(metrics); subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _); @@ -1366,7 +1449,13 @@ impl ModuleSubscriptions { let num_queries = request.query_strings.len(); subscription_metrics.num_queries_subscribed.inc_by(num_queries as _); - let (queries, auth, mut_tx, compile_timer) = return_on_err!( + let CompiledQueryBatch { + queries, + physical_plans, + auth, + mut_tx, + compile_timer, + } = return_on_err!( self.compile_queries( sender.id.identity, auth, @@ -1414,21 +1503,24 @@ impl ModuleSubscriptions { let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; - let Ok((update, metrics)) = - self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe) + if let Err(err) = self.check_new_query_row_limit(&queries, &physical_plans, &tx, &auth) { + self.remove_failed_subscription( + subscription_metrics, + sender.id, + FailedSubscription::V1(request.query_id), + )?; + send_err_msg(err.to_string().into()); + return Ok((None, trapped)); + } + + let Ok((update, metrics)) = self.evaluate_queries(sender.clone(), &queries, &tx, TableUpdateType::Subscribe) else { // If we fail the query, we need to remove the subscription. - let mut subscriptions = { - // How contended is the lock? - let _wait_guard = subscription_metrics.lock_waiters.inc_scope(); - let _wait_timer = subscription_metrics.lock_wait_time.start_timer(); - self.subscriptions.write() - }; - { - let _compile_timer = subscription_metrics.compilation_time.start_timer(); - subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?; - } - + self.remove_failed_subscription( + subscription_metrics, + sender.id, + FailedSubscription::V1(request.query_id), + )?; send_err_msg("Internal error evaluating queries".into()); return Ok((None, trapped)); }; @@ -1521,7 +1613,13 @@ impl ModuleSubscriptions { let num_queries = subscription.query_strings.len(); subscription_metrics.num_queries_subscribed.inc_by(num_queries as _); - let (queries, auth, mut_tx, compile_timer) = self.compile_queries( + let CompiledQueryBatch { + queries, + physical_plans, + auth, + mut_tx, + compile_timer, + } = self.compile_queries( sender.id.identity, auth, &subscription.query_strings, @@ -1532,35 +1630,18 @@ impl ModuleSubscriptions { let (mut tx, tx_offset, trapped) = self.materialize_views_and_downgrade_tx(mut_tx, instance, &queries, auth.caller())?; - check_row_limit( - &queries, - &self.relational_db, - &tx, - |plan, tx| { - plan.plans_fragments() - .map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan())) - .fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned)) - }, - &auth, - )?; + self.check_new_query_row_limit(&queries, &physical_plans, &tx, &auth)?; // Record how long it took to compile the subscription drop(compile_timer); let delta_tx = DeltaTx::from(&*tx); let (database_update, metrics, query_metrics) = match sender.config.protocol { - Protocol::Binary => execute_plans( - &auth, - &queries, - &delta_tx, - TableUpdateType::Subscribe, - &self.bsatn_rlb_pool, - ) - .map(|(table_update, metrics, query_metrics)| { - (ws_v1::FormatSwitch::Bsatn(table_update), metrics, query_metrics) - })?, + Protocol::Binary => execute_plans(&queries, &delta_tx, TableUpdateType::Subscribe, &self.bsatn_rlb_pool) + .map(|(table_update, metrics, query_metrics)| { + (ws_v1::FormatSwitch::Bsatn(table_update), metrics, query_metrics) + })?, Protocol::Text => execute_plans( - &auth, &queries, &delta_tx, TableUpdateType::Subscribe, @@ -2652,7 +2733,7 @@ mod tests { let plan = compile_read_only_query(&auth, &tx, sql)?; let plan = Arc::new(plan); - let (_, metrics) = subs.evaluate_queries(sender, &[plan], &tx, &auth, TableUpdateType::Subscribe)?; + let (_, metrics) = subs.evaluate_queries(sender, &[plan], &tx, TableUpdateType::Subscribe)?; // We only probe the index once assert_eq!(metrics.index_seeks, 1); diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 0700846fc74..3159b7e7b1e 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -119,11 +119,7 @@ impl Plan { /// Return the search arguments for this query fn search_args(&self) -> impl Iterator + use<> { let mut args = HashSet::new(); - for arg in self - .plans - .iter() - .flat_map(|subscription| subscription.optimized_physical_plan().search_args()) - { + for arg in self.plans.iter().flat_map(|subscription| subscription.search_args()) { args.insert(arg); } args.into_iter() diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index bea30f96b7f..012088bd313 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -8,6 +8,7 @@ use regex::Regex; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; use spacetimedb_execution::Datastore; use spacetimedb_lib::identity::AuthCtx; +use spacetimedb_physical_plan::plan::ProjectPlan; use spacetimedb_subscription::SubscriptionPlan; static WHITESPACE: Lazy = Lazy::new(|| Regex::new(r"^\s*$").unwrap()); @@ -24,6 +25,12 @@ pub fn is_subscribe_to_all_tables(sql: &str) -> bool { SUBSCRIBE_TO_ALL_TABLES_REGEX.is_match_at(sql, 0) } +pub(crate) struct CompiledQuery { + pub plan: Plan, + /// Optimized physical plans used only for row-limit estimation on newly compiled queries. + pub physical_plans: Vec, +} + /// Compile a string into a single read-only query. pub fn compile_read_only_query(auth: &AuthCtx, tx: &Tx, input: &str) -> Result { if is_whitespace_or_empty(input) { @@ -31,28 +38,33 @@ pub fn compile_read_only_query(auth: &AuthCtx, tx: &Tx, input: &str) -> Result

( +pub(crate) fn compile_query_with_hashes( auth: &AuthCtx, tx: &Tx, input: &str, hash: QueryHash, hash_with_param: QueryHash, -) -> Result { +) -> Result { if is_whitespace_or_empty(input) { return Err(SubscriptionError::Empty.into()); } let tx = SchemaViewer::new(tx, auth); - let (plans, has_param) = SubscriptionPlan::compile(input, &tx, auth)?; + let (plans, has_param, physical_plans) = SubscriptionPlan::compile_plans(input, &tx, auth)?; - if auth.bypass_rls() || has_param { - return Ok(Plan::new(plans, hash_with_param, input.to_owned())); - } - Ok(Plan::new(plans, hash, input.to_owned())) + let hash = if auth.bypass_rls() || has_param { + hash_with_param + } else { + hash + }; + Ok(CompiledQuery { + plan: Plan::new(plans, hash, input.to_owned()), + physical_plans, + }) } diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index a9f3bd12f62..b4198276dd5 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -1,5 +1,5 @@ use super::execution_unit::QueryHash; -use super::module_subscription_manager::Plan; +use super::query::CompiledQuery; use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::sql::ast::SchemaViewer; @@ -16,7 +16,7 @@ pub(crate) fn get_all( relational_db: &RelationalDB, tx: &T, auth: &AuthCtx, -) -> Result, DBError> +) -> Result, DBError> where T: StateView, F: Fn(&RelationalDB, &T) -> Result, @@ -27,12 +27,13 @@ where .map(|schema| { let sql = format!("SELECT * FROM {}", schema.table_name); let tx = SchemaViewer::new(tx, auth); - SubscriptionPlan::compile(&sql, &tx, auth).map(|(plans, has_param)| { - Plan::new( + SubscriptionPlan::compile_plans(&sql, &tx, auth).map(|(plans, has_param, physical_plans)| CompiledQuery { + plan: super::module_subscription_manager::Plan::new( plans, QueryHash::from_string(&sql, auth.caller(), auth.bypass_rls() || has_param), sql, - ) + ), + physical_plans, }) }) .collect::>()?) diff --git a/crates/subscription/src/lib.rs b/crates/subscription/src/lib.rs index 00ac42ec1c9..94668cd1d49 100644 --- a/crates/subscription/src/lib.rs +++ b/crates/subscription/src/lib.rs @@ -9,11 +9,13 @@ use spacetimedb_execution::{ }; use spacetimedb_expr::{check::SchemaView, expr::CollectViews}; use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, query::Delta, AlgebraicValue}; -use spacetimedb_physical_plan::plan::{IxJoin, IxScan, Label, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField}; +use spacetimedb_physical_plan::plan::{ + IxJoin, IxScan, Label, PhysicalExpr, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField, +}; use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewId}; use spacetimedb_query::compile_subscription; -use spacetimedb_schema::table_name::TableName; -use std::ops::RangeBounds; +use spacetimedb_schema::{schema::TableSchema, table_name::TableName}; +use std::{ops::RangeBounds, sync::Arc}; /// A subscription is a view over a particular table. /// How do we incrementally maintain that view? @@ -264,7 +266,7 @@ impl Fragments { /// ``` /// /// Whenever `a` is updated, only the relevant queries are evaluated. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct JoinEdge { /// The [`TableId`] for `a` pub lhs_table: TableId, @@ -307,6 +309,108 @@ impl JoinEdge { } } +/// Metrics metadata derived once when a subscription plan is compiled. +#[derive(Debug, Clone)] +pub struct SubscriptionPlanMetrics { + scan_type: String, + unindexed_columns: String, +} + +impl SubscriptionPlanMetrics { + fn from_physical_plan(plan: &PhysicalPlan) -> Self { + let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..))); + let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..))); + let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..))); + + let scan_type = if has_table_scan && has_index_scan { + "mixed" + } else if has_table_scan { + "sequential" + } else if has_index_scan && has_post_filter { + "indexed_with_filter" + } else if has_index_scan { + "fully_indexed" + } else { + "unknown" + } + .to_owned(); + + let mut schema: Option> = None; + plan.visit(&mut |p| match p { + PhysicalPlan::TableScan(scan, _) => { + schema = Some(scan.schema.clone()); + } + PhysicalPlan::IxScan(scan, _) => { + schema = Some(scan.schema.clone()); + } + _ => {} + }); + + let mut columns = Vec::new(); + plan.visit(&mut |p| { + if let PhysicalPlan::Filter(_, expr) = p { + extract_columns(expr, schema.as_ref(), &mut columns); + } + }); + + Self { + scan_type, + unindexed_columns: columns.join(","), + } + } + + pub fn scan_type(&self) -> &str { + &self.scan_type + } + + pub fn unindexed_columns(&self) -> &str { + &self.unindexed_columns + } +} + +fn extract_columns(expr: &PhysicalExpr, schema: Option<&Arc>, columns: &mut Vec) { + match expr { + PhysicalExpr::Field(tuple_field) => { + let col_name = schema + .and_then(|s| s.columns.get(tuple_field.field_pos)) + .map(|col| col.col_name.to_string()) + .unwrap_or_else(|| format!("col_{}", tuple_field.field_pos)); + columns.push(col_name); + } + PhysicalExpr::BinOp(_, lhs, rhs) => { + extract_columns(lhs, schema, columns); + extract_columns(rhs, schema, columns); + } + PhysicalExpr::LogOp(_, exprs) => { + for expr in exprs { + extract_columns(expr, schema, columns); + } + } + PhysicalExpr::Value(_) => {} + } +} + +/// Metadata cached with a subscription after query planning is complete. +#[derive(Debug)] +struct SubscriptionMetadata { + /// A subscription can read from multiple tables. + table_ids: Vec, + /// The table or view returned by this plan, if it returns whole rows. + return_schema: Option>, + /// View ids read by this plan. + view_ids: Vec, + /// Whether this plan reads from an anonymous view. + reads_anonymous_view: bool, + /// Whether this plan reads from a non-anonymous view. + reads_non_anonymous_view: bool, + /// Search arguments used for pruning. + search_args: Vec<(TableId, ColId, AlgebraicValue)>, + /// Join edge used for pruning. + join_edge: Option<(JoinEdge, AlgebraicValue)>, + /// Scan classification used for runtime metrics. + scan_metrics: SubscriptionPlanMetrics, +} + /// A subscription defines a view over a table #[derive(Debug)] pub struct SubscriptionPlan { @@ -314,18 +418,17 @@ pub struct SubscriptionPlan { return_id: TableId, /// To which table are we subscribed? return_name: TableName, - /// A subscription can read from multiple tables. - /// From which tables do we read? - table_ids: Vec, + /// The cached executor for the non-incremental query plan. + base_plan: PipelinedProject, /// The plan fragments for updating the view fragments: Fragments, - /// The optimized plan without any delta scans - plan_opt: ProjectPlan, + /// Metadata derived from the physical plan at compile time. + metadata: SubscriptionMetadata, } impl CollectViews for SubscriptionPlan { fn collect_views(&self, views: &mut HashSet) { - self.plan_opt.collect_views(views); + views.extend(self.metadata.view_ids.iter().copied()); } } @@ -337,19 +440,26 @@ impl SubscriptionPlan { /// Does this plan return rows from a view? pub fn is_view(&self) -> bool { - self.plan_opt.returns_view_table() + self.metadata + .return_schema + .as_ref() + .is_some_and(|schema| schema.is_view()) } /// Does this plan return rows from an event table? pub fn returns_event_table(&self) -> bool { - self.plan_opt.return_table().is_some_and(|schema| schema.is_event) + self.metadata + .return_schema + .as_ref() + .is_some_and(|schema| schema.is_event) } /// The number of columns returned. /// Only relevant if [`Self::is_view`] is true. pub fn num_cols(&self) -> usize { - self.plan_opt - .return_table() + self.metadata + .return_schema + .as_ref() .map(|schema| schema.num_cols()) .unwrap_or_default() } @@ -357,8 +467,9 @@ impl SubscriptionPlan { /// The number of private columns returned. /// Only relevant if [`Self::is_view`] is true. pub fn num_private_cols(&self) -> usize { - self.plan_opt - .return_table() + self.metadata + .return_schema + .as_ref() .map(|schema| schema.num_private_cols()) .unwrap_or_default() } @@ -375,12 +486,36 @@ impl SubscriptionPlan { /// From which tables does this plan read? pub fn table_ids(&self) -> impl Iterator + '_ { - self.table_ids.iter().copied() + self.metadata.table_ids.iter().copied() + } + + /// The cached executor for the non-incremental query plan. + pub fn base_plan(&self) -> &PipelinedProject { + &self.base_plan } - /// The optimized plan without any delta scans - pub fn optimized_physical_plan(&self) -> &ProjectPlan { - &self.plan_opt + /// The table or view returned by this plan, if it returns whole rows. + pub fn return_table(&self) -> Option<&Arc> { + self.metadata.return_schema.as_ref() + } + + /// Does this plan read from an (anonymous) view? + pub fn reads_from_view(&self, anonymous: bool) -> bool { + if anonymous { + self.metadata.reads_anonymous_view + } else { + self.metadata.reads_non_anonymous_view + } + } + + /// Search arguments used for pruning. + pub fn search_args(&self) -> impl Iterator + '_ { + self.metadata.search_args.iter().cloned() + } + + /// Scan classification used for runtime metrics. + pub fn scan_metrics(&self) -> &SubscriptionPlanMetrics { + &self.metadata.scan_metrics } /// From which indexes does this plan read? @@ -419,11 +554,19 @@ impl SubscriptionPlan { /// 2. Single column index lookup on the rhs table /// 3. No self joins pub fn join_edge(&self) -> Option<(JoinEdge, AlgebraicValue)> { - if !self.is_join() { + self.metadata.join_edge.clone() + } + + fn join_edge_for_plan( + plan_opt: &ProjectPlan, + return_id: TableId, + is_join: bool, + ) -> Option<(JoinEdge, AlgebraicValue)> { + if !is_join { return None; } let mut join_edge = None; - self.plan_opt.visit(&mut |op| match op { + plan_opt.visit(&mut |op| match op { PhysicalPlan::IxJoin( IxJoin { lhs, @@ -437,7 +580,7 @@ impl SubscriptionPlan { .. }, _, - ) if rhs.table_id == self.return_id => match &**lhs { + ) if rhs.table_id == return_id => match &**lhs { PhysicalPlan::IxScan( IxScan { schema, @@ -446,11 +589,11 @@ impl SubscriptionPlan { .. }, _, - ) if schema.table_id != self.return_id + ) if schema.table_id != return_id && prefix.is_empty() && schema.is_unique(&ColList::new((*rhs_join_col).into())) => { - let lhs_table = self.return_id; + let lhs_table = return_id; let rhs_table = schema.table_id; let rhs_col = *rhs_col; let rhs_val = rhs_val.clone(); @@ -474,6 +617,15 @@ impl SubscriptionPlan { /// Generate a plan for incrementally maintaining a subscription pub fn compile(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> Result<(Vec, bool)> { + Self::compile_plans(sql, tx, auth).map(|(plans, has_param, _)| (plans, has_param)) + } + + /// Generate a plan for incrementally maintaining a subscription + pub fn compile_plans( + sql: &str, + tx: &impl SchemaView, + auth: &AuthCtx, + ) -> Result<(Vec, bool, Vec)> { let (plans, return_id, return_name, has_param) = compile_subscription(sql, tx, auth)?; /// Does this plan have any non-index joins? @@ -511,6 +663,7 @@ impl SubscriptionPlan { } let mut subscriptions = vec![]; + let mut physical_plans = vec![]; for plan in plans { let plan_opt = plan.clone().optimize(auth)?; @@ -526,16 +679,33 @@ impl SubscriptionPlan { let (table_ids, table_aliases) = table_ids_for_plan(&plan); let fragments = Fragments::compile_from_plan(&plan, &table_aliases, auth)?; + let is_join = fragments.insert_plans.len() > 1 && fragments.delete_plans.len() > 1; + + let mut view_ids = HashSet::new(); + plan_opt.collect_views(&mut view_ids); + + let metadata = SubscriptionMetadata { + table_ids, + return_schema: plan_opt.return_table(), + view_ids: view_ids.into_iter().collect(), + reads_anonymous_view: plan_opt.reads_from_view(true), + reads_non_anonymous_view: plan_opt.reads_from_view(false), + search_args: plan_opt.physical_plan().search_args(), + join_edge: Self::join_edge_for_plan(&plan_opt, return_id, is_join), + scan_metrics: SubscriptionPlanMetrics::from_physical_plan(plan_opt.physical_plan()), + }; + + physical_plans.push(plan_opt.clone()); subscriptions.push(Self { return_id, return_name: return_name.clone(), - table_ids, - plan_opt, + base_plan: PipelinedProject::from(plan_opt), fragments, + metadata, }); } - Ok((subscriptions, has_param)) + Ok((subscriptions, has_param, physical_plans)) } }