x-pack/osquerybeat/internal/osqd/osqueryd.go (444 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package osqd import ( "bufio" "context" "errors" "fmt" "io" "os" "os/exec" "path/filepath" "runtime" "sort" "strings" "sync" "github.com/dolmen-go/contextio" "github.com/elastic/beats/v7/x-pack/libbeat/common/proc" "github.com/elastic/beats/v7/x-pack/osquerybeat/internal/fileutil" "github.com/elastic/elastic-agent-libs/logp" ) const ( osqueryDName = "osqueryd" osqueryDarwinAppBundlePath = "osquery.app/Contents/MacOS" ) const ( defaultDataDir = "osquery" defaultCertsDir = "certs" defaultLensesDir = "lenses" defaultConfigRefreshInterval = 30 // interval osqueryd will poll for configuration changed; scheduled queries configuration for now ) const ( flagEnableTables = "enable_tables" flagDisableTables = "disable_tables" ) var defaultDisabledTables = []string{"carves", "curl"} type OSQueryD struct { socketPath string binPath string dataPath string certsPath string lensesPath string configPlugin string loggerPlugin string extensionsTimeout int configRefreshInterval int log *logp.Logger } type Option func(*OSQueryD) func WithExtensionsTimeout(to int) Option { return func(q *OSQueryD) { q.extensionsTimeout = to } } func WithBinaryPath(binPath string) Option { return func(q *OSQueryD) { q.binPath = binPath } } func WithConfigRefresh(refreshInterval int) Option { return func(q *OSQueryD) { q.configRefreshInterval = refreshInterval } } func WithDataPath(dataPath string) Option { return func(q *OSQueryD) { q.dataPath = dataPath } } func WithLogger(log *logp.Logger) Option { return func(q *OSQueryD) { q.log = log } } func WithConfigPlugin(name string) Option { return func(q *OSQueryD) { q.configPlugin = name } } func WithLoggerPlugin(name string) Option { return func(q *OSQueryD) { q.loggerPlugin = name } } func New(socketPath string, opts ...Option) (*OSQueryD, error) { q := &OSQueryD{ socketPath: socketPath, extensionsTimeout: defaultExtensionsTimeout, configRefreshInterval: defaultConfigRefreshInterval, } for _, opt := range opts { opt(q) } // The working directory is set to something like ./data/elastic-agent-3afa07/run/osquery-default by the agent // Use the child dir osquery for that, so the full path is resolved to ./data/elastic-agent-3afa07/run/osquery-default/oquery // // The following files are currently created there by osqueryd executable when it is started // // -rw------- 1 root wheel 149 Nov 28 17:46 osquery.autoload // drwx------ 11 root wheel 352 Nov 28 19:00 osquery.db // -rw-r--r-- 1 root wheel 0 Nov 28 17:46 osquery.flags // -rw------- 1 root wheel 5 Nov 28 18:48 osquery.pid if q.dataPath == "" { q.dataPath = defaultDataDir } // Initialize binPath before certsPath and the lensesPath are set err := q.prepareBinPath() if err != nil { return nil, fmt.Errorf("failed to prepare bin path, %w", err) } if q.certsPath == "" { q.certsPath = filepath.Join(q.binPath, defaultCertsDir) } if q.lensesPath == "" { q.lensesPath = filepath.Join(q.binPath, defaultLensesDir) } return q, nil } func (q *OSQueryD) SocketPath() string { return q.socketPath } func (q *OSQueryD) DataPath() string { return q.dataPath } // Check checks if the binary exists and executable func (q *OSQueryD) Check(ctx context.Context) error { err := q.prepareBinPath() if err != nil { return fmt.Errorf("failed to prepare bin path, %w", err) } //nolint:gosec // works as expected cmd := exec.Command( osquerydPath(q.binPath), "--S", "--version", ) err = cmd.Start() if err != nil { return err } return cmd.Wait() } // Run executes osqueryd binary as a child process func (q *OSQueryD) Run(ctx context.Context, flags Flags) error { cleanup, err := q.prepare() if err != nil { return err } defer cleanup() cmd := q.createCommand(flags) q.log.Debugf("start osqueryd process: args: %v", cmd.Args) cmd.SysProcAttr = setpgid() // Read standard output var wg sync.WaitGroup if q.isVerbose() { stdout, err := cmd.StdoutPipe() if err != nil { return err } wg.Add(1) go func() { defer wg.Done() _ = q.logOSQueryOutput(ctx, stdout) }() } stderr, err := cmd.StderrPipe() if err != nil { return err } err = cmd.Start() if err != nil { return err } // Assign osqueryd process to the JobObject on windows // in order to assure no orphan process is left behind // after osquerybeat process is killed. if err := proc.JobObject.Assign(cmd.Process); err != nil { q.log.Errorf("osqueryd process failed job assign: %v", err) } var ( errbuf strings.Builder ) ctxstderr := contextio.NewReader(ctx, stderr) wait := func() error { if _, cerr := io.Copy(&errbuf, ctxstderr); cerr != nil { return cerr } return cmd.Wait() } finished := make(chan error, 1) // Wait on osqueryd exit wg.Add(1) go func() { defer wg.Done() finished <- wait() }() select { case err = <-finished: if err != nil { s := strings.TrimSpace(errbuf.String()) if s != "" { err = fmt.Errorf("%s: %w", s, err) } } if err != nil { q.log.Errorf("process exited with error: %v", err) } else { q.log.Info("process exited") } case <-ctx.Done(): q.log.Debug("kill process group on context done") if err := killProcessGroup(cmd); err != nil { q.log.Errorf("kill process group failed: %v", err) } // Wait till finished <-finished } wg.Wait() return err } func (q *OSQueryD) prepare() (func(), error) { err := q.prepareBinPath() if err != nil { return nil, fmt.Errorf("failed to prepare bin path, %w", err) } // Create data directory for all the osquery config/runtime files if err := os.MkdirAll(q.dataPath, 0750); err != nil { return nil, fmt.Errorf("failed to create dir %v, %w", q.dataPath, err) } // If socket path was not specified, create if q.socketPath == "" { // Create temp directory for socket and possibly other things // The unix domain socker path is limited to 108 chars and would // not always be able to create in subdirectory socketPath, cleanupFn, err := CreateSocketPath() if err != nil { return nil, err } q.socketPath = socketPath return cleanupFn, nil } // Prepare autoload osquery-extension extensionPath := osqueryExtensionPath(q.binPath) if _, err := os.Stat(extensionPath); err != nil { if os.IsNotExist(err) { return nil, fmt.Errorf("extension path does not exist: %s, %w", extensionPath, err) } else { return nil, fmt.Errorf("failed to stat extension path, %w", err) } } // Write the autoload file extensionAutoloadPath := q.resolveDataPath(osqueryAutoload) err = prepareAutoloadFile(extensionAutoloadPath, extensionPath, q.log) if err != nil { return nil, fmt.Errorf("failed to prepare extensions autoload file, %w", err) } // Write the flagsfile in order to lock down/prevent loading default flags from osquery global locations. // Otherwise the osqueryi and osqueryd will try to load the default flags file, // for example from /var/osquery/osquery.flags.default on Mac, and can potentially mess up configuration of our osquery instance. flagsfilePath := q.resolveDataPath(osqueryFlagfile) exists, err := fileutil.FileExists(flagsfilePath) if err != nil { return nil, fmt.Errorf("failed to check flagsfile path, %w", err) } if !exists { f, err := os.OpenFile(flagsfilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return nil, fmt.Errorf("failed to create flagsfile, %w", err) } f.Close() } return func() {}, nil } func prepareAutoloadFile(extensionAutoloadPath, mandatoryExtensionPath string, log *logp.Logger) error { ok, err := fileutil.FileExists(extensionAutoloadPath) if err != nil { return fmt.Errorf("failed to check osquery.autoload file exists, %w", err) } rewrite := false if ok { log.Debugf("Extensions autoload file %s exists, verify the first extension is ours", extensionAutoloadPath) err = verifyAutoloadFile(extensionAutoloadPath, mandatoryExtensionPath) if err != nil { log.Debugf("Extensions autoload file %v verification failed, err: %v, create a new one", extensionAutoloadPath, err) rewrite = true } } else { log.Debugf("Extensions autoload file %s doesn't exists, create a new one", extensionAutoloadPath) rewrite = true } if rewrite { if err := os.WriteFile(extensionAutoloadPath, []byte(mandatoryExtensionPath), 0600); err != nil { return fmt.Errorf("failed write osquery extension autoload file, %w", err) } } return nil } func verifyAutoloadFile(extensionAutoloadPath, mandatoryExtensionPath string) error { f, err := os.Open(extensionAutoloadPath) if err != nil { return err } defer f.Close() scanner := bufio.NewScanner(f) for i := 0; scanner.Scan(); i++ { line := scanner.Text() if i == 0 { // Check that the first line is the mandatory extension if line != mandatoryExtensionPath { return errors.New("extentsions autoload file is missing mandatory extension in the first line of the file") } } // Check that the line contains the valid path that exists _, err := os.Stat(line) if err != nil { return err } } return scanner.Err() } func (q *OSQueryD) prepareBinPath() error { // If path to osquery was not set use the current executable path if q.binPath == "" { exePath, err := os.Executable() if err != nil { return err } q.binPath = filepath.Dir(exePath) } return nil } func (q *OSQueryD) args(userFlags Flags) Args { flags := make(Flags, len(userFlags)) // Copy user flags for k, userValue := range userFlags { // Skip enable_tables and disable_tables flags, they are set later in this function // after merging with default disabled tables if k == flagEnableTables || flagEnableTables == flagDisableTables { continue } flags[k] = userValue } // Copy protected flags, protected keys overwrite the user keys for k, v := range protectedFlags { flags[k] = v } flags["pidfile"] = q.resolveDataPath(flags.GetString("pidfile")) flags["database_path"] = q.resolveDataPath(flags.GetString("database_path")) flags["extensions_autoload"] = q.resolveDataPath(flags.GetString("extensions_autoload")) flags["flagfile"] = q.resolveDataPath(flags.GetString("flagfile")) flags["tls_server_certs"] = q.resolveCertsPath(flags.GetString("tls_server_certs")) // Augeas lenses are not available on windows if runtime.GOOS == "windows" { delete(flags, "augeas_lenses") } else { flags["augeas_lenses"] = q.lensesPath } flags["extensions_socket"] = q.socketPath if q.extensionsTimeout > 0 { flags["extensions_timeout"] = q.extensionsTimeout } if q.configPlugin != "" { flags["config_plugin"] = q.configPlugin } if q.loggerPlugin != "" { flags["logger_plugin"] = q.loggerPlugin } if q.configRefreshInterval > 0 { flags["config_refresh"] = q.configRefreshInterval } if q.isVerbose() { flags["verbose"] = true flags["disable_logging"] = false } // Check enabled tables // If the default disabled table shows up in the enabled tables list, remove it from disabled tables list // This changes the behvaour for this flag in a sense that if `curl` table is enabled // then it just removes is from disabled tables flag and doesn't disable all the other table enabledTables, disabledTables := getEnabledDisabledTables(userFlags) if len(enabledTables) != 0 { flags[flagEnableTables] = strings.Join(enabledTables, ",") } if len(disabledTables) != 0 { flags[flagDisableTables] = strings.Join(disabledTables, ",") } return convertToArgs(flags) } func arrayToSet(arr []string) map[string]struct{} { m := make(map[string]struct{}, len(arr)) for _, n := range arr { m[n] = struct{}{} } return m } // https://osquery.readthedocs.io/en/stable/installation/cli-flags/#enable-and-disable-flags // By default every table is enabled. // If a specific table is set in both --enable_tables and --disable_tables, disabling take precedence. // If --enable_tables is defined and --disable_tables is not set, every table but the one defined in --enable_tables func getEnabledDisabledTables(userFlags Flags) (enabled, disabled []string) { enabledTables := make(map[string]struct{}) // Initialize with default disabled tables disabledTables := arrayToSet(defaultDisabledTables) iterate := func(key string, fn func(name string)) { if tablesValue, ok := userFlags[key]; ok { if tablesString, ok := tablesValue.(string); ok { tables := strings.Split(tablesString, ",") for _, table := range tables { name := strings.TrimSpace(table) if name == "" { continue } fn(name) } } } } normalize := func(tables map[string]struct{}) []string { res := make([]string, 0, len(tables)) for name := range tables { res = append(res, name) } if len(res) > 0 { sort.Strings(res) } return res } // Append the disabled tables from flags iterate("disable_tables", func(name string) { disabledTables[name] = struct{}{} }) // Check enabled tables flag and remove these tables from disabledTables iterate("enable_tables", func(name string) { if _, ok := disabledTables[name]; ok { delete(disabledTables, name) } else { enabledTables[name] = struct{}{} } }) return normalize(enabledTables), normalize(disabledTables) } func (q *OSQueryD) createCommand(userFlags Flags) *exec.Cmd { //nolint:gosec // works as expected return exec.Command( osquerydPath(q.binPath), q.args(userFlags)...) } func (q *OSQueryD) isVerbose() bool { return q.log.IsDebug() } func osquerydPath(dir string) string { return QsquerydPathForPlatform(runtime.GOOS, dir) } // QsquerydPathForPlatform returns the full path to osqueryd binary for platform func QsquerydPathForPlatform(platform, dir string) string { if platform == "darwin" { return filepath.Join(dir, osqueryDarwinAppBundlePath, osquerydFilename(platform)) } return filepath.Join(dir, osquerydFilename(platform)) } func osquerydFilename(platform string) string { if platform == "windows" { return osqueryDName + ".exe" } return osqueryDName } func osqueryExtensionPath(dir string) string { return filepath.Join(dir, extensionName) } func (q *OSQueryD) resolveDataPath(filename string) string { return filepath.Join(q.dataPath, filename) } func (q *OSQueryD) resolveCertsPath(filename string) string { return filepath.Join(q.certsPath, filename) } func (q *OSQueryD) logOSQueryOutput(ctx context.Context, r io.ReadCloser) error { log := q.log.With("ctx", "osqueryd output") buf := make([]byte, 2048) LOOP: for { n, err := r.Read(buf[:]) if n > 0 { log.Info(string(buf[:n])) } if err != nil { if errors.Is(err, io.EOF) { err = nil } return err } select { case <-ctx.Done(): break LOOP default: } } return nil }