pkg/common/goalstate/engine.go (235 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package goalstate
import (
"context"
"sync"
"time"
"github.com/uber/peloton/pkg/common/async"
queue "github.com/uber/peloton/pkg/common/deadline_queue"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
)
// asyncWorkerQueueItem implements the async.Job interface (see Run) and
// carries the metadata asyncWorkerQueue needs when the job is enqueued:
// the deadline queue item itself and the deadline to schedule it at.
type asyncWorkerQueueItem struct {
item queue.QueueItem // the deadline queue item wrapped by this job
deadline time.Time // deadline passed to the deadline queue on Enqueue; left unset on items returned by Dequeue
engine *engine // backpointer to goal state engine used by Run; only asyncWorkerQueue.Dequeue populates it
}
// Run satisfies async.Job. It hands the wrapped queue item to the goal
// state engine for evaluation. The item must be a *queue.Item; any other
// concrete type panics on the assertion. The ctx argument is not used.
func (w *asyncWorkerQueueItem) Run(ctx context.Context) {
	dequeued := w.item.(*queue.Item)
	w.engine.processEntityAfterDequeue(dequeued)
}
// asyncWorkerQueue is a wrapper around the deadline queue which
// implements async.Queue (Run, Enqueue and Dequeue).
type asyncWorkerQueue struct {
queue queue.DeadlineQueue // goal state engine's deadline queue
engine *engine // backpointer to goal state engine, copied into every job returned by Dequeue
// jobChan serializes concurrent asyncWorkerQueue.Dequeue callers,
// because dequeuing from queue.DeadlineQueue is not concurrency safe.
// The goroutine started by asyncWorkerQueue.Run is the only reader of
// the deadline queue; it writes each dequeued item into this channel,
// and asyncWorkerQueue.Dequeue reads from the channel.
// The channel is created by Run, so it is nil until Run has been called.
jobChan chan queue.QueueItem
}
// newAsyncWorkerQueue builds an asyncWorkerQueue on top of ddlQueue with a
// backpointer to the owning engine. The job channel is left nil here; it
// is created when Run is called.
func newAsyncWorkerQueue(
	ddlQueue queue.DeadlineQueue,
	engine *engine,
) *asyncWorkerQueue {
	workerQueue := new(asyncWorkerQueue)
	workerQueue.queue = ddlQueue
	workerQueue.engine = engine
	return workerQueue
}
// Run implements async.Queue. It allocates a fresh job channel and starts
// a goroutine that repeatedly pulls the next item from the deadline queue
// and hands it to Dequeue callers through that channel.
//
// The goroutine exits when either
//   - the deadline queue's Dequeue returns nil (presumably because
//     stopChan fired; confirm against queue.DeadlineQueue), or
//   - stopChan fires while a dequeued item is waiting to be handed over,
//     in which case that item is not delivered.
//
// On every exit path the job channel is closed. A receive on a closed
// channel yields the zero value, so every current and future Dequeue
// caller gets nil without blocking. A blocking send of nil would wake
// only a single caller and would leak this goroutine if no caller were
// left to receive it.
func (q *asyncWorkerQueue) Run(stopChan chan struct{}) {
	// Keep a local handle so the goroutine always sends on and closes the
	// channel it was started with, even if Run is invoked again later and
	// replaces q.jobChan.
	jobChan := make(chan queue.QueueItem)
	q.jobChan = jobChan

	go func() {
		// This goroutine is the only sender, so it owns closing the channel.
		defer close(jobChan)

		for {
			queueItem := q.queue.Dequeue(stopChan)
			if queueItem == nil {
				return
			}

			select {
			case jobChan <- queueItem:
			case <-stopChan:
				return
			}
		}
	}()
}
// Enqueue satisfies async.Queue. The job must be an *asyncWorkerQueueItem
// (any other type panics on the assertion); its queue item is placed on
// the deadline queue with the deadline carried by the job.
func (q *asyncWorkerQueue) Enqueue(job async.Job) {
	pending := job.(*asyncWorkerQueueItem)
	q.queue.Enqueue(pending.item, pending.deadline)
}
// Dequeue satisfies async.Queue. It blocks until the goroutine started by
// Run delivers a value on the job channel. A nil value (sent explicitly or
// produced by a closed channel) is reported as a nil job; any other value
// is wrapped in a job that points back at the engine.
func (q *asyncWorkerQueue) Dequeue() async.Job {
	if next := <-q.jobChan; next != nil {
		return &asyncWorkerQueueItem{
			item:   next,
			engine: q.engine,
		}
	}
	return nil
}
// Engine defines the goal state engine interface.
type Engine interface {
// Start starts the goal state engine processing.
Start()
// Enqueue is used to enqueue an entity into the goal state
// engine for evaluation. The parameter deadline specifies when
// the entity should be evaluated.
// Enqueue creates state in the goal state engine which will persist
// until the caller explicitly calls Delete to clean up this state.
Enqueue(entity Entity, deadline time.Time)
// IsScheduled is used to determine if a given entity is queued in
// the deadline queue for evaluation.
IsScheduled(entity Entity) bool
// Delete is used to clean up the state created in the goal state
// engine for the entity. It is the caller's responsibility to
// explicitly call Delete when an entity is being removed from the system.
// If Delete is not called, the state in the goal state engine will
// persist forever.
Delete(entity Entity)
// Stop stops the goal state engine processing.
Stop()
}
// NewEngine returns a new goal state engine object.
//
// numWorkerThreads is passed to the worker pool as its MaxWorkers option.
// failureRetryDelay is the amount added to an entity's backoff on each
// failed action, and maxRetryDelay caps that backoff. parentScope is used
// to create both the engine metrics and the deadline queue metrics.
func NewEngine(
	numWorkerThreads int,
	failureRetryDelay time.Duration,
	maxRetryDelay time.Duration,
	parentScope tally.Scope) Engine {
	e := &engine{
		entityMap:         make(map[string]*entityMapItem),
		failureRetryDelay: failureRetryDelay,
		maxRetryDelay:     maxRetryDelay,
		mtx:               NewMetrics(parentScope),
	}

	// The deadline queue is wrapped so that the worker pool can consume it
	// through the async.Queue interface.
	ddlQueue := queue.NewDeadlineQueue(queue.NewQueueMetrics(parentScope))
	workerQueue := newAsyncWorkerQueue(ddlQueue, e)

	e.pool = async.NewPool(
		async.PoolOptions{MaxWorkers: numWorkerThreads},
		workerQueue,
	)
	return e
}
// entityMapItem stores the per-entity state kept by the goal state engine.
type entityMapItem struct {
sync.RWMutex // the mutex to synchronize access to this object
entity Entity // the entity object
queueItem *queue.Item // the corresponding queue item in the deadline queue
// delay is used by goal state to track the exponential backoff of the
// scheduling duration in case entity actions keep returning an error.
// It is reset to zero after an action executes successfully.
delay time.Duration
}
// engine implements the goal state engine interface (Engine).
type engine struct {
sync.RWMutex // the mutex to synchronize access to this object
entityMap map[string]*entityMapItem // map to store the entity items, keyed by entity ID
// NOTE(review): stopChan is not assigned or read by any code visible in
// this file; confirm whether it is still needed.
stopChan chan struct{} // channel to indicate to deadline queue to stop processing
pool *async.Pool // worker pool to process queue items after dequeue
// Global configuration for the delay added on each retry after an error.
failureRetryDelay time.Duration
// Global configuration for the absolute maximum duration between
// retries. The backoff delay will be capped at this value.
maxRetryDelay time.Duration
mtx *Metrics // goal state engine metrics
}
// addItemToEntityMap returns the deadline queue item for id, creating and
// registering a new entity map entry when none exists yet.
// The lookup and the insert happen under a single write lock so that
// concurrent enqueue requests for the same entity all end up sharing one
// queue item. When an entry already exists, it is returned untouched and
// the entity argument is ignored.
func (e *engine) addItemToEntityMap(id string, entity Entity) *queue.Item {
	e.Lock()
	defer e.Unlock()

	if existing, found := e.entityMap[id]; found {
		return existing.queueItem
	}

	created := &entityMapItem{
		entity:    entity,
		queueItem: queue.NewItem(id),
	}
	e.entityMap[id] = created

	// The gauge is refreshed only on inserts, which avoids recomputing the
	// map length on every delete as well.
	e.mtx.totalItems.Update(float64(len(e.entityMap)))
	return created.queueItem
}
// getItemFromEntityMap looks up the entity map entry for id under the
// read lock. It returns nil when the id is not present.
func (e *engine) getItemFromEntityMap(id string) *entityMapItem {
	e.RLock()
	defer e.RUnlock()

	// A missing key yields the zero value, which for a pointer is nil.
	return e.entityMap[id]
}
// deleteItemFromEntityMap removes the entry for id from the entity map
// while holding the write lock. Removing an id that is not present is a
// no-op.
func (e *engine) deleteItemFromEntityMap(id string) {
	e.Lock()
	delete(e.entityMap, id)
	e.Unlock()
}
// getFailureRetryDelay returns the failureRetryDelay global configuration,
// read under the engine's read lock.
func (e *engine) getFailureRetryDelay() time.Duration {
	e.RLock()
	retryDelay := e.failureRetryDelay
	e.RUnlock()
	return retryDelay
}
// getMaxRetryDelay returns the maxRetryDelay global configuration, read
// under the engine's read lock.
func (e *engine) getMaxRetryDelay() time.Duration {
	e.RLock()
	ceiling := e.maxRetryDelay
	e.RUnlock()
	return ceiling
}
// Enqueue implements Engine. It makes sure the entity has an entry in the
// entity map (creating one if needed) and submits the entry's queue item
// to the worker pool, to be scheduled at the given deadline.
func (e *engine) Enqueue(entity Entity, deadline time.Time) {
	queueItem := e.addItemToEntityMap(entity.GetID(), entity)
	e.pool.Enqueue(&asyncWorkerQueueItem{
		item:     queueItem,
		deadline: deadline,
	})
}
// IsScheduled implements Engine. It returns false when the entity has no
// entry in the entity map; otherwise it reports, under the entry's read
// lock, whether the entry's queue item is currently scheduled.
func (e *engine) IsScheduled(entity Entity) bool {
	tracked := e.getItemFromEntityMap(entity.GetID())
	if tracked != nil {
		tracked.RLock()
		defer tracked.RUnlock()
		return tracked.queueItem.IsScheduled()
	}
	return false
}
// Delete implements Engine. It removes the entity's entry from the entity
// map. The deadline queue is not touched here, so an item that is already
// queued is only discarded when it is dequeued and found to be missing
// from the map.
func (e *engine) Delete(entity Entity) {
	e.deleteItemFromEntityMap(entity.GetID())
}
// calculateDelay advances the backoff delay of entityItem after a failed
// action: the delay grows by failureRetryDelay and is capped at
// maxRetryDelay. It does not lock entityItem itself; its caller runActions
// already holds the entity item's lock.
func (e *engine) calculateDelay(entityItem *entityMapItem) {
	backoff := entityItem.delay + e.getFailureRetryDelay()
	if backoff > e.getMaxRetryDelay() {
		backoff = e.getMaxRetryDelay()
	}
	entityItem.delay = backoff
}
// runActions fetches the action list for an entity and executes the
// actions in order while holding the entity item's lock.
// The return value reschedule reports whether the entity needs to be put
// back into the deadline queue, and delay is how long from time.Now() the
// entity should wait before being evaluated again.
// Execution stops at the first action that fails: the backoff delay is
// increased and (true, delay) is returned. If the action list is empty or
// every action succeeds, (false, 0) is returned.
// Enqueue should always happen outside the entityItem lock, hence the
// re-enqueue is left to the caller.
func (e *engine) runActions(entityItem *entityMapItem) (reschedule bool, delay time.Duration) {
	entityItem.Lock()
	defer entityItem.Unlock()

	// The actions are derived from the entity's current state and goal state.
	entity := entityItem.entity
	ctx, cancel, actions := entity.GetActionList(
		entity.GetState(),
		entity.GetGoalState(),
	)
	if cancel != nil {
		defer cancel()
	}

	for _, act := range actions {
		startedAt := time.Now()
		err := act.Execute(ctx, entityItem.entity)
		runTimer := e.mtx.scope.
			Tagged(map[string]string{"action": act.Name}).
			Timer("run_duration")
		runTimer.Record(time.Since(startedAt))

		if err == nil {
			// A successful action clears any accumulated backoff.
			entityItem.delay = 0
			continue
		}

		failureFields := log.Fields{
			"entity_id":   entityItem.entity.GetID(),
			"action_name": act.Name,
		}
		log.WithError(err).
			WithFields(failureFields).
			Info("goal state action failed to execute")

		// Back off and have the entity reevaluated later.
		e.calculateDelay(entityItem)
		return true, entityItem.delay
	}

	return false, 0
}
// processEntityAfterDequeue evaluates an entity whose queue item has just
// been dequeued from the deadline queue: it runs the entity's actions and,
// when they ask for it, submits the same queue item again with a deadline
// of now plus the returned delay.
func (e *engine) processEntityAfterDequeue(queueItem *queue.Item) {
	if tracked := e.getItemFromEntityMap(queueItem.GetString()); tracked != nil {
		if retry, backoff := e.runActions(tracked); retry {
			e.pool.Enqueue(&asyncWorkerQueueItem{
				item:     queueItem,
				deadline: time.Now().Add(backoff),
			})
		}
		return
	}

	// An object deleted from the entity map may still be sitting in the
	// deadline queue. An example in Peloton is task items, which are
	// untracked when their job gets untracked. Because the job is
	// untracked from the job goal state, that can happen before the task
	// has been dequeued from the deadline queue, which lands here.
	log.WithField("goal_state_id", queueItem.GetString()).
		Debug("did not find the identifier in the entity map")
	e.mtx.missingItems.Inc(1)
}
// Start implements Engine. It starts the worker pool and logs that the
// engine has started. The engine's write lock is held for the duration of
// the call.
func (e *engine) Start() {
e.Lock()
defer e.Unlock()
e.pool.Start()
log.Info("goalstate.Engine started")
}
// Stop implements Engine. It stops the worker pool and logs that the
// engine has stopped. The engine's write lock is held for the duration of
// the call.
// NOTE(review): workers take the engine's read lock in
// getItemFromEntityMap while processing an item. If pool.Stop blocks until
// in-flight workers have exited, holding the write lock here could
// deadlock; confirm against the async.Pool implementation.
func (e *engine) Stop() {
e.Lock()
defer e.Unlock()
e.pool.Stop()
log.Info("goalstate.Engine stopped")
}