in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [881:937]
  private def getNewHFileWriter(
      family: Array[Byte],
      conf: Configuration,
      favoredNodes: Array[InetSocketAddress],
      fs: FileSystem,
      familydir: Path,
      familyHFileWriteOptionsMapInternal: util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
      defaultCompression: Compression.Algorithm): WriterLength = {
    var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))

    // Fall back to default write options when the caller supplied none for this family.
    if (familyOptions == null) {
      familyOptions = new FamilyHFileWriteOptions(
        defaultCompression.toString,
        BloomType.NONE.toString,
        HConstants.DEFAULT_BLOCKSIZE,
        DataBlockEncoding.NONE.toString)
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
    }
    // Use a conf copy with the block cache size set to 0 so the CacheConfig built from it
    // below does not allocate a block cache for these bulk-load writers.
    val tempConf = new Configuration(conf)
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)

    // HBASE-25249 introduced an incompatible change in the IA.Private HStore and StoreUtils
    // so here, we directly use conf.get for CheckSumType and BytesPerCheckSum to make it
    // compatible between hbase 2.3.x and 2.4.x
    val contextBuilder = new HFileContextBuilder()
      .withCompression(Algorithm.valueOf(familyOptions.compression))
      // ChecksumType.nameToType is still an IA.Private utility, but it's unlikely to be changed.
      .withChecksumType(
        ChecksumType.nameToType(
          conf.get(HConstants.CHECKSUM_TYPE_NAME, ChecksumType.getDefaultChecksumType.getName)))
      .withCellComparator(CellComparator.getInstance())
      .withBytesPerCheckSum(
        conf.getInt(HConstants.BYTES_PER_CHECKSUM, HFile.DEFAULT_BYTES_PER_CHECKSUM))
      .withBlockSize(familyOptions.blockSize)

    // Cell tags are only supported from HFile format version 3 (MIN_FORMAT_VERSION_WITH_TAGS) on.
    if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
      contextBuilder.withIncludesTags(true)
    }
    contextBuilder.withDataBlockEncoding(DataBlockEncoding.valueOf(familyOptions.dataBlockEncoding))
    val hFileContext = contextBuilder.build()

    // Add a '_' to the file name because this is an unfinished file. A rename will happen
    // to remove the '_' when the file is closed.
    new WriterLength(
      0,
      new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
        .withBloomType(BloomType.valueOf(familyOptions.bloomType))
        .withFileContext(hFileContext)
        .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
        .withFavoredNodes(favoredNodes)
        .build())
  }
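
For context, the per-family options consumed above come from a caller-supplied map keyed by ByteArrayWrapper; any family without an entry gets the defaults built in the null check at the top of the method. Below is a minimal caller-side sketch of populating such a map, assuming the FamilyHFileWriteOptions constructor shown above (compression, bloom type, block size, data block encoding) and that the map is passed to HBaseContext.bulkLoad as its familyHFileWriteOptionsMap argument; the family name "cf1" and the chosen codecs are illustrative only.

import java.util

import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamilyHFileWriteOptions}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical caller-side setup; "cf1" is an example column family name.
val familyOptionsMap = new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]()

// Families left out of the map fall back to the defaults built in getNewHFileWriter:
// default compression, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.
familyOptionsMap.put(
  new ByteArrayWrapper(Bytes.toBytes("cf1")),
  new FamilyHFileWriteOptions(
    Compression.Algorithm.SNAPPY.toString,
    BloomType.ROW.toString,
    HConstants.DEFAULT_BLOCKSIZE,
    DataBlockEncoding.FAST_DIFF.toString))

// familyOptionsMap would then be supplied to HBaseContext.bulkLoad(...), which routes each
// family's options into getNewHFileWriter when HFiles are created for that family.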