in client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java [54:170]
/**
 * Creates an unsorted RSS writer for a single Tez task attempt and builds the
 * {@link WriteBufferManager} it writes through.
 *
 * <p>Every tuning value is looked up in {@code conf} with the matching {@code RssTezConfig} key
 * and default. Note that the value stored under {@code RssTezConfig.RSS_RUNTIME_IO_SORT_MB} is
 * only logged: the buffer is sized from {@code availableMemoryMb} (presumably initialised by the
 * superclass constructor from {@code initialMemoryAvailable} -- confirm against the parent class).
 *
 * @param tezTaskAttemptID attempt identifier; handed to the buffer manager and also converted to
 *     a {@code long} through {@code RssTezUtils.convertTaskAttemptIdToLong}
 * @param outputContext forwarded to the superclass constructor
 * @param conf configuration from which all RSS client settings are read
 * @param numMaps forwarded unchanged to the buffer manager
 * @param numOutputs forwarded to the superclass constructor; also the length of the
 *     per-partition record counter array
 * @param initialMemoryAvailable forwarded to the superclass constructor
 * @param shuffleId forwarded unchanged to the buffer manager
 * @param applicationAttemptId its string form is logged and handed to the buffer manager
 * @param partitionToServers partition-to-server assignment; stored on this instance and handed
 *     to the buffer manager
 * @throws IOException if the effective sort memory (in MiB) is negative or larger than 2047,
 *     i.e. it does not fit in the low 11 bits
 * @throws RssException if the configured storage type is empty according to
 *     {@code StringUtils.isEmpty}
 */
public RssUnSorter(
    TezTaskAttemptID tezTaskAttemptID,
    OutputContext outputContext,
    Configuration conf,
    int numMaps,
    int numOutputs,
    long initialMemoryAvailable,
    int shuffleId,
    ApplicationAttemptId applicationAttemptId,
    Map<Integer, List<ShuffleServerInfo>> partitionToServers)
    throws IOException {
  super(outputContext, conf, numOutputs, initialMemoryAvailable);
  this.partitionToServers = partitionToServers;
  this.numRecordsPerPartition = new int[numOutputs];

  // The configured sort size is reported for diagnostics only; it is not used for sizing.
  final long configuredSortMb =
      conf.getLong(
          RssTezConfig.RSS_RUNTIME_IO_SORT_MB, RssTezConfig.RSS_DEFAULT_RUNTIME_IO_SORT_MB);
  LOG.info("conf.sortmb is {}", configuredSortMb);

  // The value that actually drives buffer sizing is the memory available to this instance.
  final long effectiveSortMb = this.availableMemoryMb;
  LOG.info("sortmb, availableMemoryMb is {}, {}", effectiveSortMb, availableMemoryMb);

  // Masking with 0x7FF keeps the low 11 bits; any change means the value is outside 0..2047.
  final boolean sortMbOutOfRange = (effectiveSortMb & 0x7FF) != effectiveSortMb;
  if (sortMbOutOfRange) {
    throw new IOException(
        "Invalid \"" + RssTezConfig.RSS_RUNTIME_IO_SORT_MB + "\": " + effectiveSortMb);
  }

  final double sortMemoryUseThreshold =
      conf.getDouble(
          RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
          RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
  final long taskAttemptIdAsLong = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);
  final long segmentSizeLimit =
      conf.getLong(
          RssTezConfig.RSS_CLIENT_MAX_BUFFER_SIZE,
          RssTezConfig.RSS_CLIENT_DEFAULT_MAX_BUFFER_SIZE);
  final long writerBufferSize =
      conf.getLong(
          RssTezConfig.RSS_WRITER_BUFFER_SIZE, RssTezConfig.RSS_DEFAULT_WRITER_BUFFER_SIZE);
  final double clientMemoryThreshold =
      conf.getDouble(
          RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
          RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
  final int sendThreads =
      conf.getInt(
          RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
  final double clientSendThreshold =
      conf.getDouble(
          RssTezConfig.RSS_CLIENT_SEND_THRESHOLD,
          RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
  final int batchTriggerNum =
      conf.getInt(
          RssTezConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
          RssTezConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);

  // Fail fast on a missing storage type before any further settings are read.
  final String configuredStorageType =
      conf.get(RssTezConfig.RSS_STORAGE_TYPE, RssTezConfig.RSS_DEFAULT_STORAGE_TYPE);
  if (StringUtils.isEmpty(configuredStorageType)) {
    throw new RssException("storage type mustn't be empty");
  }

  final long sendCheckIntervalMs =
      conf.getLong(
          RssTezConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
          RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_INTERVAL_MS);
  final long sendCheckTimeoutMs =
      conf.getLong(
          RssTezConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
          RssTezConfig.RSS_CLIENT_DEFAULT_SEND_CHECK_TIMEOUT_MS);
  final int bitmapCount =
      conf.getInt(RssTezConfig.RSS_CLIENT_BITMAP_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);

  // The resolved settings are dumped (at INFO) only when the configured log level is the
  // debug level, compared case-insensitively.
  final String hiveTezLogLevel =
      conf.get(RssTezConfig.HIVE_TEZ_LOG_LEVEL, RssTezConfig.DEFAULT_HIVE_TEZ_LOG_LEVEL);
  if (hiveTezLogLevel.equalsIgnoreCase(RssTezConfig.DEBUG_HIVE_TEZ_LOG_LEVEL)) {
    LOG.info("sortmb is {}", effectiveSortMb);
    LOG.info("sortThreshold is {}", sortMemoryUseThreshold);
    LOG.info("taskAttemptId is {}", taskAttemptIdAsLong);
    LOG.info("maxSegmentSize is {}", segmentSizeLimit);
    LOG.info("maxBufferSize is {}", writerBufferSize);
    LOG.info("memoryThreshold is {}", clientMemoryThreshold);
    LOG.info("sendThreadNum is {}", sendThreads);
    LOG.info("sendThreshold is {}", clientSendThreshold);
    LOG.info("batch is {}", batchTriggerNum);
    LOG.info("storageType is {}", configuredStorageType);
    LOG.info("sendCheckInterval is {}", sendCheckIntervalMs);
    LOG.info("sendCheckTimeout is {}", sendCheckTimeoutMs);
    LOG.info("bitmapSplitNum is {}", bitmapCount);
  }

  final String appAttemptIdText = applicationAttemptId.toString();
  LOG.info("applicationAttemptId is {}", appAttemptIdText);

  // Byte budget handed to the buffer manager: the sort memory in bytes scaled by the threshold.
  final long sortBufferBytes =
      (long) (ByteUnit.MiB.toBytes(effectiveSortMb) * sortMemoryUseThreshold);

  // Arguments with possible side effects stay inline so they are evaluated in the same
  // left-to-right order as before.
  bufferManager =
      new WriteBufferManager(
          tezTaskAttemptID,
          sortBufferBytes,
          appAttemptIdText,
          taskAttemptIdAsLong,
          successBlockIds,
          failedBlockIds,
          RssTezUtils.createShuffleClient(conf),
          comparator,
          segmentSizeLimit,
          keySerializer,
          valSerializer,
          writerBufferSize,
          clientMemoryThreshold,
          sendThreads,
          clientSendThreshold,
          batchTriggerNum,
          new RssConf(),
          partitionToServers,
          numMaps,
          isMemoryShuffleEnabled(configuredStorageType),
          sendCheckIntervalMs,
          sendCheckTimeoutMs,
          bitmapCount,
          shuffleId,
          false,
          mapOutputByteCounter,
          mapOutputRecordCounter);
  LOG.info("Initialized WriteBufferManager.");
}