Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions crates/core/src/subscription/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::Result;
use hashbrown::HashMap;
use spacetimedb_execution::{Datastore, DeltaStore};
use spacetimedb_execution::{Datastore, DeltaStore, Row};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_primitives::ColList;
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_subscription::SubscriptionPlan;
use spacetimedb_vm::relation::RelValue;

Expand Down Expand Up @@ -30,16 +32,26 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
let mut duplicate_rows_evaluated = 0;
let mut duplicate_rows_sent = 0;

let col_list = ColList::from_iter(plan.num_private_cols()..plan.num_cols());

let maybe_project = |row: Row<'a>| -> Result<RelValue<'a>, InvalidFieldError> {
if plan.is_view() {
Ok(row.project_product(&col_list)?.into())
} else {
Ok(row.into())
}
};

if !plan.is_join() {
// Single table plans will never return redundant rows,
// so there's no need to track row counts.
plan.for_each_insert(tx, metrics, &mut |row| {
inserts.push(row.into());
inserts.push(maybe_project(row)?);
Ok(())
})?;

plan.for_each_delete(tx, metrics, &mut |row| {
deletes.push(row.into());
deletes.push(maybe_project(row)?);
Ok(())
})?;
} else {
Expand All @@ -49,6 +61,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
let mut delete_counts = HashMap::new();

plan.for_each_insert(tx, metrics, &mut |row| {
let row = maybe_project(row)?;
let n = insert_counts.entry(row).or_default();
if *n > 0 {
duplicate_rows_evaluated += 1;
Expand All @@ -58,6 +71,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
})?;

plan.for_each_delete(tx, metrics, &mut |row| {
let row = maybe_project(row)?;
match insert_counts.get_mut(&row) {
// We have not seen an insert for this row.
// If we have seen a delete, increment the metric.
Expand Down Expand Up @@ -93,11 +107,11 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(

for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) {
duplicate_rows_sent += n as u64 - 1;
inserts.extend(std::iter::repeat_n(row, n).map(RelValue::from));
inserts.extend(std::iter::repeat_n(row, n));
}
for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) {
duplicate_rows_sent += n as u64 - 1;
deletes.extend(std::iter::repeat_n(row, n).map(RelValue::from));
deletes.extend(std::iter::repeat_n(row, n));
}
}

Expand Down
10 changes: 9 additions & 1 deletion crates/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use spacetimedb_lib::{
AlgebraicValue, ProductValue,
};
use spacetimedb_physical_plan::plan::{ProjectField, TupleField};
use spacetimedb_primitives::{IndexId, TableId};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_table::{static_assert_size, table::RowRef};

pub mod dml;
Expand Down Expand Up @@ -121,6 +122,13 @@ impl Row<'_> {
Self::Ref(val) => (*val).clone(),
}
}

pub fn project_product(self, cols: &ColList) -> Result<ProductValue, InvalidFieldError> {
match self {
Self::Ptr(ptr) => ptr.project_product(cols),
Self::Ref(val) => val.project_product(cols),
}
}
}

impl_serialize!(['a] Row<'a>, (self, ser) => match self {
Expand Down
23 changes: 23 additions & 0 deletions crates/subscription/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,29 @@ impl SubscriptionPlan {
self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1
}

/// Does this plan return rows from a view?
pub fn is_view(&self) -> bool {
self.plan_opt.returns_view_table()
}

/// The number of columns returned.
/// Only relevant if [`Self::is_view`] is true.
pub fn num_cols(&self) -> usize {
self.plan_opt
.return_table()
.map(|schema| schema.num_cols())
.unwrap_or_default()
}

/// The number of private columns returned.
/// Only relevant if [`Self::is_view`] is true.
pub fn num_private_cols(&self) -> usize {
self.plan_opt
.return_table()
.map(|schema| schema.num_private_cols())
.unwrap_or_default()
}

/// To which table does this plan subscribe?
pub fn subscribed_table_id(&self) -> TableId {
self.return_id
Expand Down
3 changes: 2 additions & 1 deletion crates/vm/src/relation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::hash::{Hash, Hasher};
use derive_more::From;
use spacetimedb_execution::Row;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_sats::bsatn::{ser::BsatnError, ToBsatn};
Expand All @@ -15,7 +16,7 @@ use std::sync::Arc;
/// or an ephemeral row constructed during query execution.
///
/// A `RelValue` is the type generated/consumed by queries.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, From)]
pub enum RelValue<'a> {
/// A reference to a row in a table.
Row(RowRef<'a>),
Expand Down
8 changes: 4 additions & 4 deletions smoketests/tests/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,19 +551,19 @@ class SubscribeViews(Smoketest):
}
"""

def _test_subscribing_with_different_identities(self):
def test_subscribing_with_different_identities(self):
"""Tests different clients subscribing to a client-specific view"""

# Insert an identity for Alice
self.call("insert_player", "Alice")

# Generate and insert a new identity for Bob
# Generate a new identity for Bob
self.reset_config()
self.new_identity()
self.call("insert_player", "Bob")

# Subscribe to `my_player` as Bob
sub = self.subscribe("select * from my_player", n=0)
sub = self.subscribe("select * from my_player", n=1)
self.call("insert_player", "Bob")
events = sub()

# Project out the identity field.
Expand Down
Loading