plc4go/spi/utils/StopWarn.go (105 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package utils import ( "fmt" "runtime" "runtime/pprof" "strings" "sync" "time" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) type stopWarnOptions struct { processId string processInfo string interval time.Duration extraSkipOffset int includeGoroutinesStack bool } // StopWarn gives out warning every interval (default 5 seconds) when a function doesn't terminate. Usage: `defer StopWarn(log)()` func StopWarn(localLog zerolog.Logger, opts ...func(*stopWarnOptions)) func() { o := &stopWarnOptions{ processInfo: "", interval: 5 * time.Second, } for _, opt := range opts { opt(o) } if o.processInfo == "" { _, file, line, ok := runtime.Caller(1 + o.extraSkipOffset) if ok { o.processInfo = fmt.Sprintf("%s:%d", file, line) } } localLog = localLog.With().Str("processInfo", o.processInfo).Dur("interval", o.interval).Logger() ticker := time.NewTicker(o.interval) wg := new(sync.WaitGroup) done := make(chan struct{}) wg.Add(1) go func() { defer wg.Done() localLog.Trace().Msgf("start checking") startTime := time.Now() for { localLog.Trace().Msgf("check cycle") select { case <-done: ticker.Stop() return case warnTime := <-ticker.C: processId := "" if o.processId != "" { processId = o.processId + " " } stackInfo := new(strings.Builder) if o.includeGoroutinesStack { goroutines := pprof.Lookup("goroutine") if goroutines != nil { if err := goroutines.WriteTo(stackInfo, 2); err != nil { localLog.Warn().Err(err).Msg("could not write to stack") stackInfo.WriteString(err.Error()) } } else { log.Warn().Msg("lookup goroutine failed") } } localLog.Warn(). Time("startTime", startTime). Time("warnTime", warnTime). TimeDiff("inProgressFor", warnTime, startTime). Stringer("stackInfo", stackInfo). Msgf("%sstill in progress", processId) } } }() start := time.Now() return func() { localLog.Trace().TimeDiff("check duration", time.Now(), start).Msg("done") close(done) wg.Wait() // This is to avoid late logs in case when the shutdown is really fast } } // WithStopWarnProcessId sets a process id which will be prefixed to the message func WithStopWarnProcessId(processId string) func(*stopWarnOptions) { return func(o *stopWarnOptions) { o.processId = processId } } // WithStopWarnProcessInfo set the processInfo func WithStopWarnProcessInfo(processInfo string) func(*stopWarnOptions) { return func(o *stopWarnOptions) { o.processInfo = processInfo } } // WithStopWarnInterval sets the interval at which a warning is logged (default 5 seconds). MUST be greater 0. func WithStopWarnInterval(interval time.Duration) func(*stopWarnOptions) { return func(o *stopWarnOptions) { o.interval = interval } } // WithStopWarnExtraSkipOffset sets an extra offset for the skip. Skip uses 1 so if you want to have 0 use -1 as arg. func WithStopWarnExtraSkipOffset(offset int) func(*stopWarnOptions) { return func(o *stopWarnOptions) { o.extraSkipOffset = offset } } // WithStopWarnIncludeGoroutinesStack is a flag which instructs the warn log to include the list of all current goroutines func WithStopWarnIncludeGoroutinesStack() func(*stopWarnOptions) { return func(o *stopWarnOptions) { o.includeGoroutinesStack = true } }