Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,13 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.testSignals.queueCleaner = &queueCleaner.TestSignals
}

if driver.DatabaseName() == riverdriver.DatabaseNameSQLite {
sqliteNotificationCleaner := maintenance.NewSQLiteNotificationCleaner(archetype, &maintenance.SQLiteNotificationCleanerConfig{
Schema: config.Schema,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, sqliteNotificationCleaner)
}

{
var scheduleFunc func(time.Time) time.Time
if config.ReindexerSchedule != nil {
Expand Down Expand Up @@ -2378,8 +2385,6 @@ type JobListResult struct {
LastCursor *JobListCursor
}

const databaseNameSQLite = "sqlite"

var errJobListParamsMetadataNotSupportedSQLite = errors.New("JobListParams.Metadata is not supported on SQLite")

// JobList returns a paginated list of jobs matching the provided filters. The
Expand All @@ -2401,7 +2406,7 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
}
params.schema = c.config.Schema

if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
if c.driver.DatabaseName() == riverdriver.DatabaseNameSQLite && params.metadataCalled {
return nil, errJobListParamsMetadataNotSupportedSQLite
}

Expand Down Expand Up @@ -2442,7 +2447,7 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
}
params.schema = c.config.Schema

if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
if c.driver.DatabaseName() == riverdriver.DatabaseNameSQLite && params.metadataCalled {
return nil, errJobListParamsMetadataNotSupportedSQLite
}

Expand Down
152 changes: 152 additions & 0 deletions internal/maintenance/sqlite_notification_cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package maintenance

import (
"cmp"
"context"
"errors"
"log/slog"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
)

const (
SQLiteNotificationCleanerIntervalDefault = time.Minute
SQLiteNotificationCleanerRetentionPeriodDefault = 5 * time.Minute
)

// SQLiteNotificationCleanerTestSignals are internal signals used exclusively in tests.
type SQLiteNotificationCleanerTestSignals struct {
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
}

func (ts *SQLiteNotificationCleanerTestSignals) Init(tb testutil.TestingTB) {
ts.DeletedBatch.Init(tb)
}

type SQLiteNotificationCleanerConfig struct {
// Interval is the amount of time to wait between cleaner runs.
Interval time.Duration

// RetentionPeriod is the amount of time to keep notification rows around
// before they're removed.
RetentionPeriod time.Duration

// Schema where River tables are located. Empty string omits schema.
Schema string

// Timeout is the timeout for each delete query.
Timeout time.Duration
}

func (c *SQLiteNotificationCleanerConfig) mustValidate() *SQLiteNotificationCleanerConfig {
if c.Interval <= 0 {
panic("SQLiteNotificationCleanerConfig.Interval must be above zero")
}
if c.RetentionPeriod <= 0 {
panic("SQLiteNotificationCleanerConfig.RetentionPeriod must be above zero")
}
if c.Timeout <= 0 {
panic("SQLiteNotificationCleanerConfig.Timeout must be above zero")
}

return c
}

// SQLiteNotificationCleaner periodically removes old rows from SQLite's
// notification outbox. It is only needed for the SQLite driver's emulated
// listen/notify support.
type SQLiteNotificationCleaner struct {
riversharedmaintenance.QueueMaintainerServiceBase
startstop.BaseStartStop

// exported for test purposes
Config *SQLiteNotificationCleanerConfig
TestSignals SQLiteNotificationCleanerTestSignals

exec riverdriver.Executor
}

// NewSQLiteNotificationCleaner returns a SQLite notification cleaner.
func NewSQLiteNotificationCleaner(archetype *baseservice.Archetype, config *SQLiteNotificationCleanerConfig, exec riverdriver.Executor) *SQLiteNotificationCleaner {
return baseservice.Init(archetype, &SQLiteNotificationCleaner{
Config: (&SQLiteNotificationCleanerConfig{
Interval: cmp.Or(config.Interval, SQLiteNotificationCleanerIntervalDefault),
RetentionPeriod: cmp.Or(config.RetentionPeriod, SQLiteNotificationCleanerRetentionPeriodDefault),
Schema: config.Schema,
Timeout: cmp.Or(config.Timeout, riversharedmaintenance.TimeoutDefault),
}).mustValidate(),
exec: exec,
})
}

func (s *SQLiteNotificationCleaner) Start(ctx context.Context) error { //nolint:dupl
ctx, shouldStart, started, stopped := s.StartInit(ctx)
if !shouldStart {
return nil
}

s.StaggerStart(ctx)

go func() {
started()
defer stopped() // this defer should come first so it's last out

s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)

ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}

res, err := s.runOnce(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
s.Logger.ErrorContext(ctx, s.Name+": Error cleaning SQLite notifications", slog.String("error", err.Error()))
}
continue
}

if res.NumNotificationsDeleted > 0 {
s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
slog.Int("num_notifications_deleted", res.NumNotificationsDeleted),
)
}
}
}()

return nil
}

type sqliteNotificationCleanerRunOnceResult struct {
NumNotificationsDeleted int
}

func (s *SQLiteNotificationCleaner) runOnce(ctx context.Context) (*sqliteNotificationCleanerRunOnceResult, error) {
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
defer cancelFunc()

numDeleted, err := s.exec.NotificationDeleteBefore(ctx, &riverdriver.NotificationDeleteBeforeParams{
CreatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod),
Schema: s.Config.Schema,
})
if err != nil {
return nil, err
}

s.TestSignals.DeletedBatch.Signal(struct{}{})

return &sqliteNotificationCleanerRunOnceResult{
NumNotificationsDeleted: numDeleted,
}, nil
}
105 changes: 105 additions & 0 deletions internal/maintenance/sqlite_notification_cleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package maintenance

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstoptest"
)

