func()

in agent/service.go [406:499]


// Exec handles a request to run an additional process (identified by
// req.ExecID) inside the task identified by req.ID.
//
// The method validates the task/exec ID pair, unwraps the extra data carried
// in req.Spec so that only the runc options remain in the request, prepares
// the stdio proxy for the new process, and finally delegates to
// ts.taskManager.ExecProcess.
//
// Unless vm.IsAgentOnlyIO reports true for req.Stdout, every non-empty stdio
// path in req is overwritten with the matching path from a FIFO set obtained
// via cio.NewFIFOSetInDir for the task bundle's root path, and that FIFO is
// paired with a vsock accept connector on the port taken from the extra data.
//
// Once the IDs have been validated, any error returned from this method
// triggers ts.doCleanup for the task/exec ID; a failure of that cleanup is
// logged and not returned.
func (ts *TaskService) Exec(requestCtx context.Context, req *taskAPI.ExecProcessRequest) (_ *types.Empty, err error) {
	defer logPanicAndDie(log.G(requestCtx))

	taskID, execID := req.ID, req.ExecID

	// containerd is expected to have validated these IDs already; they are
	// checked again here purely as an extra safety net.
	taskExecID, err := TaskExecID(taskID, execID)
	if err != nil {
		return nil, errors.Wrap(err, "invalid task and/or exec ID")
	}

	execLog := log.G(requestCtx).WithField("TaskID", taskID).WithField("ExecID", execID)
	execLog.Debug("exec")

	// Runs on every return path below; it inspects the named result err, so
	// cleanup only happens when the method is returning a failure.
	defer func() {
		if err == nil {
			return
		}
		if cleanupErr := ts.doCleanup(taskExecID); cleanupErr != nil {
			execLog.WithError(cleanupErr).Error("failed to cleanup task")
		}
	}()

	extra, err := unmarshalExtraData(req.Spec)
	if err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal extra data")
	}

	// runc should only see the options it understands, not our wrapper.
	req.Spec = extra.RuncOptions

	bundleDir := bundle.VMBundleDir(taskID)

	var ioProxy vm.IOProxy

	if !vm.IsAgentOnlyIO(req.Stdout, execLog) {
		// The stdio FIFO paths in the request come from the host and are not
		// usable here, so they are replaced with FIFOs from a local set.
		fifos, fifoErr := cio.NewFIFOSetInDir(bundleDir.RootPath(), taskExecID, req.Terminal)
		if fifoErr != nil {
			wrapped := errors.Wrap(fifoErr, "failed to open stdio FIFOs")
			execLog.WithError(wrapped).Error()
			return nil, wrapped
		}

		// A pair stays nil when the corresponding stream was not requested.
		var stdinPair, stdoutPair, stderrPair *vm.IOConnectorPair

		if req.Stdin != "" {
			req.Stdin = fifos.Stdin
			stdinPair = &vm.IOConnectorPair{
				ReadConnector:  vm.VSockAcceptConnector(extra.StdinPort),
				WriteConnector: vm.WriteFIFOConnector(fifos.Stdin),
			}
			// The closure reads req.Stdin when the cleanup runs, not when it
			// is registered.
			ts.addCleanup(taskExecID, func() error {
				return os.RemoveAll(req.Stdin)
			})
		}

		if req.Stdout != "" {
			req.Stdout = fifos.Stdout
			stdoutPair = &vm.IOConnectorPair{
				ReadConnector:  vm.ReadFIFOConnector(fifos.Stdout),
				WriteConnector: vm.VSockAcceptConnector(extra.StdoutPort),
			}
			ts.addCleanup(taskExecID, func() error {
				return os.RemoveAll(req.Stdout)
			})
		}

		if req.Stderr != "" {
			req.Stderr = fifos.Stderr
			stderrPair = &vm.IOConnectorPair{
				ReadConnector:  vm.ReadFIFOConnector(fifos.Stderr),
				WriteConnector: vm.VSockAcceptConnector(extra.StderrPort),
			}
			ts.addCleanup(taskExecID, func() error {
				return os.RemoveAll(req.Stderr)
			})
		}

		ioProxy = vm.NewIOConnectorProxy(stdinPair, stdoutPair, stderrPair)
	} else {
		ioProxy = vm.NewNullIOProxy()
	}

	result, err := ts.taskManager.ExecProcess(requestCtx, req, ts.runcService, ioProxy)
	if err != nil {
		execLog.WithError(err).Error("exec failed")
		return nil, err
	}

	execLog.Debug("exec succeeded")
	return result, nil
}