diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f7eccef..f853e096 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ Internal APIs used for communication between River and River Pro have changed. If using River Pro, make sure to update River and River Pro to latest at the same time to get compatible versions. River v0.24.0 is compatible with River Pro v0.16.0. + ### Changed +- Remove unnecessary transactions where a single database operation will do. This reduces the number of subtransactions created, which can be an operational benefit in many cases. [PR #950](https://github.com/riverqueue/river/pull/950) - Bring all driver tests into separate package so they don't leak dependencies. This removes dependencies from the top level `river` package that most River installations won't need, thereby reducing the transitive dependency load of most River installations. [PR #955](https://github.com/riverqueue/river/pull/955). 
### Fixed diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index a43d66a6..b6930723 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -80,20 +80,11 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst start := c.Time.NowUTC() jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) { - execTx, err := c.exec.Begin(ctx) + jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params)) if err != nil { return nil, err } - defer execTx.Rollback(ctx) - jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, execTx, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params)) - if err != nil { - return nil, err - } - - if err := execTx.Commit(ctx); err != nil { - return nil, err - } return jobs, nil }) if err != nil { @@ -193,20 +184,11 @@ func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta c.errGroup.Go(func() error { jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) { - execTx, err := c.exec.Begin(ctx) - if err != nil { - return nil, err - } - defer execTx.Rollback(ctx) - - rows, err := c.pilot.JobSetStateIfRunningMany(ctx, execTx, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params)) + rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params)) if err != nil { return nil, err } - if err := execTx.Commit(ctx); err != nil { - return nil, err - } return rows, nil }) if err != nil { @@ -408,19 +390,10 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { }() return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) { - tx, err := c.exec.Begin(ctx) + rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, 
batchParams) if err != nil { return nil, err } - defer tx.Rollback(ctx) - - rows, err := c.pilot.JobSetStateIfRunningMany(ctx, tx, batchParams) - if err != nil { - return nil, err - } - if err := tx.Commit(ctx); err != nil { - return nil, err - } return rows, nil }) diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 42d538c7..e9c2c7bd 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -25,6 +25,7 @@ import ( "github.com/riverqueue/river/rivershared/sqlctemplate" "github.com/riverqueue/river/rivershared/uniquestates" "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivershared/util/savepointutil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" ) @@ -902,7 +903,12 @@ type ExecutorTx struct { } func (t *ExecutorTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { - return (&ExecutorSubTx{Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver}, savepointNum: 0, single: &singleTransaction{}, tx: t.tx}).Begin(ctx) + return (&ExecutorSubTx{ + Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver}, + beginOnce: &savepointutil.BeginOnlyOnce{}, + savepointNum: 0, + tx: t.tx, + }).Begin(ctx) } func (t *ExecutorTx) Commit(ctx context.Context) error { @@ -917,15 +923,15 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { type ExecutorSubTx struct { Executor + beginOnce *savepointutil.BeginOnlyOnce savepointNum int - single *singleTransaction tx *sql.Tx } const savepointPrefix = "river_savepoint_" func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { - if err := t.single.begin(); err != nil { + if err := t.beginOnce.Begin(); err != nil { return nil, err } @@ -934,13 +940,13 @@ func (t *ExecutorSubTx) Begin(ctx 
context.Context) (riverdriver.ExecutorTx, erro return nil, err } - return &ExecutorSubTx{Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver}, savepointNum: nextSavepointNum, single: &singleTransaction{parent: t.single}, tx: t.tx}, nil + return &ExecutorSubTx{Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver}, savepointNum: nextSavepointNum, beginOnce: savepointutil.NewBeginOnlyOnce(t.beginOnce), tx: t.tx}, nil } func (t *ExecutorSubTx) Commit(ctx context.Context) error { - defer t.single.setDone() + defer t.beginOnce.Done() - if t.single.done { + if t.beginOnce.IsDone() { return errors.New("tx is closed") // mirrors pgx's behavior for this condition } @@ -954,9 +960,9 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error { } func (t *ExecutorSubTx) Rollback(ctx context.Context) error { - defer t.single.setDone() + defer t.beginOnce.Done() - if t.single.done { + if t.beginOnce.IsDone() { return errors.New("tx is closed") // mirrors pgx's behavior for this condition } @@ -974,30 +980,6 @@ func interpretError(err error) error { return err } -// Not strictly necessary, but a small struct designed to help us route out -// problems where `Begin` might be called multiple times on the same -// subtransaction, which would silently produce the wrong result. 
-type singleTransaction struct { - done bool - parent *singleTransaction - subTxInProgress bool -} - -func (t *singleTransaction) begin() error { - if t.subTxInProgress { - return errors.New("subtransaction already in progress") - } - t.subTxInProgress = true - return nil -} - -func (t *singleTransaction) setDone() { - t.done = true - if t.parent != nil { - t.parent.subTxInProgress = false - } -} - type templateReplaceWrapper struct { dbtx dbsqlc.DBTX replacer *sqlctemplate.Replacer diff --git a/riverdriver/riverdrivertest/benchmark.go b/riverdriver/riverdrivertest/benchmark.go index 9dc0b6ae..d18b2cdb 100644 --- a/riverdriver/riverdrivertest/benchmark.go +++ b/riverdriver/riverdrivertest/benchmark.go @@ -128,21 +128,8 @@ func Benchmark[TTx any](ctx context.Context, b *testing.B, b.ResetTimer() for range b.N { - // Start a new sub-transaction for each iteration - subTx, err := exec.Begin(ctx) - if err != nil { - b.Fatal(err) - } - // Execute the update - _, err = subTx.JobSetStateIfRunningMany(ctx, params) - if err != nil { - subTx.Rollback(ctx) - b.Fatal(err) - } - - // Rollback to keep the database clean - if err := subTx.Rollback(ctx); err != nil { + if _, err := exec.JobSetStateIfRunningMany(ctx, params); err != nil { b.Fatal(err) } } diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index b0a3e7d4..fd853c9b 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -45,6 +45,7 @@ import ( "github.com/riverqueue/river/rivershared/uniquestates" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/randutil" + "github.com/riverqueue/river/rivershared/util/savepointutil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" ) @@ -1252,7 +1253,11 @@ type ExecutorTx struct { } func (t *ExecutorTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { - 
executorSubTx := &ExecutorSubTx{savepointNum: 0, single: &singleTransaction{}, tx: t.tx} + executorSubTx := &ExecutorSubTx{ + beginOnce: &savepointutil.BeginOnlyOnce{}, + savepointNum: 0, + tx: t.tx, + } executorSubTx.Executor = Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver, executorSubTx} return executorSubTx.Begin(ctx) } @@ -1269,15 +1274,15 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { type ExecutorSubTx struct { Executor + beginOnce *savepointutil.BeginOnlyOnce savepointNum int - single *singleTransaction tx *sql.Tx } const savepointPrefix = "river_savepoint_" func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { - if err := t.single.begin(); err != nil { + if err := t.beginOnce.Begin(); err != nil { return nil, err } @@ -1286,16 +1291,20 @@ func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, erro return nil, err } - executorSubTx := &ExecutorSubTx{savepointNum: nextSavepointNum, single: &singleTransaction{parent: t.single}, tx: t.tx} + executorSubTx := &ExecutorSubTx{ + beginOnce: savepointutil.NewBeginOnlyOnce(t.beginOnce), + savepointNum: nextSavepointNum, + tx: t.tx, + } executorSubTx.Executor = Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver, executorSubTx} return executorSubTx, nil } func (t *ExecutorSubTx) Commit(ctx context.Context) error { - defer t.single.setDone() + defer t.beginOnce.Done() - if t.single.done { + if t.beginOnce.IsDone() { return errors.New("tx is closed") // mirrors pgx's behavior for this condition } @@ -1309,9 +1318,9 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error { } func (t *ExecutorSubTx) Rollback(ctx context.Context) error { - defer t.single.setDone() + defer t.beginOnce.Done() - if t.single.done { + if t.beginOnce.IsDone() { return errors.New("tx is closed") // mirrors pgx's behavior for this condition } @@ -1329,30 +1338,6 @@ func interpretError(err error) error { return err } -// Not 
strictly necessary, but a small struct designed to help us route out -// problems where `Begin` might be called multiple times on the same -// subtransaction, which would silently produce the wrong result. -type singleTransaction struct { - done bool - parent *singleTransaction - subTxInProgress bool -} - -func (t *singleTransaction) begin() error { - if t.subTxInProgress { - return errors.New("subtransaction already in progress") - } - t.subTxInProgress = true - return nil -} - -func (t *singleTransaction) setDone() { - t.done = true - if t.parent != nil { - t.parent.subTxInProgress = false - } -} - type templateReplaceWrapper struct { dbtx dbsqlc.DBTX replacer *sqlctemplate.Replacer diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index 81a237bc..b95075d2 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -28,11 +28,11 @@ type Pilot interface { JobInsertMany( ctx context.Context, - execTx riverdriver.ExecutorTx, + exec riverdriver.Executor, params *riverdriver.JobInsertFastManyParams, ) ([]*riverdriver.JobInsertFastResult, error) - JobSetStateIfRunningMany(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) + JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) PilotInit(archetype *baseservice.Archetype) diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard.go index dafc8f32..cfbec8da 100644 --- a/rivershared/riverpilot/standard.go +++ b/rivershared/riverpilot/standard.go @@ -22,14 +22,14 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex func (p *StandardPilot) JobInsertMany( ctx context.Context, - execTx riverdriver.ExecutorTx, + exec riverdriver.Executor, params *riverdriver.JobInsertFastManyParams, ) ([]*riverdriver.JobInsertFastResult, error) { - return 
execTx.JobInsertFastMany(ctx, params) + return exec.JobInsertFastMany(ctx, params) } -func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { - return execTx.JobSetStateIfRunningMany(ctx, params) +func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + return exec.JobSetStateIfRunningMany(ctx, params) } func (p *StandardPilot) PeriodicJobKeepAliveAndReap(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobKeepAliveAndReapParams) ([]*PeriodicJob, error) { diff --git a/rivershared/util/savepointutil/savepoint_util.go b/rivershared/util/savepointutil/savepoint_util.go new file mode 100644 index 00000000..8fe9be54 --- /dev/null +++ b/rivershared/util/savepointutil/savepoint_util.go @@ -0,0 +1,39 @@ +package savepointutil + +import ( + "errors" +) + +// BeginOnlyOnce is a small utility struct to help with safety. It's not +// strictly necessary, but designed to help us root out problems when implementing +// savepoints in non-Pgx drivers (e.g. `database/sql` for Postgres or SQLite) +// where `Begin` might be called multiple times on the same subtransaction, +// which would silently produce the wrong result. 
+type BeginOnlyOnce struct { + done bool + parent *BeginOnlyOnce + subTxInProgress bool +} + +func NewBeginOnlyOnce(parent *BeginOnlyOnce) *BeginOnlyOnce { + return &BeginOnlyOnce{ + parent: parent, + } +} + +func (t *BeginOnlyOnce) Begin() error { + if t.subTxInProgress { + return errors.New("subtransaction already in progress") + } + t.subTxInProgress = true + return nil +} + +func (t *BeginOnlyOnce) Done() { + t.done = true + if t.parent != nil { + t.parent.subTxInProgress = false + } +} + +func (t *BeginOnlyOnce) IsDone() bool { return t.done } diff --git a/rivershared/util/savepointutil/savepoint_util_test.go b/rivershared/util/savepointutil/savepoint_util_test.go new file mode 100644 index 00000000..bccd31e0 --- /dev/null +++ b/rivershared/util/savepointutil/savepoint_util_test.go @@ -0,0 +1,29 @@ +package savepointutil + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBeginOnlyOnce(t *testing.T) { + t.Parallel() + + beginOnce := NewBeginOnlyOnce(nil) + + require.NoError(t, beginOnce.Begin()) + require.False(t, beginOnce.IsDone()) + + // Trying to begin again is an error because there's already a + // subtransaction in progress. + require.EqualError(t, beginOnce.Begin(), "subtransaction already in progress") + + childBeginOnce := NewBeginOnlyOnce(beginOnce) + + childBeginOnce.Done() + require.False(t, beginOnce.IsDone()) + require.True(t, childBeginOnce.IsDone()) + + // Now that a subtransaction has been finished, Begin can be called again. + require.NoError(t, beginOnce.Begin()) +}