diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs
index 69158ece7a6..a7828c4b1dc 100644
--- a/crates/datastore/src/locking_tx_datastore/committed_state.rs
+++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs
@@ -91,6 +91,22 @@ pub struct CommittedState {
     /// - system tables: `st_view_sub`, `st_view_arg`
     /// - Tables which back views.
     pub(super) ephemeral_tables: EphemeralTables,
+
+    /// Rows within `st_column` which should be ignored during replay
+    /// due to having been superseded by a new row representing the same column.
+    ///
+    /// During replay, we visit all of the inserts table-by-table, followed by all of the deletes table-by-table.
+    /// This means that, when multiple columns of a table change type within the same transaction,
+    /// we see all of the newly-inserted `st_column` rows first, and then later, all of the deleted rows.
+    /// We may even see inserts into the altered table before seeing the `st_column` deletes!
+    ///
+    /// In order to maintain a proper view of the schema of tables during replay,
+    /// we must remember the old versions of the `st_column` rows when we insert the new ones,
+    /// so that we can respect only the new versions.
+    ///
+    /// We insert into this set during [`Self::replay_insert`] of `st_column` rows
+    /// and delete from it during [`Self::replay_delete`] of `st_column` rows.
+    replay_columns_to_ignore: HashSet<RowPointer>,
 }

 impl CommittedState {
@@ -120,6 +136,7 @@ impl MemoryUsage for CommittedState {
             table_dropped,
             read_sets,
             ephemeral_tables,
+            replay_columns_to_ignore,
         } = self;
         // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
next_tx_offset.heap_usage() @@ -129,6 +146,7 @@ impl MemoryUsage for CommittedState { + table_dropped.heap_usage() + read_sets.heap_usage() + ephemeral_tables.heap_usage() + + replay_columns_to_ignore.heap_usage() } } @@ -199,6 +217,7 @@ impl CommittedState { read_sets: <_>::default(), page_pool, ephemeral_tables: <_>::default(), + replay_columns_to_ignore: <_>::default(), } } @@ -483,7 +502,7 @@ impl CommittedState { let (table, blob_store, _, page_pool) = self.get_table_and_blob_store_mut(table_id)?; // Delete the row. - table + let row_ptr = table .delete_equal_row(page_pool, blob_store, row) .map_err(TableError::Bflatn)? .ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?; @@ -503,6 +522,19 @@ impl CommittedState { self.table_dropped.insert(dropped_table_id); } + if table_id == ST_COLUMN_ID { + // We may have reached the corresponding delete to an insert in `st_column` + // as the result of a column-type-altering migration. + // Now that the outdated `st_column` row isn't present any more, + // we can stop ignoring it. + // + // It's also possible that we're deleting this column as the result of a deleted table, + // and that there wasn't any corresponding insert at all. + // If that's the case, `row_ptr` won't be in `self.replay_columns_to_ignore`, + // which is fine. + self.replay_columns_to_ignore.remove(&row_ptr); + } + Ok(()) } @@ -532,39 +564,63 @@ impl CommittedState { Err(InsertError::IndexError(e)) => return Err(IndexError::UniqueConstraintViolation(e).into()), }; - let row_ptr = row_ref.pointer(); - if table_id == ST_COLUMN_ID { // We've made a modification to `st_column`. // The type of a table has changed, so figure out which. // The first column in `StColumnRow` is `table_id`. 
-            self.st_column_changed(row, row_ptr)?;
+            let row_ptr = row_ref.pointer();
+            let table_id = self.ignore_previous_versions_of_column(row, row_ptr)?;
+            self.st_column_changed(table_id)?;
         }

         Ok(())
     }

