Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The River CLI will now respect the standard set of `PG*` environment variables like `PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER`, `PGPASSWORD`, and `PGSSLMODE` to configure a target database when the `--database-url` parameter is omitted. [PR #702](https://github.com/riverqueue/river/pull/702).

### Changed

- Sleep durations are now logged as Go-like duration strings (e.g. "10s") in either text or JSON instead of duration strings in text and nanoseconds in JSON. [PR #699](https://github.com/riverqueue/river/pull/699).
Expand Down
58 changes: 38 additions & 20 deletions cmd/river/rivercli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/riverqueue/river/cmd/river/riverbench"
"github.com/riverqueue/river/rivermigrate"
"github.com/riverqueue/river/rivershared/util/ptrutil"
)

const (
Expand Down Expand Up @@ -77,7 +78,7 @@ type RunCommandBundle struct {
}

// RunCommand bootstraps and runs a River CLI subcommand.
func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle, command Command[TOpts], opts TOpts) {
func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle, command Command[TOpts], opts TOpts) error {
procureAndRun := func() (bool, error) {
if err := opts.Validate(); err != nil {
return false, err
Expand All @@ -89,16 +90,33 @@ func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle
Out: bundle.OutStd,
}

var databaseURL *string

switch {
// If database URL is still nil after Validate check, then assume this
// command doesn't take one.
case bundle.DatabaseURL == nil:
commandBase.GetBenchmarker = func() BenchmarkerInterface { panic("databaseURL was not set") }
commandBase.GetMigrator = func(config *rivermigrate.Config) (MigratorInterface, error) { panic("databaseURL was not set") }

case strings.HasPrefix(*bundle.DatabaseURL, uriScheme) ||
strings.HasPrefix(*bundle.DatabaseURL, uriSchemeAlias):
dbPool, err := openPgxV5DBPool(ctx, *bundle.DatabaseURL)
case pgEnvConfigured():
databaseURL = ptrutil.Ptr("")

case bundle.DatabaseURL != nil:
if !strings.HasPrefix(*bundle.DatabaseURL, uriScheme) &&
!strings.HasPrefix(*bundle.DatabaseURL, uriSchemeAlias) {
return false, fmt.Errorf(
"unsupported database URL (`%s`); try one with a `%s` or `%s` scheme/prefix",
*bundle.DatabaseURL,
uriSchemeAlias,
uriScheme,
)
}

databaseURL = bundle.DatabaseURL
}

if databaseURL == nil {
commandBase.GetBenchmarker = func() BenchmarkerInterface { panic("neither PG* env nor databaseURL was not set") }
commandBase.GetMigrator = func(config *rivermigrate.Config) (MigratorInterface, error) {
panic("neither PG* env nor databaseURL was not set")
}
} else {
dbPool, err := openPgxV5DBPool(ctx, *databaseURL)
if err != nil {
return false, err
}
Expand All @@ -108,14 +126,6 @@ func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle

commandBase.GetBenchmarker = func() BenchmarkerInterface { return riverbench.NewBenchmarker(driver, commandBase.Logger) }
commandBase.GetMigrator = func(config *rivermigrate.Config) (MigratorInterface, error) { return rivermigrate.New(driver, config) }

default:
return false, fmt.Errorf(
"unsupported database URL (`%s`); try one with a `%s` or `%s` scheme/prefix",
*bundle.DatabaseURL,
uriSchemeAlias,
uriScheme,
)
}

command.SetCommandBase(commandBase)
Expand All @@ -125,11 +135,12 @@ func RunCommand[TOpts CommandOpts](ctx context.Context, bundle *RunCommandBundle

ok, err := procureAndRun()
if err != nil {
fmt.Fprintf(os.Stderr, "failed: %s\n", err)
return err
}
if err != nil || !ok {
if !ok {
os.Exit(1)
}
return nil
}

func openPgxV5DBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
Expand Down Expand Up @@ -164,3 +175,10 @@ func openPgxV5DBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, er

return dbPool, nil
}

// Determines if there's a minimum number of `PG*` env vars configured to
// consider that configurable path viable. A `--database-url` parameter will
// take precedence.
func pgEnvConfigured() bool {
return os.Getenv("PGDATABASE") != ""
}
65 changes: 31 additions & 34 deletions cmd/river/rivercli/river_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,20 @@ func (c *CLI) BaseCommandSet() *cobra.Command {
Short: "Provides command line facilities for the River job queue",
Long: strings.TrimSpace(`
Provides command line facilities for the River job queue.

Commands that need database access will take a --database-url argument, but can
also accept Postgres configuration through its standard set of environment
variables like PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD, and PGSSLMODE,
with a minimum of PGDATABASE required to use this strategy. --database-url will
take precedence of PG* vars if it's been specified.
`),
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
if rootOpts.Version {
RunCommand(ctx, makeCommandBundle(nil), &version{}, &versionOpts{Name: c.name})
} else {
_ = cmd.Usage()
return RunCommand(ctx, makeCommandBundle(nil), &version{}, &versionOpts{Name: c.name})
}

_ = cmd.Usage()
return nil
},
}
rootCmd.SetOut(c.out)
Expand All @@ -119,17 +126,8 @@ Provides command line facilities for the River job queue.
rootCmd.Flags().BoolVar(&rootOpts.Version, "version", false, "print version information")
}

mustMarkFlagRequired := func(cmd *cobra.Command, name string) {
// We just panic here because this will never happen outside of an error
// in development.
if err := cmd.MarkFlagRequired(name); err != nil {
panic(err)
}
}

addDatabaseURLFlag := func(cmd *cobra.Command, databaseURL *string) {
cmd.Flags().StringVar(databaseURL, "database-url", "", "URL of the database (should look like `postgres://...`")
mustMarkFlagRequired(cmd, "database-url")
}
addLineFlag := func(cmd *cobra.Command, line *string) {
cmd.Flags().StringVar(line, "line", "", "migration line to operate on (default: main)")
Expand All @@ -155,8 +153,8 @@ before starting the client, and works until all jobs are finished.
The database in --database-url will have its jobs table truncated, so make sure
to use a development database only.
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &bench{}, &opts)
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &bench{}, &opts)
},
}
addDatabaseURLFlag(cmd, &opts.DatabaseURL)
Expand Down Expand Up @@ -194,8 +192,8 @@ SQL being run can be output using --show-sql, and executing real database
operations can be prevented with --dry-run. Combine --show-sql and --dry-run to
dump prospective migrations that would be applied to stdout.
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateDown{}, &opts)
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateDown{}, &opts)
},
}
addMigrateFlags(cmd, &opts)
Expand Down Expand Up @@ -232,8 +230,8 @@ framework, which aren't necessary if using an external framework:
river migrate-get --all --exclude-version 1 --up > river_all.up.sql
river migrate-get --all --exclude-version 1 --down > river_all.down.sql
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(nil), &migrateGet{}, &opts)
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(nil), &migrateGet{}, &opts)
},
}
cmd.Flags().BoolVar(&opts.All, "all", false, "print all migrations; down migrations are printed in descending order")
Expand All @@ -259,8 +257,8 @@ framework, which aren't necessary if using an external framework:
Long: strings.TrimSpace(`
TODO
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateList{}, &opts)
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateList{}, &opts)
},
}
addDatabaseURLFlag(cmd, &opts.DatabaseURL)
Expand All @@ -285,8 +283,8 @@ SQL being run can be output using --show-sql, and executing real database
operations can be prevented with --dry-run. Combine --show-sql and --dry-run to
dump prospective migrations that would be applied to stdout.
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateUp{}, &opts)
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &migrateUp{}, &opts)
},
}
addMigrateFlags(cmd, &opts)
Expand All @@ -307,12 +305,11 @@ are outstanding migrations that still need to be run.
Can be paired with river migrate-up --dry-run --show-sql to dump information on
migrations that need to be run, but without running them.
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &validate{}, &opts)
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL), &validate{}, &opts)
},
}
addDatabaseURLFlag(cmd, &opts.DatabaseURL)
mustMarkFlagRequired(cmd, "database-url")
cmd.Flags().StringVar(&opts.Line, "line", "", "migration line to operate on (default: main)")
rootCmd.AddCommand(cmd)
}
Expand All @@ -325,8 +322,8 @@ migrations that need to be run, but without running them.
Long: strings.TrimSpace(`
Print River and Go version information.
`),
Run: func(cmd *cobra.Command, args []string) {
RunCommand(ctx, makeCommandBundle(nil), &version{}, &versionOpts{Name: c.name})
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(nil), &version{}, &versionOpts{Name: c.name})
},
}
rootCmd.AddCommand(cmd)
Expand All @@ -347,8 +344,8 @@ type benchOpts struct {
}

func (o *benchOpts) Validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
if o.DatabaseURL == "" && !pgEnvConfigured() {
return errors.New("either PG* env vars or --database-url must be set")
}

return nil
Expand All @@ -375,8 +372,8 @@ type migrateOpts struct {
}

func (o *migrateOpts) Validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
if o.DatabaseURL == "" && !pgEnvConfigured() {
return errors.New("either PG* env vars or --database-url must be set")
}

return nil
Expand Down Expand Up @@ -604,8 +601,8 @@ type validateOpts struct {
}

func (o *validateOpts) Validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
if o.DatabaseURL == "" && !pgEnvConfigured() {
return errors.New("either PG* env vars or --database-url must be set")
}

return nil
Expand Down
71 changes: 68 additions & 3 deletions cmd/river/rivercli/river_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package rivercli

import (
"bytes"
"cmp"
"context"
"fmt"
"net/url"
"os"
"runtime/debug"
"strings"
"testing"
Expand Down Expand Up @@ -113,16 +116,25 @@ func TestBaseCommandSetIntegration(t *testing.T) {
cmd, _ := setup(t)

cmd.SetArgs([]string{"--debug", "--verbose"})
require.EqualError(t, cmd.Execute(), `if any flags in the group [debug verbose] are set none of the others can be; [debug verbose] were all set`)
require.EqualError(t, cmd.Execute(), "if any flags in the group [debug verbose] are set none of the others can be; [debug verbose] were all set")
})

t.Run("MigrateDownMissingDatabaseURL", func(t *testing.T) {
t.Run("DatabaseURLWithInvalidPrefix", func(t *testing.T) {
t.Parallel()

cmd, _ := setup(t)

cmd.SetArgs([]string{"migrate-down", "--database-url", "post://"})
require.EqualError(t, cmd.Execute(), "unsupported database URL (`post://`); try one with a `postgres://` or `postgresql://` scheme/prefix")
})

