Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/go.work.sum
/internal/cmd/riverbench/riverbench
/river
/riverdriver/riverdrivertest/example_libsql_test.libsql
/sqlite/
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

⚠️ Internal APIs used for communication between River and River Pro have changed. If using River Pro, make sure to update River and River Pro to latest at the same time to get compatible versions. River v0.24.0 is compatible with River Pro v0.16.0.

### Added

- The project now tests against [libSQL](https://github.com/tursodatabase/libsql), a popular SQLite fork. It's used through the same `riversqlite` driver that SQLite uses. [PR #957](https://github.com/riverqueue/river/pull/957)

### Changed

- Remove unecessary transactions where a single database operation will do. This reduces the number of subtransactions created which can be an operational benefit it many cases. [PR #950](https://github.com/riverqueue/river/pull/950)
Expand Down
25 changes: 24 additions & 1 deletion riverdriver/riverdrivertest/driver_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jackc/pgx/v5/stdlib"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
_ "github.com/tursodatabase/libsql-client-go/libsql"
_ "modernc.org/sqlite"

"github.com/riverqueue/river"
Expand Down Expand Up @@ -86,7 +87,29 @@ func TestClientWithDriverRiverPgxV5(t *testing.T) {
)
}

func TestClientWithDriverRiverSQLite(t *testing.T) {
func TestClientWithDriverRiverLibSQL(t *testing.T) {
t.Parallel()

ctx := context.Background()

ExerciseClient(ctx, t,
func(ctx context.Context, t *testing.T) (riverdriver.Driver[*sql.Tx], string) {
t.Helper()

var (
driver = riversqlite.New(nil)
schema = riverdbtest.TestSchema(ctx, t, driver, &riverdbtest.TestSchemaOpts{
ProcurePool: func(ctx context.Context, schema string) (any, string) {
return riversharedtest.DBPoolLibSQL(ctx, t, schema), "" // could also be `main` instead of empty string
},
})
)
return driver, schema
},
)
}

func TestClientWithDriverRiverSQLiteModernC(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand Down
55 changes: 54 additions & 1 deletion riverdriver/riverdrivertest/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jackc/pgx/v5/stdlib"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
_ "github.com/tursodatabase/libsql-client-go/libsql"
_ "modernc.org/sqlite"

"github.com/riverqueue/river"
Expand Down Expand Up @@ -111,7 +112,59 @@ func TestDriverRiverPgxV5(t *testing.T) {
})
}

func TestDriverRiverSQLite(t *testing.T) {
func TestDriverRiverLiteLibSQL(t *testing.T) { //nolint:dupl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth setting these up to use the same test logic to avoid having to add the nolint:dupl? Or are there subtle differences between the two?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I struggle with these ones.

It is true that they are 95% the same. However, if you zoom out and look at what's happening, each body is basically one function call and a couple local variable declarations. There is definitely some duplication that falls out of anonymous functions injected into the function call and comments on them, so we could extract the function call into another function that could encapsulate the shared lines, but IMO this does have a non-zero effect to obfuscate (in that you now have to follow another layer of indirection to see what's happening) and reduce clarity.

I guess i'm not too against it, but it's definitely not a clear win like dupl would suggest. More like a close to 50/50 tradeoff.

Gonna leave it as is for now I think, and possibly reorient later if it comes up again.

t.Parallel()

var (
ctx = context.Background()
procurePool = func(ctx context.Context, schema string) (any, string) {
return riversharedtest.DBPoolLibSQL(ctx, t, schema), "" // could also be `main` instead of empty string
}
)

riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T, opts *riverdbtest.TestSchemaOpts) (riverdriver.Driver[*sql.Tx], string) {
t.Helper()

if opts == nil {
opts = &riverdbtest.TestSchemaOpts{}
}
opts.ProcurePool = procurePool

var (
// Driver will have its pool set by TestSchema.
driver = riversqlite.New(nil)
schema = riverdbtest.TestSchema(ctx, t, driver, opts)
)
return driver, schema
},
func(ctx context.Context, t *testing.T) (riverdriver.Executor, riverdriver.Driver[*sql.Tx]) {
t.Helper()

// Driver will have its pool set by TestSchema.
driver := riversqlite.New(nil)

tx, _ := riverdbtest.TestTx(ctx, t, driver, &riverdbtest.TestTxOpts{
// Unfortunately, the normal test transaction schema sharing has
// to be disabled for SQLite. When enabled, there's too much
// contention on the shared test databases and operations fail
// with `database is locked (5) (SQLITE_BUSY)`, which is a
// common concurrency error in SQLite whose recommended
// remediation is a backoff and retry. I tried various
// techniques like journal_mode=WAL, but it didn't seem to help
// enough. SQLite databases are just local files anyway, and
// test transactions can still reuse schemas freed by other
// tests through TestSchema, so this should be okay performance
// wise.
DisableSchemaSharing: true,

ProcurePool: procurePool,
})
return driver.UnwrapExecutor(tx), driver
})
}

