internal/resources/fetching/fetchers/k8s/file_system_fetcher.go (206 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package fetchers import ( "context" "errors" "fmt" "os" "path/filepath" "strconv" "syscall" "time" "github.com/djherbis/times" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/cloudbeat/internal/infra/clog" "github.com/elastic/cloudbeat/internal/resources/fetching" "github.com/elastic/cloudbeat/internal/resources/fetching/cycle" "github.com/elastic/cloudbeat/internal/resources/utils/user" ) const ( FSResourceType = "file" FileSubType = "file" DirSubType = "directory" UserFile = "/hostfs/etc/passwd" GroupFile = "/hostfs/etc/group" ) type EvalFSResource struct { Name string `json:"name"` Mode string `json:"mode"` Gid string `json:"gid"` Uid string `json:"uid"` Owner string `json:"owner"` Group string `json:"group"` Path string `json:"path"` Inode string `json:"inode"` SubType string `json:"sub_type"` } // FileCommonData According to https://www.elastic.co/guide/en/ecs/current/ecs-file.html type FileCommonData struct { Name string `mapstructure:"file.name,omitempty"` Mode string `mapstructure:"file.mode,omitempty"` Gid string `mapstructure:"file.gid,omitempty"` Uid string `mapstructure:"file.uid,omitempty"` Owner string `mapstructure:"file.owner,omitempty"` Group string `mapstructure:"file.group,omitempty"` Path string `mapstructure:"file.path,omitempty"` Inode string `mapstructure:"file.inode,omitempty"` Extension string `mapstructure:"file.extension,omitempty"` Size int64 `mapstructure:"file.size"` Type string `mapstructure:"file.type,omitempty"` Directory string `mapstructure:"file.directory,omitempty"` Accessed time.Time `mapstructure:"file.accessed"` Mtime time.Time `mapstructure:"file.mtime"` Ctime time.Time `mapstructure:"file.ctime"` } type FSResource struct { EvalResource EvalFSResource ElasticCommon FileCommonData } // FileSystemFetcher implement the Fetcher interface // The FileSystemFetcher meant to fetch file/directories from the file system and ship it // to the Cloudbeat type FileSystemFetcher struct { log *clog.Logger osUser user.OSUser resourceCh chan fetching.ResourceInfo patterns []string } func NewFsFetcher(log *clog.Logger, ch chan fetching.ResourceInfo, patterns []string) *FileSystemFetcher { return &FileSystemFetcher{ log: log, resourceCh: ch, osUser: user.NewOSUserUtil(), patterns: patterns, } } func (f *FileSystemFetcher) Fetch(_ context.Context, cycleMetadata cycle.Metadata) error { f.log.Debug("Starting FileSystemFetcher.Fetch") // Input files might contain glob pattern for _, filePattern := range f.patterns { matchedFiles, err := Glob(filePattern) if err != nil { f.log.Errorf("Failed to find matched glob for %s, error: %+v", filePattern, err) } for _, file := range matchedFiles { resource, err := f.fetchSystemResource(file) if err != nil { f.log.Errorf("Unable to fetch fileSystemResource for file %v", file) continue } f.resourceCh <- fetching.ResourceInfo{Resource: resource, CycleMetadata: cycleMetadata} } } return nil } func (f *FileSystemFetcher) fetchSystemResource(filePath string) (*FSResource, error) { info, err := os.Stat(filePath) if err != nil { return nil, fmt.Errorf("failed to fetch %s, error: %w", filePath, err) } resourceInfo, _ := f.fromFileInfo(info, filePath) return resourceInfo, nil } func (f *FileSystemFetcher) fromFileInfo(info os.FileInfo, path string) (*FSResource, error) { if info == nil { return nil, nil } stat, ok := info.Sys().(*syscall.Stat_t) if !ok { return nil, errors.New("not of type syscall.Stat_t") } mod := strconv.FormatUint(uint64(info.Mode().Perm()), 8) uid := strconv.FormatUint(uint64(stat.Uid), 10) gid := strconv.FormatUint(uint64(stat.Gid), 10) inode := strconv.FormatUint(stat.Ino, 10) username, err := f.osUser.GetUserNameFromID(uid, UserFile) if err != nil { logp.Error(fmt.Errorf("failed to find username for uid %s, error - %+v", uid, err)) } groupName, err := f.osUser.GetGroupNameFromID(gid, GroupFile) if err != nil { logp.Error(fmt.Errorf("failed to find groupname for gid %s, error - %+v", gid, err)) } data := EvalFSResource{ Name: info.Name(), Mode: mod, Gid: gid, Uid: uid, Owner: username, Group: groupName, Path: path, Inode: inode, SubType: getFSSubType(info), } return &FSResource{ EvalResource: data, ElasticCommon: f.createFileCommonData(stat, data, path), }, nil } func (f *FileSystemFetcher) Stop() { } func (r FSResource) GetData() any { return r.EvalResource } func (r FSResource) GetIds() []string { return nil } func (r FSResource) GetElasticCommonData() (map[string]any, error) { m := map[string]any{} m["file.name"] = r.ElasticCommon.Name m["file.mode"] = r.ElasticCommon.Mode m["file.gid"] = r.ElasticCommon.Gid m["file.uid"] = r.ElasticCommon.Uid m["file.owner"] = r.ElasticCommon.Owner m["file.group"] = r.ElasticCommon.Group m["file.path"] = r.ElasticCommon.Path m["file.inode"] = r.ElasticCommon.Inode m["file.extension"] = r.ElasticCommon.Extension m["file.directory"] = r.ElasticCommon.Directory m["file.size"] = r.ElasticCommon.Size m["file.type"] = r.ElasticCommon.Type if !r.ElasticCommon.Accessed.IsZero() { m["file.accessed"] = r.ElasticCommon.Accessed } if !r.ElasticCommon.Mtime.IsZero() { m["file.mtime"] = r.ElasticCommon.Mtime } if !r.ElasticCommon.Ctime.IsZero() { m["file.ctime"] = r.ElasticCommon.Ctime } return m, nil } func (r FSResource) GetMetadata() (fetching.ResourceMetadata, error) { return fetching.ResourceMetadata{ ID: r.EvalResource.Path, Type: FSResourceType, SubType: r.EvalResource.SubType, Name: r.EvalResource.Path, // The Path from the container and not from the host }, nil } func getFSSubType(fileInfo os.FileInfo) string { if fileInfo.IsDir() { return DirSubType } return FileSubType } func (f *FileSystemFetcher) createFileCommonData(stat *syscall.Stat_t, data EvalFSResource, path string) FileCommonData { cd := FileCommonData{ Name: data.Name, Mode: data.Mode, Gid: data.Gid, Uid: data.Uid, Owner: data.Owner, Group: data.Group, Path: data.Path, Inode: data.Inode, Extension: filepath.Ext(path), Directory: filepath.Dir(path), Size: stat.Size, Type: data.SubType, } t, err := times.Stat(path) if err != nil { f.log.Errorf("failed to get file time data (file %s), error - %s", path, err.Error()) } else { cd.Accessed = t.AccessTime() cd.Mtime = t.ModTime() if t.HasChangeTime() { cd.Ctime = t.ChangeTime() } } return cd }