in zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java [250:364]
/**
 * Builds the {@link ContainerLaunchContext} used to start the interpreter inside the YARN
 * application master container.
 *
 * <p>The method performs three steps:
 * <ol>
 *   <li>Registers the local resources: the zeppelin archive, the flink and hive_conf archives
 *       when the interpreter setting group is {@code flink}, and everything listed in
 *       {@code zeppelin.yarn.dist.archives} / {@code zeppelin.yarn.dist.files}.</li>
 *   <li>Assembles the {@code interpreter.sh} command line, redirecting stdout/stderr into the
 *       container log directory.</li>
 *   <li>Populates the container environment from {@code this.envs}.</li>
 * </ol>
 *
 * <p>Side effects: assigns {@code this.stagingDir} and mutates {@code this.envs}.
 *
 * @return the launch context with local resources, commands and environment set
 * @throws IOException if an entry of {@code zeppelin.yarn.dist.archives} is not a valid URI,
 *     or if one of the helper calls used for zipping, copying or deleting files fails
 */
private ContainerLaunchContext setUpAMLaunchContext() throws IOException {
  ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
  // Null-safe check, evaluated once and reused for both the resources and the environment.
  final boolean isFlink = "flink".equals(launchContext.getInterpreterSettingGroup());
  final String pwd = ApplicationConstants.Environment.PWD.$();

  // Set the resources to localize
  this.stagingDir = new Path(fs.getHomeDirectory() + "/.zeppelinStaging", appId.toString());
  LOGGER.info("Use staging directory: {}", this.stagingDir);
  Map<String, LocalResource> localResources = new HashMap<>();

  File interpreterZip = createInterpreterZip();
  Path srcPath = localFs.makeQualified(new Path(interpreterZip.toURI()));
  Path destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
  addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "zeppelin");
  LOGGER.info("Add zeppelin archive: {}", destPath);
  FileUtils.forceDelete(interpreterZip);

  // TODO(zjffdu) Should not add interpreter specific logic here.
  if (isFlink) {
    File flinkZip = createFlinkZip();
    srcPath = localFs.makeQualified(new Path(flinkZip.toURI()));
    destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
    addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "flink");
    FileUtils.forceDelete(flinkZip);

    String hiveConfDir = launchContext.getProperties().getProperty("HIVE_CONF_DIR");
    if (!StringUtils.isBlank(hiveConfDir)) {
      File hiveConfZipFile = createHiveConfZip(new File(hiveConfDir));
      srcPath = localFs.makeQualified(new Path(hiveConfZipFile.toURI()));
      destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
      addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "hive_conf");
      // Remove the local zip once uploaded, like the interpreter and flink zips above.
      // NOTE(review): assumes createHiveConfZip produces a temporary file - confirm.
      FileUtils.forceDelete(hiveConfZipFile);
    }
  }

  String yarnDistArchives =
      launchContext.getProperties().getProperty("zeppelin.yarn.dist.archives");
  if (StringUtils.isNotBlank(yarnDistArchives)) {
    for (String rawArchive : yarnDistArchives.split(",")) {
      String distArchive = rawArchive.trim();
      if (distArchive.isEmpty()) {
        // Tolerate "a,,b" and stray whitespace between entries.
        continue;
      }
      URI distArchiveURI;
      try {
        distArchiveURI = new URI(distArchive);
      } catch (URISyntaxException e) {
        throw new IOException("Invalid uri: " + distArchive, e);
      }

      Path archivePath;
      String linkName;
      if ("file".equals(distArchiveURI.getScheme())) {
        // zeppelin.yarn.dist.archives is local file
        Path localArchive = localFs.makeQualified(new Path(distArchiveURI));
        archivePath = copyFileToRemote(stagingDir, localArchive, (short) 1);
        linkName = localArchive.getName();
      } else {
        // zeppelin.yarn.dist.archives is files on any hadoop compatible file system
        archivePath = new Path(removeFragment(distArchive));
        // Derive the default link name from this archive itself; no local source path
        // exists in this branch, so reusing an earlier one would give the wrong name.
        linkName = archivePath.getName();
      }
      // An explicit "#fragment" on the URI overrides the default link name.
      if (distArchiveURI.getFragment() != null) {
        linkName = distArchiveURI.getFragment();
      }
      addResource(fs, archivePath, localResources, LocalResourceType.ARCHIVE, linkName);
    }
  }

  String yarnDistFiles = launchContext.getProperties().getProperty("zeppelin.yarn.dist.files");
  if (StringUtils.isNotBlank(yarnDistFiles)) {
    for (String rawFile : yarnDistFiles.split(",")) {
      String localFile = rawFile.trim();
      if (localFile.isEmpty()) {
        // Tolerate "a,,b" and stray whitespace between entries.
        continue;
      }
      Path localFilePath = localFs.makeQualified(new Path(localFile));
      Path remoteFilePath = copyFileToRemote(stagingDir, localFilePath, (short) 1);
      addResource(fs, remoteFilePath, localResources, LocalResourceType.FILE,
          localFilePath.getName());
      LOGGER.info("Add dist file: {}", remoteFilePath);
    }
  }
  amContainer.setLocalResources(localResources);

  // Setup the command to run the AM
  List<String> vargs = new ArrayList<>();
  vargs.add(pwd + "/zeppelin/bin/interpreter.sh");
  vargs.add("-d");
  vargs.add(pwd + "/zeppelin/interpreter/" + launchContext.getInterpreterSettingGroup());
  vargs.add("-c");
  vargs.add(launchContext.getIntpEventServerHost());
  vargs.add("-p");
  vargs.add(launchContext.getIntpEventServerPort() + "");
  vargs.add("-r");
  vargs.add(zConf.getInterpreterPortRange() + "");
  vargs.add("-i");
  vargs.add(launchContext.getInterpreterGroupId());
  vargs.add("-l");
  vargs.add(pwd + "/zeppelin/"
      + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO.getStringValue()
      + "/" + launchContext.getInterpreterSettingName());
  vargs.add("-g");
  vargs.add(launchContext.getInterpreterSettingName());
  // Redirect the process output into the container's log directory.
  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
      + File.separator + ApplicationConstants.STDOUT);
  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
      + File.separator + ApplicationConstants.STDERR);

  // Setup ContainerLaunchContext for AM container
  amContainer.setCommands(vargs);

  // pass the interpreter ENV to yarn container and also add hadoop jars to CLASSPATH
  populateHadoopClasspath(this.envs);
  if (isFlink) {
    // Update the flink related env because all of these are different in the yarn container:
    // they must point at the archives localized above under the container working directory.
    this.envs.put("FLINK_HOME", pwd + "/flink");
    this.envs.put("FLINK_CONF_DIR", pwd + "/flink/conf");
    this.envs.put("FLINK_LIB_DIR", pwd + "/flink/lib");
    this.envs.put("FLINK_PLUGINS_DIR", pwd + "/flink/plugins");
    // NOTE(review): set even when no hive_conf archive was uploaded above (HIVE_CONF_DIR
    // property blank) - confirm that pointing at a missing directory is intended.
    this.envs.put("HIVE_CONF_DIR", pwd + "/hive_conf");
  }

  // set -Xmx
  int memory = Integer.parseInt(
      properties.getProperty("zeppelin.interpreter.yarn.resource.memory", "1024").trim());
  this.envs.put("ZEPPELIN_INTP_MEM", "-Xmx" + memory + "m");
  amContainer.setEnvironment(this.envs);

  return amContainer;
}