func()

in pkg/custompluginmonitor/plugin/plugin.go [140:266]


// run executes the plugin binary described by rule and translates the result
// into a plugin status plus the (trimmed, length-limited) stdout of the plugin.
//
// The execution is bounded by the rule timeout when it is set and smaller than
// the global plugin timeout, otherwise by the global plugin timeout. When the
// deadline expires the process is killed via util.Kill and the returned output
// is replaced by a timeout message that embeds the process state and whatever
// stdout was collected.
//
// Exit code mapping: 0 -> cpmtypes.OK, 1 -> cpmtypes.NonOK, anything else ->
// cpmtypes.Unknown. Failures to set up pipes, start the process, read its
// output or wait for it also yield cpmtypes.Unknown with a generic message;
// the details are written to the error log.
func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, output string) {
	// Use the rule-specific timeout only when it is set and stricter than the
	// global one.
	timeoutDuration := *p.config.PluginGlobalConfig.Timeout
	if rule.Timeout != nil && *rule.Timeout < timeoutDuration {
		timeoutDuration = *rule.Timeout
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	cmd := util.Exec(rule.Path, rule.Args...)

	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		glog.Errorf("Error creating stdout pipe for plugin %q: error - %v", rule.Path, err)
		return cpmtypes.Unknown, "Error creating stdout pipe for plugin. Please check the error log"
	}
	stderrPipe, err := cmd.StderrPipe()
	if err != nil {
		glog.Errorf("Error creating stderr pipe for plugin %q: error - %v", rule.Path, err)
		return cpmtypes.Unknown, "Error creating stderr pipe for plugin. Please check the error log"
	}
	if err := cmd.Start(); err != nil {
		glog.Errorf("Error in starting plugin %q: error - %v", rule.Path, err)
		return cpmtypes.Unknown, "Error in starting plugin. Please check the error log"
	}

	// waitChan is closed when run returns so the watchdog goroutine below
	// never outlives this call.
	waitChan := make(chan struct{})
	defer close(waitChan)

	// timeout is written by the watchdog goroutine and read by this one, so it
	// is guarded by m.
	var m sync.Mutex
	timeout := false

	// Watchdog: kill the plugin process once the deadline expires.
	go func() {
		select {
		case <-ctx.Done():
			// A cancelled (as opposed to expired) context means run is
			// returning normally; nothing to kill.
			if ctx.Err() == context.Canceled {
				return
			}
			glog.Errorf("Error in running plugin timeout %q", rule.Path)
			if cmd.Process == nil || cmd.Process.Pid == 0 {
				glog.Errorf("Error in cmd.Process check %q", rule.Path)
				break
			}

			m.Lock()
			timeout = true
			m.Unlock()

			err := util.Kill(cmd)
			if err != nil {
				glog.Errorf("Error in kill process %d, %v", cmd.Process.Pid, err)
			}
		case <-waitChan:
			return
		}
	}()

	var (
		wg        sync.WaitGroup
		stdout    []byte
		stderr    []byte
		stdoutErr error
		stderrErr error
	)

	// Drain stdout and stderr concurrently; each goroutine writes only its own
	// pair of variables, which are read after wg.Wait().
	wg.Add(2)
	go func() {
		defer wg.Done()
		stdout, stdoutErr = readFromReader(stdoutPipe, maxCustomPluginBufferBytes)
	}()
	go func() {
		defer wg.Done()
		stderr, stderrErr = readFromReader(stderrPipe, maxCustomPluginBufferBytes)
	}()
	// This will wait for the reads to complete. If the execution times out, the pipes
	// will be closed and the wait group unblocks.
	wg.Wait()

	// NOTE(review): the two read-failure paths below return without calling
	// cmd.Wait(), so the child process is not reaped here — confirm whether
	// that is intended.
	if stdoutErr != nil {
		// Log the actual read error; the outer err is the (nil) result of the
		// earlier pipe setup and carries no information here.
		glog.Errorf("Error reading stdout for plugin %q: error - %v", rule.Path, stdoutErr)
		return cpmtypes.Unknown, "Error reading stdout for plugin. Please check the error log"
	}

	if stderrErr != nil {
		glog.Errorf("Error reading stderr for plugin %q: error - %v", rule.Path, stderrErr)
		return cpmtypes.Unknown, "Error reading stderr for plugin. Please check the error log"
	}

	// A non-zero exit surfaces as *exec.ExitError and is handled through the
	// exit code below; any other Wait error is treated as a failure to run.
	if err := cmd.Wait(); err != nil {
		if _, ok := err.(*exec.ExitError); !ok {
			glog.Errorf("Error in waiting for plugin %q: error - %v. output - %q", rule.Path, err, string(stdout))
			return cpmtypes.Unknown, "Error in waiting for plugin. Please check the error log"
		}
	}

	// Strip leading and trailing whitespace from the collected stdout.
	output = strings.TrimSpace(string(stdout))

	m.Lock()
	cmdKilled := timeout
	m.Unlock()

	if cmdKilled {
		output = fmt.Sprintf("Timeout when running plugin %q: state - %s. output - %q", rule.Path, cmd.ProcessState.String(), output)
	}

	// Cut at max_output_length bytes if the output is longer than that.
	if maxLen := *p.config.PluginGlobalConfig.MaxOutputLength; len(output) > maxLen {
		output = output[:maxLen]
	}

	// The third argument to logPluginStderr differs between the success path
	// (3) and the failure paths (0) — presumably a log verbosity level; verify
	// against logPluginStderr.
	exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
	switch exitCode {
	case 0:
		logPluginStderr(rule, string(stderr), 3)
		return cpmtypes.OK, output
	case 1:
		logPluginStderr(rule, string(stderr), 0)
		return cpmtypes.NonOK, output
	default:
		logPluginStderr(rule, string(stderr), 0)
		return cpmtypes.Unknown, output
	}
}