public void launchContainer()

in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java [591:901]


  /**
   * Builds a docker run command for the container carried by {@code ctx} and
   * submits it through the privileged operation executor.
   *
   * The image, network, hostname, docker runtime, service-mode flag, client
   * config, port mappings, user mounts, tmpfs mounts and the YARN sysfs flag
   * are all read from the container's launch environment. A missing or empty
   * image or network falls back to the configured default. User supplied
   * values are validated before they are added to the run command, and any
   * value that fails validation aborts the launch.
   *
   * @param ctx runtime context supplying the container and the execution
   *            attributes read below (run-as user, work dir, log/local/
   *            filecache dirs, localized resources, resource options)
   * @throws ContainerExecutionException if a user supplied value is invalid,
   *         if user remapping is enabled and the uid/gids cannot be used
   *         (missing, non-numeric or below the configured thresholds), if
   *         volumes cannot be published, or if the privileged launch
   *         operation fails
   */
  public void launchContainer(ContainerRuntimeContext ctx)
      throws ContainerExecutionException {
    Container container = ctx.getContainer();
    ContainerId containerId = container.getContainerId();
    String containerIdStr = containerId.toString();
    Map<String, String> environment = container.getLaunchContext()
        .getEnvironment();
    String imageName = environment.get(ENV_DOCKER_CONTAINER_IMAGE);
    String network = environment.get(ENV_DOCKER_CONTAINER_NETWORK);
    String hostname = environment.get(ENV_DOCKER_CONTAINER_HOSTNAME);
    String runtime = environment.get(ENV_DOCKER_CONTAINER_DOCKER_RUNTIME);
    boolean serviceMode = Boolean.parseBoolean(environment.get(
        ENV_DOCKER_CONTAINER_DOCKER_SERVICE_MODE));
    // Service mode always implies entry point mode.
    boolean useEntryPoint = serviceMode || checkUseEntryPoint(environment);
    String clientConfig = environment.get(ENV_DOCKER_CONTAINER_CLIENT_CONFIG);

    // Fall back to the configured defaults when the user supplied nothing.
    if (imageName == null || imageName.isEmpty()) {
      imageName = defaultImageName;
    }
    if (network == null || network.isEmpty()) {
      network = defaultNetwork;
    }

    validateContainerNetworkType(network);

    validateHostname(hostname);

    validateImageName(imageName);

    validateContainerRuntimeType(runtime);

    if (defaultImageUpdate) {
      pullImageFromRemote(containerIdStr, imageName);
    }

    String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
    String dockerRunAsUser = runAsUser;
    Path containerWorkDir = ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
    String[] groups = null;

    if (enableUserReMapping) {
      String uid = getUserIdInfo(runAsUser);
      groups = getGroupIdInfo(runAsUser);
      // Without at least one group there is no primary gid to run as; fail
      // with the declared exception instead of an unchecked array access.
      if (groups == null || groups.length == 0) {
        throw new ContainerExecutionException(
            "No group ids found for user: " + runAsUser);
      }
      String gid = groups[0];
      try {
        if (Integer.parseInt(uid) < userRemappingUidThreshold) {
          String message = "uid: " + uid + " below threshold: "
              + userRemappingUidThreshold;
          throw new ContainerExecutionException(message);
        }
        for (String group : groups) {
          if (Integer.parseInt(group) < userRemappingGidThreshold) {
            String message = "gid: " + group
                + " below threshold: " + userRemappingGidThreshold;
            throw new ContainerExecutionException(message);
          }
        }
      } catch (NumberFormatException e) {
        // The ids are compared numerically; a value that does not parse
        // cannot be checked against the thresholds, so refuse to launch.
        ContainerExecutionException failure = new ContainerExecutionException(
            "Non-numeric uid or gid for user: " + runAsUser);
        failure.initCause(e);
        throw failure;
      }
      if (!allowPrivilegedContainerExecution(container)) {
        dockerRunAsUser = uid + ":" + gid;
      } else {
        dockerRunAsUser = ctx.getExecutionAttribute(USER);
      }
    }

    //List<String> -> stored as List -> fetched/converted to List<String>
    //we can't do better here thanks to type-erasure
    @SuppressWarnings("unchecked")
    List<String> filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS);
    @SuppressWarnings("unchecked")
    List<String> containerLogDirs = ctx.getExecutionAttribute(
        CONTAINER_LOG_DIRS);
    @SuppressWarnings("unchecked")
    List<String> userFilecacheDirs =
        ctx.getExecutionAttribute(USER_FILECACHE_DIRS);
    @SuppressWarnings("unchecked")
    List<String> applicationLocalDirs =
        ctx.getExecutionAttribute(APPLICATION_LOCAL_DIRS);
    @SuppressWarnings("unchecked")
    Map<Path, List<String>> localizedResources = ctx.getExecutionAttribute(
        LOCALIZED_RESOURCES);

    DockerRunCommand runCommand = new DockerRunCommand(containerIdStr,
        dockerRunAsUser, imageName)
        .setNetworkType(network);

    setHostname(runCommand, containerIdStr, network, hostname);

    // Add ports mapping value.
    if (environment.containsKey(ENV_DOCKER_CONTAINER_PORTS_MAPPING)) {
      String portsMapping = environment.get(ENV_DOCKER_CONTAINER_PORTS_MAPPING);
      // Compile once; every comma separated entry must match in full.
      Pattern portsMappingPattern = Pattern.compile(PORTS_MAPPING_PATTERN);
      for (String mapping : portsMapping.split(",")) {
        if (!portsMappingPattern.matcher(mapping).matches()) {
          throw new ContainerExecutionException(
              "Invalid port mappings: " + mapping);
        }
        runCommand.addPortsMapping(mapping);
      }
    }

    runCommand.setCapabilities(capabilities);
    if (runtime != null && !runtime.isEmpty()) {
      runCommand.addRuntime(runtime);
    }

    // The YARN log, local and filecache dirs are only mounted outside of
    // service mode.
    if (!serviceMode) {
      runCommand.addAllReadWriteMountLocations(containerLogDirs);
      runCommand.addAllReadWriteMountLocations(applicationLocalDirs);
      runCommand.addAllReadOnlyMountLocations(filecacheDirs);
      runCommand.addAllReadOnlyMountLocations(userFilecacheDirs);
    }

    if (environment.containsKey(ENV_DOCKER_CONTAINER_MOUNTS)) {
      String userMounts = environment.get(ENV_DOCKER_CONTAINER_MOUNTS);
      Matcher parsedMounts = USER_MOUNT_PATTERN.matcher(userMounts);
      if (!parsedMounts.find()) {
        throw new ContainerExecutionException(
            "Unable to parse user supplied mount list: "
                + userMounts);
      }
      // Rewind so the loop below also sees the first match.
      parsedMounts.reset();
      long mountCount = 0;
      while (parsedMounts.find()) {
        mountCount++;
        String src = parsedMounts.group(1);
        java.nio.file.Path srcPath = java.nio.file.Paths.get(src);
        // A relative source is resolved against the localized resources.
        if (!srcPath.isAbsolute()) {
          src = mountReadOnlyPath(src, localizedResources);
        }
        String dst = parsedMounts.group(2);
        String mode = parsedMounts.group(4);
        if (mode == null) {
          mode = "rw";
        } else if (!mode.startsWith("ro") && !mode.startsWith("rw")) {
          // Only extra options were given; default the access mode to rw.
          mode = "rw+" + mode;
        }
        runCommand.addMountLocation(src, dst, mode);
      }
      long commaCount = userMounts.chars()
          .filter(c -> c == ',').count();
      if (mountCount != commaCount + 1) {
        // this means the matcher skipped an improperly formatted mount
        throw new ContainerExecutionException(
            "Unable to parse some mounts in user supplied mount list: "
                + userMounts);
      }
    }

    // Configured default mounts, each expected in "src:dst" form.
    if (defaultROMounts != null && !defaultROMounts.isEmpty()) {
      for (String mount : defaultROMounts) {
        String[] dir = StringUtils.split(mount, ':');
        if (dir.length != 2) {
          throw new ContainerExecutionException("Invalid mount : " +
              mount);
        }
        String src = dir[0];
        String dst = dir[1];
        runCommand.addReadOnlyMountLocation(src, dst);
      }
    }

    if (defaultRWMounts != null && !defaultRWMounts.isEmpty()) {
      for (String mount : defaultRWMounts) {
        String[] dir = StringUtils.split(mount, ':');
        if (dir.length != 2) {
          throw new ContainerExecutionException("Invalid mount : " +
              mount);
        }
        String src = dir[0];
        String dst = dir[1];
        runCommand.addReadWriteMountLocation(src, dst);
      }
    }

    // Publish the container's volumes and mount each published path
    // read-write.
    ContainerVolumePublisher publisher = new ContainerVolumePublisher(
        container, container.getCsiVolumesRootDir(), this);
    try {
      Map<String, String> volumeMounts = publisher.publishVolumes();
      volumeMounts.forEach((local, remote) ->
          runCommand.addReadWriteMountLocation(local, remote));
    } catch (YarnException | IOException e) {
      ContainerExecutionException failure = new ContainerExecutionException(
          "Container requests for volume resource but we are failed"
              + " to publish volumes on this node");
      // Keep the underlying failure attached so it is not lost to callers.
      failure.initCause(e);
      throw failure;
    }

    if (environment.containsKey(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)) {
      String[] tmpfsMounts = environment.get(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)
          .split(",");
      for (String mount : tmpfsMounts) {
        if (!TMPFS_MOUNT_PATTERN.matcher(mount).matches()) {
          throw new ContainerExecutionException("Invalid tmpfs mount : " +
              mount);
        }
        runCommand.addTmpfsMount(mount);
      }
    }

    if (defaultTmpfsMounts != null && !defaultTmpfsMounts.isEmpty()) {
      for (String mount : defaultTmpfsMounts) {
        if (!TMPFS_MOUNT_PATTERN.matcher(mount).matches()) {
          throw new ContainerExecutionException("Invalid tmpfs mount : " +
              mount);
        }
        runCommand.addTmpfsMount(mount);
      }
    }

    if (allowHostPidNamespace(container)) {
      runCommand.setPidNamespace("host");
    }

    if (allowPrivilegedContainerExecution(container)) {
      runCommand.setPrivileged();
    }

    addDockerClientConfigToRunCommand(ctx, runCommand,
        getAdditionalDockerClientCredentials(clientConfig, containerIdStr));

    String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);

    addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);

    // parseBoolean(null) is false, so an absent key leaves sysfs disabled.
    if (Boolean.parseBoolean(
        environment.get(ENV_DOCKER_CONTAINER_YARN_SYSFS))) {
      runCommand.setYarnSysFS(true);
    }

    // In service mode, the YARN log dirs are not mounted into the container.
    // As a result, the container fails to start due to stdout and stderr output
    // being sent to a file in a directory that does not exist. In service mode,
    // only supply the command with no stdout or stderr redirection.
    List<String> commands = container.getLaunchContext().getCommands();
    if (serviceMode) {
      // Keep everything before the first "1>" and re-split it on spaces.
      commands = Arrays.asList(
          String.join(" ", commands).split("1>")[0].split(" "));
    }

    if (useEntryPoint) {
      // Entry point mode: pass the environment and the launch commands
      // directly and run attached.
      runCommand.setOverrideDisabled(true);
      runCommand.addEnv(environment);
      runCommand.setOverrideCommandWithArgs(commands);
      runCommand.disableDetach();
      runCommand.setLogDir(container.getLogDir());
    } else {
      // Otherwise run the launch script from the container work dir with
      // bash, detached.
      List<String> overrideCommands = new ArrayList<>();
      Path launchDst =
          new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
      overrideCommands.add("bash");
      overrideCommands.add(launchDst.toUri().getPath());
      runCommand.setContainerWorkDir(containerWorkDir.toString());
      runCommand.setOverrideCommandWithArgs(overrideCommands);
      runCommand.detachOnRun();
    }

    if (serviceMode) {
      runCommand.setServiceMode(serviceMode);
    }

    // The remapped user's groups are only added for non-privileged runs.
    if (enableUserReMapping
        && !allowPrivilegedContainerExecution(container)) {
      runCommand.groupAdd(groups);
    }

    // use plugins to create volume and update docker run command.
    if (nmContext != null
        && nmContext.getResourcePluginManager().getNameToPlugins() != null) {
      for (ResourcePlugin plugin : nmContext.getResourcePluginManager()
          .getNameToPlugins().values()) {
        DockerCommandPlugin dockerCommandPlugin =
            plugin.getDockerCommandPluginInstance();

        if (dockerCommandPlugin != null) {
          // Create volumes when needed.
          DockerVolumeCommand dockerVolumeCommand =
              dockerCommandPlugin.getCreateDockerVolumeCommand(
                  ctx.getContainer());
          if (dockerVolumeCommand != null) {
            runDockerVolumeCommand(dockerVolumeCommand, container);

            // After volume created, run inspect to make sure volume properly
            // created.
            if (dockerVolumeCommand.getSubCommand().equals(
                DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND)) {
              checkDockerVolumeCreated(dockerVolumeCommand, container);
            }
          }
          // Update cmd
          dockerCommandPlugin.updateDockerRunCommand(runCommand, container);
        }
      }
    }

    String commandFile = dockerClient.writeCommandToTempFile(runCommand,
        containerId, nmContext);
    PrivilegedOperation launchOp = buildLaunchOp(ctx,
        commandFile, runCommand);

    // Some failures here are acceptable. Let the calling executor decide.
    launchOp.disableFailureLogging();

    try {
      privilegedOperationExecutor.executePrivilegedOperation(null,
          launchOp, null, null, false, false);
    } catch (PrivilegedOperationException e) {
      // Surface the exit code and captured output so the caller can decide
      // how to treat the failure.
      throw new ContainerExecutionException("Launch container failed", e
          .getExitCode(), e.getOutput(), e.getErrorOutput());
    }
  }