diff --git a/crates/core/src/subscription/delta.rs b/crates/core/src/subscription/delta.rs index 2fc484c4c65..744e4986f11 100644 --- a/crates/core/src/subscription/delta.rs +++ b/crates/core/src/subscription/delta.rs @@ -1,7 +1,9 @@ use anyhow::Result; use hashbrown::HashMap; -use spacetimedb_execution::{Datastore, DeltaStore}; +use spacetimedb_execution::{Datastore, DeltaStore, Row}; use spacetimedb_lib::metrics::ExecutionMetrics; +use spacetimedb_primitives::ColList; +use spacetimedb_sats::product_value::InvalidFieldError; use spacetimedb_subscription::SubscriptionPlan; use spacetimedb_vm::relation::RelValue; @@ -30,16 +32,26 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( let mut duplicate_rows_evaluated = 0; let mut duplicate_rows_sent = 0; + let col_list = ColList::from_iter(plan.num_private_cols()..plan.num_cols()); + + let maybe_project = |row: Row<'a>| -> Result, InvalidFieldError> { + if plan.is_view() { + Ok(row.project_product(&col_list)?.into()) + } else { + Ok(row.into()) + } + }; + if !plan.is_join() { // Single table plans will never return redundant rows, // so there's no need to track row counts. plan.for_each_insert(tx, metrics, &mut |row| { - inserts.push(row.into()); + inserts.push(maybe_project(row)?); Ok(()) })?; plan.for_each_delete(tx, metrics, &mut |row| { - deletes.push(row.into()); + deletes.push(maybe_project(row)?); Ok(()) })?; } else { @@ -49,6 +61,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( let mut delete_counts = HashMap::new(); plan.for_each_insert(tx, metrics, &mut |row| { + let row = maybe_project(row)?; let n = insert_counts.entry(row).or_default(); if *n > 0 { duplicate_rows_evaluated += 1; @@ -58,6 +71,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( })?; plan.for_each_delete(tx, metrics, &mut |row| { + let row = maybe_project(row)?; match insert_counts.get_mut(&row) { // We have not seen an insert for this row. // If we have seen a delete, increment the metric. @@ -93,11 +107,11 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) { duplicate_rows_sent += n as u64 - 1; - inserts.extend(std::iter::repeat_n(row, n).map(RelValue::from)); + inserts.extend(std::iter::repeat_n(row, n)); } for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) { duplicate_rows_sent += n as u64 - 1; - deletes.extend(std::iter::repeat_n(row, n).map(RelValue::from)); + deletes.extend(std::iter::repeat_n(row, n)); } } diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index b20563f3c4c..79d782272b9 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -11,7 +11,8 @@ use spacetimedb_lib::{ AlgebraicValue, ProductValue, }; use spacetimedb_physical_plan::plan::{ProjectField, TupleField}; -use spacetimedb_primitives::{IndexId, TableId}; +use spacetimedb_primitives::{ColList, IndexId, TableId}; +use spacetimedb_sats::product_value::InvalidFieldError; use spacetimedb_table::{static_assert_size, table::RowRef}; pub mod dml; @@ -121,6 +122,13 @@ impl Row<'_> { Self::Ref(val) => (*val).clone(), } } + + pub fn project_product(self, cols: &ColList) -> Result { + match self { + Self::Ptr(ptr) => ptr.project_product(cols), + Self::Ref(val) => val.project_product(cols), + } + } } impl_serialize!(['a] Row<'a>, (self, ser) => match self { diff --git a/crates/subscription/src/lib.rs b/crates/subscription/src/lib.rs index 3405c9cf10a..d0f4668459b 100644 --- a/crates/subscription/src/lib.rs +++ b/crates/subscription/src/lib.rs @@ -375,6 +375,29 @@ impl SubscriptionPlan { self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1 } + /// Does this plan return rows from a view? + pub fn is_view(&self) -> bool { + self.plan_opt.returns_view_table() + } + + /// The number of columns returned. + /// Only relevant if [`Self::is_view`] is true. + pub fn num_cols(&self) -> usize { + self.plan_opt + .return_table() + .map(|schema| schema.num_cols()) + .unwrap_or_default() + } + + /// The number of private columns returned. + /// Only relevant if [`Self::is_view`] is true. + pub fn num_private_cols(&self) -> usize { + self.plan_opt + .return_table() + .map(|schema| schema.num_private_cols()) + .unwrap_or_default() + } + /// To which table does this plan subscribe? pub fn subscribed_table_id(&self) -> TableId { self.return_id diff --git a/crates/vm/src/relation.rs b/crates/vm/src/relation.rs index 1c96413e986..2d625369f87 100644 --- a/crates/vm/src/relation.rs +++ b/crates/vm/src/relation.rs @@ -1,4 +1,5 @@ use core::hash::{Hash, Hasher}; +use derive_more::From; use spacetimedb_execution::Row; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_sats::bsatn::{ser::BsatnError, ToBsatn}; @@ -15,7 +16,7 @@ use std::sync::Arc; /// or an ephemeral row constructed during query execution. /// /// A `RelValue` is the type generated/consumed by queries. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, From)] pub enum RelValue<'a> { /// A reference to a row in a table. Row(RowRef<'a>), diff --git a/smoketests/tests/views.py b/smoketests/tests/views.py index 2b5a9de2e34..483f8bab4b3 100644 --- a/smoketests/tests/views.py +++ b/smoketests/tests/views.py @@ -551,19 +551,19 @@ class SubscribeViews(Smoketest): } """ - def _test_subscribing_with_different_identities(self): + def test_subscribing_with_different_identities(self): """Tests different clients subscribing to a client-specific view""" # Insert an identity for Alice self.call("insert_player", "Alice") - # Generate and insert a new identity for Bob + # Generate a new identity for Bob self.reset_config() self.new_identity() - self.call("insert_player", "Bob") # Subscribe to `my_player` as Bob - sub = self.subscribe("select * from my_player", n=0) + sub = self.subscribe("select * from my_player", n=1) + self.call("insert_player", "Bob") events = sub() # Project out the identity field.