t.Run("MissingDatabaseURLAndPGEnv", func(t *testing.T) {
t.Parallel()

cmd, _ := setup(t)

cmd.SetArgs([]string{"migrate-down"})
require.EqualError(t, cmd.Execute(), `required flag(s) "database-url" not set`)
require.EqualError(t, cmd.Execute(), "either PG* env vars or --database-url must be set")
})

t.Run("VersionFlag", func(t *testing.T) {
Expand Down Expand Up @@ -158,6 +170,59 @@ Built with %s
})
}

// Same as the above, but non-parallel so tests can use `t.Setenv`. Separated
// out into its own test block so that we don't have to mark the entire block
// above as non-parallel because a few tests can't be made parallel.
func TestBaseCommandSetNonParallel(t *testing.T) {
type testBundle struct {
out *bytes.Buffer
}

setup := func(t *testing.T) (*cobra.Command, *testBundle) {
t.Helper()

cli := NewCLI(&Config{
DriverProcurer: &TestDriverProcurer{},
Name: "River",
})

var out bytes.Buffer
cli.SetOut(&out)

return cli.BaseCommandSet(), &testBundle{
out: &out,
}
}

t.Run("PGEnvWithoutDatabaseURL", func(t *testing.T) {
cmd, _ := setup(t)

// Deconstruct a database URL into its PG* components. This path is the
// one that gets taken in CI, but could work locally as well.
if databaseURL := os.Getenv("TEST_DATABASE_URL"); databaseURL != "" {
parsedURL, err := url.Parse(databaseURL)
require.NoError(t, err)

t.Setenv("PGDATABASE", parsedURL.Path[1:])
t.Setenv("PGHOST", parsedURL.Hostname())
pass, _ := parsedURL.User.Password()
t.Setenv("PGPASSWORD", pass)
t.Setenv("PGPORT", cmp.Or(parsedURL.Port(), "5432"))
t.Setenv("PGSSLMODE", parsedURL.Query().Get("sslmode"))
t.Setenv("PGUSER", parsedURL.User.Username())
} else {
// With no `TEST_DATABASE_URL` available, try a simpler alternative
// configuration. Requires a database on localhost that doesn't
// require authentication, which should exist from testdbman.
t.Setenv("PGDATABASE", "river_test")
t.Setenv("PGHOST", "localhost")
}

cmd.SetArgs([]string{"validate"})
require.NoError(t, cmd.Execute())
})
}

func TestMigrateList(t *testing.T) {
t.Parallel()

Expand Down
Loading