diff --git a/CHANGELOG.md b/CHANGELOG.md index 0403ec54..31c87ea4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,11 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Preliminary River driver for SQLite (`riverdriver/riversqlite`). This driver seems to produce good results as judged by the test suite, but so far has minimal real world vetting. Try it and let us know how it works out. [PR #870](https://github.com/riverqueue/river/pull/870). +- CLI `river migrate-get` now takes a `--schema` option to inject a custom schema into dumped migrations and schema comments are hidden if `--schema` option isn't provided. [PR #903](https://github.com/riverqueue/river/pull/903). ### Fixed - Resuming an already unpaused queue is now fully an no-op, and won't touch the row's `updated_at` like it (unintentionally) did before. [PR #870](https://github.com/riverqueue/river/pull/870). - Suppress an error log line from the producer that may occur on normal shutdown when operating in poll-only mode. [PR #896](https://github.com/riverqueue/river/pull/896). +- Added missing help documentation for CLI command `river migrate-list`. [PR #903](https://github.com/riverqueue/river/pull/903). ## [0.22.0] - 2025-05-10 diff --git a/cmd/river/rivercli/river_cli.go b/cmd/river/rivercli/river_cli.go index dc2f3144..68dd0177 100644 --- a/cmd/river/rivercli/river_cli.go +++ b/cmd/river/rivercli/river_cli.go @@ -23,7 +23,9 @@ import ( "github.com/spf13/cobra" "github.com/riverqueue/river/cmd/river/riverbench" + "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivermigrate" + "github.com/riverqueue/river/rivershared/sqlctemplate" ) type Config struct { @@ -231,6 +233,7 @@ framework, which aren't necessary if using an external framework: cmd.Flags().BoolVar(&opts.Down, "down", false, "print down migration") cmd.Flags().IntSliceVar(&opts.ExcludeVersion, "exclude-version", nil, "exclude version(s), usually version 1, containing River's migration tables") addLineFlag(cmd, &opts.Line) + addSchemaFlag(cmd, &opts.Schema) cmd.Flags().BoolVar(&opts.Up, "up", false, "print up migration") cmd.Flags().IntSliceVar(&opts.Version, "version", nil, "version(s) to print (can be multiple versions)") cmd.MarkFlagsMutuallyExclusive("all", "version") @@ -248,7 +251,8 @@ framework, which aren't necessary if using an external framework: Use: "migrate-list", Short: "List River schema migrations", Long: strings.TrimSpace(` -TODO +List available migrations for the given line, showing the currently applied +migration as determined by the database in --database-url. `), RunE: func(cmd *cobra.Command, args []string) error { return RunCommand(ctx, makeCommandBundle(&opts.DatabaseURL, opts.Schema), &migrateList{}, &opts) @@ -484,6 +488,7 @@ type migrateGetOpts struct { Down bool ExcludeVersion []int Line string + Schema string Up bool Version []int } @@ -494,13 +499,7 @@ type migrateGet struct { CommandBase } -func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error) { - // We'll need to have a way of using an alternate driver if support for - // other databases is added in the future. Unlike other migrate commands, - // this one doesn't take a `--database-url`, so we'd need a way of - // detecting the database type. - // - // TODO +func (c *migrateGet) Run(ctx context.Context, opts *migrateGetOpts) (bool, error) { migrator, err := c.DriverProcurer.GetMigrator(&rivermigrate.Config{Line: opts.Line, Logger: c.Logger, Schema: ""}) if err != nil { return false, err @@ -523,7 +522,16 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error) } } - var printedOne bool + var ( + line = cmp.Or(opts.Line, riverdriver.MigrationLineMain) + printedOne bool + replacer sqlctemplate.Replacer + schema string + ) + + if opts.Schema != "" { + schema = opts.Schema + "." + } for _, migration := range migrations { if slices.Contains(opts.ExcludeVersion, migration.Version) { @@ -548,8 +556,19 @@ func (c *migrateGet) Run(_ context.Context, opts *migrateGetOpts) (bool, error) sql = migration.SQLUp } + if strings.Contains(sql, "/* TEMPLATE: schema */") { + ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ + "schema": {Stable: true, Value: schema}, + }, nil) + + sql, _, err = replacer.RunSafely(ctx, "$", sql, nil) + if err != nil { + return false, err + } + } + printedOne = true - fmt.Fprintf(c.Out, "%s\n", migrationComment(opts.Line, migration.Version, direction)) + fmt.Fprintf(c.Out, "%s\n", migrationComment(line, migration.Version, direction)) fmt.Fprintf(c.Out, "%s\n", strings.TrimSpace(sql)) } diff --git a/cmd/river/rivercli/river_cli_test.go b/cmd/river/rivercli/river_cli_test.go index 33a53b13..03151cbe 100644 --- a/cmd/river/rivercli/river_cli_test.go +++ b/cmd/river/rivercli/river_cli_test.go @@ -79,7 +79,7 @@ func (m *MigratorStub) ExistingVersions(ctx context.Context) ([]rivermigrate.Mig } func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) { - if m.allVersionsStub == nil { + if m.getVersionStub == nil { panic("GetVersion is not stubbed") } @@ -87,7 +87,7 @@ func (m *MigratorStub) GetVersion(version int) (rivermigrate.Migration, error) { } func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direction, opts *rivermigrate.MigrateOpts) (*rivermigrate.MigrateResult, error) { - if m.allVersionsStub == nil { + if m.migrateStub == nil { panic("Migrate is not stubbed") } @@ -95,7 +95,7 @@ func (m *MigratorStub) Migrate(ctx context.Context, direction rivermigrate.Direc } func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResult, error) { - if m.allVersionsStub == nil { + if m.validateStub == nil { panic("Validate is not stubbed") } @@ -103,9 +103,9 @@ func (m *MigratorStub) Validate(ctx context.Context) (*rivermigrate.ValidateResu } var ( - testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 1} //nolint:gochecknoglobals - testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 2} //nolint:gochecknoglobals - testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 1", SQLUp: "SELECT 1", Version: 3} //nolint:gochecknoglobals + testMigration01 = rivermigrate.Migration{Name: "1st migration", SQLDown: "SELECT 'down 1' FROM /* TEMPLATE: schema */river_table", SQLUp: "SELECT 'up 1' FROM /* TEMPLATE: schema */river_table", Version: 1} //nolint:gochecknoglobals + testMigration02 = rivermigrate.Migration{Name: "2nd migration", SQLDown: "SELECT 'down 2' FROM /* TEMPLATE: schema */river_table", SQLUp: "SELECT 'up 2' FROM /* TEMPLATE: schema */river_table", Version: 2} //nolint:gochecknoglobals + testMigration03 = rivermigrate.Migration{Name: "3rd migration", SQLDown: "SELECT 'down 3' FROM /* TEMPLATE: schema */river_table", SQLUp: "SELECT 'up 3' FROM /* TEMPLATE: schema */river_table", Version: 3} //nolint:gochecknoglobals testMigrationAll = []rivermigrate.Migration{testMigration01, testMigration02, testMigration03} //nolint:gochecknoglobals ) @@ -259,6 +259,109 @@ func TestBaseCommandSetNonParallel(t *testing.T) { }) } +func TestMigrateGet(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + migratorStub *MigratorStub + out *bytes.Buffer + } + + setup := func(t *testing.T) (*migrateGet, *testBundle) { + t.Helper() + + cmd, out := withCommandBase(t, &migrateGet{}) + + migratorStub := &MigratorStub{} + migratorStub.allVersionsStub = func() []rivermigrate.Migration { return testMigrationAll } + migratorStub.getVersionStub = func(version int) (rivermigrate.Migration, error) { + switch version { + case 1: + return testMigration01, nil + case 2: + return testMigration02, nil + case 3: + return testMigration03, nil + } + return rivermigrate.Migration{}, fmt.Errorf("unknown version: %d", version) + } + migratorStub.existingVersionsStub = func(ctx context.Context) ([]rivermigrate.Migration, error) { return nil, nil } + + cmd.GetCommandBase().DriverProcurer = &DriverProcurerStub{ + getMigratorStub: func(config *rivermigrate.Config) (MigratorInterface, error) { return migratorStub, nil }, + } + + return cmd, &testBundle{ + out: out, + migratorStub: migratorStub, + } + } + + t.Run("DownMigration", func(t *testing.T) { + t.Parallel() + + cmd, bundle := setup(t) + + _, err := runCommand(ctx, t, cmd, &migrateGetOpts{Down: true, Version: []int{1}}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` +-- River main migration 001 [down] +SELECT 'down 1' FROM river_table + `), strings.TrimSpace(bundle.out.String())) + }) + + t.Run("UpMigration", func(t *testing.T) { + t.Parallel() + + cmd, bundle := setup(t) + + _, err := runCommand(ctx, t, cmd, &migrateGetOpts{Up: true, Version: []int{1}}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` +-- River main migration 001 [up] +SELECT 'up 1' FROM river_table + `), strings.TrimSpace(bundle.out.String())) + }) + + t.Run("MultipleMigrations", func(t *testing.T) { + t.Parallel() + + cmd, bundle := setup(t) + + _, err := runCommand(ctx, t, cmd, &migrateGetOpts{Up: true, Version: []int{1, 2, 3}}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` +-- River main migration 001 [up] +SELECT 'up 1' FROM river_table + +-- River main migration 002 [up] +SELECT 'up 2' FROM river_table + +-- River main migration 003 [up] +SELECT 'up 3' FROM river_table + `), strings.TrimSpace(bundle.out.String())) + }) + + t.Run("WithSchema", func(t *testing.T) { + t.Parallel() + + cmd, bundle := setup(t) + + _, err := runCommand(ctx, t, cmd, &migrateGetOpts{Schema: "custom_schema", Up: true, Version: []int{1}}) + require.NoError(t, err) + + require.Equal(t, strings.TrimSpace(` +-- River main migration 001 [up] +SELECT 'up 1' FROM custom_schema.river_table + `), strings.TrimSpace(bundle.out.String())) + }) +} + func TestMigrateList(t *testing.T) { t.Parallel() diff --git a/producer.go b/producer.go index cb6d6fad..ff59cd81 100644 --- a/producer.go +++ b/producer.go @@ -810,7 +810,6 @@ func (p *producer) pollForSettingChanges(ctx context.Context, wg *sync.WaitGroup // Don't log if this is part of a standard shutdown. if !errors.Is(context.Cause(ctx), startstop.ErrStop) { p.Logger.ErrorContext(ctx, p.Name+": Error fetching queue settings", slog.String("err", err.Error())) - } continue } diff --git a/rivershared/uniquestates/unique_states_test.go b/rivershared/uniquestates/unique_states_test.go index a73aac09..562cb6fd 100644 --- a/rivershared/uniquestates/unique_states_test.go +++ b/rivershared/uniquestates/unique_states_test.go @@ -3,8 +3,9 @@ package uniquestates import ( "testing" - "github.com/riverqueue/river/rivertype" "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivertype" ) func TestUniqueStatesToBitmask(t *testing.T) {