lib/persistedretry/manager.go
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package persistedretry

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/uber-go/tally"
	"go.uber.org/atomic"

	"github.com/uber/kraken/utils/log"
)

// ErrManagerClosed is returned when Add is called on a closed manager.
var ErrManagerClosed = errors.New("manager closed")

// Manager defines the interface for a persisted retry manager.
type Manager interface {
	// Add enqueues a task for asynchronous execution with retries.
	Add(Task) error
	// SyncExec executes a task synchronously, bypassing the retry queue.
	SyncExec(Task) error
	// Close shuts down the manager and waits for workers to exit.
	Close()
	// Find returns stored tasks matching the given query.
	Find(query interface{}) ([]Task, error)
}

type manager struct {
	config   Config
	stats    tally.Scope
	store    Store
	executor Executor

	wg sync.WaitGroup

	// incoming receives newly added tasks; retries receives tasks being retried.
	incoming chan Task
	retries  chan Task

	closeOnce sync.Once
	done      chan struct{}
	closed    atomic.Bool
}

// NewManager creates a new Manager.
func NewManager(
	config Config, stats tally.Scope, store Store, executor Executor) (Manager, error) {

	stats = stats.Tagged(map[string]string{
		"module":   "persistedretry",
		"executor": executor.Name(),
	})

	config = config.applyDefaults()

	m := &manager{
		config:   config,
		stats:    stats,
		store:    store,
		executor: executor,
		incoming: make(chan Task, config.IncomingBuffer),
		retries:  make(chan Task, config.RetryBuffer),
		done:     make(chan struct{}),
	}
	if err := m.markPendingTasksAsFailed(); err != nil {
		return nil, fmt.Errorf("mark pending tasks as failed: %s", err)
	}
	if err := m.start(); err != nil {
		return nil, fmt.Errorf("start: %s", err)
	}
	return m, nil
}
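
// markPendingTasksAsFailed transitions any tasks left in the pending state
// (e.g. from a previous process that exited mid-execution) back to failed so
// they become eligible for retry.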
func (m *manager) markPendingTasksAsFailed() error {
	tasks, err := m.store.GetPending()
	if err != nil {
		return fmt.Errorf("get pending tasks: %s", err)
	}
	for _, t := range tasks {
		if err := m.store.MarkFailed(t); err != nil {
			return fmt.Errorf("mark task as failed: %s", err)
		}
	}
	return nil
}

// start launches the incoming and retry workers as well as the retry poller.
func (m *manager) start() error {
	if m.closed.Load() {
		return ErrManagerClosed
	}
	totalWorkers := m.config.NumIncomingWorkers + m.config.NumRetryWorkers

	// Each worker sleeps for limit after every task, so in aggregate the
	// workers process roughly one task per MaxTaskThroughput interval.
	limit := m.config.MaxTaskThroughput * time.Duration(totalWorkers)

	for i := 0; i < m.config.NumIncomingWorkers; i++ {
		m.wg.Add(1)
		go m.worker(m.incoming, limit)
	}
	for i := 0; i < m.config.NumRetryWorkers; i++ {
		m.wg.Add(1)
		go m.worker(m.retries, limit)
	}
	m.wg.Add(1)
	go m.tickerLoop()

	return nil
}

// Add enqueues an incoming task to be executed.
func (m *manager) Add(t Task) error {
	if m.closed.Load() {
		return ErrManagerClosed
	}

	ready := t.Ready()

	var err error
	if ready {
		err = m.store.AddPending(t)
	} else {
		err = m.store.AddFailed(t)
	}
	if err != nil {
		if err == ErrTaskExists {
			// No-op on duplicate tasks.
			return nil
		}
		return fmt.Errorf("store: %s", err)
	}

	if ready {
		if err := m.enqueue(t, m.incoming); err != nil {
			return fmt.Errorf("enqueue: %s", err)
		}
	}
	return nil
}

// SyncExec executes the task synchronously. The task will NOT be added to
// the retry queue if it fails.
func (m *manager) SyncExec(t Task) error {
	return m.executor.Exec(t)
}

// Close stops accepting new tasks and waits for all workers to finish their
// current task.
func (m *manager) Close() {
	m.closeOnce.Do(func() {
		m.closed.Store(true)
		close(m.done)
		m.wg.Wait()
	})
}
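
// Find returns stored tasks matching the given query.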
func (m *manager) Find(query interface{}) ([]Task, error) {
	return m.store.Find(query)
}
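
// enqueue attempts to send t to the given task channel without blocking. If
// the channel is full, the task is marked as failed so a later retry round
// can pick it up.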
func (m *manager) enqueue(t Task, tasks chan Task) error {
	select {
	case tasks <- t:
	default:
		// If the task queue is full, fall back to marking the task as failed
		// so it can be picked up by a later retry round.
		if err := m.store.MarkFailed(t); err != nil {
			return fmt.Errorf("mark task as failed: %s", err)
		}
	}
	return nil
}
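
// retry marks t as pending and requeues it on the retry channel.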
func (m *manager) retry(t Task) error {
	if err := m.store.MarkPending(t); err != nil {
		return fmt.Errorf("mark pending: %s", err)
	}
	if err := m.enqueue(t, m.retries); err != nil {
		return fmt.Errorf("enqueue: %s", err)
	}
	return nil
}
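
// worker executes tasks from the given channel until the manager is closed,
// sleeping for limit after each task to cap throughput.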
func (m *manager) worker(tasks chan Task, limit time.Duration) {
	defer m.wg.Done()

	for {
		select {
		case <-m.done:
			return
		case t := <-tasks:
			if err := m.exec(t); err != nil {
				m.stats.Counter("exec_failures").Inc(1)
				log.With("task", t).Errorf("Failed to exec task: %s", err)
			}
			time.Sleep(limit)
		}
	}
}
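
// tickerLoop periodically polls for failed tasks that are due for a retry.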
func (m *manager) tickerLoop() {
	defer m.wg.Done()

	pollRetriesTicker := time.NewTicker(m.config.PollRetriesInterval)
	defer pollRetriesTicker.Stop()

	for {
		select {
		case <-m.done:
			return
		case <-pollRetriesTicker.C:
			m.pollRetries()
		}
	}
}
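
// pollRetries requeues failed tasks that are ready and whose last attempt is
// older than the configured retry interval.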
func (m *manager) pollRetries() {
	tasks, err := m.store.GetFailed()
	if err != nil {
		m.stats.Counter("get_failed_failure").Inc(1)
		log.Errorf("Error getting failed tasks: %s", err)
		return
	}
	for _, t := range tasks {
		if t.Ready() && time.Since(t.GetLastAttempt()) > m.config.RetryInterval {
			if err := m.retry(t); err != nil {
				log.With("task", t).Errorf("Error adding retry task: %s", err)
			}
		}
	}
}
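
// exec runs t via the executor. On executor failure the task is marked as
// failed (to be retried later) rather than returning an error; an error is
// only returned if the store itself cannot be updated.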
func (m *manager) exec(t Task) error {
	if err := m.executor.Exec(t); err != nil {
		if err := m.store.MarkFailed(t); err != nil {
			return fmt.Errorf("mark task as failed: %s", err)
		}
		log.With(
			"task", t,
			"failures", t.GetFailures()).Errorf("Task failed: %s", err)
		m.stats.Tagged(t.Tags()).Counter("task_failures").Inc(1)
		return nil
	}
	if err := m.store.Remove(t); err != nil {
		return fmt.Errorf("remove task: %s", err)
	}
	return nil
}
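
// Example usage (illustrative sketch only; it assumes the caller supplies
// concrete Task, Store, and Executor implementations defined elsewhere, along
// with a populated Config and a tally.Scope):
//
//	m, err := persistedretry.NewManager(config, stats, store, executor)
//	if err != nil {
//		// handle error
//	}
//	defer m.Close()
//
//	if err := m.Add(task); err != nil {
//		// handle error; ErrManagerClosed means the manager was shut down
//	}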