Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.6.5"
version = "0.6.6"
edition = "2024"
rust-version = "1.92"
authors = ["init4"]
Expand Down Expand Up @@ -35,13 +35,13 @@ incremental = false

[workspace.dependencies]
# internal
signet-hot = { version = "0.6.5", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.6.5", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.6.5", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.6.5", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.6.5", path = "./crates/cold-sql" }
signet-storage = { version = "0.6.5", path = "./crates/storage" }
signet-storage-types = { version = "0.6.5", path = "./crates/types" }
signet-hot = { version = "0.6.6", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.6.6", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.6.6", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.6.6", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.6.6", path = "./crates/cold-sql" }
signet-storage = { version = "0.6.6", path = "./crates/storage" }
signet-storage-types = { version = "0.6.6", path = "./crates/types" }

# External, in-house
signet-libmdbx = { version = "0.8.0" }
Expand Down
35 changes: 28 additions & 7 deletions crates/cold/src/task/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
//! The [`ColdStorageTask`] processes requests from channels and dispatches
//! them to the storage backend. Reads and writes use separate channels:
//!
//! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks
//! - **Writes**: Processed sequentially (inline await) to maintain ordering
//! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks.
//! In-flight reads are drained before each write.
//! - **Writes**: Processed sequentially (inline await) to maintain ordering.
//! - **Streams**: Log-streaming producers run independently, tracked for
//! graceful shutdown but not drained before writes. Backends must provide
//! their own read isolation (e.g. snapshot semantics).
//!
//! Transaction, receipt, and header lookups are served from an LRU cache,
//! avoiding repeated backend reads for frequently queried items.
Expand Down Expand Up @@ -56,6 +60,10 @@ struct ColdStorageTaskInner<B> {
cache: Mutex<ColdCache>,
max_stream_deadline: Duration,
stream_semaphore: Arc<Semaphore>,
/// Tracks long-lived stream producer tasks separately from short reads.
/// Not drained before writes — backends provide their own read isolation.
/// Drained on graceful shutdown.
stream_tracker: TaskTracker,
}

impl<B: ColdStorage> ColdStorageTaskInner<B> {
Expand Down Expand Up @@ -206,7 +214,7 @@ impl<B: ColdStorage> ColdStorageTaskInner<B> {
let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
let inner = Arc::clone(self);

tokio::spawn(async move {
self.stream_tracker.spawn(async move {
let _permit = permit;
let params =
crate::StreamParams { from, to, max_logs, sender, deadline: deadline_instant };
Expand Down Expand Up @@ -254,9 +262,14 @@ impl<B: ColdStorage> ColdStorageTaskInner<B> {
/// # Processing Model
///
/// - **Reads**: Spawned as concurrent tasks (up to 64 in flight).
/// Multiple reads can execute in parallel.
/// Multiple reads can execute in parallel. In-flight reads are drained
/// before each write to ensure exclusive backend access.
/// - **Writes**: Processed inline (sequential). Each write completes before
/// the next is started, ensuring ordering.
/// - **Streams**: Log-streaming producer tasks run independently of the
/// read/write lifecycle. They are tracked separately for graceful
/// shutdown but are NOT drained before writes. Backends MUST provide
/// their own read isolation for streaming (snapshot semantics).
///
/// This design prioritizes write ordering for correctness while allowing
/// read throughput to scale with concurrency.
Expand Down Expand Up @@ -299,6 +312,7 @@ impl<B: ColdStorage> ColdStorageTask<B> {
cache: Mutex::new(ColdCache::new()),
max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE,
stream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)),
stream_tracker: TaskTracker::new(),
}),
read_receiver,
write_receiver,
Expand Down Expand Up @@ -338,8 +352,9 @@ impl<B: ColdStorage> ColdStorageTask<B> {
debug!("Cold storage write channel closed");
break;
};
// Drain in-flight reads before executing the write to
// ensure exclusive access to the backend.
// Drain in-flight read tasks before executing the write.
// Stream producers are NOT drained here — they rely on
// backend-level read isolation (snapshot semantics).
self.task_tracker.close();
self.task_tracker.wait().await;
self.task_tracker.reopen();
Expand Down Expand Up @@ -372,10 +387,16 @@ impl<B: ColdStorage> ColdStorageTask<B> {
}
}

// Graceful shutdown: wait for in-progress read tasks to complete
// Graceful shutdown: drain reads first (short-lived), then streams
// (bounded by deadline). Reads must drain first because StreamLogs
// requests spawn stream tasks — draining streams before reads could
// miss newly spawned producers.
debug!("Waiting for in-progress read handlers to complete");
self.task_tracker.close();
self.task_tracker.wait().await;
debug!("Waiting for in-progress stream producers to complete");
self.inner.stream_tracker.close();
self.inner.stream_tracker.wait().await;
debug!("Cold storage task shut down gracefully");
}
}
15 changes: 11 additions & 4 deletions crates/cold/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,21 @@ pub trait ColdStorage: Send + Sync + 'static {

/// Produce a log stream by iterating blocks and sending matching logs.
///
/// Implementations should hold a consistent read snapshot for the
/// duration when possible — backends with snapshot semantics (MDBX,
/// PostgreSQL with REPEATABLE READ) need no additional reorg detection.
/// # Concurrency
///
/// Stream producers run concurrently with writes (`append_block`,
/// `truncate_above`, `drain_above`). They are NOT serialized by the
/// task runner's read/write barrier. Implementations MUST hold a
/// consistent read snapshot for the duration of the stream.
///
/// Backends with snapshot semantics (MDBX read transactions,
/// PostgreSQL `REPEATABLE READ`) naturally satisfy this requirement.
///
/// Backends without snapshot semantics can delegate to
/// [`produce_log_stream_default`], which uses per-block
/// [`get_header`] / [`get_logs`] calls with anchor-hash reorg
/// detection.
/// detection. This provides best-effort consistency but is not
/// immune to partial reads during concurrent writes.
///
/// All errors are sent through `sender`. When this method returns,
/// the sender is dropped, closing the stream.
Expand Down
Loading