diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 6b93e996ce4..7147fdac8bf 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -1146,11 +1146,44 @@ impl SubscriptionManager { event: Arc, caller: Option>, ) -> ExecutionMetrics { - use FormatSwitch::{Bsatn, Json}; + let span = tracing::info_span!("eval_incr").entered(); - let tables = &event.status.database_update().unwrap().tables; + let (updates, errs, metrics) = if self.queries.is_empty() { + // We have no queries, so do nothing. + <_>::default() + } else { + let tables = &event.status.database_update().unwrap().tables; + self.eval_updates_sequential_inner(tx, bsatn_rlb_pool, tables) + }; - let span = tracing::info_span!("eval_incr").entered(); + let queries = ComputedQueries { + updates, + errs, + event, + caller, + }; + + // We've now finished all of the work which needs to read from the datastore, + // so get this work off the main thread and over to the `send_worker`, + // then return ASAP in order to unlock the datastore and start running the next transaction. + // See comment on the `send_worker_tx` field in [`SubscriptionManager`] for more motivation. + self.send_worker_queue + .send(SendWorkerMessage::Broadcast { tx_offset, queries }) + .expect("send worker has panicked, or otherwise dropped its recv queue!"); + + drop(span); + + metrics + } + + #[allow(clippy::type_complexity)] + fn eval_updates_sequential_inner( + &self, + tx: &DeltaTx, + bsatn_rlb_pool: &BsatnRowListBuilderPool, + tables: &[DatabaseTableUpdate], + ) -> (Vec, Vec<(ClientId, Box)>, ExecutionMetrics) { + use FormatSwitch::{Bsatn, Json}; #[derive(Default)] struct FoldState { @@ -1306,25 +1339,7 @@ impl SubscriptionManager { acc }); - // We've now finished all of the work which needs to read from the datastore, - // so get this work off the main thread and over to the `send_worker`, - // then return ASAP in order to unlock the datastore and start running the next transaction. - // See comment on the `send_worker_tx` field in [`SubscriptionManager`] for more motivation. - self.send_worker_queue - .send(SendWorkerMessage::Broadcast { - tx_offset, - queries: ComputedQueries { - updates, - errs, - event, - caller, - }, - }) - .expect("send worker has panicked, or otherwise dropped its recv queue!"); - - drop(span); - - metrics + (updates, errs, metrics) } }