in src/main/scala/org/apache/spark/shuffle/RssShuffleWriter.scala [90:168]
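// Writes all records of this map task to the remote shuffle service:
// records are buffered (and map-side combined when enabled) by writerManager,
// spilled blocks are streamed to the RSS servers, and per-phase timings are
// collected for the shuffle write metrics reported at the end.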
override def write(records: Iterator[Product2[K, V]]): Unit = {
logInfo(s"Started processing records in Shuffle Map Task ($mapInfo), " +
s"map side combine: ${shuffleDependency.mapSideCombine}")
var numRecords = 0
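// Nanosecond timers for each phase: upload start, record write (which nests
// the serialization timer), serialization alone, and fetching records from
// the upstream iterator.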
val startUploadStartTime = System.nanoTime()
writeClient.startUpload(mapInfo, numMaps, numPartitions)
val startUploadTime = System.nanoTime() - startUploadStartTime
var writeRecordTime = 0L
var serializeTime = 0L
var recordFetchStartTime = System.nanoTime()
var recordFetchTime = 0L
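// Bytes sent per reduce partition, updated by sendDataBlocks as blocks are
// shipped; feeds the MapStatus at the end of the write.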
val partitionLengths: Array[Long] = Array.fill[Long](numPartitions)(0L)
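// Drain the record iterator: partition each record, buffer it in
// writerManager, and ship any blocks the manager spills once its buffer fills.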
while (records.hasNext) {
val record = records.next()
recordFetchTime += (System.nanoTime() - recordFetchStartTime)
val writeRecordStartTime = System.nanoTime()
val partition = getPartition(record._1)
val serializeStartTime = System.nanoTime()
val spilledData: Seq[(Int, Array[Byte])] = writerManager.addRecord(partition, record)
serializeTime += (System.nanoTime() - serializeStartTime)
if (spilledData.nonEmpty) {
sendDataBlocks(spilledData, partitionLengths)
}
numRecords += 1
writeRecordTime += (System.nanoTime() - writeRecordStartTime)
recordFetchStartTime = System.nanoTime()
}
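// The iterator is exhausted; flush whatever is still buffered in
// writerManager and ship the final blocks.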
val writeRecordStartTime = System.nanoTime()
val serializeStartTime = System.nanoTime()
val remainingData = writerManager.clear()
serializeTime += (System.nanoTime() - serializeStartTime)
sendDataBlocks(remainingData, partitionLengths)
writeRecordTime += (System.nanoTime() - writeRecordStartTime)
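// Signal the shuffle servers that this map task's upload is complete.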
val finishUploadStartTime = System.nanoTime()
writeClient.finishUpload()
val finishUploadTime = System.nanoTime() - finishUploadStartTime
val totalBytes = writeClient.getShuffleWriteBytes()
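// writeTime covers upload start, the record loop (which already includes
// serialization), and finish upload; record fetch time is reported separately.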
val writeTime = startUploadTime + writeRecordTime + finishUploadTime
val writeMetrics = List(("mapSideCombine", shuffleDependency.mapSideCombine.toString),
("aggManager", writerManager.getClass.toString),
("spillCount", numberOfSpills),
("recordsRead", numRecords.toString),
("recordsWritten", writerManager.recordsWritten.toString),
("bytesWritten", totalBytes.toString),
("writeTime", writeTime.toString),
("reductionFactor", writerManager.reductionFactor.toString))
logInfo(s"Wrote shuffle records ($mapInfo), " +
s"$numRecords records read, ${writerManager.recordsWritten} records written, $totalBytes bytes, " +
s"write seconds: ${TimeUnit.NANOSECONDS.toSeconds(startUploadTime)}, " +
s"${TimeUnit.NANOSECONDS.toSeconds(writeRecordTime)}, " +
s"${TimeUnit.NANOSECONDS.toSeconds(finishUploadTime)}, " +
s"serialize seconds: ${TimeUnit.NANOSECONDS.toSeconds(serializeTime)}, " +
s"record fetch seconds: ${TimeUnit.NANOSECONDS.toSeconds(recordFetchTime)}," +
s"write metadata: ${writeMetrics.toString()}")
shuffleWriteMetrics.incRecordsWritten(writerManager.recordsWritten)
shuffleWriteMetrics.incBytesWritten(totalBytes)
shuffleWriteMetrics.incWriteTime(writeTime)
// Spark skips fetching blocks reported as zero-length, so report at least
// 1 byte per partition; the actual sizes are tracked by the shuffle service.
val nonZeroPartitionLengths = partitionLengths.map(x => if (x == 0) 1 else x)
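// RSS keeps map output on remote servers rather than the local block manager,
// so the MapStatus carries a dummy BlockManagerId that encodes the map task
// and its RSS servers.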
val blockManagerId = RssUtils.createMapTaskDummyBlockManagerId(mapInfo.getMapId, mapInfo.getTaskAttemptId, rssServers)
mapStatus = MapStatus(blockManagerId, nonZeroPartitionLengths)
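// Close the write client asynchronously instead of blocking the task on teardown.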
closeWriteClientAsync()
}
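// For context, a minimal sketch of the sendDataBlocks helper the code above
// depends on. This is an assumed shape, not the actual implementation (which
// lives elsewhere in this class); writeDataBlock is a hypothetical client
// call, and the assumed contract is that each (partition, bytes) pair goes to
// the shuffle servers while its length accumulates into partitionLengths.
//
// private def sendDataBlocks(dataBlocks: Seq[(Int, Array[Byte])],
//                            partitionLengths: Array[Long]): Unit = {
//   dataBlocks.foreach { case (partition, bytes) =>
//     if (bytes != null && bytes.length > 0) {
//       writeClient.writeDataBlock(partition, java.nio.ByteBuffer.wrap(bytes)) // hypothetical API
//       partitionLengths(partition) += bytes.length
//     }
//   }
//   numberOfSpills += 1 // assumed: feeds the spillCount metric above
// }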