in client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java [59:178]
public void init(Context context) throws IOException, ClassNotFoundException {
  // Prepares this collector for one map task:
  //  1. captures the task reporter and the map-output key/value classes;
  //  2. validates io.sort.mb and the sort-memory-use threshold;
  //  3. resolves RSS client settings, each looked up through RssMRUtils with both the
  //     RSS conf (loaded from RssMRConfig.RSS_CONF_FILE) and the job conf, passing the
  //     RssMRConfig default constant where one exists;
  //  4. creates the shuffle client and the SortWriteBufferManager that receives records.
  // Throws IOException for an out-of-range io.sort.mb or sort threshold, and
  // RssException when no storage type is configured.
  final JobConf jobConf = context.getJobConf();
  reporter = context.getReporter();
  keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
  valClass = (Class<V>) jobConf.getMapOutputValueClass();

  // io.sort.mb must fit in 11 bits, i.e. lie within [0, 2047].
  final int ioSortMb = jobConf.getInt(JobContext.IO_SORT_MB, 100);
  if (ioSortMb < 0 || ioSortMb > 0x7FF) {
    throw new IOException("Invalid \"" + JobContext.IO_SORT_MB + "\": " + ioSortMb);
  }

  partitions = jobConf.getNumReduceTasks();
  final MapTask task = context.getMapTask();
  final JobConf rssConf = new JobConf(RssMRConfig.RSS_CONF_FILE);

  final double sortMemoryUseThreshold = RssMRUtils.getDouble(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
      RssMRConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
  // Only (0, 1] is accepted; NaN fails the "> 0" test and is rejected too.
  final boolean thresholdInRange =
      sortMemoryUseThreshold > 0 && Double.compare(sortMemoryUseThreshold, 1.0) <= 0;
  if (!thresholdInRange) {
    throw new IOException("Invalid sort memory use threshold : " + sortMemoryUseThreshold);
  }

  final int batchTriggerNum = RssMRUtils.getInt(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
      RssMRConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
  final RawComparator<K> keyComparator = jobConf.getOutputKeyComparator();
  final double memoryUseThreshold = RssMRUtils.getDouble(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD,
      RssMRConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);

  // Identify this attempt: the app attempt string plus a numeric task-attempt id
  // derived from the map task id and the application attempt number.
  final ApplicationAttemptId attemptId = RssMRUtils.getApplicationAttemptId();
  final String applicationId = attemptId.toString();
  final long rssTaskAttemptId =
      RssMRUtils.convertTaskAttemptIdToLong(task.getTaskID(), attemptId.getAttemptId());

  final double sendThreshold = RssMRUtils.getDouble(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_SEND_THRESHOLD,
      RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
  final long sendCheckIntervalMs = RssMRUtils.getLong(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
      RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE);
  final long sendCheckTimeoutMs = RssMRUtils.getLong(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
      RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
  final int bitmapSplitNum = RssMRUtils.getInt(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_BITMAP_NUM,
      RssMRConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);
  final int mapTaskCount = jobConf.getNumMapTasks();

  final String storageType =
      RssMRUtils.getString(rssConf, jobConf, RssMRConfig.RSS_STORAGE_TYPE);
  if (StringUtils.isEmpty(storageType)) {
    throw new RssException("storage type mustn't be empty");
  }

  final Map<Integer, List<ShuffleServerInfo>> partitionToServers =
      createAssignmentMap(rssConf);
  final SerializationFactory serializers = new SerializationFactory(jobConf);
  final long maxSegmentSize = RssMRUtils.getLong(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE,
      RssMRConfig.RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE);
  final int sendThreadNum = RssMRUtils.getInt(
      rssConf, jobConf,
      RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM,
      RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
  final long maxBufferSize = RssMRUtils.getLong(
      rssConf, jobConf,
      RssMRConfig.RSS_WRITER_BUFFER_SIZE,
      RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);

  shuffleClient = RssMRUtils.createShuffleClient(jobConf);

  // Sort-buffer budget in bytes: io.sort.mb converted from MiB, scaled by the threshold.
  final long sortBufferBytes =
      (long) (ByteUnit.MiB.toBytes(ioSortMb) * sortMemoryUseThreshold);
  bufferManager = new SortWriteBufferManager(
      sortBufferBytes,
      rssTaskAttemptId,
      batchTriggerNum,
      serializers.getSerializer(keyClass),
      serializers.getSerializer(valClass),
      keyComparator,
      memoryUseThreshold,
      applicationId,
      shuffleClient,
      sendCheckIntervalMs,
      sendCheckTimeoutMs,
      partitionToServers,
      successBlockIds,
      failedBlockIds,
      reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES),
      reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS),
      bitmapSplitNum,
      maxSegmentSize,
      mapTaskCount,
      isMemoryShuffleEnabled(storageType),
      sendThreadNum,
      sendThreshold,
      maxBufferSize,
      RssMRConfig.toRssConf(rssConf));
}