diff --git a/crates/client-api-messages/src/energy.rs b/crates/client-api-messages/src/energy.rs index 7109d9c1e85..955a1566b24 100644 --- a/crates/client-api-messages/src/energy.rs +++ b/crates/client-api-messages/src/energy.rs @@ -126,7 +126,7 @@ impl fmt::Debug for EnergyBalance { /// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling /// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime: /// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`) -#[derive(Copy, Clone, From, Add, Sub)] +#[derive(Copy, Clone, From, Add, Sub, AddAssign, SubAssign)] pub struct FunctionBudget(u64); impl FunctionBudget { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index a87dab8915a..ba38b5676d3 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,6 +1,5 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, DatabaseError, RestoreSnapshotError}; -use crate::host::ArgsTuple; use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; use crate::util::{asyncify, spawn_rayon}; @@ -9,7 +8,6 @@ use anyhow::{anyhow, Context}; use bytes::Bytes; use enum_map::EnumMap; use fs2::FileExt; -use log::trace; use spacetimedb_commitlog::repo::OnNewSegmentFn; use spacetimedb_commitlog::{self as commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::IntSet; @@ -46,7 +44,9 @@ use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::memory_usage::MemoryUsage; -use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace}; +use spacetimedb_sats::{ + AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace, WithTypespace, +}; use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{ ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, @@ -1078,11 +1078,11 @@ impl RelationalDB { tx: &mut MutTx, module_def: &ModuleDef, view_def: &ViewDef, - ) -> Result<(ViewDatabaseId, TableId), DBError> { + ) -> Result<(ViewId, TableId), DBError> { Ok(tx.create_view(module_def, view_def)?) } - pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewDatabaseId) -> Result<(), DBError> { + pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewId) -> Result<(), DBError> { Ok(tx.drop_view(view_id)?) } @@ -1173,7 +1173,7 @@ impl RelationalDB { Ok(self.inner.rename_table_mut_tx(tx, table_id, new_name)?) } - pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result, DBError> { + pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result, DBError> { Ok(self.inner.view_id_from_name_mut_tx(tx, view_name)?) } @@ -1509,93 +1509,122 @@ impl RelationalDB { }) } - /// Materialize View backing table. + /// Write `bytes` into a (sender) view's backing table. /// /// # Process - /// 1. Serializes view arguments into `ST_VIEW_ARG_ID` - /// 2. Deletes stale rows matching the view arguments - /// 3. Deserializes the new view execution results - /// 4. Inserts fresh rows with the corresponding arg_id + /// 1. Delete all rows for `sender` from the view's backing table + /// 2. Deserialize `bytes` + /// 3. Insert the new rows into the backing table /// /// # Arguments /// * `tx` - Mutable transaction context - /// * `view` - Name of the view to update - /// * `args` - Arguments passed to the view call - /// * `return_type` - Expected return type of the view - /// * `bytes` - Serialized (bsatn encoded) return value from view execution + /// * `table_id` - The id of the view's backing table + /// * `sender` - The calling identity of the view being updated + /// * `row_type` - Expected return type of the view + /// * `bytes` - An array of product values (bsatn encoded) /// * `typespace` - Type information for deserialization - /// * `caller_identity` - Identity of the caller (for non-anonymous views) #[allow(clippy::too_many_arguments)] pub fn materialize_view( &self, tx: &mut MutTxId, - view: &str, - args: ArgsTuple, - return_type: AlgebraicTypeRef, + table_id: TableId, + sender: Identity, + row_type: AlgebraicTypeRef, bytes: Bytes, typespace: &Typespace, - caller_identity: Identity, ) -> Result<(), DBError> { - // Fetch view metadata - let st_view_row = tx.lookup_st_view_by_name(view)?; - let table_id = st_view_row.table_id.expect("View table must exist for materialization"); - let view_id = st_view_row.view_id; - let is_anonymous = st_view_row.is_anonymous; - let arg_id = tx.get_or_insert_st_view_arg(args.get_bsatn())?; - - // Build the filter key for identifying rows to update - let mut input_args = Vec::new(); - if !is_anonymous { - input_args.push(AlgebraicValue::OptionSome(caller_identity.into())); - } - if !tx.is_view_parameterized(view_id)? { - input_args.push(AlgebraicValue::U64(arg_id)); - } - let input_args = ProductValue { - elements: input_args.into_boxed_slice(), - }; - let col_list: ColList = (0..input_args.elements.len()).collect(); - - // Remove stale View entries - let rows_to_delete: Vec<_> = self - .iter_by_col_eq_mut(tx, table_id, col_list, &input_args.clone().into())? + // Delete rows for `sender` from the backing table + let rows_to_delete = self + .iter_by_col_eq_mut(tx, table_id, ColId(0), &sender.into())? .map(|res| res.pointer()) - .collect(); - - let deleted_count = self.delete(tx, table_id, rows_to_delete); - trace!("Deleted {deleted_count} stale rows from view table {table_id} for arg_id {arg_id}"); - - // Deserialize the return value - let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type).resolve(return_type); - let return_val = seed + .collect::>(); + self.delete(tx, table_id, rows_to_delete); + + // Deserialize the return rows. + // The return type is expected to be an array of products. + let row_type = typespace.resolve(row_type); + let ret_type = AlgebraicType::array(row_type.ty().clone()); + let seed = WithTypespace::new(typespace, &ret_type); + let rows = seed .deserialize(bsatn::Deserializer::new(&mut &bytes[..])) .map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?; - // Extract products from return value (must be array) - let products: Vec = return_val + // Insert new rows into the backing table + for product in rows .into_array() - .expect("Expected return_val to be an array") + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)? .into_iter() - .map(|v| v.into_product().expect("Expected array elements to be ProductValue")) - .collect(); - - // Insert fresh results into the view table - let mut elements: Vec = - Vec::with_capacity(input_args.elements.len() + products.first().map_or(0, |p| p.elements.len())); - for product in products { - elements.clear(); - elements.extend_from_slice(&input_args.elements); - elements.extend_from_slice(&product.elements); + { + let product = product + .into_product() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)?; + self.insert( + tx, + table_id, + &ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements)) + .to_bsatn_vec() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)?, + )?; + } - let row = ProductValue { - elements: elements.as_slice().into(), - }; + Ok(()) + } - let row_bytes = row - .to_bsatn_vec() - .map_err(|_| DatastoreError::from(ViewError::SerializeRow))?; + /// Write `bytes` into an anonymous view's backing table. + /// + /// # Process + /// 1. Clear the view's backing table + /// 2. Deserialize `bytes` + /// 3. Insert the new rows into the backing table + /// + /// # Arguments + /// * `tx` - Mutable transaction context + /// * `table_id` - The id of the view's backing table + /// * `row_type` - Expected return type of the view + /// * `bytes` - An array of product values (bsatn encoded) + /// * `typespace` - Type information for deserialization + #[allow(clippy::too_many_arguments)] + pub fn materialize_anonymous_view( + &self, + tx: &mut MutTxId, + table_id: TableId, + row_type: AlgebraicTypeRef, + bytes: Bytes, + typespace: &Typespace, + ) -> Result<(), DBError> { + // Clear entire backing table + self.clear_table(tx, table_id)?; + + // Deserialize the return rows. + // The return type is expected to be an array of products. + let row_type = typespace.resolve(row_type); + let ret_type = AlgebraicType::array(row_type.ty().clone()); + let seed = WithTypespace::new(typespace, &ret_type); + let rows = seed + .deserialize(bsatn::Deserializer::new(&mut &bytes[..])) + .map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?; - self.insert(tx, table_id, &row_bytes)?; + // Insert new rows into the backing table + for product in rows + .into_array() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)? + .into_iter() + { + self.insert( + tx, + table_id, + &product + .into_product() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)? + .to_bsatn_vec() + .map_err(|_| ViewError::SerializeRow) + .map_err(DatastoreError::from)?, + )?; } Ok(()) @@ -2195,7 +2224,7 @@ pub mod tests_utils { name: &str, schema: &[(&str, AlgebraicType)], is_anonymous: bool, - ) -> Result<(ViewDatabaseId, TableId), DBError> { + ) -> Result<(ViewId, TableId), DBError> { let mut builder = RawModuleDefV9Builder::new(); // Add the view's product type to the typespace @@ -2314,10 +2343,9 @@ mod tests { use super::tests_utils::begin_mut_tx; use super::*; use crate::db::relational_db::tests_utils::{ - begin_tx, create_view_for_test, insert, make_snapshot, with_auto_commit, with_read_only, TestDB, + begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB, }; use anyhow::bail; - use bytes::Bytes; use commitlog::payload::txdata; use commitlog::Commitlog; use durability::EmptyHistory; @@ -2325,7 +2353,6 @@ mod tests { use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::error::{DatastoreError, IndexError}; use spacetimedb_datastore::execution_context::ReducerContext; - use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, ViewCall}; use spacetimedb_datastore::system_tables::{ system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, @@ -2958,57 +2985,6 @@ mod tests { Ok(()) } - #[test] - fn test_is_materialized() -> anyhow::Result<()> { - let stdb = TestDB::in_memory()?; - let schema = [("col1", AlgebraicType::I64), ("col2", AlgebraicType::I64)]; - let table_schema = table("MyTable", ProductType::from(schema), |b| b); - - let view_schema = [("view_col", AlgebraicType::I64)]; - let view_name = "MyView"; - let args: Bytes = vec![].into(); - let sender = Identity::ZERO; - let (view_id, _) = create_view_for_test(&stdb, view_name, &view_schema, true)?; - - let mut tx = begin_mut_tx(&stdb); - let table_id = stdb.create_table(&mut tx, table_schema)?; - - assert!( - !tx.is_materialized(view_name, args.clone(), sender)?.0, - "view should not be materialized as read set is not recorded yet" - ); - - let view_call = FuncCallType::View(ViewCall::anonymous(view_id, args)); - tx.record_table_scan(&view_call, table_id); - assert!( - tx.is_materialized(view_name, vec![].into(), sender)?.0, - "view should be materialized as read set is recorded" - ); - stdb.commit_tx(tx)?; - - let tx = begin_mut_tx(&stdb); - assert!( - tx.is_materialized(view_name, vec![].into(), sender)?.0, - "view should be materialized after commit" - ); - stdb.commit_tx(tx)?; - - let mut tx = begin_mut_tx(&stdb); - stdb.insert( - &mut tx, - table_id, - &product![AlgebraicValue::I64(1), AlgebraicValue::I64(2)].to_bsatn_vec()?, - )?; - stdb.commit_tx(tx)?; - - let tx = begin_mut_tx(&stdb); - assert!( - !tx.is_materialized(view_name, vec![].into(), sender)?.0, - "view should not be materialized after table modification" - ); - Ok(()) - } - #[test] /// Test that iteration yields each row only once /// in the edge case where a row is committed and has been deleted and re-inserted within the iterating TX. diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 9760327011d..327faca0489 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -171,22 +171,6 @@ impl From<&EventStatus> for ReducerOutcome { } } -pub enum ViewOutcome { - Success, - Failed(String), - BudgetExceeded, -} - -impl From for ViewOutcome { - fn from(status: EventStatus) -> Self { - match status { - EventStatus::Committed(_) => ViewOutcome::Success, - EventStatus::Failed(e) => ViewOutcome::Failed(e), - EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded, - } - } -} - #[derive(Clone, Debug)] pub struct ProcedureCallResult { pub return_val: AlgebraicValue, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 9e482093fe2..cd35356dd62 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -10,7 +10,6 @@ use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; -use crate::host::host_controller::ViewOutcome; use crate::host::{InvalidFunctionArguments, InvalidViewArguments}; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; @@ -34,12 +33,13 @@ use itertools::Itertools; use prometheus::{Histogram, IntGauge}; use scopeguard::ScopeGuard; use spacetimedb_auth::identity::ConnectionAuthCtx; +use spacetimedb_client_api_messages::energy::FunctionBudget; use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; -use spacetimedb_datastore::locking_tx_datastore::MutTxId; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; use spacetimedb_durability::DurableOffset; @@ -50,7 +50,7 @@ use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; -use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewDatabaseId, ViewId}; +use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::{AlgebraicTypeRef, ProductValue}; use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy}; @@ -408,7 +408,7 @@ impl Instance { fn call_view(&mut self, tx: MutTxId, params: CallViewParams) -> ViewCallResult { match self { - Instance::Wasm(inst) => inst.call_view(tx, params), + Instance::Wasm(inst) => inst.call_view_with_tx(tx, params), Instance::Js(inst) => inst.call_view(tx, params), } } @@ -539,18 +539,19 @@ pub struct CallReducerParams { } pub struct CallViewParams { - pub timestamp: Timestamp, - pub caller_identity: Identity, - pub caller_connection_id: Option, + pub view_name: Box, pub view_id: ViewId, - pub view_db_id: ViewDatabaseId, + pub table_id: TableId, + pub fn_ptr: ViewFnPtr, + /// This is not always the same identity as `sender`. + /// For subscribe and sql calls it will be. + /// However for atomic view update after a reducer call, + /// this will be the caller of the reducer. + pub caller: Identity, + pub sender: Option, pub args: ArgsTuple, - - /// The reference of return type of the view, used for deserializing the view call result. - /// This type information is obtained from the [`ViewDef::product_type_ref`]. - pub return_type: AlgebraicTypeRef, - /// Whether the view is being called anonymously (i.e., without a client identity). - pub is_anonymous: bool, + pub row_type: AlgebraicTypeRef, + pub timestamp: Timestamp, } pub struct CallProcedureParams { @@ -715,11 +716,40 @@ pub enum ReducerCallError { LifecycleReducer(Lifecycle), } +pub enum ViewOutcome { + Success, + Failed(String), + BudgetExceeded, +} + +impl From for ViewOutcome { + fn from(status: EventStatus) -> Self { + match status { + EventStatus::Committed(_) => ViewOutcome::Success, + EventStatus::Failed(e) => ViewOutcome::Failed(e), + EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded, + } + } +} + pub struct ViewCallResult { pub outcome: ViewOutcome, pub tx: MutTxId, - pub energy_used: EnergyQuanta, - pub execution_duration: Duration, + pub energy_used: FunctionBudget, + pub total_duration: Duration, + pub abi_duration: Duration, +} + +impl ViewCallResult { + pub fn default(tx: MutTxId) -> Self { + Self { + outcome: ViewOutcome::Success, + energy_used: FunctionBudget::ZERO, + total_duration: Duration::ZERO, + abi_duration: Duration::ZERO, + tx, + } + } } #[derive(thiserror::Error, Debug)] @@ -730,10 +760,14 @@ pub enum ViewCallError { NoSuchModule(#[from] NoSuchModule), #[error("no such view")] NoSuchView, + #[error("Table does not exist for view `{0}`")] + TableDoesNotExist(ViewId), #[error("missing client connection for view call trigged by subscription")] MissingClientConnection, #[error("DB error during view call: {0}")] DatastoreError(#[from] DatastoreError), + #[error("The module instance encountered a fatal error: {0}")] + InternalError(String), } #[derive(thiserror::Error, Debug)] @@ -1498,98 +1532,127 @@ impl ModuleHost { &self, mut tx: MutTxId, view_collector: &impl CollectViews, - sender: Identity, + caller: Identity, workload: Workload, ) -> Result { use FunctionArgs::*; let mut view_ids = HashSet::new(); view_collector.collect_views(&mut view_ids); for view_id in view_ids { - let name = tx.lookup_st_view(view_id)?.view_name; - if !tx.is_view_materialized(view_id, ArgId::SENTINEL, sender)? { - tx = self.call_view(tx, &name, Nullary, sender, None).await?.tx; + let st_view_row = tx.lookup_st_view(view_id)?; + let view_name = st_view_row.view_name; + let view_id = st_view_row.view_id; + let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?; + if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? { + tx = self + .call_view(tx, &view_name, view_id, table_id, Nullary, caller, Some(caller)) + .await? + .tx; } // If this is a sql call, we only update this view's "last called" timestamp if let Workload::Sql = workload { - tx.update_view_timestamp(view_id, ArgId::SENTINEL, sender)?; + tx.update_view_timestamp(view_id, ArgId::SENTINEL, caller)?; } // If this is a subscribe call, we also increment this view's subscriber count if let Workload::Subscribe = workload { - tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; + tx.subscribe_view(view_id, ArgId::SENTINEL, caller)?; } } Ok(tx) } + pub async fn call_views_with_tx(&self, tx: MutTxId, caller: Identity) -> Result { + use FunctionArgs::*; + let mut out = ViewCallResult::default(tx); + for ViewCallInfo { + view_id, + table_id, + view_name, + sender, + } in out.tx.view_for_update().cloned().collect::>() + { + let result = self + .call_view(out.tx, &view_name, view_id, table_id, Nullary, caller, sender) + .await?; + + // Increment execution stats + out.tx = result.tx; + out.outcome = result.outcome; + out.energy_used += result.energy_used; + out.total_duration += result.total_duration; + out.abi_duration += result.abi_duration; + + // Terminate early if execution failed + if !matches!(out.outcome, ViewOutcome::Success) { + break; + } + } + Ok(out) + } + pub async fn call_view( &self, tx: MutTxId, view_name: &str, + view_id: ViewId, + table_id: TableId, args: FunctionArgs, - caller_identity: Identity, - caller_connection_id: Option, + caller: Identity, + sender: Option, ) -> Result { - let (view_id, view_def) = self - .info - .module_def - .view_full(view_name) - .ok_or(ViewCallError::NoSuchView)?; - - let view_seed = ArgsSeed(self.info.module_def.typespace().with_type(view_def)); + let module_def = &self.info.module_def; + let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?; + let fn_ptr = view_def.fn_ptr; + let row_type = view_def.product_type_ref; + let typespace = module_def.typespace().with_type(view_def); + let view_seed = ArgsSeed(typespace); let args = args.into_tuple(view_seed).map_err(InvalidViewArguments)?; - let res = self - .call_view_inner( - tx, - view_id, - view_def, - args.clone(), - caller_identity, - caller_connection_id, - ) - .await; - - let log_message = match &res { - Err(ViewCallError::NoSuchView) => Some(no_such_function_log_message("view", view_name)), - Err(ViewCallError::Args(_)) => Some(args_error_log_message("view", view_name)), - _ => None, - }; - - if let Some(log_message) = log_message { - self.inject_logs(LogLevel::Error, view_name, &log_message) + match self + .call_view_inner(tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type) + .await + { + err @ Err(ViewCallError::NoSuchView) => { + let log_message = no_such_function_log_message("view", view_name); + self.inject_logs(LogLevel::Error, view_name, &log_message); + err + } + err @ Err(ViewCallError::Args(_)) => { + let log_message = args_error_log_message("view", view_name); + self.inject_logs(LogLevel::Error, view_name, &log_message); + err + } + res => res, } - - res } async fn call_view_inner( &self, tx: MutTxId, + name: &str, view_id: ViewId, - view_def: &ViewDef, + table_id: TableId, + fn_ptr: ViewFnPtr, + caller: Identity, + sender: Option, args: ArgsTuple, - caller_identity: Identity, - caller_connection_id: Option, + row_type: AlgebraicTypeRef, ) -> Result { - let return_type = view_def.product_type_ref; - let is_anonymous = view_def.is_anonymous; - let view_db_id = tx - .view_id_from_name(&view_def.name)? - .ok_or_else(|| ViewCallError::NoSuchView)?; - + let view_name = name.to_owned().into_boxed_str(); Ok(self - .call(&view_def.name, move |inst| { + .call(name, move |inst| { inst.call_view( tx, CallViewParams { timestamp: Timestamp::now(), - view_db_id, - caller_identity, - caller_connection_id, + view_name, view_id, + table_id, + fn_ptr, + caller, + sender, args, - return_type, - is_anonymous, + row_type, }, ) }) diff --git a/crates/core/src/host/v8/syscall/v1.rs b/crates/core/src/host/v8/syscall/v1.rs index 082257d70b6..aa3abccde20 100644 --- a/crates/core/src/host/v8/syscall/v1.rs +++ b/crates/core/src/host/v8/syscall/v1.rs @@ -20,7 +20,7 @@ use crate::host::AbiCall; use anyhow::Context; use bytes::Bytes; use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef}; -use spacetimedb_primitives::{errno, ColId, IndexId, ReducerId, TableId, ViewId}; +use spacetimedb_primitives::{errno, ColId, IndexId, ReducerId, TableId, ViewFnPtr}; use spacetimedb_sats::Serialize; use v8::{ callback_scope, ConstructorBehavior, Function, FunctionCallbackArguments, Isolate, Local, Module, Object, @@ -424,10 +424,11 @@ pub(super) fn call_call_view( let fun = hooks.call_view.context("`__call_view__` was never defined")?; let ViewOp { - id: ViewId(view_id), - db_id: _, + fn_ptr: ViewFnPtr(view_id), + view_id: _, + table_id: _, name: _, - caller_identity: sender, + sender, timestamp: _, args: view_args, } = op; @@ -456,8 +457,9 @@ pub(super) fn call_call_view_anon( let fun = hooks.call_view_anon.context("`__call_view__` was never defined")?; let AnonymousViewOp { - id: ViewId(view_id), - db_id: _, + fn_ptr: ViewFnPtr(view_id), + view_id: _, + table_id: _, name: _, timestamp: _, args: view_args, diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index c69cfcbcc13..a89c201bfd5 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,13 +1,15 @@ use bytes::Bytes; use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_datastore::locking_tx_datastore::FuncCallType; -use spacetimedb_datastore::locking_tx_datastore::ViewCall; +use spacetimedb_datastore::locking_tx_datastore::ViewCallInfo; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::de::DeserializeSeed as _; use spacetimedb_primitives::ProcedureId; -use spacetimedb_primitives::ViewDatabaseId; +use spacetimedb_primitives::TableId; +use spacetimedb_primitives::ViewFnPtr; use spacetimedb_primitives::ViewId; use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; +use spacetimedb_schema::def::ModuleDef; use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -17,11 +19,11 @@ use super::instrumentation::CallTimes; use crate::client::ClientConnectionSender; use crate::database_logger; use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint}; -use crate::host::host_controller::ViewOutcome; use crate::host::instance_env::InstanceEnv; use crate::host::instance_env::TxSlot; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::ViewCallResult; +use crate::host::module_host::ViewOutcome; use crate::host::module_host::{ CallProcedureParams, CallReducerParams, CallViewParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, @@ -129,6 +131,20 @@ pub struct ExecutionStats { pub memory_allocation: usize, } +impl ExecutionStats { + fn energy_used(&self) -> FunctionBudget { + self.energy.used() + } + + fn abi_duration(&self) -> Duration { + self.timings.wasm_instance_env_call_times.sum() + } + + fn total_duration(&self) -> Duration { + self.timings.total_duration + } +} + pub enum ExecutionError { User(Box), Recoverable(anyhow::Error), @@ -321,9 +337,8 @@ impl WasmModuleInstance { res } - pub fn call_view(&mut self, tx: MutTxId, params: CallViewParams) -> ViewCallResult { + pub fn call_view_with_tx(&mut self, tx: MutTxId, params: CallViewParams) -> ViewCallResult { let (res, trapped) = self.common.call_view_with_tx(tx, params, &mut self.instance); - self.trapped = trapped; res } @@ -550,7 +565,6 @@ impl InstanceCommon { // Do some `with_label_values`. // TODO(perf, centril): consider caching this. - let vm_metrics = VmMetrics::new(&database_identity, reducer_name); let _outer_span = start_call_function_span(reducer_name, &caller_identity, caller_connection_id_opt); let op = ReducerOp { @@ -565,22 +579,14 @@ impl InstanceCommon { let workload = Workload::Reducer(ReducerContext::from(op.clone())); let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload)); let mut tx_slot = inst.tx_slot(); - let _guard = vm_metrics.timer_guard_for_reducer_plus_query(tx.timer); let vm_metrics = VmMetrics::new(&database_identity, reducer_name); + let _guard = vm_metrics.timer_guard_for_reducer_plus_query(tx.timer); + let (mut tx, result) = tx_slot.set(tx, || { self.call_function(caller_identity, reducer_name, |budget| inst.call_reducer(op, budget)) }); - let energy_used = result.stats.energy.used(); - let energy_quanta_used = energy_used.into(); - let timings = &result.stats.timings; - vm_metrics.report( - energy_used.get(), - result.stats.timings.total_duration, - &result.stats.timings.wasm_instance_env_call_times, - ); - // An outer error occurred. // This signifies a logic error in the module rather than a properly // handled bad argument from the caller of a reducer. @@ -626,6 +632,27 @@ impl InstanceCommon { } }; + // Only re-evaluate and update views if the reducer's execution was successful + let (out, trapped) = if !trapped && matches!(status, EventStatus::Committed(_)) { + self.call_views_with_tx(tx, caller_identity, &info.module_def, inst, timestamp) + } else { + (ViewCallResult::default(tx), trapped) + }; + + // Account for view execution in reducer reporting metrics + vm_metrics.report_energy_used(out.energy_used); + vm_metrics.report_total_duration(out.total_duration); + vm_metrics.report_abi_duration(out.abi_duration); + + let status = match out.outcome { + ViewOutcome::BudgetExceeded => EventStatus::OutOfEnergy, + ViewOutcome::Failed(err) => EventStatus::Failed(err), + ViewOutcome::Success => status, + }; + + let energy_quanta_used = result.stats.energy_used().into(); + let total_duration = result.stats.total_duration(); + let event = ModuleEvent { timestamp, caller_identity, @@ -637,16 +664,16 @@ impl InstanceCommon { }, status, energy_quanta_used, - host_execution_duration: timings.total_duration, + host_execution_duration: total_duration, request_id, timer, }; - let event = commit_and_broadcast_event(&self.info, client, event, tx); + let event = commit_and_broadcast_event(&self.info, client, event, out.tx); let res = ReducerCallResult { outcome: ReducerOutcome::from(&event.status), energy_used: energy_quanta_used, - execution_duration: timings.total_duration, + execution_duration: total_duration, }; (res, trapped) @@ -720,11 +747,13 @@ impl InstanceCommon { result } - /// Execute a view. + /// Executes a view and materializes its result, + /// deleting any previously materialized rows. /// - /// Similar to `call_reducer_with_tx`, but for views. - /// unlike to `call_reducer_with_tx`, It does not handle `tx`creation or commit, - /// It returns the updated `tx` instead. + /// Similar to [`Self::call_reducer_with_tx`], but for views. + /// However, unlike [`Self::call_reducer_with_tx`], + /// it mutates a previously allocated [`MutTxId`] and returns it. + /// It does not commit the transaction. pub(crate) fn call_view_with_tx( &mut self, tx: MutTxId, @@ -732,75 +761,93 @@ impl InstanceCommon { inst: &mut I, ) -> (ViewCallResult, bool) { let CallViewParams { - caller_identity, - caller_connection_id, + view_name, view_id, + table_id, + fn_ptr, + caller, + sender, args, - return_type, + row_type, timestamp, - view_db_id, - is_anonymous, } = params; - let info = self.info.clone(); - let view_def = info.module_def.view_by_id(view_id, is_anonymous); - let view_name = &*view_def.name; + let _outer_span = start_call_function_span(&view_name, &caller, None); let mut tx_slot = inst.tx_slot(); - - let _outer_span = start_call_function_span(view_name, &caller_identity, caller_connection_id); - - let op = ViewOp { - id: view_id, - db_id: view_db_id, - name: view_name, - caller_identity: &caller_identity, - args: &args, - timestamp, - }; - let (mut tx, result) = tx_slot.set(tx, || { - self.call_function(caller_identity, view_name, |budget| { - if is_anonymous { - inst.call_view_anon(op.into(), budget) - } else { - inst.call_view(op, budget) - } + self.call_function(caller, &view_name, |budget| match sender { + Some(sender) => inst.call_view( + ViewOp { + name: &view_name, + view_id, + table_id, + fn_ptr, + sender: &sender, + args: &args, + timestamp, + }, + budget, + ), + None => inst.call_view_anon( + AnonymousViewOp { + name: &view_name, + view_id, + table_id, + fn_ptr, + args: &args, + timestamp, + }, + budget, + ), }) }); - let trapped = matches!(result.call_result, Err(ExecutionError::Trap(_))); + let replica_ctx = inst.replica_ctx(); + let stdb = &*replica_ctx.relational_db.clone(); + let database_identity = replica_ctx.database_identity; + let vm_metrics = VmMetrics::new(&database_identity, &view_name); - let outcome = match result.call_result { - Err(ExecutionError::Recoverable(err) | ExecutionError::Trap(err)) => { - inst.log_traceback("view", view_name, &err); + // Report execution metrics on each view call + vm_metrics.report(&result.stats); + + let trapped = matches!(result.call_result, Err(ExecutionError::Trap(_))); - self.handle_outer_error(&result.stats.energy, &caller_identity, &caller_connection_id, view_name) + let outcome = match (result.call_result, sender) { + (Err(ExecutionError::Recoverable(err) | ExecutionError::Trap(err)), _) => { + inst.log_traceback("view", &view_name, &err); + self.handle_outer_error(&result.stats.energy, &caller, &None, &view_name) .into() } // TODO: maybe do something else with user errors? - Err(ExecutionError::User(err)) => { - inst.log_traceback("view", view_name, &anyhow::anyhow!(err)); - - self.handle_outer_error(&result.stats.energy, &caller_identity, &caller_connection_id, view_name) + (Err(ExecutionError::User(err)), _) => { + inst.log_traceback("view", &view_name, &anyhow::anyhow!(err)); + self.handle_outer_error(&result.stats.energy, &caller, &None, &view_name) .into() } - Ok(res) => { - let db = &inst.replica_ctx().relational_db; - db.materialize_view( + // Materialize anonymous view + (Ok(bytes), None) => { + stdb.materialize_anonymous_view(&mut tx, table_id, row_type, bytes, self.info.module_def.typespace()) + .inspect_err(|err| { + log::error!("Fatal error materializing view `{view_name}`: {err}"); + }) + .expect("Fatal error materializing view"); + ViewOutcome::Success + } + // Materialize sender view + (Ok(bytes), Some(sender)) => { + stdb.materialize_view( &mut tx, - view_name, - args, - return_type, - res, - info.module_def.typespace(), - caller_identity, + table_id, + sender, + row_type, + bytes, + self.info.module_def.typespace(), ) - .map_err(|err| { - log::info!("view returned error: {err}"); - err + .inspect_err(|err| { + log::error!("Fatal error materializing view `{view_name}`: {err}"); }) - .expect("error updating view result"); + .expect("Fatal error materializing view"); ViewOutcome::Success } }; @@ -808,12 +855,67 @@ impl InstanceCommon { let res = ViewCallResult { outcome, tx, - energy_used: result.stats.energy.used().into(), - execution_duration: result.stats.timings.total_duration, + energy_used: result.stats.energy_used(), + total_duration: result.stats.total_duration(), + abi_duration: result.stats.abi_duration(), }; (res, trapped) } + + /// A [`MutTxId`] knows which views must be updated (re-evaluated). + /// This method re-evaluates them and updates their backing tables. + pub(crate) fn call_views_with_tx( + &mut self, + tx: MutTxId, + caller: Identity, + module_def: &ModuleDef, + inst: &mut I, + timestamp: Timestamp, + ) -> (ViewCallResult, bool) { + let mut trapped = false; + let mut out = ViewCallResult::default(tx); + for ViewCallInfo { + view_id, + table_id, + view_name, + sender, + } in out.tx.view_for_update().cloned().collect::>() + { + let view_def = module_def + .view(&*view_name) + .unwrap_or_else(|| panic!("view `{}` not found", view_name)); + let fn_ptr = view_def.fn_ptr; + let args = ArgsTuple::nullary(); + let row_type = view_def.product_type_ref; + let params = CallViewParams { + view_name, + view_id, + table_id, + fn_ptr, + caller, + sender, + args, + row_type, + timestamp, + }; + let (result, ok) = self.call_view_with_tx(out.tx, params, inst); + + // Increment execution stats + out.tx = result.tx; + out.outcome = result.outcome; + out.energy_used += result.energy_used; + out.total_duration += result.total_duration; + out.abi_duration += result.abi_duration; + trapped = trapped || ok; + + // Terminate early if execution failed + if trapped || !matches!(out.outcome, ViewOutcome::Success) { + break; + } + } + (out, trapped) + } } /// VM-related metrics for reducer execution. struct VmMetrics { @@ -856,22 +958,37 @@ impl VmMetrics { self.reducer_plus_query_duration.clone().with_timer(start) } + fn report_energy_used(&self, energy_used: FunctionBudget) { + self.reducer_fuel_used.inc_by(energy_used.get()); + } + + fn report_total_duration(&self, duration: Duration) { + self.reducer_duration_usec.inc_by(duration.as_micros() as u64); + } + + fn report_abi_duration(&self, duration: Duration) { + self.reducer_abi_time_usec.inc_by(duration.as_micros() as u64); + } + /// Reports some VM metrics. - fn report(&self, fuel_used: u64, reducer_duration: Duration, abi_time: &CallTimes) { - self.reducer_fuel_used.inc_by(fuel_used); - self.reducer_duration_usec.inc_by(reducer_duration.as_micros() as u64); - self.reducer_abi_time_usec.inc_by(abi_time.sum().as_micros() as u64); + fn report(&self, stats: &ExecutionStats) { + let energy_used = stats.energy.used(); + let reducer_duration = stats.timings.total_duration; + let abi_time = stats.timings.wasm_instance_env_call_times.sum(); + self.report_energy_used(energy_used); + self.report_total_duration(reducer_duration); + self.report_abi_duration(abi_time); } } /// Starts the `call_function` span. fn start_call_function_span( - reducer_name: &str, + function_name: &str, caller_identity: &Identity, caller_connection_id_opt: Option, ) -> EnteredSpan { tracing::trace_span!("call_function", - reducer_name, + function_name, %caller_identity, caller_connection_id = caller_connection_id_opt.map(tracing::field::debug), ) @@ -961,11 +1078,12 @@ pub trait InstanceOp { /// Describes a view call in a cheaply shareable way. #[derive(Clone, Debug)] pub struct ViewOp<'a> { - pub id: ViewId, - pub db_id: ViewDatabaseId, pub name: &'a str, + pub view_id: ViewId, + pub table_id: TableId, + pub fn_ptr: ViewFnPtr, pub args: &'a ArgsTuple, - pub caller_identity: &'a Identity, + pub sender: &'a Identity, pub timestamp: Timestamp, } @@ -973,24 +1091,28 @@ impl InstanceOp for ViewOp<'_> { fn name(&self) -> &str { self.name } + fn timestamp(&self) -> Timestamp { self.timestamp } + fn call_type(&self) -> FuncCallType { - FuncCallType::View(ViewCall::with_identity( - *self.caller_identity, - self.db_id, - self.args.get_bsatn().clone(), - )) + FuncCallType::View(ViewCallInfo { + view_id: self.view_id, + table_id: self.table_id, + view_name: self.name.to_owned().into_boxed_str(), + sender: Some(*self.sender), + }) } } /// Describes an anonymous view call in a cheaply shareable way. #[derive(Clone, Debug)] pub struct AnonymousViewOp<'a> { - pub id: ViewId, - pub db_id: ViewDatabaseId, pub name: &'a str, + pub view_id: ViewId, + pub table_id: TableId, + pub fn_ptr: ViewFnPtr, pub args: &'a ArgsTuple, pub timestamp: Timestamp, } @@ -999,32 +1121,18 @@ impl InstanceOp for AnonymousViewOp<'_> { fn name(&self) -> &str { self.name } + fn timestamp(&self) -> Timestamp { self.timestamp } - fn call_type(&self) -> FuncCallType { - FuncCallType::View(ViewCall::anonymous(self.db_id, self.args.get_bsatn().clone())) - } -} -impl<'a> From> for AnonymousViewOp<'a> { - fn from( - ViewOp { - id, - db_id, - name, - args, - timestamp, - .. - }: ViewOp<'a>, - ) -> Self { - Self { - id, - db_id, - name, - args, - timestamp, - } + fn call_type(&self) -> FuncCallType { + FuncCallType::View(ViewCallInfo { + view_id: self.view_id, + table_id: self.table_id, + view_name: self.name.to_owned().into_boxed_str(), + sender: None, + }) } } diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index a7947956f62..23647ddc773 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -413,7 +413,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { prepare_store_for_call(store, budget); // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. - let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(*op.caller_identity); + let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(*op.sender); // Prepare arguments to the reducer + the error sink & start timings. let args_bytes = op.args.get_bsatn().clone(); @@ -437,7 +437,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { call_view, &mut *store, ( - op.id.0, + op.fn_ptr.0, sender_0, sender_1, sender_2, @@ -484,7 +484,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { }; }; - let call_result = call_sync_typed_func(call_view_anon, &mut *store, (op.id.0, args_source.0, errors_sink)); + let call_result = call_sync_typed_func(call_view_anon, &mut *store, (op.fn_ptr.0, args_source.0, errors_sink)); let (stats, result_bytes) = finish_opcall(store, budget); diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 17db1f1616d..7dfbaf1e274 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -6,7 +6,10 @@ use crate::db::relational_db::{RelationalDB, Tx}; use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; -use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; +use crate::host::module_host::{ + DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ViewCallError, ViewCallResult, + ViewOutcome, +}; use crate::host::{ArgsTuple, ModuleHost}; use crate::subscription::module_subscription_actor::{ModuleSubscriptions, WriteConflict}; use crate::subscription::module_subscription_manager::TransactionOffset; @@ -264,6 +267,21 @@ pub async fn run( // Update transaction metrics tx.metrics.merge(metrics); + // Update views + let result = match module { + Some(module) => module.call_views_with_tx(tx, auth.caller).await?, + None => ViewCallResult::default(tx), + }; + + // Rollback transaction and report metrics if view execution failed + if let ViewOutcome::Failed(err) = result.outcome { + let (_, metrics, reducer) = db.rollback_mut_tx(result.tx); + db.report_mut_tx_metrics(reducer, metrics, None); + return Err(DBError::View(ViewCallError::InternalError(err))); + } + + let tx = result.tx; + // Commit the tx if there are no deltas to process if subs.is_none() { let metrics = tx.metrics; diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 89ee6f890c8..f58048fd84e 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -15,7 +15,7 @@ use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent}; -use crate::host::{FunctionArgs, ModuleHost}; +use crate::host::ModuleHost; use crate::messages::websocket::Subscribe; use crate::subscription::query::is_subscribe_to_all_tables; use crate::subscription::{collect_table_update_for_view, execute_plans}; @@ -42,9 +42,8 @@ use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; use spacetimedb_primitives::ArgId; use std::collections::HashSet; -use std::sync::OnceLock; use std::{sync::Arc, time::Instant}; -use tokio::sync::{oneshot, watch}; +use tokio::sync::oneshot; type Subscriptions = Arc>; @@ -57,7 +56,6 @@ pub struct ModuleSubscriptions { broadcast_queue: BroadcastQueue, owner_identity: Identity, stats: Arc, - module_rx: OnceLock>, } #[derive(Debug, Clone)] @@ -196,38 +194,9 @@ impl ModuleSubscriptions { broadcast_queue, owner_identity, stats, - module_rx: OnceLock::new(), } } - /// Should be called once to initialize the `ModuleSubscriptions` with a `ModuleHost` receiver. - pub fn init(&self, module_host: watch::Receiver) { - self.module_rx - .set(module_host) - .expect("ModuleSubscriptions::init called twice"); - } - - #[allow(dead_code)] - async fn call_view( - &self, - tx: MutTxId, - view_name: &str, - args: FunctionArgs, - sender: Arc, - ) -> Result<(), DBError> { - let module_host_rx = self - .module_rx - .get() - .expect("ModuleSubscriptions::init not called before call_view"); - let module_host = module_host_rx.borrow(); - - let _result = module_host - .call_view(tx, view_name, args, sender.id.identity, Some(sender.id.connection_id)) - .await; - - // TODO: Handle result - Ok(()) - } /// Construct a new [`ModuleSubscriptions`] for use in testing, /// creating a new [`tokio::runtime::Runtime`] to run its send worker. pub fn for_test_new_runtime(db: Arc) -> (ModuleSubscriptions, tokio::runtime::Runtime) { diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 39916e289c4..00c0725a291 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -25,7 +25,7 @@ use spacetimedb_durability::TxOffset; use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue}; -use spacetimedb_primitives::{ColId, IndexId, TableId, ViewDatabaseId}; +use spacetimedb_primitives::{ColId, IndexId, TableId, ViewId}; use spacetimedb_subscription::{JoinEdge, SubscriptionPlan, TableName}; use std::collections::BTreeMap; use std::fmt::Debug; @@ -55,7 +55,7 @@ pub struct Plan { } impl CollectViews for Plan { - fn collect_views(&self, views: &mut std::collections::HashSet) { + fn collect_views(&self, views: &mut std::collections::HashSet) { for plan in &self.plans { plan.collect_views(views); } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 0f55fa02a17..22e4d4d00ad 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -10,10 +10,7 @@ use crate::{ db_metrics::DB_METRICS, error::{DatastoreError, IndexError, TableError}, execution_context::ExecutionContext, - locking_tx_datastore::{ - mut_tx::{ViewCall, ViewReadSets}, - state_view::iter_st_column_for_table, - }, + locking_tx_datastore::{mut_tx::ViewReadSets, state_view::iter_st_column_for_table}, system_tables::{ system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, @@ -25,7 +22,7 @@ use crate::{ traits::TxData, }; use crate::{ - locking_tx_datastore::mut_tx::ReadSet, + locking_tx_datastore::ViewCallInfo, system_tables::{ ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX, @@ -33,10 +30,10 @@ use crate::{ }; use anyhow::anyhow; use core::{convert::Infallible, ops::RangeBounds}; -use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet}; +use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StTableType, Identity}; -use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId}; +use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId, ViewId}; use spacetimedb_sats::{algebraic_value::de::ValueDeserializer, memory_usage::MemoryUsage, Deserialize}; use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_schema::{ @@ -53,61 +50,6 @@ use std::collections::BTreeMap; use std::sync::Arc; use thin_vec::ThinVec; -type IndexKeyReadSet = HashMap>; -type IndexColReadSet = HashMap; - -#[derive(Default)] -struct CommittedReadSets { - tables: IntMap>, - index_keys: IntMap, -} - -impl MemoryUsage for CommittedReadSets { - fn heap_usage(&self) -> usize { - //TODO: fix this - //self.tables.heap_usage() + self.index_keys.heap_usage() + self.views.heap_usage() - 0 - } -} - -impl CommittedReadSets { - /// Record in the [`CommittedState`] that this view scans this table - fn view_scans_table(&mut self, view: ViewCall, table_id: TableId) { - self.tables.entry(table_id).or_default().insert(view); - } - - /// Record in the [`CommittedState`] that this view reads this index `key` for these table `cols` - fn view_reads_index_key(&mut self, view: ViewCall, table_id: TableId, cols: ColList, key: &AlgebraicValue) { - self.index_keys - .entry(table_id) - .or_default() - .entry(cols) - .or_default() - .entry(key.clone()) - .or_default() - .insert(view); - } - - /// Clear all read sets for views involving `table_id`. - /// This is called when a table is modified, - fn clear_views_for_table(&mut self, table_id: TableId) { - self.tables.remove(&table_id); - //TODO: clear from index only if stored indexed row has been updated - self.index_keys.remove(&table_id); - } - - /// Returns true if the given view exists in any read set. - /// This is used to determine whether a view needs to be re-evaluated. - fn is_materialized(&self, view: &ViewCall) -> bool { - self.tables.values().any(|views| views.contains(view)) - || self.index_keys.values().any(|col_map| { - col_map - .values() - .any(|key_map| key_map.values().any(|views| views.contains(view))) - }) - } -} - /// Contains the live, in-memory snapshot of a database. This structure /// is exposed in order to support tools wanting to process the commit /// logs directly. For normal usage, see the RelationalDB struct instead. @@ -137,7 +79,14 @@ pub struct CommittedState { /// We check each reducer's write set against these read sets. /// Any overlap will trigger a re-evaluation of the affected view, /// and its read set will be updated accordingly. - read_sets: CommittedReadSets, + read_sets: ViewReadSets, +} + +impl CommittedState { + /// Returns the views that perform a full scan of this table + pub(super) fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator { + self.read_sets.views_for_table_scan(table_id) + } } impl MemoryUsage for CommittedState { @@ -685,6 +634,10 @@ impl CommittedState { tx_data.has_rows_or_connect_disconnect(ctx.reducer_context()) } + pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId) { + self.read_sets.remove_view(view_id) + } + pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData { let mut tx_data = TxData::default(); let mut truncates = IntSet::default(); @@ -713,7 +666,7 @@ impl CommittedState { // It's important that this happens after applying the changes to `tx_data`, // which implies `tx_data` already contains inserts and deletes for view tables // so that we can pass updated set of table ids. - self.merge_read_sets(read_sets, tx_data.table_ids_and_names().map(|(id, _)| id)); + self.merge_read_sets(read_sets); // If the TX will be logged, record its projected tx offset, // then increment the counter. @@ -725,27 +678,8 @@ impl CommittedState { tx_data } - fn merge_read_set(&mut self, view: ViewCall, read_set: ReadSet) { - for table_id in read_set.tables_scanned() { - self.read_sets.view_scans_table(view.clone(), *table_id); - } - for (table_id, index_id, key) in read_set.index_keys_scanned() { - if let Some(cols) = self - .get_schema(*table_id) - .map(|table_schema| table_schema.col_list_for_index_id(*index_id)) - { - self.read_sets.view_reads_index_key(view.clone(), *table_id, cols, key); - } - } - } - - fn merge_read_sets(&mut self, read_sets: ViewReadSets, updated_tables: impl IntoIterator) { - for (view, read_set) in read_sets { - self.merge_read_set(view, read_set); - } - for table_id in updated_tables { - self.read_sets.clear_views_for_table(table_id); - } + fn merge_read_sets(&mut self, read_sets: ViewReadSets) { + self.read_sets.merge(read_sets) } fn merge_apply_deletes( @@ -1061,10 +995,6 @@ impl CommittedState { .with_label_values(&database_identity) .set(self.blob_store.bytes_used_by_blobs() as _); } - - pub(super) fn is_materialized(&self, view: &ViewCall) -> bool { - self.read_sets.is_materialized(view) - } } pub(super) type CommitTableForInsertion<'a> = (&'a Table, &'a dyn BlobStore, &'a IndexIdMap); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 84fd47f9757..4a434e9ffb8 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -35,7 +35,7 @@ use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics}; use spacetimedb_lib::{ConnectionId, Identity}; use spacetimedb_paths::server::SnapshotDirPath; -use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId, ViewDatabaseId}; +use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId}; use spacetimedb_sats::{ algebraic_value::de::ValueDeserializer, bsatn, buffer::BufReader, AlgebraicValue, ProductValue, }; @@ -512,7 +512,7 @@ impl MutTxDatastore for Locking { tx.rename_table(table_id, new_name) } - fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result> { + fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result> { tx.view_id_from_name(view_name) } @@ -1272,7 +1272,7 @@ mod tests { use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration}; - use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewDatabaseId}; + use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId}; use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::bsatn::ToBsatn; use spacetimedb_sats::layout::RowTypeLayout; @@ -1778,23 +1778,23 @@ mod tests { ColRow { table: ST_CONNECTION_CREDENTIALS_ID.into(), pos: 0, name: "connection_id", ty: AlgebraicType::U128 }, ColRow { table: ST_CONNECTION_CREDENTIALS_ID.into(), pos: 1, name: "jwt_payload", ty: AlgebraicType::String }, - ColRow { table: ST_VIEW_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, + ColRow { table: ST_VIEW_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, ColRow { table: ST_VIEW_ID.into(), pos: 1, name: "view_name", ty: AlgebraicType::String }, ColRow { table: ST_VIEW_ID.into(), pos: 2, name: "table_id", ty: AlgebraicType::option(TableId::get_type()) }, ColRow { table: ST_VIEW_ID.into(), pos: 3, name: "is_public", ty: AlgebraicType::Bool }, ColRow { table: ST_VIEW_ID.into(), pos: 4, name: "is_anonymous", ty: AlgebraicType::Bool }, - ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, + ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 1, name: "param_pos", ty: ColId::get_type() }, ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 2, name: "param_name", ty: AlgebraicType::String }, ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 3, name: "param_type", ty: AlgebraicType::bytes() }, - ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, + ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 1, name: "col_pos", ty: ColId::get_type() }, ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String }, ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() }, - ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, ColRow { table: ST_VIEW_SUB_ID.into(), pos: 1, name: "arg_id", ty: ArgId::get_type() }, ColRow { table: ST_VIEW_SUB_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 }, ColRow { table: ST_VIEW_SUB_ID.into(), pos: 3, name: "num_subscribers", ty: AlgebraicType::U64 }, diff --git a/crates/datastore/src/locking_tx_datastore/mod.rs b/crates/datastore/src/locking_tx_datastore/mod.rs index e66f3dd0998..642cb0f63de 100644 --- a/crates/datastore/src/locking_tx_datastore/mod.rs +++ b/crates/datastore/src/locking_tx_datastore/mod.rs @@ -3,7 +3,7 @@ pub mod committed_state; pub mod datastore; mod mut_tx; -pub use mut_tx::{FuncCallType, MutTxId, ViewCall}; +pub use mut_tx::{FuncCallType, MutTxId, ViewCallInfo}; mod sequence; pub mod state_view; pub use state_view::{IterByColEqTx, IterByColRangeTx}; diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 357d155cd62..8343cc2b890 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -35,7 +35,7 @@ use core::ops::RangeBounds; use core::{cell::RefCell, mem}; use core::{iter, ops::Bound}; use smallvec::SmallVec; -use spacetimedb_data_structures::map::{IntMap, IntSet}; +use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap}; use spacetimedb_durability::TxOffset; use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row}; use spacetimedb_lib::{bsatn::ToBsatn as _, db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp}; @@ -44,11 +44,12 @@ use spacetimedb_lib::{ ConnectionId, Identity, }; use spacetimedb_primitives::{ - col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewDatabaseId, + col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, }; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, de::{DeserializeSeed, WithBound}, + memory_usage::MemoryUsage, product, ser::Serialize, AlgebraicType, AlgebraicValue, ProductType, ProductValue, WithTypespace, @@ -68,87 +69,92 @@ use spacetimedb_table::{ table_index::TableIndex, }; use std::{ - collections::HashMap, sync::Arc, time::{Duration, Instant}, }; type DecodeResult = core::result::Result; -/// Views track their read sets and update the [`CommittedState`] with them. -/// The [`CommittedState`] maintains these read sets in order to determine when to re-evaluate a view. +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub struct ViewCallInfo { + pub view_id: ViewId, + pub table_id: TableId, + pub view_name: Box, + pub sender: Option, +} + +/// A data structure for tracking the database rows/keys that are read by views #[derive(Default)] -pub struct ReadSet { - table_scans: IntSet, - index_keys: IntMap>, +pub struct ViewReadSets { + tables: IntMap, } -impl ReadSet { - /// Enumerate the tables that are scanned and tracked by this read set - pub fn tables_scanned(&self) -> impl Iterator + '_ { - self.table_scans.iter() +impl MemoryUsage for ViewReadSets { + fn heap_usage(&self) -> usize { + 0 // TODO: Implement memory tracking for read sets } +} - /// Enumerate the single index keys that are tracked by this read set - pub fn index_keys_scanned(&self) -> impl Iterator + '_ { - self.index_keys - .iter() - .flat_map(|(table_id, keys)| keys.iter().map(move |(index_id, key)| (table_id, index_id, key))) +impl ViewReadSets { + /// Returns the views that perform a full scan of this table + pub fn views_for_table_scan(&self, table_id: &TableId) -> impl Iterator { + self.tables + .get(table_id) + .into_iter() + .flat_map(TableReadSet::views_for_table_scan) } - /// Track a table scan in this read set - fn insert_table_scan(&mut self, table_id: TableId) { - self.table_scans.insert(table_id); + /// Record that a view performs a full scan of this table + fn insert_scan(&mut self, table_id: TableId, call: ViewCallInfo) { + self.tables.entry(table_id).or_default().insert_scan(call); } - /// Track an index scan in this read set. - /// If we only read a single index key we record the key. - /// If we read a range, we treat it as though we scanned the entire table. - fn insert_index_scan( - &mut self, - table_id: TableId, - index_id: IndexId, - lower: Bound, - upper: Bound, - ) { - match (lower, upper) { - (Bound::Included(lower), Bound::Included(upper)) if lower == upper => { - self.index_keys.entry(table_id).or_default().insert(index_id, lower); - } - _ => { - self.table_scans.insert(table_id); - } + /// Removes keys for `view_id` from the read set + pub fn remove_view(&mut self, view_id: ViewId) { + self.tables.retain(|_, readset| { + readset.remove_view(view_id); + !readset.is_empty() + }); + } + + /// Merge or union read sets together + pub fn merge(&mut self, readset: Self) { + for (table_id, rs) in readset.tables { + self.tables.entry(table_id).or_default().merge(rs); } } } -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -pub struct ViewCall { - identity: Option, - view_id: ViewDatabaseId, - //TODO: use arg_id from [`ST_VIEW_ARGS`] - args: Bytes, +/// A table-level read set for views +#[derive(Default)] +struct TableReadSet { + table_scans: HashSet, } -impl ViewCall { - pub fn anonymous(view_id: ViewDatabaseId, args: Bytes) -> Self { - Self { - identity: None, - view_id, - args, - } +impl TableReadSet { + /// Record that this view performs a full scan of this read set's table + fn insert_scan(&mut self, call: ViewCallInfo) { + self.table_scans.insert(call); } - pub fn with_identity(identity: Identity, view_id: ViewDatabaseId, args: Bytes) -> Self { - Self { - identity: Some(identity), - view_id, - args, - } + /// Returns the views that perform a full scan of this read set's table + fn views_for_table_scan(&self) -> impl Iterator { + self.table_scans.iter() + } + + /// Is this read set empty? + fn is_empty(&self) -> bool { + self.table_scans.is_empty() + } + + /// Removes keys for `view_id` from the read set + fn remove_view(&mut self, view_id: ViewId) { + self.table_scans.retain(|info| info.view_id != view_id); } - pub fn into_args(self) -> Bytes { - self.args + /// Merge or union two read sets for this table + fn merge(&mut self, readset: TableReadSet) { + self.table_scans.extend(readset.table_scans); } } @@ -157,11 +163,9 @@ impl ViewCall { pub enum FuncCallType { Reducer, Procedure, - View(ViewCall), + View(ViewCallInfo), } -pub type ViewReadSets = HashMap; - /// Represents a Mutable transaction. Holds locks for its duration /// /// The initialization of this struct is sensitive because improper @@ -180,18 +184,13 @@ pub struct MutTxId { pub metrics: ExecutionMetrics, } -static_assert_size!(MutTxId, 448); +static_assert_size!(MutTxId, 432); impl MutTxId { /// Record that a view performs a table scan in this transaction's read set pub fn record_table_scan(&mut self, op: &FuncCallType, table_id: TableId) { if let FuncCallType::View(view) = op { - self.read_sets - // TODO: change `read_sets` to the use the `HashMap` from `spacetimedb_data_structures` - // and use `entry_ref()` here - .entry(view.clone()) - .or_default() - .insert_table_scan(table_id) + self.read_sets.insert_scan(table_id, view.clone()); } } @@ -200,19 +199,33 @@ impl MutTxId { &mut self, op: &FuncCallType, table_id: TableId, - index_id: IndexId, - lower: Bound, - upper: Bound, + _: IndexId, + _: Bound, + _: Bound, ) { + // TODO: Implement read set tracking for index scans if let FuncCallType::View(view) = op { - self.read_sets - // TODO: change `read_sets` to the use the `HashMap` from `spacetimedb_data_structures - // and use `entry_ref()` here - .entry(view.clone()) - .or_default() - .insert_index_scan(table_id, index_id, lower, upper) + self.read_sets.insert_scan(table_id, view.clone()); } } + + /// Returns the views whose read sets overlaps with this transaction's write set + pub fn view_for_update(&self) -> impl Iterator { + self.tx_state + .insert_tables + .keys() + .filter(|table_id| !self.tx_state.delete_tables.contains_key(table_id)) + .chain(self.tx_state.delete_tables.keys()) + .flat_map(|table_id| self.committed_state_write_lock.views_for_table_scan(table_id)) + .collect::>() + .into_iter() + } + + /// Removes keys for `view_id` from the committed read set. + /// Used for dropping views in an auto-migration. + pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) { + self.committed_state_write_lock.drop_view_from_read_sets(view_id) + } } impl Datastore for MutTxId { @@ -353,7 +366,7 @@ impl MutTxId { /// - Everything [`Self::create_table`] ensures. /// - The returned [`ViewId`] is unique and not [`ViewId::SENTINEL`]. /// - All view metadata maintained by the datastore is created atomically - pub fn create_view(&mut self, module_def: &ModuleDef, view_def: &ViewDef) -> Result<(ViewDatabaseId, TableId)> { + pub fn create_view(&mut self, module_def: &ModuleDef, view_def: &ViewDef) -> Result<(ViewId, TableId)> { let table_schema = TableSchema::from_view_def_for_datastore(module_def, view_def); let table_id = self.create_table(table_schema)?; @@ -378,11 +391,13 @@ impl MutTxId { } /// Drop the backing table of a view and update the system tables. - pub fn drop_view(&mut self, view_id: ViewDatabaseId) -> Result<()> { + pub fn drop_view(&mut self, view_id: ViewId) -> Result<()> { // Drop the view's metadata self.drop_st_view(view_id)?; self.drop_st_view_param(view_id)?; self.drop_st_view_column(view_id)?; + self.drop_st_view_sub(view_id)?; + self.drop_view_from_committed_read_set(view_id); // Drop the view's backing table if materialized if let StViewRow { @@ -504,7 +519,7 @@ impl MutTxId { }) } - pub fn lookup_st_view(&self, view_id: ViewDatabaseId) -> Result { + pub fn lookup_st_view(&self, view_id: ViewId) -> Result { let row = self .iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewId, &view_id.into())? .next() @@ -523,7 +538,7 @@ impl MutTxId { } /// Check if view has parameters. - pub fn is_view_parameterized(&self, view_id: ViewDatabaseId) -> Result { + pub fn is_view_parameterized(&self, view_id: ViewId) -> Result { let view_id = view_id.into(); let mut iter = self.iter_by_col_eq(ST_VIEW_PARAM_ID, StViewParamFields::ViewId, &view_id)?; Ok(iter.next().is_some()) @@ -536,12 +551,12 @@ impl MutTxId { table_id: TableId, is_public: bool, is_anonymous: bool, - ) -> Result { + ) -> Result { Ok(self .insert_via_serialize_bsatn( ST_VIEW_ID, &StViewRow { - view_id: ViewDatabaseId::SENTINEL, + view_id: ViewId::SENTINEL, view_name, table_id: Some(table_id), is_public, @@ -555,7 +570,7 @@ impl MutTxId { /// For each parameter of a view, insert a row into `st_view_param`. /// This does not include the context parameter. - fn insert_into_st_view_param(&mut self, view_id: ViewDatabaseId, params: &[ViewParamDef]) -> Result<()> { + fn insert_into_st_view_param(&mut self, view_id: ViewId, params: &[ViewParamDef]) -> Result<()> { for ViewParamDef { name, col_id, ty, .. } in params { self.insert_via_serialize_bsatn( ST_VIEW_PARAM_ID, @@ -571,7 +586,7 @@ impl MutTxId { } /// For each column or field returned in a view, insert a row into `st_view_column`. - fn insert_into_st_view_column(&mut self, view_id: ViewDatabaseId, columns: &[ViewColumnDef]) -> Result<()> { + fn insert_into_st_view_column(&mut self, view_id: ViewId, columns: &[ViewColumnDef]) -> Result<()> { for def in columns { self.insert_via_serialize_bsatn( ST_VIEW_COLUMN_ID, @@ -637,20 +652,25 @@ impl MutTxId { } /// Drops the row in `st_view` for this `view_id` - fn drop_st_view(&mut self, view_id: ViewDatabaseId) -> Result<()> { + fn drop_st_view(&mut self, view_id: ViewId) -> Result<()> { self.delete_col_eq(ST_VIEW_ID, StViewFields::ViewId.col_id(), &view_id.into()) } /// Drops the rows in `st_view_param` for this `view_id` - fn drop_st_view_param(&mut self, view_id: ViewDatabaseId) -> Result<()> { + fn drop_st_view_param(&mut self, view_id: ViewId) -> Result<()> { self.delete_col_eq(ST_VIEW_PARAM_ID, StViewParamFields::ViewId.col_id(), &view_id.into()) } /// Drops the rows in `st_view_column` for this `view_id` - fn drop_st_view_column(&mut self, view_id: ViewDatabaseId) -> Result<()> { + fn drop_st_view_column(&mut self, view_id: ViewId) -> Result<()> { self.delete_col_eq(ST_VIEW_COLUMN_ID, StViewColumnFields::ViewId.col_id(), &view_id.into()) } + /// Drops the rows in `st_view_sub` for this `view_id` + fn drop_st_view_sub(&mut self, view_id: ViewId) -> Result<()> { + self.delete_col_eq(ST_VIEW_SUB_ID, StViewSubFields::ViewId.col_id(), &view_id.into()) + } + pub fn drop_table(&mut self, table_id: TableId) -> Result<()> { self.clear_table(table_id)?; @@ -718,7 +738,7 @@ impl MutTxId { Ok(ret) } - pub fn view_id_from_name(&self, view_name: &str) -> Result> { + pub fn view_id_from_name(&self, view_name: &str) -> Result> { let view_name = &view_name.into(); let row = self .iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewName, view_name)? @@ -767,31 +787,6 @@ impl MutTxId { Ok((tx, commit)) } - - /// Checks whether a materialized view exists for the given view name, arguments, and sender identity. - /// - /// If view is not materialized, [`RelationalDB::evaluate_view`] should be called to compute and store it. - /// - /// - `view_name`: The name of the view to look up. - /// - `args`: The serialized (bastn-encoded) arguments for the view. - /// - `sender`: The identity of the sender requesting the view. - pub fn is_materialized(&self, view_name: &str, args: Bytes, sender: Identity) -> Result<(bool, Bytes)> { - let (view_id, is_anonymous) = self - .view_from_name(view_name)? - .map(|view_row| (view_row.view_id, view_row.is_anonymous)) - .ok_or_else(|| anyhow::anyhow!("view `{view_name}` not found"))?; - - let view_call = if is_anonymous { - ViewCall::anonymous(view_id, args) - } else { - ViewCall::with_identity(sender, view_id, args) - }; - - let is_materialized = - self.read_sets.contains_key(&view_call) || self.committed_state_write_lock.is_materialized(&view_call); - - Ok((is_materialized, view_call.into_args())) - } } impl MutTxId { @@ -1874,7 +1869,7 @@ impl<'a, I: Iterator>> Iterator for FilterDeleted<'a, I> { impl MutTxId { /// Does this caller have an entry for `view_id` in `st_view_sub`? - pub fn is_view_materialized(&self, view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity) -> Result { + pub fn is_view_materialized(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result { use StViewSubFields::*; let sender = IdentityViaU256(sender); let cols = col_list![ViewId, ArgId, Identity]; @@ -1887,7 +1882,7 @@ impl MutTxId { /// /// This is invoked when calling a view, but not subscribing to it. /// Such is the case for the sql http api. - pub fn update_view_timestamp(&mut self, view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity) -> Result<()> { + pub fn update_view_timestamp(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { use StViewSubFields::*; let identity = IdentityViaU256(sender); @@ -1924,7 +1919,7 @@ impl MutTxId { /// Increment `num_subscribers` in `st_view_sub` to effectively subscribe a caller to a view. /// We insert a row if there are no current subscribers and the row does not exist. - pub fn subscribe_view(&mut self, view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity) -> Result<()> { + pub fn subscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { use StViewSubFields::*; let identity = IdentityViaU256(sender); @@ -1968,7 +1963,7 @@ impl MutTxId { } /// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view. - pub fn unsubscribe_view(&mut self, view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity) -> Result<()> { + pub fn unsubscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { use StViewSubFields::*; let identity = IdentityViaU256(sender); @@ -2076,7 +2071,7 @@ impl MutTxId { } /// Lookup a row in `st_view` by its primary key - fn st_view_row(&self, view_id: ViewDatabaseId) -> Result> { + fn st_view_row(&self, view_id: ViewId) -> Result> { self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())? .next() .map(StViewRow::try_from) @@ -2085,7 +2080,7 @@ impl MutTxId { /// Get the [`TableId`] for this view's backing table by probing `st_view`. /// Note, all views with at least one subscriber are materialized. - pub fn get_table_id_for_view(&self, view_id: ViewDatabaseId) -> Result> { + pub fn get_table_id_for_view(&self, view_id: ViewId) -> Result> { Ok(self .st_view_row(view_id)? .and_then(|row| row.table_id.map(|id| (id, row.is_anonymous)))) diff --git a/crates/datastore/src/locking_tx_datastore/state_view.rs b/crates/datastore/src/locking_tx_datastore/state_view.rs index 6a953684298..41fb7eb3dc6 100644 --- a/crates/datastore/src/locking_tx_datastore/state_view.rs +++ b/crates/datastore/src/locking_tx_datastore/state_view.rs @@ -13,7 +13,7 @@ use core::ops::RangeBounds; use spacetimedb_lib::ConnectionId; use spacetimedb_primitives::{ColList, TableId}; use spacetimedb_sats::AlgebraicValue; -use spacetimedb_schema::schema::{ColumnSchema, TableSchema, ViewInfo}; +use spacetimedb_schema::schema::{ColumnSchema, TableSchema, ViewDefInfo}; use spacetimedb_table::{ blob_store::HashMapBlobStore, table::{IndexScanRangeIter, RowRef, Table, TableScanIter}, @@ -122,7 +122,7 @@ pub trait StateView { .transpose()?; // Look up the view info for the table in question, if any. - let view_info: Option = self + let view_info: Option = self .iter_by_col_eq( ST_VIEW_ID, StViewFields::TableId, @@ -136,7 +136,7 @@ pub trait StateView { .next() .is_some(); - Ok(ViewInfo { + Ok(ViewDefInfo { view_id: row.view_id, has_args, is_anonymous: row.is_anonymous, diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index e6539df8453..f8c921e931b 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -798,7 +798,7 @@ impl From for ProductValue { #[sats(crate = spacetimedb_lib)] pub struct StViewRow { /// An auto-inc id for each view - pub view_id: ViewDatabaseId, + pub view_id: ViewId, /// The name of the view function as defined in the module pub view_name: Box, /// The [`TableId`] for this view if materialized. @@ -907,7 +907,7 @@ impl From for StColumnRow { #[sats(crate = spacetimedb_lib)] pub struct StViewColumnRow { /// A foreign key referencing [`ST_VIEW_NAME`]. - pub view_id: ViewDatabaseId, + pub view_id: ViewId, pub col_pos: ColId, pub col_name: Box, pub col_type: AlgebraicTypeViaBytes, @@ -922,7 +922,7 @@ pub struct StViewColumnRow { #[sats(crate = spacetimedb_lib)] pub struct StViewParamRow { /// A foreign key referencing [`ST_VIEW_NAME`]. - pub view_id: ViewDatabaseId, + pub view_id: ViewId, pub param_pos: ColId, pub param_name: Box, pub param_type: AlgebraicTypeViaBytes, @@ -936,7 +936,7 @@ pub struct StViewParamRow { #[derive(Debug, Clone, Eq, PartialEq, SpacetimeType)] #[sats(crate = spacetimedb_lib)] pub struct StViewSubRow { - pub view_id: ViewDatabaseId, + pub view_id: ViewId, pub arg_id: ArgId, pub identity: IdentityViaU256, pub num_subscribers: u64, diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index f3992c68d24..b04b2444863 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -509,7 +509,7 @@ pub trait MutTxDatastore: TxDatastore + MutTx { fn schema_for_table_mut_tx(&self, tx: &Self::MutTx, table_id: TableId) -> Result>; fn drop_table_mut_tx(&self, tx: &mut Self::MutTx, table_id: TableId) -> Result<()>; fn rename_table_mut_tx(&self, tx: &mut Self::MutTx, table_id: TableId, new_name: &str) -> Result<()>; - fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result>; + fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result>; fn table_id_from_name_mut_tx(&self, tx: &Self::MutTx, table_name: &str) -> Result>; fn table_id_exists_mut_tx(&self, tx: &Self::MutTx, table_id: &TableId) -> bool; fn table_name_from_id_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result>>; diff --git a/crates/expr/src/expr.rs b/crates/expr/src/expr.rs index a4debeab56d..451b9f3498c 100644 --- a/crates/expr/src/expr.rs +++ b/crates/expr/src/expr.rs @@ -1,22 +1,22 @@ use std::{collections::HashSet, sync::Arc}; use spacetimedb_lib::{query::Delta, AlgebraicType, AlgebraicValue}; -use spacetimedb_primitives::{TableId, ViewDatabaseId}; +use spacetimedb_primitives::{TableId, ViewId}; use spacetimedb_schema::schema::TableOrViewSchema; use spacetimedb_sql_parser::ast::{BinOp, LogOp}; pub trait CollectViews { - fn collect_views(&self, views: &mut HashSet); + fn collect_views(&self, views: &mut HashSet); } impl CollectViews for Arc { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { self.as_ref().collect_views(views); } } impl CollectViews for Vec { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { for item in self { item.collect_views(views); } @@ -44,7 +44,7 @@ pub enum ProjectName { } impl CollectViews for ProjectName { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { match self { Self::None(expr) | Self::Some(expr, _) => expr.collect_views(views), } @@ -173,7 +173,7 @@ pub enum AggType { } impl CollectViews for ProjectList { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { match self { Self::Limit(proj, _) => { proj.collect_views(views); @@ -259,7 +259,7 @@ pub struct Relvar { } impl CollectViews for RelExpr { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { self.visit(&mut |expr| { if let Self::RelVar(Relvar { schema, .. }) = expr { if let Some(info) = &schema.view_info { diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index dd4159cd72c..8853543cbfd 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -13,7 +13,7 @@ use spacetimedb_expr::{ StatementSource, }; use spacetimedb_lib::{identity::AuthCtx, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue}; -use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId, ViewDatabaseId}; +use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId, ViewId}; use spacetimedb_schema::schema::{IndexSchema, TableSchema}; use spacetimedb_sql_parser::ast::{BinOp, LogOp}; use spacetimedb_table::table::RowRef; @@ -73,7 +73,7 @@ impl DerefMut for ProjectPlan { } impl CollectViews for ProjectPlan { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { match self { Self::None(plan) | Self::Name(plan, ..) => plan.collect_views(views), } @@ -253,7 +253,7 @@ pub enum PhysicalPlan { } impl CollectViews for PhysicalPlan { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { self.visit(&mut |plan| { let view_info = match plan { Self::TableScan(scan, _) => &scan.schema.view_info, diff --git a/crates/primitives/src/ids.rs b/crates/primitives/src/ids.rs index 062da0446fb..942a3456669 100644 --- a/crates/primitives/src/ids.rs +++ b/crates/primitives/src/ids.rs @@ -80,9 +80,10 @@ auto_inc_system_id!(TableId); system_id! { /// An identifier for a view, unique within a database. - pub struct ViewDatabaseId(pub u32); + /// It is stored in the db as the primary key column of `st_view`. + pub struct ViewId(pub u32); } -auto_inc_system_id!(ViewDatabaseId); +auto_inc_system_id!(ViewId); system_id! { /// An identifier for a list of arguments passed to a view. @@ -138,9 +139,15 @@ system_id! { } system_id! { - /// the index of a view as defined in a module's view list. + /// The index of a view as defined in a module's view lists. + /// + /// Unlike reducers and procedures, the module maintains two lists for views. + /// One for `ViewContext` and the other for `AnonymousViewContext`. + /// As such, this index does not uniquely identify a view on its own. + /// You must know which list this index refers to. + /// // This is never stored in a system table, but is useful to have defined here. - pub struct ViewId(pub u32); + pub struct ViewFnPtr(pub u32); } /// An id for a function exported from a module, which may be a reducer or a procedure. diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index fab5c541187..8a316f44c19 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -9,7 +9,7 @@ pub use attr::{AttributeKind, ColumnAttribute, ConstraintKind, Constraints}; pub use col_list::{ColList, ColOrCols, ColSet}; pub use ids::{ ArgId, ColId, ConstraintId, FunctionId, IndexId, ProcedureId, ReducerId, ScheduleId, SequenceId, TableId, - ViewDatabaseId, ViewId, + ViewFnPtr, ViewId, }; /// The minimum size of a chunk yielded by a wasm abi RowIter. diff --git a/crates/sats/src/convert.rs b/crates/sats/src/convert.rs index b42fba8aa7e..ba38c6b2d74 100644 --- a/crates/sats/src/convert.rs +++ b/crates/sats/src/convert.rs @@ -1,7 +1,7 @@ use crate::sum_value::SumTag; use crate::{i256, u256}; use crate::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; -use spacetimedb_primitives::{ArgId, ColId, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewDatabaseId}; +use spacetimedb_primitives::{ArgId, ColId, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId}; impl crate::Value for AlgebraicValue { type Type = AlgebraicType; @@ -65,7 +65,7 @@ macro_rules! system_id { } system_id!(ArgId); system_id!(TableId); -system_id!(ViewDatabaseId); +system_id!(ViewId); system_id!(ColId); system_id!(SequenceId); system_id!(IndexId); diff --git a/crates/sats/src/de/impls.rs b/crates/sats/src/de/impls.rs index 7649fa525e4..d0c9a92c23d 100644 --- a/crates/sats/src/de/impls.rs +++ b/crates/sats/src/de/impls.rs @@ -743,7 +743,7 @@ impl FieldNameVisitor<'_> for TupleNameVisitor<'_> { impl_deserialize!([] spacetimedb_primitives::ArgId, de => u64::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::TableId, de => u32::deserialize(de).map(Self)); -impl_deserialize!([] spacetimedb_primitives::ViewDatabaseId, de => u32::deserialize(de).map(Self)); +impl_deserialize!([] spacetimedb_primitives::ViewId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::SequenceId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::IndexId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::ConstraintId, de => u32::deserialize(de).map(Self)); diff --git a/crates/sats/src/ser/impls.rs b/crates/sats/src/ser/impls.rs index 6abd9e32a94..9baac393dff 100644 --- a/crates/sats/src/ser/impls.rs +++ b/crates/sats/src/ser/impls.rs @@ -259,7 +259,7 @@ impl_serialize!([] ValueWithType<'_, ArrayValue>, (self, ser) => { impl_serialize!([] spacetimedb_primitives::ArgId, (self, ser) => ser.serialize_u64(self.0)); impl_serialize!([] spacetimedb_primitives::TableId, (self, ser) => ser.serialize_u32(self.0)); -impl_serialize!([] spacetimedb_primitives::ViewDatabaseId, (self, ser) => ser.serialize_u32(self.0)); +impl_serialize!([] spacetimedb_primitives::ViewId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::SequenceId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::IndexId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::ConstraintId, (self, ser) => ser.serialize_u32(self.0)); diff --git a/crates/sats/src/typespace.rs b/crates/sats/src/typespace.rs index 6451cf650a0..f7d0978d660 100644 --- a/crates/sats/src/typespace.rs +++ b/crates/sats/src/typespace.rs @@ -413,7 +413,7 @@ impl_st!([T] Option, ts => AlgebraicType::option(T::make_type(ts))); impl_st!([] spacetimedb_primitives::ArgId, AlgebraicType::U64); impl_st!([] spacetimedb_primitives::ColId, AlgebraicType::U16); impl_st!([] spacetimedb_primitives::TableId, AlgebraicType::U32); -impl_st!([] spacetimedb_primitives::ViewDatabaseId, AlgebraicType::U32); +impl_st!([] spacetimedb_primitives::ViewId, AlgebraicType::U32); impl_st!([] spacetimedb_primitives::IndexId, AlgebraicType::U32); impl_st!([] spacetimedb_primitives::SequenceId, AlgebraicType::U32); impl_st!([] spacetimedb_primitives::ConstraintId, AlgebraicType::U32); diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 9952d187211..df984f67091 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -546,7 +546,7 @@ fn auto_migrate_view<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def ViewDef, // 2. If we change the order of the columns or parameters // 3. If we change the types of the columns or parameters // 4. If we change the context parameter - let Any(incompatible_return_type) = diff(plan.old, plan.new, |def| { + let Any(_incompatible_return_type) = diff(plan.old, plan.new, |def| { def.lookup_expect::(key).return_columns.iter() }) .map(|col_diff| { @@ -575,7 +575,7 @@ fn auto_migrate_view<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def ViewDef, }) .collect(); - let Any(incompatible_param_types) = diff(plan.old, plan.new, |def| { + let Any(_incompatible_param_types) = diff(plan.old, plan.new, |def| { def.lookup_expect::(key).param_columns.iter() }) .map(|col_diff| { @@ -604,15 +604,24 @@ fn auto_migrate_view<'def>(plan: &mut AutoMigratePlan<'def>, old: &'def ViewDef, }) .collect(); - if old.is_anonymous != new.is_anonymous || incompatible_return_type || incompatible_param_types { - plan.steps.push(AutoMigrateStep::AddView(new.key())); - plan.steps.push(AutoMigrateStep::RemoveView(old.key())); - - if !plan.disconnects_all_users() { - plan.steps.push(AutoMigrateStep::DisconnectAllUsers); - } - } else { - plan.steps.push(AutoMigrateStep::UpdateView(old.key())); + // TODO: Uncomment and re-enable view auto-migrations without disconnecting clients + // + // if old.is_anonymous != new.is_anonymous || incompatible_return_type || incompatible_param_types { + // plan.steps.push(AutoMigrateStep::AddView(new.key())); + // plan.steps.push(AutoMigrateStep::RemoveView(old.key())); + + // if !plan.disconnects_all_users() { + // plan.steps.push(AutoMigrateStep::DisconnectAllUsers); + // } + // } else { + // plan.steps.push(AutoMigrateStep::UpdateView(old.key())); + // } + + plan.steps.push(AutoMigrateStep::AddView(new.key())); + plan.steps.push(AutoMigrateStep::RemoveView(old.key())); + + if !plan.disconnects_all_users() { + plan.steps.push(AutoMigrateStep::DisconnectAllUsers); } Ok(()) @@ -1904,18 +1913,31 @@ mod tests { let plan = ponder_auto_migrate(&old_def, &new_def).expect("auto migration should succeed"); let steps = &plan.steps[..]; - assert!(!plan.disconnects_all_users(), "{name}, plan: {plan:#?}"); + // TODO: Assert that we don't disconnect users once we have automatic view update in auto-migrations + // + // assert!(!plan.disconnects_all_users(), "{name}, plan: {plan:#?}"); + + // assert!( + // steps.contains(&AutoMigrateStep::UpdateView(&my_view)), + // "{name}, steps: {steps:?}" + // ); + // assert!( + // !steps.contains(&AutoMigrateStep::AddView(&my_view)), + // "{name}, steps: {steps:?}" + // ); + // assert!( + // !steps.contains(&AutoMigrateStep::RemoveView(&my_view)), + // "{name}, steps: {steps:?}" + // ); + + assert!(plan.disconnects_all_users(), "{name}, plan: {plan:#?}"); assert!( - steps.contains(&AutoMigrateStep::UpdateView(&my_view)), - "{name}, steps: {steps:?}" - ); - assert!( - !steps.contains(&AutoMigrateStep::AddView(&my_view)), + steps.contains(&AutoMigrateStep::AddView(&my_view)), "{name}, steps: {steps:?}" ); assert!( - !steps.contains(&AutoMigrateStep::RemoveView(&my_view)), + steps.contains(&AutoMigrateStep::RemoveView(&my_view)), "{name}, steps: {steps:?}" ); } diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index d04fcd061b2..6efb552c864 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -37,7 +37,7 @@ use spacetimedb_lib::db::raw_def::v9::{ RawUniqueConstraintDataV9, RawViewDefV9, TableAccess, TableType, }; use spacetimedb_lib::{ProductType, RawModuleDef}; -use spacetimedb_primitives::{ColId, ColList, ColOrCols, ColSet, ProcedureId, ReducerId, TableId, ViewId}; +use spacetimedb_primitives::{ColId, ColList, ColOrCols, ColSet, ProcedureId, ReducerId, TableId, ViewFnPtr}; use spacetimedb_sats::{AlgebraicType, AlgebraicValue}; use spacetimedb_sats::{AlgebraicTypeRef, Typespace}; @@ -246,9 +246,9 @@ impl ModuleDef { } /// Convenience method to look up a view, possibly by a string, returning its id as well. - pub fn view_full>(&self, name: &K) -> Option<(ViewId, &ViewDef)> { + pub fn view_full>(&self, name: &K) -> Option<(ViewFnPtr, &ViewDef)> { // If the string IS a valid identifier, we can just look it up. - self.views.get(name).map(|def| (def.index, def)) + self.views.get(name).map(|def| (def.fn_ptr, def)) } /// Convenience method to look up a reducer, possibly by a string. @@ -295,15 +295,6 @@ impl ModuleDef { self.procedures.get_index(id.idx()).map(|(_, def)| def) } - /// Look up a view by its id, panicking if it doesn't exist. - pub fn view_by_id(&self, id: ViewId, is_anonymoys: bool) -> &ViewDef { - self.views - .iter() - .find(|(_, def)| def.index == id && def.is_anonymous == is_anonymoys) - .expect("view id not found") - .1 - } - /// Looks up a lifecycle reducer defined in the module. pub fn lifecycle_reducer(&self, lifecycle: Lifecycle) -> Option<(ReducerId, &ReducerDef)> { self.lifecycle_reducers[lifecycle].map(|i| (i, &self.reducers[i.idx()])) @@ -1118,7 +1109,7 @@ pub struct ViewDef { /// It represents the unique index of this view within the module. /// Module contains separate list for anonymous and non-anonymous views, /// so `is_anonymous` is needed to fully identify the view along with this index. - pub index: ViewId, + pub fn_ptr: ViewFnPtr, /// The parameters of the view. /// @@ -1180,7 +1171,7 @@ impl From for RawViewDefV9 { is_public, params, return_type, - index, + fn_ptr: index, .. } = val; RawViewDefV9 { diff --git a/crates/schema/src/def/validate/v9.rs b/crates/schema/src/def/validate/v9.rs index dde4eee7a6e..43e31b3c1ae 100644 --- a/crates/schema/src/def/validate/v9.rs +++ b/crates/schema/src/def/validate/v9.rs @@ -542,7 +542,7 @@ impl ModuleValidator<'_> { is_anonymous, is_public, params, - index: index.into(), + fn_ptr: index.into(), params_for_generate: ProductTypeDef { elements: params_for_generate, recursive: false, // A `ProductTypeDef` not stored in a `Typespace` cannot be recursive. diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index d958476ab64..51826292454 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -51,13 +51,13 @@ pub trait Schema: Sized { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct ViewInfo { - pub view_id: ViewDatabaseId, +pub struct ViewDefInfo { + pub view_id: ViewId, pub has_args: bool, pub is_anonymous: bool, } -impl ViewInfo { +impl ViewDefInfo { pub fn num_private_cols(&self) -> usize { (if self.is_anonymous { 0 } else { 1 }) + (if self.has_args { 1 } else { 0 }) } @@ -67,7 +67,7 @@ impl ViewInfo { #[derive(Debug, Clone, PartialEq, Eq)] pub struct TableOrViewSchema { pub table_id: TableId, - pub view_info: Option, + pub view_info: Option, pub table_name: Box, pub table_access: StAccess, inner: Arc, @@ -111,22 +111,22 @@ impl TableOrViewSchema { /// Hence columns in this list should be looked up by their [`ColId`] - not their position. pub fn public_columns(&self) -> &[ColumnSchema] { match self.view_info { - Some(ViewInfo { + Some(ViewDefInfo { has_args: true, is_anonymous: false, .. }) => &self.inner.columns[2..], - Some(ViewInfo { + Some(ViewDefInfo { has_args: true, is_anonymous: true, .. }) => &self.inner.columns[1..], - Some(ViewInfo { + Some(ViewDefInfo { has_args: false, is_anonymous: false, .. }) => &self.inner.columns[1..], - Some(ViewInfo { + Some(ViewDefInfo { has_args: false, is_anonymous: true, .. @@ -155,7 +155,7 @@ pub struct TableSchema { pub table_name: Box, /// Is this the backing table of a view? - pub view_info: Option, + pub view_info: Option, /// The columns of the table. /// The ordering of the columns is significant. Columns are frequently identified by `ColId`, that is, position in this list. @@ -202,7 +202,7 @@ impl TableSchema { pub fn new( table_id: TableId, table_name: Box, - view_info: Option, + view_info: Option, columns: Vec, indexes: Vec, constraints: Vec, @@ -740,8 +740,8 @@ impl TableSchema { StAccess::Private }; - let view_info = ViewInfo { - view_id: ViewDatabaseId::SENTINEL, + let view_info = ViewDefInfo { + view_id: ViewId::SENTINEL, has_args: !param_columns.is_empty(), is_anonymous: *is_anonymous, }; @@ -862,8 +862,8 @@ impl TableSchema { StAccess::Private }; - let view_info = ViewInfo { - view_id: ViewDatabaseId::SENTINEL, + let view_info = ViewDefInfo { + view_id: ViewId::SENTINEL, has_args: !param_columns.is_empty(), is_anonymous: *is_anonymous, }; diff --git a/crates/schema/tests/ensure_same_schema.rs b/crates/schema/tests/ensure_same_schema.rs index 25039c0aba5..e6bba12a64f 100644 --- a/crates/schema/tests/ensure_same_schema.rs +++ b/crates/schema/tests/ensure_same_schema.rs @@ -32,7 +32,10 @@ fn assert_identical_modules(module_name_prefix: &str, lang_name: &str, suffix: & diff.retain(|step| { !matches!( step, - AutoMigrateStep::AddView(_) | AutoMigrateStep::RemoveView(_) | AutoMigrateStep::UpdateView(_) + AutoMigrateStep::DisconnectAllUsers + | AutoMigrateStep::AddView(_) + | AutoMigrateStep::RemoveView(_) + | AutoMigrateStep::UpdateView(_) ) }); diff --git a/crates/subscription/src/lib.rs b/crates/subscription/src/lib.rs index c7787b2ad80..3405c9cf10a 100644 --- a/crates/subscription/src/lib.rs +++ b/crates/subscription/src/lib.rs @@ -9,7 +9,7 @@ use spacetimedb_execution::{ use spacetimedb_expr::{check::SchemaView, expr::CollectViews}; use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, query::Delta, AlgebraicValue}; use spacetimedb_physical_plan::plan::{IxJoin, IxScan, Label, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField}; -use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewDatabaseId}; +use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewId}; use spacetimedb_query::compile_subscription; use std::sync::Arc; use std::{collections::HashSet, ops::RangeBounds}; @@ -364,7 +364,7 @@ pub struct SubscriptionPlan { } impl CollectViews for SubscriptionPlan { - fn collect_views(&self, views: &mut HashSet) { + fn collect_views(&self, views: &mut HashSet) { self.plan_opt.collect_views(views); } } diff --git a/crates/table/src/read_column.rs b/crates/table/src/read_column.rs index af07d24caa3..1cef02b2ff0 100644 --- a/crates/table/src/read_column.rs +++ b/crates/table/src/read_column.rs @@ -327,7 +327,7 @@ macro_rules! impl_read_column_via_from { impl_read_column_via_from! { u64 => spacetimedb_primitives::ArgId; u16 => spacetimedb_primitives::ColId; - u32 => spacetimedb_primitives::ViewDatabaseId; + u32 => spacetimedb_primitives::ViewId; u32 => spacetimedb_primitives::TableId; u32 => spacetimedb_primitives::IndexId; u32 => spacetimedb_primitives::ConstraintId;