in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java [591:901]
/**
 * Builds a {@code docker run} command for the container in {@code ctx} and
 * executes it as a privileged operation.
 *
 * <p>The image, network, hostname, runtime, mounts, tmpfs mounts and port
 * mappings are read from the container's launch-context environment, with
 * the configured defaults used when the image or network is unset. The
 * finished command is written to a temp file through {@code dockerClient}
 * and handed to {@code privilegedOperationExecutor}.
 *
 * @param ctx runtime context supplying the container and its execution
 *            attributes (run-as user, work dir, local/log dirs, ...)
 * @throws ContainerExecutionException if any user-supplied setting fails
 *         validation, a uid/gid is below the remapping threshold, a mount
 *         list cannot be parsed, volumes cannot be published, or the
 *         privileged launch operation itself fails
 */
public void launchContainer(ContainerRuntimeContext ctx)
    throws ContainerExecutionException {
  Container container = ctx.getContainer();
  ContainerId containerId = container.getContainerId();
  String containerIdStr = containerId.toString();
  Map<String, String> environment = container.getLaunchContext()
      .getEnvironment();
  String imageName = environment.get(ENV_DOCKER_CONTAINER_IMAGE);
  String network = environment.get(ENV_DOCKER_CONTAINER_NETWORK);
  String hostname = environment.get(ENV_DOCKER_CONTAINER_HOSTNAME);
  String runtime = environment.get(ENV_DOCKER_CONTAINER_DOCKER_RUNTIME);
  boolean serviceMode = Boolean.parseBoolean(environment.get(
      ENV_DOCKER_CONTAINER_DOCKER_SERVICE_MODE));
  // Service mode always implies entry-point mode.
  boolean useEntryPoint = serviceMode || checkUseEntryPoint(environment);
  String clientConfig = environment.get(ENV_DOCKER_CONTAINER_CLIENT_CONFIG);

  // Fall back to the configured defaults when the request leaves these unset.
  if (imageName == null || imageName.isEmpty()) {
    imageName = defaultImageName;
  }
  if (network == null || network.isEmpty()) {
    network = defaultNetwork;
  }

  validateContainerNetworkType(network);
  validateHostname(hostname);
  validateImageName(imageName);
  validateContainerRuntimeType(runtime);

  if (defaultImageUpdate) {
    pullImageFromRemote(containerIdStr, imageName);
  }

  String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
  String dockerRunAsUser = runAsUser;
  Path containerWorkDir = ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
  String[] groups = null;

  if (enableUserReMapping) {
    String uid = getUserIdInfo(runAsUser);
    groups = getGroupIdInfo(runAsUser);
    String gid = groups[0];
    if (Integer.parseInt(uid) < userRemappingUidThreshold) {
      String message = "uid: " + uid + " below threshold: "
          + userRemappingUidThreshold;
      throw new ContainerExecutionException(message);
    }
    // Every group of the user must clear the gid threshold, not only the
    // first one.
    for (String group : groups) {
      if (Integer.parseInt(group) < userRemappingGidThreshold) {
        String message = "gid: " + group
            + " below threshold: " + userRemappingGidThreshold;
        throw new ContainerExecutionException(message);
      }
    }
    // Non-privileged containers run as the remapped uid:gid; privileged
    // ones run as the USER execution attribute instead.
    if (!allowPrivilegedContainerExecution(container)) {
      dockerRunAsUser = uid + ":" + gid;
    } else {
      dockerRunAsUser = ctx.getExecutionAttribute(USER);
    }
  }

  //List<String> -> stored as List -> fetched/converted to List<String>
  //we can't do better here thanks to type-erasure
  @SuppressWarnings("unchecked")
  List<String> filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS);
  @SuppressWarnings("unchecked")
  List<String> containerLogDirs = ctx.getExecutionAttribute(
      CONTAINER_LOG_DIRS);
  @SuppressWarnings("unchecked")
  List<String> userFilecacheDirs =
      ctx.getExecutionAttribute(USER_FILECACHE_DIRS);
  @SuppressWarnings("unchecked")
  List<String> applicationLocalDirs =
      ctx.getExecutionAttribute(APPLICATION_LOCAL_DIRS);
  @SuppressWarnings("unchecked")
  Map<Path, List<String>> localizedResources = ctx.getExecutionAttribute(
      LOCALIZED_RESOURCES);

  DockerRunCommand runCommand = new DockerRunCommand(containerIdStr,
      dockerRunAsUser, imageName)
      .setNetworkType(network);
  setHostname(runCommand, containerIdStr, network, hostname);

  // Add ports mapping value.
  if (environment.containsKey(ENV_DOCKER_CONTAINER_PORTS_MAPPING)) {
    String portsMapping = environment.get(ENV_DOCKER_CONTAINER_PORTS_MAPPING);
    for (String mapping : portsMapping.split(",")) {
      if (!Pattern.matches(PORTS_MAPPING_PATTERN, mapping)) {
        throw new ContainerExecutionException(
            "Invalid port mappings: " + mapping);
      }
      runCommand.addPortsMapping(mapping);
    }
  }

  runCommand.setCapabilities(capabilities);
  if (runtime != null && !runtime.isEmpty()) {
    runCommand.addRuntime(runtime);
  }

  // The YARN log, local and filecache dirs are only mounted outside of
  // service mode.
  if (!serviceMode) {
    runCommand.addAllReadWriteMountLocations(containerLogDirs);
    runCommand.addAllReadWriteMountLocations(applicationLocalDirs);
    runCommand.addAllReadOnlyMountLocations(filecacheDirs);
    runCommand.addAllReadOnlyMountLocations(userFilecacheDirs);
  }

  // User-supplied mounts.
  if (environment.containsKey(ENV_DOCKER_CONTAINER_MOUNTS)) {
    String userMounts = environment.get(ENV_DOCKER_CONTAINER_MOUNTS);
    Matcher parsedMounts = USER_MOUNT_PATTERN.matcher(userMounts);
    if (!parsedMounts.find()) {
      throw new ContainerExecutionException(
          "Unable to parse user supplied mount list: " + userMounts);
    }
    // Rewind so the loop below sees the first match again.
    parsedMounts.reset();
    long mountCount = 0;
    while (parsedMounts.find()) {
      mountCount++;
      String src = parsedMounts.group(1);
      java.nio.file.Path srcPath = java.nio.file.Paths.get(src);
      // A relative source is resolved against the localized resources.
      if (!srcPath.isAbsolute()) {
        src = mountReadOnlyPath(src, localizedResources);
      }
      String dst = parsedMounts.group(2);
      String mode = parsedMounts.group(4);
      if (mode == null) {
        mode = "rw";
      } else if (!mode.startsWith("ro") && !mode.startsWith("rw")) {
        // Mode carries only extra options; default the access part to rw.
        mode = "rw+" + mode;
      }
      runCommand.addMountLocation(src, dst, mode);
    }
    long commaCount = userMounts.chars().filter(c -> c == ',').count();
    if (mountCount != commaCount + 1) {
      // this means the matcher skipped an improperly formatted mount
      throw new ContainerExecutionException(
          "Unable to parse some mounts in user supplied mount list: "
              + userMounts);
    }
  }

  // Configured default read-only mounts, each expected as "src:dst".
  if (defaultROMounts != null && !defaultROMounts.isEmpty()) {
    for (String mount : defaultROMounts) {
      String[] dir = StringUtils.split(mount, ':');
      if (dir.length != 2) {
        throw new ContainerExecutionException("Invalid mount : " +
            mount);
      }
      String src = dir[0];
      String dst = dir[1];
      runCommand.addReadOnlyMountLocation(src, dst);
    }
  }

  // Configured default read-write mounts, each expected as "src:dst".
  if (defaultRWMounts != null && !defaultRWMounts.isEmpty()) {
    for (String mount : defaultRWMounts) {
      String[] dir = StringUtils.split(mount, ':');
      if (dir.length != 2) {
        throw new ContainerExecutionException("Invalid mount : " +
            mount);
      }
      String src = dir[0];
      String dst = dir[1];
      runCommand.addReadWriteMountLocation(src, dst);
    }
  }

  // Publish volumes and mount each published (local, remote) pair
  // read-write.
  ContainerVolumePublisher publisher = new ContainerVolumePublisher(
      container, container.getCsiVolumesRootDir(), this);
  try {
    Map<String, String> volumeMounts = publisher.publishVolumes();
    volumeMounts.forEach((local, remote) ->
        runCommand.addReadWriteMountLocation(local, remote));
  } catch (YarnException | IOException e) {
    // Keep the underlying failure attached so the root cause is not lost.
    // initCause is used because this method only relies on the
    // message-only and (message, exitCode, output, errorOutput)
    // constructors of ContainerExecutionException.
    ContainerExecutionException publishFailure =
        new ContainerExecutionException(
            "Container requests for volume resource but we are failed"
                + " to publish volumes on this node");
    publishFailure.initCause(e);
    throw publishFailure;
  }

  // User-supplied tmpfs mounts.
  if (environment.containsKey(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)) {
    String[] tmpfsMounts = environment.get(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)
        .split(",");
    for (String mount : tmpfsMounts) {
      if (!TMPFS_MOUNT_PATTERN.matcher(mount).matches()) {
        throw new ContainerExecutionException("Invalid tmpfs mount : " +
            mount);
      }
      runCommand.addTmpfsMount(mount);
    }
  }

  // Configured default tmpfs mounts.
  if (defaultTmpfsMounts != null && !defaultTmpfsMounts.isEmpty()) {
    for (String mount : defaultTmpfsMounts) {
      if (!TMPFS_MOUNT_PATTERN.matcher(mount).matches()) {
        throw new ContainerExecutionException("Invalid tmpfs mount : " +
            mount);
      }
      runCommand.addTmpfsMount(mount);
    }
  }

  if (allowHostPidNamespace(container)) {
    runCommand.setPidNamespace("host");
  }

  if (allowPrivilegedContainerExecution(container)) {
    runCommand.setPrivileged();
  }

  addDockerClientConfigToRunCommand(ctx, runCommand,
      getAdditionalDockerClientCredentials(clientConfig, containerIdStr));

  String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);
  addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);

  // Boolean.parseBoolean(null) is false, so a missing key needs no
  // separate containsKey check.
  if (Boolean.parseBoolean(
      environment.get(ENV_DOCKER_CONTAINER_YARN_SYSFS))) {
    runCommand.setYarnSysFS(true);
  }

  // In service mode, the YARN log dirs are not mounted into the container.
  // As a result, the container fails to start due to stdout and stderr output
  // being sent to a file in a directory that does not exist. In service mode,
  // only supply the command with no stdout or stderr redirection.
  List<String> commands = container.getLaunchContext().getCommands();
  if (serviceMode) {
    // Keep only what precedes the first "1>" and re-split it on spaces.
    commands = Arrays.asList(
        String.join(" ", commands).split("1>")[0].split(" "));
  }

  if (useEntryPoint) {
    // Entry-point mode: pass the environment and the launch commands
    // straight to the image and do not detach.
    runCommand.setOverrideDisabled(true);
    runCommand.addEnv(environment);
    runCommand.setOverrideCommandWithArgs(commands);
    runCommand.disableDetach();
    runCommand.setLogDir(container.getLogDir());
  } else {
    // Otherwise run the container launch script from the work dir with
    // bash, detached.
    List<String> overrideCommands = new ArrayList<>();
    Path launchDst =
        new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
    overrideCommands.add("bash");
    overrideCommands.add(launchDst.toUri().getPath());
    runCommand.setContainerWorkDir(containerWorkDir.toString());
    runCommand.setOverrideCommandWithArgs(overrideCommands);
    runCommand.detachOnRun();
  }

  if (serviceMode) {
    runCommand.setServiceMode(serviceMode);
  }

  // groups is only populated when user remapping is enabled.
  if (enableUserReMapping
      && !allowPrivilegedContainerExecution(container)) {
    runCommand.groupAdd(groups);
  }

  // use plugins to create volume and update docker run command.
  if (nmContext != null
      && nmContext.getResourcePluginManager().getNameToPlugins() != null) {
    for (ResourcePlugin plugin : nmContext.getResourcePluginManager()
        .getNameToPlugins().values()) {
      DockerCommandPlugin dockerCommandPlugin =
          plugin.getDockerCommandPluginInstance();
      if (dockerCommandPlugin != null) {
        // Create volumes when needed.
        DockerVolumeCommand dockerVolumeCommand =
            dockerCommandPlugin.getCreateDockerVolumeCommand(
                ctx.getContainer());
        if (dockerVolumeCommand != null) {
          runDockerVolumeCommand(dockerVolumeCommand, container);

          // After volume created, run inspect to make sure volume properly
          // created.
          if (dockerVolumeCommand.getSubCommand().equals(
              DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND)) {
            checkDockerVolumeCreated(dockerVolumeCommand, container);
          }
        }

        // Update cmd
        dockerCommandPlugin.updateDockerRunCommand(runCommand, container);
      }
    }
  }

  String commandFile = dockerClient.writeCommandToTempFile(runCommand,
      containerId, nmContext);
  PrivilegedOperation launchOp = buildLaunchOp(ctx,
      commandFile, runCommand);

  // Some failures here are acceptable. Let the calling executor decide.
  launchOp.disableFailureLogging();

  try {
    privilegedOperationExecutor.executePrivilegedOperation(null,
        launchOp, null, null, false, false);
  } catch (PrivilegedOperationException e) {
    // Surface the exit code and captured output to the caller.
    throw new ContainerExecutionException("Launch container failed", e
        .getExitCode(), e.getOutput(), e.getErrorOutput());
  }
}