pkg/util/fanout/fanout.go (99 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package fanout provides a small worker pool that runs submitted callbacks
// asynchronously: callbacks are queued on a buffered channel and executed by
// a fixed set of goroutines.
package fanout

import (
	"context"
	"log"
	"runtime"
	"sync"
)

// options holds the tunables of a Fanout.
type options struct {
	// worker is the number of goroutines New starts to run callbacks.
	worker int
	// buffer is the capacity of the channel that queues callbacks.
	buffer int
}

// Option configures a Fanout; pass Options to New.
type Option func(*options)

// WithWorker sets the number of worker goroutines that run callbacks.
// New uses 1 worker when this option is not supplied.
func WithWorker(n int) Option {
	return func(o *options) {
		o.worker = n
	}
}

// WithBuffer sets the capacity of the callback queue.
// New uses a capacity of 1000 when this option is not supplied.
func WithBuffer(n int) Option {
	return func(o *options) {
		o.buffer = n
	}
}

// item is one queued unit of work.
type item struct {
	// f is the callback to run.
	f func(c context.Context)
	// ctx is the context handed to f when a worker executes it.
	ctx context.Context
}

// Fanout runs callbacks asynchronously on a fixed set of worker goroutines
// that consume from a buffered channel. Create one with New.
type Fanout struct {
	// name identifies this fanout; New substitutes "anonymous" for "".
	name string
	// ch queues callbacks submitted through Do until a worker picks them up.
	ch chan item
	// options is the configuration resolved by New.
	options *options
	// waiter tracks the worker goroutines so Close can wait for them to exit.
	waiter sync.WaitGroup
	// ctx is the fanout's own lifecycle context; it is cancelled by Close and
	// is distinct from the per-callback context stored in item.
	ctx context.Context
	// cancel cancels ctx.
	cancel func()
}

// New creates a Fanout and starts its worker goroutines.
func New(name string, opts ...Option) *Fanout { if name == "" { name = "anonymous" } o := &options{ worker: 1, buffer: 1000, } for _, op := range opts { op(o) } c := &Fanout{ ch: make(chan item, o.buffer), name: name, options: o, } c.ctx, c.cancel = context.WithCancel(context.Background()) c.waiter.Add(o.worker) for i := 0; i < o.worker; i++ { go c.proc() } return c } func (c *Fanout) proc() { defer c.waiter.Done() for { select { case t := <-c.ch: wrapFunc(t.f)(t.ctx) case <-c.ctx.Done(): return } } } func wrapFunc(f func(c context.Context)) (res func(context.Context)) { return func(ctx context.Context) { defer func() { if r := recover(); r != nil { buf := make([]byte, 64*1024) buf = buf[:runtime.Stack(buf, false)] log.Printf("panic in fanout proc, err: %s, stack: %s\n", r, buf) } }() f(ctx) } } // Do save a callback func. func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) error { if f == nil || c.ctx.Err() != nil { return c.ctx.Err() } select { case c.ch <- item{f: f, ctx: ctx}: case <-ctx.Done(): return ctx.Err() } return nil } // Close close fanout. func (c *Fanout) Close() error { if err := c.ctx.Err(); err != nil { return err } c.cancel() c.waiter.Wait() return nil }