Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use axum::Extension;
use axum_extra::TypedHeader;
use futures::StreamExt;
use http::StatusCode;
use log::info;
use serde::Deserialize;
use spacetimedb::database_logger::DatabaseLogger;
use spacetimedb::host::module_host::ClientConnectedError;
Expand Down Expand Up @@ -942,6 +943,7 @@ pub async fn pre_publish<S: NodeDelegate + ControlStateDelegate + Authorization>
PrettyPrintStyle::AnsiColor => AutoMigratePrettyPrintStyle::AnsiColor,
};

info!("planning migration for database {database_identity}");
let migrate_plan = ctx
.migrate_plan(
DatabaseDef {
Expand All @@ -963,6 +965,10 @@ pub async fn pre_publish<S: NodeDelegate + ControlStateDelegate + Authorization>
breaks_client,
plan,
} => {
info!(
"planned auto-migration of database {} from {} to {}",
database_identity, old_module_hash, new_module_hash
);
let token = MigrationToken {
database_identity,
old_module_hash,
Expand All @@ -977,6 +983,7 @@ pub async fn pre_publish<S: NodeDelegate + ControlStateDelegate + Authorization>
}))
}
MigratePlanResult::AutoMigrationError(e) => {
info!("database {database_identity} needs manual migration");
Ok(PrePublishResult::ManualMigrate(PrePublishManualMigrateResult {
reason: e.to_string(),
}))
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context};
use enum_map::EnumMap;
use fs2::FileExt;
use log::info;
use spacetimedb_commitlog::repo::OnNewSegmentFn;
use spacetimedb_commitlog::{self as commitlog, SizeOnDisk};
use spacetimedb_data_structures::map::IntSet;
Expand Down Expand Up @@ -1112,13 +1113,15 @@ pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandl
// in milliseconds, which may be too long for async tasks.
let db = db.clone();
let db_identity = db.database_identity();
info!("running view cleanup for database {db_identity}");
tokio::task::spawn_blocking(move || run_view_cleanup(&db))
.await
.inspect_err(|e| {
log::error!("[{}] DATABASE: failed to run view cleanup task: {}", db_identity, e);
})
.ok();

info!("pausing view cleanup for database {db_identity}");
tokio::time::sleep(VIEWS_EXPIRATION).await;
}
})
Expand Down
116 changes: 92 additions & 24 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::subscription::module_subscription_manager::{spawn_send_worker, Subscr
use crate::util::asyncify;
use crate::util::jobs::{JobCores, SingleCoreExecutor};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context};
use anyhow::{anyhow, bail, Context};
use async_trait::async_trait;
use durability::{Durability, EmptyHistory};
use log::{info, trace, warn};
Expand All @@ -41,10 +41,12 @@ use spacetimedb_table::page_pool::PagePool;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
use tokio::task::AbortHandle;
use tokio::time::error::Elapsed;
use tokio::time::{interval_at, timeout, Instant};

// TODO:
//
Expand Down Expand Up @@ -252,17 +254,23 @@ impl HostController {
) -> anyhow::Result<watch::Receiver<ModuleHost>> {
// Try a read lock first.
{
let guard = self.acquire_read_lock(replica_id).await;
if let Some(host) = &*guard {
trace!("cached host {}/{}", database.database_identity, replica_id);
return Ok(host.module.subscribe());
if let Ok(guard) = self.acquire_read_lock(replica_id).await {
if let Some(host) = &*guard {
trace!("cached host {}/{}", database.database_identity, replica_id);
return Ok(host.module.subscribe());
}
}
}

// We didn't find a running module, so take a write lock.
// Since [`tokio::sync::RwLock`] doesn't support upgrading of read locks,
// we'll need to check again if a module was added meanwhile.
let mut guard = self.acquire_write_lock(replica_id).await;
let Ok(mut guard) = self.acquire_write_lock(replica_id).await else {
bail!(
"unable to lock database {} for initialization",
database.database_identity
);
};
if let Some(host) = &*guard {
trace!(
"cached host {}/{} (lock upgrade)",
Expand Down Expand Up @@ -381,7 +389,9 @@ impl HostController {
program.hash
);

let mut guard = self.acquire_write_lock(replica_id).await;
let Ok(mut guard) = self.acquire_write_lock(replica_id).await else {
bail!("unable to lock database {} for update", database.database_identity);
};

// `HostController::clone` is fast,
// as all of its fields are either `Copy` or wrapped in `Arc`.
Expand Down Expand Up @@ -453,7 +463,12 @@ impl HostController {
program.hash
);

let guard = self.acquire_read_lock(replica_id).await;
let Ok(guard) = self.acquire_read_lock(replica_id).await else {
bail!(
"unable to lock database {} for migration planning",
database.database_identity
);
};
let host = guard.as_ref().ok_or(NoSuchModule)?;

host.migrate_plan(host_type, program, style).await
Expand All @@ -463,14 +478,43 @@ impl HostController {
/// and deregister it from the controller.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn exit_module_host(&self, replica_id: u64) -> Result<(), anyhow::Error> {
trace!("exit module host {replica_id}");
let lock = self.hosts.lock().remove(&replica_id);
if let Some(lock) = lock {
if let Some(host) = lock.write_owned().await.take() {
let module = host.module.borrow().clone();
module.exit().await;
let table_names = module.info().module_def.tables().map(|t| t.name.deref());
remove_database_gauges(&module.info().database_identity, table_names);
let Some(lock) = self.hosts.lock().remove(&replica_id) else {
return Ok(());
};
// To debug the potential deadlock issue reported in
// https://github.com/clockworklabs/SpacetimeDBPrivate/issues/2337
// we'll log a warning every 5s if we can't acquire an exclusive lock.
let start = Instant::now();
let mut t = interval_at(start + Duration::from_secs(5), Duration::from_secs(5));
// Spawn so we don't lose our place in the queue.
let mut excl = tokio::spawn(lock.write_owned());
loop {
tokio::select! {
guard = &mut excl => {
let Ok(mut guard) = guard else {
warn!("cancelled shutdown of module of replica {replica_id}");
break;
};
let Some(host) = guard.take() else {
break;
};
let module = host.module.borrow().clone();
let info = module.info();
info!("exiting replica {} of database {}", replica_id, info.database_identity);
module.exit().await;
let table_names = info.module_def.tables().map(|t| t.name.deref());
remove_database_gauges(&info.database_identity, table_names);
info!("replica {} of database {} exited", replica_id, info.database_identity);

break;
},
_ = t.tick() => {
warn!(
"blocked waiting to exit module for replica {} since {}s",
replica_id,
start.elapsed().as_secs_f32()
);
}
}
}

Expand All @@ -485,7 +529,10 @@ impl HostController {
#[tracing::instrument(level = "trace", skip_all)]
pub async fn get_module_host(&self, replica_id: u64) -> Result<ModuleHost, NoSuchModule> {
trace!("get module host {replica_id}");
let guard = self.acquire_read_lock(replica_id).await;
let guard = self.acquire_read_lock(replica_id).await.map_err(|_| {
warn!("timeout waiting for read lock on replica {replica_id} in `get_module_host`");
NoSuchModule
})?;
guard
.as_ref()
.map(|Host { module, .. }| module.borrow().clone())
Expand All @@ -500,7 +547,10 @@ impl HostController {
#[tracing::instrument(level = "trace", skip_all)]
pub async fn watch_module_host(&self, replica_id: u64) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
trace!("watch module host {replica_id}");
let guard = self.acquire_read_lock(replica_id).await;
let guard = self.acquire_read_lock(replica_id).await.map_err(|_| {
warn!("timeout waiting for read lock on {replica_id} in `watch_module_host`");
NoSuchModule
})?;
guard
.as_ref()
.map(|Host { module, .. }| module.subscribe())
Expand All @@ -510,7 +560,13 @@ impl HostController {
/// `true` if the module host `replica_id` is currently registered with
/// the controller.
pub async fn has_module_host(&self, replica_id: u64) -> bool {
self.acquire_read_lock(replica_id).await.is_some()
let Ok(maybe_host) = self.acquire_read_lock(replica_id).await else {
warn!("timeout waiting for read lock on replica {replica_id} in `has_module_host`");
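// The lock stayed held elsewhere for the whole timeout, so presumably some
// host operation is in flight for this replica: technically, we have it.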
return true;
};

maybe_host.is_some()
}

/// On-panic callback passed to [`ModuleHost`]s created by this controller.
Expand All @@ -525,14 +581,22 @@ impl HostController {
}
}

async fn acquire_write_lock(&self, replica_id: u64) -> OwnedRwLockWriteGuard<Option<Host>> {
/// Acquire a write lock on the [`HostCell`] for `replica_id`.
///
/// Gives up with [`Elapsed`] if the lock cannot be acquired within 5s, to aid
/// debugging of <https://github.com/clockworklabs/SpacetimeDBPrivate/issues/2337>.
async fn acquire_write_lock(&self, replica_id: u64) -> Result<OwnedRwLockWriteGuard<Option<Host>>, Elapsed> {
let lock = self.hosts.lock().entry(replica_id).or_default().clone();
lock.write_owned().await
timeout(Duration::from_secs(5), lock.write_owned()).await
}

async fn acquire_read_lock(&self, replica_id: u64) -> OwnedRwLockReadGuard<Option<Host>> {
/// Acquire a read lock on the [`HostCell`] for `replica_id`.
///
/// Gives up with [`Elapsed`] if the lock cannot be acquired within 5s, to aid
/// debugging of <https://github.com/clockworklabs/SpacetimeDBPrivate/issues/2337>.
async fn acquire_read_lock(&self, replica_id: u64) -> Result<OwnedRwLockReadGuard<Option<Host>>, Elapsed> {
let lock = self.hosts.lock().entry(replica_id).or_default().clone();
lock.read_owned().await
timeout(Duration::from_secs(5), lock.read_owned()).await
}

async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<Host> {
Expand Down Expand Up @@ -1059,6 +1123,10 @@ impl Host {

impl Drop for Host {
fn drop(&mut self) {
info!(
"dropping host {}/{}",
self.replica_ctx.database.database_identity, self.replica_ctx.replica_id
);
self.disk_metrics_recorder_task.abort();
self.tx_metrics_recorder_task.abort();
self.view_cleanup_task.abort();
Expand Down
Loading