unison/taskgroup.go (141 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package unison
import (
"context"
"errors"
"fmt"
"sync"
"github.com/elastic/go-concert/ctxtool"
)
// Group is an interface that can be used to start tasks. The tasks started
// will spawn go-routines, and receive their shutdown signal via the
// context.Context passed to the task function.
type Group interface {
// Go returns an error if the task can not be started. The error returned by
// the task itself is not supposed to be returned by Go, as that error is
// assumed to be generated asynchronously.
Go(fn func(context.Context) error) error
}
// Canceler is an interface that can be used to pass along some shutdown
// signal to child goroutines. Its methods have the same signatures as the
// Done and Err methods of context.Context, so any context.Context can be used
// as a Canceler.
type Canceler interface {
// Done returns the channel used to signal shutdown.
// NOTE(review): presumably closed on shutdown, like context.Context.Done — confirm.
Done() <-chan struct{}
// Err reports the shutdown reason.
// NOTE(review): presumably nil while Done is still open, like context.Context.Err — confirm.
Err() error
}
// closedGroup is the Group implementation returned by ClosedGroup. It never
// starts a task.
type closedGroup struct {
// err is the error reported by every call to Go.
err error
}
// ClosedGroup returns a Group that refuses to start any go-routine.
// Every call to Go on the returned Group reports reportedError. When
// reportedError is nil, ErrGroupClosed is reported instead.
func ClosedGroup(reportedError error) Group {
	group := &closedGroup{err: ErrGroupClosed}
	if reportedError != nil {
		group.err = reportedError
	}
	return group
}
// Go implements Group. The given function is never executed; Go always
// returns the error stored in the closedGroup.
func (c closedGroup) Go(_ func(context.Context) error) error {
return c.err
}
// TaskGroup implements the Group interface. Once the group is shutting down,
// no more goroutines can be created via Go.
// The Stop method of TaskGroup will block until all sub-tasks have returned.
// Errors from sub-tasks are collected. The Stop method collects the recorded
// errors and returns a single error summarizing them.
//
// By default (OnQuit not set) the StopOnError handler is used: all sub-tasks
// are signalled to shut down once a task returns an error other than
// context.Canceled. This behavior can be modified by setting OnQuit.
//
// The zero value of TaskGroup is fully functional. OnQuit and MaxErrors are
// read without synchronization by the spawned go-routines, and must not be
// modified after the first go-routine has been spawned.
type TaskGroup struct {
// OnQuit configures the behavior when a sub-task returned. The handler is
// called with the error returned by the sub-task, and selects the action
// to take. If the handler requests a shutdown, a shutdown signal is
// passed to all tasks, and Go will fail on attempts to start new tasks.
// Next to the action, the OnQuit handler also returns the error value
// to be recorded. The context.Canceled error will never be recorded.
//
// Common OnQuit handlers are given by ContinueOnErrors, RestartOnError,
// StopAll, StopOnError, and StopOnErrorOrCancel.
// If OnQuit is not set, StopOnError will be used.
OnQuit TaskGroupQuitHandler
// MaxErrors configures the maximum amount of errors the TaskGroup will record.
// The oldest error will be dropped once the limit is exceeded.
// If MaxErrors is 0, a default limit of 10 is used.
// If MaxErrors is set to a value < 0, all errors will be recorded.
MaxErrors int
// mu guards errs.
mu sync.Mutex
// errs holds the errors recorded from returned sub-tasks.
errs []error
// wg tracks the go-routines started via Go.
wg SafeWaitGroup
// initOnce ensures the lazy initialization in init runs only once.
initOnce sync.Once
// closer is the internal context that is passed to every sub-task.
closer context.Context
// cancel cancels closer.
cancel context.CancelFunc
}
// TaskGroupQuitHandler is the signature of the TaskGroup.OnQuit callback. It
// is called with the error returned by a go-routine owned by the TaskGroup
// (nil if the go-routine returned without error). It returns the action the
// TaskGroup should take, and the error value the TaskGroup should record.
type TaskGroupQuitHandler func(error) (TaskGroupStopAction, error)
// TaskGroupStopAction signals the action to take when a go-routine owned by a
// TaskGroup did quit.
type TaskGroupStopAction uint
const (
// TaskGroupStopActionContinue notifies the TaskGroup that other managed go-routines
// should not be signalled to shutdown. The go-routine that returned is not restarted.
TaskGroupStopActionContinue TaskGroupStopAction = iota
// TaskGroupStopActionShutdown notifies the TaskGroup that shutdown should be signaled
// to all managed go-routines.
TaskGroupStopActionShutdown
// TaskGroupStopActionRestart signals the TaskGroup that the managed go-routine that has
// just returned should be restarted.
TaskGroupStopActionRestart
)
// Compile-time check that *TaskGroup implements the Group interface.
var _ Group = (*TaskGroup)(nil)
// init prepares the internal state of the group. The setup runs at most once,
// on the first call; the parent passed to any later call is ignored.
//
// The setup derives the cancellable group context from parent, and fills in
// the defaults for options that have been left unset: StopOnError for OnQuit,
// and 10 for MaxErrors.
func (t *TaskGroup) init(parent Canceler) {
	setup := func() {
		ctx, cancel := context.WithCancel(ctxtool.FromCanceller(parent))
		t.closer = ctx
		t.cancel = cancel

		if t.OnQuit == nil {
			t.OnQuit = StopOnError
		}
		if t.MaxErrors == 0 {
			t.MaxErrors = 10
		}
	}
	t.initOnce.Do(setup)
}
// TaskGroupWithCancel returns a new TaskGroup whose internal context is
// derived from canceler. The group gets stopped when the parent signals
// shutdown, or when the Stop method is called.
//
// The managed go-routines are signalled to stop once the parent is done, but
// one still might want to call Stop in order to wait for them to return.
//
// Associated resources are cleaned when the parent context is cancelled, or Stop is called.
func TaskGroupWithCancel(canceler Canceler) *TaskGroup {
	group := new(TaskGroup)
	group.init(canceler)
	return group
}
// Go starts fn in a new go-routine and passes it the internal context of the
// group, which signals group shutdown.
//
// Go only returns an error if the task could not be registered with the
// group; in that case fn is never executed. If the group was stopped before
// calling Go, then Go will return the ErrGroupClosed error.
//
// The error returned by fn is not returned by Go. It is handed to the OnQuit
// handler, and the error reported by the handler is recorded and finally
// returned by Wait or Stop. Errors that are, or wrap, context.Canceled are
// never recorded, so a task that merely reports its own cancellation (for
// example a wrapped error from an aborted network call) does not show up as a
// failure.
//
// The action selected by the OnQuit handler decides whether the go-routine
// quits, triggers a shutdown of the whole group, or runs fn again. fn is not
// started (or restarted) once the internal context has been cancelled.
func (t *TaskGroup) Go(fn func(context.Context) error) error {
	t.init(context.Background())

	if err := t.wg.Add(1); err != nil {
		return err
	}

	go func() {
		defer t.wg.Done()

		for t.closer.Err() == nil {
			taskErr := fn(t.closer)
			action, err := t.OnQuit(taskErr)

			// errors.Is also matches errors that wrap context.Canceled, which
			// a plain comparison would record as task failures.
			if err != nil && !errors.Is(err, context.Canceled) {
				t.mu.Lock()
				t.errs = append(t.errs, err)
				if t.MaxErrors > 0 && len(t.errs) > t.MaxErrors {
					// Limit exceeded: drop the oldest recorded error.
					t.errs = t.errs[1:]
				}
				t.mu.Unlock()
			}

			switch action {
			case TaskGroupStopActionContinue:
				return // finish managed go-routine, but keep other routines alive.
			case TaskGroupStopActionShutdown:
				t.signalStop()
				return
			case TaskGroupStopActionRestart:
				// Continue with the loop. Action values not listed here take
				// the same path, as the switch has no default case.
			}
		}
	}()

	return nil
}
// Context returns the task group's internal context. If the group has not
// been initialized yet, it is initialized with a background parent context.
// The internal context will be cancelled if the group's parent context gets
// cancelled, Stop has been called, or an OnQuit handler requested a shutdown.
func (t *TaskGroup) Context() context.Context {
t.init(context.Background())
return t.closer
}
// Wait blocks until all owned child routines have been stopped. Wait does not
// signal the child routines to shut down.
//
// It returns nil if no error has been recorded. Otherwise the recorded errors
// are joined and wrapped into a single "task failures" error.
func (t *TaskGroup) Wait() error {
	failures := t.waitErrors()
	if len(failures) == 0 {
		return nil
	}
	return fmt.Errorf("task failures: %w", errors.Join(failures...))
}
// waitErrors blocks until the wait group reports that all owned go-routines
// have returned, and then returns the errors recorded so far. The error list
// is read while holding the mutex.
func (t *TaskGroup) waitErrors() []error {
	t.wg.Wait()

	t.mu.Lock()
	recorded := t.errs
	t.mu.Unlock()

	return recorded
}
// Stop sends a shutdown signal to all tasks, and waits for them to finish.
// It returns an error that contains the recorded errors (see MaxErrors), or
// nil if no error has been recorded.
func (t *TaskGroup) Stop() error {
// Make sure the internal context and its cancel function exist, even if the
// group has never been used before.
t.init(context.Background())
t.signalStop()
return t.Wait()
}
// signalStop will cancel the internal context, signaling existing go-routines
// to shutdown AND invalidate the TaskGroup, such that no new go-routines can
// be started anymore.
//
// The cancel function is only set by init, so signalStop must not be called
// before the group has been initialized.
func (t *TaskGroup) signalStop() {
// NOTE(review): presumably makes subsequent wg.Add calls fail — confirm against SafeWaitGroup.
t.wg.Close()
t.cancel()
}
// ContinueOnErrors provides a TaskGroup.OnQuit handler that never requests a
// shutdown or restart, no matter what error the go-routine returned. Other
// go-routines owned by the TaskGroup will continue to run.
// The error is passed through unchanged as the error to be recorded.
func ContinueOnErrors(err error) (TaskGroupStopAction, error) {
return TaskGroupStopActionContinue, err
}
// RestartOnError provides a TaskGroup.OnQuit handler, that will restart a
// go-routine if the routine failed with an error.
//
// A nil error, and any error that is or wraps context.Canceled, is not
// treated as a failure: the go-routine is not restarted, and other
// go-routines continue to run. The error is always passed through unchanged
// as the error to be recorded.
func RestartOnError(err error) (TaskGroupStopAction, error) {
	// errors.Is also recognizes wrapped cancellation errors, which a plain
	// comparison against context.Canceled would treat as failures.
	if err == nil || errors.Is(err, context.Canceled) {
		return TaskGroupStopActionContinue, err
	}
	return TaskGroupStopActionRestart, err
}
// StopAll provides a TaskGroup.OnQuit handler, that will signal
// the TaskGroup to shutdown once an owned go-routine returns.
// The TaskGroup is supposed to stop even on successful return.
// The error is passed through unchanged as the error to be recorded.
func StopAll(err error) (TaskGroupStopAction, error) {
return TaskGroupStopActionShutdown, err
}
// StopOnError provides a TaskGroup.OnQuit handler, that will signal the
// TaskGroup to stop all owned go-routines if a go-routine failed with an
// error.
//
// A nil error, and any error that is or wraps context.Canceled, is ignored:
// no shutdown is signalled, and other go-routines continue to run. The error
// is always passed through unchanged as the error to be recorded.
func StopOnError(err error) (TaskGroupStopAction, error) {
	// errors.Is also recognizes wrapped cancellation errors, which a plain
	// comparison against context.Canceled would treat as failures.
	if err == nil || errors.Is(err, context.Canceled) {
		return TaskGroupStopActionContinue, err
	}
	return TaskGroupStopActionShutdown, err
}
// StopOnErrorOrCancel provides a TaskGroup.OnQuit handler, that will signal
// the TaskGroup to stop all owned go-routines whenever a go-routine returns a
// non-nil error. No error value is exempt, context.Canceled included.
// The error is passed through unchanged as the error to be recorded.
func StopOnErrorOrCancel(err error) (TaskGroupStopAction, error) {
	action := TaskGroupStopActionContinue
	if err != nil {
		action = TaskGroupStopActionShutdown
	}
	return action, err
}