in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [303:332]
/**
 * Converts every element of the RDD into an HBase Mutation via the supplied
 * function and writes the mutations to the target table in batches of
 * `batchSize`, one Table per partition.
 *
 * @param rdd       RDD whose elements will be turned into mutations
 * @param tableName target HBase table
 * @param f         function mapping an RDD element to a Mutation
 * @param batchSize number of mutations to buffer before flushing a batch
 */
private def bulkMutation[T](
    rdd: RDD[T],
    tableName: TableName,
    f: (T) => Mutation,
    batchSize: Integer): Unit = {
  // Capture only the raw name bytes for the closure — presumably because
  // TableName itself is not serializable (TODO confirm).
  val tName = tableName.getName
  rdd.foreachPartition(
    it =>
      hbaseForeachPartition[T](
        broadcastedConf,
        it,
        (iterator, connection) => {
          val table = connection.getTable(TableName.valueOf(tName))
          // Ensure the Table is closed even if a batch fails mid-partition;
          // the original code leaked it on exception.
          try {
            val mutationList = new java.util.ArrayList[Mutation]
            iterator.foreach(element => {
              mutationList.add(f(element))
              if (mutationList.size >= batchSize) {
                // Flush a full batch. A null results array is passed since
                // per-operation results are not inspected here.
                table.batch(mutationList, null)
                mutationList.clear()
              }
            })
            // Flush any remaining buffered mutations (last partial batch).
            if (!mutationList.isEmpty) {
              table.batch(mutationList, null)
              mutationList.clear()
            }
          } finally {
            table.close()
          }
        }))
}