diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 62115a00457..35a31dc4343 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -13,7 +13,7 @@ use graph::prelude::tokio::try_join; use graph::{ blockchain::{block_stream::BlockWithTriggers, BlockPtr, IngestorError}, prelude::{ - anyhow::{self, anyhow, bail, ensure}, + anyhow::{self, anyhow, bail, ensure, Context}, async_trait, debug, error, ethabi, futures03::{self, compat::Future01CompatExt, FutureExt, StreamExt, TryStreamExt}, hex, info, retry, serde_json as json, stream, tiny_keccak, trace, warn, @@ -1347,24 +1347,28 @@ pub(crate) async fn blocks_with_triggers( trigger_futs.push(block_future) } - // join on triger futures - let triggers: Vec = trigger_futs.try_concat().await?; - - // get hash for "to" block - let to_hash = match adapter + // Get hash for "to" block + let to_hash_fut = adapter .block_hash_by_block_number(&logger, to) - .compat() - .await? - { - Some(hash) => hash, - None => { - warn!(logger, - "Ethereum endpoint is behind"; - "url" => eth.url_hostname() - ); - bail!("Block {} not found in the chain", to) - } - }; + .and_then(|hash| match hash { + Some(hash) => Ok(hash), + None => { + warn!(logger, + "Ethereum endpoint is behind"; + "url" => eth.url_hostname() + ); + bail!("Block {} not found in the chain", to) + } + }) + .compat(); + + // Join on triggers and block hash resolution + let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut); + + // Unpack and handle possible errors in the previously joined futures + let triggers = + triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?; + let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?; let mut block_hashes: HashSet = triggers.iter().map(EthereumTrigger::block_hash).collect();