Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Internal APIs used for communication between River and River Pro have changed. If using River Pro, make sure to update River and River Pro to latest at the same time to get compatible versions. River v0.24.0 is compatible with River Pro v0.16.0.

### Changed

- Remove unecessary transactions where a single database operation will do. This reduces the number of subtransactions created which can be an operational benefit it many cases. [PR #950](https://github.com/riverqueue/river/pull/950)
- Bring all driver tests into separate package so they don't leak dependencies. This removes dependencies from the top level `river` package that most River installations won't need, thereby reducing the transitive dependency load of most River installations. [PR #955](https://github.com/riverqueue/river/pull/955).

### Fixed
Expand Down
33 changes: 3 additions & 30 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,11 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst
start := c.Time.NowUTC()

jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
execTx, err := c.exec.Begin(ctx)
jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
if err != nil {
return nil, err
}
defer execTx.Rollback(ctx)

jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, execTx, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
if err != nil {
return nil, err
}

if err := execTx.Commit(ctx); err != nil {
return nil, err
}
return jobs, nil
})
if err != nil {
Expand Down Expand Up @@ -193,20 +184,11 @@ func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta

c.errGroup.Go(func() error {
jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
execTx, err := c.exec.Begin(ctx)
if err != nil {
return nil, err
}
defer execTx.Rollback(ctx)

rows, err := c.pilot.JobSetStateIfRunningMany(ctx, execTx, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
if err != nil {
return nil, err
}

if err := execTx.Commit(ctx); err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
Expand Down Expand Up @@ -408,19 +390,10 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
}()

return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
tx, err := c.exec.Begin(ctx)
rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, batchParams)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

rows, err := c.pilot.JobSetStateIfRunningMany(ctx, tx, batchParams)
if err != nil {
return nil, err
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}

return rows, nil
})
Expand Down
46 changes: 14 additions & 32 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/riverqueue/river/rivershared/sqlctemplate"
"github.com/riverqueue/river/rivershared/uniquestates"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/savepointutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -902,7 +903,12 @@ type ExecutorTx struct {
}

func (t *ExecutorTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) {
return (&ExecutorSubTx{Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver}, savepointNum: 0, single: &singleTransaction{}, tx: t.tx}).Begin(ctx)
return (&ExecutorSubTx{
Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver},
beginOnce: &savepointutil.BeginOnlyOnce{},
savepointNum: 0,
tx: t.tx,
}).Begin(ctx)
}

func (t *ExecutorTx) Commit(ctx context.Context) error {
Expand All @@ -917,15 +923,15 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {

type ExecutorSubTx struct {
Executor
beginOnce *savepointutil.BeginOnlyOnce
savepointNum int
single *singleTransaction
tx *sql.Tx
}

const savepointPrefix = "river_savepoint_"

func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) {
if err := t.single.begin(); err != nil {
if err := t.beginOnce.Begin(); err != nil {
return nil, err
}

Expand All @@ -934,13 +940,13 @@ func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, erro
return nil, err
}

return &ExecutorSubTx{Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver}, savepointNum: nextSavepointNum, single: &singleTransaction{parent: t.single}, tx: t.tx}, nil
return &ExecutorSubTx{Executor: Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver}, savepointNum: nextSavepointNum, beginOnce: savepointutil.NewBeginOnlyOnce(t.beginOnce), tx: t.tx}, nil
}

