lib/persistedretry/manager.go (239 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package persistedretry

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/uber-go/tally"
	"go.uber.org/atomic"

	"github.com/uber/kraken/utils/log"
)

// ErrManagerClosed is returned when Add is called on a closed manager.
var ErrManagerClosed = errors.New("manager closed")

// Manager defines interface for a persisted retry manager.
type Manager interface {
	// Add enqueues a task for asynchronous execution, persisting it first.
	Add(Task) error
	// SyncExec executes a task synchronously with in-place retries.
	SyncExec(Task) error
	// Close stops all workers and waits for them to finish.
	Close()
	// Find returns stored tasks matching query.
	Find(query interface{}) ([]Task, error)
}

type manager struct {
	config   Config
	stats    tally.Scope // metrics scope, tagged with module/executor.
	store    Store       // persistent backing store for tasks.
	executor Executor    // executes individual tasks.

	wg sync.WaitGroup // tracks worker / loop goroutines for Close.

	// Buffered task queues; sizes come from config
	// (IncomingBuffer / RetryBuffer).
	incoming chan Task
	retries  chan Task

	closeOnce sync.Once
	done      chan struct{} // closed to signal shutdown to all goroutines.
	closed    atomic.Bool   // guards Add / start after Close.
}

// NewManager creates a new Manager.
func NewManager( config Config, stats tally.Scope, store Store, executor Executor, ) (Manager, error) { stats = stats.Tagged(map[string]string{ "module": "persistedretry", "executor": executor.Name(), }) config = config.applyDefaults() m := &manager{ config: config, stats: stats, store: store, executor: executor, incoming: make(chan Task, config.IncomingBuffer), retries: make(chan Task, config.RetryBuffer), done: make(chan struct{}), } if err := m.markPendingTasksAsFailed(); err != nil { return nil, fmt.Errorf("mark pending tasks as failed: %s", err) } if err := m.start(); err != nil { return nil, fmt.Errorf("start: %s", err) } return m, nil } func (m *manager) markPendingTasksAsFailed() error { tasks, err := m.store.GetPending() if err != nil { return fmt.Errorf("get pending tasks: %s", err) } for _, t := range tasks { if err := m.store.MarkFailed(t); err != nil { return fmt.Errorf("mark task as failed: %s", err) } } return nil } // start starts workers and retry. func (m *manager) start() error { if m.closed.Load() { return ErrManagerClosed } totalWorkers := m.config.NumIncomingWorkers + m.config.NumRetryWorkers limit := m.config.MaxTaskThroughput * time.Duration(totalWorkers) for i := 0; i < m.config.NumIncomingWorkers; i++ { m.wg.Add(1) go m.worker(m.incoming, limit) } for i := 0; i < m.config.NumRetryWorkers; i++ { m.wg.Add(1) go m.worker(m.retries, limit) } m.wg.Add(1) go m.tickerLoop() m.wg.Add(1) go m.reportQueueMetrics() return nil } // Add enqueues an incoming task to be executed. func (m *manager) Add(t Task) error { if m.closed.Load() { return ErrManagerClosed } m.stats.Counter("tasks.added").Inc(1) ready := t.Ready() var err error if ready { err = m.store.AddPending(t) } else { err = m.store.AddFailed(t) } if err != nil { if err == ErrTaskExists { // No-op on duplicate tasks. 
return nil } return fmt.Errorf("store: %s", err) } if ready { if err := m.enqueue(t, m.incoming, "incoming"); err != nil { return fmt.Errorf("enqueue: %s", err) } } return nil } // SyncExec executes the task synchronously with retry logic. // Tasks will NOT be added to the retry queue if fail, but will be retried // in-place according to the configured SyncRetryBackoff. func (m *manager) SyncExec(t Task) error { bo := m.config.SyncRetryBackoff.Build() operation := func() error { return m.executor.Exec(t) } if err := backoff.Retry(operation, bo); err != nil { return fmt.Errorf("sync task failed: %w", err) } return nil } // Close waits for all workers to exit current task. func (m *manager) Close() { m.closeOnce.Do(func() { m.closed.Store(true) close(m.done) m.wg.Wait() }) } func (m *manager) Find(query interface{}) ([]Task, error) { return m.store.Find(query) } func (m *manager) enqueue(t Task, tasks chan Task, queueName string) error { queueStats := m.stats.Tagged(map[string]string{"queue": queueName}) select { case tasks <- t: queueStats.Gauge("queue.size_on_add").Update(float64(len(tasks))) default: queueStats.Counter("tasks.dropped.queue_full").Inc(1) log.Errorf("Task queue full (%s), marking task as failed for later retry", queueName) if err := m.store.MarkFailed(t); err != nil { return fmt.Errorf("mark task as failed: %s", err) } } return nil } func (m *manager) retry(t Task) error { if err := m.store.MarkPending(t); err != nil { return fmt.Errorf("mark pending: %s", err) } if err := m.enqueue(t, m.retries, "retries"); err != nil { return fmt.Errorf("enqueue: %s", err) } return nil } func (m *manager) worker(tasks chan Task, limit time.Duration) { defer m.wg.Done() for { select { case <-m.done: return case t := <-tasks: if err := m.exec(t); err != nil { m.stats.Counter("exec_failures").Inc(1) log.With("task", t).Errorf("Failed to exec task: %s", err) } time.Sleep(limit) } } } func (m *manager) tickerLoop() { defer m.wg.Done() pollRetriesTicker := 
time.NewTicker(m.config.PollRetriesInterval) for { select { case <-m.done: return case <-pollRetriesTicker.C: m.pollRetries() } } } func (m *manager) pollRetries() { tasks, err := m.store.GetFailed() if err != nil { m.stats.Counter("get_failed_failure").Inc(1) log.Errorf("Error getting failed tasks: %s", err) return } for _, t := range tasks { if t.Ready() && time.Since(t.GetLastAttempt()) > m.config.RetryInterval { if err := m.retry(t); err != nil { log.With("task", t).Errorf("Error adding retry task: %s", err) } } } } func (m *manager) reportQueueMetrics() { defer m.wg.Done() ticker := time.NewTicker(m.config.WorkqueueMetricsEmitInterval) defer ticker.Stop() for { select { case <-ticker.C: m.reportQueueStats("incoming", m.incoming, m.config.IncomingBuffer) m.reportQueueStats("retries", m.retries, m.config.RetryBuffer) m.stats.Gauge("queue.total.size").Update(float64(len(m.incoming) + len(m.retries))) case <-m.done: return } } } func (m *manager) reportQueueStats(name string, tasks chan Task, capacity int) { queueStats := m.stats.Tagged(map[string]string{"queue": name}) size := len(tasks) util := float64(size) / float64(capacity) * 100 queueStats.Gauge("queue.size").Update(float64(size)) queueStats.Gauge("queue.utilization_pct").Update(util) if util > 80 { log.With("queue", name, "size", size, "capacity", capacity, "utilization_pct", util). Warn("Writeback queue is near capacity") } } func (m *manager) exec(t Task) error { if err := m.executor.Exec(t); err != nil { if err := m.store.MarkFailed(t); err != nil { return fmt.Errorf("mark task as failed: %s", err) } log.With( "task", t, "failures", t.GetFailures()).Errorf("Task failed: %s", err) m.stats.Tagged(t.Tags()).Counter("task_failures").Inc(1) return nil } if err := m.store.Remove(t); err != nil { return fmt.Errorf("remove task: %s", err) } return nil }