diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8114de8..e48b9be3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,10 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
 - Basic stuck detection after a job's exceeded its timeout and still not returned after the executor's initiated context cancellation and waited a short margin for the cancellation to take effect. [PR #1097](https://github.com/riverqueue/river/pull/1097).
+- Added `Client.JobUpdate`, which can be used to persist job output partway through a running work function instead of having to wait until the job is completed. [PR #1098](https://github.com/riverqueue/river/pull/1098).
 - Add a little more error flavor for when encountering a deadline exceeded error on leadership election suggesting that the user may want to try increasing their database pool size. [PR #1101](https://github.com/riverqueue/river/pull/1101).
 
 ## [0.29.0-rc.1] - 2025-12-04
 
+### Added
+
 - Added `HookPeriodicJobsStart` that can be used to run custom logic when a periodic job enqueuer starts up on a new leader. [PR #1084](https://github.com/riverqueue/river/pull/1084).
 - Added `Client.Notify().RequestResign` and `Client.Notify().RequestResignTx` functions allowing any client to request that the current leader resign. [PR #1085](https://github.com/riverqueue/river/pull/1085).
diff --git a/client.go b/client.go
index 35cbe0c2..a2c21875 100644
--- a/client.go
+++ b/client.go
@@ -17,6 +17,7 @@ import (
 	"github.com/riverqueue/river/internal/dbunique"
 	"github.com/riverqueue/river/internal/hooklookup"
 	"github.com/riverqueue/river/internal/jobcompleter"
+	"github.com/riverqueue/river/internal/jobexecutor"
 	"github.com/riverqueue/river/internal/leadership"
 	"github.com/riverqueue/river/internal/maintenance"
 	"github.com/riverqueue/river/internal/middlewarelookup"
@@ -1514,6 +1515,97 @@ func (c *Client[TTx]) jobRetry(ctx context.Context, exec riverdriver.Executor, i
 	})
 }
 
+// JobUpdateParams contains parameters for Client.JobUpdate and Client.JobUpdateTx.
+type JobUpdateParams struct {
+	// Output is a new output value for a job.
+	//
+	// If not set, and a job is updated from inside a work function, the job's
+	// output is set based on output recorded so far using RecordOutput.
+	Output any
+}
+
+// JobUpdate updates the job with the given ID.
+//
+// If JobUpdateParams.Output is not set, this function may be used inside a job
+// work function to set a job's output based on output recorded so far using
+// RecordOutput.
+func (c *Client[TTx]) JobUpdate(ctx context.Context, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
+	return c.jobUpdate(ctx, c.driver.GetExecutor(), id, params)
+}
+
+// JobUpdateTx updates the job with the given ID.
+//
+// If JobUpdateParams.Output is not set, this function may be used inside a job
+// work function to set a job's output based on output recorded so far using
+// RecordOutput.
+//
+// This variant updates the job inside of a transaction.
+func (c *Client[TTx]) JobUpdateTx(ctx context.Context, tx TTx, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
+	return c.jobUpdate(ctx, c.driver.UnwrapExecutor(tx), id, params)
+}
+
+func (c *Client[TTx]) jobUpdate(ctx context.Context, exec riverdriver.Executor, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
+	if params == nil {
+		params = &JobUpdateParams{}
+	}
+
+	outputFromWorkContext := func() json.RawMessage {
+		metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
+		if !hasMetadataUpdates {
+			return nil
+		}
+
+		if val, ok := metadataUpdates[rivertype.MetadataKeyOutput]; ok {
+			return val.(json.RawMessage) //nolint:forcetypeassert
+		}
+
+		return nil
+	}()
+
+	var (
+		metadataDoMerge      bool
+		metadataUpdatesBytes = []byte("{}") // even in the event of no update, still valid jsonb
+	)
+	if outputFromWorkContext != nil || params.Output != nil {
+		metadataDoMerge = true
+
+		var outputBytes json.RawMessage
+
+		switch {
+		// comes first because params takes precedence over context output
+		case params.Output != nil:
+			var err error
+			outputBytes, err = json.Marshal(params.Output)
+			if err != nil {
+				return nil, err
+			}
+
+			if err := checkOutputSize(outputBytes); err != nil {
+				return nil, err
+			}
+
+		case outputFromWorkContext != nil:
+			// no size check necessary here because it's already been checked in RecordOutput
+			outputBytes = outputFromWorkContext
+		}
+
+		var err error
+		metadataUpdatesBytes, err = json.Marshal(map[string]json.RawMessage{
+			rivertype.MetadataKeyOutput: outputBytes,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("error marshaling metadata updates to JSON: %w", err)
+		}
+	}
+
+	return exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
+		ID:              id,
+		MetadataDoMerge: metadataDoMerge,
+		Metadata:        metadataUpdatesBytes,
+		Schema:          c.config.Schema,
+	})
+}
+
 // ID returns the unique ID of this client as set in its config or
 // auto-generated if not specified.
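// Illustrative usage sketch (not part of the diff above): a worker that records
// output partway through a job and persists it immediately with the new
// Client.JobUpdate, rather than waiting for the job to complete. ReportArgs,
// ReportWorker, and the injected client field are hypothetical names assumed
// for the example.
package example

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

type ReportArgs struct{}

func (ReportArgs) Kind() string { return "report" }

type ReportWorker struct {
	river.WorkerDefaults[ReportArgs]

	// client is assumed to be set when the worker is constructed/registered.
	client *river.Client[pgx.Tx]
}

func (w *ReportWorker) Work(ctx context.Context, job *river.Job[ReportArgs]) error {
	// Record intermediate output in the work context. On its own, this is only
	// persisted once the job finishes (success or failure).
	if err := river.RecordOutput(ctx, map[string]any{"progress": "halfway"}); err != nil {
		return err
	}

	// Persist the output recorded so far right away. With no
	// JobUpdateParams.Output set, JobUpdate falls back to the output recorded
	// in the work context via RecordOutput.
	if _, err := w.client.JobUpdate(ctx, job.ID, nil); err != nil {
		return err
	}

	// ... long-running remainder of the job ...

	return nil
}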
func (c *Client[TTx]) ID() string { diff --git a/client_test.go b/client_test.go index dd8f0cff..d07c1a67 100644 --- a/client_test.go +++ b/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/tidwall/sjson" "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/middlewarelookup" "github.com/riverqueue/river/internal/notifier" @@ -4534,6 +4535,187 @@ func Test_Client_JobRetry(t *testing.T) { }) } +func Test_Client_JobUpdate(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = newTestConfig(t, schema) + client = newTestClient(t, dbPool, config) + ) + + return client, &testBundle{dbPool: dbPool} + } + + t.Run("AllParams", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{}) + require.NoError(t, err) + + job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{ + Output: "my job output", + }) + require.NoError(t, err) + require.Equal(t, `"my job output"`, string(job.Output())) + + updatedJob, err := client.JobGet(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, `"my job output"`, string(updatedJob.Output())) + }) + + t.Run("NoParams", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{}) + require.NoError(t, err) + + _, err = client.JobUpdate(ctx, insertRes.Job.ID, nil) + require.NoError(t, err) + }) + + t.Run("OutputFromContext", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{}) + require.NoError(t, err) + + ctx := context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{}) + require.NoError(t, RecordOutput(ctx, "my job output from context")) + + job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{}) + require.NoError(t, err) + require.Equal(t, `"my job output from context"`, string(job.Output())) + + updatedJob, err := client.JobGet(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, `"my job output from context"`, string(updatedJob.Output())) + }) + + t.Run("ParamOutputTakesPrecedenceOverContextOutput", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{}) + require.NoError(t, err) + + ctx := context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{}) + require.NoError(t, RecordOutput(ctx, "my job output from context")) + + job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{ + Output: "my job output from params", + }) + require.NoError(t, err) + require.Equal(t, `"my job output from params"`, string(job.Output())) + + updatedJob, err := client.JobGet(ctx, job.ID) + require.NoError(t, err) + require.Equal(t, `"my job output from params"`, string(updatedJob.Output())) + }) + + t.Run("ParamOutputTooLarge", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{}) + require.NoError(t, err) + + _, err = client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{ + Output: strings.Repeat("x", 
maxOutputSizeBytes+1), + }) + require.ErrorContains(t, err, "output is too large") + }) +} + +func Test_Client_JobUpdateTx(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + executorTx riverdriver.ExecutorTx + tx pgx.Tx + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = newTestConfig(t, schema) + client = newTestClient(t, dbPool, config) + ) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { tx.Rollback(ctx) }) + + return client, &testBundle{ + dbPool: dbPool, + executorTx: client.driver.UnwrapExecutor(tx), + tx: tx, + } + } + + t.Run("AllParams", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{}) + require.NoError(t, err) + + job, err := client.JobUpdateTx(ctx, bundle.tx, insertRes.Job.ID, &JobUpdateParams{ + Output: "my job output", + }) + require.NoError(t, err) + require.Equal(t, `"my job output"`, string(job.Output())) + + updatedJob, err := client.JobGetTx(ctx, bundle.tx, job.ID) + require.NoError(t, err) + require.Equal(t, `"my job output"`, string(updatedJob.Output())) + + // Outside of transaction shows original + updatedJob, err = client.JobGet(ctx, job.ID) + require.NoError(t, err) + require.Empty(t, string(updatedJob.Output())) + }) + + t.Run("NoParams", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{}) + require.NoError(t, err) + + _, err = client.JobUpdateTx(ctx, bundle.tx, insertRes.Job.ID, nil) + require.NoError(t, err) + }) +} + func Test_Client_ErrorHandler(t *testing.T) { t.Parallel() @@ -5859,7 +6041,7 @@ func Test_Client_RetryPolicy(t *testing.T) { // regression protection to ensure we're testing the right number of jobs: require.Equal(t, rivercommon.MaxAttemptsDefault, insertRes.Job.MaxAttempts) - updatedJob, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{ + updatedJob, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ ID: insertRes.Job.ID, AttemptedAtDoUpdate: true, AttemptedAt: &now, // we want a value here, but it'll be overwritten as jobs are locked by the producer @@ -6647,7 +6829,7 @@ func Test_Client_JobCompletion(t *testing.T) { } AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { - _, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{ + _, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ ID: job.ID, FinalizedAtDoUpdate: true, FinalizedAt: &now, @@ -6753,7 +6935,7 @@ func Test_Client_JobCompletion(t *testing.T) { } AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { - _, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{ + _, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ ID: job.ID, ErrorsDoUpdate: true, Errors: [][]byte{[]byte("{\"error\": \"oops\"}")}, diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 4c553d88..86e36d45 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -318,7 +318,7 @@ func TestJobExecutor_Execute(t 
*testing.T) {
 			require.Equal(t, rivertype.JobStateAvailable, job.State)
 		}
 
-		_, err := bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
+		_, err := bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
 			ID:            bundle.jobRow.ID,
 			StateDoUpdate: true,
 			State:         rivertype.JobStateRunning,
@@ -373,7 +373,7 @@ func TestJobExecutor_Execute(t *testing.T) {
 
 		// add a unique key so we can verify it's cleared
 		var err error
-		bundle.jobRow, err = bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
+		bundle.jobRow, err = bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
 			ID:    bundle.jobRow.ID,
 			State: rivertype.JobStateAvailable, // required for encoding but ignored
 		})
diff --git a/recorded_output.go b/recorded_output.go
index 1e232c4d..a7879e3c 100644
--- a/recorded_output.go
+++ b/recorded_output.go
@@ -43,6 +43,11 @@ const (
 // Once recorded, the output is stored regardless of the outcome of the
 // execution attempt (success, error, panic, etc.).
 //
+// RecordOutput always stores output lazily: it's only persisted as the job is
+// completed (whether that completion is a success or a failure). Client.JobUpdate
+// and Client.JobUpdateTx are available to store output eagerly at any time,
+// including from inside a work function while the job is still being executed.
+//
 // The output is marshalled to JSON as part of this function and it will return
 // an error if the output is not JSON-encodable.
 func RecordOutput(ctx context.Context, output any) error {
@@ -51,17 +56,24 @@ func RecordOutput(ctx context.Context, output any) error {
 		return errors.New("RecordOutput must be called within a Worker")
 	}
 
-	metadataUpdatesBytes, err := json.Marshal(output)
+	outputBytes, err := json.Marshal(output)
 	if err != nil {
 		return err
 	}
 
-	// Postgres JSONB is limited to 255MB, but it would be a bad idea to get
-	// anywhere close to that limit here.
-	if len(metadataUpdatesBytes) > maxOutputSizeBytes {
-		return fmt.Errorf("output is too large: %d bytes (max %d MB)", len(metadataUpdatesBytes), maxOutputSizeMB)
+	if err := checkOutputSize(outputBytes); err != nil {
+		return err
 	}
 
-	metadataUpdates[rivertype.MetadataKeyOutput] = json.RawMessage(metadataUpdatesBytes)
+	metadataUpdates[rivertype.MetadataKeyOutput] = json.RawMessage(outputBytes)
+	return nil
+}
+
+// Postgres JSONB is limited to 255MB, but it would be a bad idea to get
+// anywhere close to that limit for output.
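// Illustrative sketch (not part of the diff above): setting output explicitly
// via JobUpdateParams.Output from outside a work function. The value is JSON
// encoded, stored under the job's metadata output key, and rejected with an
// "output is too large" error if it exceeds the package's output size limit.
// setJobOutput, the client, and the job ID are hypothetical names assumed for
// the example.
package example

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

func setJobOutput(ctx context.Context, client *river.Client[pgx.Tx], jobID int64) error {
	// Output may be any JSON-encodable value; it's readable afterward via the
	// returned job row's Output() accessor.
	updatedJob, err := client.JobUpdate(ctx, jobID, &river.JobUpdateParams{
		Output: map[string]any{"status": "partially processed", "rows": 1234},
	})
	if err != nil {
		return err
	}

	log.Printf("stored output: %s", updatedJob.Output())
	return nil
}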
+func checkOutputSize(outputBytes []byte) error { + if len(outputBytes) > maxOutputSizeBytes { + return fmt.Errorf("output is too large: %d bytes (max %d MB)", len(outputBytes), maxOutputSizeMB) + } return nil } diff --git a/recorded_output_test.go b/recorded_output_test.go index cb43a44c..83e3f124 100644 --- a/recorded_output_test.go +++ b/recorded_output_test.go @@ -145,7 +145,7 @@ func Test_RecordedOutput(t *testing.T) { AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { // Record an output of 32MB + 1 byte: - err := RecordOutput(ctx, strings.Repeat("x", 32*1024*1024+1)) + err := RecordOutput(ctx, strings.Repeat("x", maxOutputSizeBytes+1)) require.ErrorContains(t, err, "output is too large") return err })) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index ce446980..aed92b7e 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -222,6 +222,7 @@ type Executor interface { JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error) JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error) + JobUpdateFull(ctx context.Context, params *JobUpdateFullParams) (*rivertype.JobRow, error) LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error) LeaderAttemptReelect(ctx context.Context, params *LeaderElectParams) (bool, error) LeaderDeleteExpired(ctx context.Context, params *LeaderDeleteExpiredParams) (int, error) @@ -634,6 +635,13 @@ type JobSetStateIfRunningManyParams struct { } type JobUpdateParams struct { + ID int64 + MetadataDoMerge bool + Metadata []byte + Schema string +} + +type JobUpdateFullParams struct { ID int64 AttemptDoUpdate bool Attempt int diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index e0e7db36..8cc6c9a2 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -1613,6 +1613,54 @@ func (q *Queries) JobSetStateIfRunningMany(ctx context.Context, db DBTX, arg *Jo } const jobUpdate = `-- name: JobUpdate :one +WITH locked_job AS ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE river_job.id = $3 + FOR UPDATE +) +UPDATE /* TEMPLATE: schema */river_job +SET + metadata = CASE WHEN $1::boolean THEN metadata || $2::jsonb ELSE metadata END +FROM + locked_job +WHERE river_job.id = locked_job.id +RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states +` + +type JobUpdateParams struct { + MetadataDoMerge bool + Metadata string + ID int64 +} + +func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) (*RiverJob, error) { + row := db.QueryRowContext(ctx, jobUpdate, arg.MetadataDoMerge, arg.Metadata, arg.ID) + var i RiverJob + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + 
&i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + &i.UniqueKey, + &i.UniqueStates, + ) + return &i, err +} + +const jobUpdateFull = `-- name: JobUpdateFull :one UPDATE /* TEMPLATE: schema */river_job SET attempt = CASE WHEN $1::boolean THEN $2 ELSE attempt END, @@ -1627,7 +1675,7 @@ WHERE id = $17 RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` -type JobUpdateParams struct { +type JobUpdateFullParams struct { AttemptDoUpdate bool Attempt int16 AttemptedAtDoUpdate bool @@ -1649,8 +1697,8 @@ type JobUpdateParams struct { // A generalized update for any property on a job. This brings in a large number // of parameters and therefore may be more suitable for testing than production. -func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) (*RiverJob, error) { - row := db.QueryRowContext(ctx, jobUpdate, +func (q *Queries) JobUpdateFull(ctx context.Context, db DBTX, arg *JobUpdateFullParams) (*RiverJob, error) { + row := db.QueryRowContext(ctx, jobUpdateFull, arg.AttemptDoUpdate, arg.Attempt, arg.AttemptedAtDoUpdate, diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 832035e5..df31d6c5 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -668,6 +668,24 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP } job, err := dbsqlc.New().JobUpdate(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobUpdateParams{ + ID: params.ID, + MetadataDoMerge: params.MetadataDoMerge, + Metadata: string(metadata), + }) + if err != nil { + return nil, interpretError(err) + } + + return jobRowFromInternal(job) +} + +func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpdateFullParams) (*rivertype.JobRow, error) { + metadata := params.Metadata + if metadata == nil { + metadata = []byte("{}") + } + + job, err := dbsqlc.New().JobUpdateFull(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobUpdateFullParams{ ID: params.ID, Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec AttemptDoUpdate: params.AttemptDoUpdate, diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index d2734f2e..c9965432 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -3339,6 +3339,44 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobUpdate", func(t *testing.T) { t.Parallel() + t.Run("AllArgs", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + Metadata: []byte(`{"key1":"val1"}`), + }) + + updatedJob, err := exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + ID: job.ID, + MetadataDoMerge: true, + Metadata: []byte(`{"key2":"val2"}`), + }) + require.NoError(t, err) + require.JSONEq(t, `{"key1":"val1","key2":"val2"}`, string(updatedJob.Metadata)) + }) + + t.Run("NoArgs", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + Metadata: []byte(`{"key1":"val1"}`), + }) + + updatedJob, err := exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + ID: job.ID, + }) + require.NoError(t, err) + require.JSONEq(t, `{"key1":"val1"}`, 
string(updatedJob.Metadata)) + }) + }) + + t.Run("JobUpdateFull", func(t *testing.T) { + t.Parallel() + t.Run("AllArgs", func(t *testing.T) { t.Parallel() @@ -3348,7 +3386,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, now := time.Now().UTC() - updatedJob, err := exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + updatedJob, err := exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ ID: job.ID, AttemptDoUpdate: true, Attempt: 7, @@ -3385,7 +3423,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{}) - updatedJob, err := exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + updatedJob, err := exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ ID: job.ID, }) require.NoError(t, err) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 891b411b..95a2493e 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -697,9 +697,24 @@ SELECT * FROM updated ORDER BY id; +-- name: JobUpdate :one +WITH locked_job AS ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE river_job.id = @id + FOR UPDATE +) +UPDATE /* TEMPLATE: schema */river_job +SET + metadata = CASE WHEN @metadata_do_merge::boolean THEN metadata || @metadata::jsonb ELSE metadata END +FROM + locked_job +WHERE river_job.id = locked_job.id +RETURNING river_job.*; + -- A generalized update for any property on a job. This brings in a large number -- of parameters and therefore may be more suitable for testing than production. --- name: JobUpdate :one +-- name: JobUpdateFull :one UPDATE /* TEMPLATE: schema */river_job SET attempt = CASE WHEN @attempt_do_update::boolean THEN @attempt ELSE attempt END, @@ -711,4 +726,4 @@ SET metadata = CASE WHEN @metadata_do_update::boolean THEN @metadata::jsonb ELSE metadata END, state = CASE WHEN @state_do_update::boolean THEN @state::/* TEMPLATE: schema */river_job_state ELSE state END WHERE id = @id -RETURNING *; +RETURNING *; \ No newline at end of file diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index b56e80ec..53159d08 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -1574,6 +1574,54 @@ func (q *Queries) JobSetStateIfRunningMany(ctx context.Context, db DBTX, arg *Jo } const jobUpdate = `-- name: JobUpdate :one +WITH locked_job AS ( + SELECT id + FROM /* TEMPLATE: schema */river_job + WHERE river_job.id = $3 + FOR UPDATE +) +UPDATE /* TEMPLATE: schema */river_job +SET + metadata = CASE WHEN $1::boolean THEN metadata || $2::jsonb ELSE metadata END +FROM + locked_job +WHERE river_job.id = locked_job.id +RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states +` + +type JobUpdateParams struct { + MetadataDoMerge bool + Metadata []byte + ID int64 +} + +func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) (*RiverJob, error) { + row := db.QueryRow(ctx, jobUpdate, arg.MetadataDoMerge, arg.Metadata, arg.ID) + var i RiverJob + err := row.Scan( + &i.ID, + &i.Args, + 
&i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + &i.UniqueStates, + ) + return &i, err +} + +const jobUpdateFull = `-- name: JobUpdateFull :one UPDATE /* TEMPLATE: schema */river_job SET attempt = CASE WHEN $1::boolean THEN $2 ELSE attempt END, @@ -1588,7 +1636,7 @@ WHERE id = $17 RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` -type JobUpdateParams struct { +type JobUpdateFullParams struct { AttemptDoUpdate bool Attempt int16 AttemptedAtDoUpdate bool @@ -1610,8 +1658,8 @@ type JobUpdateParams struct { // A generalized update for any property on a job. This brings in a large number // of parameters and therefore may be more suitable for testing than production. -func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) (*RiverJob, error) { - row := db.QueryRow(ctx, jobUpdate, +func (q *Queries) JobUpdateFull(ctx context.Context, db DBTX, arg *JobUpdateFullParams) (*RiverJob, error) { + row := db.QueryRow(ctx, jobUpdateFull, arg.AttemptDoUpdate, arg.Attempt, arg.AttemptedAtDoUpdate, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 31f8ad48..9490b486 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -657,6 +657,24 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP } job, err := dbsqlc.New().JobUpdate(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobUpdateParams{ + ID: params.ID, + MetadataDoMerge: params.MetadataDoMerge, + Metadata: metadata, + }) + if err != nil { + return nil, interpretError(err) + } + + return jobRowFromInternal(job) +} + +func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpdateFullParams) (*rivertype.JobRow, error) { + metadata := params.Metadata + if metadata == nil { + metadata = []byte("{}") + } + + job, err := dbsqlc.New().JobUpdateFull(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobUpdateFullParams{ ID: params.ID, AttemptedAtDoUpdate: params.AttemptedAtDoUpdate, Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index 4d23b2af..dfb72dc3 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -492,9 +492,16 @@ WHERE id = @id AND state = 'running' RETURNING *; +-- name: JobUpdate :one +UPDATE /* TEMPLATE: schema */river_job +SET + metadata = CASE WHEN cast(@metadata_do_merge AS boolean) THEN json_patch(metadata, json(cast(@metadata AS blob))) ELSE metadata END +WHERE id = @id +RETURNING *; + -- A generalized update for any property on a job. This brings in a large number -- of parameters and therefore may be more suitable for testing than production. 
--- name: JobUpdate :one +-- name: JobUpdateFull :one UPDATE /* TEMPLATE: schema */river_job SET attempt = CASE WHEN cast(@attempt_do_update AS boolean) THEN @attempt ELSE attempt END, @@ -506,4 +513,4 @@ SET metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END, state = CASE WHEN cast(@state_do_update AS boolean) THEN @state ELSE state END WHERE id = @id -RETURNING *; +RETURNING *; \ No newline at end of file diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index d561d17d..0a86bc05 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -1497,6 +1497,46 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet const jobUpdate = `-- name: JobUpdate :one UPDATE /* TEMPLATE: schema */river_job +SET + metadata = CASE WHEN cast(?1 AS boolean) THEN json_patch(metadata, json(cast(?2 AS blob))) ELSE metadata END +WHERE id = ?3 +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +` + +type JobUpdateParams struct { + MetadataDoMerge bool + Metadata []byte + ID int64 +} + +func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) (*RiverJob, error) { + row := db.QueryRowContext(ctx, jobUpdate, arg.MetadataDoMerge, arg.Metadata, arg.ID) + var i RiverJob + err := row.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + &i.UniqueStates, + ) + return &i, err +} + +const jobUpdateFull = `-- name: JobUpdateFull :one +UPDATE /* TEMPLATE: schema */river_job SET attempt = CASE WHEN cast(?1 AS boolean) THEN ?2 ELSE attempt END, attempted_at = CASE WHEN cast(?3 AS boolean) THEN ?4 ELSE attempted_at END, @@ -1510,7 +1550,7 @@ WHERE id = ?17 RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states ` -type JobUpdateParams struct { +type JobUpdateFullParams struct { AttemptDoUpdate bool Attempt int64 AttemptedAtDoUpdate bool @@ -1532,8 +1572,8 @@ type JobUpdateParams struct { // A generalized update for any property on a job. This brings in a large number // of parameters and therefore may be more suitable for testing than production. 
-func (q *Queries) JobUpdate(ctx context.Context, db DBTX, arg *JobUpdateParams) (*RiverJob, error) { - row := db.QueryRowContext(ctx, jobUpdate, +func (q *Queries) JobUpdateFull(ctx context.Context, db DBTX, arg *JobUpdateFullParams) (*RiverJob, error) { + row := db.QueryRowContext(ctx, jobUpdateFull, arg.AttemptDoUpdate, arg.Attempt, arg.AttemptedAtDoUpdate, diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 09259c60..ccfab633 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -998,6 +998,24 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr } func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) { + metadata := params.Metadata + if metadata == nil { + metadata = []byte("{}") + } + + job, err := dbsqlc.New().JobUpdate(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobUpdateParams{ + ID: params.ID, + MetadataDoMerge: params.MetadataDoMerge, + Metadata: metadata, + }) + if err != nil { + return nil, interpretError(err) + } + + return jobRowFromInternal(job) +} + +func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpdateFullParams) (*rivertype.JobRow, error) { attemptedAt := params.AttemptedAt if attemptedAt != nil { attemptedAt = ptrutil.Ptr(attemptedAt.UTC()) @@ -1023,7 +1041,7 @@ func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateP metadata = []byte("{}") } - job, err := dbsqlc.New().JobUpdate(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobUpdateParams{ + job, err := dbsqlc.New().JobUpdateFull(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobUpdateFullParams{ ID: params.ID, Attempt: int64(params.Attempt), AttemptDoUpdate: params.AttemptDoUpdate, diff --git a/riverlog/river_log_test.go b/riverlog/river_log_test.go index a20d4baf..38bb9610 100644 --- a/riverlog/river_log_test.go +++ b/riverlog/river_log_test.go @@ -162,7 +162,7 @@ func TestMiddleware(t *testing.T) { require.NoError(t, err) // Set state back to available and unfinalize the job to make it runnable again. - workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdate(ctx, &riverdriver.JobUpdateParams{ + workRes.Job, err = bundle.driver.UnwrapExecutor(bundle.tx).JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ ID: workRes.Job.ID, FinalizedAtDoUpdate: true, FinalizedAt: nil, diff --git a/rivertest/worker.go b/rivertest/worker.go index 28de2c44..93382c44 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -157,7 +157,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job } } - updatedJobRow, err := exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{ + updatedJobRow, err := exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ ID: job.ID, Attempt: job.Attempt + 1, AttemptDoUpdate: true,