Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions aggregation_mode/proof_aggregator/src/backend/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ pub struct Config {
pub sp1_chunk_aggregator_vk_hash: String,
pub monthly_budget_eth: f64,
pub db_connection_urls: Vec<String>,
pub max_bump_retries: u16,
pub bump_retry_interval_seconds: u64,
pub max_fee_bump_percentage: u64,
pub max_priority_fee_upper_limit: u128,
}

impl Config {
Expand Down
287 changes: 45 additions & 242 deletions aggregation_mode/proof_aggregator/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ use crate::{

use alloy::{
consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar},
eips::{
eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, BlockNumberOrTag,
Encodable2718,
},
eips::{eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, Encodable2718},
hex,
network::{EthereumWallet, TransactionBuilder},
primitives::{utils::parse_ether, Address, TxHash, U256},
network::EthereumWallet,
primitives::{utils::parse_ether, Address, U256},
providers::{PendingTransactionError, Provider, ProviderBuilder},
rpc::types::{TransactionReceipt, TransactionRequest},
rpc::types::TransactionReceipt,
signers::local::LocalSigner,
};
use config::Config;
Expand All @@ -55,16 +52,6 @@ pub enum AggregatedProofSubmissionError {
MerkleRootMisMatch,
StoringMerklePaths(DbError),
GasPriceError(String),
LatestBlockNotFound,
BaseFeePerGasMissing,
}

enum SubmitOutcome {
// NOTE: Boxed because enums are sized to their largest variant; without boxing,
// every `SubmitOutcome` would reserve space for a full `TransactionReceipt`,
// even in the `Pending` case (see clippy::large_enum_variant).
Confirmed(Box<TransactionReceipt>),
Pending(TxHash),
}

pub struct ProofAggregator {
Expand All @@ -75,7 +62,6 @@ pub struct ProofAggregator {
sp1_chunk_aggregator_vk_hash_bytes: [u8; 32],
risc0_chunk_aggregator_image_id_bytes: [u8; 32],
db: Db,
signer_address: Address,
}

impl ProofAggregator {
Expand All @@ -86,9 +72,7 @@ impl ProofAggregator {
config.ecdsa.private_key_store_password.clone(),
)
.expect("Keystore signer should be `cast wallet` compliant");
let wallet = EthereumWallet::from(signer.clone());

let signer_address = signer.address();
let wallet = EthereumWallet::from(signer);

// Check if the monthly budget is non-negative to avoid runtime errors later
let _monthly_budget_in_wei = parse_ether(&config.monthly_budget_eth.to_string())
Expand Down Expand Up @@ -133,7 +117,6 @@ impl ProofAggregator {
sp1_chunk_aggregator_vk_hash_bytes,
risc0_chunk_aggregator_image_id_bytes,
db,
signer_address,
}
}

Expand Down Expand Up @@ -351,98 +334,7 @@ impl ProofAggregator {

info!("Sending proof to ProofAggregationService contract...");

let max_retries = self.config.max_bump_retries;

let mut last_error: Option<AggregatedProofSubmissionError> = None;

let mut pending_hashes: Vec<TxHash> = Vec::with_capacity(max_retries as usize);

// Get the nonce once at the beginning and reuse it for all retries
let nonce = self
.proof_aggregation_service
.provider()
.get_transaction_count(self.signer_address)
.await
.map_err(|e| {
RetryError::Transient(
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!(
"Failed to get nonce: {e}"
)),
)
})?;

info!("Using nonce {}", nonce);

for attempt in 0..max_retries {
info!("Transaction attempt {} of {}", attempt + 1, max_retries);

// Wrap the entire transaction submission in a result to catch all errors, passing
// the same nonce to all attempts
let attempt_result = self
.try_submit_transaction(
&blob,
blob_versioned_hash,
aggregated_proof,
nonce,
attempt,
)
.await;

match attempt_result {
Ok(SubmitOutcome::Confirmed(receipt)) => {
info!(
"Transaction confirmed successfully on attempt {}",
attempt + 1
);
return Ok(*receipt);
}
Ok(SubmitOutcome::Pending(tx_hash)) => {
warn!(
"Attempt {} timed out waiting for receipt; storing pending tx",
attempt + 1
);
pending_hashes.push(tx_hash);
last_error = Some(
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
"Timed out waiting for receipt".to_string(),
),
);
}
Err(err) => {
warn!("Attempt {} failed: {:?}", attempt + 1, err);
last_error = Some(err);
}
}

// Check if any pending tx was confirmed before retrying
if let Some(receipt) = self.check_pending_txs_confirmed(&pending_hashes).await {
return Ok(receipt);
}

info!("Retrying with bumped gas fees and same nonce {}...", nonce);
tokio::time::sleep(Duration::from_millis(500)).await;
}

warn!("Max retries ({}) exceeded", max_retries);
Err(RetryError::Transient(last_error.unwrap_or_else(|| {
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
"Max retries exceeded with no error details".to_string(),
)
})))
}

