agent/perfmon/perfmon.go (194 lines of code) (raw):

package perfmon import ( "bytes" "encoding/base64" "fmt" "math/rand" "os" "runtime" "runtime/pprof" "strconv" "sync" "time" "github.com/aliyun/aliyun_assist_client/agent/clientreport" "github.com/aliyun/aliyun_assist_client/agent/flagging" "github.com/aliyun/aliyun_assist_client/agent/log" "github.com/aliyun/aliyun_assist_client/agent/metrics" "github.com/aliyun/aliyun_assist_client/agent/statemanager" "github.com/aliyun/aliyun_assist_client/agent/taskengine" "github.com/aliyun/aliyun_assist_client/agent/taskengine/timermanager" "github.com/aliyun/aliyun_assist_client/agent/update" "github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus" "github.com/shirou/gopsutil/v3/process" ) var ( perfMonitorIntervalSecond = 5 cpuPercentTotal int64 // store cpu usage percent by int64, 12% is stored as 1200 cpuPercentMax int64 cpuCount int64 memRssTotal int64 memRssMax int64 memCount int64 dataLock sync.RWMutex cpu_overload_count int = 0 mem_overload_count int = 0 ) // Periodically collect its own CPU and memory load func generatePerfData(logger logrus.FieldLogger) error { p, err := process.NewProcess(int32(os.Getpid())) if err != nil { logger.Error("get process failed: ", err) return err } _, err = p.Percent(0) if err != nil { logger.Error("get process cpu percent failed: ", err) return err } tick := time.NewTicker(time.Duration(perfMonitorIntervalSecond) * time.Second) defer tick.Stop() for { <-tick.C dataLock.Lock() cpuPercent, err := p.Percent(0) if err != nil { logger.Error("get cpu percent failed: ", err) cpuPercent = -1 } else { tmpCpuPercent := int64(cpuPercent * 100) cpuPercentTotal += tmpCpuPercent if tmpCpuPercent > cpuPercentMax { cpuPercentMax = tmpCpuPercent } cpuCount += 1 } var memRss int64 memInfo, err := p.MemoryInfo() if err != nil { logger.Error("get cpu percent failed: ", err) memRss = -1 } else { memRssTotal += int64(memInfo.RSS) if memInfo.RSS > uint64(memRssMax) { memRssMax = int64(memInfo.RSS) } memRss = int64(memInfo.RSS) memCount += 1 } dataLock.Unlock() go checkCpuMemLoad(cpuPercent, memRss) } } func StartSelfKillMon() { logger := log.GetLogger().WithField("Phase", "perfMonitor") go generatePerfData(logger) timerManager := timermanager.GetTimerManager() timer, err := timerManager.CreateTimerInSeconds(func() { dataLock.Lock() defer func() { cpuPercentTotal = 0 cpuCount = 0 memRssTotal = 0 memCount = 0 cpuPercentMax = 0 memRssMax = 0 dataLock.Unlock() }() var cpuAvg, memAvg int64 if cpuCount >= 1 { cpuAvg = cpuPercentTotal / cpuCount } else { cpuAvg = -1 } if memCount >= 1 { memAvg = memRssTotal / memCount } else { memAvg = -1 } metrics.GetPerfSampleEvent( "cpuAvg", strconv.FormatInt(cpuAvg, 10), "cpuMax", strconv.FormatInt(cpuPercentMax, 10), "memAvg", strconv.FormatInt(memAvg, 10), "memMax", strconv.FormatInt(memRssMax, 10), ).ReportEvent() }, 3600*24) if err != nil { logger.Error("create timer for perform report failed: ", err) return } mutableSchedule, ok := timer.Schedule.(*timermanager.MutableScheduled) if !ok { logger.Error("unexpected schedule type of perform report timer") return } mutableSchedule.NotImmediately() _, err = timer.Run() if err != nil { logger.Error("start timer for perform report failed: ", err) return } } // Check whether its own cpu and memory load exceeds the limit func checkCpuMemLoad(cpuUsage float64, memory int64) { var _taskFactory *taskengine.TaskFactory = taskengine.GetTaskFactory() if _taskFactory.IsAnyTaskRunning() || update.IsCPUIntensiveActionRunning() || taskengine.GetSessionFactory().IsAnyTaskRunning() { //没有任务执行时才监控性能 return } if statemanager.IsStateManagerTimerRunning() || statemanager.IsStateConfigTimerRunning() { // 拉取并解析终态配置时、应用或监控终态配置时不监控性能 return } if cpuUsage >= flagging.GetResourceCpuLimit() { cpu_overload_count += 1 go func(cpuUsageNow float64, cpuOverLoadCount int) { var profileBuf bytes.Buffer var cpuProfile, cpuProfileErr string // pprof.StartCPUProfile will return err if profiling is already enabled. if rand.Intn(10000) > 100 { cpuProfileErr = "only sampe cpu profile with a probability of 1/100" } else if err := pprof.StartCPUProfile(&profileBuf); err == nil { time.Sleep(10 * time.Second) pprof.StopCPUProfile() cpuProfile = base64.StdEncoding.EncodeToString(profileBuf.Bytes()) } else { cpuProfileErr = err.Error() } metrics.GetCpuOverloadEvent( "cpu", fmt.Sprintf("%.2f", cpuUsageNow), "info", fmt.Sprintf("CPU Overload... CPU=%.2f", cpuUsageNow), "count", strconv.Itoa(cpuOverLoadCount), "cpuProfile", cpuProfile, "cpuProfileErr", cpuProfileErr, ).ReportEvent() }(cpuUsage, cpu_overload_count) log.GetLogger().Infoln("CPU Overload... CPU=", cpuUsage) } else { cpu_overload_count = 0 } if memory >= flagging.GetResourceMemLimit() { // 上报memStats mem_overload_count += 1 memStats := &runtime.MemStats{} runtime.ReadMemStats(memStats) metrics.GetMemOverloadEvent( "mem", fmt.Sprintf("%d", memory), "info", fmt.Sprintf("Memory Overload... MEM=%d", memory), "count", strconv.Itoa(mem_overload_count), "HeapAlloc", strconv.FormatUint(memStats.HeapAlloc, 10), "HeapIdle", strconv.FormatUint(memStats.HeapIdle, 10), "HeapInuse", strconv.FormatUint(memStats.HeapInuse, 10), "HeapReleased", strconv.FormatUint(memStats.HeapReleased, 10), "StackInuse", strconv.FormatUint(memStats.StackInuse, 10), ).ReportEvent() log.GetLogger().Infoln("Memory Overload... MEM=", memory) } else { mem_overload_count = 0 } limit := int(flagging.GetResourceOverloadLimit()) if cpu_overload_count >= limit { cpu_overload_count = reachCpuOverloadLimit(cpu_overload_count, cpuUsage) } if mem_overload_count >= limit { report := clientreport.ClientReport{ ReportType: "self_kill", Info: fmt.Sprintf("mem=%f", float64(memory)), } clientreport.SendReport(report) log.GetLogger().Fatalln("self kill for Memory Overload... Mem=", memory) } }