Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Preliminary River driver for SQLite (`riverdriver/riversqlite`). This driver seems to produce good results as judged by the test suite, but so far has minimal real world vetting. Try it and let us know how it works out. [PR #870](https://github.com/riverqueue/river/pull/870).
- CLI `river migrate-get` now takes a `--schema` option to inject a custom schema into dumped migrations and schema comments are hidden if `--schema` option isn't provided. [PR #903](https://github.com/riverqueue/river/pull/903).

### Fixed

- Resuming an already unpaused queue is now fully an no-op, and won't touch the row's `updated_at` like it (unintentionally) did before. [PR #870](https://github.com/riverqueue/river/pull/870).
- Suppress an error log line from the producer that may occur on normal shutdown when operating in poll-only mode. [PR #896](https://github.com/riverqueue/river/pull/896).
- Added missing help documentation for CLI command `river migrate-list`. [PR #903](https://github.com/riverqueue/river/pull/903).

## [0.22.0] - 2025-05-10

Expand Down
39 changes: 29 additions & 10 deletions cmd/river/rivercli/river_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/spf13/cobra"

"github.com/riverqueue/river/cmd/river/riverbench"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivermigrate"
"github.com/riverqueue/river/rivershared/sqlctemplate"
)

type Config struct {
Expand Down Expand Up @@ -231,6 +233,7 @@ framework, which aren't necessary if using an external framework:
cmd.Flags().BoolVar(&opts.Down, "down", false, "print down migration")
cmd.Flags().IntSliceVar(&opts.ExcludeVersion, "exclude-version", nil, "exclude version(s), usually version 1, containing River's migration tables")
addLineFlag(cmd, &opts.Line)
addSchemaFlag(cmd, &opts.Schema)
cmd.Flags().BoolVar(&opts.Up, "up", false, "print up migration")
cmd.Flags().IntSliceVar(&opts.Version, "version", nil, "version(s) to print (can be multiple versions)")
cmd.MarkFlagsMutuallyExclusive("all", "version")
Expand All @@ -248,7 +251,8 @@ framework, which aren't necessary if using an external framework:
Use: "migrate-list",
Short: "List River schema migrations",
Long: strings.TrimSpace(`
TODO
List available migrations for the given line, showing the currently applied
migration as determined by the database in --database-url.
`),
RunE: func(cmd *cobra.Command, args []string) error {
return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, opts.Schema), &migrateList{}, &opts)
Expand Down Expand Up @@ -484,6 +488,7 @@ type migrateGetOpts struct {
Down bool
ExcludeVersion []int
Line string
Schema string
Up bool
Version []int
}
Expand All @@ -494,13 +499,7 @@ type migrateGet struct {
CommandBase
}

func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error) {
// We'll need to have a way of using an alternate driver if support for
// other databases is added in the future. Unlike other migrate commands,
// this one doesn't take a `--database-url`, so we'd need a way of
// detecting the database type.
//
// TODO
func (c *migrateGet) Run(ctx context.Context, opts *migrateGetOpts) (bool, error) {
migrator, err := c.DriverProcurer.GetMigrator(&rivermigrate.Config{Line: opts.Line, Logger: c.Logger, Schema: ""})
if err != nil {
return false, err
Expand All @@ -523,7 +522,16 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error)
}
}

var printedOne bool
var (
line = cmp.Or(opts.Line, riverdriver.MigrationLineMain)
printedOne bool
replacer sqlctemplate.Replacer
schema string
)

if opts.Schema != "" {
schema = opts.Schema + "."
}

for _, migration := range migrations {
if slices.Contains(opts.ExcludeVersion, migration.Version) {
Expand All @@ -548,8 +556,19 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error)
sql = migration.SQLUp
}

if strings.Contains(sql, "/* TEMPLATE: schema */") {
ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{
"schema": {Stable: true, Value: schema},
}, nil)

sql, _, err = replacer.RunSafely(ctx, "$", sql, nil)
if err != nil {
return false, err
}
}

printedOne = true
fmt.Fprintf(c.Out, "%s\n", migrationComment(opts.Line, migration.Version, direction))
fmt.Fprintf(c.Out, "%s\n", migrationComment(line, migration.Version, direction))
fmt.Fprintf(c.Out, "%s\n", strings.TrimSpace(sql))
}

Expand Down
115 changes: 109 additions & 6 deletions cmd/river/rivercli/river_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,33 +79,33 @@ func (m *MigratorStub) ExistingVersions(ctx context.Context) ([]rivermigrate.Mig
}

