Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,8 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {

Ok(BlockStreamEvent::Revert(
block.ptr(),
parent_ptr,
FirehoseCursor::Some(response.cursor.clone()),
Some(parent_ptr),
))
}

Expand Down
6 changes: 3 additions & 3 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,15 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
)),

StepUndo => {
let header = block.header();
let parent_ptr = header
let parent_ptr = block
.header()
.parent_ptr()
.expect("Genesis block should never be reverted");

Ok(BlockStreamEvent::Revert(
block.ptr(),
parent_ptr,
Some(response.cursor.clone()),
Some(parent_ptr),
))
}

Expand Down
64 changes: 16 additions & 48 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,61 +553,28 @@ where

let (block, cursor) = match event {
Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => (block, cursor),
Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, _, optional_parent_ptr))) => {
Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, parent_ptr, cursor))) => {
info!(
logger,
"Reverting block to get back to main chain";
"block_number" => format!("{}", subgraph_ptr.number),
"block_hash" => format!("{}", subgraph_ptr.hash)
);

// We would like to revert the DB state to the parent of the current block.
match optional_parent_ptr {
Some(parent_ptr) => {
if let Err(e) = inputs.store.revert_block_operations(parent_ptr) {
error!(
&logger,
"Could not revert block. Retrying";
"block_number" => format!("{}", subgraph_ptr.number),
"block_hash" => format!("{}", subgraph_ptr.hash),
"error" => e.to_string(),
);

// Exit inner block stream consumption loop and go up to loop that restarts subgraph
break;
}
}
None => {
// First, load the block in order to get the parent hash.
if let Err(e) = inputs
.triggers_adapter
.parent_ptr(&subgraph_ptr)
.await
.map(|parent_ptr| {
parent_ptr.expect("genesis block cannot be reverted")
})
.and_then(|parent_ptr| {
// Revert entity changes from this block, and update subgraph ptr.
inputs
.store
.revert_block_operations(parent_ptr)
.map_err(Into::into)
})
{
error!(
&logger,
"Could not revert block. \
The likely cause is the block not being found due to a deep reorg. \
Retrying";
"block_number" => format!("{}", subgraph_ptr.number),
"block_hash" => format!("{}", subgraph_ptr.hash),
"error" => e.to_string(),
);

// Exit inner block stream consumption loop and go up to loop that restarts subgraph
break;
}
}
if let Err(e) = inputs
.store
.revert_block_operations(parent_ptr, cursor.as_deref())
{
error!(
&logger,
"Could not revert block. Retrying";
"block_number" => format!("{}", subgraph_ptr.number),
"block_hash" => format!("{}", subgraph_ptr.hash),
"error" => e.to_string(),
);

// Exit inner block stream consumption loop and go up to loop that restarts subgraph
break;
}

ctx.block_stream_metrics
Expand All @@ -626,6 +593,7 @@ where
ctx.state.entity_lfu_cache = LfuCache::new();
continue;
}

// Log and drop the errors from the block_stream
// The block stream will continue attempting to produce blocks
Some(Err(e)) => {
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub enum BlockStreamEvent<C: Blockchain> {
// The payload is the current subgraph head pointer, which should be reverted, such that the
// parent of the current subgraph head becomes the new subgraph head.
// An optional pointer to the parent block will save a round trip operation when reverting.
Revert(BlockPtr, FirehoseCursor, Option<BlockPtr>),
Revert(BlockPtr, BlockPtr, FirehoseCursor),

ProcessBlock(BlockWithTriggers<C>, FirehoseCursor),
}
Expand Down
12 changes: 7 additions & 5 deletions graph/src/blockchain/polling_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ where
/// Blocks and range size
Blocks(VecDeque<BlockWithTriggers<C>>, BlockNumber),

// The payload is the current subgraph head pointer, which should be reverted and it's parent, such that the
// The payload is the current subgraph head pointer, which should be reverted and its parent, such that the
// parent of the current subgraph head becomes the new subgraph head.
Revert(BlockPtr, BlockPtr),
Done,
Expand Down Expand Up @@ -220,7 +220,9 @@ where

return Ok(NextBlocks::Done);
}
ReconciliationStep::Revert(from, to) => return Ok(NextBlocks::Revert(from, to)),
ReconciliationStep::Revert(from, parent_ptr) => {
return Ok(NextBlocks::Revert(from, parent_ptr))
}
}
}
}
Expand Down Expand Up @@ -565,14 +567,14 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
// Poll for chain head update
continue;
}
NextBlocks::Revert(from, to) => {
self.ctx.current_block = to.into();
NextBlocks::Revert(from, parent_ptr) => {
self.ctx.current_block = Some(parent_ptr.clone());

self.state = BlockStreamState::BeginReconciliation;
break Poll::Ready(Some(Ok(BlockStreamEvent::Revert(
from,
parent_ptr,
FirehoseCursor::None,
self.ctx.current_block.clone(),
))));
}
},
Expand Down
6 changes: 5 additions & 1 deletion graph/src/components/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,11 @@ pub trait WritableStore: Send + Sync + 'static {
/// subgraph block pointer to `block_ptr_to`.
///
/// `block_ptr_to` must point to the parent block of the subgraph block pointer.
fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError>;
fn revert_block_operations(
&self,
block_ptr_to: BlockPtr,
firehose_cursor: Option<&str>,
) -> Result<(), StoreError>;

/// If a deterministic error happened, this function reverts the block operations from the
/// current block to the previous block.
Expand Down
2 changes: 1 addition & 1 deletion graph/tests/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl WritableStore for MockStore {
unimplemented!()
}

