Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions crates/core/src/db/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ use spacetimedb_commitlog::payload::{
txdata::{Mutations, Ops},
Txdata,
};
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
use spacetimedb_durability::{DurableOffset, TxOffset};
use spacetimedb_lib::Identity;
use spacetimedb_primitives::TableId;
use tokio::{
runtime,
sync::{
Expand Down Expand Up @@ -208,32 +206,23 @@ impl DurabilityWorkerActor {
return;
}

let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) };

let inserts: Box<_> = tx_data
.inserts()
// Skip ephemeral tables
.filter(|(table_id, _)| is_persistent_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
let mut inserts: Box<_> = tx_data
.persistent_inserts()
.map(|(table_id, rowdata)| Ops { table_id, rowdata })
.collect();

let truncates: IntSet<TableId> = tx_data.truncates().collect();

let deletes: Box<_> = tx_data
.deletes()
.filter(|(table_id, _)| is_persistent_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
// filter out deletes for tables that are truncated in the same transaction.
.filter(|ops| !truncates.contains(&ops.table_id))
// What we get from `tx_data` is not necessarily sorted,
// but the durability layer expects by-table_id sorted data.
// Unstable sorts are valid, there will only ever be one entry per table_id.
inserts.sort_unstable_by_key(|ops| ops.table_id);

let mut deletes: Box<_> = tx_data
.persistent_deletes()
.map(|(table_id, rowdata)| Ops { table_id, rowdata })
.collect();
deletes.sort_unstable_by_key(|ops| ops.table_id);

let truncates: Box<_> = truncates.into_iter().filter(is_persistent_table).collect();
let mut truncates: Box<[_]> = tx_data.persistent_truncates().collect();
truncates.sort_unstable_by_key(|table_id| *table_id);

let inputs = reducer_context.map(|rcx| rcx.into());

Expand Down
36 changes: 12 additions & 24 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ use indexmap::IndexSet;
use itertools::Itertools;
use prometheus::{Histogram, IntGauge};
use scopeguard::ScopeGuard;
use smallvec::SmallVec;
use spacetimedb_auth::identity::ConnectionAuthCtx;
use spacetimedb_client_api_messages::energy::FunctionBudget;
use spacetimedb_client_api_messages::websocket::{
ByteListLen, Compression, OneOffTable, QueryUpdate, Subscribe, SubscribeMulti, SubscribeSingle,
};
use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::error::DatastoreError;
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
Expand All @@ -54,11 +54,10 @@ use spacetimedb_expr::expr::CollectViews;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::Timestamp;
use spacetimedb_lib::{AlgebraicType, ConnectionId};
use spacetimedb_lib::{ConnectionId, Timestamp};
use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId};
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::{AlgebraicTypeRef, ProductValue};
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue};
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef};
use spacetimedb_schema::schema::{Schema, TableSchema};
Expand All @@ -73,7 +72,7 @@ use tokio::sync::oneshot;

#[derive(Debug, Default, Clone, From)]
pub struct DatabaseUpdate {
pub tables: Vec<DatabaseTableUpdate>,
pub tables: SmallVec<[DatabaseTableUpdate; 1]>,
}

