diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 7657a2f8993..ba06a25e57d 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -649,28 +649,24 @@ impl CallReducerParams { pub enum ViewCommand { AddSingleSubscription { - info: Arc, sender: Arc, auth: AuthCtx, request: SubscribeSingle, timer: Instant, }, AddMultiSubscription { - info: Arc, sender: Arc, auth: AuthCtx, request: SubscribeMulti, timer: Instant, }, AddLegacySubscription { - info: Arc, sender: Arc, auth: AuthCtx, subscribe: Subscribe, timer: Instant, }, Sql { - info: Arc, db: Arc, sql_text: String, auth: AuthCtx, @@ -1562,14 +1558,12 @@ impl ModuleHost { pub async fn call_view_add_single_subscription( &self, - info: Arc, sender: Arc, auth: AuthCtx, request: SubscribeSingle, timer: Instant, ) -> Result, DBError> { let cmd = ViewCommand::AddSingleSubscription { - info, sender, auth, request, @@ -1597,14 +1591,12 @@ impl ModuleHost { pub async fn call_view_add_multi_subscription( &self, - info: Arc, sender: Arc, auth: AuthCtx, request: SubscribeMulti, timer: Instant, ) -> Result, DBError> { let cmd = ViewCommand::AddMultiSubscription { - info, sender, auth, request, @@ -1632,14 +1624,12 @@ impl ModuleHost { pub async fn call_view_add_legacy_subscription( &self, - info: Arc, sender: Arc, auth: AuthCtx, subscribe: spacetimedb_client_api_messages::websocket::Subscribe, timer: Instant, ) -> Result, DBError> { let cmd = ViewCommand::AddLegacySubscription { - info, sender, auth, subscribe, @@ -1667,7 +1657,6 @@ impl ModuleHost { pub async fn call_view_sql( &self, - info: Arc, db: Arc, sql_text: String, auth: AuthCtx, @@ -1675,7 +1664,6 @@ impl ModuleHost { head: &mut Vec<(Box, AlgebraicType)>, ) -> Result { let cmd = ViewCommand::Sql { - info, db, sql_text, auth, @@ -1838,7 +1826,6 @@ impl ModuleHost { pub fn materialize_views( mut tx: MutTxId, instance: &mut RefInstance<'_, I>, - module_def: &ModuleDef, view_collector: &impl CollectViews, caller: Identity, workload: Workload, @@ -1854,9 +1841,8 @@ impl ModuleHost { let is_anonymous = st_view_row.is_anonymous; let sender = if is_anonymous { None } else { Some(caller) }; if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? { - let (res, trapped) = Self::call_view( - instance, module_def, tx, &view_name, view_id, table_id, Nullary, caller, sender, - )?; + let (res, trapped) = + Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?; tx = res.tx; if trapped { return Ok((tx, true)); @@ -1877,10 +1863,10 @@ impl ModuleHost { pub fn call_views_with_tx( tx: MutTxId, instance: &mut RefInstance<'_, I>, - module_def: &ModuleDef, caller: Identity, ) -> Result<(ViewCallResult, bool), ViewCallError> { let mut out = ViewCallResult::default(tx); + let module_def = &instance.common.info().module_def; let mut trapped = false; use FunctionArgs::Nullary; for ViewCallInfo { @@ -1896,7 +1882,6 @@ impl ModuleHost { let (result, trap) = Self::call_view( instance, - module_def, out.tx, &view_def.name, view_id, @@ -1924,7 +1909,6 @@ impl ModuleHost { fn call_view( instance: &mut RefInstance<'_, I>, - module_def: &ModuleDef, tx: MutTxId, view_name: &str, view_id: ViewId, @@ -1933,6 +1917,7 @@ impl ModuleHost { caller: Identity, sender: Option, ) -> Result<(ViewCallResult, bool), ViewCallError> { + let module_def = &instance.common.info().module_def; let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?; let fn_ptr = view_def.fn_ptr; let row_type = view_def.product_type_ref; diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index e31d1135e6b..de90c248ef7 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -22,7 +22,7 @@ use crate::messages::control_db::HostType; use crate::module_host_context::ModuleCreationContextLimited; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; -use crate::sql::execute::run_from_module; +use crate::sql::execute::run_with_instance; use crate::subscription::module_subscription_actor::commit_and_broadcast_event; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::util::prometheus_handle::{HistogramExt, TimerGuard}; @@ -507,6 +507,10 @@ impl InstanceCommon { } } + pub(crate) fn info(&self) -> Arc { + self.info.clone() + } + #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn update_database( &mut self, @@ -977,26 +981,21 @@ impl InstanceCommon { } pub(crate) fn handle_cmd(&mut self, cmds: ViewCommand, inst: &mut I) -> (ViewCommandResult, bool) { + let info = self.info.clone(); let mut inst = RefInstance { instance: inst, common: self, }; match cmds { ViewCommand::AddSingleSubscription { - info, sender, auth, request, timer, } => { - let res = info.subscriptions.add_single_subscription_from_module( - Some((&mut inst, &info.module_def)), - sender, - auth, - request, - timer, - None, - ); + let res = info + .subscriptions + .add_single_subscription_with_instance(&mut inst, sender, auth, request, timer, None); match res { Ok((metrics, trapped)) => (ViewCommandResult::Subscription { result: Ok(metrics) }, trapped), @@ -1004,20 +1003,14 @@ impl InstanceCommon { } } ViewCommand::AddLegacySubscription { - info, sender, auth, subscribe, timer, } => { - let res = info.subscriptions.add_legacy_subscriber_from_module( - Some((&mut inst, &info.module_def)), - sender, - auth, - subscribe, - timer, - None, - ); + let res = info + .subscriptions + .add_legacy_subscriber_with_instance(&mut inst, sender, auth, subscribe, timer, None); match res { Ok((metrics, trapped)) => ( @@ -1030,20 +1023,14 @@ impl InstanceCommon { } } ViewCommand::AddMultiSubscription { - info, sender, auth, request, timer, } => { - let res = info.subscriptions.add_multi_subscription_from_module( - Some((&mut inst, &info.module_def)), - sender, - auth, - request, - timer, - None, - ); + let res = info + .subscriptions + .add_multi_subscription_with_instance(&mut inst, sender, auth, request, timer, None); match res { Ok((metrics, trapped)) => (ViewCommandResult::Subscription { result: Ok(metrics) }, trapped), @@ -1052,14 +1039,13 @@ impl InstanceCommon { } ViewCommand::Sql { - info, db, sql_text, auth, subs, } => { let mut head = vec![]; - let res = run_from_module(Some((&mut inst, &info.module_def)), db, sql_text, auth, subs, &mut head); + let res = run_with_instance(&mut inst, db, sql_text, auth, subs, &mut head); match res { Ok((result, trapped)) => ( diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 36a74c39869..48982de2066 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -26,7 +26,6 @@ use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Timestamp; use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue}; use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt}; -use spacetimedb_schema::def::ModuleDef; use spacetimedb_schema::relation::FieldName; use spacetimedb_vm::eval::run_ast; use spacetimedb_vm::expr::{CodeResult, CrudExpr, Expr}; @@ -189,6 +188,10 @@ pub struct SqlResult { } /// Run the `SQL` string using the `auth` credentials +/// +/// If a `ModuleHost` is provided, the SQL query is executed via the module host, +/// meaning the module’s core is used to run the statement. +/// If no module host is provided, the SQL query is executed on the current thread. pub async fn run( db: Arc, sql_text: String, @@ -198,17 +201,27 @@ pub async fn run( head: &mut Vec<(Box, AlgebraicType)>, ) -> Result { match module { - Some(module) => { - let info = module.info.clone(); - module.call_view_sql(info, db, sql_text, auth, subs, head).await - } - None => run_from_module::(None, db, sql_text, auth, subs, head) - .map(|x| x.0), + Some(module) => module.call_view_sql(db, sql_text, auth, subs, head).await, + None => run_inner::(None, db, sql_text, auth, subs, head).map(|x| x.0), } } -pub fn run_from_module( - instance: Option<(&mut RefInstance, &ModuleDef)>, +/// Run the `SQL` string using the provided `WasmInstance` and `ModuleDef` +/// +/// The query will always be executed on the module's thread. +pub(crate) fn run_with_instance( + instance: &mut RefInstance, + db: Arc, + sql_text: String, + auth: AuthCtx, + subs: Option, + head: &mut Vec<(Box, AlgebraicType)>, +) -> Result<(SqlResult, bool), DBError> { + run_inner::(Some(instance), db, sql_text, auth, subs, head) +} + +fn run_inner( + instance: Option<&mut RefInstance>, db: Arc, sql_text: String, auth: AuthCtx, @@ -227,9 +240,7 @@ pub fn run_from_module( Statement::Select(stmt) => { // Materialize views and downgrade to a read-only transaction let (tx, trapped) = match instance { - Some(instance) => { - ModuleHost::materialize_views(tx, instance.0, instance.1, &stmt, auth.caller(), Workload::Sql)? - } + Some(instance) => ModuleHost::materialize_views(tx, instance, &stmt, auth.caller(), Workload::Sql)?, None => (tx, false), }; @@ -287,7 +298,7 @@ pub fn run_from_module( // Update views let (result, trapped) = match instance { - Some(instance) => ModuleHost::call_views_with_tx(tx, instance.0, instance.1, auth.caller())?, + Some(instance) => ModuleHost::call_views_with_tx(tx, instance, auth.caller())?, None => (ViewCallResult::default(tx), false), }; diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 18307d43965..52af8fc5c09 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -43,7 +43,6 @@ use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; use spacetimedb_primitives::ArgId; -use spacetimedb_schema::def::ModuleDef; use spacetimedb_table::static_assert_size; use std::collections::HashSet; use std::{sync::Arc, time::Instant}; @@ -458,6 +457,14 @@ impl ModuleSubscriptions { Ok((update, metrics)) } + /// Add a subscription for a single query. + /// + /// - If `host` is `Some`, the request is forwarded to the module host. The host + /// will execute the subscription logic on the module thread and call back + /// into `add_single_subscription_with_instance` . + /// - If `host` is `None`, the subscription is executed directly without involving + /// a module. + #[tracing::instrument(level = "trace", skip_all)] pub async fn add_single_subscription( &self, host: Option<&ModuleHost>, @@ -469,23 +476,35 @@ impl ModuleSubscriptions { ) -> Result, DBError> { match host { Some(host) => { - let info = host.info.clone(); - host.call_view_add_single_subscription(info, sender, auth, request, timer) + host.call_view_add_single_subscription(sender, auth, request, timer) .await } None => self - .add_single_subscription_from_module::( + .add_single_subscription_inner::( None, sender, auth, request, timer, _assert, ) .map(|(metrics, _)| metrics), } } - /// Add a subscription to a single query. - #[tracing::instrument(level = "trace", skip_all)] - pub fn add_single_subscription_from_module( + // Add a subscription for a single query with access to a module instance. + /// + /// The execution logic will always be called from the module's thread. + pub(crate) fn add_single_subscription_with_instance( &self, - instance: Option<(&mut RefInstance, &ModuleDef)>, + instance: &mut RefInstance, + sender: Arc, + auth: AuthCtx, + request: SubscribeSingle, + timer: Instant, + _assert: Option, + ) -> Result<(Option, bool), DBError> { + self.add_single_subscription_inner(Some(instance), sender, auth, request, timer, _assert) + } + + fn add_single_subscription_inner( + &self, + instance: Option<&mut RefInstance>, sender: Arc, auth: AuthCtx, request: SubscribeSingle, @@ -851,6 +870,9 @@ impl ModuleSubscriptions { self.broadcast_queue.send_client_message(recipient, tx_offset, message) } + /// Add a subscription consisting of multiple queries. + /// + /// Read more in [`Self::add_single_subscription`]. #[tracing::instrument(level = "trace", skip_all)] pub async fn add_multi_subscription( &self, @@ -863,22 +885,34 @@ impl ModuleSubscriptions { ) -> Result, DBError> { match host { Some(host) => { - let info = host.info.clone(); - host.call_view_add_multi_subscription(info, sender, auth, request, timer) + host.call_view_add_multi_subscription(sender, auth, request, timer) .await } None => self - .add_multi_subscription_from_module::( + .add_multi_subscription_inner::( None, sender, auth, request, timer, _assert, ) .map(|(metrics, _)| metrics), } } - #[tracing::instrument(level = "trace", skip_all)] - pub fn add_multi_subscription_from_module( + /// Similar to [`Self::add_single_subscription_with_instance`], + /// but for multiple queries. + pub(crate) fn add_multi_subscription_with_instance( &self, - instance: Option<(&mut RefInstance, &ModuleDef)>, + instance: &mut RefInstance, + sender: Arc, + auth: AuthCtx, + request: SubscribeMulti, + timer: Instant, + _assert: Option, + ) -> Result<(Option, bool), DBError> { + self.add_multi_subscription_inner(Some(instance), sender, auth, request, timer, _assert) + } + + fn add_multi_subscription_inner( + &self, + instance: Option<&mut RefInstance>, sender: Arc, auth: AuthCtx, request: SubscribeMulti, @@ -1004,14 +1038,12 @@ impl ModuleSubscriptions { _assert: Option, ) -> Result { match host { - Some(host) => { - let info = host.info.clone(); - host.call_view_add_legacy_subscription(info, sender, auth, subscription, timer) - .await - .map(|metrics| metrics.unwrap_or_default()) - } + Some(host) => host + .call_view_add_legacy_subscription(sender, auth, subscription, timer) + .await + .map(|metrics| metrics.unwrap_or_default()), None => self - .add_legacy_subscriber_from_module::( + .add_legacy_subscriber_inner::( None, sender, auth, @@ -1022,9 +1054,24 @@ impl ModuleSubscriptions { .map(|(metrics, _)| metrics), } } - pub fn add_legacy_subscriber_from_module( + + /// Similar to [`Self::add_single_subscription_with_instance`], + /// but for the legacy subscription API which uses a set of queries. + pub(crate) fn add_legacy_subscriber_with_instance( + &self, + instance: &mut RefInstance, + sender: Arc, + auth: AuthCtx, + subscription: Subscribe, + timer: Instant, + _assert: Option, + ) -> Result<(ExecutionMetrics, bool), DBError> { + self.add_legacy_subscriber_inner(Some(instance), sender, auth, subscription, timer, _assert) + } + + fn add_legacy_subscriber_inner( &self, - instance: Option<(&mut RefInstance, &ModuleDef)>, + instance: Option<&mut RefInstance>, sender: Arc, auth: AuthCtx, subscription: Subscribe, @@ -1263,15 +1310,13 @@ impl ModuleSubscriptions { fn materialize_views_and_downgrade_tx( &self, mut tx: MutTxId, - instance: Option<(&mut RefInstance<'_, I>, &ModuleDef)>, + instance: Option<&mut RefInstance<'_, I>>, view_collector: &impl CollectViews, sender: Identity, ) -> Result<(TxGuard, TransactionOffset, bool), DBError> { let mut trapped = false; if let Some(instance) = instance { - let (instance, module_def) = instance; - (tx, trapped) = - ModuleHost::materialize_views(tx, instance, module_def, view_collector, sender, Workload::Subscribe)?; + (tx, trapped) = ModuleHost::materialize_views(tx, instance, view_collector, sender, Workload::Subscribe)?; }; let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe);