diff --git a/crates/cli/src/common_args.rs b/crates/cli/src/common_args.rs index 2f2a9773b36..66a9b9c601a 100644 --- a/crates/cli/src/common_args.rs +++ b/crates/cli/src/common_args.rs @@ -41,7 +41,8 @@ pub fn confirmed() -> Arg { Arg::new("confirmed") .required(false) .long("confirmed") - .action(SetTrue) + .num_args(1) + .value_parser(value_parser!(bool)) .help("Instruct the server to deliver only updates of confirmed transactions") } diff --git a/crates/cli/src/subcommands/sql.rs b/crates/cli/src/subcommands/sql.rs index 25ab32576ca..fe68cc5a598 100644 --- a/crates/cli/src/subcommands/sql.rs +++ b/crates/cli/src/subcommands/sql.rs @@ -237,12 +237,12 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error } })?; let query = resolved.remaining_args.join(" "); - let confirmed = args.get_flag("confirmed"); + let confirmed = args.get_one::("confirmed").copied(); let con = parse_req(config, args, &resolved.database, resolved.server.as_deref()).await?; let mut api = ClientApi::new(con).sql(); - if confirmed { - api = api.query(&[("confirmed", "true")]); + if let Some(confirmed) = confirmed { + api = api.query(&[("confirmed", if confirmed { "true" } else { "false" })]); } run_sql(api, &query, false).await?; diff --git a/crates/cli/src/subcommands/subscribe.rs b/crates/cli/src/subcommands/subscribe.rs index 915235d04f3..1f966fe10b0 100644 --- a/crates/cli/src/subcommands/subscribe.rs +++ b/crates/cli/src/subcommands/subscribe.rs @@ -148,7 +148,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error let num = args.get_one::("num-updates").copied(); let timeout = args.get_one::("timeout").copied(); let print_initial_update = args.get_flag("print_initial_update"); - let confirmed = args.get_flag("confirmed"); + let confirmed = args.get_one::("confirmed").copied(); let resolved_server = server.or(resolved.server.as_deref()); let mut config = config; @@ -169,8 +169,9 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error unknown => unreachable!("Invalid URL scheme in `Connection::db_uri`: {unknown}"), }) .unwrap(); - if confirmed { - url.query_pairs_mut().append_pair("confirmed", "true"); + if let Some(confirmed) = confirmed { + url.query_pairs_mut() + .append_pair("confirmed", if confirmed { "true" } else { "false" }); } // Create the websocket request. diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index 01b78da9b00..fdbf86af36b 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -27,6 +27,11 @@ pub mod auth; pub mod routes; pub mod util; +/// The default value for the `confirmed` reads parameter when the client does +/// not specify it explicitly. When `true`, the server waits for durability +/// confirmation before sending subscription updates and SQL results. +pub const DEFAULT_CONFIRMED_READS: bool = true; + /// Defines the state / environment of a SpacetimeDB node from the PoV of the /// client API. /// diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index b39ad36fe25..e76a474c01f 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -496,7 +496,7 @@ pub struct SqlQueryParams { /// If `true`, return the query result only after its transaction offset /// is confirmed to be durable. #[serde(default)] - pub confirmed: bool, + pub confirmed: Option, } pub async fn sql_direct( @@ -518,7 +518,8 @@ where .authorize_sql(caller_identity, database.database_identity) .await?; - host.exec_sql(auth, database, confirmed, sql).await + host.exec_sql(auth, database, confirmed.unwrap_or(crate::DEFAULT_CONFIRMED_READS), sql) + .await } pub async fn sql( diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 206624fbda8..f66690ceb1b 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -92,7 +92,7 @@ pub struct SubscribeQueryParams { /// /// If `false`, send them immediately. #[serde(default)] - pub confirmed: bool, + pub confirmed: Option, } pub fn generate_random_connection_id() -> ConnectionId { @@ -170,7 +170,7 @@ where version: negotiated.version, compression, tx_update_full: !light, - confirmed_reads: confirmed, + confirmed_reads: confirmed.unwrap_or(crate::DEFAULT_CONFIRMED_READS), }; // TODO: Should also maybe refactor the code and the protocol to allow a single websocket diff --git a/crates/pg/src/pg_server.rs b/crates/pg/src/pg_server.rs index 860df156f25..afaa8c2994e 100644 --- a/crates/pg/src/pg_server.rs +++ b/crates/pg/src/pg_server.rs @@ -161,7 +161,7 @@ where database::sql_direct( self.ctx.clone(), db, - SqlQueryParams { confirmed: true }, + SqlQueryParams { confirmed: Some(true) }, params.caller_identity, query.to_string(), ) diff --git a/crates/smoketests/src/lib.rs b/crates/smoketests/src/lib.rs index b568c8bd139..d1cf7b16667 100644 --- a/crates/smoketests/src/lib.rs +++ b/crates/smoketests/src/lib.rs @@ -1136,6 +1136,7 @@ log = "0.4" "--server", &self.server_url, "--confirmed", + "true", identity.as_str(), query, ]) @@ -1375,16 +1376,26 @@ log = "0.4" /// This matches Python's subscribe semantics - start subscription first, /// perform actions, then call the handle to collect results. pub fn subscribe_background(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, false) + self.subscribe_background_opts(queries, n, None) } /// Starts a subscription in the background with --confirmed flag. pub fn subscribe_background_confirmed(&self, queries: &[&str], n: usize) -> Result { - self.subscribe_background_opts(queries, n, true) + self.subscribe_background_opts(queries, n, Some(true)) + } + + /// Starts a subscription in the background with --confirmed flag. + pub fn subscribe_background_unconfirmed(&self, queries: &[&str], n: usize) -> Result { + self.subscribe_background_opts(queries, n, Some(false)) } /// Internal helper for background subscribe with options. - fn subscribe_background_opts(&self, queries: &[&str], n: usize, confirmed: bool) -> Result { + fn subscribe_background_opts( + &self, + queries: &[&str], + n: usize, + confirmed: Option, + ) -> Result { use std::io::{BufRead, BufReader}; let identity = self @@ -1410,8 +1421,9 @@ log = "0.4" n.to_string(), "--print-initial-update".to_string(), ]; - if confirmed { + if let Some(confirmed) = confirmed { args.push("--confirmed".to_string()); + args.push(confirmed.to_string()); } args.push("--".to_string()); cmd.args(&args) diff --git a/crates/smoketests/tests/auto_migration.rs b/crates/smoketests/tests/auto_migration.rs index 93f575edb4d..df779bd6be4 100644 --- a/crates/smoketests/tests/auto_migration.rs +++ b/crates/smoketests/tests/auto_migration.rs @@ -343,7 +343,13 @@ fn test_add_table_columns() { // Subscribe to person table changes multiple times to simulate active clients let mut subs = Vec::with_capacity(NUM_SUBSCRIBERS); for _ in 0..NUM_SUBSCRIBERS { - subs.push(test.subscribe_background(&["select * from person"], 5).unwrap()); + // We need unconfirmed reads for the updates to arrive properly. + // Otherwise, there's a race between module teardown in publish, vs subscribers + // getting the row deletion they expect. + subs.push( + test.subscribe_background_unconfirmed(&["select * from person"], 5) + .unwrap(), + ); } // Insert under initial schema diff --git a/docs/docs/00300-resources/00100-how-to/00600-migrating-to-2.0.md b/docs/docs/00300-resources/00100-how-to/00600-migrating-to-2.0.md index 3f79f1302e0..b6e61f8ce6f 100644 --- a/docs/docs/00300-resources/00100-how-to/00600-migrating-to-2.0.md +++ b/docs/docs/00300-resources/00100-how-to/00600-migrating-to-2.0.md @@ -18,6 +18,7 @@ SpacetimeDB 2.0 introduces a new WebSocket protocol (v2) and SDK with several br 2. **`light_mode` removed** -- no longer necessary since reducer events are no longer broadcast 3. **`CallReducerFlags` removed** -- `NoSuccessNotify` and `set_reducer_flags()` are gone 4. **Event tables introduced** -- a new table type for publishing transient events to subscribers +5. **Confirmed reads enabled by default** -- subscription updates and SQL results are only sent after the transaction is confirmed durable ## Reducer Callbacks @@ -1295,6 +1296,69 @@ In 2.0, the success notification is lightweight (just `request_id` and `timestam +## Confirmed Reads Enabled by Default + +### What changed + +In 1.0, subscription updates and SQL query results were sent to the client immediately, before the underlying transaction was confirmed to be durable. This meant a client could observe a row that was later lost if the server crashed before persisting it. + +In 2.0, **confirmed reads are enabled by default**. The server waits until a transaction is confirmed durable before sending updates to clients. This ensures that any data a client receives will survive a server restart. + +### Impact + +- **Slightly higher latency**: Subscription updates and SQL results may arrive a few milliseconds later, as the server waits for durability confirmation before sending them. +- **Stronger consistency**: Clients will never observe data that could be lost due to a crash. +- **No code changes required**: This is a server-side default change. Existing client code works without modification. + +### Opting out + +If your application prioritizes low latency over durability guarantees (for example, a real-time game where occasional data loss on crash is acceptable), you can opt out by passing `confirmed=false` in the connection URL: + + + + +```typescript +DbConnection.builder() + .withUri("https://maincloud.spacetimedb.com") + .withDatabaseName("my-database") + .withConfirmedReads(false) // opt out of confirmed reads + .build() +``` + + + + +```csharp +DbConnection.Builder() + .WithUri("https://maincloud.spacetimedb.com") + .WithDatabaseName("my-database") + .WithConfirmedReads(false) // opt out of confirmed reads + .Build(); +``` + + + + +```rust +DbConnection::builder() + .with_uri("https://maincloud.spacetimedb.com") + .with_database_name("my-database") + .with_confirmed_reads(false) // opt out of confirmed reads + .build() + .expect("Failed to connect"); +``` + + + + +For the CLI: + +```bash +# SQL without confirmed reads +spacetime sql "SELECT * FROM my_table" +# The --confirmed flag is no longer needed (it is the default) +``` + ## Quick Migration Checklist - [ ] Remove all `ctx.reducers.on_()` calls @@ -1320,3 +1384,4 @@ In 2.0, the success notification is lightweight (just `request_id` and `timestam - [ ] Remove `with_light_mode()` from `DbConnectionBuilder` - [ ] Remove `set_reducer_flags()` calls and `CallReducerFlags` imports - [ ] Remove `unstable::CallReducerFlags` from imports +- [ ] Note that confirmed reads are now enabled by default (no action needed unless you want to opt out with `.withConfirmedReads(false)`) diff --git a/docs/docs/00300-resources/00200-reference/00100-cli-reference/00100-cli-reference.md b/docs/docs/00300-resources/00200-reference/00100-cli-reference/00100-cli-reference.md index 2ea1f9eecbb..6204cc2143c 100644 --- a/docs/docs/00300-resources/00200-reference/00100-cli-reference/00100-cli-reference.md +++ b/docs/docs/00300-resources/00200-reference/00100-cli-reference/00100-cli-reference.md @@ -291,7 +291,10 @@ Runs a SQL query on the database. WARNING: This command is UNSTABLE and subject ###### **Options:** * `--interactive` — Instead of using a query, run an interactive command prompt for `SQL` expressions -* `--confirmed` — Instruct the server to deliver only updates of confirmed transactions +* `--confirmed ` — Instruct the server to deliver only updates of confirmed transactions + + Possible values: `true`, `false` + * `--anonymous` — Perform this action with an anonymous identity * `-s`, `--server ` — The nickname, host name or URL of the server hosting the database * `-y`, `--yes` — Run non-interactively wherever possible. This will answer "yes" to almost all prompts, but will sometimes answer "no" to preserve non-interactivity (e.g. when prompting whether to log in with spacetimedb.com). @@ -605,7 +608,10 @@ Subscribe to SQL queries on the database. WARNING: This command is UNSTABLE and * `-t`, `--timeout ` — The timeout, in seconds, after which to disconnect and stop receiving subscription messages. If `-n` is specified, it will stop after whichever one comes first. * `--print-initial-update` — Print the initial update for the queries. -* `--confirmed` — Instruct the server to deliver only updates of confirmed transactions +* `--confirmed ` — Instruct the server to deliver only updates of confirmed transactions + + Possible values: `true`, `false` + * `--anonymous` — Perform this action with an anonymous identity * `-y`, `--yes` — Run non-interactively wherever possible. This will answer "yes" to almost all prompts, but will sometimes answer "no" to preserve non-interactivity (e.g. when prompting whether to log in with spacetimedb.com). * `--no-config` — Ignore spacetime.json configuration diff --git a/smoketests/__init__.py b/smoketests/__init__.py index 21f1a407827..6e096b3236d 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -267,7 +267,7 @@ def fingerprint(self): def new_identity(self): new_identity(self.__class__.config_path) - def subscribe(self, *queries, n, confirmed = False, database = None): + def subscribe(self, *queries, n, confirmed = None, database = None): self._check_published() assert isinstance(n, int) @@ -280,8 +280,8 @@ def subscribe(self, *queries, n, confirmed = False, database = None): "-n", str(n), "--print-initial-update", ] - if confirmed: - args.append("--confirmed") + if confirmed is not None: + args.append(f"--confirmed={str(confirmed).lower()}") args.extend(["--", *queries]) fake_args = ["spacetime", *args[1:]] diff --git a/smoketests/tests/auto_migration.py b/smoketests/tests/auto_migration.py index 0ae8b702a95..014bfede3fc 100644 --- a/smoketests/tests/auto_migration.py +++ b/smoketests/tests/auto_migration.py @@ -120,7 +120,7 @@ def test_add_table_auto_migration(self): logging.info("Initial publish complete") # Start a subscription before publishing the module, to test that the subscription remains intact after re-publishing. - sub = self.subscribe("select * from person", n=4) + sub = self.subscribe("select * from person", n=4, confirmed=False) # initial module code is already published by test framework self.call("add_person", "Robert", "Student") @@ -307,7 +307,10 @@ def test_add_table_columns(self): NUM_SUBSCRIBERS = 20 subs = [None] * NUM_SUBSCRIBERS for i in range(NUM_SUBSCRIBERS): - subs[i]= self.subscribe("select * from person", n=5) + # We need unconfirmed reads for the updates to arrive properly. + # Otherwise, there's a race between module teardown in publish, vs subscribers + # getting the row deletion they expect. + subs[i]= self.subscribe("select * from person", n=5, confirmed=False) # Insert under initial schema self.call("add_person", "Robert") diff --git a/smoketests/tests/confirmed_reads.py b/smoketests/tests/confirmed_reads.py index 4d8a844c4bf..b0b1aed6a0c 100644 --- a/smoketests/tests/confirmed_reads.py +++ b/smoketests/tests/confirmed_reads.py @@ -40,12 +40,14 @@ def test_sql_with_confirmed_reads_receives_result(self): self.spacetime( "sql", "--confirmed", + "true", self.database_identity, "insert into person (name) values ('Horst')") res = self.spacetime( "sql", "--confirmed", + "true", self.database_identity, "select * from person") res = parse_sql_result(str(res))