in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [211:257]
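// Writes each row of the DataFrame into the configured HBase table as a Put,
// using the classic MapReduce TableOutputFormat. Note that the `overwrite`
// flag is not consulted anywhere below, so the write always behaves as an
// append/upsert of the affected cells.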
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
  val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
  jobConfig.setOutputFormat(classOf[TableOutputFormat])
  jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  var count = 0
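  // Pair every row-key field declared in the catalog with its index in the
  // DataFrame schema, so values can be looked up positionally per Row.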
  val rkFields = catalog.getRowKey
  val rkIdxedFields = rkFields.map(x => (schema.fieldIndex(x.colName), x))
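  // The remaining (non-row-key) fields, likewise paired with their schema index
  // and the catalog metadata that carries the column family and qualifier.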
  val colsIdxedFields = schema.fieldNames
    .partition(x => rkFields.map(_.colName).contains(x))
    ._2
    .map(x => (schema.fieldIndex(x), catalog.getField(x)))
  val rdd = data.rdd
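  // Converts a single Row into the (ImmutableBytesWritable, Put) pair that
  // TableOutputFormat expects; this runs on the executors.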
  def convertToPut(row: Row) = {
    // construct bytes for row key
    val rowBytes = rkIdxedFields.map { case (x, y) => Utils.toBytes(row(x), y) }
    val rLen = rowBytes.foldLeft(0)(_ + _.length)
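    // Concatenate the encoded key fields into one contiguous row-key byte array.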
    val rBytes = new Array[Byte](rLen)
    var offset = 0
    rowBytes.foreach { x =>
      System.arraycopy(x, 0, rBytes, offset, x.length)
      offset += x.length
    }
    val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
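    // Encode every non-row-key column; null values are skipped so no empty cells are written.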
    colsIdxedFields.foreach { case (x, y) =>
      val r = row(x)
      if (r != null) {
        val b = Utils.toBytes(r, y)
        put.addColumn(Bytes.toBytes(y.cf), Bytes.toBytes(y.col), b)
      }
    }
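    // Note: `count` is bumped inside this executor-side closure, so the driver's
    // copy of the variable is never updated by these increments.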
    count += 1
    (new ImmutableBytesWritable, put)
  }
  rdd.map(convertToPut(_)).saveAsHadoopDataset(jobConfig)
}