From cb8fb0aa782ee53ea8beeca354b1ff72430137c7 Mon Sep 17 00:00:00 2001 From: JuArce <52429267+JuArce@users.noreply.github.com> Date: Tue, 5 Aug 2025 15:22:47 -0300 Subject: [PATCH 1/4] feat(aggregation mode): add retry logic to batches download --- aggregation_mode/Cargo.lock | 28 ++++++++- aggregation_mode/Cargo.toml | 1 + aggregation_mode/src/backend/mod.rs | 1 + aggregation_mode/src/backend/retry.rs | 62 +++++++++++++++++++ aggregation_mode/src/backend/s3.rs | 87 ++++++++++++++++++++++----- 5 files changed, 162 insertions(+), 17 deletions(-) create mode 100644 aggregation_mode/src/backend/retry.rs diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 3aec49c8d8..c6e6a5f40c 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock @@ -812,7 +812,7 @@ dependencies = [ "alloy-rlp", "alloy-serde 1.0.22", "alloy-sol-types", - "itertools 0.13.0", + "itertools 0.14.0", "serde", "serde_json", "serde_with", @@ -1718,6 +1718,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" +dependencies = [ + "fastrand", + "gloo-timers 0.3.0", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -3805,7 +3816,7 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ - "gloo-timers", + "gloo-timers 0.2.6", "send_wrapper 0.4.0", ] @@ -3945,6 +3956,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.12.1" @@ -6305,6 +6328,7 @@ version = "0.1.0" dependencies = [ "aligned-sdk", "alloy 0.15.11", + "backon", "bincode", "c-kzg", "ciborium", diff --git a/aggregation_mode/Cargo.toml b/aggregation_mode/Cargo.toml index deb5e2c1c7..73aaa8e56c 100644 --- a/aggregation_mode/Cargo.toml +++ b/aggregation_mode/Cargo.toml @@ -18,6 +18,7 @@ reqwest = { version = "0.12" } ciborium = "=0.2.2" lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]} rayon = "1.10.0" +backon = "1.2.0" # Necessary for the VerificationData type aligned-sdk = { path = "../crates/sdk/" } # zkvms diff --git a/aggregation_mode/src/backend/mod.rs b/aggregation_mode/src/backend/mod.rs index 91cf556eaf..d785e61409 100644 --- a/aggregation_mode/src/backend/mod.rs +++ b/aggregation_mode/src/backend/mod.rs @@ -3,6 +3,7 @@ pub mod fetcher; mod merkle_tree; mod s3; mod types; +mod retry; use crate::aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine}; diff --git a/aggregation_mode/src/backend/retry.rs b/aggregation_mode/src/backend/retry.rs new file mode 100644 index 0000000000..dbcfa91d27 --- /dev/null +++ b/aggregation_mode/src/backend/retry.rs @@ -0,0 +1,62 @@ +use backon::ExponentialBuilder; +use backon::Retryable; +use std::{future::Future, time::Duration}; + +#[derive(Debug)] +pub enum RetryError { + Transient(E), + Permanent(E), +} + +impl std::fmt::Display for RetryError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + RetryError::Transient(e) => write!(f, "{}", e), + RetryError::Permanent(e) => write!(f, "{}", e), + } + } +} + +impl RetryError { + pub fn inner(self) -> E { + match self { + RetryError::Transient(e) => e, + RetryError::Permanent(e) => e, + } + } +} + +impl std::error::Error for RetryError where E: std::fmt::Debug {} + +/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function +/// Runs with `jitter: false`. +/// +/// # Parameters +/// * `function` - The async function to retry +/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds) +/// * `factor` - Exponential backoff multiplier for retry delays +/// * `max_times` - Maximum number of retry attempts +/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds) +pub async fn retry_function( + function: FutureFn, + min_delay_millis: u64, + factor: f32, + max_times: usize, + max_delay_seconds: u64, +) -> Result> + where + Fut: Future>>, + FutureFn: FnMut() -> Fut, +{ + let backoff = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(min_delay_millis)) + .with_max_times(max_times) + .with_factor(factor) + .with_max_delay(Duration::from_secs(max_delay_seconds)); + + function + .retry(backoff) + .sleep(tokio::time::sleep) + .when(|e| matches!(e, RetryError::Transient(_))) + .await +} diff --git a/aggregation_mode/src/backend/s3.rs b/aggregation_mode/src/backend/s3.rs index ba6a2f1478..928de13424 100644 --- a/aggregation_mode/src/backend/s3.rs +++ b/aggregation_mode/src/backend/s3.rs @@ -1,4 +1,7 @@ +use std::time::Duration; use aligned_sdk::common::types::VerificationData; +use tracing::{info, warn}; +use crate::backend::retry::{retry_function, RetryError}; #[derive(Debug)] #[allow(dead_code)] @@ -13,38 +16,92 @@ pub enum GetBatchProofsError { // needed to make S3 bucket work const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer"; -pub async fn get_aligned_batch_from_s3( +// Retry parameters for S3 requests +/// Initial delay before first retry attempt (in milliseconds) +const RETRY_MIN_DELAY_MILLIS: u64 = 500; +/// Exponential backoff multiplier for retry delays +const RETRY_FACTOR: f32 = 2.0; +/// Maximum number of retry attempts +const RETRY_MAX_TIMES: usize = 5; +/// Maximum delay between retry attempts (in seconds) +const RETRY_MAX_DELAY_SECONDS: u64 = 10; + +/// Timeout for Reqwest Client +const REQWEST_TIMEOUT_SECONDS: Duration = Duration::from_secs(60); + +async fn get_aligned_batch_from_s3_retryable( url: String, -) -> Result, GetBatchProofsError> { +) -> Result, RetryError> { + info!("Fetching batch from S3 URL: {}", url); let client = reqwest::Client::builder() .user_agent(DEFAULT_USER_AGENT) + .timeout(REQWEST_TIMEOUT_SECONDS) .build() - .map_err(|e| GetBatchProofsError::ReqwestClientFailed(e.to_string()))?; + .map_err(|e| RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string())))?; let response = client - .get(url) + .get(&url) .send() .await - .map_err(|e| GetBatchProofsError::FetchingS3Batch(e.to_string()))?; + .map_err(|e| { + warn!("Failed to send request to {}: {}", url, e); + RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string())) + })?; + if !response.status().is_success() { - return Err(GetBatchProofsError::StatusFailed(( - response.status().as_u16(), - response - .status() - .canonical_reason() - .unwrap_or("") - .to_string(), - ))); + let status_code = response.status().as_u16(); + let reason = response.status().canonical_reason().unwrap_or("").to_string(); + + // Determine if the error is retryable based on status code + let error = GetBatchProofsError::StatusFailed((status_code, reason)); + return match status_code { + // Client errors (4xx) are generally permanent, except for specific cases + 400..=499 => match status_code { + 408 | 429 => Err(RetryError::Transient(error)), // Request Timeout, Too Many Requests + _ => Err(RetryError::Permanent(error)), + }, + // Server errors (5xx) are generally transient + 500..=599 => Err(RetryError::Transient(error)), + _ => Err(RetryError::Permanent(error)), + }; } let bytes = response .bytes() .await - .map_err(|e| GetBatchProofsError::EmptyBody(e.to_string()))?; + .map_err(|e| { + warn!("Failed to read response body from {}: {}", url, e); + RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string())) + })?; let bytes: &[u8] = bytes.iter().as_slice(); let data: Vec = ciborium::from_reader(bytes) - .map_err(|e| GetBatchProofsError::Deserialization(e.to_string()))?; + .map_err(|e| { + warn!("Failed to deserialize batch data from {}: {}", url, e); + RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string())) + })?; Ok(data) } + +/// Download batch from Storage Service using the provided URL. +/// +/// Retries on recoverable errors using exponential backoff up to `RETRY_MAX_TIMES` times: +/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs). +pub async fn get_aligned_batch_from_s3( + url: String, +) -> Result, GetBatchProofsError> { + let url_clone = url.clone(); + retry_function( + move || { + let url = url_clone.clone(); + get_aligned_batch_from_s3_retryable(url) + }, + RETRY_MIN_DELAY_MILLIS, + RETRY_FACTOR, + RETRY_MAX_TIMES, + RETRY_MAX_DELAY_SECONDS, + ) + .await + .map_err(|retry_err| retry_err.inner()) +} From 1149cae71eeb0aeb53cf51e5521e95aaabb36580 Mon Sep 17 00:00:00 2001 From: JuArce <52429267+JuArce@users.noreply.github.com> Date: Wed, 6 Aug 2025 11:16:21 -0300 Subject: [PATCH 2/4] fix: rename timeout constant and increase to 5 minutes and set connect_timeout --- aggregation_mode/src/backend/s3.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/aggregation_mode/src/backend/s3.rs b/aggregation_mode/src/backend/s3.rs index 928de13424..814362de31 100644 --- a/aggregation_mode/src/backend/s3.rs +++ b/aggregation_mode/src/backend/s3.rs @@ -26,8 +26,10 @@ const RETRY_MAX_TIMES: usize = 5; /// Maximum delay between retry attempts (in seconds) const RETRY_MAX_DELAY_SECONDS: u64 = 10; -/// Timeout for Reqwest Client -const REQWEST_TIMEOUT_SECONDS: Duration = Duration::from_secs(60); +/// Timeout for establishing a connection to S3 +const CONNECT_TIMEOUT_SECONDS: Duration = Duration::from_secs(10); +/// Timeout for Batch Download Requests +const BATCH_DOWNLOAD_TIMEOUT_SECONDS: Duration = Duration::from_secs(5 * 60); async fn get_aligned_batch_from_s3_retryable( url: String, @@ -35,7 +37,8 @@ async fn get_aligned_batch_from_s3_retryable( info!("Fetching batch from S3 URL: {}", url); let client = reqwest::Client::builder() .user_agent(DEFAULT_USER_AGENT) - .timeout(REQWEST_TIMEOUT_SECONDS) + .connect_timeout(CONNECT_TIMEOUT_SECONDS) + .timeout(BATCH_DOWNLOAD_TIMEOUT_SECONDS) .build() .map_err(|e| RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string())))?; From adcd3691124fb7ad30cdccd8cf811001c3a95b63 Mon Sep 17 00:00:00 2001 From: JuArce <52429267+JuArce@users.noreply.github.com> Date: Wed, 6 Aug 2025 11:30:53 -0300 Subject: [PATCH 3/4] fix: cargo fmt --- aggregation_mode/src/backend/mod.rs | 2 +- aggregation_mode/src/backend/retry.rs | 8 ++--- aggregation_mode/src/backend/s3.rs | 44 +++++++++++++-------------- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/aggregation_mode/src/backend/mod.rs b/aggregation_mode/src/backend/mod.rs index d785e61409..570d4bb822 100644 --- a/aggregation_mode/src/backend/mod.rs +++ b/aggregation_mode/src/backend/mod.rs @@ -1,9 +1,9 @@ pub mod config; pub mod fetcher; mod merkle_tree; +mod retry; mod s3; mod types; -mod retry; use crate::aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine}; diff --git a/aggregation_mode/src/backend/retry.rs b/aggregation_mode/src/backend/retry.rs index dbcfa91d27..41971f896f 100644 --- a/aggregation_mode/src/backend/retry.rs +++ b/aggregation_mode/src/backend/retry.rs @@ -30,7 +30,7 @@ impl std::error::Error for RetryError where E: std::fmt /// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function /// Runs with `jitter: false`. -/// +/// /// # Parameters /// * `function` - The async function to retry /// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds) @@ -44,9 +44,9 @@ pub async fn retry_function( max_times: usize, max_delay_seconds: u64, ) -> Result> - where - Fut: Future>>, - FutureFn: FnMut() -> Fut, +where + Fut: Future>>, + FutureFn: FnMut() -> Fut, { let backoff = ExponentialBuilder::default() .with_min_delay(Duration::from_millis(min_delay_millis)) diff --git a/aggregation_mode/src/backend/s3.rs b/aggregation_mode/src/backend/s3.rs index 814362de31..3094d0c00f 100644 --- a/aggregation_mode/src/backend/s3.rs +++ b/aggregation_mode/src/backend/s3.rs @@ -1,7 +1,7 @@ -use std::time::Duration; +use crate::backend::retry::{retry_function, RetryError}; use aligned_sdk::common::types::VerificationData; +use std::time::Duration; use tracing::{info, warn}; -use crate::backend::retry::{retry_function, RetryError}; #[derive(Debug)] #[allow(dead_code)] @@ -40,20 +40,22 @@ async fn get_aligned_batch_from_s3_retryable( .connect_timeout(CONNECT_TIMEOUT_SECONDS) .timeout(BATCH_DOWNLOAD_TIMEOUT_SECONDS) .build() - .map_err(|e| RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string())))?; - - let response = client - .get(&url) - .send() - .await .map_err(|e| { - warn!("Failed to send request to {}: {}", url, e); - RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string())) + RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string())) })?; + let response = client.get(&url).send().await.map_err(|e| { + warn!("Failed to send request to {}: {}", url, e); + RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string())) + })?; + if !response.status().is_success() { let status_code = response.status().as_u16(); - let reason = response.status().canonical_reason().unwrap_or("").to_string(); + let reason = response + .status() + .canonical_reason() + .unwrap_or("") + .to_string(); // Determine if the error is retryable based on status code let error = GetBatchProofsError::StatusFailed((status_code, reason)); @@ -69,20 +71,16 @@ async fn get_aligned_batch_from_s3_retryable( }; } - let bytes = response - .bytes() - .await - .map_err(|e| { - warn!("Failed to read response body from {}: {}", url, e); - RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string())) - })?; + let bytes = response.bytes().await.map_err(|e| { + warn!("Failed to read response body from {}: {}", url, e); + RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string())) + })?; let bytes: &[u8] = bytes.iter().as_slice(); - let data: Vec = ciborium::from_reader(bytes) - .map_err(|e| { - warn!("Failed to deserialize batch data from {}: {}", url, e); - RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string())) - })?; + let data: Vec = ciborium::from_reader(bytes).map_err(|e| { + warn!("Failed to deserialize batch data from {}: {}", url, e); + RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string())) + })?; Ok(data) } From 0028b9c016f75ab172d2bed81007c13d47338fd9 Mon Sep 17 00:00:00 2001 From: JuArce <52429267+JuArce@users.noreply.github.com> Date: Wed, 6 Aug 2025 15:25:40 -0300 Subject: [PATCH 4/4] fix: proof_aggregator_install target --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7ff35a4c41..797b9a211c 100644 --- a/Makefile +++ b/Makefile @@ -298,7 +298,7 @@ verify_aggregated_proof_risc0: --rpc_url $(RPC_URL) proof_aggregator_install: ## Install the aggregation mode with proving enabled - cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator --locked + cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator_gpu --locked proof_aggregator_write_program_ids: ## Write proof aggregator zkvm programs ids @cd aggregation_mode && ./scripts/build_programs.sh