diff --git a/NEWS.md b/NEWS.md index 52573b2584e..1294b6901aa 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,6 +5,11 @@ where whitespace characters were part of the terms. - Adds support for Solidity Custom Errors (issue #2577) +### Api Version 0.0.7 and Spec Version 0.0.5 +This release brings API Version 0.0.7 in mappings, which allows Ethereum event handlers to require transaction receipts to be present in the `Event` object. +Refer to [PR #3373](https://github.com/graphprotocol/graph-node/pull/3373) for instructions on how to enable that. + + ## 0.25.2 This release includes two changes: diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 0400f2eea29..dd190b4718d 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -205,10 +205,12 @@ pub(crate) struct EthereumLogFilter { /// Log filters can be represented as a bipartite graph between contracts and events. An edge /// exists between a contract and an event if a data source for the contract has a trigger for /// the event. - contracts_and_events_graph: GraphMap, + /// Edges are of `bool` type and indicates when a trigger requires a transaction receipt. + contracts_and_events_graph: GraphMap, - // Event sigs with no associated address, matching on all addresses. - wildcard_events: HashSet, + /// Event sigs with no associated address, matching on all addresses. + /// Maps to a boolean representing if a trigger requires a transaction receipt. + wildcard_events: HashMap, } impl Into> for EthereumLogFilter { @@ -248,28 +250,49 @@ impl EthereumLogFilter { let event = LogFilterNode::Event(*sig); self.contracts_and_events_graph .all_edges() - .any(|(s, t, ())| { - (s == contract && t == event) || (t == contract && s == event) - }) - || self.wildcard_events.contains(sig) + .any(|(s, t, _)| (s == contract && t == event) || (t == contract && s == event)) + || self.wildcard_events.contains_key(sig) } } } + /// Similar to [`matches`], checks if a transaction receipt is required for this log filter. + pub fn requires_transaction_receipt( + &self, + event_signature: &H256, + contract_address: Option<&Address>, + ) -> bool { + if let Some(true) = self.wildcard_events.get(event_signature) { + true + } else if let Some(address) = contract_address { + let contract = LogFilterNode::Contract(*address); + let event = LogFilterNode::Event(*event_signature); + self.contracts_and_events_graph + .all_edges() + .any(|(s, t, r)| { + *r && (s == contract && t == event) || (t == contract && s == event) + }) + } else { + false + } + } + pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { let mut this = EthereumLogFilter::default(); for ds in iter { - for event_sig in ds.mapping.event_handlers.iter().map(|e| e.topic0()) { + for event_handler in ds.mapping.event_handlers.iter() { + let event_sig = event_handler.topic0(); match ds.source.address { Some(contract) => { this.contracts_and_events_graph.add_edge( LogFilterNode::Contract(contract), LogFilterNode::Event(event_sig), - (), + event_handler.receipt, ); } None => { - this.wildcard_events.insert(event_sig); + this.wildcard_events + .insert(event_sig, event_handler.receipt); } } } @@ -277,13 +300,13 @@ impl EthereumLogFilter { this } - pub fn from_mapping(iter: &Mapping) -> Self { + pub fn from_mapping(mapping: &Mapping) -> Self { let mut this = EthereumLogFilter::default(); - - for sig in iter.event_handlers.iter().map(|e| e.topic0()) { - this.wildcard_events.insert(sig); + for event_handler in &mapping.event_handlers { + let signature = event_handler.topic0(); + this.wildcard_events + .insert(signature, event_handler.receipt); } - this } @@ -298,8 +321,8 @@ impl EthereumLogFilter { contracts_and_events_graph, wildcard_events, } = other; - for (s, t, ()) in contracts_and_events_graph.all_edges() { - self.contracts_and_events_graph.add_edge(s, t, ()); + for (s, t, e) in contracts_and_events_graph.all_edges() { + self.contracts_and_events_graph.add_edge(s, t, *e); } self.wildcard_events.extend(wildcard_events); } @@ -322,7 +345,7 @@ impl EthereumLogFilter { let mut filters = self .wildcard_events .into_iter() - .map(EthGetLogsFilter::from_event) + .map(|(event, _)| EthGetLogsFilter::from_event(event)) .collect_vec(); // The current algorithm is to repeatedly find the maximum cardinality vertex and turn all @@ -990,7 +1013,7 @@ mod tests { let mut filter = TriggerFilter { log: EthereumLogFilter { contracts_and_events_graph: GraphMap::new(), - wildcard_events: HashSet::new(), + wildcard_events: HashMap::new(), }, call: EthereumCallFilter { contract_addresses_function_signatures: HashMap::from_iter(vec![ @@ -1047,17 +1070,17 @@ mod tests { filter.log.contracts_and_events_graph.add_edge( LogFilterNode::Contract(address(10)), LogFilterNode::Event(sig(100)), - (), + false, ); filter.log.contracts_and_events_graph.add_edge( LogFilterNode::Contract(address(10)), LogFilterNode::Event(sig(101)), - (), + false, ); filter.log.contracts_and_events_graph.add_edge( LogFilterNode::Contract(address(20)), LogFilterNode::Event(sig(100)), - (), + false, ); let expected_log = MultiLogFilter { @@ -1293,7 +1316,7 @@ fn complete_log_filter() { contracts_and_events_graph.add_edge( LogFilterNode::Contract(contract), LogFilterNode::Event(event), - (), + false, ); } } @@ -1301,7 +1324,7 @@ fn complete_log_filter() { // Run `eth_get_logs_filters`, which is what we want to test. let logs_filters: Vec<_> = EthereumLogFilter { contracts_and_events_graph, - wildcard_events: HashSet::new(), + wildcard_events: HashMap::new(), } .eth_get_logs_filters() .collect(); @@ -1333,3 +1356,97 @@ fn complete_log_filter() { } } } + +#[test] +fn log_filter_require_transacion_receipt_method() { + // test data + let event_signature_a = H256::zero(); + let event_signature_b = H256::from_low_u64_be(1); + let event_signature_c = H256::from_low_u64_be(2); + let contract_a = Address::from_low_u64_be(3); + let contract_b = Address::from_low_u64_be(4); + let contract_c = Address::from_low_u64_be(5); + + let wildcard_event_with_receipt = H256::from_low_u64_be(6); + let wildcard_event_without_receipt = H256::from_low_u64_be(7); + let wildcard_events = [ + (wildcard_event_with_receipt, true), + (wildcard_event_without_receipt, false), + ] + .into_iter() + .collect(); + + let alien_event_signature = H256::from_low_u64_be(8); // those will not be inserted in the graph + let alien_contract_address = Address::from_low_u64_be(9); + + // test graph nodes + let event_a_node = LogFilterNode::Event(event_signature_a); + let event_b_node = LogFilterNode::Event(event_signature_b); + let event_c_node = LogFilterNode::Event(event_signature_c); + let contract_a_node = LogFilterNode::Contract(contract_a); + let contract_b_node = LogFilterNode::Contract(contract_b); + let contract_c_node = LogFilterNode::Contract(contract_c); + + // build test graph with the following layout: + // + // ```dot + // graph bipartite { + // + // // conected and require a receipt + // event_a -- contract_a [ receipt=true ] + // event_b -- contract_b [ receipt=true ] + // event_c -- contract_c [ receipt=true ] + // + // // connected but don't require a receipt + // event_a -- contract_b [ receipt=false ] + // event_b -- contract_a [ receipt=false ] + // } + // ``` + let mut contracts_and_events_graph = GraphMap::new(); + + let event_a_id = contracts_and_events_graph.add_node(event_a_node); + let event_b_id = contracts_and_events_graph.add_node(event_b_node); + let event_c_id = contracts_and_events_graph.add_node(event_c_node); + let contract_a_id = contracts_and_events_graph.add_node(contract_a_node); + let contract_b_id = contracts_and_events_graph.add_node(contract_b_node); + let contract_c_id = contracts_and_events_graph.add_node(contract_c_node); + contracts_and_events_graph.add_edge(event_a_id, contract_a_id, true); + contracts_and_events_graph.add_edge(event_b_id, contract_b_id, true); + contracts_and_events_graph.add_edge(event_a_id, contract_b_id, false); + contracts_and_events_graph.add_edge(event_b_id, contract_a_id, false); + contracts_and_events_graph.add_edge(event_c_id, contract_c_id, true); + + let filter = EthereumLogFilter { + contracts_and_events_graph, + wildcard_events, + }; + + // connected contracts and events graph + assert!(filter.requires_transaction_receipt(&event_signature_a, Some(&contract_a))); + assert!(filter.requires_transaction_receipt(&event_signature_b, Some(&contract_b))); + assert!(filter.requires_transaction_receipt(&event_signature_c, Some(&contract_c))); + assert!(!filter.requires_transaction_receipt(&event_signature_a, Some(&contract_b))); + assert!(!filter.requires_transaction_receipt(&event_signature_b, Some(&contract_a))); + + // Event C and Contract C are not connected to the other events and contracts + assert!(!filter.requires_transaction_receipt(&event_signature_a, Some(&contract_c))); + assert!(!filter.requires_transaction_receipt(&event_signature_b, Some(&contract_c))); + assert!(!filter.requires_transaction_receipt(&event_signature_c, Some(&contract_a))); + assert!(!filter.requires_transaction_receipt(&event_signature_c, Some(&contract_b))); + + // Wildcard events + assert!(filter.requires_transaction_receipt(&wildcard_event_with_receipt, None)); + assert!(!filter.requires_transaction_receipt(&wildcard_event_without_receipt, None)); + + // Alien events and contracts always return false + assert!( + !filter.requires_transaction_receipt(&alien_event_signature, Some(&alien_contract_address)) + ); + assert!(!filter.requires_transaction_receipt(&alien_event_signature, None)); + assert!(!filter.requires_transaction_receipt(&alien_event_signature, Some(&contract_a))); + assert!(!filter.requires_transaction_receipt(&alien_event_signature, Some(&contract_b))); + assert!(!filter.requires_transaction_receipt(&alien_event_signature, Some(&contract_c))); + assert!(!filter.requires_transaction_receipt(&event_signature_a, Some(&alien_contract_address))); + assert!(!filter.requires_transaction_receipt(&event_signature_b, Some(&alien_contract_address))); + assert!(!filter.requires_transaction_receipt(&event_signature_c, Some(&alien_contract_address))); +} diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index 2bb1b611736..325e1c2b866 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -198,6 +198,20 @@ impl blockchain::DataSource for DataSource { errors.push(anyhow!("data source has duplicated block handlers")); } + // Validate that event handlers don't require receipts for API versions lower than 0.0.7 + let api_version = self.api_version(); + if api_version < semver::Version::new(0, 0, 7) { + for event_handler in &self.mapping.event_handlers { + if event_handler.receipt { + errors.push(anyhow!( + "data source has event handlers that require transaction receipts, but this \ + is only supported for apiVersion >= 0.0.7" + )); + break; + } + } + } + errors } @@ -433,7 +447,7 @@ impl DataSource { let trigger_address = match trigger { EthereumTrigger::Block(_, EthereumBlockTriggerType::WithCallTo(address)) => address, EthereumTrigger::Call(call) => &call.to, - EthereumTrigger::Log(log) => &log.address, + EthereumTrigger::Log(log, _) => &log.address, // Unfiltered block triggers match any data source address. EthereumTrigger::Block(_, EthereumBlockTriggerType::Every) => return true, @@ -471,7 +485,7 @@ impl DataSource { handler.handler, ))) } - EthereumTrigger::Log(log) => { + EthereumTrigger::Log(log, receipt) => { let potential_handlers = self.handlers_for_log(log)?; // Map event handlers to (event handler, event ABI) pairs; fail if there are @@ -572,6 +586,7 @@ impl DataSource { transaction: Arc::new(transaction), log: log.cheap_clone(), params, + receipt: receipt.clone(), }, event_handler.handler, logging_extras, @@ -996,6 +1011,8 @@ pub struct MappingEventHandler { pub event: String, pub topic0: Option, pub handler: String, + #[serde(default)] + pub receipt: bool, } impl MappingEventHandler { diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 33e03377aff..62115a00457 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1,16 +1,19 @@ use futures::future; use futures::prelude::*; +use futures03::{future::BoxFuture, stream::FuturesUnordered}; use graph::blockchain::BlockHash; use graph::blockchain::ChainIdentifier; use graph::components::transaction_receipt::LightTransactionReceipt; use graph::data::subgraph::UnifiedMappingApiVersion; +use graph::data::subgraph::API_VERSION_0_0_5; +use graph::data::subgraph::API_VERSION_0_0_7; use graph::prelude::ethabi::ParamType; use graph::prelude::ethabi::Token; use graph::prelude::tokio::try_join; use graph::{ blockchain::{block_stream::BlockWithTriggers, BlockPtr, IngestorError}, prelude::{ - anyhow::{self, anyhow, bail}, + anyhow::{self, anyhow, bail, ensure}, async_trait, debug, error, ethabi, futures03::{self, compat::Future01CompatExt, FutureExt, StreamExt, TryStreamExt}, hex, info, retry, serde_json as json, stream, tiny_keccak, trace, warn, @@ -1284,87 +1287,84 @@ pub(crate) async fn blocks_with_triggers( let eth = adapter.clone(); let call_filter = EthereumCallFilter::from(&filter.block); - let mut trigger_futs: futures::stream::FuturesUnordered< - Box, Error = Error> + Send>, - > = futures::stream::FuturesUnordered::new(); + // Scan the block range to find relevant triggers + let trigger_futs: FuturesUnordered, anyhow::Error>>> = + FuturesUnordered::new(); - // Scan the block range from triggers to find relevant blocks + // Scan for Logs if !filter.log.is_empty() { - trigger_futs.push(Box::new( - eth.logs_in_block_range( - &logger, - subgraph_metrics.clone(), - from, - to, - filter.log.clone(), - ) - .map_ok(|logs: Vec| { - logs.into_iter() - .map(Arc::new) - .map(EthereumTrigger::Log) - .collect() - }) - .compat(), - )) + let logs_future = get_logs_and_transactions( + eth.clone(), + &logger, + subgraph_metrics.clone(), + from, + to, + filter.log.clone(), + &unified_api_version, + ) + .boxed(); + trigger_futs.push(logs_future) } - + // Scan for Calls if !filter.call.is_empty() { - trigger_futs.push(Box::new( - eth.calls_in_block_range(&logger, subgraph_metrics.clone(), from, to, &filter.call) - .map(Arc::new) - .map(EthereumTrigger::Call) - .collect(), - )); + let calls_future = eth + .calls_in_block_range(&logger, subgraph_metrics.clone(), from, to, &filter.call) + .map(Arc::new) + .map(EthereumTrigger::Call) + .collect() + .compat() + .boxed(); + trigger_futs.push(calls_future) } + // Scan for Blocks if filter.block.trigger_every_block { - trigger_futs.push(Box::new( - adapter - .block_range_to_ptrs(logger.clone(), from, to) - .map(move |ptrs| { - ptrs.into_iter() - .map(|ptr| EthereumTrigger::Block(ptr, EthereumBlockTriggerType::Every)) - .collect() - }), - )) + let block_future = adapter + .block_range_to_ptrs(logger.clone(), from, to) + .map(move |ptrs| { + ptrs.into_iter() + .map(|ptr| EthereumTrigger::Block(ptr, EthereumBlockTriggerType::Every)) + .collect() + }) + .compat() + .boxed(); + trigger_futs.push(block_future) } else if !filter.block.contract_addresses.is_empty() { // To determine which blocks include a call to addresses // in the block filter, transform the `block_filter` into // a `call_filter` and run `blocks_with_calls` - trigger_futs.push(Box::new( - eth.calls_in_block_range(&logger, subgraph_metrics.clone(), from, to, &call_filter) - .map(|call| { - EthereumTrigger::Block( - BlockPtr::from(&call), - EthereumBlockTriggerType::WithCallTo(call.to), - ) - }) - .collect(), - )); + let block_future = eth + .calls_in_block_range(&logger, subgraph_metrics.clone(), from, to, &call_filter) + .map(|call| { + EthereumTrigger::Block( + BlockPtr::from(&call), + EthereumBlockTriggerType::WithCallTo(call.to), + ) + }) + .collect() + .compat() + .boxed(); + trigger_futs.push(block_future) } - let logger1 = logger.cheap_clone(); - let logger2 = logger.cheap_clone(); - let eth_clone = eth.cheap_clone(); - let (triggers, to_hash) = trigger_futs - .concat2() - .join( - adapter - .clone() - .block_hash_by_block_number(&logger, to) - .then(move |to_hash| match to_hash { - Ok(n) => n.ok_or_else(|| { - warn!(logger2, - "Ethereum endpoint is behind"; - "url" => eth_clone.url_hostname() - ); - anyhow!("Block {} not found in the chain", to) - }), - Err(e) => Err(e), - }), - ) + // join on triger futures + let triggers: Vec = trigger_futs.try_concat().await?; + + // get hash for "to" block + let to_hash = match adapter + .block_hash_by_block_number(&logger, to) .compat() - .await?; + .await? + { + Some(hash) => hash, + None => { + warn!(logger, + "Ethereum endpoint is behind"; + "url" => eth.url_hostname() + ); + bail!("Block {} not found in the chain", to) + } + }; let mut block_hashes: HashSet = triggers.iter().map(EthereumTrigger::block_hash).collect(); @@ -1381,7 +1381,7 @@ pub(crate) async fn blocks_with_triggers( triggers_by_block.entry(to).or_insert(Vec::new()); let blocks = adapter - .load_blocks(logger1, chain_store.clone(), block_hashes) + .load_blocks(logger.cheap_clone(), chain_store.clone(), block_hashes) .and_then( move |block| match triggers_by_block.remove(&(block.number() as BlockNumber)) { Some(triggers) => Ok(BlockWithTriggers::new( @@ -1399,10 +1399,7 @@ pub(crate) async fn blocks_with_triggers( .await?; // Filter out call triggers that come from unsuccessful transactions - - let mut blocks = if unified_api_version - .equal_or_greater_than(&graph::data::subgraph::API_VERSION_0_0_5) - { + let mut blocks = if unified_api_version.equal_or_greater_than(&API_VERSION_0_0_5) { let futures = blocks.into_iter().map(|block| { filter_call_triggers_from_unsuccessful_transactions(block, ð, &chain_store, &logger) }); @@ -1492,7 +1489,7 @@ pub(crate) fn parse_log_triggers( .logs .iter() .filter(move |log| log_filter.matches(log)) - .map(move |log| EthereumTrigger::Log(Arc::new(log.clone()))) + .map(move |log| EthereumTrigger::Log(Arc::new(log.clone()), Some(receipt.clone()))) }) .collect() } @@ -1829,3 +1826,126 @@ fn resolve_transaction_receipt( } } } + +/// Retrieves logs and the associated transaction receipts, if required by the [`EthereumLogFilter`]. +async fn get_logs_and_transactions( + adapter: Arc, + logger: &Logger, + subgraph_metrics: Arc, + from: BlockNumber, + to: BlockNumber, + log_filter: EthereumLogFilter, + unified_api_version: &UnifiedMappingApiVersion, +) -> Result, anyhow::Error> { + // Obtain logs externally + let logs = adapter + .logs_in_block_range(logger, subgraph_metrics, from, to, log_filter.clone()) + .await?; + + // Not all logs have associated transaction hashes, nor do all triggers require them. + // We also restrict receipts retrieval for some api versions. + let transaction_hashes_by_block: HashMap> = logs + .iter() + .filter(|_| unified_api_version.equal_or_greater_than(&API_VERSION_0_0_7)) + .filter(|log| { + if let Some(signature) = log.topics.first() { + log_filter.requires_transaction_receipt(signature, Some(&log.address)) + } else { + false + } + }) + .filter_map(|log| { + if let (Some(block), Some(txn)) = (log.block_hash, log.transaction_hash) { + Some((block, txn)) + } else { + // Absent block and transaction data might happen for pending transactions, which we + // don't handle. + None + } + }) + .fold( + HashMap::>::new(), + |mut acc, (block_hash, txn_hash)| { + acc.entry(block_hash).or_default().insert(txn_hash); + acc + }, + ); + + // Obtain receipts externally + let transaction_receipts_by_hash = get_transaction_receipts_for_transaction_hashes( + &adapter, + &transaction_hashes_by_block, + logger.cheap_clone(), + ) + .await?; + + // Associate each log with its receipt, when possible + let mut log_triggers = Vec::new(); + for log in logs.into_iter() { + let optional_receipt = log + .transaction_hash + .and_then(|txn| transaction_receipts_by_hash.get(&txn).cloned()); + let value = EthereumTrigger::Log(Arc::new(log), optional_receipt); + log_triggers.push(value); + } + + Ok(log_triggers) +} + +/// Tries to retrive all transaction receipts for a set of transaction hashes. +async fn get_transaction_receipts_for_transaction_hashes( + adapter: &EthereumAdapter, + transaction_hashes_by_block: &HashMap>, + logger: Logger, +) -> Result, anyhow::Error> { + use std::collections::hash_map::Entry::Vacant; + + let mut receipts_by_hash: HashMap = HashMap::new(); + + // Return early if input set is empty + if transaction_hashes_by_block.is_empty() { + return Ok(receipts_by_hash); + } + + // Keep a record of all unique transaction hashes for which we'll request receipts. We will + // later use this to check if we have collected the receipts from all required transactions. + let mut unique_transaction_hashes: HashSet<&H256> = HashSet::new(); + + // Request transaction receipts concurrently + let receipt_futures = FuturesUnordered::new(); + + let web3 = Arc::clone(&adapter.web3); + for (block_hash, transaction_hashes) in transaction_hashes_by_block { + for transaction_hash in transaction_hashes { + unique_transaction_hashes.insert(transaction_hash); + let receipt_future = fetch_transaction_receipt_with_retry( + web3.cheap_clone(), + *transaction_hash, + *block_hash, + logger.cheap_clone(), + ); + receipt_futures.push(receipt_future) + } + } + let receipts: Vec<_> = receipt_futures.try_collect().await?; + + // Build a map between transaction hashes and their receipts + for receipt in receipts.into_iter() { + if !unique_transaction_hashes.remove(&receipt.transaction_hash) { + bail!("Received a receipt for a different transaction hash") + } + if let Vacant(entry) = receipts_by_hash.entry(receipt.transaction_hash.clone()) { + entry.insert(receipt); + } else { + bail!("Received a duplicate transaction receipt") + } + } + + // Confidence check: all unique hashes should have been used + ensure!( + unique_transaction_hashes.is_empty(), + "Didn't receive all necessary transaction receipts" + ); + + Ok(receipts_by_hash) +} diff --git a/chain/ethereum/src/runtime/abi.rs b/chain/ethereum/src/runtime/abi.rs index a6748715f70..af236a18dfb 100644 --- a/chain/ethereum/src/runtime/abi.rs +++ b/chain/ethereum/src/runtime/abi.rs @@ -1,20 +1,27 @@ -use graph::prelude::{ethabi, BigInt}; -use graph::runtime::gas::GasCounter; -use graph::runtime::{asc_get, asc_new, AscPtr, DeterministicHostError, FromAscObj, ToAscObj}; -use graph::runtime::{AscHeap, AscIndexId, AscType, IndexForAscTypeId}; +use super::runtime_adapter::UnresolvedContractCall; +use crate::trigger::{ + EthereumBlockData, EthereumCallData, EthereumEventData, EthereumTransactionData, +}; +use graph::{ + prelude::{ + ethabi, + web3::types::{Log, TransactionReceipt, H256}, + BigInt, + }, + runtime::{ + asc_get, asc_new, gas::GasCounter, AscHeap, AscIndexId, AscPtr, AscType, + DeterministicHostError, FromAscObj, IndexForAscTypeId, ToAscObj, + }, +}; use graph_runtime_derive::AscType; use graph_runtime_wasm::asc_abi::class::{ - Array, AscAddress, AscBigInt, AscEnum, AscH160, AscString, EthereumValueKind, Uint8Array, + Array, AscAddress, AscBigInt, AscEnum, AscH160, AscString, AscWrapped, EthereumValueKind, + Uint8Array, }; use semver::Version; -use crate::trigger::{ - EthereumBlockData, EthereumCallData, EthereumEventData, EthereumTransactionData, -}; - -use super::runtime_adapter::UnresolvedContractCall; - type AscH256 = Uint8Array; +type AscH2048 = Uint8Array; pub struct AscLogParamArray(Array>); @@ -46,6 +53,72 @@ impl AscIndexId for AscLogParamArray { const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArrayEventParam; } +pub struct AscTopicArray(Array>); + +impl AscType for AscTopicArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + let topics = self + .iter() + .map(|topic| asc_new(heap, topic, gas)) + .collect::, _>>()?; + Ok(AscTopicArray(Array::new(&topics, heap, gas)?)) + } +} + +impl AscIndexId for AscTopicArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArrayH256; +} + +pub struct AscLogArray(Array>); + +impl AscType for AscLogArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + let logs = self + .iter() + .map(|log| asc_new(heap, &log, gas)) + .collect::, _>>()?; + Ok(AscLogArray(Array::new(&logs, heap, gas)?)) + } +} + +impl AscIndexId for AscLogArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArrayLog; +} + #[repr(C)] #[derive(AscType)] pub struct AscUnresolvedContractCall_0_0_4 { @@ -227,6 +300,69 @@ impl AscIndexId for AscEthereumEvent, + pub topics: AscPtr, + pub data: AscPtr, + pub block_hash: AscPtr, + pub block_number: AscPtr, + pub transaction_hash: AscPtr, + pub transaction_index: AscPtr, + pub log_index: AscPtr, + pub transaction_log_index: AscPtr, + pub log_type: AscPtr, + pub removed: AscPtr>, +} + +impl AscIndexId for AscEthereumLog { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::Log; +} + +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscEthereumTransactionReceipt { + pub transaction_hash: AscPtr, + pub transaction_index: AscPtr, + pub block_hash: AscPtr, + pub block_number: AscPtr, + pub cumulative_gas_used: AscPtr, + pub gas_used: AscPtr, + pub contract_address: AscPtr, + pub logs: AscPtr, + pub status: AscPtr, + pub root: AscPtr, + pub logs_bloom: AscPtr, +} + +impl AscIndexId for AscEthereumTransactionReceipt { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::TransactionReceipt; +} + +/// Introduced in API Version 0.0.7, this is the same as [`AscEthereumEvent`] with an added +/// `receipt` field. +#[repr(C)] +#[derive(AscType)] +pub(crate) struct AscEthereumEvent_0_0_7 +where + T: AscType, + B: AscType, +{ + pub address: AscPtr, + pub log_index: AscPtr, + pub transaction_log_index: AscPtr, + pub log_type: AscPtr, + pub block: AscPtr, + pub transaction: AscPtr, + pub params: AscPtr, + pub receipt: AscPtr, +} + +impl AscIndexId for AscEthereumEvent_0_0_7 { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::EthereumEvent; +} + #[repr(C)] #[derive(AscType)] pub(crate) struct AscLogParam { @@ -441,6 +577,138 @@ where } } +impl ToAscObj> + for (EthereumEventData, Option) +where + T: AscType + AscIndexId, + B: AscType + AscIndexId, + EthereumTransactionData: ToAscObj, + EthereumBlockData: ToAscObj, +{ + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result, DeterministicHostError> { + let (event_data, optional_receipt) = self; + let AscEthereumEvent { + address, + log_index, + transaction_log_index, + log_type, + block, + transaction, + params, + } = event_data.to_asc_obj(heap, gas)?; + let receipt = if let Some(receipt_data) = optional_receipt { + asc_new(heap, receipt_data, gas)? + } else { + AscPtr::null() + }; + Ok(AscEthereumEvent_0_0_7 { + address, + log_index, + transaction_log_index, + log_type, + block, + transaction, + params, + receipt, + }) + } +} + +impl ToAscObj for Log { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + Ok(AscEthereumLog { + address: asc_new(heap, &self.address, gas)?, + topics: asc_new(heap, &self.topics, gas)?, + data: asc_new(heap, self.data.0.as_slice(), gas)?, + block_hash: self + .block_hash + .map(|block_hash| asc_new(heap, &block_hash, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + block_number: self + .block_number + .map(|block_number| asc_new(heap, &BigInt::from(block_number), gas)) + .unwrap_or(Ok(AscPtr::null()))?, + transaction_hash: self + .transaction_hash + .map(|txn_hash| asc_new(heap, &txn_hash, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + transaction_index: self + .transaction_index + .map(|txn_index| asc_new(heap, &BigInt::from(txn_index), gas)) + .unwrap_or(Ok(AscPtr::null()))?, + log_index: self + .log_index + .map(|log_index| asc_new(heap, &BigInt::from_unsigned_u256(&log_index), gas)) + .unwrap_or(Ok(AscPtr::null()))?, + transaction_log_index: self + .transaction_log_index + .map(|index| asc_new(heap, &BigInt::from_unsigned_u256(&index), gas)) + .unwrap_or(Ok(AscPtr::null()))?, + log_type: self + .log_type + .as_ref() + .map(|log_type| asc_new(heap, &log_type, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + removed: self + .removed + .map(|removed| asc_new(heap, &AscWrapped { inner: removed }, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + }) + } +} + +impl ToAscObj for TransactionReceipt { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + Ok(AscEthereumTransactionReceipt { + transaction_hash: asc_new(heap, &self.transaction_hash, gas)?, + transaction_index: asc_new(heap, &BigInt::from(self.transaction_index), gas)?, + block_hash: self + .block_hash + .map(|block_hash| asc_new(heap, &block_hash, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + block_number: self + .block_number + .map(|block_number| asc_new(heap, &BigInt::from(block_number), gas)) + .unwrap_or(Ok(AscPtr::null()))?, + cumulative_gas_used: asc_new( + heap, + &BigInt::from_unsigned_u256(&self.cumulative_gas_used), + gas, + )?, + gas_used: self + .gas_used + .map(|gas_used| asc_new(heap, &BigInt::from_unsigned_u256(&gas_used), gas)) + .unwrap_or(Ok(AscPtr::null()))?, + contract_address: self + .contract_address + .map(|contract_address| asc_new(heap, &contract_address, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + logs: asc_new(heap, &self.logs, gas)?, + status: self + .status + .map(|status| asc_new(heap, &BigInt::from(status), gas)) + .unwrap_or(Ok(AscPtr::null()))?, + root: self + .root + .map(|root| asc_new(heap, &root, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + logs_bloom: asc_new(heap, self.logs_bloom.as_bytes(), gas)?, + }) + } +} + impl ToAscObj for EthereumCallData { fn to_asc_obj( &self, diff --git a/chain/ethereum/src/tests.rs b/chain/ethereum/src/tests.rs index ac631121a27..eee594fdbc4 100644 --- a/chain/ethereum/src/tests.rs +++ b/chain/ethereum/src/tests.rs @@ -60,15 +60,15 @@ fn test_trigger_ordering() { // Event with transaction_index 1 and log_index 0; // should be the first element after sorting - let log1 = EthereumTrigger::Log(create_log(1, 0)); + let log1 = EthereumTrigger::Log(create_log(1, 0), None); // Event with transaction_index 1 and log_index 1; // should be the second element after sorting - let log2 = EthereumTrigger::Log(create_log(1, 1)); + let log2 = EthereumTrigger::Log(create_log(1, 1), None); // Event with transaction_index 2 and log_index 5; // should come after call1 and before call2 after sorting - let log3 = EthereumTrigger::Log(create_log(2, 5)); + let log3 = EthereumTrigger::Log(create_log(2, 5), None); let triggers = vec![ // Call triggers; these should be in the order 1, 2, 4, 3 after sorting diff --git a/chain/ethereum/src/trigger.rs b/chain/ethereum/src/trigger.rs index 0414b920dde..9c4720045ad 100644 --- a/chain/ethereum/src/trigger.rs +++ b/chain/ethereum/src/trigger.rs @@ -1,5 +1,8 @@ use graph::blockchain; use graph::blockchain::TriggerData; +use graph::data::subgraph::API_VERSION_0_0_2; +use graph::data::subgraph::API_VERSION_0_0_6; +use graph::data::subgraph::API_VERSION_0_0_7; use graph::prelude::ethabi::ethereum_types::H160; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::ethabi::ethereum_types::U128; @@ -11,6 +14,7 @@ use graph::prelude::ethabi::LogParam; use graph::prelude::web3::types::Block; use graph::prelude::web3::types::Log; use graph::prelude::web3::types::Transaction; +use graph::prelude::web3::types::TransactionReceipt; use graph::prelude::BlockNumber; use graph::prelude::BlockPtr; use graph::prelude::{CheapClone, EthereumCall}; @@ -29,6 +33,7 @@ use crate::runtime::abi::AscEthereumBlock_0_0_6; use crate::runtime::abi::AscEthereumCall; use crate::runtime::abi::AscEthereumCall_0_0_3; use crate::runtime::abi::AscEthereumEvent; +use crate::runtime::abi::AscEthereumEvent_0_0_7; use crate::runtime::abi::AscEthereumTransaction_0_0_1; use crate::runtime::abi::AscEthereumTransaction_0_0_2; use crate::runtime::abi::AscEthereumTransaction_0_0_6; @@ -42,6 +47,7 @@ pub enum MappingTrigger { transaction: Arc, log: Arc, params: Vec, + receipt: Option, }, Call { block: Arc, @@ -80,6 +86,7 @@ impl std::fmt::Debug for MappingTrigger { transaction, log, params, + receipt: _, } => MappingTriggerWithoutBlock::Log { _transaction: transaction.cheap_clone(), _log: log.cheap_clone(), @@ -116,7 +123,9 @@ impl blockchain::MappingTrigger for MappingTrigger { transaction, log, params, + receipt, } => { + let api_version = heap.api_version(); let ethereum_event_data = EthereumEventData { block: EthereumBlockData::from(block.as_ref()), transaction: EthereumTransactionData::from(transaction.deref()), @@ -126,27 +135,36 @@ impl blockchain::MappingTrigger for MappingTrigger { log_type: log.log_type.clone(), params, }; - let api_version = heap.api_version(); - if api_version >= Version::new(0, 0, 6) { + if api_version >= API_VERSION_0_0_7 { + asc_new::< + AscEthereumEvent_0_0_7< + AscEthereumTransaction_0_0_6, + AscEthereumBlock_0_0_6, + >, + _, + _, + >(heap, &(ethereum_event_data, receipt), gas)? + .erase() + } else if api_version >= API_VERSION_0_0_6 { asc_new::< AscEthereumEvent, _, _, >(heap, ðereum_event_data, gas)? .erase() - } else if api_version >= Version::new(0, 0, 2) { - asc_new::, _, _>( - heap, - ðereum_event_data, - gas - )? - .erase() + } else if api_version >= API_VERSION_0_0_2 { + asc_new::< + AscEthereumEvent, + _, + _, + >(heap, ðereum_event_data, gas)? + .erase() } else { - asc_new::, _, _>( - heap, - ðereum_event_data, - gas - )? + asc_new::< + AscEthereumEvent, + _, + _, + >(heap, ðereum_event_data, gas)? .erase() } } @@ -199,7 +217,7 @@ impl blockchain::MappingTrigger for MappingTrigger { pub enum EthereumTrigger { Block(BlockPtr, EthereumBlockTriggerType), Call(Arc), - Log(Arc), + Log(Arc, Option), } impl PartialEq for EthereumTrigger { @@ -211,8 +229,10 @@ impl PartialEq for EthereumTrigger { (Self::Call(a), Self::Call(b)) => a == b, - (Self::Log(a), Self::Log(b)) => { - a.transaction_hash == b.transaction_hash && a.log_index == b.log_index + (Self::Log(a, a_receipt), Self::Log(b, b_receipt)) => { + a.transaction_hash == b.transaction_hash + && a.log_index == b.log_index + && a_receipt == b_receipt } _ => false, @@ -233,7 +253,9 @@ impl EthereumTrigger { match self { EthereumTrigger::Block(block_ptr, _) => block_ptr.number, EthereumTrigger::Call(call) => call.block_number, - EthereumTrigger::Log(log) => i32::try_from(log.block_number.unwrap().as_u64()).unwrap(), + EthereumTrigger::Log(log, _) => { + i32::try_from(log.block_number.unwrap().as_u64()).unwrap() + } } } @@ -241,7 +263,7 @@ impl EthereumTrigger { match self { EthereumTrigger::Block(block_ptr, _) => block_ptr.hash_as_h256(), EthereumTrigger::Call(call) => call.block_hash, - EthereumTrigger::Log(log) => log.block_hash.unwrap(), + EthereumTrigger::Log(log, _) => log.block_hash.unwrap(), } } } @@ -260,24 +282,24 @@ impl Ord for EthereumTrigger { (Self::Call(a), Self::Call(b)) => a.transaction_index.cmp(&b.transaction_index), // Events are ordered by their log index - (Self::Log(a), Self::Log(b)) => a.log_index.cmp(&b.log_index), + (Self::Log(a, _), Self::Log(b, _)) => a.log_index.cmp(&b.log_index), // Calls vs. events are logged by their tx index; // if they are from the same transaction, events come first - (Self::Call(a), Self::Log(b)) + (Self::Call(a), Self::Log(b, _)) if a.transaction_index == b.transaction_index.unwrap().as_u64() => { Ordering::Greater } - (Self::Log(a), Self::Call(b)) + (Self::Log(a, _), Self::Call(b)) if a.transaction_index.unwrap().as_u64() == b.transaction_index => { Ordering::Less } - (Self::Call(a), Self::Log(b)) => a + (Self::Call(a), Self::Log(b, _)) => a .transaction_index .cmp(&b.transaction_index.unwrap().as_u64()), - (Self::Log(a), Self::Call(b)) => a + (Self::Log(a, _), Self::Call(b)) => a .transaction_index .unwrap() .as_u64() @@ -295,7 +317,7 @@ impl PartialOrd for EthereumTrigger { impl TriggerData for EthereumTrigger { fn error_context(&self) -> std::string::String { let transaction_id = match self { - EthereumTrigger::Log(log) => log.transaction_hash, + EthereumTrigger::Log(log, _) => log.transaction_hash, EthereumTrigger::Call(call) => call.transaction_hash, EthereumTrigger::Block(..) => None, }; diff --git a/graph/src/data/subgraph/api_version.rs b/graph/src/data/subgraph/api_version.rs index 3a393600d8f..b0bfa949d7b 100644 --- a/graph/src/data/subgraph/api_version.rs +++ b/graph/src/data/subgraph/api_version.rs @@ -5,10 +5,18 @@ use thiserror::Error; use super::SubgraphManifestValidationError; +pub const API_VERSION_0_0_2: Version = Version::new(0, 0, 2); + /// This version adds a new subgraph validation step that rejects manifests whose mappings have /// different API versions if at least one of them is equal to or higher than `0.0.5`. pub const API_VERSION_0_0_5: Version = Version::new(0, 0, 5); +// Adds two new fields to the Transaction object: nonce and input +pub const API_VERSION_0_0_6: Version = Version::new(0, 0, 6); + +/// Enables event handlers to require transaction receipts in the runtime. +pub const API_VERSION_0_0_7: Version = Version::new(0, 0, 7); + /// Before this check was introduced, there were already subgraphs in the wild with spec version /// 0.0.3, due to confusion with the api version. To avoid breaking those, we accept 0.0.3 though it /// doesn't exist. @@ -17,6 +25,9 @@ pub const SPEC_VERSION_0_0_3: Version = Version::new(0, 0, 3); /// This version supports subgraph feature management. pub const SPEC_VERSION_0_0_4: Version = Version::new(0, 0, 4); +/// This version supports event handlers having access to transaction receipts. +pub const SPEC_VERSION_0_0_5: Version = Version::new(0, 0, 5); + pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2); #[derive(Clone, PartialEq, Debug)] diff --git a/graph/src/runtime/mod.rs b/graph/src/runtime/mod.rs index 22fb7d3adb3..d23300b891b 100644 --- a/graph/src/runtime/mod.rs +++ b/graph/src/runtime/mod.rs @@ -301,6 +301,12 @@ pub enum IndexForAscTypeId { TendermintDuration = 131, TendermintTimestamp = 132, TendermintEventData = 133, + + // More Ethereum tyes + TransactionReceipt = 134, + Log = 135, + ArrayH256 = 136, + ArrayLog = 137, } impl ToAscObj for IndexForAscTypeId { diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index cba94b3c60d..b4575d051d0 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -135,6 +135,11 @@ async fn test_module_latest(subgraph_id: &str, wasm_file: &str) -> WasmInstance< trait WasmInstanceExt { fn invoke_export0_void(&self, f: &str) -> Result<(), wasmtime::Trap>; + fn invoke_export1_val_void( + &self, + f: &str, + v: V, + ) -> Result<(), wasmtime::Trap>; fn invoke_export0(&self, f: &str) -> AscPtr; fn invoke_export1(&mut self, f: &str, arg: &T) -> AscPtr where @@ -157,6 +162,7 @@ trait WasmInstanceExt { C2: AscType + AscIndexId, T1: ToAscObj + ?Sized, T2: ToAscObj + ?Sized; + fn invoke_export0_val(&mut self, func: &str) -> V; fn invoke_export1_val(&mut self, func: &str, v: &T) -> V where C: AscType + AscIndexId, @@ -195,6 +201,16 @@ impl WasmInstanceExt for WasmInstance { ptr.into() } + fn invoke_export1_val_void( + &self, + f: &str, + v: V, + ) -> Result<(), wasmtime::Trap> { + let func = self.get_func(f).typed().unwrap().clone(); + func.call(v)?; + Ok(()) + } + fn invoke_export2(&mut self, f: &str, arg0: &T1, arg1: &T2) -> AscPtr where C1: AscType + AscIndexId, @@ -229,6 +245,11 @@ impl WasmInstanceExt for WasmInstance { func.call((arg0.wasm_ptr(), arg1.wasm_ptr())) } + fn invoke_export0_val(&mut self, func: &str) -> V { + let func = self.get_func(func).typed().unwrap().clone(); + func.call(()).unwrap() + } + fn invoke_export1_val(&mut self, func: &str, v: &T) -> V where C: AscType + AscIndexId, @@ -1089,3 +1110,36 @@ async fn test_array_blowup() { .to_string() .contains("Gas limit exceeded. Used: 11286295575421")); } + +#[tokio::test] +async fn test_boolean() { + let mut module = test_module_latest("boolean", "boolean.wasm").await; + + let true_: i32 = module.invoke_export0_val("testReturnTrue"); + assert_eq!(true_, 1); + + let false_: i32 = module.invoke_export0_val("testReturnFalse"); + assert_eq!(false_, 0); + + // non-zero values are true + for x in (-10i32..10).filter(|&x| x != 0) { + assert!(module.invoke_export1_val_void("testReceiveTrue", x).is_ok(),); + } + + // zero is not true + assert!(module + .invoke_export1_val_void("testReceiveTrue", 0i32) + .is_err()); + + // zero is false + assert!(module + .invoke_export1_val_void("testReceiveFalse", 0i32) + .is_ok()); + + // non-zero values are not false + for x in (-10i32..10).filter(|&x| x != 0) { + assert!(module + .invoke_export1_val_void("testReceiveFalse", x) + .is_err()); + } +} diff --git a/runtime/test/wasm_test/api_version_0_0_5/boolean.ts b/runtime/test/wasm_test/api_version_0_0_5/boolean.ts new file mode 100644 index 00000000000..7bf85ca45b1 --- /dev/null +++ b/runtime/test/wasm_test/api_version_0_0_5/boolean.ts @@ -0,0 +1,17 @@ +export * from "./common/global" + +export function testReceiveTrue(a: bool): void { + assert(a) +} + +export function testReceiveFalse(a: bool): void { + assert(!a) +} + +export function testReturnTrue(): bool { + return true +} + +export function testReturnFalse(): bool { + return false +} diff --git a/runtime/test/wasm_test/api_version_0_0_5/boolean.wasm b/runtime/test/wasm_test/api_version_0_0_5/boolean.wasm new file mode 100644 index 00000000000..ba80672b1bb Binary files /dev/null and b/runtime/test/wasm_test/api_version_0_0_5/boolean.wasm differ diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 0a120d8dee2..e6a09f2efcb 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1153,7 +1153,7 @@ mod data { .unwrap(); } - /// Queries the database for all the transaction receipts in a given block range. + /// Queries the database for all the transaction receipts in a given block. pub(crate) fn find_transaction_receipts_in_block( &self, conn: &PgConnection,