feat(storage): add drain_above to ColdStorage + UnifiedStorage#38
feat(storage): add drain_above to ColdStorage + UnifiedStorage#38
Conversation
Adds `DrainedBlock` type and `drain_above` method that reads removed block headers and receipts before unwinding, enabling reorg notifications. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add `drain_above` as a native `ColdStorage` trait method that atomically reads receipts and truncates in one operation. Each backend overrides with its own atomic version: - MemColdBackend: holds write lock for entire operation - MdbxColdBackend: single read-write transaction - SqlColdBackend: sequential reads + truncate (atomic via task runner) Update `UnifiedStorage::drain_above` to use the new atomic cold method instead of N separate reads + dispatch_truncate. Add conformance test and integration tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
drain_above to UnifiedStorage|
@Evalir i have made this more complicated to avoid a class of concurrency issues |
Fraser999
left a comment
There was a problem hiding this comment.
Just blocking due to the potential TOCTOU issue. Mostly nitpicks otherwise.
| Ok(()) | ||
| } | ||
|
|
||
| fn drain_above_inner( |
There was a problem hiding this comment.
This is very similar to truncate_above_inner just above here. Could we extract the shared logic, or get rid of truncate_above_inner, or have truncate_above_inner delegate to drain_above_inner discarding the return value?
| async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> { | ||
| // Read receipts then truncate, all within a single SQL transaction. | ||
| // We use the default trait logic against self (which will use the pool | ||
| // for reads), then truncate. For true atomicity under concurrent | ||
| // access the caller should ensure exclusive write access via the task | ||
| // runner, which processes writes sequentially. | ||
| let latest = self.get_latest_block().await?; | ||
| let mut all_receipts = Vec::new(); | ||
| if let Some(latest) = latest { | ||
| for n in (block + 1)..=latest { | ||
| all_receipts.push(self.get_receipts_in_block(n).await?); | ||
| } | ||
| } | ||
| self.truncate_above(block).await?; | ||
| Ok(all_receipts) | ||
| } |
There was a problem hiding this comment.
This is the same as the default impl it's overriding.
| /// 1. Reads headers from hot storage (sync) | ||
| /// 2. Reads receipts from cold storage (async, best-effort) | ||
| /// 3. Unwinds hot storage (sync) | ||
| /// 4. Dispatches truncate to cold storage (non-blocking) |
There was a problem hiding this comment.
This doesn't really seem to match the impl now - looks like it was an old comment from before drain_above existed maybe?
| /// [`Cold`]: crate::StorageError::Cold | ||
| pub async fn drain_above(&self, block: BlockNumber) -> StorageResult<Vec<DrainedBlock>> { | ||
| // 1. Read headers above `block` from hot storage | ||
| let reader = self.reader()?; |
There was a problem hiding this comment.
Should we just instantiate and use the writer here for reading too rather than creating a reader then later a writer, as that seems to have a TOCTOU race condition? I.e. more blocks could be added while we're working with what has then become a stale reader.
|
|
||
| // Verify hot tip is now block 0 | ||
| assert_eq!(hot.reader().unwrap().get_chain_tip().unwrap().unwrap().0, 0); | ||
|
|
There was a problem hiding this comment.
We could check the cold storage got updated too:
| assert_eq!(storage.cold().get_latest_block().await.unwrap().unwrap(), 0); | |
|
|
||
| // Verify hot tip still 1 | ||
| assert_eq!(hot.reader().unwrap().get_chain_tip().unwrap().unwrap().0, 1); | ||
|
|
There was a problem hiding this comment.
| assert_eq!(storage.cold().get_latest_block().await.unwrap().unwrap(), 1); | |
Summary
drain_aboveas a nativeColdStoragetrait method with a default impl that composesget_latest_block+get_receipts_in_block+truncate_abovedrain_aboveatomically in each backend:MemColdBackend: holds write lock for entire operationMdbxColdBackend: single read-write transaction (read receipts + delete in one commit)SqlColdBackend: sequential reads + truncate (atomic via task runner's sequential write processing)DrainAbovevariant toColdWriteRequestanddrain_abovemethod onColdStorageHandleUnifiedStorage::drain_aboveto use the new atomic cold method instead of N separate reads +dispatch_truncateDrainedBlockstruct withheaderandreceiptsfieldstest_drain_above) and 3 integration tests0.6.4→0.6.5Resolves ENG-1978. Unblocks ENG-1968 (
ChainEvent::Reorgnotifications).Test plan
--all-featuresand--no-default-features) forsignet-cold,signet-cold-mdbx,signet-cold-sql,signet-storagecargo +nightly fmtcargo t -p signet-cold— mem conformance passes (includestest_drain_above)cargo t -p signet-cold-mdbx— MDBX conformance passescargo t -p signet-cold-sql --all-features— SQLite conformance passescargo t -p signet-storage— 5 tests pass including 3 newdrain_aboveintegration tests🤖 Generated with Claude Code