in tools/client-simulation-yarn/src/main/java/org/apache/uniffle/client/simulator/UniffleClientSimOnYarnClient.java [67:220]
public void run(Map<String, String> localConf) throws Exception {
// YarnClient to talk to ResourceManager
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
// request RM to create an application
YarnClientApplication application = yarnClient.createApplication();
// prepare app submission context
ApplicationSubmissionContext applicationSubmissionContext =
application.getApplicationSubmissionContext();
// get application id and store to conf for am and sub-tasks to use
ApplicationId appId = applicationSubmissionContext.getApplicationId();
LOG.info("appId: {}", appId);
localConf.put(Constants.KEY_YARN_APP_ID, appId.toString());
applicationSubmissionContext.setApplicationName("UniffleClientSimOnYarn");
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
System.out.println("Received Ctrl+C, terminating application...");
try {
yarnClient.killApplication(appId);
System.out.println("Application killed: " + appId);
} catch (Exception e) {
e.printStackTrace();
}
try {
ApplicationReport report = yarnClient.getApplicationReport(appId);
System.out.println("Application status: " + report.getYarnApplicationState());
} catch (Exception e) {
e.printStackTrace();
}
}));
// copy jar to hdfs
String jarLocalPathStr = Utils.getCurrentJarPath(UniffleClientSimOnYarnClient.class);
System.out.println("jarLocalPath: " + jarLocalPathStr);
if (jarLocalPathStr == null || !jarLocalPathStr.endsWith(".jar")) {
String cmdline = System.getProperty("sun.java.command");
if (cmdline != null) {
String[] parts = cmdline.split("\\s+");
if (parts.length > 1 && parts[1].endsWith(".jar")) {
jarLocalPathStr = parts[1];
System.out.println(
"jarLocalPath: " + jarLocalPathStr + " updated it from command line: " + cmdline);
}
}
}
Path jarLocalPath = new Path(jarLocalPathStr);
Path jarHdfsPath =
Utils.copyToHdfs(conf, jarLocalPath, Utils.getHdfsDestPath(conf, jarLocalPath.getName()));
System.out.println("jarHdfsPath: " + jarHdfsPath);
localConf.put(
Constants.KEY_EXTRA_JAR_PATH_LIST,
Utils.isBlank(localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST))
? jarHdfsPath.toString()
: localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST) + "," + jarHdfsPath);
conf.set(Constants.KEY_EXTRA_JAR_PATH_LIST, localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST));
// write job configuration to hdfs
Path jobConfHdfsPath = Utils.getHdfsDestPath(conf, Constants.JOB_CONF_NAME);
localConf.put(
Constants.KEY_EXTRA_JAR_PATH_LIST,
Utils.isBlank(localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST))
? jobConfHdfsPath.toString()
: localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST) + "," + jobConfHdfsPath);
conf.set(Constants.KEY_EXTRA_JAR_PATH_LIST, localConf.get(Constants.KEY_EXTRA_JAR_PATH_LIST));
Utils.writeStringToHdfs(
conf, HadoopConfigApp.writeConfigurationToString(localConf), jobConfHdfsPath);
// prepare local resources
Map<String, LocalResource> localResources = new HashMap<>();
String[] extraJarPathList = conf.getStrings(Constants.KEY_EXTRA_JAR_PATH_LIST);
if (extraJarPathList != null) {
for (String extraJarPath : extraJarPathList) {
Path path = new Path(extraJarPath);
String name = path.getName();
localResources.put(name, Utils.addHdfsToResource(conf, path));
}
}
// prepare environment classpath
Map<String, String> env = new HashMap<>();
// the dependencies of am
StringBuilder classPathEnv =
new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR)
.append("./*");
// yarn dependencies
for (String c :
conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
String extraJvmOpts = conf.get(Constants.KEY_AM_EXTRA_JVM_OPTS, "");
// prepare launch command to run am
List<String> commands =
new ArrayList<String>() {
{
add(
ApplicationConstants.Environment.JAVA_HOME.$$()
+ "/bin/java "
+ " -Djava.specification.version="
+ System.getProperty("java.specification.version")
+ " "
+ extraJvmOpts
+ " "
+ UniffleClientSimOnYarnAppMaster.class.getName());
}
};
// create am container and submit the application
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
// prepare the am container context
applicationSubmissionContext.setAMContainerSpec(amContainer);
int memory = conf.getInt(Constants.KEY_AM_MEMORY, 4096);
int vCores = conf.getInt(Constants.KEY_AM_VCORES, 8);
applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
String queueName = conf.get(Constants.KEY_QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME);
if (!YarnConfiguration.DEFAULT_QUEUE_NAME.equals(queueName)) {
applicationSubmissionContext.setQueue(queueName);
}
System.out.println(applicationSubmissionContext);
yarnClient.submitApplication(applicationSubmissionContext);
// monitor application progress
for (; ; ) {
ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
YarnApplicationState state = applicationReport.getYarnApplicationState();
FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
if (state.equals(YarnApplicationState.FINISHED)) {
if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
LOG.info("SUCCESSFUL FINISH.");
break;
} else {
LOG.error("FINISHED WITH ERROR.");
break;
}
} else if (state.equals(YarnApplicationState.FAILED)
|| state.equals(YarnApplicationState.KILLED)) {
LOG.error("Application failed with state: {}", state);
break;
}
LOG.info("running...");
Thread.sleep(5000);
}
}