Skip to content
2 changes: 1 addition & 1 deletion crates/client-api-messages/src/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl fmt::Debug for EnergyBalance {
/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling
/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime:
/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`)
#[derive(Copy, Clone, From, Add, Sub)]
#[derive(Copy, Clone, From, Add, Sub, AddAssign, SubAssign)]
pub struct FunctionBudget(u64);

impl FunctionBudget {
Expand Down
228 changes: 102 additions & 126 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::db::MetricsRecorderQueue;
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
use crate::host::ArgsTuple;
use crate::messages::control_db::HostType;
use crate::subscription::ExecutionCounters;
use crate::util::{asyncify, spawn_rayon};
Expand All @@ -9,7 +8,6 @@ use anyhow::{anyhow, Context};
use bytes::Bytes;
use enum_map::EnumMap;
use fs2::FileExt;
use log::trace;
use spacetimedb_commitlog::repo::OnNewSegmentFn;
use spacetimedb_commitlog::{self as commitlog, SizeOnDisk};
use spacetimedb_data_structures::map::IntSet;
Expand Down Expand Up @@ -46,7 +44,9 @@ use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
use spacetimedb_primitives::*;
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace};
use spacetimedb_sats::{
AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace, WithTypespace,
};
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
use spacetimedb_schema::schema::{
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
Expand Down Expand Up @@ -1078,11 +1078,11 @@ impl RelationalDB {
tx: &mut MutTx,
module_def: &ModuleDef,
view_def: &ViewDef,
) -> Result<(ViewDatabaseId, TableId), DBError> {
) -> Result<(ViewId, TableId), DBError> {
Ok(tx.create_view(module_def, view_def)?)
}

pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewDatabaseId) -> Result<(), DBError> {
pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewId) -> Result<(), DBError> {
Ok(tx.drop_view(view_id)?)
}

Expand Down Expand Up @@ -1173,7 +1173,7 @@ impl RelationalDB {
Ok(self.inner.rename_table_mut_tx(tx, table_id, new_name)?)
}

pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result<Option<ViewDatabaseId>, DBError> {
pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result<Option<ViewId>, DBError> {
Ok(self.inner.view_id_from_name_mut_tx(tx, view_name)?)
}

Expand Down Expand Up @@ -1509,93 +1509,122 @@ impl RelationalDB {
})
}

/// Materialize View backing table.
/// Write `bytes` into a (sender) view's backing table.
///
/// # Process
/// 1. Serializes view arguments into `ST_VIEW_ARG_ID`
/// 2. Deletes stale rows matching the view arguments
/// 3. Deserializes the new view execution results
/// 4. Inserts fresh rows with the corresponding arg_id
/// 1. Delete all rows for `sender` from the view's backing table
/// 2. Deserialize `bytes`
/// 3. Insert the new rows into the backing table
///
/// # Arguments
/// * `tx` - Mutable transaction context
/// * `view` - Name of the view to update
/// * `args` - Arguments passed to the view call
/// * `return_type` - Expected return type of the view
/// * `bytes` - Serialized (bsatn encoded) return value from view execution
/// * `table_id` - The id of the view's backing table
/// * `sender` - The calling identity of the view being updated
/// * `row_type` - Expected return type of the view
/// * `bytes` - An array of product values (bsatn encoded)
/// * `typespace` - Type information for deserialization
/// * `caller_identity` - Identity of the caller (for non-anonymous views)
#[allow(clippy::too_many_arguments)]
pub fn materialize_view(
&self,
tx: &mut MutTxId,
view: &str,
args: ArgsTuple,
return_type: AlgebraicTypeRef,
table_id: TableId,
sender: Identity,
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
caller_identity: Identity,
) -> Result<(), DBError> {
// Fetch view metadata
let st_view_row = tx.lookup_st_view_by_name(view)?;
let table_id = st_view_row.table_id.expect("View table must exist for materialization");
let view_id = st_view_row.view_id;
let is_anonymous = st_view_row.is_anonymous;
let arg_id = tx.get_or_insert_st_view_arg(args.get_bsatn())?;

// Build the filter key for identifying rows to update
let mut input_args = Vec::new();
if !is_anonymous {
input_args.push(AlgebraicValue::OptionSome(caller_identity.into()));
}
if !tx.is_view_parameterized(view_id)? {
input_args.push(AlgebraicValue::U64(arg_id));
}
let input_args = ProductValue {
elements: input_args.into_boxed_slice(),
};
let col_list: ColList = (0..input_args.elements.len()).collect();

// Remove stale View entries
let rows_to_delete: Vec<_> = self
.iter_by_col_eq_mut(tx, table_id, col_list, &input_args.clone().into())?
// Delete rows for `sender` from the backing table
let rows_to_delete = self
.iter_by_col_eq_mut(tx, table_id, ColId(0), &sender.into())?
.map(|res| res.pointer())
.collect();

let deleted_count = self.delete(tx, table_id, rows_to_delete);
trace!("Deleted {deleted_count} stale rows from view table {table_id} for arg_id {arg_id}");

// Deserialize the return value
let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type).resolve(return_type);
let return_val = seed
.collect::<Vec<_>>();
self.delete(tx, table_id, rows_to_delete);

// Deserialize the return rows.
// The return type is expected to be an array of products.
let row_type = typespace.resolve(row_type);
let ret_type = AlgebraicType::array(row_type.ty().clone());
let seed = WithTypespace::new(typespace, &ret_type);
let rows = seed
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;

// Extract products from return value (must be array)
let products: Vec<ProductValue> = return_val
// Insert new rows into the backing table
for product in rows
.into_array()
.expect("Expected return_val to be an array")
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.into_iter()
.map(|v| v.into_product().expect("Expected array elements to be ProductValue"))
.collect();

// Insert fresh results into the view table
let mut elements: Vec<AlgebraicValue> =
Vec::with_capacity(input_args.elements.len() + products.first().map_or(0, |p| p.elements.len()));
for product in products {
elements.clear();
elements.extend_from_slice(&input_args.elements);
elements.extend_from_slice(&product.elements);
{
let product = product
.into_product()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?;
self.insert(
tx,
table_id,
&ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements))
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}

let row = ProductValue {
elements: elements.as_slice().into(),
};
Ok(())
}

let row_bytes = row
.to_bsatn_vec()
.map_err(|_| DatastoreError::from(ViewError::SerializeRow))?;
/// Write `bytes` into an anonymous view's backing table.
///
/// # Process
/// 1. Clear the view's backing table
/// 2. Deserialize `bytes`
/// 3. Insert the new rows into the backing table
///
/// # Arguments
/// * `tx` - Mutable transaction context
/// * `table_id` - The id of the view's backing table
/// * `row_type` - Expected return type of the view
/// * `bytes` - An array of product values (bsatn encoded)
/// * `typespace` - Type information for deserialization
#[allow(clippy::too_many_arguments)]
pub fn materialize_anonymous_view(
&self,
tx: &mut MutTxId,
table_id: TableId,
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
) -> Result<(), DBError> {
// Clear entire backing table
self.clear_table(tx, table_id)?;

// Deserialize the return rows.
// The return type is expected to be an array of products.
let row_type = typespace.resolve(row_type);
let ret_type = AlgebraicType::array(row_type.ty().clone());
let seed = WithTypespace::new(typespace, &ret_type);
let rows = seed
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;

self.insert(tx, table_id, &row_bytes)?;
// Insert new rows into the backing table
for product in rows
.into_array()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.into_iter()
{
self.insert(
tx,
table_id,
&product
.into_product()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}

Ok(())
Expand Down Expand Up @@ -2195,7 +2224,7 @@ pub mod tests_utils {
name: &str,
schema: &[(&str, AlgebraicType)],
is_anonymous: bool,
) -> Result<(ViewDatabaseId, TableId), DBError> {
) -> Result<(ViewId, TableId), DBError> {
let mut builder = RawModuleDefV9Builder::new();

// Add the view's product type to the typespace
Expand Down Expand Up @@ -2314,18 +2343,16 @@ mod tests {
use super::tests_utils::begin_mut_tx;
use super::*;
use crate::db::relational_db::tests_utils::{
begin_tx, create_view_for_test, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
};
use anyhow::bail;
use bytes::Bytes;
use commitlog::payload::txdata;
use commitlog::Commitlog;
use durability::EmptyHistory;
use pretty_assertions::{assert_eq, assert_matches};
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_datastore::error::{DatastoreError, IndexError};
use spacetimedb_datastore::execution_context::ReducerContext;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, ViewCall};
use spacetimedb_datastore::system_tables::{
system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_SEQUENCE_ID, ST_TABLE_ID,
Expand Down Expand Up @@ -2958,57 +2985,6 @@ mod tests {
Ok(())
}

#[test]
fn test_is_materialized() -> anyhow::Result<()> {
let stdb = TestDB::in_memory()?;
let schema = [("col1", AlgebraicType::I64), ("col2", AlgebraicType::I64)];
let table_schema = table("MyTable", ProductType::from(schema), |b| b);

let view_schema = [("view_col", AlgebraicType::I64)];
let view_name = "MyView";
let args: Bytes = vec![].into();
let sender = Identity::ZERO;
let (view_id, _) = create_view_for_test(&stdb, view_name, &view_schema, true)?;

let mut tx = begin_mut_tx(&stdb);
let table_id = stdb.create_table(&mut tx, table_schema)?;

assert!(
!tx.is_materialized(view_name, args.clone(), sender)?.0,
"view should not be materialized as read set is not recorded yet"
);

let view_call = FuncCallType::View(ViewCall::anonymous(view_id, args));
tx.record_table_scan(&view_call, table_id);
assert!(
tx.is_materialized(view_name, vec![].into(), sender)?.0,
"view should be materialized as read set is recorded"
);
stdb.commit_tx(tx)?;

let tx = begin_mut_tx(&stdb);
assert!(
tx.is_materialized(view_name, vec![].into(), sender)?.0,
"view should be materialized after commit"
);
stdb.commit_tx(tx)?;

let mut tx = begin_mut_tx(&stdb);
stdb.insert(
&mut tx,
table_id,
&product![AlgebraicValue::I64(1), AlgebraicValue::I64(2)].to_bsatn_vec()?,
)?;
stdb.commit_tx(tx)?;

let tx = begin_mut_tx(&stdb);
assert!(
!tx.is_materialized(view_name, vec![].into(), sender)?.0,
"view should not be materialized after table modification"
);
Ok(())
}

#[test]
/// Test that iteration yields each row only once
/// in the edge case where a row is committed and has been deleted and re-inserted within the iterating TX.
Expand Down
16 changes: 0 additions & 16 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,6 @@ impl From<&EventStatus> for ReducerOutcome {
}
}

pub enum ViewOutcome {
Success,
Failed(String),
BudgetExceeded,
}

impl From<EventStatus> for ViewOutcome {
fn from(status: EventStatus) -> Self {
match status {
EventStatus::Committed(_) => ViewOutcome::Success,
EventStatus::Failed(e) => ViewOutcome::Failed(e),
EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded,
}
}
}

#[derive(Clone, Debug)]
pub struct ProcedureCallResult {
pub return_val: AlgebraicValue,
Expand Down
Loading
Loading