fn revert_block_operations(&self, _: BlockPtr) -> Result<(), StoreError> {
fn revert_block_operations(&self, _: BlockPtr, _: Option<&str>) -> Result<(), StoreError> {
unimplemented!()
}

Expand Down
24 changes: 20 additions & 4 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use detail::DeploymentDetail;
use diesel::connection::SimpleConnection;
use diesel::pg::PgConnection;
Expand Down Expand Up @@ -905,7 +906,7 @@ impl DeploymentStore {

if let Some(cursor) = firehose_cursor {
if cursor != "" {
deployment::update_firehose_cursor(&conn, &site.deployment, &cursor)?;
deployment::update_firehose_cursor(&conn, &site.deployment, cursor)?;
}
}

Expand All @@ -920,6 +921,7 @@ impl DeploymentStore {
conn: &PgConnection,
site: Arc<Site>,
block_ptr_to: BlockPtr,
firehose_cursor: Option<&str>,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this still be an Option, given this is now using the empty string?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I used "" at some place to "clear" the cursor, because right now None means do nothing (which is the case when using Ethereum RPC).

So I'm of the opinion that we should keep it like this.

) -> Result<StoreEvent, StoreError> {
let event = conn.transaction(|| -> Result<_, StoreError> {
// Don't revert past a graft point
Expand All @@ -940,6 +942,11 @@ impl DeploymentStore {

deployment::revert_block_ptr(&conn, &site.deployment, block_ptr_to.clone())?;

if let Some(cursor) = firehose_cursor {
deployment::update_firehose_cursor(&conn, &site.deployment, cursor)
.context("updating firehose cursor")?;
}

// Revert the data
let layout = self.layout(&conn, site.clone())?;

Expand Down Expand Up @@ -991,13 +998,18 @@ impl DeploymentStore {
block_ptr_to.number
);
}
self.rewind_with_conn(&conn, site, block_ptr_to)

// When rewinding, we reset the firehose cursor to the empty string. That way, on resume,
// Firehose will start from the block_ptr instead (with sanity check to ensure it's resume
// at the exact block).
self.rewind_with_conn(&conn, site, block_ptr_to, Some(""))
}

pub(crate) fn revert_block_operations(
&self,
site: Arc<Site>,
block_ptr_to: BlockPtr,
firehose_cursor: Option<&str>,
) -> Result<StoreEvent, StoreError> {
let conn = self.get_conn()?;
// Unwrap: If we are reverting then the block ptr is not `None`.
Expand All @@ -1008,7 +1020,7 @@ impl DeploymentStore {
panic!("revert_block_operations must revert a single block only");
}

self.rewind_with_conn(&conn, site, block_ptr_to)
self.rewind_with_conn(&conn, site, block_ptr_to, firehose_cursor)
}

pub(crate) async fn deployment_state_from_id(
Expand Down Expand Up @@ -1227,7 +1239,11 @@ impl DeploymentStore {
);

// We ignore the StoreEvent that's being returned, we'll not use it.
let _ = self.revert_block_operations(site.clone(), parent_ptr.clone())?;
//
// We reset the firehose cursor to the empty string. That way, on resume,
// Firehose will start from the block_ptr instead (with sanity checks to ensure it's resuming
// at the correct block).
let _ = self.revert_block_operations(site.clone(), parent_ptr.clone(), Some(""))?;

// Unfail the deployment.
deployment::update_deployment_status(conn, deployment_id, prev_health, None)?;
Expand Down
25 changes: 17 additions & 8 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,17 @@ impl WritableStore {
})
}

fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
fn revert_block_operations(
&self,
block_ptr_to: BlockPtr,
firehose_cursor: Option<&str>,
) -> Result<(), StoreError> {
self.retry("revert_block_operations", || {
let event = self
.writable
.revert_block_operations(self.site.clone(), block_ptr_to.clone())?;
let event = self.writable.revert_block_operations(
self.site.clone(),
block_ptr_to.clone(),
firehose_cursor.clone(),
)?;
self.try_send_store_event(event)
})
}
Expand Down Expand Up @@ -363,13 +369,16 @@ impl WritableStoreTrait for WritableAgent {
self.store.start_subgraph_deployment(logger)
}

fn revert_block_operations(&self, block_ptr_to: BlockPtr) -> Result<(), StoreError> {
fn revert_block_operations(
&self,
block_ptr_to: BlockPtr,
firehose_cursor: Option<&str>,
) -> Result<(), StoreError> {
*self.block_ptr.lock().unwrap() = Some(block_ptr_to.clone());
// FIXME: What about the firehose cursor? Why doesn't that get updated?

// TODO: If we haven't written the block yet, revert in memory. If
// we have, revert in the database
self.store.revert_block_operations(block_ptr_to)
self.store
.revert_block_operations(block_ptr_to, firehose_cursor)
}

fn unfail_deterministic_error(
Expand Down
4 changes: 2 additions & 2 deletions store/postgres/tests/graft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,13 @@ async fn check_graft(
.cheap_clone()
.writable(LOGGER.clone(), deployment.id)
.await?
.revert_block_operations(BLOCKS[1].clone())
.revert_block_operations(BLOCKS[1].clone(), None)
.expect("We can revert a block we just created");

let err = store
.writable(LOGGER.clone(), deployment.id)
.await?
.revert_block_operations(BLOCKS[0].clone())
.revert_block_operations(BLOCKS[0].clone(), None)
.expect_err("Reverting past graft point is not allowed");

assert!(err.to_string().contains("Can not revert subgraph"));
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub async fn revert_block(store: &Arc<Store>, deployment: &DeploymentLocator, pt
.writable(LOGGER.clone(), deployment.id)
.await
.expect("can get writable")
.revert_block_operations(ptr.clone())
.revert_block_operations(ptr.clone(), None)
.unwrap();
}

Expand Down