1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- The unused function `WorkerDefaults.Hooks` has been removed. This is technically a breaking change, but this function was a vestigial refactoring artifact that was never used by anything, so in practice it shouldn't be breaking. [PR #997](https://github.com/riverqueue/river/pull/997).
- Periodic job records are upserted immediately through a pilot when a client is started rather than the first time their associated job would run. This doesn't mean they're run immediately (they'll only run if `RunOnStart` is enabled), but rather just tracked immediately. [PR #998](https://github.com/riverqueue/river/pull/998).
- The job scheduler still schedules jobs in batches of up to 10,000, but when it encounters a series of consecutive timeouts it assumes that the database is in a degraded state and switches to doing work in a smaller batch size of 1,000 jobs. [PR #1013](https://github.com/riverqueue/river/pull/1013).
- Other maintenance services, including the job cleaner, job rescuer, and queue cleaner, also prefer a batch size of 10,000, but will fall back to smaller batches of 1,000 on consecutive database timeouts. [PR #1016](https://github.com/riverqueue/river/pull/1016).

### Fixed

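The changelog entries above describe the pattern this PR extends to the remaining maintenance services: count consecutive query timeouts with a circuit breaker and, once it trips, drop from the default batch of 10,000 rows to a reduced batch of 1,000. The sketch below is only an illustration of that idea, not the actual `rivershared/circuitbreaker` package; the method names (`Trip`, `Open`, `ResetIfNotOpen`, `Limit`) mirror the ones used in the diffs below, but the internals and the trip limit are assumptions.

```go
package main

import "fmt"

// breaker is a minimal stand-in for rivershared/circuitbreaker.CircuitBreaker:
// it opens after `limit` consecutive failures and then stays open.
type breaker struct {
	limit int
	fails int
	open  bool
}

func (b *breaker) Limit() int { return b.limit }
func (b *breaker) Open() bool { return b.open }

// Trip records one failure and opens the circuit once the limit is reached.
func (b *breaker) Trip() {
	b.fails++
	if b.fails >= b.limit {
		b.open = true
	}
}

// ResetIfNotOpen clears the failure count after a success; an already open
// circuit stays open until the program restarts.
func (b *breaker) ResetIfNotOpen() {
	if !b.open {
		b.fails = 0
	}
}

// Batch sizes taken from the changelog entries above.
const (
	batchSizeDefault = 10_000
	batchSizeReduced = 1_000
)

// batchSize mirrors the batchSize() helpers added in this PR: full size while
// the circuit is closed, reduced size once it has opened.
func batchSize(b *breaker) int {
	if b.Open() {
		return batchSizeReduced
	}
	return batchSizeDefault
}

func main() {
	b := &breaker{limit: 5}   // a limit of 5 is an assumption for illustration
	fmt.Println(batchSize(b)) // 10000

	for range b.Limit() {
		b.Trip() // e.g. one trip per consecutive context.DeadlineExceeded
	}
	fmt.Println(batchSize(b)) // 1000

	b.ResetIfNotOpen()        // no effect: once open, the breaker stays open
	fmt.Println(batchSize(b)) // still 1000
}
```

Keeping the breaker latched once it opens matches the comment in the diffs below: a degraded database is assumed to stay degraded, so the smaller batch size is kept until the process restarts.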
41 changes: 34 additions & 7 deletions internal/maintenance/job_cleaner.go
@@ -10,6 +10,7 @@ import (

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/circuitbreaker"
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
@@ -29,6 +30,8 @@ func (ts *JobCleanerTestSignals) Init(tb testutil.TestingTB) {
}

type JobCleanerConfig struct {
riversharedmaintenance.BatchSizes

// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
// around before they're removed permanently.
//
@@ -62,6 +65,8 @@ type JobCleanerConfig struct {
}

func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig {
c.MustValidate()

if c.CancelledJobRetentionPeriod < -1 {
panic("JobCleanerConfig.CancelledJobRetentionPeriod must be above zero")
}
@@ -91,13 +96,23 @@ type JobCleaner struct {
Config *JobCleanerConfig
TestSignals JobCleanerTestSignals

batchSize int // configurable for test purposes
exec riverdriver.Executor
exec riverdriver.Executor

// Circuit breaker that tracks consecutive timeout failures from the central
// query. The query starts by using the full/default batch size, but after
// this breaker trips (after N consecutive timeouts occur in a row), it
// switches to a smaller batch. We assume that a database that's degraded is
// likely to stay degraded over a longer term, so after the circuit breaks,
// it stays broken until the program is restarted.
reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker
}

func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner {
batchSizes := config.WithDefaults()

return baseservice.Init(archetype, &JobCleaner{
Config: (&JobCleanerConfig{
BatchSizes: batchSizes,
CancelledJobRetentionPeriod: cmp.Or(config.CancelledJobRetentionPeriod, riversharedmaintenance.CancelledJobRetentionPeriodDefault),
CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault),
@@ -106,9 +121,8 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e
Schema: config.Schema,
Timeout: cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault),
}).mustValidate(),

batchSize: riversharedmaintenance.BatchSizeDefault,
exec: exec,
exec: exec,
reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes),
})
}

@@ -154,6 +168,13 @@ func (s *JobCleaner) Start(ctx context.Context) error { //nolint:dupl
return nil
}

func (s *JobCleaner) batchSize() int {
if s.reducedBatchSizeBreaker.Open() {
return s.Config.Reduced
}
return s.Config.Default
}

type jobCleanerRunOnceResult struct {
NumJobsDeleted int
}
@@ -182,25 +203,31 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod),
DiscardedDoDelete: s.Config.DiscardedJobRetentionPeriod != -1,
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
Max: s.batchSize,
Max: s.batchSize(),
QueuesExcluded: s.Config.QueuesExcluded,
Schema: s.Config.Schema,
})
if err != nil {
return 0, fmt.Errorf("error cleaning jobs: %w", err)
}

s.reducedBatchSizeBreaker.ResetIfNotOpen()

return numDeleted, nil
}()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
s.reducedBatchSizeBreaker.Trip()
}

return nil, err
}

s.TestSignals.DeletedBatch.Signal(struct{}{})

res.NumJobsDeleted += numDeleted
// Deleted was less than query `LIMIT` which means work is done.
if numDeleted < s.batchSize {
if numDeleted < s.batchSize() {
break
}

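The `runOnce` change above keeps the existing delete loop but swaps the fixed `batchSize` field for the breaker-aware `batchSize()` method, trips the breaker on `context.DeadlineExceeded`, and resets it after any successful batch. Here is a self-contained sketch of that loop shape, with a stub standing in for the real `JobDeleteBefore` query; the stub and its numbers are illustrative only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// deleteBatch is a stub for the real JobDeleteBefore query: it "deletes" up to
// max rows, or fails if the context deadline has already passed.
func deleteBatch(ctx context.Context, remaining, max int) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	return min(remaining, max), nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	const batchSize = 1_000
	remaining, totalDeleted := 2_500, 0

	// Same shape as JobCleaner.runOnce: delete full batches until one comes
	// back smaller than the query LIMIT, which means the work is done.
	for {
		numDeleted, err := deleteBatch(ctx, remaining, batchSize)
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				// This is the point where the cleaner trips its breaker.
				fmt.Println("timeout: would trip reduced batch size breaker")
			}
			fmt.Println("error cleaning jobs:", err)
			return
		}
		// A successful batch is where the cleaner calls ResetIfNotOpen.

		totalDeleted += numDeleted
		remaining -= numDeleted
		if numDeleted < batchSize {
			break
		}
	}
	fmt.Println("deleted:", totalDeleted) // deleted: 2500
}
```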
76 changes: 74 additions & 2 deletions internal/maintenance/job_cleaner_test.go
@@ -232,11 +232,11 @@ func TestJobCleaner(t *testing.T) {
t.Parallel()

cleaner, bundle := setup(t)
cleaner.batchSize = 10 // reduced size for test speed
cleaner.Config.Default = 10 // reduced size for test speed

// Add one to our chosen batch size to get one extra job and therefore
// one extra batch, ensuring that we've tested working multiple.
numJobs := cleaner.batchSize + 1
numJobs := cleaner.Config.Default + 1

jobs := make([]*rivertype.JobRow, numJobs)

@@ -365,4 +365,76 @@ func TestJobCleaner(t *testing.T) {
_, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema})
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) {
t.Parallel()

cleaner, _ := setup(t)

ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond)
defer cancel()

// Starts at default batch size.
require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())

for range cleaner.reducedBatchSizeBreaker.Limit() - 1 {
_, err := cleaner.runOnce(ctx)
require.Error(t, err)

// Circuit not broken yet so we stay at default batch size.
require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
}

_, err := cleaner.runOnce(ctx)
require.Error(t, err)

// Circuit now broken. Reduced batch size.
require.Equal(t, riversharedmaintenance.BatchSizeReduced, cleaner.batchSize())
})

t.Run("ReducedBatchSizeBreakerResetsOnSuccess", func(t *testing.T) { //nolint:dupl
t.Parallel()

cleaner, _ := setup(t)

{
ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond)
defer cancel()

// Starts at default batch size.
require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())

for range cleaner.reducedBatchSizeBreaker.Limit() - 1 {
_, err := cleaner.runOnce(ctx)
require.Error(t, err)

// Circuit not broken yet so we stay at default batch size.
require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
}
}

// Context has not been cancelled for this call so it succeeds.
_, err := cleaner.runOnce(ctx)
require.NoError(t, err)

require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())

// Because of the success above, the circuit breaker resets. N - 1
// failures are allowed again before it breaks.
{
ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond)
defer cancel()

// Starts at default batch size.
require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())

for range cleaner.reducedBatchSizeBreaker.Limit() - 1 {
_, err := cleaner.runOnce(ctx)
require.Error(t, err)

// Circuit not broken yet so we stay at default batch size.
require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize())
}
}
})
}
41 changes: 34 additions & 7 deletions internal/maintenance/job_rescuer.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/circuitbreaker"
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
@@ -40,6 +41,8 @@ func (ts *JobRescuerTestSignals) Init(tb testutil.TestingTB) {
}

type JobRescuerConfig struct {
riversharedmaintenance.BatchSizes

// ClientRetryPolicy is the default retry policy to use for workers that don't
// override NextRetry.
ClientRetryPolicy jobexecutor.ClientRetryPolicy
@@ -59,6 +62,8 @@ type JobRescuerConfig struct {
}

func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig {
c.MustValidate()

if c.ClientRetryPolicy == nil {
panic("RescuerConfig.ClientRetryPolicy must be set")
}
Expand All @@ -85,22 +90,31 @@ type JobRescuer struct {
Config *JobRescuerConfig
TestSignals JobRescuerTestSignals

batchSize int // configurable for test purposes
exec riverdriver.Executor
exec riverdriver.Executor

// Circuit breaker that tracks consecutive timeout failures from the central
// query. The query starts by using the full/default batch size, but after
// this breaker trips (after N consecutive timeouts occur in a row), it
// switches to a smaller batch. We assume that a database that's degraded is
// likely to stay degraded over a longer term, so after the circuit breaks,
// it stays broken until the program is restarted.
reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker
}

func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec riverdriver.Executor) *JobRescuer {
batchSizes := config.WithDefaults()

return baseservice.Init(archetype, &JobRescuer{
Config: (&JobRescuerConfig{
BatchSizes: batchSizes,
ClientRetryPolicy: config.ClientRetryPolicy,
Interval: cmp.Or(config.Interval, JobRescuerIntervalDefault),
RescueAfter: cmp.Or(config.RescueAfter, JobRescuerRescueAfterDefault),
Schema: config.Schema,
WorkUnitFactoryFunc: config.WorkUnitFactoryFunc,
}).mustValidate(),

batchSize: riversharedmaintenance.BatchSizeDefault,
exec: exec,
exec: exec,
reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes),
})
}

@@ -147,6 +161,13 @@ func (s *JobRescuer) Start(ctx context.Context) error {
return nil
}

func (s *JobRescuer) batchSize() int {
if s.reducedBatchSizeBreaker.Open() {
return s.Config.Reduced
}
return s.Config.Default
}

type rescuerRunOnceResult struct {
NumJobsCancelled int64
NumJobsDiscarded int64
Expand All @@ -163,9 +184,15 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
for {
stuckJobs, err := s.getStuckJobs(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
s.reducedBatchSizeBreaker.Trip()
}

return nil, fmt.Errorf("error fetching stuck jobs: %w", err)
}

s.reducedBatchSizeBreaker.ResetIfNotOpen()

s.TestSignals.FetchedBatch.Signal(struct{}{})

now := time.Now().UTC()
@@ -236,7 +263,7 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)

// Number of rows fetched was less than query `LIMIT` which means work is
// done for this round:
if len(stuckJobs) < s.batchSize {
if len(stuckJobs) < s.batchSize() {
break
}

@@ -253,7 +280,7 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
stuckHorizon := time.Now().Add(-s.Config.RescueAfter)

return s.exec.JobGetStuck(ctx, &riverdriver.JobGetStuckParams{
Max: s.batchSize,
Max: s.batchSize(),
Schema: s.Config.Schema,
StuckHorizon: stuckHorizon,
})
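The rescuer applies the same breaker as the cleaner, and both services detect timeouts with `errors.Is(err, context.DeadlineExceeded)`. Because `errors.Is` walks `%w` wrapping chains, the check still matches if the error it inspects wraps the underlying context error rather than being the bare sentinel. A minimal demonstration (the error message below is invented for illustration):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// Simulate a lower-level error that wraps the context's deadline error.
	wrappedErr := fmt.Errorf("query timed out: %w", context.DeadlineExceeded)

	// errors.Is unwraps %w chains, so the breaker-trip check in both services
	// matches wrapped timeouts as well as the bare sentinel error.
	fmt.Println(errors.Is(wrappedErr, context.DeadlineExceeded))                       // true
	fmt.Println(errors.Is(errors.New("some other failure"), context.DeadlineExceeded)) // false
}
```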