plugins/input/systemv2/input_system_v2.go (357 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package systemv2 import ( "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/util" "math" "os" "regexp" "strconv" "time" "github.com/prometheus/procfs" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/load" "github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/net" ) // The InputSystem plugin must be modified with care, because it uses two collection libraries: procfs and gopsutil. // Both work well on the host machine, but they behave differently in a Linux virtual environment. // procfs (or any direct read of the proc file system) needs the `logtail_host` path to be mounted; for more details see `helper.mount_others.go`. // The gopsutil lib only supports configuring the mount path through ENV variables; for more details see `github.com/shirou/gopsutil/internal/common/common.go`. 
type InputSystem struct { Core bool CPU bool Mem bool Disk bool Net bool Protocol bool TCP bool OpenFd bool CPUPercent bool Disks []string NetInterfaces []string Labels map[string]string ExcludeDiskFsType string ExcludeDiskPath string lastInfo *host.InfoStat lastCPUStat cpu.TimesStat lastCPUTime time.Time lastCPUTotal float64 lastCPUBusy float64 lastNetStatMap map[string]*net.IOCountersStat lastNetTime time.Time lastProtoAll []net.ProtoCountersStat lastProtoTime time.Time lastDiskStat disk.IOCountersStat lastDiskStatAll map[string]disk.IOCountersStat lastDiskTime time.Time commonLabels helper.MetricLabels collectTime time.Time context pipeline.Context excludeDiskFsTypeRegex *regexp.Regexp excludeDiskPathRegex *regexp.Regexp fs *procfs.FS //nolint:unused } func (r *InputSystem) Description() string { return "Support collect system metrics on the host machine or Linux virtual environments." } func (r *InputSystem) CommonInit(context pipeline.Context) (int, error) { if r.ExcludeDiskFsType != "" { reg, err := regexp.Compile(r.ExcludeDiskFsType) if err != nil { logger.Error(r.context.GetRuntimeContext(), "COMPILE_REGEXP_ALARM", "err", err) return 0, err } r.excludeDiskFsTypeRegex = reg } if r.ExcludeDiskPath != "" { reg, err := regexp.Compile(r.ExcludeDiskPath) if err != nil { logger.Error(r.context.GetRuntimeContext(), "COMPILE_REGEXP_ALARM", "err", err) return 0, err } r.excludeDiskPathRegex = reg } r.commonLabels.Append("hostname", util.GetHostName()) r.commonLabels.Append("ip", util.GetIPAddress()) for key, val := range r.Labels { r.commonLabels.Append(key, val) } return 0, nil } func (r *InputSystem) addMetric(collector pipeline.Collector, name string, labels *helper.MetricLabels, value float64) { collector.AddRawLog(helper.NewMetricLog(name, r.collectTime.UnixNano(), value, labels)) } func (r *InputSystem) CollectCore(collector pipeline.Collector) { // host info if r.lastInfo == nil { r.lastInfo, _ = host.Info() } // load stat loadStat, err := load.Avg() if err == 
nil { r.addMetric(collector, "system_load1", &r.commonLabels, loadStat.Load1) r.addMetric(collector, "system_load5", &r.commonLabels, loadStat.Load5) r.addMetric(collector, "system_load15", &r.commonLabels, loadStat.Load15) } r.addMetric(collector, "system_boot_time", &r.commonLabels, float64(r.lastInfo.BootTime)) } func (r *InputSystem) CollectCPU(collector pipeline.Collector) { // cpu stat cpuStat, err := cpu.Times(false) cpuInfo, _ := cpu.Info() ncpus := int32(0) for _, c := range cpuInfo { ncpus += c.Cores } r.addMetric(collector, "cpu_count", &r.commonLabels, float64(ncpus)) if err == nil && len(cpuStat) > 0 { cpuBusy := cpuStat[0].GuestNice + cpuStat[0].Guest + cpuStat[0].Nice + cpuStat[0].Softirq + cpuStat[0].Irq + cpuStat[0].User + cpuStat[0].System cpuTotal := cpuBusy + cpuStat[0].Idle + cpuStat[0].Iowait + cpuStat[0].Steal // cpushare计算 cpuShareFactor := 1.0 cpushareEnv := os.Getenv("SIGMA_CPU_REQUEST") if len(cpushareEnv) > 0 { cpuRequest, err := strconv.Atoi(cpushareEnv) if err != nil || cpuRequest <= 0 || ncpus == 0 { logger.Error(r.context.GetRuntimeContext(), "GET_SIGMA_ENV_ERROR", "get sigma env failed", "error", err, "ncpus", ncpus, "SIGMA_CPU_REQUEST", cpushareEnv) } else { cpuShareFactor = float64(ncpus) / (float64(cpuRequest) / 1000.) 
} } deltaTotal := cpuTotal - r.lastCPUTotal if r.CPUPercent && !r.lastCPUTime.IsZero() && deltaTotal > 0 { r.addMetric(collector, "cpu_util", &r.commonLabels, 100*(cpuBusy-r.lastCPUBusy)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_wait_util", &r.commonLabels, 100*(cpuStat[0].Iowait-r.lastCPUStat.Iowait)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_sys_util", &r.commonLabels, 100*(cpuStat[0].System-r.lastCPUStat.System)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_user_util", &r.commonLabels, 100*(cpuStat[0].User-r.lastCPUStat.User)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_irq_util", &r.commonLabels, 100*(cpuStat[0].Irq-r.lastCPUStat.Irq)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_softirq_util", &r.commonLabels, 100*(cpuStat[0].Softirq-r.lastCPUStat.Softirq)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_nice_util", &r.commonLabels, 100*(cpuStat[0].Nice-r.lastCPUStat.Nice)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_steal_util", &r.commonLabels, 100*(cpuStat[0].Steal-r.lastCPUStat.Steal)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_guest_util", &r.commonLabels, 100*(cpuStat[0].Guest-r.lastCPUStat.Guest)/deltaTotal*cpuShareFactor) r.addMetric(collector, "cpu_guestnice_util", &r.commonLabels, 100*(cpuStat[0].GuestNice-r.lastCPUStat.GuestNice)/deltaTotal*cpuShareFactor) } r.lastCPUTime = time.Now() r.lastCPUStat = cpuStat[0] r.lastCPUBusy = cpuBusy r.lastCPUTotal = cpuTotal } } func (r *InputSystem) CollectMem(collector pipeline.Collector) { // mem stat memStat, err := mem.VirtualMemory() if err == nil { r.addMetric(collector, "mem_util", &r.commonLabels, memStat.UsedPercent) r.addMetric(collector, "mem_cache", &r.commonLabels, float64(memStat.Cached)) r.addMetric(collector, "mem_free", &r.commonLabels, float64(memStat.Free)) r.addMetric(collector, "mem_available", &r.commonLabels, float64(memStat.Available)) r.addMetric(collector, "mem_used", &r.commonLabels, float64(memStat.Used)) 
r.addMetric(collector, "mem_total", &r.commonLabels, float64(memStat.Total)) } swapStat, err := mem.SwapMemory() if err == nil { r.addMetric(collector, "mem_swap_util", &r.commonLabels, swapStat.UsedPercent) } } func (r *InputSystem) collectOneDisk(collector pipeline.Collector, name string, timeDeltaSec float64, last, now *disk.IOCountersStat) { labels := r.commonLabels.Clone() labels.Append("disk", name) r.addMetric(collector, "disk_rbps", labels, float64(now.ReadBytes-last.ReadBytes)/timeDeltaSec) r.addMetric(collector, "disk_wbps", labels, float64(now.WriteBytes-last.WriteBytes)/timeDeltaSec) r.addMetric(collector, "disk_riops", labels, float64(now.ReadCount-last.ReadCount)/timeDeltaSec) r.addMetric(collector, "disk_wiops", labels, float64(now.WriteCount-last.WriteCount)/timeDeltaSec) if now.ReadCount-last.ReadCount > 0 { r.addMetric(collector, "disk_rlatency", labels, float64(now.ReadTime-last.ReadTime)/float64(now.ReadCount-last.ReadCount)) } else { r.addMetric(collector, "disk_rlatency", labels, math.NaN()) } if now.WriteCount-last.WriteCount > 0 { r.addMetric(collector, "disk_wlatency", labels, float64(now.WriteTime-last.WriteTime)/float64(now.WriteCount-last.WriteCount)) } else { r.addMetric(collector, "disk_wlatency", labels, math.NaN()) } if name != "total" { r.addMetric(collector, "disk_util", labels, float64(now.IoTime-last.IoTime)*100./1000./timeDeltaSec) } } func (r *InputSystem) CollectDisk(collector pipeline.Collector) { r.CollectDiskUsage(collector) // disk stat allIoCounters, err := disk.IOCounters(r.Disks...) 
if err == nil { totalIoCount := disk.IOCountersStat{} for _, ioCount := range allIoCounters { if ioCount.Name == "" { continue } lastChar := ioCount.Name[len(ioCount.Name)-1] if lastChar >= '0' && lastChar <= '9' { // means disk partition, don't need to record to total metrics continue } totalIoCount.ReadBytes += ioCount.ReadBytes totalIoCount.WriteBytes += ioCount.WriteBytes totalIoCount.ReadCount += ioCount.ReadCount totalIoCount.WriteCount += ioCount.WriteCount totalIoCount.ReadTime += ioCount.ReadTime totalIoCount.WriteTime += ioCount.WriteTime totalIoCount.IopsInProgress += ioCount.IopsInProgress totalIoCount.IoTime += ioCount.IoTime } nowTime := time.Now() if !r.lastDiskTime.IsZero() { timeDeltaSec := float64(nowTime.Sub(r.lastDiskTime)) / float64(time.Second) r.collectOneDisk(collector, "total", timeDeltaSec, &r.lastDiskStat, &totalIoCount) for key, ioCount := range allIoCounters { if lastIOCount, ok := r.lastDiskStatAll[key]; ok { count := ioCount r.collectOneDisk(collector, key, timeDeltaSec, &lastIOCount, &count) } } } r.lastDiskTime = nowTime r.lastDiskStat = totalIoCount r.lastDiskStatAll = allIoCounters } } func (r *InputSystem) collectOneNet(collector pipeline.Collector, name string, timeDeltaSec float64, last, now *net.IOCountersStat) { labels := r.commonLabels.Clone() labels.Append("interface", name) r.addMetric(collector, "net_in", labels, float64(now.BytesRecv-last.BytesRecv)/timeDeltaSec) r.addMetric(collector, "net_out", labels, float64(now.BytesSent-last.BytesSent)/timeDeltaSec) r.addMetric(collector, "net_in_pkt", labels, float64(now.PacketsRecv-last.PacketsRecv)/timeDeltaSec) r.addMetric(collector, "net_out_pkt", labels, float64(now.PacketsSent-last.PacketsSent)/timeDeltaSec) deltaErrIn := now.Errin - last.Errin deltaErrOut := now.Errout - last.Errout deltaDropIn := now.Dropin - last.Dropin deltaDropOut := now.Dropout - last.Dropout deltaPacketsSent := now.PacketsSent - last.PacketsSent deltaPacketsRecv := now.PacketsRecv - last.PacketsRecv 
deltaErrTotal := deltaErrIn + deltaErrOut deltaDropTotal := deltaDropIn + deltaDropOut deltaPacketsTotal := deltaPacketsSent + deltaPacketsRecv if 0 != deltaPacketsTotal { r.addMetric(collector, "net_drop_util", labels, 100*float64(deltaDropTotal)/float64(deltaPacketsTotal)) r.addMetric(collector, "net_err_util", labels, 100*float64(deltaErrTotal)/float64(deltaPacketsTotal)) // fields["err.pkts"] = strconv.FormatInt(deltaErrTotal, 10) // fields["drop.pkts"] = strconv.FormatInt(deltaDropTotal, 10) // fields["pkts.total"] = strconv.FormatInt(int64(deltaPacketsTotal), 10) } } func (r *InputSystem) CollectNet(collector pipeline.Collector) { netIoStatAll, err := net.IOCounters(true) if err == nil && len(netIoStatAll) > 0 { nowTime := time.Now() if !r.lastNetTime.IsZero() { timeDeltaSec := float64(nowTime.Sub(r.lastNetTime)) / float64(time.Second) // collect every interface var lastTotal, total net.IOCountersStat var built bool for i := 0; i < len(netIoStatAll); i++ { if ls, ok := r.lastNetStatMap[netIoStatAll[i].Name]; ok { r.collectOneNet(collector, netIoStatAll[i].Name, timeDeltaSec, ls, &netIoStatAll[i]) // build total total.BytesRecv += netIoStatAll[i].BytesRecv total.BytesSent += netIoStatAll[i].BytesSent total.Dropin += netIoStatAll[i].Dropin total.Dropout += netIoStatAll[i].Dropout total.Errin += netIoStatAll[i].Errin total.Errout += netIoStatAll[i].Errout total.Fifoin += netIoStatAll[i].Fifoin total.Fifoout += netIoStatAll[i].Fifoout total.PacketsRecv += netIoStatAll[i].PacketsRecv total.PacketsSent += netIoStatAll[i].PacketsSent lastTotal.BytesRecv += ls.BytesRecv lastTotal.BytesSent += ls.BytesSent lastTotal.Dropin += ls.Dropin lastTotal.Dropout += ls.Dropout lastTotal.Errin += ls.Errin lastTotal.Errout += ls.Errout lastTotal.Fifoin += ls.Fifoin lastTotal.Fifoout += ls.Fifoout lastTotal.PacketsRecv += ls.PacketsRecv lastTotal.PacketsSent += ls.PacketsSent built = true } } if built { r.collectOneNet(collector, "total", timeDeltaSec, &lastTotal, &total) } } 
r.lastNetTime = nowTime r.lastNetStatMap = make(map[string]*net.IOCountersStat) for i := range netIoStatAll { r.lastNetStatMap[netIoStatAll[i].Name] = &netIoStatAll[i] } } } func (r *InputSystem) CollectProtocol(collector pipeline.Collector) { protoCounterStats, err := net.ProtoCounters([]string{}) if err == nil && len(protoCounterStats) > 0 { nowTime := time.Now() retransSegField := "RetransSegs" totalOutSegField := "OutSegs" totalInSegField := "InSegs" if !r.lastProtoTime.IsZero() && len(protoCounterStats) == len(r.lastProtoAll) { for i := range protoCounterStats { // 只要tcp if protoCounterStats[i].Protocol == "tcp" { if len(r.lastProtoAll) <= i || r.lastProtoAll[i].Protocol != protoCounterStats[i].Protocol { continue } r.CollectTCPStats(collector, &protoCounterStats[i]) deltaRetransSegs := protoCounterStats[i].Stats[retransSegField] - r.lastProtoAll[i].Stats[retransSegField] deltaTotalOutSegs := protoCounterStats[i].Stats[totalOutSegField] - r.lastProtoAll[i].Stats[totalOutSegField] deltaTotalInSegs := protoCounterStats[i].Stats[totalInSegField] - r.lastProtoAll[i].Stats[totalInSegField] r.addMetric(collector, "protocol_tcp_outsegs", &r.commonLabels, float64(deltaTotalOutSegs)) r.addMetric(collector, "protocol_tcp_insegs", &r.commonLabels, float64(deltaTotalInSegs)) r.addMetric(collector, "protocol_tcp_retran_segs", &r.commonLabels, float64(deltaRetransSegs)) if deltaTotalOutSegs <= 0 { r.addMetric(collector, "protocol_tcp_retran_util", &r.commonLabels, 0.) 
} else { r.addMetric(collector, "protocol_tcp_retran_util", &r.commonLabels, 100*float64(deltaRetransSegs)/float64(deltaTotalOutSegs)) } } } } r.lastProtoTime = nowTime r.lastProtoAll = protoCounterStats } } func (r *InputSystem) Collect(collector pipeline.Collector) error { r.collectTime = time.Now() r.CollectCore(collector) if r.CPU { r.CollectCPU(collector) } if r.Mem { r.CollectMem(collector) } if r.Disk { err := util.DoFuncWithTimeout(time.Second*3, func() error { r.CollectDisk(collector) return nil }) if err != nil { logger.Error(r.context.GetRuntimeContext(), "READ_DISK_ALARM", "read disk metrics timeout, would skip disk collection", err) r.Disk = false } } if r.Net { r.CollectNet(collector) } if r.Protocol { r.CollectProtocol(collector) } if r.OpenFd { r.CollectOpenFD(collector) } return nil } func init() { pipeline.MetricInputs["metric_system_v2"] = func() pipeline.MetricInput { return &InputSystem{ CPUPercent: true, CPU: true, Mem: true, Disk: true, Net: true, Protocol: true, OpenFd: true, TCP: false, ExcludeDiskPath: "^/(dev|proc|sys|var/lib/docker/.+|var/lib/kubelet/pods/.+)($|/)", ExcludeDiskFsType: "^(autofs|binfmt_misc|cgroup|configfs|debugfs|devpts|devtmpfs|fusectl|hugetlbfs|mqueue|overlay|proc|procfs|pstore|rpc_pipefs|securityfs|sysfs|tracefs)$", } } }