parallelize/parallelize.go (139 lines of code) (raw):
// Copyright (c) 2023 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package parallelize
import (
"runtime"
"sync"
)
// NewUnboundedRunner creates a Runner that spawns one goroutine per submitted
// unit of work, with no cap on concurrency. Exactly workSize units of work
// must be submitted, or GetResult will block forever waiting on the group.
func NewUnboundedRunner(workSize int) *Runner {
	runner := getRunner(workSize)
	// One WaitGroup slot per expected unit of work; each work goroutine
	// signals Done when it finishes.
	runner.wg.Add(workSize)
	go runner.consumeMessagesUnbounded()
	return runner
}
// getRunner builds a Runner whose work and result channels are both buffered
// to workSize, so submitting work and publishing results never block while
// the buffers have room.
func getRunner(workSize int) *Runner {
	return &Runner{
		workQueue:   make(chan Work, workSize),
		resultQueue: make(chan result, workSize),
		wg:          &sync.WaitGroup{},
	}
}
// consumeMessagesUnbounded drains the work queue, launching a dedicated
// goroutine for each unit of work; the loop exits when the work queue is
// closed (by GetResult). Each goroutine publishes its outcome on the result
// queue and marks one WaitGroup slot done.
func (r *Runner) consumeMessagesUnbounded() {
	// range terminates once r.workQueue is closed and drained; the previous
	// immediately-invoked func literal wrapping this loop added nothing.
	for wrk := range r.workQueue {
		// wrk is passed as an argument so each goroutine captures its own value.
		go func(wrk Work) {
			defer r.wg.Done()
			res, err := wrk.Work()
			r.resultQueue <- result{
				data: res,
				err:  err,
			}
		}(wrk)
	}
}
// NewFixedBoundedRunner creates fixed bounded goroutines by factor of available CPU based on default setting.
// Default setting is to have parallelization as factor of 4x if iobound work which is supposed to be short.
// If its a long work use NewBoundedRunner instead with optimal config.
// If cpu bound work parallel go routine will be bound by cpu.
func NewFixedBoundedRunner(workSize int, ioBound bool) *Runner {
	// Goroutines busy doing IO get swapped out, so oversubscribe the CPUs.
	const ioBoundFactor = 4
	parallelCount := runtime.NumCPU()
	if ioBound {
		parallelCount *= ioBoundFactor
	}
	return NewBoundedRunner(workSize, parallelCount)
}
// NewBoundedRunner creates a Runner whose concurrency is capped at
// parallelCount worker goroutines; submitted work is distributed among them.
func NewBoundedRunner(workSize, parallelCount int) *Runner {
	// Reuse getRunner instead of duplicating the struct construction, keeping
	// this constructor consistent with NewUnboundedRunner.
	r := getRunner(workSize)
	// One WaitGroup slot per worker; each worker signals Done when the work
	// queue is closed and drained.
	r.wg.Add(parallelCount)
	go r.consumeMessagesBounded(parallelCount)
	return r
}
// consumeMessagesBounded starts parallelCount worker goroutines, each of
// which pulls work off the shared queue until the queue is closed (by
// GetResult), publishing every outcome on the result queue.
func (r *Runner) consumeMessagesBounded(parallelCount int) {
	// The previous immediately-invoked func literal wrapping this loop added
	// nothing; the workers are spawned directly.
	for i := 0; i < parallelCount; i++ {
		go func() {
			defer r.wg.Done()
			// range terminates once r.workQueue is closed and drained.
			for wrk := range r.workQueue {
				res, err := wrk.Work()
				r.resultQueue <- result{
					data: res,
					err:  err,
				}
			}
		}()
	}
}
// result carries the outcome of a single unit of Work: the value returned by
// Work() and any error it produced.
type result struct {
	data interface{} // value returned by Work(); may be nil
	err  error       // non-nil if the unit of work failed
}
// Runner holds data for initiating parallel work
type Runner struct {
	resultQueue chan result // buffered to workSize; workers publish each outcome here
	// wg counts expected work units (unbounded runner) or worker goroutines
	// (bounded runner); GetResult waits on it before closing resultQueue.
	wg        *sync.WaitGroup
	workQueue chan Work // buffered to workSize; submitted work pending execution
}
// SubmitWork submits a unit of work to be executed. It blocks once the work
// queue buffer (workSize) is full. It must not be called after GetResult,
// which closes the work queue — sending on a closed channel panics.
func (r *Runner) SubmitWork(wrk Work) {
	r.workQueue <- wrk
}
// GetResult returns the collected responses from executing all submitted
// Work, returning early with a nil slice on the first error observed. After
// calling this, no more work may be submitted to the Runner, and GetResult
// must not be called a second time (the channels are closed here).
func (r *Runner) GetResult() ([]interface{}, error) {
	// Closing the work queue lets consumers drain and exit; once every
	// outstanding unit signals the WaitGroup, the result queue is closed so
	// the range below terminates.
	go func() {
		close(r.workQueue)
		r.wg.Wait()
		close(r.resultQueue)
	}()

	var collected []interface{}
	for res := range r.resultQueue {
		if res.err != nil {
			return nil, res.err
		}
		collected = append(collected, res.data)
	}
	return collected, nil
}
// Work is a unit of work set to be executed by this Runner.
type Work interface {
	// Work performs the unit of work, returning its result or an error.
	Work() (interface{}, error)
}
// StatelessFunc is defined to simulate an anonymous implementation of Work,
// which Go lacks directly. It avoids creating a boilerplate implementation of
// the Work interface for parameterless functions.
type StatelessFunc func() (interface{}, error)

// Work satisfies the Work interface, so an anonymous function converted to
// StatelessFunc can be submitted to a Runner directly.
func (sf StatelessFunc) Work() (interface{}, error) {
	return sf()
}
// SingleParamWork is a utility for doing work that takes a single parameter.
type SingleParamWork struct {
	Data interface{}                               // the argument passed to Func
	Func func(data interface{}) (interface{}, error) // the work to execute
}

// Work satisfies the Work interface by invoking Func with the stored Data.
func (spw *SingleParamWork) Work() (interface{}, error) {
	return spw.Func(spw.Data)
}
// TwoParamWork is a utility for doing work that takes two parameters.
type TwoParamWork struct {
	Data1 interface{} // first argument passed to Func
	Data2 interface{} // second argument passed to Func
	Func  func(data1 interface{}, data2 interface{}) (interface{}, error)
}

// Work satisfies the Work interface by invoking Func with the stored arguments.
func (tpw *TwoParamWork) Work() (interface{}, error) {
	return tpw.Func(tpw.Data1, tpw.Data2)
}
// ThreeParamWork is a utility for doing work that takes three parameters.
type ThreeParamWork struct {
	Data1 interface{} // first argument passed to Func
	Data2 interface{} // second argument passed to Func
	Data3 interface{} // third argument passed to Func
	Func  func(data1 interface{}, data2 interface{}, data3 interface{}) (interface{}, error)
}

// Work satisfies the Work interface by invoking Func with the stored arguments.
func (tpw *ThreeParamWork) Work() (interface{}, error) {
	return tpw.Func(tpw.Data1, tpw.Data2, tpw.Data3)
}
// MultiParamWork is a utility for doing work that takes a variable number of
// parameters.
type MultiParamWork struct {
	Data []interface{} // arguments expanded into Func's variadic parameter
	Func func(...interface{}) (interface{}, error)
}

// Work satisfies the Work interface by invoking Func with the stored
// arguments spread variadically.
func (mpw *MultiParamWork) Work() (interface{}, error) {
	return mpw.Func(mpw.Data...)
}