func TestSQLiteNotificationCleaner(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
exec riverdriver.Executor
schema string
}

setup := func(t *testing.T) (*SQLiteNotificationCleaner, *testBundle) {
t.Helper()

driver := riverpgxv5.New(riversharedtest.DBPool(ctx, t))
tx, schema := riverdbtest.TestTxPgxDriver(ctx, t, driver, nil)

bundle := &testBundle{
exec: driver.UnwrapExecutor(tx),
schema: schema,
}

cleaner := NewSQLiteNotificationCleaner(
riversharedtest.BaseServiceArchetype(t),
&SQLiteNotificationCleanerConfig{
Interval: time.Hour,
RetentionPeriod: time.Hour,
Schema: bundle.schema,
Timeout: time.Second,
},
bundle.exec,
)
cleaner.StaggerStartupDisable(true)
t.Cleanup(cleaner.Stop)

return cleaner, bundle
}

notificationCount := func(t *testing.T, exec riverdriver.Executor) int {
t.Helper()

var count int
require.NoError(t, exec.QueryRow(ctx, "SELECT count(*) FROM river_notification").Scan(&count))
return count
}

t.Run("Defaults", func(t *testing.T) {
t.Parallel()

cleaner := NewSQLiteNotificationCleaner(
riversharedtest.BaseServiceArchetype(t),
&SQLiteNotificationCleanerConfig{},
nil,
)

require.Equal(t, SQLiteNotificationCleanerIntervalDefault, cleaner.Config.Interval)
require.Equal(t, SQLiteNotificationCleanerRetentionPeriodDefault, cleaner.Config.RetentionPeriod)
})

t.Run("DeletesExpiredNotifications", func(t *testing.T) {
t.Parallel()

cleaner, bundle := setup(t)
cleaner.TestSignals.Init(t)

now := time.Now()
require.NoError(t, bundle.exec.Exec(ctx, `
INSERT INTO river_notification (created_at, payload, topic)
VALUES
($1, 'old_payload_1', 'topic'),
($2, 'old_payload_2', 'topic'),
($3, 'new_payload', 'topic')
`, now.Add(-2*time.Hour), now.Add(-61*time.Minute), now.Add(-30*time.Minute)))

res, err := cleaner.runOnce(ctx)
require.NoError(t, err)
require.Equal(t, 2, res.NumNotificationsDeleted)
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
require.Equal(t, 1, notificationCount(t, bundle.exec))
})

t.Run("StartStopStress", func(t *testing.T) {
t.Parallel()

cleaner, _ := setup(t)
cleaner.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress

startstoptest.Stress(ctx, t, cleaner)
})
}
3 changes: 2 additions & 1 deletion metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"testing"

"github.com/riverqueue/river/internal/jobexecutor"
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/jobexecutor"
)

func TestMetadataSet(t *testing.T) {
Expand Down
36 changes: 29 additions & 7 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (

const AllQueuesString = "*"

const (
DatabaseNamePostgres = "postgres"
DatabaseNameSQLite = "sqlite"
)

const MigrationLineMain = "main"

var (
Expand Down Expand Up @@ -138,12 +143,14 @@ type Driver[TTx any] interface {
// API is not stable. DO NOT USE.
SupportsListener() bool

// SupportsListenNotify indicates whether the underlying database supports
// listen/notify. This differs from SupportsListener in that even if a
// driver doesn't a support a listener but the database supports the
// underlying listen/notify mechanism, it will still broadcast in case there
// are other clients/drivers on the database that do support a listener. If
// listen/notify can't be supported at all, no broadcast attempt is made.
// SupportsListenNotify indicates whether the driver can broadcast
// notifications that a listener can receive, either through a native
// database mechanism like Postgres LISTEN/NOTIFY or a driver-specific
// emulation. This differs from SupportsListener in that even if a driver
// doesn't support a listener but the database supports the underlying
// notification mechanism, it will still broadcast in case there are other
// clients/drivers on the database that do support a listener. If
// notifications can't be supported at all, no broadcast attempt is made.
//
// API is not stable. DO NOT USE.
SupportsListenNotify() bool
Expand Down Expand Up @@ -256,6 +263,14 @@ type Executor interface {
// the `line` column was added to the migrations table.
MigrationInsertManyAssumingMain(ctx context.Context, params *MigrationInsertManyAssumingMainParams) ([]*Migration, error)

// NotificationDeleteBefore deletes notifications before a certain time
// horizon.
//
// A "notification" in this context refers to a row in `river_notification`
// which is a special table implemented in some databases (e.g. SQLite) that
// simulates Postgres' listen/notify when not available.
NotificationDeleteBefore(ctx context.Context, params *NotificationDeleteBeforeParams) (int, error)

NotifyMany(ctx context.Context, params *NotifyManyParams) error
PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}, error)

Expand Down Expand Up @@ -775,6 +790,11 @@ type NotifyManyParams struct {
Schema string
}

type NotificationDeleteBeforeParams struct {
CreatedAtHorizon time.Time
Schema string
}

type ProducerKeepAliveParams struct {
ID int64
QueueName string
Expand Down Expand Up @@ -883,8 +903,10 @@ func MigrationLineMainTruncateTables(version int) []string {
return []string{"river_job", "river_leader"}
case 4:
return []string{"river_job", "river_leader", "river_queue"}
case 0, 5, 6:
case 5, 6:
return []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"}
case 0, 7:
return []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue", "river_notification"}
}

panic(fmt.Sprintf("unrecognized migration version: %d", version))
Expand Down
7 changes: 7 additions & 0 deletions riverdriver/riverdatabasesql/internal/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading