Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 4 additions & 19 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,28 +649,24 @@ impl CallReducerParams {

pub enum ViewCommand {
AddSingleSubscription {
info: Arc<ModuleInfo>,
sender: Arc<ClientConnectionSender>,
auth: AuthCtx,
request: SubscribeSingle,
timer: Instant,
},
AddMultiSubscription {
info: Arc<ModuleInfo>,
sender: Arc<ClientConnectionSender>,
auth: AuthCtx,
request: SubscribeMulti,
timer: Instant,
},
AddLegacySubscription {
info: Arc<ModuleInfo>,
sender: Arc<ClientConnectionSender>,
auth: AuthCtx,
subscribe: Subscribe,
timer: Instant,
},
Sql {
info: Arc<ModuleInfo>,
db: Arc<RelationalDB>,
sql_text: String,
auth: AuthCtx,
Expand Down Expand Up @@ -1562,14 +1558,12 @@ impl ModuleHost {

pub async fn call_view_add_single_subscription(
&self,
info: Arc<ModuleInfo>,
sender: Arc<ClientConnectionSender>,
auth: AuthCtx,
request: SubscribeSingle,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let cmd = ViewCommand::AddSingleSubscription {
info,
sender,
auth,
request,
Expand Down Expand Up @@ -1597,14 +1591,12 @@ impl ModuleHost {

pub async fn call_view_add_multi_subscription(
&self,
info: Arc<ModuleInfo>,
sender: Arc<ClientConnectionSender>,
auth: AuthCtx,
request: SubscribeMulti,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let cmd = ViewCommand::AddMultiSubscription {
info,
sender,
auth,
request,
Expand Down Expand Up @@ -1632,14 +1624,12 @@ impl ModuleHost {

pub async fn call_view_add_legacy_subscription(
&self,
info: Arc<ModuleInfo>,
sender: Arc<ClientConnectionSender>,
auth: AuthCtx,
subscribe: spacetimedb_client_api_messages::websocket::Subscribe,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let cmd = ViewCommand::AddLegacySubscription {
info,
sender,
auth,
subscribe,
Expand Down Expand Up @@ -1667,15 +1657,13 @@ impl ModuleHost {

pub async fn call_view_sql(
&self,
info: Arc<ModuleInfo>,
db: Arc<RelationalDB>,
sql_text: String,
auth: AuthCtx,
subs: Option<ModuleSubscriptions>,
head: &mut Vec<(Box<str>, AlgebraicType)>,
) -> Result<SqlResult, DBError> {
let cmd = ViewCommand::Sql {
info,
db,
sql_text,
auth,
Expand Down Expand Up @@ -1838,7 +1826,6 @@ impl ModuleHost {
pub fn materialize_views<I: WasmInstance>(
mut tx: MutTxId,
instance: &mut RefInstance<'_, I>,
module_def: &ModuleDef,
view_collector: &impl CollectViews,
caller: Identity,
workload: Workload,
Expand All @@ -1854,9 +1841,8 @@ impl ModuleHost {
let is_anonymous = st_view_row.is_anonymous;
let sender = if is_anonymous { None } else { Some(caller) };
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? {
let (res, trapped) = Self::call_view(
instance, module_def, tx, &view_name, view_id, table_id, Nullary, caller, sender,
)?;
let (res, trapped) =
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
tx = res.tx;
if trapped {
return Ok((tx, true));
Expand All @@ -1877,10 +1863,10 @@ impl ModuleHost {
pub fn call_views_with_tx<I: WasmInstance>(
tx: MutTxId,
instance: &mut RefInstance<'_, I>,
module_def: &ModuleDef,
caller: Identity,
) -> Result<(ViewCallResult, bool), ViewCallError> {
let mut out = ViewCallResult::default(tx);
let module_def = &instance.common.info().module_def;
let mut trapped = false;
use FunctionArgs::Nullary;
for ViewCallInfo {
Expand All @@ -1896,7 +1882,6 @@ impl ModuleHost {

let (result, trap) = Self::call_view(
instance,
module_def,
out.tx,
&view_def.name,
view_id,
Expand Down Expand Up @@ -1924,7 +1909,6 @@ impl ModuleHost {

fn call_view<I: WasmInstance>(
instance: &mut RefInstance<'_, I>,
module_def: &ModuleDef,
tx: MutTxId,
view_name: &str,
view_id: ViewId,
Expand All @@ -1933,6 +1917,7 @@ impl ModuleHost {
caller: Identity,
sender: Option<Identity>,
) -> Result<(ViewCallResult, bool), ViewCallError> {
let module_def = &instance.common.info().module_def;
let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?;
let fn_ptr = view_def.fn_ptr;
let row_type = view_def.product_type_ref;
Expand Down
46 changes: 16 additions & 30 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::messages::control_db::HostType;
use crate::module_host_context::ModuleCreationContextLimited;
use crate::replica_context::ReplicaContext;
use crate::sql::ast::SchemaViewer;
use crate::sql::execute::run_from_module;
use crate::sql::execute::run_with_instance;
use crate::subscription::module_subscription_actor::commit_and_broadcast_event;
use crate::subscription::module_subscription_manager::TransactionOffset;
use crate::util::prometheus_handle::{HistogramExt, TimerGuard};
Expand Down Expand Up @@ -507,6 +507,10 @@ impl InstanceCommon {
}
}

pub(crate) fn info(&self) -> Arc<ModuleInfo> {
self.info.clone()
}

#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn update_database<I: WasmInstance>(
&mut self,
Expand Down Expand Up @@ -977,47 +981,36 @@ impl InstanceCommon {
}

pub(crate) fn handle_cmd<I: WasmInstance>(&mut self, cmds: ViewCommand, inst: &mut I) -> (ViewCommandResult, bool) {
let info = self.info.clone();
let mut inst = RefInstance {
instance: inst,
common: self,
};
match cmds {
ViewCommand::AddSingleSubscription {
info,
sender,
auth,
request,
timer,
} => {
let res = info.subscriptions.add_single_subscription_from_module(
Some((&mut inst, &info.module_def)),
sender,
auth,
request,
timer,
None,
);
let res = info
.subscriptions
.add_single_subscription_with_instance(&mut inst, sender, auth, request, timer, None);

match res {
Ok((metrics, trapped)) => (ViewCommandResult::Subscription { result: Ok(metrics) }, trapped),
Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false),
}
}
ViewCommand::AddLegacySubscription {
info,
sender,
auth,
subscribe,
timer,
} => {
let res = info.subscriptions.add_legacy_subscriber_from_module(
Some((&mut inst, &info.module_def)),
sender,
auth,
subscribe,
timer,
None,
);
let res = info
.subscriptions
.add_legacy_subscriber_with_instance(&mut inst, sender, auth, subscribe, timer, None);

match res {
Ok((metrics, trapped)) => (
Expand All @@ -1030,20 +1023,14 @@ impl InstanceCommon {
}
}
ViewCommand::AddMultiSubscription {
info,
sender,
auth,
request,
timer,
} => {
let res = info.subscriptions.add_multi_subscription_from_module(
Some((&mut inst, &info.module_def)),
sender,
auth,
request,
timer,
None,
);
let res = info
.subscriptions
.add_multi_subscription_with_instance(&mut inst, sender, auth, request, timer, None);

match res {
Ok((metrics, trapped)) => (ViewCommandResult::Subscription { result: Ok(metrics) }, trapped),
Expand All @@ -1052,14 +1039,13 @@ impl InstanceCommon {
}

ViewCommand::Sql {
info,
db,
sql_text,
auth,
subs,
} => {
let mut head = vec![];
let res = run_from_module(Some((&mut inst, &info.module_def)), db, sql_text, auth, subs, &mut head);
let res = run_with_instance(&mut inst, db, sql_text, auth, subs, &mut head);

match res {
Ok((result, trapped)) => (
Expand Down
37 changes: 24 additions & 13 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::Timestamp;
use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue};
use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt};
use spacetimedb_schema::def::ModuleDef;
use spacetimedb_schema::relation::FieldName;
use spacetimedb_vm::eval::run_ast;
use spacetimedb_vm::expr::{CodeResult, CrudExpr, Expr};
Expand Down Expand Up @@ -189,6 +188,10 @@ pub struct SqlResult {
}

/// Run the `SQL` string using the `auth` credentials
///
/// If a `ModuleHost` is provided, the SQL query is executed via the module host,
/// meaning the module’s core is used to run the statement.
/// If no module host is provided, the SQL query is executed on the current thread.
pub async fn run(
db: Arc<RelationalDB>,
sql_text: String,
Expand All @@ -198,17 +201,27 @@ pub async fn run(
head: &mut Vec<(Box<str>, AlgebraicType)>,
) -> Result<SqlResult, DBError> {
match module {
Some(module) => {
let info = module.info.clone();
module.call_view_sql(info, db, sql_text, auth, subs, head).await
}
None => run_from_module::<crate::host::wasmtime::WasmtimeInstance>(None, db, sql_text, auth, subs, head)
.map(|x| x.0),
Some(module) => module.call_view_sql(db, sql_text, auth, subs, head).await,
None => run_inner::<crate::host::wasmtime::WasmtimeInstance>(None, db, sql_text, auth, subs, head).map(|x| x.0),
}
}

pub fn run_from_module<I: WasmInstance>(
instance: Option<(&mut RefInstance<I>, &ModuleDef)>,
/// Run the `SQL` string using the provided `WasmInstance` and `ModuleDef`
///
/// The query will always be executed on the module's thread.
pub(crate) fn run_with_instance<I: WasmInstance>(
instance: &mut RefInstance<I>,
db: Arc<RelationalDB>,
sql_text: String,
auth: AuthCtx,
subs: Option<ModuleSubscriptions>,
head: &mut Vec<(Box<str>, AlgebraicType)>,
) -> Result<(SqlResult, bool), DBError> {
run_inner::<I>(Some(instance), db, sql_text, auth, subs, head)
}

fn run_inner<I: WasmInstance>(
instance: Option<&mut RefInstance<I>>,
db: Arc<RelationalDB>,
sql_text: String,
auth: AuthCtx,
Expand All @@ -227,9 +240,7 @@ pub fn run_from_module<I: WasmInstance>(
Statement::Select(stmt) => {
// Materialize views and downgrade to a read-only transaction
let (tx, trapped) = match instance {
Some(instance) => {
ModuleHost::materialize_views(tx, instance.0, instance.1, &stmt, auth.caller(), Workload::Sql)?
}
Some(instance) => ModuleHost::materialize_views(tx, instance, &stmt, auth.caller(), Workload::Sql)?,
None => (tx, false),
};

Expand Down Expand Up @@ -287,7 +298,7 @@ pub fn run_from_module<I: WasmInstance>(

// Update views
let (result, trapped) = match instance {
Some(instance) => ModuleHost::call_views_with_tx(tx, instance.0, instance.1, auth.caller())?,
Some(instance) => ModuleHost::call_views_with_tx(tx, instance, auth.caller())?,
None => (ViewCallResult::default(tx), false),
};

Expand Down
Loading
Loading