service/worker/scanner/executor/executor.go (97 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package executor
import (
"sync"
"sync/atomic"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
)
type (
// Task defines the interface for a runnable task
Task interface {
// Run should execute the task and return well known status codes
Run() TaskStatus
}
// Executor defines the interface for any executor which can
// accept tasks and execute them based on some policy
Executor interface {
// Start starts the executor
Start()
// Stop stops the executor
Stop()
// Submit is a blocking call that accepts a task to execute
Submit(task Task) bool
// TaskCount returns the number of outstanding tasks in the executor
TaskCount() int64
}
// fixedPoolExecutor is an implementation of an executor that uses fixed size
// goroutine pool. This executor also supports deferred execution of tasks
// for fairness
fixedPoolExecutor struct {
size int
maxDeferred int
runQ *runQueue
outstanding int64
status int32
metrics metrics.Client
metricScope int
stopC chan struct{}
stopWG sync.WaitGroup
}
// TaskStatus is the return code from a Task
TaskStatus int
)
const (
// TaskStatusDone indicates task is finished successfully
TaskStatusDone TaskStatus = iota
// TaskStatusDefer indicates task should be scheduled again for execution at later time
TaskStatusDefer
// TaskStatusErr indicates task is finished with errors
TaskStatusErr
)
// NewFixedSizePoolExecutor returns an implementation of task executor that maintains
// a fixed size pool of goroutines.The returned executor also allows task processing to
// to be deferred for fairness. To defer processing of a task, simply return TaskStatsDefer
// from your task.Run method. When a task is deferred, it will be added to the tail of a
// deferredTaskQ which in turn will be processed after the current runQ is drained
func NewFixedSizePoolExecutor(size int, maxDeferred int, metrics metrics.Client, scope int) Executor {
stopC := make(chan struct{})
return &fixedPoolExecutor{
size: size,
maxDeferred: maxDeferred,
runQ: newRunQueue(size, stopC),
metrics: metrics,
metricScope: scope,
stopC: stopC,
}
}
// Start starts the executor
func (e *fixedPoolExecutor) Start() {
if !atomic.CompareAndSwapInt32(&e.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}
for i := 0; i < e.size; i++ {
e.stopWG.Add(1)
go e.worker()
}
}
// Stop stops the executor
func (e *fixedPoolExecutor) Stop() {
if !atomic.CompareAndSwapInt32(&e.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}
close(e.stopC)
e.stopWG.Wait()
}
// Submit is a blocking call that accepts a task for execution
func (e *fixedPoolExecutor) Submit(task Task) bool {
if !e.alive() {
return false
}
added := e.runQ.add(task)
if added {
atomic.AddInt64(&e.outstanding, 1)
}
return added
}
// TaskCount returns the total of number of tasks currently outstanding
func (e *fixedPoolExecutor) TaskCount() int64 {
return atomic.LoadInt64(&e.outstanding)
}
func (e *fixedPoolExecutor) worker() {
defer e.stopWG.Done()
for e.alive() {
task, ok := e.runQ.remove()
if !ok {
return
}
status := task.Run()
if status == TaskStatusDefer {
if e.runQ.deferredCount() < e.maxDeferred {
e.runQ.addAndDefer(task)
e.metrics.IncCounter(e.metricScope, metrics.ExecutorTasksDeferredCount)
continue
}
e.metrics.IncCounter(e.metricScope, metrics.ExecutorTasksDroppedCount)
}
atomic.AddInt64(&e.outstanding, -1)
}
}
func (e *fixedPoolExecutor) alive() bool {
return atomic.LoadInt32(&e.status) == common.DaemonStatusStarted
}