in client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java [212:265]
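/**
 * Core write path: drains the record iterator into per-partition buffers,
 * ships filled blocks to the remote shuffle servers as they become ready,
 * then flushes the remainder, waits for all blocks to be acknowledged,
 * commits if needed, and records write metrics.
 */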
private void writeImpl(Iterator<Product2<K, V>> records) {
  List<ShuffleBlockInfo> shuffleBlockInfos;
  boolean isCombine = shuffleDependency.mapSideCombine();
  Function1<V, C> createCombiner = null;
  if (isCombine) {
    createCombiner = shuffleDependency.aggregator().get().createCombiner();
  }
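  // Buffer each record per partition; addRecord returns any blocks that are
  // full and ready to be sent.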
  while (records.hasNext()) {
    // Fail the task fast if any previously sent blocks have failed.
    checkIfBlocksFailed();
    Product2<K, V> record = records.next();
    K key = record._1();
    int partition = getPartition(key);
    if (isCombine) {
      Object c = createCombiner.apply(record._2());
      shuffleBlockInfos = bufferManager.addRecord(partition, key, c);
    } else {
      shuffleBlockInfos = bufferManager.addRecord(partition, key, record._2());
    }
    if (shuffleBlockInfos != null && !shuffleBlockInfos.isEmpty()) {
      processShuffleBlockInfos(shuffleBlockInfos);
    }
  }
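  // All records consumed: flush whatever is still buffered so every block is sent.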
  final long start = System.currentTimeMillis();
  shuffleBlockInfos = bufferManager.clear();
  if (shuffleBlockInfos != null && !shuffleBlockInfos.isEmpty()) {
    processShuffleBlockInfos(shuffleBlockInfos);
  }
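  // Verify that every tracked block id was sent successfully before proceeding.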
  long checkStartTs = System.currentTimeMillis();
  checkBlockSendResult(blockIds);
  long commitStartTs = System.currentTimeMillis();
  long checkDuration = commitStartTs - checkStartTs;
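  // Memory shuffle needs no commit round-trip; otherwise ask the servers to
  // commit the shuffle data.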
  if (!isMemoryShuffleEnabled) {
    sendCommit();
  }
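  // Fold buffering time plus flush/check/commit time into Spark's shuffle
  // write metrics (which expect nanoseconds).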
  long writeDurationMs = bufferManager.getWriteTime() + (System.currentTimeMillis() - start);
  shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
  LOG.info(
      "Finished writing shuffle data for appId["
          + appId
          + "], shuffleId["
          + shuffleId
          + "], taskId["
          + taskId
          + "] in "
          + writeDurationMs
          + " ms, including checkSendResult["
          + checkDuration
          + " ms], commit["
          + (System.currentTimeMillis() - commitStartTs)
          + " ms], "
          + bufferManager.getManagerCostInfo());
}