public void launchWorkerProcess()

in storm-server/src/main/java/org/apache/storm/container/docker/DockerManager.java [77:178]


    public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
                                    int port, String workerId, List<String> command, Map<String, String> env,
                                    String logPrefix, ExitCodeCallback processExitCallback,
                                    File targetDir) throws IOException {
        String dockerImage = getImageName(topoConf);
        if (dockerImage == null) {
            LOG.error("Image name for {} is not configured properly; will not continue to launch the worker", topologyId);
            return;
        }

        String workerDir = targetDir.getAbsolutePath();

        String uid = getUserIdInfo(user);
        String[] groups = getGroupIdInfo(user);
        String gid = groups[0];
        String dockerUser = uid + ":" + gid;

        DockerRunCommand dockerRunCommand = new DockerRunCommand(workerId, dockerUser, dockerImage);

        //set of locations to be bind mounted
        String workerRootDir = ConfigUtils.workerRoot(conf, workerId);
        String workerArtifactsRoot = ConfigUtils.workerArtifactsRoot(conf, topologyId, port);
        String workerUserFile = ConfigUtils.workerUserFile(conf, workerId);
        String sharedByTopologyDir = ConfigUtils.sharedByTopologyDir(conf, topologyId);

        // Theoretically we only need to mount ConfigUtils.supervisorStormDistRoot directory.
        // But if supervisorLocalDir is not mounted, the worker will try to create it and fail.
        String supervisorLocalDir = ConfigUtils.supervisorLocalDir(conf);
        String workerTmpRoot = ConfigUtils.workerTmpRoot(conf, workerId);

        dockerRunCommand.detachOnRun()
            .setNetworkType("host")
            //The whole file system of the container will be read-only except specific read-write bind mounts
            .setReadonly()
            .addReadOnlyMountLocation(cgroupRootPath, cgroupRootPath, false)
            .addReadOnlyMountLocation(stormHome, stormHome, false)
            .addReadOnlyMountLocation(supervisorLocalDir, supervisorLocalDir, false)
            .addReadWriteMountLocation(workerRootDir, workerRootDir, false)
            .addReadWriteMountLocation(workerArtifactsRoot, workerArtifactsRoot, false)
            .addReadWriteMountLocation(workerUserFile, workerUserFile, false)
            //nscd must be running so that profiling can work properly
            .addReadWriteMountLocation(nscdPath, nscdPath, false)
            .addReadWriteMountLocation(sharedByTopologyDir, sharedByTopologyDir, false)
            //This is to make /tmp directory in container writable. This is very important.
            // For example
            // 1. jvm needs to write to /tmp/hsperfdata_<user> directory so that jps can work
            // 2. jstack needs to create a socket under /tmp directory.
            //Otherwise profiling will not work properly.
            .addReadWriteMountLocation(workerTmpRoot, TMP_DIR, false)
            //a list of read-only bind mount locations
            .addAllReadOnlyMountLocations(readonlyBindmounts, false)
            .addAllReadWriteMountLocations(readwriteBindmounts, false);

        if (workerToCores.containsKey(workerId)) {
            dockerRunCommand.addCpuSetBindings(
                    workerToCores.get(workerId), workerToMemoryZone.get(workerId)
            );
        }

        dockerRunCommand.setCGroupParent(cgroupParent)
            .groupAdd(groups)
            .setContainerWorkDir(workerDir)
            .setCidFile(dockerCidFilePath(workerId))
            .setCapabilities(Collections.emptySet())
            .setNoNewPrivileges();

        if (seccompJsonFile != null) {
            dockerRunCommand.setSeccompProfile(seccompJsonFile);
        }

        if (workerToCpu.containsKey(workerId)) {
            dockerRunCommand.setCpus(workerToCpu.get(workerId) / 100.0);
        }

        if (workerToMemoryMb.containsKey(workerId)) {
            dockerRunCommand.setMemoryMb(workerToMemoryMb.get(workerId));
        }

        dockerRunCommand.setOverrideCommandWithArgs(Arrays.asList("bash", ServerUtils.writeScript(workerDir, command, env, "0027")));

        //run docker-run command and launch container in background (-d option).
        runDockerCommandWaitFor(conf, user, CmdType.LAUNCH_DOCKER_CONTAINER,
            dockerRunCommand.getCommandWithArguments(), null, logPrefix, null, targetDir, "docker-run");

        //docker-wait for the container in another thread. processExitCallback will get the container's exit code.
        String threadName = "DockerWait_SLOT_" + port;
        Utils.asyncLoop(new Callable<Long>() {
            @Override
            public Long call() throws IOException {
                DockerWaitCommand dockerWaitCommand = new DockerWaitCommand(workerId);
                try {
                    runDockerCommandWaitFor(conf, user,  CmdType.RUN_DOCKER_CMD,
                        dockerWaitCommand.getCommandWithArguments(), null, logPrefix, processExitCallback, targetDir, "docker-wait");
                } catch (IOException e) {
                    LOG.error("IOException on running docker wait command:", e);
                    throw e;
                }
                return null; // Run only once.
            }
        }, threadName, null);

    }