Skip to content

River JobRescuer ignores jobs' infinite Timeout #1287

@mrdken

Description

@mrdken

Problem

JobRescurer rescues Jobs with Timeout = -1
Am I missing some job/client setup? Or should probably check also if jobTimeout is negative and ignore them?
Rescuer deferral:

job_rescuer.go#L315

if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
    return jobRetryDecisionIgnore, time.Time{}
}

Test to reproduce

// longArgs is the (field-less) argument type for the long-running job used by
// this reproduction.
type longArgs struct{}

// Kind returns the job kind, "long". The same string is reused as the queue
// name by InsertOpts.
func (longArgs) Kind() string {
	return "long"
}

// InsertOpts sends the job to the queue named after its kind and limits it to
// a single attempt.
func (a longArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{
		MaxAttempts: 1,
		Queue:       a.Kind(),
	}
}

// longWorker works longArgs jobs. It embeds river.WorkerDefaults and overrides
// only the timeout.
type longWorker struct {
	river.WorkerDefaults[longArgs]
}

// Timeout returns -1 for every job, which this issue treats as "no timeout".
func (w *longWorker) Timeout(*river.Job[longArgs]) time.Duration {
	return -1
}

// Work blocks for one minute and then reports success. If the job's context is
// cancelled before the minute is up it panics, because that is exactly the
// outcome the reproduction is meant to detect.
func (w *longWorker) Work(ctx context.Context, _ *river.Job[longArgs]) error {
	finished := time.After(time.Minute)

	select {
	case <-finished:
		return nil
	case <-ctx.Done():
		panic("should not happen")
	}
}

// TestJobRescuer_NegativeOneWorkerTimeout reproduces the reported problem: a
// job whose worker returns a Timeout of -1 is expected to run to completion,
// so the test fails if the job ends up discarded or has not completed within a
// minute.
//
// The client is configured with a 5s JobTimeout and a 10s RescueStuckJobsAfter
// while the worker needs a full minute, so the job outlives both settings.
//
// Running it requires a reachable Postgres database; set dsn accordingly.
func TestJobRescuer_NegativeOneWorkerTimeout(t *testing.T) {
	t.Parallel()

	ctx := t.Context()

	workers := river.NewWorkers()
	river.AddWorker(workers, &longWorker{})

	// Placeholder: fill in the connection string of the test database.
	dsn := ""

	pool, err := pgxpool.New(context.Background(), dsn)
	require.NoError(t, err)
	// Release the pool's connections when the test ends.
	t.Cleanup(pool.Close)

	client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
		Workers: workers,
		Queues: map[string]river.QueueConfig{
			longArgs{}.Kind(): {MaxWorkers: 1},
		},
		RescueStuckJobsAfter: 10 * time.Second,
		JobTimeout:           5 * time.Second,
	})
	require.NoError(t, err)

	// Start launches the client's background loops and then returns, so it is
	// called directly and a startup failure fails the test immediately
	// instead of being dropped inside a goroutine.
	require.NoError(t, client.Start(ctx))

	res, err := client.Insert(ctx, longArgs{}, nil)
	require.NoError(t, err)
	require.NotZero(t, res.Job.ID)

	// Poll the job once per second until it reaches a terminal state or the
	// one-minute deadline passes.
	deadline := time.Now().Add(time.Minute)
	for time.Now().Before(deadline) {
		job, err := client.JobGet(ctx, res.Job.ID)
		require.NoError(t, err)

		switch job.State {
		case rivertype.JobStateCompleted:
			return
		case rivertype.JobStateDiscarded:
			t.Fatalf("job is discarded: %+v", job)
		}

		time.Sleep(time.Second)
	}

	t.Fatal("timeout waiting for job to complete")
}

Version v0.39.0
Go version v1.26.3

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions