diff --git a/doc.go b/doc.go index dfcf6bc1..bc0a1dc9 100644 --- a/doc.go +++ b/doc.go @@ -1,169 +1,154 @@ /* -Package river is a robust high-performance job processing system for Go. +Package river is a robust high-performance job processing system for Go and +Postgres. -Because it is built using Postgres, River enables you to use the same database -for both your application data and your job queue. This simplifies operations, -but perhaps more importantly it makes it possible to enqueue jobs -transactionally with other database changes. This avoids a whole class of -distributed systems issues like jobs that execute before the database -transaction that enqueued them has even committed, or jobs that attempt to -utilize database changes which were rolled back. It also makes it possible for -your job to make database changes atomically with the job being marked as -complete. +See [homepage], [docs], and [godoc]. -# Job args +Being built for Postgres, River encourages the use of the same database for +application data and job queue. By enqueueing jobs transactionally along with +other database changes, whole classes of distributed systems problems are +avoided. Jobs are guaranteed to be enqueued if their transaction commits, are +removed if their transaction rolls back, and aren't visible for work _until_ +commit. See [transactional enqueueing] for more background on this philosophy. -Jobs need to be able to serialize their state to JSON so that they can round -tripped from the database and back. Each job has an args struct with JSON tags -on its properties to allow for this: +# Job args and workers + +Jobs are defined in struct pairs, with an implementation of [`JobArgs`] and one +of [`Worker`]. + +Job args contain `json` annotations and define how jobs are serialized to and +from the database, along with a "kind", a stable string that uniquely identifies +the job. - // SortArgs are arguments for SortWorker. type SortArgs struct { // Strings is a slice of strings to sort. Strings []string `json:"strings"` } - func (SortArgs) Kind() string { return "sort_job" } - -Args are created to enqueue a new job and are what a worker receives to work -one. Each one implements [JobArgs].Kind, which returns a unique string that's -used to recognize the job as it round trips from the database. - -# Job workers + func (SortArgs) Kind() string { return "sort" } -Each job kind also has a corresponding worker struct where its core work -function is defined: +Workers expose a `Work` function that dictates how jobs run. - // SortWorker is a job worker for sorting strings. type SortWorker struct { - river.WorkerDefaults[SortArgs] + // An embedded WorkerDefaults sets up default methods to fulfill the rest of + // the Worker interface: + river.WorkerDefaults[SortArgs] } func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { - sort.Strings(job.Args.Strings) - fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) - return nil + sort.Strings(job.Args.Strings) + fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) + return nil } -A few details to notice: - - - Although not strictly necessary, workers embed [WorkerDefaults] with a - reference to their args type. This allows them to inherit defaults for the - [Worker] interface, and helps futureproof in case its ever expanded. - - - Each worker implements [Worker].Work, which is where the async heavy-lifting - for a background job is done. Work implementations receive a generic like - river.Job[SortArgs] for easy access to job arguments. - # Registering workers -As a program is initially starting up, worker structs are registered so that -River can know how to work them: +Jobs are uniquely identified by their "kind" string. Workers are registered on +start up so that River knows how to assign jobs to workers: workers := river.NewWorkers() + // AddWorker panics if the worker is already registered or invalid: river.AddWorker(workers, &SortWorker{}) -# River client +# Starting a client -The main River client takes a [pgx] connection pool wrapped with River's Pgx v5 -driver using [riverpgxv5.New] and a set of registered workers (see above). Each -queue can receive configuration like the maximum number of goroutines that'll be -used to work it: +A River [`Client`] provides an interface for job insertion and manages job +processing and [maintenance services]. A client's created with a database pool, +[driver], and config struct containing a `Workers` bundle and other settings. +Here's a client `Client` working one queue (`"default"`) with up to 100 worker +goroutines at a time: - dbConfig, err := pgxpool.ParseConfig("postgres://localhost/river") - if err != nil { - return err - } + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.DefaultQueue: {MaxWorkers: 100}, + }, + Workers: workers, + }) - dbPool, err := pgxpool.NewWithConfig(ctx, dbConfig) if err != nil { - return err + panic(err) } - defer dbPool.Close() - - riverClient, err := river.NewClient(&river.Config{ - Driver: riverpgxv5.New(dbPool), - Queues: map[string]river.QueueConfig{ - river.DefaultQueue: {MaxWorkers: 100}, - }, - Workers: workers, - }) + // Run the client inline. All executed jobs will inherit from ctx: if err := riverClient.Start(ctx); err != nil { - ... + panic(err) } - ... +## Stopping - // Before program exit, try to shut down cleanly. - if err := riverClient.Shutdown(ctx); err != nil { - return err - } - -For programs that'll be inserting jobs only, the Queues and Workers -configuration keys can be omitted for brevity: - - riverClient, err := river.NewClient(&river.Config{ - DBPool: dbPool, - }) +The client should also be stopped on program shutdown: -However, if Workers is specified, the client can validate that an inserted job -has a worker that's registered with the workers bundle, so it's recommended that -Workers is configured anyway if your project is set up to easily allow it. + // Stop fetching new work and wait for active jobs to finish. + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } -See [Config] for details on all configuration options. +There are some complexities around ensuring clients stop cleanly, but also in a +timely manner. See [graceful shutdown] for more details on River's stop modes. # Inserting jobs -Insert jobs by opening a transaction and calling [Client.InsertTx] with a job -args instance (a non-transactional [Client.Insert] is also available) and the -transaction wrapped with [riverpgxv5Tx]: - - tx, err := dbPool.Begin(ctx) - if err != nil { - return err - } - defer tx.Rollback(ctx) +[`Client.InsertTx`] is used in conjunction with an instance of job args to +insert a job to work on a transaction: _, err = riverClient.InsertTx(ctx, tx, SortArgs{ - Strings: []string{ - "whale", "tiger", "bear", - }, + Strings: []string{ + "whale", "tiger", "bear", + }, }, nil) + if err != nil { - return err + panic(err) } - if err := tx.Commit(ctx); err != nil { - return err - } +See the [`InsertAndWork` example] for complete code. + +# Other features -Due to rules around transaction visibility, inserted jobs aren't visible to -workers until the transaction that inserted them is committed. This prevents -a whole host of problems like workers trying to work a job before its viable to -do so because not all its requisite data has been persisted yet. + - [Batch job insertion] for efficiently inserting many jobs at once using + Postgres `COPY FROM`. -See the InsertAndWork example for all this code in one place. + - [Cancelling jobs] from inside a work function. -# Other features + - [Error and panic handling]. - - Periodic jobs that run on a predefined interval. See the PeriodicJob example - below. + - [Periodic and cron jobs]. -# Verifying inserted jobs + - [Snoozing jobs] from inside a work function. -See the rivertest package for test helpers that can be used to easily verified -inserted jobs in a test suite. For example: + - [Subscriptions] to queue activity and statistics, providing easy hooks for + telemetry like logging and metrics. - job := rivertest.RequireInserted(ctx, t, dbPool, &RequiredArgs{}, nil) - fmt.Printf("Test passed with message: %s\n", job.Args.Message) + - [Transactional job completion] to guarantee job completion commits with + other changes in a transaction. -[pgx]: https://github.com/jackc/pgx -*/ -package river + - [Unique jobs] by args, period, queue, and state. -import "github.com/riverqueue/river/riverdriver/riverpgxv5" + - [Work functions] for simplified worker implementation. -// This is really dumb, but the package must be imported to make it linkable the -// Godoc above, so this is a trivial import to make sure it is. -var _ = riverpgxv5.New(nil) +See also [developing River]. + +[`Client`]: https://pkg.go.dev/github.com/riverqueue/river#Client +[`Client.InsertTx`]: https://pkg.go.dev/github.com/riverqueue/river#Client.InsertTx +[`InsertAndWork` example]: https://pkg.go.dev/github.com/riverqueue/river#example-package-CustomInsertOpts +[`JobArgs`]: https://pkg.go.dev/github.com/riverqueue/river#JobArgs +[`Worker`]: https://pkg.go.dev/github.com/riverqueue/river#Worker +[Batch job insertion]: https://riverqueue.com/docs/batch-job-insertion +[Cancelling jobs]: https://riverqueue.com/docs/cancelling-jobs +[Error and panic handling]: https://riverqueue.com/docs/error-handling +[Periodic and cron jobs]: https://riverqueue.com/docs/periodic-jobs +[Snoozing jobs]: https://riverqueue.com/docs/snoozing-jobs +[Subscriptions]: https://riverqueue.com/docs/subscriptions +[Transactional job completion]: https://riverqueue.com/docs/transactional-job-completion +[Unique jobs]: https://riverqueue.com/docs/unique-jobs +[Work functions]: https://riverqueue.com/docs/work-functions +[docs]: https://riverqueue.com/docs +[developing River]: https://github.com/riverqueue/river/blob/master/docs/development.md +[driver]: https://riverqueue.com/docs/database-drivers +[godoc]: https://pkg.go.dev/github.com/riverqueue/river +[graceful shutdown]: https://riverqueue.com/docs/graceful-shutdown +[homepage]: https://riverqueue.com +[maintenance services]: https://riverqueue.com/docs/maintenance-services +[transactional enqueueing]: https://riverqueue.com/docs/transactional-enqueueing +*/ +package river diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 00000000..1bf788e5 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,164 @@ +# River [![Build Status](https://github.com/riverqueue/river/workflows/CI/badge.svg)](https://github.com/riverqueue/river/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/riverqueue/river.svg)](https://pkg.go.dev/github.com/riverqueue/river) + +River is a robust high-performance job processing system for Go and Postgres. + +See [homepage], [docs], and [godoc]. + +Being built for Postgres, River encourages the use of the same database for +application data and job queue. By enqueueing jobs transactionally along with +other database changes, whole classes of distributed systems problems are +avoided. Jobs are guaranteed to be enqueued if their transaction commits, are +removed if their transaction rolls back, and aren't visible for work _until_ +commit. See [transactional enqueueing] for more background on this philosophy. + +## Job args and workers + +Jobs are defined in struct pairs, with an implementation of [`JobArgs`] and one +of [`Worker`]. + +Job args contain `json` annotations and define how jobs are serialized to and +from the database, along with a "kind", a stable string that uniquely identifies +the job. + +```go +type SortArgs struct { + // Strings is a slice of strings to sort. + Strings []string `json:"strings"` +} + +func (SortArgs) Kind() string { return "sort" } +``` + +Workers expose a `Work` function that dictates how jobs run. + +```go +type SortWorker struct { + // An embedded WorkerDefaults sets up default methods to fulfill the rest of + // the Worker interface: + river.WorkerDefaults[SortArgs] +} + +func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { + sort.Strings(job.Args.Strings) + fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) + return nil +} +``` + +## Registering workers + +Jobs are uniquely identified by their "kind" string. Workers are registered on +start up so that River knows how to assign jobs to workers: + +```go +workers := river.NewWorkers() +// AddWorker panics if the worker is already registered or invalid: +river.AddWorker(workers, &SortWorker{}) +``` + +## Starting a client + +A River [`Client`] provides an interface for job insertion and manages job +processing and [maintenance services]. A client's created with a database pool, +[driver], and config struct containing a `Workers` bundle and other settings. +Here's a client `Client` working one queue (`"default"`) with up to 100 worker +goroutines at a time: + +```go +riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.DefaultQueue: {MaxWorkers: 100}, + }, + Workers: workers, +}) + +if err != nil { + panic(err) +} + +// Run the client inline. All executed jobs will inherit from ctx: +if err := riverClient.Start(ctx); err != nil { + panic(err) +} +``` + +### Stopping + +The client should also be stopped on program shutdown: + +```go +// Stop fetching new work and wait for active jobs to finish. +if err := riverClient.Stop(ctx); err != nil { + panic(err) +} +``` + +There are some complexities around ensuring clients stop cleanly, but also in a +timely manner. See [graceful shutdown] for more details on River's stop modes. + +## Inserting jobs + +[`Client.InsertTx`] is used in conjunction with an instance of job args to +insert a job to work on a transaction: + +```go +_, err = riverClient.InsertTx(ctx, tx, SortArgs{ + Strings: []string{ + "whale", "tiger", "bear", + }, +}, nil) + +if err != nil { + panic(err) +} +``` + +See the [`InsertAndWork` example] for complete code. + +## Other features + + - [Batch job insertion] for efficiently inserting many jobs at once using + Postgres `COPY FROM`. + + - [Cancelling jobs] from inside a work function. + + - [Error and panic handling]. + + - [Periodic and cron jobs]. + + - [Snoozing jobs] from inside a work function. + + - [Subscriptions] to queue activity and statistics, providing easy hooks for + telemetry like logging and metrics. + + - [Transactional job completion] to guarantee job completion commits with + other changes in a transaction. + + - [Unique jobs] by args, period, queue, and state. + + - [Work functions] for simplified worker implementation. + +See also [developing River]. + +[`Client`]: https://pkg.go.dev/github.com/riverqueue/river#Client +[`Client.InsertTx`]: https://pkg.go.dev/github.com/riverqueue/river#Client.InsertTx +[`InsertAndWork` example]: https://pkg.go.dev/github.com/riverqueue/river#example-package-CustomInsertOpts +[`JobArgs`]: https://pkg.go.dev/github.com/riverqueue/river#JobArgs +[`Worker`]: https://pkg.go.dev/github.com/riverqueue/river#Worker +[Batch job insertion]: https://riverqueue.com/docs/batch-job-insertion +[Cancelling jobs]: https://riverqueue.com/docs/cancelling-jobs +[Error and panic handling]: https://riverqueue.com/docs/error-handling +[Periodic and cron jobs]: https://riverqueue.com/docs/periodic-jobs +[Snoozing jobs]: https://riverqueue.com/docs/snoozing-jobs +[Subscriptions]: https://riverqueue.com/docs/subscriptions +[Transactional job completion]: https://riverqueue.com/docs/transactional-job-completion +[Unique jobs]: https://riverqueue.com/docs/unique-jobs +[Work functions]: https://riverqueue.com/docs/work-functions +[developing River]: https://github.com/riverqueue/river/blob/master/docs/development.md +[docs]: https://riverqueue.com/docs +[driver]: https://riverqueue.com/docs/database-drivers +[godoc]: https://pkg.go.dev/github.com/riverqueue/river +[graceful shutdown]: https://riverqueue.com/docs/graceful-shutdown +[homepage]: https://riverqueue.com +[maintenance services]: https://riverqueue.com/docs/maintenance-services +[transactional enqueueing]: https://riverqueue.com/docs/transactional-enqueueing diff --git a/README.md b/docs/development.md similarity index 94% rename from README.md rename to docs/development.md index 380d46a8..7882e1df 100644 --- a/README.md +++ b/docs/development.md @@ -12,7 +12,7 @@ Raise test databases: Run tests: - go test ./... + go test ./... -p 1 ### Run lint