diff --git a/Makefile b/Makefile
index 7ff35a4c41..797b9a211c 100644
--- a/Makefile
+++ b/Makefile
@@ -298,7 +298,7 @@ verify_aggregated_proof_risc0:
 		--rpc_url $(RPC_URL)
 
 proof_aggregator_install: ## Install the aggregation mode with proving enabled
-	cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator --locked
+	cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator_gpu --locked
 
 proof_aggregator_write_program_ids: ## Write proof aggregator zkvm programs ids
 	@cd aggregation_mode && ./scripts/build_programs.sh
diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock
index 3aec49c8d8..c6e6a5f40c 100644
--- a/aggregation_mode/Cargo.lock
+++ b/aggregation_mode/Cargo.lock
@@ -812,7 +812,7 @@ dependencies = [
  "alloy-rlp",
  "alloy-serde 1.0.22",
  "alloy-sol-types",
- "itertools 0.13.0",
+ "itertools 0.14.0",
  "serde",
  "serde_json",
  "serde_with",
@@ -1718,6 +1718,17 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "backon"
+version = "1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d"
+dependencies = [
+ "fastrand",
+ "gloo-timers 0.3.0",
+ "tokio",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.74"
@@ -3805,7 +3816,7 @@
 version = "3.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
 dependencies = [
- "gloo-timers",
+ "gloo-timers 0.2.6",
  "send_wrapper 0.4.0",
 ]
@@ -3945,6 +3956,18 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "gloo-timers"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "js-sys",
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "group"
 version = "0.12.1"
@@ -6305,6 +6328,7 @@ version = "0.1.0"
 dependencies = [
  "aligned-sdk",
  "alloy 0.15.11",
+ "backon",
  "bincode",
  "c-kzg",
  "ciborium",
diff --git a/aggregation_mode/Cargo.toml b/aggregation_mode/Cargo.toml
index deb5e2c1c7..73aaa8e56c 100644
--- a/aggregation_mode/Cargo.toml
+++ b/aggregation_mode/Cargo.toml
@@ -18,6 +18,7 @@ reqwest = { version = "0.12" }
 ciborium = "=0.2.2"
 lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]}
 rayon = "1.10.0"
+backon = "1.2.0"
 # Necessary for the VerificationData type
 aligned-sdk = { path = "../crates/sdk/" }
 # zkvms
diff --git a/aggregation_mode/src/backend/mod.rs b/aggregation_mode/src/backend/mod.rs
index 91cf556eaf..570d4bb822 100644
--- a/aggregation_mode/src/backend/mod.rs
+++ b/aggregation_mode/src/backend/mod.rs
@@ -1,6 +1,7 @@
 pub mod config;
 pub mod fetcher;
 mod merkle_tree;
+mod retry;
 mod s3;
 mod types;
 
diff --git a/aggregation_mode/src/backend/retry.rs b/aggregation_mode/src/backend/retry.rs
new file mode 100644
index 0000000000..41971f896f
--- /dev/null
+++ b/aggregation_mode/src/backend/retry.rs
@@ -0,0 +1,62 @@
+use backon::ExponentialBuilder;
+use backon::Retryable;
+use std::{future::Future, time::Duration};
+
+#[derive(Debug)]
+pub enum RetryError<E> {
+    Transient(E),
+    Permanent(E),
+}
+
+impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            RetryError::Transient(e) => write!(f, "{}", e),
+            RetryError::Permanent(e) => write!(f, "{}", e),
+        }
+    }
+}
+
+impl<E> RetryError<E> {
+    pub fn inner(self) -> E {
+        match self {
+            RetryError::Transient(e) => e,
+            RetryError::Permanent(e) => e,
+        }
+    }
+}
+
+impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
+
+/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
+/// Runs with `jitter: false`.
+///
+/// # Parameters
+/// * `function` - The async function to retry
+/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
+/// * `factor` - Exponential backoff multiplier for retry delays
+/// * `max_times` - Maximum number of retry attempts
+/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
+pub async fn retry_function<Fut, FutureFn, T, E>(
+    function: FutureFn,
+    min_delay_millis: u64,
+    factor: f32,
+    max_times: usize,
+    max_delay_seconds: u64,
+) -> Result<T, RetryError<E>>
+where
+    Fut: Future<Output = Result<T, RetryError<E>>>,
+    FutureFn: FnMut() -> Fut,
+{
+    let backoff = ExponentialBuilder::default()
+        .with_min_delay(Duration::from_millis(min_delay_millis))
+        .with_max_times(max_times)
+        .with_factor(factor)
+        .with_max_delay(Duration::from_secs(max_delay_seconds));
+
+    function
+        .retry(backoff)
+        .sleep(tokio::time::sleep)
+        .when(|e| matches!(e, RetryError::Transient(_)))
+        .await
+}
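Taken on its own, `retry_function` is generic over the error type: any operation whose failures have been pre-classified as `Transient` or `Permanent` can be retried. Below is a minimal, hypothetical sketch of a caller; `flaky_fetch`, the attempt counter, and the literal backoff parameters are invented for illustration and are not part of this patch:

```rust
use crate::backend::retry::{retry_function, RetryError};
use std::sync::atomic::{AtomicU32, Ordering};

// Hypothetical flaky operation: fails twice with a Transient error, then succeeds.
async fn flaky_fetch(attempts: &AtomicU32) -> Result<u32, RetryError<String>> {
    let n = attempts.fetch_add(1, Ordering::SeqCst);
    if n < 2 {
        Err(RetryError::Transient("temporarily unavailable".to_string()))
    } else {
        Ok(n)
    }
}

#[tokio::main]
async fn main() {
    let attempts = AtomicU32::new(0);
    // 100 ms initial delay, doubling each retry, at most 3 retries, delays capped at 5 s.
    let result = retry_function(|| flaky_fetch(&attempts), 100, 2.0, 3, 5).await;
    // The two Transient failures are retried; the third attempt succeeds.
    // A Permanent error would have been surfaced immediately instead.
    assert_eq!(result.map_err(RetryError::inner), Ok(2));
}
```

The `when(...)` filter inside `retry_function` is what makes `Permanent` errors short-circuit: backon only sleeps and retries while the predicate returns true.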
diff --git a/aggregation_mode/src/backend/s3.rs b/aggregation_mode/src/backend/s3.rs
index 850b6057d8..a4762f482a 100644
--- a/aggregation_mode/src/backend/s3.rs
+++ b/aggregation_mode/src/backend/s3.rs
@@ -1,4 +1,6 @@
+use crate::backend::retry::{retry_function, RetryError};
 use aligned_sdk::common::types::VerificationData;
+use std::time::Duration;
 use tracing::{info, warn};
 
 #[derive(Debug)]
@@ -15,6 +17,22 @@ pub enum GetBatchProofsError {
 const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
 const MAX_BATCH_URLS: usize = 5;
 
+// Retry parameters for S3 requests
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 5;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 10;
+
+/// Timeout for establishing a connection to S3
+const CONNECT_TIMEOUT_SECONDS: Duration = Duration::from_secs(10);
+/// Timeout for batch download requests
+const BATCH_DOWNLOAD_TIMEOUT_SECONDS: Duration = Duration::from_secs(5 * 60);
+
+
 // get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until first successful response
 pub async fn get_aligned_batch_from_s3_with_multiple_urls(
     urls: String,
@@ -64,39 +82,79 @@ fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
     urls
 }
 
-pub async fn get_aligned_batch_from_s3(
+async fn get_aligned_batch_from_s3_retryable(
     url: String,
-) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+) -> Result<Vec<VerificationData>, RetryError<GetBatchProofsError>> {
     info!("Fetching batch from S3 URL: {}", url);
     let client = reqwest::Client::builder()
         .user_agent(DEFAULT_USER_AGENT)
+        .connect_timeout(CONNECT_TIMEOUT_SECONDS)
+        .timeout(BATCH_DOWNLOAD_TIMEOUT_SECONDS)
         .build()
-        .map_err(|e| GetBatchProofsError::ReqwestClientFailed(e.to_string()))?;
+        .map_err(|e| {
+            RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string()))
+        })?;
+
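+    // Network/transport failures (DNS errors, connect timeouts, dropped connections) are Transient, so they are retried.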
+    let response = client.get(&url).send().await.map_err(|e| {
+        warn!("Failed to send request to {}: {}", url, e);
+        RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string()))
+    })?;
 
-    let response = client
-        .get(url)
-        .send()
-        .await
-        .map_err(|e| GetBatchProofsError::FetchingS3Batch(e.to_string()))?;
     if !response.status().is_success() {
-        return Err(GetBatchProofsError::StatusFailed((
-            response.status().as_u16(),
-            response
-                .status()
-                .canonical_reason()
-                .unwrap_or("")
-                .to_string(),
-        )));
+        let status_code = response.status().as_u16();
+        let reason = response
+            .status()
+            .canonical_reason()
+            .unwrap_or("")
+            .to_string();
+
+        // Determine if the error is retryable based on status code
+        let error = GetBatchProofsError::StatusFailed((status_code, reason));
+        return match status_code {
+            // Client errors (4xx) are generally permanent, except for specific cases
+            400..=499 => match status_code {
+                408 | 429 => Err(RetryError::Transient(error)), // Request Timeout, Too Many Requests
+                _ => Err(RetryError::Permanent(error)),
+            },
+            // Server errors (5xx) are generally transient
+            500..=599 => Err(RetryError::Transient(error)),
+            _ => Err(RetryError::Permanent(error)),
+        };
     }
 
-    let bytes = response
-        .bytes()
-        .await
-        .map_err(|e| GetBatchProofsError::EmptyBody(e.to_string()))?;
+    let bytes = response.bytes().await.map_err(|e| {
+        warn!("Failed to read response body from {}: {}", url, e);
+        RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string()))
+    })?;
     let bytes: &[u8] = bytes.iter().as_slice();
 
-    let data: Vec<VerificationData> = ciborium::from_reader(bytes)
-        .map_err(|e| GetBatchProofsError::Deserialization(e.to_string()))?;
+    let data: Vec<VerificationData> = ciborium::from_reader(bytes).map_err(|e| {
+        warn!("Failed to deserialize batch data from {}: {}", url, e);
+        RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string()))
+    })?;
 
     Ok(data)
 }
+
+/// Downloads a batch from the storage service using the provided URL.
+///
+/// Retries on recoverable errors with exponential backoff, up to `RETRY_MAX_TIMES` times
+/// (0.5 s - 1 s - 2 s - 4 s - 8 s).
+pub async fn get_aligned_batch_from_s3(
+    url: String,
+) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    let url_clone = url.clone();
+    retry_function(
+        move || {
+            let url = url_clone.clone();
+            get_aligned_batch_from_s3_retryable(url)
+        },
+        RETRY_MIN_DELAY_MILLIS,
+        RETRY_FACTOR,
+        RETRY_MAX_TIMES,
+        RETRY_MAX_DELAY_SECONDS,
+    )
+    .await
+    .map_err(|retry_err| retry_err.inner())
+}
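For reference, with the constants above a persistently failing URL is attempted six times in total (one initial try plus `RETRY_MAX_TIMES` retries), with backoff delays of 0.5 s, 1 s, 2 s, 4 s, and 8 s in between: the 10 s cap is never reached, and the worst case adds roughly 15.5 s of waiting. The status-code policy can be sanity-checked with a standalone predicate; the sketch below is hypothetical (`is_transient_status` does not exist in the patch, which inlines this logic in the `match`):

```rust
// Mirrors the classification in get_aligned_batch_from_s3_retryable:
// 408 (Request Timeout), 429 (Too Many Requests) and all 5xx responses are
// transient (retried); every other non-success status is permanent.
fn is_transient_status(status: u16) -> bool {
    matches!(status, 408 | 429 | 500..=599)
}

fn main() {
    assert!(is_transient_status(503)); // Service Unavailable: retried
    assert!(is_transient_status(429)); // rate limited: retried
    assert!(!is_transient_status(404)); // batch genuinely missing: fail fast
    assert!(!is_transient_status(403)); // credentials/ACL problem: fail fast
}
```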