pkg/sync/pool/pool.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pool

import (
	"fmt"
	"io"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	multierror "github.com/hashicorp/go-multierror"
	"github.com/pkg/errors"
)

const (
	// StoppedStatus is the default status for a pool. The pool is stopped
	// and no work is being processed.
	StoppedStatus = iota

	// StartingStatus represents the pool status when it's being started.
	// This means that the workers start processing the work incrementally.
	StartingStatus

	// StartedStatus means that all the workers have been started.
	StartedStatus

	// IdleStatus means that all the workers have been started but no work
	// is being processed.
	IdleStatus

	// FinishedStatus means that all the workers are started and the queued
	// work has been completely processed.
	FinishedStatus

	// StoppingStatus represents the pool status when it's being stopped.
	// Some items might still be in flight.
	StoppingStatus

	// StoppedTimeout represents the pool status when it has been stopped
	// and some of the workers have been forcefully stopped. This means
	// that the work that was being done by N workers was not finished, so
	// the user that consumes the Pool object might want to perform some
	// checks or clean-ups to find out which work wasn't completed.
	StoppedTimeout

	// StoppedSuccess represents the pool status when it has been stopped
	// without hitting the stop timeout. The pool can still contain queued
	// events that have been moved to the leftover list.
	StoppedSuccess
)

var (
	// ErrAddOperationTimedOut is returned when the add timeout is exceeded.
	ErrAddOperationTimedOut = errors.New("pool: failed adding work, queue full")

	// ErrStopOperationTimedOut is returned when the stop timeout is exceeded.
	ErrStopOperationTimedOut = errors.New("pool: stop timeout exceeded")

	// ErrAlreadyStarted is returned when Start is called on a non-stopped pool.
	ErrAlreadyStarted = errors.New("pool: cannot start a non stopped pool")

	// ErrAlreadyStopped is returned when Stop is called on a stopped pool.
	ErrAlreadyStopped = errors.New("pool: cannot stop an already stopped pool")

	// ErrAlreadyStopping is returned when Stop is called on a stopping pool.
	ErrAlreadyStopping = errors.New("pool: cannot stop a stopping pool")

	// ErrCannotAddWorkToStoppingPool is returned when work is added to a
	// stopping or stopped pool.
	ErrCannotAddWorkToStoppingPool = errors.New("pool: cannot add work to stopping pool")

	// ErrCannotWaitOnStoppedPool is returned by Wait() when the pool is
	// stopped.
	ErrCannotWaitOnStoppedPool = errors.New("pool: cannot wait for workers to finish on a stopped pool")

	// ErrCannotGetLeftovers is returned when the pool is not in a stopped
	// state.
	ErrCannotGetLeftovers = errors.New("pool: cannot get the work leftovers on a non stopped pool")

	failFastSetStopMsg = `pool: fail fast is set and received an error, stopping pool...`
)

var (
	defaultStatus = "unknown"
	statusMap     = map[uint32]string{
		StoppedStatus:  "stopped",
		StartingStatus: "starting",
		IdleStatus:     "idle",
		StartedStatus:  "running",
		FinishedStatus: "finished",
		StoppingStatus: "stopping",
		StoppedTimeout: "stopped timeout",
		StoppedSuccess: "stopped success",
	}
)

// Pool is a generic worker pool implementation that can be used to complete a
// series of tasks concurrently and obtain any errors that have been returned
// by the workers. The usage of the pool is quite simple by itself and relies
// on the constructor function NewPool().
type Pool struct {
	// Number of workers to create within the worker pool.
	size uint16

	// RunFunc that is used by each worker to process work.
	run RunFunc

	// internal queue used to feed the work to the worker pool.
	queue chan Validator

	// leftovers from a previously stopped pool, meaning work that did not
	// get processed and work that workers were processing and didn't
	// complete. The unfinished items will be ordered first, followed by
	// the queue contents that didn't get processed.
	leftovers chan Validator

	// error channel where all of the worker errors are received.
	errors chan error

	// signals is the structure that contains the sync signals that are
	// used to trigger changes in the Pool.
	signals Signals

	// state contains the internal pool state.
	state *State

	// Pool timeouts for the different operations.
	timeouts Timeout

	// writer where any (log, info) messages will be sent.
	writer io.Writer

	// failFast can be set to stop the whole pool when any of the workers
	// returns with an error.
	failFast bool
}
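// A minimal lifecycle sketch (illustrative only: the Params fields are
// assumed from how this file reads params.Size, params.Run, params.Timeout,
// params.Writer and params.FailFast, and myRunFunc, myTimeouts and myWork
// are hypothetical placeholders):
//
//	p, err := NewPool(Params{Size: 4, Run: myRunFunc, Timeout: myTimeouts})
//	if err != nil {
//		return err
//	}
//	if err := p.Start(); err != nil {
//		return err // e.g. ErrAlreadyStarted
//	}
//	if notAdded, err := p.Add(myWork...); err != nil {
//		_ = notAdded // items that didn't fit in the queue
//	}
//	if err := p.Wait(); err != nil {
//		// multierror with anything the workers returned
//	}
//	return p.Stop()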
// Signals contains all of the channels that are used to trigger different
// status changes in the Pool.
type Signals struct {
	// Stop is a channel that is used to signal the workers to stop.
	Stop chan struct{}

	// Stopped is a channel that is used for backwards communication with
	// the stopper to verify that the worker has been stopped.
	Stopped chan bool

	// Finish is a channel used by workers to signal that they've finished
	// processing a task.
	Finish chan struct{}

	// Added is a channel used by the pool to signal that a work item has
	// been pushed to the queue for processing.
	Added chan struct{}

	// StopMonitor is used to stop the monitoring goroutine that updates
	// the pool's internal state.
	StopMonitor chan struct{}
}

// State contains the pool state.
type State struct {
	// Queued is the number of work items that have been added to the queue.
	Queued Counter

	// Processed is the number of work items that have been processed by a
	// worker.
	Processed Counter

	// Status is the pool's global status.
	Status Counter

	// monitoring indicates whether the pool monitor is running.
	monitoring bool

	// Errors that have been returned by the workers.
	Errors *Errors
}

// Errors wraps a multierror.Error with a mutex so that it can be safely
// used when accessed concurrently.
type Errors struct {
	err *multierror.Error
	mu  sync.RWMutex
}

// Add appends one or more errors to the error list.
func (e *Errors) Add(errs ...error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.err == nil {
		e.err = new(multierror.Error)
	}
	e.err = multierror.Append(e.err, errs...)
}

// Error returns the accumulated errors, or nil when none were recorded.
func (e *Errors) Error() error {
	e.mu.RLock()
	defer e.mu.RUnlock()
	if e.err == nil {
		return nil
	}
	return e.err.ErrorOrNil()
}
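// A small sketch of concurrent use of Errors (the goroutine count and the
// error messages are illustrative only):
//
//	var errs Errors
//	var wg sync.WaitGroup
//	for i := 0; i < 3; i++ {
//		wg.Add(1)
//		go func(n int) {
//			defer wg.Done()
//			errs.Add(fmt.Errorf("worker %d failed", n))
//		}(i)
//	}
//	wg.Wait()
//	fmt.Println(errs.Error()) // a multierror wrapping the 3 errors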
// Counter represents a safe uint32 that can be used as a shared counter.
type Counter struct {
	value uint32
}

// Add increments the counter by incr.
func (c *Counter) Add(incr uint32) {
	atomic.AddUint32(&c.value, incr)
}

// Set overwrites the value of the counter in favour of the passed value.
func (c *Counter) Set(n uint32) {
	atomic.SwapUint32(&c.value, n)
}

// Get obtains the value of the counter.
func (c *Counter) Get() uint32 {
	return atomic.LoadUint32(&c.value)
}

// NewPool initializes a new Pool from a set of parameters.
func NewPool(params Params) (*Pool, error) {
	if err := params.Validate(); err != nil {
		return nil, err
	}

	// This buffer roughly means that each worker can have 128 items in
	// the queue.
	var queueBuffer = params.Size * 128
	// Since there can be in-flight items already picked up by the workers,
	// the max leftover size should be the buffer + the number of workers.
	var leftoverBuffer = queueBuffer + params.Size

	var pool = Pool{
		size:      params.Size,
		run:       params.Run,
		timeouts:  params.Timeout,
		queue:     make(chan Validator, queueBuffer),
		leftovers: make(chan Validator, leftoverBuffer),
		signals: Signals{
			Stop:        make(chan struct{}),
			Stopped:     make(chan bool),
			Finish:      make(chan struct{}),
			Added:       make(chan struct{}),
			StopMonitor: make(chan struct{}),
		},
		state: &State{
			Errors:     new(Errors),
			monitoring: true,
		},
		errors:   make(chan error),
		writer:   params.Writer,
		failFast: params.FailFast,
	}

	go pool.monitor(nil)
	return &pool, nil
}
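// For illustration, a pool created with Size = 4 gets a queue buffer of
// 4 * 128 = 512 items and a leftover buffer of 512 + 4 = 516 items, so even
// when all 4 workers are each holding an in-flight item at stop time, every
// unprocessed item still fits in the leftovers channel.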
// Start starts the workers in the worker pool and the internal goroutines
// that the pool relies on.
func (p *Pool) Start() error {
	if p.Status() > StoppedStatus && p.Status() < StoppedTimeout {
		return ErrAlreadyStarted
	}

	if !p.state.monitoring {
		go p.monitor(nil)
	}

	// If the pool was previously stopped, recreate the channels.
	if p.Status() >= StoppedTimeout {
		if len(p.queue) == 0 {
			p.queue = make(chan Validator, cap(p.queue))
		}
		if len(p.leftovers) == 0 {
			p.leftovers = make(chan Validator, cap(p.leftovers))
		}
	}

	p.setStatus(StartingStatus)
	for worker := 0; worker < int(p.size); worker++ {
		<-StartWorker(Worker{
			Queue:       p.queue,
			Stop:        p.signals.Stop,
			Stopped:     p.signals.Stopped,
			Finished:    p.signals.Finish,
			Errors:      p.errors,
			Leftovers:   p.leftovers,
			Run:         p.run,
			StopTimeout: p.timeouts.Stop,
		})
	}
	return nil
}

// StartWorker starts a worker in the background, waiting for the goroutine
// to actually be scheduled. It returns a channel that can be used to wait
// until the goroutine has been run, as in the code below:
//
//	wait := StartWorker(Worker{})
//	// This blocks execution
//	<-wait
func StartWorker(worker Worker) chan struct{} {
	var spawned = make(chan struct{})
	go func(c chan<- struct{}) {
		go Work(worker)
		c <- struct{}{}
	}(spawned)
	return spawned
}

// monitor receives signals from the Finish and Added signal channels and
// updates the counters accordingly. It also listens to any errors that might
// be received through the error channel and adds them to the error state.
// It additionally watches an interrupt channel that stops the pool on
// Interrupt and SIGTERM signals.
func (p *Pool) monitor(interrupt chan os.Signal) {
	if interrupt == nil {
		interrupt = make(chan os.Signal, 1)
	}
	signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)

	var interrupted bool
	for {
		select {
		case <-p.signals.Finish:
			p.state.Processed.Add(1)
		case <-p.signals.Added:
			p.state.Queued.Add(1)
		case err := <-p.errors:
			p.state.Errors.Add(err)
			if p.failFast {
				if p.writer != nil {
					fmt.Fprintln(p.writer, failFastSetStopMsg)
				}
				//nolint
				go p.Stop()
			}
		case <-interrupt:
			if interrupted {
				continue
			}
			interrupted = true
			if p.writer != nil {
				fmt.Fprintln(p.writer, "pool: received interrupt, stopping pool...")
			}
			//nolint
			go p.Stop()
		case <-p.signals.StopMonitor:
			// Mark the monitor as stopped and terminate the goroutine; a
			// bare "break" here would only exit the select statement.
			p.state.monitoring = false
			return
		}
		p.updateState()
	}
}

// updateState updates the Pool state when the pool has already been started,
// setting the state to Idle or Finished.
func (p *Pool) updateState() {
	if p.Status() >= StoppingStatus || p.Status() == StoppedStatus {
		return
	}

	var workersIdle = p.state.Processed.Get() == p.state.Queued.Get()
	var emptyQueue = p.state.Queued.Get() == 0
	var workersProcessing = p.state.Queued.Get() > p.state.Processed.Get()
	if workersProcessing {
		p.setStatus(StartedStatus)
	}

	if emptyQueue && workersIdle && p.Status() != IdleStatus {
		p.setStatus(IdleStatus)
	}

	if workersIdle && !emptyQueue && p.Status() != FinishedStatus {
		p.setStatus(FinishedStatus)
	}
}

// setStatus sets the pool to the specified status.
func (p *Pool) setStatus(status int) {
	p.state.Status.Set(uint32(status))
}

// Wait will stop execution until the pool has finished processing all of the
// work that it had in the queue. It returns any errors that the workers
// might have returned.
func (p *Pool) Wait() error {
	if p.Status() == StoppedStatus {
		return ErrCannotWaitOnStoppedPool
	}

	for {
		<-time.After(time.Millisecond)
		if p.Status() == FinishedStatus || isStopped(p.Status()) {
			return p.Result()
		}
	}
}

// Result returns the results from the work that was done by the workers,
// namely any errors, in the multierror format.
func (p *Pool) Result() error {
	if p.state.Errors != nil {
		return p.state.Errors.Error()
	}
	return nil
}

// Leftovers obtains a list of unfinished work in the following order: first,
// any items that might have been in flight and did not complete before
// hitting the stop timeout; following those, the items that did not get
// processed by a worker. This function can only be called after the pool has
// been stopped, and at most once per stop, since it closes the internal
// leftovers channel.
func (p *Pool) Leftovers() ([]Validator, error) {
	if p.state.Status.Get() < StoppedTimeout {
		return nil, ErrCannotGetLeftovers
	}

	// Close the channel before entering the loop, so the loop ranges over
	// the buffered items and exits when the channel has no more items in it.
	close(p.leftovers)
	var leftovers = make([]Validator, 0, len(p.leftovers))
	for l := range p.leftovers {
		leftovers = append(leftovers, l)
	}
	return leftovers, nil
}
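// A sketch of recovering unprocessed work after a timed-out stop (the
// handling of each item is illustrative only):
//
//	if err := p.Stop(); err == ErrStopOperationTimedOut {
//		leftovers, err := p.Leftovers()
//		if err != nil {
//			return err
//		}
//		for _, item := range leftovers {
//			// inspect, log or requeue the unprocessed item
//			fmt.Println("unprocessed item:", item)
//		}
//	}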
// isStopped determines whether the status is a stopped status.
func isStopped(s uint32) bool {
	return s >= StoppedTimeout || s == StoppedStatus
}

// Stop attempts to gracefully shut down the workers in the pool. If the stop
// timeout is reached, the work that was being processed by the workers is
// sent to the leftover queue, as are any items that were not processed, and
// ErrStopOperationTimedOut is returned.
func (p *Pool) Stop() error {
	if isStopped(p.Status()) {
		return ErrAlreadyStopped
	}
	if p.Status() == StoppingStatus {
		return ErrAlreadyStopping
	}

	p.setStatus(StoppingStatus)
	err := StopWorkers(StopParams{
		Size:               int(p.size),
		Stop:               p.signals.Stop,
		StoppedWithTimeout: p.signals.Stopped,
	})

	close(p.queue)
	drain(p.queue, p.leftovers)

	if err == ErrStopOperationTimedOut {
		p.setStatus(StoppedTimeout)
	}
	if err == nil {
		p.setStatus(StoppedSuccess)
	}

	p.signals.StopMonitor <- struct{}{}
	return err
}

// drain dumps the items from the first queue into the second queue. It
// assumes that the source channel is already closed, since this function is
// only useful for buffered queues.
func drain(from <-chan Validator, to chan<- Validator) {
	for w := range from {
		to <- w
	}
}

// Status returns the numeric status of the pool.
func (p *Pool) Status() uint32 {
	return p.state.Status.Get()
}

// StatusText obtains the current pool status as a string; for all available
// states see the statusMap, which contains the mappings from int to string.
func StatusText(status uint32) string {
	if status, ok := statusMap[status]; ok {
		return status
	}
	return defaultStatus
}

// Add adds N amount of work to the pool's queue, timing out if the queue is
// full for more than the configured timeouts.Add. If an error is returned it
// will be ErrAddOperationTimedOut, and the first return value is the list of
// work that didn't get added, leaving any retries to the user.
func (p *Pool) Add(work ...Validator) ([]Validator, error) {
	if p.Status() >= StoppingStatus {
		return work, ErrCannotAddWorkToStoppingPool
	}

	var err error
	var leftover []Validator
	for _, w := range work {
		select {
		case p.queue <- w:
			p.signals.Added <- struct{}{}
		case <-time.After(p.timeouts.Add):
			err = ErrAddOperationTimedOut
			leftover = append(leftover, w)
		}
	}
	return leftover, err
}
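// A sketch of retrying work that timed out on Add (the retry bound and the
// work slice contents are illustrative only):
//
//	work := []Validator{itemA, itemB}
//	for attempt := 0; attempt < 3 && len(work) > 0; attempt++ {
//		var err error
//		work, err = p.Add(work...)
//		if err == ErrCannotAddWorkToStoppingPool {
//			break // the pool is shutting down; keep `work` for later
//		}
//	}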