in client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java [324:469]
/**
 * Process entry point for the RSS-enabled Tez application master.
 *
 * <p>The method performs the following steps in order:
 *
 * <ol>
 *   <li>Builds a {@link Configuration} and overlays the user-specified Tez configuration read
 *       from the container working directory (the {@code PWD} environment variable).
 *   <li>Scans {@code args}: every {@code -Dkey=value} token is copied into the JVM system
 *       properties, and a session flag token turns on session mode.
 *   <li>If an argument names {@link DAGAppMaster} and {@code isLocalShuffleMode(conf)} holds,
 *       delegates to {@code DAGAppMaster.main} with the arguments that follow the class name
 *       and returns.
 *   <li>Otherwise creates an {@code RssDAGAppMaster} from the container environment
 *       variables, registers both shutdown hooks and starts it through
 *       {@code initAndStartAppMaster}.
 * </ol>
 *
 * <p>Any {@link Throwable} raised along the way is logged and the JVM exits with status 1.
 *
 * @param args command line tokens that follow this class name on the launch command
 */
public static void main(String[] args) {
  try {
    // RssDAGAppMaster is introduced through the config tez.am.launch.cmd-opts, which means
    // properties given on the command line would otherwise be ignored, so they are reloaded
    // here.
    Configuration conf = new Configuration(new YarnConfiguration());
    DAGProtos.ConfigurationProto confProto =
        TezUtilsInternal.readUserSpecifiedTezConfiguration(
            System.getenv(ApplicationConstants.Environment.PWD.name()));
    TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());

    boolean sessionModeCliOption = false;
    boolean rollBackToLocalShuffle = false;
    String[] rollBackRemainingArgs = null;
    for (int i = 0; i < args.length; i++) {
      String arg = args[i];
      if (arg.startsWith("-D")) {
        // Split on the first '=' only, so a value that itself contains '=' is kept intact
        // (e.g. -Dkey=a=b yields the value "a=b").
        String[] property = arg.split("=", 2);
        String key = property[0].substring(2);
        String value = property.length < 2 ? "" : property[1];
        System.setProperty(key, value);
      } else if ("--session".equals(arg) || "-session".equals(arg) || "-s".equals(arg)) {
        // Match the flag as a whole token; a substring match would also fire on unrelated
        // arguments that merely contain "-s".
        sessionModeCliOption = true;
      }
      if (arg.contains(DAGAppMaster.class.getName()) && isLocalShuffleMode(conf)) {
        rollBackToLocalShuffle = true;
        // Everything after the DAGAppMaster class name is forwarded to DAGAppMaster.main.
        rollBackRemainingArgs = Arrays.copyOfRange(args, i + 1, args.length);
      }
    }

    // The log4j config is only loaded in the static initializer of LogManager, so it must be
    // reconfigured after the system properties above have been applied.
    reconfigureLog4j();

    // If tez.shuffle.mode = local, degenerate to the native Tez application master.
    if (rollBackToLocalShuffle) {
      LOG.info(
          "Rollback to local shuffle mode, since tez.shuffle.mode = {}",
          conf.get(RssTezConfig.RSS_SHUFFLE_MODE, RssTezConfig.DEFAULT_RSS_SHUFFLE_MODE));
      DAGAppMaster.main(rollBackRemainingArgs);
      return;
    }

    // Install the tez class loader, which can be used add new resources
    TezClassLoader.setupTezClassLoader();
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

    // Collect the launch context from the container environment.
    final String pid = System.getenv().get("JVM_PID");
    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
    String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
    String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
    if (clientVersion == null) {
      clientVersion = VersionInfo.UNKNOWN;
    }
    Objects.requireNonNull(
        appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null");

    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
    String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());

    LOG.info(
        "Creating RssDAGAppMaster for "
            + "applicationId="
            + applicationAttemptId.getApplicationId()
            + ", attemptNum="
            + applicationAttemptId.getAttemptId()
            + ", AMContainerId="
            + containerId
            + ", jvmPid="
            + pid
            + ", userFromEnv="
            + jobUserName
            + ", cliSessionOption="
            + sessionModeCliOption
            + ", pwd="
            + System.getenv(ApplicationConstants.Environment.PWD.name())
            + ", localDirs="
            + System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())
            + ", logDirs="
            + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));

    // The AM plugin descriptor is optional in the configuration proto; null means "not set".
    AMPluginDescriptorProto amPluginDescriptorProto = null;
    if (confProto.hasAmPluginDescriptor()) {
      amPluginDescriptorProto = confProto.getAmPluginDescriptor();
    }

    UserGroupInformation.setConfiguration(conf);
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    TezUtilsInternal.setSecurityUtilConfigration(LOG, conf);

    String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
    String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
    String nodeHttpPortString =
        System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);

    RssDAGAppMaster appMaster =
        new RssDAGAppMaster(
            applicationAttemptId,
            containerId,
            nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString),
            new SystemClock(),
            appSubmitTime,
            sessionModeCliOption,
            System.getenv(ApplicationConstants.Environment.PWD.name()),
            TezCommonUtils.getTrimmedStrings(
                System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
            TezCommonUtils.getTrimmedStrings(
                System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
            clientVersion,
            credentials,
            jobUserName,
            amPluginDescriptorProto);

    ShutdownHookManager.get()
        .addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
    ShutdownHookManager.get()
        .addShutdownHook(new RssDAGAppMasterShutdownHook(appMaster), RSS_SHUTDOWN_HOOK_PRIORITY);

    // log the system properties
    if (LOG.isInfoEnabled()) {
      String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
      if (systemPropsToLog != null) {
        LOG.info(systemPropsToLog);
      }
    }

    // When both options are enabled, rescheduling on unhealthy nodes is switched off before
    // the application master is started with this configuration.
    if (conf.getBoolean(
            RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK, RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK_DEFAULT)
        && conf.getBoolean(
            TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
            TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT)) {
      LOG.info(
          "When rss.avoid.recompute.succeeded.task is enabled, "
              + "we can not rescheduler succeeded task on unhealthy node");
      conf.setBoolean(TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, false);
    }

    initAndStartAppMaster(appMaster, conf);
  } catch (Throwable t) {
    // Any startup failure is fatal for the container: log it and exit with a non-zero status.
    LOG.error("Error starting RssDAGAppMaster", t);
    System.exit(1);
  }
}