Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use spacetimedb::client::consume_each_list::ConsumeEachBuffer;
use spacetimedb::error::DBError;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::messages::websocket::BsatnFormat;
use spacetimedb::sql::ast::SchemaViewer;
use spacetimedb::subscription::query::compile_read_only_queryset;
use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::subscription::tx::DeltaTx;
use spacetimedb::subscription::{collect_table_update, TableUpdateType};
Expand Down Expand Up @@ -119,6 +121,8 @@ fn eval(c: &mut Criterion) {
let ins_rhs = insert_op(rhs, "location", new_rhs_row);
let update = [&ins_lhs, &ins_rhs];

let bsatn_rlb_pool = black_box(BsatnRowListBuilderPool::new());

// A benchmark runner for the new query engine
let bench_query = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
Expand All @@ -134,13 +138,17 @@ fn eval(c: &mut Criterion) {
let tx = DeltaTx::from(&tx);

b.iter(|| {
drop(black_box(collect_table_update::<_, BsatnFormat>(
let updates = black_box(collect_table_update::<BsatnFormat>(
&plans,
table_id,
table_name.clone(),
&tx,
TableUpdateType::Subscribe,
)))
&bsatn_rlb_pool,
));
if let Ok((updates, _)) = updates {
updates.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
}
})
});
};
Expand All @@ -152,12 +160,9 @@ fn eval(c: &mut Criterion) {
let query: ExecutionSet = query.into();

b.iter(|| {
drop(black_box(query.eval::<BsatnFormat>(
&raw.db,
&tx,
None,
Compression::None,
)))
let updates =
black_box(query.eval::<BsatnFormat>(&raw.db, &tx, &bsatn_rlb_pool, None, Compression::None));
updates.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
})
});
};
Expand Down
5 changes: 5 additions & 0 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,11 @@ impl BsatnRowList {
let data_range = self.size_hint.index_to_range(index, data_end)?;
Some(self.rows_data.slice(data_range))
}

/// Consumes the list and returns the parts.
pub fn into_inner(self) -> (RowSizeHint, Bytes) {
(self.size_hint, self.rows_data)
}
}

