in agent/service.go [153:263]
// Create prepares the bundle and stdio plumbing for the initial process of a
// task and then delegates the actual creation to the task manager.
//
// It validates the task/exec ID pair, unpacks the wrapper carried in
// req.Options (passing only the runc options on), writes the OCI config into
// the bundle, builds the stdio proxy and calls CreateTask. If an error is
// returned after the ID check has passed, the cleanups registered under this
// task's ID are run through doCleanup; a cleanup failure is logged, not
// returned.
func (ts *TaskService) Create(requestCtx context.Context, req *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	defer logPanicAndDie(log.G(requestCtx))

	// By containerd convention the initial process of a task has an empty exec ID.
	const initExecID = ""
	id := req.ID

	// containerd already validates this earlier; repeated here for extra safety.
	fullID, err := TaskExecID(id, initExecID)
	if err != nil {
		return nil, errors.Wrap(err, "invalid task and/or exec ID")
	}

	entry := log.G(requestCtx).WithField("TaskID", id).WithField("ExecID", initExecID)
	entry.Info("create")

	// From here on, any error returned through the named result triggers the
	// cleanups registered below.
	defer func() {
		if err == nil {
			return
		}
		if cleanupErr := ts.doCleanup(fullID); cleanupErr != nil {
			entry.WithError(cleanupErr).Error("failed to cleanup task")
		}
	}()

	extra, err := unmarshalExtraData(req.Options)
	if err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal extra data")
	}
	// runc should only see the options it understands, not our wrapper.
	req.Options = extra.RuncOptions

	bd := bundle.Dir(req.Bundle)
	ts.addCleanup(fullID, func() error {
		if rmErr := os.RemoveAll(bd.RootPath()); rmErr != nil {
			return errors.Wrapf(rmErr, "failed to remove bundle path %q", bd.RootPath())
		}
		return nil
	})

	// The rootfs dir must already exist (presumably from a previous MountDrive call).
	switch info, statErr := os.Stat(bd.RootfsPath()); {
	case statErr != nil:
		return nil, errors.Wrapf(statErr, "failed to stat bundle's rootfs path %q", bd.RootfsPath())
	case !info.IsDir():
		return nil, errors.Errorf("bundle's rootfs path %q is not a dir", bd.RootfsPath())
	}

	ts.addCleanup(fullID, func() error {
		if umErr := mount.UnmountAll(bd.RootfsPath(), unix.MNT_DETACH); umErr != nil {
			return errors.Wrapf(umErr, "failed to unmount bundle rootfs %q", bd.RootfsPath())
		}
		return nil
	})

	if err = bd.OCIConfig().Write(extra.JsonSpec); err != nil {
		return nil, errors.Wrap(err, "failed to write oci config file")
	}

	var ioProxy vm.IOProxy
	agentOnly := vm.IsAgentOnlyIO(req.Stdout, entry)
	if agentOnly {
		ioProxy = vm.NewNullIOProxy()
	} else {
		// The stdio FIFO paths in the request come from the host and cannot be
		// used here, so they are replaced with a FIFO set rooted at the bundle's
		// root path.
		fifos, fifoErr := cio.NewFIFOSetInDir(bd.RootPath(), fullID, req.Terminal)
		if fifoErr != nil {
			wrapped := errors.Wrap(fifoErr, "failed to open stdio FIFOs")
			entry.WithError(wrapped).Error()
			return nil, wrapped
		}

		// A pair stays nil for any stream the request left unset.
		var stdinPair, stdoutPair, stderrPair *vm.IOConnectorPair

		if req.Stdin != "" {
			req.Stdin = fifos.Stdin
			stdinPair = &vm.IOConnectorPair{
				ReadConnector:  vm.VSockAcceptConnector(extra.StdinPort),
				WriteConnector: vm.WriteFIFOConnector(req.Stdin),
			}
		}

		if req.Stdout != "" {
			req.Stdout = fifos.Stdout
			stdoutPair = &vm.IOConnectorPair{
				ReadConnector:  vm.ReadFIFOConnector(req.Stdout),
				WriteConnector: vm.VSockAcceptConnector(extra.StdoutPort),
			}
		}

		if req.Stderr != "" {
			req.Stderr = fifos.Stderr
			stderrPair = &vm.IOConnectorPair{
				ReadConnector:  vm.ReadFIFOConnector(req.Stderr),
				WriteConnector: vm.VSockAcceptConnector(extra.StderrPort),
			}
		}

		ioProxy = vm.NewIOConnectorProxy(stdinPair, stdoutPair, stderrPair)
	}

	created, err := ts.taskManager.CreateTask(requestCtx, req, ts.runcService, ioProxy)
	if err != nil {
		return nil, err
	}

	entry.WithField("pid", created.Pid).Debugf("create succeeded")
	return created, nil
}