Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/cli/src/common_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub fn confirmed() -> Arg {
Arg::new("confirmed")
.required(false)
.long("confirmed")
.action(SetTrue)
.num_args(1)
.value_parser(value_parser!(bool))
.help("Instruct the server to deliver only updates of confirmed transactions")
}

Expand Down
6 changes: 3 additions & 3 deletions crates/cli/src/subcommands/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
}
})?;
let query = resolved.remaining_args.join(" ");
let confirmed = args.get_flag("confirmed");
let confirmed = args.get_one::<bool>("confirmed").copied();

let con = parse_req(config, args, &resolved.database, resolved.server.as_deref()).await?;
let mut api = ClientApi::new(con).sql();
if confirmed {
api = api.query(&[("confirmed", "true")]);
if let Some(confirmed) = confirmed {
api = api.query(&[("confirmed", if confirmed { "true" } else { "false" })]);
}

run_sql(api, &query, false).await?;
Expand Down
7 changes: 4 additions & 3 deletions crates/cli/src/subcommands/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
let num = args.get_one::<u32>("num-updates").copied();
let timeout = args.get_one::<u32>("timeout").copied();
let print_initial_update = args.get_flag("print_initial_update");
let confirmed = args.get_flag("confirmed");
let confirmed = args.get_one::<bool>("confirmed").copied();
let resolved_server = server.or(resolved.server.as_deref());

let mut config = config;
Expand All @@ -169,8 +169,9 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
unknown => unreachable!("Invalid URL scheme in `Connection::db_uri`: {unknown}"),
})
.unwrap();
if confirmed {
url.query_pairs_mut().append_pair("confirmed", "true");
if let Some(confirmed) = confirmed {
url.query_pairs_mut()
.append_pair("confirmed", if confirmed { "true" } else { "false" });
}

// Create the websocket request.
Expand Down
5 changes: 5 additions & 0 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ pub mod auth;
pub mod routes;
pub mod util;

/// The default value for the `confirmed` reads parameter when the client does
/// not specify it explicitly. When `true`, the server waits for durability
/// confirmation before sending subscription updates and SQL results.
pub const DEFAULT_CONFIRMED_READS: bool = true;

/// Defines the state / environment of a SpacetimeDB node from the PoV of the
/// client API.
///
Expand Down
5 changes: 3 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ pub struct SqlQueryParams {
/// If `true`, return the query result only after its transaction offset
/// is confirmed to be durable.
#[serde(default)]
pub confirmed: bool,
pub confirmed: Option<bool>,
}

pub async fn sql_direct<S>(
Expand All @@ -518,7 +518,8 @@ where
.authorize_sql(caller_identity, database.database_identity)
.await?;

host.exec_sql(auth, database, confirmed, sql).await
host.exec_sql(auth, database, confirmed.unwrap_or(crate::DEFAULT_CONFIRMED_READS), sql)
.await
}

pub async fn sql<S>(
Expand Down
4 changes: 2 additions & 2 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub struct SubscribeQueryParams {
///
/// If `false`, send them immediately.
#[serde(default)]
pub confirmed: bool,
pub confirmed: Option<bool>,
}

pub fn generate_random_connection_id() -> ConnectionId {
Expand Down Expand Up @@ -170,7 +170,7 @@ where
version: negotiated.version,
compression,
tx_update_full: !light,
confirmed_reads: confirmed,
confirmed_reads: confirmed.unwrap_or(crate::DEFAULT_CONFIRMED_READS),
};

// TODO: Should also maybe refactor the code and the protocol to allow a single websocket
Expand Down
2 changes: 1 addition & 1 deletion crates/pg/src/pg_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ where
database::sql_direct(
self.ctx.clone(),
db,
SqlQueryParams { confirmed: true },
SqlQueryParams { confirmed: Some(true) },
params.caller_identity,
query.to_string(),
)
Expand Down
20 changes: 16 additions & 4 deletions crates/smoketests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ log = "0.4"
"--server",
&self.server_url,
"--confirmed",
"true",
identity.as_str(),
query,
])
Expand Down Expand Up @@ -1375,16 +1376,26 @@ log = "0.4"
/// This matches Python's subscribe semantics - start subscription first,
/// perform actions, then call the handle to collect results.
pub fn subscribe_background(&self, queries: &[&str], n: usize) -> Result<SubscriptionHandle> {
self.subscribe_background_opts(queries, n, false)
self.subscribe_background_opts(queries, n, None)
}

