2 changes: 1 addition & 1 deletion Makefile
@@ -298,7 +298,7 @@ verify_aggregated_proof_risc0:
--rpc_url $(RPC_URL)

proof_aggregator_install: ## Install the aggregation mode with proving enabled
-	cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator --locked
+	cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator_gpu --locked

proof_aggregator_write_program_ids: ## Write proof aggregator zkvm programs ids
@cd aggregation_mode && ./scripts/build_programs.sh
28 changes: 26 additions & 2 deletions aggregation_mode/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions aggregation_mode/Cargo.toml
@@ -18,6 +18,7 @@ reqwest = { version = "0.12" }
ciborium = "=0.2.2"
lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]}
rayon = "1.10.0"
+backon = "1.2.0"
# Necessary for the VerificationData type
aligned-sdk = { path = "../crates/sdk/" }
# zkvms
1 change: 1 addition & 0 deletions aggregation_mode/src/backend/mod.rs
@@ -1,6 +1,7 @@
pub mod config;
pub mod fetcher;
mod merkle_tree;
+mod retry;
mod s3;
mod types;

62 changes: 62 additions & 0 deletions aggregation_mode/src/backend/retry.rs
@@ -0,0 +1,62 @@
use backon::ExponentialBuilder;
use backon::Retryable;
use std::{future::Future, time::Duration};

#[derive(Debug)]
pub enum RetryError<E> {
    Transient(E),
    Permanent(E),
}

impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            RetryError::Transient(e) => write!(f, "{}", e),
            RetryError::Permanent(e) => write!(f, "{}", e),
        }
    }
}

impl<E> RetryError<E> {
    pub fn inner(self) -> E {
        match self {
            RetryError::Transient(e) => e,
            RetryError::Permanent(e) => e,
        }
    }
}

impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}

/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
/// Runs with `jitter: false`.
///
/// # Parameters
/// * `function` - The async function to retry
/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
/// * `factor` - Exponential backoff multiplier for retry delays
/// * `max_times` - Maximum number of retry attempts
/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
pub async fn retry_function<FutureFn, Fut, T, E>(
    function: FutureFn,
    min_delay_millis: u64,
    factor: f32,
    max_times: usize,
    max_delay_seconds: u64,
) -> Result<T, RetryError<E>>
where
    Fut: Future<Output = Result<T, RetryError<E>>>,
    FutureFn: FnMut() -> Fut,
{
    let backoff = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(min_delay_millis))
        .with_max_times(max_times)
        .with_factor(factor)
        .with_max_delay(Duration::from_secs(max_delay_seconds));

    function
        .retry(backoff)
        .sleep(tokio::time::sleep)
        .when(|e| matches!(e, RetryError::Transient(_)))
        .await
}
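
A minimal sketch of how a caller might drive `retry_function`, assuming a hypothetical `fetch_config` helper (not part of this PR). Errors are classified at the call site; only `RetryError::Transient` values trigger another attempt, while `Permanent` ones abort the loop immediately:

use crate::backend::retry::{retry_function, RetryError};

// Hypothetical fallible operation: classify errors at the call site.
// Timeouts are worth retrying; anything else aborts the loop at once.
async fn fetch_config() -> Result<String, RetryError<std::io::Error>> {
    match tokio::fs::read_to_string("config.toml").await {
        Ok(contents) => Ok(contents),
        Err(e) if e.kind() == std::io::ErrorKind::TimedOut => Err(RetryError::Transient(e)),
        Err(e) => Err(RetryError::Permanent(e)),
    }
}

async fn load_config() -> Result<String, std::io::Error> {
    // 500 ms initial delay, doubling per attempt, at most 5 retries,
    // each delay capped at 10 s -- the same parameters s3.rs uses below.
    retry_function(fetch_config, 500, 2.0, 5, 10)
        .await
        .map_err(|e| e.inner())
}
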
101 changes: 79 additions & 22 deletions aggregation_mode/src/backend/s3.rs
@@ -1,4 +1,6 @@
+use crate::backend::retry::{retry_function, RetryError};
use aligned_sdk::common::types::VerificationData;
+use std::time::Duration;
use tracing::{info, warn};

#[derive(Debug)]
@@ -15,6 +17,22 @@ pub enum GetBatchProofsError {
const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
const MAX_BATCH_URLS: usize = 5;

+// Retry parameters for S3 requests
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 5;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 10;
+
+/// Timeout for establishing a connection to S3
+const CONNECT_TIMEOUT_SECONDS: Duration = Duration::from_secs(10);
+/// Timeout for batch download requests
+const BATCH_DOWNLOAD_TIMEOUT_SECONDS: Duration = Duration::from_secs(5 * 60);

// get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until first successful response
pub async fn get_aligned_batch_from_s3_with_multiple_urls(
urls: String,
@@ -64,39 +82,78 @@ fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
    urls
}

-pub async fn get_aligned_batch_from_s3(
+async fn get_aligned_batch_from_s3_retryable(
    url: String,
-) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+) -> Result<Vec<VerificationData>, RetryError<GetBatchProofsError>> {
    info!("Fetching batch from S3 URL: {}", url);
    let client = reqwest::Client::builder()
        .user_agent(DEFAULT_USER_AGENT)
+        .connect_timeout(CONNECT_TIMEOUT_SECONDS)
+        .timeout(BATCH_DOWNLOAD_TIMEOUT_SECONDS)
        .build()
-        .map_err(|e| GetBatchProofsError::ReqwestClientFailed(e.to_string()))?;
+        .map_err(|e| {
+            RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string()))
+        })?;

-    let response = client
-        .get(url)
-        .send()
-        .await
-        .map_err(|e| GetBatchProofsError::FetchingS3Batch(e.to_string()))?;
+    let response = client.get(&url).send().await.map_err(|e| {
+        warn!("Failed to send request to {}: {}", url, e);
+        RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string()))
+    })?;

    if !response.status().is_success() {
-        return Err(GetBatchProofsError::StatusFailed((
-            response.status().as_u16(),
-            response
-                .status()
-                .canonical_reason()
-                .unwrap_or("")
-                .to_string(),
-        )));
+        let status_code = response.status().as_u16();
+        let reason = response
+            .status()
+            .canonical_reason()
+            .unwrap_or("")
+            .to_string();
+
+        // Determine if the error is retryable based on status code
+        let error = GetBatchProofsError::StatusFailed((status_code, reason));
+        return match status_code {
+            // Client errors (4xx) are generally permanent, except for specific cases
+            400..=499 => match status_code {
+                408 | 429 => Err(RetryError::Transient(error)), // Request Timeout, Too Many Requests
+                _ => Err(RetryError::Permanent(error)),
+            },
+            // Server errors (5xx) are generally transient
+            500..=599 => Err(RetryError::Transient(error)),
+            _ => Err(RetryError::Permanent(error)),
+        };
    }

-    let bytes = response
-        .bytes()
-        .await
-        .map_err(|e| GetBatchProofsError::EmptyBody(e.to_string()))?;
+    let bytes = response.bytes().await.map_err(|e| {
+        warn!("Failed to read response body from {}: {}", url, e);
+        RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string()))
+    })?;
    let bytes: &[u8] = bytes.iter().as_slice();

-    let data: Vec<VerificationData> = ciborium::from_reader(bytes)
-        .map_err(|e| GetBatchProofsError::Deserialization(e.to_string()))?;
+    let data: Vec<VerificationData> = ciborium::from_reader(bytes).map_err(|e| {
+        warn!("Failed to deserialize batch data from {}: {}", url, e);
+        RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string()))
+    })?;

    Ok(data)
}

+/// Download batch from Storage Service using the provided URL.
+///
+/// Retries on recoverable errors using exponential backoff, up to `RETRY_MAX_TIMES` times
+/// (0.5 s, 1 s, 2 s, 4 s, 8 s).
+pub async fn get_aligned_batch_from_s3(
+    url: String,
+) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    let url_clone = url.clone();
+    retry_function(
+        move || {
+            let url = url_clone.clone();
+            get_aligned_batch_from_s3_retryable(url)
+        },
+        RETRY_MIN_DELAY_MILLIS,
+        RETRY_FACTOR,
+        RETRY_MAX_TIMES,
+        RETRY_MAX_DELAY_SECONDS,
+    )
+    .await
+    .map_err(|retry_err| retry_err.inner())
+}
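
For intuition about the documented schedule, a standalone sketch (plain Rust, a simplified model of backon's clamping rather than its actual internals) that reproduces the delays these constants yield. With a 500 ms floor, factor 2.0, and a 10 s cap, the five retries wait 0.5 s, 1 s, 2 s, 4 s, and 8 s, so the cap is never reached here:

fn main() {
    // Mirrors RETRY_MIN_DELAY_MILLIS, RETRY_FACTOR, RETRY_MAX_TIMES,
    // and RETRY_MAX_DELAY_SECONDS from s3.rs above.
    let (min_delay_ms, factor, max_times, max_delay_ms) = (500.0f64, 2.0f64, 5, 10_000.0f64);
    let mut delay_ms = min_delay_ms;
    for attempt in 1..=max_times {
        // with_max_delay clamps each individual delay to the cap.
        let capped = delay_ms.min(max_delay_ms);
        println!("retry #{attempt}: wait {:.1} s", capped / 1000.0);
        delay_ms *= factor; // exponential growth; jitter is off
    }
}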