From 52b6144182cddb6a69da6badf976296cb6c5ad86 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 29 Jun 2024 09:35:30 -0700 Subject: [PATCH] Idempotent queue pause and resume I was writing tests for River UI and found that when trying to pause an already paused queue or resume an unpaused queue, River returns a "not found" error. This was quite surprising, so much so that it took me a good half hour of debugging before I considered the fact that it might actually be a problem in upstream River. Even if an argument were to be made that pausing an already paused queue should be an error like "queue already paused", returning "not found" is misleading and guaranteed to result in confused people beyond just myself. I think it's fine for pause and resume to be considered idempotent operations. If a paused queue is paused again, there's no real damage done, especially if we keep the original paused time, which we do in this implementation. Similarly, make an update so that when pausing or resuming using the all queues string (`*`), don't return "not found" error if there are no queues in the database. Instead, just no-op. --- CHANGELOG.md | 4 ++ .../riverdrivertest/riverdrivertest.go | 61 +++++++++++++++++-- riverdriver/river_driver_interface.go | 2 + .../internal/dbsqlc/river_queue.sql | 53 ++++++++++------ .../internal/dbsqlc/river_queue.sql.go | 53 ++++++++++------ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 4 +- 6 files changed, 133 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d5542af..55a503d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Pausing or resuming a queue that was already paused or not paused respectively no longer returns `rivertype.ErrNotFound`. The same goes for pausing or resuming using the all queues string (`*`) when no queues are in the database (previously that also returned `rivertype.ErrNotFound`). [PR #408](https://github.com/riverqueue/river/pull/408). + ## [0.8.0] - 2024-06-25 ### Added diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index ea4f61db..1e18538e 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1949,7 +1949,25 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv t.Run("QueuePause", func(t *testing.T) { t.Parallel() - t.Run("ExistingQueue", func(t *testing.T) { + t.Run("ExistingPausedQueue", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{ + PausedAt: ptrutil.Ptr(time.Now()), + }) + + require.NoError(t, exec.QueuePause(ctx, queue.Name)) + + queueFetched, err := exec.QueueGet(ctx, queue.Name) + require.NoError(t, err) + require.NotNil(t, queueFetched.PausedAt) + requireEqualTime(t, *queue.PausedAt, *queueFetched.PausedAt) // paused_at stays unchanged + requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged + }) + + t.Run("ExistingUnpausedQueue", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) @@ -1974,7 +1992,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("AllQueues", func(t *testing.T) { + t.Run("AllQueuesExistingQueues", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) @@ -1998,25 +2016,48 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.NotNil(t, queue2Fetched.PausedAt) require.WithinDuration(t, now, *(queue2Fetched.PausedAt), 500*time.Millisecond) }) + + t.Run("AllQueuesNoQueues", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + require.NoError(t, exec.QueuePause(ctx, rivercommon.AllQueuesString)) + }) }) t.Run("QueueResume", func(t *testing.T) { t.Parallel() - t.Run("ExistingQueue", func(t *testing.T) { + t.Run("ExistingPausedQueue", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{ + PausedAt: ptrutil.Ptr(time.Now()), + }) + + require.NoError(t, exec.QueueResume(ctx, queue.Name)) + + queueFetched, err := exec.QueueGet(ctx, queue.Name) + require.NoError(t, err) + require.Nil(t, queueFetched.PausedAt) + }) + + t.Run("ExistingUnpausedQueue", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) queue := testfactory.Queue(ctx, t, exec, nil) - require.Nil(t, queue.PausedAt) - require.NoError(t, exec.QueuePause(ctx, queue.Name)) require.NoError(t, exec.QueueResume(ctx, queue.Name)) queueFetched, err := exec.QueueGet(ctx, queue.Name) require.NoError(t, err) require.Nil(t, queueFetched.PausedAt) + requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged }) t.Run("NonExistentQueue", func(t *testing.T) { @@ -2028,7 +2069,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("AllQueues", func(t *testing.T) { + t.Run("AllQueuesExistingQueues", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) @@ -2049,6 +2090,14 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.NoError(t, err) require.Nil(t, queue2Fetched.PausedAt) }) + + t.Run("AllQueuesNoQueues", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + require.NoError(t, exec.QueueResume(ctx, rivercommon.AllQueuesString)) + }) }) }) } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 3a3a3d71..bc13f868 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -20,6 +20,8 @@ import ( "github.com/riverqueue/river/rivertype" ) +const AllQueuesString = "*" + var ( ErrClosedPool = errors.New("underlying driver pool is closed") ErrNotImplemented = errors.New("driver does not implement this functionality") diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql index 9323b774..0c4da1ab 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql @@ -48,32 +48,49 @@ LIMIT @limit_count::integer; -- name: QueuePause :execresult WITH queue_to_update AS ( - SELECT name + SELECT name, paused_at FROM river_queue - WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END - AND paused_at IS NULL + WHERE CASE WHEN @name::text = '*' THEN true ELSE name = @name END FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = now(), + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + AND river_queue.paused_at IS NULL + RETURNING river_queue.* ) - -UPDATE river_queue -SET - paused_at = now(), - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name; +SELECT * +FROM river_queue +WHERE name = @name + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT * +FROM updated_queue; -- name: QueueResume :execresult WITH queue_to_update AS ( SELECT name FROM river_queue WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END - AND paused_at IS NOT NULL FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = NULL, + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + RETURNING river_queue.* ) - -UPDATE river_queue -SET - paused_at = NULL, - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name; +SELECT * +FROM river_queue +WHERE name = @name + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT * +FROM updated_queue; \ No newline at end of file diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go index 838ff10e..7d2a7f1e 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go @@ -153,19 +153,28 @@ func (q *Queries) QueueList(ctx context.Context, db DBTX, limitCount int32) ([]* const queuePause = `-- name: QueuePause :execresult WITH queue_to_update AS ( - SELECT name + SELECT name, paused_at FROM river_queue - WHERE CASE WHEN $1::text = '*' THEN true ELSE river_queue.name = $1::text END - AND paused_at IS NULL + WHERE CASE WHEN $1::text = '*' THEN true ELSE name = $1 END FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = now(), + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + AND river_queue.paused_at IS NULL + RETURNING river_queue.name, river_queue.created_at, river_queue.metadata, river_queue.paused_at, river_queue.updated_at ) - -UPDATE river_queue -SET - paused_at = now(), - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name +SELECT name, created_at, metadata, paused_at, updated_at +FROM river_queue +WHERE name = $1 + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT name, created_at, metadata, paused_at, updated_at +FROM updated_queue ` func (q *Queries) QueuePause(ctx context.Context, db DBTX, name string) (pgconn.CommandTag, error) { @@ -177,16 +186,24 @@ WITH queue_to_update AS ( SELECT name FROM river_queue WHERE CASE WHEN $1::text = '*' THEN true ELSE river_queue.name = $1::text END - AND paused_at IS NOT NULL FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = NULL, + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + RETURNING river_queue.name, river_queue.created_at, river_queue.metadata, river_queue.paused_at, river_queue.updated_at ) - -UPDATE river_queue -SET - paused_at = NULL, - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name +SELECT name, created_at, metadata, paused_at, updated_at +FROM river_queue +WHERE name = $1 + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT name, created_at, metadata, paused_at, updated_at +FROM updated_queue ` func (q *Queries) QueueResume(ctx context.Context, db DBTX, name string) (pgconn.CommandTag, error) { diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 341b30fc..683e829a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -524,7 +524,7 @@ func (e *Executor) QueuePause(ctx context.Context, name string) error { if err != nil { return interpretError(err) } - if res.RowsAffected() == 0 { + if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString { return rivertype.ErrNotFound } return nil @@ -535,7 +535,7 @@ func (e *Executor) QueueResume(ctx context.Context, name string) error { if err != nil { return interpretError(err) } - if res.RowsAffected() == 0 { + if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString { return rivertype.ErrNotFound } return nil