in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [959:1049]
/**
 * Appends one cell to the HFile writer for `family`, creating that writer on
 * first use of the family.
 *
 * When a writer has to be created:
 *  - the directory `stagingDir/<family>` is created;
 *  - the region location of `rowKey` is looked up in `tableName`;
 *  - if the location is found and its host:port resolves, that address is
 *    passed to `getNewHFileWriter` as the single favored node;
 *  - otherwise `null` is passed (no favored node).
 * A failed lookup is logged as a warning and never aborts the write.
 *
 * @param rowKey        row key of the cell; also used for the region lookup when
 *                      a new writer is created
 * @param family        column family of the cell; keys the entry in `writerMap`
 * @param qualifier     column qualifier of the cell
 * @param cellValue     value of the cell
 * @param nowTimeStamp  timestamp stored on the written KeyValue
 * @param fs            file system handed to `getNewHFileWriter`
 * @param conn          connection used to obtain a RegionLocator for `tableName`
 * @param tableName     table whose region location is looked up
 * @param conf          configuration used to resolve the staging directory's FileSystem
 * @param familyHFileWriteOptionsMapInternal per-family write options, handed to
 *                      `getNewHFileWriter`
 * @param hfileCompression compression handed to `getNewHFileWriter`
 * @param writerMap     per-family writers; a new entry is inserted on first use of `family`
 * @param stagingDir    root staging directory for the family sub-directories
 * @return the WriterLength for `family`, whose `written` counter has been
 *         increased by the appended KeyValue's length
 */
private def writeValueToHFile(
    rowKey: Array[Byte],
    family: Array[Byte],
    qualifier: Array[Byte],
    cellValue: Array[Byte],
    nowTimeStamp: Long,
    fs: FileSystem,
    conn: Connection,
    tableName: TableName,
    conf: Configuration,
    familyHFileWriteOptionsMapInternal: util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
    hfileCompression: Compression.Algorithm,
    writerMap: mutable.HashMap[ByteArrayWrapper, WriterLength],
    stagingDir: String): WriterLength = {
  import scala.util.control.NonFatal

  val wl = writerMap.getOrElseUpdate(
    new ByteArrayWrapper(family), {
      val familyDir = new Path(stagingDir, Bytes.toString(family))
      familyDir.getFileSystem(conf).mkdirs(familyDir)

      // Best-effort lookup. Any non-fatal failure falls back to a writer with no
      // favored node. The locator is Closeable, so always release it.
      val loc: HRegionLocation =
        try {
          val locator = conn.getRegionLocator(tableName)
          try {
            locator.getRegionLocation(rowKey)
          } finally {
            locator.close()
          }
        } catch {
          case NonFatal(e) =>
            logWarning(
              "there's something wrong when locating rowkey: " +
                Bytes.toString(rowKey), e)
            null
        }

      // Favor the server hosting rowKey only when its address resolves;
      // null means "no favored node".
      val favoredNodes: Array[InetSocketAddress] =
        if (null == loc) {
          logTrace(
            "failed to get region location, so use default writer: " +
              Bytes.toString(rowKey))
          null
        } else {
          logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
          val initialIsa = new InetSocketAddress(loc.getHostname, loc.getPort)
          if (initialIsa.isUnresolved) {
            logTrace(
              "failed to resolve bind address: " + loc.getHostname + ":"
                + loc.getPort + ", so use default writer")
            null
          } else {
            logDebug("use favored nodes writer: " + initialIsa.getHostString)
            Array[InetSocketAddress](initialIsa)
          }
        }

      getNewHFileWriter(
        family,
        conf,
        favoredNodes,
        fs,
        familyDir,
        familyHFileWriteOptionsMapInternal,
        hfileCompression)
    })

  val keyValue = new KeyValue(rowKey, family, qualifier, nowTimeStamp, cellValue)
  wl.writer.append(keyValue)
  wl.written += keyValue.getLength
  wl
}