plugins/input/process/input_process.go
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package process

import (
	"regexp"
	"sort"
	"time"

	"github.com/alibaba/ilogtail/pkg/helper"
	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/util"
)

const (
	defaultMaxProcessCount     = 100
	defaultMaxIdentifierLength = 100
)

// InputProcess is a plugin that must be modified with care, because it relies
// on two collection libraries, procfs and gopsutil. Both work well on the host
// machine, but they behave differently in a Linux virtual environment: the
// procfs-based collection reads the proc file system mounted under the
// `logtail_host` path (see `helper.mount_others.go` for details), while
// gopsutil only supports a custom mount path configured through environment
// variables (see `github.com/shirou/gopsutil/internal/common/common.go`).
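//
// For example (a deployment sketch; the paths are illustrative), gopsutil can
// be pointed at a mounted proc file system through its environment variables:
//
//	HOST_PROC=/logtail_host/proc HOST_SYS=/logtail_host/sys ./ilogtail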
type InputProcess struct {
	MaxIdentifierLength int               // The maximum length of a process identifier, such as CMD and CWD.
	MaxProcessCount     int               // The maximum number of processes to collect.
	TopNCPU             int               // The number of processes to select, ordered by CPU usage.
	TopNMem             int               // The number of processes to select, ordered by memory usage.
	MinCPULimitPercent  float64           // The minimum CPU percentage for a process to be collected.
	MinMemoryLimitKB    int               // The minimum memory usage (in KB) for a process to be collected.
	ProcessNamesRegex   []string          // The regular expressions for matching processes.
	Labels              map[string]string // The user-defined custom labels.

	// The optional metric switches.
	OpenFD bool
	Thread bool
	NetIO  bool
	IO     bool

	context       pipeline.Context
	lastProcesses map[int]processCache
	regexpList    []*regexp.Regexp
	commonLabels  helper.MetricLabels
	collectTime   time.Time
}
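
// Init initializes the plugin: it applies defaults for MaxProcessCount and
// MaxIdentifierLength, compiles the ProcessNamesRegex expressions (invalid
// ones are logged and skipped), and builds the common metric labels from the
// hostname, IP, and user-defined Labels.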
func (ip *InputProcess) Init(context pipeline.Context) (int, error) {
ip.context = context
if ip.MaxProcessCount <= 0 {
ip.MaxProcessCount = defaultMaxProcessCount
}
if ip.MaxIdentifierLength <= 0 {
ip.MaxIdentifierLength = defaultMaxIdentifierLength
}
for _, regStr := range ip.ProcessNamesRegex {
if reg, err := regexp.Compile(regStr); err == nil {
ip.regexpList = append(ip.regexpList, reg)
} else {
logger.Error(ip.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "invalid regex", regStr, "error", err)
}
}
ip.commonLabels.Append("hostname", util.GetHostName())
ip.commonLabels.Append("ip", util.GetIPAddress())
for key, val := range ip.Labels {
ip.commonLabels.Append(key, val)
}
return 0, nil
}
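
// Description returns a human-readable description of the plugin.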
func (ip *InputProcess) Description() string {
return "Support collect process metrics on the host machine or Linux virtual environments."
}
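
// Collect gathers metrics for each matched process. CPU and memory metrics
// are always collected; thread, open file descriptor, network IO, and disk IO
// metrics are collected only when the corresponding switches are enabled.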
func (ip *InputProcess) Collect(collector pipeline.Collector) error {
ip.collectTime = time.Now()
matchedProcesses, err := ip.filterMatchedProcesses()
if err != nil {
return err
}
for _, pc := range matchedProcesses {
labels := pc.Labels(&ip.commonLabels)
// add necessary metrics
ip.addCPUMetrics(pc, labels, collector)
ip.addMemMetrics(pc, labels, collector)
// add optional metrics
if ip.Thread {
ip.addThreadMetrics(pc, labels, collector)
}
if ip.OpenFD {
ip.addOpenFilesMetrics(pc, labels, collector)
}
if ip.NetIO {
ip.addNetIOMetrics(pc, labels, collector)
}
if ip.IO {
ip.addIOMetrics(pc, labels, collector)
}
}
return nil
}

// filterMatchedProcesses selects the matched processes from all processes
// visible to the plugin.
func (ip *InputProcess) filterMatchedProcesses() (matchedProcesses []processCache, err error) {
caches, err := findAllProcessCache(ip.MaxProcessCount)
if err != nil {
logger.Error(ip.context.GetRuntimeContext(), "PROCESS_LIST_ALARM", "error", err)
return
}
matchedProcesses = ip.filterRegexMatchedProcess(caches)
matchedProcesses = ip.filterTopAndThresholdMatchedProcesses(matchedProcesses)
return
}

// filterRegexMatchedProcess selects the processes matching the regular
// expressions and loads their necessary status, such as memory and CPU.
func (ip *InputProcess) filterRegexMatchedProcess(caches []processCache) (matchedProcesses []processCache) {
newProcessesMap := make(map[int]processCache)
matchedProcesses = make([]processCache, 0, util.MinInt(ip.MaxProcessCount, len(caches)))
regexpChecker := func(name string) bool {
for _, r := range ip.regexpList {
if r.MatchString(name) {
return true
}
}
return false
}
for _, pc := range caches {
		// Reuse the cached entry for a previously seen process so that CPU
		// usage can be computed from consecutive samples; otherwise apply the
		// regex filters to new processes.
if lpc, ok := ip.lastProcesses[pc.GetPid()]; ok && pc.Same(lpc) {
pc = lpc
} else if len(ip.regexpList) > 0 && !regexpChecker(pc.GetExe()) && !regexpChecker(pc.GetCmdLine()) {
continue
}
if !pc.FetchCore() {
continue
}
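		// Only report a process once its core status has been fetched at
		// least twice, so that CPU percentages can be derived from deltas.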
if pc.FetchCoreCount() > 1 {
matchedProcesses = append(matchedProcesses, pc)
}
newProcessesMap[pc.GetPid()] = pc
}
ip.lastProcesses = newProcessesMap
return
}

// filterTopAndThresholdMatchedProcesses selects processes by the configured
// top-N limits and the CPU/memory thresholds.
func (ip *InputProcess) filterTopAndThresholdMatchedProcesses(processList []processCache) (matchedProcesses []processCache) {
inChecker := func(checkOne processCache, container []processCache) bool {
for _, pc := range container {
if pc.Same(checkOne) {
return true
}
}
return false
}
	// Select the processes matching the CPU or the memory threshold.
thresholdMatchedProcesses := make([]processCache, 0, 16)
for _, pc := range processList {
if pc.GetProcessStatus().CPUPercentage.TotalPercentage >= ip.MinCPULimitPercent {
thresholdMatchedProcesses = append(thresholdMatchedProcesses, pc)
}
}
for _, pc := range processList {
if inChecker(pc, thresholdMatchedProcesses) {
continue
}
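		// MinMemoryLimitKB is configured in KB, while Rss is in bytes.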
if pc.GetProcessStatus().Memory.Rss >= uint64(ip.MinMemoryLimitKB)*1024 {
thresholdMatchedProcesses = append(thresholdMatchedProcesses, pc)
}
}
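	// With no top-N configured, report the threshold-matched processes
	// directly, truncated to MaxProcessCount.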
if ip.TopNMem <= 0 && ip.TopNCPU <= 0 {
if ip.MaxProcessCount < len(thresholdMatchedProcesses) {
matchedProcesses = thresholdMatchedProcesses[:ip.MaxProcessCount]
} else {
matchedProcesses = thresholdMatchedProcesses
}
}
	// Select the processes matching the top-N CPU or top-N memory conditions.
if ip.TopNCPU > 0 {
sort.Slice(thresholdMatchedProcesses, func(i, j int) bool {
return thresholdMatchedProcesses[i].GetProcessStatus().CPUPercentage.TotalPercentage >
thresholdMatchedProcesses[j].GetProcessStatus().CPUPercentage.TotalPercentage
})
appendCount := util.MinInt(ip.MaxProcessCount, util.MinInt(len(thresholdMatchedProcesses), ip.TopNCPU))
for i := 0; i < appendCount; i++ {
matchedProcesses = append(matchedProcesses, thresholdMatchedProcesses[i])
}
}
if ip.TopNMem > 0 {
sort.Slice(thresholdMatchedProcesses, func(i, j int) bool {
return thresholdMatchedProcesses[i].GetProcessStatus().Memory.Rss >
thresholdMatchedProcesses[j].GetProcessStatus().Memory.Rss
})
appendCount := util.MinInt(ip.MaxProcessCount, util.MinInt(len(thresholdMatchedProcesses), ip.TopNMem))
for i := 0; i < appendCount; i++ {
if len(matchedProcesses) == ip.MaxProcessCount {
break
}
if inChecker(thresholdMatchedProcesses[i], matchedProcesses) {
continue
}
matchedProcesses = append(matchedProcesses, thresholdMatchedProcesses[i])
}
}
return
}
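
// addCPUMetrics reports the total, system-time, and user-time CPU usage
// percentages of a process.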
func (ip *InputProcess) addCPUMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) {
if percentage := pc.GetProcessStatus().CPUPercentage; percentage != nil {
ip.addMetric(collector, "process_cpu_percent", &ip.collectTime, labels, percentage.TotalPercentage)
ip.addMetric(collector, "process_cpu_stime_percent", &ip.collectTime, labels, percentage.STimePercentage)
ip.addMetric(collector, "process_cpu_utime_percent", &ip.collectTime, labels, percentage.UTimePercentage)
}
}
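
// addMetric emits one metric as a raw metric log with the given name,
// timestamp, and labels.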
func (ip *InputProcess) addMetric(collector pipeline.Collector, name string, t *time.Time, labels *helper.MetricLabels, val float64) {
collector.AddRawLog(helper.NewMetricLog(name, t.UnixNano(), val, labels))
}
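
// addMemMetrics reports the RSS, swap, virtual, and data memory sizes of a
// process.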
func (ip *InputProcess) addMemMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) {
if mem := pc.GetProcessStatus().Memory; mem != nil {
ip.addMetric(collector, "process_mem_rss", &ip.collectTime, labels, float64(mem.Rss))
ip.addMetric(collector, "process_mem_swap", &ip.collectTime, labels, float64(mem.Swap))
ip.addMetric(collector, "process_mem_vsz", &ip.collectTime, labels, float64(mem.Vsz))
ip.addMetric(collector, "process_mem_data", &ip.collectTime, labels, float64(mem.Data))
}
}
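
// addThreadMetrics reports the thread count of a process when it can be
// fetched.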
func (ip *InputProcess) addThreadMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) {
if pc.FetchThreads() {
ip.addMetric(collector, "process_threads", &ip.collectTime, labels, float64(pc.GetProcessStatus().ThreadsNum))
}
}
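
// addOpenFilesMetrics reports the number of open file descriptors of a
// process when it can be fetched.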
func (ip *InputProcess) addOpenFilesMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) {
if pc.FetchFds() {
ip.addMetric(collector, "process_fds", &ip.collectTime, labels, float64(pc.GetProcessStatus().FdsNum))
}
}
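
// addNetIOMetrics reports the per-process network throughput in bytes and
// packets for each direction when it can be fetched.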
func (ip *InputProcess) addNetIOMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) {
if pc.FetchNetIO() {
net := pc.GetProcessStatus().NetIO
ip.addMetric(collector, "process_net_in_bytes", &ip.collectTime, labels, float64(net.InBytes))
ip.addMetric(collector, "process_net_in_packet", &ip.collectTime, labels, float64(net.InPacket))
ip.addMetric(collector, "process_net_out_bytes", &ip.collectTime, labels, float64(net.OutBytes))
ip.addMetric(collector, "process_net_out_packet", &ip.collectTime, labels, float64(net.OutPacket))
}
}
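
// addIOMetrics reports the per-process disk read/write bytes and operation
// counts when they can be fetched.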
func (ip *InputProcess) addIOMetrics(pc processCache, labels *helper.MetricLabels, collector pipeline.Collector) {
if pc.FetchIO() {
io := pc.GetProcessStatus().IO
ip.addMetric(collector, "process_read_bytes", &ip.collectTime, labels, float64(io.ReadeBytes))
ip.addMetric(collector, "process_write_bytes", &ip.collectTime, labels, float64(io.WriteBytes))
ip.addMetric(collector, "process_read_count", &ip.collectTime, labels, float64(io.ReadCount))
ip.addMetric(collector, "process_write_count", &ip.collectTime, labels, float64(io.WriteCount))
}
}
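
// init registers the plugin as `metric_process_v2` with TopNCPU and
// MinMemoryLimitKB set by default. A minimal pipeline configuration sketch
// (values are illustrative):
//
//	inputs:
//	  - Type: metric_process_v2
//	    TopNCPU: 5
//	    MinMemoryLimitKB: 100
//	    OpenFD: true
//	    ProcessNamesRegex:
//	      - "ilogtail.*"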
func init() {
pipeline.MetricInputs["metric_process_v2"] = func() pipeline.MetricInput {
return &InputProcess{
TopNCPU: 5,
MinMemoryLimitKB: 100,
lastProcesses: make(map[int]processCache),
}
}
}