public void launchWorkerProcess()

in storm-server/src/main/java/org/apache/storm/container/oci/RuncLibContainerManager.java [165:283]


    public void launchWorkerProcess(String user, String topologyId,  Map<String, Object> topoConf,
                                    int port, String workerId,
                                    List<String> command, Map<String, String> env, String logPrefix,
                                    ExitCodeCallback processExitCallback, File targetDir) throws IOException {

        String imageName = getImageName(topoConf);
        if (imageName == null) {
            LOG.error("Image name for {} is not configured properly; will not continue to launch the worker", topologyId);
            return;
        }

        //set container ID to port + worker ID
        String containerId = getContainerId(workerId, port);

        //get manifest
        ImageManifest manifest = imageTagToManifestPlugin.getManifestFromImageTag(imageName);
        LOG.debug("workerId {}: Got manifest: {}", workerId, manifest.toString());

        //get layers metadata
        OciResource configResource = manifestToResourcesPlugin.getConfigResource(manifest);
        LOG.info("workerId {}: Got config metadata: {}", workerId, configResource.toString());

        saveRuncYaml(topologyId, port, containerId, imageName, configResource);

        List<OciResource> layersResource = manifestToResourcesPlugin.getLayerResources(manifest);
        LOG.info("workerId {}: Got layers metadata: {}", workerId, layersResource.toString());

        //localize resource
        String configLocalPath = ociResourcesLocalizer.localize(configResource);

        List<String> ociEnv = new ArrayList<>();
        List<String> args = new ArrayList<>();

        ArrayList<OciLayer> layers = new ArrayList<>();

        File file = new File(configLocalPath);
        //extract env
        List<String> imageEnv = extractImageEnv(file);
        if (imageEnv != null && !imageEnv.isEmpty()) {
            ociEnv.addAll(imageEnv);
        }
        for (Map.Entry<String, String> entry : env.entrySet()) {
            ociEnv.add(entry.getKey() + "=" + entry.getValue());
        }
        LOG.debug("workerId {}: ociEnv: {}", workerId, ociEnv);

        //extract entrypoint
        List<String> entrypoint = extractImageEntrypoint(file);
        if (entrypoint != null && !entrypoint.isEmpty()) {
            args.addAll(entrypoint);
        }
        LOG.debug("workerId {}: args: {}", workerId, args);

        //localize layers
        List<String> layersLocalPath = ociResourcesLocalizer.localize((layersResource));
        //compose layers
        for (String layerLocalPath : layersLocalPath) {
            OciLayer layer = new OciLayer(SQUASHFS_MEDIA_TYPE, layerLocalPath);
            layers.add(layer);
        }
        LOG.debug("workerId {}: layers: {}", workerId, layers);
        ArrayList<OciMount> mounts = new ArrayList<>();
        setContainerMounts(mounts, topologyId, workerId, port);
        LOG.debug("workerId {}: mounts: {}", workerId, mounts);

        //calculate the cpusQuotas based on CPU_CFS_PERIOD and assigned CPU
        Long cpusQuotas = null;
        if (workerToCpu.containsKey(workerId)) {
            cpusQuotas = workerToCpu.get(workerId) * CPU_CFS_PERIOD_US / 100;
        }

        Long memoryInBytes = null;
        if (workerToMemoryMb.containsKey(workerId)) {
            memoryInBytes = workerToMemoryMb.get(workerId) * 1024L * 1024L;
        }
        LOG.info("workerId {}: memoryInBytes set to {}; cpusQuotas set to {}", workerId, memoryInBytes, cpusQuotas);

        //<workerRoot>/<workerId>
        String workerDir = targetDir.getAbsolutePath();
        String workerScriptPath = ServerUtils.writeScript(workerDir, command, env, "0027");

        args.add("bash");
        args.add(workerScriptPath);

        //The container PID (on the host) will be written to this file.
        String containerPidFilePath = containerPidFile(workerId);

        OciProcessConfig processConfig = createOciProcessConfig(workerDir, ociEnv, args);

        OciLinuxConfig linuxConfig =
            createOciLinuxConfig(cpusQuotas, memoryInBytes, cgroupParent + "/" + containerId, seccomp, workerId);

        OciRuntimeConfig ociRuntimeConfig = new OciRuntimeConfig(null, mounts, processConfig, null,
                                                          null, null, linuxConfig);

        OciContainerExecutorConfig ociContainerExecutorConfig =
            createOciContainerExecutorConfig(user, containerId, containerPidFilePath,
                                             workerScriptPath, layers, ociRuntimeConfig);

        //launch the container using worker-launcher
        String executorConfigToJsonFile = writeOciExecutorConfigToJsonFile(mapper, ociContainerExecutorConfig, workerDir);
        LOG.info("workerId {}: oci-config.json file path: {}", workerId, executorConfigToJsonFile);

        List<String> cmdArgs = Arrays.asList(CmdType.RUN_OCI_CONTAINER.toString(), workerDir, executorConfigToJsonFile,
                                             ConfigUtils.workerArtifactsSymlink(conf, workerId));

        // launch the oci container. waiting prevents possible race condition that could prevent cleanup of container
        int exitCode = ClientSupervisorUtils.processLauncherAndWait(conf, user, cmdArgs, env, logPrefix, targetDir);
        if (exitCode != 0) {
            LOG.error("launchWorkerProcess RuncCommand {} exited with code: {}", "LaunchWorker-" + containerId, exitCode);
            throw new RuntimeException("launchWorkerProcess Failed to create Runc Container. ContainerId: " + containerId);
        }

        //Add to the watched list
        LOG.debug("Adding {} to the watched workers list", workerId);
        workerToExitCallback.put(workerId, processExitCallback);
        workerToUser.put(workerId, user);

    }