+    /// Mark all `st_column` rows which refer to the same column as `st_column_row`
+    /// other than the one at `row_ptr` as outdated
+    /// by storing them in [`Self::replay_columns_to_ignore`].
+    ///
+    /// Returns the ID of the table to which `st_column_row` belongs.
+    fn ignore_previous_versions_of_column(
+        &mut self,
+        st_column_row: &ProductValue,
+        row_ptr: RowPointer,
+    ) -> Result<TableId> {
+        let target_table_id = Self::read_table_id(st_column_row);
+        let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1]))
+            .expect("second field in `st_column` should decode to a `ColId`");
+
+        let outdated_st_column_rows = iter_st_column_for_table(self, &target_table_id.into())?
+            .filter_map(|row_ref| {
+                StColumnRow::try_from(row_ref)
+                    .map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer()))
+                    .transpose()
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        for row in outdated_st_column_rows {
+            self.replay_columns_to_ignore.insert(row);
+        }
+
+        Ok(target_table_id)
+    }
+
     /// Refreshes the columns and layout of a table
     /// when a `row` has been inserted from `st_column`.
     ///
-    /// The `row_ptr` is a pointer to `row`.
-    fn st_column_changed(&mut self, row: &ProductValue, row_ptr: RowPointer) -> Result<()> {
-        let target_table_id = Self::read_table_id(row);
-        let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&row.elements[1]))
-            .expect("second field in `st_column` should decode to a `ColId`");
-
+    /// The `table_id` is the ID of the table whose columns and layout are refreshed.
+    fn st_column_changed(&mut self, table_id: TableId) -> Result<()> {
         // We're replaying and we don't have unique constraints yet.
         // Due to replay handling all inserts first and deletes after,
         // when processing `st_column` insert/deletes,
         // we may end up with two definitions for the same `col_pos`.
         // Of those two, we're interested in the one we just inserted
         // and not the other one, as it is being replaced.
-        let mut columns = iter_st_column_for_table(self, &target_table_id.into())?
-            .filter_map(|row_ref| {
-                StColumnRow::try_from(row_ref)
-                    .map(|c| (c.col_pos != target_col_id || row_ref.pointer() == row_ptr).then(|| c.into()))
-                    .transpose()
-            })
+        // `Self::ignore_previous_versions_of_column` has marked the old version as ignored,
+        // so filter only the non-ignored columns.
+        let mut columns = iter_st_column_for_table(self, &table_id.into())?
+            .filter(|row_ref| !self.replay_columns_to_ignore.contains(&row_ref.pointer()))
+            .map(|row_ref| StColumnRow::try_from(row_ref).map(Into::into))
             .collect::<Result<Vec<_>>>()?;

         // Columns in `st_column` are not in general sorted by their `col_pos`,
@@ -573,13 +629,27 @@ impl CommittedState {
         columns.sort_by_key(|col: &ColumnSchema| col.col_pos);

-        // Update the columns and layout of the the in-memory table.
-        if let Some(table) = self.tables.get_mut(&target_table_id) {
+        // Update the columns and layout of the in-memory table.
+        if let Some(table) = self.tables.get_mut(&table_id) {
             table.change_columns_to(columns).map_err(TableError::from)?;
         }

         Ok(())
     }

+    /// Finishes replay of one transaction by advancing `next_tx_offset`.
+    ///
+    /// Returns an error if `replay_columns_to_ignore` is non-empty afterwards;
+    /// the offset has already been incremented when that error is returned.
+    pub(super) fn replay_end_tx(&mut self) -> Result<()> {
+        self.next_tx_offset += 1;
+
+        if !self.replay_columns_to_ignore.is_empty() {
+            Err(anyhow::anyhow!("`CommittedState::replay_columns_to_ignore` should be empty at the end of a commit, but found {} entries", self.replay_columns_to_ignore.len()).into())
+        } else {
+            Ok(())
+        }
+    }
+
     /// Assuming that a `TableId` is stored as the first field in `row`, read it.
fn read_table_id(row: &ProductValue) -> TableId { TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0])) diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 1fcebe96963..25637031039 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -1284,9 +1284,7 @@ impl spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi } fn visit_tx_end(&mut self) -> std::result::Result<(), Self::Error> { - self.committed_state.next_tx_offset += 1; - - Ok(()) + self.committed_state.replay_end_tx().map_err(Into::into) } }