retry.go (100 lines of code) (raw):

package flow import ( "context" "time" "github.com/cenkalti/backoff/v4" ) var DefaultRetryOption = RetryOption{ Backoff: backoff.NewExponentialBackOff(), Attempts: 3, } // RetryOption customizes retry behavior of a Step in Workflow. type RetryOption struct { TimeoutPerTry time.Duration // 0 means no timeout Attempts uint64 // 0 means no limit // NextBackOff is called after each retry to determine the next backoff duration. // Notice if attempts limits are reach, or context timeout, or BackOff fires backoff.Stop, // this function will not be called. // // RetryEvent: the event records attempt, duration since the start, and the error of the last try. // nextBackOff: the next backoff duration calculated by the inner BackOff NextBackOff func(ctx context.Context, re RetryEvent, nextBackOff time.Duration) time.Duration Backoff backoff.BackOff Notify backoff.Notify Timer backoff.Timer } // RetryEvent is the event fired when a retry happens type RetryEvent struct { Attempt uint64 Since time.Duration Error error } // retry constructs a do function with retry enabled according to the option. func (w *Workflow) retry(opt *RetryOption) func( ctx context.Context, do func(context.Context) error, notAfter time.Time, // the Step level timeout ddl ) error { if opt == nil { return func(ctx context.Context, do func(context.Context) error, notAfter time.Time) error { return do(ctx) } } return func(ctx context.Context, do func(context.Context) error, notAfter time.Time) error { backOff := opt.Backoff backOff = backoff.WithContext(backOff, ctx) if !notAfter.IsZero() { backOff = &backOffStopIfTimeout{BackOff: backOff, NotAfter: notAfter, Now: w.Clock.Now} } if opt.Attempts > 0 { backOff = backoff.WithMaxRetries(backOff, opt.Attempts-1) } retried := func(ctx context.Context, e RetryEvent) {} if opt.NextBackOff != nil { b := &backOffWithEvent{BackOff: backOff, nextBackOff: opt.NextBackOff} retried = b.retried backOff = b } e := RetryEvent{Attempt: 0} start := w.Clock.Now() return backoff.RetryNotifyWithTimer( func() error { defer func() { retried(ctx, e) e.Attempt++ }() ctxPerTry := ctx if opt.TimeoutPerTry > 0 { var cancel context.CancelFunc ctxPerTry, cancel = w.Clock.WithTimeout(ctx, opt.TimeoutPerTry) defer cancel() } err := do(ctxPerTry) e.Since = w.Clock.Since(start) e.Error = err return err }, backOff, opt.Notify, opt.Timer, ) } } type backOffWithEvent struct { backoff.BackOff nextBackOff func(context.Context, RetryEvent, time.Duration) time.Duration ctx context.Context e RetryEvent } func (b *backOffWithEvent) NextBackOff() time.Duration { bkof := b.BackOff.NextBackOff() if b.nextBackOff == nil || bkof == backoff.Stop { return backoff.Stop } return b.nextBackOff(b.ctx, b.e, bkof) } func (b *backOffWithEvent) retried(ctx context.Context, e RetryEvent) { b.ctx = ctx b.e = e } type backOffStopIfTimeout struct { backoff.BackOff NotAfter time.Time Now func() time.Time } func (b *backOffStopIfTimeout) NextBackOff() time.Duration { bkof := b.BackOff.NextBackOff() if b.NotAfter.IsZero() || b.Now == nil || bkof == backoff.Stop || b.Now().After(b.NotAfter) { return backoff.Stop } return bkof }