in storm-server/src/main/java/org/apache/storm/container/oci/RuncLibContainerManager.java [165:283]
public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
int port, String workerId,
List<String> command, Map<String, String> env, String logPrefix,
ExitCodeCallback processExitCallback, File targetDir) throws IOException {
String imageName = getImageName(topoConf);
if (imageName == null) {
LOG.error("Image name for {} is not configured properly; will not continue to launch the worker", topologyId);
return;
}
//set container ID to port + worker ID
String containerId = getContainerId(workerId, port);
//get manifest
ImageManifest manifest = imageTagToManifestPlugin.getManifestFromImageTag(imageName);
LOG.debug("workerId {}: Got manifest: {}", workerId, manifest.toString());
//get layers metadata
OciResource configResource = manifestToResourcesPlugin.getConfigResource(manifest);
LOG.info("workerId {}: Got config metadata: {}", workerId, configResource.toString());
saveRuncYaml(topologyId, port, containerId, imageName, configResource);
List<OciResource> layersResource = manifestToResourcesPlugin.getLayerResources(manifest);
LOG.info("workerId {}: Got layers metadata: {}", workerId, layersResource.toString());
//localize resource
String configLocalPath = ociResourcesLocalizer.localize(configResource);
List<String> ociEnv = new ArrayList<>();
List<String> args = new ArrayList<>();
ArrayList<OciLayer> layers = new ArrayList<>();
File file = new File(configLocalPath);
//extract env
List<String> imageEnv = extractImageEnv(file);
if (imageEnv != null && !imageEnv.isEmpty()) {
ociEnv.addAll(imageEnv);
}
for (Map.Entry<String, String> entry : env.entrySet()) {
ociEnv.add(entry.getKey() + "=" + entry.getValue());
}
LOG.debug("workerId {}: ociEnv: {}", workerId, ociEnv);
//extract entrypoint
List<String> entrypoint = extractImageEntrypoint(file);
if (entrypoint != null && !entrypoint.isEmpty()) {
args.addAll(entrypoint);
}
LOG.debug("workerId {}: args: {}", workerId, args);
//localize layers
List<String> layersLocalPath = ociResourcesLocalizer.localize((layersResource));
//compose layers
for (String layerLocalPath : layersLocalPath) {
OciLayer layer = new OciLayer(SQUASHFS_MEDIA_TYPE, layerLocalPath);
layers.add(layer);
}
LOG.debug("workerId {}: layers: {}", workerId, layers);
ArrayList<OciMount> mounts = new ArrayList<>();
setContainerMounts(mounts, topologyId, workerId, port);
LOG.debug("workerId {}: mounts: {}", workerId, mounts);
//calculate the cpusQuotas based on CPU_CFS_PERIOD and assigned CPU
Long cpusQuotas = null;
if (workerToCpu.containsKey(workerId)) {
cpusQuotas = workerToCpu.get(workerId) * CPU_CFS_PERIOD_US / 100;
}
Long memoryInBytes = null;
if (workerToMemoryMb.containsKey(workerId)) {
memoryInBytes = workerToMemoryMb.get(workerId) * 1024L * 1024L;
}
LOG.info("workerId {}: memoryInBytes set to {}; cpusQuotas set to {}", workerId, memoryInBytes, cpusQuotas);
//<workerRoot>/<workerId>
String workerDir = targetDir.getAbsolutePath();
String workerScriptPath = ServerUtils.writeScript(workerDir, command, env, "0027");
args.add("bash");
args.add(workerScriptPath);
//The container PID (on the host) will be written to this file.
String containerPidFilePath = containerPidFile(workerId);
OciProcessConfig processConfig = createOciProcessConfig(workerDir, ociEnv, args);
OciLinuxConfig linuxConfig =
createOciLinuxConfig(cpusQuotas, memoryInBytes, cgroupParent + "/" + containerId, seccomp, workerId);
OciRuntimeConfig ociRuntimeConfig = new OciRuntimeConfig(null, mounts, processConfig, null,
null, null, linuxConfig);
OciContainerExecutorConfig ociContainerExecutorConfig =
createOciContainerExecutorConfig(user, containerId, containerPidFilePath,
workerScriptPath, layers, ociRuntimeConfig);
//launch the container using worker-launcher
String executorConfigToJsonFile = writeOciExecutorConfigToJsonFile(mapper, ociContainerExecutorConfig, workerDir);
LOG.info("workerId {}: oci-config.json file path: {}", workerId, executorConfigToJsonFile);
List<String> cmdArgs = Arrays.asList(CmdType.RUN_OCI_CONTAINER.toString(), workerDir, executorConfigToJsonFile,
ConfigUtils.workerArtifactsSymlink(conf, workerId));
// launch the oci container. waiting prevents possible race condition that could prevent cleanup of container
int exitCode = ClientSupervisorUtils.processLauncherAndWait(conf, user, cmdArgs, env, logPrefix, targetDir);
if (exitCode != 0) {
LOG.error("launchWorkerProcess RuncCommand {} exited with code: {}", "LaunchWorker-" + containerId, exitCode);
throw new RuntimeException("launchWorkerProcess Failed to create Runc Container. ContainerId: " + containerId);
}
//Add to the watched list
LOG.debug("Adding {} to the watched workers list", workerId);
workerToExitCallback.put(workerId, processExitCallback);
workerToUser.put(workerId, user);
}