async fn try_submit_transaction(
&self,
blob: &BlobTransactionSidecar,
blob_versioned_hash: [u8; 32],
aggregated_proof: &AlignedProof,
nonce: u64,
attempt: u16,
) -> Result<SubmitOutcome, AggregatedProofSubmissionError> {
let retry_interval = Duration::from_secs(self.config.bump_retry_interval_seconds);

// Build the transaction request
let mut tx_req = match aggregated_proof {
let tx_req = match aggregated_proof {
AlignedProof::SP1(proof) => self
.proof_aggregation_service
.verifyAggregationSP1(
Expand All @@ -451,170 +343,81 @@ impl ProofAggregator {
proof.proof_with_pub_values.bytes().into(),
self.sp1_chunk_aggregator_vk_hash_bytes.into(),
)
.sidecar(blob.clone())
.sidecar(blob)
.into_transaction_request(),
AlignedProof::Risc0(proof) => {
let encoded_seal = encode_seal(&proof.receipt).map_err(|e| {
AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())
})?;
let encoded_seal = encode_seal(&proof.receipt)
.map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()))
.map_err(RetryError::Permanent)?;
self.proof_aggregation_service
.verifyAggregationRisc0(
blob_versioned_hash.into(),
encoded_seal.into(),
proof.receipt.journal.bytes.clone().into(),
self.risc0_chunk_aggregator_image_id_bytes.into(),
)
.sidecar(blob.clone())
.sidecar(blob)
.into_transaction_request()
}
};

// Set the nonce explicitly
tx_req = tx_req.with_nonce(nonce);

// Apply gas fee bump for retries
tx_req = self.apply_gas_fee_bump(tx_req, attempt).await?;

let provider = self.proof_aggregation_service.provider();

// Fill the transaction
let envelope = provider
.fill(tx_req)
.await
.map_err(|err| {
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!(
"Failed to fill transaction: {err}"
))
})?
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
err.to_string(),
)
})
.map_err(RetryError::Transient)?
.try_into_envelope()
.map_err(|err| {
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!(
"Failed to convert to envelope: {err}"
))
})?;

// Convert to EIP-4844 transaction
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
err.to_string(),
)
})
.map_err(RetryError::Transient)?;
let tx: EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarEip7594>> = envelope
.try_into_pooled()
.map_err(|err| {
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!(
"Failed to pool transaction: {err}"
))
})?
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
err.to_string(),
)
})
.map_err(RetryError::Transient)?
.try_map_eip4844(|tx| {
tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get()))
})
.map_err(|err| {
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!(
"Failed to convert to EIP-7594: {err}"
))
})?;
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
err.to_string(),
)
})
.map_err(RetryError::Transient)?;

// Send the transaction
let encoded_tx = tx.encoded_2718();
let pending_tx = provider
.send_raw_transaction(&encoded_tx)
.await
.map_err(|err| {
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!(
"Failed to send raw transaction: {err}"
))
})?;

let tx_hash = *pending_tx.tx_hash();

let receipt_result = tokio::time::timeout(retry_interval, pending_tx.get_receipt()).await;

match receipt_result {
Ok(Ok(receipt)) => Ok(SubmitOutcome::Confirmed(Box::new(receipt))),
Ok(Err(err)) => Err(
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(format!(
"Error getting receipt: {err}"
)),
),
Err(_) => Ok(SubmitOutcome::Pending(tx_hash)),
}
}

