override def write()

in src/main/scala/org/apache/spark/shuffle/RssShuffleWriter.scala [90:168]


  override def write(records: Iterator[Product2[K, V]]): Unit = {
    logInfo(s"Started processing records in Shuffle Map Task ($mapInfo), " +
      s"map side combine: ${shuffleDependency.mapSideCombine}")

    var numRecords = 0

    // Tell the write client this map task is starting its upload, and time the call.
    val startUploadStartTime = System.nanoTime()
    writeClient.startUpload(mapInfo, numMaps, numPartitions)
    val startUploadTime = System.nanoTime() - startUploadStartTime

    var writeRecordTime = 0L
    var serializeTime = 0L

    // Time spent pulling records out of the upstream iterator.
    var recordFetchStartTime = System.nanoTime()
    var recordFetchTime = 0L

    // Bytes sent per reduce partition; updated as data blocks are sent.
    val partitionLengths: Array[Long] = Array.fill[Long](numPartitions)(0L)

    // Main loop: pull each record, route it to its reduce partition, and hand it
    // to the writer manager, which serializes (and, with map-side combine,
    // aggregates) it; ship any blocks the manager spills along the way.
    while (records.hasNext) {
      val record = records.next()
      recordFetchTime += (System.nanoTime() - recordFetchStartTime)
      val writeRecordStartTime = System.nanoTime()
      val partition = getPartition(record._1)
      val serializeStartTime = System.nanoTime()
      val spilledData = writerManager.addRecord(partition, record)
      serializeTime += (System.nanoTime() - serializeStartTime)
      if (spilledData.nonEmpty) {
        sendDataBlocks(spilledData, partitionLengths)
      }
      numRecords += 1
      writeRecordTime += (System.nanoTime() - writeRecordStartTime)
      recordFetchStartTime = System.nanoTime()
    }

    // Flush whatever the writer manager still buffers and ship the final blocks.
    val writeRecordStartTime = System.nanoTime()
    val serializeStartTime = System.nanoTime()
    val remainingData = writerManager.clear()
    serializeTime += (System.nanoTime() - serializeStartTime)
    sendDataBlocks(remainingData, partitionLengths)
    writeRecordTime += (System.nanoTime() - writeRecordStartTime)

    // Tell the write client the upload is complete, and time the call.
    val finishUploadStartTime = System.nanoTime()
    writeClient.finishUpload()
    val finishUploadTime = System.nanoTime() - finishUploadStartTime

    val totalBytes = writeClient.getShuffleWriteBytes()
    val writeTime = startUploadTime + writeRecordTime + finishUploadTime

    val writeMetrics = List(("mapSideCombine", shuffleDependency.mapSideCombine.toString),
      ("aggManager", writerManager.getClass.toString),
      ("spillCount", numberOfSpills.toString),
      ("recordsRead", numRecords.toString),
      ("recordsWritten", writerManager.recordsWritten.toString),
      ("bytesWritten", totalBytes.toString),
      ("writeTime", writeTime.toString),
      ("reductionFactor", writerManager.reductionFactor.toString))

    logInfo(s"Wrote shuffle records ($mapInfo), " +
      s"$numRecords records read, ${writerManager.recordsWritten} records written, $totalBytes bytes, " +
      s"write seconds: ${TimeUnit.NANOSECONDS.toSeconds(startUploadTime)}, " +
      s"${TimeUnit.NANOSECONDS.toSeconds(writeRecordTime)}, " +
      s"${TimeUnit.NANOSECONDS.toSeconds(finishUploadTime)}, " +
      s"serialize seconds: ${TimeUnit.NANOSECONDS.toSeconds(serializeTime)}, " +
      s"record fetch seconds: ${TimeUnit.NANOSECONDS.toSeconds(recordFetchTime)}, " +
      s"write metadata: ${writeMetrics.toString()}")

    shuffleWriteMetrics.incRecordsWritten(writerManager.recordsWritten)
    shuffleWriteMetrics.incBytesWritten(totalBytes)
    shuffleWriteMetrics.incWriteTime(writeTime)

    // MapStatus reports a zero size as an empty block, which reducers skip, so
    // report at least one byte per partition to make every reducer fetch from
    // the remote shuffle servers.
    val nonZeroPartitionLengths = partitionLengths.map(x => if (x == 0) 1 else x)

    // Shuffle data lives on the remote shuffle servers, not in this executor's
    // block manager, so publish a dummy BlockManagerId that encodes the server
    // list for reducers to resolve.
    val blockManagerId = RssUtils.createMapTaskDummyBlockManagerId(
      mapInfo.getMapId, mapInfo.getTaskAttemptId, rssServers)
    mapStatus = MapStatus(blockManagerId, nonZeroPartitionLengths)

    // Release the write client in the background so task completion is not delayed.
    closeWriteClientAsync()
  }
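
The writerManager collaborator is visible here only through its call sites. Read off those call sites alone, its contract looks roughly like the trait below. This is an inferred sketch, not the interface in the repository; the trait name, type parameters, and exact return types are assumptions.

  // Hypothetical contract reconstructed from the call sites in write();
  // the real class in the repository may differ.
  trait WriterManager[K, V] {
    // Buffer one record for the given reduce partition; returns any
    // (partition, serialized bytes) blocks spilled to bound memory use.
    def addRecord(partition: Int, record: Product2[K, V]): Seq[(Int, Array[Byte])]

    // Flush everything still buffered as (partition, serialized bytes) blocks.
    def clear(): Seq[(Int, Array[Byte])]

    // Number of records actually written, after any map-side combine.
    def recordsWritten: Long

    // How much map-side combine shrank the data; the exact definition
    // is not visible in this excerpt.
    def reductionFactor: Double
  }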
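
sendDataBlocks is likewise outside this excerpt. A minimal sketch of what it plausibly does, under two assumptions the excerpt does not confirm: the write client exposes a writeDataBlock(partition, bytes) call, and numberOfSpills counts invocations (it surfaces as "spillCount" in the metrics above).

  // Sketch only: ship each spilled (partition, serialized bytes) pair to the
  // shuffle servers and account for the bytes under that partition.
  private def sendDataBlocks(
      dataBlocks: Seq[(Int, Array[Byte])],
      partitionLengths: Array[Long]): Unit = {
    dataBlocks.foreach { case (partition, bytes) =>
      if (bytes != null && bytes.nonEmpty) {
        writeClient.writeDataBlock(partition, bytes) // assumed client API
        partitionLengths(partition) += bytes.length
      }
    }
    numberOfSpills += 1
  }

Accumulating into partitionLengths here keeps write() free of extra bookkeeping: the same array that tracks bytes sent also supplies the dummy, non-zero sizes reported through MapStatus.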