private def bulkMutation[T]()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [303:332]


  /**
   * Converts every element of `rdd` into a [[Mutation]] via `f` and submits the
   * mutations to the given table in batches, one partition at a time.
   *
   * For each partition a table handle is obtained from the connection supplied by
   * `hbaseForeachPartition`. Mutations are buffered and sent with `Table.batch`
   * each time the buffer holds at least `batchSize` entries; whatever remains
   * after the iterator is exhausted is sent as a final batch. The table handle is
   * closed when the partition finishes, including when `f` or `batch` throws.
   *
   * @param rdd       the source elements to turn into mutations
   * @param tableName the table the mutations are applied to
   * @param f         maps a single element to the mutation that should be sent
   * @param batchSize number of buffered mutations that triggers a `batch` call
   * @tparam T        element type of the source RDD
   */
  private def bulkMutation[T](
      rdd: RDD[T],
      tableName: TableName,
      f: (T) => Mutation,
      batchSize: Integer): Unit = {

    // Only the raw name is referenced inside the closure; the TableName is rebuilt
    // per partition (presumably to keep the closure's captured state simple —
    // confirm against the serialization requirements of TableName).
    val tName = tableName.getName
    rdd.foreachPartition(
      it =>
        hbaseForeachPartition[T](
          broadcastedConf,
          it,
          (iterator, connection) => {
            val table = connection.getTable(TableName.valueOf(tName))
            try {
              val mutationList = new java.util.ArrayList[Mutation]
              iterator.foreach(
                element => {
                  mutationList.add(f(element))
                  // Flush as soon as the buffer reaches the configured batch size.
                  if (mutationList.size >= batchSize) {
                    table.batch(mutationList, null)
                    mutationList.clear()
                  }
                })
              // Send the trailing partial batch, if any.
              if (mutationList.size() > 0) {
                table.batch(mutationList, null)
                mutationList.clear()
              }
            } finally {
              // Release the table handle even when f or batch fails mid-partition.
              table.close()
            }
          }))
  }