func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) {
if m.allVersionsStub == nil {
if m.getVersionStub == nil {
panic("GetVersion is not stubbed")
}

return m.getVersionStub(version)
}

func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) {
if m.allVersionsStub == nil {
if m.migrateStub == nil {
panic("Migrate is not stubbed")
}

return m.migrateStub(ctx, direction, opts)
}

func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) {
if m.allVersionsStub == nil {
if m.validateStub == nil {
panic("Validate is not stubbed")
}

return m.validateStub(ctx)
}

var (
testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 1} //nolint:gochecknoglobals
testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 2} //nolint:gochecknoglobals
testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 3} //nolint:gochecknoglobals
testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 'down 1' FROM /* TEMPLATE: schema */river_table", SQLUp: "SELECT 'up 1' FROM /* TEMPLATE: schema */river_table", Version: 1} //nolint:gochecknoglobals
testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 'down 2' FROM /* TEMPLATE: schema */river_table", SQLUp: "SELECT 'up 2' FROM /* TEMPLATE: schema */river_table", Version: 2} //nolint:gochecknoglobals
testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 'down 3' FROM /* TEMPLATE: schema */river_table", SQLUp: "SELECT 'up 3' FROM /* TEMPLATE: schema */river_table", Version: 3} //nolint:gochecknoglobals

testMigrationAll = []rivermigrate.Migration{testMigration01, testMigration02, testMigration03} //nolint:gochecknoglobals
)
Expand Down Expand Up @@ -259,6 +259,109 @@ func TestBaseCommandSetNonParallel(t *testing.T) {
})
}

func TestMigrateGet(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
migratorStub *MigratorStub
out *bytes.Buffer
}

setup := func(t *testing.T) (*migrateGet, *testBundle) {
t.Helper()

cmd, out := withCommandBase(t, &migrateGet{})

migratorStub := &MigratorStub{}
migratorStub.allVersionsStub = func() []rivermigrate.Migration { return testMigrationAll }
migratorStub.getVersionStub = func(version int) (rivermigrate.Migration, error) {
switch version {
case 1:
return testMigration01, nil
case 2:
return testMigration02, nil
case 3:
return testMigration03, nil
}
return rivermigrate.Migration{}, fmt.Errorf("unknown version: %d", version)
}
migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil }

cmd.GetCommandBase().DriverProcurer = &DriverProcurerStub{
getMigratorStub: func(config *rivermigrate.Config) (MigratorInterface, error) { return migratorStub, nil },
}

return cmd, &testBundle{
out: out,
migratorStub: migratorStub,
}
}

t.Run("DownMigration", func(t *testing.T) {
t.Parallel()

cmd, bundle := setup(t)

_, err := runCommand(ctx, t, cmd, &migrateGetOpts{Down: true, Version: []int{1}})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
-- River main migration 001 [down]
SELECT 'down 1' FROM river_table
`), strings.TrimSpace(bundle.out.String()))
})

t.Run("UpMigration", func(t *testing.T) {
t.Parallel()

cmd, bundle := setup(t)

_, err := runCommand(ctx, t, cmd, &migrateGetOpts{Up: true, Version: []int{1}})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
-- River main migration 001 [up]
SELECT 'up 1' FROM river_table
`), strings.TrimSpace(bundle.out.String()))
})

t.Run("MultipleMigrations", func(t *testing.T) {
t.Parallel()

cmd, bundle := setup(t)

_, err := runCommand(ctx, t, cmd, &migrateGetOpts{Up: true, Version: []int{1, 2, 3}})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
-- River main migration 001 [up]
SELECT 'up 1' FROM river_table

-- River main migration 002 [up]
SELECT 'up 2' FROM river_table

-- River main migration 003 [up]
SELECT 'up 3' FROM river_table
`), strings.TrimSpace(bundle.out.String()))
})

t.Run("WithSchema", func(t *testing.T) {
t.Parallel()

cmd, bundle := setup(t)

_, err := runCommand(ctx, t, cmd, &migrateGetOpts{Schema: "custom_schema", Up: true, Version: []int{1}})
require.NoError(t, err)

require.Equal(t, strings.TrimSpace(`
-- River main migration 001 [up]
SELECT 'up 1' FROM custom_schema.river_table
`), strings.TrimSpace(bundle.out.String()))
})
}

func TestMigrateList(t *testing.T) {
t.Parallel()

Expand Down
1 change: 0 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,6 @@ func (p *producer) pollForSettingChanges(ctx context.Context, wg *sync.WaitGroup
// Don't log if this is part of a standard shutdown.
if !errors.Is(context.Cause(ctx), startstop.ErrStop) {
p.Logger.ErrorContext(ctx, p.Name+": Error fetching queue settings", slog.String("err", err.Error()))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what happened here. My linter is suddenly picking these up, but it seems like it should've picked them up before. I left the changes in though because they look right and I'm sure the linter will want to put them in another PR if not this one.

}
continue
}
Expand Down
3 changes: 2 additions & 1 deletion rivershared/uniquestates/unique_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package uniquestates
import (
"testing"

"github.com/riverqueue/river/rivertype"
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/rivertype"
)

func TestUniqueStatesToBitmask(t *testing.T) {
Expand Down
Loading