in loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala [605:811]
def bulkLoad[T](rdd: RDD[T],
                tableName: TableName,
                startKeys: Array[Array[Byte]],
                flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                stagingDir: String,
                familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                compactionExclude: Boolean = false,
                maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = {
// Region start keys are passed in by the caller (startKeys) instead of being
// looked up here via ConnectionFactory.createConnection(config).getRegionLocator(tableName)
val defaultCompressionStr = config.get("hfile.compression",
  Compression.Algorithm.NONE.getName)
val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr)
// (newer HBase versions resolve this via HFileWriterImpl.compressionByName(defaultCompressionStr))
val now = System.currentTimeMillis()
val tableNameByteArray = tableName.getName
val familyHFileWriteOptionsMapInternal =
new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
while (entrySetIt.hasNext) {
val entry = entrySetIt.next()
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
}
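/*
 * Hypothetical sketch (illustration only, not part of the original file) of how a caller
 * could populate familyHFileWriteOptionsMap. The family name "d" and the GZ/ROW/FAST_DIFF
 * choices are assumptions, not values used by this loader:
 *
 *   val options = new util.HashMap[Array[Byte], FamilyHFileWriteOptions]()
 *   options.put(Bytes.toBytes("d"),
 *     new FamilyHFileWriteOptions(Compression.Algorithm.GZ.toString,
 *       BloomType.ROW.toString, HConstants.DEFAULT_BLOCKSIZE,
 *       DataBlockEncoding.FAST_DIFF.toString))
 *
 * Families without an entry fall back to the defaults set in getNewWriter below
 * (no compression, no bloom filter, default block size, no data block encoding).
 */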
/*
 * This will return a new HFile writer when requested
 *
 * @param family       column family
 * @param conf         configuration to connect to HBase
 * @param favoredNodes nodes that we would like to write to
 * @param fs           FileSystem object where we will be writing the HFiles to
 * @param familydir    directory under the staging dir where this family's HFiles go
 * @return WriterLength object
 */
def getNewWriter(family: Array[Byte], conf: Configuration,
                 favoredNodes: Array[InetSocketAddress],
                 fs: FileSystem,
                 familydir: Path): WriterLength = {
var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
if (familyOptions == null) {
familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
}
val tempConf = new Configuration(conf)
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withBlockSize(familyOptions.blockSize)
contextBuilder.withDataBlockEncoding(
  DataBlockEncoding.valueOf(familyOptions.dataBlockEncoding))
val hFileContext = contextBuilder.build()
if (null == favoredNodes) {
new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build())
// (newer HBase versions would use CellComparator.COMPARATOR here)
} else {
new WriterLength(0,
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build())
}
}
val regionSplitPartitioner =
  new BulkLoadPartitioner(startKeys)
//This is where all the magic happens
//Here we are going to do the following things
// 1. flatMap every row in the RDD into (KeyFamilyQualifier, value) tuples
// 2. Then we are going to repartition by region and sort within partitions (a shuffle)
// 3. Finally we are going to write out our HFiles
//(a commented usage sketch of a full bulkLoad call follows this method)
rdd.flatMap( r => flatMap(r)).
repartitionAndSortWithinPartitions(regionSplitPartitioner).
hbaseForeachPartition(this, (it, conn) => {
val conf = broadcastedConf.value.value
val fs = FileSystem.get(conf)
val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
var rollOverRequested = false
/*
 * This will roll (close) all the current writers so that new HFiles
 * are started for the rows that follow
 */
def rollWriters(): Unit = {
writerMap.values.foreach( wl => {
if (wl.writer != null) {
logDebug("Writer=" + wl.writer.getPath +
(if (wl.written == 0) "" else ", wrote=" + wl.written))
close(wl.writer)
}
})
writerMap.clear()
rollOverRequested = false
}
/*
* This function will close a given HFile writer
* @param w The writer to close
*/
def close(w: StoreFile.Writer): Unit = {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()))
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true))
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude))
w.appendTrackedTimestampsToMetadata()
w.close()
}
}
//Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned
it.foreach { case (keyFamilyQualifier, cellValue: Array[Byte]) =>
//This will get a writer for the column family
//If there is no writer for a given column family then
//it will get created here.
val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {
val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))
fs.mkdirs(familyDir)
val loc:HRegionLocation = {
try {
val locator =
conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
locator.getRegionLocation(keyFamilyQualifier.rowKey)
} catch {
  case e: Throwable =>
    logWarning("failed to locate region for rowkey: " +
      Bytes.toString(keyFamilyQualifier.rowKey), e)
    null
}
}
if (null == loc) {
if (log.isTraceEnabled) {
logTrace("failed to get region location, so use default writer: " +
Bytes.toString(keyFamilyQualifier.rowKey))
}
getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null,
fs = fs, familydir = familyDir)
} else {
if (log.isDebugEnabled) {
logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]")
}
val initialIsa =
new InetSocketAddress(loc.getHostname, loc.getPort)
if (initialIsa.isUnresolved) {
if (log.isTraceEnabled) {
logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+ loc.getPort + ", so use default writer")
}
getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir)
} else {
if (log.isDebugEnabled) {
logDebug("use favored nodes writer: " + initialIsa.getHostString)
}
getNewWriter(keyFamilyQualifier.family, conf,
Array[InetSocketAddress](initialIsa), fs, familyDir)
}
}
})
val keyValue = new KeyValue(keyFamilyQualifier.rowKey,
  keyFamilyQualifier.family,
  keyFamilyQualifier.qualifier,
  now, cellValue)
wl.writer.append(keyValue)
wl.written += keyValue.getLength
rollOverRequested = rollOverRequested || wl.written > maxSize
//This will only roll if we have at least one column family file that is
//bigger than maxSize and we have finished a given row key
if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
rollWriters()
}
previousRow = keyFamilyQualifier.rowKey
}
//We have finished all the data, so let's close up the writers
rollWriters()
})
}
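/*
 * Hypothetical usage sketch referenced above (illustration only, not part of the original
 * file). It assumes a SparkContext `sc`, an HBaseContext `hbaseContext`, a table
 * "test_table" with column family "d" and qualifier "q", that `startKeys` was fetched by
 * the caller (e.g. from a RegionLocator) before the call, and that KeyFamilyQualifier
 * exposes a (rowKey, family, qualifier) constructor matching the fields used above:
 *
 *   val rows: RDD[(Array[Byte], Array[Byte])] = sc.parallelize(Seq(
 *     (Bytes.toBytes("row1"), Bytes.toBytes("value1")),
 *     (Bytes.toBytes("row2"), Bytes.toBytes("value2"))))
 *
 *   hbaseContext.bulkLoad[(Array[Byte], Array[Byte])](
 *     rows,
 *     TableName.valueOf("test_table"),
 *     startKeys,
 *     { case (rowKey, value) =>
 *       Iterator((new KeyFamilyQualifier(rowKey, Bytes.toBytes("d"), Bytes.toBytes("q")), value))
 *     },
 *     "/tmp/bulkload-staging")
 *
 * The HFiles written under the staging directory can then be moved into the table's
 * regions with the HBase completebulkload tool (LoadIncrementalHFiles).
 */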