pkg/util/fanout/fanout.go (99 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fanout
import (
"context"
"log"
"runtime"
"sync"
)
type options struct {
worker int
buffer int
}
// Option is fanout option.
type Option func(*options)
// WithWorker with the worker of fanout.
func WithWorker(n int) Option {
return func(o *options) {
o.worker = n
}
}
// WithBuffer with the buffer of fanout.
func WithBuffer(n int) Option {
return func(o *options) {
o.buffer = n
}
}
type item struct {
f func(c context.Context)
ctx context.Context
}
// Fanout async consume data from chan.
type Fanout struct {
name string
ch chan item
options *options
waiter sync.WaitGroup
ctx context.Context
cancel func()
}
// New new a fanout struct.
func New(name string, opts ...Option) *Fanout {
if name == "" {
name = "anonymous"
}
o := &options{
worker: 1,
buffer: 1000,
}
for _, op := range opts {
op(o)
}
c := &Fanout{
ch: make(chan item, o.buffer),
name: name,
options: o,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.waiter.Add(o.worker)
for i := 0; i < o.worker; i++ {
go c.proc()
}
return c
}
func (c *Fanout) proc() {
defer c.waiter.Done()
for {
select {
case t := <-c.ch:
wrapFunc(t.f)(t.ctx)
case <-c.ctx.Done():
return
}
}
}
func wrapFunc(f func(c context.Context)) (res func(context.Context)) {
return func(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 64*1024)
buf = buf[:runtime.Stack(buf, false)]
log.Printf("panic in fanout proc, err: %s, stack: %s\n", r, buf)
}
}()
f(ctx)
}
}
// Do save a callback func.
func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) error {
if f == nil || c.ctx.Err() != nil {
return c.ctx.Err()
}
select {
case c.ch <- item{f: f, ctx: ctx}:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// Close close fanout.
func (c *Fanout) Close() error {
if err := c.ctx.Err(); err != nil {
return err
}
c.cancel()
c.waiter.Wait()
return nil
}