Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 5 additions & 55 deletions crates/tracex/src/exex.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
//! Tracex execution extension wiring.

use alloy_primitives::TxHash;
use eyre::Result;
use futures::StreamExt;
use reth::{
api::{BlockBody, FullNodeComponents},
core::primitives::{AlloyBlockHeader, transaction::TxHashRef},
transaction_pool::{FullTransactionEvent, TransactionPool},
};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth::{api::FullNodeComponents, transaction_pool::TransactionPool};
use reth_exex::ExExContext;
use reth_tracing::tracing::debug;

use crate::{
events::{Pool, TxEvent},
tracker::Tracker,
};
use crate::tracker::Tracker;

/// Execution extension that tracks transaction timing from mempool to inclusion.
///
Expand All @@ -33,54 +25,12 @@ pub async fn tracex_exex<Node: FullNodeComponents>(
loop {
tokio::select! {
// Track # of transactions dropped and replaced.
Some(full_event) = all_events_stream.next() => {
match full_event {
FullTransactionEvent::Pending(tx_hash) => {
tracker.transaction_inserted(tx_hash, TxEvent::Pending);
tracker.transaction_moved(tx_hash, Pool::Pending);
}
FullTransactionEvent::Queued(tx_hash, _) => {
tracker.transaction_inserted(tx_hash, TxEvent::Queued);
tracker.transaction_moved(tx_hash, Pool::Queued);
}
FullTransactionEvent::Discarded(tx_hash) => {
tracker.transaction_completed(tx_hash, TxEvent::Dropped);
}
FullTransactionEvent::Replaced{ transaction, replaced_by } => {
let tx_hash = transaction.hash();
tracker.transaction_replaced(*tx_hash, TxHash::from(replaced_by));
}
_ => {
// Other events.
}
}
}
Some(full_event) = all_events_stream.next() => tracker.handle_event(full_event),

// Use chain notifications to track time to inclusion.
Some(notification) = ctx.notifications.next() => {
match notification {
Ok(ExExNotification::ChainCommitted { new }) => {
// Process all transactions in committed chain.
for block in new.blocks().values() {
for transaction in block.body().transactions() {
tracker.transaction_completed(*transaction.tx_hash(), TxEvent::BlockInclusion);
}
}
ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?;
}
Ok(ExExNotification::ChainReorged { old: _, new }) => {
debug!(target: "tracex", tip = ?new.tip().number(), "Chain reorg detected");
for block in new.blocks().values() {
for transaction in block.body().transactions() {
tracker.transaction_completed(*transaction.tx_hash(), TxEvent::BlockInclusion);
}
}
ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?;
}
Ok(ExExNotification::ChainReverted { old }) => {
debug!(target: "tracex", old_tip = ?old.tip().number(), "Chain reverted");
ctx.events.send(ExExEvent::FinishedHeight(old.tip().num_hash()))?;
}
Ok(notification) => ctx.events.send(tracker.handle_notification(notification))?,
Err(e) => {
debug!(target: "tracex", error = %e, "Notification error");
return Err(e);
Expand Down
79 changes: 71 additions & 8 deletions crates/tracex/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ use std::{
use alloy_primitives::TxHash;
use chrono::Local;
use lru::LruCache;
use reth::{
api::{BlockBody, NodePrimitives},
core::primitives::{AlloyBlockHeader, transaction::TxHashRef},
providers::Chain,
transaction_pool::{FullTransactionEvent, PoolTransaction},
};
use reth_exex::{ExExEvent, ExExNotification};
use reth_tracing::tracing::{debug, info};

use crate::{EventLog, Pool, TxEvent};
Expand Down Expand Up @@ -36,6 +43,60 @@ impl Tracker {
}
}

/// Parse [`FullTransactionEvent`]s and update the tracker.
pub fn handle_event<T: PoolTransaction>(&mut self, event: FullTransactionEvent<T>) {
match event {
FullTransactionEvent::Pending(tx_hash) => {
self.transaction_inserted(tx_hash, TxEvent::Pending);
self.transaction_moved(tx_hash, Pool::Pending);
}
FullTransactionEvent::Queued(tx_hash, _) => {
self.transaction_inserted(tx_hash, TxEvent::Queued);
self.transaction_moved(tx_hash, Pool::Queued);
}
FullTransactionEvent::Discarded(tx_hash) => {
self.transaction_completed(tx_hash, TxEvent::Dropped);
}
FullTransactionEvent::Replaced { transaction, replaced_by } => {
let tx_hash = transaction.hash();
self.transaction_replaced(*tx_hash, TxHash::from(replaced_by));
}
_ => {
// Other events.
}
}
}

/// Parse [`ExExNotification`]s and update the tracker.
pub fn handle_notification<N: NodePrimitives>(
&mut self,
notification: ExExNotification<N>,
) -> ExExEvent {
match notification {
ExExNotification::ChainCommitted { new } => {
self.track_committed_chain(&new);
ExExEvent::FinishedHeight(new.tip().num_hash())
}
ExExNotification::ChainReorged { old: _, new } => {
debug!(target: "tracex", tip = ?new.tip().number(), "Chain reorg detected");
self.track_committed_chain(&new);
ExExEvent::FinishedHeight(new.tip().num_hash())
}
ExExNotification::ChainReverted { old } => {
debug!(target: "tracex", old_tip = ?old.tip().number(), "Chain reverted");
ExExEvent::FinishedHeight(old.tip().num_hash())
}
}
}

fn track_committed_chain<N: NodePrimitives>(&mut self, chain: &Chain<N>) {
for block in chain.blocks().values() {
for transaction in block.body().transactions() {
self.transaction_completed(*transaction.tx_hash(), TxEvent::BlockInclusion);
}
}
}

/// Track the first time we see a transaction in the mempool.
pub fn transaction_inserted(&mut self, tx_hash: TxHash, event: TxEvent) {
// If we've seen the tx before, don't track it again. For example,
Expand Down Expand Up @@ -79,7 +140,7 @@ impl Tracker {
event_log.push(Local::now(), event);
self.txs.put(tx_hash, event_log);

record_histogram(time_in_mempool, event);
Self::record_histogram(time_in_mempool, event);
}
}

Expand All @@ -103,7 +164,7 @@ impl Tracker {

// If a tx is included/dropped, log it now.
self.log(&tx_hash, &event_log, &format!("Transaction {event}"));
record_histogram(time_in_mempool, event);
Self::record_histogram(time_in_mempool, event);
}
}

Expand All @@ -121,10 +182,11 @@ impl Tracker {
event_log.push(Local::now(), TxEvent::Replaced);
self.txs.put(replaced_by, event_log);

record_histogram(time_in_mempool, TxEvent::Replaced);
Self::record_histogram(time_in_mempool, TxEvent::Replaced);
}
}

/// Logs an [`EventLog`] through tracing.
fn log(&self, tx_hash: &TxHash, event_log: &EventLog, msg: &str) {
if !self.enable_logs {
return;
Expand All @@ -144,12 +206,13 @@ impl Tracker {
}

self.log(tx_hash, event_log, "Transaction removed from cache due to limit");
record_histogram(event_log.mempool_time.elapsed(), TxEvent::Overflowed);
Self::record_histogram(event_log.mempool_time.elapsed(), TxEvent::Overflowed);
true
}
}

fn record_histogram(time_in_mempool: Duration, event: TxEvent) {
metrics::histogram!("reth_transaction_tracing_tx_event", "event" => event.to_string())
.record(time_in_mempool.as_millis() as f64);
/// Records a metrics histogram.
fn record_histogram(time_in_mempool: Duration, event: TxEvent) {
metrics::histogram!("reth_transaction_tracing_tx_event", "event" => event.to_string())
.record(time_in_mempool.as_millis() as f64);
}
}