diff --git a/CHANGELOG.md b/CHANGELOG.md index ca3f11b7..b3872f3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - The unused function `WorkerDefaults.Hooks` has been removed. This is technically a breaking change, but this function was a vestigal refactoring artifact that was never used by anything, so in practice it shouldn't be breaking. [PR #997](https://github.com/riverqueue/river/pull/997). - Periodic job records are upserted immediately through a pilot when a client is started rather than the first time their associated job would run. This doesn't mean they're run immediately (they'll only run if `RunOnStart` is enabled), but rather just tracked immediately. [PR #998](https://github.com/riverqueue/river/pull/998). - The job scheduler still schedules jobs in batches of up to 10,000, but when it encounters a series of consecutive timeouts it assumes that the database is in a degraded state and switches to doing work in a smaller batch size of 1,000 jobs. [PR #1013](https://github.com/riverqueue/river/pull/1013). +- Other maintenance services including the job cleaner, job rescuer, and queue cleaner also prefer a batch size of 10,000, but will fall back to smaller batches of 1,000 on consecutive database timeouts. [PR #1016](https://github.com/riverqueue/river/pull/1016). 
### Fixed diff --git a/internal/maintenance/job_cleaner.go b/internal/maintenance/job_cleaner.go index 64d9f80b..e86e9bb2 100644 --- a/internal/maintenance/job_cleaner.go +++ b/internal/maintenance/job_cleaner.go @@ -10,6 +10,7 @@ import ( "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/circuitbreaker" "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" @@ -29,6 +30,8 @@ func (ts *JobCleanerTestSignals) Init(tb testutil.TestingTB) { } type JobCleanerConfig struct { + riversharedmaintenance.BatchSizes + // CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs // around before they're removed permanently. // @@ -62,6 +65,8 @@ type JobCleanerConfig struct { } func (c *JobCleanerConfig) mustValidate() *JobCleanerConfig { + c.MustValidate() + if c.CancelledJobRetentionPeriod < -1 { panic("JobCleanerConfig.CancelledJobRetentionPeriod must be above zero") } @@ -91,13 +96,23 @@ type JobCleaner struct { Config *JobCleanerConfig TestSignals JobCleanerTestSignals - batchSize int // configurable for test purposes - exec riverdriver.Executor + exec riverdriver.Executor + + // Circuit breaker that tracks consecutive timeout failures from the central + // query. The query starts by using the full/default batch size, but after + // this breaker trips (after N consecutive timeouts occur in a row), it + // switches to a smaller batch. We assume that a database that's degraded is + // likely to stay degraded over a longer term, so after the circuit breaks, + // it stays broken until the program is restarted. 
+ reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker } func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner { + batchSizes := config.WithDefaults() + return baseservice.Init(archetype, &JobCleaner{ Config: (&JobCleanerConfig{ + BatchSizes: batchSizes, CancelledJobRetentionPeriod: cmp.Or(config.CancelledJobRetentionPeriod, riversharedmaintenance.CancelledJobRetentionPeriodDefault), CompletedJobRetentionPeriod: cmp.Or(config.CompletedJobRetentionPeriod, riversharedmaintenance.CompletedJobRetentionPeriodDefault), DiscardedJobRetentionPeriod: cmp.Or(config.DiscardedJobRetentionPeriod, riversharedmaintenance.DiscardedJobRetentionPeriodDefault), @@ -106,9 +121,8 @@ func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, e Schema: config.Schema, Timeout: cmp.Or(config.Timeout, riversharedmaintenance.JobCleanerTimeoutDefault), }).mustValidate(), - - batchSize: riversharedmaintenance.BatchSizeDefault, - exec: exec, + exec: exec, + reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes), }) } @@ -154,6 +168,13 @@ func (s *JobCleaner) Start(ctx context.Context) error { //nolint:dupl return nil } +func (s *JobCleaner) batchSize() int { + if s.reducedBatchSizeBreaker.Open() { + return s.Config.Reduced + } + return s.Config.Default +} + type jobCleanerRunOnceResult struct { NumJobsDeleted int } @@ -182,7 +203,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod), DiscardedDoDelete: s.Config.DiscardedJobRetentionPeriod != -1, DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod), - Max: s.batchSize, + Max: s.batchSize(), QueuesExcluded: s.Config.QueuesExcluded, Schema: s.Config.Schema, }) @@ -190,9 +211,15 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err return 0, fmt.Errorf("error 
cleaning jobs: %w", err) } + s.reducedBatchSizeBreaker.ResetIfNotOpen() + return numDeleted, nil }() if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + s.reducedBatchSizeBreaker.Trip() + } + return nil, err } @@ -200,7 +227,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err res.NumJobsDeleted += numDeleted // Deleted was less than query `LIMIT` which means work is done. - if numDeleted < s.batchSize { + if numDeleted < s.batchSize() { break } diff --git a/internal/maintenance/job_cleaner_test.go b/internal/maintenance/job_cleaner_test.go index e431933e..96ba0e1b 100644 --- a/internal/maintenance/job_cleaner_test.go +++ b/internal/maintenance/job_cleaner_test.go @@ -232,11 +232,11 @@ func TestJobCleaner(t *testing.T) { t.Parallel() cleaner, bundle := setup(t) - cleaner.batchSize = 10 // reduced size for test speed + cleaner.Config.Default = 10 // reduced size for test speed // Add one to our chosen batch size to get one extra job and therefore // one extra batch, ensuring that we've tested working multiple. - numJobs := cleaner.batchSize + 1 + numJobs := cleaner.Config.Default + 1 jobs := make([]*rivertype.JobRow, numJobs) @@ -365,4 +365,76 @@ func TestJobCleaner(t *testing.T) { _, err = bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: discardedJob.ID, Schema: cleaner.Config.Schema}) require.ErrorIs(t, err, rivertype.ErrNotFound) }) + + t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) { + t.Parallel() + + cleaner, _ := setup(t) + + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + for range cleaner.reducedBatchSizeBreaker.Limit() - 1 { + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. 
+ require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + } + + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit now broken. Reduced batch size. + require.Equal(t, riversharedmaintenance.BatchSizeReduced, cleaner.batchSize()) + }) + + t.Run("ReducedBatchSizeBreakerResetsOnSuccess", func(t *testing.T) { //nolint:dupl + t.Parallel() + + cleaner, _ := setup(t) + + { + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + for range cleaner.reducedBatchSizeBreaker.Limit() - 1 { + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + } + } + + // Context has not been cancelled for this call so it succeeds. + _, err := cleaner.runOnce(ctx) + require.NoError(t, err) + + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + // Because of the success above, the circuit breaker resets. N - 1 + // failures are allowed again before it breaks. + { + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + for range cleaner.reducedBatchSizeBreaker.Limit() - 1 { + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. 
+ require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + } + } + }) } diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go index 94cafb5d..7befcddf 100644 --- a/internal/maintenance/job_rescuer.go +++ b/internal/maintenance/job_rescuer.go @@ -13,6 +13,7 @@ import ( "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/circuitbreaker" "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" @@ -40,6 +41,8 @@ func (ts *JobRescuerTestSignals) Init(tb testutil.TestingTB) { } type JobRescuerConfig struct { + riversharedmaintenance.BatchSizes + // ClientRetryPolicy is the default retry policy to use for workers that don't // override NextRetry. ClientRetryPolicy jobexecutor.ClientRetryPolicy @@ -59,6 +62,8 @@ type JobRescuerConfig struct { } func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig { + c.MustValidate() + if c.ClientRetryPolicy == nil { panic("RescuerConfig.ClientRetryPolicy must be set") } @@ -85,22 +90,31 @@ type JobRescuer struct { Config *JobRescuerConfig TestSignals JobRescuerTestSignals - batchSize int // configurable for test purposes - exec riverdriver.Executor + exec riverdriver.Executor + + // Circuit breaker that tracks consecutive timeout failures from the central + // query. The query starts by using the full/default batch size, but after + // this breaker trips (after N consecutive timeouts occur in a row), it + // switches to a smaller batch. We assume that a database that's degraded is + // likely to stay degraded over a longer term, so after the circuit breaks, + // it stays broken until the program is restarted. 
+ reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker } func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec riverdriver.Executor) *JobRescuer { + batchSizes := config.WithDefaults() + return baseservice.Init(archetype, &JobRescuer{ Config: (&JobRescuerConfig{ + BatchSizes: batchSizes, ClientRetryPolicy: config.ClientRetryPolicy, Interval: cmp.Or(config.Interval, JobRescuerIntervalDefault), RescueAfter: cmp.Or(config.RescueAfter, JobRescuerRescueAfterDefault), Schema: config.Schema, WorkUnitFactoryFunc: config.WorkUnitFactoryFunc, }).mustValidate(), - - batchSize: riversharedmaintenance.BatchSizeDefault, - exec: exec, + exec: exec, + reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes), }) } @@ -147,6 +161,13 @@ func (s *JobRescuer) Start(ctx context.Context) error { return nil } +func (s *JobRescuer) batchSize() int { + if s.reducedBatchSizeBreaker.Open() { + return s.Config.Reduced + } + return s.Config.Default +} + type rescuerRunOnceResult struct { NumJobsCancelled int64 NumJobsDiscarded int64 @@ -163,9 +184,15 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) for { stuckJobs, err := s.getStuckJobs(ctx) if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + s.reducedBatchSizeBreaker.Trip() + } + return nil, fmt.Errorf("error fetching stuck jobs: %w", err) } + s.reducedBatchSizeBreaker.ResetIfNotOpen() + s.TestSignals.FetchedBatch.Signal(struct{}{}) now := time.Now().UTC() @@ -236,7 +263,7 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) // Number of rows fetched was less than query `LIMIT` which means work is // done for this round: - if len(stuckJobs) < s.batchSize { + if len(stuckJobs) < s.batchSize() { break } @@ -253,7 +280,7 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err stuckHorizon := time.Now().Add(-s.Config.RescueAfter) return s.exec.JobGetStuck(ctx, 
&riverdriver.JobGetStuckParams{ - Max: s.batchSize, + Max: s.batchSize(), Schema: s.Config.Schema, StuckHorizon: stuckHorizon, }) diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go index 05ef0d3f..a770e40d 100644 --- a/internal/maintenance/job_rescuer_test.go +++ b/internal/maintenance/job_rescuer_test.go @@ -14,6 +14,7 @@ import ( "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" @@ -231,11 +232,11 @@ func TestJobRescuer(t *testing.T) { t.Parallel() rescuer, bundle := setup(t) - rescuer.batchSize = 10 // reduced size for test speed + rescuer.Config.Default = 10 // reduced size for test speed // Add one to our chosen batch size to get one extra job and therefore // one extra batch, ensuring that we've tested working multiple. - numJobs := rescuer.batchSize + 1 + numJobs := rescuer.Config.Default + 1 jobs := make([]*rivertype.JobRow, numJobs) @@ -332,4 +333,76 @@ func TestJobRescuer(t *testing.T) { require.NoError(t, err) require.Equal(t, rivertype.JobStateDiscarded, job2After.State) }) + + t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) { + t.Parallel() + + rescuer, _ := setup(t) + + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, rescuer.batchSize()) + + for range rescuer.reducedBatchSizeBreaker.Limit() - 1 { + _, err := rescuer.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. 
+ require.Equal(t, riversharedmaintenance.BatchSizeDefault, rescuer.batchSize()) + } + + _, err := rescuer.runOnce(ctx) + require.Error(t, err) + + // Circuit now broken. Reduced batch size. + require.Equal(t, riversharedmaintenance.BatchSizeReduced, rescuer.batchSize()) + }) + + t.Run("ReducedBatchSizeBreakerResetsOnSuccess", func(t *testing.T) { //nolint:dupl + t.Parallel() + + rescuer, _ := setup(t) + + { + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, rescuer.batchSize()) + + for range rescuer.reducedBatchSizeBreaker.Limit() - 1 { + _, err := rescuer.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, rescuer.batchSize()) + } + } + + // Context has not been cancelled for this call so it succeeds. + _, err := rescuer.runOnce(ctx) + require.NoError(t, err) + + require.Equal(t, riversharedmaintenance.BatchSizeDefault, rescuer.batchSize()) + + // Because of the success above, the circuit breaker resets. N - 1 + // failures are allowed again before it breaks. + { + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, rescuer.batchSize()) + + for range rescuer.reducedBatchSizeBreaker.Limit() - 1 { + _, err := rescuer.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. 
+ require.Equal(t, riversharedmaintenance.BatchSizeDefault, rescuer.batchSize()) + } + } + }) } diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index 43f04395..16a6c02c 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -8,9 +8,9 @@ import ( "log/slog" "time" - "github.com/riverqueue/river/internal/circuitbreaker" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/circuitbreaker" "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" @@ -21,9 +21,7 @@ import ( ) const ( - JobSchedulerBatchSizeReduced = 1_000 - JobSchedulerBatchSizeDefault = 10_000 - JobSchedulerIntervalDefault = 5 * time.Second + JobSchedulerIntervalDefault = 5 * time.Second ) // Test-only properties. @@ -42,20 +40,12 @@ func (ts *JobSchedulerTestSignals) Init(tb testutil.TestingTB) { type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error type JobSchedulerConfig struct { + riversharedmaintenance.BatchSizes + // Interval is the amount of time between periodic checks for jobs to // be moved from "scheduled" to "available". Interval time.Duration - // BatchSizeDefault is the maximum number of jobs to transition at once from - // "scheduled" to "available" during periodic scheduling checks. - BatchSizeDefault int - - // BatchSizeReduced is a considerably smaller batch size that the service - // uses after encountering 3 consecutive timeouts in a row. The idea behind - // this is that if it appears the database is degraded, then we start doing - // less work in the hope that it can better succeed. - BatchSizeReduced int - // NotifyInsert is a function to call to emit notifications for queues // where jobs were scheduled. 
NotifyInsert NotifyInsertFunc @@ -66,10 +56,12 @@ type JobSchedulerConfig struct { } func (c *JobSchedulerConfig) mustValidate() *JobSchedulerConfig { + c.MustValidate() + if c.Interval <= 0 { panic("SchedulerConfig.Interval must be above zero") } - if c.BatchSizeDefault <= 0 { + if c.Default <= 0 { panic("SchedulerConfig.Limit must be above zero") } @@ -89,29 +81,27 @@ type JobScheduler struct { config *JobSchedulerConfig exec riverdriver.Executor - // Circuit breaker that tracks consecutive timeout failures from the - // scheduling query. The query starts by using the full/default batch size, - // but after this breaker trips (after N consecutive timeouts occur in a - // row), it switches to a smaller batch. We assume that a database that's - // degraded is likely to stay degraded over a longer term, so after the - // circuit breaks, it stays broken until the program is restarted. + // Circuit breaker that tracks consecutive timeout failures from the central + // query. The query starts by using the full/default batch size, but after + // this breaker trips (after N consecutive timeouts occur in a row), it + // switches to a smaller batch. We assume that a database that's degraded is + // likely to stay degraded over a longer term, so after the circuit breaks, + // it stays broken until the program is restarted. 
reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker } func NewJobScheduler(archetype *baseservice.Archetype, config *JobSchedulerConfig, exec riverdriver.Executor) *JobScheduler { + batchSizes := config.WithDefaults() + return baseservice.Init(archetype, &JobScheduler{ - reducedBatchSizeBreaker: circuitbreaker.NewCircuitBreaker(&circuitbreaker.CircuitBreakerOptions{ - Limit: 3, - Window: 10 * time.Minute, - }), config: (&JobSchedulerConfig{ - BatchSizeDefault: cmp.Or(config.BatchSizeDefault, JobSchedulerBatchSizeDefault), - BatchSizeReduced: cmp.Or(config.BatchSizeReduced, JobSchedulerBatchSizeReduced), - Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), - NotifyInsert: config.NotifyInsert, - Schema: config.Schema, + BatchSizes: batchSizes, + Interval: cmp.Or(config.Interval, JobSchedulerIntervalDefault), + NotifyInsert: config.NotifyInsert, + Schema: config.Schema, }).mustValidate(), - exec: exec, + exec: exec, + reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes), }) } @@ -159,9 +149,9 @@ func (s *JobScheduler) Start(ctx context.Context) error { //nolint:dupl func (s *JobScheduler) batchSize() int { if s.reducedBatchSizeBreaker.Open() { - return s.config.BatchSizeReduced + return s.config.Reduced } - return s.config.BatchSizeDefault + return s.config.Default } type schedulerRunOnceResult struct { @@ -234,7 +224,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er res.NumCompletedJobsScheduled += numScheduled // Scheduled was less than query `LIMIT` which means work is done. 
- if numScheduled < s.config.BatchSizeDefault { + if numScheduled < s.batchSize() { break } diff --git a/internal/maintenance/job_scheduler_test.go b/internal/maintenance/job_scheduler_test.go index 16d6b136..25388042 100644 --- a/internal/maintenance/job_scheduler_test.go +++ b/internal/maintenance/job_scheduler_test.go @@ -12,6 +12,7 @@ import ( "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" @@ -100,7 +101,8 @@ func TestJobScheduler(t *testing.T) { scheduler := NewJobScheduler(riversharedtest.BaseServiceArchetype(t), &JobSchedulerConfig{}, nil) require.Equal(t, JobSchedulerIntervalDefault, scheduler.config.Interval) - require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.config.BatchSizeDefault) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.config.Default) + require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.config.Reduced) }) t.Run("StartStopStress", func(t *testing.T) { @@ -203,13 +205,13 @@ func TestJobScheduler(t *testing.T) { t.Parallel() scheduler, bundle := setupTx(t) - scheduler.config.BatchSizeDefault = 10 // reduced size for test speed + scheduler.config.Default = 10 // reduced size for test speed now := time.Now().UTC() // Add one to our chosen batch size to get one extra job and therefore // one extra batch, ensuring that we've tested working multiple. - numJobs := scheduler.config.BatchSizeDefault + 1 + numJobs := scheduler.config.Default + 1 jobs := make([]*rivertype.JobRow, numJobs) @@ -375,24 +377,24 @@ func TestJobScheduler(t *testing.T) { defer cancel() // Starts at default batch size. 
- require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.batchSize()) for range scheduler.reducedBatchSizeBreaker.Limit() - 1 { _, err := scheduler.runOnce(ctx) require.Error(t, err) // Circuit not broken yet so we stay at default batch size. - require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.batchSize()) } _, err := scheduler.runOnce(ctx) require.Error(t, err) // Circuit now broken. Reduced batch size. - require.Equal(t, JobSchedulerBatchSizeReduced, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeReduced, scheduler.batchSize()) }) - t.Run("ReducedBatchSizeBreakerResetsOnSuccess", func(t *testing.T) { + t.Run("ReducedBatchSizeBreakerResetsOnSuccess", func(t *testing.T) { //nolint:dupl t.Parallel() scheduler, _ := setupTx(t) @@ -402,14 +404,14 @@ func TestJobScheduler(t *testing.T) { defer cancel() // Starts at default batch size. - require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.batchSize()) for range scheduler.reducedBatchSizeBreaker.Limit() - 1 { _, err := scheduler.runOnce(ctx) require.Error(t, err) // Circuit not broken yet so we stay at default batch size. - require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.batchSize()) } } @@ -417,7 +419,7 @@ func TestJobScheduler(t *testing.T) { _, err := scheduler.runOnce(ctx) require.NoError(t, err) - require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.batchSize()) // Because of the success above, the circuit breaker resets. N - 1 // failures are allowed again before it breaks. 
@@ -426,14 +428,14 @@ func TestJobScheduler(t *testing.T) { defer cancel() // Starts at default batch size. - require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.batchSize()) for range scheduler.reducedBatchSizeBreaker.Limit() - 1 { _, err := scheduler.runOnce(ctx) require.Error(t, err) // Circuit not broken yet so we stay at default batch size. - require.Equal(t, JobSchedulerBatchSizeDefault, scheduler.batchSize()) + require.Equal(t, riversharedmaintenance.BatchSizeDefault, scheduler.batchSize()) } } }) diff --git a/internal/maintenance/queue_cleaner.go b/internal/maintenance/queue_cleaner.go index 258e200f..3eef3a1f 100644 --- a/internal/maintenance/queue_cleaner.go +++ b/internal/maintenance/queue_cleaner.go @@ -11,6 +11,7 @@ import ( "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/circuitbreaker" "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" @@ -35,6 +36,8 @@ func (ts *QueueCleanerTestSignals) Init(tb testutil.TestingTB) { } type QueueCleanerConfig struct { + riversharedmaintenance.BatchSizes + // Interval is the amount of time to wait between runs of the cleaner. Interval time.Duration @@ -48,6 +51,8 @@ type QueueCleanerConfig struct { } func (c *QueueCleanerConfig) mustValidate() *QueueCleanerConfig { + c.MustValidate() + if c.Interval <= 0 { panic("QueueCleanerConfig.Interval must be above zero") } @@ -68,20 +73,29 @@ type QueueCleaner struct { Config *QueueCleanerConfig TestSignals QueueCleanerTestSignals - batchSize int // configurable for test purposes - exec riverdriver.Executor + exec riverdriver.Executor + + // Circuit breaker that tracks consecutive timeout failures from the central + // query. 
The query starts by using the full/default batch size, but after + // this breaker trips (after N consecutive timeouts occur in a row), it + // switches to a smaller batch. We assume that a database that's degraded is + // likely to stay degraded over a longer term, so after the circuit breaks, + // it stays broken until the program is restarted. + reducedBatchSizeBreaker *circuitbreaker.CircuitBreaker } func NewQueueCleaner(archetype *baseservice.Archetype, config *QueueCleanerConfig, exec riverdriver.Executor) *QueueCleaner { + batchSizes := config.WithDefaults() + return baseservice.Init(archetype, &QueueCleaner{ Config: (&QueueCleanerConfig{ + BatchSizes: batchSizes, Interval: cmp.Or(config.Interval, queueCleanerIntervalDefault), RetentionPeriod: cmp.Or(config.RetentionPeriod, QueueRetentionPeriodDefault), Schema: config.Schema, }).mustValidate(), - - batchSize: riversharedmaintenance.BatchSizeDefault, - exec: exec, + exec: exec, + reducedBatchSizeBreaker: riversharedmaintenance.ReducedBatchSizeBreaker(batchSizes), }) } @@ -127,6 +141,13 @@ func (s *QueueCleaner) Start(ctx context.Context) error { return nil } +func (s *QueueCleaner) batchSize() int { + if s.reducedBatchSizeBreaker.Open() { + return s.Config.Reduced + } + return s.Config.Default +} + type queueCleanerRunOnceResult struct { QueuesDeleted []string } @@ -141,7 +162,7 @@ func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, defer cancelFunc() queuesDeleted, err := s.exec.QueueDeleteExpired(ctx, &riverdriver.QueueDeleteExpiredParams{ - Max: s.batchSize, + Max: s.batchSize(), Schema: s.Config.Schema, UpdatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod), }) @@ -149,9 +170,15 @@ func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, return nil, fmt.Errorf("error deleting expired queues: %w", err) } + s.reducedBatchSizeBreaker.ResetIfNotOpen() + return queuesDeleted, nil }() if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + 
s.reducedBatchSizeBreaker.Trip() + } + return nil, err } @@ -159,7 +186,7 @@ func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult, res.QueuesDeleted = append(res.QueuesDeleted, queuesDeleted...) // Deleted was less than query `LIMIT` which means work is done. - if len(queuesDeleted) < s.batchSize { + if len(queuesDeleted) < s.batchSize() { break } diff --git a/internal/maintenance/queue_cleaner_test.go b/internal/maintenance/queue_cleaner_test.go index 8c88340c..06058e95 100644 --- a/internal/maintenance/queue_cleaner_test.go +++ b/internal/maintenance/queue_cleaner_test.go @@ -11,6 +11,7 @@ import ( "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" @@ -122,11 +123,11 @@ func TestQueueCleaner(t *testing.T) { t.Parallel() cleaner, bundle := setup(t) - cleaner.batchSize = 10 // reduced size for test speed + cleaner.Config.Default = 10 // reduced size for test speed // Add one to our chosen batch size to get one extra job and therefore // one extra batch, ensuring that we've tested working multiple. - numQueues := cleaner.batchSize + 1 + numQueues := cleaner.Config.Default + 1 queues := make([]*rivertype.Queue, numQueues) @@ -229,4 +230,76 @@ func TestQueueCleaner(t *testing.T) { }) require.ErrorIs(t, err, rivertype.ErrNotFound) }) + + t.Run("ReducedBatchSizeBreakerTrips", func(t *testing.T) { + t.Parallel() + + cleaner, _ := setup(t) + + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. 
+ require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + for range cleaner.reducedBatchSizeBreaker.Limit() - 1 { + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + } + + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit now broken. Reduced batch size. + require.Equal(t, riversharedmaintenance.BatchSizeReduced, cleaner.batchSize()) + }) + + t.Run("ReducedBatchSizeBreakerResetsOnSuccess", func(t *testing.T) { //nolint:dupl + t.Parallel() + + cleaner, _ := setup(t) + + { + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + for range cleaner.reducedBatchSizeBreaker.Limit() - 1 { + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + } + } + + // Context has not been cancelled for this call so it succeeds. + _, err := cleaner.runOnce(ctx) + require.NoError(t, err) + + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + // Because of the success above, the circuit breaker resets. N - 1 + // failures are allowed again before it breaks. + { + ctx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond) + defer cancel() + + // Starts at default batch size. + require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + + for range cleaner.reducedBatchSizeBreaker.Limit() - 1 { + _, err := cleaner.runOnce(ctx) + require.Error(t, err) + + // Circuit not broken yet so we stay at default batch size. 
+ require.Equal(t, riversharedmaintenance.BatchSizeDefault, cleaner.batchSize()) + } + } + }) } diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index a770ab03..0728bdda 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -84,7 +84,6 @@ type Reindexer struct { Config *ReindexerConfig TestSignals ReindexerTestSignals - batchSize int64 // configurable for test purposes exec riverdriver.Executor // driver executor skipReindexArtifactCheck bool // lets the reindex artifact check be skipped for test purposes } @@ -108,8 +107,7 @@ func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exe Timeout: cmp.Or(config.Timeout, ReindexerTimeoutDefault), }).mustValidate(), - batchSize: riversharedmaintenance.BatchSizeDefault, - exec: exec, + exec: exec, }) } diff --git a/internal/circuitbreaker/circuit_breaker.go b/rivershared/circuitbreaker/circuit_breaker.go similarity index 100% rename from internal/circuitbreaker/circuit_breaker.go rename to rivershared/circuitbreaker/circuit_breaker.go diff --git a/internal/circuitbreaker/circuit_breaker_test.go b/rivershared/circuitbreaker/circuit_breaker_test.go similarity index 100% rename from internal/circuitbreaker/circuit_breaker_test.go rename to rivershared/circuitbreaker/circuit_breaker_test.go diff --git a/rivershared/riversharedmaintenance/river_shared_maintenance.go b/rivershared/riversharedmaintenance/river_shared_maintenance.go index b87299b5..39a48347 100644 --- a/rivershared/riversharedmaintenance/river_shared_maintenance.go +++ b/rivershared/riversharedmaintenance/river_shared_maintenance.go @@ -1,10 +1,12 @@ package riversharedmaintenance import ( + "cmp" "context" "time" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/circuitbreaker" "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/serviceutil" ) @@ -15,17 +17,6 @@ const ( 
BatchBackoffMax = 1 * time.Second BatchBackoffMin = 50 * time.Millisecond - // Bulk maintenance tasks like job removal operate in batches so that even - // in the event of an enormous backlog of work to do, transactions stay - // relatively short and aren't at risk of cancellation. This number is the - // batch size, or the number of rows that are handled at a time. - // - // The specific value is somewhat arbitrary as large enough to make good - // progress, but not so large as to make the operation overstay its welcome. - // For now it's not configurable because we can likely pick a number that's - // suitable for almost everyone. - BatchSizeDefault = 1_000 - LogPrefixRanSuccessfully = ": Ran successfully" LogPrefixRunLoopStarted = ": Run loop started" LogPrefixRunLoopStopped = ": Run loop stopped" @@ -41,6 +32,79 @@ const ( JobCleanerTimeoutDefault = 30 * time.Second ) +const ( + // BatchSizeDefault is the default batch size of most maintenance services. + // + // Bulk maintenance tasks like job removal operate in batches so that even + // in the event of an enormous backlog of work to do, transactions stay + // relatively short and aren't at risk of cancellation. This number is the + // batch size, or the number of rows that are handled at a time. + // + // The specific value is somewhat arbitrary as large enough to make good + // progress, but not so large as to make the operation overstay its welcome. + // For now it's not configurable because we can likely pick a number that's + // suitable for almost everyone. + // + // In case database degradation is detected, most maintenance services will + // back off to use the smaller batch size BatchSizeReduced. + BatchSizeDefault = 10_000 + + // BatchSizeReduced is the reduced batch size of most maintenance services. 
+ //
+ // Services start out with a batch size of BatchSizeDefault, but as they
+ // detect a degraded database may switch to BatchSizeReduced instead so that
+ // they're trying to do less work per operation.
+ BatchSizeReduced = 1_000
+)
+
+// BatchSizes contains batch size information for maintenance services. It's
+// meant to be embedded on each service's configuration struct so as to provide a
+// common way of organizing and initializing batch sizes for improved
+// succinctness.
+type BatchSizes struct {
+ // Default is the standard batch size for the service, i.e. the maximum
+ // number of rows operated on in a single batch under normal conditions.
+ Default int
+
+ // Reduced is a considerably smaller batch size that the service
+ // uses after encountering 3 consecutive timeouts. The idea behind
+ // this is that if it appears the database is degraded, then we start doing
+ // less work in the hope that it can better succeed.
+ Reduced int
+}
+
+// MustValidate validates the struct and panics in case a value is invalid.
+func (b BatchSizes) MustValidate() BatchSizes {
+ if b.Default <= 0 {
+ panic("BatchSizes.Default must be above zero")
+ }
+ if b.Reduced <= 0 {
+ panic("BatchSizes.Reduced must be above zero")
+ }
+ return b
+}
+
+// WithDefaults returns the struct with any configuration overrides that were
+// already set, but sets defaults for any that were zero values.
+func (b BatchSizes) WithDefaults() BatchSizes {
+ return BatchSizes{
+ Default: cmp.Or(b.Default, BatchSizeDefault),
+ Reduced: cmp.Or(b.Reduced, BatchSizeReduced),
+ }
+}
+
+// ReducedBatchSizeBreaker returns a reduced batch circuit breaker suitable for
+// use in most maintenance services. After being tripped three consecutive times
+// inside a ten-minute window it switches to a reduced batch size. A success at
+// any point between failures will reset it, but after the circuit breaker has
+// tripped, it stays tripped for the lifetime of the program. 
+func ReducedBatchSizeBreaker(batchSizes BatchSizes) *circuitbreaker.CircuitBreaker { + return circuitbreaker.NewCircuitBreaker(&circuitbreaker.CircuitBreakerOptions{ + Limit: 3, + Window: 10 * time.Minute, + }) +} + // QueueMaintainerServiceBase is a struct that should be embedded on all queue // maintainer services. Its main use is to provide a StaggerStart function that // should be called on service start to avoid thundering herd problems.