pkg/sync/pool/worker.go (68 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pool
import (
"sync"
"time"
)
// Worker is the structure that contains the configuration that a worker uses
// when it's spawned.
type Worker struct {
// Work Queue where the worker obtains its work.
Queue <-chan Validator
// Stop channel is used to signal a worker to Stop processing items from
// the queue.
Stop <-chan struct{}
// Stopped is the channel used to communicate back with the stopper to signal
// that the worker has been successfully Stopped.
Stopped chan<- bool
// Finished channel is used to signal that the work has been completed. If
// the worker happens to be stopping and the stopTimeout is hit before the
// item has been processed by the run function the Finished signal is also
// sent.
Finished chan<- struct{}
// error channel where the errors from the work will land.
Errors chan<- error
// Leftovers of any incompleted work items that the worker couldn't finish
// before hitting the stop timeout.
Leftovers chan<- Validator
// Run is the function that the worker will Run on each work item received.
Run RunFunc
// controls the time.Duration to wait when a stop signal is received. If the
// timeout is exceeded before the work is completed the current work will be
// sent to the leftover queue.
StopTimeout time.Duration
}
// StopParams is consumed by StopWorkers so a set of workers can be stopped.
type StopParams struct {
// number of workers to stop
Size int
// Stop is used to send a signal to a worker to make it stop.
Stop chan<- struct{}
// Stopped is a signal given back by the worker that is being stopped
// when it has stopped, either timing out or successfully.
StoppedWithTimeout <-chan bool
}
// Work receives a Worker structure that contains the configuration to control
// the worker behaviour. It processes work using the worker.run function on
// worker.queue receive.
// When a stop signal is received it will wait the time.Duration defined by the
// stopTimeout and forcefully exit without waiting for the work to be completed.
func Work(worker Worker) {
var timedout bool
defer func() { worker.Stopped <- timedout }()
var done = make(chan struct{})
for {
select {
case task := <-worker.Queue:
// This is necessary to allow the work to happen in the background
// and still react to any sent to worker.stop and be able to clean
// shutdown the worker even if the work is in flight.
go func(w Validator, d chan<- struct{}) {
if err := worker.Run(w); err != nil {
worker.Errors <- err
}
d <- struct{}{}
}(task, done)
select {
// Receives the done signal when the work is done.
case <-done:
worker.Finished <- struct{}{}
// Handles the case where the worker is processing a work item and
// and a stop signal is received while the work is in flight.
case <-worker.Stop:
// This gives one last chance to the worker to complete the
// work before returning with a timeout and sending the item
// to the leftover queue.
select {
case <-done:
worker.Finished <- struct{}{}
case <-time.After(worker.StopTimeout):
worker.Leftovers <- task
timedout = true
}
return
}
// Handle case where the worker is idle and the stop signal is received
case <-worker.Stop:
return
}
}
}
// StopWorkers stops all of the workers in parallel trying to honor their
// timeout settings. If the worker cannot be stopped before the params.timeout
// the function returns ErrStopOperationTimedOut.
func StopWorkers(params StopParams) error {
var err = make(chan error, params.Size)
var wg sync.WaitGroup
wg.Add(params.Size)
for index := 0; index < params.Size; index++ {
go func() {
defer wg.Done()
params.Stop <- struct{}{}
if v := <-params.StoppedWithTimeout; v {
err <- ErrStopOperationTimedOut
}
}()
}
wg.Wait()
close(err)
return <-err
}