in client-tez/tez/src/main/java/org/apache/tez/dag/app/CelebornDagAppMaster.java [328:465]
public static void main(String[] args) {
  try {
    boolean sessionModeCliOption = applyCelebornCliArgs(args);
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    final String pid = System.getenv().get("JVM_PID");
    String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
    String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
    String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
    String nodeHttpPortString =
        System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
    String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
    String pwd = System.getenv(ApplicationConstants.Environment.PWD.name());
    String localDirs = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
    String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.name());
    String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
    if (clientVersion == null) {
      clientVersion = VersionInfo.UNKNOWN;
    }
    // Validate every env var that is parsed/dereferenced below, so a missing one is reported
    // by name (validateInputParam presumably rejects null values — confirm) instead of
    // surfacing later as an NPE or NumberFormatException.
    validateInputParam(containerIdStr, ApplicationConstants.Environment.CONTAINER_ID.name());
    validateInputParam(nodeHostString, ApplicationConstants.Environment.NM_HOST.name());
    validateInputParam(nodePortString, ApplicationConstants.Environment.NM_PORT.name());
    validateInputParam(
        nodeHttpPortString, ApplicationConstants.Environment.NM_HTTP_PORT.name());
    validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV);
    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);
    String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
    Logger.info(
        "Creating CelebornDAGAppMaster for "
            + "applicationId={}"
            + ", attemptNum={}"
            + ", AMContainerId={}"
            + ", jvmPid={}"
            + ", userFromEnv={}"
            + ", cliSessionOption={}"
            + ", pwd={}"
            + ", localDirs={}"
            + ", logDirs={}",
        applicationAttemptId.getApplicationId(),
        applicationAttemptId.getAttemptId(),
        containerId,
        pid,
        jobUserName,
        sessionModeCliOption,
        pwd,
        localDirs,
        logDirs);

    // Start from YARN defaults, then layer the user-specified Tez configuration read from
    // the container working directory on top.
    Configuration conf = new Configuration(new YarnConfiguration());
    DAGProtos.ConfigurationProto confProto =
        TezUtilsInternal.readUserSpecifiedTezConfiguration(pwd);
    TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());
    DAGProtos.AMPluginDescriptorProto amPluginDescriptorProto = null;
    if (confProto.hasAmPluginDescriptor()) {
      amPluginDescriptorProto = confProto.getAmPluginDescriptor();
    }
    applyCelebornConfOverrides(conf);

    UserGroupInformation.setConfiguration(conf);
    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
    TezUtilsInternal.setSecurityUtilConfigration(Logger, conf);
    CelebornDagAppMaster appMaster =
        new CelebornDagAppMaster(
            applicationAttemptId,
            containerId,
            nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString),
            new SystemClock(),
            appSubmitTime,
            sessionModeCliOption,
            pwd,
            TezCommonUtils.getTrimmedStrings(localDirs),
            TezCommonUtils.getTrimmedStrings(logDirs),
            clientVersion,
            credentials,
            jobUserName,
            amPluginDescriptorProto);
    ShutdownHookManager.get()
        .addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
    // log the system properties
    if (Logger.isInfoEnabled()) {
      String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
      if (systemPropsToLog != null) {
        Logger.info(systemPropsToLog);
      }
    }
    initAndStartAppMaster(appMaster, conf);
  } catch (Throwable t) {
    Logger.error("Error starting DAGAppMaster", t);
    System.exit(1);
  }
}

/**
 * Applies the AM command-line arguments.
 *
 * <p>Each {@code -Dkey=value} argument is installed as a JVM system property. Only the first
 * {@code '='} separates key from value, so values may themselves contain {@code '='}. A {@code
 * -Dkey} with no {@code '='} sets the property to the empty string. A {@code -D} with an empty
 * key is skipped with a warning, because {@link System#setProperty} rejects empty keys.
 *
 * <p>The exact arguments {@code --session} or {@code -s} enable session mode. Other arguments
 * that merely contain {@code "-s"} do not.
 *
 * @param args the arguments passed to {@code main}
 * @return {@code true} if session mode was requested on the command line
 */
private static boolean applyCelebornCliArgs(String[] args) {
  boolean sessionMode = false;
  for (String arg : args) {
    if (arg.startsWith("-D")) {
      String[] keyAndValue = arg.substring(2).split("=", 2);
      String key = keyAndValue[0];
      if (key.isEmpty()) {
        Logger.warn("Ignoring command-line property with empty name: {}", arg);
        continue;
      }
      System.setProperty(key, keyAndValue.length < 2 ? "" : keyAndValue[1]);
    } else if ("--session".equals(arg) || "-s".equals(arg)) {
      sessionMode = true;
    }
  }
  return sessionMode;
}

/**
 * Forces the Tez settings that Celeborn-backed shuffle relies on.
 *
 * <p>If the Celeborn master endpoints are not configured, they are filled in from the {@code
 * MASTER_ENDPOINTS_ENV} environment variable. {@code ensureGetSysEnv} presumably fails when that
 * variable is unset — confirm.
 *
 * @param conf the AM configuration, modified in place
 */
private static void applyCelebornConfOverrides(Configuration conf) {
  // disable tez slow start
  conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 1.0f);
  conf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 1.0f);
  // disable transfer shuffle from event
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED, false);
  conf.setBoolean(
      TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE, false);
  // disable pipelined shuffle
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false);
  // disable reschedule task on unhealthy nodes because shuffle data are stored in Celeborn
  conf.setBoolean(TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, false);
  // support set celeborn master endpoints from env
  String masterEndpointsKey =
      CelebornTezUtils.TEZ_PREFIX + CelebornConf.MASTER_ENDPOINTS().key();
  String masterEndpointsVal = conf.get(masterEndpointsKey);
  if (masterEndpointsVal == null || masterEndpointsVal.isEmpty()) {
    Logger.info(
        "CelebornDagAppMaster sets {} via environment variable {}.",
        masterEndpointsKey,
        MASTER_ENDPOINTS_ENV);
    conf.set(masterEndpointsKey, CelebornTezUtils.ensureGetSysEnv(MASTER_ENDPOINTS_ENV));
  }
}