func (t *ExecutorSubTx) Commit(ctx context.Context) error {
defer t.single.setDone()
defer t.beginOnce.Done()

if t.single.done {
if t.beginOnce.IsDone() {
return errors.New("tx is closed") // mirrors pgx's behavior for this condition
}

Expand All @@ -954,9 +960,9 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error {
}

func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
defer t.single.setDone()
defer t.beginOnce.Done()

if t.single.done {
if t.beginOnce.IsDone() {
return errors.New("tx is closed") // mirrors pgx's behavior for this condition
}

Expand All @@ -974,30 +980,6 @@ func interpretError(err error) error {
return err
}

// Not strictly necessary, but a small struct designed to help us route out
// problems where `Begin` might be called multiple times on the same
// subtransaction, which would silently produce the wrong result.
type singleTransaction struct {
done bool
parent *singleTransaction
subTxInProgress bool
}

func (t *singleTransaction) begin() error {
if t.subTxInProgress {
return errors.New("subtransaction already in progress")
}
t.subTxInProgress = true
return nil
}

func (t *singleTransaction) setDone() {
t.done = true
if t.parent != nil {
t.parent.subTxInProgress = false
}
}

type templateReplaceWrapper struct {
dbtx dbsqlc.DBTX
replacer *sqlctemplate.Replacer
Expand Down
15 changes: 1 addition & 14 deletions riverdriver/riverdrivertest/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,8 @@ func Benchmark[TTx any](ctx context.Context, b *testing.B,

b.ResetTimer()
for range b.N {
// Start a new sub-transaction for each iteration
subTx, err := exec.Begin(ctx)
if err != nil {
b.Fatal(err)
}

// Execute the update
_, err = subTx.JobSetStateIfRunningMany(ctx, params)
if err != nil {
subTx.Rollback(ctx)
b.Fatal(err)
}

// Rollback to keep the database clean
if err := subTx.Rollback(ctx); err != nil {
if _, err := exec.JobSetStateIfRunningMany(ctx, params); err != nil {
b.Fatal(err)
}
}
Expand Down
49 changes: 17 additions & 32 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/riverqueue/river/rivershared/uniquestates"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/savepointutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -1252,7 +1253,11 @@ type ExecutorTx struct {
}

func (t *ExecutorTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) {
executorSubTx := &ExecutorSubTx{savepointNum: 0, single: &singleTransaction{}, tx: t.tx}
executorSubTx := &ExecutorSubTx{
beginOnce: &savepointutil.BeginOnlyOnce{},
savepointNum: 0,
tx: t.tx,
}
executorSubTx.Executor = Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver, executorSubTx}
return executorSubTx.Begin(ctx)
}
Expand All @@ -1269,15 +1274,15 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {

type ExecutorSubTx struct {
Executor
beginOnce *savepointutil.BeginOnlyOnce
savepointNum int
single *singleTransaction
tx *sql.Tx
}

const savepointPrefix = "river_savepoint_"

func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) {
if err := t.single.begin(); err != nil {
if err := t.beginOnce.Begin(); err != nil {
return nil, err
}

Expand All @@ -1286,16 +1291,20 @@ func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, erro
return nil, err
}

executorSubTx := &ExecutorSubTx{savepointNum: nextSavepointNum, single: &singleTransaction{parent: t.single}, tx: t.tx}
executorSubTx := &ExecutorSubTx{
beginOnce: savepointutil.NewBeginOnlyOnce(t.beginOnce),
savepointNum: nextSavepointNum,
tx: t.tx,
}
executorSubTx.Executor = Executor{nil, templateReplaceWrapper{t.tx, &t.driver.replacer}, t.driver, executorSubTx}

return executorSubTx, nil
}

func (t *ExecutorSubTx) Commit(ctx context.Context) error {
defer t.single.setDone()
defer t.beginOnce.Done()

if t.single.done {
if t.beginOnce.IsDone() {
return errors.New("tx is closed") // mirrors pgx's behavior for this condition
}

Expand All @@ -1309,9 +1318,9 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error {
}

func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
defer t.single.setDone()
defer t.beginOnce.Done()

if t.single.done {
if t.beginOnce.IsDone() {
return errors.New("tx is closed") // mirrors pgx's behavior for this condition
}

Expand All @@ -1329,30 +1338,6 @@ func interpretError(err error) error {
return err
}

// Not strictly necessary, but a small struct designed to help us route out
// problems where `Begin` might be called multiple times on the same
// subtransaction, which would silently produce the wrong result.
type singleTransaction struct {
done bool
parent *singleTransaction
subTxInProgress bool
}

func (t *singleTransaction) begin() error {
if t.subTxInProgress {
return errors.New("subtransaction already in progress")
}
t.subTxInProgress = true
return nil
}

func (t *singleTransaction) setDone() {
t.done = true
if t.parent != nil {
t.parent.subTxInProgress = false
}
}

type templateReplaceWrapper struct {
dbtx dbsqlc.DBTX
replacer *sqlctemplate.Replacer
Expand Down
4 changes: 2 additions & 2 deletions rivershared/riverpilot/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ type Pilot interface {

JobInsertMany(
ctx context.Context,
execTx riverdriver.ExecutorTx,
exec riverdriver.Executor,
params *riverdriver.JobInsertFastManyParams,
) ([]*riverdriver.JobInsertFastResult, error)

JobSetStateIfRunningMany(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)

PilotInit(archetype *baseservice.Archetype)

Expand Down
8 changes: 4 additions & 4 deletions rivershared/riverpilot/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex

func (p *StandardPilot) JobInsertMany(
ctx context.Context,
execTx riverdriver.ExecutorTx,
exec riverdriver.Executor,
params *riverdriver.JobInsertFastManyParams,
) ([]*riverdriver.JobInsertFastResult, error) {
return execTx.JobInsertFastMany(ctx, params)
return exec.JobInsertFastMany(ctx, params)
}

func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, execTx riverdriver.ExecutorTx, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
return execTx.JobSetStateIfRunningMany(ctx, params)
func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
return exec.JobSetStateIfRunningMany(ctx, params)
}

func (p *StandardPilot) PeriodicJobKeepAliveAndReap(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobKeepAliveAndReapParams) ([]*PeriodicJob, error) {
Expand Down
39 changes: 39 additions & 0 deletions rivershared/util/savepointutil/savepoint_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package savepointutil

import (
"errors"
)

// BeginOnlyOnce is a small utility struct to help with safety. It's not
// strictly necessary, but designed to help us out problems when implementing
// savepoints in non-Pgx drivers (e.g. `database/sql` for Postgres or SQLite)
// where `Begin` might be called multiple times on the same subtransaction,
// which would silently produce the wrong result.
type BeginOnlyOnce struct {
done bool
parent *BeginOnlyOnce
subTxInProgress bool
}

func NewBeginOnlyOnce(parent *BeginOnlyOnce) *BeginOnlyOnce {
return &BeginOnlyOnce{
parent: parent,
}
}

func (t *BeginOnlyOnce) Begin() error {
if t.subTxInProgress {
return errors.New("subtransaction already in progress")
}
t.subTxInProgress = true
return nil
}

func (t *BeginOnlyOnce) Done() {
t.done = true
if t.parent != nil {
t.parent.subTxInProgress = false
}
}

func (t *BeginOnlyOnce) IsDone() bool { return t.done }
Loading
Loading