diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index 6a44916e287..0f022010193 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -128,7 +128,7 @@ fn eval(c: &mut Criterion) { let (plans, table_id, table_name, _) = compile_subscription(sql, schema_viewer, &auth).unwrap(); let plans = plans .into_iter() - .map(|plan| plan.optimize().unwrap()) + .map(|plan| plan.optimize(&auth).unwrap()) .map(PipelinedProject::from) .collect::>(); let tx = DeltaTx::from(&tx); diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 4b7a533c008..dab34f155c6 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -2088,6 +2088,57 @@ pub mod tests_utils { Ok((gen_cols, row_ref)) } + /// Allocate a backing table in the datastore for a view + pub fn create_view_for_test( + db: &RelationalDB, + name: &str, + schema: &[(&str, AlgebraicType)], + is_anonymous: bool, + ) -> Result { + let mut builder = RawModuleDefV9Builder::new(); + + // Add the view's product type to the typespace + let type_ref = builder.add_algebraic_type( + [], + name, + AlgebraicType::Product(ProductType::from_iter(schema.iter().cloned())), + true, + ); + + builder.add_view( + name, + true, + is_anonymous, + ProductType::unit(), + AlgebraicType::array(AlgebraicType::Ref(type_ref)), + ); + + let module_def: ModuleDef = builder.finish().try_into()?; + let view_def: &ViewDef = module_def.view(name).expect("view not found"); + + // Allocate a backing table and return its table id + db.with_auto_commit(Workload::Internal, |tx| db.create_view(tx, &module_def, view_def)) + .map(|(_, table_id)| table_id) + } + + /// Insert a row into a view's backing table + pub fn insert_into_view<'a>( + db: &'a RelationalDB, + tx: &'a mut MutTx, + table_id: TableId, + sender: Option, + row: ProductValue, + ) -> Result, DBError> { + let meta_cols = match sender { + Some(identity) => vec![identity.into()], + None => vec![], + }; + let cols = meta_cols.into_iter().chain(row.elements); + let row = ProductValue::from_iter(cols); + db.insert(tx, table_id, &to_vec(&row).unwrap()) + .map(|(_, row_ref, _)| row_ref) + } + /// An in-memory commitlog used for tests that want to replay a known history. pub struct TestHistory(commitlog::commitlog::Generic); diff --git a/crates/core/src/estimation.rs b/crates/core/src/estimation.rs index 64892e9b9bb..24130a145f8 100644 --- a/crates/core/src/estimation.rs +++ b/crates/core/src/estimation.rs @@ -198,7 +198,7 @@ mod tests { .map(|(plans, ..)| plans) .expect("failed to compile sql query") .into_iter() - .map(|plan| plan.optimize().expect("failed to optimize sql query")) + .map(|plan| plan.optimize(&auth).expect("failed to optimize sql query")) .map(|plan| row_estimate(&tx, &plan)) .sum() } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1fde862906f..09f1c4da4dc 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -17,10 +17,10 @@ use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; use crate::sql::parser::RowLevelExpr; -use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::BuildableWebsocketFormat; +use crate::subscription::{execute_plan, execute_plan_for_view}; use crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; @@ -41,7 +41,7 @@ use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; use spacetimedb_durability::DurableOffset; -use spacetimedb_execution::pipelined::PipelinedProject; +use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; @@ -1495,7 +1495,7 @@ impl ModuleHost { // Optimize each fragment let optimized = plans .into_iter() - .map(|plan| plan.optimize()) + .map(|plan| plan.optimize(&auth)) .collect::, _>>()?; check_row_limit( @@ -1507,12 +1507,31 @@ impl ModuleHost { &auth, )?; + let return_table = || optimized.first().and_then(|plan| plan.return_table()); + + let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); + let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); + let num_private_cols = return_table() + .map(|schema| schema.num_private_cols()) + .unwrap_or_default(); + let optimized = optimized .into_iter() // Convert into something we can execute .map(PipelinedProject::from) .collect::>(); + if returns_view_table && num_private_cols > 0 { + let optimized = optimized + .into_iter() + .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) + .collect::>(); + // Execute the union and return the results + return execute_plan_for_view::<_, F>(&optimized, &DeltaTx::from(&*tx)) + .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics)) + .context("One-off queries are not allowed to modify the database"); + } + // Execute the union and return the results execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx)) .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics)) diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 089148cdd2e..b060b7c0641 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -227,7 +227,7 @@ pub fn run( }); // Evaluate the query - let rows = execute_select_stmt(stmt, &DeltaTx::from(&*tx), &mut metrics, |plan| { + let rows = execute_select_stmt(&auth, stmt, &DeltaTx::from(&*tx), &mut metrics, |plan| { check_row_limit( &[&plan], db, @@ -254,7 +254,7 @@ pub fn run( } // Evaluate the mutation - let (mut tx, _) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(stmt, tx, &mut metrics))?; + let (mut tx, _) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(&auth, stmt, tx, &mut metrics))?; // Update transaction metrics tx.metrics.merge(metrics); @@ -332,7 +332,7 @@ pub(crate) mod tests { use std::sync::Arc; use super::*; - use crate::db::relational_db::tests_utils::{begin_tx, insert, with_auto_commit, TestDB}; + use crate::db::relational_db::tests_utils::{self, begin_tx, insert, with_auto_commit, TestDB}; use crate::vm::tests::create_table_with_rows; use itertools::Itertools; use pretty_assertions::assert_eq; @@ -904,6 +904,158 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_view() -> anyhow::Result<()> { + let db = TestDB::in_memory()?; + + let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; + let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, false)?; + + with_auto_commit(&db, |tx| -> Result<_, DBError> { + tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(1)), product![0u8, 1u8])?; + tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(2)), product![0u8, 2u8])?; + Ok(()) + })?; + + let id = identity_from_u8(2); + let auth = AuthCtx::new(Identity::ZERO, id); + + assert_query_results(&db, "select * from my_view", &auth, [product![0u8, 2u8]]); + + Ok(()) + } + + #[test] + fn test_anonymous_view() -> anyhow::Result<()> { + let db = TestDB::in_memory()?; + + let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; + let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, true)?; + + with_auto_commit(&db, |tx| -> Result<_, DBError> { + tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 1u8])?; + tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 2u8])?; + Ok(()) + })?; + + let id = identity_from_u8(1); + let auth = AuthCtx::new(Identity::ZERO, id); + + assert_query_results(&db, "select b from my_view", &auth, [product![1u8], product![2u8]]); + + Ok(()) + } + + #[test] + fn test_view_join_table() -> anyhow::Result<()> { + let db = TestDB::in_memory()?; + + let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; + let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?; + + let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)]; + let t_id = db.create_table_for_test("t", &schema, &[0.into()])?; + + with_auto_commit(&db, |tx| -> Result<_, DBError> { + db.insert(tx, t_id, &product![0u8, 3u8].to_bsatn_vec().unwrap())?; + db.insert(tx, t_id, &product![1u8, 4u8].to_bsatn_vec().unwrap())?; + tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(1)), product![0u8, 1u8])?; + tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(2)), product![1u8, 2u8])?; + Ok(()) + })?; + + let id = identity_from_u8(2); + let auth = AuthCtx::new(Identity::ZERO, id); + + assert_query_results( + &db, + "select t.* from v join t on v.a = t.c", + &auth, + [product![1u8, 4u8]], + ); + assert_query_results( + &db, + "select v.* from v join t on v.a = t.c", + &auth, + [product![1u8, 2u8]], + ); + assert_query_results( + &db, + "select v.* from v join t where v.a = t.c", + &auth, + [product![1u8, 2u8]], + ); + assert_query_results( + &db, + "select v.b as b, t.d as d from v join t on v.a = t.c", + &auth, + [product![2u8, 4u8]], + ); + assert_query_results( + &db, + "select v.b as b, t.d as d from v join t where v.a = t.c", + &auth, + [product![2u8, 4u8]], + ); + + Ok(()) + } + + #[test] + fn test_view_join_view() -> anyhow::Result<()> { + let db = TestDB::in_memory()?; + + let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; + let u_id = tests_utils::create_view_for_test(&db, "u", &schema, false)?; + + let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)]; + let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?; + + with_auto_commit(&db, |tx| -> Result<_, DBError> { + tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(1)), product![0u8, 1u8])?; + tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(2)), product![1u8, 2u8])?; + tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(1)), product![0u8, 3u8])?; + tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(2)), product![1u8, 4u8])?; + Ok(()) + })?; + + let id = identity_from_u8(2); + let auth = AuthCtx::new(Identity::ZERO, id); + + assert_query_results( + &db, + "select u.* from u join v on u.a = v.c", + &auth, + [product![1u8, 2u8]], + ); + assert_query_results( + &db, + "select v.* from u join v on u.a = v.c", + &auth, + [product![1u8, 4u8]], + ); + assert_query_results( + &db, + "select v.* from u join v where u.a = v.c", + &auth, + [product![1u8, 4u8]], + ); + assert_query_results( + &db, + "select u.b as b, v.d as d from u join v on u.a = v.c", + &auth, + [product![2u8, 4u8]], + ); + assert_query_results( + &db, + "select u.b as b, v.d as d from u join v where u.a = v.c", + &auth, + [product![2u8, 4u8]], + ); + + Ok(()) + } + #[test] fn test_select_star_table() -> ResultTest<()> { let (db, input) = create_data(1)?; diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index e58a16c0779..5066c1d17b4 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -10,7 +10,9 @@ use spacetimedb_client_api_messages::websocket::{ use spacetimedb_datastore::{ db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder, }; +use spacetimedb_execution::pipelined::ViewProject; use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore}; +use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; use spacetimedb_primitives::TableId; use std::sync::Arc; @@ -91,6 +93,37 @@ impl MetricsRecorder for ExecutionCounters { } } +/// Execute a subscription query over a view. +/// +/// Specifically this utility is for queries that return rows from a view. +/// Unlike user tables, views have internal columns that should not be returned to clients. +/// The [`ViewProject`] operator implicitly drops these columns as part of its execution. +/// +/// NOTE: This method was largely copied from [`execute_plan`]. +/// TODO: Merge with [`execute_plan`]. +pub fn execute_plan_for_view(plan_fragments: &[ViewProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)> +where + Tx: Datastore + DeltaStore, + F: BuildableWebsocketFormat, +{ + let mut count = 0; + let mut list = F::ListBuilder::default(); + let mut metrics = ExecutionMetrics::default(); + + for fragment in plan_fragments { + fragment.execute(tx, &mut metrics, &mut |row| { + count += 1; + list.push(row); + Ok(()) + })?; + } + + let list = list.finish(); + metrics.bytes_scanned += list.num_bytes(); + metrics.bytes_sent_to_clients += list.num_bytes(); + Ok((list, count, metrics)) +} + /// Execute a subscription query pub fn execute_plan(plan_fragments: &[PipelinedProject], tx: &Tx) -> Result<(F::List, u64, ExecutionMetrics)> where @@ -123,6 +156,48 @@ pub enum TableUpdateType { Unsubscribe, } +/// Execute a subscription query over a view and collect the results in a [TableUpdate]. +/// +/// Specifically this utility is for queries that return rows from a view. +/// Unlike user tables, views have internal columns that should not be returned to clients. +/// The [`ViewProject`] operator implicitly drops these columns as part of its execution. +/// +/// NOTE: This method was largely copied from [`collect_table_update`]. +/// TODO: Merge with [`collect_table_update`]. +pub fn collect_table_update_for_view( + plan_fragments: &[ViewProject], + table_id: TableId, + table_name: Box, + tx: &Tx, + update_type: TableUpdateType, +) -> Result<(TableUpdate, ExecutionMetrics)> +where + Tx: Datastore + DeltaStore, + F: BuildableWebsocketFormat, +{ + execute_plan_for_view::(plan_fragments, tx).map(|(rows, num_rows, metrics)| { + let empty = F::List::default(); + let qu = match update_type { + TableUpdateType::Subscribe => QueryUpdate { + deletes: empty, + inserts: rows, + }, + TableUpdateType::Unsubscribe => QueryUpdate { + deletes: rows, + inserts: empty, + }, + }; + // We will compress the outer server message, + // after we release the tx lock. + // There's no need to compress the inner table update too. + let update = F::into_query_update(qu, Compression::None); + ( + TableUpdate::new(table_id, table_name, SingleQueryUpdate { update, num_rows }), + metrics, + ) + }) +} + /// Execute a subscription query and collect the results in a [TableUpdate] pub fn collect_table_update( plan_fragments: &[PipelinedProject], @@ -160,6 +235,7 @@ where /// Execute a collection of subscription queries in parallel pub fn execute_plans( + auth: &AuthCtx, plans: &[Arc], tx: &Tx, update_type: TableUpdateType, @@ -177,16 +253,30 @@ where plan.table_ids().all(|table_id| tx.row_count(table_id) > 0) }) .map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name())) + .map(|(sql, plan, table_id, table_name)| (sql, plan.optimized_physical_plan().clone(), table_id, table_name)) + .map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name)) .map(|(sql, plan, table_id, table_name)| { - plan.optimized_physical_plan() - .clone() - .optimize() - .map(|plan| (sql, PipelinedProject::from(plan))) - .and_then(|(_, plan)| collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type)) - .map_err(|err| DBError::WithSql { - sql: sql.into(), - error: Box::new(DBError::Other(err)), - }) + plan.and_then(|plan| { + if plan.returns_view_table() { + if let Some(schema) = plan.return_table() { + let plan = PipelinedProject::from(plan); + let plan = ViewProject::new(plan, schema.num_cols(), schema.num_private_cols()); + return collect_table_update_for_view( + &[plan], + table_id, + (&**table_name).into(), + tx, + update_type, + ); + } + } + let plan = PipelinedProject::from(plan); + collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type) + }) + .map_err(|err| DBError::WithSql { + sql: sql.into(), + error: Box::new(DBError::Other(err)), + }) }) .collect::, _>>() .map(|table_updates_with_metrics| { diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 1c1f492b285..e92c5d35e48 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -16,8 +16,8 @@ use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent}; use crate::messages::websocket::Subscribe; -use crate::subscription::execute_plans; use crate::subscription::query::is_subscribe_to_all_tables; +use crate::subscription::{collect_table_update_for_view, execute_plans}; use crate::util::prometheus_handle::IntGaugeExt; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; @@ -34,7 +34,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::TxId; use spacetimedb_datastore::traits::TxData; use spacetimedb_durability::TxOffset; -use spacetimedb_execution::pipelined::PipelinedProject; +use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; @@ -253,19 +253,54 @@ impl ModuleSubscriptions { .plans_fragments() .map(|fragment| fragment.optimized_physical_plan()) .cloned() - .map(|plan| plan.optimize()) - .collect::, _>>()? - .into_iter() - .map(PipelinedProject::from) - .collect::>(); + .map(|plan| plan.optimize(auth)) + .collect::, _>>()?; + + let view_info = plans + .first() + .and_then(|plan| plan.return_table()) + .and_then(|schema| schema.view_info); + + let num_cols = plans + .first() + .and_then(|plan| plan.return_table()) + .map(|schema| schema.num_cols()) + .unwrap_or_default(); let tx = DeltaTx::from(tx); - Ok(match sender.config.protocol { - Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type) - .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)), - Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type) - .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)), + // TODO: See the comment on `collect_table_update_for_view`. + // The following view and non-view branches should be merged together, + // since the only difference between them is the row type that is returned. + Ok(match (sender.config.protocol, view_info) { + (Protocol::Binary, Some(view_info)) => { + let plans = plans + .into_iter() + .map(PipelinedProject::from) + .map(|plan| ViewProject::new(plan, num_cols, view_info.num_private_cols())) + .collect::>(); + collect_table_update_for_view(&plans, table_id, table_name.into(), &tx, update_type) + .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)) + } + (Protocol::Binary, None) => { + let plans = plans.into_iter().map(PipelinedProject::from).collect::>(); + collect_table_update(&plans, table_id, table_name.into(), &tx, update_type) + .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)) + } + (Protocol::Text, Some(view_info)) => { + let plans = plans + .into_iter() + .map(PipelinedProject::from) + .map(|plan| ViewProject::new(plan, num_cols, view_info.num_private_cols())) + .collect::>(); + collect_table_update_for_view(&plans, table_id, table_name.into(), &tx, update_type) + .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)) + } + (Protocol::Text, None) => { + let plans = plans.into_iter().map(PipelinedProject::from).collect::>(); + collect_table_update(&plans, table_id, table_name.into(), &tx, update_type) + .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)) + } }?) } @@ -292,11 +327,11 @@ impl ModuleSubscriptions { let tx = DeltaTx::from(tx); match sender.config.protocol { Protocol::Binary => { - let (update, metrics) = execute_plans(queries, &tx, update_type)?; + let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?; Ok((FormatSwitch::Bsatn(update), metrics)) } Protocol::Text => { - let (update, metrics) = execute_plans(queries, &tx, update_type)?; + let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?; Ok((FormatSwitch::Json(update), metrics)) } } @@ -817,9 +852,9 @@ impl ModuleSubscriptions { let tx = DeltaTx::from(&*tx); let (database_update, metrics) = match sender.config.protocol { - Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe) + Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe) .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?, - Protocol::Text => execute_plans(&queries, &tx, TableUpdateType::Subscribe) + Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe) .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?, }; diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index e2ba897eddb..9a9a1550f04 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -11,7 +11,7 @@ use spacetimedb_physical_plan::plan::{ HashJoin, IxJoin, IxScan, PhysicalExpr, PhysicalPlan, ProjectField, ProjectListPlan, ProjectPlan, Sarg, Semi, TableScan, TupleField, }; -use spacetimedb_primitives::{ColId, IndexId, TableId}; +use spacetimedb_primitives::{ColId, ColList, IndexId, TableId}; use spacetimedb_sats::product; use crate::{Datastore, DeltaStore, Row, Tuple}; @@ -22,6 +22,7 @@ use crate::{Datastore, DeltaStore, Row, Tuple}; /// Hence this operator is not particularly optimized. pub enum ProjectListExecutor { Name(Vec), + View(Vec), List(Vec, Vec), Limit(Box, u64), Agg(Vec, AggType), @@ -29,7 +30,41 @@ pub enum ProjectListExecutor { impl From for ProjectListExecutor { fn from(plan: ProjectListPlan) -> Self { + /// A helper that checks if a [`ProjectListPlan`] returns an unprojected view table + fn returns_view_table(plans: &[ProjectPlan]) -> bool { + plans.first().is_some_and(|plan| plan.returns_view_table()) + } + + /// A helper that returns the number of columns returned by this [`ProjectListPlan`] + fn num_cols(plans: &[ProjectPlan]) -> usize { + plans + .first() + .and_then(|plan| plan.return_table()) + .map(|schema| schema.num_cols()) + .unwrap_or_default() + } + + /// A helper that returns the number of private columns returned by this [`ProjectListPlan`] + fn num_private_cols(plans: &[ProjectPlan]) -> usize { + plans + .first() + .and_then(|plan| plan.return_table()) + .map(|schema| schema.num_private_cols()) + .unwrap_or_default() + } + match plan { + ProjectListPlan::Name(plans) if returns_view_table(&plans) => { + let num_cols = num_cols(&plans); + let num_private_cols = num_private_cols(&plans); + Self::View( + plans + .into_iter() + .map(PipelinedProject::from) + .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) + .collect(), + ) + } ProjectListPlan::Name(plan) => Self::Name(plan.into_iter().map(PipelinedProject::from).collect()), ProjectListPlan::List(plan, fields) => { Self::List(plan.into_iter().map(PipelinedExecutor::from).collect(), fields) @@ -62,6 +97,14 @@ impl ProjectListExecutor { })?; } } + Self::View(plans) => { + for plan in plans { + plan.execute(tx, metrics, &mut |row| { + n += 1; + f(row) + })?; + } + } Self::List(plans, fields) => { for plan in plans { plan.execute(tx, metrics, &mut |t| { @@ -109,6 +152,60 @@ impl ProjectListExecutor { } } +/// An executor for a query that returns rows from a view. +/// Essentially just a projection that drops the view's private columns. +/// +/// Unlike user tables, view tables can have private columns. +/// For example, if a view is not anonymous, its backing table will have a `sender` column. +/// This column tracks which rows belong to which caller of the view. +/// However we must remove this column before sending rows from the view to a client. +/// +/// See `TableSchema::from_view_def_for_datastore` for more details. +#[derive(Debug)] +pub struct ViewProject { + num_cols: usize, + num_private_cols: usize, + inner: PipelinedProject, +} + +impl ViewProject { + pub fn new(inner: PipelinedProject, num_cols: usize, num_private_cols: usize) -> Self { + Self { + inner, + num_cols, + num_private_cols, + } + } + + pub fn execute( + &self, + tx: &Tx, + metrics: &mut ExecutionMetrics, + f: &mut dyn FnMut(ProductValue) -> Result<()>, + ) -> Result<()> { + let mut n = 0; + let mut bytes_scanned = 0; + self.inner.execute(tx, metrics, &mut |row| match row { + Row::Ptr(ptr) => { + n += 1; + let col_list = ColList::from_iter(self.num_private_cols..self.num_cols); + let row = ptr.project_product(&col_list)?; + bytes_scanned += row.size_of(); + f(row) + } + Row::Ref(val) => { + n += 1; + let col_list = ColList::from_iter(self.num_private_cols..self.num_cols); + let row = val.project_product(&col_list)?; + bytes_scanned += row.size_of(); + f(row) + } + })?; + metrics.rows_scanned += n; + Ok(()) + } +} + /// Implements a projection on top of a pipelined executor #[derive(Debug)] pub enum PipelinedProject { diff --git a/crates/physical-plan/src/dml.rs b/crates/physical-plan/src/dml.rs index b91db57cf24..e1d84aad305 100644 --- a/crates/physical-plan/src/dml.rs +++ b/crates/physical-plan/src/dml.rs @@ -5,7 +5,7 @@ use spacetimedb_expr::{ expr::{ProjectName, RelExpr, Relvar}, statement::{TableDelete, TableInsert, TableUpdate}, }; -use spacetimedb_lib::{AlgebraicValue, ProductValue}; +use spacetimedb_lib::{identity::AuthCtx, AlgebraicValue, ProductValue}; use spacetimedb_primitives::ColId; use spacetimedb_schema::schema::TableOrViewSchema; @@ -20,11 +20,11 @@ pub enum MutationPlan { impl MutationPlan { /// Optimizes the filters in updates and deletes - pub fn optimize(self) -> Result { + pub fn optimize(self, auth: &AuthCtx) -> Result { match self { Self::Insert(..) => Ok(self), - Self::Delete(plan) => Ok(Self::Delete(plan.optimize()?)), - Self::Update(plan) => Ok(Self::Update(plan.optimize()?)), + Self::Delete(plan) => Ok(Self::Delete(plan.optimize(auth)?)), + Self::Update(plan) => Ok(Self::Update(plan.optimize(auth)?)), } } } @@ -51,9 +51,9 @@ pub struct DeletePlan { impl DeletePlan { /// Optimize the filter part of the delete - fn optimize(self) -> Result { + fn optimize(self, auth: &AuthCtx) -> Result { let Self { table, filter } = self; - let filter = filter.optimize()?; + let filter = filter.optimize(auth)?; Ok(Self { table, filter }) } @@ -85,9 +85,9 @@ pub struct UpdatePlan { impl UpdatePlan { /// Optimize the filter part of the update - fn optimize(self) -> Result { + fn optimize(self, auth: &AuthCtx) -> Result { let Self { table, columns, filter } = self; - let filter = filter.optimize()?; + let filter = filter.optimize(auth)?; Ok(Self { columns, table, filter }) } diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index a516e89b4d4..76851923982 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -8,7 +8,7 @@ use anyhow::{bail, Result}; use derive_more::From; use either::Either; use spacetimedb_expr::{expr::AggType, StatementSource}; -use spacetimedb_lib::{query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue}; +use spacetimedb_lib::{identity::AuthCtx, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue}; use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId}; use spacetimedb_schema::schema::{IndexSchema, TableSchema}; use spacetimedb_sql_parser::ast::{BinOp, LogOp}; @@ -69,11 +69,11 @@ impl DerefMut for ProjectPlan { } impl ProjectPlan { - pub fn optimize(self) -> Result { + pub fn optimize(self, auth: &AuthCtx) -> Result { match self { - Self::None(plan) => Ok(Self::None(plan.optimize(vec![])?)), + Self::None(plan) => Ok(Self::None(plan.optimize(auth, vec![])?)), Self::Name(plan, label, _) => { - let plan = plan.optimize(vec![label])?; + let plan = plan.optimize(auth, vec![label])?; let n = plan.nfields(); let pos = plan.position(&label); Ok(match n { @@ -90,6 +90,19 @@ impl ProjectPlan { Self::None(plan) | Self::Name(plan, ..) => plan, } } + + /// Does this plan select or return whole (unprojected) rows from a single table? + pub fn return_table(&self) -> Option> { + match self { + Self::None(plan) => plan.return_table(), + Self::Name(plan, label, _) => plan.find_table_schema(label), + } + } + + /// Does this plan select or return whole (unprojected) rows from a view? + pub fn returns_view_table(&self) -> bool { + self.return_table().is_some_and(|schema| schema.is_view()) + } } /// Physical plans always terminate with a projection. @@ -127,13 +140,15 @@ pub enum ProjectListPlan { } impl ProjectListPlan { - pub fn optimize(self) -> Result { + pub fn optimize(self, auth: &AuthCtx) -> Result { match self { Self::Name(plan) => Ok(Self::Name( - plan.into_iter().map(|plan| plan.optimize()).collect::>()?, + plan.into_iter() + .map(|plan| plan.optimize(auth)) + .collect::>()?, )), Self::Limit(plan, n) => { - let mut limit = Self::Limit(Box::new(plan.optimize()?), n); + let mut limit = Self::Limit(Box::new(plan.optimize(auth)?), n); // Merge a limit with a scan if possible if PushLimit::matches(&limit).is_some() { limit = PushLimit::rewrite(limit, ())?; @@ -142,7 +157,7 @@ impl ProjectListPlan { } Self::Agg(plan, agg_type) => Ok(Self::Agg( plan.into_iter() - .map(|plan| plan.optimize(vec![])) + .map(|plan| plan.optimize(auth, vec![])) .collect::>()?, agg_type, )), @@ -152,7 +167,7 @@ impl ProjectListPlan { // Collect the names of the relvars let labels = fields.iter().map(|field| field.label).collect(); // Optimize each plan - let optimized_plan = plan.optimize(labels)?; + let optimized_plan = plan.optimize(auth, labels)?; // Compute the position of each relvar referenced in the projection for TupleField { label, label_pos, .. } in &mut fields { *label_pos = optimized_plan.position(label); @@ -172,6 +187,20 @@ impl ProjectListPlan { Self::Limit(plan, _) => plan.plan_iter(), } } + + /// Does this plan select or return whole (unprojected) rows from a single table? + pub fn return_table(&self) -> Option> { + match self { + Self::Name(plans) => plans.first().and_then(ProjectPlan::return_table), + Self::Limit(plan, _) => plan.return_table(), + Self::List(..) | Self::Agg(..) => None, + } + } + + /// Does this plan select or return whole (unprojected) rows from a view? + pub fn returns_view_table(&self) -> bool { + self.return_table().is_some_and(|schema| schema.is_view()) + } } /// Query operators return tuples of rows. @@ -386,8 +415,9 @@ impl PhysicalPlan { /// 3. Turn filters into index scans if possible /// 4. Determine index and semijoins /// 5. Compute positions for tuple labels - pub fn optimize(self, reqs: Vec