in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java [81:181]
/**
 * Creates an allocator whose in-memory fetch budget is derived from the Tez shuffle settings and
 * which, when remote spill is enabled, creates a filesystem handle for the configured spill path.
 *
 * @param srcNameTrimmed source name, passed to the superclass and used as the log prefix
 * @param uniqueIdentifier identifier used to build the {@link TezTaskOutputFiles} name allocator
 * @param dagID DAG id, passed to the superclass and to the {@link TezTaskOutputFiles} allocator
 * @param conf configuration from which the shuffle and remote-spill settings are read
 * @param maxTaskAvailableMemory fallback task memory (capped at {@code Integer.MAX_VALUE}) used
 *     when {@code Constants.TEZ_RUNTIME_TASK_MEMORY} is not set in {@code conf}
 * @param memoryAvailable upper bound for the in-memory limit assigned to this allocator
 * @param appAttemptId application attempt id, stored on this instance
 * @throws RssException if a shuffle memory percentage is out of range (or NaN), if remote spill is
 *     enabled without a spill path, or if the remote filesystem cannot be created
 */
public RssSimpleFetchedInputAllocator(
    String srcNameTrimmed,
    String uniqueIdentifier,
    int dagID,
    Configuration conf,
    long maxTaskAvailableMemory,
    long memoryAvailable,
    String appAttemptId) {
  super(srcNameTrimmed, uniqueIdentifier, dagID, conf, maxTaskAvailableMemory, memoryAvailable);
  this.srcNameTrimmed = srcNameTrimmed;
  this.conf = conf;
  this.maxAvailableTaskMemory = maxTaskAvailableMemory;
  this.initialMemoryAvailable = memoryAvailable;
  this.uniqueIdentifier = uniqueIdentifier;
  this.appAttemptId = appAttemptId;
  this.fileNameAllocator = new TezTaskOutputFiles(conf, uniqueIdentifier, dagID);
  this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

  // Fraction of task memory usable for in-memory fetched inputs; must lie in [0, 1].
  final float maxInMemCopyUse =
      conf.getFloat(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
  // Negated range test so that NaN is rejected as well (every comparison with NaN is false).
  if (!(maxInMemCopyUse >= 0.0f && maxInMemCopyUse <= 1.0f)) {
    throw new RssException(
        "Invalid value for "
            + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT
            + ": "
            + maxInMemCopyUse);
  }

  // Requested budget: configured task memory (falling back to the available task memory,
  // capped at Integer.MAX_VALUE) scaled by the fetch-buffer fraction.
  long memReq =
      (long)
          (conf.getLong(
                  Constants.TEZ_RUNTIME_TASK_MEMORY,
                  Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE))
              * maxInMemCopyUse);
  // Never assign more than the memory granted to this allocator.
  this.memoryLimit = Math.min(memReq, this.initialMemoryAvailable);

  // Fraction of the limit a single fetched input may occupy; must lie in (0, 1].
  final float singleShuffleMemoryLimitPercent =
      conf.getFloat(
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
  // Negated range test so that NaN is rejected as well.
  if (!(singleShuffleMemoryLimitPercent > 0.0f && singleShuffleMemoryLimitPercent <= 1.0f)) {
    throw new RssException(
        "Invalid value for "
            + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT
            + ": "
            + singleShuffleMemoryLimitPercent);
  }
  this.maxSingleShuffleLimit =
      (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);

  this.remoteSpillEnable = conf.getBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, false);
  if (this.remoteSpillEnable) {
    this.remoteSpillBasePath = conf.get(RSS_REMOTE_SPILL_STORAGE_PATH);
    if (StringUtils.isBlank(this.remoteSpillBasePath)) {
      throw new RssException("You must set remote spill path!");
    }
    // Overlay the parsed remote-storage settings on a copy of the job configuration.
    String remoteStorageConf = this.conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF);
    Map<String, String> remoteStorageConfMap =
        RemoteStorageInfo.parseRemoteStorageConf(remoteStorageConf);
    Configuration remoteConf = new Configuration(this.conf);
    for (Map.Entry<String, String> entry : remoteStorageConfMap.entrySet()) {
      remoteConf.set(entry.getKey(), entry.getValue());
    }
    int replication =
        this.conf.getInt(
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT);
    int retries =
        this.conf.getInt(
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT);
    remoteConf.setInt("dfs.replication", replication);
    remoteConf.setInt("dfs.client.block.write.retries", retries);
    try {
      remoteFS =
          HadoopFilesystemProvider.getFilesystem(new Path(this.remoteSpillBasePath), remoteConf);
    } catch (Exception e) {
      // Chain the original failure so the root cause is not lost.
      throw new RssException("Cannot init remoteFS on path:" + this.remoteSpillBasePath, e);
    }
  }

  LOG.info(
      "{}: RequestedMemory={}, AssignedMemory={}, maxSingleShuffleLimit={}",
      srcNameTrimmed,
      memReq,
      this.memoryLimit,
      this.maxSingleShuffleLimit);
}