diff --git a/.gitignore b/.gitignore index 5a93a5ca407..1329da723eb 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ /tests/integration-tests/**/generated /tests/integration-tests/**/node_modules /tests/integration-tests/**/yarn.lock +/tests/integration-tests/**/yarn-error.log # Built solidity contracts. /tests/integration-tests/**/bin diff --git a/chain/ethereum/build.rs b/chain/ethereum/build.rs index 7f828222e0d..9bde857d835 100644 --- a/chain/ethereum/build.rs +++ b/chain/ethereum/build.rs @@ -4,5 +4,5 @@ fn main() { .out_dir("src/protobuf") .format(true) .compile(&["proto/codec.proto"], &["proto"]) - .expect("Failed to compile StreamingFast Ethereum proto(s)"); + .expect("Failed to compile Firehose Ethereum proto(s)"); } diff --git a/chain/ethereum/examples/firehose.rs b/chain/ethereum/examples/firehose.rs index c8c86d9ef1d..54601c25649 100644 --- a/chain/ethereum/examples/firehose.rs +++ b/chain/ethereum/examples/firehose.rs @@ -1,10 +1,12 @@ use anyhow::Error; use graph::{ + env::env_var, log::logger, prelude::{prost, tokio, tonic}, {firehose, firehose::FirehoseEndpoint, firehose::ForkStep}, }; use graph_chain_ethereum::codec; +use hex::ToHex; use prost::Message; use std::sync::Arc; use tonic::Streaming; @@ -12,18 +14,30 @@ use tonic::Streaming; #[tokio::main] async fn main() -> Result<(), Error> { let mut cursor: Option = None; + let token_env = env_var("SF_API_TOKEN", "".to_string()); + let mut token: Option = None; + if token_env.len() > 0 { + token = Some(token_env); + } let logger = logger(true); let firehose = Arc::new( - FirehoseEndpoint::new(logger, "firehose", "https://bsc.streamingfast.io:443", None).await?, + FirehoseEndpoint::new( + logger, + "firehose", + "https://api.streamingfast.io:443", + token, + ) + .await?, ); loop { - println!("connecting to the stream!"); + println!("Connecting to the stream!"); let mut stream: Streaming = match firehose .clone() .stream_blocks(firehose::Request { - start_block_num: 7000000, + start_block_num: 12369739, + stop_block_num: 12369739, start_cursor: match &cursor { Some(c) => c.clone(), None => String::from(""), @@ -35,7 +49,7 @@ async fn main() -> Result<(), Error> { { Ok(s) => s, Err(e) => { - println!("could not connect to stream! {}", e); + println!("Could not connect to stream! {}", e); continue; } }; @@ -44,11 +58,11 @@ async fn main() -> Result<(), Error> { let resp = match stream.message().await { Ok(Some(t)) => t, Ok(None) => { - println!("stream completed"); - break; + println!("Stream completed"); + return Ok(()); } Err(e) => { - println!("error getting message {}", e); + println!("Error getting message {}", e); break; } }; @@ -62,6 +76,26 @@ async fn main() -> Result<(), Error> { hex::encode(b.hash), resp.step ); + b.transaction_traces.iter().for_each(|trx| { + let mut logs: Vec = vec![]; + trx.calls.iter().for_each(|call| { + call.logs.iter().for_each(|log| { + logs.push(format!( + "Log {} Topics, Address {}, Trx Index {}, Block Index {}", + log.topics.len(), + log.address.encode_hex::(), + log.index, + log.block_index + )); + }) + }); + + if logs.len() > 0 { + println!("Transaction {}", trx.hash.encode_hex::()); + logs.iter().for_each(|log| println!("{}", log)); + } + }); + cursor = Some(resp.cursor) } Err(e) => panic!("Unable to decode {:?}", e), diff --git a/chain/ethereum/proto/codec.proto b/chain/ethereum/proto/codec.proto index 33ad437a18b..55ab5e6f13a 100644 --- a/chain/ethereum/proto/codec.proto +++ b/chain/ethereum/proto/codec.proto @@ -40,6 +40,16 @@ message Block { string filtering_exclude_filter_expr = 42; } +// HeaderOnlyBlock is a standard [Block] structure where all other fields are +// removed so that hydrating that object from a [Block] bytes payload will +// drastically reduced allocated memory required to hold the full block. +// +// This can be used to unpack a [Block] when only the [BlockHeader] information +// is required and greatly reduced required memory. +message HeaderOnlyBlock { + BlockHeader header = 5; +} + // BlockWithRefs is a lightweight block, with traces and transactions // purged from the `block` within, and only. It is used in transports // to pass block data around. diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index db2bf78e08a..5d15b6144c7 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Error}; use graph::blockchain::BlockchainKind; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::env::env_var; -use graph::firehose::{FirehoseEndpoints, ForkStep}; +use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, ForkStep}; use graph::prelude::{ EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, StopwatchMetrics, }; @@ -187,6 +187,7 @@ impl Blockchain for Chain { deployment: DeploymentLocator, block_cursor: Option, start_blocks: Vec, + subgraph_current_block: Option, filter: Arc, metrics: Arc, unified_api_version: UnifiedMappingApiVersion, @@ -214,10 +215,13 @@ impl Blockchain for Chain { .subgraph_logger(&deployment) .new(o!("component" => "FirehoseBlockStream")); - let firehose_mapper = Arc::new(FirehoseMapper {}); + let firehose_mapper = Arc::new(FirehoseMapper { + endpoint: firehose_endpoint.cheap_clone(), + }); Ok(Box::new(FirehoseBlockStream::new( firehose_endpoint, + subgraph_current_block, block_cursor, firehose_mapper, adapter, @@ -231,7 +235,7 @@ impl Blockchain for Chain { &self, deployment: DeploymentLocator, start_blocks: Vec, - subgraph_start_block: Option, + subgraph_current_block: Option, filter: Arc, metrics: Arc, unified_api_version: UnifiedMappingApiVersion, @@ -282,7 +286,7 @@ impl Blockchain for Chain { *MAX_BLOCK_RANGE_SIZE, *TARGET_TRIGGERS_PER_BLOCK_RANGE, unified_api_version, - subgraph_start_block, + subgraph_current_block, ))) } @@ -519,7 +523,9 @@ impl TriggersAdapterTrait for TriggersAdapter { } } -pub struct FirehoseMapper {} +pub struct FirehoseMapper { + endpoint: Arc, +} #[async_trait] impl FirehoseMapperTrait for FirehoseMapper { @@ -585,4 +591,32 @@ impl FirehoseMapperTrait for FirehoseMapper { } } } + + async fn block_ptr_for_number( + &self, + logger: &Logger, + number: BlockNumber, + ) -> Result { + self.endpoint + .block_ptr_for_number::(logger, number) + .await + } + + async fn final_block_ptr_for( + &self, + logger: &Logger, + block: &BlockFinality, + ) -> Result { + // Firehose for Ethereum has an hard-coded confirmations for finality sets to 200 block + // behind the current block. The magic value 200 here comes from this hard-coded Firehose + // value. + let final_block_number = match block.number() { + x if x >= 200 => x - 200, + _ => 0, + }; + + self.endpoint + .block_ptr_for_number::(logger, final_block_number) + .await + } } diff --git a/chain/ethereum/src/codec.rs b/chain/ethereum/src/codec.rs index eb0d1438190..f547a5f249f 100644 --- a/chain/ethereum/src/codec.rs +++ b/chain/ethereum/src/codec.rs @@ -302,9 +302,21 @@ impl Into for &Block { } } -impl From for BlockPtr { - fn from(b: Block) -> BlockPtr { - (&b).into() +impl BlockHeader { + pub fn parent_ptr(&self) -> Option { + match self.parent_hash.len() { + 0 => None, + _ => Some(BlockPtr::from(( + H256::from_slice(self.parent_hash.as_ref()), + self.number - 1, + ))), + } + } +} + +impl<'a> From<&'a BlockHeader> for BlockPtr { + fn from(b: &'a BlockHeader) -> BlockPtr { + BlockPtr::from((H256::from_slice(b.hash.as_ref()), b.number)) } } @@ -314,9 +326,23 @@ impl<'a> From<&'a Block> for BlockPtr { } } +impl Block { + pub fn header(&self) -> &BlockHeader { + self.header.as_ref().unwrap() + } + + pub fn ptr(&self) -> BlockPtr { + BlockPtr::from(self.header()) + } + + pub fn parent_ptr(&self) -> Option { + self.header().parent_ptr() + } +} + impl BlockchainBlock for Block { fn number(&self) -> i32 { - BlockNumber::try_from(self.number).unwrap() + BlockNumber::try_from(self.header().number).unwrap() } fn ptr(&self) -> BlockPtr { @@ -324,14 +350,32 @@ impl BlockchainBlock for Block { } fn parent_ptr(&self) -> Option { - let parent_hash = &self.header.as_ref().unwrap().parent_hash; + self.parent_ptr() + } +} - match parent_hash.len() { - 0 => None, - _ => Some(BlockPtr::from(( - H256::from_slice(parent_hash.as_ref()), - self.number - 1, - ))), - } +impl HeaderOnlyBlock { + pub fn header(&self) -> &BlockHeader { + self.header.as_ref().unwrap() + } +} + +impl<'a> From<&'a HeaderOnlyBlock> for BlockPtr { + fn from(b: &'a HeaderOnlyBlock) -> BlockPtr { + BlockPtr::from(b.header()) + } +} + +impl BlockchainBlock for HeaderOnlyBlock { + fn number(&self) -> i32 { + BlockNumber::try_from(self.header().number).unwrap() + } + + fn ptr(&self) -> BlockPtr { + self.into() + } + + fn parent_ptr(&self) -> Option { + self.header().parent_ptr() } } diff --git a/chain/ethereum/src/ingestor.rs b/chain/ethereum/src/ingestor.rs index f2ba2c07273..54e0aae041f 100644 --- a/chain/ethereum/src/ingestor.rs +++ b/chain/ethereum/src/ingestor.rs @@ -118,8 +118,7 @@ impl BlockIngestor { None => { info!( self.logger, - "Downloading latest blocks from Ethereum. \ - This may take a few minutes..." + "Downloading latest blocks from Ethereum, this may take a few minutes..." ); } Some(head_block_ptr) => { @@ -135,7 +134,7 @@ impl BlockIngestor { if distance > 0 { info!( self.logger, - "Syncing {} blocks from Ethereum.", + "Syncing {} blocks from Ethereum", blocks_needed; "current_block_head" => head_number, "latest_block_head" => latest_number, diff --git a/chain/ethereum/src/protobuf/sf.ethereum.codec.v1.rs b/chain/ethereum/src/protobuf/sf.ethereum.codec.v1.rs index 11292e1f034..f8ab66c29f8 100644 --- a/chain/ethereum/src/protobuf/sf.ethereum.codec.v1.rs +++ b/chain/ethereum/src/protobuf/sf.ethereum.codec.v1.rs @@ -40,6 +40,17 @@ pub struct Block { #[prost(string, tag = "42")] pub filtering_exclude_filter_expr: ::prost::alloc::string::String, } +/// HeaderOnlyBlock is a standard [Block] structure where all other fields are +/// removed so that hydrating that object from a [Block] bytes payload will +/// drastically reduced allocated memory required to hold the full block. +/// +/// This can be used to unpack a [Block] when only the [BlockHeader] information +/// is required and greatly reduced required memory. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HeaderOnlyBlock { + #[prost(message, optional, tag = "5")] + pub header: ::core::option::Option, +} /// BlockWithRefs is a lightweight block, with traces and transactions /// purged from the `block` within, and only. It is used in transports /// to pass block data around. diff --git a/chain/near/build.rs b/chain/near/build.rs index ca842a3a35c..435a5a3faa2 100644 --- a/chain/near/build.rs +++ b/chain/near/build.rs @@ -4,5 +4,5 @@ fn main() { .out_dir("src/protobuf") .format(true) .compile(&["proto/codec.proto"], &["proto"]) - .expect("Failed to compile StreamingFast NEAR proto(s)"); + .expect("Failed to compile Firehose NEAR proto(s)"); } diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 0cf6d7f76d5..e01bf7e6e41 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -1,8 +1,8 @@ use graph::blockchain::BlockchainKind; use graph::cheap_clone::CheapClone; use graph::data::subgraph::UnifiedMappingApiVersion; -use graph::firehose::FirehoseEndpoints; -use graph::prelude::StopwatchMetrics; +use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints}; +use graph::prelude::{StopwatchMetrics, TryFutureExt}; use graph::{ anyhow, blockchain::{ @@ -102,6 +102,7 @@ impl Blockchain for Chain { deployment: DeploymentLocator, block_cursor: Option, start_blocks: Vec, + subgraph_current_block: Option, filter: Arc, metrics: Arc, unified_api_version: UnifiedMappingApiVersion, @@ -117,7 +118,7 @@ impl Blockchain for Chain { let firehose_endpoint = match self.firehose_endpoints.random() { Some(e) => e.clone(), - None => return Err(anyhow::format_err!("no firehose endpoint available",)), + None => return Err(anyhow::format_err!("no firehose endpoint available")), }; let logger = self @@ -125,10 +126,13 @@ impl Blockchain for Chain { .subgraph_logger(&deployment) .new(o!("component" => "FirehoseBlockStream")); - let firehose_mapper = Arc::new(FirehoseMapper {}); + let firehose_mapper = Arc::new(FirehoseMapper { + endpoint: firehose_endpoint.cheap_clone(), + }); Ok(Box::new(FirehoseBlockStream::new( firehose_endpoint, + subgraph_current_block, block_cursor, firehose_mapper, adapter, @@ -142,7 +146,7 @@ impl Blockchain for Chain { &self, _deployment: DeploymentLocator, _start_blocks: Vec, - _subgraph_start_block: Option, + _subgraph_current_block: Option, _filter: Arc, _metrics: Arc, _unified_api_version: UnifiedMappingApiVersion, @@ -156,14 +160,18 @@ impl Blockchain for Chain { async fn block_pointer_from_number( &self, - _logger: &Logger, - _number: BlockNumber, + logger: &Logger, + number: BlockNumber, ) -> Result { - // FIXME (NEAR): Hmmm, what to do with this? - Ok(BlockPtr { - hash: BlockHash::from(vec![0xff; 32]), - number: 0, - }) + let firehose_endpoint = match self.firehose_endpoints.random() { + Some(e) => e.clone(), + None => return Err(anyhow::format_err!("no firehose endpoint available").into()), + }; + + firehose_endpoint + .block_ptr_for_number::(logger, number) + .map_err(Into::into) + .await } fn runtime_adapter(&self) -> Arc { @@ -261,7 +269,9 @@ impl TriggersAdapterTrait for TriggersAdapter { } } -pub struct FirehoseMapper {} +pub struct FirehoseMapper { + endpoint: Arc, +} #[async_trait] impl FirehoseMapperTrait for FirehoseMapper { @@ -322,4 +332,26 @@ impl FirehoseMapperTrait for FirehoseMapper { } } } + + async fn block_ptr_for_number( + &self, + logger: &Logger, + number: BlockNumber, + ) -> Result { + self.endpoint + .block_ptr_for_number::(logger, number) + .await + } + + async fn final_block_ptr_for( + &self, + logger: &Logger, + block: &codec::Block, + ) -> Result { + let final_block_number = block.header().last_final_block_height as BlockNumber; + + self.endpoint + .block_ptr_for_number::(logger, final_block_number) + .await + } } diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 3147e29d227..5cbfb16ff58 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -9,6 +9,7 @@ use graph::blockchain::block_stream::{ BlockStream, BlockStreamEvent, BlockStreamMetrics, BlockWithTriggers, BufferedBlockStream, }; use graph::blockchain::{Block, Blockchain, DataSource, TriggerFilter as _, TriggersAdapter}; +use graph::components::store::WritableStore; use graph::components::{ store::{ModificationsAndCache, SubgraphFork}, subgraph::{CausalityRegion, MappingError, ProofOfIndexing, SharedProofOfIndexing}, @@ -25,6 +26,7 @@ use std::convert::TryFrom; use std::sync::Arc; use std::time::{Duration, Instant}; +const SECOND: Duration = Duration::from_secs(1); const MINUTE: Duration = Duration::from_secs(60); const SKIP_PTR_UPDATES_THRESHOLD: Duration = Duration::from_secs(60 * 5); @@ -59,27 +61,26 @@ async fn new_block_stream( false => BUFFERED_BLOCK_STREAM_SIZE, }; + let current_ptr = inputs.store.block_ptr(); + let block_stream = match is_firehose { true => chain.new_firehose_block_stream( inputs.deployment.clone(), inputs.store.block_cursor(), inputs.start_blocks.clone(), + current_ptr, + Arc::new(filter.clone()), + block_stream_metrics.clone(), + inputs.unified_api_version.clone(), + ), + false => chain.new_polling_block_stream( + inputs.deployment.clone(), + inputs.start_blocks.clone(), + current_ptr, Arc::new(filter.clone()), block_stream_metrics.clone(), inputs.unified_api_version.clone(), ), - false => { - let current_ptr = inputs.store.block_ptr(); - - chain.new_polling_block_stream( - inputs.deployment.clone(), - inputs.start_blocks.clone(), - current_ptr, - Arc::new(filter.clone()), - block_stream_metrics.clone(), - inputs.unified_api_version.clone(), - ) - } } .await?; @@ -121,6 +122,13 @@ where // increasing its timeout exponentially until it reaches the ceiling. let mut backoff = ExponentialBackoff::new(MINUTE * 2, *SUBGRAPH_ERROR_RETRY_CEIL_SECS); + // This ensures that any existing Firehose cursor is deleted prior starting using a + // non-Firehose block stream so that if we ever resume again the Firehose block stream, + // we will not start from a stalled cursor. + if !self.inputs.chain.is_firehose_supported() { + delete_subgraph_firehose_cursor(&logger, self.inputs.store.as_ref()).await; + } + loop { debug!(logger, "Starting or restarting subgraph"); @@ -158,26 +166,15 @@ where let (block, cursor) = match event { Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => (block, cursor), - Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, parent_ptr, cursor))) => { - info!( - logger, - "Reverting block to get back to main chain"; - "block_number" => format!("{}", subgraph_ptr.number), - "block_hash" => format!("{}", subgraph_ptr.hash) - ); + Some(Ok(BlockStreamEvent::Revert(subgraph_ptr, revert_to_ptr, cursor))) => { + info!(&logger, "Reverting block to get back to main chain"; "current" => &subgraph_ptr, "revert_to" => &revert_to_ptr); if let Err(e) = self .inputs .store - .revert_block_operations(parent_ptr, cursor.as_deref()) + .revert_block_operations(revert_to_ptr, cursor.as_deref()) { - error!( - &logger, - "Could not revert block. Retrying"; - "block_number" => format!("{}", subgraph_ptr.number), - "block_hash" => format!("{}", subgraph_ptr.hash), - "error" => e.to_string(), - ); + error!(&logger, "Could not revert block. Retrying"; "error" => %e); // Exit inner block stream consumption loop and go up to loop that restarts subgraph break; @@ -905,6 +902,24 @@ async fn update_proof_of_indexing( Ok(()) } +async fn delete_subgraph_firehose_cursor(logger: &Logger, store: &dyn WritableStore) { + debug!(logger, "Deleting any existing Firehose cursor"); + let mut backoff = ExponentialBackoff::new(30 * SECOND, *SUBGRAPH_ERROR_RETRY_CEIL_SECS); + + loop { + match store.delete_block_cursor() { + Ok(_) => return, + Err(_) => { + error!( + logger, + "Unable to delete firehose cursor, waiting and retrying again" + ); + backoff.sleep_async().await; + } + } + } +} + /// Checks if the Deployment BlockPtr is at least one block behind to the chain head. fn is_deployment_synced(deployment_head_ptr: &BlockPtr, chain_head_ptr: Option) -> bool { matches!((deployment_head_ptr, &chain_head_ptr), (b1, Some(b2)) if b1.number >= (b2.number - 1)) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index f027388147f..99c524d6469 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -111,6 +111,10 @@ impl BlockWithTriggers { pub fn ptr(&self) -> BlockPtr { self.block.ptr() } + + pub fn parent_ptr(&self) -> Option { + self.block.parent_ptr() + } } #[async_trait] @@ -160,6 +164,34 @@ pub trait FirehoseMapper: Send + Sync { adapter: &C::TriggersAdapter, filter: &C::TriggerFilter, ) -> Result, FirehoseError>; + + /// Returns the [BlockPtr] value for this given block number. This is the block pointer + /// of the longuest according to Firehose view of the blockchain state. + /// + /// This is a thin wrapper around [FirehoseEndpoint#block_ptr_for_number] to make + /// it chain agnostic and callable from chain agnostic [FirehoseBlockStream]. + async fn block_ptr_for_number( + &self, + logger: &Logger, + number: BlockNumber, + ) -> Result; + + /// Returns the closest final block ptr to the block ptr received. + /// On probablitics chain like Ethereum, final is determined by + /// the confirmations threshold configured for the Firehose stack (currently + /// hard-coded to 200). + /// + /// On some other chain like NEAR, the actual final block number is determined + /// from the block itself since it contains information about which block number + /// is final against the current block. + /// + /// To take an example, assuming we are on Ethereum, the final block pointer + /// for block #10212 would be the determined final block #10012 (10212 - 200 = 10012). + async fn final_block_ptr_for( + &self, + logger: &Logger, + block: &C::Block, + ) -> Result; } #[derive(Error, Debug)] diff --git a/graph/src/blockchain/firehose_block_stream.rs b/graph/src/blockchain/firehose_block_stream.rs index 446bee8464b..04fd3f379f2 100644 --- a/graph/src/blockchain/firehose_block_stream.rs +++ b/graph/src/blockchain/firehose_block_stream.rs @@ -3,6 +3,7 @@ use futures03::{Stream, StreamExt}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; +use tonic::Status; use crate::prelude::*; use crate::util::backoff::ExponentialBackoff; @@ -21,6 +22,7 @@ where { pub fn new( endpoint: Arc, + subgraph_current_block: Option, cursor: Option, mapper: Arc, adapter: Arc, @@ -31,7 +33,7 @@ where where F: FirehoseMapper + 'static, { - let start_block_num: BlockNumber = start_blocks + let manifest_start_block_num = start_blocks .into_iter() .min() // Firehose knows where to start the stream for the specific chain, 0 here means @@ -45,7 +47,8 @@ where mapper, adapter, filter, - start_block_num, + manifest_start_block_num, + subgraph_current_block, logger, )), } @@ -58,15 +61,49 @@ fn stream_blocks>( mapper: Arc, adapter: Arc, filter: Arc, - start_block_num: BlockNumber, + manifest_start_block_num: BlockNumber, + subgraph_current_block: Option, logger: Logger, ) -> impl Stream, Error>> { use firehose::ForkStep::*; - try_stream! { - let mut latest_cursor = cursor.unwrap_or_else(|| "".to_string()); - let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45)); + let mut latest_cursor = cursor.unwrap_or_else(|| "".to_string()); + let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45)); + let mut subgraph_current_block = subgraph_current_block; + let mut start_block_num = subgraph_current_block + .as_ref() + .map(|ptr| { + // Firehose start block is inclusive while the subgraph_current_block is where the actual + // subgraph is currently at. So to process the actual next block, we must start one block + // further in the chain. + ptr.block_number() + 1 as BlockNumber + }) + .unwrap_or(manifest_start_block_num); + + // Seems the `try_stream!` macro interfer and don't see we are actually reading/writing this + #[allow(unused_assignments)] + let mut skip_backoff = false; + // Sanity check when starting from a subgraph block ptr directly. When + // this happens, we must ensure that Firehose first picked block directly follows the + // subgraph block ptr. So we check that Firehose first picked block's parent is + // equal to subgraph block ptr. + // + // This can happen for example when rewinding, unfailing a deterministic error or + // when switching from RPC to Firehose on Ethereum. + // + // What could go wrong is that the subgraph block ptr points to a forked block but + // since Firehose only accepts `block_number`, it could pick right away the canonical + // block of the longuest chain creating inconsistencies in the data (because it would + // not revert the forked the block). + // + // We should perform that only if subgraph actually started from a subgraph block ptr + // and no Firehose cursor was present. If a Firehose cursor is present, it's used to + // resume and as such, there is no need to perform this check (at the same time, it's + // not a bad check to make). + let mut check_subgraph_continuity = latest_cursor == "" && subgraph_current_block.is_some(); + + try_stream! { loop { info!( &logger, @@ -75,6 +112,7 @@ fn stream_blocks>( "start_block" => start_block_num, "cursor" => &latest_cursor, ); + skip_backoff = false; let result = endpoint .clone() @@ -90,42 +128,141 @@ fn stream_blocks>( info!(&logger, "Blockstream connected"); backoff.reset(); + let mut expected_stream_end = false; + for await response in stream { - match response { - Ok(v) => { - match mapper.to_block_stream_event(&logger, &v, &adapter, &filter).await { - Ok(event) => { - yield event; - - latest_cursor = v.cursor; - }, - Err(e) => { - error!( - logger, - "Mapping block to BlockStreamEvent failed: {:?}", e - ); - break; - } - } + match process_firehose_response( + response, + &mut check_subgraph_continuity, + manifest_start_block_num, + subgraph_current_block.as_ref(), + mapper.as_ref(), + &adapter, + &filter, + &logger, + ).await { + Ok(BlockResponse::Proceed(event, cursor)) => { + yield event; + + latest_cursor = cursor; + }, + Ok(BlockResponse::Rewind(revert_to)) => { + let subgraph_block = subgraph_current_block + .as_ref() + .cloned() + .expect("Rewinding means there is an inconsistency when starting from subgraph block ptr, so it must be defined"); + + // It's totally correct to pass the None as the cursor here, if we are here, there + // was no cursor before anyway, so it's totally fine to pass `None` + yield BlockStreamEvent::Revert(subgraph_block, revert_to.clone(), None); + + latest_cursor = "".to_string(); + skip_backoff = true; + + // We must restart the stream to ensure we now send block from revert_to point + // and we add + 1 to start block num because Firehose is inclusive and as such, + // we need to move to "next" block. + start_block_num = revert_to.number + 1; + subgraph_current_block = Some(revert_to); + expected_stream_end = true; + break; }, - Err(e) => { - info!(logger, "An error occurred while streaming blocks: {:?}", e); + Err(err) => { + error!(logger, "{:#}", err); + expected_stream_end = true; break; } } } - error!(logger, "Stream blocks complete unexpectedly, expecting stream to always stream blocks"); + if !expected_stream_end { + error!(logger, "Stream blocks complete unexpectedly, expecting stream to always stream blocks"); + } }, Err(e) => { error!(logger, "Unable to connect to endpoint: {:?}", e); } } - // If we reach this point, we must wait a bit before retrying - backoff.sleep_async().await; + // If we reach this point, we must wait a bit before retrying, unless `skip_backoff` is true + if !skip_backoff { + backoff.sleep_async().await; + } + } + } +} + +enum BlockResponse { + Proceed(BlockStreamEvent, String), + Rewind(BlockPtr), +} + +async fn process_firehose_response>( + result: Result, + check_subgraph_continuity: &mut bool, + manifest_start_block_num: BlockNumber, + subgraph_current_block: Option<&BlockPtr>, + mapper: &F, + adapter: &C::TriggersAdapter, + filter: &C::TriggerFilter, + logger: &Logger, +) -> Result, Error> { + let response = match result { + Ok(v) => v, + Err(e) => return Err(anyhow!("An error occurred while streaming blocks: {:?}", e)), + }; + + let event = mapper + .to_block_stream_event(logger, &response, adapter, filter) + .await + .context("Mapping block to BlockStreamEvent failed")?; + + if *check_subgraph_continuity { + info!(logger, "Firehose started from a subgraph pointer without an existing cursor, ensuring chain continuity"); + + if let BlockStreamEvent::ProcessBlock(ref block, _) = event { + let previous_block_ptr = block.parent_ptr(); + if previous_block_ptr.is_some() && previous_block_ptr.as_ref() != subgraph_current_block + { + warn!(&logger, + "Firehose selected first streamed block's parent should match subgraph start block, reverting to last know final chain segment"; + "subgraph_current_block" => &subgraph_current_block.unwrap(), + "firehose_start_block" => &previous_block_ptr.unwrap(), + ); + + let mut revert_to = mapper + .final_block_ptr_for(logger, &block.block) + .await + .context("Could not fetch final block to revert to")?; + + if revert_to.number < manifest_start_block_num { + warn!(&logger, "We would return before subgraph manifest's start block, limiting rewind to manifest's start block"); + + // We must revert up to parent's of manifest start block to ensure we delete everything "including" the start + // block that was processed. + let mut block_num = manifest_start_block_num - 1; + if block_num < 0 { + block_num = 0; + } + + revert_to = mapper + .block_ptr_for_number(logger, block_num) + .await + .context("Could not fetch manifest start block to revert to")?; + } + + return Ok(BlockResponse::Rewind(revert_to)); + } } + + info!( + logger, + "Subgraph chain continuity is respected, proceeding normally" + ); + *check_subgraph_continuity = false; } + + Ok(BlockResponse::Proceed(event, response.cursor)) } impl Stream for FirehoseBlockStream { diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 7141c1f66d3..eb4daa7cdda 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -300,6 +300,7 @@ impl Blockchain for MockBlockchain { _deployment: crate::components::store::DeploymentLocator, _block_cursor: Option, _start_blocks: Vec, + _subgraph_current_block: Option, _filter: std::sync::Arc, _metrics: std::sync::Arc, _unified_api_version: crate::data::subgraph::UnifiedMappingApiVersion, @@ -311,7 +312,7 @@ impl Blockchain for MockBlockchain { &self, _deployment: crate::components::store::DeploymentLocator, _start_blocks: Vec, - _subgraph_start_block: Option, + _subgraph_current_block: Option, _filter: std::sync::Arc, _metrics: std::sync::Arc, _unified_api_version: crate::data::subgraph::UnifiedMappingApiVersion, diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 6ed188f9175..cf2cfbc37ad 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -113,6 +113,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static { deployment: DeploymentLocator, block_cursor: Option, start_blocks: Vec, + subgraph_current_block: Option, filter: Arc, metrics: Arc, unified_api_version: UnifiedMappingApiVersion, @@ -122,7 +123,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static { &self, deployment: DeploymentLocator, start_blocks: Vec, - subgraph_start_block: Option, + subgraph_current_block: Option, filter: Arc, metrics: Arc, unified_api_version: UnifiedMappingApiVersion, diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index ffa168f0de6..ea16d0323b5 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -118,6 +118,17 @@ impl fmt::Debug for BlockPtr { } } +impl slog::Value for BlockPtr { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::Value::serialize(&self.to_string(), record, key, serializer) + } +} + impl From> for BlockPtr { fn from(b: Block) -> BlockPtr { BlockPtr::from((b.hash.unwrap(), b.number.unwrap().as_u64())) diff --git a/graph/src/components/store.rs b/graph/src/components/store.rs index b52e83568a2..bad0e590991 100644 --- a/graph/src/components/store.rs +++ b/graph/src/components/store.rs @@ -1046,6 +1046,9 @@ pub trait WritableStore: Send + Sync + 'static { /// is used when re-connecting a Firehose stream to start back exactly where we left off. fn block_cursor(&self) -> Option; + /// Deletes the current Firehose `cursor` this deployment is currently at. + fn delete_block_cursor(&self) -> Result<(), StoreError>; + /// Start an existing subgraph deployment. fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError>; diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 4aadce56a6b..22718b4824b 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -2,6 +2,7 @@ use crate::{ blockchain::Block as BlockchainBlock, blockchain::BlockPtr, cheap_clone::CheapClone, + components::store::BlockNumber, firehose::{decode_firehose_block, ForkStep}, prelude::{debug, info}, }; @@ -73,6 +74,23 @@ impl FirehoseEndpoint { } pub async fn genesis_block_ptr(&self, logger: &Logger) -> Result + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + info!(logger, "Requesting genesis block from firehose"); + + // We use 0 here to mean the genesis block of the chain. Firehose + // when seeing start block number 0 will always return the genesis + // block of the chain, even if the chain's start block number is + // not starting at block #0. + self.block_ptr_for_number::(logger, 0).await + } + + pub async fn block_ptr_for_number( + &self, + logger: &Logger, + number: BlockNumber, + ) -> Result where M: prost::Message + BlockchainBlock + Default + 'static, { @@ -83,34 +101,57 @@ impl FirehoseEndpoint { let mut client = firehose::stream_client::StreamClient::with_interceptor( self.channel.cheap_clone(), - move |mut r: Request<()>| match token_metadata.as_ref() { - Some(t) => { + move |mut r: Request<()>| { + if let Some(ref t) = token_metadata { r.metadata_mut().insert("authorization", t.clone()); - Ok(r) } - _ => Ok(r), + + Ok(r) }, ); - debug!(logger, "Connecting to firehose to retrieve genesis block"); + debug!( + logger, + "Connecting to firehose to retrieve block for number {}", number + ); + + // The trick is the following. + // + // Firehose `start_block_num` and `stop_block_num` are both inclusive, so we specify + // the block we are looking for in both. + // + // Now, the remaining question is how the block from the canonical chain is picked. We + // leverage the fact that Firehose will always send the block in the longuest chain as the + // last message of this request. + // + // That way, we either get the final block if the block is now in a final segment of the + // chain (or probabilisticly if not finality concept exists for the chain). Or we get the + // block that is in the longuest chain according to Firehose. let response_stream = client .blocks(firehose::Request { - start_block_num: 0, - fork_steps: vec![ForkStep::StepIrreversible as i32], + start_block_num: number as i64, + stop_block_num: number as u64, + fork_steps: vec![ForkStep::StepNew as i32, ForkStep::StepIrreversible as i32], ..Default::default() }) .await?; let mut block_stream = response_stream.into_inner(); - info!(logger, "Requesting genesis block from firehose"); - let next = block_stream.next().await; + debug!(logger, "Retrieving block(s) from firehose"); - match next { - Some(Ok(v)) => Ok(decode_firehose_block::(&v)?.ptr()), - Some(Err(e)) => Err(anyhow::format_err!("firehose error {}", e)), + let mut block_in_longuest_chain: Option = None; + while let Some(message) = block_stream.next().await { + match message { + Ok(v) => block_in_longuest_chain = Some(decode_firehose_block::(&v)?.ptr()), + Err(e) => return Err(anyhow::format_err!("firehose error {}", e)), + }; + } + + match block_in_longuest_chain { + Some(block_ptr) => Ok(block_ptr), None => Err(anyhow::format_err!( - "firehose should have returned one block for genesis block request" + "Firehose should have returned at least one block for request" )), } } @@ -126,12 +167,12 @@ impl FirehoseEndpoint { let mut client = firehose::stream_client::StreamClient::with_interceptor( self.channel.cheap_clone(), - move |mut r: Request<()>| match token_metadata.as_ref() { - Some(t) => { + move |mut r: Request<()>| { + if let Some(ref t) = token_metadata { r.metadata_mut().insert("authorization", t.clone()); - Ok(r) } - _ => Ok(r), + + Ok(r) }, ); diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index 7bb8b91829d..d1f0b769a49 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -54,6 +54,10 @@ impl WritableStore for MockStore { unimplemented!() } + fn delete_block_cursor(&self) -> Result<(), StoreError> { + unimplemented!() + } + fn start_subgraph_deployment(&self, _: &Logger) -> Result<(), StoreError> { unimplemented!() } diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 92b14d9179c..874b6e36d2b 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -300,6 +300,19 @@ pub fn forward_block_ptr( } } +pub fn delete_subgraph_firehose_cursor( + conn: &PgConnection, + id: &DeploymentHash, +) -> Result<(), StoreError> { + use subgraph_deployment as d; + + update(d::table.filter(d::deployment.eq(id.as_str()))) + .set(d::firehose_cursor.eq::>(None)) + .execute(conn) + .map(|_| ()) + .map_err(|e| e.into()) +} + pub fn get_subgraph_firehose_cursor( conn: &PgConnection, deployment_hash: &DeploymentHash, @@ -322,7 +335,7 @@ pub fn update_firehose_cursor( use subgraph_deployment as d; update(d::table.filter(d::deployment.eq(id.as_str()))) - .set((d::firehose_cursor.eq(cursor),)) + .set(d::firehose_cursor.eq(cursor)) .execute(conn) .map(|_| ()) .map_err(|e| e.into()) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index a7dacd128b4..2d1efbf8b4f 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -797,6 +797,15 @@ impl DeploymentStore { )?) } + pub(crate) fn delete_block_cursor(&self, site: &Site) -> Result<(), StoreError> { + let conn = self.get_conn()?; + + Ok(deployment::delete_subgraph_firehose_cursor( + &conn, + &site.deployment, + )?) + } + pub(crate) async fn supports_proof_of_indexing<'a>( &self, site: Arc, @@ -1125,9 +1134,9 @@ impl DeploymentStore { return Ok(None); } - // Sanity check on block numbers - if deployment_head.number != block_ptr_to.number + 1 { - panic!("revert_block_operations must revert a single block only"); + // Sanity check on revert to ensure we go backward only + if block_ptr_to.number >= deployment_head.number { + panic!("revert_block_operations must revert only backward, you are trying to revert forward going from subgraph block {} to new block {}", deployment_head, block_ptr_to); } self.rewind_with_conn(&conn, site, block_ptr_to, firehose_cursor) diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index bcc230edad2..13c4512b9cd 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -147,6 +147,10 @@ impl WritableStore { self.writable.block_cursor(self.site.as_ref()) } + fn delete_block_cursor(&self) -> Result<(), StoreError> { + self.writable.delete_block_cursor(self.site.as_ref()) + } + fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> { self.retry("start_subgraph_deployment", || { let store = &self.writable; @@ -366,6 +370,12 @@ impl WritableStoreTrait for WritableAgent { self.block_cursor.lock().unwrap().clone() } + fn delete_block_cursor(&self) -> Result<(), StoreError> { + self.store.delete_block_cursor()?; + *self.block_cursor.lock().unwrap() = None; + Ok(()) + } + fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> { // TODO: Spin up a background writer thread and establish a channel self.store.start_subgraph_deployment(logger)?;