diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index f1cb15bfa41..1504b42d6ee 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -576,8 +576,8 @@ impl FirehoseMapperTrait for FirehoseMapper { Ok(BlockStreamEvent::Revert( block.ptr(), + parent_ptr, FirehoseCursor::Some(response.cursor.clone()), - Some(parent_ptr), )) } diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index e7013ea19f9..eda1e21dc1c 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -304,15 +304,15 @@ impl FirehoseMapperTrait for FirehoseMapper { )), StepUndo => { - let header = block.header(); - let parent_ptr = header + let parent_ptr = block + .header() .parent_ptr() .expect("Genesis block should never be reverted"); Ok(BlockStreamEvent::Revert( block.ptr(), + parent_ptr, Some(response.cursor.clone()), - Some(parent_ptr), )) } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index f1734d1a0a7..5fb96519f8f 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -553,7 +553,7 @@ where let (block, cursor) = match event { Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => (block, cursor), - Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, _, optional_parent_ptr))) => { + Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, parent_ptr, cursor))) => { info!( logger, "Reverting block to get back to main chain"; @@ -561,53 +561,20 @@ where "block_hash" => format!("{}", subgraph_ptr.hash) ); - // We would like to revert the DB state to the parent of the current block. - match optional_parent_ptr { - Some(parent_ptr) => { - if let Err(e) = inputs.store.revert_block_operations(parent_ptr) { - error!( - &logger, - "Could not revert block. Retrying"; - "block_number" => format!("{}", subgraph_ptr.number), - "block_hash" => format!("{}", subgraph_ptr.hash), - "error" => e.to_string(), - ); - - // Exit inner block stream consumption loop and go up to loop that restarts subgraph - break; - } - } - None => { - // First, load the block in order to get the parent hash. - if let Err(e) = inputs - .triggers_adapter - .parent_ptr(&subgraph_ptr) - .await - .map(|parent_ptr| { - parent_ptr.expect("genesis block cannot be reverted") - }) - .and_then(|parent_ptr| { - // Revert entity changes from this block, and update subgraph ptr. - inputs - .store - .revert_block_operations(parent_ptr) - .map_err(Into::into) - }) - { - error!( - &logger, - "Could not revert block. \ - The likely cause is the block not being found due to a deep reorg. \ - Retrying"; - "block_number" => format!("{}", subgraph_ptr.number), - "block_hash" => format!("{}", subgraph_ptr.hash), - "error" => e.to_string(), - ); - - // Exit inner block stream consumption loop and go up to loop that restarts subgraph - break; - } - } + if let Err(e) = inputs + .store + .revert_block_operations(parent_ptr, cursor.as_deref()) + { + error!( + &logger, + "Could not revert block. Retrying"; + "block_number" => format!("{}", subgraph_ptr.number), + "block_hash" => format!("{}", subgraph_ptr.hash), + "error" => e.to_string(), + ); + + // Exit inner block stream consumption loop and go up to loop that restarts subgraph + break; } ctx.block_stream_metrics @@ -626,6 +593,7 @@ where ctx.state.entity_lfu_cache = LfuCache::new(); continue; } + // Log and drop the errors from the block_stream // The block stream will continue attempting to produce blocks Some(Err(e)) => { diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 94f41e8f45c..f027388147f 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -177,7 +177,7 @@ pub enum BlockStreamEvent { // The payload is the current subgraph head pointer, which should be reverted, such that the // parent of the current subgraph head becomes the new subgraph head. // An optional pointer to the parent block will save a round trip operation when reverting. - Revert(BlockPtr, FirehoseCursor, Option), + Revert(BlockPtr, BlockPtr, FirehoseCursor), ProcessBlock(BlockWithTriggers, FirehoseCursor), } diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs index efff90f4ac6..cfae4081c3f 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/graph/src/blockchain/polling_block_stream.rs @@ -142,7 +142,7 @@ where /// Blocks and range size Blocks(VecDeque>, BlockNumber), - // The payload is the current subgraph head pointer, which should be reverted and it's parent, such that the + // The payload is the current subgraph head pointer, which should be reverted and its parent, such that the // parent of the current subgraph head becomes the new subgraph head. Revert(BlockPtr, BlockPtr), Done, @@ -220,7 +220,9 @@ where return Ok(NextBlocks::Done); } - ReconciliationStep::Revert(from, to) => return Ok(NextBlocks::Revert(from, to)), + ReconciliationStep::Revert(from, parent_ptr) => { + return Ok(NextBlocks::Revert(from, parent_ptr)) + } } } } @@ -565,14 +567,14 @@ impl Stream for PollingBlockStream { // Poll for chain head update continue; } - NextBlocks::Revert(from, to) => { - self.ctx.current_block = to.into(); + NextBlocks::Revert(from, parent_ptr) => { + self.ctx.current_block = Some(parent_ptr.clone()); self.state = BlockStreamState::BeginReconciliation; break Poll::Ready(Some(Ok(BlockStreamEvent::Revert( from, + parent_ptr, FirehoseCursor::None, - self.ctx.current_block.clone(), )))); } }, diff --git a/graph/src/components/store.rs b/graph/src/components/store.rs index 8657cc3e1af..0b94cec2022 100644 --- a/graph/src/components/store.rs +++ b/graph/src/components/store.rs @@ -1037,7 +1037,11 @@ pub trait WritableStore: Send + Sync + 'static { /// subgraph block pointer to `block_ptr_to`. /// /// `block_ptr_to` must point to the parent block of the subgraph block pointer. - fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError>; + fn revert_block_operations( + &self, + block_ptr_to: BlockPtr, + firehose_cursor: Option<&str>, + ) -> Result<(), StoreError>; /// If a deterministic error happened, this function reverts the block operations from the /// current block to the previous block. diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index 35f99d6fbbd..7bb8b91829d 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -58,7 +58,7 @@ impl WritableStore for MockStore { unimplemented!() } - fn revert_block_operations(&self, _: BlockPtr) -> Result<(), StoreError> { + fn revert_block_operations(&self, _: BlockPtr, _: Option<&str>) -> Result<(), StoreError> { unimplemented!() } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 52180aeb805..48b453df79d 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use detail::DeploymentDetail; use diesel::connection::SimpleConnection; use diesel::pg::PgConnection; @@ -905,7 +906,7 @@ impl DeploymentStore { if let Some(cursor) = firehose_cursor { if cursor != "" { - deployment::update_firehose_cursor(&conn, &site.deployment, &cursor)?; + deployment::update_firehose_cursor(&conn, &site.deployment, cursor)?; } } @@ -920,6 +921,7 @@ impl DeploymentStore { conn: &PgConnection, site: Arc, block_ptr_to: BlockPtr, + firehose_cursor: Option<&str>, ) -> Result { let event = conn.transaction(|| -> Result<_, StoreError> { // Don't revert past a graft point @@ -940,6 +942,11 @@ impl DeploymentStore { deployment::revert_block_ptr(&conn, &site.deployment, block_ptr_to.clone())?; + if let Some(cursor) = firehose_cursor { + deployment::update_firehose_cursor(&conn, &site.deployment, cursor) + .context("updating firehose cursor")?; + } + // Revert the data let layout = self.layout(&conn, site.clone())?; @@ -991,13 +998,18 @@ impl DeploymentStore { block_ptr_to.number ); } - self.rewind_with_conn(&conn, site, block_ptr_to) + + // When rewinding, we reset the firehose cursor to the empty string. That way, on resume, + // Firehose will start from the block_ptr instead (with sanity check to ensure it's resume + // at the exact block). + self.rewind_with_conn(&conn, site, block_ptr_to, Some("")) } pub(crate) fn revert_block_operations( &self, site: Arc, block_ptr_to: BlockPtr, + firehose_cursor: Option<&str>, ) -> Result { let conn = self.get_conn()?; // Unwrap: If we are reverting then the block ptr is not `None`. @@ -1008,7 +1020,7 @@ impl DeploymentStore { panic!("revert_block_operations must revert a single block only"); } - self.rewind_with_conn(&conn, site, block_ptr_to) + self.rewind_with_conn(&conn, site, block_ptr_to, firehose_cursor) } pub(crate) async fn deployment_state_from_id( @@ -1227,7 +1239,11 @@ impl DeploymentStore { ); // We ignore the StoreEvent that's being returned, we'll not use it. - let _ = self.revert_block_operations(site.clone(), parent_ptr.clone())?; + // + // We reset the firehose cursor to the empty string. That way, on resume, + // Firehose will start from the block_ptr instead (with sanity checks to ensure it's resuming + // at the correct block). + let _ = self.revert_block_operations(site.clone(), parent_ptr.clone(), Some(""))?; // Unfail the deployment. deployment::update_deployment_status(conn, deployment_id, prev_health, None)?; diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index d539b4e4bc6..0ca3da39bb5 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -163,11 +163,17 @@ impl WritableStore { }) } - fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> { + fn revert_block_operations( + &self, + block_ptr_to: BlockPtr, + firehose_cursor: Option<&str>, + ) -> Result<(), StoreError> { self.retry("revert_block_operations", || { - let event = self - .writable - .revert_block_operations(self.site.clone(), block_ptr_to.clone())?; + let event = self.writable.revert_block_operations( + self.site.clone(), + block_ptr_to.clone(), + firehose_cursor.clone(), + )?; self.try_send_store_event(event) }) } @@ -363,13 +369,16 @@ impl WritableStoreTrait for WritableAgent { self.store.start_subgraph_deployment(logger) } - fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> { + fn revert_block_operations( + &self, + block_ptr_to: BlockPtr, + firehose_cursor: Option<&str>, + ) -> Result<(), StoreError> { *self.block_ptr.lock().unwrap() = Some(block_ptr_to.clone()); - // FIXME: What about the firehose cursor? Why doesn't that get updated? - // TODO: If we haven't written the block yet, revert in memory. If // we have, revert in the database - self.store.revert_block_operations(block_ptr_to) + self.store + .revert_block_operations(block_ptr_to, firehose_cursor) } fn unfail_deterministic_error( diff --git a/store/postgres/tests/graft.rs b/store/postgres/tests/graft.rs index 1b14c5fc670..8a83ddbc383 100644 --- a/store/postgres/tests/graft.rs +++ b/store/postgres/tests/graft.rs @@ -301,13 +301,13 @@ async fn check_graft( .cheap_clone() .writable(LOGGER.clone(), deployment.id) .await? - .revert_block_operations(BLOCKS[1].clone()) + .revert_block_operations(BLOCKS[1].clone(), None) .expect("We can revert a block we just created"); let err = store .writable(LOGGER.clone(), deployment.id) .await? - .revert_block_operations(BLOCKS[0].clone()) + .revert_block_operations(BLOCKS[0].clone(), None) .expect_err("Reverting past graft point is not allowed"); assert!(err.to_string().contains("Can not revert subgraph")); diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index eae019d3dca..a70a0e843a2 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -278,7 +278,7 @@ pub async fn revert_block(store: &Arc, deployment: &DeploymentLocator, pt .writable(LOGGER.clone(), deployment.id) .await .expect("can get writable") - .revert_block_operations(ptr.clone()) + .revert_block_operations(ptr.clone(), None) .unwrap(); }