Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,11 +1146,44 @@ impl SubscriptionManager {
event: Arc<ModuleEvent>,
caller: Option<Arc<ClientConnectionSender>>,
) -> ExecutionMetrics {
use FormatSwitch::{Bsatn, Json};
let span = tracing::info_span!("eval_incr").entered();

let tables = &event.status.database_update().unwrap().tables;
let (updates, errs, metrics) = if self.queries.is_empty() {
// We have no queries, so do nothing.
<_>::default()
} else {
let tables = &event.status.database_update().unwrap().tables;
self.eval_updates_sequential_inner(tx, bsatn_rlb_pool, tables)
};

let span = tracing::info_span!("eval_incr").entered();
let queries = ComputedQueries {
updates,
errs,
event,
caller,
};

// We've now finished all of the work which needs to read from the datastore,
// so get this work off the main thread and over to the `send_worker`,
// then return ASAP in order to unlock the datastore and start running the next transaction.
// See comment on the `send_worker_tx` field in [`SubscriptionManager`] for more motivation.
self.send_worker_queue
.send(SendWorkerMessage::Broadcast { tx_offset, queries })
.expect("send worker has panicked, or otherwise dropped its recv queue!");

drop(span);

metrics
}

#[allow(clippy::type_complexity)]
fn eval_updates_sequential_inner(
&self,
tx: &DeltaTx,
bsatn_rlb_pool: &BsatnRowListBuilderPool,
tables: &[DatabaseTableUpdate],
) -> (Vec<ClientUpdate>, Vec<(ClientId, Box<str>)>, ExecutionMetrics) {
use FormatSwitch::{Bsatn, Json};

#[derive(Default)]
struct FoldState {
Expand Down Expand Up @@ -1306,25 +1339,7 @@ impl SubscriptionManager {
acc
});

// We've now finished all of the work which needs to read from the datastore,
// so get this work off the main thread and over to the `send_worker`,
// then return ASAP in order to unlock the datastore and start running the next transaction.
// See comment on the `send_worker_tx` field in [`SubscriptionManager`] for more motivation.
self.send_worker_queue
.send(SendWorkerMessage::Broadcast {
tx_offset,
queries: ComputedQueries {
updates,
errs,
event,
caller,
},
})
.expect("send worker has panicked, or otherwise dropped its recv queue!");

drop(span);

metrics
(updates, errs, metrics)
}
}

Expand Down
Loading