Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

147 changes: 132 additions & 15 deletions crates/bindings/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
//! while [`send`](HttpClient::send) allows more complex requests with headers, bodies and other methods.

use bytes::Bytes;
pub use http::{Request, Response};
pub use spacetimedb_lib::http::{Error, Timeout};

use crate::{
rt::{read_bytes_source_as, read_bytes_source_into},
IterBuf,
};
use spacetimedb_lib::{bsatn, http as st_http};
use spacetimedb_lib::{bsatn, http as st_http, TimeDuration};

pub type Request<T = Body> = http::Request<T>;

pub type Response<T = Body> = http::Response<T>;

/// Allows performing HTTP requests via [`HttpClient::send`] and [`HttpClient::get`].
///
Expand Down Expand Up @@ -43,8 +45,7 @@ impl HttpClient {
/// and a timeout of 100 milliseconds, then treat the response as a string and log it:
///
/// ```norun
/// # use spacetimedb::{procedure, ProcedureContext};
/// # use spacetimedb::http::{Request, Timeout};
/// # use spacetimedb::{procedure, ProcedureContext, http::Timeout};
/// # use std::time::Duration;
/// # #[procedure]
/// # fn post_somewhere(ctx: &mut ProcedureContext) {
Expand Down Expand Up @@ -73,25 +74,24 @@ impl HttpClient {
/// # }
///
/// ```
pub fn send<B: Into<Body>>(&self, request: Request<B>) -> Result<Response<Body>, Error> {
pub fn send<B: Into<Body>>(&self, request: http::Request<B>) -> Result<Response, Error> {
let (request, body) = request.map(Into::into).into_parts();
let request = st_http::Request::from(request);
let request = convert_request(request);
let request = bsatn::to_vec(&request).expect("Failed to BSATN-serialize `spacetimedb_lib::http::Request`");

match spacetimedb_bindings_sys::procedure::http_request(&request, &body.into_bytes()) {
Ok((response_source, body_source)) => {
let response = read_bytes_source_as::<st_http::Response>(response_source);
let response =
http::response::Parts::try_from(response).expect("Invalid http response returned from host");
let response = convert_response(response).expect("Invalid http response returned from host");
let mut buf = IterBuf::take();
read_bytes_source_into(body_source, &mut buf);
let body = Body::from_bytes(buf.clone());

Ok(http::Response::from_parts(response, body))
}
Err(err_source) => {
let error = read_bytes_source_as::<st_http::Error>(err_source);
Err(error)
let message = read_bytes_source_as::<String>(err_source);
Err(Error { message })
}
}
}
Expand Down Expand Up @@ -121,17 +121,79 @@ impl HttpClient {
/// }
/// # }
/// ```
pub fn get(&self, uri: impl TryInto<http::Uri, Error: Into<http::Error>>) -> Result<Response<Body>, Error> {
pub fn get(&self, uri: impl TryInto<http::Uri, Error: Into<http::Error>>) -> Result<Response, Error> {
self.send(
http::Request::builder()
.method("GET")
.method(http::Method::GET)
.uri(uri)
.body(Body::empty())
.map_err(|err| Error::from_display(&err))?,
.body(Body::empty())?,
)
}
}

fn convert_request(parts: http::request::Parts) -> st_http::Request {
let http::request::Parts {
method,
uri,
version,
headers,
mut extensions,
..
} = parts;

let timeout = extensions.remove::<Timeout>();
if !extensions.is_empty() {
log::warn!("Converting HTTP `Request` with unrecognized extensions");
}
st_http::Request {
method: match method {
http::Method::GET => st_http::Method::Get,
http::Method::HEAD => st_http::Method::Head,
http::Method::POST => st_http::Method::Post,
http::Method::PUT => st_http::Method::Put,
http::Method::DELETE => st_http::Method::Delete,
http::Method::CONNECT => st_http::Method::Connect,
http::Method::OPTIONS => st_http::Method::Options,
http::Method::TRACE => st_http::Method::Trace,
http::Method::PATCH => st_http::Method::Patch,
_ => st_http::Method::Extension(method.to_string()),
},
headers: headers
.into_iter()
.map(|(k, v)| (k.map(|k| k.as_str().into()), v.as_bytes().into()))
.collect(),
timeout: timeout.map(Into::into),
uri: uri.to_string(),
version: match version {
http::Version::HTTP_09 => st_http::Version::Http09,
http::Version::HTTP_10 => st_http::Version::Http10,
http::Version::HTTP_11 => st_http::Version::Http11,
http::Version::HTTP_2 => st_http::Version::Http2,
http::Version::HTTP_3 => st_http::Version::Http3,
_ => unreachable!("Unknown HTTP version: {version:?}"),
},
}
}

fn convert_response(response: st_http::Response) -> http::Result<http::response::Parts> {
let st_http::Response { headers, version, code } = response;

let (mut response, ()) = http::Response::new(()).into_parts();
response.version = match version {
st_http::Version::Http09 => http::Version::HTTP_09,
st_http::Version::Http10 => http::Version::HTTP_10,
st_http::Version::Http11 => http::Version::HTTP_11,
st_http::Version::Http2 => http::Version::HTTP_2,
st_http::Version::Http3 => http::Version::HTTP_3,
};
response.status = http::StatusCode::from_u16(code)?;
response.headers = headers
.into_iter()
.map(|(k, v)| Ok((k.into_string().try_into()?, v.into_vec().try_into()?)))
.collect::<http::Result<_>>()?;
Ok(response)
}

/// Represents the body of an HTTP request or response.
pub struct Body {
inner: BodyInner,
Expand Down Expand Up @@ -208,3 +270,58 @@ impl_body_from_bytes!(_unit: () => Bytes::new());
enum BodyInner {
Bytes(Bytes),
}

/// An HTTP extension to specify a timeout for requests made by a procedure running in a SpacetimeDB database.
///
/// Pass an instance of this type to [`http::request::Builder::extension`] to set a timeout on a request.
///
/// This timeout applies to the entire request,
/// from when the headers are first sent to when the response body is fully downloaded.
/// This is sometimes called a total timeout, the sum of the connect timeout and the read timeout.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct Timeout(pub TimeDuration);

impl From<TimeDuration> for Timeout {
fn from(timeout: TimeDuration) -> Timeout {
Timeout(timeout)
}
}

impl From<Timeout> for TimeDuration {
fn from(Timeout(timeout): Timeout) -> TimeDuration {
timeout
}
}

/// An error that may arise from an HTTP call.
#[derive(Clone, Debug)]
pub struct Error {
/// A string message describing the error.
///
/// It would be nice if we could store a more interesting object here,
/// ideally a type-erased `dyn Trait` cause,
/// rather than just a string, similar to how `anyhow` does.
/// This is not possible because we need to serialize `Error` for transport to WASM,
/// meaning it must have a concrete static type.
/// `reqwest::Error`, which is the source for these,
/// is type-erased enough that the best we can do (at least, the best we can do easily)
/// is to eagerly string-ify the error.
message: String,
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let Error { message } = self;
f.write_str(message)
}
}

impl std::error::Error for Error {}

impl From<http::Error> for Error {
fn from(err: http::Error) -> Self {
Error {
message: err.to_string(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ spacetimedb
│ ├── derive_more (*)
│ ├── enum_as_inner (*)
│ ├── hex
│ ├── http (*)
│ ├── itertools (*)
│ ├── log
│ ├── spacetimedb_bindings_macro (*)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ pub enum NodesError {
#[error("Failed to scheduled timer: {0}")]
ScheduleError(#[source] ScheduleError),
#[error("HTTP request failed: {0}")]
HttpError(#[from] spacetimedb_lib::http::Error),
HttpError(String),
}

impl From<DBError> for NodesError {
Expand Down
141 changes: 94 additions & 47 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,68 +618,45 @@ impl InstanceEnv {

// TODO(procedure-metrics): record size in bytes of request.

fn http_error<E: ToString>(err: E) -> NodesError {
NodesError::HttpError(err.to_string())
}

// Then convert the request into an `http::Request`, a semi-standard "lingua franca" type in the Rust ecosystem,
// and map its body into a type `reqwest` will like.
fn convert_request(request: st_http::Request, body: bytes::Bytes) -> Result<reqwest::Request, st_http::Error> {
let mut request: http::request::Parts =
request.try_into().map_err(|err| st_http::Error::from_display(&err))?;

// Pull our timeout extension, if any, out of the `http::Request` extensions.
// reqwest has its own timeout extension, which is where we'll provide this.
let timeout = request.extensions.remove::<st_http::Timeout>();
let (request, timeout) = convert_http_request(request).map_err(http_error)?;

let request = http::Request::from_parts(request, body.to_vec());
let request = http::Request::from_parts(request, body);

let mut reqwest: reqwest::Request = request.try_into().map_err(|err| st_http::Error::from_display(&err))?;
let mut reqwest: reqwest::Request = request.try_into().map_err(http_error)?;

// If the user requested a timeout using our extension, slot it in to reqwest's timeout.
// Clamp to the range `0..HTTP_DEFAULT_TIMEOUT`.
let timeout = timeout
.map(|timeout| timeout.timeout.to_duration().unwrap_or(Duration::ZERO))
.unwrap_or(HTTP_DEFAULT_TIMEOUT)
.min(HTTP_DEFAULT_TIMEOUT);
// If the user requested a timeout using our extension, slot it in to reqwest's timeout.
// Clamp to the range `0..HTTP_DEFAULT_TIMEOUT`.
let timeout = timeout.unwrap_or(HTTP_DEFAULT_TIMEOUT).min(HTTP_DEFAULT_TIMEOUT);

// reqwest's timeout covers from the start of the request to the end of reading the body,
// so there's no need to do our own timeout operation.
*reqwest.timeout_mut() = Some(timeout);

Ok(reqwest)
}
// reqwest's timeout covers from the start of the request to the end of reading the body,
// so there's no need to do our own timeout operation.
*reqwest.timeout_mut() = Some(timeout);

// If for whatever reason reqwest doesn't like our `http::Request`,
// surface that error to the guest so customers can debug and provide a more appropriate request.
let reqwest = convert_request(request, body)?;
let reqwest = reqwest;

// TODO(procedure-metrics): record size in bytes of response, time spent awaiting response.

// Actually execute the HTTP request!
// We'll wrap this future in a `tokio::time::timeout` before `await`ing it.
let get_response_and_download_body = async {
// TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
let response = reqwest::Client::new()
.execute(reqwest)
.await
.map_err(|err| st_http::Error::from_display(&err))?;

// Download the response body, which in all likelihood will be a stream,
// as reqwest seems to prefer that.
// Note that this will be wrapped in the same `tokio::time::timeout` as the above `execute` call.
let (parts, body) = http::Response::from(response).into_parts();
let body = http_body_util::BodyExt::collect(body)
.await
.map_err(|err| st_http::Error::from_display(&err))?;

// Map the collected body into our `spacetimedb_lib::http::Body` type,
// then wrap it back in an `http::Response`.
Ok::<_, st_http::Error>((parts, body.to_bytes()))
};
// TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
let response = reqwest::Client::new().execute(reqwest).await.map_err(http_error)?;

// If the request failed, surface that error to the guest so customer logic can handle it.
let (response, body) = get_response_and_download_body.await?;
// Download the response body, which in all likelihood will be a stream,
// as reqwest seems to prefer that.
let (response, body) = http::Response::from(response).into_parts();
let body = http_body_util::BodyExt::collect(body)
.await
.map_err(http_error)?
.to_bytes();

// Transform the `http::Response` into our `spacetimedb_lib::http::Response` type,
// which has a stable BSATN encoding to pass across the WASM boundary.
let response = st_http::Response::from(response);
let response = convert_http_response(response);

Ok((response, body))
}
Expand All @@ -692,6 +669,76 @@ impl InstanceEnv {
/// Value chosen arbitrarily by pgoldman 2025-11-18, based on little more than a vague guess.
const HTTP_DEFAULT_TIMEOUT: Duration = Duration::from_millis(500);

fn convert_http_request(request: st_http::Request) -> http::Result<(http::request::Parts, Option<Duration>)> {
let st_http::Request {
method,
headers,
timeout,
uri,
version,
} = request;

let (mut request, ()) = http::Request::new(()).into_parts();
request.method = match method {
st_http::Method::Get => http::Method::GET,
st_http::Method::Head => http::Method::HEAD,
st_http::Method::Post => http::Method::POST,
st_http::Method::Put => http::Method::PUT,
st_http::Method::Delete => http::Method::DELETE,
st_http::Method::Connect => http::Method::CONNECT,
st_http::Method::Options => http::Method::OPTIONS,
st_http::Method::Trace => http::Method::TRACE,
st_http::Method::Patch => http::Method::PATCH,
st_http::Method::Extension(method) => http::Method::from_bytes(method.as_bytes()).expect("Invalid HTTP method"),
};
request.uri = uri.try_into()?;
request.version = match version {
st_http::Version::Http09 => http::Version::HTTP_09,
st_http::Version::Http10 => http::Version::HTTP_10,
st_http::Version::Http11 => http::Version::HTTP_11,
st_http::Version::Http2 => http::Version::HTTP_2,
st_http::Version::Http3 => http::Version::HTTP_3,
};
request.headers = headers
.into_iter()
.map(|(k, v)| Ok((k.into_string().try_into()?, v.into_vec().try_into()?)))
.collect::<http::Result<_>>()?;

let timeout = timeout.map(|d| d.to_duration_saturating());

Ok((request, timeout))
}

fn convert_http_response(response: http::response::Parts) -> st_http::Response {
let http::response::Parts {
extensions,
headers,
status,
version,
..
} = response;

// there's a good chance that reqwest inserted some extensions into this request,
// but we can't control that and don't care much about it.
let _ = extensions;

st_http::Response {
headers: headers
.into_iter()
.map(|(k, v)| (k.map(|k| k.as_str().into()), v.as_bytes().into()))
.collect(),
version: match version {
http::Version::HTTP_09 => st_http::Version::Http09,
http::Version::HTTP_10 => st_http::Version::Http10,
http::Version::HTTP_11 => st_http::Version::Http11,
http::Version::HTTP_2 => st_http::Version::Http2,
http::Version::HTTP_3 => st_http::Version::Http3,
_ => unreachable!("Unknown HTTP version: {version:?}"),
},
code: status.as_u16(),
}
}

impl TxSlot {
/// Sets the slot to `tx`, ensuring that there was no tx before.
pub fn set_raw(&mut self, tx: MutTxId) {
Expand Down
Loading
Loading