// Checks if any of the pending transactions have been confirmed.
// Returns the receipt if one is found, otherwise None.
async fn check_pending_txs_confirmed(
&self,
pending_hashes: &[TxHash],
) -> Option<TransactionReceipt> {
for tx_hash in pending_hashes {
if let Ok(Some(receipt)) = self
.proof_aggregation_service
.provider()
.get_transaction_receipt(*tx_hash)
.await
{
info!("Pending tx {} confirmed before retry", tx_hash);
return Some(receipt);
}
}
None
}

// Updates the gas fees of a `TransactionRequest` using EIP-1559 fee parameters.
// Intended for retrying an on-chain submission after a timeout.
//
// Strategy:
// - Fetch the current base fee from the latest block.
// - Fetch the suggested priority fee from the network (eth_maxPriorityFeePerGas).
// - Compute priority fee as: suggested * (1 + (attempt + 1) * 0.1), capped at `max_priority_fee_upper_limit`.
// - Compute `max_fee_per_gas` as: (1 + max_fee_bump_percentage/100) * base_fee + priority_fee.
//
// Fees are recomputed on each retry using the latest base fee.

async fn apply_gas_fee_bump(
&self,
tx_req: TransactionRequest,
attempt: u16,
) -> Result<TransactionRequest, AggregatedProofSubmissionError> {
let provider = self.proof_aggregation_service.provider();

let max_fee_bump_percentage = self.config.max_fee_bump_percentage;
let max_priority_fee_upper_limit = self.config.max_priority_fee_upper_limit;
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
err.to_string(),
)
})
.map_err(RetryError::Transient)?;

let latest_block = provider
.get_block_by_number(BlockNumberOrTag::Latest)
.await
.map_err(|e| AggregatedProofSubmissionError::GasPriceError(e.to_string()))?
.ok_or(AggregatedProofSubmissionError::LatestBlockNotFound)?;

let current_base_fee = latest_block
.header
.base_fee_per_gas
.ok_or(AggregatedProofSubmissionError::BaseFeePerGasMissing)?
as f64;

// Fetch suggested priority fee from the network
let suggested_priority_fee = provider
.get_max_priority_fee_per_gas()
let receipt = pending_tx
.get_receipt()
.await
.map_err(|e| AggregatedProofSubmissionError::GasPriceError(e.to_string()))?;

// Calculate priority fee: suggested * (attempt + 1), capped at max
let priority_fee_multiplier = (attempt + 1) as u128;
let max_priority_fee_per_gas =
(suggested_priority_fee * priority_fee_multiplier).min(max_priority_fee_upper_limit);

// Calculate max fee with cumulative bump per attempt to ensure replacement tx is accepted
let max_fee_multiplier = 1.0 + max_fee_bump_percentage as f64 / 100.0;
let max_fee_per_gas =
(max_fee_multiplier * current_base_fee) as u128 + max_priority_fee_per_gas;

info!(
"Base fee: {:.4} Gwei. Applying max_fee_per_gas: {:.4} Gwei and max_priority_fee_per_gas: {:.4} Gwei to tx",
current_base_fee / 1e9,
max_fee_per_gas as f64 / 1e9,
max_priority_fee_per_gas as f64 / 1e9
);
.map_err(|err| {
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
err.to_string(),
)
})
.map_err(RetryError::Transient)?;

Ok(tx_req
.with_max_fee_per_gas(max_fee_per_gas)
.with_max_priority_fee_per_gas(max_priority_fee_per_gas))
Ok(receipt)
}

async fn wait_until_can_submit_aggregated_proof(
Expand Down
6 changes: 0 additions & 6 deletions config-files/config-proof-aggregator-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ monthly_budget_eth: 15.0
sp1_chunk_aggregator_vk_hash: "00d6e32a34f68ea643362b96615591c94ee0bf99ee871740ab2337966a4f77af"
risc0_chunk_aggregator_image_id: "8908f01022827e80a5de71908c16ee44f4a467236df20f62e7c994491629d74c"

# These values modify the bumping behavior after the aggregated proof on-chain submission times out.
max_bump_retries: 5
bump_retry_interval_seconds: 120
max_fee_bump_percentage: 100
max_priority_fee_upper_limit: 3000000000 # 3 Gwei

ecdsa:
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"
private_key_store_password: ""
Loading
Loading