Problem
JobRescuer rescues jobs whose worker Timeout is -1.
Am I missing some job/client setup? Or should the rescuer also check whether the job timeout is negative and ignore such jobs?
Rescuer deferral:
job_rescuer.go#L315
if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
return jobRetryDecisionIgnore, time.Time{}
}
Test to reproduce
// longArgs is the field-less argument type for the long-running job used by
// the reproduction test.
type longArgs struct{}

// Kind returns the job kind identifier, "long".
func (longArgs) Kind() string { return "long" }

// InsertOpts returns the insert options for this job: it is placed on a queue
// named after its kind and is allowed a single attempt.
func (a longArgs) InsertOpts() river.InsertOpts {
	opts := river.InsertOpts{MaxAttempts: 1}
	opts.Queue = a.Kind()
	return opts
}
// longWorker works longArgs jobs; everything except Timeout and Work comes
// from the embedded river.WorkerDefaults.
type longWorker struct {
	river.WorkerDefaults[longArgs]
}

// Timeout returns -1 for every job, overriding the embedded default.
// NOTE(review): River documents -1 as "no timeout" — confirm against the
// Worker interface docs for the version in use.
func (w *longWorker) Timeout(*river.Job[longArgs]) time.Duration {
	return -1
}

// Work blocks for one minute and then reports success. It panics if the job
// context is cancelled before the minute elapses, since that is precisely the
// situation the reproduction is meant to expose.
func (w *longWorker) Work(ctx context.Context, job *river.Job[longArgs]) error {
	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		panic("should not happen")
	}
}
// TestJobRescuer_NegativeOneWorkerTimeout inserts a single job whose worker
// returns a Timeout of -1 and runs for one minute, then polls the job until it
// is either completed (pass) or discarded (fail). The client is configured with
// RescueStuckJobsAfter shorter than the job's run time so that a rescuer which
// does not honor the negative timeout will discard the job while it is still
// running.
func TestJobRescuer_NegativeOneWorkerTimeout(t *testing.T) {
	t.Parallel()

	workers := river.NewWorkers()
	river.AddWorker(workers, &longWorker{})

	// Left blank in the report; supply a connection string for a database
	// that has the River schema migrated before running.
	dsn := ""
	pool, err := pgxpool.New(context.Background(), dsn)
	require.NoError(t, err)

	client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
		Workers: workers,
		Queues: map[string]river.QueueConfig{
			longArgs{}.Kind(): {MaxWorkers: 1},
		},
		RescueStuckJobsAfter: 10 * time.Second,
		JobTimeout:           5 * time.Second,
	})
	require.NoError(t, err)

	// Start returns once the client is running in the background, so call it
	// directly and surface a startup failure instead of dropping the error in
	// a goroutine. The client stops when the test context is cancelled.
	require.NoError(t, client.Start(t.Context()))

	res, err := client.Insert(t.Context(), longArgs{}, nil)
	require.NoError(t, err)
	require.NotZero(t, res.Job.ID)

	// The worker itself takes a full minute, so the polling window must be
	// longer than that; with a one-minute window the test would time out even
	// when the job is never rescued.
	deadline := time.Now().Add(2 * time.Minute)
	for time.Now().Before(deadline) {
		job, err := client.JobGet(t.Context(), res.Job.ID)
		require.NoError(t, err)

		switch job.State {
		case rivertype.JobStateCompleted:
			return
		case rivertype.JobStateDiscarded:
			t.Fatalf("job is discarded: %+v", job)
		}

		time.Sleep(time.Second)
	}

	t.Fatal("timeout waiting for job to complete")
}
Version v0.39.0
Go version v1.26.3
Problem
JobRescuer rescues jobs whose worker Timeout is -1.
Am I missing some job/client setup? Or should the rescuer also check whether the job timeout is negative and ignore such jobs?
Rescuer deferral:
job_rescuer.go#L315
Test to reproduce
Version v0.39.0
Go version v1.26.3