in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [605:703]
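  /**
   * Writes the contents of an RDD out as HFiles so they can be bulk loaded into HBase.
   * (Descriptive summary added for this excerpt, derived from the method body below.)
   *
   * Each RDD element is flat-mapped into (KeyFamilyQualifier, value) cells, which are then
   * repartitioned and sorted within partitions against the table's region start keys before
   * being written out as HFiles under stagingDir. Moving the resulting HFiles into the table
   * is left to the caller (for example via HBase's bulk-load tooling).
   *
   * @param rdd                        the RDD to load from
   * @param tableName                  the HBase table being loaded
   * @param flatMap                    turns each RDD element into (KeyFamilyQualifier, value) cells
   * @param stagingDir                 filesystem directory the HFiles are written to; must not already exist
   * @param familyHFileWriteOptionsMap per-column-family HFile write options
   * @param compactionExclude          compaction-exclusion flag recorded for the generated HFiles
   * @param maxSize                    HFile size at which a writer roll is requested (the roll
   *                                   itself happens at the next row-key boundary)
   * @param nowTimeStamp               timestamp assigned to the written cells
   */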
  def bulkLoad[T](
      rdd: RDD[T],
      tableName: TableName,
      flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
      stagingDir: String,
      familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
        new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude: Boolean = false,
      maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE,
      nowTimeStamp: Long = System.currentTimeMillis()): Unit = {
    val stagingPath = new Path(stagingDir)
    val fs = stagingPath.getFileSystem(config)
    if (fs.exists(stagingPath)) {
      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
    }
    val conn = HBaseConnectionCache.getConnection(config)
    try {
      val regionLocator = conn.getRegionLocator(tableName)
      val startKeys = regionLocator.getStartKeys
      if (startKeys.length == 0) {
        logInfo("Table " + tableName.toString + " was not found")
      }
      val defaultCompressionStr =
        config.get("hfile.compression", Compression.Algorithm.NONE.getName)
      val hfileCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr)
      val tableRawName = tableName.getName
      val familyHFileWriteOptionsMapInternal =
        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
      while (entrySetIt.hasNext) {
        val entry = entrySetIt.next()
        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
      }
      val regionSplitPartitioner =
        new BulkLoadPartitioner(startKeys)
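      // Note (added): the partitioner assigns every cell to the region whose start-key range
      // contains its row key, so after the repartition-and-sort below each Spark partition
      // corresponds to a single region and its cells arrive in sorted order, which is what
      // the HFile writers require.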
      // This is where all the magic happens
      // Here we are going to do the following things
      // 1. FlatMap every row in the RDD into key column value tuples
      // 2. Then we are going to repartition, sort, and shuffle
      // 3. Finally we are going to write out our HFiles
      rdd
        .flatMap(r => flatMap(r))
        .repartitionAndSortWithinPartitions(regionSplitPartitioner)
        .hbaseForeachPartition(
          this,
          (it, conn) => {
            val conf = broadcastedConf.value.value
            val fs = new Path(stagingDir).getFileSystem(conf)
            val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
            var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
            var rollOverRequested = false
            val localTableName = TableName.valueOf(tableRawName)
            // Here is where we finally iterate through the data in this partition of the
            // RDD that has been sorted and partitioned
            it.foreach {
              case (keyFamilyQualifier, cellValue: Array[Byte]) =>
                val wl = writeValueToHFile(
                  keyFamilyQualifier.rowKey,
                  keyFamilyQualifier.family,
                  keyFamilyQualifier.qualifier,
                  cellValue,
                  nowTimeStamp,
                  fs,
                  conn,
                  localTableName,
                  conf,
                  familyHFileWriteOptionsMapInternal,
                  hfileCompression,
                  writerMap,
                  stagingDir)
                rollOverRequested = rollOverRequested || wl.written > maxSize
                // This will only roll if we have at least one column family file that is
                // bigger than maxSize and we have finished a given row key
                if (rollOverRequested && Bytes
                    .compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
                  rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude)
                  rollOverRequested = false
                }
                previousRow = keyFamilyQualifier.rowKey
            }
            // We have finished all the data, so let's close up the writers
            rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude)
            rollOverRequested = false
          })
    } finally {
      if (null != conn) conn.close()
    }
  }
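
  // Usage sketch (illustrative only, not part of this file). Assumes an existing SparkContext
  // `sc` and a Hadoop Configuration `config`; the table name, column family, and staging path
  // below are placeholders.
  //
  //   val hbaseContext = new HBaseContext(sc, config)
  //   val family = Bytes.toBytes("cf")
  //
  //   val rdd = sc.parallelize(Seq(
  //     (Bytes.toBytes("row1"), Bytes.toBytes("a"), Bytes.toBytes("value1")),
  //     (Bytes.toBytes("row2"), Bytes.toBytes("b"), Bytes.toBytes("value2"))))
  //
  //   hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte])](
  //     rdd,
  //     TableName.valueOf("exampleTable"),
  //     t => Iterator((new KeyFamilyQualifier(t._1, family, t._2), t._3)),
  //     "/tmp/hbase-staging")
  //
  //   // Afterwards the HFiles written under the staging directory are handed to HBase's
  //   // bulk-load tooling (LoadIncrementalHFiles / BulkLoadHFiles) to complete the load.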