/// Starts a subscription in the background with --confirmed flag.
pub fn subscribe_background_confirmed(&self, queries: &[&str], n: usize) -> Result<SubscriptionHandle> {
self.subscribe_background_opts(queries, n, true)
self.subscribe_background_opts(queries, n, Some(true))
}

/// Starts a subscription in the background with --confirmed flag.
pub fn subscribe_background_unconfirmed(&self, queries: &[&str], n: usize) -> Result<SubscriptionHandle> {
self.subscribe_background_opts(queries, n, Some(false))
}

/// Internal helper for background subscribe with options.
fn subscribe_background_opts(&self, queries: &[&str], n: usize, confirmed: bool) -> Result<SubscriptionHandle> {
fn subscribe_background_opts(
&self,
queries: &[&str],
n: usize,
confirmed: Option<bool>,
) -> Result<SubscriptionHandle> {
use std::io::{BufRead, BufReader};

let identity = self
Expand All @@ -1410,8 +1421,9 @@ log = "0.4"
n.to_string(),
"--print-initial-update".to_string(),
];
if confirmed {
if let Some(confirmed) = confirmed {
args.push("--confirmed".to_string());
args.push(confirmed.to_string());
}
args.push("--".to_string());
cmd.args(&args)
Expand Down
8 changes: 7 additions & 1 deletion crates/smoketests/tests/auto_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,13 @@ fn test_add_table_columns() {
// Subscribe to person table changes multiple times to simulate active clients
let mut subs = Vec::with_capacity(NUM_SUBSCRIBERS);
for _ in 0..NUM_SUBSCRIBERS {
subs.push(test.subscribe_background(&["select * from person"], 5).unwrap());
// We need unconfirmed reads for the updates to arrive properly.
// Otherwise, there's a race between module teardown in publish, vs subscribers
// getting the row deletion they expect.
subs.push(
test.subscribe_background_unconfirmed(&["select * from person"], 5)
.unwrap(),
);
}

// Insert under initial schema
Expand Down
65 changes: 65 additions & 0 deletions docs/docs/00300-resources/00100-how-to/00600-migrating-to-2.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ SpacetimeDB 2.0 introduces a new WebSocket protocol (v2) and SDK with several br
2. **`light_mode` removed** -- no longer necessary since reducer events are no longer broadcast
3. **`CallReducerFlags` removed** -- `NoSuccessNotify` and `set_reducer_flags()` are gone
4. **Event tables introduced** -- a new table type for publishing transient events to subscribers
5. **Confirmed reads enabled by default** -- subscription updates and SQL results are only sent after the transaction is confirmed durable

## Reducer Callbacks

Expand Down Expand Up @@ -1295,6 +1296,69 @@ In 2.0, the success notification is lightweight (just `request_id` and `timestam
</TabItem>
</Tabs>

## Confirmed Reads Enabled by Default

### What changed

In 1.0, subscription updates and SQL query results were sent to the client immediately, before the underlying transaction was confirmed to be durable. This meant a client could observe a row that was later lost if the server crashed before persisting it.

In 2.0, **confirmed reads are enabled by default**. The server waits until a transaction is confirmed durable before sending updates to clients. This ensures that any data a client receives will survive a server restart.

### Impact

- **Slightly higher latency**: Subscription updates and SQL results may arrive a few milliseconds later, as the server waits for durability confirmation before sending them.
- **Stronger consistency**: Clients will never observe data that could be lost due to a crash.
- **No code changes required**: This is a server-side default change. Existing client code works without modification.

### Opting out

If your application prioritizes low latency over durability guarantees (for example, a real-time game where occasional data loss on crash is acceptable), you can opt out by passing `confirmed=false` in the connection URL:

<Tabs groupId="client-language" queryString>
<TabItem value="typescript" label="TypeScript">

```typescript
DbConnection.builder()
.withUri("https://maincloud.spacetimedb.com")
.withDatabaseName("my-database")
.withConfirmedReads(false) // opt out of confirmed reads
.build()
```

</TabItem>
<TabItem value="csharp" label="C#">

```csharp
DbConnection.Builder()
.WithUri("https://maincloud.spacetimedb.com")
.WithDatabaseName("my-database")
.WithConfirmedReads(false) // opt out of confirmed reads
.Build();
```

</TabItem>
<TabItem value="rust" label="Rust">

```rust
DbConnection::builder()
.with_uri("https://maincloud.spacetimedb.com")
.with_database_name("my-database")
.with_confirmed_reads(false) // opt out of confirmed reads
.build()
.expect("Failed to connect");
```

</TabItem>
</Tabs>

For the CLI:

```bash
# SQL without confirmed reads
spacetime sql <database> "SELECT * FROM my_table"
# The --confirmed flag is no longer needed (it is the default)
```

## Quick Migration Checklist

- [ ] Remove all `ctx.reducers.on_<reducer>()` calls
Expand All @@ -1320,3 +1384,4 @@ In 2.0, the success notification is lightweight (just `request_id` and `timestam
- [ ] Remove `with_light_mode()` from `DbConnectionBuilder`
- [ ] Remove `set_reducer_flags()` calls and `CallReducerFlags` imports
- [ ] Remove `unstable::CallReducerFlags` from imports
- [ ] Note that confirmed reads are now enabled by default (no action needed unless you want to opt out with `.withConfirmedReads(false)`)
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ Runs a SQL query on the database. WARNING: This command is UNSTABLE and subject
###### **Options:**

* `--interactive` — Instead of using a query, run an interactive command prompt for `SQL` expressions
* `--confirmed` — Instruct the server to deliver only updates of confirmed transactions
* `--confirmed <CONFIRMED>` — Instruct the server to deliver only updates of confirmed transactions

Possible values: `true`, `false`

* `--anonymous` — Perform this action with an anonymous identity
* `-s`, `--server <SERVER>` — The nickname, host name or URL of the server hosting the database
* `-y`, `--yes` — Run non-interactively wherever possible. This will answer "yes" to almost all prompts, but will sometimes answer "no" to preserve non-interactivity (e.g. when prompting whether to log in with spacetimedb.com).
Expand Down Expand Up @@ -605,7 +608,10 @@ Subscribe to SQL queries on the database. WARNING: This command is UNSTABLE and
* `-t`, `--timeout <TIMEOUT>` — The timeout, in seconds, after which to disconnect and stop receiving subscription messages. If `-n` is specified, it will stop after whichever
one comes first.
* `--print-initial-update` — Print the initial update for the queries.
* `--confirmed` — Instruct the server to deliver only updates of confirmed transactions
* `--confirmed <CONFIRMED>` — Instruct the server to deliver only updates of confirmed transactions

Possible values: `true`, `false`

* `--anonymous` — Perform this action with an anonymous identity
* `-y`, `--yes` — Run non-interactively wherever possible. This will answer "yes" to almost all prompts, but will sometimes answer "no" to preserve non-interactivity (e.g. when prompting whether to log in with spacetimedb.com).
* `--no-config` — Ignore spacetime.json configuration
Expand Down
6 changes: 3 additions & 3 deletions smoketests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def fingerprint(self):
def new_identity(self):
new_identity(self.__class__.config_path)

def subscribe(self, *queries, n, confirmed = False, database = None):
def subscribe(self, *queries, n, confirmed = None, database = None):
self._check_published()
assert isinstance(n, int)

Expand All @@ -280,8 +280,8 @@ def subscribe(self, *queries, n, confirmed = False, database = None):
"-n", str(n),
"--print-initial-update",
]
if confirmed:
args.append("--confirmed")
if confirmed is not None:
args.append(f"--confirmed={str(confirmed).lower()}")
args.extend(["--", *queries])

fake_args = ["spacetime", *args[1:]]
Expand Down
7 changes: 5 additions & 2 deletions smoketests/tests/auto_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_add_table_auto_migration(self):
logging.info("Initial publish complete")

# Start a subscription before publishing the module, to test that the subscription remains intact after re-publishing.
sub = self.subscribe("select * from person", n=4)
sub = self.subscribe("select * from person", n=4, confirmed=False)

# initial module code is already published by test framework
self.call("add_person", "Robert", "Student")
Expand Down Expand Up @@ -307,7 +307,10 @@ def test_add_table_columns(self):
NUM_SUBSCRIBERS = 20
subs = [None] * NUM_SUBSCRIBERS
for i in range(NUM_SUBSCRIBERS):
subs[i]= self.subscribe("select * from person", n=5)
# We need unconfirmed reads for the updates to arrive properly.
# Otherwise, there's a race between module teardown in publish, vs subscribers
# getting the row deletion they expect.
subs[i]= self.subscribe("select * from person", n=5, confirmed=False)

# Insert under initial schema
self.call("add_person", "Robert")
Expand Down
2 changes: 2 additions & 0 deletions smoketests/tests/confirmed_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ def test_sql_with_confirmed_reads_receives_result(self):
self.spacetime(
"sql",
"--confirmed",
"true",
self.database_identity,
"insert into person (name) values ('Horst')")

res = self.spacetime(
"sql",
"--confirmed",
"true",
self.database_identity,
"select * from person")
res = parse_sql_result(str(res))
Expand Down
Loading