internal/resources/fetching/fetchers/k8s/process_fetcher.go (226 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package fetchers import ( "context" "fmt" "io/fs" "os" "path" "path/filepath" "regexp" "strconv" "strings" "time" "github.com/elastic/beats/v7/x-pack/osquerybeat/ext/osquery-extension/pkg/proc" "github.com/elastic/elastic-agent-libs/mapstr" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/yaml" "github.com/elastic/cloudbeat/internal/infra/clog" "github.com/elastic/cloudbeat/internal/resources/fetching" "github.com/elastic/cloudbeat/internal/resources/fetching/cycle" ) const ( // CMDArgumentMatcher is a regex pattern that should match a process argument and its value // Expects format as the following: --<key><delimiter><value>. // For example: --config=a.json // The regex supports two delimiters "=" and "" CMDArgumentMatcher = "\\b%s[\\s=]\\/?(\\S+)" ProcessResourceType = "process" ProcessSubType = "process" directory = "/hostfs" userHz = 100 ) type EvalProcResource struct { PID string `json:"pid"` Cmd string `json:"command"` Stat proc.ProcStat `json:"stat"` ExternalData mapstr.M `json:"external_data"` } // ProcCommonData According to https://www.elastic.co/guide/en/ecs/current/ecs-process.html type ProcCommonData struct { // Parent process. Parent *ProcCommonData `json:"parent,omitempty"` // Process id. PID int64 `json:"pid,omitempty"` // Process name. // Sometimes called program name or similar. Name string `json:"name,omitempty"` // Identifier of the group of processes the process belongs to. PGID int64 `json:"pgid,omitempty"` // Full command line that started the process, including the absolute path // to the executable, and all arguments. // Some arguments may be filtered to protect sensitive information. CommandLine string `json:"command_line,omitempty"` // Array of process arguments, starting with the absolute path to the // executable. // May be filtered to protect sensitive information. Args []string `json:"args,omitempty"` // Length of the process.args array. // This field can be useful for querying or performing bucket analysis on // how many arguments were provided to start a process. More arguments may // be an indication of suspicious activity. ArgsCount int64 `json:"args_count,omitempty"` // Process title. // The proctitle, sometimes the same as process name. Can also be // different: for example a browser setting its title to the web page // currently opened. Title string `json:"title,omitempty"` // The time the process started. Start time.Time `json:"start"` // Seconds the process has been up. Uptime int64 `json:"uptime,omitempty"` } type ProcResource struct { EvalResource EvalProcResource ElasticCommon ProcCommonData } type ProcessesFetcher struct { log *clog.Logger Fs fs.FS resourceCh chan fetching.ResourceInfo processes ProcessesConfigMap } type ProcessInputConfiguration struct { ConfigFileArguments []string `config:"config-file-arguments"` } type ProcessesConfigMap map[string]ProcessInputConfiguration func NewProcessFetcher(log *clog.Logger, ch chan fetching.ResourceInfo, processes ProcessesConfigMap) *ProcessesFetcher { return &ProcessesFetcher{ log: log, Fs: os.DirFS(directory), resourceCh: ch, processes: processes, } } func (f *ProcessesFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata) error { f.log.Debug("Starting ProcessesFetcher.Fetch") pids, err := proc.ListFS(f.Fs) if err != nil { return fmt.Errorf("failed to list processes: %w", err) } // If errors occur during read, then return what we have till now // without reporting errors. for _, p := range pids { stat, err := proc.ReadStatFS(f.Fs, p) if err != nil { f.log.Errorf("error while reading /proc/<pid>/stat for process %s: %s", p, err.Error()) continue } // Get the full command line name and not the /proc/pid/status one which might be silently truncated. cmd, err := proc.ReadCmdLineFS(f.Fs, p) if err != nil { f.log.Error("error while reading /proc/<pid>/cmdline for process %s: %s", p, err.Error()) continue } name := extractCommandName(cmd) processConfig, isProcessRequired := f.processes[name] if !isProcessRequired { continue } fetchedResource := f.fetchProcessData(stat, processConfig, p, cmd) f.resourceCh <- fetching.ResourceInfo{Resource: fetchedResource, CycleMetadata: cycleMetadata} } return nil } func (f *ProcessesFetcher) fetchProcessData(procStat proc.ProcStat, processConf ProcessInputConfiguration, processId string, cmd string) fetching.Resource { configMap := f.getProcessConfigurationFile(processConf, cmd, procStat.Name) evalRes := EvalProcResource{PID: processId, Cmd: cmd, Stat: procStat, ExternalData: configMap} procCd := f.createProcCommonData(procStat, cmd, processId) return ProcResource{EvalResource: evalRes, ElasticCommon: procCd} } func (f *ProcessesFetcher) createProcCommonData(stat proc.ProcStat, cmd string, pid string) ProcCommonData { processID, err := strconv.ParseInt(pid, 10, 64) if err != nil { f.log.Errorf("Couldn't parse PID, pid: %s", pid) } startTime, err := strconv.ParseUint(stat.StartTime, 10, 64) if err != nil { f.log.Errorf("Couldn't parse stat.StartTime, startTime: %s", stat.StartTime) } pgid, err := strconv.ParseInt(stat.Group, 10, 64) if err != nil { f.log.Errorf("Couldn't parse stat.Group, Group: %s, Error: %v", stat.Group, err) } ppid, err := strconv.ParseInt(stat.Parent, 10, 64) if err != nil { f.log.Errorf("Couldn't parse stat.Parent, Parent: %s, Error: %v", stat.Parent, err) } sysUptime, err := proc.ReadUptimeFS(f.Fs) if err != nil { f.log.Error("couldn't read system boot time", err) } uptimeDate := time.Now().Add(-time.Duration(sysUptime) * time.Second) args := strings.Split(cmd, " ") start := uptimeDate.Add(ticksToDuration(startTime)) return ProcCommonData{ Parent: &ProcCommonData{PID: ppid}, PID: processID, Name: stat.Name, PGID: pgid, CommandLine: cmd, Args: args, ArgsCount: int64(len(args)), Title: stat.Name, Start: start, Uptime: int64(time.Since(start).Seconds()), } } // getProcessConfigurationFile - reads the configuration file associated with a process. // As an input this function receives a ProcessInputConfiguration that contains ConfigFileArguments, a string array that represents some process flags // The function extracts the configuration file associated with each flag and returns it. func (f *ProcessesFetcher) getProcessConfigurationFile(processConfig ProcessInputConfiguration, cmd string, processName string) map[string]any { configMap := make(map[string]any) for _, argument := range processConfig.ConfigFileArguments { // The regex extracts the cmd line flag(argument) value regex := fmt.Sprintf(CMDArgumentMatcher, argument) matcher := regexp.MustCompile(regex) if !matcher.MatchString(cmd) { f.log.Infof("Couldn't find a configuration file associated with flag %s for process %s from cmd %s", argument, processName, cmd) continue } groupMatches := matcher.FindStringSubmatch(cmd) if len(groupMatches) < 2 { f.log.Errorf("Couldn't find a configuration file associated with flag %s for process %s", argument, processName) continue } argValue := matcher.FindStringSubmatch(cmd)[1] f.log.Infof("Using %s as a configuration file for process %s", argValue, processName) data, err := fs.ReadFile(f.Fs, argValue) if err != nil { f.log.Errorf("Failed to read file configuration for process %s, error - %+v", processName, err) continue } configFile, err := f.readConfigurationFile(argValue, data) if err != nil { f.log.Errorf("Failed to parse file configuration for process %s, error - %+v", processName, err) continue } configMap[argument] = configFile } return configMap } func (f *ProcessesFetcher) readConfigurationFile(path string, data []byte) (any, error) { ext := filepath.Ext(path) var output any switch ext { case ".json": if err := json.Unmarshal(data, &output); err != nil { return nil, err } case ".yaml": if err := yaml.Unmarshal(data, &output); err != nil { return nil, err } default: return nil, fmt.Errorf("%s type is not supported", ext) } return output, nil } func (f *ProcessesFetcher) Stop() { } func (res ProcResource) GetData() any { return res.EvalResource } func (res ProcResource) GetIds() []string { return nil } func (res ProcResource) GetMetadata() (fetching.ResourceMetadata, error) { return fetching.ResourceMetadata{ ID: res.EvalResource.PID + res.EvalResource.Stat.StartTime, Type: ProcessResourceType, SubType: ProcessSubType, Name: res.EvalResource.Stat.Name, }, nil } func (res ProcResource) GetElasticCommonData() (map[string]any, error) { m := map[string]any{} m["process.parent.pid"] = res.ElasticCommon.Parent.PID m["process.pid"] = res.ElasticCommon.PID m["process.name"] = res.ElasticCommon.Name m["process.pgid"] = res.ElasticCommon.PGID m["process.command_line"] = res.ElasticCommon.CommandLine m["process.args"] = res.ElasticCommon.Args m["process.args_count"] = res.ElasticCommon.ArgsCount m["process.title"] = res.ElasticCommon.Title m["process.start"] = res.ElasticCommon.Start m["process.uptime"] = res.ElasticCommon.Uptime return m, nil } // Supported only in Linux func ticksToDuration(ticks uint64) time.Duration { seconds := float64(ticks) / float64(userHz) * float64(time.Second) return time.Duration(int64(seconds)) } func extractCommandName(cmdline string) string { // remove command line arguments by finding the first space. // <root>/proc/pid/cmdline separates the strings with null bytes ('\0'), // but proc.ReadCmdLineFS replaces them with space. i := strings.IndexByte(cmdline, ' ') if i > -1 { cmdline = cmdline[:i] } // remove the path (if exists) and return the process' executable file name. _, file := path.Split(cmdline) return file }