Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
/tests/integration-tests/**/generated
/tests/integration-tests/**/node_modules
/tests/integration-tests/**/yarn.lock
/tests/integration-tests/**/yarn-error.log

# Built solidity contracts.
/tests/integration-tests/**/bin
Expand Down
2 changes: 1 addition & 1 deletion chain/ethereum/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ fn main() {
.out_dir("src/protobuf")
.format(true)
.compile(&["proto/codec.proto"], &["proto"])
.expect("Failed to compile StreamingFast Ethereum proto(s)");
.expect("Failed to compile Firehose Ethereum proto(s)");
}
48 changes: 41 additions & 7 deletions chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
use anyhow::Error;
use graph::{
env::env_var,
log::logger,
prelude::{prost, tokio, tonic},
{firehose, firehose::FirehoseEndpoint, firehose::ForkStep},
};
use graph_chain_ethereum::codec;
use hex::ToHex;
use prost::Message;
use std::sync::Arc;
use tonic::Streaming;

#[tokio::main]
async fn main() -> Result<(), Error> {
let mut cursor: Option<String> = None;
let token_env = env_var("SF_API_TOKEN", "".to_string());
let mut token: Option<String> = None;
if token_env.len() > 0 {
token = Some(token_env);
}

let logger = logger(true);
let firehose = Arc::new(
FirehoseEndpoint::new(logger, "firehose", "https://bsc.streamingfast.io:443", None).await?,
FirehoseEndpoint::new(
logger,
"firehose",
"https://api.streamingfast.io:443",
token,
)
.await?,
);

loop {
println!("connecting to the stream!");
println!("Connecting to the stream!");
let mut stream: Streaming<firehose::Response> = match firehose
.clone()
.stream_blocks(firehose::Request {
start_block_num: 7000000,
start_block_num: 12369739,
stop_block_num: 12369739,
start_cursor: match &cursor {
Some(c) => c.clone(),
None => String::from(""),
Expand All @@ -35,7 +49,7 @@ async fn main() -> Result<(), Error> {
{
Ok(s) => s,
Err(e) => {
println!("could not connect to stream! {}", e);
println!("Could not connect to stream! {}", e);
continue;
}
};
Expand All @@ -44,11 +58,11 @@ async fn main() -> Result<(), Error> {
let resp = match stream.message().await {
Ok(Some(t)) => t,
Ok(None) => {
println!("stream completed");
break;
println!("Stream completed");
return Ok(());
}
Err(e) => {
println!("error getting message {}", e);
println!("Error getting message {}", e);
break;
}
};
Expand All @@ -62,6 +76,26 @@ async fn main() -> Result<(), Error> {
hex::encode(b.hash),
resp.step
);
b.transaction_traces.iter().for_each(|trx| {
let mut logs: Vec<String> = vec![];
trx.calls.iter().for_each(|call| {
call.logs.iter().for_each(|log| {
logs.push(format!(
"Log {} Topics, Address {}, Trx Index {}, Block Index {}",
log.topics.len(),
log.address.encode_hex::<String>(),
log.index,
log.block_index
));
})
});

if logs.len() > 0 {
println!("Transaction {}", trx.hash.encode_hex::<String>());
logs.iter().for_each(|log| println!("{}", log));
}
});

cursor = Some(resp.cursor)
}
Err(e) => panic!("Unable to decode {:?}", e),
Expand Down
10 changes: 10 additions & 0 deletions chain/ethereum/proto/codec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ message Block {
string filtering_exclude_filter_expr = 42;
}

// HeaderOnlyBlock is a standard [Block] structure where all other fields are
// removed so that hydrating that object from a [Block] bytes payload will
// drastically reduce the allocated memory required to hold the full block.
//
// This can be used to unpack a [Block] when only the [BlockHeader] information
// is required, greatly reducing the required memory.
message HeaderOnlyBlock {
BlockHeader header = 5;
}

// BlockWithRefs is a lightweight block, with traces and transactions
// purged from the `block` within, and only. It is used in transports
// to pass block data around.
Expand Down
44 changes: 39 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Context, Error};
use graph::blockchain::BlockchainKind;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::env_var;
use graph::firehose::{FirehoseEndpoints, ForkStep};
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, ForkStep};
use graph::prelude::{
EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, StopwatchMetrics,
};
Expand Down Expand Up @@ -187,6 +187,7 @@ impl Blockchain for Chain {
deployment: DeploymentLocator,
block_cursor: Option<String>,
start_blocks: Vec<BlockNumber>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<Self::TriggerFilter>,
metrics: Arc<BlockStreamMetrics>,
unified_api_version: UnifiedMappingApiVersion,
Expand Down Expand Up @@ -214,10 +215,13 @@ impl Blockchain for Chain {
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {});
let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
firehose_endpoint,
subgraph_current_block,
block_cursor,
firehose_mapper,
adapter,
Expand All @@ -231,7 +235,7 @@ impl Blockchain for Chain {
&self,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
subgraph_start_block: Option<BlockPtr>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<Self::TriggerFilter>,
metrics: Arc<BlockStreamMetrics>,
unified_api_version: UnifiedMappingApiVersion,
Expand Down Expand Up @@ -282,7 +286,7 @@ impl Blockchain for Chain {
*MAX_BLOCK_RANGE_SIZE,
*TARGET_TRIGGERS_PER_BLOCK_RANGE,
unified_api_version,
subgraph_start_block,
subgraph_current_block,
)))
}

Expand Down Expand Up @@ -519,7 +523,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

pub struct FirehoseMapper {}
pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
Expand Down Expand Up @@ -585,4 +591,32 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
}
}
}