impl FromIterator<DatabaseTableUpdate> for DatabaseUpdate {
Expand All @@ -93,26 +92,15 @@ impl DatabaseUpdate {
}

pub fn from_writes(tx_data: &TxData) -> Self {
let mut map: IntMap<TableId, DatabaseTableUpdate> = IntMap::new();
let new_update = |table_id, table_name: &str| DatabaseTableUpdate {
let entries = tx_data.iter_table_entries();
let mut tables = SmallVec::with_capacity(entries.len());
tables.extend(entries.map(|(table_id, e)| DatabaseTableUpdate {
table_id,
table_name: table_name.into(),
inserts: [].into(),
deletes: [].into(),
};
for (table_id, table_name, rows) in tx_data.inserts_with_table_name() {
map.entry(*table_id)
.or_insert_with(|| new_update(*table_id, table_name))
.inserts = rows.clone();
}
for (table_id, table_name, rows) in tx_data.deletes_with_table_name() {
map.entry(*table_id)
.or_insert_with(|| new_update(*table_id, table_name))
.deletes = rows.clone();
}
DatabaseUpdate {
tables: map.into_values().collect(),
}
table_name: e.table_name.clone(),
inserts: e.inserts.clone(),
deletes: e.deletes.clone(),
}));
DatabaseUpdate { tables }
}

/// The number of rows in the payload
Expand Down
13 changes: 7 additions & 6 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::subscription::tx::DeltaTx;
use crate::util::slow::SlowQueryLogger;
use crate::vm::{check_row_limit, DbProgram, TxMode};
use anyhow::anyhow;
use smallvec::SmallVec;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_datastore::traits::IsolationLevel;
Expand All @@ -42,7 +43,7 @@ pub struct StmtResult {

pub(crate) fn collect_result(
result: &mut Vec<MemTable>,
updates: &mut Vec<DatabaseTableUpdate>,
updates: &mut SmallVec<[DatabaseTableUpdate; 1]>,
r: CodeResult,
) -> Result<(), DBError> {
match r {
Expand Down Expand Up @@ -74,7 +75,7 @@ fn execute(
p: &mut DbProgram<'_, '_>,
ast: Vec<CrudExpr>,
sql: &str,
updates: &mut Vec<DatabaseTableUpdate>,
updates: &mut SmallVec<[DatabaseTableUpdate; 1]>,
) -> Result<Vec<MemTable>, DBError> {
let slow_query_threshold = if let TxMode::Tx(tx) = p.tx {
p.db.query_limit(tx)?.map(Duration::from_millis)
Expand Down Expand Up @@ -102,7 +103,7 @@ pub fn execute_sql(
subs: Option<&ModuleSubscriptions>,
) -> Result<Vec<MemTable>, DBError> {
if CrudExpr::is_reads(&ast) {
let mut updates = Vec::new();
let mut updates = SmallVec::new();
db.with_read_only(Workload::Sql, |tx| {
execute(
&mut DbProgram::new(db, &mut TxMode::Tx(tx), auth),
Expand All @@ -112,7 +113,7 @@ pub fn execute_sql(
)
})
} else if subs.is_none() {
let mut updates = Vec::new();
let mut updates = SmallVec::new();
db.with_auto_commit(Workload::Sql, |mut_tx| {
execute(
&mut DbProgram::new(db, &mut mut_tx.into(), auth),
Expand All @@ -123,7 +124,7 @@ pub fn execute_sql(
})
} else {
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql);
let mut updates = Vec::with_capacity(ast.len());
let mut updates = SmallVec::with_capacity(ast.len());
let res = execute(
&mut DbProgram::new(db, &mut (&mut tx).into(), auth.clone()),
ast,
Expand Down Expand Up @@ -170,7 +171,7 @@ pub fn execute_sql_tx<'a>(
return Ok(None);
}

let mut updates = Vec::new(); // No subscription updates in this path, because it requires owning the tx.
let mut updates = SmallVec::new(); // No subscription updates in this path, because it requires owning the tx.
execute(&mut DbProgram::new(db, &mut tx, auth), ast, sql, &mut updates).map(Some)
}

Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/subscription/execution_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use super::subscription::{IncrementalJoin, SupportedQuery};
use crate::db::relational_db::{RelationalDB, Tx};
use crate::error::DBError;
use crate::estimation;
use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue};
use crate::host::module_host::DatabaseTableUpdate;
use crate::host::module_host::{DatabaseTableUpdateRelValue, UpdatesRelValue};
use crate::messages::websocket::TableUpdate;
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource};
use crate::util::slow::SlowQueryLogger;
Expand Down
16 changes: 9 additions & 7 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ mod tests {
use crate::vm::tests::create_table_with_rows;
use crate::vm::DbProgram;
use itertools::Itertools;
use smallvec::SmallVec;
use spacetimedb_client_api_messages::websocket::{BsatnFormat, CompressableQueryUpdate, Compression};
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_lib::bsatn;
Expand Down Expand Up @@ -195,7 +196,7 @@ mod tests {
let q = Expr::Crud(Box::new(CrudExpr::Query(query.clone())));

let mut result = Vec::with_capacity(1);
let mut updates = Vec::new();
let mut updates = SmallVec::new();
collect_result(&mut result, &mut updates, run_ast(p, q, sources).into())?;
Ok(result)
}
Expand Down Expand Up @@ -445,12 +446,13 @@ mod tests {
}

let update = DatabaseUpdate {
tables: vec![DatabaseTableUpdate {
tables: [DatabaseTableUpdate {
table_id,
table_name: "test".into(),
deletes: deletes.into(),
inserts: [].into(),
}],
}]
.into(),
};

db.commit_tx(tx)?;
Expand Down Expand Up @@ -535,7 +537,7 @@ mod tests {
};

let update = DatabaseUpdate {
tables: vec![data.clone()],
tables: [data.clone()].into(),
};

check_query_incr(&db, &tx, &s, &update, 1, &[row])?;
Expand Down Expand Up @@ -656,7 +658,7 @@ mod tests {
};

let update = DatabaseUpdate {
tables: vec![data1, data2],
tables: smallvec::smallvec![data1, data2],
};

let row_1 = product!(1u64, "health");
Expand Down Expand Up @@ -1161,9 +1163,9 @@ mod tests {
.collect::<Arc<_>>();

let tables = if inserts.is_empty() && deletes.is_empty() {
vec![]
smallvec::smallvec![]
} else {
vec![DatabaseTableUpdate {
smallvec::smallvec![DatabaseTableUpdate {
table_id,
table_name,
inserts,
Expand Down
34 changes: 10 additions & 24 deletions crates/core/src/subscription/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ impl DeltaTableIndexes {
fn build_indexes_for_rows<'a>(
tx: &'a TxId,
meta: &'a QueriedTableIndexIds,
rows: impl Iterator<Item = (&'a TableId, &'a Arc<[ProductValue]>)>,
rows: impl Iterator<Item = (TableId, &'a Arc<[ProductValue]>)>,
) -> HashMap<(TableId, IndexId), DeltaTableIndex> {
let mut indexes: HashMap<(TableId, IndexId), DeltaTableIndex> = HashMap::new();
for (table_id, rows) in rows {
if let Some(schema) = tx.get_schema(*table_id) {
if let Some(schema) = tx.get_schema(table_id) {
// Fetch the column ids for each index
let mut cols_for_index = vec![];
for index_id in meta.index_ids_for_table(*table_id) {
for index_id in meta.index_ids_for_table(table_id) {
cols_for_index.push((index_id, schema.col_list_for_index_id(index_id)));
}
for (i, row) in rows.iter().enumerate() {
for (index_id, col_list) in &cols_for_index {
if !col_list.is_empty() {
indexes
.entry((*table_id, *index_id))
.entry((table_id, *index_id))
.or_default()
.entry(row.project(col_list).unwrap())
.or_default()
Expand Down Expand Up @@ -161,38 +161,24 @@ impl Datastore for DeltaTx<'_> {
impl DeltaStore for DeltaTx<'_> {
fn num_inserts(&self, table_id: TableId) -> usize {
self.data
.and_then(|data| {
data.inserts()
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.and_then(|data| data.inserts_for_table(table_id).map(|rows| rows.len()))
.unwrap_or_default()
}

fn num_deletes(&self, table_id: TableId) -> usize {
self.data
.and_then(|data| {
data.deletes()
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.and_then(|data| data.deletes_for_table(table_id).map(|rows| rows.len()))
.unwrap_or_default()
}

fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.inserts()
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
self.data
.and_then(|data| data.inserts_for_table(table_id).map(|rows| rows.iter()))
}

fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.deletes()
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
self.data
.and_then(|data| data.deletes_for_table(table_id).map(|rows| rows.iter()))
}

fn index_scan_range_for_delta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ impl CommittedState {
);

// Record any truncated tables in the `TxData`.
tx_data.add_truncates(truncates);
tx_data.set_truncates(truncates);

// Merge read sets from the `MutTxId` into the `CommittedState`.
// It's important that this happens after applying the changes to `tx_data`,
Expand Down
Loading
Loading