diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 0ea88bdb4b9..5f925824953 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -6,11 +6,9 @@ use spacetimedb_commitlog::payload::{ txdata::{Mutations, Ops}, Txdata, }; -use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; use spacetimedb_durability::{DurableOffset, TxOffset}; use spacetimedb_lib::Identity; -use spacetimedb_primitives::TableId; use tokio::{ runtime, sync::{ @@ -208,32 +206,23 @@ impl DurabilityWorkerActor { return; } - let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; - - let inserts: Box<_> = tx_data - .inserts() - // Skip ephemeral tables - .filter(|(table_id, _)| is_persistent_table(table_id)) - .map(|(table_id, rowdata)| Ops { - table_id: *table_id, - rowdata: rowdata.clone(), - }) + let mut inserts: Box<_> = tx_data + .persistent_inserts() + .map(|(table_id, rowdata)| Ops { table_id, rowdata }) .collect(); - - let truncates: IntSet = tx_data.truncates().collect(); - - let deletes: Box<_> = tx_data - .deletes() - .filter(|(table_id, _)| is_persistent_table(table_id)) - .map(|(table_id, rowdata)| Ops { - table_id: *table_id, - rowdata: rowdata.clone(), - }) - // filter out deletes for tables that are truncated in the same transaction. - .filter(|ops| !truncates.contains(&ops.table_id)) + // What we get from `tx_data` is not necessarily sorted, + // but the durability layer expects by-table_id sorted data. + // Unstable sorts are valid, there will only ever be one entry per table_id. 
+ inserts.sort_unstable_by_key(|ops| ops.table_id); + + let mut deletes: Box<_> = tx_data + .persistent_deletes() + .map(|(table_id, rowdata)| Ops { table_id, rowdata }) .collect(); + deletes.sort_unstable_by_key(|ops| ops.table_id); - let truncates: Box<_> = truncates.into_iter().filter(is_persistent_table).collect(); + let mut truncates: Box<[_]> = tx_data.persistent_truncates().collect(); + truncates.sort_unstable_by_key(|table_id| *table_id); let inputs = reducer_context.map(|rcx| rcx.into()); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1055292b444..40f425ba13e 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -37,13 +37,13 @@ use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; use scopeguard::ScopeGuard; +use smallvec::SmallVec; use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_client_api_messages::energy::FunctionBudget; use spacetimedb_client_api_messages::websocket::{ ByteListLen, Compression, OneOffTable, QueryUpdate, Subscribe, SubscribeMulti, SubscribeSingle, }; use spacetimedb_data_structures::error_stream::ErrorStream; -use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; @@ -54,11 +54,10 @@ use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; -use spacetimedb_lib::Timestamp; -use spacetimedb_lib::{AlgebraicType, ConnectionId}; +use spacetimedb_lib::{ConnectionId, Timestamp}; use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId}; use spacetimedb_query::compile_subscription; -use spacetimedb_sats::{AlgebraicTypeRef, ProductValue}; +use 
spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue}; use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy}; use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{Schema, TableSchema}; @@ -73,7 +72,7 @@ use tokio::sync::oneshot; #[derive(Debug, Default, Clone, From)] pub struct DatabaseUpdate { - pub tables: Vec, + pub tables: SmallVec<[DatabaseTableUpdate; 1]>, } impl FromIterator for DatabaseUpdate { @@ -93,26 +92,15 @@ impl DatabaseUpdate { } pub fn from_writes(tx_data: &TxData) -> Self { - let mut map: IntMap = IntMap::new(); - let new_update = |table_id, table_name: &str| DatabaseTableUpdate { + let entries = tx_data.iter_table_entries(); + let mut tables = SmallVec::with_capacity(entries.len()); + tables.extend(entries.map(|(table_id, e)| DatabaseTableUpdate { table_id, - table_name: table_name.into(), - inserts: [].into(), - deletes: [].into(), - }; - for (table_id, table_name, rows) in tx_data.inserts_with_table_name() { - map.entry(*table_id) - .or_insert_with(|| new_update(*table_id, table_name)) - .inserts = rows.clone(); - } - for (table_id, table_name, rows) in tx_data.deletes_with_table_name() { - map.entry(*table_id) - .or_insert_with(|| new_update(*table_id, table_name)) - .deletes = rows.clone(); - } - DatabaseUpdate { - tables: map.into_values().collect(), - } + table_name: e.table_name.clone(), + inserts: e.inserts.clone(), + deletes: e.deletes.clone(), + })); + DatabaseUpdate { tables } } /// The number of rows in the payload diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 48982de2066..62d2bc00780 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -17,6 +17,7 @@ use crate::subscription::tx::DeltaTx; use crate::util::slow::SlowQueryLogger; use crate::vm::{check_row_limit, DbProgram, TxMode}; use anyhow::anyhow; +use smallvec::SmallVec; use 
spacetimedb_datastore::execution_context::Workload; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; use spacetimedb_datastore::traits::IsolationLevel; @@ -42,7 +43,7 @@ pub struct StmtResult { pub(crate) fn collect_result( result: &mut Vec, - updates: &mut Vec, + updates: &mut SmallVec<[DatabaseTableUpdate; 1]>, r: CodeResult, ) -> Result<(), DBError> { match r { @@ -74,7 +75,7 @@ fn execute( p: &mut DbProgram<'_, '_>, ast: Vec, sql: &str, - updates: &mut Vec, + updates: &mut SmallVec<[DatabaseTableUpdate; 1]>, ) -> Result, DBError> { let slow_query_threshold = if let TxMode::Tx(tx) = p.tx { p.db.query_limit(tx)?.map(Duration::from_millis) @@ -102,7 +103,7 @@ pub fn execute_sql( subs: Option<&ModuleSubscriptions>, ) -> Result, DBError> { if CrudExpr::is_reads(&ast) { - let mut updates = Vec::new(); + let mut updates = SmallVec::new(); db.with_read_only(Workload::Sql, |tx| { execute( &mut DbProgram::new(db, &mut TxMode::Tx(tx), auth), @@ -112,7 +113,7 @@ pub fn execute_sql( ) }) } else if subs.is_none() { - let mut updates = Vec::new(); + let mut updates = SmallVec::new(); db.with_auto_commit(Workload::Sql, |mut_tx| { execute( &mut DbProgram::new(db, &mut mut_tx.into(), auth), @@ -123,7 +124,7 @@ pub fn execute_sql( }) } else { let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql); - let mut updates = Vec::with_capacity(ast.len()); + let mut updates = SmallVec::with_capacity(ast.len()); let res = execute( &mut DbProgram::new(db, &mut (&mut tx).into(), auth.clone()), ast, @@ -170,7 +171,7 @@ pub fn execute_sql_tx<'a>( return Ok(None); } - let mut updates = Vec::new(); // No subscription updates in this path, because it requires owning the tx. + let mut updates = SmallVec::new(); // No subscription updates in this path, because it requires owning the tx. 
execute(&mut DbProgram::new(db, &mut tx, auth), ast, sql, &mut updates).map(Some) } diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index daf071133f7..0f11f0ff01d 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -3,7 +3,8 @@ use super::subscription::{IncrementalJoin, SupportedQuery}; use crate::db::relational_db::{RelationalDB, Tx}; use crate::error::DBError; use crate::estimation; -use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue}; +use crate::host::module_host::DatabaseTableUpdate; +use crate::host::module_host::{DatabaseTableUpdateRelValue, UpdatesRelValue}; use crate::messages::websocket::TableUpdate; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::util::slow::SlowQueryLogger; diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 968a092e5d2..eef449f3db5 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -162,6 +162,7 @@ mod tests { use crate::vm::tests::create_table_with_rows; use crate::vm::DbProgram; use itertools::Itertools; + use smallvec::SmallVec; use spacetimedb_client_api_messages::websocket::{BsatnFormat, CompressableQueryUpdate, Compression}; use spacetimedb_datastore::execution_context::Workload; use spacetimedb_lib::bsatn; @@ -195,7 +196,7 @@ mod tests { let q = Expr::Crud(Box::new(CrudExpr::Query(query.clone()))); let mut result = Vec::with_capacity(1); - let mut updates = Vec::new(); + let mut updates = SmallVec::new(); collect_result(&mut result, &mut updates, run_ast(p, q, sources).into())?; Ok(result) } @@ -445,12 +446,13 @@ mod tests { } let update = DatabaseUpdate { - tables: vec![DatabaseTableUpdate { + tables: [DatabaseTableUpdate { table_id, table_name: "test".into(), deletes: deletes.into(), inserts: [].into(), - }], + 
}] + .into(), }; db.commit_tx(tx)?; @@ -535,7 +537,7 @@ mod tests { }; let update = DatabaseUpdate { - tables: vec![data.clone()], + tables: [data.clone()].into(), }; check_query_incr(&db, &tx, &s, &update, 1, &[row])?; @@ -656,7 +658,7 @@ mod tests { }; let update = DatabaseUpdate { - tables: vec![data1, data2], + tables: smallvec::smallvec![data1, data2], }; let row_1 = product!(1u64, "health"); @@ -1161,9 +1163,9 @@ mod tests { .collect::>(); let tables = if inserts.is_empty() && deletes.is_empty() { - vec![] + smallvec::smallvec![] } else { - vec![DatabaseTableUpdate { + smallvec::smallvec![DatabaseTableUpdate { table_id, table_name, inserts, diff --git a/crates/core/src/subscription/tx.rs b/crates/core/src/subscription/tx.rs index c8fb779d0e7..25d4e8aa954 100644 --- a/crates/core/src/subscription/tx.rs +++ b/crates/core/src/subscription/tx.rs @@ -46,21 +46,21 @@ impl DeltaTableIndexes { fn build_indexes_for_rows<'a>( tx: &'a TxId, meta: &'a QueriedTableIndexIds, - rows: impl Iterator)>, + rows: impl Iterator)>, ) -> HashMap<(TableId, IndexId), DeltaTableIndex> { let mut indexes: HashMap<(TableId, IndexId), DeltaTableIndex> = HashMap::new(); for (table_id, rows) in rows { - if let Some(schema) = tx.get_schema(*table_id) { + if let Some(schema) = tx.get_schema(table_id) { // Fetch the column ids for each index let mut cols_for_index = vec![]; - for index_id in meta.index_ids_for_table(*table_id) { + for index_id in meta.index_ids_for_table(table_id) { cols_for_index.push((index_id, schema.col_list_for_index_id(index_id))); } for (i, row) in rows.iter().enumerate() { for (index_id, col_list) in &cols_for_index { if !col_list.is_empty() { indexes - .entry((*table_id, *index_id)) + .entry((table_id, *index_id)) .or_default() .entry(row.project(col_list).unwrap()) .or_default() @@ -161,38 +161,24 @@ impl Datastore for DeltaTx<'_> { impl DeltaStore for DeltaTx<'_> { fn num_inserts(&self, table_id: TableId) -> usize { self.data - .and_then(|data| { - data.inserts() - 
.find(|(id, ..)| **id == table_id) - .map(|(.., rows)| rows.len()) - }) + .and_then(|data| data.inserts_for_table(table_id).map(|rows| rows.len())) .unwrap_or_default() } fn num_deletes(&self, table_id: TableId) -> usize { self.data - .and_then(|data| { - data.deletes() - .find(|(id, ..)| **id == table_id) - .map(|(.., rows)| rows.len()) - }) + .and_then(|data| data.deletes_for_table(table_id).map(|rows| rows.len())) .unwrap_or_default() } fn inserts_for_table(&self, table_id: TableId) -> Option> { - self.data.and_then(|data| { - data.inserts() - .find(|(id, ..)| **id == table_id) - .map(|(.., rows)| rows.iter()) - }) + self.data + .and_then(|data| data.inserts_for_table(table_id).map(|rows| rows.iter())) } fn deletes_for_table(&self, table_id: TableId) -> Option> { - self.data.and_then(|data| { - data.deletes() - .find(|(id, ..)| **id == table_id) - .map(|(.., rows)| rows.iter()) - }) + self.data + .and_then(|data| data.deletes_for_table(table_id).map(|rows| rows.iter())) } fn index_scan_range_for_delta( diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index fa83c59e2ea..8c156eb166c 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -1066,7 +1066,7 @@ impl CommittedState { ); // Record any truncated tables in the `TxData`. - tx_data.add_truncates(truncates); + tx_data.set_truncates(truncates); // Merge read sets from the `MutTxId` into the `CommittedState`. 
// It's important that this happens after applying the changes to `tx_data`, diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 87de3ee4f6f..6c41ec483f0 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -809,10 +809,11 @@ impl TxMetrics { // TODO(centril): simplify this by exposing `tx_data.for_table(table_id)`. if let Some(tx_data) = tx_data { - // Update table rows and table size gauges, - // and sets them to zero if no table is present. - for (table_id, table_name) in tx_data.table_ids_and_names() { + for (table_id, table_entry) in tx_data.iter_table_entries() { + let table_name = &table_entry.table_name; + if let Some(stats) = self.table_stats.get(&table_id).unwrap() { + // Update table rows and table size gauges. DB_METRICS .rdb_num_table_rows .with_label_values(db, &table_id.0, table_name) @@ -821,29 +822,15 @@ impl TxMetrics { .rdb_table_size .with_label_values(db, &table_id.0, table_name) .set(stats.bytes_occupied_overestimate as i64); - } else { - // Table was dropped, remove the metrics. - let _ = DB_METRICS - .rdb_num_table_rows - .remove_label_values(db, &table_id.0, table_name); - let _ = DB_METRICS - .rdb_table_size - .remove_label_values(db, &table_id.0, table_name); - } - } - // Record inserts. - for (table_id, table_name, inserts) in tx_data.inserts_with_table_name() { - if let Some(stats) = self.table_stats.get(table_id).unwrap() { - let num_inserts = inserts.len() as u64; + // Record inserts. + let num_inserts = table_entry.inserts.len() as u64; let num_indices = stats.num_indices as u64; - // Increment rows inserted counter. DB_METRICS .rdb_num_rows_inserted .with_label_values(workload, db, reducer, &table_id.0, table_name) .inc_by(num_inserts); - // We don't have sparse indexes, so we can just multiply by the number of indexes. 
if stats.num_indices > 0 { // Increment index rows inserted counter @@ -852,29 +839,9 @@ impl TxMetrics { .with_label_values(workload, db, reducer, &table_id.0, table_name) .inc_by(num_inserts * num_indices); } - } else { - // Table was dropped, remove the metrics. - let _ = DB_METRICS.rdb_num_rows_inserted.remove_label_values( - workload, - db, - reducer, - &table_id.0, - table_name, - ); - let _ = DB_METRICS.rdb_num_index_entries_inserted.remove_label_values( - workload, - db, - reducer, - &table_id.0, - table_name, - ); - } - } - // Record deletes. - for (table_id, table_name, deletes) in tx_data.deletes_with_table_name() { - if let Some(stats) = self.table_stats.get(table_id).unwrap() { - let num_deletes = deletes.len() as u64; + // Record deletes. + let num_deletes = table_entry.deletes.len() as u64; let num_indices = stats.num_indices as u64; // Increment rows deleted counter. @@ -893,6 +860,26 @@ impl TxMetrics { } } else { // Table was dropped, remove the metrics. + let _ = DB_METRICS + .rdb_num_table_rows + .remove_label_values(db, &table_id.0, table_name); + let _ = DB_METRICS + .rdb_table_size + .remove_label_values(db, &table_id.0, table_name); + let _ = DB_METRICS.rdb_num_rows_inserted.remove_label_values( + workload, + db, + reducer, + &table_id.0, + table_name, + ); + let _ = DB_METRICS.rdb_num_index_entries_inserted.remove_label_values( + workload, + db, + reducer, + &table_id.0, + table_name, + ); let _ = DB_METRICS.rdb_num_rows_deleted.remove_label_values( workload, db, @@ -2694,16 +2681,18 @@ mod tests { // In the first tx, the row is not deleted, but it is inserted, so we end up with the row committed. 
assert_eq!(deleted_1, false); - assert_eq!(tx_data_1.deletes().count(), 0); - assert_eq!(tx_data_1.inserts().collect_vec(), [(&table_id, &[row.clone()].into())]); + let entries_1 = tx_data_1.iter_table_entries().collect_vec(); + assert_eq!(entries_1.len(), 1); + assert_eq!(entries_1[0].0, table_id); + assert_eq!(entries_1[0].1.deletes.len(), 0); + assert_eq!(entries_1[0].1.inserts, [row.clone()].into()); // In the second tx, the row is deleted from the commit state, // by marking it in the delete tables. // Then, when inserting, it is un-deleted by un-marking. // This sequence results in an empty tx-data. assert_eq!(deleted_2, true); - assert_eq!(tx_data_2.deletes().count(), 0); - assert_eq!(tx_data_2.inserts().collect_vec(), []); + assert_eq!(tx_data_2.iter_table_entries().count(), 0); Ok(()) } @@ -2761,8 +2750,7 @@ mod tests { // Commit the transaction. // We expect the transaction to be a noop. let tx_data = commit(&datastore, tx)?; - assert_eq!(tx_data.inserts().count(), 0); - assert_eq!(tx_data.deletes().count(), 0); + assert_eq!(tx_data.iter_table_entries().count(), 0); Ok(()) } @@ -3003,14 +2991,19 @@ mod tests { let tx_data_2 = commit(&datastore, tx)?; // Ensure that none of the commits deleted rows in our table. for tx_data in [&tx_data_1, &tx_data_2] { - assert_eq!(tx_data.deletes().find(|(tid, ..)| **tid == table_id), None); + assert_eq!( + tx_data + .iter_table_entries() + .find(|(tid, e)| *tid == table_id && !e.deletes.is_empty()), + None + ); } // Ensure that the first commit added the row but that the second didn't. for (tx_data, expected_rows) in [(&tx_data_1, vec![row.clone()]), (&tx_data_2, vec![])] { let inserted_rows = tx_data - .inserts() - .find(|(tid, _)| **tid == table_id) - .map(|(_, pvs)| pvs.to_vec()) + .iter_table_entries() + .find(|(tid, _)| *tid == table_id) + .map(|(_, e)| e.inserts.to_vec()) .unwrap_or_default(); assert_eq!(inserted_rows, expected_rows); } @@ -3327,12 +3320,12 @@ mod tests { // Now drop the table again and commit. 
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok()); let tx_data = commit(&datastore, tx)?; - let (_, deleted_rows) = tx_data - .deletes() - .find(|(id, _)| **id == table_id) - .expect("should have deleted rows for `table_id`"); - assert_eq!(&**deleted_rows, [row]); - assert!(tx_data.truncates().contains(&table_id), "table should be truncated"); + let (_, entry) = tx_data + .iter_table_entries() + .find(|(id, _)| *id == table_id) + .expect("should have an entry for `table_id`"); + assert_eq!(&*entry.deletes, [row]); + assert!(entry.truncated, "table should be truncated"); // In the next transaction, the table doesn't exist. assert!( @@ -3535,14 +3528,10 @@ mod tests { let tx_data = commit(&datastore, tx)?; // Ensure the change has been persisted in the commitlog. let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap(); - let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap(); - assert_eq!(&**inserts, [to_product(&columns[1])].as_slice()); - let (_, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap(); - assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice()); - assert!( - !tx_data.truncates().contains(&ST_COLUMN_ID), - "table should not be truncated" - ); + let entry = tx_data.entry_for(ST_COLUMN_ID).unwrap(); + assert_eq!(&*entry.inserts, [to_product(&columns[1])].as_slice()); + assert_eq!(&*entry.deletes, [to_product(&columns_original[1])].as_slice()); + assert!(!entry.truncated, "table should not be truncated"); // Check that we can successfully scan using the new schema type post commit. 
let tx = begin_tx(&datastore); @@ -3650,13 +3639,12 @@ mod tests { ); // Validate Commitlog Changes - let (_, deletes) = tx_data - .deletes() - .find(|(id, _)| **id == table_id) + let entry = tx_data + .entry_for(table_id) .expect("Expected delete log for original table"); assert_eq!( - &**deletes, &old_rows, + &*entry.deletes, &old_rows, "Unexpected delete entries after altering the table" ); @@ -3666,13 +3654,12 @@ mod tests { product![8u64, AlgebraicValue::sum(0, 1u16.into()), 42u8], ]; - let (_, inserts) = tx_data - .inserts() - .find(|(id, _)| **id == new_table_id) + let new_entry = tx_data + .entry_for(new_table_id) .expect("Expected insert log for new table"); assert_eq!( - &**inserts, &inserted_rows, + &*new_entry.inserts, &inserted_rows, "Unexpected insert entries after altering the table" ); diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index 6986f8e891a..af99a36ceb8 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -1,6 +1,5 @@ use core::ops::Deref; use std::borrow::Cow; -use std::collections::BTreeMap; use std::{ops::RangeBounds, sync::Arc}; use super::locking_tx_datastore::datastore::TxMetrics; @@ -8,13 +7,15 @@ use super::system_tables::ModuleKind; use super::Result; use crate::execution_context::Workload; use crate::system_tables::ST_TABLE_ID; -use spacetimedb_data_structures::map::{IntMap, IntSet}; +use spacetimedb_data_structures::map::IntSet; +use spacetimedb_data_structures::small_map::SmallHashMap; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{hash_bytes, Identity}; use spacetimedb_primitives::*; use spacetimedb_sats::hash::Hash; use spacetimedb_sats::{AlgebraicValue, ProductType, ProductValue}; use spacetimedb_schema::schema::{IndexSchema, SequenceSchema, TableSchema}; +use spacetimedb_table::static_assert_size; use spacetimedb_table::table::RowRef; /// The `IsolationLevel` enum specifies the degree to which a transaction is @@ -168,36 +169,73 @@ pub enum 
IsolationLevel { pub type EphemeralTables = IntSet; +type TableName = Box; +type TableNameRef<'a> = &'a str; + +/// The [`TxData`] entry for one table. +/// +/// All information about a table is stored in one place +/// as the access pattern is to write as fast as possible +/// and because it all fits within a single cache line +/// and so that we can use `SmallVec`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TxDataTableEntry { + /// The name of the table for which there were deletions and/or insertions. + pub table_name: TableName, + + /// The set of inserts for this table in the transaction. + // Note: `Arc<[ProductValue]>` allows to cheaply + // use the values from `TxData` without cloning the + // contained `ProductValue`s. + pub inserts: Arc<[ProductValue]>, + + /// The set of deletes for this table in the transaction. + pub deletes: Arc<[ProductValue]>, + + /// Was the entire table truncated? + /// + /// *Truncating* means that all rows in the table have been deleted. + /// In other words, a truncated table is a cleared table. + /// + /// Note that when this is `true`, + /// `deletes` will be non-empty and contain all deleted rows. + pub truncated: bool, + + /// Is this table ephemeral? + /// + /// An ephemeral table is only populated when a view is executed + /// and does not need to be persisted to disk. + pub ephemeral: bool, +} + +static_assert_size!(TxDataTableEntry, 56); + +impl TxDataTableEntry { + /// Create a new, empty `TxDataTableEntry` for `table_name`. + pub fn new(table_name: TableName) -> Self { + Self { + table_name, + inserts: [].into(), + deletes: [].into(), + truncated: false, + ephemeral: false, + } + } +} + /// A record of all the operations within a transaction. /// /// Some extra information is embedded here /// so that the recording of execution metrics can be done without holding the tx lock. #[derive(Default)] pub struct TxData { - /// The inserted rows per table. - inserts: BTreeMap>, - /// The deleted rows per table. 
- deletes: BTreeMap>, - /// *Truncating* means that all rows in the table have been deleted. - /// In other words, a truncated table is a cleared table. - /// - /// Note that when a table has an entry in `truncates`, - /// it will also have an entry in `deletes`. - truncates: IntSet, - /// Map of all `TableId`s in both `inserts` and `deletes` to their - /// corresponding table name. - // TODO: Store table name as ref counted string. - tables: IntMap, + entries: SmallHashMap, + /// Tx offset of the transaction which performed these operations. /// /// `None` implies that `inserts` and `deletes` are both empty, /// but `Some` does not necessarily imply that either is non-empty. tx_offset: Option, - - /// Set of ephemeral tables modified in this transaction (only populated when a view is executed). - /// These tables do not need to be persisted to disk. - /// Every table listed here must appear in either `inserts` or `deletes`. - ephemeral_tables: Option, } impl TxData { @@ -215,103 +253,107 @@ impl TxData { self.tx_offset } + /// Ensures that an entry for `table_id` exists + /// or initializes it with `table_name`. + fn init_entry(&mut self, table_id: TableId, table_name: TableNameRef<'_>) -> &mut TxDataTableEntry { + self.entries + .get_or_insert(table_id, || TxDataTableEntry::new(table_name.into())) + } + /// Set `rows` as the inserted rows for `(table_id, table_name)`. - pub fn set_inserts_for_table(&mut self, table_id: TableId, table_name: &str, rows: Arc<[ProductValue]>) { - self.inserts.insert(table_id, rows); - self.tables.entry(table_id).or_insert_with(|| table_name.to_owned()); + pub fn set_inserts_for_table( + &mut self, + table_id: TableId, + table_name: TableNameRef<'_>, + rows: Arc<[ProductValue]>, + ) { + self.init_entry(table_id, table_name).inserts = rows; } /// Set `rows` as the deleted rows for `(table_id, table_name)`. /// /// When `truncated` is set, the table has been emptied in this transaction. 
- pub fn set_deletes_for_table(&mut self, table_id: TableId, table_name: &str, rows: Arc<[ProductValue]>) { - self.deletes.insert(table_id, rows); - self.tables.entry(table_id).or_insert_with(|| table_name.to_owned()); + pub fn set_deletes_for_table( + &mut self, + table_id: TableId, + table_name: TableNameRef<'_>, + rows: Arc<[ProductValue]>, + ) { + self.init_entry(table_id, table_name).deletes = rows; } - pub fn add_truncates(&mut self, truncated_tables: impl IntoIterator) { - self.truncates.extend(truncated_tables); + /// Mark the given `truncated_tables` as truncated in this transaction. + pub fn set_truncates(&mut self, truncated_tables: impl IntoIterator) { + for table_id in truncated_tables { + if let Some(entry) = self.entries.get_mut(&table_id) { + entry.truncated = true; + } + } } /// Determines which ephemeral tables were modified in this transaction. /// /// Iterates over the tables updated in this transaction and records those that /// also appear in `all_ephemeral_tables`. - /// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified. pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) { - for tid in self.tables.keys() { - if all_ephemeral_tables.contains(tid) { - self.ephemeral_tables - .get_or_insert_with(EphemeralTables::default) - .insert(*tid); + for tid in all_ephemeral_tables { + if let Some(entry) = self.entries.get_mut(tid) { + entry.ephemeral = true; } } } - pub fn ephemeral_tables(&self) -> Option<&EphemeralTables> { - self.ephemeral_tables.as_ref() + /// Returns an iterator over the persistent inserted rows per table. + pub fn persistent_inserts(&self) -> impl Iterator)> + '_ { + self.entries + .iter() + .filter(|(_, entry)| !entry.ephemeral && !entry.inserts.is_empty()) + .map(|(table_id, entry)| (*table_id, entry.inserts.clone())) } - /// Check if `table_id` is in the set of ephemeral tables for this transaction. 
- /// - /// Beware that ephemeral tables are known only after [Self::set_ephemeral_tables] - /// has been called. - pub fn is_ephemeral_table(&self, table_id: &TableId) -> bool { - self.ephemeral_tables - .as_ref() - .is_some_and(|etables| etables.contains(table_id)) - } - - /// Obtain an iterator over the inserted rows per table. - pub fn inserts(&self) -> impl Iterator)> + '_ { - self.inserts.iter() + /// Returns an iterator over the inserted rows per table. + pub fn inserts(&self) -> impl Iterator)> + '_ { + self.entries + .iter() + .filter(|(_, entry)| !entry.inserts.is_empty()) + .map(|(table_id, entry)| (*table_id, &entry.inserts)) } /// Get the `i`th inserted row for `table_id` if it exists pub fn get_ith_insert(&self, table_id: TableId, i: usize) -> Option<&ProductValue> { - self.inserts.get(&table_id).and_then(|rows| rows.get(i)) + self.inserts_for_table(table_id).and_then(|is| is.get(i)) } - /// Obtain an iterator over the inserted rows per table. + /// Returns an iterator over the persistent deleted rows per table. /// - /// If you don't need access to the table name, [`Self::inserts`] is - /// slightly more efficient. - pub fn inserts_with_table_name(&self) -> impl Iterator)> + '_ { - self.inserts.iter().map(|(table_id, rows)| { - let table_name = self - .tables - .get(table_id) - .expect("invalid `TxData`: partial table name mapping"); - (table_id, table_name.as_str(), rows) - }) + /// Truncated tables are not included. + pub fn persistent_deletes(&self) -> impl Iterator)> + '_ { + self.entries + .iter() + .filter(|(_, entry)| !entry.ephemeral && !entry.truncated && !entry.deletes.is_empty()) + .map(|(table_id, entry)| (*table_id, entry.deletes.clone())) } - /// Obtain an iterator over the deleted rows per table. - pub fn deletes(&self) -> impl Iterator)> + '_ { - self.deletes.iter() + /// Returns an iterator over the deleted rows per table. 
+ pub fn deletes(&self) -> impl Iterator)> + '_ { + self.entries + .iter() + .filter(|(_, entry)| !entry.deletes.is_empty()) + .map(|(table_id, entry)| (*table_id, &entry.deletes)) } /// Get the `i`th deleted row for `table_id` if it exists pub fn get_ith_delete(&self, table_id: TableId, i: usize) -> Option<&ProductValue> { - self.deletes.get(&table_id).and_then(|rows| rows.get(i)) - } - - /// Obtain an iterator over the inserted rows per table. - /// - /// If you don't need access to the table name, [`Self::deletes`] is - /// slightly more efficient. - pub fn deletes_with_table_name(&self) -> impl Iterator)> + '_ { - self.deletes.iter().map(|(table_id, rows)| { - let table_name = self - .tables - .get(table_id) - .expect("invalid `TxData`: partial table name mapping"); - (table_id, table_name.as_str(), rows) - }) + self.deletes_for_table(table_id).and_then(|ds| ds.get(i)) } - pub fn truncates(&self) -> impl Iterator + '_ { - self.truncates.iter().copied() + /// Returns an iterator over all the non-ephemeral tables + /// that were truncated in this transaction. + pub fn persistent_truncates(&self) -> impl Iterator + '_ { + self.entries + .iter() + .filter(|(_, entry)| !entry.ephemeral && entry.truncated) + .map(|(table_id, _)| *table_id) } /// Check if this [`TxData`] contains any `inserted | deleted` rows @@ -323,11 +365,12 @@ impl TxData { /// /// This is used to determine if a transaction should be written to disk. pub fn has_rows_or_connect_disconnect(&self, reducer_name: Option<&str>) -> bool { - let is_non_ephemeral_mutation = - |(table_id, rows): (_, &Arc<[_]>)| !(self.is_ephemeral_table(table_id) || rows.is_empty()); - - self.inserts().any(is_non_ephemeral_mutation) - || self.deletes().any(is_non_ephemeral_mutation) + // Persist if the table is non-ephemeral and had any inserts or deletes. + self.entries.values().any(|e| + !e.ephemeral + && (!e.inserts.is_empty() || !e.deletes.is_empty()) + ) + // Also persist for connect/disconnect reducers. 
|| matches!( reducer_name.map(|rn| rn.strip_prefix("__identity_")), Some(Some("connected__" | "disconnected__")) ) } /// Returns a list of tables affected in this transaction. pub fn table_ids_and_names(&self) -> impl '_ + Iterator { - self.tables.iter().map(|(k, v)| (*k, &**v)) + self.entries.iter().map(|(k, e)| (*k, &*e.table_name)) } /// Returns the number o tables affected in this transaction. pub fn num_tables_affected(&self) -> usize { - self.tables.len() + self.entries.len() + } + + /// Returns the entry for `table_id`. + pub fn entry_for(&self, table_id: TableId) -> Option<&TxDataTableEntry> { + self.entries.get(&table_id) + } + + /// Returns the inserts for `table_id`. + pub fn inserts_for_table(&self, table_id: TableId) -> Option<&[ProductValue]> { + self.entry_for(table_id).map(|entry| &*entry.inserts) + } + + /// Returns the deletes for `table_id`. + pub fn deletes_for_table(&self, table_id: TableId) -> Option<&[ProductValue]> { + self.entry_for(table_id).map(|entry| &*entry.deletes) + } + + /// Returns an iterator over all [`TxDataTableEntry`]s in this transaction. + pub fn iter_table_entries(&self) -> impl '_ + ExactSizeIterator { + self.entries.iter().map(|(k, v)| (*k, v)) + } }