async fn block_ptr_for_number(
&self,
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, Error> {
self.endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
.await
}

async fn final_block_ptr_for(
&self,
logger: &Logger,
block: &BlockFinality,
) -> Result<BlockPtr, Error> {
// Firehose for Ethereum has a hard-coded finality threshold set to 200 blocks
// behind the current block. The magic value 200 here comes from this hard-coded
// Firehose value.
let final_block_number = match block.number() {
x if x >= 200 => x - 200,
_ => 0,
};

self.endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, final_block_number)
.await
}
}
68 changes: 56 additions & 12 deletions chain/ethereum/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,21 @@ impl Into<EthereumBlockWithCalls> for &Block {
}
}

impl From<Block> for BlockPtr {
fn from(b: Block) -> BlockPtr {
(&b).into()
impl BlockHeader {
pub fn parent_ptr(&self) -> Option<BlockPtr> {
match self.parent_hash.len() {
0 => None,
_ => Some(BlockPtr::from((
H256::from_slice(self.parent_hash.as_ref()),
self.number - 1,
))),
}
}
}

impl<'a> From<&'a BlockHeader> for BlockPtr {
fn from(b: &'a BlockHeader) -> BlockPtr {
BlockPtr::from((H256::from_slice(b.hash.as_ref()), b.number))
}
}

Expand All @@ -314,24 +326,56 @@ impl<'a> From<&'a Block> for BlockPtr {
}
}

impl Block {
pub fn header(&self) -> &BlockHeader {
self.header.as_ref().unwrap()
}

pub fn ptr(&self) -> BlockPtr {
BlockPtr::from(self.header())
}

pub fn parent_ptr(&self) -> Option<BlockPtr> {
self.header().parent_ptr()
}
}

impl BlockchainBlock for Block {
fn number(&self) -> i32 {
BlockNumber::try_from(self.number).unwrap()
BlockNumber::try_from(self.header().number).unwrap()
}

fn ptr(&self) -> BlockPtr {
self.into()
}

fn parent_ptr(&self) -> Option<BlockPtr> {
let parent_hash = &self.header.as_ref().unwrap().parent_hash;
self.parent_ptr()
}
}

match parent_hash.len() {
0 => None,
_ => Some(BlockPtr::from((
H256::from_slice(parent_hash.as_ref()),
self.number - 1,
))),
}
impl HeaderOnlyBlock {
pub fn header(&self) -> &BlockHeader {
self.header.as_ref().unwrap()
}
}

impl<'a> From<&'a HeaderOnlyBlock> for BlockPtr {
fn from(b: &'a HeaderOnlyBlock) -> BlockPtr {
BlockPtr::from(b.header())
}
}

impl BlockchainBlock for HeaderOnlyBlock {
fn number(&self) -> i32 {
BlockNumber::try_from(self.header().number).unwrap()
}

fn ptr(&self) -> BlockPtr {
self.into()
}

fn parent_ptr(&self) -> Option<BlockPtr> {
self.header().parent_ptr()
}
}
5 changes: 2 additions & 3 deletions chain/ethereum/src/ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ impl BlockIngestor {
None => {
info!(
self.logger,
"Downloading latest blocks from Ethereum. \
This may take a few minutes..."
"Downloading latest blocks from Ethereum, this may take a few minutes..."
);
}
Some(head_block_ptr) => {
Expand All @@ -135,7 +134,7 @@ impl BlockIngestor {
if distance > 0 {
info!(
self.logger,
"Syncing {} blocks from Ethereum.",
"Syncing {} blocks from Ethereum",
blocks_needed;
"current_block_head" => head_number,
"latest_block_head" => latest_number,
Expand Down
11 changes: 11 additions & 0 deletions chain/ethereum/src/protobuf/sf.ethereum.codec.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ pub struct Block {
#[prost(string, tag = "42")]
pub filtering_exclude_filter_expr: ::prost::alloc::string::String,
}
/// HeaderOnlyBlock is a standard [Block] structure where all other fields are
/// removed so that hydrating that object from a [Block] bytes payload will
/// drastically reduce the allocated memory required to hold the full block.
///
/// This can be used to unpack a [Block] when only the [BlockHeader] information
/// is required, greatly reducing the required memory.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeaderOnlyBlock {
#[prost(message, optional, tag = "5")]
pub header: ::core::option::Option<BlockHeader>,
}
/// BlockWithRefs is a lightweight block, with traces and transactions
/// purged from the `block` within, and only. It is used in transports
/// to pass block data around.
Expand Down
2 changes: 1 addition & 1 deletion chain/near/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ fn main() {
.out_dir("src/protobuf")
.format(true)
.compile(&["proto/codec.proto"], &["proto"])
.expect("Failed to compile StreamingFast NEAR proto(s)");
.expect("Failed to compile Firehose NEAR proto(s)");
}
Loading