func TestDriverRiverSQLiteModernC(t *testing.T) { //nolint:dupl
t.Parallel()

var (
Expand Down
76 changes: 76 additions & 0 deletions riverdriver/riverdrivertest/example_libsql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package riverdrivertest_test

import (
"context"
"database/sql"
"log/slog"

_ "github.com/tursodatabase/libsql-client-go/libsql"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riversqlite"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
)

// Example_libSQL demonstrates use of River's SQLite driver with libSQL (a
// SQLite fork).
func Example_libSQL() { //nolint:dupl
ctx := context.Background()

dbPool, err := sql.Open("libsql", "file:./example_libsql_test.libsql")
if err != nil {
panic(err)
}
dbPool.SetMaxOpenConns(1)
defer dbPool.Close()

driver := riversqlite.New(dbPool)

if err := migrateDB(ctx, driver); err != nil {
panic(err)
}

workers := river.NewWorkers()
river.AddWorker(workers, &SortWorker{})

riverClient, err := river.NewClient(driver, &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
})
if err != nil {
panic(err)
}

// Out of example scope, but used to wait until a job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

if err := riverClient.Start(ctx); err != nil {
panic(err)
}

_, err = riverClient.Insert(ctx, SortArgs{
Strings: []string{
"whale", "tiger", "bear",
},
}, nil)
if err != nil {
panic(err)
}

// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

if err := riverClient.Stop(ctx); err != nil {
panic(err)
}

// Output:
// Sorted strings: [bear tiger whale]
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package riversqlite_test
package riverdrivertest_test

import (
"context"
Expand Down Expand Up @@ -36,7 +36,7 @@ func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
}

// Example_sqlite demonstrates use of River's SQLite driver.
func Example_sqlite() {
func Example_sqlite() { //nolint:dupl
ctx := context.Background()

dbPool, err := sql.Open("sqlite", ":memory:")
Expand Down
3 changes: 3 additions & 0 deletions riverdriver/riverdrivertest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ require (
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
github.com/tursodatabase/libsql-client-go v0.0.0-20240902231107-85af5b9d094d
golang.org/x/text v0.26.0
modernc.org/sqlite v1.37.0
)

require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions riverdriver/riverdrivertest/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -64,6 +68,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tursodatabase/libsql-client-go v0.0.0-20240902231107-85af5b9d094d h1:dOMI4+zEbDI37KGb0TI44GUAwxHF9cMsIoDTJ7UmgfU=
github.com/tursodatabase/libsql-client-go v0.0.0-20240902231107-85af5b9d094d/go.mod h1:l8xTsYB90uaVdMHXMCxKKLSgw5wLYBwBKKefNIUnm9s=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
Expand Down
4 changes: 0 additions & 4 deletions riverdriver/riversqlite/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
modernc.org/sqlite v1.37.0
)

require (
Expand All @@ -36,7 +35,4 @@ require (
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.25.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/libc v1.64.0 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.10.0 // indirect
)
26 changes: 1 addition & 25 deletions riverdriver/riversqlite/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,28 +84,4 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.26.0 h1:QMYvbVduUGH0rrO+5mqF/PSPPRZNpRtg2CLELy7vUpA=
modernc.org/cc/v4 v4.26.0/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.26.0 h1:gVzXaDzGeBYJ2uXTOpR8FR7OlksDOe9jxnjhIKCsiTc=
modernc.org/ccgo/v4 v4.26.0/go.mod h1:Sem8f7TFUtVXkG2fiaChQtyyfkqhJBg/zjEJBkmuAVY=
modernc.org/fileutil v1.3.1 h1:8vq5fe7jdtEvoCf3Zf9Nm0Q05sH6kGx0Op2CPx1wTC8=
modernc.org/fileutil v1.3.1/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/libc v1.64.0 h1:U0k8BD2d3cD3e9I8RLcZgJBHAcsJzbXx5mKGSb5pyJA=
modernc.org/libc v1.64.0/go.mod h1:7m9VzGq7APssBTydds2zBcxGREwvIGpuUBaKTXdm2Qs=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.10.0 h1:fzumd51yQ1DxcOxSO+S6X7+QTuVU+n8/Aj7swYjFfC4=
modernc.org/memory v1.10.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.37.0 h1:s1TMe7T3Q3ovQiK2Ouz4Jwh7dw4ZDqbebSDTlSJdfjI=
modernc.org/sqlite v1.37.0/go.mod h1:5YiWv+YviqGMuGw4V+PNplcyaJ5v+vQd7TQOgkACoJM=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
7 changes: 5 additions & 2 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Package riversqlite provides a River driver implementation for SQLite.
// Package riversqlite provides a River driver implementation for SQLite. It's
// also tested against libSQL (a SQLite fork), and that should continue to work
// as long they keep to their commitment in maintaining API compatibility.
//
// This driver is currently in early testing. It's exercised reasonably
// thoroughly in the test suite, but has minimal real world use as of yet.
Expand Down Expand Up @@ -59,7 +61,8 @@ type Driver struct {
replacer sqlctemplate.Replacer
}

// New returns a new database/sql River driver for use with River.
// New returns a new SQLite driver for use with River. It also works with libSQL
// (a SQLite fork).
//
// It takes an sql.DB to use for use with River. The pool should already be
// configured to use the schema specified in the client's Schema field. The pool
Expand Down
20 changes: 17 additions & 3 deletions rivershared/riversharedtest/riversharedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,27 @@ var sqliteTestDir = sync.OnceValue(func() string { //nolint:gochecknoglobals
return path.Join(rootDir, "sqlite")
})

// DBPoolLibSQL gets a database pool appropriate for use with libSQL (a SQLite
// fork) in testing.
func DBPoolLibSQL(ctx context.Context, tb testing.TB, schema string) *sql.DB {
tb.Helper()

return dbPoolSQLite(ctx, tb, schema, "libsql")
}

// DBPoolSQLite gets a database pool appropriate for use with SQLite in testing.
func DBPoolSQLite(ctx context.Context, tb testing.TB, schema string) *sql.DB {
tb.Helper()

require.NoError(tb, os.MkdirAll(sqliteTestDir(), 0o700))
return dbPoolSQLite(ctx, tb, schema, "sqlite")
}

func dbPoolSQLite(ctx context.Context, tb testing.TB, schema, driverName string) *sql.DB { //nolint:unparam
tb.Helper()

var databaseURLBuilder strings.Builder

databaseURLBuilder.WriteString(filepath.Join(sqliteTestDir(), schema+".sqlite3"))
databaseURLBuilder.WriteString("file:" + filepath.Join(sqliteTestDir(), schema+".sqlite3"))

// This innocuous line turns out to be quite important at the tail.
//
Expand Down Expand Up @@ -153,7 +165,7 @@ func DBPoolSQLite(ctx context.Context, tb testing.TB, schema string) *sql.DB {
// but actually it opens the door to intermittency hell.
databaseURLBuilder.WriteString("?_pragma=journal_mode(WAL)")

dbPool, err := sql.Open("sqlite", databaseURLBuilder.String())
dbPool, err := sql.Open(driverName, databaseURLBuilder.String())
require.NoError(tb, err)
tb.Cleanup(func() { require.NoError(tb, dbPool.Close()) })

Expand Down Expand Up @@ -334,6 +346,8 @@ var IgnoredKnownGoroutineLeaks = []goleak.Option{ //nolint:gochecknoglobals
// Similar to the above, may be sitting in a sleep when the program finishes
// and there's not much we can do about it.
goleak.IgnoreAnyFunction("github.com/jackc/pgx/v5/pgxpool.(*Pool).triggerHealthCheck.func1"),

goleak.IgnoreAnyFunction("database/sql.(*DB).connectionOpener"),
}

// WrapTestMain performs some common setup and teardown that should be shared
Expand Down
Loading