Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions crates/batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,24 @@ impl Batcher {
warn!("User {:?} has insufficient balance, flushing entire queue as safety measure", address);

self.flush_queue_and_clear_nonce_cache().await;

for entry in finalized_batch {
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
tokio::spawn(send_message(
ws_sink.clone(),
SubmitProofResponseMessage::BatchReset,
));
} else {
warn!(
"Websocket sink was found empty. This should only happen in tests"
);
}
}

return Err(BatcherError::StateCorruptedAndFlushed(format!(
"Queue and user states flushed due to insufficient balance for user {:?}",
address
)));
}
_ => {
// Add more cases here if we want in the future
Expand Down Expand Up @@ -2090,7 +2108,10 @@ impl Batcher {
let mut batch_state_lock = self.batch_state.lock().await;
for (entry, _) in batch_state_lock.batch_queue.iter() {
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
send_message(ws_sink.clone(), SubmitProofResponseMessage::BatchReset).await;
tokio::spawn(send_message(
ws_sink.clone(),
SubmitProofResponseMessage::BatchReset,
));
} else {
warn!("Websocket sink was found empty. This should only happen in tests");
}
Expand Down Expand Up @@ -2179,12 +2200,19 @@ impl Batcher {

// If batch finalization failed, restore the proofs to the queue
if let Err(e) = batch_finalization_result {
error!(
"Batch finalization failed, restoring proofs to queue: {:?}",
e
);
self.restore_proofs_after_batch_failure(&finalized_batch)
.await;
error!("Batch finalization failed: {:?}", e);

// If the queue was flushed, don't recover
match &e {
BatcherError::StateCorruptedAndFlushed(_) => {
info!("State was corrupted and flushed - not restoring proofs");
}
_ => {
info!("Restoring proofs to queue after batch failure");
self.restore_proofs_after_batch_failure(&finalized_batch)
.await;
}
}
return Err(e);
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/batcher/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub enum BatcherError {
WsSinkEmpty,
AddressNotFoundInUserStates(Address),
QueueRemoveError(String),
StateCorruptedAndFlushed(String),
EthereumProviderError(String),
}

Expand Down Expand Up @@ -148,6 +149,9 @@ impl fmt::Debug for BatcherError {
BatcherError::QueueRemoveError(e) => {
write!(f, "Error while removing entry from queue: {}", e)
}
BatcherError::StateCorruptedAndFlushed(reason) => {
write!(f, "Batcher state was corrupted and flushed: {}", reason)
}
BatcherError::EthereumProviderError(e) => {
write!(f, "Ethereum provider error: {}", e)
}
Expand Down
Loading