Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ jobs:
- name: Setup sqlc
uses: sqlc-dev/setup-sqlc@v4
with:
sqlc-version: "1.27.0"
sqlc-version: "1.28.0"

- name: Run sqlc diff
run: |
Expand Down
62 changes: 47 additions & 15 deletions .storybook/preview.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Preview } from "@storybook/react";
import type { PartialStoryFn, StoryContext } from "@storybook/types";
import type { Decorator, Preview } from "@storybook/react";

import { withThemeByClassName } from "@storybook/addon-themes";
import { ReactRenderer } from "@storybook/react";
Expand All @@ -13,38 +12,74 @@ import {
import { ThemeProvider } from "next-themes";
import React from "react";

import type { Features } from "../src/services/features";

import "../src/global-type-overrides";
import "../src/index.css";
import { FeaturesContext } from "../src/contexts/Features";

/**
* Decorator that provides feature flags to stories
* Can be overridden per story using parameters.features
*/
export const withFeatures: Decorator = (StoryFn, context) => {
// Default features with story-specific overrides
const features = {
hasProducerTable: true,
...context.parameters?.features,
};

return (
<FeaturesContext.Provider value={{ features }}>
<StoryFn />
</FeaturesContext.Provider>
);
};

function withRouter(Story: PartialStoryFn, { parameters }: StoryContext) {
/**
* Decorator that provides router context for stories
* Can be configured per story using parameters.router
*/
export const withRouter: Decorator = (StoryFn, context) => {
const {
initialEntries = ["/"],
initialIndex,
routes = ["/"],
} = parameters?.router || {};
} = context.parameters?.router || {};

// Create a router instance only when needed
const rootRoute = createRootRoute();

const children = routes.map((path) =>
const routeComponents = routes.map((path) =>
createRoute({
component: Story,
component: () => <StoryFn />,
getParentRoute: () => rootRoute,
path,
}),
);

rootRoute.addChildren(children);
rootRoute.addChildren(routeComponents);

const router = createRouter({
history: createMemoryHistory({ initialEntries, initialIndex }),
routeTree: rootRoute,
});

return <RouterProvider router={router} />;
}
};

/**
* Decorator for theme provider
*/
export const withThemeProvider: Decorator = (StoryFn) => (
<ThemeProvider>
<StoryFn />
</ThemeProvider>
);

declare module "@storybook/types" {
// Define parameter types
declare module "@storybook/react" {
interface Parameters {
features?: Partial<Features>;
router?: {
initialEntries?: string[];
initialIndex?: number;
Expand All @@ -55,6 +90,7 @@ declare module "@storybook/types" {

const preview: Preview = {
decorators: [
withFeatures,
withRouter,
withThemeByClassName<ReactRenderer>({
defaultTheme: "light",
Expand All @@ -63,11 +99,7 @@ const preview: Preview = {
light: "light",
},
}),
(Story) => (
<ThemeProvider>
<Story />
</ThemeProvider>
),
withThemeProvider,
],

parameters: {
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added a queue detail page with the ability to view queue stats. For River Pro customers, this page offers the ability to dynamically override concurrency limits and to view individual clients for each queue, along with how many jobs each is working. [PR #326](https://github.com/riverqueue/riverui/pull/326).

## [v0.8.1] - 2025-02-27

### Changed
Expand Down
12 changes: 12 additions & 0 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,15 @@ func requireAPIError[TError error](t *testing.T, expectedErr TError, err error)
require.ErrorAs(t, err, &apiErr)
require.Equal(t, expectedErr, apiErr)
}

const producerSchema = `CREATE UNLOGGED TABLE IF NOT EXISTS river_producer (
id bigserial PRIMARY KEY,
client_id text NOT NULL,
queue_name text NOT NULL,

max_workers int NOT NULL CHECK (max_workers >= 0),
metadata jsonb NOT NULL DEFAULT '{}',
paused_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);`
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ toolchain go1.24.1
require (
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.4
github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf
github.com/riverqueue/river v0.20.1
github.com/riverqueue/river/riverdriver v0.20.1
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.1
github.com/riverqueue/river/rivershared v0.20.1
github.com/riverqueue/river/rivertype v0.20.1
github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4
github.com/riverqueue/river v0.20.2
github.com/riverqueue/river/riverdriver v0.20.2
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2
github.com/riverqueue/river/rivershared v0.20.2
github.com/riverqueue/river/rivertype v0.20.2
github.com/rs/cors v1.11.1
github.com/samber/slog-http v1.6.0
github.com/stretchr/testify v1.10.0
Expand All @@ -39,9 +39,9 @@ require (
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/crypto v0.35.0 // indirect
golang.org/x/net v0.36.0 // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/text v0.24.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
36 changes: 18 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf h1:y0ZXBnVCUuqKNhld/VVIix5pYVPjzOZu3J48wDpatSU=
github.com/riverqueue/apiframe v0.0.0-20250310152721-45007400f5bf/go.mod h1:ko/9b4SeomWrHTr4WU0i21peq90Qk2Mm8MgOqPrTcHA=
github.com/riverqueue/river v0.20.1 h1:eKf4gbPJF632LLoEPIMMEnP9I79aWWDb9k1avUHXfIA=
github.com/riverqueue/river v0.20.1/go.mod h1:1RVre4dwkRznCZSgz1NeW9HVqeV2MFcRbpi89rvIaYE=
github.com/riverqueue/river/riverdriver v0.20.1 h1:Iz5DXbHFrt32iFv0DRpk2Td1HcyR2z4VrhC9CA9dsoI=
github.com/riverqueue/river/riverdriver v0.20.1/go.mod h1:Q8MbNY6uuQEtozC/dLJ2HRenCZrEQn2K5V1/yYHoK9I=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.1 h1:C5XxNpZ365YGYv+nUIbSZynyVW+hPBo7CggsE8S3eIw=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.1/go.mod h1:IxJ4+ZTqlMVrA1rcbLuiSwg4qlXfyiRnZnmoz+phbNg=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.1 h1:66ZntyF9i1HsIpPMXO8urhie1hPcqBbz0R31CPWgTXM=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.1/go.mod h1:CJ6LYk3q0s/nUVzadLXQIpUDHi0hhPg9a8GAzqSq9P8=
github.com/riverqueue/river/rivershared v0.20.1 h1:49EKGZ1jtT6kgsoX5jX9+Cr/v8NB2xZAAUVvE6Q0lQg=
github.com/riverqueue/river/rivershared v0.20.1/go.mod h1:M2j13k2UlimNtU2z7iYJEoY7x0Zvp2T+q1pW/qoWzaQ=
github.com/riverqueue/river/rivertype v0.20.1 h1:9kx3vyfYm5Cn3MZLqfmCwwhpPqE10zCBXAL6UstmbY4=
github.com/riverqueue/river/rivertype v0.20.1/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs=
github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4 h1:ejJogJ57bF+jMbvGjZQ6H6LR0NCTDQr30SJ/wSVepgs=
github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4/go.mod h1:6aXA9FSXKkxwjbOUSXdrIOuw478Lvtz/eEu45R4MoQk=
github.com/riverqueue/river v0.20.2 h1:GU34ZcC6B3TUCJf7G9sOSURKzgHZf1Vxd3RJCxbsX68=
github.com/riverqueue/river v0.20.2/go.mod h1:xbycGcRu2+RpoVm4hWQA6Ed7Ef6riFu3xJEZx3nHNHQ=
github.com/riverqueue/river/riverdriver v0.20.2 h1:FDmWALB6DvYBBw479euIBg1KClxPmDpWjmZbhScxSBw=
github.com/riverqueue/river/riverdriver v0.20.2/go.mod h1:vYSv6ZTEFWT0JVuGCwZDxJdc2U7ZMkwJQ+nPsa7/2mM=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.2 h1:llBsU1hpKyIIzZroeVjM7uavmq3W+kXuSvkUCQ/3pg4=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.20.2/go.mod h1:qPJ5qkfAqAYRKXxU1TNFsVwMd9dLIXEFDLrrGz6GAWM=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2 h1:O8e1vobbKhUmgbki0mLOvCptixMtBiMjJgkGPa4VFAY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.20.2/go.mod h1:zn3Lf6qzkq9kEOzYRe/fEgYl9c/eRTCdwBHtclxILEU=
github.com/riverqueue/river/rivershared v0.20.2 h1:mrZV66L7PQyR+y0o7JMsZbdT+aG3SAVRQ7AB58mGbxU=
github.com/riverqueue/river/rivershared v0.20.2/go.mod h1:8B1yIue4a/Qb5efwo9qpbTEnYCQhZAa9NZn6pdM381o=
github.com/riverqueue/river/rivertype v0.20.2 h1:unmiQP7CWS6IDbDrp9cESNscPoMstxb6Luoz9kfNzOc=
github.com/riverqueue/river/rivertype v0.20.2/go.mod h1:lmdl3vLNDfchDWbYdW2uAocIuwIN+ZaXqAukdSCFqWs=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
Expand Down Expand Up @@ -82,12 +82,12 @@ golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs=
golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
35 changes: 22 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/riverqueue/apiframe/apiendpoint"
"github.com/riverqueue/apiframe/apimiddleware"
"github.com/riverqueue/apiframe/apitype"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
Expand Down Expand Up @@ -136,20 +137,28 @@ func NewServer(opts *ServerOpts) (*Server, error) {

mux := http.NewServeMux()

mountOpts := apiendpoint.MountOpts{
Logger: opts.Logger,
Validator: apitype.NewValidator(),
}

endpoints := []apiendpoint.EndpointInterface{
apiendpoint.Mount(mux, opts.Logger, newHealthCheckGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobCancelEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobDeleteEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobListEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobRetryEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newJobGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueListEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueuePauseEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newQueueResumeEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newStateAndCountGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newWorkflowGetEndpoint(apiBundle)),
apiendpoint.Mount(mux, opts.Logger, newWorkflowListEndpoint(apiBundle)),
apiendpoint.Mount(mux, newFeaturesGetEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newHealthCheckGetEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newJobCancelEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newJobDeleteEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newJobGetEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newJobListEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newJobRetryEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newProducerListEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newQueueGetEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newQueueListEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newQueuePauseEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newQueueResumeEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newQueueUpdateEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newStateAndCountGetEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newWorkflowGetEndpoint(apiBundle), &mountOpts),
apiendpoint.Mount(mux, newWorkflowListEndpoint(apiBundle), &mountOpts),
}

var services []startstop.Service
Expand Down
Loading
Loading