pkg/process/finders/scanner/finder.go (393 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package scanner
import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/spf13/viper"
commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/process/v3"
"github.com/shirou/gopsutil/process"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/process/finders/base"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
var log = logger.GetLogger("process", "finder", "scanner")
type ProcessFinder struct {
conf *Config
manager base.ProcessManager
ctx context.Context
cancelCtx context.CancelFunc
period time.Duration
}
func (p *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, manager base.ProcessManager) error {
if err := validateConfig(conf.(*Config)); err != nil {
return err
}
p.conf = conf.(*Config)
p.manager = manager
p.ctx, p.cancelCtx = context.WithCancel(ctx)
period, err := time.ParseDuration(p.conf.Period)
if err != nil {
return err
}
p.period = period
return nil
}
func (p *ProcessFinder) Start() {
go p.startWatch()
}
func (p *ProcessFinder) Stop() error {
p.cancelCtx()
return nil
}
func (p *ProcessFinder) DetectType() api.ProcessDetectType {
return api.Scanner
}
func (p *ProcessFinder) ValidateProcessIsSame(p1, p2 base.DetectedProcess) bool {
vm1 := p1.(*Process)
vm2 := p2.(*Process)
return p1.Pid() == p2.Pid() && vm1.cmd == vm2.cmd && p1.Entity().SameWith(p2.Entity())
}
func (p *ProcessFinder) BuildEBPFProcess(ctx *base.BuildEBPFProcessContext, ps base.DetectedProcess) *v3.EBPFProcessProperties {
hostProcess := &v3.EBPFHostProcessMetadata{}
hostProcess.Pid = ps.Pid()
hostProcess.Entity = &v3.EBPFProcessEntityMetadata{
Layer: ps.Entity().Layer,
ServiceName: ps.Entity().ServiceName,
InstanceName: ps.Entity().InstanceName,
ProcessName: ps.Entity().ProcessName,
Labels: ps.Entity().Labels,
}
hostProcess.Properties = []*commonv3.KeyStringValuePair{
{
Key: "host_ip",
Value: ctx.HostIP,
},
{
Key: "pid",
Value: strconv.FormatInt(int64(ps.Pid()), 10),
},
{
Key: "command_line",
Value: ps.(*Process).cmd,
},
}
hostProcess.Properties = append(hostProcess.Properties, p.BuildNecessaryProperties(ps)...)
properties := &v3.EBPFProcessProperties{Metadata: &v3.EBPFProcessProperties_HostProcess{
HostProcess: hostProcess,
}}
return properties
}
func (p *ProcessFinder) BuildNecessaryProperties(ps base.DetectedProcess) []*commonv3.KeyStringValuePair {
return []*commonv3.KeyStringValuePair{
{
Key: "support_ebpf_profiling",
Value: strconv.FormatBool(ps.ProfilingStat() != nil),
},
}
}
func (p *ProcessFinder) ParseProcessID(ps base.DetectedProcess, downstream *v3.EBPFProcessDownstream) string {
if downstream.GetHostProcess() == nil {
return ""
}
if ps.Pid() == downstream.GetHostProcess().GetPid() &&
base.EntityIsSameWithProtocol(ps.Entity(), downstream.GetHostProcess().GetEntityMetadata()) {
return downstream.ProcessId
}
return ""
}
func (p *ProcessFinder) startWatch() {
// find one time
p.findAndReportProcesses()
// schedule
ticker := time.NewTicker(p.period)
for {
select {
case <-ticker.C:
p.findAndReportProcesses()
case <-p.ctx.Done():
return
}
}
}
func (p *ProcessFinder) findAndReportProcesses() {
var detectFunc func() ([]base.DetectedProcess, error)
if p.conf.ScanMode == Regex {
detectFunc = p.regexFindProcesses
} else if p.conf.ScanMode == Agent {
detectFunc = p.agentFindProcesses
}
if processes, err := detectFunc(); err != nil {
log.Warnf("list process failure, %v", err)
} else {
p.manager.SyncAllProcessInFinder(processes)
}
}
func (p *ProcessFinder) regexFindProcesses() ([]base.DetectedProcess, error) {
// find all process
processes, err := p.regexFindMatchedProcesses()
if err != nil {
return nil, err
}
// report to the manager
psList := make([]base.DetectedProcess, 0)
for _, ps := range processes {
psList = append(psList, ps)
}
return psList, nil
}
func (p *ProcessFinder) getProcessTempDir(pro *process.Process) (string, error) {
tmpDir := host.GetFileInHost(fmt.Sprintf("/proc/%d/root/tmp", pro.Pid))
environ, err := pro.Environ()
if err != nil {
log.Warnf("could not query the environments from the process, pid: %d, error: %v", pro.Pid, err)
}
prefix := "TMPDIR="
for _, env := range environ {
if strings.HasPrefix(env, prefix) {
dir := host.GetFileInHost(fmt.Sprintf("/proc/%d/root/%s", pro.Pid, strings.TrimPrefix(env, prefix)))
if pathExists(dir, nil) {
return dir, nil
}
}
}
if pathExists(tmpDir, nil) {
return tmpDir, nil
}
return "", fmt.Errorf("could not found tmp directory for pid: %d", pro.Pid)
}
func (p *ProcessFinder) agentFindProcesses() ([]base.DetectedProcess, error) {
// all system processes
processes, err := process.ProcessesWithContext(p.ctx)
if err != nil {
return nil, err
}
// find all matches processes
findedProcesses := make([]base.DetectedProcess, 0)
for _, pro := range processes {
// already contains the processes
pid := pro.Pid
// if we cannot get temp directory, just ignore it
// May have some system process
tmpDir, err := p.getProcessTempDir(pro)
if err != nil {
continue
}
metadataFilePath, metadataFile, err := p.tryingToGetAgentMetadataFile(pro, tmpDir)
if err != nil {
continue
}
// modify time + recent > now
// means the metadata file is acceptable
if metadataFile.ModTime().Add(p.conf.Agent.ProcessStatusRefreshPeriodDuration).Before(time.Now()) {
continue
}
// build agent process
agentProcess, err := p.buildProcessFromAgentMetadata(pro, metadataFilePath)
if err != nil {
log.Warnf("could not parsing metadata, pid: %d, error: %v", pid, err)
continue
}
findedProcesses = append(findedProcesses, agentProcess)
}
return findedProcesses, nil
}
func (p *ProcessFinder) tryingToGetAgentMetadataFile(pro *process.Process, tmpDir string) (string, os.FileInfo, error) {
// get from the local machine
if f, info, err := p.tryingToGetAgentMetadataFileByPid(int64(pro.Pid), tmpDir); err == nil {
return f, info, nil
}
// get from the child ns(container)
processStatusFilePath := host.GetFileInHost(fmt.Sprintf("/proc/%d/status", pro.Pid))
processStatusFile, err := os.Open(processStatusFilePath)
if err != nil {
return "", nil, err
}
defer processStatusFile.Close()
scanner := bufio.NewScanner(processStatusFile)
for scanner.Scan() {
infos := strings.SplitN(scanner.Text(), "\t", 2)
if len(infos) < 2 {
continue
}
if strings.TrimRight(infos[0], ":") == "NSpid" {
pids := strings.Split(infos[1], "\t")
if len(pids) <= 1 {
break
}
nspidStr := pids[len(pids)-1]
nspid, err := strconv.ParseInt(nspidStr, 10, 10)
if err != nil {
return "", nil, fmt.Errorf("could not parse the nspid: %s, %v", nspidStr, err)
}
if f, info, err := p.tryingToGetAgentMetadataFileByPid(nspid, tmpDir); err == nil {
return f, info, nil
}
}
}
return "", nil, fmt.Errorf("could not found")
}
func (p *ProcessFinder) tryingToGetAgentMetadataFileByPid(pid int64, tmpDir string) (string, os.FileInfo, error) {
metadataFile := path.Join(tmpDir, "apache_skywalking", "process", strconv.FormatInt(pid, 10), "metadata.properties")
f, err := os.Stat(metadataFile)
if err != nil {
return "", nil, err
}
return metadataFile, f, nil
}
func (p *ProcessFinder) buildProcessFromAgentMetadata(pro *process.Process, metaFilePath string) (*Process, error) {
metadata, err := os.ReadFile(metaFilePath)
if err != nil {
return nil, err
}
v := viper.New()
v.SetConfigType("properties")
if err1 := v.ReadConfig(bytes.NewReader(metadata)); err1 != nil {
return nil, err1
}
// parse agent data
agent := &AgentMetadata{}
if err1 := v.Unmarshal(agent); err1 != nil {
return nil, err1
}
cmdline, err := pro.Cmdline()
if err != nil {
return nil, err
}
return NewProcessByAgent(pro, cmdline, agent)
}
func (p *ProcessFinder) regexFindMatchedProcesses() ([]*Process, error) {
// all system processes
processes, err := process.ProcessesWithContext(p.ctx)
if err != nil {
return nil, err
}
// find all matches processes
findedProcesses := make([]*Process, 0)
for _, pro := range processes {
// find the matched process finder
finderConfig, cmdline, err := p.findMatchesFinder(pro)
if err != nil {
log.Warnf("failed to match process %d, reason: %v", pro.Pid, err)
continue
}
if finderConfig == nil {
continue
}
// build the linux process and add to the list
ps := NewProcessByRegex(pro, cmdline, finderConfig)
ps.entity.Layer = finderConfig.Layer
ps.entity.ServiceName, err = p.buildEntity(err, ps, finderConfig.serviceNameBuilder)
ps.entity.InstanceName, err = p.buildEntity(err, ps, finderConfig.instanceNameBuilder)
ps.entity.ProcessName, err = p.buildEntity(err, ps, finderConfig.processNameBuilder)
ps.entity.Labels = finderConfig.ParsedLabels
if err != nil {
log.Warnf("failed to build the process data for pid: %d, reason: %v", pro.Pid, err)
continue
} else {
findedProcesses = append(findedProcesses, ps)
}
}
if len(findedProcesses) == 0 {
return nil, nil
}
// remove duplicated(identity) process
identity2Processes := make(map[string][]*Process)
for _, ps := range findedProcesses {
id := ps.BuildIdentity()
if identity2Processes[id] == nil {
identity2Processes[id] = make([]*Process, 0)
}
identity2Processes[id] = append(identity2Processes[id], ps)
}
result := make([]*Process, 0)
for _, psList := range identity2Processes {
reportProcess := psList[0]
if len(psList) > 1 {
pidList := make([]int32, 0)
for _, ps := range psList {
pidList = append(pidList, ps.pid)
}
log.WithField("command_line", reportProcess.cmd).
WithField("service_name", reportProcess.entity.ServiceName).
WithField("instance_name", reportProcess.entity.InstanceName).
WithField("process_name", reportProcess.entity.ProcessName).
WithField("labels", reportProcess.entity.Labels).
WithField("pid_list", pidList).
Warnf("find multiple similar process in Scanner, " +
"only report the first of these processes. " +
"please update the name of process to identity them more clear.")
}
result = append(result, reportProcess)
}
return result, nil
}
func (p *ProcessFinder) buildEntity(err error, ps *Process, entity *base.TemplateBuilder) (string, error) {
if err != nil {
return "", err
}
return renderTemplate(entity, ps, p)
}
func (p *ProcessFinder) findMatchesFinder(ps *process.Process) (*RegexFinder, string, error) {
// verify the process exists, if not exists just return
if exists, err := process.PidExists(ps.Pid); err != nil {
return nil, "", err
} else if !exists {
return nil, "", nil
}
cmdline, err := ps.Cmdline()
if err != nil {
return nil, "", fmt.Errorf("query command line failure: %v", err)
}
var matched *RegexFinder
for _, finder := range p.conf.RegexFinders {
if finder.commandlineRegex.MatchString(cmdline) {
if matched == nil {
matched = finder
} else {
log.Warnf("found multiple finder for the process %d, command line: %s, choose the first one mached to build process",
ps.Pid, cmdline)
return matched, cmdline, nil
}
}
}
return matched, cmdline, nil
}
func validateConfig(conf *Config) error {
if conf.ScanMode == Agent {
var err error
conf.Agent.ProcessStatusRefreshPeriodDuration, err = base.DurationMustNotNull(err, "process_status_refresh_period",
conf.Agent.ProcessStatusRefreshPeriod)
return err
} else if conf.ScanMode != Regex {
return fmt.Errorf("could not found mode: %s", conf.ScanMode)
}
if len(conf.RegexFinders) == 0 {
return fmt.Errorf("must have one Scanner process finder")
}
// validate config
for _, f := range conf.RegexFinders {
var err error
err = base.StringMustNotNull(err, "layer", f.Layer)
f.commandlineRegex, err = base.RegexMustNotNull(err, "match_cmd_regex", f.MatchCommandRegex)
f.serviceNameBuilder, err = base.TemplateMustNotNull(err, "service_name", f.ServiceName)
f.instanceNameBuilder, err = base.TemplateMustNotNull(err, "instance_name", f.InstanceName)
f.processNameBuilder, err = base.TemplateMustNotNull(err, "process_name", f.ProcessName)
f.ParsedLabels = parseLabels(f.LabelsStr)
if err != nil {
return err
}
}
return nil
}
func parseLabels(labelStr string) []string {
tmp := strings.Split(labelStr, ",")
result := make([]string, 0)
for _, s := range tmp {
if s != "" {
result = append(result, s)
}
}
return result
}
func pathExists(exe string, err error) bool {
if err != nil {
return false
}
_, e := os.Stat(exe)
return !os.IsNotExist(e)
}
type AgentMetadata struct {
Layer string `mapstructure:"layer"`
ServiceName string `mapstructure:"service_name"`
InstanceName string `mapstructure:"instance_name"`
ProcessName string `mapstructure:"process_name"`
Properties string `mapstructure:"properties"`
Labels string `mapstructure:"labels"`
Language string `mapstructure:"language"`
}