Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "init4-bin-base"

description = "Internal utilities for binaries produced by the init4 team"
keywords = ["init4", "bin", "base"]
version = "0.18.0-rc.10"
version = "0.18.0-rc.11"
edition = "2021"
rust-version = "1.85"
authors = ["init4", "James Prestwich", "evalir"]
Expand Down Expand Up @@ -56,6 +56,7 @@ async-trait = { version = "0.1.80", optional = true }
# AWS
aws-config = { version = "1.1.7", optional = true }
aws-sdk-kms = { version = "1.15.0", optional = true }
futures-util = { version = "0.3", optional = true }
reqwest = { version = "0.12.15", optional = true }
rustls = { version = "0.23.31", optional = true }

Expand All @@ -71,7 +72,7 @@ tokio = { version = "1.43.0", features = ["macros"] }
default = ["alloy", "rustls"]
alloy = ["dep:alloy"]
aws = ["alloy", "alloy?/signer-aws", "dep:async-trait", "dep:aws-config", "dep:aws-sdk-kms"]
perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache"]
perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache", "dep:futures-util"]
pylon = ["perms", "alloy/kzg"]
block_watcher = ["dep:tokio"]
rustls = ["dep:rustls", "rustls/aws-lc-rs"]
Expand Down
20 changes: 20 additions & 0 deletions src/perms/tx_cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::perms::oauth::SharedToken;
use futures_util::future::Either;
use futures_util::stream::{self, Stream, StreamExt};
use serde::de::DeserializeOwned;
use signet_tx_cache::{
error::TxCacheError,
Expand Down Expand Up @@ -148,6 +150,24 @@ impl BuilderTxCache {
format!("{BUNDLES}/{bundle_id}")
}

/// Stream all bundles from the cache, automatically paginating through
/// all available pages. Yields individual [`CachedBundle`] items.
pub fn stream_bundles(&self) -> impl Stream<Item = Result<CachedBundle>> + Send + '_ {
stream::unfold(Some(None), move |cursor| async move {
let cursor = cursor?;

match self.get_bundles(cursor).await {
Ok(response) => {
let (inner, next_cursor) = response.into_parts();
let bundles = stream::iter(inner.bundles).map(Ok);
Some((Either::Left(bundles), next_cursor.map(Some)))
}
Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
}
})
.flatten()
}

/// Get a bundle from the cache by its UUID. For convenience, this method
/// takes a string reference, which is expected to be a valid UUID.
#[instrument(skip_all)]
Expand Down