diff --git a/Cargo.toml b/Cargo.toml index d18e09b..9c0da41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.6.5" +version = "0.6.6" edition = "2024" rust-version = "1.92" authors = ["init4"] @@ -35,13 +35,13 @@ incremental = false [workspace.dependencies] # internal -signet-hot = { version = "0.6.5", path = "./crates/hot" } -signet-hot-mdbx = { version = "0.6.5", path = "./crates/hot-mdbx" } -signet-cold = { version = "0.6.5", path = "./crates/cold" } -signet-cold-mdbx = { version = "0.6.5", path = "./crates/cold-mdbx" } -signet-cold-sql = { version = "0.6.5", path = "./crates/cold-sql" } -signet-storage = { version = "0.6.5", path = "./crates/storage" } -signet-storage-types = { version = "0.6.5", path = "./crates/types" } +signet-hot = { version = "0.6.6", path = "./crates/hot" } +signet-hot-mdbx = { version = "0.6.6", path = "./crates/hot-mdbx" } +signet-cold = { version = "0.6.6", path = "./crates/cold" } +signet-cold-mdbx = { version = "0.6.6", path = "./crates/cold-mdbx" } +signet-cold-sql = { version = "0.6.6", path = "./crates/cold-sql" } +signet-storage = { version = "0.6.6", path = "./crates/storage" } +signet-storage-types = { version = "0.6.6", path = "./crates/types" } # External, in-house signet-libmdbx = { version = "0.8.0" } diff --git a/crates/cold/src/task/runner.rs b/crates/cold/src/task/runner.rs index c3ce0eb..c6bd6df 100644 --- a/crates/cold/src/task/runner.rs +++ b/crates/cold/src/task/runner.rs @@ -3,8 +3,12 @@ //! The [`ColdStorageTask`] processes requests from channels and dispatches //! them to the storage backend. Reads and writes use separate channels: //! -//! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks -//! - **Writes**: Processed sequentially (inline await) to maintain ordering +//! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks. +//! In-flight reads are drained before each write. +//! 
- **Writes**: Processed sequentially (inline await) to maintain ordering. +//! - **Streams**: Log-streaming producers run independently, tracked for +//! graceful shutdown but not drained before writes. Backends must provide +//! their own read isolation (e.g. snapshot semantics). //! //! Transaction, receipt, and header lookups are served from an LRU cache, //! avoiding repeated backend reads for frequently queried items. @@ -56,6 +60,10 @@ struct ColdStorageTaskInner { cache: Mutex, max_stream_deadline: Duration, stream_semaphore: Arc, + /// Tracks long-lived stream producer tasks separately from short reads. + /// Not drained before writes — backends provide their own read isolation. + /// Drained on graceful shutdown. + stream_tracker: TaskTracker, } impl ColdStorageTaskInner { @@ -206,7 +214,7 @@ impl ColdStorageTaskInner { let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER); let inner = Arc::clone(self); - tokio::spawn(async move { + self.stream_tracker.spawn(async move { let _permit = permit; let params = crate::StreamParams { from, to, max_logs, sender, deadline: deadline_instant }; @@ -254,9 +262,14 @@ impl ColdStorageTaskInner { /// # Processing Model /// /// - **Reads**: Spawned as concurrent tasks (up to 64 in flight). -/// Multiple reads can execute in parallel. +/// Multiple reads can execute in parallel. In-flight reads are drained +/// before each write so that no short-lived read overlaps with it. /// - **Writes**: Processed inline (sequential). Each write completes before /// the next is started, ensuring ordering. +/// - **Streams**: Log-streaming producer tasks run independently of the +/// read/write lifecycle. They are tracked separately for graceful +/// shutdown but are NOT drained before writes. Backends MUST provide +/// their own read isolation for streaming (snapshot semantics). /// /// This design prioritizes write ordering for correctness while allowing /// read throughput to scale with concurrency. 
@@ -299,6 +312,7 @@ impl ColdStorageTask { cache: Mutex::new(ColdCache::new()), max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE, stream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)), + stream_tracker: TaskTracker::new(), }), read_receiver, write_receiver, @@ -338,8 +352,9 @@ impl ColdStorageTask { debug!("Cold storage write channel closed"); break; }; - // Drain in-flight reads before executing the write to - // ensure exclusive access to the backend. + // Drain in-flight read tasks before executing the write. + // Stream producers are NOT drained here — they rely on + // backend-level read isolation (snapshot semantics). self.task_tracker.close(); self.task_tracker.wait().await; self.task_tracker.reopen(); @@ -372,10 +387,16 @@ impl ColdStorageTask { } } - // Graceful shutdown: wait for in-progress read tasks to complete + // Graceful shutdown: drain reads first (short-lived), then streams + // (bounded by deadline). Reads must drain first because StreamLogs + // requests spawn stream tasks — draining streams before reads could + // miss newly spawned producers. debug!("Waiting for in-progress read handlers to complete"); self.task_tracker.close(); self.task_tracker.wait().await; + debug!("Waiting for in-progress stream producers to complete"); + self.inner.stream_tracker.close(); + self.inner.stream_tracker.wait().await; debug!("Cold storage task shut down gracefully"); } } diff --git a/crates/cold/src/traits.rs b/crates/cold/src/traits.rs index f3d98ff..d5bde67 100644 --- a/crates/cold/src/traits.rs +++ b/crates/cold/src/traits.rs @@ -207,14 +207,21 @@ pub trait ColdStorage: Send + Sync + 'static { /// Produce a log stream by iterating blocks and sending matching logs. /// - /// Implementations should hold a consistent read snapshot for the - /// duration when possible — backends with snapshot semantics (MDBX, - /// PostgreSQL with REPEATABLE READ) need no additional reorg detection. 
+ /// # Concurrency + /// + /// Stream producers run concurrently with writes (`append_block`, + /// `truncate_above`, `drain_above`). They are NOT serialized by the + /// task runner's read/write barrier. Implementations MUST hold a + /// consistent read snapshot for the duration of the stream. + /// + /// Backends with snapshot semantics (MDBX read transactions, + /// PostgreSQL `REPEATABLE READ`) naturally satisfy this requirement. /// /// Backends without snapshot semantics can delegate to /// [`produce_log_stream_default`], which uses per-block /// [`get_header`] / [`get_logs`] calls with anchor-hash reorg - /// detection. + /// detection. This is best-effort only: it does not provide a true + /// snapshot and is not immune to partial reads during concurrent writes. /// /// All errors are sent through `sender`. When this method returns, /// the sender is dropped, closing the stream.