in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java [135:426]
public static void main(String[] args) {
JobConf conf = new JobConf(new YarnConfiguration());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
ShuffleWriteClient shuffleWriteClient = null;
int numReduceTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
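// RSS setup is only needed when the job has reduce tasks; map-only jobs produce no shuffle data.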
if (numReduceTasks > 0) {
String coordinators = conf.get(RssMRConfig.RSS_COORDINATOR_QUORUM);
ShuffleWriteClient client = RssMRUtils.createShuffleClient(conf);
shuffleWriteClient = client;
LOG.info("Registering coordinators {}", coordinators);
client.registerCoordinators(coordinators);
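// The quorum is a comma-separated list of coordinator addresses, e.g. (illustrative):
//   rss.coordinator.quorum=coordinator1:19999,coordinator2:19999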
final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
}
});
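// A daemon single-thread scheduler: it drives the periodic app heartbeat below without blocking JVM exit.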
// Set loadDefaults to false: rss_conf.xml should contain only RSS settings,
// so the Hadoop defaults are not needed here.
Configuration extraConf = new JobConf(false);
extraConf.clear();
RssMRUtils.applyClientConf(extraConf, conf);
// Whether to fetch cluster-wide client conf (including remote storage) from the coordinator
boolean dynamicConfEnabled =
conf.getBoolean(
RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
// Fetch the cluster client conf and apply it when dynamic conf is enabled
if (dynamicConfEnabled) {
Map<String, String> clusterClientConf =
client.fetchClientConf(
conf.getInt(
RssMRConfig.RSS_ACCESS_TIMEOUT_MS,
RssMRConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
RssMRUtils.applyDynamicClientConf(extraConf, clusterClientConf);
}
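// Note: dynamic conf from the coordinator supplies cluster-wide defaults; client conf set
// explicitly in the job is expected to take precedence (assumed applyDynamicClientConf semantics).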
// Collect the configured server assignment tags; the default shuffle server version tag and the client type are always appended.
Set<String> assignmentTags = new HashSet<>();
String rawTags = conf.get(RssMRConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
if (StringUtils.isNotEmpty(rawTags)) {
rawTags = rawTags.trim();
assignmentTags.addAll(Arrays.asList(rawTags.split(",")));
}
assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
String clientType =
extraConf.get(RssMRConfig.RSS_CLIENT_TYPE, RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
ClientUtils.validateClientType(clientType);
assignmentTags.add(clientType);
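// Example (illustrative): rss.client.assignment.tags=prod,ssd yields the tag set
//   {prod, ssd, <shuffle server version tag>, <client type>}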
String storageType = RssMRUtils.getString(extraConf, RssMRConfig.RSS_STORAGE_TYPE);
boolean testMode = RssMRUtils.getBoolean(extraConf, RssMRConfig.RSS_TEST_MODE_ENABLE, false);
ClientUtils.validateTestModeConf(testMode, storageType);
ApplicationAttemptId applicationAttemptId = RssMRUtils.getApplicationAttemptId();
String appId = applicationAttemptId.toString();
RemoteStorageInfo defaultRemoteStorage =
new RemoteStorageInfo(extraConf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
appId, defaultRemoteStorage, dynamicConfEnabled, storageType, client);
// set the remote storage with actual value
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
RssMRUtils.validateRssClientConf(extraConf);
// When containers have very limited disk space, reduce tasks may be allowed to spill data to HDFS
if (conf.getBoolean(
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
if (remoteStorage.isEmpty()) {
throw new IllegalArgumentException(
"Remote spill only supports "
+ StorageType.MEMORY_LOCALFILE_HDFS.name()
+ " mode with "
+ remoteStorage);
}
// When remote spill is enabled, reduce tasks are more likely to fail,
// so allow extra attempts to avoid recomputing the whole job.
int originalAttempts = conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
int inc =
conf.getInt(
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT);
if (inc < 0) {
throw new IllegalArgumentException(
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC + " cannot be negative");
}
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc);
}
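// Example: with mapreduce.reduce.maxattempts=4 and an increment of 1, reduces get up to 5 attempts.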
int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
// retryInterval must be larger than `rss.server.heartbeat.interval`; otherwise a retry may
// get back the same (stale) assignment result
long retryInterval =
conf.getLong(
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
int retryTimes =
conf.getInt(
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);
boolean remoteMergeEnable =
conf.getBoolean(RSS_REMOTE_MERGE_ENABLE, RSS_REMOTE_MERGE_ENABLE_DEFAULT);
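// An MR job has a single shuffle phase, registered below as shuffle id 0 with one partition
// per reduce task (partition ranges of width 1).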
ShuffleAssignmentsInfo response;
try {
response =
RetryUtils.retry(
() -> {
ShuffleAssignmentsInfo shuffleAssignments =
client.getShuffleAssignments(
appId,
0,
numReduceTasks,
1,
Sets.newHashSet(assignmentTags),
requiredAssignmentShuffleServersNum,
-1);
Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
shuffleAssignments.getServerToPartitionRanges();
if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
return null;
}
LOG.info("Start to register shuffle");
long start = System.currentTimeMillis();
serverToPartitionRanges
.entrySet()
.forEach(
entry ->
client.registerShuffle(
entry.getKey(),
appId,
0,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL,
RssMRConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
0,
remoteMergeEnable
? MergeContext.newBuilder()
.setKeyClass(conf.getMapOutputKeyClass().getName())
.setValueClass(conf.getMapOutputValueClass().getName())
.setComparatorClass(
conf.getOutputKeyComparator().getClass().getName())
.setMergedBlockSize(
conf.getInt(
RssMRConfig.RSS_MERGED_BLOCK_SZIE,
RssMRConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT))
.setMergeClassLoader(
conf.get(RSS_REMOTE_MERGE_CLASS_LOADER, ""))
.build()
: null));
LOG.info(
"Finished registering shuffle in {} ms", System.currentTimeMillis() - start);
return shuffleAssignments;
},
retryInterval,
retryTimes);
} catch (Throwable throwable) {
throw new RssException("registerShuffle failed!", throwable);
}
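// A null response means no server assignment was returned; bail out instead of starting the AM without shuffle servers.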
if (response == null) {
return;
}
long heartbeatInterval =
conf.getLong(
RssMRConfig.RSS_HEARTBEAT_INTERVAL, RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
long heartbeatTimeout =
conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2);
client.registerApplicationInfo(appId, heartbeatTimeout, "user");
scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
client.sendAppHeartbeat(appId, heartbeatTimeout);
LOG.info("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
}
},
heartbeatInterval / 2,
heartbeatInterval,
TimeUnit.MILLISECONDS);
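// E.g., with a 10s interval the first heartbeat fires after 5s and then every 10s, keeping the
// app alive on the coordinator and shuffle servers.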
// Write the shuffle server assignments into the job conf in the submit work directory.
// The format is:
// mapreduce.rss.assignment.partition.1:server1,server2
// mapreduce.rss.assignment.partition.2:server3,server4
// ...
response
.getPartitionToServers()
.entrySet()
.forEach(
entry -> {
List<String> servers = Lists.newArrayList();
for (ShuffleServerInfo server : entry.getValue()) {
if (server.getNettyPort() > 0) {
servers.add(
server.getHost()
+ ":"
+ server.getGrpcPort()
+ ":"
+ server.getNettyPort());
} else {
servers.add(server.getHost() + ":" + server.getGrpcPort());
}
}
extraConf.set(
RssMRConfig.RSS_ASSIGNMENT_PREFIX + entry.getKey(),
StringUtils.join(servers, ","));
});
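// Each server is serialized as host:grpcPort, or host:grpcPort:nettyPort when a Netty data port
// is configured. Tasks can later look up their servers with (illustrative):
//   String servers = jobConf.get(RssMRConfig.RSS_ASSIGNMENT_PREFIX + partitionId);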
writeExtraConf(conf, extraConf);
// Disable reduce slow start: force reduces to wait until all maps have completed.
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
LOG.warn("Disabled reduce slow start, because RSS does not support it yet");
// MapReduce does not set keepContainersAcrossApplicationAttempts in its submission context,
// so no containers are shared between attempts. RSS does not support sharing containers
// between attempts either, so disable AM job recovery.
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
LOG.warn("Disabled MR AM job recovery, because RSS does not support it yet");
String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
if (jobDirStr == null) {
throw new RssException("jobDir is empty");
}
}
try {
setMainStartedTrue();
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
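// These environment variables are injected by the NodeManager into every AM container
// (standard YARN bootstrap); a missing value indicates a broken launch context.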
String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
validateInputParam(containerIdStr, ApplicationConstants.Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
validateInputParam(nodeHostString, ApplicationConstants.Environment.NM_HOST.name());
String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
validateInputParam(nodePortString, ApplicationConstants.Environment.NM_PORT.name());
String nodeHttpPortString =
System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
validateInputParam(nodeHttpPortString, ApplicationConstants.Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr = System.getenv("APP_SUBMIT_TIME_ENV");
validateInputParam(appSubmitTimeStr, "APP_SUBMIT_TIME_ENV");
ContainerId containerId = ContainerId.fromString(containerIdStr);
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
if (applicationAttemptId != null) {
CallerContext.setCurrent(
(new CallerContext.Builder("mr_appmaster_" + applicationAttemptId.toString())).build());
}
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
RssMRAppMaster appMaster =
new RssMRAppMaster(
applicationAttemptId,
containerId,
nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString),
appSubmitTime,
shuffleWriteClient);
ShutdownHookManager.get().addShutdownHook(new RssMRAppMasterShutdownHook(appMaster), 30);
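// Priority 30 mirrors MRAppMaster.SHUTDOWN_HOOK_PRIORITY, so the hook runs in the same order as in vanilla MR.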
MRWebAppUtil.initialize(conf);
String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
conf.set("mapreduce.job.user.name", jobUserName);
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.error("Error starting MRAppMaster", t);
ExitUtil.terminate(1, t);
}
}