nmxact/nmxutil/err_funnel.go (59 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package nmxutil import ( "sync" "time" ) type ErrLessFn func(a error, b error) bool type ErrProcFn func(err error) // Aggregates errors that occur close in time. The most severe error gets // reported. type ErrFunnel struct { LessCb ErrLessFn AccumDelay time.Duration mtx sync.Mutex resetMtx sync.Mutex curErr error errTimer *time.Timer waiters [](chan error) } func (f *ErrFunnel) Insert(err error) { if err == nil { panic("ErrFunnel nil insert") } f.mtx.Lock() defer f.mtx.Unlock() if f.curErr == nil { f.curErr = err f.errTimer = time.AfterFunc(f.AccumDelay, func() { f.timerExp() }) } else { if f.LessCb(f.curErr, err) { if !f.errTimer.Stop() { <-f.errTimer.C } f.curErr = err f.errTimer.Reset(f.AccumDelay) } } } func (f *ErrFunnel) timerExp() { f.mtx.Lock() err := f.curErr f.curErr = nil waiters := f.waiters f.waiters = nil f.mtx.Unlock() if err == nil { panic("ErrFunnel timer expired but no error") } for _, w := range waiters { w <- err close(w) } } func (f *ErrFunnel) Wait() chan error { c := make(chan error) f.mtx.Lock() f.waiters = append(f.waiters, c) f.mtx.Unlock() return c }