/// An iterator over all the elements in a [`BsatnRowList`].
Expand Down
36 changes: 27 additions & 9 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use spacetimedb::client::{
};
use spacetimedb::host::module_host::ClientConnectedError;
use spacetimedb::host::NoSuchModule;
use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use spacetimedb::util::spawn_rayon;
use spacetimedb::worker_metrics::WORKER_METRICS;
use spacetimedb::Identity;
Expand Down Expand Up @@ -404,6 +405,8 @@ async fn ws_client_actor_inner(
let (idle_tx, idle_rx) = watch::channel(state.next_idle_deadline());
let idle_timer = ws_idle_timer(idle_rx);

let bsatn_rlb_pool = client.module().subscriptions().bsatn_rlb_pool.clone();

// Spawn a task to send outgoing messages
// obtained from `sendrx` and `unordered_rx`.
let send_task = tokio::spawn(ws_send_loop(
Expand All @@ -412,6 +415,7 @@ async fn ws_client_actor_inner(
ws_send,
sendrx,
unordered_rx,
bsatn_rlb_pool,
));
// Spawn a task to handle incoming messages.
let recv_task = tokio::spawn(ws_recv_task(
Expand Down Expand Up @@ -1050,10 +1054,11 @@ async fn ws_send_loop(
ws: impl Sink<WsMessage, Error: Display> + Unpin,
messages: impl Receiver<SerializableMessage>,
unordered: mpsc::UnboundedReceiver<UnorderedWsMessage>,
bsatn_rlb_pool: BsatnRowListBuilderPool,
) {
let metrics = SendMetrics::new(state.database);
ws_send_loop_inner(state, ws, messages, unordered, |encode_rx, frames_tx| {
ws_encode_task(metrics, config, encode_rx, frames_tx)
ws_send_loop_inner(state, ws, messages, unordered, move |encode_rx, frames_tx| {
ws_encode_task(metrics, config, encode_rx, frames_tx, bsatn_rlb_pool)
})
.await
}
Expand Down Expand Up @@ -1231,6 +1236,7 @@ async fn ws_encode_task(
config: ClientConfig,
mut messages: mpsc::UnboundedReceiver<OutboundMessage>,
outgoing_frames: mpsc::UnboundedSender<Frame>,
bsatn_rlb_pool: BsatnRowListBuilderPool,
) {
// Serialize buffers can be reclaimed once all frames of a message are
// copied to the wire. Since we don't know when that will happen, we prepare
Expand All @@ -1249,7 +1255,7 @@ async fn ws_encode_task(

let in_use_buf = match message {
OutboundMessage::Error(message) => {
let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false).await;
let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await;
metrics.report(None, None, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
Expand All @@ -1262,7 +1268,8 @@ async fn ws_encode_task(
let num_rows = message.num_rows();
let is_large = num_rows.is_some_and(|n| n > 1024);

let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, is_large).await;
let (stats, in_use, mut frames) =
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await;
metrics.report(workload, num_rows, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
Expand Down Expand Up @@ -1319,18 +1326,25 @@ async fn ws_encode_message(
buf: SerializeBuffer,
message: impl ToProtocol<Encoded = SwitchedServerMessage> + Send + 'static,
is_large_message: bool,
bsatn_rlb_pool: &BsatnRowListBuilderPool,
) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame>) {
const FRAGMENT_SIZE: usize = 4096;

let serialize_and_compress = |serialize_buf, message, config| {
fn serialize_and_compress(
bsatn_rlb_pool: &BsatnRowListBuilderPool,
serialize_buf: SerializeBuffer,
message: impl ToProtocol<Encoded = SwitchedServerMessage> + Send + 'static,
config: ClientConfig,
) -> (Duration, InUseSerializeBuffer, DataMessage) {
let start = Instant::now();
let (msg_alloc, msg_data) = serialize(serialize_buf, message, config);
let (msg_alloc, msg_data) = serialize(bsatn_rlb_pool, serialize_buf, message, config);
(start.elapsed(), msg_alloc, msg_data)
};
}
let (timing, msg_alloc, msg_data) = if is_large_message {
spawn_rayon(move || serialize_and_compress(buf, message, config)).await
let bsatn_rlb_pool = bsatn_rlb_pool.clone();
spawn_rayon(move || serialize_and_compress(&bsatn_rlb_pool, buf, message, config)).await
} else {
serialize_and_compress(buf, message, config)
serialize_and_compress(bsatn_rlb_pool, buf, message, config)
};

let metrics = EncodeMetrics {
Expand Down Expand Up @@ -1630,6 +1644,7 @@ mod tests {
sink::drain(),
messages_rx,
unordered_rx,
BsatnRowListBuilderPool::new(),
);
pin_mut!(send_loop);

Expand All @@ -1653,6 +1668,7 @@ mod tests {
sink::drain(),
messages_rx,
unordered_rx,
BsatnRowListBuilderPool::new(),
);
pin_mut!(send_loop);

Expand Down Expand Up @@ -1703,6 +1719,7 @@ mod tests {
UnfeedableSink,
messages_rx,
unordered_rx,
BsatnRowListBuilderPool::new(),
);
pin_mut!(send_loop);

Expand Down Expand Up @@ -1749,6 +1766,7 @@ mod tests {
UnflushableSink,
messages_rx,
unordered_rx,
BsatnRowListBuilderPool::new(),
);
pin_mut!(send_loop);

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt;

mod client_connection;
mod client_connection_index;
pub mod consume_each_list;
mod message_handlers;
pub mod messages;

Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::host::module_host::ClientConnectedError;
use crate::host::{CallProcedureReturn, FunctionArgs, ModuleHost, NoSuchModule, ReducerCallError, ReducerCallResult};
use crate::messages::websocket::Subscribe;
use crate::subscription::module_subscription_manager::BroadcastError;
use crate::subscription::row_list_builder_pool::JsonRowListBuilderFakePool;
use crate::util::asyncify;
use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
Expand Down Expand Up @@ -954,6 +955,7 @@ impl ClientConnection {
self.sender.clone(),
message_id.to_owned(),
timer,
JsonRowListBuilderFakePool,
|msg: OneOffQueryResponseMessage<JsonFormat>| msg.into(),
)
.await
Expand All @@ -965,13 +967,15 @@ impl ClientConnection {
message_id: &[u8],
timer: Instant,
) -> Result<(), anyhow::Error> {
let bsatn_rlb_pool = self.module().replica_ctx().subscriptions.bsatn_rlb_pool.clone();
self.module()
.one_off_query::<BsatnFormat>(
self.auth.clone(),
query.to_owned(),
self.sender.clone(),
message_id.to_owned(),
timer,
bsatn_rlb_pool,
|msg: OneOffQueryResponseMessage<BsatnFormat>| msg.into(),
)
.await
Expand Down
80 changes: 80 additions & 0 deletions crates/core/src/client/consume_each_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use bytes::Bytes;
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, BsatnRowList, CompressableQueryUpdate, DatabaseUpdate, OneOffQueryResponse, QueryUpdate,
ServerMessage, TableUpdate, UpdateStatus,
};

/// Moves each buffer in `self` into a closure.
pub trait ConsumeEachBuffer {
/// Consumes `self`, moving each `Bytes` buffer in `self` into the closure `each`.
fn consume_each_list(self, each: &mut impl FnMut(Bytes));
}

impl ConsumeEachBuffer for ServerMessage<BsatnFormat> {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
use ServerMessage::*;
match self {
InitialSubscription(x) => x.database_update.consume_each_list(each),
TransactionUpdate(x) => x.status.consume_each_list(each),
TransactionUpdateLight(x) => x.update.consume_each_list(each),
IdentityToken(_) | ProcedureResult(_) | SubscriptionError(_) => {}
OneOffQueryResponse(x) => x.consume_each_list(each),
SubscribeApplied(x) => x.rows.table_rows.consume_each_list(each),
UnsubscribeApplied(x) => x.rows.table_rows.consume_each_list(each),
SubscribeMultiApplied(x) => x.update.consume_each_list(each),
UnsubscribeMultiApplied(x) => x.update.consume_each_list(each),
}
}
}

impl ConsumeEachBuffer for OneOffQueryResponse<BsatnFormat> {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
Vec::from(self.tables)
.into_iter()
.for_each(|x| x.rows.consume_each_list(each));
}
}

impl ConsumeEachBuffer for UpdateStatus<BsatnFormat> {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
match self {
Self::Committed(x) => x.consume_each_list(each),
Self::Failed(_) | UpdateStatus::OutOfEnergy => {}
}
}
}

impl ConsumeEachBuffer for DatabaseUpdate<BsatnFormat> {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
self.tables.into_iter().for_each(|x| x.consume_each_list(each));
}
}

impl ConsumeEachBuffer for TableUpdate<BsatnFormat> {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
self.updates.into_iter().for_each(|x| x.consume_each_list(each));
}
}

impl ConsumeEachBuffer for CompressableQueryUpdate<BsatnFormat> {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
match self {
Self::Uncompressed(x) => x.consume_each_list(each),
Self::Brotli(bytes) | Self::Gzip(bytes) => each(bytes),
}
}
}

impl ConsumeEachBuffer for QueryUpdate<BsatnFormat> {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
self.deletes.consume_each_list(each);
self.inserts.consume_each_list(each);
}
}

impl ConsumeEachBuffer for BsatnRowList {
fn consume_each_list(self, each: &mut impl FnMut(Bytes)) {
let (_, buffer) = self.into_inner();
each(buffer);
}
}
7 changes: 7 additions & 0 deletions crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::{ClientConfig, DataMessage, Protocol};
use crate::client::consume_each_list::ConsumeEachBuffer;
use crate::host::module_host::{EventStatus, ModuleEvent, ProcedureCallError};
use crate::host::{ArgsTuple, ProcedureCallResult};
use crate::messages::websocket as ws;
use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress};
use bytes::{BufMut, Bytes, BytesMut};
use bytestring::ByteString;
Expand Down Expand Up @@ -133,6 +135,7 @@ impl InUseSerializeBuffer {
/// If `protocol` is [`Protocol::Binary`],
/// the message will be conditionally compressed by this method according to `compression`.
pub fn serialize(
bsatn_rlb_pool: &BsatnRowListBuilderPool,
mut buffer: SerializeBuffer,
msg: impl ToProtocol<Encoded = SwitchedServerMessage>,
config: ClientConfig,
Expand All @@ -155,6 +158,10 @@ pub fn serialize(
bsatn::to_writer(w.into_inner(), &msg).unwrap()
});

// At this point, we no longer have a use for `msg`,
// so try to reclaim its buffers.
msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));

// Conditionally compress the message.
let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) {
Compression::None => buffer.uncompressed(),
Expand Down
Loading