private ContainerLaunchContext setUpAMLaunchContext()

in zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java [250:364]


  private ContainerLaunchContext setUpAMLaunchContext() throws IOException {
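    // Overview: localize the interpreter archive (plus optional Flink/Hive-conf and
    // user-supplied archives/files), build the interpreter.sh launch command, and
    // populate the container environment.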
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

    // Set the resources to localize
    this.stagingDir = new Path(fs.getHomeDirectory() + "/.zeppelinStaging", appId.toString());
    LOGGER.info("Use staging directory: {}", this.stagingDir);
    Map<String, LocalResource> localResources = new HashMap<>();

    File interpreterZip = createInterpreterZip();
    Path srcPath = localFs.makeQualified(new Path(interpreterZip.toURI()));
    Path destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
    addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "zeppelin");
    LOGGER.info("Add zeppelin archive: {}", destPath);
    FileUtils.forceDelete(interpreterZip);

    // TODO(zjffdu) Should not add interpreter specific logic here.
    if (launchContext.getInterpreterSettingGroup().equals("flink")) {
      File flinkZip = createFlinkZip();
      srcPath = localFs.makeQualified(new Path(flinkZip.toURI()));
      destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
      addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "flink");
      FileUtils.forceDelete(flinkZip);

      String hiveConfDir = launchContext.getProperties().getProperty("HIVE_CONF_DIR");
      if (StringUtils.isNotBlank(hiveConfDir)) {
        File hiveConfZipFile = createHiveConfZip(new File(hiveConfDir));
        srcPath = localFs.makeQualified(new Path(hiveConfZipFile.toURI()));
        destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
        addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "hive_conf");
      }
    }

    String yarnDistArchives = launchContext.getProperties().getProperty("zeppelin.yarn.dist.archives");
    if (StringUtils.isNotBlank(yarnDistArchives)) {
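      // Each comma-separated entry is a URI that may carry a '#linkName' fragment;
      // when present, the fragment becomes the name the archive is localized under.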
      for (String distArchive : yarnDistArchives.split(",")) {
        URI distArchiveURI = null;
        try {
          distArchiveURI = new URI(distArchive);
        } catch (URISyntaxException e) {
          throw new IOException("Invalid uri: " + distArchive, e);
        }
        if ("file".equals(distArchiveURI.getScheme())) {
          // zeppelin.yarn.dist.archives is local file
          srcPath = localFs.makeQualified(new Path(distArchiveURI));
          destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
        } else {
          // zeppelin.yarn.dist.archives is files on any hadoop compatible file system
          destPath = new Path(removeFragment(distArchive));
        }
        String linkName = srcPath.getName();
        if (distArchiveURI.getFragment() != null) {
          linkName = distArchiveURI.getFragment();
        }
        addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, linkName);
      }
    }
    String yarnDistFiles = launchContext.getProperties().getProperty("zeppelin.yarn.dist.files");
    if (StringUtils.isNotBlank(yarnDistFiles)) {
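      // Unlike dist.archives, each entry is treated as a local file: it is always
      // uploaded to the staging directory, with no remote-scheme or fragment handling.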
      for (String localFile : yarnDistFiles.split(",")) {
        srcPath = localFs.makeQualified(new Path(localFile));
        destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
        addResource(fs, destPath, localResources, LocalResourceType.FILE, srcPath.getName());
        LOGGER.info("Add dist file: {}", destPath);
      }
    }

    amContainer.setLocalResources(localResources);

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<>();
    vargs.add(ApplicationConstants.Environment.PWD.$() + "/zeppelin/bin/interpreter.sh");
    vargs.add("-d");
    vargs.add(ApplicationConstants.Environment.PWD.$() + "/zeppelin/interpreter/"
            + launchContext.getInterpreterSettingGroup());
    vargs.add("-c");
    vargs.add(launchContext.getIntpEventServerHost());
    vargs.add("-p");
    vargs.add(String.valueOf(launchContext.getIntpEventServerPort()));
    vargs.add("-r");
    vargs.add(String.valueOf(zConf.getInterpreterPortRange()));
    vargs.add("-i");
    vargs.add(launchContext.getInterpreterGroupId());
    vargs.add("-l");
    vargs.add(ApplicationConstants.Environment.PWD.$() + "/zeppelin/" +
            ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO.getStringValue()
            + "/" + launchContext.getInterpreterSettingName());
    vargs.add("-g");
    vargs.add(launchContext.getInterpreterSettingName());

    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
            File.separator + ApplicationConstants.STDOUT);
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
            File.separator + ApplicationConstants.STDERR);
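
    // stdout/stderr are redirected into the YARN container log directory. The
    // assembled command looks roughly like (angle brackets mark runtime values):
    //   $PWD/zeppelin/bin/interpreter.sh -d $PWD/zeppelin/interpreter/<group>
    //     -c <eventServerHost> -p <eventServerPort> -r <portRange> -i <groupId>
    //     -l $PWD/zeppelin/<localRepo>/<settingName> -g <settingName>
    //     1><LOG_DIR>/stdout 2><LOG_DIR>/stderr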

    // Set the launch command on the AM container
    amContainer.setCommands(vargs);

    // Pass the interpreter ENV to the YARN container and add Hadoop jars to the CLASSPATH
    populateHadoopClasspath(this.envs);
    if (this.launchContext.getInterpreterSettingGroup().equals("flink")) {
      // Update the Flink-related env vars: inside the YARN container they must point
      // at the localized 'flink' and 'hive_conf' archives rather than the host paths.
      this.envs.put("FLINK_HOME", ApplicationConstants.Environment.PWD.$() + "/flink");
      this.envs.put("FLINK_CONF_DIR", ApplicationConstants.Environment.PWD.$() + "/flink/conf");
      this.envs.put("FLINK_LIB_DIR", ApplicationConstants.Environment.PWD.$() + "/flink/lib");
      this.envs.put("FLINK_PLUGINS_DIR", ApplicationConstants.Environment.PWD.$() + "/flink/plugins");
      this.envs.put("HIVE_CONF_DIR", ApplicationConstants.Environment.PWD.$() + "/hive_conf");
    }
    // Set the JVM heap (-Xmx) from the requested YARN container memory (in MB)
    int memory = Integer.parseInt(
            properties.getProperty("zeppelin.interpreter.yarn.resource.memory", "1024"));
    this.envs.put("ZEPPELIN_INTP_MEM", "-Xmx" + memory + "m");
    amContainer.setEnvironment(this.envs);

    return amContainer;
  }
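
The method relies on several helpers defined elsewhere in YarnRemoteInterpreterProcess (createInterpreterZip, createFlinkZip, createHiveConfZip, removeFragment, populateHadoopClasspath, copyFileToRemote, addResource). For orientation, here are minimal sketches of the two localization helpers; they assume a hadoopConf field, APPLICATION visibility, and the usual Hadoop/YARN imports (FileStatus, URL, LocalResourceVisibility), and are illustrative assumptions rather than the actual implementations.

  private Path copyFileToRemote(Path destDir, Path srcPath, short replication)
      throws IOException {
    // Upload the local file into the staging directory and pin its replication.
    FileSystem destFs = destDir.getFileSystem(hadoopConf); // hadoopConf: assumed field
    Path destPath = new Path(destDir, srcPath.getName());
    destFs.copyFromLocalFile(false, true, srcPath, destPath);
    destFs.setReplication(destPath, replication);
    return destPath;
  }

  private void addResource(FileSystem fs, Path destPath,
                           Map<String, LocalResource> localResources,
                           LocalResourceType resourceType, String link) throws IOException {
    // Register destPath as a YARN LocalResource under the given link name.
    FileStatus status = fs.getFileStatus(destPath);
    LocalResource resource = Records.newRecord(LocalResource.class);
    resource.setResource(URL.fromPath(destPath));
    resource.setSize(status.getLen());
    resource.setTimestamp(status.getModificationTime());
    resource.setType(resourceType);
    resource.setVisibility(LocalResourceVisibility.APPLICATION);
    localResources.put(link, resource);
  }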